-improve UDP logging
[oweals/gnunet.git] / src / rps / gnunet-service-rps.c
index 929e47a424986068f40346859310d660a924c193..b5ced6f33678779189f8f4a7b95ffc3fa1a9b2af 100644 (file)
@@ -1,6 +1,6 @@
 /*
      This file is part of GNUnet.
-     (C)
+     Copyright (C)
 
      GNUnet is free software; you can redistribute it and/or modify
      it under the terms of the GNU General Public License as published
@@ -38,8 +38,6 @@
 
 // TODO modify @brief in every file
 
-// TODO take care that messages are not longer than 64k
-
 // TODO check for overflows
 
 // TODO align message structs
 
 // TODO malicious peer
 
-// TODO Change API to accept initialisation peers
-
-// TODO Change API to accept good peers 'friends'
+// TODO connect to friends
 
 // TODO store peers somewhere
 
-// TODO check that every id we get is valid - is it reachable?
-
 // TODO ignore list?
 
 // hist_size_init, hist_size_max
@@ -68,11 +62,12 @@ static const struct GNUNET_CONFIGURATION_Handle *cfg;
 /**
  * Our own identity.
  */
-static struct GNUNET_PeerIdentity *own_identity;
+static struct GNUNET_PeerIdentity own_identity;
 
 
   struct GNUNET_PeerIdentity *
-get_rand_peer (const struct GNUNET_PeerIdentity *peer_list, unsigned int size);
+get_rand_peer_ignore_list (const struct GNUNET_PeerIdentity *peer_list, unsigned int size,
+                           const struct GNUNET_PeerIdentity *ignore_list, unsigned int ignore_size);
 
 
 /***********************************************************************
@@ -95,15 +90,15 @@ struct client_ctx
  */
 enum PeerFlags
 {
-  IN_OTHER_SAMPLER_LIST = 0x01, // unneeded?
-  IN_OTHER_GOSSIP_LIST  = 0x02, // unneeded?
-  IN_OWN_SAMPLER_LIST   = 0x04, // unneeded?
-  IN_OWN_GOSSIP_LIST    = 0x08, // unneeded?
+  PULL_REPLY_PENDING   = 0x01,
+  IN_OTHER_GOSSIP_LIST = 0x02, // unneeded?
+  IN_OWN_SAMPLER_LIST  = 0x04, // unneeded?
+  IN_OWN_GOSSIP_LIST   = 0x08, // unneeded?
 
   /**
    * We set this bit when we can be sure the other peer is/was live.
    */
-  LIVING                = 0x10
+  LIVING               = 0x10
 };
 
 
@@ -174,6 +169,11 @@ struct PeerContext
    */
   struct GNUNET_CADET_TransmitHandle *is_live_task;
 
+  /**
+   * Identity of the peer
+   */
+  struct GNUNET_PeerIdentity peer_id;
+
   /**
    * This is pobably followed by 'statistical' data (when we first saw
    * him, how did we get his ID, how many pushes (in a timeinterval),
@@ -189,6 +189,16 @@ struct PeerContext
  * Globals
 ***********************************************************************/
 
+/**
+ * Sampler used for the Brahms protocol itself.
+ */
+static struct RPS_Sampler *prot_sampler;
+
+/**
+ * Sampler used for the clients.
+ */
+static struct RPS_Sampler *client_sampler;
+
 /**
  * Set of all peers to keep track of them.
  */
@@ -207,12 +217,6 @@ static struct GNUNET_PeerIdentity *gossip_list;
 static uint32_t gossip_list_size;
 
 
-/**
- * The actual size of the sampler
- */
-static unsigned int sampler_size;
-//size_t sampler_size;
-
 /**
  * The size of sampler we need to be able to satisfy the client's need of
  * random peers.
@@ -234,16 +238,12 @@ static unsigned int sampler_size_est_need;
 /**
  * Percentage of total peer number in the gossip list
  * to send random PUSHes to
- *
- * TODO do not read from configuration
  */
 static float alpha;
 
 /**
  * Percentage of total peer number in the gossip list
  * to send random PULLs to
- *
- * TODO do not read from configuration
  */
 static float beta;
 
@@ -335,6 +335,17 @@ static struct GNUNET_TIME_Relative request_deltas[REQUEST_DELTAS_SIZE];
 static struct GNUNET_TIME_Relative  request_rate;
 
 
+/**
+ * List with the peers we sent requests to.
+ */
+struct GNUNET_PeerIdentity *pending_pull_reply_list;
+
+/**
+ * Size of #pending_pull_reply_list.
+ */
+uint32_t pending_pull_reply_list_size;
+
+
 /**
  * Number of history update tasks.
  */
@@ -378,32 +389,84 @@ in_arr (const struct GNUNET_PeerIdentity *array,
     return GNUNET_YES;
 }
 
+/**
+ * Remove peer from list.
+ */
+  void
+rem_from_list (struct GNUNET_PeerIdentity *peer_list,
+               unsigned int *list_size,
+               const struct GNUNET_PeerIdentity *peer)
+{
+  unsigned int i;
+
+  for ( i = 0 ; i < *list_size ; i++ )
+  {
+    if (0 == GNUNET_CRYPTO_cmp_peer_identity (&peer_list[i], peer))
+    {
+      if (i < *list_size -1)
+      { /* Not at the last entry -- shift peers left */
+        memcpy (&peer_list[i], &peer_list[i +1],
+                (*list_size - i -1) * sizeof (struct GNUNET_PeerIdentity));
+      }
+      /* Remove last entry (should be now useless PeerID) */
+      GNUNET_array_grow (peer_list, *list_size, *list_size -1);
+    }
+  }
+}
 
 /**
- * Get random peer from the gossip list.
+ * Get random peer from the given list but don't return one from the @a ignore_list.
  */
   struct GNUNET_PeerIdentity *
-get_rand_peer (const struct GNUNET_PeerIdentity *peer_list, unsigned int list_size)
+get_rand_peer_ignore_list (const struct GNUNET_PeerIdentity *peer_list,
+                           uint32_t list_size,
+                           const struct GNUNET_PeerIdentity *ignore_list,
+                           uint32_t ignore_size)
 {
   uint32_t r_index;
+  uint32_t tmp_size;
+  struct GNUNET_PeerIdentity *tmp_peer_list;
   struct GNUNET_PeerIdentity *peer;
 
+  GNUNET_assert (NULL != peer_list);
+  if (0 == list_size)
+    return NULL;
+
+  tmp_size = 0;
+  tmp_peer_list = NULL;
+  GNUNET_array_grow (tmp_peer_list, tmp_size, list_size);
+  memcpy (tmp_peer_list, peer_list, list_size * sizeof (struct GNUNET_PeerIdentity));
   peer = GNUNET_new (struct GNUNET_PeerIdentity);
-  // FIXME if we have only NULL in gossip list this will block
-  // but then we might have a problem nevertheless
 
-  do
+  /**;
+   * Choose the r_index of the peer we want to return
+   * at random from the interval of the gossip list
+   */
+  r_index = GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_STRONG,
+                                      tmp_size);
+  *peer = tmp_peer_list[r_index];
+
+  while (in_arr (ignore_list, ignore_size, peer))
   {
+    rem_from_list (tmp_peer_list, &tmp_size, peer);
+
+    if (0 == tmp_size)
+    {
+      GNUNET_free (peer);
+      return NULL;
+    }
 
     /**;
      * Choose the r_index of the peer we want to return
      * at random from the interval of the gossip list
      */
     r_index = GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_STRONG,
-                                     list_size);
+                                        tmp_size);
+    *peer = tmp_peer_list[r_index];
+  }
+
 
-    *peer = peer_list[r_index];
-  } while (NULL == peer);
+  GNUNET_array_grow (tmp_peer_list, tmp_size, 0);
 
   return peer;
 }
@@ -459,13 +522,19 @@ hist_update (void *cls, struct GNUNET_PeerIdentity *ids, uint32_t num_peers)
  * This is given to ntfy_tmt_rdy and called when the channel was
  * successfully established.
  */
-  size_t
+static size_t
 peer_is_live (void *cls, size_t size, void *buf)
 {
+  struct PeerContext *ctx = cls;
   struct GNUNET_PeerIdentity *peer;
   struct PeerContext *peer_ctx;
 
-  peer = (struct GNUNET_PeerIdentity *) cls;
+  //if (NULL == buf ||
+  //    0 == size)
+  // TODO check
+
+  ctx->is_live_task = NULL;
+  peer = &ctx->peer_id;
   peer_ctx = get_peer_ctx (peer_map, peer);
   peer_ctx->peer_flags |= LIVING;
 
@@ -480,9 +549,11 @@ peer_is_live (void *cls, size_t size, void *buf)
     GNUNET_array_grow (peer_ctx->outstanding_ops, peer_ctx->num_outstanding_ops, 0);
   }
 
-  GNUNET_free (peer);
-
-  buf = NULL;
+  //if (NULL != peer_ctx->is_live_task)
+  //{
+  //  GNUNET_CADET_notify_transmit_ready_cancel (peer_ctx->is_live_task);
+  //  peer_ctx->is_live_task = NULL; // needed?
+  //}
   return 0;
 }
 
@@ -495,7 +566,6 @@ get_channel (struct GNUNET_CONTAINER_MultiPeerMap *peer_map,
              const struct GNUNET_PeerIdentity *peer)
 {
   struct PeerContext *ctx;
-  struct GNUNET_PeerIdentity *tmp_peer;
 
   ctx = get_peer_ctx (peer_map, peer);
   if (NULL == ctx->send_channel)
@@ -506,11 +576,12 @@ get_channel (struct GNUNET_CONTAINER_MultiPeerMap *peer_map,
 
     if (NULL == ctx->recv_channel)
     {
-      tmp_peer = GNUNET_new (struct GNUNET_PeerIdentity);
-      *tmp_peer = *peer;
-      ctx->is_live_task = GNUNET_CADET_notify_transmit_ready (ctx->send_channel, GNUNET_NO,
-                                                              GNUNET_TIME_UNIT_FOREVER_REL,
-                                                              0, peer_is_live, tmp_peer);
+      ctx->peer_id = *peer;
+      ctx->is_live_task =
+          GNUNET_CADET_notify_transmit_ready (ctx->send_channel, GNUNET_NO,
+                                              GNUNET_TIME_UNIT_FOREVER_REL,
+                                              sizeof (struct GNUNET_MessageHeader),
+                                              peer_is_live, ctx);
     }
 
     // do I have to explicitly put it in the peer_map?
@@ -634,13 +705,15 @@ insert_in_gossip_list_scheduled (const struct PeerContext *peer_ctx)
   void
 insert_in_sampler (void *cls, const struct GNUNET_PeerIdentity *peer)
 {
-  RPS_sampler_update_list (peer);
+  RPS_sampler_update (prot_sampler,   peer);
+  RPS_sampler_update (client_sampler, peer);
 }
 
+
 /**
  * Check whether #insert_in_sampler was already scheduled
  */
-  int
+static int
 insert_in_sampler_scheduled (const struct PeerContext *peer_ctx)
 {
   unsigned int i;
@@ -652,31 +725,95 @@ insert_in_sampler_scheduled (const struct PeerContext *peer_ctx)
 }
 
 
-
 /**
  * Wrapper around #RPS_sampler_resize()
+ *
+ * If we do not have enough sampler elements, double current sampler size
+ * If we have more than enough sampler elements, halv current sampler size
  */
-  void
-resize_wrapper ()
+static void
+resize_wrapper (struct RPS_Sampler *sampler, uint32_t new_size)
+{
+  unsigned int sampler_size;
+
+  // TODO statistics
+  // TODO respect the min, max
+  sampler_size = RPS_sampler_get_size (sampler);
+  if (sampler_size > new_size * 4)
+  { /* Shrinking */
+    RPS_sampler_resize (sampler, sampler_size / 2);
+  }
+  else if (sampler_size < new_size)
+  { /* Growing */
+    RPS_sampler_resize (sampler, sampler_size * 2);
+  }
+  LOG (GNUNET_ERROR_TYPE_DEBUG, "sampler_size is now %u\n", sampler_size);
+}
+
+
+/**
+ * Wrapper around #RPS_sampler_resize() resizing the client sampler
+ */
+static void
+client_resize_wrapper ()
 {
   uint32_t bigger_size;
+  unsigned int sampler_size;
 
   // TODO statistics
 
+  sampler_size = RPS_sampler_get_size (client_sampler);
+
   if (sampler_size_est_need > sampler_size_client_need)
-    bigger_size = sampler_size_client_need;
-  else
     bigger_size = sampler_size_est_need;
+  else
+    bigger_size = sampler_size_client_need;
 
-  // TODO respect the request rate, min, max
-  if (sampler_size > bigger_size*4)
-  { /* Shrinking */
-    RPS_sampler_resize (sampler_size/2);
-  }
-  else if (sampler_size < bigger_size)
-  { /* Growing */
-    RPS_sampler_resize (sampler_size*2);
+  // TODO respect the min, max
+  resize_wrapper (client_sampler, bigger_size);
+  LOG (GNUNET_ERROR_TYPE_DEBUG, "sampler_size is now %u\n", sampler_size);
+}
+
+
+/**
+ * Estimate request rate
+ *
+ * Called every time we receive a request from the client.
+ */
+  void
+est_request_rate()
+{
+  struct GNUNET_TIME_Relative max_round_duration;
+
+  if (request_deltas_size > req_counter)
+    req_counter++;
+  if ( 1 < req_counter)
+  {
+    /* Shift last request deltas to the right */
+    memcpy (&request_deltas[1],
+        request_deltas,
+        (req_counter - 1) * sizeof (struct GNUNET_TIME_Relative));
+
+    /* Add current delta to beginning */
+    request_deltas[0] =
+        GNUNET_TIME_absolute_get_difference (last_request,
+                                             GNUNET_TIME_absolute_get ());
+    request_rate = T_relative_avg (request_deltas, req_counter);
+
+    /* Compute the duration a round will maximally take */
+    max_round_duration =
+        GNUNET_TIME_relative_add (round_interval,
+                                  GNUNET_TIME_relative_divide (round_interval, 2));
+
+    /* Set the estimated size the sampler has to have to
+     * satisfy the current client request rate */
+    sampler_size_client_need =
+        max_round_duration.rel_value_us / request_rate.rel_value_us;
+
+    /* Resize the sampler */
+    client_resize_wrapper ();
   }
+  last_request = GNUNET_TIME_absolute_get ();
 }
 
 
@@ -684,6 +821,10 @@ resize_wrapper ()
  * /Util functions
 ***********************************************************************/
 
+
+
+
+
 /**
  * Function called by NSE.
  *
@@ -691,28 +832,31 @@ resize_wrapper ()
  * accordingly.
  */
   void
-nse_callback (void *cls, struct GNUNET_TIME_Absolute timestamp, double logestimate, double std_dev)
+nse_callback (void *cls, struct GNUNET_TIME_Absolute timestamp,
+              double logestimate, double std_dev)
 {
   double estimate;
   //double scale; // TODO this might go gloabal/config
 
   LOG (GNUNET_ERROR_TYPE_DEBUG,
-      "Received a ns estimate - logest: %f, std_dev: %f (old_size: %f)\n",
-      logestimate, std_dev, sampler_size);
+       "Received a ns estimate - logest: %f, std_dev: %f (old_size: %u)\n",
+       logestimate, std_dev, RPS_sampler_get_size (prot_sampler));
   //scale = .01;
   estimate = GNUNET_NSE_log_estimate_to_n (logestimate);
   // GNUNET_NSE_log_estimate_to_n (logestimate);
-  estimate = pow (estimate, 1./3);
+  estimate = pow (estimate, 1.0 / 3);
   // TODO add if std_dev is a number
   // estimate += (std_dev * scale);
-  if ( 0 < estimate ) {
+  if (2 < ceil (estimate))
+  {
     LOG (GNUNET_ERROR_TYPE_DEBUG, "Changing estimate to %f\n", estimate);
     sampler_size_est_need = estimate;
   } else
     LOG (GNUNET_ERROR_TYPE_DEBUG, "Not using estimate %f\n", estimate);
 
   /* If the NSE has changed adapt the lists accordingly */
-  resize_wrapper ();
+  resize_wrapper (prot_sampler, sampler_size_est_need);
+  client_resize_wrapper ();
 }
 
 
@@ -728,10 +872,16 @@ void client_respond (void *cls,
   struct GNUNET_MQ_Envelope *ev;
   struct GNUNET_RPS_CS_ReplyMessage *out_msg;
   struct GNUNET_SERVER_Client *client;
+  uint32_t size_needed;
   struct client_ctx *cli_ctx;
 
   client = (struct GNUNET_SERVER_Client *) cls;
 
+  size_needed = sizeof (struct GNUNET_RPS_CS_ReplyMessage) +
+                num_peers * sizeof (struct GNUNET_PeerIdentity);
+
+  GNUNET_assert (GNUNET_SERVER_MAX_MESSAGE_SIZE >= size_needed);
+
   ev = GNUNET_MQ_msg_extra (out_msg,
                             num_peers * sizeof (struct GNUNET_PeerIdentity),
                             GNUNET_MESSAGE_TYPE_RPS_CS_REPLY);
@@ -741,14 +891,14 @@ void client_respond (void *cls,
       ids,
       num_peers * sizeof (struct GNUNET_PeerIdentity));
   GNUNET_free (ids);
-  
+
   cli_ctx = GNUNET_SERVER_client_get_user_context (client, struct client_ctx);
   if ( NULL == cli_ctx ) {
     cli_ctx = GNUNET_new (struct client_ctx);
     cli_ctx->mq = GNUNET_MQ_queue_for_server_client (client);
     GNUNET_SERVER_client_set_user_context (client, cli_ctx);
   }
-  
+
   GNUNET_MQ_send (cli_ctx->mq, ev);
 }
 
@@ -767,40 +917,28 @@ handle_client_request (void *cls,
 {
   struct GNUNET_RPS_CS_RequestMessage *msg;
   uint32_t num_peers;
-  struct GNUNET_TIME_Relative max_round_duration;
-
+  uint32_t size_needed;
+  uint32_t i;
 
-  /* Estimate request rate */
-  if (request_deltas_size > req_counter)
-    req_counter++;
-  if ( 1 < req_counter)
-  {
-    /* Shift last request deltas to the right */
-    memcpy (&request_deltas[1],
-        request_deltas,
-        (req_counter - 1) * sizeof (struct GNUNET_TIME_Relative));
-    /* Add current delta to beginning */
-    request_deltas[0] = GNUNET_TIME_absolute_get_difference (last_request,
-        GNUNET_TIME_absolute_get ());
-    request_rate = T_relative_avg (request_deltas, req_counter);
+  msg = (struct GNUNET_RPS_CS_RequestMessage *) message;
 
-    max_round_duration = GNUNET_TIME_relative_add (round_interval,
-        GNUNET_TIME_relative_divide (round_interval, 2));
-    sampler_size_client_need = max_round_duration.rel_value_us / request_rate.rel_value_us;
+  num_peers = ntohl (msg->num_peers);
+  size_needed = sizeof (struct GNUNET_RPS_CS_ReplyMessage) +
+                num_peers * sizeof (struct GNUNET_PeerIdentity);
 
-    resize_wrapper ();
+  if (GNUNET_SERVER_MAX_MESSAGE_SIZE < size_needed)
+  {
+    GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
+    return;
   }
-  last_request = GNUNET_TIME_absolute_get ();
 
-
-  // TODO check message size
-  msg = (struct GNUNET_RPS_CS_RequestMessage *) message;
-
-  num_peers = ntohl (msg->num_peers);
+  for (i = 0 ; i < num_peers ; i++)
+    est_request_rate();
 
   LOG (GNUNET_ERROR_TYPE_DEBUG, "Client requested %" PRIX32 " random peer(s).\n", num_peers);
 
-  RPS_sampler_get_n_rand_peers (client_respond, client, num_peers, GNUNET_YES);
+  RPS_sampler_get_n_rand_peers (client_sampler, client_respond,
+                                client, num_peers, GNUNET_YES);
 
   GNUNET_SERVER_receive_done (client,
                              GNUNET_OK);
@@ -842,7 +980,8 @@ handle_client_seed (void *cls,
   peers = (struct GNUNET_PeerIdentity *) &message[1];
 
   for ( i = 0 ; i < ntohl (in_msg->num_peers) ; i++ )
-    RPS_sampler_update_list (&peers[i]);
+    RPS_sampler_update (prot_sampler,   &peers[i]);
+    RPS_sampler_update (client_sampler, &peers[i]);
 
   GNUNET_SERVER_receive_done (client,
                              GNUNET_OK);
@@ -868,12 +1007,12 @@ handle_peer_push (void *cls,
 {
   const struct GNUNET_PeerIdentity *peer;
 
-  // (check the proof of work) 
-  
+  // (check the proof of work)
+
   peer = (const struct GNUNET_PeerIdentity *) GNUNET_CADET_channel_get_info (channel, GNUNET_CADET_OPTION_PEER);
   // FIXME wait for cadet to change this function
   LOG (GNUNET_ERROR_TYPE_DEBUG, "PUSH received (%s)\n", GNUNET_i2s (peer));
-  
+
   /* Add the sending peer to the push_list */
   if (GNUNET_NO == in_arr (push_list, pull_list_size, peer))
     GNUNET_array_append (push_list, push_list_size, *peer);
@@ -898,6 +1037,7 @@ handle_peer_pull_request (void *cls,
     const struct GNUNET_MessageHeader *msg)
 {
   struct GNUNET_PeerIdentity *peer;
+  uint32_t send_size;
   struct GNUNET_MQ_Handle *mq;
   struct GNUNET_MQ_Envelope *ev;
   struct GNUNET_RPS_P2P_PullReplyMessage *out_msg;
@@ -906,19 +1046,33 @@ handle_peer_pull_request (void *cls,
   peer = (struct GNUNET_PeerIdentity *) GNUNET_CADET_channel_get_info (channel,
                                                                        GNUNET_CADET_OPTION_PEER);
   // FIXME wait for cadet to change this function
+
+  /* Compute actual size */
+  send_size = sizeof (struct GNUNET_RPS_P2P_PullReplyMessage) +
+              gossip_list_size * sizeof (struct GNUNET_PeerIdentity);
+
+  if (GNUNET_CONSTANTS_MAX_CADET_MESSAGE_SIZE < send_size)
+    /* Compute number of peers to send
+     * If too long, simply truncate */
+    send_size = (GNUNET_CONSTANTS_MAX_CADET_MESSAGE_SIZE -
+                 sizeof (struct GNUNET_RPS_P2P_PullReplyMessage)) /
+                 sizeof (struct GNUNET_PeerIdentity);
+  else
+    send_size = gossip_list_size;
+
   LOG (GNUNET_ERROR_TYPE_DEBUG,
       "PULL REQUEST from peer %s received, going to send %u peers\n",
-      GNUNET_i2s (peer), gossip_list_size);
+      GNUNET_i2s (peer), send_size);
 
   mq = get_mq (peer_map, peer);
 
   ev = GNUNET_MQ_msg_extra (out_msg,
-                           gossip_list_size * sizeof (struct GNUNET_PeerIdentity),
+                           send_size * sizeof (struct GNUNET_PeerIdentity),
                            GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REPLY);
   //out_msg->num_peers = htonl (gossip_list_size);
-  out_msg->num_peers = htonl (gossip_list_size);
+  out_msg->num_peers = htonl (send_size);
   memcpy (&out_msg[1], gossip_list,
-         gossip_list_size * sizeof (struct GNUNET_PeerIdentity));
+         send_size * sizeof (struct GNUNET_PeerIdentity));
 
   GNUNET_MQ_send (mq, ev);
 
@@ -947,6 +1101,8 @@ handle_peer_pull_reply (void *cls,
   struct GNUNET_RPS_P2P_PullReplyMessage *in_msg;
   struct GNUNET_PeerIdentity *peers;
   struct PeerContext *peer_ctx;
+  struct GNUNET_PeerIdentity *sender;
+  struct PeerContext *sender_ctx;
   struct PeerOutstandingOp out_op;
   uint32_t i;
 
@@ -965,13 +1121,22 @@ handle_peer_pull_reply (void *cls,
     return GNUNET_SYSERR;
   }
 
-  // TODO check that we sent a request and that it is the first reply
+  sender = (struct GNUNET_PeerIdentity *) GNUNET_CADET_channel_get_info (
+      (struct GNUNET_CADET_Channel *) channel, GNUNET_CADET_OPTION_PEER);
+       // Guess simply casting isn't the nicest way...
+       // FIXME wait for cadet to change this function
+  sender_ctx = get_peer_ctx (peer_map, sender);
+
+  if (0 == (sender_ctx->peer_flags || PULL_REPLY_PENDING))
+  {
+    GNUNET_break_op (0);
+    return GNUNET_OK;
+  }
 
   peers = (struct GNUNET_PeerIdentity *) &msg[1];
   for ( i = 0 ; i < ntohl (in_msg->num_peers) ; i++ )
   {
     peer_ctx = get_peer_ctx (peer_map, &peers[i]);
-
     if ((0 != (peer_ctx->peer_flags && LIVING)) ||
         NULL != peer_ctx->recv_channel)
     {
@@ -981,11 +1146,13 @@ handle_peer_pull_reply (void *cls,
     else if (GNUNET_NO == insert_in_pull_list_scheduled (peer_ctx))
     {
       out_op.op = insert_in_pull_list;
+      out_op.op_cls = NULL;
       GNUNET_array_append (peer_ctx->outstanding_ops, peer_ctx->num_outstanding_ops, out_op);
     }
   }
 
-  // TODO check that id is valid - whether it is reachable
+  sender_ctx->peer_flags &= (~PULL_REPLY_PENDING);
+  rem_from_list (pending_pull_reply_list, &pending_pull_reply_list_size, sender);
 
   return GNUNET_OK;
 }
@@ -1002,64 +1169,77 @@ do_round (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
   LOG (GNUNET_ERROR_TYPE_DEBUG, "Going to execute next round\n");
 
   uint32_t i;
-  //unsigned int *n_arr;
+  unsigned int *permut;
   unsigned int n_peers; /* Number of peers we send pushes/pulls to */
   struct GNUNET_MQ_Envelope *ev;
-  const struct GNUNET_PeerIdentity *peer;
+  struct GNUNET_PeerIdentity peer;
+  struct GNUNET_PeerIdentity *tmp_peer;
   struct GNUNET_MQ_Handle *mq;
 
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Printing gossip list:\n");
+  for (i = 0 ; i < gossip_list_size ; i++)
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "\t%s\n", GNUNET_i2s (&gossip_list[i]));
   // TODO log lists, ...
 
-
   /* Would it make sense to have one shuffeled gossip list and then
    * to send PUSHes to first alpha peers, PULL requests to next beta peers and
    * use the rest to update sampler?
    * in essence get random peers with consumption */
 
   /* Send PUSHes */
-  //n_arr = GNUNET_CRYPTO_random_permute (GNUNET_CRYPTO_QUALITY_STRONG, (unsigned int) gossip_list_size);
-  if (0 != gossip_list_size)
+  if (0 < gossip_list_size)
   {
-    n_peers = round (alpha * gossip_list_size);
-    if (0 == n_peers)
-      n_peers = 1;
-    LOG (GNUNET_ERROR_TYPE_DEBUG, "Going to send pushes to %u (%f * %u) peers.\n",
-        n_peers, alpha, gossip_list_size);
-    for ( i = 0 ; i < n_peers ; i++ )
+    permut = GNUNET_CRYPTO_random_permute (GNUNET_CRYPTO_QUALITY_STRONG,
+                                           (unsigned int) gossip_list_size);
+    n_peers = ceil (alpha * gossip_list_size);
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "Going to send pushes to %u ceil (%f * %u) peers.\n",
+         n_peers, alpha, gossip_list_size);
+    for (i = 0 ; i < n_peers ; i++)
     {
-      peer = get_rand_peer (gossip_list, gossip_list_size);
-      if (own_identity != peer)
+      peer = gossip_list[permut[i]];
+      if (0 != GNUNET_CRYPTO_cmp_peer_identity (&own_identity, &peer)) // TODO
       { // FIXME if this fails schedule/loop this for later
-        LOG (GNUNET_ERROR_TYPE_DEBUG, "Sending PUSH to peer %s of gossiped list.\n", GNUNET_i2s (peer));
+        LOG (GNUNET_ERROR_TYPE_DEBUG,
+             "Sending PUSH to peer %s of gossiped list.\n",
+             GNUNET_i2s (&peer));
 
         ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_RPS_PP_PUSH);
-        // FIXME sometimes it returns a pointer to a freed mq
-        mq = get_mq (peer_map, peer);
+        mq = get_mq (peer_map, &peer);
         GNUNET_MQ_send (mq, ev);
       }
     }
+    GNUNET_free (permut);
   }
 
 
   /* Send PULL requests */
-  //n_arr = GNUNET_CRYPTO_random_permute (GNUNET_CRYPTO_QUALITY_STRONG, (unsigned int) sampler_list->size);
-  if (0 != gossip_list_size)
+  //permut = GNUNET_CRYPTO_random_permute (GNUNET_CRYPTO_QUALITY_STRONG, (unsigned int) sampler_list->size);
+  n_peers = ceil (beta * gossip_list_size);
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Going to send pulls to %u ceil (%f * %u) peers.\n",
+       n_peers, beta, gossip_list_size);
+  for (i = 0 ; i < n_peers ; i++)
   {
-    n_peers = round (beta * gossip_list_size);
-    if (0 == n_peers)
-      n_peers = 1;
-    LOG (GNUNET_ERROR_TYPE_DEBUG, "Going to send pulls to %u (%f * %u) peers.\n",
-        n_peers, beta, gossip_list_size);
-    for ( i = 0 ; i < n_peers ; i++ )
+    tmp_peer = get_rand_peer_ignore_list (gossip_list, gossip_list_size,
+        pending_pull_reply_list, pending_pull_reply_list_size);
+    if (NULL != tmp_peer)
     {
-      peer = get_rand_peer (gossip_list, gossip_list_size);
-      if (own_identity != peer)
+      peer = *tmp_peer;
+      GNUNET_free (tmp_peer);
+
+      GNUNET_array_append (pending_pull_reply_list, pending_pull_reply_list_size, peer);
+
+      if (0 != GNUNET_CRYPTO_cmp_peer_identity (&own_identity, &peer))
       { // FIXME if this fails schedule/loop this for later
-        LOG (GNUNET_ERROR_TYPE_DEBUG, "Sending PULL request to peer %s of gossiped list.\n", GNUNET_i2s (peer));
+        LOG (GNUNET_ERROR_TYPE_DEBUG,
+             "Sending PULL request to peer %s of gossiped list.\n",
+             GNUNET_i2s (&peer));
 
         ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REQUEST);
-        //pull_msg = NULL;
-        mq = get_mq (peer_map, peer);
+        mq = get_mq (peer_map, &peer);
         GNUNET_MQ_send (mq, ev);
       }
     }
@@ -1077,13 +1257,13 @@ do_round (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
 
     uint32_t first_border;
     uint32_t second_border;
-    
-    first_border = round (alpha * sampler_size_est_need);
-    second_border = first_border + round (beta * sampler_size_est_need);
+
+    first_border  =                ceil (alpha * sampler_size_est_need);
+    second_border = first_border + ceil (beta  * sampler_size_est_need);
 
     GNUNET_array_grow (gossip_list, gossip_list_size, second_border);
 
-    for ( i = 0 ; i < first_border ; i++ )
+    for (i = 0 ; i < first_border ; i++)
     { // TODO use RPS_sampler_get_n_rand_peers
       /* Update gossip list with peers received through PUSHes */
       r_index = GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_STRONG,
@@ -1092,7 +1272,7 @@ do_round (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
       // TODO change the peer_flags accordingly
     }
 
-    for ( i = first_border ; i < second_border ; i++ )
+    for (i = first_border ; i < second_border ; i++)
     {
       /* Update gossip list with peers received through PULLs */
       r_index = GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_STRONG,
@@ -1101,10 +1281,10 @@ do_round (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
       // TODO change the peer_flags accordingly
     }
 
-    for ( i = second_border ; i < gossip_list_size ; i++ )
+    for (i = second_border ; i < sampler_size_est_need ; i++)
     {
       /* Update gossip list with peers from history */
-      RPS_sampler_get_n_rand_peers (hist_update, NULL, 1, GNUNET_NO);
+      RPS_sampler_get_n_rand_peers (prot_sampler, hist_update, NULL, 1, GNUNET_NO);
       num_hist_update_tasks++;
       // TODO change the peer_flags accordingly
     }
@@ -1112,7 +1292,7 @@ do_round (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
   }
   else
   {
-    LOG (GNUNET_ERROR_TYPE_DEBUG, "No update of the gossip list. ()\n");
+    LOG (GNUNET_ERROR_TYPE_DEBUG, "No update of the gossip list.\n");
   }
   // TODO independent of that also get some peers from CADET_get_peers()?
 
@@ -1121,13 +1301,15 @@ do_round (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
 
   for ( i = 0 ; i < push_list_size ; i++ )
   {
-    RPS_sampler_update_list (&push_list[i]);
+    RPS_sampler_update (prot_sampler,   &push_list[i]);
+    RPS_sampler_update (client_sampler, &push_list[i]);
     // TODO set in_flag?
   }
 
   for ( i = 0 ; i < pull_list_size ; i++ )
   {
-    RPS_sampler_update_list (&pull_list[i]);
+    RPS_sampler_update (prot_sampler,   &push_list[i]);
+    RPS_sampler_update (client_sampler, &push_list[i]);
     // TODO set in_flag?
   }
 
@@ -1164,7 +1346,8 @@ do_round (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
  * Open a connection to given peer and store channel and mq.
  */
   void
-insertCB (void *cls, const struct GNUNET_PeerIdentity *id)
+insertCB (void *cls, struct RPS_Sampler *sampler,
+          const struct GNUNET_PeerIdentity *id)
 {
   // We open a channel to be notified when this peer goes down.
   (void) get_channel (peer_map, id);
@@ -1175,12 +1358,13 @@ insertCB (void *cls, const struct GNUNET_PeerIdentity *id)
  * Close the connection to given peer and delete channel and mq.
  */
   void
-removeCB (void *cls, const struct GNUNET_PeerIdentity *id)
+removeCB (void *cls, struct RPS_Sampler *sampler,
+          const struct GNUNET_PeerIdentity *id)
 {
   size_t s;
   struct PeerContext *ctx;
 
-  s = RPS_sampler_count_id (id);
+  s = RPS_sampler_count_id (sampler, id);
   if ( 1 >= s )
   {
     if (GNUNET_YES == GNUNET_CONTAINER_multipeermap_contains (peer_map, id))
@@ -1240,6 +1424,7 @@ init_peer_cb (void *cls,
     if (GNUNET_NO == insert_in_gossip_list_scheduled (peer_ctx))
     {
       out_op.op = insert_in_gossip_list;
+      out_op.op_cls = NULL;
       GNUNET_array_append (peer_ctx->outstanding_ops, peer_ctx->num_outstanding_ops, out_op);
     }
 
@@ -1260,24 +1445,46 @@ init_peer_cb (void *cls,
 peer_remove_cb (void *cls, const struct GNUNET_PeerIdentity *key, void *value)
 {
   struct PeerContext *peer_ctx;
+  const struct GNUNET_CADET_Channel *ch = (const struct GNUNET_CADET_Channel *) cls;
+  struct GNUNET_CADET_Channel *recv;
+  struct GNUNET_CADET_Channel *send;
 
   peer_ctx = (struct PeerContext *) value;
 
-  if ( NULL != peer_ctx->mq)
+  if (0 != peer_ctx->num_outstanding_ops)
+    GNUNET_array_grow (peer_ctx->outstanding_ops, peer_ctx->num_outstanding_ops, 0);
+
+  if (NULL != peer_ctx->mq)
     GNUNET_MQ_destroy (peer_ctx->mq);
 
-  if ( NULL != peer_ctx->is_live_task)
+  if (NULL != peer_ctx->is_live_task)
+  {
     GNUNET_CADET_notify_transmit_ready_cancel (peer_ctx->is_live_task);
+    peer_ctx->is_live_task = NULL;
+  }
+
+  send = peer_ctx->send_channel;
+  peer_ctx->send_channel = NULL;
+  recv = peer_ctx->send_channel;
+  peer_ctx->recv_channel = NULL;
+
+  if (NULL  != send
+      && ch != send)
+  {
+    GNUNET_CADET_channel_destroy (send);
+  }
 
-  if ( NULL != peer_ctx->send_channel)
-    GNUNET_CADET_channel_destroy (peer_ctx->send_channel);
-  
-  if ( NULL != peer_ctx->recv_channel)
-    GNUNET_CADET_channel_destroy (peer_ctx->recv_channel);
+  if (NULL  != recv
+      && ch != recv)
+  {
+    GNUNET_CADET_channel_destroy (recv);
+  }
 
   if (GNUNET_NO == GNUNET_CONTAINER_multipeermap_remove_all (peer_map, key))
     LOG (GNUNET_ERROR_TYPE_WARNING, "removing peer from peer_map failed\n");
-  
+  else
+    GNUNET_free (peer_ctx);
+
   return GNUNET_YES;
 }
 
@@ -1300,18 +1507,19 @@ shutdown_task (void *cls,
     do_round_task = NULL;
   }
 
-  
+
+  {
   if (GNUNET_SYSERR == GNUNET_CONTAINER_multipeermap_iterate (peer_map, peer_remove_cb, NULL))
     LOG (GNUNET_ERROR_TYPE_WARNING,
         "Iterating over peers to disconnect from them was cancelled\n");
-
-  GNUNET_CONTAINER_multipeermap_destroy (peer_map);
+  }
 
   GNUNET_NSE_disconnect (nse);
   GNUNET_CADET_disconnect (cadet_handle);
-  GNUNET_free (own_identity);
-  RPS_sampler_destroy ();
-  GNUNET_array_grow (request_deltas, request_deltas_size, 0);
+  RPS_sampler_destroy (prot_sampler);
+  RPS_sampler_destroy (client_sampler);
+  GNUNET_break (0 == GNUNET_CONTAINER_multipeermap_size (peer_map));
+  GNUNET_CONTAINER_multipeermap_destroy (peer_map);
   GNUNET_array_grow (gossip_list, gossip_list_size, 0);
   GNUNET_array_grow (push_list, push_list_size, 0);
   GNUNET_array_grow (pull_list, pull_list_size, 0);
@@ -1330,6 +1538,7 @@ handle_client_disconnect (void *cls,
 {
 }
 
+
 /**
  * Handle the channel a peer opens to us.
  *
@@ -1362,16 +1571,18 @@ handle_inbound_channel (void *cls,
     ctx->recv_channel = channel;
   }
 
-  // FIXME there might already be an established channel
+  ctx->peer_flags |= LIVING;
 
   //ctx->peer_flags = IN_OTHER_GOSSIP_LIST;
-  ctx->mq = NULL; // TODO create mq?
+  ctx->mq = NULL;
 
   (void) GNUNET_CONTAINER_multipeermap_put (peer_map, initiator, ctx,
       GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE);
+
   return NULL; // TODO
 }
 
+
 /**
  * This is called when a remote peer destroys a channel.
  *
@@ -1387,17 +1598,27 @@ cleanup_channel (void *cls,
   struct GNUNET_PeerIdentity *peer;
   struct PeerContext *peer_ctx;
 
-  LOG (GNUNET_ERROR_TYPE_DEBUG, "Channel to remote peer was destroyed.\n");
-
   peer = (struct GNUNET_PeerIdentity *) GNUNET_CADET_channel_get_info (
       (struct GNUNET_CADET_Channel *) channel, GNUNET_CADET_OPTION_PEER);
        // Guess simply casting isn't the nicest way...
        // FIXME wait for cadet to change this function
-  RPS_sampler_reinitialise_by_value (peer);
+  LOG (GNUNET_ERROR_TYPE_DEBUG, "Cleaning up channel to peer %s\n",
+       GNUNET_i2s (peer));
+
+  RPS_sampler_reinitialise_by_value (prot_sampler,   peer);
+  RPS_sampler_reinitialise_by_value (client_sampler, peer);
+
+  if (GNUNET_YES == GNUNET_CONTAINER_multipeermap_contains (peer_map, peer))
+  {
+    peer_ctx = GNUNET_CONTAINER_multipeermap_get (peer_map, peer);
 
-  peer_ctx = GNUNET_CONTAINER_multipeermap_get (peer_map, peer);
-  /* Somwewhat {ab,re}use the iterator function */
-  (void) peer_remove_cb (peer, peer, peer_ctx);
+    if (NULL == peer_ctx) /* It could have been removed by shutdown_task */
+      return;
+
+    /* Somwewhat {ab,re}use the iterator function */
+    /* Cast to void is ok, because it's used as void in peer_remove_cb */
+    (void) peer_remove_cb ((void *) channel, peer, peer_ctx);
+  }
 }
 
 
@@ -1447,18 +1668,14 @@ run (void *cls,
   // TODO check what this does -- copied from gnunet-boss
   // - seems to work as expected
   GNUNET_log_setup ("rps", GNUNET_error_type_to_string (GNUNET_ERROR_TYPE_DEBUG), NULL);
-
-  LOG (GNUNET_ERROR_TYPE_DEBUG, "RPS started\n");
-
-
   cfg = c;
 
 
   /* Get own ID */
-  own_identity = GNUNET_new (struct GNUNET_PeerIdentity);
-  GNUNET_CRYPTO_get_peer_identity (cfg, own_identity); // TODO check return value
-  GNUNET_assert (NULL != own_identity);
-  LOG (GNUNET_ERROR_TYPE_DEBUG, "Own identity is %s (at %p).\n", GNUNET_i2s (own_identity), own_identity);
+  GNUNET_CRYPTO_get_peer_identity (cfg, &own_identity); // TODO check return value
+  GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+              "STARTING SERVICE (rps) for peer [%s]\n",
+              GNUNET_i2s (&own_identity));
 
 
   /* Get time interval from the configuration */
@@ -1494,27 +1711,6 @@ run (void *cls,
 
   alpha = 0.45;
   beta  = 0.45;
-  // TODO initialise thresholds - ?
-
-  /* Get alpha from the configuration */
-  if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_float (cfg, "RPS",
-                                                         "ALPHA",
-                                                         &alpha))
-  {
-    LOG (GNUNET_ERROR_TYPE_DEBUG, "No ALPHA specified in the config\n");
-  }
-  LOG (GNUNET_ERROR_TYPE_DEBUG, "ALPHA is %f\n", alpha);
-  /* Get beta from the configuration */
-  if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_float (cfg, "RPS",
-                                                         "BETA",
-                                                         &beta))
-  {
-    LOG (GNUNET_ERROR_TYPE_DEBUG, "No BETA specified in the config\n");
-  }
-  LOG (GNUNET_ERROR_TYPE_DEBUG, "BETA is %f\n", beta);
-
-  // TODO check that alpha + beta < 1
 
   peer_map = GNUNET_CONTAINER_multipeermap_create (sampler_size_est_need, GNUNET_NO);
 
@@ -1546,15 +1742,18 @@ run (void *cls,
   half_round_interval = GNUNET_TIME_relative_multiply (round_interval, .5);
   max_round_interval = GNUNET_TIME_relative_add (round_interval, half_round_interval);
 
-  RPS_sampler_init (sampler_size_est_need, max_round_interval,
+  prot_sampler =   RPS_sampler_init (sampler_size_est_need, max_round_interval,
+      insertCB, NULL, removeCB, NULL);
+  client_sampler = RPS_sampler_init (sampler_size_est_need, max_round_interval,
       insertCB, NULL, removeCB, NULL);
-  sampler_size = sampler_size_est_need;
 
   /* Initialise push and pull maps */
   push_list = NULL;
   push_list_size = 0;
   pull_list = NULL;
   pull_list_size = 0;
+  pending_pull_reply_list = NULL;
+  pending_pull_reply_list_size = 0;
 
 
   LOG (GNUNET_ERROR_TYPE_DEBUG, "Requesting peers from CADET\n");