X-Git-Url: https://git.librecmc.org/?a=blobdiff_plain;ds=sidebyside;f=src%2Fdv%2Fdv_api.c;h=e3ba995c3bfce277420bd54f9fb519bbe46490eb;hb=3b680a20ab2cbb98cfa658d85be7a44baaf95d2c;hp=d4c07ecdcf9b7a0ea1fe7b6e4ee2d8b3dc03bfdf;hpb=16a6919a9f98ee9fa1fee9dd262906c321004a19;p=oweals%2Fgnunet.git diff --git a/src/dv/dv_api.c b/src/dv/dv_api.c index d4c07ecdc..e3ba995c3 100644 --- a/src/dv/dv_api.c +++ b/src/dv/dv_api.c @@ -1,6 +1,6 @@ /* This file is part of GNUnet. - (C) 2009, 2010 Christian Grothoff (and other contributing authors) + Copyright (C) 2009--2013 GNUnet e.V. GNUnet is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published @@ -14,8 +14,8 @@ You should have received a copy of the GNU General Public License along with GNUnet; see the file COPYING. If not, write to the - Free Software Foundation, Inc., 59 Temple Place - Suite 330, - Boston, MA 02111-1307, USA. + Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, + Boston, MA 02110-1301, USA. */ /** @@ -25,314 +25,297 @@ * @author Nathan Evans */ #include "platform.h" -#include "gnunet_bandwidth_lib.h" -#include "gnunet_client_lib.h" -#include "gnunet_constants.h" -#include "gnunet_container_lib.h" -#include "gnunet_arm_service.h" -#include "gnunet_hello_lib.h" -#include "gnunet_protocols.h" -#include "gnunet_server_lib.h" -#include "gnunet_time_lib.h" +#include "gnunet_util_lib.h" #include "gnunet_dv_service.h" +#include "gnunet_protocols.h" #include "dv.h" #include "gnunet_transport_plugin.h" +#define LOG(kind,...) GNUNET_log_from (kind, "dv-api",__VA_ARGS__) + + /** - * Store ready to send messages + * Information we track for each peer. */ -struct PendingMessages +struct ConnectedPeer; + + +/** + * Handle for a send operation. + */ +struct GNUNET_DV_TransmitHandle { /** - * Linked list of pending messages + * Kept in a DLL. + */ + struct GNUNET_DV_TransmitHandle *next; + + /** + * Kept in a DLL. */ - struct PendingMessages *next; + struct GNUNET_DV_TransmitHandle *prev; /** - * Message that is pending + * Handle to the service. */ - struct GNUNET_DV_SendMessage *msg; + struct GNUNET_DV_ServiceHandle *sh; /** - * Timeout for this message + * Function to call upon completion. */ - struct GNUNET_TIME_Absolute timeout; + GNUNET_DV_MessageSentCallback cb; + + /** + * Closure for @a cb. + */ + void *cb_cls; + + /** + * The actual message (allocated at the end of this struct). + */ + const struct GNUNET_MessageHeader *msg; + + /** + * Destination for the message. + */ + struct ConnectedPeer *target; + + /** + * UID of our message, if any. + */ + uint32_t uid; }; + /** - * Handle for the service. + * Information we track for each peer. */ -struct GNUNET_DV_Handle +struct ConnectedPeer { /** - * Configuration to use. + * Identity of the peer. */ - const struct GNUNET_CONFIGURATION_Handle *cfg; + struct GNUNET_PeerIdentity pid; /** - * Socket (if available). + * Head of DLL of transmission handles where we need + * to invoke a continuation when we are informed about + * successful transmission. The respective request + * has already been sent to the DV service. */ - struct GNUNET_CLIENT_Connection *client; + struct GNUNET_DV_TransmitHandle *head; /** - * Currently pending transmission request. + * Tail of DLL of transmission handles where we need + * to invoke a continuation when we are informed about + * successful transmission. The respective request + * has already been sent to the DV service. */ - struct GNUNET_CLIENT_TransmitHandle *th; + struct GNUNET_DV_TransmitHandle *tail; + +}; + + +/** + * Handle to the DV service. + */ +struct GNUNET_DV_ServiceHandle +{ /** - * List of the currently pending messages for the DV service. + * Connection to DV service. */ - struct PendingMessages *pending_list; + struct GNUNET_CLIENT_Connection *client; /** - * Message we are currently sending. + * Active request for transmission to DV service. */ - struct PendingMessages *current; + struct GNUNET_CLIENT_TransmitHandle *th; /** - * Handler for messages we receive from the DV service + * Our configuration. */ - GNUNET_DV_MessageReceivedHandler receive_handler; + const struct GNUNET_CONFIGURATION_Handle *cfg; /** - * Closure for the receive handler + * Closure for the callbacks. */ - void *receive_cls; + void *cls; /** - * Current unique ID + * Function to call on connect events. */ - uint32_t uid_gen; + GNUNET_DV_ConnectCallback connect_cb; /** - * Hashmap containing outstanding send requests awaiting confirmation. + * Function to call on distance change events. */ - struct GNUNET_CONTAINER_MultiHashMap *send_callbacks; - -}; + GNUNET_DV_DistanceChangedCallback distance_cb; + /** + * Function to call on disconnect events. + */ + GNUNET_DV_DisconnectCallback disconnect_cb; -struct StartContext -{ /** - * Start message + * Function to call on receiving messages events. */ - struct GNUNET_MessageHeader *message; + GNUNET_DV_MessageReceivedCallback message_cb; /** - * Handle to service, in case of timeout + * Head of messages to transmit. */ - struct GNUNET_DV_Handle *handle; -}; + struct GNUNET_DV_TransmitHandle *th_head; -struct SendCallbackContext -{ /** - * The continuation to call once a message is confirmed sent (or failed) + * Tail of messages to transmit. */ - GNUNET_TRANSPORT_TransmitContinuation cont; + struct GNUNET_DV_TransmitHandle *th_tail; /** - * Closure to call with send continuation. + * Information tracked per connected peer. Maps peer + * identities to `struct ConnectedPeer` entries. */ - void *cont_cls; + struct GNUNET_CONTAINER_MultiPeerMap *peers; /** - * Target of the message. + * Current unique ID */ - struct GNUNET_PeerIdentity target; + uint32_t uid_gen; + }; -/** - * Convert unique ID to hash code. - * - * @param uid unique ID to convert - * @param hash set to uid (extended with zeros) - */ -static void -hash_from_uid (uint32_t uid, GNUNET_HashCode * hash) -{ - memset (hash, 0, sizeof (GNUNET_HashCode)); - *((uint32_t *) hash) = uid; -} /** - * Try to (re)connect to the dv service. - * - * @param ret handle to the (disconnected) dv service + * Disconnect and then reconnect to the DV service. * - * @return GNUNET_YES on success, GNUNET_NO on failure. + * @param sh service handle */ -static int -try_connect (struct GNUNET_DV_Handle *ret) -{ - if (ret->client != NULL) - return GNUNET_OK; - ret->client = GNUNET_CLIENT_connect ("dv", ret->cfg); - if (ret->client != NULL) - return GNUNET_YES; -#if DEBUG_DV_MESSAGES - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - _("Failed to connect to the dv service!\n")); -#endif - return GNUNET_NO; -} - static void -process_pending_message (struct GNUNET_DV_Handle *handle); +reconnect (struct GNUNET_DV_ServiceHandle *sh); + /** - * Send complete, schedule next + * Start sending messages from our queue to the service. * - * @param handle handle to the dv service - * @param code return code for send (unused) + * @param sh service handle */ static void -finish (struct GNUNET_DV_Handle *handle, int code) -{ - struct PendingMessages *pos = handle->current; +start_transmit (struct GNUNET_DV_ServiceHandle *sh); - handle->current = NULL; - process_pending_message (handle); - - GNUNET_free (pos->msg); - GNUNET_free (pos); -} /** - * Notification that we can send data + * Gives a message from our queue to the DV service. * - * @param cls handle to the dv service (struct GNUNET_DV_Handle) + * @param cls handle to the dv service (`struct GNUNET_DV_ServiceHandle`) * @param size how many bytes can we send * @param buf where to copy the message to send - * - * @return how many bytes we copied to buf + * @return how many bytes we copied to @a buf */ static size_t transmit_pending (void *cls, size_t size, void *buf) { - struct GNUNET_DV_Handle *handle = cls; + struct GNUNET_DV_ServiceHandle *sh = cls; + char *cbuf = buf; + struct GNUNET_DV_TransmitHandle *th; size_t ret; size_t tsize; -#if DEBUG_DV - if (handle->current != NULL) - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "DV API: Transmit pending called with message type %d\n", - ntohs (handle->current->msg->header.type)); -#endif - - if (buf == NULL) + sh->th = NULL; + if (NULL == buf) { -#if DEBUG_DV - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "DV API: Transmit pending FAILED!\n\n\n"); -#endif - finish (handle, GNUNET_SYSERR); + reconnect (sh); return 0; } - handle->th = NULL; - ret = 0; - - if (handle->current != NULL) + while ( (NULL != (th = sh->th_head)) && + (size - ret >= (tsize = ntohs (th->msg->size)) )) { - tsize = ntohs (handle->current->msg->header.size); - if (size >= tsize) + GNUNET_CONTAINER_DLL_remove (sh->th_head, + sh->th_tail, + th); + memcpy (&cbuf[ret], th->msg, tsize); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Passing %u bytes of type %u to DV service\n", + tsize, + ntohs (th->msg->type)); + th->msg = NULL; + ret += tsize; + if (NULL != th->cb) { - memcpy (buf, handle->current->msg, tsize); -#if DEBUG_DV - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "DV API: Copied %d bytes into buffer!\n\n\n", tsize); -#endif - finish (handle, GNUNET_OK); - return tsize; + GNUNET_CONTAINER_DLL_insert_tail (th->target->head, + th->target->tail, + th); + } + else + { + GNUNET_free (th); } - } - + if (NULL != sh->th_head) + start_transmit (sh); return ret; } + /** - * Try to send messages from list of messages to send + * Start sending messages from our queue to the service. * - * @param handle handle to the distance vector service + * @param sh service handle */ static void -process_pending_message (struct GNUNET_DV_Handle *handle) +start_transmit (struct GNUNET_DV_ServiceHandle *sh) { - - if (handle->current != NULL) - return; /* action already pending */ - if (GNUNET_YES != try_connect (handle)) - { - finish (handle, GNUNET_SYSERR); + if (NULL != sh->th) return; - } - - /* schedule next action */ - handle->current = handle->pending_list; - if (NULL == handle->current) - { + if (NULL == sh->th_head) return; - } - handle->pending_list = handle->pending_list->next; - handle->current->next = NULL; - - if (NULL == - (handle->th = - GNUNET_CLIENT_notify_transmit_ready (handle->client, - ntohs (handle->current->msg-> - header.size), - handle->current->msg->timeout, - GNUNET_YES, &transmit_pending, - handle))) - { -#if DEBUG_DV - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Failed to transmit request to dv service.\n"); -#endif - finish (handle, GNUNET_SYSERR); - } + sh->th = + GNUNET_CLIENT_notify_transmit_ready (sh->client, + ntohs (sh->th_head->msg->size), + GNUNET_TIME_UNIT_FOREVER_REL, + GNUNET_NO, + &transmit_pending, sh); } + /** - * Add a pending message to the linked list + * We got disconnected from the service and thus all of the + * pending send callbacks will never be confirmed. Clean up. * - * @param handle handle to the specified DV api - * @param msg the message to add to the list + * @param cls the 'struct GNUNET_DV_ServiceHandle' + * @param key a peer identity + * @param value a `struct ConnectedPeer` to clean up + * @return #GNUNET_OK (continue to iterate) */ -static void -add_pending (struct GNUNET_DV_Handle *handle, struct GNUNET_DV_SendMessage *msg) +static int +cleanup_send_cb (void *cls, + const struct GNUNET_PeerIdentity *key, + void *value) { - struct PendingMessages *new_message; - struct PendingMessages *pos; - struct PendingMessages *last; - - new_message = GNUNET_malloc (sizeof (struct PendingMessages)); - new_message->msg = msg; - - if (handle->pending_list != NULL) + struct GNUNET_DV_ServiceHandle *sh = cls; + struct ConnectedPeer *peer = value; + struct GNUNET_DV_TransmitHandle *th; + + GNUNET_assert (GNUNET_YES == + GNUNET_CONTAINER_multipeermap_remove (sh->peers, + key, + peer)); + sh->disconnect_cb (sh->cls, + key); + while (NULL != (th = peer->head)) { - pos = handle->pending_list; - while (pos != NULL) - { - last = pos; - pos = pos->next; - } - last->next = new_message; - } - else - { - handle->pending_list = new_message; + GNUNET_CONTAINER_DLL_remove (peer->head, peer->tail, th); + th->cb (th->cb_cls, GNUNET_SYSERR); + GNUNET_free (th); } - - process_pending_message (handle); + GNUNET_free (peer); + return GNUNET_OK; } + /** * Handles a message sent from the DV service to us. * Parse it out and give it to the plugin. @@ -340,290 +323,415 @@ add_pending (struct GNUNET_DV_Handle *handle, struct GNUNET_DV_SendMessage *msg) * @param cls the handle to the DV API * @param msg the message that was received */ -void -handle_message_receipt (void *cls, const struct GNUNET_MessageHeader *msg) +static void +handle_message_receipt (void *cls, + const struct GNUNET_MessageHeader *msg) { - struct GNUNET_DV_Handle *handle = cls; - struct GNUNET_DV_MessageReceived *received_msg; - struct GNUNET_DV_SendResultMessage *send_result_msg; - size_t packed_msg_len; - size_t sender_address_len; - char *sender_address; - char *packed_msg; - char *packed_msg_start; - GNUNET_HashCode uidhash; - struct SendCallbackContext *send_ctx; - - if (msg == NULL) + struct GNUNET_DV_ServiceHandle *sh = cls; + const struct GNUNET_DV_ConnectMessage *cm; + const struct GNUNET_DV_DistanceUpdateMessage *dum; + const struct GNUNET_DV_DisconnectMessage *dm; + const struct GNUNET_DV_ReceivedMessage *rm; + const struct GNUNET_MessageHeader *payload; + const struct GNUNET_DV_AckMessage *ack; + struct GNUNET_DV_TransmitHandle *th; + struct GNUNET_DV_TransmitHandle *tn; + struct ConnectedPeer *peer; + + if (NULL == msg) { -#if DEBUG_DV_MESSAGES - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "DV_API receive: connection closed\n"); -#endif - return; /* Connection closed? */ + /* Connection closed */ + reconnect (sh); + return; } - - GNUNET_assert ((ntohs (msg->type) == GNUNET_MESSAGE_TYPE_TRANSPORT_DV_RECEIVE) - || (ntohs (msg->type) == - GNUNET_MESSAGE_TYPE_TRANSPORT_DV_SEND_RESULT)); - + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Received message of type %u with %u bytes from DV service\n", + (unsigned int) ntohs (msg->type), + (unsigned int) ntohs (msg->size)); switch (ntohs (msg->type)) { - case GNUNET_MESSAGE_TYPE_TRANSPORT_DV_RECEIVE: - if (ntohs (msg->size) < sizeof (struct GNUNET_DV_MessageReceived)) + case GNUNET_MESSAGE_TYPE_DV_CONNECT: + if (ntohs (msg->size) != sizeof (struct GNUNET_DV_ConnectMessage)) + { + GNUNET_break (0); + reconnect (sh); return; - - received_msg = (struct GNUNET_DV_MessageReceived *) msg; - packed_msg_len = ntohl (received_msg->msg_len); - sender_address_len = - ntohs (msg->size) - packed_msg_len - - sizeof (struct GNUNET_DV_MessageReceived); - GNUNET_assert (sender_address_len > 0); - sender_address = GNUNET_malloc (sender_address_len); - memcpy (sender_address, &received_msg[1], sender_address_len); - packed_msg_start = (char *) &received_msg[1]; - packed_msg = GNUNET_malloc (packed_msg_len); - memcpy (packed_msg, &packed_msg_start[sender_address_len], packed_msg_len); - -#if DEBUG_DV_MESSAGES - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "DV_API receive: packed message type: %d or %d\n", - ntohs (((struct GNUNET_MessageHeader *) packed_msg)->type), - ((struct GNUNET_MessageHeader *) packed_msg)->type); - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "DV_API receive: message sender reported as %s\n", - GNUNET_i2s (&received_msg->sender)); - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "DV_API receive: distance is %u\n", - ntohl (received_msg->distance)); -#endif - - handle->receive_handler (handle->receive_cls, &received_msg->sender, - packed_msg, packed_msg_len, - ntohl (received_msg->distance), sender_address, - sender_address_len); - - GNUNET_free (sender_address); + } + cm = (const struct GNUNET_DV_ConnectMessage *) msg; + peer = GNUNET_CONTAINER_multipeermap_get (sh->peers, + &cm->peer); + if (NULL != peer) + { + GNUNET_break (0); + reconnect (sh); + return; + } + peer = GNUNET_new (struct ConnectedPeer); + peer->pid = cm->peer; + GNUNET_assert (GNUNET_OK == + GNUNET_CONTAINER_multipeermap_put (sh->peers, + &peer->pid, + peer, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); + sh->connect_cb (sh->cls, + &cm->peer, + ntohl (cm->distance), + (enum GNUNET_ATS_Network_Type) ntohl (cm->network)); break; - case GNUNET_MESSAGE_TYPE_TRANSPORT_DV_SEND_RESULT: - if (ntohs (msg->size) < sizeof (struct GNUNET_DV_SendResultMessage)) + case GNUNET_MESSAGE_TYPE_DV_DISTANCE_CHANGED: + if (ntohs (msg->size) != sizeof (struct GNUNET_DV_DistanceUpdateMessage)) + { + GNUNET_break (0); + reconnect (sh); return; - - send_result_msg = (struct GNUNET_DV_SendResultMessage *) msg; - hash_from_uid (ntohl (send_result_msg->uid), &uidhash); - send_ctx = - GNUNET_CONTAINER_multihashmap_get (handle->send_callbacks, &uidhash); - - if ((send_ctx != NULL) && (send_ctx->cont != NULL)) + } + dum = (const struct GNUNET_DV_DistanceUpdateMessage *) msg; + sh->distance_cb (sh->cls, + &dum->peer, + ntohl (dum->distance), + (enum GNUNET_ATS_Network_Type) ntohl (dum->network)); + break; + case GNUNET_MESSAGE_TYPE_DV_DISCONNECT: + if (ntohs (msg->size) != sizeof (struct GNUNET_DV_DisconnectMessage)) { - if (ntohl (send_result_msg->result) == 0) - { - send_ctx->cont (send_ctx->cont_cls, &send_ctx->target, GNUNET_OK); - } - else + GNUNET_break (0); + reconnect (sh); + return; + } + dm = (const struct GNUNET_DV_DisconnectMessage *) msg; + peer = GNUNET_CONTAINER_multipeermap_get (sh->peers, + &dm->peer); + if (NULL == peer) + { + GNUNET_break (0); + reconnect (sh); + return; + } + tn = sh->th_head; + while (NULL != (th = tn)) + { + tn = th->next; + if (peer == th->target) { - send_ctx->cont (send_ctx->cont_cls, &send_ctx->target, GNUNET_SYSERR); + GNUNET_CONTAINER_DLL_remove (sh->th_head, + sh->th_tail, + th); + th->cb (th->cb_cls, GNUNET_SYSERR); + GNUNET_free (th); } } - GNUNET_free_non_null (send_ctx); + cleanup_send_cb (sh, &dm->peer, peer); + break; + case GNUNET_MESSAGE_TYPE_DV_RECV: + if (ntohs (msg->size) < sizeof (struct GNUNET_DV_ReceivedMessage) + sizeof (struct GNUNET_MessageHeader)) + { + GNUNET_break (0); + reconnect (sh); + return; + } + rm = (const struct GNUNET_DV_ReceivedMessage *) msg; + payload = (const struct GNUNET_MessageHeader *) &rm[1]; + if (ntohs (msg->size) != sizeof (struct GNUNET_DV_ReceivedMessage) + ntohs (payload->size)) + { + GNUNET_break (0); + reconnect (sh); + return; + } + if (NULL == + GNUNET_CONTAINER_multipeermap_get (sh->peers, + &rm->sender)) + { + GNUNET_break (0); + reconnect (sh); + return; + } + sh->message_cb (sh->cls, + &rm->sender, + ntohl (rm->distance), + payload); + break; + case GNUNET_MESSAGE_TYPE_DV_SEND_ACK: + case GNUNET_MESSAGE_TYPE_DV_SEND_NACK: + if (ntohs (msg->size) != sizeof (struct GNUNET_DV_AckMessage)) + { + GNUNET_break (0); + reconnect (sh); + return; + } + ack = (const struct GNUNET_DV_AckMessage *) msg; + peer = GNUNET_CONTAINER_multipeermap_get (sh->peers, + &ack->target); + if (NULL == peer) + break; /* this happens, just ignore */ + for (th = peer->head; NULL != th; th = th->next) + { + if (th->uid != ntohl (ack->uid)) + continue; + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Matched ACK for message to peer %s\n", + GNUNET_i2s (&ack->target)); + GNUNET_CONTAINER_DLL_remove (peer->head, + peer->tail, + th); + th->cb (th->cb_cls, + (ntohs (ack->header.type) == GNUNET_MESSAGE_TYPE_DV_SEND_ACK) + ? GNUNET_OK + : GNUNET_SYSERR); + GNUNET_free (th); + break; + } break; default: + reconnect (sh); break; } - GNUNET_CLIENT_receive (handle->client, &handle_message_receipt, handle, + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Received message, continuing receive loop for %p\n", + sh->client); + GNUNET_CLIENT_receive (sh->client, + &handle_message_receipt, sh, GNUNET_TIME_UNIT_FOREVER_REL); } + /** - * Send a message from the plugin to the DV service indicating that - * a message should be sent via DV to some peer. - * - * @param dv_handle the handle to the DV api - * @param target the final target of the message - * @param msgbuf the msg(s) to send - * @param msgbuf_size the size of msgbuf - * @param priority priority to pass on to core when sending the message - * @param timeout how long can this message be delayed (pass through to core) - * @param addr the address of this peer (internally known to DV) - * @param addrlen the length of the peer address - * @param cont continuation to call once the message has been sent (or failed) - * @param cont_cls closure for continuation + * Transmit the start message to the DV service. * + * @param cls the `struct GNUNET_DV_ServiceHandle *` + * @param size number of bytes available in buf + * @param buf where to copy the message + * @return number of bytes written to buf */ -int -GNUNET_DV_send (struct GNUNET_DV_Handle *dv_handle, - const struct GNUNET_PeerIdentity *target, const char *msgbuf, - size_t msgbuf_size, unsigned int priority, - struct GNUNET_TIME_Relative timeout, const void *addr, - size_t addrlen, GNUNET_TRANSPORT_TransmitContinuation cont, - void *cont_cls) +static size_t +transmit_start (void *cls, + size_t size, + void *buf) { - struct GNUNET_DV_SendMessage *msg; - struct SendCallbackContext *send_ctx; - char *end_of_message; - GNUNET_HashCode uidhash; - int msize; - -#if DEBUG_DV_MESSAGES - dv_handle->uid_gen = - GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_STRONG, UINT32_MAX); -#else - dv_handle->uid_gen++; -#endif - - msize = sizeof (struct GNUNET_DV_SendMessage) + addrlen + msgbuf_size; - msg = GNUNET_malloc (msize); - msg->header.size = htons (msize); - msg->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_DV_SEND); - memcpy (&msg->target, target, sizeof (struct GNUNET_PeerIdentity)); - msg->priority = htonl (priority); - msg->timeout = timeout; - msg->addrlen = htonl (addrlen); - msg->uid = htonl (dv_handle->uid_gen); - memcpy (&msg[1], addr, addrlen); - end_of_message = (char *) &msg[1]; - end_of_message = &end_of_message[addrlen]; - memcpy (end_of_message, msgbuf, msgbuf_size); - add_pending (dv_handle, msg); - send_ctx = GNUNET_malloc (sizeof (struct SendCallbackContext)); - send_ctx->cont = cont; - send_ctx->cont_cls = cont_cls; - memcpy (&send_ctx->target, target, sizeof (struct GNUNET_PeerIdentity)); - hash_from_uid (dv_handle->uid_gen, &uidhash); - GNUNET_CONTAINER_multihashmap_put (dv_handle->send_callbacks, &uidhash, - send_ctx, - GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE); + struct GNUNET_DV_ServiceHandle *sh = cls; + struct GNUNET_MessageHeader start_message; - return GNUNET_OK; + sh->th = NULL; + if (NULL == buf) + { + GNUNET_break (0); + reconnect (sh); + return 0; + } + GNUNET_assert (size >= sizeof (start_message)); + start_message.size = htons (sizeof (struct GNUNET_MessageHeader)); + start_message.type = htons (GNUNET_MESSAGE_TYPE_DV_START); + memcpy (buf, &start_message, sizeof (start_message)); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Transmitting START request, starting receive loop for %p\n", + sh->client); + GNUNET_CLIENT_receive (sh->client, + &handle_message_receipt, sh, + GNUNET_TIME_UNIT_FOREVER_REL); + start_transmit (sh); + return sizeof (start_message); } + /** - * Callback to transmit a start message to - * the DV service, once we can send + * Disconnect and then reconnect to the DV service. * - * @param cls struct StartContext - * @param size how much can we send - * @param buf where to copy the message - * - * @return number of bytes copied to buf + * @param sh service handle */ -static size_t -transmit_start (void *cls, size_t size, void *buf) +static void +reconnect (struct GNUNET_DV_ServiceHandle *sh) { - struct StartContext *start_context = cls; - struct GNUNET_DV_Handle *handle = start_context->handle; - size_t tsize; - -#if DEBUG_DV - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "DV API: sending start request to service\n"); -#endif - if (buf == NULL) + if (NULL != sh->th) { - GNUNET_free (start_context->message); - GNUNET_free (start_context); - GNUNET_DV_disconnect (handle); - return 0; + GNUNET_CLIENT_notify_transmit_ready_cancel (sh->th); + sh->th = NULL; } - - tsize = ntohs (start_context->message->size); - if (size >= tsize) + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Disconnecting from DV service at %p\n", + sh->client); + if (NULL != sh->client) { - memcpy (buf, start_context->message, tsize); - GNUNET_free (start_context->message); - GNUNET_free (start_context); - GNUNET_CLIENT_receive (handle->client, &handle_message_receipt, handle, - GNUNET_TIME_UNIT_FOREVER_REL); - - - return tsize; + GNUNET_CLIENT_disconnect (sh->client); + sh->client = NULL; } - - return 0; + GNUNET_CONTAINER_multipeermap_iterate (sh->peers, + &cleanup_send_cb, + sh); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Connecting to DV service\n"); + sh->client = GNUNET_CLIENT_connect ("dv", sh->cfg); + if (NULL == sh->client) + { + GNUNET_break (0); + return; + } + sh->th = GNUNET_CLIENT_notify_transmit_ready (sh->client, + sizeof (struct GNUNET_MessageHeader), + GNUNET_TIME_UNIT_FOREVER_REL, + GNUNET_YES, + &transmit_start, + sh); } + /** - * Connect to the DV service + * Connect to the DV service. * - * @param cfg the configuration to use - * @param receive_handler method call when on receipt from the service - * @param receive_handler_cls closure for receive_handler - * - * @return handle to the DV service + * @param cfg configuration + * @param cls closure for callbacks + * @param connect_cb function to call on connects + * @param distance_cb function to call if distances change + * @param disconnect_cb function to call on disconnects + * @param message_cb function to call if we receive messages + * @return handle to access the service */ -struct GNUNET_DV_Handle * -GNUNET_DV_connect (const struct GNUNET_CONFIGURATION_Handle *cfg, - GNUNET_DV_MessageReceivedHandler receive_handler, - void *receive_handler_cls) +struct GNUNET_DV_ServiceHandle * +GNUNET_DV_service_connect (const struct GNUNET_CONFIGURATION_Handle *cfg, + void *cls, + GNUNET_DV_ConnectCallback connect_cb, + GNUNET_DV_DistanceChangedCallback distance_cb, + GNUNET_DV_DisconnectCallback disconnect_cb, + GNUNET_DV_MessageReceivedCallback message_cb) { - struct GNUNET_DV_Handle *handle; - struct GNUNET_MessageHeader *start_message; - struct StartContext *start_context; + struct GNUNET_DV_ServiceHandle *sh; + + sh = GNUNET_new (struct GNUNET_DV_ServiceHandle); + sh->cfg = cfg; + sh->cls = cls; + sh->connect_cb = connect_cb; + sh->distance_cb = distance_cb; + sh->disconnect_cb = disconnect_cb; + sh->message_cb = message_cb; + sh->peers = GNUNET_CONTAINER_multipeermap_create (128, GNUNET_YES); + reconnect (sh); + return sh; +} - handle = GNUNET_malloc (sizeof (struct GNUNET_DV_Handle)); - handle->cfg = cfg; - handle->pending_list = NULL; - handle->current = NULL; - handle->th = NULL; - handle->client = GNUNET_CLIENT_connect ("dv", cfg); - handle->receive_handler = receive_handler; - handle->receive_cls = receive_handler_cls; +/** + * Disconnect from DV service. + * + * @param sh service handle + */ +void +GNUNET_DV_service_disconnect (struct GNUNET_DV_ServiceHandle *sh) +{ + struct GNUNET_DV_TransmitHandle *pos; - if (handle->client == NULL) + if (NULL == sh) + return; + if (NULL != sh->th) { - GNUNET_free (handle); - return NULL; + GNUNET_CLIENT_notify_transmit_ready_cancel (sh->th); + sh->th = NULL; } - - start_message = GNUNET_malloc (sizeof (struct GNUNET_MessageHeader)); - start_message->size = htons (sizeof (struct GNUNET_MessageHeader)); - start_message->type = htons (GNUNET_MESSAGE_TYPE_DV_START); - - start_context = GNUNET_malloc (sizeof (struct StartContext)); - start_context->handle = handle; - start_context->message = start_message; - GNUNET_CLIENT_notify_transmit_ready (handle->client, - sizeof (struct GNUNET_MessageHeader), - GNUNET_TIME_relative_multiply - (GNUNET_TIME_UNIT_SECONDS, 60), - GNUNET_YES, &transmit_start, - start_context); - - handle->send_callbacks = GNUNET_CONTAINER_multihashmap_create (100); - - return handle; + while (NULL != (pos = sh->th_head)) + { + GNUNET_CONTAINER_DLL_remove (sh->th_head, + sh->th_tail, + pos); + GNUNET_free (pos); + } + if (NULL != sh->client) + { + GNUNET_CLIENT_disconnect (sh->client); + sh->client = NULL; + } + GNUNET_CONTAINER_multipeermap_iterate (sh->peers, + &cleanup_send_cb, + sh); + GNUNET_CONTAINER_multipeermap_destroy (sh->peers); + GNUNET_free (sh); } + /** - * Disconnect from the DV service + * Send a message via DV service. * - * @param handle the current handle to the service to disconnect + * @param sh service handle + * @param target intended recpient + * @param msg message payload + * @param cb function to invoke when done + * @param cb_cls closure for @a cb + * @return handle to cancel the operation */ -void -GNUNET_DV_disconnect (struct GNUNET_DV_Handle *handle) +struct GNUNET_DV_TransmitHandle * +GNUNET_DV_send (struct GNUNET_DV_ServiceHandle *sh, + const struct GNUNET_PeerIdentity *target, + const struct GNUNET_MessageHeader *msg, + GNUNET_DV_MessageSentCallback cb, + void *cb_cls) { - struct PendingMessages *pos; - - GNUNET_assert (handle != NULL); + struct GNUNET_DV_TransmitHandle *th; + struct GNUNET_DV_SendMessage *sm; + struct ConnectedPeer *peer; - if (handle->th != NULL) /* We have a live transmit request in the Aether */ + if (ntohs (msg->size) + sizeof (struct GNUNET_DV_SendMessage) >= GNUNET_SERVER_MAX_MESSAGE_SIZE) { - GNUNET_CLIENT_notify_transmit_ready_cancel (handle->th); - handle->th = NULL; - } - if (handle->current != NULL) /* We are trying to send something now, clean it up */ - GNUNET_free (handle->current); - while (NULL != (pos = handle->pending_list)) /* Remove all pending sends from the list */ - { - handle->pending_list = pos->next; - GNUNET_free (pos); + GNUNET_break (0); + return NULL; } - if (handle->client != NULL) /* Finally, disconnect from the service */ + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Asked to send %u bytes of type %u to %s via %p\n", + (unsigned int) ntohs (msg->size), + (unsigned int) ntohs (msg->type), + GNUNET_i2s (target), + sh->client); + peer = GNUNET_CONTAINER_multipeermap_get (sh->peers, + target); + if (NULL == peer) { - GNUNET_CLIENT_disconnect (handle->client, GNUNET_NO); - handle->client = NULL; + GNUNET_break (0); + return NULL; } + th = GNUNET_malloc (sizeof (struct GNUNET_DV_TransmitHandle) + + sizeof (struct GNUNET_DV_SendMessage) + + ntohs (msg->size)); + th->sh = sh; + th->target = peer; + th->cb = cb; + th->cb_cls = cb_cls; + th->msg = (const struct GNUNET_MessageHeader *) &th[1]; + sm = (struct GNUNET_DV_SendMessage *) &th[1]; + sm->header.type = htons (GNUNET_MESSAGE_TYPE_DV_SEND); + sm->header.size = htons (sizeof (struct GNUNET_DV_SendMessage) + + ntohs (msg->size)); + if (0 == sh->uid_gen) + sh->uid_gen = 1; + th->uid = sh->uid_gen; + sm->uid = htonl (sh->uid_gen++); + /* use memcpy here as 'target' may not be sufficiently aligned */ + memcpy (&sm->target, target, sizeof (struct GNUNET_PeerIdentity)); + memcpy (&sm[1], msg, ntohs (msg->size)); + GNUNET_CONTAINER_DLL_insert_tail (sh->th_head, + sh->th_tail, + th); + start_transmit (sh); + return th; +} + + +/** + * Abort send operation (naturally, the message may have + * already been transmitted; this only stops the 'cb' + * from being called again). + * + * @param th send operation to cancel + */ +void +GNUNET_DV_send_cancel (struct GNUNET_DV_TransmitHandle *th) +{ + struct GNUNET_DV_ServiceHandle *sh = th->sh; - GNUNET_free (handle); + if (NULL == th->msg) + GNUNET_CONTAINER_DLL_remove (th->target->head, + th->target->tail, + th); + else + GNUNET_CONTAINER_DLL_remove (sh->th_head, + sh->th_tail, + th); + GNUNET_free (th); } + /* end of dv_api.c */