From 07b5d5dad5ba589cde1c97e574de84e5c7d5d696 Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Wed, 2 Feb 2011 15:34:14 +0000 Subject: [PATCH] cleaning --- src/fs/gnunet-service-fs_cp.c | 386 +++++++++++++++++++++++------ src/fs/gnunet-service-fs_cp.h | 60 +++-- src/fs/gnunet-service-fs_pe.h | 19 ++ src/include/gnunet_bandwidth_lib.h | 6 + src/include/gnunet_core_service.h | 2 +- 5 files changed, 367 insertions(+), 106 deletions(-) diff --git a/src/fs/gnunet-service-fs_cp.c b/src/fs/gnunet-service-fs_cp.c index 48e850cab..a3994adf5 100644 --- a/src/fs/gnunet-service-fs_cp.c +++ b/src/fs/gnunet-service-fs_cp.c @@ -121,13 +121,14 @@ struct GSF_ConnectedPeer /** * Context of our GNUNET_CORE_peer_change_preference call (or NULL). + * NULL if we have successfully reserved 32k, otherwise non-NULL. */ struct GNUNET_CORE_InformationRequestContext *irc; /** * ID of delay task for scheduling transmission. */ - GNUNET_SCHEDULER_TaskIdentifier delayed_transmission_request_task; + GNUNET_SCHEDULER_TaskIdentifier delayed_transmission_request_task; // FIXME: unused! /** * Increase in traffic preference still to be submitted @@ -234,7 +235,111 @@ static void update_atsi (struct GSF_ConnectedPeer *cp, const struct GNUNET_TRANSPORT_ATS_Information *atsi) { - // FIXME: merge atsi into cp's performance data! + struct GNUNET_TIME_Relative latency; + + latency = get_latency (atsi); + GNUNET_LOAD_value_set_decline (cp->transmission_delay, + latency); + /* LATER: merge atsi into cp's performance data (if we ever care...) */ +} + + +/** + * Core is ready to transmit to a peer, get the message. + * + * @param cls the 'struct GSF_PeerTransmitHandle' of the message + * @param size number of bytes core is willing to take + * @param buf where to copy the message + * @return number of bytes copied to buf + */ +static size_t +peer_transmit_ready_cb (void *cls, + size_t size, + void *buf) +{ + struct GSF_PeerTransmitHandle *pth = cls; + struct GSF_ConnectedPeer *cp; + size_t ret; + + if (pth->timeout_task != GNUNET_SCHEDULER_NO_TASK) + { + GNUNET_SCHEDULER_cancel (pth->timeout_task); + pth->timeout_task = GNUNET_SCHEDULER_NO_TASK; + } + cp = pth->cp; + GNUNET_CONTAINER_DLL_remove (cp->pth_head, + cp->pth_tail, + pth); + if (pth->is_query) + { + cp->ppd.last_request_times[(cp->last_request_times_off++) % MAX_QUEUE_PER_PEER] = GNUNET_TIME_absolute_get (); + GNUNET_assert (0 < cp->ppd.pending_queries--); + } + else + { + GNUNET_assert (0 < cp->ppd.pending_replies--); + } + GNUNET_LOAD_update (cp->ppd.transmission_delay, + GNUNET_TIME_absolute_get_duration (pth->request_start_time).rel_value); + ret = pth->gmc (pth->gmc_cls, + 0, NULL); + GNUNET_free (pth); + return ret; +} + + +/** + * Function called by core upon success or failure of our bandwidth reservation request. + * + * @param cls the 'struct GSF_ConnectedPeer' of the peer for which we made the request + * @param peer identifies the peer + * @param bandwidth_out available amount of outbound bandwidth + * @param amount set to the amount that was actually reserved or unreserved; + * either the full requested amount or zero (no partial reservations) + * @param preference current traffic preference for the given peer + */ +static void +core_reserve_callback (void *cls, + const struct GNUNET_PeerIdentity * peer, + struct GNUNET_BANDWIDTH_Value32NBO bandwidth_out, + int amount, + uint64_t preference) +{ + struct GSF_ConnectedPeer *cp = cls; + uint64_t ip; + + cp->irc = NULL; + if (0 == amount) + { + /* failed; retry! (how did we get here!?) */ + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + _("Failed to reserve bandwidth to peer `%s'\n"), + GNUNET_i2s (peer)); + ip = cp->inc_preference; + cp->inc_preference = 0; + cp->irc = GNUNET_CORE_peer_change_preference (core, + peer, + GNUNET_TIME_UNIT_FOREVER_REL, + GNUNET_BANDWIDTH_VALUE_MAX, + GNUNET_FS_DBLOCK_SIZE, + ip, + &core_reserve_callback, + cp); + return; + } + pth = cp->pth_head; + if ( (NULL != pth) && + (NULL == pth->cth) ) + { + /* reservation success, try transmission now! */ + pth->cth = GNUNET_CORE_notify_transmit_ready (core, + priority, + GNUNET_TIME_absolute_get_remaining (pth->timeout), + &target, + size, + &peer_transmit_ready_cb, + pth); + } } @@ -258,6 +363,15 @@ GSF_peer_connect_handler_ (const struct GNUNET_PeerIdentity *peer, cp = GNUNET_malloc (sizeof (struct GSF_ConnectedPeer)); cp->transmission_delay = GNUNET_LOAD_value_init (latency); cp->pid = GNUNET_PEER_intern (peer); + cp->transmission_delay = GNUNET_LOAD_value_init (0); + cp->irc = GNUNET_CORE_peer_change_preference (core, + peer, + GNUNET_TIME_UNIT_FOREVER_REL, + GNUNET_BANDWIDTH_VALUE_MAX, + GNUNET_FS_DBLOCK_SIZE, + 0, + &core_reserve_callback, + cp); fn = get_trust_filename (peer); if ((GNUNET_DISK_file_test (fn) == GNUNET_YES) && (sizeof (trust) == GNUNET_DISK_fn_read (fn, &trust, sizeof (trust)))) @@ -269,40 +383,42 @@ GSF_peer_connect_handler_ (const struct GNUNET_PeerIdentity *peer, cp, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); update_atsi (cp, atsi); - - - // FIXME: notify plan & migration about new peer! - + GSF_plan_notify_new_peer_ (cp); return cp; } /** - * Core is ready to transmit to a peer, get the message. + * Handle P2P "MIGRATION_STOP" message. * - * @param cls the 'struct GSF_PeerTransmitHandle' of the message - * @param size number of bytes core is willing to take - * @param buf where to copy the message - * @return number of bytes copied to buf + * @param cls closure, always NULL + * @param other the other peer involved (sender or receiver, NULL + * for loopback messages where we are both sender and receiver) + * @param message the actual message + * @param atsi performance information + * @return GNUNET_OK to keep the connection open, + * GNUNET_SYSERR to close it (signal serious error) */ -static size_t -peer_transmit_ready_cb (void *cls, - size_t size, - void *buf) +int +GSF_handle_p2p_migration_stop_ (void *cls, + const struct GNUNET_PeerIdentity *other, + const struct GNUNET_MessageHeader *message, + const struct GNUNET_TRANSPORT_ATS_Information *atsi) { - struct GSF_PeerTransmitHandle *pth = cls; - struct GSF_ConnectedPeer *cp; - size_t ret; + struct GSF_ConnectedPeer *cp; + const struct MigrationStopMessage *msm; - cp = pth->cp; - GNUNET_CONTAINER_DLL_remove (cp->pth_head, - cp->pth_tail, - pth); - // FIXME: update 'cp' counters! - ret = pth->gmc (pth->gmc_cls, - 0, NULL); - GNUNET_free (pth); - return ret; + msm = (const struct MigrationStopMessage*) message; + cp = GNUNET_CONTAINER_multihashmap_get (cp_map, + &other->hashPubKey); + if (cp == NULL) + { + GNUNET_break (0); + return GNUNET_OK; + } + cp->ppd.migration_blocked = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_relative_ntoh (msm->duration)); + update_atsi (cp, atsi); + return GNUNET_OK; } @@ -325,7 +441,12 @@ peer_transmit_timeout (void *cls, GNUNET_CONTAINER_DLL_remove (cp->pth_head, cp->pth_tail, pth); - // FIXME: update 'cp' counters! + if (pth->is_query) + GNUNET_assert (0 < cp->ppd.pending_queries--); + else + GNUNET_assert (0 < cp->ppd.pending_replies--); + GNUNET_LOAD_update (cp->ppd.transmission_delay, + UINT64_MAX); pth->gmc (pth->gmc_cls, 0, NULL); GNUNET_free (pth); @@ -360,6 +481,8 @@ GSF_peer_transmit_ (struct GSF_ConnectedPeer *peer, struct GSF_PeerTransmitHandle *pos; struct GSF_PeerTransmitHandle *prev; struct GNUNET_PeerIdentity target; + uint64_t ip; + int is_ready; cp = GNUNET_CONTAINER_multihashmap_get (cp_map, &peer->hashPubKey); @@ -393,19 +516,55 @@ GSF_peer_transmit_ (struct GSF_ConnectedPeer *peer, pth); GNUNET_PEER_resolve (cp->pid, &target); - pth->cth = GNUNET_CORE_notify_transmit_ready (core, - priority, - timeout, - &target, - size, - &peer_transmit_ready_cb, - pth); - /* pth->cth could be NULL here, that's OK, we'll try again - later... */ + if (is_query) + { + /* query, need reservation */ + if (NULL == cp->irc) + { + /* reservation already done! */ + is_ready = GNUNET_YES; + ip = cp->inc_preference; + cp->inc_preference = 0; + cp->irc = GNUNET_CORE_peer_change_preference (core, + peer, + GNUNET_TIME_UNIT_FOREVER_REL, + GNUNET_BANDWIDTH_VALUE_MAX, + GNUNET_FS_DBLOCK_SIZE, + ip, + &core_reserve_callback, + cp); + } + else + { + /* still waiting for reservation */ + is_ready = GNUNET_NO; + } + } + else + { + /* no reservation needed for content */ + is_ready = GNUNET_YES; + } + if (is_ready) + { + pth->cth = GNUNET_CORE_notify_transmit_ready (core, + priority, + timeout, + &target, + size, + &peer_transmit_ready_cb, + pth); + /* pth->cth could be NULL here, that's OK, we'll try again + later... */ + } if (pth->cth == NULL) - pth->timeout_task = GNUNET_SCHEDULER_add_delayed (timeout, - &peer_transmit_timeout, - pth); + { + /* if we're waiting for reservation OR if we could not do notify_transmit_ready, + install a timeout task to be on the safe side */ + pth->timeout_task = GNUNET_SCHEDULER_add_delayed (timeout, + &peer_transmit_timeout, + pth); + } return pth; } @@ -426,11 +585,19 @@ GSF_peer_transmit_cancel_ (struct GSF_PeerTransmitHandle *pth) GNUNET_SCHEDULER_cancel (pth->timeout_task); pth->timeout_task = GNUNET_SCHEDULER_NO_TASK; } + if (NULL != pth->cth) + { + GNUNET_CORE_notify_transmit_ready_cancel (pth->cth); + pth->cth = NULL; + } cp = pth->cp; GNUNET_CONTAINER_DLL_remove (cp->pth_head, cp->pth_tail, pth); - // FIXME: update 'cp' counters! + if (pth->is_query) + GNUNET_assert (0 < cp->ppd.pending_queries--); + else + GNUNET_assert (0 < cp->ppd.pending_replies--); GNUNET_free (pth); } @@ -438,20 +605,37 @@ GSF_peer_transmit_cancel_ (struct GSF_PeerTransmitHandle *pth) /** * Report on receiving a reply; update the performance record of the given peer. * - * @param peer responding peer (will be updated) + * @param cp responding peer (will be updated) * @param request_time time at which the original query was transmitted * @param request_priority priority of the original request * @param initiator_client local client on responsible for query (or NULL) * @param initiator_peer other peer responsible for query (or NULL) */ void -GSF_peer_update_performance_ (struct GSF_ConnectedPeer *peer, - GNUNET_TIME_Absolute request_time, +GSF_peer_update_performance_ (struct GSF_ConnectedPeer *cp, + struct GNUNET_TIME_Absolute request_time, uint32_t request_priority, const struct GSF_LocalClient *initiator_client, const struct GSF_ConnectedPeer *initiator_peer) { - // FIXME... + struct GNUNET_TIME_Relative delay; + unsigned int i; + + delay = GNUNET_TIME_absolute_get_duration (request_time); + cp->ppd.avg_reply_delay = (cp->ppd.avg_reply_delay * (RUNAVG_DELAY_N-1) + delay.rel_value) / RUNAVG_DELAY_N; + cp->ppd.avg_priority = (cp->avg_priority * (RUNAVG_DELAY_N-1) + request_priority) / RUNAVG_DELAY_N; + if (NULL != initiator_client) + { + cp->ppd.last_client_replies[cp->last_client_replies_woff++ % CS2P_SUCCESS_LIST_SIZE] = initiator_client; + } + else if (NULL != initiator_peer) + { + GNUNET_PEER_change_rc (cp->ppd.last_p2p_replies[cp->last_p2p_replies_woff % P2P_SUCCESS_LIST_SIZE], -1); + cp->ppd.last_p2p_replies[cp->last_p2p_replies_woff++ % P2P_SUCCESS_LIST_SIZE] = initiator_peer->pid; + GNUNET_PEER_change_rc (initiator_peer->pid, 1); + } + else + GNUNET_break (0); } @@ -495,6 +679,7 @@ GSF_peer_disconnect_handler_ (void *cls, const struct GNUNET_PeerIdentity *peer) { struct GSF_ConnectedPeer *cp; + struct GSF_PeerTransmitHandle *pth; cp = GNUNET_CONTAINER_multihashmap_get (cp_map, &peer->hashPubKey); @@ -502,7 +687,27 @@ GSF_peer_disconnect_handler_ (void *cls, GNUNET_CONTAINER_multihashmap_remove (cp_map, &peer->hashPubKey, cp); - // FIXME: more cleanup + if (NULL != cp->irc) + { + GNUNET_CORE_peer_change_preference_cancel (cp->irc); + cp->irc = NULL; + } + GSF_plan_notify_peer_disconnect_ (cp); + GNUNET_LOAD_value_free (cp->ppd.transmission_delay); + GNUNET_PEER_decrement_rcs (cp->ppd.last_p2p_replies, P2P_SUCCESS_LIST_SIZE); + while (NULL != (pth = cp->pth_head)) + { + if (NULL != pth->th) + { + GNUNET_CORE_notify_transmit_ready_cancel (pth->th); + pth->th = NULL; + } + GNUNET_CONTAINER_DLL_remove (cp->pth_head, + cp->pth_tail, + pth); + GNUNET_free (pth); + } + GNUNET_PEER_change_rc (cp->pid, -1); GNUNET_free (cp); } @@ -568,38 +773,6 @@ GSF_iterate_connected_peers_ (GSF_ConnectedPeerIterator it, } -/** - * Try to reserve bandwidth (to receive data FROM the given peer). - * This function must only be called ONCE per connected peer at a - * time; it can be called again after the 'rc' callback was invoked. - * If the peer disconnects, the request is (silently!) ignored (and - * the requester is responsible to register for notification about the - * peer disconnect if any special action needs to be taken in this - * case). - * - * @param cp peer to reserve bandwidth from - * @param size number of bytes to reserve - * @param rc function to call upon reservation success or failure - * @param rc_cls closure for rc - */ -void -GSF_connected_peer_reserve_ (struct GSF_ConnectedPeer *cp, - size_t size, - GSF_PeerReserveCallback rc, - void *rc_cls) -{ - // FIXME: should we allow queueing multiple reservation requests? - // FIXME: what about cancellation? - // FIXME: change docu on peer disconnect handling? - if (NULL != cp->irc) - { - rc (rc_cls, cp, GNUNET_NO); - return; - } - // FIXME... -} - - /** * Write host-trust information to a file - flush the buffer entry! * @@ -643,6 +816,23 @@ flush_trust (void *cls, } +/** + * Notify core about a preference we have for the given peer + * (to allocate more resources towards it). The change will + * be communicated the next time we reserve bandwidth with + * core (not instantly). + * + * @param cp peer to reserve bandwidth from + * @param pref preference change + */ +void +GSF_connected_peer_change_preference_ (struct GSF_ConnectedPeer *cp, + uint64_t pref) +{ + cp->inc_preference += pref; +} + + /** * Call this method periodically to flush trust information to disk. * @@ -715,7 +905,7 @@ void GSF_connected_peer_done_ () { cron_flush_trust (NULL, NULL); - GNUNET_CONTAINER_multihashmap_iterate (cp_peers, + GNUNET_CONTAINER_multihashmap_iterate (cp_map, &clean_peer, NULL); GNUNET_CONTAINER_multihashmap_destroy (cp_map); @@ -725,6 +915,44 @@ GSF_connected_peer_done_ () } +/** + * Iterator to remove references to LC entry. + * + * @param the 'struct GSF_LocalClient*' to look for + * @param key current key code + * @param value value in the hash map (peer entry) + * @return GNUNET_YES (we should continue to iterate) + */ +static int +clean_peer (void *cls, + const GNUNET_HashCode * key, + void *value) +{ + const struct GSF_LocalClient *lc = cls; + struct GSF_ConnectedPeer *cp = value; + unsigned int i; + + for (i=0;ippd.last_client_replies[i] == lc) + cp->ppd.last_client_replies[i] = NULL; + return GNUNET_YES; +} + + +/** + * Notification that a local client disconnected. Clean up all of our + * references to the given handle. + * + * @param lc handle to the local client (henceforth invalid) + */ +void +GSF_handle_local_client_disconnect_ (const struct GSF_LocalClient *lc) +{ + GNUNET_CONTAINER_multihashmap_iterate (cp_map, + &clean_local_client, + (void*) lc); +} + #endif /* end of gnunet-service-fs_cp.h */ diff --git a/src/fs/gnunet-service-fs_cp.h b/src/fs/gnunet-service-fs_cp.h index 0c3652ae2..8b11dcd5d 100644 --- a/src/fs/gnunet-service-fs_cp.h +++ b/src/fs/gnunet-service-fs_cp.h @@ -196,15 +196,15 @@ GSF_peer_transmit_cancel_ (struct GSF_PeerTransmitHandle *pth); /** * Report on receiving a reply; update the performance record of the given peer. * - * @param peer responding peer (will be updated) + * @param cp responding peer (will be updated) * @param request_time time at which the original query was transmitted * @param request_priority priority of the original request * @param initiator_client local client on responsible for query (or NULL) * @param initiator_peer other peer responsible for query (or NULL) */ void -GSF_peer_update_performance_ (struct GSF_ConnectedPeer *peer, - GNUNET_TIME_Absolute request_time, +GSF_peer_update_performance_ (struct GSF_ConnectedPeer *cp, + struct GNUNET_TIME_Absolute request_time, uint32_t request_priority, const struct GSF_LocalClient *initiator_client, const struct GSF_ConnectedPeer *initiator_peer); @@ -230,6 +230,24 @@ GSF_peer_status_handler_ (void *cls, const struct GNUNET_TRANSPORT_ATS_Information *atsi); +/** + * Handle P2P "MIGRATION_STOP" message. + * + * @param cls closure, always NULL + * @param other the other peer involved (sender or receiver, NULL + * for loopback messages where we are both sender and receiver) + * @param message the actual message + * @param atsi performance information + * @return GNUNET_OK to keep the connection open, + * GNUNET_SYSERR to close it (signal serious error) + */ +int +GSF_handle_p2p_migration_stop_ (void *cls, + const struct GNUNET_PeerIdentity *other, + const struct GNUNET_MessageHeader *message, + const struct GNUNET_TRANSPORT_ATS_Information *atsi); + + /** * A peer disconnected from us. Tear down the connected peer * record. @@ -253,38 +271,28 @@ GSF_handle_local_client_disconnect_ (const struct GSF_LocalClient *lc); /** - * Iterate over all connected peers. + * Notify core about a preference we have for the given peer + * (to allocate more resources towards it). The change will + * be communicated the next time we reserve bandwidth with + * core (not instantly). * - * @param it function to call for each peer - * @param it_cls closure for it + * @param cp peer to reserve bandwidth from + * @param pref preference change */ void -GSF_iterate_connected_peers_ (GSF_ConnectedPeerIterator it, - void *it_cls); +GSF_connected_peer_change_preference_ (struct GSF_ConnectedPeer *cp, + uint64_t pref); -// FIXME: should we allow queueing multiple reservation requests? -// FIXME: what about cancellation? -// FIXME: change docu on peer disconnect handling? /** - * Try to reserve bandwidth (to receive data FROM the given peer). - * This function must only be called ONCE per connected peer at a - * time; it can be called again after the 'rc' callback was invoked. - * If the peer disconnects, the request is (silently!) ignored (and - * the requester is responsible to register for notification about the - * peer disconnect if any special action needs to be taken in this - * case). + * Iterate over all connected peers. * - * @param cp peer to reserve bandwidth from - * @param size number of bytes to reserve - * @param rc function to call upon reservation success - * @param rc_cls closure for rc + * @param it function to call for each peer + * @param it_cls closure for it */ void -GSF_connected_peer_reserve_ (struct GSF_ConnectedPeer *cp, - size_t size, - GSF_PeerReserveCallback rc, - void *rc_cls); +GSF_iterate_connected_peers_ (GSF_ConnectedPeerIterator it, + void *it_cls); /** diff --git a/src/fs/gnunet-service-fs_pe.h b/src/fs/gnunet-service-fs_pe.h index 9f62ece76..24fbcbb7a 100644 --- a/src/fs/gnunet-service-fs_pe.h +++ b/src/fs/gnunet-service-fs_pe.h @@ -43,6 +43,25 @@ GSF_plan_entry_create_ (struct GSF_ConnectedPeer *cp, unsigned int position); +/** + * Notify the plan about a new peer to use. + * + * @param cp connected peer + */ +void +GSF_plan_notify_new_peer_ (struct GSF_ConnectedPeer *cp); + + +/** + * Notify the plan about a peer being no longer available. + * + * @param cp connected peer + */ +void +GSF_plan_notify_peer_disconnect_ (struct GSF_ConnectedPeer *cp); + + + /** * Get the first plan entry for the given connected peer. * FIXME... diff --git a/src/include/gnunet_bandwidth_lib.h b/src/include/gnunet_bandwidth_lib.h index 0484bfe13..43e75ed0a 100644 --- a/src/include/gnunet_bandwidth_lib.h +++ b/src/include/gnunet_bandwidth_lib.h @@ -93,6 +93,12 @@ struct GNUNET_BANDWIDTH_Value32NBO GNUNET_BANDWIDTH_value_init (uint32_t bytes_per_second); +/** + * Maximum possible bandwidth value. + */ +#define GNUNET_BANDWIDTH_VALUE_MAX GNUNET_BANDWIDTH_value_init(UINT32_MAX) + + /** * At the given bandwidth, calculate how much traffic will be * available until the given deadline. diff --git a/src/include/gnunet_core_service.h b/src/include/gnunet_core_service.h index a66fe2343..b93c86b79 100644 --- a/src/include/gnunet_core_service.h +++ b/src/include/gnunet_core_service.h @@ -325,7 +325,7 @@ struct GNUNET_CORE_InformationRequestContext; * @param timeout after how long should we give up (and call "info" with NULL * for "peer" to signal an error)? * @param bw_out set to the current bandwidth limit (sending) for this peer, - * caller should set "bpm_out" to "-1" to avoid changing + * caller should set "bpm_out" to "GNUNET_BANDWIDTH_VALUE_MAX" to avoid changing * the current value; otherwise "bw_out" will be lowered to * the specified value; passing a pointer to "0" can be used to force * us to disconnect from the peer; "bw_out" might not increase -- 2.25.1