X-Git-Url: https://git.librecmc.org/?a=blobdiff_plain;f=src%2Frps%2Fgnunet-service-rps_sampler.c;h=b65dd7c47f3cdbaca29d4cccf78f9f67feab55f9;hb=1a3f487d25259afe2dacd784c7e088a307113b62;hp=d182894c00e434ea3322e7922b53195df8a95b53;hpb=8215376b2d1b4a3d95a0cf1ba474cf4be437c1b0;p=oweals%2Fgnunet.git diff --git a/src/rps/gnunet-service-rps_sampler.c b/src/rps/gnunet-service-rps_sampler.c index d182894c0..b65dd7c47 100644 --- a/src/rps/gnunet-service-rps_sampler.c +++ b/src/rps/gnunet-service-rps_sampler.c @@ -1,6 +1,6 @@ /* This file is part of GNUnet. - (C) + Copyright (C) GNUnet is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published @@ -14,8 +14,8 @@ You should have received a copy of the GNU General Public License along with GNUnet; see the file COPYING. If not, write to the - Free Software Foundation, Inc., 59 Temple Place - Suite 330, - Boston, MA 02111-1307, USA. + Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, + Boston, MA 02110-1301, USA. */ /** @@ -28,11 +28,15 @@ #include "rps.h" #include "gnunet-service-rps_sampler.h" +#include "gnunet-service-rps_sampler_elem.h" #include #include -#define LOG(kind, ...) GNUNET_log(kind, __VA_ARGS__) +#include "rps-test_util.h" + +#define LOG(kind, ...) GNUNET_log_from(kind,"rps-sampler",__VA_ARGS__) + // multiple 'clients'? @@ -49,45 +53,85 @@ // TODO care about invalid input of the caller (size 0 or less...) -enum RPS_SamplerEmpty -{ - NOT_EMPTY = 0x0, - EMPTY = 0x1 -}; +/** + * Callback that is called from _get_rand_peer() when the PeerID is ready. + * + * @param cls the closure given alongside this function. + * @param id the PeerID that was returned + */ +typedef void +(*RPS_sampler_rand_peer_ready_cont) (void *cls, + const struct GNUNET_PeerIdentity *id); + /** - * A sampler element sampling one PeerID at a time. + * Closure for #sampler_mod_get_rand_peer() and #sampler_get_rand_peer */ -struct RPS_SamplerElement +struct GetPeerCls { /** - * Min-wise linear permutation used by this sampler. - * - * This is an key later used by a hmac. + * DLL */ - struct GNUNET_CRYPTO_AuthKey auth_key; + struct GetPeerCls *next; + struct GetPeerCls *prev; /** - * The PeerID this sampler currently samples. + * The #RPS_SamplerRequestHandle this single request belongs to. */ - struct GNUNET_PeerIdentity peer_id; + struct RPS_SamplerRequestHandle *req_handle; /** - * The according hash value of this PeerID. + * The task for this function. */ - struct GNUNET_HashCode peer_id_hash; + struct GNUNET_SCHEDULER_Task *get_peer_task; /** - * Time of last request. + * The callback */ - struct GNUNET_TIME_Absolute last_request; - + RPS_sampler_rand_peer_ready_cont cont; + /** - * Flag that indicates that we are not holding a valid PeerID right now. + * The closure to the callback @e cont */ - enum RPS_SamplerEmpty is_empty; + void *cont_cls; + + /** + * The address of the id to be stored at + */ + struct GNUNET_PeerIdentity *id; }; + +/** + * Type of function used to differentiate between modified and not modified + * Sampler. + */ +typedef void +(*RPS_get_peers_type) (void *cls, + const struct GNUNET_SCHEDULER_TaskContext *tc); + +/** + * Get one random peer out of the sampled peers. + * + * We might want to reinitialise this sampler after giving the + * corrsponding peer to the client. + * Only used internally + */ +static void +sampler_get_rand_peer (void *cls, + const struct GNUNET_SCHEDULER_TaskContext *tc); + +/** + * Get one random peer out of the sampled peers. + * + * We might want to reinitialise this sampler after giving the + * corrsponding peer to the client. + */ +static void +sampler_mod_get_rand_peer (void *cls, + const struct GNUNET_SCHEDULER_TaskContext *tc); + + /** * Sampler with its own array of SamplerElements */ @@ -100,42 +144,89 @@ struct RPS_Sampler //size_t size; /** - * All Samplers in one array. + * All sampler elements in one array. */ struct RPS_SamplerElement **sampler_elements; /** - * Index to a sampler element. + * Maximum time a round takes * - * Gets cycled on every hist_request. + * Used in the context of RPS + */ + struct GNUNET_TIME_Relative max_round_interval; + + /** + * Stores the function to return peers. Which one it is depends on whether + * the Sampler is the modified one or not. + */ + RPS_get_peers_type get_peers; + + /** + * Head and tail for the DLL to store the #RPS_SamplerRequestHandle + */ + struct RPS_SamplerRequestHandle *req_handle_head; + struct RPS_SamplerRequestHandle *req_handle_tail; + + #ifdef TO_FILE + /** + * File name to log to + */ + char *file_name; + #endif /* TO_FILE */ +}; + +/** + * Closure to _get_n_rand_peers_ready_cb() + */ +struct RPS_SamplerRequestHandle +{ + /** + * DLL + */ + struct RPS_SamplerRequestHandle *next; + struct RPS_SamplerRequestHandle *prev; + + /** + * Number of peers we are waiting for. + */ + uint32_t num_peers; + + /** + * Number of peers we currently have. */ - uint64_t sampler_elem_index; + uint32_t cur_num_peers; /** - * Callback to be called when a peer gets inserted into a sampler. + * Pointer to the array holding the ids. */ - RPS_sampler_insert_cb insert_cb; + struct GNUNET_PeerIdentity *ids; /** - * Closure to the insert_cb. + * Head and tail for the DLL to store the tasks for single requests */ - void *insert_cls; + struct GetPeerCls *gpc_head; + struct GetPeerCls *gpc_tail; /** - * Callback to be called when a peer gets inserted into a sampler. + * Sampler. */ - RPS_sampler_remove_cb remove_cb; + struct RPS_Sampler *sampler; /** - * Closure to the remove_cb. + * Callback to be called when all ids are available. */ - void *remove_cls; + RPS_sampler_n_rand_peers_ready_cb callback; + + /** + * Closure given to the callback + */ + void *cls; }; -/** - * Global sampler variable. - */ -struct RPS_Sampler *sampler; +///** +// * Global sampler variable. +// */ +//struct RPS_Sampler *sampler; /** @@ -156,141 +247,64 @@ static size_t max_size; /** * Inedex to the sampler element that is the next to be returned */ -static uint64_t client_get_index; +static uint32_t client_get_index; /** - * Reinitialise a previously initialised sampler element. + * Callback to _get_rand_peer() used by _get_n_rand_peers(). * - * @param sampler pointer to the memory that keeps the value. + * Checks whether all n peers are available. If they are, + * give those back. */ - static void -RPS_sampler_elem_reinit (struct RPS_SamplerElement *sampler_el) +static void +check_n_peers_ready (void *cls, + const struct GNUNET_PeerIdentity *id) { - sampler_el->is_empty = EMPTY; + struct RPS_SamplerRequestHandle *req_handle = cls; - // I guess I don't need to call GNUNET_CRYPTO_hmac_derive_key()... - GNUNET_CRYPTO_random_block(GNUNET_CRYPTO_QUALITY_STRONG, - &(sampler_el->auth_key.key), - GNUNET_CRYPTO_HASH_LENGTH); + req_handle->cur_num_peers++; + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Got %" PRIX32 ". of %" PRIX32 " peers\n", + req_handle->cur_num_peers, req_handle->num_peers); - sampler_el->last_request = GNUNET_TIME_UNIT_FOREVER_ABS; + if (req_handle->num_peers == req_handle->cur_num_peers) + { /* All peers are ready -- return those to the client */ + GNUNET_assert (NULL != req_handle->callback); - /* We might want to keep the previous peer */ + LOG (GNUNET_ERROR_TYPE_DEBUG, + "returning %" PRIX32 " peers to the client\n", + req_handle->num_peers); + req_handle->callback (req_handle->cls, req_handle->ids, req_handle->num_peers); - //GNUNET_CRYPTO_hmac(&sampler_el->auth_key, sampler_el->peer_id, - // sizeof(struct GNUNET_PeerIdentity), - // &sampler_el->peer_id_hash); + RPS_sampler_request_cancel (req_handle); + } } /** - * (Re)Initialise given Sampler with random min-wise independent function. - * - * In this implementation this means choosing an auth_key for later use in - * a hmac at random. + * Get the size of the sampler. * - * @return a newly created RPS_SamplerElement which currently holds no id. + * @param sampler the sampler to return the size of. + * @return the size of the sampler */ - struct RPS_SamplerElement * -RPS_sampler_elem_create (void) +unsigned int +RPS_sampler_get_size (struct RPS_Sampler *sampler) { - struct RPS_SamplerElement *s; - - s = GNUNET_new (struct RPS_SamplerElement); - - RPS_sampler_elem_reinit (s); - LOG (GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: initialised with empty PeerID\n"); - - return s; -} - - -/** - * Input an PeerID into the given sampler. - */ - static void -RPS_sampler_elem_next (struct RPS_SamplerElement *s_elem, const struct GNUNET_PeerIdentity *other, - RPS_sampler_insert_cb insert_cb, void *insert_cls, - RPS_sampler_remove_cb remove_cb, void *remove_cls) -{ - struct GNUNET_HashCode other_hash; - - if ( 0 == GNUNET_CRYPTO_cmp_peer_identity(other, &(s_elem->peer_id)) ) - { - LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Got PeerID %s\n", - GNUNET_i2s(other)); - LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Have already PeerID %s\n", - GNUNET_i2s(&(s_elem->peer_id))); - } - else - { - GNUNET_CRYPTO_hmac(&s_elem->auth_key, - other, - sizeof(struct GNUNET_PeerIdentity), - &other_hash); - - if ( EMPTY == s_elem->is_empty ) - { // Or whatever is a valid way to say - // "we have no PeerID at the moment" - LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Got PeerID %s; Simply accepting (was empty previously).\n", - GNUNET_i2s(other)); - s_elem->peer_id = *other; - //s_elem->peer_id = other; - s_elem->peer_id_hash = other_hash; - if (NULL != sampler->insert_cb) - { - sampler->insert_cb(sampler->insert_cls, &(s_elem->peer_id)); - } - } - else if ( 0 > GNUNET_CRYPTO_hash_cmp(&other_hash, &s_elem->peer_id_hash) ) - { - LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Got PeerID %s\n", - GNUNET_i2s(other)); - LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Discarding old PeerID %s\n", - GNUNET_i2s(&s_elem->peer_id)); - - if ( NULL != sampler->remove_cb ) - { - LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Removing old PeerID %s with the remove callback.\n", - GNUNET_i2s(&s_elem->peer_id)); - sampler->remove_cb(sampler->remove_cls, &s_elem->peer_id); - } - - memcpy(&s_elem->peer_id, other, sizeof(struct GNUNET_PeerIdentity)); - //s_elem->peer_id = other; - s_elem->peer_id_hash = other_hash; - - if ( NULL != sampler->insert_cb ) - { - LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Inserting new PeerID %s with the insert callback.\n", - GNUNET_i2s(&s_elem->peer_id)); - sampler->insert_cb(sampler->insert_cls, &s_elem->peer_id); - } - } - else - { - LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Got PeerID %s\n", - GNUNET_i2s(other)); - LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Keeping old PeerID %s\n", - GNUNET_i2s(&s_elem->peer_id)); - } - } - s_elem->is_empty = NOT_EMPTY; + return sampler->sampler_size; } /** * Grow or shrink the size of the sampler. * + * @param sampler the sampler to resize. * @param new_size the new size of the sampler */ - void -RPS_sampler_resize (unsigned int new_size) +static void +sampler_resize (struct RPS_Sampler *sampler, unsigned int new_size) { unsigned int old_size; - uint64_t i; - struct RPS_SamplerElement **rem_list; + uint32_t i; // TODO check min and max size @@ -298,52 +312,93 @@ RPS_sampler_resize (unsigned int new_size) if (old_size > new_size) { /* Shrinking */ - /* Temporary store those to properly call the removeCB on those later */ - rem_list = GNUNET_malloc ((old_size - new_size) * sizeof (struct RPS_SamplerElement *)); - memcpy (rem_list, - &sampler->sampler_elements[new_size], - (old_size - new_size) * sizeof (struct RPS_SamplerElement *)); - - LOG (GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Shrinking sampler %d -> %d\n", old_size, new_size); - GNUNET_array_grow (sampler->sampler_elements, sampler->sampler_size, new_size); + LOG (GNUNET_ERROR_TYPE_DEBUG, - "SAMPLER: sampler->sampler_elements now points to %p\n", - sampler->sampler_elements); - - for (i = 0 ; i < old_size - new_size ; i++) - {/* Remove unneeded rest */ - LOG (GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Removing %" PRIX64 ". sampler\n", i); - if (NULL != sampler->remove_cb) - sampler->remove_cb (sampler->remove_cls, &rem_list[i]->peer_id); - GNUNET_free (rem_list[i]); + "Shrinking sampler %d -> %d\n", + old_size, + new_size); + + to_file (sampler->file_name, + "Shrinking sampler %d -> %d", + old_size, + new_size); + + for (i = new_size ; i < old_size ; i++) + { + to_file (sampler->file_name, + "-%" PRIu32 ": %s", + i, + sampler->sampler_elements[i]->file_name); } + + GNUNET_array_grow (sampler->sampler_elements, + sampler->sampler_size, + new_size); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "sampler->sampler_elements now points to %p\n", + sampler->sampler_elements); + } else if (old_size < new_size) { /* Growing */ - LOG (GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Growing sampler %d -> %d\n", old_size, new_size); - GNUNET_array_grow (sampler->sampler_elements, sampler->sampler_size, new_size); LOG (GNUNET_ERROR_TYPE_DEBUG, - "SAMPLER: sampler->sampler_elements now points to %p\n", - sampler->sampler_elements); + "Growing sampler %d -> %d\n", + old_size, + new_size); - for ( i = old_size ; i < new_size ; i++ ) + to_file (sampler->file_name, + "Growing sampler %d -> %d", + old_size, + new_size); + + GNUNET_array_grow (sampler->sampler_elements, + sampler->sampler_size, + new_size); + + for (i = old_size ; i < new_size ; i++) { /* Add new sampler elements */ sampler->sampler_elements[i] = RPS_sampler_elem_create (); - if (NULL != sampler->insert_cb) - sampler->insert_cb (sampler->insert_cls, &sampler->sampler_elements[i]->peer_id); - LOG (GNUNET_ERROR_TYPE_DEBUG, - "SAMPLER: Added %" PRIX64 ". sampler, now pointing to %p, contains %s\n", - i, &sampler->sampler_elements[i], GNUNET_i2s (&sampler->sampler_elements[i]->peer_id)); + + to_file (sampler->file_name, + "+%" PRIu32 ": %s", + i, + sampler->sampler_elements[i]->file_name); } } else { - LOG (GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Size remains the same -- nothing to do\n"); + LOG (GNUNET_ERROR_TYPE_DEBUG, "Size remains the same -- nothing to do\n"); return; } - GNUNET_assert(sampler->sampler_size == new_size); - LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Finished growing/shrinking.\n"); // remove + GNUNET_assert (sampler->sampler_size == new_size); +} + + +/** + * Grow or shrink the size of the sampler. + * + * @param sampler the sampler to resize. + * @param new_size the new size of the sampler + */ +void +RPS_sampler_resize (struct RPS_Sampler *sampler, unsigned int new_size) +{ + GNUNET_assert (0 < new_size); + sampler_resize (sampler, new_size); +} + + +/** + * Empty the sampler. + * + * @param sampler the sampler to empty. + * @param new_size the new size of the sampler + */ +static void +sampler_empty (struct RPS_Sampler *sampler) +{ + sampler_resize (sampler, 0); } @@ -351,59 +406,89 @@ RPS_sampler_resize (unsigned int new_size) * Initialise a tuple of sampler elements. * * @param init_size the size the sampler is initialised with - * @param id with which all newly created sampler elements are initialised - * @param ins_cb the callback that will be called on every PeerID that is - * newly inserted into a sampler element - * @param ins_cls the closure given to #ins_cb - * @param rem_cb the callback that will be called on every PeerID that is - * removed from a sampler element - * @param rem_cls the closure given to #rem_cb + * @param max_round_interval maximum time a round takes + * @return a handle to a sampler that consists of sampler elements. */ - void -RPS_sampler_init (size_t init_size, const struct GNUNET_PeerIdentity *id, - RPS_sampler_insert_cb ins_cb, void *ins_cls, - RPS_sampler_remove_cb rem_cb, void *rem_cls) +struct RPS_Sampler * +RPS_sampler_init (size_t init_size, + struct GNUNET_TIME_Relative max_round_interval) { - //struct RPS_Sampler *sampler; - //uint64_t i; + struct RPS_Sampler *sampler; /* Initialise context around extended sampler */ min_size = 10; // TODO make input to _samplers_init() max_size = 1000; // TODO make input to _samplers_init() - GNUNET_new_array (64, struct GNUNET_TIME_Relative); sampler = GNUNET_new (struct RPS_Sampler); - sampler->sampler_size = 0; - sampler->sampler_elements = NULL; - sampler->insert_cb = ins_cb; - sampler->insert_cls = ins_cls; - sampler->remove_cb = rem_cb; - sampler->remove_cls = rem_cls; + + #ifdef TO_FILE + sampler->file_name = create_file ("sampler-"); + + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Initialised sampler %s\n", + sampler->file_name); + #endif /* TO_FILE */ + + sampler->max_round_interval = max_round_interval; + sampler->get_peers = sampler_get_rand_peer; //sampler->sampler_elements = GNUNET_new_array(init_size, struct GNUNET_PeerIdentity); //GNUNET_array_grow (sampler->sampler_elements, sampler->sampler_size, min_size); - RPS_sampler_resize (init_size); - RPS_sampler_update_list (id); // no super nice desing but ok for the moment + RPS_sampler_resize (sampler, init_size); client_get_index = 0; //GNUNET_assert (init_size == sampler->sampler_size); + return sampler; +} + +/** + * Initialise a modified tuple of sampler elements. + * + * @param init_size the size the sampler is initialised with + * @param max_round_interval maximum time a round takes + * @return a handle to a sampler that consists of sampler elements. + */ +struct RPS_Sampler * +RPS_sampler_mod_init (size_t init_size, + struct GNUNET_TIME_Relative max_round_interval) +{ + struct RPS_Sampler *sampler; + + sampler = RPS_sampler_init (init_size, max_round_interval); + sampler->get_peers = sampler_mod_get_rand_peer; + + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Initialised modified sampler %s\n", + sampler->file_name); + to_file (sampler->file_name, + "This is a modified sampler"); + + return sampler; } /** * A fuction to update every sampler in the given list * + * @param sampler the sampler to update. * @param id the PeerID that is put in the sampler */ void -RPS_sampler_update_list (const struct GNUNET_PeerIdentity *id) +RPS_sampler_update (struct RPS_Sampler *sampler, + const struct GNUNET_PeerIdentity *id) { - uint64_t i; + uint32_t i; + + to_file (sampler->file_name, + "Got %s", + GNUNET_i2s_full (id)); + + for (i = 0 ; i < sampler->sampler_size ; i++) + { + RPS_sampler_elem_next (sampler->sampler_elements[i], + id); + } - for ( i = 0 ; i < sampler->sampler_size ; i++ ) - RPS_sampler_elem_next (sampler->sampler_elements[i], id, - sampler->insert_cb, sampler->insert_cls, - sampler->remove_cb, sampler->remove_cls); } @@ -412,18 +497,25 @@ RPS_sampler_update_list (const struct GNUNET_PeerIdentity *id) * * Used to get rid of a PeerID. * + * @param sampler the sampler to reinitialise a sampler element in. * @param id the id of the sampler elements to update. */ void -RPS_sampler_reinitialise_by_value (const struct GNUNET_PeerIdentity *id) +RPS_sampler_reinitialise_by_value (struct RPS_Sampler *sampler, + const struct GNUNET_PeerIdentity *id) { - uint64_t i; + uint32_t i; + struct RPS_SamplerElement *trash_entry; for ( i = 0 ; i < sampler->sampler_size ; i++ ) { if ( 0 == GNUNET_CRYPTO_cmp_peer_identity(id, &(sampler->sampler_elements[i]->peer_id)) ) { - LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Reinitialising sampler\n"); + LOG (GNUNET_ERROR_TYPE_DEBUG, "Reinitialising sampler\n"); + trash_entry = GNUNET_new (struct RPS_SamplerElement); + *trash_entry = *(sampler->sampler_elements[i]); + to_file (trash_entry->file_name, + "--- non-active"); RPS_sampler_elem_reinit (sampler->sampler_elements[i]); } } @@ -437,90 +529,128 @@ RPS_sampler_reinitialise_by_value (const struct GNUNET_PeerIdentity *id) * corrsponding peer to the client. * Only used internally */ - const struct GNUNET_PeerIdentity * -RPS_sampler_get_rand_peer_ () +static void +sampler_get_rand_peer (void *cls, + const struct GNUNET_SCHEDULER_TaskContext *tc) { - uint64_t r_index; - const struct GNUNET_PeerIdentity *peer; // do we have to malloc that? + struct GetPeerCls *gpc = cls; + uint32_t r_index; + struct RPS_Sampler *sampler; - // TODO implement extra logic + gpc->get_peer_task = NULL; + if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN)) + return; + sampler = gpc->req_handle->sampler; /**; * Choose the r_index of the peer we want to return * at random from the interval of the gossip list */ - r_index = GNUNET_CRYPTO_random_u64(GNUNET_CRYPTO_QUALITY_STRONG, + r_index = GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_STRONG, sampler->sampler_size); - //if ( EMPTY == sampler->sampler_elements[r_index]->is_empty ) - // // TODO schedule for later - // peer = NULL; - //else - peer = &(sampler->sampler_elements[r_index]->peer_id); - sampler->sampler_elements[r_index]->last_request = GNUNET_TIME_absolute_get(); - LOG(GNUNET_ERROR_TYPE_DEBUG, "Sgrp: Returning PeerID %s\n", GNUNET_i2s(peer)); + if (EMPTY == sampler->sampler_elements[r_index]->is_empty) + { + //LOG (GNUNET_ERROR_TYPE_DEBUG, + // "Not returning randomly selected, empty PeerID. - Rescheduling.\n"); + + /* FIXME no active wait - get notified, when new id arrives? + * Might also be a freshly emptied one. Others might still contain ids. + * Counter? + */ + gpc->get_peer_task = + GNUNET_SCHEDULER_add_delayed ( + GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 1), + &sampler_get_rand_peer, + cls); + return; + } + + GNUNET_CONTAINER_DLL_remove (gpc->req_handle->gpc_head, + gpc->req_handle->gpc_tail, + gpc); + *gpc->id = sampler->sampler_elements[r_index]->peer_id; + gpc->cont (gpc->cont_cls, gpc->id); - return peer; + GNUNET_free (gpc); } /** - * Get n random peers out of the sampled peers. + * Get one random peer out of the sampled peers. * * We might want to reinitialise this sampler after giving the * corrsponding peer to the client. - * Random with or without consumption? - * Only used internally */ - const struct GNUNET_PeerIdentity * -RPS_sampler_get_n_rand_peers_ (uint64_t n) +static void +sampler_mod_get_rand_peer (void *cls, + const struct GNUNET_SCHEDULER_TaskContext *tc) { - if ( 0 == sampler->sampler_size ) + struct GetPeerCls *gpc = cls; + struct RPS_SamplerElement *s_elem; + struct GNUNET_TIME_Relative last_request_diff; + struct RPS_Sampler *sampler; + + gpc->get_peer_task = NULL; + if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN)) + return; + sampler = gpc->req_handle->sampler; + + LOG (GNUNET_ERROR_TYPE_DEBUG, "Single peer was requested\n"); + + /* Cycle the #client_get_index one step further */ + client_get_index = (client_get_index + 1) % sampler->sampler_size; + + s_elem = sampler->sampler_elements[client_get_index]; + *gpc->id = s_elem->peer_id; + GNUNET_assert (NULL != s_elem); + + if (EMPTY == s_elem->is_empty) { - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Sgrp: List empty - Returning NULL\n"); - return NULL; + LOG (GNUNET_ERROR_TYPE_DEBUG, "Sampler_mod element empty, rescheduling.\n"); + GNUNET_assert (NULL == gpc->get_peer_task); + gpc->get_peer_task = + GNUNET_SCHEDULER_add_delayed (sampler->max_round_interval, + &sampler_mod_get_rand_peer, + cls); + return; } - else - { - // TODO check if we have too much (distinct) sampled peers - // If we are not ready yet maybe schedule for later - struct GNUNET_PeerIdentity *peers; - uint64_t i; - - peers = GNUNET_malloc (n * sizeof(struct GNUNET_PeerIdentity)); - for ( i = 0 ; i < n ; i++ ) { - //peers[i] = RPS_sampler_get_rand_peer_(sampler->sampler_elements); - memcpy (&peers[i], RPS_sampler_get_rand_peer_ (), sizeof (struct GNUNET_PeerIdentity)); + /* Check whether we may use this sampler to give it back to the client */ + if (GNUNET_TIME_UNIT_FOREVER_ABS.abs_value_us != s_elem->last_client_request.abs_value_us) + { + last_request_diff = + GNUNET_TIME_absolute_get_difference (s_elem->last_client_request, + GNUNET_TIME_absolute_get ()); + /* We're not going to give it back now if it was + * already requested by a client this round */ + if (last_request_diff.rel_value_us < sampler->max_round_interval.rel_value_us) + { + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Last client request on this sampler was less than max round interval ago -- scheduling for later\n"); + ///* How many time remains untile the next round has started? */ + //inv_last_request_diff = + // GNUNET_TIME_absolute_get_difference (last_request_diff, + // sampler->max_round_interval); + // add a little delay + /* Schedule it one round later */ + GNUNET_assert (NULL == gpc->get_peer_task); + gpc->get_peer_task = + GNUNET_SCHEDULER_add_delayed (sampler->max_round_interval, + &sampler_mod_get_rand_peer, + cls); + return; } - return peers; + // TODO add other reasons to wait here } -} + s_elem->last_client_request = GNUNET_TIME_absolute_get (); -/** - * Get one random peer out of the sampled peers. - * - * We might want to reinitialise this sampler after giving the - * corrsponding peer to the client. - * - * @return a random PeerID of the PeerIDs previously put into the sampler. - */ - const struct GNUNET_PeerIdentity * -RPS_sampler_get_rand_peer () -{ - struct GNUNET_PeerIdentity *peer; - - // use _get_rand_peer_ ? - peer = GNUNET_new (struct GNUNET_PeerIdentity); - *peer = sampler->sampler_elements[client_get_index]->peer_id; - RPS_sampler_elem_reinit (sampler->sampler_elements[client_get_index]); - if ( client_get_index == sampler->sampler_size ) - client_get_index = 0; - else - client_get_index++; - return peer; + GNUNET_CONTAINER_DLL_remove (gpc->req_handle->gpc_head, + gpc->req_handle->gpc_tail, + gpc); + gpc->cont (gpc->cont_cls, gpc->id); + GNUNET_free (gpc); } @@ -531,56 +661,103 @@ RPS_sampler_get_rand_peer () * corrsponding peer to the client. * Random with or without consumption? * - * @return n random PeerIDs of the PeerIDs previously put into the sampler. + * @param sampler the sampler to get peers from. + * @param cb callback that will be called once the ids are ready. + * @param cls closure given to @a cb + * @param for_client #GNUNET_YES if result is used for client, + * #GNUNET_NO if used internally + * @param num_peers the number of peers requested */ - const struct GNUNET_PeerIdentity * -RPS_sampler_get_n_rand_peers (uint64_t n) +struct RPS_SamplerRequestHandle * +RPS_sampler_get_n_rand_peers (struct RPS_Sampler *sampler, + RPS_sampler_n_rand_peers_ready_cb cb, + void *cls, uint32_t num_peers) { - // use _get_rand_peers_ ? - if ( 0 == sampler->sampler_size ) - { - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Sgrp: List empty - Returning NULL\n"); + GNUNET_assert (0 != sampler->sampler_size); + if (0 == num_peers) return NULL; + + // TODO check if we have too much (distinct) sampled peers + uint32_t i; + struct RPS_SamplerRequestHandle *req_handle; + struct GetPeerCls *gpc; + + req_handle = GNUNET_new (struct RPS_SamplerRequestHandle); + req_handle->num_peers = num_peers; + req_handle->cur_num_peers = 0; + req_handle->ids = GNUNET_new_array (num_peers, struct GNUNET_PeerIdentity); + req_handle->sampler = sampler; + req_handle->callback = cb; + req_handle->cls = cls; + GNUNET_CONTAINER_DLL_insert (sampler->req_handle_head, + sampler->req_handle_tail, + req_handle); + + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Scheduling requests for %" PRIu32 " peers\n", num_peers); + + for (i = 0 ; i < num_peers ; i++) + { + gpc = GNUNET_new (struct GetPeerCls); + gpc->req_handle = req_handle; + gpc->cont = check_n_peers_ready; + gpc->cont_cls = req_handle; + gpc->id = &req_handle->ids[i]; + + GNUNET_CONTAINER_DLL_insert (req_handle->gpc_head, + req_handle->gpc_tail, + gpc); + // maybe add a little delay + gpc->get_peer_task = GNUNET_SCHEDULER_add_now (sampler->get_peers, gpc); } - else + return req_handle; +} + +/** + * Cancle a request issued through #RPS_sampler_n_rand_peers_ready_cb. + * + * @param req_handle the handle to the request + */ +void +RPS_sampler_request_cancel (struct RPS_SamplerRequestHandle *req_handle) +{ + struct GetPeerCls *i; + + while (NULL != (i = req_handle->gpc_head) ) { - // TODO check if we have too much (distinct) sampled peers - // If we are not ready yet maybe schedule for later - struct GNUNET_PeerIdentity *peers; - const struct GNUNET_PeerIdentity *peer; - uint64_t i; - - peers = GNUNET_malloc (n * sizeof (struct GNUNET_PeerIdentity)); - - for ( i = 0 ; i < n ; i++ ) { - //peers[i] = RPS_sampler_get_rand_peer_(sampler->sampler_elements); - peer = RPS_sampler_get_rand_peer (); - memcpy (&peers[i], peer, sizeof (struct GNUNET_PeerIdentity)); - //GNUNET_free (peer); - } - return peers; + GNUNET_CONTAINER_DLL_remove (req_handle->gpc_head, + req_handle->gpc_tail, + i); + if (NULL != i->get_peer_task) + GNUNET_SCHEDULER_cancel (i->get_peer_task); + GNUNET_free (i); } + GNUNET_CONTAINER_DLL_remove (req_handle->sampler->req_handle_head, + req_handle->sampler->req_handle_tail, + req_handle); + GNUNET_free (req_handle); } /** * Counts how many Samplers currently hold a given PeerID. * + * @param sampler the sampler to count ids in. * @param id the PeerID to count. * * @return the number of occurrences of id. */ - uint64_t -RPS_sampler_count_id (const struct GNUNET_PeerIdentity *id) + uint32_t +RPS_sampler_count_id (struct RPS_Sampler *sampler, + const struct GNUNET_PeerIdentity *id) { - uint64_t count; - uint64_t i; + uint32_t count; + uint32_t i; count = 0; for ( i = 0 ; i < sampler->sampler_size ; i++ ) { - if ( 0 == GNUNET_CRYPTO_cmp_peer_identity (&sampler->sampler_elements[i]->peer_id, id) + if ( 0 == GNUNET_CRYPTO_cmp_peer_identity (&sampler->sampler_elements[i]->peer_id, id) && EMPTY != sampler->sampler_elements[i]->is_empty) count++; } @@ -592,10 +769,17 @@ RPS_sampler_count_id (const struct GNUNET_PeerIdentity *id) * Cleans the sampler. */ void -RPS_sampler_destroy () +RPS_sampler_destroy (struct RPS_Sampler *sampler) { - RPS_sampler_resize (0); - GNUNET_array_grow (sampler->sampler_elements, sampler->sampler_size, 0); + if (NULL != sampler->req_handle_head) + { + LOG (GNUNET_ERROR_TYPE_WARNING, + "There are still pending requests. Going to remove them.\n"); + while (NULL != sampler->req_handle_head) + RPS_sampler_request_cancel (sampler->req_handle_head); + } + sampler_empty (sampler); + GNUNET_free (sampler); } /* end of gnunet-service-rps.c */