Change architecture of RPS service - api
authorJulius Bünger <buenger@mytum.de>
Tue, 25 Sep 2018 22:22:41 +0000 (00:22 +0200)
committerJulius Bünger <buenger@mytum.de>
Tue, 25 Sep 2018 22:23:45 +0000 (00:23 +0200)
po/POTFILES.in
src/include/gnunet_rps_service.h
src/rps/Makefile.am
src/rps/gnunet-rps-profiler.c
src/rps/gnunet-service-rps.c
src/rps/gnunet-service-rps_sampler.c
src/rps/rps.h
src/rps/rps_api.c

index 45471731ce7619992c2e2ee5502712d7cd643a93..44bd751e43903b42450355412668a0183b78cf7a 100644 (file)
@@ -329,6 +329,7 @@ src/rps/gnunet-service-rps_sampler.c
 src/rps/gnunet-service-rps_sampler_elem.c
 src/rps/gnunet-service-rps_view.c
 src/rps/rps_api.c
+src/rps/rps_test_lib.c
 src/rps/rps-test_util.c
 src/scalarproduct/gnunet-scalarproduct.c
 src/scalarproduct/gnunet-service-scalarproduct_alice.c
index 22e944d0f76b3bf711dfd1ffe89167569b11665d..f77c3dbc4e97cab725b76942776db38585c2687b 100644 (file)
@@ -162,7 +162,7 @@ GNUNET_RPS_view_request (struct GNUNET_RPS_Handle *rps_handle,
  * @param cls a closure that will be given to the callback
  * @param ready_cb the callback called when the peers are available
  */
-void
+struct GNUNET_RPS_StreamRequestHandle *
 GNUNET_RPS_stream_request (struct GNUNET_RPS_Handle *rps_handle,
                            uint32_t num_updates,
                            GNUNET_RPS_NotifyReadyCB stream_input_cb,
index 2ed93ef7c2b38c2a387079255d3e9472a2dd8980..5e9fd09fa966c7510eaa7fe916776a3bb5dba198 100644 (file)
@@ -90,7 +90,8 @@ endif
 rps_test_src = \
        test_rps.c \
        rps-test_util.h rps-test_util.c \
- gnunet-service-rps_sampler_elem.h gnunet-service-rps_sampler_elem.c
+ gnunet-service-rps_sampler_elem.h gnunet-service-rps_sampler_elem.c \
+ gnunet-service-rps_sampler.h gnunet-service-rps_sampler.c
 
 ld_rps_test_lib = \
        libgnunetrps.la \
index d2640225adb48b068ffa18530fc8b1a5a39c9dda..f2a8083e76a63eca02ba75854a78f301cd6628a9 100644 (file)
@@ -1932,8 +1932,8 @@ static uint32_t binom (uint32_t n, uint32_t k)
 {
   //GNUNET_assert (n >= k);
   if (k > n) return 0;
-  if (0 > n) return 0; /* just for clarity - always false */
-  if (0 > k) return 0; /* just for clarity - always false */
+  /* if (0 > n) return 0;  - always false */
+  /* if (0 > k) return 0;  - always false */
   if (0 == k) return 1;
   return fac (n)
     /
index 40f576d3eb91db9f121d2b99470cc20cdfc7aa99..862514264bf5841e5f1a441fd7c53e45920593bb 100644 (file)
@@ -47,8 +47,6 @@
 
 // TODO connect to friends
 
-// TODO store peers somewhere persistent
-
 // TODO blacklist? (-> mal peer detection on top of brahms)
 
 // hist_size_init, hist_size_max
@@ -978,7 +976,6 @@ destroy_peer (struct PeerContext *peer_ctx)
                      peer_ctx->liveliness_check_pending,
                      sizeof (struct PendingMessage))) )
       {
-        // TODO this may leak memory
         peer_ctx->liveliness_check_pending = NULL;
         GNUNET_STATISTICS_update (stats,
                                   "# pending liveliness checks",
@@ -1620,10 +1617,6 @@ check_sending_channel_exists (const struct GNUNET_PeerIdentity *peer)
  * @brief Destroy the send channel of a peer e.g. stop indicating a sending
  *        intention to another peer
  *
- * If there is also no channel to receive messages from that peer, remove it
- * from the peermap.
- * TODO really?
- *
  * @peer the peer identity of the peer whose sending channel to destroy
  * @return #GNUNET_YES if channel was destroyed
  *         #GNUNET_NO  otherwise
@@ -1777,10 +1770,10 @@ struct ClientContext
   int64_t view_updates_left;
 
   /**
-   * @brief How many peers from the biased
-   * stream this client expects to receive.
+   * @brief Whether this client wants to receive stream updates.
+   * Either #GNUNET_YES or #GNUNET_NO
    */
-  int64_t stream_peers_left;
+  int8_t stream_update;
 
   /**
    * The client handle to send the reply to
@@ -2232,9 +2225,9 @@ send_view (const struct ClientContext *cli_ctx,
  * @param view_size the size of the view array (can be 0)
  */
 void
-send_stream_peer (const struct ClientContext *cli_ctx,
-                  uint64_t num_peers,
-                  const struct GNUNET_PeerIdentity *peers)
+send_stream_peers (const struct ClientContext *cli_ctx,
+                   uint64_t num_peers,
+                   const struct GNUNET_PeerIdentity *peers)
 {
   struct GNUNET_MQ_Envelope *ev;
   struct GNUNET_RPS_CS_DEBUG_StreamReply *out_msg;
@@ -2275,7 +2268,7 @@ clients_notify_view_update (void)
 
   for (cli_ctx_iter = cli_ctx_head;
        NULL != cli_ctx_iter;
-       cli_ctx_iter = cli_ctx_head->next)
+       cli_ctx_iter = cli_ctx_iter->next)
   {
     if (1 < cli_ctx_iter->view_updates_left)
     {
@@ -2306,7 +2299,6 @@ clients_notify_stream_peer (uint64_t num_peers,
                             // TODO enum StreamPeerSource)
 {
   struct ClientContext *cli_ctx_iter;
-  uint64_t num_peers_send;
 
   LOG (GNUNET_ERROR_TYPE_DEBUG,
       "Got peer (%s) from biased stream - update all clients\n",
@@ -2314,32 +2306,12 @@ clients_notify_stream_peer (uint64_t num_peers,
 
   for (cli_ctx_iter = cli_ctx_head;
        NULL != cli_ctx_iter;
-       cli_ctx_iter = cli_ctx_head->next)
+       cli_ctx_iter = cli_ctx_iter->next)
   {
-    if (0 < cli_ctx_iter->stream_peers_left)
-    {
-      /* Client wants to receive limited amount of updates */
-      if (num_peers > cli_ctx_iter->stream_peers_left)
-      {
-        num_peers_send = num_peers - cli_ctx_iter->stream_peers_left;
-        cli_ctx_iter->stream_peers_left = 0;
-      }
-      else
-      {
-        num_peers_send = cli_ctx_iter->stream_peers_left - num_peers;
-        cli_ctx_iter->stream_peers_left -= num_peers_send;
-      }
-    } else if (0 > cli_ctx_iter->stream_peers_left)
-    {
-      /* Client is not interested in updates */
-      continue;
-    } else /* _updates_left == 0 - infinite amount of updates */
+    if (GNUNET_YES == cli_ctx_iter->stream_update)
     {
-      num_peers_send = num_peers;
+      send_stream_peers (cli_ctx_iter, num_peers, peers);
     }
-
-    /* send view */
-    send_stream_peer (cli_ctx_iter, num_peers_send, peers);
   }
 }
 
@@ -3076,16 +3048,13 @@ handle_client_stream_request (void *cls,
                               const struct GNUNET_RPS_CS_DEBUG_StreamRequest *msg)
 {
   struct ClientContext *cli_ctx = cls;
-  uint64_t num_peers;
-
-  num_peers = ntohl (msg->num_peers);
+  (void) msg;
 
   LOG (GNUNET_ERROR_TYPE_DEBUG,
-       "Client requested %" PRIu64 " peers from biased stream.\n",
-       num_peers);
+       "Client requested peers from biased stream.\n");
+  cli_ctx->stream_update = GNUNET_YES;
 
   GNUNET_assert (NULL != cli_ctx);
-  cli_ctx->stream_peers_left = num_peers;
   GNUNET_SERVICE_client_continue (cli_ctx->client);
 }
 
@@ -3882,7 +3851,8 @@ do_round (void *cls)
       if (GNUNET_OK == inserted)
       {
         clients_notify_stream_peer (1,
-            CustomPeerMap_get_peer_by_index (push_map, permut[i]));
+            CustomPeerMap_get_peer_by_index (pull_map,
+                                             permut[i - first_border]));
       }
       to_file (file_name_view_log,
                "+%s\t(pull list)",
@@ -4195,6 +4165,7 @@ client_connect_cb (void *cls,
   cli_ctx = GNUNET_new (struct ClientContext);
   cli_ctx->mq = mq;
   cli_ctx->view_updates_left = -1;
+  cli_ctx->stream_update = GNUNET_NO;
   cli_ctx->client = client;
   GNUNET_CONTAINER_DLL_insert (cli_ctx_head,
                                cli_ctx_tail,
index ff4bc9e4237e4709d6d1e21d8bd8143c570c4463..2cd4cb996f39e53e5384c194e53d34871042dd6e 100644 (file)
@@ -313,26 +313,9 @@ sampler_notify_on_update (struct RPS_Sampler *sampler,
   notify_ctx = GNUNET_new (struct SamplerNotifyUpdateCTX);
   notify_ctx->notify_cb = notify_cb;
   notify_ctx->cls = cls;
-  if (NULL != sampler->notify_ctx_head)
-  {
-    for (struct SamplerNotifyUpdateCTX *notify_iter = sampler->notify_ctx_head;
-        NULL != notify_iter->next;
-        notify_iter = notify_iter->next)
-    {
-      LOG (GNUNET_ERROR_TYPE_DEBUG,
-          "Pre: Context\n");
-    }
-  }
   GNUNET_CONTAINER_DLL_insert (sampler->notify_ctx_head,
                                sampler->notify_ctx_tail,
                                notify_ctx);
-  for (struct SamplerNotifyUpdateCTX *notify_iter = sampler->notify_ctx_head;
-       NULL != notify_iter;
-       notify_iter = notify_iter->next)
-  {
-    LOG (GNUNET_ERROR_TYPE_DEBUG,
-        "Post: Context\n");
-  }
   return notify_ctx;
 }
 
@@ -559,27 +542,21 @@ RPS_sampler_mod_init (size_t init_size,
 
 
 /**
- * Update every sampler element of this sampler with given peer
+ * @brief Notify about update of the sampler.
  *
- * @param sampler the sampler to update.
- * @param id the PeerID that is put in the sampler
+ * Call the callbacks that are waiting for notification on updates to the
+ * sampler.
+ *
+ * @param sampler The sampler the updates are waiting for
  */
-  void
-RPS_sampler_update (struct RPS_Sampler *sampler,
-                    const struct GNUNET_PeerIdentity *id)
+static void
+notify_update (struct RPS_Sampler *sampler)
 {
   struct SamplerNotifyUpdateCTX *tmp_notify_head;
   struct SamplerNotifyUpdateCTX *tmp_notify_tail;
 
-  to_file (sampler->file_name,
-           "Got %s",
-           GNUNET_i2s_full (id));
-
-  for (uint32_t i = 0; i < sampler->sampler_size; i++)
-  {
-    RPS_sampler_elem_next (sampler->sampler_elements[i],
-                           id);
-  }
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+      "Calling callbacks waiting for update notification.\n");
   tmp_notify_head = sampler->notify_ctx_head;
   tmp_notify_tail = sampler->notify_ctx_tail;
   sampler->notify_ctx_head = NULL;
@@ -598,6 +575,29 @@ RPS_sampler_update (struct RPS_Sampler *sampler,
 }
 
 
+/**
+ * Update every sampler element of this sampler with given peer
+ *
+ * @param sampler the sampler to update.
+ * @param id the PeerID that is put in the sampler
+ */
+  void
+RPS_sampler_update (struct RPS_Sampler *sampler,
+                    const struct GNUNET_PeerIdentity *id)
+{
+  to_file (sampler->file_name,
+           "Got %s",
+           GNUNET_i2s_full (id));
+
+  for (uint32_t i = 0; i < sampler->sampler_size; i++)
+  {
+    RPS_sampler_elem_next (sampler->sampler_elements[i],
+                           id);
+  }
+  notify_update (sampler);
+}
+
+
 /**
  * Reinitialise all previously initialised sampler elements with the given value.
  *
@@ -714,6 +714,7 @@ sampler_mod_get_rand_peer (void *cls)
   /* Check whether we may use this sampler to give it back to the client */
   if (GNUNET_TIME_UNIT_FOREVER_ABS.abs_value_us != s_elem->last_client_request.abs_value_us)
   {
+    // TODO remove this condition at least for the client sampler
     last_request_diff =
       GNUNET_TIME_absolute_get_difference (s_elem->last_client_request,
                                            GNUNET_TIME_absolute_get ());
index 26615bfc5d745646b0b6227690647221f1a8ffaa..9e4487f88078980ac444218506ddf527ae2ad3f3 100644 (file)
@@ -226,12 +226,6 @@ struct GNUNET_RPS_CS_DEBUG_StreamRequest
    * Header including size and type in NBO
    */
   struct GNUNET_MessageHeader header;
-
-  /**
-   * Number of peers
-   * 0 for sending updates until cancellation
-   */
-  uint32_t num_peers GNUNET_PACKED;
 };
 
 /**
@@ -244,11 +238,6 @@ struct GNUNET_RPS_CS_DEBUG_StreamReply
    */
   struct GNUNET_MessageHeader header;
 
-  /**
-   * Identifyer of the message.
-   */
-  uint32_t id GNUNET_PACKED;
-
   /**
    * Number of peers
    */
index a558c8a35e59081107180723a40e2428de8e68d1..e4f4db506a1d02044112263bd5501ecb1a572518 100644 (file)
 
 #define LOG(kind,...) GNUNET_log_from (kind, "rps-api",__VA_ARGS__)
 
+/**
+ * Handle for a request to get peers from biased stream of ids
+ */
+struct GNUNET_RPS_StreamRequestHandle
+{
+  /**
+   * The client issuing the request.
+   */
+  struct GNUNET_RPS_Handle *rps_handle;
+
+  /**
+   * The number of requested peers.
+   */
+  uint32_t num_peers_left;
+
+  /**
+   * The callback to be called when we receive an answer.
+   */
+  GNUNET_RPS_NotifyReadyCB ready_cb;
+
+  /**
+   * The closure for the callback.
+   */
+  void *ready_cb_cls;
+
+  /**
+   * @brief Next element of the DLL
+   */
+  struct GNUNET_RPS_StreamRequestHandle *next;
+
+  /**
+   * @brief Previous element of the DLL
+   */
+  struct GNUNET_RPS_StreamRequestHandle *prev;
+};
+
+
 /**
  * Handler to handle requests from a client.
  */
@@ -67,14 +104,19 @@ struct GNUNET_RPS_Handle
   void *view_update_cls;
 
   /**
-   * @brief Callback called on each peer of the biased input stream
+   * @brief Closure to each requested peer from the biased stream
+   */
+  void *stream_input_cls;
+
+  /**
+   * @brief Head of the DLL of stream requests
    */
-  GNUNET_RPS_NotifyReadyCB stream_input_cb;
+  struct GNUNET_RPS_StreamRequestHandle *stream_requests_head;
 
   /**
-   * @brief Closure to each requested peer from the biased stream
+   * @brief Tail of the DLL of stream requests
    */
-  void *stream_input_cls;
+  struct GNUNET_RPS_StreamRequestHandle *stream_requests_tail;
 };
 
 
@@ -138,6 +180,91 @@ struct cb_cls_pack
 };
 
 
+/**
+ * @brief Create a new handle for a stream request
+ *
+ * @param rps_handle The rps handle
+ * @param num_peers The number of desired peers
+ * @param ready_cb The callback to be called, once all peers are ready
+ * @param cls The colsure to provide to the callback
+ *
+ * @return The handle to the stream request
+ */
+static struct GNUNET_RPS_StreamRequestHandle *
+new_stream_request (struct GNUNET_RPS_Handle *rps_handle,
+                    uint64_t num_peers,
+                    GNUNET_RPS_NotifyReadyCB ready_cb,
+                    void *cls)
+{
+  struct GNUNET_RPS_StreamRequestHandle *srh;
+
+  srh = GNUNET_new (struct GNUNET_RPS_StreamRequestHandle);
+
+  srh->rps_handle = rps_handle;
+  srh->num_peers_left = num_peers;
+  srh->ready_cb = ready_cb;
+  srh->ready_cb_cls = cls;
+  GNUNET_CONTAINER_DLL_insert (rps_handle->stream_requests_head,
+                               rps_handle->stream_requests_tail,
+                               srh);
+
+  return srh;
+}
+
+
+/**
+ * @brief Remove the given stream request from the list of requests and memory
+ *
+ * @param srh The request to be removed
+ * @param srh_head Head of the DLL to remove request from
+ * @param srh_tail Tail of the DLL to remove request from
+ */
+static void
+remove_stream_request (struct GNUNET_RPS_StreamRequestHandle *srh,
+                       struct GNUNET_RPS_StreamRequestHandle *srh_head,
+                       struct GNUNET_RPS_StreamRequestHandle *srh_tail)
+{
+  GNUNET_CONTAINER_DLL_remove (srh_head,
+                               srh_tail,
+                               srh);
+
+  GNUNET_free (srh);
+}
+
+
+/**
+ * @brief Create new request handle
+ *
+ * @param rps_handle Handle to the service
+ * @param num_requests Number of requests
+ * @param ready_cb Callback
+ * @param cls Closure
+ *
+ * @return The newly created request handle
+ */
+static struct GNUNET_RPS_Request_Handle *
+new_request_handle (struct GNUNET_RPS_Handle *rps_handle,
+                    uint64_t num_requests,
+                    struct RPS_Sampler *sampler,
+                    GNUNET_RPS_NotifyReadyCB ready_cb,
+                    void *cls)
+{
+  struct GNUNET_RPS_Request_Handle *rh;
+
+  rh = GNUNET_new (struct GNUNET_RPS_Request_Handle);
+  rh->rps_handle = rps_handle;
+  rh->id = rps_handle->current_request_id++;
+  rh->num_requests = num_requests;
+  rh->sampler = sampler;
+  rh->ready_cb = ready_cb;
+  rh->ready_cb_cls = cls;
+  GNUNET_CONTAINER_multihashmap32_put (rps_handle->req_handlers, rh->id, rh,
+      GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
+
+  return rh;
+}
+
+
 /**
  * @brief Send a request to the service.
  *
@@ -304,24 +431,27 @@ GNUNET_RPS_view_request (struct GNUNET_RPS_Handle *rps_handle,
  * @param cls a closure that will be given to the callback
  * @param ready_cb the callback called when the peers are available
  */
-void
+struct GNUNET_RPS_StreamRequestHandle *
 GNUNET_RPS_stream_request (struct GNUNET_RPS_Handle *rps_handle,
                            uint32_t num_peers,
                            GNUNET_RPS_NotifyReadyCB stream_input_cb,
                            void *cls)
 {
+  struct GNUNET_RPS_StreamRequestHandle *srh;
   struct GNUNET_MQ_Envelope *ev;
   struct GNUNET_RPS_CS_DEBUG_StreamRequest *msg;
 
+  srh = new_stream_request (rps_handle,
+                            num_peers, /* num requests */
+                            stream_input_cb,
+                            cls);
   LOG (GNUNET_ERROR_TYPE_DEBUG,
        "Client requests %" PRIu32 " biased stream updates\n",
        num_peers);
-  rps_handle->stream_input_cb = stream_input_cb;
-  rps_handle->stream_input_cls = cls;
 
   ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_REQUEST);
-  msg->num_peers = htonl (num_peers);
   GNUNET_MQ_send (rps_handle->mq, ev);
+  return srh;
 }
 
 
@@ -378,6 +508,41 @@ handle_view_update (void *cls,
 }
 
 
+/**
+ * @brief Send message to service that this client does not want to receive
+ * further updates from the biased peer stream
+ *
+ * @param rps_handle The handle representing the service to the client
+ */
+static void
+cancel_stream (struct GNUNET_RPS_Handle *rps_handle)
+{
+  struct GNUNET_MQ_Envelope *ev;
+
+  ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_CANCEL);
+  GNUNET_MQ_send (rps_handle->mq, ev);
+}
+
+
+/**
+ * @brief Cancel a specific request for updates from the biased peer stream
+ *
+ * @param srh The request handle to cancel
+ */
+void
+GNUNET_RPS_stream_cancel (struct GNUNET_RPS_StreamRequestHandle *srh)
+{
+  struct GNUNET_RPS_Handle *rps_handle;
+
+  rps_handle = srh->rps_handle;
+  GNUNET_CONTAINER_DLL_remove (rps_handle->stream_requests_head,
+                               rps_handle->stream_requests_tail,
+                               srh);
+  GNUNET_free (srh);
+  if (NULL == rps_handle->stream_requests_head) cancel_stream (rps_handle);
+}
+
+
 /**
  * This function is called, when the service sends another peer from the biased
  * stream.
@@ -420,16 +585,71 @@ handle_stream_input (void *cls,
 {
   struct GNUNET_RPS_Handle *h = cls;
   const struct GNUNET_PeerIdentity *peers;
+  /* The following two pointers are used to prevent that new handles are
+   * inserted into the DLL, that is currently iterated over, from within a call
+   * to that handler_cb, are executed and in turn again add themselves to the
+   * iterated DLL infinitely */
+  struct GNUNET_RPS_StreamRequestHandle *srh_head_tmp;
+  struct GNUNET_RPS_StreamRequestHandle *srh_tail_tmp;
+  uint64_t num_peers;
+  uint64_t num_peers_return;
 
-  /* Give the peers back */
+  peers = (struct GNUNET_PeerIdentity *) &msg[1];
+  num_peers = ntohl (msg->num_peers);
   LOG (GNUNET_ERROR_TYPE_DEBUG,
-       "New peer of %" PRIu64 " biased input stream\n",
-       ntohl (msg->num_peers));
+       "Received %" PRIu64 " peer(s) from stream input.\n",
+       num_peers);
+  srh_head_tmp = h->stream_requests_head;
+  srh_tail_tmp = h->stream_requests_tail;
+  h->stream_requests_head = NULL;
+  h->stream_requests_tail = NULL;
+  for (struct GNUNET_RPS_StreamRequestHandle *srh_iter = srh_head_tmp;
+       NULL != srh_iter;
+       srh_iter = srh_iter->next)
+  {
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+        "Calling srh - left: %" PRIu64 "\n",
+        srh_iter->num_peers_left);
+    if (0 == srh_iter->num_peers_left) /* infinite updates */
+    {
+      num_peers_return = num_peers;
+    }
+    else if (num_peers > srh_iter->num_peers_left)
+    {
+      num_peers_return = num_peers - srh_iter->num_peers_left;
+    }
+    else /* num_peers <= srh_iter->num_peers_left */
+    {
+      num_peers_return = srh_iter->num_peers_left - num_peers;
+    }
+    srh_iter->ready_cb (srh_iter->ready_cb_cls,
+                        num_peers_return,
+                        peers);
+    if (0 == srh_iter->num_peers_left) ;
+    else if (num_peers_return >= srh_iter->num_peers_left)
+    {
+      remove_stream_request (srh_iter,
+                             srh_head_tmp,
+                             srh_tail_tmp);
+    }
+    else
+    {
+      srh_iter->num_peers_left -= num_peers_return;
+    }
+  }
+  for (struct GNUNET_RPS_StreamRequestHandle *srh_iter = srh_head_tmp;
+       NULL != srh_iter;
+       srh_iter = srh_iter->next)
+  {
+      GNUNET_CONTAINER_DLL_insert (h->stream_requests_head,
+                                   h->stream_requests_tail,
+                                   srh_iter);
+  }
 
-  peers = (struct GNUNET_PeerIdentity *) &msg[1];
-  GNUNET_assert (NULL != h);
-  GNUNET_assert (NULL != h->stream_input_cb);
-  h->stream_input_cb (h->stream_input_cls, ntohl (msg->num_peers), peers);
+  if (NULL == h->stream_requests_head)
+  {
+    cancel_stream (h);
+  }
 }
 
 
@@ -524,39 +744,6 @@ GNUNET_RPS_connect (const struct GNUNET_CONFIGURATION_Handle *cfg)
 }
 
 
-/**
- * @brief Create new request handle
- *
- * @param rps_handle Handle to the service
- * @param num_requests Number of requests
- * @param ready_cb Callback
- * @param cls Closure
- *
- * @return The newly created request handle
- */
-static struct GNUNET_RPS_Request_Handle *
-new_request_handle (struct GNUNET_RPS_Handle *rps_handle,
-                    uint64_t num_requests,
-                    struct RPS_Sampler *sampler,
-                    GNUNET_RPS_NotifyReadyCB ready_cb,
-                    void *cls)
-{
-  struct GNUNET_RPS_Request_Handle *rh;
-
-  rh = GNUNET_new (struct GNUNET_RPS_Request_Handle);
-  rh->rps_handle = rps_handle;
-  rh->id = rps_handle->current_request_id++;
-  rh->num_requests = num_requests;
-  rh->sampler = sampler;
-  rh->ready_cb = ready_cb;
-  rh->ready_cb_cls = cls;
-  GNUNET_CONTAINER_multihashmap32_put (rps_handle->req_handlers, rh->id, rh,
-      GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
-
-  return rh;
-}
-
-
 /**
  * Request n random peers.
  *
@@ -646,9 +833,9 @@ peers_ready_cb (const struct GNUNET_PeerIdentity *peers,
  */
 struct GNUNET_RPS_Request_Handle *
 GNUNET_RPS_request_peers (struct GNUNET_RPS_Handle *rps_handle,
-                            uint32_t num_req_peers,
-                            GNUNET_RPS_NotifyReadyCB ready_cb,
-                            void *cls)
+                          uint32_t num_req_peers,
+                          GNUNET_RPS_NotifyReadyCB ready_cb,
+                          void *cls)
 {
   struct GNUNET_RPS_Request_Handle *rh;