-missing file, more cleanup
[oweals/gnunet.git] / src / fs / gnunet-service-fs_mesh_client.c
1 /*
2      This file is part of GNUnet.
3      (C) 2012, 2013 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file fs/gnunet-service-fs_mesh_client.c
23  * @brief non-anonymous file-transfer
24  * @author Christian Grothoff
25  *
26  * TODO:
27  * - PORT is set to old application type, unsure if we should keep
28  *   it that way (fine for now)
29  */
30 #include "platform.h"
31 #include "gnunet_constants.h"
32 #include "gnunet_util_lib.h"
33 #include "gnunet_mesh_service.h"
34 #include "gnunet_protocols.h"
35 #include "gnunet_applications.h"
36 #include "gnunet-service-fs.h"
37 #include "gnunet-service-fs_indexing.h"
38 #include "gnunet-service-fs_mesh.h"
39
40
41 /**
42  * After how long do we reset connections without replies?
43  */
44 #define CLIENT_RETRY_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 30)
45
46
47 /** 
48  * Handle for a mesh to another peer.
49  */
50 struct MeshHandle;
51
52
53 /**
54  * Handle for a request that is going out via mesh API.
55  */
56 struct GSF_MeshRequest
57 {
58
59   /**
60    * DLL.
61    */
62   struct GSF_MeshRequest *next;
63
64   /**
65    * DLL.
66    */
67   struct GSF_MeshRequest *prev;
68
69   /**
70    * Which mesh is this request associated with?
71    */
72   struct MeshHandle *mh;
73
74   /**
75    * Function to call with the result.
76    */
77   GSF_MeshReplyProcessor proc;
78
79   /**
80    * Closure for 'proc'
81    */
82   void *proc_cls;
83
84   /**
85    * Query to transmit to the other peer.
86    */
87   struct GNUNET_HashCode query;
88
89   /**
90    * Desired type for the reply.
91    */
92   enum GNUNET_BLOCK_Type type;
93
94   /**
95    * Did we transmit this request already? YES if we are
96    * in the 'waiting' DLL, NO if we are in the 'pending' DLL.
97    */
98   int was_transmitted;
99 };
100
101
102 /** 
103  * Handle for a mesh to another peer.
104  */
105 struct MeshHandle
106 {
107   /**
108    * Head of DLL of pending requests on this mesh.
109    */
110   struct GSF_MeshRequest *pending_head;
111
112   /**
113    * Tail of DLL of pending requests on this mesh.
114    */
115   struct GSF_MeshRequest *pending_tail;
116
117   /**
118    * Map from query to 'struct GSF_MeshRequest's waiting for
119    * a reply.
120    */
121   struct GNUNET_CONTAINER_MultiHashMap *waiting_map;
122
123   /**
124    * Tunnel to the other peer.
125    */
126   struct GNUNET_MESH_Tunnel *tunnel;
127
128   /**
129    * Handle for active write operation, or NULL.
130    */ 
131   struct GNUNET_MESH_TransmitHandle *wh;
132
133   /**
134    * Which peer does this mesh go to?
135    */ 
136   struct GNUNET_PeerIdentity target;
137
138   /**
139    * Task to kill inactive meshs (we keep them around for
140    * a few seconds to give the application a chance to give
141    * us another query).
142    */
143   GNUNET_SCHEDULER_TaskIdentifier timeout_task;
144
145   /**
146    * Task to reset meshs that had errors (asynchronously,
147    * as we may not be able to do it immediately during a
148    * callback from the mesh API).
149    */
150   GNUNET_SCHEDULER_TaskIdentifier reset_task;
151
152 };
153
154
155 /**
156  * Mesh tunnel for creating outbound tunnels.
157  */
158 static struct GNUNET_MESH_Handle *mesh_tunnel;
159
160 /**
161  * Map from peer identities to 'struct MeshHandles' with mesh
162  * tunnels to those peers.
163  */
164 static struct GNUNET_CONTAINER_MultiHashMap *mesh_map;
165
166
167 /* ********************* client-side code ************************* */
168
169
170 /**
171  * Transmit pending requests via the mesh.
172  *
173  * @param mh mesh to process
174  */
175 static void
176 transmit_pending (struct MeshHandle *mh);
177
178
179 /**
180  * Iterator called on each entry in a waiting map to 
181  * move it back to the pending list.
182  *
183  * @param cls the 'struct MeshHandle'
184  * @param key the key of the entry in the map (the query)
185  * @param value the 'struct GSF_MeshRequest' to move to pending
186  * @return GNUNET_YES (continue to iterate)
187  */
188 static int
189 move_to_pending (void *cls,
190                  const struct GNUNET_HashCode *key,
191                  void *value)
192 {
193   struct MeshHandle *mh = cls;
194   struct GSF_MeshRequest *sr = value;
195   
196   GNUNET_assert (GNUNET_YES ==
197                  GNUNET_CONTAINER_multihashmap_remove (mh->waiting_map,
198                                                        key,
199                                                        value));
200   GNUNET_CONTAINER_DLL_insert (mh->pending_head,
201                                mh->pending_tail,
202                                sr);
203   sr->was_transmitted = GNUNET_NO;
204   return GNUNET_YES;
205 }
206
207
208 /**
209  * We had a serious error, tear down and re-create mesh from scratch.
210  *
211  * @param mh mesh to reset
212  */
213 static void
214 reset_mesh (struct MeshHandle *mh)
215 {
216   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
217               "Resetting mesh tunnel to %s\n",
218               GNUNET_i2s (&mh->target));
219   GNUNET_MESH_tunnel_destroy (mh->tunnel);
220   GNUNET_CONTAINER_multihashmap_iterate (mh->waiting_map,
221                                          &move_to_pending,
222                                          mh);
223   mh->tunnel = GNUNET_MESH_tunnel_create (mesh_tunnel,
224                                         mh,
225                                         &mh->target,
226                                         GNUNET_APPLICATION_TYPE_FS_BLOCK_TRANSFER,
227                                         GNUNET_YES,
228                                         GNUNET_YES);
229 }
230
231
232 /**
233  * Task called when it is time to destroy an inactive mesh tunnel.
234  *
235  * @param cls the 'struct MeshHandle' to tear down
236  * @param tc scheduler context, unused
237  */
238 static void
239 mesh_timeout (void *cls,
240               const struct GNUNET_SCHEDULER_TaskContext *tc)
241 {
242   struct MeshHandle *mh = cls;
243   struct GNUNET_MESH_Tunnel *tun;
244
245   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
246               "Timeout on mesh tunnel to %s\n",
247               GNUNET_i2s (&mh->target));
248   mh->timeout_task = GNUNET_SCHEDULER_NO_TASK;
249   tun = mh->tunnel;
250   mh->tunnel = NULL;
251   GNUNET_MESH_tunnel_destroy (tun);
252 }
253
254
255 /**
256  * Task called when it is time to reset an mesh.
257  *
258  * @param cls the 'struct MeshHandle' to tear down
259  * @param tc scheduler context, unused
260  */
261 static void
262 reset_mesh_task (void *cls,
263                  const struct GNUNET_SCHEDULER_TaskContext *tc)
264 {
265   struct MeshHandle *mh = cls;
266
267   mh->reset_task = GNUNET_SCHEDULER_NO_TASK;
268   reset_mesh (mh);
269 }
270
271
272 /**
273  * We had a serious error, tear down and re-create mesh from scratch,
274  * but do so asynchronously.
275  *
276  * @param mh mesh to reset
277  */
278 static void
279 reset_mesh_async (struct MeshHandle *mh)
280 {
281   if (GNUNET_SCHEDULER_NO_TASK != mh->reset_task)
282     GNUNET_SCHEDULER_cancel (mh->reset_task);
283   mh->reset_task = GNUNET_SCHEDULER_add_now (&reset_mesh_task,
284                                              mh);
285 }
286
287
288 /**
289  * Functions of this signature are called whenever we are ready to transmit
290  * query via a mesh.
291  *
292  * @param cls the struct MeshHandle for which we did the write call
293  * @param size the number of bytes that can be written to 'buf'
294  * @param buf where to write the message
295  * @return number of bytes written to 'buf'
296  */
297 static size_t
298 transmit_sqm (void *cls,
299               size_t size,
300               void *buf)
301 {
302   struct MeshHandle *mh = cls;
303   struct MeshQueryMessage sqm;
304   struct GSF_MeshRequest *sr;
305
306   mh->wh = NULL;
307   if (NULL == buf)
308   {
309     reset_mesh (mh);
310     return 0;
311   }
312   sr = mh->pending_head;
313   if (NULL == sr)
314     return 0;
315   GNUNET_assert (size >= sizeof (struct MeshQueryMessage));
316   GNUNET_CONTAINER_DLL_remove (mh->pending_head,
317                                mh->pending_tail,
318                                sr);
319   GNUNET_CONTAINER_multihashmap_put (mh->waiting_map,
320                                      &sr->query,
321                                      sr,
322                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
323   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
324               "Sending query for %s via mesh to %s\n",
325               GNUNET_h2s (&sr->query),
326               GNUNET_i2s (&mh->target));
327   sr->was_transmitted = GNUNET_YES;
328   sqm.header.size = htons (sizeof (sqm));
329   sqm.header.type = htons (GNUNET_MESSAGE_TYPE_FS_MESH_QUERY);
330   sqm.type = htonl (sr->type);
331   sqm.query = sr->query;
332   memcpy (buf, &sqm, sizeof (sqm));
333   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
334               "Successfully transmitted %u bytes via mesh to %s\n",
335               (unsigned int) size,
336               GNUNET_i2s (&mh->target));
337   transmit_pending (mh);
338   return sizeof (sqm);
339 }
340           
341
342 /**
343  * Transmit pending requests via the mesh.
344  *
345  * @param mh mesh to process
346  */
347 static void
348 transmit_pending (struct MeshHandle *mh)
349 {
350   if (NULL != mh->wh)
351     return;
352   mh->wh = GNUNET_MESH_notify_transmit_ready (mh->tunnel, GNUNET_YES /* allow cork */,
353                                               GNUNET_TIME_UNIT_FOREVER_REL,
354                                               sizeof (struct MeshQueryMessage),
355                                               &transmit_sqm, mh);
356 }
357
358
359 /**
360  * Closure for 'handle_reply'.
361  */
362 struct HandleReplyClosure
363 {
364
365   /**
366    * Reply payload.
367    */ 
368   const void *data;
369
370   /**
371    * Expiration time for the block.
372    */
373   struct GNUNET_TIME_Absolute expiration;
374
375   /**
376    * Number of bytes in 'data'.
377    */
378   size_t data_size;
379
380   /** 
381    * Type of the block.
382    */
383   enum GNUNET_BLOCK_Type type;
384   
385   /**
386    * Did we have a matching query?
387    */
388   int found;
389 };
390
391
392 /**
393  * Iterator called on each entry in a waiting map to 
394  * process a result.
395  *
396  * @param cls the 'struct HandleReplyClosure'
397  * @param key the key of the entry in the map (the query)
398  * @param value the 'struct GSF_MeshRequest' to handle result for
399  * @return GNUNET_YES (continue to iterate)
400  */
401 static int
402 handle_reply (void *cls,
403               const struct GNUNET_HashCode *key,
404               void *value)
405 {
406   struct HandleReplyClosure *hrc = cls;
407   struct GSF_MeshRequest *sr = value;
408   
409   sr->proc (sr->proc_cls,
410             hrc->type,
411             hrc->expiration,
412             hrc->data_size,
413             hrc->data);
414   GSF_mesh_query_cancel (sr);
415   hrc->found = GNUNET_YES;
416   return GNUNET_YES;
417 }
418
419
420 /**
421  * Functions with this signature are called whenever a complete reply
422  * is received.
423  *
424  * @param cls closure with the 'struct MeshHandle'
425  * @param tunnel tunnel handle
426  * @param tunnel_ctx tunnel context
427  * @param message the actual message
428  * @return GNUNET_OK on success, GNUNET_SYSERR to stop further processing
429  */
430 static int
431 reply_cb (void *cls,
432           struct GNUNET_MESH_Tunnel *tunnel,
433           void **tunnel_ctx,
434           const struct GNUNET_MessageHeader *message)
435 {
436   struct MeshHandle *mh = *tunnel_ctx;
437   const struct MeshReplyMessage *srm;
438   struct HandleReplyClosure hrc;
439   uint16_t msize;
440   enum GNUNET_BLOCK_Type type;
441   struct GNUNET_HashCode query;
442
443   msize = ntohs (message->size);
444   if (sizeof (struct MeshReplyMessage) > msize)
445   {
446     GNUNET_break_op (0);
447     reset_mesh_async (mh);
448     return GNUNET_SYSERR;
449   }
450   srm = (const struct MeshReplyMessage *) message;
451   msize -= sizeof (struct MeshReplyMessage);
452   type = (enum GNUNET_BLOCK_Type) ntohl (srm->type);
453   if (GNUNET_YES !=
454       GNUNET_BLOCK_get_key (GSF_block_ctx,
455                             type,
456                             &srm[1], msize, &query))
457   {
458     GNUNET_break_op (0); 
459     reset_mesh_async (mh);
460     return GNUNET_SYSERR;
461   }
462   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
463               "Received reply `%s' via mesh from peer %s\n",
464               GNUNET_h2s (&query),
465               GNUNET_i2s (&mh->target));
466   GNUNET_STATISTICS_update (GSF_stats,
467                             gettext_noop ("# replies received via mesh"), 1,
468                             GNUNET_NO);
469   hrc.data = &srm[1];
470   hrc.data_size = msize;
471   hrc.expiration = GNUNET_TIME_absolute_ntoh (srm->expiration);
472   hrc.type = type;
473   hrc.found = GNUNET_NO;
474   GNUNET_CONTAINER_multihashmap_get_multiple (mh->waiting_map,
475                                               &query,
476                                               &handle_reply,
477                                               &hrc);
478   if (GNUNET_NO == hrc.found)
479   {
480     GNUNET_STATISTICS_update (GSF_stats,
481                               gettext_noop ("# replies received via mesh dropped"), 1,
482                               GNUNET_NO);
483     return GNUNET_OK;
484   }
485   return GNUNET_OK;
486 }
487
488
489 /**
490  * Get (or create) a mesh to talk to the given peer.
491  *
492  * @param target peer we want to communicate with
493  */
494 static struct MeshHandle *
495 get_mesh (const struct GNUNET_PeerIdentity *target)
496 {
497   struct MeshHandle *mh;
498
499   mh = GNUNET_CONTAINER_multihashmap_get (mesh_map,
500                                           &target->hashPubKey);
501   if (NULL != mh)
502   {
503     if (GNUNET_SCHEDULER_NO_TASK != mh->timeout_task)
504     {
505       GNUNET_SCHEDULER_cancel (mh->timeout_task);
506       mh->timeout_task = GNUNET_SCHEDULER_NO_TASK;
507     }
508     return mh;
509   }
510   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
511               "Creating mesh tunnel to %s\n",
512               GNUNET_i2s (target));
513   mh = GNUNET_new (struct MeshHandle);
514   mh->reset_task = GNUNET_SCHEDULER_add_delayed (CLIENT_RETRY_TIMEOUT,
515                                                  &reset_mesh_task,
516                                                  mh);
517   mh->waiting_map = GNUNET_CONTAINER_multihashmap_create (16, GNUNET_YES);
518   mh->target = *target;
519   mh->tunnel = GNUNET_MESH_tunnel_create (mesh_tunnel,
520                                         mh,
521                                         &mh->target,
522                                         GNUNET_APPLICATION_TYPE_FS_BLOCK_TRANSFER,
523                                         GNUNET_NO,
524                                         GNUNET_YES);
525   GNUNET_assert (GNUNET_OK ==
526                  GNUNET_CONTAINER_multihashmap_put (mesh_map,
527                                                     &mh->target.hashPubKey,
528                                                     mh,
529                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
530   return mh;
531 }
532
533
534 /**
535  * Look for a block by directly contacting a particular peer.
536  *
537  * @param target peer that should have the block
538  * @param query hash to query for the block
539  * @param type desired type for the block
540  * @param proc function to call with result
541  * @param proc_cls closure for 'proc'
542  * @return handle to cancel the operation
543  */
544 struct GSF_MeshRequest *
545 GSF_mesh_query (const struct GNUNET_PeerIdentity *target,
546                 const struct GNUNET_HashCode *query,
547                 enum GNUNET_BLOCK_Type type,
548                 GSF_MeshReplyProcessor proc, void *proc_cls)
549 {
550   struct MeshHandle *mh;
551   struct GSF_MeshRequest *sr;
552
553   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
554               "Preparing to send query for %s via mesh to %s\n",
555               GNUNET_h2s (query),
556               GNUNET_i2s (target));
557   mh = get_mesh (target);
558   sr = GNUNET_new (struct GSF_MeshRequest);
559   sr->mh = mh;
560   sr->proc = proc;
561   sr->proc_cls = proc_cls;
562   sr->type = type;
563   sr->query = *query;
564   GNUNET_CONTAINER_DLL_insert (mh->pending_head,
565                                mh->pending_tail,
566                                sr);
567   transmit_pending (mh);
568   return sr;
569 }
570
571
572 /**
573  * Cancel an active request; must not be called after 'proc'
574  * was calld.
575  *
576  * @param sr request to cancel
577  */
578 void
579 GSF_mesh_query_cancel (struct GSF_MeshRequest *sr)
580 {
581   struct MeshHandle *mh = sr->mh;
582
583   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
584               "Cancelled query for %s via mesh to %s\n",
585               GNUNET_h2s (&sr->query),
586               GNUNET_i2s (&sr->mh->target));
587   if (GNUNET_YES == sr->was_transmitted)
588     GNUNET_assert (GNUNET_OK ==
589                    GNUNET_CONTAINER_multihashmap_remove (mh->waiting_map,
590                                                          &sr->query,
591                                                          sr));
592   else
593     GNUNET_CONTAINER_DLL_remove (mh->pending_head,
594                                  mh->pending_tail,
595                                  sr);
596   GNUNET_free (sr);
597   if ( (0 == GNUNET_CONTAINER_multihashmap_size (mh->waiting_map)) &&
598        (NULL == mh->pending_head) )
599     mh->timeout_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS,
600                                                      &mesh_timeout,
601                                                      mh);
602 }
603
604
605 /**
606  * Iterator called on each entry in a waiting map to 
607  * call the 'proc' continuation and release associated
608  * resources.
609  *
610  * @param cls the 'struct MeshHandle'
611  * @param key the key of the entry in the map (the query)
612  * @param value the 'struct GSF_MeshRequest' to clean up
613  * @return GNUNET_YES (continue to iterate)
614  */
615 static int
616 free_waiting_entry (void *cls,
617                     const struct GNUNET_HashCode *key,
618                     void *value)
619 {
620   struct GSF_MeshRequest *sr = value;
621
622   sr->proc (sr->proc_cls, GNUNET_BLOCK_TYPE_ANY,
623             GNUNET_TIME_UNIT_FOREVER_ABS,
624             0, NULL);
625   GSF_mesh_query_cancel (sr);
626   return GNUNET_YES;
627 }
628
629
630 /**
631  * Function called by mesh when a client disconnects.
632  * Cleans up our 'struct MeshClient' of that tunnel.
633  *
634  * @param cls NULL
635  * @param tunnel tunnel of the disconnecting client
636  * @param tunnel_ctx our 'struct MeshClient' 
637  */
638 static void
639 cleaner_cb (void *cls,
640             const struct GNUNET_MESH_Tunnel *tunnel,
641             void *tunnel_ctx)
642 {
643   struct MeshHandle *mh = tunnel_ctx;
644   struct GSF_MeshRequest *sr;
645
646   mh->tunnel = NULL;
647   while (NULL != (sr = mh->pending_head))
648   {
649     sr->proc (sr->proc_cls, GNUNET_BLOCK_TYPE_ANY,
650               GNUNET_TIME_UNIT_FOREVER_ABS,
651               0, NULL);
652     GSF_mesh_query_cancel (sr);
653   }
654   GNUNET_CONTAINER_multihashmap_iterate (mh->waiting_map,
655                                          &free_waiting_entry,
656                                          mh);
657   if (NULL != mh->wh)
658     GNUNET_MESH_notify_transmit_ready_cancel (mh->wh);
659   if (GNUNET_SCHEDULER_NO_TASK != mh->timeout_task)
660     GNUNET_SCHEDULER_cancel (mh->timeout_task);
661   if (GNUNET_SCHEDULER_NO_TASK != mh->reset_task)
662     GNUNET_SCHEDULER_cancel (mh->reset_task);
663   GNUNET_assert (GNUNET_OK ==
664                  GNUNET_CONTAINER_multihashmap_remove (mesh_map,
665                                                        &mh->target.hashPubKey,
666                                                        mh));
667   GNUNET_CONTAINER_multihashmap_destroy (mh->waiting_map);
668   GNUNET_free (mh);
669 }
670
671
672 /**
673  * Initialize subsystem for non-anonymous file-sharing.
674  */
675 void
676 GSF_mesh_start_client ()
677 {
678   static const struct GNUNET_MESH_MessageHandler handlers[] = {
679     { &reply_cb, GNUNET_MESSAGE_TYPE_FS_MESH_REPLY, 0 },
680     { NULL, 0, 0 }
681   };
682   static const uint32_t ports[] = {
683     GNUNET_APPLICATION_TYPE_FS_BLOCK_TRANSFER,
684     0
685   };
686
687   mesh_map = GNUNET_CONTAINER_multihashmap_create (16, GNUNET_YES);
688   mesh_tunnel = GNUNET_MESH_connect (GSF_cfg,
689                                          NULL,
690                                          NULL,
691                                          &cleaner_cb,
692                                          handlers,
693                                          ports);
694 }
695
696
697 /**
698  * Function called on each active meshs to shut them down.
699  *
700  * @param cls NULL
701  * @param key target peer, unused
702  * @param value the 'struct MeshHandle' to destroy
703  * @return GNUNET_YES (continue to iterate)
704  */
705 static int
706 release_meshs (void *cls,
707                const struct GNUNET_HashCode *key,
708                void *value)
709 {
710   struct MeshHandle *mh = value;
711   struct GNUNET_MESH_Tunnel *tun;
712
713   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
714               "Timeout on mesh tunnel to %s\n",
715               GNUNET_i2s (&mh->target));
716   tun = mh->tunnel;
717   mh->tunnel = NULL;
718   if (NULL != tun)
719     GNUNET_MESH_tunnel_destroy (tun);
720   return GNUNET_YES;
721 }
722
723
724 /**
725  * Shutdown subsystem for non-anonymous file-sharing.
726  */
727 void
728 GSF_mesh_stop_client ()
729 {
730   GNUNET_CONTAINER_multihashmap_iterate (mesh_map,
731                                          &release_meshs,
732                                          NULL);
733   GNUNET_CONTAINER_multihashmap_destroy (mesh_map);
734   mesh_map = NULL;
735   if (NULL != mesh_tunnel)
736   {
737     GNUNET_MESH_disconnect (mesh_tunnel);
738     mesh_tunnel = NULL;
739   }
740 }
741
742
743 /* end of gnunet-service-fs_mesh_client.c */