From 2855795582ac050551da45a04a687e73c2c3d0d8 Mon Sep 17 00:00:00 2001 From: Bart Polot Date: Wed, 25 Jul 2012 17:34:21 +0000 Subject: [PATCH] - adapt callback to ack --- src/mesh/mesh_api.c | 88 +++++++++++++++++++++++++++++++++++---------- 1 file changed, 70 insertions(+), 18 deletions(-) diff --git a/src/mesh/mesh_api.c b/src/mesh/mesh_api.c index 8117b9354..aee4c2a7b 100644 --- a/src/mesh/mesh_api.c +++ b/src/mesh/mesh_api.c @@ -23,6 +23,7 @@ * STRUCTURE: * - CONSTANTS * - DATA STRUCTURES + * - DECLARATIONS * - AUXILIARY FUNCTIONS * - RECEIVE HANDLERS * - SEND FUNCTIONS @@ -307,9 +308,9 @@ struct GNUNET_MESH_Tunnel unsigned int npeers; /** - * Number of packets queued in this tunnel + * Size of packet queued in this tunnel */ - unsigned int npackets; + unsigned int packet_size; /** * Number of applications requested this tunnel @@ -340,6 +341,24 @@ struct GNUNET_MESH_Tunnel }; +/******************************************************************************/ +/*********************** DECLARATIONS *************************/ +/******************************************************************************/ + +/** + * Function called to send a message to the service. + * "buf" will be NULL and "size" zero if the socket was closed for writing in + * the meantime. + * + * @param cls closure, the mesh handle + * @param size number of bytes available in buf + * @param buf where the callee should write the connect message + * @return number of bytes written to buf + */ +static size_t +send_callback (void *cls, size_t size, void *buf); + + /******************************************************************************/ /*********************** AUXILIARY FUNCTIONS *************************/ /******************************************************************************/ @@ -1079,6 +1098,11 @@ process_ack (struct GNUNET_MESH_Handle *h, ack = ntohl (msg->max_pid); if (ack > t->max_pid || PID_OVERFLOW (t->max_pid, ack)) t->max_pid = ack; + if (NULL == h->th && 0 < t->packet_size) + h->th = + GNUNET_CLIENT_notify_transmit_ready (h->client, t->packet_size, + GNUNET_TIME_UNIT_FOREVER_REL, + GNUNET_YES, &send_callback, h); } @@ -1157,6 +1181,8 @@ send_callback (void *cls, size_t size, void *buf) { struct GNUNET_MESH_Handle *h = cls; struct GNUNET_MESH_TransmitHandle *th; + struct GNUNET_MESH_TransmitHandle *next; + struct GNUNET_MESH_Tunnel *t; char *cbuf = buf; size_t tsize; size_t psize; @@ -1170,11 +1196,18 @@ send_callback (void *cls, size_t size, void *buf) return 0; } tsize = 0; - while ((NULL != (th = h->th_head)) && (size >= th->size)) + while ((NULL != (th = next)) && (size >= th->size)) { + t = th->tunnel; if (NULL != th->notify) { - if (th->tunnel->tid >= GNUNET_MESH_LOCAL_TUNNEL_ID_SERV) + if (t->max_pid < t->pid && ! PID_OVERFLOW (t->pid, t->max_pid)) { + /* This tunnel is not ready to transmit yet, try next message */ + next = th->next; + continue; + } + t->packet_size = 0; + if (t->tid >= GNUNET_MESH_LOCAL_TUNNEL_ID_SERV) { /* traffic to origin */ struct GNUNET_MESH_ToOrigin to; @@ -1191,7 +1224,8 @@ send_callback (void *cls, size_t size, void *buf) GNUNET_assert (size >= psize); to.header.size = htons (psize); to.header.type = htons (GNUNET_MESSAGE_TYPE_MESH_TO_ORIGIN); - to.tid = htonl (th->tunnel->tid); + to.tid = htonl (t->tid); + // FIXME pid? memset (&to.oid, 0, sizeof (struct GNUNET_PeerIdentity)); memset (&to.sender, 0, sizeof (struct GNUNET_PeerIdentity)); memcpy (cbuf, &to, sizeof (to)); @@ -1214,8 +1248,8 @@ send_callback (void *cls, size_t size, void *buf) GNUNET_assert (size >= psize); mc.header.size = htons (psize); mc.header.type = htons (GNUNET_MESSAGE_TYPE_MESH_MULTICAST); - mc.tid = htonl (th->tunnel->tid); - mc.pid = 0; + mc.tid = htonl (t->tid); + mc.pid = htonl (t->pid); mc.ttl = 0; memset (&mc.oid, 0, sizeof (struct GNUNET_PeerIdentity)); memcpy (cbuf, &mc, sizeof (mc)); @@ -1238,12 +1272,14 @@ send_callback (void *cls, size_t size, void *buf) GNUNET_assert (size >= psize); uc.header.size = htons (psize); uc.header.type = htons (GNUNET_MESSAGE_TYPE_MESH_UNICAST); - uc.tid = htonl (th->tunnel->tid); + uc.tid = htonl (t->tid); + uc.pid = htonl (t->pid); memset (&uc.oid, 0, sizeof (struct GNUNET_PeerIdentity)); GNUNET_PEER_resolve (th->target, &uc.destination); memcpy (cbuf, &uc, sizeof (uc)); } } + t->pid++; } else { @@ -1255,25 +1291,42 @@ send_callback (void *cls, size_t size, void *buf) } if (th->timeout_task != GNUNET_SCHEDULER_NO_TASK) GNUNET_SCHEDULER_cancel (th->timeout_task); - if (NULL != th->notify) - { - th->tunnel->npackets--; - } GNUNET_CONTAINER_DLL_remove (h->th_head, h->th_tail, th); GNUNET_free (th); + next = h->th_head; cbuf += psize; size -= psize; tsize += psize; } LOG (GNUNET_ERROR_TYPE_DEBUG, " total size: %u\n", tsize); - if (NULL != (th = h->th_head)) + if (NULL != h->th_head) { - LOG (GNUNET_ERROR_TYPE_DEBUG, " next size: %u\n", th->size); - if (NULL == h->th) + int request = GNUNET_NO; + + for (th = h->th_head; NULL != th; th = th->next) + { + struct GNUNET_MESH_Tunnel *t = th->tunnel; + + if (NULL != th->notify || + (t->max_pid >= t->pid || PID_OVERFLOW (t->pid, t->max_pid))) + { + request = GNUNET_YES; + break; + } + } + + if (GNUNET_YES == request) + { + LOG (GNUNET_ERROR_TYPE_DEBUG, " next size: %u\n", th->size); h->th = GNUNET_CLIENT_notify_transmit_ready (h->client, th->size, GNUNET_TIME_UNIT_FOREVER_REL, GNUNET_YES, &send_callback, h); + } + else + { + LOG (GNUNET_ERROR_TYPE_DEBUG, " nothing left to transmit\n"); + } } LOG (GNUNET_ERROR_TYPE_DEBUG, "Send packet() END\n"); if (GNUNET_NO == h->in_receive) @@ -1863,8 +1916,7 @@ GNUNET_MESH_notify_transmit_ready (struct GNUNET_MESH_Tunnel *tunnel, int cork, else LOG (GNUNET_ERROR_TYPE_DEBUG, " target multicast\n"); GNUNET_assert (NULL != notify); - GNUNET_assert (0 == tunnel->npackets); - tunnel->npackets++; + GNUNET_assert (0 == tunnel->packet_size); // Only one data packet allowed th = GNUNET_malloc (sizeof (struct GNUNET_MESH_TransmitHandle)); th->tunnel = tunnel; th->timeout = GNUNET_TIME_relative_to_absolute (maxdelay); @@ -1875,7 +1927,7 @@ GNUNET_MESH_notify_transmit_ready (struct GNUNET_MESH_Tunnel *tunnel, int cork, overhead = sizeof (struct GNUNET_MESH_Multicast); else overhead = sizeof (struct GNUNET_MESH_Unicast); - th->size = notify_size + overhead; + tunnel->packet_size = th->size = notify_size + overhead; th->notify = notify; th->notify_cls = notify_cls; add_to_queue (tunnel->mesh, th); -- 2.25.1