X-Git-Url: https://git.librecmc.org/?a=blobdiff_plain;f=src%2Frps%2Frps_api.c;h=7d0674aff12d10ccb2240368bd93d6184421c29f;hb=7a5a724a6f96baf80d2226326124aa01c58ad3fe;hp=e4f4db506a1d02044112263bd5501ecb1a572518;hpb=a38275062455b7991c6a8536db38a55135cefff2;p=oweals%2Fgnunet.git diff --git a/src/rps/rps_api.c b/src/rps/rps_api.c index e4f4db506..7d0674aff 100644 --- a/src/rps/rps_api.c +++ b/src/rps/rps_api.c @@ -25,7 +25,7 @@ #include "gnunet_util_lib.h" #include "rps.h" #include "gnunet_rps_service.h" -#include "gnunet-service-rps_sampler.h" +#include "rps-sampler_client.h" #include @@ -83,16 +83,6 @@ struct GNUNET_RPS_Handle */ struct GNUNET_MQ_Handle *mq; - /** - * Array of Request_Handles. - */ - struct GNUNET_CONTAINER_MultiHashMap32 *req_handlers; - - /** - * The id of the last request. - */ - uint32_t current_request_id; - /** * @brief Callback called on each update of the view */ @@ -130,11 +120,6 @@ struct GNUNET_RPS_Request_Handle */ struct GNUNET_RPS_Handle *rps_handle; - /** - * The id of the request. - */ - uint32_t id; - /** * The number of requested peers. */ @@ -145,6 +130,17 @@ struct GNUNET_RPS_Request_Handle */ struct RPS_Sampler *sampler; + /** + * @brief Request handle of the request to the sampler - needed to cancel the request + */ + struct RPS_SamplerRequestHandle *sampler_rh; + + /** + * @brief Request handle of the request of the biased stream of peers - + * needed to cancel the request + */ + struct GNUNET_RPS_StreamRequestHandle *srh; + /** * The callback to be called when we receive an answer. */ @@ -233,160 +229,86 @@ remove_stream_request (struct GNUNET_RPS_StreamRequestHandle *srh, /** - * @brief Create new request handle - * - * @param rps_handle Handle to the service - * @param num_requests Number of requests - * @param ready_cb Callback - * @param cls Closure + * @brief Called once the sampler has collected all requested peers. * - * @return The newly created request handle - */ -static struct GNUNET_RPS_Request_Handle * -new_request_handle (struct GNUNET_RPS_Handle *rps_handle, - uint64_t num_requests, - struct RPS_Sampler *sampler, - GNUNET_RPS_NotifyReadyCB ready_cb, - void *cls) -{ - struct GNUNET_RPS_Request_Handle *rh; - - rh = GNUNET_new (struct GNUNET_RPS_Request_Handle); - rh->rps_handle = rps_handle; - rh->id = rps_handle->current_request_id++; - rh->num_requests = num_requests; - rh->sampler = sampler; - rh->ready_cb = ready_cb; - rh->ready_cb_cls = cls; - GNUNET_CONTAINER_multihashmap32_put (rps_handle->req_handlers, rh->id, rh, - GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST); - - return rh; -} - - -/** - * @brief Send a request to the service. + * Calls the callback provided by the client with the corresponding cls. * - * @param h rps handle - * @param id id of the request - * @param num_req_peers number of peers + * @param peers The array of @a num_peers that has been returned. + * @param num_peers The number of peers that have been returned + * @param cls The #GNUNET_RPS_Request_Handle */ void -send_request (const struct GNUNET_RPS_Handle *h, - uint32_t id, - uint32_t num_req_peers) +peers_ready_cb (const struct GNUNET_PeerIdentity *peers, + uint32_t num_peers, + void *cls) { - struct GNUNET_MQ_Envelope *ev; - struct GNUNET_RPS_CS_RequestMessage *msg; + struct GNUNET_RPS_Request_Handle *rh = cls; - ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_RPS_CS_REQUEST); - msg->num_peers = htonl (num_req_peers); - msg->id = htonl (id); - GNUNET_MQ_send (h->mq, ev); + rh->ready_cb (rh->ready_cb_cls, + num_peers, + peers); + // TODO cleanup, sampler, rh, cancel stuff + // TODO screw this function. We can give the cb,cls directly to the sampler. } -/** - * @brief Iterator function over pending requests - * - * Implements #GNUNET_CONTAINER_HashMapIterator32 - * - * @param cls rps handle - * @param key id of the request - * @param value request handle - * - * @return GNUNET_YES to continue iteration - */ -int -resend_requests_iterator (void *cls, uint32_t key, void *value) -{ - const struct GNUNET_RPS_Handle *h = cls; - const struct GNUNET_RPS_Request_Handle *req_handle = value; - (void) key; - - send_request (h, req_handle->id, req_handle->num_requests); - return GNUNET_YES; /* continue iterating */ -} /** - * @brief Resend all pending requests - * - * This is used to resend all pending requests after the client - * reconnected to the service, because the service cancels all - * pending requests after reconnection. + * @brief Callback to collect the peers from the biased stream and put those + * into the sampler. * - * @param h rps handle + * @param cls The #GNUNET_RPS_Request_Handle + * @param num_peers The number of peer that have been returned + * @param peers The array of @a num_peers that have been returned */ void -resend_requests (struct GNUNET_RPS_Handle *h) -{ - GNUNET_CONTAINER_multihashmap32_iterate (h->req_handlers, - resend_requests_iterator, - h); -} - - -/** - * This function is called, when the service replies to our request. - * It verifies that @a msg is well-formed. - * - * @param cls the closure - * @param msg the message - * @return #GNUNET_OK if @a msg is well-formed - */ -static int -check_reply (void *cls, - const struct GNUNET_RPS_CS_ReplyMessage *msg) +collect_peers_cb (void *cls, + uint64_t num_peers, + const struct GNUNET_PeerIdentity *peers) { - uint16_t msize = ntohs (msg->header.size); - uint32_t num_peers = ntohl (msg->num_peers); - (void) cls; + struct GNUNET_RPS_Request_Handle *rh = cls; - msize -= sizeof (struct GNUNET_RPS_CS_ReplyMessage); - if ( (msize / sizeof (struct GNUNET_PeerIdentity) != num_peers) || - (msize % sizeof (struct GNUNET_PeerIdentity) != 0) ) + for (uint64_t i = 0; i < num_peers; i++) { - GNUNET_break (0); - return GNUNET_SYSERR; + RPS_sampler_update (rh->sampler, &peers[i]); } - return GNUNET_OK; } /** - * This function is called, when the service replies to our request. - * It calls the callback the caller gave us with the provided closure - * and disconnects afterwards. + * @brief Create new request handle * - * @param cls the closure - * @param msg the message + * @param rps_handle Handle to the service + * @param num_requests Number of requests + * @param ready_cb Callback + * @param cls Closure + * + * @return The newly created request handle */ -static void -handle_reply (void *cls, - const struct GNUNET_RPS_CS_ReplyMessage *msg) +static struct GNUNET_RPS_Request_Handle * +new_request_handle (struct GNUNET_RPS_Handle *rps_handle, + uint64_t num_requests, + GNUNET_RPS_NotifyReadyCB ready_cb, + void *cls) { - struct GNUNET_RPS_Handle *h = cls; - struct GNUNET_PeerIdentity *peers; struct GNUNET_RPS_Request_Handle *rh; - uint32_t id; - /* Give the peers back */ - id = ntohl (msg->id); - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Service replied with %" PRIu32 " peers for id %" PRIu32 "\n", - ntohl (msg->num_peers), - id); + rh = GNUNET_new (struct GNUNET_RPS_Request_Handle); + rh->rps_handle = rps_handle; + rh->num_requests = num_requests; + rh->sampler = RPS_sampler_mod_init (num_requests, + GNUNET_TIME_UNIT_SECONDS); // TODO remove this time-stuff + rh->sampler_rh = RPS_sampler_get_n_rand_peers (rh->sampler, + num_requests, + peers_ready_cb, + rh); + rh->srh = GNUNET_RPS_stream_request (rps_handle, + 0, /* infinite updates */ + collect_peers_cb, + rh); /* cls */ + rh->ready_cb = ready_cb; + rh->ready_cb_cls = cls; - peers = (struct GNUNET_PeerIdentity *) &msg[1]; - GNUNET_assert (GNUNET_YES == - GNUNET_CONTAINER_multihashmap32_contains (h->req_handlers, id)); - rh = GNUNET_CONTAINER_multihashmap32_get (h->req_handlers, id); - GNUNET_assert (NULL != rh); - GNUNET_assert (rh->num_requests == ntohl (msg->num_peers)); - GNUNET_CONTAINER_multihashmap32_remove_all (h->req_handlers, id); - rh->ready_cb (rh->ready_cb_cls, - ntohl (msg->num_peers), - peers); + return rh; } @@ -422,6 +344,20 @@ GNUNET_RPS_view_request (struct GNUNET_RPS_Handle *rps_handle, } +void +GNUNET_RPS_view_request_cancel (struct GNUNET_RPS_Handle *rps_handle) +{ + struct GNUNET_MQ_Envelope *ev; + + GNUNET_assert (NULL != rps_handle->view_update_cb); + + rps_handle->view_update_cb = NULL; + + ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_VIEW_CANCEL); + GNUNET_MQ_send (rps_handle->mq, ev); +} + + /** * Request biased stream of peers that are being put into the sampler * @@ -683,7 +619,6 @@ mq_error_handler (void *cls, reconnect (h); /* Resend all pending request as the service destroyed its knowledge * about them */ - resend_requests (h); } @@ -694,10 +629,6 @@ static void reconnect (struct GNUNET_RPS_Handle *h) { struct GNUNET_MQ_MessageHandler mq_handlers[] = { - GNUNET_MQ_hd_var_size (reply, - GNUNET_MESSAGE_TYPE_RPS_CS_REPLY, - struct GNUNET_RPS_CS_ReplyMessage, - h), GNUNET_MQ_hd_var_size (view_update, GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_VIEW_REPLY, struct GNUNET_RPS_CS_DEBUG_ViewReply, @@ -731,7 +662,6 @@ GNUNET_RPS_connect (const struct GNUNET_CONFIGURATION_Handle *cfg) struct GNUNET_RPS_Handle *h; h = GNUNET_new (struct GNUNET_RPS_Handle); - h->current_request_id = 0; h->cfg = cfg; reconnect (h); if (NULL == h->mq) @@ -739,89 +669,10 @@ GNUNET_RPS_connect (const struct GNUNET_CONFIGURATION_Handle *cfg) GNUNET_free (h); return NULL; } - h->req_handlers = GNUNET_CONTAINER_multihashmap32_create (4); return h; } -/** - * Request n random peers. - * - * @param rps_handle handle to the rps service - * @param num_req_peers number of peers we want to receive - * @param ready_cb the callback called when the peers are available - * @param cls closure given to the callback - * @return a handle to cancel this request - */ -struct GNUNET_RPS_Request_Handle * -GNUNET_RPS_request_peers_2 (struct GNUNET_RPS_Handle *rps_handle, - uint32_t num_req_peers, - GNUNET_RPS_NotifyReadyCB ready_cb, - void *cls) -{ - struct GNUNET_RPS_Request_Handle *rh; - - rh = new_request_handle (rps_handle, - num_req_peers, - NULL, /* no sampler needed */ - ready_cb, - cls); - - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Requesting %" PRIu32 " peers with id %" PRIu32 "\n", - num_req_peers, - rh->id); - - send_request (rps_handle, rh->id, num_req_peers); - return rh; -} - - -/** - * @brief Callback to collect the peers from the biased stream and put those - * into the sampler. - * - * @param cls The #GNUNET_RPS_Request_Handle - * @param num_peers The number of peer that have been returned - * @param peers The array of @a num_peers that have been returned - */ -void -collect_peers_cb (void *cls, - uint64_t num_peers, - const struct GNUNET_PeerIdentity *peers) -{ - struct GNUNET_RPS_Request_Handle *rh = cls; - - for (uint64_t i = 0; i < num_peers; i++) - { - RPS_sampler_update (rh->sampler, &peers[i]); - } -} - - -/** - * @brief Called once the sampler has collected all requested peers. - * - * Calls the callback provided by the client with the corresponding cls. - * - * @param peers The array of @a num_peers that has been returned. - * @param num_peers The number of peers that have been returned - * @param cls The #GNUNET_RPS_Request_Handle - */ -void -peers_ready_cb (const struct GNUNET_PeerIdentity *peers, - uint32_t num_peers, - void *cls) -{ - struct GNUNET_RPS_Request_Handle *rh = cls; - - rh->ready_cb (rh->ready_cb_cls, - num_peers, - peers); - // TODO cleanup, sampler, rh, cancel stuff - // TODO screw this function. We can give the cb,cls directly to the sampler. -} - /** * Request n random peers. * @@ -841,19 +692,9 @@ GNUNET_RPS_request_peers (struct GNUNET_RPS_Handle *rps_handle, rh = new_request_handle (rps_handle, num_req_peers, - RPS_sampler_mod_init (num_req_peers, - GNUNET_TIME_UNIT_SECONDS), // TODO remove this time-stuff ready_cb, cls); - RPS_sampler_get_n_rand_peers (rh->sampler, - num_req_peers, - peers_ready_cb, - rh); - GNUNET_RPS_stream_request (rps_handle, - 0, /* infinite updates */ - collect_peers_cb, - rh); /* cls */ return rh; } @@ -1022,20 +863,21 @@ void GNUNET_RPS_request_cancel (struct GNUNET_RPS_Request_Handle *rh) { struct GNUNET_RPS_Handle *h; - struct GNUNET_MQ_Envelope *ev; - struct GNUNET_RPS_CS_RequestCancelMessage*msg; - - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Cancelling request with id %" PRIu32 "\n", - rh->id); h = rh->rps_handle; - GNUNET_assert (GNUNET_CONTAINER_multihashmap32_contains (h->req_handlers, - rh->id)); - GNUNET_CONTAINER_multihashmap32_remove_all (h->req_handlers, rh->id); - ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_RPS_CS_REQUEST_CANCEL); - msg->id = htonl (rh->id); - GNUNET_MQ_send (rh->rps_handle->mq, ev); + if (NULL != rh->srh) + { + remove_stream_request (rh->srh, + h->stream_requests_head, + h->stream_requests_tail); + } + if (NULL == h->stream_requests_head) cancel_stream(h); + if (NULL != rh->sampler_rh) + { + RPS_sampler_request_cancel (rh->sampler_rh); + } + RPS_sampler_destroy (rh->sampler); + GNUNET_free (rh); } @@ -1048,10 +890,16 @@ void GNUNET_RPS_disconnect (struct GNUNET_RPS_Handle *h) { GNUNET_MQ_destroy (h->mq); - if (0 < GNUNET_CONTAINER_multihashmap32_size (h->req_handlers)) + if (NULL != h->stream_requests_head) + { + LOG (GNUNET_ERROR_TYPE_WARNING, + "Still waiting for replies\n"); + } + if (NULL != h->view_update_cb) + { LOG (GNUNET_ERROR_TYPE_WARNING, - "Still waiting for requests\n"); - GNUNET_CONTAINER_multihashmap32_destroy (h->req_handlers); + "Still waiting for view updates\n"); + } GNUNET_free (h); }