error handling
[oweals/gnunet.git] / src / fs / gnunet-service-fs_cadet_client.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2012, 2013 GNUnet e.V.
4
5      GNUnet is free software: you can redistribute it and/or modify it
6      under the terms of the GNU Affero General Public License as published
7      by the Free Software Foundation, either version 3 of the License,
8      or (at your option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      Affero General Public License for more details.
14
15      You should have received a copy of the GNU Affero General Public License
16      along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18      SPDX-License-Identifier: AGPL3.0-or-later
19  */
20
21 /**
22  * @file fs/gnunet-service-fs_cadet_client.c
23  * @brief non-anonymous file-transfer
24  * @author Christian Grothoff
25  *
26  * TODO:
27  * - PORT is set to old application type, unsure if we should keep
28  *   it that way (fine for now)
29  */
30 #include "platform.h"
31 #include "gnunet_constants.h"
32 #include "gnunet_util_lib.h"
33 #include "gnunet_cadet_service.h"
34 #include "gnunet_protocols.h"
35 #include "gnunet_applications.h"
36 #include "gnunet-service-fs.h"
37 #include "gnunet-service-fs_indexing.h"
38 #include "gnunet-service-fs_cadet.h"
39
40
41 /**
42  * After how long do we reset connections without replies?
43  */
44 #define CLIENT_RETRY_TIMEOUT \
45   GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 30)
46
47
48 /**
49  * Handle for a cadet to another peer.
50  */
51 struct CadetHandle;
52
53
54 /**
55  * Handle for a request that is going out via cadet API.
56  */
57 struct GSF_CadetRequest
58 {
59   /**
60    * DLL.
61    */
62   struct GSF_CadetRequest *next;
63
64   /**
65    * DLL.
66    */
67   struct GSF_CadetRequest *prev;
68
69   /**
70    * Which cadet is this request associated with?
71    */
72   struct CadetHandle *mh;
73
74   /**
75    * Function to call with the result.
76    */
77   GSF_CadetReplyProcessor proc;
78
79   /**
80    * Closure for @e proc
81    */
82   void *proc_cls;
83
84   /**
85    * Query to transmit to the other peer.
86    */
87   struct GNUNET_HashCode query;
88
89   /**
90    * Desired type for the reply.
91    */
92   enum GNUNET_BLOCK_Type type;
93
94   /**
95    * Did we transmit this request already? #GNUNET_YES if we are
96    * in the 'waiting_map', #GNUNET_NO if we are in the 'pending' DLL.
97    */
98   int was_transmitted;
99 };
100
101
102 /**
103  * Handle for a cadet to another peer.
104  */
105 struct CadetHandle
106 {
107   /**
108    * Head of DLL of pending requests on this cadet.
109    */
110   struct GSF_CadetRequest *pending_head;
111
112   /**
113    * Tail of DLL of pending requests on this cadet.
114    */
115   struct GSF_CadetRequest *pending_tail;
116
117   /**
118    * Map from query to `struct GSF_CadetRequest`s waiting for
119    * a reply.
120    */
121   struct GNUNET_CONTAINER_MultiHashMap *waiting_map;
122
123   /**
124    * Channel to the other peer.
125    */
126   struct GNUNET_CADET_Channel *channel;
127
128   /**
129    * Which peer does this cadet go to?
130    */
131   struct GNUNET_PeerIdentity target;
132
133   /**
134    * Task to kill inactive cadets (we keep them around for
135    * a few seconds to give the application a chance to give
136    * us another query).
137    */
138   struct GNUNET_SCHEDULER_Task *timeout_task;
139
140   /**
141    * Task to reset cadets that had errors (asynchronously,
142    * as we may not be able to do it immediately during a
143    * callback from the cadet API).
144    */
145   struct GNUNET_SCHEDULER_Task *reset_task;
146 };
147
148
149 /**
150  * Cadet channel for creating outbound channels.
151  */
152 struct GNUNET_CADET_Handle *cadet_handle;
153
154 /**
155  * Map from peer identities to 'struct CadetHandles' with cadet
156  * channels to those peers.
157  */
158 struct GNUNET_CONTAINER_MultiPeerMap *cadet_map;
159
160
161 /* ********************* client-side code ************************* */
162
163
164 /**
165  * Transmit pending requests via the cadet.
166  *
167  * @param cls `struct CadetHandle` to process
168  */
169 static void
170 transmit_pending (void *cls);
171
172
173 /**
174  * Iterator called on each entry in a waiting map to
175  * move it back to the pending list.
176  *
177  * @param cls the `struct CadetHandle`
178  * @param key the key of the entry in the map (the query)
179  * @param value the `struct GSF_CadetRequest` to move to pending
180  * @return #GNUNET_YES (continue to iterate)
181  */
182 static int
183 move_to_pending (void *cls, const struct GNUNET_HashCode *key, void *value)
184 {
185   struct CadetHandle *mh = cls;
186   struct GSF_CadetRequest *sr = value;
187
188   GNUNET_assert (
189     GNUNET_YES ==
190     GNUNET_CONTAINER_multihashmap_remove (mh->waiting_map, key, value));
191   GNUNET_CONTAINER_DLL_insert (mh->pending_head, mh->pending_tail, sr);
192   sr->was_transmitted = GNUNET_NO;
193   return GNUNET_YES;
194 }
195
196
197 /**
198  * Functions with this signature are called whenever a complete reply
199  * is received.
200  *
201  * @param cls closure with the `struct CadetHandle`
202  * @param srm the actual message
203  * @return #GNUNET_OK on success, #GNUNET_SYSERR to stop further processing
204  */
205 static int
206 check_reply (void *cls, const struct CadetReplyMessage *srm)
207 {
208   /* We check later... */
209   return GNUNET_OK;
210 }
211
212
213 /**
214  * Task called when it is time to reset an cadet.
215  *
216  * @param cls the `struct CadetHandle` to tear down
217  */
218 static void
219 reset_cadet_task (void *cls);
220
221
222 /**
223  * We had a serious error, tear down and re-create cadet from scratch,
224  * but do so asynchronously.
225  *
226  * @param mh cadet to reset
227  */
228 static void
229 reset_cadet_async (struct CadetHandle *mh)
230 {
231   if (NULL != mh->reset_task)
232     GNUNET_SCHEDULER_cancel (mh->reset_task);
233   mh->reset_task = GNUNET_SCHEDULER_add_now (&reset_cadet_task, mh);
234 }
235
236
237 /**
238  * Closure for handle_reply().
239  */
240 struct HandleReplyClosure
241 {
242   /**
243    * Reply payload.
244    */
245   const void *data;
246
247   /**
248    * Expiration time for the block.
249    */
250   struct GNUNET_TIME_Absolute expiration;
251
252   /**
253    * Number of bytes in @e data.
254    */
255   size_t data_size;
256
257   /**
258    * Type of the block.
259    */
260   enum GNUNET_BLOCK_Type type;
261
262   /**
263    * Did we have a matching query?
264    */
265   int found;
266 };
267
268
269 /**
270  * Iterator called on each entry in a waiting map to
271  * process a result.
272  *
273  * @param cls the `struct HandleReplyClosure`
274  * @param key the key of the entry in the map (the query)
275  * @param value the `struct GSF_CadetRequest` to handle result for
276  * @return #GNUNET_YES (continue to iterate)
277  */
278 static int
279 process_reply (void *cls, const struct GNUNET_HashCode *key, void *value)
280 {
281   struct HandleReplyClosure *hrc = cls;
282   struct GSF_CadetRequest *sr = value;
283
284   sr->proc (sr->proc_cls,
285             hrc->type,
286             hrc->expiration,
287             hrc->data_size,
288             hrc->data);
289   sr->proc = NULL;
290   GSF_cadet_query_cancel (sr);
291   hrc->found = GNUNET_YES;
292   return GNUNET_YES;
293 }
294
295
296 /**
297  * Iterator called on each entry in a waiting map to
298  * call the 'proc' continuation and release associated
299  * resources.
300  *
301  * @param cls the `struct CadetHandle`
302  * @param key the key of the entry in the map (the query)
303  * @param value the `struct GSF_CadetRequest` to clean up
304  * @return #GNUNET_YES (continue to iterate)
305  */
306 static int
307 free_waiting_entry (void *cls, const struct GNUNET_HashCode *key, void *value)
308 {
309   struct GSF_CadetRequest *sr = value;
310
311   GSF_cadet_query_cancel (sr);
312   return GNUNET_YES;
313 }
314
315
316 /**
317  * Functions with this signature are called whenever a complete reply
318  * is received.
319  *
320  * @param cls closure with the `struct CadetHandle`
321  * @param srm the actual message
322  */
323 static void
324 handle_reply (void *cls, const struct CadetReplyMessage *srm)
325 {
326   struct CadetHandle *mh = cls;
327   struct HandleReplyClosure hrc;
328   uint16_t msize;
329   enum GNUNET_BLOCK_Type type;
330   struct GNUNET_HashCode query;
331
332   msize = ntohs (srm->header.size) - sizeof(struct CadetReplyMessage);
333   type = (enum GNUNET_BLOCK_Type) ntohl (srm->type);
334   if (GNUNET_YES !=
335       GNUNET_BLOCK_get_key (GSF_block_ctx, type, &srm[1], msize, &query))
336   {
337     GNUNET_break_op (0);
338     GNUNET_log (
339       GNUNET_ERROR_TYPE_WARNING,
340       "Received bogus reply of type %u with %u bytes via cadet from peer %s\n",
341       type,
342       msize,
343       GNUNET_i2s (&mh->target));
344     reset_cadet_async (mh);
345     return;
346   }
347   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
348               "Received reply `%s' via cadet from peer %s\n",
349               GNUNET_h2s (&query),
350               GNUNET_i2s (&mh->target));
351   GNUNET_CADET_receive_done (mh->channel);
352   GNUNET_STATISTICS_update (GSF_stats,
353                             gettext_noop ("# replies received via cadet"),
354                             1,
355                             GNUNET_NO);
356   hrc.data = &srm[1];
357   hrc.data_size = msize;
358   hrc.expiration = GNUNET_TIME_absolute_ntoh (srm->expiration);
359   hrc.type = type;
360   hrc.found = GNUNET_NO;
361   GNUNET_CONTAINER_multihashmap_get_multiple (mh->waiting_map,
362                                               &query,
363                                               &process_reply,
364                                               &hrc);
365   if (GNUNET_NO == hrc.found)
366   {
367     GNUNET_STATISTICS_update (GSF_stats,
368                               gettext_noop (
369                                 "# replies received via cadet dropped"),
370                               1,
371                               GNUNET_NO);
372   }
373 }
374
375
376 /**
377  * Function called by cadet when a client disconnects.
378  * Cleans up our `struct CadetClient` of that channel.
379  *
380  * @param cls our `struct CadetClient`
381  * @param channel channel of the disconnecting client
382  */
383 static void
384 disconnect_cb (void *cls, const struct GNUNET_CADET_Channel *channel)
385 {
386   struct CadetHandle *mh = cls;
387   struct GSF_CadetRequest *sr;
388
389   if (NULL == mh->channel)
390     return; /* being destroyed elsewhere */
391   GNUNET_assert (channel == mh->channel);
392   mh->channel = NULL;
393   while (NULL != (sr = mh->pending_head))
394     GSF_cadet_query_cancel (sr);
395   /* first remove `mh` from the `cadet_map`, so that if the
396      callback from `free_waiting_entry()` happens to re-issue
397      the request, we don't immediately have it back in the
398      `waiting_map`. */
399   GNUNET_assert (GNUNET_OK == GNUNET_CONTAINER_multipeermap_remove (cadet_map,
400                                                                     &mh->target,
401                                                                     mh));
402   GNUNET_CONTAINER_multihashmap_iterate (mh->waiting_map,
403                                          &free_waiting_entry,
404                                          mh);
405   if (NULL != mh->timeout_task)
406     GNUNET_SCHEDULER_cancel (mh->timeout_task);
407   if (NULL != mh->reset_task)
408     GNUNET_SCHEDULER_cancel (mh->reset_task);
409   GNUNET_assert (0 == GNUNET_CONTAINER_multihashmap_size (mh->waiting_map));
410   GNUNET_CONTAINER_multihashmap_destroy (mh->waiting_map);
411   GNUNET_free (mh);
412 }
413
414
415 /**
416  * Function called whenever an MQ-channel's transmission window size changes.
417  *
418  * The first callback in an outgoing channel will be with a non-zero value
419  * and will mean the channel is connected to the destination.
420  *
421  * For an incoming channel it will be called immediately after the
422  * #GNUNET_CADET_ConnectEventHandler, also with a non-zero value.
423  *
424  * @param cls Channel closure.
425  * @param channel Connection to the other end (henceforth invalid).
426  * @param window_size New window size. If the is more messages than buffer size
427  *                    this value will be negative..
428  */
429 static void
430 window_change_cb (void *cls,
431                   const struct GNUNET_CADET_Channel *channel,
432                   int window_size)
433 {
434   /* FIXME: for flow control, implement? */
435 #if 0
436   /* Something like this instead of the GNUNET_MQ_notify_sent() in
437      transmit_pending() might be good (once the window change CB works...) */
438   if (0 < window_size) /* test needed? */
439     transmit_pending (mh);
440 #endif
441 }
442
443
444 /**
445  * We had a serious error, tear down and re-create cadet from scratch.
446  *
447  * @param mh cadet to reset
448  */
449 static void
450 reset_cadet (struct CadetHandle *mh)
451 {
452   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
453               "Resetting cadet channel to %s\n",
454               GNUNET_i2s (&mh->target));
455   if (NULL != mh->channel)
456   {
457     GNUNET_CADET_channel_destroy (mh->channel);
458     mh->channel = NULL;
459   }
460   GNUNET_CONTAINER_multihashmap_iterate (mh->waiting_map, &move_to_pending, mh);
461   {
462     struct GNUNET_MQ_MessageHandler handlers[] =
463     { GNUNET_MQ_hd_var_size (reply,
464                              GNUNET_MESSAGE_TYPE_FS_CADET_REPLY,
465                              struct CadetReplyMessage,
466                              mh),
467       GNUNET_MQ_handler_end () };
468     struct GNUNET_HashCode port;
469
470     GNUNET_CRYPTO_hash (GNUNET_APPLICATION_PORT_FS_BLOCK_TRANSFER,
471                         strlen (GNUNET_APPLICATION_PORT_FS_BLOCK_TRANSFER),
472                         &port);
473     mh->channel = GNUNET_CADET_channel_create (cadet_handle,
474                                                mh,
475                                                &mh->target,
476                                                &port,
477                                                &window_change_cb,
478                                                &disconnect_cb,
479                                                handlers);
480   }
481   transmit_pending (mh);
482 }
483
484
485 /**
486  * Task called when it is time to destroy an inactive cadet channel.
487  *
488  * @param cls the `struct CadetHandle` to tear down
489  */
490 static void
491 cadet_timeout (void *cls)
492 {
493   struct CadetHandle *mh = cls;
494   struct GNUNET_CADET_Channel *tun;
495
496   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
497               "Timeout on cadet channel to %s\n",
498               GNUNET_i2s (&mh->target));
499   mh->timeout_task = NULL;
500   tun = mh->channel;
501   mh->channel = NULL;
502   if (NULL != tun)
503     GNUNET_CADET_channel_destroy (tun);
504 }
505
506
507 /**
508  * Task called when it is time to reset an cadet.
509  *
510  * @param cls the `struct CadetHandle` to tear down
511  */
512 static void
513 reset_cadet_task (void *cls)
514 {
515   struct CadetHandle *mh = cls;
516
517   mh->reset_task = NULL;
518   reset_cadet (mh);
519 }
520
521
522 /**
523  * Transmit pending requests via the cadet.
524  *
525  * @param cls `struct CadetHandle` to process
526  */
527 static void
528 transmit_pending (void *cls)
529 {
530   struct CadetHandle *mh = cls;
531   struct GNUNET_MQ_Handle *mq = GNUNET_CADET_get_mq (mh->channel);
532   struct GSF_CadetRequest *sr;
533   struct GNUNET_MQ_Envelope *env;
534   struct CadetQueryMessage *sqm;
535
536   if ((0 != GNUNET_MQ_get_length (mq)) || (NULL == (sr = mh->pending_head)))
537     return;
538   GNUNET_CONTAINER_DLL_remove (mh->pending_head, mh->pending_tail, sr);
539   GNUNET_assert (GNUNET_OK == GNUNET_CONTAINER_multihashmap_put (
540                    mh->waiting_map,
541                    &sr->query,
542                    sr,
543                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
544   sr->was_transmitted = GNUNET_YES;
545   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
546               "Sending query for %s via cadet to %s\n",
547               GNUNET_h2s (&sr->query),
548               GNUNET_i2s (&mh->target));
549   env = GNUNET_MQ_msg (sqm, GNUNET_MESSAGE_TYPE_FS_CADET_QUERY);
550   GNUNET_MQ_env_set_options (env,
551                              GNUNET_MQ_PREF_GOODPUT
552                              | GNUNET_MQ_PREF_CORK_ALLOWED
553                              | GNUNET_MQ_PREF_OUT_OF_ORDER);
554   sqm->type = htonl (sr->type);
555   sqm->query = sr->query;
556   GNUNET_MQ_notify_sent (env, &transmit_pending, mh);
557   GNUNET_MQ_send (mq, env);
558 }
559
560
561 /**
562  * Get (or create) a cadet to talk to the given peer.
563  *
564  * @param target peer we want to communicate with
565  */
566 static struct CadetHandle *
567 get_cadet (const struct GNUNET_PeerIdentity *target)
568 {
569   struct CadetHandle *mh;
570
571   mh = GNUNET_CONTAINER_multipeermap_get (cadet_map, target);
572   if (NULL != mh)
573   {
574     if (NULL != mh->timeout_task)
575     {
576       GNUNET_SCHEDULER_cancel (mh->timeout_task);
577       mh->timeout_task = NULL;
578     }
579     return mh;
580   }
581   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
582               "Creating cadet channel to %s\n",
583               GNUNET_i2s (target));
584   mh = GNUNET_new (struct CadetHandle);
585   mh->reset_task =
586     GNUNET_SCHEDULER_add_delayed (CLIENT_RETRY_TIMEOUT, &reset_cadet_task, mh);
587   mh->waiting_map = GNUNET_CONTAINER_multihashmap_create (16, GNUNET_YES);
588   mh->target = *target;
589   GNUNET_assert (GNUNET_OK ==
590                  GNUNET_CONTAINER_multipeermap_put (
591                    cadet_map,
592                    &mh->target,
593                    mh,
594                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
595   {
596     struct GNUNET_MQ_MessageHandler handlers[] =
597     { GNUNET_MQ_hd_var_size (reply,
598                              GNUNET_MESSAGE_TYPE_FS_CADET_REPLY,
599                              struct CadetReplyMessage,
600                              mh),
601       GNUNET_MQ_handler_end () };
602     struct GNUNET_HashCode port;
603
604     GNUNET_CRYPTO_hash (GNUNET_APPLICATION_PORT_FS_BLOCK_TRANSFER,
605                         strlen (GNUNET_APPLICATION_PORT_FS_BLOCK_TRANSFER),
606                         &port);
607     mh->channel = GNUNET_CADET_channel_create (cadet_handle,
608                                                mh,
609                                                &mh->target,
610                                                &port,
611                                                &window_change_cb,
612                                                &disconnect_cb,
613                                                handlers);
614   }
615   return mh;
616 }
617
618
619 /**
620  * Look for a block by directly contacting a particular peer.
621  *
622  * @param target peer that should have the block
623  * @param query hash to query for the block
624  * @param type desired type for the block
625  * @param proc function to call with result
626  * @param proc_cls closure for @a proc
627  * @return handle to cancel the operation
628  */
629 struct GSF_CadetRequest *
630 GSF_cadet_query (const struct GNUNET_PeerIdentity *target,
631                  const struct GNUNET_HashCode *query,
632                  enum GNUNET_BLOCK_Type type,
633                  GSF_CadetReplyProcessor proc,
634                  void *proc_cls)
635 {
636   struct CadetHandle *mh;
637   struct GSF_CadetRequest *sr;
638
639   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
640               "Preparing to send query for %s via cadet to %s\n",
641               GNUNET_h2s (query),
642               GNUNET_i2s (target));
643   mh = get_cadet (target);
644   sr = GNUNET_new (struct GSF_CadetRequest);
645   sr->mh = mh;
646   sr->proc = proc;
647   sr->proc_cls = proc_cls;
648   sr->type = type;
649   sr->query = *query;
650   GNUNET_CONTAINER_DLL_insert (mh->pending_head, mh->pending_tail, sr);
651   transmit_pending (mh);
652   return sr;
653 }
654
655
656 /**
657  * Cancel an active request; must not be called after 'proc'
658  * was calld.
659  *
660  * @param sr request to cancel
661  */
662 void
663 GSF_cadet_query_cancel (struct GSF_CadetRequest *sr)
664 {
665   struct CadetHandle *mh = sr->mh;
666   GSF_CadetReplyProcessor p;
667
668   p = sr->proc;
669   sr->proc = NULL;
670   if (NULL != p)
671   {
672     /* signal failure / cancellation to callback */
673     p (sr->proc_cls, GNUNET_BLOCK_TYPE_ANY, GNUNET_TIME_UNIT_ZERO_ABS, 0, NULL);
674   }
675   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
676               "Cancelled query for %s via cadet to %s\n",
677               GNUNET_h2s (&sr->query),
678               GNUNET_i2s (&sr->mh->target));
679   if (GNUNET_YES == sr->was_transmitted)
680     GNUNET_assert (
681       GNUNET_OK ==
682       GNUNET_CONTAINER_multihashmap_remove (mh->waiting_map, &sr->query, sr));
683   else
684     GNUNET_CONTAINER_DLL_remove (mh->pending_head, mh->pending_tail, sr);
685   GNUNET_free (sr);
686   if ((0 == GNUNET_CONTAINER_multihashmap_size (mh->waiting_map)) &&
687       (NULL == mh->pending_head))
688     mh->timeout_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS,
689                                                      &cadet_timeout,
690                                                      mh);
691 }
692
693
694 /**
695  * Function called on each active cadets to shut them down.
696  *
697  * @param cls NULL
698  * @param key target peer, unused
699  * @param value the `struct CadetHandle` to destroy
700  * @return #GNUNET_YES (continue to iterate)
701  */
702 int
703 GSF_cadet_release_clients (void *cls,
704                            const struct GNUNET_PeerIdentity *key,
705                            void *value)
706 {
707   struct CadetHandle *mh = value;
708
709   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
710               "Timeout on cadet channel to %s\n",
711               GNUNET_i2s (&mh->target));
712   if (NULL != mh->channel)
713   {
714     struct GNUNET_CADET_Channel *channel = mh->channel;
715
716     mh->channel = NULL;
717     GNUNET_CADET_channel_destroy (channel);
718   }
719   if (NULL != mh->reset_task)
720   {
721     GNUNET_SCHEDULER_cancel (mh->reset_task);
722     mh->reset_task = NULL;
723   }
724   return GNUNET_YES;
725 }
726
727
728 /* end of gnunet-service-fs_cadet_client.c */