convert fs publish to MQ
[oweals/gnunet.git] / src / dht / gnunet-service-dht_neighbours.c
index 773ad8983c0748ad052ae774055615c78369bf50..4add3c4aee0ea02d4b8426a903ad89ceb57432ca 100644 (file)
@@ -719,12 +719,14 @@ send_find_peer_message (void *cls)
     /* If we are finding many peers already, no need to send out our request right now! */
     find_peer_task =
         GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES,
-                                      &send_find_peer_message, NULL);
+                                      &send_find_peer_message,
+                                      NULL);
     newly_found_peers = 0;
     return;
   }
   bcc.bf_mutator =
-      GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, UINT32_MAX);
+      GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
+                                UINT32_MAX);
   bcc.bloom =
       GNUNET_CONTAINER_bloomfilter_init (NULL, DHT_BLOOM_SIZE,
                                          GNUNET_CONSTANTS_BLOOMFILTER_K);
@@ -732,7 +734,8 @@ send_find_peer_message (void *cls)
                                          &add_known_to_bloom,
                                          &bcc);
   GNUNET_STATISTICS_update (GDS_stats,
-                            gettext_noop ("# FIND PEER messages initiated"), 1,
+                            gettext_noop ("# FIND PEER messages initiated"),
+                            1,
                             GNUNET_NO);
   peer_bf =
       GNUNET_CONTAINER_bloomfilter_init (NULL, DHT_BLOOM_SIZE,
@@ -819,8 +822,8 @@ handle_core_connect (void *cls,
     update_connect_preferences ();
     newly_found_peers++;
   }
-  if (1 == GNUNET_CONTAINER_multipeermap_size (all_connected_peers) &&
-      (GNUNET_YES != disable_try_connect))
+  if ( (1 == GNUNET_CONTAINER_multipeermap_size (all_connected_peers)) &&
+       (GNUNET_YES != disable_try_connect))
   {
     /* got a first connection, good time to start with FIND PEER requests... */
     GNUNET_assert (NULL == find_peer_task);
@@ -870,6 +873,12 @@ handle_core_disconnect (void *cls,
                  GNUNET_CONTAINER_multipeermap_remove (all_connected_peers,
                                                        peer,
                                                        to_remove));
+  if ( (0 == GNUNET_CONTAINER_multipeermap_size (all_connected_peers) &&
+        (GNUNET_YES != disable_try_connect)) )
+  {
+    GNUNET_SCHEDULER_cancel (find_peer_task);
+    find_peer_task = NULL;
+  }
   GNUNET_CRYPTO_hash (peer,
                      sizeof (struct GNUNET_PeerIdentity),
                      &phash);
@@ -894,6 +903,9 @@ handle_core_disconnect (void *cls,
     GNUNET_CONTAINER_DLL_remove (to_remove->head,
                                  to_remove->tail,
                                  pos);
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "Dropping message of type %u due to disconnect\n",
+                ntohs (pos->msg->type));
     discarded++;
     GNUNET_free (pos);
   }
@@ -927,6 +939,9 @@ core_transmit_notify (void *cls,
   size_t off;
   size_t msize;
 
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "DHT ctn called with buffer of %u bytes\n",
+              (unsigned int) size);
   peer->th = NULL;
   while ((NULL != (pending = peer->head)) &&
          (0 == GNUNET_TIME_absolute_get_remaining (pending->timeout).rel_value_us))
@@ -936,24 +951,35 @@ core_transmit_notify (void *cls,
                               ("# Messages dropped (CORE timeout)"),
                               1,
                               GNUNET_NO);
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "Dropping message of type %u due to timeout\n",
+                ntohs (pending->msg->type));
     peer->pending_count--;
-    GNUNET_CONTAINER_DLL_remove (peer->head, peer->tail, pending);
+    GNUNET_CONTAINER_DLL_remove (peer->head,
+                                 peer->tail,
+                                 pending);
     GNUNET_free (pending);
   }
   if (NULL == pending)
   {
     /* no messages pending */
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "No messages pending\n");
     return 0;
   }
   if (NULL == buf)
   {
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "Got NULL buffer, trying again\n");
     peer->th =
         GNUNET_CORE_notify_transmit_ready (core_api, GNUNET_NO,
                                            GNUNET_CORE_PRIO_BEST_EFFORT,
                                            GNUNET_TIME_absolute_get_remaining
-                                           (pending->timeout), &peer->id,
+                                           (pending->timeout),
+                                           &peer->id,
                                            ntohs (pending->msg->size),
-                                           &core_transmit_notify, peer);
+                                           &core_transmit_notify,
+                                           peer);
     GNUNET_break (NULL != peer->th);
     return 0;
   }
@@ -965,7 +991,13 @@ core_transmit_notify (void *cls,
                               gettext_noop
                               ("# Bytes transmitted to other peers"), msize,
                               GNUNET_NO);
-    memcpy (&cbuf[off], pending->msg, msize);
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "Transmitting message of type %u to %s\n",
+                ntohs (pending->msg->type),
+                GNUNET_i2s (&peer->id));
+    memcpy (&cbuf[off],
+            pending->msg,
+            msize);
     off += msize;
     peer->pending_count--;
     GNUNET_CONTAINER_DLL_remove (peer->head,
@@ -973,6 +1005,11 @@ core_transmit_notify (void *cls,
                                 pending);
     GNUNET_free (pending);
   }
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "%u bytes fit in %u bytes available, next message is %u bytes\n",
+              (unsigned int) off,
+              (unsigned int) size,
+              (NULL != peer->head) ? ntohs (peer->head->msg->size) : 0);
   if (NULL != (pending = peer->head))
   {
     /* technically redundant, but easier to read and
@@ -1006,7 +1043,10 @@ process_peer_queue (struct PeerInfo *peer)
   if (NULL == (pending = peer->head))
     return;
   if (NULL != peer->th)
-    return;
+  {
+    GNUNET_CORE_notify_transmit_ready_cancel (peer->th);
+    peer->th = NULL;
+  }
   GNUNET_STATISTICS_update (GDS_stats,
                             gettext_noop
                             ("# Bytes of bandwidth requested from core"),
@@ -1445,7 +1485,8 @@ GDS_NEIGHBOURS_handle_put (enum GNUNET_BLOCK_Type type,
   GNUNET_assert (NULL != bf);
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "Adding myself (%s) to PUT bloomfilter for %s\n",
-              GNUNET_i2s (&my_identity), GNUNET_h2s (key));
+              GNUNET_i2s (&my_identity),
+              GNUNET_h2s (key));
   GNUNET_CONTAINER_bloomfilter_add (bf, &my_identity_hash);
   GNUNET_STATISTICS_update (GDS_stats, gettext_noop ("# PUT requests routed"),
                             1, GNUNET_NO);
@@ -1492,8 +1533,10 @@ GDS_NEIGHBOURS_handle_put (enum GNUNET_BLOCK_Type type,
       continue;
     }
     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                "Routing PUT for %s after %u hops to %s\n", GNUNET_h2s (key),
-                (unsigned int) hop_count, GNUNET_i2s (&target->id));
+                "Routing PUT for %s after %u hops to %s\n",
+                GNUNET_h2s (key),
+                (unsigned int) hop_count,
+                GNUNET_i2s (&target->id));
     pending = GNUNET_malloc (sizeof (struct P2PPendingMessage) + msize);
     pending->importance = 0;    /* FIXME */
     pending->timeout = expiration_time;
@@ -1521,10 +1564,15 @@ GDS_NEIGHBOURS_handle_put (enum GNUNET_BLOCK_Type type,
     pp = (struct GNUNET_PeerIdentity *) &ppm[1];
     memcpy (pp, put_path,
             sizeof (struct GNUNET_PeerIdentity) * put_path_length);
-    memcpy (&pp[put_path_length], data, data_size);
-    GNUNET_CONTAINER_DLL_insert_tail (target->head, target->tail, pending);
+    memcpy (&pp[put_path_length],
+            data,
+            data_size);
+    GNUNET_CONTAINER_DLL_insert_tail (target->head,
+                                      target->tail,
+                                      pending);
     target->pending_count++;
-    process_peer_queue (target);
+    if (pending == target->head)
+      process_peer_queue (target);
   }
   GNUNET_free (targets);
   return (skip_count < target_count) ? GNUNET_OK : GNUNET_NO;
@@ -1579,8 +1627,10 @@ GDS_NEIGHBOURS_handle_get (enum GNUNET_BLOCK_Type type,
                         &targets);
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "Adding myself (%s) to GET bloomfilter for %s\n",
-              GNUNET_i2s (&my_identity), GNUNET_h2s (key));
-  GNUNET_CONTAINER_bloomfilter_add (peer_bf, &my_identity_hash);
+              GNUNET_i2s (&my_identity),
+              GNUNET_h2s (key));
+  GNUNET_CONTAINER_bloomfilter_add (peer_bf,
+                                    &my_identity_hash);
   if (0 == target_count)
   {
     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
@@ -1616,8 +1666,10 @@ GDS_NEIGHBOURS_handle_get (enum GNUNET_BLOCK_Type type,
       continue;
     }
     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                "Routing GET for %s after %u hops to %s\n", GNUNET_h2s (key),
-                (unsigned int) hop_count, GNUNET_i2s (&target->id));
+                "Routing GET for %s after %u hops to %s\n",
+                GNUNET_h2s (key),
+                (unsigned int) hop_count,
+                GNUNET_i2s (&target->id));
     pending = GNUNET_malloc (sizeof (struct P2PPendingMessage) + msize);
     pending->importance = 0;    /* FIXME */
     pending->timeout = GNUNET_TIME_relative_to_absolute (GET_TIMEOUT);
@@ -1650,9 +1702,12 @@ GDS_NEIGHBOURS_handle_get (enum GNUNET_BLOCK_Type type,
                                                                 &xq
                                                                 [xquery_size],
                                                                 reply_bf_size));
-    GNUNET_CONTAINER_DLL_insert_tail (target->head, target->tail, pending);
+    GNUNET_CONTAINER_DLL_insert_tail (target->head,
+                                      target->tail,
+                                      pending);
     target->pending_count++;
-    process_peer_queue (target);
+    if (pending == target->head)
+      process_peer_queue (target);
   }
   GNUNET_free (targets);
   return (skip_count < target_count) ? GNUNET_OK : GNUNET_NO;
@@ -1712,16 +1767,28 @@ GDS_NEIGHBOURS_handle_reply (const struct GNUNET_PeerIdentity *target,
   if (NULL == pi)
   {
     /* peer disconnected in the meantime, drop reply */
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "No matching peer for reply for key %s\n",
+                GNUNET_h2s (key));
     return;
   }
   if (pi->pending_count >= MAXIMUM_PENDING_PER_PEER)
   {
     /* skip */
-    GNUNET_STATISTICS_update (GDS_stats, gettext_noop ("# P2P messages dropped due to full queue"),
-                             1, GNUNET_NO);
+    GNUNET_STATISTICS_update (GDS_stats,
+                              gettext_noop ("# P2P messages dropped due to full queue"),
+                             1,
+                              GNUNET_NO);
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "Peer queue full, ignoring reply for key %s\n",
+                GNUNET_h2s (key));
     return;
   }
 
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Forwarding reply for key %s to peer %s\n",
+              GNUNET_h2s (key),
+              GNUNET_i2s (target));
   GNUNET_STATISTICS_update (GDS_stats,
                             gettext_noop
                             ("# RESULT messages queued for transmission"), 1,
@@ -1739,12 +1806,18 @@ GDS_NEIGHBOURS_handle_reply (const struct GNUNET_PeerIdentity *target,
   prm->expiration_time = GNUNET_TIME_absolute_hton (expiration_time);
   prm->key = *key;
   paths = (struct GNUNET_PeerIdentity *) &prm[1];
-  memcpy (paths, put_path,
+  memcpy (paths,
+          put_path,
           put_path_length * sizeof (struct GNUNET_PeerIdentity));
-  memcpy (&paths[put_path_length], get_path,
+  memcpy (&paths[put_path_length],
+          get_path,
           get_path_length * sizeof (struct GNUNET_PeerIdentity));
-  memcpy (&paths[put_path_length + get_path_length], data, data_size);
-  GNUNET_CONTAINER_DLL_insert (pi->head, pi->tail, pending);
+  memcpy (&paths[put_path_length + get_path_length],
+          data,
+          data_size);
+  GNUNET_CONTAINER_DLL_insert (pi->head,
+                               pi->tail,
+                               pending);
   pi->pending_count++;
   process_peer_queue (pi);
 }
@@ -1760,10 +1833,14 @@ static void
 core_init (void *cls,
            const struct GNUNET_PeerIdentity *identity)
 {
+  GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+             "CORE called, I am %s\n",
+              GNUNET_i2s (identity));
   my_identity = *identity;
   GNUNET_CRYPTO_hash (identity,
                      sizeof (struct GNUNET_PeerIdentity),
                      &my_identity_hash);
+  GDS_CLIENTS_init ();
 }
 
 
@@ -1778,7 +1855,8 @@ core_init (void *cls,
  *         #GNUNET_SYSERR to close it (signal serious error)
  */
 static int
-handle_dht_p2p_put (void *cls, const struct GNUNET_PeerIdentity *peer,
+handle_dht_p2p_put (void *cls,
+                    const struct GNUNET_PeerIdentity *peer,
                     const struct GNUNET_MessageHeader *message)
 {
   const struct PeerPutMessage *put;
@@ -1811,10 +1889,12 @@ handle_dht_p2p_put (void *cls, const struct GNUNET_PeerIdentity *peer,
     return GNUNET_YES;
   }
   GNUNET_STATISTICS_update (GDS_stats,
-                            gettext_noop ("# P2P PUT requests received"), 1,
+                            gettext_noop ("# P2P PUT requests received"),
+                            1,
                             GNUNET_NO);
   GNUNET_STATISTICS_update (GDS_stats,
-                            gettext_noop ("# P2P PUT bytes received"), msize,
+                            gettext_noop ("# P2P PUT bytes received"),
+                            msize,
                             GNUNET_NO);
   put_path = (const struct GNUNET_PeerIdentity *) &put[1];
   payload = &put_path[putlen];
@@ -1833,10 +1913,14 @@ handle_dht_p2p_put (void *cls, const struct GNUNET_PeerIdentity *peer,
     tmp = GNUNET_strdup (GNUNET_i2s (&my_identity));
     LOG_TRAFFIC (GNUNET_ERROR_TYPE_DEBUG,
                  "R5N PUT %s: %s->%s (%u, %u=>%u)\n",
-                 GNUNET_h2s (&put->key), GNUNET_i2s (peer), tmp,
+                 GNUNET_h2s (&put->key),
+                 GNUNET_i2s (peer),
+                 tmp,
                  ntohl(put->hop_count),
-                 GNUNET_CRYPTO_hash_matching_bits (&phash, &put->key),
-                 GNUNET_CRYPTO_hash_matching_bits (&my_identity_hash, &put->key)
+                 GNUNET_CRYPTO_hash_matching_bits (&phash,
+                                                   &put->key),
+                 GNUNET_CRYPTO_hash_matching_bits (&my_identity_hash,
+                                                   &put->key)
                 );
     GNUNET_free (tmp);
   }
@@ -1848,6 +1932,7 @@ handle_dht_p2p_put (void *cls, const struct GNUNET_PeerIdentity *peer,
     if (0 != memcmp (&test_key, &put->key, sizeof (struct GNUNET_HashCode)))
     {
       char *put_s = GNUNET_strdup (GNUNET_h2s_full (&put->key));
+
       GNUNET_break_op (0);
       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
                   "PUT with key `%s' for block with key %s\n",
@@ -1889,7 +1974,8 @@ handle_dht_p2p_put (void *cls, const struct GNUNET_PeerIdentity *peer,
     }
   }
 
-  bf = GNUNET_CONTAINER_bloomfilter_init (put->bloomfilter, DHT_BLOOM_SIZE,
+  bf = GNUNET_CONTAINER_bloomfilter_init (put->bloomfilter,
+                                          DHT_BLOOM_SIZE,
                                           GNUNET_CONSTANTS_BLOOMFILTER_K);
   GNUNET_break_op (GNUNET_YES ==
                    GNUNET_CONTAINER_bloomfilter_test (bf, &phash));
@@ -2025,8 +2111,12 @@ handle_find_peer (const struct GNUNET_PeerIdentity *sender,
       return;                   /* no non-masked peer available */
     if (peer == NULL)
       peer = bucket->head;
-    GNUNET_CRYPTO_hash (&peer->id, sizeof (struct GNUNET_PeerIdentity), &phash);
-    GNUNET_BLOCK_mingle_hash (&phash, bf_mutator, &mhash);
+    GNUNET_CRYPTO_hash (&peer->id,
+                        sizeof (struct GNUNET_PeerIdentity),
+                        &phash);
+    GNUNET_BLOCK_mingle_hash (&phash,
+                              bf_mutator,
+                              &mhash);
     hello = GDS_HELLO_get (&peer->id);
   }
   while ((hello == NULL) ||
@@ -2067,7 +2157,8 @@ handle_dht_p2p_get (void *cls,
   int forwarded;
 
   GNUNET_break (0 !=
-                memcmp (peer, &my_identity,
+                memcmp (peer,
+                        &my_identity,
                         sizeof (struct GNUNET_PeerIdentity)));
   /* parse and validate message */
   msize = ntohs (message->size);
@@ -2435,11 +2526,7 @@ GDS_NEIGHBOURS_done ()
   all_desired_peers = NULL;
   GNUNET_ATS_connectivity_done (ats_ch);
   ats_ch = NULL;
-  if (NULL != find_peer_task)
-  {
-    GNUNET_SCHEDULER_cancel (find_peer_task);
-    find_peer_task = NULL;
-  }
+  GNUNET_assert (NULL == find_peer_task);
 }