*/
struct AckPending *prev;
+ /**
+ * Communicator this entry belongs to.
+ */
+ struct GNUNET_TRANSPORT_CommunicatorHandle *ch;
+
/**
* Which peer is this about?
*/
/**
* DLL of messages awaiting transmission confirmation (ack).
*/
- struct AckPending *ac_tail;
+ struct AckPending *ap_tail;
/**
* DLL of queues we offer.
*/
- struct QueueHandle *queue_head;
+ struct GNUNET_TRANSPORT_QueueHandle *queue_head;
/**
* DLL of queues we offer.
*/
- struct QueueHandle *queue_tail;
+ struct GNUNET_TRANSPORT_QueueHandle *queue_tail;
/**
* Our configuration.
*/
void *mq_init_cls;
+ /**
+ * Queue to talk to the transport service.
+ */
+ struct GNUNET_MQ_Handle *mq;
+
/**
* Maximum permissable queue length.
*/
*/
struct GNUNET_TRANSPORT_QueueHandle
{
+
+ /**
+ * Kept in a DLL.
+ */
+ struct GNUNET_TRANSPORT_QueueHandle *next;
+
+ /**
+ * Kept in a DLL.
+ */
+ struct GNUNET_TRANSPORT_QueueHandle *prev;
+
/**
* Handle this queue belongs to.
*/
env = GNUNET_MQ_msg_extra (aam,
strlen (ai->address) + 1,
GNUNET_MESSAGE_TYPE_TRANSPORT_ADD_ADDRESS);
- aam->expiration = GNUNET_TIME_relative_to_nbo (ai->expiration);
+ aam->expiration = GNUNET_TIME_relative_hton (ai->expiration);
aam->nt = htonl ((uint32_t) ai->nt);
memcpy (&aam[1],
ai->address,
return;
env = GNUNET_MQ_msg (dam,
GNUNET_MESSAGE_TYPE_TRANSPORT_DEL_ADDRESS);
- dam.aid = htonl (ai->aid);
+ dam->aid = htonl (ai->aid);
GNUNET_MQ_send (ai->ch->mq,
env);
}
struct GNUNET_MQ_Envelope *env;
struct GNUNET_TRANSPORT_AddQueueMessage *aqm;
- if (NULL == ai->ch->mq)
+ if (NULL == qh->ch->mq)
return;
env = GNUNET_MQ_msg_extra (aqm,
- strlen (ai->address) + 1,
- GNUNET_MESSAGE_TYPE_TRANSPORT_ADD_QUEUE);
- aqm.receiver = qh->peer;
- aqm.nt = htonl ((uint32_t) qh->nt);
- aqm.qid = htonl (qh->qid);
+ strlen (qh->address) + 1,
+ GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_SETUP);
+ aqm->receiver = qh->peer;
+ aqm->nt = htonl ((uint32_t) qh->nt);
+ aqm->qid = htonl (qh->queue_id);
memcpy (&aqm[1],
- ai->address,
- strlen (ai->address) + 1);
- GNUNET_MQ_send (ai->ch->mq,
+ qh->address,
+ strlen (qh->address) + 1);
+ GNUNET_MQ_send (qh->ch->mq,
env);
}
struct GNUNET_MQ_Envelope *env;
struct GNUNET_TRANSPORT_DelQueueMessage *dqm;
- if (NULL == ai->ch->mq)
+ if (NULL == qh->ch->mq)
return;
env = GNUNET_MQ_msg (dqm,
- GNUNET_MESSAGE_TYPE_TRANSPORT_DEL_QUEUE);
- dqm.qid = htonl (qh->qid);
- dqm.receiver = qh->peer;
- GNUNET_MQ_send (ai->ch->mq,
+ GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_TEARDOWN);
+ dqm->qid = htonl (qh->queue_id);
+ dqm->receiver = qh->peer;
+ GNUNET_MQ_send (qh->ch->mq,
env);
}
struct GNUNET_TRANSPORT_CommunicatorHandle *ch = cls;
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
- "MQ failure, reconnecting to transport service.\n");
+ "MQ failure %d, reconnecting to transport service.\n",
+ error);
disconnect (ch);
/* TODO: maybe do this with exponential backoff/delay */
reconnect (ch);
*/
static void
handle_incoming_ack (void *cls,
- struct GNUNET_TRANSPORT_IncomingMessageAck *incoming_ack)
+ const struct GNUNET_TRANSPORT_IncomingMessageAck *incoming_ack)
{
struct GNUNET_TRANSPORT_CommunicatorHandle *ch = cls;
{
if ( (fc->id == incoming_ack->fc_id) &&
(0 == memcmp (&fc->sender,
- incoming_ack->sender,
+ &incoming_ack->sender,
sizeof (struct GNUNET_PeerIdentity))) )
{
GNUNET_CONTAINER_DLL_remove (ch->fc_head,
*/
static int
check_create_queue (void *cls,
- struct GNUNET_TRANSPORT_CreateQueue *cq)
+ const struct GNUNET_TRANSPORT_CreateQueue *cq)
{
uint16_t len = ntohs (cq->header.size) - sizeof (*cq);
const char *addr = (const char *) &cq[1];
+ (void) cls;
if ( (0 == len) ||
('\0' != addr[len-1]) )
{
*/
static void
handle_create_queue (void *cls,
- struct GNUNET_TRANSPORT_CreateQueue *cq)
+ const struct GNUNET_TRANSPORT_CreateQueue *cq)
{
struct GNUNET_TRANSPORT_CommunicatorHandle *ch = cls;
const char *addr = (const char *) &cq[1];
-
+ struct GNUNET_TRANSPORT_CreateQueueResponse *cqr;
+ struct GNUNET_MQ_Envelope *env;
+
if (GNUNET_OK !=
ch->mq_init (ch->mq_init_cls,
&cq->receiver,
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
"Address `%s' invalid for this communicator\n",
addr);
- // TODO: do we notify the transport!?
+ env = GNUNET_MQ_msg (cqr,
+ GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_CREATE_FAIL);
}
+ else
+ {
+ env = GNUNET_MQ_msg (cqr,
+ GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_CREATE_OK);
+ }
+ cqr->request_id = cq->request_id;
+ GNUNET_MQ_send (ch->mq,
+ env);
}
*/
static int
check_send_msg (void *cls,
- struct GNUNET_TRANSPORT_SendMessageTo *smt)
+ const struct GNUNET_TRANSPORT_SendMessageTo *smt)
{
uint16_t len = ntohs (smt->header.size) - sizeof (*smt);
const struct GNUNET_MessageHeader *mh = (const struct GNUNET_MessageHeader *) &smt[1];
+ (void) cls;
if (ntohs (mh->size) != len)
{
GNUNET_break (0);
env = GNUNET_MQ_msg (ack,
GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_MSG_ACK);
- ack->status = htonl (GNUNET_OK);
- ack->mid = ap->mid;
- ack->receiver = ap->receiver;
+ ack->status = htonl (status);
+ ack->mid = mid;
+ ack->receiver = *receiver;
GNUNET_MQ_send (ch->mq,
env);
}
*/
static void
handle_send_msg (void *cls,
- struct GNUNET_TRANSPORT_SendMessageTo *smt)
+ const struct GNUNET_TRANSPORT_SendMessageTo *smt)
{
struct GNUNET_TRANSPORT_CommunicatorHandle *ch = cls;
const struct GNUNET_MessageHeader *mh;
struct GNUNET_MQ_Envelope *env;
struct AckPending *ap;
- struct QueueHandle *qh;
+ struct GNUNET_TRANSPORT_QueueHandle *qh;
for (qh = ch->queue_head;NULL != qh; qh = qh->next)
if ( (qh->queue_id == smt->qid) &&
(0 == memcmp (&qh->peer,
- &smt->target,
+ &smt->receiver,
sizeof (struct GNUNET_PeerIdentity))) )
break;
if (NULL == qh)
ap->receiver = smt->receiver;
ap->mid = smt->mid;
GNUNET_CONTAINER_DLL_insert (ch->ap_head,
- cp->ap_tail,
+ ch->ap_tail,
ap);
mh = (const struct GNUNET_MessageHeader *) &smt[1];
env = GNUNET_MQ_msg_copy (mh);
struct GNUNET_TRANSPORT_IncomingMessageAck,
ch),
GNUNET_MQ_hd_var_size (create_queue,
- GNUNET_MESSAGE_TYPE_TRANSPORT_CREATE_QUEUE,
+ GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_CREATE,
struct GNUNET_TRANSPORT_CreateQueue,
ch),
GNUNET_MQ_hd_var_size (send_msg,
GNUNET_MQ_handler_end()
};
- ch->mq = GNUNET_CLIENT_connect (cfg,
+ ch->mq = GNUNET_CLIENT_connect (ch->cfg,
"transport",
handlers,
&error_handler,
struct GNUNET_TRANSPORT_IncomingMessage *im;
uint16_t msize;
- if (NULL == ai->ch->mq)
+ if (NULL == ch->mq)
return GNUNET_SYSERR;
- if (NULL != cb)
- {
- struct FlowControl *fc;
-
- im->fc_on = htonl (GNUNET_YES);
- im->fc_id = ai->ch->fc_gen++;
- fc = GNUNET_new (struct FlowControl);
- fc->sender = *sender;
- fc->id = im->fc_id;
- fc->cb = cb;
- fc->cb_cls = cb_cls;
- GNUNET_CONTAINER_DLL_insert (ch->fc_head,
- ch->fc_tail,
- fc);
- }
- else
+ if ( (NULL == cb) &&
+ (GNUNET_MQ_get_length (ch->mq) >= ch->max_queue_length) )
{
- if (GNUNET_MQ_get_length (ch->mq) >= ch->max_queue_length)
- {
- GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
- "Dropping message: transprot is too slow, queue length %u exceeded\n",
- ch->max_queue_length);
- return GNUNET_NO;
- }
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "Dropping message: transprot is too slow, queue length %llu exceeded\n",
+ ch->max_queue_length);
+ return GNUNET_NO;
}
msize = ntohs (msg->size);
memcpy (&im[1],
msg,
msize);
- GNUNET_MQ_send (ai->ch->mq,
+ if (NULL != cb)
+ {
+ struct FlowControl *fc;
+
+ im->fc_on = htonl (GNUNET_YES);
+ im->fc_id = ch->fc_gen++;
+ fc = GNUNET_new (struct FlowControl);
+ fc->sender = *sender;
+ fc->id = im->fc_id;
+ fc->cb = cb;
+ fc->cb_cls = cb_cls;
+ GNUNET_CONTAINER_DLL_insert (ch->fc_head,
+ ch->fc_tail,
+ fc);
+ }
+ GNUNET_MQ_send (ch->mq,
env);
return GNUNET_OK;
}
ai->address = GNUNET_strdup (address);
ai->nt = nt;
ai->expiration = expiration;
- ai->aid = handle->aid_gen++;
- GNUNET_CONTAINER_DLL_insert (handle->ai_head,
- handle->ai_tail,
+ ai->aid = ch->aid_gen++;
+ GNUNET_CONTAINER_DLL_insert (ch->ai_head,
+ ch->ai_tail,
ai);
send_add_address (ai);
return ai;