/*
This file is part of GNUnet.
- (C) 2009, 2010 Christian Grothoff (and other contributing authors)
+ (C) 2009--2013 Christian Grothoff (and other contributing authors)
GNUnet is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published
- by the Free Software Foundation; either version 2, or (at your
+ by the Free Software Foundation; either version 3, or (at your
option) any later version.
GNUnet is distributed in the hope that it will be useful, but
* @file dv/dv_api.c
* @brief library to access the DV service
* @author Christian Grothoff
- * @author Not Nathan Evans
+ * @author Nathan Evans
*/
#include "platform.h"
-#include "gnunet_bandwidth_lib.h"
-#include "gnunet_client_lib.h"
-#include "gnunet_constants.h"
-#include "gnunet_container_lib.h"
-#include "gnunet_arm_service.h"
-#include "gnunet_hello_lib.h"
-#include "gnunet_protocols.h"
-#include "gnunet_server_lib.h"
-#include "gnunet_time_lib.h"
+#include "gnunet_util_lib.h"
#include "gnunet_dv_service.h"
+#include "gnunet_protocols.h"
#include "dv.h"
+#include "gnunet_transport_plugin.h"
+
+#define LOG(kind,...) GNUNET_log_from (kind, "dv-api",__VA_ARGS__)
+
+
+/**
+ * Information we track for each peer.
+ */
+struct ConnectedPeer;
-struct PendingMessages
+/**
+ * Handle for a send operation.
+ */
+struct GNUNET_DV_TransmitHandle
{
/**
- * Linked list of pending messages
+ * Kept in a DLL.
+ */
+ struct GNUNET_DV_TransmitHandle *next;
+
+ /**
+ * Kept in a DLL.
+ */
+ struct GNUNET_DV_TransmitHandle *prev;
+
+ /**
+ * Handle to the service.
+ */
+ struct GNUNET_DV_ServiceHandle *sh;
+
+ /**
+ * Function to call upon completion.
+ */
+ GNUNET_DV_MessageSentCallback cb;
+
+ /**
+ * Closure for @a cb.
+ */
+ void *cb_cls;
+
+ /**
+ * The actual message (allocated at the end of this struct).
*/
- struct PendingMessages *next;
+ const struct GNUNET_MessageHeader *msg;
/**
- * Message that is pending
+ * Destination for the message.
*/
- struct GNUNET_DV_SendMessage *msg;
+ struct ConnectedPeer *target;
/**
- * Timeout for this message
+ * UID of our message, if any.
*/
- struct GNUNET_TIME_Absolute timeout;
+ uint32_t uid;
};
+
/**
- * Handle for the service.
+ * Information we track for each peer.
*/
-struct GNUNET_DV_Handle
+struct ConnectedPeer
{
+
/**
- * Our scheduler.
+ * Identity of the peer.
*/
- struct GNUNET_SCHEDULER_Handle *sched;
+ struct GNUNET_PeerIdentity pid;
/**
- * Configuration to use.
+ * Head of DLL of transmission handles where we need
+ * to invoke a continuation when we are informed about
+ * successful transmission. The respective request
+ * has already been sent to the DV service.
*/
- const struct GNUNET_CONFIGURATION_Handle *cfg;
+ struct GNUNET_DV_TransmitHandle *head;
/**
- * Socket (if available).
+ * Tail of DLL of transmission handles where we need
+ * to invoke a continuation when we are informed about
+ * successful transmission. The respective request
+ * has already been sent to the DV service.
+ */
+ struct GNUNET_DV_TransmitHandle *tail;
+
+};
+
+
+/**
+ * Handle to the DV service.
+ */
+struct GNUNET_DV_ServiceHandle
+{
+
+ /**
+ * Connection to DV service.
*/
struct GNUNET_CLIENT_Connection *client;
/**
- * Currently pending transmission request.
+ * Active request for transmission to DV service.
*/
struct GNUNET_CLIENT_TransmitHandle *th;
/**
- * List of the currently pending messages for the DV service.
+ * Our configuration.
+ */
+ const struct GNUNET_CONFIGURATION_Handle *cfg;
+
+ /**
+ * Closure for the callbacks.
*/
- struct PendingMessages *pending_list;
+ void *cls;
/**
- * Message we are currently sending.
+ * Function to call on connect events.
*/
- struct PendingMessages *current;
+ GNUNET_DV_ConnectCallback connect_cb;
/**
- * Kill off the connection and any pending messages.
+ * Function to call on distance change events.
*/
- int do_destroy;
+ GNUNET_DV_DistanceChangedCallback distance_cb;
/**
- * Handler for messages we receive from the DV service
+ * Function to call on disconnect events.
*/
- GNUNET_DV_MessageReceivedHandler receive_handler;
+ GNUNET_DV_DisconnectCallback disconnect_cb;
/**
- * Closure for the receive handler
+ * Function to call on receiving messages events.
*/
- void *receive_cls;
+ GNUNET_DV_MessageReceivedCallback message_cb;
+
+ /**
+ * Head of messages to transmit.
+ */
+ struct GNUNET_DV_TransmitHandle *th_head;
+
+ /**
+ * Tail of messages to transmit.
+ */
+ struct GNUNET_DV_TransmitHandle *th_tail;
+
+ /**
+ * Information tracked per connected peer. Maps peer
+ * identities to `struct ConnectedPeer` entries.
+ */
+ struct GNUNET_CONTAINER_MultiPeerMap *peers;
+
+ /**
+ * Current unique ID
+ */
+ uint32_t uid_gen;
};
/**
- * Try to (re)connect to the dv service.
+ * Disconnect and then reconnect to the DV service.
*
- * @return GNUNET_YES on success, GNUNET_NO on failure.
+ * @param sh service handle
*/
-static int
-try_connect (struct GNUNET_DV_Handle *ret)
-{
- if (ret->client != NULL)
- return GNUNET_OK;
- ret->client = GNUNET_CLIENT_connect (ret->sched, "dv", ret->cfg);
- if (ret->client != NULL)
- return GNUNET_YES;
-#if DEBUG_STATISTICS
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- _("Failed to connect to the dv service!\n"));
-#endif
- return GNUNET_NO;
-}
+static void
+reconnect (struct GNUNET_DV_ServiceHandle *sh);
-static void process_pending_message(struct GNUNET_DV_Handle *handle);
/**
- * Send complete, schedule next
+ * Gives a message from our queue to the DV service.
+ *
+ * @param cls handle to the dv service (struct GNUNET_DV_ServiceHandle)
+ * @param size how many bytes can we send
+ * @param buf where to copy the message to send
+ * @return how many bytes we copied to @a buf
*/
-static void
-finish (struct GNUNET_DV_Handle *handle, int code)
-{
- struct PendingMessages *pos = handle->current;
- handle->current = NULL;
- process_pending_message (handle);
-
- GNUNET_free (pos);
-}
-
-
static size_t
transmit_pending (void *cls, size_t size, void *buf)
{
- struct GNUNET_DV_Handle *handle = cls;
+ struct GNUNET_DV_ServiceHandle *sh = cls;
+ char *cbuf = buf;
+ struct GNUNET_DV_TransmitHandle *th;
size_t ret;
size_t tsize;
- if (buf == NULL)
- {
- finish(handle, GNUNET_SYSERR);
- return 0;
- }
- handle->th = NULL;
-
+ sh->th = NULL;
+ if (NULL == buf)
+ {
+ reconnect (sh);
+ return 0;
+ }
ret = 0;
-
- if (handle->current != NULL)
+ while ( (NULL != (th = sh->th_head)) &&
+ (size - ret >= (tsize = ntohs (th->msg->size)) ))
{
- tsize = ntohs(handle->current->msg->header.size);
- if (size >= tsize)
+ GNUNET_CONTAINER_DLL_remove (sh->th_head,
+ sh->th_tail,
+ th);
+ memcpy (&cbuf[ret], th->msg, tsize);
+ th->msg = NULL;
+ ret += tsize;
+ if (NULL != th->cb)
{
- memcpy(buf, handle->current->msg, tsize);
+ GNUNET_CONTAINER_DLL_insert (th->target->head,
+ th->target->tail,
+ th);
}
else
{
- return ret;
+ GNUNET_free (th);
}
}
-
return ret;
}
+
+/**
+ * Start sending messages from our queue to the service.
+ *
+ * @param sh service handle
+ */
+static void
+start_transmit (struct GNUNET_DV_ServiceHandle *sh)
+{
+ if (NULL != sh->th)
+ return;
+ if (NULL == sh->th_head)
+ return;
+ sh->th =
+ GNUNET_CLIENT_notify_transmit_ready (sh->client,
+ ntohs (sh->th_head->msg->size),
+ GNUNET_TIME_UNIT_FOREVER_REL,
+ GNUNET_NO,
+ &transmit_pending, sh);
+}
+
+
/**
- * Try to send messages from list of messages to send
+ * We got disconnected from the service and thus all of the
+ * pending send callbacks will never be confirmed. Clean up.
+ *
+ * @param cls the 'struct GNUNET_DV_ServiceHandle'
+ * @param key a peer identity
+ * @param value a `struct ConnectedPeer` to clean up
+ * @return #GNUNET_OK (continue to iterate)
*/
-static void process_pending_message(struct GNUNET_DV_Handle *handle)
+static int
+cleanup_send_cb (void *cls,
+ const struct GNUNET_PeerIdentity *key,
+ void *value)
{
- struct GNUNET_TIME_Relative timeout;
+ struct GNUNET_DV_ServiceHandle *sh = cls;
+ struct ConnectedPeer *peer = value;
+ struct GNUNET_DV_TransmitHandle *th;
+
+ GNUNET_assert (GNUNET_YES ==
+ GNUNET_CONTAINER_multipeermap_remove (sh->peers,
+ key,
+ peer));
+ sh->disconnect_cb (sh->cls,
+ key);
+ while (NULL != (th = peer->head))
+ {
+ GNUNET_CONTAINER_DLL_remove (peer->head, peer->tail, th);
+ th->cb (th->cb_cls, GNUNET_SYSERR);
+ GNUNET_free (th);
+ }
+ return GNUNET_OK;
+}
+
- if (handle->current != NULL)
- return; /* action already pending */
- if (GNUNET_YES != try_connect (handle))
+/**
+ * Handles a message sent from the DV service to us.
+ * Parse it out and give it to the plugin.
+ *
+ * @param cls the handle to the DV API
+ * @param msg the message that was received
+ */
+static void
+handle_message_receipt (void *cls,
+ const struct GNUNET_MessageHeader *msg)
+{
+ struct GNUNET_DV_ServiceHandle *sh = cls;
+ const struct GNUNET_DV_ConnectMessage *cm;
+ const struct GNUNET_DV_DistanceUpdateMessage *dum;
+ const struct GNUNET_DV_DisconnectMessage *dm;
+ const struct GNUNET_DV_ReceivedMessage *rm;
+ const struct GNUNET_MessageHeader *payload;
+ const struct GNUNET_DV_AckMessage *ack;
+ struct GNUNET_DV_TransmitHandle *th;
+ struct GNUNET_DV_TransmitHandle *tn;
+ struct ConnectedPeer *peer;
+
+ if (NULL == msg)
+ {
+ /* Connection closed */
+ reconnect (sh);
+ return;
+ }
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Received message of type %u from DV service\n",
+ (unsigned int) msg->type);
+ switch (ntohs (msg->type))
+ {
+ case GNUNET_MESSAGE_TYPE_DV_CONNECT:
+ if (ntohs (msg->size) != sizeof (struct GNUNET_DV_ConnectMessage))
{
- finish (handle, GNUNET_SYSERR);
+ GNUNET_break (0);
+ reconnect (sh);
return;
}
-
- /* schedule next action */
- handle->current = handle->pending_list;
- if (NULL == handle->current)
+ cm = (const struct GNUNET_DV_ConnectMessage *) msg;
+ peer = GNUNET_CONTAINER_multipeermap_get (sh->peers,
+ &cm->peer);
+ if (NULL != peer)
+ {
+ GNUNET_break (0);
+ reconnect (sh);
+ return;
+ }
+ peer = GNUNET_new (struct ConnectedPeer);
+ peer->pid = cm->peer;
+ GNUNET_assert (GNUNET_OK ==
+ GNUNET_CONTAINER_multipeermap_put (sh->peers,
+ &peer->pid,
+ peer,
+ GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
+ sh->connect_cb (sh->cls,
+ &cm->peer,
+ ntohl (cm->distance), ntohl (cm->network));
+ break;
+ case GNUNET_MESSAGE_TYPE_DV_DISTANCE_CHANGED:
+ if (ntohs (msg->size) != sizeof (struct GNUNET_DV_DistanceUpdateMessage))
{
- if (handle->do_destroy)
- {
- handle->do_destroy = GNUNET_NO;
- //GNUNET_DV_disconnect (handle); /* FIXME: replace with proper disconnect stuffs */
- }
+ GNUNET_break (0);
+ reconnect (sh);
return;
}
- handle->pending_list = handle->pending_list->next;
- handle->current->next = NULL;
-
- timeout = GNUNET_TIME_absolute_get_remaining (handle->current->timeout);
- if (NULL ==
- (handle->th = GNUNET_CLIENT_notify_transmit_ready (handle->client,
- ntohs(handle->current->msg->msgbuf_size),
- timeout,
- GNUNET_YES,
- &transmit_pending, handle)))
+ dum = (const struct GNUNET_DV_DistanceUpdateMessage *) msg;
+ sh->distance_cb (sh->cls,
+ &dum->peer,
+ ntohl (dum->distance));
+ break;
+ case GNUNET_MESSAGE_TYPE_DV_DISCONNECT:
+ if (ntohs (msg->size) != sizeof (struct GNUNET_DV_DisconnectMessage))
{
-#if DEBUG_STATISTICS
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Failed to transmit request to dv service.\n");
-#endif
- finish (handle, GNUNET_SYSERR);
+ GNUNET_break (0);
+ reconnect (sh);
+ return;
+ }
+ dm = (const struct GNUNET_DV_DisconnectMessage *) msg;
+ peer = GNUNET_CONTAINER_multipeermap_get (sh->peers,
+ &dm->peer);
+ if (NULL == peer)
+ {
+ GNUNET_break (0);
+ reconnect (sh);
+ return;
+ }
+ tn = sh->th_head;
+ while (NULL != (th = tn))
+ {
+ tn = th->next;
+ if (peer == th->target)
+ {
+ GNUNET_CONTAINER_DLL_remove (sh->th_head,
+ sh->th_tail,
+ th);
+ th->cb (th->cb_cls, GNUNET_SYSERR);
+ GNUNET_free (th);
+ }
+ }
+ cleanup_send_cb (sh, &dm->peer, peer);
+ break;
+ case GNUNET_MESSAGE_TYPE_DV_RECV:
+ if (ntohs (msg->size) < sizeof (struct GNUNET_DV_ReceivedMessage) + sizeof (struct GNUNET_MessageHeader))
+ {
+ GNUNET_break (0);
+ reconnect (sh);
+ return;
+ }
+ rm = (const struct GNUNET_DV_ReceivedMessage *) msg;
+ payload = (const struct GNUNET_MessageHeader *) &rm[1];
+ if (ntohs (msg->size) != sizeof (struct GNUNET_DV_ReceivedMessage) + ntohs (payload->size))
+ {
+ GNUNET_break (0);
+ reconnect (sh);
+ return;
}
+ sh->message_cb (sh->cls,
+ &rm->sender,
+ ntohl (rm->distance),
+ payload);
+ break;
+ case GNUNET_MESSAGE_TYPE_DV_SEND_ACK:
+ case GNUNET_MESSAGE_TYPE_DV_SEND_NACK:
+ if (ntohs (msg->size) != sizeof (struct GNUNET_DV_AckMessage))
+ {
+ GNUNET_break (0);
+ reconnect (sh);
+ return;
+ }
+ ack = (const struct GNUNET_DV_AckMessage *) msg;
+ peer = GNUNET_CONTAINER_multipeermap_get (sh->peers,
+ &ack->target);
+ if (NULL == peer)
+ return; /* this happens, just ignore */
+ for (th = peer->head; NULL != th; th = th->next)
+ {
+ if (th->uid != ntohl (ack->uid))
+ continue;
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Matched ACK for message to peer %s\n",
+ GNUNET_i2s (&ack->target));
+ GNUNET_CONTAINER_DLL_remove (peer->head,
+ peer->tail,
+ th);
+ th->cb (th->cb_cls,
+ (ntohs (ack->header.type) == GNUNET_MESSAGE_TYPE_DV_SEND_ACK)
+ ? GNUNET_OK
+ : GNUNET_SYSERR);
+ GNUNET_free (th);
+ break;
+ }
+ break;
+ default:
+ reconnect (sh);
+ break;
+ }
+ GNUNET_CLIENT_receive (sh->client,
+ &handle_message_receipt, sh,
+ GNUNET_TIME_UNIT_FOREVER_REL);
}
+
/**
- * Add a pending message to the linked list
+ * Transmit the start message to the DV service.
*
- * @param handle handle to the specified DV api
- * @param msg the message to add to the list
+ * @param cls the `struct GNUNET_DV_ServiceHandle *`
+ * @param size number of bytes available in buf
+ * @param buf where to copy the message
+ * @return number of bytes written to buf
*/
-static void add_pending(struct GNUNET_DV_Handle *handle, struct GNUNET_DV_SendMessage *msg)
+static size_t
+transmit_start (void *cls,
+ size_t size,
+ void *buf)
{
- struct PendingMessages *new_message;
- struct PendingMessages *pos;
- struct PendingMessages *last;
-
- new_message = GNUNET_malloc(sizeof(struct PendingMessages));
- new_message->msg = msg;
-
- if (handle->pending_list != NULL)
- {
- pos = handle->pending_list;
- while(pos != NULL)
- {
- last = pos;
- pos = pos->next;
- }
- new_message->next = last->next; /* Should always be null */
- last->next = new_message;
- }
- else
- {
- new_message->next = handle->pending_list; /* Will always be null */
- handle->pending_list = new_message;
- }
+ struct GNUNET_DV_ServiceHandle *sh = cls;
+ struct GNUNET_MessageHeader start_message;
- process_pending_message(handle);
+ sh->th = NULL;
+ if (NULL == buf)
+ {
+ GNUNET_break (0);
+ reconnect (sh);
+ return 0;
+ }
+ GNUNET_assert (size >= sizeof (start_message));
+ start_message.size = htons (sizeof (struct GNUNET_MessageHeader));
+ start_message.type = htons (GNUNET_MESSAGE_TYPE_DV_START);
+ memcpy (buf, &start_message, sizeof (start_message));
+ GNUNET_CLIENT_receive (sh->client,
+ &handle_message_receipt, sh,
+ GNUNET_TIME_UNIT_FOREVER_REL);
+ start_transmit (sh);
+ return sizeof (start_message);
}
-
-
-void handle_message_receipt (void *cls,
- const struct GNUNET_MessageHeader * msg)
+/**
+ * Disconnect and then reconnect to the DV service.
+ *
+ * @param sh service handle
+ */
+static void
+reconnect (struct GNUNET_DV_ServiceHandle *sh)
{
- struct GNUNET_DV_Handle *handle = cls;
- struct GNUNET_DV_MessageReceived *received_msg;
- size_t packed_msg_len;
- size_t sender_address_len;
- char *sender_address;
- char *packed_msg;
-
- if (msg == NULL)
+ if (NULL != sh->th)
{
- return; /* Connection closed? */
+ GNUNET_CLIENT_notify_transmit_ready_cancel (sh->th);
+ sh->th = NULL;
}
-
- GNUNET_assert(ntohs(msg->type) == GNUNET_MESSAGE_TYPE_TRANSPORT_DV_RECEIVE);
-
- if (ntohs(msg->size) < sizeof(struct GNUNET_DV_MessageReceived))
+ if (NULL != sh->client)
+ {
+ GNUNET_CLIENT_disconnect (sh->client);
+ sh->client = NULL;
+ }
+ GNUNET_CONTAINER_multipeermap_iterate (sh->peers,
+ &cleanup_send_cb,
+ sh);
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Connecting to DV service\n");
+ sh->client = GNUNET_CLIENT_connect ("dv", sh->cfg);
+ if (NULL == sh->client)
+ {
+ GNUNET_break (0);
return;
-
- received_msg = (struct GNUNET_DV_MessageReceived *)msg;
- packed_msg_len = ntohs(received_msg->msg_len);
- sender_address_len = ntohs(received_msg->sender_address_len);
- GNUNET_assert(ntohs(msg->size) == (sizeof(struct GNUNET_DV_MessageReceived) + packed_msg_len + sender_address_len));
-
- sender_address = GNUNET_malloc(sender_address_len);
- memcpy(sender_address, &received_msg[1], sender_address_len);
- packed_msg = GNUNET_malloc(packed_msg_len);
- memcpy(packed_msg, &received_msg[1 + sender_address_len], packed_msg_len);
-
- handle->receive_handler(handle->receive_cls,
- &received_msg->sender,
- packed_msg,
- packed_msg_len,
- ntohl(received_msg->distance),
- sender_address,
- sender_address_len);
-
- GNUNET_free(sender_address);
-
- GNUNET_CLIENT_receive (handle->client,
- &handle_message_receipt,
- handle, GNUNET_TIME_UNIT_FOREVER_REL);
+ }
+ sh->th = GNUNET_CLIENT_notify_transmit_ready (sh->client,
+ sizeof (struct GNUNET_MessageHeader),
+ GNUNET_TIME_UNIT_FOREVER_REL,
+ GNUNET_YES,
+ &transmit_start,
+ sh);
}
+
/**
- * Send a message from the plugin to the DV service indicating that
- * a message should be sent via DV to some peer.
- *
- * @param dv_handle the handle to the DV api
- * @param target the final target of the message
- * @param msgbuf the msg(s) to send
- * @param msgbuf_size the size of msgbuf
- * @param priority priority to pass on to core when sending the message
- * @param timeout how long can this message be delayed (pass through to core)
- * @param addr the address of this peer (internally known to DV)
- * @param addrlen the length of the peer address
+ * Connect to the DV service.
*
+ * @param cfg configuration
+ * @param cls closure for callbacks
+ * @param connect_cb function to call on connects
+ * @param distance_cb function to call if distances change
+ * @param disconnect_cb function to call on disconnects
+ * @param message_cb function to call if we receive messages
+ * @return handle to access the service
*/
-int GNUNET_DV_send (struct GNUNET_DV_Handle *dv_handle,
- const struct GNUNET_PeerIdentity *target,
- const char *msgbuf,
- size_t msgbuf_size,
- unsigned int priority,
- struct GNUNET_TIME_Relative timeout,
- const void *addr,
- size_t addrlen)
+struct GNUNET_DV_ServiceHandle *
+GNUNET_DV_service_connect (const struct GNUNET_CONFIGURATION_Handle *cfg,
+ void *cls,
+ GNUNET_DV_ConnectCallback connect_cb,
+ GNUNET_DV_DistanceChangedCallback distance_cb,
+ GNUNET_DV_DisconnectCallback disconnect_cb,
+ GNUNET_DV_MessageReceivedCallback message_cb)
{
- struct GNUNET_DV_SendMessage *msg;
-
- msg = GNUNET_malloc(sizeof(struct GNUNET_DV_SendMessage) + msgbuf_size + addrlen);
- msg->header.size = htons(sizeof(struct GNUNET_DV_SendMessage) + msgbuf_size + addrlen);
- msg->header.type = htons(GNUNET_MESSAGE_TYPE_TRANSPORT_DV_SEND);
- memcpy(&msg->target, target, sizeof(struct GNUNET_PeerIdentity));
- msg->msgbuf = GNUNET_malloc(msgbuf_size);
- memcpy(msg->msgbuf, msgbuf, msgbuf_size);
- msg->msgbuf_size = htons(msgbuf_size);
- msg->priority = htonl(priority);
- msg->timeout = timeout;
- msg->addrlen = htons(addrlen);
- memcpy(&msg[1], addr, addrlen);
-
- add_pending(dv_handle, msg);
-
- return GNUNET_OK;
+ struct GNUNET_DV_ServiceHandle *sh;
+
+ sh = GNUNET_new (struct GNUNET_DV_ServiceHandle);
+ sh->cfg = cfg;
+ sh->cls = cls;
+ sh->connect_cb = connect_cb;
+ sh->distance_cb = distance_cb;
+ sh->disconnect_cb = disconnect_cb;
+ sh->message_cb = message_cb;
+ sh->peers = GNUNET_CONTAINER_multipeermap_create (128, GNUNET_YES);
+ reconnect (sh);
+ return sh;
}
+
/**
- * Connect to the DV service
- *
- * @param sched the scheduler to use
- * @param cfg the configuration to use
- * @param receive_handler method call when on receipt from the service
- * @param receive_handler_cls closure for receive_handler
+ * Disconnect from DV service.
*
- * @return handle to the DV service
+ * @param sh service handle
*/
-struct GNUNET_DV_Handle *
-GNUNET_DV_connect (struct GNUNET_SCHEDULER_Handle *sched,
- const struct GNUNET_CONFIGURATION_Handle *cfg,
- GNUNET_DV_MessageReceivedHandler receive_handler,
- void *receive_handler_cls)
+void
+GNUNET_DV_service_disconnect (struct GNUNET_DV_ServiceHandle *sh)
{
- struct GNUNET_DV_Handle *handle;
+ struct GNUNET_DV_TransmitHandle *pos;
- handle = GNUNET_malloc(sizeof(struct GNUNET_DV_Handle));
+ if (NULL == sh)
+ return;
+ if (NULL != sh->th)
+ {
+ GNUNET_CLIENT_notify_transmit_ready_cancel (sh->th);
+ sh->th = NULL;
+ }
+ while (NULL != (pos = sh->th_head))
+ {
+ GNUNET_CONTAINER_DLL_remove (sh->th_head,
+ sh->th_tail,
+ pos);
+ GNUNET_free (pos);
+ }
+ if (NULL != sh->client)
+ {
+ GNUNET_CLIENT_disconnect (sh->client);
+ sh->client = NULL;
+ }
+ GNUNET_CONTAINER_multipeermap_iterate (sh->peers,
+ &cleanup_send_cb,
+ sh);
+ GNUNET_CONTAINER_multipeermap_destroy (sh->peers);
+ GNUNET_free (sh);
+}
- handle->cfg = cfg;
- handle->sched = sched;
- handle->pending_list = NULL;
- handle->current = NULL;
- handle->do_destroy = GNUNET_NO;
- handle->th = NULL;
- handle->client = GNUNET_CLIENT_connect(sched, "dv", cfg);
- handle->receive_handler = receive_handler;
- handle->receive_cls = receive_handler_cls;
- if (handle->client == NULL)
- return NULL;
-
- GNUNET_CLIENT_receive (handle->client,
- &handle_message_receipt,
- handle, GNUNET_TIME_UNIT_FOREVER_REL);
+/**
+ * Send a message via DV service.
+ *
+ * @param sh service handle
+ * @param target intended recpient
+ * @param msg message payload
+ * @param cb function to invoke when done
+ * @param cb_cls closure for @a cb
+ * @return handle to cancel the operation
+ */
+struct GNUNET_DV_TransmitHandle *
+GNUNET_DV_send (struct GNUNET_DV_ServiceHandle *sh,
+ const struct GNUNET_PeerIdentity *target,
+ const struct GNUNET_MessageHeader *msg,
+ GNUNET_DV_MessageSentCallback cb,
+ void *cb_cls)
+{
+ struct GNUNET_DV_TransmitHandle *th;
+ struct GNUNET_DV_SendMessage *sm;
+ struct ConnectedPeer *peer;
- return handle;
+ if (ntohs (msg->size) + sizeof (struct GNUNET_DV_SendMessage) >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
+ {
+ GNUNET_break (0);
+ return NULL;
+ }
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Asked to send %u bytes of type %u to %s\n",
+ (unsigned int) msg->size,
+ (unsigned int) msg->type,
+ GNUNET_i2s (target));
+ peer = GNUNET_CONTAINER_multipeermap_get (sh->peers,
+ target);
+ if (NULL == peer)
+ {
+ GNUNET_break (0);
+ return NULL;
+ }
+ th = GNUNET_malloc (sizeof (struct GNUNET_DV_TransmitHandle) +
+ sizeof (struct GNUNET_DV_SendMessage) +
+ ntohs (msg->size));
+ th->sh = sh;
+ th->target = peer;
+ th->cb = cb;
+ th->cb_cls = cb_cls;
+ th->msg = (const struct GNUNET_MessageHeader *) &th[1];
+ sm = (struct GNUNET_DV_SendMessage *) &th[1];
+ sm->header.type = htons (GNUNET_MESSAGE_TYPE_DV_SEND);
+ sm->header.size = htons (sizeof (struct GNUNET_DV_SendMessage) +
+ ntohs (msg->size));
+ if (0 == sh->uid_gen)
+ sh->uid_gen = 1;
+ th->uid = sh->uid_gen;
+ sm->uid = htonl (sh->uid_gen++);
+ /* use memcpy here as 'target' may not be sufficiently aligned */
+ memcpy (&sm->target, target, sizeof (struct GNUNET_PeerIdentity));
+ memcpy (&sm[1], msg, ntohs (msg->size));
+ GNUNET_CONTAINER_DLL_insert (sh->th_head,
+ sh->th_tail,
+ th);
+ start_transmit (sh);
+ return th;
}
+
/**
- * Disconnect from the DV service
+ * Abort send operation (naturally, the message may have
+ * already been transmitted; this only stops the 'cb'
+ * from being called again).
*
- * @param handle the current handle to the service to disconnect
+ * @param th send operation to cancel
*/
-void GNUNET_DV_disconnect(struct GNUNET_DV_Handle *handle)
+void
+GNUNET_DV_send_cancel (struct GNUNET_DV_TransmitHandle *th)
{
- struct PendingMessages *pos;
-
- GNUNET_assert(handle != NULL);
+ struct GNUNET_DV_ServiceHandle *sh = th->sh;
- if (handle->th != NULL) /* We have a live transmit request in the Aether */
- {
- GNUNET_CLIENT_notify_transmit_ready_cancel (handle->th);
- handle->th = NULL;
- }
- if (handle->current != NULL) /* We are trying to send something now, clean it up */
- GNUNET_free(handle->current);
- while (NULL != (pos = handle->pending_list)) /* Remove all pending sends from the list */
- {
- handle->pending_list = pos->next;
- GNUNET_free(pos);
- }
- if (handle->client != NULL) /* Finally, disconnect from the service */
- {
- GNUNET_CLIENT_disconnect (handle->client, GNUNET_NO);
- handle->client = NULL;
- }
-
- GNUNET_free (handle);
+ if (NULL == th->msg)
+ GNUNET_CONTAINER_DLL_remove (th->target->head,
+ th->target->tail,
+ th);
+ else
+ GNUNET_CONTAINER_DLL_remove (sh->th_head,
+ sh->th_tail,
+ th);
+ GNUNET_free (th);
}
+
/* end of dv_api.c */