From 548c3ae72646799e886f403945bf07befa670e0d Mon Sep 17 00:00:00 2001 From: Bart Polot Date: Fri, 8 Nov 2013 14:11:35 +0000 Subject: [PATCH] - allow to cancel connection messages - change poll/ack mechanism --- src/mesh/gnunet-service-mesh_connection.c | 232 +++++++++++++++++----- src/mesh/gnunet-service-mesh_connection.h | 51 ++++- src/mesh/gnunet-service-mesh_peer.c | 136 +++++++------ src/mesh/gnunet-service-mesh_peer.h | 20 +- src/mesh/gnunet-service-mesh_tunnel.c | 6 +- 5 files changed, 326 insertions(+), 119 deletions(-) diff --git a/src/mesh/gnunet-service-mesh_connection.c b/src/mesh/gnunet-service-mesh_connection.c index c183e71d7..e10dcc7a5 100644 --- a/src/mesh/gnunet-service-mesh_connection.c +++ b/src/mesh/gnunet-service-mesh_connection.c @@ -104,6 +104,16 @@ struct MeshFlowControl * How frequently to poll for ACKs. */ struct GNUNET_TIME_Relative poll_time; + + /** + * Queued poll message, to cancel if not necessary anymore (got ACK). + */ + struct MeshConnectionQueue *poll_msg; + + /** + * Queued poll message, to cancel if not necessary anymore (got ACK). + */ + struct MeshConnectionQueue *ack_msg; }; /** @@ -201,6 +211,16 @@ struct MeshConnection int destroy; }; +/** + * Handle for messages queued but not yet sent. + */ +struct MeshConnectionQueue +{ + struct MeshPeerQueue *q; + GMC_sent cont; + void *cont_cls; +}; + /******************************************************************************/ /******************************* GLOBALS ***********************************/ /******************************************************************************/ @@ -350,20 +370,42 @@ connection_change_state (struct MeshConnection* c, } +/** + * Callback called when a queued ACK message is sent. + * + * @param cls Closure (FC). + * @param c Connection this message was on. + * @param type Type of message sent. + * @param fwd Was this a FWD going message? + * @param size Size of the message. + */ +static void +ack_sent (void *cls, + struct MeshConnection *c, + struct MeshConnectionQueue *q, + uint16_t type, int fwd, size_t size) +{ + struct MeshFlowControl *fc = cls; + + fc->ack_msg = NULL; +} + + /** * Send an ACK on the connection, informing the predecessor about * the available buffer space. Should not be called in case the peer - * is origin (no predecessor). + * is origin (no predecessor) in the @c fwd direction. * * Note that for fwd ack, the FWD mean forward *traffic* (root->dest), * the ACK itself goes "back" (dest->root). * * @param c Connection on which to send the ACK. * @param buffer How much space free to advertise? - * @param fwd Is this FWD ACK? (Going dest->owner) + * @param fwd Is this FWD ACK? (Going dest -> root) + * @param force Don't optimize out. */ static void -send_ack (struct MeshConnection *c, unsigned int buffer, int fwd) +send_ack (struct MeshConnection *c, unsigned int buffer, int fwd, int force) { struct MeshFlowControl *next_fc; struct MeshFlowControl *prev_fc; @@ -385,9 +427,9 @@ send_ack (struct MeshConnection *c, unsigned int buffer, int fwd) "connection send %s ack on %s\n", fwd ? "FWD" : "BCK", GMC_2s (c)); - /* Check if we need to transmit the ACK */ + /* Check if we need to transmit the ACK. */ delta = prev_fc->last_ack_sent - prev_fc->last_pid_recv; - if (3 < delta && buffer < delta) + if (3 < delta && buffer < delta && GNUNET_NO == force) { LOG (GNUNET_ERROR_TYPE_DEBUG, "Not sending ACK, buffer > 3\n"); LOG (GNUNET_ERROR_TYPE_DEBUG, @@ -403,12 +445,28 @@ send_ack (struct MeshConnection *c, unsigned int buffer, int fwd) " last pid %u, last ack %u, qmax %u, q %u\n", prev_fc->last_pid_recv, prev_fc->last_ack_sent, next_fc->queue_max, next_fc->queue_n); - if (ack == prev_fc->last_ack_sent) + if (ack == prev_fc->last_ack_sent && GNUNET_NO == force) { LOG (GNUNET_ERROR_TYPE_DEBUG, "Not sending FWD ACK, not needed\n"); return; } + /* Check if message is already in queue */ + if (NULL != prev_fc->ack_msg) + { + if (GMC_is_pid_bigger (ack, prev_fc->last_ack_sent)) + { + LOG (GNUNET_ERROR_TYPE_DEBUG, " canceling old ACK\n"); + GMC_cancel (prev_fc->ack_msg); + /* GMC_cancel triggers ack_sent(), which clears fc->ack_msg */ + } + else + { + LOG (GNUNET_ERROR_TYPE_DEBUG, " same ACK already in queue\n"); + return; + } + } + prev_fc->last_ack_sent = ack; /* Build ACK message and send on connection */ @@ -417,7 +475,8 @@ send_ack (struct MeshConnection *c, unsigned int buffer, int fwd) msg.ack = htonl (ack); msg.cid = c->id; - GMC_send_prebuilt_message (&msg.header, c, !fwd); + prev_fc->ack_msg = GMC_send_prebuilt_message (&msg.header, c, !fwd, + &ack_sent, prev_fc); } @@ -426,7 +485,7 @@ send_ack (struct MeshConnection *c, unsigned int buffer, int fwd) * * Calculates the average time and connection packet tracking. * - * @param cls Closure. + * @param cls Closure (ConnectionQueue Handle). * @param c Connection this message was on. * @param type Type of message sent. * @param fwd Was this a FWD going message? @@ -441,11 +500,18 @@ message_sent (void *cls, { struct MeshConnectionPerformance *p; struct MeshFlowControl *fc; + struct MeshConnectionQueue *q = cls; double usecsperbyte; fc = fwd ? &c->fwd_fc : &c->bck_fc; LOG (GNUNET_ERROR_TYPE_DEBUG, "! sent %s\n", GNUNET_MESH_DEBUG_M2S (type)); LOG (GNUNET_ERROR_TYPE_DEBUG, "! C_P- %p %u\n", c, c->pending_messages); + if (NULL != q->cont) + { + LOG (GNUNET_ERROR_TYPE_DEBUG, "! calling cont\n"); + q->cont (q->cont_cls, c, q, type, fwd, size); + GNUNET_free (q); + } c->pending_messages--; if (GNUNET_YES == c->destroy && 0 == c->pending_messages) { @@ -463,8 +529,17 @@ message_sent (void *cls, LOG (GNUNET_ERROR_TYPE_DEBUG, "! accounting pid %u\n", fc->last_pid_sent); - GMC_send_ack (c, fwd); + GMC_send_ack (c, fwd, GNUNET_NO); break; + + case GNUNET_MESSAGE_TYPE_MESH_POLL: + fc->poll_msg = NULL; + break; + + case GNUNET_MESSAGE_TYPE_MESH_ACK: + fc->ack_msg = NULL; + break; + default: break; } @@ -633,7 +708,7 @@ send_broken (struct MeshConnection *c, msg.cid = c->id; msg.peer1 = *id1; msg.peer2 = *id2; - GMC_send_prebuilt_message (&msg.header, c, fwd); + GMC_send_prebuilt_message (&msg.header, c, fwd, NULL, NULL); } @@ -664,7 +739,7 @@ connection_keepalive (struct MeshConnection *c, int fwd) msg->header.type = htons (type); msg->cid = c->id; - GMC_send_prebuilt_message (&msg->header, c, fwd); + GMC_send_prebuilt_message (&msg->header, c, fwd, NULL, NULL); } @@ -811,6 +886,41 @@ connection_cancel_queues (struct MeshConnection *c, int fwd) } +/** + * Function called if a connection has been stalled for a while, + * possibly due to a missed ACK. Poll the neighbor about its ACK status. + * + * @param cls Closure (poll ctx). + * @param tc TaskContext. + */ +static void +connection_poll (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc); + + +/** + * Callback called when a queued POLL message is sent. + * + * @param cls Closure (FC). + * @param c Connection this message was on. + * @param type Type of message sent. + * @param fwd Was this a FWD going message? + * @param size Size of the message. + */ +static void +poll_sent (void *cls, + struct MeshConnection *c, + struct MeshConnectionQueue *q, + uint16_t type, int fwd, size_t size) +{ + struct MeshFlowControl *fc = cls; + + LOG (GNUNET_ERROR_TYPE_DEBUG, " *** POLL sent, scheduling new one!\n"); + fc->poll_msg = NULL; + fc->poll_time = GNUNET_TIME_STD_BACKOFF (fc->poll_time); + fc->poll_task = GNUNET_SCHEDULER_add_delayed (fc->poll_time, + &connection_poll, fc); +} + /** * Function called if a connection has been stalled for a while, * possibly due to a missed ACK. Poll the neighbor about its ACK status. @@ -841,10 +951,8 @@ connection_poll (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) msg.header.size = htons (sizeof (msg)); msg.pid = htonl (fc->last_pid_sent); LOG (GNUNET_ERROR_TYPE_DEBUG, " *** last pid sent: %u!\n", fc->last_pid_sent); - GMC_send_prebuilt_message (&msg.header, c, fc == &c->fwd_fc); - fc->poll_time = GNUNET_TIME_STD_BACKOFF (fc->poll_time); - fc->poll_task = GNUNET_SCHEDULER_add_delayed (fc->poll_time, - &connection_poll, fc); + fc->poll_msg = GMC_send_prebuilt_message (&msg.header, c, fc == &c->fwd_fc, + &poll_sent, fc); } @@ -1136,7 +1244,7 @@ GMC_handle_create (void *cls, const struct GNUNET_PeerIdentity *peer, LOG (GNUNET_ERROR_TYPE_DEBUG, " Retransmitting.\n"); GMP_add_path (dest_peer, path_duplicate (path), GNUNET_NO); GMP_add_path_to_origin (orig_peer, path, GNUNET_NO); - GMC_send_prebuilt_message (message, c, GNUNET_YES); + GMC_send_prebuilt_message (message, c, GNUNET_YES, NULL, NULL); } return GNUNET_OK; } @@ -1233,7 +1341,7 @@ GMC_handle_confirm (void *cls, const struct GNUNET_PeerIdentity *peer, } LOG (GNUNET_ERROR_TYPE_DEBUG, " not for us, retransmitting...\n"); - GMC_send_prebuilt_message (message, c, fwd); + GMC_send_prebuilt_message (message, c, fwd, NULL, NULL); return GNUNET_OK; } @@ -1282,7 +1390,7 @@ GMC_handle_broken (void* cls, } else { - GMC_send_prebuilt_message (message, c, fwd); + GMC_send_prebuilt_message (message, c, fwd, NULL, NULL); c->destroy = GNUNET_YES; } @@ -1333,7 +1441,7 @@ GMC_handle_destroy (void *cls, const struct GNUNET_PeerIdentity *peer, GNUNET_break_op (0); return GNUNET_OK; } - GMC_send_prebuilt_message (message, c, fwd); + GMC_send_prebuilt_message (message, c, fwd, NULL, NULL); c->destroy = GNUNET_YES; return GNUNET_OK; @@ -1445,7 +1553,7 @@ handle_mesh_encrypted (const struct GNUNET_PeerIdentity *peer, } fc->last_pid_recv = pid; GMT_handle_encrypted (c->t, msg); - GMC_send_ack (c, fwd); + GMC_send_ack (c, fwd, GNUNET_NO); return GNUNET_OK; } @@ -1457,12 +1565,12 @@ handle_mesh_encrypted (const struct GNUNET_PeerIdentity *peer, { GNUNET_STATISTICS_update (stats, "# TTL drops", 1, GNUNET_NO); LOG (GNUNET_ERROR_TYPE_WARNING, " TTL is 0, DROPPING!\n"); - GMC_send_ack (c, fwd); + GMC_send_ack (c, fwd, GNUNET_NO); return GNUNET_OK; } - GNUNET_STATISTICS_update (stats, "# messages forwarded", 1, GNUNET_NO); - GMC_send_prebuilt_message (&msg->header, c, fwd); + GNUNET_STATISTICS_update (stats, "# messages forwarded", 1, GNUNET_NO); + GMC_send_prebuilt_message (&msg->header, c, fwd, NULL, NULL); return GNUNET_OK; } @@ -1559,8 +1667,7 @@ handle_mesh_kx (const struct GNUNET_PeerIdentity *peer, /* Message not for us: forward to next hop */ LOG (GNUNET_ERROR_TYPE_DEBUG, " not for us, retransmitting...\n"); GNUNET_STATISTICS_update (stats, "# messages forwarded", 1, GNUNET_NO); - - GMC_send_prebuilt_message (&msg->header, c, fwd); + GMC_send_prebuilt_message (&msg->header, c, fwd, NULL, NULL); return GNUNET_OK; } @@ -1745,7 +1852,7 @@ GMC_handle_poll (void *cls, const struct GNUNET_PeerIdentity *peer, LOG (GNUNET_ERROR_TYPE_DEBUG, " PID %u, OLD %u\n", pid, fc->last_pid_recv); fc->last_pid_recv = pid; fwd = fc == &c->bck_fc; - GMC_send_ack (c, fwd); + GMC_send_ack (c, fwd, GNUNET_YES); return GNUNET_OK; } @@ -1801,7 +1908,7 @@ GMC_handle_keepalive (void *cls, const struct GNUNET_PeerIdentity *peer, return GNUNET_OK; GNUNET_STATISTICS_update (stats, "# keepalives forwarded", 1, GNUNET_NO); - GMC_send_prebuilt_message (message, c, fwd); + GMC_send_prebuilt_message (message, c, fwd, NULL, NULL); return GNUNET_OK; } @@ -1812,10 +1919,11 @@ GMC_handle_keepalive (void *cls, const struct GNUNET_PeerIdentity *peer, * the direction and the position of the peer. * * @param c Which connection to send the hop-by-hop ACK. - * @param fwd Is this a fwd ACK? (will go dest->root) + * @param fwd Is this a fwd ACK? (will go dest->root). + * @param force Send the ACK even if suboptimal (e.g. requested by POLL). */ void -GMC_send_ack (struct MeshConnection *c, int fwd) +GMC_send_ack (struct MeshConnection *c, int fwd, int force) { unsigned int buffer; @@ -1841,22 +1949,20 @@ GMC_send_ack (struct MeshConnection *c, int fwd) buffer = GMC_get_buffer (c, fwd); } LOG (GNUNET_ERROR_TYPE_DEBUG, " buffer available: %u\n", buffer); + if (0 == buffer && GNUNET_NO == force) + return; /* Send available buffer space */ if (GMC_is_origin (c, fwd)) { GNUNET_assert (NULL != c->t); LOG (GNUNET_ERROR_TYPE_DEBUG, " sending on channels...\n"); - if (0 < buffer) - { - LOG (GNUNET_ERROR_TYPE_DEBUG, " really sending!\n"); - GMT_unchoke_channels (c->t); - } + GMT_unchoke_channels (c->t); } else { LOG (GNUNET_ERROR_TYPE_DEBUG, " sending on connection\n"); - send_ack (c, buffer, fwd); + send_ack (c, buffer, fwd, force); } } @@ -2112,7 +2218,7 @@ GMC_get_qn (struct MeshConnection *c, int fwd) void GMC_allow (struct MeshConnection *c, unsigned int buffer, int fwd) { - send_ack (c, buffer, fwd); + send_ack (c, buffer, fwd, GNUNET_NO); } @@ -2216,13 +2322,19 @@ GMC_is_sendable (struct MeshConnection *c, int fwd) * If message is not hop-by-hop, decrements TTL of copy. * @param c Connection on which this message is transmitted. * @param fwd Is this a fwd message? + * @param cont Continuation called once message is sent. Can be NULL. + * @param cont_cls Closure for @c cont. + * + * @return Handle to cancel the message before it's sent. NULL on error. + * Invalid on @c cont call. */ -void +struct MeshConnectionQueue * GMC_send_prebuilt_message (const struct GNUNET_MessageHeader *message, - struct MeshConnection *c, - int fwd) + struct MeshConnection *c, int fwd, + GMC_sent cont, void *cont_cls) { struct MeshFlowControl *fc; + struct MeshConnectionQueue *q; void *data; size_t size; uint16_t type; @@ -2253,7 +2365,7 @@ GMC_send_prebuilt_message (const struct GNUNET_MessageHeader *message, if (0 == ttl) { GNUNET_break_op (0); - return; + return NULL; } emsg->cid = c->id; emsg->ttl = htonl (ttl - 1); @@ -2318,14 +2430,42 @@ GMC_send_prebuilt_message (const struct GNUNET_MessageHeader *message, fc->queue_n, fc->queue_max); if (GNUNET_MESSAGE_TYPE_MESH_ENCRYPTED == type) fc->queue_n--; - return; /* Drop this message */ + return NULL; /* Drop this message */ } LOG (GNUNET_ERROR_TYPE_DEBUG, " C_P+ %p %u\n", c, c->pending_messages); c->pending_messages++; - GMP_queue_add (get_hop (c, fwd), data, type, size, c, fwd, - &message_sent, NULL); + q = GNUNET_new (struct MeshConnectionQueue); + q->q = GMP_queue_add (get_hop (c, fwd), data, type, size, c, fwd, + &message_sent, q); + q->cont = cont; + q->cont_cls = cont_cls; + return q; +} + + +/** + * Cancel a previously sent message while it's in the queue. + * + * ONLY can be called before the continuation given to the send function + * is called. Once the continuation is called, the message is no longer in the + * queue. + * + * If the send function was given no continuation, GMC_cancel should + * NOT be called, since it's not possible to determine if the message has + * already been sent. + * + * @param q Handle to the queue. + */ +void +GMC_cancel (struct MeshConnectionQueue *q) +{ + LOG (GNUNET_ERROR_TYPE_DEBUG, "! GMC cancel message\n"); + /* queue destroy calls message_sent, which calls q->cont */ + GMP_queue_destroy (q->q, GNUNET_YES); + + GNUNET_free (q); } @@ -2383,9 +2523,9 @@ GMC_send_destroy (struct MeshConnection *c) GMC_2s (c)); if (GNUNET_NO == GMC_is_terminal (c, GNUNET_YES)) - GMC_send_prebuilt_message (&msg.header, c, GNUNET_YES); + GMC_send_prebuilt_message (&msg.header, c, GNUNET_YES, NULL, NULL); if (GNUNET_NO == GMC_is_terminal (c, GNUNET_NO)) - GMC_send_prebuilt_message (&msg.header, c, GNUNET_NO); + GMC_send_prebuilt_message (&msg.header, c, GNUNET_NO, NULL, NULL); c->destroy = GNUNET_YES; } @@ -2406,7 +2546,7 @@ GMC_start_poll (struct MeshConnection *c, int fwd) struct MeshFlowControl *fc; fc = fwd ? &c->fwd_fc : &c->bck_fc; - if (GNUNET_SCHEDULER_NO_TASK != fc->poll_task) + if (GNUNET_SCHEDULER_NO_TASK != fc->poll_task && NULL != fc->poll_msg) { return; } diff --git a/src/mesh/gnunet-service-mesh_connection.h b/src/mesh/gnunet-service-mesh_connection.h index 7bb3c6aa1..d35630ef6 100644 --- a/src/mesh/gnunet-service-mesh_connection.h +++ b/src/mesh/gnunet-service-mesh_connection.h @@ -72,12 +72,31 @@ enum MeshConnectionState */ struct MeshConnection; +/** + * Handle for messages queued but not yet sent. + */ +struct MeshConnectionQueue; + #include "mesh_path.h" #include "gnunet-service-mesh_channel.h" #include "gnunet-service-mesh_peer.h" +/** + * Callback called when a queued message is sent. + * + * @param cls Closure. + * @param c Connection this message was on. + * @param type Type of message sent. + * @param fwd Was this a FWD going message? + * @param size Size of the message. + */ +typedef void (*GMC_sent) (void *cls, + struct MeshConnection *c, + struct MeshConnectionQueue *q, + uint16_t type, int fwd, size_t size); + /** * Core handler for connection creation. * @@ -211,10 +230,11 @@ GMC_handle_keepalive (void *cls, const struct GNUNET_PeerIdentity *peer, * the direction and the position of the peer. * * @param c Which connection to send the hop-by-hop ACK. - * @param fwd Is this a fwd ACK? (will go dest->root) + * @param fwd Is this a fwd ACK? (will go dest->root). + * @param force Send the ACK even if suboptimal (e.g. requested by POLL). */ void -GMC_send_ack (struct MeshConnection *c, int fwd); +GMC_send_ack (struct MeshConnection *c, int fwd, int force); /** * Initialize the connections subsystem @@ -407,6 +427,22 @@ GMC_is_terminal (struct MeshConnection *c, int fwd); int GMC_is_sendable (struct MeshConnection *c, int fwd); +/** + * Cancel a previously sent message while it's in the queue. + * + * ONLY can be called before the continuation given to the send function + * is called. Once the continuation is called, the message is no longer in the + * queue. + * + * If the send function was given no continuation, GMC_cancel should + * NOT be called, since it's not possible to determine if the message has + * already been sent. + * + * @param q Handle to the queue. + */ +void +GMC_cancel (struct MeshConnectionQueue *q); + /** * Sends an already built message on a connection, properly registering * all used resources. @@ -415,11 +451,16 @@ GMC_is_sendable (struct MeshConnection *c, int fwd); * If message is not hop-by-hop, decrements TTL of copy. * @param c Connection on which this message is transmitted. * @param fwd Is this a fwd message? + * @param cont Continuation called once message is sent. Can be NULL. + * @param cont_cls Closure for @c cont. + * + * @return Handle to cancel the message before it's sent. NULL on error. + * Invalid on @c cont call. */ -void +struct MeshConnectionQueue * GMC_send_prebuilt_message (const struct GNUNET_MessageHeader *message, - struct MeshConnection *c, - int fwd); + struct MeshConnection *c, int fwd, + GMC_sent cont, void *cont_cls); /** * Sends a CREATE CONNECTION message for a path to a peer. diff --git a/src/mesh/gnunet-service-mesh_peer.c b/src/mesh/gnunet-service-mesh_peer.c index a934ccc72..ec4001771 100644 --- a/src/mesh/gnunet-service-mesh_peer.c +++ b/src/mesh/gnunet-service-mesh_peer.c @@ -742,67 +742,6 @@ search_handler (void *cls, const struct MeshPeerPath *path) } -/** - * Free a transmission that was already queued with all resources - * associated to the request. - * - * @param queue Queue handler to cancel. - * @param clear_cls Is it necessary to free associated cls? - */ -static void -queue_destroy (struct MeshPeerQueue *queue, int clear_cls) -{ - struct MeshPeer *peer; - - peer = queue->peer; - GNUNET_assert (NULL != queue->c); - - if (GNUNET_YES == clear_cls) - { - LOG (GNUNET_ERROR_TYPE_DEBUG, "# queue destroy type %s\n", - GNUNET_MESH_DEBUG_M2S (queue->type)); - switch (queue->type) - { - case GNUNET_MESSAGE_TYPE_MESH_CONNECTION_DESTROY: - case GNUNET_MESSAGE_TYPE_MESH_TUNNEL_DESTROY: - LOG (GNUNET_ERROR_TYPE_INFO, "destroying a DESTROY message\n"); - /* fall through */ - case GNUNET_MESSAGE_TYPE_MESH_ENCRYPTED: - case GNUNET_MESSAGE_TYPE_MESH_ACK: - case GNUNET_MESSAGE_TYPE_MESH_POLL: - case GNUNET_MESSAGE_TYPE_MESH_CONNECTION_ACK: - case GNUNET_MESSAGE_TYPE_MESH_CONNECTION_CREATE: - case GNUNET_MESSAGE_TYPE_MESH_CONNECTION_BROKEN: - LOG (GNUNET_ERROR_TYPE_DEBUG, "# prebuilt message\n");; - GNUNET_free_non_null (queue->cls); - break; - - default: - GNUNET_break (0); - LOG (GNUNET_ERROR_TYPE_ERROR, "# type %s unknown!\n", - GNUNET_MESH_DEBUG_M2S (queue->type)); - } - } - GNUNET_CONTAINER_DLL_remove (peer->queue_head, peer->queue_tail, queue); - - if (queue->type != GNUNET_MESSAGE_TYPE_MESH_ACK && - queue->type != GNUNET_MESSAGE_TYPE_MESH_POLL) - { - peer->queue_n--; - } - - if (NULL != queue->callback) - { - LOG (GNUNET_ERROR_TYPE_DEBUG, "# Calling callback\n"); - queue->callback (queue->callback_cls, - queue->c, queue->type, - queue->fwd, queue->size, - GNUNET_TIME_absolute_get_duration (queue->start_waiting)); - } - - GNUNET_free (queue); -} - /** * Core callback to write a queued packet to core buffer * @@ -912,7 +851,7 @@ queue_send (void *cls, size_t size, void *buf) } /* Free queue, but cls was freed by send_core_* */ - queue_destroy (queue, GNUNET_NO); + GMP_queue_destroy (queue, GNUNET_NO); /* If more data in queue, send next */ queue = peer_get_first_message (peer); @@ -953,6 +892,69 @@ queue_send (void *cls, size_t size, void *buf) /******************************** API ***********************************/ /******************************************************************************/ + +/** + * Free a transmission that was already queued with all resources + * associated to the request. + * + * @param queue Queue handler to cancel. + * @param clear_cls Is it necessary to free associated cls? + */ +void +GMP_queue_destroy (struct MeshPeerQueue *queue, int clear_cls) +{ + struct MeshPeer *peer; + + peer = queue->peer; + GNUNET_assert (NULL != queue->c); + + if (GNUNET_YES == clear_cls) + { + LOG (GNUNET_ERROR_TYPE_DEBUG, "# queue destroy type %s\n", + GNUNET_MESH_DEBUG_M2S (queue->type)); + switch (queue->type) + { + case GNUNET_MESSAGE_TYPE_MESH_CONNECTION_DESTROY: + case GNUNET_MESSAGE_TYPE_MESH_TUNNEL_DESTROY: + LOG (GNUNET_ERROR_TYPE_INFO, "destroying a DESTROY message\n"); + /* fall through */ + case GNUNET_MESSAGE_TYPE_MESH_ENCRYPTED: + case GNUNET_MESSAGE_TYPE_MESH_ACK: + case GNUNET_MESSAGE_TYPE_MESH_POLL: + case GNUNET_MESSAGE_TYPE_MESH_CONNECTION_ACK: + case GNUNET_MESSAGE_TYPE_MESH_CONNECTION_CREATE: + case GNUNET_MESSAGE_TYPE_MESH_CONNECTION_BROKEN: + LOG (GNUNET_ERROR_TYPE_DEBUG, "# prebuilt message\n");; + GNUNET_free_non_null (queue->cls); + break; + + default: + GNUNET_break (0); + LOG (GNUNET_ERROR_TYPE_ERROR, "# type %s unknown!\n", + GNUNET_MESH_DEBUG_M2S (queue->type)); + } + } + GNUNET_CONTAINER_DLL_remove (peer->queue_head, peer->queue_tail, queue); + + if (queue->type != GNUNET_MESSAGE_TYPE_MESH_ACK && + queue->type != GNUNET_MESSAGE_TYPE_MESH_POLL) + { + peer->queue_n--; + } + + if (NULL != queue->callback) + { + LOG (GNUNET_ERROR_TYPE_DEBUG, "# Calling callback\n"); + queue->callback (queue->callback_cls, + queue->c, queue->type, + queue->fwd, queue->size, + GNUNET_TIME_absolute_get_duration (queue->start_waiting)); + } + + GNUNET_free (queue); +} + + /** * @brief Queue and pass message to core when possible. * @@ -965,8 +967,11 @@ queue_send (void *cls, size_t size, void *buf) * @param fwd Is this a message going root->dest? (FWD ACK are NOT FWD!) * @param cont Continuation to be called once CORE has taken the message. * @param cont_cls Closure for @c cont. + * + * @return Handle to cancel the message before it is sent. Once cont is called + * message has been sent and therefore the handle is no longer valid. */ -void +struct MeshPeerQueue * GMP_queue_add (struct MeshPeer *peer, void *cls, uint16_t type, size_t size, struct MeshConnection *c, int fwd, GMP_sent cont, void *cont_cls) @@ -985,7 +990,7 @@ GMP_queue_add (struct MeshPeer *peer, void *cls, uint16_t type, size_t size, { /* We are not connected to this peer, ignore request. */ GNUNET_break_op (0); - return; + return NULL; } priority = 0; @@ -1042,6 +1047,7 @@ GMP_queue_add (struct MeshPeer *peer, void *cls, uint16_t type, size_t size, GMP_2s (peer)); } + return queue; } @@ -1067,7 +1073,7 @@ GMP_queue_cancel (struct MeshPeer *peer, struct MeshConnection *c) LOG (GNUNET_ERROR_TYPE_DEBUG, "GMP_cancel_queue %s\n", GNUNET_MESH_DEBUG_M2S (q->type)); - queue_destroy (q, GNUNET_YES); + GMP_queue_destroy (q, GNUNET_YES); /* Get next from prev, q->next might be already freed: * queue destroy -> callback -> GMC_destroy -> cancel_queues -> here diff --git a/src/mesh/gnunet-service-mesh_peer.h b/src/mesh/gnunet-service-mesh_peer.h index 5c82ddcb7..58386c0aa 100644 --- a/src/mesh/gnunet-service-mesh_peer.h +++ b/src/mesh/gnunet-service-mesh_peer.h @@ -45,6 +45,11 @@ extern "C" */ struct MeshPeer; +/** + * Struct containing info about a queued transmission to this peer + */ +struct MeshPeerQueue; + #include "gnunet-service-mesh_connection.h" /** @@ -114,6 +119,16 @@ GMP_get_short (const GNUNET_PEER_Id peer); void GMP_connect (struct MeshPeer *peer); +/** + * Free a transmission that was already queued with all resources + * associated to the request. + * + * @param queue Queue handler to cancel. + * @param clear_cls Is it necessary to free associated cls? + */ +void +GMP_queue_destroy (struct MeshPeerQueue *queue, int clear_cls); + /** * @brief Queue and pass message to core when possible. * @@ -126,8 +141,11 @@ GMP_connect (struct MeshPeer *peer); * @param fwd Is this a message going root->dest? (FWD ACK are NOT FWD!) * @param cont Continuation to be called once CORE has taken the message. * @param cont_cls Closure for @c cont. + * + * @return Handle to cancel the message before it is sent. Once cont is called + * message has been sent and therefore the handle is no longer valid. */ -void +struct MeshPeerQueue * GMP_queue_add (struct MeshPeer *peer, void *cls, uint16_t type, size_t size, struct MeshConnection *c, int fwd, GMP_sent cont, void *cont_cls); diff --git a/src/mesh/gnunet-service-mesh_tunnel.c b/src/mesh/gnunet-service-mesh_tunnel.c index 68e9d8684..d745e6da1 100644 --- a/src/mesh/gnunet-service-mesh_tunnel.c +++ b/src/mesh/gnunet-service-mesh_tunnel.c @@ -660,7 +660,8 @@ send_kx (struct MeshTunnel3 *t, } fwd = GMC_is_origin (t->connection_head->c, GNUNET_YES); - GMC_send_prebuilt_message (&msg->header, c, fwd); + /* TODO save handle and cancel in case of a unneeded retransmission */ + GMC_send_prebuilt_message (&msg->header, c, fwd, NULL, NULL); } @@ -2021,7 +2022,8 @@ GMT_send_prebuilt_message (const struct GNUNET_MessageHeader *message, } fwd = GMC_is_origin (c, GNUNET_YES); - GMC_send_prebuilt_message (&msg->header, c, fwd); + /* FIXME allow channels to cancel */ + GMC_send_prebuilt_message (&msg->header, c, fwd, NULL, NULL); } /** -- 2.25.1