-options to play with
[oweals/gnunet.git] / src / fs / gnunet-service-fs_stream.c
1 /*
2      This file is part of GNUnet.
3      (C) 2012 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file fs/gnunet-service-fs_stream.c
23  * @brief non-anonymous file-transfer
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include "gnunet_constants.h"
28 #include "gnunet_util_lib.h"
29 #include "gnunet_stream_lib.h"
30 #include "gnunet_protocols.h"
31 #include "gnunet_applications.h"
32 #include "gnunet-service-fs.h"
33 #include "gnunet-service-fs_indexing.h"
34 #include "gnunet-service-fs_stream.h"
35
36 /**
37  * After how long do we termiante idle connections?
38  */
39 #define IDLE_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 2)
40
41
42 /**
43  * Information we keep around for each active streaming client.
44  */
45 struct StreamClient
46 {
47   /**
48    * DLL
49    */ 
50   struct StreamClient *next;
51
52   /**
53    * DLL
54    */ 
55   struct StreamClient *prev;
56
57   /**
58    * Socket for communication.
59    */ 
60   struct GNUNET_STREAM_Socket *socket;
61
62   /**
63    * Handle for active read operation, or NULL.
64    */ 
65   struct GNUNET_STREAM_IOReadHandle *rh;
66
67   /**
68    * Handle for active write operation, or NULL.
69    */ 
70   struct GNUNET_STREAM_IOWriteHandle *wh;
71   
72   /**
73    * Tokenizer for requests.
74    */
75   struct GNUNET_SERVER_MessageStreamTokenizer *mst;
76   
77   /**
78    * Current active request to the datastore, if we have one pending.
79    */
80   struct GNUNET_DATASTORE_QueueEntry *qe;
81
82   /**
83    * Task that is scheduled to asynchronously terminate the connection.
84    */
85   GNUNET_SCHEDULER_TaskIdentifier terminate_task;
86
87   /**
88    * Task that is scheduled to terminate idle connections.
89    */
90   GNUNET_SCHEDULER_TaskIdentifier timeout_task;
91
92   /**
93    * Size of the last write that was initiated.
94    */ 
95   size_t reply_size;
96
97 };
98
99
100 /**
101  * Query from one peer, asking the other for CHK-data.
102  */
103 struct StreamQueryMessage
104 {
105
106   /**
107    * Type is GNUNET_MESSAGE_TYPE_FS_STREAM_QUERY.
108    */
109   struct GNUNET_MessageHeader header;
110
111   /**
112    * Block type must be DBLOCK or IBLOCK.
113    */
114   uint32_t type;
115
116   /**
117    * Query hash from CHK (hash of encrypted block).
118    */
119   struct GNUNET_HashCode query;
120
121 };
122
123
124 /**
125  * Reply to a StreamQueryMessage.
126  */
127 struct StreamReplyMessage
128 {
129
130   /**
131    * Type is GNUNET_MESSAGE_TYPE_FS_STREAM_REPLY.
132    */
133   struct GNUNET_MessageHeader header;
134
135   /**
136    * Block type must be DBLOCK or IBLOCK.
137    */
138   uint32_t type;
139
140   /**
141    * Expiration time for the block.
142    */
143   struct GNUNET_TIME_AbsoluteNBO expiration;
144
145   /* followed by the encrypted block */
146
147 };
148
149
150 /** 
151  * Handle for a stream to another peer.
152  */
153 struct StreamHandle;
154
155
156 /**
157  * Handle for a request that is going out via stream API.
158  */
159 struct GSF_StreamRequest
160 {
161
162   /**
163    * DLL.
164    */
165   struct GSF_StreamRequest *next;
166
167   /**
168    * DLL.
169    */
170   struct GSF_StreamRequest *prev;
171
172   /**
173    * Which stream is this request associated with?
174    */
175   struct StreamHandle *sh;
176
177   /**
178    * Function to call with the result.
179    */
180   GSF_StreamReplyProcessor proc;
181
182   /**
183    * Closure for 'proc'
184    */
185   void *proc_cls;
186
187   /**
188    * Query to transmit to the other peer.
189    */
190   struct GNUNET_HashCode query;
191
192   /**
193    * Desired type for the reply.
194    */
195   enum GNUNET_BLOCK_Type type;
196
197   /**
198    * Did we transmit this request already? YES if we are
199    * in the 'waiting' DLL, NO if we are in the 'pending' DLL.
200    */
201   int was_transmitted;
202 };
203
204
205 /** 
206  * Handle for a stream to another peer.
207  */
208 struct StreamHandle
209 {
210   /**
211    * Head of DLL of pending requests on this stream.
212    */
213   struct GSF_StreamRequest *pending_head;
214
215   /**
216    * Tail of DLL of pending requests on this stream.
217    */
218   struct GSF_StreamRequest *pending_tail;
219
220   /**
221    * Map from query to 'struct GSF_StreamRequest's waiting for
222    * a reply.
223    */
224   struct GNUNET_CONTAINER_MultiHashMap *waiting_map;
225
226   /**
227    * Connection to the other peer.
228    */
229   struct GNUNET_STREAM_Socket *stream;
230
231   /**
232    * Handle for active read operation, or NULL.
233    */ 
234   struct GNUNET_STREAM_IOReadHandle *rh;
235
236   /**
237    * Handle for active write operation, or NULL.
238    */ 
239   struct GNUNET_STREAM_IOWriteHandle *wh;
240
241   /**
242    * Tokenizer for replies.
243    */
244   struct GNUNET_SERVER_MessageStreamTokenizer *mst;
245
246   /**
247    * Which peer does this stream go to?
248    */ 
249   struct GNUNET_PeerIdentity target;
250
251   /**
252    * Task to kill inactive streams (we keep them around for
253    * a few seconds to give the application a chance to give
254    * us another query).
255    */
256   GNUNET_SCHEDULER_TaskIdentifier timeout_task;
257
258   /**
259    * Task to reset streams that had errors (asynchronously,
260    * as we may not be able to do it immediately during a
261    * callback from the stream API).
262    */
263   GNUNET_SCHEDULER_TaskIdentifier reset_task;
264
265   /**
266    * Is this stream ready for transmission?
267    */
268   int is_ready;
269
270 };
271
272
273 /**
274  * Listen socket for incoming requests.
275  */
276 static struct GNUNET_STREAM_ListenSocket *listen_socket;
277
278 /**
279  * Head of DLL of stream clients.
280  */ 
281 static struct StreamClient *sc_head;
282
283 /**
284  * Tail of DLL of stream clients.
285  */ 
286 static struct StreamClient *sc_tail;
287
288 /**
289  * Number of active stream clients in the 'sc_*'-DLL.
290  */
291 static unsigned int sc_count;
292
293 /**
294  * Maximum allowed number of stream clients.
295  */
296 static unsigned long long sc_count_max;
297
298 /**
299  * Map from peer identities to 'struct StreamHandles' with streams to
300  * those peers.
301  */
302 static struct GNUNET_CONTAINER_MultiHashMap *stream_map;
303
304
305 /* ********************* client-side code ************************* */
306
307 /**
308  * Iterator called on each entry in a waiting map to 
309  * call the 'proc' continuation and release associated
310  * resources.
311  *
312  * @param cls the 'struct StreamHandle'
313  * @param key the key of the entry in the map (the query)
314  * @param value the 'struct GSF_StreamRequest' to clean up
315  * @return GNUNET_YES (continue to iterate)
316  */
317 static int
318 free_waiting_entry (void *cls,
319                     const struct GNUNET_HashCode *key,
320                     void *value)
321 {
322   struct GSF_StreamRequest *sr = value;
323
324   sr->proc (sr->proc_cls, GNUNET_BLOCK_TYPE_ANY,
325             GNUNET_TIME_UNIT_FOREVER_ABS,
326             0, NULL);
327   GSF_stream_query_cancel (sr);
328   return GNUNET_YES;
329 }
330
331
332 /**
333  * Destroy a stream handle.
334  *
335  * @param sh stream to process
336  */
337 static void
338 destroy_stream_handle (struct StreamHandle *sh)
339 {
340   struct GSF_StreamRequest *sr;
341
342   while (NULL != (sr = sh->pending_head))
343   {
344     sr->proc (sr->proc_cls, GNUNET_BLOCK_TYPE_ANY,
345               GNUNET_TIME_UNIT_FOREVER_ABS,
346               0, NULL);
347     GSF_stream_query_cancel (sr);
348   }
349   GNUNET_CONTAINER_multihashmap_iterate (sh->waiting_map,
350                                          &free_waiting_entry,
351                                          sh);
352   if (NULL != sh->wh)
353     GNUNET_STREAM_io_write_cancel (sh->wh);
354   if (NULL != sh->rh)
355     GNUNET_STREAM_io_read_cancel (sh->rh);
356   if (GNUNET_SCHEDULER_NO_TASK != sh->timeout_task)
357     GNUNET_SCHEDULER_cancel (sh->timeout_task);
358   GNUNET_STREAM_close (sh->stream);
359   GNUNET_assert (GNUNET_OK ==
360                  GNUNET_CONTAINER_multihashmap_remove (stream_map,
361                                                        &sh->target.hashPubKey,
362                                                        sh));
363   GNUNET_CONTAINER_multihashmap_destroy (sh->waiting_map);
364   GNUNET_free (sh);
365 }
366
367
368 /**
369  * Transmit pending requests via the stream.
370  *
371  * @param sh stream to process
372  */
373 static void
374 transmit_pending (struct StreamHandle *sh);
375
376
377 /**
378  * Function called once the stream is ready for transmission.
379  *
380  * @param cls the 'struct StreamHandle'
381  * @param socket stream socket handle
382  */
383 static void
384 stream_ready_cb (void *cls,
385                  struct GNUNET_STREAM_Socket *socket)
386 {
387   struct StreamHandle *sh = cls;
388
389   sh->is_ready = GNUNET_YES;
390   transmit_pending (sh);
391 }
392
393
394 /**
395  * Iterator called on each entry in a waiting map to 
396  * move it back to the pending list.
397  *
398  * @param cls the 'struct StreamHandle'
399  * @param key the key of the entry in the map (the query)
400  * @param value the 'struct GSF_StreamRequest' to move to pending
401  * @return GNUNET_YES (continue to iterate)
402  */
403 static int
404 move_to_pending (void *cls,
405                  const struct GNUNET_HashCode *key,
406                  void *value)
407 {
408   struct StreamHandle *sh = cls;
409   struct GSF_StreamRequest *sr = value;
410   
411   GNUNET_assert (GNUNET_YES ==
412                  GNUNET_CONTAINER_multihashmap_remove (sh->waiting_map,
413                                                        key,
414                                                        value));
415   GNUNET_CONTAINER_DLL_insert (sh->pending_head,
416                                sh->pending_tail,
417                                sr);
418   sr->was_transmitted = GNUNET_NO;
419   return GNUNET_YES;
420 }
421
422
423 /**
424  * We had a serious error, tear down and re-create stream from scratch.
425  *
426  * @param sh stream to reset
427  */
428 static void
429 reset_stream (struct StreamHandle *sh)
430 {
431   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
432               "Resetting stream to %s\n",
433               GNUNET_i2s (&sh->target));
434   if (NULL != sh->rh)
435     GNUNET_STREAM_io_read_cancel (sh->rh);
436   GNUNET_STREAM_close (sh->stream);
437   sh->is_ready = GNUNET_NO;
438   GNUNET_CONTAINER_multihashmap_iterate (sh->waiting_map,
439                                          &move_to_pending,
440                                          sh);
441   sh->stream = GNUNET_STREAM_open (GSF_cfg,
442                                    &sh->target,
443                                    GNUNET_APPLICATION_TYPE_FS_BLOCK_TRANSFER,
444                                    &stream_ready_cb, sh,
445                                    GNUNET_STREAM_OPTION_END);
446 }
447
448
449 /**
450  * Task called when it is time to destroy an inactive stream.
451  *
452  * @param cls the 'struct StreamHandle' to tear down
453  * @param tc scheduler context, unused
454  */
455 static void
456 stream_timeout (void *cls,
457                 const struct GNUNET_SCHEDULER_TaskContext *tc)
458 {
459   struct StreamHandle *sh = cls;
460
461   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
462               "Timeout on stream to %s\n",
463               GNUNET_i2s (&sh->target));
464   sh->timeout_task = GNUNET_SCHEDULER_NO_TASK;
465   destroy_stream_handle (sh);
466 }
467
468
469 /**
470  * Task called when it is time to reset an stream.
471  *
472  * @param cls the 'struct StreamHandle' to tear down
473  * @param tc scheduler context, unused
474  */
475 static void
476 reset_stream_task (void *cls,
477                    const struct GNUNET_SCHEDULER_TaskContext *tc)
478 {
479   struct StreamHandle *sh = cls;
480
481   sh->reset_task = GNUNET_SCHEDULER_NO_TASK;
482   reset_stream (sh);
483 }
484
485
486 /**
487  * We had a serious error, tear down and re-create stream from scratch,
488  * but do so asynchronously.
489  *
490  * @param sh stream to reset
491  */
492 static void
493 reset_stream_async (struct StreamHandle *sh)
494 {
495   if (GNUNET_SCHEDULER_NO_TASK == sh->reset_task)
496     sh->reset_task = GNUNET_SCHEDULER_add_now (&reset_stream_task,
497                                                sh);
498 }
499
500
501 /**
502  * We got a reply from the stream.  Process it.
503  *
504  * @param cls the struct StreamHandle 
505  * @param status the status of the stream at the time this function is called
506  * @param data traffic from the other side
507  * @param size the number of bytes available in data read; will be 0 on timeout 
508  * @return number of bytes of processed from 'data' (any data remaining should be
509  *         given to the next time the read processor is called).
510  */
511 static size_t
512 handle_stream_reply (void *cls,
513                      enum GNUNET_STREAM_Status status,
514                      const void *data,
515                      size_t size)
516 {
517   struct StreamHandle *sh = cls;
518
519   sh->rh = NULL;
520   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
521               "Received %u bytes from stream to %s\n",
522               (unsigned int) size,
523               GNUNET_i2s (&sh->target));
524   if (GNUNET_SYSERR == 
525       GNUNET_SERVER_mst_receive (sh->mst,
526                                  NULL,
527                                  data, size,
528                                  GNUNET_NO, GNUNET_NO))
529   {
530     GNUNET_break_op (0);
531     reset_stream_async (sh);
532     return size;
533   }
534   sh->rh = GNUNET_STREAM_read (sh->stream,
535                                GNUNET_TIME_UNIT_FOREVER_REL,
536                                &handle_stream_reply,
537                                sh);
538   return size;
539 }
540
541
542 /**
543  * Functions of this signature are called whenever we transmitted a
544  * query via a stream.
545  *
546  * @param cls the struct StreamHandle for which we did the write call
547  * @param status the status of the stream at the time this function is called;
548  *          GNUNET_OK if writing to stream was completed successfully,
549  *          GNUNET_STREAM_SHUTDOWN if the stream is shutdown for writing in the
550  *          mean time.
551  * @param size the number of bytes written
552  */
553 static void
554 query_write_continuation (void *cls,
555                           enum GNUNET_STREAM_Status status,
556                           size_t size)
557 {
558   struct StreamHandle *sh = cls;
559
560   sh->wh = NULL;
561   if ( (GNUNET_STREAM_OK != status) ||
562        (sizeof (struct StreamQueryMessage) != size) )
563   {
564     reset_stream (sh);
565     return;
566   }
567   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
568               "Successfully transmitted %u bytes via stream to %s\n",
569               (unsigned int) size,
570               GNUNET_i2s (&sh->target));
571   if (NULL == sh->rh)
572     sh->rh = GNUNET_STREAM_read (sh->stream,
573                                  GNUNET_TIME_UNIT_FOREVER_REL,
574                                  &handle_stream_reply,
575                                  sh);
576   transmit_pending (sh);
577 }
578           
579
580 /**
581  * Transmit pending requests via the stream.
582  *
583  * @param sh stream to process
584  */
585 static void
586 transmit_pending (struct StreamHandle *sh)
587 {
588   struct StreamQueryMessage sqm;
589   struct GSF_StreamRequest *sr;
590
591   if (NULL != sh->wh)
592     return;
593   sr = sh->pending_head;
594   if (NULL == sr)
595     return;
596   GNUNET_CONTAINER_DLL_remove (sh->pending_head,
597                                sh->pending_tail,
598                                sr);
599   GNUNET_CONTAINER_multihashmap_put (sh->waiting_map,
600                                      &sr->query,
601                                      sr,
602                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
603   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
604               "Sending query via stream to %s\n",
605               GNUNET_i2s (&sh->target));
606   sr->was_transmitted = GNUNET_YES;
607   sqm.header.size = htons (sizeof (sqm));
608   sqm.header.type = htons (GNUNET_MESSAGE_TYPE_FS_STREAM_QUERY);
609   sqm.type = htonl (sr->type);
610   sqm.query = sr->query;
611   sh->wh = GNUNET_STREAM_write (sh->stream,
612                                 &sqm, sizeof (sqm),
613                                 GNUNET_TIME_UNIT_FOREVER_REL,
614                                 &query_write_continuation,
615                                 sh);
616 }
617
618
619 /**
620  * Closure for 'handle_reply'.
621  */
622 struct HandleReplyClosure
623 {
624
625   /**
626    * Reply payload.
627    */ 
628   const void *data;
629
630   /**
631    * Expiration time for the block.
632    */
633   struct GNUNET_TIME_Absolute expiration;
634
635   /**
636    * Number of bytes in 'data'.
637    */
638   size_t data_size;
639
640   /** 
641    * Type of the block.
642    */
643   enum GNUNET_BLOCK_Type type;
644   
645   /**
646    * Did we have a matching query?
647    */
648   int found;
649 };
650
651
652 /**
653  * Iterator called on each entry in a waiting map to 
654  * process a result.
655  *
656  * @param cls the 'struct HandleReplyClosure'
657  * @param key the key of the entry in the map (the query)
658  * @param value the 'struct GSF_StreamRequest' to handle result for
659  * @return GNUNET_YES (continue to iterate)
660  */
661 static int
662 handle_reply (void *cls,
663               const struct GNUNET_HashCode *key,
664               void *value)
665 {
666   struct HandleReplyClosure *hrc = cls;
667   struct GSF_StreamRequest *sr = value;
668   
669   sr->proc (sr->proc_cls,
670             hrc->type,
671             hrc->expiration,
672             hrc->data_size,
673             hrc->data);
674   GSF_stream_query_cancel (sr);
675   hrc->found = GNUNET_YES;
676   return GNUNET_YES;
677 }
678
679
680 /**
681  * Functions with this signature are called whenever a
682  * complete reply is received.
683  *
684  * Do not call GNUNET_SERVER_mst_destroy in callback
685  *
686  * @param cls closure with the 'struct StreamHandle'
687  * @param client identification of the client, NULL
688  * @param message the actual message
689  * @return GNUNET_OK on success, GNUNET_SYSERR to stop further processing
690  */
691 static int
692 reply_cb (void *cls,
693           void *client,
694           const struct GNUNET_MessageHeader *message)
695 {
696   struct StreamHandle *sh = cls;
697   const struct StreamReplyMessage *srm;
698   struct HandleReplyClosure hrc;
699   uint16_t msize;
700   enum GNUNET_BLOCK_Type type;
701   struct GNUNET_HashCode query;
702
703   msize = ntohs (message->size);
704   switch (ntohs (message->type))
705   {
706   case GNUNET_MESSAGE_TYPE_FS_STREAM_REPLY:
707     if (sizeof (struct StreamReplyMessage) > msize)
708     {
709       GNUNET_break_op (0);
710       reset_stream_async (sh);
711       return GNUNET_SYSERR;
712     }
713     srm = (const struct StreamReplyMessage *) message;
714     msize -= sizeof (struct StreamReplyMessage);
715     type = (enum GNUNET_BLOCK_Type) ntohl (srm->type);
716     if (GNUNET_YES !=
717         GNUNET_BLOCK_get_key (GSF_block_ctx,
718                               type,
719                               &srm[1], msize, &query))
720     {
721       GNUNET_break_op (0); 
722       reset_stream_async (sh);
723       return GNUNET_SYSERR;
724     }
725     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
726                 "Received reply `%s' via stream\n",
727                 GNUNET_h2s (&query));
728     GNUNET_STATISTICS_update (GSF_stats,
729                               gettext_noop ("# replies received via stream"), 1,
730                               GNUNET_NO);
731     hrc.data = &srm[1];
732     hrc.data_size = msize;
733     hrc.expiration = GNUNET_TIME_absolute_ntoh (srm->expiration);
734     hrc.type = type;
735     hrc.found = GNUNET_NO;
736     GNUNET_CONTAINER_multihashmap_get_multiple (sh->waiting_map,
737                                                 &query,
738                                                 &handle_reply,
739                                                 &hrc);
740     if (GNUNET_NO == hrc.found)
741     {
742       GNUNET_STATISTICS_update (GSF_stats,
743                                 gettext_noop ("# replies received via stream dropped"), 1,
744                                 GNUNET_NO);
745       return GNUNET_OK;
746     }
747     return GNUNET_OK;
748   default:
749     GNUNET_break_op (0);
750     reset_stream_async (sh);
751     return GNUNET_SYSERR;
752   }
753 }
754
755
756 /**
757  * Get (or create) a stream to talk to the given peer.
758  *
759  * @param target peer we want to communicate with
760  */
761 static struct StreamHandle *
762 get_stream (const struct GNUNET_PeerIdentity *target)
763 {
764   struct StreamHandle *sh;
765
766   sh = GNUNET_CONTAINER_multihashmap_get (stream_map,
767                                           &target->hashPubKey);
768   if (NULL != sh)
769   {
770     if (GNUNET_SCHEDULER_NO_TASK != sh->timeout_task)
771     {
772       GNUNET_SCHEDULER_cancel (sh->timeout_task);
773       sh->timeout_task = GNUNET_SCHEDULER_NO_TASK;
774     }
775     return sh;
776   }
777   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
778               "Creating stream to %s\n",
779               GNUNET_i2s (target));
780   sh = GNUNET_malloc (sizeof (struct StreamHandle));
781   sh->mst = GNUNET_SERVER_mst_create (&reply_cb,
782                                       sh);
783   sh->waiting_map = GNUNET_CONTAINER_multihashmap_create (512, GNUNET_YES);
784   sh->target = *target;
785   sh->stream = GNUNET_STREAM_open (GSF_cfg,
786                                    &sh->target,
787                                    GNUNET_APPLICATION_TYPE_FS_BLOCK_TRANSFER,
788                                    &stream_ready_cb, sh,
789                                    GNUNET_STREAM_OPTION_END);
790   GNUNET_assert (GNUNET_OK ==
791                  GNUNET_CONTAINER_multihashmap_put (stream_map,
792                                                     &sh->target.hashPubKey,
793                                                     sh,
794                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
795   return sh;
796 }
797
798
799 /**
800  * Look for a block by directly contacting a particular peer.
801  *
802  * @param target peer that should have the block
803  * @param query hash to query for the block
804  * @param type desired type for the block
805  * @param proc function to call with result
806  * @param proc_cls closure for 'proc'
807  * @return handle to cancel the operation
808  */
809 struct GSF_StreamRequest *
810 GSF_stream_query (const struct GNUNET_PeerIdentity *target,
811                   const struct GNUNET_HashCode *query,
812                   enum GNUNET_BLOCK_Type type,
813                   GSF_StreamReplyProcessor proc, void *proc_cls)
814 {
815   struct StreamHandle *sh;
816   struct GSF_StreamRequest *sr;
817
818   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
819               "Preparing to send query for %s via stream to %s\n",
820               GNUNET_h2s (query),
821               GNUNET_i2s (target));
822   sh = get_stream (target);
823   sr = GNUNET_malloc (sizeof (struct GSF_StreamRequest));
824   sr->sh = sh;
825   sr->proc = proc;
826   sr->proc_cls = proc_cls;
827   sr->type = type;
828   sr->query = *query;
829   GNUNET_CONTAINER_DLL_insert (sh->pending_head,
830                                sh->pending_tail,
831                                sr);
832   if (GNUNET_YES == sh->is_ready)
833     transmit_pending (sh);
834   return sr;
835 }
836
837
838 /**
839  * Cancel an active request; must not be called after 'proc'
840  * was calld.
841  *
842  * @param sr request to cancel
843  */
844 void
845 GSF_stream_query_cancel (struct GSF_StreamRequest *sr)
846 {
847   struct StreamHandle *sh = sr->sh;
848
849   if (GNUNET_YES == sr->was_transmitted)
850     GNUNET_CONTAINER_multihashmap_remove (sh->waiting_map,
851                                           &sr->query,
852                                           sr);
853   else
854     GNUNET_CONTAINER_DLL_remove (sh->pending_head,
855                                  sh->pending_tail,
856                                  sr);
857   GNUNET_free (sr);
858   if ( (0 == GNUNET_CONTAINER_multihashmap_size (sh->waiting_map)) &&
859        (NULL == sh->pending_head) )
860     sh->timeout_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS,
861                                                      &stream_timeout,
862                                                      sh);
863 }
864
865
866 /* ********************* server-side code ************************* */
867
868
869 /**
870  * We're done with a particular client, clean up.
871  *
872  * @param sc client to clean up
873  */
874 static void
875 terminate_stream (struct StreamClient *sc)
876 {
877   GNUNET_STATISTICS_update (GSF_stats,
878                             gettext_noop ("# stream connections active"), -1,
879                             GNUNET_NO);
880   if (GNUNET_SCHEDULER_NO_TASK != sc->terminate_task)
881     GNUNET_SCHEDULER_cancel (sc->terminate_task); 
882   if (GNUNET_SCHEDULER_NO_TASK != sc->timeout_task)
883     GNUNET_SCHEDULER_cancel (sc->timeout_task); 
884  if (NULL != sc->rh)
885     GNUNET_STREAM_io_read_cancel (sc->rh);
886   if (NULL != sc->wh)
887     GNUNET_STREAM_io_write_cancel (sc->wh);
888   if (NULL != sc->qe)
889     GNUNET_DATASTORE_cancel (sc->qe);
890   GNUNET_SERVER_mst_destroy (sc->mst);
891   GNUNET_STREAM_close (sc->socket);
892   GNUNET_CONTAINER_DLL_remove (sc_head,
893                                sc_tail,
894                                sc);
895   sc_count--;
896   GNUNET_free (sc);
897 }
898
899
900 /**
901  * Task run to asynchronously terminate the stream.
902  *
903  * @param cls the 'struct StreamClient'
904  * @param tc scheduler context
905  */ 
906 static void
907 terminate_stream_task (void *cls,
908                        const struct GNUNET_SCHEDULER_TaskContext *tc)
909 {
910   struct StreamClient *sc = cls;
911
912   sc->terminate_task = GNUNET_SCHEDULER_NO_TASK;
913   terminate_stream (sc);
914 }
915
916
917 /**
918  * Task run to asynchronously terminate the stream due to timeout.
919  *
920  * @param cls the 'struct StreamClient'
921  * @param tc scheduler context
922  */ 
923 static void
924 timeout_stream_task (void *cls,
925                      const struct GNUNET_SCHEDULER_TaskContext *tc)
926 {
927   struct StreamClient *sc = cls;
928
929   sc->timeout_task = GNUNET_SCHEDULER_NO_TASK;
930   terminate_stream (sc);
931 }
932
933
934 /**
935  * Reset the timeout for the stream client (due to activity).
936  *
937  * @param sc client handle to reset timeout for
938  */
939 static void
940 refresh_timeout_task (struct StreamClient *sc)
941 {
942   if (GNUNET_SCHEDULER_NO_TASK != sc->timeout_task)
943     GNUNET_SCHEDULER_cancel (sc->timeout_task); 
944   sc->timeout_task = GNUNET_SCHEDULER_add_delayed (IDLE_TIMEOUT,
945                                                    &timeout_stream_task,
946                                                    sc);
947 }
948
949
950 /**
951  * We had a serious error, termiante stream,
952  * but do so asynchronously.
953  *
954  * @param sc stream to reset
955  */
956 static void
957 terminate_stream_async (struct StreamClient *sc)
958 {
959   if (GNUNET_SCHEDULER_NO_TASK == sc->terminate_task)
960     sc->terminate_task = GNUNET_SCHEDULER_add_now (&terminate_stream_task,
961                                                    sc);
962 }
963
964
965 /**
966  * Functions of this signature are called whenever data is available from the
967  * stream.
968  *
969  * @param cls the closure from GNUNET_STREAM_read
970  * @param status the status of the stream at the time this function is called
971  * @param data traffic from the other side
972  * @param size the number of bytes available in data read; will be 0 on timeout 
973  * @return number of bytes of processed from 'data' (any data remaining should be
974  *         given to the next time the read processor is called).
975  */
976 static size_t 
977 process_request (void *cls,
978                  enum GNUNET_STREAM_Status status,
979                  const void *data,
980                  size_t size);
981
982
983 /**
984  * We're done handling a request from a client, read the next one.
985  *
986  * @param sc client to continue reading requests from
987  */
988 static void
989 continue_reading (struct StreamClient *sc)
990 {
991   int ret;
992
993   ret = 
994     GNUNET_SERVER_mst_receive (sc->mst,
995                                NULL,
996                                NULL, 0,
997                                GNUNET_NO, GNUNET_YES);
998   if (GNUNET_NO == ret)
999     return; 
1000   refresh_timeout_task (sc);
1001   sc->rh = GNUNET_STREAM_read (sc->socket,
1002                                GNUNET_TIME_UNIT_FOREVER_REL,
1003                                &process_request,
1004                                sc);      
1005 }
1006
1007
1008 /**
1009  * Functions of this signature are called whenever data is available from the
1010  * stream.
1011  *
1012  * @param cls the closure from GNUNET_STREAM_read
1013  * @param status the status of the stream at the time this function is called
1014  * @param data traffic from the other side
1015  * @param size the number of bytes available in data read; will be 0 on timeout 
1016  * @return number of bytes of processed from 'data' (any data remaining should be
1017  *         given to the next time the read processor is called).
1018  */
1019 static size_t 
1020 process_request (void *cls,
1021                  enum GNUNET_STREAM_Status status,
1022                  const void *data,
1023                  size_t size)
1024 {
1025   struct StreamClient *sc = cls;
1026   int ret;
1027
1028   sc->rh = NULL;
1029   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1030               "Received %u byte query via stream\n",
1031               (unsigned int) size);
1032   switch (status)
1033   {
1034   case GNUNET_STREAM_OK:
1035     ret = 
1036       GNUNET_SERVER_mst_receive (sc->mst,
1037                                  NULL,
1038                                  data, size,
1039                                  GNUNET_NO, GNUNET_YES);
1040     if (GNUNET_NO == ret)
1041       return size; /* more messages in MST */
1042     if (GNUNET_SYSERR == ret)
1043     {
1044       GNUNET_break_op (0);
1045       terminate_stream_async (sc);
1046       return size;
1047     }
1048     break;
1049   case GNUNET_STREAM_TIMEOUT:
1050   case GNUNET_STREAM_SHUTDOWN:
1051   case GNUNET_STREAM_SYSERR:
1052   case GNUNET_STREAM_BROKEN:
1053     terminate_stream_async (sc);
1054     return size;
1055   default:
1056     GNUNET_break (0);
1057     return size;
1058   }
1059   continue_reading (sc);
1060   return size;
1061 }
1062
1063
1064 /**
1065  * Sending a reply was completed, continue processing.
1066  *
1067  * @param cls closure with the struct StreamClient which sent the query
1068  * @param status result code for the operation
1069  * @param size number of bytes that were transmitted
1070  */
1071 static void
1072 write_continuation (void *cls,
1073                     enum GNUNET_STREAM_Status status,
1074                     size_t size)
1075 {
1076   struct StreamClient *sc = cls;
1077   
1078   sc->wh = NULL;
1079   if ( (GNUNET_STREAM_OK == status) &&
1080        (size == sc->reply_size) )
1081   {
1082     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1083                 "Transmitted %u byte reply via stream\n",
1084                 (unsigned int) size);
1085     GNUNET_STATISTICS_update (GSF_stats,
1086                               gettext_noop ("# Blocks transferred via stream"), 1,
1087                               GNUNET_NO);
1088     continue_reading (sc);
1089   }
1090   else
1091   {
1092     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1093                 "Transmission of reply failed, terminating stream\n");
1094     terminate_stream (sc);    
1095   }
1096 }
1097
1098
1099 /**
1100  * Process a datum that was stored in the datastore.
1101  *
1102  * @param cls closure with the struct StreamClient which sent the query
1103  * @param key key for the content
1104  * @param size number of bytes in data
1105  * @param data content stored
1106  * @param type type of the content
1107  * @param priority priority of the content
1108  * @param anonymity anonymity-level for the content
1109  * @param expiration expiration time for the content
1110  * @param uid unique identifier for the datum;
1111  *        maybe 0 if no unique identifier is available
1112  */
1113 static void 
1114 handle_datastore_reply (void *cls,
1115                         const struct GNUNET_HashCode * key,
1116                         size_t size, const void *data,
1117                         enum GNUNET_BLOCK_Type type,
1118                         uint32_t priority,
1119                         uint32_t anonymity,
1120                         struct GNUNET_TIME_Absolute
1121                         expiration, uint64_t uid)
1122 {
1123   struct StreamClient *sc = cls;
1124   size_t msize = size + sizeof (struct StreamReplyMessage);
1125   char buf[msize] GNUNET_ALIGN;
1126   struct StreamReplyMessage *srm = (struct StreamReplyMessage *) buf;
1127
1128   sc->qe = NULL;
1129   if (GNUNET_BLOCK_TYPE_FS_ONDEMAND == type)
1130   {
1131     if (GNUNET_OK !=
1132         GNUNET_FS_handle_on_demand_block (key,
1133                                           size, data, type,
1134                                           priority, anonymity,
1135                                           expiration, uid,
1136                                           &handle_datastore_reply,
1137                                           sc))
1138     {
1139       continue_reading (sc);
1140     }
1141     return;
1142   }
1143   if (msize > GNUNET_SERVER_MAX_MESSAGE_SIZE)
1144   {
1145     GNUNET_break (0);
1146     continue_reading (sc);
1147     return;
1148   }
1149   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1150               "Starting transmission of %u byte reply via stream\n",
1151               (unsigned int) size);
1152   srm->header.size = htons ((uint16_t) msize);
1153   srm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_STREAM_REPLY);
1154   srm->type = htonl (type);
1155   srm->expiration = GNUNET_TIME_absolute_hton (expiration);
1156   memcpy (&srm[1], data, size);
1157   sc->reply_size = msize;
1158   sc->wh = GNUNET_STREAM_write (sc->socket,
1159                                 buf, msize,
1160                                 GNUNET_TIME_UNIT_FOREVER_REL,
1161                                 &write_continuation,
1162                                 sc);
1163   if (NULL == sc->wh)
1164   {
1165     terminate_stream (sc);
1166     return;
1167   }
1168 }
1169
1170
1171 /**
1172  * Functions with this signature are called whenever a
1173  * complete query message is received.
1174  *
1175  * Do not call GNUNET_SERVER_mst_destroy in callback
1176  *
1177  * @param cls closure with the 'struct StreamClient'
1178  * @param client identification of the client, NULL
1179  * @param message the actual message
1180  * @return GNUNET_OK on success, GNUNET_SYSERR to stop further processing
1181  */
1182 static int
1183 request_cb (void *cls,
1184             void *client,
1185             const struct GNUNET_MessageHeader *message)
1186 {
1187   struct StreamClient *sc = cls;
1188   const struct StreamQueryMessage *sqm;
1189
1190   switch (ntohs (message->type))
1191   {
1192   case GNUNET_MESSAGE_TYPE_FS_STREAM_QUERY:
1193     if (sizeof (struct StreamQueryMessage) != 
1194         ntohs (message->size))
1195     {
1196       GNUNET_break_op (0);
1197       terminate_stream_async (sc);
1198       return GNUNET_SYSERR;
1199     }
1200     sqm = (const struct StreamQueryMessage *) message;
1201     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1202                 "Received query for `%s' via stream\n",
1203                 GNUNET_h2s (&sqm->query));
1204     GNUNET_STATISTICS_update (GSF_stats,
1205                               gettext_noop ("# queries received via stream"), 1,
1206                               GNUNET_NO);
1207     refresh_timeout_task (sc);
1208     sc->qe = GNUNET_DATASTORE_get_key (GSF_dsh,
1209                                        0,
1210                                        &sqm->query,
1211                                        ntohl (sqm->type),
1212                                        0 /* priority */, 
1213                                        GSF_datastore_queue_size,
1214                                        GNUNET_TIME_UNIT_FOREVER_REL,
1215                                        &handle_datastore_reply, sc);
1216     if (NULL == sc->qe)
1217       continue_reading (sc);
1218     return GNUNET_OK;
1219   default:
1220     GNUNET_break_op (0);
1221     terminate_stream_async (sc);
1222     return GNUNET_SYSERR;
1223   }
1224 }
1225
1226
1227 /**
1228  * Functions of this type are called upon new stream connection from other peers
1229  * or upon binding error which happen when the app_port given in
1230  * GNUNET_STREAM_listen() is already taken.
1231  *
1232  * @param cls the closure from GNUNET_STREAM_listen
1233  * @param socket the socket representing the stream; NULL on binding error
1234  * @param initiator the identity of the peer who wants to establish a stream
1235  *            with us; NULL on binding error
1236  * @return GNUNET_OK to keep the socket open, GNUNET_SYSERR to close the
1237  *             stream (the socket will be invalid after the call)
1238  */
1239 static int 
1240 accept_cb (void *cls,
1241            struct GNUNET_STREAM_Socket *socket,
1242            const struct GNUNET_PeerIdentity *initiator)
1243 {
1244   struct StreamClient *sc;
1245
1246   if (NULL == socket)
1247     return GNUNET_SYSERR;
1248   if (sc_count >= sc_count_max)
1249   {
1250     GNUNET_STATISTICS_update (GSF_stats,
1251                               gettext_noop ("# stream client connections rejected"), 1,
1252                               GNUNET_NO);
1253     return GNUNET_SYSERR;
1254   }
1255   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1256               "Accepting inbound stream connection from `%s'\n",
1257               GNUNET_i2s (initiator));
1258   GNUNET_STATISTICS_update (GSF_stats,
1259                             gettext_noop ("# stream connections active"), 1,
1260                             GNUNET_NO);
1261   sc = GNUNET_malloc (sizeof (struct StreamClient));
1262   sc->socket = socket;
1263   sc->mst = GNUNET_SERVER_mst_create (&request_cb,
1264                                       sc);
1265   sc->rh = GNUNET_STREAM_read (sc->socket,
1266                                GNUNET_TIME_UNIT_FOREVER_REL,
1267                                &process_request,
1268                                sc);
1269   GNUNET_CONTAINER_DLL_insert (sc_head,
1270                                sc_tail,
1271                                sc);
1272   sc_count++;
1273   refresh_timeout_task (sc);
1274   return GNUNET_OK;
1275 }
1276
1277
1278 /**
1279  * Initialize subsystem for non-anonymous file-sharing.
1280  */
1281 void
1282 GSF_stream_start ()
1283 {
1284   stream_map = GNUNET_CONTAINER_multihashmap_create (16, GNUNET_YES);
1285   if (GNUNET_YES ==
1286       GNUNET_CONFIGURATION_get_value_number (GSF_cfg,
1287                                              "fs",
1288                                              "MAX_STREAM_CLIENTS",
1289                                              &sc_count_max))
1290   {
1291     listen_socket = GNUNET_STREAM_listen (GSF_cfg,
1292                                           GNUNET_APPLICATION_TYPE_FS_BLOCK_TRANSFER,
1293                                           &accept_cb, NULL,
1294                                           GNUNET_STREAM_OPTION_END);
1295   } 
1296 }
1297
1298
1299 /**
1300  * Function called on each active streams to shut them down.
1301  *
1302  * @param cls NULL
1303  * @param key target peer, unused
1304  * @param value the 'struct StreamHandle' to destroy
1305  * @return GNUNET_YES (continue to iterate)
1306  */
1307 static int
1308 release_streams (void *cls,
1309                  const struct GNUNET_HashCode *key,
1310                  void *value)
1311 {
1312   struct StreamHandle *sh = value;
1313
1314   destroy_stream_handle (sh);
1315   return GNUNET_YES;
1316 }
1317
1318
1319 /**
1320  * Shutdown subsystem for non-anonymous file-sharing.
1321  */
1322 void
1323 GSF_stream_stop ()
1324 {
1325   struct StreamClient *sc;
1326
1327   while (NULL != (sc = sc_head))
1328     terminate_stream (sc);
1329   if (NULL != listen_socket)
1330   {
1331     GNUNET_STREAM_listen_close (listen_socket);
1332     listen_socket = NULL;
1333   }
1334   GNUNET_CONTAINER_multihashmap_iterate (stream_map,
1335                                          &release_streams,
1336                                          NULL);
1337   GNUNET_CONTAINER_multihashmap_destroy (stream_map);
1338   stream_map = NULL;
1339 }
1340
1341 /* end of gnunet-service-fs_stream.c */