uncrustify as demanded.
[oweals/gnunet.git] / src / fs / gnunet-service-fs_cadet_client.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2012, 2013 GNUnet e.V.
4
5      GNUnet is free software: you can redistribute it and/or modify it
6      under the terms of the GNU Affero General Public License as published
7      by the Free Software Foundation, either version 3 of the License,
8      or (at your option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      Affero General Public License for more details.
14
15      You should have received a copy of the GNU Affero General Public License
16      along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18      SPDX-License-Identifier: AGPL3.0-or-later
19  */
20
21 /**
22  * @file fs/gnunet-service-fs_cadet_client.c
23  * @brief non-anonymous file-transfer
24  * @author Christian Grothoff
25  *
26  * TODO:
27  * - PORT is set to old application type, unsure if we should keep
28  *   it that way (fine for now)
29  */
30 #include "platform.h"
31 #include "gnunet_constants.h"
32 #include "gnunet_util_lib.h"
33 #include "gnunet_cadet_service.h"
34 #include "gnunet_protocols.h"
35 #include "gnunet_applications.h"
36 #include "gnunet-service-fs.h"
37 #include "gnunet-service-fs_indexing.h"
38 #include "gnunet-service-fs_cadet.h"
39
40
41 /**
42  * After how long do we reset connections without replies?
43  */
44 #define CLIENT_RETRY_TIMEOUT \
45   GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 30)
46
47
48 /**
49  * Handle for a cadet to another peer.
50  */
51 struct CadetHandle;
52
53
54 /**
55  * Handle for a request that is going out via cadet API.
56  */
57 struct GSF_CadetRequest {
58   /**
59    * DLL.
60    */
61   struct GSF_CadetRequest *next;
62
63   /**
64    * DLL.
65    */
66   struct GSF_CadetRequest *prev;
67
68   /**
69    * Which cadet is this request associated with?
70    */
71   struct CadetHandle *mh;
72
73   /**
74    * Function to call with the result.
75    */
76   GSF_CadetReplyProcessor proc;
77
78   /**
79    * Closure for @e proc
80    */
81   void *proc_cls;
82
83   /**
84    * Query to transmit to the other peer.
85    */
86   struct GNUNET_HashCode query;
87
88   /**
89    * Desired type for the reply.
90    */
91   enum GNUNET_BLOCK_Type type;
92
93   /**
94    * Did we transmit this request already? #GNUNET_YES if we are
95    * in the 'waiting_map', #GNUNET_NO if we are in the 'pending' DLL.
96    */
97   int was_transmitted;
98 };
99
100
101 /**
102  * Handle for a cadet to another peer.
103  */
104 struct CadetHandle {
105   /**
106    * Head of DLL of pending requests on this cadet.
107    */
108   struct GSF_CadetRequest *pending_head;
109
110   /**
111    * Tail of DLL of pending requests on this cadet.
112    */
113   struct GSF_CadetRequest *pending_tail;
114
115   /**
116    * Map from query to `struct GSF_CadetRequest`s waiting for
117    * a reply.
118    */
119   struct GNUNET_CONTAINER_MultiHashMap *waiting_map;
120
121   /**
122    * Channel to the other peer.
123    */
124   struct GNUNET_CADET_Channel *channel;
125
126   /**
127    * Which peer does this cadet go to?
128    */
129   struct GNUNET_PeerIdentity target;
130
131   /**
132    * Task to kill inactive cadets (we keep them around for
133    * a few seconds to give the application a chance to give
134    * us another query).
135    */
136   struct GNUNET_SCHEDULER_Task *timeout_task;
137
138   /**
139    * Task to reset cadets that had errors (asynchronously,
140    * as we may not be able to do it immediately during a
141    * callback from the cadet API).
142    */
143   struct GNUNET_SCHEDULER_Task *reset_task;
144 };
145
146
147 /**
148  * Cadet channel for creating outbound channels.
149  */
150 struct GNUNET_CADET_Handle *cadet_handle;
151
152 /**
153  * Map from peer identities to 'struct CadetHandles' with cadet
154  * channels to those peers.
155  */
156 struct GNUNET_CONTAINER_MultiPeerMap *cadet_map;
157
158
159 /* ********************* client-side code ************************* */
160
161
162 /**
163  * Transmit pending requests via the cadet.
164  *
165  * @param cls `struct CadetHandle` to process
166  */
167 static void
168 transmit_pending(void *cls);
169
170
171 /**
172  * Iterator called on each entry in a waiting map to
173  * move it back to the pending list.
174  *
175  * @param cls the `struct CadetHandle`
176  * @param key the key of the entry in the map (the query)
177  * @param value the `struct GSF_CadetRequest` to move to pending
178  * @return #GNUNET_YES (continue to iterate)
179  */
180 static int
181 move_to_pending(void *cls, const struct GNUNET_HashCode *key, void *value)
182 {
183   struct CadetHandle *mh = cls;
184   struct GSF_CadetRequest *sr = value;
185
186   GNUNET_assert(
187     GNUNET_YES ==
188     GNUNET_CONTAINER_multihashmap_remove(mh->waiting_map, key, value));
189   GNUNET_CONTAINER_DLL_insert(mh->pending_head, mh->pending_tail, sr);
190   sr->was_transmitted = GNUNET_NO;
191   return GNUNET_YES;
192 }
193
194
195 /**
196  * Functions with this signature are called whenever a complete reply
197  * is received.
198  *
199  * @param cls closure with the `struct CadetHandle`
200  * @param srm the actual message
201  * @return #GNUNET_OK on success, #GNUNET_SYSERR to stop further processing
202  */
203 static int
204 check_reply(void *cls, const struct CadetReplyMessage *srm)
205 {
206   /* We check later... */
207   return GNUNET_OK;
208 }
209
210
211 /**
212  * Task called when it is time to reset an cadet.
213  *
214  * @param cls the `struct CadetHandle` to tear down
215  */
216 static void
217 reset_cadet_task(void *cls);
218
219
220 /**
221  * We had a serious error, tear down and re-create cadet from scratch,
222  * but do so asynchronously.
223  *
224  * @param mh cadet to reset
225  */
226 static void
227 reset_cadet_async(struct CadetHandle *mh)
228 {
229   if (NULL != mh->reset_task)
230     GNUNET_SCHEDULER_cancel(mh->reset_task);
231   mh->reset_task = GNUNET_SCHEDULER_add_now(&reset_cadet_task, mh);
232 }
233
234
235 /**
236  * Closure for handle_reply().
237  */
238 struct HandleReplyClosure {
239   /**
240    * Reply payload.
241    */
242   const void *data;
243
244   /**
245    * Expiration time for the block.
246    */
247   struct GNUNET_TIME_Absolute expiration;
248
249   /**
250    * Number of bytes in @e data.
251    */
252   size_t data_size;
253
254   /**
255    * Type of the block.
256    */
257   enum GNUNET_BLOCK_Type type;
258
259   /**
260    * Did we have a matching query?
261    */
262   int found;
263 };
264
265
266 /**
267  * Iterator called on each entry in a waiting map to
268  * process a result.
269  *
270  * @param cls the `struct HandleReplyClosure`
271  * @param key the key of the entry in the map (the query)
272  * @param value the `struct GSF_CadetRequest` to handle result for
273  * @return #GNUNET_YES (continue to iterate)
274  */
275 static int
276 process_reply(void *cls, const struct GNUNET_HashCode *key, void *value)
277 {
278   struct HandleReplyClosure *hrc = cls;
279   struct GSF_CadetRequest *sr = value;
280
281   sr->proc(sr->proc_cls,
282            hrc->type,
283            hrc->expiration,
284            hrc->data_size,
285            hrc->data);
286   sr->proc = NULL;
287   GSF_cadet_query_cancel(sr);
288   hrc->found = GNUNET_YES;
289   return GNUNET_YES;
290 }
291
292
293 /**
294  * Iterator called on each entry in a waiting map to
295  * call the 'proc' continuation and release associated
296  * resources.
297  *
298  * @param cls the `struct CadetHandle`
299  * @param key the key of the entry in the map (the query)
300  * @param value the `struct GSF_CadetRequest` to clean up
301  * @return #GNUNET_YES (continue to iterate)
302  */
303 static int
304 free_waiting_entry(void *cls, const struct GNUNET_HashCode *key, void *value)
305 {
306   struct GSF_CadetRequest *sr = value;
307
308   GSF_cadet_query_cancel(sr);
309   return GNUNET_YES;
310 }
311
312
313 /**
314  * Functions with this signature are called whenever a complete reply
315  * is received.
316  *
317  * @param cls closure with the `struct CadetHandle`
318  * @param srm the actual message
319  */
320 static void
321 handle_reply(void *cls, const struct CadetReplyMessage *srm)
322 {
323   struct CadetHandle *mh = cls;
324   struct HandleReplyClosure hrc;
325   uint16_t msize;
326   enum GNUNET_BLOCK_Type type;
327   struct GNUNET_HashCode query;
328
329   msize = ntohs(srm->header.size) - sizeof(struct CadetReplyMessage);
330   type = (enum GNUNET_BLOCK_Type)ntohl(srm->type);
331   if (GNUNET_YES !=
332       GNUNET_BLOCK_get_key(GSF_block_ctx, type, &srm[1], msize, &query))
333     {
334       GNUNET_break_op(0);
335       GNUNET_log(
336         GNUNET_ERROR_TYPE_WARNING,
337         "Received bogus reply of type %u with %u bytes via cadet from peer %s\n",
338         type,
339         msize,
340         GNUNET_i2s(&mh->target));
341       reset_cadet_async(mh);
342       return;
343     }
344   GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
345              "Received reply `%s' via cadet from peer %s\n",
346              GNUNET_h2s(&query),
347              GNUNET_i2s(&mh->target));
348   GNUNET_CADET_receive_done(mh->channel);
349   GNUNET_STATISTICS_update(GSF_stats,
350                            gettext_noop("# replies received via cadet"),
351                            1,
352                            GNUNET_NO);
353   hrc.data = &srm[1];
354   hrc.data_size = msize;
355   hrc.expiration = GNUNET_TIME_absolute_ntoh(srm->expiration);
356   hrc.type = type;
357   hrc.found = GNUNET_NO;
358   GNUNET_CONTAINER_multihashmap_get_multiple(mh->waiting_map,
359                                              &query,
360                                              &process_reply,
361                                              &hrc);
362   if (GNUNET_NO == hrc.found)
363     {
364       GNUNET_STATISTICS_update(GSF_stats,
365                                gettext_noop(
366                                  "# replies received via cadet dropped"),
367                                1,
368                                GNUNET_NO);
369     }
370 }
371
372
373 /**
374  * Function called by cadet when a client disconnects.
375  * Cleans up our `struct CadetClient` of that channel.
376  *
377  * @param cls our `struct CadetClient`
378  * @param channel channel of the disconnecting client
379  */
380 static void
381 disconnect_cb(void *cls, const struct GNUNET_CADET_Channel *channel)
382 {
383   struct CadetHandle *mh = cls;
384   struct GSF_CadetRequest *sr;
385
386   if (NULL == mh->channel)
387     return; /* being destroyed elsewhere */
388   GNUNET_assert(channel == mh->channel);
389   mh->channel = NULL;
390   while (NULL != (sr = mh->pending_head))
391     GSF_cadet_query_cancel(sr);
392   /* first remove `mh` from the `cadet_map`, so that if the
393      callback from `free_waiting_entry()` happens to re-issue
394      the request, we don't immediately have it back in the
395      `waiting_map`. */
396   GNUNET_assert(GNUNET_OK == GNUNET_CONTAINER_multipeermap_remove(cadet_map,
397                                                                   &mh->target,
398                                                                   mh));
399   GNUNET_CONTAINER_multihashmap_iterate(mh->waiting_map,
400                                         &free_waiting_entry,
401                                         mh);
402   if (NULL != mh->timeout_task)
403     GNUNET_SCHEDULER_cancel(mh->timeout_task);
404   if (NULL != mh->reset_task)
405     GNUNET_SCHEDULER_cancel(mh->reset_task);
406   GNUNET_assert(0 == GNUNET_CONTAINER_multihashmap_size(mh->waiting_map));
407   GNUNET_CONTAINER_multihashmap_destroy(mh->waiting_map);
408   GNUNET_free(mh);
409 }
410
411
412 /**
413  * Function called whenever an MQ-channel's transmission window size changes.
414  *
415  * The first callback in an outgoing channel will be with a non-zero value
416  * and will mean the channel is connected to the destination.
417  *
418  * For an incoming channel it will be called immediately after the
419  * #GNUNET_CADET_ConnectEventHandler, also with a non-zero value.
420  *
421  * @param cls Channel closure.
422  * @param channel Connection to the other end (henceforth invalid).
423  * @param window_size New window size. If the is more messages than buffer size
424  *                    this value will be negative..
425  */
426 static void
427 window_change_cb(void *cls,
428                  const struct GNUNET_CADET_Channel *channel,
429                  int window_size)
430 {
431   /* FIXME: for flow control, implement? */
432 #if 0
433   /* Something like this instead of the GNUNET_MQ_notify_sent() in
434      transmit_pending() might be good (once the window change CB works...) */
435   if (0 < window_size) /* test needed? */
436     transmit_pending(mh);
437 #endif
438 }
439
440
441 /**
442  * We had a serious error, tear down and re-create cadet from scratch.
443  *
444  * @param mh cadet to reset
445  */
446 static void
447 reset_cadet(struct CadetHandle *mh)
448 {
449   GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
450              "Resetting cadet channel to %s\n",
451              GNUNET_i2s(&mh->target));
452   if (NULL != mh->channel)
453     {
454       GNUNET_CADET_channel_destroy(mh->channel);
455       mh->channel = NULL;
456     }
457   GNUNET_CONTAINER_multihashmap_iterate(mh->waiting_map, &move_to_pending, mh);
458   {
459     struct GNUNET_MQ_MessageHandler handlers[] =
460     { GNUNET_MQ_hd_var_size(reply,
461                             GNUNET_MESSAGE_TYPE_FS_CADET_REPLY,
462                             struct CadetReplyMessage,
463                             mh),
464       GNUNET_MQ_handler_end() };
465     struct GNUNET_HashCode port;
466
467     GNUNET_CRYPTO_hash(GNUNET_APPLICATION_PORT_FS_BLOCK_TRANSFER,
468                        strlen(GNUNET_APPLICATION_PORT_FS_BLOCK_TRANSFER),
469                        &port);
470     mh->channel = GNUNET_CADET_channel_create(cadet_handle,
471                                               mh,
472                                               &mh->target,
473                                               &port,
474                                               &window_change_cb,
475                                               &disconnect_cb,
476                                               handlers);
477   }
478   transmit_pending(mh);
479 }
480
481
482 /**
483  * Task called when it is time to destroy an inactive cadet channel.
484  *
485  * @param cls the `struct CadetHandle` to tear down
486  */
487 static void
488 cadet_timeout(void *cls)
489 {
490   struct CadetHandle *mh = cls;
491   struct GNUNET_CADET_Channel *tun;
492
493   GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
494              "Timeout on cadet channel to %s\n",
495              GNUNET_i2s(&mh->target));
496   mh->timeout_task = NULL;
497   tun = mh->channel;
498   mh->channel = NULL;
499   if (NULL != tun)
500     GNUNET_CADET_channel_destroy(tun);
501 }
502
503
504 /**
505  * Task called when it is time to reset an cadet.
506  *
507  * @param cls the `struct CadetHandle` to tear down
508  */
509 static void
510 reset_cadet_task(void *cls)
511 {
512   struct CadetHandle *mh = cls;
513
514   mh->reset_task = NULL;
515   reset_cadet(mh);
516 }
517
518
519 /**
520  * Transmit pending requests via the cadet.
521  *
522  * @param cls `struct CadetHandle` to process
523  */
524 static void
525 transmit_pending(void *cls)
526 {
527   struct CadetHandle *mh = cls;
528   struct GNUNET_MQ_Handle *mq = GNUNET_CADET_get_mq(mh->channel);
529   struct GSF_CadetRequest *sr;
530   struct GNUNET_MQ_Envelope *env;
531   struct CadetQueryMessage *sqm;
532
533   if ((0 != GNUNET_MQ_get_length(mq)) || (NULL == (sr = mh->pending_head)))
534     return;
535   GNUNET_CONTAINER_DLL_remove(mh->pending_head, mh->pending_tail, sr);
536   GNUNET_assert(GNUNET_OK == GNUNET_CONTAINER_multihashmap_put(
537                   mh->waiting_map,
538                   &sr->query,
539                   sr,
540                   GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
541   sr->was_transmitted = GNUNET_YES;
542   GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
543              "Sending query for %s via cadet to %s\n",
544              GNUNET_h2s(&sr->query),
545              GNUNET_i2s(&mh->target));
546   env = GNUNET_MQ_msg(sqm, GNUNET_MESSAGE_TYPE_FS_CADET_QUERY);
547   GNUNET_MQ_env_set_options(env,
548                             GNUNET_MQ_PREF_GOODPUT |
549                             GNUNET_MQ_PREF_CORK_ALLOWED |
550                             GNUNET_MQ_PREF_OUT_OF_ORDER);
551   sqm->type = htonl(sr->type);
552   sqm->query = sr->query;
553   GNUNET_MQ_notify_sent(env, &transmit_pending, mh);
554   GNUNET_MQ_send(mq, env);
555 }
556
557
558 /**
559  * Get (or create) a cadet to talk to the given peer.
560  *
561  * @param target peer we want to communicate with
562  */
563 static struct CadetHandle *
564 get_cadet(const struct GNUNET_PeerIdentity *target)
565 {
566   struct CadetHandle *mh;
567
568   mh = GNUNET_CONTAINER_multipeermap_get(cadet_map, target);
569   if (NULL != mh)
570     {
571       if (NULL != mh->timeout_task)
572         {
573           GNUNET_SCHEDULER_cancel(mh->timeout_task);
574           mh->timeout_task = NULL;
575         }
576       return mh;
577     }
578   GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
579              "Creating cadet channel to %s\n",
580              GNUNET_i2s(target));
581   mh = GNUNET_new(struct CadetHandle);
582   mh->reset_task =
583     GNUNET_SCHEDULER_add_delayed(CLIENT_RETRY_TIMEOUT, &reset_cadet_task, mh);
584   mh->waiting_map = GNUNET_CONTAINER_multihashmap_create(16, GNUNET_YES);
585   mh->target = *target;
586   GNUNET_assert(GNUNET_OK ==
587                 GNUNET_CONTAINER_multipeermap_put(
588                   cadet_map,
589                   &mh->target,
590                   mh,
591                   GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
592   {
593     struct GNUNET_MQ_MessageHandler handlers[] =
594     { GNUNET_MQ_hd_var_size(reply,
595                             GNUNET_MESSAGE_TYPE_FS_CADET_REPLY,
596                             struct CadetReplyMessage,
597                             mh),
598       GNUNET_MQ_handler_end() };
599     struct GNUNET_HashCode port;
600
601     GNUNET_CRYPTO_hash(GNUNET_APPLICATION_PORT_FS_BLOCK_TRANSFER,
602                        strlen(GNUNET_APPLICATION_PORT_FS_BLOCK_TRANSFER),
603                        &port);
604     mh->channel = GNUNET_CADET_channel_create(cadet_handle,
605                                               mh,
606                                               &mh->target,
607                                               &port,
608                                               &window_change_cb,
609                                               &disconnect_cb,
610                                               handlers);
611   }
612   return mh;
613 }
614
615
616 /**
617  * Look for a block by directly contacting a particular peer.
618  *
619  * @param target peer that should have the block
620  * @param query hash to query for the block
621  * @param type desired type for the block
622  * @param proc function to call with result
623  * @param proc_cls closure for @a proc
624  * @return handle to cancel the operation
625  */
626 struct GSF_CadetRequest *
627 GSF_cadet_query(const struct GNUNET_PeerIdentity *target,
628                 const struct GNUNET_HashCode *query,
629                 enum GNUNET_BLOCK_Type type,
630                 GSF_CadetReplyProcessor proc,
631                 void *proc_cls)
632 {
633   struct CadetHandle *mh;
634   struct GSF_CadetRequest *sr;
635
636   GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
637              "Preparing to send query for %s via cadet to %s\n",
638              GNUNET_h2s(query),
639              GNUNET_i2s(target));
640   mh = get_cadet(target);
641   sr = GNUNET_new(struct GSF_CadetRequest);
642   sr->mh = mh;
643   sr->proc = proc;
644   sr->proc_cls = proc_cls;
645   sr->type = type;
646   sr->query = *query;
647   GNUNET_CONTAINER_DLL_insert(mh->pending_head, mh->pending_tail, sr);
648   transmit_pending(mh);
649   return sr;
650 }
651
652
653 /**
654  * Cancel an active request; must not be called after 'proc'
655  * was calld.
656  *
657  * @param sr request to cancel
658  */
659 void
660 GSF_cadet_query_cancel(struct GSF_CadetRequest *sr)
661 {
662   struct CadetHandle *mh = sr->mh;
663   GSF_CadetReplyProcessor p;
664
665   p = sr->proc;
666   sr->proc = NULL;
667   if (NULL != p)
668     {
669       /* signal failure / cancellation to callback */
670       p(sr->proc_cls, GNUNET_BLOCK_TYPE_ANY, GNUNET_TIME_UNIT_ZERO_ABS, 0, NULL);
671     }
672   GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
673              "Cancelled query for %s via cadet to %s\n",
674              GNUNET_h2s(&sr->query),
675              GNUNET_i2s(&sr->mh->target));
676   if (GNUNET_YES == sr->was_transmitted)
677     GNUNET_assert(
678       GNUNET_OK ==
679       GNUNET_CONTAINER_multihashmap_remove(mh->waiting_map, &sr->query, sr));
680   else
681     GNUNET_CONTAINER_DLL_remove(mh->pending_head, mh->pending_tail, sr);
682   GNUNET_free(sr);
683   if ((0 == GNUNET_CONTAINER_multihashmap_size(mh->waiting_map)) &&
684       (NULL == mh->pending_head))
685     mh->timeout_task = GNUNET_SCHEDULER_add_delayed(GNUNET_TIME_UNIT_SECONDS,
686                                                     &cadet_timeout,
687                                                     mh);
688 }
689
690
691 /**
692  * Function called on each active cadets to shut them down.
693  *
694  * @param cls NULL
695  * @param key target peer, unused
696  * @param value the `struct CadetHandle` to destroy
697  * @return #GNUNET_YES (continue to iterate)
698  */
699 int
700 GSF_cadet_release_clients(void *cls,
701                           const struct GNUNET_PeerIdentity *key,
702                           void *value)
703 {
704   struct CadetHandle *mh = value;
705
706   GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
707              "Timeout on cadet channel to %s\n",
708              GNUNET_i2s(&mh->target));
709   if (NULL != mh->channel)
710     {
711       struct GNUNET_CADET_Channel *channel = mh->channel;
712
713       mh->channel = NULL;
714       GNUNET_CADET_channel_destroy(channel);
715     }
716   if (NULL != mh->reset_task)
717     {
718       GNUNET_SCHEDULER_cancel(mh->reset_task);
719       mh->reset_task = NULL;
720     }
721   return GNUNET_YES;
722 }
723
724
725 /* end of gnunet-service-fs_cadet_client.c */