-modify timeout values further
[oweals/gnunet.git] / src / dht / gnunet-service-dht_neighbours.c
index cfcedcc92d88d359dbc79d8ca169820df8b7c87b..fdbb29371f103dd0b1af0cdad424e493c7b6ca74 100644 (file)
@@ -52,7 +52,7 @@
 /**
  * How many buckets will we allow total.
  */
-#define MAX_BUCKETS sizeof (GNUNET_HashCode) * 8
+#define MAX_BUCKETS sizeof (struct GNUNET_HashCode) * 8
 
 /**
  * What is the maximum number of peers in a given bucket.
  */
 #define MAXIMUM_REPLICATION_LEVEL 16
 
+/**
+ * Maximum allowed number of pending messages per peer.
+ */
+#define MAXIMUM_PENDING_PER_PEER 64
+
 /**
  * How often to update our preference levels for peers in our routing tables.
  */
@@ -140,7 +145,7 @@ struct PeerPutMessage
   /**
    * The key we are storing under.
    */
-  GNUNET_HashCode key;
+  struct GNUNET_HashCode key;
 
   /* put path (if tracked) */
 
@@ -182,7 +187,7 @@ struct PeerResultMessage
   /**
    * The key of the corresponding GET request.
    */
-  GNUNET_HashCode key;
+  struct GNUNET_HashCode key;
 
   /* put path (if tracked) */
 
@@ -241,7 +246,7 @@ struct PeerGetMessage
   /**
    * The key we are looking for.
    */
-  GNUNET_HashCode key;
+  struct GNUNET_HashCode key;
 
   /* xquery */
 
@@ -301,8 +306,7 @@ struct PeerInfo
   struct PeerInfo *prev;
 
   /**
-   * Count of outstanding messages for peer.  FIXME: NEEDED?
-   * FIXME: bound queue size!?
+   * Count of outstanding messages for peer. 
    */
   unsigned int pending_count;
 
@@ -424,7 +428,7 @@ static struct GNUNET_ATS_PerformanceHandle *atsAPI;
  *         on error (same hashcode)
  */
 static int
-find_bucket (const GNUNET_HashCode * hc)
+find_bucket (const struct GNUNET_HashCode * hc)
 {
   unsigned int bits;
 
@@ -518,17 +522,15 @@ struct BloomConstructorContext
  * @return GNUNET_YES (we should continue to iterate)
  */
 static int
-add_known_to_bloom (void *cls, const GNUNET_HashCode * key, void *value)
+add_known_to_bloom (void *cls, const struct GNUNET_HashCode * key, void *value)
 {
   struct BloomConstructorContext *ctx = cls;
-  GNUNET_HashCode mh;
+  struct GNUNET_HashCode mh;
 
   GNUNET_BLOCK_mingle_hash (key, ctx->bf_mutator, &mh);
-#if DEBUG_DHT
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "Adding known peer (%s) to bloomfilter for FIND PEER with mutation %u\n",
               GNUNET_h2s (key), ctx->bf_mutator);
-#endif
   GNUNET_CONTAINER_bloomfilter_add (ctx->bloom, &mh);
   return GNUNET_YES;
 }
@@ -615,10 +617,8 @@ handle_core_connect (void *cls, const struct GNUNET_PeerIdentity *peer,
   /* Check for connect to self message */
   if (0 == memcmp (&my_identity, peer, sizeof (struct GNUNET_PeerIdentity)))
     return;
-#if DEBUG_DHT
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Connected %s to %s\n",
               GNUNET_i2s (&my_identity), GNUNET_h2s (&peer->hashPubKey));
-#endif
   if (GNUNET_YES ==
       GNUNET_CONTAINER_multihashmap_contains (all_known_peers,
                                               &peer->hashPubKey))
@@ -626,7 +626,7 @@ handle_core_connect (void *cls, const struct GNUNET_PeerIdentity *peer,
     GNUNET_break (0);
     return;
   }
-  GNUNET_STATISTICS_update (GDS_stats, gettext_noop ("# Peers connected"), 1,
+  GNUNET_STATISTICS_update (GDS_stats, gettext_noop ("# peers connected"), 1,
                             GNUNET_NO);
   peer_bucket = find_bucket (&peer->hashPubKey);
   GNUNET_assert ((peer_bucket >= 0) && (peer_bucket < MAX_BUCKETS));
@@ -675,10 +675,8 @@ handle_core_disconnect (void *cls, const struct GNUNET_PeerIdentity *peer)
   /* Check for disconnect from self message */
   if (0 == memcmp (&my_identity, peer, sizeof (struct GNUNET_PeerIdentity)))
     return;
-#if DEBUG_DHT
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Disconnected %s from %s\n",
               GNUNET_i2s (&my_identity), GNUNET_h2s (&peer->hashPubKey));
-#endif
   to_remove =
       GNUNET_CONTAINER_multihashmap_get (all_known_peers, &peer->hashPubKey);
   if (NULL == to_remove)
@@ -686,7 +684,7 @@ handle_core_disconnect (void *cls, const struct GNUNET_PeerIdentity *peer)
     GNUNET_break (0);
     return;
   }
-  GNUNET_STATISTICS_update (GDS_stats, gettext_noop ("# Peers connected"), -1,
+  GNUNET_STATISTICS_update (GDS_stats, gettext_noop ("# peers connected"), -1,
                             GNUNET_NO);
   GNUNET_assert (GNUNET_YES ==
                  GNUNET_CONTAINER_multihashmap_remove (all_known_peers,
@@ -880,7 +878,7 @@ get_forward_count (uint32_t hop_count, uint32_t target_replication)
  *           the two hash codes increases
  */
 static unsigned int
-get_distance (const GNUNET_HashCode * target, const GNUNET_HashCode * have)
+get_distance (const struct GNUNET_HashCode * target, const struct GNUNET_HashCode * have)
 {
   unsigned int bucket;
   unsigned int msb;
@@ -917,7 +915,7 @@ get_distance (const GNUNET_HashCode * target, const GNUNET_HashCode * have)
    * mismatching bit at 'bucket' */
   lsb = 0;
   for (i = bucket + 1;
-       (i < sizeof (GNUNET_HashCode) * 8) && (i < bucket + 1 + 32 - 9); i++)
+       (i < sizeof (struct GNUNET_HashCode) * 8) && (i < bucket + 1 + 32 - 9); i++)
   {
     if (GNUNET_CRYPTO_hash_get_bit (target, i) !=
         GNUNET_CRYPTO_hash_get_bit (have, i))
@@ -940,7 +938,7 @@ get_distance (const GNUNET_HashCode * target, const GNUNET_HashCode * have)
  *         GNUNET_NO otherwise.
  */
 static int
-am_closest_peer (const GNUNET_HashCode * key,
+am_closest_peer (const struct GNUNET_HashCode * key,
                  const struct GNUNET_CONTAINER_BloomFilter *bloom)
 {
   int bits;
@@ -949,7 +947,7 @@ am_closest_peer (const GNUNET_HashCode * key,
   int count;
   struct PeerInfo *pos;
 
-  if (0 == memcmp (&my_identity.hashPubKey, key, sizeof (GNUNET_HashCode)))
+  if (0 == memcmp (&my_identity.hashPubKey, key, sizeof (struct GNUNET_HashCode)))
     return GNUNET_YES;
   bucket_num = find_bucket (key);
   GNUNET_assert (bucket_num >= 0);
@@ -995,7 +993,7 @@ am_closest_peer (const GNUNET_HashCode * key,
  * @return Peer to route to, or NULL on error
  */
 static struct PeerInfo *
-select_peer (const GNUNET_HashCode * key,
+select_peer (const struct GNUNET_HashCode * key,
              const struct GNUNET_CONTAINER_BloomFilter *bloom, uint32_t hops)
 {
   unsigned int bc;
@@ -1030,11 +1028,9 @@ select_peer (const GNUNET_HashCode * key,
         }
         else
         {
-#if DEBUG_DHT
           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
                       "Excluded peer `%s' due to BF match in greedy routing for %s\n",
                       GNUNET_i2s (&pos->id), GNUNET_h2s (key));
-#endif
           GNUNET_STATISTICS_update (GDS_stats,
                                     gettext_noop
                                     ("# Peers excluded from routing due to Bloomfilter"),
@@ -1067,11 +1063,9 @@ select_peer (const GNUNET_HashCode * key,
                                   gettext_noop
                                   ("# Peers excluded from routing due to Bloomfilter"),
                                   1, GNUNET_NO);
-#if DEBUG_DHT
         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
                     "Excluded peer `%s' due to BF match in random routing for %s\n",
                     GNUNET_i2s (&pos->id), GNUNET_h2s (key));
-#endif
         pos = pos->next;
         continue;               /* Ignore bloomfiltered peers */
       }
@@ -1125,7 +1119,7 @@ select_peer (const GNUNET_HashCode * key,
  * @return number of peers returned in 'targets'.
  */
 static unsigned int
-get_target_peers (const GNUNET_HashCode * key,
+get_target_peers (const struct GNUNET_HashCode * key,
                   struct GNUNET_CONTAINER_BloomFilter *bloom,
                   uint32_t hop_count, uint32_t target_replication,
                   struct PeerInfo ***targets)
@@ -1154,12 +1148,10 @@ get_target_peers (const GNUNET_HashCode * key,
                                                      &nxt->id.hashPubKey));
     GNUNET_CONTAINER_bloomfilter_add (bloom, &rtargets[off]->id.hashPubKey);
   }
-#if DEBUG_DHT
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "Selected %u/%u peers at hop %u for %s (target was %u)\n", off,
               GNUNET_CONTAINER_multihashmap_size (all_known_peers),
               (unsigned int) hop_count, GNUNET_h2s (key), ret);
-#endif
   if (0 == off)
   {
     GNUNET_free (rtargets);
@@ -1197,7 +1189,7 @@ GDS_NEIGHBOURS_handle_put (enum GNUNET_BLOCK_Type type,
                            struct GNUNET_TIME_Absolute expiration_time,
                            uint32_t hop_count,
                            struct GNUNET_CONTAINER_BloomFilter *bf,
-                           const GNUNET_HashCode * key,
+                           const struct GNUNET_HashCode * key,
                            unsigned int put_path_length,
                            struct GNUNET_PeerIdentity *put_path,
                            const void *data, size_t data_size)
@@ -1212,11 +1204,9 @@ GDS_NEIGHBOURS_handle_put (enum GNUNET_BLOCK_Type type,
   struct GNUNET_PeerIdentity *pp;
 
   GNUNET_assert (NULL != bf);
-#if DEBUG_DHT
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "Adding myself (%s) to PUT bloomfilter for %s\n",
               GNUNET_i2s (&my_identity), GNUNET_h2s (key));
-#endif
   GNUNET_CONTAINER_bloomfilter_add (bf, &my_identity.hashPubKey);
   GNUNET_STATISTICS_update (GDS_stats, gettext_noop ("# PUT requests routed"),
                             1, GNUNET_NO);
@@ -1225,12 +1215,10 @@ GDS_NEIGHBOURS_handle_put (enum GNUNET_BLOCK_Type type,
                         &targets);
   if (0 == target_count)
   {
-#if DEBUG_DHT
     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
                 "Routing PUT for %s terminates after %u hops at %s\n",
                 GNUNET_h2s (key), (unsigned int) hop_count,
                 GNUNET_i2s (&my_identity));
-#endif
     return;
   }
   msize =
@@ -1254,11 +1242,15 @@ GDS_NEIGHBOURS_handle_put (enum GNUNET_BLOCK_Type type,
   for (i = 0; i < target_count; i++)
   {
     target = targets[i];
-#if DEBUG_DHT
+    if (target->pending_count >= MAXIMUM_PENDING_PER_PEER)
+    {
+      GNUNET_STATISTICS_update (GDS_stats, gettext_noop ("# P2P messages dropped due to full queue"),
+                               1, GNUNET_NO);
+      continue; /* skip */
+    }
     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
                 "Routing PUT for %s after %u hops to %s\n", GNUNET_h2s (key),
                 (unsigned int) hop_count, GNUNET_i2s (&target->id));
-#endif
     pending = GNUNET_malloc (sizeof (struct P2PPendingMessage) + msize);
     pending->importance = 0;    /* FIXME */
     pending->timeout = expiration_time;
@@ -1313,7 +1305,7 @@ void
 GDS_NEIGHBOURS_handle_get (enum GNUNET_BLOCK_Type type,
                            enum GNUNET_DHT_RouteOption options,
                            uint32_t desired_replication_level,
-                           uint32_t hop_count, const GNUNET_HashCode * key,
+                           uint32_t hop_count, const struct GNUNET_HashCode * key,
                            const void *xquery, size_t xquery_size,
                            const struct GNUNET_CONTAINER_BloomFilter *reply_bf,
                            uint32_t reply_bf_mutator,
@@ -1335,20 +1327,16 @@ GDS_NEIGHBOURS_handle_get (enum GNUNET_BLOCK_Type type,
   target_count =
       get_target_peers (key, peer_bf, hop_count, desired_replication_level,
                         &targets);
-#if DEBUG_DHT
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "Adding myself (%s) to GET bloomfilter for %s\n",
               GNUNET_i2s (&my_identity), GNUNET_h2s (key));
-#endif
   GNUNET_CONTAINER_bloomfilter_add (peer_bf, &my_identity.hashPubKey);
   if (0 == target_count)
   {
-#if DEBUG_DHT
     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
                 "Routing GET for %s terminates after %u hops at %s\n",
                 GNUNET_h2s (key), (unsigned int) hop_count,
                 GNUNET_i2s (&my_identity));
-#endif
     return;
   }
   reply_bf_size = GNUNET_CONTAINER_bloomfilter_get_size (reply_bf);
@@ -1367,11 +1355,15 @@ GDS_NEIGHBOURS_handle_get (enum GNUNET_BLOCK_Type type,
   for (i = 0; i < target_count; i++)
   {
     target = targets[i];
-#if DEBUG_DHT
+    if (target->pending_count >= MAXIMUM_PENDING_PER_PEER)
+    {
+      GNUNET_STATISTICS_update (GDS_stats, gettext_noop ("# P2P messages dropped due to full queue"),
+                               1, GNUNET_NO);
+      continue; /* skip */
+    }
     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
                 "Routing GET for %s after %u hops to %s\n", GNUNET_h2s (key),
                 (unsigned int) hop_count, GNUNET_i2s (&target->id));
-#endif
     pending = GNUNET_malloc (sizeof (struct P2PPendingMessage) + msize);
     pending->importance = 0;    /* FIXME */
     pending->timeout = GNUNET_TIME_relative_to_absolute (GET_TIMEOUT);
@@ -1429,7 +1421,7 @@ void
 GDS_NEIGHBOURS_handle_reply (const struct GNUNET_PeerIdentity *target,
                              enum GNUNET_BLOCK_Type type,
                              struct GNUNET_TIME_Absolute expiration_time,
-                             const GNUNET_HashCode * key,
+                             const struct GNUNET_HashCode * key,
                              unsigned int put_path_length,
                              const struct GNUNET_PeerIdentity *put_path,
                              unsigned int get_path_length,
@@ -1462,6 +1454,14 @@ GDS_NEIGHBOURS_handle_reply (const struct GNUNET_PeerIdentity *target,
     /* peer disconnected in the meantime, drop reply */
     return;
   }
+  if (pi->pending_count >= MAXIMUM_PENDING_PER_PEER)
+  {
+    /* skip */
+    GNUNET_STATISTICS_update (GDS_stats, gettext_noop ("# P2P messages dropped due to full queue"),
+                             1, GNUNET_NO);
+    return;
+  }
+
   GNUNET_STATISTICS_update (GDS_stats,
                             gettext_noop
                             ("# RESULT messages queued for transmission"), 1,
@@ -1532,7 +1532,7 @@ handle_dht_p2p_put (void *cls, const struct GNUNET_PeerIdentity *peer,
   size_t payload_size;
   enum GNUNET_DHT_RouteOption options;
   struct GNUNET_CONTAINER_BloomFilter *bf;
-  GNUNET_HashCode test_key;
+  struct GNUNET_HashCode test_key;
 
   msize = ntohs (message->size);
   if (msize < sizeof (struct PeerPutMessage))
@@ -1565,7 +1565,7 @@ handle_dht_p2p_put (void *cls, const struct GNUNET_PeerIdentity *peer,
            &test_key))
   {
   case GNUNET_YES:
-    if (0 != memcmp (&test_key, &put->key, sizeof (GNUNET_HashCode)))
+    if (0 != memcmp (&test_key, &put->key, sizeof (struct GNUNET_HashCode)))
     {
       GNUNET_break_op (0);
       return GNUNET_YES;
@@ -1578,10 +1578,8 @@ handle_dht_p2p_put (void *cls, const struct GNUNET_PeerIdentity *peer,
     /* cannot verify, good luck */
     break;
   }
-#if DEBUG_DHT
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "PUT for %s at %s\n",
-              GNUNET_h2s (&put->key), GNUNET_i2s (&my_identity));
-#endif
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "PUT for `%s' from %s\n",
+              GNUNET_h2s (&put->key), GNUNET_i2s (peer));
   bf = GNUNET_CONTAINER_bloomfilter_init (put->bloomfilter, DHT_BLOOM_SIZE,
                                           GNUNET_CONSTANTS_BLOOMFILTER_K);
   GNUNET_break_op (GNUNET_YES ==
@@ -1641,14 +1639,14 @@ handle_dht_p2p_put (void *cls, const struct GNUNET_PeerIdentity *peer,
  */
 static void
 handle_find_peer (const struct GNUNET_PeerIdentity *sender,
-                  const GNUNET_HashCode * key,
+                  const struct GNUNET_HashCode * key,
                   struct GNUNET_CONTAINER_BloomFilter *bf, uint32_t bf_mutator)
 {
   int bucket_idx;
   struct PeerBucket *bucket;
   struct PeerInfo *peer;
   unsigned int choice;
-  GNUNET_HashCode mhash;
+  struct GNUNET_HashCode mhash;
   const struct GNUNET_HELLO_Message *hello;
 
   /* first, check about our own HELLO */
@@ -1683,7 +1681,7 @@ handle_find_peer (const struct GNUNET_PeerIdentity *sender,
   }
 
   /* then, also consider sending a random HELLO from the closest bucket */
-  if (0 == memcmp (&my_identity.hashPubKey, key, sizeof (GNUNET_HashCode)))
+  if (0 == memcmp (&my_identity.hashPubKey, key, sizeof (struct GNUNET_HashCode)))
     bucket_idx = closest_bucket;
   else
     bucket_idx = GNUNET_MIN (closest_bucket, find_bucket (key));
@@ -1800,11 +1798,9 @@ handle_dht_p2p_get (void *cls, const struct GNUNET_PeerIdentity *peer,
   /* remember request for routing replies */
   GDS_ROUTING_add (peer, type, options, &get->key, xquery, xquery_size,
                    reply_bf, get->bf_mutator);
-#if DEBUG_DHT
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "GET for %s at %s after %u hops\n",
               GNUNET_h2s (&get->key), GNUNET_i2s (&my_identity),
               (unsigned int) ntohl (get->hop_count));
-#endif
   /* local lookup (this may update the reply_bf) */
   if ((0 != (options & GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE)) ||
       (am_closest_peer (&get->key, peer_bf)))
@@ -2008,7 +2004,7 @@ GDS_NEIGHBOURS_init ()
     bucket_size = (unsigned int) temp_config_num;
   atsAPI = GNUNET_ATS_performance_init (GDS_cfg, NULL, NULL);
   coreAPI =
-      GNUNET_CORE_connect (GDS_cfg, 1, NULL, &core_init, &handle_core_connect,
+      GNUNET_CORE_connect (GDS_cfg, NULL, &core_init, &handle_core_connect,
                            &handle_core_disconnect, NULL, GNUNET_NO, NULL,
                            GNUNET_NO, core_handlers);
   if (coreAPI == NULL)