2 This file is part of GNUnet.
3 Copyright (C) 2012, 2013 GNUnet e.V.
5 GNUnet is free software: you can redistribute it and/or modify it
6 under the terms of the GNU Affero General Public License as published
7 by the Free Software Foundation, either version 3 of the License,
8 or (at your option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details.
15 You should have received a copy of the GNU Affero General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
18 SPDX-License-Identifier: AGPL3.0-or-later
22 * @file fs/gnunet-service-fs_cadet_client.c
23 * @brief non-anonymous file-transfer
24 * @author Christian Grothoff
27 * - PORT is set to old application type, unsure if we should keep
28 * it that way (fine for now)
31 #include "gnunet_constants.h"
32 #include "gnunet_util_lib.h"
33 #include "gnunet_cadet_service.h"
34 #include "gnunet_protocols.h"
35 #include "gnunet_applications.h"
36 #include "gnunet-service-fs.h"
37 #include "gnunet-service-fs_indexing.h"
38 #include "gnunet-service-fs_cadet.h"
42 * After how long do we reset connections without replies?
44 #define CLIENT_RETRY_TIMEOUT \
45 GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 30)
49 * Handle for a cadet to another peer.
55 * Handle for a request that is going out via cadet API.
57 struct GSF_CadetRequest {
61 struct GSF_CadetRequest *next;
66 struct GSF_CadetRequest *prev;
69 * Which cadet is this request associated with?
71 struct CadetHandle *mh;
74 * Function to call with the result.
76 GSF_CadetReplyProcessor proc;
84 * Query to transmit to the other peer.
86 struct GNUNET_HashCode query;
89 * Desired type for the reply.
91 enum GNUNET_BLOCK_Type type;
94 * Did we transmit this request already? #GNUNET_YES if we are
95 * in the 'waiting_map', #GNUNET_NO if we are in the 'pending' DLL.
102 * Handle for a cadet to another peer.
106 * Head of DLL of pending requests on this cadet.
108 struct GSF_CadetRequest *pending_head;
111 * Tail of DLL of pending requests on this cadet.
113 struct GSF_CadetRequest *pending_tail;
116 * Map from query to `struct GSF_CadetRequest`s waiting for
119 struct GNUNET_CONTAINER_MultiHashMap *waiting_map;
122 * Channel to the other peer.
124 struct GNUNET_CADET_Channel *channel;
127 * Which peer does this cadet go to?
129 struct GNUNET_PeerIdentity target;
132 * Task to kill inactive cadets (we keep them around for
133 * a few seconds to give the application a chance to give
136 struct GNUNET_SCHEDULER_Task *timeout_task;
139 * Task to reset cadets that had errors (asynchronously,
140 * as we may not be able to do it immediately during a
141 * callback from the cadet API).
143 struct GNUNET_SCHEDULER_Task *reset_task;
148 * Cadet channel for creating outbound channels.
150 struct GNUNET_CADET_Handle *cadet_handle;
153 * Map from peer identities to 'struct CadetHandles' with cadet
154 * channels to those peers.
156 struct GNUNET_CONTAINER_MultiPeerMap *cadet_map;
159 /* ********************* client-side code ************************* */
163 * Transmit pending requests via the cadet.
165 * @param cls `struct CadetHandle` to process
168 transmit_pending(void *cls);
172 * Iterator called on each entry in a waiting map to
173 * move it back to the pending list.
175 * @param cls the `struct CadetHandle`
176 * @param key the key of the entry in the map (the query)
177 * @param value the `struct GSF_CadetRequest` to move to pending
178 * @return #GNUNET_YES (continue to iterate)
181 move_to_pending(void *cls, const struct GNUNET_HashCode *key, void *value)
183 struct CadetHandle *mh = cls;
184 struct GSF_CadetRequest *sr = value;
188 GNUNET_CONTAINER_multihashmap_remove(mh->waiting_map, key, value));
189 GNUNET_CONTAINER_DLL_insert(mh->pending_head, mh->pending_tail, sr);
190 sr->was_transmitted = GNUNET_NO;
196 * Functions with this signature are called whenever a complete reply
199 * @param cls closure with the `struct CadetHandle`
200 * @param srm the actual message
201 * @return #GNUNET_OK on success, #GNUNET_SYSERR to stop further processing
204 check_reply(void *cls, const struct CadetReplyMessage *srm)
206 /* We check later... */
212 * Task called when it is time to reset an cadet.
214 * @param cls the `struct CadetHandle` to tear down
217 reset_cadet_task(void *cls);
221 * We had a serious error, tear down and re-create cadet from scratch,
222 * but do so asynchronously.
224 * @param mh cadet to reset
227 reset_cadet_async(struct CadetHandle *mh)
229 if (NULL != mh->reset_task)
230 GNUNET_SCHEDULER_cancel(mh->reset_task);
231 mh->reset_task = GNUNET_SCHEDULER_add_now(&reset_cadet_task, mh);
236 * Closure for handle_reply().
238 struct HandleReplyClosure {
245 * Expiration time for the block.
247 struct GNUNET_TIME_Absolute expiration;
250 * Number of bytes in @e data.
257 enum GNUNET_BLOCK_Type type;
260 * Did we have a matching query?
267 * Iterator called on each entry in a waiting map to
270 * @param cls the `struct HandleReplyClosure`
271 * @param key the key of the entry in the map (the query)
272 * @param value the `struct GSF_CadetRequest` to handle result for
273 * @return #GNUNET_YES (continue to iterate)
276 process_reply(void *cls, const struct GNUNET_HashCode *key, void *value)
278 struct HandleReplyClosure *hrc = cls;
279 struct GSF_CadetRequest *sr = value;
281 sr->proc(sr->proc_cls,
287 GSF_cadet_query_cancel(sr);
288 hrc->found = GNUNET_YES;
294 * Iterator called on each entry in a waiting map to
295 * call the 'proc' continuation and release associated
298 * @param cls the `struct CadetHandle`
299 * @param key the key of the entry in the map (the query)
300 * @param value the `struct GSF_CadetRequest` to clean up
301 * @return #GNUNET_YES (continue to iterate)
304 free_waiting_entry(void *cls, const struct GNUNET_HashCode *key, void *value)
306 struct GSF_CadetRequest *sr = value;
308 GSF_cadet_query_cancel(sr);
314 * Functions with this signature are called whenever a complete reply
317 * @param cls closure with the `struct CadetHandle`
318 * @param srm the actual message
321 handle_reply(void *cls, const struct CadetReplyMessage *srm)
323 struct CadetHandle *mh = cls;
324 struct HandleReplyClosure hrc;
326 enum GNUNET_BLOCK_Type type;
327 struct GNUNET_HashCode query;
329 msize = ntohs(srm->header.size) - sizeof(struct CadetReplyMessage);
330 type = (enum GNUNET_BLOCK_Type)ntohl(srm->type);
332 GNUNET_BLOCK_get_key(GSF_block_ctx, type, &srm[1], msize, &query))
336 GNUNET_ERROR_TYPE_WARNING,
337 "Received bogus reply of type %u with %u bytes via cadet from peer %s\n",
340 GNUNET_i2s(&mh->target));
341 reset_cadet_async(mh);
344 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
345 "Received reply `%s' via cadet from peer %s\n",
347 GNUNET_i2s(&mh->target));
348 GNUNET_CADET_receive_done(mh->channel);
349 GNUNET_STATISTICS_update(GSF_stats,
350 gettext_noop("# replies received via cadet"),
354 hrc.data_size = msize;
355 hrc.expiration = GNUNET_TIME_absolute_ntoh(srm->expiration);
357 hrc.found = GNUNET_NO;
358 GNUNET_CONTAINER_multihashmap_get_multiple(mh->waiting_map,
362 if (GNUNET_NO == hrc.found)
364 GNUNET_STATISTICS_update(GSF_stats,
366 "# replies received via cadet dropped"),
374 * Function called by cadet when a client disconnects.
375 * Cleans up our `struct CadetClient` of that channel.
377 * @param cls our `struct CadetClient`
378 * @param channel channel of the disconnecting client
381 disconnect_cb(void *cls, const struct GNUNET_CADET_Channel *channel)
383 struct CadetHandle *mh = cls;
384 struct GSF_CadetRequest *sr;
386 if (NULL == mh->channel)
387 return; /* being destroyed elsewhere */
388 GNUNET_assert(channel == mh->channel);
390 while (NULL != (sr = mh->pending_head))
391 GSF_cadet_query_cancel(sr);
392 /* first remove `mh` from the `cadet_map`, so that if the
393 callback from `free_waiting_entry()` happens to re-issue
394 the request, we don't immediately have it back in the
396 GNUNET_assert(GNUNET_OK == GNUNET_CONTAINER_multipeermap_remove(cadet_map,
399 GNUNET_CONTAINER_multihashmap_iterate(mh->waiting_map,
402 if (NULL != mh->timeout_task)
403 GNUNET_SCHEDULER_cancel(mh->timeout_task);
404 if (NULL != mh->reset_task)
405 GNUNET_SCHEDULER_cancel(mh->reset_task);
406 GNUNET_assert(0 == GNUNET_CONTAINER_multihashmap_size(mh->waiting_map));
407 GNUNET_CONTAINER_multihashmap_destroy(mh->waiting_map);
413 * Function called whenever an MQ-channel's transmission window size changes.
415 * The first callback in an outgoing channel will be with a non-zero value
416 * and will mean the channel is connected to the destination.
418 * For an incoming channel it will be called immediately after the
419 * #GNUNET_CADET_ConnectEventHandler, also with a non-zero value.
421 * @param cls Channel closure.
422 * @param channel Connection to the other end (henceforth invalid).
423 * @param window_size New window size. If the is more messages than buffer size
424 * this value will be negative..
427 window_change_cb(void *cls,
428 const struct GNUNET_CADET_Channel *channel,
431 /* FIXME: for flow control, implement? */
433 /* Something like this instead of the GNUNET_MQ_notify_sent() in
434 transmit_pending() might be good (once the window change CB works...) */
435 if (0 < window_size) /* test needed? */
436 transmit_pending(mh);
442 * We had a serious error, tear down and re-create cadet from scratch.
444 * @param mh cadet to reset
447 reset_cadet(struct CadetHandle *mh)
449 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
450 "Resetting cadet channel to %s\n",
451 GNUNET_i2s(&mh->target));
452 if (NULL != mh->channel)
454 GNUNET_CADET_channel_destroy(mh->channel);
457 GNUNET_CONTAINER_multihashmap_iterate(mh->waiting_map, &move_to_pending, mh);
459 struct GNUNET_MQ_MessageHandler handlers[] =
460 { GNUNET_MQ_hd_var_size(reply,
461 GNUNET_MESSAGE_TYPE_FS_CADET_REPLY,
462 struct CadetReplyMessage,
464 GNUNET_MQ_handler_end() };
465 struct GNUNET_HashCode port;
467 GNUNET_CRYPTO_hash(GNUNET_APPLICATION_PORT_FS_BLOCK_TRANSFER,
468 strlen(GNUNET_APPLICATION_PORT_FS_BLOCK_TRANSFER),
470 mh->channel = GNUNET_CADET_channel_create(cadet_handle,
478 transmit_pending(mh);
483 * Task called when it is time to destroy an inactive cadet channel.
485 * @param cls the `struct CadetHandle` to tear down
488 cadet_timeout(void *cls)
490 struct CadetHandle *mh = cls;
491 struct GNUNET_CADET_Channel *tun;
493 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
494 "Timeout on cadet channel to %s\n",
495 GNUNET_i2s(&mh->target));
496 mh->timeout_task = NULL;
500 GNUNET_CADET_channel_destroy(tun);
505 * Task called when it is time to reset an cadet.
507 * @param cls the `struct CadetHandle` to tear down
510 reset_cadet_task(void *cls)
512 struct CadetHandle *mh = cls;
514 mh->reset_task = NULL;
520 * Transmit pending requests via the cadet.
522 * @param cls `struct CadetHandle` to process
525 transmit_pending(void *cls)
527 struct CadetHandle *mh = cls;
528 struct GNUNET_MQ_Handle *mq = GNUNET_CADET_get_mq(mh->channel);
529 struct GSF_CadetRequest *sr;
530 struct GNUNET_MQ_Envelope *env;
531 struct CadetQueryMessage *sqm;
533 if ((0 != GNUNET_MQ_get_length(mq)) || (NULL == (sr = mh->pending_head)))
535 GNUNET_CONTAINER_DLL_remove(mh->pending_head, mh->pending_tail, sr);
536 GNUNET_assert(GNUNET_OK == GNUNET_CONTAINER_multihashmap_put(
540 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
541 sr->was_transmitted = GNUNET_YES;
542 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
543 "Sending query for %s via cadet to %s\n",
544 GNUNET_h2s(&sr->query),
545 GNUNET_i2s(&mh->target));
546 env = GNUNET_MQ_msg(sqm, GNUNET_MESSAGE_TYPE_FS_CADET_QUERY);
547 GNUNET_MQ_env_set_options(env,
548 GNUNET_MQ_PREF_GOODPUT |
549 GNUNET_MQ_PREF_CORK_ALLOWED |
550 GNUNET_MQ_PREF_OUT_OF_ORDER);
551 sqm->type = htonl(sr->type);
552 sqm->query = sr->query;
553 GNUNET_MQ_notify_sent(env, &transmit_pending, mh);
554 GNUNET_MQ_send(mq, env);
559 * Get (or create) a cadet to talk to the given peer.
561 * @param target peer we want to communicate with
563 static struct CadetHandle *
564 get_cadet(const struct GNUNET_PeerIdentity *target)
566 struct CadetHandle *mh;
568 mh = GNUNET_CONTAINER_multipeermap_get(cadet_map, target);
571 if (NULL != mh->timeout_task)
573 GNUNET_SCHEDULER_cancel(mh->timeout_task);
574 mh->timeout_task = NULL;
578 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
579 "Creating cadet channel to %s\n",
581 mh = GNUNET_new(struct CadetHandle);
583 GNUNET_SCHEDULER_add_delayed(CLIENT_RETRY_TIMEOUT, &reset_cadet_task, mh);
584 mh->waiting_map = GNUNET_CONTAINER_multihashmap_create(16, GNUNET_YES);
585 mh->target = *target;
586 GNUNET_assert(GNUNET_OK ==
587 GNUNET_CONTAINER_multipeermap_put(
591 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
593 struct GNUNET_MQ_MessageHandler handlers[] =
594 { GNUNET_MQ_hd_var_size(reply,
595 GNUNET_MESSAGE_TYPE_FS_CADET_REPLY,
596 struct CadetReplyMessage,
598 GNUNET_MQ_handler_end() };
599 struct GNUNET_HashCode port;
601 GNUNET_CRYPTO_hash(GNUNET_APPLICATION_PORT_FS_BLOCK_TRANSFER,
602 strlen(GNUNET_APPLICATION_PORT_FS_BLOCK_TRANSFER),
604 mh->channel = GNUNET_CADET_channel_create(cadet_handle,
617 * Look for a block by directly contacting a particular peer.
619 * @param target peer that should have the block
620 * @param query hash to query for the block
621 * @param type desired type for the block
622 * @param proc function to call with result
623 * @param proc_cls closure for @a proc
624 * @return handle to cancel the operation
626 struct GSF_CadetRequest *
627 GSF_cadet_query(const struct GNUNET_PeerIdentity *target,
628 const struct GNUNET_HashCode *query,
629 enum GNUNET_BLOCK_Type type,
630 GSF_CadetReplyProcessor proc,
633 struct CadetHandle *mh;
634 struct GSF_CadetRequest *sr;
636 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
637 "Preparing to send query for %s via cadet to %s\n",
640 mh = get_cadet(target);
641 sr = GNUNET_new(struct GSF_CadetRequest);
644 sr->proc_cls = proc_cls;
647 GNUNET_CONTAINER_DLL_insert(mh->pending_head, mh->pending_tail, sr);
648 transmit_pending(mh);
654 * Cancel an active request; must not be called after 'proc'
657 * @param sr request to cancel
660 GSF_cadet_query_cancel(struct GSF_CadetRequest *sr)
662 struct CadetHandle *mh = sr->mh;
663 GSF_CadetReplyProcessor p;
669 /* signal failure / cancellation to callback */
670 p(sr->proc_cls, GNUNET_BLOCK_TYPE_ANY, GNUNET_TIME_UNIT_ZERO_ABS, 0, NULL);
672 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
673 "Cancelled query for %s via cadet to %s\n",
674 GNUNET_h2s(&sr->query),
675 GNUNET_i2s(&sr->mh->target));
676 if (GNUNET_YES == sr->was_transmitted)
679 GNUNET_CONTAINER_multihashmap_remove(mh->waiting_map, &sr->query, sr));
681 GNUNET_CONTAINER_DLL_remove(mh->pending_head, mh->pending_tail, sr);
683 if ((0 == GNUNET_CONTAINER_multihashmap_size(mh->waiting_map)) &&
684 (NULL == mh->pending_head))
685 mh->timeout_task = GNUNET_SCHEDULER_add_delayed(GNUNET_TIME_UNIT_SECONDS,
692 * Function called on each active cadets to shut them down.
695 * @param key target peer, unused
696 * @param value the `struct CadetHandle` to destroy
697 * @return #GNUNET_YES (continue to iterate)
700 GSF_cadet_release_clients(void *cls,
701 const struct GNUNET_PeerIdentity *key,
704 struct CadetHandle *mh = value;
706 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
707 "Timeout on cadet channel to %s\n",
708 GNUNET_i2s(&mh->target));
709 if (NULL != mh->channel)
711 struct GNUNET_CADET_Channel *channel = mh->channel;
714 GNUNET_CADET_channel_destroy(channel);
716 if (NULL != mh->reset_task)
718 GNUNET_SCHEDULER_cancel(mh->reset_task);
719 mh->reset_task = NULL;
725 /* end of gnunet-service-fs_cadet_client.c */