convert fs publish to MQ
[oweals/gnunet.git] / src / core / gnunet-service-core_neighbours.c
index f69f998706bdb704945000530e86fc3a203bb2ba..c1c62cf1aede06edd4dbc62ad0e5bd249fa46b04 100644 (file)
@@ -1,6 +1,6 @@
 /*
      This file is part of GNUnet.
-     Copyright (C) 2009, 2010, 2011 Christian Grothoff (and other contributing authors)
+     Copyright (C) 2009, 2010, 2011 GNUnet e.V.
 
      GNUnet is free software; you can redistribute it and/or modify
      it under the terms of the GNU General Public License as published
@@ -57,9 +57,14 @@ struct NeighbourMessageEntry
   struct GNUNET_TIME_Absolute deadline;
 
   /**
-   * How long is the message? (number of bytes following the "struct
-   * MessageEntry", but not including the size of "struct
-   * MessageEntry" itself!)
+   * What time did we submit the request?
+   */
+  struct GNUNET_TIME_Absolute submission_time;
+
+  /**
+   * How long is the message? (number of bytes following the `struct
+   * MessageEntry`, but not including the size of `struct
+   * MessageEntry` itself!)
    */
   size_t size;
 
@@ -119,7 +124,7 @@ struct Neighbour
 
 
 /**
- * Map of peer identities to 'struct Neighbour'.
+ * Map of peer identities to `struct Neighbour`.
  */
 static struct GNUNET_CONTAINER_MultiPeerMap *neighbours;
 
@@ -141,7 +146,8 @@ find_neighbour (const struct GNUNET_PeerIdentity *peer)
 {
   if (NULL == neighbours)
     return NULL;
-  return GNUNET_CONTAINER_multipeermap_get (neighbours, peer);
+  return GNUNET_CONTAINER_multipeermap_get (neighbours,
+                                            peer);
 }
 
 
@@ -156,7 +162,7 @@ free_neighbour (struct Neighbour *n)
   struct NeighbourMessageEntry *m;
 
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Destroying neighbour entry for peer `%4s'\n",
+              "Destroying neighbour entry for peer `%s'\n",
               GNUNET_i2s (&n->peer));
   while (NULL != (m = n->message_head))
   {
@@ -218,12 +224,16 @@ process_queue (struct Neighbour *n);
  * @return number of bytes transmitted
  */
 static size_t
-transmit_ready (void *cls, size_t size, void *buf)
+transmit_ready (void *cls,
+                size_t size,
+                void *buf)
 {
   struct Neighbour *n = cls;
   struct NeighbourMessageEntry *m;
   size_t ret;
   char *cbuf;
+  struct GNUNET_TIME_Relative delay;
+  struct GNUNET_TIME_Relative overdue;
 
   n->th = NULL;
   m = n->message_head;
@@ -247,15 +257,32 @@ transmit_ready (void *cls, size_t size, void *buf)
     process_queue (n);
     return 0;
   }
+  delay = GNUNET_TIME_absolute_get_duration (m->submission_time);
+  overdue = GNUNET_TIME_absolute_get_duration (m->deadline);
   cbuf = buf;
   GNUNET_assert (size >= m->size);
-  memcpy (cbuf, &m[1], m->size);
+  memcpy (cbuf,
+          &m[1],
+          m->size);
   ret = m->size;
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Copied message of type %u and size %u into transport buffer for `%4s'\n",
-              (unsigned int)
-              ntohs (((struct GNUNET_MessageHeader *) &m[1])->type),
-              (unsigned int) ret, GNUNET_i2s (&n->peer));
+  if (overdue.rel_value_us > GNUNET_CONSTANTS_LATENCY_WARN.rel_value_us)
+    GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+                "Copied overdue message of type %u and size %u into transport buffer for `%s' with delay of %s\n",
+                (unsigned int)
+                ntohs (((struct GNUNET_MessageHeader *) &m[1])->type),
+                (unsigned int) ret,
+                GNUNET_i2s (&n->peer),
+                GNUNET_STRINGS_relative_time_to_string (delay,
+                                                        GNUNET_YES));
+  else
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "Copied message of type %u and size %u into transport buffer for `%s' with delay of %s\n",
+                (unsigned int)
+                ntohs (((struct GNUNET_MessageHeader *) &m[1])->type),
+                (unsigned int) ret,
+                GNUNET_i2s (&n->peer),
+                GNUNET_STRINGS_relative_time_to_string (delay,
+                                                        GNUNET_YES));
   GNUNET_free (m);
   n->has_excess_bandwidth = GNUNET_NO;
   process_queue (n);
@@ -289,11 +316,12 @@ process_queue (struct Neighbour *n)
     return;
   }
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Asking transport for transmission of %u bytes to `%4s' in next %s\n",
+              "Asking transport for transmission of %u bytes to `%s' in next %s\n",
               (unsigned int) m->size,
               GNUNET_i2s (&n->peer),
               GNUNET_STRINGS_relative_time_to_string (GNUNET_TIME_absolute_get_remaining (m->deadline),
                                                       GNUNET_NO));
+  m->submission_time = GNUNET_TIME_absolute_get ();
   n->th
     = GNUNET_TRANSPORT_notify_transmit_ready (transport,
                                               &n->peer,
@@ -336,20 +364,24 @@ handle_transport_notify_connect (void *cls,
     return;
   }
   n = find_neighbour (peer);
-  if (n != NULL)
+  if (NULL != n)
   {
     /* duplicate connect notification!? */
     GNUNET_break (0);
+    GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+                "Peer %s exists already\n",
+                GNUNET_i2s (peer));
     return;
   }
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Received connection from `%4s'.\n",
+              "Received connection from `%s'.\n",
               GNUNET_i2s (peer));
   n = GNUNET_new (struct Neighbour);
   n->peer = *peer;
   GNUNET_assert (GNUNET_OK ==
                  GNUNET_CONTAINER_multipeermap_put (neighbours,
-                                                    &n->peer, n,
+                                                    &n->peer,
+                                                    n,
                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
   GNUNET_STATISTICS_set (GSC_stats,
                          gettext_noop ("# neighbour entries allocated"),
@@ -372,13 +404,23 @@ handle_transport_notify_disconnect (void *cls,
 {
   struct Neighbour *n;
 
+  if (0 == memcmp (peer,
+                   &GSC_my_identity,
+                   sizeof (struct GNUNET_PeerIdentity)))
+  {
+    GNUNET_break (0);
+    return;
+  }
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Peer `%4s' disconnected from us; received notification from transport.\n",
+              "Peer `%s' disconnected from us; received notification from transport.\n",
               GNUNET_i2s (peer));
   n = find_neighbour (peer);
   if (NULL == n)
   {
     GNUNET_break (0);
+    GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+                "Peer %s not found\n",
+                GNUNET_i2s (peer));
     return;
   }
   free_neighbour (n);
@@ -401,18 +443,24 @@ handle_transport_receive (void *cls,
   uint16_t type;
 
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Received message of type %u from `%4s', demultiplexing.\n",
-              (unsigned int) ntohs (message->type), GNUNET_i2s (peer));
-  if (0 == memcmp (peer, &GSC_my_identity, sizeof (struct GNUNET_PeerIdentity)))
+              "Received message of type %u from `%s', demultiplexing.\n",
+              (unsigned int) ntohs (message->type),
+              GNUNET_i2s (peer));
+  if (0 == memcmp (peer,
+                   &GSC_my_identity,
+                   sizeof (struct GNUNET_PeerIdentity)))
   {
     GNUNET_break (0);
     return;
   }
   n = find_neighbour (peer);
-  if (n == NULL)
+  if (NULL == n)
   {
     /* received message from peer that is not connected!? */
     GNUNET_break (0);
+    GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+                "Peer %s not found\n",
+                GNUNET_i2s (peer));
     return;
   }
   type = ntohs (message->type);
@@ -464,6 +512,9 @@ GSC_NEIGHBOURS_transmit (const struct GNUNET_PeerIdentity *target,
   if (NULL == n)
   {
     GNUNET_break (0);
+    GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+                "Peer %s not found\n",
+                GNUNET_i2s (target));
     return;
   }
   msize = ntohs (msg->size);
@@ -482,8 +533,7 @@ GSC_NEIGHBOURS_transmit (const struct GNUNET_PeerIdentity *target,
 
 
 /**
- * One of our neighbours has excess bandwidth,
- * remember this.
+ * One of our neighbours has excess bandwidth, remember this.
  *
  * @param cls NULL
  * @param pid identity of the peer with excess bandwidth
@@ -494,15 +544,18 @@ handle_transport_notify_excess_bw (void *cls,
 {
   struct Neighbour *n;
 
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Peer %s has excess bandwidth available\n",
-              GNUNET_i2s (pid));
   n = find_neighbour (pid);
   if (NULL == n)
   {
     GNUNET_break (0);
+    GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+                "Peer %s not found\n",
+                GNUNET_i2s (pid));
     return;
   }
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Peer %s has excess bandwidth available\n",
+              GNUNET_i2s (pid));
   n->has_excess_bandwidth = GNUNET_YES;
   GSC_SESSIONS_solicit (pid);
 }
@@ -523,6 +576,9 @@ GSC_NEIGHBOURS_get_queue_size (const struct GNUNET_PeerIdentity *target)
   if (NULL == n)
   {
     GNUNET_break (0);
+    GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+                "Peer %s not found\n",
+                GNUNET_i2s (target));
     return UINT_MAX;
   }
   return n->queue_size;
@@ -544,6 +600,9 @@ GSC_NEIGHBOURS_check_excess_bandwidth (const struct GNUNET_PeerIdentity *target)
   if (NULL == n)
   {
     GNUNET_break (0);
+    GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+                "Peer %s not found\n",
+                GNUNET_i2s (target));
     return GNUNET_SYSERR;
   }
   return n->has_excess_bandwidth;
@@ -556,9 +615,12 @@ GSC_NEIGHBOURS_check_excess_bandwidth (const struct GNUNET_PeerIdentity *target)
 int
 GSC_NEIGHBOURS_init ()
 {
-  neighbours = GNUNET_CONTAINER_multipeermap_create (128, GNUNET_NO);
+  neighbours = GNUNET_CONTAINER_multipeermap_create (128,
+                                                     GNUNET_YES);
   transport =
-      GNUNET_TRANSPORT_connect2 (GSC_cfg, &GSC_my_identity, NULL,
+      GNUNET_TRANSPORT_connect2 (GSC_cfg,
+                                 &GSC_my_identity,
+                                 NULL,
                                  &handle_transport_receive,
                                  &handle_transport_notify_connect,
                                  &handle_transport_notify_disconnect,
@@ -574,7 +636,7 @@ GSC_NEIGHBOURS_init ()
 
 
 /**
- * Wrapper around 'free_neighbour'.
+ * Wrapper around #free_neighbour().
  *
  * @param cls unused
  * @param key peer identity