*/
#define OFFER_HELLO_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 30)
+/**
+ * After how long do we give automatically retry an unsuccessful
+ * CONNECT request?
+ */
+#define CONNECT_RETRY_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS, 750)
+
/**
* How long should ARM wait when starting up the
* transport service before reporting back?
*/
struct GNUNET_TRANSPORT_TransmitHandle *transmit_handle;
-
/**
* Identity of this neighbour.
*/
uint64_t last_sent;
/**
- * Global quota for outbound traffic to the neighbour in bytes/ms.
+ * Quota for outbound traffic to the neighbour in bytes/ms.
*/
uint32_t quota_out;
* Function to call when notify_size bytes are available
* for transmission.
*/
- GNUNET_NETWORK_TransmitReadyNotify notify;
+ GNUNET_CONNECTION_TransmitReadyNotify notify;
/**
* Closure for notify.
void *notify_cls;
/**
- * transmit_ready task Id. The task is used to introduce
- * the artificial delay that may be required to maintain
- * the bandwidth limits.
+ * transmit_ready task Id. The task is used to introduce the
+ * artificial delay that may be required to maintain the bandwidth
+ * limits. Later, this will be the ID of the "transmit_timeout"
+ * task which is used to signal a timeout if the transmission could
+ * not be done in a timely fashion.
*/
GNUNET_SCHEDULER_TaskIdentifier notify_delay_task;
*/
size_t notify_size;
+ /**
+ * How important is this message?
+ */
+ unsigned int priority;
+
};
/**
* Handle to our registration with the client for notification.
*/
- struct GNUNET_NETWORK_TransmitHandle *network_handle;
+ struct GNUNET_CONNECTION_TransmitHandle *network_handle;
/**
* Linked list of transmit handles that are waiting for the
/**
* My configuration.
*/
- struct GNUNET_CONFIGURATION_Handle *cfg;
+ const struct GNUNET_CONFIGURATION_Handle *cfg;
/**
* Linked list of the current neighbours of this peer.
size_t ret;
char *cbuf;
-#if DEBUG_TRANSPORT
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Ready to transmit %u bytes to transport service\n", size);
-#endif
h->network_handle = NULL;
h->transmission_scheduled = GNUNET_NO;
if (buf == NULL)
{
+#if DEBUG_TRANSPORT
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Could not transmit to transport service, cancelling pending requests\n");
+#endif
th = h->connect_ready_head;
if (th->next != NULL)
th->next->prev = NULL;
GNUNET_assert (n->transmit_handle == th);
n->transmit_handle = NULL;
}
+ if (th->notify_delay_task != GNUNET_SCHEDULER_NO_TASK)
+ {
+ GNUNET_SCHEDULER_cancel (h->sched,
+ th->notify_delay_task);
+ th->notify_delay_task = GNUNET_SCHEDULER_NO_TASK;
+ }
GNUNET_assert (0 == th->notify (th->notify_cls, 0, NULL));
GNUNET_free (th);
+ if (h->connect_ready_head != NULL) schedule_transmission (h); /* FIXME: is this ok? */
return 0;
- }
+ }
+#if DEBUG_TRANSPORT
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Ready to transmit %u bytes to transport service\n", size);
+#endif
cbuf = buf;
ret = 0;
h->network_handle = NULL;
do
{
th = h->connect_ready_head;
+ if (th->notify_delay_task != GNUNET_SCHEDULER_NO_TASK)
+ {
+ GNUNET_SCHEDULER_cancel (h->sched,
+ th->notify_delay_task);
+ th->notify_delay_task = GNUNET_SCHEDULER_NO_TASK;
+ }
GNUNET_assert (th->notify_size <= size);
if (th->next != NULL)
th->next->prev = NULL;
GNUNET_assert (NULL == h->network_handle);
if (h->client == NULL)
{
-#if DEBUG_TRANSPORT
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Not yet connected to transport service, need to wait.\n");
-#endif
- return;
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Could not yet schedule transmission: we are not yet connected to the transport service!\n");
+ return; /* not yet connected */
}
th = h->connect_ready_head;
if (th == NULL)
+ return; /* no request pending */
+ if (th->notify_delay_task != GNUNET_SCHEDULER_NO_TASK)
{
-#if DEBUG_TRANSPORT
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Schedule transmission called, but no request is pending.\n");
-#endif
- return;
+ /* remove existing time out task, will be integrated
+ with transmit_ready notification! */
+ GNUNET_SCHEDULER_cancel (h->sched,
+ th->notify_delay_task);
+ th->notify_delay_task = GNUNET_SCHEDULER_NO_TASK;
}
h->transmission_scheduled = GNUNET_YES;
-#if DEBUG_TRANSPORT
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Asking client API for transmission of %u bytes\n",
- th->notify_size);
-#endif
h->network_handle = GNUNET_CLIENT_notify_transmit_ready (h->client,
th->notify_size,
GNUNET_TIME_absolute_get_remaining
}
+/**
+ * Cancel a pending notify delay task (if pending) and also remove the
+ * given transmit handle from whatever list is on.
+ *
+ * @param th handle for the transmission request to manipulate
+ */
+static void
+remove_from_any_list (struct GNUNET_TRANSPORT_TransmitHandle *th)
+{
+ struct GNUNET_TRANSPORT_Handle *h;
+
+ h = th->handle;
+ if (th->notify_delay_task != GNUNET_SCHEDULER_NO_TASK)
+ {
+ GNUNET_SCHEDULER_cancel (h->sched, th->notify_delay_task);
+ th->notify_delay_task = GNUNET_SCHEDULER_NO_TASK;
+ }
+ if (th->prev == NULL)
+ {
+ if (th == h->connect_wait_head)
+ h->connect_wait_head = th->next;
+ else
+ h->connect_ready_head = th->next;
+ }
+ else
+ {
+ th->prev->next = th->next;
+ }
+ if (th->next != NULL)
+ th->next->prev = th->prev;
+}
+
+
+/**
+ * Schedule a request to connect to the given
+ * neighbour (and if successful, add the specified
+ * handle to the wait list).
+ *
+ * @param th handle for a request to transmit once we
+ * have connected
+ */
+static void
+try_connect (struct GNUNET_TRANSPORT_TransmitHandle *th);
+
+
+/**
+ * Called when our transmit request timed out before any transport
+ * reported success connecting to the desired peer or before the
+ * transport was ready to receive. Signal error and free
+ * TransmitHandle.
+ */
+static void
+peer_transmit_timeout (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ struct GNUNET_TRANSPORT_TransmitHandle *th = cls;
+
+ th->notify_delay_task = GNUNET_SCHEDULER_NO_TASK;
+ if (th->neighbour != NULL)
+ th->neighbour->transmit_handle = NULL;
+#if DEBUG_TRANSPORT
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Request for transmission to peer `%s' timed out.\n",
+ GNUNET_i2s(&th->target));
+#endif
+ remove_from_any_list (th);
+ th->notify (th->notify_cls, 0, NULL);
+ GNUNET_free (th);
+}
+
+
+
+
/**
* Queue control request for transmission to the transport
* service.
size_t size,
int at_head,
struct GNUNET_TIME_Relative timeout,
- GNUNET_NETWORK_TransmitReadyNotify notify,
+ GNUNET_CONNECTION_TransmitReadyNotify notify,
void *notify_cls)
{
struct GNUNET_TRANSPORT_TransmitHandle *th;
-#if DEBUG_TRANSPORT
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Queueing %u bytes control transmission request.\n", size);
-#endif
+
th = GNUNET_malloc (sizeof (struct GNUNET_TRANSPORT_TransmitHandle));
th->handle = h;
th->notify = notify;
th->notify_cls = notify_cls;
th->timeout = GNUNET_TIME_relative_to_absolute (timeout);
th->notify_size = size;
+ th->notify_delay_task
+ = GNUNET_SCHEDULER_add_delayed (h->sched,
+ GNUNET_NO,
+ GNUNET_SCHEDULER_PRIORITY_KEEP,
+ GNUNET_SCHEDULER_NO_TASK,
+ timeout,
+ &peer_transmit_timeout, th);
if (at_head)
{
th->next = h->connect_ready_head;
struct HelloWaitList *pos;
struct HelloWaitList *prev;
+ hwl->task = GNUNET_SCHEDULER_NO_TASK;
+ if (GNUNET_TIME_absolute_get_remaining (hwl->timeout).value > 0)
+ {
+#if DEBUG_TRANSPORT
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ _("First attempt to obtain `%s' from transport service failed, will try again for %llums.\n"),
+ "HELLO",
+ GNUNET_TIME_absolute_get_remaining (hwl->timeout).value);
+#endif
+ hwl->task = GNUNET_SCHEDULER_add_delayed (hwl->handle->sched,
+ GNUNET_YES,
+ GNUNET_SCHEDULER_PRIORITY_KEEP,
+ GNUNET_SCHEDULER_NO_TASK,
+ GNUNET_TIME_absolute_get_remaining (hwl->timeout),
+ &hello_wait_timeout, hwl);
+ return;
+ }
+ /* signal timeout */
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ _("Timeout trying to obtain `%s' from transport service.\n"),
+ "HELLO");
prev = NULL;
pos = hwl->handle->hwl_head;
while (pos != hwl)
hwl->handle->hwl_head = hwl->next;
else
prev->next = hwl->next;
- GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
- _("Timeout trying to obtain `%s' from transport service.\n"),
- "HELLO");
- /* signal timeout */
if (hwl->rec != NULL)
hwl->rec (hwl->rec_cls, GNUNET_TIME_UNIT_ZERO, NULL, NULL);
GNUNET_free (hwl);
hwl->task = GNUNET_SCHEDULER_add_delayed (handle->sched,
GNUNET_YES,
GNUNET_SCHEDULER_PRIORITY_KEEP,
- GNUNET_SCHEDULER_NO_PREREQUISITE_TASK,
+ GNUNET_SCHEDULER_NO_TASK,
timeout,
&hello_wait_timeout, hwl);
return;
if (buf == NULL)
{
#if DEBUG_TRANSPORT
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
"Timeout while trying to transmit `%s' request.\n",
"HELLO");
#endif
if (handle->client == NULL)
{
#if DEBUG_TRANSPORT
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"Not connected to transport service, dropping offered HELLO\n");
#endif
return;
struct GNUNET_MessageHeader *s = buf;
if (buf == NULL)
- return 0;
+ {
+#if DEBUG_TRANSPORT
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Timeout while trying to transmit `%s' request.\n",
+ "START");
+#endif
+ return 0;
+ }
#if DEBUG_TRANSPORT
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Transmitting `%s' request.\n", "START");
}
+/**
+ * We're ready to transmit the request that the transport service
+ * should connect to a new peer. In addition to sending the
+ * request, schedule the next phase for the transmission processing
+ * that caused the connect request in the first place.
+ */
+static size_t
+request_connect (void *cls, size_t size, void *buf)
+{
+ struct GNUNET_TRANSPORT_TransmitHandle *th = cls;
+ struct TryConnectMessage *tcm;
+ struct GNUNET_TRANSPORT_Handle *h;
+
+ GNUNET_assert (th->notify_delay_task == GNUNET_SCHEDULER_NO_TASK);
+ h = th->handle;
+ if (buf == NULL)
+ {
+#if DEBUG_TRANSPORT
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Failed to transmit `%s' request for `%4s' to service.\n",
+ "TRY_CONNECT",
+ GNUNET_i2s(&th->target));
+#endif
+ if (th->notify_delay_task != GNUNET_SCHEDULER_NO_TASK)
+ {
+ GNUNET_SCHEDULER_cancel (h->sched, th->notify_delay_task);
+ th->notify_delay_task = GNUNET_SCHEDULER_NO_TASK;
+ }
+ th->notify (th->notify_cls, 0, NULL);
+ GNUNET_free (th);
+ return 0;
+ }
+#if DEBUG_TRANSPORT
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Transmitting `%s' message for `%4s'.\n",
+ "TRY_CONNECT", GNUNET_i2s (&th->target));
+#endif
+ GNUNET_assert (size >= sizeof (struct TryConnectMessage));
+ tcm = buf;
+ tcm->header.size = htons (sizeof (struct TryConnectMessage));
+ tcm->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_TRY_CONNECT);
+ tcm->reserved = htonl (0);
+ memcpy (&tcm->peer, &th->target, sizeof (struct GNUNET_PeerIdentity));
+ th->notify_delay_task
+ = GNUNET_SCHEDULER_add_delayed (h->sched,
+ GNUNET_NO,
+ GNUNET_SCHEDULER_PRIORITY_KEEP,
+ GNUNET_SCHEDULER_NO_TASK,
+ GNUNET_TIME_absolute_get_remaining
+ (th->timeout),
+ &peer_transmit_timeout, th);
+ insert_transmit_handle (&h->connect_wait_head, th);
+ return sizeof (struct TryConnectMessage);
+}
+
+
+/**
+ * Schedule a request to connect to the given
+ * neighbour (and if successful, add the specified
+ * handle to the wait list).
+ *
+ * @param th handle for a request to transmit once we
+ * have connected
+ */
+static void
+try_connect (struct GNUNET_TRANSPORT_TransmitHandle *th)
+{
+ GNUNET_assert (th->notify_delay_task == GNUNET_SCHEDULER_NO_TASK);
+ schedule_control_transmit (th->handle,
+ sizeof (struct TryConnectMessage),
+ GNUNET_NO,
+ GNUNET_TIME_absolute_get_remaining (th->timeout),
+ &request_connect, th);
+}
+
+
+/**
+ * Task for delayed attempts to reconnect to a peer.
+ *
+ * @param cls must be a transmit handle that determines the peer
+ * to which we will try to connect
+ * @param tc scheduler information about why we were triggered (not used)
+ */
+static void
+try_connect_task (void *cls,
+ const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ struct GNUNET_TRANSPORT_TransmitHandle *th = cls;
+
+ th->notify_delay_task = GNUNET_SCHEDULER_NO_TASK;
+ try_connect (th);
+}
+
+
+/**
+ * Remove neighbour from our list. Will automatically
+ * trigger a re-connect attempt if we have messages pending
+ * for this peer.
+ *
+ * @param h our state
+ * @param peer the peer to remove
+ */
+static void
+remove_neighbour (struct GNUNET_TRANSPORT_Handle *h,
+ const struct GNUNET_PeerIdentity *peer)
+{
+ struct NeighbourList *prev;
+ struct NeighbourList *pos;
+ struct GNUNET_TRANSPORT_TransmitHandle *th;
+
+#if DEBUG_TRANSPORT
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Removing neighbour `%s' from list of connected peers.\n",
+ GNUNET_i2s (peer));
+#endif
+ prev = NULL;
+ pos = h->neighbours;
+ while ((pos != NULL) &&
+ (0 != memcmp (peer,
+ &pos->id,
+ sizeof (struct GNUNET_PeerIdentity))))
+ {
+ prev = pos;
+ pos = pos->next;
+ }
+ if (pos == NULL)
+ {
+ GNUNET_break (0);
+ return;
+ }
+ if (prev == NULL)
+ h->neighbours = pos->next;
+ else
+ prev->next = pos->next;
+ if (NULL != (th = pos->transmit_handle))
+ {
+ pos->transmit_handle = NULL;
+ th->neighbour = NULL;
+ remove_from_any_list (th);
+ if (GNUNET_TIME_absolute_get_remaining (th->timeout).value <= CONNECT_RETRY_TIMEOUT.value)
+ {
+ /* signal error */
+ GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == th->notify_delay_task);
+ peer_transmit_timeout (th, NULL);
+ }
+ else
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ _("Connection with `%4s' failed, will keep trying for %llu ms to deliver message\n"),
+ GNUNET_i2s (peer),
+ GNUNET_TIME_absolute_get_remaining (th->timeout).value);
+ /* try again in a bit */
+ GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == th->notify_delay_task);
+ th->notify_delay_task
+ = GNUNET_SCHEDULER_add_delayed (h->sched,
+ GNUNET_NO,
+ GNUNET_SCHEDULER_PRIORITY_KEEP,
+ GNUNET_SCHEDULER_NO_TASK,
+ CONNECT_RETRY_TIMEOUT,
+ &try_connect_task,
+ th);
+ }
+ }
+ if (h->nc_cb != NULL)
+ h->nd_cb (h->cls, peer);
+ GNUNET_free (pos);
+}
+
+
/**
* Try again to connect to transport service.
*/
struct GNUNET_TRANSPORT_TransmitHandle *pos;
struct NeighbourList *n;
+ fprintf (stderr,
+ "Trying to reconnect to transport!\n");
+
+ /* Forget about all neighbours that we used to be connected
+ to */
while (NULL != (n = h->neighbours))
- {
- h->neighbours = n->next;
- pos = n->transmit_handle;
- if (pos != NULL)
- {
- pos->neighbour = NULL;
- pos->next = h->connect_wait_head;
- h->connect_wait_head = pos;
- if (pos->next != NULL)
- pos->next->prev = pos;
- pos->prev = NULL;
- }
- GNUNET_free (n);
- }
- h->connect_ready_head = NULL;
+ remove_neighbour (h, &n->id);
#if DEBUG_TRANSPORT
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Connecting to transport service.\n");
#endif
GNUNET_assert (h->client == NULL);
- h->reconnect_task = GNUNET_SCHEDULER_NO_PREREQUISITE_TASK;
+ h->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
h->client = GNUNET_CLIENT_connect (h->sched, "transport", h->cfg);
GNUNET_assert (h->client != NULL);
/* make sure we don't send "START" twice,
schedule_control_transmit (h,
sizeof (struct GNUNET_MessageHeader),
GNUNET_YES,
- GNUNET_TIME_UNIT_FOREVER_REL, &send_start, NULL);
+ GNUNET_TIME_UNIT_FOREVER_REL,
+ &send_start, NULL);
GNUNET_CLIENT_receive (h->client,
&demultiplexer, h, GNUNET_TIME_UNIT_FOREVER_REL);
}
h->reconnect_delay.value);
#endif
GNUNET_assert (h->client == NULL);
- GNUNET_assert (h->reconnect_task == GNUNET_SCHEDULER_NO_PREREQUISITE_TASK);
+ GNUNET_assert (h->reconnect_task == GNUNET_SCHEDULER_NO_TASK);
h->reconnect_task
= GNUNET_SCHEDULER_add_delayed (h->sched,
GNUNET_NO,
GNUNET_SCHEDULER_PRIORITY_DEFAULT,
- GNUNET_SCHEDULER_NO_PREREQUISITE_TASK,
+ GNUNET_SCHEDULER_NO_TASK,
h->reconnect_delay, &reconnect, h);
h->reconnect_delay = GNUNET_TIME_UNIT_SECONDS;
}
-/**
- * Remove the given transmit handle from the wait list. Does NOT free
- * it.
- */
-static void
-remove_from_wait_list (struct GNUNET_TRANSPORT_TransmitHandle *th)
-{
- if (th->prev == NULL)
- th->handle->connect_wait_head = th->next;
- else
- th->prev->next = th->next;
- if (th->next != NULL)
- th->next->prev = th->prev;
-}
-
-
/**
* We are connected to the respective peer, check the
* bandwidth limits and schedule the transmission.
{
struct GNUNET_TRANSPORT_TransmitHandle *th = cls;
- th->notify_delay_task = GNUNET_SCHEDULER_NO_PREREQUISITE_TASK;
+ th->notify_delay_task = GNUNET_SCHEDULER_NO_TASK;
schedule_request (th);
}
/**
- * Called when our transmit request timed out before any transport
- * reported success connecting to the desired peer or before the
- * transport was ready to receive. Signal error and free
- * TransmitHandle.
+ * Remove the given transmit handle from the wait list. Does NOT free
+ * it.
*/
static void
-transmit_timeout (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+remove_from_wait_list (struct GNUNET_TRANSPORT_TransmitHandle *th)
{
- struct GNUNET_TRANSPORT_TransmitHandle *th = cls;
-
- if (th->neighbour != NULL)
- th->neighbour->transmit_handle = NULL;
-#if DEBUG_TRANSPORT
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Transmission request timed out.\n");
-#endif
- th->notify_delay_task = GNUNET_SCHEDULER_NO_PREREQUISITE_TASK;
- remove_from_wait_list (th);
- th->notify (th->notify_cls, 0, NULL);
- GNUNET_free (th);
+ if (th->prev == NULL)
+ th->handle->connect_wait_head = th->next;
+ else
+ th->prev->next = th->next;
+ if (th->next != NULL)
+ th->next->prev = th->prev;
}
h = th->handle;
n = th->neighbour;
- if (th->notify_delay_task != GNUNET_SCHEDULER_NO_PREREQUISITE_TASK)
+ if (th->notify_delay_task != GNUNET_SCHEDULER_NO_TASK)
{
GNUNET_SCHEDULER_cancel (h->sched, th->notify_delay_task);
- th->notify_delay_task = GNUNET_SCHEDULER_NO_PREREQUISITE_TASK;
+ th->notify_delay_task = GNUNET_SCHEDULER_NO_TASK;
}
/* check outgoing quota */
duration = GNUNET_TIME_absolute_get_duration (n->last_quota_update);
/* signal timeout! */
#if DEBUG_TRANSPORT
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Would need %llu ms before bandwidth is available for delivery, that is too long. Signaling timeout.\n",
- duration.value);
+ "Would need %llu ms before bandwidth is available for delivery to `%4s', that is too long. Signaling timeout.\n",
+ duration.value,
+ GNUNET_i2s(&th->target));
#endif
remove_from_wait_list (th);
th->notify (th->notify_cls, 0, NULL);
}
#if DEBUG_TRANSPORT
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Need more bandwidth, delaying delivery by %llu ms\n",
+ "Need more bandwidth, delaying delivery to `%4s' by %llu ms\n",
+ GNUNET_i2s(&th->target),
duration.value);
#endif
th->notify_delay_task
= GNUNET_SCHEDULER_add_delayed (h->sched,
GNUNET_NO,
GNUNET_SCHEDULER_PRIORITY_KEEP,
- GNUNET_SCHEDULER_NO_PREREQUISITE_TASK,
+ GNUNET_SCHEDULER_NO_TASK,
duration, &transmit_ready, th);
return;
}
= GNUNET_SCHEDULER_add_delayed (h->sched,
GNUNET_NO,
GNUNET_SCHEDULER_PRIORITY_KEEP,
- GNUNET_SCHEDULER_NO_PREREQUISITE_TASK,
+ GNUNET_SCHEDULER_NO_TASK,
GNUNET_TIME_absolute_get_remaining
- (th->timeout), &transmit_timeout, th);
+ (th->timeout), &peer_transmit_timeout, th);
return;
}
n->transmit_ok = GNUNET_NO;
remove_from_wait_list (th);
#if DEBUG_TRANSPORT
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Moving message to ready list\n");
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Moving message for `%4s' to ready list\n",
+ GNUNET_i2s(&n->id));
#endif
insert_transmit_handle (&h->connect_ready_head, th);
if (GNUNET_NO == h->transmission_scheduled)
while (pos != NULL)
{
next = pos->next;
-#if DEBUG_TRANSPORT
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Found entry in connect_wait_head for `%4s'.\n",
- GNUNET_i2s (&pos->target));
-#endif
if (0 == memcmp (pid,
&pos->target, sizeof (struct GNUNET_PeerIdentity)))
{
-#if DEBUG_TRANSPORT
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Found pending request for new connection, will trigger now.\n");
-#endif
pos->neighbour = n;
- if (pos->notify_delay_task != GNUNET_SCHEDULER_NO_PREREQUISITE_TASK)
- {
- GNUNET_SCHEDULER_cancel (h->sched, pos->notify_delay_task);
- pos->notify_delay_task = GNUNET_SCHEDULER_NO_PREREQUISITE_TASK;
- }
GNUNET_assert (NULL == n->transmit_handle);
n->transmit_handle = pos;
+ if (prev == NULL)
+ h->connect_wait_head = next;
+ else
+ prev->next = next;
if (GNUNET_YES == n->received_ack)
{
#if DEBUG_TRANSPORT
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "`%s' already received, scheduling request\n",
- "ACK");
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Found pending request for `%4s' will trigger it now.\n",
+ GNUNET_i2s (&pos->target));
#endif
+ if (pos->notify_delay_task != GNUNET_SCHEDULER_NO_TASK)
+ {
+ GNUNET_SCHEDULER_cancel (h->sched, pos->notify_delay_task);
+ pos->notify_delay_task = GNUNET_SCHEDULER_NO_TASK;
+ }
schedule_request (pos);
}
else
{
#if DEBUG_TRANSPORT
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Still need to wait to receive `%s' message\n",
- "ACK");
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Found pending request for `%4s' but still need `%s' before proceeding.\n",
+ GNUNET_i2s (&pos->target),
+ "ACK");
#endif
- pos->notify_delay_task
- = GNUNET_SCHEDULER_add_delayed (h->sched,
- GNUNET_NO,
- GNUNET_SCHEDULER_PRIORITY_KEEP,
- GNUNET_SCHEDULER_NO_PREREQUISITE_TASK,
- GNUNET_TIME_absolute_get_remaining
- (pos->timeout),
- &transmit_timeout, pos);
}
- if (prev == NULL)
- h->connect_wait_head = next;
- else
- prev->next = next;
break;
}
prev = pos;
*/
struct GNUNET_TRANSPORT_Handle *
GNUNET_TRANSPORT_connect (struct GNUNET_SCHEDULER_Handle *sched,
- struct GNUNET_CONFIGURATION_Handle *cfg,
+ const struct GNUNET_CONFIGURATION_Handle *cfg,
void *cls,
GNUNET_TRANSPORT_ReceiveCallback rec,
GNUNET_TRANSPORT_NotifyConnect nc,
struct GNUNET_CLIENT_Connection *client;
#if DEBUG_TRANSPORT
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Transport disconnect called!\n");
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Transport disconnect called!\n");
#endif
while (NULL != (th = handle->connect_ready_head))
{
handle->connect_ready_head = th->next;
+ GNUNET_assert (th->notify_delay_task == GNUNET_SCHEDULER_NO_TASK);
th->notify (th->notify_cls, 0, NULL);
GNUNET_free (th);
}
-
while (NULL != (th = handle->connect_wait_head))
{
handle->connect_wait_head = th->next;
- if (th->notify_delay_task != GNUNET_SCHEDULER_NO_PREREQUISITE_TASK)
+ if (th->notify_delay_task != GNUNET_SCHEDULER_NO_TASK)
{
GNUNET_SCHEDULER_cancel (handle->sched, th->notify_delay_task);
- th->notify_delay_task = GNUNET_SCHEDULER_NO_PREREQUISITE_TASK;
+ th->notify_delay_task = GNUNET_SCHEDULER_NO_TASK;
}
th->notify (th->notify_cls, 0, NULL);
GNUNET_free (th);
handle->hwl_head = hwl->next;
GNUNET_SCHEDULER_cancel (handle->sched, hwl->task);
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
- _
- ("Disconnect while trying to obtain HELLO from transport service.\n"));
+ _("Disconnect while trying to obtain `%s' from transport service.\n"),
+ "HELLO");
if (hwl->rec != NULL)
hwl->rec (hwl->rec_cls, GNUNET_TIME_UNIT_ZERO, NULL, NULL);
GNUNET_free (hwl);
}
- if (handle->reconnect_task != GNUNET_SCHEDULER_NO_PREREQUISITE_TASK)
+ if (handle->reconnect_task != GNUNET_SCHEDULER_NO_TASK)
{
GNUNET_SCHEDULER_cancel (handle->sched, handle->reconnect_task);
- handle->reconnect_task = GNUNET_SCHEDULER_NO_PREREQUISITE_TASK;
+ handle->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
}
GNUNET_free_non_null (handle->my_hello);
handle->my_hello = NULL;
}
-/**
- * We're ready to transmit the request that the transport service
- * should connect to a new peer. In addition to sending the
- * request, schedule the next phase for the transmission processing
- * that caused the connect request in the first place.
- */
-static size_t
-request_connect (void *cls, size_t size, void *buf)
-{
- struct GNUNET_TRANSPORT_TransmitHandle *th = cls;
- struct TryConnectMessage *tcm;
- struct GNUNET_TRANSPORT_Handle *h;
-
- h = th->handle;
- if (buf == NULL)
- {
-#if DEBUG_TRANSPORT
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Failed to transmit connect request to service.\n");
-#endif
- th->notify (th->notify_cls, 0, NULL);
- GNUNET_free (th);
- return 0;
- }
-#if DEBUG_TRANSPORT
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Transmitting `%s' message for `%4s'.\n",
- "TRY_CONNECT", GNUNET_i2s (&th->target));
-#endif
- GNUNET_assert (size >= sizeof (struct TryConnectMessage));
- tcm = buf;
- tcm->header.size = htons (sizeof (struct TryConnectMessage));
- tcm->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_TRY_CONNECT);
- tcm->reserved = htonl (0);
- memcpy (&tcm->peer, &th->target, sizeof (struct GNUNET_PeerIdentity));
- th->notify_delay_task
- = GNUNET_SCHEDULER_add_delayed (h->sched,
- GNUNET_NO,
- GNUNET_SCHEDULER_PRIORITY_KEEP,
- GNUNET_SCHEDULER_NO_PREREQUISITE_TASK,
- GNUNET_TIME_absolute_get_remaining
- (th->timeout), &transmit_timeout, th);
- insert_transmit_handle (&h->connect_wait_head, th);
- return sizeof (struct TryConnectMessage);
-}
-
-
-/**
- * Schedule a request to connect to the given
- * neighbour (and if successful, add the specified
- * handle to the wait list).
- */
-static void
-try_connect (struct GNUNET_TRANSPORT_TransmitHandle *th)
-{
- schedule_control_transmit (th->handle,
- sizeof (struct TryConnectMessage),
- GNUNET_NO,
- GNUNET_TIME_absolute_get_remaining (th->timeout),
- &request_connect, th);
-}
-
-
-/**
- * Cancel a pending notify transmit task
- * and also remove the given transmit handle
- * from whatever list is on.
- */
-static void
-remove_from_any_list (struct GNUNET_TRANSPORT_TransmitHandle *th)
-{
- struct GNUNET_TRANSPORT_Handle *h;
-
- h = th->handle;
- if (th->notify_delay_task != GNUNET_SCHEDULER_NO_PREREQUISITE_TASK)
- {
- GNUNET_SCHEDULER_cancel (h->sched, th->notify_delay_task);
- th->notify_delay_task = GNUNET_SCHEDULER_NO_PREREQUISITE_TASK;
- }
- if (th->prev == NULL)
- {
- if (th == h->connect_wait_head)
- h->connect_wait_head = th->next;
- else
- h->connect_ready_head = th->next;
- }
- else
- th->prev->next = th->next;
- if (th->next != NULL)
- th->next->prev = th->prev;
-}
-
-
-/**
- * Remove neighbour from our list
- */
-static void
-remove_neighbour (struct GNUNET_TRANSPORT_Handle *h,
- const struct GNUNET_PeerIdentity *peer)
-{
- struct NeighbourList *prev;
- struct NeighbourList *pos;
- struct GNUNET_TRANSPORT_TransmitHandle *th;
-
- prev = NULL;
- pos = h->neighbours;
- while ((pos != NULL) &&
- (0 != memcmp (peer, &pos->id, sizeof (struct GNUNET_PeerIdentity))))
- {
- prev = pos;
- pos = pos->next;
- }
- if (pos == NULL)
- {
- GNUNET_break (0);
- return;
- }
- if (prev == NULL)
- h->neighbours = pos->next;
- else
- prev->next = pos->next;
- if (NULL != (th = pos->transmit_handle))
- {
- pos->transmit_handle = NULL;
- th->neighbour = NULL;
- remove_from_any_list (th);
- try_connect (th);
- }
- if (h->nc_cb != NULL)
- h->nd_cb (h->cls, peer);
- GNUNET_free (pos);
-}
-
-
/**
* Type of a function to call when we receive a message
* from the service.
const struct SendOkMessage *okm;
struct HelloWaitList *hwl;
struct NeighbourList *n;
- struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded pkey;
struct GNUNET_PeerIdentity me;
+ struct GNUNET_TRANSPORT_TransmitHandle *th;
uint16_t size;
if ((msg == NULL) || (h->client == NULL))
if (h->client != NULL)
{
#if DEBUG_TRANSPORT
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"Error receiving from transport service, disconnecting temporarily.\n");
#endif
if (h->network_handle != NULL)
{
- GNUNET_NETWORK_notify_transmit_ready_cancel (h->network_handle);
+ GNUNET_CONNECTION_notify_transmit_ready_cancel (h->network_handle);
h->network_handle = NULL;
h->transmission_scheduled = GNUNET_NO;
+ th = h->connect_ready_head;
+ /* add timeout again, we cancelled the transmit_ready task! */
+ GNUNET_assert (th->notify_delay_task == GNUNET_SCHEDULER_NO_TASK);
+ th->notify_delay_task
+ = GNUNET_SCHEDULER_add_delayed (h->sched,
+ GNUNET_NO,
+ GNUNET_SCHEDULER_PRIORITY_KEEP,
+ GNUNET_SCHEDULER_NO_TASK,
+ GNUNET_TIME_absolute_get_remaining(th->timeout),
+ &peer_transmit_timeout,
+ th);
}
GNUNET_CLIENT_disconnect (h->client);
h->client = NULL;
{
case GNUNET_MESSAGE_TYPE_HELLO:
if (GNUNET_OK !=
- GNUNET_HELLO_get_key ((const struct GNUNET_HELLO_Message *) msg,
- &pkey))
+ GNUNET_HELLO_get_id ((const struct GNUNET_HELLO_Message *) msg,
+ &me))
{
GNUNET_break (0);
break;
}
- GNUNET_CRYPTO_hash (&pkey,
- sizeof (struct
- GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded),
- &me.hashPubKey);
#if DEBUG_TRANSPORT
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Receiving (my own) `%s' message, I am `%4s'.\n",
remove_neighbour (h, &dim->peer);
break;
case GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK:
-#if DEBUG_TRANSPORT
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Receiving `%s' message.\n", "SEND_OK");
-#endif
if (size != sizeof (struct SendOkMessage))
{
GNUNET_break (0);
break;
}
okm = (const struct SendOkMessage *) msg;
+#if DEBUG_TRANSPORT
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Receiving `%s' message, transmission %s.\n", "SEND_OK",
+ ntohl(okm->success) == GNUNET_OK ? "succeeded" : "failed");
+#endif
n = find_neighbour (h, &okm->peer);
GNUNET_assert (n != NULL);
n->transmit_ok = GNUNET_YES;
{
#if DEBUG_TRANSPORT
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Processing pending message\n");
+ "Processing pending message for `%4s'\n",
+ GNUNET_i2s(&n->id));
#endif
GNUNET_SCHEDULER_cancel (h->sched,
n->transmit_handle->notify_delay_task);
n->transmit_handle->notify_delay_task =
- GNUNET_SCHEDULER_NO_PREREQUISITE_TASK;
+ GNUNET_SCHEDULER_NO_TASK;
GNUNET_assert (GNUNET_YES == n->received_ack);
schedule_request (n->transmit_handle);
}
struct ClientTransmitWrapper
{
- GNUNET_NETWORK_TransmitReadyNotify notify;
+ GNUNET_CONNECTION_TransmitReadyNotify notify;
void *notify_cls;
struct GNUNET_TRANSPORT_TransmitHandle *th;
};
ret += sizeof (struct OutboundMessage);
obm->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SEND);
obm->header.size = htons (ret);
- obm->reserved = htonl (0);
+ obm->priority = htonl (ctw->th->priority);
obm->peer = ctw->th->target;
GNUNET_free (ctw);
return ret;
* @param handle connection to transport service
* @param target who should receive the message
* @param size how big is the message we want to transmit?
+ * @param priority how important is the message?
* @param timeout after how long should we give up (and call
* notify with buf NULL and size 0)?
* @param notify function to call when we are ready to
*handle,
const struct GNUNET_PeerIdentity
*target, size_t size,
+ unsigned int priority,
struct GNUNET_TIME_Relative timeout,
- GNUNET_NETWORK_TransmitReadyNotify
+ GNUNET_CONNECTION_TransmitReadyNotify
notify, void *notify_cls)
{
struct GNUNET_TRANSPORT_TransmitHandle *pos;
if (size + sizeof (struct OutboundMessage) >=
GNUNET_SERVER_MAX_MESSAGE_SIZE)
- return NULL;
+ {
+ GNUNET_break (0);
+ return NULL;
+ }
#if DEBUG_TRANSPORT
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Asking transport service for transmission of %u bytes to peer `%4s'.\n",
size, GNUNET_i2s (target));
#endif
n = find_neighbour (handle, target);
+ if ( (n != NULL) &&
+ (n->transmit_handle != NULL) )
+ return NULL; /* already have a request pending for this peer! */
ctw = GNUNET_malloc (sizeof (struct ClientTransmitWrapper));
th = GNUNET_malloc (sizeof (struct GNUNET_TRANSPORT_TransmitHandle));
ctw->notify = notify;
ctw->notify_cls = notify_cls;
ctw->th = th;
th->handle = handle;
+ th->neighbour = n;
th->target = *target;
th->notify = &client_notify_wrapper;
th->notify_cls = ctw;
- th->notify_size = size + sizeof (struct OutboundMessage);
th->timeout = GNUNET_TIME_relative_to_absolute (timeout);
- th->neighbour = n;
+ th->notify_size = size + sizeof (struct OutboundMessage);
+ th->priority = priority;
if (NULL == n)
{
-#if DEBUG_TRANSPORT
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Transmission request could not be satisfied (not yet connected), adding it to pending request list.\n");
-#endif
pos = handle->connect_wait_head;
while (pos != NULL)
{
"Will now try to connect to `%4s'.\n", GNUNET_i2s (target));
#endif
try_connect (th);
+ return th;
}
- else
- {
+
#if DEBUG_TRANSPORT
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Transmission request queued for transmission to transport service.\n");
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Transmission request queued for transmission to transport service.\n");
#endif
- GNUNET_assert (NULL == n->transmit_handle);
- n->transmit_handle = th;
- if (GNUNET_YES == n->received_ack)
- {
+ GNUNET_assert (NULL == n->transmit_handle);
+ n->transmit_handle = th;
+ if (GNUNET_YES != n->received_ack)
+ {
#if DEBUG_TRANSPORT
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Peer `%4s' is connected, scheduling for delivery now.\n",
- GNUNET_i2s (target));
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Connection to `%4s' is not yet confirmed connected, scheduling timeout (%llu ms) only.\n",
+ GNUNET_i2s (target), timeout.value);
#endif
- schedule_request (th);
- }
- else
- {
+ th->notify_delay_task
+ = GNUNET_SCHEDULER_add_delayed (handle->sched,
+ GNUNET_NO,
+ GNUNET_SCHEDULER_PRIORITY_KEEP,
+ GNUNET_SCHEDULER_NO_TASK,
+ timeout, &peer_transmit_timeout, th);
+ return th;
+ }
+
#if DEBUG_TRANSPORT
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Connection to `%4s' is not yet confirmed connected, scheduling timeout (%llums) only.\n",
- GNUNET_i2s (target), timeout.value);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Peer `%4s' is ready to receive, scheduling message for delivery now.\n",
+ GNUNET_i2s (target));
#endif
- th->notify_delay_task
- = GNUNET_SCHEDULER_add_delayed (handle->sched,
- GNUNET_NO,
- GNUNET_SCHEDULER_PRIORITY_KEEP,
- GNUNET_SCHEDULER_NO_PREREQUISITE_TASK,
- timeout, &transmit_timeout, th);
- }
- }
+ schedule_request (th);
return th;
}
/**
- * Cancel the specified transmission-ready
- * notification.
+ * Cancel the specified transmission-ready notification.
*/
void
GNUNET_TRANSPORT_notify_transmit_ready_cancel (struct
{
struct GNUNET_TRANSPORT_Handle *h;
+#if DEBUG_TRANSPORT
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Transmission request of %u bytes to `%4s' was cancelled.\n",
+ th->notify_size - sizeof(struct OutboundMessage),
+ GNUNET_i2s (&th->target));
+#endif
GNUNET_assert (th->notify == &client_notify_wrapper);
remove_from_any_list (th);
h = th->handle;
if ((h->connect_ready_head == NULL) && (h->network_handle != NULL))
{
- GNUNET_NETWORK_notify_transmit_ready_cancel (h->network_handle);
+ GNUNET_CONNECTION_notify_transmit_ready_cancel (h->network_handle);
h->network_handle = NULL;
h->transmission_scheduled = GNUNET_NO;
}
GNUNET_free (th->notify_cls);
+ GNUNET_assert (th->notify_delay_task == GNUNET_SCHEDULER_NO_TASK);
GNUNET_free (th);
}