2 This file is part of GNUnet.
3 Copyright (C) 2012, 2013 GNUnet e.V.
5 GNUnet is free software: you can redistribute it and/or modify it
6 under the terms of the GNU Affero General Public License as published
7 by the Free Software Foundation, either version 3 of the License,
8 or (at your option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details.
15 You should have received a copy of the GNU Affero General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
20 * @file fs/gnunet-service-fs_cadet_client.c
21 * @brief non-anonymous file-transfer
22 * @author Christian Grothoff
25 * - PORT is set to old application type, unsure if we should keep
26 * it that way (fine for now)
29 #include "gnunet_constants.h"
30 #include "gnunet_util_lib.h"
31 #include "gnunet_cadet_service.h"
32 #include "gnunet_protocols.h"
33 #include "gnunet_applications.h"
34 #include "gnunet-service-fs.h"
35 #include "gnunet-service-fs_indexing.h"
36 #include "gnunet-service-fs_cadet.h"
40 * After how long do we reset connections without replies?
42 #define CLIENT_RETRY_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 30)
46 * Handle for a cadet to another peer.
52 * Handle for a request that is going out via cadet API.
54 struct GSF_CadetRequest
60 struct GSF_CadetRequest *next;
65 struct GSF_CadetRequest *prev;
68 * Which cadet is this request associated with?
70 struct CadetHandle *mh;
73 * Function to call with the result.
75 GSF_CadetReplyProcessor proc;
83 * Query to transmit to the other peer.
85 struct GNUNET_HashCode query;
88 * Desired type for the reply.
90 enum GNUNET_BLOCK_Type type;
93 * Did we transmit this request already? #GNUNET_YES if we are
94 * in the 'waiting_map', #GNUNET_NO if we are in the 'pending' DLL.
101 * Handle for a cadet to another peer.
106 * Head of DLL of pending requests on this cadet.
108 struct GSF_CadetRequest *pending_head;
111 * Tail of DLL of pending requests on this cadet.
113 struct GSF_CadetRequest *pending_tail;
116 * Map from query to `struct GSF_CadetRequest`s waiting for
119 struct GNUNET_CONTAINER_MultiHashMap *waiting_map;
122 * Channel to the other peer.
124 struct GNUNET_CADET_Channel *channel;
127 * Which peer does this cadet go to?
129 struct GNUNET_PeerIdentity target;
132 * Task to kill inactive cadets (we keep them around for
133 * a few seconds to give the application a chance to give
136 struct GNUNET_SCHEDULER_Task *timeout_task;
139 * Task to reset cadets that had errors (asynchronously,
140 * as we may not be able to do it immediately during a
141 * callback from the cadet API).
143 struct GNUNET_SCHEDULER_Task *reset_task;
149 * Cadet channel for creating outbound channels.
151 struct GNUNET_CADET_Handle *cadet_handle;
154 * Map from peer identities to 'struct CadetHandles' with cadet
155 * channels to those peers.
157 struct GNUNET_CONTAINER_MultiPeerMap *cadet_map;
160 /* ********************* client-side code ************************* */
164 * Transmit pending requests via the cadet.
166 * @param cls `struct CadetHandle` to process
169 transmit_pending (void *cls);
173 * Iterator called on each entry in a waiting map to
174 * move it back to the pending list.
176 * @param cls the `struct CadetHandle`
177 * @param key the key of the entry in the map (the query)
178 * @param value the `struct GSF_CadetRequest` to move to pending
179 * @return #GNUNET_YES (continue to iterate)
182 move_to_pending (void *cls,
183 const struct GNUNET_HashCode *key,
186 struct CadetHandle *mh = cls;
187 struct GSF_CadetRequest *sr = value;
189 GNUNET_assert (GNUNET_YES ==
190 GNUNET_CONTAINER_multihashmap_remove (mh->waiting_map,
193 GNUNET_CONTAINER_DLL_insert (mh->pending_head,
196 sr->was_transmitted = GNUNET_NO;
202 * Functions with this signature are called whenever a complete reply
205 * @param cls closure with the `struct CadetHandle`
206 * @param srm the actual message
207 * @return #GNUNET_OK on success, #GNUNET_SYSERR to stop further processing
210 check_reply (void *cls,
211 const struct CadetReplyMessage *srm)
213 /* We check later... */
219 * Task called when it is time to reset an cadet.
221 * @param cls the `struct CadetHandle` to tear down
224 reset_cadet_task (void *cls);
228 * We had a serious error, tear down and re-create cadet from scratch,
229 * but do so asynchronously.
231 * @param mh cadet to reset
234 reset_cadet_async (struct CadetHandle *mh)
236 if (NULL != mh->reset_task)
237 GNUNET_SCHEDULER_cancel (mh->reset_task);
238 mh->reset_task = GNUNET_SCHEDULER_add_now (&reset_cadet_task,
244 * Closure for handle_reply().
246 struct HandleReplyClosure
255 * Expiration time for the block.
257 struct GNUNET_TIME_Absolute expiration;
260 * Number of bytes in @e data.
267 enum GNUNET_BLOCK_Type type;
270 * Did we have a matching query?
277 * Iterator called on each entry in a waiting map to
280 * @param cls the `struct HandleReplyClosure`
281 * @param key the key of the entry in the map (the query)
282 * @param value the `struct GSF_CadetRequest` to handle result for
283 * @return #GNUNET_YES (continue to iterate)
286 process_reply (void *cls,
287 const struct GNUNET_HashCode *key,
290 struct HandleReplyClosure *hrc = cls;
291 struct GSF_CadetRequest *sr = value;
293 sr->proc (sr->proc_cls,
299 GSF_cadet_query_cancel (sr);
300 hrc->found = GNUNET_YES;
306 * Iterator called on each entry in a waiting map to
307 * call the 'proc' continuation and release associated
310 * @param cls the `struct CadetHandle`
311 * @param key the key of the entry in the map (the query)
312 * @param value the `struct GSF_CadetRequest` to clean up
313 * @return #GNUNET_YES (continue to iterate)
316 free_waiting_entry (void *cls,
317 const struct GNUNET_HashCode *key,
320 struct GSF_CadetRequest *sr = value;
322 GSF_cadet_query_cancel (sr);
328 * Functions with this signature are called whenever a complete reply
331 * @param cls closure with the `struct CadetHandle`
332 * @param srm the actual message
335 handle_reply (void *cls,
336 const struct CadetReplyMessage *srm)
338 struct CadetHandle *mh = cls;
339 struct HandleReplyClosure hrc;
341 enum GNUNET_BLOCK_Type type;
342 struct GNUNET_HashCode query;
344 msize = ntohs (srm->header.size) - sizeof (struct CadetReplyMessage);
345 type = (enum GNUNET_BLOCK_Type) ntohl (srm->type);
347 GNUNET_BLOCK_get_key (GSF_block_ctx,
354 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
355 "Received bogus reply of type %u with %u bytes via cadet from peer %s\n",
358 GNUNET_i2s (&mh->target));
359 reset_cadet_async (mh);
362 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
363 "Received reply `%s' via cadet from peer %s\n",
365 GNUNET_i2s (&mh->target));
366 GNUNET_CADET_receive_done (mh->channel);
367 GNUNET_STATISTICS_update (GSF_stats,
368 gettext_noop ("# replies received via cadet"), 1,
371 hrc.data_size = msize;
372 hrc.expiration = GNUNET_TIME_absolute_ntoh (srm->expiration);
374 hrc.found = GNUNET_NO;
375 GNUNET_CONTAINER_multihashmap_get_multiple (mh->waiting_map,
379 if (GNUNET_NO == hrc.found)
381 GNUNET_STATISTICS_update (GSF_stats,
382 gettext_noop ("# replies received via cadet dropped"), 1,
389 * Function called by cadet when a client disconnects.
390 * Cleans up our `struct CadetClient` of that channel.
392 * @param cls our `struct CadetClient`
393 * @param channel channel of the disconnecting client
396 disconnect_cb (void *cls,
397 const struct GNUNET_CADET_Channel *channel)
399 struct CadetHandle *mh = cls;
400 struct GSF_CadetRequest *sr;
402 if (NULL == mh->channel)
403 return; /* being destroyed elsewhere */
404 GNUNET_assert (channel == mh->channel);
406 while (NULL != (sr = mh->pending_head))
407 GSF_cadet_query_cancel (sr);
408 /* first remove `mh` from the `cadet_map`, so that if the
409 callback from `free_waiting_entry()` happens to re-issue
410 the request, we don't immediately have it back in the
412 GNUNET_assert (GNUNET_OK ==
413 GNUNET_CONTAINER_multipeermap_remove (cadet_map,
416 GNUNET_CONTAINER_multihashmap_iterate (mh->waiting_map,
419 if (NULL != mh->timeout_task)
420 GNUNET_SCHEDULER_cancel (mh->timeout_task);
421 if (NULL != mh->reset_task)
422 GNUNET_SCHEDULER_cancel (mh->reset_task);
424 GNUNET_CONTAINER_multihashmap_size (mh->waiting_map));
425 GNUNET_CONTAINER_multihashmap_destroy (mh->waiting_map);
431 * Function called whenever an MQ-channel's transmission window size changes.
433 * The first callback in an outgoing channel will be with a non-zero value
434 * and will mean the channel is connected to the destination.
436 * For an incoming channel it will be called immediately after the
437 * #GNUNET_CADET_ConnectEventHandler, also with a non-zero value.
439 * @param cls Channel closure.
440 * @param channel Connection to the other end (henceforth invalid).
441 * @param window_size New window size. If the is more messages than buffer size
442 * this value will be negative..
445 window_change_cb (void *cls,
446 const struct GNUNET_CADET_Channel *channel,
449 /* FIXME: for flow control, implement? */
451 /* Something like this instead of the GNUNET_MQ_notify_sent() in
452 transmit_pending() might be good (once the window change CB works...) */
453 if (0 < window_size) /* test needed? */
454 transmit_pending (mh);
460 * We had a serious error, tear down and re-create cadet from scratch.
462 * @param mh cadet to reset
465 reset_cadet (struct CadetHandle *mh)
467 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
468 "Resetting cadet channel to %s\n",
469 GNUNET_i2s (&mh->target));
470 GNUNET_CADET_channel_destroy (mh->channel);
472 GNUNET_CONTAINER_multihashmap_iterate (mh->waiting_map,
476 struct GNUNET_MQ_MessageHandler handlers[] = {
477 GNUNET_MQ_hd_var_size (reply,
478 GNUNET_MESSAGE_TYPE_FS_CADET_REPLY,
479 struct CadetReplyMessage,
481 GNUNET_MQ_handler_end ()
483 struct GNUNET_HashCode port;
485 GNUNET_CRYPTO_hash (GNUNET_APPLICATION_PORT_FS_BLOCK_TRANSFER,
486 strlen (GNUNET_APPLICATION_PORT_FS_BLOCK_TRANSFER),
488 mh->channel = GNUNET_CADET_channel_create (cadet_handle,
492 GNUNET_CADET_OPTION_RELIABLE,
497 transmit_pending (mh);
502 * Task called when it is time to destroy an inactive cadet channel.
504 * @param cls the `struct CadetHandle` to tear down
507 cadet_timeout (void *cls)
509 struct CadetHandle *mh = cls;
510 struct GNUNET_CADET_Channel *tun;
512 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
513 "Timeout on cadet channel to %s\n",
514 GNUNET_i2s (&mh->target));
515 mh->timeout_task = NULL;
519 GNUNET_CADET_channel_destroy (tun);
524 * Task called when it is time to reset an cadet.
526 * @param cls the `struct CadetHandle` to tear down
529 reset_cadet_task (void *cls)
531 struct CadetHandle *mh = cls;
533 mh->reset_task = NULL;
539 * Transmit pending requests via the cadet.
541 * @param cls `struct CadetHandle` to process
544 transmit_pending (void *cls)
546 struct CadetHandle *mh = cls;
547 struct GNUNET_MQ_Handle *mq = GNUNET_CADET_get_mq (mh->channel);
548 struct GSF_CadetRequest *sr;
549 struct GNUNET_MQ_Envelope *env;
550 struct CadetQueryMessage *sqm;
552 if ( (0 != GNUNET_MQ_get_length (mq)) ||
553 (NULL == (sr = mh->pending_head)) )
555 GNUNET_CONTAINER_DLL_remove (mh->pending_head,
558 GNUNET_assert (GNUNET_OK ==
559 GNUNET_CONTAINER_multihashmap_put (mh->waiting_map,
562 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
563 sr->was_transmitted = GNUNET_YES;
564 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
565 "Sending query for %s via cadet to %s\n",
566 GNUNET_h2s (&sr->query),
567 GNUNET_i2s (&mh->target));
568 env = GNUNET_MQ_msg (sqm,
569 GNUNET_MESSAGE_TYPE_FS_CADET_QUERY);
570 sqm->type = htonl (sr->type);
571 sqm->query = sr->query;
572 GNUNET_MQ_notify_sent (env,
581 * Get (or create) a cadet to talk to the given peer.
583 * @param target peer we want to communicate with
585 static struct CadetHandle *
586 get_cadet (const struct GNUNET_PeerIdentity *target)
588 struct CadetHandle *mh;
590 mh = GNUNET_CONTAINER_multipeermap_get (cadet_map,
594 if (NULL != mh->timeout_task)
596 GNUNET_SCHEDULER_cancel (mh->timeout_task);
597 mh->timeout_task = NULL;
601 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
602 "Creating cadet channel to %s\n",
603 GNUNET_i2s (target));
604 mh = GNUNET_new (struct CadetHandle);
605 mh->reset_task = GNUNET_SCHEDULER_add_delayed (CLIENT_RETRY_TIMEOUT,
608 mh->waiting_map = GNUNET_CONTAINER_multihashmap_create (16, GNUNET_YES);
609 mh->target = *target;
610 GNUNET_assert (GNUNET_OK ==
611 GNUNET_CONTAINER_multipeermap_put (cadet_map,
614 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
616 struct GNUNET_MQ_MessageHandler handlers[] = {
617 GNUNET_MQ_hd_var_size (reply,
618 GNUNET_MESSAGE_TYPE_FS_CADET_REPLY,
619 struct CadetReplyMessage,
621 GNUNET_MQ_handler_end ()
623 struct GNUNET_HashCode port;
625 GNUNET_CRYPTO_hash (GNUNET_APPLICATION_PORT_FS_BLOCK_TRANSFER,
626 strlen (GNUNET_APPLICATION_PORT_FS_BLOCK_TRANSFER),
628 mh->channel = GNUNET_CADET_channel_create (cadet_handle,
632 GNUNET_CADET_OPTION_RELIABLE,
642 * Look for a block by directly contacting a particular peer.
644 * @param target peer that should have the block
645 * @param query hash to query for the block
646 * @param type desired type for the block
647 * @param proc function to call with result
648 * @param proc_cls closure for @a proc
649 * @return handle to cancel the operation
651 struct GSF_CadetRequest *
652 GSF_cadet_query (const struct GNUNET_PeerIdentity *target,
653 const struct GNUNET_HashCode *query,
654 enum GNUNET_BLOCK_Type type,
655 GSF_CadetReplyProcessor proc,
658 struct CadetHandle *mh;
659 struct GSF_CadetRequest *sr;
661 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
662 "Preparing to send query for %s via cadet to %s\n",
664 GNUNET_i2s (target));
665 mh = get_cadet (target);
666 sr = GNUNET_new (struct GSF_CadetRequest);
669 sr->proc_cls = proc_cls;
672 GNUNET_CONTAINER_DLL_insert (mh->pending_head,
675 transmit_pending (mh);
681 * Cancel an active request; must not be called after 'proc'
684 * @param sr request to cancel
687 GSF_cadet_query_cancel (struct GSF_CadetRequest *sr)
689 struct CadetHandle *mh = sr->mh;
690 GSF_CadetReplyProcessor p;
696 /* signal failure / cancellation to callback */
697 p (sr->proc_cls, GNUNET_BLOCK_TYPE_ANY,
698 GNUNET_TIME_UNIT_ZERO_ABS,
701 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
702 "Cancelled query for %s via cadet to %s\n",
703 GNUNET_h2s (&sr->query),
704 GNUNET_i2s (&sr->mh->target));
705 if (GNUNET_YES == sr->was_transmitted)
706 GNUNET_assert (GNUNET_OK ==
707 GNUNET_CONTAINER_multihashmap_remove (mh->waiting_map,
711 GNUNET_CONTAINER_DLL_remove (mh->pending_head,
715 if ( (0 == GNUNET_CONTAINER_multihashmap_size (mh->waiting_map)) &&
716 (NULL == mh->pending_head) )
717 mh->timeout_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS,
724 * Function called on each active cadets to shut them down.
727 * @param key target peer, unused
728 * @param value the `struct CadetHandle` to destroy
729 * @return #GNUNET_YES (continue to iterate)
732 GSF_cadet_release_clients (void *cls,
733 const struct GNUNET_PeerIdentity *key,
736 struct CadetHandle *mh = value;
738 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
739 "Timeout on cadet channel to %s\n",
740 GNUNET_i2s (&mh->target));
741 if (NULL != mh->channel)
742 GNUNET_CADET_channel_destroy (mh->channel);
748 /* end of gnunet-service-fs_cadet_client.c */