};
+
+
+
+
+
+
+
+/* *********************** TNG messages ***************** */
+
+/**
+ * Add address to the list.
+ */
+struct GNUNET_TRANSPORT_AddAddressMessage
+{
+
+ /**
+ * Type will be #GNUNET_MESSAGE_TYPE_TRANSPORT_ADD_ADDRESS.
+ */
+ struct GNUNET_MessageHeader header;
+
+ /**
+ * Address identifier (used during deletion).
+ */
+ uint32_t aid GNUNET_PACKED;
+
+ /**
+ * When does the address expire?
+ */
+ struct GNUNET_TIME_RelativeNBO expiration;
+
+ /**
+ * An `enum GNUNET_ATS_Network_Type` in NBO.
+ */
+ uint32_t nt;
+
+ /* followed by UTF-8 encoded, 0-terminated human-readable address */
+};
+
+
+/**
+ * Remove address from the list.
+ */
+struct GNUNET_TRANSPORT_DelAddressMessage
+{
+
+ /**
+ * Type will be #GNUNET_MESSAGE_TYPE_TRANSPORT_DEL_ADDRESS.
+ */
+ struct GNUNET_MessageHeader header;
+
+ /**
+ * Address identifier.
+ */
+ uint32_t aid GNUNET_PACKED;
+
+};
+
+
+/**
+ * Inform transport about an incoming message.
+ */
+struct GNUNET_TRANSPORT_IncomingMessage
+{
+
+ /**
+ * Type will be #GNUNET_MESSAGE_TYPE_TRANSPORT_INCOMING_MSG.
+ */
+ struct GNUNET_MessageHeader header;
+
+ /**
+ * Do we use flow control or not?
+ */
+ uint32_t fc_on GNUNET_PACKED;
+
+ /**
+ * 64-bit number to identify the matching ACK.
+ */
+ uint64_t fc_id GNUNET_PACKED;
+
+ /**
+ * Sender identifier.
+ */
+ struct GNUNET_PeerIdentity sender GNUNET_PACKED;
+
+ /* followed by the message */
+};
+
+
+/**
+ * Transport informs us about being done with an incoming message.
+ * (only sent if fc_on was set).
+ */
+struct GNUNET_TRANSPORT_IncomingMessageAck
+{
+
+ /**
+ * Type will be #GNUNET_MESSAGE_TYPE_TRANSPORT_INCOMING_MSG_ACK.
+ */
+ struct GNUNET_MessageHeader header;
+
+ /**
+ * Reserved (0)
+ */
+ uint32_t reserved GNUNET_PACKED;
+
+ /**
+ * Which message is being ACKed?
+ */
+ uint64_t fc_id GNUNET_PACKED;
+
+ /**
+ * Sender identifier of the original message.
+ */
+ struct GNUNET_PeerIdentity sender GNUNET_PACKED;
+
+};
+
+
+/**
+ * Add queue to the transport
+ */
+struct GNUNET_TRANSPORT_AddQueueMessage
+{
+
+ /**
+ * Type will be #GNUNET_MESSAGE_TYPE_TRANSPORT_ADD_QUEUE.
+ */
+ struct GNUNET_MessageHeader header;
+
+ /**
+ * Queue identifier (used to identify the queue).
+ */
+ uint32_t qid GNUNET_PACKED;
+
+ /**
+ * Receiver that can be addressed via the queue.
+ */
+ struct GNUNET_PeerIdentity receiver GNUNET_PACKED;
+
+ /**
+ * An `enum GNUNET_ATS_Network_Type` in NBO.
+ */
+ uint32_t nt;
+
+ /* followed by UTF-8 encoded, 0-terminated human-readable address */
+};
+
+
+/**
+ * Remove queue, it is no longer available.
+ */
+struct GNUNET_TRANSPORT_DelQueueMessage
+{
+
+ /**
+ * Type will be #GNUNET_MESSAGE_TYPE_TRANSPORT_DEL_QUEUE.
+ */
+ struct GNUNET_MessageHeader header;
+
+ /**
+ * Address identifier.
+ */
+ uint32_t qid GNUNET_PACKED;
+
+ /**
+ * Receiver that can be addressed via the queue.
+ */
+ struct GNUNET_PeerIdentity receiver GNUNET_PACKED;
+
+};
+
+
+/**
+ * Transport tells communicator that it wants a new queue.
+ */
+struct GNUNET_TRANSPORT_CreateQueue
+{
+
+ /**
+ * Type will be #GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_CREATE.
+ */
+ struct GNUNET_MessageHeader header;
+
+ /**
+ * Always zero.
+ */
+ uint32_t reserved GNUNET_PACKED;
+
+ /**
+ * Receiver that can be addressed via the queue.
+ */
+ struct GNUNET_PeerIdentity receiver GNUNET_PACKED;
+
+ /* followed by UTF-8 encoded, 0-terminated human-readable address */
+};
+
+
+/**
+ * Inform communicator about transport's desire to send a message.
+ */
+struct GNUNET_TRANSPORT_SendMessageTo
+{
+
+ /**
+ * Type will be #GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_MSG.
+ */
+ struct GNUNET_MessageHeader header;
+
+ /**
+ * Which queue should we use?
+ */
+ uint32_t qid GNUNET_PACKED;
+
+ /**
+ * Message ID, used for flow control.
+ */
+ uint64_t mid GNUNET_PACKED;
+
+ /**
+ * Receiver identifier.
+ */
+ struct GNUNET_PeerIdentity receiver GNUNET_PACKED;
+
+ /* followed by the message */
+};
+
+
+/**
+ * Inform transport that message was sent.
+ */
+struct GNUNET_TRANSPORT_SendMessageToAck
+{
+
+ /**
+ * Type will be #GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_MSG_ACK.
+ */
+ struct GNUNET_MessageHeader header;
+
+ /**
+ * Success (#GNUNET_OK), failure (#GNUNET_SYSERR).
+ */
+ uint32_t status GNUNET_PACKED;
+
+ /**
+ * Message ID of the original message.
+ */
+ uint64_t mid GNUNET_PACKED;
+
+ /**
+ * Receiver identifier.
+ */
+ struct GNUNET_PeerIdentity receiver GNUNET_PACKED;
+
+};
+
+
+
GNUNET_NETWORK_STRUCT_END
/* end of transport.h */
#include "transport.h"
+/**
+ * How many messages do we keep at most in the queue to the
+ * transport service before we start to drop (default,
+ * can be changed via the configuration file).
+ */
+#define DEFAULT_MAX_QUEUE_LENGTH 16
+
+
+/**
+ * Information we track per packet to enable flow control.
+ */
+struct FlowControl
+{
+ /**
+ * Kept in a DLL.
+ */
+ struct FlowControl *next;
+
+ /**
+ * Kept in a DLL.
+ */
+ struct FlowControl *prev;
+
+ /**
+ * Function to call once the message was processed.
+ */
+ GNUNET_TRANSPORT_MessageCompletedCallback cb;
+
+ /**
+ * Closure for @e cb
+ */
+ void *cb_cls;
+
+ /**
+ * Which peer is this about?
+ */
+ struct GNUNET_PeerIdentity sender;
+
+ /**
+ * More-or-less unique ID for the message.
+ */
+ uint64_t id;
+};
+
+
+/**
+ * Information we track per message to tell the transport about
+ * success or failures.
+ */
+struct AckPending
+{
+ /**
+ * Kept in a DLL.
+ */
+ struct AckPending *next;
+
+ /**
+ * Kept in a DLL.
+ */
+ struct AckPending *prev;
+
+ /**
+ * Which peer is this about?
+ */
+ struct GNUNET_PeerIdentity receiver;
+
+ /**
+ * More-or-less unique ID for the message.
+ */
+ uint64_t mid;
+};
+
+
/**
* Opaque handle to the transport service for communicators.
*/
*/
struct GNUNET_TRANSPORT_AddressIdentifier *ai_tail;
+ /**
+ * DLL of messages awaiting flow control confirmation (ack).
+ */
+ struct FlowControl *fc_head;
+
+ /**
+ * DLL of messages awaiting flow control confirmation (ack).
+ */
+ struct FlowControl *fc_tail;
+
+ /**
+ * DLL of messages awaiting transmission confirmation (ack).
+ */
+ struct AckPending *ap_head;
+
+ /**
+ * DLL of messages awaiting transmission confirmation (ack).
+ */
+ struct AckPending *ac_tail;
+
+ /**
+ * DLL of queues we offer.
+ */
+ struct QueueHandle *queue_head;
+
+ /**
+ * DLL of queues we offer.
+ */
+ struct QueueHandle *queue_tail;
+
/**
* Our configuration.
*/
*/
void *mq_init_cls;
+ /**
+ * Maximum permissable queue length.
+ */
+ unsigned long long max_queue_length;
+
+ /**
+ * Flow-control identifier generator.
+ */
+ uint64_t fc_gen;
+
/**
* MTU of the communicator
*/
* transport service.
*/
uint32_t aid_gen;
+
+ /**
+ * Queue identifier generator.
+ */
+ uint32_t queue_gen;
};
+/**
+ * Handle returned to identify the internal data structure the transport
+ * API has created to manage a message queue to a particular peer.
+ */
+struct GNUNET_TRANSPORT_QueueHandle
+{
+ /**
+ * Handle this queue belongs to.
+ */
+ struct GNUNET_TRANSPORT_CommunicatorHandle *ch;
+
+ /**
+ * Which peer we can communciate with.
+ */
+ struct GNUNET_PeerIdentity peer;
+
+ /**
+ * Address used by the communication queue.
+ */
+ char *address;
+
+ /**
+ * Network type of the communciation queue.
+ */
+ enum GNUNET_ATS_Network_Type nt;
+
+ /**
+ * The queue itself.
+ */
+ struct GNUNET_MQ_Handle *mq;
+
+ /**
+ * ID for this queue when talking to the transport service.
+ */
+ uint32_t queue_id;
+
+};
+
/**
* Internal representation of an address a communicator is
}
+/**
+ * Send message to the transport service about queue @a qh
+ * being now available.
+ *
+ * @param qh queue to add
+ */
+static void
+send_add_queue (struct GNUNET_TRANSPORT_QueueHandle *qh)
+{
+ struct GNUNET_MQ_Envelope *env;
+ struct GNUNET_TRANSPORT_AddQueueMessage *aqm;
+
+ if (NULL == ai->ch->mq)
+ return;
+ env = GNUNET_MQ_msg_extra (aqm,
+ strlen (ai->address) + 1,
+ GNUNET_MESSAGE_TYPE_TRANSPORT_ADD_QUEUE);
+ aqm.receiver = qh->peer;
+ aqm.nt = htonl ((uint32_t) qh->nt);
+ aqm.qid = htonl (qh->qid);
+ memcpy (&aqm[1],
+ ai->address,
+ strlen (ai->address) + 1);
+ GNUNET_MQ_send (ai->ch->mq,
+ env);
+}
+
+
+/**
+ * Send message to the transport service about queue @a qh
+ * being no longer available.
+ *
+ * @param qh queue to delete
+ */
+static void
+send_del_queue (struct GNUNET_TRANSPORT_QueueHandle *qh)
+{
+ struct GNUNET_MQ_Envelope *env;
+ struct GNUNET_TRANSPORT_DelQueueMessage *dqm;
+
+ if (NULL == ai->ch->mq)
+ return;
+ env = GNUNET_MQ_msg (dqm,
+ GNUNET_MESSAGE_TYPE_TRANSPORT_DEL_QUEUE);
+ dqm.qid = htonl (qh->qid);
+ dqm.receiver = qh->peer;
+ GNUNET_MQ_send (ai->ch->mq,
+ env);
+}
+
+
+/**
+ * Disconnect from the transport service. Purges
+ * all flow control entries as we will no longer receive
+ * the ACKs. Purges the ack pending entries as the
+ * transport will no longer expect the confirmations.
+ *
+ * @param ch service to disconnect from
+ */
+static void
+disconnect (struct GNUNET_TRANSPORT_CommunicatorHandle *ch)
+{
+ struct FlowControl *fcn;
+ struct AckPending *apn;
+
+ for (struct FlowControl *fc = ch->fc_head;
+ NULL != fc;
+ fc = fcn)
+ {
+ fcn = fc->next;
+ GNUNET_CONTAINER_DLL_remove (ch->fc_head,
+ ch->fc_tail,
+ fc);
+ fc->cb (fc->cb_cls,
+ GNUNET_SYSERR);
+ GNUNET_free (fc);
+ }
+ for (struct AckPending *ap = ch->ap_head;
+ NULL != ap;
+ ap = apn)
+ {
+ apn = ap->next;
+ GNUNET_CONTAINER_DLL_remove (ch->ap_head,
+ ch->ap_tail,
+ ap);
+ GNUNET_free (ap);
+ }
+ if (NULL == ch->mq)
+ return;
+ GNUNET_MQ_destroy (ch->mq);
+ ch->mq = NULL;
+}
+
+
/**
* Function called on MQ errors.
*/
enum GNUNET_MQ_Error error)
{
struct GNUNET_TRANSPORT_CommunicatorHandle *ch = cls;
+
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "MQ failure, reconnecting to transport service.\n");
+ disconnect (ch);
+ /* TODO: maybe do this with exponential backoff/delay */
+ reconnect (ch);
+}
+
+
+/**
+ * Transport service acknowledged a message we gave it
+ * (with flow control enabled). Tell the communicator.
+ *
+ * @param cls our `struct GNUNET_TRANSPORT_CommunicatorHandle *`
+ * @param incoming_ack the ack
+ */
+static void
+handle_incoming_ack (void *cls,
+ struct GNUNET_TRANSPORT_IncomingMessageAck *incoming_ack)
+{
+ struct GNUNET_TRANSPORT_CommunicatorHandle *ch = cls;
- GNUNET_MQ_destroy (ch->mq);
- ch->mq = NULL;
+ for (struct FlowControl *fc = ch->fc_head;
+ NULL != fc;
+ fc = fc->next)
+ {
+ if ( (fc->id == incoming_ack->fc_id) &&
+ (0 == memcmp (&fc->sender,
+ incoming_ack->sender,
+ sizeof (struct GNUNET_PeerIdentity))) )
+ {
+ GNUNET_CONTAINER_DLL_remove (ch->fc_head,
+ ch->fc_tail,
+ fc);
+ fc->cb (fc->cb_cls,
+ GNUNET_OK);
+ GNUNET_free (fc);
+ return;
+ }
+ }
+ GNUNET_break (0);
+ disconnect (ch);
/* TODO: maybe do this with exponential backoff/delay */
reconnect (ch);
}
+/**
+ * Transport service wants us to create a queue. Check if @a cq
+ * is well-formed.
+ *
+ * @param cls our `struct GNUNET_TRANSPORT_CommunicatorHandle *`
+ * @param cq the queue creation request
+ * @return #GNUNET_OK if @a smt is well-formed
+ */
+static int
+check_create_queue (void *cls,
+ struct GNUNET_TRANSPORT_CreateQueue *cq)
+{
+ uint16_t len = ntohs (cq->header.size) - sizeof (*cq);
+ const char *addr = (const char *) &cq[1];
+
+ if ( (0 == len) ||
+ ('\0' != addr[len-1]) )
+ {
+ GNUNET_break (0);
+ return GNUNET_SYSERR;
+ }
+ return GNUNET_OK;
+}
+
+
+/**
+ * Transport service wants us to create a queue. Tell the communicator.
+ *
+ * @param cls our `struct GNUNET_TRANSPORT_CommunicatorHandle *`
+ * @param cq the queue creation request
+ */
+static void
+handle_create_queue (void *cls,
+ struct GNUNET_TRANSPORT_CreateQueue *cq)
+{
+ struct GNUNET_TRANSPORT_CommunicatorHandle *ch = cls;
+ const char *addr = (const char *) &cq[1];
+
+ if (GNUNET_OK !=
+ ch->mq_init (ch->mq_init_cls,
+ &cq->receiver,
+ addr))
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ "Address `%s' invalid for this communicator\n",
+ addr);
+ // TODO: do we notify the transport!?
+ }
+}
+
+
+/**
+ * Transport service wants us to send a message. Check if @a smt
+ * is well-formed.
+ *
+ * @param cls our `struct GNUNET_TRANSPORT_CommunicatorHandle *`
+ * @param smt the transmission request
+ * @return #GNUNET_OK if @a smt is well-formed
+ */
+static int
+check_send_msg (void *cls,
+ struct GNUNET_TRANSPORT_SendMessageTo *smt)
+{
+ uint16_t len = ntohs (smt->header.size) - sizeof (*smt);
+ const struct GNUNET_MessageHeader *mh = (const struct GNUNET_MessageHeader *) &smt[1];
+
+ if (ntohs (mh->size) != len)
+ {
+ GNUNET_break (0);
+ return GNUNET_SYSERR;
+ }
+ return GNUNET_OK;
+}
+
+
+/**
+ * Notify transport service about @a status of a message with
+ * @a mid sent to @a receiver.
+ *
+ * @param ch handle
+ * @param status #GNUNET_OK on success, #GNUNET_SYSERR on failure
+ * @param receiver which peer was the receiver
+ * @param mid message that the ack is about
+ */
+static void
+send_ack (struct GNUNET_TRANSPORT_CommunicatorHandle *ch,
+ int status,
+ const struct GNUNET_PeerIdentity *receiver,
+ uint64_t mid)
+{
+ struct GNUNET_MQ_Envelope *env;
+ struct GNUNET_TRANSPORT_SendMessageToAck *ack;
+
+ env = GNUNET_MQ_msg (ack,
+ GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_MSG_ACK);
+ ack->status = htonl (GNUNET_OK);
+ ack->mid = ap->mid;
+ ack->receiver = ap->receiver;
+ GNUNET_MQ_send (ch->mq,
+ env);
+}
+
+
+/**
+ * Message queue transmission by communicator was successful,
+ * notify transport service.
+ *
+ * @param cls an `struct AckPending *`
+ */
+static void
+send_ack_cb (void *cls)
+{
+ struct AckPending *ap = cls;
+ struct GNUNET_TRANSPORT_CommunicatorHandle *ch = ap->ch;
+
+ GNUNET_CONTAINER_DLL_remove (ch->ap_head,
+ ch->ap_tail,
+ ap);
+ send_ack (ch,
+ GNUNET_OK,
+ &ap->receiver,
+ ap->mid);
+ GNUNET_free (ap);
+}
+
+
+/**
+ * Transport service wants us to send a message. Tell the communicator.
+ *
+ * @param cls our `struct GNUNET_TRANSPORT_CommunicatorHandle *`
+ * @param smt the transmission request
+ */
+static void
+handle_send_msg (void *cls,
+ struct GNUNET_TRANSPORT_SendMessageTo *smt)
+{
+ struct GNUNET_TRANSPORT_CommunicatorHandle *ch = cls;
+ const struct GNUNET_MessageHeader *mh;
+ struct GNUNET_MQ_Envelope *env;
+ struct AckPending *ap;
+ struct QueueHandle *qh;
+
+ for (qh = ch->queue_head;NULL != qh; qh = qh->next)
+ if ( (qh->queue_id == smt->qid) &&
+ (0 == memcmp (&qh->peer,
+ &smt->target,
+ sizeof (struct GNUNET_PeerIdentity))) )
+ break;
+ if (NULL == qh)
+ {
+ /* queue is already gone, tell transport this one failed */
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Transmission failed, queue no longer exists.\n");
+ send_ack (ch,
+ GNUNET_NO,
+ &smt->receiver,
+ smt->mid);
+ return;
+ }
+ ap = GNUNET_new (struct AckPending);
+ ap->ch = ch;
+ ap->receiver = smt->receiver;
+ ap->mid = smt->mid;
+ GNUNET_CONTAINER_DLL_insert (ch->ap_head,
+ cp->ap_tail,
+ ap);
+ mh = (const struct GNUNET_MessageHeader *) &smt[1];
+ env = GNUNET_MQ_msg_copy (mh);
+ GNUNET_MQ_notify_sent (env,
+ &send_ack_cb,
+ ap);
+ GNUNET_MQ_send (qh->mq,
+ env);
+}
+
+
/**
* (re)connect our communicator to the transport service
*
reconnect (struct GNUNET_TRANSPORT_CommunicatorHandle *ch)
{
struct GNUNET_MQ_MessageHandler handlers[] = {
+ GNUNET_MQ_hd_fixed_size (incoming_ack,
+ GNUNET_MESSAGE_TYPE_TRANSPORT_INCOMING_MSG_ACK,
+ struct GNUNET_TRANSPORT_IncomingMessageAck,
+ ch),
+ GNUNET_MQ_hd_var_size (create_queue,
+ GNUNET_MESSAGE_TYPE_TRANSPORT_CREATE_QUEUE,
+ struct GNUNET_TRANSPORT_CreateQueue,
+ ch),
+ GNUNET_MQ_hd_var_size (send_msg,
+ GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_MSG,
+ struct GNUNET_TRANSPORT_SendMessageTo,
+ ch),
GNUNET_MQ_handler_end()
};
handlers,
&error_handler,
ch);
- for (struct GNUNET_TRANSPORT_AddressIdentifier ai = ch->ai_head;
+ for (struct GNUNET_TRANSPORT_AddressIdentifier *ai = ch->ai_head;
NULL != ai;
ai = ai->next)
send_add_address (ai);
+ for (struct GNUNET_TRANSPORT_QueueHandle *qh = ch->queue_head;
+ NULL != qh;
+ qh = qh->next)
+ send_add_queue (qh);
}
ch->mq_init = mq_init;
ch->mq_init_cls = mq_init_cls;
reconnect (ch);
+ if (GNUNET_OK !=
+ GNUNET_CONFIGURATION_get_value_number (cfg,
+ name,
+ "MAX_QUEUE_LENGTH",
+ &ch->max_queue_length))
+ ch->max_queue_length = DEFAULT_MAX_QUEUE_LENGTH;
if (NULL == ch->mq)
{
GNUNET_free (ch);
void
GNUNET_TRANSPORT_communicator_disconnect (struct GNUNET_TRANSPORT_CommunicatorHandle *ch)
{
+ disconnect (ch);
while (NULL != ch->ai_head)
{
GNUNET_break (0); /* communicator forgot to remove address, warn! */
GNUNET_TRANSPORT_communicator_address_remove (ch->ai_head);
}
- GNUNET_MQ_destroy (ch->mq);
GNUNET_free (ch);
}
* @return #GNUNET_OK if all is well, #GNUNET_NO if the message was
* immediately dropped due to memory limitations (communicator
* should try to apply back pressure),
- * #GNUNET_SYSERR if the message is ill formed and communicator
- * should try to reset stream
+ * #GNUNET_SYSERR if the message could not be delivered because
+ * the tranport service is not yet up
*/
int
GNUNET_TRANSPORT_communicator_receive (struct GNUNET_TRANSPORT_CommunicatorHandle *ch,
uint16_t msize;
if (NULL == ai->ch->mq)
- return;
+ return GNUNET_SYSERR;
+ if (NULL != cb)
+ {
+ struct FlowControl *fc;
+
+ im->fc_on = htonl (GNUNET_YES);
+ im->fc_id = ai->ch->fc_gen++;
+ fc = GNUNET_new (struct FlowControl);
+ fc->sender = *sender;
+ fc->id = im->fc_id;
+ fc->cb = cb;
+ fc->cb_cls = cb_cls;
+ GNUNET_CONTAINER_DLL_insert (ch->fc_head,
+ ch->fc_tail,
+ fc);
+ }
+ else
+ {
+ if (GNUNET_MQ_get_length (ch->mq) >= ch->max_queue_length)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "Dropping message: transprot is too slow, queue length %u exceeded\n",
+ ch->max_queue_length);
+ return GNUNET_NO;
+ }
+ }
+
msize = ntohs (msg->size);
env = GNUNET_MQ_msg_extra (im,
msize,
if (NULL == env)
{
GNUNET_break (0);
- return;
+ return GNUNET_SYSERR;
}
im->sender = *sender;
memcpy (&im[1],
msize);
GNUNET_MQ_send (ai->ch->mq,
env);
+ return GNUNET_OK;
}
/* ************************* Discovery *************************** */
-/**
- * Handle returned to identify the internal data structure the transport
- * API has created to manage a message queue to a particular peer.
- */
-struct GNUNET_TRANSPORT_QueueHandle
-{
-};
-
/**
* Notify transport service that an MQ became available due to an
enum GNUNET_ATS_Network_Type nt,
struct GNUNET_MQ_Handle *mq)
{
+ struct GNUNET_TRANSPORT_QueueHandle *qh;
+
+ qh = GNUNET_new (struct GNUNET_TRANSPORT_QueueHandle);
+ qh->ch = ch;
+ qh->peer = *peer;
+ qh->address = GNUNET_strdup (address);
+ qh->nt = nt;
+ qh->mq = mq;
+ qh->queue_id = ch->queue_gen++;
+ GNUNET_CONTAINER_DLL_insert (ch->queue_head,
+ ch->queue_tail,
+ qh);
+ send_add_queue (qh);
+ return qh;
}
void
GNUNET_TRANSPORT_communicator_mq_del (struct GNUNET_TRANSPORT_QueueHandle *qh)
{
+ struct GNUNET_TRANSPORT_CommunicatorHandle *ch = qh->ch;
+
+ send_del_queue (qh);
+ GNUNET_CONTAINER_DLL_remove (ch->queue_head,
+ ch->queue_tail,
+ qh);
+ GNUNET_MQ_destroy (qh->mq);
+ GNUNET_free (qh->address);
+ GNUNET_free (qh);
}
-
-
/**
* Notify transport service about an address that this communicator
* provides for this peer.
struct GNUNET_TRANSPORT_CommunicatorHandle *ch = ai->ch;
send_del_address (ai);
- GNUNET_free (ai->address);
GNUNET_CONTAINER_DLL_remove (ch->ai_head,
ch->ai_tail,
ai);
+ GNUNET_free (ai->address);
GNUNET_free (ai);
}