#define LOG(kind,...) GNUNET_log_from (kind, "dv-api",__VA_ARGS__)
+/**
+ * Information we track for each peer.
+ */
+struct ConnectedPeer;
+
+
/**
* Handle for a send operation.
*/
GNUNET_DV_MessageSentCallback cb;
/**
- * Closure for 'cb'.
+ * Closure for @a cb.
*/
void *cb_cls;
-
+
/**
* The actual message (allocated at the end of this struct).
*/
/**
* Destination for the message.
*/
- struct GNUNET_PeerIdentity target;
+ struct ConnectedPeer *target;
+
+ /**
+ * UID of our message, if any.
+ */
+ uint32_t uid;
+
+};
+
+
+/**
+ * Information we track for each peer.
+ */
+struct ConnectedPeer
+{
+
+ /**
+ * Identity of the peer.
+ */
+ struct GNUNET_PeerIdentity pid;
+
+ /**
+ * Head of DLL of transmission handles where we need
+ * to invoke a continuation when we are informed about
+ * successful transmission. The respective request
+ * has already been sent to the DV service.
+ */
+ struct GNUNET_DV_TransmitHandle *head;
+
+ /**
+ * Tail of DLL of transmission handles where we need
+ * to invoke a continuation when we are informed about
+ * successful transmission. The respective request
+ * has already been sent to the DV service.
+ */
+ struct GNUNET_DV_TransmitHandle *tail;
};
* Closure for the callbacks.
*/
void *cls;
-
+
/**
* Function to call on connect events.
*/
GNUNET_DV_ConnectCallback connect_cb;
+ /**
+ * Function to call on distance change events.
+ */
+ GNUNET_DV_DistanceChangedCallback distance_cb;
+
/**
* Function to call on disconnect events.
*/
struct GNUNET_DV_TransmitHandle *th_tail;
/**
- * Mapping of peer identities to TransmitHandles to invoke
- * upon successful transmission. The respective
- * transmissions have already been done.
+ * Information tracked per connected peer. Maps peer
+ * identities to `struct ConnectedPeer` entries.
*/
- struct GNUNET_CONTAINER_MultiHashMap *send_callbacks;
+ struct GNUNET_CONTAINER_MultiPeerMap *peers;
/**
* Current unique ID
* @param cls handle to the dv service (struct GNUNET_DV_ServiceHandle)
* @param size how many bytes can we send
* @param buf where to copy the message to send
- * @return how many bytes we copied to buf
+ * @return how many bytes we copied to @a buf
*/
static size_t
transmit_pending (void *cls, size_t size, void *buf)
sh->th_tail,
th);
memcpy (&cbuf[ret], th->msg, tsize);
+ th->msg = NULL;
ret += tsize;
- (void) GNUNET_CONTAINER_multihashmap_put (sh->send_callbacks,
- &th->target.hashPubKey,
- th,
- GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
+ if (NULL != th->cb)
+ {
+ GNUNET_CONTAINER_DLL_insert (th->target->head,
+ th->target->tail,
+ th);
+ }
+ else
+ {
+ GNUNET_free (th);
+ }
}
return ret;
}
if (NULL != sh->th)
return;
if (NULL == sh->th_head)
- return;
+ return;
sh->th =
GNUNET_CLIENT_notify_transmit_ready (sh->client,
ntohs (sh->th_head->msg->size),
GNUNET_TIME_UNIT_FOREVER_REL,
- GNUNET_NO,
+ GNUNET_NO,
&transmit_pending, sh);
}
+/**
+ * We got disconnected from the service and thus all of the
+ * pending send callbacks will never be confirmed. Clean up.
+ *
+ * @param cls the 'struct GNUNET_DV_ServiceHandle'
+ * @param key a peer identity
+ * @param value a `struct ConnectedPeer` to clean up
+ * @return #GNUNET_OK (continue to iterate)
+ */
+static int
+cleanup_send_cb (void *cls,
+ const struct GNUNET_PeerIdentity *key,
+ void *value)
+{
+ struct GNUNET_DV_ServiceHandle *sh = cls;
+ struct ConnectedPeer *peer = value;
+ struct GNUNET_DV_TransmitHandle *th;
+
+ GNUNET_assert (GNUNET_YES ==
+ GNUNET_CONTAINER_multipeermap_remove (sh->peers,
+ key,
+ peer));
+ sh->disconnect_cb (sh->cls,
+ key);
+ while (NULL != (th = peer->head))
+ {
+ GNUNET_CONTAINER_DLL_remove (peer->head, peer->tail, th);
+ th->cb (th->cb_cls, GNUNET_SYSERR);
+ GNUNET_free (th);
+ }
+ return GNUNET_OK;
+}
+
+
/**
* Handles a message sent from the DV service to us.
* Parse it out and give it to the plugin.
* @param msg the message that was received
*/
static void
-handle_message_receipt (void *cls,
+handle_message_receipt (void *cls,
const struct GNUNET_MessageHeader *msg)
{
struct GNUNET_DV_ServiceHandle *sh = cls;
const struct GNUNET_DV_ConnectMessage *cm;
+ const struct GNUNET_DV_DistanceUpdateMessage *dum;
const struct GNUNET_DV_DisconnectMessage *dm;
const struct GNUNET_DV_ReceivedMessage *rm;
const struct GNUNET_MessageHeader *payload;
+ const struct GNUNET_DV_AckMessage *ack;
+ struct GNUNET_DV_TransmitHandle *th;
+ struct GNUNET_DV_TransmitHandle *tn;
+ struct ConnectedPeer *peer;
if (NULL == msg)
{
reconnect (sh);
return;
}
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Received message of type %u from DV service\n",
+ (unsigned int) msg->type);
switch (ntohs (msg->type))
{
case GNUNET_MESSAGE_TYPE_DV_CONNECT:
return;
}
cm = (const struct GNUNET_DV_ConnectMessage *) msg;
+ peer = GNUNET_CONTAINER_multipeermap_get (sh->peers,
+ &cm->peer);
+ if (NULL != peer)
+ {
+ GNUNET_break (0);
+ reconnect (sh);
+ return;
+ }
+ peer = GNUNET_new (struct ConnectedPeer);
+ peer->pid = cm->peer;
+ GNUNET_assert (GNUNET_OK ==
+ GNUNET_CONTAINER_multipeermap_put (sh->peers,
+ &peer->pid,
+ peer,
+ GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
sh->connect_cb (sh->cls,
&cm->peer,
- ntohl (cm->distance));
+ ntohl (cm->distance), ntohl (cm->network));
+ break;
+ case GNUNET_MESSAGE_TYPE_DV_DISTANCE_CHANGED:
+ if (ntohs (msg->size) != sizeof (struct GNUNET_DV_DistanceUpdateMessage))
+ {
+ GNUNET_break (0);
+ reconnect (sh);
+ return;
+ }
+ dum = (const struct GNUNET_DV_DistanceUpdateMessage *) msg;
+ sh->distance_cb (sh->cls,
+ &dum->peer,
+ ntohl (dum->distance));
break;
case GNUNET_MESSAGE_TYPE_DV_DISCONNECT:
if (ntohs (msg->size) != sizeof (struct GNUNET_DV_DisconnectMessage))
return;
}
dm = (const struct GNUNET_DV_DisconnectMessage *) msg;
- sh->disconnect_cb (sh->cls,
- &dm->peer);
+ peer = GNUNET_CONTAINER_multipeermap_get (sh->peers,
+ &dm->peer);
+ if (NULL == peer)
+ {
+ GNUNET_break (0);
+ reconnect (sh);
+ return;
+ }
+ tn = sh->th_head;
+ while (NULL != (th = tn))
+ {
+ tn = th->next;
+ if (peer == th->target)
+ {
+ GNUNET_CONTAINER_DLL_remove (sh->th_head,
+ sh->th_tail,
+ th);
+ th->cb (th->cb_cls, GNUNET_SYSERR);
+ GNUNET_free (th);
+ }
+ }
+ cleanup_send_cb (sh, &dm->peer, peer);
break;
case GNUNET_MESSAGE_TYPE_DV_RECV:
if (ntohs (msg->size) < sizeof (struct GNUNET_DV_ReceivedMessage) + sizeof (struct GNUNET_MessageHeader))
ntohl (rm->distance),
payload);
break;
+ case GNUNET_MESSAGE_TYPE_DV_SEND_ACK:
+ case GNUNET_MESSAGE_TYPE_DV_SEND_NACK:
+ if (ntohs (msg->size) != sizeof (struct GNUNET_DV_AckMessage))
+ {
+ GNUNET_break (0);
+ reconnect (sh);
+ return;
+ }
+ ack = (const struct GNUNET_DV_AckMessage *) msg;
+ peer = GNUNET_CONTAINER_multipeermap_get (sh->peers,
+ &ack->target);
+ if (NULL == peer)
+ return; /* this happens, just ignore */
+ for (th = peer->head; NULL != th; th = th->next)
+ {
+ if (th->uid != ntohl (ack->uid))
+ continue;
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Matched ACK for message to peer %s\n",
+ GNUNET_i2s (&ack->target));
+ GNUNET_CONTAINER_DLL_remove (peer->head,
+ peer->tail,
+ th);
+ th->cb (th->cb_cls,
+ (ntohs (ack->header.type) == GNUNET_MESSAGE_TYPE_DV_SEND_ACK)
+ ? GNUNET_OK
+ : GNUNET_SYSERR);
+ GNUNET_free (th);
+ break;
+ }
+ break;
default:
reconnect (sh);
break;
}
- GNUNET_CLIENT_receive (sh->client,
+ GNUNET_CLIENT_receive (sh->client,
&handle_message_receipt, sh,
GNUNET_TIME_UNIT_FOREVER_REL);
}
/**
* Transmit the start message to the DV service.
*
- * @param cls the 'struct GNUNET_DV_ServiceHandle'
+ * @param cls the `struct GNUNET_DV_ServiceHandle *`
* @param size number of bytes available in buf
* @param buf where to copy the message
* @return number of bytes written to buf
- */
+ */
static size_t
transmit_start (void *cls,
size_t size,
}
-/**
- * We got disconnected from the service and thus all of the
- * pending send callbacks will never be confirmed. Clean up.
- *
- * @param cls the 'struct GNUNET_DV_ServiceHandle'
- * @param key a peer identity
- * @param value a 'struct GNUNET_DV_TransmitHandle' to clean up
- * @return GNUNET_OK (continue to iterate)
- */
-static int
-cleanup_send_cb (void *cls,
- const struct GNUNET_HashCode *key,
- void *value)
-{
- struct GNUNET_DV_ServiceHandle *sh = cls;
- struct GNUNET_DV_TransmitHandle *th = value;
-
- GNUNET_assert (GNUNET_YES ==
- GNUNET_CONTAINER_multihashmap_remove (sh->send_callbacks,
- key,
- th));
- th->cb (th->cb_cls, GNUNET_SYSERR);
- GNUNET_free (th);
- return GNUNET_OK;
-}
-
-
/**
* Disconnect and then reconnect to the DV service.
*
*/
static void
reconnect (struct GNUNET_DV_ServiceHandle *sh)
-{
+{
if (NULL != sh->th)
{
GNUNET_CLIENT_notify_transmit_ready_cancel (sh->th);
GNUNET_CLIENT_disconnect (sh->client);
sh->client = NULL;
}
- GNUNET_CONTAINER_multihashmap_iterate (sh->send_callbacks,
+ GNUNET_CONTAINER_multipeermap_iterate (sh->peers,
&cleanup_send_cb,
sh);
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Connecting to DV service\n");
sh->client = GNUNET_CLIENT_connect ("dv", sh->cfg);
if (NULL == sh->client)
{
* @param cfg configuration
* @param cls closure for callbacks
* @param connect_cb function to call on connects
+ * @param distance_cb function to call if distances change
* @param disconnect_cb function to call on disconnects
* @param message_cb function to call if we receive messages
* @return handle to access the service
GNUNET_DV_service_connect (const struct GNUNET_CONFIGURATION_Handle *cfg,
void *cls,
GNUNET_DV_ConnectCallback connect_cb,
+ GNUNET_DV_DistanceChangedCallback distance_cb,
GNUNET_DV_DisconnectCallback disconnect_cb,
GNUNET_DV_MessageReceivedCallback message_cb)
{
struct GNUNET_DV_ServiceHandle *sh;
- sh = GNUNET_malloc (sizeof (struct GNUNET_DV_ServiceHandle));
+ sh = GNUNET_new (struct GNUNET_DV_ServiceHandle);
sh->cfg = cfg;
sh->cls = cls;
sh->connect_cb = connect_cb;
+ sh->distance_cb = distance_cb;
sh->disconnect_cb = disconnect_cb;
sh->message_cb = message_cb;
- sh->send_callbacks = GNUNET_CONTAINER_multihashmap_create (128, GNUNET_YES);
+ sh->peers = GNUNET_CONTAINER_multipeermap_create (128, GNUNET_YES);
reconnect (sh);
return sh;
}
GNUNET_DV_service_disconnect (struct GNUNET_DV_ServiceHandle *sh)
{
struct GNUNET_DV_TransmitHandle *pos;
-
+
if (NULL == sh)
return;
if (NULL != sh->th)
pos);
GNUNET_free (pos);
}
- if (NULL != sh->client)
+ if (NULL != sh->client)
{
GNUNET_CLIENT_disconnect (sh->client);
sh->client = NULL;
}
- GNUNET_CONTAINER_multihashmap_iterate (sh->send_callbacks,
+ GNUNET_CONTAINER_multipeermap_iterate (sh->peers,
&cleanup_send_cb,
sh);
- GNUNET_CONTAINER_multihashmap_destroy (sh->send_callbacks);
+ GNUNET_CONTAINER_multipeermap_destroy (sh->peers);
GNUNET_free (sh);
}
* @param target intended recpient
* @param msg message payload
* @param cb function to invoke when done
- * @param cb_cls closure for 'cb'
+ * @param cb_cls closure for @a cb
* @return handle to cancel the operation
*/
struct GNUNET_DV_TransmitHandle *
{
struct GNUNET_DV_TransmitHandle *th;
struct GNUNET_DV_SendMessage *sm;
+ struct ConnectedPeer *peer;
if (ntohs (msg->size) + sizeof (struct GNUNET_DV_SendMessage) >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
{
GNUNET_break (0);
return NULL;
}
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Asked to send %u bytes of type %u to %s\n",
+ (unsigned int) msg->size,
+ (unsigned int) msg->type,
+ GNUNET_i2s (target));
+ peer = GNUNET_CONTAINER_multipeermap_get (sh->peers,
+ target);
+ if (NULL == peer)
+ {
+ GNUNET_break (0);
+ return NULL;
+ }
th = GNUNET_malloc (sizeof (struct GNUNET_DV_TransmitHandle) +
sizeof (struct GNUNET_DV_SendMessage) +
ntohs (msg->size));
th->sh = sh;
- th->target = *target;
+ th->target = peer;
th->cb = cb;
th->cb_cls = cb_cls;
th->msg = (const struct GNUNET_MessageHeader *) &th[1];
sm = (struct GNUNET_DV_SendMessage *) &th[1];
sm->header.type = htons (GNUNET_MESSAGE_TYPE_DV_SEND);
- sm->header.size = htons (sizeof (struct GNUNET_DV_SendMessage) +
+ sm->header.size = htons (sizeof (struct GNUNET_DV_SendMessage) +
ntohs (msg->size));
+ if (0 == sh->uid_gen)
+ sh->uid_gen = 1;
+ th->uid = sh->uid_gen;
+ sm->uid = htonl (sh->uid_gen++);
/* use memcpy here as 'target' may not be sufficiently aligned */
memcpy (&sm->target, target, sizeof (struct GNUNET_PeerIdentity));
memcpy (&sm[1], msg, ntohs (msg->size));
GNUNET_DV_send_cancel (struct GNUNET_DV_TransmitHandle *th)
{
struct GNUNET_DV_ServiceHandle *sh = th->sh;
- int ret;
- ret = GNUNET_CONTAINER_multihashmap_remove (sh->send_callbacks,
- &th->target.hashPubKey,
- th);
- if (GNUNET_YES != ret)
+ if (NULL == th->msg)
+ GNUNET_CONTAINER_DLL_remove (th->target->head,
+ th->target->tail,
+ th);
+ else
GNUNET_CONTAINER_DLL_remove (sh->th_head,
sh->th_tail,
th);