From 2d8a83d2269d9f88032f4bac2a12cc1d156741c1 Mon Sep 17 00:00:00 2001 From: Matthias Wachs Date: Thu, 2 Dec 2010 13:03:09 +0000 Subject: [PATCH] this is a merged version of revision 13866 and my latestest changes without the old ats changes all changes from r13826 not made from me are included --- src/transport/transport_api.c | 425 ++++++++++++++++++++++------------ 1 file changed, 275 insertions(+), 150 deletions(-) diff --git a/src/transport/transport_api.c b/src/transport/transport_api.c index 643f8b0f4..4f9433c6c 100644 --- a/src/transport/transport_api.c +++ b/src/transport/transport_api.c @@ -60,6 +60,11 @@ */ #define STOP_SERVICE_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5) +/** + * How large to start with for the hashmap of neighbours. + */ +#define STARTING_NEIGHBOURS_SIZE 10 + /** * What stage are we in for transmission processing? @@ -187,18 +192,33 @@ struct ControlMessage }; - /** - * Entry in linked list of all of our current neighbours. + * Context for storing information about attempted next transmission. */ -struct NeighbourList +struct TryTransmitContext { /** - * This is a linked list. + * Main transport handle. */ - struct NeighbourList *next; + struct GNUNET_TRANSPORT_Handle *h; + + /** + * Returned transmission handle. + */ + struct GNUNET_TRANSPORT_TransmitHandle *ret; + /** + * Time to retry the send task. + */ + struct GNUNET_TIME_Relative retry_time; +}; + +/** + * Entry in hash table of all of our current neighbours. + */ +struct NeighbourList +{ /** * Overall transport handle. */ @@ -235,6 +255,11 @@ struct NeighbourList */ int is_connected; + /** + * Are we in the middle of disconnecting the peer already? + */ + unsigned int in_disconnect; + }; @@ -334,7 +359,7 @@ struct GNUNET_TRANSPORT_Handle /** * Linked list of the current neighbours of this peer. */ - struct NeighbourList *neighbours; + struct GNUNET_CONTAINER_MultiHashMap *neighbours; /** * Peer identity as assumed by this process, or all zeros. @@ -371,7 +396,6 @@ struct GNUNET_TRANSPORT_Handle }; -// FIXME: replace with hash map! /** * Get the neighbour list entry for the given peer * @@ -383,13 +407,7 @@ static struct NeighbourList * neighbour_find (struct GNUNET_TRANSPORT_Handle *h, const struct GNUNET_PeerIdentity *peer) { - struct NeighbourList *pos; - - pos = h->neighbours; - while ((pos != NULL) && - (0 != memcmp (peer, &pos->id, sizeof (struct GNUNET_PeerIdentity)))) - pos = pos->next; - return pos; + return GNUNET_CONTAINER_multihashmap_get(h->neighbours, &peer->hashPubKey); } @@ -417,6 +435,90 @@ quota_transmit_ready (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) } +/** + * Iterator over hash map entries, attempt to schedule + * a transmission to entries in the neighbour hashmap. + * + * @param cls closure a TryTransmitContext + * @param key current key code + * @param value value in the hash map, the neighbour entry to consider + * @return GNUNET_YES if we should continue to + * iterate, + * GNUNET_NO if not. + */ +static int +try_schedule_transmission (void *cls, + const GNUNET_HashCode * key, + void *value) +{ + struct NeighbourList *n = value; + struct TryTransmitContext *try_transmit_ctx = cls; + struct GNUNET_TIME_Relative duration; + GNUNET_CONNECTION_TransmitReadyNotify notify; + struct GNUNET_TRANSPORT_TransmitHandle *th; + struct GNUNET_TIME_Absolute duration_abs; + + if (n->transmit_stage != TS_QUEUED) + return GNUNET_YES; /* not eligible, keep iterating */ + if (n->is_connected != GNUNET_YES) + return GNUNET_YES; /* keep iterating */ + + th = &n->transmit_handle; + GNUNET_break (n == th->neighbour); + /* check outgoing quota */ + duration = GNUNET_BANDWIDTH_tracker_get_delay (&n->out_tracker, + th->notify_size - sizeof (struct OutboundMessage)); + duration_abs = GNUNET_TIME_relative_to_absolute (duration); + if (th->timeout.abs_value < duration_abs.abs_value) + { + /* signal timeout! */ +#if DEBUG_TRANSPORT + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Would need %llu ms before bandwidth is available for delivery to `%4s', that is too long. Signaling timeout.\n", + duration.rel_value, + GNUNET_i2s (&n->id)); +#endif + if (th->notify_delay_task != GNUNET_SCHEDULER_NO_TASK) + { + GNUNET_SCHEDULER_cancel (th->notify_delay_task); + th->notify_delay_task = GNUNET_SCHEDULER_NO_TASK; + } + n->transmit_stage = TS_NEW; + if (NULL != (notify = th->notify)) + { + th->notify = NULL; + GNUNET_assert (0 == notify (th->notify_cls, 0, NULL)); + } + return GNUNET_YES; /* keep iterating */ + } + if (duration.rel_value > 0) + { +#if DEBUG_TRANSPORT + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Need more bandwidth (%u b/s allowed, %u b needed), delaying delivery to `%4s' by %llu ms\n", + (unsigned int) n->out_tracker.available_bytes_per_s__, + (unsigned int) th->notify_size - sizeof (struct OutboundMessage), + GNUNET_i2s (&n->id), + (unsigned long long) duration.rel_value); +#endif + try_transmit_ctx->retry_time = GNUNET_TIME_relative_min (try_transmit_ctx->retry_time, + duration); + return GNUNET_YES; /* keep iterating */ + } +#if DEBUG_TRANSPORT + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Have %u bytes of bandwidth available for transmission to `%4s' right now\n", + th->notify_size - sizeof (struct OutboundMessage), + GNUNET_i2s (&n->id)); +#endif + + if ( (try_transmit_ctx->ret == NULL) || + (try_transmit_ctx->ret->priority < th->priority) ) + try_transmit_ctx->ret = th; + return GNUNET_YES; +} + + /** * Figure out which transmission to a peer can be done right now. * If none can, schedule a task to call 'schedule_transmission' @@ -430,88 +532,24 @@ quota_transmit_ready (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) static struct GNUNET_TRANSPORT_TransmitHandle * schedule_peer_transmission (struct GNUNET_TRANSPORT_Handle *h) { - struct GNUNET_TRANSPORT_TransmitHandle *ret; - struct GNUNET_TRANSPORT_TransmitHandle *th; - struct NeighbourList *n; - struct NeighbourList *next; - struct GNUNET_TIME_Relative retry_time; - struct GNUNET_TIME_Relative duration; - GNUNET_CONNECTION_TransmitReadyNotify notify; + struct TryTransmitContext try_transmit_ctx; if (h->quota_task != GNUNET_SCHEDULER_NO_TASK) { GNUNET_SCHEDULER_cancel (h->quota_task); h->quota_task = GNUNET_SCHEDULER_NO_TASK; } - retry_time = GNUNET_TIME_UNIT_FOREVER_REL; - ret = NULL; - next = h->neighbours; - while (NULL != (n = next)) - { - next = n->next; - if (n->transmit_stage != TS_QUEUED) - continue; /* not eligible */ - if (n->is_connected != GNUNET_YES) - continue; - - th = &n->transmit_handle; - GNUNET_break (n == th->neighbour); - /* check outgoing quota */ - duration = GNUNET_BANDWIDTH_tracker_get_delay (&n->out_tracker, - th->notify_size - sizeof (struct OutboundMessage)); - struct GNUNET_TIME_Absolute duration_abs = GNUNET_TIME_relative_to_absolute (duration); - if (th->timeout.abs_value < duration_abs.abs_value) - { - /* signal timeout! */ -#if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Would need %llu ms before bandwidth is available for delivery to `%4s', that is too long. Signaling timeout.\n", - duration.rel_value, - GNUNET_i2s (&n->id)); -#endif - if (th->notify_delay_task != GNUNET_SCHEDULER_NO_TASK) - { - GNUNET_SCHEDULER_cancel (th->notify_delay_task); - th->notify_delay_task = GNUNET_SCHEDULER_NO_TASK; - } - n->transmit_stage = TS_NEW; - if (NULL != (notify = th->notify)) - { - th->notify = NULL; - GNUNET_assert (0 == notify (th->notify_cls, 0, NULL)); - } - continue; - } - if (duration.rel_value > 0) - { -#if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Need more bandwidth (%u b/s allowed, %u b needed), delaying delivery to `%4s' by %llu ms\n", - (unsigned int) n->out_tracker.available_bytes_per_s__, - (unsigned int) th->notify_size - sizeof (struct OutboundMessage), - GNUNET_i2s (&n->id), - duration.rel_value); -#endif - retry_time = GNUNET_TIME_relative_min (retry_time, - duration); - continue; - } -#if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Have %u bytes of bandwidth available for transmission to `%4s' right now\n", - th->notify_size - sizeof (struct OutboundMessage), - GNUNET_i2s (&n->id)); -#endif - - if ( (ret == NULL) || - (ret->priority < th->priority) ) - ret = th; - } - if (ret == NULL) - h->quota_task = GNUNET_SCHEDULER_add_delayed (retry_time, + try_transmit_ctx.h = h; + try_transmit_ctx.ret = NULL; + try_transmit_ctx.retry_time = GNUNET_TIME_UNIT_FOREVER_REL; + GNUNET_CONTAINER_multihashmap_iterate(h->neighbours, + &try_schedule_transmission, + &try_transmit_ctx); + if (try_transmit_ctx.ret == NULL) + h->quota_task = GNUNET_SCHEDULER_add_delayed (try_transmit_ctx.retry_time, "a_transmit_ready, h); - return ret; + return try_transmit_ctx.ret; } @@ -777,18 +815,39 @@ schedule_control_transmit (struct GNUNET_TRANSPORT_Handle *h, } +/** + * FIXME: document + */ struct SetQuotaContext { + /** + * FIXME: document + */ struct GNUNET_TRANSPORT_Handle *handle; + /** + * FIXME: document + */ struct GNUNET_PeerIdentity target; + /** + * FIXME: document + */ GNUNET_SCHEDULER_Task cont; + /** + * Closure for 'cont'. + */ void *cont_cls; + /** + * FIXME: document + */ struct GNUNET_TIME_Absolute timeout; + /** + * FIXME: document + */ struct GNUNET_BANDWIDTH_Value32NBO quota_in; }; @@ -809,9 +868,10 @@ send_set_quota (void *cls, size_t size, void *buf) if (buf == NULL) { - GNUNET_SCHEDULER_add_continuation (sqc->cont, - sqc->cont_cls, - GNUNET_SCHEDULER_REASON_TIMEOUT); + if (sqc->cont != NULL) + GNUNET_SCHEDULER_add_continuation (sqc->cont, + sqc->cont_cls, + GNUNET_SCHEDULER_REASON_TIMEOUT); GNUNET_free (sqc); return 0; } @@ -1090,9 +1150,21 @@ static void neighbour_free (struct NeighbourList *n) { struct GNUNET_TRANSPORT_Handle *h; - struct NeighbourList *prev; - struct NeighbourList *pos; + /* Added so task gets canceled when a disconnect is received! */ + /* Method 1 + if (n->transmit_handle.notify_delay_task != GNUNET_SCHEDULER_NO_TASK) + { + GNUNET_SCHEDULER_cancel(n->transmit_handle.notify_delay_task); + n->transmit_handle.notify_delay_task = GNUNET_SCHEDULER_NO_TASK; + n->transmit_handle.notify = NULL; + } + */ + /* NATE: if the above is not needed, then clearly this assertion + should hold (I've checked the code and I'm pretty sure this is + true. -CG + FIXME: remove above comments once we've seen tests pass with the assert... */ + GNUNET_assert (n->transmit_handle.notify_delay_task == GNUNET_SCHEDULER_NO_TASK); GNUNET_assert (n->transmit_handle.notify == NULL); h = n->h; #if DEBUG_TRANSPORT @@ -1103,17 +1175,10 @@ neighbour_free (struct NeighbourList *n) GNUNET_break (n->is_connected == GNUNET_NO); GNUNET_break (n->transmit_stage == TS_NEW); - prev = NULL; - pos = h->neighbours; - while (pos != n) - { - prev = pos; - pos = pos->next; - } - if (prev == NULL) - h->neighbours = n->next; - else - prev->next = n->next; + GNUNET_assert(GNUNET_YES == + GNUNET_CONTAINER_multihashmap_remove(h->neighbours, + &n->id.hashPubKey, + n)); GNUNET_free (n); } @@ -1134,6 +1199,14 @@ neighbour_disconnect (struct NeighbourList *n) #endif GNUNET_break (n->is_connected == GNUNET_YES); n->is_connected = GNUNET_NO; + /* FIXME: this 'in_disconnect' flag is dubious; we should define + clearly what disconnect means for pending 'notify_transmit_ready' + requests; maybe a good approach is to REQUIRE clients to + call 'notify_transmit_ready_cancel' on pending requests on disconnect + and otherwise FAIL HARD with an assertion failure before + 'neighbour_free' right here (transmit_stage would be forced + to 'TS_NEW') */ + n->in_disconnect = GNUNET_YES; if (h->nd_cb != NULL) h->nd_cb (h->cls, &n->id); if (n->transmit_stage == TS_NEW) @@ -1152,6 +1225,35 @@ static void demultiplexer (void *cls, const struct GNUNET_MessageHeader *msg); +/** + * Iterator over hash map entries, for getting rid of a neighbor + * upon a reconnect call. + * + * @param cls closure (NULL) + * @param key current key code + * @param value value in the hash map, the neighbour entry to forget + * @return GNUNET_YES if we should continue to + * iterate, + * GNUNET_NO if not. + */ +static int +forget_neighbours (void *cls, + const GNUNET_HashCode * key, + void *value) +{ + struct NeighbourList *n = value; + +#if DEBUG_TRANSPORT_DISCONNECT + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Disconnecting due to reconnect being called\n"); +#endif + if (n->is_connected) + neighbour_disconnect (n); + + return GNUNET_YES; +} + + /** * Try again to connect to transport service. * @@ -1164,8 +1266,6 @@ reconnect (void *cls, { struct GNUNET_TRANSPORT_Handle *h = cls; struct ControlMessage *pos; - struct NeighbourList *n; - struct NeighbourList *next; h->reconnect_task = GNUNET_SCHEDULER_NO_TASK; if ( (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN) != 0) @@ -1174,18 +1274,10 @@ reconnect (void *cls, return; } /* Forget about all neighbours that we used to be connected to */ - n = h->neighbours; - while (NULL != n) - { -#if DEBUG_TRANSPORT_DISCONNECT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Disconnecting due to reconnect being called\n"); -#endif - next = n->next; - if (n->is_connected) - neighbour_disconnect (n); - n = next; - } + GNUNET_CONTAINER_multihashmap_iterate(h->neighbours, + &forget_neighbours, + NULL); + #if DEBUG_TRANSPORT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Connecting to transport service.\n"); @@ -1310,6 +1402,7 @@ send_request_connect_message(struct GNUNET_TRANSPORT_Handle *h, struct Neighbour GNUNET_TIME_UNIT_FOREVER_REL, &send_transport_request_connect, trcm); } + /** * Add neighbour to our list * @@ -1336,18 +1429,61 @@ neighbour_add (struct GNUNET_TRANSPORT_Handle *h, #endif n = GNUNET_malloc (sizeof (struct NeighbourList)); n->id = *pid; + n->h = h; GNUNET_BANDWIDTH_tracker_init (&n->out_tracker, GNUNET_CONSTANTS_DEFAULT_BW_IN_OUT, MAX_BANDWIDTH_CARRY_S); - n->next = h->neighbours; - n->h = h; - h->neighbours = n; - + GNUNET_CONTAINER_multihashmap_put (h->neighbours, + &pid->hashPubKey, + n, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY); return n; } +/** + * Iterator over hash map entries, for deleting state of a neighbor. + * + * @param cls closure (NULL) + * @param key current key code + * @param value value in the hash map, the neighbour entry to delete + * @return GNUNET_YES if we should continue to + * iterate, + * GNUNET_NO if not. + */ +static int +delete_neighbours (void *cls, + const GNUNET_HashCode * key, + void *value) +{ + struct NeighbourList *n = value; + struct GNUNET_TRANSPORT_TransmitHandle *th; + + switch (n->transmit_stage) + { + case TS_NEW: + case TS_TRANSMITTED: + /* nothing to do */ + break; + case TS_QUEUED: + case TS_TRANSMITTED_QUEUED: + th = &n->transmit_handle; + if (th->notify_delay_task != GNUNET_SCHEDULER_NO_TASK) + { + GNUNET_SCHEDULER_cancel (th->notify_delay_task); + th->notify_delay_task = GNUNET_SCHEDULER_NO_TASK; + } + GNUNET_assert (0 == th->notify (th->notify_cls, 0, NULL)); + break; + default: + GNUNET_break (0); + } + GNUNET_free (n); + return GNUNET_YES; +} + + /** * Connect to the transport service. Note that the connection may * complete (or fail) asynchronously. @@ -1382,6 +1518,7 @@ GNUNET_TRANSPORT_connect (const struct GNUNET_CONFIGURATION_Handle *cfg, ret->nc_cb = nc; ret->nd_cb = nd; ret->reconnect_delay = GNUNET_TIME_UNIT_ZERO; + ret->neighbours = GNUNET_CONTAINER_multihashmap_create(STARTING_NEIGHBOURS_SIZE); schedule_reconnect (ret); return ret; } @@ -1393,8 +1530,6 @@ GNUNET_TRANSPORT_connect (const struct GNUNET_CONFIGURATION_Handle *cfg, void GNUNET_TRANSPORT_disconnect (struct GNUNET_TRANSPORT_Handle *handle) { - struct GNUNET_TRANSPORT_TransmitHandle *th; - struct NeighbourList *n; struct HelloWaitList *hwl; struct GNUNET_CLIENT_Connection *client; struct ControlMessage *cm; @@ -1402,31 +1537,20 @@ GNUNET_TRANSPORT_disconnect (struct GNUNET_TRANSPORT_Handle *handle) #if DEBUG_TRANSPORT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Transport disconnect called!\n"); #endif + /* FIXME: this flag is dubious, we should be able to do this + more cleanly; also, we should probably do 'disconnect' + callbacks for every connected peer here, i.e. by calling + the iterator with 'forget_neighbours' instead of 'delete_neighbours'. + */ + handle->in_disconnect = GNUNET_YES; - while (NULL != (n = handle->neighbours)) - { - handle->neighbours = n->next; - switch (n->transmit_stage) - { - case TS_NEW: - case TS_TRANSMITTED: - /* nothing to do */ - break; - case TS_QUEUED: - case TS_TRANSMITTED_QUEUED: - th = &n->transmit_handle; - if (th->notify_delay_task != GNUNET_SCHEDULER_NO_TASK) - { - GNUNET_SCHEDULER_cancel (th->notify_delay_task); - th->notify_delay_task = GNUNET_SCHEDULER_NO_TASK; - } - GNUNET_assert (0 == th->notify (th->notify_cls, 0, NULL)); - break; - default: - GNUNET_break (0); - } - GNUNET_free (n); - } + + GNUNET_assert (GNUNET_SYSERR != + GNUNET_CONTAINER_multihashmap_iterate(handle->neighbours, + &delete_neighbours, + handle)); + GNUNET_CONTAINER_multihashmap_destroy (handle->neighbours); + while (NULL != (hwl = handle->hwl_head)) { handle->hwl_head = hwl->next; @@ -1886,7 +2010,7 @@ GNUNET_TRANSPORT_notify_transmit_ready_cancel (struct n = th->neighbour; #if DEBUG_TRANSPORT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Transmission request of %u bytes to `%4s' was cancelled.\n", + "Transmission request of %u bytes to `%4s' was canceled.\n", th->notify_size - sizeof (struct OutboundMessage), GNUNET_i2s (&n->id)); #endif @@ -1902,7 +2026,8 @@ GNUNET_TRANSPORT_notify_transmit_ready_cancel (struct break; case TS_QUEUED: n->transmit_stage = TS_NEW; - if (n->is_connected == GNUNET_NO) + if ( (n->in_disconnect == GNUNET_NO) && + (n->is_connected == GNUNET_NO) ) neighbour_free (n); break; case TS_TRANSMITTED: -- 2.25.1