finishing neighbours
authorChristian Grothoff <christian@grothoff.org>
Fri, 12 Aug 2011 15:49:57 +0000 (15:49 +0000)
committerChristian Grothoff <christian@grothoff.org>
Fri, 12 Aug 2011 15:49:57 +0000 (15:49 +0000)
src/transport/gnunet-service-transport_neighbours.c
src/transport/plugin_transport_tcp.c

index f308789661253d5029e68e554afa3a434e2a5a89..f262c2b876cc35ddd9954c8566396f778c7e074d 100644 (file)
@@ -26,6 +26,7 @@
 #include "platform.h"
 #include "gnunet_ats_service.h"
 #include "gnunet-service-transport_neighbours.h"
+#include "gnunet-service-transport_plugins.h"
 #include "gnunet-service-transport_validation.h"
 #include "gnunet-service-transport.h"
 #include "gnunet_peerinfo_service.h"
 #define QUOTA_VIOLATION_DROP_THRESHOLD 10
 
 
-// TODO:
-// - have a way to access the currently 'connected' session
-//   (for sending and to notice disconnect of it!)
-// - have a way to access/update bandwidth/quota information per peer
-//   (for CostReport/TrafficReport callbacks)
-
-
+/**
+ * Entry in neighbours. 
+ */
 struct NeighbourMapEntry;
 
+
 /**
  * For each neighbour we keep a list of messages
  * that we still want to transmit to the neighbour.
@@ -71,6 +69,12 @@ struct MessageQueue
    */
   struct MessageQueue *prev;
 
+  /**
+   * Once this message is actively being transmitted, which
+   * neighbour is it associated with?
+   */
+  struct NeighbourMapEntry *n;
+
   /**
    * Function to call once we're done.
    */
@@ -129,6 +133,11 @@ struct NeighbourMapEntry
    */
   struct GNUNET_TRANSPORT_ATS_Information *ats;
 
+  /**
+   * Are we currently trying to send a message? If so, which one?
+   */
+  struct MessageQueue *is_active;
+
   /**
    * Active session for communicating with the peer.
    */
@@ -161,18 +170,10 @@ struct NeighbourMapEntry
   GNUNET_SCHEDULER_TaskIdentifier timeout_task;
 
   /**
-   * ID of task scheduled to run when we should retry transmitting
-   * the head of the message queue.  Actually triggered when the
-   * transmission is timing out (we trigger instantly when we have
-   * a chance of success).
-   */
-  GNUNET_SCHEDULER_TaskIdentifier retry_task;
-
-  /**
-   * How long until we should consider this peer dead (if we don't
-   * receive another message in the meantime)?
+   * ID of task scheduled to run when we should try transmitting
+   * the head of the message queue.  
    */
-  struct GNUNET_TIME_Absolute peer_timeout;
+  GNUNET_SCHEDULER_TaskIdentifier transmission_task;
 
   /**
    * Tracker for inbound bandwidth.
@@ -191,16 +192,10 @@ struct NeighbourMapEntry
    */
   unsigned int ats_count;
 
-  /**
-   * Have we seen an PONG from this neighbour in the past (and
-   * not had a disconnect since)?
-   */
-  // int received_pong;
-
   /**
    * Are we already in the process of disconnecting this neighbour?
    */
-  // int in_disconnect;
+  int in_disconnect;
 
   /**
    * Do we currently consider this neighbour connected? (as far as
@@ -246,7 +241,49 @@ lookup_neighbour (const struct GNUNET_PeerIdentity *pid)
 }
 
 
-#if 0
+/**
+ * Task invoked to start a transmission to another peer.
+ *
+ * @param cls the 'struct NeighbourMapEntry'
+ * @param tc scheduler context
+ */
+static void
+transmission_task (void *cls,
+                  const struct GNUNET_SCHEDULER_TaskContext *tc);
+
+
+/**
+ * We're done with our transmission attempt, continue processing.
+ *
+ * @param cls the 'struct MessageQueue' of the message
+ * @param receiver intended receiver
+ * @param success whether it worked or not
+ */
+static void
+transmit_send_continuation (void *cls,
+                           const struct GNUNET_PeerIdentity *receiver,
+                           int success)
+{
+  struct MessageQueue *mq;
+  struct NeighbourMapEntry *n;
+  
+  mq = cls;
+  n = mq->n;
+  if (NULL != n) 
+    {
+      GNUNET_assert (n->is_active == mq);
+      n->is_active = NULL;
+      GNUNET_assert (n->transmission_task == GNUNET_SCHEDULER_NO_TASK);
+      n->transmission_task = GNUNET_SCHEDULER_add_now (&transmission_task,
+                                                      n);
+    }
+  if (NULL != mq->cont)
+    mq->cont (mq->cont_cls,
+             success);
+  GNUNET_free (mq);
+}
+
+
 /**
  * Check the ready list for the given neighbour and if a plugin is
  * ready for transmission (and if we have a message), do so!
@@ -259,41 +296,73 @@ try_transmission_to_peer (struct NeighbourMapEntry *n)
   struct MessageQueue *mq;
   struct GNUNET_TIME_Relative timeout;
   ssize_t ret;
+  struct GNUNET_TRANSPORT_PluginFunctions *papi;
 
-  if (n->messages_head == NULL)
+  if (n->is_active != NULL)
+    return; /* transmission already pending */
+  if (n->transmission_task != GNUNET_SCHEDULER_NO_TASK)
+    return; /* currently waiting for bandwidth */
+  mq = n->messages_head;
+  while (NULL != (mq = n->messages_head))
     {
-#if DEBUG_TRANSPORT
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                 "Transmission queue for `%4s' is empty\n",
-                 GNUNET_i2s (&n->id));
-#endif
-      return;                     /* nothing to do */
+      timeout = GNUNET_TIME_absolute_get_remaining (mq->timeout);
+      if (timeout.rel_value > 0)
+       break;
+      transmit_send_continuation (mq, &n->id, GNUNET_SYSERR); /* timeout */
+    }
+  if (NULL == mq)
+    return; /* no more messages */
+
+  papi = GST_plugins_find (n->plugin_name);
+  if (papi == NULL)
+    {
+      GNUNET_break (0);
+      return;
     }
-  mq = n->messages_head;
   GNUNET_CONTAINER_DLL_remove (n->messages_head,
                               n->messages_tail,
                               mq);
+  n->is_active = mq;
+  mq->n = n;
   ret = papi->send (papi->cls,
-                   &n->pid,
+                   &n->id,
                    mq->message_buf,
                    mq->message_buf_size,
-                   mq->priority,
-                   GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
+                   0 /* priority -- remove from plugin API? */,
+                   timeout,
                    n->session,
                    n->addr,
                    n->addrlen,
-                   GNUNET_YES /*?*/,
+                   GNUNET_YES,
                    &transmit_send_continuation, mq);
   if (ret == -1)
     {
       /* failure, but 'send' would not call continuation in this case,
         so we need to do it here! */
       transmit_send_continuation (mq,
-                                 &mq->neighbour_id,
+                                 &n->id,
                                  GNUNET_SYSERR);
+      n->transmission_task = GNUNET_SCHEDULER_add_now (&transmission_task,
+                                                      n);
     }
 }
-#endif
+
+
+/**
+ * Task invoked to start a transmission to another peer.
+ *
+ * @param cls the 'struct NeighbourMapEntry'
+ * @param tc scheduler context
+ */
+static void
+transmission_task (void *cls,
+                  const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+  struct NeighbourMapEntry *n = cls;
+
+  n->transmission_task = GNUNET_SCHEDULER_NO_TASK;
+  try_transmission_to_peer (n);
+}
 
 
 /**
@@ -325,22 +394,41 @@ disconnect_neighbour (struct NeighbourMapEntry *n)
 {
   struct MessageQueue *mq;
 
-  if (n->is_connected)
+  if (GNUNET_YES == n->in_disconnect)
+    return;
+  n->in_disconnect = GNUNET_YES;
+  while (NULL != (mq = n->messages_head))
     {
+      GNUNET_CONTAINER_DLL_remove (n->messages_head,
+                                  n->messages_tail,
+                                  mq);
+      mq->cont (mq->cont_cls, GNUNET_SYSERR);
+      GNUNET_free (mq);
+    }
+  if (NULL != n->is_active)
+    {
+      n->is_active->n = NULL;
+      n->is_active = NULL;
+    }
+  if (GNUNET_YES == n->is_connected)
+    {
+      n->is_connected = GNUNET_NO;
       disconnect_notify_cb (callback_cls,
                            &n->id);
-      n->is_connected = GNUNET_NO;
     }
   GNUNET_assert (GNUNET_YES ==
                 GNUNET_CONTAINER_multihashmap_remove (neighbours,
                                                       &n->id.hashPubKey,
                                                       n));
-  while (NULL != (mq = n->messages_head))
+  if (GNUNET_SCHEDULER_NO_TASK != n->timeout_task)
     {
-      GNUNET_CONTAINER_DLL_remove (n->messages_head,
-                                  n->messages_tail,
-                                  mq);
-      GNUNET_free (mq);
+      GNUNET_SCHEDULER_cancel (n->timeout_task);
+      n->timeout_task = GNUNET_SCHEDULER_NO_TASK;
+    }
+  if (GNUNET_SCHEDULER_NO_TASK != n->transmission_task)
+    {
+      GNUNET_SCHEDULER_cancel (n->timeout_task);
+      n->transmission_task = GNUNET_SCHEDULER_NO_TASK;
     }
   if (NULL != n->asc)
     {
@@ -446,6 +534,7 @@ GST_neighbours_switch_to_address (const struct GNUNET_PeerIdentity *peer,
                                  uint32_t ats_count)
 {
   struct NeighbourMapEntry *n;
+  struct GNUNET_MessageHeader connect_msg;
 
   n = lookup_neighbour (peer);
   if (NULL == n)
@@ -466,6 +555,17 @@ GST_neighbours_switch_to_address (const struct GNUNET_PeerIdentity *peer,
          ats_count * sizeof (struct GNUNET_TRANSPORT_ATS_Information));
   GNUNET_free_non_null (n->plugin_name);
   n->plugin_name = GNUNET_strdup (plugin_name);
+  GNUNET_SCHEDULER_cancel (n->timeout_task);
+  n->timeout_task =
+    GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
+                                 &neighbour_timeout_task, n);
+  connect_msg.size = htons (sizeof (struct GNUNET_MessageHeader));
+  connect_msg.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_CONNECT);
+  GST_neighbours_send (peer,
+                      &connect_msg,
+                      sizeof (connect_msg),
+                      GNUNET_TIME_UNIT_FOREVER_REL,
+                      NULL, NULL);
 }
 
 
@@ -564,7 +664,7 @@ GST_neighbours_test_connected (const struct GNUNET_PeerIdentity *target)
 
   n = lookup_neighbour (target);
   if ( (NULL == n) ||
-       (GNUNET_TIME_absolute_get_remaining (n->peer_timeout).rel_value == 0) )
+       (n->is_connected == GNUNET_YES) )
        return GNUNET_NO; /* not connected */
   return GNUNET_YES;
 }
@@ -593,7 +693,7 @@ GST_neighbours_send (const struct GNUNET_PeerIdentity *target,
 
   n = lookup_neighbour (target);
   if ( (n == NULL) ||
-       (GNUNET_TIME_absolute_get_remaining (n->peer_timeout).rel_value == 0) ) 
+       (GNUNET_YES != n->is_connected) )
     {
       GNUNET_STATISTICS_update (GST_stats,
                                gettext_noop ("# SET QUOTA messages ignored (no such peer)"),
@@ -620,7 +720,10 @@ GST_neighbours_send (const struct GNUNET_PeerIdentity *target,
   GNUNET_CONTAINER_DLL_insert_tail (n->messages_head,
                                    n->messages_tail,
                                    mq);
-  // try_transmission_to_peer (n);
+  if ( (GNUNET_SCHEDULER_NO_TASK == n->transmission_task) &&
+       (NULL == n->is_active) )
+    n->transmission_task = GNUNET_SCHEDULER_add_now (&transmission_task,
+                                                    n);
 }
 
 
@@ -667,9 +770,6 @@ GST_neighbours_calculate_receive_delay (const struct GNUNET_PeerIdentity *sender
          n->quota_violation_count--;
        }
     }
-  n->peer_timeout =
-    GNUNET_TIME_relative_to_absolute
-    (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
   GNUNET_SCHEDULER_cancel (n->timeout_task);
   n->timeout_task =
     GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
@@ -773,8 +873,8 @@ neighbours_iterate (void *cls,
   struct IteratorContext *ic = cls;
   struct NeighbourMapEntry *n = value;
 
-  if (GNUNET_TIME_absolute_get_remaining (n->peer_timeout).rel_value == 0)
-    return GNUNET_OK; /* not connected */
+  if (GNUNET_YES != n->is_connected)
+    return GNUNET_OK; 
   GNUNET_assert (n->ats_count > 0);
   ic->cb (ic->cb_cls,
          &n->id,
@@ -813,9 +913,25 @@ void
 GST_neighbours_force_disconnect (const struct GNUNET_PeerIdentity *target)
 {
   struct NeighbourMapEntry *n;
+  struct GNUNET_TRANSPORT_PluginFunctions *papi;
+  struct GNUNET_MessageHeader disconnect_msg;
 
   n = lookup_neighbour (target);
-  /* FIXME: send disconnect message to target... */
+  disconnect_msg.size = htons (sizeof (struct GNUNET_MessageHeader));
+  disconnect_msg.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_CONNECT);
+  papi = GST_plugins_find (n->plugin_name);
+  if (papi != NULL)
+    papi->send (papi->cls,
+               target,
+               (const void*) &disconnect_msg,
+               sizeof (struct GNUNET_MessageHeader),
+               UINT32_MAX /* priority */,
+               GNUNET_TIME_UNIT_FOREVER_REL,
+               n->session,
+               n->addr,
+               n->addrlen,
+               GNUNET_YES,
+               NULL, NULL);
   disconnect_neighbour (n);
 }
 
index e796dacf4ffe20cce4c1a2427ca7ca744153bda0..2c20ba35e53599f58ef20d3748b7cf44fa5642e7 100644 (file)
@@ -834,6 +834,9 @@ disconnect_session (struct Session *session)
         (session->transmit_handle);
       session->transmit_handle = NULL;
     }
+  session->plugin->env->session_end (session->plugin->env->cls,
+                                     &session->target,
+                                     session);
   while (NULL != (pm = session->pending_messages_head))
     {
 #if DEBUG_TCP
@@ -878,9 +881,6 @@ disconnect_session (struct Session *session)
                            -1,
                            GNUNET_NO);
   GNUNET_free_non_null (session->connect_addr);
-  session->plugin->env->session_end (session->plugin->env->cls,
-                                     &session->target,
-                                     session);
   GNUNET_assert (NULL == session->transmit_handle);
   GNUNET_free (session);
 }