-fix
[oweals/gnunet.git] / src / fs / gnunet-service-fs_stream.c
1 /*
2      This file is part of GNUnet.
3      (C) 2012 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file fs/gnunet-service-fs_stream.c
23  * @brief non-anonymous file-transfer
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include "gnunet_constants.h"
28 #include "gnunet_util_lib.h"
29 #include "gnunet_stream_lib.h"
30 #include "gnunet_protocols.h"
31 #include "gnunet_applications.h"
32 #include "gnunet-service-fs.h"
33 #include "gnunet-service-fs_indexing.h"
34 #include "gnunet-service-fs_stream.h"
35
36 /**
37  * After how long do we termiante idle connections?
38  */
39 #define IDLE_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 2)
40
41
42 /**
43  * A message in the queue to be written to the stream.
44  */
45 struct WriteQueueItem
46 {
47   /**
48    * Kept in a DLL.
49    */
50   struct WriteQueueItem *next;
51
52   /**
53    * Kept in a DLL.
54    */
55   struct WriteQueueItem *prev;
56
57   /**
58    * Number of bytes of payload, allocated at the end of this struct.
59    */
60   size_t msize;
61 };
62
63
64 /**
65  * Information we keep around for each active streaming client.
66  */
67 struct StreamClient
68 {
69   /**
70    * DLL
71    */ 
72   struct StreamClient *next;
73
74   /**
75    * DLL
76    */ 
77   struct StreamClient *prev;
78
79   /**
80    * Socket for communication.
81    */ 
82   struct GNUNET_STREAM_Socket *socket;
83
84   /**
85    * Handle for active read operation, or NULL.
86    */ 
87   struct GNUNET_STREAM_IOReadHandle *rh;
88
89   /**
90    * Handle for active write operation, or NULL.
91    */ 
92   struct GNUNET_STREAM_IOWriteHandle *wh;
93
94   /**
95    * Head of write queue.
96    */
97   struct WriteQueueItem *wqi_head;
98
99   /**
100    * Tail of write queue.
101    */
102   struct WriteQueueItem *wqi_tail;
103   
104   /**
105    * Tokenizer for requests.
106    */
107   struct GNUNET_SERVER_MessageStreamTokenizer *mst;
108   
109   /**
110    * Current active request to the datastore, if we have one pending.
111    */
112   struct GNUNET_DATASTORE_QueueEntry *qe;
113
114   /**
115    * Task that is scheduled to asynchronously terminate the connection.
116    */
117   GNUNET_SCHEDULER_TaskIdentifier terminate_task;
118
119   /**
120    * Task that is scheduled to terminate idle connections.
121    */
122   GNUNET_SCHEDULER_TaskIdentifier timeout_task;
123
124   /**
125    * Size of the last write that was initiated.
126    */ 
127   size_t reply_size;
128
129 };
130
131
132 /**
133  * Query from one peer, asking the other for CHK-data.
134  */
135 struct StreamQueryMessage
136 {
137
138   /**
139    * Type is GNUNET_MESSAGE_TYPE_FS_STREAM_QUERY.
140    */
141   struct GNUNET_MessageHeader header;
142
143   /**
144    * Block type must be DBLOCK or IBLOCK.
145    */
146   uint32_t type;
147
148   /**
149    * Query hash from CHK (hash of encrypted block).
150    */
151   struct GNUNET_HashCode query;
152
153 };
154
155
156 /**
157  * Reply to a StreamQueryMessage.
158  */
159 struct StreamReplyMessage
160 {
161
162   /**
163    * Type is GNUNET_MESSAGE_TYPE_FS_STREAM_REPLY.
164    */
165   struct GNUNET_MessageHeader header;
166
167   /**
168    * Block type must be DBLOCK or IBLOCK.
169    */
170   uint32_t type;
171
172   /**
173    * Expiration time for the block.
174    */
175   struct GNUNET_TIME_AbsoluteNBO expiration;
176
177   /* followed by the encrypted block */
178
179 };
180
181
182 /** 
183  * Handle for a stream to another peer.
184  */
185 struct StreamHandle;
186
187
188 /**
189  * Handle for a request that is going out via stream API.
190  */
191 struct GSF_StreamRequest
192 {
193
194   /**
195    * DLL.
196    */
197   struct GSF_StreamRequest *next;
198
199   /**
200    * DLL.
201    */
202   struct GSF_StreamRequest *prev;
203
204   /**
205    * Which stream is this request associated with?
206    */
207   struct StreamHandle *sh;
208
209   /**
210    * Function to call with the result.
211    */
212   GSF_StreamReplyProcessor proc;
213
214   /**
215    * Closure for 'proc'
216    */
217   void *proc_cls;
218
219   /**
220    * Query to transmit to the other peer.
221    */
222   struct GNUNET_HashCode query;
223
224   /**
225    * Desired type for the reply.
226    */
227   enum GNUNET_BLOCK_Type type;
228
229   /**
230    * Did we transmit this request already? YES if we are
231    * in the 'waiting' DLL, NO if we are in the 'pending' DLL.
232    */
233   int was_transmitted;
234 };
235
236
237 /** 
238  * Handle for a stream to another peer.
239  */
240 struct StreamHandle
241 {
242   /**
243    * Head of DLL of pending requests on this stream.
244    */
245   struct GSF_StreamRequest *pending_head;
246
247   /**
248    * Tail of DLL of pending requests on this stream.
249    */
250   struct GSF_StreamRequest *pending_tail;
251
252   /**
253    * Map from query to 'struct GSF_StreamRequest's waiting for
254    * a reply.
255    */
256   struct GNUNET_CONTAINER_MultiHashMap *waiting_map;
257
258   /**
259    * Connection to the other peer.
260    */
261   struct GNUNET_STREAM_Socket *stream;
262
263   /**
264    * Handle for active read operation, or NULL.
265    */ 
266   struct GNUNET_STREAM_IOReadHandle *rh;
267
268   /**
269    * Handle for active write operation, or NULL.
270    */ 
271   struct GNUNET_STREAM_IOWriteHandle *wh;
272
273   /**
274    * Tokenizer for replies.
275    */
276   struct GNUNET_SERVER_MessageStreamTokenizer *mst;
277
278   /**
279    * Which peer does this stream go to?
280    */ 
281   struct GNUNET_PeerIdentity target;
282
283   /**
284    * Task to kill inactive streams (we keep them around for
285    * a few seconds to give the application a chance to give
286    * us another query).
287    */
288   GNUNET_SCHEDULER_TaskIdentifier timeout_task;
289
290   /**
291    * Task to reset streams that had errors (asynchronously,
292    * as we may not be able to do it immediately during a
293    * callback from the stream API).
294    */
295   GNUNET_SCHEDULER_TaskIdentifier reset_task;
296
297   /**
298    * Is this stream ready for transmission?
299    */
300   int is_ready;
301
302 };
303
304
305 /**
306  * Listen socket for incoming requests.
307  */
308 static struct GNUNET_STREAM_ListenSocket *listen_socket;
309
310 /**
311  * Head of DLL of stream clients.
312  */ 
313 static struct StreamClient *sc_head;
314
315 /**
316  * Tail of DLL of stream clients.
317  */ 
318 static struct StreamClient *sc_tail;
319
320 /**
321  * Number of active stream clients in the 'sc_*'-DLL.
322  */
323 static unsigned int sc_count;
324
325 /**
326  * Maximum allowed number of stream clients.
327  */
328 static unsigned long long sc_count_max;
329
330 /**
331  * Map from peer identities to 'struct StreamHandles' with streams to
332  * those peers.
333  */
334 static struct GNUNET_CONTAINER_MultiHashMap *stream_map;
335
336
337 /* ********************* client-side code ************************* */
338
339 /**
340  * Iterator called on each entry in a waiting map to 
341  * call the 'proc' continuation and release associated
342  * resources.
343  *
344  * @param cls the 'struct StreamHandle'
345  * @param key the key of the entry in the map (the query)
346  * @param value the 'struct GSF_StreamRequest' to clean up
347  * @return GNUNET_YES (continue to iterate)
348  */
349 static int
350 free_waiting_entry (void *cls,
351                     const struct GNUNET_HashCode *key,
352                     void *value)
353 {
354   struct GSF_StreamRequest *sr = value;
355
356   sr->proc (sr->proc_cls, GNUNET_BLOCK_TYPE_ANY,
357             GNUNET_TIME_UNIT_FOREVER_ABS,
358             0, NULL);
359   GSF_stream_query_cancel (sr);
360   return GNUNET_YES;
361 }
362
363
364 /**
365  * Destroy a stream handle.
366  *
367  * @param sh stream to process
368  */
369 static void
370 destroy_stream_handle (struct StreamHandle *sh)
371 {
372   struct GSF_StreamRequest *sr;
373
374   while (NULL != (sr = sh->pending_head))
375   {
376     sr->proc (sr->proc_cls, GNUNET_BLOCK_TYPE_ANY,
377               GNUNET_TIME_UNIT_FOREVER_ABS,
378               0, NULL);
379     GSF_stream_query_cancel (sr);
380   }
381   GNUNET_CONTAINER_multihashmap_iterate (sh->waiting_map,
382                                          &free_waiting_entry,
383                                          sh);
384   if (NULL != sh->wh)
385     GNUNET_STREAM_io_write_cancel (sh->wh);
386   if (NULL != sh->rh)
387     GNUNET_STREAM_io_read_cancel (sh->rh);
388   if (GNUNET_SCHEDULER_NO_TASK != sh->timeout_task)
389     GNUNET_SCHEDULER_cancel (sh->timeout_task);
390   GNUNET_STREAM_close (sh->stream);
391   GNUNET_assert (GNUNET_OK ==
392                  GNUNET_CONTAINER_multihashmap_remove (stream_map,
393                                                        &sh->target.hashPubKey,
394                                                        sh));
395   GNUNET_CONTAINER_multihashmap_destroy (sh->waiting_map);
396   GNUNET_free (sh);
397 }
398
399
400 /**
401  * Transmit pending requests via the stream.
402  *
403  * @param sh stream to process
404  */
405 static void
406 transmit_pending (struct StreamHandle *sh);
407
408
409 /**
410  * Function called once the stream is ready for transmission.
411  *
412  * @param cls the 'struct StreamHandle'
413  * @param socket stream socket handle
414  */
415 static void
416 stream_ready_cb (void *cls,
417                  struct GNUNET_STREAM_Socket *socket)
418 {
419   struct StreamHandle *sh = cls;
420
421   sh->is_ready = GNUNET_YES;
422   transmit_pending (sh);
423 }
424
425
426 /**
427  * Iterator called on each entry in a waiting map to 
428  * move it back to the pending list.
429  *
430  * @param cls the 'struct StreamHandle'
431  * @param key the key of the entry in the map (the query)
432  * @param value the 'struct GSF_StreamRequest' to move to pending
433  * @return GNUNET_YES (continue to iterate)
434  */
435 static int
436 move_to_pending (void *cls,
437                  const struct GNUNET_HashCode *key,
438                  void *value)
439 {
440   struct StreamHandle *sh = cls;
441   struct GSF_StreamRequest *sr = value;
442   
443   GNUNET_assert (GNUNET_YES ==
444                  GNUNET_CONTAINER_multihashmap_remove (sh->waiting_map,
445                                                        key,
446                                                        value));
447   GNUNET_CONTAINER_DLL_insert (sh->pending_head,
448                                sh->pending_tail,
449                                sr);
450   sr->was_transmitted = GNUNET_NO;
451   return GNUNET_YES;
452 }
453
454
455 /**
456  * We had a serious error, tear down and re-create stream from scratch.
457  *
458  * @param sh stream to reset
459  */
460 static void
461 reset_stream (struct StreamHandle *sh)
462 {
463   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
464               "Resetting stream to %s\n",
465               GNUNET_i2s (&sh->target));
466   if (NULL != sh->rh)
467     GNUNET_STREAM_io_read_cancel (sh->rh);
468   GNUNET_STREAM_close (sh->stream);
469   sh->is_ready = GNUNET_NO;
470   GNUNET_CONTAINER_multihashmap_iterate (sh->waiting_map,
471                                          &move_to_pending,
472                                          sh);
473   sh->stream = GNUNET_STREAM_open (GSF_cfg,
474                                    &sh->target,
475                                    GNUNET_APPLICATION_TYPE_FS_BLOCK_TRANSFER,
476                                    &stream_ready_cb, sh,
477                                    GNUNET_STREAM_OPTION_END);
478 }
479
480
481 /**
482  * Task called when it is time to destroy an inactive stream.
483  *
484  * @param cls the 'struct StreamHandle' to tear down
485  * @param tc scheduler context, unused
486  */
487 static void
488 stream_timeout (void *cls,
489                 const struct GNUNET_SCHEDULER_TaskContext *tc)
490 {
491   struct StreamHandle *sh = cls;
492
493   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
494               "Timeout on stream to %s\n",
495               GNUNET_i2s (&sh->target));
496   sh->timeout_task = GNUNET_SCHEDULER_NO_TASK;
497   destroy_stream_handle (sh);
498 }
499
500
501 /**
502  * Task called when it is time to reset an stream.
503  *
504  * @param cls the 'struct StreamHandle' to tear down
505  * @param tc scheduler context, unused
506  */
507 static void
508 reset_stream_task (void *cls,
509                    const struct GNUNET_SCHEDULER_TaskContext *tc)
510 {
511   struct StreamHandle *sh = cls;
512
513   sh->reset_task = GNUNET_SCHEDULER_NO_TASK;
514   reset_stream (sh);
515 }
516
517
518 /**
519  * We had a serious error, tear down and re-create stream from scratch,
520  * but do so asynchronously.
521  *
522  * @param sh stream to reset
523  */
524 static void
525 reset_stream_async (struct StreamHandle *sh)
526 {
527   if (GNUNET_SCHEDULER_NO_TASK == sh->reset_task)
528     sh->reset_task = GNUNET_SCHEDULER_add_now (&reset_stream_task,
529                                                sh);
530 }
531
532
533 /**
534  * We got a reply from the stream.  Process it.
535  *
536  * @param cls the struct StreamHandle 
537  * @param status the status of the stream at the time this function is called
538  * @param data traffic from the other side
539  * @param size the number of bytes available in data read; will be 0 on timeout 
540  * @return number of bytes of processed from 'data' (any data remaining should be
541  *         given to the next time the read processor is called).
542  */
543 static size_t
544 handle_stream_reply (void *cls,
545                      enum GNUNET_STREAM_Status status,
546                      const void *data,
547                      size_t size)
548 {
549   struct StreamHandle *sh = cls;
550
551   sh->rh = NULL;
552   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
553               "Received %u bytes from stream to %s\n",
554               (unsigned int) size,
555               GNUNET_i2s (&sh->target));
556   if (GNUNET_SYSERR == 
557       GNUNET_SERVER_mst_receive (sh->mst,
558                                  NULL,
559                                  data, size,
560                                  GNUNET_NO, GNUNET_NO))
561   {
562     GNUNET_break_op (0);
563     reset_stream_async (sh);
564     return size;
565   }
566   sh->rh = GNUNET_STREAM_read (sh->stream,
567                                GNUNET_TIME_UNIT_FOREVER_REL,
568                                &handle_stream_reply,
569                                sh);
570   return size;
571 }
572
573
574 /**
575  * Functions of this signature are called whenever we transmitted a
576  * query via a stream.
577  *
578  * @param cls the struct StreamHandle for which we did the write call
579  * @param status the status of the stream at the time this function is called;
580  *          GNUNET_OK if writing to stream was completed successfully,
581  *          GNUNET_STREAM_SHUTDOWN if the stream is shutdown for writing in the
582  *          mean time.
583  * @param size the number of bytes written
584  */
585 static void
586 query_write_continuation (void *cls,
587                           enum GNUNET_STREAM_Status status,
588                           size_t size)
589 {
590   struct StreamHandle *sh = cls;
591
592   sh->wh = NULL;
593   if ( (GNUNET_STREAM_OK != status) ||
594        (sizeof (struct StreamQueryMessage) != size) )
595   {
596     reset_stream (sh);
597     return;
598   }
599   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
600               "Successfully transmitted %u bytes via stream to %s\n",
601               (unsigned int) size,
602               GNUNET_i2s (&sh->target));
603   if (NULL == sh->rh)
604     sh->rh = GNUNET_STREAM_read (sh->stream,
605                                  GNUNET_TIME_UNIT_FOREVER_REL,
606                                  &handle_stream_reply,
607                                  sh);
608   transmit_pending (sh);
609 }
610           
611
612 /**
613  * Transmit pending requests via the stream.
614  *
615  * @param sh stream to process
616  */
617 static void
618 transmit_pending (struct StreamHandle *sh)
619 {
620   struct StreamQueryMessage sqm;
621   struct GSF_StreamRequest *sr;
622
623   if (NULL != sh->wh)
624     return;
625   sr = sh->pending_head;
626   if (NULL == sr)
627     return;
628   GNUNET_CONTAINER_DLL_remove (sh->pending_head,
629                                sh->pending_tail,
630                                sr);
631   GNUNET_CONTAINER_multihashmap_put (sh->waiting_map,
632                                      &sr->query,
633                                      sr,
634                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
635   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
636               "Sending query via stream to %s\n",
637               GNUNET_i2s (&sh->target));
638   sr->was_transmitted = GNUNET_YES;
639   sqm.header.size = htons (sizeof (sqm));
640   sqm.header.type = htons (GNUNET_MESSAGE_TYPE_FS_STREAM_QUERY);
641   sqm.type = htonl (sr->type);
642   sqm.query = sr->query;
643   sh->wh = GNUNET_STREAM_write (sh->stream,
644                                 &sqm, sizeof (sqm),
645                                 GNUNET_TIME_UNIT_FOREVER_REL,
646                                 &query_write_continuation,
647                                 sh);
648 }
649
650
651 /**
652  * Closure for 'handle_reply'.
653  */
654 struct HandleReplyClosure
655 {
656
657   /**
658    * Reply payload.
659    */ 
660   const void *data;
661
662   /**
663    * Expiration time for the block.
664    */
665   struct GNUNET_TIME_Absolute expiration;
666
667   /**
668    * Number of bytes in 'data'.
669    */
670   size_t data_size;
671
672   /** 
673    * Type of the block.
674    */
675   enum GNUNET_BLOCK_Type type;
676   
677   /**
678    * Did we have a matching query?
679    */
680   int found;
681 };
682
683
684 /**
685  * Iterator called on each entry in a waiting map to 
686  * process a result.
687  *
688  * @param cls the 'struct HandleReplyClosure'
689  * @param key the key of the entry in the map (the query)
690  * @param value the 'struct GSF_StreamRequest' to handle result for
691  * @return GNUNET_YES (continue to iterate)
692  */
693 static int
694 handle_reply (void *cls,
695               const struct GNUNET_HashCode *key,
696               void *value)
697 {
698   struct HandleReplyClosure *hrc = cls;
699   struct GSF_StreamRequest *sr = value;
700   
701   sr->proc (sr->proc_cls,
702             hrc->type,
703             hrc->expiration,
704             hrc->data_size,
705             hrc->data);
706   GSF_stream_query_cancel (sr);
707   hrc->found = GNUNET_YES;
708   return GNUNET_YES;
709 }
710
711
712 /**
713  * Functions with this signature are called whenever a
714  * complete reply is received.
715  *
716  * Do not call GNUNET_SERVER_mst_destroy in callback
717  *
718  * @param cls closure with the 'struct StreamHandle'
719  * @param client identification of the client, NULL
720  * @param message the actual message
721  * @return GNUNET_OK on success, GNUNET_SYSERR to stop further processing
722  */
723 static int
724 reply_cb (void *cls,
725           void *client,
726           const struct GNUNET_MessageHeader *message)
727 {
728   struct StreamHandle *sh = cls;
729   const struct StreamReplyMessage *srm;
730   struct HandleReplyClosure hrc;
731   uint16_t msize;
732   enum GNUNET_BLOCK_Type type;
733   struct GNUNET_HashCode query;
734
735   msize = ntohs (message->size);
736   switch (ntohs (message->type))
737   {
738   case GNUNET_MESSAGE_TYPE_FS_STREAM_REPLY:
739     if (sizeof (struct StreamReplyMessage) > msize)
740     {
741       GNUNET_break_op (0);
742       reset_stream_async (sh);
743       return GNUNET_SYSERR;
744     }
745     srm = (const struct StreamReplyMessage *) message;
746     msize -= sizeof (struct StreamReplyMessage);
747     type = (enum GNUNET_BLOCK_Type) ntohl (srm->type);
748     if (GNUNET_YES !=
749         GNUNET_BLOCK_get_key (GSF_block_ctx,
750                               type,
751                               &srm[1], msize, &query))
752     {
753       GNUNET_break_op (0); 
754       reset_stream_async (sh);
755       return GNUNET_SYSERR;
756     }
757     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
758                 "Received reply `%s' via stream\n",
759                 GNUNET_h2s (&query));
760     GNUNET_STATISTICS_update (GSF_stats,
761                               gettext_noop ("# replies received via stream"), 1,
762                               GNUNET_NO);
763     hrc.data = &srm[1];
764     hrc.data_size = msize;
765     hrc.expiration = GNUNET_TIME_absolute_ntoh (srm->expiration);
766     hrc.type = type;
767     hrc.found = GNUNET_NO;
768     GNUNET_CONTAINER_multihashmap_get_multiple (sh->waiting_map,
769                                                 &query,
770                                                 &handle_reply,
771                                                 &hrc);
772     if (GNUNET_NO == hrc.found)
773     {
774       GNUNET_STATISTICS_update (GSF_stats,
775                                 gettext_noop ("# replies received via stream dropped"), 1,
776                                 GNUNET_NO);
777       return GNUNET_OK;
778     }
779     return GNUNET_OK;
780   default:
781     GNUNET_break_op (0);
782     reset_stream_async (sh);
783     return GNUNET_SYSERR;
784   }
785 }
786
787
788 /**
789  * Get (or create) a stream to talk to the given peer.
790  *
791  * @param target peer we want to communicate with
792  */
793 static struct StreamHandle *
794 get_stream (const struct GNUNET_PeerIdentity *target)
795 {
796   struct StreamHandle *sh;
797
798   sh = GNUNET_CONTAINER_multihashmap_get (stream_map,
799                                           &target->hashPubKey);
800   if (NULL != sh)
801   {
802     if (GNUNET_SCHEDULER_NO_TASK != sh->timeout_task)
803     {
804       GNUNET_SCHEDULER_cancel (sh->timeout_task);
805       sh->timeout_task = GNUNET_SCHEDULER_NO_TASK;
806     }
807     return sh;
808   }
809   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
810               "Creating stream to %s\n",
811               GNUNET_i2s (target));
812   sh = GNUNET_malloc (sizeof (struct StreamHandle));
813   sh->mst = GNUNET_SERVER_mst_create (&reply_cb,
814                                       sh);
815   sh->waiting_map = GNUNET_CONTAINER_multihashmap_create (512, GNUNET_YES);
816   sh->target = *target;
817   sh->stream = GNUNET_STREAM_open (GSF_cfg,
818                                    &sh->target,
819                                    GNUNET_APPLICATION_TYPE_FS_BLOCK_TRANSFER,
820                                    &stream_ready_cb, sh,
821                                    GNUNET_STREAM_OPTION_END);
822   GNUNET_assert (GNUNET_OK ==
823                  GNUNET_CONTAINER_multihashmap_put (stream_map,
824                                                     &sh->target.hashPubKey,
825                                                     sh,
826                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
827   return sh;
828 }
829
830
831 /**
832  * Look for a block by directly contacting a particular peer.
833  *
834  * @param target peer that should have the block
835  * @param query hash to query for the block
836  * @param type desired type for the block
837  * @param proc function to call with result
838  * @param proc_cls closure for 'proc'
839  * @return handle to cancel the operation
840  */
841 struct GSF_StreamRequest *
842 GSF_stream_query (const struct GNUNET_PeerIdentity *target,
843                   const struct GNUNET_HashCode *query,
844                   enum GNUNET_BLOCK_Type type,
845                   GSF_StreamReplyProcessor proc, void *proc_cls)
846 {
847   struct StreamHandle *sh;
848   struct GSF_StreamRequest *sr;
849
850   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
851               "Preparing to send query for %s via stream to %s\n",
852               GNUNET_h2s (query),
853               GNUNET_i2s (target));
854   sh = get_stream (target);
855   sr = GNUNET_malloc (sizeof (struct GSF_StreamRequest));
856   sr->sh = sh;
857   sr->proc = proc;
858   sr->proc_cls = proc_cls;
859   sr->type = type;
860   sr->query = *query;
861   GNUNET_CONTAINER_DLL_insert (sh->pending_head,
862                                sh->pending_tail,
863                                sr);
864   if (GNUNET_YES == sh->is_ready)
865     transmit_pending (sh);
866   return sr;
867 }
868
869
870 /**
871  * Cancel an active request; must not be called after 'proc'
872  * was calld.
873  *
874  * @param sr request to cancel
875  */
876 void
877 GSF_stream_query_cancel (struct GSF_StreamRequest *sr)
878 {
879   struct StreamHandle *sh = sr->sh;
880
881   if (GNUNET_YES == sr->was_transmitted)
882     GNUNET_assert (GNUNET_OK ==
883                    GNUNET_CONTAINER_multihashmap_remove (sh->waiting_map,
884                                                          &sr->query,
885                                                          sr));
886   else
887     GNUNET_CONTAINER_DLL_remove (sh->pending_head,
888                                  sh->pending_tail,
889                                  sr);
890   GNUNET_free (sr);
891   if ( (0 == GNUNET_CONTAINER_multihashmap_size (sh->waiting_map)) &&
892        (NULL == sh->pending_head) )
893     sh->timeout_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS,
894                                                      &stream_timeout,
895                                                      sh);
896 }
897
898
899 /* ********************* server-side code ************************* */
900
901
902 /**
903  * We're done with a particular client, clean up.
904  *
905  * @param sc client to clean up
906  */
907 static void
908 terminate_stream (struct StreamClient *sc)
909 {
910   GNUNET_STATISTICS_update (GSF_stats,
911                             gettext_noop ("# stream connections active"), -1,
912                             GNUNET_NO);
913   if (GNUNET_SCHEDULER_NO_TASK != sc->terminate_task)
914     GNUNET_SCHEDULER_cancel (sc->terminate_task); 
915   if (GNUNET_SCHEDULER_NO_TASK != sc->timeout_task)
916     GNUNET_SCHEDULER_cancel (sc->timeout_task); 
917  if (NULL != sc->rh)
918     GNUNET_STREAM_io_read_cancel (sc->rh);
919   if (NULL != sc->wh)
920     GNUNET_STREAM_io_write_cancel (sc->wh);
921   if (NULL != sc->qe)
922     GNUNET_DATASTORE_cancel (sc->qe);
923   GNUNET_SERVER_mst_destroy (sc->mst);
924   GNUNET_STREAM_close (sc->socket);
925   struct WriteQueueItem *wqi;
926   while (NULL != (wqi = sc->wqi_head))
927   {
928     GNUNET_CONTAINER_DLL_remove (sc->wqi_head,
929                                  sc->wqi_tail,
930                                  wqi);
931     GNUNET_free (wqi);
932   }
933
934
935   GNUNET_CONTAINER_DLL_remove (sc_head,
936                                sc_tail,
937                                sc);
938   sc_count--;
939   GNUNET_free (sc);
940 }
941
942
943 /**
944  * Task run to asynchronously terminate the stream.
945  *
946  * @param cls the 'struct StreamClient'
947  * @param tc scheduler context
948  */ 
949 static void
950 terminate_stream_task (void *cls,
951                        const struct GNUNET_SCHEDULER_TaskContext *tc)
952 {
953   struct StreamClient *sc = cls;
954
955   sc->terminate_task = GNUNET_SCHEDULER_NO_TASK;
956   terminate_stream (sc);
957 }
958
959
960 /**
961  * Task run to asynchronously terminate the stream due to timeout.
962  *
963  * @param cls the 'struct StreamClient'
964  * @param tc scheduler context
965  */ 
966 static void
967 timeout_stream_task (void *cls,
968                      const struct GNUNET_SCHEDULER_TaskContext *tc)
969 {
970   struct StreamClient *sc = cls;
971
972   sc->timeout_task = GNUNET_SCHEDULER_NO_TASK;
973   terminate_stream (sc);
974 }
975
976
977 /**
978  * Reset the timeout for the stream client (due to activity).
979  *
980  * @param sc client handle to reset timeout for
981  */
982 static void
983 refresh_timeout_task (struct StreamClient *sc)
984 {
985   if (GNUNET_SCHEDULER_NO_TASK != sc->timeout_task)
986     GNUNET_SCHEDULER_cancel (sc->timeout_task); 
987   sc->timeout_task = GNUNET_SCHEDULER_add_delayed (IDLE_TIMEOUT,
988                                                    &timeout_stream_task,
989                                                    sc);
990 }
991
992
993 /**
994  * We had a serious error, termiante stream,
995  * but do so asynchronously.
996  *
997  * @param sc stream to reset
998  */
999 static void
1000 terminate_stream_async (struct StreamClient *sc)
1001 {
1002   if (GNUNET_SCHEDULER_NO_TASK == sc->terminate_task)
1003     sc->terminate_task = GNUNET_SCHEDULER_add_now (&terminate_stream_task,
1004                                                    sc);
1005 }
1006
1007
1008 /**
1009  * Functions of this signature are called whenever data is available from the
1010  * stream.
1011  *
1012  * @param cls the closure from GNUNET_STREAM_read
1013  * @param status the status of the stream at the time this function is called
1014  * @param data traffic from the other side
1015  * @param size the number of bytes available in data read; will be 0 on timeout 
1016  * @return number of bytes of processed from 'data' (any data remaining should be
1017  *         given to the next time the read processor is called).
1018  */
1019 static size_t 
1020 process_request (void *cls,
1021                  enum GNUNET_STREAM_Status status,
1022                  const void *data,
1023                  size_t size);
1024
1025
1026 /**
1027  * We're done handling a request from a client, read the next one.
1028  *
1029  * @param sc client to continue reading requests from
1030  */
1031 static void
1032 continue_reading (struct StreamClient *sc)
1033 {
1034   int ret;
1035
1036   ret = 
1037     GNUNET_SERVER_mst_receive (sc->mst,
1038                                NULL,
1039                                NULL, 0,
1040                                GNUNET_NO, GNUNET_YES);
1041   if (GNUNET_NO == ret)
1042     return; 
1043   refresh_timeout_task (sc);
1044   sc->rh = GNUNET_STREAM_read (sc->socket,
1045                                GNUNET_TIME_UNIT_FOREVER_REL,
1046                                &process_request,
1047                                sc);      
1048 }
1049
1050
1051 /**
1052  * Transmit the next entry from the write queue.
1053  *
1054  * @param sc where to process the write queue
1055  */
1056 static void
1057 continue_writing (struct StreamClient *sc);
1058
1059
1060 /**
1061  * Functions of this signature are called whenever data is available from the
1062  * stream.
1063  *
1064  * @param cls the closure from GNUNET_STREAM_read
1065  * @param status the status of the stream at the time this function is called
1066  * @param data traffic from the other side
1067  * @param size the number of bytes available in data read; will be 0 on timeout 
1068  * @return number of bytes of processed from 'data' (any data remaining should be
1069  *         given to the next time the read processor is called).
1070  */
1071 static size_t 
1072 process_request (void *cls,
1073                  enum GNUNET_STREAM_Status status,
1074                  const void *data,
1075                  size_t size)
1076 {
1077   struct StreamClient *sc = cls;
1078   int ret;
1079
1080   sc->rh = NULL;
1081   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1082               "Received %u byte query via stream\n",
1083               (unsigned int) size);
1084   switch (status)
1085   {
1086   case GNUNET_STREAM_OK:
1087     ret = 
1088       GNUNET_SERVER_mst_receive (sc->mst,
1089                                  NULL,
1090                                  data, size,
1091                                  GNUNET_NO, GNUNET_YES);
1092     if (GNUNET_NO == ret)
1093       return size; /* more messages in MST */
1094     if (GNUNET_SYSERR == ret)
1095     {
1096       GNUNET_break_op (0);
1097       terminate_stream_async (sc);
1098       return size;
1099     }
1100     break;
1101   case GNUNET_STREAM_TIMEOUT:
1102   case GNUNET_STREAM_SHUTDOWN:
1103   case GNUNET_STREAM_SYSERR:
1104     terminate_stream_async (sc);
1105     return size;
1106   default:
1107     GNUNET_break (0);
1108     return size;
1109   }
1110   continue_writing (sc);
1111   return size;
1112 }
1113
1114
1115 /**
1116  * Sending a reply was completed, continue processing.
1117  *
1118  * @param cls closure with the struct StreamClient which sent the query
1119  * @param status result code for the operation
1120  * @param size number of bytes that were transmitted
1121  */
1122 static void
1123 write_continuation (void *cls,
1124                     enum GNUNET_STREAM_Status status,
1125                     size_t size)
1126 {
1127   struct StreamClient *sc = cls;
1128   
1129   sc->wh = NULL;
1130   if ( (GNUNET_STREAM_OK != status) ||
1131        (size != sc->reply_size) )
1132   {
1133     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1134                 "Transmission of reply failed, terminating stream\n");
1135     terminate_stream (sc);    
1136     return;
1137   }
1138   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1139               "Transmitted %u byte reply via stream\n",
1140               (unsigned int) size);
1141   GNUNET_STATISTICS_update (GSF_stats,
1142                             gettext_noop ("# Blocks transferred via stream"), 1,
1143                             GNUNET_NO);
1144   continue_writing (sc);
1145 }
1146
1147
1148 /**
1149  * Transmit the next entry from the write queue.
1150  *
1151  * @param sc where to process the write queue
1152  */
1153 static void
1154 continue_writing (struct StreamClient *sc)
1155 {
1156   struct WriteQueueItem *wqi;
1157
1158   if (NULL != sc->wh)
1159     return; /* write already pending */
1160   if (NULL == (wqi = sc->wqi_head))
1161   {
1162     continue_reading (sc);
1163     return;
1164   }
1165   GNUNET_CONTAINER_DLL_remove (sc->wqi_head,
1166                                sc->wqi_tail,
1167                                wqi);
1168   sc->wh = GNUNET_STREAM_write (sc->socket,
1169                                 &wqi[1], wqi->msize,
1170                                 GNUNET_TIME_UNIT_FOREVER_REL,
1171                                 &write_continuation,
1172                                 sc);
1173   GNUNET_free (wqi);
1174   if (NULL == sc->wh)
1175   {
1176     terminate_stream (sc);
1177     return;
1178   }
1179 }
1180
1181
1182 /**
1183  * Process a datum that was stored in the datastore.
1184  *
1185  * @param cls closure with the struct StreamClient which sent the query
1186  * @param key key for the content
1187  * @param size number of bytes in data
1188  * @param data content stored
1189  * @param type type of the content
1190  * @param priority priority of the content
1191  * @param anonymity anonymity-level for the content
1192  * @param expiration expiration time for the content
1193  * @param uid unique identifier for the datum;
1194  *        maybe 0 if no unique identifier is available
1195  */
1196 static void 
1197 handle_datastore_reply (void *cls,
1198                         const struct GNUNET_HashCode * key,
1199                         size_t size, const void *data,
1200                         enum GNUNET_BLOCK_Type type,
1201                         uint32_t priority,
1202                         uint32_t anonymity,
1203                         struct GNUNET_TIME_Absolute
1204                         expiration, uint64_t uid)
1205 {
1206   struct StreamClient *sc = cls;
1207   size_t msize = size + sizeof (struct StreamReplyMessage);
1208   struct WriteQueueItem *wqi;
1209   struct StreamReplyMessage *srm;
1210
1211   sc->qe = NULL;
1212   if (GNUNET_BLOCK_TYPE_FS_ONDEMAND == type)
1213   {
1214     if (GNUNET_OK !=
1215         GNUNET_FS_handle_on_demand_block (key,
1216                                           size, data, type,
1217                                           priority, anonymity,
1218                                           expiration, uid,
1219                                           &handle_datastore_reply,
1220                                           sc))
1221     {
1222       continue_writing (sc);
1223     }
1224     return;
1225   }
1226   if (msize > GNUNET_SERVER_MAX_MESSAGE_SIZE)
1227   {
1228     GNUNET_break (0);
1229     continue_writing (sc);
1230     return;
1231   }
1232   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1233               "Starting transmission of %u byte reply via stream\n",
1234               (unsigned int) size);
1235   wqi = GNUNET_malloc (sizeof (struct WriteQueueItem) + msize);
1236   srm = (struct StreamReplyMessage *) &wqi[1];
1237   srm->header.size = htons ((uint16_t) msize);
1238   srm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_STREAM_REPLY);
1239   srm->type = htonl (type);
1240   srm->expiration = GNUNET_TIME_absolute_hton (expiration);
1241   memcpy (&srm[1], data, size);
1242   sc->reply_size = msize;
1243   GNUNET_CONTAINER_DLL_insert (sc->wqi_head,
1244                                sc->wqi_tail,
1245                                wqi);
1246   continue_writing (sc);
1247 }
1248
1249
1250 /**
1251  * Functions with this signature are called whenever a
1252  * complete query message is received.
1253  *
1254  * Do not call GNUNET_SERVER_mst_destroy in callback
1255  *
1256  * @param cls closure with the 'struct StreamClient'
1257  * @param client identification of the client, NULL
1258  * @param message the actual message
1259  * @return GNUNET_OK on success, GNUNET_SYSERR to stop further processing
1260  */
1261 static int
1262 request_cb (void *cls,
1263             void *client,
1264             const struct GNUNET_MessageHeader *message)
1265 {
1266   struct StreamClient *sc = cls;
1267   const struct StreamQueryMessage *sqm;
1268
1269   switch (ntohs (message->type))
1270   {
1271   case GNUNET_MESSAGE_TYPE_FS_STREAM_QUERY:
1272     if (sizeof (struct StreamQueryMessage) != 
1273         ntohs (message->size))
1274     {
1275       GNUNET_break_op (0);
1276       terminate_stream_async (sc);
1277       return GNUNET_SYSERR;
1278     }
1279     sqm = (const struct StreamQueryMessage *) message;
1280     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1281                 "Received query for `%s' via stream\n",
1282                 GNUNET_h2s (&sqm->query));
1283     GNUNET_STATISTICS_update (GSF_stats,
1284                               gettext_noop ("# queries received via stream"), 1,
1285                               GNUNET_NO);
1286     refresh_timeout_task (sc);
1287     sc->qe = GNUNET_DATASTORE_get_key (GSF_dsh,
1288                                        0,
1289                                        &sqm->query,
1290                                        ntohl (sqm->type),
1291                                        0 /* priority */, 
1292                                        GSF_datastore_queue_size,
1293                                        GNUNET_TIME_UNIT_FOREVER_REL,
1294                                        &handle_datastore_reply, sc);
1295     if (NULL == sc->qe)
1296       continue_writing (sc);
1297     return GNUNET_OK;
1298   default:
1299     GNUNET_break_op (0);
1300     terminate_stream_async (sc);
1301     return GNUNET_SYSERR;
1302   }
1303 }
1304
1305
1306 /**
1307  * Functions of this type are called upon new stream connection from other peers
1308  * or upon binding error which happen when the app_port given in
1309  * GNUNET_STREAM_listen() is already taken.
1310  *
1311  * @param cls the closure from GNUNET_STREAM_listen
1312  * @param socket the socket representing the stream; NULL on binding error
1313  * @param initiator the identity of the peer who wants to establish a stream
1314  *            with us; NULL on binding error
1315  * @return GNUNET_OK to keep the socket open, GNUNET_SYSERR to close the
1316  *             stream (the socket will be invalid after the call)
1317  */
1318 static int 
1319 accept_cb (void *cls,
1320            struct GNUNET_STREAM_Socket *socket,
1321            const struct GNUNET_PeerIdentity *initiator)
1322 {
1323   struct StreamClient *sc;
1324
1325   if (NULL == socket)
1326     return GNUNET_SYSERR;
1327   if (sc_count >= sc_count_max)
1328   {
1329     GNUNET_STATISTICS_update (GSF_stats,
1330                               gettext_noop ("# stream client connections rejected"), 1,
1331                               GNUNET_NO);
1332     return GNUNET_SYSERR;
1333   }
1334   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1335               "Accepting inbound stream connection from `%s'\n",
1336               GNUNET_i2s (initiator));
1337   GNUNET_STATISTICS_update (GSF_stats,
1338                             gettext_noop ("# stream connections active"), 1,
1339                             GNUNET_NO);
1340   sc = GNUNET_malloc (sizeof (struct StreamClient));
1341   sc->socket = socket;
1342   sc->mst = GNUNET_SERVER_mst_create (&request_cb,
1343                                       sc);
1344   sc->rh = GNUNET_STREAM_read (sc->socket,
1345                                GNUNET_TIME_UNIT_FOREVER_REL,
1346                                &process_request,
1347                                sc);
1348   GNUNET_CONTAINER_DLL_insert (sc_head,
1349                                sc_tail,
1350                                sc);
1351   sc_count++;
1352   refresh_timeout_task (sc);
1353   return GNUNET_OK;
1354 }
1355
1356
1357 /**
1358  * Initialize subsystem for non-anonymous file-sharing.
1359  */
1360 void
1361 GSF_stream_start ()
1362 {
1363   stream_map = GNUNET_CONTAINER_multihashmap_create (16, GNUNET_YES);
1364   if (GNUNET_YES ==
1365       GNUNET_CONFIGURATION_get_value_number (GSF_cfg,
1366                                              "fs",
1367                                              "MAX_STREAM_CLIENTS",
1368                                              &sc_count_max))
1369   {
1370     listen_socket = GNUNET_STREAM_listen (GSF_cfg,
1371                                           GNUNET_APPLICATION_TYPE_FS_BLOCK_TRANSFER,
1372                                           &accept_cb, NULL,
1373                                           GNUNET_STREAM_OPTION_END);
1374   } 
1375 }
1376
1377
1378 /**
1379  * Function called on each active streams to shut them down.
1380  *
1381  * @param cls NULL
1382  * @param key target peer, unused
1383  * @param value the 'struct StreamHandle' to destroy
1384  * @return GNUNET_YES (continue to iterate)
1385  */
1386 static int
1387 release_streams (void *cls,
1388                  const struct GNUNET_HashCode *key,
1389                  void *value)
1390 {
1391   struct StreamHandle *sh = value;
1392
1393   destroy_stream_handle (sh);
1394   return GNUNET_YES;
1395 }
1396
1397
1398 /**
1399  * Shutdown subsystem for non-anonymous file-sharing.
1400  */
1401 void
1402 GSF_stream_stop ()
1403 {
1404   struct StreamClient *sc;
1405
1406   while (NULL != (sc = sc_head))
1407     terminate_stream (sc);
1408   if (NULL != listen_socket)
1409   {
1410     GNUNET_STREAM_listen_close (listen_socket);
1411     listen_socket = NULL;
1412   }
1413   GNUNET_CONTAINER_multihashmap_iterate (stream_map,
1414                                          &release_streams,
1415                                          NULL);
1416   GNUNET_CONTAINER_multihashmap_destroy (stream_map);
1417   stream_map = NULL;
1418 }
1419
1420 /* end of gnunet-service-fs_stream.c */