From 0866f018ec9465d261664c013e714d0e8a1a0092 Mon Sep 17 00:00:00 2001 From: Bart Polot Date: Thu, 10 Oct 2013 17:23:13 +0000 Subject: [PATCH] - move connection message accounting --- src/mesh/gnunet-service-mesh_connection.c | 57 ++++++++++++--- src/mesh/gnunet-service-mesh_peer.c | 85 ++++------------------- src/mesh/gnunet-service-mesh_peer.h | 6 +- src/mesh/gnunet-service-mesh_tunnel.c | 1 + 4 files changed, 68 insertions(+), 81 deletions(-) diff --git a/src/mesh/gnunet-service-mesh_connection.c b/src/mesh/gnunet-service-mesh_connection.c index 2feeab7cd..63e4b8bd0 100644 --- a/src/mesh/gnunet-service-mesh_connection.c +++ b/src/mesh/gnunet-service-mesh_connection.c @@ -353,20 +353,25 @@ connection_change_state (struct MeshConnection* c, * * @param cls Closure. * @param c Connection this message was on. + * @param type Type of message sent. + * @param fwd Was this a FWD going message? + * @param size Size of the message. * @param wait Time spent waiting for core (only the time for THIS message) */ static void message_sent (void *cls, - struct MeshConnection *c, + struct MeshConnection *c, uint16_t type, + int fwd, size_t size, struct GNUNET_TIME_Relative wait) { struct MeshConnectionPerformance *p; - size_t size = (size_t) cls; + struct MeshFlowControl *fc; double usecsperbyte; if (NULL == c->perf) return; /* Only endpoints are interested in this. */ + LOG (GNUNET_ERROR_TYPE_DEBUG, "! message sent!\n"); p = c->perf; usecsperbyte = ((double) wait.rel_value_us) / size; if (p->size == AVG_MSGS) @@ -386,6 +391,16 @@ message_sent (void *cls, p->avg /= p->size; } p->idx = (p->idx + 1) % AVG_MSGS; + + fc = fwd ? &c->fwd_fc : &c->bck_fc; + LOG (GNUNET_ERROR_TYPE_DEBUG, "! Q_N- %p %u\n", fc, fc->queue_n); + fc->queue_n--; + c->pending_messages--; + if (GNUNET_YES == c->destroy && 0 == c->pending_messages) + { + LOG (GNUNET_ERROR_TYPE_DEBUG, "! destroying connection!\n"); + GMC_destroy (c); + } } @@ -528,8 +543,7 @@ send_connection_ack (struct MeshConnection *connection, int fwd) GNUNET_MESSAGE_TYPE_MESH_CONNECTION_ACK, sizeof (struct GNUNET_MESH_ConnectionACK), connection, NULL, fwd, - &message_sent, - (void *) sizeof (struct GNUNET_MESH_ConnectionACK)); + &message_sent, NULL); if (MESH_TUNNEL3_NEW == GMT_get_state (t)) GMT_change_state (t, MESH_TUNNEL3_WAITING); if (MESH_CONNECTION_READY != connection->state) @@ -1933,9 +1947,11 @@ GMC_send_prebuilt_message (const struct GNUNET_MessageHeader *message, struct MeshChannel *ch, int fwd) { + struct MeshFlowControl *fc; void *data; size_t size; uint16_t type; + int droppable; size = ntohs (message->size); data = GNUNET_malloc (size); @@ -1944,6 +1960,7 @@ GMC_send_prebuilt_message (const struct GNUNET_MessageHeader *message, LOG (GNUNET_ERROR_TYPE_DEBUG, "Send %s (%u) on connection %s\n", GNUNET_MESH_DEBUG_M2S (type), size, GNUNET_h2s (&c->id)); + droppable = GNUNET_YES; switch (type) { struct GNUNET_MESH_Encrypted *emsg; @@ -1972,6 +1989,7 @@ GMC_send_prebuilt_message (const struct GNUNET_MessageHeader *message, amsg = (struct GNUNET_MESH_ACK *) data; amsg->cid = c->id; LOG (GNUNET_ERROR_TYPE_DEBUG, " ack %u\n", ntohl (amsg->ack)); + droppable = GNUNET_NO; break; case GNUNET_MESSAGE_TYPE_MESH_POLL: @@ -1979,6 +1997,7 @@ GMC_send_prebuilt_message (const struct GNUNET_MessageHeader *message, pmsg->cid = c->id; pmsg->pid = htonl (fwd ? c->fwd_fc.last_pid_sent : c->bck_fc.last_pid_sent); LOG (GNUNET_ERROR_TYPE_DEBUG, " poll %u\n", ntohl (pmsg->pid)); + droppable = GNUNET_NO; break; case GNUNET_MESSAGE_TYPE_MESH_TUNNEL_DESTROY: @@ -2001,8 +2020,30 @@ GMC_send_prebuilt_message (const struct GNUNET_MessageHeader *message, GNUNET_break (0); } + fc = fwd ? &c->fwd_fc : &c->bck_fc; + if (fc->queue_n >= fc->queue_max && droppable) + { + GNUNET_STATISTICS_update (stats, "# messages dropped (buffer full)", + 1, GNUNET_NO); + GNUNET_break (0); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "queue full: %u/%u\n", + fc->queue_n, fc->queue_max); + return; /* Drop this message */ + } + + LOG (GNUNET_ERROR_TYPE_DEBUG, "last pid %u\n", fc->last_pid_sent); + LOG (GNUNET_ERROR_TYPE_DEBUG, " ack %u\n", fc->last_ack_recv); + LOG (GNUNET_ERROR_TYPE_DEBUG, " Q_N+ %p %u\n", fc, fc->queue_n); + if (GMC_is_pid_bigger (fc->last_pid_sent + 1, fc->last_ack_recv)) + { + GMC_start_poll (c, fwd); + } + fc->queue_n++; + c->pending_messages++; + GMP_queue_add (get_hop (c, fwd), data, type, size, c, ch, fwd, - &message_sent, (void *) size); + &message_sent, NULL); } @@ -2023,10 +2064,8 @@ enum MeshTunnel3State state; LOG (GNUNET_ERROR_TYPE_DEBUG, "Send connection create\n"); GMP_queue_add (get_next_hop (connection), NULL, GNUNET_MESSAGE_TYPE_MESH_CONNECTION_CREATE, - size, - connection, - NULL, - GNUNET_YES, &message_sent, (void *) size); + size, connection, NULL, + GNUNET_YES, &message_sent, NULL); state = GMT_get_state (connection->t); if (MESH_TUNNEL3_SEARCHING == state || MESH_TUNNEL3_NEW == state) GMT_change_state (connection->t, MESH_TUNNEL3_WAITING); diff --git a/src/mesh/gnunet-service-mesh_peer.c b/src/mesh/gnunet-service-mesh_peer.c index 6d87960b5..f997cf838 100644 --- a/src/mesh/gnunet-service-mesh_peer.c +++ b/src/mesh/gnunet-service-mesh_peer.c @@ -768,7 +768,6 @@ static size_t queue_send (void *cls, size_t size, void *buf) { struct MeshPeer *peer = cls; - struct MeshFlowControl *fc; struct MeshConnection *c; struct GNUNET_MessageHeader *msg; struct MeshPeerQueue *queue; @@ -798,7 +797,6 @@ queue_send (void *cls, size_t size, void *buf) } c = queue->c; fwd = queue->fwd; - fc = fwd ? &c->fwd_fc : &c->bck_fc; dst_id = GNUNET_PEER_resolve2 (peer->id); LOG (GNUNET_ERROR_TYPE_DEBUG, "* towards %s\n", GNUNET_i2s (dst_id)); @@ -825,7 +823,7 @@ queue_send (void *cls, size_t size, void *buf) /* Fill buf */ switch (queue->type) { - case GNUNET_MESSAGE_TYPE_MESH_TUNNEL3_DESTROY: + case GNUNET_MESSAGE_TYPE_MESH_TUNNEL_DESTROY: case GNUNET_MESSAGE_TYPE_MESH_CONNECTION_DESTROY: case GNUNET_MESSAGE_TYPE_MESH_CONNECTION_BROKEN: case GNUNET_MESSAGE_TYPE_MESH_FWD: @@ -877,14 +875,6 @@ queue_send (void *cls, size_t size, void *buf) data_size = 0; } - if (NULL != queue->callback) - { - LOG (GNUNET_ERROR_TYPE_DEBUG, "* Calling callback\n"); - queue->callback (queue->callback_cls, - queue->c, - GNUNET_TIME_absolute_get_duration (queue->start_waiting)); - } - /* Free queue, but cls was freed by send_core_* */ ch = queue->ch; GMP_queue_destroy (queue, GNUNET_NO); @@ -940,22 +930,13 @@ queue_send (void *cls, size_t size, void *buf) fc->poll_task = GNUNET_SCHEDULER_NO_TASK; } } - if (NULL != c) - { - c->pending_messages--; - if (GNUNET_YES == c->destroy && 0 == c->pending_messages) - { - LOG (GNUNET_ERROR_TYPE_DEBUG, "* destroying connection!\n"); - GMC_destroy (c); - } - } if (NULL != t) { t->pending_messages--; if (GNUNET_YES == t->destroy && 0 == t->pending_messages) { -// LOG (GNUNET_ERROR_TYPE_DEBUG, "* destroying tunnel!\n"); + LOG (GNUNET_ERROR_TYPE_DEBUG, "* destroying tunnel!\n"); GMT_destroy (t); } } @@ -1021,24 +1002,19 @@ void GMP_queue_destroy (struct MeshPeerQueue *queue, int clear_cls) { struct MeshPeer *peer; - struct MeshFlowControl *fc; - int fwd; - fwd = queue->fwd; peer = queue->peer; GNUNET_assert (NULL != queue->c); - fc = fwd ? &queue->c->fwd_fc : &queue->c->bck_fc; if (GNUNET_YES == clear_cls) { - LOG (GNUNET_ERROR_TYPE_DEBUG, " queue destroy type %s\n", + LOG (GNUNET_ERROR_TYPE_DEBUG, "# queue destroy type %s\n", GNUNET_MESH_DEBUG_M2S (queue->type)); switch (queue->type) { case GNUNET_MESSAGE_TYPE_MESH_CONNECTION_DESTROY: - case GNUNET_MESSAGE_TYPE_MESH_TUNNEL3_DESTROY: + case GNUNET_MESSAGE_TYPE_MESH_TUNNEL_DESTROY: LOG (GNUNET_ERROR_TYPE_INFO, "destroying a DESTROY message\n"); - GNUNET_break (GNUNET_YES == queue->c->destroy); /* fall through */ case GNUNET_MESSAGE_TYPE_MESH_FWD: case GNUNET_MESSAGE_TYPE_MESH_BCK: @@ -1047,33 +1023,31 @@ GMP_queue_destroy (struct MeshPeerQueue *queue, int clear_cls) case GNUNET_MESSAGE_TYPE_MESH_CONNECTION_ACK: case GNUNET_MESSAGE_TYPE_MESH_CONNECTION_CREATE: case GNUNET_MESSAGE_TYPE_MESH_CONNECTION_BROKEN: - LOG (GNUNET_ERROR_TYPE_DEBUG, " prebuilt message\n");; + LOG (GNUNET_ERROR_TYPE_DEBUG, "# prebuilt message\n");; GNUNET_free_non_null (queue->cls); break; default: GNUNET_break (0); - LOG (GNUNET_ERROR_TYPE_ERROR, " type %s unknown!\n", + LOG (GNUNET_ERROR_TYPE_ERROR, "# type %s unknown!\n", GNUNET_MESH_DEBUG_M2S (queue->type)); } - } GNUNET_CONTAINER_DLL_remove (peer->queue_head, peer->queue_tail, queue); if (queue->type != GNUNET_MESSAGE_TYPE_MESH_ACK && queue->type != GNUNET_MESSAGE_TYPE_MESH_POLL) { - LOG (GNUNET_ERROR_TYPE_DEBUG, " Q_N- %p %u\n", fc, fc->queue_n); - fc->queue_n--; peer->queue_n--; } - if (NULL != queue->c) + + if (NULL != queue->callback) { - queue->c->pending_messages--; - if (NULL != queue->c->t) - { - queue->c->t->pending_messages--; - } + LOG (GNUNET_ERROR_TYPE_DEBUG, "# Calling callback\n"); + queue->callback (queue->callback_cls, + queue->c, queue->type, + queue->fwd, queue->size, + GNUNET_TIME_absolute_get_duration (queue->start_waiting)); } GNUNET_free (queue); @@ -1124,34 +1098,8 @@ GMP_queue_add (struct MeshPeer *peer, void *cls, uint16_t type, size_t size, } LOG (GNUNET_ERROR_TYPE_DEBUG, "priority %d\n", priority); - LOG (GNUNET_ERROR_TYPE_DEBUG, "fc %p\n", fc); - if (fc->queue_n >= fc->queue_max && 0 == priority) - { - GNUNET_STATISTICS_update (stats, "# messages dropped (buffer full)", - 1, GNUNET_NO); - GNUNET_break (0); - LOG (GNUNET_ERROR_TYPE_DEBUG, - "queue full: %u/%u\n", - fc->queue_n, fc->queue_max); - return; /* Drop this message */ - } - LOG (GNUNET_ERROR_TYPE_DEBUG, "last pid %u\n", fc->last_pid_sent); - LOG (GNUNET_ERROR_TYPE_DEBUG, " ack %u\n", fc->last_ack_recv); - if (GMC_is_pid_bigger (fc->last_pid_sent + 1, fc->last_ack_recv)) - { - call_core = GNUNET_NO; - if (GNUNET_SCHEDULER_NO_TASK == fc->poll_task && - GNUNET_MESSAGE_TYPE_MESH_POLL != type) - { - LOG (GNUNET_ERROR_TYPE_DEBUG, - "no buffer space (%u > %u): starting poll\n", - fc->last_pid_sent + 1, fc->last_ack_recv); - GMC_start_poll (c, fwd); - } - } - else - call_core = GNUNET_YES; + call_core = GMC_is_sendable (c, fwd); queue = GNUNET_malloc (sizeof (struct MeshPeerQueue)); queue->cls = cls; queue->type = type; @@ -1181,8 +1129,6 @@ GMP_queue_add (struct MeshPeer *peer, void *cls, uint16_t type, size_t size, else { GNUNET_CONTAINER_DLL_insert_tail (peer->queue_head, peer->queue_tail, queue); - LOG (GNUNET_ERROR_TYPE_DEBUG, " Q_N+ %p %u\n", fc, fc->queue_n); - fc->queue_n++; peer->queue_n++; } @@ -1209,9 +1155,6 @@ GMP_queue_add (struct MeshPeer *peer, void *cls, uint16_t type, size_t size, peer2s (peer)); } - c->pending_messages++; - if (NULL != c->t) - c->t->pending_messages++; } diff --git a/src/mesh/gnunet-service-mesh_peer.h b/src/mesh/gnunet-service-mesh_peer.h index abc45f0c9..c7265ce2f 100644 --- a/src/mesh/gnunet-service-mesh_peer.h +++ b/src/mesh/gnunet-service-mesh_peer.h @@ -50,10 +50,14 @@ struct MeshPeer; * * @param cls Closure. * @param c Connection this message was on. + * @param type Type of message sent. + * @param fwd Was this a FWD going message? + * @param size Size of the message. * @param wait Time spent waiting for core (only the time for THIS message) */ typedef void (*GMP_sent) (void *cls, - struct MeshConnection *c, + struct MeshConnection *c, uint16_t type, + int fwd, size_t size, struct GNUNET_TIME_Relative wait); #include "gnunet-service-mesh_connection.h" diff --git a/src/mesh/gnunet-service-mesh_tunnel.c b/src/mesh/gnunet-service-mesh_tunnel.c index 286eca618..c5ccc9f5c 100644 --- a/src/mesh/gnunet-service-mesh_tunnel.c +++ b/src/mesh/gnunet-service-mesh_tunnel.c @@ -1151,6 +1151,7 @@ GMT_send_prebuilt_message (const struct GNUNET_MessageHeader *message, } msg->reserved = 0; + t->pending_messages++; GMC_send_prebuilt_message (&msg->header, c, ch, fwd); } -- 2.25.1