2 This file is part of GNUnet.
3 (C) 2012, 2013 Christian Grothoff (and other contributing authors)
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your
8 option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA.
22 * @file fs/gnunet-service-fs_mesh_client.c
23 * @brief non-anonymous file-transfer
24 * @author Christian Grothoff
27 * - PORT is set to old application type, unsure if we should keep
28 * it that way (fine for now)
31 #include "gnunet_constants.h"
32 #include "gnunet_util_lib.h"
33 #include "gnunet_mesh_service.h"
34 #include "gnunet_protocols.h"
35 #include "gnunet_applications.h"
36 #include "gnunet-service-fs.h"
37 #include "gnunet-service-fs_indexing.h"
38 #include "gnunet-service-fs_mesh.h"
42 * After how long do we reset connections without replies?
/* Relative time of 30 x GNUNET_TIME_UNIT_SECONDS.  get_mesh() passes it to
   GNUNET_SCHEDULER_add_delayed() when it sets reset_task for a newly
   created mesh handle. */
44 #define CLIENT_RETRY_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 30)
48 * Handle for a mesh to another peer.
54 * Handle for a request that is going out via mesh API.
/* One query that is queued for, or already sent over, a mesh channel.
   Created by GSF_mesh_query() and released through GSF_mesh_query_cancel().
   NOTE(review): the braces and at least two members used elsewhere in this
   file (proc_cls, was_transmitted) are not visible in this listing. */
56 struct GSF_MeshRequest
/* next and prev: presumably the links used by GNUNET_CONTAINER_DLL_insert
   and GNUNET_CONTAINER_DLL_remove on the pending list of the mesh -- TODO
   confirm against the macro definition. */
62 struct GSF_MeshRequest *next;
67 struct GSF_MeshRequest *prev;
70 * Which mesh is this request associated with?
72 struct MeshHandle *mh;
75 * Function to call with the result.
77 GSF_MeshReplyProcessor proc;
85 * Query to transmit to the other peer.
/* Copied into the outgoing MeshQueryMessage by transmit_sqm(). */
87 struct GNUNET_HashCode query;
90 * Desired type for the reply.
/* Sent in network byte order (htonl) by transmit_sqm(). */
92 enum GNUNET_BLOCK_Type type;
95 * Did we transmit this request already? #GNUNET_YES if we are
96 * in the 'waiting_map', #GNUNET_NO if we are in the 'pending' DLL.
103 * Handle for a mesh to another peer.
/* Members of the per-peer state (struct MeshHandle) that get_mesh()
   allocates and stores in mesh_map.
   NOTE(review): the struct header line and the braces are not visible in
   this listing. */
108 * Head of DLL of pending requests on this mesh.
110 struct GSF_MeshRequest *pending_head;
113 * Tail of DLL of pending requests on this mesh.
115 struct GSF_MeshRequest *pending_tail;
118 * Map from query to `struct GSF_MeshRequest`s waiting for
/* Filled by transmit_sqm() once a request has been written out; emptied
   by move_to_pending() and GSF_mesh_query_cancel(). */
121 struct GNUNET_CONTAINER_MultiHashMap *waiting_map;
124 * Channel to the other peer.
/* Checked against NULL in transmit_pending(), cleaner_cb() and
   release_meshs(). */
126 struct GNUNET_MESH_Channel *channel;
129 * Handle for active write operation, or NULL.
/* Assigned from GNUNET_MESH_notify_transmit_ready() in transmit_pending(). */
131 struct GNUNET_MESH_TransmitHandle *wh;
134 * Which peer does this mesh go to?
136 struct GNUNET_PeerIdentity target;
139 * Task to kill inactive meshs (we keep them around for
140 * a few seconds to give the application a chance to give
/* Scheduled by GSF_mesh_query_cancel() when nothing is waiting or pending;
   cancelled by get_mesh() when the handle is requested again. */
143 GNUNET_SCHEDULER_TaskIdentifier timeout_task;
146 * Task to reset meshs that had errors (asynchronously,
147 * as we may not be able to do it immediately during a
148 * callback from the mesh API).
/* Scheduled by reset_mesh_async() (immediately) and by get_mesh() (after
   CLIENT_RETRY_TIMEOUT). */
150 GNUNET_SCHEDULER_TaskIdentifier reset_task;
156 * Mesh handle for creating outbound channels.
/* Set from GNUNET_MESH_connect() in GSF_mesh_start_client(), passed to
   GNUNET_MESH_channel_create() in get_mesh() and reset_mesh(), and given to
   GNUNET_MESH_disconnect() in GSF_mesh_stop_client(). */
158 static struct GNUNET_MESH_Handle *mesh_handle;
161 * Map from peer identities to `struct MeshHandle`s with mesh
162 * channels to those peers.
/* Created in GSF_mesh_start_client(); get_mesh() looks entries up and
   inserts new ones, cleaner_cb() removes them, and GSF_mesh_stop_client()
   iterates over the map and destroys it. */
164 static struct GNUNET_CONTAINER_MultiPeerMap *mesh_map;
167 /* ********************* client-side code ************************* */
171 * Transmit pending requests via the mesh.
173 * @param mh mesh to process
/* Forward declaration: reset_mesh() and transmit_sqm() call this function
   before its definition further down in the file.
   NOTE(review): the return-type line is not visible in this listing. */
176 transmit_pending (struct MeshHandle *mh);
180 * Iterator called on each entry in a waiting map to
181 * move it back to the pending list.
183 * @param cls the `struct MeshHandle`
184 * @param key the key of the entry in the map (the query)
185 * @param value the `struct GSF_MeshRequest` to move to pending
186 * @return #GNUNET_YES (continue to iterate)
/* Map iterator that takes one request out of waiting_map and puts it back
   on the pending list of the mesh given in cls, so that it can be sent
   again.
   NOTE(review): the return-type line, the third parameter (used below as
   value), the braces and the return statement are not visible in this
   listing. */
189 move_to_pending (void *cls,
190 const struct GNUNET_HashCode *key,
193 struct MeshHandle *mh = cls;
194 struct GSF_MeshRequest *sr = value;
/* The entry is expected to be in the map; the removal result is asserted. */
196 GNUNET_assert (GNUNET_YES ==
197 GNUNET_CONTAINER_multihashmap_remove (mh->waiting_map,
200 GNUNET_CONTAINER_DLL_insert (mh->pending_head,
/* Keep the flag consistent with the container that now holds the request. */
203 sr->was_transmitted = GNUNET_NO;
209 * We had a serious error, tear down and re-create mesh from scratch.
211 * @param mh mesh to reset
/* Tears down the current channel of the mesh, creates a new one to the same
   target and restarts transmission.
   NOTE(review): the return-type line, the braces and several statement and
   argument lines are not visible in this listing. */
214 reset_mesh (struct MeshHandle *mh)
/* Remember the old channel before mh->channel is overwritten below. */
216 struct GNUNET_MESH_Channel *channel = mh->channel;
218 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
219 "Resetting mesh channel to %s\n",
220 GNUNET_i2s (&mh->target));
/* NOTE(review): the lines directly before this call are not visible;
   confirm that a NULL channel is guarded against and that mh->channel is
   cleared so cleaner_cb() takes its early-return path. */
223 GNUNET_MESH_channel_destroy (channel);
/* Walk waiting_map; presumably move_to_pending is the iterator (argument
   lines not visible), which would queue already-sent requests again. */
224 GNUNET_CONTAINER_multihashmap_iterate (mh->waiting_map,
/* Open a new channel with the reliable option on the FS block-transfer
   application type. */
227 mh->channel = GNUNET_MESH_channel_create (mesh_handle,
230 GNUNET_APPLICATION_TYPE_FS_BLOCK_TRANSFER,
231 GNUNET_MESH_OPTION_RELIABLE);
232 transmit_pending (mh);
237 * Task called when it is time to destroy an inactive mesh channel.
239 * @param cls the `struct MeshHandle` to tear down
240 * @param tc scheduler context, unused
/* Scheduler task for an idle mesh: clears timeout_task and destroys a
   channel.  GSF_mesh_query_cancel() schedules a task for this purpose once
   no request is pending or waiting (its callback argument is not visible).
   NOTE(review): the return-type line, the braces and the lines that assign
   tun are not visible in this listing; presumably tun is taken from
   mh->channel -- confirm, and confirm how a NULL channel is handled. */
243 mesh_timeout (void *cls,
244 const struct GNUNET_SCHEDULER_TaskContext *tc)
246 struct MeshHandle *mh = cls;
247 struct GNUNET_MESH_Channel *tun;
249 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
250 "Timeout on mesh channel to %s\n",
251 GNUNET_i2s (&mh->target));
/* The task is running now, so the stored identifier is no longer valid. */
252 mh->timeout_task = GNUNET_SCHEDULER_NO_TASK;
255 GNUNET_MESH_channel_destroy (tun);
260 * Task called when it is time to reset a mesh.
262 * @param cls the `struct MeshHandle` to reset
263 * @param tc scheduler context, unused
/* Scheduler task registered by reset_mesh_async(); get_mesh() also stores a
   delayed task in reset_task, presumably this one (its callback argument is
   not visible).
   NOTE(review): only the clearing of reset_task is visible; the statement
   that performs the reset (presumably a call to reset_mesh), the
   return-type line and the braces are not visible in this listing. */
266 reset_mesh_task (void *cls,
267 const struct GNUNET_SCHEDULER_TaskContext *tc)
269 struct MeshHandle *mh = cls;
/* The task is running now, so the stored identifier is no longer valid. */
271 mh->reset_task = GNUNET_SCHEDULER_NO_TASK;
277 * We had a serious error, tear down and re-create mesh from scratch,
278 * but do so asynchronously.
280 * @param mh mesh to reset
/* Schedules reset_mesh_task to run from the scheduler instead of resetting
   the mesh in the current call stack.  Called from transmit_sqm() and from
   the reply handler on errors.
   NOTE(review): the return-type line, the braces and the last argument of
   the scheduler call are not visible in this listing. */
283 reset_mesh_async (struct MeshHandle *mh)
/* Replace any reset task that is already scheduled (for example the delayed
   one set up by get_mesh()) so that only one is outstanding. */
285 if (GNUNET_SCHEDULER_NO_TASK != mh->reset_task)
286 GNUNET_SCHEDULER_cancel (mh->reset_task);
287 mh->reset_task = GNUNET_SCHEDULER_add_now (&reset_mesh_task,
293 * Functions of this signature are called whenever we are ready to transmit
296 * @param cls the struct MeshHandle for which we did the write call
297 * @param size the number of bytes that can be written to @a buf
298 * @param buf where to write the message
299 * @return number of bytes written to @a buf
/* Transmit callback: writes one MeshQueryMessage for the request at the
   head of the pending list into buf and moves that request to waiting_map.
   NOTE(review): the return-type line, the size and buf parameters, the
   braces, the conditions guarding the early exits and the return statements
   are not visible in this listing. */
302 transmit_sqm (void *cls,
306 struct MeshHandle *mh = cls;
307 struct MeshQueryMessage sqm;
308 struct GSF_MeshRequest *sr;
/* Failure path: log and schedule an asynchronous rebuild of the channel.
   The guarding condition is not visible; the log text indicates it is
   taken when the transmission attempt failed. */
313 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
314 "Mesh channel to %s failed during transmission attempt, rebuilding\n",
315 GNUNET_i2s (&mh->target));
316 reset_mesh_async (mh);
/* Take the request at the head of the pending list. */
319 sr = mh->pending_head;
/* The buffer must have room for one complete query message. */
322 GNUNET_assert (size >= sizeof (struct MeshQueryMessage));
/* Move the request from the pending list into waiting_map; the MULTIPLE
   option is used for the insertion. */
323 GNUNET_CONTAINER_DLL_remove (mh->pending_head,
326 GNUNET_assert (GNUNET_OK ==
327 GNUNET_CONTAINER_multihashmap_put (mh->waiting_map,
330 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
331 sr->was_transmitted = GNUNET_YES;
332 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
333 "Sending query for %s via mesh to %s\n",
334 GNUNET_h2s (&sr->query),
335 GNUNET_i2s (&mh->target));
/* Build the message with header fields and block type converted by
   htons/htonl, then copy it into the transmit buffer. */
336 sqm.header.size = htons (sizeof (sqm));
337 sqm.header.type = htons (GNUNET_MESSAGE_TYPE_FS_MESH_QUERY);
338 sqm.type = htonl (sr->type);
339 sqm.query = sr->query;
340 memcpy (buf, &sqm, sizeof (sqm));
341 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
342 "Successfully transmitted %u bytes via mesh to %s\n",
344 GNUNET_i2s (&mh->target));
/* Re-arm transmission for any further pending requests. */
345 transmit_pending (mh);
351 * Transmit pending requests via the mesh.
353 * @param mh mesh to process
/* Asks the mesh API for a transmit callback so that a queued request of mh
   can be written out.
   NOTE(review): the return-type line, the braces, the statement controlled
   by the NULL test, any further early-exit checks and the trailing
   arguments of the call are not visible in this listing. */
356 transmit_pending (struct MeshHandle *mh)
/* Nothing can be sent while the mesh has no channel. */
358 if (NULL == mh->channel)
/* Request room for exactly one query message, with corking allowed and no
   timeout; the returned handle is kept in mh->wh.  Presumably transmit_sqm
   is the callback -- TODO confirm. */
362 mh->wh = GNUNET_MESH_notify_transmit_ready (mh->channel, GNUNET_YES /* allow cork */,
363 GNUNET_TIME_UNIT_FOREVER_REL,
364 sizeof (struct MeshQueryMessage),
370 * Closure for handle_reply().
/* State filled in by the reply handler and handed to the handle_reply()
   iterator.
   NOTE(review): the braces and several members are not visible in this
   listing; data_size and found are used in the file (hrc.data_size,
   hrc.found), and the comment mentions a data member. */
372 struct HandleReplyClosure
381 * Expiration time for the block.
/* Set from GNUNET_TIME_absolute_ntoh (srm->expiration) by the reply
   handler. */
383 struct GNUNET_TIME_Absolute expiration;
386 * Number of bytes in 'data'.
/* Presumably the block type of the reply; the comment line that belongs to
   this member is not visible -- TODO confirm. */
393 enum GNUNET_BLOCK_Type type;
396 * Did we have a matching query?
403 * Iterator called on each entry in a waiting map to
406 * @param cls the `struct HandleReplyClosure`
407 * @param key the key of the entry in the map (the query)
408 * @param value the `struct GSF_MeshRequest` to handle result for
409 * @return #GNUNET_YES (continue to iterate)
/* Map iterator run for a request in waiting_map: hands the reply to the
   callback of the request, releases the request and records in the closure
   that a match was found.
   NOTE(review): the return-type line, the third parameter (used below as
   value), the braces, most arguments of the callback and the return
   statement are not visible in this listing. */
412 handle_reply (void *cls,
413 const struct GNUNET_HashCode *key,
416 struct HandleReplyClosure *hrc = cls;
417 struct GSF_MeshRequest *sr = value;
/* Deliver the reply to the requester. */
419 sr->proc (sr->proc_cls,
/* The request is finished; release it through the cancel routine.
   NOTE(review): GSF_mesh_query_cancel() itself invokes a callback p;
   confirm that sr->proc is cleared before this call (the preceding line is
   not visible) so the requester is not notified twice. */
425 GSF_mesh_query_cancel (sr);
426 hrc->found = GNUNET_YES;
432 * Functions with this signature are called whenever a complete reply
435 * @param cls closure with the `struct MeshHandle`
436 * @param channel channel handle
437 * @param channel_ctx channel context
438 * @param message the actual message
439 * @return #GNUNET_OK on success, #GNUNET_SYSERR to stop further processing
/* Message handler for replies arriving over a mesh channel: validates the
   message, derives the query key from the payload and dispatches the reply
   to the matching requests in waiting_map.
   NOTE(review): the line carrying the function name is not visible;
   presumably this is reply_cb, the function registered for
   GNUNET_MESSAGE_TYPE_FS_MESH_REPLY in GSF_mesh_start_client().  The first
   and third parameters, the declaration of msize, the braces and several
   argument lines are also not visible. */
443 struct GNUNET_MESH_Channel *channel,
445 const struct GNUNET_MessageHeader *message)
447 struct MeshHandle *mh = *channel_ctx;
448 const struct MeshReplyMessage *srm;
449 struct HandleReplyClosure hrc;
451 enum GNUNET_BLOCK_Type type;
452 struct GNUNET_HashCode query;
/* Reject messages that are shorter than the fixed reply header and
   schedule a reset of the mesh. */
454 msize = ntohs (message->size);
455 if (sizeof (struct MeshReplyMessage) > msize)
458 reset_mesh_async (mh);
459 return GNUNET_SYSERR;
/* From here on msize counts only the payload that follows the header. */
461 srm = (const struct MeshReplyMessage *) message;
462 msize -= sizeof (struct MeshReplyMessage);
463 type = (enum GNUNET_BLOCK_Type) ntohl (srm->type);
/* Compute the query key from the payload (the bytes after the header);
   the failure branch below logs a warning and schedules a reset. */
465 GNUNET_BLOCK_get_key (GSF_block_ctx,
467 &srm[1], msize, &query))
470 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
471 "Received bogus reply of type %u with %u bytes via mesh from peer %s\n",
474 GNUNET_i2s (&mh->target));
475 reset_mesh_async (mh);
476 return GNUNET_SYSERR;
478 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
479 "Received reply `%s' via mesh from peer %s\n",
481 GNUNET_i2s (&mh->target));
/* Tell the mesh API that this message has been consumed. */
482 GNUNET_MESH_receive_done (channel);
483 GNUNET_STATISTICS_update (GSF_stats,
484 gettext_noop ("# replies received via mesh"), 1,
/* Fill the closure and look up requests in waiting_map; presumably
   handle_reply is the iterator and query the key (argument lines not
   visible). */
487 hrc.data_size = msize;
488 hrc.expiration = GNUNET_TIME_absolute_ntoh (srm->expiration);
490 hrc.found = GNUNET_NO;
491 GNUNET_CONTAINER_multihashmap_get_multiple (mh->waiting_map,
/* No request was waiting for this reply: count it as dropped. */
495 if (GNUNET_NO == hrc.found)
497 GNUNET_STATISTICS_update (GSF_stats,
498 gettext_noop ("# replies received via mesh dropped"), 1,
507 * Get (or create) a mesh to talk to the given peer.
509 * @param target peer we want to communicate with
/* Returns the mesh handle for target, creating the handle and its channel
   when mesh_map has no entry yet.
   NOTE(review): the braces, the test on the lookup result, the return
   statements and several argument lines are not visible in this listing. */
511 static struct MeshHandle *
512 get_mesh (const struct GNUNET_PeerIdentity *target)
514 struct MeshHandle *mh;
516 mh = GNUNET_CONTAINER_multipeermap_get (mesh_map,
/* The handle is in use again, so a scheduled idle timeout is cancelled. */
520 if (GNUNET_SCHEDULER_NO_TASK != mh->timeout_task)
522 GNUNET_SCHEDULER_cancel (mh->timeout_task);
523 mh->timeout_task = GNUNET_SCHEDULER_NO_TASK;
/* Creation path: allocate a new handle. */
527 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
528 "Creating mesh channel to %s\n",
529 GNUNET_i2s (target));
530 mh = GNUNET_new (struct MeshHandle);
/* Schedule a task after CLIENT_RETRY_TIMEOUT and remember it in reset_task;
   presumably reset_mesh_task (argument lines not visible). */
531 mh->reset_task = GNUNET_SCHEDULER_add_delayed (CLIENT_RETRY_TIMEOUT,
534 mh->waiting_map = GNUNET_CONTAINER_multihashmap_create (16, GNUNET_YES);
535 mh->target = *target;
/* Register the handle in mesh_map; the insertion uses the UNIQUE_ONLY
   option and its result is asserted. */
536 GNUNET_assert (GNUNET_OK ==
537 GNUNET_CONTAINER_multipeermap_put (mesh_map,
540 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
541 mh->channel = GNUNET_MESH_channel_create (mesh_handle,
544 GNUNET_APPLICATION_TYPE_FS_BLOCK_TRANSFER,
545 GNUNET_MESH_OPTION_RELIABLE);
/* Second lookup in mesh_map; the surrounding statement is not visible,
   presumably a check that the new handle is still registered. */
547 GNUNET_CONTAINER_multipeermap_get (mesh_map,
554 * Look for a block by directly contacting a particular peer.
556 * @param target peer that should have the block
557 * @param query hash to query for the block
558 * @param type desired type for the block
559 * @param proc function to call with result
560 * @param proc_cls closure for @a proc
561 * @return handle to cancel the operation
/* Queues a query for transmission to target: obtains the mesh handle via
   get_mesh(), allocates a request, puts it on the pending list and triggers
   transmission.
   NOTE(review): the braces, most of the field assignments on the new
   request, the trailing arguments of the list insertion and the return
   statement are not visible in this listing. */
563 struct GSF_MeshRequest *
564 GSF_mesh_query (const struct GNUNET_PeerIdentity *target,
565 const struct GNUNET_HashCode *query,
566 enum GNUNET_BLOCK_Type type,
567 GSF_MeshReplyProcessor proc, void *proc_cls)
569 struct MeshHandle *mh;
570 struct GSF_MeshRequest *sr;
572 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
573 "Preparing to send query for %s via mesh to %s\n",
575 GNUNET_i2s (target));
/* get_mesh() also cancels a scheduled idle timeout on an existing handle. */
576 mh = get_mesh (target);
577 sr = GNUNET_new (struct GSF_MeshRequest);
580 sr->proc_cls = proc_cls;
583 GNUNET_CONTAINER_DLL_insert (mh->pending_head,
586 transmit_pending (mh);
592 * Cancel an active request; must not be called after 'proc'
595 * @param sr request to cancel
/* Releases a request: invokes the callback p with GNUNET_BLOCK_TYPE_ANY and
   GNUNET_TIME_UNIT_ZERO_ABS to signal failure or cancellation, removes the
   request from the container it is in, and schedules the idle timeout of
   the mesh once nothing is left.
   NOTE(review): the return-type line, the braces, the lines that set and
   test p, the else keyword between the two removals, the release of sr and
   the trailing arguments of several calls are not visible in this
   listing. */
598 GSF_mesh_query_cancel (struct GSF_MeshRequest *sr)
600 struct MeshHandle *mh = sr->mh;
601 GSF_MeshReplyProcessor p;
607 /* signal failure / cancellation to callback */
608 p (sr->proc_cls, GNUNET_BLOCK_TYPE_ANY,
609 GNUNET_TIME_UNIT_ZERO_ABS,
612 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
613 "Cancelled query for %s via mesh to %s\n",
614 GNUNET_h2s (&sr->query),
615 GNUNET_i2s (&sr->mh->target));
/* was_transmitted records which container holds the request: waiting_map
   when set, otherwise the pending list.
   NOTE(review): move_to_pending() compares the same removal call against
   GNUNET_YES instead of GNUNET_OK; confirm which constant the API
   documents. */
616 if (GNUNET_YES == sr->was_transmitted)
617 GNUNET_assert (GNUNET_OK ==
618 GNUNET_CONTAINER_multihashmap_remove (mh->waiting_map,
622 GNUNET_CONTAINER_DLL_remove (mh->pending_head,
/* With no waiting and no pending request left, schedule a task after
   GNUNET_TIME_UNIT_SECONDS; presumably mesh_timeout (argument lines not
   visible). */
626 if ( (0 == GNUNET_CONTAINER_multihashmap_size (mh->waiting_map)) &&
627 (NULL == mh->pending_head) )
628 mh->timeout_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS,
635 * Iterator called on each entry in a waiting map to
636 * call the 'proc' continuation and release associated
639 * @param cls the `struct MeshHandle`
640 * @param key the key of the entry in the map (the query)
641 * @param value the `struct GSF_MeshRequest` to clean up
642 * @return #GNUNET_YES (continue to iterate)
/* Map iterator that releases one waiting request by passing it to
   GSF_mesh_query_cancel(), which also notifies the callback of the request.
   NOTE(review): the return-type line, the third parameter (used below as
   value), the braces and the return statement are not visible in this
   listing. */
645 free_waiting_entry (void *cls,
646 const struct GNUNET_HashCode *key,
649 struct GSF_MeshRequest *sr = value;
651 GSF_mesh_query_cancel (sr);
657 * Function called by mesh when a client disconnects.
658 * Cleans up our `struct MeshHandle` of that channel.
661 * @param channel channel of the disconnecting client
662 * @param channel_ctx our `struct MeshHandle`
/* Channel-destruction callback: releases every request of the mesh, takes
   the mesh out of mesh_map, cancels its outstanding transmit request and
   scheduler tasks and destroys waiting_map.
   Order of the visible steps: pending requests are cancelled first, then
   the mesh is removed from mesh_map, then waiting_map is iterated
   (presumably with free_waiting_entry), then the transmit handle and the
   timeout and reset tasks are cancelled, and finally waiting_map is
   destroyed.
   NOTE(review): the return-type line, the third parameter (used below as
   channel_ctx), the braces, the end of the in-body comment, any guard
   before the transmit cancel, the statement around the size check and the
   release of mh are not visible in this listing. */
665 cleaner_cb (void *cls,
666 const struct GNUNET_MESH_Channel *channel,
669 struct MeshHandle *mh = channel_ctx;
670 struct GSF_MeshRequest *sr;
/* A NULL channel means another code path is already tearing the channel
   down, so nothing is done here. */
672 if (NULL == mh->channel)
673 return; /* being destroyed elsewhere */
674 GNUNET_assert (channel == mh->channel);
/* Each cancel call removes the request from the pending list, so the loop
   ends when the list is empty. */
676 while (NULL != (sr = mh->pending_head))
677 GSF_mesh_query_cancel (sr);
678 /* first remove `mh` from the `mesh_map`, so that if the
679 callback from `free_waiting_entry()` happens to re-issue
680 the request, we don't immediately have it back in the
682 GNUNET_assert (GNUNET_OK ==
683 GNUNET_CONTAINER_multipeermap_remove (mesh_map,
686 GNUNET_CONTAINER_multihashmap_iterate (mh->waiting_map,
690 GNUNET_MESH_notify_transmit_ready_cancel (mh->wh);
691 if (GNUNET_SCHEDULER_NO_TASK != mh->timeout_task)
692 GNUNET_SCHEDULER_cancel (mh->timeout_task);
693 if (GNUNET_SCHEDULER_NO_TASK != mh->reset_task)
694 GNUNET_SCHEDULER_cancel (mh->reset_task);
696 GNUNET_CONTAINER_multihashmap_size (mh->waiting_map));
697 GNUNET_CONTAINER_multihashmap_destroy (mh->waiting_map);
703 * Initialize subsystem for non-anonymous file-sharing.
/* Sets up the client side: creates mesh_map and connects to the mesh
   service, storing the result in mesh_handle.
   NOTE(review): the return-type line, the braces, the end of the handler
   table and the remaining arguments of GNUNET_MESH_connect() are not
   visible in this listing. */
706 GSF_mesh_start_client ()
/* Handler table: reply messages are routed to reply_cb. */
708 static const struct GNUNET_MESH_MessageHandler handlers[] = {
709 { &reply_cb, GNUNET_MESSAGE_TYPE_FS_MESH_REPLY, 0 },
713 mesh_map = GNUNET_CONTAINER_multipeermap_create (16, GNUNET_YES);
714 mesh_handle = GNUNET_MESH_connect (GSF_cfg,
724 * Function called on each active mesh to shut it down.
727 * @param key target peer, unused
728 * @param value the `struct MeshHandle` to destroy
729 * @return #GNUNET_YES (continue to iterate)
/* Peer-map iterator that destroys the channel of one mesh handle if it has
   one; presumably invoked from GSF_mesh_stop_client().
   NOTE(review): the return-type line, the third parameter (used below as
   value), the braces and the return statement are not visible in this
   listing. */
732 release_meshs (void *cls,
733 const struct GNUNET_PeerIdentity *key,
736 struct MeshHandle *mh = value;
/* NOTE(review): this is the same log text as in mesh_timeout(), although
   this function appears to run during shutdown rather than on a timeout;
   looks like a copy-paste leftover -- confirm before changing the text. */
738 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
739 "Timeout on mesh channel to %s\n",
740 GNUNET_i2s (&mh->target));
741 if (NULL != mh->channel)
742 GNUNET_MESH_channel_destroy (mh->channel);
748 * Shutdown subsystem for non-anonymous file-sharing.
/* Shuts the client side down: iterates over mesh_map, destroys the map and
   disconnects from the mesh service if a connection exists.
   NOTE(review): the return-type line, the braces, the iterator arguments
   (presumably release_meshs) and any statements that reset the globals are
   not visible in this listing. */
751 GSF_mesh_stop_client ()
753 GNUNET_CONTAINER_multipeermap_iterate (mesh_map,
756 GNUNET_CONTAINER_multipeermap_destroy (mesh_map);
758 if (NULL != mesh_handle)
760 GNUNET_MESH_disconnect (mesh_handle);
766 /* end of gnunet-service-fs_mesh_client.c */