-improve UDP logging
[oweals/gnunet.git] / src / rps / gnunet-service-rps.c
index 388111f69caa32e7737a0c39c4069bcad461bec0..b5ced6f33678779189f8f4a7b95ffc3fa1a9b2af 100644 (file)
@@ -1,6 +1,6 @@
 /*
      This file is part of GNUnet.
-     (C)
+     Copyright (C)
 
      GNUnet is free software; you can redistribute it and/or modify
      it under the terms of the GNU General Public License as published
@@ -189,6 +189,16 @@ struct PeerContext
  * Globals
 ***********************************************************************/
 
+/**
+ * Sampler used for the Brahms protocol itself.
+ */
+static struct RPS_Sampler *prot_sampler;
+
+/**
+ * Sampler used for the clients.
+ */
+static struct RPS_Sampler *client_sampler;
+
 /**
  * Set of all peers to keep track of them.
  */
@@ -441,7 +451,10 @@ get_rand_peer_ignore_list (const struct GNUNET_PeerIdentity *peer_list,
     rem_from_list (tmp_peer_list, &tmp_size, peer);
 
     if (0 == tmp_size)
+    {
+      GNUNET_free (peer);
       return NULL;
+    }
 
     /**;
      * Choose the r_index of the peer we want to return
@@ -692,14 +705,15 @@ insert_in_gossip_list_scheduled (const struct PeerContext *peer_ctx)
   void
 insert_in_sampler (void *cls, const struct GNUNET_PeerIdentity *peer)
 {
-  RPS_sampler_update_list (peer);
+  RPS_sampler_update (prot_sampler,   peer);
+  RPS_sampler_update (client_sampler, peer);
 }
 
 
 /**
  * Check whether #insert_in_sampler was already scheduled
  */
-  int
+static int
 insert_in_sampler_scheduled (const struct PeerContext *peer_ctx)
 {
   unsigned int i;
@@ -713,30 +727,50 @@ insert_in_sampler_scheduled (const struct PeerContext *peer_ctx)
 
 /**
  * Wrapper around #RPS_sampler_resize()
+ *
+ * If we do not have enough sampler elements, double current sampler size
+ * If we have more than enough sampler elements, halv current sampler size
  */
-  void
-resize_wrapper ()
+static void
+resize_wrapper (struct RPS_Sampler *sampler, uint32_t new_size)
+{
+  unsigned int sampler_size;
+
+  // TODO statistics
+  // TODO respect the min, max
+  sampler_size = RPS_sampler_get_size (sampler);
+  if (sampler_size > new_size * 4)
+  { /* Shrinking */
+    RPS_sampler_resize (sampler, sampler_size / 2);
+  }
+  else if (sampler_size < new_size)
+  { /* Growing */
+    RPS_sampler_resize (sampler, sampler_size * 2);
+  }
+  LOG (GNUNET_ERROR_TYPE_DEBUG, "sampler_size is now %u\n", sampler_size);
+}
+
+
+/**
+ * Wrapper around #RPS_sampler_resize() resizing the client sampler
+ */
+static void
+client_resize_wrapper ()
 {
   uint32_t bigger_size;
   unsigned int sampler_size;
 
   // TODO statistics
 
+  sampler_size = RPS_sampler_get_size (client_sampler);
+
   if (sampler_size_est_need > sampler_size_client_need)
     bigger_size = sampler_size_est_need;
   else
     bigger_size = sampler_size_client_need;
 
   // TODO respect the min, max
-  sampler_size = RPS_sampler_get_size ();
-  if (sampler_size > bigger_size * 4)
-  { /* Shrinking */
-    RPS_sampler_resize (sampler_size / 2);
-  }
-  else if (sampler_size < bigger_size)
-  { /* Growing */
-    RPS_sampler_resize (sampler_size * 2);
-  }
+  resize_wrapper (client_sampler, bigger_size);
   LOG (GNUNET_ERROR_TYPE_DEBUG, "sampler_size is now %u\n", sampler_size);
 }
 
@@ -777,7 +811,7 @@ est_request_rate()
         max_round_duration.rel_value_us / request_rate.rel_value_us;
 
     /* Resize the sampler */
-    resize_wrapper ();
+    client_resize_wrapper ();
   }
   last_request = GNUNET_TIME_absolute_get ();
 }
@@ -787,6 +821,10 @@ est_request_rate()
  * /Util functions
 ***********************************************************************/
 
+
+
+
+
 /**
  * Function called by NSE.
  *
@@ -802,7 +840,7 @@ nse_callback (void *cls, struct GNUNET_TIME_Absolute timestamp,
 
   LOG (GNUNET_ERROR_TYPE_DEBUG,
        "Received a ns estimate - logest: %f, std_dev: %f (old_size: %u)\n",
-       logestimate, std_dev, RPS_sampler_get_size ());
+       logestimate, std_dev, RPS_sampler_get_size (prot_sampler));
   //scale = .01;
   estimate = GNUNET_NSE_log_estimate_to_n (logestimate);
   // GNUNET_NSE_log_estimate_to_n (logestimate);
@@ -817,7 +855,8 @@ nse_callback (void *cls, struct GNUNET_TIME_Absolute timestamp,
     LOG (GNUNET_ERROR_TYPE_DEBUG, "Not using estimate %f\n", estimate);
 
   /* If the NSE has changed adapt the lists accordingly */
-  resize_wrapper ();
+  resize_wrapper (prot_sampler, sampler_size_est_need);
+  client_resize_wrapper ();
 }
 
 
@@ -898,7 +937,8 @@ handle_client_request (void *cls,
 
   LOG (GNUNET_ERROR_TYPE_DEBUG, "Client requested %" PRIX32 " random peer(s).\n", num_peers);
 
-  RPS_sampler_get_n_rand_peers (client_respond, client, num_peers, GNUNET_YES);
+  RPS_sampler_get_n_rand_peers (client_sampler, client_respond,
+                                client, num_peers, GNUNET_YES);
 
   GNUNET_SERVER_receive_done (client,
                              GNUNET_OK);
@@ -940,7 +980,8 @@ handle_client_seed (void *cls,
   peers = (struct GNUNET_PeerIdentity *) &message[1];
 
   for ( i = 0 ; i < ntohl (in_msg->num_peers) ; i++ )
-    RPS_sampler_update_list (&peers[i]);
+    RPS_sampler_update (prot_sampler,   &peers[i]);
+    RPS_sampler_update (client_sampler, &peers[i]);
 
   GNUNET_SERVER_receive_done (client,
                              GNUNET_OK);
@@ -1188,6 +1229,7 @@ do_round (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
     {
       peer = *tmp_peer;
       GNUNET_free (tmp_peer);
+
       GNUNET_array_append (pending_pull_reply_list, pending_pull_reply_list_size, peer);
 
       if (0 != GNUNET_CRYPTO_cmp_peer_identity (&own_identity, &peer))
@@ -1242,7 +1284,7 @@ do_round (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
     for (i = second_border ; i < sampler_size_est_need ; i++)
     {
       /* Update gossip list with peers from history */
-      RPS_sampler_get_n_rand_peers (hist_update, NULL, 1, GNUNET_NO);
+      RPS_sampler_get_n_rand_peers (prot_sampler, hist_update, NULL, 1, GNUNET_NO);
       num_hist_update_tasks++;
       // TODO change the peer_flags accordingly
     }
@@ -1250,7 +1292,7 @@ do_round (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
   }
   else
   {
-    LOG (GNUNET_ERROR_TYPE_DEBUG, "No update of the gossip list. ()\n");
+    LOG (GNUNET_ERROR_TYPE_DEBUG, "No update of the gossip list.\n");
   }
   // TODO independent of that also get some peers from CADET_get_peers()?
 
@@ -1259,13 +1301,15 @@ do_round (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
 
   for ( i = 0 ; i < push_list_size ; i++ )
   {
-    RPS_sampler_update_list (&push_list[i]);
+    RPS_sampler_update (prot_sampler,   &push_list[i]);
+    RPS_sampler_update (client_sampler, &push_list[i]);
     // TODO set in_flag?
   }
 
   for ( i = 0 ; i < pull_list_size ; i++ )
   {
-    RPS_sampler_update_list (&pull_list[i]);
+    RPS_sampler_update (prot_sampler,   &push_list[i]);
+    RPS_sampler_update (client_sampler, &push_list[i]);
     // TODO set in_flag?
   }
 
@@ -1302,7 +1346,8 @@ do_round (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
  * Open a connection to given peer and store channel and mq.
  */
   void
-insertCB (void *cls, const struct GNUNET_PeerIdentity *id)
+insertCB (void *cls, struct RPS_Sampler *sampler,
+          const struct GNUNET_PeerIdentity *id)
 {
   // We open a channel to be notified when this peer goes down.
   (void) get_channel (peer_map, id);
@@ -1313,12 +1358,13 @@ insertCB (void *cls, const struct GNUNET_PeerIdentity *id)
  * Close the connection to given peer and delete channel and mq.
  */
   void
-removeCB (void *cls, const struct GNUNET_PeerIdentity *id)
+removeCB (void *cls, struct RPS_Sampler *sampler,
+          const struct GNUNET_PeerIdentity *id)
 {
   size_t s;
   struct PeerContext *ctx;
 
-  s = RPS_sampler_count_id (id);
+  s = RPS_sampler_count_id (sampler, id);
   if ( 1 >= s )
   {
     if (GNUNET_YES == GNUNET_CONTAINER_multipeermap_contains (peer_map, id))
@@ -1405,6 +1451,9 @@ peer_remove_cb (void *cls, const struct GNUNET_PeerIdentity *key, void *value)
 
   peer_ctx = (struct PeerContext *) value;
 
+  if (0 != peer_ctx->num_outstanding_ops)
+    GNUNET_array_grow (peer_ctx->outstanding_ops, peer_ctx->num_outstanding_ops, 0);
+
   if (NULL != peer_ctx->mq)
     GNUNET_MQ_destroy (peer_ctx->mq);
 
@@ -1467,7 +1516,8 @@ shutdown_task (void *cls,
 
   GNUNET_NSE_disconnect (nse);
   GNUNET_CADET_disconnect (cadet_handle);
-  RPS_sampler_destroy ();
+  RPS_sampler_destroy (prot_sampler);
+  RPS_sampler_destroy (client_sampler);
   GNUNET_break (0 == GNUNET_CONTAINER_multipeermap_size (peer_map));
   GNUNET_CONTAINER_multipeermap_destroy (peer_map);
   GNUNET_array_grow (gossip_list, gossip_list_size, 0);
@@ -1555,7 +1605,8 @@ cleanup_channel (void *cls,
   LOG (GNUNET_ERROR_TYPE_DEBUG, "Cleaning up channel to peer %s\n",
        GNUNET_i2s (peer));
 
-  RPS_sampler_reinitialise_by_value (peer);
+  RPS_sampler_reinitialise_by_value (prot_sampler,   peer);
+  RPS_sampler_reinitialise_by_value (client_sampler, peer);
 
   if (GNUNET_YES == GNUNET_CONTAINER_multipeermap_contains (peer_map, peer))
   {
@@ -1691,7 +1742,9 @@ run (void *cls,
   half_round_interval = GNUNET_TIME_relative_multiply (round_interval, .5);
   max_round_interval = GNUNET_TIME_relative_add (round_interval, half_round_interval);
 
-  RPS_sampler_init (sampler_size_est_need, max_round_interval,
+  prot_sampler =   RPS_sampler_init (sampler_size_est_need, max_round_interval,
+      insertCB, NULL, removeCB, NULL);
+  client_sampler = RPS_sampler_init (sampler_size_est_need, max_round_interval,
       insertCB, NULL, removeCB, NULL);
 
   /* Initialise push and pull maps */