/*
This file is part of GNUnet.
- Copyright (C) 2009--2013 GNUnet e.V.
+ Copyright (C) 2009--2013, 2016 GNUnet e.V.
GNUnet is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published
#define LOG(kind,...) GNUNET_log_from (kind, "dv-api",__VA_ARGS__)
-/**
- * Information we track for each peer.
- */
-struct ConnectedPeer;
-
-
-/**
- * Handle for a send operation.
- */
-struct GNUNET_DV_TransmitHandle
-{
- /**
- * Kept in a DLL.
- */
- struct GNUNET_DV_TransmitHandle *next;
-
- /**
- * Kept in a DLL.
- */
- struct GNUNET_DV_TransmitHandle *prev;
-
- /**
- * Handle to the service.
- */
- struct GNUNET_DV_ServiceHandle *sh;
-
- /**
- * Function to call upon completion.
- */
- GNUNET_DV_MessageSentCallback cb;
-
- /**
- * Closure for @a cb.
- */
- void *cb_cls;
-
- /**
- * The actual message (allocated at the end of this struct).
- */
- const struct GNUNET_MessageHeader *msg;
-
- /**
- * Destination for the message.
- */
- struct ConnectedPeer *target;
-
- /**
- * UID of our message, if any.
- */
- uint32_t uid;
-
-};
-
-
/**
* Information we track for each peer.
*/
*/
struct GNUNET_PeerIdentity pid;
- /**
- * Head of DLL of transmission handles where we need
- * to invoke a continuation when we are informed about
- * successful transmission. The respective request
- * has already been sent to the DV service.
- */
- struct GNUNET_DV_TransmitHandle *head;
-
- /**
- * Tail of DLL of transmission handles where we need
- * to invoke a continuation when we are informed about
- * successful transmission. The respective request
- * has already been sent to the DV service.
- */
- struct GNUNET_DV_TransmitHandle *tail;
-
};
/**
* Connection to DV service.
*/
- struct GNUNET_CLIENT_Connection *client;
-
- /**
- * Active request for transmission to DV service.
- */
- struct GNUNET_CLIENT_TransmitHandle *th;
+ struct GNUNET_MQ_Handle *mq;
/**
* Our configuration.
*/
GNUNET_DV_MessageReceivedCallback message_cb;
- /**
- * Head of messages to transmit.
- */
- struct GNUNET_DV_TransmitHandle *th_head;
-
- /**
- * Tail of messages to transmit.
- */
- struct GNUNET_DV_TransmitHandle *th_tail;
-
/**
* Information tracked per connected peer. Maps peer
* identities to `struct ConnectedPeer` entries.
*/
struct GNUNET_CONTAINER_MultiPeerMap *peers;
- /**
- * Current unique ID
- */
- uint32_t uid_gen;
-
};
/**
- * Start sending messages from our queue to the service.
+ * We got disconnected from the service and thus all of the
+ * connections need to be torn down.
*
- * @param sh service handle
+ * @param cls the `struct GNUNET_DV_ServiceHandle`
+ * @param key a peer identity
+ * @param value a `struct ConnectedPeer` to clean up
+ * @return #GNUNET_OK (continue to iterate)
*/
-static void
-start_transmit (struct GNUNET_DV_ServiceHandle *sh);
+static int
+cleanup_send_cb (void *cls,
+ const struct GNUNET_PeerIdentity *key,
+ void *value)
+{
+ struct GNUNET_DV_ServiceHandle *sh = cls;
+ struct ConnectedPeer *peer = value;
+
+ GNUNET_assert (GNUNET_YES ==
+ GNUNET_CONTAINER_multipeermap_remove (sh->peers,
+ key,
+ peer));
+ sh->disconnect_cb (sh->cls,
+ key);
+ GNUNET_free (peer);
+ return GNUNET_OK;
+}
/**
- * Gives a message from our queue to the DV service.
+ * Handles a message sent from the DV service to us.
+ * Parse it out and give it to the plugin.
*
- * @param cls handle to the dv service (`struct GNUNET_DV_ServiceHandle`)
- * @param size how many bytes can we send
- * @param buf where to copy the message to send
- * @return how many bytes we copied to @a buf
+ * @param cls the handle to the DV API
+ * @param cm the message that was received
*/
-static size_t
-transmit_pending (void *cls, size_t size, void *buf)
+static void
+handle_connect (void *cls,
+ const struct GNUNET_DV_ConnectMessage *cm)
{
struct GNUNET_DV_ServiceHandle *sh = cls;
- char *cbuf = buf;
- struct GNUNET_DV_TransmitHandle *th;
- size_t ret;
- size_t tsize;
+ struct ConnectedPeer *peer;
- sh->th = NULL;
- if (NULL == buf)
+ peer = GNUNET_CONTAINER_multipeermap_get (sh->peers,
+ &cm->peer);
+ if (NULL != peer)
{
+ GNUNET_break (0);
reconnect (sh);
- return 0;
+ return;
}
- ret = 0;
- while ( (NULL != (th = sh->th_head)) &&
- (size - ret >= (tsize = ntohs (th->msg->size)) ))
+ peer = GNUNET_new (struct ConnectedPeer);
+ peer->pid = cm->peer;
+ GNUNET_assert (GNUNET_OK ==
+ GNUNET_CONTAINER_multipeermap_put (sh->peers,
+ &peer->pid,
+ peer,
+ GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
+ sh->connect_cb (sh->cls,
+ &cm->peer,
+ ntohl (cm->distance),
+ (enum GNUNET_ATS_Network_Type) ntohl (cm->network));
+}
+
+
+/**
+ * Handles a message sent from the DV service to us.
+ * Parse it out and give it to the plugin.
+ *
+ * @param cls the handle to the DV API
+ * @param dm the message that was received
+ */
+static void
+handle_disconnect (void *cls,
+ const struct GNUNET_DV_DisconnectMessage *dm)
+{
+ struct GNUNET_DV_ServiceHandle *sh = cls;
+ struct ConnectedPeer *peer;
+
+ peer = GNUNET_CONTAINER_multipeermap_get (sh->peers,
+ &dm->peer);
+ if (NULL == peer)
{
- GNUNET_CONTAINER_DLL_remove (sh->th_head,
- sh->th_tail,
- th);
- memcpy (&cbuf[ret], th->msg, tsize);
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Passing %u bytes of type %u to DV service\n",
- tsize,
- ntohs (th->msg->type));
- th->msg = NULL;
- ret += tsize;
- if (NULL != th->cb)
- {
- GNUNET_CONTAINER_DLL_insert_tail (th->target->head,
- th->target->tail,
- th);
- }
- else
- {
- GNUNET_free (th);
- }
+ GNUNET_break (0);
+ reconnect (sh);
+ return;
}
- if (NULL != sh->th_head)
- start_transmit (sh);
- return ret;
+ cleanup_send_cb (sh,
+ &dm->peer,
+ peer);
}
/**
- * Start sending messages from our queue to the service.
+ * Handles a message sent from the DV service to us.
+ * Parse it out and give it to the plugin.
*
- * @param sh service handle
+ * @param cls the handle to the DV API
+ * @param msg the message that was received
*/
static void
-start_transmit (struct GNUNET_DV_ServiceHandle *sh)
+handle_distance_update (void *cls,
+ const struct GNUNET_DV_DistanceUpdateMessage *dum)
{
- if (NULL != sh->th)
- return;
- if (NULL == sh->th_head)
+ struct GNUNET_DV_ServiceHandle *sh = cls;
+ struct ConnectedPeer *peer;
+
+ peer = GNUNET_CONTAINER_multipeermap_get (sh->peers,
+ &dum->peer);
+ if (NULL == peer)
+ {
+ GNUNET_break (0);
+ reconnect (sh);
return;
- sh->th =
- GNUNET_CLIENT_notify_transmit_ready (sh->client,
- ntohs (sh->th_head->msg->size),
- GNUNET_TIME_UNIT_FOREVER_REL,
- GNUNET_NO,
- &transmit_pending, sh);
+ }
+ sh->distance_cb (sh->cls,
+ &dum->peer,
+ ntohl (dum->distance),
+ (enum GNUNET_ATS_Network_Type) ntohl (dum->network));
}
/**
- * We got disconnected from the service and thus all of the
- * pending send callbacks will never be confirmed. Clean up.
+ * Handles a message sent from the DV service to us.
+ * Parse it out and give it to the plugin.
*
- * @param cls the 'struct GNUNET_DV_ServiceHandle'
- * @param key a peer identity
- * @param value a `struct ConnectedPeer` to clean up
- * @return #GNUNET_OK (continue to iterate)
+ * @param cls the handle to the DV API
+ * @param rm the message that was received
*/
static int
-cleanup_send_cb (void *cls,
- const struct GNUNET_PeerIdentity *key,
- void *value)
+check_received (void *cls,
+ const struct GNUNET_DV_ReceivedMessage *rm)
{
struct GNUNET_DV_ServiceHandle *sh = cls;
- struct ConnectedPeer *peer = value;
- struct GNUNET_DV_TransmitHandle *th;
+ const struct GNUNET_MessageHeader *payload;
- GNUNET_assert (GNUNET_YES ==
- GNUNET_CONTAINER_multipeermap_remove (sh->peers,
- key,
- peer));
- sh->disconnect_cb (sh->cls,
- key);
- while (NULL != (th = peer->head))
+ if (NULL ==
+ GNUNET_CONTAINER_multipeermap_get (sh->peers,
+ &rm->sender))
{
- GNUNET_CONTAINER_DLL_remove (peer->head, peer->tail, th);
- th->cb (th->cb_cls);
- GNUNET_free (th);
+ GNUNET_break (0);
+ return GNUNET_SYSERR;
+ }
+ if (ntohs (rm->header.size) - sizeof (struct GNUNET_DV_ReceivedMessage) <
+ sizeof (*payload))
+ {
+ GNUNET_break (0);
+ return GNUNET_SYSERR;
+ }
+ payload = (const struct GNUNET_MessageHeader *) &rm[1];
+ if (ntohs (rm->header.size) !=
+ sizeof (struct GNUNET_DV_ReceivedMessage) + ntohs (payload->size))
+ {
+ GNUNET_break (0);
+ return GNUNET_SYSERR;
}
- GNUNET_free (peer);
return GNUNET_OK;
}
* Parse it out and give it to the plugin.
*
* @param cls the handle to the DV API
- * @param msg the message that was received
+ * @param rm the message that was received
*/
static void
-handle_message_receipt (void *cls,
- const struct GNUNET_MessageHeader *msg)
+handle_received (void *cls,
+ const struct GNUNET_DV_ReceivedMessage *rm)
{
struct GNUNET_DV_ServiceHandle *sh = cls;
- const struct GNUNET_DV_ConnectMessage *cm;
- const struct GNUNET_DV_DistanceUpdateMessage *dum;
- const struct GNUNET_DV_DisconnectMessage *dm;
- const struct GNUNET_DV_ReceivedMessage *rm;
const struct GNUNET_MessageHeader *payload;
- const struct GNUNET_DV_AckMessage *ack;
- struct GNUNET_DV_TransmitHandle *th;
- struct GNUNET_DV_TransmitHandle *tn;
- struct ConnectedPeer *peer;
- if (NULL == msg)
- {
- /* Connection closed */
- reconnect (sh);
- return;
- }
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Received message of type %u with %u bytes from DV service\n",
- (unsigned int) ntohs (msg->type),
- (unsigned int) ntohs (msg->size));
- switch (ntohs (msg->type))
- {
- case GNUNET_MESSAGE_TYPE_DV_CONNECT:
- if (ntohs (msg->size) != sizeof (struct GNUNET_DV_ConnectMessage))
- {
- GNUNET_break (0);
- reconnect (sh);
- return;
- }
- cm = (const struct GNUNET_DV_ConnectMessage *) msg;
- peer = GNUNET_CONTAINER_multipeermap_get (sh->peers,
- &cm->peer);
- if (NULL != peer)
- {
- GNUNET_break (0);
- reconnect (sh);
- return;
- }
- peer = GNUNET_new (struct ConnectedPeer);
- peer->pid = cm->peer;
- GNUNET_assert (GNUNET_OK ==
- GNUNET_CONTAINER_multipeermap_put (sh->peers,
- &peer->pid,
- peer,
- GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
- sh->connect_cb (sh->cls,
- &cm->peer,
- ntohl (cm->distance),
- (enum GNUNET_ATS_Network_Type) ntohl (cm->network));
- break;
- case GNUNET_MESSAGE_TYPE_DV_DISTANCE_CHANGED:
- if (ntohs (msg->size) != sizeof (struct GNUNET_DV_DistanceUpdateMessage))
- {
- GNUNET_break (0);
- reconnect (sh);
- return;
- }
- dum = (const struct GNUNET_DV_DistanceUpdateMessage *) msg;
- sh->distance_cb (sh->cls,
- &dum->peer,
- ntohl (dum->distance),
- (enum GNUNET_ATS_Network_Type) ntohl (dum->network));
- break;
- case GNUNET_MESSAGE_TYPE_DV_DISCONNECT:
- if (ntohs (msg->size) != sizeof (struct GNUNET_DV_DisconnectMessage))
- {
- GNUNET_break (0);
- reconnect (sh);
- return;
- }
- dm = (const struct GNUNET_DV_DisconnectMessage *) msg;
- peer = GNUNET_CONTAINER_multipeermap_get (sh->peers,
- &dm->peer);
- if (NULL == peer)
- {
- GNUNET_break (0);
- reconnect (sh);
- return;
- }
- tn = sh->th_head;
- while (NULL != (th = tn))
- {
- tn = th->next;
- if (peer == th->target)
- {
- GNUNET_CONTAINER_DLL_remove (sh->th_head,
- sh->th_tail,
- th);
- th->cb (th->cb_cls);
- GNUNET_free (th);
- }
- }
- cleanup_send_cb (sh, &dm->peer, peer);
- break;
- case GNUNET_MESSAGE_TYPE_DV_RECV:
- if (ntohs (msg->size) < sizeof (struct GNUNET_DV_ReceivedMessage) + sizeof (struct GNUNET_MessageHeader))
- {
- GNUNET_break (0);
- reconnect (sh);
- return;
- }
- rm = (const struct GNUNET_DV_ReceivedMessage *) msg;
- payload = (const struct GNUNET_MessageHeader *) &rm[1];
- if (ntohs (msg->size) != sizeof (struct GNUNET_DV_ReceivedMessage) + ntohs (payload->size))
- {
- GNUNET_break (0);
- reconnect (sh);
- return;
- }
- if (NULL ==
- GNUNET_CONTAINER_multipeermap_get (sh->peers,
- &rm->sender))
- {
- GNUNET_break (0);
- reconnect (sh);
- return;
- }
- sh->message_cb (sh->cls,
- &rm->sender,
- ntohl (rm->distance),
- payload);
- break;
- case GNUNET_MESSAGE_TYPE_DV_SEND_ACK:
- case GNUNET_MESSAGE_TYPE_DV_SEND_NACK:
- if (ntohs (msg->size) != sizeof (struct GNUNET_DV_AckMessage))
- {
- GNUNET_break (0);
- reconnect (sh);
- return;
- }
- ack = (const struct GNUNET_DV_AckMessage *) msg;
- peer = GNUNET_CONTAINER_multipeermap_get (sh->peers,
- &ack->target);
- if (NULL == peer)
- break; /* this happens, just ignore */
- for (th = peer->head; NULL != th; th = th->next)
- {
- if (th->uid != ntohl (ack->uid))
- continue;
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Matched ACK for message to peer %s\n",
- GNUNET_i2s (&ack->target));
- GNUNET_CONTAINER_DLL_remove (peer->head,
- peer->tail,
- th);
- th->cb (th->cb_cls);
- GNUNET_free (th);
- break;
- }
- break;
- default:
- reconnect (sh);
- break;
- }
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Received message, continuing receive loop for %p\n",
- sh->client);
- GNUNET_CLIENT_receive (sh->client,
- &handle_message_receipt, sh,
- GNUNET_TIME_UNIT_FOREVER_REL);
+ payload = (const struct GNUNET_MessageHeader *) &rm[1];
+ sh->message_cb (sh->cls,
+ &rm->sender,
+ ntohl (rm->distance),
+ payload);
}
/**
- * Transmit the start message to the DV service.
+ * Generic error handler, called with the appropriate error code and
+ * the same closure specified at the creation of the message queue.
+ * Not every message queue implementation supports an error handler.
*
- * @param cls the `struct GNUNET_DV_ServiceHandle *`
- * @param size number of bytes available in buf
- * @param buf where to copy the message
- * @return number of bytes written to buf
+ * @param cls closure with the `struct GNUNET_DV_ServiceHandle *`
+ * @param error error code
*/
-static size_t
-transmit_start (void *cls,
- size_t size,
- void *buf)
+static void
+mq_error_handler (void *cls,
+ enum GNUNET_MQ_Error error)
{
struct GNUNET_DV_ServiceHandle *sh = cls;
- struct GNUNET_MessageHeader start_message;
- sh->th = NULL;
- if (NULL == buf)
- {
- GNUNET_break (0);
- reconnect (sh);
- return 0;
- }
- GNUNET_assert (size >= sizeof (start_message));
- start_message.size = htons (sizeof (struct GNUNET_MessageHeader));
- start_message.type = htons (GNUNET_MESSAGE_TYPE_DV_START);
- memcpy (buf, &start_message, sizeof (start_message));
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Transmitting START request, starting receive loop for %p\n",
- sh->client);
- GNUNET_CLIENT_receive (sh->client,
- &handle_message_receipt, sh,
- GNUNET_TIME_UNIT_FOREVER_REL);
- start_transmit (sh);
- return sizeof (start_message);
+ reconnect (sh);
}
static void
reconnect (struct GNUNET_DV_ServiceHandle *sh)
{
- if (NULL != sh->th)
- {
- GNUNET_CLIENT_notify_transmit_ready_cancel (sh->th);
- sh->th = NULL;
- }
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Disconnecting from DV service at %p\n",
- sh->client);
- if (NULL != sh->client)
+ GNUNET_MQ_hd_fixed_size (connect,
+ GNUNET_MESSAGE_TYPE_DV_CONNECT,
+ struct GNUNET_DV_ConnectMessage);
+ GNUNET_MQ_hd_fixed_size (disconnect,
+ GNUNET_MESSAGE_TYPE_DV_DISCONNECT,
+ struct GNUNET_DV_DisconnectMessage);
+ GNUNET_MQ_hd_fixed_size (distance_update,
+ GNUNET_MESSAGE_TYPE_DV_DISTANCE_CHANGED,
+ struct GNUNET_DV_DistanceUpdateMessage);
+ GNUNET_MQ_hd_var_size (received,
+ GNUNET_MESSAGE_TYPE_DV_RECV,
+ struct GNUNET_DV_ReceivedMessage);
+ struct GNUNET_MQ_MessageHandler handlers[] = {
+ make_connect_handler (sh),
+ make_disconnect_handler (sh),
+ make_distance_update_handler (sh),
+ make_received_handler (sh),
+ GNUNET_MQ_handler_end ()
+ };
+ struct GNUNET_MessageHeader *sm;
+ struct GNUNET_MQ_Envelope *env;
+
+ if (NULL != sh->mq)
{
- GNUNET_CLIENT_disconnect (sh->client);
- sh->client = NULL;
+ GNUNET_MQ_destroy (sh->mq);
+ sh->mq = NULL;
}
GNUNET_CONTAINER_multipeermap_iterate (sh->peers,
&cleanup_send_cb,
sh);
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Connecting to DV service\n");
- sh->client = GNUNET_CLIENT_connect ("dv", sh->cfg);
- if (NULL == sh->client)
+ sh->mq = GNUNET_CLIENT_connecT (sh->cfg,
+ "dv",
+ handlers,
+ &mq_error_handler,
+ sh);
+ if (NULL == sh->mq)
{
GNUNET_break (0);
return;
}
- sh->th = GNUNET_CLIENT_notify_transmit_ready (sh->client,
- sizeof (struct GNUNET_MessageHeader),
- GNUNET_TIME_UNIT_FOREVER_REL,
- GNUNET_YES,
- &transmit_start,
- sh);
+ env = GNUNET_MQ_msg (sm,
+ GNUNET_MESSAGE_TYPE_DV_START);
+ GNUNET_MQ_send (sh->mq,
+ env);
}
sh->distance_cb = distance_cb;
sh->disconnect_cb = disconnect_cb;
sh->message_cb = message_cb;
- sh->peers = GNUNET_CONTAINER_multipeermap_create (128, GNUNET_YES);
+ sh->peers = GNUNET_CONTAINER_multipeermap_create (128,
+ GNUNET_YES);
reconnect (sh);
return sh;
}
void
GNUNET_DV_service_disconnect (struct GNUNET_DV_ServiceHandle *sh)
{
- struct GNUNET_DV_TransmitHandle *pos;
-
if (NULL == sh)
return;
- if (NULL != sh->th)
- {
- GNUNET_CLIENT_notify_transmit_ready_cancel (sh->th);
- sh->th = NULL;
- }
- while (NULL != (pos = sh->th_head))
- {
- GNUNET_CONTAINER_DLL_remove (sh->th_head,
- sh->th_tail,
- pos);
- GNUNET_free (pos);
- }
- if (NULL != sh->client)
+ if (NULL != sh->mq)
{
- GNUNET_CLIENT_disconnect (sh->client);
- sh->client = NULL;
+ GNUNET_MQ_destroy (sh->mq);
+ sh->mq = NULL;
}
GNUNET_CONTAINER_multipeermap_iterate (sh->peers,
&cleanup_send_cb,
* @param sh service handle
* @param target intended recpient
* @param msg message payload
- * @param cb function to invoke when done
- * @param cb_cls closure for @a cb
- * @return handle to cancel the operation
*/
-struct GNUNET_DV_TransmitHandle *
+void
GNUNET_DV_send (struct GNUNET_DV_ServiceHandle *sh,
const struct GNUNET_PeerIdentity *target,
- const struct GNUNET_MessageHeader *msg,
- GNUNET_DV_MessageSentCallback cb,
- void *cb_cls)
+ const struct GNUNET_MessageHeader *msg)
{
- struct GNUNET_DV_TransmitHandle *th;
struct GNUNET_DV_SendMessage *sm;
struct ConnectedPeer *peer;
+ struct GNUNET_MQ_Envelope *env;
- if (ntohs (msg->size) + sizeof (struct GNUNET_DV_SendMessage) >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
+ if (ntohs (msg->size) + sizeof (*sm) >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
{
GNUNET_break (0);
- return NULL;
+ return;
}
LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Asked to send %u bytes of type %u to %s via %p\n",
+ "Asked to send %u bytes of type %u to %s\n",
(unsigned int) ntohs (msg->size),
(unsigned int) ntohs (msg->type),
- GNUNET_i2s (target),
- sh->client);
+ GNUNET_i2s (target));
peer = GNUNET_CONTAINER_multipeermap_get (sh->peers,
target);
if (NULL == peer)
{
GNUNET_break (0);
- return NULL;
+ return;
}
- th = GNUNET_malloc (sizeof (struct GNUNET_DV_TransmitHandle) +
- sizeof (struct GNUNET_DV_SendMessage) +
- ntohs (msg->size));
- th->sh = sh;
- th->target = peer;
- th->cb = cb;
- th->cb_cls = cb_cls;
- th->msg = (const struct GNUNET_MessageHeader *) &th[1];
- sm = (struct GNUNET_DV_SendMessage *) &th[1];
- sm->header.type = htons (GNUNET_MESSAGE_TYPE_DV_SEND);
- sm->header.size = htons (sizeof (struct GNUNET_DV_SendMessage) +
- ntohs (msg->size));
- if (0 == sh->uid_gen)
- sh->uid_gen = 1;
- th->uid = sh->uid_gen;
- sm->uid = htonl (sh->uid_gen++);
- /* use memcpy here as 'target' may not be sufficiently aligned */
- memcpy (&sm->target, target, sizeof (struct GNUNET_PeerIdentity));
- memcpy (&sm[1], msg, ntohs (msg->size));
- GNUNET_CONTAINER_DLL_insert_tail (sh->th_head,
- sh->th_tail,
- th);
- start_transmit (sh);
- return th;
-}
-
-
-/**
- * Abort send operation (naturally, the message may have
- * already been transmitted; this only stops the 'cb'
- * from being called again).
- *
- * @param th send operation to cancel
- */
-void
-GNUNET_DV_send_cancel (struct GNUNET_DV_TransmitHandle *th)
-{
- struct GNUNET_DV_ServiceHandle *sh = th->sh;
-
- if (NULL == th->msg)
- GNUNET_CONTAINER_DLL_remove (th->target->head,
- th->target->tail,
- th);
- else
- GNUNET_CONTAINER_DLL_remove (sh->th_head,
- sh->th_tail,
- th);
- GNUNET_free (th);
+ GNUNET_assert (NULL != sh->mq);
+ env = GNUNET_MQ_msg_nested_mh (sm,
+ GNUNET_MESSAGE_TYPE_DV_SEND,
+ msg);
+ sm->target = *target;
+ GNUNET_MQ_send (sh->mq,
+ env);
}
struct Plugin;
-/**
- * An active request for transmission via DV.
- */
-struct PendingRequest
-{
-
- /**
- * This is a DLL.
- */
- struct PendingRequest *next;
-
- /**
- * This is a DLL.
- */
- struct PendingRequest *prev;
-
- /**
- * Continuation function to call once the transmission buffer
- * has again space available. NULL if there is no
- * continuation to call.
- */
- GNUNET_TRANSPORT_TransmitContinuation transmit_cont;
-
- /**
- * Closure for @e transmit_cont.
- */
- void *transmit_cont_cls;
-
- /**
- * Transmission handle from DV client library.
- */
- struct GNUNET_DV_TransmitHandle *th;
-
- /**
- * Session of this request.
- */
- struct GNUNET_ATS_Session *session;
-
- /**
- * Number of bytes to transmit.
- */
- size_t size;
-};
-
-
/**
* Session handle for connections.
*/
*/
struct Plugin *plugin;
- /**
- * Head of pending requests.
- */
- struct PendingRequest *pr_head;
-
- /**
- * Tail of pending requests.
- */
- struct PendingRequest *pr_tail;
-
/**
* Address we use for the other peer.
*/
free_session (struct GNUNET_ATS_Session *session)
{
struct Plugin *plugin = session->plugin;
- struct PendingRequest *pr;
GNUNET_assert (GNUNET_YES ==
GNUNET_CONTAINER_multipeermap_remove (plugin->sessions,
session);
session->active = GNUNET_NO;
}
- while (NULL != (pr = session->pr_head))
- {
- GNUNET_CONTAINER_DLL_remove (session->pr_head,
- session->pr_tail,
- pr);
- GNUNET_DV_send_cancel (pr->th);
- pr->th = NULL;
- if (NULL != pr->transmit_cont)
- pr->transmit_cont (pr->transmit_cont_cls,
- &session->sender,
- GNUNET_SYSERR,
- pr->size, 0);
- GNUNET_free (pr);
- }
GNUNET_HELLO_address_free (session->address);
GNUNET_free (session);
}
}
-/**
- * Function called once the delivery of a message has been successful.
- * Clean up the pending request, and call continuations.
- *
- * @param cls closure
- */
-static void
-send_finished (void *cls)
-{
- struct PendingRequest *pr = cls;
- struct GNUNET_ATS_Session *session = pr->session;
-
- pr->th = NULL;
- GNUNET_CONTAINER_DLL_remove (session->pr_head,
- session->pr_tail,
- pr);
- if (NULL != pr->transmit_cont)
- pr->transmit_cont (pr->transmit_cont_cls,
- &session->sender,
- GNUNET_OK,
- pr->size, 0);
- GNUNET_free (pr);
-}
-
-
/**
* Function that can be used by the transport service to transmit
* a message using the plugin.
size_t msgbuf_size,
unsigned int priority,
struct GNUNET_TIME_Relative timeout,
- GNUNET_TRANSPORT_TransmitContinuation cont, void *cont_cls)
+ GNUNET_TRANSPORT_TransmitContinuation cont,
+ void *cont_cls)
{
struct Plugin *plugin = cls;
- struct PendingRequest *pr;
const struct GNUNET_MessageHeader *msg;
struct GNUNET_MessageHeader *box;
memcpy (&box[1], msgbuf, msgbuf_size);
msg = box;
}
- pr = GNUNET_new (struct PendingRequest);
- pr->transmit_cont = cont;
- pr->transmit_cont_cls = cont_cls;
- pr->session = session;
- pr->size = msgbuf_size;
- GNUNET_CONTAINER_DLL_insert_tail (session->pr_head,
- session->pr_tail,
- pr);
-
- pr->th = GNUNET_DV_send (plugin->dvh,
- &session->sender,
- msg,
- &send_finished,
- pr);
+ GNUNET_DV_send (plugin->dvh,
+ &session->sender,
+ msg);
+ cont (cont_cls,
+ &session->sender,
+ GNUNET_OK,
+ msgbuf_size, 0);
GNUNET_free_non_null (box);
return 0; /* DV */
}
{
struct Plugin *plugin = cls;
struct GNUNET_ATS_Session *session;
- struct PendingRequest *pr;
session = GNUNET_CONTAINER_multipeermap_get (plugin->sessions,
target);
if (NULL == session)
return; /* nothing to do */
- while (NULL != (pr = session->pr_head))
- {
- GNUNET_CONTAINER_DLL_remove (session->pr_head,
- session->pr_tail,
- pr);
- GNUNET_DV_send_cancel (pr->th);
- pr->th = NULL;
- if (NULL != pr->transmit_cont)
- pr->transmit_cont (pr->transmit_cont_cls,
- &session->sender,
- GNUNET_SYSERR,
- pr->size, 0);
- GNUNET_free (pr);
- }
session->active = GNUNET_NO;
}
dv_plugin_disconnect_session (void *cls,
struct GNUNET_ATS_Session *session)
{
- struct PendingRequest *pr;
-
- while (NULL != (pr = session->pr_head))
- {
- GNUNET_CONTAINER_DLL_remove (session->pr_head,
- session->pr_tail,
- pr);
- GNUNET_DV_send_cancel (pr->th);
- pr->th = NULL;
- if (NULL != pr->transmit_cont)
- pr->transmit_cont (pr->transmit_cont_cls,
- &session->sender,
- GNUNET_SYSERR,
- pr->size, 0);
- GNUNET_free (pr);
- }
session->active = GNUNET_NO;
return GNUNET_OK;
}
* @param asc_cls closure for @a asc
*/
static void
-dv_plugin_address_pretty_printer (void *cls, const char *type,
+dv_plugin_address_pretty_printer (void *cls,
+ const char *type,
const void *addr,
- size_t addrlen, int numeric,
+ size_t addrlen,
+ int numeric,
struct GNUNET_TIME_Relative timeout,
GNUNET_TRANSPORT_AddressStringCallback asc,
void *asc_cls)