/*
     This file is part of GNUnet.
     Copyright (C) 2009-2013, 2016, 2018 GNUnet e.V.

     GNUnet is free software: you can redistribute it and/or modify it
     under the terms of the GNU Affero General Public License as published
     by the Free Software Foundation, either version 3 of the License,
     or (at your option) any later version.

     GNUnet is distributed in the hope that it will be useful, but
     WITHOUT ANY WARRANTY; without even the implied warranty of
     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
     Affero General Public License for more details.

     You should have received a copy of the GNU Affero General Public License
     along with this program. If not, see <http://www.gnu.org/licenses/>.

     SPDX-License-Identifier: AGPL3.0-or-later
*/

/**
 * @file transport/transport_api_core.c
 * @brief library to access the transport service for message exchange
 * @author Christian Grothoff
 */
27 #include "gnunet_util_lib.h"
28 #include "gnunet_constants.h"
29 #include "gnunet_arm_service.h"
30 #include "gnunet_hello_lib.h"
31 #include "gnunet_protocols.h"
32 #include "gnunet_transport_core_service.h"
33 #include "transport.h"
35 #define LOG(kind,...) GNUNET_log_from (kind, "transport-api-core",__VA_ARGS__)
38 * How large to start with for the hashmap of neighbours.
40 #define STARTING_NEIGHBOURS_SIZE 16
44 * Entry in hash table of all of our current (connected) neighbours.
50 * Identity of this neighbour.
52 struct GNUNET_PeerIdentity id;
55 * Overall transport handle.
57 struct GNUNET_TRANSPORT_CoreHandle *h;
60 * Active message queue for the peer.
62 struct GNUNET_MQ_Handle *mq;
65 * Envelope with the message we are currently transmitting (or NULL).
67 struct GNUNET_MQ_Envelope *env;
70 * Closure for @e mq handlers.
75 * Entry in our readyness heap (which is sorted by @e next_ready
76 * value). NULL if there is no pending transmission request for
77 * this neighbour or if we're waiting for @e is_ready to become
78 * true AFTER the @e out_tracker suggested that this peer's quota
79 * has been satisfied (so once @e is_ready goes to #GNUNET_YES,
80 * we should immediately go back into the heap).
82 struct GNUNET_CONTAINER_HeapNode *hn;
85 * Task to trigger MQ when we have enough bandwidth for the
88 struct GNUNET_SCHEDULER_Task *timeout_task;
91 * Outbound bandwidh tracker.
93 struct GNUNET_BANDWIDTH_Tracker out_tracker;
96 * Sending consumed more bytes on wire than payload was announced
97 * This overhead is added to the delay of next sending operation
99 unsigned long long traffic_overhead;
102 * Is this peer currently ready to receive a message?
107 * Size of the message in @e env.
116 * Handle for the transport service (includes all of the
117 * state for the transport service).
119 struct GNUNET_TRANSPORT_CoreHandle
123 * Closure for the callbacks.
128 * Functions to call for received data (template for
129 * new message queues).
131 struct GNUNET_MQ_MessageHandler *handlers;
134 * function to call on connect events
136 GNUNET_TRANSPORT_NotifyConnect nc_cb;
139 * function to call on disconnect events
141 GNUNET_TRANSPORT_NotifyDisconnect nd_cb;
144 * function to call on excess bandwidth events
146 GNUNET_TRANSPORT_NotifyExcessBandwidth neb_cb;
149 * My client connection to the transport service.
151 struct GNUNET_MQ_Handle *mq;
156 const struct GNUNET_CONFIGURATION_Handle *cfg;
159 * Hash map of the current connected neighbours of this peer.
160 * Maps peer identities to `struct Neighbour` entries.
162 struct GNUNET_CONTAINER_MultiPeerMap *neighbours;
165 * Peer identity as assumed by this process, or all zeros.
167 struct GNUNET_PeerIdentity self;
170 * ID of the task trying to reconnect to the service.
172 struct GNUNET_SCHEDULER_Task *reconnect_task;
175 * Delay until we try to reconnect.
177 struct GNUNET_TIME_Relative reconnect_delay;
180 * Should we check that @e self matches what the service thinks?
181 * (if #GNUNET_NO, then @e self is all zeros!).
/**
 * Function that will schedule the job that will try
 * to connect us again to the service.  Forward declaration;
 * the definition appears further down in this file.
 *
 * @param h transport service to reconnect
 */
static void
disconnect_and_schedule_reconnect (struct GNUNET_TRANSPORT_CoreHandle *h);
199 * Get the neighbour list entry for the given peer
201 * @param h our context
202 * @param peer peer to look up
203 * @return NULL if no such peer entry exists
205 static struct Neighbour *
206 neighbour_find (struct GNUNET_TRANSPORT_CoreHandle *h,
207 const struct GNUNET_PeerIdentity *peer)
209 return GNUNET_CONTAINER_multipeermap_get (h->neighbours,
215 * Function called by the bandwidth tracker if we have excess
218 * @param cls the `struct Neighbour` that has excess bandwidth
221 notify_excess_cb (void *cls)
223 struct Neighbour *n = cls;
224 struct GNUNET_TRANSPORT_CoreHandle *h = n->h;
226 LOG (GNUNET_ERROR_TYPE_DEBUG,
227 "Notifying CORE that more bandwidth is available for %s\n",
228 GNUNET_i2s (&n->id));
230 if (NULL != h->neb_cb)
238 * Iterator over hash map entries, for deleting state of a neighbour.
240 * @param cls the `struct GNUNET_TRANSPORT_CoreHandle *`
241 * @param key peer identity
242 * @param value value in the hash map, the neighbour entry to delete
243 * @return #GNUNET_YES if we should continue to
248 neighbour_delete (void *cls,
249 const struct GNUNET_PeerIdentity *key,
252 struct GNUNET_TRANSPORT_CoreHandle *handle = cls;
253 struct Neighbour *n = value;
255 LOG (GNUNET_ERROR_TYPE_DEBUG,
256 "Dropping entry for neighbour `%s'.\n",
258 GNUNET_BANDWIDTH_tracker_notification_stop (&n->out_tracker);
259 if (NULL != handle->nd_cb)
260 handle->nd_cb (handle->cls,
263 if (NULL != n->timeout_task)
265 GNUNET_SCHEDULER_cancel (n->timeout_task);
266 n->timeout_task = NULL;
270 GNUNET_MQ_send_cancel (n->env);
273 GNUNET_MQ_destroy (n->mq);
274 GNUNET_assert (NULL == n->mq);
275 GNUNET_assert (GNUNET_YES ==
276 GNUNET_CONTAINER_multipeermap_remove (handle->neighbours,
285 * Generic error handler, called with the appropriate
286 * error code and the same closure specified at the creation of
288 * Not every message queue implementation supports an error handler.
290 * @param cls closure with the `struct GNUNET_TRANSPORT_CoreHandle *`
291 * @param error error code
294 mq_error_handler (void *cls,
295 enum GNUNET_MQ_Error error)
297 struct GNUNET_TRANSPORT_CoreHandle *h = cls;
299 LOG (GNUNET_ERROR_TYPE_DEBUG,
300 "Error receiving from transport service, disconnecting temporarily.\n");
301 disconnect_and_schedule_reconnect (h);
306 * A message from the handler's message queue to a neighbour was
307 * transmitted. Now trigger (possibly delayed) notification of the
308 * neighbour's message queue that we are done and thus ready for
311 * @param cls the `struct Neighbour` where the message was sent
314 notify_send_done_fin (void *cls)
316 struct Neighbour *n = cls;
318 n->timeout_task = NULL;
319 n->is_ready = GNUNET_YES;
320 GNUNET_MQ_impl_send_continue (n->mq);
325 * A message from the handler's message queue to a neighbour was
326 * transmitted. Now trigger (possibly delayed) notification of the
327 * neighbour's message queue that we are done and thus ready for
330 * @param cls the `struct Neighbour` where the message was sent
333 notify_send_done (void *cls)
335 struct Neighbour *n = cls;
336 struct GNUNET_TIME_Relative delay;
338 n->timeout_task = NULL;
341 GNUNET_BANDWIDTH_tracker_consume (&n->out_tracker,
342 n->env_size + n->traffic_overhead);
344 n->traffic_overhead = 0;
346 delay = GNUNET_BANDWIDTH_tracker_get_delay (&n->out_tracker,
348 if (0 == delay.rel_value_us)
350 n->is_ready = GNUNET_YES;
351 GNUNET_MQ_impl_send_continue (n->mq);
354 GNUNET_MQ_impl_send_in_flight (n->mq);
355 /* cannot send even a small message without violating
356 quota, wait a before allowing MQ to send next message */
357 n->timeout_task = GNUNET_SCHEDULER_add_delayed (delay,
358 ¬ify_send_done_fin,
364 * Implement sending functionality of a message queue.
365 * Called one message at a time. Should send the @a msg
366 * to the transport service and then notify the queue
367 * once we are ready for the next one.
369 * @param mq the message queue
370 * @param msg the message to send
371 * @param impl_state state of the implementation
374 mq_send_impl (struct GNUNET_MQ_Handle *mq,
375 const struct GNUNET_MessageHeader *msg,
378 struct Neighbour *n = impl_state;
379 struct GNUNET_TRANSPORT_CoreHandle *h = n->h;
380 struct OutboundMessage *obm;
383 GNUNET_assert (GNUNET_YES == n->is_ready);
384 msize = ntohs (msg->size);
385 if (msize >= GNUNET_MAX_MESSAGE_SIZE - sizeof (*obm))
388 GNUNET_MQ_impl_send_continue (mq);
391 GNUNET_assert (NULL == n->env);
392 n->env = GNUNET_MQ_msg_nested_mh (obm,
393 GNUNET_MESSAGE_TYPE_TRANSPORT_SEND,
395 obm->reserved = htonl (0);
396 obm->timeout = GNUNET_TIME_relative_hton (GNUNET_TIME_UNIT_MINUTES); /* FIXME: to be removed */
398 GNUNET_assert (NULL == n->timeout_task);
399 n->is_ready = GNUNET_NO;
400 n->env_size = ntohs (msg->size);
401 GNUNET_MQ_notify_sent (n->env,
404 GNUNET_MQ_send (h->mq,
406 LOG (GNUNET_ERROR_TYPE_DEBUG,
407 "Queued message of type %u for neighbour `%s'.\n",
409 GNUNET_i2s (&n->id));
414 * Handle destruction of a message queue. Implementations must not
415 * free @a mq, but should take care of @a impl_state.
417 * @param mq the message queue to destroy
418 * @param impl_state state of the implementation
421 mq_destroy_impl (struct GNUNET_MQ_Handle *mq,
424 struct Neighbour *n = impl_state;
426 GNUNET_assert (mq == n->mq);
432 * Implementation function that cancels the currently sent message.
433 * Should basically undo whatever #mq_send_impl() did.
435 * @param mq message queue
436 * @param impl_state state specific to the implementation
439 mq_cancel_impl (struct GNUNET_MQ_Handle *mq,
442 struct Neighbour *n = impl_state;
444 GNUNET_assert (GNUNET_NO == n->is_ready);
447 GNUNET_MQ_send_cancel (n->env);
451 n->is_ready = GNUNET_YES;
456 * We had an error processing a message we forwarded from a peer to
457 * the CORE service. We should just complain about it but otherwise
458 * continue processing.
461 * @param error error code
464 peer_mq_error_handler (void *cls,
465 enum GNUNET_MQ_Error error)
467 /* struct Neighbour *n = cls; */
474 * The outbound quota has changed in a way that may require
475 * us to reset the timeout. Update the timeout.
477 * @param cls the `struct Neighbour` for which the timeout changed
480 outbound_bw_tracker_update (void *cls)
482 struct Neighbour *n = cls;
483 struct GNUNET_TIME_Relative delay;
485 if (NULL == n->timeout_task)
487 delay = GNUNET_BANDWIDTH_tracker_get_delay (&n->out_tracker,
489 GNUNET_SCHEDULER_cancel (n->timeout_task);
490 n->timeout_task = GNUNET_SCHEDULER_add_delayed (delay,
497 * Function we use for handling incoming connect messages.
499 * @param cls closure, a `struct GNUNET_TRANSPORT_Handle *`
500 * @param cim message received
503 handle_connect (void *cls,
504 const struct ConnectInfoMessage *cim)
506 struct GNUNET_TRANSPORT_CoreHandle *h = cls;
509 LOG (GNUNET_ERROR_TYPE_DEBUG,
510 "Receiving CONNECT message for `%s' with quota %u\n",
511 GNUNET_i2s (&cim->id),
512 ntohl (cim->quota_out.value__));
513 n = neighbour_find (h,
518 disconnect_and_schedule_reconnect (h);
521 n = GNUNET_new (struct Neighbour);
524 n->is_ready = GNUNET_YES;
525 n->traffic_overhead = 0;
526 GNUNET_BANDWIDTH_tracker_init2 (&n->out_tracker,
527 &outbound_bw_tracker_update,
529 GNUNET_CONSTANTS_DEFAULT_BW_IN_OUT,
530 MAX_BANDWIDTH_CARRY_S,
533 GNUNET_assert (GNUNET_OK ==
534 GNUNET_CONTAINER_multipeermap_put (h->neighbours,
537 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
539 GNUNET_BANDWIDTH_tracker_update_quota (&n->out_tracker,
541 n->mq = GNUNET_MQ_queue_for_callbacks (&mq_send_impl,
546 &peer_mq_error_handler,
548 if (NULL != h->nc_cb)
550 n->handlers_cls = h->nc_cb (h->cls,
553 GNUNET_MQ_set_handlers_closure (n->mq,
560 * Function we use for handling incoming disconnect messages.
562 * @param cls closure, a `struct GNUNET_TRANSPORT_CoreHandle *`
563 * @param dim message received
566 handle_disconnect (void *cls,
567 const struct DisconnectInfoMessage *dim)
569 struct GNUNET_TRANSPORT_CoreHandle *h = cls;
572 GNUNET_break (ntohl (dim->reserved) == 0);
573 LOG (GNUNET_ERROR_TYPE_DEBUG,
574 "Receiving DISCONNECT message for `%s'.\n",
575 GNUNET_i2s (&dim->peer));
576 n = neighbour_find (h,
581 disconnect_and_schedule_reconnect (h);
584 GNUNET_assert (GNUNET_YES ==
592 * Function we use for handling incoming send-ok messages.
594 * @param cls closure, a `struct GNUNET_TRANSPORT_CoreHandle *`
595 * @param okm message received
598 handle_send_ok (void *cls,
599 const struct SendOkMessage *okm)
601 struct GNUNET_TRANSPORT_CoreHandle *h = cls;
604 uint32_t bytes_physical;
606 bytes_msg = ntohs (okm->bytes_msg);
607 bytes_physical = ntohl (okm->bytes_physical);
608 LOG (GNUNET_ERROR_TYPE_DEBUG,
609 "Receiving SEND_OK message, transmission to %s %s.\n",
610 GNUNET_i2s (&okm->peer),
611 (GNUNET_OK == ntohs (okm->success))
614 n = neighbour_find (h,
618 /* We should never get a 'SEND_OK' for a peer that we are not
621 disconnect_and_schedule_reconnect (h);
624 if (bytes_physical > bytes_msg)
626 LOG (GNUNET_ERROR_TYPE_DEBUG,
627 "Overhead for %u byte message was %u\n",
628 (unsigned int) bytes_msg,
629 (unsigned int) (bytes_physical - bytes_msg));
630 n->traffic_overhead += bytes_physical - bytes_msg;
636 * Function we use for checking incoming "inbound" messages.
638 * @param cls closure, a `struct GNUNET_TRANSPORT_CoreHandle *`
639 * @param im message received
642 check_recv (void *cls,
643 const struct InboundMessage *im)
645 const struct GNUNET_MessageHeader *imm;
648 size = ntohs (im->header.size) - sizeof (*im);
649 if (size < sizeof (struct GNUNET_MessageHeader))
652 return GNUNET_SYSERR;
654 imm = (const struct GNUNET_MessageHeader *) &im[1];
655 if (ntohs (imm->size) != size)
658 return GNUNET_SYSERR;
665 * Function we use for handling incoming messages.
667 * @param cls closure, a `struct GNUNET_TRANSPORT_CoreHandle *`
668 * @param im message received
671 handle_recv (void *cls,
672 const struct InboundMessage *im)
674 struct GNUNET_TRANSPORT_CoreHandle *h = cls;
675 const struct GNUNET_MessageHeader *imm
676 = (const struct GNUNET_MessageHeader *) &im[1];
679 LOG (GNUNET_ERROR_TYPE_DEBUG,
680 "Received message of type %u with %u bytes from `%s'.\n",
681 (unsigned int) ntohs (imm->type),
682 (unsigned int) ntohs (imm->size),
683 GNUNET_i2s (&im->peer));
684 n = neighbour_find (h,
689 disconnect_and_schedule_reconnect (h);
692 GNUNET_MQ_inject_message (n->mq,
698 * Function we use for handling incoming set quota messages.
700 * @param cls closure, a `struct GNUNET_TRANSPORT_CoreHandle *`
701 * @param msg message received
704 handle_set_quota (void *cls,
705 const struct QuotaSetMessage *qm)
707 struct GNUNET_TRANSPORT_CoreHandle *h = cls;
710 n = neighbour_find (h,
715 disconnect_and_schedule_reconnect (h);
718 LOG (GNUNET_ERROR_TYPE_DEBUG,
719 "Receiving SET_QUOTA message for `%s' with quota %u\n",
720 GNUNET_i2s (&qm->peer),
721 (unsigned int) ntohl (qm->quota.value__));
722 GNUNET_BANDWIDTH_tracker_update_quota (&n->out_tracker,
728 * Try again to connect to transport service.
730 * @param cls the handle to the transport service
733 reconnect (void *cls)
735 struct GNUNET_TRANSPORT_CoreHandle *h = cls;
736 struct GNUNET_MQ_MessageHandler handlers[] = {
737 GNUNET_MQ_hd_fixed_size (connect,
738 GNUNET_MESSAGE_TYPE_TRANSPORT_CONNECT,
739 struct ConnectInfoMessage,
741 GNUNET_MQ_hd_fixed_size (disconnect,
742 GNUNET_MESSAGE_TYPE_TRANSPORT_DISCONNECT,
743 struct DisconnectInfoMessage,
745 GNUNET_MQ_hd_fixed_size (send_ok,
746 GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK,
747 struct SendOkMessage,
749 GNUNET_MQ_hd_var_size (recv,
750 GNUNET_MESSAGE_TYPE_TRANSPORT_RECV,
751 struct InboundMessage,
753 GNUNET_MQ_hd_fixed_size (set_quota,
754 GNUNET_MESSAGE_TYPE_TRANSPORT_SET_QUOTA,
755 struct QuotaSetMessage,
757 GNUNET_MQ_handler_end ()
759 struct GNUNET_MQ_Envelope *env;
760 struct StartMessage *s;
763 h->reconnect_task = NULL;
764 LOG (GNUNET_ERROR_TYPE_DEBUG,
765 "Connecting to transport service.\n");
766 GNUNET_assert (NULL == h->mq);
767 h->mq = GNUNET_CLIENT_connect (h->cfg,
774 env = GNUNET_MQ_msg (s,
775 GNUNET_MESSAGE_TYPE_TRANSPORT_START);
779 if (NULL != h->handlers)
781 s->options = htonl (options);
783 GNUNET_MQ_send (h->mq,
789 * Disconnect from the transport service.
791 * @param h transport service to reconnect
794 disconnect (struct GNUNET_TRANSPORT_CoreHandle *h)
796 GNUNET_CONTAINER_multipeermap_iterate (h->neighbours,
801 GNUNET_MQ_destroy (h->mq);
808 * Function that will schedule the job that will try
809 * to connect us again to the client.
811 * @param h transport service to reconnect
814 disconnect_and_schedule_reconnect (struct GNUNET_TRANSPORT_CoreHandle *h)
816 GNUNET_assert (NULL == h->reconnect_task);
818 LOG (GNUNET_ERROR_TYPE_DEBUG,
819 "Scheduling task to reconnect to transport service in %s.\n",
820 GNUNET_STRINGS_relative_time_to_string (h->reconnect_delay,
823 GNUNET_SCHEDULER_add_delayed (h->reconnect_delay,
826 h->reconnect_delay = GNUNET_TIME_STD_BACKOFF (h->reconnect_delay);
831 * Checks if a given peer is connected to us and get the message queue.
833 * @param handle connection to transport service
834 * @param peer the peer to check
835 * @return NULL if disconnected, otherwise message queue for @a peer
837 struct GNUNET_MQ_Handle *
838 GNUNET_TRANSPORT_core_get_mq (struct GNUNET_TRANSPORT_CoreHandle *handle,
839 const struct GNUNET_PeerIdentity *peer)
843 n = neighbour_find (handle,
852 * Connect to the transport service. Note that the connection may
853 * complete (or fail) asynchronously.
855 * @param cfg configuration to use
856 * @param self our own identity (API should check that it matches
857 * the identity found by transport), or NULL (no check)
858 * @param cls closure for the callbacks
859 * @param rec receive function to call
860 * @param nc function to call on connect events
861 * @param nd function to call on disconnect events
862 * @param neb function to call if we have excess bandwidth to a peer
863 * @return NULL on error
865 struct GNUNET_TRANSPORT_CoreHandle *
866 GNUNET_TRANSPORT_core_connect (const struct GNUNET_CONFIGURATION_Handle *cfg,
867 const struct GNUNET_PeerIdentity *self,
868 const struct GNUNET_MQ_MessageHandler *handlers,
870 GNUNET_TRANSPORT_NotifyConnect nc,
871 GNUNET_TRANSPORT_NotifyDisconnect nd,
872 GNUNET_TRANSPORT_NotifyExcessBandwidth neb)
874 struct GNUNET_TRANSPORT_CoreHandle *h;
877 h = GNUNET_new (struct GNUNET_TRANSPORT_CoreHandle);
881 h->check_self = GNUNET_YES;
888 h->reconnect_delay = GNUNET_TIME_UNIT_ZERO;
889 if (NULL != handlers)
891 for (i=0;NULL != handlers[i].cb; i++) ;
892 h->handlers = GNUNET_new_array (i + 1,
893 struct GNUNET_MQ_MessageHandler);
894 GNUNET_memcpy (h->handlers,
896 i * sizeof (struct GNUNET_MQ_MessageHandler));
898 LOG (GNUNET_ERROR_TYPE_DEBUG,
899 "Connecting to transport service\n");
903 GNUNET_free_non_null (h->handlers);
908 GNUNET_CONTAINER_multipeermap_create (STARTING_NEIGHBOURS_SIZE,
915 * Disconnect from the transport service.
917 * @param handle handle to the service as returned from #GNUNET_TRANSPORT_core_connect()
920 GNUNET_TRANSPORT_core_disconnect (struct GNUNET_TRANSPORT_CoreHandle *handle)
922 LOG (GNUNET_ERROR_TYPE_DEBUG,
923 "Transport disconnect called!\n");
924 /* this disconnects all neighbours... */
926 /* and now we stop trying to connect again... */
927 if (NULL != handle->reconnect_task)
929 GNUNET_SCHEDULER_cancel (handle->reconnect_task);
930 handle->reconnect_task = NULL;
932 GNUNET_CONTAINER_multipeermap_destroy (handle->neighbours);
933 handle->neighbours = NULL;
934 GNUNET_free_non_null (handle->handlers);
935 handle->handlers = NULL;
936 GNUNET_free (handle);
/* end of transport_api_core.c */