From f23239f779a412049d064ad38e307b7a083baa10 Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Sun, 27 Sep 2009 23:06:55 +0000 Subject: [PATCH] more work on fs service --- src/fs/fs.h | 13 + src/fs/gnunet-service-fs.c | 415 ++++++++++++++++++++++++++++- src/include/gnunet_container_lib.h | 1 + src/include/gnunet_core_service.h | 2 + 4 files changed, 422 insertions(+), 9 deletions(-) diff --git a/src/fs/fs.h b/src/fs/fs.h index 002ffb6b3..a9fa712c6 100644 --- a/src/fs/fs.h +++ b/src/fs/fs.h @@ -74,6 +74,19 @@ */ #define TTL_DECREMENT 5000 +/** + * Length of the P2P success tracker. Note that + * having a very long list can also hurt performance. + */ +#define P2P_SUCCESS_LIST_SIZE 8 + + +/** + * Length of the CS-2-P success tracker. Note that + * having a very long list can also hurt performance. + */ +#define CS2P_SUCCESS_LIST_SIZE 8 + /** * How long are we willing to wait for the datastore to be ready to * process a request for a query without priority? diff --git a/src/fs/gnunet-service-fs.c b/src/fs/gnunet-service-fs.c index 0c27f58b9..ae52125ac 100644 --- a/src/fs/gnunet-service-fs.c +++ b/src/fs/gnunet-service-fs.c @@ -38,6 +38,7 @@ * - check that we decrement PIDs always where necessary (can wait) */ #include "platform.h" +#include #include "gnunet_core_service.h" #include "gnunet_datastore_service.h" #include "gnunet_peer_lib.h" @@ -407,7 +408,10 @@ struct PendingRequest /** * Pending transmission request with the core service for the target - * peer (for processing of 'replies_pending'). + * peer (for processing of 'replies_pending') or Handle for a + * pending query-request for P2P-transmission with the core service. + * If non-NULL, this request must be cancelled should this struct be + * destroyed! */ struct GNUNET_CORE_TransmitHandle *cth; @@ -435,6 +439,12 @@ struct PendingRequest */ GNUNET_HashCode query; + /** + * The task responsible for transmitting queries + * for this request. + */ + GNUNET_SCHEDULER_TaskIdentifier task; + /** * (Interned) Peer identifier (only valid if "client" is NULL) * that identifies a peer that gave us this request. @@ -607,6 +617,69 @@ struct ProcessReplyClosure }; +/** + * Information about a peer that we are connected to. + * We track data that is useful for determining which + * peers should receive our requests. + */ +struct ConnectedPeer +{ + + /** + * List of the last clients for which this peer + * successfully answered a query. + */ + struct GNUNET_SERVER_Client *last_client_replies[CS2P_SUCCESS_LIST_SIZE]; + + /** + * List of the last PIDs for which + * this peer successfully answered a query; + * We use 0 to indicate no successful reply. + */ + GNUNET_PEER_Id last_p2p_replies[P2P_SUCCESS_LIST_SIZE]; + + /** + * Average delay between sending the peer a request and + * getting a reply (only calculated over the requests for + * which we actually got a reply). Calculated + * as a moving average: new_delay = ((n-1)*last_delay+curr_delay) / n + */ + struct GNUNET_TIME_Relative avg_delay; + + /** + * Average priority of successful replies. Calculated + * as a moving average: new_avg = ((n-1)*last_avg+curr_prio) / n + */ + double avg_priority; + + /** + * The peer's identity. + */ + GNUNET_PEER_Id pid; + + /** + * Number of requests we have currently pending + * with this peer (that is, requests that were + * transmitted so recently that we would not retransmit + * them right now). + */ + unsigned int pending_requests; + + /** + * Which offset in "last_p2p_replies" will be updated next? + * (we go round-robin). + */ + unsigned int last_p2p_replies_woff; + + /** + * Which offset in "last_client_replies" will be updated next? + * (we go round-robin). + */ + unsigned int last_client_replies_woff; + +}; + + /** * Our connection to the datastore. */ @@ -685,6 +758,11 @@ static struct ClientList *clients; */ static struct GNUNET_CONTAINER_Heap *requests_by_expiration; +/** + * Map of peer identifiers to "struct ConnectedPeer" (for that peer). + */ +static struct GNUNET_CONTAINER_MultiHashMap *connected_peers; + /** * Maximum number of requests (from other peers) that we're * willing to have pending at any given point in time. @@ -1456,6 +1534,279 @@ refresh_bloomfilter (unsigned int count, } +/** + * Closure used for "target_peer_select_cb". + */ +struct PeerSelectionContext +{ + /** + * The request for which we are selecting + * peers. + */ + struct PendingRequest *pr; + + /** + * Current "prime" target. + */ + struct GNUNET_PeerIdentity target; + + /** + * How much do we like this target? + */ + double target_score; + +}; + + +/** + * Function called for each connected peer to determine + * which one(s) would make good targets for forwarding. + * + * @param cls closure (struct PeerSelectionContext) + * @param key current key code (peer identity) + * @param value value in the hash map (struct ConnectedPeer) + * @return GNUNET_YES if we should continue to + * iterate, + * GNUNET_NO if not. + */ +static int +target_peer_select_cb (void *cls, + const GNUNET_HashCode * key, + void *value) +{ + struct PeerSelectionContext *psc = cls; + // struct ConnectedPeer *cp = value; + double score; + // FIXME (CRITICAL: would always sent to same peer without this!) + // 1) check if we have already (recently) forwarded to this peer, if so, skip + // 2) calculate how much we'd like to forward to this peer + score = 0; + + // 3) store in closure + if (score > psc->target_score) + { + psc->target_score = score; + psc->target.hashPubKey = *key; + } + return GNUNET_YES; +} + + + + +/** + * We use a random delay to make the timing of requests + * less predictable. This function returns such a random + * delay. + * + * @return random delay to use for some request, between 0 and TTL_DECREMENT ms + */ +static struct GNUNET_TIME_Relative +get_processing_delay () +{ + return GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS, + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, + TTL_DECREMENT)); +} + + +/** + * Task that is run for each request with the + * goal of forwarding the associated query to + * other peers. The task should re-schedule + * itself to be re-run once the TTL has expired. + * (or at a later time if more peers should + * be queried earlier). + * + * @param cls the requests "struct PendingRequest*" + * @param tc task context (unused) + */ +static void +forward_request_task (void *cls, + const struct GNUNET_SCHEDULER_TaskContext *tc); + + +/** + * We've selected a peer for forwarding of a query. + * Construct the message and then re-schedule the + * task to forward again to (other) peers. + * + * @param cls closure + * @param size number of bytes available in buf + * @param buf where the callee should write the message + * @return number of bytes written to buf + */ +static size_t +transmit_request_cb (void *cls, + size_t size, + void *buf) +{ + struct PendingRequest *pr = cls; + uint16_t msize; + + pr->cth = NULL; + /* (1) check for timeout */ + if (NULL == buf) + { + /* timeout, try another peer immediately again */ + pr->task = GNUNET_SCHEDULER_add_delayed (sched, + GNUNET_NO, + GNUNET_SCHEDULER_PRIORITY_IDLE, + GNUNET_SCHEDULER_NO_TASK, + GNUNET_TIME_UNIT_ZERO, + &forward_request_task, + pr); + return 0; + } + /* (2) build query message */ + msize = 0; + // CRITICAL-FIXME! (nothing goes without this!) + /* (3) schedule job to do it again (or another peer, etc.) */ + pr->task = GNUNET_SCHEDULER_add_delayed (sched, + GNUNET_NO, + GNUNET_SCHEDULER_PRIORITY_IDLE, + GNUNET_SCHEDULER_NO_TASK, + get_processing_delay (), // FIXME! + &forward_request_task, + pr); + + return msize; +} + + +/** + * Function called after we've tried to reserve + * a certain amount of bandwidth for a reply. + * Check if we succeeded and if so send our query. + * + * @param cls the requests "struct PendingRequest*" + * @param peer identifies the peer + * @param latency current latency estimate, "FOREVER" if we have been + * disconnected + * @param bpm_in set to the current bandwidth limit (receiving) for this peer + * @param bpm_out set to the current bandwidth limit (sending) for this peer + * @param amount set to the amount that was actually reserved or unreserved + * @param preference current traffic preference for the given peer + */ +static void +target_reservation_cb (void *cls, + const struct + GNUNET_PeerIdentity * peer, + unsigned int bpm_in, + unsigned int bpm_out, + struct GNUNET_TIME_Relative + latency, int amount, + unsigned long long preference) +{ + struct PendingRequest *pr = cls; + uint32_t priority; + uint16_t size; + struct GNUNET_TIME_Relative maxdelay; + + GNUNET_assert (peer != NULL); + if ( (amount != DBLOCK_SIZE) || + (pr->cth != NULL) ) + { + /* try again later; FIXME: we may need to un-reserve "amount"? */ + pr->task = GNUNET_SCHEDULER_add_delayed (sched, + GNUNET_NO, + GNUNET_SCHEDULER_PRIORITY_IDLE, + GNUNET_SCHEDULER_NO_TASK, + get_processing_delay (), // FIXME: longer? + &forward_request_task, + pr); + return; + } + // (2) transmit, update ttl/priority + // FIXME: calculate priority, maxdelay, size properly! + priority = 0; + size = 60000; + maxdelay = GNUNET_CONSTANTS_SERVICE_TIMEOUT; + pr->cth = GNUNET_CORE_notify_transmit_ready (core, + priority, + maxdelay, + peer, + size, + &transmit_request_cb, + pr); + if (pr->cth == NULL) + { + /* try again later */ + pr->task = GNUNET_SCHEDULER_add_delayed (sched, + GNUNET_NO, + GNUNET_SCHEDULER_PRIORITY_IDLE, + GNUNET_SCHEDULER_NO_TASK, + get_processing_delay (), // FIXME: longer? + &forward_request_task, + pr); + } +} + + +/** + * Task that is run for each request with the + * goal of forwarding the associated query to + * other peers. The task should re-schedule + * itself to be re-run once the TTL has expired. + * (or at a later time if more peers should + * be queried earlier). + * + * @param cls the requests "struct PendingRequest*" + * @param tc task context (unused) + */ +static void +forward_request_task (void *cls, + const struct GNUNET_SCHEDULER_TaskContext *tc) +{ + struct PendingRequest *pr = cls; + struct PeerSelectionContext psc; + + pr->task = GNUNET_SCHEDULER_NO_TASK; + if (pr->cth != NULL) + { + /* we're busy transmitting a result, wait a bit */ + pr->task = GNUNET_SCHEDULER_add_delayed (sched, + GNUNET_NO, + GNUNET_SCHEDULER_PRIORITY_IDLE, + GNUNET_SCHEDULER_NO_TASK, + get_processing_delay (), + &forward_request_task, + pr); + return; + } + /* (1) select target */ + psc.pr = pr; + psc.target_score = MINDOUBLE; + GNUNET_CONTAINER_multihashmap_iterate (connected_peers, + &target_peer_select_cb, + &psc); + if (psc.target_score == MINDOUBLE) + { + /* no possible target found, wait some time */ + pr->task = GNUNET_SCHEDULER_add_delayed (sched, + GNUNET_NO, + GNUNET_SCHEDULER_PRIORITY_IDLE, + GNUNET_SCHEDULER_NO_TASK, + get_processing_delay (), // FIXME: exponential back-off? or at least wait longer... + &forward_request_task, + pr); + return; + } + /* (2) reserve reply bandwidth */ + // FIXME: need a way to cancel; this + // async operation is problematic (segv-problematic) + // if "pr" is destroyed while it happens! + GNUNET_CORE_peer_configure (core, + &psc.target, + GNUNET_CONSTANTS_SERVICE_TIMEOUT, + -1, + DBLOCK_SIZE, // FIXME: make dependent on type? + 0, + &target_reservation_cb, + pr); +} + + /** * We're processing (local) results for a search request * from a (local) client. Pass applicable results to the @@ -1547,8 +1898,13 @@ process_local_get_result (void *cls, &pr->query, pr, GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); - - // FIXME: trigger some processing NOW! + pr->task = GNUNET_SCHEDULER_add_delayed (sched, + GNUNET_NO, + GNUNET_SCHEDULER_PRIORITY_IDLE, + GNUNET_SCHEDULER_NO_TASK, + get_processing_delay (), + &forward_request_task, + pr); local_get_context_free (lgc); return; } @@ -1765,10 +2121,12 @@ destroy_pending_request (struct PendingRequest *pr) cl->tail, pr->crl_entry); } - if (NULL != pr->bf) - GNUNET_CONTAINER_bloomfilter_free (pr->bf); + if (GNUNET_SCHEDULER_NO_TASK != pr->task) + GNUNET_SCHEDULER_cancel (sched, pr->task); if (NULL != pr->cth) GNUNET_CORE_notify_transmit_ready_cancel (pr->cth); + if (NULL != pr->bf) + GNUNET_CONTAINER_bloomfilter_free (pr->bf); if (NULL != pr->th) GNUNET_CONNECTION_notify_transmit_ready_cancel (pr->th); while (NULL != (reply = pr->replies_pending)) @@ -1883,6 +2241,10 @@ shutdown_task (void *cls, requests_by_peer = NULL; GNUNET_CONTAINER_heap_destroy (requests_by_expiration); requests_by_expiration = NULL; + // FIXME: iterate over entries and free individually? + // (or do we get disconnect notifications?) + GNUNET_CONTAINER_multihashmap_destroy (connected_peers); + connected_peers = NULL; GNUNET_CONTAINER_multihashmap_destroy (ifm); ifm = NULL; while (NULL != (pos = indexed_files)) @@ -1917,6 +2279,29 @@ destroy_request (void *cls, } + +/** + * Method called whenever a given peer connects. + * + * @param cls closure, not used + * @param peer peer identity this notification is about + */ +static void +peer_connect_handler (void *cls, + const struct + GNUNET_PeerIdentity * peer) +{ + struct ConnectedPeer *cp; + + cp = GNUNET_malloc (sizeof (struct ConnectedPeer)); + cp->pid = GNUNET_PEER_intern (peer); + GNUNET_CONTAINER_multihashmap_put (connected_peers, + &peer->hashPubKey, + cp, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY); +} + + /** * Method called whenever a peer disconnects. * @@ -1928,6 +2313,13 @@ peer_disconnect_handler (void *cls, const struct GNUNET_PeerIdentity * peer) { + struct ConnectedPeer *cp; + + cp = GNUNET_CONTAINER_multihashmap_get (connected_peers, + &peer->hashPubKey); + GNUNET_PEER_change_rc (cp->pid, -1); + GNUNET_PEER_decrement_rcs (cp->last_p2p_replies, P2P_SUCCESS_LIST_SIZE); + GNUNET_free (cp); GNUNET_CONTAINER_multihashmap_get_multiple (requests_by_peer, &peer->hashPubKey, &destroy_request, @@ -1993,11 +2385,15 @@ forward_get_request (void *cls, eer); destroy_pending_request (eer); } - // FIXME: trigger actual forwarding NOW! + pr->task = GNUNET_SCHEDULER_add_delayed (sched, + GNUNET_NO, + GNUNET_SCHEDULER_PRIORITY_IDLE, + GNUNET_SCHEDULER_NO_TASK, + get_processing_delay (), + &forward_request_task, + pr); GNUNET_free (pgc); } - - /** * Transmit the given message by copying it to * the target buffer "buf". "buf" will be @@ -2831,7 +3227,7 @@ core_connect_task (void *cls, GNUNET_TIME_UNIT_FOREVER_REL, NULL, &core_start_cb, - NULL, + &peer_connect_handler, &peer_disconnect_handler, NULL, NULL, GNUNET_NO, @@ -2860,6 +3256,7 @@ run (void *cls, ifm = GNUNET_CONTAINER_multihashmap_create (128); requests_by_query = GNUNET_CONTAINER_multihashmap_create (128); // FIXME: get size from config requests_by_peer = GNUNET_CONTAINER_multihashmap_create (128); // FIXME: get size from config + connected_peers = GNUNET_CONTAINER_multihashmap_create (64); requests_by_expiration = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN); read_index_list (); dsh = GNUNET_DATASTORE_connect (cfg, diff --git a/src/include/gnunet_container_lib.h b/src/include/gnunet_container_lib.h index b376af3c8..9756d0751 100644 --- a/src/include/gnunet_container_lib.h +++ b/src/include/gnunet_container_lib.h @@ -472,6 +472,7 @@ enum GNUNET_CONTAINER_MultiHashMapOption GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST }; + /** * Iterator over HashCodes. * diff --git a/src/include/gnunet_core_service.h b/src/include/gnunet_core_service.h index b5dceca3a..ca02f9874 100644 --- a/src/include/gnunet_core_service.h +++ b/src/include/gnunet_core_service.h @@ -197,6 +197,7 @@ void GNUNET_CORE_disconnect (struct GNUNET_CORE_Handle *handle); /** * Function called with statistics about the given peer. * + * @param cls closure * @param peer identifies the peer * @param latency current latency estimate, "FOREVER" if we have been * disconnected @@ -239,6 +240,7 @@ typedef void * @param info function to call with the resulting configuration information * @param info_cls closure for info */ +// FIXME: should return handle for cancellation! void GNUNET_CORE_peer_configure (struct GNUNET_CORE_Handle *handle, const struct GNUNET_PeerIdentity *peer, -- 2.25.1