fixes
[oweals/gnunet.git] / src / dv / gnunet-service-dv.c
index c9dbbb5969d702024115a118059d7269b038623a..844e44b6ffb4573962a6cdb5b9920bb3655c669b 100644 (file)
@@ -4,7 +4,7 @@
 
      GNUnet is free software; you can redistribute it and/or modify
      it under the terms of the GNU General Public License as published
 
      GNUnet is free software; you can redistribute it and/or modify
      it under the terms of the GNU General Public License as published
-     by the Free Software Foundation; either version 2, or (at your
+     by the Free Software Foundation; either version 3, or (at your
      option) any later version.
 
      GNUnet is distributed in the hope that it will be useful, but
      option) any later version.
 
      GNUnet is distributed in the hope that it will be useful, but
  * @author Christian Grothoff
  * @author Nathan Evans
  *
  * @author Christian Grothoff
  * @author Nathan Evans
  *
- * TODO: Currently the final hop of a DV message assigns a 0 to the receiver
- * id field.  This probably can't work(!) even if we know that the peer is
- * a direct neighbor (unless we can trust that transport will choose that
- * address for the peer).  So the DV message will likely need to have the
- * peer identity of the recipient.
- *
- * Also the gossip rates need to be worked out.  Probably many other things
- * as well.
- *
  */
 #include "platform.h"
 #include "gnunet_client_lib.h"
  */
 #include "platform.h"
 #include "gnunet_client_lib.h"
 #include "gnunet_hello_lib.h"
 #include "gnunet_peerinfo_service.h"
 #include "gnunet_crypto_lib.h"
 #include "gnunet_hello_lib.h"
 #include "gnunet_peerinfo_service.h"
 #include "gnunet_crypto_lib.h"
+#include "gnunet_statistics_service.h"
 #include "dv.h"
 
 /**
 #include "dv.h"
 
 /**
- * DV Service Context stuff goes here...
- */
-
-/**
- * Handle to the core service api.
- */
-static struct GNUNET_CORE_Handle *coreAPI;
-
-/**
- * The identity of our peer.
+ * For testing mostly, remember only the
+ * shortest path to a distant neighbor.
  */
  */
-static struct GNUNET_PeerIdentity my_identity;
+#define AT_MOST_ONE GNUNET_NO
 
 
-/**
- * The configuration for this service.
- */
-static const struct GNUNET_CONFIGURATION_Handle *cfg;
+#define USE_PEER_ID GNUNET_YES
 
 /**
 
 /**
- * The scheduler for this service.
+ * How many outstanding messages (unknown sender) will we allow per peer?
  */
  */
-static struct GNUNET_SCHEDULER_Handle *sched;
+#define MAX_OUTSTANDING_MESSAGES 5
 
 /**
  * How often do we check about sending out more peer information (if
  * we are connected to no peers previously).
  */
 
 /**
  * How often do we check about sending out more peer information (if
  * we are connected to no peers previously).
  */
-#define GNUNET_DV_DEFAULT_SEND_INTERVAL GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MILLISECONDS, 500))
+#define GNUNET_DV_DEFAULT_SEND_INTERVAL GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MILLISECONDS, 500000)
 
 /**
  * How long do we wait at most between sending out information?
  */
 
 /**
  * How long do we wait at most between sending out information?
  */
-#define GNUNET_DV_MAX_SEND_INTERVAL GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 5))
+#define GNUNET_DV_MAX_SEND_INTERVAL GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MILLISECONDS, 500000)
 
 /**
  * How long can we have not heard from a peer and
 
 /**
  * How long can we have not heard from a peer and
@@ -114,33 +95,25 @@ static struct GNUNET_SCHEDULER_Handle *sched;
 #define DV_PRIORITY 0
 
 /**
 #define DV_PRIORITY 0
 
 /**
- * The client, should be the DV plugin connected to us.  Hopefully
- * this client will never change, although if the plugin dies
- * and returns for some reason it may happen.
+ * The cost to a direct neighbor.  We used to use 0, but 1 makes more sense.
  */
  */
-static struct GNUNET_SERVER_Client * client_handle;
+#define DIRECT_NEIGHBOR_COST 1
 
 /**
 
 /**
- * Task to run when we shut down, cleaning up all our trash
+ * The default number of direct connections to store in DV (max)
  */
  */
-GNUNET_SCHEDULER_TaskIdentifier cleanup_task;
+#define DEFAULT_DIRECT_CONNECTIONS 50
 
 /**
 
 /**
- * Task to run to gossip about peers.  Will reschedule itself forever until shutdown!
+ * The default size of direct + extended peers in DV (max)
  */
  */
-GNUNET_SCHEDULER_TaskIdentifier gossip_task;
+#define DEFAULT_DV_SIZE 100
 
 /**
 
 /**
- * Struct where neighbor information is stored.
+ * The default fisheye depth, from how many hops away will
+ * we keep peers?
  */
  */
-struct DistantNeighbor *referees;
-
-static struct GNUNET_TIME_Relative client_transmit_timeout;
-
-static struct GNUNET_TIME_Relative default_dv_delay;
-
-static size_t default_dv_priority = 0;
-
+#define DEFAULT_FISHEYE_DEPTH 4
 
 /**
  * Linked list of messages to send to clients.
 
 /**
  * Linked list of messages to send to clients.
@@ -158,45 +131,54 @@ struct PendingMessage
   struct PendingMessage *prev;
 
   /**
   struct PendingMessage *prev;
 
   /**
-   * Actual message to be sent; // avoid allocation
+   * The PeerIdentity to send to
    */
    */
-  const struct GNUNET_MessageHeader *msg; // msg = (cast) &pm[1]; // memcpy (&pm[1], data, len);
-
-};
-
-/**
- * Transmit handle to the plugin.
- */
-struct GNUNET_CONNECTION_TransmitHandle * plugin_transmit_handle;
+  struct GNUNET_PeerIdentity recipient;
 
 
-/**
- * Head of DLL for client messages
- */
-struct PendingMessage *plugin_pending_head;
+  /**
+   * The result of message sending.
  */
+  struct GNUNET_DV_SendResultMessage *send_result;
 
 
-/**
- * Tail of DLL for client messages
- */
-struct PendingMessage *plugin_pending_tail;
+  /**
+   * Message importance level.
  */
+  unsigned int importance;
 
 
+  /**
+   * Size of message.
+   */
+  unsigned int msg_size;
 
 
-/**
* Transmit handle to core service.
- */
-struct GNUNET_CORE_TransmitHandle * core_transmit_handle;
+  /**
  * How long to wait before sending message.
  */
+  struct GNUNET_TIME_Relative timeout;
 
 
-/**
- * Head of DLL for core messages
- */
-struct PendingMessage *core_pending_head;
+  /**
+   * Actual message to be sent; // avoid allocation
  */
+  const struct GNUNET_MessageHeader *msg; // msg = (cast) &pm[1]; // memcpy (&pm[1], data, len);
 
 
-/**
- * Tail of DLL for core messages
- */
-struct PendingMessage *core_pending_tail;
+};
 
 
+struct FastGossipNeighborList
+{
+  /**
+   * Next element of DLL
+   */
+  struct FastGossipNeighborList *next;
 
 
+  /**
+   * Prev element of DLL
+   */
+  struct FastGossipNeighborList *prev;
 
 
+  /**
+   * The neighbor to gossip about
+   */
+  struct DistantNeighbor *about;
+};
 
 /**
  * Context created whenever a direct peer connects to us,
 
 /**
  * Context created whenever a direct peer connects to us,
@@ -210,14 +192,23 @@ struct NeighborSendContext
   struct DirectNeighbor *toNeighbor;
 
   /**
   struct DirectNeighbor *toNeighbor;
 
   /**
-   * The timeout for this task.
+   * The task associated with this context.
    */
    */
-  struct GNUNET_TIME_Relative timeout;
+  GNUNET_SCHEDULER_TaskIdentifier task;
 
   /**
 
   /**
-   * The task associated with this context.
+   * Head of DLL of peers to gossip about
+   * as fast as possible to this peer, for initial
+   * set up.
    */
    */
-  GNUNET_SCHEDULER_TaskIdentifier task;
+  struct FastGossipNeighborList *fast_gossip_list_head;
+
+  /**
+   * Tail of DLL of peers to gossip about
+   * as fast as possible to this peer, for initial
+   * set up.
+   */
+  struct FastGossipNeighborList *fast_gossip_list_tail;
 
 };
 
 
 };
 
@@ -246,6 +237,44 @@ struct NeighborUpdateInfo
    * The time we heard about this peer
    */
   struct GNUNET_TIME_Absolute now;
    * The time we heard about this peer
    */
   struct GNUNET_TIME_Absolute now;
+
+  /**
+   * Peer id this peer uses to refer to neighbor.
+   */
+  unsigned int referrer_peer_id;
+
+};
+
+/**
+ * Struct to store a single message received with
+ * an unknown sender.
+ */
+struct UnknownSenderMessage
+{
+  /**
+   * Message sender (immediate)
+   */
+  struct GNUNET_PeerIdentity sender;
+
+  /**
+   * The actual message received
+   */
+  struct GNUNET_MessageHeader *message;
+
+  /**
+   * Latency of connection
+   */
+  struct GNUNET_TIME_Relative latency;
+
+  /**
+   * Distance to destination
+   */
+  uint32_t distance;
+
+  /**
+   * Unknown sender id
+   */
+  uint32_t sender_id;
 };
 
 /**
 };
 
 /**
@@ -285,6 +314,13 @@ struct DirectNeighbor
    * from DV?
    */
   int hidden;
    * from DV?
    */
   int hidden;
+
+  /**
+   * Save messages immediately from this direct neighbor from a
+   * distan peer we don't know on the chance that it will be
+   * gossiped about and we can deliver the message.
+   */
+  struct UnknownSenderMessage pending_messages[MAX_OUTSTANDING_MESSAGES];
 };
 
 
 };
 
 
@@ -335,6 +371,11 @@ struct DistantNeighbor
    */
   struct GNUNET_TIME_Absolute last_activity;
 
    */
   struct GNUNET_TIME_Absolute last_activity;
 
+  /**
+   * Last time we sent routing information about this peer
+   */
+  struct GNUNET_TIME_Absolute last_gossip;
+
   /**
    * Cost to neighbor, used for actual distance vector computations
    */
   /**
    * Cost to neighbor, used for actual distance vector computations
    */
@@ -371,6 +412,11 @@ struct PeerIteratorContext
    */
   struct DirectNeighbor *neighbor;
 
    */
   struct DirectNeighbor *neighbor;
 
+  /**
+   * The distant neighbor entry for this direct neighbor.
+   */
+  struct DistantNeighbor *distant;
+
 };
 
 /**
 };
 
 /**
@@ -413,6 +459,12 @@ struct DV_SendContext
    */
   struct GNUNET_MessageHeader *message;
 
    */
   struct GNUNET_MessageHeader *message;
 
+  /**
+   * The pre-built send result message.  Simply needs to be queued
+   * and freed once send has been called!
+   */
+  struct GNUNET_DV_SendResultMessage *send_result;
+
   /**
    * The size of the message being sent, may be larger
    * than message->header.size because it's multiple
   /**
    * The size of the message being sent, may be larger
    * than message->header.size because it's multiple
@@ -429,58 +481,180 @@ struct DV_SendContext
    * Timeout for this message
    */
   struct GNUNET_TIME_Relative timeout;
    * Timeout for this message
    */
   struct GNUNET_TIME_Relative timeout;
+
+  /**
+   * Unique ID for DV message
+   */
+  unsigned int uid;
 };
 
 };
 
-/**
- * Global construct
- */
-struct GNUNET_DV_Context
+struct FindDestinationContext
+{
+  unsigned int tid;
+  struct DistantNeighbor *dest;
+};
+
+struct FindIDContext
+{
+  unsigned int tid;
+  struct GNUNET_PeerIdentity *dest;
+  const struct GNUNET_PeerIdentity *via;
+};
+
+struct DisconnectContext
 {
   /**
 {
   /**
-   * Map of PeerIdentifiers to 'struct GNUNET_dv_neighbor*'s for all
-   * directly connected peers.
+   * Distant neighbor to get pid from.
    */
    */
-  struct GNUNET_CONTAINER_MultiHashMap *direct_neighbors;
+  struct DistantNeighbor *distant;
 
   /**
 
   /**
-   * Map of PeerIdentifiers to 'struct GNUNET_dv_neighbor*'s for
-   * peers connected via DV (extended neighborhood).  Does ALSO
-   * include any peers that are in 'direct_neighbors'; for those
-   * peers, the cost will be zero and the referrer all zeros.
+   * Direct neighbor that disconnected.
    */
    */
-  struct GNUNET_CONTAINER_MultiHashMap *extended_neighbors;
+  struct DirectNeighbor *direct;
+};
 
 
+struct TokenizedMessageContext
+{
   /**
   /**
-   * We use the min heap (min refers to cost) to prefer
-   * gossipping about peers with small costs.
+   * Immediate sender of this message
    */
    */
-  struct GNUNET_CONTAINER_Heap *neighbor_min_heap;
+  const struct GNUNET_PeerIdentity *peer;
 
   /**
 
   /**
-   * We use the max heap (max refers to cost) for general
-   * iterations over all peers and to remove the most costly
-   * connection if we have too many.
+   * Distant sender of the message
    */
    */
-  struct GNUNET_CONTAINER_Heap *neighbor_max_heap;
+  struct DistantNeighbor *distant;
 
 
-  unsigned long long fisheye_depth;
+  /**
+   * Uid for this set of messages
+   */
+  uint32_t uid;
+};
 
 
-  unsigned long long max_table_size;
+/**
+ * Context for finding the least cost peer to send to.
+ * Transport selection can only go so far.
+ */
+struct FindLeastCostContext
+{
+  struct DistantNeighbor *target;
+  unsigned int least_cost;
+};
 
 
-  unsigned int neighbor_id_loc;
+/**
+ * Handle to the core service api.
+ */
+static struct GNUNET_CORE_Handle *coreAPI;
 
 
-  int closing;
+/**
+ * Stream tokenizer to handle messages coming in from core.
+ */
+static struct GNUNET_SERVER_MessageStreamTokenizer *coreMST;
 
 
-};
+/**
+ * The identity of our peer.
+ */
+static struct GNUNET_PeerIdentity my_identity;
 
 
-static struct GNUNET_DV_Context ctx;
+/**
+ * The configuration for this service.
+ */
+static const struct GNUNET_CONFIGURATION_Handle *cfg;
 
 
-struct FindDestinationContext
-{
-  unsigned int tid;
-  struct DistantNeighbor *dest;
-};
 
 
+/**
+ * The client, the DV plugin connected to us.  Hopefully
+ * this client will never change, although if the plugin dies
+ * and returns for some reason it may happen.
+ */
+static struct GNUNET_SERVER_Client * client_handle;
+
+/**
+ * Task to run when we shut down, cleaning up all our trash
+ */
+static GNUNET_SCHEDULER_TaskIdentifier cleanup_task;
+
+static size_t default_dv_priority = 0;
+
+static char *my_short_id;
+
+/**
+ * Transmit handle to the plugin.
+ */
+static struct GNUNET_CONNECTION_TransmitHandle * plugin_transmit_handle;
+
+/**
+ * Head of DLL for client messages
+ */
+static struct PendingMessage *plugin_pending_head;
+
+/**
+ * Tail of DLL for client messages
+ */
+static struct PendingMessage *plugin_pending_tail;
+
+/**
+ * Handle to the peerinfo service
+ */
+static struct GNUNET_PEERINFO_Handle *peerinfo_handle;
+
+/**
+ * Transmit handle to core service.
+ */
+static struct GNUNET_CORE_TransmitHandle * core_transmit_handle;
+
+/**
+ * Head of DLL for core messages
+ */
+static struct PendingMessage *core_pending_head;
+
+/**
+ * Tail of DLL for core messages
+ */
+static struct PendingMessage *core_pending_tail;
+
+/**
+ * Map of PeerIdentifiers to 'struct GNUNET_dv_neighbor*'s for all
+ * directly connected peers.
+ */
+static struct GNUNET_CONTAINER_MultiHashMap *direct_neighbors;
+
+/**
+ * Map of PeerIdentifiers to 'struct GNUNET_dv_neighbor*'s for
+ * peers connected via DV (extended neighborhood).  Does ALSO
+ * include any peers that are in 'direct_neighbors'; for those
+ * peers, the cost will be zero and the referrer all zeros.
+ */
+static struct GNUNET_CONTAINER_MultiHashMap *extended_neighbors;
+
+/**
+ * We use the min heap (min refers to cost) to prefer
+ * gossipping about peers with small costs.
+ */
+static struct GNUNET_CONTAINER_Heap *neighbor_min_heap;
+
+/**
+ * We use the max heap (max refers to cost) for general
+ * iterations over all peers and to remove the most costly
+ * connection if we have too many.
+ */
+static struct GNUNET_CONTAINER_Heap *neighbor_max_heap;
+
+/**
+ * Handle for the statistics service.
+ */
+struct GNUNET_STATISTICS_Handle *stats;
+
+/**
+ * How far out to keep peers we learn about.
+ */
+static unsigned long long fisheye_depth;
+
+/**
+ * How many peers to store at most.
+ */
+static unsigned long long max_table_size;
 
 /**
  * We've been given a target ID based on the random numbers that
 
 /**
  * We've been given a target ID based on the random numbers that
@@ -501,6 +675,50 @@ find_destination (void *cls,
   return GNUNET_NO;
 }
 
   return GNUNET_NO;
 }
 
+
+/**
+ * We've been given a target ID based on the random numbers that
+ * we assigned to our DV-neighborhood.  Find the entry for the
+ * respective neighbor.
+ */
+static int
+find_specific_id (void *cls,
+                  const GNUNET_HashCode *key,
+                  void *value)
+{
+  struct FindIDContext *fdc = cls;
+  struct DistantNeighbor *dn = value;
+
+  if (memcmp(&dn->referrer->identity, fdc->via, sizeof(struct GNUNET_PeerIdentity)) == 0)
+    {
+      fdc->tid = dn->referrer_id;
+      return GNUNET_NO;
+    }
+  return GNUNET_YES;
+}
+
+/**
+ * Find a distant peer whose referrer_id matches what we're
+ * looking for.  For looking up a peer we've gossipped about
+ * but is now disconnected.  Need to do this because we don't
+ * want to remove those that may be accessible via a different
+ * route.
+ */
+static int find_distant_peer (void *cls,
+                              const GNUNET_HashCode * key,
+                              void *value)
+{
+  struct FindDestinationContext *fdc = cls;
+  struct DistantNeighbor *distant = value;
+
+  if (fdc->tid == distant->referrer_id)
+    {
+      fdc->dest = distant;
+      return GNUNET_NO;
+    }
+  return GNUNET_YES;
+}
+
 /**
  * Function called to notify a client about the socket
  * begin ready to queue more data.  "buf" will be
 /**
  * Function called to notify a client about the socket
  * begin ready to queue more data.  "buf" will be
@@ -523,8 +741,8 @@ size_t transmit_to_plugin (void *cls,
   if (buf == NULL)
     {
       /* client disconnected */
   if (buf == NULL)
     {
       /* client disconnected */
-#if DEBUG_DV
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "`%s': buffer was NULL\n", "DHT");
+#if DEBUG_DV_MESSAGES
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%s: %s buffer was NULL (client disconnect?)\n", my_short_id, "transmit_to_plugin");
 #endif
       return 0;
     }
 #endif
       return 0;
     }
@@ -533,9 +751,6 @@ size_t transmit_to_plugin (void *cls,
   while ( (NULL != (reply = plugin_pending_head)) &&
           (size >= off + (msize = ntohs (reply->msg->size))))
     {
   while ( (NULL != (reply = plugin_pending_head)) &&
           (size >= off + (msize = ntohs (reply->msg->size))))
     {
-#if DEBUG_DV
-    GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "`%s' : transmit_notify (plugin) called with size %d\n", "dv service", msize);
-#endif
       GNUNET_CONTAINER_DLL_remove (plugin_pending_head,
                                    plugin_pending_tail,
                                    reply);
       GNUNET_CONTAINER_DLL_remove (plugin_pending_head,
                                    plugin_pending_tail,
                                    reply);
@@ -553,7 +768,16 @@ size_t transmit_to_plugin (void *cls,
   return off;
 }
 
   return off;
 }
 
-
+/**
+ * Send a message to the dv plugin.
+ *
+ * @param sender the direct sender of the message
+ * @param message the message to send to the plugin
+ *        (may be an encapsulated type)
+ * @param message_size the size of the message to be sent
+ * @param distant_neighbor the original sender of the message
+ * @param cost the cost to the original sender of the message
+ */
 void send_to_plugin(const struct GNUNET_PeerIdentity * sender,
                     const struct GNUNET_MessageHeader *message,
                     size_t message_size,
 void send_to_plugin(const struct GNUNET_PeerIdentity * sender,
                     const struct GNUNET_MessageHeader *message,
                     size_t message_size,
@@ -562,11 +786,6 @@ void send_to_plugin(const struct GNUNET_PeerIdentity * sender,
 {
   struct GNUNET_DV_MessageReceived *received_msg;
   struct PendingMessage *pending_message;
 {
   struct GNUNET_DV_MessageReceived *received_msg;
   struct PendingMessage *pending_message;
-#if DEBUG_DV
-  struct GNUNET_MessageHeader * packed_message_header;
-  struct GNUNET_HELLO_Message *hello_msg;
-  struct GNUNET_PeerIdentity hello_identity;
-#endif
   char *sender_address;
   size_t sender_address_len;
   char *packed_msg_start;
   char *sender_address;
   size_t sender_address_len;
   char *packed_msg_start;
@@ -594,9 +813,8 @@ void send_to_plugin(const struct GNUNET_PeerIdentity * sender,
   received_msg = GNUNET_malloc(size);
   received_msg->header.size = htons(size);
   received_msg->header.type = htons(GNUNET_MESSAGE_TYPE_TRANSPORT_DV_RECEIVE);
   received_msg = GNUNET_malloc(size);
   received_msg->header.size = htons(size);
   received_msg->header.type = htons(GNUNET_MESSAGE_TYPE_TRANSPORT_DV_RECEIVE);
-  received_msg->sender_address_len = htons(sender_address_len);
   received_msg->distance = htonl(cost);
   received_msg->distance = htonl(cost);
-  received_msg->msg_len = htons(message_size);
+  received_msg->msg_len = htonl(message_size);
   /* Set the sender in this message to be the original sender! */
   memcpy(&received_msg->sender, distant_neighbor, sizeof(struct GNUNET_PeerIdentity));
   /* Copy the intermediate sender to the end of the message, this is how the transport identifies this peer */
   /* Set the sender in this message to be the original sender! */
   memcpy(&received_msg->sender, distant_neighbor, sizeof(struct GNUNET_PeerIdentity));
   /* Copy the intermediate sender to the end of the message, this is how the transport identifies this peer */
@@ -606,17 +824,6 @@ void send_to_plugin(const struct GNUNET_PeerIdentity * sender,
   packed_msg_start = (char *)&received_msg[1];
   packed_msg_start = &packed_msg_start[sender_address_len];
   memcpy(packed_msg_start, message, message_size);
   packed_msg_start = (char *)&received_msg[1];
   packed_msg_start = &packed_msg_start[sender_address_len];
   memcpy(packed_msg_start, message, message_size);
-#if DEBUG_DV
-  packed_message_header = (struct GNUNET_MessageHeader *)packed_msg_start;
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "dv service created received message. sender_address_len %lu, packed message len %d, total len %d\n", sender_address_len, ntohs(received_msg->msg_len), ntohs(received_msg->header.size));
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "dv packed message len %d, type %d\n", ntohs(packed_message_header->size), ntohs(packed_message_header->type));
-  if (ntohs(packed_message_header->type) == GNUNET_MESSAGE_TYPE_HELLO)
-  {
-    hello_msg = (struct GNUNET_HELLO_Message *)packed_message_header;
-    GNUNET_assert(GNUNET_OK == GNUNET_HELLO_get_id(hello_msg, &hello_identity));
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Packed HELLO message is about peer %s\n", GNUNET_i2s(&hello_identity));
-  }
-#endif
   pending_message = GNUNET_malloc(sizeof(struct PendingMessage) + size);
   pending_message->msg = (struct GNUNET_MessageHeader *)&pending_message[1];
   memcpy(&pending_message[1], received_msg, size);
   pending_message = GNUNET_malloc(sizeof(struct PendingMessage) + size);
   pending_message->msg = (struct GNUNET_MessageHeader *)&pending_message[1];
   memcpy(&pending_message[1], received_msg, size);
@@ -632,21 +839,48 @@ void send_to_plugin(const struct GNUNET_PeerIdentity * sender,
                                                                         size, GNUNET_TIME_UNIT_FOREVER_REL,
                                                                         &transmit_to_plugin, NULL);
         }
                                                                         size, GNUNET_TIME_UNIT_FOREVER_REL,
                                                                         &transmit_to_plugin, NULL);
         }
-      else
-        {
-          GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Failed to queue message for plugin, must be one in progress already!!\n");
-        }
     }
     }
+  else
+    {
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Failed to queue message for plugin, client_handle not yet set (how?)!\n");
+    }
+}
+
+/* Declare here so retry_core_send is aware of it */
+size_t core_transmit_notify (void *cls,
+                             size_t size, void *buf);
+
+/**
+ *  Try to send another message from our core sending list
+ */
+static void
+try_core_send (void *cls,
+               const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+  struct PendingMessage *pending;
+  pending = core_pending_head;
+
+  if (core_transmit_handle != NULL)
+    return; /* Message send already in progress */
+
+  if ((pending != NULL) && (coreAPI != NULL))
+    core_transmit_handle = GNUNET_CORE_notify_transmit_ready (coreAPI,
+                                                             GNUNET_YES,
+                                                             pending->importance,
+                                                             pending->timeout, 
+                                                             &pending->recipient,
+                                                             pending->msg_size,
+                                                             &core_transmit_notify, NULL);
 }
 
 
 /**
  * Function called to notify a client about the socket
 }
 
 
 /**
  * Function called to notify a client about the socket
- * begin ready to queue more data.  "buf" will be
+ * being ready to queue more data.  "buf" will be
  * NULL and "size" zero if the socket was closed for
  * writing in the meantime.
  *
  * NULL and "size" zero if the socket was closed for
  * writing in the meantime.
  *
- * @param cls closure
+ * @param cls closure (NULL)
  * @param size number of bytes available in buf
  * @param buf where the callee should write the message
  * @return number of bytes written to buf
  * @param size number of bytes available in buf
  * @param buf where the callee should write the message
  * @return number of bytes written to buf
@@ -655,7 +889,8 @@ size_t core_transmit_notify (void *cls,
                              size_t size, void *buf)
 {
   char *cbuf = buf;
                              size_t size, void *buf)
 {
   char *cbuf = buf;
-  struct PendingMessage *reply;
+  struct PendingMessage *pending;
+  struct PendingMessage *client_reply;
   size_t off;
   size_t msize;
 
   size_t off;
   size_t msize;
 
@@ -670,19 +905,49 @@ size_t core_transmit_notify (void *cls,
 
   core_transmit_handle = NULL;
   off = 0;
 
   core_transmit_handle = NULL;
   off = 0;
-  while ( (NULL != (reply = core_pending_head)) &&
-          (size >= off + (msize = ntohs (reply->msg->size))))
+  pending = core_pending_head;
+  if ( (pending != NULL) &&
+          (size >= (msize = ntohs (pending->msg->size))))
     {
 #if DEBUG_DV
     {
 #if DEBUG_DV
-    GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "`%s' : transmit_notify (core) called with size %d\n", "dv service", msize);
+      GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "`%s' : transmit_notify (core) called with size %d\n", "dv service", msize);
 #endif
       GNUNET_CONTAINER_DLL_remove (core_pending_head,
                                    core_pending_tail,
 #endif
       GNUNET_CONTAINER_DLL_remove (core_pending_head,
                                    core_pending_tail,
-                                   reply);
-      memcpy (&cbuf[off], reply->msg, msize);
-      GNUNET_free (reply);
+                                   pending);
+      if (pending->send_result != NULL) /* Will only be non-null if a real client asked for this send */
+        {
+          client_reply = GNUNET_malloc(sizeof(struct PendingMessage) + sizeof(struct GNUNET_DV_SendResultMessage));
+          client_reply->msg = (struct GNUNET_MessageHeader *)&client_reply[1];
+          memcpy(&client_reply[1], pending->send_result, sizeof(struct GNUNET_DV_SendResultMessage));
+          GNUNET_free(pending->send_result);
+
+          GNUNET_CONTAINER_DLL_insert_after(plugin_pending_head, plugin_pending_tail, plugin_pending_tail, client_reply);
+          if (client_handle != NULL)
+            {
+              if (plugin_transmit_handle == NULL)
+                {
+                  plugin_transmit_handle = GNUNET_SERVER_notify_transmit_ready (client_handle,
+                                                                                sizeof(struct GNUNET_DV_SendResultMessage),
+                                                                                GNUNET_TIME_UNIT_FOREVER_REL,
+                                                                                &transmit_to_plugin, NULL);
+                }
+              else
+                {
+                  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Failed to queue message for plugin, must be one in progress already!!\n");
+                }
+            }
+        }
+      memcpy (&cbuf[off], pending->msg, msize);
+      GNUNET_free (pending);
       off += msize;
     }
       off += msize;
     }
+  /*reply = core_pending_head;*/
+
+  GNUNET_SCHEDULER_add_now(&try_core_send, NULL);
+  /*if (reply != NULL)
+    core_transmit_handle = GNUNET_CORE_notify_transmit_ready(coreAPI, GNUNET_YES,  reply->importance, reply->timeout, &reply->recipient, reply->msg_size, &core_transmit_notify, NULL);*/
+
   return off;
 }
 
   return off;
 }
 
@@ -691,81 +956,124 @@ size_t core_transmit_notify (void *cls,
  * Send a DV data message via DV.
  *
  * @param sender the original sender of the message
  * Send a DV data message via DV.
  *
  * @param sender the original sender of the message
- * @param specific_neighbor the specific DistantNeighbor to use, complete with referrer!
+ * @param recipient the next hop recipient, may be our direct peer, maybe not
  * @param send_context the send context
  */
 static int
  * @param send_context the send context
  */
 static int
-send_message_via (const struct GNUNET_PeerIdentity * sender,
-              const struct DistantNeighbor * specific_neighbor,
-              struct DV_SendContext *send_context)
+send_message_via (const struct GNUNET_PeerIdentity *sender,
+                  const struct GNUNET_PeerIdentity *recipient,
+                  struct DV_SendContext *send_context)
 {
   p2p_dv_MESSAGE_Data *toSend;
   unsigned int msg_size;
 {
   p2p_dv_MESSAGE_Data *toSend;
   unsigned int msg_size;
-  unsigned int cost;
   unsigned int recipient_id;
   unsigned int sender_id;
   struct DistantNeighbor *source;
   struct PendingMessage *pending_message;
   unsigned int recipient_id;
   unsigned int sender_id;
   struct DistantNeighbor *source;
   struct PendingMessage *pending_message;
+  struct FindIDContext find_context;
 #if DEBUG_DV
   char shortname[5];
 #endif
 
   msg_size = send_context->message_size + sizeof (p2p_dv_MESSAGE_Data);
 
 #if DEBUG_DV
   char shortname[5];
 #endif
 
   msg_size = send_context->message_size + sizeof (p2p_dv_MESSAGE_Data);
 
-  if (specific_neighbor == NULL)
+  find_context.dest = send_context->distant_peer;
+  find_context.via = recipient;
+  find_context.tid = 0;
+  GNUNET_CONTAINER_multihashmap_get_multiple (extended_neighbors, &send_context->distant_peer->hashPubKey,
+                                              &find_specific_id, &find_context);
+
+  if (find_context.tid == 0)
     {
     {
+      GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "%s: find_specific_id failed to find peer!\n", my_short_id);
       /* target unknown to us, drop! */
       return GNUNET_SYSERR;
     }
       /* target unknown to us, drop! */
       return GNUNET_SYSERR;
     }
-  recipient_id = specific_neighbor->referrer_id;
+  recipient_id = find_context.tid;
 
 
-  source = GNUNET_CONTAINER_multihashmap_get (ctx.extended_neighbors,
-                                      &sender->hashPubKey);
-  if (source == NULL)
-    {
-      if (0 != (memcmp (&my_identity,
+  if (0 == (memcmp (&my_identity,
                         sender, sizeof (struct GNUNET_PeerIdentity))))
                         sender, sizeof (struct GNUNET_PeerIdentity))))
-        {
-          /* sender unknown to us, drop! */
-          return GNUNET_SYSERR;
-        }
-      sender_id = 0;            /* 0 == us */
-    }
+  {
+    sender_id = 0;
+    source = GNUNET_CONTAINER_multihashmap_get (extended_neighbors,
+                                                    &sender->hashPubKey);
+    if (source != NULL)
+      GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "%s: send_message_via found %s, myself in extended peer list???\n", my_short_id, GNUNET_i2s(&source->identity));
+  }
   else
   else
-    {
-      /* find out the number that we use when we gossip about
-         the sender */
-      sender_id = source->our_id;
-    }
+  {
+    source = GNUNET_CONTAINER_multihashmap_get (extended_neighbors,
+                                                &sender->hashPubKey);
+    if (source == NULL)
+      {
+              /* sender unknown to us, drop! */
+        return GNUNET_SYSERR;
+      }
+    sender_id = source->our_id;
+  }
 
 
-  cost = specific_neighbor->cost;
   pending_message = GNUNET_malloc(sizeof(struct PendingMessage) + msg_size);
   pending_message->msg = (struct GNUNET_MessageHeader *)&pending_message[1];
   pending_message = GNUNET_malloc(sizeof(struct PendingMessage) + msg_size);
   pending_message->msg = (struct GNUNET_MessageHeader *)&pending_message[1];
+  pending_message->send_result = send_context->send_result;
+  memcpy(&pending_message->recipient, recipient, sizeof(struct GNUNET_PeerIdentity));
+  pending_message->msg_size = msg_size;
+  pending_message->importance = send_context->importance;
+  pending_message->timeout = send_context->timeout;
   toSend = (p2p_dv_MESSAGE_Data *)pending_message->msg;
   toSend->header.size = htons (msg_size);
   toSend->header.type = htons (GNUNET_MESSAGE_TYPE_DV_DATA);
   toSend->sender = htonl (sender_id);
   toSend->recipient = htonl (recipient_id);
   toSend = (p2p_dv_MESSAGE_Data *)pending_message->msg;
   toSend->header.size = htons (msg_size);
   toSend->header.type = htons (GNUNET_MESSAGE_TYPE_DV_DATA);
   toSend->sender = htonl (sender_id);
   toSend->recipient = htonl (recipient_id);
+#if DEBUG_DV_MESSAGES
+  toSend->uid = send_context->uid; /* Still sent around in network byte order */
+#else
+  toSend->uid = htonl(0);
+#endif
+
   memcpy (&toSend[1], send_context->message, send_context->message_size);
 
 #if DEBUG_DV
   memcpy (&toSend[1], send_context->message, send_context->message_size);
 
 #if DEBUG_DV
-  memcpy(&shortname, GNUNET_i2s(&specific_neighbor->identity), 4);
+  memcpy(&shortname, GNUNET_i2s(send_context->distant_peer), 4);
   shortname[4] = '\0';
   shortname[4] = '\0';
-  GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "%s: Notifying core of send to destination `%s' via `%s' size %u\n", "DV", &shortname, GNUNET_i2s(&specific_neighbor->referrer->identity), msg_size);
+  GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "%s: Notifying core of send to destination `%s' via `%s' size %u\n", "DV", &shortname, GNUNET_i2s(recipient), msg_size);
 #endif
 
   GNUNET_CONTAINER_DLL_insert_after (core_pending_head,
                                      core_pending_tail,
                                      core_pending_tail,
                                      pending_message);
 #endif
 
   GNUNET_CONTAINER_DLL_insert_after (core_pending_head,
                                      core_pending_tail,
                                      core_pending_tail,
                                      pending_message);
-  if (core_transmit_handle == NULL)
-    core_transmit_handle = GNUNET_CORE_notify_transmit_ready(coreAPI, send_context->importance, send_context->timeout, &specific_neighbor->referrer->identity, msg_size, &core_transmit_notify, NULL);
-  else
-    GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "`%s': Failed to schedule pending transmission (must be one in progress!)\n", "dv service");
 
 
-  return (int) cost;
+  GNUNET_SCHEDULER_add_now(try_core_send, NULL);
+
+  return GNUNET_YES;
 }
 
 }
 
+/**
+ * Given a FindLeastCostContext, and a set
+ * of peers that match the target, return the cheapest.
+ *
+ * @param cls closure, a struct FindLeastCostContext
+ * @param key the key identifying the target peer
+ * @param value the target peer
+ *
+ * @return GNUNET_YES to continue iteration, GNUNET_NO to stop
+ */
+static int
+find_least_cost_peer (void *cls,
+                  const GNUNET_HashCode *key,
+                  void *value)
+{
+  struct FindLeastCostContext *find_context = cls;
+  struct DistantNeighbor *dn = value;
+
+  if (dn->cost < find_context->least_cost)
+    {
+      find_context->target = dn;
+    }
+  if (dn->cost == DIRECT_NEIGHBOR_COST)
+    return GNUNET_NO;
+  return GNUNET_YES;
+}
 
 /**
  * Send a DV data message via DV.
 
 /**
  * Send a DV data message via DV.
@@ -776,6 +1084,7 @@ send_message_via (const struct GNUNET_PeerIdentity * sender,
  * @param message the packed message
  * @param message_size size of the message
  * @param importance what priority to send this message with
  * @param message the packed message
  * @param message_size size of the message
  * @param importance what priority to send this message with
+ * @param uid the unique identifier of this message (or 0 for none)
  * @param timeout how long to possibly delay sending this message
  */
 static int
  * @param timeout how long to possibly delay sending this message
  */
 static int
@@ -784,7 +1093,9 @@ send_message (const struct GNUNET_PeerIdentity * recipient,
               const struct DistantNeighbor * specific_neighbor,
               const struct GNUNET_MessageHeader * message,
               size_t message_size,
               const struct DistantNeighbor * specific_neighbor,
               const struct GNUNET_MessageHeader * message,
               size_t message_size,
-              unsigned int importance, struct GNUNET_TIME_Relative timeout)
+              unsigned int importance,
+              unsigned int uid,
+              struct GNUNET_TIME_Relative timeout)
 {
   p2p_dv_MESSAGE_Data *toSend;
   unsigned int msg_size;
 {
   p2p_dv_MESSAGE_Data *toSend;
   unsigned int msg_size;
@@ -794,11 +1105,26 @@ send_message (const struct GNUNET_PeerIdentity * recipient,
   struct DistantNeighbor *target;
   struct DistantNeighbor *source;
   struct PendingMessage *pending_message;
   struct DistantNeighbor *target;
   struct DistantNeighbor *source;
   struct PendingMessage *pending_message;
-
+  struct FindLeastCostContext find_least_ctx;
+#if DEBUG_DV_PEER_NUMBERS
+  struct GNUNET_CRYPTO_HashAsciiEncoded encPeerFrom;
+  struct GNUNET_CRYPTO_HashAsciiEncoded encPeerTo;
+  struct GNUNET_CRYPTO_HashAsciiEncoded encPeerVia;
+#endif
   msg_size = message_size + sizeof (p2p_dv_MESSAGE_Data);
 
   msg_size = message_size + sizeof (p2p_dv_MESSAGE_Data);
 
-  target = GNUNET_CONTAINER_multihashmap_get (ctx.extended_neighbors,
-                                              &recipient->hashPubKey);
+  find_least_ctx.least_cost = -1;
+  find_least_ctx.target = NULL;
+  /*
+   * Need to find the least cost peer, lest the transport selection keep
+   * picking the same DV route for the same destination which results
+   * in messages looping forever.  Relatively cheap, we don't iterate
+   * over all known peers, just those that apply.
+   */
+  GNUNET_CONTAINER_multihashmap_get_multiple (extended_neighbors,
+                                                       &recipient->hashPubKey,  &find_least_cost_peer, &find_least_ctx);
+  target = find_least_ctx.target;
+
   if (target == NULL)
     {
       /* target unknown to us, drop! */
   if (target == NULL)
     {
       /* target unknown to us, drop! */
@@ -806,8 +1132,8 @@ send_message (const struct GNUNET_PeerIdentity * recipient,
     }
   recipient_id = target->referrer_id;
 
     }
   recipient_id = target->referrer_id;
 
-  source = GNUNET_CONTAINER_multihashmap_get (ctx.extended_neighbors,
-                                      &sender->hashPubKey);
+  source = GNUNET_CONTAINER_multihashmap_get (extended_neighbors,
+                                              &sender->hashPubKey);
   if (source == NULL)
     {
       if (0 != (memcmp (&my_identity,
   if (source == NULL)
     {
       if (0 != (memcmp (&my_identity,
@@ -825,16 +1151,46 @@ send_message (const struct GNUNET_PeerIdentity * recipient,
       sender_id = source->our_id;
     }
 
       sender_id = source->our_id;
     }
 
+#if DEBUG_DV_PEER_NUMBERS
+  GNUNET_CRYPTO_hash_to_enc (&source->identity.hashPubKey, &encPeerFrom);
+  GNUNET_CRYPTO_hash_to_enc (&target->referrer->identity.hashPubKey, &encPeerVia);
+  encPeerFrom.encoding[4] = '\0';
+  encPeerVia.encoding[4] = '\0';
+#endif
+  if ((sender_id != 0) && (0 == memcmp(&source->identity, &target->referrer->identity, sizeof(struct GNUNET_PeerIdentity))))
+    {
+      return 0;
+    }
+
   cost = target->cost;
   pending_message = GNUNET_malloc(sizeof(struct PendingMessage) + msg_size);
   pending_message->msg = (struct GNUNET_MessageHeader *)&pending_message[1];
   cost = target->cost;
   pending_message = GNUNET_malloc(sizeof(struct PendingMessage) + msg_size);
   pending_message->msg = (struct GNUNET_MessageHeader *)&pending_message[1];
+  pending_message->send_result = NULL;
+  pending_message->importance = importance;
+  pending_message->timeout = timeout;
+  memcpy(&pending_message->recipient, &target->referrer->identity, sizeof(struct GNUNET_PeerIdentity));
+  pending_message->msg_size = msg_size;
   toSend = (p2p_dv_MESSAGE_Data *)pending_message->msg;
   toSend->header.size = htons (msg_size);
   toSend->header.type = htons (GNUNET_MESSAGE_TYPE_DV_DATA);
   toSend->sender = htonl (sender_id);
   toSend->recipient = htonl (recipient_id);
   toSend = (p2p_dv_MESSAGE_Data *)pending_message->msg;
   toSend->header.size = htons (msg_size);
   toSend->header.type = htons (GNUNET_MESSAGE_TYPE_DV_DATA);
   toSend->sender = htonl (sender_id);
   toSend->recipient = htonl (recipient_id);
-  memcpy (&toSend[1], message, message_size);
+#if DEBUG_DV_MESSAGES
+  toSend->uid = htonl(uid);
+#else
+  toSend->uid = htonl(0);
+#endif
 
 
+#if DEBUG_DV_PEER_NUMBERS
+  GNUNET_CRYPTO_hash_to_enc (&target->identity.hashPubKey, &encPeerTo);
+  encPeerTo.encoding[4] = '\0';
+  GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "%s: Sending DATA message. Sender id %u, source %s, destination %s, via %s\n", GNUNET_i2s(&my_identity), sender_id, &encPeerFrom, &encPeerTo, &encPeerVia);
+#endif
+  memcpy (&toSend[1], message, message_size);
+  if ((source != NULL) && (source->pkey == NULL)) /* Test our hypothesis about message failures! */
+    {
+      GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "%s: Sending message, but anticipate recipient will not know sender!!!\n\n\n", my_short_id);
+    }
   GNUNET_CONTAINER_DLL_insert_after (core_pending_head,
                                      core_pending_tail,
                                      core_pending_tail,
   GNUNET_CONTAINER_DLL_insert_after (core_pending_head,
                                      core_pending_tail,
                                      core_pending_tail,
@@ -842,12 +1198,152 @@ send_message (const struct GNUNET_PeerIdentity * recipient,
 #if DEBUG_DV
   GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "%s: Notifying core of send size %d to destination `%s'\n", "DV SEND MESSAGE", msg_size, GNUNET_i2s(recipient));
 #endif
 #if DEBUG_DV
   GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "%s: Notifying core of send size %d to destination `%s'\n", "DV SEND MESSAGE", msg_size, GNUNET_i2s(recipient));
 #endif
-  if (core_transmit_handle == NULL)
-    core_transmit_handle = GNUNET_CORE_notify_transmit_ready(coreAPI, importance, timeout, &target->referrer->identity, msg_size, &core_transmit_notify, NULL);
 
 
+  GNUNET_SCHEDULER_add_now(try_core_send, NULL);
   return (int) cost;
 }
 
   return (int) cost;
 }
 
+#if USE_PEER_ID
+struct CheckPeerContext
+{
+  /**
+   * Peer we found
+   */
+  struct DistantNeighbor *peer;
+
+  /**
+   * Sender id to search for
+   */
+  unsigned int sender_id;
+};
+
+/**
+ * Iterator over hash map entries.
+ *
+ * @param cls closure
+ * @param key current key code
+ * @param value value in the hash map
+ * @return GNUNET_YES if we should continue to
+ *         iterate,
+ *         GNUNET_NO if not.
+ */
+int checkPeerID (void *cls,
+                 const GNUNET_HashCode * key,
+                 void *value)
+{
+  struct CheckPeerContext *ctx = cls;
+  struct DistantNeighbor *distant = value;
+
+  if (memcmp(key, &ctx->sender_id, sizeof(unsigned int)) == 0)
+  {
+    ctx->peer = distant;
+    return GNUNET_NO;
+  }
+  return GNUNET_YES;
+
+}
+#endif
+
+
+/**
+ * Handler for messages parsed out by the tokenizer from
+ * DV DATA received for this peer.
+ *
+ * @param cls NULL
+ * @param client the TokenizedMessageContext which contains message information
+ * @param message the actual message
+ */
+void tokenized_message_handler (void *cls,
+                                void *client,
+                                const struct GNUNET_MessageHeader *message)
+{
+  struct TokenizedMessageContext *ctx = client;
+  GNUNET_break_op (ntohs (message->type) != GNUNET_MESSAGE_TYPE_DV_GOSSIP);
+  GNUNET_break_op (ntohs (message->type) != GNUNET_MESSAGE_TYPE_DV_DATA);
+  if ( (ntohs (message->type) != GNUNET_MESSAGE_TYPE_DV_GOSSIP) &&
+      (ntohs (message->type) != GNUNET_MESSAGE_TYPE_DV_DATA) )
+  {
+#if DEBUG_DV_MESSAGES
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "%s: Receives %s message for me, uid %u, size %d, type %d cost %u from %s!\n", my_short_id, "DV DATA", ctx->uid, ntohs(message->size), ntohs(message->type), ctx->distant->cost, GNUNET_i2s(&ctx->distant->identity));
+#endif
+    GNUNET_assert(memcmp(ctx->peer, &ctx->distant->identity, sizeof(struct GNUNET_PeerIdentity)) != 0);
+    send_to_plugin(ctx->peer, message, ntohs(message->size), &ctx->distant->identity, ctx->distant->cost);
+  }
+}
+
+#if DELAY_FORWARDS
+struct DelayedMessageContext
+{
+  struct GNUNET_PeerIdentity dest;
+  struct GNUNET_PeerIdentity sender;
+  struct GNUNET_MessageHeader *message;
+  size_t message_size;
+  uint32_t uid;
+};
+
+void send_message_delayed (void *cls,
+                           const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+  struct DelayedMessageContext *msg_ctx = cls;
+  if (msg_ctx != NULL)
+    {
+      send_message(&msg_ctx->dest,
+                   &msg_ctx->sender,
+                   NULL,
+                   msg_ctx->message,
+                   msg_ctx->message_size,
+                   default_dv_priority,
+                   msg_ctx->uid,
+                   GNUNET_TIME_relative_get_forever());
+      GNUNET_free(msg_ctx->message);
+      GNUNET_free(msg_ctx);
+    }
+}
+#endif
+
+/**
+ * Get distance information from 'atsi'.
+ *
+ * @param atsi performance data
+ * @return connected transport distance
+ */
+static uint32_t
+get_atsi_distance (const struct GNUNET_TRANSPORT_ATS_Information *atsi)
+{
+  while ( (ntohl (atsi->type) != GNUNET_TRANSPORT_ATS_ARRAY_TERMINATOR) &&
+          (ntohl (atsi->type) != GNUNET_TRANSPORT_ATS_QUALITY_NET_DISTANCE) )
+    atsi++;
+  if (ntohl (atsi->type) == GNUNET_TRANSPORT_ATS_ARRAY_TERMINATOR)
+    {
+      GNUNET_break (0);
+      /* FIXME: we do not have distance data? Assume direct neighbor. */
+      return DIRECT_NEIGHBOR_COST;
+    }
+  return ntohl (atsi->value);
+}
+
+/**
+ * Find latency information in 'atsi'.
+ *
+ * @param atsi performance data
+ * @return connection latency
+ */
+static struct GNUNET_TIME_Relative
+get_atsi_latency (const struct GNUNET_TRANSPORT_ATS_Information *atsi)
+{
+  while ( (ntohl (atsi->type) != GNUNET_TRANSPORT_ATS_ARRAY_TERMINATOR) &&
+          (ntohl (atsi->type) != GNUNET_TRANSPORT_ATS_QUALITY_NET_DELAY) )
+    atsi++;
+  if (ntohl (atsi->type) == GNUNET_TRANSPORT_ATS_ARRAY_TERMINATOR)
+    {
+      GNUNET_break (0);
+      /* how can we not have latency data? */
+      return GNUNET_TIME_UNIT_SECONDS;
+    }
+  /* FIXME: Multiply by GNUNET_TIME_UNIT_MILLISECONDS (1) to get as a GNUNET_TIME_Relative */
+  return GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MILLISECONDS, ntohl (atsi->value));
+}
 
 /**
  * Core handler for dv data messages.  Whatever this message
 
 /**
  * Core handler for dv data messages.  Whatever this message
@@ -858,14 +1354,13 @@ send_message (const struct GNUNET_PeerIdentity * recipient,
  * @param cls closure
  * @param peer peer which sent the message (immediate sender)
  * @param message the message
  * @param cls closure
  * @param peer peer which sent the message (immediate sender)
  * @param message the message
- * @param latency the latency of the connection we received the message from
- * @param distance the distance to the immediate peer
- */
-static int handle_dv_data_message (void *cls,
-                             const struct GNUNET_PeerIdentity * peer,
-                             const struct GNUNET_MessageHeader * message,
-                             struct GNUNET_TIME_Relative latency,
-                             uint32_t distance)
+ * @param atsi transport ATS information (latency, distance, etc.)
+ */
+static int 
+handle_dv_data_message (void *cls,
+                       const struct GNUNET_PeerIdentity * peer,
+                       const struct GNUNET_MessageHeader * message,
+                       const struct GNUNET_TRANSPORT_ATS_Information *atsi)
 {
   const p2p_dv_MESSAGE_Data *incoming = (const p2p_dv_MESSAGE_Data *) message;
   const struct GNUNET_MessageHeader *packed_message;
 {
   const p2p_dv_MESSAGE_Data *incoming = (const p2p_dv_MESSAGE_Data *) message;
   const struct GNUNET_MessageHeader *packed_message;
@@ -873,77 +1368,132 @@ static int handle_dv_data_message (void *cls,
   struct DistantNeighbor *pos;
   unsigned int sid;             /* Sender id */
   unsigned int tid;             /* Target id */
   struct DistantNeighbor *pos;
   unsigned int sid;             /* Sender id */
   unsigned int tid;             /* Target id */
-  struct GNUNET_PeerIdentity original_sender;
-  struct GNUNET_PeerIdentity destination;
+  struct GNUNET_PeerIdentity *original_sender;
+  struct GNUNET_PeerIdentity *destination;
   struct FindDestinationContext fdc;
   struct FindDestinationContext fdc;
+  struct TokenizedMessageContext tkm_ctx;
+  int i;
+  int found_pos;
+#if DELAY_FORWARDS
+  struct DelayedMessageContext *delayed_context;
+#endif
+#if USE_PEER_ID
+  struct CheckPeerContext checkPeerCtx;
+#endif
+#if DEBUG_DV_MESSAGES
+  char *sender_id;
+#endif
   int ret;
   size_t packed_message_size;
   char *cbuf;
   int ret;
   size_t packed_message_size;
   char *cbuf;
-  size_t offset;
+  uint32_t distance; /* Distance information */
+  struct GNUNET_TIME_Relative latency; /* Latency information */
 
   packed_message_size = ntohs(incoming->header.size) - sizeof(p2p_dv_MESSAGE_Data);
 
   packed_message_size = ntohs(incoming->header.size) - sizeof(p2p_dv_MESSAGE_Data);
-
 #if DEBUG_DV
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
 #if DEBUG_DV
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "%s: Receives %s message size %d, packed message size %d!\n", "dv", "DV DATA", ntohs(incoming->header.size), packed_message_size);
+              "%s: Receives DATA message from %s size %d, packed size %d!\n", my_short_id, GNUNET_i2s(peer) , ntohs(incoming->header.size), packed_message_size);
 #endif
 #endif
+
   if (ntohs (incoming->header.size) <  sizeof (p2p_dv_MESSAGE_Data) + sizeof (struct GNUNET_MessageHeader))
     {
 #if DEBUG_DV
   if (ntohs (incoming->header.size) <  sizeof (p2p_dv_MESSAGE_Data) + sizeof (struct GNUNET_MessageHeader))
     {
 #if DEBUG_DV
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "`%s': Message sizes don't add up, total size %u, expected at least %u!\n", "dv service", ntohs(incoming->header.size), sizeof (p2p_dv_MESSAGE_Data) + sizeof (struct GNUNET_MessageHeader));
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "`%s': Message sizes don't add up, total size %u, expected at least %u!\n", "dv service", ntohs(incoming->header.size), sizeof (p2p_dv_MESSAGE_Data) + sizeof (struct GNUNET_MessageHeader));
 #endif
       return GNUNET_SYSERR;
     }
 
 #endif
       return GNUNET_SYSERR;
     }
 
-  dn = GNUNET_CONTAINER_multihashmap_get (ctx.direct_neighbors,
-                                  &peer->hashPubKey);
+  /* Iterate over ATS_Information to get distance and latency */
+  latency = get_atsi_latency(atsi);
+  distance = get_atsi_distance(atsi);
+  dn = GNUNET_CONTAINER_multihashmap_get (direct_neighbors,
+                                          &peer->hashPubKey);
   if (dn == NULL)
   if (dn == NULL)
-    {
-#if DEBUG_DV
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                  "%s: dn NULL!\n", "dv");
-#endif
-      return GNUNET_OK;
-    }
+    return GNUNET_OK;
+
   sid = ntohl (incoming->sender);
   sid = ntohl (incoming->sender);
+#if USE_PEER_ID
+  if (sid != 0)
+  {
+    checkPeerCtx.sender_id = sid;
+    checkPeerCtx.peer = NULL;
+    GNUNET_CONTAINER_multihashmap_iterate(extended_neighbors, &checkPeerID, &checkPeerCtx);
+    pos = checkPeerCtx.peer;
+  }
+  else
+  {
+    pos = GNUNET_CONTAINER_multihashmap_get (extended_neighbors,
+                                             &peer->hashPubKey);
+  }
+#else
   pos = dn->referee_head;
   while ((NULL != pos) && (pos->referrer_id != sid))
     pos = pos->next;
   pos = dn->referee_head;
   while ((NULL != pos) && (pos->referrer_id != sid))
     pos = pos->next;
+#endif
+
   if (pos == NULL)
     {
   if (pos == NULL)
     {
-#if DEBUG_DV
+#if DEBUG_DV_MESSAGES
       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                  "%s: unknown sender (%d), size of extended_peers is %d!\n", "dv", ntohl(incoming->sender), GNUNET_CONTAINER_multihashmap_size (ctx.extended_neighbors));
+                  "%s: unknown sender (%u), Message uid %u from %s!\n", my_short_id, ntohl(incoming->sender), ntohl(incoming->uid), GNUNET_i2s(&dn->identity));
+      pos = dn->referee_head;
+      while ((NULL != pos) && (pos->referrer_id != sid))
+      {
+        sender_id = strdup(GNUNET_i2s(&pos->identity));
+        GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "I know sender %u %s\n", pos->referrer_id, sender_id);
+        GNUNET_free(sender_id);
+        pos = pos->next;
+      }
 #endif
 #endif
+
+      found_pos = -1;
+      for (i = 0; i< MAX_OUTSTANDING_MESSAGES; i++)
+        {
+          if (dn->pending_messages[i].sender_id == 0)
+            {
+              found_pos = i;
+              break;
+            }
+        }
+
+      if (found_pos == -1)
+        {
+          GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+                      "%s: Too many unknown senders (%u), ignoring message! Message uid %llu from %s!\n", my_short_id, ntohl(incoming->sender), ntohl(incoming->uid), GNUNET_i2s(&dn->identity));
+        }
+      else
+        {
+            dn->pending_messages[found_pos].message = GNUNET_malloc(ntohs (message->size));
+            memcpy(dn->pending_messages[found_pos].message, message, ntohs(message->size));
+            dn->pending_messages[found_pos].distance = distance;
+            dn->pending_messages[found_pos].latency = latency;
+            memcpy(&dn->pending_messages[found_pos].sender, peer, sizeof(struct GNUNET_PeerIdentity));
+            dn->pending_messages[found_pos].sender_id = sid;
+        }
       /* unknown sender */
       return GNUNET_OK;
     }
       /* unknown sender */
       return GNUNET_OK;
     }
-  original_sender = pos->identity;
+  original_sender = &pos->identity;
   tid = ntohl (incoming->recipient);
   if (tid == 0)
     {
       /* 0 == us */
   tid = ntohl (incoming->recipient);
   if (tid == 0)
     {
       /* 0 == us */
-
       cbuf = (char *)&incoming[1];
       cbuf = (char *)&incoming[1];
-      offset = 0;
-      while(offset < packed_message_size)
+
+      tkm_ctx.peer = peer;
+      tkm_ctx.distant = pos;
+      tkm_ctx.uid = ntohl(incoming->uid);
+      if (GNUNET_OK != GNUNET_SERVER_mst_receive (coreMST,
+                                                  &tkm_ctx,
+                                                  cbuf,
+                                                  packed_message_size,
+                                                  GNUNET_NO,
+                                                  GNUNET_NO))
         {
         {
-          packed_message = (struct GNUNET_MessageHeader *)&cbuf[offset];
-#if DEBUG_DV
-          GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                      "%s: Receives %s message for me, size %d type %d!\n", "dv", "DV DATA", ntohs(packed_message->size), ntohs(packed_message->type));
-#endif
-          GNUNET_break_op (ntohs (packed_message->type) != GNUNET_MESSAGE_TYPE_DV_GOSSIP);
-          GNUNET_break_op (ntohs (packed_message->type) != GNUNET_MESSAGE_TYPE_DV_DATA);
-          if ( (ntohs (packed_message->type) != GNUNET_MESSAGE_TYPE_DV_GOSSIP) &&
-              (ntohs (packed_message->type) != GNUNET_MESSAGE_TYPE_DV_DATA) )
-          {
-            send_to_plugin(peer, packed_message, ntohs(packed_message->size), &pos->identity, pos->cost);
-          }
-          offset += ntohs(packed_message->size);
+          GNUNET_break_op(0);
+          GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "%s: %s Received corrupt data, discarding!", my_short_id, "DV SERVICE");
         }
         }
-
       return GNUNET_OK;
     }
   else
       return GNUNET_OK;
     }
   else
@@ -957,7 +1507,7 @@ static int handle_dv_data_message (void *cls,
      issue) */
   fdc.tid = tid;
   fdc.dest = NULL;
      issue) */
   fdc.tid = tid;
   fdc.dest = NULL;
-  GNUNET_CONTAINER_heap_iterate (ctx.neighbor_max_heap,
+  GNUNET_CONTAINER_heap_iterate (neighbor_max_heap,
                                  &find_destination, &fdc);
 
 #if DEBUG_DV
                                  &find_destination, &fdc);
 
 #if DEBUG_DV
@@ -967,15 +1517,21 @@ static int handle_dv_data_message (void *cls,
 
   if (fdc.dest == NULL)
     {
 
   if (fdc.dest == NULL)
     {
+#if DEBUG_DV_MESSAGES
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                  "%s: Receives %s message uid %u for someone we don't know (id %u)!\n", my_short_id, "DV DATA", ntohl(incoming->uid), tid);
+#endif
       return GNUNET_OK;
     }
       return GNUNET_OK;
     }
-  destination = fdc.dest->identity;
+  destination = &fdc.dest->identity;
 
 
-  if (0 == memcmp (&destination, peer, sizeof (struct GNUNET_PeerIdentity)))
+  if (0 == memcmp (destination, peer, sizeof (struct GNUNET_PeerIdentity)))
     {
       /* FIXME: create stat: routing loop-discard! */
     {
       /* FIXME: create stat: routing loop-discard! */
-#if DEBUG_DV
-      GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "\n\n\nLoopy loo message\n\n\n");
+
+#if DEBUG_DV_MESSAGES
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                  "%s: DROPPING MESSAGE uid %u type %d, routing loop! Message immediately from %s!\n", my_short_id, ntohl(incoming->uid), ntohs(packed_message->type), GNUNET_i2s(&dn->identity));
 #endif
       return GNUNET_OK;
     }
 #endif
       return GNUNET_OK;
     }
@@ -983,72 +1539,133 @@ static int handle_dv_data_message (void *cls,
   /* At this point we have a message, and we need to forward it on to the
    * next DV hop.
    */
   /* At this point we have a message, and we need to forward it on to the
    * next DV hop.
    */
-  /* FIXME: Can't send message on, we have to behave.
-   * We have to tell core we have a message for the next peer, and let
-   * transport do transport selection on how to get this message to 'em */
-  /*ret = send_message (&destination,
-                      &original_sender,
-                      packed_message, DV_PRIORITY, DV_DELAY);*/
-#if DEBUG_DV
+#if DEBUG_DV_MESSAGES
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "%s: Sends message size %d on!\n", "dv", packed_message_size);
+              "%s: FORWARD %s message for %s, uid %u, size %d type %d, cost %u!\n", my_short_id, "DV DATA", GNUNET_i2s(destination), ntohl(incoming->uid), ntohs(packed_message->size), ntohs(packed_message->type), pos->cost);
 #endif
 #endif
-  ret = send_message(&destination, &original_sender, NULL, packed_message, packed_message_size, default_dv_priority, default_dv_delay);
 
 
+#if DELAY_FORWARDS
+  if (GNUNET_TIME_absolute_get_duration(pos->last_gossip).abs_value < GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 2).abs_value)
+    {
+      delayed_context = GNUNET_malloc(sizeof(struct DelayedMessageContext));
+      memcpy(&delayed_context->dest, destination, sizeof(struct GNUNET_PeerIdentity));
+      memcpy(&delayed_context->sender, original_sender, sizeof(struct GNUNET_PeerIdentity));
+      delayed_context->message = GNUNET_malloc(packed_message_size);
+      memcpy(delayed_context->message, packed_message, packed_message_size);
+      delayed_context->message_size = packed_message_size;
+      delayed_context->uid = ntohl(incoming->uid);
+      GNUNET_SCHEDULER_add_delayed(GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MILLISECONDS, 2500), &send_message_delayed, delayed_context);
+      return GNUNET_OK;
+    }
+  else
+#endif
+    {
+      ret = send_message(destination,
+                         original_sender,
+                         NULL,
+                         packed_message,
+                         packed_message_size,
+                         default_dv_priority,
+                         ntohl(incoming->uid),
+                         GNUNET_TIME_relative_get_forever());
+    }
   if (ret != GNUNET_SYSERR)
     return GNUNET_OK;
   else
   if (ret != GNUNET_SYSERR)
     return GNUNET_OK;
   else
-    return GNUNET_SYSERR;
+    {
+#if DEBUG_MESSAGE_DROP
+      char *direct_id = GNUNET_strdup(GNUNET_i2s(&dn->identity));
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                  "%s: DROPPING MESSAGE type %d, forwarding failed! Message immediately from %s!\n", GNUNET_i2s(&my_identity), ntohs(((struct GNUNET_MessageHeader *)&incoming[1])->type), direct_id);
+      GNUNET_free (direct_id);
+#endif
+      return GNUNET_SYSERR;
+    }
 }
 
 }
 
+#if DEBUG_DV
+/**
+ * Iterator over hash map entries.
+ *
+ * @param cls closure (NULL)
+ * @param key current key code
+ * @param value value in the hash map (DistantNeighbor)
+ * @return GNUNET_YES if we should continue to
+ *         iterate,
+ *         GNUNET_NO if not.
+ */
+int print_neighbors (void *cls,
+                     const GNUNET_HashCode * key,
+                     void *abs_value)
+{
+  struct DistantNeighbor *distant_neighbor = abs_value;
+  char my_shortname[5];
+  char referrer_shortname[5];
+  memcpy(&my_shortname, GNUNET_i2s(&my_identity), 4);
+  my_shortname[4] = '\0';
+  memcpy(&referrer_shortname, GNUNET_i2s(&distant_neighbor->referrer->identity), 4);
+  referrer_shortname[4] = '\0';
+
+  GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "`%s' %s: Peer `%s', distance %d, referrer `%s' pkey: %s\n", &my_shortname, "DV", GNUNET_i2s(&distant_neighbor->identity), distant_neighbor->cost, &referrer_shortname, distant_neighbor->pkey == NULL ? "no" : "yes");
+  return GNUNET_YES;
+}
+#endif
 
 /**
 
 /**
- * Thread which chooses a peer to gossip about and a peer to gossip
- * to, then constructs the message and sends it out.  Will run until
- * done_module_dv is called.
+ *  Scheduled task which gossips about known direct peers to other connected
+ *  peers.  Will run until called with reason shutdown.
  */
 static void
 neighbor_send_task (void *cls,
  */
 static void
 neighbor_send_task (void *cls,
-                      const struct GNUNET_SCHEDULER_TaskContext *tc)
+                    const struct GNUNET_SCHEDULER_TaskContext *tc)
 {
   struct NeighborSendContext *send_context = cls;
 {
   struct NeighborSendContext *send_context = cls;
-#if DEBUG_DV_GOSSIP
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "%s: Entering neighbor_send_task...\n",
-              GNUNET_i2s(&my_identity));
+#if DEBUG_DV_GOSSIP_SEND
   char * encPeerAbout;
   char * encPeerTo;
 #endif
   struct DistantNeighbor *about;
   struct DirectNeighbor *to;
   char * encPeerAbout;
   char * encPeerTo;
 #endif
   struct DistantNeighbor *about;
   struct DirectNeighbor *to;
+  struct FastGossipNeighborList *about_list;
 
   p2p_dv_MESSAGE_NeighborInfo *message;
   struct PendingMessage *pending_message;
 
 
   p2p_dv_MESSAGE_NeighborInfo *message;
   struct PendingMessage *pending_message;
 
-  if (tc->reason == GNUNET_SCHEDULER_REASON_SHUTDOWN)
-  {
+  if ( (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN) != 0)
+    {
 #if DEBUG_DV_GOSSIP
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "%s: Called with reason shutdown, shutting down!\n",
               GNUNET_i2s(&my_identity));
 #endif
 #if DEBUG_DV_GOSSIP
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "%s: Called with reason shutdown, shutting down!\n",
               GNUNET_i2s(&my_identity));
 #endif
-    send_context->toNeighbor->send_context = NULL;
-    GNUNET_free(send_context);
-    return;
-  }
-
+      return;
+    }
 
 
-  /* FIXME: this may become a problem, because the heap walk has only one internal "walker".  This means
-   * that if two neighbor_send_tasks are operating in lockstep (which is quite possible, given default
-   * values for all connected peers) there may be a serious bias as to which peers get gossiped about!
-   * Probably the *best* way to fix would be to have an opaque pointer to the walk position passed as
-   * part of the walk_get_next call.  Then the heap would have to keep a list of walks, or reset the walk
-   * whenever a modification has been detected.  Yuck either way.  Perhaps we could iterate over the heap
-   * once to get a list of peers to gossip about and gossip them over time... But then if one goes away
-   * in the mean time that becomes nasty.  For now we'll just assume that the walking is done
-   * asynchronously enough to avoid major problems (-;
-   */
-  about = GNUNET_CONTAINER_heap_walk_get_next (ctx.neighbor_min_heap);
+  if (send_context->fast_gossip_list_head != NULL)
+    {
+      about_list = send_context->fast_gossip_list_head;
+      about = about_list->about;
+      GNUNET_CONTAINER_DLL_remove(send_context->fast_gossip_list_head,
+                                  send_context->fast_gossip_list_tail,
+                                  about_list);
+      GNUNET_free(about_list);
+    }
+  else
+    {
+      /* FIXME: this may become a problem, because the heap walk has only one internal "walker".  This means
+       * that if two neighbor_send_tasks are operating in lockstep (which is quite possible, given default
+       * values for all connected peers) there may be a serious bias as to which peers get gossiped about!
+       * Probably the *best* way to fix would be to have an opaque pointer to the walk position passed as
+       * part of the walk_get_next call.  Then the heap would have to keep a list of walks, or reset the walk
+       * whenever a modification has been detected.  Yuck either way.  Perhaps we could iterate over the heap
+       * once to get a list of peers to gossip about and gossip them over time... But then if one goes away
+       * in the mean time that becomes nasty.  For now we'll just assume that the walking is done
+       * asynchronously enough to avoid major problems (-;
+       *
+       * NOTE: probably fixed once we decided send rate based on allowed bandwidth.
+       */
+      about = GNUNET_CONTAINER_heap_walk_get_next (neighbor_min_heap);
+    }
   to = send_context->toNeighbor;
 
   if ((about != NULL) && (to != about->referrer /* split horizon */ ) &&
   to = send_context->toNeighbor;
 
   if ((about != NULL) && (to != about->referrer /* split horizon */ ) &&
@@ -1060,18 +1677,23 @@ neighbor_send_task (void *cls,
                         &to->identity, sizeof (struct GNUNET_PeerIdentity))) &&
       (about->pkey != NULL))
     {
                         &to->identity, sizeof (struct GNUNET_PeerIdentity))) &&
       (about->pkey != NULL))
     {
-#if DEBUG_DV_GOSSIP
+#if DEBUG_DV_GOSSIP_SEND
       encPeerAbout = GNUNET_strdup(GNUNET_i2s(&about->identity));
       encPeerTo = GNUNET_strdup(GNUNET_i2s(&to->identity));
       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
       encPeerAbout = GNUNET_strdup(GNUNET_i2s(&about->identity));
       encPeerTo = GNUNET_strdup(GNUNET_i2s(&to->identity));
       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                  "%s: Sending info about peer %s to directly connected peer %s\n",
+                  "%s: Sending info about peer %s id %u to directly connected peer %s\n",
                   GNUNET_i2s(&my_identity),
                   GNUNET_i2s(&my_identity),
-                  encPeerAbout, encPeerTo);
+                  encPeerAbout, about->our_id, encPeerTo);
       GNUNET_free(encPeerAbout);
       GNUNET_free(encPeerTo);
 #endif
       GNUNET_free(encPeerAbout);
       GNUNET_free(encPeerTo);
 #endif
+      about->last_gossip = GNUNET_TIME_absolute_get();
       pending_message = GNUNET_malloc(sizeof(struct PendingMessage) + sizeof(p2p_dv_MESSAGE_NeighborInfo));
       pending_message->msg = (struct GNUNET_MessageHeader *)&pending_message[1];
       pending_message = GNUNET_malloc(sizeof(struct PendingMessage) + sizeof(p2p_dv_MESSAGE_NeighborInfo));
       pending_message->msg = (struct GNUNET_MessageHeader *)&pending_message[1];
+      pending_message->importance = default_dv_priority;
+      pending_message->timeout = GNUNET_TIME_relative_get_forever();
+      memcpy(&pending_message->recipient, &to->identity, sizeof(struct GNUNET_PeerIdentity));
+      pending_message->msg_size = sizeof(p2p_dv_MESSAGE_NeighborInfo);
       message = (p2p_dv_MESSAGE_NeighborInfo *)pending_message->msg;
       message->header.size = htons (sizeof (p2p_dv_MESSAGE_NeighborInfo));
       message->header.type = htons (GNUNET_MESSAGE_TYPE_DV_GOSSIP);
       message = (p2p_dv_MESSAGE_NeighborInfo *)pending_message->msg;
       message->header.size = htons (sizeof (p2p_dv_MESSAGE_NeighborInfo));
       message->header.type = htons (GNUNET_MESSAGE_TYPE_DV_GOSSIP);
@@ -1087,12 +1709,27 @@ neighbor_send_task (void *cls,
                                          core_pending_tail,
                                          pending_message);
 
                                          core_pending_tail,
                                          pending_message);
 
-      if (core_transmit_handle == NULL)
-        core_transmit_handle = GNUNET_CORE_notify_transmit_ready(coreAPI, default_dv_priority, default_dv_delay, &to->identity, sizeof(p2p_dv_MESSAGE_NeighborInfo), &core_transmit_notify, NULL);
+      GNUNET_SCHEDULER_add_now(try_core_send, NULL);
+      /*if (core_transmit_handle == NULL)
+        core_transmit_handle = GNUNET_CORE_notify_transmit_ready(coreAPI, GNUNET_YES,  default_dv_priority, GNUNET_TIME_relative_get_forever(), &to->identity, sizeof(p2p_dv_MESSAGE_NeighborInfo), &core_transmit_notify, NULL);*/
 
     }
 
 
     }
 
-  send_context->task = GNUNET_SCHEDULER_add_delayed(sched, send_context->timeout, &neighbor_send_task, send_context);
+  if (send_context->fast_gossip_list_head != NULL) /* If there are other peers in the fast list, schedule right away */
+    {
+#if DEBUG_DV_PEER_NUMBERS
+      GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "DV SERVICE: still in fast send mode\n");
+#endif
+      send_context->task = GNUNET_SCHEDULER_add_now(&neighbor_send_task, send_context);
+    }
+  else
+    {
+#if DEBUG_DV_PEER_NUMBERS
+      GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "DV SERVICE: entering slow send mode\n");
+#endif
+      send_context->task = GNUNET_SCHEDULER_add_delayed(GNUNET_DV_DEFAULT_SEND_INTERVAL, &neighbor_send_task, send_context);
+    }
+
   return;
 }
 
   return;
 }
 
@@ -1122,7 +1759,7 @@ handle_start (void *cls,
   GNUNET_SERVER_receive_done (client, GNUNET_OK);
 }
 
   GNUNET_SERVER_receive_done (client, GNUNET_OK);
 }
 
-
+#if UNSIMPLER
 /**
  * Iterate over hash map entries for a distant neighbor,
  * if direct neighbor matches context call send message
 /**
  * Iterate over hash map entries for a distant neighbor,
  * if direct neighbor matches context call send message
@@ -1136,10 +1773,10 @@ handle_start (void *cls,
  */
 int send_iterator (void *cls,
                    const GNUNET_HashCode * key,
  */
 int send_iterator (void *cls,
                    const GNUNET_HashCode * key,
-                   void *value)
+                   void *abs_value)
 {
   struct DV_SendContext *send_context = cls;
 {
   struct DV_SendContext *send_context = cls;
-  struct DistantNeighbor *distant_neighbor = value;
+  struct DistantNeighbor *distant_neighbor = abs_value;
 
   if (memcmp(distant_neighbor->referrer, send_context->direct_peer, sizeof(struct GNUNET_PeerIdentity)) == 0) /* They match, send and free */
     {
 
   if (memcmp(distant_neighbor->referrer, send_context->direct_peer, sizeof(struct GNUNET_PeerIdentity)) == 0) /* They match, send and free */
     {
@@ -1148,6 +1785,7 @@ int send_iterator (void *cls,
     }
   return GNUNET_YES;
 }
     }
   return GNUNET_YES;
 }
+#endif
 
 /**
  * Service server's handler for message send requests (which come
 
 /**
  * Service server's handler for message send requests (which come
@@ -1158,10 +1796,12 @@ int send_iterator (void *cls,
  * @param message the actual message
  */
 void handle_dv_send_message (void *cls,
  * @param message the actual message
  */
 void handle_dv_send_message (void *cls,
-                      struct GNUNET_SERVER_Client * client,
-                      const struct GNUNET_MessageHeader * message)
+                             struct GNUNET_SERVER_Client * client,
+                             const struct GNUNET_MessageHeader * message)
 {
   struct GNUNET_DV_SendMessage *send_msg;
 {
   struct GNUNET_DV_SendMessage *send_msg;
+  struct GNUNET_DV_SendResultMessage *send_result_msg;
+  struct PendingMessage *pending_message;
   size_t address_len;
   size_t message_size;
   struct GNUNET_PeerIdentity *destination;
   size_t address_len;
   size_t message_size;
   struct GNUNET_PeerIdentity *destination;
@@ -1171,33 +1811,33 @@ void handle_dv_send_message (void *cls,
   int offset;
   static struct GNUNET_CRYPTO_HashAsciiEncoded dest_hash;
   struct DV_SendContext *send_context;
   int offset;
   static struct GNUNET_CRYPTO_HashAsciiEncoded dest_hash;
   struct DV_SendContext *send_context;
+#if DEBUG_DV_MESSAGES
+  char *cbuf;
+  struct GNUNET_MessageHeader *packed_message;
+#endif
 
   if (client_handle == NULL)
   {
     client_handle = client;
 
   if (client_handle == NULL)
   {
     client_handle = client;
-    GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "%s: Setting initial client handle, never received `%s' message?\n", "dv", "START");
   }
   else if (client_handle != client)
   {
     client_handle = client;
     /* What should we do in this case, assert fail or just log the warning? */
               "%s: Setting initial client handle, never received `%s' message?\n", "dv", "START");
   }
   else if (client_handle != client)
   {
     client_handle = client;
     /* What should we do in this case, assert fail or just log the warning? */
-    GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+#if DEBUG_DV
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
                 "%s: Setting client handle (was a different client!)!\n", "dv");
                 "%s: Setting client handle (was a different client!)!\n", "dv");
+#endif
   }
 
   GNUNET_assert(ntohs(message->size) > sizeof(struct GNUNET_DV_SendMessage));
   send_msg = (struct GNUNET_DV_SendMessage *)message;
 
   }
 
   GNUNET_assert(ntohs(message->size) > sizeof(struct GNUNET_DV_SendMessage));
   send_msg = (struct GNUNET_DV_SendMessage *)message;
 
-  address_len = ntohs(send_msg->addrlen);
+  address_len = ntohl(send_msg->addrlen);
   GNUNET_assert(address_len == sizeof(struct GNUNET_PeerIdentity) * 2);
   GNUNET_assert(address_len == sizeof(struct GNUNET_PeerIdentity) * 2);
-  message_size = ntohs(send_msg->msgbuf_size);
-
-#if DEBUG_DV
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "%s: Receives %s message size %u!\n\n\n", "dv", "SEND", message_size);
-#endif
-  GNUNET_assert(ntohs(message->size) == sizeof(struct GNUNET_DV_SendMessage) + address_len + message_size);
+  message_size = ntohs(message->size) - sizeof(struct GNUNET_DV_SendMessage) - address_len;
   destination = GNUNET_malloc(sizeof(struct GNUNET_PeerIdentity));
   direct = GNUNET_malloc(sizeof(struct GNUNET_PeerIdentity));
   message_buf = GNUNET_malloc(message_size);
   destination = GNUNET_malloc(sizeof(struct GNUNET_PeerIdentity));
   direct = GNUNET_malloc(sizeof(struct GNUNET_PeerIdentity));
   message_buf = GNUNET_malloc(message_size);
@@ -1220,26 +1860,97 @@ void handle_dv_send_message (void *cls,
       GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "%s: asked to send message to `%s', but address is for `%s'!", "DV SERVICE", GNUNET_i2s(&send_msg->target), (const char *)&dest_hash.encoding);
     }
 
       GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "%s: asked to send message to `%s', but address is for `%s'!", "DV SERVICE", GNUNET_i2s(&send_msg->target), (const char *)&dest_hash.encoding);
     }
 
-#if DEBUG_DV
+#if DEBUG_DV_MESSAGES
+  cbuf = (char *)message_buf;
+  offset = 0;
+  while(offset < message_size)
+    {
+      packed_message = (struct GNUNET_MessageHeader *)&cbuf[offset];
+      GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "%s: DV PLUGIN SEND uid %u type %d to %s\n", my_short_id, ntohl(send_msg->uid), ntohs(packed_message->type), GNUNET_i2s(destination));
+      offset += ntohs(packed_message->size);
+    }
+  /*GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "%s: DV PLUGIN SEND uid %u type %d to %s\n", my_short_id, ntohl(send_msg->uid), ntohs(message_buf->type), GNUNET_i2s(destination));*/
+#endif
   GNUNET_CRYPTO_hash_to_enc (&destination->hashPubKey, &dest_hash); /* GNUNET_i2s won't properly work, need to hash one ourselves */
   dest_hash.encoding[4] = '\0';
   GNUNET_CRYPTO_hash_to_enc (&destination->hashPubKey, &dest_hash); /* GNUNET_i2s won't properly work, need to hash one ourselves */
   dest_hash.encoding[4] = '\0';
-  GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "DV SEND called with message of size %d type %d, destination `%s' via `%s'\n", message_size, ntohs(message_buf->type), (const char *)&dest_hash.encoding, GNUNET_i2s(direct));
-#endif
   send_context = GNUNET_malloc(sizeof(struct DV_SendContext));
 
   send_context = GNUNET_malloc(sizeof(struct DV_SendContext));
 
-  send_context->importance = ntohs(send_msg->priority);
+  send_result_msg = GNUNET_malloc(sizeof(struct GNUNET_DV_SendResultMessage));
+  send_result_msg->header.size = htons(sizeof(struct GNUNET_DV_SendResultMessage));
+  send_result_msg->header.type = htons(GNUNET_MESSAGE_TYPE_TRANSPORT_DV_SEND_RESULT);
+  send_result_msg->uid = send_msg->uid; /* No need to ntohl->htonl this */
+
+  send_context->importance = ntohl(send_msg->priority);
   send_context->timeout = send_msg->timeout;
   send_context->direct_peer = direct;
   send_context->distant_peer = destination;
   send_context->message = message_buf;
   send_context->message_size = message_size;
   send_context->timeout = send_msg->timeout;
   send_context->direct_peer = direct;
   send_context->distant_peer = destination;
   send_context->message = message_buf;
   send_context->message_size = message_size;
+  send_context->send_result = send_result_msg;
+#if DEBUG_DV_MESSAGES
+  send_context->uid = send_msg->uid;
+#endif
 
 
-  /* In bizarro world GNUNET_SYSERR indicates that we succeeded */
-  if (GNUNET_SYSERR != GNUNET_CONTAINER_multihashmap_get_multiple(ctx.extended_neighbors, &destination->hashPubKey, &send_iterator, send_context))
+  if (send_message_via(&my_identity, direct, send_context) != GNUNET_YES)
     {
     {
-      GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "DV SEND failed to send message to destination `%s' via `%s'\n", (const char *)&dest_hash.encoding, GNUNET_i2s(direct));
+      send_result_msg->result = htons(1);
+      pending_message = GNUNET_malloc(sizeof(struct PendingMessage) + sizeof(struct GNUNET_DV_SendResultMessage));
+      pending_message->msg = (struct GNUNET_MessageHeader *)&pending_message[1];
+      memcpy(&pending_message[1], send_result_msg, sizeof(struct GNUNET_DV_SendResultMessage));
+      GNUNET_free(send_result_msg);
+
+      GNUNET_CONTAINER_DLL_insert_after(plugin_pending_head, plugin_pending_tail, plugin_pending_tail, pending_message);
+
+      if (client_handle != NULL)
+        {
+          if (plugin_transmit_handle == NULL)
+            {
+              plugin_transmit_handle = GNUNET_SERVER_notify_transmit_ready (client_handle,
+                                                                            sizeof(struct GNUNET_DV_SendResultMessage),
+                                                                            GNUNET_TIME_UNIT_FOREVER_REL,
+                                                                            &transmit_to_plugin, NULL);
+            }
+          else
+            {
+              GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Failed to queue message for plugin, must be one in progress already!!\n");
+            }
+        }
+      GNUNET_CRYPTO_hash_to_enc (&destination->hashPubKey, &dest_hash); /* GNUNET_i2s won't properly work, need to hash one ourselves */
+      dest_hash.encoding[4] = '\0';
+      GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "%s DV SEND failed to send message to destination `%s' via `%s'\n", my_short_id, (const char *)&dest_hash.encoding, GNUNET_i2s(direct));
     }
 
     }
 
+  /* In bizarro world GNUNET_SYSERR indicates that we succeeded */
+#if UNSIMPLER
+  if (GNUNET_SYSERR != GNUNET_CONTAINER_multihashmap_get_multiple(extended_neighbors, &destination->hashPubKey, &send_iterator, send_context))
+    {
+      send_result_msg->result = htons(1);
+      pending_message = GNUNET_malloc(sizeof(struct PendingMessage) + sizeof(struct GNUNET_DV_SendResultMessage));
+      pending_message->msg = (struct GNUNET_MessageHeader *)&pending_message[1];
+      memcpy(&pending_message[1], send_result_msg, sizeof(struct GNUNET_DV_SendResultMessage));
+      GNUNET_free(send_result_msg);
+
+      GNUNET_CONTAINER_DLL_insert_after(plugin_pending_head, plugin_pending_tail, plugin_pending_tail, pending_message);
+
+      if (client_handle != NULL)
+        {
+          if (plugin_transmit_handle == NULL)
+            {
+              plugin_transmit_handle = GNUNET_SERVER_notify_transmit_ready (client_handle,
+                                                                            sizeof(struct GNUNET_DV_SendResultMessage),
+                                                                            GNUNET_TIME_UNIT_FOREVER_REL,
+                                                                            &transmit_to_plugin, NULL);
+            }
+          else
+            {
+              GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Failed to queue message for plugin, must be one in progress already!!\n");
+            }
+        }
+      GNUNET_CRYPTO_hash_to_enc (&destination->hashPubKey, &dest_hash); /* GNUNET_i2s won't properly work, need to hash one ourselves */
+      dest_hash.encoding[4] = '\0';
+      GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "%s DV SEND failed to send message to destination `%s' via `%s'\n", my_short_id, (const char *)&dest_hash.encoding, GNUNET_i2s(direct));
+    }
+#endif
   GNUNET_free(message_buf);
   GNUNET_free(send_context);
   GNUNET_free(direct);
   GNUNET_free(message_buf);
   GNUNET_free(send_context);
   GNUNET_free(direct);
@@ -1248,11 +1959,18 @@ void handle_dv_send_message (void *cls,
   GNUNET_SERVER_receive_done(client, GNUNET_OK);
 }
 
   GNUNET_SERVER_receive_done(client, GNUNET_OK);
 }
 
+/** Forward declarations **/
 static int handle_dv_gossip_message (void *cls,
                                      const struct GNUNET_PeerIdentity *peer,
                                      const struct GNUNET_MessageHeader *message,
 static int handle_dv_gossip_message (void *cls,
                                      const struct GNUNET_PeerIdentity *peer,
                                      const struct GNUNET_MessageHeader *message,
-                                     struct GNUNET_TIME_Relative latency,
-                                     uint32_t distance);
+                                     const struct GNUNET_TRANSPORT_ATS_Information *atsi);
+
+static int handle_dv_disconnect_message (void *cls,
+                                         const struct GNUNET_PeerIdentity *peer,
+                                         const struct GNUNET_MessageHeader *message,
+                                        const struct GNUNET_TRANSPORT_ATS_Information *atsi);
+/** End forward declarations **/
+
 
 /**
  * List of handlers for the messages understood by this
 
 /**
  * List of handlers for the messages understood by this
@@ -1267,6 +1985,7 @@ static int handle_dv_gossip_message (void *cls,
 static struct GNUNET_CORE_MessageHandler core_handlers[] = {
   {&handle_dv_data_message, GNUNET_MESSAGE_TYPE_DV_DATA, 0},
   {&handle_dv_gossip_message, GNUNET_MESSAGE_TYPE_DV_GOSSIP, 0},
 static struct GNUNET_CORE_MessageHandler core_handlers[] = {
   {&handle_dv_data_message, GNUNET_MESSAGE_TYPE_DV_DATA, 0},
   {&handle_dv_gossip_message, GNUNET_MESSAGE_TYPE_DV_GOSSIP, 0},
+  {&handle_dv_disconnect_message, GNUNET_MESSAGE_TYPE_DV_DISCONNECT, 0},
   {NULL, 0, 0}
 };
 
   {NULL, 0, 0}
 };
 
@@ -1276,6 +1995,139 @@ static struct GNUNET_SERVER_MessageHandler plugin_handlers[] = {
   {NULL, NULL, 0, 0}
 };
 
   {NULL, NULL, 0, 0}
 };
 
+/**
+ * Free a DistantNeighbor node, including removing it
+ * from the referer's list.
+ */
+static void
+distant_neighbor_free (struct DistantNeighbor *referee)
+{
+  struct DirectNeighbor *referrer;
+
+  referrer = referee->referrer;
+  if (referrer != NULL)
+    {
+      GNUNET_CONTAINER_DLL_remove (referrer->referee_head,
+                         referrer->referee_tail, referee);
+    }
+  GNUNET_CONTAINER_heap_remove_node (referee->max_loc);
+  GNUNET_CONTAINER_heap_remove_node (referee->min_loc);
+  GNUNET_CONTAINER_multihashmap_remove_all (extended_neighbors,
+                                    &referee->identity.hashPubKey);
+  GNUNET_free_non_null (referee->pkey);
+  GNUNET_free (referee);
+}
+
+/**
+ * Free a DirectNeighbor node, including removing it
+ * from the referer's list.
+ */
+static void
+direct_neighbor_free (struct DirectNeighbor *direct)
+{
+  struct NeighborSendContext *send_context;
+  struct FastGossipNeighborList *about_list;
+  struct FastGossipNeighborList *prev_about;
+
+  send_context = direct->send_context;
+
+  if (send_context->task != GNUNET_SCHEDULER_NO_TASK)
+    GNUNET_SCHEDULER_cancel(send_context->task);
+
+  about_list = send_context->fast_gossip_list_head;
+  while (about_list != NULL)
+    {
+      GNUNET_CONTAINER_DLL_remove(send_context->fast_gossip_list_head, send_context->fast_gossip_list_tail, about_list);
+      prev_about = about_list;
+      about_list = about_list->next;
+      GNUNET_free(prev_about);
+    }
+  GNUNET_free(send_context);
+  GNUNET_free(direct);
+}
+
+/**
+ * Multihashmap iterator for sending out disconnect messages
+ * for a peer.
+ *
+ * @param cls the peer that was disconnected
+ * @param key key value stored under
+ * @param value the direct neighbor to send disconnect to
+ *
+ * @return GNUNET_YES to continue iteration, GNUNET_NO to stop
+ */
+static int schedule_disconnect_messages (void *cls,
+                                    const GNUNET_HashCode * key,
+                                    void *value)
+{
+  struct DisconnectContext *disconnect_context = cls;
+  struct DirectNeighbor *disconnected = disconnect_context->direct;
+  struct DirectNeighbor *notify = value;
+  struct PendingMessage *pending_message;
+  p2p_dv_MESSAGE_Disconnect *disconnect_message;
+
+  if (memcmp(&notify->identity, &disconnected->identity, sizeof(struct GNUNET_PeerIdentity)) == 0)
+    return GNUNET_YES; /* Don't send disconnect message to peer that disconnected! */
+
+  pending_message = GNUNET_malloc(sizeof(struct PendingMessage) + sizeof(p2p_dv_MESSAGE_Disconnect));
+  pending_message->msg = (struct GNUNET_MessageHeader *)&pending_message[1];
+  pending_message->importance = default_dv_priority;
+  pending_message->timeout = GNUNET_TIME_relative_get_forever();
+  memcpy(&pending_message->recipient, &notify->identity, sizeof(struct GNUNET_PeerIdentity));
+  pending_message->msg_size = sizeof(p2p_dv_MESSAGE_Disconnect);
+  disconnect_message = (p2p_dv_MESSAGE_Disconnect *)pending_message->msg;
+  disconnect_message->header.size = htons (sizeof (p2p_dv_MESSAGE_Disconnect));
+  disconnect_message->header.type = htons (GNUNET_MESSAGE_TYPE_DV_DISCONNECT);
+  disconnect_message->peer_id = htonl(disconnect_context->distant->our_id);
+
+  GNUNET_CONTAINER_DLL_insert_after (core_pending_head,
+                                     core_pending_tail,
+                                     core_pending_tail,
+                                     pending_message);
+
+  GNUNET_SCHEDULER_add_now(try_core_send, NULL);
+  /*if (core_transmit_handle == NULL)
+    core_transmit_handle = GNUNET_CORE_notify_transmit_ready(coreAPI, GNUNET_YES, default_dv_priority, GNUNET_TIME_relative_get_forever(), &notify->identity, sizeof(p2p_dv_MESSAGE_Disconnect), &core_transmit_notify, NULL);*/
+
+  return GNUNET_YES;
+}
+
+/**
+ * Multihashmap iterator for freeing extended neighbors.
+ *
+ * @param cls NULL
+ * @param key key value stored under
+ * @param value the distant neighbor to be freed
+ *
+ * @return GNUNET_YES to continue iteration, GNUNET_NO to stop
+ */
+static int free_extended_neighbors (void *cls,
+                                    const GNUNET_HashCode * key,
+                                    void *value)
+{
+  struct DistantNeighbor *distant = value;
+  distant_neighbor_free(distant);
+  return GNUNET_YES;
+}
+
+/**
+ * Multihashmap iterator for freeing direct neighbors.
+ *
+ * @param cls NULL
+ * @param key key value stored under
+ * @param value the direct neighbor to be freed
+ *
+ * @return GNUNET_YES to continue iteration, GNUNET_NO to stop
+ */
+static int free_direct_neighbors (void *cls,
+                                    const GNUNET_HashCode * key,
+                                    void *value)
+{
+  struct DirectNeighbor *direct = value;
+  direct_neighbor_free(direct);
+  return GNUNET_YES;
+}
+
 
 /**
  * Task run during shutdown.
 
 /**
  * Task run during shutdown.
@@ -1289,8 +2141,21 @@ shutdown_task (void *cls,
 {
 #if DEBUG_DV
   GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "calling CORE_DISCONNECT\n");
 {
 #if DEBUG_DV
   GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "calling CORE_DISCONNECT\n");
+  GNUNET_CONTAINER_multihashmap_iterate(extended_neighbors, &print_neighbors, NULL);
 #endif
 #endif
+  GNUNET_CONTAINER_multihashmap_iterate(extended_neighbors, &free_extended_neighbors, NULL);
+  GNUNET_CONTAINER_multihashmap_destroy(extended_neighbors);
+  GNUNET_CONTAINER_multihashmap_iterate(direct_neighbors, &free_direct_neighbors, NULL);
+  GNUNET_CONTAINER_multihashmap_destroy(direct_neighbors);
+
+  GNUNET_CONTAINER_heap_destroy(neighbor_max_heap);
+  GNUNET_CONTAINER_heap_destroy(neighbor_min_heap);
+
   GNUNET_CORE_disconnect (coreAPI);
   GNUNET_CORE_disconnect (coreAPI);
+  coreAPI = NULL;
+  GNUNET_PEERINFO_disconnect(peerinfo_handle);
+  GNUNET_SERVER_mst_destroy(coreMST);
+  GNUNET_free_non_null(my_short_id);
 #if DEBUG_DV
   GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "CORE_DISCONNECT completed\n");
 #endif
 #if DEBUG_DV
   GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "CORE_DISCONNECT completed\n");
 #endif
@@ -1307,8 +2172,8 @@ void core_init (void *cls,
 
   if (server == NULL)
     {
 
   if (server == NULL)
     {
-      GNUNET_SCHEDULER_cancel(sched, cleanup_task);
-      GNUNET_SCHEDULER_add_now(sched, &shutdown_task, NULL);
+      GNUNET_SCHEDULER_cancel(cleanup_task);
+      GNUNET_SCHEDULER_add_now(&shutdown_task, NULL);
       return;
     }
 #if DEBUG_DV
       return;
     }
 #if DEBUG_DV
@@ -1316,9 +2181,12 @@ void core_init (void *cls,
               "%s: Core connection initialized, I am peer: %s\n", "dv", GNUNET_i2s(identity));
 #endif
   memcpy(&my_identity, identity, sizeof(struct GNUNET_PeerIdentity));
               "%s: Core connection initialized, I am peer: %s\n", "dv", GNUNET_i2s(identity));
 #endif
   memcpy(&my_identity, identity, sizeof(struct GNUNET_PeerIdentity));
+  my_short_id = GNUNET_strdup(GNUNET_i2s(&my_identity));
   coreAPI = server;
 }
 
   coreAPI = server;
 }
 
+
+#if PKEY_NO_NEIGHBOR_ON_ADD
 /**
  * Iterator over hash map entries.
  *
 /**
  * Iterator over hash map entries.
  *
@@ -1331,10 +2199,10 @@ void core_init (void *cls,
  */
 static int add_pkey_to_extended (void *cls,
                                  const GNUNET_HashCode * key,
  */
 static int add_pkey_to_extended (void *cls,
                                  const GNUNET_HashCode * key,
-                                 void *value)
+                                 void *abs_value)
 {
   struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded *pkey = cls;
 {
   struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded *pkey = cls;
-  struct DistantNeighbor *distant_neighbor = value;
+  struct DistantNeighbor *distant_neighbor = abs_value;
 
   if (distant_neighbor->pkey == NULL)
   {
 
   if (distant_neighbor->pkey == NULL)
   {
@@ -1344,6 +2212,7 @@ static int add_pkey_to_extended (void *cls,
 
   return GNUNET_YES;
 }
 
   return GNUNET_YES;
 }
+#endif
 
 /**
  * Iterator over hash map entries.
 
 /**
  * Iterator over hash map entries.
@@ -1365,12 +2234,13 @@ static int update_matching_neighbors (void *cls,
   if (update_info->referrer == distant_neighbor->referrer) /* Direct neighbor matches, update it's info and return GNUNET_NO */
   {
     /* same referrer, cost change! */
   if (update_info->referrer == distant_neighbor->referrer) /* Direct neighbor matches, update it's info and return GNUNET_NO */
   {
     /* same referrer, cost change! */
-    GNUNET_CONTAINER_heap_update_cost (ctx.neighbor_max_heap,
+    GNUNET_CONTAINER_heap_update_cost (neighbor_max_heap,
                                        update_info->neighbor->max_loc, update_info->cost);
                                        update_info->neighbor->max_loc, update_info->cost);
-    GNUNET_CONTAINER_heap_update_cost (ctx.neighbor_min_heap,
+    GNUNET_CONTAINER_heap_update_cost (neighbor_min_heap,
                                        update_info->neighbor->min_loc, update_info->cost);
     update_info->neighbor->last_activity = update_info->now;
     update_info->neighbor->cost = update_info->cost;
                                        update_info->neighbor->min_loc, update_info->cost);
     update_info->neighbor->last_activity = update_info->now;
     update_info->neighbor->cost = update_info->cost;
+    update_info->neighbor->referrer_id = update_info->referrer_peer_id;
     return GNUNET_NO;
   }
 
     return GNUNET_NO;
   }
 
@@ -1379,56 +2249,111 @@ static int update_matching_neighbors (void *cls,
 
 
 /**
 
 
 /**
- * Free a DistantNeighbor node, including removing it
- * from the referer's list.
+ * Iterate over all current direct peers, add DISTANT newly connected
+ * peer to the fast gossip list for that peer so we get DV routing
+ * information out as fast as possible!
+ *
+ * @param cls the newly connected neighbor we will gossip about
+ * @param key the hashcode of the peer
+ * @param value the direct neighbor we should gossip to
+ *
+ * @return GNUNET_YES to continue iteration, GNUNET_NO otherwise
  */
  */
-static void
-distant_neighbor_free (struct DistantNeighbor *referee)
+static int add_distant_all_direct_neighbors (void *cls,
+                                     const GNUNET_HashCode * key,
+                                     void *value)
 {
 {
-  struct DirectNeighbor *referrer;
+  struct DirectNeighbor *direct = (struct DirectNeighbor *)value;
+  struct DistantNeighbor *distant = (struct DistantNeighbor *)cls;
+  struct NeighborSendContext *send_context = direct->send_context;
+  struct FastGossipNeighborList *gossip_entry;
+#if DEBUG_DV
+  char *encPeerAbout;
+  char *encPeerTo;
+#endif
 
 
-  referrer = referee->referrer;
-  if (referrer != NULL)
+  if (distant == NULL)
     {
     {
-      GNUNET_CONTAINER_DLL_remove (referrer->referee_head,
-                         referrer->referee_tail, referee);
+      return GNUNET_YES;
     }
     }
-  GNUNET_CONTAINER_heap_remove_node (ctx.neighbor_max_heap, referee->max_loc);
-  GNUNET_CONTAINER_heap_remove_node (ctx.neighbor_min_heap, referee->min_loc);
-  GNUNET_CONTAINER_multihashmap_remove_all (ctx.extended_neighbors,
-                                    &referee->identity.hashPubKey);
-  GNUNET_free (referee);
-}
 
 
+  if (memcmp(&direct->identity, &distant->identity, sizeof(struct GNUNET_PeerIdentity)) == 0)
+    {
+      return GNUNET_YES; /* Don't gossip to a peer about itself! */
+    }
+
+#if SUPPORT_HIDING
+  if (distant->hidden == GNUNET_YES)
+    return GNUNET_YES; /* This peer should not be gossipped about (hidden) */
+#endif
+  gossip_entry = GNUNET_malloc(sizeof(struct FastGossipNeighborList));
+  gossip_entry->about = distant;
+
+  GNUNET_CONTAINER_DLL_insert_after(send_context->fast_gossip_list_head,
+                                    send_context->fast_gossip_list_tail,
+                                    send_context->fast_gossip_list_tail,
+                                    gossip_entry);
+#if DEBUG_DV
+  encPeerAbout = GNUNET_strdup(GNUNET_i2s(&distant->identity));
+  encPeerTo = GNUNET_strdup(GNUNET_i2s(&direct->identity));
+
+  GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "%s: Fast send info about peer %s id %u for directly connected peer %s\n",
+             GNUNET_i2s(&my_identity),
+             encPeerAbout, distant->our_id, encPeerTo);
+  GNUNET_free(encPeerAbout);
+  GNUNET_free(encPeerTo);
+#endif
+  /*if (send_context->task != GNUNET_SCHEDULER_NO_TASK)
+    GNUNET_SCHEDULER_cancel(send_context->task);*/
+
+  send_context->task = GNUNET_SCHEDULER_add_now(&neighbor_send_task, send_context);
+  return GNUNET_YES;
+}
 
 
-#if DEBUG_DV_GOSSIP
 /**
 /**
- * Iterator over hash map entries.
+ * Callback for hello address creation.
  *
  *
- * @param cls closure (NULL)
- * @param key current key code
- * @param value value in the hash map (DistantNeighbor)
- * @return GNUNET_YES if we should continue to
- *         iterate,
- *         GNUNET_NO if not.
+ * @param cls closure, a struct HelloContext
+ * @param max maximum number of bytes that can be written to buf
+ * @param buf where to write the address information
+ *
+ * @return number of bytes written, 0 to signal the
+ *         end of the iteration.
  */
  */
-int print_neighbors (void *cls,
-                     const GNUNET_HashCode * key,
-                     void *value)
+static size_t
+generate_hello_address (void *cls, size_t max, void *buf)
 {
 {
-  struct DistantNeighbor *distant_neighbor = value;
-  char my_shortname[5];
-  char referrer_shortname[5];
-  memcpy(&my_shortname, GNUNET_i2s(&my_identity), 4);
-  my_shortname[4] = '\0';
-  memcpy(&referrer_shortname, GNUNET_i2s(&distant_neighbor->referrer->identity), 4);
-  referrer_shortname[4] = '\0';
+  struct HelloContext *hello_context = cls;
+  char *addr_buffer;
+  size_t offset;
+  size_t size;
+  size_t ret;
 
 
-  GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "`%s' %s: Peer `%s', distance %d, referrer `%s'\n", &my_shortname, "DV", GNUNET_i2s(&distant_neighbor->identity), distant_neighbor->cost, &referrer_shortname);
-  return GNUNET_YES;
+  if (hello_context->addresses_to_add == 0)
+    return 0;
+
+  /* Hello "address" will be concatenation of distant peer and direct peer identities */
+  size = 2 * sizeof(struct GNUNET_PeerIdentity);
+  GNUNET_assert(max >= size);
+
+  addr_buffer = GNUNET_malloc(size);
+  offset = 0;
+  /* Copy the distant peer identity to buffer */
+  memcpy(addr_buffer, &hello_context->distant_peer, sizeof(struct GNUNET_PeerIdentity));
+  offset += sizeof(struct GNUNET_PeerIdentity);
+  /* Copy the direct peer identity to buffer */
+  memcpy(&addr_buffer[offset], hello_context->direct_peer, sizeof(struct GNUNET_PeerIdentity));
+  ret = GNUNET_HELLO_add_address ("dv",
+                                  GNUNET_TIME_relative_to_absolute
+                                  (GNUNET_TIME_UNIT_HOURS), addr_buffer, size,
+                                  buf, max);
+
+  hello_context->addresses_to_add--;
+
+  GNUNET_free(addr_buffer);
+  return ret;
 }
 
 }
 
-#endif
 
 /**
  * Handles when a peer is either added due to being newly connected
 
 /**
  * Handles when a peer is either added due to being newly connected
@@ -1440,9 +2365,13 @@ int print_neighbors (void *cls,
  * @param referrer_peer_id id to use when sending to 'peer'
  * @param referrer if this is a gossiped peer, who did we hear it from?
  * @param cost the cost of communicating with this peer via 'referrer'
  * @param referrer_peer_id id to use when sending to 'peer'
  * @param referrer if this is a gossiped peer, who did we hear it from?
  * @param cost the cost of communicating with this peer via 'referrer'
+ *
+ * @return the added neighbor, the updated neighbor or NULL (neighbor
+ *         not added)
  */
  */
-static void
-addUpdateNeighbor (const struct GNUNET_PeerIdentity * peer, struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded *pkey,
+static struct DistantNeighbor *
+addUpdateNeighbor (const struct GNUNET_PeerIdentity * peer,
+                  struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded *pkey,
                    unsigned int referrer_peer_id,
                    struct DirectNeighbor *referrer, unsigned int cost)
 {
                    unsigned int referrer_peer_id,
                    struct DirectNeighbor *referrer, unsigned int cost)
 {
@@ -1450,45 +2379,92 @@ addUpdateNeighbor (const struct GNUNET_PeerIdentity * peer, struct GNUNET_CRYPTO
   struct DistantNeighbor *max;
   struct GNUNET_TIME_Absolute now;
   struct NeighborUpdateInfo *neighbor_update;
   struct DistantNeighbor *max;
   struct GNUNET_TIME_Absolute now;
   struct NeighborUpdateInfo *neighbor_update;
+  struct HelloContext *hello_context;
+  struct GNUNET_HELLO_Message *hello_msg;
   unsigned int our_id;
   unsigned int our_id;
+  char *addr1;
+  char *addr2;
+  int i;
 
 
-  now = GNUNET_TIME_absolute_get ();
-  our_id = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, RAND_MAX - 1) + 1;
+#if DEBUG_DV_PEER_NUMBERS
+  char *encAbout;
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "%s Received sender id (%u)!\n", "DV SERVICE", referrer_peer_id);
+#endif
 
 
-  neighbor = GNUNET_CONTAINER_multihashmap_get (ctx.extended_neighbors,
+  now = GNUNET_TIME_absolute_get ();
+  neighbor = GNUNET_CONTAINER_multihashmap_get (extended_neighbors,
                                                 &peer->hashPubKey);
   neighbor_update = GNUNET_malloc(sizeof(struct NeighborUpdateInfo));
   neighbor_update->neighbor = neighbor;
   neighbor_update->cost = cost;
   neighbor_update->now = now;
   neighbor_update->referrer = referrer;
                                                 &peer->hashPubKey);
   neighbor_update = GNUNET_malloc(sizeof(struct NeighborUpdateInfo));
   neighbor_update->neighbor = neighbor;
   neighbor_update->cost = cost;
   neighbor_update->now = now;
   neighbor_update->referrer = referrer;
+  neighbor_update->referrer_peer_id = referrer_peer_id;
+
+  if (neighbor != NULL)
+    {
+#if USE_PEER_ID
+      memcpy(&our_id, &neighbor->identity, sizeof(unsigned int));
+#else
+      our_id = neighbor->our_id;
+#endif
+    }
+  else
+    {
+#if USE_PEER_ID
+      memcpy(&our_id, peer, sizeof(unsigned int));
+#else
+      our_id = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_STRONG, RAND_MAX - 1) + 1;
+#endif
+    }
 
   /* Either we do not know this peer, or we already do but via a different immediate peer */
   if ((neighbor == NULL) ||
 
   /* Either we do not know this peer, or we already do but via a different immediate peer */
   if ((neighbor == NULL) ||
-      (GNUNET_CONTAINER_multihashmap_get_multiple(ctx.extended_neighbors,
+      (GNUNET_CONTAINER_multihashmap_get_multiple(extended_neighbors,
                                                   &peer->hashPubKey,
                                                   &update_matching_neighbors,
                                                   neighbor_update) != GNUNET_SYSERR))
     {
                                                   &peer->hashPubKey,
                                                   &update_matching_neighbors,
                                                   neighbor_update) != GNUNET_SYSERR))
     {
+
+#if AT_MOST_ONE
+    if ((neighbor != NULL) && (cost < neighbor->cost)) /* New cost is less than old, remove old */
+      {
+        distant_neighbor_free(neighbor);
+      }
+    else if (neighbor != NULL) /* Only allow one DV connection to each peer */
+      {
+        return NULL;
+      }
+#endif
       /* new neighbor! */
       /* new neighbor! */
-      if (cost > ctx.fisheye_depth)
+      if (cost > fisheye_depth)
         {
           /* too costly */
           GNUNET_free(neighbor_update);
         {
           /* too costly */
           GNUNET_free(neighbor_update);
-          return;
+          return NULL;
         }
         }
-      if (ctx.max_table_size <=
-          GNUNET_CONTAINER_multihashmap_size (ctx.extended_neighbors))
+
+#if DEBUG_DV_PEER_NUMBERS
+      encAbout = GNUNET_strdup(GNUNET_i2s(peer));
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                  "%s: %s Chose NEW id (%u) for peer %s!\n", GNUNET_i2s(&my_identity), "DV SERVICE", our_id, encAbout);
+      GNUNET_free(encAbout);
+#endif
+
+      if (max_table_size <=
+          GNUNET_CONTAINER_multihashmap_size (extended_neighbors))
         {
           /* remove most expensive entry */
         {
           /* remove most expensive entry */
-          max = GNUNET_CONTAINER_heap_peek (ctx.neighbor_max_heap);
+          max = GNUNET_CONTAINER_heap_peek (neighbor_max_heap);
+          GNUNET_assert(max != NULL);
           if (cost > max->cost)
             {
               /* new entry most expensive, don't create */
               GNUNET_free(neighbor_update);
           if (cost > max->cost)
             {
               /* new entry most expensive, don't create */
               GNUNET_free(neighbor_update);
-              return;
+              return NULL;
             }
             }
-          if (max->cost > 0)
+          if (max->cost > 1)
             {
               /* only free if this is not a direct connection;
                  we could theoretically have more direct
             {
               /* only free if this is not a direct connection;
                  we could theoretically have more direct
@@ -1500,9 +2476,9 @@ addUpdateNeighbor (const struct GNUNET_PeerIdentity * peer, struct GNUNET_CRYPTO
       neighbor = GNUNET_malloc (sizeof (struct DistantNeighbor));
       GNUNET_CONTAINER_DLL_insert (referrer->referee_head,
                          referrer->referee_tail, neighbor);
       neighbor = GNUNET_malloc (sizeof (struct DistantNeighbor));
       GNUNET_CONTAINER_DLL_insert (referrer->referee_head,
                          referrer->referee_tail, neighbor);
-      neighbor->max_loc = GNUNET_CONTAINER_heap_insert (ctx.neighbor_max_heap,
+      neighbor->max_loc = GNUNET_CONTAINER_heap_insert (neighbor_max_heap,
                                                         neighbor, cost);
                                                         neighbor, cost);
-      neighbor->min_loc = GNUNET_CONTAINER_heap_insert (ctx.neighbor_min_heap,
+      neighbor->min_loc = GNUNET_CONTAINER_heap_insert (neighbor_min_heap,
                                                         neighbor, cost);
       neighbor->referrer = referrer;
       memcpy (&neighbor->identity, peer, sizeof (struct GNUNET_PeerIdentity));
                                                         neighbor, cost);
       neighbor->referrer = referrer;
       memcpy (&neighbor->identity, peer, sizeof (struct GNUNET_PeerIdentity));
@@ -1519,11 +2495,58 @@ addUpdateNeighbor (const struct GNUNET_PeerIdentity * peer, struct GNUNET_CRYPTO
       neighbor->referrer_id = referrer_peer_id;
       neighbor->our_id = our_id;
       neighbor->hidden =
       neighbor->referrer_id = referrer_peer_id;
       neighbor->our_id = our_id;
       neighbor->hidden =
-        (cost == 0) ? (GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 4) ==
+        (cost == DIRECT_NEIGHBOR_COST) ? (GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 4) ==
                        0) : GNUNET_NO;
                        0) : GNUNET_NO;
-      GNUNET_CONTAINER_multihashmap_put (ctx.extended_neighbors, &peer->hashPubKey,
+
+      GNUNET_CONTAINER_multihashmap_put (extended_neighbors, &peer->hashPubKey,
                                  neighbor,
                                  GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
                                  neighbor,
                                  GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
+      if (referrer_peer_id != 0)
+        {
+          for (i = 0; i< MAX_OUTSTANDING_MESSAGES; i++)
+            {
+              if (referrer->pending_messages[i].sender_id == referrer_peer_id) /* We have a queued message from just learned about peer! */
+                {
+#if DEBUG_DV_MESSAGES
+                  GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "%s: learned about peer %llu from which we have a previous unknown message, processing!\n", my_short_id, referrer_peer_id);
+#endif
+                  struct GNUNET_TRANSPORT_ATS_Information atsi[3];
+                  atsi[0].type = htonl (GNUNET_TRANSPORT_ATS_QUALITY_NET_DISTANCE);
+                  atsi[0].value = htonl (referrer->pending_messages[i].distance);
+                  atsi[1].type = htonl (GNUNET_TRANSPORT_ATS_QUALITY_NET_DELAY);
+                  atsi[1].value = htonl ((uint32_t)referrer->pending_messages[i].latency.rel_value);
+                  atsi[2].type = htonl (GNUNET_TRANSPORT_ATS_ARRAY_TERMINATOR);
+                  atsi[2].value = htonl (0);
+                  handle_dv_data_message(NULL,
+                                         &referrer->pending_messages[i].sender,
+                                        referrer->pending_messages[i].message, 
+                                        (const struct GNUNET_TRANSPORT_ATS_Information *)&atsi);
+                  GNUNET_free(referrer->pending_messages[i].message);
+                  referrer->pending_messages[i].sender_id = 0;
+                }
+            }
+        }
+      if ((cost != DIRECT_NEIGHBOR_COST) && (neighbor->pkey != NULL))
+        {
+          /* Added neighbor, now send HELLO to transport */
+          hello_context = GNUNET_malloc(sizeof(struct HelloContext));
+          hello_context->direct_peer = &referrer->identity;
+          memcpy(&hello_context->distant_peer, peer, sizeof(struct GNUNET_PeerIdentity));
+          hello_context->addresses_to_add = 1;
+          hello_msg = GNUNET_HELLO_create(pkey, &generate_hello_address, hello_context);
+          GNUNET_assert(memcmp(hello_context->direct_peer, &hello_context->distant_peer, sizeof(struct GNUNET_PeerIdentity)) != 0);
+          addr1 = GNUNET_strdup(GNUNET_i2s(hello_context->direct_peer));
+          addr2 = GNUNET_strdup(GNUNET_i2s(&hello_context->distant_peer));
+#if DEBUG_DV
+          GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "%s: GIVING HELLO size %d for %s via %s to TRANSPORT\n", my_short_id, GNUNET_HELLO_size(hello_msg), addr2, addr1);
+#endif
+          GNUNET_free(addr1);
+          GNUNET_free(addr2);
+          send_to_plugin(hello_context->direct_peer, GNUNET_HELLO_get_header(hello_msg), GNUNET_HELLO_size(hello_msg), &hello_context->distant_peer, cost);
+          GNUNET_free(hello_context);
+          GNUNET_free(hello_msg);
+        }
+
     }
   else
     {
     }
   else
     {
@@ -1532,66 +2555,60 @@ addUpdateNeighbor (const struct GNUNET_PeerIdentity * peer, struct GNUNET_CRYPTO
                   "%s: Already know peer %s distance %d, referrer id %d!\n", "dv", GNUNET_i2s(peer), cost, referrer_peer_id);
 #endif
     }
                   "%s: Already know peer %s distance %d, referrer id %d!\n", "dv", GNUNET_i2s(peer), cost, referrer_peer_id);
 #endif
     }
-#if DEBUG_DV_GOSSIP
+#if DEBUG_DV
     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                "%s: Size of extended_neighbors is %d\n", "dv", GNUNET_CONTAINER_multihashmap_size(ctx.extended_neighbors));
-    GNUNET_CONTAINER_multihashmap_iterate(ctx.extended_neighbors, &print_neighbors, NULL);
+                "%s: Size of extended_neighbors is %d\n", "dv", GNUNET_CONTAINER_multihashmap_size(extended_neighbors));
 #endif
 #endif
+
   GNUNET_free(neighbor_update);
   GNUNET_free(neighbor_update);
-  /* Old logic to remove entry and replace, not needed now as we only want to remove when full
-   * or when the referring peer disconnects from us.
-   *
-   * FIXME: add new functionality, or check if it already exists (i forget)
-   */
-  /*
-  GNUNET_DLL_remove (neighbor->referrer->referee_head,
-                     neighbor->referrer->referee_tail, neighbor);
-  neighbor->referrer = referrer;
-  GNUNET_DLL_insert (referrer->referee_head,
-                     referrer->referee_tail, neighbor);
-  GNUNET_CONTAINER_heap_update_cost (ctx.neighbor_max_heap,
-                                     neighbor->max_loc, cost);
-  GNUNET_CONTAINER_heap_update_cost (ctx.neighbor_min_heap,
-                                     neighbor->min_loc, cost);
-  neighbor->referrer_id = referrer_peer_id;
-  neighbor->last_activity = now;
-  neighbor->cost = cost;
-  */
+  return neighbor;
 }
 
 
 }
 
 
-static size_t
-generate_hello_address (void *cls, size_t max, void *buf)
+/**
+ * Core handler for dv disconnect messages.  These will be used
+ * by us to tell transport via the dv plugin that a peer can
+ * no longer be contacted by us via a certain address.  We should
+ * then propagate these messages on, given that the distance to
+ * the peer indicates we would have gossiped about it to others.
+ *
+ * @param cls closure
+ * @param peer peer which sent the message (immediate sender)
+ * @param message the message
+ * @param atsi performance data
+ */
+static int handle_dv_disconnect_message (void *cls,
+                                         const struct GNUNET_PeerIdentity *peer,
+                                         const struct GNUNET_MessageHeader *message,
+                                        const struct GNUNET_TRANSPORT_ATS_Information *atsi)
 {
 {
-  struct HelloContext *hello_context = cls;
-  char *addr_buffer;
-  size_t offset;
-  size_t size;
-  size_t ret;
-
-  if (hello_context->addresses_to_add == 0)
-    return 0;
+  struct DirectNeighbor *referrer;
+  struct DistantNeighbor *distant;
+  p2p_dv_MESSAGE_Disconnect *enc_message = (p2p_dv_MESSAGE_Disconnect *)message;
 
 
-  /* Hello "address" will be concatenation of distant peer and direct peer identities */
-  size = 2 * sizeof(struct GNUNET_PeerIdentity);
-  GNUNET_assert(max >= size);
+  if (ntohs (message->size) < sizeof (p2p_dv_MESSAGE_Disconnect))
+    {
+      return GNUNET_SYSERR;     /* invalid message */
+    }
 
 
-  addr_buffer = GNUNET_malloc(size);
-  offset = 0;
-  /* Copy the distant peer identity to buffer */
-  memcpy(addr_buffer, &hello_context->distant_peer, sizeof(struct GNUNET_PeerIdentity));
-  offset += sizeof(struct GNUNET_PeerIdentity);
-  /* Copy the direct peer identity to buffer */
-  memcpy(&addr_buffer[offset], hello_context->direct_peer, sizeof(struct GNUNET_PeerIdentity));
-  ret = GNUNET_HELLO_add_address ("dv",
-                                  GNUNET_TIME_relative_to_absolute
-                                  (GNUNET_TIME_UNIT_HOURS), addr_buffer, size,
-                                  buf, max);
+  referrer = GNUNET_CONTAINER_multihashmap_get (direct_neighbors,
+                                                &peer->hashPubKey);
+  if (referrer == NULL)
+    return GNUNET_OK;
 
 
-  hello_context->addresses_to_add--;
+  distant = referrer->referee_head;
+  while (distant != NULL)
+    {
+      if (distant->referrer_id == ntohl(enc_message->peer_id))
+        {
+          distant_neighbor_free(distant);
+          distant = referrer->referee_head;
+        }
+      else
+        distant = distant->next;
+    }
 
 
-  GNUNET_free(addr_buffer);
-  return ret;
+  return GNUNET_OK;
 }
 
 
 }
 
 
@@ -1605,17 +2622,14 @@ generate_hello_address (void *cls, size_t max, void *buf)
  * @param cls closure
  * @param peer peer which sent the message (immediate sender)
  * @param message the message
  * @param cls closure
  * @param peer peer which sent the message (immediate sender)
  * @param message the message
- * @param latency the latency of the connection we received the message from
- * @param distance the distance to the immediate peer
+ * @param atsi performance data
  */
  */
-static int handle_dv_gossip_message (void *cls,
-                                     const struct GNUNET_PeerIdentity *peer,
-                                     const struct GNUNET_MessageHeader *message,
-                                     struct GNUNET_TIME_Relative latency,
-                                     uint32_t distance)
+static int 
+handle_dv_gossip_message (void *cls,
+                         const struct GNUNET_PeerIdentity *peer,
+                         const struct GNUNET_MessageHeader *message,
+                         const struct GNUNET_TRANSPORT_ATS_Information *atsi)
 {
 {
-  struct HelloContext *hello_context;
-  struct GNUNET_HELLO_Message *hello_msg;
   struct DirectNeighbor *referrer;
   p2p_dv_MESSAGE_NeighborInfo *enc_message = (p2p_dv_MESSAGE_NeighborInfo *)message;
 
   struct DirectNeighbor *referrer;
   p2p_dv_MESSAGE_NeighborInfo *enc_message = (p2p_dv_MESSAGE_NeighborInfo *)message;
 
@@ -1624,19 +2638,19 @@ static int handle_dv_gossip_message (void *cls,
       return GNUNET_SYSERR;     /* invalid message */
     }
 
       return GNUNET_SYSERR;     /* invalid message */
     }
 
-#if DEBUG_DV_GOSSIP
+#if DEBUG_DV_GOSSIP_RECEIPT
   char * encPeerAbout;
   char * encPeerFrom;
 
   encPeerAbout = GNUNET_strdup(GNUNET_i2s(&enc_message->neighbor));
   encPeerFrom = GNUNET_strdup(GNUNET_i2s(peer));
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
   char * encPeerAbout;
   char * encPeerFrom;
 
   encPeerAbout = GNUNET_strdup(GNUNET_i2s(&enc_message->neighbor));
   encPeerFrom = GNUNET_strdup(GNUNET_i2s(peer));
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "%s: Receives %s message from peer %s about peer %s!\n", "dv", "DV GOSSIP", encPeerFrom, encPeerAbout);
+              "%s: Received %s message from peer %s about peer %s id %u distance %d!\n", GNUNET_i2s(&my_identity), "DV GOSSIP", encPeerFrom, encPeerAbout, ntohl(enc_message->neighbor_id), ntohl (enc_message->cost) + 1);
   GNUNET_free(encPeerAbout);
   GNUNET_free(encPeerFrom);
 #endif
 
   GNUNET_free(encPeerAbout);
   GNUNET_free(encPeerFrom);
 #endif
 
-  referrer = GNUNET_CONTAINER_multihashmap_get (ctx.direct_neighbors,
+  referrer = GNUNET_CONTAINER_multihashmap_get (direct_neighbors,
                                                 &peer->hashPubKey);
   if (referrer == NULL)
     return GNUNET_OK;
                                                 &peer->hashPubKey);
   if (referrer == NULL)
     return GNUNET_OK;
@@ -1645,34 +2659,198 @@ static int handle_dv_gossip_message (void *cls,
                      ntohl (enc_message->neighbor_id),
                      referrer, ntohl (enc_message->cost) + 1);
 
                      ntohl (enc_message->neighbor_id),
                      referrer, ntohl (enc_message->cost) + 1);
 
-  hello_context = GNUNET_malloc(sizeof(struct HelloContext));
-  hello_context->direct_peer = peer;
-  memcpy(&hello_context->distant_peer, &enc_message->neighbor, sizeof(struct GNUNET_PeerIdentity));
-  hello_context->addresses_to_add = 1;
-  hello_msg = GNUNET_HELLO_create(&enc_message->pkey, &generate_hello_address, hello_context);
-#if DEBUG_DV_GOSSIP
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "%s: Sending %s message to plugin, type is %d, size %d!\n", "dv", "HELLO", ntohs(hello_hdr->type), ntohs(hello_hdr->size));
+  return GNUNET_OK;
+}
+
+
+/**
+ * Iterate over all currently known peers, add them to the
+ * fast gossip list for this peer so we get DV routing information
+ * out as fast as possible!
+ *
+ * @param cls the direct neighbor we will gossip to
+ * @param key the hashcode of the peer
+ * @param value the distant neighbor we should add to the list
+ *
+ * @return GNUNET_YES to continue iteration, GNUNET_NO otherwise
+ */
+static int 
+add_all_extended_peers (void *cls,
+                       const GNUNET_HashCode * key,
+                       void *value)
+{
+  struct NeighborSendContext *send_context = (struct NeighborSendContext *)cls;
+  struct DistantNeighbor *distant = (struct DistantNeighbor *)value;
+  struct FastGossipNeighborList *gossip_entry;
+
+  if (memcmp(&send_context->toNeighbor->identity, &distant->identity, sizeof(struct GNUNET_PeerIdentity)) == 0)
+    return GNUNET_YES; /* Don't gossip to a peer about itself! */
+
+#if SUPPORT_HIDING
+  if (distant->hidden == GNUNET_YES)
+    return GNUNET_YES; /* This peer should not be gossipped about (hidden) */
 #endif
 #endif
+  gossip_entry = GNUNET_malloc(sizeof(struct FastGossipNeighborList));
+  gossip_entry->about = distant;
 
 
-  send_to_plugin(hello_context->direct_peer, GNUNET_HELLO_get_header(hello_msg), GNUNET_HELLO_size(hello_msg), &hello_context->distant_peer, ntohl(enc_message->cost) + 1);
-  GNUNET_free(hello_context);
-  GNUNET_free(hello_msg);
-  return GNUNET_OK;
+  GNUNET_CONTAINER_DLL_insert_after(send_context->fast_gossip_list_head,
+                                    send_context->fast_gossip_list_tail,
+                                    send_context->fast_gossip_list_tail,
+                                    gossip_entry);
+
+  return GNUNET_YES;
 }
 
 }
 
+#if INSANE_GOSSIP
+/**
+ * Iterator over hash map entries.
+ *
+ * @param cls closure
+ * @param key current key code
+ * @param value value in the hash map
+ * @return GNUNET_YES if we should continue to
+ *         iterate,
+ *         GNUNET_NO if not.
+ */
+static int 
+gossip_all_to_all_iterator (void *cls,
+                           const GNUNET_HashCode * key,
+                           void *abs_value)
+{
+  struct DirectNeighbor *direct = abs_value;
+
+  GNUNET_CONTAINER_multihashmap_iterate (extended_neighbors, &add_all_extended_peers, direct->send_context);
+
+  if (direct->send_context->task != GNUNET_SCHEDULER_NO_TASK)
+    GNUNET_SCHEDULER_cancel(direct->send_context->task);
+
+  direct->send_context->task = GNUNET_SCHEDULER_add_now(&neighbor_send_task, direct->send_context);
+  return GNUNET_YES;
+}
+
+/**
+ * Task run during shutdown.
+ *
+ * @param cls unused
+ * @param tc unused
+ */
+static void
+gossip_all_to_all (void *cls,
+                   const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+  GNUNET_CONTAINER_multihashmap_iterate (direct_neighbors, &gossip_all_to_all_iterator, NULL);
+
+  GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 5),
+                                &gossip_all_to_all,
+                                NULL);
+
+}
+#endif
+/**
+ * Iterate over all current direct peers, add newly connected peer
+ * to the fast gossip list for that peer so we get DV routing
+ * information out as fast as possible!
+ *
+ * @param cls the newly connected neighbor we will gossip about
+ * @param key the hashcode of the peer
+ * @param value the direct neighbor we should gossip to
+ *
+ * @return GNUNET_YES to continue iteration, GNUNET_NO otherwise
+ */
+static int 
+add_all_direct_neighbors (void *cls,
+                         const GNUNET_HashCode * key,
+                         void *value)
+{
+  struct DirectNeighbor *direct = (struct DirectNeighbor *)value;
+  struct DirectNeighbor *to = (struct DirectNeighbor *)cls;
+  struct DistantNeighbor *distant;
+  struct NeighborSendContext *send_context = direct->send_context;
+  struct FastGossipNeighborList *gossip_entry;
+  char *direct_id;
+
+
+  distant = GNUNET_CONTAINER_multihashmap_get(extended_neighbors, &to->identity.hashPubKey);
+  if (distant == NULL)
+    {
+      return GNUNET_YES;
+    }
+
+  if (memcmp(&direct->identity, &to->identity, sizeof(struct GNUNET_PeerIdentity)) == 0)
+    {
+      return GNUNET_YES; /* Don't gossip to a peer about itself! */
+    }
+
+#if SUPPORT_HIDING
+  if (distant->hidden == GNUNET_YES)
+    return GNUNET_YES; /* This peer should not be gossipped about (hidden) */
+#endif
+  direct_id = GNUNET_strdup(GNUNET_i2s(&direct->identity));
+#if DEBUG_DV_GOSSIP
+  GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "%s: adding peer %s to fast send list for %s\n", my_short_id, GNUNET_i2s(&distant->identity), direct_id);
+#endif
+  GNUNET_free(direct_id);
+  gossip_entry = GNUNET_malloc(sizeof(struct FastGossipNeighborList));
+  gossip_entry->about = distant;
+
+  GNUNET_CONTAINER_DLL_insert_after(send_context->fast_gossip_list_head,
+                                    send_context->fast_gossip_list_tail,
+                                    send_context->fast_gossip_list_tail,
+                                    gossip_entry);
+  if (send_context->task != GNUNET_SCHEDULER_NO_TASK)
+    GNUNET_SCHEDULER_cancel(send_context->task);
+
+  send_context->task = GNUNET_SCHEDULER_add_now(&neighbor_send_task, send_context);
+  //tc.reason = GNUNET_SCHEDULER_REASON_TIMEOUT;
+  //neighbor_send_task(send_context, &tc);
+  return GNUNET_YES;
+}
+
+/**
+ * Type of an iterator over the hosts.  Note that each
+ * host will be called with each available protocol.
+ *
+ * @param cls closure
+ * @param peer id of the peer, NULL for last call
+ * @param hello hello message for the peer (can be NULL)
+ * @param err_msg NULL if successful, otherwise contains error message
+ */
 static void
 process_peerinfo (void *cls,
 static void
 process_peerinfo (void *cls,
-         const struct GNUNET_PeerIdentity *peer,
-         const struct GNUNET_HELLO_Message *hello, uint32_t trust)
+                  const struct GNUNET_PeerIdentity *peer,
+                  const struct GNUNET_HELLO_Message *hello,
+                  const char *err_msg)
 {
   struct PeerIteratorContext *peerinfo_iterator = cls;
   struct DirectNeighbor *neighbor = peerinfo_iterator->neighbor;
 {
   struct PeerIteratorContext *peerinfo_iterator = cls;
   struct DirectNeighbor *neighbor = peerinfo_iterator->neighbor;
-
-  if ((peer == NULL))/* && (neighbor->pkey == NULL))*/
+  struct DistantNeighbor *distant = peerinfo_iterator->distant;
+#if DEBUG_DV_PEER_NUMBERS
+  char *neighbor_pid;
+#endif
+  int sent;
+  if (err_msg != NULL)
+  {
+         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                     _("Error in communication with PEERINFO service\n"));
+         /* return; */
+  }
+  if (peer == NULL)
     {
     {
-      /* FIXME: Remove peer! */
-      GNUNET_free(peerinfo_iterator);
+      if (distant->pkey == NULL)
+        {
+#if DEBUG_DV
+          GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "Failed to get peerinfo information for this peer, retrying!\n");
+#endif
+          peerinfo_iterator->ic = GNUNET_PEERINFO_iterate(peerinfo_handle,
+                                                          &peerinfo_iterator->neighbor->identity,
+                                                          GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 3),
+                                                          &process_peerinfo,
+                                                          peerinfo_iterator);
+        }
+      else
+        {
+          GNUNET_free(peerinfo_iterator);
+        }
       return;
     }
 
       return;
     }
 
@@ -1681,65 +2859,108 @@ process_peerinfo (void *cls,
 
   if ((hello != NULL) && (GNUNET_HELLO_get_key (hello, &neighbor->pkey) == GNUNET_OK))
     {
 
   if ((hello != NULL) && (GNUNET_HELLO_get_key (hello, &neighbor->pkey) == GNUNET_OK))
     {
-      GNUNET_CONTAINER_multihashmap_get_multiple(ctx.extended_neighbors,
-                                                 &peer->hashPubKey,
-                                                 &add_pkey_to_extended,
-                                                 &neighbor->pkey);
-      neighbor->send_context->task = GNUNET_SCHEDULER_add_now(sched, &neighbor_send_task, neighbor->send_context);
+      if (distant->pkey == NULL)
+        {
+          distant->pkey = GNUNET_malloc(sizeof(struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded));
+          memcpy(distant->pkey, &neighbor->pkey, sizeof(struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded));
+        }
+
+      sent = GNUNET_CONTAINER_multihashmap_iterate (extended_neighbors, &add_all_extended_peers, neighbor->send_context);
+      if (stats != NULL)
+        {
+          GNUNET_STATISTICS_update (stats, "# distant peers gossiped to direct neighbors", sent, GNUNET_NO);
+        }
+#if DEBUG_DV_PEER_NUMBERS
+      neighbor_pid = GNUNET_strdup(GNUNET_i2s(&neighbor->identity));
+      GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "%s: Gossipped %d extended peers to %s\n", GNUNET_i2s(&my_identity), sent, neighbor_pid);
+#endif
+      sent = GNUNET_CONTAINER_multihashmap_iterate (direct_neighbors, &add_all_direct_neighbors, neighbor);
+      if (stats != NULL)
+        {
+          GNUNET_STATISTICS_update (stats, "# direct peers gossiped to direct neighbors", sent, GNUNET_NO);
+        }
+#if DEBUG_DV_PEER_NUMBERS
+      GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "%s: Gossipped about %s to %d direct peers\n", GNUNET_i2s(&my_identity), neighbor_pid, sent);
+      GNUNET_free(neighbor_pid);
+#endif
+      neighbor->send_context->task = GNUNET_SCHEDULER_add_now(&neighbor_send_task, neighbor->send_context);
     }
 }
 
     }
 }
 
+
 /**
  * Method called whenever a peer connects.
  *
  * @param cls closure
  * @param peer peer identity this notification is about
 /**
  * Method called whenever a peer connects.
  *
  * @param cls closure
  * @param peer peer identity this notification is about
- * @param latency reported latency of the connection with peer
- * @param distance reported distance (DV) to peer
+ * @param atsi performance data
  */
  */
-void handle_core_connect (void *cls,
-                          const struct GNUNET_PeerIdentity * peer,
-                          struct GNUNET_TIME_Relative latency,
-                          uint32_t distance)
+static void 
+handle_core_connect (void *cls,
+                    const struct GNUNET_PeerIdentity * peer,
+                    const struct GNUNET_TRANSPORT_ATS_Information *atsi)
 {
   struct DirectNeighbor *neighbor;
 {
   struct DirectNeighbor *neighbor;
+  struct DistantNeighbor *about;
   struct PeerIteratorContext *peerinfo_iterator;
   struct PeerIteratorContext *peerinfo_iterator;
+  int sent;
 #if DEBUG_DV
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "%s: Receives core connect message for peer %s distance %d!\n", "dv", GNUNET_i2s(peer), distance);
 #endif
 #if DEBUG_DV
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "%s: Receives core connect message for peer %s distance %d!\n", "dv", GNUNET_i2s(peer), distance);
 #endif
+  uint32_t distance;
 
 
-  if ((distance == 0) && (GNUNET_CONTAINER_multihashmap_get(ctx.direct_neighbors, &peer->hashPubKey) == NULL))
+  /* Check for connect to self message */
+  if (0 == memcmp(&my_identity, peer, sizeof(struct GNUNET_PeerIdentity)))
+    return;
+
+  distance = get_atsi_distance (atsi);
+  if ((distance == DIRECT_NEIGHBOR_COST) &&
+      (GNUNET_CONTAINER_multihashmap_get(direct_neighbors, &peer->hashPubKey) == NULL))
   {
     peerinfo_iterator = GNUNET_malloc(sizeof(struct PeerIteratorContext));
     neighbor = GNUNET_malloc (sizeof (struct DirectNeighbor));
     neighbor->send_context = GNUNET_malloc(sizeof(struct NeighborSendContext));
     neighbor->send_context->toNeighbor = neighbor;
   {
     peerinfo_iterator = GNUNET_malloc(sizeof(struct PeerIteratorContext));
     neighbor = GNUNET_malloc (sizeof (struct DirectNeighbor));
     neighbor->send_context = GNUNET_malloc(sizeof(struct NeighborSendContext));
     neighbor->send_context->toNeighbor = neighbor;
-    neighbor->send_context->timeout = default_dv_delay; /* FIXME: base this on total gossip tasks, or bandwidth */
     memcpy (&neighbor->identity, peer, sizeof (struct GNUNET_PeerIdentity));
     memcpy (&neighbor->identity, peer, sizeof (struct GNUNET_PeerIdentity));
-    /*memcpy (&neighbor->pkey, ,sizeof(struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded));*/
-    GNUNET_CONTAINER_multihashmap_put (ctx.direct_neighbors,
+
+    GNUNET_assert(GNUNET_SYSERR != GNUNET_CONTAINER_multihashmap_put (direct_neighbors,
                                &peer->hashPubKey,
                                &peer->hashPubKey,
-                               neighbor, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
-    addUpdateNeighbor (peer, NULL, 0, neighbor, 0);
+                               neighbor, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
+    about = addUpdateNeighbor (peer, NULL, 0, neighbor, DIRECT_NEIGHBOR_COST);
+    peerinfo_iterator->distant = about;
     peerinfo_iterator->neighbor = neighbor;
     peerinfo_iterator->neighbor = neighbor;
-    peerinfo_iterator->ic = GNUNET_PEERINFO_iterate (cfg,
-                                            sched,
-                                            peer,
-                                            0,
-                                            GNUNET_TIME_relative_multiply
-                                            (GNUNET_TIME_UNIT_SECONDS, 15),
-                                            &process_peerinfo, peerinfo_iterator);
-    /* Only add once we get the publicKey of this guy
-     *
-     * neighbor->send_context->task = GNUNET_SCHEDULER_add_now(sched, &neighbor_send_task, neighbor->send_context);
-     */
+    peerinfo_iterator->ic = GNUNET_PEERINFO_iterate (peerinfo_handle,
+                                                     peer,
+                                                     GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 3),
+                                                     &process_peerinfo,
+                                                     peerinfo_iterator);
+
+    if ((about != NULL) && (about->pkey == NULL))
+      {
+#if DEBUG_DV
+        GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "Newly added peer %s has NULL pkey!\n", GNUNET_i2s(peer));
+#endif
+      }
+    else if (about != NULL)
+      {
+        GNUNET_free(peerinfo_iterator);
+      }
   }
   else
   {
   }
   else
   {
+    about = GNUNET_CONTAINER_multihashmap_get(extended_neighbors, &peer->hashPubKey);
+    if ((GNUNET_CONTAINER_multihashmap_get(direct_neighbors, &peer->hashPubKey) == NULL) && (about != NULL))
+      {
+        sent = GNUNET_CONTAINER_multihashmap_iterate(direct_neighbors, &add_distant_all_direct_neighbors, about);
+        if (stats != NULL)
+          {
+            GNUNET_STATISTICS_update (stats, "# direct peers gossiped to new direct neighbors", sent, GNUNET_NO);
+          }
+      }
 #if DEBUG_DV
 #if DEBUG_DV
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "%s: Distance (%d) greater than 0 or already know about peer (%s), not re-adding!\n", "dv", distance, GNUNET_i2s(peer));
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "%s: Distance (%d) greater than %d or already know about peer (%s), not re-adding!\n", "dv", distance, DIRECT_NEIGHBOR_COST, GNUNET_i2s(peer));
 #endif
     return;
   }
 #endif
     return;
   }
@@ -1756,25 +2977,63 @@ void handle_core_disconnect (void *cls,
 {
   struct DirectNeighbor *neighbor;
   struct DistantNeighbor *referee;
 {
   struct DirectNeighbor *neighbor;
   struct DistantNeighbor *referee;
+  struct FindDestinationContext fdc;
+  struct DisconnectContext disconnect_context;
+  struct PendingMessage *pending_pos;
 
 #if DEBUG_DV
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "%s: Receives core peer disconnect message!\n", "dv");
 #endif
 
 
 #if DEBUG_DV
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "%s: Receives core peer disconnect message!\n", "dv");
 #endif
 
+  /* Check for disconnect from self message */
+  if (0 == memcmp(&my_identity, peer, sizeof(struct GNUNET_PeerIdentity)))
+    return;
+
   neighbor =
   neighbor =
-    GNUNET_CONTAINER_multihashmap_get (ctx.direct_neighbors, &peer->hashPubKey);
+    GNUNET_CONTAINER_multihashmap_get (direct_neighbors, &peer->hashPubKey);
+
   if (neighbor == NULL)
     {
       return;
     }
   if (neighbor == NULL)
     {
       return;
     }
+
+  pending_pos = core_pending_head;
+  while (NULL != pending_pos)
+    {
+      if (0 == memcmp(&pending_pos->recipient, &neighbor->identity, sizeof(struct GNUNET_PeerIdentity)))
+        {
+          GNUNET_CONTAINER_DLL_remove(core_pending_head, core_pending_tail, pending_pos);
+          pending_pos = core_pending_head;
+        }
+      else
+        pending_pos = pending_pos->next;
+    }
+
   while (NULL != (referee = neighbor->referee_head))
     distant_neighbor_free (referee);
   while (NULL != (referee = neighbor->referee_head))
     distant_neighbor_free (referee);
+
+  fdc.dest = NULL;
+  fdc.tid = 0;
+
+  GNUNET_CONTAINER_multihashmap_iterate (extended_neighbors, &find_distant_peer, &fdc);
+
+  if (fdc.dest != NULL)
+    {
+      disconnect_context.direct = neighbor;
+      disconnect_context.distant = fdc.dest;
+      GNUNET_CONTAINER_multihashmap_iterate (direct_neighbors, &schedule_disconnect_messages, &disconnect_context);
+    }
+
   GNUNET_assert (neighbor->referee_tail == NULL);
   GNUNET_assert (neighbor->referee_tail == NULL);
-  GNUNET_CONTAINER_multihashmap_remove (ctx.direct_neighbors,
-                                &peer->hashPubKey, neighbor);
-  if ((neighbor->send_context != NULL) && (neighbor->send_context->task != GNUNET_SCHEDULER_NO_TASK))
-    GNUNET_SCHEDULER_cancel(sched, neighbor->send_context->task);
+  if (GNUNET_NO == GNUNET_CONTAINER_multihashmap_remove (direct_neighbors,
+                                                         &peer->hashPubKey, neighbor))
+    {
+      GNUNET_break(0);
+    }
+  if ((neighbor->send_context != NULL) &&
+      (neighbor->send_context->task != GNUNET_SCHEDULER_NO_TASK))
+    GNUNET_SCHEDULER_cancel(neighbor->send_context->task);
   GNUNET_free (neighbor);
 }
 
   GNUNET_free (neighbor);
 }
 
@@ -1783,47 +3042,50 @@ void handle_core_disconnect (void *cls,
  * Process dv requests.
  *
  * @param cls closure
  * Process dv requests.
  *
  * @param cls closure
- * @param scheduler scheduler to use
  * @param server the initialized server
  * @param c configuration to use
  */
 static void
 run (void *cls,
  * @param server the initialized server
  * @param c configuration to use
  */
 static void
 run (void *cls,
-     struct GNUNET_SCHEDULER_Handle *scheduler,
      struct GNUNET_SERVER_Handle *server,
      const struct GNUNET_CONFIGURATION_Handle *c)
 {
      struct GNUNET_SERVER_Handle *server,
      const struct GNUNET_CONFIGURATION_Handle *c)
 {
-  struct GNUNET_TIME_Relative timeout;
   unsigned long long max_hosts;
   unsigned long long max_hosts;
-  timeout = GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 5);
-  sched = scheduler;
   cfg = c;
 
   /* FIXME: Read from config, or calculate, or something other than this! */
   cfg = c;
 
   /* FIXME: Read from config, or calculate, or something other than this! */
-  max_hosts = 50;
-  ctx.max_table_size = 100;
-  ctx.fisheye_depth = 3;
+  max_hosts = DEFAULT_DIRECT_CONNECTIONS;
+  max_table_size = DEFAULT_DV_SIZE;
+  fisheye_depth = DEFAULT_FISHEYE_DEPTH;
+
+  if (GNUNET_CONFIGURATION_have_value(cfg, "dv", "max_direct_connections"))
+    GNUNET_assert(GNUNET_OK == GNUNET_CONFIGURATION_get_value_number(cfg, "dv", "max_direct_connections", &max_hosts));
 
 
-  ctx.neighbor_min_heap =
+  if (GNUNET_CONFIGURATION_have_value(cfg, "dv", "max_total_connections"))
+    GNUNET_assert(GNUNET_OK == GNUNET_CONFIGURATION_get_value_number(cfg, "dv", "max_total_connections", &max_table_size));
+
+
+  if (GNUNET_CONFIGURATION_have_value(cfg, "dv", "fisheye_depth"))
+    GNUNET_assert(GNUNET_OK == GNUNET_CONFIGURATION_get_value_number(cfg, "dv", "fisheye_depth", &fisheye_depth));
+
+  neighbor_min_heap =
     GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
     GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
-  ctx.neighbor_max_heap =
+  neighbor_max_heap =
     GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MAX);
 
     GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MAX);
 
-  ctx.direct_neighbors = GNUNET_CONTAINER_multihashmap_create (max_hosts);
-  ctx.extended_neighbors =
-    GNUNET_CONTAINER_multihashmap_create (ctx.max_table_size * 3);
+  direct_neighbors = GNUNET_CONTAINER_multihashmap_create (max_hosts);
+  extended_neighbors =
+    GNUNET_CONTAINER_multihashmap_create (max_table_size * 3);
 
 
-  client_transmit_timeout = GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 5);
-  default_dv_delay = GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 5);
   GNUNET_SERVER_add_handlers (server, plugin_handlers);
   coreAPI =
   GNUNET_SERVER_add_handlers (server, plugin_handlers);
   coreAPI =
-  GNUNET_CORE_connect (sched,
-                       cfg,
-                       timeout,
+  GNUNET_CORE_connect (cfg,
+                      1,
                        NULL, /* FIXME: anything we want to pass around? */
                        &core_init,
                        &handle_core_connect,
                        &handle_core_disconnect,
                        NULL, /* FIXME: anything we want to pass around? */
                        &core_init,
                        &handle_core_connect,
                        &handle_core_disconnect,
+                      NULL,
                        NULL,
                        GNUNET_NO,
                        NULL,
                        NULL,
                        GNUNET_NO,
                        NULL,
@@ -1832,13 +3094,22 @@ run (void *cls,
 
   if (coreAPI == NULL)
     return;
 
   if (coreAPI == NULL)
     return;
-  /* load (server); Huh? */
+
+  coreMST = GNUNET_SERVER_mst_create (&tokenized_message_handler,
+                                      NULL);
+
+   peerinfo_handle = GNUNET_PEERINFO_connect(cfg);
+
+   if (peerinfo_handle == NULL)
+     {
+       GNUNET_CORE_disconnect(coreAPI);
+       return;
+     }
 
   /* Scheduled the task to clean up when shutdown is called */
 
   /* Scheduled the task to clean up when shutdown is called */
-  cleanup_task = GNUNET_SCHEDULER_add_delayed (sched,
-                                GNUNET_TIME_UNIT_FOREVER_REL,
-                                &shutdown_task,
-                                NULL);
+  cleanup_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL,
+                                               &shutdown_task,
+                                               NULL);
 }
 
 
 }