some fixes to the pt/vpn testcase.
[oweals/gnunet.git] / src / fs / gnunet-service-fs_stream.c
1 /*
2      This file is part of GNUnet.
3      (C) 2012 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file fs/gnunet-service-fs_stream.c
23  * @brief non-anonymous file-transfer
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include "gnunet_constants.h"
28 #include "gnunet_util_lib.h"
29 #include "gnunet_stream_lib.h"
30 #include "gnunet_protocols.h"
31 #include "gnunet_applications.h"
32 #include "gnunet-service-fs.h"
33 #include "gnunet-service-fs_indexing.h"
34 #include "gnunet-service-fs_stream.h"
35
36 /**
37  * After how long do we termiante idle connections?
38  */
39 #define IDLE_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 2)
40
41 /**
42  * After how long do we reset connections without replies?
43  */
44 #define CLIENT_RETRY_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 30)
45
46
47 /**
48  * A message in the queue to be written to the stream.
49  */
50 struct WriteQueueItem
51 {
52   /**
53    * Kept in a DLL.
54    */
55   struct WriteQueueItem *next;
56
57   /**
58    * Kept in a DLL.
59    */
60   struct WriteQueueItem *prev;
61
62   /**
63    * Number of bytes of payload, allocated at the end of this struct.
64    */
65   size_t msize;
66 };
67
68
69 /**
70  * Information we keep around for each active streaming client.
71  */
72 struct StreamClient
73 {
74   /**
75    * DLL
76    */ 
77   struct StreamClient *next;
78
79   /**
80    * DLL
81    */ 
82   struct StreamClient *prev;
83
84   /**
85    * Socket for communication.
86    */ 
87   struct GNUNET_STREAM_Socket *socket;
88
89   /**
90    * Handle for active read operation, or NULL.
91    */ 
92   struct GNUNET_STREAM_ReadHandle *rh;
93
94   /**
95    * Handle for active write operation, or NULL.
96    */ 
97   struct GNUNET_STREAM_WriteHandle *wh;
98
99   /**
100    * Head of write queue.
101    */
102   struct WriteQueueItem *wqi_head;
103
104   /**
105    * Tail of write queue.
106    */
107   struct WriteQueueItem *wqi_tail;
108   
109   /**
110    * Tokenizer for requests.
111    */
112   struct GNUNET_SERVER_MessageStreamTokenizer *mst;
113   
114   /**
115    * Current active request to the datastore, if we have one pending.
116    */
117   struct GNUNET_DATASTORE_QueueEntry *qe;
118
119   /**
120    * Task that is scheduled to asynchronously terminate the connection.
121    */
122   GNUNET_SCHEDULER_TaskIdentifier terminate_task;
123
124   /**
125    * Task that is scheduled to terminate idle connections.
126    */
127   GNUNET_SCHEDULER_TaskIdentifier timeout_task;
128
129   /**
130    * Size of the last write that was initiated.
131    */ 
132   size_t reply_size;
133
134 };
135
136
137 /**
138  * Query from one peer, asking the other for CHK-data.
139  */
140 struct StreamQueryMessage
141 {
142
143   /**
144    * Type is GNUNET_MESSAGE_TYPE_FS_STREAM_QUERY.
145    */
146   struct GNUNET_MessageHeader header;
147
148   /**
149    * Block type must be DBLOCK or IBLOCK.
150    */
151   uint32_t type;
152
153   /**
154    * Query hash from CHK (hash of encrypted block).
155    */
156   struct GNUNET_HashCode query;
157
158 };
159
160
161 /**
162  * Reply to a StreamQueryMessage.
163  */
164 struct StreamReplyMessage
165 {
166
167   /**
168    * Type is GNUNET_MESSAGE_TYPE_FS_STREAM_REPLY.
169    */
170   struct GNUNET_MessageHeader header;
171
172   /**
173    * Block type must be DBLOCK or IBLOCK.
174    */
175   uint32_t type;
176
177   /**
178    * Expiration time for the block.
179    */
180   struct GNUNET_TIME_AbsoluteNBO expiration;
181
182   /* followed by the encrypted block */
183
184 };
185
186
187 /** 
188  * Handle for a stream to another peer.
189  */
190 struct StreamHandle;
191
192
193 /**
194  * Handle for a request that is going out via stream API.
195  */
196 struct GSF_StreamRequest
197 {
198
199   /**
200    * DLL.
201    */
202   struct GSF_StreamRequest *next;
203
204   /**
205    * DLL.
206    */
207   struct GSF_StreamRequest *prev;
208
209   /**
210    * Which stream is this request associated with?
211    */
212   struct StreamHandle *sh;
213
214   /**
215    * Function to call with the result.
216    */
217   GSF_StreamReplyProcessor proc;
218
219   /**
220    * Closure for 'proc'
221    */
222   void *proc_cls;
223
224   /**
225    * Query to transmit to the other peer.
226    */
227   struct GNUNET_HashCode query;
228
229   /**
230    * Desired type for the reply.
231    */
232   enum GNUNET_BLOCK_Type type;
233
234   /**
235    * Did we transmit this request already? YES if we are
236    * in the 'waiting' DLL, NO if we are in the 'pending' DLL.
237    */
238   int was_transmitted;
239 };
240
241
242 /** 
243  * Handle for a stream to another peer.
244  */
245 struct StreamHandle
246 {
247   /**
248    * Head of DLL of pending requests on this stream.
249    */
250   struct GSF_StreamRequest *pending_head;
251
252   /**
253    * Tail of DLL of pending requests on this stream.
254    */
255   struct GSF_StreamRequest *pending_tail;
256
257   /**
258    * Map from query to 'struct GSF_StreamRequest's waiting for
259    * a reply.
260    */
261   struct GNUNET_CONTAINER_MultiHashMap *waiting_map;
262
263   /**
264    * Connection to the other peer.
265    */
266   struct GNUNET_STREAM_Socket *stream;
267
268   /**
269    * Handle for active read operation, or NULL.
270    */ 
271   struct GNUNET_STREAM_ReadHandle *rh;
272
273   /**
274    * Handle for active write operation, or NULL.
275    */ 
276   struct GNUNET_STREAM_WriteHandle *wh;
277
278   /**
279    * Tokenizer for replies.
280    */
281   struct GNUNET_SERVER_MessageStreamTokenizer *mst;
282
283   /**
284    * Which peer does this stream go to?
285    */ 
286   struct GNUNET_PeerIdentity target;
287
288   /**
289    * Task to kill inactive streams (we keep them around for
290    * a few seconds to give the application a chance to give
291    * us another query).
292    */
293   GNUNET_SCHEDULER_TaskIdentifier timeout_task;
294
295   /**
296    * Task to reset streams that had errors (asynchronously,
297    * as we may not be able to do it immediately during a
298    * callback from the stream API).
299    */
300   GNUNET_SCHEDULER_TaskIdentifier reset_task;
301
302   /**
303    * Is this stream ready for transmission?
304    */
305   int is_ready;
306
307 };
308
309
310 /**
311  * Listen socket for incoming requests.
312  */
313 static struct GNUNET_STREAM_ListenSocket *listen_socket;
314
315 /**
316  * Head of DLL of stream clients.
317  */ 
318 static struct StreamClient *sc_head;
319
320 /**
321  * Tail of DLL of stream clients.
322  */ 
323 static struct StreamClient *sc_tail;
324
325 /**
326  * Number of active stream clients in the 'sc_*'-DLL.
327  */
328 static unsigned int sc_count;
329
330 /**
331  * Maximum allowed number of stream clients.
332  */
333 static unsigned long long sc_count_max;
334
335 /**
336  * Map from peer identities to 'struct StreamHandles' with streams to
337  * those peers.
338  */
339 static struct GNUNET_CONTAINER_MultiHashMap *stream_map;
340
341
342 /* ********************* client-side code ************************* */
343
344 /**
345  * Iterator called on each entry in a waiting map to 
346  * call the 'proc' continuation and release associated
347  * resources.
348  *
349  * @param cls the 'struct StreamHandle'
350  * @param key the key of the entry in the map (the query)
351  * @param value the 'struct GSF_StreamRequest' to clean up
352  * @return GNUNET_YES (continue to iterate)
353  */
354 static int
355 free_waiting_entry (void *cls,
356                     const struct GNUNET_HashCode *key,
357                     void *value)
358 {
359   struct GSF_StreamRequest *sr = value;
360
361   sr->proc (sr->proc_cls, GNUNET_BLOCK_TYPE_ANY,
362             GNUNET_TIME_UNIT_FOREVER_ABS,
363             0, NULL);
364   GSF_stream_query_cancel (sr);
365   return GNUNET_YES;
366 }
367
368
369 /**
370  * Destroy a stream handle.
371  *
372  * @param sh stream to process
373  */
374 static void
375 destroy_stream_handle (struct StreamHandle *sh)
376 {
377   struct GSF_StreamRequest *sr;
378
379   while (NULL != (sr = sh->pending_head))
380   {
381     sr->proc (sr->proc_cls, GNUNET_BLOCK_TYPE_ANY,
382               GNUNET_TIME_UNIT_FOREVER_ABS,
383               0, NULL);
384     GSF_stream_query_cancel (sr);
385   }
386   GNUNET_CONTAINER_multihashmap_iterate (sh->waiting_map,
387                                          &free_waiting_entry,
388                                          sh);
389   if (NULL != sh->wh)
390     GNUNET_STREAM_write_cancel (sh->wh);
391   if (NULL != sh->rh)
392     GNUNET_STREAM_read_cancel (sh->rh);
393   if (GNUNET_SCHEDULER_NO_TASK != sh->timeout_task)
394     GNUNET_SCHEDULER_cancel (sh->timeout_task);
395   if (GNUNET_SCHEDULER_NO_TASK != sh->reset_task)
396     GNUNET_SCHEDULER_cancel (sh->reset_task);
397   GNUNET_STREAM_close (sh->stream);
398   GNUNET_assert (GNUNET_OK ==
399                  GNUNET_CONTAINER_multihashmap_remove (stream_map,
400                                                        &sh->target.hashPubKey,
401                                                        sh));
402   GNUNET_CONTAINER_multihashmap_destroy (sh->waiting_map);
403   GNUNET_free (sh);
404 }
405
406
407 /**
408  * Transmit pending requests via the stream.
409  *
410  * @param sh stream to process
411  */
412 static void
413 transmit_pending (struct StreamHandle *sh);
414
415
416 /**
417  * Function called once the stream is ready for transmission.
418  *
419  * @param cls the 'struct StreamHandle'
420  * @param socket stream socket handle
421  */
422 static void
423 stream_ready_cb (void *cls,
424                  struct GNUNET_STREAM_Socket *socket)
425 {
426   struct StreamHandle *sh = cls;
427
428   sh->is_ready = GNUNET_YES;
429   transmit_pending (sh);
430 }
431
432
433 /**
434  * Iterator called on each entry in a waiting map to 
435  * move it back to the pending list.
436  *
437  * @param cls the 'struct StreamHandle'
438  * @param key the key of the entry in the map (the query)
439  * @param value the 'struct GSF_StreamRequest' to move to pending
440  * @return GNUNET_YES (continue to iterate)
441  */
442 static int
443 move_to_pending (void *cls,
444                  const struct GNUNET_HashCode *key,
445                  void *value)
446 {
447   struct StreamHandle *sh = cls;
448   struct GSF_StreamRequest *sr = value;
449   
450   GNUNET_assert (GNUNET_YES ==
451                  GNUNET_CONTAINER_multihashmap_remove (sh->waiting_map,
452                                                        key,
453                                                        value));
454   GNUNET_CONTAINER_DLL_insert (sh->pending_head,
455                                sh->pending_tail,
456                                sr);
457   sr->was_transmitted = GNUNET_NO;
458   return GNUNET_YES;
459 }
460
461
462 /**
463  * We had a serious error, tear down and re-create stream from scratch.
464  *
465  * @param sh stream to reset
466  */
467 static void
468 reset_stream (struct StreamHandle *sh)
469 {
470   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
471               "Resetting stream to %s\n",
472               GNUNET_i2s (&sh->target));
473   if (NULL != sh->rh)
474   {
475     GNUNET_STREAM_read_cancel (sh->rh);
476     sh->rh = NULL;
477   }
478   GNUNET_STREAM_close (sh->stream);
479   sh->is_ready = GNUNET_NO;
480   GNUNET_CONTAINER_multihashmap_iterate (sh->waiting_map,
481                                          &move_to_pending,
482                                          sh);
483   sh->stream = GNUNET_STREAM_open (GSF_cfg,
484                                    &sh->target,
485                                    GNUNET_APPLICATION_TYPE_FS_BLOCK_TRANSFER,
486                                    &stream_ready_cb, sh,
487                                    GNUNET_STREAM_OPTION_END);
488 }
489
490
491 /**
492  * Task called when it is time to destroy an inactive stream.
493  *
494  * @param cls the 'struct StreamHandle' to tear down
495  * @param tc scheduler context, unused
496  */
497 static void
498 stream_timeout (void *cls,
499                 const struct GNUNET_SCHEDULER_TaskContext *tc)
500 {
501   struct StreamHandle *sh = cls;
502
503   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
504               "Timeout on stream to %s\n",
505               GNUNET_i2s (&sh->target));
506   sh->timeout_task = GNUNET_SCHEDULER_NO_TASK;
507   destroy_stream_handle (sh);
508 }
509
510
511 /**
512  * Task called when it is time to reset an stream.
513  *
514  * @param cls the 'struct StreamHandle' to tear down
515  * @param tc scheduler context, unused
516  */
517 static void
518 reset_stream_task (void *cls,
519                    const struct GNUNET_SCHEDULER_TaskContext *tc)
520 {
521   struct StreamHandle *sh = cls;
522
523   sh->reset_task = GNUNET_SCHEDULER_NO_TASK;
524   reset_stream (sh);
525 }
526
527
528 /**
529  * We had a serious error, tear down and re-create stream from scratch,
530  * but do so asynchronously.
531  *
532  * @param sh stream to reset
533  */
534 static void
535 reset_stream_async (struct StreamHandle *sh)
536 {
537   if (GNUNET_SCHEDULER_NO_TASK != sh->reset_task)
538     GNUNET_SCHEDULER_cancel (sh->reset_task);
539   sh->reset_task = GNUNET_SCHEDULER_add_now (&reset_stream_task,
540                                              sh);
541 }
542
543
544 /**
545  * We got a reply from the stream.  Process it.
546  *
547  * @param cls the struct StreamHandle 
548  * @param status the status of the stream at the time this function is called
549  * @param data traffic from the other side
550  * @param size the number of bytes available in data read; will be 0 on timeout 
551  * @return number of bytes of processed from 'data' (any data remaining should be
552  *         given to the next time the read processor is called).
553  */
554 static size_t
555 handle_stream_reply (void *cls,
556                      enum GNUNET_STREAM_Status status,
557                      const void *data,
558                      size_t size)
559 {
560   struct StreamHandle *sh = cls;
561
562   sh->rh = NULL;
563   GNUNET_SCHEDULER_cancel (sh->reset_task);
564   sh->reset_task = GNUNET_SCHEDULER_add_delayed (CLIENT_RETRY_TIMEOUT,
565                                                  &reset_stream_task,
566                                                  sh);
567   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
568               "Received %u bytes from stream to %s\n",
569               (unsigned int) size,
570               GNUNET_i2s (&sh->target));
571   if (GNUNET_SYSERR == 
572       GNUNET_SERVER_mst_receive (sh->mst,
573                                  NULL,
574                                  data, size,
575                                  GNUNET_NO, GNUNET_NO))
576   {
577     GNUNET_break_op (0);
578     reset_stream_async (sh);
579     return size;
580   }
581   if (NULL == sh->rh)
582     sh->rh = GNUNET_STREAM_read (sh->stream,
583                                  GNUNET_TIME_UNIT_FOREVER_REL,
584                                  &handle_stream_reply,
585                                  sh);
586   return size;
587 }
588
589
590 /**
591  * Functions of this signature are called whenever we transmitted a
592  * query via a stream.
593  *
594  * @param cls the struct StreamHandle for which we did the write call
595  * @param status the status of the stream at the time this function is called;
596  *          GNUNET_OK if writing to stream was completed successfully,
597  *          GNUNET_STREAM_SHUTDOWN if the stream is shutdown for writing in the
598  *          mean time.
599  * @param size the number of bytes written
600  */
601 static void
602 query_write_continuation (void *cls,
603                           enum GNUNET_STREAM_Status status,
604                           size_t size)
605 {
606   struct StreamHandle *sh = cls;
607
608   sh->wh = NULL;
609   if ( (GNUNET_STREAM_OK != status) ||
610        (sizeof (struct StreamQueryMessage) != size) )
611   {
612     reset_stream (sh);
613     return;
614   }
615   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
616               "Successfully transmitted %u bytes via stream to %s\n",
617               (unsigned int) size,
618               GNUNET_i2s (&sh->target));
619   if (NULL == sh->rh)
620     sh->rh = GNUNET_STREAM_read (sh->stream,
621                                  GNUNET_TIME_UNIT_FOREVER_REL,
622                                  &handle_stream_reply,
623                                  sh);
624   transmit_pending (sh);
625 }
626           
627
628 /**
629  * Transmit pending requests via the stream.
630  *
631  * @param sh stream to process
632  */
633 static void
634 transmit_pending (struct StreamHandle *sh)
635 {
636   struct StreamQueryMessage sqm;
637   struct GSF_StreamRequest *sr;
638
639   if (NULL != sh->wh)
640     return;
641   sr = sh->pending_head;
642   if (NULL == sr)
643     return;
644   GNUNET_CONTAINER_DLL_remove (sh->pending_head,
645                                sh->pending_tail,
646                                sr);
647   GNUNET_CONTAINER_multihashmap_put (sh->waiting_map,
648                                      &sr->query,
649                                      sr,
650                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
651   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
652               "Sending query via stream to %s\n",
653               GNUNET_i2s (&sh->target));
654   sr->was_transmitted = GNUNET_YES;
655   sqm.header.size = htons (sizeof (sqm));
656   sqm.header.type = htons (GNUNET_MESSAGE_TYPE_FS_STREAM_QUERY);
657   sqm.type = htonl (sr->type);
658   sqm.query = sr->query;
659   sh->wh = GNUNET_STREAM_write (sh->stream,
660                                 &sqm, sizeof (sqm),
661                                 GNUNET_TIME_UNIT_FOREVER_REL,
662                                 &query_write_continuation,
663                                 sh);
664 }
665
666
667 /**
668  * Closure for 'handle_reply'.
669  */
670 struct HandleReplyClosure
671 {
672
673   /**
674    * Reply payload.
675    */ 
676   const void *data;
677
678   /**
679    * Expiration time for the block.
680    */
681   struct GNUNET_TIME_Absolute expiration;
682
683   /**
684    * Number of bytes in 'data'.
685    */
686   size_t data_size;
687
688   /** 
689    * Type of the block.
690    */
691   enum GNUNET_BLOCK_Type type;
692   
693   /**
694    * Did we have a matching query?
695    */
696   int found;
697 };
698
699
700 /**
701  * Iterator called on each entry in a waiting map to 
702  * process a result.
703  *
704  * @param cls the 'struct HandleReplyClosure'
705  * @param key the key of the entry in the map (the query)
706  * @param value the 'struct GSF_StreamRequest' to handle result for
707  * @return GNUNET_YES (continue to iterate)
708  */
709 static int
710 handle_reply (void *cls,
711               const struct GNUNET_HashCode *key,
712               void *value)
713 {
714   struct HandleReplyClosure *hrc = cls;
715   struct GSF_StreamRequest *sr = value;
716   
717   sr->proc (sr->proc_cls,
718             hrc->type,
719             hrc->expiration,
720             hrc->data_size,
721             hrc->data);
722   GSF_stream_query_cancel (sr);
723   hrc->found = GNUNET_YES;
724   return GNUNET_YES;
725 }
726
727
728 /**
729  * Functions with this signature are called whenever a
730  * complete reply is received.
731  *
732  * Do not call GNUNET_SERVER_mst_destroy in callback
733  *
734  * @param cls closure with the 'struct StreamHandle'
735  * @param client identification of the client, NULL
736  * @param message the actual message
737  * @return GNUNET_OK on success, GNUNET_SYSERR to stop further processing
738  */
739 static int
740 reply_cb (void *cls,
741           void *client,
742           const struct GNUNET_MessageHeader *message)
743 {
744   struct StreamHandle *sh = cls;
745   const struct StreamReplyMessage *srm;
746   struct HandleReplyClosure hrc;
747   uint16_t msize;
748   enum GNUNET_BLOCK_Type type;
749   struct GNUNET_HashCode query;
750
751   msize = ntohs (message->size);
752   switch (ntohs (message->type))
753   {
754   case GNUNET_MESSAGE_TYPE_FS_STREAM_REPLY:
755     if (sizeof (struct StreamReplyMessage) > msize)
756     {
757       GNUNET_break_op (0);
758       reset_stream_async (sh);
759       return GNUNET_SYSERR;
760     }
761     srm = (const struct StreamReplyMessage *) message;
762     msize -= sizeof (struct StreamReplyMessage);
763     type = (enum GNUNET_BLOCK_Type) ntohl (srm->type);
764     if (GNUNET_YES !=
765         GNUNET_BLOCK_get_key (GSF_block_ctx,
766                               type,
767                               &srm[1], msize, &query))
768     {
769       GNUNET_break_op (0); 
770       reset_stream_async (sh);
771       return GNUNET_SYSERR;
772     }
773     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
774                 "Received reply `%s' via stream\n",
775                 GNUNET_h2s (&query));
776     GNUNET_STATISTICS_update (GSF_stats,
777                               gettext_noop ("# replies received via stream"), 1,
778                               GNUNET_NO);
779     hrc.data = &srm[1];
780     hrc.data_size = msize;
781     hrc.expiration = GNUNET_TIME_absolute_ntoh (srm->expiration);
782     hrc.type = type;
783     hrc.found = GNUNET_NO;
784     GNUNET_CONTAINER_multihashmap_get_multiple (sh->waiting_map,
785                                                 &query,
786                                                 &handle_reply,
787                                                 &hrc);
788     if (GNUNET_NO == hrc.found)
789     {
790       GNUNET_STATISTICS_update (GSF_stats,
791                                 gettext_noop ("# replies received via stream dropped"), 1,
792                                 GNUNET_NO);
793       return GNUNET_OK;
794     }
795     return GNUNET_OK;
796   default:
797     GNUNET_break_op (0);
798     reset_stream_async (sh);
799     return GNUNET_SYSERR;
800   }
801 }
802
803
804 /**
805  * Get (or create) a stream to talk to the given peer.
806  *
807  * @param target peer we want to communicate with
808  */
809 static struct StreamHandle *
810 get_stream (const struct GNUNET_PeerIdentity *target)
811 {
812   struct StreamHandle *sh;
813
814   sh = GNUNET_CONTAINER_multihashmap_get (stream_map,
815                                           &target->hashPubKey);
816   if (NULL != sh)
817   {
818     if (GNUNET_SCHEDULER_NO_TASK != sh->timeout_task)
819     {
820       GNUNET_SCHEDULER_cancel (sh->timeout_task);
821       sh->timeout_task = GNUNET_SCHEDULER_NO_TASK;
822     }
823     return sh;
824   }
825   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
826               "Creating stream to %s\n",
827               GNUNET_i2s (target));
828   sh = GNUNET_malloc (sizeof (struct StreamHandle));
829   sh->reset_task = GNUNET_SCHEDULER_add_delayed (CLIENT_RETRY_TIMEOUT,
830                                                  &reset_stream_task,
831                                                  sh);
832   sh->mst = GNUNET_SERVER_mst_create (&reply_cb,
833                                       sh);
834   sh->waiting_map = GNUNET_CONTAINER_multihashmap_create (512, GNUNET_YES);
835   sh->target = *target;
836   sh->stream = GNUNET_STREAM_open (GSF_cfg,
837                                    &sh->target,
838                                    GNUNET_APPLICATION_TYPE_FS_BLOCK_TRANSFER,
839                                    &stream_ready_cb, sh,
840                                    GNUNET_STREAM_OPTION_END);
841   GNUNET_assert (GNUNET_OK ==
842                  GNUNET_CONTAINER_multihashmap_put (stream_map,
843                                                     &sh->target.hashPubKey,
844                                                     sh,
845                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
846   return sh;
847 }
848
849
850 /**
851  * Look for a block by directly contacting a particular peer.
852  *
853  * @param target peer that should have the block
854  * @param query hash to query for the block
855  * @param type desired type for the block
856  * @param proc function to call with result
857  * @param proc_cls closure for 'proc'
858  * @return handle to cancel the operation
859  */
860 struct GSF_StreamRequest *
861 GSF_stream_query (const struct GNUNET_PeerIdentity *target,
862                   const struct GNUNET_HashCode *query,
863                   enum GNUNET_BLOCK_Type type,
864                   GSF_StreamReplyProcessor proc, void *proc_cls)
865 {
866   struct StreamHandle *sh;
867   struct GSF_StreamRequest *sr;
868
869   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
870               "Preparing to send query for %s via stream to %s\n",
871               GNUNET_h2s (query),
872               GNUNET_i2s (target));
873   sh = get_stream (target);
874   sr = GNUNET_malloc (sizeof (struct GSF_StreamRequest));
875   sr->sh = sh;
876   sr->proc = proc;
877   sr->proc_cls = proc_cls;
878   sr->type = type;
879   sr->query = *query;
880   GNUNET_CONTAINER_DLL_insert (sh->pending_head,
881                                sh->pending_tail,
882                                sr);
883   if (GNUNET_YES == sh->is_ready)
884     transmit_pending (sh);
885   return sr;
886 }
887
888
889 /**
890  * Cancel an active request; must not be called after 'proc'
891  * was calld.
892  *
893  * @param sr request to cancel
894  */
895 void
896 GSF_stream_query_cancel (struct GSF_StreamRequest *sr)
897 {
898   struct StreamHandle *sh = sr->sh;
899
900   if (GNUNET_YES == sr->was_transmitted)
901     GNUNET_assert (GNUNET_OK ==
902                    GNUNET_CONTAINER_multihashmap_remove (sh->waiting_map,
903                                                          &sr->query,
904                                                          sr));
905   else
906     GNUNET_CONTAINER_DLL_remove (sh->pending_head,
907                                  sh->pending_tail,
908                                  sr);
909   GNUNET_free (sr);
910   if ( (0 == GNUNET_CONTAINER_multihashmap_size (sh->waiting_map)) &&
911        (NULL == sh->pending_head) )
912     sh->timeout_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS,
913                                                      &stream_timeout,
914                                                      sh);
915 }
916
917
918 /* ********************* server-side code ************************* */
919
920
921 /**
922  * We're done with a particular client, clean up.
923  *
924  * @param sc client to clean up
925  */
926 static void
927 terminate_stream (struct StreamClient *sc)
928 {
929   GNUNET_STATISTICS_update (GSF_stats,
930                             gettext_noop ("# stream connections active"), -1,
931                             GNUNET_NO);
932   if (GNUNET_SCHEDULER_NO_TASK != sc->terminate_task)
933     GNUNET_SCHEDULER_cancel (sc->terminate_task); 
934   if (GNUNET_SCHEDULER_NO_TASK != sc->timeout_task)
935     GNUNET_SCHEDULER_cancel (sc->timeout_task); 
936  if (NULL != sc->rh)
937     GNUNET_STREAM_read_cancel (sc->rh);
938   if (NULL != sc->wh)
939     GNUNET_STREAM_write_cancel (sc->wh);
940   if (NULL != sc->qe)
941     GNUNET_DATASTORE_cancel (sc->qe);
942   GNUNET_SERVER_mst_destroy (sc->mst);
943   GNUNET_STREAM_close (sc->socket);
944   struct WriteQueueItem *wqi;
945   while (NULL != (wqi = sc->wqi_head))
946   {
947     GNUNET_CONTAINER_DLL_remove (sc->wqi_head,
948                                  sc->wqi_tail,
949                                  wqi);
950     GNUNET_free (wqi);
951   }
952
953
954   GNUNET_CONTAINER_DLL_remove (sc_head,
955                                sc_tail,
956                                sc);
957   sc_count--;
958   GNUNET_free (sc);
959 }
960
961
962 /**
963  * Task run to asynchronously terminate the stream.
964  *
965  * @param cls the 'struct StreamClient'
966  * @param tc scheduler context
967  */ 
968 static void
969 terminate_stream_task (void *cls,
970                        const struct GNUNET_SCHEDULER_TaskContext *tc)
971 {
972   struct StreamClient *sc = cls;
973
974   sc->terminate_task = GNUNET_SCHEDULER_NO_TASK;
975   terminate_stream (sc);
976 }
977
978
979 /**
980  * Task run to asynchronously terminate the stream due to timeout.
981  *
982  * @param cls the 'struct StreamClient'
983  * @param tc scheduler context
984  */ 
985 static void
986 timeout_stream_task (void *cls,
987                      const struct GNUNET_SCHEDULER_TaskContext *tc)
988 {
989   struct StreamClient *sc = cls;
990
991   sc->timeout_task = GNUNET_SCHEDULER_NO_TASK;
992   terminate_stream (sc);
993 }
994
995
996 /**
997  * Reset the timeout for the stream client (due to activity).
998  *
999  * @param sc client handle to reset timeout for
1000  */
1001 static void
1002 refresh_timeout_task (struct StreamClient *sc)
1003 {
1004   if (GNUNET_SCHEDULER_NO_TASK != sc->timeout_task)
1005     GNUNET_SCHEDULER_cancel (sc->timeout_task); 
1006   sc->timeout_task = GNUNET_SCHEDULER_add_delayed (IDLE_TIMEOUT,
1007                                                    &timeout_stream_task,
1008                                                    sc);
1009 }
1010
1011
1012 /**
1013  * We had a serious error, termiante stream,
1014  * but do so asynchronously.
1015  *
1016  * @param sc stream to reset
1017  */
1018 static void
1019 terminate_stream_async (struct StreamClient *sc)
1020 {
1021   if (GNUNET_SCHEDULER_NO_TASK == sc->terminate_task)
1022     sc->terminate_task = GNUNET_SCHEDULER_add_now (&terminate_stream_task,
1023                                                    sc);
1024 }
1025
1026
1027 /**
1028  * Functions of this signature are called whenever data is available from the
1029  * stream.
1030  *
1031  * @param cls the closure from GNUNET_STREAM_read
1032  * @param status the status of the stream at the time this function is called
1033  * @param data traffic from the other side
1034  * @param size the number of bytes available in data read; will be 0 on timeout 
1035  * @return number of bytes of processed from 'data' (any data remaining should be
1036  *         given to the next time the read processor is called).
1037  */
1038 static size_t 
1039 process_request (void *cls,
1040                  enum GNUNET_STREAM_Status status,
1041                  const void *data,
1042                  size_t size);
1043
1044
1045 /**
1046  * We're done handling a request from a client, read the next one.
1047  *
1048  * @param sc client to continue reading requests from
1049  */
1050 static void
1051 continue_reading (struct StreamClient *sc)
1052 {
1053   int ret;
1054
1055   ret = 
1056     GNUNET_SERVER_mst_receive (sc->mst,
1057                                NULL,
1058                                NULL, 0,
1059                                GNUNET_NO, GNUNET_NO);
1060   if (GNUNET_NO == ret)
1061     return; 
1062   refresh_timeout_task (sc);
1063   if (NULL != sc->rh)
1064     return;
1065   sc->rh = GNUNET_STREAM_read (sc->socket,
1066                                GNUNET_TIME_UNIT_FOREVER_REL,
1067                                &process_request,
1068                                sc);      
1069 }
1070
1071
1072 /**
1073  * Transmit the next entry from the write queue.
1074  *
1075  * @param sc where to process the write queue
1076  */
1077 static void
1078 continue_writing (struct StreamClient *sc);
1079
1080
1081 /**
1082  * Functions of this signature are called whenever data is available from the
1083  * stream.
1084  *
1085  * @param cls the closure from GNUNET_STREAM_read
1086  * @param status the status of the stream at the time this function is called
1087  * @param data traffic from the other side
1088  * @param size the number of bytes available in data read; will be 0 on timeout 
1089  * @return number of bytes of processed from 'data' (any data remaining should be
1090  *         given to the next time the read processor is called).
1091  */
1092 static size_t 
1093 process_request (void *cls,
1094                  enum GNUNET_STREAM_Status status,
1095                  const void *data,
1096                  size_t size)
1097 {
1098   struct StreamClient *sc = cls;
1099   int ret;
1100
1101   sc->rh = NULL;
1102   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1103               "Received %u byte query via stream\n",
1104               (unsigned int) size);
1105   switch (status)
1106   {
1107   case GNUNET_STREAM_OK:
1108     ret = 
1109       GNUNET_SERVER_mst_receive (sc->mst,
1110                                  NULL,
1111                                  data, size,
1112                                  GNUNET_NO, GNUNET_NO);
1113     if (GNUNET_NO == ret)
1114       return size; /* more messages in MST */
1115     if (GNUNET_SYSERR == ret)
1116     {
1117       GNUNET_break_op (0);
1118       terminate_stream_async (sc);
1119       return size;
1120     }
1121     break;
1122   case GNUNET_STREAM_TIMEOUT:
1123   case GNUNET_STREAM_SHUTDOWN:
1124   case GNUNET_STREAM_SYSERR:
1125     terminate_stream_async (sc);
1126     return size;
1127   default:
1128     GNUNET_break (0);
1129     return size;
1130   }
1131   continue_writing (sc);
1132   return size;
1133 }
1134
1135
1136 /**
1137  * Sending a reply was completed, continue processing.
1138  *
1139  * @param cls closure with the struct StreamClient which sent the query
1140  * @param status result code for the operation
1141  * @param size number of bytes that were transmitted
1142  */
1143 static void
1144 write_continuation (void *cls,
1145                     enum GNUNET_STREAM_Status status,
1146                     size_t size)
1147 {
1148   struct StreamClient *sc = cls;
1149   
1150   sc->wh = NULL;
1151   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1152               "Write continuation called on 'server' side with status %d\n",
1153               status);
1154   if ( (GNUNET_STREAM_OK != status) ||
1155        (size != sc->reply_size) )
1156   {
1157     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1158                 "Transmission of reply failed, terminating stream\n");
1159     terminate_stream (sc);    
1160     return;
1161   }
1162   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1163               "Transmitted %u byte reply via stream\n",
1164               (unsigned int) size);
1165   GNUNET_STATISTICS_update (GSF_stats,
1166                             gettext_noop ("# Blocks transferred via stream"), 1,
1167                             GNUNET_NO);
1168   continue_writing (sc);
1169 }
1170
1171
1172 /**
1173  * Transmit the next entry from the write queue.
1174  *
1175  * @param sc where to process the write queue
1176  */
1177 static void
1178 continue_writing (struct StreamClient *sc)
1179 {
1180   struct WriteQueueItem *wqi;
1181
1182   if (NULL != sc->wh)
1183   {
1184     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1185                 "Write pending, waiting for it to complete\n");
1186     return; /* write already pending */
1187   }
1188   if (NULL == (wqi = sc->wqi_head))
1189   {
1190     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1191                 "Write queue empty, reading more requests\n");
1192     continue_reading (sc);
1193     return;
1194   }
1195   GNUNET_CONTAINER_DLL_remove (sc->wqi_head,
1196                                sc->wqi_tail,
1197                                wqi);
1198   sc->wh = GNUNET_STREAM_write (sc->socket,
1199                                 &wqi[1], wqi->msize,
1200                                 GNUNET_TIME_UNIT_FOREVER_REL,
1201                                 &write_continuation,
1202                                 sc);
1203   if (NULL != sc->wh)
1204     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1205                 "Gave %u bytes for stream for transmission\n",
1206                 (unsigned int) wqi->msize);
1207   GNUNET_free (wqi);
1208   if (NULL == sc->wh)
1209   {
1210     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1211                 "Write failed; terminating stream\n");
1212     terminate_stream (sc);
1213     return;
1214   }
1215 }
1216
1217
1218 /**
1219  * Process a datum that was stored in the datastore.
1220  *
1221  * @param cls closure with the struct StreamClient which sent the query
1222  * @param key key for the content
1223  * @param size number of bytes in data
1224  * @param data content stored
1225  * @param type type of the content
1226  * @param priority priority of the content
1227  * @param anonymity anonymity-level for the content
1228  * @param expiration expiration time for the content
1229  * @param uid unique identifier for the datum;
1230  *        maybe 0 if no unique identifier is available
1231  */
1232 static void 
1233 handle_datastore_reply (void *cls,
1234                         const struct GNUNET_HashCode * key,
1235                         size_t size, const void *data,
1236                         enum GNUNET_BLOCK_Type type,
1237                         uint32_t priority,
1238                         uint32_t anonymity,
1239                         struct GNUNET_TIME_Absolute
1240                         expiration, uint64_t uid)
1241 {
1242   struct StreamClient *sc = cls;
1243   size_t msize = size + sizeof (struct StreamReplyMessage);
1244   struct WriteQueueItem *wqi;
1245   struct StreamReplyMessage *srm;
1246
1247   sc->qe = NULL;
1248   if (GNUNET_BLOCK_TYPE_FS_ONDEMAND == type)
1249   {
1250     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1251                 "Performing on-demand encoding\n");
1252     if (GNUNET_OK !=
1253         GNUNET_FS_handle_on_demand_block (key,
1254                                           size, data, type,
1255                                           priority, anonymity,
1256                                           expiration, uid,
1257                                           &handle_datastore_reply,
1258                                           sc))
1259     {
1260       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1261                   "On-demand encoding request failed\n");
1262       continue_writing (sc);
1263     }
1264     return;
1265   }
1266   if (msize > GNUNET_SERVER_MAX_MESSAGE_SIZE)
1267   {
1268     GNUNET_break (0);
1269     continue_writing (sc);
1270     return;
1271   }
1272   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1273               "Starting transmission of %u byte reply for query `%s' via stream\n",
1274               (unsigned int) size,
1275               GNUNET_h2s (key));
1276   wqi = GNUNET_malloc (sizeof (struct WriteQueueItem) + msize);
1277   wqi->msize = msize;
1278   srm = (struct StreamReplyMessage *) &wqi[1];
1279   srm->header.size = htons ((uint16_t) msize);
1280   srm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_STREAM_REPLY);
1281   srm->type = htonl (type);
1282   srm->expiration = GNUNET_TIME_absolute_hton (expiration);
1283   memcpy (&srm[1], data, size);
1284   sc->reply_size = msize;
1285   GNUNET_CONTAINER_DLL_insert (sc->wqi_head,
1286                                sc->wqi_tail,
1287                                wqi);
1288   continue_writing (sc);
1289 }
1290
1291
1292 /**
1293  * Functions with this signature are called whenever a
1294  * complete query message is received.
1295  *
1296  * Do not call GNUNET_SERVER_mst_destroy in callback
1297  *
1298  * @param cls closure with the 'struct StreamClient'
1299  * @param client identification of the client, NULL
1300  * @param message the actual message
1301  * @return GNUNET_OK on success, GNUNET_SYSERR to stop further processing
1302  */
1303 static int
1304 request_cb (void *cls,
1305             void *client,
1306             const struct GNUNET_MessageHeader *message)
1307 {
1308   struct StreamClient *sc = cls;
1309   const struct StreamQueryMessage *sqm;
1310
1311   switch (ntohs (message->type))
1312   {
1313   case GNUNET_MESSAGE_TYPE_FS_STREAM_QUERY:
1314     if (sizeof (struct StreamQueryMessage) != 
1315         ntohs (message->size))
1316     {
1317       GNUNET_break_op (0);
1318       terminate_stream_async (sc);
1319       return GNUNET_SYSERR;
1320     }
1321     sqm = (const struct StreamQueryMessage *) message;
1322     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1323                 "Received query for `%s' via stream\n",
1324                 GNUNET_h2s (&sqm->query));
1325     GNUNET_STATISTICS_update (GSF_stats,
1326                               gettext_noop ("# queries received via stream"), 1,
1327                               GNUNET_NO);
1328     refresh_timeout_task (sc);
1329     sc->qe = GNUNET_DATASTORE_get_key (GSF_dsh,
1330                                        0,
1331                                        &sqm->query,
1332                                        ntohl (sqm->type),
1333                                        0 /* priority */, 
1334                                        GSF_datastore_queue_size,
1335                                        GNUNET_TIME_UNIT_FOREVER_REL,
1336                                        &handle_datastore_reply, sc);
1337     if (NULL == sc->qe)
1338     {
1339       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1340                   "Queueing request with datastore failed (queue full?)\n");
1341       continue_writing (sc);
1342     }
1343     return GNUNET_OK;
1344   default:
1345     GNUNET_break_op (0);
1346     terminate_stream_async (sc);
1347     return GNUNET_SYSERR;
1348   }
1349 }
1350
1351
1352 /**
1353  * Functions of this type are called upon new stream connection from other peers
1354  * or upon binding error which happen when the app_port given in
1355  * GNUNET_STREAM_listen() is already taken.
1356  *
1357  * @param cls the closure from GNUNET_STREAM_listen
1358  * @param socket the socket representing the stream; NULL on binding error
1359  * @param initiator the identity of the peer who wants to establish a stream
1360  *            with us; NULL on binding error
1361  * @return GNUNET_OK to keep the socket open, GNUNET_SYSERR to close the
1362  *             stream (the socket will be invalid after the call)
1363  */
1364 static int 
1365 accept_cb (void *cls,
1366            struct GNUNET_STREAM_Socket *socket,
1367            const struct GNUNET_PeerIdentity *initiator)
1368 {
1369   struct StreamClient *sc;
1370
1371   if (NULL == socket)
1372     return GNUNET_SYSERR;
1373   if (sc_count >= sc_count_max)
1374   {
1375     GNUNET_STATISTICS_update (GSF_stats,
1376                               gettext_noop ("# stream client connections rejected"), 1,
1377                               GNUNET_NO);
1378     return GNUNET_SYSERR;
1379   }
1380   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1381               "Accepting inbound stream connection from `%s'\n",
1382               GNUNET_i2s (initiator));
1383   GNUNET_STATISTICS_update (GSF_stats,
1384                             gettext_noop ("# stream connections active"), 1,
1385                             GNUNET_NO);
1386   sc = GNUNET_malloc (sizeof (struct StreamClient));
1387   sc->socket = socket;
1388   sc->mst = GNUNET_SERVER_mst_create (&request_cb,
1389                                       sc);
1390   sc->rh = GNUNET_STREAM_read (sc->socket,
1391                                GNUNET_TIME_UNIT_FOREVER_REL,
1392                                &process_request,
1393                                sc);
1394   GNUNET_CONTAINER_DLL_insert (sc_head,
1395                                sc_tail,
1396                                sc);
1397   sc_count++;
1398   refresh_timeout_task (sc);
1399   return GNUNET_OK;
1400 }
1401
1402
1403 /**
1404  * Initialize subsystem for non-anonymous file-sharing.
1405  */
1406 void
1407 GSF_stream_start ()
1408 {
1409   stream_map = GNUNET_CONTAINER_multihashmap_create (16, GNUNET_YES);
1410   if (GNUNET_YES ==
1411       GNUNET_CONFIGURATION_get_value_number (GSF_cfg,
1412                                              "fs",
1413                                              "MAX_STREAM_CLIENTS",
1414                                              &sc_count_max))
1415   {
1416     listen_socket = GNUNET_STREAM_listen (GSF_cfg,
1417                                           GNUNET_APPLICATION_TYPE_FS_BLOCK_TRANSFER,
1418                                           &accept_cb, NULL,
1419                                           GNUNET_STREAM_OPTION_END);
1420   } 
1421 }
1422
1423
1424 /**
1425  * Function called on each active streams to shut them down.
1426  *
1427  * @param cls NULL
1428  * @param key target peer, unused
1429  * @param value the 'struct StreamHandle' to destroy
1430  * @return GNUNET_YES (continue to iterate)
1431  */
1432 static int
1433 release_streams (void *cls,
1434                  const struct GNUNET_HashCode *key,
1435                  void *value)
1436 {
1437   struct StreamHandle *sh = value;
1438
1439   destroy_stream_handle (sh);
1440   return GNUNET_YES;
1441 }
1442
1443
1444 /**
1445  * Shutdown subsystem for non-anonymous file-sharing.
1446  */
1447 void
1448 GSF_stream_stop ()
1449 {
1450   struct StreamClient *sc;
1451
1452   while (NULL != (sc = sc_head))
1453     terminate_stream (sc);
1454   if (NULL != listen_socket)
1455   {
1456     GNUNET_STREAM_listen_close (listen_socket);
1457     listen_socket = NULL;
1458   }
1459   GNUNET_CONTAINER_multihashmap_iterate (stream_map,
1460                                          &release_streams,
1461                                          NULL);
1462   GNUNET_CONTAINER_multihashmap_destroy (stream_map);
1463   stream_map = NULL;
1464 }
1465
1466 /* end of gnunet-service-fs_stream.c */