From a1cd4cff087fbead6d7f151554e70932e48ff80d Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Tue, 30 Aug 2011 11:52:57 +0000 Subject: [PATCH] delay calling notify from notify_transmit_ready --- src/mesh/mesh_api_new.c | 213 +++++++++++++++++++++------------------- 1 file changed, 112 insertions(+), 101 deletions(-) diff --git a/src/mesh/mesh_api_new.c b/src/mesh/mesh_api_new.c index 74357e44a..2aa7a9b2f 100644 --- a/src/mesh/mesh_api_new.c +++ b/src/mesh/mesh_api_new.c @@ -66,18 +66,31 @@ struct GNUNET_MESH_queue * Double Linked list */ struct GNUNET_MESH_queue *next; + struct GNUNET_MESH_queue *prev; /** - * Data itself + * Data itself, currently points to the end of this struct if + * we have a message already, NULL if the message is to be + * obtained from the callback. */ - void *data; + const struct GNUNET_MessageHeader *data; - /** - * Size of the data to follow - */ - uint16_t size; + /** + * Callback to obtain the message to transmit, or NULL if we + * got the message in 'data'. + */ + GNUNET_CONNECTION_TransmitReadyNotify notify; + + /** + * Closure for 'notify' + */ + void *notify_cls; + /** + * Size of 'data' -- or the desired size of 'notify' if 'data' is NULL. + */ + size_t size; }; @@ -140,6 +153,8 @@ struct GNUNET_MESH_Handle unsigned int n_applications; + unsigned int max_queue_size; + /** * Have we started the task to receive messages from the service * yet? We do this after we send the 'MESH_LOCAL_CONNECT' message. @@ -480,7 +495,7 @@ send_raw (void *cls, size_t size, void *buf) if (sizeof (struct GNUNET_MessageHeader) > size) { GNUNET_break (0); - GNUNET_assert (sizeof (struct GNUNET_MessageHeader) > q->size); + GNUNET_assert (sizeof (struct GNUNET_MessageHeader) > ntohs (q->data->size)); h->th = GNUNET_CLIENT_notify_transmit_ready (h->client, q->size, GNUNET_TIME_UNIT_FOREVER_REL, @@ -488,10 +503,18 @@ send_raw (void *cls, size_t size, void *buf) return 0; } GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "mesh: type: %i\n", - ntohs (((struct GNUNET_MessageHeader *) q->data)->type)); - memcpy (buf, q->data, q->size); - GNUNET_free (q->data); - size = q->size; + ntohs (q->data->type)); + if (NULL == q->data) + { + // FIXME: need to encapsulate message with information about + // the target (if data message -- or use wrapper for callback...) + size = q->notify (q->notify_cls, size, buf); + } + else + { + memcpy (buf, q->data, q->size); + size = q->size; + } GNUNET_CONTAINER_DLL_remove (h->queue_head, h->queue_tail, q); GNUNET_free (q); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "mesh: size: %u\n", size); @@ -520,22 +543,25 @@ send_raw (void *cls, size_t size, void *buf) * Takes care of creating a new queue element and calling the tmt_rdy function * if necessary. * @param h mesh handle - * @param size size of the packet to transmit - * @param data packet itself + * @param msg message to transmit */ static void -send_packet (struct GNUNET_MESH_Handle *h, size_t size, void *data) +send_packet (struct GNUNET_MESH_Handle *h, + const struct GNUNET_MessageHeader *msg) { struct GNUNET_MESH_queue *q; + size_t msize; - q = GNUNET_malloc (sizeof (struct GNUNET_MESH_queue)); - q->size = size; - q->data = data; + msize = ntohs (msg->size); + q = GNUNET_malloc (sizeof (struct GNUNET_MESH_queue) + msize); + q->size = msize; + q->data = (void*) &q[1]; + memcpy (&q[1], msg, msize); GNUNET_CONTAINER_DLL_insert_tail (h->queue_head, h->queue_tail, q); if (NULL != h->th) return; h->th = - GNUNET_CLIENT_notify_transmit_ready (h->client, size, + GNUNET_CLIENT_notify_transmit_ready (h->client, msize, GNUNET_TIME_UNIT_FOREVER_REL, GNUNET_YES, &send_raw, h); } @@ -575,7 +601,7 @@ GNUNET_MESH_connect (const struct GNUNET_CONFIGURATION_Handle *cfg, void *cls, GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "mesh: GNUNET_MESH_connect()\n"); h = GNUNET_malloc (sizeof (struct GNUNET_MESH_Handle)); - + h->max_queue_size = MESH_API_MAX_QUEUE; /* FIXME: add to arguments to 'GNUNET_MESH_connect' */ h->cleaner = cleaner; h->client = GNUNET_CLIENT_connect ("mesh", cfg); if (h->client == NULL) @@ -593,33 +619,33 @@ GNUNET_MESH_connect (const struct GNUNET_CONFIGURATION_Handle *cfg, void *cls, /* count handlers and apps, calculate size */ for (h->n_handlers = 0; handlers[h->n_handlers].type; h->n_handlers++) ; for (h->n_applications = 0; stypes[h->n_applications]; h->n_applications++) ; + size = sizeof (struct GNUNET_MESH_ClientConnect); size += h->n_handlers * sizeof (uint16_t); size += h->n_applications * sizeof (GNUNET_MESH_ApplicationType); - /* build connection packet */ - msg = GNUNET_malloc (size); - msg->header.type = htons (GNUNET_MESSAGE_TYPE_MESH_LOCAL_CONNECT); - msg->header.size = htons (size); - types = (uint16_t *) & msg[1]; - for (ntypes = 0; ntypes < h->n_handlers; ntypes++) - { - types[ntypes] = h->message_handlers[ntypes].type; - } - apps = (GNUNET_MESH_ApplicationType *) &types[ntypes]; - for (napps = 0; napps < h->n_applications; napps++) { - apps[napps] = h->applications[napps]; - } - msg->applications = htons (napps); - msg->types = htons (ntypes); - - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "mesh: Sending %lu bytes long message %d types and %d apps\n", - ntohs (msg->header.size), ntypes, napps); - - send_packet (h, size, msg); + char buf[size]; + + /* build connection packet */ + msg = (struct GNUNET_MESH_ClientConnect *) buf; + msg->header.type = htons (GNUNET_MESSAGE_TYPE_MESH_LOCAL_CONNECT); + msg->header.size = htons (size); + types = (uint16_t *) & msg[1]; + for (ntypes = 0; ntypes < h->n_handlers; ntypes++) + types[ntypes] = h->message_handlers[ntypes].type; + apps = (GNUNET_MESH_ApplicationType *) &types[ntypes]; + for (napps = 0; napps < h->n_applications; napps++) + apps[napps] = h->applications[napps]; + msg->applications = htons (napps); + msg->types = htons (ntypes); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "mesh: Sending %lu bytes long message %d types and %d apps\n", + ntohs (msg->header.size), ntypes, napps); + + send_packet (h, &msg->header); + } GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "mesh: GNUNET_MESH_connect() END\n"); @@ -663,7 +689,7 @@ GNUNET_MESH_tunnel_create (struct GNUNET_MESH_Handle *h, disconnect_handler, void *handler_cls) { struct GNUNET_MESH_Tunnel *t; - struct GNUNET_MESH_TunnelMessage *msg; + struct GNUNET_MESH_TunnelMessage msg; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "mesh: Creating new tunnel\n"); t = GNUNET_malloc (sizeof (struct GNUNET_MESH_Tunnel)); @@ -675,13 +701,10 @@ GNUNET_MESH_tunnel_create (struct GNUNET_MESH_Handle *h, t->tid = h->next_tid++; h->next_tid |= GNUNET_MESH_LOCAL_TUNNEL_ID_MARK; // keep in range - msg = GNUNET_malloc (sizeof (struct GNUNET_MESH_TunnelMessage)); - msg->header.type = htons (GNUNET_MESSAGE_TYPE_MESH_LOCAL_TUNNEL_CREATE); - msg->header.size = htons (sizeof (struct GNUNET_MESH_TunnelMessage)); - msg->tunnel_id = htonl (t->tid); - - send_packet (h, sizeof (struct GNUNET_MESH_TunnelMessage), msg); - + msg.header.type = htons (GNUNET_MESSAGE_TYPE_MESH_LOCAL_TUNNEL_CREATE); + msg.header.size = htons (sizeof (struct GNUNET_MESH_TunnelMessage)); + msg.tunnel_id = htonl (t->tid); + send_packet (h, &msg.header); return t; } @@ -707,7 +730,7 @@ GNUNET_MESH_tunnel_destroy (struct GNUNET_MESH_Tunnel *tun) GNUNET_free (tun); - send_packet (h, sizeof (struct GNUNET_MESH_TunnelMessage), msg); + send_packet (h, &msg->header); } @@ -748,7 +771,7 @@ GNUNET_MESH_peer_request_connect_add (struct GNUNET_MESH_Tunnel *tunnel, msg->tunnel_id = htonl (tunnel->tid); memcpy (&msg->peer, peer, sizeof (struct GNUNET_PeerIdentity)); - send_packet (tunnel->mesh, sizeof (struct GNUNET_MESH_PeerControl), msg); + send_packet (tunnel->mesh, &msg->header); // tunnel->connect_handler (tunnel->cls, peer, NULL); FIXME call this later // TODO: remember timeout @@ -767,41 +790,34 @@ void GNUNET_MESH_peer_request_connect_del (struct GNUNET_MESH_Tunnel *tunnel, const struct GNUNET_PeerIdentity *peer) { - struct GNUNET_MESH_PeerControl *msg; + struct GNUNET_MESH_PeerControl msg; GNUNET_PEER_Id peer_id; unsigned int i; peer_id = GNUNET_PEER_search (peer); if (0 == peer_id) - return; + { + GNUNET_break (0); + return; + } for (i = 0; i < tunnel->npeers; i++) - { if (tunnel->peers[i] == peer_id) + break; + if (i == tunnel->npeers) { - GNUNET_PEER_change_rc (peer_id, -1); - tunnel->npeers--; - while (i < tunnel->npeers) - { - tunnel->peers[i] = tunnel->peers[i + 1]; - i++; - } - tunnel->peers = - GNUNET_realloc (tunnel->peers, - tunnel->npeers * sizeof (GNUNET_PEER_Id)); - msg = GNUNET_malloc (sizeof (struct GNUNET_MESH_PeerControl)); - msg->header.size = htons (sizeof (struct GNUNET_MESH_PeerControl)); - msg->header.type = - htons (GNUNET_MESSAGE_TYPE_MESH_LOCAL_CONNECT_PEER_DEL); - msg->tunnel_id = htonl (tunnel->tid); - memcpy (&msg->peer, peer, sizeof (struct GNUNET_PeerIdentity)); - - send_packet (tunnel->mesh, sizeof (struct GNUNET_MESH_PeerControl), msg); - + GNUNET_break (0); return; } - } - // TODO: remember timeout - return; + GNUNET_PEER_change_rc (peer_id, -1); + tunnel->peers[i] = tunnel->peers[tunnel->npeers-1]; + GNUNET_array_grow (tunnel->peers, + tunnel->npeers, + tunnel->npeers - 1); + msg.header.size = htons (sizeof (struct GNUNET_MESH_PeerControl)); + msg.header.type = htons (GNUNET_MESSAGE_TYPE_MESH_LOCAL_CONNECT_PEER_DEL); + msg.tunnel_id = htonl (tunnel->tid); + memcpy (&msg.peer, peer, sizeof (struct GNUNET_PeerIdentity)); + send_packet (tunnel->mesh, &msg.header); } @@ -819,19 +835,14 @@ GNUNET_MESH_peer_request_connect_by_type (struct GNUNET_MESH_Tunnel *tunnel, struct GNUNET_TIME_Relative timeout, GNUNET_MESH_ApplicationType app_type) { - struct GNUNET_MESH_ConnectPeerByType *msg; + struct GNUNET_MESH_ConnectPeerByType msg; - msg = GNUNET_malloc (sizeof (struct GNUNET_MESH_ConnectPeerByType)); - msg->header.size = htons (sizeof (struct GNUNET_MESH_ConnectPeerByType)); - msg->header.type = - htons (GNUNET_MESSAGE_TYPE_MESH_LOCAL_CONNECT_PEER_BY_TYPE); - msg->tunnel_id = htonl (tunnel->tid); - msg->type = htonl (app_type); - - send_packet (tunnel->mesh, sizeof (struct GNUNET_MESH_ConnectPeerByType), - msg); + msg.header.size = htons (sizeof (struct GNUNET_MESH_ConnectPeerByType)); + msg.header.type = htons (GNUNET_MESSAGE_TYPE_MESH_LOCAL_CONNECT_PEER_BY_TYPE); + msg.tunnel_id = htonl (tunnel->tid); + msg.type = htonl (app_type); + send_packet (tunnel->mesh, &msg.header); // TODO: remember timeout - return; } @@ -867,23 +878,24 @@ GNUNET_MESH_notify_transmit_ready (struct GNUNET_MESH_Tunnel *tunnel, int cork, void *notify_cls) { struct GNUNET_MESH_TransmitHandle *handle; + struct GNUNET_MESH_queue *q; + if (get_queue_length (tunnel->mesh) >= tunnel->mesh->max_queue_size) + return NULL; /* queue full */ + + // FIXME: priority, maxdelay, target! (keep in 'handle') handle = GNUNET_malloc (sizeof (struct GNUNET_MESH_TransmitHandle)); handle->t = tunnel; - handle->q = GNUNET_malloc (sizeof (struct GNUNET_MESH_queue)); - handle->q->size = notify_size; - handle->q->data = GNUNET_malloc (notify_size); - - if (get_queue_length (tunnel->mesh) < MESH_API_MAX_QUEUE) - { - notify (notify_cls, notify_size, handle->q->data); - GNUNET_CONTAINER_DLL_insert_tail (tunnel->mesh->queue_head, - tunnel->mesh->queue_tail, handle->q); - } - else - { - // TODO dataless - queue - } + handle->q = q = GNUNET_malloc (sizeof (struct GNUNET_MESH_queue)); + q->size = notify_size; + q->data = NULL; + q->notify = notify; + q->notify_cls = notify_cls; + // FIXME: insert by priority!? + // FIXME: distinguish between control messages (MESH_LOCAL_CONNECT) and data + // messages? + GNUNET_CONTAINER_DLL_insert_tail (tunnel->mesh->queue_head, + tunnel->mesh->queue_tail, q); return handle; } @@ -900,7 +912,6 @@ GNUNET_MESH_notify_transmit_ready_cancel (struct GNUNET_MESH_TransmitHandle *th) GNUNET_CONTAINER_DLL_remove (th->t->mesh->queue_head, th->t->mesh->queue_tail, th->q); // TODO remove from dataless queue - GNUNET_free (th->q->data); GNUNET_free (th->q); GNUNET_free (th); } -- 2.25.1