towards using stream for non-anonymous file-sharing
[oweals/gnunet.git] / src / fs / gnunet-service-fs_stream.c
1 /*
2      This file is part of GNUnet.
3      (C) 2012 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file fs/gnunet-service-fs_stream.c
23  * @brief non-anonymous file-transfer
24  * @author Christian Grothoff
25  *
26  * TODO:
27  * - limit # concurrent clients, have timeouts for server-side
28  * - stream shutdown in callbacks from stream may not always work right now (check with stream_api!)
29  */
30 #include "platform.h"
31 #include "gnunet_constants.h"
32 #include "gnunet_util_lib.h"
33 #include "gnunet_stream_lib.h"
34 #include "gnunet_protocols.h"
35 #include "gnunet_applications.h"
36 #include "gnunet-service-fs.h"
37 #include "gnunet-service-fs_indexing.h"
38 #include "gnunet-service-fs_stream.h"
39
40 /**
41  * Information we keep around for each active streaming client.
42  */
43 struct StreamClient
44 {
45   /**
46    * DLL
47    */ 
48   struct StreamClient *next;
49
50   /**
51    * DLL
52    */ 
53   struct StreamClient *prev;
54
55   /**
56    * Socket for communication.
57    */ 
58   struct GNUNET_STREAM_Socket *socket;
59
60   /**
61    * Handle for active read operation, or NULL.
62    */ 
63   struct GNUNET_STREAM_IOReadHandle *rh;
64
65   /**
66    * Handle for active write operation, or NULL.
67    */ 
68   struct GNUNET_STREAM_IOWriteHandle *wh;
69   
70   /**
71    * Tokenizer for requests.
72    */
73   struct GNUNET_SERVER_MessageStreamTokenizer *mst;
74   
75   /**
76    * Current active request to the datastore, if we have one pending.
77    */
78   struct GNUNET_DATASTORE_QueueEntry *qe;
79
80   /**
81    * Size of the last write that was initiated.
82    */ 
83   size_t reply_size;
84
85 };
86
87
88 /**
89  * Query from one peer, asking the other for CHK-data.
90  */
91 struct StreamQueryMessage
92 {
93
94   /**
95    * Type is GNUNET_MESSAGE_TYPE_FS_STREAM_QUERY.
96    */
97   struct GNUNET_MessageHeader header;
98
99   /**
100    * Block type must be DBLOCK or IBLOCK.
101    */
102   uint32_t type;
103
104   /**
105    * Query hash from CHK (hash of encrypted block).
106    */
107   struct GNUNET_HashCode query;
108
109 };
110
111
112 /**
113  * Reply to a StreamQueryMessage.
114  */
115 struct StreamReplyMessage
116 {
117
118   /**
119    * Type is GNUNET_MESSAGE_TYPE_FS_STREAM_REPLY.
120    */
121   struct GNUNET_MessageHeader header;
122
123   /**
124    * Block type must be DBLOCK or IBLOCK.
125    */
126   uint32_t type;
127
128   /**
129    * Expiration time for the block.
130    */
131   struct GNUNET_TIME_AbsoluteNBO expiration;
132
133   /* followed by the encrypted block */
134
135 };
136
137
138 /** 
139  * Handle for a stream to another peer.
140  */
141 struct StreamHandle;
142
143
144 /**
145  * Handle for a request that is going out via stream API.
146  */
147 struct GSF_StreamRequest
148 {
149
150   /**
151    * DLL.
152    */
153   struct GSF_StreamRequest *next;
154
155   /**
156    * DLL.
157    */
158   struct GSF_StreamRequest *prev;
159
160   /**
161    * Which stream is this request associated with?
162    */
163   struct StreamHandle *sh;
164
165   /**
166    * Function to call with the result.
167    */
168   GSF_StreamReplyProcessor proc;
169
170   /**
171    * Closure for 'proc'
172    */
173   void *proc_cls;
174
175   /**
176    * Query to transmit to the other peer.
177    */
178   struct GNUNET_HashCode query;
179
180   /**
181    * Desired type for the reply.
182    */
183   enum GNUNET_BLOCK_Type type;
184
185   /**
186    * Did we transmit this request already? YES if we are
187    * in the 'waiting' DLL, NO if we are in the 'pending' DLL.
188    */
189   int was_transmitted;
190 };
191
192
193 /** 
194  * Handle for a stream to another peer.
195  */
196 struct StreamHandle
197 {
198   /**
199    * Head of DLL of pending requests on this stream.
200    */
201   struct GSF_StreamRequest *pending_head;
202
203   /**
204    * Tail of DLL of pending requests on this stream.
205    */
206   struct GSF_StreamRequest *pending_tail;
207
208   /**
209    * Head of DLL of requests waiting for a reply on this stream.
210    */
211   struct GSF_StreamRequest *waiting_head;
212
213   /**
214    * Tail of DLL of requests waiting for a reply on this stream.
215    */
216   struct GSF_StreamRequest *waiting_tail;
217
218   /**
219    * Connection to the other peer.
220    */
221   struct GNUNET_STREAM_Socket *stream;
222
223   /**
224    * Handle for active read operation, or NULL.
225    */ 
226   struct GNUNET_STREAM_IOReadHandle *rh;
227
228   /**
229    * Handle for active write operation, or NULL.
230    */ 
231   struct GNUNET_STREAM_IOWriteHandle *wh;
232
233   /**
234    * Tokenizer for replies.
235    */
236   struct GNUNET_SERVER_MessageStreamTokenizer *mst;
237
238   /**
239    * Which peer does this stream go to?
240    */ 
241   struct GNUNET_PeerIdentity target;
242
243   /**
244    * Task to kill inactive streams (we keep them around for
245    * a few seconds to give the application a chance to give
246    * us another query).
247    */
248   GNUNET_SCHEDULER_TaskIdentifier timeout_task;
249
250   /**
251    * Is this stream ready for transmission?
252    */
253   int is_ready;
254
255 };
256
257
258 /**
259  * Listen socket for incoming requests.
260  */
261 static struct GNUNET_STREAM_ListenSocket *listen_socket;
262
263 /**
264  * Head of DLL of stream clients.
265  */ 
266 static struct StreamClient *sc_head;
267
268 /**
269  * Tail of DLL of stream clients.
270  */ 
271 static struct StreamClient *sc_tail;
272
273 /**
274  * Map from peer identities to 'struct StreamHandles' with streams to
275  * those peers.
276  */
277 static struct GNUNET_CONTAINER_MultiHashMap *stream_map;
278
279
280 /* ********************* client-side code ************************* */
281
282
283 /**
284  * Destroy a stream handle.
285  *
286  * @param sh stream to process
287  */
288 static void
289 destroy_stream_handle (struct StreamHandle *sh)
290 {
291   struct GSF_StreamRequest *sr;
292
293   while (NULL != (sr = sh->pending_head))
294   {
295     sr->proc (sr->proc_cls, GNUNET_BLOCK_TYPE_ANY,
296               GNUNET_TIME_UNIT_FOREVER_ABS,
297               0, NULL);
298     GSF_stream_query_cancel (sr);
299   }
300   while (NULL != (sr = sh->waiting_head))
301   {
302     sr->proc (sr->proc_cls, GNUNET_BLOCK_TYPE_ANY,
303               GNUNET_TIME_UNIT_FOREVER_ABS,
304               0, NULL);
305     GSF_stream_query_cancel (sr);
306   }
307   if (NULL != sh->wh)
308     GNUNET_STREAM_io_write_cancel (sh->wh);
309   if (NULL != sh->rh)
310     GNUNET_STREAM_io_read_cancel (sh->rh);
311   if (GNUNET_SCHEDULER_NO_TASK != sh->timeout_task)
312     GNUNET_SCHEDULER_cancel (sh->timeout_task);
313   GNUNET_STREAM_close (sh->stream);
314   GNUNET_assert (GNUNET_OK ==
315                  GNUNET_CONTAINER_multihashmap_remove (stream_map,
316                                                        &sh->target.hashPubKey,
317                                                        sh));
318   GNUNET_free (sh);
319 }
320
321
322 /**
323  * Transmit pending requests via the stream.
324  *
325  * @param sh stream to process
326  */
327 static void
328 transmit_pending (struct StreamHandle *sh);
329
330
331 /**
332  * Function called once the stream is ready for transmission.
333  *
334  * @param cls the 'struct StreamHandle'
335  * @param socket stream socket handle
336  */
337 static void
338 stream_ready_cb (void *cls,
339                  struct GNUNET_STREAM_Socket *socket)
340 {
341   struct StreamHandle *sh = cls;
342
343   sh->is_ready = GNUNET_YES;
344   transmit_pending (sh);
345 }
346
347
348 /**
349  * We had a serious error, tear down and re-create stream from scratch.
350  *
351  * @param sh stream to reset
352  */
353 static void
354 reset_stream (struct StreamHandle *sh)
355 {
356   struct GSF_StreamRequest *sr;
357   
358   if (NULL != sh->rh)
359     GNUNET_STREAM_io_read_cancel (sh->rh);
360   GNUNET_STREAM_close (sh->stream);
361   sh->is_ready = GNUNET_NO;
362   while (NULL != (sr = sh->waiting_tail))
363   {
364     GNUNET_CONTAINER_DLL_remove (sh->waiting_head,
365                                  sh->waiting_tail,
366                                  sr);
367     GNUNET_CONTAINER_DLL_insert (sh->pending_head,
368                                  sh->pending_tail,
369                                  sr);
370     sr->was_transmitted = GNUNET_NO;
371   }
372   sh->stream = GNUNET_STREAM_open (GSF_cfg,
373                                    &sh->target,
374                                    GNUNET_APPLICATION_TYPE_FS_BLOCK_TRANSFER,
375                                    &stream_ready_cb, sh,
376                                    GNUNET_STREAM_OPTION_END);
377 }
378
379
380 /**
381  * We got a reply from the stream.  Process it.
382  *
383  * @param cls the struct StreamHandle 
384  * @param status the status of the stream at the time this function is called
385  * @param data traffic from the other side
386  * @param size the number of bytes available in data read; will be 0 on timeout 
387  * @return number of bytes of processed from 'data' (any data remaining should be
388  *         given to the next time the read processor is called).
389  */
390 static size_t
391 handle_stream_reply (void *cls,
392                      enum GNUNET_STREAM_Status status,
393                      const void *data,
394                      size_t size)
395 {
396   struct StreamHandle *sh = cls;
397
398   sh->rh = NULL;
399   if (GNUNET_SYSERR == 
400       GNUNET_SERVER_mst_receive (sh->mst,
401                                  NULL,
402                                  data, size,
403                                  GNUNET_NO, GNUNET_NO))
404   {
405     GNUNET_break_op (0);
406     reset_stream (sh);
407     return size;
408   }
409   sh->rh = GNUNET_STREAM_read (sh->stream,
410                                GNUNET_TIME_UNIT_FOREVER_REL,
411                                &handle_stream_reply,
412                                sh);
413   return size;
414 }
415
416
417 /**
418  * Functions of this signature are called whenever we transmitted a
419  * query via a stream.
420  *
421  * @param cls the struct StreamHandle for which we did the write call
422  * @param status the status of the stream at the time this function is called;
423  *          GNUNET_OK if writing to stream was completed successfully,
424  *          GNUNET_STREAM_SHUTDOWN if the stream is shutdown for writing in the
425  *          mean time.
426  * @param size the number of bytes written
427  */
428 static void
429 query_write_continuation (void *cls,
430                           enum GNUNET_STREAM_Status status,
431                           size_t size)
432 {
433   struct StreamHandle *sh = cls;
434
435   sh->wh = NULL;
436   if ( (GNUNET_STREAM_OK != status) ||
437        (sizeof (struct StreamQueryMessage) != size) )
438   {
439     reset_stream (sh);
440     return;
441   }
442   if (NULL == sh->rh)
443     sh->rh = GNUNET_STREAM_read (sh->stream,
444                                  GNUNET_TIME_UNIT_FOREVER_REL,
445                                  &handle_stream_reply,
446                                  sh);
447   transmit_pending (sh);
448 }
449           
450
451 /**
452  * Transmit pending requests via the stream.
453  *
454  * @param sh stream to process
455  */
456 static void
457 transmit_pending (struct StreamHandle *sh)
458 {
459   struct StreamQueryMessage sqm;
460   struct GSF_StreamRequest *sr;
461
462   if (NULL != sh->wh)
463     return;
464   sr = sh->pending_head;
465   if (NULL == sr)
466     return;
467   GNUNET_CONTAINER_DLL_remove (sh->pending_head,
468                                sh->pending_tail,
469                                sr);
470   GNUNET_CONTAINER_DLL_insert_tail (sh->waiting_head,
471                                     sh->waiting_tail,
472                                     sr);
473   sr->was_transmitted = GNUNET_YES;
474   sqm.header.size = htons (sizeof (sqm));
475   sqm.header.type = htons (GNUNET_MESSAGE_TYPE_FS_STREAM_QUERY);
476   sqm.type = htonl (sr->type);
477   sqm.query = sr->query;
478   sh->wh = GNUNET_STREAM_write (sh->stream,
479                                 &sqm, sizeof (sqm),
480                                 GNUNET_TIME_UNIT_FOREVER_REL,
481                                 &query_write_continuation,
482                                 sh);
483 }
484
485
486 /**
487  * Functions with this signature are called whenever a
488  * complete reply is received.
489  *
490  * Do not call GNUNET_SERVER_mst_destroy in callback
491  *
492  * @param cls closure with the 'struct StreamHandle'
493  * @param client identification of the client, NULL
494  * @param message the actual message
495  * @return GNUNET_OK on success, GNUNET_SYSERR to stop further processing
496  */
497 static int
498 reply_cb (void *cls,
499           void *client,
500           const struct GNUNET_MessageHeader *message)
501 {
502   struct StreamHandle *sh = cls;
503   const struct StreamReplyMessage *srm;
504   uint16_t msize;
505   enum GNUNET_BLOCK_Type type;
506   struct GNUNET_HashCode query;
507   struct GSF_StreamRequest *sr;
508
509   msize = ntohs (message->size);
510   switch (ntohs (message->type))
511   {
512   case GNUNET_MESSAGE_TYPE_FS_STREAM_REPLY:
513     if (sizeof (struct StreamReplyMessage) > msize)
514     {
515       GNUNET_break_op (0);
516       return GNUNET_SYSERR;
517     }
518     srm = (const struct StreamReplyMessage *) message;
519     msize -= sizeof (struct StreamReplyMessage);
520     type = (enum GNUNET_BLOCK_Type) ntohl (srm->type);
521     if (GNUNET_YES !=
522         GNUNET_BLOCK_get_key (GSF_block_ctx,
523                               type,
524                               &srm[1], msize, &query))
525     {
526       GNUNET_break_op (0);
527       return GNUNET_SYSERR;
528     }
529     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
530                 "Received reply `%s' via stream\n",
531                 GNUNET_h2s (&query));
532     GNUNET_STATISTICS_update (GSF_stats,
533                               gettext_noop ("# replies received via stream"), 1,
534                               GNUNET_NO);
535     for (sr = sh->waiting_head; NULL != sr; sr = sr->next)
536       if (0 == memcmp (&query,
537                        &sr->query,
538                        sizeof (struct GNUNET_HashCode)))
539         break;
540     if (NULL == sr)
541     {
542       GNUNET_STATISTICS_update (GSF_stats,
543                                 gettext_noop ("# replies received via stream dropped"), 1,
544                                 GNUNET_NO);
545       return GNUNET_OK;
546     }
547     sr->proc (sr->proc_cls,
548               type,
549               GNUNET_TIME_absolute_ntoh (srm->expiration),
550               msize,
551               &srm[1]);
552     GSF_stream_query_cancel (sr);
553     return GNUNET_OK;
554   default:
555     GNUNET_break_op (0);
556     return GNUNET_SYSERR;
557   }
558 }
559
560
561 /**
562  * Get (or create) a stream to talk to the given peer.
563  *
564  * @param target peer we want to communicate with
565  */
566 static struct StreamHandle *
567 get_stream (const struct GNUNET_PeerIdentity *target)
568 {
569   struct StreamHandle *sh;
570
571   sh = GNUNET_CONTAINER_multihashmap_get (stream_map,
572                                           &target->hashPubKey);
573   if (NULL != sh)
574   {
575     if (GNUNET_SCHEDULER_NO_TASK != sh->timeout_task)
576     {
577       GNUNET_SCHEDULER_cancel (sh->timeout_task);
578       sh->timeout_task = GNUNET_SCHEDULER_NO_TASK;
579     }
580     return sh;
581   }
582   sh = GNUNET_malloc (sizeof (struct StreamHandle));
583   sh->mst = GNUNET_SERVER_mst_create (&reply_cb,
584                                       sh);
585   sh->target = *target;
586   sh->stream = GNUNET_STREAM_open (GSF_cfg,
587                                    &sh->target,
588                                    GNUNET_APPLICATION_TYPE_FS_BLOCK_TRANSFER,
589                                    &stream_ready_cb, sh,
590                                    GNUNET_STREAM_OPTION_END);
591   GNUNET_assert (GNUNET_OK ==
592                  GNUNET_CONTAINER_multihashmap_put (stream_map,
593                                                     &sh->target.hashPubKey,
594                                                     sh,
595                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
596   return sh;
597 }
598
599
600 /**
601  * Look for a block by directly contacting a particular peer.
602  *
603  * @param target peer that should have the block
604  * @param query hash to query for the block
605  * @param type desired type for the block
606  * @param proc function to call with result
607  * @param proc_cls closure for 'proc'
608  * @return handle to cancel the operation
609  */
610 struct GSF_StreamRequest *
611 GSF_stream_query (const struct GNUNET_PeerIdentity *target,
612                   const struct GNUNET_HashCode *query,
613                   enum GNUNET_BLOCK_Type type,
614                   GSF_StreamReplyProcessor proc, void *proc_cls)
615 {
616   struct StreamHandle *sh;
617   struct GSF_StreamRequest *sr;
618
619   sh = get_stream (target);
620   sr = GNUNET_malloc (sizeof (struct GSF_StreamRequest));
621   sr->sh = sh;
622   sr->proc = proc;
623   sr->proc_cls = proc_cls;
624   sr->type = type;
625   sr->query = *query;
626   GNUNET_CONTAINER_DLL_insert (sh->pending_head,
627                                sh->pending_tail,
628                                sr);
629   if (GNUNET_YES == sh->is_ready)
630     transmit_pending (sh);
631   return sr;
632 }
633
634
635 /**
636  * Task called when it is time to destroy an inactive stream.
637  *
638  * @param cls the 'struct StreamHandle' to tear down
639  * @param tc scheduler context, unused
640  */
641 static void
642 stream_timeout (void *cls,
643                 const struct GNUNET_SCHEDULER_TaskContext *tc)
644 {
645   struct StreamHandle *sh = cls;
646
647   sh->timeout_task = GNUNET_SCHEDULER_NO_TASK;
648   destroy_stream_handle (sh);
649 }
650
651
652 /**
653  * Cancel an active request; must not be called after 'proc'
654  * was calld.
655  *
656  * @param sr request to cancel
657  */
658 void
659 GSF_stream_query_cancel (struct GSF_StreamRequest *sr)
660 {
661   struct StreamHandle *sh = sr->sh;
662
663   if (GNUNET_YES == sr->was_transmitted)
664     GNUNET_CONTAINER_DLL_remove (sh->waiting_head,
665                                  sh->waiting_tail,
666                                  sr);
667   else
668     GNUNET_CONTAINER_DLL_remove (sh->pending_head,
669                                  sh->pending_tail,
670                                  sr);
671   GNUNET_free (sr);
672   if ( (NULL == sh->waiting_head) &&
673        (NULL == sh->pending_head) )
674     sh->timeout_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS,
675                                                      &stream_timeout,
676                                                      sh);
677 }
678
679
680 /* ********************* server-side code ************************* */
681
682
683 /**
684  * We're done with a particular client, clean up.
685  *
686  * @param sc client to clean up
687  */
688 static void
689 terminate_stream (struct StreamClient *sc)
690 {
691   GNUNET_STATISTICS_update (GSF_stats,
692                             gettext_noop ("# stream connections active"), -1,
693                             GNUNET_NO);
694   if (NULL != sc->rh)
695     GNUNET_STREAM_io_read_cancel (sc->rh);
696   if (NULL != sc->wh)
697     GNUNET_STREAM_io_write_cancel (sc->wh);
698   if (NULL != sc->qe)
699     GNUNET_DATASTORE_cancel (sc->qe);
700   GNUNET_SERVER_mst_destroy (sc->mst);
701   GNUNET_STREAM_close (sc->socket);
702   GNUNET_CONTAINER_DLL_remove (sc_head,
703                                sc_tail,
704                                sc);
705   GNUNET_free (sc);
706 }
707
708
709 /**
710  * Functions of this signature are called whenever data is available from the
711  * stream.
712  *
713  * @param cls the closure from GNUNET_STREAM_read
714  * @param status the status of the stream at the time this function is called
715  * @param data traffic from the other side
716  * @param size the number of bytes available in data read; will be 0 on timeout 
717  * @return number of bytes of processed from 'data' (any data remaining should be
718  *         given to the next time the read processor is called).
719  */
720 static size_t 
721 process_request (void *cls,
722                  enum GNUNET_STREAM_Status status,
723                  const void *data,
724                  size_t size);
725
726
727 /**
728  * We're done handling a request from a client, read the next one.
729  *
730  * @param sc client to continue reading requests from
731  */
732 static void
733 continue_reading (struct StreamClient *sc)
734 {
735   int ret;
736
737   ret = 
738     GNUNET_SERVER_mst_receive (sc->mst,
739                                NULL,
740                                NULL, 0,
741                                GNUNET_NO, GNUNET_YES);
742   if (GNUNET_NO == ret)
743     return; 
744   sc->rh = GNUNET_STREAM_read (sc->socket,
745                                GNUNET_TIME_UNIT_FOREVER_REL,
746                                &process_request,
747                                sc);      
748 }
749
750
751 /**
752  * Functions of this signature are called whenever data is available from the
753  * stream.
754  *
755  * @param cls the closure from GNUNET_STREAM_read
756  * @param status the status of the stream at the time this function is called
757  * @param data traffic from the other side
758  * @param size the number of bytes available in data read; will be 0 on timeout 
759  * @return number of bytes of processed from 'data' (any data remaining should be
760  *         given to the next time the read processor is called).
761  */
762 static size_t 
763 process_request (void *cls,
764                  enum GNUNET_STREAM_Status status,
765                  const void *data,
766                  size_t size)
767 {
768   struct StreamClient *sc = cls;
769   int ret;
770
771   sc->rh = NULL;
772   switch (status)
773   {
774   case GNUNET_STREAM_OK:
775     ret = 
776       GNUNET_SERVER_mst_receive (sc->mst,
777                                  NULL,
778                                  data, size,
779                                  GNUNET_NO, GNUNET_YES);
780     if (GNUNET_NO == ret)
781       return size; /* more messages in MST */
782     if (GNUNET_SYSERR == ret)
783     {
784       GNUNET_break_op (0);
785       terminate_stream (sc);
786       return size;
787     }
788     break;
789   case GNUNET_STREAM_TIMEOUT:
790   case GNUNET_STREAM_SHUTDOWN:
791   case GNUNET_STREAM_SYSERR:
792   case GNUNET_STREAM_BROKEN:
793     terminate_stream (sc);
794     return size;
795   default:
796     GNUNET_break (0);
797     return size;
798   }
799   continue_reading (sc);
800   return size;
801 }
802
803
804 /**
805  * Sending a reply was completed, continue processing.
806  *
807  * @param cls closure with the struct StreamClient which sent the query
808  */
809 static void
810 write_continuation (void *cls,
811                     enum GNUNET_STREAM_Status status,
812                     size_t size)
813 {
814   struct StreamClient *sc = cls;
815   
816   sc->wh = NULL;
817   if ( (GNUNET_STREAM_OK == status) &&
818        (size == sc->reply_size) )
819   {
820     GNUNET_STATISTICS_update (GSF_stats,
821                               gettext_noop ("# Blocks transferred via stream"), 1,
822                               GNUNET_NO);
823     continue_reading (sc);
824   }
825   else
826     terminate_stream (sc);    
827 }
828
829
830 /**
831  * Process a datum that was stored in the datastore.
832  *
833  * @param cls closure with the struct StreamClient which sent the query
834  * @param key key for the content
835  * @param size number of bytes in data
836  * @param data content stored
837  * @param type type of the content
838  * @param priority priority of the content
839  * @param anonymity anonymity-level for the content
840  * @param expiration expiration time for the content
841  * @param uid unique identifier for the datum;
842  *        maybe 0 if no unique identifier is available
843  */
844 static void 
845 handle_datastore_reply (void *cls,
846                         const struct GNUNET_HashCode * key,
847                         size_t size, const void *data,
848                         enum GNUNET_BLOCK_Type type,
849                         uint32_t priority,
850                         uint32_t anonymity,
851                         struct GNUNET_TIME_Absolute
852                         expiration, uint64_t uid)
853 {
854   struct StreamClient *sc = cls;
855   size_t msize = size + sizeof (struct StreamReplyMessage);
856   char buf[msize] GNUNET_ALIGN;
857   struct StreamReplyMessage *srm = (struct StreamReplyMessage *) buf;
858
859   sc->qe = NULL;
860   if (GNUNET_BLOCK_TYPE_FS_ONDEMAND == type)
861   {
862     if (GNUNET_OK !=
863         GNUNET_FS_handle_on_demand_block (key,
864                                           size, data, type,
865                                           priority, anonymity,
866                                           expiration, uid,
867                                           &handle_datastore_reply,
868                                           sc))
869     {
870       continue_reading (sc);
871     }
872     return;
873   }
874   if (msize > GNUNET_SERVER_MAX_MESSAGE_SIZE)
875   {
876     GNUNET_break (0);
877     continue_reading (sc);
878     return;
879   }
880   srm->header.size = htons ((uint16_t) msize);
881   srm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_STREAM_REPLY);
882   srm->type = htonl (type);
883   srm->expiration = GNUNET_TIME_absolute_hton (expiration);
884   memcpy (&srm[1], data, size);
885   sc->reply_size = msize;
886   sc->wh = GNUNET_STREAM_write (sc->socket,
887                                 buf, msize,
888                                 GNUNET_TIME_UNIT_FOREVER_REL,
889                                 &write_continuation,
890                                 sc);
891   if (NULL == sc->wh)
892   {
893     terminate_stream (sc);
894     return;
895   }
896 }
897
898
899 /**
900  * Functions with this signature are called whenever a
901  * complete query message is received.
902  *
903  * Do not call GNUNET_SERVER_mst_destroy in callback
904  *
905  * @param cls closure with the 'struct StreamClient'
906  * @param client identification of the client, NULL
907  * @param message the actual message
908  * @return GNUNET_OK on success, GNUNET_SYSERR to stop further processing
909  */
910 static int
911 request_cb (void *cls,
912             void *client,
913             const struct GNUNET_MessageHeader *message)
914 {
915   struct StreamClient *sc = cls;
916   const struct StreamQueryMessage *sqm;
917
918   switch (ntohs (message->type))
919   {
920   case GNUNET_MESSAGE_TYPE_FS_STREAM_QUERY:
921     if (sizeof (struct StreamQueryMessage) != 
922         ntohs (message->size))
923     {
924       GNUNET_break_op (0);
925       return GNUNET_SYSERR;
926     }
927     sqm = (const struct StreamQueryMessage *) message;
928     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
929                 "Received query for `%s' via stream\n",
930                 GNUNET_h2s (&sqm->query));
931     GNUNET_STATISTICS_update (GSF_stats,
932                               gettext_noop ("# queries received via stream"), 1,
933                               GNUNET_NO);
934     sc->qe = GNUNET_DATASTORE_get_key (GSF_dsh,
935                                        0,
936                                        &sqm->query,
937                                        ntohl (sqm->type),
938                                        0 /* priority */, 
939                                        GSF_datastore_queue_size,
940                                        GNUNET_TIME_UNIT_FOREVER_REL,
941                                        &handle_datastore_reply, sc);
942     if (NULL == sc->qe)
943       continue_reading (sc);
944     return GNUNET_OK;
945   default:
946     GNUNET_break_op (0);
947     return GNUNET_SYSERR;
948   }
949 }
950
951
952 /**
953  * Functions of this type are called upon new stream connection from other peers
954  * or upon binding error which happen when the app_port given in
955  * GNUNET_STREAM_listen() is already taken.
956  *
957  * @param cls the closure from GNUNET_STREAM_listen
958  * @param socket the socket representing the stream; NULL on binding error
959  * @param initiator the identity of the peer who wants to establish a stream
960  *            with us; NULL on binding error
961  * @return GNUNET_OK to keep the socket open, GNUNET_SYSERR to close the
962  *             stream (the socket will be invalid after the call)
963  */
964 static int 
965 accept_cb (void *cls,
966            struct GNUNET_STREAM_Socket *socket,
967            const struct GNUNET_PeerIdentity *initiator)
968 {
969   struct StreamClient *sc;
970
971   if (NULL == socket)
972     return GNUNET_SYSERR;
973   GNUNET_STATISTICS_update (GSF_stats,
974                             gettext_noop ("# stream connections active"), 1,
975                             GNUNET_NO);
976   sc = GNUNET_malloc (sizeof (struct StreamClient));
977   sc->socket = socket;
978   sc->mst = GNUNET_SERVER_mst_create (&request_cb,
979                                       sc);
980   sc->rh = GNUNET_STREAM_read (sc->socket,
981                                GNUNET_TIME_UNIT_FOREVER_REL,
982                                &process_request,
983                                sc);
984   GNUNET_CONTAINER_DLL_insert (sc_head,
985                                sc_tail,
986                                sc);
987   return GNUNET_OK;
988 }
989
990
991 /**
992  * Initialize subsystem for non-anonymous file-sharing.
993  */
994 void
995 GSF_stream_start ()
996 {
997   stream_map = GNUNET_CONTAINER_multihashmap_create (16, GNUNET_YES);
998   listen_socket = GNUNET_STREAM_listen (GSF_cfg,
999                                         GNUNET_APPLICATION_TYPE_FS_BLOCK_TRANSFER,
1000                                         &accept_cb, NULL,
1001                                         GNUNET_STREAM_OPTION_END);
1002 }
1003
1004
1005 /**
1006  * Function called on each active streams to shut them down.
1007  *
1008  * @param cls NULL
1009  * @param key target peer, unused
1010  * @param value the 'struct StreamHandle' to destroy
1011  * @return GNUNET_YES (continue to iterate)
1012  */
1013 static int
1014 release_streams (void *cls,
1015                  const struct GNUNET_HashCode *key,
1016                  void *value)
1017 {
1018   struct StreamHandle *sh = value;
1019
1020   destroy_stream_handle (sh);
1021   return GNUNET_YES;
1022 }
1023
1024
1025 /**
1026  * Shutdown subsystem for non-anonymous file-sharing.
1027  */
1028 void
1029 GSF_stream_stop ()
1030 {
1031   struct StreamClient *sc;
1032
1033   while (NULL != (sc = sc_head))
1034     terminate_stream (sc);
1035   if (NULL != listen_socket)
1036   {
1037     GNUNET_STREAM_listen_close (listen_socket);
1038     listen_socket = NULL;
1039   }
1040   GNUNET_CONTAINER_multihashmap_iterate (stream_map,
1041                                          &release_streams,
1042                                          NULL);
1043   GNUNET_CONTAINER_multihashmap_destroy (stream_map);
1044   stream_map = NULL;
1045 }
1046
1047 /* end of gnunet-service-fs_stream.c */