#include "platform.h"
#include "gnunet_ats_service.h"
#include "gnunet-service-transport_neighbours.h"
+#include "gnunet-service-transport_plugins.h"
#include "gnunet-service-transport_validation.h"
#include "gnunet-service-transport.h"
#include "gnunet_peerinfo_service.h"
#define QUOTA_VIOLATION_DROP_THRESHOLD 10
-// TODO:
-// - have a way to access the currently 'connected' session
-// (for sending and to notice disconnect of it!)
-// - have a way to access/update bandwidth/quota information per peer
-// (for CostReport/TrafficReport callbacks)
-
-
+/**
+ * Entry in neighbours.
+ */
struct NeighbourMapEntry;
+
/**
* For each neighbour we keep a list of messages
* that we still want to transmit to the neighbour.
*/
struct MessageQueue *prev;
+ /**
+ * Once this message is actively being transmitted, which
+ * neighbour is it associated with?
+ */
+ struct NeighbourMapEntry *n;
+
/**
* Function to call once we're done.
*/
*/
struct GNUNET_TRANSPORT_ATS_Information *ats;
+ /**
+ * Are we currently trying to send a message? If so, which one?
+ */
+ struct MessageQueue *is_active;
+
/**
* Active session for communicating with the peer.
*/
GNUNET_SCHEDULER_TaskIdentifier timeout_task;
/**
- * ID of task scheduled to run when we should retry transmitting
- * the head of the message queue. Actually triggered when the
- * transmission is timing out (we trigger instantly when we have
- * a chance of success).
- */
- GNUNET_SCHEDULER_TaskIdentifier retry_task;
-
- /**
- * How long until we should consider this peer dead (if we don't
- * receive another message in the meantime)?
+ * ID of task scheduled to run when we should try transmitting
+ * the head of the message queue.
*/
- struct GNUNET_TIME_Absolute peer_timeout;
+ GNUNET_SCHEDULER_TaskIdentifier transmission_task;
/**
* Tracker for inbound bandwidth.
*/
unsigned int ats_count;
- /**
- * Have we seen an PONG from this neighbour in the past (and
- * not had a disconnect since)?
- */
- // int received_pong;
-
/**
* Are we already in the process of disconnecting this neighbour?
*/
- // int in_disconnect;
+ int in_disconnect;
/**
* Do we currently consider this neighbour connected? (as far as
}
-#if 0
+/**
+ * Task invoked to start a transmission to another peer.
+ *
+ * @param cls the 'struct NeighbourMapEntry'
+ * @param tc scheduler context
+ */
+static void
+transmission_task (void *cls,
+ const struct GNUNET_SCHEDULER_TaskContext *tc);
+
+
+/**
+ * We're done with our transmission attempt, continue processing.
+ *
+ * @param cls the 'struct MessageQueue' of the message
+ * @param receiver intended receiver
+ * @param success whether it worked or not
+ */
+static void
+transmit_send_continuation (void *cls,
+ const struct GNUNET_PeerIdentity *receiver,
+ int success)
+{
+ struct MessageQueue *mq;
+ struct NeighbourMapEntry *n;
+
+ mq = cls;
+ n = mq->n;
+ if (NULL != n)
+ {
+ GNUNET_assert (n->is_active == mq);
+ n->is_active = NULL;
+ GNUNET_assert (n->transmission_task == GNUNET_SCHEDULER_NO_TASK);
+ n->transmission_task = GNUNET_SCHEDULER_add_now (&transmission_task,
+ n);
+ }
+ if (NULL != mq->cont)
+ mq->cont (mq->cont_cls,
+ success);
+ GNUNET_free (mq);
+}
+
+
/**
* Check the ready list for the given neighbour and if a plugin is
* ready for transmission (and if we have a message), do so!
struct MessageQueue *mq;
struct GNUNET_TIME_Relative timeout;
ssize_t ret;
+ struct GNUNET_TRANSPORT_PluginFunctions *papi;
- if (n->messages_head == NULL)
+ if (n->is_active != NULL)
+ return; /* transmission already pending */
+ if (n->transmission_task != GNUNET_SCHEDULER_NO_TASK)
+ return; /* currently waiting for bandwidth */
+ mq = n->messages_head;
+ while (NULL != (mq = n->messages_head))
{
-#if DEBUG_TRANSPORT
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Transmission queue for `%4s' is empty\n",
- GNUNET_i2s (&n->id));
-#endif
- return; /* nothing to do */
+ timeout = GNUNET_TIME_absolute_get_remaining (mq->timeout);
+ if (timeout.rel_value > 0)
+ break;
+ transmit_send_continuation (mq, &n->id, GNUNET_SYSERR); /* timeout */
+ }
+ if (NULL == mq)
+ return; /* no more messages */
+
+ papi = GST_plugins_find (n->plugin_name);
+ if (papi == NULL)
+ {
+ GNUNET_break (0);
+ return;
}
- mq = n->messages_head;
GNUNET_CONTAINER_DLL_remove (n->messages_head,
n->messages_tail,
mq);
+ n->is_active = mq;
+ mq->n = n;
ret = papi->send (papi->cls,
- &n->pid,
+ &n->id,
mq->message_buf,
mq->message_buf_size,
- mq->priority,
- GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
+ 0 /* priority -- remove from plugin API? */,
+ timeout,
n->session,
n->addr,
n->addrlen,
- GNUNET_YES /*?*/,
+ GNUNET_YES,
&transmit_send_continuation, mq);
if (ret == -1)
{
/* failure, but 'send' would not call continuation in this case,
so we need to do it here! */
transmit_send_continuation (mq,
- &mq->neighbour_id,
+ &n->id,
GNUNET_SYSERR);
+ n->transmission_task = GNUNET_SCHEDULER_add_now (&transmission_task,
+ n);
}
}
-#endif
+
+
+/**
+ * Task invoked to start a transmission to another peer.
+ *
+ * @param cls the 'struct NeighbourMapEntry'
+ * @param tc scheduler context
+ */
+static void
+transmission_task (void *cls,
+ const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ struct NeighbourMapEntry *n = cls;
+
+ n->transmission_task = GNUNET_SCHEDULER_NO_TASK;
+ try_transmission_to_peer (n);
+}
/**
{
struct MessageQueue *mq;
- if (n->is_connected)
+ if (GNUNET_YES == n->in_disconnect)
+ return;
+ n->in_disconnect = GNUNET_YES;
+ while (NULL != (mq = n->messages_head))
{
+ GNUNET_CONTAINER_DLL_remove (n->messages_head,
+ n->messages_tail,
+ mq);
+ mq->cont (mq->cont_cls, GNUNET_SYSERR);
+ GNUNET_free (mq);
+ }
+ if (NULL != n->is_active)
+ {
+ n->is_active->n = NULL;
+ n->is_active = NULL;
+ }
+ if (GNUNET_YES == n->is_connected)
+ {
+ n->is_connected = GNUNET_NO;
disconnect_notify_cb (callback_cls,
&n->id);
- n->is_connected = GNUNET_NO;
}
GNUNET_assert (GNUNET_YES ==
GNUNET_CONTAINER_multihashmap_remove (neighbours,
&n->id.hashPubKey,
n));
- while (NULL != (mq = n->messages_head))
+ if (GNUNET_SCHEDULER_NO_TASK != n->timeout_task)
{
- GNUNET_CONTAINER_DLL_remove (n->messages_head,
- n->messages_tail,
- mq);
- GNUNET_free (mq);
+ GNUNET_SCHEDULER_cancel (n->timeout_task);
+ n->timeout_task = GNUNET_SCHEDULER_NO_TASK;
+ }
+ if (GNUNET_SCHEDULER_NO_TASK != n->transmission_task)
+ {
+ GNUNET_SCHEDULER_cancel (n->timeout_task);
+ n->transmission_task = GNUNET_SCHEDULER_NO_TASK;
}
if (NULL != n->asc)
{
uint32_t ats_count)
{
struct NeighbourMapEntry *n;
+ struct GNUNET_MessageHeader connect_msg;
n = lookup_neighbour (peer);
if (NULL == n)
ats_count * sizeof (struct GNUNET_TRANSPORT_ATS_Information));
GNUNET_free_non_null (n->plugin_name);
n->plugin_name = GNUNET_strdup (plugin_name);
+ GNUNET_SCHEDULER_cancel (n->timeout_task);
+ n->timeout_task =
+ GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
+ &neighbour_timeout_task, n);
+ connect_msg.size = htons (sizeof (struct GNUNET_MessageHeader));
+ connect_msg.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_CONNECT);
+ GST_neighbours_send (peer,
+ &connect_msg,
+ sizeof (connect_msg),
+ GNUNET_TIME_UNIT_FOREVER_REL,
+ NULL, NULL);
}
n = lookup_neighbour (target);
if ( (NULL == n) ||
- (GNUNET_TIME_absolute_get_remaining (n->peer_timeout).rel_value == 0) )
+ (n->is_connected == GNUNET_YES) )
return GNUNET_NO; /* not connected */
return GNUNET_YES;
}
n = lookup_neighbour (target);
if ( (n == NULL) ||
- (GNUNET_TIME_absolute_get_remaining (n->peer_timeout).rel_value == 0) )
+ (GNUNET_YES != n->is_connected) )
{
GNUNET_STATISTICS_update (GST_stats,
gettext_noop ("# SET QUOTA messages ignored (no such peer)"),
GNUNET_CONTAINER_DLL_insert_tail (n->messages_head,
n->messages_tail,
mq);
- // try_transmission_to_peer (n);
+ if ( (GNUNET_SCHEDULER_NO_TASK == n->transmission_task) &&
+ (NULL == n->is_active) )
+ n->transmission_task = GNUNET_SCHEDULER_add_now (&transmission_task,
+ n);
}
n->quota_violation_count--;
}
}
- n->peer_timeout =
- GNUNET_TIME_relative_to_absolute
- (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
GNUNET_SCHEDULER_cancel (n->timeout_task);
n->timeout_task =
GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
struct IteratorContext *ic = cls;
struct NeighbourMapEntry *n = value;
- if (GNUNET_TIME_absolute_get_remaining (n->peer_timeout).rel_value == 0)
- return GNUNET_OK; /* not connected */
+ if (GNUNET_YES != n->is_connected)
+ return GNUNET_OK;
GNUNET_assert (n->ats_count > 0);
ic->cb (ic->cb_cls,
&n->id,
GST_neighbours_force_disconnect (const struct GNUNET_PeerIdentity *target)
{
struct NeighbourMapEntry *n;
+ struct GNUNET_TRANSPORT_PluginFunctions *papi;
+ struct GNUNET_MessageHeader disconnect_msg;
n = lookup_neighbour (target);
- /* FIXME: send disconnect message to target... */
+ disconnect_msg.size = htons (sizeof (struct GNUNET_MessageHeader));
+ disconnect_msg.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_CONNECT);
+ papi = GST_plugins_find (n->plugin_name);
+ if (papi != NULL)
+ papi->send (papi->cls,
+ target,
+ (const void*) &disconnect_msg,
+ sizeof (struct GNUNET_MessageHeader),
+ UINT32_MAX /* priority */,
+ GNUNET_TIME_UNIT_FOREVER_REL,
+ n->session,
+ n->addr,
+ n->addrlen,
+ GNUNET_YES,
+ NULL, NULL);
disconnect_neighbour (n);
}