/*
This file is part of GNUnet.
- Copyright (C) 2009-2013 GNUnet e.V.
+ Copyright (C) 2009-2013, 2016 GNUnet e.V.
GNUnet is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published
};
-/**
- * Linked list of functions to call whenever our HELLO is updated.
- */
-struct GNUNET_TRANSPORT_GetHelloHandle
-{
-
- /**
- * This is a doubly linked list.
- */
- struct GNUNET_TRANSPORT_GetHelloHandle *next;
-
- /**
- * This is a doubly linked list.
- */
- struct GNUNET_TRANSPORT_GetHelloHandle *prev;
-
- /**
- * Transport handle.
- */
- struct GNUNET_TRANSPORT_Handle *handle;
-
- /**
- * Callback to call once we got our HELLO.
- */
- GNUNET_TRANSPORT_HelloUpdateCallback rec;
-
- /**
- * Task for calling the HelloUpdateCallback when we already have a HELLO
- */
- struct GNUNET_SCHEDULER_Task *notify_task;
-
- /**
- * Closure for @e rec.
- */
- void *rec_cls;
-
-};
-
-
-/**
- * Entry in linked list for all offer-HELLO requests.
- */
-struct GNUNET_TRANSPORT_OfferHelloHandle
-{
- /**
- * For the DLL.
- */
- struct GNUNET_TRANSPORT_OfferHelloHandle *prev;
-
- /**
- * For the DLL.
- */
- struct GNUNET_TRANSPORT_OfferHelloHandle *next;
-
- /**
- * Transport service handle we use for transmission.
- */
- struct GNUNET_TRANSPORT_Handle *th;
-
- /**
- * Transmission handle for this request.
- */
- struct GNUNET_TRANSPORT_TransmitHandle *tth;
-
- /**
- * Function to call once we are done.
- */
- GNUNET_SCHEDULER_TaskCallback cont;
-
- /**
- * Closure for @e cont
- */
- void *cls;
-
- /**
- * The HELLO message to be transmitted.
- */
- struct GNUNET_MessageHeader *msg;
-};
-
/**
* Handle for the transport service (includes all of the
*/
GNUNET_TRANSPORT_NotifyExcessBandwidth neb_cb;
- /**
- * Head of DLL of control messages.
- */
- struct GNUNET_TRANSPORT_TransmitHandle *control_head;
-
- /**
- * Tail of DLL of control messages.
- */
- struct GNUNET_TRANSPORT_TransmitHandle *control_tail;
-
/**
* The current HELLO message for this peer. Updated
* whenever transports change their addresses.
/**
* My client connection to the transport service.
*/
- struct GNUNET_CLIENT_Connection *client;
-
- /**
- * Handle to our registration with the client for notification.
- */
- struct GNUNET_CLIENT_TransmitHandle *cth;
-
- /**
- * Linked list of pending requests for our HELLO.
- */
- struct GNUNET_TRANSPORT_GetHelloHandle *hwl_head;
-
- /**
- * Linked list of pending requests for our HELLO.
- */
- struct GNUNET_TRANSPORT_GetHelloHandle *hwl_tail;
-
- /**
- * Linked list of pending offer HELLO requests head
- */
- struct GNUNET_TRANSPORT_OfferHelloHandle *oh_head;
-
- /**
- * Linked list of pending offer HELLO requests tail
- */
- struct GNUNET_TRANSPORT_OfferHelloHandle *oh_tail;
+ struct GNUNET_MQ_Handle *mq;
/**
* My configuration.
GNUNET_STRINGS_relative_time_to_string (delay,
GNUNET_NO));
GNUNET_CONTAINER_heap_update_cost (n->h->ready_heap,
- n->hn, delay.rel_value_us);
+ n->hn,
+ delay.rel_value_us);
schedule_transmission (n->h);
}
/**
- * Function we use for handling incoming messages.
+ * Generic error handler, called with the appropriate
+ * error code and the same closure specified at the creation of
+ * the message queue.
+ * Not every message queue implementation supports an error handler.
+ *
+ * @param cls closure with the `struct GNUNET_TRANSPORT_Handle *`
+ * @param error error code
+ */
+static void
+mq_error_handler (void *cls,
+ enum GNUNET_MQ_Error error)
+{
+ struct GNUNET_TRANSPORT_Handle *h = cls;
+
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Error receiving from transport service, disconnecting temporarily.\n");
+ h->reconnecting = GNUNET_YES;
+ disconnect_and_schedule_reconnect (h);
+}
+
+
+/**
+ * Function we use for checking incoming HELLO messages.
*
* @param cls closure, a `struct GNUNET_TRANSPORT_Handle *`
- * @param msg message received, NULL on timeout or fatal error
+ * @param msg message received
+ * @return #GNUNET_OK if message is well-formed
+ */
+static int
+check_hello (void *cls,
+ const struct GNUNET_MessageHeader *msg)
+{
+ struct GNUNET_PeerIdentity me;
+
+ if (GNUNET_OK !=
+ GNUNET_HELLO_get_id ((const struct GNUNET_HELLO_Message *) msg,
+ &me))
+ {
+ GNUNET_break (0);
+ return GNUNET_SYSERR;
+ }
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Receiving (my own) HELLO message (%u bytes), I am `%s'.\n",
+ (unsigned int) ntohs (msg->size),
+ GNUNET_i2s (&me));
+ return GNUNET_OK;
+}
+
+
+/**
+ * Function we use for handling incoming HELLO messages.
+ *
+ * @param cls closure, a `struct GNUNET_TRANSPORT_Handle *`
+ * @param msg message received
*/
static void
-demultiplexer (void *cls,
- const struct GNUNET_MessageHeader *msg)
+handle_hello (void *cls,
+ const struct GNUNET_MessageHeader *msg)
+{
+ struct GNUNET_TRANSPORT_Handle *h = cls;
+
+ GNUNET_free_non_null (h->my_hello);
+ h->my_hello = GNUNET_copy_message (msg);
+}
+
+
+/**
+ * Function we use for handling incoming connect messages.
+ *
+ * @param cls closure, a `struct GNUNET_TRANSPORT_Handle *`
+ * @param cim message received
+ */
+static void
+handle_connect (void *cls,
+ const struct ConnectInfoMessage *cim)
+{
+ struct GNUNET_TRANSPORT_Handle *h = cls;
+ struct Neighbour *n;
+
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Receiving CONNECT message for `%s'.\n",
+ GNUNET_i2s (&cim->id));
+ n = neighbour_find (h, &cim->id);
+ if (NULL != n)
+ {
+ GNUNET_break (0);
+ h->reconnecting = GNUNET_YES;
+ disconnect_and_schedule_reconnect (h);
+ return;
+ }
+ n = neighbour_add (h,
+ &cim->id);
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Receiving CONNECT message for `%s' with quota %u\n",
+ GNUNET_i2s (&cim->id),
+ ntohl (cim->quota_out.value__));
+ GNUNET_BANDWIDTH_tracker_update_quota (&n->out_tracker,
+ cim->quota_out);
+ if (NULL != h->nc_cb)
+ h->nc_cb (h->cls,
+ &n->id);
+}
+
+
+/**
+ * Function we use for handling incoming disconnect messages.
+ *
+ * @param cls closure, a `struct GNUNET_TRANSPORT_Handle *`
+ * @param dim message received
+ */
+static void
+handle_disconnect (void *cls,
+ const struct DisconnectInfoMessage *dim)
+{
+ struct GNUNET_TRANSPORT_Handle *h = cls;
+ struct Neighbour *n;
+
+ GNUNET_break (ntohl (dim->reserved) == 0);
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Receiving DISCONNECT message for `%s'.\n",
+ GNUNET_i2s (&dim->peer));
+ n = neighbour_find (h, &dim->peer);
+ if (NULL == n)
+ {
+ GNUNET_break (0);
+ h->reconnecting = GNUNET_YES;
+ disconnect_and_schedule_reconnect (h);
+ return;
+ }
+ neighbour_delete (h,
+ &dim->peer,
+ n);
+}
+
+
+/**
+ * Function we use for handling incoming send-ok messages.
+ *
+ * @param cls closure, a `struct GNUNET_TRANSPORT_Handle *`
+ * @param okm message received
+ */
+static void
+handle_send_ok (void *cls,
+ const struct SendOkMessage *okm)
{
struct GNUNET_TRANSPORT_Handle *h = cls;
- const struct DisconnectInfoMessage *dim;
- const struct ConnectInfoMessage *cim;
- const struct InboundMessage *im;
- const struct GNUNET_MessageHeader *imm;
- const struct SendOkMessage *okm;
- const struct QuotaSetMessage *qm;
- struct GNUNET_TRANSPORT_GetHelloHandle *hwl;
- struct GNUNET_TRANSPORT_GetHelloHandle *next_hwl;
struct Neighbour *n;
- struct GNUNET_PeerIdentity me;
- uint16_t size;
uint32_t bytes_msg;
uint32_t bytes_physical;
- GNUNET_assert (NULL != h->client);
- if (GNUNET_YES == h->reconnecting)
+ bytes_msg = ntohl (okm->bytes_msg);
+ bytes_physical = ntohl (okm->bytes_physical);
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Receiving SEND_OK message, transmission to %s %s.\n",
+ GNUNET_i2s (&okm->peer),
+ ntohl (okm->success) == GNUNET_OK ? "succeeded" : "failed");
+
+ n = neighbour_find (h,
+ &okm->peer);
+ if (NULL == n)
{
+ /* We should never get a 'SEND_OK' for a peer that we are not
+ connected to */
+ GNUNET_break (0);
+ h->reconnecting = GNUNET_YES;
+ disconnect_and_schedule_reconnect (h);
return;
}
- if (NULL == msg)
+ if (bytes_physical > bytes_msg)
{
LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Error receiving from transport service, disconnecting temporarily.\n");
+ "Overhead for %u byte message was %u\n",
+ bytes_msg,
+ bytes_physical - bytes_msg);
+ n->traffic_overhead += bytes_physical - bytes_msg;
+ }
+ GNUNET_break (GNUNET_NO == n->is_ready);
+ n->is_ready = GNUNET_YES;
+ if (NULL != n->unready_warn_task)
+ {
+ GNUNET_SCHEDULER_cancel (n->unready_warn_task);
+ n->unready_warn_task = NULL;
+ }
+ if ((NULL != n->th) && (NULL == n->hn))
+ {
+ GNUNET_assert (NULL != n->th->timeout_task);
+ GNUNET_SCHEDULER_cancel (n->th->timeout_task);
+ n->th->timeout_task = NULL;
+ /* we've been waiting for this (congestion, not quota,
+ * caused delayed transmission) */
+ n->hn = GNUNET_CONTAINER_heap_insert (h->ready_heap,
+ n,
+ 0);
+ }
+ schedule_transmission (h);
+}
+
+
+/**
+ * Function we use for checking incoming "inbound" messages.
+ *
+ * @param cls closure, a `struct GNUNET_TRANSPORT_Handle *`
+ * @param im message received
+ */
+static int
+check_recv (void *cls,
+ const struct InboundMessage *im)
+{
+ const struct GNUNET_MessageHeader *imm;
+ uint16_t size;
+
+ size = ntohs (im->header.size);
+ if (size <
+ sizeof (struct InboundMessage) + sizeof (struct GNUNET_MessageHeader))
+ {
+ GNUNET_break (0);
+ return GNUNET_SYSERR;
+ }
+ imm = (const struct GNUNET_MessageHeader *) &im[1];
+ if (ntohs (imm->size) + sizeof (struct InboundMessage) != size)
+ {
+ GNUNET_break (0);
+ return GNUNET_SYSERR;
+ }
+ return GNUNET_OK;
+}
+
+
+/**
+ * Function we use for handling incoming messages.
+ *
+ * @param cls closure, a `struct GNUNET_TRANSPORT_Handle *`
+ * @param im message received
+ */
+static void
+handle_recv (void *cls,
+ const struct InboundMessage *im)
+{
+ struct GNUNET_TRANSPORT_Handle *h = cls;
+ const struct GNUNET_MessageHeader *imm
+ = (const struct GNUNET_MessageHeader *) &im[1];
+ struct Neighbour *n;
+
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Received message of type %u with %u bytes from `%s'.\n",
+ (unsigned int) ntohs (imm->type),
+ (unsigned int) ntohs (imm->size),
+ GNUNET_i2s (&im->peer));
+ n = neighbour_find (h, &im->peer);
+ if (NULL == n)
+ {
+ GNUNET_break (0);
h->reconnecting = GNUNET_YES;
disconnect_and_schedule_reconnect (h);
return;
}
- GNUNET_CLIENT_receive (h->client,
- &demultiplexer,
- h,
- GNUNET_TIME_UNIT_FOREVER_REL);
- size = ntohs (msg->size);
- switch (ntohs (msg->type))
- {
- case GNUNET_MESSAGE_TYPE_HELLO:
- if (GNUNET_OK !=
- GNUNET_HELLO_get_id ((const struct GNUNET_HELLO_Message *) msg,
- &me))
- {
- GNUNET_break (0);
- break;
- }
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Receiving (my own) HELLO message (%u bytes), I am `%s'.\n",
- (unsigned int) size,
- GNUNET_i2s (&me));
- GNUNET_free_non_null (h->my_hello);
- h->my_hello = NULL;
- if (size < sizeof (struct GNUNET_MessageHeader))
- {
- GNUNET_break (0);
- break;
- }
- h->my_hello = GNUNET_copy_message (msg);
- hwl = h->hwl_head;
- while (NULL != hwl)
- {
- next_hwl = hwl->next;
- hwl->rec (hwl->rec_cls,
- h->my_hello);
- hwl = next_hwl;
- }
- break;
- case GNUNET_MESSAGE_TYPE_TRANSPORT_CONNECT:
- if (size < sizeof (struct ConnectInfoMessage))
- {
- GNUNET_break (0);
- h->reconnecting = GNUNET_YES;
- disconnect_and_schedule_reconnect (h);
- break;
- }
- cim = (const struct ConnectInfoMessage *) msg;
- if (size !=
- sizeof (struct ConnectInfoMessage))
- {
- GNUNET_break (0);
- h->reconnecting = GNUNET_YES;
- disconnect_and_schedule_reconnect (h);
- break;
- }
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Receiving CONNECT message for `%s'.\n",
- GNUNET_i2s (&cim->id));
- n = neighbour_find (h, &cim->id);
- if (NULL != n)
- {
- GNUNET_break (0);
- h->reconnecting = GNUNET_YES;
- disconnect_and_schedule_reconnect (h);
- break;
- }
- n = neighbour_add (h,
- &cim->id);
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Receiving CONNECT message for `%s' with quota %u\n",
- GNUNET_i2s (&cim->id),
- ntohl (cim->quota_out.value__));
- GNUNET_BANDWIDTH_tracker_update_quota (&n->out_tracker,
- cim->quota_out);
- if (NULL != h->nc_cb)
- h->nc_cb (h->cls,
- &n->id);
- break;
- case GNUNET_MESSAGE_TYPE_TRANSPORT_DISCONNECT:
- if (size != sizeof (struct DisconnectInfoMessage))
- {
- GNUNET_break (0);
- h->reconnecting = GNUNET_YES;
- disconnect_and_schedule_reconnect (h);
- break;
- }
- dim = (const struct DisconnectInfoMessage *) msg;
- GNUNET_break (ntohl (dim->reserved) == 0);
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Receiving DISCONNECT message for `%s'.\n",
- GNUNET_i2s (&dim->peer));
- n = neighbour_find (h, &dim->peer);
- if (NULL == n)
- {
- GNUNET_break (0);
- h->reconnecting = GNUNET_YES;
- disconnect_and_schedule_reconnect (h);
- break;
- }
- neighbour_delete (h,
- &dim->peer,
- n);
- break;
- case GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK:
- if (size != sizeof (struct SendOkMessage))
- {
- GNUNET_break (0);
- h->reconnecting = GNUNET_YES;
- disconnect_and_schedule_reconnect (h);
- break;
- }
- okm = (const struct SendOkMessage *) msg;
- bytes_msg = ntohl (okm->bytes_msg);
- bytes_physical = ntohl (okm->bytes_physical);
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Receiving SEND_OK message, transmission to %s %s.\n",
- GNUNET_i2s (&okm->peer),
- ntohl (okm->success) == GNUNET_OK ? "succeeded" : "failed");
+ if (NULL != h->rec)
+ h->rec (h->cls,
+ &im->peer,
+ imm);
+}
- n = neighbour_find (h,
- &okm->peer);
- if (NULL == n)
- {
- /* We should never get a 'SEND_OK' for a peer that we are not
- connected to */
- GNUNET_break (0);
- h->reconnecting = GNUNET_YES;
- disconnect_and_schedule_reconnect (h);
- break;
- }
- if (bytes_physical > bytes_msg)
- {
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Overhead for %u byte message was %u\n",
- bytes_msg,
- bytes_physical - bytes_msg);
- n->traffic_overhead += bytes_physical - bytes_msg;
- }
- GNUNET_break (GNUNET_NO == n->is_ready);
- n->is_ready = GNUNET_YES;
- if (NULL != n->unready_warn_task)
- {
- GNUNET_SCHEDULER_cancel (n->unready_warn_task);
- n->unready_warn_task = NULL;
- }
- if ((NULL != n->th) && (NULL == n->hn))
- {
- GNUNET_assert (NULL != n->th->timeout_task);
- GNUNET_SCHEDULER_cancel (n->th->timeout_task);
- n->th->timeout_task = NULL;
- /* we've been waiting for this (congestion, not quota,
- * caused delayed transmission) */
- n->hn = GNUNET_CONTAINER_heap_insert (h->ready_heap,
- n,
- 0);
- }
- schedule_transmission (h);
- break;
- case GNUNET_MESSAGE_TYPE_TRANSPORT_RECV:
- if (size <
- sizeof (struct InboundMessage) + sizeof (struct GNUNET_MessageHeader))
- {
- GNUNET_break (0);
- h->reconnecting = GNUNET_YES;
- disconnect_and_schedule_reconnect (h);
- break;
- }
- im = (const struct InboundMessage *) msg;
- imm = (const struct GNUNET_MessageHeader *) &im[1];
- if (ntohs (imm->size) + sizeof (struct InboundMessage) != size)
- {
- GNUNET_break (0);
- h->reconnecting = GNUNET_YES;
- disconnect_and_schedule_reconnect (h);
- break;
- }
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Received message of type %u with %u bytes from `%s'.\n",
- (unsigned int) ntohs (imm->type),
- (unsigned int) ntohs (imm->size),
- GNUNET_i2s (&im->peer));
- n = neighbour_find (h, &im->peer);
- if (NULL == n)
- {
- GNUNET_break (0);
- h->reconnecting = GNUNET_YES;
- disconnect_and_schedule_reconnect (h);
- break;
- }
- if (NULL != h->rec)
- h->rec (h->cls,
- &im->peer,
- imm);
- break;
- case GNUNET_MESSAGE_TYPE_TRANSPORT_SET_QUOTA:
- if (size != sizeof (struct QuotaSetMessage))
- {
- GNUNET_break (0);
- h->reconnecting = GNUNET_YES;
- disconnect_and_schedule_reconnect (h);
- break;
- }
- qm = (const struct QuotaSetMessage *) msg;
- n = neighbour_find (h, &qm->peer);
- if (NULL == n)
- {
- GNUNET_break (0);
- h->reconnecting = GNUNET_YES;
- disconnect_and_schedule_reconnect (h);
- break;
- }
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Receiving SET_QUOTA message for `%s' with quota %u\n",
- GNUNET_i2s (&qm->peer),
- ntohl (qm->quota.value__));
- GNUNET_BANDWIDTH_tracker_update_quota (&n->out_tracker,
- qm->quota);
- break;
- default:
- LOG (GNUNET_ERROR_TYPE_ERROR,
- _("Received unexpected message of type %u in %s:%u\n"),
- ntohs (msg->type),
- __FILE__,
- __LINE__);
+
+/**
+ * Function we use for handling incoming set quota messages.
+ *
+ * @param cls closure, a `struct GNUNET_TRANSPORT_Handle *`
+ * @param msg message received
+ */
+static void
+handle_set_quota (void *cls,
+ const struct QuotaSetMessage *qm)
+{
+ struct GNUNET_TRANSPORT_Handle *h = cls;
+ struct Neighbour *n;
+
+ n = neighbour_find (h, &qm->peer);
+ if (NULL == n)
+ {
GNUNET_break (0);
- break;
+ h->reconnecting = GNUNET_YES;
+ disconnect_and_schedule_reconnect (h);
+ return;
}
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Receiving SET_QUOTA message for `%s' with quota %u\n",
+ GNUNET_i2s (&qm->peer),
+ ntohl (qm->quota.value__));
+ GNUNET_BANDWIDTH_tracker_update_quota (&n->out_tracker,
+ qm->quota);
}
/**
- * Transmit message(s) to service.
+ * Transmit ready message(s) to service.
*
- * @param cls handle to transport
- * @param size number of bytes available in @a buf
- * @param buf where to copy the message
- * @return number of bytes copied to @a buf
+ * @param h handle to transport
*/
-static size_t
-transport_notify_ready (void *cls,
- size_t size,
- void *buf)
+static void
+transmit_ready (struct GNUNET_TRANSPORT_Handle *h)
{
- struct GNUNET_TRANSPORT_Handle *h = cls;
struct GNUNET_TRANSPORT_TransmitHandle *th;
struct GNUNET_TIME_Relative delay;
struct Neighbour *n;
- char *cbuf;
- struct OutboundMessage obm;
- size_t ret;
- size_t nret;
+ struct OutboundMessage *obm;
+ struct GNUNET_MQ_Envelope *env;
size_t mret;
- GNUNET_assert (NULL != h->client);
- h->cth = NULL;
- if (NULL == buf)
- {
- /* transmission failed */
- disconnect_and_schedule_reconnect (h);
- return 0;
- }
-
- cbuf = buf;
- ret = 0;
- /* first send control messages */
- while ( (NULL != (th = h->control_head)) &&
- (th->notify_size <= size) )
- {
- GNUNET_CONTAINER_DLL_remove (h->control_head,
- h->control_tail,
- th);
- nret = th->notify (th->notify_cls,
- size,
- &cbuf[ret]);
- delay = GNUNET_TIME_absolute_get_duration (th->request_start);
- if (delay.rel_value_us > GNUNET_CONSTANTS_LATENCY_WARN.rel_value_us)
- LOG (GNUNET_ERROR_TYPE_WARNING,
- "Added %u bytes of control message at %u after %s delay\n",
- nret,
- ret,
- GNUNET_STRINGS_relative_time_to_string (delay,
- GNUNET_YES));
- else
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Added %u bytes of control message at %u after %s delay\n",
- nret,
- ret,
- GNUNET_STRINGS_relative_time_to_string (delay,
- GNUNET_YES));
- GNUNET_free (th);
- ret += nret;
- size -= nret;
- }
-
- /* then, if possible and no control messages pending, send data messages */
- while ( (NULL == h->control_head) &&
- (NULL != (n = GNUNET_CONTAINER_heap_peek (h->ready_heap))) )
+ GNUNET_assert (NULL != h->mq);
+ while (NULL != (n = GNUNET_CONTAINER_heap_peek (h->ready_heap)))
{
+ th = n->th;
if (GNUNET_YES != n->is_ready)
{
/* peer not ready, wait for notification! */
GNUNET_assert (n == GNUNET_CONTAINER_heap_remove_root (h->ready_heap));
n->hn = NULL;
GNUNET_assert (NULL == n->th->timeout_task);
- n->th->timeout_task
+ th->timeout_task
= GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_absolute_get_remaining
- (n->th->timeout),
+ (th->timeout),
&timeout_request_due_to_congestion,
- n->th);
+ th);
continue;
}
- th = n->th;
- if (th->notify_size + sizeof (struct OutboundMessage) > size)
- break; /* does not fit */
- if (GNUNET_BANDWIDTH_tracker_get_delay
- (&n->out_tracker,
- th->notify_size).rel_value_us > 0)
+ if (GNUNET_BANDWIDTH_tracker_get_delay (&n->out_tracker,
+ th->notify_size).rel_value_us > 0)
break; /* too early */
GNUNET_assert (n == GNUNET_CONTAINER_heap_remove_root (h->ready_heap));
n->hn = NULL;
n->th = NULL;
- GNUNET_assert (size >= sizeof (struct OutboundMessage));
+ env = GNUNET_MQ_msg_extra (obm,
+ th->notify_size,
+ GNUNET_MESSAGE_TYPE_TRANSPORT_SEND);
mret = th->notify (th->notify_cls,
- size - sizeof (struct OutboundMessage),
- &cbuf[ret + sizeof (struct OutboundMessage)]);
- GNUNET_assert (mret <= size - sizeof (struct OutboundMessage));
+ th->notify_size,
+ &obm[1]);
if (0 == mret)
{
GNUNET_free (th);
+ GNUNET_MQ_discard (env);
continue;
}
if (NULL != n->unready_warn_task)
n);
n->last_payload = GNUNET_TIME_absolute_get ();
n->is_ready = GNUNET_NO;
- GNUNET_assert (mret + sizeof (struct OutboundMessage) <
- GNUNET_SERVER_MAX_MESSAGE_SIZE);
- obm.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SEND);
- obm.header.size = htons (mret + sizeof (struct OutboundMessage));
- obm.reserved = htonl (0);
- obm.timeout =
+ obm->reserved = htonl (0);
+ obm->timeout =
GNUNET_TIME_relative_hton (GNUNET_TIME_absolute_get_remaining
(th->timeout));
- obm.peer = n->id;
- memcpy (&cbuf[ret],
- &obm,
- sizeof (struct OutboundMessage));
- ret += (mret + sizeof (struct OutboundMessage));
- size -= (mret + sizeof (struct OutboundMessage));
+ obm->peer = n->id;
+ GNUNET_MQ_send (h->mq,
+ env);
GNUNET_BANDWIDTH_tracker_consume (&n->out_tracker,
mret);
delay = GNUNET_TIME_absolute_get_duration (th->request_start);
GNUNET_YES),
(unsigned int) n->out_tracker.available_bytes_per_s__);
GNUNET_free (th);
- break;
}
/* if there are more pending messages, try to schedule those */
schedule_transmission (h);
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Transmitting %u bytes to transport service\n",
- ret);
- return ret;
}
schedule_transmission_task (void *cls)
{
struct GNUNET_TRANSPORT_Handle *h = cls;
- size_t size;
struct GNUNET_TRANSPORT_TransmitHandle *th;
struct Neighbour *n;
h->quota_task = NULL;
- GNUNET_assert (NULL != h->client);
+ GNUNET_assert (NULL != h->mq);
/* destroy all requests that have timed out */
while ( (NULL != (n = GNUNET_CONTAINER_heap_peek (h->ready_heap))) &&
(0 == GNUNET_TIME_absolute_get_remaining (n->th->timeout).rel_value_us) )
NULL));
GNUNET_free (th);
}
- if (NULL != h->cth)
- return;
- if (NULL != h->control_head)
- {
- size = h->control_head->notify_size;
- }
- else
- {
- n = GNUNET_CONTAINER_heap_peek (h->ready_heap);
- if (NULL == n)
- return; /* no pending messages */
- size = n->th->notify_size + sizeof (struct OutboundMessage);
- }
+ n = GNUNET_CONTAINER_heap_peek (h->ready_heap);
+ if (NULL == n)
+ return; /* no pending messages */
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Calling notify_transmit_ready\n");
- h->cth
- = GNUNET_CLIENT_notify_transmit_ready (h->client,
- size,
- GNUNET_TIME_UNIT_FOREVER_REL,
- GNUNET_NO,
- &transport_notify_ready,
- h);
- GNUNET_assert (NULL != h->cth);
+ transmit_ready (h);
}
struct GNUNET_TIME_Relative delay;
struct Neighbour *n;
- GNUNET_assert (NULL != h->client);
+ GNUNET_assert (NULL != h->mq);
if (NULL != h->quota_task)
{
GNUNET_SCHEDULER_cancel (h->quota_task);
h->quota_task = NULL;
}
- if (NULL != h->control_head)
- delay = GNUNET_TIME_UNIT_ZERO;
- else if (NULL != (n = GNUNET_CONTAINER_heap_peek (h->ready_heap)))
+ if (NULL != (n = GNUNET_CONTAINER_heap_peek (h->ready_heap)))
{
delay =
GNUNET_BANDWIDTH_tracker_get_delay (&n->out_tracker,
}
-/**
- * Queue control request for transmission to the transport
- * service.
- *
- * @param h handle to the transport service
- * @param size number of bytes to be transmitted
- * @param notify function to call to get the content
- * @param notify_cls closure for @a notify
- * @return a `struct GNUNET_TRANSPORT_TransmitHandle`
- */
-static struct GNUNET_TRANSPORT_TransmitHandle *
-schedule_control_transmit (struct GNUNET_TRANSPORT_Handle *h,
- size_t size,
- GNUNET_TRANSPORT_TransmitReadyNotify notify,
- void *notify_cls)
-{
- struct GNUNET_TRANSPORT_TransmitHandle *th;
-
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Control transmit of %u bytes requested\n",
- size);
- th = GNUNET_new (struct GNUNET_TRANSPORT_TransmitHandle);
- th->notify = notify;
- th->notify_cls = notify_cls;
- th->notify_size = size;
- th->request_start = GNUNET_TIME_absolute_get ();
- GNUNET_CONTAINER_DLL_insert_tail (h->control_head,
- h->control_tail,
- th);
- schedule_transmission (h);
- return th;
-}
-
-
-/**
- * Transmit START message to service.
- *
- * @param cls unused
- * @param size number of bytes available in @a buf
- * @param buf where to copy the message
- * @return number of bytes copied to @a buf
- */
-static size_t
-send_start (void *cls,
- size_t size,
- void *buf)
-{
- struct GNUNET_TRANSPORT_Handle *h = cls;
- struct StartMessage s;
- uint32_t options;
-
- if (NULL == buf)
- {
- /* Can only be shutdown, just give up */
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Shutdown while trying to transmit START request.\n");
- return 0;
- }
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Transmitting START request.\n");
- GNUNET_assert (size >= sizeof (struct StartMessage));
- s.header.size = htons (sizeof (struct StartMessage));
- s.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_START);
- options = 0;
- if (h->check_self)
- options |= 1;
- if (NULL != h->rec)
- options |= 2;
- s.options = htonl (options);
- s.self = h->self;
- memcpy (buf, &s, sizeof (struct StartMessage));
- GNUNET_CLIENT_receive (h->client, &demultiplexer, h,
- GNUNET_TIME_UNIT_FOREVER_REL);
- return sizeof (struct StartMessage);
-}
-
-
/**
* Try again to connect to transport service.
*
static void
reconnect (void *cls)
{
+ GNUNET_MQ_hd_var_size (hello,
+ GNUNET_MESSAGE_TYPE_HELLO,
+ struct GNUNET_MessageHeader);
+ GNUNET_MQ_hd_fixed_size (connect,
+ GNUNET_MESSAGE_TYPE_TRANSPORT_CONNECT,
+ struct ConnectInfoMessage);
+ GNUNET_MQ_hd_fixed_size (disconnect,
+ GNUNET_MESSAGE_TYPE_TRANSPORT_DISCONNECT,
+ struct DisconnectInfoMessage);
+ GNUNET_MQ_hd_fixed_size (send_ok,
+ GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK,
+ struct SendOkMessage);
+ GNUNET_MQ_hd_var_size (recv,
+ GNUNET_MESSAGE_TYPE_TRANSPORT_RECV,
+ struct InboundMessage);
+ GNUNET_MQ_hd_fixed_size (set_quota,
+ GNUNET_MESSAGE_TYPE_TRANSPORT_SET_QUOTA,
+ struct QuotaSetMessage);
struct GNUNET_TRANSPORT_Handle *h = cls;
+ struct GNUNET_MQ_MessageHandler handlers[] = {
+ make_hello_handler (h),
+ make_connect_handler (h),
+ make_disconnect_handler (h),
+ make_send_ok_handler (h),
+ make_recv_handler (h),
+ make_set_quota_handler (h),
+ GNUNET_MQ_handler_end ()
+ };
+ struct GNUNET_MQ_Envelope *env;
+ struct StartMessage *s;
+ uint32_t options;
h->reconnect_task = NULL;
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Connecting to transport service.\n");
- GNUNET_assert (NULL == h->client);
- GNUNET_assert (NULL == h->control_head);
- GNUNET_assert (NULL == h->control_tail);
+ GNUNET_assert (NULL == h->mq);
h->reconnecting = GNUNET_NO;
- h->client = GNUNET_CLIENT_connect ("transport", h->cfg);
-
- GNUNET_assert (NULL != h->client);
- schedule_control_transmit (h, sizeof (struct StartMessage),
- &send_start, h);
+ h->mq = GNUNET_CLIENT_connecT (h->cfg,
+ "transport",
+ handlers,
+ &mq_error_handler,
+ h);
+ if (NULL == h->mq)
+ return;
+ env = GNUNET_MQ_msg (s,
+ GNUNET_MESSAGE_TYPE_TRANSPORT_START);
+ options = 0;
+ if (h->check_self)
+ options |= 1;
+ if (NULL != h->rec)
+ options |= 2;
+ s->options = htonl (options);
+ s->self = h->self;
+ GNUNET_MQ_send (h->mq,
+ env);
}
static void
disconnect_and_schedule_reconnect (struct GNUNET_TRANSPORT_Handle *h)
{
- struct GNUNET_TRANSPORT_TransmitHandle *th;
-
GNUNET_assert (NULL == h->reconnect_task);
- if (NULL != h->cth)
- {
- GNUNET_CLIENT_notify_transmit_ready_cancel (h->cth);
- h->cth = NULL;
- }
- if (NULL != h->client)
+ if (NULL != h->mq)
{
- GNUNET_CLIENT_disconnect (h->client);
- h->client = NULL;
-/* LOG (GNUNET_ERROR_TYPE_ERROR,
- "Client disconnect done \n");*/
+ GNUNET_MQ_destroy (h->mq);
+ h->mq = NULL;
}
/* Forget about all neighbours that we used to be connected to */
GNUNET_CONTAINER_multipeermap_iterate (h->neighbours,
GNUNET_SCHEDULER_cancel (h->quota_task);
h->quota_task = NULL;
}
- while ((NULL != (th = h->control_head)))
- {
- GNUNET_CONTAINER_DLL_remove (h->control_head,
- h->control_tail,
- th);
- th->notify (th->notify_cls,
- 0,
- NULL);
- GNUNET_free (th);
- }
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Scheduling task to reconnect to transport service in %s.\n",
GNUNET_STRINGS_relative_time_to_string (h->reconnect_delay,
/**
- * Cancel control request for transmission to the transport service.
- *
- * @param th handle to the transport service
- * @param tth transmit handle to cancel
- */
-static void
-cancel_control_transmit (struct GNUNET_TRANSPORT_Handle *th,
- struct GNUNET_TRANSPORT_TransmitHandle *tth)
-{
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Canceling transmit of contral transmission requested\n");
- GNUNET_CONTAINER_DLL_remove (th->control_head,
- th->control_tail,
- tth);
- GNUNET_free (tth);
-}
-
-
-/**
- * Send HELLO message to the service.
- *
- * @param cls the HELLO message to send
- * @param size number of bytes available in @a buf
- * @param buf where to copy the message
- * @return number of bytes copied to @a buf
- */
-static size_t
-send_hello (void *cls,
- size_t size,
- void *buf)
-{
- struct GNUNET_TRANSPORT_OfferHelloHandle *ohh = cls;
- struct GNUNET_MessageHeader *msg = ohh->msg;
- uint16_t ssize;
-
- if (NULL == buf)
- {
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Timeout while trying to transmit `%s' request.\n",
- "HELLO");
- if (NULL != ohh->cont)
- ohh->cont (ohh->cls);
- GNUNET_free (msg);
- GNUNET_CONTAINER_DLL_remove (ohh->th->oh_head,
- ohh->th->oh_tail,
- ohh);
- GNUNET_free (ohh);
- return 0;
- }
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Transmitting `%s' request.\n",
- "HELLO");
- ssize = ntohs (msg->size);
- GNUNET_assert (size >= ssize);
- memcpy (buf,
- msg,
- ssize);
- GNUNET_free (msg);
- if (NULL != ohh->cont)
- ohh->cont (ohh->cls);
- GNUNET_CONTAINER_DLL_remove (ohh->th->oh_head,
- ohh->th->oh_tail,
- ohh);
- GNUNET_free (ohh);
- return ssize;
-}
-
-
-/**
- * Send traffic metric message to the service.
- *
- * @param cls the message to send
- * @param size number of bytes available in @a buf
- * @param buf where to copy the message
- * @return number of bytes copied to @a buf
- */
-static size_t
-send_metric (void *cls,
- size_t size,
- void *buf)
-{
- struct TrafficMetricMessage *msg = cls;
- uint16_t ssize;
-
- if (NULL == buf)
- {
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Timeout while trying to transmit TRAFFIC_METRIC request.\n");
- GNUNET_free (msg);
- return 0;
- }
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Transmitting TRAFFIC_METRIC request.\n");
- ssize = ntohs (msg->header.size);
- GNUNET_assert (size >= ssize);
- memcpy (buf, msg, ssize);
- GNUNET_free (msg);
- return ssize;
-}
-
-
-/**
- * Set transport metrics for a peer and a direction
+ * Set transport metrics for a peer and a direction.
*
* @param handle transport handle
* @param peer the peer to set the metric for
struct GNUNET_TIME_Relative delay_in,
struct GNUNET_TIME_Relative delay_out)
{
+ struct GNUNET_MQ_Envelope *env;
struct TrafficMetricMessage *msg;
- msg = GNUNET_new (struct TrafficMetricMessage);
- msg->header.size = htons (sizeof (struct TrafficMetricMessage));
- msg->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_TRAFFIC_METRIC);
+ if (NULL == handle->mq)
+ return;
+ env = GNUNET_MQ_msg (msg,
+ GNUNET_MESSAGE_TYPE_TRANSPORT_TRAFFIC_METRIC);
msg->reserved = htonl (0);
msg->peer = *peer;
GNUNET_ATS_properties_hton (&msg->properties,
prop);
msg->delay_in = GNUNET_TIME_relative_hton (delay_in);
msg->delay_out = GNUNET_TIME_relative_hton (delay_out);
- schedule_control_transmit (handle,
- sizeof (struct TrafficMetricMessage),
- &send_metric,
- msg);
-}
-
-
-/**
- * Offer the transport service the HELLO of another peer. Note that
- * the transport service may just ignore this message if the HELLO is
- * malformed or useless due to our local configuration.
- *
- * @param handle connection to transport service
- * @param hello the hello message
- * @param cont continuation to call when HELLO has been sent,
- * tc reason #GNUNET_SCHEDULER_REASON_TIMEOUT for fail
- * tc reasong #GNUNET_SCHEDULER_REASON_READ_READY for success
- * @param cont_cls closure for @a cont
- * @return a `struct GNUNET_TRANSPORT_OfferHelloHandle` handle or NULL on failure,
- * in case of failure @a cont will not be called
- *
- */
-struct GNUNET_TRANSPORT_OfferHelloHandle *
-GNUNET_TRANSPORT_offer_hello (struct GNUNET_TRANSPORT_Handle *handle,
- const struct GNUNET_MessageHeader *hello,
- GNUNET_SCHEDULER_TaskCallback cont,
- void *cont_cls)
-{
- struct GNUNET_TRANSPORT_OfferHelloHandle *ohh;
- struct GNUNET_MessageHeader *msg;
- struct GNUNET_PeerIdentity peer;
- uint16_t size;
-
- if (NULL == handle->client)
- return NULL;
- GNUNET_break (ntohs (hello->type) == GNUNET_MESSAGE_TYPE_HELLO);
- size = ntohs (hello->size);
- GNUNET_break (size >= sizeof (struct GNUNET_MessageHeader));
- if (GNUNET_OK !=
- GNUNET_HELLO_get_id ((const struct GNUNET_HELLO_Message *) hello,
- &peer))
- {
- GNUNET_break (0);
- return NULL;
- }
-
- msg = GNUNET_malloc (size);
- memcpy (msg, hello, size);
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Offering HELLO message of `%s' to transport for validation.\n",
- GNUNET_i2s (&peer));
-
- ohh = GNUNET_new (struct GNUNET_TRANSPORT_OfferHelloHandle);
- ohh->th = handle;
- ohh->cont = cont;
- ohh->cls = cont_cls;
- ohh->msg = msg;
- ohh->tth = schedule_control_transmit (handle,
- size,
- &send_hello,
- ohh);
- GNUNET_CONTAINER_DLL_insert (handle->oh_head,
- handle->oh_tail,
- ohh);
- return ohh;
-}
-
-
-/**
- * Cancel the request to transport to offer the HELLO message
- *
- * @param ohh the handle for the operation to cancel
- */
-void
-GNUNET_TRANSPORT_offer_hello_cancel (struct GNUNET_TRANSPORT_OfferHelloHandle *ohh)
-{
- struct GNUNET_TRANSPORT_Handle *th = ohh->th;
-
- cancel_control_transmit (ohh->th, ohh->tth);
- GNUNET_CONTAINER_DLL_remove (th->oh_head,
- th->oh_tail,
- ohh);
- GNUNET_free (ohh->msg);
- GNUNET_free (ohh);
+ GNUNET_MQ_send (handle->mq,
+ env);
}
}
-/**
- * Task to call the HelloUpdateCallback of the GetHelloHandle
- *
- * @param cls the `struct GNUNET_TRANSPORT_GetHelloHandle`
- */
-static void
-call_hello_update_cb_async (void *cls)
-{
- struct GNUNET_TRANSPORT_GetHelloHandle *ghh = cls;
-
- GNUNET_assert (NULL != ghh->handle->my_hello);
- GNUNET_assert (NULL != ghh->notify_task);
- ghh->notify_task = NULL;
- ghh->rec (ghh->rec_cls,
- ghh->handle->my_hello);
-}
-
-
-/**
- * Obtain the HELLO message for this peer. The callback given in this function
- * is never called synchronously.
- *
- * @param handle connection to transport service
- * @param rec function to call with the HELLO, sender will be our peer
- * identity; message and sender will be NULL on timeout
- * (handshake with transport service pending/failed).
- * cost estimate will be 0.
- * @param rec_cls closure for @a rec
- * @return handle to cancel the operation
- */
-struct GNUNET_TRANSPORT_GetHelloHandle *
-GNUNET_TRANSPORT_get_hello (struct GNUNET_TRANSPORT_Handle *handle,
- GNUNET_TRANSPORT_HelloUpdateCallback rec,
- void *rec_cls)
-{
- struct GNUNET_TRANSPORT_GetHelloHandle *hwl;
-
- hwl = GNUNET_new (struct GNUNET_TRANSPORT_GetHelloHandle);
- hwl->rec = rec;
- hwl->rec_cls = rec_cls;
- hwl->handle = handle;
- GNUNET_CONTAINER_DLL_insert (handle->hwl_head,
- handle->hwl_tail,
- hwl);
- if (NULL != handle->my_hello)
- hwl->notify_task = GNUNET_SCHEDULER_add_now (&call_hello_update_cb_async,
- hwl);
- return hwl;
-}
-
-
-/**
- * Stop receiving updates about changes to our HELLO message.
- *
- * @param ghh handle to cancel
- */
-void
-GNUNET_TRANSPORT_get_hello_cancel (struct GNUNET_TRANSPORT_GetHelloHandle *ghh)
-{
- struct GNUNET_TRANSPORT_Handle *handle = ghh->handle;
-
- if (NULL != ghh->notify_task)
- GNUNET_SCHEDULER_cancel (ghh->notify_task);
- GNUNET_CONTAINER_DLL_remove (handle->hwl_head,
- handle->hwl_tail,
- ghh);
- GNUNET_free (ghh);
-}
-
-
/**
* Connect to the transport service. Note that the connection may
* complete (or fail) asynchronously.
GNUNET_TRANSPORT_NotifyDisconnect nd,
GNUNET_TRANSPORT_NotifyExcessBandwidth neb)
{
- struct GNUNET_TRANSPORT_Handle *ret;
+ struct GNUNET_TRANSPORT_Handle *h;
- ret = GNUNET_new (struct GNUNET_TRANSPORT_Handle);
+ h = GNUNET_new (struct GNUNET_TRANSPORT_Handle);
if (NULL != self)
{
- ret->self = *self;
- ret->check_self = GNUNET_YES;
+ h->self = *self;
+ h->check_self = GNUNET_YES;
}
- ret->cfg = cfg;
- ret->cls = cls;
- ret->rec = rec;
- ret->nc_cb = nc;
- ret->nd_cb = nd;
- ret->neb_cb = neb;
- ret->reconnect_delay = GNUNET_TIME_UNIT_ZERO;
+ h->cfg = cfg;
+ h->cls = cls;
+ h->rec = rec;
+ h->nc_cb = nc;
+ h->nd_cb = nd;
+ h->neb_cb = neb;
+ h->reconnect_delay = GNUNET_TIME_UNIT_ZERO;
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Connecting to transport service.\n");
- ret->client = GNUNET_CLIENT_connect ("transport",
- cfg);
- if (NULL == ret->client)
+ reconnect (h);
+ if (NULL == h->mq)
{
- GNUNET_free (ret);
+ GNUNET_free (h);
return NULL;
}
- ret->neighbours =
+ h->neighbours =
GNUNET_CONTAINER_multipeermap_create (STARTING_NEIGHBOURS_SIZE,
GNUNET_YES);
- ret->ready_heap =
+ h->ready_heap =
GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
- schedule_control_transmit (ret,
- sizeof (struct StartMessage),
- &send_start,
- ret);
- return ret;
+ return h;
}
}
GNUNET_free_non_null (handle->my_hello);
handle->my_hello = NULL;
- GNUNET_assert (NULL == handle->hwl_head);
- GNUNET_assert (NULL == handle->hwl_tail);
GNUNET_CONTAINER_heap_destroy (handle->ready_heap);
handle->ready_heap = NULL;
GNUNET_free (handle);