3cdac35638b3134e96aff603bff148dbc3c52dc1
[oweals/gnunet.git] / src / fs / gnunet-service-fs_cadet_client.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2012, 2013 GNUnet e.V.
4
5      GNUnet is free software: you can redistribute it and/or modify it
6      under the terms of the GNU Affero General Public License as published
7      by the Free Software Foundation, either version 3 of the License,
8      or (at your option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      Affero General Public License for more details.
14     
15      You should have received a copy of the GNU Affero General Public License
16      along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 */
18
19 /**
20  * @file fs/gnunet-service-fs_cadet_client.c
21  * @brief non-anonymous file-transfer
22  * @author Christian Grothoff
23  *
24  * TODO:
25  * - PORT is set to old application type, unsure if we should keep
26  *   it that way (fine for now)
27  */
28 #include "platform.h"
29 #include "gnunet_constants.h"
30 #include "gnunet_util_lib.h"
31 #include "gnunet_cadet_service.h"
32 #include "gnunet_protocols.h"
33 #include "gnunet_applications.h"
34 #include "gnunet-service-fs.h"
35 #include "gnunet-service-fs_indexing.h"
36 #include "gnunet-service-fs_cadet.h"
37
38
39 /**
40  * After how long do we reset connections without replies?
41  */
42 #define CLIENT_RETRY_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 30)
43
44
45 /**
46  * Handle for a cadet to another peer.
47  */
48 struct CadetHandle;
49
50
51 /**
52  * Handle for a request that is going out via cadet API.
53  */
54 struct GSF_CadetRequest
55 {
56
57   /**
58    * DLL.
59    */
60   struct GSF_CadetRequest *next;
61
62   /**
63    * DLL.
64    */
65   struct GSF_CadetRequest *prev;
66
67   /**
68    * Which cadet is this request associated with?
69    */
70   struct CadetHandle *mh;
71
72   /**
73    * Function to call with the result.
74    */
75   GSF_CadetReplyProcessor proc;
76
77   /**
78    * Closure for @e proc
79    */
80   void *proc_cls;
81
82   /**
83    * Query to transmit to the other peer.
84    */
85   struct GNUNET_HashCode query;
86
87   /**
88    * Desired type for the reply.
89    */
90   enum GNUNET_BLOCK_Type type;
91
92   /**
93    * Did we transmit this request already? #GNUNET_YES if we are
94    * in the 'waiting_map', #GNUNET_NO if we are in the 'pending' DLL.
95    */
96   int was_transmitted;
97 };
98
99
100 /**
101  * Handle for a cadet to another peer.
102  */
103 struct CadetHandle
104 {
105   /**
106    * Head of DLL of pending requests on this cadet.
107    */
108   struct GSF_CadetRequest *pending_head;
109
110   /**
111    * Tail of DLL of pending requests on this cadet.
112    */
113   struct GSF_CadetRequest *pending_tail;
114
115   /**
116    * Map from query to `struct GSF_CadetRequest`s waiting for
117    * a reply.
118    */
119   struct GNUNET_CONTAINER_MultiHashMap *waiting_map;
120
121   /**
122    * Channel to the other peer.
123    */
124   struct GNUNET_CADET_Channel *channel;
125
126   /**
127    * Which peer does this cadet go to?
128    */
129   struct GNUNET_PeerIdentity target;
130
131   /**
132    * Task to kill inactive cadets (we keep them around for
133    * a few seconds to give the application a chance to give
134    * us another query).
135    */
136   struct GNUNET_SCHEDULER_Task *timeout_task;
137
138   /**
139    * Task to reset cadets that had errors (asynchronously,
140    * as we may not be able to do it immediately during a
141    * callback from the cadet API).
142    */
143   struct GNUNET_SCHEDULER_Task *reset_task;
144
145 };
146
147
148 /**
149  * Cadet channel for creating outbound channels.
150  */
151 struct GNUNET_CADET_Handle *cadet_handle;
152
153 /**
154  * Map from peer identities to 'struct CadetHandles' with cadet
155  * channels to those peers.
156  */
157 struct GNUNET_CONTAINER_MultiPeerMap *cadet_map;
158
159
160 /* ********************* client-side code ************************* */
161
162
163 /**
164  * Transmit pending requests via the cadet.
165  *
166  * @param cls `struct CadetHandle` to process
167  */
168 static void
169 transmit_pending (void *cls);
170
171
172 /**
173  * Iterator called on each entry in a waiting map to
174  * move it back to the pending list.
175  *
176  * @param cls the `struct CadetHandle`
177  * @param key the key of the entry in the map (the query)
178  * @param value the `struct GSF_CadetRequest` to move to pending
179  * @return #GNUNET_YES (continue to iterate)
180  */
181 static int
182 move_to_pending (void *cls,
183                  const struct GNUNET_HashCode *key,
184                  void *value)
185 {
186   struct CadetHandle *mh = cls;
187   struct GSF_CadetRequest *sr = value;
188
189   GNUNET_assert (GNUNET_YES ==
190                  GNUNET_CONTAINER_multihashmap_remove (mh->waiting_map,
191                                                        key,
192                                                        value));
193   GNUNET_CONTAINER_DLL_insert (mh->pending_head,
194                                mh->pending_tail,
195                                sr);
196   sr->was_transmitted = GNUNET_NO;
197   return GNUNET_YES;
198 }
199
200
201 /**
202  * Functions with this signature are called whenever a complete reply
203  * is received.
204  *
205  * @param cls closure with the `struct CadetHandle`
206  * @param srm the actual message
207  * @return #GNUNET_OK on success, #GNUNET_SYSERR to stop further processing
208  */
209 static int
210 check_reply (void *cls,
211              const struct CadetReplyMessage *srm)
212 {
213   /* We check later... */
214   return GNUNET_OK;
215 }
216
217
218 /**
219  * Task called when it is time to reset an cadet.
220  *
221  * @param cls the `struct CadetHandle` to tear down
222  */
223 static void
224 reset_cadet_task (void *cls);
225
226
227 /**
228  * We had a serious error, tear down and re-create cadet from scratch,
229  * but do so asynchronously.
230  *
231  * @param mh cadet to reset
232  */
233 static void
234 reset_cadet_async (struct CadetHandle *mh)
235 {
236   if (NULL != mh->reset_task)
237     GNUNET_SCHEDULER_cancel (mh->reset_task);
238   mh->reset_task = GNUNET_SCHEDULER_add_now (&reset_cadet_task,
239                                              mh);
240 }
241
242
243 /**
244  * Closure for handle_reply().
245  */
246 struct HandleReplyClosure
247 {
248
249   /**
250    * Reply payload.
251    */
252   const void *data;
253
254   /**
255    * Expiration time for the block.
256    */
257   struct GNUNET_TIME_Absolute expiration;
258
259   /**
260    * Number of bytes in @e data.
261    */
262   size_t data_size;
263
264   /**
265    * Type of the block.
266    */
267   enum GNUNET_BLOCK_Type type;
268
269   /**
270    * Did we have a matching query?
271    */
272   int found;
273 };
274
275
276 /**
277  * Iterator called on each entry in a waiting map to
278  * process a result.
279  *
280  * @param cls the `struct HandleReplyClosure`
281  * @param key the key of the entry in the map (the query)
282  * @param value the `struct GSF_CadetRequest` to handle result for
283  * @return #GNUNET_YES (continue to iterate)
284  */
285 static int
286 process_reply (void *cls,
287                const struct GNUNET_HashCode *key,
288                void *value)
289 {
290   struct HandleReplyClosure *hrc = cls;
291   struct GSF_CadetRequest *sr = value;
292
293   sr->proc (sr->proc_cls,
294             hrc->type,
295             hrc->expiration,
296             hrc->data_size,
297             hrc->data);
298   sr->proc = NULL;
299   GSF_cadet_query_cancel (sr);
300   hrc->found = GNUNET_YES;
301   return GNUNET_YES;
302 }
303
304
305 /**
306  * Iterator called on each entry in a waiting map to
307  * call the 'proc' continuation and release associated
308  * resources.
309  *
310  * @param cls the `struct CadetHandle`
311  * @param key the key of the entry in the map (the query)
312  * @param value the `struct GSF_CadetRequest` to clean up
313  * @return #GNUNET_YES (continue to iterate)
314  */
315 static int
316 free_waiting_entry (void *cls,
317                     const struct GNUNET_HashCode *key,
318                     void *value)
319 {
320   struct GSF_CadetRequest *sr = value;
321
322   GSF_cadet_query_cancel (sr);
323   return GNUNET_YES;
324 }
325
326
327 /**
328  * Functions with this signature are called whenever a complete reply
329  * is received.
330  *
331  * @param cls closure with the `struct CadetHandle`
332  * @param srm the actual message
333  */
334 static void
335 handle_reply (void *cls,
336               const struct CadetReplyMessage *srm)
337 {
338   struct CadetHandle *mh = cls;
339   struct HandleReplyClosure hrc;
340   uint16_t msize;
341   enum GNUNET_BLOCK_Type type;
342   struct GNUNET_HashCode query;
343
344   msize = ntohs (srm->header.size) - sizeof (struct CadetReplyMessage);
345   type = (enum GNUNET_BLOCK_Type) ntohl (srm->type);
346   if (GNUNET_YES !=
347       GNUNET_BLOCK_get_key (GSF_block_ctx,
348                             type,
349                             &srm[1],
350                             msize,
351                             &query))
352   {
353     GNUNET_break_op (0);
354     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
355                 "Received bogus reply of type %u with %u bytes via cadet from peer %s\n",
356                 type,
357                 msize,
358                 GNUNET_i2s (&mh->target));
359     reset_cadet_async (mh);
360     return;
361   }
362   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
363               "Received reply `%s' via cadet from peer %s\n",
364               GNUNET_h2s (&query),
365               GNUNET_i2s (&mh->target));
366   GNUNET_CADET_receive_done (mh->channel);
367   GNUNET_STATISTICS_update (GSF_stats,
368                             gettext_noop ("# replies received via cadet"), 1,
369                             GNUNET_NO);
370   hrc.data = &srm[1];
371   hrc.data_size = msize;
372   hrc.expiration = GNUNET_TIME_absolute_ntoh (srm->expiration);
373   hrc.type = type;
374   hrc.found = GNUNET_NO;
375   GNUNET_CONTAINER_multihashmap_get_multiple (mh->waiting_map,
376                                               &query,
377                                               &process_reply,
378                                               &hrc);
379   if (GNUNET_NO == hrc.found)
380   {
381     GNUNET_STATISTICS_update (GSF_stats,
382                               gettext_noop ("# replies received via cadet dropped"), 1,
383                               GNUNET_NO);
384   }
385 }
386
387
388 /**
389  * Function called by cadet when a client disconnects.
390  * Cleans up our `struct CadetClient` of that channel.
391  *
392  * @param cls our `struct CadetClient`
393  * @param channel channel of the disconnecting client
394  */
395 static void
396 disconnect_cb (void *cls,
397                const struct GNUNET_CADET_Channel *channel)
398 {
399   struct CadetHandle *mh = cls;
400   struct GSF_CadetRequest *sr;
401
402   if (NULL == mh->channel)
403     return; /* being destroyed elsewhere */
404   GNUNET_assert (channel == mh->channel);
405   mh->channel = NULL;
406   while (NULL != (sr = mh->pending_head))
407     GSF_cadet_query_cancel (sr);
408   /* first remove `mh` from the `cadet_map`, so that if the
409      callback from `free_waiting_entry()` happens to re-issue
410      the request, we don't immediately have it back in the
411      `waiting_map`. */
412   GNUNET_assert (GNUNET_OK ==
413                  GNUNET_CONTAINER_multipeermap_remove (cadet_map,
414                                                        &mh->target,
415                                                        mh));
416   GNUNET_CONTAINER_multihashmap_iterate (mh->waiting_map,
417                                          &free_waiting_entry,
418                                          mh);
419   if (NULL != mh->timeout_task)
420     GNUNET_SCHEDULER_cancel (mh->timeout_task);
421   if (NULL != mh->reset_task)
422     GNUNET_SCHEDULER_cancel (mh->reset_task);
423   GNUNET_assert (0 ==
424                  GNUNET_CONTAINER_multihashmap_size (mh->waiting_map));
425   GNUNET_CONTAINER_multihashmap_destroy (mh->waiting_map);
426   GNUNET_free (mh);
427 }
428
429
430 /**
431  * Function called whenever an MQ-channel's transmission window size changes.
432  *
433  * The first callback in an outgoing channel will be with a non-zero value
434  * and will mean the channel is connected to the destination.
435  *
436  * For an incoming channel it will be called immediately after the
437  * #GNUNET_CADET_ConnectEventHandler, also with a non-zero value.
438  *
439  * @param cls Channel closure.
440  * @param channel Connection to the other end (henceforth invalid).
441  * @param window_size New window size. If the is more messages than buffer size
442  *                    this value will be negative..
443  */
444 static void
445 window_change_cb (void *cls,
446                   const struct GNUNET_CADET_Channel *channel,
447                   int window_size)
448 {
449   /* FIXME: for flow control, implement? */
450 #if 0
451   /* Something like this instead of the GNUNET_MQ_notify_sent() in
452      transmit_pending() might be good (once the window change CB works...) */
453   if (0 < window_size) /* test needed? */
454     transmit_pending (mh);
455 #endif
456 }
457
458
459 /**
460  * We had a serious error, tear down and re-create cadet from scratch.
461  *
462  * @param mh cadet to reset
463  */
464 static void
465 reset_cadet (struct CadetHandle *mh)
466 {
467   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
468               "Resetting cadet channel to %s\n",
469               GNUNET_i2s (&mh->target));
470   GNUNET_CADET_channel_destroy (mh->channel);
471   mh->channel = NULL;
472   GNUNET_CONTAINER_multihashmap_iterate (mh->waiting_map,
473                                          &move_to_pending,
474                                          mh);
475   {
476     struct GNUNET_MQ_MessageHandler handlers[] = {
477       GNUNET_MQ_hd_var_size (reply,
478                              GNUNET_MESSAGE_TYPE_FS_CADET_REPLY,
479                              struct CadetReplyMessage,
480                              mh),
481       GNUNET_MQ_handler_end ()
482     };
483     struct GNUNET_HashCode port;
484
485     GNUNET_CRYPTO_hash (GNUNET_APPLICATION_PORT_FS_BLOCK_TRANSFER,
486                         strlen (GNUNET_APPLICATION_PORT_FS_BLOCK_TRANSFER),
487                         &port);
488     mh->channel = GNUNET_CADET_channel_create (cadet_handle,
489                                                mh,
490                                                &mh->target,
491                                                &port,
492                                                GNUNET_CADET_OPTION_RELIABLE,
493                                                &window_change_cb,
494                                                &disconnect_cb,
495                                                handlers);
496   }
497   transmit_pending (mh);
498 }
499
500
501 /**
502  * Task called when it is time to destroy an inactive cadet channel.
503  *
504  * @param cls the `struct CadetHandle` to tear down
505  */
506 static void
507 cadet_timeout (void *cls)
508 {
509   struct CadetHandle *mh = cls;
510   struct GNUNET_CADET_Channel *tun;
511
512   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
513               "Timeout on cadet channel to %s\n",
514               GNUNET_i2s (&mh->target));
515   mh->timeout_task = NULL;
516   tun = mh->channel;
517   mh->channel = NULL;
518   if (NULL != tun)
519     GNUNET_CADET_channel_destroy (tun);
520 }
521
522
523 /**
524  * Task called when it is time to reset an cadet.
525  *
526  * @param cls the `struct CadetHandle` to tear down
527  */
528 static void
529 reset_cadet_task (void *cls)
530 {
531   struct CadetHandle *mh = cls;
532
533   mh->reset_task = NULL;
534   reset_cadet (mh);
535 }
536
537
538 /**
539  * Transmit pending requests via the cadet.
540  *
541  * @param cls `struct CadetHandle` to process
542  */
543 static void
544 transmit_pending (void *cls)
545 {
546   struct CadetHandle *mh = cls;
547   struct GNUNET_MQ_Handle *mq = GNUNET_CADET_get_mq (mh->channel);
548   struct GSF_CadetRequest *sr;
549   struct GNUNET_MQ_Envelope *env;
550   struct CadetQueryMessage *sqm;
551
552   if ( (0 != GNUNET_MQ_get_length (mq)) ||
553        (NULL == (sr = mh->pending_head)) )
554     return;
555   GNUNET_CONTAINER_DLL_remove (mh->pending_head,
556                                mh->pending_tail,
557                                sr);
558   GNUNET_assert (GNUNET_OK ==
559                  GNUNET_CONTAINER_multihashmap_put (mh->waiting_map,
560                                                     &sr->query,
561                                                     sr,
562                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
563   sr->was_transmitted = GNUNET_YES;
564   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
565               "Sending query for %s via cadet to %s\n",
566               GNUNET_h2s (&sr->query),
567               GNUNET_i2s (&mh->target));
568   env = GNUNET_MQ_msg (sqm,
569                        GNUNET_MESSAGE_TYPE_FS_CADET_QUERY);
570   sqm->type = htonl (sr->type);
571   sqm->query = sr->query;
572   GNUNET_MQ_notify_sent (env,
573                          &transmit_pending,
574                          mh);
575   GNUNET_MQ_send (mq,
576                   env);
577 }
578
579
580 /**
581  * Get (or create) a cadet to talk to the given peer.
582  *
583  * @param target peer we want to communicate with
584  */
585 static struct CadetHandle *
586 get_cadet (const struct GNUNET_PeerIdentity *target)
587 {
588   struct CadetHandle *mh;
589
590   mh = GNUNET_CONTAINER_multipeermap_get (cadet_map,
591                                           target);
592   if (NULL != mh)
593   {
594     if (NULL != mh->timeout_task)
595     {
596       GNUNET_SCHEDULER_cancel (mh->timeout_task);
597       mh->timeout_task = NULL;
598     }
599     return mh;
600   }
601   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
602               "Creating cadet channel to %s\n",
603               GNUNET_i2s (target));
604   mh = GNUNET_new (struct CadetHandle);
605   mh->reset_task = GNUNET_SCHEDULER_add_delayed (CLIENT_RETRY_TIMEOUT,
606                                                  &reset_cadet_task,
607                                                  mh);
608   mh->waiting_map = GNUNET_CONTAINER_multihashmap_create (16, GNUNET_YES);
609   mh->target = *target;
610   GNUNET_assert (GNUNET_OK ==
611                  GNUNET_CONTAINER_multipeermap_put (cadet_map,
612                                                     &mh->target,
613                                                     mh,
614                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
615   {
616     struct GNUNET_MQ_MessageHandler handlers[] = {
617       GNUNET_MQ_hd_var_size (reply,
618                              GNUNET_MESSAGE_TYPE_FS_CADET_REPLY,
619                              struct CadetReplyMessage,
620                              mh),
621       GNUNET_MQ_handler_end ()
622     };
623     struct GNUNET_HashCode port;
624
625     GNUNET_CRYPTO_hash (GNUNET_APPLICATION_PORT_FS_BLOCK_TRANSFER,
626                         strlen (GNUNET_APPLICATION_PORT_FS_BLOCK_TRANSFER),
627                         &port);
628     mh->channel = GNUNET_CADET_channel_create (cadet_handle,
629                                                mh,
630                                                &mh->target,
631                                                &port,
632                                                GNUNET_CADET_OPTION_RELIABLE,
633                                                &window_change_cb,
634                                                &disconnect_cb,
635                                                handlers);
636   }
637   return mh;
638 }
639
640
641 /**
642  * Look for a block by directly contacting a particular peer.
643  *
644  * @param target peer that should have the block
645  * @param query hash to query for the block
646  * @param type desired type for the block
647  * @param proc function to call with result
648  * @param proc_cls closure for @a proc
649  * @return handle to cancel the operation
650  */
651 struct GSF_CadetRequest *
652 GSF_cadet_query (const struct GNUNET_PeerIdentity *target,
653                  const struct GNUNET_HashCode *query,
654                  enum GNUNET_BLOCK_Type type,
655                  GSF_CadetReplyProcessor proc,
656                  void *proc_cls)
657 {
658   struct CadetHandle *mh;
659   struct GSF_CadetRequest *sr;
660
661   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
662               "Preparing to send query for %s via cadet to %s\n",
663               GNUNET_h2s (query),
664               GNUNET_i2s (target));
665   mh = get_cadet (target);
666   sr = GNUNET_new (struct GSF_CadetRequest);
667   sr->mh = mh;
668   sr->proc = proc;
669   sr->proc_cls = proc_cls;
670   sr->type = type;
671   sr->query = *query;
672   GNUNET_CONTAINER_DLL_insert (mh->pending_head,
673                                mh->pending_tail,
674                                sr);
675   transmit_pending (mh);
676   return sr;
677 }
678
679
680 /**
681  * Cancel an active request; must not be called after 'proc'
682  * was calld.
683  *
684  * @param sr request to cancel
685  */
686 void
687 GSF_cadet_query_cancel (struct GSF_CadetRequest *sr)
688 {
689   struct CadetHandle *mh = sr->mh;
690   GSF_CadetReplyProcessor p;
691
692   p = sr->proc;
693   sr->proc = NULL;
694   if (NULL != p)
695   {
696     /* signal failure / cancellation to callback */
697     p (sr->proc_cls, GNUNET_BLOCK_TYPE_ANY,
698        GNUNET_TIME_UNIT_ZERO_ABS,
699        0, NULL);
700   }
701   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
702               "Cancelled query for %s via cadet to %s\n",
703               GNUNET_h2s (&sr->query),
704               GNUNET_i2s (&sr->mh->target));
705   if (GNUNET_YES == sr->was_transmitted)
706     GNUNET_assert (GNUNET_OK ==
707                    GNUNET_CONTAINER_multihashmap_remove (mh->waiting_map,
708                                                          &sr->query,
709                                                          sr));
710   else
711     GNUNET_CONTAINER_DLL_remove (mh->pending_head,
712                                  mh->pending_tail,
713                                  sr);
714   GNUNET_free (sr);
715   if ( (0 == GNUNET_CONTAINER_multihashmap_size (mh->waiting_map)) &&
716        (NULL == mh->pending_head) )
717     mh->timeout_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS,
718                                                      &cadet_timeout,
719                                                      mh);
720 }
721
722
723 /**
724  * Function called on each active cadets to shut them down.
725  *
726  * @param cls NULL
727  * @param key target peer, unused
728  * @param value the `struct CadetHandle` to destroy
729  * @return #GNUNET_YES (continue to iterate)
730  */
731 int
732 GSF_cadet_release_clients (void *cls,
733                            const struct GNUNET_PeerIdentity *key,
734                            void *value)
735 {
736   struct CadetHandle *mh = value;
737
738   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
739               "Timeout on cadet channel to %s\n",
740               GNUNET_i2s (&mh->target));
741   if (NULL != mh->channel)
742     GNUNET_CADET_channel_destroy (mh->channel);
743   return GNUNET_YES;
744 }
745
746
747
748 /* end of gnunet-service-fs_cadet_client.c */