// TODO connect to friends
-// TODO store peers somewhere persistent
-
// TODO blacklist? (-> mal peer detection on top of brahms)
// hist_size_init, hist_size_max
peer_ctx->liveliness_check_pending,
sizeof (struct PendingMessage))) )
{
- // TODO this may leak memory
peer_ctx->liveliness_check_pending = NULL;
GNUNET_STATISTICS_update (stats,
"# pending liveliness checks",
* @brief Destroy the send channel of a peer e.g. stop indicating a sending
* intention to another peer
*
- * If there is also no channel to receive messages from that peer, remove it
- * from the peermap.
- * TODO really?
- *
* @peer the peer identity of the peer whose sending channel to destroy
* @return #GNUNET_YES if channel was destroyed
* #GNUNET_NO otherwise
int64_t view_updates_left;
/**
- * @brief How many peers from the biased
- * stream this client expects to receive.
+ * @brief Whether this client wants to receive stream updates.
+ * Either #GNUNET_YES or #GNUNET_NO
*/
- int64_t stream_peers_left;
+ int8_t stream_update;
/**
* The client handle to send the reply to
* @param view_size the size of the view array (can be 0)
*/
void
-send_stream_peer (const struct ClientContext *cli_ctx,
- uint64_t num_peers,
- const struct GNUNET_PeerIdentity *peers)
+send_stream_peers (const struct ClientContext *cli_ctx,
+ uint64_t num_peers,
+ const struct GNUNET_PeerIdentity *peers)
{
struct GNUNET_MQ_Envelope *ev;
struct GNUNET_RPS_CS_DEBUG_StreamReply *out_msg;
for (cli_ctx_iter = cli_ctx_head;
NULL != cli_ctx_iter;
- cli_ctx_iter = cli_ctx_head->next)
+ cli_ctx_iter = cli_ctx_iter->next)
{
if (1 < cli_ctx_iter->view_updates_left)
{
// TODO enum StreamPeerSource)
{
struct ClientContext *cli_ctx_iter;
- uint64_t num_peers_send;
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Got peer (%s) from biased stream - update all clients\n",
for (cli_ctx_iter = cli_ctx_head;
NULL != cli_ctx_iter;
- cli_ctx_iter = cli_ctx_head->next)
+ cli_ctx_iter = cli_ctx_iter->next)
{
- if (0 < cli_ctx_iter->stream_peers_left)
- {
- /* Client wants to receive limited amount of updates */
- if (num_peers > cli_ctx_iter->stream_peers_left)
- {
- num_peers_send = num_peers - cli_ctx_iter->stream_peers_left;
- cli_ctx_iter->stream_peers_left = 0;
- }
- else
- {
- num_peers_send = cli_ctx_iter->stream_peers_left - num_peers;
- cli_ctx_iter->stream_peers_left -= num_peers_send;
- }
- } else if (0 > cli_ctx_iter->stream_peers_left)
- {
- /* Client is not interested in updates */
- continue;
- } else /* _updates_left == 0 - infinite amount of updates */
+ if (GNUNET_YES == cli_ctx_iter->stream_update)
{
- num_peers_send = num_peers;
+ send_stream_peers (cli_ctx_iter, num_peers, peers);
}
-
- /* send view */
- send_stream_peer (cli_ctx_iter, num_peers_send, peers);
}
}
const struct GNUNET_RPS_CS_DEBUG_StreamRequest *msg)
{
struct ClientContext *cli_ctx = cls;
- uint64_t num_peers;
-
- num_peers = ntohl (msg->num_peers);
+ (void) msg;
LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Client requested %" PRIu64 " peers from biased stream.\n",
- num_peers);
+ "Client requested peers from biased stream.\n");
+ cli_ctx->stream_update = GNUNET_YES;
GNUNET_assert (NULL != cli_ctx);
- cli_ctx->stream_peers_left = num_peers;
GNUNET_SERVICE_client_continue (cli_ctx->client);
}
if (GNUNET_OK == inserted)
{
clients_notify_stream_peer (1,
- CustomPeerMap_get_peer_by_index (push_map, permut[i]));
+ CustomPeerMap_get_peer_by_index (pull_map,
+ permut[i - first_border]));
}
to_file (file_name_view_log,
"+%s\t(pull list)",
cli_ctx = GNUNET_new (struct ClientContext);
cli_ctx->mq = mq;
cli_ctx->view_updates_left = -1;
+ cli_ctx->stream_update = GNUNET_NO;
cli_ctx->client = client;
GNUNET_CONTAINER_DLL_insert (cli_ctx_head,
cli_ctx_tail,
notify_ctx = GNUNET_new (struct SamplerNotifyUpdateCTX);
notify_ctx->notify_cb = notify_cb;
notify_ctx->cls = cls;
- if (NULL != sampler->notify_ctx_head)
- {
- for (struct SamplerNotifyUpdateCTX *notify_iter = sampler->notify_ctx_head;
- NULL != notify_iter->next;
- notify_iter = notify_iter->next)
- {
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Pre: Context\n");
- }
- }
GNUNET_CONTAINER_DLL_insert (sampler->notify_ctx_head,
sampler->notify_ctx_tail,
notify_ctx);
- for (struct SamplerNotifyUpdateCTX *notify_iter = sampler->notify_ctx_head;
- NULL != notify_iter;
- notify_iter = notify_iter->next)
- {
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Post: Context\n");
- }
return notify_ctx;
}
/**
- * Update every sampler element of this sampler with given peer
+ * @brief Notify about update of the sampler.
*
- * @param sampler the sampler to update.
- * @param id the PeerID that is put in the sampler
+ * Call the callbacks that are waiting for notification on updates to the
+ * sampler.
+ *
+ * @param sampler The sampler the updates are waiting for
*/
- void
-RPS_sampler_update (struct RPS_Sampler *sampler,
- const struct GNUNET_PeerIdentity *id)
+static void
+notify_update (struct RPS_Sampler *sampler)
{
struct SamplerNotifyUpdateCTX *tmp_notify_head;
struct SamplerNotifyUpdateCTX *tmp_notify_tail;
- to_file (sampler->file_name,
- "Got %s",
- GNUNET_i2s_full (id));
-
- for (uint32_t i = 0; i < sampler->sampler_size; i++)
- {
- RPS_sampler_elem_next (sampler->sampler_elements[i],
- id);
- }
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Calling callbacks waiting for update notification.\n");
tmp_notify_head = sampler->notify_ctx_head;
tmp_notify_tail = sampler->notify_ctx_tail;
sampler->notify_ctx_head = NULL;
}
+/**
+ * Update every sampler element of this sampler with given peer
+ *
+ * @param sampler the sampler to update.
+ * @param id the PeerID that is put in the sampler
+ */
+ void
+RPS_sampler_update (struct RPS_Sampler *sampler,
+ const struct GNUNET_PeerIdentity *id)
+{
+ to_file (sampler->file_name,
+ "Got %s",
+ GNUNET_i2s_full (id));
+
+ for (uint32_t i = 0; i < sampler->sampler_size; i++)
+ {
+ RPS_sampler_elem_next (sampler->sampler_elements[i],
+ id);
+ }
+ notify_update (sampler);
+}
+
+
/**
* Reinitialise all previously initialised sampler elements with the given value.
*
/* Check whether we may use this sampler to give it back to the client */
if (GNUNET_TIME_UNIT_FOREVER_ABS.abs_value_us != s_elem->last_client_request.abs_value_us)
{
+ // TODO remove this condition at least for the client sampler
last_request_diff =
GNUNET_TIME_absolute_get_difference (s_elem->last_client_request,
GNUNET_TIME_absolute_get ());
#define LOG(kind,...) GNUNET_log_from (kind, "rps-api",__VA_ARGS__)
+/**
+ * Handle for a request to get peers from biased stream of ids
+ */
+struct GNUNET_RPS_StreamRequestHandle
+{
+ /**
+ * The client issuing the request.
+ */
+ struct GNUNET_RPS_Handle *rps_handle;
+
+ /**
+ * The number of requested peers.
+ */
+ uint32_t num_peers_left;
+
+ /**
+ * The callback to be called when we receive an answer.
+ */
+ GNUNET_RPS_NotifyReadyCB ready_cb;
+
+ /**
+ * The closure for the callback.
+ */
+ void *ready_cb_cls;
+
+ /**
+ * @brief Next element of the DLL
+ */
+ struct GNUNET_RPS_StreamRequestHandle *next;
+
+ /**
+ * @brief Previous element of the DLL
+ */
+ struct GNUNET_RPS_StreamRequestHandle *prev;
+};
+
+
/**
* Handler to handle requests from a client.
*/
void *view_update_cls;
/**
- * @brief Callback called on each peer of the biased input stream
+ * @brief Closure to each requested peer from the biased stream
+ */
+ void *stream_input_cls;
+
+ /**
+ * @brief Head of the DLL of stream requests
*/
- GNUNET_RPS_NotifyReadyCB stream_input_cb;
+ struct GNUNET_RPS_StreamRequestHandle *stream_requests_head;
/**
- * @brief Closure to each requested peer from the biased stream
+ * @brief Tail of the DLL of stream requests
*/
- void *stream_input_cls;
+ struct GNUNET_RPS_StreamRequestHandle *stream_requests_tail;
};
};
+/**
+ * @brief Create a new handle for a stream request
+ *
+ * @param rps_handle The rps handle
+ * @param num_peers The number of desired peers
+ * @param ready_cb The callback to be called, once all peers are ready
+ * @param cls The colsure to provide to the callback
+ *
+ * @return The handle to the stream request
+ */
+static struct GNUNET_RPS_StreamRequestHandle *
+new_stream_request (struct GNUNET_RPS_Handle *rps_handle,
+ uint64_t num_peers,
+ GNUNET_RPS_NotifyReadyCB ready_cb,
+ void *cls)
+{
+ struct GNUNET_RPS_StreamRequestHandle *srh;
+
+ srh = GNUNET_new (struct GNUNET_RPS_StreamRequestHandle);
+
+ srh->rps_handle = rps_handle;
+ srh->num_peers_left = num_peers;
+ srh->ready_cb = ready_cb;
+ srh->ready_cb_cls = cls;
+ GNUNET_CONTAINER_DLL_insert (rps_handle->stream_requests_head,
+ rps_handle->stream_requests_tail,
+ srh);
+
+ return srh;
+}
+
+
+/**
+ * @brief Remove the given stream request from the list of requests and memory
+ *
+ * @param srh The request to be removed
+ * @param srh_head Head of the DLL to remove request from
+ * @param srh_tail Tail of the DLL to remove request from
+ */
+static void
+remove_stream_request (struct GNUNET_RPS_StreamRequestHandle *srh,
+ struct GNUNET_RPS_StreamRequestHandle *srh_head,
+ struct GNUNET_RPS_StreamRequestHandle *srh_tail)
+{
+ GNUNET_CONTAINER_DLL_remove (srh_head,
+ srh_tail,
+ srh);
+
+ GNUNET_free (srh);
+}
+
+
+/**
+ * @brief Create new request handle
+ *
+ * @param rps_handle Handle to the service
+ * @param num_requests Number of requests
+ * @param ready_cb Callback
+ * @param cls Closure
+ *
+ * @return The newly created request handle
+ */
+static struct GNUNET_RPS_Request_Handle *
+new_request_handle (struct GNUNET_RPS_Handle *rps_handle,
+ uint64_t num_requests,
+ struct RPS_Sampler *sampler,
+ GNUNET_RPS_NotifyReadyCB ready_cb,
+ void *cls)
+{
+ struct GNUNET_RPS_Request_Handle *rh;
+
+ rh = GNUNET_new (struct GNUNET_RPS_Request_Handle);
+ rh->rps_handle = rps_handle;
+ rh->id = rps_handle->current_request_id++;
+ rh->num_requests = num_requests;
+ rh->sampler = sampler;
+ rh->ready_cb = ready_cb;
+ rh->ready_cb_cls = cls;
+ GNUNET_CONTAINER_multihashmap32_put (rps_handle->req_handlers, rh->id, rh,
+ GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
+
+ return rh;
+}
+
+
/**
* @brief Send a request to the service.
*
* @param cls a closure that will be given to the callback
* @param ready_cb the callback called when the peers are available
*/
-void
+struct GNUNET_RPS_StreamRequestHandle *
GNUNET_RPS_stream_request (struct GNUNET_RPS_Handle *rps_handle,
uint32_t num_peers,
GNUNET_RPS_NotifyReadyCB stream_input_cb,
void *cls)
{
+ struct GNUNET_RPS_StreamRequestHandle *srh;
struct GNUNET_MQ_Envelope *ev;
struct GNUNET_RPS_CS_DEBUG_StreamRequest *msg;
+ srh = new_stream_request (rps_handle,
+ num_peers, /* num requests */
+ stream_input_cb,
+ cls);
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Client requests %" PRIu32 " biased stream updates\n",
num_peers);
- rps_handle->stream_input_cb = stream_input_cb;
- rps_handle->stream_input_cls = cls;
ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_REQUEST);
- msg->num_peers = htonl (num_peers);
GNUNET_MQ_send (rps_handle->mq, ev);
+ return srh;
}
}
+/**
+ * @brief Send message to service that this client does not want to receive
+ * further updates from the biased peer stream
+ *
+ * @param rps_handle The handle representing the service to the client
+ */
+static void
+cancel_stream (struct GNUNET_RPS_Handle *rps_handle)
+{
+ struct GNUNET_MQ_Envelope *ev;
+
+ ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_CANCEL);
+ GNUNET_MQ_send (rps_handle->mq, ev);
+}
+
+
+/**
+ * @brief Cancel a specific request for updates from the biased peer stream
+ *
+ * @param srh The request handle to cancel
+ */
+void
+GNUNET_RPS_stream_cancel (struct GNUNET_RPS_StreamRequestHandle *srh)
+{
+ struct GNUNET_RPS_Handle *rps_handle;
+
+ rps_handle = srh->rps_handle;
+ GNUNET_CONTAINER_DLL_remove (rps_handle->stream_requests_head,
+ rps_handle->stream_requests_tail,
+ srh);
+ GNUNET_free (srh);
+ if (NULL == rps_handle->stream_requests_head) cancel_stream (rps_handle);
+}
+
+
/**
* This function is called, when the service sends another peer from the biased
* stream.
{
struct GNUNET_RPS_Handle *h = cls;
const struct GNUNET_PeerIdentity *peers;
+ /* The following two pointers are used to prevent that new handles are
+ * inserted into the DLL, that is currently iterated over, from within a call
+ * to that handler_cb, are executed and in turn again add themselves to the
+ * iterated DLL infinitely */
+ struct GNUNET_RPS_StreamRequestHandle *srh_head_tmp;
+ struct GNUNET_RPS_StreamRequestHandle *srh_tail_tmp;
+ uint64_t num_peers;
+ uint64_t num_peers_return;
- /* Give the peers back */
+ peers = (struct GNUNET_PeerIdentity *) &msg[1];
+ num_peers = ntohl (msg->num_peers);
LOG (GNUNET_ERROR_TYPE_DEBUG,
- "New peer of %" PRIu64 " biased input stream\n",
- ntohl (msg->num_peers));
+ "Received %" PRIu64 " peer(s) from stream input.\n",
+ num_peers);
+ srh_head_tmp = h->stream_requests_head;
+ srh_tail_tmp = h->stream_requests_tail;
+ h->stream_requests_head = NULL;
+ h->stream_requests_tail = NULL;
+ for (struct GNUNET_RPS_StreamRequestHandle *srh_iter = srh_head_tmp;
+ NULL != srh_iter;
+ srh_iter = srh_iter->next)
+ {
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Calling srh - left: %" PRIu64 "\n",
+ srh_iter->num_peers_left);
+ if (0 == srh_iter->num_peers_left) /* infinite updates */
+ {
+ num_peers_return = num_peers;
+ }
+ else if (num_peers > srh_iter->num_peers_left)
+ {
+ num_peers_return = num_peers - srh_iter->num_peers_left;
+ }
+ else /* num_peers <= srh_iter->num_peers_left */
+ {
+ num_peers_return = srh_iter->num_peers_left - num_peers;
+ }
+ srh_iter->ready_cb (srh_iter->ready_cb_cls,
+ num_peers_return,
+ peers);
+ if (0 == srh_iter->num_peers_left) ;
+ else if (num_peers_return >= srh_iter->num_peers_left)
+ {
+ remove_stream_request (srh_iter,
+ srh_head_tmp,
+ srh_tail_tmp);
+ }
+ else
+ {
+ srh_iter->num_peers_left -= num_peers_return;
+ }
+ }
+ for (struct GNUNET_RPS_StreamRequestHandle *srh_iter = srh_head_tmp;
+ NULL != srh_iter;
+ srh_iter = srh_iter->next)
+ {
+ GNUNET_CONTAINER_DLL_insert (h->stream_requests_head,
+ h->stream_requests_tail,
+ srh_iter);
+ }
- peers = (struct GNUNET_PeerIdentity *) &msg[1];
- GNUNET_assert (NULL != h);
- GNUNET_assert (NULL != h->stream_input_cb);
- h->stream_input_cb (h->stream_input_cls, ntohl (msg->num_peers), peers);
+ if (NULL == h->stream_requests_head)
+ {
+ cancel_stream (h);
+ }
}
}
-/**
- * @brief Create new request handle
- *
- * @param rps_handle Handle to the service
- * @param num_requests Number of requests
- * @param ready_cb Callback
- * @param cls Closure
- *
- * @return The newly created request handle
- */
-static struct GNUNET_RPS_Request_Handle *
-new_request_handle (struct GNUNET_RPS_Handle *rps_handle,
- uint64_t num_requests,
- struct RPS_Sampler *sampler,
- GNUNET_RPS_NotifyReadyCB ready_cb,
- void *cls)
-{
- struct GNUNET_RPS_Request_Handle *rh;
-
- rh = GNUNET_new (struct GNUNET_RPS_Request_Handle);
- rh->rps_handle = rps_handle;
- rh->id = rps_handle->current_request_id++;
- rh->num_requests = num_requests;
- rh->sampler = sampler;
- rh->ready_cb = ready_cb;
- rh->ready_cb_cls = cls;
- GNUNET_CONTAINER_multihashmap32_put (rps_handle->req_handlers, rh->id, rh,
- GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
-
- return rh;
-}
-
-
/**
* Request n random peers.
*
*/
struct GNUNET_RPS_Request_Handle *
GNUNET_RPS_request_peers (struct GNUNET_RPS_Handle *rps_handle,
- uint32_t num_req_peers,
- GNUNET_RPS_NotifyReadyCB ready_cb,
- void *cls)
+ uint32_t num_req_peers,
+ GNUNET_RPS_NotifyReadyCB ready_cb,
+ void *cls)
{
struct GNUNET_RPS_Request_Handle *rh;