From 439fed4dda3de557a37de85c08be9ac29746e6b7 Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Tue, 5 Jul 2016 14:17:50 +0000 Subject: [PATCH] -reworking DV to use new MQ API --- src/dv/dv.h | 29 +- src/dv/dv_api.c | 651 ++++++++++---------------------- src/dv/gnunet-service-dv.c | 50 --- src/dv/plugin_transport_dv.c | 157 +------- src/include/gnunet_dv_service.h | 21 +- 5 files changed, 210 insertions(+), 698 deletions(-) diff --git a/src/dv/dv.h b/src/dv/dv.h index e86fdfd15..844cfb5e1 100644 --- a/src/dv/dv.h +++ b/src/dv/dv.h @@ -126,34 +126,9 @@ struct GNUNET_DV_SendMessage struct GNUNET_MessageHeader header; /** - * Unique ID for this message, for confirm callback, must never be zero. + * Reserved for alignment. 0. */ - uint32_t uid GNUNET_PACKED; - - /** - * The (actual) target of the message - */ - struct GNUNET_PeerIdentity target; - -}; - - -/** - * Message from service to DV plugin, saying that a - * SEND request was handled. - */ -struct GNUNET_DV_AckMessage -{ - /** - * Type: #GNUNET_MESSAGE_TYPE_DV_SEND_ACK or - * #GNUNET_MESSAGE_TYPE_DV_SEND_NACK. - */ - struct GNUNET_MessageHeader header; - - /** - * Which message is being acknowledged? - */ - uint32_t uid GNUNET_PACKED; + uint32_t reserved GNUNET_PACKED; /** * The (actual) target of the message diff --git a/src/dv/dv_api.c b/src/dv/dv_api.c index d74453376..76a6ea484 100644 --- a/src/dv/dv_api.c +++ b/src/dv/dv_api.c @@ -1,6 +1,6 @@ /* This file is part of GNUnet. - Copyright (C) 2009--2013 GNUnet e.V. + Copyright (C) 2009--2013, 2016 GNUnet e.V. GNUnet is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published @@ -34,60 +34,6 @@ #define LOG(kind,...) GNUNET_log_from (kind, "dv-api",__VA_ARGS__) -/** - * Information we track for each peer. - */ -struct ConnectedPeer; - - -/** - * Handle for a send operation. - */ -struct GNUNET_DV_TransmitHandle -{ - /** - * Kept in a DLL. - */ - struct GNUNET_DV_TransmitHandle *next; - - /** - * Kept in a DLL. - */ - struct GNUNET_DV_TransmitHandle *prev; - - /** - * Handle to the service. - */ - struct GNUNET_DV_ServiceHandle *sh; - - /** - * Function to call upon completion. - */ - GNUNET_DV_MessageSentCallback cb; - - /** - * Closure for @a cb. - */ - void *cb_cls; - - /** - * The actual message (allocated at the end of this struct). - */ - const struct GNUNET_MessageHeader *msg; - - /** - * Destination for the message. - */ - struct ConnectedPeer *target; - - /** - * UID of our message, if any. - */ - uint32_t uid; - -}; - - /** * Information we track for each peer. */ @@ -99,22 +45,6 @@ struct ConnectedPeer */ struct GNUNET_PeerIdentity pid; - /** - * Head of DLL of transmission handles where we need - * to invoke a continuation when we are informed about - * successful transmission. The respective request - * has already been sent to the DV service. - */ - struct GNUNET_DV_TransmitHandle *head; - - /** - * Tail of DLL of transmission handles where we need - * to invoke a continuation when we are informed about - * successful transmission. The respective request - * has already been sent to the DV service. - */ - struct GNUNET_DV_TransmitHandle *tail; - }; @@ -127,12 +57,7 @@ struct GNUNET_DV_ServiceHandle /** * Connection to DV service. */ - struct GNUNET_CLIENT_Connection *client; - - /** - * Active request for transmission to DV service. - */ - struct GNUNET_CLIENT_TransmitHandle *th; + struct GNUNET_MQ_Handle *mq; /** * Our configuration. @@ -164,27 +89,12 @@ struct GNUNET_DV_ServiceHandle */ GNUNET_DV_MessageReceivedCallback message_cb; - /** - * Head of messages to transmit. - */ - struct GNUNET_DV_TransmitHandle *th_head; - - /** - * Tail of messages to transmit. - */ - struct GNUNET_DV_TransmitHandle *th_tail; - /** * Information tracked per connected peer. Maps peer * identities to `struct ConnectedPeer` entries. */ struct GNUNET_CONTAINER_MultiPeerMap *peers; - /** - * Current unique ID - */ - uint32_t uid_gen; - }; @@ -198,120 +108,160 @@ reconnect (struct GNUNET_DV_ServiceHandle *sh); /** - * Start sending messages from our queue to the service. + * We got disconnected from the service and thus all of the + * connections need to be torn down. * - * @param sh service handle + * @param cls the `struct GNUNET_DV_ServiceHandle` + * @param key a peer identity + * @param value a `struct ConnectedPeer` to clean up + * @return #GNUNET_OK (continue to iterate) */ -static void -start_transmit (struct GNUNET_DV_ServiceHandle *sh); +static int +cleanup_send_cb (void *cls, + const struct GNUNET_PeerIdentity *key, + void *value) +{ + struct GNUNET_DV_ServiceHandle *sh = cls; + struct ConnectedPeer *peer = value; + + GNUNET_assert (GNUNET_YES == + GNUNET_CONTAINER_multipeermap_remove (sh->peers, + key, + peer)); + sh->disconnect_cb (sh->cls, + key); + GNUNET_free (peer); + return GNUNET_OK; +} /** - * Gives a message from our queue to the DV service. + * Handles a message sent from the DV service to us. + * Parse it out and give it to the plugin. * - * @param cls handle to the dv service (`struct GNUNET_DV_ServiceHandle`) - * @param size how many bytes can we send - * @param buf where to copy the message to send - * @return how many bytes we copied to @a buf + * @param cls the handle to the DV API + * @param cm the message that was received */ -static size_t -transmit_pending (void *cls, size_t size, void *buf) +static void +handle_connect (void *cls, + const struct GNUNET_DV_ConnectMessage *cm) { struct GNUNET_DV_ServiceHandle *sh = cls; - char *cbuf = buf; - struct GNUNET_DV_TransmitHandle *th; - size_t ret; - size_t tsize; + struct ConnectedPeer *peer; - sh->th = NULL; - if (NULL == buf) + peer = GNUNET_CONTAINER_multipeermap_get (sh->peers, + &cm->peer); + if (NULL != peer) { + GNUNET_break (0); reconnect (sh); - return 0; + return; } - ret = 0; - while ( (NULL != (th = sh->th_head)) && - (size - ret >= (tsize = ntohs (th->msg->size)) )) + peer = GNUNET_new (struct ConnectedPeer); + peer->pid = cm->peer; + GNUNET_assert (GNUNET_OK == + GNUNET_CONTAINER_multipeermap_put (sh->peers, + &peer->pid, + peer, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); + sh->connect_cb (sh->cls, + &cm->peer, + ntohl (cm->distance), + (enum GNUNET_ATS_Network_Type) ntohl (cm->network)); +} + + +/** + * Handles a message sent from the DV service to us. + * Parse it out and give it to the plugin. + * + * @param cls the handle to the DV API + * @param dm the message that was received + */ +static void +handle_disconnect (void *cls, + const struct GNUNET_DV_DisconnectMessage *dm) +{ + struct GNUNET_DV_ServiceHandle *sh = cls; + struct ConnectedPeer *peer; + + peer = GNUNET_CONTAINER_multipeermap_get (sh->peers, + &dm->peer); + if (NULL == peer) { - GNUNET_CONTAINER_DLL_remove (sh->th_head, - sh->th_tail, - th); - memcpy (&cbuf[ret], th->msg, tsize); - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Passing %u bytes of type %u to DV service\n", - tsize, - ntohs (th->msg->type)); - th->msg = NULL; - ret += tsize; - if (NULL != th->cb) - { - GNUNET_CONTAINER_DLL_insert_tail (th->target->head, - th->target->tail, - th); - } - else - { - GNUNET_free (th); - } + GNUNET_break (0); + reconnect (sh); + return; } - if (NULL != sh->th_head) - start_transmit (sh); - return ret; + cleanup_send_cb (sh, + &dm->peer, + peer); } /** - * Start sending messages from our queue to the service. + * Handles a message sent from the DV service to us. + * Parse it out and give it to the plugin. * - * @param sh service handle + * @param cls the handle to the DV API + * @param msg the message that was received */ static void -start_transmit (struct GNUNET_DV_ServiceHandle *sh) +handle_distance_update (void *cls, + const struct GNUNET_DV_DistanceUpdateMessage *dum) { - if (NULL != sh->th) - return; - if (NULL == sh->th_head) + struct GNUNET_DV_ServiceHandle *sh = cls; + struct ConnectedPeer *peer; + + peer = GNUNET_CONTAINER_multipeermap_get (sh->peers, + &dum->peer); + if (NULL == peer) + { + GNUNET_break (0); + reconnect (sh); return; - sh->th = - GNUNET_CLIENT_notify_transmit_ready (sh->client, - ntohs (sh->th_head->msg->size), - GNUNET_TIME_UNIT_FOREVER_REL, - GNUNET_NO, - &transmit_pending, sh); + } + sh->distance_cb (sh->cls, + &dum->peer, + ntohl (dum->distance), + (enum GNUNET_ATS_Network_Type) ntohl (dum->network)); } /** - * We got disconnected from the service and thus all of the - * pending send callbacks will never be confirmed. Clean up. + * Handles a message sent from the DV service to us. + * Parse it out and give it to the plugin. * - * @param cls the 'struct GNUNET_DV_ServiceHandle' - * @param key a peer identity - * @param value a `struct ConnectedPeer` to clean up - * @return #GNUNET_OK (continue to iterate) + * @param cls the handle to the DV API + * @param rm the message that was received */ static int -cleanup_send_cb (void *cls, - const struct GNUNET_PeerIdentity *key, - void *value) +check_received (void *cls, + const struct GNUNET_DV_ReceivedMessage *rm) { struct GNUNET_DV_ServiceHandle *sh = cls; - struct ConnectedPeer *peer = value; - struct GNUNET_DV_TransmitHandle *th; + const struct GNUNET_MessageHeader *payload; - GNUNET_assert (GNUNET_YES == - GNUNET_CONTAINER_multipeermap_remove (sh->peers, - key, - peer)); - sh->disconnect_cb (sh->cls, - key); - while (NULL != (th = peer->head)) + if (NULL == + GNUNET_CONTAINER_multipeermap_get (sh->peers, + &rm->sender)) { - GNUNET_CONTAINER_DLL_remove (peer->head, peer->tail, th); - th->cb (th->cb_cls); - GNUNET_free (th); + GNUNET_break (0); + return GNUNET_SYSERR; + } + if (ntohs (rm->header.size) - sizeof (struct GNUNET_DV_ReceivedMessage) < + sizeof (*payload)) + { + GNUNET_break (0); + return GNUNET_SYSERR; + } + payload = (const struct GNUNET_MessageHeader *) &rm[1]; + if (ntohs (rm->header.size) != + sizeof (struct GNUNET_DV_ReceivedMessage) + ntohs (payload->size)) + { + GNUNET_break (0); + return GNUNET_SYSERR; } - GNUNET_free (peer); return GNUNET_OK; } @@ -321,211 +271,38 @@ cleanup_send_cb (void *cls, * Parse it out and give it to the plugin. * * @param cls the handle to the DV API - * @param msg the message that was received + * @param rm the message that was received */ static void -handle_message_receipt (void *cls, - const struct GNUNET_MessageHeader *msg) +handle_received (void *cls, + const struct GNUNET_DV_ReceivedMessage *rm) { struct GNUNET_DV_ServiceHandle *sh = cls; - const struct GNUNET_DV_ConnectMessage *cm; - const struct GNUNET_DV_DistanceUpdateMessage *dum; - const struct GNUNET_DV_DisconnectMessage *dm; - const struct GNUNET_DV_ReceivedMessage *rm; const struct GNUNET_MessageHeader *payload; - const struct GNUNET_DV_AckMessage *ack; - struct GNUNET_DV_TransmitHandle *th; - struct GNUNET_DV_TransmitHandle *tn; - struct ConnectedPeer *peer; - if (NULL == msg) - { - /* Connection closed */ - reconnect (sh); - return; - } - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Received message of type %u with %u bytes from DV service\n", - (unsigned int) ntohs (msg->type), - (unsigned int) ntohs (msg->size)); - switch (ntohs (msg->type)) - { - case GNUNET_MESSAGE_TYPE_DV_CONNECT: - if (ntohs (msg->size) != sizeof (struct GNUNET_DV_ConnectMessage)) - { - GNUNET_break (0); - reconnect (sh); - return; - } - cm = (const struct GNUNET_DV_ConnectMessage *) msg; - peer = GNUNET_CONTAINER_multipeermap_get (sh->peers, - &cm->peer); - if (NULL != peer) - { - GNUNET_break (0); - reconnect (sh); - return; - } - peer = GNUNET_new (struct ConnectedPeer); - peer->pid = cm->peer; - GNUNET_assert (GNUNET_OK == - GNUNET_CONTAINER_multipeermap_put (sh->peers, - &peer->pid, - peer, - GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); - sh->connect_cb (sh->cls, - &cm->peer, - ntohl (cm->distance), - (enum GNUNET_ATS_Network_Type) ntohl (cm->network)); - break; - case GNUNET_MESSAGE_TYPE_DV_DISTANCE_CHANGED: - if (ntohs (msg->size) != sizeof (struct GNUNET_DV_DistanceUpdateMessage)) - { - GNUNET_break (0); - reconnect (sh); - return; - } - dum = (const struct GNUNET_DV_DistanceUpdateMessage *) msg; - sh->distance_cb (sh->cls, - &dum->peer, - ntohl (dum->distance), - (enum GNUNET_ATS_Network_Type) ntohl (dum->network)); - break; - case GNUNET_MESSAGE_TYPE_DV_DISCONNECT: - if (ntohs (msg->size) != sizeof (struct GNUNET_DV_DisconnectMessage)) - { - GNUNET_break (0); - reconnect (sh); - return; - } - dm = (const struct GNUNET_DV_DisconnectMessage *) msg; - peer = GNUNET_CONTAINER_multipeermap_get (sh->peers, - &dm->peer); - if (NULL == peer) - { - GNUNET_break (0); - reconnect (sh); - return; - } - tn = sh->th_head; - while (NULL != (th = tn)) - { - tn = th->next; - if (peer == th->target) - { - GNUNET_CONTAINER_DLL_remove (sh->th_head, - sh->th_tail, - th); - th->cb (th->cb_cls); - GNUNET_free (th); - } - } - cleanup_send_cb (sh, &dm->peer, peer); - break; - case GNUNET_MESSAGE_TYPE_DV_RECV: - if (ntohs (msg->size) < sizeof (struct GNUNET_DV_ReceivedMessage) + sizeof (struct GNUNET_MessageHeader)) - { - GNUNET_break (0); - reconnect (sh); - return; - } - rm = (const struct GNUNET_DV_ReceivedMessage *) msg; - payload = (const struct GNUNET_MessageHeader *) &rm[1]; - if (ntohs (msg->size) != sizeof (struct GNUNET_DV_ReceivedMessage) + ntohs (payload->size)) - { - GNUNET_break (0); - reconnect (sh); - return; - } - if (NULL == - GNUNET_CONTAINER_multipeermap_get (sh->peers, - &rm->sender)) - { - GNUNET_break (0); - reconnect (sh); - return; - } - sh->message_cb (sh->cls, - &rm->sender, - ntohl (rm->distance), - payload); - break; - case GNUNET_MESSAGE_TYPE_DV_SEND_ACK: - case GNUNET_MESSAGE_TYPE_DV_SEND_NACK: - if (ntohs (msg->size) != sizeof (struct GNUNET_DV_AckMessage)) - { - GNUNET_break (0); - reconnect (sh); - return; - } - ack = (const struct GNUNET_DV_AckMessage *) msg; - peer = GNUNET_CONTAINER_multipeermap_get (sh->peers, - &ack->target); - if (NULL == peer) - break; /* this happens, just ignore */ - for (th = peer->head; NULL != th; th = th->next) - { - if (th->uid != ntohl (ack->uid)) - continue; - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Matched ACK for message to peer %s\n", - GNUNET_i2s (&ack->target)); - GNUNET_CONTAINER_DLL_remove (peer->head, - peer->tail, - th); - th->cb (th->cb_cls); - GNUNET_free (th); - break; - } - break; - default: - reconnect (sh); - break; - } - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Received message, continuing receive loop for %p\n", - sh->client); - GNUNET_CLIENT_receive (sh->client, - &handle_message_receipt, sh, - GNUNET_TIME_UNIT_FOREVER_REL); + payload = (const struct GNUNET_MessageHeader *) &rm[1]; + sh->message_cb (sh->cls, + &rm->sender, + ntohl (rm->distance), + payload); } /** - * Transmit the start message to the DV service. + * Generic error handler, called with the appropriate error code and + * the same closure specified at the creation of the message queue. + * Not every message queue implementation supports an error handler. * - * @param cls the `struct GNUNET_DV_ServiceHandle *` - * @param size number of bytes available in buf - * @param buf where to copy the message - * @return number of bytes written to buf + * @param cls closure with the `struct GNUNET_DV_ServiceHandle *` + * @param error error code */ -static size_t -transmit_start (void *cls, - size_t size, - void *buf) +static void +mq_error_handler (void *cls, + enum GNUNET_MQ_Error error) { struct GNUNET_DV_ServiceHandle *sh = cls; - struct GNUNET_MessageHeader start_message; - sh->th = NULL; - if (NULL == buf) - { - GNUNET_break (0); - reconnect (sh); - return 0; - } - GNUNET_assert (size >= sizeof (start_message)); - start_message.size = htons (sizeof (struct GNUNET_MessageHeader)); - start_message.type = htons (GNUNET_MESSAGE_TYPE_DV_START); - memcpy (buf, &start_message, sizeof (start_message)); - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Transmitting START request, starting receive loop for %p\n", - sh->client); - GNUNET_CLIENT_receive (sh->client, - &handle_message_receipt, sh, - GNUNET_TIME_UNIT_FOREVER_REL); - start_transmit (sh); - return sizeof (start_message); + reconnect (sh); } @@ -537,36 +314,52 @@ transmit_start (void *cls, static void reconnect (struct GNUNET_DV_ServiceHandle *sh) { - if (NULL != sh->th) - { - GNUNET_CLIENT_notify_transmit_ready_cancel (sh->th); - sh->th = NULL; - } - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Disconnecting from DV service at %p\n", - sh->client); - if (NULL != sh->client) + GNUNET_MQ_hd_fixed_size (connect, + GNUNET_MESSAGE_TYPE_DV_CONNECT, + struct GNUNET_DV_ConnectMessage); + GNUNET_MQ_hd_fixed_size (disconnect, + GNUNET_MESSAGE_TYPE_DV_DISCONNECT, + struct GNUNET_DV_DisconnectMessage); + GNUNET_MQ_hd_fixed_size (distance_update, + GNUNET_MESSAGE_TYPE_DV_DISTANCE_CHANGED, + struct GNUNET_DV_DistanceUpdateMessage); + GNUNET_MQ_hd_var_size (received, + GNUNET_MESSAGE_TYPE_DV_RECV, + struct GNUNET_DV_ReceivedMessage); + struct GNUNET_MQ_MessageHandler handlers[] = { + make_connect_handler (sh), + make_disconnect_handler (sh), + make_distance_update_handler (sh), + make_received_handler (sh), + GNUNET_MQ_handler_end () + }; + struct GNUNET_MessageHeader *sm; + struct GNUNET_MQ_Envelope *env; + + if (NULL != sh->mq) { - GNUNET_CLIENT_disconnect (sh->client); - sh->client = NULL; + GNUNET_MQ_destroy (sh->mq); + sh->mq = NULL; } GNUNET_CONTAINER_multipeermap_iterate (sh->peers, &cleanup_send_cb, sh); LOG (GNUNET_ERROR_TYPE_DEBUG, "Connecting to DV service\n"); - sh->client = GNUNET_CLIENT_connect ("dv", sh->cfg); - if (NULL == sh->client) + sh->mq = GNUNET_CLIENT_connecT (sh->cfg, + "dv", + handlers, + &mq_error_handler, + sh); + if (NULL == sh->mq) { GNUNET_break (0); return; } - sh->th = GNUNET_CLIENT_notify_transmit_ready (sh->client, - sizeof (struct GNUNET_MessageHeader), - GNUNET_TIME_UNIT_FOREVER_REL, - GNUNET_YES, - &transmit_start, - sh); + env = GNUNET_MQ_msg (sm, + GNUNET_MESSAGE_TYPE_DV_START); + GNUNET_MQ_send (sh->mq, + env); } @@ -598,7 +391,8 @@ GNUNET_DV_service_connect (const struct GNUNET_CONFIGURATION_Handle *cfg, sh->distance_cb = distance_cb; sh->disconnect_cb = disconnect_cb; sh->message_cb = message_cb; - sh->peers = GNUNET_CONTAINER_multipeermap_create (128, GNUNET_YES); + sh->peers = GNUNET_CONTAINER_multipeermap_create (128, + GNUNET_YES); reconnect (sh); return sh; } @@ -612,26 +406,12 @@ GNUNET_DV_service_connect (const struct GNUNET_CONFIGURATION_Handle *cfg, void GNUNET_DV_service_disconnect (struct GNUNET_DV_ServiceHandle *sh) { - struct GNUNET_DV_TransmitHandle *pos; - if (NULL == sh) return; - if (NULL != sh->th) - { - GNUNET_CLIENT_notify_transmit_ready_cancel (sh->th); - sh->th = NULL; - } - while (NULL != (pos = sh->th_head)) - { - GNUNET_CONTAINER_DLL_remove (sh->th_head, - sh->th_tail, - pos); - GNUNET_free (pos); - } - if (NULL != sh->client) + if (NULL != sh->mq) { - GNUNET_CLIENT_disconnect (sh->client); - sh->client = NULL; + GNUNET_MQ_destroy (sh->mq); + sh->mq = NULL; } GNUNET_CONTAINER_multipeermap_iterate (sh->peers, &cleanup_send_cb, @@ -647,87 +427,40 @@ GNUNET_DV_service_disconnect (struct GNUNET_DV_ServiceHandle *sh) * @param sh service handle * @param target intended recpient * @param msg message payload - * @param cb function to invoke when done - * @param cb_cls closure for @a cb - * @return handle to cancel the operation */ -struct GNUNET_DV_TransmitHandle * +void GNUNET_DV_send (struct GNUNET_DV_ServiceHandle *sh, const struct GNUNET_PeerIdentity *target, - const struct GNUNET_MessageHeader *msg, - GNUNET_DV_MessageSentCallback cb, - void *cb_cls) + const struct GNUNET_MessageHeader *msg) { - struct GNUNET_DV_TransmitHandle *th; struct GNUNET_DV_SendMessage *sm; struct ConnectedPeer *peer; + struct GNUNET_MQ_Envelope *env; - if (ntohs (msg->size) + sizeof (struct GNUNET_DV_SendMessage) >= GNUNET_SERVER_MAX_MESSAGE_SIZE) + if (ntohs (msg->size) + sizeof (*sm) >= GNUNET_SERVER_MAX_MESSAGE_SIZE) { GNUNET_break (0); - return NULL; + return; } LOG (GNUNET_ERROR_TYPE_DEBUG, - "Asked to send %u bytes of type %u to %s via %p\n", + "Asked to send %u bytes of type %u to %s\n", (unsigned int) ntohs (msg->size), (unsigned int) ntohs (msg->type), - GNUNET_i2s (target), - sh->client); + GNUNET_i2s (target)); peer = GNUNET_CONTAINER_multipeermap_get (sh->peers, target); if (NULL == peer) { GNUNET_break (0); - return NULL; + return; } - th = GNUNET_malloc (sizeof (struct GNUNET_DV_TransmitHandle) + - sizeof (struct GNUNET_DV_SendMessage) + - ntohs (msg->size)); - th->sh = sh; - th->target = peer; - th->cb = cb; - th->cb_cls = cb_cls; - th->msg = (const struct GNUNET_MessageHeader *) &th[1]; - sm = (struct GNUNET_DV_SendMessage *) &th[1]; - sm->header.type = htons (GNUNET_MESSAGE_TYPE_DV_SEND); - sm->header.size = htons (sizeof (struct GNUNET_DV_SendMessage) + - ntohs (msg->size)); - if (0 == sh->uid_gen) - sh->uid_gen = 1; - th->uid = sh->uid_gen; - sm->uid = htonl (sh->uid_gen++); - /* use memcpy here as 'target' may not be sufficiently aligned */ - memcpy (&sm->target, target, sizeof (struct GNUNET_PeerIdentity)); - memcpy (&sm[1], msg, ntohs (msg->size)); - GNUNET_CONTAINER_DLL_insert_tail (sh->th_head, - sh->th_tail, - th); - start_transmit (sh); - return th; -} - - -/** - * Abort send operation (naturally, the message may have - * already been transmitted; this only stops the 'cb' - * from being called again). - * - * @param th send operation to cancel - */ -void -GNUNET_DV_send_cancel (struct GNUNET_DV_TransmitHandle *th) -{ - struct GNUNET_DV_ServiceHandle *sh = th->sh; - - if (NULL == th->msg) - GNUNET_CONTAINER_DLL_remove (th->target->head, - th->target->tail, - th); - else - GNUNET_CONTAINER_DLL_remove (sh->th_head, - sh->th_tail, - th); - GNUNET_free (th); + GNUNET_assert (NULL != sh->mq); + env = GNUNET_MQ_msg_nested_mh (sm, + GNUNET_MESSAGE_TYPE_DV_SEND, + msg); + sm->target = *target; + GNUNET_MQ_send (sh->mq, + env); } diff --git a/src/dv/gnunet-service-dv.c b/src/dv/gnunet-service-dv.c index af6ddb3d9..d103612a8 100644 --- a/src/dv/gnunet-service-dv.c +++ b/src/dv/gnunet-service-dv.c @@ -149,11 +149,6 @@ struct PendingMessage */ struct GNUNET_PeerIdentity next_target; - /** - * Unique ID of the message. - */ - uint32_t uid; - }; @@ -479,33 +474,6 @@ send_control_to_plugin (const struct GNUNET_MessageHeader *message) } -/** - * Give an (N)ACK message to the plugin, we transmitted a message for it. - * - * @param target peer that received the message - * @param uid plugin-chosen UID for the message - * @param nack #GNUNET_NO to send ACK, #GNUNET_YES to send NACK - */ -static void -send_ack_to_plugin (const struct GNUNET_PeerIdentity *target, - uint32_t uid, - int nack) -{ - struct GNUNET_DV_AckMessage ack_msg; - - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Delivering ACK for message to peer `%s'\n", - GNUNET_i2s (target)); - ack_msg.header.size = htons (sizeof (ack_msg)); - ack_msg.header.type = htons ((GNUNET_YES == nack) - ? GNUNET_MESSAGE_TYPE_DV_SEND_NACK - : GNUNET_MESSAGE_TYPE_DV_SEND_ACK); - ack_msg.uid = htonl (uid); - ack_msg.target = *target; - send_control_to_plugin (&ack_msg.header); -} - - /** * Send a DISTANCE_CHANGED message to the plugin. * @@ -613,16 +581,6 @@ core_transmit_notify (void *cls, size_t size, void *buf) dn->pm_tail, pending); memcpy (&cbuf[off], pending->msg, msize); - if (0 != pending->uid) - { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Acking transmission of %u bytes to %s with plugin\n", - (unsigned int) msize, - GNUNET_i2s (&pending->next_target)); - send_ack_to_plugin (&pending->next_target, - pending->uid, - GNUNET_NO); - } GNUNET_free (pending); off += msize; } @@ -649,7 +607,6 @@ core_transmit_notify (void *cls, size_t size, void *buf) * * @param target where to send the message * @param distance distance to the @a sender - * @param uid unique ID for the message * @param sender original sender of the message * @param actual_target ultimate recipient for the message * @param payload payload of the message @@ -657,7 +614,6 @@ core_transmit_notify (void *cls, size_t size, void *buf) static void forward_payload (struct DirectNeighbor *target, uint32_t distance, - uint32_t uid, const struct GNUNET_PeerIdentity *sender, const struct GNUNET_PeerIdentity *actual_target, const struct GNUNET_MessageHeader *payload) @@ -667,7 +623,6 @@ forward_payload (struct DirectNeighbor *target, size_t msize; if ( (target->pm_queue_size >= MAX_QUEUE_SIZE) && - (0 == uid) && (0 != memcmp (sender, &my_identity, sizeof (struct GNUNET_PeerIdentity))) ) @@ -686,7 +641,6 @@ forward_payload (struct DirectNeighbor *target, } pm = GNUNET_malloc (sizeof (struct PendingMessage) + msize); pm->next_target = target->peer; - pm->uid = uid; pm->msg = (const struct GNUNET_MessageHeader *) &pm[1]; rm = (struct RouteMessage *) &pm[1]; rm->header.size = htons ((uint16_t) msize); @@ -1888,7 +1842,6 @@ handle_dv_route_message (void *cls, GNUNET_i2s (&neighbor->peer)); forward_payload (neighbor, distance + 1, - 0, &rm->sender, &rm->target, payload); @@ -1920,7 +1873,6 @@ handle_dv_send_message (void *cls, return; } msg = (const struct GNUNET_DV_SendMessage *) message; - GNUNET_break (0 != ntohl (msg->uid)); payload = (const struct GNUNET_MessageHeader *) &msg[1]; if (ntohs (message->size) != sizeof (struct GNUNET_DV_SendMessage) + ntohs (payload->size)) { @@ -1940,7 +1892,6 @@ handle_dv_send_message (void *cls, GNUNET_STATISTICS_update (stats, "# local messages discarded (no route)", 1, GNUNET_NO); - send_ack_to_plugin (&msg->target, ntohl (msg->uid), GNUNET_YES); GNUNET_SERVER_receive_done (client, GNUNET_OK); return; } @@ -1952,7 +1903,6 @@ handle_dv_send_message (void *cls, forward_payload (route->next_hop, 0 /* first hop, distance is zero */, - htonl (msg->uid), &my_identity, &msg->target, payload); diff --git a/src/dv/plugin_transport_dv.c b/src/dv/plugin_transport_dv.c index 7293d9fb8..0c72cea3f 100644 --- a/src/dv/plugin_transport_dv.c +++ b/src/dv/plugin_transport_dv.c @@ -44,51 +44,6 @@ struct Plugin; -/** - * An active request for transmission via DV. - */ -struct PendingRequest -{ - - /** - * This is a DLL. - */ - struct PendingRequest *next; - - /** - * This is a DLL. - */ - struct PendingRequest *prev; - - /** - * Continuation function to call once the transmission buffer - * has again space available. NULL if there is no - * continuation to call. - */ - GNUNET_TRANSPORT_TransmitContinuation transmit_cont; - - /** - * Closure for @e transmit_cont. - */ - void *transmit_cont_cls; - - /** - * Transmission handle from DV client library. - */ - struct GNUNET_DV_TransmitHandle *th; - - /** - * Session of this request. - */ - struct GNUNET_ATS_Session *session; - - /** - * Number of bytes to transmit. - */ - size_t size; -}; - - /** * Session handle for connections. */ @@ -99,16 +54,6 @@ struct GNUNET_ATS_Session */ struct Plugin *plugin; - /** - * Head of pending requests. - */ - struct PendingRequest *pr_head; - - /** - * Tail of pending requests. - */ - struct PendingRequest *pr_tail; - /** * Address we use for the other peer. */ @@ -449,7 +394,6 @@ static void free_session (struct GNUNET_ATS_Session *session) { struct Plugin *plugin = session->plugin; - struct PendingRequest *pr; GNUNET_assert (GNUNET_YES == GNUNET_CONTAINER_multipeermap_remove (plugin->sessions, @@ -470,20 +414,6 @@ free_session (struct GNUNET_ATS_Session *session) session); session->active = GNUNET_NO; } - while (NULL != (pr = session->pr_head)) - { - GNUNET_CONTAINER_DLL_remove (session->pr_head, - session->pr_tail, - pr); - GNUNET_DV_send_cancel (pr->th); - pr->th = NULL; - if (NULL != pr->transmit_cont) - pr->transmit_cont (pr->transmit_cont_cls, - &session->sender, - GNUNET_SYSERR, - pr->size, 0); - GNUNET_free (pr); - } GNUNET_HELLO_address_free (session->address); GNUNET_free (session); } @@ -514,31 +444,6 @@ handle_dv_disconnect (void *cls, } -/** - * Function called once the delivery of a message has been successful. - * Clean up the pending request, and call continuations. - * - * @param cls closure - */ -static void -send_finished (void *cls) -{ - struct PendingRequest *pr = cls; - struct GNUNET_ATS_Session *session = pr->session; - - pr->th = NULL; - GNUNET_CONTAINER_DLL_remove (session->pr_head, - session->pr_tail, - pr); - if (NULL != pr->transmit_cont) - pr->transmit_cont (pr->transmit_cont_cls, - &session->sender, - GNUNET_OK, - pr->size, 0); - GNUNET_free (pr); -} - - /** * Function that can be used by the transport service to transmit * a message using the plugin. @@ -565,10 +470,10 @@ dv_plugin_send (void *cls, size_t msgbuf_size, unsigned int priority, struct GNUNET_TIME_Relative timeout, - GNUNET_TRANSPORT_TransmitContinuation cont, void *cont_cls) + GNUNET_TRANSPORT_TransmitContinuation cont, + void *cont_cls) { struct Plugin *plugin = cls; - struct PendingRequest *pr; const struct GNUNET_MessageHeader *msg; struct GNUNET_MessageHeader *box; @@ -585,20 +490,13 @@ dv_plugin_send (void *cls, memcpy (&box[1], msgbuf, msgbuf_size); msg = box; } - pr = GNUNET_new (struct PendingRequest); - pr->transmit_cont = cont; - pr->transmit_cont_cls = cont_cls; - pr->session = session; - pr->size = msgbuf_size; - GNUNET_CONTAINER_DLL_insert_tail (session->pr_head, - session->pr_tail, - pr); - - pr->th = GNUNET_DV_send (plugin->dvh, - &session->sender, - msg, - &send_finished, - pr); + GNUNET_DV_send (plugin->dvh, + &session->sender, + msg); + cont (cont_cls, + &session->sender, + GNUNET_OK, + msgbuf_size, 0); GNUNET_free_non_null (box); return 0; /* DV */ } @@ -618,26 +516,11 @@ dv_plugin_disconnect_peer (void *cls, { struct Plugin *plugin = cls; struct GNUNET_ATS_Session *session; - struct PendingRequest *pr; session = GNUNET_CONTAINER_multipeermap_get (plugin->sessions, target); if (NULL == session) return; /* nothing to do */ - while (NULL != (pr = session->pr_head)) - { - GNUNET_CONTAINER_DLL_remove (session->pr_head, - session->pr_tail, - pr); - GNUNET_DV_send_cancel (pr->th); - pr->th = NULL; - if (NULL != pr->transmit_cont) - pr->transmit_cont (pr->transmit_cont_cls, - &session->sender, - GNUNET_SYSERR, - pr->size, 0); - GNUNET_free (pr); - } session->active = GNUNET_NO; } @@ -655,22 +538,6 @@ static int dv_plugin_disconnect_session (void *cls, struct GNUNET_ATS_Session *session) { - struct PendingRequest *pr; - - while (NULL != (pr = session->pr_head)) - { - GNUNET_CONTAINER_DLL_remove (session->pr_head, - session->pr_tail, - pr); - GNUNET_DV_send_cancel (pr->th); - pr->th = NULL; - if (NULL != pr->transmit_cont) - pr->transmit_cont (pr->transmit_cont_cls, - &session->sender, - GNUNET_SYSERR, - pr->size, 0); - GNUNET_free (pr); - } session->active = GNUNET_NO; return GNUNET_OK; } @@ -691,9 +558,11 @@ dv_plugin_disconnect_session (void *cls, * @param asc_cls closure for @a asc */ static void -dv_plugin_address_pretty_printer (void *cls, const char *type, +dv_plugin_address_pretty_printer (void *cls, + const char *type, const void *addr, - size_t addrlen, int numeric, + size_t addrlen, + int numeric, struct GNUNET_TIME_Relative timeout, GNUNET_TRANSPORT_AddressStringCallback asc, void *asc_cls) diff --git a/src/include/gnunet_dv_service.h b/src/include/gnunet_dv_service.h index f58311f74..bc5927517 100644 --- a/src/include/gnunet_dv_service.h +++ b/src/include/gnunet_dv_service.h @@ -1,6 +1,6 @@ /* This file is part of GNUnet. - Copyright (C) 2013 GNUnet e.V. + Copyright (C) 2013, 2016 GNUnet e.V. GNUnet is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published @@ -153,27 +153,12 @@ struct GNUNET_DV_TransmitHandle; * @param sh service handle * @param target intended recpient * @param msg message payload - * @param cb function to invoke when done - * @param cb_cls closure for 'cb' * @return handle to cancel the operation */ -struct GNUNET_DV_TransmitHandle * +void GNUNET_DV_send (struct GNUNET_DV_ServiceHandle *sh, const struct GNUNET_PeerIdentity *target, - const struct GNUNET_MessageHeader *msg, - GNUNET_DV_MessageSentCallback cb, - void *cb_cls); - - -/** - * Abort send operation (naturally, the message may have - * already been transmitted; this only stops the 'cb' - * from being called again). - * - * @param th send operation to cancel - */ -void -GNUNET_DV_send_cancel (struct GNUNET_DV_TransmitHandle *th); + const struct GNUNET_MessageHeader *msg); #endif -- 2.25.1