tolerate additional IPv4 address now available for gnunet.org
[oweals/gnunet.git] / src / fs / gnunet-service-fs_cadet_client.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2012, 2013 GNUnet e.V.
4
5      GNUnet is free software: you can redistribute it and/or modify it
6      under the terms of the GNU Affero General Public License as published
7      by the Free Software Foundation, either version 3 of the License,
8      or (at your option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      Affero General Public License for more details.
14
15      You should have received a copy of the GNU Affero General Public License
16      along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18      SPDX-License-Identifier: AGPL3.0-or-later
19 */
20
21 /**
22  * @file fs/gnunet-service-fs_cadet_client.c
23  * @brief non-anonymous file-transfer
24  * @author Christian Grothoff
25  *
26  * TODO:
27  * - PORT is set to old application type, unsure if we should keep
28  *   it that way (fine for now)
29  */
30 #include "platform.h"
31 #include "gnunet_constants.h"
32 #include "gnunet_util_lib.h"
33 #include "gnunet_cadet_service.h"
34 #include "gnunet_protocols.h"
35 #include "gnunet_applications.h"
36 #include "gnunet-service-fs.h"
37 #include "gnunet-service-fs_indexing.h"
38 #include "gnunet-service-fs_cadet.h"
39
40
41 /**
42  * After how long do we reset connections without replies?
43  */
44 #define CLIENT_RETRY_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 30)
45
46
47 /**
48  * Handle for a cadet to another peer.
49  */
50 struct CadetHandle;
51
52
53 /**
54  * Handle for a request that is going out via cadet API.
55  */
56 struct GSF_CadetRequest
57 {
58
59   /**
60    * DLL.
61    */
62   struct GSF_CadetRequest *next;
63
64   /**
65    * DLL.
66    */
67   struct GSF_CadetRequest *prev;
68
69   /**
70    * Which cadet is this request associated with?
71    */
72   struct CadetHandle *mh;
73
74   /**
75    * Function to call with the result.
76    */
77   GSF_CadetReplyProcessor proc;
78
79   /**
80    * Closure for @e proc
81    */
82   void *proc_cls;
83
84   /**
85    * Query to transmit to the other peer.
86    */
87   struct GNUNET_HashCode query;
88
89   /**
90    * Desired type for the reply.
91    */
92   enum GNUNET_BLOCK_Type type;
93
94   /**
95    * Did we transmit this request already? #GNUNET_YES if we are
96    * in the 'waiting_map', #GNUNET_NO if we are in the 'pending' DLL.
97    */
98   int was_transmitted;
99 };
100
101
102 /**
103  * Handle for a cadet to another peer.
104  */
105 struct CadetHandle
106 {
107   /**
108    * Head of DLL of pending requests on this cadet.
109    */
110   struct GSF_CadetRequest *pending_head;
111
112   /**
113    * Tail of DLL of pending requests on this cadet.
114    */
115   struct GSF_CadetRequest *pending_tail;
116
117   /**
118    * Map from query to `struct GSF_CadetRequest`s waiting for
119    * a reply.
120    */
121   struct GNUNET_CONTAINER_MultiHashMap *waiting_map;
122
123   /**
124    * Channel to the other peer.
125    */
126   struct GNUNET_CADET_Channel *channel;
127
128   /**
129    * Which peer does this cadet go to?
130    */
131   struct GNUNET_PeerIdentity target;
132
133   /**
134    * Task to kill inactive cadets (we keep them around for
135    * a few seconds to give the application a chance to give
136    * us another query).
137    */
138   struct GNUNET_SCHEDULER_Task *timeout_task;
139
140   /**
141    * Task to reset cadets that had errors (asynchronously,
142    * as we may not be able to do it immediately during a
143    * callback from the cadet API).
144    */
145   struct GNUNET_SCHEDULER_Task *reset_task;
146
147 };
148
149
150 /**
151  * Cadet channel for creating outbound channels.
152  */
153 struct GNUNET_CADET_Handle *cadet_handle;
154
155 /**
156  * Map from peer identities to 'struct CadetHandles' with cadet
157  * channels to those peers.
158  */
159 struct GNUNET_CONTAINER_MultiPeerMap *cadet_map;
160
161
162 /* ********************* client-side code ************************* */
163
164
165 /**
166  * Transmit pending requests via the cadet.
167  *
168  * @param cls `struct CadetHandle` to process
169  */
170 static void
171 transmit_pending (void *cls);
172
173
174 /**
175  * Iterator called on each entry in a waiting map to
176  * move it back to the pending list.
177  *
178  * @param cls the `struct CadetHandle`
179  * @param key the key of the entry in the map (the query)
180  * @param value the `struct GSF_CadetRequest` to move to pending
181  * @return #GNUNET_YES (continue to iterate)
182  */
183 static int
184 move_to_pending (void *cls,
185                  const struct GNUNET_HashCode *key,
186                  void *value)
187 {
188   struct CadetHandle *mh = cls;
189   struct GSF_CadetRequest *sr = value;
190
191   GNUNET_assert (GNUNET_YES ==
192                  GNUNET_CONTAINER_multihashmap_remove (mh->waiting_map,
193                                                        key,
194                                                        value));
195   GNUNET_CONTAINER_DLL_insert (mh->pending_head,
196                                mh->pending_tail,
197                                sr);
198   sr->was_transmitted = GNUNET_NO;
199   return GNUNET_YES;
200 }
201
202
203 /**
204  * Functions with this signature are called whenever a complete reply
205  * is received.
206  *
207  * @param cls closure with the `struct CadetHandle`
208  * @param srm the actual message
209  * @return #GNUNET_OK on success, #GNUNET_SYSERR to stop further processing
210  */
211 static int
212 check_reply (void *cls,
213              const struct CadetReplyMessage *srm)
214 {
215   /* We check later... */
216   return GNUNET_OK;
217 }
218
219
220 /**
221  * Task called when it is time to reset an cadet.
222  *
223  * @param cls the `struct CadetHandle` to tear down
224  */
225 static void
226 reset_cadet_task (void *cls);
227
228
229 /**
230  * We had a serious error, tear down and re-create cadet from scratch,
231  * but do so asynchronously.
232  *
233  * @param mh cadet to reset
234  */
235 static void
236 reset_cadet_async (struct CadetHandle *mh)
237 {
238   if (NULL != mh->reset_task)
239     GNUNET_SCHEDULER_cancel (mh->reset_task);
240   mh->reset_task = GNUNET_SCHEDULER_add_now (&reset_cadet_task,
241                                              mh);
242 }
243
244
245 /**
246  * Closure for handle_reply().
247  */
248 struct HandleReplyClosure
249 {
250
251   /**
252    * Reply payload.
253    */
254   const void *data;
255
256   /**
257    * Expiration time for the block.
258    */
259   struct GNUNET_TIME_Absolute expiration;
260
261   /**
262    * Number of bytes in @e data.
263    */
264   size_t data_size;
265
266   /**
267    * Type of the block.
268    */
269   enum GNUNET_BLOCK_Type type;
270
271   /**
272    * Did we have a matching query?
273    */
274   int found;
275 };
276
277
278 /**
279  * Iterator called on each entry in a waiting map to
280  * process a result.
281  *
282  * @param cls the `struct HandleReplyClosure`
283  * @param key the key of the entry in the map (the query)
284  * @param value the `struct GSF_CadetRequest` to handle result for
285  * @return #GNUNET_YES (continue to iterate)
286  */
287 static int
288 process_reply (void *cls,
289                const struct GNUNET_HashCode *key,
290                void *value)
291 {
292   struct HandleReplyClosure *hrc = cls;
293   struct GSF_CadetRequest *sr = value;
294
295   sr->proc (sr->proc_cls,
296             hrc->type,
297             hrc->expiration,
298             hrc->data_size,
299             hrc->data);
300   sr->proc = NULL;
301   GSF_cadet_query_cancel (sr);
302   hrc->found = GNUNET_YES;
303   return GNUNET_YES;
304 }
305
306
307 /**
308  * Iterator called on each entry in a waiting map to
309  * call the 'proc' continuation and release associated
310  * resources.
311  *
312  * @param cls the `struct CadetHandle`
313  * @param key the key of the entry in the map (the query)
314  * @param value the `struct GSF_CadetRequest` to clean up
315  * @return #GNUNET_YES (continue to iterate)
316  */
317 static int
318 free_waiting_entry (void *cls,
319                     const struct GNUNET_HashCode *key,
320                     void *value)
321 {
322   struct GSF_CadetRequest *sr = value;
323
324   GSF_cadet_query_cancel (sr);
325   return GNUNET_YES;
326 }
327
328
329 /**
330  * Functions with this signature are called whenever a complete reply
331  * is received.
332  *
333  * @param cls closure with the `struct CadetHandle`
334  * @param srm the actual message
335  */
336 static void
337 handle_reply (void *cls,
338               const struct CadetReplyMessage *srm)
339 {
340   struct CadetHandle *mh = cls;
341   struct HandleReplyClosure hrc;
342   uint16_t msize;
343   enum GNUNET_BLOCK_Type type;
344   struct GNUNET_HashCode query;
345
346   msize = ntohs (srm->header.size) - sizeof (struct CadetReplyMessage);
347   type = (enum GNUNET_BLOCK_Type) ntohl (srm->type);
348   if (GNUNET_YES !=
349       GNUNET_BLOCK_get_key (GSF_block_ctx,
350                             type,
351                             &srm[1],
352                             msize,
353                             &query))
354   {
355     GNUNET_break_op (0);
356     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
357                 "Received bogus reply of type %u with %u bytes via cadet from peer %s\n",
358                 type,
359                 msize,
360                 GNUNET_i2s (&mh->target));
361     reset_cadet_async (mh);
362     return;
363   }
364   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
365               "Received reply `%s' via cadet from peer %s\n",
366               GNUNET_h2s (&query),
367               GNUNET_i2s (&mh->target));
368   GNUNET_CADET_receive_done (mh->channel);
369   GNUNET_STATISTICS_update (GSF_stats,
370                             gettext_noop ("# replies received via cadet"), 1,
371                             GNUNET_NO);
372   hrc.data = &srm[1];
373   hrc.data_size = msize;
374   hrc.expiration = GNUNET_TIME_absolute_ntoh (srm->expiration);
375   hrc.type = type;
376   hrc.found = GNUNET_NO;
377   GNUNET_CONTAINER_multihashmap_get_multiple (mh->waiting_map,
378                                               &query,
379                                               &process_reply,
380                                               &hrc);
381   if (GNUNET_NO == hrc.found)
382   {
383     GNUNET_STATISTICS_update (GSF_stats,
384                               gettext_noop ("# replies received via cadet dropped"), 1,
385                               GNUNET_NO);
386   }
387 }
388
389
390 /**
391  * Function called by cadet when a client disconnects.
392  * Cleans up our `struct CadetClient` of that channel.
393  *
394  * @param cls our `struct CadetClient`
395  * @param channel channel of the disconnecting client
396  */
397 static void
398 disconnect_cb (void *cls,
399                const struct GNUNET_CADET_Channel *channel)
400 {
401   struct CadetHandle *mh = cls;
402   struct GSF_CadetRequest *sr;
403
404   if (NULL == mh->channel)
405     return; /* being destroyed elsewhere */
406   GNUNET_assert (channel == mh->channel);
407   mh->channel = NULL;
408   while (NULL != (sr = mh->pending_head))
409     GSF_cadet_query_cancel (sr);
410   /* first remove `mh` from the `cadet_map`, so that if the
411      callback from `free_waiting_entry()` happens to re-issue
412      the request, we don't immediately have it back in the
413      `waiting_map`. */
414   GNUNET_assert (GNUNET_OK ==
415                  GNUNET_CONTAINER_multipeermap_remove (cadet_map,
416                                                        &mh->target,
417                                                        mh));
418   GNUNET_CONTAINER_multihashmap_iterate (mh->waiting_map,
419                                          &free_waiting_entry,
420                                          mh);
421   if (NULL != mh->timeout_task)
422     GNUNET_SCHEDULER_cancel (mh->timeout_task);
423   if (NULL != mh->reset_task)
424     GNUNET_SCHEDULER_cancel (mh->reset_task);
425   GNUNET_assert (0 ==
426                  GNUNET_CONTAINER_multihashmap_size (mh->waiting_map));
427   GNUNET_CONTAINER_multihashmap_destroy (mh->waiting_map);
428   GNUNET_free (mh);
429 }
430
431
432 /**
433  * Function called whenever an MQ-channel's transmission window size changes.
434  *
435  * The first callback in an outgoing channel will be with a non-zero value
436  * and will mean the channel is connected to the destination.
437  *
438  * For an incoming channel it will be called immediately after the
439  * #GNUNET_CADET_ConnectEventHandler, also with a non-zero value.
440  *
441  * @param cls Channel closure.
442  * @param channel Connection to the other end (henceforth invalid).
443  * @param window_size New window size. If the is more messages than buffer size
444  *                    this value will be negative..
445  */
446 static void
447 window_change_cb (void *cls,
448                   const struct GNUNET_CADET_Channel *channel,
449                   int window_size)
450 {
451   /* FIXME: for flow control, implement? */
452 #if 0
453   /* Something like this instead of the GNUNET_MQ_notify_sent() in
454      transmit_pending() might be good (once the window change CB works...) */
455   if (0 < window_size) /* test needed? */
456     transmit_pending (mh);
457 #endif
458 }
459
460
461 /**
462  * We had a serious error, tear down and re-create cadet from scratch.
463  *
464  * @param mh cadet to reset
465  */
466 static void
467 reset_cadet (struct CadetHandle *mh)
468 {
469   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
470               "Resetting cadet channel to %s\n",
471               GNUNET_i2s (&mh->target));
472   if (NULL != mh->channel)
473   {
474     GNUNET_CADET_channel_destroy (mh->channel);
475     mh->channel = NULL;
476   }
477   GNUNET_CONTAINER_multihashmap_iterate (mh->waiting_map,
478                                          &move_to_pending,
479                                          mh);
480   {
481     struct GNUNET_MQ_MessageHandler handlers[] = {
482       GNUNET_MQ_hd_var_size (reply,
483                              GNUNET_MESSAGE_TYPE_FS_CADET_REPLY,
484                              struct CadetReplyMessage,
485                              mh),
486       GNUNET_MQ_handler_end ()
487     };
488     struct GNUNET_HashCode port;
489
490     GNUNET_CRYPTO_hash (GNUNET_APPLICATION_PORT_FS_BLOCK_TRANSFER,
491                         strlen (GNUNET_APPLICATION_PORT_FS_BLOCK_TRANSFER),
492                         &port);
493     mh->channel = GNUNET_CADET_channel_create (cadet_handle,
494                                                mh,
495                                                &mh->target,
496                                                &port,
497                                                GNUNET_CADET_OPTION_RELIABLE,
498                                                &window_change_cb,
499                                                &disconnect_cb,
500                                                handlers);
501   }
502   transmit_pending (mh);
503 }
504
505
506 /**
507  * Task called when it is time to destroy an inactive cadet channel.
508  *
509  * @param cls the `struct CadetHandle` to tear down
510  */
511 static void
512 cadet_timeout (void *cls)
513 {
514   struct CadetHandle *mh = cls;
515   struct GNUNET_CADET_Channel *tun;
516
517   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
518               "Timeout on cadet channel to %s\n",
519               GNUNET_i2s (&mh->target));
520   mh->timeout_task = NULL;
521   tun = mh->channel;
522   mh->channel = NULL;
523   if (NULL != tun)
524     GNUNET_CADET_channel_destroy (tun);
525 }
526
527
528 /**
529  * Task called when it is time to reset an cadet.
530  *
531  * @param cls the `struct CadetHandle` to tear down
532  */
533 static void
534 reset_cadet_task (void *cls)
535 {
536   struct CadetHandle *mh = cls;
537
538   mh->reset_task = NULL;
539   reset_cadet (mh);
540 }
541
542
543 /**
544  * Transmit pending requests via the cadet.
545  *
546  * @param cls `struct CadetHandle` to process
547  */
548 static void
549 transmit_pending (void *cls)
550 {
551   struct CadetHandle *mh = cls;
552   struct GNUNET_MQ_Handle *mq = GNUNET_CADET_get_mq (mh->channel);
553   struct GSF_CadetRequest *sr;
554   struct GNUNET_MQ_Envelope *env;
555   struct CadetQueryMessage *sqm;
556
557   if ( (0 != GNUNET_MQ_get_length (mq)) ||
558        (NULL == (sr = mh->pending_head)) )
559     return;
560   GNUNET_CONTAINER_DLL_remove (mh->pending_head,
561                                mh->pending_tail,
562                                sr);
563   GNUNET_assert (GNUNET_OK ==
564                  GNUNET_CONTAINER_multihashmap_put (mh->waiting_map,
565                                                     &sr->query,
566                                                     sr,
567                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
568   sr->was_transmitted = GNUNET_YES;
569   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
570               "Sending query for %s via cadet to %s\n",
571               GNUNET_h2s (&sr->query),
572               GNUNET_i2s (&mh->target));
573   env = GNUNET_MQ_msg (sqm,
574                        GNUNET_MESSAGE_TYPE_FS_CADET_QUERY);
575   sqm->type = htonl (sr->type);
576   sqm->query = sr->query;
577   GNUNET_MQ_notify_sent (env,
578                          &transmit_pending,
579                          mh);
580   GNUNET_MQ_send (mq,
581                   env);
582 }
583
584
585 /**
586  * Get (or create) a cadet to talk to the given peer.
587  *
588  * @param target peer we want to communicate with
589  */
590 static struct CadetHandle *
591 get_cadet (const struct GNUNET_PeerIdentity *target)
592 {
593   struct CadetHandle *mh;
594
595   mh = GNUNET_CONTAINER_multipeermap_get (cadet_map,
596                                           target);
597   if (NULL != mh)
598   {
599     if (NULL != mh->timeout_task)
600     {
601       GNUNET_SCHEDULER_cancel (mh->timeout_task);
602       mh->timeout_task = NULL;
603     }
604     return mh;
605   }
606   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
607               "Creating cadet channel to %s\n",
608               GNUNET_i2s (target));
609   mh = GNUNET_new (struct CadetHandle);
610   mh->reset_task = GNUNET_SCHEDULER_add_delayed (CLIENT_RETRY_TIMEOUT,
611                                                  &reset_cadet_task,
612                                                  mh);
613   mh->waiting_map = GNUNET_CONTAINER_multihashmap_create (16, GNUNET_YES);
614   mh->target = *target;
615   GNUNET_assert (GNUNET_OK ==
616                  GNUNET_CONTAINER_multipeermap_put (cadet_map,
617                                                     &mh->target,
618                                                     mh,
619                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
620   {
621     struct GNUNET_MQ_MessageHandler handlers[] = {
622       GNUNET_MQ_hd_var_size (reply,
623                              GNUNET_MESSAGE_TYPE_FS_CADET_REPLY,
624                              struct CadetReplyMessage,
625                              mh),
626       GNUNET_MQ_handler_end ()
627     };
628     struct GNUNET_HashCode port;
629
630     GNUNET_CRYPTO_hash (GNUNET_APPLICATION_PORT_FS_BLOCK_TRANSFER,
631                         strlen (GNUNET_APPLICATION_PORT_FS_BLOCK_TRANSFER),
632                         &port);
633     mh->channel = GNUNET_CADET_channel_create (cadet_handle,
634                                                mh,
635                                                &mh->target,
636                                                &port,
637                                                GNUNET_CADET_OPTION_RELIABLE,
638                                                &window_change_cb,
639                                                &disconnect_cb,
640                                                handlers);
641   }
642   return mh;
643 }
644
645
646 /**
647  * Look for a block by directly contacting a particular peer.
648  *
649  * @param target peer that should have the block
650  * @param query hash to query for the block
651  * @param type desired type for the block
652  * @param proc function to call with result
653  * @param proc_cls closure for @a proc
654  * @return handle to cancel the operation
655  */
656 struct GSF_CadetRequest *
657 GSF_cadet_query (const struct GNUNET_PeerIdentity *target,
658                  const struct GNUNET_HashCode *query,
659                  enum GNUNET_BLOCK_Type type,
660                  GSF_CadetReplyProcessor proc,
661                  void *proc_cls)
662 {
663   struct CadetHandle *mh;
664   struct GSF_CadetRequest *sr;
665
666   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
667               "Preparing to send query for %s via cadet to %s\n",
668               GNUNET_h2s (query),
669               GNUNET_i2s (target));
670   mh = get_cadet (target);
671   sr = GNUNET_new (struct GSF_CadetRequest);
672   sr->mh = mh;
673   sr->proc = proc;
674   sr->proc_cls = proc_cls;
675   sr->type = type;
676   sr->query = *query;
677   GNUNET_CONTAINER_DLL_insert (mh->pending_head,
678                                mh->pending_tail,
679                                sr);
680   transmit_pending (mh);
681   return sr;
682 }
683
684
685 /**
686  * Cancel an active request; must not be called after 'proc'
687  * was calld.
688  *
689  * @param sr request to cancel
690  */
691 void
692 GSF_cadet_query_cancel (struct GSF_CadetRequest *sr)
693 {
694   struct CadetHandle *mh = sr->mh;
695   GSF_CadetReplyProcessor p;
696
697   p = sr->proc;
698   sr->proc = NULL;
699   if (NULL != p)
700   {
701     /* signal failure / cancellation to callback */
702     p (sr->proc_cls, GNUNET_BLOCK_TYPE_ANY,
703        GNUNET_TIME_UNIT_ZERO_ABS,
704        0, NULL);
705   }
706   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
707               "Cancelled query for %s via cadet to %s\n",
708               GNUNET_h2s (&sr->query),
709               GNUNET_i2s (&sr->mh->target));
710   if (GNUNET_YES == sr->was_transmitted)
711     GNUNET_assert (GNUNET_OK ==
712                    GNUNET_CONTAINER_multihashmap_remove (mh->waiting_map,
713                                                          &sr->query,
714                                                          sr));
715   else
716     GNUNET_CONTAINER_DLL_remove (mh->pending_head,
717                                  mh->pending_tail,
718                                  sr);
719   GNUNET_free (sr);
720   if ( (0 == GNUNET_CONTAINER_multihashmap_size (mh->waiting_map)) &&
721        (NULL == mh->pending_head) )
722     mh->timeout_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS,
723                                                      &cadet_timeout,
724                                                      mh);
725 }
726
727
728 /**
729  * Function called on each active cadets to shut them down.
730  *
731  * @param cls NULL
732  * @param key target peer, unused
733  * @param value the `struct CadetHandle` to destroy
734  * @return #GNUNET_YES (continue to iterate)
735  */
736 int
737 GSF_cadet_release_clients (void *cls,
738                            const struct GNUNET_PeerIdentity *key,
739                            void *value)
740 {
741   struct CadetHandle *mh = value;
742
743   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
744               "Timeout on cadet channel to %s\n",
745               GNUNET_i2s (&mh->target));
746   if (NULL != mh->channel)
747   {
748     struct GNUNET_CADET_Channel *channel = mh->channel;
749
750     mh->channel = NULL;
751     GNUNET_CADET_channel_destroy (channel);
752   }
753   if (NULL != mh->reset_task)
754   {
755     GNUNET_SCHEDULER_cancel (mh->reset_task);
756     mh->reset_task = NULL;
757   }
758   return GNUNET_YES;
759 }
760
761
762
763 /* end of gnunet-service-fs_cadet_client.c */