From: Christian Grothoff Date: Fri, 11 Mar 2011 16:23:52 +0000 (+0000) Subject: stuff X-Git-Tag: initial-import-from-subversion-38251~18994 X-Git-Url: https://git.librecmc.org/?a=commitdiff_plain;h=64821d4ae43b03b30de3dd136137598c0d5a2ab2;p=oweals%2Fgnunet.git stuff --- diff --git a/src/fs/gnunet-service-fs_new.c b/src/fs/gnunet-service-fs_new.c index 7ad1874f7..5e9fce754 100644 --- a/src/fs/gnunet-service-fs_new.c +++ b/src/fs/gnunet-service-fs_new.c @@ -228,9 +228,6 @@ GSF_test_get_load_too_high_ (uint32_t priority) } - - - /** * Handle P2P "PUT" message. * @@ -260,25 +257,6 @@ handle_p2p_put (void *cls, } -/** - * Decide with what weight we should forward the given - * request to the given peer. - * - * @param cp target peer - * @param pr request - */ -static void -plan (struct GSF_ConnectedPeer *cp, - struct GSF_PendingRequest *pr) -{ - GNUNET_CONTAINER_HeapCostType weight; - - weight = 0; - /* FIXME: calculate weight properly... */ - GSF_plan_add_ (cp, pr, weight); -} - - /** * We have a new request, consider forwarding it to the given * peer. @@ -296,7 +274,7 @@ consider_request_for_forwarding (void *cls, { struct GSF_PendingRequest *pr = cls; - plan (cp, pr); + GSF_plan_add_ (cp, pr); } @@ -466,7 +444,7 @@ consider_peer_for_forwarding (void *cls, { struct GSF_ConnectedPeer *cp = cls; - plan (cp, pr); + GSF_plan_add_ (cp, pr); return GNUNET_YES; } diff --git a/src/fs/gnunet-service-fs_pe.c b/src/fs/gnunet-service-fs_pe.c index db501b761..e7608653e 100644 --- a/src/fs/gnunet-service-fs_pe.c +++ b/src/fs/gnunet-service-fs_pe.c @@ -26,28 +26,130 @@ #include "platform.h" #include "gnunet-service-fs_cp.h" #include "gnunet-service-fs_pe.h" +#include "gnunet-service-fs_pr.h" /** - * Hash map from peer identities to GNUNET_CONTAINER_Heap's with - * pending requests as entries. + * Transmission plan for a peer. + */ +struct PeerPlan +{ + /** + * Heap with pending queries, smaller weights mean higher priority. + */ + struct GNUNET_CONTAINER_Heap *heap; + + /** + * Current transmission request handle. + */ + struct GSF_PeerTransmitHandle *pth; + + /** + * Peer for which this is the plan. + */ + struct GSF_ConnectedPeer *cp; + + /** + * Current task for executing the plan. + */ + GNUNET_SCHEDULER_TaskIdentifier task; +}; + + +/** + * Hash map from peer identities to PeerPlans. */ static struct GNUNET_CONTAINER_MultiHashMap *plans; /** - * Get the size of the request queue for the given peer. + * Figure out when and how to transmit to the given peer. + * + * @param cls the 'struct GSF_ConnectedPeer' for transmission + * @param tc scheduler context + */ +static void +schedule_peer_transmission (void *cls, + const struct GNUNET_SCHEDULER_TaskContext *tc); + + +/** + * Function called to get a message for transmission. * - * @param cp connected peer to query - * @return number of entries in this peer's request queue + * @param cls closure + * @param buf_size number of bytes available in buf + * @param buf where to copy the message, NULL on error (peer disconnect) + * @return number of bytes copied to 'buf', can be 0 (without indicating an error) */ -static struct GNUNET_CONTAINER_Heap * -get_heap (const struct GSF_ConnectedPeer *cp) +static size_t +transmit_message_callback (void *cls, + size_t buf_size, + void *buf) { - struct GNUNET_PeerIdentity id; + struct PeerPlan *pp = cls; + struct GSF_PendingRequest *pr; + size_t msize; - GSF_connected_peer_get_identity_ (cp, &id); - return GNUNET_CONTAINER_multihashmap_get (plans, - &id.hashPubKey); + if (NULL == buf) + { + /* failed, try again... */ + pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission, pp); + return 0; + } + pr = GNUNET_CONTAINER_heap_peek (pp->heap); + msize = GSF_pending_request_get_message_ (pr, buf_size, buf); + if (msize > buf_size) + { + /* buffer to small (message changed), try again */ + pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission, pp); + return 0; + } + /* remove from root, add again elsewhere... */ + GNUNET_assert (pr == GNUNET_CONTAINER_heap_remove_root (pp->heap)); + GSF_plan_add_ (pp->cp, pr); + return msize; +} + + +/** + * Figure out when and how to transmit to the given peer. + * + * @param cls the 'struct PeerPlan' + * @param tc scheduler context + */ +static void +schedule_peer_transmission (void *cls, + const struct GNUNET_SCHEDULER_TaskContext *tc) +{ + struct PeerPlan *pp = cls; + struct GSF_PendingRequest *pr; + size_t msize; + struct GNUNET_TIME_Relative delay; + + pp->task = GNUNET_SCHEDULER_NO_TASK; + if (NULL == pp->heap) + return; + if (0 == GNUNET_CONTAINER_heap_get_size (pp->heap)) + return; + GNUNET_assert (NULL == pp->pth); + pr = GNUNET_CONTAINER_heap_peek (pp->heap); + if (0) // FIXME: if (re)transmission should wait, wait... + { + delay = GNUNET_TIME_UNIT_SECONDS; + // FIXME + pp->task = GNUNET_SCHEDULER_add_delayed (delay, + &schedule_peer_transmission, + pp); + return; + } + msize = GSF_pending_request_get_message_ (pr, 0, NULL); + pp->pth = GSF_peer_transmit_ (pp->cp, + GNUNET_YES, + 0 /* FIXME: pr->priority? */, + GNUNET_TIME_UNIT_FOREVER_REL, + msize, + &transmit_message_callback, + pp); + GNUNET_assert (NULL != pp->pth); } @@ -56,31 +158,42 @@ get_heap (const struct GSF_ConnectedPeer *cp) * * @param cp peer with the entry * @param pr request with the entry - * @param weight determines position of the entry in the cp queue, - * lower weights are earlier in the queue */ void GSF_plan_add_ (const struct GSF_ConnectedPeer *cp, - struct GSF_PendingRequest *pr, - GNUNET_CONTAINER_HeapCostType weight) + struct GSF_PendingRequest *pr) { struct GNUNET_PeerIdentity id; - struct GNUNET_CONTAINER_Heap *h; - + struct PeerPlan *pp; + GNUNET_CONTAINER_HeapCostType weight; + GSF_connected_peer_get_identity_ (cp, &id); - h = GNUNET_CONTAINER_multihashmap_get (plans, - &id.hashPubKey); - if (NULL == h) + pp = GNUNET_CONTAINER_multihashmap_get (plans, + &id.hashPubKey); + if (NULL == pp) { - h = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN); + pp = GNUNET_malloc (sizeof (struct PeerPlan)); + pp->heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN); GNUNET_CONTAINER_multihashmap_put (plans, &id.hashPubKey, - h, + pp, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY); } - GNUNET_CONTAINER_heap_insert (h, + weight = 0; // FIXME: calculate real weight! + GNUNET_CONTAINER_heap_insert (pp->heap, pr, weight); + if (pp->pth != NULL) + { + if (pr != GNUNET_CONTAINER_heap_peek (pp->heap)) + return; + GSF_peer_transmit_cancel_ (pp->pth); + pp->pth = NULL; + } + if (GNUNET_SCHEDULER_NO_TASK != pp->task) + GNUNET_SCHEDULER_cancel (pp->task); + pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission, + pp); } @@ -94,15 +207,20 @@ void GSF_plan_notify_peer_disconnect_ (const struct GSF_ConnectedPeer *cp) { struct GNUNET_PeerIdentity id; - struct GNUNET_CONTAINER_Heap *h; + struct PeerPlan *pp; GSF_connected_peer_get_identity_ (cp, &id); - h = GNUNET_CONTAINER_multihashmap_get (plans, - &id.hashPubKey); + pp = GNUNET_CONTAINER_multihashmap_get (plans, + &id.hashPubKey); GNUNET_CONTAINER_multihashmap_remove (plans, &id.hashPubKey, - h); - GNUNET_CONTAINER_heap_destroy (h); + pp); + if (NULL != pp->pth) + GSF_peer_transmit_cancel_ (pp->pth); + if (GNUNET_SCHEDULER_NO_TASK != pp->task) + GNUNET_SCHEDULER_cancel (pp->task); + GNUNET_CONTAINER_heap_destroy (pp->heap); + GNUNET_free (pp); } @@ -152,11 +270,11 @@ find_request (void *cls, /** - * Remove the given request from all heaps. * + * Remove the given request from all heaps. * FIXME: O(n) -- inefficient! * * @param cls 'struct GSF_PendingRequest' to purge * @param key identity of the peer we're currently looking at (unused) - * @param value request heap for the given peer to search for the 'cls' + * @param value PeerPlan for the given peer to search for the 'cls' * @return GNUNET_OK (continue iteration) */ static int @@ -165,7 +283,8 @@ remove_request (void *cls, void *value) { const struct GSF_PendingRequest *pr = cls; - struct GNUNET_CONTAINER_Heap *h = value; + struct PeerPlan *pp = value; + struct GNUNET_CONTAINER_Heap *h = pp->heap; struct FindRequestClosure frc; frc.pr = pr; @@ -198,44 +317,6 @@ GSF_plan_notify_request_done_ (const struct GSF_PendingRequest *pr) } -/** - * Get the lowest-weight entry for the respective peer - * from the plan. Removes the entry from the plan's queue. - * - * @param cp connected peer to query for the next request - * @return NULL if the queue for this peer is empty - */ -struct GSF_PendingRequest * -GSF_plan_get_ (const struct GSF_ConnectedPeer *cp) -{ - struct GNUNET_CONTAINER_Heap *h; - - h = get_heap (cp); - if (NULL == h) - return NULL; - return GNUNET_CONTAINER_heap_remove_root (h); -} - - -/** - * Get the size of the request queue for the given peer. - * - * @param cp connected peer to query - * @return number of entries in this peer's request queue - */ -unsigned int -GSF_plan_size_ (const struct GSF_ConnectedPeer *cp) -{ - struct GNUNET_CONTAINER_Heap *h; - - h = get_heap (cp); - if (NULL == h) - return 0; - return GNUNET_CONTAINER_heap_get_size (h); -} - - - /** * Initialize plan subsystem. */ diff --git a/src/fs/gnunet-service-fs_pe.h b/src/fs/gnunet-service-fs_pe.h index d70001356..14e9dec2e 100644 --- a/src/fs/gnunet-service-fs_pe.h +++ b/src/fs/gnunet-service-fs_pe.h @@ -39,8 +39,7 @@ */ void GSF_plan_add_ (const struct GSF_ConnectedPeer *cp, - struct GSF_PendingRequest *pr, - GNUNET_CONTAINER_HeapCostType weight); + struct GSF_PendingRequest *pr); /** @@ -63,27 +62,6 @@ void GSF_plan_notify_request_done_ (const struct GSF_PendingRequest *pr); -/** - * Get the lowest-weight entry for the respective peer - * from the plan. Removes the entry from the plan's queue. - * - * @param cp connected peer to query for the next request - * @return NULL if the queue for this peer is empty - */ -struct GSF_PendingRequest * -GSF_plan_get_ (const struct GSF_ConnectedPeer *cp); - - -/** - * Get the size of the request queue for the given peer. - * - * @param cp connected peer to query - * @return number of entries in this peer's request queue - */ -unsigned int -GSF_plan_size_ (const struct GSF_ConnectedPeer *cp); - - /** * Initialize plan subsystem. */ diff --git a/src/fs/gnunet-service-fs_pr.c b/src/fs/gnunet-service-fs_pr.c index 0fdcd0cf1..ff2f7a3a3 100644 --- a/src/fs/gnunet-service-fs_pr.c +++ b/src/fs/gnunet-service-fs_pr.c @@ -423,14 +423,12 @@ GSF_pending_request_update_ (struct GSF_PendingRequest *pr, * transmission to other peers (or at least determine its size). * * @param pr request to generate the message for - * @param do_route are we routing the reply * @param buf_size number of bytes available in buf * @param buf where to copy the message (can be NULL) * @return number of bytes needed (if > buf_size) or used */ size_t GSF_pending_request_get_message_ (struct GSF_PendingRequest *pr, - int do_route, size_t buf_size, void *buf) { @@ -444,10 +442,13 @@ GSF_pending_request_get_message_ (struct GSF_PendingRequest *pr, size_t bf_size; struct GNUNET_TIME_Absolute now; int64_t ttl; + int do_route; + k = 0; bm = 0; - if (GNUNET_YES != do_route) + do_route = (0 == (pr->public_data.options & GSF_PRO_FORWARD_ONLY)); + if (! do_route) { bm |= GET_MESSAGE_BIT_RETURN_TO; k++; @@ -471,7 +472,7 @@ GSF_pending_request_get_message_ (struct GSF_PendingRequest *pr, gm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_GET); gm->header.size = htons (msize); gm->type = htonl (pr->public_data.type); - if (GNUNET_YES == do_route) + if (do_route) prio = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, pr->public_data.priority + 1); else @@ -486,7 +487,7 @@ GSF_pending_request_get_message_ (struct GSF_PendingRequest *pr, gm->query = pr->public_data.query; ext = (GNUNET_HashCode*) &gm[1]; k = 0; - if (GNUNET_YES != do_route) + if (! do_route) GNUNET_PEER_resolve (pr->sender_pid, (struct GNUNET_PeerIdentity*) &ext[k++]); if (GNUNET_BLOCK_TYPE_FS_SBLOCK == pr->public_data.type) diff --git a/src/fs/gnunet-service-fs_pr.h b/src/fs/gnunet-service-fs_pr.h index 885947295..9632df015 100644 --- a/src/fs/gnunet-service-fs_pr.h +++ b/src/fs/gnunet-service-fs_pr.h @@ -234,14 +234,12 @@ GSF_pending_request_get_data_ (struct GSF_PendingRequest *pr); * transmission to other peers (or at least determine its size). * * @param pr request to generate the message for - * @param do_route are we routing the reply * @param buf_size number of bytes available in buf * @param buf where to copy the message (can be NULL) * @return number of bytes needed (if buf_size too small) or used */ size_t GSF_pending_request_get_message_ (struct GSF_PendingRequest *pr, - int do_route, size_t buf_size, void *buf);