/*
     This file is part of GNUnet.
     Copyright (C) 2012, 2013 GNUnet e.V.

     GNUnet is free software: you can redistribute it and/or modify it
     under the terms of the GNU Affero General Public License as published
     by the Free Software Foundation, either version 3 of the License,
     or (at your option) any later version.

     GNUnet is distributed in the hope that it will be useful, but
     WITHOUT ANY WARRANTY; without even the implied warranty of
     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
     Affero General Public License for more details.

     You should have received a copy of the GNU Affero General Public License
     along with this program. If not, see <http://www.gnu.org/licenses/>.

     SPDX-License-Identifier: AGPL3.0-or-later
*/
/**
 * @file fs/gnunet-service-fs_cadet_client.c
 * @brief non-anonymous file-transfer
 * @author Christian Grothoff
 *
 * TODO:
 * - PORT is set to old application type, unsure if we should keep
 *   it that way (fine for now)
 */
31 #include "gnunet_constants.h"
32 #include "gnunet_util_lib.h"
33 #include "gnunet_cadet_service.h"
34 #include "gnunet_protocols.h"
35 #include "gnunet_applications.h"
36 #include "gnunet-service-fs.h"
37 #include "gnunet-service-fs_indexing.h"
38 #include "gnunet-service-fs_cadet.h"
/**
 * After how long do we reset connections without replies?
 */
#define CLIENT_RETRY_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 30)
/**
 * Handle for a cadet to another peer.  Forward declaration so that
 * `struct GSF_CadetRequest` can point at it before it is defined.
 */
struct CadetHandle;
54 * Handle for a request that is going out via cadet API.
56 struct GSF_CadetRequest
62 struct GSF_CadetRequest *next;
67 struct GSF_CadetRequest *prev;
70 * Which cadet is this request associated with?
72 struct CadetHandle *mh;
75 * Function to call with the result.
77 GSF_CadetReplyProcessor proc;
85 * Query to transmit to the other peer.
87 struct GNUNET_HashCode query;
90 * Desired type for the reply.
92 enum GNUNET_BLOCK_Type type;
95 * Did we transmit this request already? #GNUNET_YES if we are
96 * in the 'waiting_map', #GNUNET_NO if we are in the 'pending' DLL.
103 * Handle for a cadet to another peer.
108 * Head of DLL of pending requests on this cadet.
110 struct GSF_CadetRequest *pending_head;
113 * Tail of DLL of pending requests on this cadet.
115 struct GSF_CadetRequest *pending_tail;
118 * Map from query to `struct GSF_CadetRequest`s waiting for
121 struct GNUNET_CONTAINER_MultiHashMap *waiting_map;
124 * Channel to the other peer.
126 struct GNUNET_CADET_Channel *channel;
129 * Which peer does this cadet go to?
131 struct GNUNET_PeerIdentity target;
134 * Task to kill inactive cadets (we keep them around for
135 * a few seconds to give the application a chance to give
138 struct GNUNET_SCHEDULER_Task *timeout_task;
141 * Task to reset cadets that had errors (asynchronously,
142 * as we may not be able to do it immediately during a
143 * callback from the cadet API).
145 struct GNUNET_SCHEDULER_Task *reset_task;
/**
 * Cadet channel for creating outbound channels.
 */
struct GNUNET_CADET_Handle *cadet_handle;
/**
 * Map from peer identities to `struct CadetHandle`s with cadet
 * channels to those peers.
 */
struct GNUNET_CONTAINER_MultiPeerMap *cadet_map;
/* ********************* client-side code ************************* */
/**
 * Transmit pending requests via the cadet.
 *
 * @param cls `struct CadetHandle` to process
 */
static void
transmit_pending (void *cls);
175 * Iterator called on each entry in a waiting map to
176 * move it back to the pending list.
178 * @param cls the `struct CadetHandle`
179 * @param key the key of the entry in the map (the query)
180 * @param value the `struct GSF_CadetRequest` to move to pending
181 * @return #GNUNET_YES (continue to iterate)
184 move_to_pending (void *cls,
185 const struct GNUNET_HashCode *key,
188 struct CadetHandle *mh = cls;
189 struct GSF_CadetRequest *sr = value;
191 GNUNET_assert (GNUNET_YES ==
192 GNUNET_CONTAINER_multihashmap_remove (mh->waiting_map,
195 GNUNET_CONTAINER_DLL_insert (mh->pending_head,
198 sr->was_transmitted = GNUNET_NO;
204 * Functions with this signature are called whenever a complete reply
207 * @param cls closure with the `struct CadetHandle`
208 * @param srm the actual message
209 * @return #GNUNET_OK on success, #GNUNET_SYSERR to stop further processing
212 check_reply (void *cls,
213 const struct CadetReplyMessage *srm)
215 /* We check later... */
/**
 * Task called when it is time to reset a cadet.
 *
 * @param cls the `struct CadetHandle` to tear down
 */
static void
reset_cadet_task (void *cls);
230 * We had a serious error, tear down and re-create cadet from scratch,
231 * but do so asynchronously.
233 * @param mh cadet to reset
236 reset_cadet_async (struct CadetHandle *mh)
238 if (NULL != mh->reset_task)
239 GNUNET_SCHEDULER_cancel (mh->reset_task);
240 mh->reset_task = GNUNET_SCHEDULER_add_now (&reset_cadet_task,
246 * Closure for handle_reply().
248 struct HandleReplyClosure
257 * Expiration time for the block.
259 struct GNUNET_TIME_Absolute expiration;
262 * Number of bytes in @e data.
269 enum GNUNET_BLOCK_Type type;
272 * Did we have a matching query?
279 * Iterator called on each entry in a waiting map to
282 * @param cls the `struct HandleReplyClosure`
283 * @param key the key of the entry in the map (the query)
284 * @param value the `struct GSF_CadetRequest` to handle result for
285 * @return #GNUNET_YES (continue to iterate)
288 process_reply (void *cls,
289 const struct GNUNET_HashCode *key,
292 struct HandleReplyClosure *hrc = cls;
293 struct GSF_CadetRequest *sr = value;
295 sr->proc (sr->proc_cls,
301 GSF_cadet_query_cancel (sr);
302 hrc->found = GNUNET_YES;
308 * Iterator called on each entry in a waiting map to
309 * call the 'proc' continuation and release associated
312 * @param cls the `struct CadetHandle`
313 * @param key the key of the entry in the map (the query)
314 * @param value the `struct GSF_CadetRequest` to clean up
315 * @return #GNUNET_YES (continue to iterate)
318 free_waiting_entry (void *cls,
319 const struct GNUNET_HashCode *key,
322 struct GSF_CadetRequest *sr = value;
324 GSF_cadet_query_cancel (sr);
330 * Functions with this signature are called whenever a complete reply
333 * @param cls closure with the `struct CadetHandle`
334 * @param srm the actual message
337 handle_reply (void *cls,
338 const struct CadetReplyMessage *srm)
340 struct CadetHandle *mh = cls;
341 struct HandleReplyClosure hrc;
343 enum GNUNET_BLOCK_Type type;
344 struct GNUNET_HashCode query;
346 msize = ntohs (srm->header.size) - sizeof (struct CadetReplyMessage);
347 type = (enum GNUNET_BLOCK_Type) ntohl (srm->type);
349 GNUNET_BLOCK_get_key (GSF_block_ctx,
356 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
357 "Received bogus reply of type %u with %u bytes via cadet from peer %s\n",
360 GNUNET_i2s (&mh->target));
361 reset_cadet_async (mh);
364 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
365 "Received reply `%s' via cadet from peer %s\n",
367 GNUNET_i2s (&mh->target));
368 GNUNET_CADET_receive_done (mh->channel);
369 GNUNET_STATISTICS_update (GSF_stats,
370 gettext_noop ("# replies received via cadet"), 1,
373 hrc.data_size = msize;
374 hrc.expiration = GNUNET_TIME_absolute_ntoh (srm->expiration);
376 hrc.found = GNUNET_NO;
377 GNUNET_CONTAINER_multihashmap_get_multiple (mh->waiting_map,
381 if (GNUNET_NO == hrc.found)
383 GNUNET_STATISTICS_update (GSF_stats,
384 gettext_noop ("# replies received via cadet dropped"), 1,
391 * Function called by cadet when a client disconnects.
392 * Cleans up our `struct CadetClient` of that channel.
394 * @param cls our `struct CadetClient`
395 * @param channel channel of the disconnecting client
398 disconnect_cb (void *cls,
399 const struct GNUNET_CADET_Channel *channel)
401 struct CadetHandle *mh = cls;
402 struct GSF_CadetRequest *sr;
404 if (NULL == mh->channel)
405 return; /* being destroyed elsewhere */
406 GNUNET_assert (channel == mh->channel);
408 while (NULL != (sr = mh->pending_head))
409 GSF_cadet_query_cancel (sr);
410 /* first remove `mh` from the `cadet_map`, so that if the
411 callback from `free_waiting_entry()` happens to re-issue
412 the request, we don't immediately have it back in the
414 GNUNET_assert (GNUNET_OK ==
415 GNUNET_CONTAINER_multipeermap_remove (cadet_map,
418 GNUNET_CONTAINER_multihashmap_iterate (mh->waiting_map,
421 if (NULL != mh->timeout_task)
422 GNUNET_SCHEDULER_cancel (mh->timeout_task);
423 if (NULL != mh->reset_task)
424 GNUNET_SCHEDULER_cancel (mh->reset_task);
426 GNUNET_CONTAINER_multihashmap_size (mh->waiting_map));
427 GNUNET_CONTAINER_multihashmap_destroy (mh->waiting_map);
/**
 * Function called whenever an MQ-channel's transmission window size changes.
 *
 * The first callback in an outgoing channel will be with a non-zero value
 * and will mean the channel is connected to the destination.
 *
 * For an incoming channel it will be called immediately after the
 * #GNUNET_CADET_ConnectEventHandler, also with a non-zero value.
 *
 * @param cls Channel closure.
 * @param channel Connection to the other end.
 * @param window_size New window size. If there are more messages than
 *                    buffer size, this value will be negative.
 */
static void
window_change_cb (void *cls,
                  const struct GNUNET_CADET_Channel *channel,
                  int window_size)
{
  /* FIXME: for flow control, implement? */
#if 0
  /* Something like this instead of the GNUNET_MQ_notify_sent() in
     transmit_pending() might be good (once the window change CB works...) */
  /* NOTE(review): `mh` is not declared here; presumably it would be
     taken from `cls` once this code is enabled. */
  if (0 < window_size) /* test needed? */
    transmit_pending (mh);
#endif
}
462 * We had a serious error, tear down and re-create cadet from scratch.
464 * @param mh cadet to reset
467 reset_cadet (struct CadetHandle *mh)
469 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
470 "Resetting cadet channel to %s\n",
471 GNUNET_i2s (&mh->target));
472 GNUNET_CADET_channel_destroy (mh->channel);
474 GNUNET_CONTAINER_multihashmap_iterate (mh->waiting_map,
478 struct GNUNET_MQ_MessageHandler handlers[] = {
479 GNUNET_MQ_hd_var_size (reply,
480 GNUNET_MESSAGE_TYPE_FS_CADET_REPLY,
481 struct CadetReplyMessage,
483 GNUNET_MQ_handler_end ()
485 struct GNUNET_HashCode port;
487 GNUNET_CRYPTO_hash (GNUNET_APPLICATION_PORT_FS_BLOCK_TRANSFER,
488 strlen (GNUNET_APPLICATION_PORT_FS_BLOCK_TRANSFER),
490 mh->channel = GNUNET_CADET_channel_create (cadet_handle,
494 GNUNET_CADET_OPTION_RELIABLE,
499 transmit_pending (mh);
504 * Task called when it is time to destroy an inactive cadet channel.
506 * @param cls the `struct CadetHandle` to tear down
509 cadet_timeout (void *cls)
511 struct CadetHandle *mh = cls;
512 struct GNUNET_CADET_Channel *tun;
514 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
515 "Timeout on cadet channel to %s\n",
516 GNUNET_i2s (&mh->target));
517 mh->timeout_task = NULL;
521 GNUNET_CADET_channel_destroy (tun);
526 * Task called when it is time to reset an cadet.
528 * @param cls the `struct CadetHandle` to tear down
531 reset_cadet_task (void *cls)
533 struct CadetHandle *mh = cls;
535 mh->reset_task = NULL;
541 * Transmit pending requests via the cadet.
543 * @param cls `struct CadetHandle` to process
546 transmit_pending (void *cls)
548 struct CadetHandle *mh = cls;
549 struct GNUNET_MQ_Handle *mq = GNUNET_CADET_get_mq (mh->channel);
550 struct GSF_CadetRequest *sr;
551 struct GNUNET_MQ_Envelope *env;
552 struct CadetQueryMessage *sqm;
554 if ( (0 != GNUNET_MQ_get_length (mq)) ||
555 (NULL == (sr = mh->pending_head)) )
557 GNUNET_CONTAINER_DLL_remove (mh->pending_head,
560 GNUNET_assert (GNUNET_OK ==
561 GNUNET_CONTAINER_multihashmap_put (mh->waiting_map,
564 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
565 sr->was_transmitted = GNUNET_YES;
566 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
567 "Sending query for %s via cadet to %s\n",
568 GNUNET_h2s (&sr->query),
569 GNUNET_i2s (&mh->target));
570 env = GNUNET_MQ_msg (sqm,
571 GNUNET_MESSAGE_TYPE_FS_CADET_QUERY);
572 sqm->type = htonl (sr->type);
573 sqm->query = sr->query;
574 GNUNET_MQ_notify_sent (env,
583 * Get (or create) a cadet to talk to the given peer.
585 * @param target peer we want to communicate with
587 static struct CadetHandle *
588 get_cadet (const struct GNUNET_PeerIdentity *target)
590 struct CadetHandle *mh;
592 mh = GNUNET_CONTAINER_multipeermap_get (cadet_map,
596 if (NULL != mh->timeout_task)
598 GNUNET_SCHEDULER_cancel (mh->timeout_task);
599 mh->timeout_task = NULL;
603 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
604 "Creating cadet channel to %s\n",
605 GNUNET_i2s (target));
606 mh = GNUNET_new (struct CadetHandle);
607 mh->reset_task = GNUNET_SCHEDULER_add_delayed (CLIENT_RETRY_TIMEOUT,
610 mh->waiting_map = GNUNET_CONTAINER_multihashmap_create (16, GNUNET_YES);
611 mh->target = *target;
612 GNUNET_assert (GNUNET_OK ==
613 GNUNET_CONTAINER_multipeermap_put (cadet_map,
616 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
618 struct GNUNET_MQ_MessageHandler handlers[] = {
619 GNUNET_MQ_hd_var_size (reply,
620 GNUNET_MESSAGE_TYPE_FS_CADET_REPLY,
621 struct CadetReplyMessage,
623 GNUNET_MQ_handler_end ()
625 struct GNUNET_HashCode port;
627 GNUNET_CRYPTO_hash (GNUNET_APPLICATION_PORT_FS_BLOCK_TRANSFER,
628 strlen (GNUNET_APPLICATION_PORT_FS_BLOCK_TRANSFER),
630 mh->channel = GNUNET_CADET_channel_create (cadet_handle,
634 GNUNET_CADET_OPTION_RELIABLE,
644 * Look for a block by directly contacting a particular peer.
646 * @param target peer that should have the block
647 * @param query hash to query for the block
648 * @param type desired type for the block
649 * @param proc function to call with result
650 * @param proc_cls closure for @a proc
651 * @return handle to cancel the operation
653 struct GSF_CadetRequest *
654 GSF_cadet_query (const struct GNUNET_PeerIdentity *target,
655 const struct GNUNET_HashCode *query,
656 enum GNUNET_BLOCK_Type type,
657 GSF_CadetReplyProcessor proc,
660 struct CadetHandle *mh;
661 struct GSF_CadetRequest *sr;
663 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
664 "Preparing to send query for %s via cadet to %s\n",
666 GNUNET_i2s (target));
667 mh = get_cadet (target);
668 sr = GNUNET_new (struct GSF_CadetRequest);
671 sr->proc_cls = proc_cls;
674 GNUNET_CONTAINER_DLL_insert (mh->pending_head,
677 transmit_pending (mh);
683 * Cancel an active request; must not be called after 'proc'
686 * @param sr request to cancel
689 GSF_cadet_query_cancel (struct GSF_CadetRequest *sr)
691 struct CadetHandle *mh = sr->mh;
692 GSF_CadetReplyProcessor p;
698 /* signal failure / cancellation to callback */
699 p (sr->proc_cls, GNUNET_BLOCK_TYPE_ANY,
700 GNUNET_TIME_UNIT_ZERO_ABS,
703 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
704 "Cancelled query for %s via cadet to %s\n",
705 GNUNET_h2s (&sr->query),
706 GNUNET_i2s (&sr->mh->target));
707 if (GNUNET_YES == sr->was_transmitted)
708 GNUNET_assert (GNUNET_OK ==
709 GNUNET_CONTAINER_multihashmap_remove (mh->waiting_map,
713 GNUNET_CONTAINER_DLL_remove (mh->pending_head,
717 if ( (0 == GNUNET_CONTAINER_multihashmap_size (mh->waiting_map)) &&
718 (NULL == mh->pending_head) )
719 mh->timeout_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS,
726 * Function called on each active cadets to shut them down.
729 * @param key target peer, unused
730 * @param value the `struct CadetHandle` to destroy
731 * @return #GNUNET_YES (continue to iterate)
734 GSF_cadet_release_clients (void *cls,
735 const struct GNUNET_PeerIdentity *key,
738 struct CadetHandle *mh = value;
740 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
741 "Timeout on cadet channel to %s\n",
742 GNUNET_i2s (&mh->target));
743 if (NULL != mh->channel)
744 GNUNET_CADET_channel_destroy (mh->channel);
/* end of gnunet-service-fs_cadet_client.c */