/*
This file is part of GNUnet.
- (C) 2013 Christian Grothoff (and other contributing authors)
+ Copyright (C) 2001-2017 GNUnet e.V.
GNUnet is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published
You should have received a copy of the GNU General Public License
along with GNUnet; see the file COPYING. If not, write to the
- Free Software Foundation, Inc., 59 Temple Place - Suite 330,
- Boston, MA 02111-1307, USA.
+ Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
+ Boston, MA 02110-1301, USA.
*/
-
+/**
+ * @file cadet/gnunet-service-cadet_peer.c
+ * @brief Information we track per peer.
+ * @author Bartlomiej Polot
+ * @author Christian Grothoff
+ *
+ * TODO:
+ * - optimize stopping/restarting DHT search to situations
+ * where we actually need it (i.e. not if we have a direct connection,
+ * or if we already have plenty of good short ones, or maybe even
+ * to take a break if we have some connections and have searched a lot (?))
+ */
#include "platform.h"
#include "gnunet_util_lib.h"
-
+#include "gnunet_hello_lib.h"
+#include "gnunet_signatures.h"
#include "gnunet_transport_service.h"
+#include "gnunet_ats_service.h"
#include "gnunet_core_service.h"
#include "gnunet_statistics_service.h"
-
#include "cadet_protocol.h"
-
-#include "gnunet-service-cadet_peer.h"
-#include "gnunet-service-cadet_dht.h"
#include "gnunet-service-cadet_connection.h"
-#include "gnunet-service-cadet_tunnel.h"
-#include "cadet_path.h"
+#include "gnunet-service-cadet_dht.h"
+#include "gnunet-service-cadet_peer.h"
+#include "gnunet-service-cadet_paths.h"
+#include "gnunet-service-cadet_tunnels.h"
+
+
+#define LOG(level, ...) GNUNET_log_from(level,"cadet-per",__VA_ARGS__)
+
+
+/**
+ * How long do we wait until tearing down an idle peer?
+ */
+#define IDLE_PEER_TIMEOUT GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MINUTES, 5)
+
+/**
+ * How long do we keep paths around if we no longer care about the peer?
+ */
+#define IDLE_PATH_TIMEOUT GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MINUTES, 2)
-#define LOG(level, ...) GNUNET_log_from (level,"cadet-p2p",__VA_ARGS__)
-#define LOG2(level, ...) GNUNET_log_from_nocheck(level,"cadet-p2p",__VA_ARGS__)
-/******************************************************************************/
-/******************************** STRUCTS **********************************/
-/******************************************************************************/
/**
- * Struct containing info about a queued transmission to this peer
+ * Data structure used to track whom we have to notify about changes
+ * to our message queue.
*/
-struct CadetPeerQueue
+struct GCP_MessageQueueManager
{
- /**
- * DLL next
- */
- struct CadetPeerQueue *next;
-
- /**
- * DLL previous
- */
- struct CadetPeerQueue *prev;
-
- /**
- * Peer this transmission is directed to.
- */
- struct CadetPeer *peer;
-
- /**
- * Connection this message belongs to.
- */
- struct CadetConnection *c;
-
- /**
- * Is FWD in c?
- */
- int fwd;
-
- /**
- * Pointer to info stucture used as cls.
- */
- void *cls;
/**
- * Type of message
+ * Kept in a DLL.
+ */
+ struct GCP_MessageQueueManager *next;
+
+ /**
+ * Kept in a DLL.
+ */
+ struct GCP_MessageQueueManager *prev;
+
+ /**
+ * Function to call with updated message queue object.
*/
- uint16_t type;
+ GCP_MessageQueueNotificationCallback cb;
/**
- * Type of message
+ * Closure for @e cb.
*/
- uint16_t payload_type;
+ void *cb_cls;
/**
- * Type of message
+ * The peer this is for.
*/
- uint32_t payload_id;
+ struct CadetPeer *cp;
/**
- * Size of the message
- */
- size_t size;
-
- /**
- * Set when this message starts waiting for CORE.
- */
- struct GNUNET_TIME_Absolute start_waiting;
-
- /**
- * Function to call on sending.
- */
- GCP_sent callback;
-
- /**
- * Closure for callback.
- */
- void *callback_cls;
+ * Envelope this manager would like to transmit once it is its turn.
+ */
+ struct GNUNET_MQ_Envelope *env;
+
};
+
/**
* Struct containing all information regarding a given peer
*/
struct CadetPeer
{
- /**
- * ID of the peer
- */
- GNUNET_PEER_Id id;
-
- /**
- * Last time we heard from this peer
- */
- struct GNUNET_TIME_Absolute last_contact;
-
- /**
- * Paths to reach the peer, ordered by ascending hop count
- */
- struct CadetPeerPath *path_head;
-
- /**
- * Paths to reach the peer, ordered by ascending hop count
- */
- struct CadetPeerPath *path_tail;
-
- /**
- * Handle to stop the DHT search for paths to this peer
- */
- struct GCD_search_handle *search_h;
-
- /**
- * Tunnel to this peer, if any.
- */
- struct CadetTunnel *tunnel;
-
- /**
- * Connections that go through this peer, indexed by tid;
- */
- struct GNUNET_CONTAINER_MultiHashMap *connections;
+ /**
+ * ID of the peer
+ */
+ struct GNUNET_PeerIdentity pid;
- /**
- * Handle for queued transmissions
- */
- struct GNUNET_CORE_TransmitHandle *core_transmit;
+ /**
+ * Last time we heard from this peer (currently not used!)
+ */
+ struct GNUNET_TIME_Absolute last_contactXXX;
/**
- * Transmission queue to core DLL head
+ * Array of DLLs of paths traversing the peer, organized by the
+ * offset of the peer on the larger path.
*/
- struct CadetPeerQueue *queue_head;
+ struct CadetPeerPathEntry **path_heads;
/**
- * Transmission queue to core DLL tail
+ * Array of DLL of paths traversing the peer, organized by the
+ * offset of the peer on the larger path.
*/
- struct CadetPeerQueue *queue_tail;
+ struct CadetPeerPathEntry **path_tails;
/**
- * How many messages are in the queue to this peer.
+ * Notifications to call when @e core_mq changes.
*/
- unsigned int queue_n;
+ struct GCP_MessageQueueManager *mqm_head;
/**
- * Hello message.
+ * Notifications to call when @e core_mq changes.
*/
- struct GNUNET_HELLO_Message* hello;
-};
+ struct GCP_MessageQueueManager *mqm_tail;
+ /**
+ * Pointer to first "ready" entry in @e mqm_head.
+ */
+ struct GCP_MessageQueueManager *mqm_ready_ptr;
-/******************************************************************************/
-/******************************* GLOBALS ***********************************/
-/******************************************************************************/
+ /**
+ * MIN-heap of paths owned by this peer (they also end at this
+ * peer). Ordered by desirability.
+ */
+ struct GNUNET_CONTAINER_Heap *path_heap;
-/**
- * Global handle to the statistics service.
- */
-extern struct GNUNET_STATISTICS_Handle *stats;
+ /**
+ * Handle to stop the DHT search for paths to this peer
+ */
+ struct GCD_search_handle *search_h;
-/**
- * Local peer own ID (full value).
- */
-extern struct GNUNET_PeerIdentity my_full_id;
+ /**
+ * Task to clean up @e path_heap asynchronously.
+ */
+ struct GNUNET_SCHEDULER_Task *heap_cleanup_task;
-/**
- * Local peer own ID (short)
- */
-extern GNUNET_PEER_Id myid;
+ /**
+ * Task to destroy this entry.
+ */
+ struct GNUNET_SCHEDULER_Task *destroy_task;
-/**
- * Peers known, indexed by PeerIdentity (CadetPeer).
- */
-static struct GNUNET_CONTAINER_MultiPeerMap *peers;
+ /**
+ * Tunnel to this peer, if any.
+ */
+ struct CadetTunnel *t;
-/**
- * How many peers do we want to remember?
- */
-static unsigned long long max_peers;
+ /**
+ * Connections that go through this peer; indexed by tid.
+ */
+ struct GNUNET_CONTAINER_MultiShortmap *connections;
-/**
- * Percentage of messages that will be dropped (for test purposes only).
- */
-static unsigned long long drop_percent;
+ /**
+ * Handle for core transmissions.
+ */
+ struct GNUNET_MQ_Handle *core_mq;
-/**
- * Handle to communicate with core.
- */
-static struct GNUNET_CORE_Handle *core_handle;
+ /**
+ * Hello message of the peer.
+ */
+ struct GNUNET_HELLO_Message *hello;
-/**
- * Handle to try to start new connections.
- */
-static struct GNUNET_TRANSPORT_Handle *transport_handle;
+ /**
+ * Handle to us offering the HELLO to the transport.
+ */
+ struct GNUNET_TRANSPORT_OfferHelloHandle *hello_offer;
+ /**
+ * Handle to our ATS request asking ATS to suggest an address
+ * to TRANSPORT for this peer (to establish a direct link).
+ */
+ struct GNUNET_ATS_ConnectivitySuggestHandle *connectivity_suggestion;
-/******************************************************************************/
-/***************************** DEBUG *********************************/
-/******************************************************************************/
+ /**
+ * How many messages are in the queue to this peer.
+ */
+ unsigned int queue_n;
-/**
- * Log all kinds of info about the queueing status of a peer.
- *
- * @param p Peer whose queue to show.
- * @param level Error level to use for logging.
- */
-static void
-queue_debug (const struct CadetPeer *p, enum GNUNET_ErrorType level)
-{
- struct CadetPeerQueue *q;
- int do_log;
+ /**
+ * How many paths do we have to this peer (in all @e path_heads DLLs combined).
+ */
+ unsigned int num_paths;
- do_log = GNUNET_get_log_call_status (level & (~GNUNET_ERROR_TYPE_BULK),
- "cadet-p2p",
- __FILE__, __FUNCTION__, __LINE__);
- if (0 == do_log)
- return;
+ /**
+ * Sum over all of the offsets of all of the paths in the @a path_heads DLLs.
+ * Used to speed-up @GCP_get_desirability_of_path() calculation.
+ */
+ unsigned int off_sum;
- LOG2 (level, "QQQ Message queue towards %s\n", GCP_2s (p));
- LOG2 (level, "QQQ queue length: %u\n", p->queue_n);
- LOG2 (level, "QQQ core tmt rdy: %p\n", p->core_transmit);
+ /**
+ * Number of message queue managers of this peer that have a message in waiting.
+ *
+ * Used to quickly see if we need to bother scanning the @e msm_head DLL.
+ * TODO: could be replaced by another DLL that would then allow us to avoid
+ * the O(n)-scan of the DLL for ready entries!
+ */
+ unsigned int mqm_ready_counter;
- for (q = p->queue_head; NULL != q; q = q->next)
- {
- LOG2 (level, "QQQ - %s %s on %s\n",
- GC_m2s (q->type), GC_f2s (q->fwd), GCC_2s (q->c));
- LOG2 (level, "QQQ payload %s, %u\n",
- GC_m2s (q->payload_type), q->payload_id);
- LOG2 (level, "QQQ size: %u bytes\n", q->size);
- }
+ /**
+ * Current length of the @e path_heads and @path_tails arrays.
+ * The arrays should be grown as needed.
+ */
+ unsigned int path_dll_length;
- LOG2 (level, "QQQ End queue towards %s\n", GCP_2s (p));
-}
+};
/**
- * Log all kinds of info about a peer.
+ * Get the static string for a peer ID.
*
- * @param peer Peer.
+ * @param cp Peer.
+ * @return Static string for it's ID.
*/
-void
-GCP_debug (const struct CadetPeer *p, enum GNUNET_ErrorType level)
+const char *
+GCP_2s (const struct CadetPeer *cp)
{
- struct CadetPeerPath *path;
- unsigned int conns;
- int do_log;
-
- do_log = GNUNET_get_log_call_status (level & (~GNUNET_ERROR_TYPE_BULK),
- "cadet-p2p",
- __FILE__, __FUNCTION__, __LINE__);
- if (0 == do_log)
- return;
-
- if (NULL == p)
- {
- LOG2 (level, "PPP DEBUG PEER NULL\n");
- return;
- }
-
- LOG2 (level, "PPP DEBUG PEER %s\n", GCP_2s (p));
- LOG2 (level, "PPP last contact %s\n",
- GNUNET_STRINGS_absolute_time_to_string (p->last_contact));
- for (path = p->path_head; NULL != path; path = path->next)
- {
- char *s;
-
- s = path_2s (path);
- LOG2 (level, "PPP path: %s\n", s);
- GNUNET_free (s);
- }
+ static char buf[32];
- LOG2 (level, "PPP core transmit handle %p\n", p->core_transmit);
- LOG2 (level, "PPP DHT GET handle %p\n", p->search_h);
- if (NULL != p->connections)
- conns = GNUNET_CONTAINER_multihashmap_size (p->connections);
- else
- conns = 0;
- LOG2 (level, "PPP # connections over link to peer: %u\n", conns);
- queue_debug (p, level);
- LOG2 (level, "PPP DEBUG END\n");
+ GNUNET_snprintf (buf,
+ sizeof (buf),
+ "P(%s)",
+ GNUNET_i2s (&cp->pid));
+ return buf;
}
-/******************************************************************************/
-/***************************** CORE HELPERS *********************************/
-/******************************************************************************/
-
-
/**
- * Iterator to notify all connections of a broken link. Mark connections
- * to destroy after all traffic has been sent.
+ * Calculate how desirable a path is for @a cp if @a cp
+ * is at offset @a off.
*
- * @param cls Closure (peer disconnected).
- * @param key Current key code (peer id).
- * @param value Value in the hash map (connection).
+ * The 'desirability_table.c' program can be used to compute a list of
+ * sample outputs for different scenarios. Basically, we score paths
+ * lower if there are many alternatives, and higher if they are
+ * shorter than average, and very high if they are much shorter than
+ * average and without many alternatives.
*
- * @return #GNUNET_YES to continue to iterate.
+ * @param cp a peer reachable via a path
+ * @param off offset of @a cp in the path
+ * @return score how useful a path is to reach @a cp,
+ * positive scores mean path is more desirable
*/
-static int
-notify_broken (void *cls,
- const struct GNUNET_HashCode *key,
- void *value)
+double
+GCP_get_desirability_of_path (struct CadetPeer *cp,
+ unsigned int off)
{
- struct CadetPeer *peer = cls;
- struct CadetConnection *c = value;
+ unsigned int num_alts = cp->num_paths;
+ unsigned int off_sum;
+ double avg_sum;
+ double path_delta;
+ double weight_alts;
+
+ GNUNET_assert (num_alts >= 1); /* 'path' should be in there! */
+ GNUNET_assert (0 != cp->path_dll_length);
+
+ /* We maintain 'off_sum' in 'peer' and thereby
+ avoid the SLOW recalculation each time. Kept here
+ just to document what is going on. */
+#if SLOW
+ off_sum = 0;
+ for (unsigned int j=0;j<cp->path_dll_length;j++)
+ for (struct CadetPeerPathEntry *pe = cp->path_heads[j];
+ NULL != pe;
+ pe = pe->next)
+ off_sum += j;
+ GNUNET_assert (off_sum == cp->off_sum);
+#else
+ off_sum = cp->off_sum;
+#endif
+ avg_sum = off_sum * 1.0 / cp->path_dll_length;
+ path_delta = off - avg_sum;
+ /* path_delta positiv: path off of peer above average (bad path for peer),
+ path_delta negativ: path off of peer below average (good path for peer) */
+ if (path_delta <= - 1.0)
+ weight_alts = - num_alts / path_delta; /* discount alternative paths */
+ else if (path_delta >= 1.0)
+ weight_alts = num_alts * path_delta; /* overcount alternative paths */
+ else
+ weight_alts = num_alts; /* count alternative paths normally */
- LOG (GNUNET_ERROR_TYPE_DEBUG, " notifying %s due to %s\n",
- GCC_2s (c), GCP_2s (peer));
- GCC_notify_broken (c, peer);
- return GNUNET_YES;
+ /* off+1: long paths are generally harder to find and thus count
+ a bit more as they get longer. However, above-average paths
+ still need to count less, hence the squaring of that factor. */
+ return (off + 1.0) / (weight_alts * weight_alts);
}
/**
- * Remove the direct path to the peer.
- *
- * @param peer Peer to remove the direct path from.
+ * This peer is no longer be needed, clean it up now.
*
+ * @param cls peer to clean up
*/
-static struct CadetPeerPath *
-pop_direct_path (struct CadetPeer *peer)
+static void
+destroy_peer (void *cls)
{
- struct CadetPeerPath *iter;
+ struct CadetPeer *cp = cls;
- for (iter = peer->path_head; NULL != iter; iter = iter->next)
- {
- if (2 <= iter->length)
- {
- GNUNET_CONTAINER_DLL_remove (peer->path_head, peer->path_tail, iter);
- return iter;
- }
- }
- return NULL;
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Destroying state about peer %s\n",
+ GCP_2s (cp));
+ cp->destroy_task = NULL;
+ GNUNET_assert (NULL == cp->t);
+ GNUNET_assert (NULL == cp->core_mq);
+ GNUNET_assert (0 == cp->num_paths);
+ for (unsigned int i=0;i<cp->path_dll_length;i++)
+ GNUNET_assert (NULL == cp->path_heads[i]);
+ GNUNET_assert (0 == GNUNET_CONTAINER_multishortmap_size (cp->connections));
+ GNUNET_assert (GNUNET_YES ==
+ GNUNET_CONTAINER_multipeermap_remove (peers,
+ &cp->pid,
+ cp));
+ GNUNET_free_non_null (cp->path_heads);
+ GNUNET_free_non_null (cp->path_tails);
+ cp->path_dll_length = 0;
+ if (NULL != cp->search_h)
+ {
+ GCD_search_stop (cp->search_h);
+ cp->search_h = NULL;
+ }
+ /* FIXME: clean up search_delayedXXX! */
+
+ if (NULL != cp->hello_offer)
+ {
+ GNUNET_TRANSPORT_offer_hello_cancel (cp->hello_offer);
+ cp->hello_offer = NULL;
+ }
+ if (NULL != cp->connectivity_suggestion)
+ {
+ GNUNET_ATS_connectivity_suggest_cancel (cp->connectivity_suggestion);
+ cp->connectivity_suggestion = NULL;
+ }
+ GNUNET_CONTAINER_multishortmap_destroy (cp->connections);
+ if (NULL != cp->path_heap)
+ {
+ GNUNET_CONTAINER_heap_destroy (cp->path_heap);
+ cp->path_heap = NULL;
+ }
+ if (NULL != cp->heap_cleanup_task)
+ {
+ GNUNET_SCHEDULER_cancel (cp->heap_cleanup_task);
+ cp->heap_cleanup_task = NULL;
+ }
+ GNUNET_free_non_null (cp->hello);
+ /* Peer should not be freed if paths exist; if there are no paths,
+ there ought to be no connections, and without connections, no
+ notifications. Thus we can assert that mqm_head is empty at this
+ point. */
+ GNUNET_assert (NULL == cp->mqm_head);
+ GNUNET_assert (NULL == cp->mqm_ready_ptr);
+ GNUNET_free (cp);
}
-/******************************************************************************/
-/***************************** CORE CALLBACKS *********************************/
-/******************************************************************************/
-
/**
- * Method called whenever a given peer connects.
+ * This peer is now on more "active" duty, activate processes related to it.
*
- * @param cls closure
- * @param peer peer identity this notification is about
+ * @param cp the more-active peer
*/
static void
-core_connect (void *cls, const struct GNUNET_PeerIdentity *peer)
+consider_peer_activate (struct CadetPeer *cp)
{
- struct CadetPeer *mp;
- struct CadetPeerPath *path;
- char own_id[16];
+ uint32_t strength;
- strncpy (own_id, GNUNET_i2s (&my_full_id), 15);
- mp = GCP_get (peer);
- if (myid == mp->id)
- {
- LOG (GNUNET_ERROR_TYPE_INFO, "CONNECTED %s (self)\n", own_id);
- path = path_new (1);
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Updating peer %s activation state (%u connections)%s%s\n",
+ GCP_2s (cp),
+ GNUNET_CONTAINER_multishortmap_size (cp->connections),
+ (NULL == cp->t) ? "" : " with tunnel",
+ (NULL == cp->core_mq) ? "" : " with CORE link");
+ if (NULL != cp->destroy_task)
+ {
+ /* It's active, do not destory! */
+ GNUNET_SCHEDULER_cancel (cp->destroy_task);
+ cp->destroy_task = NULL;
+ }
+ if ( (0 == GNUNET_CONTAINER_multishortmap_size (cp->connections)) &&
+ (NULL == cp->t) )
+ {
+ /* We're just on a path or directly connected; don't bother too much */
+ if (NULL != cp->connectivity_suggestion)
+ {
+ GNUNET_ATS_connectivity_suggest_cancel (cp->connectivity_suggestion);
+ cp->connectivity_suggestion = NULL;
+ }
+ if (NULL != cp->search_h)
+ {
+ GCD_search_stop (cp->search_h);
+ cp->search_h = NULL;
+ }
+ return;
}
- else
+ if (NULL == cp->core_mq)
{
- LOG (GNUNET_ERROR_TYPE_INFO, "CONNECTED %s <= %s\n",
- own_id, GNUNET_i2s (peer));
- path = path_new (2);
- path->peers[1] = mp->id;
- GNUNET_PEER_change_rc (mp->id, 1);
- GNUNET_STATISTICS_update (stats, "# peers", 1, GNUNET_NO);
+ /* Lacks direct connection, try to create one by querying the DHT */
+ if ( (NULL == cp->search_h) &&
+ (DESIRED_CONNECTIONS_PER_TUNNEL > cp->num_paths) )
+ cp->search_h
+ = GCD_search (&cp->pid);
}
- path->peers[0] = myid;
- GNUNET_PEER_change_rc (myid, 1);
- GCP_add_path (mp, path, GNUNET_YES);
-
- mp->connections = GNUNET_CONTAINER_multihashmap_create (32, GNUNET_YES);
-
- if (NULL != GCP_get_tunnel (mp) &&
- 0 > GNUNET_CRYPTO_cmp_peer_identity (&my_full_id, peer))
+ else
{
- GCP_connect (mp);
+ /* Have direct connection, stop DHT search if active */
+ if (NULL != cp->search_h)
+ {
+ GCD_search_stop (cp->search_h);
+ cp->search_h = NULL;
+ }
}
- return;
+ /* If we have a tunnel, our urge for connections is much bigger */
+ strength = (NULL != cp->t) ? 32 : 1;
+ if (NULL != cp->connectivity_suggestion)
+ GNUNET_ATS_connectivity_suggest_cancel (cp->connectivity_suggestion);
+ cp->connectivity_suggestion
+ = GNUNET_ATS_connectivity_suggest (ats_ch,
+ &cp->pid,
+ strength);
}
/**
- * Method called whenever a peer disconnects.
+ * This peer may no longer be needed, consider cleaning it up.
*
- * @param cls closure
- * @param peer peer identity this notification is about
+ * @param cp peer to clean up
*/
static void
-core_disconnect (void *cls, const struct GNUNET_PeerIdentity *peer)
-{
- struct CadetPeer *p;
- struct CadetPeerPath *direct_path;
- char own_id[16];
-
- strncpy (own_id, GNUNET_i2s (&my_full_id), 15);
- p = GNUNET_CONTAINER_multipeermap_get (peers, peer);
- if (NULL == p)
- {
- GNUNET_break (0);
- return;
- }
- if (myid == p->id)
- LOG (GNUNET_ERROR_TYPE_INFO, "DISCONNECTED %s (self)\n", own_id);
- else
- LOG (GNUNET_ERROR_TYPE_INFO, "DISCONNECTED %s <= %s\n",
- own_id, GNUNET_i2s (peer));
- direct_path = pop_direct_path (p);
- GNUNET_CONTAINER_multihashmap_iterate (p->connections, ¬ify_broken, p);
- GNUNET_CONTAINER_multihashmap_destroy (p->connections);
- p->connections = NULL;
- if (NULL != p->core_transmit)
- {
- GNUNET_CORE_notify_transmit_ready_cancel (p->core_transmit);
- p->core_transmit = NULL;
- }
- GNUNET_STATISTICS_update (stats, "# peers", -1, GNUNET_NO);
-
- path_destroy (direct_path);
- return;
-}
-
-
-/**
- * Functions to handle messages from core
- */
-static struct GNUNET_CORE_MessageHandler core_handlers[] = {
- {&GCC_handle_create, GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE, 0},
- {&GCC_handle_confirm, GNUNET_MESSAGE_TYPE_CADET_CONNECTION_ACK,
- sizeof (struct GNUNET_CADET_ConnectionACK)},
- {&GCC_handle_broken, GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN,
- sizeof (struct GNUNET_CADET_ConnectionBroken)},
- {&GCC_handle_destroy, GNUNET_MESSAGE_TYPE_CADET_CONNECTION_DESTROY,
- sizeof (struct GNUNET_CADET_ConnectionDestroy)},
- {&GCC_handle_ack, GNUNET_MESSAGE_TYPE_CADET_ACK,
- sizeof (struct GNUNET_CADET_ACK)},
- {&GCC_handle_poll, GNUNET_MESSAGE_TYPE_CADET_POLL,
- sizeof (struct GNUNET_CADET_Poll)},
- {&GCC_handle_encrypted, GNUNET_MESSAGE_TYPE_CADET_ENCRYPTED, 0},
- {&GCC_handle_kx, GNUNET_MESSAGE_TYPE_CADET_KX, 0},
- {NULL, 0, 0}
-};
+consider_peer_destroy (struct CadetPeer *cp);
/**
- * To be called on core init/fail.
+ * We really no longere care about a peer, stop hogging memory with paths to it.
+ * Afterwards, see if there is more to be cleaned up about this peer.
*
- * @param cls Closure (config)
- * @param identity the public identity of this peer
+ * @param cls a `struct CadetPeer`.
*/
static void
-core_init (void *cls,
- const struct GNUNET_PeerIdentity *identity)
-{
- const struct GNUNET_CONFIGURATION_Handle *c = cls;
- static int i = 0;
-
- LOG (GNUNET_ERROR_TYPE_DEBUG, "Core init\n");
- if (0 != memcmp (identity, &my_full_id, sizeof (my_full_id)))
- {
- LOG (GNUNET_ERROR_TYPE_ERROR, _("Wrong CORE service\n"));
- LOG (GNUNET_ERROR_TYPE_ERROR, " core id %s\n", GNUNET_i2s (identity));
- LOG (GNUNET_ERROR_TYPE_ERROR, " my id %s\n", GNUNET_i2s (&my_full_id));
- GNUNET_CORE_disconnect (core_handle);
- core_handle = GNUNET_CORE_connect (c, /* Main configuration */
- NULL, /* Closure passed to CADET functions */
- &core_init, /* Call core_init once connected */
- &core_connect, /* Handle connects */
- &core_disconnect, /* remove peers on disconnects */
- NULL, /* Don't notify about all incoming messages */
- GNUNET_NO, /* For header only in notification */
- NULL, /* Don't notify about all outbound messages */
- GNUNET_NO, /* For header-only out notification */
- core_handlers); /* Register these handlers */
- if (10 < i++)
- GNUNET_abort();
- }
- GML_start ();
- return;
-}
-
-
-/**
- * Core callback to write a pre-constructed data packet to core buffer
- *
- * @param cls Closure (CadetTransmissionDescriptor with data in "data" member).
- * @param size Number of bytes available in buf.
- * @param buf Where the to write the message.
- *
- * @return number of bytes written to buf
- */
-static size_t
-send_core_data_raw (void *cls, size_t size, void *buf)
+drop_paths (void *cls)
{
- struct GNUNET_MessageHeader *msg = cls;
- size_t total_size;
-
- GNUNET_assert (NULL != msg);
- total_size = ntohs (msg->size);
+ struct CadetPeer *cp = cls;
+ struct CadetPeerPath *path;
- if (total_size > size)
- {
- GNUNET_break (0);
- return 0;
- }
- memcpy (buf, msg, total_size);
- GNUNET_free (cls);
- return total_size;
+ cp->destroy_task = NULL;
+ while (NULL != (path = GNUNET_CONTAINER_heap_remove_root (cp->path_heap)))
+ GCPP_release (path);
+ consider_peer_destroy (cp);
}
/**
- * Function to send a create connection message to a peer.
+ * This peer may no longer be needed, consider cleaning it up.
*
- * @param c Connection to create.
- * @param size number of bytes available in buf
- * @param buf where the callee should write the message
- * @return number of bytes written to buf
+ * @param cp peer to clean up
*/
-static size_t
-send_core_connection_create (struct CadetConnection *c, size_t size, void *buf)
+static void
+consider_peer_destroy (struct CadetPeer *cp)
{
- struct GNUNET_CADET_ConnectionCreate *msg;
- struct GNUNET_PeerIdentity *peer_ptr;
- const struct CadetPeerPath *p = GCC_get_path (c);
- size_t size_needed;
- int i;
-
- if (NULL == p)
- return 0;
-
- LOG (GNUNET_ERROR_TYPE_DEBUG, "Sending CONNECTION CREATE...\n");
- size_needed =
- sizeof (struct GNUNET_CADET_ConnectionCreate) +
- p->length * sizeof (struct GNUNET_PeerIdentity);
+ struct GNUNET_TIME_Relative exp;
- if (size < size_needed || NULL == buf)
+ if (NULL != cp->destroy_task)
{
- GNUNET_break (0);
- return 0;
+ GNUNET_SCHEDULER_cancel (cp->destroy_task);
+ cp->destroy_task = NULL;
}
- msg = (struct GNUNET_CADET_ConnectionCreate *) buf;
- msg->header.size = htons (size_needed);
- msg->header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE);
- msg->cid = *GCC_get_id (c);
-
- peer_ptr = (struct GNUNET_PeerIdentity *) &msg[1];
- for (i = 0; i < p->length; i++)
+ if (NULL != cp->t)
+ return; /* still relevant! */
+ if (NULL != cp->core_mq)
+ return; /* still relevant! */
+ if (0 != GNUNET_CONTAINER_multishortmap_size (cp->connections))
+ return; /* still relevant! */
+ if ( (NULL != cp->path_heap) &&
+ (0 < GNUNET_CONTAINER_heap_get_size (cp->path_heap)) )
{
- GNUNET_PEER_resolve (p->peers[i], peer_ptr++);
+ cp->destroy_task = GNUNET_SCHEDULER_add_delayed (IDLE_PATH_TIMEOUT,
+ &drop_paths,
+ cp);
+ return;
}
-
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "CONNECTION CREATE (%u bytes long) sent!\n",
- size_needed);
- return size_needed;
-}
-
-
-/**
- * Creates a path ack message in buf and frees all unused resources.
- *
- * @param c Connection to send an ACK on.
- * @param size number of bytes available in buf
- * @param buf where the callee should write the message
- *
- * @return number of bytes written to buf
- */
-static size_t
-send_core_connection_ack (struct CadetConnection *c, size_t size, void *buf)
-{
- struct GNUNET_CADET_ConnectionACK *msg = buf;
-
- LOG (GNUNET_ERROR_TYPE_DEBUG, "Sending CONNECTION ACK...\n");
- if (sizeof (struct GNUNET_CADET_ConnectionACK) > size)
+ if (0 != cp->num_paths)
+ return; /* still relevant! */
+ if (NULL != cp->hello)
{
- GNUNET_break (0);
- return 0;
+ /* relevant only until HELLO expires */
+ exp = GNUNET_TIME_absolute_get_remaining (GNUNET_HELLO_get_last_expiration (cp->hello));
+ cp->destroy_task = GNUNET_SCHEDULER_add_delayed (exp,
+ &destroy_peer,
+ cp);
+ return;
}
- msg->header.size = htons (sizeof (struct GNUNET_CADET_ConnectionACK));
- msg->header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CONNECTION_ACK);
- msg->cid = *GCC_get_id (c);
-
- LOG (GNUNET_ERROR_TYPE_DEBUG, "CONNECTION ACK sent!\n");
- return sizeof (struct GNUNET_CADET_ConnectionACK);
+ cp->destroy_task = GNUNET_SCHEDULER_add_delayed (IDLE_PEER_TIMEOUT,
+ &destroy_peer,
+ cp);
}
-/******************************************************************************/
-/******************************** STATIC ***********************************/
-/******************************************************************************/
-
-
/**
- * Get priority for a queued message.
- *
- * @param q Queued message
+ * Set the message queue to @a mq for peer @a cp and notify watchers.
*
- * @return CORE priority to use.
+ * @param cp peer to modify
+ * @param mq message queue to set (can be NULL)
*/
-static enum GNUNET_CORE_Priority
-get_priority (struct CadetPeerQueue *q)
+void
+GCP_set_mq (struct CadetPeer *cp,
+ struct GNUNET_MQ_Handle *mq)
{
- enum GNUNET_CORE_Priority low;
- enum GNUNET_CORE_Priority high;
-
- if (NULL == q)
- {
- GNUNET_break (0);
- return GNUNET_CORE_PRIO_BACKGROUND;
- }
-
- /* Relayed traffic has lower priority, our own traffic has higher */
- if (NULL == q->c || GNUNET_NO == GCC_is_origin (q->c, q->fwd))
- {
- low = GNUNET_CORE_PRIO_BEST_EFFORT;
- high = GNUNET_CORE_PRIO_URGENT;
- }
- else
- {
- low = GNUNET_CORE_PRIO_URGENT;
- high = GNUNET_CORE_PRIO_CRITICAL_CONTROL;
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Message queue for peer %s is now %p\n",
+ GCP_2s (cp),
+ mq);
+ cp->core_mq = mq;
+ for (struct GCP_MessageQueueManager *mqm = cp->mqm_head, *next;
+ NULL != mqm;
+ mqm = next)
+ {
+ /* Save next pointer in case mqm gets freed by the callback */
+ next = mqm->next;
+ if (NULL == mq)
+ {
+ if (NULL != mqm->env)
+ {
+ GNUNET_MQ_discard (mqm->env);
+ mqm->env = NULL;
+ mqm->cb (mqm->cb_cls,
+ GNUNET_SYSERR);
+ }
+ else
+ {
+ mqm->cb (mqm->cb_cls,
+ GNUNET_NO);
+ }
+ }
+ else
+ {
+ GNUNET_assert (NULL == mqm->env);
+ mqm->cb (mqm->cb_cls,
+ GNUNET_YES);
+ }
}
-
- /* Bulky payload has lower priority, control traffic has higher. */
- if (GNUNET_MESSAGE_TYPE_CADET_ENCRYPTED == q->type)
- return low;
+ if ( (NULL != mq) ||
+ (NULL != cp->t) )
+ consider_peer_activate (cp);
else
- return high;
-}
-
-
-/**
- * Iterator over tunnel hash map entries to destroy the tunnel during shutdown.
- *
- * @param cls closure
- * @param key current key code
- * @param value value in the hash map
- * @return #GNUNET_YES if we should continue to iterate,
- * #GNUNET_NO if not.
- */
-static int
-shutdown_tunnel (void *cls,
- const struct GNUNET_PeerIdentity *key,
- void *value)
-{
- struct CadetPeer *p = value;
- struct CadetTunnel *t = p->tunnel;
-
- if (NULL != t)
- GCT_destroy (t);
- return GNUNET_YES;
-}
-
-
-/**
- * Destroy the peer_info and free any allocated resources linked to it
- *
- * @param peer The peer_info to destroy.
- *
- * @return GNUNET_OK on success
- */
-static int
-peer_destroy (struct CadetPeer *peer)
-{
- struct GNUNET_PeerIdentity id;
- struct CadetPeerPath *p;
- struct CadetPeerPath *nextp;
-
- GNUNET_PEER_resolve (peer->id, &id);
- GNUNET_PEER_change_rc (peer->id, -1);
+ consider_peer_destroy (cp);
- LOG (GNUNET_ERROR_TYPE_WARNING, "destroying peer %s\n", GNUNET_i2s (&id));
-
- if (GNUNET_YES !=
- GNUNET_CONTAINER_multipeermap_remove (peers, &id, peer))
- {
- GNUNET_break (0);
- LOG (GNUNET_ERROR_TYPE_WARNING, " not in peermap!!\n");
- }
- if (NULL != peer->search_h)
- {
- GCD_search_stop (peer->search_h);
- }
- p = peer->path_head;
- while (NULL != p)
+ if ( (NULL != mq) &&
+ (NULL != cp->t) )
{
- nextp = p->next;
- GNUNET_CONTAINER_DLL_remove (peer->path_head, peer->path_tail, p);
- path_destroy (p);
- p = nextp;
+ /* have a new, direct path to the target, notify tunnel */
+ struct CadetPeerPath *path;
+
+ path = GCPP_get_path_from_route (1,
+ &cp->pid);
+ GCT_consider_path (cp->t,
+ path,
+ 0);
}
- GCT_destroy_empty (peer->tunnel);
- GNUNET_free (peer);
- return GNUNET_OK;
}
/**
- * Returns if peer is used (has a tunnel or is neighbor).
- *
- * @param peer Peer to check.
+ * Debug function should NEVER return true in production code, useful to
+ * simulate losses for testcases.
*
- * @return #GNUNET_YES if peer is in use.
+ * @return #GNUNET_YES or #GNUNET_NO with the decision to drop.
*/
static int
-peer_is_used (struct CadetPeer *peer)
+should_I_drop (void)
{
- struct CadetPeerPath *p;
-
- if (NULL != peer->tunnel)
- return GNUNET_YES;
-
- for (p = peer->path_head; NULL != p; p = p->next)
- {
- if (p->length < 3)
- return GNUNET_YES;
- }
+ if (0 == drop_percent)
return GNUNET_NO;
+ if (GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
+ 101) < drop_percent)
+ return GNUNET_YES;
+ return GNUNET_NO;
}
/**
- * Iterator over all the peers to get the oldest timestamp.
+ * Function called when CORE took one of the messages from
+ * a message queue manager and transmitted it.
*
- * @param cls Closure (unsued).
- * @param key ID of the peer.
- * @param value Peer_Info of the peer.
+ * @param cls the `struct CadetPeeer` where we made progress
*/
-static int
-peer_get_oldest (void *cls,
- const struct GNUNET_PeerIdentity *key,
- void *value)
-{
- struct CadetPeer *p = value;
- struct GNUNET_TIME_Absolute *abs = cls;
-
- /* Don't count active peers */
- if (GNUNET_YES == peer_is_used (p))
- return GNUNET_YES;
-
- if (abs->abs_value_us < p->last_contact.abs_value_us)
- abs->abs_value_us = p->last_contact.abs_value_us;
-
- return GNUNET_YES;
-}
-
-
-/**
- * Iterator over all the peers to remove the oldest entry.
- *
- * @param cls Closure (unsued).
- * @param key ID of the peer.
- * @param value Peer_Info of the peer.
- */
-static int
-peer_timeout (void *cls,
- const struct GNUNET_PeerIdentity *key,
- void *value)
-{
- struct CadetPeer *p = value;
- struct GNUNET_TIME_Absolute *abs = cls;
-
- LOG (GNUNET_ERROR_TYPE_WARNING,
- "peer %s timeout\n", GNUNET_i2s (key));
-
- if (p->last_contact.abs_value_us == abs->abs_value_us &&
- GNUNET_NO == peer_is_used (p))
- {
- peer_destroy (p);
- return GNUNET_NO;
- }
- return GNUNET_YES;
-}
-
-
-/**
- * Delete oldest unused peer.
- */
-static void
-peer_delete_oldest (void)
-{
- struct GNUNET_TIME_Absolute abs;
-
- abs = GNUNET_TIME_UNIT_FOREVER_ABS;
-
- GNUNET_CONTAINER_multipeermap_iterate (peers,
- &peer_get_oldest,
- &abs);
- GNUNET_CONTAINER_multipeermap_iterate (peers,
- &peer_timeout,
- &abs);
-}
-
-
-/**
- * Choose the best (yet unused) path towards a peer,
- * considering the tunnel properties.
- *
- * @param peer The destination peer.
- *
- * @return Best current known path towards the peer, if any.
- */
-static struct CadetPeerPath *
-peer_get_best_path (const struct CadetPeer *peer)
-{
- struct CadetPeerPath *best_p;
- struct CadetPeerPath *p;
- unsigned int best_cost;
- unsigned int cost;
-
- best_cost = UINT_MAX;
- best_p = NULL;
- for (p = peer->path_head; NULL != p; p = p->next)
- {
- if (GNUNET_NO == path_is_valid (p))
- continue; /* Don't use invalid paths. */
- if (GNUNET_YES == GCT_is_path_used (peer->tunnel, p))
- continue; /* If path is already in use, skip it. */
-
- if ((cost = GCT_get_path_cost (peer->tunnel, p)) < best_cost)
- {
- best_cost = cost;
- best_p = p;
- }
- }
- return best_p;
-}
-
-
-/**
- * Is this queue element sendable?
- *
- * - All management traffic is always sendable.
- * - For payload traffic, check the connection flow control.
- *
- * @param q Queue element to inspect.
- *
- * @return #GNUNET_YES if it is sendable, #GNUNET_NO otherwise.
- */
-static int
-queue_is_sendable (struct CadetPeerQueue *q)
-{
- /* Is PID-independent? */
- switch (q->type)
- {
- case GNUNET_MESSAGE_TYPE_CADET_ACK:
- case GNUNET_MESSAGE_TYPE_CADET_POLL:
- case GNUNET_MESSAGE_TYPE_CADET_KX:
- case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE:
- case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_ACK:
- case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_DESTROY:
- case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN:
- return GNUNET_YES;
-
- case GNUNET_MESSAGE_TYPE_CADET_ENCRYPTED:
- break;
-
- default:
- GNUNET_break (0);
- }
-
- return GCC_is_sendable (q->c, q->fwd);
-}
-
-
-/**
- * Get first sendable message.
- *
- * @param peer The destination peer.
- *
- * @return First transmittable message, if any. Otherwise, NULL.
- */
-static struct CadetPeerQueue *
-peer_get_first_message (const struct CadetPeer *peer)
-{
- struct CadetPeerQueue *q;
-
- for (q = peer->queue_head; NULL != q; q = q->next)
- {
- LOG (GNUNET_ERROR_TYPE_DEBUG, "Checking %p towards %s\n", q, GCC_2s (q->c));
- if (queue_is_sendable (q))
- return q;
- }
-
- return NULL;
-}
-
-
-/**
- * Function to process paths received for a new peer addition. The recorded
- * paths form the initial tunnel, which can be optimized later.
- * Called on each result obtained for the DHT search.
- *
- * @param cls closure
- * @param path
- */
-static void
-search_handler (void *cls, const struct CadetPeerPath *path)
-{
- struct CadetPeer *peer = cls;
- unsigned int connection_count;
-
- GCP_add_path_to_all (path, GNUNET_NO);
-
- /* Count connections */
- connection_count = GCT_count_connections (peer->tunnel);
-
- /* If we already have 3 (or more (?!)) connections, it's enough */
- if (3 <= connection_count)
- return;
-
- if (CADET_TUNNEL_SEARCHING == GCT_get_cstate (peer->tunnel))
- {
- LOG (GNUNET_ERROR_TYPE_DEBUG, " ... connect!\n");
- GCP_connect (peer);
- }
- return;
-}
-
-
-
-/**
- * Core callback to write a queued packet to core buffer
- *
- * @param cls Closure (peer info).
- * @param size Number of bytes available in buf.
- * @param buf Where the to write the message.
- *
- * @return number of bytes written to buf
- */
-static size_t
-queue_send (void *cls, size_t size, void *buf)
-{
- struct CadetPeer *peer = cls;
- struct CadetConnection *c;
- struct CadetPeerQueue *queue;
- const struct GNUNET_PeerIdentity *dst_id;
- size_t data_size;
- uint32_t pid;
-
- pid = 0;
- peer->core_transmit = NULL;
- LOG (GNUNET_ERROR_TYPE_DEBUG, "Queue send towards %s (max %u)\n",
- GCP_2s (peer), size);
-
- if (NULL == buf || 0 == size)
- {
- LOG (GNUNET_ERROR_TYPE_DEBUG, "Buffer size 0.\n");
- return 0;
- }
-
- /* Initialize */
- queue = peer_get_first_message (peer);
- if (NULL == queue)
- {
- GNUNET_assert (0); /* Core tmt_rdy should've been canceled */
- return 0;
- }
- c = queue->c;
-
- dst_id = GNUNET_PEER_resolve2 (peer->id);
- LOG (GNUNET_ERROR_TYPE_DEBUG, " on connection %s %s\n",
- GCC_2s (c), GC_f2s(queue->fwd));
- /* Check if buffer size is enough for the message */
- if (queue->size > size)
- {
- LOG (GNUNET_ERROR_TYPE_WARNING, "not enough room (%u vs %u), reissue\n",
- queue->size, size);
- peer->core_transmit =
- GNUNET_CORE_notify_transmit_ready (core_handle,
- GNUNET_NO, get_priority (queue),
- GNUNET_TIME_UNIT_FOREVER_REL,
- dst_id,
- queue->size,
- &queue_send,
- peer);
- return 0;
- }
- LOG (GNUNET_ERROR_TYPE_DEBUG, " size %u ok\n", queue->size);
-
- /* Fill buf */
- switch (queue->type)
- {
- case GNUNET_MESSAGE_TYPE_CADET_ENCRYPTED:
- pid = GCC_get_pid (queue->c, queue->fwd);
- LOG (GNUNET_ERROR_TYPE_DEBUG, " payload ID %u\n", pid);
- data_size = send_core_data_raw (queue->cls, size, buf);
- ((struct GNUNET_CADET_Encrypted *) buf)->pid = htonl (pid);
- break;
- case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_DESTROY:
- case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN:
- case GNUNET_MESSAGE_TYPE_CADET_KX:
- case GNUNET_MESSAGE_TYPE_CADET_ACK:
- case GNUNET_MESSAGE_TYPE_CADET_POLL:
- LOG (GNUNET_ERROR_TYPE_DEBUG, " raw %s\n", GC_m2s (queue->type));
- data_size = send_core_data_raw (queue->cls, size, buf);
- break;
- case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE:
- LOG (GNUNET_ERROR_TYPE_DEBUG, " path create\n");
- if (GCC_is_origin (c, GNUNET_YES))
- data_size = send_core_connection_create (c, size, buf);
- else
- data_size = send_core_data_raw (queue->cls, size, buf);
- break;
- case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_ACK:
- LOG (GNUNET_ERROR_TYPE_DEBUG, " path ack\n");
- if (GCC_is_origin (c, GNUNET_NO) ||
- GCC_is_origin (c, GNUNET_YES))
- data_size = send_core_connection_ack (c, size, buf);
- else
- data_size = send_core_data_raw (queue->cls, size, buf);
- break;
- case GNUNET_MESSAGE_TYPE_CADET_DATA:
- case GNUNET_MESSAGE_TYPE_CADET_CHANNEL_CREATE:
- case GNUNET_MESSAGE_TYPE_CADET_CHANNEL_DESTROY:
- /* This should be encapsulted */
- GNUNET_break (0);
- data_size = 0;
- break;
- default:
- GNUNET_break (0);
- LOG (GNUNET_ERROR_TYPE_WARNING, " type unknown: %u\n", queue->type);
- data_size = 0;
- }
-
- if (0 < drop_percent &&
- GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 101) < drop_percent)
- {
- LOG (GNUNET_ERROR_TYPE_WARNING, "DD %s on connection %s\n",
- GC_m2s (queue->type), GCC_2s (c));
- data_size = 0;
- }
- else
- {
- LOG (GNUNET_ERROR_TYPE_INFO,
- "snd %s (%s %u) on connection %s (%p) %s (size %u)\n",
- GC_m2s (queue->type), GC_m2s (queue->payload_type),
- queue->payload_id, GCC_2s (c), c, GC_f2s (queue->fwd), data_size);
- }
-
- /* Free queue, but cls was freed by send_core_* */
- GCP_queue_destroy (queue, GNUNET_NO, GNUNET_YES, pid);
-
- /* If more data in queue, send next */
- queue = peer_get_first_message (peer);
- if (NULL != queue)
- {
- LOG (GNUNET_ERROR_TYPE_DEBUG, " more data!\n");
- if (NULL == peer->core_transmit)
- {
- peer->core_transmit =
- GNUNET_CORE_notify_transmit_ready (core_handle,
- GNUNET_NO, get_priority (queue),
- GNUNET_TIME_UNIT_FOREVER_REL,
- dst_id,
- queue->size,
- &queue_send,
- peer);
- queue->start_waiting = GNUNET_TIME_absolute_get ();
- }
- else
- {
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "* tmt rdy called somewhere else\n");
- }
-// GCC_start_poll (); FIXME needed?
- }
- else
- {
-// GCC_stop_poll(); FIXME needed?
- }
-
- LOG (GNUNET_ERROR_TYPE_DEBUG, " return %d\n", data_size);
- queue_debug (peer, GNUNET_ERROR_TYPE_DEBUG);
- return data_size;
-}
-
-
-/******************************************************************************/
-/******************************** API ***********************************/
-/******************************************************************************/
-
-
-/**
- * Free a transmission that was already queued with all resources
- * associated to the request.
- *
- * @param queue Queue handler to cancel.
- * @param clear_cls Is it necessary to free associated cls?
- * @param sent Was it really sent? (Could have been canceled)
- * @param pid PID, if relevant (was sent and was a payload message).
- */
-void
-GCP_queue_destroy (struct CadetPeerQueue *queue, int clear_cls,
- int sent, uint32_t pid)
-{
- struct CadetPeer *peer;
-
- peer = queue->peer;
- LOG (GNUNET_ERROR_TYPE_DEBUG, "queue destroy %s\n", GC_m2s (queue->type));
- if (GNUNET_YES == clear_cls)
- {
- LOG (GNUNET_ERROR_TYPE_DEBUG, " free cls\n");
- switch (queue->type)
- {
- case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_DESTROY:
- LOG (GNUNET_ERROR_TYPE_INFO, "destroying a DESTROY message\n");
- /* fall through */
- case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_ACK:
- case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE:
- case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN:
- case GNUNET_MESSAGE_TYPE_CADET_KEEPALIVE:
- case GNUNET_MESSAGE_TYPE_CADET_KX:
- case GNUNET_MESSAGE_TYPE_CADET_ENCRYPTED:
- case GNUNET_MESSAGE_TYPE_CADET_ACK:
- case GNUNET_MESSAGE_TYPE_CADET_POLL:
- GNUNET_free_non_null (queue->cls);
- break;
-
- default:
- GNUNET_break (0);
- LOG (GNUNET_ERROR_TYPE_ERROR, " type %s unknown!\n",
- GC_m2s (queue->type));
- }
- }
- GNUNET_CONTAINER_DLL_remove (peer->queue_head, peer->queue_tail, queue);
-
- if (queue->type != GNUNET_MESSAGE_TYPE_CADET_ACK &&
- queue->type != GNUNET_MESSAGE_TYPE_CADET_POLL)
- {
- peer->queue_n--;
- }
-
- if (NULL != queue->callback)
- {
- LOG (GNUNET_ERROR_TYPE_DEBUG, " calling callback\n");
- queue->callback (queue->callback_cls,
- queue->c, sent, queue->type, pid,
- queue->fwd, queue->size,
- GNUNET_TIME_absolute_get_duration (queue->start_waiting));
- }
-
- if (NULL == peer_get_first_message (peer) && NULL != peer->core_transmit)
- {
- GNUNET_CORE_notify_transmit_ready_cancel (peer->core_transmit);
- peer->core_transmit = NULL;
- }
-
- GNUNET_free (queue);
-}
-
-
-/**
- * @brief Queue and pass message to core when possible.
- *
- * @param peer Peer towards which to queue the message.
- * @param cls Closure (@c type dependant). It will be used by queue_send to
- * build the message to be sent if not already prebuilt.
- * @param type Type of the message, 0 for a raw message.
- * @param size Size of the message.
- * @param c Connection this message belongs to (can be NULL).
- * @param fwd Is this a message going root->dest? (FWD ACK are NOT FWD!)
- * @param cont Continuation to be called once CORE has taken the message.
- * @param cont_cls Closure for @c cont.
- *
- * @return Handle to cancel the message before it is sent. Once cont is called
- * message has been sent and therefore the handle is no longer valid.
- */
-struct CadetPeerQueue *
-GCP_queue_add (struct CadetPeer *peer, void *cls, uint16_t type,
- uint16_t payload_type, uint32_t payload_id, size_t size,
- struct CadetConnection *c, int fwd,
- GCP_sent cont, void *cont_cls)
-{
- struct CadetPeerQueue *queue;
- int error_level;
- int priority;
- int call_core;
-
- if (NULL == c && GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN != type)
- error_level = GNUNET_ERROR_TYPE_ERROR;
- else
- error_level = GNUNET_ERROR_TYPE_INFO;
- LOG (error_level,
- "que %s (%s %u) on connection %s (%p) %s towards %s (size %u)\n",
- GC_m2s (type), GC_m2s (payload_type), payload_id,
- GCC_2s (c), c, GC_f2s (fwd), GCP_2s (peer), size);
-
- if (error_level == GNUNET_ERROR_TYPE_ERROR)
- GNUNET_abort ();
- if (NULL == peer->connections)
- {
- /* We are not connected to this peer, ignore request. */
- LOG (GNUNET_ERROR_TYPE_WARNING, "%s not a neighbor\n", GCP_2s (peer));
- GNUNET_STATISTICS_update (stats, "# messages dropped due to wrong hop", 1,
- GNUNET_NO);
- return NULL;
- }
-
- priority = 0;
-
- if (GNUNET_MESSAGE_TYPE_CADET_POLL == type ||
- GNUNET_MESSAGE_TYPE_CADET_ACK == type)
- {
- priority = 100;
- }
-
- LOG (GNUNET_ERROR_TYPE_DEBUG, "priority %d\n", priority);
-
- call_core = NULL == c ? GNUNET_YES : GCC_is_sendable (c, fwd);
- queue = GNUNET_new (struct CadetPeerQueue);
- queue->cls = cls;
- queue->type = type;
- queue->payload_type = payload_type;
- queue->payload_id = payload_id;
- queue->size = size;
- queue->peer = peer;
- queue->c = c;
- queue->fwd = fwd;
- queue->callback = cont;
- queue->callback_cls = cont_cls;
- if (100 > priority)
- {
- GNUNET_CONTAINER_DLL_insert_tail (peer->queue_head, peer->queue_tail, queue);
- peer->queue_n++;
- }
- else
- {
- GNUNET_CONTAINER_DLL_insert (peer->queue_head, peer->queue_tail, queue);
- call_core = GNUNET_YES;
- }
-
- if (NULL == peer->core_transmit && GNUNET_YES == call_core)
- {
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "calling core tmt rdy towards %s for %u bytes\n",
- GCP_2s (peer), size);
- peer->core_transmit =
- GNUNET_CORE_notify_transmit_ready (core_handle,
- GNUNET_NO, get_priority (queue),
- GNUNET_TIME_UNIT_FOREVER_REL,
- GNUNET_PEER_resolve2 (peer->id),
- size,
- &queue_send,
- peer);
- queue->start_waiting = GNUNET_TIME_absolute_get ();
- }
- else if (GNUNET_NO == call_core)
- {
- LOG (GNUNET_ERROR_TYPE_DEBUG, "core tmt rdy towards %s not needed\n",
- GCP_2s (peer));
-
- }
- else
- {
- LOG (GNUNET_ERROR_TYPE_DEBUG, "core tmt rdy towards %s already called\n",
- GCP_2s (peer));
-
- }
- queue_debug (peer, GNUNET_ERROR_TYPE_DEBUG);
- return queue;
-}
-
-
-/**
- * Cancel all queued messages to a peer that belong to a certain connection.
- *
- * @param peer Peer towards whom to cancel.
- * @param c Connection whose queued messages to cancel. Might be destroyed by
- * the sent continuation call.
- */
-void
-GCP_queue_cancel (struct CadetPeer *peer, struct CadetConnection *c)
-{
- struct CadetPeerQueue *q;
- struct CadetPeerQueue *next;
- struct CadetPeerQueue *prev;
-
- for (q = peer->queue_head; NULL != q; q = next)
- {
- prev = q->prev;
- if (q->c == c)
- {
- LOG (GNUNET_ERROR_TYPE_DEBUG, "GMP queue cancel %s\n", GC_m2s (q->type));
- if (GNUNET_MESSAGE_TYPE_CADET_CONNECTION_DESTROY == q->type)
- {
- q->c = NULL;
- }
- else
- {
- GCP_queue_destroy (q, GNUNET_YES, GNUNET_NO, 0);
- }
-
- /* Get next from prev, q->next might be already freed:
- * queue destroy -> callback -> GCC_destroy -> cancel_queues -> here
- */
- if (NULL == prev)
- next = peer->queue_head;
- else
- next = prev->next;
- }
- else
- {
- next = q->next;
- }
- }
- if (NULL == peer->queue_head)
- {
- if (NULL != peer->core_transmit)
- {
- GNUNET_CORE_notify_transmit_ready_cancel (peer->core_transmit);
- peer->core_transmit = NULL;
- }
- }
-}
-
-
-/**
- * Get the first transmittable message for a connection.
- *
- * @param peer Neighboring peer.
- * @param c Connection.
- *
- * @return First transmittable message.
- */
-static struct CadetPeerQueue *
-connection_get_first_message (struct CadetPeer *peer, struct CadetConnection *c)
-{
- struct CadetPeerQueue *q;
-
- for (q = peer->queue_head; NULL != q; q = q->next)
- {
- if (q->c != c)
- continue;
- if (queue_is_sendable (q))
- {
- LOG (GNUNET_ERROR_TYPE_DEBUG, " sendable!!\n");
- return q;
- }
- LOG (GNUNET_ERROR_TYPE_DEBUG, " not sendable\n");
- }
-
- return NULL;
-}
-
-
-/**
- * Get the first message for a connection and unqueue it.
- *
- * Only tunnel (or higher) level messages are unqueued. Connection specific
- * messages are destroyed and the count given to the caller.
- *
- * @param peer Neighboring peer.
- * @param c Connection.
- * @param del[out] How many messages have been deleted without returning.
- * Can be NULL.
- *
- * @return First message for this connection.
- */
-struct GNUNET_MessageHeader *
-GCP_connection_pop (struct CadetPeer *peer,
- struct CadetConnection *c,
- unsigned int *del)
-{
- struct CadetPeerQueue *q;
- struct CadetPeerQueue *next;
- struct GNUNET_MessageHeader *msg;
-
- if (NULL != del) *del = 0;
- LOG (GNUNET_ERROR_TYPE_DEBUG, "Connection pop on connection %p\n", c);
- for (q = peer->queue_head; NULL != q; q = next)
- {
- next = q->next;
- if (q->c != c)
- continue;
- switch (q->type)
- {
- case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE:
- case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_ACK:
- case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_DESTROY:
- case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN:
- case GNUNET_MESSAGE_TYPE_CADET_ACK:
- case GNUNET_MESSAGE_TYPE_CADET_POLL:
- GCP_queue_destroy (q, GNUNET_YES, GNUNET_NO, 0);
- if (NULL != del) *del = *del + 1;
- continue;
-
- case GNUNET_MESSAGE_TYPE_CADET_KX:
- case GNUNET_MESSAGE_TYPE_CADET_ENCRYPTED:
- msg = (struct GNUNET_MessageHeader *) q->cls;
- GCP_queue_destroy (q, GNUNET_NO, GNUNET_NO, 0);
- return msg;
-
- default:
- GNUNET_break (0);
- LOG (GNUNET_ERROR_TYPE_DEBUG, "Unknown message %s\n", GC_m2s (q->type));
- }
- }
-
- return NULL;
-}
-
-/**
- * Unlock a possibly locked queue for a connection.
- *
- * If there is a message that can be sent on this connection, call core for it.
- * Otherwise (if core transmit is already called or there is no sendable
- * message) do nothing.
- *
- * @param peer Peer who keeps the queue.
- * @param c Connection whose messages to unlock.
- */
-void
-GCP_queue_unlock (struct CadetPeer *peer, struct CadetConnection *c)
-{
- struct CadetPeerQueue *q;
- size_t size;
-
- if (NULL != peer->core_transmit)
- {
- LOG (GNUNET_ERROR_TYPE_DEBUG, " already unlocked!\n");
- return; /* Already unlocked */
- }
-
- q = connection_get_first_message (peer, c);
- if (NULL == q)
- {
- LOG (GNUNET_ERROR_TYPE_DEBUG, " queue empty!\n");
- return; /* Nothing to transmit */
- }
-
- size = q->size;
- peer->core_transmit =
- GNUNET_CORE_notify_transmit_ready (core_handle,
- GNUNET_NO, get_priority (q),
- GNUNET_TIME_UNIT_FOREVER_REL,
- GNUNET_PEER_resolve2 (peer->id),
- size,
- &queue_send,
- peer);
-}
-
-
-/**
- * Initialize the peer subsystem.
- *
- * @param c Configuration.
- */
-void
-GCP_init (const struct GNUNET_CONFIGURATION_Handle *c)
-{
- LOG (GNUNET_ERROR_TYPE_DEBUG, "init\n");
- peers = GNUNET_CONTAINER_multipeermap_create (128, GNUNET_NO);
- if (GNUNET_OK !=
- GNUNET_CONFIGURATION_get_value_number (c, "CADET", "MAX_PEERS",
- &max_peers))
- {
- GNUNET_log_config_invalid (GNUNET_ERROR_TYPE_WARNING,
- "CADET", "MAX_PEERS", "USING DEFAULT");
- max_peers = 1000;
- }
-
- if (GNUNET_OK !=
- GNUNET_CONFIGURATION_get_value_number (c, "CADET", "DROP_PERCENT",
- &drop_percent))
- {
- drop_percent = 0;
- }
- else
- {
- LOG (GNUNET_ERROR_TYPE_WARNING, "**************************************\n");
- LOG (GNUNET_ERROR_TYPE_WARNING, "Cadet is running with DROP enabled.\n");
- LOG (GNUNET_ERROR_TYPE_WARNING, "This is NOT a good idea!\n");
- LOG (GNUNET_ERROR_TYPE_WARNING, "Remove DROP_PERCENT from config file.\n");
- LOG (GNUNET_ERROR_TYPE_WARNING, "**************************************\n");
- }
-
- core_handle = GNUNET_CORE_connect (c, /* Main configuration */
- NULL, /* Closure passed to CADET functions */
- &core_init, /* Call core_init once connected */
- &core_connect, /* Handle connects */
- &core_disconnect, /* remove peers on disconnects */
- NULL, /* Don't notify about all incoming messages */
- GNUNET_NO, /* For header only in notification */
- NULL, /* Don't notify about all outbound messages */
- GNUNET_NO, /* For header-only out notification */
- core_handlers); /* Register these handlers */
- if (GNUNET_YES !=
- GNUNET_CONFIGURATION_get_value_yesno (c, "CADET", "DISABLE_TRY_CONNECT"))
- {
- transport_handle = GNUNET_TRANSPORT_connect (c, &my_full_id, NULL, /* cls */
- /* Notify callbacks */
- NULL, NULL, NULL);
- }
- else
- {
- LOG (GNUNET_ERROR_TYPE_WARNING, "**************************************\n");
- LOG (GNUNET_ERROR_TYPE_WARNING, "* DISABLE TRYING CONNECT in config *\n");
- LOG (GNUNET_ERROR_TYPE_WARNING, "* Use this only for test purposes. *\n");
- LOG (GNUNET_ERROR_TYPE_WARNING, "**************************************\n");
- transport_handle = NULL;
- }
-
-
-
- if (NULL == core_handle)
- {
- GNUNET_break (0);
- GNUNET_SCHEDULER_shutdown ();
- return;
- }
-
-}
+static void
+mqm_send_done (void *cls);
/**
- * Shut down the peer subsystem.
+ * Transmit current envelope from this @a mqm.
+ *
+ * @param mqm mqm to transmit message for now
*/
-void
-GCP_shutdown (void)
+static void
+mqm_execute (struct GCP_MessageQueueManager *mqm)
{
- GNUNET_CONTAINER_multipeermap_iterate (peers, &shutdown_tunnel, NULL);
+ struct CadetPeer *cp = mqm->cp;
- if (core_handle != NULL)
+ /* Move ready pointer to the next entry that might be ready. */
+ if ( (mqm == cp->mqm_ready_ptr) &&
+ (NULL != mqm->next) )
+ cp->mqm_ready_ptr = mqm->next;
+ /* Move entry to the end of the DLL, to be fair. */
+ if (mqm != cp->mqm_tail)
{
- GNUNET_CORE_disconnect (core_handle);
- core_handle = NULL;
+ GNUNET_CONTAINER_DLL_remove (cp->mqm_head,
+ cp->mqm_tail,
+ mqm);
+ GNUNET_CONTAINER_DLL_insert_tail (cp->mqm_head,
+ cp->mqm_tail,
+ mqm);
}
- if (transport_handle != NULL)
+ cp->mqm_ready_counter--;
+ if (GNUNET_YES == should_I_drop ())
{
- GNUNET_TRANSPORT_disconnect (transport_handle);
- transport_handle = NULL;
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "DROPPING message to peer %s from MQM %p\n",
+ GCP_2s (cp),
+ mqm);
+ GNUNET_MQ_discard (mqm->env);
+ mqm->env = NULL;
+ mqm_send_done (cp);
}
- GNUNET_PEER_change_rc (myid, -1);
+ else
+ {
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Sending to peer %s from MQM %p\n",
+ GCP_2s (cp),
+ mqm);
+ GNUNET_MQ_send (cp->core_mq,
+ mqm->env);
+ mqm->env = NULL;
+ }
+ mqm->cb (mqm->cb_cls,
+ GNUNET_YES);
}
/**
- * Retrieve the CadetPeer stucture associated with the peer, create one
- * and insert it in the appropriate structures if the peer is not known yet.
+ * Find the next ready message in the queue (starting
+ * the search from the `cp->mqm_ready_ptr`) and if possible
+ * execute the transmission.
*
- * @param peer_id Full identity of the peer.
- *
- * @return Existing or newly created peer structure.
+ * @param cp peer to try to send the next ready message to
*/
-struct CadetPeer *
-GCP_get (const struct GNUNET_PeerIdentity *peer_id)
+static void
+send_next_ready (struct CadetPeer *cp)
{
- struct CadetPeer *peer;
-
- peer = GNUNET_CONTAINER_multipeermap_get (peers, peer_id);
- if (NULL == peer)
- {
- peer = GNUNET_new (struct CadetPeer);
- if (GNUNET_CONTAINER_multipeermap_size (peers) > max_peers)
- {
- peer_delete_oldest ();
- }
- GNUNET_CONTAINER_multipeermap_put (peers, peer_id, peer,
- GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
- peer->id = GNUNET_PEER_intern (peer_id);
- }
- peer->last_contact = GNUNET_TIME_absolute_get ();
+ struct GCP_MessageQueueManager *mqm;
- return peer;
+ if (0 == cp->mqm_ready_counter)
+ return;
+ while ( (NULL != (mqm = cp->mqm_ready_ptr)) &&
+ (NULL == mqm->env) )
+ cp->mqm_ready_ptr = mqm->next;
+ if (NULL == mqm)
+ return; /* nothing to do */
+ mqm_execute (mqm);
}
/**
- * Retrieve the CadetPeer stucture associated with the peer, create one
- * and insert it in the appropriate structures if the peer is not known yet.
- *
- * @param peer Short identity of the peer.
+ * Function called when CORE took one of the messages from
+ * a message queue manager and transmitted it.
*
- * @return Existing or newly created peer structure.
+ * @param cls the `struct CadetPeeer` where we made progress
*/
-struct CadetPeer *
-GCP_get_short (const GNUNET_PEER_Id peer)
+static void
+mqm_send_done (void *cls)
{
- return GCP_get (GNUNET_PEER_resolve2 (peer));
+ struct CadetPeer *cp = cls;
+
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Sending to peer %s completed\n",
+ GCP_2s (cp));
+ send_next_ready (cp);
}
/**
- * Try to connect to a peer on transport level.
+ * Send the message in @a env to @a cp.
*
- * @param cls Closure (peer).
- * @param tc TaskContext.
+ * @param mqm the message queue manager to use for transmission
+ * @param env envelope with the message to send; must NOT
+ * yet have a #GNUNET_MQ_notify_sent() callback attached to it
*/
-static void
-try_connect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+void
+GCP_send (struct GCP_MessageQueueManager *mqm,
+ struct GNUNET_MQ_Envelope *env)
{
- struct CadetPeer *peer = cls;
+ struct CadetPeer *cp = mqm->cp;
- if (0 != (GNUNET_SCHEDULER_REASON_SHUTDOWN & tc->reason))
+ GNUNET_assert (NULL != env);
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Queueing message to peer %s in MQM %p\n",
+ GCP_2s (cp),
+ mqm);
+ GNUNET_assert (NULL != cp->core_mq);
+ GNUNET_assert (NULL == mqm->env);
+ GNUNET_MQ_notify_sent (env,
+ &mqm_send_done,
+ cp);
+ mqm->env = env;
+ cp->mqm_ready_counter++;
+ if (mqm != cp->mqm_ready_ptr)
+ cp->mqm_ready_ptr = cp->mqm_head;
+ if (1 == cp->mqm_ready_counter)
+ cp->mqm_ready_ptr = mqm;
+ if (0 != GNUNET_MQ_get_length (cp->core_mq))
return;
-
- GNUNET_TRANSPORT_try_connect (transport_handle,
- GNUNET_PEER_resolve2 (peer->id), NULL, NULL);
+ send_next_ready (cp);
}
/**
- * Try to establish a new connection to this peer (in its tunnel).
- * If the peer doesn't have any path to it yet, try to get one.
- * If the peer already has some path, send a CREATE CONNECTION towards it.
+ * Function called to destroy a peer now.
*
- * @param peer Peer to connect to.
+ * @param cls NULL
+ * @param pid identity of the peer (unused)
+ * @param value the `struct CadetPeer` to clean up
+ * @return #GNUNET_OK (continue to iterate)
*/
-void
-GCP_connect (struct CadetPeer *peer)
+static int
+destroy_iterator_cb (void *cls,
+ const struct GNUNET_PeerIdentity *pid,
+ void *value)
{
- struct CadetTunnel *t;
- struct CadetPeerPath *p;
- struct CadetConnection *c;
- int rerun_search;
+ struct CadetPeer *cp = value;
- LOG (GNUNET_ERROR_TYPE_DEBUG, "peer_connect towards %s\n", GCP_2s (peer));
-
- /* If we have a current hello, try to connect using it. */
- GCP_try_connect (peer);
-
- t = peer->tunnel;
- c = NULL;
- rerun_search = GNUNET_NO;
-
- if (NULL != peer->path_head)
- {
- LOG (GNUNET_ERROR_TYPE_DEBUG, " some path exists\n");
- p = peer_get_best_path (peer);
- if (NULL != p)
- {
- char *s;
-
- s = path_2s (p);
- LOG (GNUNET_ERROR_TYPE_DEBUG, " path to use: %s\n", s);
- GNUNET_free (s);
-
- c = GCT_use_path (t, p);
- if (NULL == c)
- {
- /* This case can happen when the path includes a first hop that is
- * not yet known to be connected.
- *
- * This happens quite often during testing when running cadet
- * under valgrind: core connect notifications come very late and the
- * DHT result has already come and created a valid path.
- * In this case, the peer->connections hashmap will be NULL and
- * tunnel_use_path will not be able to create a connection from that
- * path.
- *
- * Re-running the DHT GET should give core time to callback.
- *
- * GCT_use_path -> GCC_new -> register_neighbors takes care of
- * updating statistics about this issue.
- */
- rerun_search = GNUNET_YES;
- }
- else
- {
- GCC_send_create (c);
- return;
- }
- }
- else
- {
- LOG (GNUNET_ERROR_TYPE_DEBUG, " but is NULL, all paths are in use\n");
- }
- }
-
- if (NULL != peer->search_h && GNUNET_YES == rerun_search)
+ if (NULL != cp->destroy_task)
{
- GCD_search_stop (peer->search_h);
- peer->search_h = NULL;
- LOG (GNUNET_ERROR_TYPE_DEBUG, " Stopping DHT GET for peer %s\n",
- GCP_2s (peer));
+ GNUNET_SCHEDULER_cancel (cp->destroy_task);
+ cp->destroy_task = NULL;
}
+ destroy_peer (cp);
+ return GNUNET_OK;
+}
- if (NULL == peer->search_h)
- {
- const struct GNUNET_PeerIdentity *id;
- id = GNUNET_PEER_resolve2 (peer->id);
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- " Starting DHT GET for peer %s\n", GCP_2s (peer));
- peer->search_h = GCD_search (id, &search_handler, peer);
- if (CADET_TUNNEL_NEW == GCT_get_cstate (t)
- || 0 == GCT_count_any_connections (t))
- GCT_change_cstate (t, CADET_TUNNEL_SEARCHING);
- }
+/**
+ * Clean up all entries about all peers.
+ * Must only be called after all tunnels, CORE-connections and
+ * connections are down.
+ */
+void
+GCP_destroy_all_peers ()
+{
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Destroying all peers now\n");
+ GNUNET_CONTAINER_multipeermap_iterate (peers,
+ &destroy_iterator_cb,
+ NULL);
}
/**
- * Chech whether there is a direct (core level) connection to peer.
- *
- * @param peer Peer to check.
+ * Drop all paths owned by this peer, and do not
+ * allow new ones to be added: We are shutting down.
*
- * @return #GNUNET_YES if there is a direct connection.
+ * @param cp peer to drop paths to
*/
-int
-GCP_is_neighbor (const struct CadetPeer *peer)
+void
+GCP_drop_owned_paths (struct CadetPeer *cp)
{
struct CadetPeerPath *path;
- if (NULL == peer->connections)
- return GNUNET_NO;
-
- for (path = peer->path_head; NULL != path; path = path->next)
- {
- if (3 > path->length)
- return GNUNET_YES;
- }
-
- /* Is not a neighbor but connections is not NULL, probably disconnecting */
- return GNUNET_NO;
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Destroying all paths to %s\n",
+ GCP_2s (cp));
+ while (NULL != (path =
+ GNUNET_CONTAINER_heap_remove_root (cp->path_heap)))
+ GCPP_release (path);
+ GNUNET_CONTAINER_heap_destroy (cp->path_heap);
+ cp->path_heap = NULL;
}
/**
- * Create and initialize a new tunnel towards a peer, in case it has none.
- * In case the peer already has a tunnel, nothing is done.
- *
- * Does not generate any traffic, just creates the local data structures.
+ * Add an entry to the DLL of all of the paths that this peer is on.
*
- * @param peer Peer towards which to create the tunnel.
+ * @param cp peer to modify
+ * @param entry an entry on a path
+ * @param off offset of this peer on the path
*/
void
-GCP_add_tunnel (struct CadetPeer *peer)
+GCP_path_entry_add (struct CadetPeer *cp,
+ struct CadetPeerPathEntry *entry,
+ unsigned int off)
{
- if (NULL != peer->tunnel)
- return;
- peer->tunnel = GCT_new (peer);
+ GNUNET_assert (cp == GCPP_get_peer_at_offset (entry->path,
+ off));
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Discovered that peer %s is on path %s at offset %u\n",
+ GCP_2s (cp),
+ GCPP_2s (entry->path),
+ off);
+ if (off >= cp->path_dll_length)
+ {
+ unsigned int len = cp->path_dll_length;
+
+ GNUNET_array_grow (cp->path_heads,
+ len,
+ off + 4);
+ GNUNET_array_grow (cp->path_tails,
+ cp->path_dll_length,
+ off + 4);
+ }
+ GNUNET_CONTAINER_DLL_insert (cp->path_heads[off],
+ cp->path_tails[off],
+ entry);
+ cp->off_sum += off;
+ cp->num_paths++;
+
+ /* If we have a tunnel to this peer, tell the tunnel that there is a
+ new path available. */
+ if (NULL != cp->t)
+ GCT_consider_path (cp->t,
+ entry->path,
+ off);
+
+ if ( (NULL != cp->search_h) &&
+ (DESIRED_CONNECTIONS_PER_TUNNEL <= cp->num_paths) )
+ {
+ /* Now I have enough paths, stop search */
+ GCD_search_stop (cp->search_h);
+ cp->search_h = NULL;
+ }
+ if (NULL != cp->destroy_task)
+ {
+ /* paths changed, this resets the destroy timeout counter
+ and aborts a destroy task that may no longer be valid
+ to have (as we now have more paths via this peer). */
+ consider_peer_destroy (cp);
+ }
}
/**
- * Add a connection to a neighboring peer.
- *
- * Store that the peer is the first hop of the connection in one
- * direction and that on peer disconnect the connection must be
- * notified and destroyed, for it will no longer be valid.
+ * Remove an entry from the DLL of all of the paths that this peer is on.
*
- * @param peer Peer to add connection to.
- * @param c Connection to add.
- *
- * @return GNUNET_OK on success.
+ * @param cp peer to modify
+ * @param entry an entry on a path
+ * @param off offset of this peer on the path
*/
-int
-GCP_add_connection (struct CadetPeer *peer,
- struct CadetConnection *c)
+void
+GCP_path_entry_remove (struct CadetPeer *cp,
+ struct CadetPeerPathEntry *entry,
+ unsigned int off)
{
- int result;
- LOG (GNUNET_ERROR_TYPE_DEBUG, "adding connection %s\n", GCC_2s (c));
- LOG (GNUNET_ERROR_TYPE_DEBUG, "to peer %s\n", GCP_2s (peer));
-
- if (NULL == peer->connections)
- {
- GNUNET_break (0);
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Peer %s is not a neighbor!\n",
- GCP_2s (peer));
- return GNUNET_SYSERR;
- }
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "peer %s ok, has %u connections.\n",
- GCP_2s (peer), GNUNET_CONTAINER_multihashmap_size (peer->connections));
- result = GNUNET_CONTAINER_multihashmap_put (peer->connections,
- GCC_get_h (c),
- c,
- GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
LOG (GNUNET_ERROR_TYPE_DEBUG,
- " now has %u connections.\n",
- GNUNET_CONTAINER_multihashmap_size (peer->connections));
- LOG (GNUNET_ERROR_TYPE_DEBUG, "result %u\n", result);
-
- return result;
+ "Removing knowledge about peer %s beging on path %s at offset %u\n",
+ GCP_2s (cp),
+ GCPP_2s (entry->path),
+ off);
+ GNUNET_CONTAINER_DLL_remove (cp->path_heads[off],
+ cp->path_tails[off],
+ entry);
+ GNUNET_assert (0 < cp->num_paths);
+ cp->off_sum -= off;
+ cp->num_paths--;
+ if ( (NULL == cp->core_mq) &&
+ (NULL != cp->t) &&
+ (NULL == cp->search_h) &&
+ (DESIRED_CONNECTIONS_PER_TUNNEL > cp->num_paths) )
+ cp->search_h
+ = GCD_search (&cp->pid);
+ if (NULL == cp->destroy_task)
+ {
+ /* paths changed, we might now be ready for destruction, check again */
+ consider_peer_destroy (cp);
+ }
}
/**
- * Add the path to the peer and update the path used to reach it in case this
- * is the shortest.
+ * Prune down the number of paths to this peer, we seem to
+ * have way too many.
*
- * @param peer Destination peer to add the path to.
- * @param path New path to add. Last peer must be the peer in arg 1.
- * Path will be either used of freed if already known.
- * @param trusted Do we trust that this path is real?
- *
- * @return path if path was taken, pointer to existing duplicate if exists
- * NULL on error.
+ * @param cls the `struct CadetPeer` to maintain the path heap for
*/
-struct CadetPeerPath *
-GCP_add_path (struct CadetPeer *peer, struct CadetPeerPath *path,
- int trusted)
+static void
+path_heap_cleanup (void *cls)
{
- struct CadetPeerPath *aux;
- unsigned int l;
- unsigned int l2;
-
- LOG (GNUNET_ERROR_TYPE_DEBUG, "adding path [%u] to peer %s\n",
- path->length, GCP_2s (peer));
-
- if ((NULL == peer) || (NULL == path))
- {
- GNUNET_break (0);
- path_destroy (path);
- return NULL;
- }
- if (path->peers[path->length - 1] != peer->id)
- {
- GNUNET_break (0);
- path_destroy (path);
- return NULL;
- }
-
- for (l = 1; l < path->length; l++)
- {
- if (path->peers[l] == myid)
- {
- LOG (GNUNET_ERROR_TYPE_DEBUG, " shortening path by %u\n", l);
- for (l2 = 0; l2 < path->length - l; l2++)
- {
- path->peers[l2] = path->peers[l + l2];
- }
- path->length -= l;
- l = 1;
- path->peers = GNUNET_realloc (path->peers,
- path->length * sizeof (GNUNET_PEER_Id));
- }
+ struct CadetPeer *cp = cls;
+ struct CadetPeerPath *root;
+
+ cp->heap_cleanup_task = NULL;
+ while (GNUNET_CONTAINER_heap_get_size (cp->path_heap) >=
+ 2 * DESIRED_CONNECTIONS_PER_TUNNEL)
+ {
+ /* Now we have way too many, drop least desirable UNLESS it is in use!
+ (Note that this intentionally keeps highly desireable, but currently
+ unused paths around in the hope that we might be able to switch, even
+ if the number of paths exceeds the threshold.) */
+ root = GNUNET_CONTAINER_heap_peek (cp->path_heap);
+ GNUNET_assert (NULL != root);
+ if (NULL !=
+ GCPP_get_connection (root,
+ cp,
+ GCPP_get_length (root) - 1))
+ break; /* can't fix */
+ /* Got plenty of paths to this destination, and this is a low-quality
+ one that we don't care about. Allow it to die. */
+ GNUNET_assert (root ==
+ GNUNET_CONTAINER_heap_remove_root (cp->path_heap));
+ GCPP_release (root);
}
+}
- LOG (GNUNET_ERROR_TYPE_DEBUG, " final length: %u\n", path->length);
-
- if (2 >= path->length && GNUNET_NO == trusted)
- {
- /* Only allow CORE to tell us about direct paths */
- path_destroy (path);
- return NULL;
- }
- l = path_get_length (path);
- if (0 == l)
- {
- path_destroy (path);
+/**
+ * Try adding a @a path to this @a peer. If the peer already
+ * has plenty of paths, return NULL.
+ *
+ * @param cp peer to which the @a path leads to
+ * @param path a path looking for an owner; may not be fully initialized yet!
+ * @param off offset of @a cp in @a path
+ * @param force force attaching the path
+ * @return NULL if this peer does not care to become a new owner,
+ * otherwise the node in the peer's path heap for the @a path.
+ */
+struct GNUNET_CONTAINER_HeapNode *
+GCP_attach_path (struct CadetPeer *cp,
+ struct CadetPeerPath *path,
+ unsigned int off,
+ int force)
+{
+ GNUNET_CONTAINER_HeapCostType desirability;
+ struct CadetPeerPath *root;
+ GNUNET_CONTAINER_HeapCostType root_desirability;
+ struct GNUNET_CONTAINER_HeapNode *hn;
+
+ GNUNET_assert (off == GCPP_get_length (path) - 1);
+ GNUNET_assert (cp == GCPP_get_peer_at_offset (path,
+ off));
+ if (NULL == cp->path_heap)
+ {
+ /* #GCP_drop_owned_paths() was already called, we cannot take new ones! */
+ GNUNET_assert (GNUNET_NO == force);
return NULL;
}
-
- GNUNET_assert (peer->id == path->peers[path->length - 1]);
- for (aux = peer->path_head; aux != NULL; aux = aux->next)
+ desirability = GCPP_get_desirability (path);
+ if (GNUNET_NO == force)
{
- l2 = path_get_length (aux);
- if (l2 > l)
+ /* FIXME: desirability is not yet initialized; tricky! */
+ if (GNUNET_NO ==
+ GNUNET_CONTAINER_heap_peek2 (cp->path_heap,
+ (void **) &root,
+ &root_desirability))
{
- LOG (GNUNET_ERROR_TYPE_DEBUG, " added\n");
- GNUNET_CONTAINER_DLL_insert_before (peer->path_head,
- peer->path_tail, aux, path);
- if (NULL != peer->tunnel && 3 < GCT_count_connections (peer->tunnel))
- {
- GCP_connect (peer);
- }
- return path;
+ root = NULL;
+ root_desirability = 0;
}
- else
+
+ if ( (DESIRED_CONNECTIONS_PER_TUNNEL > cp->num_paths) &&
+ (desirability < root_desirability) )
{
- if (l2 == l && memcmp (path->peers, aux->peers, l) == 0)
- {
- LOG (GNUNET_ERROR_TYPE_DEBUG, " already known\n");
- path_destroy (path);
- return aux;
- }
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Decided to not attach path %p to peer %s due to undesirability\n",
+ GCPP_2s (path),
+ GCP_2s (cp));
+ return NULL;
}
}
- GNUNET_CONTAINER_DLL_insert_tail (peer->path_head, peer->path_tail,
- path);
- LOG (GNUNET_ERROR_TYPE_DEBUG, " added last\n");
- if (NULL != peer->tunnel && 3 < GCT_count_connections (peer->tunnel))
- {
- GCP_connect (peer);
- }
- return path;
+
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Attaching path %s to peer %s (%s)\n",
+ GCPP_2s (path),
+ GCP_2s (cp),
+ (GNUNET_NO == force) ? "desirable" : "forced");
+
+ /* Yes, we'd like to add this path, add to our heap */
+ hn = GNUNET_CONTAINER_heap_insert (cp->path_heap,
+ path,
+ desirability);
+
+ /* Consider maybe dropping other paths because of the new one */
+ if ( (GNUNET_CONTAINER_heap_get_size (cp->path_heap) >=
+ 2 * DESIRED_CONNECTIONS_PER_TUNNEL) &&
+ (NULL != cp->heap_cleanup_task) )
+ cp->heap_cleanup_task = GNUNET_SCHEDULER_add_now (&path_heap_cleanup,
+ cp);
+ return hn;
}
/**
- * Add the path to the origin peer and update the path used to reach it in case
- * this is the shortest.
- * The path is given in peer_info -> destination, therefore we turn the path
- * upside down first.
+ * This peer can no longer own @a path as the path
+ * has been extended and a peer further down the line
+ * is now the new owner.
*
- * @param peer Peer to add the path to, being the origin of the path.
- * @param path New path to add after being inversed.
- * Path will be either used or freed.
- * @param trusted Do we trust that this path is real?
- *
- * @return path if path was taken, pointer to existing duplicate if exists
- * NULL on error.
+ * @param cp old owner of the @a path
+ * @param path path where the ownership is lost
+ * @param hn note in @a cp's path heap that must be deleted
*/
-struct CadetPeerPath *
-GCP_add_path_to_origin (struct CadetPeer *peer,
- struct CadetPeerPath *path,
- int trusted)
+void
+GCP_detach_path (struct CadetPeer *cp,
+ struct CadetPeerPath *path,
+ struct GNUNET_CONTAINER_HeapNode *hn)
{
- if (NULL == path)
- return NULL;
- path_invert (path);
- return GCP_add_path (peer, path, trusted);
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Detatching path %s from peer %s\n",
+ GCPP_2s (path),
+ GCP_2s (cp));
+ GNUNET_assert (path ==
+ GNUNET_CONTAINER_heap_remove_node (hn));
}
/**
- * Adds a path to the info of all the peers in the path
+ * Add a @a connection to this @a cp.
*
- * @param p Path to process.
- * @param confirmed Whether we know if the path works or not.
+ * @param cp peer via which the @a connection goes
+ * @param cc the connection to add
*/
void
-GCP_add_path_to_all (const struct CadetPeerPath *p, int confirmed)
+GCP_add_connection (struct CadetPeer *cp,
+ struct CadetConnection *cc)
{
- unsigned int i;
-
- /* TODO: invert and add */
- for (i = 0; i < p->length && p->peers[i] != myid; i++) /* skip'em */ ;
- for (i++; i < p->length; i++)
- {
- struct CadetPeer *aux;
- struct CadetPeerPath *copy;
-
- aux = GCP_get_short (p->peers[i]);
- copy = path_duplicate (p);
- copy->length = i + 1;
- GCP_add_path (aux, copy, p->length < 3 ? GNUNET_NO : confirmed);
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Adding connection %s to peer %s\n",
+ GCC_2s (cc),
+ GCP_2s (cp));
+ GNUNET_assert (GNUNET_OK ==
+ GNUNET_CONTAINER_multishortmap_put (cp->connections,
+ &GCC_get_id (cc)->connection_of_tunnel,
+ cc,
+ GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
+ if (NULL != cp->destroy_task)
+ {
+ GNUNET_SCHEDULER_cancel (cp->destroy_task);
+ cp->destroy_task = NULL;
}
}
/**
- * Remove any path to the peer that has the extact same peers as the one given.
+ * Remove a @a connection that went via this @a cp.
*
- * @param peer Peer to remove the path from.
- * @param path Path to remove. Is always destroyed .
+ * @param cp peer via which the @a connection went
+ * @param cc the connection to remove
*/
void
-GCP_remove_path (struct CadetPeer *peer, struct CadetPeerPath *path)
+GCP_remove_connection (struct CadetPeer *cp,
+ struct CadetConnection *cc)
{
- struct CadetPeerPath *iter;
- struct CadetPeerPath *next;
-
- GNUNET_assert (myid == path->peers[0]);
- GNUNET_assert (peer->id == path->peers[path->length - 1]);
-
- for (iter = peer->path_head; NULL != iter; iter = next)
- {
- next = iter->next;
- if (0 == memcmp (path->peers, iter->peers,
- sizeof (GNUNET_PEER_Id) * path->length))
- {
- GNUNET_CONTAINER_DLL_remove (peer->path_head, peer->path_tail, iter);
- if (iter != path)
- path_destroy (iter);
- }
- }
- path_destroy (path);
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Removing connection %s from peer %s\n",
+ GCC_2s (cc),
+ GCP_2s (cp));
+ GNUNET_assert (GNUNET_YES ==
+ GNUNET_CONTAINER_multishortmap_remove (cp->connections,
+ &GCC_get_id (cc)->connection_of_tunnel,
+ cc));
+ consider_peer_destroy (cp);
}
/**
- * Remove a connection from a neighboring peer.
- *
- * @param peer Peer to remove connection from.
- * @param c Connection to remove.
+ * Retrieve the CadetPeer stucture associated with the
+ * peer. Optionally create one and insert it in the appropriate
+ * structures if the peer is not known yet.
*
- * @return GNUNET_OK on success.
+ * @param peer_id Full identity of the peer.
+ * @param create #GNUNET_YES if a new peer should be created if unknown.
+ * #GNUNET_NO to return NULL if peer is unknown.
+ * @return Existing or newly created peer structure.
+ * NULL if unknown and not requested @a create
*/
-int
-GCP_remove_connection (struct CadetPeer *peer,
- const struct CadetConnection *c)
+struct CadetPeer *
+GCP_get (const struct GNUNET_PeerIdentity *peer_id,
+ int create)
{
- LOG (GNUNET_ERROR_TYPE_DEBUG, "removing connection %s\n", GCC_2s (c));
- LOG (GNUNET_ERROR_TYPE_DEBUG, "from peer %s\n", GCP_2s (peer));
+ struct CadetPeer *cp;
- if (NULL == peer || NULL == peer->connections)
- {
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Peer %s is not a neighbor!\n",
- GCP_2s (peer));
- return GNUNET_SYSERR;
- }
+ cp = GNUNET_CONTAINER_multipeermap_get (peers,
+ peer_id);
+ if (NULL != cp)
+ return cp;
+ if (GNUNET_NO == create)
+ return NULL;
+ cp = GNUNET_new (struct CadetPeer);
+ cp->pid = *peer_id;
+ cp->connections = GNUNET_CONTAINER_multishortmap_create (32,
+ GNUNET_YES);
+ cp->path_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
+ GNUNET_assert (GNUNET_YES ==
+ GNUNET_CONTAINER_multipeermap_put (peers,
+ &cp->pid,
+ cp,
+ GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
LOG (GNUNET_ERROR_TYPE_DEBUG,
- "peer %s ok, has %u connections.\n",
- GCP_2s (peer), GNUNET_CONTAINER_multihashmap_size (peer->connections));
-
- return GNUNET_CONTAINER_multihashmap_remove (peer->connections,
- GCC_get_h (c),
- c);
+ "Creating peer %s\n",
+ GCP_2s (cp));
+ return cp;
}
+
/**
- * Start the DHT search for new paths towards the peer: we don't have
- * enough good connections.
+ * Obtain the peer identity for a `struct CadetPeer`.
*
- * @param peer Destination peer.
+ * @param cp our peer handle
+ * @return the peer identity
*/
-void
-GCP_start_search (struct CadetPeer *peer)
+const struct GNUNET_PeerIdentity *
+GCP_get_id (struct CadetPeer *cp)
{
- if (NULL != peer->search_h)
- {
- GNUNET_break (0);
- return;
- }
-
- peer->search_h = GCD_search (GCP_get_id (peer), &search_handler, peer);
+ return &cp->pid;
}
/**
- * Stop the DHT search for new paths towards the peer: we already have
- * enough good connections.
+ * Iterate over all known peers.
*
- * @param peer Destination peer.
+ * @param iter Iterator.
+ * @param cls Closure for @c iter.
*/
void
-GCP_stop_search (struct CadetPeer *peer)
+GCP_iterate_all (GNUNET_CONTAINER_PeerMapIterator iter,
+ void *cls)
{
- if (NULL == peer->search_h)
- {
- return;
- }
-
- GCD_search_stop (peer->search_h);
- peer->search_h = NULL;
+ GNUNET_CONTAINER_multipeermap_iterate (peers,
+ iter,
+ cls);
}
/**
- * Get the Full ID of a peer.
- *
- * @param peer Peer to get from.
+ * Count the number of known paths toward the peer.
*
- * @return Full ID of peer.
+ * @param cp Peer to get path info.
+ * @return Number of known paths.
*/
-const struct GNUNET_PeerIdentity *
-GCP_get_id (const struct CadetPeer *peer)
+unsigned int
+GCP_count_paths (const struct CadetPeer *cp)
{
- return GNUNET_PEER_resolve2 (peer->id);
+ return cp->num_paths;
}
/**
- * Get the Short ID of a peer.
+ * Iterate over the paths to a peer.
*
- * @param peer Peer to get from.
- *
- * @return Short ID of peer.
+ * @param cp Peer to get path info.
+ * @param callback Function to call for every path.
+ * @param callback_cls Closure for @a callback.
+ * @return Number of iterated paths.
*/
-GNUNET_PEER_Id
-GCP_get_short_id (const struct CadetPeer *peer)
+unsigned int
+GCP_iterate_paths (struct CadetPeer *cp,
+ GCP_PathIterator callback,
+ void *callback_cls)
{
- return peer->id;
+ unsigned int ret = 0;
+
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Iterating over paths to peer %s%s\n",
+ GCP_2s (cp),
+ (NULL == cp->core_mq) ? "" : " including direct link");
+ if (NULL != cp->core_mq)
+ {
+ struct CadetPeerPath *path;
+
+ path = GCPP_get_path_from_route (1,
+ &cp->pid);
+ ret++;
+ if (GNUNET_NO ==
+ callback (callback_cls,
+ path,
+ 0))
+ return ret;
+ }
+ for (unsigned int i=0;i<cp->path_dll_length;i++)
+ {
+ for (struct CadetPeerPathEntry *pe = cp->path_heads[i];
+ NULL != pe;
+ pe = pe->next)
+ {
+ ret++;
+ if (GNUNET_NO ==
+ callback (callback_cls,
+ pe->path,
+ i))
+ return ret;
+ }
+ }
+ return ret;
}
/**
- * Set tunnel.
+ * Iterate over the paths to @a cp where
+ * @a cp is at distance @a dist from us.
*
- * @param peer Peer.
- * @param t Tunnel.
+ * @param cp Peer to get path info.
+ * @param dist desired distance of @a cp to us on the path
+ * @param callback Function to call for every path.
+ * @param callback_cls Closure for @a callback.
+ * @return Number of iterated paths.
*/
-void
-GCP_set_tunnel (struct CadetPeer *peer, struct CadetTunnel *t)
+unsigned int
+GCP_iterate_paths_at (struct CadetPeer *cp,
+ unsigned int dist,
+ GCP_PathIterator callback,
+ void *callback_cls)
{
- peer->tunnel = t;
- if (NULL == t && NULL != peer->search_h)
+ unsigned int ret = 0;
+
+ if (dist >= cp->path_dll_length)
+ {
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Asked to look for paths at distance %u, but maximum for me is < %u\n",
+ dist,
+ cp->path_dll_length);
+ return 0;
+ }
+ for (struct CadetPeerPathEntry *pe = cp->path_heads[dist];
+ NULL != pe;
+ pe = pe->next)
{
- GCP_stop_search (peer);
+ if (GNUNET_NO ==
+ callback (callback_cls,
+ pe->path,
+ dist))
+ return ret;
+ ret++;
}
+ return ret;
}
/**
* Get the tunnel towards a peer.
*
- * @param peer Peer to get from.
- *
+ * @param cp Peer to get from.
+ * @param create #GNUNET_YES to create a tunnel if we do not have one
* @return Tunnel towards peer.
*/
struct CadetTunnel *
-GCP_get_tunnel (const struct CadetPeer *peer)
+GCP_get_tunnel (struct CadetPeer *cp,
+ int create)
{
- return peer->tunnel;
+ if (NULL == cp)
+ return NULL;
+ if ( (NULL != cp->t) ||
+ (GNUNET_NO == create) )
+ return cp->t;
+ cp->t = GCT_create_tunnel (cp);
+ consider_peer_activate (cp);
+ return cp->t;
}
/**
- * Set the hello message.
+ * Hello offer was passed to the transport service. Mark it
+ * as done.
*
- * @param peer Peer whose message to set.
- * @param hello Hello message.
+ * @param cls the `struct CadetPeer` where the offer completed
*/
-void
-GCP_set_hello (struct CadetPeer *peer, const struct GNUNET_HELLO_Message *hello)
+static void
+hello_offer_done (void *cls)
{
- struct GNUNET_HELLO_Message *old;
- size_t size;
-
- LOG (GNUNET_ERROR_TYPE_DEBUG, "set hello for %s\n", GCP_2s (peer));
- if (NULL == hello)
- return;
+ struct CadetPeer *cp = cls;
- old = GCP_get_hello (peer);
- if (NULL == old)
- {
- size = GNUNET_HELLO_size (hello);
- LOG (GNUNET_ERROR_TYPE_DEBUG, " new (%u bytes)\n", size);
- peer->hello = GNUNET_malloc (size);
- memcpy (peer->hello, hello, size);
- }
- else
- {
- peer->hello = GNUNET_HELLO_merge (old, hello);
- LOG (GNUNET_ERROR_TYPE_DEBUG, " merge into %p (%u bytes)\n",
- peer->hello, GNUNET_HELLO_size (hello));
- GNUNET_free (old);
- }
+ cp->hello_offer = NULL;
}
/**
- * Get the hello message.
- *
- * @param peer Peer whose message to get.
+ * We got a HELLO for a @a peer, remember it, and possibly
+ * trigger adequate actions (like trying to connect).
*
- * @return Hello message.
+ * @param cp the peer we got a HELLO for
+ * @param hello the HELLO to remember
*/
-struct GNUNET_HELLO_Message *
-GCP_get_hello (struct CadetPeer *peer)
+void
+GCP_set_hello (struct CadetPeer *cp,
+ const struct GNUNET_HELLO_Message *hello)
{
- struct GNUNET_TIME_Absolute expiration;
- struct GNUNET_TIME_Relative remaining;
-
- if (NULL == peer->hello)
- return NULL;
+ struct GNUNET_HELLO_Message *mrg;
- expiration = GNUNET_HELLO_get_last_expiration (peer->hello);
- remaining = GNUNET_TIME_absolute_get_remaining (expiration);
- if (0 == remaining.rel_value_us)
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Got %u byte HELLO for peer %s\n",
+ (unsigned int) GNUNET_HELLO_size (hello),
+ GCP_2s (cp));
+ if (NULL != cp->hello_offer)
+ {
+ GNUNET_TRANSPORT_offer_hello_cancel (cp->hello_offer);
+ cp->hello_offer = NULL;
+ }
+ if (NULL != cp->hello)
{
- LOG (GNUNET_ERROR_TYPE_DEBUG, " get - hello expired on %s\n",
- GNUNET_STRINGS_absolute_time_to_string (expiration));
- GNUNET_free (peer->hello);
- peer->hello = NULL;
+ mrg = GNUNET_HELLO_merge (hello,
+ cp->hello);
+ GNUNET_free (cp->hello);
+ cp->hello = mrg;
}
- return peer->hello;
+ else
+ {
+ cp->hello = GNUNET_memdup (hello,
+ GNUNET_HELLO_size (hello));
+ }
+ cp->hello_offer
+ = GNUNET_TRANSPORT_offer_hello (cfg,
+ GNUNET_HELLO_get_header (cp->hello) ,
+ &hello_offer_done,
+ cp);
+ /* New HELLO means cp's destruction time may change... */
+ consider_peer_destroy (cp);
}
/**
- * Try to connect to a peer on TRANSPORT level.
+ * The tunnel to the given peer no longer exists, remove it from our
+ * data structures, and possibly clean up the peer itself.
*
- * @param peer Peer to whom to connect.
+ * @param cp the peer affected
+ * @param t the dead tunnel
*/
void
-GCP_try_connect (struct CadetPeer *peer)
+GCP_drop_tunnel (struct CadetPeer *cp,
+ struct CadetTunnel *t)
{
- struct GNUNET_HELLO_Message *hello;
- struct GNUNET_MessageHeader *mh;
-
- if (NULL == transport_handle)
- return;
-
- hello = GCP_get_hello (peer);
- if (NULL == hello)
- return;
-
- mh = GNUNET_HELLO_get_header (hello);
- GNUNET_TRANSPORT_offer_hello (transport_handle, mh, try_connect, peer);
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Dropping tunnel %s to peer %s\n",
+ GCT_2s (t),
+ GCP_2s (cp));
+ GNUNET_assert (cp->t == t);
+ cp->t = NULL;
+ consider_peer_destroy (cp);
}
/**
- * Notify a peer that a link between two other peers is broken. If any path
- * used that link, eliminate it.
+ * Test if @a cp has a core-level connection
*
- * @param peer Peer affected by the change.
- * @param peer1 Peer whose link is broken.
- * @param peer2 Peer whose link is broken.
+ * @param cp peer to test
+ * @return #GNUNET_YES if @a cp has a core-level connection
*/
-void
-GCP_notify_broken_link (struct CadetPeer *peer,
- struct GNUNET_PeerIdentity *peer1,
- struct GNUNET_PeerIdentity *peer2)
+int
+GCP_has_core_connection (struct CadetPeer *cp)
{
- struct CadetPeerPath *iter;
- struct CadetPeerPath *next;
- unsigned int i;
- GNUNET_PEER_Id p1;
- GNUNET_PEER_Id p2;
-
- p1 = GNUNET_PEER_search (peer1);
- p2 = GNUNET_PEER_search (peer2);
-
- LOG (GNUNET_ERROR_TYPE_DEBUG, "Link %u-%u broken\n", p1, p2);
- if (0 == p1 || 0 == p2)
- {
- /* We don't even know them */
- return;
- }
-
- for (iter = peer->path_head; NULL != iter; iter = next)
- {
- next = iter->next;
- for (i = 0; i < iter->length - 1; i++)
- {
- if ((iter->peers[i] == p1 && iter->peers[i + 1] == p2)
- || (iter->peers[i] == p2 && iter->peers[i + 1] == p1))
- {
- char *s;
-
- s = path_2s (iter);
- LOG (GNUNET_ERROR_TYPE_DEBUG, " - invalidating %s\n", s);
- GNUNET_free (s);
-
- path_invalidate (iter);
- }
- }
- }
+ return (NULL != cp->core_mq) ? GNUNET_YES : GNUNET_NO;
}
/**
- * Count the number of known paths toward the peer.
- *
- * @param peer Peer to get path info.
+ * Start message queue change notifications.
*
- * @return Number of known paths.
+ * @param cp peer to notify for
+ * @param cb function to call if mq becomes available or unavailable
+ * @param cb_cls closure for @a cb
+ * @return handle to cancel request
*/
-unsigned int
-GCP_count_paths (const struct CadetPeer *peer)
+struct GCP_MessageQueueManager *
+GCP_request_mq (struct CadetPeer *cp,
+ GCP_MessageQueueNotificationCallback cb,
+ void *cb_cls)
{
- struct CadetPeerPath *iter;
- unsigned int i;
-
- for (iter = peer->path_head, i = 0; NULL != iter; iter = iter->next)
- i++;
-
- return i;
+ struct GCP_MessageQueueManager *mqm;
+
+ mqm = GNUNET_new (struct GCP_MessageQueueManager);
+ mqm->cb = cb;
+ mqm->cb_cls = cb_cls;
+ mqm->cp = cp;
+ GNUNET_CONTAINER_DLL_insert (cp->mqm_head,
+ cp->mqm_tail,
+ mqm);
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Creating MQM %p for peer %s\n",
+ mqm,
+ GCP_2s (cp));
+ if (NULL != cp->core_mq)
+ cb (cb_cls,
+ GNUNET_YES);
+ return mqm;
}
/**
- * Iterate all known peers.
+ * Stops message queue change notifications.
*
- * @param iter Iterator.
- * @param cls Closure for @c iter.
+ * @param mqm handle matching request to cancel
+ * @param last_env final message to transmit, or NULL
*/
void
-GCP_iterate_all (GNUNET_CONTAINER_PeerMapIterator iter, void *cls)
+GCP_request_mq_cancel (struct GCP_MessageQueueManager *mqm,
+ struct GNUNET_MQ_Envelope *last_env)
{
- GNUNET_CONTAINER_multipeermap_iterate (peers, iter, cls);
+ struct CadetPeer *cp = mqm->cp;
+
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Destroying MQM %p for peer %s%s\n",
+ mqm,
+ GCP_2s (cp),
+ (NULL == last_env) ? "" : " with last ditch transmission");
+ if (NULL != mqm->env)
+ GNUNET_MQ_discard (mqm->env);
+ if (NULL != last_env)
+ {
+ if (NULL != cp->core_mq)
+ {
+ GNUNET_MQ_notify_sent (last_env,
+ &mqm_send_done,
+ cp);
+ GNUNET_MQ_send (cp->core_mq,
+ last_env);
+ }
+ else
+ {
+ GNUNET_MQ_discard (last_env);
+ }
+ }
+ if (cp->mqm_ready_ptr == mqm)
+ cp->mqm_ready_ptr = mqm->next;
+ GNUNET_CONTAINER_DLL_remove (cp->mqm_head,
+ cp->mqm_tail,
+ mqm);
+ GNUNET_free (mqm);
}
/**
- * Get the static string for a peer ID.
+ * Send the message in @a env to @a cp, overriding queueing logic.
+ * This function should only be used to send error messages outside
+ * of flow and congestion control, similar to ICMP. Note that
+ * the envelope may be silently discarded as well.
*
- * @param peer Peer.
- *
- * @return Static string for it's ID.
+ * @param cp peer to send the message to
+ * @param env envelope with the message to send
*/
-const char *
-GCP_2s (const struct CadetPeer *peer)
+void
+GCP_send_ooo (struct CadetPeer *cp,
+ struct GNUNET_MQ_Envelope *env)
{
- if (NULL == peer)
- return "(NULL)";
- return GNUNET_i2s (GNUNET_PEER_resolve2 (peer->id));
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Sending message to %s out of management\n",
+ GCP_2s (cp));
+ if (NULL == cp->core_mq)
+ {
+ GNUNET_MQ_discard (env);
+ return;
+ }
+ GNUNET_MQ_notify_sent (env,
+ &mqm_send_done,
+ cp);
+ GNUNET_MQ_send (cp->core_mq,
+ env);
}
+
+
+
+
+/* end of gnunet-service-cadet-new_peer.c */