-switch to hash map for replies to avoid linear scan, add timeout for inactive clients
[oweals/gnunet.git] / src / fs / gnunet-service-fs_stream.c
1 /*
2      This file is part of GNUnet.
3      (C) 2012 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file fs/gnunet-service-fs_stream.c
23  * @brief non-anonymous file-transfer
24  * @author Christian Grothoff
25  *
26  * TODO:
27  * - limit # concurrent clients
28  */
29 #include "platform.h"
30 #include "gnunet_constants.h"
31 #include "gnunet_util_lib.h"
32 #include "gnunet_stream_lib.h"
33 #include "gnunet_protocols.h"
34 #include "gnunet_applications.h"
35 #include "gnunet-service-fs.h"
36 #include "gnunet-service-fs_indexing.h"
37 #include "gnunet-service-fs_stream.h"
38
39 /**
40  * After how long do we termiante idle connections?
41  */
42 #define IDLE_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 2)
43
44
45 /**
46  * Information we keep around for each active streaming client.
47  */
48 struct StreamClient
49 {
50   /**
51    * DLL
52    */ 
53   struct StreamClient *next;
54
55   /**
56    * DLL
57    */ 
58   struct StreamClient *prev;
59
60   /**
61    * Socket for communication.
62    */ 
63   struct GNUNET_STREAM_Socket *socket;
64
65   /**
66    * Handle for active read operation, or NULL.
67    */ 
68   struct GNUNET_STREAM_IOReadHandle *rh;
69
70   /**
71    * Handle for active write operation, or NULL.
72    */ 
73   struct GNUNET_STREAM_IOWriteHandle *wh;
74   
75   /**
76    * Tokenizer for requests.
77    */
78   struct GNUNET_SERVER_MessageStreamTokenizer *mst;
79   
80   /**
81    * Current active request to the datastore, if we have one pending.
82    */
83   struct GNUNET_DATASTORE_QueueEntry *qe;
84
85   /**
86    * Task that is scheduled to asynchronously terminate the connection.
87    */
88   GNUNET_SCHEDULER_TaskIdentifier terminate_task;
89
90   /**
91    * Task that is scheduled to terminate idle connections.
92    */
93   GNUNET_SCHEDULER_TaskIdentifier timeout_task;
94
95   /**
96    * Size of the last write that was initiated.
97    */ 
98   size_t reply_size;
99
100 };
101
102
103 /**
104  * Query from one peer, asking the other for CHK-data.
105  */
106 struct StreamQueryMessage
107 {
108
109   /**
110    * Type is GNUNET_MESSAGE_TYPE_FS_STREAM_QUERY.
111    */
112   struct GNUNET_MessageHeader header;
113
114   /**
115    * Block type must be DBLOCK or IBLOCK.
116    */
117   uint32_t type;
118
119   /**
120    * Query hash from CHK (hash of encrypted block).
121    */
122   struct GNUNET_HashCode query;
123
124 };
125
126
127 /**
128  * Reply to a StreamQueryMessage.
129  */
130 struct StreamReplyMessage
131 {
132
133   /**
134    * Type is GNUNET_MESSAGE_TYPE_FS_STREAM_REPLY.
135    */
136   struct GNUNET_MessageHeader header;
137
138   /**
139    * Block type must be DBLOCK or IBLOCK.
140    */
141   uint32_t type;
142
143   /**
144    * Expiration time for the block.
145    */
146   struct GNUNET_TIME_AbsoluteNBO expiration;
147
148   /* followed by the encrypted block */
149
150 };
151
152
153 /** 
154  * Handle for a stream to another peer.
155  */
156 struct StreamHandle;
157
158
159 /**
160  * Handle for a request that is going out via stream API.
161  */
162 struct GSF_StreamRequest
163 {
164
165   /**
166    * DLL.
167    */
168   struct GSF_StreamRequest *next;
169
170   /**
171    * DLL.
172    */
173   struct GSF_StreamRequest *prev;
174
175   /**
176    * Which stream is this request associated with?
177    */
178   struct StreamHandle *sh;
179
180   /**
181    * Function to call with the result.
182    */
183   GSF_StreamReplyProcessor proc;
184
185   /**
186    * Closure for 'proc'
187    */
188   void *proc_cls;
189
190   /**
191    * Query to transmit to the other peer.
192    */
193   struct GNUNET_HashCode query;
194
195   /**
196    * Desired type for the reply.
197    */
198   enum GNUNET_BLOCK_Type type;
199
200   /**
201    * Did we transmit this request already? YES if we are
202    * in the 'waiting' DLL, NO if we are in the 'pending' DLL.
203    */
204   int was_transmitted;
205 };
206
207
208 /** 
209  * Handle for a stream to another peer.
210  */
211 struct StreamHandle
212 {
213   /**
214    * Head of DLL of pending requests on this stream.
215    */
216   struct GSF_StreamRequest *pending_head;
217
218   /**
219    * Tail of DLL of pending requests on this stream.
220    */
221   struct GSF_StreamRequest *pending_tail;
222
223   /**
224    * Map from query to 'struct GSF_StreamRequest's waiting for
225    * a reply.
226    */
227   struct GNUNET_CONTAINER_MultiHashMap *waiting_map;
228
229   /**
230    * Connection to the other peer.
231    */
232   struct GNUNET_STREAM_Socket *stream;
233
234   /**
235    * Handle for active read operation, or NULL.
236    */ 
237   struct GNUNET_STREAM_IOReadHandle *rh;
238
239   /**
240    * Handle for active write operation, or NULL.
241    */ 
242   struct GNUNET_STREAM_IOWriteHandle *wh;
243
244   /**
245    * Tokenizer for replies.
246    */
247   struct GNUNET_SERVER_MessageStreamTokenizer *mst;
248
249   /**
250    * Which peer does this stream go to?
251    */ 
252   struct GNUNET_PeerIdentity target;
253
254   /**
255    * Task to kill inactive streams (we keep them around for
256    * a few seconds to give the application a chance to give
257    * us another query).
258    */
259   GNUNET_SCHEDULER_TaskIdentifier timeout_task;
260
261   /**
262    * Task to reset streams that had errors (asynchronously,
263    * as we may not be able to do it immediately during a
264    * callback from the stream API).
265    */
266   GNUNET_SCHEDULER_TaskIdentifier reset_task;
267
268   /**
269    * Is this stream ready for transmission?
270    */
271   int is_ready;
272
273 };
274
275
276 /**
277  * Listen socket for incoming requests.
278  */
279 static struct GNUNET_STREAM_ListenSocket *listen_socket;
280
281 /**
282  * Head of DLL of stream clients.
283  */ 
284 static struct StreamClient *sc_head;
285
286 /**
287  * Tail of DLL of stream clients.
288  */ 
289 static struct StreamClient *sc_tail;
290
291 /**
292  * Map from peer identities to 'struct StreamHandles' with streams to
293  * those peers.
294  */
295 static struct GNUNET_CONTAINER_MultiHashMap *stream_map;
296
297
298 /* ********************* client-side code ************************* */
299
300 /**
301  * Iterator called on each entry in a waiting map to 
302  * call the 'proc' continuation and release associated
303  * resources.
304  *
305  * @param cls the 'struct StreamHandle'
306  * @param key the key of the entry in the map (the query)
307  * @param value the 'struct GSF_StreamRequest' to clean up
308  * @return GNUNET_YES (continue to iterate)
309  */
310 static int
311 free_waiting_entry (void *cls,
312                     const struct GNUNET_HashCode *key,
313                     void *value)
314 {
315   struct GSF_StreamRequest *sr = value;
316
317   sr->proc (sr->proc_cls, GNUNET_BLOCK_TYPE_ANY,
318             GNUNET_TIME_UNIT_FOREVER_ABS,
319             0, NULL);
320   GSF_stream_query_cancel (sr);
321   return GNUNET_YES;
322 }
323
324
325 /**
326  * Destroy a stream handle.
327  *
328  * @param sh stream to process
329  */
330 static void
331 destroy_stream_handle (struct StreamHandle *sh)
332 {
333   struct GSF_StreamRequest *sr;
334
335   while (NULL != (sr = sh->pending_head))
336   {
337     sr->proc (sr->proc_cls, GNUNET_BLOCK_TYPE_ANY,
338               GNUNET_TIME_UNIT_FOREVER_ABS,
339               0, NULL);
340     GSF_stream_query_cancel (sr);
341   }
342   GNUNET_CONTAINER_multihashmap_iterate (sh->waiting_map,
343                                          &free_waiting_entry,
344                                          sh);
345   if (NULL != sh->wh)
346     GNUNET_STREAM_io_write_cancel (sh->wh);
347   if (NULL != sh->rh)
348     GNUNET_STREAM_io_read_cancel (sh->rh);
349   if (GNUNET_SCHEDULER_NO_TASK != sh->timeout_task)
350     GNUNET_SCHEDULER_cancel (sh->timeout_task);
351   GNUNET_STREAM_close (sh->stream);
352   GNUNET_assert (GNUNET_OK ==
353                  GNUNET_CONTAINER_multihashmap_remove (stream_map,
354                                                        &sh->target.hashPubKey,
355                                                        sh));
356   GNUNET_CONTAINER_multihashmap_destroy (sh->waiting_map);
357   GNUNET_free (sh);
358 }
359
360
361 /**
362  * Transmit pending requests via the stream.
363  *
364  * @param sh stream to process
365  */
366 static void
367 transmit_pending (struct StreamHandle *sh);
368
369
370 /**
371  * Function called once the stream is ready for transmission.
372  *
373  * @param cls the 'struct StreamHandle'
374  * @param socket stream socket handle
375  */
376 static void
377 stream_ready_cb (void *cls,
378                  struct GNUNET_STREAM_Socket *socket)
379 {
380   struct StreamHandle *sh = cls;
381
382   sh->is_ready = GNUNET_YES;
383   transmit_pending (sh);
384 }
385
386
387 /**
388  * Iterator called on each entry in a waiting map to 
389  * move it back to the pending list.
390  *
391  * @param cls the 'struct StreamHandle'
392  * @param key the key of the entry in the map (the query)
393  * @param value the 'struct GSF_StreamRequest' to move to pending
394  * @return GNUNET_YES (continue to iterate)
395  */
396 static int
397 move_to_pending (void *cls,
398                  const struct GNUNET_HashCode *key,
399                  void *value)
400 {
401   struct StreamHandle *sh = cls;
402   struct GSF_StreamRequest *sr = value;
403   
404   GNUNET_assert (GNUNET_YES ==
405                  GNUNET_CONTAINER_multihashmap_remove (sh->waiting_map,
406                                                        key,
407                                                        value));
408   GNUNET_CONTAINER_DLL_insert (sh->pending_head,
409                                sh->pending_tail,
410                                sr);
411   sr->was_transmitted = GNUNET_NO;
412   return GNUNET_YES;
413 }
414
415
416 /**
417  * We had a serious error, tear down and re-create stream from scratch.
418  *
419  * @param sh stream to reset
420  */
421 static void
422 reset_stream (struct StreamHandle *sh)
423 {
424   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
425               "Resetting stream to %s\n",
426               GNUNET_i2s (&sh->target));
427   if (NULL != sh->rh)
428     GNUNET_STREAM_io_read_cancel (sh->rh);
429   GNUNET_STREAM_close (sh->stream);
430   sh->is_ready = GNUNET_NO;
431   GNUNET_CONTAINER_multihashmap_iterate (sh->waiting_map,
432                                          &move_to_pending,
433                                          sh);
434   sh->stream = GNUNET_STREAM_open (GSF_cfg,
435                                    &sh->target,
436                                    GNUNET_APPLICATION_TYPE_FS_BLOCK_TRANSFER,
437                                    &stream_ready_cb, sh,
438                                    GNUNET_STREAM_OPTION_END);
439 }
440
441
442 /**
443  * Task called when it is time to destroy an inactive stream.
444  *
445  * @param cls the 'struct StreamHandle' to tear down
446  * @param tc scheduler context, unused
447  */
448 static void
449 stream_timeout (void *cls,
450                 const struct GNUNET_SCHEDULER_TaskContext *tc)
451 {
452   struct StreamHandle *sh = cls;
453
454   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
455               "Timeout on stream to %s\n",
456               GNUNET_i2s (&sh->target));
457   sh->timeout_task = GNUNET_SCHEDULER_NO_TASK;
458   destroy_stream_handle (sh);
459 }
460
461
462 /**
463  * Task called when it is time to reset an stream.
464  *
465  * @param cls the 'struct StreamHandle' to tear down
466  * @param tc scheduler context, unused
467  */
468 static void
469 reset_stream_task (void *cls,
470                    const struct GNUNET_SCHEDULER_TaskContext *tc)
471 {
472   struct StreamHandle *sh = cls;
473
474   sh->reset_task = GNUNET_SCHEDULER_NO_TASK;
475   reset_stream (sh);
476 }
477
478
479 /**
480  * We had a serious error, tear down and re-create stream from scratch,
481  * but do so asynchronously.
482  *
483  * @param sh stream to reset
484  */
485 static void
486 reset_stream_async (struct StreamHandle *sh)
487 {
488   if (GNUNET_SCHEDULER_NO_TASK == sh->reset_task)
489     sh->reset_task = GNUNET_SCHEDULER_add_now (&reset_stream_task,
490                                                sh);
491 }
492
493
494 /**
495  * We got a reply from the stream.  Process it.
496  *
497  * @param cls the struct StreamHandle 
498  * @param status the status of the stream at the time this function is called
499  * @param data traffic from the other side
500  * @param size the number of bytes available in data read; will be 0 on timeout 
501  * @return number of bytes of processed from 'data' (any data remaining should be
502  *         given to the next time the read processor is called).
503  */
504 static size_t
505 handle_stream_reply (void *cls,
506                      enum GNUNET_STREAM_Status status,
507                      const void *data,
508                      size_t size)
509 {
510   struct StreamHandle *sh = cls;
511
512   sh->rh = NULL;
513   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
514               "Received %u bytes from stream to %s\n",
515               (unsigned int) size,
516               GNUNET_i2s (&sh->target));
517   if (GNUNET_SYSERR == 
518       GNUNET_SERVER_mst_receive (sh->mst,
519                                  NULL,
520                                  data, size,
521                                  GNUNET_NO, GNUNET_NO))
522   {
523     GNUNET_break_op (0);
524     reset_stream_async (sh);
525     return size;
526   }
527   sh->rh = GNUNET_STREAM_read (sh->stream,
528                                GNUNET_TIME_UNIT_FOREVER_REL,
529                                &handle_stream_reply,
530                                sh);
531   return size;
532 }
533
534
535 /**
536  * Functions of this signature are called whenever we transmitted a
537  * query via a stream.
538  *
539  * @param cls the struct StreamHandle for which we did the write call
540  * @param status the status of the stream at the time this function is called;
541  *          GNUNET_OK if writing to stream was completed successfully,
542  *          GNUNET_STREAM_SHUTDOWN if the stream is shutdown for writing in the
543  *          mean time.
544  * @param size the number of bytes written
545  */
546 static void
547 query_write_continuation (void *cls,
548                           enum GNUNET_STREAM_Status status,
549                           size_t size)
550 {
551   struct StreamHandle *sh = cls;
552
553   sh->wh = NULL;
554   if ( (GNUNET_STREAM_OK != status) ||
555        (sizeof (struct StreamQueryMessage) != size) )
556   {
557     reset_stream (sh);
558     return;
559   }
560   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
561               "Successfully transmitted %u bytes via stream to %s\n",
562               (unsigned int) size,
563               GNUNET_i2s (&sh->target));
564   if (NULL == sh->rh)
565     sh->rh = GNUNET_STREAM_read (sh->stream,
566                                  GNUNET_TIME_UNIT_FOREVER_REL,
567                                  &handle_stream_reply,
568                                  sh);
569   transmit_pending (sh);
570 }
571           
572
573 /**
574  * Transmit pending requests via the stream.
575  *
576  * @param sh stream to process
577  */
578 static void
579 transmit_pending (struct StreamHandle *sh)
580 {
581   struct StreamQueryMessage sqm;
582   struct GSF_StreamRequest *sr;
583
584   if (NULL != sh->wh)
585     return;
586   sr = sh->pending_head;
587   if (NULL == sr)
588     return;
589   GNUNET_CONTAINER_DLL_remove (sh->pending_head,
590                                sh->pending_tail,
591                                sr);
592   GNUNET_CONTAINER_multihashmap_put (sh->waiting_map,
593                                      &sr->query,
594                                      sr,
595                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
596   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
597               "Sending query via stream to %s\n",
598               GNUNET_i2s (&sh->target));
599   sr->was_transmitted = GNUNET_YES;
600   sqm.header.size = htons (sizeof (sqm));
601   sqm.header.type = htons (GNUNET_MESSAGE_TYPE_FS_STREAM_QUERY);
602   sqm.type = htonl (sr->type);
603   sqm.query = sr->query;
604   sh->wh = GNUNET_STREAM_write (sh->stream,
605                                 &sqm, sizeof (sqm),
606                                 GNUNET_TIME_UNIT_FOREVER_REL,
607                                 &query_write_continuation,
608                                 sh);
609 }
610
611
612 /**
613  * Closure for 'handle_reply'.
614  */
615 struct HandleReplyClosure
616 {
617
618   /**
619    * Reply payload.
620    */ 
621   const void *data;
622
623   /**
624    * Expiration time for the block.
625    */
626   struct GNUNET_TIME_Absolute expiration;
627
628   /**
629    * Number of bytes in 'data'.
630    */
631   size_t data_size;
632
633   /** 
634    * Type of the block.
635    */
636   enum GNUNET_BLOCK_Type type;
637   
638   /**
639    * Did we have a matching query?
640    */
641   int found;
642 };
643
644
645 /**
646  * Iterator called on each entry in a waiting map to 
647  * process a result.
648  *
649  * @param cls the 'struct HandleReplyClosure'
650  * @param key the key of the entry in the map (the query)
651  * @param value the 'struct GSF_StreamRequest' to handle result for
652  * @return GNUNET_YES (continue to iterate)
653  */
654 static int
655 handle_reply (void *cls,
656               const struct GNUNET_HashCode *key,
657               void *value)
658 {
659   struct HandleReplyClosure *hrc = cls;
660   struct GSF_StreamRequest *sr = value;
661   
662   sr->proc (sr->proc_cls,
663             hrc->type,
664             hrc->expiration,
665             hrc->data_size,
666             hrc->data);
667   GSF_stream_query_cancel (sr);
668   hrc->found = GNUNET_YES;
669   return GNUNET_YES;
670 }
671
672
673 /**
674  * Functions with this signature are called whenever a
675  * complete reply is received.
676  *
677  * Do not call GNUNET_SERVER_mst_destroy in callback
678  *
679  * @param cls closure with the 'struct StreamHandle'
680  * @param client identification of the client, NULL
681  * @param message the actual message
682  * @return GNUNET_OK on success, GNUNET_SYSERR to stop further processing
683  */
684 static int
685 reply_cb (void *cls,
686           void *client,
687           const struct GNUNET_MessageHeader *message)
688 {
689   struct StreamHandle *sh = cls;
690   const struct StreamReplyMessage *srm;
691   struct HandleReplyClosure hrc;
692   uint16_t msize;
693   enum GNUNET_BLOCK_Type type;
694   struct GNUNET_HashCode query;
695
696   msize = ntohs (message->size);
697   switch (ntohs (message->type))
698   {
699   case GNUNET_MESSAGE_TYPE_FS_STREAM_REPLY:
700     if (sizeof (struct StreamReplyMessage) > msize)
701     {
702       GNUNET_break_op (0);
703       reset_stream_async (sh);
704       return GNUNET_SYSERR;
705     }
706     srm = (const struct StreamReplyMessage *) message;
707     msize -= sizeof (struct StreamReplyMessage);
708     type = (enum GNUNET_BLOCK_Type) ntohl (srm->type);
709     if (GNUNET_YES !=
710         GNUNET_BLOCK_get_key (GSF_block_ctx,
711                               type,
712                               &srm[1], msize, &query))
713     {
714       GNUNET_break_op (0); 
715       reset_stream_async (sh);
716       return GNUNET_SYSERR;
717     }
718     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
719                 "Received reply `%s' via stream\n",
720                 GNUNET_h2s (&query));
721     GNUNET_STATISTICS_update (GSF_stats,
722                               gettext_noop ("# replies received via stream"), 1,
723                               GNUNET_NO);
724     hrc.data = &srm[1];
725     hrc.data_size = msize;
726     hrc.expiration = GNUNET_TIME_absolute_ntoh (srm->expiration);
727     hrc.type = type;
728     hrc.found = GNUNET_NO;
729     GNUNET_CONTAINER_multihashmap_get_multiple (sh->waiting_map,
730                                                 &query,
731                                                 &handle_reply,
732                                                 &hrc);
733     if (GNUNET_NO == hrc.found)
734     {
735       GNUNET_STATISTICS_update (GSF_stats,
736                                 gettext_noop ("# replies received via stream dropped"), 1,
737                                 GNUNET_NO);
738       return GNUNET_OK;
739     }
740     return GNUNET_OK;
741   default:
742     GNUNET_break_op (0);
743     reset_stream_async (sh);
744     return GNUNET_SYSERR;
745   }
746 }
747
748
749 /**
750  * Get (or create) a stream to talk to the given peer.
751  *
752  * @param target peer we want to communicate with
753  */
754 static struct StreamHandle *
755 get_stream (const struct GNUNET_PeerIdentity *target)
756 {
757   struct StreamHandle *sh;
758
759   sh = GNUNET_CONTAINER_multihashmap_get (stream_map,
760                                           &target->hashPubKey);
761   if (NULL != sh)
762   {
763     if (GNUNET_SCHEDULER_NO_TASK != sh->timeout_task)
764     {
765       GNUNET_SCHEDULER_cancel (sh->timeout_task);
766       sh->timeout_task = GNUNET_SCHEDULER_NO_TASK;
767     }
768     return sh;
769   }
770   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
771               "Creating stream to %s\n",
772               GNUNET_i2s (target));
773   sh = GNUNET_malloc (sizeof (struct StreamHandle));
774   sh->mst = GNUNET_SERVER_mst_create (&reply_cb,
775                                       sh);
776   sh->waiting_map = GNUNET_CONTAINER_multihashmap_create (512, GNUNET_YES);
777   sh->target = *target;
778   sh->stream = GNUNET_STREAM_open (GSF_cfg,
779                                    &sh->target,
780                                    GNUNET_APPLICATION_TYPE_FS_BLOCK_TRANSFER,
781                                    &stream_ready_cb, sh,
782                                    GNUNET_STREAM_OPTION_END);
783   GNUNET_assert (GNUNET_OK ==
784                  GNUNET_CONTAINER_multihashmap_put (stream_map,
785                                                     &sh->target.hashPubKey,
786                                                     sh,
787                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
788   return sh;
789 }
790
791
792 /**
793  * Look for a block by directly contacting a particular peer.
794  *
795  * @param target peer that should have the block
796  * @param query hash to query for the block
797  * @param type desired type for the block
798  * @param proc function to call with result
799  * @param proc_cls closure for 'proc'
800  * @return handle to cancel the operation
801  */
802 struct GSF_StreamRequest *
803 GSF_stream_query (const struct GNUNET_PeerIdentity *target,
804                   const struct GNUNET_HashCode *query,
805                   enum GNUNET_BLOCK_Type type,
806                   GSF_StreamReplyProcessor proc, void *proc_cls)
807 {
808   struct StreamHandle *sh;
809   struct GSF_StreamRequest *sr;
810
811   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
812               "Preparing to send query for %s via stream to %s\n",
813               GNUNET_h2s (query),
814               GNUNET_i2s (target));
815   sh = get_stream (target);
816   sr = GNUNET_malloc (sizeof (struct GSF_StreamRequest));
817   sr->sh = sh;
818   sr->proc = proc;
819   sr->proc_cls = proc_cls;
820   sr->type = type;
821   sr->query = *query;
822   GNUNET_CONTAINER_DLL_insert (sh->pending_head,
823                                sh->pending_tail,
824                                sr);
825   if (GNUNET_YES == sh->is_ready)
826     transmit_pending (sh);
827   return sr;
828 }
829
830
831 /**
832  * Cancel an active request; must not be called after 'proc'
833  * was calld.
834  *
835  * @param sr request to cancel
836  */
837 void
838 GSF_stream_query_cancel (struct GSF_StreamRequest *sr)
839 {
840   struct StreamHandle *sh = sr->sh;
841
842   if (GNUNET_YES == sr->was_transmitted)
843     GNUNET_CONTAINER_multihashmap_remove (sh->waiting_map,
844                                           &sr->query,
845                                           sr);
846   else
847     GNUNET_CONTAINER_DLL_remove (sh->pending_head,
848                                  sh->pending_tail,
849                                  sr);
850   GNUNET_free (sr);
851   if ( (0 == GNUNET_CONTAINER_multihashmap_size (sh->waiting_map)) &&
852        (NULL == sh->pending_head) )
853     sh->timeout_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS,
854                                                      &stream_timeout,
855                                                      sh);
856 }
857
858
859 /* ********************* server-side code ************************* */
860
861
862 /**
863  * We're done with a particular client, clean up.
864  *
865  * @param sc client to clean up
866  */
867 static void
868 terminate_stream (struct StreamClient *sc)
869 {
870   GNUNET_STATISTICS_update (GSF_stats,
871                             gettext_noop ("# stream connections active"), -1,
872                             GNUNET_NO);
873   if (GNUNET_SCHEDULER_NO_TASK != sc->terminate_task)
874     GNUNET_SCHEDULER_cancel (sc->terminate_task); 
875   if (GNUNET_SCHEDULER_NO_TASK != sc->timeout_task)
876     GNUNET_SCHEDULER_cancel (sc->timeout_task); 
877  if (NULL != sc->rh)
878     GNUNET_STREAM_io_read_cancel (sc->rh);
879   if (NULL != sc->wh)
880     GNUNET_STREAM_io_write_cancel (sc->wh);
881   if (NULL != sc->qe)
882     GNUNET_DATASTORE_cancel (sc->qe);
883   GNUNET_SERVER_mst_destroy (sc->mst);
884   GNUNET_STREAM_close (sc->socket);
885   GNUNET_CONTAINER_DLL_remove (sc_head,
886                                sc_tail,
887                                sc);
888   GNUNET_free (sc);
889 }
890
891
892 /**
893  * Task run to asynchronously terminate the stream.
894  *
895  * @param cls the 'struct StreamClient'
896  * @param tc scheduler context
897  */ 
898 static void
899 terminate_stream_task (void *cls,
900                        const struct GNUNET_SCHEDULER_TaskContext *tc)
901 {
902   struct StreamClient *sc = cls;
903
904   sc->terminate_task = GNUNET_SCHEDULER_NO_TASK;
905   terminate_stream (sc);
906 }
907
908
909 /**
910  * Task run to asynchronously terminate the stream due to timeout.
911  *
912  * @param cls the 'struct StreamClient'
913  * @param tc scheduler context
914  */ 
915 static void
916 timeout_stream_task (void *cls,
917                      const struct GNUNET_SCHEDULER_TaskContext *tc)
918 {
919   struct StreamClient *sc = cls;
920
921   sc->timeout_task = GNUNET_SCHEDULER_NO_TASK;
922   terminate_stream (sc);
923 }
924
925
926 /**
927  * Reset the timeout for the stream client (due to activity).
928  *
929  * @param sc client handle to reset timeout for
930  */
931 static void
932 refresh_timeout_task (struct StreamClient *sc)
933 {
934   if (GNUNET_SCHEDULER_NO_TASK != sc->timeout_task)
935     GNUNET_SCHEDULER_cancel (sc->timeout_task); 
936   sc->timeout_task = GNUNET_SCHEDULER_add_delayed (IDLE_TIMEOUT,
937                                                    &timeout_stream_task,
938                                                    sc);
939 }
940
941
942 /**
943  * We had a serious error, termiante stream,
944  * but do so asynchronously.
945  *
946  * @param sc stream to reset
947  */
948 static void
949 terminate_stream_async (struct StreamClient *sc)
950 {
951   if (GNUNET_SCHEDULER_NO_TASK == sc->terminate_task)
952     sc->terminate_task = GNUNET_SCHEDULER_add_now (&terminate_stream_task,
953                                                    sc);
954 }
955
956
957 /**
958  * Functions of this signature are called whenever data is available from the
959  * stream.
960  *
961  * @param cls the closure from GNUNET_STREAM_read
962  * @param status the status of the stream at the time this function is called
963  * @param data traffic from the other side
964  * @param size the number of bytes available in data read; will be 0 on timeout 
965  * @return number of bytes of processed from 'data' (any data remaining should be
966  *         given to the next time the read processor is called).
967  */
968 static size_t 
969 process_request (void *cls,
970                  enum GNUNET_STREAM_Status status,
971                  const void *data,
972                  size_t size);
973
974
975 /**
976  * We're done handling a request from a client, read the next one.
977  *
978  * @param sc client to continue reading requests from
979  */
980 static void
981 continue_reading (struct StreamClient *sc)
982 {
983   int ret;
984
985   ret = 
986     GNUNET_SERVER_mst_receive (sc->mst,
987                                NULL,
988                                NULL, 0,
989                                GNUNET_NO, GNUNET_YES);
990   if (GNUNET_NO == ret)
991     return; 
992   refresh_timeout_task (sc);
993   sc->rh = GNUNET_STREAM_read (sc->socket,
994                                GNUNET_TIME_UNIT_FOREVER_REL,
995                                &process_request,
996                                sc);      
997 }
998
999
1000 /**
1001  * Functions of this signature are called whenever data is available from the
1002  * stream.
1003  *
1004  * @param cls the closure from GNUNET_STREAM_read
1005  * @param status the status of the stream at the time this function is called
1006  * @param data traffic from the other side
1007  * @param size the number of bytes available in data read; will be 0 on timeout 
1008  * @return number of bytes of processed from 'data' (any data remaining should be
1009  *         given to the next time the read processor is called).
1010  */
1011 static size_t 
1012 process_request (void *cls,
1013                  enum GNUNET_STREAM_Status status,
1014                  const void *data,
1015                  size_t size)
1016 {
1017   struct StreamClient *sc = cls;
1018   int ret;
1019
1020   sc->rh = NULL;
1021   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1022               "Received %u byte query via stream\n",
1023               (unsigned int) size);
1024   switch (status)
1025   {
1026   case GNUNET_STREAM_OK:
1027     ret = 
1028       GNUNET_SERVER_mst_receive (sc->mst,
1029                                  NULL,
1030                                  data, size,
1031                                  GNUNET_NO, GNUNET_YES);
1032     if (GNUNET_NO == ret)
1033       return size; /* more messages in MST */
1034     if (GNUNET_SYSERR == ret)
1035     {
1036       GNUNET_break_op (0);
1037       terminate_stream_async (sc);
1038       return size;
1039     }
1040     break;
1041   case GNUNET_STREAM_TIMEOUT:
1042   case GNUNET_STREAM_SHUTDOWN:
1043   case GNUNET_STREAM_SYSERR:
1044   case GNUNET_STREAM_BROKEN:
1045     terminate_stream_async (sc);
1046     return size;
1047   default:
1048     GNUNET_break (0);
1049     return size;
1050   }
1051   continue_reading (sc);
1052   return size;
1053 }
1054
1055
1056 /**
1057  * Sending a reply was completed, continue processing.
1058  *
1059  * @param cls closure with the struct StreamClient which sent the query
1060  */
1061 static void
1062 write_continuation (void *cls,
1063                     enum GNUNET_STREAM_Status status,
1064                     size_t size)
1065 {
1066   struct StreamClient *sc = cls;
1067   
1068   sc->wh = NULL;
1069   if ( (GNUNET_STREAM_OK == status) &&
1070        (size == sc->reply_size) )
1071   {
1072     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1073                 "Transmitted %u byte reply via stream\n",
1074                 (unsigned int) size);
1075     GNUNET_STATISTICS_update (GSF_stats,
1076                               gettext_noop ("# Blocks transferred via stream"), 1,
1077                               GNUNET_NO);
1078     continue_reading (sc);
1079   }
1080   else
1081   {
1082     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1083                 "Transmission of reply failed, terminating stream\n");
1084     terminate_stream (sc);    
1085   }
1086 }
1087
1088
1089 /**
1090  * Process a datum that was stored in the datastore.
1091  *
1092  * @param cls closure with the struct StreamClient which sent the query
1093  * @param key key for the content
1094  * @param size number of bytes in data
1095  * @param data content stored
1096  * @param type type of the content
1097  * @param priority priority of the content
1098  * @param anonymity anonymity-level for the content
1099  * @param expiration expiration time for the content
1100  * @param uid unique identifier for the datum;
1101  *        maybe 0 if no unique identifier is available
1102  */
1103 static void 
1104 handle_datastore_reply (void *cls,
1105                         const struct GNUNET_HashCode * key,
1106                         size_t size, const void *data,
1107                         enum GNUNET_BLOCK_Type type,
1108                         uint32_t priority,
1109                         uint32_t anonymity,
1110                         struct GNUNET_TIME_Absolute
1111                         expiration, uint64_t uid)
1112 {
1113   struct StreamClient *sc = cls;
1114   size_t msize = size + sizeof (struct StreamReplyMessage);
1115   char buf[msize] GNUNET_ALIGN;
1116   struct StreamReplyMessage *srm = (struct StreamReplyMessage *) buf;
1117
1118   sc->qe = NULL;
1119   if (GNUNET_BLOCK_TYPE_FS_ONDEMAND == type)
1120   {
1121     if (GNUNET_OK !=
1122         GNUNET_FS_handle_on_demand_block (key,
1123                                           size, data, type,
1124                                           priority, anonymity,
1125                                           expiration, uid,
1126                                           &handle_datastore_reply,
1127                                           sc))
1128     {
1129       continue_reading (sc);
1130     }
1131     return;
1132   }
1133   if (msize > GNUNET_SERVER_MAX_MESSAGE_SIZE)
1134   {
1135     GNUNET_break (0);
1136     continue_reading (sc);
1137     return;
1138   }
1139   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1140               "Starting transmission of %u byte reply via stream\n",
1141               (unsigned int) size);
1142   srm->header.size = htons ((uint16_t) msize);
1143   srm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_STREAM_REPLY);
1144   srm->type = htonl (type);
1145   srm->expiration = GNUNET_TIME_absolute_hton (expiration);
1146   memcpy (&srm[1], data, size);
1147   sc->reply_size = msize;
1148   sc->wh = GNUNET_STREAM_write (sc->socket,
1149                                 buf, msize,
1150                                 GNUNET_TIME_UNIT_FOREVER_REL,
1151                                 &write_continuation,
1152                                 sc);
1153   if (NULL == sc->wh)
1154   {
1155     terminate_stream (sc);
1156     return;
1157   }
1158 }
1159
1160
1161 /**
1162  * Functions with this signature are called whenever a
1163  * complete query message is received.
1164  *
1165  * Do not call GNUNET_SERVER_mst_destroy in callback
1166  *
1167  * @param cls closure with the 'struct StreamClient'
1168  * @param client identification of the client, NULL
1169  * @param message the actual message
1170  * @return GNUNET_OK on success, GNUNET_SYSERR to stop further processing
1171  */
1172 static int
1173 request_cb (void *cls,
1174             void *client,
1175             const struct GNUNET_MessageHeader *message)
1176 {
1177   struct StreamClient *sc = cls;
1178   const struct StreamQueryMessage *sqm;
1179
1180   switch (ntohs (message->type))
1181   {
1182   case GNUNET_MESSAGE_TYPE_FS_STREAM_QUERY:
1183     if (sizeof (struct StreamQueryMessage) != 
1184         ntohs (message->size))
1185     {
1186       GNUNET_break_op (0);
1187       terminate_stream_async (sc);
1188       return GNUNET_SYSERR;
1189     }
1190     sqm = (const struct StreamQueryMessage *) message;
1191     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1192                 "Received query for `%s' via stream\n",
1193                 GNUNET_h2s (&sqm->query));
1194     GNUNET_STATISTICS_update (GSF_stats,
1195                               gettext_noop ("# queries received via stream"), 1,
1196                               GNUNET_NO);
1197     refresh_timeout_task (sc);
1198     sc->qe = GNUNET_DATASTORE_get_key (GSF_dsh,
1199                                        0,
1200                                        &sqm->query,
1201                                        ntohl (sqm->type),
1202                                        0 /* priority */, 
1203                                        GSF_datastore_queue_size,
1204                                        GNUNET_TIME_UNIT_FOREVER_REL,
1205                                        &handle_datastore_reply, sc);
1206     if (NULL == sc->qe)
1207       continue_reading (sc);
1208     return GNUNET_OK;
1209   default:
1210     GNUNET_break_op (0);
1211     terminate_stream_async (sc);
1212     return GNUNET_SYSERR;
1213   }
1214 }
1215
1216
1217 /**
1218  * Functions of this type are called upon new stream connection from other peers
1219  * or upon binding error which happen when the app_port given in
1220  * GNUNET_STREAM_listen() is already taken.
1221  *
1222  * @param cls the closure from GNUNET_STREAM_listen
1223  * @param socket the socket representing the stream; NULL on binding error
1224  * @param initiator the identity of the peer who wants to establish a stream
1225  *            with us; NULL on binding error
1226  * @return GNUNET_OK to keep the socket open, GNUNET_SYSERR to close the
1227  *             stream (the socket will be invalid after the call)
1228  */
1229 static int 
1230 accept_cb (void *cls,
1231            struct GNUNET_STREAM_Socket *socket,
1232            const struct GNUNET_PeerIdentity *initiator)
1233 {
1234   struct StreamClient *sc;
1235
1236   if (NULL == socket)
1237     return GNUNET_SYSERR;
1238   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1239               "Accepting inbound stream connection from `%s'\n",
1240               GNUNET_i2s (initiator));
1241   GNUNET_STATISTICS_update (GSF_stats,
1242                             gettext_noop ("# stream connections active"), 1,
1243                             GNUNET_NO);
1244   sc = GNUNET_malloc (sizeof (struct StreamClient));
1245   sc->socket = socket;
1246   sc->mst = GNUNET_SERVER_mst_create (&request_cb,
1247                                       sc);
1248   sc->rh = GNUNET_STREAM_read (sc->socket,
1249                                GNUNET_TIME_UNIT_FOREVER_REL,
1250                                &process_request,
1251                                sc);
1252   GNUNET_CONTAINER_DLL_insert (sc_head,
1253                                sc_tail,
1254                                sc);
1255   refresh_timeout_task (sc);
1256   return GNUNET_OK;
1257 }
1258
1259
1260 /**
1261  * Initialize subsystem for non-anonymous file-sharing.
1262  */
1263 void
1264 GSF_stream_start ()
1265 {
1266   stream_map = GNUNET_CONTAINER_multihashmap_create (16, GNUNET_YES);
1267   listen_socket = GNUNET_STREAM_listen (GSF_cfg,
1268                                         GNUNET_APPLICATION_TYPE_FS_BLOCK_TRANSFER,
1269                                         &accept_cb, NULL,
1270                                         GNUNET_STREAM_OPTION_END);
1271 }
1272
1273
1274 /**
1275  * Function called on each active streams to shut them down.
1276  *
1277  * @param cls NULL
1278  * @param key target peer, unused
1279  * @param value the 'struct StreamHandle' to destroy
1280  * @return GNUNET_YES (continue to iterate)
1281  */
1282 static int
1283 release_streams (void *cls,
1284                  const struct GNUNET_HashCode *key,
1285                  void *value)
1286 {
1287   struct StreamHandle *sh = value;
1288
1289   destroy_stream_handle (sh);
1290   return GNUNET_YES;
1291 }
1292
1293
1294 /**
1295  * Shutdown subsystem for non-anonymous file-sharing.
1296  */
1297 void
1298 GSF_stream_stop ()
1299 {
1300   struct StreamClient *sc;
1301
1302   while (NULL != (sc = sc_head))
1303     terminate_stream (sc);
1304   if (NULL != listen_socket)
1305   {
1306     GNUNET_STREAM_listen_close (listen_socket);
1307     listen_socket = NULL;
1308   }
1309   GNUNET_CONTAINER_multihashmap_iterate (stream_map,
1310                                          &release_streams,
1311                                          NULL);
1312   GNUNET_CONTAINER_multihashmap_destroy (stream_map);
1313   stream_map = NULL;
1314 }
1315
1316 /* end of gnunet-service-fs_stream.c */