-ensure that either stream_api calls callbacks last or that we don't destroy a stream...
[oweals/gnunet.git] / src / fs / gnunet-service-fs_stream.c
1 /*
2      This file is part of GNUnet.
3      (C) 2012 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file fs/gnunet-service-fs_stream.c
23  * @brief non-anonymous file-transfer
24  * @author Christian Grothoff
25  *
26  * TODO:
27  * - limit # concurrent clients, have timeouts for server-side
28  */
29 #include "platform.h"
30 #include "gnunet_constants.h"
31 #include "gnunet_util_lib.h"
32 #include "gnunet_stream_lib.h"
33 #include "gnunet_protocols.h"
34 #include "gnunet_applications.h"
35 #include "gnunet-service-fs.h"
36 #include "gnunet-service-fs_indexing.h"
37 #include "gnunet-service-fs_stream.h"
38
39 /**
40  * Information we keep around for each active streaming client.
41  */
42 struct StreamClient
43 {
44   /**
45    * DLL
46    */ 
47   struct StreamClient *next;
48
49   /**
50    * DLL
51    */ 
52   struct StreamClient *prev;
53
54   /**
55    * Socket for communication.
56    */ 
57   struct GNUNET_STREAM_Socket *socket;
58
59   /**
60    * Handle for active read operation, or NULL.
61    */ 
62   struct GNUNET_STREAM_IOReadHandle *rh;
63
64   /**
65    * Handle for active write operation, or NULL.
66    */ 
67   struct GNUNET_STREAM_IOWriteHandle *wh;
68   
69   /**
70    * Tokenizer for requests.
71    */
72   struct GNUNET_SERVER_MessageStreamTokenizer *mst;
73   
74   /**
75    * Current active request to the datastore, if we have one pending.
76    */
77   struct GNUNET_DATASTORE_QueueEntry *qe;
78
79   /**
80    * Task that is scheduled to asynchronously terminate the connection.
81    */
82   GNUNET_SCHEDULER_TaskIdentifier terminate_task;
83
84   /**
85    * Size of the last write that was initiated.
86    */ 
87   size_t reply_size;
88
89 };
90
91
92 /**
93  * Query from one peer, asking the other for CHK-data.
94  */
95 struct StreamQueryMessage
96 {
97
98   /**
99    * Type is GNUNET_MESSAGE_TYPE_FS_STREAM_QUERY.
100    */
101   struct GNUNET_MessageHeader header;
102
103   /**
104    * Block type must be DBLOCK or IBLOCK.
105    */
106   uint32_t type;
107
108   /**
109    * Query hash from CHK (hash of encrypted block).
110    */
111   struct GNUNET_HashCode query;
112
113 };
114
115
116 /**
117  * Reply to a StreamQueryMessage.
118  */
119 struct StreamReplyMessage
120 {
121
122   /**
123    * Type is GNUNET_MESSAGE_TYPE_FS_STREAM_REPLY.
124    */
125   struct GNUNET_MessageHeader header;
126
127   /**
128    * Block type must be DBLOCK or IBLOCK.
129    */
130   uint32_t type;
131
132   /**
133    * Expiration time for the block.
134    */
135   struct GNUNET_TIME_AbsoluteNBO expiration;
136
137   /* followed by the encrypted block */
138
139 };
140
141
142 /** 
143  * Handle for a stream to another peer.
144  */
145 struct StreamHandle;
146
147
148 /**
149  * Handle for a request that is going out via stream API.
150  */
151 struct GSF_StreamRequest
152 {
153
154   /**
155    * DLL.
156    */
157   struct GSF_StreamRequest *next;
158
159   /**
160    * DLL.
161    */
162   struct GSF_StreamRequest *prev;
163
164   /**
165    * Which stream is this request associated with?
166    */
167   struct StreamHandle *sh;
168
169   /**
170    * Function to call with the result.
171    */
172   GSF_StreamReplyProcessor proc;
173
174   /**
175    * Closure for 'proc'
176    */
177   void *proc_cls;
178
179   /**
180    * Query to transmit to the other peer.
181    */
182   struct GNUNET_HashCode query;
183
184   /**
185    * Desired type for the reply.
186    */
187   enum GNUNET_BLOCK_Type type;
188
189   /**
190    * Did we transmit this request already? YES if we are
191    * in the 'waiting' DLL, NO if we are in the 'pending' DLL.
192    */
193   int was_transmitted;
194 };
195
196
197 /** 
198  * Handle for a stream to another peer.
199  */
200 struct StreamHandle
201 {
202   /**
203    * Head of DLL of pending requests on this stream.
204    */
205   struct GSF_StreamRequest *pending_head;
206
207   /**
208    * Tail of DLL of pending requests on this stream.
209    */
210   struct GSF_StreamRequest *pending_tail;
211
212   /**
213    * Head of DLL of requests waiting for a reply on this stream.
214    */
215   struct GSF_StreamRequest *waiting_head;
216
217   /**
218    * Tail of DLL of requests waiting for a reply on this stream.
219    */
220   struct GSF_StreamRequest *waiting_tail;
221
222   /**
223    * Connection to the other peer.
224    */
225   struct GNUNET_STREAM_Socket *stream;
226
227   /**
228    * Handle for active read operation, or NULL.
229    */ 
230   struct GNUNET_STREAM_IOReadHandle *rh;
231
232   /**
233    * Handle for active write operation, or NULL.
234    */ 
235   struct GNUNET_STREAM_IOWriteHandle *wh;
236
237   /**
238    * Tokenizer for replies.
239    */
240   struct GNUNET_SERVER_MessageStreamTokenizer *mst;
241
242   /**
243    * Which peer does this stream go to?
244    */ 
245   struct GNUNET_PeerIdentity target;
246
247   /**
248    * Task to kill inactive streams (we keep them around for
249    * a few seconds to give the application a chance to give
250    * us another query).
251    */
252   GNUNET_SCHEDULER_TaskIdentifier timeout_task;
253
254   /**
255    * Task to reset streams that had errors (asynchronously,
256    * as we may not be able to do it immediately during a
257    * callback from the stream API).
258    */
259   GNUNET_SCHEDULER_TaskIdentifier reset_task;
260
261   /**
262    * Is this stream ready for transmission?
263    */
264   int is_ready;
265
266 };
267
268
269 /**
270  * Listen socket for incoming requests.
271  */
272 static struct GNUNET_STREAM_ListenSocket *listen_socket;
273
274 /**
275  * Head of DLL of stream clients.
276  */ 
277 static struct StreamClient *sc_head;
278
279 /**
280  * Tail of DLL of stream clients.
281  */ 
282 static struct StreamClient *sc_tail;
283
284 /**
285  * Map from peer identities to 'struct StreamHandles' with streams to
286  * those peers.
287  */
288 static struct GNUNET_CONTAINER_MultiHashMap *stream_map;
289
290
291 /* ********************* client-side code ************************* */
292
293
294 /**
295  * Destroy a stream handle.
296  *
297  * @param sh stream to process
298  */
299 static void
300 destroy_stream_handle (struct StreamHandle *sh)
301 {
302   struct GSF_StreamRequest *sr;
303
304   while (NULL != (sr = sh->pending_head))
305   {
306     sr->proc (sr->proc_cls, GNUNET_BLOCK_TYPE_ANY,
307               GNUNET_TIME_UNIT_FOREVER_ABS,
308               0, NULL);
309     GSF_stream_query_cancel (sr);
310   }
311   while (NULL != (sr = sh->waiting_head))
312   {
313     sr->proc (sr->proc_cls, GNUNET_BLOCK_TYPE_ANY,
314               GNUNET_TIME_UNIT_FOREVER_ABS,
315               0, NULL);
316     GSF_stream_query_cancel (sr);
317   }
318   if (NULL != sh->wh)
319     GNUNET_STREAM_io_write_cancel (sh->wh);
320   if (NULL != sh->rh)
321     GNUNET_STREAM_io_read_cancel (sh->rh);
322   if (GNUNET_SCHEDULER_NO_TASK != sh->timeout_task)
323     GNUNET_SCHEDULER_cancel (sh->timeout_task);
324   GNUNET_STREAM_close (sh->stream);
325   GNUNET_assert (GNUNET_OK ==
326                  GNUNET_CONTAINER_multihashmap_remove (stream_map,
327                                                        &sh->target.hashPubKey,
328                                                        sh));
329   GNUNET_free (sh);
330 }
331
332
333 /**
334  * Transmit pending requests via the stream.
335  *
336  * @param sh stream to process
337  */
338 static void
339 transmit_pending (struct StreamHandle *sh);
340
341
342 /**
343  * Function called once the stream is ready for transmission.
344  *
345  * @param cls the 'struct StreamHandle'
346  * @param socket stream socket handle
347  */
348 static void
349 stream_ready_cb (void *cls,
350                  struct GNUNET_STREAM_Socket *socket)
351 {
352   struct StreamHandle *sh = cls;
353
354   sh->is_ready = GNUNET_YES;
355   transmit_pending (sh);
356 }
357
358
359 /**
360  * We had a serious error, tear down and re-create stream from scratch.
361  *
362  * @param sh stream to reset
363  */
364 static void
365 reset_stream (struct StreamHandle *sh)
366 {
367   struct GSF_StreamRequest *sr;
368   
369   if (NULL != sh->rh)
370     GNUNET_STREAM_io_read_cancel (sh->rh);
371   GNUNET_STREAM_close (sh->stream);
372   sh->is_ready = GNUNET_NO;
373   while (NULL != (sr = sh->waiting_tail))
374   {
375     GNUNET_CONTAINER_DLL_remove (sh->waiting_head,
376                                  sh->waiting_tail,
377                                  sr);
378     GNUNET_CONTAINER_DLL_insert (sh->pending_head,
379                                  sh->pending_tail,
380                                  sr);
381     sr->was_transmitted = GNUNET_NO;
382   }
383   sh->stream = GNUNET_STREAM_open (GSF_cfg,
384                                    &sh->target,
385                                    GNUNET_APPLICATION_TYPE_FS_BLOCK_TRANSFER,
386                                    &stream_ready_cb, sh,
387                                    GNUNET_STREAM_OPTION_END);
388 }
389
390
391 /**
392  * Task called when it is time to destroy an inactive stream.
393  *
394  * @param cls the 'struct StreamHandle' to tear down
395  * @param tc scheduler context, unused
396  */
397 static void
398 stream_timeout (void *cls,
399                 const struct GNUNET_SCHEDULER_TaskContext *tc)
400 {
401   struct StreamHandle *sh = cls;
402
403   sh->timeout_task = GNUNET_SCHEDULER_NO_TASK;
404   destroy_stream_handle (sh);
405 }
406
407
408 /**
409  * Task called when it is time to reset an stream.
410  *
411  * @param cls the 'struct StreamHandle' to tear down
412  * @param tc scheduler context, unused
413  */
414 static void
415 reset_stream_task (void *cls,
416                    const struct GNUNET_SCHEDULER_TaskContext *tc)
417 {
418   struct StreamHandle *sh = cls;
419
420   sh->reset_task = GNUNET_SCHEDULER_NO_TASK;
421   reset_stream (sh);
422 }
423
424
425 /**
426  * We had a serious error, tear down and re-create stream from scratch,
427  * but do so asynchronously.
428  *
429  * @param sh stream to reset
430  */
431 static void
432 reset_stream_async (struct StreamHandle *sh)
433 {
434   if (GNUNET_SCHEDULER_NO_TASK == sh->reset_task)
435     sh->reset_task = GNUNET_SCHEDULER_add_now (&reset_stream_task,
436                                                sh);
437 }
438
439
440 /**
441  * We got a reply from the stream.  Process it.
442  *
443  * @param cls the struct StreamHandle 
444  * @param status the status of the stream at the time this function is called
445  * @param data traffic from the other side
446  * @param size the number of bytes available in data read; will be 0 on timeout 
447  * @return number of bytes of processed from 'data' (any data remaining should be
448  *         given to the next time the read processor is called).
449  */
450 static size_t
451 handle_stream_reply (void *cls,
452                      enum GNUNET_STREAM_Status status,
453                      const void *data,
454                      size_t size)
455 {
456   struct StreamHandle *sh = cls;
457
458   sh->rh = NULL;
459   if (GNUNET_SYSERR == 
460       GNUNET_SERVER_mst_receive (sh->mst,
461                                  NULL,
462                                  data, size,
463                                  GNUNET_NO, GNUNET_NO))
464   {
465     GNUNET_break_op (0);
466     reset_stream_async (sh);
467     return size;
468   }
469   sh->rh = GNUNET_STREAM_read (sh->stream,
470                                GNUNET_TIME_UNIT_FOREVER_REL,
471                                &handle_stream_reply,
472                                sh);
473   return size;
474 }
475
476
477 /**
478  * Functions of this signature are called whenever we transmitted a
479  * query via a stream.
480  *
481  * @param cls the struct StreamHandle for which we did the write call
482  * @param status the status of the stream at the time this function is called;
483  *          GNUNET_OK if writing to stream was completed successfully,
484  *          GNUNET_STREAM_SHUTDOWN if the stream is shutdown for writing in the
485  *          mean time.
486  * @param size the number of bytes written
487  */
488 static void
489 query_write_continuation (void *cls,
490                           enum GNUNET_STREAM_Status status,
491                           size_t size)
492 {
493   struct StreamHandle *sh = cls;
494
495   sh->wh = NULL;
496   if ( (GNUNET_STREAM_OK != status) ||
497        (sizeof (struct StreamQueryMessage) != size) )
498   {
499     reset_stream (sh);
500     return;
501   }
502   if (NULL == sh->rh)
503     sh->rh = GNUNET_STREAM_read (sh->stream,
504                                  GNUNET_TIME_UNIT_FOREVER_REL,
505                                  &handle_stream_reply,
506                                  sh);
507   transmit_pending (sh);
508 }
509           
510
511 /**
512  * Transmit pending requests via the stream.
513  *
514  * @param sh stream to process
515  */
516 static void
517 transmit_pending (struct StreamHandle *sh)
518 {
519   struct StreamQueryMessage sqm;
520   struct GSF_StreamRequest *sr;
521
522   if (NULL != sh->wh)
523     return;
524   sr = sh->pending_head;
525   if (NULL == sr)
526     return;
527   GNUNET_CONTAINER_DLL_remove (sh->pending_head,
528                                sh->pending_tail,
529                                sr);
530   GNUNET_CONTAINER_DLL_insert_tail (sh->waiting_head,
531                                     sh->waiting_tail,
532                                     sr);
533   sr->was_transmitted = GNUNET_YES;
534   sqm.header.size = htons (sizeof (sqm));
535   sqm.header.type = htons (GNUNET_MESSAGE_TYPE_FS_STREAM_QUERY);
536   sqm.type = htonl (sr->type);
537   sqm.query = sr->query;
538   sh->wh = GNUNET_STREAM_write (sh->stream,
539                                 &sqm, sizeof (sqm),
540                                 GNUNET_TIME_UNIT_FOREVER_REL,
541                                 &query_write_continuation,
542                                 sh);
543 }
544
545
546 /**
547  * Functions with this signature are called whenever a
548  * complete reply is received.
549  *
550  * Do not call GNUNET_SERVER_mst_destroy in callback
551  *
552  * @param cls closure with the 'struct StreamHandle'
553  * @param client identification of the client, NULL
554  * @param message the actual message
555  * @return GNUNET_OK on success, GNUNET_SYSERR to stop further processing
556  */
557 static int
558 reply_cb (void *cls,
559           void *client,
560           const struct GNUNET_MessageHeader *message)
561 {
562   struct StreamHandle *sh = cls;
563   const struct StreamReplyMessage *srm;
564   uint16_t msize;
565   enum GNUNET_BLOCK_Type type;
566   struct GNUNET_HashCode query;
567   struct GSF_StreamRequest *sr;
568
569   msize = ntohs (message->size);
570   switch (ntohs (message->type))
571   {
572   case GNUNET_MESSAGE_TYPE_FS_STREAM_REPLY:
573     if (sizeof (struct StreamReplyMessage) > msize)
574     {
575       GNUNET_break_op (0);
576       reset_stream_async (sh);
577       return GNUNET_SYSERR;
578     }
579     srm = (const struct StreamReplyMessage *) message;
580     msize -= sizeof (struct StreamReplyMessage);
581     type = (enum GNUNET_BLOCK_Type) ntohl (srm->type);
582     if (GNUNET_YES !=
583         GNUNET_BLOCK_get_key (GSF_block_ctx,
584                               type,
585                               &srm[1], msize, &query))
586     {
587       GNUNET_break_op (0); 
588       reset_stream_async (sh);
589       return GNUNET_SYSERR;
590     }
591     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
592                 "Received reply `%s' via stream\n",
593                 GNUNET_h2s (&query));
594     GNUNET_STATISTICS_update (GSF_stats,
595                               gettext_noop ("# replies received via stream"), 1,
596                               GNUNET_NO);
597     for (sr = sh->waiting_head; NULL != sr; sr = sr->next)
598       if (0 == memcmp (&query,
599                        &sr->query,
600                        sizeof (struct GNUNET_HashCode)))
601         break;
602     if (NULL == sr)
603     {
604       GNUNET_STATISTICS_update (GSF_stats,
605                                 gettext_noop ("# replies received via stream dropped"), 1,
606                                 GNUNET_NO);
607       return GNUNET_OK;
608     }
609     sr->proc (sr->proc_cls,
610               type,
611               GNUNET_TIME_absolute_ntoh (srm->expiration),
612               msize,
613               &srm[1]);
614     GSF_stream_query_cancel (sr);
615     return GNUNET_OK;
616   default:
617     GNUNET_break_op (0);
618     reset_stream_async (sh);
619     return GNUNET_SYSERR;
620   }
621 }
622
623
624 /**
625  * Get (or create) a stream to talk to the given peer.
626  *
627  * @param target peer we want to communicate with
628  */
629 static struct StreamHandle *
630 get_stream (const struct GNUNET_PeerIdentity *target)
631 {
632   struct StreamHandle *sh;
633
634   sh = GNUNET_CONTAINER_multihashmap_get (stream_map,
635                                           &target->hashPubKey);
636   if (NULL != sh)
637   {
638     if (GNUNET_SCHEDULER_NO_TASK != sh->timeout_task)
639     {
640       GNUNET_SCHEDULER_cancel (sh->timeout_task);
641       sh->timeout_task = GNUNET_SCHEDULER_NO_TASK;
642     }
643     return sh;
644   }
645   sh = GNUNET_malloc (sizeof (struct StreamHandle));
646   sh->mst = GNUNET_SERVER_mst_create (&reply_cb,
647                                       sh);
648   sh->target = *target;
649   sh->stream = GNUNET_STREAM_open (GSF_cfg,
650                                    &sh->target,
651                                    GNUNET_APPLICATION_TYPE_FS_BLOCK_TRANSFER,
652                                    &stream_ready_cb, sh,
653                                    GNUNET_STREAM_OPTION_END);
654   GNUNET_assert (GNUNET_OK ==
655                  GNUNET_CONTAINER_multihashmap_put (stream_map,
656                                                     &sh->target.hashPubKey,
657                                                     sh,
658                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
659   return sh;
660 }
661
662
663 /**
664  * Look for a block by directly contacting a particular peer.
665  *
666  * @param target peer that should have the block
667  * @param query hash to query for the block
668  * @param type desired type for the block
669  * @param proc function to call with result
670  * @param proc_cls closure for 'proc'
671  * @return handle to cancel the operation
672  */
673 struct GSF_StreamRequest *
674 GSF_stream_query (const struct GNUNET_PeerIdentity *target,
675                   const struct GNUNET_HashCode *query,
676                   enum GNUNET_BLOCK_Type type,
677                   GSF_StreamReplyProcessor proc, void *proc_cls)
678 {
679   struct StreamHandle *sh;
680   struct GSF_StreamRequest *sr;
681
682   sh = get_stream (target);
683   sr = GNUNET_malloc (sizeof (struct GSF_StreamRequest));
684   sr->sh = sh;
685   sr->proc = proc;
686   sr->proc_cls = proc_cls;
687   sr->type = type;
688   sr->query = *query;
689   GNUNET_CONTAINER_DLL_insert (sh->pending_head,
690                                sh->pending_tail,
691                                sr);
692   if (GNUNET_YES == sh->is_ready)
693     transmit_pending (sh);
694   return sr;
695 }
696
697
698 /**
699  * Cancel an active request; must not be called after 'proc'
700  * was calld.
701  *
702  * @param sr request to cancel
703  */
704 void
705 GSF_stream_query_cancel (struct GSF_StreamRequest *sr)
706 {
707   struct StreamHandle *sh = sr->sh;
708
709   if (GNUNET_YES == sr->was_transmitted)
710     GNUNET_CONTAINER_DLL_remove (sh->waiting_head,
711                                  sh->waiting_tail,
712                                  sr);
713   else
714     GNUNET_CONTAINER_DLL_remove (sh->pending_head,
715                                  sh->pending_tail,
716                                  sr);
717   GNUNET_free (sr);
718   if ( (NULL == sh->waiting_head) &&
719        (NULL == sh->pending_head) )
720     sh->timeout_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS,
721                                                      &stream_timeout,
722                                                      sh);
723 }
724
725
726 /* ********************* server-side code ************************* */
727
728
729 /**
730  * We're done with a particular client, clean up.
731  *
732  * @param sc client to clean up
733  */
734 static void
735 terminate_stream (struct StreamClient *sc)
736 {
737   GNUNET_STATISTICS_update (GSF_stats,
738                             gettext_noop ("# stream connections active"), -1,
739                             GNUNET_NO);
740   if (GNUNET_SCHEDULER_NO_TASK != sc->terminate_task)
741     GNUNET_SCHEDULER_cancel (sc->terminate_task); 
742  if (NULL != sc->rh)
743     GNUNET_STREAM_io_read_cancel (sc->rh);
744   if (NULL != sc->wh)
745     GNUNET_STREAM_io_write_cancel (sc->wh);
746   if (NULL != sc->qe)
747     GNUNET_DATASTORE_cancel (sc->qe);
748   GNUNET_SERVER_mst_destroy (sc->mst);
749   GNUNET_STREAM_close (sc->socket);
750   GNUNET_CONTAINER_DLL_remove (sc_head,
751                                sc_tail,
752                                sc);
753   GNUNET_free (sc);
754 }
755
756
757 /**
758  * Task run to asynchronously terminate the stream.
759  *
760  * @param cls the 'struct StreamClient'
761  * @param tc scheduler context
762  */ 
763 static void
764 terminate_stream_task (void *cls,
765                        const struct GNUNET_SCHEDULER_TaskContext *tc)
766 {
767   struct StreamClient *sc = cls;
768
769   sc->terminate_task = GNUNET_SCHEDULER_NO_TASK;
770   terminate_stream (sc);
771 }
772
773
774 /**
775  * We had a serious error, termiante stream,
776  * but do so asynchronously.
777  *
778  * @param sc stream to reset
779  */
780 static void
781 terminate_stream_async (struct StreamClient *sc)
782 {
783   if (GNUNET_SCHEDULER_NO_TASK == sc->terminate_task)
784     sc->terminate_task = GNUNET_SCHEDULER_add_now (&terminate_stream_task,
785                                                    sc);
786 }
787
788
789 /**
790  * Functions of this signature are called whenever data is available from the
791  * stream.
792  *
793  * @param cls the closure from GNUNET_STREAM_read
794  * @param status the status of the stream at the time this function is called
795  * @param data traffic from the other side
796  * @param size the number of bytes available in data read; will be 0 on timeout 
797  * @return number of bytes of processed from 'data' (any data remaining should be
798  *         given to the next time the read processor is called).
799  */
800 static size_t 
801 process_request (void *cls,
802                  enum GNUNET_STREAM_Status status,
803                  const void *data,
804                  size_t size);
805
806
807 /**
808  * We're done handling a request from a client, read the next one.
809  *
810  * @param sc client to continue reading requests from
811  */
812 static void
813 continue_reading (struct StreamClient *sc)
814 {
815   int ret;
816
817   ret = 
818     GNUNET_SERVER_mst_receive (sc->mst,
819                                NULL,
820                                NULL, 0,
821                                GNUNET_NO, GNUNET_YES);
822   if (GNUNET_NO == ret)
823     return; 
824   sc->rh = GNUNET_STREAM_read (sc->socket,
825                                GNUNET_TIME_UNIT_FOREVER_REL,
826                                &process_request,
827                                sc);      
828 }
829
830
831 /**
832  * Functions of this signature are called whenever data is available from the
833  * stream.
834  *
835  * @param cls the closure from GNUNET_STREAM_read
836  * @param status the status of the stream at the time this function is called
837  * @param data traffic from the other side
838  * @param size the number of bytes available in data read; will be 0 on timeout 
839  * @return number of bytes of processed from 'data' (any data remaining should be
840  *         given to the next time the read processor is called).
841  */
842 static size_t 
843 process_request (void *cls,
844                  enum GNUNET_STREAM_Status status,
845                  const void *data,
846                  size_t size)
847 {
848   struct StreamClient *sc = cls;
849   int ret;
850
851   sc->rh = NULL;
852   switch (status)
853   {
854   case GNUNET_STREAM_OK:
855     ret = 
856       GNUNET_SERVER_mst_receive (sc->mst,
857                                  NULL,
858                                  data, size,
859                                  GNUNET_NO, GNUNET_YES);
860     if (GNUNET_NO == ret)
861       return size; /* more messages in MST */
862     if (GNUNET_SYSERR == ret)
863     {
864       GNUNET_break_op (0);
865       terminate_stream_async (sc);
866       return size;
867     }
868     break;
869   case GNUNET_STREAM_TIMEOUT:
870   case GNUNET_STREAM_SHUTDOWN:
871   case GNUNET_STREAM_SYSERR:
872   case GNUNET_STREAM_BROKEN:
873     terminate_stream_async (sc);
874     return size;
875   default:
876     GNUNET_break (0);
877     return size;
878   }
879   continue_reading (sc);
880   return size;
881 }
882
883
884 /**
885  * Sending a reply was completed, continue processing.
886  *
887  * @param cls closure with the struct StreamClient which sent the query
888  */
889 static void
890 write_continuation (void *cls,
891                     enum GNUNET_STREAM_Status status,
892                     size_t size)
893 {
894   struct StreamClient *sc = cls;
895   
896   sc->wh = NULL;
897   if ( (GNUNET_STREAM_OK == status) &&
898        (size == sc->reply_size) )
899   {
900     GNUNET_STATISTICS_update (GSF_stats,
901                               gettext_noop ("# Blocks transferred via stream"), 1,
902                               GNUNET_NO);
903     continue_reading (sc);
904   }
905   else
906     terminate_stream (sc);    
907 }
908
909
910 /**
911  * Process a datum that was stored in the datastore.
912  *
913  * @param cls closure with the struct StreamClient which sent the query
914  * @param key key for the content
915  * @param size number of bytes in data
916  * @param data content stored
917  * @param type type of the content
918  * @param priority priority of the content
919  * @param anonymity anonymity-level for the content
920  * @param expiration expiration time for the content
921  * @param uid unique identifier for the datum;
922  *        maybe 0 if no unique identifier is available
923  */
924 static void 
925 handle_datastore_reply (void *cls,
926                         const struct GNUNET_HashCode * key,
927                         size_t size, const void *data,
928                         enum GNUNET_BLOCK_Type type,
929                         uint32_t priority,
930                         uint32_t anonymity,
931                         struct GNUNET_TIME_Absolute
932                         expiration, uint64_t uid)
933 {
934   struct StreamClient *sc = cls;
935   size_t msize = size + sizeof (struct StreamReplyMessage);
936   char buf[msize] GNUNET_ALIGN;
937   struct StreamReplyMessage *srm = (struct StreamReplyMessage *) buf;
938
939   sc->qe = NULL;
940   if (GNUNET_BLOCK_TYPE_FS_ONDEMAND == type)
941   {
942     if (GNUNET_OK !=
943         GNUNET_FS_handle_on_demand_block (key,
944                                           size, data, type,
945                                           priority, anonymity,
946                                           expiration, uid,
947                                           &handle_datastore_reply,
948                                           sc))
949     {
950       continue_reading (sc);
951     }
952     return;
953   }
954   if (msize > GNUNET_SERVER_MAX_MESSAGE_SIZE)
955   {
956     GNUNET_break (0);
957     continue_reading (sc);
958     return;
959   }
960   srm->header.size = htons ((uint16_t) msize);
961   srm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_STREAM_REPLY);
962   srm->type = htonl (type);
963   srm->expiration = GNUNET_TIME_absolute_hton (expiration);
964   memcpy (&srm[1], data, size);
965   sc->reply_size = msize;
966   sc->wh = GNUNET_STREAM_write (sc->socket,
967                                 buf, msize,
968                                 GNUNET_TIME_UNIT_FOREVER_REL,
969                                 &write_continuation,
970                                 sc);
971   if (NULL == sc->wh)
972   {
973     terminate_stream (sc);
974     return;
975   }
976 }
977
978
979 /**
980  * Functions with this signature are called whenever a
981  * complete query message is received.
982  *
983  * Do not call GNUNET_SERVER_mst_destroy in callback
984  *
985  * @param cls closure with the 'struct StreamClient'
986  * @param client identification of the client, NULL
987  * @param message the actual message
988  * @return GNUNET_OK on success, GNUNET_SYSERR to stop further processing
989  */
990 static int
991 request_cb (void *cls,
992             void *client,
993             const struct GNUNET_MessageHeader *message)
994 {
995   struct StreamClient *sc = cls;
996   const struct StreamQueryMessage *sqm;
997
998   switch (ntohs (message->type))
999   {
1000   case GNUNET_MESSAGE_TYPE_FS_STREAM_QUERY:
1001     if (sizeof (struct StreamQueryMessage) != 
1002         ntohs (message->size))
1003     {
1004       GNUNET_break_op (0);
1005       terminate_stream_async (sc);
1006       return GNUNET_SYSERR;
1007     }
1008     sqm = (const struct StreamQueryMessage *) message;
1009     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1010                 "Received query for `%s' via stream\n",
1011                 GNUNET_h2s (&sqm->query));
1012     GNUNET_STATISTICS_update (GSF_stats,
1013                               gettext_noop ("# queries received via stream"), 1,
1014                               GNUNET_NO);
1015     sc->qe = GNUNET_DATASTORE_get_key (GSF_dsh,
1016                                        0,
1017                                        &sqm->query,
1018                                        ntohl (sqm->type),
1019                                        0 /* priority */, 
1020                                        GSF_datastore_queue_size,
1021                                        GNUNET_TIME_UNIT_FOREVER_REL,
1022                                        &handle_datastore_reply, sc);
1023     if (NULL == sc->qe)
1024       continue_reading (sc);
1025     return GNUNET_OK;
1026   default:
1027     GNUNET_break_op (0);
1028     terminate_stream_async (sc);
1029     return GNUNET_SYSERR;
1030   }
1031 }
1032
1033
1034 /**
1035  * Functions of this type are called upon new stream connection from other peers
1036  * or upon binding error which happen when the app_port given in
1037  * GNUNET_STREAM_listen() is already taken.
1038  *
1039  * @param cls the closure from GNUNET_STREAM_listen
1040  * @param socket the socket representing the stream; NULL on binding error
1041  * @param initiator the identity of the peer who wants to establish a stream
1042  *            with us; NULL on binding error
1043  * @return GNUNET_OK to keep the socket open, GNUNET_SYSERR to close the
1044  *             stream (the socket will be invalid after the call)
1045  */
1046 static int 
1047 accept_cb (void *cls,
1048            struct GNUNET_STREAM_Socket *socket,
1049            const struct GNUNET_PeerIdentity *initiator)
1050 {
1051   struct StreamClient *sc;
1052
1053   if (NULL == socket)
1054     return GNUNET_SYSERR;
1055   GNUNET_STATISTICS_update (GSF_stats,
1056                             gettext_noop ("# stream connections active"), 1,
1057                             GNUNET_NO);
1058   sc = GNUNET_malloc (sizeof (struct StreamClient));
1059   sc->socket = socket;
1060   sc->mst = GNUNET_SERVER_mst_create (&request_cb,
1061                                       sc);
1062   sc->rh = GNUNET_STREAM_read (sc->socket,
1063                                GNUNET_TIME_UNIT_FOREVER_REL,
1064                                &process_request,
1065                                sc);
1066   GNUNET_CONTAINER_DLL_insert (sc_head,
1067                                sc_tail,
1068                                sc);
1069   return GNUNET_OK;
1070 }
1071
1072
1073 /**
1074  * Initialize subsystem for non-anonymous file-sharing.
1075  */
1076 void
1077 GSF_stream_start ()
1078 {
1079   stream_map = GNUNET_CONTAINER_multihashmap_create (16, GNUNET_YES);
1080   listen_socket = GNUNET_STREAM_listen (GSF_cfg,
1081                                         GNUNET_APPLICATION_TYPE_FS_BLOCK_TRANSFER,
1082                                         &accept_cb, NULL,
1083                                         GNUNET_STREAM_OPTION_END);
1084 }
1085
1086
1087 /**
1088  * Function called on each active streams to shut them down.
1089  *
1090  * @param cls NULL
1091  * @param key target peer, unused
1092  * @param value the 'struct StreamHandle' to destroy
1093  * @return GNUNET_YES (continue to iterate)
1094  */
1095 static int
1096 release_streams (void *cls,
1097                  const struct GNUNET_HashCode *key,
1098                  void *value)
1099 {
1100   struct StreamHandle *sh = value;
1101
1102   destroy_stream_handle (sh);
1103   return GNUNET_YES;
1104 }
1105
1106
1107 /**
1108  * Shutdown subsystem for non-anonymous file-sharing.
1109  */
1110 void
1111 GSF_stream_stop ()
1112 {
1113   struct StreamClient *sc;
1114
1115   while (NULL != (sc = sc_head))
1116     terminate_stream (sc);
1117   if (NULL != listen_socket)
1118   {
1119     GNUNET_STREAM_listen_close (listen_socket);
1120     listen_socket = NULL;
1121   }
1122   GNUNET_CONTAINER_multihashmap_iterate (stream_map,
1123                                          &release_streams,
1124                                          NULL);
1125   GNUNET_CONTAINER_multihashmap_destroy (stream_map);
1126   stream_map = NULL;
1127 }
1128
1129 /* end of gnunet-service-fs_stream.c */