From 41700b6cb0345080513d82bdbe2dbe33c8654491 Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Thu, 21 Apr 2011 15:14:14 +0000 Subject: [PATCH] wip --- src/fs/fs_download.c | 2 +- src/fs/gnunet-service-fs_cp.c | 299 +++++++++++++++++----------- src/fs/gnunet-service-fs_lc.c | 46 ++++- src/fs/gnunet-service-fs_pe.c | 73 ++++--- src/fs/gnunet-service-fs_pr.c | 2 +- src/fs/test_gnunet_service_fs_p2p.c | 4 +- 6 files changed, 257 insertions(+), 169 deletions(-) diff --git a/src/fs/fs_download.c b/src/fs/fs_download.c index 785803edf..8192b8c1f 100644 --- a/src/fs/fs_download.c +++ b/src/fs/fs_download.c @@ -31,7 +31,7 @@ #include "fs.h" #include "fs_tree.h" -#define DEBUG_DOWNLOAD GNUNET_YES +#define DEBUG_DOWNLOAD GNUNET_NO /** * Determine if the given download (options and meta data) should cause diff --git a/src/fs/gnunet-service-fs_cp.c b/src/fs/gnunet-service-fs_cp.c index 5aba83298..2522cbe7b 100644 --- a/src/fs/gnunet-service-fs_cp.c +++ b/src/fs/gnunet-service-fs_cp.c @@ -104,6 +104,11 @@ struct GSF_PeerTransmitHandle */ int is_query; + /** + * Did we get a reservation already? + */ + int was_reserved; + /** * Priority of this request. */ @@ -112,6 +117,30 @@ struct GSF_PeerTransmitHandle }; +/** + * Information per peer and request. + */ +struct PeerRequest +{ + + /** + * Handle to generic request. + */ + struct GSF_PendingRequest *pr; + + /** + * Handle to specific peer. + */ + struct GSF_ConnectedPeer *cp; + + /** + * Task for asynchronous stopping of this request. + */ + GNUNET_SCHEDULER_TaskIdentifier kill_task; + +}; + + /** * A connected peer. */ @@ -162,7 +191,7 @@ struct GSF_ConnectedPeer GNUNET_SCHEDULER_TaskIdentifier irc_delay_task; /** - * Active requests from this neighbour. + * Active requests from this neighbour, map of query to 'struct PeerRequest'. */ struct GNUNET_CONTAINER_MultiHashMap *request_map; @@ -288,6 +317,92 @@ GSF_get_peer_performance_data_ (struct GSF_ConnectedPeer *cp) } +/** + * Core is ready to transmit to a peer, get the message. + * + * @param cls the 'struct GSF_PeerTransmitHandle' of the message + * @param size number of bytes core is willing to take + * @param buf where to copy the message + * @return number of bytes copied to buf + */ +static size_t +peer_transmit_ready_cb (void *cls, + size_t size, + void *buf); + + + + +/** + * Function called by core upon success or failure of our bandwidth reservation request. + * + * @param cls the 'struct GSF_ConnectedPeer' of the peer for which we made the request + * @param peer identifies the peer + * @param bandwidth_out available amount of outbound bandwidth + * @param amount set to the amount that was actually reserved or unreserved; + * either the full requested amount or zero (no partial reservations) + * @param res_delay if the reservation could not be satisfied (amount was 0), how + * long should the client wait until re-trying? + * @param preference current traffic preference for the given peer + */ +static void +core_reserve_callback (void *cls, + const struct GNUNET_PeerIdentity *peer, + struct GNUNET_BANDWIDTH_Value32NBO bandwidth_out, + int32_t amount, + struct GNUNET_TIME_Relative res_delay, + uint64_t preference); + + +/** + * If ready (bandwidth reserved), try to schedule transmission via + * core for the given handle. + * + * @param pth transmission handle to schedule + */ +static void +schedule_transmission (struct GSF_PeerTransmitHandle *pth) +{ + struct GSF_ConnectedPeer *cp; + struct GNUNET_PeerIdentity target; + uint64_t ip; + + if (NULL != pth->cth) + return; /* already done */ + cp = pth->cp; + GNUNET_PEER_resolve (cp->ppd.pid, + &target); + if ( (GNUNET_YES == pth->is_query) && + (GNUNET_YES != pth->was_reserved) ) + { + /* query, need reservation */ + if (GNUNET_YES != cp->did_reserve) + return; /* not ready */ + cp->did_reserve = GNUNET_NO; + /* reservation already done! */ + pth->was_reserved = GNUNET_YES; + ip = cp->inc_preference; + cp->inc_preference = 0; + cp->irc = GNUNET_CORE_peer_change_preference (GSF_core, + &target, + GNUNET_TIME_UNIT_FOREVER_REL, + GNUNET_BANDWIDTH_VALUE_MAX, + DBLOCK_SIZE, + ip, + &core_reserve_callback, + cp); + } + pth->cth = GNUNET_CORE_notify_transmit_ready (GSF_core, + GNUNET_YES, + pth->priority, + GNUNET_TIME_absolute_get_remaining (pth->timeout), + &target, + pth->size, + &peer_transmit_ready_cb, + pth); +} + + /** * Core is ready to transmit to a peer, get the message. * @@ -305,6 +420,7 @@ peer_transmit_ready_cb (void *cls, struct GSF_ConnectedPeer *cp; size_t ret; + pth->cth = NULL; if (pth->timeout_task != GNUNET_SCHEDULER_NO_TASK) { GNUNET_SCHEDULER_cancel (pth->timeout_task); @@ -327,32 +443,13 @@ peer_transmit_ready_cb (void *cls, GNUNET_TIME_absolute_get_duration (pth->transmission_request_start_time).rel_value); ret = pth->gmc (pth->gmc_cls, size, buf); - GNUNET_free (pth); + GNUNET_free (pth); + for (pth = cp->pth_head; pth != NULL; pth = pth->next) + schedule_transmission (pth); return ret; } -/** - * Function called by core upon success or failure of our bandwidth reservation request. - * - * @param cls the 'struct GSF_ConnectedPeer' of the peer for which we made the request - * @param peer identifies the peer - * @param bandwidth_out available amount of outbound bandwidth - * @param amount set to the amount that was actually reserved or unreserved; - * either the full requested amount or zero (no partial reservations) - * @param res_delay if the reservation could not be satisfied (amount was 0), how - * long should the client wait until re-trying? - * @param preference current traffic preference for the given peer - */ -static void -core_reserve_callback (void *cls, - const struct GNUNET_PeerIdentity *peer, - struct GNUNET_BANDWIDTH_Value32NBO bandwidth_out, - int32_t amount, - struct GNUNET_TIME_Relative res_delay, - uint64_t preference); - - /** * (re)try to reserve bandwidth from the given peer. * @@ -606,6 +703,36 @@ copy_reply (void *cls, } +/** + * Free the given client request. + * + * @param cls the client request to free + * @param tc task context + */ +static void +peer_request_destroy (void *cls, + const struct GNUNET_SCHEDULER_TaskContext *tc) +{ + struct PeerRequest *peerreq = cls; + struct GSF_PendingRequest *pr = peerreq->pr; + struct GSF_ConnectedPeer *cp = peerreq->cp; + struct GSF_PendingRequestData *prd; + + peerreq->kill_task = GNUNET_SCHEDULER_NO_TASK; + prd = GSF_pending_request_get_data_ (pr); + GNUNET_STATISTICS_update (GSF_stats, + gettext_noop ("# P2P searches active"), + -1, + GNUNET_NO); + GNUNET_break (GNUNET_OK == + GNUNET_CONTAINER_multihashmap_remove (cp->request_map, + &prd->query, + peerreq)); + GSF_pending_request_cancel_ (pr); + GNUNET_free (peerreq); +} + + /** * Handle a reply to a pending request. Also called if a request * expires (then with data == NULL). The handler may be called @@ -614,8 +741,7 @@ copy_reply (void *cls, * and will also not be called anymore after a call signalling * expiration. * - * @param cls 'struct GSF_ConnectedPeer' of the peer that would - * have liked an answer to the request + * @param cls 'struct PeerRequest' this is an answer for * @param eval evaluation of the result * @param pr handle to the original pending request * @param expiration when does 'data' expire? @@ -632,12 +758,14 @@ handle_p2p_reply (void *cls, const void *data, size_t data_len) { - struct GSF_ConnectedPeer *cp = cls; + struct PeerRequest *peerreq = cls; + struct GSF_ConnectedPeer *cp = peerreq->cp; struct GSF_PendingRequestData *prd; struct PutMessage *pm; size_t msize; GNUNET_assert (data_len < GNUNET_SERVER_MAX_MESSAGE_SIZE); + GNUNET_assert (peerreq->pr == pr); prd = GSF_pending_request_get_data_ (pr); if (NULL == data) { @@ -648,7 +776,8 @@ handle_p2p_reply (void *cls, GNUNET_break (GNUNET_OK == GNUNET_CONTAINER_multihashmap_remove (cp->request_map, &prd->query, - pr)); + peerreq)); + GNUNET_free (peerreq); return; } GNUNET_break (type != GNUNET_BLOCK_TYPE_ANY); @@ -687,15 +816,8 @@ handle_p2p_reply (void *cls, pm); if (eval != GNUNET_BLOCK_EVALUATION_OK_LAST) return; - GNUNET_STATISTICS_update (GSF_stats, - gettext_noop ("# P2P searches active"), - -1, - GNUNET_NO); - GNUNET_break (GNUNET_OK == - GNUNET_CONTAINER_multihashmap_remove (cp->request_map, - &prd->query, - pr)); - GSF_pending_request_cancel_ (pr); + peerreq->kill_task = GNUNET_SCHEDULER_add_now (&peer_request_destroy, + peerreq); } @@ -844,6 +966,7 @@ struct GSF_PendingRequest * GSF_handle_p2p_query_ (const struct GNUNET_PeerIdentity *other, const struct GNUNET_MessageHeader *message) { + struct PeerRequest *peerreq; struct GSF_PendingRequest *pr; struct GSF_PendingRequestData *prd; struct GSF_ConnectedPeer *cp; @@ -1002,10 +1125,11 @@ GSF_handle_p2p_query_ (const struct GNUNET_PeerIdentity *other, ttl -= ttl_decrement; /* test if the request already exists */ - pr = GNUNET_CONTAINER_multihashmap_get (cp->request_map, - &gm->query); - if (pr != NULL) + peerreq = GNUNET_CONTAINER_multihashmap_get (cp->request_map, + &gm->query); + if (peerreq != NULL) { + pr = peerreq->pr; prd = GSF_pending_request_get_data_ (pr); if ( (prd->type == type) && ( (type != GNUNET_BLOCK_TYPE_FS_SBLOCK) || @@ -1034,10 +1158,15 @@ GSF_handle_p2p_query_ (const struct GNUNET_PeerIdentity *other, GNUNET_assert (GNUNET_YES == GNUNET_CONTAINER_multihashmap_remove (cp->request_map, &gm->query, - pr)); + peerreq)); + if (peerreq->kill_task != GNUNET_SCHEDULER_NO_TASK) + GNUNET_SCHEDULER_cancel (peerreq->kill_task); + GNUNET_free (peerreq); } } + peerreq = GNUNET_malloc (sizeof (struct PeerRequest)); + peerreq->cp = cp; pr = GSF_pending_request_create_ (options, type, &gm->query, @@ -1052,11 +1181,12 @@ GSF_handle_p2p_query_ (const struct GNUNET_PeerIdentity *other, spid, NULL, 0, /* replies_seen */ &handle_p2p_reply, - cp); + peerreq); + peerreq->pr = pr; GNUNET_break (GNUNET_OK == GNUNET_CONTAINER_multihashmap_put (cp->request_map, &gm->query, - pr, + peerreq, GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE)); GNUNET_STATISTICS_update (GSF_stats, gettext_noop ("# P2P searches received"), @@ -1131,9 +1261,6 @@ GSF_peer_transmit_ (struct GSF_ConnectedPeer *cp, struct GSF_PeerTransmitHandle *pth; struct GSF_PeerTransmitHandle *pos; struct GSF_PeerTransmitHandle *prev; - struct GNUNET_PeerIdentity target; - uint64_t ip; - int is_ready; pth = GNUNET_malloc (sizeof (struct GSF_PeerTransmitHandle)); pth->transmission_request_start_time = GNUNET_TIME_absolute_get (); @@ -1162,79 +1289,15 @@ GSF_peer_transmit_ (struct GSF_ConnectedPeer *cp, cp->pth_tail, prev, pth); - GNUNET_PEER_resolve (cp->ppd.pid, - &target); if (GNUNET_YES == is_query) - { - /* query, need reservation */ - cp->ppd.pending_queries++; - if (GNUNET_YES == cp->did_reserve) - { - cp->did_reserve = GNUNET_NO; - /* reservation already done! */ - is_ready = GNUNET_YES; - ip = cp->inc_preference; - cp->inc_preference = 0; - cp->irc = GNUNET_CORE_peer_change_preference (GSF_core, - &target, - GNUNET_TIME_UNIT_FOREVER_REL, - GNUNET_BANDWIDTH_VALUE_MAX, - DBLOCK_SIZE, - ip, - &core_reserve_callback, - cp); - } - else - { - /* still waiting for reservation */ - is_ready = GNUNET_NO; - } - } + cp->ppd.pending_queries++; else if (GNUNET_NO == is_query) - { - /* no reservation needed for content */ - cp->ppd.pending_replies++; - is_ready = GNUNET_YES; - } - else - { - /* not a query or content, no reservation needed */ - is_ready = GNUNET_YES; - } - if (is_ready) - { - pth->cth = GNUNET_CORE_notify_transmit_ready (GSF_core, - GNUNET_YES, - priority, - timeout, - &target, - size, - &peer_transmit_ready_cb, + cp->ppd.pending_replies++; + + pth->timeout_task = GNUNET_SCHEDULER_add_delayed (timeout, + &peer_transmit_timeout, pth); - /* pth->cth could be NULL here, that's OK, we'll try again - later... */ - } - else - { -#if DEBUG_FS - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Not ready to ask for transmission to `%s'\n", - GNUNET_i2s (&target)); -#endif - } - if (pth->cth == NULL) - { -#if DEBUG_FS - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "No transmission task scheduled, creating timeout task (%llu ms)\n", - (unsigned long long) timeout.rel_value); -#endif - /* if we're waiting for reservation OR if we could not do notify_transmit_ready, - install a timeout task to be on the safe side */ - pth->timeout_task = GNUNET_SCHEDULER_add_delayed (timeout, - &peer_transmit_timeout, - pth); - } + schedule_transmission (pth); return pth; } @@ -1364,9 +1427,13 @@ cancel_pending_request (void *cls, const GNUNET_HashCode *query, void *value) { - struct GSF_PendingRequest *pr = value; + struct PeerRequest *peerreq = cls; + struct GSF_PendingRequest *pr = peerreq->pr; GSF_pending_request_cancel_ (pr); + if (peerreq->kill_task != GNUNET_SCHEDULER_NO_TASK) + GNUNET_SCHEDULER_cancel (peerreq->kill_task); + GNUNET_free (peerreq); return GNUNET_OK; } diff --git a/src/fs/gnunet-service-fs_lc.c b/src/fs/gnunet-service-fs_lc.c index 58a1a0933..15dd4897a 100644 --- a/src/fs/gnunet-service-fs_lc.c +++ b/src/fs/gnunet-service-fs_lc.c @@ -58,6 +58,11 @@ struct ClientRequest */ struct GSF_LocalClient *lc; + /** + * Task scheduled to destroy the request. + */ + GNUNET_SCHEDULER_TaskIdentifier kill_task; + }; @@ -179,6 +184,33 @@ GSF_local_client_lookup_ (struct GNUNET_SERVER_Client *client) } +/** + * Free the given client request. + * + * @param cls the client request to free + * @param tc task context + */ +static void +client_request_destroy (void *cls, + const struct GNUNET_SCHEDULER_TaskContext *tc) +{ + struct ClientRequest *cr = cls; + struct GSF_LocalClient *lc; + + cr->kill_task = GNUNET_SCHEDULER_NO_TASK; + lc = cr->lc; + GNUNET_CONTAINER_DLL_remove (lc->cr_head, + lc->cr_tail, + cr); + GSF_pending_request_cancel_ (cr->pr); + GNUNET_STATISTICS_update (GSF_stats, + gettext_noop ("# client searches active"), + - 1, + GNUNET_NO); + GNUNET_free (cr); +} + + /** * Handle a reply to a pending request. Also called if a request * expires (then with data == NULL). The handler may be called @@ -246,16 +278,8 @@ client_response_handler (void *cls, #endif if (eval != GNUNET_BLOCK_EVALUATION_OK_LAST) return; - GNUNET_CONTAINER_DLL_remove (lc->cr_head, - lc->cr_tail, - cr); - GSF_pending_request_cancel_ (cr->pr); - GNUNET_STATISTICS_update (GSF_stats, - gettext_noop ("# client searches active"), - - 1, - GNUNET_NO); - GNUNET_free (cr); - + cr->kill_task = GNUNET_SCHEDULER_add_now (&client_request_destroy, + cr); } @@ -489,6 +513,8 @@ GSF_client_disconnect_handler_ (void *cls, gettext_noop ("# client searches active"), - 1, GNUNET_NO); + if (GNUNET_SCHEDULER_NO_TASK != cr->kill_task) + GNUNET_SCHEDULER_cancel (cr->kill_task); GNUNET_free (cr); } while (NULL != (res = pos->res_head)) diff --git a/src/fs/gnunet-service-fs_pe.c b/src/fs/gnunet-service-fs_pe.c index b3f46cf45..28036150f 100644 --- a/src/fs/gnunet-service-fs_pe.c +++ b/src/fs/gnunet-service-fs_pe.c @@ -120,6 +120,17 @@ struct PeerPlan static struct GNUNET_CONTAINER_MultiHashMap *plans; +/** + * Figure out when and how to transmit to the given peer. + * + * @param cls the 'struct GSF_ConnectedPeer' for transmission + * @param tc scheduler context + */ +static void +schedule_peer_transmission (void *cls, + const struct GNUNET_SCHEDULER_TaskContext *tc); + + /** * Insert the given request plan into the heap with the appropriate weight. * @@ -156,20 +167,12 @@ plan (struct PeerPlan *pp, rp->hn = GNUNET_CONTAINER_heap_insert (pp->delay_heap, rp, rp->earliest_transmission.abs_value); + if (GNUNET_SCHEDULER_NO_TASK != pp->task) + GNUNET_SCHEDULER_cancel (pp->task); + pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission, pp); } -/** - * Figure out when and how to transmit to the given peer. - * - * @param cls the 'struct GSF_ConnectedPeer' for transmission - * @param tc scheduler context - */ -static void -schedule_peer_transmission (void *cls, - const struct GNUNET_SCHEDULER_TaskContext *tc); - - /** * Function called to get a message for transmission. * @@ -238,7 +241,11 @@ schedule_peer_transmission (void *cls, size_t msize; pp->task = GNUNET_SCHEDULER_NO_TASK; - GNUNET_assert (NULL == pp->pth); + if (pp->pth != NULL) + { + GSF_peer_transmit_cancel_ (pp->pth); + pp->pth = NULL; + } /* move ready requests to priority queue */ while ( (NULL != (rp = GNUNET_CONTAINER_heap_peek (pp->delay_heap))) && (GNUNET_TIME_absolute_get_remaining (rp->earliest_transmission).rel_value == 0) ) @@ -253,7 +260,20 @@ schedule_peer_transmission (void *cls, /* priority heap (still) empty, check for delay... */ rp = GNUNET_CONTAINER_heap_peek (pp->delay_heap); if (NULL == rp) - return; /* both queues empty */ + { +#if DEBUG_FS + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "No active requests for plan %p.\n", + pp); +#endif + return; /* both queues empty */ + } +#if DEBUG_FS + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Sleeping for %llu ms before retrying requests on plan %p.\n", + (unsigned long long) GNUNET_TIME_absolute_get_remaining (rp->earliest_transmission).rel_value, + pp); +#endif pp->task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_absolute_get_remaining (rp->earliest_transmission), &schedule_peer_transmission, pp); @@ -267,7 +287,7 @@ schedule_peer_transmission (void *cls, rp); #endif GNUNET_assert (NULL != rp); - msize = GSF_pending_request_get_message_ (rp->pr, 0, NULL); + msize = GSF_pending_request_get_message_ (rp->pr, 0, NULL); pp->pth = GSF_peer_transmit_ (pp->cp, GNUNET_YES, rp->priority, @@ -322,31 +342,6 @@ GSF_plan_add_ (struct GSF_ConnectedPeer *cp, prd->rp_tail, rp); plan (pp, rp); - if (0 == GNUNET_CONTAINER_heap_get_size (pp->priority_heap)) - { - /* no request that should be done immediately, figure out delay */ - if (rp != GNUNET_CONTAINER_heap_peek (pp->delay_heap)) - return; /* did not change delay heap top, no need to do anything */ - GNUNET_assert (NULL == pp->pth); - if (GNUNET_SCHEDULER_NO_TASK != pp->task) - GNUNET_SCHEDULER_cancel (pp->task); - pp->task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_absolute_get_remaining (rp->earliest_transmission), - &schedule_peer_transmission, - pp); - return; - } - - if (pp->pth != NULL) - { - if (rp != GNUNET_CONTAINER_heap_peek (pp->priority_heap)) - return; /* did not change priority heap top, no need to do anyhing */ - GSF_peer_transmit_cancel_ (pp->pth); - pp->pth = NULL; - } - if (GNUNET_SCHEDULER_NO_TASK != pp->task) - GNUNET_SCHEDULER_cancel (pp->task); - pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission, - pp); } diff --git a/src/fs/gnunet-service-fs_pr.c b/src/fs/gnunet-service-fs_pr.c index f327d9b4b..7406bed0f 100644 --- a/src/fs/gnunet-service-fs_pr.c +++ b/src/fs/gnunet-service-fs_pr.c @@ -1008,7 +1008,7 @@ GSF_dht_lookup_ (struct GSF_PendingRequest *pr) */ static void process_local_reply (void *cls, - const GNUNET_HashCode * key, + const GNUNET_HashCode *key, size_t size, const void *data, enum GNUNET_BLOCK_Type type, diff --git a/src/fs/test_gnunet_service_fs_p2p.c b/src/fs/test_gnunet_service_fs_p2p.c index 3bb808c48..91cfda0eb 100644 --- a/src/fs/test_gnunet_service_fs_p2p.c +++ b/src/fs/test_gnunet_service_fs_p2p.c @@ -34,9 +34,9 @@ #define FILESIZE (1024 * 1024 * 2) /** - * How long until we give up on transmitting the message? + * How long until we give up on the download? */ -#define TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 600) +#define TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 60) #define NUM_DAEMONS 2 -- 2.25.1