From a38275062455b7991c6a8536db38a55135cefff2 Mon Sep 17 00:00:00 2001 From: =?utf8?q?Julius=20B=C3=BCnger?= Date: Wed, 26 Sep 2018 00:22:41 +0200 Subject: [PATCH] Change architecture of RPS service - api --- po/POTFILES.in | 1 + src/include/gnunet_rps_service.h | 2 +- src/rps/Makefile.am | 3 +- src/rps/gnunet-rps-profiler.c | 4 +- src/rps/gnunet-service-rps.c | 61 ++---- src/rps/gnunet-service-rps_sampler.c | 65 +++--- src/rps/rps.h | 11 - src/rps/rps_api.c | 289 ++++++++++++++++++++++----- 8 files changed, 293 insertions(+), 143 deletions(-) diff --git a/po/POTFILES.in b/po/POTFILES.in index 45471731c..44bd751e4 100644 --- a/po/POTFILES.in +++ b/po/POTFILES.in @@ -329,6 +329,7 @@ src/rps/gnunet-service-rps_sampler.c src/rps/gnunet-service-rps_sampler_elem.c src/rps/gnunet-service-rps_view.c src/rps/rps_api.c +src/rps/rps_test_lib.c src/rps/rps-test_util.c src/scalarproduct/gnunet-scalarproduct.c src/scalarproduct/gnunet-service-scalarproduct_alice.c diff --git a/src/include/gnunet_rps_service.h b/src/include/gnunet_rps_service.h index 22e944d0f..f77c3dbc4 100644 --- a/src/include/gnunet_rps_service.h +++ b/src/include/gnunet_rps_service.h @@ -162,7 +162,7 @@ GNUNET_RPS_view_request (struct GNUNET_RPS_Handle *rps_handle, * @param cls a closure that will be given to the callback * @param ready_cb the callback called when the peers are available */ -void +struct GNUNET_RPS_StreamRequestHandle * GNUNET_RPS_stream_request (struct GNUNET_RPS_Handle *rps_handle, uint32_t num_updates, GNUNET_RPS_NotifyReadyCB stream_input_cb, diff --git a/src/rps/Makefile.am b/src/rps/Makefile.am index 2ed93ef7c..5e9fd09fa 100644 --- a/src/rps/Makefile.am +++ b/src/rps/Makefile.am @@ -90,7 +90,8 @@ endif rps_test_src = \ test_rps.c \ rps-test_util.h rps-test_util.c \ - gnunet-service-rps_sampler_elem.h gnunet-service-rps_sampler_elem.c + gnunet-service-rps_sampler_elem.h gnunet-service-rps_sampler_elem.c \ + gnunet-service-rps_sampler.h gnunet-service-rps_sampler.c ld_rps_test_lib = \ libgnunetrps.la \ diff --git a/src/rps/gnunet-rps-profiler.c b/src/rps/gnunet-rps-profiler.c index d2640225a..f2a8083e7 100644 --- a/src/rps/gnunet-rps-profiler.c +++ b/src/rps/gnunet-rps-profiler.c @@ -1932,8 +1932,8 @@ static uint32_t binom (uint32_t n, uint32_t k) { //GNUNET_assert (n >= k); if (k > n) return 0; - if (0 > n) return 0; /* just for clarity - always false */ - if (0 > k) return 0; /* just for clarity - always false */ + /* if (0 > n) return 0; - always false */ + /* if (0 > k) return 0; - always false */ if (0 == k) return 1; return fac (n) / diff --git a/src/rps/gnunet-service-rps.c b/src/rps/gnunet-service-rps.c index 40f576d3e..862514264 100644 --- a/src/rps/gnunet-service-rps.c +++ b/src/rps/gnunet-service-rps.c @@ -47,8 +47,6 @@ // TODO connect to friends -// TODO store peers somewhere persistent - // TODO blacklist? (-> mal peer detection on top of brahms) // hist_size_init, hist_size_max @@ -978,7 +976,6 @@ destroy_peer (struct PeerContext *peer_ctx) peer_ctx->liveliness_check_pending, sizeof (struct PendingMessage))) ) { - // TODO this may leak memory peer_ctx->liveliness_check_pending = NULL; GNUNET_STATISTICS_update (stats, "# pending liveliness checks", @@ -1620,10 +1617,6 @@ check_sending_channel_exists (const struct GNUNET_PeerIdentity *peer) * @brief Destroy the send channel of a peer e.g. stop indicating a sending * intention to another peer * - * If there is also no channel to receive messages from that peer, remove it - * from the peermap. - * TODO really? - * * @peer the peer identity of the peer whose sending channel to destroy * @return #GNUNET_YES if channel was destroyed * #GNUNET_NO otherwise @@ -1777,10 +1770,10 @@ struct ClientContext int64_t view_updates_left; /** - * @brief How many peers from the biased - * stream this client expects to receive. + * @brief Whether this client wants to receive stream updates. + * Either #GNUNET_YES or #GNUNET_NO */ - int64_t stream_peers_left; + int8_t stream_update; /** * The client handle to send the reply to @@ -2232,9 +2225,9 @@ send_view (const struct ClientContext *cli_ctx, * @param view_size the size of the view array (can be 0) */ void -send_stream_peer (const struct ClientContext *cli_ctx, - uint64_t num_peers, - const struct GNUNET_PeerIdentity *peers) +send_stream_peers (const struct ClientContext *cli_ctx, + uint64_t num_peers, + const struct GNUNET_PeerIdentity *peers) { struct GNUNET_MQ_Envelope *ev; struct GNUNET_RPS_CS_DEBUG_StreamReply *out_msg; @@ -2275,7 +2268,7 @@ clients_notify_view_update (void) for (cli_ctx_iter = cli_ctx_head; NULL != cli_ctx_iter; - cli_ctx_iter = cli_ctx_head->next) + cli_ctx_iter = cli_ctx_iter->next) { if (1 < cli_ctx_iter->view_updates_left) { @@ -2306,7 +2299,6 @@ clients_notify_stream_peer (uint64_t num_peers, // TODO enum StreamPeerSource) { struct ClientContext *cli_ctx_iter; - uint64_t num_peers_send; LOG (GNUNET_ERROR_TYPE_DEBUG, "Got peer (%s) from biased stream - update all clients\n", @@ -2314,32 +2306,12 @@ clients_notify_stream_peer (uint64_t num_peers, for (cli_ctx_iter = cli_ctx_head; NULL != cli_ctx_iter; - cli_ctx_iter = cli_ctx_head->next) + cli_ctx_iter = cli_ctx_iter->next) { - if (0 < cli_ctx_iter->stream_peers_left) - { - /* Client wants to receive limited amount of updates */ - if (num_peers > cli_ctx_iter->stream_peers_left) - { - num_peers_send = num_peers - cli_ctx_iter->stream_peers_left; - cli_ctx_iter->stream_peers_left = 0; - } - else - { - num_peers_send = cli_ctx_iter->stream_peers_left - num_peers; - cli_ctx_iter->stream_peers_left -= num_peers_send; - } - } else if (0 > cli_ctx_iter->stream_peers_left) - { - /* Client is not interested in updates */ - continue; - } else /* _updates_left == 0 - infinite amount of updates */ + if (GNUNET_YES == cli_ctx_iter->stream_update) { - num_peers_send = num_peers; + send_stream_peers (cli_ctx_iter, num_peers, peers); } - - /* send view */ - send_stream_peer (cli_ctx_iter, num_peers_send, peers); } } @@ -3076,16 +3048,13 @@ handle_client_stream_request (void *cls, const struct GNUNET_RPS_CS_DEBUG_StreamRequest *msg) { struct ClientContext *cli_ctx = cls; - uint64_t num_peers; - - num_peers = ntohl (msg->num_peers); + (void) msg; LOG (GNUNET_ERROR_TYPE_DEBUG, - "Client requested %" PRIu64 " peers from biased stream.\n", - num_peers); + "Client requested peers from biased stream.\n"); + cli_ctx->stream_update = GNUNET_YES; GNUNET_assert (NULL != cli_ctx); - cli_ctx->stream_peers_left = num_peers; GNUNET_SERVICE_client_continue (cli_ctx->client); } @@ -3882,7 +3851,8 @@ do_round (void *cls) if (GNUNET_OK == inserted) { clients_notify_stream_peer (1, - CustomPeerMap_get_peer_by_index (push_map, permut[i])); + CustomPeerMap_get_peer_by_index (pull_map, + permut[i - first_border])); } to_file (file_name_view_log, "+%s\t(pull list)", @@ -4195,6 +4165,7 @@ client_connect_cb (void *cls, cli_ctx = GNUNET_new (struct ClientContext); cli_ctx->mq = mq; cli_ctx->view_updates_left = -1; + cli_ctx->stream_update = GNUNET_NO; cli_ctx->client = client; GNUNET_CONTAINER_DLL_insert (cli_ctx_head, cli_ctx_tail, diff --git a/src/rps/gnunet-service-rps_sampler.c b/src/rps/gnunet-service-rps_sampler.c index ff4bc9e42..2cd4cb996 100644 --- a/src/rps/gnunet-service-rps_sampler.c +++ b/src/rps/gnunet-service-rps_sampler.c @@ -313,26 +313,9 @@ sampler_notify_on_update (struct RPS_Sampler *sampler, notify_ctx = GNUNET_new (struct SamplerNotifyUpdateCTX); notify_ctx->notify_cb = notify_cb; notify_ctx->cls = cls; - if (NULL != sampler->notify_ctx_head) - { - for (struct SamplerNotifyUpdateCTX *notify_iter = sampler->notify_ctx_head; - NULL != notify_iter->next; - notify_iter = notify_iter->next) - { - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Pre: Context\n"); - } - } GNUNET_CONTAINER_DLL_insert (sampler->notify_ctx_head, sampler->notify_ctx_tail, notify_ctx); - for (struct SamplerNotifyUpdateCTX *notify_iter = sampler->notify_ctx_head; - NULL != notify_iter; - notify_iter = notify_iter->next) - { - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Post: Context\n"); - } return notify_ctx; } @@ -559,27 +542,21 @@ RPS_sampler_mod_init (size_t init_size, /** - * Update every sampler element of this sampler with given peer + * @brief Notify about update of the sampler. * - * @param sampler the sampler to update. - * @param id the PeerID that is put in the sampler + * Call the callbacks that are waiting for notification on updates to the + * sampler. + * + * @param sampler The sampler the updates are waiting for */ - void -RPS_sampler_update (struct RPS_Sampler *sampler, - const struct GNUNET_PeerIdentity *id) +static void +notify_update (struct RPS_Sampler *sampler) { struct SamplerNotifyUpdateCTX *tmp_notify_head; struct SamplerNotifyUpdateCTX *tmp_notify_tail; - to_file (sampler->file_name, - "Got %s", - GNUNET_i2s_full (id)); - - for (uint32_t i = 0; i < sampler->sampler_size; i++) - { - RPS_sampler_elem_next (sampler->sampler_elements[i], - id); - } + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Calling callbacks waiting for update notification.\n"); tmp_notify_head = sampler->notify_ctx_head; tmp_notify_tail = sampler->notify_ctx_tail; sampler->notify_ctx_head = NULL; @@ -598,6 +575,29 @@ RPS_sampler_update (struct RPS_Sampler *sampler, } +/** + * Update every sampler element of this sampler with given peer + * + * @param sampler the sampler to update. + * @param id the PeerID that is put in the sampler + */ + void +RPS_sampler_update (struct RPS_Sampler *sampler, + const struct GNUNET_PeerIdentity *id) +{ + to_file (sampler->file_name, + "Got %s", + GNUNET_i2s_full (id)); + + for (uint32_t i = 0; i < sampler->sampler_size; i++) + { + RPS_sampler_elem_next (sampler->sampler_elements[i], + id); + } + notify_update (sampler); +} + + /** * Reinitialise all previously initialised sampler elements with the given value. * @@ -714,6 +714,7 @@ sampler_mod_get_rand_peer (void *cls) /* Check whether we may use this sampler to give it back to the client */ if (GNUNET_TIME_UNIT_FOREVER_ABS.abs_value_us != s_elem->last_client_request.abs_value_us) { + // TODO remove this condition at least for the client sampler last_request_diff = GNUNET_TIME_absolute_get_difference (s_elem->last_client_request, GNUNET_TIME_absolute_get ()); diff --git a/src/rps/rps.h b/src/rps/rps.h index 26615bfc5..9e4487f88 100644 --- a/src/rps/rps.h +++ b/src/rps/rps.h @@ -226,12 +226,6 @@ struct GNUNET_RPS_CS_DEBUG_StreamRequest * Header including size and type in NBO */ struct GNUNET_MessageHeader header; - - /** - * Number of peers - * 0 for sending updates until cancellation - */ - uint32_t num_peers GNUNET_PACKED; }; /** @@ -244,11 +238,6 @@ struct GNUNET_RPS_CS_DEBUG_StreamReply */ struct GNUNET_MessageHeader header; - /** - * Identifyer of the message. - */ - uint32_t id GNUNET_PACKED; - /** * Number of peers */ diff --git a/src/rps/rps_api.c b/src/rps/rps_api.c index a558c8a35..e4f4db506 100644 --- a/src/rps/rps_api.c +++ b/src/rps/rps_api.c @@ -31,6 +31,43 @@ #define LOG(kind,...) GNUNET_log_from (kind, "rps-api",__VA_ARGS__) +/** + * Handle for a request to get peers from biased stream of ids + */ +struct GNUNET_RPS_StreamRequestHandle +{ + /** + * The client issuing the request. + */ + struct GNUNET_RPS_Handle *rps_handle; + + /** + * The number of requested peers. + */ + uint32_t num_peers_left; + + /** + * The callback to be called when we receive an answer. + */ + GNUNET_RPS_NotifyReadyCB ready_cb; + + /** + * The closure for the callback. + */ + void *ready_cb_cls; + + /** + * @brief Next element of the DLL + */ + struct GNUNET_RPS_StreamRequestHandle *next; + + /** + * @brief Previous element of the DLL + */ + struct GNUNET_RPS_StreamRequestHandle *prev; +}; + + /** * Handler to handle requests from a client. */ @@ -67,14 +104,19 @@ struct GNUNET_RPS_Handle void *view_update_cls; /** - * @brief Callback called on each peer of the biased input stream + * @brief Closure to each requested peer from the biased stream + */ + void *stream_input_cls; + + /** + * @brief Head of the DLL of stream requests */ - GNUNET_RPS_NotifyReadyCB stream_input_cb; + struct GNUNET_RPS_StreamRequestHandle *stream_requests_head; /** - * @brief Closure to each requested peer from the biased stream + * @brief Tail of the DLL of stream requests */ - void *stream_input_cls; + struct GNUNET_RPS_StreamRequestHandle *stream_requests_tail; }; @@ -138,6 +180,91 @@ struct cb_cls_pack }; +/** + * @brief Create a new handle for a stream request + * + * @param rps_handle The rps handle + * @param num_peers The number of desired peers + * @param ready_cb The callback to be called, once all peers are ready + * @param cls The colsure to provide to the callback + * + * @return The handle to the stream request + */ +static struct GNUNET_RPS_StreamRequestHandle * +new_stream_request (struct GNUNET_RPS_Handle *rps_handle, + uint64_t num_peers, + GNUNET_RPS_NotifyReadyCB ready_cb, + void *cls) +{ + struct GNUNET_RPS_StreamRequestHandle *srh; + + srh = GNUNET_new (struct GNUNET_RPS_StreamRequestHandle); + + srh->rps_handle = rps_handle; + srh->num_peers_left = num_peers; + srh->ready_cb = ready_cb; + srh->ready_cb_cls = cls; + GNUNET_CONTAINER_DLL_insert (rps_handle->stream_requests_head, + rps_handle->stream_requests_tail, + srh); + + return srh; +} + + +/** + * @brief Remove the given stream request from the list of requests and memory + * + * @param srh The request to be removed + * @param srh_head Head of the DLL to remove request from + * @param srh_tail Tail of the DLL to remove request from + */ +static void +remove_stream_request (struct GNUNET_RPS_StreamRequestHandle *srh, + struct GNUNET_RPS_StreamRequestHandle *srh_head, + struct GNUNET_RPS_StreamRequestHandle *srh_tail) +{ + GNUNET_CONTAINER_DLL_remove (srh_head, + srh_tail, + srh); + + GNUNET_free (srh); +} + + +/** + * @brief Create new request handle + * + * @param rps_handle Handle to the service + * @param num_requests Number of requests + * @param ready_cb Callback + * @param cls Closure + * + * @return The newly created request handle + */ +static struct GNUNET_RPS_Request_Handle * +new_request_handle (struct GNUNET_RPS_Handle *rps_handle, + uint64_t num_requests, + struct RPS_Sampler *sampler, + GNUNET_RPS_NotifyReadyCB ready_cb, + void *cls) +{ + struct GNUNET_RPS_Request_Handle *rh; + + rh = GNUNET_new (struct GNUNET_RPS_Request_Handle); + rh->rps_handle = rps_handle; + rh->id = rps_handle->current_request_id++; + rh->num_requests = num_requests; + rh->sampler = sampler; + rh->ready_cb = ready_cb; + rh->ready_cb_cls = cls; + GNUNET_CONTAINER_multihashmap32_put (rps_handle->req_handlers, rh->id, rh, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST); + + return rh; +} + + /** * @brief Send a request to the service. * @@ -304,24 +431,27 @@ GNUNET_RPS_view_request (struct GNUNET_RPS_Handle *rps_handle, * @param cls a closure that will be given to the callback * @param ready_cb the callback called when the peers are available */ -void +struct GNUNET_RPS_StreamRequestHandle * GNUNET_RPS_stream_request (struct GNUNET_RPS_Handle *rps_handle, uint32_t num_peers, GNUNET_RPS_NotifyReadyCB stream_input_cb, void *cls) { + struct GNUNET_RPS_StreamRequestHandle *srh; struct GNUNET_MQ_Envelope *ev; struct GNUNET_RPS_CS_DEBUG_StreamRequest *msg; + srh = new_stream_request (rps_handle, + num_peers, /* num requests */ + stream_input_cb, + cls); LOG (GNUNET_ERROR_TYPE_DEBUG, "Client requests %" PRIu32 " biased stream updates\n", num_peers); - rps_handle->stream_input_cb = stream_input_cb; - rps_handle->stream_input_cls = cls; ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_REQUEST); - msg->num_peers = htonl (num_peers); GNUNET_MQ_send (rps_handle->mq, ev); + return srh; } @@ -378,6 +508,41 @@ handle_view_update (void *cls, } +/** + * @brief Send message to service that this client does not want to receive + * further updates from the biased peer stream + * + * @param rps_handle The handle representing the service to the client + */ +static void +cancel_stream (struct GNUNET_RPS_Handle *rps_handle) +{ + struct GNUNET_MQ_Envelope *ev; + + ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_CANCEL); + GNUNET_MQ_send (rps_handle->mq, ev); +} + + +/** + * @brief Cancel a specific request for updates from the biased peer stream + * + * @param srh The request handle to cancel + */ +void +GNUNET_RPS_stream_cancel (struct GNUNET_RPS_StreamRequestHandle *srh) +{ + struct GNUNET_RPS_Handle *rps_handle; + + rps_handle = srh->rps_handle; + GNUNET_CONTAINER_DLL_remove (rps_handle->stream_requests_head, + rps_handle->stream_requests_tail, + srh); + GNUNET_free (srh); + if (NULL == rps_handle->stream_requests_head) cancel_stream (rps_handle); +} + + /** * This function is called, when the service sends another peer from the biased * stream. @@ -420,16 +585,71 @@ handle_stream_input (void *cls, { struct GNUNET_RPS_Handle *h = cls; const struct GNUNET_PeerIdentity *peers; + /* The following two pointers are used to prevent that new handles are + * inserted into the DLL, that is currently iterated over, from within a call + * to that handler_cb, are executed and in turn again add themselves to the + * iterated DLL infinitely */ + struct GNUNET_RPS_StreamRequestHandle *srh_head_tmp; + struct GNUNET_RPS_StreamRequestHandle *srh_tail_tmp; + uint64_t num_peers; + uint64_t num_peers_return; - /* Give the peers back */ + peers = (struct GNUNET_PeerIdentity *) &msg[1]; + num_peers = ntohl (msg->num_peers); LOG (GNUNET_ERROR_TYPE_DEBUG, - "New peer of %" PRIu64 " biased input stream\n", - ntohl (msg->num_peers)); + "Received %" PRIu64 " peer(s) from stream input.\n", + num_peers); + srh_head_tmp = h->stream_requests_head; + srh_tail_tmp = h->stream_requests_tail; + h->stream_requests_head = NULL; + h->stream_requests_tail = NULL; + for (struct GNUNET_RPS_StreamRequestHandle *srh_iter = srh_head_tmp; + NULL != srh_iter; + srh_iter = srh_iter->next) + { + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Calling srh - left: %" PRIu64 "\n", + srh_iter->num_peers_left); + if (0 == srh_iter->num_peers_left) /* infinite updates */ + { + num_peers_return = num_peers; + } + else if (num_peers > srh_iter->num_peers_left) + { + num_peers_return = num_peers - srh_iter->num_peers_left; + } + else /* num_peers <= srh_iter->num_peers_left */ + { + num_peers_return = srh_iter->num_peers_left - num_peers; + } + srh_iter->ready_cb (srh_iter->ready_cb_cls, + num_peers_return, + peers); + if (0 == srh_iter->num_peers_left) ; + else if (num_peers_return >= srh_iter->num_peers_left) + { + remove_stream_request (srh_iter, + srh_head_tmp, + srh_tail_tmp); + } + else + { + srh_iter->num_peers_left -= num_peers_return; + } + } + for (struct GNUNET_RPS_StreamRequestHandle *srh_iter = srh_head_tmp; + NULL != srh_iter; + srh_iter = srh_iter->next) + { + GNUNET_CONTAINER_DLL_insert (h->stream_requests_head, + h->stream_requests_tail, + srh_iter); + } - peers = (struct GNUNET_PeerIdentity *) &msg[1]; - GNUNET_assert (NULL != h); - GNUNET_assert (NULL != h->stream_input_cb); - h->stream_input_cb (h->stream_input_cls, ntohl (msg->num_peers), peers); + if (NULL == h->stream_requests_head) + { + cancel_stream (h); + } } @@ -524,39 +744,6 @@ GNUNET_RPS_connect (const struct GNUNET_CONFIGURATION_Handle *cfg) } -/** - * @brief Create new request handle - * - * @param rps_handle Handle to the service - * @param num_requests Number of requests - * @param ready_cb Callback - * @param cls Closure - * - * @return The newly created request handle - */ -static struct GNUNET_RPS_Request_Handle * -new_request_handle (struct GNUNET_RPS_Handle *rps_handle, - uint64_t num_requests, - struct RPS_Sampler *sampler, - GNUNET_RPS_NotifyReadyCB ready_cb, - void *cls) -{ - struct GNUNET_RPS_Request_Handle *rh; - - rh = GNUNET_new (struct GNUNET_RPS_Request_Handle); - rh->rps_handle = rps_handle; - rh->id = rps_handle->current_request_id++; - rh->num_requests = num_requests; - rh->sampler = sampler; - rh->ready_cb = ready_cb; - rh->ready_cb_cls = cls; - GNUNET_CONTAINER_multihashmap32_put (rps_handle->req_handlers, rh->id, rh, - GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST); - - return rh; -} - - /** * Request n random peers. * @@ -646,9 +833,9 @@ peers_ready_cb (const struct GNUNET_PeerIdentity *peers, */ struct GNUNET_RPS_Request_Handle * GNUNET_RPS_request_peers (struct GNUNET_RPS_Handle *rps_handle, - uint32_t num_req_peers, - GNUNET_RPS_NotifyReadyCB ready_cb, - void *cls) + uint32_t num_req_peers, + GNUNET_RPS_NotifyReadyCB ready_cb, + void *cls) { struct GNUNET_RPS_Request_Handle *rh; -- 2.25.1