2 This file is part of GNUnet.
3 (C) 2012, 2013 Christian Grothoff (and other contributing authors)
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your
8 option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA.
22 * @file fs/gnunet-service-fs_cadet_client.c
23 * @brief non-anonymous file-transfer
24 * @author Christian Grothoff
27 * - PORT is set to old application type, unsure if we should keep
28 * it that way (fine for now)
31 #include "gnunet_constants.h"
32 #include "gnunet_util_lib.h"
33 #include "gnunet_cadet_service.h"
34 #include "gnunet_protocols.h"
35 #include "gnunet_applications.h"
36 #include "gnunet-service-fs.h"
37 #include "gnunet-service-fs_indexing.h"
38 #include "gnunet-service-fs_cadet.h"
42 * After how long do we reset connections without replies?
44 #define CLIENT_RETRY_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 30)
48 * Handle for a cadet to another peer.
54 * Handle for a request that is going out via cadet API.
56 struct GSF_CadetRequest
62 struct GSF_CadetRequest *next;
67 struct GSF_CadetRequest *prev;
70 * Which cadet is this request associated with?
72 struct CadetHandle *mh;
75 * Function to call with the result.
77 GSF_CadetReplyProcessor proc;
85 * Query to transmit to the other peer.
87 struct GNUNET_HashCode query;
90 * Desired type for the reply.
92 enum GNUNET_BLOCK_Type type;
95 * Did we transmit this request already? #GNUNET_YES if we are
96 * in the 'waiting_map', #GNUNET_NO if we are in the 'pending' DLL.
103 * Handle for a cadet to another peer.
108 * Head of DLL of pending requests on this cadet.
110 struct GSF_CadetRequest *pending_head;
113 * Tail of DLL of pending requests on this cadet.
115 struct GSF_CadetRequest *pending_tail;
118 * Map from query to `struct GSF_CadetRequest`s waiting for
121 struct GNUNET_CONTAINER_MultiHashMap *waiting_map;
124 * Channel to the other peer.
126 struct GNUNET_CADET_Channel *channel;
129 * Handle for active write operation, or NULL.
131 struct GNUNET_CADET_TransmitHandle *wh;
134 * Which peer does this cadet go to?
136 struct GNUNET_PeerIdentity target;
139 * Task to kill inactive cadets (we keep them around for
140 * a few seconds to give the application a chance to give
143 GNUNET_SCHEDULER_TaskIdentifier timeout_task;
146 * Task to reset cadets that had errors (asynchronously,
147 * as we may not be able to do it immediately during a
148 * callback from the cadet API).
150 GNUNET_SCHEDULER_TaskIdentifier reset_task;
156 * Cadet channel for creating outbound channels.
158 static struct GNUNET_CADET_Handle *cadet_handle;
161 * Map from peer identities to 'struct CadetHandles' with cadet
162 * channels to those peers.
164 static struct GNUNET_CONTAINER_MultiPeerMap *cadet_map;
167 /* ********************* client-side code ************************* */
171 * Transmit pending requests via the cadet.
173 * @param mh cadet to process
176 transmit_pending (struct CadetHandle *mh);
180 * Iterator called on each entry in a waiting map to
181 * move it back to the pending list.
183 * @param cls the `struct CadetHandle`
184 * @param key the key of the entry in the map (the query)
185 * @param value the `struct GSF_CadetRequest` to move to pending
186 * @return #GNUNET_YES (continue to iterate)
189 move_to_pending (void *cls,
190 const struct GNUNET_HashCode *key,
193 struct CadetHandle *mh = cls;
194 struct GSF_CadetRequest *sr = value;
196 GNUNET_assert (GNUNET_YES ==
197 GNUNET_CONTAINER_multihashmap_remove (mh->waiting_map,
200 GNUNET_CONTAINER_DLL_insert (mh->pending_head,
203 sr->was_transmitted = GNUNET_NO;
209 * We had a serious error, tear down and re-create cadet from scratch.
211 * @param mh cadet to reset
214 reset_cadet (struct CadetHandle *mh)
216 struct GNUNET_CADET_Channel *channel = mh->channel;
218 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
219 "Resetting cadet channel to %s\n",
220 GNUNET_i2s (&mh->target));
228 GNUNET_CADET_cancel_notify(mh->wh);
231 GNUNET_CADET_channel_destroy (channel);
233 GNUNET_CONTAINER_multihashmap_iterate (mh->waiting_map,
236 mh->channel = GNUNET_CADET_channel_create (cadet_handle,
239 GNUNET_APPLICATION_TYPE_FS_BLOCK_TRANSFER,
240 GNUNET_CADET_OPTION_RELIABLE);
241 transmit_pending (mh);
246 * Task called when it is time to destroy an inactive cadet channel.
248 * @param cls the `struct CadetHandle` to tear down
249 * @param tc scheduler context, unused
252 cadet_timeout (void *cls,
253 const struct GNUNET_SCHEDULER_TaskContext *tc)
255 struct CadetHandle *mh = cls;
256 struct GNUNET_CADET_Channel *tun;
258 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
259 "Timeout on cadet channel to %s\n",
260 GNUNET_i2s (&mh->target));
261 mh->timeout_task = GNUNET_SCHEDULER_NO_TASK;
264 GNUNET_CADET_channel_destroy (tun);
269 * Task called when it is time to reset an cadet.
271 * @param cls the `struct CadetHandle` to tear down
272 * @param tc scheduler context, unused
275 reset_cadet_task (void *cls,
276 const struct GNUNET_SCHEDULER_TaskContext *tc)
278 struct CadetHandle *mh = cls;
280 mh->reset_task = GNUNET_SCHEDULER_NO_TASK;
286 * We had a serious error, tear down and re-create cadet from scratch,
287 * but do so asynchronously.
289 * @param mh cadet to reset
292 reset_cadet_async (struct CadetHandle *mh)
294 if (GNUNET_SCHEDULER_NO_TASK != mh->reset_task)
295 GNUNET_SCHEDULER_cancel (mh->reset_task);
296 mh->reset_task = GNUNET_SCHEDULER_add_now (&reset_cadet_task,
302 * Functions of this signature are called whenever we are ready to transmit
305 * @param cls the struct CadetHandle for which we did the write call
306 * @param size the number of bytes that can be written to @a buf
307 * @param buf where to write the message
308 * @return number of bytes written to @a buf
311 transmit_sqm (void *cls,
315 struct CadetHandle *mh = cls;
316 struct CadetQueryMessage sqm;
317 struct GSF_CadetRequest *sr;
322 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
323 "Cadet channel to %s failed during transmission attempt, rebuilding\n",
324 GNUNET_i2s (&mh->target));
325 reset_cadet_async (mh);
328 sr = mh->pending_head;
331 GNUNET_assert (size >= sizeof (struct CadetQueryMessage));
332 GNUNET_CONTAINER_DLL_remove (mh->pending_head,
335 GNUNET_assert (GNUNET_OK ==
336 GNUNET_CONTAINER_multihashmap_put (mh->waiting_map,
339 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
340 sr->was_transmitted = GNUNET_YES;
341 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
342 "Sending query for %s via cadet to %s\n",
343 GNUNET_h2s (&sr->query),
344 GNUNET_i2s (&mh->target));
345 sqm.header.size = htons (sizeof (sqm));
346 sqm.header.type = htons (GNUNET_MESSAGE_TYPE_FS_CADET_QUERY);
347 sqm.type = htonl (sr->type);
348 sqm.query = sr->query;
349 memcpy (buf, &sqm, sizeof (sqm));
350 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
351 "Successfully transmitted %u bytes via cadet to %s\n",
353 GNUNET_i2s (&mh->target));
354 transmit_pending (mh);
360 * Transmit pending requests via the cadet.
362 * @param mh cadet to process
365 transmit_pending (struct CadetHandle *mh)
367 if (NULL == mh->channel)
371 mh->wh = GNUNET_CADET_notify_transmit_ready (mh->channel, GNUNET_YES /* allow cork */,
372 GNUNET_TIME_UNIT_FOREVER_REL,
373 sizeof (struct CadetQueryMessage),
379 * Closure for handle_reply().
381 struct HandleReplyClosure
390 * Expiration time for the block.
392 struct GNUNET_TIME_Absolute expiration;
395 * Number of bytes in 'data'.
402 enum GNUNET_BLOCK_Type type;
405 * Did we have a matching query?
412 * Iterator called on each entry in a waiting map to
415 * @param cls the `struct HandleReplyClosure`
416 * @param key the key of the entry in the map (the query)
417 * @param value the `struct GSF_CadetRequest` to handle result for
418 * @return #GNUNET_YES (continue to iterate)
421 handle_reply (void *cls,
422 const struct GNUNET_HashCode *key,
425 struct HandleReplyClosure *hrc = cls;
426 struct GSF_CadetRequest *sr = value;
428 sr->proc (sr->proc_cls,
434 GSF_cadet_query_cancel (sr);
435 hrc->found = GNUNET_YES;
441 * Functions with this signature are called whenever a complete reply
444 * @param cls closure with the `struct CadetHandle`
445 * @param channel channel handle
446 * @param channel_ctx channel context
447 * @param message the actual message
448 * @return #GNUNET_OK on success, #GNUNET_SYSERR to stop further processing
452 struct GNUNET_CADET_Channel *channel,
454 const struct GNUNET_MessageHeader *message)
456 struct CadetHandle *mh = *channel_ctx;
457 const struct CadetReplyMessage *srm;
458 struct HandleReplyClosure hrc;
460 enum GNUNET_BLOCK_Type type;
461 struct GNUNET_HashCode query;
463 msize = ntohs (message->size);
464 if (sizeof (struct CadetReplyMessage) > msize)
467 reset_cadet_async (mh);
468 return GNUNET_SYSERR;
470 srm = (const struct CadetReplyMessage *) message;
471 msize -= sizeof (struct CadetReplyMessage);
472 type = (enum GNUNET_BLOCK_Type) ntohl (srm->type);
474 GNUNET_BLOCK_get_key (GSF_block_ctx,
476 &srm[1], msize, &query))
479 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
480 "Received bogus reply of type %u with %u bytes via cadet from peer %s\n",
483 GNUNET_i2s (&mh->target));
484 reset_cadet_async (mh);
485 return GNUNET_SYSERR;
487 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
488 "Received reply `%s' via cadet from peer %s\n",
490 GNUNET_i2s (&mh->target));
491 GNUNET_CADET_receive_done (channel);
492 GNUNET_STATISTICS_update (GSF_stats,
493 gettext_noop ("# replies received via cadet"), 1,
496 hrc.data_size = msize;
497 hrc.expiration = GNUNET_TIME_absolute_ntoh (srm->expiration);
499 hrc.found = GNUNET_NO;
500 GNUNET_CONTAINER_multihashmap_get_multiple (mh->waiting_map,
504 if (GNUNET_NO == hrc.found)
506 GNUNET_STATISTICS_update (GSF_stats,
507 gettext_noop ("# replies received via cadet dropped"), 1,
516 * Get (or create) a cadet to talk to the given peer.
518 * @param target peer we want to communicate with
520 static struct CadetHandle *
521 get_cadet (const struct GNUNET_PeerIdentity *target)
523 struct CadetHandle *mh;
525 mh = GNUNET_CONTAINER_multipeermap_get (cadet_map,
529 if (GNUNET_SCHEDULER_NO_TASK != mh->timeout_task)
531 GNUNET_SCHEDULER_cancel (mh->timeout_task);
532 mh->timeout_task = GNUNET_SCHEDULER_NO_TASK;
536 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
537 "Creating cadet channel to %s\n",
538 GNUNET_i2s (target));
539 mh = GNUNET_new (struct CadetHandle);
540 mh->reset_task = GNUNET_SCHEDULER_add_delayed (CLIENT_RETRY_TIMEOUT,
543 mh->waiting_map = GNUNET_CONTAINER_multihashmap_create (16, GNUNET_YES);
544 mh->target = *target;
545 GNUNET_assert (GNUNET_OK ==
546 GNUNET_CONTAINER_multipeermap_put (cadet_map,
549 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
550 mh->channel = GNUNET_CADET_channel_create (cadet_handle,
553 GNUNET_APPLICATION_TYPE_FS_BLOCK_TRANSFER,
554 GNUNET_CADET_OPTION_RELIABLE);
556 GNUNET_CONTAINER_multipeermap_get (cadet_map,
563 * Look for a block by directly contacting a particular peer.
565 * @param target peer that should have the block
566 * @param query hash to query for the block
567 * @param type desired type for the block
568 * @param proc function to call with result
569 * @param proc_cls closure for @a proc
570 * @return handle to cancel the operation
572 struct GSF_CadetRequest *
573 GSF_cadet_query (const struct GNUNET_PeerIdentity *target,
574 const struct GNUNET_HashCode *query,
575 enum GNUNET_BLOCK_Type type,
576 GSF_CadetReplyProcessor proc, void *proc_cls)
578 struct CadetHandle *mh;
579 struct GSF_CadetRequest *sr;
581 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
582 "Preparing to send query for %s via cadet to %s\n",
584 GNUNET_i2s (target));
585 mh = get_cadet (target);
586 sr = GNUNET_new (struct GSF_CadetRequest);
589 sr->proc_cls = proc_cls;
592 GNUNET_CONTAINER_DLL_insert (mh->pending_head,
595 transmit_pending (mh);
601 * Cancel an active request; must not be called after 'proc'
604 * @param sr request to cancel
607 GSF_cadet_query_cancel (struct GSF_CadetRequest *sr)
609 struct CadetHandle *mh = sr->mh;
610 GSF_CadetReplyProcessor p;
616 /* signal failure / cancellation to callback */
617 p (sr->proc_cls, GNUNET_BLOCK_TYPE_ANY,
618 GNUNET_TIME_UNIT_ZERO_ABS,
621 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
622 "Cancelled query for %s via cadet to %s\n",
623 GNUNET_h2s (&sr->query),
624 GNUNET_i2s (&sr->mh->target));
625 if (GNUNET_YES == sr->was_transmitted)
626 GNUNET_assert (GNUNET_OK ==
627 GNUNET_CONTAINER_multihashmap_remove (mh->waiting_map,
631 GNUNET_CONTAINER_DLL_remove (mh->pending_head,
635 if ( (0 == GNUNET_CONTAINER_multihashmap_size (mh->waiting_map)) &&
636 (NULL == mh->pending_head) )
637 mh->timeout_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS,
644 * Iterator called on each entry in a waiting map to
645 * call the 'proc' continuation and release associated
648 * @param cls the `struct CadetHandle`
649 * @param key the key of the entry in the map (the query)
650 * @param value the `struct GSF_CadetRequest` to clean up
651 * @return #GNUNET_YES (continue to iterate)
654 free_waiting_entry (void *cls,
655 const struct GNUNET_HashCode *key,
658 struct GSF_CadetRequest *sr = value;
660 GSF_cadet_query_cancel (sr);
666 * Function called by cadet when a client disconnects.
667 * Cleans up our `struct CadetClient` of that channel.
670 * @param channel channel of the disconnecting client
671 * @param channel_ctx our `struct CadetClient`
674 cleaner_cb (void *cls,
675 const struct GNUNET_CADET_Channel *channel,
678 struct CadetHandle *mh = channel_ctx;
679 struct GSF_CadetRequest *sr;
681 if (NULL == mh->channel)
682 return; /* being destroyed elsewhere */
683 GNUNET_assert (channel == mh->channel);
685 while (NULL != (sr = mh->pending_head))
686 GSF_cadet_query_cancel (sr);
687 /* first remove `mh` from the `cadet_map`, so that if the
688 callback from `free_waiting_entry()` happens to re-issue
689 the request, we don't immediately have it back in the
691 GNUNET_assert (GNUNET_OK ==
692 GNUNET_CONTAINER_multipeermap_remove (cadet_map,
695 GNUNET_CONTAINER_multihashmap_iterate (mh->waiting_map,
699 GNUNET_CADET_notify_transmit_ready_cancel (mh->wh);
700 if (GNUNET_SCHEDULER_NO_TASK != mh->timeout_task)
701 GNUNET_SCHEDULER_cancel (mh->timeout_task);
702 if (GNUNET_SCHEDULER_NO_TASK != mh->reset_task)
703 GNUNET_SCHEDULER_cancel (mh->reset_task);
705 GNUNET_CONTAINER_multihashmap_size (mh->waiting_map));
706 GNUNET_CONTAINER_multihashmap_destroy (mh->waiting_map);
712 * Initialize subsystem for non-anonymous file-sharing.
715 GSF_cadet_start_client ()
717 static const struct GNUNET_CADET_MessageHandler handlers[] = {
718 { &reply_cb, GNUNET_MESSAGE_TYPE_FS_CADET_REPLY, 0 },
722 cadet_map = GNUNET_CONTAINER_multipeermap_create (16, GNUNET_YES);
723 cadet_handle = GNUNET_CADET_connect (GSF_cfg,
733 * Function called on each active cadets to shut them down.
736 * @param key target peer, unused
737 * @param value the `struct CadetHandle` to destroy
738 * @return #GNUNET_YES (continue to iterate)
741 release_cadets (void *cls,
742 const struct GNUNET_PeerIdentity *key,
745 struct CadetHandle *mh = value;
747 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
748 "Timeout on cadet channel to %s\n",
749 GNUNET_i2s (&mh->target));
750 if (NULL != mh->channel)
751 GNUNET_CADET_channel_destroy (mh->channel);
757 * Shutdown subsystem for non-anonymous file-sharing.
760 GSF_cadet_stop_client ()
762 GNUNET_CONTAINER_multipeermap_iterate (cadet_map,
765 GNUNET_CONTAINER_multipeermap_destroy (cadet_map);
767 if (NULL != cadet_handle)
769 GNUNET_CADET_disconnect (cadet_handle);
775 /* end of gnunet-service-fs_cadet_client.c */