convert fs publish to MQ
[oweals/gnunet.git] / src / core / gnunet-service-core_neighbours.c
index 5014298bfb4dfdb8b450d6623cee32db4b259712..c1c62cf1aede06edd4dbc62ad0e5bd249fa46b04 100644 (file)
@@ -1,6 +1,6 @@
 /*
      This file is part of GNUnet.
-     (C) 2009, 2010, 2011 Christian Grothoff (and other contributing authors)
+     Copyright (C) 2009, 2010, 2011 GNUnet e.V.
 
      GNUnet is free software; you can redistribute it and/or modify
      it under the terms of the GNU General Public License as published
@@ -14,8 +14,8 @@
 
      You should have received a copy of the GNU General Public License
      along with GNUnet; see the file COPYING.  If not, write to the
-     Free Software Foundation, Inc., 59 Temple Place - Suite 330,
-     Boston, MA 02111-1307, USA.
+     Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
+     Boston, MA 02110-1301, USA.
 */
 
 /**
@@ -57,9 +57,14 @@ struct NeighbourMessageEntry
   struct GNUNET_TIME_Absolute deadline;
 
   /**
-   * How long is the message? (number of bytes following the "struct
-   * MessageEntry", but not including the size of "struct
-   * MessageEntry" itself!)
+   * What time did we submit the request?
+   */
+  struct GNUNET_TIME_Absolute submission_time;
+
+  /**
+   * How long is the message? (number of bytes following the `struct
+   * MessageEntry`, but not including the size of `struct
+   * MessageEntry` itself!)
    */
   size_t size;
 
@@ -103,15 +108,25 @@ struct Neighbour
   /**
    * ID of task used for re-trying plaintext scheduling.
    */
-  GNUNET_SCHEDULER_TaskIdentifier retry_plaintext_task;
+  struct GNUNET_SCHEDULER_Task *retry_plaintext_task;
+
+  /**
+   * How many messages are in the queue for this neighbour?
+   */
+  unsigned int queue_size;
+
+  /**
+   * #GNUNET_YES if this peer currently has excess bandwidth.
+   */
+  int has_excess_bandwidth;
 
 };
 
 
 /**
- * Map of peer identities to 'struct Neighbour'.
+ * Map of peer identities to `struct Neighbour`.
  */
-static struct GNUNET_CONTAINER_MultiHashMap *neighbours;
+static struct GNUNET_CONTAINER_MultiPeerMap *neighbours;
 
 /**
  * Transport service.
@@ -129,7 +144,10 @@ static struct GNUNET_TRANSPORT_Handle *transport;
 static struct Neighbour *
 find_neighbour (const struct GNUNET_PeerIdentity *peer)
 {
-  return GNUNET_CONTAINER_multihashmap_get (neighbours, &peer->hashPubKey);
+  if (NULL == neighbours)
+    return NULL;
+  return GNUNET_CONTAINER_multipeermap_get (neighbours,
+                                            peer);
 }
 
 
@@ -143,16 +161,18 @@ free_neighbour (struct Neighbour *n)
 {
   struct NeighbourMessageEntry *m;
 
-#if DEBUG_CORE
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Destroying neighbour entry for peer `%4s'\n",
+              "Destroying neighbour entry for peer `%s'\n",
               GNUNET_i2s (&n->peer));
-#endif
   while (NULL != (m = n->message_head))
   {
-    GNUNET_CONTAINER_DLL_remove (n->message_head, n->message_tail, m);
+    GNUNET_CONTAINER_DLL_remove (n->message_head,
+                                 n->message_tail,
+                                 m);
+    n->queue_size--;
     GNUNET_free (m);
   }
+  GNUNET_assert (0 == n->queue_size);
   if (NULL != n->th)
   {
     GNUNET_TRANSPORT_notify_transmit_ready_cancel (n->th);
@@ -162,23 +182,22 @@ free_neighbour (struct Neighbour *n)
                             gettext_noop
                             ("# sessions terminated by transport disconnect"),
                             1, GNUNET_NO);
-  GSC_SESSIONS_end (&n->peer);
   if (NULL != n->kxinfo)
   {
     GSC_KX_stop (n->kxinfo);
     n->kxinfo = NULL;
   }
-  if (n->retry_plaintext_task != GNUNET_SCHEDULER_NO_TASK)
+  if (NULL != n->retry_plaintext_task)
   {
     GNUNET_SCHEDULER_cancel (n->retry_plaintext_task);
-    n->retry_plaintext_task = GNUNET_SCHEDULER_NO_TASK;
+    n->retry_plaintext_task = NULL;
   }
   GNUNET_assert (GNUNET_OK ==
-                 GNUNET_CONTAINER_multihashmap_remove (neighbours,
-                                                       &n->peer.hashPubKey, n));
+                 GNUNET_CONTAINER_multipeermap_remove (neighbours,
+                                                       &n->peer, n));
   GNUNET_STATISTICS_set (GSC_stats,
                          gettext_noop ("# neighbour entries allocated"),
-                         GNUNET_CONTAINER_multihashmap_size (neighbours),
+                         GNUNET_CONTAINER_multipeermap_size (neighbours),
                          GNUNET_NO);
   GNUNET_free (n);
 }
@@ -205,47 +224,67 @@ process_queue (struct Neighbour *n);
  * @return number of bytes transmitted
  */
 static size_t
-transmit_ready (void *cls, size_t size, void *buf)
+transmit_ready (void *cls,
+                size_t size,
+                void *buf)
 {
   struct Neighbour *n = cls;
   struct NeighbourMessageEntry *m;
   size_t ret;
   char *cbuf;
+  struct GNUNET_TIME_Relative delay;
+  struct GNUNET_TIME_Relative overdue;
 
   n->th = NULL;
   m = n->message_head;
-  if (m == NULL)
+  if (NULL == m)
   {
     GNUNET_break (0);
     return 0;
   }
-  GNUNET_CONTAINER_DLL_remove (n->message_head, n->message_tail, m);
-  if (buf == NULL)
+  GNUNET_CONTAINER_DLL_remove (n->message_head,
+                               n->message_tail,
+                               m);
+  n->queue_size--;
+  if (NULL == buf)
   {
-#if DEBUG_CORE
     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
                 "Transmission of message of type %u and size %u failed\n",
                 (unsigned int)
                 ntohs (((struct GNUNET_MessageHeader *) &m[1])->type),
                 (unsigned int) m->size);
-#endif
     GNUNET_free (m);
     process_queue (n);
     return 0;
   }
-  ret = 0;
+  delay = GNUNET_TIME_absolute_get_duration (m->submission_time);
+  overdue = GNUNET_TIME_absolute_get_duration (m->deadline);
   cbuf = buf;
   GNUNET_assert (size >= m->size);
-  memcpy (cbuf, &m[1], m->size);
+  memcpy (cbuf,
+          &m[1],
+          m->size);
   ret = m->size;
-#if DEBUG_CORE
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Copied message of type %u and size %u into transport buffer for `%4s'\n",
-              (unsigned int)
-              ntohs (((struct GNUNET_MessageHeader *) &m[1])->type),
-              (unsigned int) ret, GNUNET_i2s (&n->peer));
-#endif
+  if (overdue.rel_value_us > GNUNET_CONSTANTS_LATENCY_WARN.rel_value_us)
+    GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+                "Copied overdue message of type %u and size %u into transport buffer for `%s' with delay of %s\n",
+                (unsigned int)
+                ntohs (((struct GNUNET_MessageHeader *) &m[1])->type),
+                (unsigned int) ret,
+                GNUNET_i2s (&n->peer),
+                GNUNET_STRINGS_relative_time_to_string (delay,
+                                                        GNUNET_YES));
+  else
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "Copied message of type %u and size %u into transport buffer for `%s' with delay of %s\n",
+                (unsigned int)
+                ntohs (((struct GNUNET_MessageHeader *) &m[1])->type),
+                (unsigned int) ret,
+                GNUNET_i2s (&n->peer),
+                GNUNET_STRINGS_relative_time_to_string (delay,
+                                                        GNUNET_YES));
   GNUNET_free (m);
+  n->has_excess_bandwidth = GNUNET_NO;
   process_queue (n);
   GNUNET_STATISTICS_update (GSC_stats,
                             gettext_noop
@@ -266,82 +305,87 @@ process_queue (struct Neighbour *n)
 {
   struct NeighbourMessageEntry *m;
 
-  if (n->th != NULL)
+  if (NULL != n->th)
     return;                     /* request already pending */
   m = n->message_head;
-  if (m == NULL)
+  if (NULL == m)
   {
     /* notify sessions that the queue is empty and more messages
      * could thus be queued now */
     GSC_SESSIONS_solicit (&n->peer);
     return;
   }
-#if DEBUG_CORE > 1
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Asking transport for transmission of %u bytes to `%4s' in next %llu ms\n",
-              (unsigned int) m->size, GNUNET_i2s (&n->peer),
-              (unsigned long long)
-              GNUNET_TIME_absolute_get_remaining (m->deadline).rel_value);
-#endif
-  n->th =
-      GNUNET_TRANSPORT_notify_transmit_ready (transport, &n->peer, m->size, 0,
-                                              GNUNET_TIME_absolute_get_remaining
-                                              (m->deadline), &transmit_ready,
+              "Asking transport for transmission of %u bytes to `%s' in next %s\n",
+              (unsigned int) m->size,
+              GNUNET_i2s (&n->peer),
+              GNUNET_STRINGS_relative_time_to_string (GNUNET_TIME_absolute_get_remaining (m->deadline),
+                                                      GNUNET_NO));
+  m->submission_time = GNUNET_TIME_absolute_get ();
+  n->th
+    = GNUNET_TRANSPORT_notify_transmit_ready (transport,
+                                              &n->peer,
+                                              m->size,
+                                              GNUNET_TIME_absolute_get_remaining (m->deadline),
+                                              &transmit_ready,
                                               n);
-  if (n->th != NULL)
+  if (NULL != n->th)
     return;
   /* message request too large or duplicate request */
   GNUNET_break (0);
   /* discard encrypted message */
-  GNUNET_CONTAINER_DLL_remove (n->message_head, n->message_tail, m);
+  GNUNET_CONTAINER_DLL_remove (n->message_head,
+                               n->message_tail,
+                               m);
+  n->queue_size--;
   GNUNET_free (m);
   process_queue (n);
 }
 
 
-
 /**
  * Function called by transport to notify us that
  * a peer connected to us (on the network level).
  *
  * @param cls closure
  * @param peer the peer that connected
- * @param atsi performance data
- * @param atsi_count number of entries in ats (excluding 0-termination)
  */
 static void
 handle_transport_notify_connect (void *cls,
-                                 const struct GNUNET_PeerIdentity *peer,
-                                 const struct GNUNET_ATS_Information *atsi,
-                                 uint32_t atsi_count)
+                                 const struct GNUNET_PeerIdentity *peer)
 {
   struct Neighbour *n;
 
-  if (0 == memcmp (peer, &GSC_my_identity, sizeof (struct GNUNET_PeerIdentity)))
+  if (0 == memcmp (peer,
+                   &GSC_my_identity,
+                   sizeof (struct GNUNET_PeerIdentity)))
   {
     GNUNET_break (0);
     return;
   }
   n = find_neighbour (peer);
-  if (n != NULL)
+  if (NULL != n)
   {
     /* duplicate connect notification!? */
     GNUNET_break (0);
+    GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+                "Peer %s exists already\n",
+                GNUNET_i2s (peer));
     return;
   }
-#if DEBUG_CORE
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Received connection from `%4s'.\n",
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Received connection from `%s'.\n",
               GNUNET_i2s (peer));
-#endif
-  n = GNUNET_malloc (sizeof (struct Neighbour));
+  n = GNUNET_new (struct Neighbour);
   n->peer = *peer;
   GNUNET_assert (GNUNET_OK ==
-                 GNUNET_CONTAINER_multihashmap_put (neighbours,
-                                                    &n->peer.hashPubKey, n,
+                 GNUNET_CONTAINER_multipeermap_put (neighbours,
+                                                    &n->peer,
+                                                    n,
                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
   GNUNET_STATISTICS_set (GSC_stats,
                          gettext_noop ("# neighbour entries allocated"),
-                         GNUNET_CONTAINER_multihashmap_size (neighbours),
+                         GNUNET_CONTAINER_multipeermap_size (neighbours),
                          GNUNET_NO);
   n->kxinfo = GSC_KX_start (peer);
 }
@@ -360,15 +404,23 @@ handle_transport_notify_disconnect (void *cls,
 {
   struct Neighbour *n;
 
-#if DEBUG_CORE
+  if (0 == memcmp (peer,
+                   &GSC_my_identity,
+                   sizeof (struct GNUNET_PeerIdentity)))
+  {
+    GNUNET_break (0);
+    return;
+  }
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Peer `%4s' disconnected from us; received notification from transport.\n",
+              "Peer `%s' disconnected from us; received notification from transport.\n",
               GNUNET_i2s (peer));
-#endif
   n = find_neighbour (peer);
-  if (n == NULL)
+  if (NULL == n)
   {
     GNUNET_break (0);
+    GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+                "Peer %s not found\n",
+                GNUNET_i2s (peer));
     return;
   }
   free_neighbour (n);
@@ -381,40 +433,41 @@ handle_transport_notify_disconnect (void *cls,
  * @param cls closure
  * @param peer (claimed) identity of the other peer
  * @param message the message
- * @param atsi performance data
- * @param atsi_count number of entries in ats (excluding 0-termination)
  */
 static void
-handle_transport_receive (void *cls, const struct GNUNET_PeerIdentity *peer,
-                          const struct GNUNET_MessageHeader *message,
-                          const struct GNUNET_ATS_Information *atsi,
-                          uint32_t atsi_count)
+handle_transport_receive (void *cls,
+                          const struct GNUNET_PeerIdentity *peer,
+                          const struct GNUNET_MessageHeader *message)
 {
   struct Neighbour *n;
   uint16_t type;
 
-#if DEBUG_CORE > 1
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Received message of type %u from `%4s', demultiplexing.\n",
-              (unsigned int) ntohs (message->type), GNUNET_i2s (peer));
-#endif
-  if (0 == memcmp (peer, &GSC_my_identity, sizeof (struct GNUNET_PeerIdentity)))
+              "Received message of type %u from `%s', demultiplexing.\n",
+              (unsigned int) ntohs (message->type),
+              GNUNET_i2s (peer));
+  if (0 == memcmp (peer,
+                   &GSC_my_identity,
+                   sizeof (struct GNUNET_PeerIdentity)))
   {
     GNUNET_break (0);
     return;
   }
   n = find_neighbour (peer);
-  if (n == NULL)
+  if (NULL == n)
   {
     /* received message from peer that is not connected!? */
     GNUNET_break (0);
+    GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+                "Peer %s not found\n",
+                GNUNET_i2s (peer));
     return;
   }
   type = ntohs (message->type);
   switch (type)
   {
-  case GNUNET_MESSAGE_TYPE_CORE_SET_KEY:
-    GSC_KX_handle_set_key (n->kxinfo, message);
+  case GNUNET_MESSAGE_TYPE_CORE_EPHEMERAL_KEY:
+    GSC_KX_handle_ephemeral_key (n->kxinfo, message);
     break;
   case GNUNET_MESSAGE_TYPE_CORE_PING:
     GSC_KX_handle_ping (n->kxinfo, message);
@@ -423,12 +476,17 @@ handle_transport_receive (void *cls, const struct GNUNET_PeerIdentity *peer,
     GSC_KX_handle_pong (n->kxinfo, message);
     break;
   case GNUNET_MESSAGE_TYPE_CORE_ENCRYPTED_MESSAGE:
-    GSC_KX_handle_encrypted_message (n->kxinfo, message, atsi, atsi_count);
+    GSC_KX_handle_encrypted_message (n->kxinfo, message);
+    break;
+  case GNUNET_MESSAGE_TYPE_DUMMY:
+    /*  Dummy messages for testing / benchmarking, just discard */
     break;
   default:
-    GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
-                _("Unsupported message of type %u received.\n"),
-                (unsigned int) type);
+    GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+                _("Unsupported message of type %u (%u bytes) received from peer `%s'\n"),
+                (unsigned int) type,
+                (unsigned int) ntohs (message->size),
+                GNUNET_i2s (peer));
     return;
   }
 }
@@ -454,33 +512,122 @@ GSC_NEIGHBOURS_transmit (const struct GNUNET_PeerIdentity *target,
   if (NULL == n)
   {
     GNUNET_break (0);
+    GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+                "Peer %s not found\n",
+                GNUNET_i2s (target));
     return;
   }
   msize = ntohs (msg->size);
   me = GNUNET_malloc (sizeof (struct NeighbourMessageEntry) + msize);
   me->deadline = GNUNET_TIME_relative_to_absolute (timeout);
   me->size = msize;
-  memcpy (&me[1], msg, msize);
-  GNUNET_CONTAINER_DLL_insert_tail (n->message_head, n->message_tail, me);
+  memcpy (&me[1],
+          msg,
+          msize);
+  GNUNET_CONTAINER_DLL_insert_tail (n->message_head,
+                                    n->message_tail,
+                                    me);
+  n->queue_size++;
   process_queue (n);
 }
 
 
+/**
+ * One of our neighbours has excess bandwidth, remember this.
+ *
+ * @param cls NULL
+ * @param pid identity of the peer with excess bandwidth
+ */
+static void
+handle_transport_notify_excess_bw (void *cls,
+                                   const struct GNUNET_PeerIdentity *pid)
+{
+  struct Neighbour *n;
+
+  n = find_neighbour (pid);
+  if (NULL == n)
+  {
+    GNUNET_break (0);
+    GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+                "Peer %s not found\n",
+                GNUNET_i2s (pid));
+    return;
+  }
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Peer %s has excess bandwidth available\n",
+              GNUNET_i2s (pid));
+  n->has_excess_bandwidth = GNUNET_YES;
+  GSC_SESSIONS_solicit (pid);
+}
+
+
+/**
+ * Check how many messages are queued for the given neighbour.
+ *
+ * @param target neighbour to check
+ * @return number of items in the message queue
+ */
+unsigned int
+GSC_NEIGHBOURS_get_queue_size (const struct GNUNET_PeerIdentity *target)
+{
+  struct Neighbour *n;
+
+  n = find_neighbour (target);
+  if (NULL == n)
+  {
+    GNUNET_break (0);
+    GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+                "Peer %s not found\n",
+                GNUNET_i2s (target));
+    return UINT_MAX;
+  }
+  return n->queue_size;
+}
+
+
+/**
+ * Check if the given neighbour has excess bandwidth available.
+ *
+ * @param target neighbour to check
+ * @return #GNUNET_YES if excess bandwidth is available, #GNUNET_NO if not
+ */
+int
+GSC_NEIGHBOURS_check_excess_bandwidth (const struct GNUNET_PeerIdentity *target)
+{
+  struct Neighbour *n;
+
+  n = find_neighbour (target);
+  if (NULL == n)
+  {
+    GNUNET_break (0);
+    GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+                "Peer %s not found\n",
+                GNUNET_i2s (target));
+    return GNUNET_SYSERR;
+  }
+  return n->has_excess_bandwidth;
+}
+
+
 /**
  * Initialize neighbours subsystem.
  */
 int
 GSC_NEIGHBOURS_init ()
 {
-  neighbours = GNUNET_CONTAINER_multihashmap_create (128);
+  neighbours = GNUNET_CONTAINER_multipeermap_create (128,
+                                                     GNUNET_YES);
   transport =
-      GNUNET_TRANSPORT_connect (GSC_cfg, &GSC_my_identity, NULL,
-                                &handle_transport_receive,
-                                &handle_transport_notify_connect,
-                                &handle_transport_notify_disconnect);
+      GNUNET_TRANSPORT_connect2 (GSC_cfg,
+                                 &GSC_my_identity,
+                                 NULL,
+                                 &handle_transport_receive,
+                                 &handle_transport_notify_connect,
+                                 &handle_transport_notify_disconnect,
+                                 &handle_transport_notify_excess_bw);
   if (NULL == transport)
   {
-    GNUNET_CONTAINER_multihashmap_destroy (neighbours);
+    GNUNET_CONTAINER_multipeermap_destroy (neighbours);
     neighbours = NULL;
     return GNUNET_SYSERR;
   }
@@ -489,15 +636,17 @@ GSC_NEIGHBOURS_init ()
 
 
 /**
- * Wrapper around 'free_neighbour'.
+ * Wrapper around #free_neighbour().
  *
  * @param cls unused
  * @param key peer identity
- * @param value the 'struct Neighbour' to free
- * @return GNUNET_OK (continue to iterate)
+ * @param value the `struct Neighbour` to free
+ * @return #GNUNET_OK (continue to iterate)
  */
 static int
-free_neighbour_helper (void *cls, const GNUNET_HashCode * key, void *value)
+free_neighbour_helper (void *cls,
+                      const struct GNUNET_PeerIdentity * key,
+                      void *value)
 {
   struct Neighbour *n = value;
 
@@ -514,14 +663,19 @@ free_neighbour_helper (void *cls, const GNUNET_HashCode * key, void *value)
 void
 GSC_NEIGHBOURS_done ()
 {
-  if (NULL == transport)
-    return;
-  GNUNET_TRANSPORT_disconnect (transport);
-  transport = NULL;
-  GNUNET_CONTAINER_multihashmap_iterate (neighbours, &free_neighbour_helper,
-                                         NULL);
-  GNUNET_CONTAINER_multihashmap_destroy (neighbours);
-  neighbours = NULL;
+  if (NULL != transport)
+  {
+    GNUNET_TRANSPORT_disconnect (transport);
+    transport = NULL;
+  }
+  if (NULL != neighbours)
+  {
+    GNUNET_CONTAINER_multipeermap_iterate (neighbours,
+                                           &free_neighbour_helper,
+                                          NULL);
+    GNUNET_CONTAINER_multipeermap_destroy (neighbours);
+    neighbours = NULL;
+  }
 }
 
 /* end of gnunet-service-core_neighbours.c */