+void
+check_peer_live (struct PeerContext *peer_ctx)
+{
+ (void) get_channel (peer_map, &peer_ctx->peer_id);
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Get informed about peer %s getting live\n",
+ GNUNET_i2s (&peer_ctx->peer_id));
+ if (NULL == peer_ctx->is_live_task)
+ {
+ peer_ctx->is_live_task =
+ GNUNET_CADET_notify_transmit_ready (peer_ctx->send_channel,
+ GNUNET_NO,
+ GNUNET_TIME_UNIT_FOREVER_REL,
+ sizeof (struct GNUNET_MessageHeader),
+ cadet_ntfy_tmt_rdy_cb,
+ peer_ctx);
+ }
+ else
+ {
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Already waiting for notification\n");
+ }
+}
+
+
+/**
+ * Sum all time relatives of an array.
+ */
+ struct GNUNET_TIME_Relative
+T_relative_sum (const struct GNUNET_TIME_Relative *rel_array, uint32_t arr_size)
+{
+ struct GNUNET_TIME_Relative sum;
+ uint32_t i;
+
+ sum = GNUNET_TIME_UNIT_ZERO;
+ for ( i = 0 ; i < arr_size ; i++ )
+ {
+ sum = GNUNET_TIME_relative_add (sum, rel_array[i]);
+ }
+ return sum;
+}
+
+
+/**
+ * Compute the average of given time relatives.
+ */
+ struct GNUNET_TIME_Relative
+T_relative_avg (const struct GNUNET_TIME_Relative *rel_array, uint32_t arr_size)
+{
+ return GNUNET_TIME_relative_divide (T_relative_sum (rel_array, arr_size), arr_size);
+}
+
+
+/**
+ * Insert PeerID in #pull_list
+ *
+ * Called once we know a peer is live.
+ */
+ void
+insert_in_pull_list (void *cls, const struct GNUNET_PeerIdentity *peer)
+{
+ if (GNUNET_NO == in_arr (pull_list, pull_list_size, peer))
+ GNUNET_array_append (pull_list, pull_list_size, *peer);
+
+ peer_clean (peer);
+}
+
+/**
+ * Check whether #insert_in_pull_list was already scheduled
+ */
+ int
+insert_in_pull_list_scheduled (const struct PeerContext *peer_ctx)
+{
+ unsigned int i;
+
+ for ( i = 0 ; i < peer_ctx->num_outstanding_ops ; i++ )
+ if (insert_in_pull_list == peer_ctx->outstanding_ops[i].op)
+ return GNUNET_YES;
+ return GNUNET_NO;
+}
+
+
+/**
+ * Insert PeerID in #gossip_list
+ *
+ * Called once we know a peer is live.
+ */
+ void
+insert_in_gossip_list (void *cls, const struct GNUNET_PeerIdentity *peer)
+{
+ if (GNUNET_NO == in_arr (gossip_list, gossip_list_size, peer))
+ GNUNET_array_append (gossip_list, gossip_list_size, *peer);
+
+ (void) get_channel (peer_map, peer);
+}
+
+/**
+ * Check whether #insert_in_gossip_list was already scheduled
+ */
+ int
+insert_in_gossip_list_scheduled (const struct PeerContext *peer_ctx)
+{
+ unsigned int i;
+
+ for ( i = 0 ; i < peer_ctx->num_outstanding_ops ; i++ )
+ if (insert_in_gossip_list == peer_ctx->outstanding_ops[i].op)
+ return GNUNET_YES;
+ return GNUNET_NO;
+}
+
+
+/**
+ * Update sampler with given PeerID.
+ */
+ void
+insert_in_sampler (void *cls, const struct GNUNET_PeerIdentity *peer)
+{
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Updating samplers with peer %s from insert_in_sampler()\n",
+ GNUNET_i2s (peer));
+ RPS_sampler_update (prot_sampler, peer);
+ RPS_sampler_update (client_sampler, peer);
+}
+
+
+/**
+ * Check whether #insert_in_sampler was already scheduled
+ */
+static int
+insert_in_sampler_scheduled (const struct PeerContext *peer_ctx)
+{
+ unsigned int i;
+
+ for (i = 0 ; i < peer_ctx->num_outstanding_ops ; i++)
+ if (insert_in_sampler== peer_ctx->outstanding_ops[i].op)
+ return GNUNET_YES;
+ return GNUNET_NO;
+}
+
+
+/**
+ * Wrapper around #RPS_sampler_resize()
+ *
+ * If we do not have enough sampler elements, double current sampler size
+ * If we have more than enough sampler elements, halv current sampler size
+ */
+static void
+resize_wrapper (struct RPS_Sampler *sampler, uint32_t new_size)
+{
+ unsigned int sampler_size;
+
+ // TODO statistics
+ // TODO respect the min, max
+ sampler_size = RPS_sampler_get_size (sampler);
+ if (sampler_size > new_size * 4)
+ { /* Shrinking */
+ RPS_sampler_resize (sampler, sampler_size / 2);
+ }
+ else if (sampler_size < new_size)
+ { /* Growing */
+ RPS_sampler_resize (sampler, sampler_size * 2);
+ }
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "sampler_size is now %u\n", sampler_size);
+}
+
+
+/**
+ * Wrapper around #RPS_sampler_resize() resizing the client sampler
+ */
+static void
+client_resize_wrapper ()
+{
+ uint32_t bigger_size;
+ unsigned int sampler_size;
+
+ // TODO statistics
+
+ sampler_size = RPS_sampler_get_size (client_sampler);
+
+ if (sampler_size_est_need > sampler_size_client_need)
+ bigger_size = sampler_size_est_need;
+ else
+ bigger_size = sampler_size_client_need;
+
+ // TODO respect the min, max
+ resize_wrapper (client_sampler, bigger_size);
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "sampler_size is now %u\n", sampler_size);
+}
+
+
+/**
+ * Estimate request rate
+ *
+ * Called every time we receive a request from the client.
+ */
+ void
+est_request_rate()
+{
+ struct GNUNET_TIME_Relative max_round_duration;
+
+ if (request_deltas_size > req_counter)
+ req_counter++;
+ if ( 1 < req_counter)
+ {
+ /* Shift last request deltas to the right */
+ memcpy (&request_deltas[1],
+ request_deltas,
+ (req_counter - 1) * sizeof (struct GNUNET_TIME_Relative));
+
+ /* Add current delta to beginning */
+ request_deltas[0] =
+ GNUNET_TIME_absolute_get_difference (last_request,
+ GNUNET_TIME_absolute_get ());
+ request_rate = T_relative_avg (request_deltas, req_counter);
+
+ /* Compute the duration a round will maximally take */
+ max_round_duration =
+ GNUNET_TIME_relative_add (round_interval,
+ GNUNET_TIME_relative_divide (round_interval, 2));
+
+ /* Set the estimated size the sampler has to have to
+ * satisfy the current client request rate */
+ sampler_size_client_need =
+ max_round_duration.rel_value_us / request_rate.rel_value_us;
+
+ /* Resize the sampler */
+ client_resize_wrapper ();
+ }
+ last_request = GNUNET_TIME_absolute_get ();
+}
+
+
+/**
+ * Add all peers in @a peer_array to @peer_map used as set.
+ *
+ * @param peer_array array containing the peers
+ * @param num_peers number of peers in @peer_array
+ * @param peer_map the peermap to use as set
+ */
+static void
+add_peer_array_to_set (const struct GNUNET_PeerIdentity *peer_array,
+ unsigned int num_peers,
+ struct GNUNET_CONTAINER_MultiPeerMap *peer_map)
+{
+ unsigned int i;
+ if (NULL == peer_map)
+ peer_map = GNUNET_CONTAINER_multipeermap_create (num_peers,
+ GNUNET_NO);
+ for (i = 0 ; i < num_peers ; i++)
+ {
+ GNUNET_CONTAINER_multipeermap_put (peer_map,
+ &peer_array[i],
+ NULL,
+ GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
+ }
+}
+
+
+/**
+ * Send a PULL REPLY to @a peer_id
+ *
+ * @param peer_id the peer to send the reply to.
+ * @param peer_ids the peers to send to @a peer_id
+ * @param num_peer_ids the number of peers to send to @a peer_id
+ */
+static void
+send_pull_reply (const struct GNUNET_PeerIdentity *peer_id,
+ const struct GNUNET_PeerIdentity *peer_ids,
+ unsigned int num_peer_ids)
+{
+ uint32_t send_size;
+ struct GNUNET_MQ_Handle *mq;
+ struct GNUNET_MQ_Envelope *ev;
+ struct GNUNET_RPS_P2P_PullReplyMessage *out_msg;
+
+ /* Compute actual size */
+ send_size = sizeof (struct GNUNET_RPS_P2P_PullReplyMessage) +
+ num_peer_ids * sizeof (struct GNUNET_PeerIdentity);
+
+ if (GNUNET_CONSTANTS_MAX_CADET_MESSAGE_SIZE < send_size)
+ /* Compute number of peers to send
+ * If too long, simply truncate */
+ // TODO select random ones via permutation
+ // or even better: do good protocol design
+ send_size =
+ (GNUNET_CONSTANTS_MAX_CADET_MESSAGE_SIZE -
+ sizeof (struct GNUNET_RPS_P2P_PullReplyMessage)) /
+ sizeof (struct GNUNET_PeerIdentity);
+ else
+ send_size = num_peer_ids;
+
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "PULL REQUEST from peer %s received, going to send %u peers\n",
+ GNUNET_i2s (peer_id), send_size);
+
+ mq = get_mq (peer_map, peer_id);
+
+ ev = GNUNET_MQ_msg_extra (out_msg,
+ send_size * sizeof (struct GNUNET_PeerIdentity),
+ GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REPLY);
+ out_msg->num_peers = htonl (send_size);
+ memcpy (&out_msg[1], peer_ids,
+ send_size * sizeof (struct GNUNET_PeerIdentity));
+
+ GNUNET_MQ_send (mq, ev);
+}
+
+
+/***********************************************************************
+ * /Util functions
+***********************************************************************/
+
+
+
+
+
+/**
+ * Function called by NSE.
+ *
+ * Updates sizes of sampler list and gossip list and adapt those lists
+ * accordingly.
+ */
+ void
+nse_callback (void *cls, struct GNUNET_TIME_Absolute timestamp,
+ double logestimate, double std_dev)