-towards using mesh instead of stream
[oweals/gnunet.git] / src / fs / gnunet-service-fs_stream.c
1 /*
2      This file is part of GNUnet.
3      (C) 2012, 2013 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file fs/gnunet-service-fs_stream.c
23  * @brief non-anonymous file-transfer
24  * @author Christian Grothoff
25  *
26  * TODO:
27  * - update comments on functions (still matches 'stream')
28  * - MESH2 API doesn't allow flow control for server yet (needed!)
29  * - likely need to register clean up handler with mesh to handle
30  *   client disconnect (likely leaky right now)
31  * - server is optional, currently client code will NPE if we have
32  *   no server, again MESH2 API requirement forcing this for now
33  * - message handlers are symmetric for client/server, should be
34  *   separated (currently clients can get requests and servers can
35  *   handle answers, not good)
36  * - code is entirely untested
37  * - might have overlooked a few possible simplifications
38  * - PORT is set to old application type, unsure if we should keep
39  *   it that way (fine for now)
40  */
41 #include "platform.h"
42 #include "gnunet_constants.h"
43 #include "gnunet_util_lib.h"
44 #include "gnunet_mesh2_service.h"
45 #include "gnunet_protocols.h"
46 #include "gnunet_applications.h"
47 #include "gnunet-service-fs.h"
48 #include "gnunet-service-fs_indexing.h"
49 #include "gnunet-service-fs_stream.h"
50
51 /**
52  * After how long do we termiante idle connections?
53  */
54 #define IDLE_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 2)
55
56 /**
57  * After how long do we reset connections without replies?
58  */
59 #define CLIENT_RETRY_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 30)
60
61
62 /**
63  * A message in the queue to be written to the stream.
64  */
65 struct WriteQueueItem
66 {
67   /**
68    * Kept in a DLL.
69    */
70   struct WriteQueueItem *next;
71
72   /**
73    * Kept in a DLL.
74    */
75   struct WriteQueueItem *prev;
76
77   /**
78    * Number of bytes of payload, allocated at the end of this struct.
79    */
80   size_t msize;
81 };
82
83
84 /**
85  * Information we keep around for each active streaming client.
86  */
87 struct StreamClient
88 {
89   /**
90    * DLL
91    */ 
92   struct StreamClient *next;
93
94   /**
95    * DLL
96    */ 
97   struct StreamClient *prev;
98
99   /**
100    * Socket for communication.
101    */ 
102   struct GNUNET_MESH_Tunnel *socket;
103
104   /**
105    * Handle for active write operation, or NULL.
106    */ 
107   struct GNUNET_MESH_TransmitHandle *wh;
108
109   /**
110    * Head of write queue.
111    */
112   struct WriteQueueItem *wqi_head;
113
114   /**
115    * Tail of write queue.
116    */
117   struct WriteQueueItem *wqi_tail;
118   
119   /**
120    * Current active request to the datastore, if we have one pending.
121    */
122   struct GNUNET_DATASTORE_QueueEntry *qe;
123
124   /**
125    * Task that is scheduled to asynchronously terminate the connection.
126    */
127   GNUNET_SCHEDULER_TaskIdentifier terminate_task;
128
129   /**
130    * Task that is scheduled to terminate idle connections.
131    */
132   GNUNET_SCHEDULER_TaskIdentifier timeout_task;
133
134   /**
135    * Size of the last write that was initiated.
136    */ 
137   size_t reply_size;
138
139 };
140
141
142 /**
143  * Query from one peer, asking the other for CHK-data.
144  */
145 struct StreamQueryMessage
146 {
147
148   /**
149    * Type is GNUNET_MESSAGE_TYPE_FS_STREAM_QUERY.
150    */
151   struct GNUNET_MessageHeader header;
152
153   /**
154    * Block type must be DBLOCK or IBLOCK.
155    */
156   uint32_t type;
157
158   /**
159    * Query hash from CHK (hash of encrypted block).
160    */
161   struct GNUNET_HashCode query;
162
163 };
164
165
166 /**
167  * Reply to a StreamQueryMessage.
168  */
169 struct StreamReplyMessage
170 {
171
172   /**
173    * Type is GNUNET_MESSAGE_TYPE_FS_STREAM_REPLY.
174    */
175   struct GNUNET_MessageHeader header;
176
177   /**
178    * Block type must be DBLOCK or IBLOCK.
179    */
180   uint32_t type;
181
182   /**
183    * Expiration time for the block.
184    */
185   struct GNUNET_TIME_AbsoluteNBO expiration;
186
187   /* followed by the encrypted block */
188
189 };
190
191
192 /** 
193  * Handle for a stream to another peer.
194  */
195 struct StreamHandle;
196
197
198 /**
199  * Handle for a request that is going out via stream API.
200  */
201 struct GSF_StreamRequest
202 {
203
204   /**
205    * DLL.
206    */
207   struct GSF_StreamRequest *next;
208
209   /**
210    * DLL.
211    */
212   struct GSF_StreamRequest *prev;
213
214   /**
215    * Which stream is this request associated with?
216    */
217   struct StreamHandle *sh;
218
219   /**
220    * Function to call with the result.
221    */
222   GSF_StreamReplyProcessor proc;
223
224   /**
225    * Closure for 'proc'
226    */
227   void *proc_cls;
228
229   /**
230    * Query to transmit to the other peer.
231    */
232   struct GNUNET_HashCode query;
233
234   /**
235    * Desired type for the reply.
236    */
237   enum GNUNET_BLOCK_Type type;
238
239   /**
240    * Did we transmit this request already? YES if we are
241    * in the 'waiting' DLL, NO if we are in the 'pending' DLL.
242    */
243   int was_transmitted;
244 };
245
246
247 /** 
248  * Handle for a stream to another peer.
249  */
250 struct StreamHandle
251 {
252   /**
253    * Head of DLL of pending requests on this stream.
254    */
255   struct GSF_StreamRequest *pending_head;
256
257   /**
258    * Tail of DLL of pending requests on this stream.
259    */
260   struct GSF_StreamRequest *pending_tail;
261
262   /**
263    * Map from query to 'struct GSF_StreamRequest's waiting for
264    * a reply.
265    */
266   struct GNUNET_CONTAINER_MultiHashMap *waiting_map;
267
268   /**
269    * Connection to the other peer.
270    */
271   struct GNUNET_MESH_Tunnel *stream;
272
273   /**
274    * Handle for active write operation, or NULL.
275    */ 
276   struct GNUNET_MESH_TransmitHandle *wh;
277
278   /**
279    * Which peer does this stream go to?
280    */ 
281   struct GNUNET_PeerIdentity target;
282
283   /**
284    * Task to kill inactive streams (we keep them around for
285    * a few seconds to give the application a chance to give
286    * us another query).
287    */
288   GNUNET_SCHEDULER_TaskIdentifier timeout_task;
289
290   /**
291    * Task to reset streams that had errors (asynchronously,
292    * as we may not be able to do it immediately during a
293    * callback from the stream API).
294    */
295   GNUNET_SCHEDULER_TaskIdentifier reset_task;
296
297   /**
298    * Is this stream ready for transmission?
299    */
300   int is_ready;
301
302 };
303
304
305 /**
306  * Listen socket for incoming requests.
307  */
308 static struct GNUNET_MESH_Handle *listen_socket;
309
310 /**
311  * Head of DLL of stream clients.
312  */ 
313 static struct StreamClient *sc_head;
314
315 /**
316  * Tail of DLL of stream clients.
317  */ 
318 static struct StreamClient *sc_tail;
319
320 /**
321  * Number of active stream clients in the 'sc_*'-DLL.
322  */
323 static unsigned int sc_count;
324
325 /**
326  * Maximum allowed number of stream clients.
327  */
328 static unsigned long long sc_count_max;
329
330 /**
331  * Map from peer identities to 'struct StreamHandles' with streams to
332  * those peers.
333  */
334 static struct GNUNET_CONTAINER_MultiHashMap *stream_map;
335
336
337 /* ********************* client-side code ************************* */
338
339 /**
340  * Iterator called on each entry in a waiting map to 
341  * call the 'proc' continuation and release associated
342  * resources.
343  *
344  * @param cls the 'struct StreamHandle'
345  * @param key the key of the entry in the map (the query)
346  * @param value the 'struct GSF_StreamRequest' to clean up
347  * @return GNUNET_YES (continue to iterate)
348  */
349 static int
350 free_waiting_entry (void *cls,
351                     const struct GNUNET_HashCode *key,
352                     void *value)
353 {
354   struct GSF_StreamRequest *sr = value;
355
356   sr->proc (sr->proc_cls, GNUNET_BLOCK_TYPE_ANY,
357             GNUNET_TIME_UNIT_FOREVER_ABS,
358             0, NULL);
359   GSF_stream_query_cancel (sr);
360   return GNUNET_YES;
361 }
362
363
364 /**
365  * Destroy a stream handle.
366  *
367  * @param sh stream to process
368  */
369 static void
370 destroy_stream_handle (struct StreamHandle *sh)
371 {
372   struct GSF_StreamRequest *sr;
373
374   while (NULL != (sr = sh->pending_head))
375   {
376     sr->proc (sr->proc_cls, GNUNET_BLOCK_TYPE_ANY,
377               GNUNET_TIME_UNIT_FOREVER_ABS,
378               0, NULL);
379     GSF_stream_query_cancel (sr);
380   }
381   GNUNET_CONTAINER_multihashmap_iterate (sh->waiting_map,
382                                          &free_waiting_entry,
383                                          sh);
384   if (NULL != sh->wh)
385     GNUNET_MESH_notify_transmit_ready_cancel (sh->wh);
386   if (GNUNET_SCHEDULER_NO_TASK != sh->timeout_task)
387     GNUNET_SCHEDULER_cancel (sh->timeout_task);
388   if (GNUNET_SCHEDULER_NO_TASK != sh->reset_task)
389     GNUNET_SCHEDULER_cancel (sh->reset_task);
390   GNUNET_MESH_tunnel_destroy (sh->stream);
391   GNUNET_assert (GNUNET_OK ==
392                  GNUNET_CONTAINER_multihashmap_remove (stream_map,
393                                                        &sh->target.hashPubKey,
394                                                        sh));
395   GNUNET_CONTAINER_multihashmap_destroy (sh->waiting_map);
396   GNUNET_free (sh);
397 }
398
399
400 /**
401  * Transmit pending requests via the stream.
402  *
403  * @param sh stream to process
404  */
405 static void
406 transmit_pending (struct StreamHandle *sh);
407
408
409 /**
410  * Iterator called on each entry in a waiting map to 
411  * move it back to the pending list.
412  *
413  * @param cls the 'struct StreamHandle'
414  * @param key the key of the entry in the map (the query)
415  * @param value the 'struct GSF_StreamRequest' to move to pending
416  * @return GNUNET_YES (continue to iterate)
417  */
418 static int
419 move_to_pending (void *cls,
420                  const struct GNUNET_HashCode *key,
421                  void *value)
422 {
423   struct StreamHandle *sh = cls;
424   struct GSF_StreamRequest *sr = value;
425   
426   GNUNET_assert (GNUNET_YES ==
427                  GNUNET_CONTAINER_multihashmap_remove (sh->waiting_map,
428                                                        key,
429                                                        value));
430   GNUNET_CONTAINER_DLL_insert (sh->pending_head,
431                                sh->pending_tail,
432                                sr);
433   sr->was_transmitted = GNUNET_NO;
434   return GNUNET_YES;
435 }
436
437
438 /**
439  * We had a serious error, tear down and re-create stream from scratch.
440  *
441  * @param sh stream to reset
442  */
443 static void
444 reset_stream (struct StreamHandle *sh)
445 {
446   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
447               "Resetting stream to %s\n",
448               GNUNET_i2s (&sh->target));
449   GNUNET_MESH_tunnel_destroy (sh->stream);
450   sh->is_ready = GNUNET_NO;
451   GNUNET_CONTAINER_multihashmap_iterate (sh->waiting_map,
452                                          &move_to_pending,
453                                          sh);
454   sh->stream = GNUNET_MESH_tunnel_create (listen_socket,
455                                           sh,                               
456                                           &sh->target,
457                                           GNUNET_APPLICATION_TYPE_FS_BLOCK_TRANSFER);
458 }
459
460
461 /**
462  * Task called when it is time to destroy an inactive stream.
463  *
464  * @param cls the 'struct StreamHandle' to tear down
465  * @param tc scheduler context, unused
466  */
467 static void
468 stream_timeout (void *cls,
469                 const struct GNUNET_SCHEDULER_TaskContext *tc)
470 {
471   struct StreamHandle *sh = cls;
472
473   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
474               "Timeout on stream to %s\n",
475               GNUNET_i2s (&sh->target));
476   sh->timeout_task = GNUNET_SCHEDULER_NO_TASK;
477   destroy_stream_handle (sh);
478 }
479
480
481 /**
482  * Task called when it is time to reset an stream.
483  *
484  * @param cls the 'struct StreamHandle' to tear down
485  * @param tc scheduler context, unused
486  */
487 static void
488 reset_stream_task (void *cls,
489                    const struct GNUNET_SCHEDULER_TaskContext *tc)
490 {
491   struct StreamHandle *sh = cls;
492
493   sh->reset_task = GNUNET_SCHEDULER_NO_TASK;
494   reset_stream (sh);
495 }
496
497
498 /**
499  * We had a serious error, tear down and re-create stream from scratch,
500  * but do so asynchronously.
501  *
502  * @param sh stream to reset
503  */
504 static void
505 reset_stream_async (struct StreamHandle *sh)
506 {
507   if (GNUNET_SCHEDULER_NO_TASK != sh->reset_task)
508     GNUNET_SCHEDULER_cancel (sh->reset_task);
509   sh->reset_task = GNUNET_SCHEDULER_add_now (&reset_stream_task,
510                                              sh);
511 }
512
513
514 /**
515  * Functions of this signature are called whenever we are ready to transmit
516  * query via a stream.
517  *
518  * @param cls the struct StreamHandle for which we did the write call
519  * @param size the number of bytes that can be written to 'buf'
520  * @param buf where to write the message
521  * @return number of bytes written to 'buf'
522  */
523 static size_t
524 transmit_sqm (void *cls,
525               size_t size,
526               void *buf)
527 {
528   struct StreamHandle *sh = cls;
529   struct StreamQueryMessage sqm;
530   struct GSF_StreamRequest *sr;
531
532   sh->wh = NULL;
533   if (NULL == buf)
534   {
535     reset_stream (sh);
536     return 0;
537   }
538   sr = sh->pending_head;
539   if (NULL == sr)
540     return 0;
541   GNUNET_assert (size >= sizeof (struct StreamQueryMessage));
542   GNUNET_CONTAINER_DLL_remove (sh->pending_head,
543                                sh->pending_tail,
544                                sr);
545   GNUNET_CONTAINER_multihashmap_put (sh->waiting_map,
546                                      &sr->query,
547                                      sr,
548                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
549   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
550               "Sending query via stream to %s\n",
551               GNUNET_i2s (&sh->target));
552   sr->was_transmitted = GNUNET_YES;
553   sqm.header.size = htons (sizeof (sqm));
554   sqm.header.type = htons (GNUNET_MESSAGE_TYPE_FS_STREAM_QUERY);
555   sqm.type = htonl (sr->type);
556   sqm.query = sr->query;
557   memcpy (buf, &sqm, sizeof (sqm));
558   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
559               "Successfully transmitted %u bytes via mesh to %s\n",
560               (unsigned int) size,
561               GNUNET_i2s (&sh->target));
562   transmit_pending (sh);
563   return sizeof (sqm);
564 }
565           
566
567 /**
568  * Transmit pending requests via the stream.
569  *
570  * @param sh stream to process
571  */
572 static void
573 transmit_pending (struct StreamHandle *sh)
574 {
575   if (NULL != sh->wh)
576     return;
577   sh->wh = GNUNET_MESH_notify_transmit_ready (sh->stream, GNUNET_YES /* allow cork */,
578                                               GNUNET_TIME_UNIT_FOREVER_REL,
579                                               sizeof (struct StreamQueryMessage),
580                                               &transmit_sqm, sh);
581 }
582
583
584 /**
585  * Closure for 'handle_reply'.
586  */
587 struct HandleReplyClosure
588 {
589
590   /**
591    * Reply payload.
592    */ 
593   const void *data;
594
595   /**
596    * Expiration time for the block.
597    */
598   struct GNUNET_TIME_Absolute expiration;
599
600   /**
601    * Number of bytes in 'data'.
602    */
603   size_t data_size;
604
605   /** 
606    * Type of the block.
607    */
608   enum GNUNET_BLOCK_Type type;
609   
610   /**
611    * Did we have a matching query?
612    */
613   int found;
614 };
615
616
617 /**
618  * Iterator called on each entry in a waiting map to 
619  * process a result.
620  *
621  * @param cls the 'struct HandleReplyClosure'
622  * @param key the key of the entry in the map (the query)
623  * @param value the 'struct GSF_StreamRequest' to handle result for
624  * @return GNUNET_YES (continue to iterate)
625  */
626 static int
627 handle_reply (void *cls,
628               const struct GNUNET_HashCode *key,
629               void *value)
630 {
631   struct HandleReplyClosure *hrc = cls;
632   struct GSF_StreamRequest *sr = value;
633   
634   sr->proc (sr->proc_cls,
635             hrc->type,
636             hrc->expiration,
637             hrc->data_size,
638             hrc->data);
639   GSF_stream_query_cancel (sr);
640   hrc->found = GNUNET_YES;
641   return GNUNET_YES;
642 }
643
644
645 /**
646  * Functions with this signature are called whenever a
647  * complete reply is received.
648  *
649  * @param cls closure with the 'struct StreamHandle'
650  * @param client identification of the client, NULL
651  * @param message the actual message
652  * @return GNUNET_OK on success, GNUNET_SYSERR to stop further processing
653  */
654 static int
655 reply_cb (void *cls,
656           struct GNUNET_MESH_Tunnel *tunnel,
657           void **tunnel_ctx,
658           const struct GNUNET_PeerIdentity *sender,
659           const struct GNUNET_MessageHeader *message)
660 {
661   struct StreamHandle *sh = *tunnel_ctx;
662   const struct StreamReplyMessage *srm;
663   struct HandleReplyClosure hrc;
664   uint16_t msize;
665   enum GNUNET_BLOCK_Type type;
666   struct GNUNET_HashCode query;
667
668   msize = ntohs (message->size);
669   if (sizeof (struct StreamReplyMessage) > msize)
670   {
671     GNUNET_break_op (0);
672     reset_stream_async (sh);
673     return GNUNET_SYSERR;
674   }
675   srm = (const struct StreamReplyMessage *) message;
676   msize -= sizeof (struct StreamReplyMessage);
677   type = (enum GNUNET_BLOCK_Type) ntohl (srm->type);
678   if (GNUNET_YES !=
679       GNUNET_BLOCK_get_key (GSF_block_ctx,
680                             type,
681                             &srm[1], msize, &query))
682   {
683     GNUNET_break_op (0); 
684     reset_stream_async (sh);
685     return GNUNET_SYSERR;
686   }
687   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
688               "Received reply `%s' via stream\n",
689               GNUNET_h2s (&query));
690   GNUNET_STATISTICS_update (GSF_stats,
691                             gettext_noop ("# replies received via stream"), 1,
692                             GNUNET_NO);
693   hrc.data = &srm[1];
694   hrc.data_size = msize;
695   hrc.expiration = GNUNET_TIME_absolute_ntoh (srm->expiration);
696   hrc.type = type;
697   hrc.found = GNUNET_NO;
698   GNUNET_CONTAINER_multihashmap_get_multiple (sh->waiting_map,
699                                               &query,
700                                               &handle_reply,
701                                               &hrc);
702   if (GNUNET_NO == hrc.found)
703   {
704     GNUNET_STATISTICS_update (GSF_stats,
705                               gettext_noop ("# replies received via stream dropped"), 1,
706                               GNUNET_NO);
707     return GNUNET_OK;
708   }
709   return GNUNET_OK;
710 }
711
712
713 /**
714  * Get (or create) a stream to talk to the given peer.
715  *
716  * @param target peer we want to communicate with
717  */
718 static struct StreamHandle *
719 get_stream (const struct GNUNET_PeerIdentity *target)
720 {
721   struct StreamHandle *sh;
722
723   sh = GNUNET_CONTAINER_multihashmap_get (stream_map,
724                                           &target->hashPubKey);
725   if (NULL != sh)
726   {
727     if (GNUNET_SCHEDULER_NO_TASK != sh->timeout_task)
728     {
729       GNUNET_SCHEDULER_cancel (sh->timeout_task);
730       sh->timeout_task = GNUNET_SCHEDULER_NO_TASK;
731     }
732     return sh;
733   }
734   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
735               "Creating stream to %s\n",
736               GNUNET_i2s (target));
737   sh = GNUNET_malloc (sizeof (struct StreamHandle));
738   sh->reset_task = GNUNET_SCHEDULER_add_delayed (CLIENT_RETRY_TIMEOUT,
739                                                  &reset_stream_task,
740                                                  sh);
741   sh->waiting_map = GNUNET_CONTAINER_multihashmap_create (512, GNUNET_YES);
742   sh->target = *target;
743   sh->stream = GNUNET_MESH_tunnel_create (listen_socket,
744                                           sh,
745                                           &sh->target,
746                                           GNUNET_APPLICATION_TYPE_FS_BLOCK_TRANSFER);
747   GNUNET_assert (GNUNET_OK ==
748                  GNUNET_CONTAINER_multihashmap_put (stream_map,
749                                                     &sh->target.hashPubKey,
750                                                     sh,
751                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
752   return sh;
753 }
754
755
756 /**
757  * Look for a block by directly contacting a particular peer.
758  *
759  * @param target peer that should have the block
760  * @param query hash to query for the block
761  * @param type desired type for the block
762  * @param proc function to call with result
763  * @param proc_cls closure for 'proc'
764  * @return handle to cancel the operation
765  */
766 struct GSF_StreamRequest *
767 GSF_stream_query (const struct GNUNET_PeerIdentity *target,
768                   const struct GNUNET_HashCode *query,
769                   enum GNUNET_BLOCK_Type type,
770                   GSF_StreamReplyProcessor proc, void *proc_cls)
771 {
772   struct StreamHandle *sh;
773   struct GSF_StreamRequest *sr;
774
775   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
776               "Preparing to send query for %s via stream to %s\n",
777               GNUNET_h2s (query),
778               GNUNET_i2s (target));
779   sh = get_stream (target);
780   sr = GNUNET_malloc (sizeof (struct GSF_StreamRequest));
781   sr->sh = sh;
782   sr->proc = proc;
783   sr->proc_cls = proc_cls;
784   sr->type = type;
785   sr->query = *query;
786   GNUNET_CONTAINER_DLL_insert (sh->pending_head,
787                                sh->pending_tail,
788                                sr);
789   if (GNUNET_YES == sh->is_ready)
790     transmit_pending (sh);
791   return sr;
792 }
793
794
795 /**
796  * Cancel an active request; must not be called after 'proc'
797  * was calld.
798  *
799  * @param sr request to cancel
800  */
801 void
802 GSF_stream_query_cancel (struct GSF_StreamRequest *sr)
803 {
804   struct StreamHandle *sh = sr->sh;
805
806   if (GNUNET_YES == sr->was_transmitted)
807     GNUNET_assert (GNUNET_OK ==
808                    GNUNET_CONTAINER_multihashmap_remove (sh->waiting_map,
809                                                          &sr->query,
810                                                          sr));
811   else
812     GNUNET_CONTAINER_DLL_remove (sh->pending_head,
813                                  sh->pending_tail,
814                                  sr);
815   GNUNET_free (sr);
816   if ( (0 == GNUNET_CONTAINER_multihashmap_size (sh->waiting_map)) &&
817        (NULL == sh->pending_head) )
818     sh->timeout_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS,
819                                                      &stream_timeout,
820                                                      sh);
821 }
822
823
824 /* ********************* server-side code ************************* */
825
826
827 /**
828  * We're done with a particular client, clean up.
829  *
830  * @param sc client to clean up
831  */
832 static void
833 terminate_stream (struct StreamClient *sc)
834 {
835   GNUNET_STATISTICS_update (GSF_stats,
836                             gettext_noop ("# stream connections active"), -1,
837                             GNUNET_NO);
838   if (GNUNET_SCHEDULER_NO_TASK != sc->terminate_task)
839     GNUNET_SCHEDULER_cancel (sc->terminate_task); 
840   if (GNUNET_SCHEDULER_NO_TASK != sc->timeout_task)
841     GNUNET_SCHEDULER_cancel (sc->timeout_task); 
842   if (NULL != sc->wh)
843     GNUNET_MESH_notify_transmit_ready_cancel (sc->wh);
844   if (NULL != sc->qe)
845     GNUNET_DATASTORE_cancel (sc->qe);
846   GNUNET_MESH_tunnel_destroy (sc->socket);
847   struct WriteQueueItem *wqi;
848   while (NULL != (wqi = sc->wqi_head))
849   {
850     GNUNET_CONTAINER_DLL_remove (sc->wqi_head,
851                                  sc->wqi_tail,
852                                  wqi);
853     GNUNET_free (wqi);
854   }
855   GNUNET_CONTAINER_DLL_remove (sc_head,
856                                sc_tail,
857                                sc);
858   sc_count--;
859   GNUNET_free (sc);
860 }
861
862
863 /**
864  * Task run to asynchronously terminate the stream due to timeout.
865  *
866  * @param cls the 'struct StreamClient'
867  * @param tc scheduler context
868  */ 
869 static void
870 timeout_stream_task (void *cls,
871                      const struct GNUNET_SCHEDULER_TaskContext *tc)
872 {
873   struct StreamClient *sc = cls;
874
875   sc->timeout_task = GNUNET_SCHEDULER_NO_TASK;
876   terminate_stream (sc);
877 }
878
879
880 /**
881  * Reset the timeout for the stream client (due to activity).
882  *
883  * @param sc client handle to reset timeout for
884  */
885 static void
886 refresh_timeout_task (struct StreamClient *sc)
887 {
888   if (GNUNET_SCHEDULER_NO_TASK != sc->timeout_task)
889     GNUNET_SCHEDULER_cancel (sc->timeout_task); 
890   sc->timeout_task = GNUNET_SCHEDULER_add_delayed (IDLE_TIMEOUT,
891                                                    &timeout_stream_task,
892                                                    sc);
893 }
894
895
896 /**
897  * We're done handling a request from a client, read the next one.
898  *
899  * @param sc client to continue reading requests from
900  */
901 static void
902 continue_reading (struct StreamClient *sc)
903 {
904   refresh_timeout_task (sc);
905 }
906
907
908 /**
909  * Transmit the next entry from the write queue.
910  *
911  * @param sc where to process the write queue
912  */
913 static void
914 continue_writing (struct StreamClient *sc);
915
916
917 /**
918  * Send a reply now, mesh is ready.
919  *
920  * @param cls closure with the struct StreamClient which sent the query
921  * @param size number of bytes available in 'buf'
922  * @param buf where to write the message
923  * @return number of bytes written to 'buf'
924  */
925 static size_t
926 write_continuation (void *cls,
927                     size_t size,
928                     void *buf)
929 {
930   struct StreamClient *sc = cls;
931   struct WriteQueueItem *wqi;
932   size_t ret;
933
934   sc->wh = NULL;
935   if (NULL == (wqi = sc->wqi_head))
936   {
937     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
938                 "Write queue empty, reading more requests\n");
939     return 0;
940   }
941   if (0 == size)
942   {
943     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
944                 "Transmission of reply failed, terminating stream\n");
945     terminate_stream (sc);    
946     return 0;
947   }
948   GNUNET_CONTAINER_DLL_remove (sc->wqi_head,
949                                sc->wqi_tail,
950                                wqi);
951   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
952               "Transmitted %u byte reply via stream\n",
953               (unsigned int) size);
954   GNUNET_STATISTICS_update (GSF_stats,
955                             gettext_noop ("# Blocks transferred via stream"), 1,
956                             GNUNET_NO);
957   memcpy (buf, &wqi[1], ret = wqi->msize);
958   GNUNET_free (wqi);
959   continue_writing (sc);
960   return ret;
961 }
962
963
964 /**
965  * Transmit the next entry from the write queue.
966  *
967  * @param sc where to process the write queue
968  */
969 static void
970 continue_writing (struct StreamClient *sc)
971 {
972   struct WriteQueueItem *wqi;
973
974   if (NULL != sc->wh)
975   {
976     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
977                 "Write pending, waiting for it to complete\n");
978     return; /* write already pending */
979   }
980   if (NULL == (wqi = sc->wqi_head))
981   {
982     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
983                 "Write queue empty, reading more requests\n");
984     continue_reading (sc);
985     return;
986   }
987   sc->wh = GNUNET_MESH_notify_transmit_ready (sc->socket, GNUNET_NO,
988                                               GNUNET_TIME_UNIT_FOREVER_REL,
989                                               wqi->msize,                                     
990                                               &write_continuation,
991                                               sc);
992   if (NULL == sc->wh)
993   {
994     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
995                 "Write failed; terminating stream\n");
996     terminate_stream (sc);
997     return;
998   }
999 }
1000
1001
1002 /**
1003  * Process a datum that was stored in the datastore.
1004  *
1005  * @param cls closure with the struct StreamClient which sent the query
1006  * @param key key for the content
1007  * @param size number of bytes in data
1008  * @param data content stored
1009  * @param type type of the content
1010  * @param priority priority of the content
1011  * @param anonymity anonymity-level for the content
1012  * @param expiration expiration time for the content
1013  * @param uid unique identifier for the datum;
1014  *        maybe 0 if no unique identifier is available
1015  */
1016 static void 
1017 handle_datastore_reply (void *cls,
1018                         const struct GNUNET_HashCode * key,
1019                         size_t size, const void *data,
1020                         enum GNUNET_BLOCK_Type type,
1021                         uint32_t priority,
1022                         uint32_t anonymity,
1023                         struct GNUNET_TIME_Absolute
1024                         expiration, uint64_t uid)
1025 {
1026   struct StreamClient *sc = cls;
1027   size_t msize = size + sizeof (struct StreamReplyMessage);
1028   struct WriteQueueItem *wqi;
1029   struct StreamReplyMessage *srm;
1030
1031   sc->qe = NULL;
1032   if (GNUNET_BLOCK_TYPE_FS_ONDEMAND == type)
1033   {
1034     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1035                 "Performing on-demand encoding\n");
1036     if (GNUNET_OK !=
1037         GNUNET_FS_handle_on_demand_block (key,
1038                                           size, data, type,
1039                                           priority, anonymity,
1040                                           expiration, uid,
1041                                           &handle_datastore_reply,
1042                                           sc))
1043     {
1044       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1045                   "On-demand encoding request failed\n");
1046       continue_writing (sc);
1047     }
1048     return;
1049   }
1050   if (msize > GNUNET_SERVER_MAX_MESSAGE_SIZE)
1051   {
1052     GNUNET_break (0);
1053     continue_writing (sc);
1054     return;
1055   }
1056   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1057               "Starting transmission of %u byte reply for query `%s' via stream\n",
1058               (unsigned int) size,
1059               GNUNET_h2s (key));
1060   wqi = GNUNET_malloc (sizeof (struct WriteQueueItem) + msize);
1061   wqi->msize = msize;
1062   srm = (struct StreamReplyMessage *) &wqi[1];
1063   srm->header.size = htons ((uint16_t) msize);
1064   srm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_STREAM_REPLY);
1065   srm->type = htonl (type);
1066   srm->expiration = GNUNET_TIME_absolute_hton (expiration);
1067   memcpy (&srm[1], data, size);
1068   sc->reply_size = msize;
1069   GNUNET_CONTAINER_DLL_insert (sc->wqi_head,
1070                                sc->wqi_tail,
1071                                wqi);
1072   continue_writing (sc);
1073 }
1074
1075
1076 /**
1077  * Functions with this signature are called whenever a
1078  * complete query message is received.
1079  *
1080  * Do not call GNUNET_SERVER_mst_destroy in callback
1081  *
1082  * @param cls closure with the 'struct StreamClient'
1083  * @param client identification of the client, NULL
1084  * @param message the actual message
1085  * @return GNUNET_OK on success, GNUNET_SYSERR to stop further processing
1086  */
1087 static int
1088 request_cb (void *cls,
1089             struct GNUNET_MESH_Tunnel *tunnel,
1090             void **tunnel_ctx,
1091             const struct GNUNET_PeerIdentity *sender,
1092             const struct GNUNET_MessageHeader *message)
1093 {
1094   struct StreamClient *sc = *tunnel_ctx;
1095   const struct StreamQueryMessage *sqm;
1096
1097   sqm = (const struct StreamQueryMessage *) message;
1098   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1099               "Received query for `%s' via stream\n",
1100               GNUNET_h2s (&sqm->query));
1101   GNUNET_STATISTICS_update (GSF_stats,
1102                             gettext_noop ("# queries received via stream"), 1,
1103                             GNUNET_NO);
1104   refresh_timeout_task (sc);
1105   sc->qe = GNUNET_DATASTORE_get_key (GSF_dsh,
1106                                      0,
1107                                      &sqm->query,
1108                                      ntohl (sqm->type),
1109                                      0 /* priority */, 
1110                                      GSF_datastore_queue_size,
1111                                      GNUNET_TIME_UNIT_FOREVER_REL,
1112                                      &handle_datastore_reply, sc);
1113   if (NULL == sc->qe)
1114   {
1115     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1116                 "Queueing request with datastore failed (queue full?)\n");
1117     continue_writing (sc);
1118   }
1119   return GNUNET_OK;
1120 }
1121
1122
1123 /**
1124  * Functions of this type are called upon new stream connection from other peers
1125  * or upon binding error which happen when the app_port given in
1126  * GNUNET_STREAM_listen() is already taken.
1127  *
1128  * @param cls the closure from GNUNET_STREAM_listen
1129  * @param socket the socket representing the stream
1130  * @param initiator the identity of the peer who wants to establish a stream
1131  *            with us; NULL on binding error
1132  * @return initial tunnel context (our 'struct StreamClient')
1133  */
1134 static void *
1135 accept_cb (void *cls,
1136            struct GNUNET_MESH_Tunnel *socket,
1137            const struct GNUNET_PeerIdentity *initiator,
1138            uint32_t port)
1139 {
1140   struct StreamClient *sc;
1141
1142   GNUNET_assert (NULL != socket);
1143   if (sc_count >= sc_count_max)
1144   {
1145     GNUNET_STATISTICS_update (GSF_stats,
1146                               gettext_noop ("# stream client connections rejected"), 1,
1147                               GNUNET_NO);
1148     GNUNET_MESH_tunnel_destroy (socket);
1149     return NULL;
1150   }
1151   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1152               "Accepting inbound stream connection from `%s'\n",
1153               GNUNET_i2s (initiator));
1154   GNUNET_STATISTICS_update (GSF_stats,
1155                             gettext_noop ("# stream connections active"), 1,
1156                             GNUNET_NO);
1157   sc = GNUNET_malloc (sizeof (struct StreamClient));
1158   sc->socket = socket;
1159   GNUNET_CONTAINER_DLL_insert (sc_head,
1160                                sc_tail,
1161                                sc);
1162   sc_count++;
1163   refresh_timeout_task (sc);
1164   return sc;
1165 }
1166
1167
1168 /**
1169  * Initialize subsystem for non-anonymous file-sharing.
1170  */
1171 void
1172 GSF_stream_start ()
1173 {
1174   static const struct GNUNET_MESH_MessageHandler handlers[] = {
1175     { &request_cb, GNUNET_MESSAGE_TYPE_FS_STREAM_QUERY, sizeof (struct StreamQueryMessage)},
1176     { &reply_cb, GNUNET_MESSAGE_TYPE_FS_STREAM_REPLY, 0 },
1177     { NULL, 0, 0 }
1178   };
1179   static const uint32_t ports[] = {
1180     GNUNET_APPLICATION_TYPE_FS_BLOCK_TRANSFER,
1181     0
1182   };
1183
1184   stream_map = GNUNET_CONTAINER_multihashmap_create (16, GNUNET_YES);
1185   if (GNUNET_YES ==
1186       GNUNET_CONFIGURATION_get_value_number (GSF_cfg,
1187                                              "fs",
1188                                              "MAX_STREAM_CLIENTS",
1189                                              &sc_count_max))
1190   {
1191     listen_socket = GNUNET_MESH_connect (GSF_cfg,
1192                                          NULL,
1193                                          &accept_cb,
1194                                          NULL /* FIXME: have a cleanup callback? */,
1195                                          handlers,
1196                                          ports);
1197   } 
1198 }
1199
1200
1201 /**
1202  * Function called on each active streams to shut them down.
1203  *
1204  * @param cls NULL
1205  * @param key target peer, unused
1206  * @param value the 'struct StreamHandle' to destroy
1207  * @return GNUNET_YES (continue to iterate)
1208  */
1209 static int
1210 release_streams (void *cls,
1211                  const struct GNUNET_HashCode *key,
1212                  void *value)
1213 {
1214   struct StreamHandle *sh = value;
1215
1216   destroy_stream_handle (sh);
1217   return GNUNET_YES;
1218 }
1219
1220
1221 /**
1222  * Shutdown subsystem for non-anonymous file-sharing.
1223  */
1224 void
1225 GSF_stream_stop ()
1226 {
1227   struct StreamClient *sc;
1228
1229   while (NULL != (sc = sc_head))
1230     terminate_stream (sc);
1231   if (NULL != listen_socket)
1232   {
1233     GNUNET_MESH_disconnect (listen_socket);
1234     listen_socket = NULL;
1235   }
1236   GNUNET_CONTAINER_multihashmap_iterate (stream_map,
1237                                          &release_streams,
1238                                          NULL);
1239   GNUNET_CONTAINER_multihashmap_destroy (stream_map);
1240   stream_map = NULL;
1241 }
1242
1243 /* end of gnunet-service-fs_stream.c */