X-Git-Url: https://git.librecmc.org/?a=blobdiff_plain;f=src%2Fcadet%2Fgnunet-service-cadet_peer.c;h=71c7c67d0c11af201e23b08dd201720eb82719f0;hb=c831b76d24e75c692ab6141f327d1a91d391f93b;hp=f3161383ae2f13980068d42c8a057c7a23750293;hpb=79f53fe65e00a29889b290fca9c9390101ea537e;p=oweals%2Fgnunet.git diff --git a/src/cadet/gnunet-service-cadet_peer.c b/src/cadet/gnunet-service-cadet_peer.c index f3161383a..71c7c67d0 100644 --- a/src/cadet/gnunet-service-cadet_peer.c +++ b/src/cadet/gnunet-service-cadet_peer.c @@ -1,6 +1,6 @@ /* This file is part of GNUnet. - Copyright (C) 2013, 2015 Christian Grothoff (and other contributing authors) + Copyright (C) 2001-2017 GNUnet e.V. GNUnet is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published @@ -17,128 +17,138 @@ Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ + /** * @file cadet/gnunet-service-cadet_peer.c - * @brief GNUnet CADET service connection handling + * @brief Information we track per peer. * @author Bartlomiej Polot + * @author Christian Grothoff + * + * TODO: + * - optimize stopping/restarting DHT search to situations + * where we actually need it (i.e. not if we have a direct connection, + * or if we already have plenty of good short ones, or maybe even + * to take a break if we have some connections and have searched a lot (?)) */ #include "platform.h" #include "gnunet_util_lib.h" +#include "gnunet_hello_lib.h" #include "gnunet_signatures.h" #include "gnunet_transport_service.h" +#include "gnunet_ats_service.h" #include "gnunet_core_service.h" #include "gnunet_statistics_service.h" #include "cadet_protocol.h" -#include "gnunet-service-cadet_peer.h" -#include "gnunet-service-cadet_dht.h" #include "gnunet-service-cadet_connection.h" -#include "gnunet-service-cadet_tunnel.h" -#include "cadet_path.h" +#include "gnunet-service-cadet_dht.h" +#include "gnunet-service-cadet_peer.h" +#include "gnunet-service-cadet_paths.h" +#include "gnunet-service-cadet_tunnels.h" + + +#define LOG(level, ...) GNUNET_log_from(level,"cadet-per",__VA_ARGS__) + + +/** + * How long do we wait until tearing down an idle peer? + */ +#define IDLE_PEER_TIMEOUT GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MINUTES, 5) + +/** + * How long do we keep paths around if we no longer care about the peer? + */ +#define IDLE_PATH_TIMEOUT GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MINUTES, 2) -#define LOG(level, ...) GNUNET_log_from (level,"cadet-p2p",__VA_ARGS__) -#define LOG2(level, ...) GNUNET_log_from_nocheck(level,"cadet-p2p",__VA_ARGS__) -/******************************************************************************/ -/******************************** STRUCTS **********************************/ -/******************************************************************************/ /** - * Struct containing info about a queued transmission to this peer + * Data structure used to track whom we have to notify about changes + * to our message queue. */ -struct CadetPeerQueue +struct GCP_MessageQueueManager { - /** - * DLL next - */ - struct CadetPeerQueue *next; /** - * DLL previous + * Kept in a DLL. */ - struct CadetPeerQueue *prev; + struct GCP_MessageQueueManager *next; /** - * Peer this transmission is directed to. + * Kept in a DLL. */ - struct CadetPeer *peer; + struct GCP_MessageQueueManager *prev; /** - * Connection this message belongs to. + * Function to call with updated message queue object. */ - struct CadetConnection *c; + GCP_MessageQueueNotificationCallback cb; /** - * Is FWD in c? + * Closure for @e cb. */ - int fwd; + void *cb_cls; /** - * Pointer to info stucture used as cls. + * The peer this is for. */ - void *cls; + struct CadetPeer *cp; /** - * Type of message + * Envelope this manager would like to transmit once it is its turn. */ - uint16_t type; + struct GNUNET_MQ_Envelope *env; - /** - * Type of message - */ - uint16_t payload_type; +}; - /** - * Type of message - */ - uint32_t payload_id; +/** + * Struct containing all information regarding a given peer + */ +struct CadetPeer +{ /** - * Size of the message + * ID of the peer */ - size_t size; + struct GNUNET_PeerIdentity pid; /** - * Set when this message starts waiting for CORE. + * Last time we heard from this peer (currently not used!) */ - struct GNUNET_TIME_Absolute start_waiting; + struct GNUNET_TIME_Absolute last_contactXXX; /** - * Function to call on sending. + * Array of DLLs of paths traversing the peer, organized by the + * offset of the peer on the larger path. */ - GCP_sent cont; + struct CadetPeerPathEntry **path_heads; /** - * Closure for callback. + * Array of DLL of paths traversing the peer, organized by the + * offset of the peer on the larger path. */ - void *cont_cls; -}; - + struct CadetPeerPathEntry **path_tails; -/** - * Struct containing all information regarding a given peer - */ -struct CadetPeer -{ /** - * ID of the peer + * Notifications to call when @e core_mq changes. */ - GNUNET_PEER_Id id; + struct GCP_MessageQueueManager *mqm_head; /** - * Last time we heard from this peer + * Notifications to call when @e core_mq changes. */ - struct GNUNET_TIME_Absolute last_contact; + struct GCP_MessageQueueManager *mqm_tail; /** - * Paths to reach the peer, ordered by ascending hop count + * Pointer to first "ready" entry in @e mqm_head. */ - struct CadetPeerPath *path_head; + struct GCP_MessageQueueManager *mqm_ready_ptr; /** - * Paths to reach the peer, ordered by ascending hop count + * MIN-heap of paths owned by this peer (they also end at this + * peer). Ordered by desirability. */ - struct CadetPeerPath *path_tail; + struct GNUNET_CONTAINER_Heap *path_heap; /** * Handle to stop the DHT search for paths to this peer @@ -146,39 +156,45 @@ struct CadetPeer struct GCD_search_handle *search_h; /** - * Handle to stop the DHT search for paths to this peer + * Task to clean up @e path_heap asynchronously. + */ + struct GNUNET_SCHEDULER_Task *heap_cleanup_task; + + /** + * Task to destroy this entry. */ - struct GNUNET_SCHEDULER_Task *search_delayed; + struct GNUNET_SCHEDULER_Task *destroy_task; /** * Tunnel to this peer, if any. */ - struct CadetTunnel *tunnel; + struct CadetTunnel *t; /** * Connections that go through this peer; indexed by tid. */ - struct GNUNET_CONTAINER_MultiHashMap *connections; + struct GNUNET_CONTAINER_MultiShortmap *connections; /** - * Handle for queued transmissions + * Handle for core transmissions. */ - struct GNUNET_CORE_TransmitHandle *core_transmit; + struct GNUNET_MQ_Handle *core_mq; /** - * Timestamp + * Hello message of the peer. */ - struct GNUNET_TIME_Absolute tmt_time; + struct GNUNET_HELLO_Message *hello; /** - * Transmission queue to core DLL head + * Handle to us offering the HELLO to the transport. */ - struct CadetPeerQueue *queue_head; + struct GNUNET_TRANSPORT_OfferHelloHandle *hello_offer; /** - * Transmission queue to core DLL tail + * Handle to our ATS request asking ATS to suggest an address + * to TRANSPORT for this peer (to establish a direct link). */ - struct CadetPeerQueue *queue_tail; + struct GNUNET_ATS_ConnectivitySuggestHandle *connectivity_suggestion; /** * How many messages are in the queue to this peer. @@ -186,2479 +202,1276 @@ struct CadetPeer unsigned int queue_n; /** - * Hello message. + * How many paths do we have to this peer (in all @e path_heads DLLs combined). */ - struct GNUNET_HELLO_Message* hello; -}; - + unsigned int num_paths; -/******************************************************************************/ -/******************************* GLOBALS ***********************************/ -/******************************************************************************/ + /** + * Sum over all of the offsets of all of the paths in the @a path_heads DLLs. + * Used to speed-up @GCP_get_desirability_of_path() calculation. + */ + unsigned int off_sum; -/** - * Global handle to the statistics service. - */ -extern struct GNUNET_STATISTICS_Handle *stats; + /** + * Number of message queue managers of this peer that have a message in waiting. + * + * Used to quickly see if we need to bother scanning the @e msm_head DLL. + * TODO: could be replaced by another DLL that would then allow us to avoid + * the O(n)-scan of the DLL for ready entries! + */ + unsigned int mqm_ready_counter; -/** - * Local peer own ID (full value). - */ -extern struct GNUNET_PeerIdentity my_full_id; + /** + * Current length of the @e path_heads and @path_tails arrays. + * The arrays should be grown as needed. + */ + unsigned int path_dll_length; -/** - * Local peer own ID (short) - */ -extern GNUNET_PEER_Id myid; +}; -/** - * Peers known, indexed by PeerIdentity, values of type `struct CadetPeer`. - */ -static struct GNUNET_CONTAINER_MultiPeerMap *peers; /** - * How many peers do we want to remember? + * Get the static string for a peer ID. + * + * @param cp Peer. + * @return Static string for it's ID. */ -static unsigned long long max_peers; +const char * +GCP_2s (const struct CadetPeer *cp) +{ + static char buf[32]; -/** - * Percentage of messages that will be dropped (for test purposes only). - */ -static unsigned long long drop_percent; + GNUNET_snprintf (buf, + sizeof (buf), + "P(%s)", + GNUNET_i2s (&cp->pid)); + return buf; +} -/** - * Handle to communicate with core. - */ -static struct GNUNET_CORE_Handle *core_handle; /** - * Handle to try to start new connections. + * Calculate how desirable a path is for @a cp if @a cp + * is at offset @a off. + * + * The 'desirability_table.c' program can be used to compute a list of + * sample outputs for different scenarios. Basically, we score paths + * lower if there are many alternatives, and higher if they are + * shorter than average, and very high if they are much shorter than + * average and without many alternatives. + * + * @param cp a peer reachable via a path + * @param off offset of @a cp in the path + * @return score how useful a path is to reach @a cp, + * positive scores mean path is more desirable */ -static struct GNUNET_TRANSPORT_Handle *transport_handle; +double +GCP_get_desirability_of_path (struct CadetPeer *cp, + unsigned int off) +{ + unsigned int num_alts = cp->num_paths; + unsigned int off_sum; + double avg_sum; + double path_delta; + double weight_alts; + + GNUNET_assert (num_alts >= 1); /* 'path' should be in there! */ + GNUNET_assert (0 != cp->path_dll_length); + + /* We maintain 'off_sum' in 'peer' and thereby + avoid the SLOW recalculation each time. Kept here + just to document what is going on. */ +#if SLOW + off_sum = 0; + for (unsigned int j=0;jpath_dll_length;j++) + for (struct CadetPeerPathEntry *pe = cp->path_heads[j]; + NULL != pe; + pe = pe->next) + off_sum += j; + GNUNET_assert (off_sum == cp->off_sum); +#else + off_sum = cp->off_sum; +#endif + avg_sum = off_sum * 1.0 / cp->path_dll_length; + path_delta = off - avg_sum; + /* path_delta positiv: path off of peer above average (bad path for peer), + path_delta negativ: path off of peer below average (good path for peer) */ + if (path_delta <= - 1.0) + weight_alts = - num_alts / path_delta; /* discount alternative paths */ + else if (path_delta >= 1.0) + weight_alts = num_alts * path_delta; /* overcount alternative paths */ + else + weight_alts = num_alts; /* count alternative paths normally */ -/** - * Shutdown falg. - */ -static int in_shutdown; + /* off+1: long paths are generally harder to find and thus count + a bit more as they get longer. However, above-average paths + still need to count less, hence the squaring of that factor. */ + return (off + 1.0) / (weight_alts * weight_alts); +} -/******************************************************************************/ -/***************************** DEBUG *********************************/ -/******************************************************************************/ /** - * Log all kinds of info about the queueing status of a peer. + * This peer is no longer be needed, clean it up now. * - * @param p Peer whose queue to show. - * @param level Error level to use for logging. + * @param cls peer to clean up */ static void -queue_debug (const struct CadetPeer *p, enum GNUNET_ErrorType level) +destroy_peer (void *cls) { - struct GNUNET_TIME_Relative core_wait_time; - struct CadetPeerQueue *q; - int do_log; - - do_log = GNUNET_get_log_call_status (level & (~GNUNET_ERROR_TYPE_BULK), - "cadet-p2p", - __FILE__, __FUNCTION__, __LINE__); - if (0 == do_log) - return; + struct CadetPeer *cp = cls; + + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Destroying state about peer %s\n", + GCP_2s (cp)); + cp->destroy_task = NULL; + GNUNET_assert (NULL == cp->t); + GNUNET_assert (NULL == cp->core_mq); + GNUNET_assert (0 == cp->num_paths); + for (unsigned int i=0;ipath_dll_length;i++) + GNUNET_assert (NULL == cp->path_heads[i]); + GNUNET_assert (0 == GNUNET_CONTAINER_multishortmap_size (cp->connections)); + GNUNET_assert (GNUNET_YES == + GNUNET_CONTAINER_multipeermap_remove (peers, + &cp->pid, + cp)); + GNUNET_free_non_null (cp->path_heads); + GNUNET_free_non_null (cp->path_tails); + cp->path_dll_length = 0; + if (NULL != cp->search_h) + { + GCD_search_stop (cp->search_h); + cp->search_h = NULL; + } + /* FIXME: clean up search_delayedXXX! */ - LOG2 (level, "QQQ Message queue towards %s\n", GCP_2s (p)); - LOG2 (level, "QQQ queue length: %u\n", p->queue_n); - LOG2 (level, "QQQ core tmt rdy: %p\n", p->core_transmit); - if (NULL != p->core_transmit) + if (NULL != cp->hello_offer) { - core_wait_time = GNUNET_TIME_absolute_get_duration (p->tmt_time); - LOG2 (level, "QQQ core called %s ago\n", - GNUNET_STRINGS_relative_time_to_string (core_wait_time, GNUNET_NO)); + GNUNET_TRANSPORT_offer_hello_cancel (cp->hello_offer); + cp->hello_offer = NULL; } - for (q = p->queue_head; NULL != q; q = q->next) + if (NULL != cp->connectivity_suggestion) { - LOG2 (level, "QQQ - %s %s on %s\n", - GC_m2s (q->type), GC_f2s (q->fwd), GCC_2s (q->c)); - LOG2 (level, "QQQ payload %s, %u\n", - GC_m2s (q->payload_type), q->payload_id); - LOG2 (level, "QQQ size: %u bytes\n", q->size); + GNUNET_ATS_connectivity_suggest_cancel (cp->connectivity_suggestion); + cp->connectivity_suggestion = NULL; } - - LOG2 (level, "QQQ End queue towards %s\n", GCP_2s (p)); + GNUNET_CONTAINER_multishortmap_destroy (cp->connections); + if (NULL != cp->path_heap) + { + GNUNET_CONTAINER_heap_destroy (cp->path_heap); + cp->path_heap = NULL; + } + if (NULL != cp->heap_cleanup_task) + { + GNUNET_SCHEDULER_cancel (cp->heap_cleanup_task); + cp->heap_cleanup_task = NULL; + } + GNUNET_free_non_null (cp->hello); + /* Peer should not be freed if paths exist; if there are no paths, + there ought to be no connections, and without connections, no + notifications. Thus we can assert that mqm_head is empty at this + point. */ + GNUNET_assert (NULL == cp->mqm_head); + GNUNET_assert (NULL == cp->mqm_ready_ptr); + GNUNET_free (cp); } /** - * Log all kinds of info about a peer. + * This peer is now on more "active" duty, activate processes related to it. * - * @param peer Peer. + * @param cp the more-active peer */ -void -GCP_debug (const struct CadetPeer *p, enum GNUNET_ErrorType level) +static void +consider_peer_activate (struct CadetPeer *cp) { - struct CadetPeerPath *path; - unsigned int conns; - int do_log; + uint32_t strength; - do_log = GNUNET_get_log_call_status (level & (~GNUNET_ERROR_TYPE_BULK), - "cadet-p2p", - __FILE__, __FUNCTION__, __LINE__); - if (0 == do_log) + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Updating peer %s activation state (%u connections)%s%s\n", + GCP_2s (cp), + GNUNET_CONTAINER_multishortmap_size (cp->connections), + (NULL == cp->t) ? "" : " with tunnel", + (NULL == cp->core_mq) ? "" : " with CORE link"); + if (NULL != cp->destroy_task) + { + /* It's active, do not destory! */ + GNUNET_SCHEDULER_cancel (cp->destroy_task); + cp->destroy_task = NULL; + } + if ( (0 == GNUNET_CONTAINER_multishortmap_size (cp->connections)) && + (NULL == cp->t) ) + { + /* We're just on a path or directly connected; don't bother too much */ + if (NULL != cp->connectivity_suggestion) + { + GNUNET_ATS_connectivity_suggest_cancel (cp->connectivity_suggestion); + cp->connectivity_suggestion = NULL; + } + if (NULL != cp->search_h) + { + GCD_search_stop (cp->search_h); + cp->search_h = NULL; + } return; - - if (NULL == p) + } + if (NULL == cp->core_mq) { - LOG2 (level, "PPP DEBUG PEER NULL\n"); - return; + /* Lacks direct connection, try to create one by querying the DHT */ + if ( (NULL == cp->search_h) && + (DESIRED_CONNECTIONS_PER_TUNNEL > cp->num_paths) ) + cp->search_h + = GCD_search (&cp->pid); } - - LOG2 (level, "PPP DEBUG PEER %s\n", GCP_2s (p)); - LOG2 (level, "PPP last contact %s\n", - GNUNET_STRINGS_absolute_time_to_string (p->last_contact)); - for (path = p->path_head; NULL != path; path = path->next) + else { - char *s; - - s = path_2s (path); - LOG2 (level, "PPP path: %s\n", s); - GNUNET_free (s); + /* Have direct connection, stop DHT search if active */ + if (NULL != cp->search_h) + { + GCD_search_stop (cp->search_h); + cp->search_h = NULL; + } } - LOG2 (level, "PPP core transmit handle %p\n", p->core_transmit); - LOG2 (level, "PPP DHT GET handle %p\n", p->search_h); - conns = 0; - if (NULL != p->connections) - conns += GNUNET_CONTAINER_multihashmap_size (p->connections); - LOG2 (level, "PPP # connections over link to peer: %u\n", conns); - queue_debug (p, level); - LOG2 (level, "PPP DEBUG END\n"); + /* If we have a tunnel, our urge for connections is much bigger */ + strength = (NULL != cp->t) ? 32 : 1; + if (NULL != cp->connectivity_suggestion) + GNUNET_ATS_connectivity_suggest_cancel (cp->connectivity_suggestion); + cp->connectivity_suggestion + = GNUNET_ATS_connectivity_suggest (ats_ch, + &cp->pid, + strength); } -/******************************************************************************/ -/***************************** CORE HELPERS *********************************/ -/******************************************************************************/ +/** + * This peer may no longer be needed, consider cleaning it up. + * + * @param cp peer to clean up + */ +static void +consider_peer_destroy (struct CadetPeer *cp); /** - * Iterator to notify all connections of a broken link. Mark connections - * to destroy after all traffic has been sent. - * - * @param cls Closure (disconnected peer). - * @param key Current key code (peer id). - * @param value Value in the hash map (connection). + * We really no longere care about a peer, stop hogging memory with paths to it. + * Afterwards, see if there is more to be cleaned up about this peer. * - * @return #GNUNET_YES to continue to iterate. + * @param cls a `struct CadetPeer`. */ -static int -notify_broken (void *cls, - const struct GNUNET_HashCode *key, - void *value) +static void +drop_paths (void *cls) { - struct CadetPeer *peer = cls; - struct CadetConnection *c = value; + struct CadetPeer *cp = cls; + struct CadetPeerPath *path; - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Notifying %s due to %s disconnect\n", - GCC_2s (c), GCP_2s (peer)); - GCC_neighbor_disconnected (c, peer); - return GNUNET_YES; + cp->destroy_task = NULL; + while (NULL != (path = GNUNET_CONTAINER_heap_remove_root (cp->path_heap))) + GCPP_release (path); + consider_peer_destroy (cp); } /** - * Remove the direct path to the peer. - * - * @param peer Peer to remove the direct path from. + * This peer may no longer be needed, consider cleaning it up. * + * @param cp peer to clean up */ -static struct CadetPeerPath * -pop_direct_path (struct CadetPeer *peer) +static void +consider_peer_destroy (struct CadetPeer *cp) { - struct CadetPeerPath *iter; + struct GNUNET_TIME_Relative exp; - for (iter = peer->path_head; NULL != iter; iter = iter->next) + if (NULL != cp->destroy_task) { - if (2 >= iter->length) - { - GNUNET_CONTAINER_DLL_remove (peer->path_head, peer->path_tail, iter); - return iter; - } + GNUNET_SCHEDULER_cancel (cp->destroy_task); + cp->destroy_task = NULL; + } + if (NULL != cp->t) + return; /* still relevant! */ + if (NULL != cp->core_mq) + return; /* still relevant! */ + if (0 != GNUNET_CONTAINER_multishortmap_size (cp->connections)) + return; /* still relevant! */ + if ( (NULL != cp->path_heap) && + (0 < GNUNET_CONTAINER_heap_get_size (cp->path_heap)) ) + { + cp->destroy_task = GNUNET_SCHEDULER_add_delayed (IDLE_PATH_TIMEOUT, + &drop_paths, + cp); + return; + } + if (0 != cp->num_paths) + return; /* still relevant! */ + if (NULL != cp->hello) + { + /* relevant only until HELLO expires */ + exp = GNUNET_TIME_absolute_get_remaining (GNUNET_HELLO_get_last_expiration (cp->hello)); + cp->destroy_task = GNUNET_SCHEDULER_add_delayed (exp, + &destroy_peer, + cp); + return; } - return NULL; + cp->destroy_task = GNUNET_SCHEDULER_add_delayed (IDLE_PEER_TIMEOUT, + &destroy_peer, + cp); } -/******************************************************************************/ -/***************************** CORE CALLBACKS *********************************/ -/******************************************************************************/ - - /** - * Method called whenever a given peer connects. + * Set the message queue to @a mq for peer @a cp and notify watchers. * - * @param cls closure - * @param peer peer identity this notification is about + * @param cp peer to modify + * @param mq message queue to set (can be NULL) */ -static void -core_connect (void *cls, - const struct GNUNET_PeerIdentity *peer) +void +GCP_set_mq (struct CadetPeer *cp, + struct GNUNET_MQ_Handle *mq) { - struct CadetPeer *neighbor; - struct CadetPeerPath *path; - char own_id[16]; - - GCC_check_connections (); - GNUNET_snprintf (own_id, - sizeof (own_id), - "%s", - GNUNET_i2s (&my_full_id)); - neighbor = GCP_get (peer, GNUNET_YES); - if (myid == neighbor->id) - { - LOG (GNUNET_ERROR_TYPE_INFO, - "CONNECTED %s (self)\n", - own_id); - path = path_new (1); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Message queue for peer %s is now %p\n", + GCP_2s (cp), + mq); + cp->core_mq = mq; + for (struct GCP_MessageQueueManager *mqm = cp->mqm_head, *next; + NULL != mqm; + mqm = next) + { + /* Save next pointer in case mqm gets freed by the callback */ + next = mqm->next; + if (NULL == mq) + { + if (NULL != mqm->env) + { + GNUNET_MQ_discard (mqm->env); + mqm->env = NULL; + mqm->cb (mqm->cb_cls, + GNUNET_SYSERR); + } + else + { + mqm->cb (mqm->cb_cls, + GNUNET_NO); + } + } + else + { + GNUNET_assert (NULL == mqm->env); + mqm->cb (mqm->cb_cls, + GNUNET_YES); + } } + if ( (NULL != mq) || + (NULL != cp->t) ) + consider_peer_activate (cp); else + consider_peer_destroy (cp); + + if ( (NULL != mq) && + (NULL != cp->t) ) { - LOG (GNUNET_ERROR_TYPE_INFO, - "CONNECTED %s <= %s\n", - own_id, - GNUNET_i2s (peer)); - path = path_new (2); - path->peers[1] = neighbor->id; - GNUNET_PEER_change_rc (neighbor->id, 1); + /* have a new, direct path to the target, notify tunnel */ + struct CadetPeerPath *path; + + path = GCPP_get_path_from_route (1, + &cp->pid); + GCT_consider_path (cp->t, + path, + 0); } - path->peers[0] = myid; - GNUNET_PEER_change_rc (myid, 1); - GCP_add_path (neighbor, path, GNUNET_YES); - - GNUNET_assert (NULL == neighbor->connections); - neighbor->connections = GNUNET_CONTAINER_multihashmap_create (16, GNUNET_NO); - GNUNET_assert (NULL != neighbor->connections); - - GNUNET_STATISTICS_update (stats, - "# peers", - 1, - GNUNET_NO); - - if ( (NULL != GCP_get_tunnel (neighbor)) && - (0 > GNUNET_CRYPTO_cmp_peer_identity (&my_full_id, peer)) ) - GCP_connect (neighbor); - GCC_check_connections (); } /** - * Method called whenever a peer disconnects. + * Debug function should NEVER return true in production code, useful to + * simulate losses for testcases. * - * @param cls closure - * @param peer peer identity this notification is about + * @return #GNUNET_YES or #GNUNET_NO with the decision to drop. */ -static void -core_disconnect (void *cls, - const struct GNUNET_PeerIdentity *peer) +static int +should_I_drop (void) { - struct CadetPeer *p; - struct CadetPeerPath *direct_path; - char own_id[16]; - - GCC_check_connections (); - strncpy (own_id, GNUNET_i2s (&my_full_id), 16); - own_id[15] = '\0'; - p = GNUNET_CONTAINER_multipeermap_get (peers, peer); - if (NULL == p) - { - GNUNET_break (GNUNET_YES == in_shutdown); - return; - } - if (myid == p->id) - LOG (GNUNET_ERROR_TYPE_INFO, - "DISCONNECTED %s (self)\n", - own_id); - else - LOG (GNUNET_ERROR_TYPE_INFO, - "DISCONNECTED %s <= %s\n", - own_id, GNUNET_i2s (peer)); - direct_path = pop_direct_path (p); - GNUNET_CONTAINER_multihashmap_iterate (p->connections, - ¬ify_broken, - p); - GNUNET_CONTAINER_multihashmap_destroy (p->connections); - p->connections = NULL; - if (NULL != p->core_transmit) - { - GNUNET_CORE_notify_transmit_ready_cancel (p->core_transmit); - p->core_transmit = NULL; - p->tmt_time.abs_value_us = 0; - } - GNUNET_STATISTICS_update (stats, - "# peers", - -1, - GNUNET_NO); - path_destroy (direct_path); - GCC_check_connections (); + if (0 == drop_percent) + return GNUNET_NO; + if (GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, + 101) < drop_percent) + return GNUNET_YES; + return GNUNET_NO; } /** - * Functions to handle messages from core + * Function called when CORE took one of the messages from + * a message queue manager and transmitted it. + * + * @param cls the `struct CadetPeeer` where we made progress */ -static struct GNUNET_CORE_MessageHandler core_handlers[] = { - {&GCC_handle_create, GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE, 0}, - {&GCC_handle_confirm, GNUNET_MESSAGE_TYPE_CADET_CONNECTION_ACK, - sizeof (struct GNUNET_CADET_ConnectionACK)}, - {&GCC_handle_broken, GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN, - sizeof (struct GNUNET_CADET_ConnectionBroken)}, - {&GCC_handle_destroy, GNUNET_MESSAGE_TYPE_CADET_CONNECTION_DESTROY, - sizeof (struct GNUNET_CADET_ConnectionDestroy)}, - {&GCC_handle_ack, GNUNET_MESSAGE_TYPE_CADET_ACK, - sizeof (struct GNUNET_CADET_ACK)}, - {&GCC_handle_poll, GNUNET_MESSAGE_TYPE_CADET_POLL, - sizeof (struct GNUNET_CADET_Poll)}, - {&GCC_handle_kx, GNUNET_MESSAGE_TYPE_CADET_KX, 0}, - {&GCC_handle_encrypted, GNUNET_MESSAGE_TYPE_CADET_ENCRYPTED, 0}, - {&GCC_handle_encrypted, GNUNET_MESSAGE_TYPE_CADET_AX, 0}, - {NULL, 0, 0} -}; +static void +mqm_send_done (void *cls); /** - * To be called on core init/fail. + * Transmit current envelope from this @a mqm. * - * @param cls Closure (config) - * @param identity the public identity of this peer + * @param mqm mqm to transmit message for now */ static void -core_init (void *cls, - const struct GNUNET_PeerIdentity *identity) +mqm_execute (struct GCP_MessageQueueManager *mqm) { - const struct GNUNET_CONFIGURATION_Handle *c = cls; - static int i = 0; + struct CadetPeer *cp = mqm->cp; - LOG (GNUNET_ERROR_TYPE_DEBUG, "Core init\n"); - if (0 != memcmp (identity, &my_full_id, sizeof (my_full_id))) + /* Move ready pointer to the next entry that might be ready. */ + if ( (mqm == cp->mqm_ready_ptr) && + (NULL != mqm->next) ) + cp->mqm_ready_ptr = mqm->next; + /* Move entry to the end of the DLL, to be fair. */ + if (mqm != cp->mqm_tail) + { + GNUNET_CONTAINER_DLL_remove (cp->mqm_head, + cp->mqm_tail, + mqm); + GNUNET_CONTAINER_DLL_insert_tail (cp->mqm_head, + cp->mqm_tail, + mqm); + } + cp->mqm_ready_counter--; + if (GNUNET_YES == should_I_drop ()) { - LOG (GNUNET_ERROR_TYPE_ERROR, _("Wrong CORE service\n")); - LOG (GNUNET_ERROR_TYPE_ERROR, " core id %s\n", GNUNET_i2s (identity)); - LOG (GNUNET_ERROR_TYPE_ERROR, " my id %s\n", GNUNET_i2s (&my_full_id)); - GNUNET_CORE_disconnect (core_handle); - core_handle = GNUNET_CORE_connect (c, /* Main configuration */ - NULL, /* Closure passed to CADET functions */ - &core_init, /* Call core_init once connected */ - &core_connect, /* Handle connects */ - &core_disconnect, /* remove peers on disconnects */ - NULL, /* Don't notify about all incoming messages */ - GNUNET_NO, /* For header only in notification */ - NULL, /* Don't notify about all outbound messages */ - GNUNET_NO, /* For header-only out notification */ - core_handlers); /* Register these handlers */ - if (10 < i++) - GNUNET_assert (0); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "DROPPING message to peer %s from MQM %p\n", + GCP_2s (cp), + mqm); + GNUNET_MQ_discard (mqm->env); + mqm->env = NULL; + mqm_send_done (cp); } - GML_start (); + else + { + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Sending to peer %s from MQM %p\n", + GCP_2s (cp), + mqm); + GNUNET_MQ_send (cp->core_mq, + mqm->env); + mqm->env = NULL; + } + mqm->cb (mqm->cb_cls, + GNUNET_YES); } /** - * Core callback to write a pre-constructed data packet to core buffer - * - * @param cls Closure (CadetTransmissionDescriptor with data in "data" member). - * @param size Number of bytes available in buf. - * @param buf Where the to write the message. - * - * @return number of bytes written to buf - */ -static size_t -send_core_data_raw (void *cls, size_t size, void *buf) + * Find the next ready message in the queue (starting + * the search from the `cp->mqm_ready_ptr`) and if possible + * execute the transmission. + * + * @param cp peer to try to send the next ready message to + */ +static void +send_next_ready (struct CadetPeer *cp) { - struct GNUNET_MessageHeader *msg = cls; - size_t total_size; + struct GCP_MessageQueueManager *mqm; - GNUNET_assert (NULL != msg); - total_size = ntohs (msg->size); - - if (total_size > size) - { - GNUNET_break (0); - return 0; - } - memcpy (buf, msg, total_size); - GNUNET_free (cls); - return total_size; + if (0 == cp->mqm_ready_counter) + return; + while ( (NULL != (mqm = cp->mqm_ready_ptr)) && + (NULL == mqm->env) ) + cp->mqm_ready_ptr = mqm->next; + if (NULL == mqm) + return; /* nothing to do */ + mqm_execute (mqm); } /** - * Function to send a create connection message to a peer. + * Function called when CORE took one of the messages from + * a message queue manager and transmitted it. * - * @param c Connection to create. - * @param size number of bytes available in buf - * @param buf where the callee should write the message - * @return number of bytes written to buf + * @param cls the `struct CadetPeeer` where we made progress */ -static size_t -send_core_connection_create (struct CadetConnection *c, size_t size, void *buf) +static void +mqm_send_done (void *cls) { - struct GNUNET_CADET_ConnectionCreate *msg; - struct GNUNET_PeerIdentity *peer_ptr; - const struct CadetPeerPath *p = GCC_get_path (c); - size_t size_needed; - int i; - - if (NULL == p) - return 0; - - LOG (GNUNET_ERROR_TYPE_DEBUG, "Sending CONNECTION CREATE...\n"); - size_needed = - sizeof (struct GNUNET_CADET_ConnectionCreate) + - p->length * sizeof (struct GNUNET_PeerIdentity); - - if (size < size_needed || NULL == buf) - { - GNUNET_break (0); - return 0; - } - msg = (struct GNUNET_CADET_ConnectionCreate *) buf; - msg->header.size = htons (size_needed); - msg->header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE); - msg->cid = *GCC_get_id (c); - - peer_ptr = (struct GNUNET_PeerIdentity *) &msg[1]; - for (i = 0; i < p->length; i++) - { - GNUNET_PEER_resolve (p->peers[i], peer_ptr++); - } + struct CadetPeer *cp = cls; LOG (GNUNET_ERROR_TYPE_DEBUG, - "CONNECTION CREATE (%u bytes long) sent!\n", - size_needed); - return size_needed; + "Sending to peer %s completed\n", + GCP_2s (cp)); + send_next_ready (cp); } /** - * Creates a path ack message in buf and frees all unused resources. + * Send the message in @a env to @a cp. * - * @param c Connection to send an ACK on. - * @param size number of bytes available in buf - * @param buf where the callee should write the message - * - * @return number of bytes written to buf + * @param mqm the message queue manager to use for transmission + * @param env envelope with the message to send; must NOT + * yet have a #GNUNET_MQ_notify_sent() callback attached to it */ -static size_t -send_core_connection_ack (struct CadetConnection *c, size_t size, void *buf) +void +GCP_send (struct GCP_MessageQueueManager *mqm, + struct GNUNET_MQ_Envelope *env) { - struct GNUNET_CADET_ConnectionACK *msg = buf; - - LOG (GNUNET_ERROR_TYPE_DEBUG, "Sending CONNECTION ACK...\n"); - if (sizeof (struct GNUNET_CADET_ConnectionACK) > size) - { - GNUNET_break (0); - return 0; - } - msg->header.size = htons (sizeof (struct GNUNET_CADET_ConnectionACK)); - msg->header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CONNECTION_ACK); - msg->cid = *GCC_get_id (c); + struct CadetPeer *cp = mqm->cp; - LOG (GNUNET_ERROR_TYPE_DEBUG, "CONNECTION ACK sent!\n"); - return sizeof (struct GNUNET_CADET_ConnectionACK); + GNUNET_assert (NULL != env); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Queueing message to peer %s in MQM %p\n", + GCP_2s (cp), + mqm); + GNUNET_assert (NULL != cp->core_mq); + GNUNET_assert (NULL == mqm->env); + GNUNET_MQ_notify_sent (env, + &mqm_send_done, + cp); + mqm->env = env; + cp->mqm_ready_counter++; + if (mqm != cp->mqm_ready_ptr) + cp->mqm_ready_ptr = cp->mqm_head; + if (1 == cp->mqm_ready_counter) + cp->mqm_ready_ptr = mqm; + if (0 != GNUNET_MQ_get_length (cp->core_mq)) + return; + send_next_ready (cp); } -/******************************************************************************/ -/******************************** STATIC ***********************************/ -/******************************************************************************/ - - /** - * Get priority for a queued message. + * Function called to destroy a peer now. * - * @param q Queued message - * - * @return CORE priority to use. + * @param cls NULL + * @param pid identity of the peer (unused) + * @param value the `struct CadetPeer` to clean up + * @return #GNUNET_OK (continue to iterate) */ -static enum GNUNET_CORE_Priority -get_priority (struct CadetPeerQueue *q) +static int +destroy_iterator_cb (void *cls, + const struct GNUNET_PeerIdentity *pid, + void *value) { - enum GNUNET_CORE_Priority low; - enum GNUNET_CORE_Priority high; + struct CadetPeer *cp = value; - if (NULL == q) + if (NULL != cp->destroy_task) { - GNUNET_break (0); - return GNUNET_CORE_PRIO_BACKGROUND; + GNUNET_SCHEDULER_cancel (cp->destroy_task); + cp->destroy_task = NULL; } + destroy_peer (cp); + return GNUNET_OK; +} - /* Relayed traffic has lower priority, our own traffic has higher */ - if (NULL == q->c || GNUNET_NO == GCC_is_origin (q->c, q->fwd)) - { - low = GNUNET_CORE_PRIO_BEST_EFFORT; - high = GNUNET_CORE_PRIO_URGENT; - } - else - { - low = GNUNET_CORE_PRIO_URGENT; - high = GNUNET_CORE_PRIO_CRITICAL_CONTROL; - } - /* Bulky payload has lower priority, control traffic has higher. */ - if (GNUNET_MESSAGE_TYPE_CADET_ENCRYPTED == q->type - || GNUNET_MESSAGE_TYPE_CADET_AX == q->type) - return low; - else - return high; +/** + * Clean up all entries about all peers. + * Must only be called after all tunnels, CORE-connections and + * connections are down. + */ +void +GCP_destroy_all_peers () +{ + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Destroying all peers now\n"); + GNUNET_CONTAINER_multipeermap_iterate (peers, + &destroy_iterator_cb, + NULL); } /** - * Destroy the peer_info and free any allocated resources linked to it + * Drop all paths owned by this peer, and do not + * allow new ones to be added: We are shutting down. * - * @param peer The peer_info to destroy. - * @return #GNUNET_OK on success + * @param cp peer to drop paths to */ -static int -peer_destroy (struct CadetPeer *peer) +void +GCP_drop_owned_paths (struct CadetPeer *cp) { - struct GNUNET_PeerIdentity id; - struct CadetPeerPath *p; - struct CadetPeerPath *nextp; - - GNUNET_PEER_resolve (peer->id, &id); - GNUNET_PEER_change_rc (peer->id, -1); - - LOG (GNUNET_ERROR_TYPE_INFO, - "destroying peer %s\n", - GNUNET_i2s (&id)); + struct CadetPeerPath *path; - if (GNUNET_YES != GNUNET_CONTAINER_multipeermap_remove (peers, &id, peer)) - { - GNUNET_break (0); - LOG (GNUNET_ERROR_TYPE_WARNING, " peer not in peermap!!\n"); - } - GCP_stop_search (peer); - p = peer->path_head; - while (NULL != p) - { - nextp = p->next; - GNUNET_CONTAINER_DLL_remove (peer->path_head, peer->path_tail, p); - path_destroy (p); - p = nextp; - } - if (NULL != peer->tunnel) - GCT_destroy_empty (peer->tunnel); - if (NULL != peer->connections) - { - GNUNET_assert (0 == GNUNET_CONTAINER_multihashmap_size (peer->connections)); - GNUNET_CONTAINER_multihashmap_destroy (peer->connections); - } - if (NULL != peer->core_transmit) - GNUNET_CORE_notify_transmit_ready_cancel (peer->core_transmit); - GNUNET_free_non_null (peer->hello); - GNUNET_free (peer); - return GNUNET_OK; + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Destroying all paths to %s\n", + GCP_2s (cp)); + while (NULL != (path = + GNUNET_CONTAINER_heap_remove_root (cp->path_heap))) + GCPP_release (path); + GNUNET_CONTAINER_heap_destroy (cp->path_heap); + cp->path_heap = NULL; } /** - * Iterator over peer hash map entries to destroy the peer during in_shutdown. + * Add an entry to the DLL of all of the paths that this peer is on. * - * @param cls closure - * @param key current key code - * @param value value in the hash map - * @return #GNUNET_YES if we should continue to iterate, - * #GNUNET_NO if not. + * @param cp peer to modify + * @param entry an entry on a path + * @param off offset of this peer on the path */ -static int -shutdown_peer (void *cls, - const struct GNUNET_PeerIdentity *key, - void *value) +void +GCP_path_entry_add (struct CadetPeer *cp, + struct CadetPeerPathEntry *entry, + unsigned int off) { - struct CadetPeer *p = value; - struct CadetTunnel *t = p->tunnel; - LOG (GNUNET_ERROR_TYPE_DEBUG, " shutting down %s\n", GCP_2s (p)); - if (NULL != t) - GCT_destroy (t); - p->tunnel = NULL; - peer_destroy (p); - return GNUNET_YES; + GNUNET_assert (cp == GCPP_get_peer_at_offset (entry->path, + off)); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Discovered that peer %s is on path %s at offset %u\n", + GCP_2s (cp), + GCPP_2s (entry->path), + off); + if (off >= cp->path_dll_length) + { + unsigned int len = cp->path_dll_length; + + GNUNET_array_grow (cp->path_heads, + len, + off + 4); + GNUNET_array_grow (cp->path_tails, + cp->path_dll_length, + off + 4); + } + GNUNET_CONTAINER_DLL_insert (cp->path_heads[off], + cp->path_tails[off], + entry); + cp->off_sum += off; + cp->num_paths++; + + /* If we have a tunnel to this peer, tell the tunnel that there is a + new path available. */ + if (NULL != cp->t) + GCT_consider_path (cp->t, + entry->path, + off); + + if ( (NULL != cp->search_h) && + (DESIRED_CONNECTIONS_PER_TUNNEL <= cp->num_paths) ) + { + /* Now I have enough paths, stop search */ + GCD_search_stop (cp->search_h); + cp->search_h = NULL; + } + if (NULL != cp->destroy_task) + { + /* paths changed, this resets the destroy timeout counter + and aborts a destroy task that may no longer be valid + to have (as we now have more paths via this peer). */ + consider_peer_destroy (cp); + } } - /** - * Check if peer is searching for a path (either active or delayed search). + * Remove an entry from the DLL of all of the paths that this peer is on. * - * @param peer Peer to check - * @return #GNUNET_YES if there is a search active. - * #GNUNET_NO otherwise. + * @param cp peer to modify + * @param entry an entry on a path + * @param off offset of this peer on the path */ -static int -is_searching (const struct CadetPeer *peer) +void +GCP_path_entry_remove (struct CadetPeer *cp, + struct CadetPeerPathEntry *entry, + unsigned int off) { - return ( (NULL == peer->search_h) && - (NULL == peer->search_delayed) ) ? - GNUNET_NO : GNUNET_YES; + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Removing knowledge about peer %s beging on path %s at offset %u\n", + GCP_2s (cp), + GCPP_2s (entry->path), + off); + GNUNET_CONTAINER_DLL_remove (cp->path_heads[off], + cp->path_tails[off], + entry); + GNUNET_assert (0 < cp->num_paths); + cp->off_sum -= off; + cp->num_paths--; + if ( (NULL == cp->core_mq) && + (NULL != cp->t) && + (NULL == cp->search_h) && + (DESIRED_CONNECTIONS_PER_TUNNEL > cp->num_paths) ) + cp->search_h + = GCD_search (&cp->pid); + if (NULL == cp->destroy_task) + { + /* paths changed, we might now be ready for destruction, check again */ + consider_peer_destroy (cp); + } } /** - * @brief Start a search for a peer. + * Prune down the number of paths to this peer, we seem to + * have way too many. * - * @param cls Closure (Peer to search for). - * @param tc Task context. + * @param cls the `struct CadetPeer` to maintain the path heap for */ static void -delayed_search (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) +path_heap_cleanup (void *cls) { - struct CadetPeer *peer = cls; - - peer->search_delayed = NULL; - if (0 != (GNUNET_SCHEDULER_REASON_SHUTDOWN & tc->reason)) - return; - GCC_check_connections (); - GCP_start_search (peer); - GCC_check_connections (); + struct CadetPeer *cp = cls; + struct CadetPeerPath *root; + + cp->heap_cleanup_task = NULL; + while (GNUNET_CONTAINER_heap_get_size (cp->path_heap) >= + 2 * DESIRED_CONNECTIONS_PER_TUNNEL) + { + /* Now we have way too many, drop least desirable UNLESS it is in use! + (Note that this intentionally keeps highly desireable, but currently + unused paths around in the hope that we might be able to switch, even + if the number of paths exceeds the threshold.) */ + root = GNUNET_CONTAINER_heap_peek (cp->path_heap); + GNUNET_assert (NULL != root); + if (NULL != + GCPP_get_connection (root, + cp, + GCPP_get_length (root) - 1)) + break; /* can't fix */ + /* Got plenty of paths to this destination, and this is a low-quality + one that we don't care about. Allow it to die. */ + GNUNET_assert (root == + GNUNET_CONTAINER_heap_remove_root (cp->path_heap)); + GCPP_release (root); + } } /** - * Returns if peer is used (has a tunnel or is neighbor). + * Try adding a @a path to this @a peer. If the peer already + * has plenty of paths, return NULL. * - * @param peer Peer to check. - * @return #GNUNET_YES if peer is in use. + * @param cp peer to which the @a path leads to + * @param path a path looking for an owner; may not be fully initialized yet! + * @param off offset of @a cp in @a path + * @param force force attaching the path + * @return NULL if this peer does not care to become a new owner, + * otherwise the node in the peer's path heap for the @a path. */ -static int -peer_is_used (struct CadetPeer *peer) +struct GNUNET_CONTAINER_HeapNode * +GCP_attach_path (struct CadetPeer *cp, + struct CadetPeerPath *path, + unsigned int off, + int force) { - struct CadetPeerPath *p; - - if (NULL != peer->tunnel) - return GNUNET_YES; - - for (p = peer->path_head; NULL != p; p = p->next) + GNUNET_CONTAINER_HeapCostType desirability; + struct CadetPeerPath *root; + GNUNET_CONTAINER_HeapCostType root_desirability; + struct GNUNET_CONTAINER_HeapNode *hn; + + GNUNET_assert (off == GCPP_get_length (path) - 1); + GNUNET_assert (cp == GCPP_get_peer_at_offset (path, + off)); + if (NULL == cp->path_heap) + { + /* #GCP_drop_owned_paths() was already called, we cannot take new ones! */ + GNUNET_assert (GNUNET_NO == force); + return NULL; + } + desirability = GCPP_get_desirability (path); + if (GNUNET_NO == force) { - if (p->length < 3) - return GNUNET_YES; + /* FIXME: desirability is not yet initialized; tricky! */ + if (GNUNET_NO == + GNUNET_CONTAINER_heap_peek2 (cp->path_heap, + (void **) &root, + &root_desirability)) + { + root = NULL; + root_desirability = 0; + } + + if ( (DESIRED_CONNECTIONS_PER_TUNNEL > cp->num_paths) && + (desirability < root_desirability) ) + { + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Decided to not attach path %p to peer %s due to undesirability\n", + GCPP_2s (path), + GCP_2s (cp)); + return NULL; + } } - return GNUNET_NO; + + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Attaching path %s to peer %s (%s)\n", + GCPP_2s (path), + GCP_2s (cp), + (GNUNET_NO == force) ? "desirable" : "forced"); + + /* Yes, we'd like to add this path, add to our heap */ + hn = GNUNET_CONTAINER_heap_insert (cp->path_heap, + path, + desirability); + + /* Consider maybe dropping other paths because of the new one */ + if ( (GNUNET_CONTAINER_heap_get_size (cp->path_heap) >= + 2 * DESIRED_CONNECTIONS_PER_TUNNEL) && + (NULL != cp->heap_cleanup_task) ) + cp->heap_cleanup_task = GNUNET_SCHEDULER_add_now (&path_heap_cleanup, + cp); + return hn; } /** - * Iterator over all the peers to get the oldest timestamp. + * This peer can no longer own @a path as the path + * has been extended and a peer further down the line + * is now the new owner. * - * @param cls Closure (unsued). - * @param key ID of the peer. - * @param value Peer_Info of the peer. + * @param cp old owner of the @a path + * @param path path where the ownership is lost + * @param hn note in @a cp's path heap that must be deleted */ -static int -peer_get_oldest (void *cls, - const struct GNUNET_PeerIdentity *key, - void *value) +void +GCP_detach_path (struct CadetPeer *cp, + struct CadetPeerPath *path, + struct GNUNET_CONTAINER_HeapNode *hn) { - struct CadetPeer *p = value; - struct GNUNET_TIME_Absolute *abs = cls; - - /* Don't count active peers */ - if (GNUNET_YES == peer_is_used (p)) - return GNUNET_YES; - - if (abs->abs_value_us < p->last_contact.abs_value_us) - abs->abs_value_us = p->last_contact.abs_value_us; - - return GNUNET_YES; + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Detatching path %s from peer %s\n", + GCPP_2s (path), + GCP_2s (cp)); + GNUNET_assert (path == + GNUNET_CONTAINER_heap_remove_node (hn)); } /** - * Iterator over all the peers to remove the oldest entry. + * Add a @a connection to this @a cp. * - * @param cls Closure (unsued). - * @param key ID of the peer. - * @param value Peer_Info of the peer. + * @param cp peer via which the @a connection goes + * @param cc the connection to add */ -static int -peer_timeout (void *cls, - const struct GNUNET_PeerIdentity *key, - void *value) +void +GCP_add_connection (struct CadetPeer *cp, + struct CadetConnection *cc) { - struct CadetPeer *p = value; - struct GNUNET_TIME_Absolute *abs = cls; - - LOG (GNUNET_ERROR_TYPE_WARNING, - "peer %s timeout\n", GNUNET_i2s (key)); - - if (p->last_contact.abs_value_us == abs->abs_value_us && - GNUNET_NO == peer_is_used (p)) - { - peer_destroy (p); - return GNUNET_NO; - } - return GNUNET_YES; -} - - -/** - * Delete oldest unused peer. - */ -static void -peer_delete_oldest (void) -{ - struct GNUNET_TIME_Absolute abs; - - abs = GNUNET_TIME_UNIT_FOREVER_ABS; - - GNUNET_CONTAINER_multipeermap_iterate (peers, - &peer_get_oldest, - &abs); - GNUNET_CONTAINER_multipeermap_iterate (peers, - &peer_timeout, - &abs); -} - - -/** - * Choose the best (yet unused) path towards a peer, - * considering the tunnel properties. - * - * @param peer The destination peer. - * @return Best current known path towards the peer, if any. - */ -static struct CadetPeerPath * -peer_get_best_path (const struct CadetPeer *peer) -{ - struct CadetPeerPath *best_p; - struct CadetPeerPath *p; - unsigned int best_cost; - unsigned int cost; - - best_cost = UINT_MAX; - best_p = NULL; - for (p = peer->path_head; NULL != p; p = p->next) - { - if (GNUNET_NO == path_is_valid (p)) - continue; /* Don't use invalid paths. */ - if (GNUNET_YES == GCT_is_path_used (peer->tunnel, p)) - continue; /* If path is already in use, skip it. */ - - if ((cost = GCT_get_path_cost (peer->tunnel, p)) < best_cost) - { - best_cost = cost; - best_p = p; - } - } - return best_p; -} - - -/** - * Is this queue element sendable? - * - * - All management traffic is always sendable. - * - For payload traffic, check the connection flow control. - * - * @param q Queue element to inspect. - * @return #GNUNET_YES if it is sendable, #GNUNET_NO otherwise. - */ -static int -queue_is_sendable (struct CadetPeerQueue *q) -{ - /* Is PID-independent? */ - switch (q->type) - { - case GNUNET_MESSAGE_TYPE_CADET_ACK: - case GNUNET_MESSAGE_TYPE_CADET_POLL: - case GNUNET_MESSAGE_TYPE_CADET_KX: - case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE: - case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_ACK: - case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_DESTROY: - case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN: - return GNUNET_YES; - - case GNUNET_MESSAGE_TYPE_CADET_ENCRYPTED: - case GNUNET_MESSAGE_TYPE_CADET_AX: - break; - - default: - GNUNET_break (0); - } - - return GCC_is_sendable (q->c, q->fwd); -} - - -/** - * Get first sendable message. - * - * @param peer The destination peer. - * - * @return First transmittable message, if any. Otherwise, NULL. - */ -static struct CadetPeerQueue * -peer_get_first_message (const struct CadetPeer *peer) -{ - struct CadetPeerQueue *q; - - for (q = peer->queue_head; NULL != q; q = q->next) - { - LOG (GNUNET_ERROR_TYPE_DEBUG, "Checking q:%p on c:%s\n", q, GCC_2s (q->c)); - if (queue_is_sendable (q)) - return q; - } - - return NULL; -} - - -/** - * Function to process paths received for a new peer addition. The recorded - * paths form the initial tunnel, which can be optimized later. - * Called on each result obtained for the DHT search. - * - * @param cls Closure (peer towards a path has been found). - * @param path Path created from the DHT query. Will be freed afterwards. - */ -static void -search_handler (void *cls, const struct CadetPeerPath *path) -{ - struct CadetPeer *peer = cls; - unsigned int connection_count; - - GCC_check_connections (); - GCP_add_path_to_all (path, GNUNET_NO); - - /* Count connections */ - connection_count = GCT_count_connections (peer->tunnel); - - /* If we already have our minimum (or more) connections, it's enough */ - if (CONNECTIONS_PER_TUNNEL <= connection_count) - { - GCC_check_connections (); - return; - } - - if (CADET_TUNNEL_SEARCHING == GCT_get_cstate (peer->tunnel)) - { - LOG (GNUNET_ERROR_TYPE_DEBUG, " ... connect!\n"); - GCP_connect (peer); - } - GCC_check_connections (); -} - - -/** - * Adjust core requested size to accomodate an ACK. - * - * @param message_size Requested size. - * - * @return Size enough to fit @c message_size and an ACK. - */ -static size_t -get_core_size (size_t message_size) -{ - return message_size + sizeof (struct GNUNET_CADET_ACK); -} - - -/** - * Fill a core buffer with the appropriate data for the queued message. - * - * @param queue Queue element for the message. - * @param buf Core buffer to fill. - * @param size Size remaining in @c buf. - * @param[out] pid In case its an encrypted payload, set payload. - * - * @return Bytes written to @c buf. - */ -static size_t -fill_buf (struct CadetPeerQueue *queue, void *buf, size_t size, uint32_t *pid) -{ - struct CadetConnection *c = queue->c; - size_t msg_size; - - switch (queue->type) - { - case GNUNET_MESSAGE_TYPE_CADET_ENCRYPTED: - *pid = GCC_get_pid (queue->c, queue->fwd); - LOG (GNUNET_ERROR_TYPE_DEBUG, " otr payload ID %u\n", *pid); - msg_size = send_core_data_raw (queue->cls, size, buf); - ((struct GNUNET_CADET_Encrypted *) buf)->pid = htonl (*pid); - break; - case GNUNET_MESSAGE_TYPE_CADET_AX: - *pid = GCC_get_pid (queue->c, queue->fwd); - LOG (GNUNET_ERROR_TYPE_DEBUG, " ax payload ID %u\n", *pid); - msg_size = send_core_data_raw (queue->cls, size, buf); - ((struct GNUNET_CADET_AX *) buf)->pid = htonl (*pid); - break; - case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_DESTROY: - case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN: - case GNUNET_MESSAGE_TYPE_CADET_KX: - case GNUNET_MESSAGE_TYPE_CADET_ACK: - case GNUNET_MESSAGE_TYPE_CADET_POLL: - LOG (GNUNET_ERROR_TYPE_DEBUG, " raw %s\n", GC_m2s (queue->type)); - msg_size = send_core_data_raw (queue->cls, size, buf); - break; - case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE: - LOG (GNUNET_ERROR_TYPE_DEBUG, " path create\n"); - if (GCC_is_origin (c, GNUNET_YES)) - msg_size = send_core_connection_create (c, size, buf); - else - msg_size = send_core_data_raw (queue->cls, size, buf); - break; - case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_ACK: - LOG (GNUNET_ERROR_TYPE_DEBUG, " path ack\n"); - if (GCC_is_origin (c, GNUNET_NO) || - GCC_is_origin (c, GNUNET_YES)) - { - msg_size = send_core_connection_ack (c, size, buf); - } - else - { - msg_size = send_core_data_raw (queue->cls, size, buf); - } - break; - case GNUNET_MESSAGE_TYPE_CADET_DATA: - case GNUNET_MESSAGE_TYPE_CADET_CHANNEL_CREATE: - case GNUNET_MESSAGE_TYPE_CADET_CHANNEL_DESTROY: - /* This should be encapsulted */ - msg_size = 0; - GNUNET_assert (0); - break; - default: - GNUNET_break (0); - LOG (GNUNET_ERROR_TYPE_WARNING, " type unknown: %u\n", queue->type); - msg_size = 0; - } - - GNUNET_assert (size >= msg_size); - - return msg_size; -} - - -/** - * Core callback to write a queued packet to core buffer - * - * @param cls Closure (peer info). - * @param size Number of bytes available in buf. - * @param buf Where the to write the message. - * - * @return number of bytes written to buf - */ -static size_t -queue_send (void *cls, size_t size, void *buf) -{ - struct CadetPeer *peer = cls; - struct CadetConnection *c; - struct CadetPeerQueue *queue; - struct GNUNET_TIME_Relative core_wait_time; - const char *wait_s; - const struct GNUNET_PeerIdentity *dst_id; - size_t msg_size; - size_t total_size; - size_t rest; - char *dst; - uint32_t pid; - - GCC_check_connections (); - LOG (GNUNET_ERROR_TYPE_DEBUG, "\n"); - LOG (GNUNET_ERROR_TYPE_DEBUG, "\n"); - LOG (GNUNET_ERROR_TYPE_DEBUG, "Queue send towards %s (max %u)\n", - GCP_2s (peer), size); - - /* Sanity checking */ - if (NULL == buf || 0 == size) - { - LOG (GNUNET_ERROR_TYPE_DEBUG, " not allowed/\n"); - if (GNUNET_NO == in_shutdown) - { - queue = peer_get_first_message (peer); - if (NULL == queue) - { - peer->core_transmit = NULL; - peer->tmt_time.abs_value_us = 0; - GCC_check_connections (); - return 0; - } - dst_id = GNUNET_PEER_resolve2 (peer->id); - peer->core_transmit = - GNUNET_CORE_notify_transmit_ready (core_handle, - GNUNET_NO, get_priority (queue), - GNUNET_TIME_UNIT_FOREVER_REL, - dst_id, - get_core_size (queue->size), - &queue_send, - peer); - peer->tmt_time = GNUNET_TIME_absolute_get (); - } - else - { - peer->core_transmit = NULL; - peer->tmt_time.abs_value_us = 0; - } - GCC_check_connections (); - return 0; - } - - /* Init */ - rest = size; - total_size = 0; - dst = (char *) buf; - pid = 0; - peer->core_transmit = NULL; - queue = peer_get_first_message (peer); - if (NULL == queue) - { - GNUNET_break (0); /* Core tmt_rdy should've been canceled */ - peer->tmt_time.abs_value_us = 0; - return 0; - } - core_wait_time = GNUNET_TIME_absolute_get_duration (peer->tmt_time); - wait_s = GNUNET_STRINGS_relative_time_to_string (core_wait_time, GNUNET_YES); - if (core_wait_time.rel_value_us >= 1000000) - { - LOG (GNUNET_ERROR_TYPE_WARNING, - " %s: core wait time %s (> 1 second) for %u bytes\n", - GCP_2s (peer), wait_s, queue->size); - } - peer->tmt_time.abs_value_us = 0; - - /* Copy all possible messages to the core buffer */ - while (NULL != queue && rest >= queue->size) - { - c = queue->c; - - LOG (GNUNET_ERROR_TYPE_DEBUG, " on conn %s %s\n", - GCC_2s (c), GC_f2s(queue->fwd)); - LOG (GNUNET_ERROR_TYPE_DEBUG, " size %u ok (%u/%u)\n", - queue->size, total_size, size); - - msg_size = fill_buf (queue, (void *) dst, size, &pid); - - if (0 < drop_percent && - GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 101) < drop_percent) - { - LOG (GNUNET_ERROR_TYPE_WARNING, "DD %s (%s %u) on conn %s %s\n", - GC_m2s (queue->type), GC_m2s (queue->payload_type), - queue->payload_id, GCC_2s (c), GC_f2s (queue->fwd)); - msg_size = 0; - } - else - { - LOG (GNUNET_ERROR_TYPE_INFO, - ">>> %s (%s %4u) on conn %s (%p) %s [%5u], after %s\n", - GC_m2s (queue->type), GC_m2s (queue->payload_type), - queue->payload_id, GCC_2s (c), c, - GC_f2s (queue->fwd), msg_size, wait_s); - } - total_size += msg_size; - rest -= msg_size; - dst = &dst[msg_size]; - msg_size = 0; - - /* Free queue, but cls was freed by send_core_* in fill_buf. */ - (void) GCP_queue_destroy (queue, GNUNET_NO, GNUNET_YES, pid); - - /* Next! */ - queue = peer_get_first_message (peer); - } - - /* If more data in queue, send next */ - if (NULL != queue) - { - LOG (GNUNET_ERROR_TYPE_DEBUG, " more data! (%u)\n", queue->size); - if (NULL == peer->core_transmit) - { - dst_id = GNUNET_PEER_resolve2 (peer->id); - peer->core_transmit = - GNUNET_CORE_notify_transmit_ready (core_handle, - GNUNET_NO, get_priority (queue), - GNUNET_TIME_UNIT_FOREVER_REL, - dst_id, - get_core_size (queue->size), - &queue_send, - peer); - peer->tmt_time = GNUNET_TIME_absolute_get (); - queue->start_waiting = GNUNET_TIME_absolute_get (); - } - else - { - LOG (GNUNET_ERROR_TYPE_DEBUG, "* tmt rdy called somewhere else\n"); - } -// GCC_start_poll (); FIXME needed? - } - else - { -// GCC_stop_poll(); FIXME needed? - } - - LOG (GNUNET_ERROR_TYPE_DEBUG, " return %d\n", total_size); - queue_debug (peer, GNUNET_ERROR_TYPE_DEBUG); - GCC_check_connections (); - return total_size; -} - - -/******************************************************************************/ -/******************************** API ***********************************/ -/******************************************************************************/ - - -/** - * Free a transmission that was already queued with all resources - * associated to the request. - * - * If connection was marked to be destroyed, and this was the last queued - * message on it, the connection will be free'd as a result. - * - * @param queue Queue handler to cancel. - * @param clear_cls Is it necessary to free associated cls? - * @param sent Was it really sent? (Could have been canceled) - * @param pid PID, if relevant (was sent and was a payload message). - * - * @return #GNUNET_YES if connection was destroyed as a result, - * #GNUNET_NO otherwise. - */ -int -GCP_queue_destroy (struct CadetPeerQueue *queue, - int clear_cls, - int sent, - uint32_t pid) -{ - struct CadetPeer *peer; - int connection_destroyed; - - GCC_check_connections (); - peer = queue->peer; - LOG (GNUNET_ERROR_TYPE_DEBUG, "queue destroy %s\n", GC_m2s (queue->type)); - if (GNUNET_YES == clear_cls) - { - LOG (GNUNET_ERROR_TYPE_DEBUG, " free cls\n"); - switch (queue->type) - { - case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_DESTROY: - LOG (GNUNET_ERROR_TYPE_INFO, "destroying a DESTROY message\n"); - /* fall through */ - case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_ACK: - case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE: - case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN: - case GNUNET_MESSAGE_TYPE_CADET_KX: - case GNUNET_MESSAGE_TYPE_CADET_ENCRYPTED: - case GNUNET_MESSAGE_TYPE_CADET_AX: - case GNUNET_MESSAGE_TYPE_CADET_ACK: - case GNUNET_MESSAGE_TYPE_CADET_POLL: - GNUNET_free_non_null (queue->cls); - break; - - default: - GNUNET_break (0); - LOG (GNUNET_ERROR_TYPE_ERROR, " type %s unknown!\n", - GC_m2s (queue->type)); - } - } - GNUNET_CONTAINER_DLL_remove (peer->queue_head, peer->queue_tail, queue); - - if (queue->type != GNUNET_MESSAGE_TYPE_CADET_ACK && - queue->type != GNUNET_MESSAGE_TYPE_CADET_POLL) - { - peer->queue_n--; - } - - if (NULL != queue->cont) - { - struct GNUNET_TIME_Relative wait_time; - - wait_time = GNUNET_TIME_absolute_get_duration (queue->start_waiting); - LOG (GNUNET_ERROR_TYPE_DEBUG, " calling callback, time elapsed %s\n", - GNUNET_STRINGS_relative_time_to_string (wait_time, GNUNET_NO)); - connection_destroyed = queue->cont (queue->cont_cls, - queue->c, sent, queue->type, pid, - queue->fwd, queue->size, wait_time); - } - else - { - connection_destroyed = GNUNET_NO; - } - - if (NULL == peer_get_first_message (peer) && NULL != peer->core_transmit) - { - GNUNET_CORE_notify_transmit_ready_cancel (peer->core_transmit); - peer->core_transmit = NULL; - peer->tmt_time.abs_value_us = 0; - } - - GNUNET_free (queue); - GCC_check_connections (); - return connection_destroyed; -} - - -/** - * @brief Queue and pass message to core when possible. - * - * @param peer Peer towards which to queue the message. - * @param cls Closure (@c type dependant). It will be used by queue_send to - * build the message to be sent if not already prebuilt. - * @param type Type of the message. - * @param payload_type Type of the message's payload - * 0 if the message is a retransmission (unknown payload). - * UINT16_MAX if the message does not have payload. - * @param payload_id ID of the payload (MID, ACK #, etc) - * @param size Size of the message. - * @param c Connection this message belongs to (can be NULL). - * @param fwd Is this a message going root->dest? (FWD ACK are NOT FWD!) - * @param cont Continuation to be called once CORE has taken the message. - * @param cont_cls Closure for @c cont. - * - * @return Handle to cancel the message before it is sent. Once cont is called - * message has been sent and therefore the handle is no longer valid. - */ -struct CadetPeerQueue * -GCP_queue_add (struct CadetPeer *peer, - void *cls, - uint16_t type, - uint16_t payload_type, - uint32_t payload_id, - size_t size, - struct CadetConnection *c, - int fwd, - GCP_sent cont, - void *cont_cls) -{ - struct CadetPeerQueue *q; - int priority; - int call_core; - - GCC_check_connections (); - LOG (GNUNET_ERROR_TYPE_DEBUG, - "que %s (%s %4u) on conn %s (%p) %s towards %s (size %u)\n", - GC_m2s (type), GC_m2s (payload_type), payload_id, - GCC_2s (c), c, GC_f2s (fwd), GCP_2s (peer), size); - - if (NULL == peer->connections) - { - /* We are not connected to this peer, ignore request. */ - LOG (GNUNET_ERROR_TYPE_INFO, "%s not a neighbor\n", GCP_2s (peer)); - GNUNET_STATISTICS_update (stats, "# messages dropped due to wrong hop", 1, - GNUNET_NO); - return NULL; - } - - priority = 0; - - if (GNUNET_MESSAGE_TYPE_CADET_POLL == type || - GNUNET_MESSAGE_TYPE_CADET_ACK == type) - { - priority = 100; - } - - LOG (GNUNET_ERROR_TYPE_DEBUG, "priority %d\n", priority); - - call_core = (NULL == c || type == GNUNET_MESSAGE_TYPE_CADET_KX) ? - GNUNET_YES : GCC_is_sendable (c, fwd); - q = GNUNET_new (struct CadetPeerQueue); - q->cls = cls; - q->type = type; - q->payload_type = payload_type; - q->payload_id = payload_id; - q->size = size; - q->peer = peer; - q->c = c; - q->fwd = fwd; - q->cont = cont; - q->cont_cls = cont_cls; - if (100 > priority) - { - GNUNET_CONTAINER_DLL_insert_tail (peer->queue_head, peer->queue_tail, q); - peer->queue_n++; - } - else - { - GNUNET_CONTAINER_DLL_insert (peer->queue_head, peer->queue_tail, q); - call_core = GNUNET_YES; - } - - q->start_waiting = GNUNET_TIME_absolute_get (); - if (NULL == peer->core_transmit && GNUNET_YES == call_core) - { - LOG (GNUNET_ERROR_TYPE_DEBUG, - "calling core tmt rdy towards %s for %u bytes\n", - GCP_2s (peer), size); - peer->core_transmit = - GNUNET_CORE_notify_transmit_ready (core_handle, - GNUNET_NO, get_priority (q), - GNUNET_TIME_UNIT_FOREVER_REL, - GNUNET_PEER_resolve2 (peer->id), - get_core_size (size), - &queue_send, peer); - peer->tmt_time = GNUNET_TIME_absolute_get (); - } - else if (GNUNET_NO == call_core) - { - LOG (GNUNET_ERROR_TYPE_DEBUG, "core tmt rdy towards %s not needed\n", - GCP_2s (peer)); - - } - else - { - struct GNUNET_TIME_Relative elapsed; - elapsed = GNUNET_TIME_absolute_get_duration (peer->tmt_time); - LOG (GNUNET_ERROR_TYPE_DEBUG, "core tmt rdy towards %s already called %s\n", - GCP_2s (peer), - GNUNET_STRINGS_relative_time_to_string (elapsed, GNUNET_NO)); - - } - queue_debug (peer, GNUNET_ERROR_TYPE_DEBUG); - GCC_check_connections (); - return q; -} - - -/** - * Cancel all queued messages to a peer that belong to a certain connection. - * - * @param peer Peer towards whom to cancel. - * @param c Connection whose queued messages to cancel. Might be destroyed by - * the sent continuation call. - */ -void -GCP_queue_cancel (struct CadetPeer *peer, - struct CadetConnection *c) -{ - struct CadetPeerQueue *q; - struct CadetPeerQueue *next; - struct CadetPeerQueue *prev; - int connection_destroyed; - - GCC_check_connections (); - connection_destroyed = GNUNET_NO; - for (q = peer->queue_head; NULL != q; q = next) - { - prev = q->prev; - if (q->c == c) - { - LOG (GNUNET_ERROR_TYPE_DEBUG, - "GMP queue cancel %s\n", - GC_m2s (q->type)); - GNUNET_assert (GNUNET_NO == connection_destroyed); - if (GNUNET_MESSAGE_TYPE_CADET_CONNECTION_DESTROY == q->type) - { - q->c = NULL; - } - else - { - connection_destroyed = GCP_queue_destroy (q, GNUNET_YES, GNUNET_NO, 0); - } - - /* Get next from prev, q->next might be already freed: - * queue destroy -> callback -> GCC_destroy -> cancel_queues -> here - */ - if (NULL == prev) - next = peer->queue_head; - else - next = prev->next; - } - else - { - next = q->next; - } - } - - if ( (NULL == peer->queue_head) && - (NULL != peer->core_transmit) ) - { - GNUNET_CORE_notify_transmit_ready_cancel (peer->core_transmit); - peer->core_transmit = NULL; - peer->tmt_time.abs_value_us = 0; - } - GCC_check_connections (); -} - - -/** - * Get the first transmittable message for a connection. - * - * @param peer Neighboring peer. - * @param c Connection. - * - * @return First transmittable message. - */ -static struct CadetPeerQueue * -connection_get_first_message (struct CadetPeer *peer, struct CadetConnection *c) -{ - struct CadetPeerQueue *q; - - for (q = peer->queue_head; NULL != q; q = q->next) - { - if (q->c != c) - continue; - if (queue_is_sendable (q)) - { - LOG (GNUNET_ERROR_TYPE_DEBUG, " sendable!!\n"); - return q; - } - LOG (GNUNET_ERROR_TYPE_DEBUG, " not sendable\n"); - } - - return NULL; -} - - -/** - * Get the first message for a connection and unqueue it. - * - * Only tunnel (or higher) level messages are unqueued. Connection specific - * messages are silently destroyed upon encounter. - * - * @param peer Neighboring peer. - * @param c Connection. - * @param destroyed[in/out] Was the connection destroyed (prev/as a result)?. - * Can NOT be NULL. - * - * @return First message for this connection. - */ -struct GNUNET_MessageHeader * -GCP_connection_pop (struct CadetPeer *peer, - struct CadetConnection *c, - int *destroyed) -{ - struct CadetPeerQueue *q; - struct CadetPeerQueue *next; - struct GNUNET_MessageHeader *msg; - int dest; - - GCC_check_connections (); - GNUNET_assert (NULL != destroyed); - LOG (GNUNET_ERROR_TYPE_DEBUG, "connection_pop on conn %p\n", c); - for (q = peer->queue_head; NULL != q; q = next) - { - next = q->next; - if (q->c != c) - continue; - LOG (GNUNET_ERROR_TYPE_DEBUG, " - queued: %s (%s %u), cont: %p\n", - GC_m2s (q->type), GC_m2s (q->payload_type), q->payload_id, - q->cont); - switch (q->type) - { - case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE: - case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_ACK: - case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_DESTROY: - case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN: - case GNUNET_MESSAGE_TYPE_CADET_ACK: - case GNUNET_MESSAGE_TYPE_CADET_POLL: - dest = GCP_queue_destroy (q, GNUNET_YES, GNUNET_NO, 0); - if (GNUNET_YES == dest) - { - GNUNET_break (GNUNET_NO == *destroyed); - *destroyed = GNUNET_YES; - } - continue; - - case GNUNET_MESSAGE_TYPE_CADET_KX: - case GNUNET_MESSAGE_TYPE_CADET_ENCRYPTED: - case GNUNET_MESSAGE_TYPE_CADET_AX: - case GNUNET_MESSAGE_TYPE_CADET_AX_KX: - msg = (struct GNUNET_MessageHeader *) q->cls; - dest = GCP_queue_destroy (q, GNUNET_NO, GNUNET_NO, 0); - if (GNUNET_YES == dest) - { - GNUNET_break (GNUNET_NO == *destroyed); - *destroyed = GNUNET_YES; - } - return msg; - - default: - GNUNET_break (0); - LOG (GNUNET_ERROR_TYPE_DEBUG, "Unknown message %s\n", GC_m2s (q->type)); - } - } - GCC_check_connections (); - return NULL; -} - - -/** - * Unlock a possibly locked queue for a connection. - * - * If there is a message that can be sent on this connection, call core for it. - * Otherwise (if core transmit is already called or there is no sendable - * message) do nothing. - * - * @param peer Peer who keeps the queue. - * @param c Connection whose messages to unlock. - */ -void -GCP_queue_unlock (struct CadetPeer *peer, struct CadetConnection *c) -{ - struct CadetPeerQueue *q; - size_t size; - - GCC_check_connections (); - if (NULL != peer->core_transmit) - { - LOG (GNUNET_ERROR_TYPE_DEBUG, " already unlocked!\n"); - return; /* Already unlocked */ - } - - q = connection_get_first_message (peer, c); - if (NULL == q) - { - LOG (GNUNET_ERROR_TYPE_DEBUG, " queue empty!\n"); - return; /* Nothing to transmit */ - } - - size = q->size; - peer->core_transmit = - GNUNET_CORE_notify_transmit_ready (core_handle, - GNUNET_NO, get_priority (q), - GNUNET_TIME_UNIT_FOREVER_REL, - GNUNET_PEER_resolve2 (peer->id), - get_core_size (size), - &queue_send, - peer); - peer->tmt_time = GNUNET_TIME_absolute_get (); - GCC_check_connections (); -} - - -/** - * Initialize the peer subsystem. - * - * @param c Configuration. - */ -void -GCP_init (const struct GNUNET_CONFIGURATION_Handle *c) -{ - LOG (GNUNET_ERROR_TYPE_DEBUG, - "GCP_init\n"); - in_shutdown = GNUNET_NO; - peers = GNUNET_CONTAINER_multipeermap_create (128, GNUNET_NO); - if (GNUNET_OK != - GNUNET_CONFIGURATION_get_value_number (c, "CADET", "MAX_PEERS", - &max_peers)) - { - GNUNET_log_config_invalid (GNUNET_ERROR_TYPE_WARNING, - "CADET", "MAX_PEERS", "USING DEFAULT"); - max_peers = 1000; - } - - if (GNUNET_OK != - GNUNET_CONFIGURATION_get_value_number (c, "CADET", "DROP_PERCENT", - &drop_percent)) - { - drop_percent = 0; - } - else - { - LOG (GNUNET_ERROR_TYPE_WARNING, "**************************************\n"); - LOG (GNUNET_ERROR_TYPE_WARNING, "Cadet is running with DROP enabled.\n"); - LOG (GNUNET_ERROR_TYPE_WARNING, "This is NOT a good idea!\n"); - LOG (GNUNET_ERROR_TYPE_WARNING, "Remove DROP_PERCENT from config file.\n"); - LOG (GNUNET_ERROR_TYPE_WARNING, "**************************************\n"); - } - - core_handle = GNUNET_CORE_connect (c, /* Main configuration */ - NULL, /* Closure passed to CADET functions */ - &core_init, /* Call core_init once connected */ - &core_connect, /* Handle connects */ - &core_disconnect, /* remove peers on disconnects */ - NULL, /* Don't notify about all incoming messages */ - GNUNET_NO, /* For header only in notification */ - NULL, /* Don't notify about all outbound messages */ - GNUNET_NO, /* For header-only out notification */ - core_handlers); /* Register these handlers */ - if (GNUNET_YES != - GNUNET_CONFIGURATION_get_value_yesno (c, "CADET", "DISABLE_TRY_CONNECT")) - { - transport_handle = GNUNET_TRANSPORT_connect (c, &my_full_id, NULL, /* cls */ - /* Notify callbacks */ - NULL, NULL, NULL); - } - else - { - LOG (GNUNET_ERROR_TYPE_WARNING, "**************************************\n"); - LOG (GNUNET_ERROR_TYPE_WARNING, "* DISABLE TRYING CONNECT in config *\n"); - LOG (GNUNET_ERROR_TYPE_WARNING, "* Use this only for test purposes. *\n"); - LOG (GNUNET_ERROR_TYPE_WARNING, "**************************************\n"); - transport_handle = NULL; - } - - - - if (NULL == core_handle) - { - GNUNET_break (0); - GNUNET_SCHEDULER_shutdown (); - return; - } - -} - - -/** - * Shut down the peer subsystem. - */ -void -GCP_shutdown (void) -{ - LOG (GNUNET_ERROR_TYPE_DEBUG, "Shutting down peers\n"); - in_shutdown = GNUNET_YES; - GNUNET_CONTAINER_multipeermap_iterate (peers, - &shutdown_peer, - NULL); - if (NULL != core_handle) - { - GNUNET_CORE_disconnect (core_handle); - core_handle = NULL; - } - if (NULL != transport_handle) - { - GNUNET_TRANSPORT_disconnect (transport_handle); - transport_handle = NULL; - } - GNUNET_PEER_change_rc (myid, -1); - GNUNET_CONTAINER_multipeermap_destroy (peers); - peers = NULL; -} - - -/** - * Retrieve the CadetPeer stucture associated with the peer. Optionally create - * one and insert it in the appropriate structures if the peer is not known yet. - * - * @param peer_id Full identity of the peer. - * @param create #GNUNET_YES if a new peer should be created if unknown. - * #GNUNET_NO otherwise. - * - * @return Existing or newly created peer structure. - * NULL if unknown and not requested @a create - */ -struct CadetPeer * -GCP_get (const struct GNUNET_PeerIdentity *peer_id, int create) -{ - struct CadetPeer *peer; - - peer = GNUNET_CONTAINER_multipeermap_get (peers, peer_id); - if (NULL == peer) - { - peer = GNUNET_new (struct CadetPeer); - if (GNUNET_CONTAINER_multipeermap_size (peers) > max_peers) - { - peer_delete_oldest (); - } - GNUNET_assert (GNUNET_OK == - GNUNET_CONTAINER_multipeermap_put (peers, - peer_id, - peer, - GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); - peer->id = GNUNET_PEER_intern (peer_id); - } - peer->last_contact = GNUNET_TIME_absolute_get (); - - return peer; -} - - -/** - * Retrieve the CadetPeer stucture associated with the peer. Optionally create - * one and insert it in the appropriate structures if the peer is not known yet. - * - * @param peer Short identity of the peer. - * @param create #GNUNET_YES if a new peer should be created if unknown. - * #GNUNET_NO otherwise. - * - * @return Existing or newly created peer structure. - * NULL if unknown and not requested @a create - */ -struct CadetPeer * -GCP_get_short (const GNUNET_PEER_Id peer, int create) -{ - return GCP_get (GNUNET_PEER_resolve2 (peer), create); -} - - -/** - * Try to connect to a peer on transport level. - * - * @param cls Closure (peer). - * @param tc TaskContext. - */ -static void -try_connect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) -{ - struct CadetPeer *peer = cls; - - if (0 != (GNUNET_SCHEDULER_REASON_SHUTDOWN & tc->reason)) - return; - - GNUNET_TRANSPORT_try_connect (transport_handle, - GNUNET_PEER_resolve2 (peer->id), NULL, NULL); -} - - -/** - * Try to establish a new connection to this peer (in its tunnel). - * If the peer doesn't have any path to it yet, try to get one. - * If the peer already has some path, send a CREATE CONNECTION towards it. - * - * @param peer Peer to connect to. - */ -void -GCP_connect (struct CadetPeer *peer) -{ - struct CadetTunnel *t; - struct CadetPeerPath *path; - struct CadetConnection *c; - int rerun_search; - - GCC_check_connections (); - LOG (GNUNET_ERROR_TYPE_DEBUG, "peer_connect towards %s\n", GCP_2s (peer)); - - /* If we have a current hello, try to connect using it. */ - GCP_try_connect (peer); - - t = peer->tunnel; - c = NULL; - rerun_search = GNUNET_NO; - - if (NULL != peer->path_head) - { - LOG (GNUNET_ERROR_TYPE_DEBUG, " some path exists\n"); - path = peer_get_best_path (peer); - if (NULL != path) - { - char *s; - - s = path_2s (path); - LOG (GNUNET_ERROR_TYPE_DEBUG, " path to use: %s\n", s); - GNUNET_free (s); - - c = GCT_use_path (t, path); - if (NULL == c) - { - /* This case can happen when the path includes a first hop that is - * not yet known to be connected. - * - * This happens quite often during testing when running cadet - * under valgrind: core connect notifications come very late - * and the DHT result has already come and created a valid - * path. In this case, the peer->connections - * hashmaps will be NULL and tunnel_use_path will not be able - * to create a connection from that path. - * - * Re-running the DHT GET should give core time to callback. - * - * GCT_use_path -> GCC_new -> register_neighbors takes care of - * updating statistics about this issue. - */ - rerun_search = GNUNET_YES; - } - else - { - GCC_send_create (c); - return; - } - } - else - { - LOG (GNUNET_ERROR_TYPE_DEBUG, " but is NULL, all paths are in use\n"); - } - } - - if (GNUNET_YES == rerun_search) - { - struct GNUNET_TIME_Relative delay; - - GCP_stop_search (peer); - delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS, 100); - peer->search_delayed = GNUNET_SCHEDULER_add_delayed (delay, - &delayed_search, - peer); - GCC_check_connections (); - return; - } - - if (GNUNET_NO == is_searching (peer)) - GCP_start_search (peer); - GCC_check_connections (); -} - - -/** - * Chech whether there is a direct (core level) connection to peer. - * - * @param peer Peer to check. - * - * @return #GNUNET_YES if there is a direct connection. - */ -int -GCP_is_neighbor (const struct CadetPeer *peer) -{ - struct CadetPeerPath *path; - - if (NULL == peer->connections) - return GNUNET_NO; - - for (path = peer->path_head; NULL != path; path = path->next) + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Adding connection %s to peer %s\n", + GCC_2s (cc), + GCP_2s (cp)); + GNUNET_assert (GNUNET_OK == + GNUNET_CONTAINER_multishortmap_put (cp->connections, + &GCC_get_id (cc)->connection_of_tunnel, + cc, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); + if (NULL != cp->destroy_task) { - if (3 > path->length) - return GNUNET_YES; + GNUNET_SCHEDULER_cancel (cp->destroy_task); + cp->destroy_task = NULL; } - - /* Is not a neighbor but connections is not NULL, probably disconnecting */ - GNUNET_break (0); - return GNUNET_NO; -} - - -/** - * Create and initialize a new tunnel towards a peer, in case it has none. - * In case the peer already has a tunnel, nothing is done. - * - * Does not generate any traffic, just creates the local data structures. - * - * @param peer Peer towards which to create the tunnel. - */ -void -GCP_add_tunnel (struct CadetPeer *peer) -{ - GCC_check_connections (); - if (NULL != peer->tunnel) - return; - peer->tunnel = GCT_new (peer); - GCC_check_connections (); } /** - * Add a connection to a neighboring peer. - * - * Store that the peer is the first hop of the connection in one - * direction and that on peer disconnect the connection must be - * notified and destroyed, for it will no longer be valid. + * Remove a @a connection that went via this @a cp. * - * @param peer Peer to add connection to. - * @param c Connection to add. - * @param pred #GNUNET_YES if we are predecessor, #GNUNET_NO if we are successor + * @param cp peer via which the @a connection went + * @param cc the connection to remove */ void -GCP_add_connection (struct CadetPeer *peer, - struct CadetConnection *c, - int pred) +GCP_remove_connection (struct CadetPeer *cp, + struct CadetConnection *cc) { LOG (GNUNET_ERROR_TYPE_DEBUG, - "adding connection %s\n", - GCC_2s (c)); - LOG (GNUNET_ERROR_TYPE_DEBUG, - "to peer %s\n", - GCP_2s (peer)); - GNUNET_assert (NULL != peer->connections); - GNUNET_assert (GNUNET_OK == - GNUNET_CONTAINER_multihashmap_put (peer->connections, - GCC_get_h (c), - c, - GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Peer %s has now %u connections.\n", - GCP_2s (peer), - GNUNET_CONTAINER_multihashmap_size (peer->connections)); + "Removing connection %s from peer %s\n", + GCC_2s (cc), + GCP_2s (cp)); + GNUNET_assert (GNUNET_YES == + GNUNET_CONTAINER_multishortmap_remove (cp->connections, + &GCC_get_id (cc)->connection_of_tunnel, + cc)); + consider_peer_destroy (cp); } /** - * Add the path to the peer and update the path used to reach it in case this - * is the shortest. - * - * @param peer Destination peer to add the path to. - * @param path New path to add. Last peer must be @c peer. - * Path will be either used of freed if already known. - * @param trusted Do we trust that this path is real? + * Retrieve the CadetPeer stucture associated with the + * peer. Optionally create one and insert it in the appropriate + * structures if the peer is not known yet. * - * @return path if path was taken, pointer to existing duplicate if exists - * NULL on error. + * @param peer_id Full identity of the peer. + * @param create #GNUNET_YES if a new peer should be created if unknown. + * #GNUNET_NO to return NULL if peer is unknown. + * @return Existing or newly created peer structure. + * NULL if unknown and not requested @a create */ -struct CadetPeerPath * -GCP_add_path (struct CadetPeer *peer, - struct CadetPeerPath *path, - int trusted) +struct CadetPeer * +GCP_get (const struct GNUNET_PeerIdentity *peer_id, + int create) { - struct CadetPeerPath *aux; - unsigned int l; - unsigned int l2; - - GCC_check_connections (); - LOG (GNUNET_ERROR_TYPE_DEBUG, - "adding path [%u] to peer %s\n", - path->length, GCP_2s (peer)); - - if (NULL == peer || NULL == path - || path->peers[path->length - 1] != peer->id) - { - GNUNET_break (0); - path_destroy (path); - return NULL; - } - - for (l = 1; l < path->length; l++) - { - if (path->peers[l] == myid) - { - LOG (GNUNET_ERROR_TYPE_DEBUG, " shortening path by %u\n", l); - for (l2 = 0; l2 < path->length - l; l2++) - { - path->peers[l2] = path->peers[l + l2]; - } - path->length -= l; - l = 1; - path->peers = GNUNET_realloc (path->peers, - path->length * sizeof (GNUNET_PEER_Id)); - } - } - - LOG (GNUNET_ERROR_TYPE_DEBUG, " final length: %u\n", path->length); - - if (2 >= path->length && GNUNET_NO == trusted) - { - /* Only allow CORE to tell us about direct paths */ - path_destroy (path); - return NULL; - } + struct CadetPeer *cp; - l = path_get_length (path); - if (0 == l) - { - path_destroy (path); + cp = GNUNET_CONTAINER_multipeermap_get (peers, + peer_id); + if (NULL != cp) + return cp; + if (GNUNET_NO == create) return NULL; - } - - GNUNET_assert (peer->id == path->peers[path->length - 1]); - for (aux = peer->path_head; aux != NULL; aux = aux->next) - { - l2 = path_get_length (aux); - if (l2 > l) - { - LOG (GNUNET_ERROR_TYPE_DEBUG, " added\n"); - GNUNET_CONTAINER_DLL_insert_before (peer->path_head, - peer->path_tail, aux, path); - goto finish; - } - else - { - if (l2 == l && memcmp (path->peers, aux->peers, l) == 0) - { - LOG (GNUNET_ERROR_TYPE_DEBUG, " already known\n"); - path_destroy (path); - return aux; - } - } - } - GNUNET_CONTAINER_DLL_insert_tail (peer->path_head, peer->path_tail, - path); - LOG (GNUNET_ERROR_TYPE_DEBUG, " added last\n"); - -finish: - if (NULL != peer->tunnel - && CONNECTIONS_PER_TUNNEL > GCT_count_connections (peer->tunnel) - && 2 < path->length) /* Direct paths are handled by core_connect */ - { - GCP_connect (peer); - } - GCC_check_connections (); - return path; + cp = GNUNET_new (struct CadetPeer); + cp->pid = *peer_id; + cp->connections = GNUNET_CONTAINER_multishortmap_create (32, + GNUNET_YES); + cp->path_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN); + GNUNET_assert (GNUNET_YES == + GNUNET_CONTAINER_multipeermap_put (peers, + &cp->pid, + cp, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Creating peer %s\n", + GCP_2s (cp)); + return cp; } /** - * Add the path to the origin peer and update the path used to reach it in case - * this is the shortest. - * The path is given in peer_info -> destination, therefore we turn the path - * upside down first. + * Obtain the peer identity for a `struct CadetPeer`. * - * @param peer Peer to add the path to, being the origin of the path. - * @param path New path to add after being inversed. - * Path will be either used or freed. - * @param trusted Do we trust that this path is real? - * - * @return path if path was taken, pointer to existing duplicate if exists - * NULL on error. + * @param cp our peer handle + * @return the peer identity */ -struct CadetPeerPath * -GCP_add_path_to_origin (struct CadetPeer *peer, - struct CadetPeerPath *path, - int trusted) +const struct GNUNET_PeerIdentity * +GCP_get_id (struct CadetPeer *cp) { - if (NULL == path) - return NULL; - path_invert (path); - return GCP_add_path (peer, path, trusted); + return &cp->pid; } /** - * Adds a path to the info of all the peers in the path + * Iterate over all known peers. * - * @param p Path to process. - * @param confirmed Whether we know if the path works or not. + * @param iter Iterator. + * @param cls Closure for @c iter. */ void -GCP_add_path_to_all (const struct CadetPeerPath *p, int confirmed) +GCP_iterate_all (GNUNET_CONTAINER_PeerMapIterator iter, + void *cls) { - unsigned int i; - - /* TODO: invert and add */ - GCC_check_connections (); - for (i = 0; i < p->length && p->peers[i] != myid; i++) /* skip'em */ ; - for (i++; i < p->length; i++) - { - struct CadetPeer *peer; - struct CadetPeerPath *copy; - - peer = GCP_get_short (p->peers[i], GNUNET_YES); - copy = path_duplicate (p); - copy->length = i + 1; - GCP_add_path (peer, copy, 3 > p->length ? GNUNET_NO : confirmed); - } - GCC_check_connections (); + GNUNET_CONTAINER_multipeermap_iterate (peers, + iter, + cls); } /** - * Remove any path to the peer that has the exact same peers as the one given. + * Count the number of known paths toward the peer. * - * @param peer Peer to remove the path from. - * @param path Path to remove. Is always destroyed . + * @param cp Peer to get path info. + * @return Number of known paths. */ -void -GCP_remove_path (struct CadetPeer *peer, struct CadetPeerPath *path) +unsigned int +GCP_count_paths (const struct CadetPeer *cp) { - struct CadetPeerPath *iter; - struct CadetPeerPath *next; - - GCC_check_connections (); - GNUNET_assert (myid == path->peers[0]); - GNUNET_assert (peer->id == path->peers[path->length - 1]); - - LOG (GNUNET_ERROR_TYPE_INFO, "Removing path %p (%u) from %s\n", - path, path->length, GCP_2s (peer)); - - for (iter = peer->path_head; NULL != iter; iter = next) - { - next = iter->next; - if (0 == path_cmp (path, iter)) - { - GNUNET_CONTAINER_DLL_remove (peer->path_head, peer->path_tail, iter); - if (iter != path) - path_destroy (iter); - } - } - path_destroy (path); - GCC_check_connections (); + return cp->num_paths; } /** - * Check that we are aware of a connection from a neighboring peer. + * Iterate over the paths to a peer. * - * @param peer Peer to the connection is with - * @param c Connection that should be in the map with this peer. + * @param cp Peer to get path info. + * @param callback Function to call for every path. + * @param callback_cls Closure for @a callback. + * @return Number of iterated paths. */ -void -GCP_check_connection (const struct CadetPeer *peer, - const struct CadetConnection *c) +unsigned int +GCP_iterate_paths (struct CadetPeer *cp, + GCP_PathIterator callback, + void *callback_cls) { - GNUNET_assert (NULL != peer); - GNUNET_assert (NULL != peer->connections); - return; - GNUNET_assert (GNUNET_YES == - GNUNET_CONTAINER_multihashmap_contains_value (peer->connections, - GCC_get_h (c), - c)); -} - + unsigned int ret = 0; -/** - * Remove a connection from a neighboring peer. - * - * @param peer Peer to remove connection from. - * @param c Connection to remove. - */ -void -GCP_remove_connection (struct CadetPeer *peer, - const struct CadetConnection *c) -{ - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Removing connection %s\n", - GCC_2s (c)); LOG (GNUNET_ERROR_TYPE_DEBUG, - "from peer %s\n", - GCP_2s (peer)); - if ( (NULL == peer) || - (NULL == peer->connections) ) - return; - GNUNET_assert (GNUNET_YES == - GNUNET_CONTAINER_multihashmap_remove (peer->connections, - GCC_get_h (c), - c)); - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Peer %s remains with %u connections.\n", - GCP_2s (peer), - GNUNET_CONTAINER_multihashmap_size (peer->connections)); -} - - -/** - * Start the DHT search for new paths towards the peer: we don't have - * enough good connections. - * - * @param peer Destination peer. - */ -void -GCP_start_search (struct CadetPeer *peer) -{ - const struct GNUNET_PeerIdentity *id; - struct CadetTunnel *t = peer->tunnel; - - GCC_check_connections (); - if (NULL != peer->search_h) - { - GNUNET_break (0); - return; - } - - if (NULL != peer->search_delayed) - GCP_stop_search (peer); - - id = GNUNET_PEER_resolve2 (peer->id); - peer->search_h = GCD_search (id, &search_handler, peer); - - if (NULL == t) - { - /* Why would we search for a peer with no tunnel towards it? */ - GNUNET_break (0); - return; - } - - if (CADET_TUNNEL_NEW == GCT_get_cstate (t) - || 0 == GCT_count_any_connections (t)) - { - GCT_change_cstate (t, CADET_TUNNEL_SEARCHING); + "Iterating over paths to peer %s%s\n", + GCP_2s (cp), + (NULL == cp->core_mq) ? "" : " including direct link"); + if (NULL != cp->core_mq) + { + struct CadetPeerPath *path; + + path = GCPP_get_path_from_route (1, + &cp->pid); + ret++; + if (GNUNET_NO == + callback (callback_cls, + path, + 0)) + return ret; + } + for (unsigned int i=0;ipath_dll_length;i++) + { + for (struct CadetPeerPathEntry *pe = cp->path_heads[i]; + NULL != pe; + pe = pe->next) + { + ret++; + if (GNUNET_NO == + callback (callback_cls, + pe->path, + i)) + return ret; + } } - GCC_check_connections (); + return ret; } /** - * Stop the DHT search for new paths towards the peer: we already have - * enough good connections. + * Iterate over the paths to @a cp where + * @a cp is at distance @a dist from us. * - * @param peer Destination peer. + * @param cp Peer to get path info. + * @param dist desired distance of @a cp to us on the path + * @param callback Function to call for every path. + * @param callback_cls Closure for @a callback. + * @return Number of iterated paths. */ -void -GCP_stop_search (struct CadetPeer *peer) +unsigned int +GCP_iterate_paths_at (struct CadetPeer *cp, + unsigned int dist, + GCP_PathIterator callback, + void *callback_cls) { - GCC_check_connections (); - if (NULL != peer->search_h) + unsigned int ret = 0; + + if (dist >= cp->path_dll_length) { - GCD_search_stop (peer->search_h); - peer->search_h = NULL; + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Asked to look for paths at distance %u, but maximum for me is < %u\n", + dist, + cp->path_dll_length); + return 0; } - if (NULL != peer->search_delayed) + for (struct CadetPeerPathEntry *pe = cp->path_heads[dist]; + NULL != pe; + pe = pe->next) { - GNUNET_SCHEDULER_cancel (peer->search_delayed); - peer->search_delayed = NULL; + if (GNUNET_NO == + callback (callback_cls, + pe->path, + dist)) + return ret; + ret++; } - GCC_check_connections (); -} - - -/** - * Get the Full ID of a peer. - * - * @param peer Peer to get from. - * - * @return Full ID of peer. - */ -const struct GNUNET_PeerIdentity * -GCP_get_id (const struct CadetPeer *peer) -{ - return GNUNET_PEER_resolve2 (peer->id); + return ret; } /** - * Get the Short ID of a peer. - * - * @param peer Peer to get from. + * Get the tunnel towards a peer. * - * @return Short ID of peer. + * @param cp Peer to get from. + * @param create #GNUNET_YES to create a tunnel if we do not have one + * @return Tunnel towards peer. */ -GNUNET_PEER_Id -GCP_get_short_id (const struct CadetPeer *peer) +struct CadetTunnel * +GCP_get_tunnel (struct CadetPeer *cp, + int create) { - return peer->id; + if (NULL == cp) + return NULL; + if ( (NULL != cp->t) || + (GNUNET_NO == create) ) + return cp->t; + cp->t = GCT_create_tunnel (cp); + consider_peer_activate (cp); + return cp->t; } /** - * Set tunnel. + * Hello offer was passed to the transport service. Mark it + * as done. * - * If tunnel is NULL and there was a search active, stop it, as it's useless. - * - * @param peer Peer. - * @param t Tunnel. + * @param cls the `struct CadetPeer` where the offer completed */ -void -GCP_set_tunnel (struct CadetPeer *peer, struct CadetTunnel *t) +static void +hello_offer_done (void *cls) { - peer->tunnel = t; - if (NULL == t && GNUNET_YES == is_searching (peer)) - { - GCP_stop_search (peer); - } -} + struct CadetPeer *cp = cls; - -/** - * Get the tunnel towards a peer. - * - * @param peer Peer to get from. - * - * @return Tunnel towards peer. - */ -struct CadetTunnel * -GCP_get_tunnel (const struct CadetPeer *peer) -{ - if (NULL == peer) - return NULL; - return peer->tunnel; + cp->hello_offer = NULL; } /** - * Set the hello message. + * We got a HELLO for a @a peer, remember it, and possibly + * trigger adequate actions (like trying to connect). * - * @param peer Peer whose message to set. - * @param hello Hello message. + * @param cp the peer we got a HELLO for + * @param hello the HELLO to remember */ void -GCP_set_hello (struct CadetPeer *peer, const struct GNUNET_HELLO_Message *hello) +GCP_set_hello (struct CadetPeer *cp, + const struct GNUNET_HELLO_Message *hello) { - struct GNUNET_HELLO_Message *old; - size_t size; - - GCC_check_connections (); - LOG (GNUNET_ERROR_TYPE_DEBUG, "set hello for %s\n", GCP_2s (peer)); - if (NULL == hello) - return; + struct GNUNET_HELLO_Message *mrg; - old = GCP_get_hello (peer); - if (NULL == old) + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Got %u byte HELLO for peer %s\n", + (unsigned int) GNUNET_HELLO_size (hello), + GCP_2s (cp)); + if (NULL != cp->hello_offer) { - size = GNUNET_HELLO_size (hello); - peer->hello = GNUNET_malloc (size); - memcpy (peer->hello, hello, size); + GNUNET_TRANSPORT_offer_hello_cancel (cp->hello_offer); + cp->hello_offer = NULL; } - else + if (NULL != cp->hello) { - peer->hello = GNUNET_HELLO_merge (old, hello); - GNUNET_free (old); + mrg = GNUNET_HELLO_merge (hello, + cp->hello); + GNUNET_free (cp->hello); + cp->hello = mrg; } - GCC_check_connections (); -} - - -/** - * Get the hello message. - * - * @param peer Peer whose message to get. - * - * @return Hello message. - */ -struct GNUNET_HELLO_Message * -GCP_get_hello (struct CadetPeer *peer) -{ - struct GNUNET_TIME_Absolute expiration; - struct GNUNET_TIME_Relative remaining; - - if (NULL == peer->hello) - return NULL; - - expiration = GNUNET_HELLO_get_last_expiration (peer->hello); - remaining = GNUNET_TIME_absolute_get_remaining (expiration); - if (0 == remaining.rel_value_us) + else { - LOG (GNUNET_ERROR_TYPE_DEBUG, " get - hello expired on %s\n", - GNUNET_STRINGS_absolute_time_to_string (expiration)); - GNUNET_free (peer->hello); - peer->hello = NULL; + cp->hello = GNUNET_memdup (hello, + GNUNET_HELLO_size (hello)); } - return peer->hello; + cp->hello_offer + = GNUNET_TRANSPORT_offer_hello (cfg, + GNUNET_HELLO_get_header (cp->hello) , + &hello_offer_done, + cp); + /* New HELLO means cp's destruction time may change... */ + consider_peer_destroy (cp); } /** - * Try to connect to a peer on TRANSPORT level. + * The tunnel to the given peer no longer exists, remove it from our + * data structures, and possibly clean up the peer itself. * - * @param peer Peer to whom to connect. + * @param cp the peer affected + * @param t the dead tunnel */ void -GCP_try_connect (struct CadetPeer *peer) +GCP_drop_tunnel (struct CadetPeer *cp, + struct CadetTunnel *t) { - struct GNUNET_HELLO_Message *hello; - struct GNUNET_MessageHeader *mh; - - if (NULL == transport_handle) - return; - GCC_check_connections (); - if (GNUNET_YES == GCP_is_neighbor (peer)) - return; - hello = GCP_get_hello (peer); - if (NULL == hello) - return; - - mh = GNUNET_HELLO_get_header (hello); - GNUNET_TRANSPORT_offer_hello (transport_handle, - mh, - &try_connect, - peer); - GCC_check_connections (); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Dropping tunnel %s to peer %s\n", + GCT_2s (t), + GCP_2s (cp)); + GNUNET_assert (cp->t == t); + cp->t = NULL; + consider_peer_destroy (cp); } /** - * Notify a peer that a link between two other peers is broken. If any path - * used that link, eliminate it. + * Test if @a cp has a core-level connection * - * @param peer Peer affected by the change. - * @param peer1 Peer whose link is broken. - * @param peer2 Peer whose link is broken. + * @param cp peer to test + * @return #GNUNET_YES if @a cp has a core-level connection */ -void -GCP_notify_broken_link (struct CadetPeer *peer, - const struct GNUNET_PeerIdentity *peer1, - const struct GNUNET_PeerIdentity *peer2) +int +GCP_has_core_connection (struct CadetPeer *cp) { - struct CadetPeerPath *iter; - struct CadetPeerPath *next; - unsigned int i; - GNUNET_PEER_Id p1; - GNUNET_PEER_Id p2; - - GCC_check_connections (); - p1 = GNUNET_PEER_search (peer1); - p2 = GNUNET_PEER_search (peer2); - - LOG (GNUNET_ERROR_TYPE_DEBUG, "Link %u-%u broken\n", p1, p2); - if (0 == p1 || 0 == p2) - { - /* We don't even know them */ - return; - } - - for (iter = peer->path_head; NULL != iter; iter = next) - { - next = iter->next; - for (i = 0; i < iter->length - 1; i++) - { - if ((iter->peers[i] == p1 && iter->peers[i + 1] == p2) - || (iter->peers[i] == p2 && iter->peers[i + 1] == p1)) - { - char *s; - - s = path_2s (iter); - LOG (GNUNET_ERROR_TYPE_DEBUG, " - invalidating %s\n", s); - GNUNET_free (s); - - path_invalidate (iter); - } - } - } - GCC_check_connections (); + return (NULL != cp->core_mq) ? GNUNET_YES : GNUNET_NO; } /** - * Count the number of known paths toward the peer. + * Start message queue change notifications. * - * @param peer Peer to get path info. - * - * @return Number of known paths. + * @param cp peer to notify for + * @param cb function to call if mq becomes available or unavailable + * @param cb_cls closure for @a cb + * @return handle to cancel request */ -unsigned int -GCP_count_paths (const struct CadetPeer *peer) +struct GCP_MessageQueueManager * +GCP_request_mq (struct CadetPeer *cp, + GCP_MessageQueueNotificationCallback cb, + void *cb_cls) { - struct CadetPeerPath *iter; - unsigned int i; - - for (iter = peer->path_head, i = 0; NULL != iter; iter = iter->next) - i++; - - return i; + struct GCP_MessageQueueManager *mqm; + + mqm = GNUNET_new (struct GCP_MessageQueueManager); + mqm->cb = cb; + mqm->cb_cls = cb_cls; + mqm->cp = cp; + GNUNET_CONTAINER_DLL_insert (cp->mqm_head, + cp->mqm_tail, + mqm); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Creating MQM %p for peer %s\n", + mqm, + GCP_2s (cp)); + if (NULL != cp->core_mq) + cb (cb_cls, + GNUNET_YES); + return mqm; } /** - * Iterate over the paths to a peer. - * - * @param peer Peer to get path info. - * @param callback Function to call for every path. - * @param cls Closure for @a callback. + * Stops message queue change notifications. * - * @return Number of iterated paths. + * @param mqm handle matching request to cancel + * @param last_env final message to transmit, or NULL */ -unsigned int -GCP_iterate_paths (struct CadetPeer *peer, - GCP_path_iterator callback, - void *cls) +void +GCP_request_mq_cancel (struct GCP_MessageQueueManager *mqm, + struct GNUNET_MQ_Envelope *last_env) { - struct CadetPeerPath *iter; - unsigned int i; + struct CadetPeer *cp = mqm->cp; - for (iter = peer->path_head, i = 0; NULL != iter; iter = iter->next) - { - i++; - if (GNUNET_YES != callback (cls, peer, iter)) - break; + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Destroying MQM %p for peer %s%s\n", + mqm, + GCP_2s (cp), + (NULL == last_env) ? "" : " with last ditch transmission"); + if (NULL != mqm->env) + GNUNET_MQ_discard (mqm->env); + if (NULL != last_env) + { + if (NULL != cp->core_mq) + { + GNUNET_MQ_notify_sent (last_env, + &mqm_send_done, + cp); + GNUNET_MQ_send (cp->core_mq, + last_env); + } + else + { + GNUNET_MQ_discard (last_env); + } } - - return i; + if (cp->mqm_ready_ptr == mqm) + cp->mqm_ready_ptr = mqm->next; + GNUNET_CONTAINER_DLL_remove (cp->mqm_head, + cp->mqm_tail, + mqm); + GNUNET_free (mqm); } /** - * Iterate all known peers. + * Send the message in @a env to @a cp, overriding queueing logic. + * This function should only be used to send error messages outside + * of flow and congestion control, similar to ICMP. Note that + * the envelope may be silently discarded as well. * - * @param iter Iterator. - * @param cls Closure for @c iter. + * @param cp peer to send the message to + * @param env envelope with the message to send */ void -GCP_iterate_all (GNUNET_CONTAINER_PeerMapIterator iter, - void *cls) +GCP_send_ooo (struct CadetPeer *cp, + struct GNUNET_MQ_Envelope *env) { - GCC_check_connections (); - GNUNET_CONTAINER_multipeermap_iterate (peers, - iter, - cls); - GCC_check_connections (); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Sending message to %s out of management\n", + GCP_2s (cp)); + if (NULL == cp->core_mq) + { + GNUNET_MQ_discard (env); + return; + } + GNUNET_MQ_notify_sent (env, + &mqm_send_done, + cp); + GNUNET_MQ_send (cp->core_mq, + env); } -/** - * Get the static string for a peer ID. - * - * @param peer Peer. - * - * @return Static string for it's ID. - */ -const char * -GCP_2s (const struct CadetPeer *peer) -{ - if (NULL == peer) - return "(NULL)"; - return GNUNET_i2s (GNUNET_PEER_resolve2 (peer->id)); -} -/* end of gnunet-service-cadet_peer.c */ +/* end of gnunet-service-cadet-new_peer.c */