-more logging
[oweals/gnunet.git] / src / fs / gnunet-service-fs_stream.c
1 /*
2      This file is part of GNUnet.
3      (C) 2012 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file fs/gnunet-service-fs_stream.c
23  * @brief non-anonymous file-transfer
24  * @author Christian Grothoff
25  *
26  * TODO:
27  * - limit # concurrent clients, have timeouts for server-side
28  */
29 #include "platform.h"
30 #include "gnunet_constants.h"
31 #include "gnunet_util_lib.h"
32 #include "gnunet_stream_lib.h"
33 #include "gnunet_protocols.h"
34 #include "gnunet_applications.h"
35 #include "gnunet-service-fs.h"
36 #include "gnunet-service-fs_indexing.h"
37 #include "gnunet-service-fs_stream.h"
38
39 /**
40  * Information we keep around for each active streaming client.
41  */
42 struct StreamClient
43 {
44   /**
45    * DLL
46    */ 
47   struct StreamClient *next;
48
49   /**
50    * DLL
51    */ 
52   struct StreamClient *prev;
53
54   /**
55    * Socket for communication.
56    */ 
57   struct GNUNET_STREAM_Socket *socket;
58
59   /**
60    * Handle for active read operation, or NULL.
61    */ 
62   struct GNUNET_STREAM_IOReadHandle *rh;
63
64   /**
65    * Handle for active write operation, or NULL.
66    */ 
67   struct GNUNET_STREAM_IOWriteHandle *wh;
68   
69   /**
70    * Tokenizer for requests.
71    */
72   struct GNUNET_SERVER_MessageStreamTokenizer *mst;
73   
74   /**
75    * Current active request to the datastore, if we have one pending.
76    */
77   struct GNUNET_DATASTORE_QueueEntry *qe;
78
79   /**
80    * Task that is scheduled to asynchronously terminate the connection.
81    */
82   GNUNET_SCHEDULER_TaskIdentifier terminate_task;
83
84   /**
85    * Size of the last write that was initiated.
86    */ 
87   size_t reply_size;
88
89 };
90
91
92 /**
93  * Query from one peer, asking the other for CHK-data.
94  */
95 struct StreamQueryMessage
96 {
97
98   /**
99    * Type is GNUNET_MESSAGE_TYPE_FS_STREAM_QUERY.
100    */
101   struct GNUNET_MessageHeader header;
102
103   /**
104    * Block type must be DBLOCK or IBLOCK.
105    */
106   uint32_t type;
107
108   /**
109    * Query hash from CHK (hash of encrypted block).
110    */
111   struct GNUNET_HashCode query;
112
113 };
114
115
116 /**
117  * Reply to a StreamQueryMessage.
118  */
119 struct StreamReplyMessage
120 {
121
122   /**
123    * Type is GNUNET_MESSAGE_TYPE_FS_STREAM_REPLY.
124    */
125   struct GNUNET_MessageHeader header;
126
127   /**
128    * Block type must be DBLOCK or IBLOCK.
129    */
130   uint32_t type;
131
132   /**
133    * Expiration time for the block.
134    */
135   struct GNUNET_TIME_AbsoluteNBO expiration;
136
137   /* followed by the encrypted block */
138
139 };
140
141
142 /** 
143  * Handle for a stream to another peer.
144  */
145 struct StreamHandle;
146
147
148 /**
149  * Handle for a request that is going out via stream API.
150  */
151 struct GSF_StreamRequest
152 {
153
154   /**
155    * DLL.
156    */
157   struct GSF_StreamRequest *next;
158
159   /**
160    * DLL.
161    */
162   struct GSF_StreamRequest *prev;
163
164   /**
165    * Which stream is this request associated with?
166    */
167   struct StreamHandle *sh;
168
169   /**
170    * Function to call with the result.
171    */
172   GSF_StreamReplyProcessor proc;
173
174   /**
175    * Closure for 'proc'
176    */
177   void *proc_cls;
178
179   /**
180    * Query to transmit to the other peer.
181    */
182   struct GNUNET_HashCode query;
183
184   /**
185    * Desired type for the reply.
186    */
187   enum GNUNET_BLOCK_Type type;
188
189   /**
190    * Did we transmit this request already? YES if we are
191    * in the 'waiting' DLL, NO if we are in the 'pending' DLL.
192    */
193   int was_transmitted;
194 };
195
196
197 /** 
198  * Handle for a stream to another peer.
199  */
200 struct StreamHandle
201 {
202   /**
203    * Head of DLL of pending requests on this stream.
204    */
205   struct GSF_StreamRequest *pending_head;
206
207   /**
208    * Tail of DLL of pending requests on this stream.
209    */
210   struct GSF_StreamRequest *pending_tail;
211
212   /**
213    * Head of DLL of requests waiting for a reply on this stream.
214    */
215   struct GSF_StreamRequest *waiting_head;
216
217   /**
218    * Tail of DLL of requests waiting for a reply on this stream.
219    */
220   struct GSF_StreamRequest *waiting_tail;
221
222   /**
223    * Connection to the other peer.
224    */
225   struct GNUNET_STREAM_Socket *stream;
226
227   /**
228    * Handle for active read operation, or NULL.
229    */ 
230   struct GNUNET_STREAM_IOReadHandle *rh;
231
232   /**
233    * Handle for active write operation, or NULL.
234    */ 
235   struct GNUNET_STREAM_IOWriteHandle *wh;
236
237   /**
238    * Tokenizer for replies.
239    */
240   struct GNUNET_SERVER_MessageStreamTokenizer *mst;
241
242   /**
243    * Which peer does this stream go to?
244    */ 
245   struct GNUNET_PeerIdentity target;
246
247   /**
248    * Task to kill inactive streams (we keep them around for
249    * a few seconds to give the application a chance to give
250    * us another query).
251    */
252   GNUNET_SCHEDULER_TaskIdentifier timeout_task;
253
254   /**
255    * Task to reset streams that had errors (asynchronously,
256    * as we may not be able to do it immediately during a
257    * callback from the stream API).
258    */
259   GNUNET_SCHEDULER_TaskIdentifier reset_task;
260
261   /**
262    * Is this stream ready for transmission?
263    */
264   int is_ready;
265
266 };
267
268
269 /**
270  * Listen socket for incoming requests.
271  */
272 static struct GNUNET_STREAM_ListenSocket *listen_socket;
273
274 /**
275  * Head of DLL of stream clients.
276  */ 
277 static struct StreamClient *sc_head;
278
279 /**
280  * Tail of DLL of stream clients.
281  */ 
282 static struct StreamClient *sc_tail;
283
284 /**
285  * Map from peer identities to 'struct StreamHandles' with streams to
286  * those peers.
287  */
288 static struct GNUNET_CONTAINER_MultiHashMap *stream_map;
289
290
291 /* ********************* client-side code ************************* */
292
293
294 /**
295  * Destroy a stream handle.
296  *
297  * @param sh stream to process
298  */
299 static void
300 destroy_stream_handle (struct StreamHandle *sh)
301 {
302   struct GSF_StreamRequest *sr;
303
304   while (NULL != (sr = sh->pending_head))
305   {
306     sr->proc (sr->proc_cls, GNUNET_BLOCK_TYPE_ANY,
307               GNUNET_TIME_UNIT_FOREVER_ABS,
308               0, NULL);
309     GSF_stream_query_cancel (sr);
310   }
311   while (NULL != (sr = sh->waiting_head))
312   {
313     sr->proc (sr->proc_cls, GNUNET_BLOCK_TYPE_ANY,
314               GNUNET_TIME_UNIT_FOREVER_ABS,
315               0, NULL);
316     GSF_stream_query_cancel (sr);
317   }
318   if (NULL != sh->wh)
319     GNUNET_STREAM_io_write_cancel (sh->wh);
320   if (NULL != sh->rh)
321     GNUNET_STREAM_io_read_cancel (sh->rh);
322   if (GNUNET_SCHEDULER_NO_TASK != sh->timeout_task)
323     GNUNET_SCHEDULER_cancel (sh->timeout_task);
324   GNUNET_STREAM_close (sh->stream);
325   GNUNET_assert (GNUNET_OK ==
326                  GNUNET_CONTAINER_multihashmap_remove (stream_map,
327                                                        &sh->target.hashPubKey,
328                                                        sh));
329   GNUNET_free (sh);
330 }
331
332
333 /**
334  * Transmit pending requests via the stream.
335  *
336  * @param sh stream to process
337  */
338 static void
339 transmit_pending (struct StreamHandle *sh);
340
341
342 /**
343  * Function called once the stream is ready for transmission.
344  *
345  * @param cls the 'struct StreamHandle'
346  * @param socket stream socket handle
347  */
348 static void
349 stream_ready_cb (void *cls,
350                  struct GNUNET_STREAM_Socket *socket)
351 {
352   struct StreamHandle *sh = cls;
353
354   sh->is_ready = GNUNET_YES;
355   transmit_pending (sh);
356 }
357
358
359 /**
360  * We had a serious error, tear down and re-create stream from scratch.
361  *
362  * @param sh stream to reset
363  */
364 static void
365 reset_stream (struct StreamHandle *sh)
366 {
367   struct GSF_StreamRequest *sr;
368
369   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
370               "Resetting stream to %s\n",
371               GNUNET_i2s (&sh->target));
372   if (NULL != sh->rh)
373     GNUNET_STREAM_io_read_cancel (sh->rh);
374   GNUNET_STREAM_close (sh->stream);
375   sh->is_ready = GNUNET_NO;
376   while (NULL != (sr = sh->waiting_tail))
377   {
378     GNUNET_CONTAINER_DLL_remove (sh->waiting_head,
379                                  sh->waiting_tail,
380                                  sr);
381     GNUNET_CONTAINER_DLL_insert (sh->pending_head,
382                                  sh->pending_tail,
383                                  sr);
384     sr->was_transmitted = GNUNET_NO;
385   }
386   sh->stream = GNUNET_STREAM_open (GSF_cfg,
387                                    &sh->target,
388                                    GNUNET_APPLICATION_TYPE_FS_BLOCK_TRANSFER,
389                                    &stream_ready_cb, sh,
390                                    GNUNET_STREAM_OPTION_END);
391 }
392
393
394 /**
395  * Task called when it is time to destroy an inactive stream.
396  *
397  * @param cls the 'struct StreamHandle' to tear down
398  * @param tc scheduler context, unused
399  */
400 static void
401 stream_timeout (void *cls,
402                 const struct GNUNET_SCHEDULER_TaskContext *tc)
403 {
404   struct StreamHandle *sh = cls;
405
406   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
407               "Timeout on stream to %s\n",
408               GNUNET_i2s (&sh->target));
409   sh->timeout_task = GNUNET_SCHEDULER_NO_TASK;
410   destroy_stream_handle (sh);
411 }
412
413
414 /**
415  * Task called when it is time to reset an stream.
416  *
417  * @param cls the 'struct StreamHandle' to tear down
418  * @param tc scheduler context, unused
419  */
420 static void
421 reset_stream_task (void *cls,
422                    const struct GNUNET_SCHEDULER_TaskContext *tc)
423 {
424   struct StreamHandle *sh = cls;
425
426   sh->reset_task = GNUNET_SCHEDULER_NO_TASK;
427   reset_stream (sh);
428 }
429
430
431 /**
432  * We had a serious error, tear down and re-create stream from scratch,
433  * but do so asynchronously.
434  *
435  * @param sh stream to reset
436  */
437 static void
438 reset_stream_async (struct StreamHandle *sh)
439 {
440   if (GNUNET_SCHEDULER_NO_TASK == sh->reset_task)
441     sh->reset_task = GNUNET_SCHEDULER_add_now (&reset_stream_task,
442                                                sh);
443 }
444
445
446 /**
447  * We got a reply from the stream.  Process it.
448  *
449  * @param cls the struct StreamHandle 
450  * @param status the status of the stream at the time this function is called
451  * @param data traffic from the other side
452  * @param size the number of bytes available in data read; will be 0 on timeout 
453  * @return number of bytes of processed from 'data' (any data remaining should be
454  *         given to the next time the read processor is called).
455  */
456 static size_t
457 handle_stream_reply (void *cls,
458                      enum GNUNET_STREAM_Status status,
459                      const void *data,
460                      size_t size)
461 {
462   struct StreamHandle *sh = cls;
463
464   sh->rh = NULL;
465   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
466               "Received %u bytes from stream to %s\n",
467               (unsigned int) size,
468               GNUNET_i2s (&sh->target));
469   if (GNUNET_SYSERR == 
470       GNUNET_SERVER_mst_receive (sh->mst,
471                                  NULL,
472                                  data, size,
473                                  GNUNET_NO, GNUNET_NO))
474   {
475     GNUNET_break_op (0);
476     reset_stream_async (sh);
477     return size;
478   }
479   sh->rh = GNUNET_STREAM_read (sh->stream,
480                                GNUNET_TIME_UNIT_FOREVER_REL,
481                                &handle_stream_reply,
482                                sh);
483   return size;
484 }
485
486
487 /**
488  * Functions of this signature are called whenever we transmitted a
489  * query via a stream.
490  *
491  * @param cls the struct StreamHandle for which we did the write call
492  * @param status the status of the stream at the time this function is called;
493  *          GNUNET_OK if writing to stream was completed successfully,
494  *          GNUNET_STREAM_SHUTDOWN if the stream is shutdown for writing in the
495  *          mean time.
496  * @param size the number of bytes written
497  */
498 static void
499 query_write_continuation (void *cls,
500                           enum GNUNET_STREAM_Status status,
501                           size_t size)
502 {
503   struct StreamHandle *sh = cls;
504
505   sh->wh = NULL;
506   if ( (GNUNET_STREAM_OK != status) ||
507        (sizeof (struct StreamQueryMessage) != size) )
508   {
509     reset_stream (sh);
510     return;
511   }
512   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
513               "Successfully transmitted %u bytes via stream to %s\n",
514               (unsigned int) size,
515               GNUNET_i2s (&sh->target));
516   if (NULL == sh->rh)
517     sh->rh = GNUNET_STREAM_read (sh->stream,
518                                  GNUNET_TIME_UNIT_FOREVER_REL,
519                                  &handle_stream_reply,
520                                  sh);
521   transmit_pending (sh);
522 }
523           
524
525 /**
526  * Transmit pending requests via the stream.
527  *
528  * @param sh stream to process
529  */
530 static void
531 transmit_pending (struct StreamHandle *sh)
532 {
533   struct StreamQueryMessage sqm;
534   struct GSF_StreamRequest *sr;
535
536   if (NULL != sh->wh)
537     return;
538   sr = sh->pending_head;
539   if (NULL == sr)
540     return;
541   GNUNET_CONTAINER_DLL_remove (sh->pending_head,
542                                sh->pending_tail,
543                                sr);
544   GNUNET_CONTAINER_DLL_insert_tail (sh->waiting_head,
545                                     sh->waiting_tail,
546                                     sr);
547   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
548               "Sending query via stream to %s\n",
549               GNUNET_i2s (&sh->target));
550   sr->was_transmitted = GNUNET_YES;
551   sqm.header.size = htons (sizeof (sqm));
552   sqm.header.type = htons (GNUNET_MESSAGE_TYPE_FS_STREAM_QUERY);
553   sqm.type = htonl (sr->type);
554   sqm.query = sr->query;
555   sh->wh = GNUNET_STREAM_write (sh->stream,
556                                 &sqm, sizeof (sqm),
557                                 GNUNET_TIME_UNIT_FOREVER_REL,
558                                 &query_write_continuation,
559                                 sh);
560 }
561
562
563 /**
564  * Functions with this signature are called whenever a
565  * complete reply is received.
566  *
567  * Do not call GNUNET_SERVER_mst_destroy in callback
568  *
569  * @param cls closure with the 'struct StreamHandle'
570  * @param client identification of the client, NULL
571  * @param message the actual message
572  * @return GNUNET_OK on success, GNUNET_SYSERR to stop further processing
573  */
574 static int
575 reply_cb (void *cls,
576           void *client,
577           const struct GNUNET_MessageHeader *message)
578 {
579   struct StreamHandle *sh = cls;
580   const struct StreamReplyMessage *srm;
581   uint16_t msize;
582   enum GNUNET_BLOCK_Type type;
583   struct GNUNET_HashCode query;
584   struct GSF_StreamRequest *sr;
585
586   msize = ntohs (message->size);
587   switch (ntohs (message->type))
588   {
589   case GNUNET_MESSAGE_TYPE_FS_STREAM_REPLY:
590     if (sizeof (struct StreamReplyMessage) > msize)
591     {
592       GNUNET_break_op (0);
593       reset_stream_async (sh);
594       return GNUNET_SYSERR;
595     }
596     srm = (const struct StreamReplyMessage *) message;
597     msize -= sizeof (struct StreamReplyMessage);
598     type = (enum GNUNET_BLOCK_Type) ntohl (srm->type);
599     if (GNUNET_YES !=
600         GNUNET_BLOCK_get_key (GSF_block_ctx,
601                               type,
602                               &srm[1], msize, &query))
603     {
604       GNUNET_break_op (0); 
605       reset_stream_async (sh);
606       return GNUNET_SYSERR;
607     }
608     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
609                 "Received reply `%s' via stream\n",
610                 GNUNET_h2s (&query));
611     GNUNET_STATISTICS_update (GSF_stats,
612                               gettext_noop ("# replies received via stream"), 1,
613                               GNUNET_NO);
614     for (sr = sh->waiting_head; NULL != sr; sr = sr->next)
615       if (0 == memcmp (&query,
616                        &sr->query,
617                        sizeof (struct GNUNET_HashCode)))
618         break;
619     if (NULL == sr)
620     {
621       GNUNET_STATISTICS_update (GSF_stats,
622                                 gettext_noop ("# replies received via stream dropped"), 1,
623                                 GNUNET_NO);
624       return GNUNET_OK;
625     }
626     sr->proc (sr->proc_cls,
627               type,
628               GNUNET_TIME_absolute_ntoh (srm->expiration),
629               msize,
630               &srm[1]);
631     GSF_stream_query_cancel (sr);
632     return GNUNET_OK;
633   default:
634     GNUNET_break_op (0);
635     reset_stream_async (sh);
636     return GNUNET_SYSERR;
637   }
638 }
639
640
641 /**
642  * Get (or create) a stream to talk to the given peer.
643  *
644  * @param target peer we want to communicate with
645  */
646 static struct StreamHandle *
647 get_stream (const struct GNUNET_PeerIdentity *target)
648 {
649   struct StreamHandle *sh;
650
651   sh = GNUNET_CONTAINER_multihashmap_get (stream_map,
652                                           &target->hashPubKey);
653   if (NULL != sh)
654   {
655     if (GNUNET_SCHEDULER_NO_TASK != sh->timeout_task)
656     {
657       GNUNET_SCHEDULER_cancel (sh->timeout_task);
658       sh->timeout_task = GNUNET_SCHEDULER_NO_TASK;
659     }
660     return sh;
661   }
662   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
663               "Creating stream to %s\n",
664               GNUNET_i2s (target));
665   sh = GNUNET_malloc (sizeof (struct StreamHandle));
666   sh->mst = GNUNET_SERVER_mst_create (&reply_cb,
667                                       sh);
668   sh->target = *target;
669   sh->stream = GNUNET_STREAM_open (GSF_cfg,
670                                    &sh->target,
671                                    GNUNET_APPLICATION_TYPE_FS_BLOCK_TRANSFER,
672                                    &stream_ready_cb, sh,
673                                    GNUNET_STREAM_OPTION_END);
674   GNUNET_assert (GNUNET_OK ==
675                  GNUNET_CONTAINER_multihashmap_put (stream_map,
676                                                     &sh->target.hashPubKey,
677                                                     sh,
678                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
679   return sh;
680 }
681
682
683 /**
684  * Look for a block by directly contacting a particular peer.
685  *
686  * @param target peer that should have the block
687  * @param query hash to query for the block
688  * @param type desired type for the block
689  * @param proc function to call with result
690  * @param proc_cls closure for 'proc'
691  * @return handle to cancel the operation
692  */
693 struct GSF_StreamRequest *
694 GSF_stream_query (const struct GNUNET_PeerIdentity *target,
695                   const struct GNUNET_HashCode *query,
696                   enum GNUNET_BLOCK_Type type,
697                   GSF_StreamReplyProcessor proc, void *proc_cls)
698 {
699   struct StreamHandle *sh;
700   struct GSF_StreamRequest *sr;
701
702   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
703               "Preparing to send query for %s via stream to %s\n",
704               GNUNET_h2s (query),
705               GNUNET_i2s (target));
706   sh = get_stream (target);
707   sr = GNUNET_malloc (sizeof (struct GSF_StreamRequest));
708   sr->sh = sh;
709   sr->proc = proc;
710   sr->proc_cls = proc_cls;
711   sr->type = type;
712   sr->query = *query;
713   GNUNET_CONTAINER_DLL_insert (sh->pending_head,
714                                sh->pending_tail,
715                                sr);
716   if (GNUNET_YES == sh->is_ready)
717     transmit_pending (sh);
718   return sr;
719 }
720
721
722 /**
723  * Cancel an active request; must not be called after 'proc'
724  * was calld.
725  *
726  * @param sr request to cancel
727  */
728 void
729 GSF_stream_query_cancel (struct GSF_StreamRequest *sr)
730 {
731   struct StreamHandle *sh = sr->sh;
732
733   if (GNUNET_YES == sr->was_transmitted)
734     GNUNET_CONTAINER_DLL_remove (sh->waiting_head,
735                                  sh->waiting_tail,
736                                  sr);
737   else
738     GNUNET_CONTAINER_DLL_remove (sh->pending_head,
739                                  sh->pending_tail,
740                                  sr);
741   GNUNET_free (sr);
742   if ( (NULL == sh->waiting_head) &&
743        (NULL == sh->pending_head) )
744     sh->timeout_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS,
745                                                      &stream_timeout,
746                                                      sh);
747 }
748
749
750 /* ********************* server-side code ************************* */
751
752
753 /**
754  * We're done with a particular client, clean up.
755  *
756  * @param sc client to clean up
757  */
758 static void
759 terminate_stream (struct StreamClient *sc)
760 {
761   GNUNET_STATISTICS_update (GSF_stats,
762                             gettext_noop ("# stream connections active"), -1,
763                             GNUNET_NO);
764   if (GNUNET_SCHEDULER_NO_TASK != sc->terminate_task)
765     GNUNET_SCHEDULER_cancel (sc->terminate_task); 
766  if (NULL != sc->rh)
767     GNUNET_STREAM_io_read_cancel (sc->rh);
768   if (NULL != sc->wh)
769     GNUNET_STREAM_io_write_cancel (sc->wh);
770   if (NULL != sc->qe)
771     GNUNET_DATASTORE_cancel (sc->qe);
772   GNUNET_SERVER_mst_destroy (sc->mst);
773   GNUNET_STREAM_close (sc->socket);
774   GNUNET_CONTAINER_DLL_remove (sc_head,
775                                sc_tail,
776                                sc);
777   GNUNET_free (sc);
778 }
779
780
781 /**
782  * Task run to asynchronously terminate the stream.
783  *
784  * @param cls the 'struct StreamClient'
785  * @param tc scheduler context
786  */ 
787 static void
788 terminate_stream_task (void *cls,
789                        const struct GNUNET_SCHEDULER_TaskContext *tc)
790 {
791   struct StreamClient *sc = cls;
792
793   sc->terminate_task = GNUNET_SCHEDULER_NO_TASK;
794   terminate_stream (sc);
795 }
796
797
798 /**
799  * We had a serious error, termiante stream,
800  * but do so asynchronously.
801  *
802  * @param sc stream to reset
803  */
804 static void
805 terminate_stream_async (struct StreamClient *sc)
806 {
807   if (GNUNET_SCHEDULER_NO_TASK == sc->terminate_task)
808     sc->terminate_task = GNUNET_SCHEDULER_add_now (&terminate_stream_task,
809                                                    sc);
810 }
811
812
813 /**
814  * Functions of this signature are called whenever data is available from the
815  * stream.
816  *
817  * @param cls the closure from GNUNET_STREAM_read
818  * @param status the status of the stream at the time this function is called
819  * @param data traffic from the other side
820  * @param size the number of bytes available in data read; will be 0 on timeout 
821  * @return number of bytes of processed from 'data' (any data remaining should be
822  *         given to the next time the read processor is called).
823  */
824 static size_t 
825 process_request (void *cls,
826                  enum GNUNET_STREAM_Status status,
827                  const void *data,
828                  size_t size);
829
830
831 /**
832  * We're done handling a request from a client, read the next one.
833  *
834  * @param sc client to continue reading requests from
835  */
836 static void
837 continue_reading (struct StreamClient *sc)
838 {
839   int ret;
840
841   ret = 
842     GNUNET_SERVER_mst_receive (sc->mst,
843                                NULL,
844                                NULL, 0,
845                                GNUNET_NO, GNUNET_YES);
846   if (GNUNET_NO == ret)
847     return; 
848   sc->rh = GNUNET_STREAM_read (sc->socket,
849                                GNUNET_TIME_UNIT_FOREVER_REL,
850                                &process_request,
851                                sc);      
852 }
853
854
855 /**
856  * Functions of this signature are called whenever data is available from the
857  * stream.
858  *
859  * @param cls the closure from GNUNET_STREAM_read
860  * @param status the status of the stream at the time this function is called
861  * @param data traffic from the other side
862  * @param size the number of bytes available in data read; will be 0 on timeout 
863  * @return number of bytes of processed from 'data' (any data remaining should be
864  *         given to the next time the read processor is called).
865  */
866 static size_t 
867 process_request (void *cls,
868                  enum GNUNET_STREAM_Status status,
869                  const void *data,
870                  size_t size)
871 {
872   struct StreamClient *sc = cls;
873   int ret;
874
875   sc->rh = NULL;
876   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
877               "Received %u byte query via stream\n",
878               (unsigned int) size);
879   switch (status)
880   {
881   case GNUNET_STREAM_OK:
882     ret = 
883       GNUNET_SERVER_mst_receive (sc->mst,
884                                  NULL,
885                                  data, size,
886                                  GNUNET_NO, GNUNET_YES);
887     if (GNUNET_NO == ret)
888       return size; /* more messages in MST */
889     if (GNUNET_SYSERR == ret)
890     {
891       GNUNET_break_op (0);
892       terminate_stream_async (sc);
893       return size;
894     }
895     break;
896   case GNUNET_STREAM_TIMEOUT:
897   case GNUNET_STREAM_SHUTDOWN:
898   case GNUNET_STREAM_SYSERR:
899   case GNUNET_STREAM_BROKEN:
900     terminate_stream_async (sc);
901     return size;
902   default:
903     GNUNET_break (0);
904     return size;
905   }
906   continue_reading (sc);
907   return size;
908 }
909
910
911 /**
912  * Sending a reply was completed, continue processing.
913  *
914  * @param cls closure with the struct StreamClient which sent the query
915  */
916 static void
917 write_continuation (void *cls,
918                     enum GNUNET_STREAM_Status status,
919                     size_t size)
920 {
921   struct StreamClient *sc = cls;
922   
923   sc->wh = NULL;
924   if ( (GNUNET_STREAM_OK == status) &&
925        (size == sc->reply_size) )
926   {
927     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
928                 "Transmitted %u byte reply via stream\n",
929                 (unsigned int) size);
930     GNUNET_STATISTICS_update (GSF_stats,
931                               gettext_noop ("# Blocks transferred via stream"), 1,
932                               GNUNET_NO);
933     continue_reading (sc);
934   }
935   else
936   {
937     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
938                 "Transmission of reply failed, terminating stream\n");
939     terminate_stream (sc);    
940   }
941 }
942
943
944 /**
945  * Process a datum that was stored in the datastore.
946  *
947  * @param cls closure with the struct StreamClient which sent the query
948  * @param key key for the content
949  * @param size number of bytes in data
950  * @param data content stored
951  * @param type type of the content
952  * @param priority priority of the content
953  * @param anonymity anonymity-level for the content
954  * @param expiration expiration time for the content
955  * @param uid unique identifier for the datum;
956  *        maybe 0 if no unique identifier is available
957  */
958 static void 
959 handle_datastore_reply (void *cls,
960                         const struct GNUNET_HashCode * key,
961                         size_t size, const void *data,
962                         enum GNUNET_BLOCK_Type type,
963                         uint32_t priority,
964                         uint32_t anonymity,
965                         struct GNUNET_TIME_Absolute
966                         expiration, uint64_t uid)
967 {
968   struct StreamClient *sc = cls;
969   size_t msize = size + sizeof (struct StreamReplyMessage);
970   char buf[msize] GNUNET_ALIGN;
971   struct StreamReplyMessage *srm = (struct StreamReplyMessage *) buf;
972
973   sc->qe = NULL;
974   if (GNUNET_BLOCK_TYPE_FS_ONDEMAND == type)
975   {
976     if (GNUNET_OK !=
977         GNUNET_FS_handle_on_demand_block (key,
978                                           size, data, type,
979                                           priority, anonymity,
980                                           expiration, uid,
981                                           &handle_datastore_reply,
982                                           sc))
983     {
984       continue_reading (sc);
985     }
986     return;
987   }
988   if (msize > GNUNET_SERVER_MAX_MESSAGE_SIZE)
989   {
990     GNUNET_break (0);
991     continue_reading (sc);
992     return;
993   }
994   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
995               "Starting transmission of %u byte reply via stream\n",
996               (unsigned int) size);
997   srm->header.size = htons ((uint16_t) msize);
998   srm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_STREAM_REPLY);
999   srm->type = htonl (type);
1000   srm->expiration = GNUNET_TIME_absolute_hton (expiration);
1001   memcpy (&srm[1], data, size);
1002   sc->reply_size = msize;
1003   sc->wh = GNUNET_STREAM_write (sc->socket,
1004                                 buf, msize,
1005                                 GNUNET_TIME_UNIT_FOREVER_REL,
1006                                 &write_continuation,
1007                                 sc);
1008   if (NULL == sc->wh)
1009   {
1010     terminate_stream (sc);
1011     return;
1012   }
1013 }
1014
1015
1016 /**
1017  * Functions with this signature are called whenever a
1018  * complete query message is received.
1019  *
1020  * Do not call GNUNET_SERVER_mst_destroy in callback
1021  *
1022  * @param cls closure with the 'struct StreamClient'
1023  * @param client identification of the client, NULL
1024  * @param message the actual message
1025  * @return GNUNET_OK on success, GNUNET_SYSERR to stop further processing
1026  */
1027 static int
1028 request_cb (void *cls,
1029             void *client,
1030             const struct GNUNET_MessageHeader *message)
1031 {
1032   struct StreamClient *sc = cls;
1033   const struct StreamQueryMessage *sqm;
1034
1035   switch (ntohs (message->type))
1036   {
1037   case GNUNET_MESSAGE_TYPE_FS_STREAM_QUERY:
1038     if (sizeof (struct StreamQueryMessage) != 
1039         ntohs (message->size))
1040     {
1041       GNUNET_break_op (0);
1042       terminate_stream_async (sc);
1043       return GNUNET_SYSERR;
1044     }
1045     sqm = (const struct StreamQueryMessage *) message;
1046     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1047                 "Received query for `%s' via stream\n",
1048                 GNUNET_h2s (&sqm->query));
1049     GNUNET_STATISTICS_update (GSF_stats,
1050                               gettext_noop ("# queries received via stream"), 1,
1051                               GNUNET_NO);
1052     sc->qe = GNUNET_DATASTORE_get_key (GSF_dsh,
1053                                        0,
1054                                        &sqm->query,
1055                                        ntohl (sqm->type),
1056                                        0 /* priority */, 
1057                                        GSF_datastore_queue_size,
1058                                        GNUNET_TIME_UNIT_FOREVER_REL,
1059                                        &handle_datastore_reply, sc);
1060     if (NULL == sc->qe)
1061       continue_reading (sc);
1062     return GNUNET_OK;
1063   default:
1064     GNUNET_break_op (0);
1065     terminate_stream_async (sc);
1066     return GNUNET_SYSERR;
1067   }
1068 }
1069
1070
1071 /**
1072  * Functions of this type are called upon new stream connection from other peers
1073  * or upon binding error which happen when the app_port given in
1074  * GNUNET_STREAM_listen() is already taken.
1075  *
1076  * @param cls the closure from GNUNET_STREAM_listen
1077  * @param socket the socket representing the stream; NULL on binding error
1078  * @param initiator the identity of the peer who wants to establish a stream
1079  *            with us; NULL on binding error
1080  * @return GNUNET_OK to keep the socket open, GNUNET_SYSERR to close the
1081  *             stream (the socket will be invalid after the call)
1082  */
1083 static int 
1084 accept_cb (void *cls,
1085            struct GNUNET_STREAM_Socket *socket,
1086            const struct GNUNET_PeerIdentity *initiator)
1087 {
1088   struct StreamClient *sc;
1089
1090   if (NULL == socket)
1091     return GNUNET_SYSERR;
1092   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1093               "Accepting inbound stream connection from `%s'\n",
1094               GNUNET_i2s (initiator));
1095   GNUNET_STATISTICS_update (GSF_stats,
1096                             gettext_noop ("# stream connections active"), 1,
1097                             GNUNET_NO);
1098   sc = GNUNET_malloc (sizeof (struct StreamClient));
1099   sc->socket = socket;
1100   sc->mst = GNUNET_SERVER_mst_create (&request_cb,
1101                                       sc);
1102   sc->rh = GNUNET_STREAM_read (sc->socket,
1103                                GNUNET_TIME_UNIT_FOREVER_REL,
1104                                &process_request,
1105                                sc);
1106   GNUNET_CONTAINER_DLL_insert (sc_head,
1107                                sc_tail,
1108                                sc);
1109   return GNUNET_OK;
1110 }
1111
1112
1113 /**
1114  * Initialize subsystem for non-anonymous file-sharing.
1115  */
1116 void
1117 GSF_stream_start ()
1118 {
1119   stream_map = GNUNET_CONTAINER_multihashmap_create (16, GNUNET_YES);
1120   listen_socket = GNUNET_STREAM_listen (GSF_cfg,
1121                                         GNUNET_APPLICATION_TYPE_FS_BLOCK_TRANSFER,
1122                                         &accept_cb, NULL,
1123                                         GNUNET_STREAM_OPTION_END);
1124 }
1125
1126
1127 /**
1128  * Function called on each active streams to shut them down.
1129  *
1130  * @param cls NULL
1131  * @param key target peer, unused
1132  * @param value the 'struct StreamHandle' to destroy
1133  * @return GNUNET_YES (continue to iterate)
1134  */
1135 static int
1136 release_streams (void *cls,
1137                  const struct GNUNET_HashCode *key,
1138                  void *value)
1139 {
1140   struct StreamHandle *sh = value;
1141
1142   destroy_stream_handle (sh);
1143   return GNUNET_YES;
1144 }
1145
1146
1147 /**
1148  * Shutdown subsystem for non-anonymous file-sharing.
1149  */
1150 void
1151 GSF_stream_stop ()
1152 {
1153   struct StreamClient *sc;
1154
1155   while (NULL != (sc = sc_head))
1156     terminate_stream (sc);
1157   if (NULL != listen_socket)
1158   {
1159     GNUNET_STREAM_listen_close (listen_socket);
1160     listen_socket = NULL;
1161   }
1162   GNUNET_CONTAINER_multihashmap_iterate (stream_map,
1163                                          &release_streams,
1164                                          NULL);
1165   GNUNET_CONTAINER_multihashmap_destroy (stream_map);
1166   stream_map = NULL;
1167 }
1168
1169 /* end of gnunet-service-fs_stream.c */