restructured service and sampler
authorJulius Bünger <buenger@mytum.de>
Thu, 22 Jan 2015 00:18:44 +0000 (00:18 +0000)
committerJulius Bünger <buenger@mytum.de>
Thu, 22 Jan 2015 00:18:44 +0000 (00:18 +0000)
src/rps/gnunet-service-rps.c
src/rps/gnunet-service-rps_sampler.c
src/rps/gnunet-service-rps_sampler.h

index 6caa77c400002a87a22421a9faa0803680e4ca95..9a751972820add32980879c48d2481e4f46ef843 100644 (file)
@@ -70,22 +70,6 @@ static const struct GNUNET_CONFIGURATION_Handle *cfg;
  */
 static struct GNUNET_PeerIdentity *own_identity;
 
-/**
- * Closure to the callback cadet calls on each peer it passes to us
- */
-struct init_peer_cls
-{
-  /**
-   * The server handle to later listen to client requests
-   */
-  struct GNUNET_SERVER_Handle *server;
-
-  /**
-   * Counts how many peers cadet already passed to us
-   */
-  uint32_t i;
-};
-
 
   struct GNUNET_PeerIdentity *
 get_rand_peer (const struct GNUNET_PeerIdentity *peer_list, unsigned int size);
@@ -122,6 +106,29 @@ enum PeerFlags
   LIVING                = 0x10
 };
 
+
+/**
+ * Functions of this type can be used to be stored at a peer for later execution.
+ */
+typedef void (* PeerOp) (void *cls, const struct GNUNET_PeerIdentity *peer);
+
+/**
+ * Outstanding operation on peer consisting of callback and closure
+ */
+struct PeerOutstandingOp
+{
+  /**
+   * Callback
+   */
+  PeerOp op;
+
+  /**
+   * Closure
+   */
+  void *op_cls;
+};
+
+
 /**
  * Struct used to keep track of other peer's status
  *
@@ -149,6 +156,17 @@ struct PeerContext
    */
   struct GNUNET_CADET_Channel *recv_channel; // unneeded?
 
+  /**
+   * Array of outstanding operations on this peer.
+   */
+  struct PeerOutstandingOp *outstanding_ops;
+
+  /**
+   * Number of outstanding operations.
+   */
+  unsigned int num_outstanding_ops;
+  //size_t num_outstanding_ops;
+
   /**
    * This is pobably followed by 'statistical' data (when we first saw
    * him, how did we get his ID, how many pushes (in a timeinterval),
@@ -310,6 +328,12 @@ static struct GNUNET_TIME_Relative request_deltas[REQUEST_DELTAS_SIZE];
 static struct GNUNET_TIME_Relative  request_rate;
 
 
+/**
+ * Number of history update tasks.
+ */
+uint32_t num_hist_update_tasks;
+
+
 /***********************************************************************
  * /Globals
 ***********************************************************************/
@@ -398,6 +422,8 @@ get_peer_ctx (struct GNUNET_CONTAINER_MultiPeerMap *peer_map,
     ctx->mq = NULL;
     ctx->send_channel = NULL;
     ctx->recv_channel = NULL;
+    ctx->outstanding_ops = NULL;
+    ctx->num_outstanding_ops = 0;
     (void) GNUNET_CONTAINER_multipeermap_put (peer_map, peer, ctx,
                                               GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
   }
@@ -405,6 +431,22 @@ get_peer_ctx (struct GNUNET_CONTAINER_MultiPeerMap *peer_map,
 }
 
 
+/**
+ * Put random peer from sampler into the gossip list as history update.
+ */
+  void
+hist_update (void *cls, struct GNUNET_PeerIdentity *ids, uint32_t num_peers)
+{
+  GNUNET_assert (1 == num_peers);
+
+  if (gossip_list_size < sampler_size_est_need)
+    GNUNET_array_append (gossip_list, gossip_list_size, *ids);
+
+  if (0 < num_hist_update_tasks)
+    num_hist_update_tasks--;
+}
+
+
 /**
  * Callback that is called when a channel was effectively established.
  * This is given to ntfy_tmt_rdy and called when the channel was
@@ -422,6 +464,15 @@ peer_is_live (void *cls, size_t size, void *buf)
 
   LOG (GNUNET_ERROR_TYPE_DEBUG, "Peer %s is live\n", GNUNET_i2s (peer));
 
+  if (0 != peer_ctx->num_outstanding_ops)
+  { /* Call outstanding operations */
+    unsigned int i;
+
+    for ( i = 0 ; i < peer_ctx->num_outstanding_ops ; i++ )
+      peer_ctx->outstanding_ops[i].op (peer_ctx->outstanding_ops[i].op_cls, peer);
+    GNUNET_array_grow (peer_ctx->outstanding_ops, peer_ctx->num_outstanding_ops, 0);
+  }
+
   GNUNET_free (peer);
 
   buf = NULL;
@@ -437,7 +488,7 @@ get_channel (struct GNUNET_CONTAINER_MultiPeerMap *peer_map,
              const struct GNUNET_PeerIdentity *peer)
 {
   struct PeerContext *ctx;
-  //struct GNUNET_PeerIdentity *tmp_peer;
+  struct GNUNET_PeerIdentity *tmp_peer;
 
   ctx = get_peer_ctx (peer_map, peer);
   if (NULL == ctx->send_channel)
@@ -446,11 +497,14 @@ get_channel (struct GNUNET_CONTAINER_MultiPeerMap *peer_map,
                                                      GNUNET_RPS_CADET_PORT,
                                                      GNUNET_CADET_OPTION_RELIABLE);
 
-    //tmp_peer = GNUNET_new (struct GNUNET_PeerIdentity);
-    //*tmp_peer = *peer;
-    //(void) GNUNET_CADET_notify_transmit_ready (ctx->send_channel, GNUNET_NO,
-    //                                         GNUNET_TIME_UNIT_FOREVER_REL,
-    //                                         0, peer_is_live, tmp_peer);
+    if (NULL == ctx->recv_channel)
+    {
+      tmp_peer = GNUNET_new (struct GNUNET_PeerIdentity);
+      *tmp_peer = *peer;
+      (void) GNUNET_CADET_notify_transmit_ready (ctx->send_channel, GNUNET_NO,
+                                                 GNUNET_TIME_UNIT_FOREVER_REL,
+                                                 0, peer_is_live, tmp_peer);
+    }
 
     // do I have to explicitly put it in the peer_map?
     (void) GNUNET_CONTAINER_multipeermap_put (peer_map, peer, ctx,
@@ -509,16 +563,91 @@ T_relative_sum (const struct GNUNET_TIME_Relative *rel_array, uint32_t arr_size)
   struct GNUNET_TIME_Relative
 T_relative_avg (const struct GNUNET_TIME_Relative *rel_array, uint32_t arr_size)
 {
-  return GNUNET_TIME_relative_divide (T_relative_sum (rel_array, arr_size), arr_size); // FIXME find a way to devide that by arr_size
+  return GNUNET_TIME_relative_divide (T_relative_sum (rel_array, arr_size), arr_size);
+}
+
+
+/**
+ * Insert PeerID in #pull_list
+ *
+ * Called once we know a peer is live.
+ */
+  void
+insert_in_pull_list (void *cls, const struct GNUNET_PeerIdentity *peer)
+{
+  if (GNUNET_NO == in_arr (pull_list, pull_list_size, peer))
+    GNUNET_array_append (pull_list, pull_list_size, *peer);
+}
+
+/**
+ * Check whether #insert_in_pull_list was already scheduled
+ */
+  int
+insert_in_pull_list_scheduled (const struct PeerContext *peer_ctx)
+{
+  unsigned int i;
+
+  for ( i = 0 ; i < peer_ctx->num_outstanding_ops ; i++ )
+    if (insert_in_pull_list == peer_ctx->outstanding_ops[i].op)
+      return GNUNET_YES;
+  return GNUNET_NO;
+}
+
+
+/**
+ * Insert PeerID in #gossip_list
+ *
+ * Called once we know a peer is live.
+ */
+  void
+insert_in_gossip_list (void *cls, const struct GNUNET_PeerIdentity *peer)
+{
+  if (GNUNET_NO == in_arr (gossip_list, gossip_list_size, peer))
+    GNUNET_array_append (gossip_list, gossip_list_size, *peer);
+}
+
+/**
+ * Check whether #insert_in_pull_list was already scheduled
+ */
+  int
+insert_in_gossip_list_scheduled (const struct PeerContext *peer_ctx)
+{
+  unsigned int i;
+
+  for ( i = 0 ; i < peer_ctx->num_outstanding_ops ; i++ )
+    if (insert_in_gossip_list == peer_ctx->outstanding_ops[i].op)
+      return GNUNET_YES;
+  return GNUNET_NO;
+}
+
+
+/**
+ * Update sampler with given PeerID.
+ */
+  void
+insert_in_sampler (void *cls, const struct GNUNET_PeerIdentity *peer)
+{
+  RPS_sampler_update_list (peer);
+}
+
+/**
+ * Check whether #insert_in_sampler was already scheduled
+ */
+  int
+insert_in_sampler_scheduled (const struct PeerContext *peer_ctx)
+{
+  unsigned int i;
+
+  for ( i = 0 ; i < peer_ctx->num_outstanding_ops ; i++ )
+    if (insert_in_sampler== peer_ctx->outstanding_ops[i].op)
+      return GNUNET_YES;
+  return GNUNET_NO;
 }
 
 
-/***********************************************************************
- * /Util functions
-***********************************************************************/
 
 /**
- * Wrapper around _sampler_resize()
+ * Wrapper around #RPS_sampler_resize()
  */
   void
 resize_wrapper ()
@@ -544,6 +673,10 @@ resize_wrapper ()
 }
 
 
+/***********************************************************************
+ * /Util functions
+***********************************************************************/
+
 /**
  * Function called by NSE.
  *
@@ -660,7 +793,7 @@ handle_client_request (void *cls,
 
   LOG (GNUNET_ERROR_TYPE_DEBUG, "Client requested %" PRIX32 " random peer(s).\n", num_peers);
 
-  RPS_sampler_get_n_rand_peers (client_respond, client, num_peers);
+  RPS_sampler_get_n_rand_peers (client_respond, client, num_peers, GNUNET_YES);
 
   GNUNET_SERVER_receive_done (client,
                              GNUNET_OK);
@@ -806,6 +939,8 @@ handle_peer_pull_reply (void *cls,
 
   struct GNUNET_RPS_P2P_PullReplyMessage *in_msg;
   struct GNUNET_PeerIdentity *peers;
+  struct PeerContext *peer_ctx;
+  struct PeerOutstandingOp out_op;
   uint32_t i;
 
   if (sizeof (struct GNUNET_RPS_P2P_PullReplyMessage) > ntohs (msg->size))
@@ -828,8 +963,19 @@ handle_peer_pull_reply (void *cls,
   peers = (struct GNUNET_PeerIdentity *) &msg[1];
   for ( i = 0 ; i < ntohl (in_msg->num_peers) ; i++ )
   {
-    if (GNUNET_NO == in_arr (pull_list, pull_list_size, &peers[i]))
-      GNUNET_array_append (pull_list, pull_list_size, peers[i]);
+    peer_ctx = get_peer_ctx (peer_map, &peers[i]);
+
+    if ((0 != (peer_ctx->peer_flags && LIVING)) ||
+        NULL != peer_ctx->recv_channel)
+    {
+      if (GNUNET_NO == in_arr (pull_list, pull_list_size, &peers[i]))
+        GNUNET_array_append (pull_list, pull_list_size, peers[i]);
+    }
+    else if (GNUNET_NO == insert_in_pull_list_scheduled (peer_ctx))
+    {
+      out_op.op = insert_in_pull_list;
+      GNUNET_array_append (peer_ctx->outstanding_ops, peer_ctx->num_outstanding_ops, out_op);
+    }
   }
 
   // TODO check that id is valid - whether it is reachable
@@ -865,44 +1011,50 @@ do_round (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
 
   /* Send PUSHes */
   //n_arr = GNUNET_CRYPTO_random_permute (GNUNET_CRYPTO_QUALITY_STRONG, (unsigned int) gossip_list_size);
-  n_peers = round (alpha * gossip_list_size);
-  if (0 == n_peers)
-    n_peers = 1;
-  LOG (GNUNET_ERROR_TYPE_DEBUG, "Going to send pushes to %u (%f * %u) peers.\n",
-      n_peers, alpha, gossip_list_size);
-  for ( i = 0 ; i < n_peers ; i++ )
+  if (0 != gossip_list_size)
   {
-    peer = get_rand_peer (gossip_list, gossip_list_size);
-    if (own_identity != peer)
-    { // FIXME if this fails schedule/loop this for later
-      LOG (GNUNET_ERROR_TYPE_DEBUG, "Sending PUSH to peer %s of gossiped list.\n", GNUNET_i2s (peer));
-
-      ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_RPS_PP_PUSH);
-      // FIXME sometimes it returns a pointer to a freed mq
-      mq = get_mq (peer_map, peer);
-      GNUNET_MQ_send (mq, ev);
+    n_peers = round (alpha * gossip_list_size);
+    if (0 == n_peers)
+      n_peers = 1;
+    LOG (GNUNET_ERROR_TYPE_DEBUG, "Going to send pushes to %u (%f * %u) peers.\n",
+        n_peers, alpha, gossip_list_size);
+    for ( i = 0 ; i < n_peers ; i++ )
+    {
+      peer = get_rand_peer (gossip_list, gossip_list_size);
+      if (own_identity != peer)
+      { // FIXME if this fails schedule/loop this for later
+        LOG (GNUNET_ERROR_TYPE_DEBUG, "Sending PUSH to peer %s of gossiped list.\n", GNUNET_i2s (peer));
+
+        ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_RPS_PP_PUSH);
+        // FIXME sometimes it returns a pointer to a freed mq
+        mq = get_mq (peer_map, peer);
+        GNUNET_MQ_send (mq, ev);
+      }
     }
   }
 
 
   /* Send PULL requests */
   //n_arr = GNUNET_CRYPTO_random_permute (GNUNET_CRYPTO_QUALITY_STRONG, (unsigned int) sampler_list->size);
-  n_peers = round (beta * gossip_list_size);
-  if (0 == n_peers)
-    n_peers = 1;
-  LOG (GNUNET_ERROR_TYPE_DEBUG, "Going to send pulls to %u (%f * %u) peers.\n",
-      n_peers, beta, gossip_list_size);
-  for ( i = 0 ; i < n_peers ; i++ )
+  if (0 != gossip_list_size)
   {
-    peer = get_rand_peer (gossip_list, gossip_list_size);
-    if (own_identity != peer)
-    { // FIXME if this fails schedule/loop this for later
-      LOG (GNUNET_ERROR_TYPE_DEBUG, "Sending PULL request to peer %s of gossiped list.\n", GNUNET_i2s (peer));
-
-      ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REQUEST);
-      //pull_msg = NULL;
-      mq = get_mq (peer_map, peer);
-      GNUNET_MQ_send (mq, ev);
+    n_peers = round (beta * gossip_list_size);
+    if (0 == n_peers)
+      n_peers = 1;
+    LOG (GNUNET_ERROR_TYPE_DEBUG, "Going to send pulls to %u (%f * %u) peers.\n",
+        n_peers, beta, gossip_list_size);
+    for ( i = 0 ; i < n_peers ; i++ )
+    {
+      peer = get_rand_peer (gossip_list, gossip_list_size);
+      if (own_identity != peer)
+      { // FIXME if this fails schedule/loop this for later
+        LOG (GNUNET_ERROR_TYPE_DEBUG, "Sending PULL request to peer %s of gossiped list.\n", GNUNET_i2s (peer));
+
+        ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REQUEST);
+        //pull_msg = NULL;
+        mq = get_mq (peer_map, peer);
+        GNUNET_MQ_send (mq, ev);
+      }
     }
   }
 
@@ -914,14 +1066,16 @@ do_round (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
        push_list_size != 0 &&
        pull_list_size != 0 )
   {
-    LOG (GNUNET_ERROR_TYPE_DEBUG, "Update of the gossip list. ()\n");
+    LOG (GNUNET_ERROR_TYPE_DEBUG, "Update of the gossip list.\n");
 
     uint32_t first_border;
     uint32_t second_border;
     
-    GNUNET_array_grow (gossip_list, gossip_list_size, sampler_size_est_need);
+    first_border = round (alpha * sampler_size_est_need);
+    second_border = first_border + round (beta * sampler_size_est_need);
+
+    GNUNET_array_grow (gossip_list, gossip_list_size, second_border);
 
-    first_border = round (alpha * gossip_list_size);
     for ( i = 0 ; i < first_border ; i++ )
     { // TODO use RPS_sampler_get_n_rand_peers
       /* Update gossip list with peers received through PUSHes */
@@ -931,7 +1085,6 @@ do_round (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
       // TODO change the peer_flags accordingly
     }
 
-    second_border = first_border + round (beta * gossip_list_size);
     for ( i = first_border ; i < second_border ; i++ )
     {
       /* Update gossip list with peers received through PULLs */
@@ -944,8 +1097,8 @@ do_round (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
     for ( i = second_border ; i < gossip_list_size ; i++ )
     {
       /* Update gossip list with peers from history */
-      peer = RPS_sampler_get_n_rand_peers_ (1);
-      gossip_list[i] = *peer;
+      RPS_sampler_get_n_rand_peers (hist_update, NULL, 1, GNUNET_NO);
+      num_hist_update_tasks++;
       // TODO change the peer_flags accordingly
     }
 
@@ -1058,37 +1211,38 @@ init_peer_cb (void *cls,
               unsigned int best_path) // "How long is the best path?
                                       // (0 = unknown, 1 = ourselves, 2 = neighbor)"
 {
-  struct init_peer_cls *ipc;
+  struct GNUNET_SERVER_Handle *server;
+  struct PeerOutstandingOp out_op;
+  struct PeerContext *peer_ctx;
 
-  ipc = (struct init_peer_cls *) cls;
+  server = (struct GNUNET_SERVER_Handle *) cls;
   if ( NULL != peer )
   {
     LOG (GNUNET_ERROR_TYPE_DEBUG,
-        "Got %" PRIX32 ". peer %s (at %p) from CADET (gossip_list_size: %u)\n",
-        ipc->i, GNUNET_i2s (peer), peer, gossip_list_size);
-    RPS_sampler_update_list (peer);
-    (void) get_peer_ctx (peer_map, peer); // unneeded? -> insertCB
+        "Got peer %s (at %p) from CADET (gossip_list_size: %u)\n",
+        GNUNET_i2s (peer), peer, gossip_list_size);
 
-    if (ipc->i < gossip_list_size)
+    // maybe create a function for that
+    peer_ctx = get_peer_ctx (peer_map, peer);
+    if (GNUNET_NO == insert_in_sampler_scheduled (peer_ctx))
     {
-      gossip_list[ipc->i] = *peer; // FIXME sometimes we're writing to invalid space here
-                                   // not sure whether fixed
-      ipc->i++;
+      out_op.op = insert_in_sampler;
+      GNUNET_array_append (peer_ctx->outstanding_ops, peer_ctx->num_outstanding_ops, out_op);
     }
 
-    // send push/pull to each of those peers?
-  }
-  else
-  {
-    if (ipc->i < gossip_list_size)
+    if (GNUNET_NO == insert_in_gossip_list_scheduled (peer_ctx))
     {
-      memcpy (&gossip_list[ipc->i],
-          RPS_sampler_get_n_rand_peers_ (1),
-          (gossip_list_size - ipc->i) * sizeof (struct GNUNET_PeerIdentity));
+      out_op.op = insert_in_gossip_list;
+      GNUNET_array_append (peer_ctx->outstanding_ops, peer_ctx->num_outstanding_ops, out_op);
     }
-    rps_start (ipc->server);
-    GNUNET_free (ipc);
+
+    /* Issue livelyness test on peer */
+    (void) get_channel (peer_map, peer);
+
+    // send push/pull to each of those peers?
   }
+  else
+    rps_start (server);
 }
 
 
@@ -1236,6 +1390,7 @@ cleanup_channel (void *cls,
   (void) peer_remove_cb (peer, peer, peer_ctx);
 }
 
+
 /**
  * Actually start the service.
  */
@@ -1256,6 +1411,8 @@ rps_start (struct GNUNET_SERVER_Handle *server)
   LOG (GNUNET_ERROR_TYPE_DEBUG, "Ready to receive requests from clients\n");
 
 
+  num_hist_update_tasks = 0;
+
   do_round_task = GNUNET_SCHEDULER_add_now (&do_round, NULL);
   LOG (GNUNET_ERROR_TYPE_DEBUG, "Scheduled first round\n");
 
@@ -1283,7 +1440,6 @@ run (void *cls,
 
   LOG (GNUNET_ERROR_TYPE_DEBUG, "RPS started\n");
 
-  struct init_peer_cls *ipc;
 
   cfg = c;
 
@@ -1316,16 +1472,13 @@ run (void *cls,
   }
   LOG (GNUNET_ERROR_TYPE_DEBUG, "INITSIZE is %" PRIu64 "\n", sampler_size_est_need);
 
-  //gossip_list_size = sampler_size; // TODO rename sampler_size
 
   gossip_list = NULL;
-  GNUNET_array_grow (gossip_list, gossip_list_size, sampler_size_est_need);
 
 
   /* connect to NSE */
   nse = GNUNET_NSE_connect (cfg, nse_callback, NULL);
   // TODO check whether that was successful
-  // TODO disconnect on shutdown
   LOG (GNUNET_ERROR_TYPE_DEBUG, "Connected to NSE\n");
 
 
@@ -1383,7 +1536,7 @@ run (void *cls,
   half_round_interval = GNUNET_TIME_relative_multiply (round_interval, .5);
   max_round_interval = GNUNET_TIME_relative_add (round_interval, half_round_interval);
 
-  RPS_sampler_init (sampler_size_est_need, own_identity, max_round_interval,
+  RPS_sampler_init (sampler_size_est_need, max_round_interval,
       insertCB, NULL, removeCB, NULL);
   sampler_size = sampler_size_est_need;
 
@@ -1394,11 +1547,8 @@ run (void *cls,
   pull_list_size = 0;
 
 
-  ipc = GNUNET_new (struct init_peer_cls);
-  ipc->server = server;
-  ipc->i = 0;
   LOG (GNUNET_ERROR_TYPE_DEBUG, "Requesting peers from CADET\n");
-  GNUNET_CADET_get_peers (cadet_handle, &init_peer_cb, ipc);
+  GNUNET_CADET_get_peers (cadet_handle, &init_peer_cb, server);
 
   // TODO send push/pull to each of those peers?
 }
index 85d8d532bdca621bcf50a6fed10c45c0185663aa..b2ee5fb21ec79853573cfa2a11afdea7e9533eec 100644 (file)
@@ -151,7 +151,7 @@ struct RPS_Sampler
 /**
  * Closure to _get_n_rand_peers_ready_cb()
  */
-struct RPS_GetNRandPeersReadyCls
+struct NRandPeersReadyCls
 {
   /**
    * Number of peers we are waiting for.
@@ -255,15 +255,15 @@ static uint32_t client_get_index;
  * give those back.
  */
   void
-RPS_sampler_get_n_rand_peers_ready_cb (void *cls,
+check_n_peers_ready (void *cls,
     const struct GNUNET_PeerIdentity *id)
 {
-  struct RPS_GetNRandPeersReadyCls *n_peers_cls;
+  struct NRandPeersReadyCls *n_peers_cls;
 
-  n_peers_cls = (struct RPS_GetNRandPeersReadyCls *) cls;
+  n_peers_cls = (struct NRandPeersReadyCls *) cls;
 
   LOG (GNUNET_ERROR_TYPE_DEBUG,
-      "SAMPLER: Got %" PRIX32 "th of %" PRIX32 " peers\n",
+      "SAMPLER: Got %" PRIX32 ". of %" PRIX32 " peers\n",
       n_peers_cls->cur_num_peers, n_peers_cls->num_peers);
 
   if (n_peers_cls->num_peers - 1 == n_peers_cls->cur_num_peers)
@@ -297,12 +297,6 @@ RPS_sampler_elem_reinit (struct RPS_SamplerElement *sampler_el)
 
   sampler_el->last_client_request = GNUNET_TIME_UNIT_FOREVER_ABS;
 
-  /* We might want to keep the previous peer */
-
-  //GNUNET_CRYPTO_hmac(&sampler_el->auth_key, sampler_el->peer_id,
-  //                   sizeof(struct GNUNET_PeerIdentity),
-  //                   &sampler_el->peer_id_hash);
-
   sampler_el->birth = GNUNET_TIME_absolute_get ();
   sampler_el->num_peers = 0;
   sampler_el->num_change = 0;
@@ -479,7 +473,6 @@ RPS_sampler_resize (unsigned int new_size)
  * Initialise a tuple of sampler elements.
  *
  * @param init_size the size the sampler is initialised with
- * @param id with which all newly created sampler elements are initialised
  * @param ins_cb the callback that will be called on every PeerID that is 
  *               newly inserted into a sampler element
  * @param ins_cls the closure given to #ins_cb
@@ -489,7 +482,6 @@ RPS_sampler_resize (unsigned int new_size)
  */
   void
 RPS_sampler_init (size_t init_size,
-    const struct GNUNET_PeerIdentity *id,
     struct GNUNET_TIME_Relative max_round_interval,
     RPS_sampler_insert_cb ins_cb, void *ins_cls,
     RPS_sampler_remove_cb rem_cb, void *rem_cls)
@@ -513,7 +505,6 @@ RPS_sampler_init (size_t init_size,
   //sampler->sampler_elements = GNUNET_new_array(init_size, struct GNUNET_PeerIdentity);
   //GNUNET_array_grow (sampler->sampler_elements, sampler->sampler_size, min_size);
   RPS_sampler_resize (init_size);
-  RPS_sampler_update_list (id); // no super nice desing but ok for the moment
 
   client_get_index = 0;
 
@@ -568,13 +559,14 @@ RPS_sampler_reinitialise_by_value (const struct GNUNET_PeerIdentity *id)
  * corrsponding peer to the client.
  * Only used internally
  */
-  const struct GNUNET_PeerIdentity * 
-RPS_sampler_get_rand_peer_ ()
+  void
+RPS_sampler_get_rand_peer_ (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
 {
+  struct GetPeerCls *gpc;
   uint32_t r_index;
-  const struct GNUNET_PeerIdentity *peer; // do we have to malloc that?
+  struct GNUNET_HashCode *hash;
 
-  // TODO implement extra logic
+  gpc = (struct GetPeerCls *) cls;
 
   /**;
    * Choose the r_index of the peer we want to return
@@ -583,50 +575,25 @@ RPS_sampler_get_rand_peer_ ()
   r_index = GNUNET_CRYPTO_random_u64(GNUNET_CRYPTO_QUALITY_STRONG,
       sampler->sampler_size);
 
-  //if ( EMPTY == sampler->sampler_elements[r_index]->is_empty )
-  //  // TODO schedule for later
-  //  peer = NULL;
-  //else
-    peer = &(sampler->sampler_elements[r_index]->peer_id);
-  //sampler->sampler_elements[r_index]->last_client_request = GNUNET_TIME_absolute_get();
-  LOG (GNUNET_ERROR_TYPE_DEBUG, "Sgrp: Returning PeerID %s\n", GNUNET_i2s(peer));
-
-  return peer;
-}
-
-
-/**
- * Get n random peers out of the sampled peers.
- *
- * We might want to reinitialise this sampler after giving the
- * corrsponding peer to the client.
- * Random with or without consumption?
- * Only used internally
- */
-  const struct GNUNET_PeerIdentity *
-RPS_sampler_get_n_rand_peers_ (uint32_t n)
-{
-  if ( 0 == sampler->sampler_size )
+  if ( EMPTY == sampler->sampler_elements[r_index]->is_empty )
   {
-    LOG (GNUNET_ERROR_TYPE_DEBUG,
-        "Sgrp: List empty - Returning NULL\n");
-    return NULL;
+    gpc->get_peer_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply(
+                                                                   GNUNET_TIME_UNIT_SECONDS,
+                                                                   .1),
+                                                       &RPS_sampler_get_rand_peer_,
+                                                       cls);
+    return;
   }
-  else
-  {
-    // TODO check if we have too much (distinct) sampled peers
-    // If we are not ready yet maybe schedule for later
-    struct GNUNET_PeerIdentity *peers;
-    uint32_t i;
 
-    peers = GNUNET_malloc (n * sizeof(struct GNUNET_PeerIdentity));
+  *gpc->id = sampler->sampler_elements[r_index]->peer_id;
 
-    for ( i = 0 ; i < n ; i++ ) {
-      //peers[i] = RPS_sampler_get_rand_peer_(sampler->sampler_elements);
-      memcpy (&peers[i], RPS_sampler_get_rand_peer_ (), sizeof (struct GNUNET_PeerIdentity));
-    }
-    return peers;
-  }
+  hash = GNUNET_new (struct GNUNET_HashCode);
+  GNUNET_CRYPTO_hash (&gpc->get_peer_task, sizeof (struct GNUNET_SCHEDULER_Task *), hash);
+  if (GNUNET_NO == GNUNET_CONTAINER_multihashmap_remove (get_peer_tasks, hash, &gpc->get_peer_task))
+      LOG (GNUNET_ERROR_TYPE_WARNING, "SAMPLER: Key to remove is not in the hashmap\n");
+  GNUNET_free (gpc->get_peer_task);
+
+  gpc->cb (gpc->cb_cls, gpc->id);
 }
 
 
@@ -639,28 +606,28 @@ RPS_sampler_get_n_rand_peers_ (uint32_t n)
  * @return a random PeerID of the PeerIDs previously put into the sampler.
  */
   void
-//RPS_sampler_get_rand_peer (RPS_sampler_rand_peer_ready_cb cb,
-//    void *cls, struct GNUNET_PeerIdentity *id)
 RPS_sampler_get_rand_peer (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
 {
   struct GetPeerCls *gpc;
+  struct GNUNET_PeerIdentity tmp_id;
   struct RPS_SamplerElement *s_elem;
   struct GNUNET_TIME_Relative last_request_diff;
   struct GNUNET_HashCode *hash;
   uint32_t tmp_client_get_index;
-  //struct GNUNET_TIME_Relative inv_last_request_diff;
 
   LOG (GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Single peer was requested\n");
 
   gpc = (struct GetPeerCls *) cls;
   hash = GNUNET_new (struct GNUNET_HashCode);
+
+  /* Store the next #client_get_index to check whether we cycled over the whole list */
   if (0 < client_get_index)
     tmp_client_get_index = client_get_index - 1;
   else
     tmp_client_get_index = sampler->sampler_size - 1;
 
   LOG (GNUNET_ERROR_TYPE_DEBUG,
-      "SAMPLER: scheduling for later if index reaches %" PRIX32 " (sampler size: %" PRIX32 ".\n",
+      "SAMPLER: scheduling for later if index reaches %" PRIX32 " (sampler size: %" PRIX32 ").\n",
       tmp_client_get_index, sampler->sampler_size);
 
   do
@@ -674,17 +641,22 @@ RPS_sampler_get_rand_peer (void *cls, const struct GNUNET_SCHEDULER_TaskContext
       return;
     }
 
-    *gpc->id = sampler->sampler_elements[client_get_index]->peer_id;
-
+    tmp_id = sampler->sampler_elements[client_get_index]->peer_id;
     RPS_sampler_elem_reinit (sampler->sampler_elements[client_get_index]);
+    RPS_sampler_elem_next (sampler->sampler_elements[client_get_index], &tmp_id,
+                           NULL, NULL, NULL, NULL);
+
+    /* Cycle the #client_get_index one step further */
     if ( client_get_index == sampler->sampler_size - 1 )
       client_get_index = 0;
     else
       client_get_index++;
+
     LOG (GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: incremented index to %" PRIX32 ".\n", client_get_index);
   } while (EMPTY == sampler->sampler_elements[client_get_index]->is_empty);
 
   s_elem = sampler->sampler_elements[client_get_index];
+  *gpc->id = s_elem->peer_id;
 
   /* Check whether we may use this sampler to give it back to the client */
   if (GNUNET_TIME_UNIT_FOREVER_ABS.abs_value_us != s_elem->last_client_request.abs_value_us)
@@ -729,54 +701,49 @@ RPS_sampler_get_rand_peer (void *cls, const struct GNUNET_SCHEDULER_TaskContext
  *
  * @param cb callback that will be called once the ids are ready.
  * @param cls closure given to @a cb
+ * @param for_client #GNUNET_YES if result is used for client,
+ *                   #GNUNET_NO if used internally
  * @param num_peers the number of peers requested
  */
   void
 RPS_sampler_get_n_rand_peers (RPS_sampler_n_rand_peers_ready_cb cb,
-    void *cls, uint32_t num_peers)
+    void *cls, uint32_t num_peers, int for_client)
 {
-  if ( 0 == sampler->sampler_size )
-  {
-    LOG (GNUNET_ERROR_TYPE_DEBUG,
-        "Sgrp: List empty - Returning NULL\n");
-    cb (cls, NULL, 0);
-  }
-  else
-  {
-    // TODO check if we have too much (distinct) sampled peers
-    // If we are not ready yet maybe schedule for later
-    uint32_t i;
-    struct RPS_GetNRandPeersReadyCls *cb_cls;
-    struct GetPeerCls *gpc;
-    struct GNUNET_HashCode *hash;
-    
-    hash = GNUNET_new (struct GNUNET_HashCode);
+  GNUNET_assert (0 != sampler->sampler_size);
 
-    cb_cls = GNUNET_new (struct RPS_GetNRandPeersReadyCls);
-    cb_cls->num_peers = num_peers;
-    cb_cls->cur_num_peers = 0;
-    cb_cls->ids = GNUNET_new_array (num_peers, struct GNUNET_PeerIdentity);
-    cb_cls->callback = cb;
-    cb_cls->cls = cls;
+  // TODO check if we have too much (distinct) sampled peers
+  uint32_t i;
+  struct NRandPeersReadyCls *cb_cls;
+  struct GetPeerCls *gpc;
+  struct GNUNET_HashCode *hash;
 
-    LOG (GNUNET_ERROR_TYPE_DEBUG,
-        "SAMPLER: Scheduling requests for %" PRIX32 " peers\n", num_peers);
+  hash = GNUNET_new (struct GNUNET_HashCode);
 
-    for ( i = 0 ; i < num_peers ; i++ )
-    {
-      gpc = GNUNET_new (struct GetPeerCls);
-      gpc->cb = RPS_sampler_get_n_rand_peers_ready_cb;
-      gpc->cb_cls = cb_cls;
-      gpc->id = &cb_cls->ids[i];
+  cb_cls = GNUNET_new (struct NRandPeersReadyCls);
+  cb_cls->num_peers = num_peers;
+  cb_cls->cur_num_peers = 0;
+  cb_cls->ids = GNUNET_new_array (num_peers, struct GNUNET_PeerIdentity);
+  cb_cls->callback = cb;
+  cb_cls->cls = cls;
+
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+      "SAMPLER: Scheduling requests for %" PRIX32 " peers\n", num_peers);
 
-      // maybe add a little delay
+  for ( i = 0 ; i < num_peers ; i++ )
+  {
+    gpc = GNUNET_new (struct GetPeerCls);
+    gpc->cb = check_n_peers_ready;
+    gpc->cb_cls = cb_cls;
+    gpc->id = &cb_cls->ids[i];
+
+    // maybe add a little delay
+    if (GNUNET_YES == for_client)
       gpc->get_peer_task = GNUNET_SCHEDULER_add_now (&RPS_sampler_get_rand_peer, gpc);
-      GNUNET_CRYPTO_hash (&gpc->get_peer_task, sizeof (struct GNUNET_SCHEDULER_Task *), hash);
-      (void) GNUNET_CONTAINER_multihashmap_put (get_peer_tasks, hash, &gpc->get_peer_task,
-                                                GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
-      //RPS_sampler_get_rand_peer (RPS_sampler_get_n_rand_peers_ready_cb,
-      //    cb_cls, &peers[i]);
-    }
+    else if (GNUNET_NO == for_client)
+      gpc->get_peer_task = GNUNET_SCHEDULER_add_now (&RPS_sampler_get_rand_peer_, gpc);
+    GNUNET_CRYPTO_hash (&gpc->get_peer_task, sizeof (struct GNUNET_SCHEDULER_Task *), hash);
+    (void) GNUNET_CONTAINER_multihashmap_put (get_peer_tasks, hash, &gpc->get_peer_task,
+        GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
   }
 }
 
index 451d3cdb0c9365cbed7e8cd4d00333790cf27536..a7021b4fe52352d3c1c4968d485835487149180c 100644 (file)
@@ -89,7 +89,6 @@ RPS_sampler_resize (unsigned int new_size);
  */
   void
 RPS_sampler_init (size_t init_size,
-    const struct GNUNET_PeerIdentity *id,
     struct GNUNET_TIME_Relative max_round_interval,
     RPS_sampler_insert_cb ins_cb, void *ins_cls,
     RPS_sampler_remove_cb rem_cb, void *rem_cls);
@@ -115,18 +114,6 @@ RPS_sampler_update_list (const struct GNUNET_PeerIdentity *id);
 RPS_sampler_reinitialise_by_value (const struct GNUNET_PeerIdentity *id);
 
 
-/**
- * Get n random peers out of the sampled peers.
- *
- * We might want to reinitialise this sampler after giving the
- * corrsponding peer to the client.
- * Random with or without consumption?
- * Only used internally
- */
-  const struct GNUNET_PeerIdentity *
-RPS_sampler_get_n_rand_peers_ (uint32_t n);
-
-
 /**
  * Get n random peers out of the sampled peers.
  *
@@ -136,11 +123,13 @@ RPS_sampler_get_n_rand_peers_ (uint32_t n);
  *
  * @param cb callback that will be called once the ids are ready.
  * @param cls closure given to @a cb
+ * @param for_client #GNUNET_YES if result is used for client,
+ *                   #GNUNET_NO if used internally
  * @param num_peers the number of peers requested
  */
     void
 RPS_sampler_get_n_rand_peers (RPS_sampler_n_rand_peers_ready_cb cb,
-    void *cls, uint32_t num_peers);
+    void *cls, uint32_t num_peers, int for_client);
 
 
 /**