-typo
[oweals/gnunet.git] / src / fs / gnunet-service-fs_mesh.c
1 /*
2      This file is part of GNUnet.
3      (C) 2012, 2013 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file fs/gnunet-service-fs_mesh.c
23  * @brief non-anonymous file-transfer
24  * @author Christian Grothoff
25  *
26  * TODO:
27  * - update comments on functions (still matches 'mesh')
28  * - MESH2 API doesn't allow flow control for server yet (needed!)
29  * - likely need to register clean up handler with mesh to handle
30  *   client disconnect (likely leaky right now)
31  * - server is optional, currently client code will NPE if we have
32  *   no server, again MESH2 API requirement forcing this for now
33  * - message handlers are symmetric for client/server, should be
34  *   separated (currently clients can get requests and servers can
35  *   handle answers, not good)
36  * - code is entirely untested
37  * - might have overlooked a few possible simplifications
38  * - PORT is set to old application type, unsure if we should keep
39  *   it that way (fine for now)
40  */
41 #include "platform.h"
42 #include "gnunet_constants.h"
43 #include "gnunet_util_lib.h"
44 #include "gnunet_mesh_service.h"
45 #include "gnunet_protocols.h"
46 #include "gnunet_applications.h"
47 #include "gnunet-service-fs.h"
48 #include "gnunet-service-fs_indexing.h"
49 #include "gnunet-service-fs_mesh.h"
50
51 /**
52  * After how long do we termiante idle connections?
53  */
54 #define IDLE_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 2)
55
56 /**
57  * After how long do we reset connections without replies?
58  */
59 #define CLIENT_RETRY_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 30)
60
61
62 /**
63  * A message in the queue to be written to the mesh.
64  */
65 struct WriteQueueItem
66 {
67   /**
68    * Kept in a DLL.
69    */
70   struct WriteQueueItem *next;
71
72   /**
73    * Kept in a DLL.
74    */
75   struct WriteQueueItem *prev;
76
77   /**
78    * Number of bytes of payload, allocated at the end of this struct.
79    */
80   size_t msize;
81 };
82
83
84 /**
85  * Information we keep around for each active meshing client.
86  */
87 struct StreamClient
88 {
89   /**
90    * DLL
91    */ 
92   struct StreamClient *next;
93
94   /**
95    * DLL
96    */ 
97   struct StreamClient *prev;
98
99   /**
100    * Socket for communication.
101    */ 
102   struct GNUNET_MESH_Tunnel *socket;
103
104   /**
105    * Handle for active write operation, or NULL.
106    */ 
107   struct GNUNET_MESH_TransmitHandle *wh;
108
109   /**
110    * Head of write queue.
111    */
112   struct WriteQueueItem *wqi_head;
113
114   /**
115    * Tail of write queue.
116    */
117   struct WriteQueueItem *wqi_tail;
118   
119   /**
120    * Current active request to the datastore, if we have one pending.
121    */
122   struct GNUNET_DATASTORE_QueueEntry *qe;
123
124   /**
125    * Task that is scheduled to asynchronously terminate the connection.
126    */
127   GNUNET_SCHEDULER_TaskIdentifier terminate_task;
128
129   /**
130    * Task that is scheduled to terminate idle connections.
131    */
132   GNUNET_SCHEDULER_TaskIdentifier timeout_task;
133
134   /**
135    * Size of the last write that was initiated.
136    */ 
137   size_t reply_size;
138
139 };
140
141
142 /**
143  * Query from one peer, asking the other for CHK-data.
144  */
145 struct StreamQueryMessage
146 {
147
148   /**
149    * Type is GNUNET_MESSAGE_TYPE_FS_STREAM_QUERY.
150    */
151   struct GNUNET_MessageHeader header;
152
153   /**
154    * Block type must be DBLOCK or IBLOCK.
155    */
156   uint32_t type;
157
158   /**
159    * Query hash from CHK (hash of encrypted block).
160    */
161   struct GNUNET_HashCode query;
162
163 };
164
165
166 /**
167  * Reply to a StreamQueryMessage.
168  */
169 struct StreamReplyMessage
170 {
171
172   /**
173    * Type is GNUNET_MESSAGE_TYPE_FS_STREAM_REPLY.
174    */
175   struct GNUNET_MessageHeader header;
176
177   /**
178    * Block type must be DBLOCK or IBLOCK.
179    */
180   uint32_t type;
181
182   /**
183    * Expiration time for the block.
184    */
185   struct GNUNET_TIME_AbsoluteNBO expiration;
186
187   /* followed by the encrypted block */
188
189 };
190
191
192 /** 
193  * Handle for a mesh to another peer.
194  */
195 struct StreamHandle;
196
197
198 /**
199  * Handle for a request that is going out via mesh API.
200  */
201 struct GSF_StreamRequest
202 {
203
204   /**
205    * DLL.
206    */
207   struct GSF_StreamRequest *next;
208
209   /**
210    * DLL.
211    */
212   struct GSF_StreamRequest *prev;
213
214   /**
215    * Which mesh is this request associated with?
216    */
217   struct StreamHandle *sh;
218
219   /**
220    * Function to call with the result.
221    */
222   GSF_StreamReplyProcessor proc;
223
224   /**
225    * Closure for 'proc'
226    */
227   void *proc_cls;
228
229   /**
230    * Query to transmit to the other peer.
231    */
232   struct GNUNET_HashCode query;
233
234   /**
235    * Desired type for the reply.
236    */
237   enum GNUNET_BLOCK_Type type;
238
239   /**
240    * Did we transmit this request already? YES if we are
241    * in the 'waiting' DLL, NO if we are in the 'pending' DLL.
242    */
243   int was_transmitted;
244 };
245
246
247 /** 
248  * Handle for a mesh to another peer.
249  */
250 struct StreamHandle
251 {
252   /**
253    * Head of DLL of pending requests on this mesh.
254    */
255   struct GSF_StreamRequest *pending_head;
256
257   /**
258    * Tail of DLL of pending requests on this mesh.
259    */
260   struct GSF_StreamRequest *pending_tail;
261
262   /**
263    * Map from query to 'struct GSF_StreamRequest's waiting for
264    * a reply.
265    */
266   struct GNUNET_CONTAINER_MultiHashMap *waiting_map;
267
268   /**
269    * Connection to the other peer.
270    */
271   struct GNUNET_MESH_Tunnel *mesh;
272
273   /**
274    * Handle for active write operation, or NULL.
275    */ 
276   struct GNUNET_MESH_TransmitHandle *wh;
277
278   /**
279    * Which peer does this mesh go to?
280    */ 
281   struct GNUNET_PeerIdentity target;
282
283   /**
284    * Task to kill inactive meshs (we keep them around for
285    * a few seconds to give the application a chance to give
286    * us another query).
287    */
288   GNUNET_SCHEDULER_TaskIdentifier timeout_task;
289
290   /**
291    * Task to reset meshs that had errors (asynchronously,
292    * as we may not be able to do it immediately during a
293    * callback from the mesh API).
294    */
295   GNUNET_SCHEDULER_TaskIdentifier reset_task;
296
297   /**
298    * Is this mesh ready for transmission?
299    */
300   int is_ready;
301
302 };
303
304
305 /**
306  * Listen socket for incoming requests.
307  */
308 static struct GNUNET_MESH_Handle *listen_socket;
309
310 /**
311  * Head of DLL of mesh clients.
312  */ 
313 static struct StreamClient *sc_head;
314
315 /**
316  * Tail of DLL of mesh clients.
317  */ 
318 static struct StreamClient *sc_tail;
319
320 /**
321  * Number of active mesh clients in the 'sc_*'-DLL.
322  */
323 static unsigned int sc_count;
324
325 /**
326  * Maximum allowed number of mesh clients.
327  */
328 static unsigned long long sc_count_max;
329
330 /**
331  * Map from peer identities to 'struct StreamHandles' with meshs to
332  * those peers.
333  */
334 static struct GNUNET_CONTAINER_MultiHashMap *mesh_map;
335
336
337 /* ********************* client-side code ************************* */
338
339 /**
340  * Iterator called on each entry in a waiting map to 
341  * call the 'proc' continuation and release associated
342  * resources.
343  *
344  * @param cls the 'struct StreamHandle'
345  * @param key the key of the entry in the map (the query)
346  * @param value the 'struct GSF_StreamRequest' to clean up
347  * @return GNUNET_YES (continue to iterate)
348  */
349 static int
350 free_waiting_entry (void *cls,
351                     const struct GNUNET_HashCode *key,
352                     void *value)
353 {
354   struct GSF_StreamRequest *sr = value;
355
356   sr->proc (sr->proc_cls, GNUNET_BLOCK_TYPE_ANY,
357             GNUNET_TIME_UNIT_FOREVER_ABS,
358             0, NULL);
359   GSF_mesh_query_cancel (sr);
360   return GNUNET_YES;
361 }
362
363
364 /**
365  * Destroy a mesh handle.
366  *
367  * @param sh mesh to process
368  */
369 static void
370 destroy_mesh_handle (struct StreamHandle *sh)
371 {
372   struct GSF_StreamRequest *sr;
373
374   while (NULL != (sr = sh->pending_head))
375   {
376     sr->proc (sr->proc_cls, GNUNET_BLOCK_TYPE_ANY,
377               GNUNET_TIME_UNIT_FOREVER_ABS,
378               0, NULL);
379     GSF_mesh_query_cancel (sr);
380   }
381   GNUNET_CONTAINER_multihashmap_iterate (sh->waiting_map,
382                                          &free_waiting_entry,
383                                          sh);
384   if (NULL != sh->wh)
385     GNUNET_MESH_notify_transmit_ready_cancel (sh->wh);
386   if (GNUNET_SCHEDULER_NO_TASK != sh->timeout_task)
387     GNUNET_SCHEDULER_cancel (sh->timeout_task);
388   if (GNUNET_SCHEDULER_NO_TASK != sh->reset_task)
389     GNUNET_SCHEDULER_cancel (sh->reset_task);
390   GNUNET_MESH_tunnel_destroy (sh->mesh);
391   GNUNET_assert (GNUNET_OK ==
392                  GNUNET_CONTAINER_multihashmap_remove (mesh_map,
393                                                        &sh->target.hashPubKey,
394                                                        sh));
395   GNUNET_CONTAINER_multihashmap_destroy (sh->waiting_map);
396   GNUNET_free (sh);
397 }
398
399
400 /**
401  * Transmit pending requests via the mesh.
402  *
403  * @param sh mesh to process
404  */
405 static void
406 transmit_pending (struct StreamHandle *sh);
407
408
409 /**
410  * Iterator called on each entry in a waiting map to 
411  * move it back to the pending list.
412  *
413  * @param cls the 'struct StreamHandle'
414  * @param key the key of the entry in the map (the query)
415  * @param value the 'struct GSF_StreamRequest' to move to pending
416  * @return GNUNET_YES (continue to iterate)
417  */
418 static int
419 move_to_pending (void *cls,
420                  const struct GNUNET_HashCode *key,
421                  void *value)
422 {
423   struct StreamHandle *sh = cls;
424   struct GSF_StreamRequest *sr = value;
425   
426   GNUNET_assert (GNUNET_YES ==
427                  GNUNET_CONTAINER_multihashmap_remove (sh->waiting_map,
428                                                        key,
429                                                        value));
430   GNUNET_CONTAINER_DLL_insert (sh->pending_head,
431                                sh->pending_tail,
432                                sr);
433   sr->was_transmitted = GNUNET_NO;
434   return GNUNET_YES;
435 }
436
437
438 /**
439  * We had a serious error, tear down and re-create mesh from scratch.
440  *
441  * @param sh mesh to reset
442  */
443 static void
444 reset_mesh (struct StreamHandle *sh)
445 {
446   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
447               "Resetting mesh to %s\n",
448               GNUNET_i2s (&sh->target));
449   GNUNET_MESH_tunnel_destroy (sh->mesh);
450   sh->is_ready = GNUNET_NO;
451   GNUNET_CONTAINER_multihashmap_iterate (sh->waiting_map,
452                                          &move_to_pending,
453                                          sh);
454   sh->mesh = GNUNET_MESH_tunnel_create (listen_socket,
455                                           sh,
456                                           &sh->target,
457                                           GNUNET_APPLICATION_TYPE_FS_BLOCK_TRANSFER,
458                       GNUNET_YES,
459                       GNUNET_YES);
460 }
461
462
463 /**
464  * Task called when it is time to destroy an inactive mesh.
465  *
466  * @param cls the 'struct StreamHandle' to tear down
467  * @param tc scheduler context, unused
468  */
469 static void
470 mesh_timeout (void *cls,
471                 const struct GNUNET_SCHEDULER_TaskContext *tc)
472 {
473   struct StreamHandle *sh = cls;
474
475   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
476               "Timeout on mesh to %s\n",
477               GNUNET_i2s (&sh->target));
478   sh->timeout_task = GNUNET_SCHEDULER_NO_TASK;
479   destroy_mesh_handle (sh);
480 }
481
482
483 /**
484  * Task called when it is time to reset an mesh.
485  *
486  * @param cls the 'struct StreamHandle' to tear down
487  * @param tc scheduler context, unused
488  */
489 static void
490 reset_mesh_task (void *cls,
491                    const struct GNUNET_SCHEDULER_TaskContext *tc)
492 {
493   struct StreamHandle *sh = cls;
494
495   sh->reset_task = GNUNET_SCHEDULER_NO_TASK;
496   reset_mesh (sh);
497 }
498
499
500 /**
501  * We had a serious error, tear down and re-create mesh from scratch,
502  * but do so asynchronously.
503  *
504  * @param sh mesh to reset
505  */
506 static void
507 reset_mesh_async (struct StreamHandle *sh)
508 {
509   if (GNUNET_SCHEDULER_NO_TASK != sh->reset_task)
510     GNUNET_SCHEDULER_cancel (sh->reset_task);
511   sh->reset_task = GNUNET_SCHEDULER_add_now (&reset_mesh_task,
512                                              sh);
513 }
514
515
516 /**
517  * Functions of this signature are called whenever we are ready to transmit
518  * query via a mesh.
519  *
520  * @param cls the struct StreamHandle for which we did the write call
521  * @param size the number of bytes that can be written to 'buf'
522  * @param buf where to write the message
523  * @return number of bytes written to 'buf'
524  */
525 static size_t
526 transmit_sqm (void *cls,
527               size_t size,
528               void *buf)
529 {
530   struct StreamHandle *sh = cls;
531   struct StreamQueryMessage sqm;
532   struct GSF_StreamRequest *sr;
533
534   sh->wh = NULL;
535   if (NULL == buf)
536   {
537     reset_mesh (sh);
538     return 0;
539   }
540   sr = sh->pending_head;
541   if (NULL == sr)
542     return 0;
543   GNUNET_assert (size >= sizeof (struct StreamQueryMessage));
544   GNUNET_CONTAINER_DLL_remove (sh->pending_head,
545                                sh->pending_tail,
546                                sr);
547   GNUNET_CONTAINER_multihashmap_put (sh->waiting_map,
548                                      &sr->query,
549                                      sr,
550                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
551   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
552               "Sending query via mesh to %s\n",
553               GNUNET_i2s (&sh->target));
554   sr->was_transmitted = GNUNET_YES;
555   sqm.header.size = htons (sizeof (sqm));
556   sqm.header.type = htons (GNUNET_MESSAGE_TYPE_FS_STREAM_QUERY);
557   sqm.type = htonl (sr->type);
558   sqm.query = sr->query;
559   memcpy (buf, &sqm, sizeof (sqm));
560   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
561               "Successfully transmitted %u bytes via mesh to %s\n",
562               (unsigned int) size,
563               GNUNET_i2s (&sh->target));
564   transmit_pending (sh);
565   return sizeof (sqm);
566 }
567           
568
569 /**
570  * Transmit pending requests via the mesh.
571  *
572  * @param sh mesh to process
573  */
574 static void
575 transmit_pending (struct StreamHandle *sh)
576 {
577   if (NULL != sh->wh)
578     return;
579   sh->wh = GNUNET_MESH_notify_transmit_ready (sh->mesh, GNUNET_YES /* allow cork */,
580                                               GNUNET_TIME_UNIT_FOREVER_REL,
581                                               sizeof (struct StreamQueryMessage),
582                                               &transmit_sqm, sh);
583 }
584
585
586 /**
587  * Closure for 'handle_reply'.
588  */
589 struct HandleReplyClosure
590 {
591
592   /**
593    * Reply payload.
594    */ 
595   const void *data;
596
597   /**
598    * Expiration time for the block.
599    */
600   struct GNUNET_TIME_Absolute expiration;
601
602   /**
603    * Number of bytes in 'data'.
604    */
605   size_t data_size;
606
607   /** 
608    * Type of the block.
609    */
610   enum GNUNET_BLOCK_Type type;
611   
612   /**
613    * Did we have a matching query?
614    */
615   int found;
616 };
617
618
619 /**
620  * Iterator called on each entry in a waiting map to 
621  * process a result.
622  *
623  * @param cls the 'struct HandleReplyClosure'
624  * @param key the key of the entry in the map (the query)
625  * @param value the 'struct GSF_StreamRequest' to handle result for
626  * @return GNUNET_YES (continue to iterate)
627  */
628 static int
629 handle_reply (void *cls,
630               const struct GNUNET_HashCode *key,
631               void *value)
632 {
633   struct HandleReplyClosure *hrc = cls;
634   struct GSF_StreamRequest *sr = value;
635   
636   sr->proc (sr->proc_cls,
637             hrc->type,
638             hrc->expiration,
639             hrc->data_size,
640             hrc->data);
641   GSF_mesh_query_cancel (sr);
642   hrc->found = GNUNET_YES;
643   return GNUNET_YES;
644 }
645
646
647 /**
648  * Functions with this signature are called whenever a
649  * complete reply is received.
650  *
651  * @param cls closure with the 'struct StreamHandle'
652  * @param tunnel tunnel handle
653  * @param tunnel_ctx tunnel context
654  * @param message the actual message
655  * @return GNUNET_OK on success, GNUNET_SYSERR to stop further processing
656  */
657 static int
658 reply_cb (void *cls,
659           struct GNUNET_MESH_Tunnel *tunnel,
660           void **tunnel_ctx,
661           const struct GNUNET_MessageHeader *message)
662 {
663   struct StreamHandle *sh = *tunnel_ctx;
664   const struct StreamReplyMessage *srm;
665   struct HandleReplyClosure hrc;
666   uint16_t msize;
667   enum GNUNET_BLOCK_Type type;
668   struct GNUNET_HashCode query;
669
670   msize = ntohs (message->size);
671   if (sizeof (struct StreamReplyMessage) > msize)
672   {
673     GNUNET_break_op (0);
674     reset_mesh_async (sh);
675     return GNUNET_SYSERR;
676   }
677   srm = (const struct StreamReplyMessage *) message;
678   msize -= sizeof (struct StreamReplyMessage);
679   type = (enum GNUNET_BLOCK_Type) ntohl (srm->type);
680   if (GNUNET_YES !=
681       GNUNET_BLOCK_get_key (GSF_block_ctx,
682                             type,
683                             &srm[1], msize, &query))
684   {
685     GNUNET_break_op (0); 
686     reset_mesh_async (sh);
687     return GNUNET_SYSERR;
688   }
689   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
690               "Received reply `%s' via mesh\n",
691               GNUNET_h2s (&query));
692   GNUNET_STATISTICS_update (GSF_stats,
693                             gettext_noop ("# replies received via mesh"), 1,
694                             GNUNET_NO);
695   hrc.data = &srm[1];
696   hrc.data_size = msize;
697   hrc.expiration = GNUNET_TIME_absolute_ntoh (srm->expiration);
698   hrc.type = type;
699   hrc.found = GNUNET_NO;
700   GNUNET_CONTAINER_multihashmap_get_multiple (sh->waiting_map,
701                                               &query,
702                                               &handle_reply,
703                                               &hrc);
704   if (GNUNET_NO == hrc.found)
705   {
706     GNUNET_STATISTICS_update (GSF_stats,
707                               gettext_noop ("# replies received via mesh dropped"), 1,
708                               GNUNET_NO);
709     return GNUNET_OK;
710   }
711   return GNUNET_OK;
712 }
713
714
715 /**
716  * Get (or create) a mesh to talk to the given peer.
717  *
718  * @param target peer we want to communicate with
719  */
720 static struct StreamHandle *
721 get_mesh (const struct GNUNET_PeerIdentity *target)
722 {
723   struct StreamHandle *sh;
724
725   sh = GNUNET_CONTAINER_multihashmap_get (mesh_map,
726                                           &target->hashPubKey);
727   if (NULL != sh)
728   {
729     if (GNUNET_SCHEDULER_NO_TASK != sh->timeout_task)
730     {
731       GNUNET_SCHEDULER_cancel (sh->timeout_task);
732       sh->timeout_task = GNUNET_SCHEDULER_NO_TASK;
733     }
734     return sh;
735   }
736   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
737               "Creating mesh to %s\n",
738               GNUNET_i2s (target));
739   sh = GNUNET_malloc (sizeof (struct StreamHandle));
740   sh->reset_task = GNUNET_SCHEDULER_add_delayed (CLIENT_RETRY_TIMEOUT,
741                                                  &reset_mesh_task,
742                                                  sh);
743   sh->waiting_map = GNUNET_CONTAINER_multihashmap_create (512, GNUNET_YES);
744   sh->target = *target;
745   sh->mesh = GNUNET_MESH_tunnel_create (listen_socket,
746                                           sh,
747                                           &sh->target,
748                                           GNUNET_APPLICATION_TYPE_FS_BLOCK_TRANSFER,
749                       GNUNET_YES,
750                       GNUNET_YES);
751   GNUNET_assert (GNUNET_OK ==
752                  GNUNET_CONTAINER_multihashmap_put (mesh_map,
753                                                     &sh->target.hashPubKey,
754                                                     sh,
755                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
756   return sh;
757 }
758
759
760 /**
761  * Look for a block by directly contacting a particular peer.
762  *
763  * @param target peer that should have the block
764  * @param query hash to query for the block
765  * @param type desired type for the block
766  * @param proc function to call with result
767  * @param proc_cls closure for 'proc'
768  * @return handle to cancel the operation
769  */
770 struct GSF_StreamRequest *
771 GSF_mesh_query (const struct GNUNET_PeerIdentity *target,
772                   const struct GNUNET_HashCode *query,
773                   enum GNUNET_BLOCK_Type type,
774                   GSF_StreamReplyProcessor proc, void *proc_cls)
775 {
776   struct StreamHandle *sh;
777   struct GSF_StreamRequest *sr;
778
779   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
780               "Preparing to send query for %s via mesh to %s\n",
781               GNUNET_h2s (query),
782               GNUNET_i2s (target));
783   sh = get_mesh (target);
784   sr = GNUNET_malloc (sizeof (struct GSF_StreamRequest));
785   sr->sh = sh;
786   sr->proc = proc;
787   sr->proc_cls = proc_cls;
788   sr->type = type;
789   sr->query = *query;
790   GNUNET_CONTAINER_DLL_insert (sh->pending_head,
791                                sh->pending_tail,
792                                sr);
793   if (GNUNET_YES == sh->is_ready)
794     transmit_pending (sh);
795   return sr;
796 }
797
798
799 /**
800  * Cancel an active request; must not be called after 'proc'
801  * was calld.
802  *
803  * @param sr request to cancel
804  */
805 void
806 GSF_mesh_query_cancel (struct GSF_StreamRequest *sr)
807 {
808   struct StreamHandle *sh = sr->sh;
809
810   if (GNUNET_YES == sr->was_transmitted)
811     GNUNET_assert (GNUNET_OK ==
812                    GNUNET_CONTAINER_multihashmap_remove (sh->waiting_map,
813                                                          &sr->query,
814                                                          sr));
815   else
816     GNUNET_CONTAINER_DLL_remove (sh->pending_head,
817                                  sh->pending_tail,
818                                  sr);
819   GNUNET_free (sr);
820   if ( (0 == GNUNET_CONTAINER_multihashmap_size (sh->waiting_map)) &&
821        (NULL == sh->pending_head) )
822     sh->timeout_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS,
823                                                      &mesh_timeout,
824                                                      sh);
825 }
826
827
828 /* ********************* server-side code ************************* */
829
830
831 /**
832  * We're done with a particular client, clean up.
833  *
834  * @param sc client to clean up
835  */
836 static void
837 terminate_mesh (struct StreamClient *sc)
838 {
839   GNUNET_STATISTICS_update (GSF_stats,
840                             gettext_noop ("# mesh connections active"), -1,
841                             GNUNET_NO);
842   if (GNUNET_SCHEDULER_NO_TASK != sc->terminate_task)
843     GNUNET_SCHEDULER_cancel (sc->terminate_task); 
844   if (GNUNET_SCHEDULER_NO_TASK != sc->timeout_task)
845     GNUNET_SCHEDULER_cancel (sc->timeout_task); 
846   if (NULL != sc->wh)
847     GNUNET_MESH_notify_transmit_ready_cancel (sc->wh);
848   if (NULL != sc->qe)
849     GNUNET_DATASTORE_cancel (sc->qe);
850   GNUNET_MESH_tunnel_destroy (sc->socket);
851   struct WriteQueueItem *wqi;
852   while (NULL != (wqi = sc->wqi_head))
853   {
854     GNUNET_CONTAINER_DLL_remove (sc->wqi_head,
855                                  sc->wqi_tail,
856                                  wqi);
857     GNUNET_free (wqi);
858   }
859   GNUNET_CONTAINER_DLL_remove (sc_head,
860                                sc_tail,
861                                sc);
862   sc_count--;
863   GNUNET_free (sc);
864 }
865
866
867 /**
868  * Task run to asynchronously terminate the mesh due to timeout.
869  *
870  * @param cls the 'struct StreamClient'
871  * @param tc scheduler context
872  */ 
873 static void
874 timeout_mesh_task (void *cls,
875                      const struct GNUNET_SCHEDULER_TaskContext *tc)
876 {
877   struct StreamClient *sc = cls;
878
879   sc->timeout_task = GNUNET_SCHEDULER_NO_TASK;
880   terminate_mesh (sc);
881 }
882
883
884 /**
885  * Reset the timeout for the mesh client (due to activity).
886  *
887  * @param sc client handle to reset timeout for
888  */
889 static void
890 refresh_timeout_task (struct StreamClient *sc)
891 {
892   if (GNUNET_SCHEDULER_NO_TASK != sc->timeout_task)
893     GNUNET_SCHEDULER_cancel (sc->timeout_task); 
894   sc->timeout_task = GNUNET_SCHEDULER_add_delayed (IDLE_TIMEOUT,
895                                                    &timeout_mesh_task,
896                                                    sc);
897 }
898
899
900 /**
901  * We're done handling a request from a client, read the next one.
902  *
903  * @param sc client to continue reading requests from
904  */
905 static void
906 continue_reading (struct StreamClient *sc)
907 {
908   refresh_timeout_task (sc);
909 }
910
911
912 /**
913  * Transmit the next entry from the write queue.
914  *
915  * @param sc where to process the write queue
916  */
917 static void
918 continue_writing (struct StreamClient *sc);
919
920
921 /**
922  * Send a reply now, mesh is ready.
923  *
924  * @param cls closure with the struct StreamClient which sent the query
925  * @param size number of bytes available in 'buf'
926  * @param buf where to write the message
927  * @return number of bytes written to 'buf'
928  */
929 static size_t
930 write_continuation (void *cls,
931                     size_t size,
932                     void *buf)
933 {
934   struct StreamClient *sc = cls;
935   struct WriteQueueItem *wqi;
936   size_t ret;
937
938   sc->wh = NULL;
939   if (NULL == (wqi = sc->wqi_head))
940   {
941     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
942                 "Write queue empty, reading more requests\n");
943     return 0;
944   }
945   if (0 == size)
946   {
947     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
948                 "Transmission of reply failed, terminating mesh\n");
949     terminate_mesh (sc);    
950     return 0;
951   }
952   GNUNET_CONTAINER_DLL_remove (sc->wqi_head,
953                                sc->wqi_tail,
954                                wqi);
955   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
956               "Transmitted %u byte reply via mesh\n",
957               (unsigned int) size);
958   GNUNET_STATISTICS_update (GSF_stats,
959                             gettext_noop ("# Blocks transferred via mesh"), 1,
960                             GNUNET_NO);
961   memcpy (buf, &wqi[1], ret = wqi->msize);
962   GNUNET_free (wqi);
963   continue_writing (sc);
964   return ret;
965 }
966
967
968 /**
969  * Transmit the next entry from the write queue.
970  *
971  * @param sc where to process the write queue
972  */
973 static void
974 continue_writing (struct StreamClient *sc)
975 {
976   struct WriteQueueItem *wqi;
977
978   if (NULL != sc->wh)
979   {
980     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
981                 "Write pending, waiting for it to complete\n");
982     return; /* write already pending */
983   }
984   if (NULL == (wqi = sc->wqi_head))
985   {
986     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
987                 "Write queue empty, reading more requests\n");
988     continue_reading (sc);
989     return;
990   }
991   sc->wh = GNUNET_MESH_notify_transmit_ready (sc->socket, GNUNET_NO,
992                                               GNUNET_TIME_UNIT_FOREVER_REL,
993                                               wqi->msize,                                     
994                                               &write_continuation,
995                                               sc);
996   if (NULL == sc->wh)
997   {
998     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
999                 "Write failed; terminating mesh\n");
1000     terminate_mesh (sc);
1001     return;
1002   }
1003 }
1004
1005
1006 /**
1007  * Process a datum that was stored in the datastore.
1008  *
1009  * @param cls closure with the struct StreamClient which sent the query
1010  * @param key key for the content
1011  * @param size number of bytes in data
1012  * @param data content stored
1013  * @param type type of the content
1014  * @param priority priority of the content
1015  * @param anonymity anonymity-level for the content
1016  * @param expiration expiration time for the content
1017  * @param uid unique identifier for the datum;
1018  *        maybe 0 if no unique identifier is available
1019  */
1020 static void 
1021 handle_datastore_reply (void *cls,
1022                         const struct GNUNET_HashCode * key,
1023                         size_t size, const void *data,
1024                         enum GNUNET_BLOCK_Type type,
1025                         uint32_t priority,
1026                         uint32_t anonymity,
1027                         struct GNUNET_TIME_Absolute
1028                         expiration, uint64_t uid)
1029 {
1030   struct StreamClient *sc = cls;
1031   size_t msize = size + sizeof (struct StreamReplyMessage);
1032   struct WriteQueueItem *wqi;
1033   struct StreamReplyMessage *srm;
1034
1035   sc->qe = NULL;
1036   if (GNUNET_BLOCK_TYPE_FS_ONDEMAND == type)
1037   {
1038     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1039                 "Performing on-demand encoding\n");
1040     if (GNUNET_OK !=
1041         GNUNET_FS_handle_on_demand_block (key,
1042                                           size, data, type,
1043                                           priority, anonymity,
1044                                           expiration, uid,
1045                                           &handle_datastore_reply,
1046                                           sc))
1047     {
1048       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1049                   "On-demand encoding request failed\n");
1050       continue_writing (sc);
1051     }
1052     return;
1053   }
1054   if (msize > GNUNET_SERVER_MAX_MESSAGE_SIZE)
1055   {
1056     GNUNET_break (0);
1057     continue_writing (sc);
1058     return;
1059   }
1060   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1061               "Starting transmission of %u byte reply for query `%s' via mesh\n",
1062               (unsigned int) size,
1063               GNUNET_h2s (key));
1064   wqi = GNUNET_malloc (sizeof (struct WriteQueueItem) + msize);
1065   wqi->msize = msize;
1066   srm = (struct StreamReplyMessage *) &wqi[1];
1067   srm->header.size = htons ((uint16_t) msize);
1068   srm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_STREAM_REPLY);
1069   srm->type = htonl (type);
1070   srm->expiration = GNUNET_TIME_absolute_hton (expiration);
1071   memcpy (&srm[1], data, size);
1072   sc->reply_size = msize;
1073   GNUNET_CONTAINER_DLL_insert (sc->wqi_head,
1074                                sc->wqi_tail,
1075                                wqi);
1076   continue_writing (sc);
1077 }
1078
1079
1080 /**
1081  * Functions with this signature are called whenever a
1082  * complete query message is received.
1083  *
1084  * Do not call GNUNET_SERVER_mst_destroy in callback
1085  *
1086  * @param cls closure with the 'struct StreamClient'
1087  * @param tunnel tunnel handle
1088  * @param tunnel_ctx tunnel context
1089  * @param message the actual message
1090  * @return GNUNET_OK on success, GNUNET_SYSERR to stop further processing
1091  */
1092 static int
1093 request_cb (void *cls,
1094             struct GNUNET_MESH_Tunnel *tunnel,
1095             void **tunnel_ctx,
1096             const struct GNUNET_MessageHeader *message)
1097 {
1098   struct StreamClient *sc = *tunnel_ctx;
1099   const struct StreamQueryMessage *sqm;
1100
1101   sqm = (const struct StreamQueryMessage *) message;
1102   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1103               "Received query for `%s' via mesh\n",
1104               GNUNET_h2s (&sqm->query));
1105   GNUNET_STATISTICS_update (GSF_stats,
1106                             gettext_noop ("# queries received via mesh"), 1,
1107                             GNUNET_NO);
1108   refresh_timeout_task (sc);
1109   sc->qe = GNUNET_DATASTORE_get_key (GSF_dsh,
1110                                      0,
1111                                      &sqm->query,
1112                                      ntohl (sqm->type),
1113                                      0 /* priority */, 
1114                                      GSF_datastore_queue_size,
1115                                      GNUNET_TIME_UNIT_FOREVER_REL,
1116                                      &handle_datastore_reply, sc);
1117   if (NULL == sc->qe)
1118   {
1119     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1120                 "Queueing request with datastore failed (queue full?)\n");
1121     continue_writing (sc);
1122   }
1123   return GNUNET_OK;
1124 }
1125
1126
1127 /**
1128  * Functions of this type are called upon new mesh connection from other peers.
1129  *
1130  * @param cls the closure from GNUNET_MESH_connect
1131  * @param socket the socket representing the mesh
1132  * @param initiator the identity of the peer who wants to establish a mesh
1133  *            with us; NULL on binding error
1134  * @param port mesh port used for the incoming connection
1135  * @return initial tunnel context (our 'struct StreamClient')
1136  */
1137 static void *
1138 accept_cb (void *cls,
1139            struct GNUNET_MESH_Tunnel *socket,
1140            const struct GNUNET_PeerIdentity *initiator,
1141            uint32_t port)
1142 {
1143   struct StreamClient *sc;
1144
1145   GNUNET_assert (NULL != socket);
1146   if (sc_count >= sc_count_max)
1147   {
1148     GNUNET_STATISTICS_update (GSF_stats,
1149                               gettext_noop ("# mesh client connections rejected"), 1,
1150                               GNUNET_NO);
1151     GNUNET_MESH_tunnel_destroy (socket);
1152     return NULL;
1153   }
1154   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1155               "Accepting inbound mesh connection from `%s'\n",
1156               GNUNET_i2s (initiator));
1157   GNUNET_STATISTICS_update (GSF_stats,
1158                             gettext_noop ("# mesh connections active"), 1,
1159                             GNUNET_NO);
1160   sc = GNUNET_malloc (sizeof (struct StreamClient));
1161   sc->socket = socket;
1162   GNUNET_CONTAINER_DLL_insert (sc_head,
1163                                sc_tail,
1164                                sc);
1165   sc_count++;
1166   refresh_timeout_task (sc);
1167   return sc;
1168 }
1169
1170
1171 /**
1172  * Initialize subsystem for non-anonymous file-sharing.
1173  */
1174 void
1175 GSF_mesh_start ()
1176 {
1177   static const struct GNUNET_MESH_MessageHandler handlers[] = {
1178     { &request_cb, GNUNET_MESSAGE_TYPE_FS_STREAM_QUERY, sizeof (struct StreamQueryMessage)},
1179     { &reply_cb, GNUNET_MESSAGE_TYPE_FS_STREAM_REPLY, 0 },
1180     { NULL, 0, 0 }
1181   };
1182   static const uint32_t ports[] = {
1183     GNUNET_APPLICATION_TYPE_FS_BLOCK_TRANSFER,
1184     0
1185   };
1186
1187   mesh_map = GNUNET_CONTAINER_multihashmap_create (16, GNUNET_YES);
1188   if (GNUNET_YES ==
1189       GNUNET_CONFIGURATION_get_value_number (GSF_cfg,
1190                                              "fs",
1191                                              "MAX_STREAM_CLIENTS",
1192                                              &sc_count_max))
1193   {
1194     listen_socket = GNUNET_MESH_connect (GSF_cfg,
1195                                          NULL,
1196                                          &accept_cb,
1197                                          NULL /* FIXME: have a cleanup callback? */,
1198                                          handlers,
1199                                          ports);
1200   } 
1201 }
1202
1203
1204 /**
1205  * Function called on each active meshs to shut them down.
1206  *
1207  * @param cls NULL
1208  * @param key target peer, unused
1209  * @param value the 'struct StreamHandle' to destroy
1210  * @return GNUNET_YES (continue to iterate)
1211  */
1212 static int
1213 release_meshs (void *cls,
1214                  const struct GNUNET_HashCode *key,
1215                  void *value)
1216 {
1217   struct StreamHandle *sh = value;
1218
1219   destroy_mesh_handle (sh);
1220   return GNUNET_YES;
1221 }
1222
1223
1224 /**
1225  * Shutdown subsystem for non-anonymous file-sharing.
1226  */
1227 void
1228 GSF_mesh_stop ()
1229 {
1230   struct StreamClient *sc;
1231
1232   while (NULL != (sc = sc_head))
1233     terminate_mesh (sc);
1234   if (NULL != listen_socket)
1235   {
1236     GNUNET_MESH_disconnect (listen_socket);
1237     listen_socket = NULL;
1238   }
1239   GNUNET_CONTAINER_multihashmap_iterate (mesh_map,
1240                                          &release_meshs,
1241                                          NULL);
1242   GNUNET_CONTAINER_multihashmap_destroy (mesh_map);
1243   mesh_map = NULL;
1244 }
1245
1246 /* end of gnunet-service-fs_mesh.c */