check
[oweals/gnunet.git] / src / core / gnunet-service-core.c
index f6b96a13e890b71be91b1cd114de96516934db47..f54e4a7a0434537530cbec2904c460ccb6c52ec6 100644 (file)
  */
 enum PeerStateMachine
 {
+  /**
+   * No handshake yet.
+   */
   PEER_STATE_DOWN,
+
+  /**
+   * We've sent our session key.
+   */
   PEER_STATE_KEY_SENT,
+  
+  /**
+   * We've received the other peers session key.
+   */
   PEER_STATE_KEY_RECEIVED,
+
+  /**
+   * The other peer has confirmed our session key with a message
+   * encrypted with his session key (which we got).  Session is now fully up.
+   */
   PEER_STATE_KEY_CONFIRMED
 };
 
@@ -402,10 +418,6 @@ struct ClientActiveRequest;
  */
 struct Neighbour
 {
-  /**
-   * We keep neighbours in a linked list (for now).
-   */
-  struct Neighbour *next;
 
   /**
    * Unencrypted messages destined for this peer.
@@ -769,9 +781,14 @@ static struct Client *clients;
 static struct GNUNET_SERVER_NotificationContext *notifier;
 
 /**
- * We keep neighbours in a linked list (for now).
+ * Map of peer identities to 'struct Neighbour'.
+ */
+static struct GNUNET_CONTAINER_MultiHashMap *neighbours;
+
+/**
+ * Neighbour entry for "this" peer.
  */
-static struct Neighbour *neighbours;
+static struct Neighbour self;
 
 /**
  * For creating statistics.
@@ -783,14 +800,8 @@ static struct GNUNET_STATISTICS_Handle *stats;
  */
 static unsigned long long preference_sum;
 
-/**
- * Total number of neighbours we have.
- */
-static unsigned int neighbour_count;
-
 /**
  * How much inbound bandwidth are we supposed to be using per second?
- * FIXME: this value is not used!
  */
 static unsigned long long bandwidth_target_in_bps;
 
@@ -884,6 +895,23 @@ get_neighbour_timeout (struct Neighbour *n)
 }
 
 
+/**
+ * Helper function for update_preference_sum.
+ */
+static int
+update_preference (void *cls,
+                  const GNUNET_HashCode *key,
+                  void *value)
+{
+  unsigned long long *ps = cls;
+  struct Neighbour *n = value;
+
+  n->current_preference /= 2;
+  *ps += n->current_preference;
+  return GNUNET_OK;
+}    
+
+
 /**
  * A preference value for a neighbour was update.  Update
  * the preference sum accordingly.
@@ -893,7 +921,6 @@ get_neighbour_timeout (struct Neighbour *n)
 static void
 update_preference_sum (unsigned long long inc)
 {
-  struct Neighbour *n;
   unsigned long long os;
 
   os = preference_sum;
@@ -902,13 +929,9 @@ update_preference_sum (unsigned long long inc)
     return; /* done! */
   /* overflow! compensate by cutting all values in half! */
   preference_sum = 0;
-  n = neighbours;
-  while (n != NULL)
-    {
-      n->current_preference /= 2;
-      preference_sum += n->current_preference;
-      n = n->next;
-    }    
+  GNUNET_CONTAINER_multihashmap_iterate (neighbours,
+                                        &update_preference,
+                                        &preference_sum);
   GNUNET_STATISTICS_set (stats, gettext_noop ("# total peer preference"), preference_sum, GNUNET_NO);
 }
 
@@ -923,14 +946,7 @@ update_preference_sum (unsigned long long inc)
 static struct Neighbour *
 find_neighbour (const struct GNUNET_PeerIdentity *peer)
 {
-  struct Neighbour *ret;
-
-  ret = neighbours;
-  while ((ret != NULL) &&
-         (0 != memcmp (&ret->peer,
-                       peer, sizeof (struct GNUNET_PeerIdentity))))
-    ret = ret->next;
-  return ret;
+  return GNUNET_CONTAINER_multihashmap_get (neighbours, &peer->hashPubKey);
 }
 
 
@@ -1015,7 +1031,7 @@ handle_peer_status_change (struct Neighbour *n)
              GNUNET_i2s (&n->peer));
 #endif
   size = sizeof (struct PeerStatusNotifyMessage) +
-    (n->ats_count+1) * sizeof (struct GNUNET_TRANSPORT_ATS_Information);
+    n->ats_count * sizeof (struct GNUNET_TRANSPORT_ATS_Information);
   if (size >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
     {
       GNUNET_break (0);
@@ -1024,7 +1040,7 @@ handle_peer_status_change (struct Neighbour *n)
                         n->ats_count,
                         0);
       size = sizeof (struct PeerStatusNotifyMessage) +
-       (n->ats_count+1) * sizeof (struct GNUNET_TRANSPORT_ATS_Information);
+       n->ats_count * sizeof (struct GNUNET_TRANSPORT_ATS_Information);
     }
   psnm = (struct PeerStatusNotifyMessage*) buf;
   psnm->header.size = htons (size);
@@ -1068,30 +1084,37 @@ schedule_peer_messages (struct Neighbour *n)
   unsigned int queue_size;
   
   /* check if neighbour queue is empty enough! */
-  queue_size = 0;
-  mqe = n->messages;
-  while (mqe != NULL) 
-    {
-      queue_size++;
-      mqe = mqe->next;
-    }
-  if (queue_size >= MAX_PEER_QUEUE_SIZE)
+  if (n != &self)
     {
+      queue_size = 0;
+      mqe = n->messages;
+      while (mqe != NULL) 
+       {
+         queue_size++;
+         mqe = mqe->next;
+       }
+      if (queue_size >= MAX_PEER_QUEUE_SIZE)
+       {
 #if DEBUG_CORE_CLIENT
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                 "Not considering client transmission requests: queue full\n");
+         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                     "Not considering client transmission requests: queue full\n");
 #endif
-      return; /* queue still full */
+         return; /* queue still full */
+       }
+      /* find highest priority request */
+      pos = n->active_client_request_head;
+      car = NULL;
+      while (pos != NULL)
+       {
+         if ( (car == NULL) ||
+              (pos->priority > car->priority) )
+           car = pos;
+         pos = pos->next;
+       }
     }
-  /* find highest priority request */
-  pos = n->active_client_request_head;
-  car = NULL;
-  while (pos != NULL)
+  else
     {
-      if ( (car == NULL) ||
-          (pos->priority > car->priority) )
-       car = pos;
-      pos = pos->next;
+      car = n->active_client_request_head;
     }
   if (car == NULL)
     return; /* no pending requests */
@@ -1104,9 +1127,10 @@ schedule_peer_messages (struct Neighbour *n)
   GNUNET_CONTAINER_DLL_remove (n->active_client_request_head,
                               n->active_client_request_tail,
                               car);
-  GNUNET_CONTAINER_multihashmap_remove (c->requests,
-                                       &n->peer.hashPubKey,
-                                       car);  
+  GNUNET_assert (GNUNET_YES ==
+                GNUNET_CONTAINER_multihashmap_remove (c->requests,
+                                                      &n->peer.hashPubKey,
+                                                      car));  
   smr.header.size = htons (sizeof (struct SendMessageReady));
   smr.header.type = htons (GNUNET_MESSAGE_TYPE_CORE_SEND_READY);
   smr.size = htons (car->msize);
@@ -1131,7 +1155,12 @@ handle_client_send_request (void *cls,
   struct ClientActiveRequest *car;
 
   req = (const struct SendMessageRequest*) message;
-  n = find_neighbour (&req->peer);
+  if (0 == memcmp (&req->peer,
+                  &my_identity,
+                  sizeof (struct GNUNET_PeerIdentity)))
+    n = &self;
+  else
+    n = find_neighbour (&req->peer);
   if ( (n == NULL) ||
        (GNUNET_YES != n->is_connected) ||
        (n->status != PEER_STATE_KEY_CONFIRMED) )
@@ -1193,6 +1222,57 @@ handle_client_send_request (void *cls,
 }
 
 
+/**
+ * Notify client about an existing connection to one of our neighbours.
+ */
+static int
+notify_client_about_neighbour (void *cls,
+                              const GNUNET_HashCode *key,
+                              void *value)
+{
+  struct Client *c = cls;
+  struct Neighbour *n = value;
+  size_t size;
+  char buf[GNUNET_SERVER_MAX_MESSAGE_SIZE - 1];
+  struct GNUNET_TRANSPORT_ATS_Information *ats;
+  struct ConnectNotifyMessage *cnm;
+
+  size = sizeof (struct ConnectNotifyMessage) +
+    (n->ats_count) * sizeof (struct GNUNET_TRANSPORT_ATS_Information);
+  if (size >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
+    {
+      GNUNET_break (0);
+      /* recovery strategy: throw away performance data */
+      GNUNET_array_grow (n->ats,
+                        n->ats_count,
+                        0);
+      size = sizeof (struct ConnectNotifyMessage) +
+       (n->ats_count) * sizeof (struct GNUNET_TRANSPORT_ATS_Information);
+    }
+  cnm = (struct ConnectNotifyMessage*) buf;      
+  cnm->header.size = htons (size);
+  cnm->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_CONNECT);
+  cnm->ats_count = htonl (n->ats_count);
+  ats = &cnm->ats;
+  memcpy (ats,
+         n->ats,
+         sizeof (struct GNUNET_TRANSPORT_ATS_Information) * n->ats_count);
+  ats[n->ats_count].type = htonl (GNUNET_TRANSPORT_ATS_ARRAY_TERMINATOR);
+  ats[n->ats_count].value = htonl (0);
+  if (n->status == PEER_STATE_KEY_CONFIRMED)
+    {
+#if DEBUG_CORE_CLIENT
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                 "Sending `%s' message to client.\n", "NOTIFY_CONNECT");
+#endif
+      cnm->peer = n->peer;
+      send_to_client (c, &cnm->header, GNUNET_NO);
+    }
+  return GNUNET_OK;
+}
+
+
+
 /**
  * Handle CORE_INIT request.
  */
@@ -1207,11 +1287,6 @@ handle_client_init (void *cls,
   uint16_t msize;
   const uint16_t *types;
   uint16_t *wtypes;
-  struct Neighbour *n;
-  struct ConnectNotifyMessage *cnm;
-  char buf[GNUNET_SERVER_MAX_MESSAGE_SIZE - 1];
-  struct GNUNET_TRANSPORT_ATS_Information *ats;
-  size_t size;
   unsigned int i;
 
 #if DEBUG_CORE_CLIENT
@@ -1273,42 +1348,9 @@ handle_client_init (void *cls,
   if (0 != (c->options & GNUNET_CORE_OPTION_SEND_CONNECT))
     {
       /* notify new client about existing neighbours */
-      n = neighbours;
-      while (n != NULL)
-       {
-         size = sizeof (struct ConnectNotifyMessage) +
-           (n->ats_count+1) * sizeof (struct GNUNET_TRANSPORT_ATS_Information);
-         if (size >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
-           {
-             GNUNET_break (0);
-             /* recovery strategy: throw away performance data */
-             GNUNET_array_grow (n->ats,
-                                n->ats_count,
-                                0);
-             size = sizeof (struct ConnectNotifyMessage) +
-               (n->ats_count+1) * sizeof (struct GNUNET_TRANSPORT_ATS_Information);
-           }
-         cnm = (struct ConnectNotifyMessage*) buf;       
-         cnm->header.size = htons (size);
-         cnm->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_CONNECT);
-         cnm->ats_count = htonl (n->ats_count);
-         ats = &cnm->ats;
-         memcpy (ats,
-                 n->ats,
-                 sizeof (struct GNUNET_TRANSPORT_ATS_Information) * n->ats_count);
-         ats[n->ats_count].type = htonl (GNUNET_TRANSPORT_ATS_ARRAY_TERMINATOR);
-         ats[n->ats_count].value = htonl (0);
-         if (n->status == PEER_STATE_KEY_CONFIRMED)
-           {
-#if DEBUG_CORE_CLIENT
-             GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                         "Sending `%s' message to client.\n", "NOTIFY_CONNECT");
-#endif
-             cnm->peer = n->peer;
-             send_to_client (c, &cnm->header, GNUNET_NO);
-           }
-         n = n->next;
-       }
+      GNUNET_CONTAINER_multihashmap_iterate (neighbours,
+                                            &notify_client_about_neighbour,
+                                            c);
     }
   GNUNET_SERVER_receive_done (client, GNUNET_OK);
 }
@@ -1333,6 +1375,7 @@ destroy_active_client_request (void *cls,
 
   peer.hashPubKey = *key;
   n = find_neighbour (&peer);
+  GNUNET_assert (NULL != n);
   GNUNET_CONTAINER_DLL_remove (n->active_client_request_head,
                               n->active_client_request_tail,
                               car);
@@ -1389,6 +1432,63 @@ handle_client_disconnect (void *cls, struct GNUNET_SERVER_Client *client)
 }
 
 
+/**
+ * Helper function for handle_client_iterate_peers.
+ *
+ * @param cls the 'struct GNUNET_SERVER_TransmitContext' to queue replies
+ * @param key identity of the connected peer
+ * @param value the 'struct Neighbour' for the peer
+ * @return GNUNET_OK (continue to iterate)
+ */
+static int
+queue_connect_message (void *cls,
+                      const GNUNET_HashCode *key,
+                      void *value)
+{
+  struct GNUNET_SERVER_TransmitContext *tc = cls;
+  struct Neighbour *n = value;
+  char buf[GNUNET_SERVER_MAX_MESSAGE_SIZE - 1];
+  struct GNUNET_TRANSPORT_ATS_Information *ats;
+  size_t size;
+  struct ConnectNotifyMessage *cnm;
+
+  cnm = (struct ConnectNotifyMessage*) buf;
+  if (n->status != PEER_STATE_KEY_CONFIRMED)
+    return GNUNET_OK;
+  size = sizeof (struct ConnectNotifyMessage) +
+    (n->ats_count) * sizeof (struct GNUNET_TRANSPORT_ATS_Information);
+  if (size >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
+    {
+      GNUNET_break (0);
+      /* recovery strategy: throw away performance data */
+      GNUNET_array_grow (n->ats,
+                        n->ats_count,
+                        0);
+      size = sizeof (struct PeerStatusNotifyMessage) +
+       n->ats_count * sizeof (struct GNUNET_TRANSPORT_ATS_Information);
+    }
+  cnm = (struct ConnectNotifyMessage*) buf;
+  cnm->header.size = htons (size);
+  cnm->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_CONNECT);
+  cnm->ats_count = htonl (n->ats_count);
+  ats = &cnm->ats;
+  memcpy (ats,
+         n->ats,
+         n->ats_count * sizeof (struct GNUNET_TRANSPORT_ATS_Information));
+  ats[n->ats_count].type = htonl (GNUNET_TRANSPORT_ATS_ARRAY_TERMINATOR);
+  ats[n->ats_count].value = htonl (0);   
+#if DEBUG_CORE_CLIENT
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "Sending `%s' message to client.\n",
+             "NOTIFY_CONNECT");
+#endif
+  cnm->peer = n->peer;
+  GNUNET_SERVER_transmit_context_append_message (tc, 
+                                                &cnm->header);
+  return GNUNET_OK;
+}
+
+
 /**
  * Handle CORE_ITERATE_PEERS request.
  *
@@ -1402,57 +1502,50 @@ handle_client_iterate_peers (void *cls,
                             const struct GNUNET_MessageHeader *message)
 
 {
-  struct Neighbour *n;
-  struct ConnectNotifyMessage *cnm;
   struct GNUNET_MessageHeader done_msg;
   struct GNUNET_SERVER_TransmitContext *tc;
-  char buf[GNUNET_SERVER_MAX_MESSAGE_SIZE - 1];
-  struct GNUNET_TRANSPORT_ATS_Information *ats;
-  size_t size;
-
+  int msize;
   /* notify new client about existing neighbours */
+
+  msize = ntohs(message->size);
   tc = GNUNET_SERVER_transmit_context_create (client);
-  cnm = (struct ConnectNotifyMessage*) buf;
-  n = neighbours;
-  while (n != NULL)
-    {
-      if (n->status == PEER_STATE_KEY_CONFIRMED)
-        {
-         size = sizeof (struct ConnectNotifyMessage) +
-           (n->ats_count+1) * sizeof (struct GNUNET_TRANSPORT_ATS_Information);
-         if (size >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
-           {
-             GNUNET_break (0);
-             /* recovery strategy: throw away performance data */
-             GNUNET_array_grow (n->ats,
-                                n->ats_count,
-                                0);
-             size = sizeof (struct PeerStatusNotifyMessage) +
-               (n->ats_count+1) * sizeof (struct GNUNET_TRANSPORT_ATS_Information);
-           }
-         cnm = (struct ConnectNotifyMessage*) buf;
-         cnm->header.size = htons (size);
-         cnm->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_CONNECT);
-         cnm->ats_count = htonl (n->ats_count);
-         ats = &cnm->ats;
-         memcpy (ats,
-                 n->ats,
-                 n->ats_count * sizeof (struct GNUNET_TRANSPORT_ATS_Information));
-         ats[n->ats_count].type = htonl (0);
-         ats[n->ats_count].value = htonl (0);    
-#if DEBUG_CORE_CLIENT
-          GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                      "Sending `%s' message to client.\n",
-                     "NOTIFY_CONNECT");
-#endif
-          cnm->peer = n->peer;
-          GNUNET_SERVER_transmit_context_append_message (tc, 
-                                                        &cnm->header);
-        }
-      n = n->next;
-    }
+  if (msize == sizeof(struct GNUNET_MessageHeader))
+    GNUNET_CONTAINER_multihashmap_iterate (neighbours, &queue_connect_message, tc);
+  else
+    GNUNET_break(0);
+
+  done_msg.size = htons (sizeof (struct GNUNET_MessageHeader));
+  done_msg.type = htons (GNUNET_MESSAGE_TYPE_CORE_ITERATE_PEERS_END);
+  GNUNET_SERVER_transmit_context_append_message (tc, &done_msg);
+  GNUNET_SERVER_transmit_context_run (tc,
+                                      GNUNET_TIME_UNIT_FOREVER_REL);
+}
+
+/**
+ * Handle CORE_ITERATE_PEERS request.  Notify client about existing neighbours.
+ *
+ * @param cls unused
+ * @param client client sending the iteration request
+ * @param message iteration request message
+ */
+static void
+handle_client_have_peer (void *cls,
+                             struct GNUNET_SERVER_Client *client,
+                             const struct GNUNET_MessageHeader *message)
+
+{
+  struct GNUNET_MessageHeader done_msg;
+  struct GNUNET_SERVER_TransmitContext *tc;
+  struct GNUNET_PeerIdentity *peer;
+
+  tc = GNUNET_SERVER_transmit_context_create (client);
+  peer = (struct GNUNET_PeerIdentity *) &message[1];
+  GNUNET_CONTAINER_multihashmap_get_multiple(neighbours,
+                                            &peer->hashPubKey, 
+                                            &queue_connect_message, 
+                                            tc);
   done_msg.size = htons (sizeof (struct GNUNET_MessageHeader));
-  done_msg.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_CONNECT);
+  done_msg.type = htons (GNUNET_MESSAGE_TYPE_CORE_ITERATE_PEERS_END);
   GNUNET_SERVER_transmit_context_append_message (tc, &done_msg);
   GNUNET_SERVER_transmit_context_run (tc,
                                       GNUNET_TIME_UNIT_FOREVER_REL);
@@ -1461,6 +1554,10 @@ handle_client_iterate_peers (void *cls,
 
 /**
  * Handle REQUEST_INFO request.
+ *
+ * @param cls unused
+ * @param client client sending the request
+ * @param message iteration request message
  */
 static void
 handle_client_request_info (void *cls,
@@ -1474,7 +1571,9 @@ handle_client_request_info (void *cls,
   int32_t want_reserv;
   int32_t got_reserv;
   unsigned long long old_preference;
+  struct GNUNET_TIME_Relative rdelay;
 
+  rdelay = GNUNET_TIME_relative_get_zero();
 #if DEBUG_CORE_CLIENT
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "Core service receives `%s' request.\n", "REQUEST_INFO");
@@ -1524,8 +1623,9 @@ handle_client_request_info (void *cls,
         }
       else if (want_reserv > 0)
         {
-         if (GNUNET_BANDWIDTH_tracker_get_delay (&n->available_recv_window,
-                                                 want_reserv).rel_value == 0)
+         rdelay = GNUNET_BANDWIDTH_tracker_get_delay (&n->available_recv_window,
+                                                      want_reserv);
+         if (rdelay.rel_value == 0)
            got_reserv = want_reserv;
          else
             got_reserv = 0; /* all or nothing */
@@ -1550,6 +1650,7 @@ handle_client_request_info (void *cls,
                  (int) got_reserv);
 #endif
       cim.reserved_amount = htonl (got_reserv);
+      cim.reserve_delay = GNUNET_TIME_relative_hton (rdelay);
       cim.rim_id = rcm->rim_id;
       cim.bw_out = n->bw_out;
       cim.preference = n->current_preference;
@@ -1631,7 +1732,10 @@ free_neighbour (struct Neighbour *n)
   if (n->keep_alive_task != GNUNET_SCHEDULER_NO_TASK)    
       GNUNET_SCHEDULER_cancel (n->keep_alive_task);
   if (n->status == PEER_STATE_KEY_CONFIRMED)
-    GNUNET_STATISTICS_update (stats, gettext_noop ("# established sessions"), -1, GNUNET_NO);
+    GNUNET_STATISTICS_update (stats, 
+                             gettext_noop ("# established sessions"), 
+                             -1, 
+                             GNUNET_NO);
   GNUNET_array_grow (n->ats, n->ats_count, 0);
   GNUNET_free_non_null (n->public_key);
   GNUNET_free_non_null (n->pending_ping);
@@ -1792,8 +1896,6 @@ consider_free_task (void *cls,
 static void
 consider_free_neighbour (struct Neighbour *n)
 { 
-  struct Neighbour *pos;
-  struct Neighbour *prev;
   struct GNUNET_TIME_Relative left;
 
   if ( (n->th != NULL) ||
@@ -1812,22 +1914,13 @@ consider_free_neighbour (struct Neighbour *n)
       return;
     }
   /* actually free the neighbour... */
-  prev = NULL;
-  pos = neighbours;
-  while (pos != n)
-    {
-      prev = pos;
-      pos = pos->next;
-    }
-  if (prev == NULL)
-    neighbours = n->next;
-  else
-    prev->next = n->next;
-  GNUNET_assert (neighbour_count > 0);
-  neighbour_count--;
+  GNUNET_assert (GNUNET_OK ==
+                GNUNET_CONTAINER_multihashmap_remove (neighbours,
+                                                      &n->peer.hashPubKey,
+                                                      n));
   GNUNET_STATISTICS_set (stats,
                         gettext_noop ("# neighbour entries allocated"), 
-                        neighbour_count,
+                        GNUNET_CONTAINER_multihashmap_size (neighbours),
                         GNUNET_NO);
   free_neighbour (n);
 }
@@ -1843,7 +1936,9 @@ consider_free_neighbour (struct Neighbour *n)
  * @return number of bytes transmitted
  */
 static size_t
-notify_encrypted_transmit_ready (void *cls, size_t size, void *buf)
+notify_encrypted_transmit_ready (void *cls, 
+                                size_t size, 
+                                void *buf)
 {
   struct Neighbour *n = cls;
   struct MessageEntry *m;
@@ -1893,6 +1988,10 @@ notify_encrypted_transmit_ready (void *cls, size_t size, void *buf)
     }
   GNUNET_free (m);
   consider_free_neighbour (n);
+  GNUNET_STATISTICS_update (stats, 
+                           gettext_noop ("# encrypted bytes given to transport"), 
+                           ret, 
+                           GNUNET_NO);
   return ret;
 }
 
@@ -1995,7 +2094,10 @@ do_decrypt (struct Neighbour *n,
       GNUNET_break (0);
       return GNUNET_SYSERR;
     }
-  GNUNET_STATISTICS_update (stats, gettext_noop ("# bytes decrypted"), size, GNUNET_NO);
+  GNUNET_STATISTICS_update (stats, 
+                           gettext_noop ("# bytes decrypted"), 
+                           size, 
+                           GNUNET_NO);
 #if DEBUG_CORE
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "Decrypted %u bytes from `%4s' using key %u, IV %u\n",
@@ -2643,10 +2745,6 @@ create_neighbour (const struct GNUNET_PeerIdentity *pid)
              GNUNET_i2s (pid));
 #endif
   n = GNUNET_malloc (sizeof (struct Neighbour));
-  n->next = neighbours;
-  neighbours = n;
-  neighbour_count++;
-  GNUNET_STATISTICS_set (stats, gettext_noop ("# neighbour entries allocated"), neighbour_count, GNUNET_NO);
   n->peer = *pid;
   GNUNET_CRYPTO_aes_create_session_key (&n->encrypt_key);
   now = GNUNET_TIME_absolute_get ();
@@ -2659,6 +2757,13 @@ create_neighbour (const struct GNUNET_PeerIdentity *pid)
   n->bw_out_external_limit = GNUNET_CONSTANTS_DEFAULT_BW_IN_OUT;
   n->ping_challenge = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE,
                                                 UINT32_MAX);
+  GNUNET_assert (GNUNET_OK == 
+                GNUNET_CONTAINER_multihashmap_put (neighbours,
+                                                   &n->peer.hashPubKey,
+                                                   n,
+                                                   GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
+  GNUNET_STATISTICS_set (stats, gettext_noop ("# neighbour entries allocated"), 
+                        GNUNET_CONTAINER_multihashmap_size (neighbours), GNUNET_NO);
   neighbour_quota_update (n, NULL);
   consider_free_neighbour (n);
   return n;
@@ -2702,10 +2807,15 @@ handle_client_send (void *cls,
   msize -= sizeof (struct SendMessage);
   if (0 == memcmp (&sm->peer, &my_identity, sizeof (struct GNUNET_PeerIdentity)))
     {
-      /* FIXME: should we not allow loopback-injection here? */
-      GNUNET_break (0);
+      /* loopback */
+      GNUNET_SERVER_mst_receive (mst,
+                                &self,
+                                (const char*) &sm[1],
+                                msize,
+                                GNUNET_YES,
+                                GNUNET_NO);
       if (client != NULL)
-        GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
+        GNUNET_SERVER_receive_done (client, GNUNET_OK);
       return;
     }
   n = find_neighbour (&sm->peer);
@@ -2788,10 +2898,13 @@ handle_client_send (void *cls,
              GNUNET_i2s (&sm->peer),
              (unsigned int) msize);
 #endif  
+  GNUNET_break (0 == ntohl (sm->reserved));
   e = GNUNET_malloc (sizeof (struct MessageEntry) + msize);
   e->deadline = GNUNET_TIME_absolute_ntoh (sm->deadline);
   e->priority = ntohl (sm->priority);
   e->size = msize;
+  if (GNUNET_YES != (int) ntohl (sm->cork))
+    e->got_slack = GNUNET_YES;
   memcpy (&e[1], &sm[1], msize);
 
   /* insert, keep list sorted by deadline */
@@ -2825,7 +2938,9 @@ handle_client_send (void *cls,
  * @return number of bytes transmitted
  */
 static size_t
-notify_transport_connect_done (void *cls, size_t size, void *buf)
+notify_transport_connect_done (void *cls,
+                              size_t size,
+                              void *buf)
 {
   struct Neighbour *n = cls;
 
@@ -2835,12 +2950,21 @@ notify_transport_connect_done (void *cls, size_t size, void *buf)
       /* transport should only call us to transmit a message after
        * telling us about a successful connection to the respective peer */
 #if DEBUG_CORE
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Timeout on notify connect!\n");
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 
+                 "Timeout on notify connect!\n");
 #endif
+      GNUNET_STATISTICS_update (stats, 
+                               gettext_noop ("# connection requests timed out in transport"), 
+                               1,
+                               GNUNET_NO);
       return 0;
     }
   if (buf == NULL)
     {
+      GNUNET_STATISTICS_update (stats,
+                                gettext_noop ("# connection requests timed out in transport"),
+                                1,
+                                GNUNET_NO);
       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
                  _("Failed to connect to `%4s': transport failed to connect\n"),
                  GNUNET_i2s (&n->peer));
@@ -2873,12 +2997,16 @@ handle_client_request_connect (void *cls,
   struct Neighbour *n;
   struct GNUNET_TIME_Relative timeout;
 
-  if (0 == memcmp (&cm->peer, &my_identity, sizeof (struct GNUNET_PeerIdentity)))
+  if (0 == memcmp (&cm->peer, 
+                  &my_identity, 
+                  sizeof (struct GNUNET_PeerIdentity)))
     {
-      GNUNET_break (0);
-      GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
+      /* In this case a client has asked us to connect to ourselves, not really an error! */
+      GNUNET_SERVER_receive_done (client, GNUNET_OK);
       return;
     }
+  timeout = GNUNET_TIME_relative_ntoh (cm->timeout);
+  GNUNET_break (ntohl (cm->reserved) == 0);
   GNUNET_SERVER_receive_done (client, GNUNET_OK);
   n = find_neighbour (&cm->peer);
   if (n == NULL)
@@ -2886,20 +3014,40 @@ handle_client_request_connect (void *cls,
   if ( (GNUNET_YES == n->is_connected) ||
        (n->th != NULL) )
     {
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                 "Core received `%s' request for `%4s', already connected!\n",
-                 "REQUEST_CONNECT",
-                 GNUNET_i2s (&cm->peer));
+      if (GNUNET_YES == n->is_connected) 
+       GNUNET_STATISTICS_update (stats, 
+                                 gettext_noop ("# connection requests ignored (already connected)"), 
+                                 1,
+                                 GNUNET_NO);
+      else
+        {
+          GNUNET_TRANSPORT_notify_transmit_ready_cancel(n->th);
+          n->th = GNUNET_TRANSPORT_notify_transmit_ready (transport,
+                                                          &cm->peer,
+                                                          sizeof (struct GNUNET_MessageHeader), 0,
+                                                          timeout,
+                                                          &notify_transport_connect_done,
+                                                          n);
+          GNUNET_break (NULL != n->th);
+          GNUNET_STATISTICS_update (stats,
+                                    gettext_noop ("# connection requests retried (due to repeat request connect)"),
+                                    1,
+                                    GNUNET_NO);
+        }
       return; /* already connected, or at least trying */
     }
-  GNUNET_STATISTICS_update (stats, gettext_noop ("# connection requests received"), 1, GNUNET_NO);
+  GNUNET_STATISTICS_update (stats, 
+                           gettext_noop ("# connection requests received"), 
+                           1,
+                           GNUNET_NO);
 
+#if DEBUG_CORE
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
              "Core received `%s' request for `%4s', will try to establish connection\n",
              "REQUEST_CONNECT",
              GNUNET_i2s (&cm->peer));
+#endif
 
-  timeout = GNUNET_TIME_relative_ntoh (cm->timeout);
   /* ask transport to connect to the peer */
   n->th = GNUNET_TRANSPORT_notify_transmit_ready (transport,
                                                  &cm->peer,
@@ -2919,14 +3067,23 @@ handle_client_request_connect (void *cls,
  * @param cls the 'struct Neighbour' to retry sending the key for
  * @param peer the peer for which this is the HELLO
  * @param hello HELLO message of that peer
+ * @param err_msg NULL if successful, otherwise contains error message
  */
 static void
 process_hello_retry_send_key (void *cls,
                               const struct GNUNET_PeerIdentity *peer,
-                              const struct GNUNET_HELLO_Message *hello)
+                              const struct GNUNET_HELLO_Message *hello,
+                              const char *err_msg)
 {
   struct Neighbour *n = cls;
 
+  if (err_msg != NULL)
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                _("Error in communication with PEERINFO service\n"));
+    /* return; */
+  }
+
   if (peer == NULL)
     {
 #if DEBUG_CORE
@@ -3231,15 +3388,24 @@ handle_set_key (struct Neighbour *n,
  * @param cls pointer to the set key message
  * @param peer the peer for which this is the HELLO
  * @param hello HELLO message of that peer
+ * @param err_msg NULL if successful, otherwise contains error message
  */
 static void
 process_hello_retry_handle_set_key (void *cls,
                                     const struct GNUNET_PeerIdentity *peer,
-                                    const struct GNUNET_HELLO_Message *hello)
+                                    const struct GNUNET_HELLO_Message *hello,
+                                    const char *err_msg)
 {
   struct Neighbour *n = cls;
   struct SetKeyMessage *sm = n->skm;
 
+  if (err_msg != NULL)
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                _("Error in communication with PEERINFO service\n"));
+    /* return; */
+  }
+
   if (peer == NULL)
     {
       n->skm = NULL;
@@ -3296,9 +3462,9 @@ update_neighbour_performance (struct Neighbour *n,
 
   if (ats_count == 0)
     return;
-  for (i=0;i<ats_count;i++)
+  for (i = 0; i < ats_count; i++)
     {
-      for (j=0;j<n->ats_count;j++)
+      for (j=0;j < n->ats_count; j++)
        {
          if (n->ats[j].type == ats[i].type)
            {
@@ -3306,10 +3472,12 @@ update_neighbour_performance (struct Neighbour *n,
              break;
            }
        }
-      if (j == n->ats_count)   
-       GNUNET_array_append (n->ats,
-                            n->ats_count,
-                            *ats);     
+      if (j == n->ats_count)
+        {
+          GNUNET_array_append (n->ats,
+                               n->ats_count,
+                               ats[i]);
+        }
     }
 }
 
@@ -3526,7 +3694,7 @@ handle_pong (struct Neighbour *n,
         }      
       update_neighbour_performance (n, ats, ats_count);      
       size = sizeof (struct ConnectNotifyMessage) +
-       (n->ats_count+1) * sizeof (struct GNUNET_TRANSPORT_ATS_Information);
+       (n->ats_count) * sizeof (struct GNUNET_TRANSPORT_ATS_Information);
       if (size >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
        {
          GNUNET_break (0);
@@ -3535,17 +3703,18 @@ handle_pong (struct Neighbour *n,
                             n->ats_count,
                             0);
          size = sizeof (struct PeerStatusNotifyMessage) +
-           (n->ats_count+1) * sizeof (struct GNUNET_TRANSPORT_ATS_Information);
+           n->ats_count * sizeof (struct GNUNET_TRANSPORT_ATS_Information);
        }
       cnm = (struct ConnectNotifyMessage*) buf;
       cnm->header.size = htons (size);
       cnm->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_CONNECT);
       cnm->ats_count = htonl (n->ats_count);
+      cnm->peer = n->peer;
       mats = &cnm->ats;
       memcpy (mats,
              n->ats,
              n->ats_count * sizeof (struct GNUNET_TRANSPORT_ATS_Information));
-      mats[n->ats_count].type = htonl (0);
+      mats[n->ats_count].type = htonl (GNUNET_TRANSPORT_ATS_ARRAY_TERMINATOR);
       mats[n->ats_count].value = htonl (0);      
       send_to_all_clients (&cnm->header, 
                           GNUNET_NO, 
@@ -3762,7 +3931,7 @@ send_p2p_message_to_client (struct Neighbour *sender,
                             const void *m, size_t msize)
 {
   size_t size = msize + sizeof (struct NotifyTrafficMessage) +
-    (sender->ats_count+1) * sizeof (struct GNUNET_TRANSPORT_ATS_Information);
+    (sender->ats_count) * sizeof (struct GNUNET_TRANSPORT_ATS_Information);
   char buf[size];
   struct NotifyTrafficMessage *ntm;
   struct GNUNET_TRANSPORT_ATS_Information *ats;
@@ -3775,7 +3944,7 @@ send_p2p_message_to_client (struct Neighbour *sender,
                         sender->ats_count,
                         0);
       size = msize + sizeof (struct NotifyTrafficMessage) +
-       (sender->ats_count+1) * sizeof (struct GNUNET_TRANSPORT_ATS_Information);
+       (sender->ats_count) * sizeof (struct GNUNET_TRANSPORT_ATS_Information);
     }
 #if DEBUG_CORE
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
@@ -4094,7 +4263,7 @@ handle_transport_receive (void *cls,
   n = find_neighbour (peer);
   if (n == NULL)
     n = create_neighbour (peer);
-  changed = GNUNET_YES; /* FIXME... */
+  changed = GNUNET_NO;
   up = (n->status == PEER_STATE_KEY_CONFIRMED);
   type = ntohs (message->type);
   size = ntohs (message->size);
@@ -4106,7 +4275,10 @@ handle_transport_receive (void *cls,
           GNUNET_break_op (0);
           return;
         }
-      GNUNET_STATISTICS_update (stats, gettext_noop ("# session keys received"), 1, GNUNET_NO);
+      GNUNET_STATISTICS_update (stats,
+                               gettext_noop ("# session keys received"), 
+                               1, 
+                               GNUNET_NO);
       handle_set_key (n,
                      (const struct SetKeyMessage *) message,
                      ats, ats_count);
@@ -4121,7 +4293,11 @@ handle_transport_receive (void *cls,
       if ((n->status != PEER_STATE_KEY_RECEIVED) &&
           (n->status != PEER_STATE_KEY_CONFIRMED))
         {
-          GNUNET_break_op (0);
+         GNUNET_STATISTICS_update (stats,
+                                   gettext_noop ("# failed to decrypt message (no session key)"), 
+                                   1, 
+                                   GNUNET_NO);
+          send_key (n);
           return;
         }
       handle_encrypted_message (n, 
@@ -4187,7 +4363,10 @@ handle_transport_receive (void *cls,
       changed = GNUNET_YES;
       if (!up)
        {
-         GNUNET_STATISTICS_update (stats, gettext_noop ("# established sessions"), 1, GNUNET_NO);
+         GNUNET_STATISTICS_update (stats, 
+                                   gettext_noop ("# established sessions"), 
+                                   1, 
+                                   GNUNET_NO);
          n->time_established = now;
        }
       if (n->keep_alive_task != GNUNET_SCHEDULER_NO_TASK)
@@ -4222,6 +4401,7 @@ neighbour_quota_update (void *cls,
   unsigned long long distributable;
   uint64_t need_per_peer;
   uint64_t need_per_second;
+  unsigned int neighbour_count;
 
 #if DEBUG_CORE
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
@@ -4233,6 +4413,9 @@ neighbour_quota_update (void *cls,
      divides by a bit more to avoid division by zero AND to
      account for possibility of new neighbours joining any time 
      AND to convert to double... */
+  neighbour_count = GNUNET_CONTAINER_multihashmap_size (neighbours);
+  if (neighbour_count == 0)
+    return;
   if (preference_sum == 0)
     {
       pref_rel = 1.0 / (double) neighbour_count;
@@ -4390,31 +4573,35 @@ handle_transport_notify_disconnect (void *cls,
 
 #if DEBUG_CORE
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Peer `%4s' disconnected from us; received notification from transport.\n", GNUNET_i2s (peer));
+              "Peer `%4s' disconnected from us; received notification from transport.\n", 
+             GNUNET_i2s (peer));
 #endif
-
   n = find_neighbour (peer);
   if (n == NULL)
     {
       GNUNET_break (0);
       return;
     }
-  GNUNET_break (n->is_connected);
+  GNUNET_break (n->is_connected == GNUNET_YES);
   if (n->status == PEER_STATE_KEY_CONFIRMED)
     {
       cnm.header.size = htons (sizeof (struct DisconnectNotifyMessage));
       cnm.header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_DISCONNECT);
+      cnm.reserved = htonl (0);
       cnm.peer = *peer;
       send_to_all_clients (&cnm.header, GNUNET_NO, GNUNET_CORE_OPTION_SEND_DISCONNECT);
+      GNUNET_STATISTICS_update (stats, 
+                               gettext_noop ("# established sessions"), 
+                               -1, 
+                               GNUNET_NO);
     }
 
   /* On transport disconnect transport doesn't cancel requests, so must do so here. */
   if (n->th != NULL)
     {
-      GNUNET_TRANSPORT_notify_transmit_ready_cancel(n->th);
+      GNUNET_TRANSPORT_notify_transmit_ready_cancel (n->th);
+      n->th = NULL;
     }
-  n->th = NULL;
-
   n->is_connected = GNUNET_NO;
   n->status = PEER_STATE_DOWN;
   while (NULL != (car = n->active_client_request_head))
@@ -4444,6 +4631,21 @@ handle_transport_notify_disconnect (void *cls,
 }
 
 
+/**
+ * Wrapper around 'free_neighbour'; helper for 'cleaning_task'.
+ */
+static int
+free_neighbour_helper (void *cls,
+                      const GNUNET_HashCode *key,
+                      void *value)
+{
+  struct Neighbour *n = value;
+
+  free_neighbour (n);
+  return GNUNET_OK;
+}
+
+
 /**
  * Last task run during shutdown.  Disconnects us from
  * the transport.
@@ -4451,24 +4653,21 @@ handle_transport_notify_disconnect (void *cls,
 static void
 cleaning_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
 {
-  struct Neighbour *n;
   struct Client *c;
 
 #if DEBUG_CORE
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
              "Core service shutting down.\n");
 #endif
-  while (NULL != (n = neighbours))
-    {
-      neighbours = n->next;
-      GNUNET_assert (neighbour_count > 0);
-      neighbour_count--;
-      free_neighbour (n);
-    }
+  GNUNET_CONTAINER_multihashmap_iterate (neighbours,
+                                        &free_neighbour_helper,
+                                        NULL);
+  GNUNET_CONTAINER_multihashmap_destroy (neighbours);
+  neighbours = NULL;
+  GNUNET_STATISTICS_set (stats, gettext_noop ("# neighbour entries allocated"), 0, GNUNET_NO);
   GNUNET_assert (transport != NULL);
   GNUNET_TRANSPORT_disconnect (transport);
   transport = NULL;
-  GNUNET_STATISTICS_set (stats, gettext_noop ("# neighbour entries allocated"), neighbour_count, GNUNET_NO);
   GNUNET_SERVER_notification_context_destroy (notifier);
   notifier = NULL;
   while (NULL != (c = clients))
@@ -4502,6 +4701,9 @@ run (void *cls,
     {&handle_client_iterate_peers, NULL,
      GNUNET_MESSAGE_TYPE_CORE_ITERATE_PEERS,
      sizeof (struct GNUNET_MessageHeader)},
+    {&handle_client_have_peer, NULL,
+     GNUNET_MESSAGE_TYPE_CORE_PEER_CONNECTED,
+     sizeof (struct GNUNET_MessageHeader) + sizeof(struct GNUNET_PeerIdentity)},
     {&handle_client_request_info, NULL,
      GNUNET_MESSAGE_TYPE_CORE_REQUEST_INFO,
      sizeof (struct RequestInfoMessage)},
@@ -4517,7 +4719,7 @@ run (void *cls,
   };
   char *keyfile;
 
-  cfg = c;  
+  cfg = c;    
   /* parse configuration */
   if (
        (GNUNET_OK !=
@@ -4560,9 +4762,15 @@ run (void *cls,
       GNUNET_SCHEDULER_shutdown ();
       return;
     }
+  neighbours = GNUNET_CONTAINER_multihashmap_create (128);
   GNUNET_CRYPTO_rsa_key_get_public (my_private_key, &my_public_key);
   GNUNET_CRYPTO_hash (&my_public_key,
                       sizeof (my_public_key), &my_identity.hashPubKey);
+  self.public_key = &my_public_key;
+  self.peer = my_identity;
+  self.last_activity = GNUNET_TIME_UNIT_FOREVER_ABS;
+  self.status = PEER_STATE_KEY_CONFIRMED;
+  self.is_connected = GNUNET_YES;
   /* setup notification */
   notifier = GNUNET_SERVER_notification_context_create (server, 
                                                        MAX_NOTIFY_QUEUE);