- FIX: GNUNET_SET_STATUS_HALF_DONE is never called only GNUNET_SET_STATUS_DONE
[oweals/gnunet.git] / src / dv / gnunet-service-dv.c
index 00de78aae345c023f31e726e345fca2e61664324..84bdc11c23020c69c34710f043ee0140921206ff 100644 (file)
@@ -1,10 +1,10 @@
 /*
      This file is part of GNUnet.
 /*
      This file is part of GNUnet.
-     (C) 2009 Christian Grothoff (and other contributing authors)
+     (C) 2013 Christian Grothoff (and other contributing authors)
 
      GNUnet is free software; you can redistribute it and/or modify
      it under the terms of the GNU General Public License as published
 
      GNUnet is free software; you can redistribute it and/or modify
      it under the terms of the GNU General Public License as published
-     by the Free Software Foundation; either version 2, or (at your
+     by the Free Software Foundation; either version 3, or (at your
      option) any later version.
 
      GNUnet is distributed in the hope that it will be useful, but
      option) any later version.
 
      GNUnet is distributed in the hope that it will be useful, but
  *
  * @author Christian Grothoff
  * @author Nathan Evans
  *
  * @author Christian Grothoff
  * @author Nathan Evans
- *
- * TODO: The gossip rates need to be worked out.  Probably many other things
- * as well.
- *
  */
 #include "platform.h"
  */
 #include "platform.h"
-#include "gnunet_client_lib.h"
-#include "gnunet_getopt_lib.h"
-#include "gnunet_os_lib.h"
+#include "gnunet_util_lib.h"
 #include "gnunet_protocols.h"
 #include "gnunet_protocols.h"
-#include "gnunet_service_lib.h"
 #include "gnunet_core_service.h"
 #include "gnunet_core_service.h"
-#include "gnunet_signal_lib.h"
-#include "gnunet_util_lib.h"
 #include "gnunet_hello_lib.h"
 #include "gnunet_peerinfo_service.h"
 #include "gnunet_hello_lib.h"
 #include "gnunet_peerinfo_service.h"
-#include "gnunet_crypto_lib.h"
+#include "gnunet_statistics_service.h"
+#include "gnunet_set_service.h"
+#include "gnunet_ats_service.h"
 #include "dv.h"
 #include "dv.h"
+#include <gcrypt.h>
 
 
-/**
- * For testing mostly, remember only the
- * shortest path to a distant neighbor.
- */
-#define AT_MOST_ONE GNUNET_NO
-
-#define USE_PEER_ID GNUNET_YES
-
-/**
- * How often do we check about sending out more peer information (if
- * we are connected to no peers previously).
- */
-#define GNUNET_DV_DEFAULT_SEND_INTERVAL GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MILLISECONDS, 500000)
-
-/**
- * How long do we wait at most between sending out information?
- */
-#define GNUNET_DV_MAX_SEND_INTERVAL GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MILLISECONDS, 500000)
-
-/**
- * How long can we have not heard from a peer and
- * still have it in our tables?
- */
-#define GNUNET_DV_PEER_EXPIRATION_TIME GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 1000))
 
 /**
 
 /**
- * Priority for gossip.
+ * How often do we establish the consensu?
  */
  */
-#define GNUNET_DV_DHT_GOSSIP_PRIORITY (GNUNET_EXTREME_PRIORITY / 10)
+#define GNUNET_DV_CONSENSUS_FREQUENCY GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MINUTES, 5)
 
 /**
 
 /**
- * How often should we check if expiration time has elapsed for
- * some peer?
+ * Maximum number of messages we queue per peer.
  */
  */
-#define GNUNET_DV_MAINTAIN_FREQUENCY GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 5))
+#define MAX_QUEUE_SIZE 16
 
 /**
 
 /**
- * How long to allow a message to be delayed?
+ * Maximum number of messages we queue towards the clients/plugin.
  */
  */
-#define DV_DELAY GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 5))
+#define MAX_QUEUE_SIZE_PLUGIN 1024
 
 /**
 
 /**
- * Priority to use for DV data messages.
+ * The default fisheye depth, from how many hops away will
+ * we keep peers?
  */
  */
-#define DV_PRIORITY 0
+#define DEFAULT_FISHEYE_DEPTH 3
 
 /**
 
 /**
- * The cost to a direct neighbor.  We used to use 0, but 1 makes more sense.
+ * How many hops is a direct neighbor away?
  */
 #define DIRECT_NEIGHBOR_COST 1
 
  */
 #define DIRECT_NEIGHBOR_COST 1
 
-/**
- * The default number of direct connections to store in DV (max)
- */
-#define DEFAULT_DIRECT_CONNECTIONS 50
-
-/**
- * The default size of direct + extended peers in DV (max)
- */
-#define DEFAULT_DV_SIZE 100
 
 
-/**
- * The default fisheye depth, from how many hops away will
- * we keep peers?
- */
-#define DEFAULT_FISHEYE_DEPTH 4
+GNUNET_NETWORK_STRUCT_BEGIN
 
 /**
 
 /**
- * Linked list of messages to send to clients.
+ * Information about a peer DV can route to.  These entries are what
+ * we use as the binary format to establish consensus to create our
+ * routing table and as the address format in the HELLOs.
  */
  */
-struct PendingMessage
+struct Target
 {
 {
-  /**
-   * Pointer to next item in the list
-   */
-  struct PendingMessage *next;
-
-  /**
-   * Pointer to previous item in the list
-   */
-  struct PendingMessage *prev;
-
-  /**
-   * The PeerIdentity to send to
-   */
-  struct GNUNET_PeerIdentity recipient;
-
-  /**
-   * The result of message sending.
-   */
-  struct GNUNET_DV_SendResultMessage *send_result;
-
-  /**
-   * Message importance level.
-   */
-  unsigned int importance;
-
-  /**
-   * Size of message.
-   */
-  unsigned int msg_size;
 
   /**
 
   /**
-   * How long to wait before sending message.
+   * Identity of the peer we can reach.
    */
    */
-  struct GNUNET_TIME_Relative timeout;
+  struct GNUNET_PeerIdentity peer;
 
   /**
 
   /**
-   * Actual message to be sent; // avoid allocation
+   * How many hops (1-3) is this peer away? in network byte order
    */
    */
-  const struct GNUNET_MessageHeader *msg; // msg = (cast) &pm[1]; // memcpy (&pm[1], data, len);
+  uint32_t distance GNUNET_PACKED;
 
 };
 
 
 };
 
-struct FastGossipNeighborList
-{
-  /**
-   * Next element of DLL
-   */
-  struct FastGossipNeighborList *next;
-
-  /**
-   * Prev element of DLL
-   */
-  struct FastGossipNeighborList *prev;
-
-  /**
-   * The neighbor to gossip about
-   */
-  struct DistantNeighbor *about;
-};
 
 /**
 
 /**
- * Context created whenever a direct peer connects to us,
- * used to gossip other peers to it.
+ * Message exchanged between DV services (via core), requesting a
+ * message to be routed.
  */
  */
-struct NeighborSendContext
+struct RouteMessage
 {
   /**
 {
   /**
-   * The peer we will gossip to.
+   * Type: GNUNET_MESSAGE_TYPE_DV_ROUTE
    */
    */
-  struct DirectNeighbor *toNeighbor;
+  struct GNUNET_MessageHeader header;
 
   /**
 
   /**
-   * The task associated with this context.
+   * Expected (remaining) distance.  Must be always smaller than
+   * DEFAULT_FISHEYE_DEPTH, should be zero at the target.  Must
+   * be decremented by one at each hop.  Peers must not forward
+   * these messages further once the counter has reached zero.
    */
    */
-  GNUNET_SCHEDULER_TaskIdentifier task;
+  uint32_t distance GNUNET_PACKED;
 
   /**
 
   /**
-   * Head of DLL of peers to gossip about
-   * as fast as possible to this peer, for initial
-   * set up.
+   * The (actual) target of the message (this peer, if distance is zero).
    */
    */
-  struct FastGossipNeighborList *fast_gossip_list_head;
+  struct GNUNET_PeerIdentity target;
 
   /**
 
   /**
-   * Tail of DLL of peers to gossip about
-   * as fast as possible to this peer, for initial
-   * set up.
+   * The (actual) sender of the message.
    */
    */
-  struct FastGossipNeighborList *fast_gossip_list_tail;
+  struct GNUNET_PeerIdentity sender;
 
 };
 
 
 };
 
+GNUNET_NETWORK_STRUCT_END
+
 
 /**
 
 /**
- * Struct to hold information for updating existing neighbors
+ * Linked list of messages to send to clients.
  */
  */
-struct NeighborUpdateInfo
+struct PendingMessage
 {
   /**
 {
   /**
-   * Cost
+   * Pointer to next item in the list
    */
    */
-  unsigned int cost;
+  struct PendingMessage *next;
 
   /**
 
   /**
-   * The existing neighbor
+   * Pointer to previous item in the list
    */
    */
-  struct DistantNeighbor *neighbor;
+  struct PendingMessage *prev;
 
   /**
 
   /**
-   * The referrer of the possibly existing peer
+   * Actual message to be sent, allocated after this struct.
    */
    */
-  struct DirectNeighbor *referrer;
+  const struct GNUNET_MessageHeader *msg;
 
   /**
 
   /**
-   * The time we heard about this peer
+   * Ultimate target for the message.
    */
    */
-  struct GNUNET_TIME_Absolute now;
+  struct GNUNET_PeerIdentity ultimate_target;
 
   /**
 
   /**
-   * Peer id this peer uses to refer to neighbor.
+   * Unique ID of the message.
    */
    */
-  unsigned int referrer_peer_id;
+  uint32_t uid;
 
 };
 
 
 };
 
+
 /**
 /**
- * Struct where actual neighbor information is stored,
- * referenced by min_heap and max_heap.  Freeing dealt
- * with when items removed from hashmap.
+ * Information about a direct neighbor (core-level, excluding
+ * DV-links, only DV-enabled peers).
  */
 struct DirectNeighbor
 {
  */
 struct DirectNeighbor
 {
-  /**
-   * Identity of neighbor.
-   */
-  struct GNUNET_PeerIdentity identity;
 
   /**
 
   /**
-   * PublicKey of neighbor.
+   * Identity of the peer.
    */
    */
-  struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded pkey;
+  struct GNUNET_PeerIdentity peer;
 
   /**
 
   /**
-   * Head of DLL of nodes that this direct neighbor referred to us.
+   * Session ID we use whenever we create a set union with
+   * this neighbor; constructed from the XOR of our peer
+   * IDs and then salted with "DV-SALT" to avoid conflicts
+   * with other applications.
    */
    */
-  struct DistantNeighbor *referee_head;
+  struct GNUNET_HashCode real_session_id;
 
   /**
 
   /**
-   * Tail of DLL of nodes that this direct neighbor referred to us.
+   * Head of linked list of messages to send to this peer.
    */
    */
-  struct DistantNeighbor *referee_tail;
-
-  /**
-   * The sending context for gossiping peers to this neighbor.
-   */
-  struct NeighborSendContext *send_context;
-
-  /**
-   * Is this one of the direct neighbors that we are "hiding"
-   * from DV?
-   */
-  int hidden;
-};
-
+  struct PendingMessage *pm_head;
 
 
-/**
- * Struct where actual neighbor information is stored,
- * referenced by min_heap and max_heap.  Freeing dealt
- * with when items removed from hashmap.
- */
-struct DistantNeighbor
-{
   /**
   /**
-   * We keep distant neighbor's of the same referrer in a DLL.
+   * Tail of linked list of messages to send to this peer.
    */
    */
-  struct DistantNeighbor *next;
+  struct PendingMessage *pm_tail;
 
   /**
 
   /**
-   * We keep distant neighbor's of the same referrer in a DLL.
+   * Transmit handle to core service.
    */
    */
-  struct DistantNeighbor *prev;
+  struct GNUNET_CORE_TransmitHandle *cth;
 
   /**
 
   /**
-   * Node in min heap
+   * Routing table of the neighbor, NULL if not yet established.
+   * Keys are peer identities, values are 'struct Target' entries.
+   * Note that the distances in the targets are from the point-of-view
+   * of the peer, not from us!
    */
    */
-  struct GNUNET_CONTAINER_HeapNode *min_loc;
+  struct GNUNET_CONTAINER_MultiPeerMap *neighbor_table;
 
   /**
 
   /**
-   * Node in max heap
+   * Updated routing table of the neighbor, under construction,
+   * NULL if we are not currently building it.
+   * Keys are peer identities, values are 'struct Target' entries.
+   * Note that the distances in the targets are from the point-of-view
+   * of the peer, not from us!
    */
    */
-  struct GNUNET_CONTAINER_HeapNode *max_loc;
+  struct GNUNET_CONTAINER_MultiPeerMap *neighbor_table_consensus;
 
   /**
 
   /**
-   * Identity of referrer (next hop towards 'neighbor').
+   * Our current (exposed) routing table as a set.
    */
    */
-  struct DirectNeighbor *referrer;
+  struct GNUNET_SET_Handle *my_set;
 
   /**
 
   /**
-   * Identity of neighbor.
+   * Handle for our current active set union operation.
    */
    */
-  struct GNUNET_PeerIdentity identity;
+  struct GNUNET_SET_OperationHandle *set_op;
 
   /**
 
   /**
-   * PublicKey of neighbor.
+   * Handle used if we are listening for this peer, waiting for the
+   * other peer to initiate construction of the set union.  NULL if
+   * we ar the initiating peer.
    */
    */
-  struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded *pkey;
+  struct GNUNET_SET_ListenHandle *listen_handle;
 
   /**
 
   /**
-   * Last time we received routing information from this peer
+   * ID of the task we use to (periodically) update our consensus
+   * with this peer.  Used if we are the initiating peer.
    */
    */
-  struct GNUNET_TIME_Absolute last_activity;
+  GNUNET_SCHEDULER_TaskIdentifier initiate_task;
 
   /**
 
   /**
-   * Last time we sent routing information about this peer
+   * At what offset are we, with respect to inserting our own routes
+   * into the consensus?
    */
    */
-  struct GNUNET_TIME_Absolute last_gossip;
+  unsigned int consensus_insertion_offset;
 
   /**
 
   /**
-   * Cost to neighbor, used for actual distance vector computations
+   * At what distance are we, with respect to inserting our own routes
+   * into the consensus?
    */
    */
-  unsigned int cost;
+  unsigned int consensus_insertion_distance;
 
   /**
 
   /**
-   * Random identifier *we* use for this peer, to be used as shortcut
-   * instead of sending full peer id for each message
+   * Number of messages currently in the 'pm_XXXX'-DLL.
    */
    */
-  unsigned int our_id;
+  unsigned int pm_queue_size;
 
   /**
 
   /**
-   * Random identifier the *referrer* uses for this peer.
+   * Elements in consensus
    */
    */
-  unsigned int referrer_id;
+  unsigned int consensus_elements;
 
   /**
 
   /**
-   * Is this one of the direct neighbors that we are "hiding"
-   * from DV?
+   * Direct one hop route
    */
    */
-  int hidden;
-
-};
+  struct Route *direct_route;
 
 
-struct PeerIteratorContext
-{
   /**
   /**
-   * The actual context, to be freed later.
+   * Flag set within 'check_target_removed' to trigger full global route refresh.
    */
    */
-  struct GNUNET_PEERINFO_IteratorContext *ic;
+  int target_removed;
 
   /**
 
   /**
-   * The neighbor about which we are concerned.
+   * Our distance to this peer, 0 for unknown.
    */
    */
-  struct DirectNeighbor *neighbor;
+  uint32_t distance;
 
   /**
 
   /**
-   * The distant neighbor entry for this direct neighbor.
+   * Is this neighbor connected at the core level?
    */
    */
-  struct DistantNeighbor *distant;
+  int connected;
 
 };
 
 
 };
 
+
 /**
 /**
- * Context used for creating hello messages when
- * gossips are received.
+ * A route includes information about the next hop,
+ * the target, and the ultimate distance to the
+ * target.
  */
  */
-struct HelloContext
-{
-  /**
-   * Identity of distant neighbor.
-   */
-  struct GNUNET_PeerIdentity distant_peer;
-
-  /**
-   * Identity of direct neighbor, via which we send this message.
-   */
-  const struct GNUNET_PeerIdentity *direct_peer;
-
-  /**
-   * How many addresses do we need to add (always starts at 1, then set to 0)
-   */
-  int addresses_to_add;
-
-};
-
-struct DV_SendContext
+struct Route
 {
 {
-  /**
-   * The distant peer (should always match)
-   */
-  struct GNUNET_PeerIdentity *distant_peer;
-
-  /**
-   * The direct peer, we need to verify the referrer of.
-   */
-  struct GNUNET_PeerIdentity *direct_peer;
-
-  /**
-   * The message to be sent
-   */
-  struct GNUNET_MessageHeader *message;
 
   /**
 
   /**
-   * The pre-built send result message.  Simply needs to be queued
-   * and freed once send has been called!
+   * Which peer do we need to forward the message to?
    */
    */
-  struct GNUNET_DV_SendResultMessage *send_result;
+  struct DirectNeighbor *next_hop;
 
   /**
 
   /**
-   * The size of the message being sent, may be larger
-   * than message->header.size because it's multiple
-   * messages packed into one!
+   * What would be the target, and how far is it away?
    */
    */
-  size_t message_size;
+  struct Target target;
 
   /**
 
   /**
-   * How important is this message?
+   * Offset of this target in the respective consensus set.
    */
    */
-  unsigned int importance;
+  unsigned int set_offset;
 
 
-  /**
-   * Timeout for this message
-   */
-  struct GNUNET_TIME_Relative timeout;
-
-  /**
-   * Unique ID for DV message
-   */
-  unsigned int uid;
 };
 
 };
 
-struct FindDestinationContext
-{
-  unsigned int tid;
-  struct DistantNeighbor *dest;
-};
 
 
-struct FindIDContext
+/**
+ * Set of targets we bring to a consensus; all targets in a set have a
+ * distance equal to the sets distance (which is implied by the array
+ * index of the set).
+ */
+struct ConsensusSet
 {
 {
-  unsigned int tid;
-  struct GNUNET_PeerIdentity *dest;
-  const struct GNUNET_PeerIdentity *via;
-};
 
 
-struct DisconnectContext
-{
   /**
   /**
-   * Distant neighbor to get pid from.
+   * Array of targets in the set, may include NULL entries if a
+   * neighbor has disconnected; the targets are allocated with the
+   * respective container (all_routes), not here.
    */
    */
-  struct DistantNeighbor *distant;
+  struct Route **targets;
 
   /**
 
   /**
-   * Direct neighbor that disconnected.
+   * Size of the 'targets' array.
    */
    */
-  struct DirectNeighbor *direct;
-};
+  unsigned int array_length;
 
 
-struct TokenizedMessageContext
-{
-  /**
-   * Immediate sender of this message
-   */
-  const struct GNUNET_PeerIdentity *peer;
+};
 
 
-  /**
-   * Distant sender of the message
-   */
-  struct DistantNeighbor *distant;
 
 
-  /**
-   * Uid for this set of messages
-   */
-  uint32_t uid;
-};
+/**
+ * Peermap of all of our neighbors; processing these usually requires
+ * first checking to see if the peer is core-connected and if the
+ * distance is 1, in which case they are direct neighbors.
+ */
+static struct GNUNET_CONTAINER_MultiPeerMap *direct_neighbors;
 
 /**
 
 /**
- * Context for finding the least cost peer to send to.
- * Transport selection can only go so far.
+ * Hashmap with all routes that we currently support; contains
+ * routing information for all peers from distance 2
+ * up to distance DEFAULT_FISHEYE_DEPTH.
  */
  */
-struct FindLeastCostContext
-{
-  struct DistantNeighbor *target;
-  unsigned int least_cost;
-};
+static struct GNUNET_CONTAINER_MultiPeerMap *all_routes;
 
 /**
 
 /**
- * Handle to the core service api.
+ * Array of consensus sets we expose to the outside world.  Sets
+ * are structured by the distance to the target.
  */
  */
-static struct GNUNET_CORE_Handle *coreAPI;
+static struct ConsensusSet consensi[DEFAULT_FISHEYE_DEPTH - 1];
 
 /**
 
 /**
- * Stream tokenizer to handle messages coming in from core.
+ * Handle to the core service api.
  */
  */
-static struct GNUNET_SERVER_MessageStreamTokenizer *coreMST;
+static struct GNUNET_CORE_Handle *core_api;
 
 /**
  * The identity of our peer.
 
 /**
  * The identity of our peer.
@@ -521,2373 +362,1572 @@ static struct GNUNET_PeerIdentity my_identity;
 static const struct GNUNET_CONFIGURATION_Handle *cfg;
 
 /**
 static const struct GNUNET_CONFIGURATION_Handle *cfg;
 
 /**
- * The scheduler for this service.
+ * The client, the DV plugin connected to us (or an event monitor).
+ * Hopefully this client will never change, although if the plugin
+ * dies and returns for some reason it may happen.
  */
  */
-static struct GNUNET_SCHEDULER_Handle *sched;
+static struct GNUNET_SERVER_NotificationContext *nc;
 
 /**
 
 /**
- * The client, the DV plugin connected to us.  Hopefully
- * this client will never change, although if the plugin dies
- * and returns for some reason it may happen.
+ * Handle for the statistics service.
  */
  */
-static struct GNUNET_SERVER_Client * client_handle;
+static struct GNUNET_STATISTICS_Handle *stats;
 
 /**
 
 /**
- * Task to run when we shut down, cleaning up all our trash
+ * Handle to ATS service.
  */
  */
-static GNUNET_SCHEDULER_TaskIdentifier cleanup_task;
+static struct GNUNET_ATS_PerformanceHandle *ats;
 
 
-static size_t default_dv_priority = 0;
-
-static char *my_short_id;
-
-/**
- * Transmit handle to the plugin.
- */
-static struct GNUNET_CONNECTION_TransmitHandle * plugin_transmit_handle;
 
 /**
 
 /**
- * Head of DLL for client messages
+ * Start creating a new DV set union by initiating the connection.
+ *
+ * @param cls the 'struct DirectNeighbor' of the peer we're building
+ *        a routing consensus with
+ * @param tc scheduler context
  */
  */
-static struct PendingMessage *plugin_pending_head;
+static void
+initiate_set_union (void *cls,
+                   const struct GNUNET_SCHEDULER_TaskContext *tc);
 
 
-/**
- * Tail of DLL for client messages
- */
-static struct PendingMessage *plugin_pending_tail;
 
 /**
 
 /**
- * Handle to the peerinfo service
+ * Start creating a new DV set union construction, our neighbour has
+ * asked for it (callback for listening peer).
+ *
+ * @param cls the 'struct DirectNeighbor' of the peer we're building
+ *        a routing consensus with
+ * @param other_peer the other peer
+ * @param context_msg message with application specific information from
+ *        the other peer
+ * @param request request from the other peer, use GNUNET_SET_accept
+ *        to accept it, otherwise the request will be refused
+ *        Note that we don't use a return value here, as it is also
+ *        necessary to specify the set we want to do the operation with,
+ *        whith sometimes can be derived from the context message.
+ *        Also necessary to specify the timeout.
  */
  */
-static struct GNUNET_PEERINFO_Handle *peerinfo_handle;
+static void
+listen_set_union (void *cls,
+                 const struct GNUNET_PeerIdentity *other_peer,
+                 const struct GNUNET_MessageHeader *context_msg,
+                 struct GNUNET_SET_Request *request);
 
 
-/**
- * Transmit handle to core service.
- */
-static struct GNUNET_CORE_TransmitHandle * core_transmit_handle;
 
 /**
 
 /**
- * Head of DLL for core messages
+ * Forward a message from another peer to the plugin.
+ *
+ * @param message the message to send to the plugin
+ * @param origin the original sender of the message
+ * @param distance distance to the original sender of the message
  */
  */
-static struct PendingMessage *core_pending_head;
+static void
+send_data_to_plugin (const struct GNUNET_MessageHeader *message,
+                    const struct GNUNET_PeerIdentity *origin,
+                    uint32_t distance)
+{
+  struct GNUNET_DV_ReceivedMessage *received_msg;
+  size_t size;
 
 
-/**
- * Tail of DLL for core messages
- */
-static struct PendingMessage *core_pending_tail;
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Delivering message from peer `%s'\n",
+              GNUNET_i2s (origin));
+  size = sizeof (struct GNUNET_DV_ReceivedMessage) +
+    ntohs (message->size);
+  if (size >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
+  {
+    GNUNET_break (0); /* too big */
+    return;
+  }
+  received_msg = GNUNET_malloc (size);
+  received_msg->header.size = htons (size);
+  received_msg->header.type = htons (GNUNET_MESSAGE_TYPE_DV_RECV);
+  received_msg->distance = htonl (distance);
+  received_msg->sender = *origin;
+  memcpy (&received_msg[1], message, ntohs (message->size));
+  GNUNET_SERVER_notification_context_broadcast (nc,
+                                               &received_msg->header,
+                                               GNUNET_YES);
+  GNUNET_free (received_msg);
+}
 
 
-/**
- * Map of PeerIdentifiers to 'struct GNUNET_dv_neighbor*'s for all
- * directly connected peers.
- */
-static struct GNUNET_CONTAINER_MultiHashMap *direct_neighbors;
 
 /**
 
 /**
- * Map of PeerIdentifiers to 'struct GNUNET_dv_neighbor*'s for
- * peers connected via DV (extended neighborhood).  Does ALSO
- * include any peers that are in 'direct_neighbors'; for those
- * peers, the cost will be zero and the referrer all zeros.
+ * Forward a control message to the plugin.
+ *
+ * @param message the message to send to the plugin
  */
  */
-static struct GNUNET_CONTAINER_MultiHashMap *extended_neighbors;
+static void
+send_control_to_plugin (const struct GNUNET_MessageHeader *message)
+{
+  GNUNET_SERVER_notification_context_broadcast (nc,
+                                               message,
+                                               GNUNET_NO);
+}
 
 
-/**
- * We use the min heap (min refers to cost) to prefer
- * gossipping about peers with small costs.
- */
-static struct GNUNET_CONTAINER_Heap *neighbor_min_heap;
 
 /**
 
 /**
- * We use the max heap (max refers to cost) for general
- * iterations over all peers and to remove the most costly
- * connection if we have too many.
+ * Give an (N)ACK message to the plugin, we transmitted a message for it.
+ *
+ * @param target peer that received the message
+ * @param uid plugin-chosen UID for the message
+ * @param nack GNUNET_NO to send ACK, GNUNET_YES to send NACK
  */
  */
-static struct GNUNET_CONTAINER_Heap *neighbor_max_heap;
+static void
+send_ack_to_plugin (const struct GNUNET_PeerIdentity *target,
+                   uint32_t uid,
+                   int nack)
+{
+  struct GNUNET_DV_AckMessage ack_msg;
 
 
-/**
- * How far out to keep peers we learn about.
- */
-static unsigned long long fisheye_depth;
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Delivering ACK for message to peer `%s'\n",
+              GNUNET_i2s (target));
+  ack_msg.header.size = htons (sizeof (ack_msg));
+  ack_msg.header.type = htons ((GNUNET_YES == nack)
+                              ? GNUNET_MESSAGE_TYPE_DV_SEND_NACK
+                              : GNUNET_MESSAGE_TYPE_DV_SEND_ACK);
+  ack_msg.uid = htonl (uid);
+  ack_msg.target = *target;
+  send_control_to_plugin (&ack_msg.header);
+}
 
 
-/**
- * How many peers to store at most.
- */
-static unsigned long long max_table_size;
 
 /**
 
 /**
- * We've been given a target ID based on the random numbers that
- * we assigned to our DV-neighborhood.  Find the entry for the
- * respective neighbor.
+ * Send a DISTANCE_CHANGED message to the plugin.
+ *
+ * @param peer peer with a changed distance
+ * @param distance new distance to the peer
  */
  */
-static int
-find_destination (void *cls,
-                  struct GNUNET_CONTAINER_HeapNode *node,
-                  void *element, GNUNET_CONTAINER_HeapCostType cost)
+static void
+send_distance_change_to_plugin (const struct GNUNET_PeerIdentity *peer,
+                               uint32_t distance)
 {
 {
-  struct FindDestinationContext *fdc = cls;
-  struct DistantNeighbor *dn = element;
+  struct GNUNET_DV_DistanceUpdateMessage du_msg;
 
 
-  if (fdc->tid != dn->our_id)
-    return GNUNET_YES;
-  fdc->dest = dn;
-  return GNUNET_NO;
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Delivering DISTANCE_CHANGED for message about peer `%s'\n",
+              GNUNET_i2s (peer));
+  du_msg.header.size = htons (sizeof (du_msg));
+  du_msg.header.type = htons (GNUNET_MESSAGE_TYPE_DV_DISTANCE_CHANGED);
+  du_msg.distance = htonl (distance);
+  du_msg.peer = *peer;
+  send_control_to_plugin (&du_msg.header);
 }
 
 
 /**
 }
 
 
 /**
- * We've been given a target ID based on the random numbers that
- * we assigned to our DV-neighborhood.  Find the entry for the
- * respective neighbor.
+ * Give a CONNECT message to the plugin.
+ *
+ * @param target peer that connected
+ * @param distance distance to the target
  */
  */
-static int
-find_specific_id (void *cls,
-                  const GNUNET_HashCode *key,
-                  void *value)
+static void
+send_connect_to_plugin (const struct GNUNET_PeerIdentity *target,
+                       uint32_t distance)
 {
 {
-  struct FindIDContext *fdc = cls;
-  struct DistantNeighbor *dn = value;
+  struct GNUNET_DV_ConnectMessage cm;
 
 
-  if (memcmp(&dn->referrer->identity, fdc->via, sizeof(struct GNUNET_PeerIdentity)) == 0)
-    {
-      fdc->tid = dn->referrer_id;
-      return GNUNET_NO;
-    }
-  return GNUNET_YES;
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Delivering CONNECT about peer `%s'\n",
+              GNUNET_i2s (target));
+  cm.header.size = htons (sizeof (cm));
+  cm.header.type = htons (GNUNET_MESSAGE_TYPE_DV_CONNECT);
+  cm.distance = htonl (distance);
+  cm.peer = *target;
+  send_control_to_plugin (&cm.header);
 }
 
 }
 
+
 /**
 /**
- * Find a distant peer whose referrer_id matches what we're
- * looking for.  For looking up a peer we've gossipped about
- * but is now disconnected.  Need to do this because we don't
- * want to remove those that may be accessible via a different
- * route.
+ * Give a DISCONNECT message to the plugin.
+ *
+ * @param target peer that disconnected
  */
  */
-static int find_distant_peer (void *cls,
-                              const GNUNET_HashCode * key,
-                              void *value)
+static void
+send_disconnect_to_plugin (const struct GNUNET_PeerIdentity *target)
 {
 {
-  struct FindDestinationContext *fdc = cls;
-  struct DistantNeighbor *distant = value;
+  struct GNUNET_DV_DisconnectMessage dm;
 
 
-  if (fdc->tid == distant->referrer_id)
-    {
-      fdc->dest = distant;
-      return GNUNET_NO;
-    }
-  return GNUNET_YES;
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Delivering DISCONNECT about peer `%s'\n",
+              GNUNET_i2s (target));
+  dm.header.size = htons (sizeof (dm));
+  dm.header.type = htons (GNUNET_MESSAGE_TYPE_DV_DISCONNECT);
+  dm.reserved = htonl (0);
+  dm.peer = *target;
+  send_control_to_plugin (&dm.header);
 }
 
 }
 
+
 /**
 /**
- * Function called to notify a client about the socket
- * begin ready to queue more data.  "buf" will be
- * NULL and "size" zero if the socket was closed for
- * writing in the meantime.
+ * Function called to transfer a message to another peer
+ * via core.
  *
  *
- * @param cls closure
+ * @param cls closure with the direct neighbor
  * @param size number of bytes available in buf
  * @param buf where the callee should write the message
  * @return number of bytes written to buf
  */
  * @param size number of bytes available in buf
  * @param buf where the callee should write the message
  * @return number of bytes written to buf
  */
-size_t transmit_to_plugin (void *cls,
-                           size_t size, void *buf)
+static size_t
+core_transmit_notify (void *cls, size_t size, void *buf)
 {
 {
+  struct DirectNeighbor *dn = cls;
   char *cbuf = buf;
   char *cbuf = buf;
-  struct PendingMessage *reply;
+  struct PendingMessage *pending;
   size_t off;
   size_t msize;
 
   size_t off;
   size_t msize;
 
-  if (buf == NULL)
-    {
-      /* client disconnected */
-#if DEBUG_DV_MESSAGES
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%s: %s buffer was NULL (client disconnect?)\n", my_short_id, "transmit_to_plugin");
-#endif
-      return 0;
-    }
-  plugin_transmit_handle = NULL;
+  dn->cth = NULL;
+  if (NULL == buf)
+  {
+    /* peer disconnected */
+    return 0;
+  }
   off = 0;
   off = 0;
-  while ( (NULL != (reply = plugin_pending_head)) &&
-          (size >= off + (msize = ntohs (reply->msg->size))))
-    {
-      GNUNET_CONTAINER_DLL_remove (plugin_pending_head,
-                                   plugin_pending_tail,
-                                   reply);
-      memcpy (&cbuf[off], reply->msg, msize);
-      GNUNET_free (reply);
-      off += msize;
-    }
-
-  if (plugin_pending_head != NULL)
-    plugin_transmit_handle = GNUNET_SERVER_notify_transmit_ready (client_handle,
-                                                                  ntohs(plugin_pending_head->msg->size),
-                                                                  GNUNET_TIME_UNIT_FOREVER_REL,
-                                                                  &transmit_to_plugin, NULL);
-
+  pending = dn->pm_head;
+  off = 0;
+  while ( (NULL != (pending = dn->pm_head)) &&
+         (size >= off + (msize = ntohs (pending->msg->size))))
+  {
+    dn->pm_queue_size--;
+    GNUNET_CONTAINER_DLL_remove (dn->pm_head,
+                                dn->pm_tail,
+                                 pending);
+    memcpy (&cbuf[off], pending->msg, msize);
+    if (0 != pending->uid)
+      send_ack_to_plugin (&pending->ultimate_target,
+                         pending->uid,
+                         GNUNET_NO);
+    GNUNET_free (pending);
+    off += msize;
+  }
+  if (NULL != dn->pm_head)
+    dn->cth =
+      GNUNET_CORE_notify_transmit_ready (core_api,
+                                        GNUNET_YES /* cork */,
+                                        0 /* priority */,
+                                        GNUNET_TIME_UNIT_FOREVER_REL,
+                                        &dn->peer,
+                                        msize,                                 
+                                        &core_transmit_notify, dn);
   return off;
 }
 
   return off;
 }
 
+
 /**
 /**
- * Send a message to the dv plugin.
+ * Forward the given payload to the given target.
  *
  *
- * @param sender the direct sender of the message
- * @param message the message to send to the plugin
- *        (may be an encapsulated type)
- * @param message_size the size of the message to be sent
- * @param distant_neighbor the original sender of the message
- * @param cost the cost to the original sender of the message
+ * @param target where to send the message
+ * @param uid unique ID for the message
+ * @param ultimate_target ultimate recipient for the message
+ * @param distance expected (remaining) distance to the target
+ * @param sender original sender of the message
+ * @param payload payload of the message
  */
  */
-void send_to_plugin(const struct GNUNET_PeerIdentity * sender,
-                    const struct GNUNET_MessageHeader *message,
-                    size_t message_size,
-                    struct GNUNET_PeerIdentity *distant_neighbor,
-                    size_t cost)
+static void
+forward_payload (struct DirectNeighbor *target,
+                uint32_t distance,
+                uint32_t uid,
+                const struct GNUNET_PeerIdentity *sender,
+                const struct GNUNET_PeerIdentity *ultimate_target,
+                const struct GNUNET_MessageHeader *payload)
 {
 {
-  struct GNUNET_DV_MessageReceived *received_msg;
-  struct PendingMessage *pending_message;
-  char *sender_address;
-  size_t sender_address_len;
-  char *packed_msg_start;
-  int size;
-
-#if DEBUG_DV
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "send_to_plugin called with peer %s as sender\n", GNUNET_i2s(distant_neighbor));
-#endif
-
-  if (memcmp(sender, distant_neighbor, sizeof(struct GNUNET_PeerIdentity)) != 0)
+  struct PendingMessage *pm;
+  struct RouteMessage *rm;
+  size_t msize;
+
+  if ( (target->pm_queue_size >= MAX_QUEUE_SIZE) &&
+       (0 != memcmp (sender,
+                    &my_identity,
+                    sizeof (struct GNUNET_PeerIdentity))) )
   {
   {
-    sender_address_len = sizeof(struct GNUNET_PeerIdentity) * 2;
-    sender_address = GNUNET_malloc(sender_address_len);
-    memcpy(sender_address, distant_neighbor, sizeof(struct GNUNET_PeerIdentity));
-    memcpy(&sender_address[sizeof(struct GNUNET_PeerIdentity)], sender, sizeof(struct GNUNET_PeerIdentity));
+    GNUNET_break (0 == uid);
+    return;
   }
   }
-  else
+  msize = sizeof (struct RouteMessage) + ntohs (payload->size);
+  if (msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
   {
   {
-    sender_address_len = sizeof(struct GNUNET_PeerIdentity);
-    sender_address = GNUNET_malloc(sender_address_len);
-    memcpy(sender_address, sender, sizeof(struct GNUNET_PeerIdentity));
+    GNUNET_break (0);
+    return;
   }
   }
-
-  size = sizeof(struct GNUNET_DV_MessageReceived) + sender_address_len + message_size;
-  received_msg = GNUNET_malloc(size);
-  received_msg->header.size = htons(size);
-  received_msg->header.type = htons(GNUNET_MESSAGE_TYPE_TRANSPORT_DV_RECEIVE);
-  received_msg->sender_address_len = htonl(sender_address_len);
-  received_msg->distance = htonl(cost);
-  received_msg->msg_len = htonl(message_size);
-  /* Set the sender in this message to be the original sender! */
-  memcpy(&received_msg->sender, distant_neighbor, sizeof(struct GNUNET_PeerIdentity));
-  /* Copy the intermediate sender to the end of the message, this is how the transport identifies this peer */
-  memcpy(&received_msg[1], sender_address, sender_address_len);
-  GNUNET_free(sender_address);
-  /* Copy the actual message after the sender */
-  packed_msg_start = (char *)&received_msg[1];
-  packed_msg_start = &packed_msg_start[sender_address_len];
-  memcpy(packed_msg_start, message, message_size);
-  pending_message = GNUNET_malloc(sizeof(struct PendingMessage) + size);
-  pending_message->msg = (struct GNUNET_MessageHeader *)&pending_message[1];
-  memcpy(&pending_message[1], received_msg, size);
-  GNUNET_free(received_msg);
-
-  GNUNET_CONTAINER_DLL_insert_after(plugin_pending_head, plugin_pending_tail, plugin_pending_tail, pending_message);
-
-  if (client_handle != NULL)
-    {
-      if (plugin_transmit_handle == NULL)
-        {
-          plugin_transmit_handle = GNUNET_SERVER_notify_transmit_ready (client_handle,
-                                                                        size, GNUNET_TIME_UNIT_FOREVER_REL,
-                                                                        &transmit_to_plugin, NULL);
-        }
-    }
-  else
-    {
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Failed to queue message for plugin, client_handle not yet set (how?)!\n");
-    }
+  pm = GNUNET_malloc (sizeof (struct PendingMessage) + msize);
+  pm->ultimate_target = *ultimate_target;
+  pm->uid = uid;
+  pm->msg = (const struct GNUNET_MessageHeader *) &pm[1];
+  rm = (struct RouteMessage *) &pm[1];
+  rm->header.size = htons ((uint16_t) msize);
+  rm->header.type = htons (GNUNET_MESSAGE_TYPE_DV_ROUTE);
+  rm->distance = htonl (distance);
+  rm->target = target->peer;
+  rm->sender = *sender;
+  memcpy (&rm[1], payload, ntohs (payload->size));
+  GNUNET_CONTAINER_DLL_insert_tail (target->pm_head,
+                                   target->pm_tail,
+                                   pm);
+  target->pm_queue_size++;
+  if (NULL == target->cth)
+    target->cth = GNUNET_CORE_notify_transmit_ready (core_api,
+                                                    GNUNET_YES /* cork */,
+                                                    0 /* priority */,
+                                                    GNUNET_TIME_UNIT_FOREVER_REL,
+                                                    &target->peer,
+                                                    msize,                                     
+                                                    &core_transmit_notify, target);
 }
 
 }
 
-/* Declare here so retry_core_send is aware of it */
-size_t core_transmit_notify (void *cls,
-                             size_t size, void *buf);
 
 /**
 
 /**
- *  Try to send another message from our core sending list
+ * Find a free slot for storing a 'route' in the 'consensi'
+ * set at the given distance.
+ *
+ * @param distance distance to use for the set slot
  */
  */
-static void
-try_core_send (void *cls,
-                 const struct GNUNET_SCHEDULER_TaskContext *tc)
+static unsigned int
+get_consensus_slot (uint32_t distance)
 {
 {
-  struct PendingMessage *pending;
-  pending = core_pending_head;
-
-  if (core_transmit_handle != NULL)
-    return; /* Message send already in progress */
-
-  if (pending != NULL)
-    core_transmit_handle = GNUNET_CORE_notify_transmit_ready(coreAPI, pending->importance, pending->timeout, &pending->recipient, pending->msg_size, &core_transmit_notify, NULL);
+  struct ConsensusSet *cs;
+  unsigned int i;
+
+  cs = &consensi[distance];
+  i = 0;
+  while ( (i < cs->array_length) &&
+         (NULL != cs->targets[i]) ) i++;
+  if (i == cs->array_length)
+  {
+    GNUNET_array_grow (cs->targets,
+                      cs->array_length,
+                      cs->array_length * 2 + 2);
+  }
+  return i;
 }
 
 }
 
+
 /**
 /**
- * Function called to notify a client about the socket
- * being ready to queue more data.  "buf" will be
- * NULL and "size" zero if the socket was closed for
- * writing in the meantime.
+ * Allocate a slot in the consensus set for a route.
  *
  *
- * @param cls closure (NULL)
- * @param size number of bytes available in buf
- * @param buf where the callee should write the message
- * @return number of bytes written to buf
+ * @param route route to initialize
+ * @param distance which consensus set to use
  */
  */
-size_t core_transmit_notify (void *cls,
-                             size_t size, void *buf)
+static void
+allocate_route (struct Route *route,
+               uint32_t distance)
 {
 {
-  char *cbuf = buf;
-  struct PendingMessage *pending;
-  struct PendingMessage *client_reply;
-  size_t off;
-  size_t msize;
-
-  if (buf == NULL)
-    {
-      /* client disconnected */
-#if DEBUG_DV
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "`%s': buffer was NULL\n", "DHT");
-#endif
-      return 0;
-    }
+  unsigned int i;
 
 
-  core_transmit_handle = NULL;
-  off = 0;
-  pending = core_pending_head;
-  if ( (pending != NULL) &&
-          (size >= (msize = ntohs (pending->msg->size))))
-    {
-#if DEBUG_DV
-      GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "`%s' : transmit_notify (core) called with size %d\n", "dv service", msize);
-#endif
-      GNUNET_CONTAINER_DLL_remove (core_pending_head,
-                                   core_pending_tail,
-                                   pending);
-      if (pending->send_result != NULL) /* Will only be non-null if a real client asked for this send */
-        {
-          client_reply = GNUNET_malloc(sizeof(struct PendingMessage) + sizeof(struct GNUNET_DV_SendResultMessage));
-          client_reply->msg = (struct GNUNET_MessageHeader *)&client_reply[1];
-          memcpy(&client_reply[1], pending->send_result, sizeof(struct GNUNET_DV_SendResultMessage));
-          GNUNET_free(pending->send_result);
-
-          GNUNET_CONTAINER_DLL_insert_after(plugin_pending_head, plugin_pending_tail, plugin_pending_tail, client_reply);
-          if (client_handle != NULL)
-            {
-              if (plugin_transmit_handle == NULL)
-                {
-                  plugin_transmit_handle = GNUNET_SERVER_notify_transmit_ready (client_handle,
-                                                                                sizeof(struct GNUNET_DV_SendResultMessage),
-                                                                                GNUNET_TIME_UNIT_FOREVER_REL,
-                                                                                &transmit_to_plugin, NULL);
-                }
-              else
-                {
-                  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Failed to queue message for plugin, must be one in progress already!!\n");
-                }
-            }
-        }
-      memcpy (&cbuf[off], pending->msg, msize);
-      GNUNET_free (pending);
-      off += msize;
-    }
-  /*reply = core_pending_head;*/
+  i = get_consensus_slot (distance);
+  route->set_offset = i;
+  consensi[distance].targets[i] = route;
+  route->target.distance = htonl (distance);
+}
 
 
-  GNUNET_SCHEDULER_add_now(sched, &try_core_send, NULL);
-  /*if (reply != NULL)
-    core_transmit_handle = GNUNET_CORE_notify_transmit_ready(coreAPI, reply->importance, reply->timeout, &reply->recipient, reply->msg_size, &core_transmit_notify, NULL);*/
 
 
-  return off;
+/**
+ * Release a slot in the consensus set for a route.
+ *
+ * @param route route to release the slot from
+ */
+static void
+release_route (struct Route *route)
+{
+  consensi[ntohl (route->target.distance)].targets[route->set_offset] = NULL;
+  route->set_offset = UINT_MAX; /* indicate invalid slot */
 }
 
 
 /**
 }
 
 
 /**
- * Send a DV data message via DV.
+ * Move a route from one consensus set to another.
  *
  *
- * @param sender the original sender of the message
- * @param recipient the next hop recipient, may be our direct peer, maybe not
- * @param send_context the send context
+ * @param route route to move
+ * @param new_distance new distance for the route (destination set)
  */
  */
-static int
-send_message_via (const struct GNUNET_PeerIdentity *sender,
-                  const struct GNUNET_PeerIdentity *recipient,
-                  struct DV_SendContext *send_context)
+static void
+move_route (struct Route *route,
+           uint32_t new_distance)
 {
 {
-  p2p_dv_MESSAGE_Data *toSend;
-  unsigned int msg_size;
-  unsigned int recipient_id;
-  unsigned int sender_id;
-  struct DistantNeighbor *source;
-  struct PendingMessage *pending_message;
-  struct FindIDContext find_context;
-#if DEBUG_DV
-  char shortname[5];
-#endif
-
-  msg_size = send_context->message_size + sizeof (p2p_dv_MESSAGE_Data);
-
-  find_context.dest = send_context->distant_peer;
-  find_context.via = recipient;
-  find_context.tid = 0;
-  GNUNET_CONTAINER_multihashmap_get_multiple (extended_neighbors, &send_context->distant_peer->hashPubKey,
-                                              &find_specific_id, &find_context);
-
-  if (find_context.tid == 0)
-    {
-      GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "%s: find_specific_id failed to find peer!\n", my_short_id);
-      /* target unknown to us, drop! */
-      return GNUNET_SYSERR;
-    }
-  recipient_id = find_context.tid;
-
-  if (0 == (memcmp (&my_identity,
-                        sender, sizeof (struct GNUNET_PeerIdentity))))
-  {
-    sender_id = 0;
-    source = GNUNET_CONTAINER_multihashmap_get (extended_neighbors,
-                                                    &sender->hashPubKey);
-    if (source != NULL)
-      GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "%s: send_message_via found %s, myself in extended peer list???\n", my_short_id, GNUNET_i2s(&source->identity));
-  }
-  else
-  {
-    source = GNUNET_CONTAINER_multihashmap_get (extended_neighbors,
-                                                &sender->hashPubKey);
-    if (source == NULL)
-      {
-              /* sender unknown to us, drop! */
-        return GNUNET_SYSERR;
-      }
-    sender_id = source->our_id;
-  }
-
-  pending_message = GNUNET_malloc(sizeof(struct PendingMessage) + msg_size);
-  pending_message->msg = (struct GNUNET_MessageHeader *)&pending_message[1];
-  pending_message->send_result = send_context->send_result;
-  memcpy(&pending_message->recipient, recipient, sizeof(struct GNUNET_PeerIdentity));
-  pending_message->msg_size = msg_size;
-  pending_message->importance = send_context->importance;
-  pending_message->timeout = send_context->timeout;
-  toSend = (p2p_dv_MESSAGE_Data *)pending_message->msg;
-  toSend->header.size = htons (msg_size);
-  toSend->header.type = htons (GNUNET_MESSAGE_TYPE_DV_DATA);
-  toSend->sender = htonl (sender_id);
-  toSend->recipient = htonl (recipient_id);
-#if DEBUG_DV_MESSAGES
-  toSend->uid = send_context->uid; /* Still sent around in network byte order */
-#else
-  toSend->uid = htonl(0);
-#endif
-
-  memcpy (&toSend[1], send_context->message, send_context->message_size);
-
-#if DEBUG_DV
-  memcpy(&shortname, GNUNET_i2s(send_context->distant_peer), 4);
-  shortname[4] = '\0';
-  GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "%s: Notifying core of send to destination `%s' via `%s' size %u\n", "DV", &shortname, GNUNET_i2s(recipient), msg_size);
-#endif
-
-  GNUNET_CONTAINER_DLL_insert_after (core_pending_head,
-                                     core_pending_tail,
-                                     core_pending_tail,
-                                     pending_message);
-
-  GNUNET_SCHEDULER_add_now(sched, try_core_send, NULL);
+  unsigned int i;
 
 
-  return GNUNET_YES;
+  release_route (route);
+  i = get_consensus_slot (new_distance);
+  route->set_offset = i;
+  consensi[new_distance].targets[i] = route;
+  route->target.distance = htonl (new_distance);
 }
 
 }
 
+
 /**
 /**
- * Given a FindLeastCostContext, and a set
- * of peers that match the target, return the cheapest.
+ * Initialize this neighbors 'my_set' and when done give
+ * it to the pending set operation for execution.
  *
  *
- * @param cls closure, a struct FindLeastCostContext
- * @param key the key identifying the target peer
- * @param value the target peer
+ * Add a single element to the set per call:
  *
  *
- * @return GNUNET_YES to continue iteration, GNUNET_NO to stop
- */
-static int
-find_least_cost_peer (void *cls,
-                  const GNUNET_HashCode *key,
-                  void *value)
-{
-  struct FindLeastCostContext *find_context = cls;
-  struct DistantNeighbor *dn = value;
-
-  if (dn->cost < find_context->least_cost)
-    {
-      find_context->target = dn;
-    }
-  if (dn->cost == DIRECT_NEIGHBOR_COST)
-    return GNUNET_NO;
-  return GNUNET_YES;
-}
-
-/**
- * Send a DV data message via DV.
+ * If we reached the last element of a consensus element: increase distance
  *
  *
- * @param recipient the ultimate recipient of this message
- * @param sender the original sender of the message
- * @param specific_neighbor the specific neighbor to send this message via
- * @param message the packed message
- * @param message_size size of the message
- * @param importance what priority to send this message with
- * @param timeout how long to possibly delay sending this message
- */
-static int
-send_message (const struct GNUNET_PeerIdentity * recipient,
-              const struct GNUNET_PeerIdentity * sender,
-              const struct DistantNeighbor * specific_neighbor,
-              const struct GNUNET_MessageHeader * message,
-              size_t message_size,
-              unsigned int importance,
-              unsigned int uid,
-              struct GNUNET_TIME_Relative timeout)
-{
-  p2p_dv_MESSAGE_Data *toSend;
-  unsigned int msg_size;
-  unsigned int cost;
-  unsigned int recipient_id;
-  unsigned int sender_id;
-  struct DistantNeighbor *target;
-  struct DistantNeighbor *source;
-  struct PendingMessage *pending_message;
-  struct FindLeastCostContext find_least_ctx;
-#if DEBUG_DV_PEER_NUMBERS
-  struct GNUNET_CRYPTO_HashAsciiEncoded encPeerFrom;
-  struct GNUNET_CRYPTO_HashAsciiEncoded encPeerTo;
-  struct GNUNET_CRYPTO_HashAsciiEncoded encPeerVia;
-#endif
-  msg_size = message_size + sizeof (p2p_dv_MESSAGE_Data);
-
-  find_least_ctx.least_cost = -1;
-  find_least_ctx.target = NULL;
-  /*
-   * Need to find the least cost peer, lest the transport selection keep
-   * picking the same DV route for the same destination which results
-   * in messages looping forever.  Relatively cheap, we don't iterate
-   * over all known peers, just those that apply.
-   */
-  GNUNET_CONTAINER_multihashmap_get_multiple (extended_neighbors,
-                                                       &recipient->hashPubKey,  &find_least_cost_peer, &find_least_ctx);
-  target = find_least_ctx.target;
-
-  if (target == NULL)
-    {
-      /* target unknown to us, drop! */
-      return GNUNET_SYSERR;
-    }
-  recipient_id = target->referrer_id;
-
-  source = GNUNET_CONTAINER_multihashmap_get (extended_neighbors,
-                                              &sender->hashPubKey);
-  if (source == NULL)
-    {
-      if (0 != (memcmp (&my_identity,
-                        sender, sizeof (struct GNUNET_PeerIdentity))))
-        {
-          /* sender unknown to us, drop! */
-          return GNUNET_SYSERR;
-        }
-      sender_id = 0;            /* 0 == us */
-    }
-  else
-    {
-      /* find out the number that we use when we gossip about
-         the sender */
-      sender_id = source->our_id;
-    }
-
-#if DEBUG_DV_PEER_NUMBERS
-  GNUNET_CRYPTO_hash_to_enc (&source->identity.hashPubKey, &encPeerFrom);
-  GNUNET_CRYPTO_hash_to_enc (&target->referrer->identity.hashPubKey, &encPeerVia);
-  encPeerFrom.encoding[4] = '\0';
-  encPeerVia.encoding[4] = '\0';
-#endif
-  if ((sender_id != 0) && (0 == memcmp(&source->identity, &target->referrer->identity, sizeof(struct GNUNET_PeerIdentity))))
-    {
-      return 0;
-    }
-
-  cost = target->cost;
-  pending_message = GNUNET_malloc(sizeof(struct PendingMessage) + msg_size);
-  pending_message->msg = (struct GNUNET_MessageHeader *)&pending_message[1];
-  pending_message->send_result = NULL;
-  pending_message->importance = importance;
-  pending_message->timeout = timeout;
-  memcpy(&pending_message->recipient, &target->referrer->identity, sizeof(struct GNUNET_PeerIdentity));
-  pending_message->msg_size = msg_size;
-  toSend = (p2p_dv_MESSAGE_Data *)pending_message->msg;
-  toSend->header.size = htons (msg_size);
-  toSend->header.type = htons (GNUNET_MESSAGE_TYPE_DV_DATA);
-  toSend->sender = htonl (sender_id);
-  toSend->recipient = htonl (recipient_id);
-#if DEBUG_DV_MESSAGES
-  toSend->uid = htonl(uid);
-#else
-  toSend->uid = htonl(0);
-#endif
-
-#if DEBUG_DV_PEER_NUMBERS
-  GNUNET_CRYPTO_hash_to_enc (&target->identity.hashPubKey, &encPeerTo);
-  encPeerTo.encoding[4] = '\0';
-  GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "%s: Sending DATA message. Sender id %u, source %s, destination %s, via %s\n", GNUNET_i2s(&my_identity), sender_id, &encPeerFrom, &encPeerTo, &encPeerVia);
-#endif
-  memcpy (&toSend[1], message, message_size);
-  if (source->pkey == NULL) /* Test our hypothesis about message failures! */
-    {
-      GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "%s: Sending message, but anticipate recipient will not know sender!!!\n\n\n");
-    }
-  GNUNET_CONTAINER_DLL_insert_after (core_pending_head,
-                                     core_pending_tail,
-                                     core_pending_tail,
-                                     pending_message);
-#if DEBUG_DV
-  GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "%s: Notifying core of send size %d to destination `%s'\n", "DV SEND MESSAGE", msg_size, GNUNET_i2s(recipient));
-#endif
-
-  GNUNET_SCHEDULER_add_now(sched, try_core_send, NULL);
-  /*if (core_transmit_handle == NULL)
-    core_transmit_handle = GNUNET_CORE_notify_transmit_ready(coreAPI, importance, timeout, &target->referrer->identity, msg_size, &core_transmit_notify, NULL);
-  else
-    GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "%s: CORE ALREADY SENDING\n", "DV SEND MESSAGE", msg_size);*/
-  return (int) cost;
-}
-
-#if USE_PEER_ID
-struct CheckPeerContext
-{
-  /**
-   * Peer we found
-   */
-  struct DistantNeighbor *peer;
-
-  /**
-   * Sender id to search for
-   */
-  unsigned int sender_id;
-};
-
-/**
- * Iterator over hash map entries.
  *
  *
- * @param cls closure
- * @param key current key code
- * @param value value in the hash map
- * @return GNUNET_YES if we should continue to
- *         iterate,
- *         GNUNET_NO if not.
+ * @param cls the neighbor for which we are building the set
  */
  */
-int checkPeerID (void *cls,
-                 const GNUNET_HashCode * key,
-                 void *value)
+static void
+build_set (void *cls)
 {
 {
-  struct CheckPeerContext *ctx = cls;
-  struct DistantNeighbor *distant = value;
-
-  if (memcmp(key, &ctx->sender_id, sizeof(unsigned int)) == 0)
+  struct DirectNeighbor *neighbor = cls;
+  struct GNUNET_SET_Element element;
+  struct Target *target;
+  target = NULL;
+  while ( (DEFAULT_FISHEYE_DEPTH - 1 > neighbor->consensus_insertion_distance) &&
+         (consensi[neighbor->consensus_insertion_distance].array_length == neighbor->consensus_insertion_offset) )
   {
   {
-    ctx->peer = distant;
-    return GNUNET_NO;
+    /* If we reached the last element of a consensus array element: increase distance and start with next array */
+    neighbor->consensus_insertion_offset = 0;
+    neighbor->consensus_insertion_distance++;
+    /* skip over NULL entries */
+    while ( (DEFAULT_FISHEYE_DEPTH - 1 > neighbor->consensus_insertion_distance) &&
+           (consensi[neighbor->consensus_insertion_distance].array_length  > neighbor->consensus_insertion_offset) &&
+           (NULL == consensi[neighbor->consensus_insertion_distance].targets[neighbor->consensus_insertion_offset]) )
+      neighbor->consensus_insertion_offset++;
+  }
+  if (DEFAULT_FISHEYE_DEPTH - 1 == neighbor->consensus_insertion_distance)
+  {
+    /* we have added all elements to the set, run the operation */
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+               "Finished building my SET for peer `%s' with %u elements, committing\n",
+               GNUNET_i2s(&neighbor->peer),
+               neighbor->consensus_elements);
+    GNUNET_SET_commit (neighbor->set_op,
+                      neighbor->my_set);
+    GNUNET_SET_destroy (neighbor->my_set);
+    neighbor->my_set = NULL;
+    return;
   }
   }
-  return GNUNET_YES;
-
-}
-#endif
 
 
+  target = &consensi[neighbor->consensus_insertion_distance].targets[neighbor->consensus_insertion_offset]->target;
+  element.size = sizeof (struct Target);
+  element.type = htons (0); /* do we need this? */
+  element.data = target;
+
+  /* Find next non-NULL entry */
+  neighbor->consensus_insertion_offset++;
+  /* skip over NULL entries */
+  while ( (DEFAULT_FISHEYE_DEPTH - 1 > neighbor->consensus_insertion_distance) &&
+         (consensi[neighbor->consensus_insertion_distance].array_length > neighbor->consensus_insertion_offset) &&
+         (NULL == consensi[neighbor->consensus_insertion_distance].targets[neighbor->consensus_insertion_offset]) )
+  {
+    neighbor->consensus_insertion_offset++;
+  }
 
 
-/**
- * Handler for messages parsed out by the tokenizer from
- * DV DATA received for this peer.
- *
- * @param cls NULL
- * @param client the TokenizedMessageContext which contains message information
- * @param message the actual message
- */
-void tokenized_message_handler (void *cls,
-                                void *client,
-                                const struct GNUNET_MessageHeader *message)
-{
-  struct TokenizedMessageContext *ctx = client;
-  GNUNET_break_op (ntohs (message->type) != GNUNET_MESSAGE_TYPE_DV_GOSSIP);
-  GNUNET_break_op (ntohs (message->type) != GNUNET_MESSAGE_TYPE_DV_DATA);
-  if ( (ntohs (message->type) != GNUNET_MESSAGE_TYPE_DV_GOSSIP) &&
-      (ntohs (message->type) != GNUNET_MESSAGE_TYPE_DV_DATA) )
+  if ( (0 != memcmp(&target->peer, &my_identity, sizeof (my_identity))) &&
+       (0 != memcmp(&target->peer, &neighbor->peer, sizeof (neighbor->peer))) )
   {
   {
-#if DEBUG_DV_MESSAGES
+    /* Add target if it is not the neighbor or this peer */
     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                "%s: Receives %s message for me, uid %u, size %d, type %d cost %u from %s!\n", my_short_id, "DV DATA", ctx->uid, ntohs(message->size), ntohs(message->type), ctx->distant->cost, GNUNET_i2s(&ctx->distant->identity));
-#endif
-    GNUNET_assert(memcmp(ctx->peer, &ctx->distant->identity, sizeof(struct GNUNET_PeerIdentity)) != 0);
-    send_to_plugin(ctx->peer, message, ntohs(message->size), &ctx->distant->identity, ctx->distant->cost);
+                "Adding peer `%s' with distance %u to SET\n",
+                GNUNET_i2s (&target->peer),
+                ntohl (target->distance));
+    GNUNET_SET_add_element (neighbor->my_set,
+                            &element,
+                            &build_set, neighbor);
+    neighbor->consensus_elements++;
   }
   }
+  else
+    build_set(neighbor);
 }
 
 }
 
-struct DelayedMessageContext
-{
-  struct GNUNET_PeerIdentity dest;
-  struct GNUNET_PeerIdentity sender;
-  struct GNUNET_MessageHeader *message;
-  size_t message_size;
-  uint32_t uid;
-};
-
-void send_message_delayed (void *cls,
-                           const struct GNUNET_SCHEDULER_TaskContext *tc)
-{
-  struct DelayedMessageContext *msg_ctx = cls;
-  if (msg_ctx != NULL)
-    {
-      send_message(&msg_ctx->dest,
-                   &msg_ctx->sender,
-                   NULL,
-                   msg_ctx->message,
-                   msg_ctx->message_size,
-                   default_dv_priority,
-                   msg_ctx->uid,
-                   GNUNET_TIME_relative_get_forever());
-    }
-  GNUNET_free(msg_ctx->message);
-  GNUNET_free(msg_ctx);
-}
 
 /**
 
 /**
- * Core handler for dv data messages.  Whatever this message
- * contains all we really have to do is rip it out of its
- * DV layering and give it to our pal the DV plugin to report
- * in with.
+ * A peer is now connected to us at distance 1.  Initiate DV exchange.
  *
  *
- * @param cls closure
- * @param peer peer which sent the message (immediate sender)
- * @param message the message
- * @param latency the latency of the connection we received the message from
- * @param distance the distance to the immediate peer
+ * @param neighbor entry for the neighbor at distance 1
  */
  */
-static int handle_dv_data_message (void *cls,
-                             const struct GNUNET_PeerIdentity * peer,
-                             const struct GNUNET_MessageHeader * message,
-                             struct GNUNET_TIME_Relative latency,
-                             uint32_t distance)
+static void
+handle_direct_connect (struct DirectNeighbor *neighbor)
 {
 {
-  const p2p_dv_MESSAGE_Data *incoming = (const p2p_dv_MESSAGE_Data *) message;
-  const struct GNUNET_MessageHeader *packed_message;
-  struct DirectNeighbor *dn;
-  struct DistantNeighbor *pos;
-  unsigned int sid;             /* Sender id */
-  unsigned int tid;             /* Target id */
-  struct GNUNET_PeerIdentity *original_sender;
-  struct GNUNET_PeerIdentity *destination;
-  struct FindDestinationContext fdc;
-  struct TokenizedMessageContext tkm_ctx;
-  struct DelayedMessageContext *delayed_context;
-#if USE_PEER_ID
-  struct CheckPeerContext checkPeerCtx;
-#endif
-  char *sender_id;
-  char *direct_id;
-  int ret;
-  size_t packed_message_size;
-  char *cbuf;
-#if NO_MST
-  size_t offset;
-#endif
-  packed_message_size = ntohs(incoming->header.size) - sizeof(p2p_dv_MESSAGE_Data);
-
-
-#if DEBUG_DV
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "%s: Receives DATA message from %s size %d, packed size %d!\n", my_short_id, GNUNET_i2s(peer) , ntohs(incoming->header.size), packed_message_size);
-#endif
-
-  if (ntohs (incoming->header.size) <  sizeof (p2p_dv_MESSAGE_Data) + sizeof (struct GNUNET_MessageHeader))
-    {
+  struct Route *route;
+  struct GNUNET_HashCode h1;
+  struct GNUNET_HashCode h2;
+  struct GNUNET_HashCode session_id;
 
 
-#if DEBUG_DV
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                "`%s': Message sizes don't add up, total size %u, expected at least %u!\n", "dv service", ntohs(incoming->header.size), sizeof (p2p_dv_MESSAGE_Data) + sizeof (struct GNUNET_MessageHeader));
-#endif
-      return GNUNET_SYSERR;
-    }
-
-  dn = GNUNET_CONTAINER_multihashmap_get (direct_neighbors,
-                                  &peer->hashPubKey);
-  if (dn == NULL)
-    {
-#if DEBUG_DV
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                  "%s: dn NULL!\n", "dv");
-#endif
-      return GNUNET_OK;
-    }
-  sid = ntohl (incoming->sender);
-#if USE_PEER_ID
-  if (sid != 0)
-  {
-    checkPeerCtx.sender_id = sid;
-    checkPeerCtx.peer = NULL;
-    GNUNET_CONTAINER_multihashmap_iterate(extended_neighbors, &checkPeerID, &checkPeerCtx);
-    pos = checkPeerCtx.peer;
-  }
-  else
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "Direct connection to %s established, routing table exchange begins.\n",
+             GNUNET_i2s (&neighbor->peer));
+  GNUNET_STATISTICS_update (stats,
+                           "# peers connected (1-hop)",
+                           1, GNUNET_NO);
+  route = GNUNET_CONTAINER_multipeermap_get (all_routes,
+                                            &neighbor->peer);
+  if (NULL != route)
   {
   {
-    pos = GNUNET_CONTAINER_multihashmap_get (extended_neighbors,
-                                             &peer->hashPubKey);
+    send_disconnect_to_plugin (&neighbor->peer);
+    release_route (route);
+    GNUNET_free (route);
   }
   }
-#else
-  pos = dn->referee_head;
-  while ((NULL != pos) && (pos->referrer_id != sid))
-    pos = pos->next;
-#endif
 
 
-  if (pos == NULL)
-    {
-      direct_id = GNUNET_strdup(GNUNET_i2s(&dn->identity));
-#if DEBUG_DV_MESSAGES
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                  "%s: unknown sender (%u), Message uid %llu from %s!\n", GNUNET_i2s(&my_identity), ntohl(incoming->sender), ntohl(incoming->uid), direct_id);
-#endif
-      GNUNET_free(direct_id);
-      pos = dn->referee_head;
-      while ((NULL != pos) && (pos->referrer_id != sid))
-      {
-        sender_id = strdup(GNUNET_i2s(&pos->identity));
-        GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "I know sender %u %s\n", pos->referrer_id, sender_id);
-        GNUNET_free(sender_id);
-        pos = pos->next;
-      }
+  neighbor->direct_route = GNUNET_new (struct Route);
+  neighbor->direct_route->next_hop = neighbor;
+  neighbor->direct_route->target.peer= neighbor->peer;
+  neighbor->direct_route->target.distance = DIRECT_NEIGHBOR_COST;
+  allocate_route (neighbor->direct_route, DIRECT_NEIGHBOR_COST);
 
 
-#if DEBUG_MESSAGE_DROP
-      direct_id = GNUNET_strdup(GNUNET_i2s(&dn->identity));
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                  "%s: DROPPING MESSAGE type %d, unknown sender! Message immediately from %s!\n", GNUNET_i2s(&my_identity), ntohs(((struct GNUNET_MessageHeader *)&incoming[1])->type), direct_id);
-      GNUNET_free(direct_id);
-#endif
-      /* unknown sender */
-      return GNUNET_OK;
-    }
-  original_sender = &pos->identity;
-  tid = ntohl (incoming->recipient);
-  if (tid == 0)
-    {
-      /* 0 == us */
-      cbuf = (char *)&incoming[1];
-
-      tkm_ctx.peer = peer;
-      tkm_ctx.distant = pos;
-      tkm_ctx.uid = ntohl(incoming->uid);
-      if (GNUNET_OK != GNUNET_SERVER_mst_receive (coreMST,
-                                                  &tkm_ctx,
-                                                  cbuf,
-                                                  packed_message_size,
-                                                  GNUNET_NO,
-                                                  GNUNET_NO))
-        {
-          GNUNET_break_op(0);
-          GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "%s: %s Received corrupt data, discarding!", my_short_id, "DV SERVICE");
-        }
-#if NO_MST
-      offset = 0;
-      while(offset < packed_message_size)
-        {
-          packed_message = (struct GNUNET_MessageHeader *)&cbuf[offset];
-
-          GNUNET_break_op (ntohs (packed_message->type) != GNUNET_MESSAGE_TYPE_DV_GOSSIP);
-          GNUNET_break_op (ntohs (packed_message->type) != GNUNET_MESSAGE_TYPE_DV_DATA);
-          if ( (ntohs (packed_message->type) != GNUNET_MESSAGE_TYPE_DV_GOSSIP) &&
-              (ntohs (packed_message->type) != GNUNET_MESSAGE_TYPE_DV_DATA) )
-          {
-#if DEBUG_DV_MESSAGES
-            GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                        "%s: Receives %s message(s) for me, uid %u, total size %d cost %u from %s!\n", my_short_id, "DV DATA", ntohl(incoming->uid), ntohs(packed_message->size), pos->cost, GNUNET_i2s(&pos->identity));
-#endif
-            GNUNET_assert(memcmp(peer, &pos->identity, sizeof(struct GNUNET_PeerIdentity)) != 0);
-            send_to_plugin(peer, packed_message, ntohs(packed_message->size), &pos->identity, pos->cost);
-          }
-          offset += ntohs(packed_message->size);
-        }
-#endif
-      return GNUNET_OK;
-    }
-  else
-    {
-      packed_message = (struct GNUNET_MessageHeader *)&incoming[1];
-    }
-
-  /* FIXME: this is the *only* per-request operation we have in DV
-     that is O(n) in relation to the number of connected peers; a
-     hash-table lookup could easily solve this (minor performance
-     issue) */
-  fdc.tid = tid;
-  fdc.dest = NULL;
-  GNUNET_CONTAINER_heap_iterate (neighbor_max_heap,
-                                 &find_destination, &fdc);
-
-#if DEBUG_DV
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                  "%s: Receives %s message for someone else!\n", "dv", "DV DATA");
-#endif
-
-  if (fdc.dest == NULL)
-    {
-#if DEBUG_DV_MESSAGES
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                  "%s: Receives %s message uid %u for someone we don't know (id %u)!\n", my_short_id, "DV DATA", ntohl(incoming->uid), tid);
-#endif
-    return GNUNET_OK;
-    }
-  destination = &fdc.dest->identity;
-
-  if (0 == memcmp (destination, peer, sizeof (struct GNUNET_PeerIdentity)))
-    {
-      /* FIXME: create stat: routing loop-discard! */
-#if DEBUG_DV_PEER_NUMBERS
-      GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "\n\n\nLoopy loo message\n\n\n");
-#endif
-
-#if DEBUG_DV_MESSAGES
-      direct_id = GNUNET_strdup(GNUNET_i2s(&dn->identity));
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                  "%s: DROPPING MESSAGE uid %u type %d, routing loop! Message immediately from %s!\n", my_short_id, ntohl(incoming->uid), ntohs(packed_message->type), direct_id);
-#endif
-      return GNUNET_OK;
-    }
-
-  /* At this point we have a message, and we need to forward it on to the
-   * next DV hop.
-   */
-  /* FIXME: Can't send message on, we have to behave.
-   * We have to tell core we have a message for the next peer, and let
-   * transport do transport selection on how to get this message to 'em */
-  /*ret = send_message (&destination,
-                      &original_sender,
-                      packed_message, DV_PRIORITY, DV_DELAY);*/
-
-#if DEBUG_DV_MESSAGES
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "%s: FORWARD %s message for %s, uid %u, size %d type %d, cost %u!\n", my_short_id, "DV DATA", GNUNET_i2s(destination), ntohl(incoming->uid), ntohs(packed_message->size), ntohs(packed_message->type), pos->cost);
-#endif
-
-  if (GNUNET_TIME_absolute_get_duration(pos->last_gossip).value < GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 2).value)
+              "Adding direct route to %s\n",
+              GNUNET_i2s (&neighbor->direct_route->target.peer));
+
+
+  /* construct session ID seed as XOR of both peer's identities */
+  GNUNET_CRYPTO_hash (&my_identity, sizeof (my_identity), &h1);
+  GNUNET_CRYPTO_hash (&neighbor->peer, sizeof (struct GNUNET_PeerIdentity), &h2);
+  GNUNET_CRYPTO_hash_xor (&h1,
+                         &h2,
+                         &session_id);
+  /* make sure session ID is unique across applications by salting it with 'DV' */
+  GNUNET_CRYPTO_hkdf (&neighbor->real_session_id, sizeof (struct GNUNET_HashCode),
+                     GCRY_MD_SHA512, GCRY_MD_SHA256,
+                     "DV-SALT", 2,
+                     &session_id, sizeof (session_id),
+                     NULL, 0);
+  if (0 < memcmp (&neighbor->peer,
+                 &my_identity,
+                 sizeof (struct GNUNET_PeerIdentity)))
+  {
+    if (NULL != neighbor->listen_handle)
     {
     {
-      delayed_context = GNUNET_malloc(sizeof(struct DelayedMessageContext));
-      memcpy(&delayed_context->dest, destination, sizeof(struct GNUNET_PeerIdentity));
-      memcpy(&delayed_context->sender, original_sender, sizeof(struct GNUNET_PeerIdentity));
-      delayed_context->message = GNUNET_malloc(packed_message_size);
-      memcpy(delayed_context->message, packed_message, packed_message_size);
-      delayed_context->message_size = packed_message_size;
-      delayed_context->uid = ntohl(incoming->uid);
-      GNUNET_SCHEDULER_add_delayed(sched, GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MILLISECONDS, 2500), &send_message_delayed, delayed_context);
-      //GNUNET_SCHEDULER_add_now(sched, &send_message_delayed, delayed_context);
-      return GNUNET_OK;
+      GNUNET_break (0);
     }
     }
+    else
+      neighbor->initiate_task = GNUNET_SCHEDULER_add_now (&initiate_set_union,
+                                                       neighbor);
+  }
   else
   else
+  {
+    if (NULL != neighbor->listen_handle)
     {
     {
-      ret = send_message(destination,
-                         original_sender,
-                         NULL,
-                         packed_message,
-                         packed_message_size,
-                         default_dv_priority,
-                         ntohl(incoming->uid),
-                         GNUNET_TIME_relative_get_forever());
+      GNUNET_break (0);
     }
     }
-  if (ret != GNUNET_SYSERR)
-    return GNUNET_OK;
-  else
+    else
     {
     {
-#if DEBUG_MESSAGE_DROP
-      direct_id = GNUNET_strdup(GNUNET_i2s(&dn->identity));
       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                  "%s: DROPPING MESSAGE type %d, forwarding failed! Message immediately from %s!\n", GNUNET_i2s(&my_identity), ntohs(((struct GNUNET_MessageHeader *)&incoming[1])->type), direct_id);
-#endif
-      return GNUNET_SYSERR;
+                  "Starting SET listen operation with peer `%s'\n",
+                  GNUNET_i2s(&neighbor->peer));
+      neighbor->listen_handle = GNUNET_SET_listen (cfg,
+                                                   GNUNET_SET_OPERATION_UNION,
+                                                   &neighbor->real_session_id,
+                                                   &listen_set_union,
+                                                   neighbor);
     }
     }
+  }
 }
 
 }
 
-#if DEBUG_DV
-/**
- * Iterator over hash map entries.
- *
- * @param cls closure (NULL)
- * @param key current key code
- * @param value value in the hash map (DistantNeighbor)
- * @return GNUNET_YES if we should continue to
- *         iterate,
- *         GNUNET_NO if not.
- */
-int print_neighbors (void *cls,
-                     const GNUNET_HashCode * key,
-                     void *value)
-{
-  struct DistantNeighbor *distant_neighbor = value;
-  char my_shortname[5];
-  char referrer_shortname[5];
-  memcpy(&my_shortname, GNUNET_i2s(&my_identity), 4);
-  my_shortname[4] = '\0';
-  memcpy(&referrer_shortname, GNUNET_i2s(&distant_neighbor->referrer->identity), 4);
-  referrer_shortname[4] = '\0';
-
-  GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "`%s' %s: Peer `%s', distance %d, referrer `%s' pkey: %s\n", &my_shortname, "DV", GNUNET_i2s(&distant_neighbor->identity), distant_neighbor->cost, &referrer_shortname, distant_neighbor->pkey == NULL ? "no" : "yes");
-  return GNUNET_YES;
-}
-#endif
 
 /**
 
 /**
- *  Scheduled task which gossips about known direct peers to other connected
- *  peers.  Will run until called with reason shutdown.
+ * Method called whenever a peer connects.
+ *
+ * @param cls closure
+ * @param peer peer identity this notification is about
  */
 static void
  */
 static void
-neighbor_send_task (void *cls,
-                    const struct GNUNET_SCHEDULER_TaskContext *tc)
+handle_core_connect (void *cls,
+                    const struct GNUNET_PeerIdentity *peer)
 {
 {
-  struct NeighborSendContext *send_context = cls;
-#if DEBUG_DV_GOSSIP_SEND
-  char * encPeerAbout;
-  char * encPeerTo;
-#endif
-  struct DistantNeighbor *about;
-  struct DirectNeighbor *to;
-  struct FastGossipNeighborList *about_list;
-
-  p2p_dv_MESSAGE_NeighborInfo *message;
-  struct PendingMessage *pending_message;
-
-  if (tc->reason == GNUNET_SCHEDULER_REASON_SHUTDOWN)
+  struct DirectNeighbor *neighbor;
+
+  /* Check for connect to self message */
+  if (0 == memcmp (&my_identity, peer, sizeof (struct GNUNET_PeerIdentity)))
+    return;
+  /* check if entry exists */
+  neighbor = GNUNET_CONTAINER_multipeermap_get (direct_neighbors,
+                                               peer);
+  if (NULL != neighbor)
   {
   {
-#if DEBUG_DV_GOSSIP
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "%s: Called with reason shutdown, shutting down!\n",
-              GNUNET_i2s(&my_identity));
-#endif
+    GNUNET_break (GNUNET_YES != neighbor->connected);
+    neighbor->connected = GNUNET_YES;
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+               "Core connected to %s (distance %u)\n",
+               GNUNET_i2s (peer),
+               (unsigned int) neighbor->distance);
+    if (DIRECT_NEIGHBOR_COST != neighbor->distance)
+      return;
+    handle_direct_connect (neighbor);
     return;
   }
     return;
   }
-
-  if (send_context->fast_gossip_list_head != NULL)
-    {
-      about_list = send_context->fast_gossip_list_head;
-      about = about_list->about;
-      GNUNET_CONTAINER_DLL_remove(send_context->fast_gossip_list_head,
-                                  send_context->fast_gossip_list_tail,
-                                  about_list);
-      GNUNET_free(about_list);
-    }
-  else
-    {
-      /* FIXME: this may become a problem, because the heap walk has only one internal "walker".  This means
-       * that if two neighbor_send_tasks are operating in lockstep (which is quite possible, given default
-       * values for all connected peers) there may be a serious bias as to which peers get gossiped about!
-       * Probably the *best* way to fix would be to have an opaque pointer to the walk position passed as
-       * part of the walk_get_next call.  Then the heap would have to keep a list of walks, or reset the walk
-       * whenever a modification has been detected.  Yuck either way.  Perhaps we could iterate over the heap
-       * once to get a list of peers to gossip about and gossip them over time... But then if one goes away
-       * in the mean time that becomes nasty.  For now we'll just assume that the walking is done
-       * asynchronously enough to avoid major problems (-;
-       *
-       * NOTE: probably fixed once we decided send rate based on allowed bandwidth.
-       */
-      about = GNUNET_CONTAINER_heap_walk_get_next (neighbor_min_heap);
-    }
-  to = send_context->toNeighbor;
-
-  if ((about != NULL) && (to != about->referrer /* split horizon */ ) &&
-#if SUPPORT_HIDING
-      (about->hidden == GNUNET_NO) &&
-#endif
-      (to != NULL) &&
-      (0 != memcmp (&about->identity,
-                        &to->identity, sizeof (struct GNUNET_PeerIdentity))) &&
-      (about->pkey != NULL))
-    {
-#if DEBUG_DV_GOSSIP_SEND
-      encPeerAbout = GNUNET_strdup(GNUNET_i2s(&about->identity));
-      encPeerTo = GNUNET_strdup(GNUNET_i2s(&to->identity));
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                  "%s: Sending info about peer %s id %u to directly connected peer %s\n",
-                  GNUNET_i2s(&my_identity),
-                  encPeerAbout, about->our_id, encPeerTo);
-      GNUNET_free(encPeerAbout);
-      GNUNET_free(encPeerTo);
-#endif
-      about->last_gossip = GNUNET_TIME_absolute_get();
-      pending_message = GNUNET_malloc(sizeof(struct PendingMessage) + sizeof(p2p_dv_MESSAGE_NeighborInfo));
-      pending_message->msg = (struct GNUNET_MessageHeader *)&pending_message[1];
-      pending_message->importance = default_dv_priority;
-      pending_message->timeout = GNUNET_TIME_relative_get_forever();
-      memcpy(&pending_message->recipient, &to->identity, sizeof(struct GNUNET_PeerIdentity));
-      pending_message->msg_size = sizeof(p2p_dv_MESSAGE_NeighborInfo);
-      message = (p2p_dv_MESSAGE_NeighborInfo *)pending_message->msg;
-      message->header.size = htons (sizeof (p2p_dv_MESSAGE_NeighborInfo));
-      message->header.type = htons (GNUNET_MESSAGE_TYPE_DV_GOSSIP);
-      message->cost = htonl (about->cost);
-      message->neighbor_id = htonl (about->our_id);
-
-      memcpy (&message->pkey, about->pkey, sizeof(struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded));
-      memcpy (&message->neighbor,
-              &about->identity, sizeof (struct GNUNET_PeerIdentity));
-
-      GNUNET_CONTAINER_DLL_insert_after (core_pending_head,
-                                         core_pending_tail,
-                                         core_pending_tail,
-                                         pending_message);
-
-      GNUNET_SCHEDULER_add_now(sched, try_core_send, NULL);
-      /*if (core_transmit_handle == NULL)
-        core_transmit_handle = GNUNET_CORE_notify_transmit_ready(coreAPI, default_dv_priority, GNUNET_TIME_relative_get_forever(), &to->identity, sizeof(p2p_dv_MESSAGE_NeighborInfo), &core_transmit_notify, NULL);*/
-
-    }
-
-  if (send_context->fast_gossip_list_head != NULL) /* If there are other peers in the fast list, schedule right away */
-    {
-#if DEBUG_DV_PEER_NUMBERS
-      GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "DV SERVICE: still in fast send mode\n");
-#endif
-      send_context->task = GNUNET_SCHEDULER_add_now(sched, &neighbor_send_task, send_context);
-    }
-  else
-    {
-#if DEBUG_DV_PEER_NUMBERS
-      GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "DV SERVICE: entering slow send mode\n");
-#endif
-      send_context->task = GNUNET_SCHEDULER_add_delayed(sched, GNUNET_DV_DEFAULT_SEND_INTERVAL, &neighbor_send_task, send_context);
-    }
-
-  return;
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "Core connected to %s (distance unknown)\n",
+             GNUNET_i2s (peer));
+  neighbor = GNUNET_new (struct DirectNeighbor);
+  neighbor->peer = *peer;
+  GNUNET_assert (GNUNET_YES ==
+                GNUNET_CONTAINER_multipeermap_put (direct_neighbors,
+                                                   peer,
+                                                   neighbor,
+                                                   GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
+  neighbor->connected = GNUNET_YES;
+  neighbor->distance = 0; /* unknown */
 }
 
 
 /**
 }
 
 
 /**
- * Handle START-message.  This is the first message sent to us
- * by the client (can only be one!).
+ * Called for each 'target' in a neighbor table to free the associated memory.
  *
  *
- * @param cls closure (always NULL)
- * @param client identification of the client
- * @param message the actual message
+ * @param cls NULL
+ * @param key key of the value
+ * @param value value to free
+ * @return GNUNET_OK to continue to iterate
  */
  */
-static void
-handle_start (void *cls,
-              struct GNUNET_SERVER_Client *client,
-              const struct GNUNET_MessageHeader *message)
+static int
+free_targets (void *cls,
+             const struct GNUNET_PeerIdentity *key,
+             void *value)
 {
 {
-
-#if DEBUG_DV
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Received `%s' request from client\n", "START");
-#endif
-
-  client_handle = client;
-
-  GNUNET_SERVER_client_keep(client_handle);
-  GNUNET_SERVER_receive_done (client, GNUNET_OK);
+  GNUNET_free (value);
+  return GNUNET_OK;
 }
 
 }
 
-#if UNSIMPLER
+
 /**
 /**
- * Iterate over hash map entries for a distant neighbor,
- * if direct neighbor matches context call send message
+ * Multipeerhmap iterator for checking if a given route is
+ * (now) useful to this peer.
  *
  *
- * @param cls closure, a DV_SendContext
- * @param key current key code
- * @param value value in the hash map
- * @return GNUNET_YES if we should continue to
- *         iterate,
- *         GNUNET_NO if not.
+ * @param cls the direct neighbor for the given route
+ * @param key key value stored under
+ * @param value a 'struct Target' that may or may not be useful; not that
+ *        the distance in 'target' does not include the first hop yet
+ * @return GNUNET_YES to continue iteration, GNUNET_NO to stop
  */
  */
-int send_iterator (void *cls,
-                   const GNUNET_HashCode * key,
-                   void *value)
+static int
+check_possible_route (void *cls,
+                     const struct GNUNET_PeerIdentity *key,
+                     void *value)
 {
 {
-  struct DV_SendContext *send_context = cls;
-  struct DistantNeighbor *distant_neighbor = value;
+  struct DirectNeighbor *neighbor = cls;
+  struct Target *target = value;
+  struct Route *route;
 
 
-  if (memcmp(distant_neighbor->referrer, send_context->direct_peer, sizeof(struct GNUNET_PeerIdentity)) == 0) /* They match, send and free */
+  route = GNUNET_CONTAINER_multipeermap_get (all_routes,
+                                            key);
+  if (NULL != route)
+  {
+    if (ntohl (route->target.distance) > ntohl (target->distance) + 1)
     {
     {
-      send_message_via(&my_identity, distant_neighbor, send_context);
-      return GNUNET_NO;
+      /* this 'target' is cheaper than the existing route; switch to alternative route! */
+      move_route (route, ntohl (target->distance) + 1);
+      route->next_hop = neighbor;
+      send_distance_change_to_plugin (&target->peer, ntohl (target->distance) + 1);
     }
     }
+    return GNUNET_YES; /* got a route to this target already */
+  }
+  route = GNUNET_new (struct Route);
+  route->next_hop = neighbor;
+  route->target.distance = htonl (ntohl (target->distance) + 1);
+  route->target.peer = target->peer;
+  allocate_route (route, ntohl (route->target.distance));
+  GNUNET_assert (GNUNET_YES ==
+                GNUNET_CONTAINER_multipeermap_put (all_routes,
+                                                   &route->target.peer,
+                                                   route,
+                                                   GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
+  send_connect_to_plugin (&route->target.peer, ntohl (target->distance));
   return GNUNET_YES;
 }
   return GNUNET_YES;
 }
-#endif
+
 
 /**
 
 /**
- * Service server's handler for message send requests (which come
- * bubbling up to us through the DV plugin).
+ * Multipeermap iterator for finding routes that were previously
+ * "hidden" due to a better route (called after a disconnect event).
  *
  *
- * @param cls closure
- * @param client identification of the client
- * @param message the actual message
+ * @param cls NULL
+ * @param key peer identity of the given direct neighbor
+ * @param value a 'struct DirectNeighbor' to check for additional routes
+ * @return GNUNET_YES to continue iteration
  */
  */
-void handle_dv_send_message (void *cls,
-                             struct GNUNET_SERVER_Client * client,
-                             const struct GNUNET_MessageHeader * message)
+static int
+refresh_routes (void *cls,
+               const struct GNUNET_PeerIdentity *key,
+               void *value)
 {
 {
-  struct GNUNET_DV_SendMessage *send_msg;
-  struct GNUNET_DV_SendResultMessage *send_result_msg;
-  struct PendingMessage *pending_message;
-  size_t address_len;
-  size_t message_size;
-  struct GNUNET_PeerIdentity *destination;
-  struct GNUNET_PeerIdentity *direct;
-  struct GNUNET_MessageHeader *message_buf;
-  char *temp_pos;
-  int offset;
-  static struct GNUNET_CRYPTO_HashAsciiEncoded dest_hash;
-  struct DV_SendContext *send_context;
-#if DEBUG_DV_MESSAGES
-  char *cbuf;
-  struct GNUNET_MessageHeader *packed_message;
-#endif
-
-  if (client_handle == NULL)
-  {
-    client_handle = client;
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "%s: Setting initial client handle, never received `%s' message?\n", "dv", "START");
-  }
-  else if (client_handle != client)
-  {
-    client_handle = client;
-    /* What should we do in this case, assert fail or just log the warning? */
-#if DEBUG_DV
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                "%s: Setting client handle (was a different client!)!\n", "dv");
-#endif
-  }
-
-  GNUNET_assert(ntohs(message->size) > sizeof(struct GNUNET_DV_SendMessage));
-  send_msg = (struct GNUNET_DV_SendMessage *)message;
-
-  address_len = ntohl(send_msg->addrlen);
-  GNUNET_assert(address_len == sizeof(struct GNUNET_PeerIdentity) * 2);
-  message_size = ntohs(message->size) - sizeof(struct GNUNET_DV_SendMessage) - address_len;
-  destination = GNUNET_malloc(sizeof(struct GNUNET_PeerIdentity));
-  direct = GNUNET_malloc(sizeof(struct GNUNET_PeerIdentity));
-  message_buf = GNUNET_malloc(message_size);
-
-  temp_pos = (char *)&send_msg[1]; /* Set pointer to end of message */
-  offset = 0; /* Offset starts at zero */
-
-  memcpy(destination, &temp_pos[offset], sizeof(struct GNUNET_PeerIdentity));
-  offset += sizeof(struct GNUNET_PeerIdentity);
-
-  memcpy(direct, &temp_pos[offset], sizeof(struct GNUNET_PeerIdentity));
-  offset += sizeof(struct GNUNET_PeerIdentity);
-
+  struct DirectNeighbor *neighbor = value;
 
 
-  memcpy(message_buf, &temp_pos[offset], message_size);
-  if (memcmp(&send_msg->target, destination, sizeof(struct GNUNET_PeerIdentity)) != 0)
-    {
-      GNUNET_CRYPTO_hash_to_enc (&destination->hashPubKey, &dest_hash); /* GNUNET_i2s won't properly work, need to hash one ourselves */
-      dest_hash.encoding[4] = '\0';
-      GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "%s: asked to send message to `%s', but address is for `%s'!", "DV SERVICE", GNUNET_i2s(&send_msg->target), (const char *)&dest_hash.encoding);
-    }
-
-#if DEBUG_DV_MESSAGES
-  cbuf = (char *)message_buf;
-  offset = 0;
-  while(offset < message_size)
-    {
-      packed_message = (struct GNUNET_MessageHeader *)&cbuf[offset];
-      GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "%s: DV PLUGIN SEND uid %u type %d to %s\n", my_short_id, ntohl(send_msg->uid), ntohs(packed_message->type), GNUNET_i2s(destination));
-      offset += ntohs(packed_message->size);
-    }
-  /*GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "%s: DV PLUGIN SEND uid %u type %d to %s\n", my_short_id, ntohl(send_msg->uid), ntohs(message_buf->type), GNUNET_i2s(destination));*/
-#endif
-  GNUNET_CRYPTO_hash_to_enc (&destination->hashPubKey, &dest_hash); /* GNUNET_i2s won't properly work, need to hash one ourselves */
-  dest_hash.encoding[4] = '\0';
-  send_context = GNUNET_malloc(sizeof(struct DV_SendContext));
-
-  send_result_msg = GNUNET_malloc(sizeof(struct GNUNET_DV_SendResultMessage));
-  send_result_msg->header.size = htons(sizeof(struct GNUNET_DV_SendResultMessage));
-  send_result_msg->header.type = htons(GNUNET_MESSAGE_TYPE_TRANSPORT_DV_SEND_RESULT);
-  send_result_msg->uid = send_msg->uid; /* No need to ntohl->htonl this */
-
-  send_context->importance = ntohl(send_msg->priority);
-  send_context->timeout = send_msg->timeout;
-  send_context->direct_peer = direct;
-  send_context->distant_peer = destination;
-  send_context->message = message_buf;
-  send_context->message_size = message_size;
-  send_context->send_result = send_result_msg;
-#if DEBUG_DV_MESSAGES
-  send_context->uid = send_msg->uid;
-#endif
-
-  if (send_message_via(&my_identity, direct, send_context) != GNUNET_YES)
-    {
-      send_result_msg->result = htons(1);
-      pending_message = GNUNET_malloc(sizeof(struct PendingMessage) + sizeof(struct GNUNET_DV_SendResultMessage));
-      pending_message->msg = (struct GNUNET_MessageHeader *)&pending_message[1];
-      memcpy(&pending_message[1], send_result_msg, sizeof(struct GNUNET_DV_SendResultMessage));
-      GNUNET_free(send_result_msg);
-
-      GNUNET_CONTAINER_DLL_insert_after(plugin_pending_head, plugin_pending_tail, plugin_pending_tail, pending_message);
-
-      if (client_handle != NULL)
-        {
-          if (plugin_transmit_handle == NULL)
-            {
-              plugin_transmit_handle = GNUNET_SERVER_notify_transmit_ready (client_handle,
-                                                                            sizeof(struct GNUNET_DV_SendResultMessage),
-                                                                            GNUNET_TIME_UNIT_FOREVER_REL,
-                                                                            &transmit_to_plugin, NULL);
-            }
-          else
-            {
-              GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Failed to queue message for plugin, must be one in progress already!!\n");
-            }
-        }
-      GNUNET_CRYPTO_hash_to_enc (&destination->hashPubKey, &dest_hash); /* GNUNET_i2s won't properly work, need to hash one ourselves */
-      dest_hash.encoding[4] = '\0';
-      GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "%s DV SEND failed to send message to destination `%s' via `%s'\n", my_short_id, (const char *)&dest_hash.encoding, GNUNET_i2s(direct));
-    }
-
-  /* In bizarro world GNUNET_SYSERR indicates that we succeeded */
-#if UNSIMPLER
-  if (GNUNET_SYSERR != GNUNET_CONTAINER_multihashmap_get_multiple(extended_neighbors, &destination->hashPubKey, &send_iterator, send_context))
-    {
-      send_result_msg->result = htons(1);
-      pending_message = GNUNET_malloc(sizeof(struct PendingMessage) + sizeof(struct GNUNET_DV_SendResultMessage));
-      pending_message->msg = (struct GNUNET_MessageHeader *)&pending_message[1];
-      memcpy(&pending_message[1], send_result_msg, sizeof(struct GNUNET_DV_SendResultMessage));
-      GNUNET_free(send_result_msg);
-
-      GNUNET_CONTAINER_DLL_insert_after(plugin_pending_head, plugin_pending_tail, plugin_pending_tail, pending_message);
-
-      if (client_handle != NULL)
-        {
-          if (plugin_transmit_handle == NULL)
-            {
-              plugin_transmit_handle = GNUNET_SERVER_notify_transmit_ready (client_handle,
-                                                                            sizeof(struct GNUNET_DV_SendResultMessage),
-                                                                            GNUNET_TIME_UNIT_FOREVER_REL,
-                                                                            &transmit_to_plugin, NULL);
-            }
-          else
-            {
-              GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Failed to queue message for plugin, must be one in progress already!!\n");
-            }
-        }
-      GNUNET_CRYPTO_hash_to_enc (&destination->hashPubKey, &dest_hash); /* GNUNET_i2s won't properly work, need to hash one ourselves */
-      dest_hash.encoding[4] = '\0';
-      GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "%s DV SEND failed to send message to destination `%s' via `%s'\n", my_short_id, (const char *)&dest_hash.encoding, GNUNET_i2s(direct));
-    }
-#endif
-  GNUNET_free(message_buf);
-  GNUNET_free(send_context);
-  GNUNET_free(direct);
-  GNUNET_free(destination);
-
-  GNUNET_SERVER_receive_done(client, GNUNET_OK);
+  if ( (GNUNET_YES != neighbor->connected) ||
+       (DIRECT_NEIGHBOR_COST != neighbor->distance) )
+    return GNUNET_YES;
+  if (NULL != neighbor->neighbor_table)
+    GNUNET_CONTAINER_multipeermap_iterate (neighbor->neighbor_table,
+                                          &check_possible_route,
+                                          neighbor);
+  return GNUNET_YES;
 }
 
 }
 
-/** Forward declarations **/
-static int handle_dv_gossip_message (void *cls,
-                                     const struct GNUNET_PeerIdentity *peer,
-                                     const struct GNUNET_MessageHeader *message,
-                                     struct GNUNET_TIME_Relative latency,
-                                     uint32_t distance);
-
-static int handle_dv_disconnect_message (void *cls,
-                                         const struct GNUNET_PeerIdentity *peer,
-                                         const struct GNUNET_MessageHeader *message,
-                                         struct GNUNET_TIME_Relative latency,
-                                         uint32_t distance);
-/** End forward declarations **/
-
 
 /**
 
 /**
- * List of handlers for the messages understood by this
- * service.
+ * Get distance information from 'atsi'.
  *
  *
- * Hmm... will we need to register some handlers with core and
- * some handlers with our server here?  Because core should be
- * getting the incoming DV messages (from whichever lower level
- * transport) and then our server should be getting messages
- * from the dv_plugin, right?
+ * @param atsi performance data
+ * @param atsi_count number of entries in atsi
+ * @return connected transport distance
  */
  */
-static struct GNUNET_CORE_MessageHandler core_handlers[] = {
-  {&handle_dv_data_message, GNUNET_MESSAGE_TYPE_DV_DATA, 0},
-  {&handle_dv_gossip_message, GNUNET_MESSAGE_TYPE_DV_GOSSIP, 0},
-  {&handle_dv_disconnect_message, GNUNET_MESSAGE_TYPE_DV_DISCONNECT, 0},
-  {NULL, 0, 0}
-};
-
-static struct GNUNET_SERVER_MessageHandler plugin_handlers[] = {
-  {&handle_dv_send_message, NULL, GNUNET_MESSAGE_TYPE_TRANSPORT_DV_SEND, 0},
-  {&handle_start, NULL, GNUNET_MESSAGE_TYPE_DV_START, 0},
-  {NULL, NULL, 0, 0}
-};
-
-/**
- * Free a DistantNeighbor node, including removing it
- * from the referer's list.
- */
-static void
-distant_neighbor_free (struct DistantNeighbor *referee)
+static uint32_t
+get_atsi_distance (const struct GNUNET_ATS_Information *atsi,
+                   uint32_t atsi_count)
 {
 {
-  struct DirectNeighbor *referrer;
+  uint32_t i;
 
 
-  referrer = referee->referrer;
-  if (referrer != NULL)
-    {
-      GNUNET_CONTAINER_DLL_remove (referrer->referee_head,
-                         referrer->referee_tail, referee);
-    }
-  GNUNET_CONTAINER_heap_remove_node (neighbor_max_heap, referee->max_loc);
-  GNUNET_CONTAINER_heap_remove_node (neighbor_min_heap, referee->min_loc);
-  GNUNET_CONTAINER_multihashmap_remove_all (extended_neighbors,
-                                    &referee->identity.hashPubKey);
-  GNUNET_free_non_null (referee->pkey);
-  GNUNET_free (referee);
+  for (i = 0; i < atsi_count; i++)
+    if (ntohl (atsi[i].type) == GNUNET_ATS_QUALITY_NET_DISTANCE)
+      return (0 == ntohl (atsi->value)) ? DIRECT_NEIGHBOR_COST : ntohl (atsi->value); // FIXME: 0 check should not be required once ATS is fixed!
+  /* If we do not have explicit distance data, assume direct neighbor. */
+  return DIRECT_NEIGHBOR_COST;
 }
 
 }
 
-/**
- * Free a DirectNeighbor node, including removing it
- * from the referer's list.
- */
-static void
-direct_neighbor_free (struct DirectNeighbor *direct)
-{
-  struct NeighborSendContext *send_context;
-  struct FastGossipNeighborList *about_list;
-  struct FastGossipNeighborList *prev_about;
-
-  send_context = direct->send_context;
-
-  if (send_context->task != GNUNET_SCHEDULER_NO_TASK)
-    GNUNET_SCHEDULER_cancel(sched, send_context->task);
-
-  about_list = send_context->fast_gossip_list_head;
-  while (about_list != NULL)
-    {
-      GNUNET_CONTAINER_DLL_remove(send_context->fast_gossip_list_head, send_context->fast_gossip_list_tail, about_list);
-      prev_about = about_list;
-      about_list = about_list->next;
-      GNUNET_free(prev_about);
-    }
-  GNUNET_free(send_context);
-  GNUNET_free(direct);
-}
 
 /**
 
 /**
- * Multihashmap iterator for sending out disconnect messages
- * for a peer.
+ * Multipeermap iterator for freeing routes that go via a particular
+ * neighbor that disconnected and is thus no longer available.
  *
  *
- * @param cls the peer that was disconnected
+ * @param cls the direct neighbor that is now unavailable
  * @param key key value stored under
  * @param key key value stored under
- * @param value the direct neighbor to send disconnect to
+ * @param value a 'struct Route' that may or may not go via neighbor
  *
  * @return GNUNET_YES to continue iteration, GNUNET_NO to stop
  */
  *
  * @return GNUNET_YES to continue iteration, GNUNET_NO to stop
  */
-static int schedule_disconnect_messages (void *cls,
-                                    const GNUNET_HashCode * key,
-                                    void *value)
+static int
+cull_routes (void *cls,
+            const struct GNUNET_PeerIdentity *key,
+            void *value)
 {
 {
-  struct DisconnectContext *disconnect_context = cls;
-  struct DirectNeighbor *disconnected = disconnect_context->direct;
-  struct DirectNeighbor *notify = value;
-  struct PendingMessage *pending_message;
-  p2p_dv_MESSAGE_Disconnect *disconnect_message;
-
-  if (memcmp(&notify->identity, &disconnected->identity, sizeof(struct GNUNET_PeerIdentity)) == 0)
-    return GNUNET_YES; /* Don't send disconnect message to peer that disconnected! */
-
-  pending_message = GNUNET_malloc(sizeof(struct PendingMessage) + sizeof(p2p_dv_MESSAGE_Disconnect));
-  pending_message->msg = (struct GNUNET_MessageHeader *)&pending_message[1];
-  pending_message->importance = default_dv_priority;
-  pending_message->timeout = GNUNET_TIME_relative_get_forever();
-  memcpy(&pending_message->recipient, &notify->identity, sizeof(struct GNUNET_PeerIdentity));
-  pending_message->msg_size = sizeof(p2p_dv_MESSAGE_Disconnect);
-  disconnect_message = (p2p_dv_MESSAGE_Disconnect *)pending_message->msg;
-  disconnect_message->header.size = htons (sizeof (p2p_dv_MESSAGE_Disconnect));
-  disconnect_message->header.type = htons (GNUNET_MESSAGE_TYPE_DV_DISCONNECT);
-  disconnect_message->peer_id = htonl(disconnect_context->distant->our_id);
-
-  GNUNET_CONTAINER_DLL_insert_after (core_pending_head,
-                                     core_pending_tail,
-                                     core_pending_tail,
-                                     pending_message);
-
-  GNUNET_SCHEDULER_add_now(sched, try_core_send, NULL);
-  /*if (core_transmit_handle == NULL)
-    core_transmit_handle = GNUNET_CORE_notify_transmit_ready(coreAPI, default_dv_priority, GNUNET_TIME_relative_get_forever(), &notify->identity, sizeof(p2p_dv_MESSAGE_Disconnect), &core_transmit_notify, NULL);*/
-
+  struct DirectNeighbor *neighbor = cls;
+  struct Route *route = value;
+
+  if (route->next_hop != neighbor)
+    return GNUNET_YES; /* not affected */
+  GNUNET_assert (GNUNET_YES ==
+                GNUNET_CONTAINER_multipeermap_remove (all_routes, key, value));
+  release_route (route);
+  send_disconnect_to_plugin (&route->target.peer);
+  GNUNET_free (route);
   return GNUNET_YES;
 }
 
   return GNUNET_YES;
 }
 
-/**
- * Multihashmap iterator for freeing extended neighbors.
- *
- * @param cls NULL
- * @param key key value stored under
- * @param value the distant neighbor to be freed
- *
- * @return GNUNET_YES to continue iteration, GNUNET_NO to stop
- */
-static int free_extended_neighbors (void *cls,
-                                    const GNUNET_HashCode * key,
-                                    void *value)
-{
-  struct DistantNeighbor *distant = value;
-  distant_neighbor_free(distant);
-  return GNUNET_YES;
-}
 
 /**
 
 /**
- * Multihashmap iterator for freeing direct neighbors.
- *
- * @param cls NULL
- * @param key key value stored under
- * @param value the direct neighbor to be freed
+ * Handle the case that a direct connection to a peer is
+ * disrupted.  Remove all routes via that peer and
+ * stop the consensus with it.
  *
  *
- * @return GNUNET_YES to continue iteration, GNUNET_NO to stop
+ * @param neighbor peer that was disconnected (or at least is no
+ *    longer at distance 1)
  */
  */
-static int free_direct_neighbors (void *cls,
-                                    const GNUNET_HashCode * key,
-                                    void *value)
+static void
+handle_direct_disconnect (struct DirectNeighbor *neighbor)
 {
 {
-  struct DirectNeighbor *direct = value;
-  direct_neighbor_free(direct);
-  return GNUNET_YES;
+  GNUNET_CONTAINER_multipeermap_iterate (all_routes,
+                                        &cull_routes,
+                                         neighbor);
+  if (NULL != neighbor->cth)
+  {
+    GNUNET_CORE_notify_transmit_ready_cancel (neighbor->cth);
+    neighbor->cth = NULL;
+  }
+
+  if (NULL != neighbor->direct_route)
+  {
+    release_route (neighbor->direct_route);
+    GNUNET_free (neighbor->direct_route);
+    neighbor->direct_route = NULL;
+  }
+
+  if (NULL != neighbor->neighbor_table_consensus)
+  {
+    GNUNET_CONTAINER_multipeermap_iterate (neighbor->neighbor_table_consensus,
+                                          &free_targets,
+                                          NULL);
+    GNUNET_CONTAINER_multipeermap_destroy (neighbor->neighbor_table_consensus);
+    neighbor->neighbor_table_consensus = NULL;
+  }
+  if (NULL != neighbor->neighbor_table)
+  {
+    GNUNET_CONTAINER_multipeermap_iterate (neighbor->neighbor_table,
+                                          &free_targets,
+                                          NULL);
+    GNUNET_CONTAINER_multipeermap_destroy (neighbor->neighbor_table);
+    neighbor->neighbor_table = NULL;
+  }
+  if (NULL != neighbor->set_op)
+  {
+    GNUNET_SET_operation_cancel (neighbor->set_op);
+    neighbor->set_op = NULL;
+  }
+  if (NULL != neighbor->my_set)
+  {
+    GNUNET_SET_destroy (neighbor->my_set);
+    neighbor->my_set = NULL;
+  }
+  if (NULL != neighbor->listen_handle)
+  {
+    GNUNET_SET_listen_cancel (neighbor->listen_handle);
+    neighbor->listen_handle = NULL;
+  }
+  if (GNUNET_SCHEDULER_NO_TASK != neighbor->initiate_task)
+  {
+    GNUNET_SCHEDULER_cancel (neighbor->initiate_task);
+    neighbor->initiate_task = GNUNET_SCHEDULER_NO_TASK;
+  }
 }
 
 
 /**
 }
 
 
 /**
- * Task run during shutdown.
+ * Function that is called with QoS information about an address; used
+ * to update our current distance to another peer.
  *
  *
- * @param cls unused
- * @param tc unused
+ * @param cls closure
+ * @param address the address
+ * @param active is this address in active use
+ * @param bandwidth_out assigned outbound bandwidth for the connection
+ * @param bandwidth_in assigned inbound bandwidth for the connection
+ * @param ats performance data for the address (as far as known)
+ * @param ats_count number of performance records in 'ats'
  */
 static void
  */
 static void
-shutdown_task (void *cls,
-               const struct GNUNET_SCHEDULER_TaskContext *tc)
-{
-#if DEBUG_DV
-  GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "calling CORE_DISCONNECT\n");
-  GNUNET_CONTAINER_multihashmap_iterate(extended_neighbors, &print_neighbors, NULL);
-#endif
-  GNUNET_CONTAINER_multihashmap_iterate(extended_neighbors, &free_extended_neighbors, NULL);
-  GNUNET_CONTAINER_multihashmap_destroy(extended_neighbors);
-  GNUNET_CONTAINER_multihashmap_iterate(direct_neighbors, &free_direct_neighbors, NULL);
-  GNUNET_CONTAINER_multihashmap_destroy(direct_neighbors);
-
-  GNUNET_CONTAINER_heap_destroy(neighbor_max_heap);
-  GNUNET_CONTAINER_heap_destroy(neighbor_min_heap);
-
-  GNUNET_CORE_disconnect (coreAPI);
-  GNUNET_PEERINFO_disconnect(peerinfo_handle);
-  GNUNET_SERVER_mst_destroy(coreMST);
-  GNUNET_free_non_null(my_short_id);
-#if DEBUG_DV
-  GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "CORE_DISCONNECT completed\n");
-#endif
-}
-
-/**
- * To be called on core init/fail.
- */
-void core_init (void *cls,
-                struct GNUNET_CORE_Handle * server,
-                const struct GNUNET_PeerIdentity *identity,
-                const struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded * publicKey)
+handle_ats_update (void *cls,
+                  const struct GNUNET_HELLO_Address *address,
+                  int active,
+                  struct GNUNET_BANDWIDTH_Value32NBO bandwidth_out,
+                  struct GNUNET_BANDWIDTH_Value32NBO bandwidth_in,
+                  const struct GNUNET_ATS_Information *ats,
+                  uint32_t ats_count)
 {
 {
+  struct DirectNeighbor *neighbor;
+  uint32_t distance;
 
 
-  if (server == NULL)
+  if (GNUNET_NO == active)
+       return;
+  distance = get_atsi_distance (ats, ats_count);
+  /*
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "ATS says distance to %s is %u\n",
+             GNUNET_i2s (&address->peer),
+             (unsigned int) distance);*/
+  /* check if entry exists */
+  neighbor = GNUNET_CONTAINER_multipeermap_get (direct_neighbors,
+                                               &address->peer);
+  if (NULL != neighbor)
+  {
+    if ( (DIRECT_NEIGHBOR_COST == neighbor->distance) &&
+        (DIRECT_NEIGHBOR_COST == distance) )
+      return; /* no change */
+    if (DIRECT_NEIGHBOR_COST == neighbor->distance)
     {
     {
-      GNUNET_SCHEDULER_cancel(sched, cleanup_task);
-      GNUNET_SCHEDULER_add_now(sched, &shutdown_task, NULL);
+      neighbor->distance = distance;
+      GNUNET_STATISTICS_update (stats,
+                               "# peers connected (1-hop)",
+                               -1, GNUNET_NO);
+      handle_direct_disconnect (neighbor);
+      GNUNET_CONTAINER_multipeermap_iterate (direct_neighbors,
+                                            &refresh_routes,
+                                            NULL);
       return;
     }
       return;
     }
-#if DEBUG_DV
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "%s: Core connection initialized, I am peer: %s\n", "dv", GNUNET_i2s(identity));
-#endif
-  memcpy(&my_identity, identity, sizeof(struct GNUNET_PeerIdentity));
-  my_short_id = GNUNET_strdup(GNUNET_i2s(&my_identity));
-  coreAPI = server;
+    neighbor->distance = distance;
+    if (DIRECT_NEIGHBOR_COST != neighbor->distance)
+      return;
+    if (GNUNET_YES != neighbor->connected)
+      return;
+    handle_direct_connect (neighbor);
+    return;
+  }
+  neighbor = GNUNET_new (struct DirectNeighbor);
+  neighbor->peer = address->peer;
+  GNUNET_assert (GNUNET_YES ==
+                GNUNET_CONTAINER_multipeermap_put (direct_neighbors,
+                                                   &address->peer,
+                                                   neighbor,
+                                                   GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
+  neighbor->connected = GNUNET_NO; /* not yet */
+  neighbor->distance = distance;
 }
 
 
 }
 
 
-#if PKEY_NO_NEIGHBOR_ON_ADD
 /**
 /**
- * Iterator over hash map entries.
+ * Check if a target was removed from the set of the other peer; if so,
+ * if we also used it for our route, we need to remove it from our
+ * 'all_routes' set (and later check if an alternative path now exists).
  *
  *
- * @param cls closure
- * @param key current key code
- * @param value value in the hash map
- * @return GNUNET_YES if we should continue to
- *         iterate,
- *         GNUNET_NO if not.
+ * @param cls the 'struct DirectNeighbor'
+ * @param key peer identity for the target
+ * @param value a 'struct Target' previously reachable via the given neighbor
  */
  */
-static int add_pkey_to_extended (void *cls,
-                                 const GNUNET_HashCode * key,
-                                 void *value)
+static int
+check_target_removed (void *cls,
+                     const struct GNUNET_PeerIdentity *key,
+                     void *value)
 {
 {
-  struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded *pkey = cls;
-  struct DistantNeighbor *distant_neighbor = value;
+  struct DirectNeighbor *neighbor = cls;
+  struct Target *new_target;
+  struct Route *current_route;
 
 
-  if (distant_neighbor->pkey == NULL)
+  new_target = GNUNET_CONTAINER_multipeermap_get (neighbor->neighbor_table_consensus,
+                                                 key);
+  if (NULL == new_target)
   {
   {
-    distant_neighbor->pkey = GNUNET_malloc(sizeof(struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded));
-    memcpy(distant_neighbor->pkey, pkey, sizeof(struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded));
+    /* target was revoked, check if it was used */
+    current_route = GNUNET_CONTAINER_multipeermap_get (all_routes,
+                                                      key);
+    if ( (NULL == current_route) ||
+        (current_route->next_hop != neighbor) )
+    {
+      /* didn't matter, wasn't used */
+      return GNUNET_OK;
+    }
+    /* remove existing route */
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+               "Lost route to %s\n",
+               GNUNET_i2s (&current_route->target.peer));
+    GNUNET_assert (GNUNET_YES ==
+                  GNUNET_CONTAINER_multipeermap_remove (all_routes, key, current_route));
+    send_disconnect_to_plugin (&current_route->target.peer);
+    GNUNET_free (current_route);
+    neighbor->target_removed = GNUNET_YES;
+    return GNUNET_OK;
   }
   }
-
-  return GNUNET_YES;
+  return GNUNET_OK;
 }
 }
-#endif
+
 
 /**
 
 /**
- * Iterator over hash map entries.
+ * Check if a target was added to the set of the other peer; if it
+ * was added or impoves the existing route, do the needed updates.
  *
  *
- * @param cls closure
- * @param key current key code
- * @param value value in the hash map
- * @return GNUNET_YES if we should continue to
- *         iterate,
- *         GNUNET_NO if not.
+ * @param cls the 'struct DirectNeighbor'
+ * @param key peer identity for the target
+ * @param value a 'struct Target' now reachable via the given neighbor
  */
  */
-static int update_matching_neighbors (void *cls,
-                                      const GNUNET_HashCode * key,
-                                      void *value)
+static int
+check_target_added (void *cls,
+                   const struct GNUNET_PeerIdentity *key,
+                   void *value)
 {
 {
-  struct NeighborUpdateInfo * update_info = cls;
-  struct DistantNeighbor *distant_neighbor = value;
-
-  if (update_info->referrer == distant_neighbor->referrer) /* Direct neighbor matches, update it's info and return GNUNET_NO */
+  struct DirectNeighbor *neighbor = cls;
+  struct Target *target = value;
+  struct Route *current_route;
+
+  /* target was revoked, check if it was used */
+  current_route = GNUNET_CONTAINER_multipeermap_get (all_routes,
+                                                    key);
+  if (NULL != current_route)
   {
   {
-    /* same referrer, cost change! */
-    GNUNET_CONTAINER_heap_update_cost (neighbor_max_heap,
-                                       update_info->neighbor->max_loc, update_info->cost);
-    GNUNET_CONTAINER_heap_update_cost (neighbor_min_heap,
-                                       update_info->neighbor->min_loc, update_info->cost);
-    update_info->neighbor->last_activity = update_info->now;
-    update_info->neighbor->cost = update_info->cost;
-    update_info->neighbor->referrer_id = update_info->referrer_peer_id;
-    return GNUNET_NO;
+    /* route exists */
+    if (current_route->next_hop == neighbor)
+    {
+      /* we had the same route before, no change */
+      if (ntohl (target->distance) + 1 != ntohl (current_route->target.distance))
+      {
+       current_route->target.distance = htonl (ntohl (target->distance) + 1);
+       send_distance_change_to_plugin (&target->peer, ntohl (target->distance) + 1);
+      }
+      return GNUNET_OK;
+    }
+    if (ntohl (current_route->target.distance) >= ntohl (target->distance) + 1)
+    {
+      /* alternative, shorter route exists, ignore */
+      return GNUNET_OK;
+    }
+    /* new route is better than the existing one, take over! */
+    /* NOTE: minor security issue: malicious peers may advertise
+       very short routes to take over longer paths; as we don't
+       check that the shorter routes actually work, a malicious
+       direct neighbor can use this to DoS our long routes */
+    current_route->next_hop = neighbor;
+    current_route->target.distance = htonl (ntohl (target->distance) + 1);
+    send_distance_change_to_plugin (&target->peer, ntohl (target->distance) + 1);
+    return GNUNET_OK;
   }
   }
-
-  return GNUNET_YES;
+  /* new route */
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "Discovered new route to %s using %u hops\n",
+             GNUNET_i2s (&target->peer),
+             (unsigned int) (ntohl (target->distance) + 1));
+  current_route = GNUNET_new (struct Route);
+  current_route->next_hop = neighbor;
+  current_route->target.peer = target->peer;
+  current_route->target.distance = htonl (ntohl (target->distance) + 1);
+  GNUNET_assert (GNUNET_YES ==
+                GNUNET_CONTAINER_multipeermap_put (all_routes,
+                                                   &current_route->target.peer,
+                                                   current_route,
+                                                   GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
+  send_connect_to_plugin (&current_route->target.peer,
+                         ntohl (current_route->target.distance));
+  return GNUNET_OK;
 }
 
 
 /**
 }
 
 
 /**
- * Iterate over all current direct peers, add DISTANT newly connected
- * peer to the fast gossip list for that peer so we get DV routing
- * information out as fast as possible!
- *
- * @param cls the newly connected neighbor we will gossip about
- * @param key the hashcode of the peer
- * @param value the direct neighbor we should gossip to
+ * Callback for set operation results. Called for each element
+ * in the result set.
+ * We have learned a new route from the other peer.  Add it to the
+ * route set we're building.
  *
  *
- * @return GNUNET_YES to continue iteration, GNUNET_NO otherwise
+ * @param cls the 'struct DirectNeighbor' we're building the consensus with
+ * @param element a result element, only valid if status is GNUNET_SET_STATUS_OK
+ * @param status see enum GNUNET_SET_Status
  */
  */
-static int add_distant_all_direct_neighbors (void *cls,
-                                     const GNUNET_HashCode * key,
-                                     void *value)
+static void
+handle_set_union_result (void *cls,
+                        const struct GNUNET_SET_Element *element,
+                        enum GNUNET_SET_Status status)
 {
 {
-  struct DirectNeighbor *direct = (struct DirectNeighbor *)value;
-  struct DistantNeighbor *distant = (struct DistantNeighbor *)cls;
-  struct NeighborSendContext *send_context = direct->send_context;
-  struct FastGossipNeighborList *gossip_entry;
-#if DEBUG_DV
-  char *encPeerAbout;
-  char *encPeerTo;
-#endif
-
-  if (distant == NULL)
+  struct DirectNeighbor *neighbor = cls;
+  struct Target *target;
+  char *status_str;
+
+  switch (status) {
+    case GNUNET_SET_STATUS_OK:
+      status_str = "GNUNET_SET_STATUS_OK";
+      break;
+    case GNUNET_SET_STATUS_TIMEOUT:
+      status_str = "GNUNET_SET_STATUS_TIMEOUT";
+      break;
+    case GNUNET_SET_STATUS_FAILURE:
+      status_str = "GNUNET_SET_STATUS_FAILURE";
+      break;
+    case GNUNET_SET_STATUS_HALF_DONE:
+      status_str = "GNUNET_SET_STATUS_HALF_DONE";
+      break;
+    case GNUNET_SET_STATUS_DONE:
+      status_str = "GNUNET_SET_STATUS_DONE";
+      break;
+    default:
+      status_str = "UNDEFINED";
+      break;
+  }
+
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "Got SET union result: %s\n",
+             status_str);
+  switch (status)
+  {
+  case GNUNET_SET_STATUS_OK:
+    if (sizeof (struct Target) != element->size)
+    {
+      GNUNET_break_op (0);
+      return;
+    }
+    target = GNUNET_new (struct Target);
+    memcpy (target, element->data, sizeof (struct Target));
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "Received information about peer `%s' with distance %u\n",
+                GNUNET_i2s (&target->peer), ntohl(target->distance));
+    if (NULL == neighbor->neighbor_table_consensus)
+      neighbor->neighbor_table_consensus = GNUNET_CONTAINER_multipeermap_create (10, GNUNET_NO);
+    if (GNUNET_YES !=
+       GNUNET_CONTAINER_multipeermap_put (neighbor->neighbor_table_consensus,
+                                          &target->peer,
+                                          target,
+                                          GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY))
     {
     {
-      return GNUNET_YES;
+      GNUNET_break_op (0);
+      GNUNET_free (target);
     }
     }
-
-  if (memcmp(&direct->identity, &distant->identity, sizeof(struct GNUNET_PeerIdentity)) == 0)
+    break;
+  case GNUNET_SET_STATUS_TIMEOUT:
+  case GNUNET_SET_STATUS_FAILURE:
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+               "Failed to establish DV union, will try again later\n");
+    neighbor->set_op = NULL;
+    if (NULL != neighbor->neighbor_table_consensus)
     {
     {
-      return GNUNET_YES; /* Don't gossip to a peer about itself! */
+      GNUNET_CONTAINER_multipeermap_iterate (neighbor->neighbor_table_consensus,
+                                            &free_targets,
+                                            NULL);
+      GNUNET_CONTAINER_multipeermap_destroy (neighbor->neighbor_table_consensus);
+      neighbor->neighbor_table_consensus = NULL;
     }
     }
-
-#if SUPPORT_HIDING
-  if (distant->hidden == GNUNET_YES)
-    return GNUNET_YES; /* This peer should not be gossipped about (hidden) */
-#endif
-  gossip_entry = GNUNET_malloc(sizeof(struct FastGossipNeighborList));
-  gossip_entry->about = distant;
-
-  GNUNET_CONTAINER_DLL_insert_after(send_context->fast_gossip_list_head,
-                                    send_context->fast_gossip_list_tail,
-                                    send_context->fast_gossip_list_tail,
-                                    gossip_entry);
-#if DEBUG_DV
-  encPeerAbout = GNUNET_strdup(GNUNET_i2s(&distant->identity));
-  encPeerTo = GNUNET_strdup(GNUNET_i2s(&direct->identity));
-
-  GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "%s: Fast send info about peer %s id %u for directly connected peer %s\n",
-             GNUNET_i2s(&my_identity),
-             encPeerAbout, distant->our_id, encPeerTo);
-  GNUNET_free(encPeerAbout);
-  GNUNET_free(encPeerTo);
-#endif
-  /*if (send_context->task != GNUNET_SCHEDULER_NO_TASK)
-    GNUNET_SCHEDULER_cancel(sched, send_context->task);*/
-
-  send_context->task = GNUNET_SCHEDULER_add_now(sched, &neighbor_send_task, send_context);
-  return GNUNET_YES;
+    if (0 < memcmp (&neighbor->peer,
+                   &my_identity,
+                   sizeof (struct GNUNET_PeerIdentity)))
+      neighbor->initiate_task = GNUNET_SCHEDULER_add_delayed (GNUNET_DV_CONSENSUS_FREQUENCY,
+                                                             &initiate_set_union,
+                                                             neighbor);
+    break;
+  case GNUNET_SET_STATUS_HALF_DONE:
+    break;
+  case GNUNET_SET_STATUS_DONE:
+    /* we got all of our updates; integrate routing table! */
+    neighbor->target_removed = GNUNET_NO;
+    if (NULL == neighbor->neighbor_table_consensus)
+      neighbor->neighbor_table_consensus = GNUNET_CONTAINER_multipeermap_create (10, GNUNET_NO);
+    if (NULL != neighbor->neighbor_table)
+      GNUNET_CONTAINER_multipeermap_iterate (neighbor->neighbor_table,
+                                           &check_target_removed,
+                                           neighbor);
+    if (GNUNET_YES == neighbor->target_removed)
+    {
+      /* check if we got an alternative for the removed routes */
+      GNUNET_CONTAINER_multipeermap_iterate (direct_neighbors,
+                                             &refresh_routes,
+                                             NULL);
+    }
+    /* add targets that appeared (and check for improved routes) */
+    GNUNET_CONTAINER_multipeermap_iterate (neighbor->neighbor_table_consensus,
+                                           &check_target_added,
+                                           neighbor);
+    if (NULL != neighbor->neighbor_table)
+    {
+      GNUNET_CONTAINER_multipeermap_iterate (neighbor->neighbor_table,
+                                             &free_targets,
+                                             NULL);
+      GNUNET_CONTAINER_multipeermap_destroy (neighbor->neighbor_table);
+      neighbor->neighbor_table = NULL;
+    }
+    neighbor->neighbor_table = neighbor->neighbor_table_consensus;
+    neighbor->neighbor_table_consensus = NULL;
+
+    /* operation done, schedule next run! */
+    neighbor->set_op = NULL;
+    if (0 < memcmp (&neighbor->peer,
+                   &my_identity,
+                   sizeof (struct GNUNET_PeerIdentity)))
+      neighbor->initiate_task = GNUNET_SCHEDULER_add_delayed (GNUNET_DV_CONSENSUS_FREQUENCY,
+                                                             &initiate_set_union,
+                                                             neighbor);
+    break;
+  default:
+    GNUNET_break (0);
+    return;
+  }
 }
 
 }
 
+
 /**
 /**
- * Callback for hello address creation.
+ * Start creating a new DV set union construction, our neighbour has
+ * asked for it (callback for listening peer).
  *
  *
- * @param cls closure, a struct HelloContext
- * @param max maximum number of bytes that can be written to buf
- * @param buf where to write the address information
- *
- * @return number of bytes written, 0 to signal the
- *         end of the iteration.
+ * @param cls the 'struct DirectNeighbor' of the peer we're building
+ *        a routing consensus with
+ * @param other_peer the other peer
+ * @param context_msg message with application specific information from
+ *        the other peer
+ * @param request request from the other peer, use GNUNET_SET_accept
+ *        to accept it, otherwise the request will be refused
+ *        Note that we don't use a return value here, as it is also
+ *        necessary to specify the set we want to do the operation with,
+ *        whith sometimes can be derived from the context message.
+ *        Also necessary to specify the timeout.
  */
  */
-static size_t
-generate_hello_address (void *cls, size_t max, void *buf)
+static void
+listen_set_union (void *cls,
+                 const struct GNUNET_PeerIdentity *other_peer,
+                 const struct GNUNET_MessageHeader *context_msg,
+                 struct GNUNET_SET_Request *request)
 {
 {
-  struct HelloContext *hello_context = cls;
-  char *addr_buffer;
-  size_t offset;
-  size_t size;
-  size_t ret;
-
-  if (hello_context->addresses_to_add == 0)
-    return 0;
+  struct DirectNeighbor *neighbor = cls;
 
 
-  /* Hello "address" will be concatenation of distant peer and direct peer identities */
-  size = 2 * sizeof(struct GNUNET_PeerIdentity);
-  GNUNET_assert(max >= size);
-
-  addr_buffer = GNUNET_malloc(size);
-  offset = 0;
-  /* Copy the distant peer identity to buffer */
-  memcpy(addr_buffer, &hello_context->distant_peer, sizeof(struct GNUNET_PeerIdentity));
-  offset += sizeof(struct GNUNET_PeerIdentity);
-  /* Copy the direct peer identity to buffer */
-  memcpy(&addr_buffer[offset], hello_context->direct_peer, sizeof(struct GNUNET_PeerIdentity));
-  ret = GNUNET_HELLO_add_address ("dv",
-                                  GNUNET_TIME_relative_to_absolute
-                                  (GNUNET_TIME_UNIT_HOURS), addr_buffer, size,
-                                  buf, max);
-
-  hello_context->addresses_to_add--;
-
-  GNUNET_free(addr_buffer);
-  return ret;
+  if (NULL == request)
+    return; /* why??? */
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "Starting to create consensus with %s\n",
+             GNUNET_i2s (&neighbor->peer));
+  if (NULL != neighbor->set_op)
+  {
+    GNUNET_SET_operation_cancel (neighbor->set_op);
+    neighbor->set_op = NULL;
+  }
+  if (NULL != neighbor->my_set)
+  {
+    GNUNET_SET_destroy (neighbor->my_set);
+    neighbor->my_set = NULL;
+  }
+  neighbor->my_set = GNUNET_SET_create (cfg,
+                                       GNUNET_SET_OPERATION_UNION);
+  neighbor->set_op = GNUNET_SET_accept (request,
+                                       GNUNET_SET_RESULT_ADDED,
+                                       &handle_set_union_result,
+                                       neighbor);
+  neighbor->consensus_insertion_offset = 0;
+  neighbor->consensus_insertion_distance = 0;
+  neighbor->consensus_elements = 0;
+  build_set (neighbor);
 }
 
 
 /**
 }
 
 
 /**
- * Handles when a peer is either added due to being newly connected
- * or having been gossiped about, also called when the cost for a neighbor
- * needs to be updated.
- *
- * @param peer identity of the peer whose info is being added/updated
- * @param pkey public key of the peer whose info is being added/updated
- * @param referrer_peer_id id to use when sending to 'peer'
- * @param referrer if this is a gossiped peer, who did we hear it from?
- * @param cost the cost of communicating with this peer via 'referrer'
+ * Start creating a new DV set union by initiating the connection.
  *
  *
- * @return the added neighbor, the updated neighbor or NULL (neighbor
- *         not added)
+ * @param cls the 'struct DirectNeighbor' of the peer we're building
+ *        a routing consensus with
+ * @param tc scheduler context
  */
  */
-static struct DistantNeighbor *
-addUpdateNeighbor (const struct GNUNET_PeerIdentity * peer, struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded *pkey,
-                   unsigned int referrer_peer_id,
-                   struct DirectNeighbor *referrer, unsigned int cost)
+static void
+initiate_set_union (void *cls,
+                   const struct GNUNET_SCHEDULER_TaskContext *tc)
 {
 {
-  struct DistantNeighbor *neighbor;
-  struct DistantNeighbor *max;
-  struct GNUNET_TIME_Absolute now;
-  struct NeighborUpdateInfo *neighbor_update;
-  struct HelloContext *hello_context;
-  struct GNUNET_HELLO_Message *hello_msg;
-  unsigned int our_id;
-  char *addr1;
-  char *addr2;
-
-#if DEBUG_DV_PEER_NUMBERS
-  char *encAbout;
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "%s Received sender id (%u)!\n", "DV SERVICE", referrer_peer_id);
-#endif
-
-  now = GNUNET_TIME_absolute_get ();
-  neighbor = GNUNET_CONTAINER_multihashmap_get (extended_neighbors,
-                                                &peer->hashPubKey);
-  neighbor_update = GNUNET_malloc(sizeof(struct NeighborUpdateInfo));
-  neighbor_update->neighbor = neighbor;
-  neighbor_update->cost = cost;
-  neighbor_update->now = now;
-  neighbor_update->referrer = referrer;
-  neighbor_update->referrer_peer_id = referrer_peer_id;
-
-  if (neighbor != NULL)
-    {
-#if USE_PEER_ID
-      memcpy(&our_id, &neighbor->identity, sizeof(unsigned int));
-#else
-      our_id = neighbor->our_id;
-#endif
-    }
-  else
-    {
-#if USE_PEER_ID
-      memcpy(&our_id, peer, sizeof(unsigned int));
-#else
-      our_id = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_STRONG, RAND_MAX - 1) + 1;
-#endif
-    }
-
-  /* Either we do not know this peer, or we already do but via a different immediate peer */
-  if ((neighbor == NULL) ||
-      (GNUNET_CONTAINER_multihashmap_get_multiple(extended_neighbors,
-                                                  &peer->hashPubKey,
-                                                  &update_matching_neighbors,
-                                                  neighbor_update) != GNUNET_SYSERR))
-    {
-
-#if AT_MOST_ONE
-    if ((neighbor != NULL) && (cost < neighbor->cost)) /* New cost is less than old, remove old */
-      {
-        distant_neighbor_free(neighbor);
-      }
-    else if (neighbor != NULL) /* Only allow one DV connection to each peer */
-      {
-        return NULL;
-      }
-#endif
-      /* new neighbor! */
-      if (cost > fisheye_depth)
-        {
-          /* too costly */
-          GNUNET_free(neighbor_update);
-          return NULL;
-        }
-
-#if DEBUG_DV_PEER_NUMBERS
-      encAbout = GNUNET_strdup(GNUNET_i2s(peer));
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                  "%s: %s Chose NEW id (%u) for peer %s!\n", GNUNET_i2s(&my_identity), "DV SERVICE", our_id, encAbout);
-      GNUNET_free(encAbout);
-#endif
-
-      if (max_table_size <=
-          GNUNET_CONTAINER_multihashmap_size (extended_neighbors))
-        {
-          /* remove most expensive entry */
-          max = GNUNET_CONTAINER_heap_peek (neighbor_max_heap);
-          GNUNET_assert(max != NULL);
-          if (cost > max->cost)
-            {
-              /* new entry most expensive, don't create */
-              GNUNET_free(neighbor_update);
-              return NULL;
-            }
-          if (max->cost > 1)
-            {
-              /* only free if this is not a direct connection;
-                 we could theoretically have more direct
-                 connections than DV entries allowed total! */
-              distant_neighbor_free (max);
-            }
-        }
-
-      neighbor = GNUNET_malloc (sizeof (struct DistantNeighbor));
-      GNUNET_CONTAINER_DLL_insert (referrer->referee_head,
-                         referrer->referee_tail, neighbor);
-      neighbor->max_loc = GNUNET_CONTAINER_heap_insert (neighbor_max_heap,
-                                                        neighbor, cost);
-      neighbor->min_loc = GNUNET_CONTAINER_heap_insert (neighbor_min_heap,
-                                                        neighbor, cost);
-      neighbor->referrer = referrer;
-      memcpy (&neighbor->identity, peer, sizeof (struct GNUNET_PeerIdentity));
-      if (pkey != NULL) /* pkey will be null on direct neighbor addition */
-      {
-        neighbor->pkey = GNUNET_malloc(sizeof(struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded));
-        memcpy (neighbor->pkey, pkey, sizeof(struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded));
-      }
-      else
-        neighbor->pkey = pkey;
-
-      neighbor->last_activity = now;
-      neighbor->cost = cost;
-      neighbor->referrer_id = referrer_peer_id;
-      neighbor->our_id = our_id;
-      neighbor->hidden =
-        (cost == DIRECT_NEIGHBOR_COST) ? (GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 4) ==
-                       0) : GNUNET_NO;
-
-      GNUNET_CONTAINER_multihashmap_put (extended_neighbors, &peer->hashPubKey,
-                                 neighbor,
-                                 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
-
-      if (cost != DIRECT_NEIGHBOR_COST)
-        {
-          /* Added neighbor, now send HELLO to transport */
-          hello_context = GNUNET_malloc(sizeof(struct HelloContext));
-          hello_context->direct_peer = &referrer->identity;
-          memcpy(&hello_context->distant_peer, peer, sizeof(struct GNUNET_PeerIdentity));
-          hello_context->addresses_to_add = 1;
-          hello_msg = GNUNET_HELLO_create(pkey, &generate_hello_address, hello_context);
-          GNUNET_assert(memcmp(hello_context->direct_peer, &hello_context->distant_peer, sizeof(struct GNUNET_PeerIdentity)) != 0);
-          addr1 = GNUNET_strdup(GNUNET_i2s(hello_context->direct_peer));
-          addr2 = GNUNET_strdup(GNUNET_i2s(&hello_context->distant_peer));
-          GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "%s: GIVING HELLO size %d for %s via %s to TRANSPORT\n", my_short_id, GNUNET_HELLO_size(hello_msg), addr2, addr1);
-          GNUNET_free(addr1);
-          GNUNET_free(addr2);
-          send_to_plugin(hello_context->direct_peer, GNUNET_HELLO_get_header(hello_msg), GNUNET_HELLO_size(hello_msg), &hello_context->distant_peer, cost);
-          GNUNET_free(hello_context);
-          GNUNET_free(hello_msg);
-        }
-
-    }
-  else
-    {
-#if DEBUG_DV_GOSSIP
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                  "%s: Already know peer %s distance %d, referrer id %d!\n", "dv", GNUNET_i2s(peer), cost, referrer_peer_id);
-#endif
-    }
-#if DEBUG_DV
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                "%s: Size of extended_neighbors is %d\n", "dv", GNUNET_CONTAINER_multihashmap_size(extended_neighbors));
-#endif
+  struct DirectNeighbor *neighbor = cls;
 
 
-  GNUNET_free(neighbor_update);
-  return neighbor;
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "Initiating SET union with peer `%s'\n",
+             GNUNET_i2s (&neighbor->peer));
+  neighbor->initiate_task = GNUNET_SCHEDULER_NO_TASK;
+  neighbor->my_set = GNUNET_SET_create (cfg,
+                                       GNUNET_SET_OPERATION_UNION);
+  neighbor->set_op = GNUNET_SET_prepare (&neighbor->peer,
+                                         &neighbor->real_session_id,
+                                         NULL,
+                                         0 /* FIXME: salt */,
+                                         GNUNET_SET_RESULT_ADDED,
+                                         &handle_set_union_result,
+                                         neighbor);
+  neighbor->consensus_insertion_offset = 0;
+  neighbor->consensus_insertion_distance = 0;
+  neighbor->consensus_elements = 0;
+  build_set (neighbor);
 }
 
 
 /**
 }
 
 
 /**
- * Core handler for dv disconnect messages.  These will be used
- * by us to tell transport via the dv plugin that a peer can
- * no longer be contacted by us via a certain address.  We should
- * then propagate these messages on, given that the distance to
- * the peer indicates we would have gossiped about it to others.
+ * Core handler for DV data messages.  Whatever this message
+ * contains all we really have to do is rip it out of its
+ * DV layering and give it to our pal the DV plugin to report
+ * in with.
  *
  * @param cls closure
  * @param peer peer which sent the message (immediate sender)
  * @param message the message
  *
  * @param cls closure
  * @param peer peer which sent the message (immediate sender)
  * @param message the message
- * @param latency the latency of the connection we received the message from
- * @param distance the distance to the immediate peer
+ * @return GNUNET_OK on success, GNUNET_SYSERR if the other peer violated the protocol
  */
  */
-static int handle_dv_disconnect_message (void *cls,
-                                         const struct GNUNET_PeerIdentity *peer,
-                                         const struct GNUNET_MessageHeader *message,
-                                         struct GNUNET_TIME_Relative latency,
-                                         uint32_t distance)
+static int
+handle_dv_route_message (void *cls, const struct GNUNET_PeerIdentity *peer,
+                        const struct GNUNET_MessageHeader *message)
 {
 {
-  struct DirectNeighbor *referrer;
-  struct DistantNeighbor *distant;
-  p2p_dv_MESSAGE_Disconnect *enc_message = (p2p_dv_MESSAGE_Disconnect *)message;
+  const struct RouteMessage *rm;
+  const struct GNUNET_MessageHeader *payload;
+  struct Route *route;
 
 
-  if (ntohs (message->size) < sizeof (p2p_dv_MESSAGE_Disconnect))
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "Handling DV message\n");
+  if (ntohs (message->size) < sizeof (struct RouteMessage) + sizeof (struct GNUNET_MessageHeader))
+  {
+    GNUNET_break_op (0);
+    return GNUNET_SYSERR;
+  }
+  rm = (const struct RouteMessage *) message;
+  payload = (const struct GNUNET_MessageHeader *) &rm[1];
+  if (ntohs (message->size) != sizeof (struct RouteMessage) + ntohs (payload->size))
+  {
+    GNUNET_break_op (0);
+    return GNUNET_SYSERR;
+  }
+  if (0 == memcmp (&rm->target,
+                  &my_identity,
+                  sizeof (struct GNUNET_PeerIdentity)))
+  {
+    /* message is for me, check reverse route! */
+    route = GNUNET_CONTAINER_multipeermap_get (all_routes,
+                                              &rm->sender);
+    if (NULL == route)
     {
     {
-      return GNUNET_SYSERR;     /* invalid message */
+      /* don't have reverse route, drop */
+      GNUNET_STATISTICS_update (stats,
+                               "# message discarded (no reverse route)",
+                               1, GNUNET_NO);
+      return GNUNET_OK;
     }
     }
-
-  referrer = GNUNET_CONTAINER_multihashmap_get (direct_neighbors,
-                                                &peer->hashPubKey);
-  if (referrer == NULL)
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+               "Delivering %u bytes to myself!\n",
+               ntohs (payload->size));
+    send_data_to_plugin (payload,
+                        &rm->sender,
+                        ntohl (route->target.distance));
     return GNUNET_OK;
     return GNUNET_OK;
-
-  distant = referrer->referee_head;
-  while (distant != NULL)
-    {
-      if (distant->referrer_id == ntohl(enc_message->peer_id))
-        {
-          distant_neighbor_free(distant);
-        }
-      distant = referrer->referee_head;
-    }
-
+  }
+  route = GNUNET_CONTAINER_multipeermap_get (all_routes,
+                                            &rm->target);
+  if (NULL == route)
+  {
+    GNUNET_STATISTICS_update (stats,
+                             "# messages discarded (no route)",
+                             1, GNUNET_NO);
+    return GNUNET_OK;
+  }
+  if (ntohl (route->target.distance) > ntohl (rm->distance) + 1)
+  {
+    GNUNET_STATISTICS_update (stats,
+                             "# messages discarded (target too far)",
+                             1, GNUNET_NO);
+    return GNUNET_OK;
+  }
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "Forwarding message to %s\n",
+             GNUNET_i2s (&rm->target));
+  forward_payload (route->next_hop,
+                  ntohl (route->target.distance),
+                  0,
+                  &rm->target,
+                  &rm->sender,
+                  payload);
   return GNUNET_OK;
 }
 
 
 /**
   return GNUNET_OK;
 }
 
 
 /**
- * Core handler for dv gossip messages.  These will be used
- * by us to create a HELLO message for the newly peer containing
- * which direct peer we can connect through, and what the cost
- * is.  This HELLO will then be scheduled for validation by the
- * transport service so that it can be used by all others.
+ * Service server's handler for message send requests (which come
+ * bubbling up to us through the DV plugin).
  *
  * @param cls closure
  *
  * @param cls closure
- * @param peer peer which sent the message (immediate sender)
- * @param message the message
- * @param latency the latency of the connection we received the message from
- * @param distance the distance to the immediate peer
+ * @param client identification of the client
+ * @param message the actual message
  */
  */
-static int handle_dv_gossip_message (void *cls,
-                                     const struct GNUNET_PeerIdentity *peer,
-                                     const struct GNUNET_MessageHeader *message,
-                                     struct GNUNET_TIME_Relative latency,
-                                     uint32_t distance)
+static void
+handle_dv_send_message (void *cls, struct GNUNET_SERVER_Client *client,
+                        const struct GNUNET_MessageHeader *message)
 {
 {
-  struct DirectNeighbor *referrer;
-  p2p_dv_MESSAGE_NeighborInfo *enc_message = (p2p_dv_MESSAGE_NeighborInfo *)message;
-
-  if (ntohs (message->size) < sizeof (p2p_dv_MESSAGE_NeighborInfo))
-    {
-      return GNUNET_SYSERR;     /* invalid message */
-    }
+  struct Route *route;
+  const struct GNUNET_DV_SendMessage *msg;
+  const struct GNUNET_MessageHeader *payload;
 
 
-#if DEBUG_DV_GOSSIP_RECEIPT
-  char * encPeerAbout;
-  char * encPeerFrom;
-
-  encPeerAbout = GNUNET_strdup(GNUNET_i2s(&enc_message->neighbor));
-  encPeerFrom = GNUNET_strdup(GNUNET_i2s(peer));
+  if (ntohs (message->size) < sizeof (struct GNUNET_DV_SendMessage) + sizeof (struct GNUNET_MessageHeader))
+  {
+    GNUNET_break (0);
+    GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
+    return;
+  }
+  msg = (const struct GNUNET_DV_SendMessage *) message;
+  GNUNET_break (0 != ntohl (msg->uid));
+  payload = (const struct GNUNET_MessageHeader *) &msg[1];
+  if (ntohs (message->size) != sizeof (struct GNUNET_DV_SendMessage) + ntohs (payload->size))
+  {
+    GNUNET_break (0);
+    GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
+    return;
+  }
+  route = GNUNET_CONTAINER_multipeermap_get (all_routes,
+                                            &msg->target);
+  if (NULL == route)
+  {
+    /* got disconnected */
+    GNUNET_STATISTICS_update (stats,
+                             "# local messages discarded (no route)",
+                             1, GNUNET_NO);
+    send_ack_to_plugin (&msg->target, ntohl (msg->uid), GNUNET_YES);
+    GNUNET_SERVER_receive_done (client, GNUNET_OK);
+    return;
+  }
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "%s: Received %s message from peer %s about peer %s id %u distance %d!\n", GNUNET_i2s(&my_identity), "DV GOSSIP", encPeerFrom, encPeerAbout, ntohl(enc_message->neighbor_id), ntohl (enc_message->cost) + 1);
-  GNUNET_free(encPeerAbout);
-  GNUNET_free(encPeerFrom);
-#endif
-
-  referrer = GNUNET_CONTAINER_multihashmap_get (direct_neighbors,
-                                                &peer->hashPubKey);
-  if (referrer == NULL)
-    return GNUNET_OK;
-
-  addUpdateNeighbor (&enc_message->neighbor, &enc_message->pkey,
-                     ntohl (enc_message->neighbor_id),
-                     referrer, ntohl (enc_message->cost) + 1);
-
-  return GNUNET_OK;
+             "Forwarding %u bytes to %s\n",
+             ntohs (payload->size),
+             GNUNET_i2s (&msg->target));
+
+  forward_payload (route->next_hop,
+                  ntohl (route->target.distance),
+                  htonl (msg->uid),
+                  &msg->target,
+                  &my_identity,
+                  payload);
+  GNUNET_SERVER_receive_done (client, GNUNET_OK);
 }
 
 
 /**
 }
 
 
 /**
- * Iterate over all currently known peers, add them to the
- * fast gossip list for this peer so we get DV routing information
- * out as fast as possible!
- *
- * @param cls the direct neighbor we will gossip to
- * @param key the hashcode of the peer
- * @param value the distant neighbor we should add to the list
+ * Cleanup all of the data structures associated with a given neighbor.
  *
  *
- * @return GNUNET_YES to continue iteration, GNUNET_NO otherwise
+ * @param neighbor neighbor to clean up
  */
  */
-static int add_all_extended_peers (void *cls,
-                                   const GNUNET_HashCode * key,
-                                   void *value)
+static void
+cleanup_neighbor (struct DirectNeighbor *neighbor)
 {
 {
-  struct NeighborSendContext *send_context = (struct NeighborSendContext *)cls;
-  struct DistantNeighbor *distant = (struct DistantNeighbor *)value;
-  struct FastGossipNeighborList *gossip_entry;
-
-  if (memcmp(&send_context->toNeighbor->identity, &distant->identity, sizeof(struct GNUNET_PeerIdentity)) == 0)
-    return GNUNET_YES; /* Don't gossip to a peer about itself! */
-
-#if SUPPORT_HIDING
-  if (distant->hidden == GNUNET_YES)
-    return GNUNET_YES; /* This peer should not be gossipped about (hidden) */
-#endif
-  gossip_entry = GNUNET_malloc(sizeof(struct FastGossipNeighborList));
-  gossip_entry->about = distant;
-
-  GNUNET_CONTAINER_DLL_insert_after(send_context->fast_gossip_list_head,
-                                    send_context->fast_gossip_list_tail,
-                                    send_context->fast_gossip_list_tail,
-                                    gossip_entry);
+  struct PendingMessage *pending;
 
 
-  return GNUNET_YES;
+  while (NULL != (pending = neighbor->pm_head))
+  {
+    neighbor->pm_queue_size--;
+    GNUNET_CONTAINER_DLL_remove (neighbor->pm_head,
+                                neighbor->pm_tail,
+                                pending);
+    GNUNET_free (pending);
+  }
+  handle_direct_disconnect (neighbor);
+  GNUNET_assert (GNUNET_YES ==
+                GNUNET_CONTAINER_multipeermap_remove (direct_neighbors,
+                                                      &neighbor->peer,
+                                                      neighbor));
+  GNUNET_free (neighbor);
 }
 
 }
 
-#if INSANE_GOSSIP
+
 /**
 /**
- * Iterator over hash map entries.
+ * Method called whenever a given peer disconnects.
  *
  * @param cls closure
  *
  * @param cls closure
- * @param key current key code
- * @param value value in the hash map
- * @return GNUNET_YES if we should continue to
- *         iterate,
- *         GNUNET_NO if not.
+ * @param peer peer identity this notification is about
  */
  */
-static int gossip_all_to_all_iterator (void *cls,
-                                      const GNUNET_HashCode * key,
-                                      void *value)
+static void
+handle_core_disconnect (void *cls, const struct GNUNET_PeerIdentity *peer)
 {
 {
-  struct DirectNeighbor *direct = value;
-
-  GNUNET_CONTAINER_multihashmap_iterate (extended_neighbors, &add_all_extended_peers, direct->send_context);
+  struct DirectNeighbor *neighbor;
 
 
-  if (direct->send_context->task != GNUNET_SCHEDULER_NO_TASK)
-    GNUNET_SCHEDULER_cancel(sched, direct->send_context->task);
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Received core peer disconnect message for peer `%s'!\n",
+             GNUNET_i2s (peer));
+  /* Check for disconnect from self message */
+  if (0 == memcmp (&my_identity, peer, sizeof (struct GNUNET_PeerIdentity)))
+    return;
+  neighbor =
+      GNUNET_CONTAINER_multipeermap_get (direct_neighbors, peer);
+  if (NULL == neighbor)
+  {
+    GNUNET_break (0);
+    return;
+  }
+  GNUNET_break (GNUNET_YES == neighbor->connected);
+  neighbor->connected = GNUNET_NO;
+  if (DIRECT_NEIGHBOR_COST == neighbor->distance)
+  {
 
 
-  direct->send_context->task = GNUNET_SCHEDULER_add_now(sched, &neighbor_send_task, direct->send_context);
-  return GNUNET_YES;
+    GNUNET_STATISTICS_update (stats,
+                             "# peers connected (1-hop)",
+                             -1, GNUNET_NO);
+  }
+  cleanup_neighbor (neighbor);
+  GNUNET_CONTAINER_multipeermap_iterate (direct_neighbors,
+                                        &refresh_routes,
+                                         NULL);
 }
 
 }
 
+
 /**
 /**
- * Task run during shutdown.
+ * Multipeermap iterator for freeing routes.  Should never be called.
  *
  *
- * @param cls unused
- * @param tc unused
+ * @param cls NULL
+ * @param key key value stored under
+ * @param value the route to be freed
+ *
+ * @return GNUNET_YES to continue iteration, GNUNET_NO to stop
  */
  */
-static void
-gossip_all_to_all (void *cls,
-                   const struct GNUNET_SCHEDULER_TaskContext *tc)
+static int
+free_route (void *cls, const struct GNUNET_PeerIdentity * key, void *value)
 {
 {
-  GNUNET_CONTAINER_multihashmap_iterate (direct_neighbors, &gossip_all_to_all_iterator, NULL);
+  struct Route *route = value;
+
+  GNUNET_break (0);
+  GNUNET_assert (GNUNET_YES ==
+                GNUNET_CONTAINER_multipeermap_remove (all_routes, key, value));
+  release_route (route);
+  send_disconnect_to_plugin (&route->target.peer);
+  GNUNET_free (route);
+  return GNUNET_YES;
+}
 
 
-  GNUNET_SCHEDULER_add_delayed (sched,
-                                GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 5),
-                                &gossip_all_to_all,
-                                NULL);
 
 
-}
-#endif
 /**
 /**
- * Iterate over all current direct peers, add newly connected peer
- * to the fast gossip list for that peer so we get DV routing
- * information out as fast as possible!
+ * Multipeermap iterator for freeing direct neighbors. Should never be called.
  *
  *
- * @param cls the newly connected neighbor we will gossip about
- * @param key the hashcode of the peer
- * @param value the direct neighbor we should gossip to
+ * @param cls NULL
+ * @param key key value stored under
+ * @param value the direct neighbor to be freed
  *
  *
- * @return GNUNET_YES to continue iteration, GNUNET_NO otherwise
+ * @return GNUNET_YES to continue iteration, GNUNET_NO to stop
  */
  */
-static int add_all_direct_neighbors (void *cls,
-                                     const GNUNET_HashCode * key,
-                                     void *value)
+static int
+free_direct_neighbors (void *cls, const struct GNUNET_PeerIdentity * key, void *value)
 {
 {
-  struct DirectNeighbor *direct = (struct DirectNeighbor *)value;
-  struct DirectNeighbor *to = (struct DirectNeighbor *)cls;
-  struct DistantNeighbor *distant;
-  struct NeighborSendContext *send_context = direct->send_context;
-  struct FastGossipNeighborList *gossip_entry;
-  char *direct_id;
+  struct DirectNeighbor *neighbor = value;
 
 
-
-  distant = GNUNET_CONTAINER_multihashmap_get(extended_neighbors, &to->identity.hashPubKey);
-  if (distant == NULL)
-    {
-      return GNUNET_YES;
-    }
-
-  if (memcmp(&direct->identity, &to->identity, sizeof(struct GNUNET_PeerIdentity)) == 0)
-    {
-      return GNUNET_YES; /* Don't gossip to a peer about itself! */
-    }
-
-#if SUPPORT_HIDING
-  if (distant->hidden == GNUNET_YES)
-    return GNUNET_YES; /* This peer should not be gossipped about (hidden) */
-#endif
-  direct_id = GNUNET_strdup(GNUNET_i2s(&direct->identity));
-#if DEBUG_DV_GOSSIP
-  GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "%s: adding peer %s to fast send list for %s\n", my_short_id, GNUNET_i2s(&distant->identity), direct_id);
-#endif
-  GNUNET_free(direct_id);
-  gossip_entry = GNUNET_malloc(sizeof(struct FastGossipNeighborList));
-  gossip_entry->about = distant;
-
-  GNUNET_CONTAINER_DLL_insert_after(send_context->fast_gossip_list_head,
-                                    send_context->fast_gossip_list_tail,
-                                    send_context->fast_gossip_list_tail,
-                                    gossip_entry);
-  if (send_context->task != GNUNET_SCHEDULER_NO_TASK)
-    GNUNET_SCHEDULER_cancel(sched, send_context->task);
-
-  send_context->task = GNUNET_SCHEDULER_add_now(sched, &neighbor_send_task, send_context);
-  //tc.reason = GNUNET_SCHEDULER_REASON_TIMEOUT;
-  //neighbor_send_task(send_context, &tc);
+  GNUNET_break (0);
+  cleanup_neighbor (neighbor);
   return GNUNET_YES;
 }
 
   return GNUNET_YES;
 }
 
+
 /**
 /**
- * Type of an iterator over the hosts.  Note that each
- * host will be called with each available protocol.
+ * Task run during shutdown.
  *
  *
- * @param cls closure
- * @param peer id of the peer, NULL for last call
- * @param hello hello message for the peer (can be NULL)
- * @param trust amount of trust we have in the peer
+ * @param cls unused
+ * @param tc unused
  */
 static void
  */
 static void
-process_peerinfo (void *cls,
-                  const struct GNUNET_PeerIdentity *peer,
-                  const struct GNUNET_HELLO_Message *hello, uint32_t trust)
+shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
 {
 {
-  struct PeerIteratorContext *peerinfo_iterator = cls;
-  struct DirectNeighbor *neighbor = peerinfo_iterator->neighbor;
-  struct DistantNeighbor *distant = peerinfo_iterator->distant;
-#if DEBUG_DV_PEER_NUMBERS
-  char *neighbor_pid;
-#endif
-  int sent;
-
-  if (peer == NULL)
-    {
-      if (distant->pkey == NULL)
-        {
-#if DEBUG_DV
-          GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "Failed to get peerinfo information for this peer, retrying!\n");
-#endif
-          peerinfo_iterator->ic = GNUNET_PEERINFO_iterate(peerinfo_handle,
-                                                          &peerinfo_iterator->neighbor->identity,
-                                                          0,
-                                                          GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 3),
-                                                          &process_peerinfo,
-                                                          peerinfo_iterator);
-        }
-      else
-        {
-          GNUNET_free(peerinfo_iterator);
-        }
-      return;
-    }
+  unsigned int i;
+
+  GNUNET_CORE_disconnect (core_api);
+  core_api = NULL;
+  GNUNET_ATS_performance_done (ats);
+  ats = NULL;
+  GNUNET_CONTAINER_multipeermap_iterate (direct_neighbors,
+                                         &free_direct_neighbors, NULL);
+  GNUNET_CONTAINER_multipeermap_iterate (all_routes,
+                                         &free_route, NULL);
+  GNUNET_CONTAINER_multipeermap_destroy (direct_neighbors);
+  GNUNET_CONTAINER_multipeermap_destroy (all_routes);
+  GNUNET_STATISTICS_destroy (stats, GNUNET_NO);
+  stats = NULL;
+  GNUNET_SERVER_notification_context_destroy (nc);
+  nc = NULL;
+  for (i=0;i<DEFAULT_FISHEYE_DEPTH - 1;i++)
+    GNUNET_array_grow (consensi[i].targets,
+                      consensi[i].array_length,
+                      0);
+}
 
 
-  if (memcmp(&neighbor->identity, peer, sizeof(struct GNUNET_PeerIdentity) != 0))
-    return;
 
 
-  if ((hello != NULL) && (GNUNET_HELLO_get_key (hello, &neighbor->pkey) == GNUNET_OK))
-    {
-      if (distant->pkey == NULL)
-        {
-          distant->pkey = GNUNET_malloc(sizeof(struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded));
-          memcpy(distant->pkey, &neighbor->pkey, sizeof(struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded));
-        }
-
-      sent = GNUNET_CONTAINER_multihashmap_iterate (extended_neighbors, &add_all_extended_peers, neighbor->send_context);
-
-#if DEBUG_DV_PEER_NUMBERS
-      neighbor_pid = GNUNET_strdup(GNUNET_i2s(&neighbor->identity));
-      GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "%s: Gossipped %d extended peers to %s\n", GNUNET_i2s(&my_identity), sent, neighbor_pid);
-#endif
-      sent = GNUNET_CONTAINER_multihashmap_iterate (direct_neighbors, &add_all_direct_neighbors, neighbor);
-#if DEBUG_DV_PEER_NUMBERS
-      GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "%s: Gossipped about %s to %d direct peers\n", GNUNET_i2s(&my_identity), neighbor_pid, sent);
-      GNUNET_free(neighbor_pid);
-#endif
-      neighbor->send_context->task = GNUNET_SCHEDULER_add_now(sched, &neighbor_send_task, neighbor->send_context);
-    }
+/**
+ * Notify newly connected client about an existing route.
+ *
+ * @param cls the 'struct GNUNET_SERVER_Client'
+ * @param key peer identity
+ * @param value the XXX.
+ * @return GNUNET_OK (continue to iterate)
+ */
+static int
+add_route (void *cls,
+          const struct GNUNET_PeerIdentity *key,
+          void *value)
+{
+  struct GNUNET_SERVER_Client *client = cls;
+  struct Route *route = value;
+  struct GNUNET_DV_ConnectMessage cm;
+
+  cm.header.size = htons (sizeof (cm));
+  cm.header.type = htons (GNUNET_MESSAGE_TYPE_DV_CONNECT);
+  cm.distance = htonl (route->target.distance);
+  cm.peer = route->target.peer;
+
+  GNUNET_SERVER_notification_context_unicast (nc,
+                                             client,
+                                             &cm.header,
+                                             GNUNET_NO);
+  return GNUNET_OK;
 }
 
 
 /**
 }
 
 
 /**
- * Method called whenever a peer connects.
+ * Handle START-message.  This is the first message sent to us
+ * by the client (can only be one!).
  *
  *
- * @param cls closure
- * @param peer peer identity this notification is about
- * @param latency reported latency of the connection with peer
- * @param distance reported distance (DV) to peer
+ * @param cls closure (always NULL)
+ * @param client identification of the client
+ * @param message the actual message
  */
  */
-void handle_core_connect (void *cls,
-                          const struct GNUNET_PeerIdentity * peer,
-                          struct GNUNET_TIME_Relative latency,
-                          uint32_t distance)
+static void
+handle_start (void *cls, struct GNUNET_SERVER_Client *client,
+              const struct GNUNET_MessageHeader *message)
 {
 {
-  struct DirectNeighbor *neighbor;
-  struct DistantNeighbor *about;
-  struct PeerIteratorContext *peerinfo_iterator;
-  int sent;
-#if DEBUG_DV
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "%s: Receives core connect message for peer %s distance %d!\n", "dv", GNUNET_i2s(peer), distance);
-#endif
-
-  if ((distance == DIRECT_NEIGHBOR_COST) && (GNUNET_CONTAINER_multihashmap_get(direct_neighbors, &peer->hashPubKey) == NULL))
-  {
-    peerinfo_iterator = GNUNET_malloc(sizeof(struct PeerIteratorContext));
-    neighbor = GNUNET_malloc (sizeof (struct DirectNeighbor));
-    neighbor->send_context = GNUNET_malloc(sizeof(struct NeighborSendContext));
-    neighbor->send_context->toNeighbor = neighbor;
-    memcpy (&neighbor->identity, peer, sizeof (struct GNUNET_PeerIdentity));
-
-    GNUNET_assert(GNUNET_SYSERR != GNUNET_CONTAINER_multihashmap_put (direct_neighbors,
-                               &peer->hashPubKey,
-                               neighbor, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
-    about = addUpdateNeighbor (peer, NULL, 0, neighbor, DIRECT_NEIGHBOR_COST);
-    peerinfo_iterator->distant = about;
-    peerinfo_iterator->neighbor = neighbor;
-    peerinfo_iterator->ic = GNUNET_PEERINFO_iterate (peerinfo_handle,
-                                                     peer,
-                                                     0,
-                                                     GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 3),
-                                                     &process_peerinfo,
-                                                     peerinfo_iterator);
-
-    if ((about != NULL) && (about->pkey == NULL))
-      {
-#if DEBUG_DV
-        GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "Newly added peer %s has NULL pkey!\n", GNUNET_i2s(peer));
-#endif
-      }
-    else if (about != NULL)
-      {
-        GNUNET_free(peerinfo_iterator);
-      }
-  }
-  else
-  {
-    about = GNUNET_CONTAINER_multihashmap_get(extended_neighbors, &peer->hashPubKey);
-    if ((GNUNET_CONTAINER_multihashmap_get(direct_neighbors, &peer->hashPubKey) == NULL) && (about != NULL))
-      sent = GNUNET_CONTAINER_multihashmap_iterate(direct_neighbors, &add_distant_all_direct_neighbors, about);
-#if DEBUG_DV
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                "%s: Distance (%d) greater than %d or already know about peer (%s), not re-adding!\n", "dv", distance, DIRECT_NEIGHBOR_COST, GNUNET_i2s(peer));
-#endif
-    return;
-  }
+  GNUNET_SERVER_notification_context_add (nc, client);
+  GNUNET_SERVER_receive_done (client, GNUNET_OK);
+  GNUNET_CONTAINER_multipeermap_iterate (all_routes,
+                                        &add_route,
+                                        client);
 }
 
 }
 
+
 /**
 /**
- * Method called whenever a given peer disconnects.
+ * Called on core init.
  *
  *
- * @param cls closure
- * @param peer peer identity this notification is about
+ * @param cls unused
+ * @param identity this peer's identity
  */
  */
-void handle_core_disconnect (void *cls,
-                             const struct GNUNET_PeerIdentity * peer)
+static void
+core_init (void *cls,
+           const struct GNUNET_PeerIdentity *identity)
 {
 {
-  struct DirectNeighbor *neighbor;
-  struct DistantNeighbor *referee;
-  struct FindDestinationContext fdc;
-  struct DisconnectContext disconnect_context;
-
-#if DEBUG_DV
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "%s: Receives core peer disconnect message!\n", "dv");
-#endif
-
-  neighbor =
-    GNUNET_CONTAINER_multihashmap_get (direct_neighbors, &peer->hashPubKey);
-  if (neighbor == NULL)
-    {
-      return;
-    }
-  while (NULL != (referee = neighbor->referee_head))
-    distant_neighbor_free (referee);
-
-  fdc.dest = NULL;
-  fdc.tid = 0;
-
-  GNUNET_CONTAINER_multihashmap_iterate (extended_neighbors, &find_distant_peer, &fdc);
-
-  if (fdc.dest != NULL)
-    {
-      disconnect_context.direct = neighbor;
-      disconnect_context.distant = fdc.dest;
-      GNUNET_CONTAINER_multihashmap_iterate (direct_neighbors, &schedule_disconnect_messages, &disconnect_context);
-    }
-
-  GNUNET_assert (neighbor->referee_tail == NULL);
-  if (GNUNET_NO == GNUNET_CONTAINER_multihashmap_remove (direct_neighbors,
-                                        &peer->hashPubKey, neighbor))
-    {
-      GNUNET_break(0);
-    }
-  if ((neighbor->send_context != NULL) && (neighbor->send_context->task != GNUNET_SCHEDULER_NO_TASK))
-    GNUNET_SCHEDULER_cancel(sched, neighbor->send_context->task);
-  GNUNET_free (neighbor);
+              "I am peer: %s\n",
+              GNUNET_i2s (identity));
+  my_identity = *identity;
 }
 
 
 }
 
 
@@ -2895,85 +1935,52 @@ void handle_core_disconnect (void *cls,
  * Process dv requests.
  *
  * @param cls closure
  * Process dv requests.
  *
  * @param cls closure
- * @param scheduler scheduler to use
  * @param server the initialized server
  * @param c configuration to use
  */
 static void
  * @param server the initialized server
  * @param c configuration to use
  */
 static void
-run (void *cls,
-     struct GNUNET_SCHEDULER_Handle *scheduler,
-     struct GNUNET_SERVER_Handle *server,
+run (void *cls, struct GNUNET_SERVER_Handle *server,
      const struct GNUNET_CONFIGURATION_Handle *c)
 {
      const struct GNUNET_CONFIGURATION_Handle *c)
 {
-  unsigned long long max_hosts;
-  sched = scheduler;
-  cfg = c;
-
-  /* FIXME: Read from config, or calculate, or something other than this! */
-  max_hosts = DEFAULT_DIRECT_CONNECTIONS;
-  max_table_size = DEFAULT_DV_SIZE;
-  fisheye_depth = DEFAULT_FISHEYE_DEPTH;
-
-  if (GNUNET_CONFIGURATION_have_value(cfg, "dv", "max_direct_connections"))
-    GNUNET_assert(GNUNET_OK == GNUNET_CONFIGURATION_get_value_number(cfg, "dv", "max_direct_connections", &max_hosts));
-
-  if (GNUNET_CONFIGURATION_have_value(cfg, "dv", "max_total_connections"))
-    GNUNET_assert(GNUNET_OK == GNUNET_CONFIGURATION_get_value_number(cfg, "dv", "max_total_connections", &max_table_size));
-
-
-  if (GNUNET_CONFIGURATION_have_value(cfg, "dv", "fisheye_depth"))
-    GNUNET_assert(GNUNET_OK == GNUNET_CONFIGURATION_get_value_number(cfg, "dv", "fisheye_depth", &fisheye_depth));
-
-  neighbor_min_heap =
-    GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
-  neighbor_max_heap =
-    GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MAX);
-
-  direct_neighbors = GNUNET_CONTAINER_multihashmap_create (max_hosts);
-  extended_neighbors =
-    GNUNET_CONTAINER_multihashmap_create (max_table_size * 3);
+  static struct GNUNET_CORE_MessageHandler core_handlers[] = {
+    {&handle_dv_route_message, GNUNET_MESSAGE_TYPE_DV_ROUTE, 0},
+    {NULL, 0, 0}
+  };
+  static struct GNUNET_SERVER_MessageHandler plugin_handlers[] = {
+    {&handle_start, NULL,
+     GNUNET_MESSAGE_TYPE_DV_START,
+     sizeof (struct GNUNET_MessageHeader) },
+    { &handle_dv_send_message, NULL,
+      GNUNET_MESSAGE_TYPE_DV_SEND,
+      0},
+    {NULL, NULL, 0, 0}
+  };
 
 
-  GNUNET_SERVER_add_handlers (server, plugin_handlers);
-  coreAPI =
-  GNUNET_CORE_connect (sched,
-                       cfg,
-                       GNUNET_TIME_relative_get_forever(),
-                       NULL, /* FIXME: anything we want to pass around? */
-                       &core_init,
-                       &handle_core_connect,
-                       &handle_core_disconnect,
-                       NULL,
-                       GNUNET_NO,
-                       NULL,
-                       GNUNET_NO,
-                       core_handlers);
-
-  if (coreAPI == NULL)
+  cfg = c;
+  direct_neighbors = GNUNET_CONTAINER_multipeermap_create (128, GNUNET_NO);
+  all_routes = GNUNET_CONTAINER_multipeermap_create (65536, GNUNET_NO);
+  core_api = GNUNET_CORE_connect (cfg, NULL,
+                                 &core_init,
+                                 &handle_core_connect,
+                                 &handle_core_disconnect,
+                                 NULL, GNUNET_NO,
+                                 NULL, GNUNET_NO,
+                                 core_handlers);
+
+  if (NULL == core_api)
     return;
     return;
-
-  coreMST = GNUNET_SERVER_mst_create (GNUNET_SERVER_MAX_MESSAGE_SIZE,
-                                      &tokenized_message_handler,
-                                      NULL);
-
-   peerinfo_handle = GNUNET_PEERINFO_connect(sched, cfg);
-
-   if (peerinfo_handle == NULL)
-     {
-       GNUNET_CORE_disconnect(coreAPI);
-       return;
-     }
-
-  /* Scheduled the task to clean up when shutdown is called */
-  cleanup_task = GNUNET_SCHEDULER_add_delayed (sched,
-                                GNUNET_TIME_UNIT_FOREVER_REL,
-                                &shutdown_task,
-                                NULL);
-#if INSANE_GOSSIP
-  GNUNET_SCHEDULER_add_delayed (sched,
-                                GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 5),
-                                &gossip_all_to_all,
-                                NULL);
-#endif
+  ats = GNUNET_ATS_performance_init (cfg, &handle_ats_update, NULL);
+  if (NULL == ats)
+  {
+    GNUNET_CORE_disconnect (core_api);
+    return;
+  }
+  nc = GNUNET_SERVER_notification_context_create (server,
+                                                 MAX_QUEUE_SIZE_PLUGIN);
+  stats = GNUNET_STATISTICS_create ("dv", cfg);
+  GNUNET_SERVER_add_handlers (server, plugin_handlers);
+  GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL,
+                               &shutdown_task, NULL);
 }
 
 
 }
 
 
@@ -2988,9 +1995,8 @@ int
 main (int argc, char *const *argv)
 {
   return (GNUNET_OK ==
 main (int argc, char *const *argv)
 {
   return (GNUNET_OK ==
-          GNUNET_SERVICE_run (argc,
-                              argv,
-                              "dv",
-                              GNUNET_SERVICE_OPTION_NONE,
+          GNUNET_SERVICE_run (argc, argv, "dv", GNUNET_SERVICE_OPTION_NONE,
                               &run, NULL)) ? 0 : 1;
 }
                               &run, NULL)) ? 0 : 1;
 }
+
+/* end of gnunet-service-dv.c */