X-Git-Url: https://git.librecmc.org/?a=blobdiff_plain;f=src%2Ftransport%2Ftransport_api.c;h=8d08f82d9d3f516796604a662dcc9b977b3992af;hb=b547897ce3fe3ae2a129afb36bf26eadfb695eef;hp=f7ce56afc0e95d42bd4e70e6b4ccf1bbf44674bc;hpb=9c4a3cd49e5bcf2ba9995c0e344498060ac0a466;p=oweals%2Fgnunet.git diff --git a/src/transport/transport_api.c b/src/transport/transport_api.c index f7ce56afc..8d08f82d9 100644 --- a/src/transport/transport_api.c +++ b/src/transport/transport_api.c @@ -43,6 +43,12 @@ */ #define OFFER_HELLO_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 30) +/** + * After how long do we give automatically retry an unsuccessful + * CONNECT request? + */ +#define CONNECT_RETRY_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS, 750) + /** * How long should ARM wait when starting up the * transport service before reporting back? @@ -74,7 +80,6 @@ struct NeighbourList */ struct GNUNET_TRANSPORT_TransmitHandle *transmit_handle; - /** * Identity of this neighbour. */ @@ -92,7 +97,7 @@ struct NeighbourList uint64_t last_sent; /** - * Global quota for outbound traffic to the neighbour in bytes/ms. + * Quota for outbound traffic to the neighbour in bytes/ms. */ uint32_t quota_out; @@ -198,7 +203,7 @@ struct GNUNET_TRANSPORT_TransmitHandle * Function to call when notify_size bytes are available * for transmission. */ - GNUNET_NETWORK_TransmitReadyNotify notify; + GNUNET_CONNECTION_TransmitReadyNotify notify; /** * Closure for notify. @@ -206,9 +211,11 @@ struct GNUNET_TRANSPORT_TransmitHandle void *notify_cls; /** - * transmit_ready task Id. The task is used to introduce - * the artificial delay that may be required to maintain - * the bandwidth limits. + * transmit_ready task Id. The task is used to introduce the + * artificial delay that may be required to maintain the bandwidth + * limits. Later, this will be the ID of the "transmit_timeout" + * task which is used to signal a timeout if the transmission could + * not be done in a timely fashion. */ GNUNET_SCHEDULER_TaskIdentifier notify_delay_task; @@ -222,6 +229,11 @@ struct GNUNET_TRANSPORT_TransmitHandle */ size_t notify_size; + /** + * How important is this message? + */ + unsigned int priority; + }; @@ -266,7 +278,7 @@ struct GNUNET_TRANSPORT_Handle /** * Handle to our registration with the client for notification. */ - struct GNUNET_NETWORK_TransmitHandle *network_handle; + struct GNUNET_CONNECTION_TransmitHandle *network_handle; /** * Linked list of transmit handles that are waiting for the @@ -301,7 +313,7 @@ struct GNUNET_TRANSPORT_Handle /** * My configuration. */ - struct GNUNET_CONFIGURATION_Handle *cfg; + const struct GNUNET_CONFIGURATION_Handle *cfg; /** * Linked list of the current neighbours of this peer. @@ -361,14 +373,14 @@ transport_notify_ready (void *cls, size_t size, void *buf) size_t ret; char *cbuf; -#if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Ready to transmit %u bytes to transport service\n", size); -#endif h->network_handle = NULL; h->transmission_scheduled = GNUNET_NO; if (buf == NULL) { +#if DEBUG_TRANSPORT + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Could not transmit to transport service, cancelling pending requests\n"); +#endif th = h->connect_ready_head; if (th->next != NULL) th->next->prev = NULL; @@ -378,10 +390,21 @@ transport_notify_ready (void *cls, size_t size, void *buf) GNUNET_assert (n->transmit_handle == th); n->transmit_handle = NULL; } + if (th->notify_delay_task != GNUNET_SCHEDULER_NO_TASK) + { + GNUNET_SCHEDULER_cancel (h->sched, + th->notify_delay_task); + th->notify_delay_task = GNUNET_SCHEDULER_NO_TASK; + } GNUNET_assert (0 == th->notify (th->notify_cls, 0, NULL)); GNUNET_free (th); + if (h->connect_ready_head != NULL) schedule_transmission (h); /* FIXME: is this ok? */ return 0; - } + } +#if DEBUG_TRANSPORT + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Ready to transmit %u bytes to transport service\n", size); +#endif cbuf = buf; ret = 0; h->network_handle = NULL; @@ -389,6 +412,12 @@ transport_notify_ready (void *cls, size_t size, void *buf) do { th = h->connect_ready_head; + if (th->notify_delay_task != GNUNET_SCHEDULER_NO_TASK) + { + GNUNET_SCHEDULER_cancel (h->sched, + th->notify_delay_task); + th->notify_delay_task = GNUNET_SCHEDULER_NO_TASK; + } GNUNET_assert (th->notify_size <= size); if (th->next != NULL) th->next->prev = NULL; @@ -428,27 +457,22 @@ schedule_transmission (struct GNUNET_TRANSPORT_Handle *h) GNUNET_assert (NULL == h->network_handle); if (h->client == NULL) { -#if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Not yet connected to transport service, need to wait.\n"); -#endif - return; + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Could not yet schedule transmission: we are not yet connected to the transport service!\n"); + return; /* not yet connected */ } th = h->connect_ready_head; if (th == NULL) + return; /* no request pending */ + if (th->notify_delay_task != GNUNET_SCHEDULER_NO_TASK) { -#if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Schedule transmission called, but no request is pending.\n"); -#endif - return; + /* remove existing time out task, will be integrated + with transmit_ready notification! */ + GNUNET_SCHEDULER_cancel (h->sched, + th->notify_delay_task); + th->notify_delay_task = GNUNET_SCHEDULER_NO_TASK; } h->transmission_scheduled = GNUNET_YES; -#if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Asking client API for transmission of %u bytes\n", - th->notify_size); -#endif h->network_handle = GNUNET_CLIENT_notify_transmit_ready (h->client, th->notify_size, GNUNET_TIME_absolute_get_remaining @@ -498,6 +522,78 @@ insert_transmit_handle (struct GNUNET_TRANSPORT_TransmitHandle **head, } +/** + * Cancel a pending notify delay task (if pending) and also remove the + * given transmit handle from whatever list is on. + * + * @param th handle for the transmission request to manipulate + */ +static void +remove_from_any_list (struct GNUNET_TRANSPORT_TransmitHandle *th) +{ + struct GNUNET_TRANSPORT_Handle *h; + + h = th->handle; + if (th->notify_delay_task != GNUNET_SCHEDULER_NO_TASK) + { + GNUNET_SCHEDULER_cancel (h->sched, th->notify_delay_task); + th->notify_delay_task = GNUNET_SCHEDULER_NO_TASK; + } + if (th->prev == NULL) + { + if (th == h->connect_wait_head) + h->connect_wait_head = th->next; + else + h->connect_ready_head = th->next; + } + else + { + th->prev->next = th->next; + } + if (th->next != NULL) + th->next->prev = th->prev; +} + + +/** + * Schedule a request to connect to the given + * neighbour (and if successful, add the specified + * handle to the wait list). + * + * @param th handle for a request to transmit once we + * have connected + */ +static void +try_connect (struct GNUNET_TRANSPORT_TransmitHandle *th); + + +/** + * Called when our transmit request timed out before any transport + * reported success connecting to the desired peer or before the + * transport was ready to receive. Signal error and free + * TransmitHandle. + */ +static void +peer_transmit_timeout (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) +{ + struct GNUNET_TRANSPORT_TransmitHandle *th = cls; + + th->notify_delay_task = GNUNET_SCHEDULER_NO_TASK; + if (th->neighbour != NULL) + th->neighbour->transmit_handle = NULL; +#if DEBUG_TRANSPORT + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Request for transmission to peer `%s' timed out.\n", + GNUNET_i2s(&th->target)); +#endif + remove_from_any_list (th); + th->notify (th->notify_cls, 0, NULL); + GNUNET_free (th); +} + + + + /** * Queue control request for transmission to the transport * service. @@ -514,20 +610,24 @@ schedule_control_transmit (struct GNUNET_TRANSPORT_Handle *h, size_t size, int at_head, struct GNUNET_TIME_Relative timeout, - GNUNET_NETWORK_TransmitReadyNotify notify, + GNUNET_CONNECTION_TransmitReadyNotify notify, void *notify_cls) { struct GNUNET_TRANSPORT_TransmitHandle *th; -#if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Queueing %u bytes control transmission request.\n", size); -#endif + th = GNUNET_malloc (sizeof (struct GNUNET_TRANSPORT_TransmitHandle)); th->handle = h; th->notify = notify; th->notify_cls = notify_cls; th->timeout = GNUNET_TIME_relative_to_absolute (timeout); th->notify_size = size; + th->notify_delay_task + = GNUNET_SCHEDULER_add_delayed (h->sched, + GNUNET_NO, + GNUNET_SCHEDULER_PRIORITY_KEEP, + GNUNET_SCHEDULER_NO_TASK, + timeout, + &peer_transmit_timeout, th); if (at_head) { th->next = h->connect_ready_head; @@ -690,6 +790,27 @@ hello_wait_timeout (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) struct HelloWaitList *pos; struct HelloWaitList *prev; + hwl->task = GNUNET_SCHEDULER_NO_TASK; + if (GNUNET_TIME_absolute_get_remaining (hwl->timeout).value > 0) + { +#if DEBUG_TRANSPORT + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + _("First attempt to obtain `%s' from transport service failed, will try again for %llums.\n"), + "HELLO", + GNUNET_TIME_absolute_get_remaining (hwl->timeout).value); +#endif + hwl->task = GNUNET_SCHEDULER_add_delayed (hwl->handle->sched, + GNUNET_YES, + GNUNET_SCHEDULER_PRIORITY_KEEP, + GNUNET_SCHEDULER_NO_TASK, + GNUNET_TIME_absolute_get_remaining (hwl->timeout), + &hello_wait_timeout, hwl); + return; + } + /* signal timeout */ + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + _("Timeout trying to obtain `%s' from transport service.\n"), + "HELLO"); prev = NULL; pos = hwl->handle->hwl_head; while (pos != hwl) @@ -702,10 +823,6 @@ hello_wait_timeout (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) hwl->handle->hwl_head = hwl->next; else prev->next = hwl->next; - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, - _("Timeout trying to obtain `%s' from transport service.\n"), - "HELLO"); - /* signal timeout */ if (hwl->rec != NULL) hwl->rec (hwl->rec_cls, GNUNET_TIME_UNIT_ZERO, NULL, NULL); GNUNET_free (hwl); @@ -745,7 +862,7 @@ GNUNET_TRANSPORT_get_hello (struct GNUNET_TRANSPORT_Handle *handle, hwl->task = GNUNET_SCHEDULER_add_delayed (handle->sched, GNUNET_YES, GNUNET_SCHEDULER_PRIORITY_KEEP, - GNUNET_SCHEDULER_NO_PREREQUISITE_TASK, + GNUNET_SCHEDULER_NO_TASK, timeout, &hello_wait_timeout, hwl); return; @@ -770,7 +887,7 @@ send_hello (void *cls, size_t size, void *buf) if (buf == NULL) { #if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Timeout while trying to transmit `%s' request.\n", "HELLO"); #endif @@ -807,7 +924,7 @@ GNUNET_TRANSPORT_offer_hello (struct GNUNET_TRANSPORT_Handle *handle, if (handle->client == NULL) { #if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Not connected to transport service, dropping offered HELLO\n"); #endif return; @@ -835,7 +952,14 @@ send_start (void *cls, size_t size, void *buf) struct GNUNET_MessageHeader *s = buf; if (buf == NULL) - return 0; + { +#if DEBUG_TRANSPORT + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Timeout while trying to transmit `%s' request.\n", + "START"); +#endif + return 0; + } #if DEBUG_TRANSPORT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Transmitting `%s' request.\n", "START"); @@ -847,6 +971,175 @@ send_start (void *cls, size_t size, void *buf) } +/** + * We're ready to transmit the request that the transport service + * should connect to a new peer. In addition to sending the + * request, schedule the next phase for the transmission processing + * that caused the connect request in the first place. + */ +static size_t +request_connect (void *cls, size_t size, void *buf) +{ + struct GNUNET_TRANSPORT_TransmitHandle *th = cls; + struct TryConnectMessage *tcm; + struct GNUNET_TRANSPORT_Handle *h; + + GNUNET_assert (th->notify_delay_task == GNUNET_SCHEDULER_NO_TASK); + h = th->handle; + if (buf == NULL) + { +#if DEBUG_TRANSPORT + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Failed to transmit `%s' request for `%4s' to service.\n", + "TRY_CONNECT", + GNUNET_i2s(&th->target)); +#endif + if (th->notify_delay_task != GNUNET_SCHEDULER_NO_TASK) + { + GNUNET_SCHEDULER_cancel (h->sched, th->notify_delay_task); + th->notify_delay_task = GNUNET_SCHEDULER_NO_TASK; + } + th->notify (th->notify_cls, 0, NULL); + GNUNET_free (th); + return 0; + } +#if DEBUG_TRANSPORT + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Transmitting `%s' message for `%4s'.\n", + "TRY_CONNECT", GNUNET_i2s (&th->target)); +#endif + GNUNET_assert (size >= sizeof (struct TryConnectMessage)); + tcm = buf; + tcm->header.size = htons (sizeof (struct TryConnectMessage)); + tcm->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_TRY_CONNECT); + tcm->reserved = htonl (0); + memcpy (&tcm->peer, &th->target, sizeof (struct GNUNET_PeerIdentity)); + th->notify_delay_task + = GNUNET_SCHEDULER_add_delayed (h->sched, + GNUNET_NO, + GNUNET_SCHEDULER_PRIORITY_KEEP, + GNUNET_SCHEDULER_NO_TASK, + GNUNET_TIME_absolute_get_remaining + (th->timeout), + &peer_transmit_timeout, th); + insert_transmit_handle (&h->connect_wait_head, th); + return sizeof (struct TryConnectMessage); +} + + +/** + * Schedule a request to connect to the given + * neighbour (and if successful, add the specified + * handle to the wait list). + * + * @param th handle for a request to transmit once we + * have connected + */ +static void +try_connect (struct GNUNET_TRANSPORT_TransmitHandle *th) +{ + GNUNET_assert (th->notify_delay_task == GNUNET_SCHEDULER_NO_TASK); + schedule_control_transmit (th->handle, + sizeof (struct TryConnectMessage), + GNUNET_NO, + GNUNET_TIME_absolute_get_remaining (th->timeout), + &request_connect, th); +} + + +/** + * Task for delayed attempts to reconnect to a peer. + * + * @param cls must be a transmit handle that determines the peer + * to which we will try to connect + * @param tc scheduler information about why we were triggered (not used) + */ +static void +try_connect_task (void *cls, + const struct GNUNET_SCHEDULER_TaskContext *tc) +{ + struct GNUNET_TRANSPORT_TransmitHandle *th = cls; + + th->notify_delay_task = GNUNET_SCHEDULER_NO_TASK; + try_connect (th); +} + + +/** + * Remove neighbour from our list. Will automatically + * trigger a re-connect attempt if we have messages pending + * for this peer. + * + * @param h our state + * @param peer the peer to remove + */ +static void +remove_neighbour (struct GNUNET_TRANSPORT_Handle *h, + const struct GNUNET_PeerIdentity *peer) +{ + struct NeighbourList *prev; + struct NeighbourList *pos; + struct GNUNET_TRANSPORT_TransmitHandle *th; + +#if DEBUG_TRANSPORT + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Removing neighbour `%s' from list of connected peers.\n", + GNUNET_i2s (peer)); +#endif + prev = NULL; + pos = h->neighbours; + while ((pos != NULL) && + (0 != memcmp (peer, + &pos->id, + sizeof (struct GNUNET_PeerIdentity)))) + { + prev = pos; + pos = pos->next; + } + if (pos == NULL) + { + GNUNET_break (0); + return; + } + if (prev == NULL) + h->neighbours = pos->next; + else + prev->next = pos->next; + if (NULL != (th = pos->transmit_handle)) + { + pos->transmit_handle = NULL; + th->neighbour = NULL; + remove_from_any_list (th); + if (GNUNET_TIME_absolute_get_remaining (th->timeout).value <= CONNECT_RETRY_TIMEOUT.value) + { + /* signal error */ + GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == th->notify_delay_task); + peer_transmit_timeout (th, NULL); + } + else + { + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + _("Connection with `%4s' failed, will keep trying for %llu ms to deliver message\n"), + GNUNET_i2s (peer), + GNUNET_TIME_absolute_get_remaining (th->timeout).value); + /* try again in a bit */ + GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == th->notify_delay_task); + th->notify_delay_task + = GNUNET_SCHEDULER_add_delayed (h->sched, + GNUNET_NO, + GNUNET_SCHEDULER_PRIORITY_KEEP, + GNUNET_SCHEDULER_NO_TASK, + CONNECT_RETRY_TIMEOUT, + &try_connect_task, + th); + } + } + if (h->nc_cb != NULL) + h->nd_cb (h->cls, peer); + GNUNET_free (pos); +} + + /** * Try again to connect to transport service. */ @@ -857,27 +1150,18 @@ reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) struct GNUNET_TRANSPORT_TransmitHandle *pos; struct NeighbourList *n; + fprintf (stderr, + "Trying to reconnect to transport!\n"); + + /* Forget about all neighbours that we used to be connected + to */ while (NULL != (n = h->neighbours)) - { - h->neighbours = n->next; - pos = n->transmit_handle; - if (pos != NULL) - { - pos->neighbour = NULL; - pos->next = h->connect_wait_head; - h->connect_wait_head = pos; - if (pos->next != NULL) - pos->next->prev = pos; - pos->prev = NULL; - } - GNUNET_free (n); - } - h->connect_ready_head = NULL; + remove_neighbour (h, &n->id); #if DEBUG_TRANSPORT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Connecting to transport service.\n"); #endif GNUNET_assert (h->client == NULL); - h->reconnect_task = GNUNET_SCHEDULER_NO_PREREQUISITE_TASK; + h->reconnect_task = GNUNET_SCHEDULER_NO_TASK; h->client = GNUNET_CLIENT_connect (h->sched, "transport", h->cfg); GNUNET_assert (h->client != NULL); /* make sure we don't send "START" twice, @@ -902,7 +1186,8 @@ reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) schedule_control_transmit (h, sizeof (struct GNUNET_MessageHeader), GNUNET_YES, - GNUNET_TIME_UNIT_FOREVER_REL, &send_start, NULL); + GNUNET_TIME_UNIT_FOREVER_REL, + &send_start, NULL); GNUNET_CLIENT_receive (h->client, &demultiplexer, h, GNUNET_TIME_UNIT_FOREVER_REL); } @@ -921,33 +1206,17 @@ schedule_reconnect (struct GNUNET_TRANSPORT_Handle *h) h->reconnect_delay.value); #endif GNUNET_assert (h->client == NULL); - GNUNET_assert (h->reconnect_task == GNUNET_SCHEDULER_NO_PREREQUISITE_TASK); + GNUNET_assert (h->reconnect_task == GNUNET_SCHEDULER_NO_TASK); h->reconnect_task = GNUNET_SCHEDULER_add_delayed (h->sched, GNUNET_NO, GNUNET_SCHEDULER_PRIORITY_DEFAULT, - GNUNET_SCHEDULER_NO_PREREQUISITE_TASK, + GNUNET_SCHEDULER_NO_TASK, h->reconnect_delay, &reconnect, h); h->reconnect_delay = GNUNET_TIME_UNIT_SECONDS; } -/** - * Remove the given transmit handle from the wait list. Does NOT free - * it. - */ -static void -remove_from_wait_list (struct GNUNET_TRANSPORT_TransmitHandle *th) -{ - if (th->prev == NULL) - th->handle->connect_wait_head = th->next; - else - th->prev->next = th->next; - if (th->next != NULL) - th->next->prev = th->prev; -} - - /** * We are connected to the respective peer, check the * bandwidth limits and schedule the transmission. @@ -965,31 +1234,24 @@ transmit_ready (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) { struct GNUNET_TRANSPORT_TransmitHandle *th = cls; - th->notify_delay_task = GNUNET_SCHEDULER_NO_PREREQUISITE_TASK; + th->notify_delay_task = GNUNET_SCHEDULER_NO_TASK; schedule_request (th); } /** - * Called when our transmit request timed out before any transport - * reported success connecting to the desired peer or before the - * transport was ready to receive. Signal error and free - * TransmitHandle. + * Remove the given transmit handle from the wait list. Does NOT free + * it. */ static void -transmit_timeout (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) +remove_from_wait_list (struct GNUNET_TRANSPORT_TransmitHandle *th) { - struct GNUNET_TRANSPORT_TransmitHandle *th = cls; - - if (th->neighbour != NULL) - th->neighbour->transmit_handle = NULL; -#if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Transmission request timed out.\n"); -#endif - th->notify_delay_task = GNUNET_SCHEDULER_NO_PREREQUISITE_TASK; - remove_from_wait_list (th); - th->notify (th->notify_cls, 0, NULL); - GNUNET_free (th); + if (th->prev == NULL) + th->handle->connect_wait_head = th->next; + else + th->prev->next = th->next; + if (th->next != NULL) + th->next->prev = th->prev; } @@ -1007,10 +1269,10 @@ schedule_request (struct GNUNET_TRANSPORT_TransmitHandle *th) h = th->handle; n = th->neighbour; - if (th->notify_delay_task != GNUNET_SCHEDULER_NO_PREREQUISITE_TASK) + if (th->notify_delay_task != GNUNET_SCHEDULER_NO_TASK) { GNUNET_SCHEDULER_cancel (h->sched, th->notify_delay_task); - th->notify_delay_task = GNUNET_SCHEDULER_NO_PREREQUISITE_TASK; + th->notify_delay_task = GNUNET_SCHEDULER_NO_TASK; } /* check outgoing quota */ duration = GNUNET_TIME_absolute_get_duration (n->last_quota_update); @@ -1034,8 +1296,9 @@ schedule_request (struct GNUNET_TRANSPORT_TransmitHandle *th) /* signal timeout! */ #if DEBUG_TRANSPORT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Would need %llu ms before bandwidth is available for delivery, that is too long. Signaling timeout.\n", - duration.value); + "Would need %llu ms before bandwidth is available for delivery to `%4s', that is too long. Signaling timeout.\n", + duration.value, + GNUNET_i2s(&th->target)); #endif remove_from_wait_list (th); th->notify (th->notify_cls, 0, NULL); @@ -1044,14 +1307,15 @@ schedule_request (struct GNUNET_TRANSPORT_TransmitHandle *th) } #if DEBUG_TRANSPORT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Need more bandwidth, delaying delivery by %llu ms\n", + "Need more bandwidth, delaying delivery to `%4s' by %llu ms\n", + GNUNET_i2s(&th->target), duration.value); #endif th->notify_delay_task = GNUNET_SCHEDULER_add_delayed (h->sched, GNUNET_NO, GNUNET_SCHEDULER_PRIORITY_KEEP, - GNUNET_SCHEDULER_NO_PREREQUISITE_TASK, + GNUNET_SCHEDULER_NO_TASK, duration, &transmit_ready, th); return; } @@ -1073,15 +1337,16 @@ schedule_request (struct GNUNET_TRANSPORT_TransmitHandle *th) = GNUNET_SCHEDULER_add_delayed (h->sched, GNUNET_NO, GNUNET_SCHEDULER_PRIORITY_KEEP, - GNUNET_SCHEDULER_NO_PREREQUISITE_TASK, + GNUNET_SCHEDULER_NO_TASK, GNUNET_TIME_absolute_get_remaining - (th->timeout), &transmit_timeout, th); + (th->timeout), &peer_transmit_timeout, th); return; } n->transmit_ok = GNUNET_NO; remove_from_wait_list (th); #if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Moving message to ready list\n"); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Moving message for `%4s' to ready list\n", + GNUNET_i2s(&n->id)); #endif insert_transmit_handle (&h->connect_ready_head, th); if (GNUNET_NO == h->transmission_scheduled) @@ -1127,55 +1392,39 @@ add_neighbour (struct GNUNET_TRANSPORT_Handle *h, while (pos != NULL) { next = pos->next; -#if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Found entry in connect_wait_head for `%4s'.\n", - GNUNET_i2s (&pos->target)); -#endif if (0 == memcmp (pid, &pos->target, sizeof (struct GNUNET_PeerIdentity))) { -#if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Found pending request for new connection, will trigger now.\n"); -#endif pos->neighbour = n; - if (pos->notify_delay_task != GNUNET_SCHEDULER_NO_PREREQUISITE_TASK) - { - GNUNET_SCHEDULER_cancel (h->sched, pos->notify_delay_task); - pos->notify_delay_task = GNUNET_SCHEDULER_NO_PREREQUISITE_TASK; - } GNUNET_assert (NULL == n->transmit_handle); n->transmit_handle = pos; + if (prev == NULL) + h->connect_wait_head = next; + else + prev->next = next; if (GNUNET_YES == n->received_ack) { #if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "`%s' already received, scheduling request\n", - "ACK"); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Found pending request for `%4s' will trigger it now.\n", + GNUNET_i2s (&pos->target)); #endif + if (pos->notify_delay_task != GNUNET_SCHEDULER_NO_TASK) + { + GNUNET_SCHEDULER_cancel (h->sched, pos->notify_delay_task); + pos->notify_delay_task = GNUNET_SCHEDULER_NO_TASK; + } schedule_request (pos); } else { #if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Still need to wait to receive `%s' message\n", - "ACK"); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Found pending request for `%4s' but still need `%s' before proceeding.\n", + GNUNET_i2s (&pos->target), + "ACK"); #endif - pos->notify_delay_task - = GNUNET_SCHEDULER_add_delayed (h->sched, - GNUNET_NO, - GNUNET_SCHEDULER_PRIORITY_KEEP, - GNUNET_SCHEDULER_NO_PREREQUISITE_TASK, - GNUNET_TIME_absolute_get_remaining - (pos->timeout), - &transmit_timeout, pos); } - if (prev == NULL) - h->connect_wait_head = next; - else - prev->next = next; break; } prev = pos; @@ -1198,7 +1447,7 @@ add_neighbour (struct GNUNET_TRANSPORT_Handle *h, */ struct GNUNET_TRANSPORT_Handle * GNUNET_TRANSPORT_connect (struct GNUNET_SCHEDULER_Handle *sched, - struct GNUNET_CONFIGURATION_Handle *cfg, + const struct GNUNET_CONFIGURATION_Handle *cfg, void *cls, GNUNET_TRANSPORT_ReceiveCallback rec, GNUNET_TRANSPORT_NotifyConnect nc, @@ -1252,22 +1501,23 @@ GNUNET_TRANSPORT_disconnect (struct GNUNET_TRANSPORT_Handle *handle) struct GNUNET_CLIENT_Connection *client; #if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Transport disconnect called!\n"); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Transport disconnect called!\n"); #endif while (NULL != (th = handle->connect_ready_head)) { handle->connect_ready_head = th->next; + GNUNET_assert (th->notify_delay_task == GNUNET_SCHEDULER_NO_TASK); th->notify (th->notify_cls, 0, NULL); GNUNET_free (th); } - while (NULL != (th = handle->connect_wait_head)) { handle->connect_wait_head = th->next; - if (th->notify_delay_task != GNUNET_SCHEDULER_NO_PREREQUISITE_TASK) + if (th->notify_delay_task != GNUNET_SCHEDULER_NO_TASK) { GNUNET_SCHEDULER_cancel (handle->sched, th->notify_delay_task); - th->notify_delay_task = GNUNET_SCHEDULER_NO_PREREQUISITE_TASK; + th->notify_delay_task = GNUNET_SCHEDULER_NO_TASK; } th->notify (th->notify_cls, 0, NULL); GNUNET_free (th); @@ -1282,16 +1532,16 @@ GNUNET_TRANSPORT_disconnect (struct GNUNET_TRANSPORT_Handle *handle) handle->hwl_head = hwl->next; GNUNET_SCHEDULER_cancel (handle->sched, hwl->task); GNUNET_log (GNUNET_ERROR_TYPE_WARNING, - _ - ("Disconnect while trying to obtain HELLO from transport service.\n")); + _("Disconnect while trying to obtain `%s' from transport service.\n"), + "HELLO"); if (hwl->rec != NULL) hwl->rec (hwl->rec_cls, GNUNET_TIME_UNIT_ZERO, NULL, NULL); GNUNET_free (hwl); } - if (handle->reconnect_task != GNUNET_SCHEDULER_NO_PREREQUISITE_TASK) + if (handle->reconnect_task != GNUNET_SCHEDULER_NO_TASK) { GNUNET_SCHEDULER_cancel (handle->sched, handle->reconnect_task); - handle->reconnect_task = GNUNET_SCHEDULER_NO_PREREQUISITE_TASK; + handle->reconnect_task = GNUNET_SCHEDULER_NO_TASK; } GNUNET_free_non_null (handle->my_hello); handle->my_hello = NULL; @@ -1310,140 +1560,6 @@ GNUNET_TRANSPORT_disconnect (struct GNUNET_TRANSPORT_Handle *handle) } -/** - * We're ready to transmit the request that the transport service - * should connect to a new peer. In addition to sending the - * request, schedule the next phase for the transmission processing - * that caused the connect request in the first place. - */ -static size_t -request_connect (void *cls, size_t size, void *buf) -{ - struct GNUNET_TRANSPORT_TransmitHandle *th = cls; - struct TryConnectMessage *tcm; - struct GNUNET_TRANSPORT_Handle *h; - - h = th->handle; - if (buf == NULL) - { -#if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Failed to transmit connect request to service.\n"); -#endif - th->notify (th->notify_cls, 0, NULL); - GNUNET_free (th); - return 0; - } -#if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Transmitting `%s' message for `%4s'.\n", - "TRY_CONNECT", GNUNET_i2s (&th->target)); -#endif - GNUNET_assert (size >= sizeof (struct TryConnectMessage)); - tcm = buf; - tcm->header.size = htons (sizeof (struct TryConnectMessage)); - tcm->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_TRY_CONNECT); - tcm->reserved = htonl (0); - memcpy (&tcm->peer, &th->target, sizeof (struct GNUNET_PeerIdentity)); - th->notify_delay_task - = GNUNET_SCHEDULER_add_delayed (h->sched, - GNUNET_NO, - GNUNET_SCHEDULER_PRIORITY_KEEP, - GNUNET_SCHEDULER_NO_PREREQUISITE_TASK, - GNUNET_TIME_absolute_get_remaining - (th->timeout), &transmit_timeout, th); - insert_transmit_handle (&h->connect_wait_head, th); - return sizeof (struct TryConnectMessage); -} - - -/** - * Schedule a request to connect to the given - * neighbour (and if successful, add the specified - * handle to the wait list). - */ -static void -try_connect (struct GNUNET_TRANSPORT_TransmitHandle *th) -{ - schedule_control_transmit (th->handle, - sizeof (struct TryConnectMessage), - GNUNET_NO, - GNUNET_TIME_absolute_get_remaining (th->timeout), - &request_connect, th); -} - - -/** - * Cancel a pending notify transmit task - * and also remove the given transmit handle - * from whatever list is on. - */ -static void -remove_from_any_list (struct GNUNET_TRANSPORT_TransmitHandle *th) -{ - struct GNUNET_TRANSPORT_Handle *h; - - h = th->handle; - if (th->notify_delay_task != GNUNET_SCHEDULER_NO_PREREQUISITE_TASK) - { - GNUNET_SCHEDULER_cancel (h->sched, th->notify_delay_task); - th->notify_delay_task = GNUNET_SCHEDULER_NO_PREREQUISITE_TASK; - } - if (th->prev == NULL) - { - if (th == h->connect_wait_head) - h->connect_wait_head = th->next; - else - h->connect_ready_head = th->next; - } - else - th->prev->next = th->next; - if (th->next != NULL) - th->next->prev = th->prev; -} - - -/** - * Remove neighbour from our list - */ -static void -remove_neighbour (struct GNUNET_TRANSPORT_Handle *h, - const struct GNUNET_PeerIdentity *peer) -{ - struct NeighbourList *prev; - struct NeighbourList *pos; - struct GNUNET_TRANSPORT_TransmitHandle *th; - - prev = NULL; - pos = h->neighbours; - while ((pos != NULL) && - (0 != memcmp (peer, &pos->id, sizeof (struct GNUNET_PeerIdentity)))) - { - prev = pos; - pos = pos->next; - } - if (pos == NULL) - { - GNUNET_break (0); - return; - } - if (prev == NULL) - h->neighbours = pos->next; - else - prev->next = pos->next; - if (NULL != (th = pos->transmit_handle)) - { - pos->transmit_handle = NULL; - th->neighbour = NULL; - remove_from_any_list (th); - try_connect (th); - } - if (h->nc_cb != NULL) - h->nd_cb (h->cls, peer); - GNUNET_free (pos); -} - - /** * Type of a function to call when we receive a message * from the service. @@ -1462,8 +1578,8 @@ demultiplexer (void *cls, const struct GNUNET_MessageHeader *msg) const struct SendOkMessage *okm; struct HelloWaitList *hwl; struct NeighbourList *n; - struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded pkey; struct GNUNET_PeerIdentity me; + struct GNUNET_TRANSPORT_TransmitHandle *th; uint16_t size; if ((msg == NULL) || (h->client == NULL)) @@ -1471,14 +1587,25 @@ demultiplexer (void *cls, const struct GNUNET_MessageHeader *msg) if (h->client != NULL) { #if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Error receiving from transport service, disconnecting temporarily.\n"); #endif if (h->network_handle != NULL) { - GNUNET_NETWORK_notify_transmit_ready_cancel (h->network_handle); + GNUNET_CONNECTION_notify_transmit_ready_cancel (h->network_handle); h->network_handle = NULL; h->transmission_scheduled = GNUNET_NO; + th = h->connect_ready_head; + /* add timeout again, we cancelled the transmit_ready task! */ + GNUNET_assert (th->notify_delay_task == GNUNET_SCHEDULER_NO_TASK); + th->notify_delay_task + = GNUNET_SCHEDULER_add_delayed (h->sched, + GNUNET_NO, + GNUNET_SCHEDULER_PRIORITY_KEEP, + GNUNET_SCHEDULER_NO_TASK, + GNUNET_TIME_absolute_get_remaining(th->timeout), + &peer_transmit_timeout, + th); } GNUNET_CLIENT_disconnect (h->client); h->client = NULL; @@ -1499,16 +1626,12 @@ demultiplexer (void *cls, const struct GNUNET_MessageHeader *msg) { case GNUNET_MESSAGE_TYPE_HELLO: if (GNUNET_OK != - GNUNET_HELLO_get_key ((const struct GNUNET_HELLO_Message *) msg, - &pkey)) + GNUNET_HELLO_get_id ((const struct GNUNET_HELLO_Message *) msg, + &me)) { GNUNET_break (0); break; } - GNUNET_CRYPTO_hash (&pkey, - sizeof (struct - GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded), - &me.hashPubKey); #if DEBUG_TRANSPORT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Receiving (my own) `%s' message, I am `%4s'.\n", @@ -1582,12 +1705,13 @@ demultiplexer (void *cls, const struct GNUNET_MessageHeader *msg) { #if DEBUG_TRANSPORT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Processing pending message\n"); + "Processing pending message for `%4s'\n", + GNUNET_i2s(&n->id)); #endif GNUNET_SCHEDULER_cancel (h->sched, n->transmit_handle->notify_delay_task); n->transmit_handle->notify_delay_task = - GNUNET_SCHEDULER_NO_PREREQUISITE_TASK; + GNUNET_SCHEDULER_NO_TASK; GNUNET_assert (GNUNET_YES == n->received_ack); schedule_request (n->transmit_handle); } @@ -1663,7 +1787,7 @@ demultiplexer (void *cls, const struct GNUNET_MessageHeader *msg) struct ClientTransmitWrapper { - GNUNET_NETWORK_TransmitReadyNotify notify; + GNUNET_CONNECTION_TransmitReadyNotify notify; void *notify_cls; struct GNUNET_TRANSPORT_TransmitHandle *th; }; @@ -1717,7 +1841,7 @@ client_notify_wrapper (void *cls, size_t size, void *buf) ret += sizeof (struct OutboundMessage); obm->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SEND); obm->header.size = htons (ret); - obm->reserved = htonl (0); + obm->priority = htonl (ctw->th->priority); obm->peer = ctw->th->target; GNUNET_free (ctw); return ret; @@ -1734,6 +1858,7 @@ client_notify_wrapper (void *cls, size_t size, void *buf) * @param handle connection to transport service * @param target who should receive the message * @param size how big is the message we want to transmit? + * @param priority how important is the message? * @param timeout after how long should we give up (and call * notify with buf NULL and size 0)? * @param notify function to call when we are ready to @@ -1748,8 +1873,9 @@ GNUNET_TRANSPORT_notify_transmit_ready (struct GNUNET_TRANSPORT_Handle *handle, const struct GNUNET_PeerIdentity *target, size_t size, + unsigned int priority, struct GNUNET_TIME_Relative timeout, - GNUNET_NETWORK_TransmitReadyNotify + GNUNET_CONNECTION_TransmitReadyNotify notify, void *notify_cls) { struct GNUNET_TRANSPORT_TransmitHandle *pos; @@ -1759,31 +1885,34 @@ GNUNET_TRANSPORT_notify_transmit_ready (struct GNUNET_TRANSPORT_Handle if (size + sizeof (struct OutboundMessage) >= GNUNET_SERVER_MAX_MESSAGE_SIZE) - return NULL; + { + GNUNET_break (0); + return NULL; + } #if DEBUG_TRANSPORT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Asking transport service for transmission of %u bytes to peer `%4s'.\n", size, GNUNET_i2s (target)); #endif n = find_neighbour (handle, target); + if ( (n != NULL) && + (n->transmit_handle != NULL) ) + return NULL; /* already have a request pending for this peer! */ ctw = GNUNET_malloc (sizeof (struct ClientTransmitWrapper)); th = GNUNET_malloc (sizeof (struct GNUNET_TRANSPORT_TransmitHandle)); ctw->notify = notify; ctw->notify_cls = notify_cls; ctw->th = th; th->handle = handle; + th->neighbour = n; th->target = *target; th->notify = &client_notify_wrapper; th->notify_cls = ctw; - th->notify_size = size + sizeof (struct OutboundMessage); th->timeout = GNUNET_TIME_relative_to_absolute (timeout); - th->neighbour = n; + th->notify_size = size + sizeof (struct OutboundMessage); + th->priority = priority; if (NULL == n) { -#if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Transmission request could not be satisfied (not yet connected), adding it to pending request list.\n"); -#endif pos = handle->connect_wait_head; while (pos != NULL) { @@ -1797,46 +1926,43 @@ GNUNET_TRANSPORT_notify_transmit_ready (struct GNUNET_TRANSPORT_Handle "Will now try to connect to `%4s'.\n", GNUNET_i2s (target)); #endif try_connect (th); + return th; } - else - { + #if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Transmission request queued for transmission to transport service.\n"); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Transmission request queued for transmission to transport service.\n"); #endif - GNUNET_assert (NULL == n->transmit_handle); - n->transmit_handle = th; - if (GNUNET_YES == n->received_ack) - { + GNUNET_assert (NULL == n->transmit_handle); + n->transmit_handle = th; + if (GNUNET_YES != n->received_ack) + { #if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Peer `%4s' is connected, scheduling for delivery now.\n", - GNUNET_i2s (target)); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Connection to `%4s' is not yet confirmed connected, scheduling timeout (%llu ms) only.\n", + GNUNET_i2s (target), timeout.value); #endif - schedule_request (th); - } - else - { + th->notify_delay_task + = GNUNET_SCHEDULER_add_delayed (handle->sched, + GNUNET_NO, + GNUNET_SCHEDULER_PRIORITY_KEEP, + GNUNET_SCHEDULER_NO_TASK, + timeout, &peer_transmit_timeout, th); + return th; + } + #if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Connection to `%4s' is not yet confirmed connected, scheduling timeout (%llums) only.\n", - GNUNET_i2s (target), timeout.value); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Peer `%4s' is ready to receive, scheduling message for delivery now.\n", + GNUNET_i2s (target)); #endif - th->notify_delay_task - = GNUNET_SCHEDULER_add_delayed (handle->sched, - GNUNET_NO, - GNUNET_SCHEDULER_PRIORITY_KEEP, - GNUNET_SCHEDULER_NO_PREREQUISITE_TASK, - timeout, &transmit_timeout, th); - } - } + schedule_request (th); return th; } /** - * Cancel the specified transmission-ready - * notification. + * Cancel the specified transmission-ready notification. */ void GNUNET_TRANSPORT_notify_transmit_ready_cancel (struct @@ -1845,16 +1971,23 @@ GNUNET_TRANSPORT_notify_transmit_ready_cancel (struct { struct GNUNET_TRANSPORT_Handle *h; +#if DEBUG_TRANSPORT + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Transmission request of %u bytes to `%4s' was cancelled.\n", + th->notify_size - sizeof(struct OutboundMessage), + GNUNET_i2s (&th->target)); +#endif GNUNET_assert (th->notify == &client_notify_wrapper); remove_from_any_list (th); h = th->handle; if ((h->connect_ready_head == NULL) && (h->network_handle != NULL)) { - GNUNET_NETWORK_notify_transmit_ready_cancel (h->network_handle); + GNUNET_CONNECTION_notify_transmit_ready_cancel (h->network_handle); h->network_handle = NULL; h->transmission_scheduled = GNUNET_NO; } GNUNET_free (th->notify_cls); + GNUNET_assert (th->notify_delay_task == GNUNET_SCHEDULER_NO_TASK); GNUNET_free (th); }