2 This file is part of GNUnet.
3 Copyright (C) 2009-2013, 2016 GNUnet e.V.
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your
8 option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18 Boston, MA 02110-1301, USA.
22 * @file transport/transport_api.c
23 * @brief library to access the low-level P2P IO service
24 * @author Christian Grothoff
30 #include "gnunet_util_lib.h"
31 #include "gnunet_constants.h"
32 #include "gnunet_arm_service.h"
33 #include "gnunet_hello_lib.h"
34 #include "gnunet_protocols.h"
35 #include "gnunet_transport_service.h"
36 #include "transport.h"
/**
 * Emit a log message on behalf of this library, tagged with the
 * "transport-api" component name.
 */
#define LOG(kind,...) GNUNET_log_from (kind, "transport-api",__VA_ARGS__)

/**
 * If no payload could be sent to a peer for this long, a warning
 * is printed.
 */
#define UNREADY_WARN_TIME GNUNET_TIME_UNIT_MINUTES

/**
 * Initial size of the hash map of neighbours.
 */
#define STARTING_NEIGHBOURS_SIZE 16
52 * Handle for a message that should be transmitted to the service.
53 * Used for both control messages and normal messages.
55 struct GNUNET_TRANSPORT_TransmitHandle
59 * We keep all requests in a DLL.
61 struct GNUNET_TRANSPORT_TransmitHandle *next;
64 * We keep all requests in a DLL.
66 struct GNUNET_TRANSPORT_TransmitHandle *prev;
69 * Neighbour for this handle, NULL for control messages.
71 struct Neighbour *neighbour;
74 * Function to call when @e notify_size bytes are available
77 GNUNET_TRANSPORT_TransmitReadyNotify notify;
80 * Closure for @e notify.
85 * Time at which this request was originally scheduled.
87 struct GNUNET_TIME_Absolute request_start;
90 * Timeout for this request, 0 for control messages.
92 struct GNUNET_TIME_Absolute timeout;
95 * Task to trigger request timeout if the request is stalled due to
98 struct GNUNET_SCHEDULER_Task *timeout_task;
101 * How many bytes is our notify callback waiting for?
109 * Entry in hash table of all of our current (connected) neighbours.
114 * Overall transport handle.
116 struct GNUNET_TRANSPORT_Handle *h;
119 * Active transmit handle or NULL.
121 struct GNUNET_TRANSPORT_TransmitHandle *th;
124 * Identity of this neighbour.
126 struct GNUNET_PeerIdentity id;
129 * Outbound bandwidh tracker.
131 struct GNUNET_BANDWIDTH_Tracker out_tracker;
134 * Entry in our readyness heap (which is sorted by @e next_ready
135 * value). NULL if there is no pending transmission request for
136 * this neighbour or if we're waiting for @e is_ready to become
137 * true AFTER the @e out_tracker suggested that this peer's quota
138 * has been satisfied (so once @e is_ready goes to #GNUNET_YES,
139 * we should immediately go back into the heap).
141 struct GNUNET_CONTAINER_HeapNode *hn;
144 * Last time when this peer received payload from us.
146 struct GNUNET_TIME_Absolute last_payload;
149 * Task to trigger warnings if we do not get SEND_OK after a while.
151 struct GNUNET_SCHEDULER_Task *unready_warn_task;
154 * Is this peer currently ready to receive a message?
159 * Sending consumed more bytes on wire than payload was announced
160 * This overhead is added to the delay of next sending operation
162 size_t traffic_overhead;
168 * Handle for the transport service (includes all of the
169 * state for the transport service).
171 struct GNUNET_TRANSPORT_Handle
175 * Closure for the callbacks.
180 * Function to call for received data.
182 GNUNET_TRANSPORT_ReceiveCallback rec;
185 * function to call on connect events
187 GNUNET_TRANSPORT_NotifyConnect nc_cb;
190 * function to call on disconnect events
192 GNUNET_TRANSPORT_NotifyDisconnect nd_cb;
195 * function to call on excess bandwidth events
197 GNUNET_TRANSPORT_NotifyExcessBandwidth neb_cb;
200 * The current HELLO message for this peer. Updated
201 * whenever transports change their addresses.
203 struct GNUNET_MessageHeader *my_hello;
206 * My client connection to the transport service.
208 struct GNUNET_MQ_Handle *mq;
213 const struct GNUNET_CONFIGURATION_Handle *cfg;
216 * Hash map of the current connected neighbours of this peer.
217 * Maps peer identities to `struct Neighbour` entries.
219 struct GNUNET_CONTAINER_MultiPeerMap *neighbours;
222 * Heap sorting peers with pending messages by the timestamps that
223 * specify when we could next send a message to the respective peer.
224 * Excludes control messages (which can always go out immediately).
225 * Maps time stamps to `struct Neighbour` entries.
227 struct GNUNET_CONTAINER_Heap *ready_heap;
230 * Peer identity as assumed by this process, or all zeros.
232 struct GNUNET_PeerIdentity self;
235 * ID of the task trying to reconnect to the service.
237 struct GNUNET_SCHEDULER_Task *reconnect_task;
240 * ID of the task trying to trigger transmission for a peer while
241 * maintaining bandwidth quotas. In use if there are no control
242 * messages and the smallest entry in the @e ready_heap has a time
243 * stamp in the future.
245 struct GNUNET_SCHEDULER_Task *quota_task;
248 * Delay until we try to reconnect.
250 struct GNUNET_TIME_Relative reconnect_delay;
253 * Should we check that @e self matches what the service thinks?
254 * (if #GNUNET_NO, then @e self is all zeros!).
259 * Reconnect in progress
/**
 * Forward declaration: schedule the task that sends one message,
 * either from the control list or the peer message queues, to the
 * service.
 *
 * @param h transport service to schedule a transmission for
 */
static void
schedule_transmission (struct GNUNET_TRANSPORT_Handle *h);
/**
 * Forward declaration: drop the current service connection and
 * schedule the job that will try to connect us again.
 *
 * @param h transport service to reconnect
 */
static void
disconnect_and_schedule_reconnect (struct GNUNET_TRANSPORT_Handle *h);
286 * A neighbour has not gotten a SEND_OK in a while. Print a warning.
288 * @param cls the `struct Neighbour`
291 do_warn_unready (void *cls)
293 struct Neighbour *n = cls;
294 struct GNUNET_TIME_Relative delay;
296 delay = GNUNET_TIME_absolute_get_duration (n->last_payload);
297 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
298 "Lacking SEND_OK, no payload could be send to %s for %s\n",
300 GNUNET_STRINGS_relative_time_to_string (delay,
303 = GNUNET_SCHEDULER_add_delayed (UNREADY_WARN_TIME,
310 * Get the neighbour list entry for the given peer
312 * @param h our context
313 * @param peer peer to look up
314 * @return NULL if no such peer entry exists
316 static struct Neighbour *
317 neighbour_find (struct GNUNET_TRANSPORT_Handle *h,
318 const struct GNUNET_PeerIdentity *peer)
320 return GNUNET_CONTAINER_multipeermap_get (h->neighbours,
326 * The outbound quota has changed in a way that may require
327 * us to reset the timeout. Update the timeout.
329 * @param cls the `struct Neighbour` for which the timeout changed
332 outbound_bw_tracker_update (void *cls)
334 struct Neighbour *n = cls;
335 struct GNUNET_TIME_Relative delay;
339 delay = GNUNET_BANDWIDTH_tracker_get_delay (&n->out_tracker,
340 n->th->notify_size + n->traffic_overhead);
341 LOG (GNUNET_ERROR_TYPE_DEBUG,
342 "New outbound delay %s us\n",
343 GNUNET_STRINGS_relative_time_to_string (delay,
345 GNUNET_CONTAINER_heap_update_cost (n->h->ready_heap,
348 schedule_transmission (n->h);
353 * Function called by the bandwidth tracker if we have excess
356 * @param cls the `struct Neighbour` that has excess bandwidth
359 notify_excess_cb (void *cls)
361 struct Neighbour *n = cls;
362 struct GNUNET_TRANSPORT_Handle *h = n->h;
364 if (NULL != h->neb_cb)
371 * Add neighbour to our list
373 * @return NULL if this API is currently disconnecting from the service
375 static struct Neighbour *
376 neighbour_add (struct GNUNET_TRANSPORT_Handle *h,
377 const struct GNUNET_PeerIdentity *pid)
381 LOG (GNUNET_ERROR_TYPE_DEBUG,
382 "Creating entry for neighbour `%s'.\n",
384 n = GNUNET_new (struct Neighbour);
387 n->is_ready = GNUNET_YES;
388 n->traffic_overhead = 0;
389 GNUNET_BANDWIDTH_tracker_init2 (&n->out_tracker,
390 &outbound_bw_tracker_update,
392 GNUNET_CONSTANTS_DEFAULT_BW_IN_OUT,
393 MAX_BANDWIDTH_CARRY_S,
396 GNUNET_assert (GNUNET_OK ==
397 GNUNET_CONTAINER_multipeermap_put (h->neighbours,
400 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
406 * Iterator over hash map entries, for deleting state of a neighbour.
408 * @param cls the `struct GNUNET_TRANSPORT_Handle *`
409 * @param key peer identity
410 * @param value value in the hash map, the neighbour entry to delete
411 * @return #GNUNET_YES if we should continue to
416 neighbour_delete (void *cls,
417 const struct GNUNET_PeerIdentity *key,
420 struct GNUNET_TRANSPORT_Handle *handle = cls;
421 struct Neighbour *n = value;
423 LOG (GNUNET_ERROR_TYPE_DEBUG,
424 "Dropping entry for neighbour `%s'.\n",
426 GNUNET_BANDWIDTH_tracker_notification_stop (&n->out_tracker);
427 if (NULL != handle->nd_cb)
428 handle->nd_cb (handle->cls,
430 if (NULL != n->unready_warn_task)
432 GNUNET_SCHEDULER_cancel (n->unready_warn_task);
433 n->unready_warn_task = NULL;
435 GNUNET_assert (NULL == n->th);
436 GNUNET_assert (NULL == n->hn);
437 GNUNET_assert (GNUNET_YES ==
438 GNUNET_CONTAINER_multipeermap_remove (handle->neighbours,
447 * Generic error handler, called with the appropriate
448 * error code and the same closure specified at the creation of
450 * Not every message queue implementation supports an error handler.
452 * @param cls closure with the `struct GNUNET_TRANSPORT_Handle *`
453 * @param error error code
456 mq_error_handler (void *cls,
457 enum GNUNET_MQ_Error error)
459 struct GNUNET_TRANSPORT_Handle *h = cls;
461 LOG (GNUNET_ERROR_TYPE_DEBUG,
462 "Error receiving from transport service, disconnecting temporarily.\n");
463 h->reconnecting = GNUNET_YES;
464 disconnect_and_schedule_reconnect (h);
469 * Function we use for checking incoming HELLO messages.
471 * @param cls closure, a `struct GNUNET_TRANSPORT_Handle *`
472 * @param msg message received
473 * @return #GNUNET_OK if message is well-formed
476 check_hello (void *cls,
477 const struct GNUNET_MessageHeader *msg)
479 struct GNUNET_PeerIdentity me;
482 GNUNET_HELLO_get_id ((const struct GNUNET_HELLO_Message *) msg,
486 return GNUNET_SYSERR;
488 LOG (GNUNET_ERROR_TYPE_DEBUG,
489 "Receiving (my own) HELLO message (%u bytes), I am `%s'.\n",
490 (unsigned int) ntohs (msg->size),
497 * Function we use for handling incoming HELLO messages.
499 * @param cls closure, a `struct GNUNET_TRANSPORT_Handle *`
500 * @param msg message received
503 handle_hello (void *cls,
504 const struct GNUNET_MessageHeader *msg)
506 struct GNUNET_TRANSPORT_Handle *h = cls;
508 GNUNET_free_non_null (h->my_hello);
509 h->my_hello = GNUNET_copy_message (msg);
514 * Function we use for handling incoming connect messages.
516 * @param cls closure, a `struct GNUNET_TRANSPORT_Handle *`
517 * @param cim message received
520 handle_connect (void *cls,
521 const struct ConnectInfoMessage *cim)
523 struct GNUNET_TRANSPORT_Handle *h = cls;
526 LOG (GNUNET_ERROR_TYPE_DEBUG,
527 "Receiving CONNECT message for `%s'.\n",
528 GNUNET_i2s (&cim->id));
529 n = neighbour_find (h, &cim->id);
533 h->reconnecting = GNUNET_YES;
534 disconnect_and_schedule_reconnect (h);
537 n = neighbour_add (h,
539 LOG (GNUNET_ERROR_TYPE_DEBUG,
540 "Receiving CONNECT message for `%s' with quota %u\n",
541 GNUNET_i2s (&cim->id),
542 ntohl (cim->quota_out.value__));
543 GNUNET_BANDWIDTH_tracker_update_quota (&n->out_tracker,
545 if (NULL != h->nc_cb)
552 * Function we use for handling incoming disconnect messages.
554 * @param cls closure, a `struct GNUNET_TRANSPORT_Handle *`
555 * @param dim message received
558 handle_disconnect (void *cls,
559 const struct DisconnectInfoMessage *dim)
561 struct GNUNET_TRANSPORT_Handle *h = cls;
564 GNUNET_break (ntohl (dim->reserved) == 0);
565 LOG (GNUNET_ERROR_TYPE_DEBUG,
566 "Receiving DISCONNECT message for `%s'.\n",
567 GNUNET_i2s (&dim->peer));
568 n = neighbour_find (h, &dim->peer);
572 h->reconnecting = GNUNET_YES;
573 disconnect_and_schedule_reconnect (h);
583 * Function we use for handling incoming send-ok messages.
585 * @param cls closure, a `struct GNUNET_TRANSPORT_Handle *`
586 * @param okm message received
589 handle_send_ok (void *cls,
590 const struct SendOkMessage *okm)
592 struct GNUNET_TRANSPORT_Handle *h = cls;
595 uint32_t bytes_physical;
597 bytes_msg = ntohl (okm->bytes_msg);
598 bytes_physical = ntohl (okm->bytes_physical);
599 LOG (GNUNET_ERROR_TYPE_DEBUG,
600 "Receiving SEND_OK message, transmission to %s %s.\n",
601 GNUNET_i2s (&okm->peer),
602 ntohl (okm->success) == GNUNET_OK ? "succeeded" : "failed");
604 n = neighbour_find (h,
608 /* We should never get a 'SEND_OK' for a peer that we are not
611 h->reconnecting = GNUNET_YES;
612 disconnect_and_schedule_reconnect (h);
615 if (bytes_physical > bytes_msg)
617 LOG (GNUNET_ERROR_TYPE_DEBUG,
618 "Overhead for %u byte message was %u\n",
620 bytes_physical - bytes_msg);
621 n->traffic_overhead += bytes_physical - bytes_msg;
623 GNUNET_break (GNUNET_NO == n->is_ready);
624 n->is_ready = GNUNET_YES;
625 if (NULL != n->unready_warn_task)
627 GNUNET_SCHEDULER_cancel (n->unready_warn_task);
628 n->unready_warn_task = NULL;
630 if ((NULL != n->th) && (NULL == n->hn))
632 GNUNET_assert (NULL != n->th->timeout_task);
633 GNUNET_SCHEDULER_cancel (n->th->timeout_task);
634 n->th->timeout_task = NULL;
635 /* we've been waiting for this (congestion, not quota,
636 * caused delayed transmission) */
637 n->hn = GNUNET_CONTAINER_heap_insert (h->ready_heap,
641 schedule_transmission (h);
646 * Function we use for checking incoming "inbound" messages.
648 * @param cls closure, a `struct GNUNET_TRANSPORT_Handle *`
649 * @param im message received
652 check_recv (void *cls,
653 const struct InboundMessage *im)
655 const struct GNUNET_MessageHeader *imm;
658 size = ntohs (im->header.size);
660 sizeof (struct InboundMessage) + sizeof (struct GNUNET_MessageHeader))
663 return GNUNET_SYSERR;
665 imm = (const struct GNUNET_MessageHeader *) &im[1];
666 if (ntohs (imm->size) + sizeof (struct InboundMessage) != size)
669 return GNUNET_SYSERR;
676 * Function we use for handling incoming messages.
678 * @param cls closure, a `struct GNUNET_TRANSPORT_Handle *`
679 * @param im message received
682 handle_recv (void *cls,
683 const struct InboundMessage *im)
685 struct GNUNET_TRANSPORT_Handle *h = cls;
686 const struct GNUNET_MessageHeader *imm
687 = (const struct GNUNET_MessageHeader *) &im[1];
690 LOG (GNUNET_ERROR_TYPE_DEBUG,
691 "Received message of type %u with %u bytes from `%s'.\n",
692 (unsigned int) ntohs (imm->type),
693 (unsigned int) ntohs (imm->size),
694 GNUNET_i2s (&im->peer));
695 n = neighbour_find (h, &im->peer);
699 h->reconnecting = GNUNET_YES;
700 disconnect_and_schedule_reconnect (h);
711 * Function we use for handling incoming set quota messages.
713 * @param cls closure, a `struct GNUNET_TRANSPORT_Handle *`
714 * @param msg message received
717 handle_set_quota (void *cls,
718 const struct QuotaSetMessage *qm)
720 struct GNUNET_TRANSPORT_Handle *h = cls;
723 n = neighbour_find (h, &qm->peer);
727 h->reconnecting = GNUNET_YES;
728 disconnect_and_schedule_reconnect (h);
731 LOG (GNUNET_ERROR_TYPE_DEBUG,
732 "Receiving SET_QUOTA message for `%s' with quota %u\n",
733 GNUNET_i2s (&qm->peer),
734 ntohl (qm->quota.value__));
735 GNUNET_BANDWIDTH_tracker_update_quota (&n->out_tracker,
741 * A transmission request could not be satisfied because of
742 * network congestion. Notify the initiator and clean up.
744 * @param cls the `struct GNUNET_TRANSPORT_TransmitHandle`
747 timeout_request_due_to_congestion (void *cls)
749 struct GNUNET_TRANSPORT_TransmitHandle *th = cls;
750 struct Neighbour *n = th->neighbour;
751 struct GNUNET_TIME_Relative delay;
753 n->th->timeout_task = NULL;
754 delay = GNUNET_TIME_absolute_get_duration (th->request_start);
755 LOG (GNUNET_ERROR_TYPE_WARNING,
756 "Discarding %u bytes of payload message after %s delay due to congestion\n",
758 GNUNET_STRINGS_relative_time_to_string (delay,
760 GNUNET_assert (th == n->th);
761 GNUNET_assert (NULL == n->hn);
763 th->notify (th->notify_cls,
771 * Transmit ready message(s) to service.
773 * @param h handle to transport
776 transmit_ready (struct GNUNET_TRANSPORT_Handle *h)
778 struct GNUNET_TRANSPORT_TransmitHandle *th;
779 struct GNUNET_TIME_Relative delay;
781 struct OutboundMessage *obm;
782 struct GNUNET_MQ_Envelope *env;
785 GNUNET_assert (NULL != h->mq);
786 while (NULL != (n = GNUNET_CONTAINER_heap_peek (h->ready_heap)))
789 if (GNUNET_YES != n->is_ready)
791 /* peer not ready, wait for notification! */
792 GNUNET_assert (n == GNUNET_CONTAINER_heap_remove_root (h->ready_heap));
794 GNUNET_assert (NULL == n->th->timeout_task);
796 = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_absolute_get_remaining
798 &timeout_request_due_to_congestion,
802 if (GNUNET_BANDWIDTH_tracker_get_delay (&n->out_tracker,
803 th->notify_size).rel_value_us > 0)
804 break; /* too early */
805 GNUNET_assert (n == GNUNET_CONTAINER_heap_remove_root (h->ready_heap));
808 env = GNUNET_MQ_msg_extra (obm,
810 GNUNET_MESSAGE_TYPE_TRANSPORT_SEND);
811 mret = th->notify (th->notify_cls,
817 GNUNET_MQ_discard (env);
820 obm->header.size = htons (mret + sizeof (*obm));
821 if (NULL != n->unready_warn_task)
823 = GNUNET_SCHEDULER_add_delayed (UNREADY_WARN_TIME,
826 n->last_payload = GNUNET_TIME_absolute_get ();
827 n->is_ready = GNUNET_NO;
828 obm->reserved = htonl (0);
830 GNUNET_TIME_relative_hton (GNUNET_TIME_absolute_get_remaining
833 GNUNET_MQ_send (h->mq,
835 GNUNET_BANDWIDTH_tracker_consume (&n->out_tracker,
837 delay = GNUNET_TIME_absolute_get_duration (th->request_start);
838 if (delay.rel_value_us > GNUNET_CONSTANTS_LATENCY_WARN.rel_value_us)
839 LOG (GNUNET_ERROR_TYPE_WARNING,
840 "Added %u bytes of payload message for %s after %s delay at %u b/s\n",
843 GNUNET_STRINGS_relative_time_to_string (delay,
845 (unsigned int) n->out_tracker.available_bytes_per_s__);
847 LOG (GNUNET_ERROR_TYPE_DEBUG,
848 "Added %u bytes of payload message for %s after %s delay at %u b/s\n",
851 GNUNET_STRINGS_relative_time_to_string (delay,
853 (unsigned int) n->out_tracker.available_bytes_per_s__);
856 /* if there are more pending messages, try to schedule those */
857 schedule_transmission (h);
862 * Schedule the task to send one message, either from the control
863 * list or the peer message queues to the service.
865 * @param cls transport service to schedule a transmission for
868 schedule_transmission_task (void *cls)
870 struct GNUNET_TRANSPORT_Handle *h = cls;
871 struct GNUNET_TRANSPORT_TransmitHandle *th;
874 h->quota_task = NULL;
875 GNUNET_assert (NULL != h->mq);
876 /* destroy all requests that have timed out */
877 while ( (NULL != (n = GNUNET_CONTAINER_heap_peek (h->ready_heap))) &&
878 (0 == GNUNET_TIME_absolute_get_remaining (n->th->timeout).rel_value_us) )
880 /* notify client that the request could not be satisfied within
881 * the given time constraints */
884 GNUNET_assert (n == GNUNET_CONTAINER_heap_remove_root (h->ready_heap));
886 LOG (GNUNET_ERROR_TYPE_DEBUG,
887 "Signalling timeout for transmission to peer %s due to congestion\n",
888 GNUNET_i2s (&n->id));
889 GNUNET_assert (0 == th->notify (th->notify_cls,
894 n = GNUNET_CONTAINER_heap_peek (h->ready_heap);
896 return; /* no pending messages */
897 LOG (GNUNET_ERROR_TYPE_DEBUG,
898 "Calling notify_transmit_ready\n");
904 * Schedule the task to send one message, either from the control
905 * list or the peer message queues to the service.
907 * @param h transport service to schedule a transmission for
910 schedule_transmission (struct GNUNET_TRANSPORT_Handle *h)
912 struct GNUNET_TIME_Relative delay;
915 GNUNET_assert (NULL != h->mq);
916 if (NULL != h->quota_task)
918 GNUNET_SCHEDULER_cancel (h->quota_task);
919 h->quota_task = NULL;
921 if (NULL != (n = GNUNET_CONTAINER_heap_peek (h->ready_heap)))
924 GNUNET_BANDWIDTH_tracker_get_delay (&n->out_tracker,
925 n->th->notify_size + n->traffic_overhead);
926 n->traffic_overhead = 0;
930 LOG (GNUNET_ERROR_TYPE_DEBUG,
931 "No work to be done, not scheduling transmission.\n");
932 return; /* no work to be done */
934 LOG (GNUNET_ERROR_TYPE_DEBUG,
935 "Scheduling next transmission to service in %s\n",
936 GNUNET_STRINGS_relative_time_to_string (delay,
939 GNUNET_SCHEDULER_add_delayed (delay,
940 &schedule_transmission_task,
946 * Try again to connect to transport service.
948 * @param cls the handle to the transport service
951 reconnect (void *cls)
953 GNUNET_MQ_hd_var_size (hello,
954 GNUNET_MESSAGE_TYPE_HELLO,
955 struct GNUNET_MessageHeader);
956 GNUNET_MQ_hd_fixed_size (connect,
957 GNUNET_MESSAGE_TYPE_TRANSPORT_CONNECT,
958 struct ConnectInfoMessage);
959 GNUNET_MQ_hd_fixed_size (disconnect,
960 GNUNET_MESSAGE_TYPE_TRANSPORT_DISCONNECT,
961 struct DisconnectInfoMessage);
962 GNUNET_MQ_hd_fixed_size (send_ok,
963 GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK,
964 struct SendOkMessage);
965 GNUNET_MQ_hd_var_size (recv,
966 GNUNET_MESSAGE_TYPE_TRANSPORT_RECV,
967 struct InboundMessage);
968 GNUNET_MQ_hd_fixed_size (set_quota,
969 GNUNET_MESSAGE_TYPE_TRANSPORT_SET_QUOTA,
970 struct QuotaSetMessage);
971 struct GNUNET_TRANSPORT_Handle *h = cls;
972 struct GNUNET_MQ_MessageHandler handlers[] = {
973 make_hello_handler (h),
974 make_connect_handler (h),
975 make_disconnect_handler (h),
976 make_send_ok_handler (h),
977 make_recv_handler (h),
978 make_set_quota_handler (h),
979 GNUNET_MQ_handler_end ()
981 struct GNUNET_MQ_Envelope *env;
982 struct StartMessage *s;
985 h->reconnect_task = NULL;
986 LOG (GNUNET_ERROR_TYPE_DEBUG,
987 "Connecting to transport service.\n");
988 GNUNET_assert (NULL == h->mq);
989 h->reconnecting = GNUNET_NO;
990 h->mq = GNUNET_CLIENT_connecT (h->cfg,
997 env = GNUNET_MQ_msg (s,
998 GNUNET_MESSAGE_TYPE_TRANSPORT_START);
1004 s->options = htonl (options);
1006 GNUNET_MQ_send (h->mq,
1012 * Function that will schedule the job that will try
1013 * to connect us again to the client.
1015 * @param h transport service to reconnect
1018 disconnect_and_schedule_reconnect (struct GNUNET_TRANSPORT_Handle *h)
1020 GNUNET_assert (NULL == h->reconnect_task);
1023 GNUNET_MQ_destroy (h->mq);
1026 /* Forget about all neighbours that we used to be connected to */
1027 GNUNET_CONTAINER_multipeermap_iterate (h->neighbours,
1030 if (NULL != h->quota_task)
1032 GNUNET_SCHEDULER_cancel (h->quota_task);
1033 h->quota_task = NULL;
1035 LOG (GNUNET_ERROR_TYPE_DEBUG,
1036 "Scheduling task to reconnect to transport service in %s.\n",
1037 GNUNET_STRINGS_relative_time_to_string (h->reconnect_delay,
1040 GNUNET_SCHEDULER_add_delayed (h->reconnect_delay,
1043 h->reconnect_delay = GNUNET_TIME_STD_BACKOFF (h->reconnect_delay);
1048 * Set transport metrics for a peer and a direction.
1050 * @param handle transport handle
1051 * @param peer the peer to set the metric for
1052 * @param prop the performance metrics to set
1053 * @param delay_in inbound delay to introduce
1054 * @param delay_out outbound delay to introduce
1056 * Note: Delay restrictions in receiving direction will be enforced
1057 * with one message delay.
1060 GNUNET_TRANSPORT_set_traffic_metric (struct GNUNET_TRANSPORT_Handle *handle,
1061 const struct GNUNET_PeerIdentity *peer,
1062 const struct GNUNET_ATS_Properties *prop,
1063 struct GNUNET_TIME_Relative delay_in,
1064 struct GNUNET_TIME_Relative delay_out)
1066 struct GNUNET_MQ_Envelope *env;
1067 struct TrafficMetricMessage *msg;
1069 if (NULL == handle->mq)
1071 env = GNUNET_MQ_msg (msg,
1072 GNUNET_MESSAGE_TYPE_TRANSPORT_TRAFFIC_METRIC);
1073 msg->reserved = htonl (0);
1075 GNUNET_ATS_properties_hton (&msg->properties,
1077 msg->delay_in = GNUNET_TIME_relative_hton (delay_in);
1078 msg->delay_out = GNUNET_TIME_relative_hton (delay_out);
1079 GNUNET_MQ_send (handle->mq,
1085 * Checks if a given peer is connected to us
1087 * @param handle connection to transport service
1088 * @param peer the peer to check
1089 * @return #GNUNET_YES (connected) or #GNUNET_NO (disconnected)
1092 GNUNET_TRANSPORT_check_peer_connected (struct GNUNET_TRANSPORT_Handle *handle,
1093 const struct GNUNET_PeerIdentity *peer)
1096 GNUNET_CONTAINER_multipeermap_contains (handle->neighbours,
1104 * Connect to the transport service. Note that the connection may
1105 * complete (or fail) asynchronously.
1107 * @param cfg configuration to use
1108 * @param self our own identity (API should check that it matches
1109 * the identity found by transport), or NULL (no check)
1110 * @param cls closure for the callbacks
1111 * @param rec receive function to call
1112 * @param nc function to call on connect events
1113 * @param nd function to call on disconnect events
1114 * @return NULL on error
1116 struct GNUNET_TRANSPORT_Handle *
1117 GNUNET_TRANSPORT_connect (const struct GNUNET_CONFIGURATION_Handle *cfg,
1118 const struct GNUNET_PeerIdentity *self,
1120 GNUNET_TRANSPORT_ReceiveCallback rec,
1121 GNUNET_TRANSPORT_NotifyConnect nc,
1122 GNUNET_TRANSPORT_NotifyDisconnect nd)
1124 return GNUNET_TRANSPORT_connect2 (cfg,
1135 * Connect to the transport service. Note that the connection may
1136 * complete (or fail) asynchronously.
1138 * @param cfg configuration to use
1139 * @param self our own identity (API should check that it matches
1140 * the identity found by transport), or NULL (no check)
1141 * @param cls closure for the callbacks
1142 * @param rec receive function to call
1143 * @param nc function to call on connect events
1144 * @param nd function to call on disconnect events
1145 * @param neb function to call if we have excess bandwidth to a peer
1146 * @return NULL on error
1148 struct GNUNET_TRANSPORT_Handle *
1149 GNUNET_TRANSPORT_connect2 (const struct GNUNET_CONFIGURATION_Handle *cfg,
1150 const struct GNUNET_PeerIdentity *self,
1152 GNUNET_TRANSPORT_ReceiveCallback rec,
1153 GNUNET_TRANSPORT_NotifyConnect nc,
1154 GNUNET_TRANSPORT_NotifyDisconnect nd,
1155 GNUNET_TRANSPORT_NotifyExcessBandwidth neb)
1157 struct GNUNET_TRANSPORT_Handle *h;
1159 h = GNUNET_new (struct GNUNET_TRANSPORT_Handle);
1163 h->check_self = GNUNET_YES;
1171 h->reconnect_delay = GNUNET_TIME_UNIT_ZERO;
1172 LOG (GNUNET_ERROR_TYPE_DEBUG,
1173 "Connecting to transport service.\n");
1181 GNUNET_CONTAINER_multipeermap_create (STARTING_NEIGHBOURS_SIZE,
1184 GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
1190 * Disconnect from the transport service.
1192 * @param handle handle to the service as returned from #GNUNET_TRANSPORT_connect()
1195 GNUNET_TRANSPORT_disconnect (struct GNUNET_TRANSPORT_Handle *handle)
1197 LOG (GNUNET_ERROR_TYPE_DEBUG,
1198 "Transport disconnect called!\n");
1199 /* this disconnects all neighbours... */
1200 if (NULL == handle->reconnect_task)
1201 disconnect_and_schedule_reconnect (handle);
1202 /* and now we stop trying to connect again... */
1203 if (NULL != handle->reconnect_task)
1205 GNUNET_SCHEDULER_cancel (handle->reconnect_task);
1206 handle->reconnect_task = NULL;
1208 GNUNET_CONTAINER_multipeermap_destroy (handle->neighbours);
1209 handle->neighbours = NULL;
1210 if (NULL != handle->quota_task)
1212 GNUNET_SCHEDULER_cancel (handle->quota_task);
1213 handle->quota_task = NULL;
1215 GNUNET_free_non_null (handle->my_hello);
1216 handle->my_hello = NULL;
1217 GNUNET_CONTAINER_heap_destroy (handle->ready_heap);
1218 handle->ready_heap = NULL;
1219 GNUNET_free (handle);
1224 * Check if we could queue a message of the given size for
1225 * transmission. The transport service will take both its
1226 * internal buffers and bandwidth limits imposed by the
1227 * other peer into consideration when answering this query.
1229 * @param handle connection to transport service
1230 * @param target who should receive the message
1231 * @param size how big is the message we want to transmit?
1232 * @param timeout after how long should we give up (and call
1233 * notify with buf NULL and size 0)?
1234 * @param notify function to call when we are ready to
1235 * send such a message
1236 * @param notify_cls closure for @a notify
1237 * @return NULL if someone else is already waiting to be notified
1238 * non-NULL if the notify callback was queued (can be used to cancel
1239 * using #GNUNET_TRANSPORT_notify_transmit_ready_cancel)
1241 struct GNUNET_TRANSPORT_TransmitHandle *
1242 GNUNET_TRANSPORT_notify_transmit_ready (struct GNUNET_TRANSPORT_Handle *handle,
1243 const struct GNUNET_PeerIdentity *target,
1245 struct GNUNET_TIME_Relative timeout,
1246 GNUNET_TRANSPORT_TransmitReadyNotify notify,
1249 struct Neighbour *n;
1250 struct GNUNET_TRANSPORT_TransmitHandle *th;
1251 struct GNUNET_TIME_Relative delay;
1253 n = neighbour_find (handle, target);
1256 /* only use this function
1257 * once a connection has been established */
1263 /* attempt to send two messages at the same time to the same peer */
1267 GNUNET_assert (NULL == n->hn);
1268 th = GNUNET_new (struct GNUNET_TRANSPORT_TransmitHandle);
1270 th->notify = notify;
1271 th->notify_cls = notify_cls;
1272 th->request_start = GNUNET_TIME_absolute_get ();
1273 th->timeout = GNUNET_TIME_relative_to_absolute (timeout);
1274 th->notify_size = size;
1276 /* calculate when our transmission should be ready */
1277 delay = GNUNET_BANDWIDTH_tracker_get_delay (&n->out_tracker,
1278 size + n->traffic_overhead);
1279 n->traffic_overhead = 0;
1280 if (delay.rel_value_us > timeout.rel_value_us)
1281 delay.rel_value_us = 0; /* notify immediately (with failure) */
1282 if (delay.rel_value_us > GNUNET_CONSTANTS_LATENCY_WARN.rel_value_us)
1283 LOG (GNUNET_ERROR_TYPE_WARNING,
1284 "At bandwidth %u byte/s next transmission to %s in %s\n",
1285 (unsigned int) n->out_tracker.available_bytes_per_s__,
1286 GNUNET_i2s (target),
1287 GNUNET_STRINGS_relative_time_to_string (delay,
1290 LOG (GNUNET_ERROR_TYPE_DEBUG,
1291 "At bandwidth %u byte/s next transmission to %s in %s\n",
1292 (unsigned int) n->out_tracker.available_bytes_per_s__,
1293 GNUNET_i2s (target),
1294 GNUNET_STRINGS_relative_time_to_string (delay,
1296 n->hn = GNUNET_CONTAINER_heap_insert (handle->ready_heap,
1298 delay.rel_value_us);
1299 schedule_transmission (handle);
1305 * Cancel the specified transmission-ready notification.
1307 * @param th handle returned from #GNUNET_TRANSPORT_notify_transmit_ready()
1310 GNUNET_TRANSPORT_notify_transmit_ready_cancel (struct GNUNET_TRANSPORT_TransmitHandle *th)
1312 struct Neighbour *n;
1314 GNUNET_assert (NULL == th->next);
1315 GNUNET_assert (NULL == th->prev);
1317 GNUNET_assert (th == n->th);
1321 GNUNET_CONTAINER_heap_remove_node (n->hn);
1326 GNUNET_assert (NULL != th->timeout_task);
1327 GNUNET_SCHEDULER_cancel (th->timeout_task);
1328 th->timeout_task = NULL;
1334 /* end of transport_api.c */