new operation queue for limiting overlay connects
[oweals/gnunet.git] / src / transport / gnunet-service-transport_neighbours.c
index 3c055a3b128e999a77e406b99f9b115ba295d721..232c77c2f7a675e81a94c54de125e8c9f87b07bf 100644 (file)
@@ -40,6 +40,7 @@
 #include "transport.h"
 
 
+
 /**
  * Size of the neighbour hash map.
  */
 /**
  * How often do we send KEEPALIVE messages to each of our neighbours and measure
  * the latency with this neighbour?
- * (idle timeout is 5 minutes or 300 seconds, so with 30s interval we
- * send 10 keepalives in each interval, so 10 messages would need to be
+ * (idle timeout is 5 minutes or 300 seconds, so with 100s interval we
+ * send 3 keepalives in each interval, so 3 messages would need to be
  * lost in a row for a disconnect).
  */
-#define KEEPALIVE_FREQUENCY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 30)
+#define KEEPALIVE_FREQUENCY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 100)
 
 /**
  * How long are we willing to wait for a response from ATS before timing out?
@@ -221,10 +222,10 @@ struct MessageQueue
  * Possible state of a neighbour.  Initially, we are S_NOT_CONNECTED.
  *
  * Then, there are two main paths. If we receive a CONNECT message, we
- * first run a check against the blacklist and ask ATS for a
- * suggestion.  (S_CONNECT_RECV_ATS).  If the blacklist comes back
- * positive, we give the address to ATS.  If ATS makes a suggestion,
- * we ALSO give that suggestion to the blacklist
+ * first run a check against the blacklist (S_CONNECT_RECV_BLACKLIST_INBOUND).
+ * If this check is successful, we give the inbound address to ATS.
+ * After the check we ask ATS for a suggestion (S_CONNECT_RECV_ATS).
+ * If ATS makes a suggestion, we ALSO give that suggestion to the blacklist
  * (S_CONNECT_RECV_BLACKLIST).  Once the blacklist approves the
  * address we got from ATS, we send our CONNECT_ACK and go to
  * S_CONNECT_RECV_ACK.  If we receive a SESSION_ACK, we go to
@@ -301,6 +302,11 @@ enum State
    */
   S_CONNECT_SENT,
 
+  /**
+   * Received a CONNECT, do a blacklist check for inbound address
+   */
+  S_CONNECT_RECV_BLACKLIST_INBOUND,
+
   /**
    * Received a CONNECT, asking ATS about address suggestions.
    */
@@ -363,7 +369,13 @@ enum State
   S_DISCONNECT,
 
   /**
-   * We're finished with the disconnect; clean up state now!
+   * We're finished with the disconnect; and are cleaning up the state
+   * now!  We put the struct into this state when we are really in the
+   * task that calls 'free' on it and are about to remove the record
+   * from the map.  We should never find a 'struct NeighbourMapEntry'
+   * in this state in the map.  Accessing a 'struct NeighbourMapEntry'
+   * in this state virtually always means using memory that has been
+   * freed (the exception being the cleanup code in 'free_neighbour').
    */
   S_DISCONNECT_FINISHED
 };
@@ -584,7 +596,7 @@ static void *callback_cls;
 /**
  * Function to call when we connected to a neighbour.
  */
-static GNUNET_TRANSPORT_NotifyConnect connect_notify_cb;
+static NotifyConnect connect_notify_cb;
 
 /**
  * Function to call when we disconnected from a neighbour.
@@ -629,55 +641,40 @@ print_state (int state)
   {
   case S_NOT_CONNECTED:
     return "S_NOT_CONNECTED";
-    break;
   case S_INIT_ATS:
     return "S_INIT_ATS";
-    break;
   case S_INIT_BLACKLIST:
     return "S_INIT_BLACKLIST";
-    break;
   case S_CONNECT_SENT:
     return "S_CONNECT_SENT";
-    break;
+  case S_CONNECT_RECV_BLACKLIST_INBOUND:
+    return "S_CONNECT_RECV_BLACKLIST_INBOUND";
   case S_CONNECT_RECV_ATS:
     return "S_CONNECT_RECV_ATS";
-    break;
   case S_CONNECT_RECV_BLACKLIST:
     return "S_CONNECT_RECV_BLACKLIST";
-    break;
   case S_CONNECT_RECV_ACK:
     return "S_CONNECT_RECV_ACK";
-    break;
   case S_CONNECTED:
     return "S_CONNECTED";
-    break;
   case S_RECONNECT_ATS:
     return "S_RECONNECT_ATS";
-    break;
   case S_RECONNECT_BLACKLIST:
     return "S_RECONNECT_BLACKLIST";
-    break;
   case S_RECONNECT_SENT:
     return "S_RECONNECT_SENT";
-    break;
   case S_CONNECTED_SWITCHING_BLACKLIST:
     return "S_CONNECTED_SWITCHING_BLACKLIST";
-    break;
   case S_CONNECTED_SWITCHING_CONNECT_SENT:
     return "S_CONNECTED_SWITCHING_CONNECT_SENT";
-    break;
   case S_DISCONNECT:
     return "S_DISCONNECT";
-    break;
   case S_DISCONNECT_FINISHED:
     return "S_DISCONNECT_FINISHED";
-    break;
   default:
-    return "UNDEFINED";
     GNUNET_break (0);
-    break;
+    return "UNDEFINED";
   }
-  GNUNET_break (0);
   return "UNDEFINED";
 }
 
@@ -698,6 +695,7 @@ test_connected (struct NeighbourMapEntry *n)
   case S_INIT_ATS:
   case S_INIT_BLACKLIST:
   case S_CONNECT_SENT:
+  case S_CONNECT_RECV_BLACKLIST_INBOUND:
   case S_CONNECT_RECV_ATS:
   case S_CONNECT_RECV_BLACKLIST:
   case S_CONNECT_RECV_ACK:
@@ -758,6 +756,7 @@ free_address (struct NeighbourAddress *na)
     GST_validation_set_address_use (na->address, na->session, GNUNET_NO, __LINE__);
     GNUNET_ATS_address_in_use (GST_ats, na->address, na->session, GNUNET_NO);
   }
+
   na->ats_active = GNUNET_NO;
   if (NULL != na->address)
   {
@@ -832,7 +831,6 @@ set_address (struct NeighbourAddress *na,
   if (GNUNET_YES == is_active)
   {
     /* Telling ATS about new session */
-    GNUNET_ATS_address_update (GST_ats, na->address, na->session, NULL, 0);
     GNUNET_ATS_address_in_use (GST_ats, na->address, na->session, GNUNET_YES);
     GST_validation_set_address_use (na->address, na->session, GNUNET_YES,  __LINE__);
 
@@ -847,12 +845,15 @@ set_address (struct NeighbourAddress *na,
  * Free a neighbour map entry.
  *
  * @param n entry to free
+ * @param keep_sessions GNUNET_NO to tell plugin to terminate sessions,
+ *                      GNUNET_YES to keep all sessions
  */
 static void
-free_neighbour (struct NeighbourMapEntry *n)
+free_neighbour (struct NeighbourMapEntry *n, int keep_sessions)
 {
   struct MessageQueue *mq;
   struct GNUNET_TRANSPORT_PluginFunctions *papi;
+  struct GNUNET_HELLO_Address *backup_primary;
 
   n->is_active = NULL; /* always free'd by its own continuation! */
 
@@ -861,7 +862,7 @@ free_neighbour (struct NeighbourMapEntry *n)
   {
     GNUNET_CONTAINER_DLL_remove (n->messages_head, n->messages_tail, mq);
     if (NULL != mq->cont)
-      mq->cont (mq->cont_cls, GNUNET_SYSERR);
+      mq->cont (mq->cont_cls, GNUNET_SYSERR, mq->message_buf_size, 0);
     GNUNET_free (mq);
   }
   /* It is too late to send other peer disconnect notifications, but at
@@ -875,6 +876,17 @@ free_neighbour (struct NeighbourMapEntry *n)
     disconnect_notify_cb (callback_cls, &n->id);
   }
 
+  n->state = S_DISCONNECT_FINISHED;
+
+  if (NULL != n->primary_address.address)
+    backup_primary = GNUNET_HELLO_address_copy(n->primary_address.address);
+  else
+    backup_primary = NULL;
+
+  /* free addresses and mark as unused */
+  free_address (&n->primary_address);
+  free_address (&n->alternative_address);
+
   /* FIXME-PLUGIN-API: This does not seem to guarantee that all
      transport sessions eventually get killed due to inactivity; they
      MUST have their own timeout logic (but at least TCP doesn't have
@@ -884,20 +896,19 @@ free_neighbour (struct NeighbourMapEntry *n)
      API gives us not even the means to selectively kill only one of
      them! Killing all sessions like this seems to be very, very
      wrong. */
-  if ( (NULL != n->primary_address.address) &&
-       (NULL != (papi = GST_plugins_find (n->primary_address.address->transport_name))) )
+
+  /* cut transport-level connection */
+  if ((GNUNET_NO == keep_sessions) &&
+      (NULL != backup_primary) &&
+      (NULL != (papi = GST_plugins_find (backup_primary->transport_name))))
     papi->disconnect (papi->cls, &n->id);
 
-  n->state = S_DISCONNECT_FINISHED;
+  GNUNET_free_non_null (backup_primary);
 
   GNUNET_assert (GNUNET_YES ==
                  GNUNET_CONTAINER_multihashmap_remove (neighbours,
                                                        &n->id.hashPubKey, n));
 
-  /* cut transport-level connection */
-  free_address (&n->primary_address);
-  free_address (&n->alternative_address);
-
   // FIXME-ATS-API: we might want to be more specific about
   // which states we do this from in the future (ATS should
   // have given us a 'suggest_address' handle, and if we have
@@ -913,7 +924,6 @@ free_neighbour (struct NeighbourMapEntry *n)
   GNUNET_free (n);
 }
 
-
 /**
  * Transmit a message using the current session of the given
  * neighbour.
@@ -937,15 +947,15 @@ send_with_session (struct NeighbourMapEntry *n,
   struct GNUNET_TRANSPORT_PluginFunctions *papi;
 
   GNUNET_assert (n->primary_address.session != NULL);
-  if ( ( (NULL == (papi = GST_plugins_find (n->primary_address.address->transport_name))) ||
+  if ( ((NULL == (papi = GST_plugins_find (n->primary_address.address->transport_name)) ||
         (-1 == papi->send (papi->cls,
                            n->primary_address.session,
                            msgbuf, msgbuf_size,
                            priority,
                            timeout,
-                           cont, cont_cls))) &&
-       (NULL != cont) )
-    cont (cont_cls, &n->id, GNUNET_SYSERR);
+                           cont, cont_cls)))) &&
+       (NULL != cont))
+    cont (cont_cls, &n->id, GNUNET_SYSERR, msgbuf_size, 0);
   GNUNET_break (NULL != papi);
 }
 
@@ -970,10 +980,12 @@ master_task (void *cls,
  * @param cls NULL
  * @param target identity of the neighbour that was disconnected
  * @param result GNUNET_OK if the disconnect got out successfully
+ * @param payload bytes payload
+ * @param physical bytes physical
  */
 static void
 send_disconnect_cont (void *cls, const struct GNUNET_PeerIdentity *target,
-                      int result)
+                      int result, size_t payload, size_t physical)
 {
   struct NeighbourMapEntry *n;
 
@@ -983,7 +995,8 @@ send_disconnect_cont (void *cls, const struct GNUNET_PeerIdentity *target,
   if (S_DISCONNECT != n->state)
     return; /* have created a fresh entry since */
   n->state = S_DISCONNECT;
-  GNUNET_SCHEDULER_cancel (n->task);
+  if (GNUNET_SCHEDULER_NO_TASK != n->task)
+    GNUNET_SCHEDULER_cancel (n->task);
   n->task = GNUNET_SCHEDULER_add_now (&master_task, n);
 }
 
@@ -1047,17 +1060,18 @@ disconnect_neighbour (struct NeighbourMapEntry *n)
   case S_INIT_BLACKLIST:
     /* other peer is completely unaware of us, no need to send DISCONNECT */
     n->state = S_DISCONNECT_FINISHED;
-    free_neighbour (n);
+    free_neighbour (n, GNUNET_NO);
     return;
   case S_CONNECT_SENT:
     send_disconnect (n); 
     n->state = S_DISCONNECT;
-    break;   
+    break;
+  case S_CONNECT_RECV_BLACKLIST_INBOUND:
   case S_CONNECT_RECV_ATS:
   case S_CONNECT_RECV_BLACKLIST:
     /* we never ACK'ed the other peer's request, no need to send DISCONNECT */
     n->state = S_DISCONNECT_FINISHED;
-    free_neighbour (n);
+    free_neighbour (n, GNUNET_NO);
     return;
   case S_CONNECT_RECV_ACK:
     /* we DID ACK the other peer's request, must send DISCONNECT */
@@ -1114,31 +1128,41 @@ disconnect_neighbour (struct NeighbourMapEntry *n)
  * @param cls the 'struct MessageQueue' of the message
  * @param receiver intended receiver
  * @param success whether it worked or not
+ * @param size_payload bytes payload sent
+ * @param physical bytes sent on wire
  */
 static void
 transmit_send_continuation (void *cls,
                             const struct GNUNET_PeerIdentity *receiver,
-                            int success)
+                            int success, size_t size_payload, size_t physical)
 {
   struct MessageQueue *mq = cls;
   struct NeighbourMapEntry *n;
 
-  n = lookup_neighbour (receiver);
-  if (NULL == n)
+  if (NULL == (n = lookup_neighbour (receiver)))
   {
-    GNUNET_break (0);
-    return;
+    GNUNET_free (mq);
+    return; /* disconnect or other error while transmitting, can happen */
   }
-
   if (n->is_active == mq)
   {
     /* this is still "our" neighbour, remove us from its queue
        and allow it to send the next message now */
     n->is_active = NULL;
-    GNUNET_SCHEDULER_cancel (n->task);
+    if (GNUNET_SCHEDULER_NO_TASK != n->task)
+      GNUNET_SCHEDULER_cancel (n->task);
     n->task = GNUNET_SCHEDULER_add_now (&master_task, n);    
   }
-  GNUNET_assert (bytes_in_send_queue >= mq->message_buf_size);
+  if (bytes_in_send_queue < mq->message_buf_size)
+  {
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                  "bytes_in_send_queue `%u' mq->message_buf_size %u\n",
+                  bytes_in_send_queue, mq->message_buf_size );
+      GNUNET_break (0);
+  }
+
+
+  GNUNET_break (size_payload == mq->message_buf_size);
   bytes_in_send_queue -= mq->message_buf_size;
   GNUNET_STATISTICS_set (GST_stats,
                         gettext_noop
@@ -1160,7 +1184,7 @@ transmit_send_continuation (void *cls,
               ntohs (((struct GNUNET_MessageHeader *) mq->message_buf)->type),
               (success == GNUNET_OK) ? "success" : "FAILURE");
   if (NULL != mq->cont)
-    mq->cont (mq->cont_cls, success);
+    mq->cont (mq->cont_cls, success, size_payload, physical);
   GNUNET_free (mq);
 }
 
@@ -1213,7 +1237,7 @@ try_transmission_to_peer (struct NeighbourMapEntry *n)
                              1, GNUNET_NO);
     GNUNET_CONTAINER_DLL_remove (n->messages_head, n->messages_tail, mq);
     n->is_active = mq;
-    transmit_send_continuation (mq, &n->id, GNUNET_SYSERR);     /* timeout */
+    transmit_send_continuation (mq, &n->id, GNUNET_SYSERR, mq->message_buf_size, 0);     /* timeout */
   }
   if (NULL == mq)
     return;                     /* no more messages */
@@ -1228,8 +1252,9 @@ try_transmission_to_peer (struct NeighbourMapEntry *n)
 
 /**
  * Send keepalive message to the neighbour.  Must only be called
- * if we are on 'connected' state.  Will internally determine
- * if a keepalive is truly needed (so can always be called).
+ * if we are on 'connected' state or while trying to switch addresses.
+ * Will internally determine if a keepalive is truly needed (so can
+ * always be called).
  *
  * @param n neighbour that went idle and needs a keepalive
  */
@@ -1238,7 +1263,9 @@ send_keepalive (struct NeighbourMapEntry *n)
 {
   struct GNUNET_MessageHeader m;
 
-  GNUNET_assert (S_CONNECTED == n->state);
+  GNUNET_assert ((S_CONNECTED == n->state) ||
+                 (S_CONNECTED_SWITCHING_BLACKLIST == n->state) ||
+                 (S_CONNECTED_SWITCHING_CONNECT_SENT));
   if (GNUNET_TIME_absolute_get_remaining (n->keep_alive_time).rel_value > 0)
     return; /* no keepalive needed at this time */
   m.size = htons (sizeof (struct GNUNET_MessageHeader));
@@ -1463,14 +1490,14 @@ GST_neighbours_send (const struct GNUNET_PeerIdentity *target, const void *msg,
   {
     GNUNET_break (0);
     if (NULL != cont)
-      cont (cont_cls, GNUNET_SYSERR);
+      cont (cont_cls, GNUNET_SYSERR, msg_size, 0);
     return;
   }
   if (GNUNET_YES != test_connected (n))
   {
     GNUNET_break (0);
     if (NULL != cont)
-      cont (cont_cls, GNUNET_SYSERR);
+      cont (cont_cls, GNUNET_SYSERR, msg_size, 0);
     return;
   }
   bytes_in_send_queue += msg_size;
@@ -1489,7 +1516,8 @@ GST_neighbours_send (const struct GNUNET_PeerIdentity *target, const void *msg,
   if ( (NULL != n->is_active) ||
        ( (NULL == n->primary_address.session) && (NULL == n->primary_address.address)) )
     return;
-  GNUNET_SCHEDULER_cancel (n->task);
+  if (GNUNET_SCHEDULER_NO_TASK != n->task)
+    GNUNET_SCHEDULER_cancel (n->task);
   n->task = GNUNET_SCHEDULER_add_now (&master_task, n);
 }
 
@@ -1504,7 +1532,7 @@ send_session_connect (struct NeighbourAddress *na)
 {
   struct GNUNET_TRANSPORT_PluginFunctions *papi;
   struct SessionConnectMessage connect_msg;
-  
+
   if (NULL == (papi = GST_plugins_find (na->address->transport_name)))  
   {
     GNUNET_break (0);
@@ -1528,6 +1556,7 @@ send_session_connect (struct NeighbourAddress *na)
                     UINT_MAX,
                     GNUNET_TIME_UNIT_FOREVER_REL,
                     NULL, NULL);
+
 }
 
 
@@ -1545,7 +1574,7 @@ send_session_connect_ack_message (const struct GNUNET_HELLO_Address *address,
 {
   struct GNUNET_TRANSPORT_PluginFunctions *papi;
   struct SessionConnectMessage connect_msg;
-  
+
   if (NULL == (papi = GST_plugins_find (address->transport_name)))  
   {
     GNUNET_break (0);
@@ -1568,6 +1597,7 @@ send_session_connect_ack_message (const struct GNUNET_HELLO_Address *address,
                     UINT_MAX,
                     GNUNET_TIME_UNIT_FOREVER_REL,
                     NULL, NULL);
+
 }
 
 
@@ -1659,11 +1689,12 @@ GST_neighbours_try_connect (const struct GNUNET_PeerIdentity *target)
     case S_NOT_CONNECTED:
       /* this should not be possible */
       GNUNET_break (0);
-      free_neighbour (n);
+      free_neighbour (n, GNUNET_NO);
       break;
     case S_INIT_ATS:
     case S_INIT_BLACKLIST:
     case S_CONNECT_SENT:
+    case S_CONNECT_RECV_BLACKLIST_INBOUND:
     case S_CONNECT_RECV_ATS:
     case S_CONNECT_RECV_BLACKLIST:
     case S_CONNECT_RECV_ACK:
@@ -1683,7 +1714,7 @@ GST_neighbours_try_connect (const struct GNUNET_PeerIdentity *target)
       return; /* already connected */
     case S_DISCONNECT:
       /* get rid of remains, ready to re-try immediately */
-      free_neighbour (n);
+      free_neighbour (n, GNUNET_NO);
       break;
     case S_DISCONNECT_FINISHED:
       /* should not be possible */      
@@ -1691,13 +1722,15 @@ GST_neighbours_try_connect (const struct GNUNET_PeerIdentity *target)
     default:
       GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Unhandled state `%s' \n",print_state (n->state));
       GNUNET_break (0);
-      free_neighbour (n);
+      free_neighbour (n, GNUNET_NO);
       break;
     }
   }
   n = setup_neighbour (target);  
   n->state = S_INIT_ATS; 
   n->timeout = GNUNET_TIME_relative_to_absolute (ATS_RESPONSE_TIMEOUT);
+
+  GNUNET_ATS_reset_backoff (GST_ats, target);
   GNUNET_ATS_suggest_address (GST_ats, target);
 }
 
@@ -1722,14 +1755,6 @@ handle_test_blacklist_cont (void *cls,
               "Connection to new address of peer `%s' based on blacklist is `%s'\n",
               GNUNET_i2s (peer),
               (GNUNET_OK == result) ? "allowed" : "FORBIDDEN");
-  if (GNUNET_OK == result)
-  {
-    /* valid new address, let ATS know! */
-    GNUNET_ATS_address_update (GST_ats, 
-                              bcc->na.address, 
-                              bcc->na.session, 
-                              bcc->ats, bcc->ats_count);
-  }
   if (NULL == (n = lookup_neighbour (peer)))
     goto cleanup; /* nobody left to care about new address */
   switch (n->state)
@@ -1737,7 +1762,7 @@ handle_test_blacklist_cont (void *cls,
   case S_NOT_CONNECTED:
     /* this should not be possible */
     GNUNET_break (0);
-    free_neighbour (n);
+    free_neighbour (n, GNUNET_NO);
     break;
   case S_INIT_ATS:
     /* still waiting on ATS suggestion */
@@ -1763,9 +1788,11 @@ handle_test_blacklist_cont (void *cls,
     }
     else
     {
+      // FIXME: should also possibly destroy session with plugin!?
       GNUNET_ATS_address_destroyed (GST_ats,
                                    bcc->na.address,
                                    NULL);
+      free_address (&n->primary_address);
       n->state = S_INIT_ATS;
       n->timeout = GNUNET_TIME_relative_to_absolute (ATS_RESPONSE_TIMEOUT);
       // FIXME: do we need to ask ATS again for suggestions?
@@ -1783,9 +1810,23 @@ handle_test_blacklist_cont (void *cls,
                                        n->connect_ack_timestamp);
     }
     break; 
+  case S_CONNECT_RECV_BLACKLIST_INBOUND:
+    if (GNUNET_OK == result)
+    {
+      /* valid new address, let ATS know! */
+      GNUNET_ATS_address_add (GST_ats,
+                              bcc->na.address,
+                              bcc->na.session,
+                              bcc->ats, bcc->ats_count);
+    }
+    n->state = S_CONNECT_RECV_ATS;
+    n->timeout = GNUNET_TIME_relative_to_absolute (ATS_RESPONSE_TIMEOUT);
+    GNUNET_ATS_reset_backoff (GST_ats, peer);
+    GNUNET_ATS_suggest_address (GST_ats, peer);
+    break;
   case S_CONNECT_RECV_ATS:
     /* still waiting on ATS suggestion, don't care about blacklist */
-    break; 
+    break;
   case S_CONNECT_RECV_BLACKLIST:
     if (GNUNET_YES != address_matches (&bcc->na, &n->primary_address))
       break; /* result for an address we currently don't care about */
@@ -1801,12 +1842,15 @@ handle_test_blacklist_cont (void *cls,
     }
     else
     {
+      // FIXME: should also possibly destroy session with plugin!?
       GNUNET_ATS_address_destroyed (GST_ats,
                                    bcc->na.address,
                                    NULL);
+      free_address (&n->primary_address);
       n->state = S_INIT_ATS;
       n->timeout = GNUNET_TIME_relative_to_absolute (ATS_RESPONSE_TIMEOUT);
       // FIXME: do we need to ask ATS again for suggestions?
+      GNUNET_ATS_reset_backoff (GST_ats, peer);
       GNUNET_ATS_suggest_address (GST_ats, &n->id);
     }
     break;
@@ -1904,7 +1948,7 @@ handle_test_blacklist_cont (void *cls,
   default:
     GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Unhandled state `%s' \n",print_state (n->state));
     GNUNET_break (0);
-    free_neighbour (n);
+    free_neighbour (n, GNUNET_NO);
     break;
   }
  cleanup:
@@ -1987,6 +2031,7 @@ GST_neighbours_handle_connect (const struct GNUNET_MessageHeader *message,
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "Received CONNECT message from peer `%s'\n", 
              GNUNET_i2s (peer));
+
   if (ntohs (message->size) != sizeof (struct SessionConnectMessage))
   {
     GNUNET_break_op (0);
@@ -2002,17 +2047,21 @@ GST_neighbours_handle_connect (const struct GNUNET_MessageHeader *message,
     n = setup_neighbour (peer);
   n->send_connect_ack = 1;
   n->connect_ack_timestamp = ts;
+
   switch (n->state)
   {  
   case S_NOT_CONNECTED:
-    n->state = S_CONNECT_RECV_ATS;
-    n->timeout = GNUNET_TIME_relative_to_absolute (ATS_RESPONSE_TIMEOUT);
-    GNUNET_ATS_suggest_address (GST_ats, peer);
+    n->state = S_CONNECT_RECV_BLACKLIST_INBOUND;
+    /* Do a blacklist check for the new address */
     check_blacklist (peer, ts, address, session, ats, ats_count);
     break;
   case S_INIT_ATS:
+    /* CONNECT message takes priority over us asking ATS for address */
+    n->state = S_CONNECT_RECV_BLACKLIST_INBOUND;
+    /* fallthrough */
   case S_INIT_BLACKLIST:
   case S_CONNECT_SENT:
+  case S_CONNECT_RECV_BLACKLIST_INBOUND:
   case S_CONNECT_RECV_ATS:
   case S_CONNECT_RECV_BLACKLIST:
   case S_CONNECT_RECV_ACK:
@@ -2051,12 +2100,12 @@ GST_neighbours_handle_connect (const struct GNUNET_MessageHeader *message,
     check_blacklist (peer, ts, address, session, ats, ats_count);
     break;
   case S_DISCONNECT:
-    /* get rid of remains, ready to re-try */
-    free_neighbour (n);
+    /* get rid of remains without terminating sessions, ready to re-try */
+    free_neighbour (n, GNUNET_YES);
     n = setup_neighbour (peer);
     n->state = S_CONNECT_RECV_ATS;
+    GNUNET_ATS_reset_backoff (GST_ats, peer);
     GNUNET_ATS_suggest_address (GST_ats, peer);
-    check_blacklist (peer, ts, address, session, ats, ats_count);
     break;
   case S_DISCONNECT_FINISHED:
     /* should not be possible */
@@ -2065,7 +2114,7 @@ GST_neighbours_handle_connect (const struct GNUNET_MessageHeader *message,
   default:
     GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Unhandled state `%s' \n",print_state (n->state));
     GNUNET_break (0);
-    free_neighbour (n);
+    free_neighbour (n, GNUNET_NO);
     break;
   }
 }
@@ -2113,11 +2162,27 @@ GST_neighbours_switch_to_address (const struct GNUNET_PeerIdentity *peer,
   {
     GNUNET_break (0);
     if (strlen (address->transport_name) > 0)
-      GNUNET_ATS_address_destroyed (GST_ats, address, session);
+      GNUNET_ATS_address_destroyed (GST_ats, address, NULL);
     return;
   }
+
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "ATS tells us to switch to address '%s' session %p for "
+              "peer `%s' in state %s (quota in/out %u %u )\n",
+              (address->address_length != 0) ? GST_plugins_a2s (address): "<inbound>",
+              session,
+              GNUNET_i2s (peer),
+              print_state (n->state),
+              ntohl (bandwidth_in.value__),
+              ntohl (bandwidth_out.value__));
+
   if (NULL == session)
+  {
     session = papi->get_session (papi->cls, address);
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "Obtained new session for peer `%s' and  address '%s': %p\n",
+                GNUNET_i2s (&address->peer), GST_plugins_a2s (address), session);
+  }
   if (NULL == session)
   {
     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
@@ -2126,15 +2191,11 @@ GST_neighbours_switch_to_address (const struct GNUNET_PeerIdentity *peer,
     GNUNET_ATS_address_destroyed (GST_ats, address, NULL);
     return;
   }
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "ATS tells us to switch to address '%s' for peer `%s'\n",
-              (address->address_length != 0) ? GST_plugins_a2s (address): "<inbound>",
-              GNUNET_i2s (peer));
   switch (n->state)
   {
   case S_NOT_CONNECTED:
     GNUNET_break (0);
-    free_neighbour (n);
+    free_neighbour (n, GNUNET_NO);
     return;
   case S_INIT_ATS:
     set_address (&n->primary_address,
@@ -2173,7 +2234,14 @@ GST_neighbours_switch_to_address (const struct GNUNET_PeerIdentity *peer,
                     n->connect_ack_timestamp,
                     address, session, ats, ats_count);    
     break;
+  case S_CONNECT_RECV_BLACKLIST_INBOUND:
+    n->timeout = GNUNET_TIME_relative_to_absolute (BLACKLIST_RESPONSE_TIMEOUT);
+    check_blacklist (&n->id,
+                     n->connect_ack_timestamp,
+                     address, session, ats, ats_count);
+    break;
   case S_CONNECT_RECV_BLACKLIST:
+  case S_CONNECT_RECV_ACK:
     /* ATS asks us to switch while we were trying to connect; switch to new
        address and check blacklist again */
     set_address (&n->primary_address,
@@ -2196,7 +2264,7 @@ GST_neighbours_switch_to_address (const struct GNUNET_PeerIdentity *peer,
     /* ATS asks us to switch a life connection; see if we can get
        a CONNECT_ACK on it before we actually do this! */
     set_address (&n->alternative_address,
-                address, session, bandwidth_in, bandwidth_out, GNUNET_YES);
+                address, session, bandwidth_in, bandwidth_out, GNUNET_NO);
     n->state = S_CONNECTED_SWITCHING_BLACKLIST;
     check_blacklist (&n->id,
                     GNUNET_TIME_absolute_get (),
@@ -2242,7 +2310,7 @@ GST_neighbours_switch_to_address (const struct GNUNET_PeerIdentity *peer,
     }
     /* ATS asks us to switch a life connection, update blacklist check */
     set_address (&n->alternative_address,
-                address, session, bandwidth_in, bandwidth_out, GNUNET_YES);
+                address, session, bandwidth_in, bandwidth_out, GNUNET_NO);
     check_blacklist (&n->id,
                     GNUNET_TIME_absolute_get (),
                     address, session, ats, ats_count);
@@ -2257,7 +2325,7 @@ GST_neighbours_switch_to_address (const struct GNUNET_PeerIdentity *peer,
     }
     /* ATS asks us to switch a life connection, update blacklist check */
     set_address (&n->alternative_address,
-                address, session, bandwidth_in, bandwidth_out, GNUNET_YES);
+                address, session, bandwidth_in, bandwidth_out, GNUNET_NO);
     n->state = S_CONNECTED_SWITCHING_BLACKLIST;
     check_blacklist (&n->id,
                     GNUNET_TIME_absolute_get (),
@@ -2294,13 +2362,18 @@ master_task (void *cls,
 
   n->task = GNUNET_SCHEDULER_NO_TASK;
   delay = GNUNET_TIME_absolute_get_remaining (n->timeout);  
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "Master task runs for neighbour `%s' in state %s with timeout in %llu ms\n",
+             GNUNET_i2s (&n->id),
+             print_state(n->state),
+             (unsigned long long) delay.rel_value);
   switch (n->state)
   {
   case S_NOT_CONNECTED:
     /* invalid state for master task, clean up */
     GNUNET_break (0);
     n->state = S_DISCONNECT_FINISHED;
-    free_neighbour (n);
+    free_neighbour (n, GNUNET_NO);
     return;
   case S_INIT_ATS:
     if (0 == delay.rel_value)
@@ -2309,7 +2382,7 @@ master_task (void *cls,
                  "Connection to `%s' timed out waiting for ATS to provide address\n",
                  GNUNET_i2s (&n->id));
       n->state = S_DISCONNECT_FINISHED;
-      free_neighbour (n);
+      free_neighbour (n, GNUNET_NO);
       return;
     }
     break;
@@ -2320,7 +2393,7 @@ master_task (void *cls,
                  "Connection to `%s' timed out waiting for BLACKLIST to approve address\n",
                  GNUNET_i2s (&n->id));
       n->state = S_DISCONNECT_FINISHED;
-      free_neighbour (n);
+      free_neighbour (n, GNUNET_NO);
       return;
     }
     break;
@@ -2334,6 +2407,17 @@ master_task (void *cls,
       return;
     }
     break;
+  case S_CONNECT_RECV_BLACKLIST_INBOUND:
+    if (0 == delay.rel_value)
+    {
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                  "Connection to `%s' timed out waiting BLACKLIST to approve address to use for received CONNECT\n",
+                  GNUNET_i2s (&n->id));
+      n->state = S_DISCONNECT_FINISHED;
+      free_neighbour (n, GNUNET_NO);
+      return;
+    }
+    break;
   case S_CONNECT_RECV_ATS:
     if (0 == delay.rel_value)
     {
@@ -2341,7 +2425,7 @@ master_task (void *cls,
                  "Connection to `%s' timed out waiting ATS to provide address to use for CONNECT_ACK\n",
                  GNUNET_i2s (&n->id));
       n->state = S_DISCONNECT_FINISHED;
-      free_neighbour (n);
+      free_neighbour (n, GNUNET_NO);
       return;
     }
     break;
@@ -2352,7 +2436,7 @@ master_task (void *cls,
                  "Connection to `%s' timed out waiting BLACKLIST to approve address to use for CONNECT_ACK\n",
                  GNUNET_i2s (&n->id));
       n->state = S_DISCONNECT_FINISHED;
-      free_neighbour (n);
+      free_neighbour (n, GNUNET_NO);
       return;
     }
     break;
@@ -2436,8 +2520,7 @@ master_task (void *cls,
     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
                "Cleaning up connection to `%s' after sending DISCONNECT\n",
                GNUNET_i2s (&n->id));
-    n->state = S_DISCONNECT_FINISHED;
-    free_neighbour (n);
+    free_neighbour (n, GNUNET_NO);
     return;
   case S_DISCONNECT_FINISHED:
     /* how did we get here!? */
@@ -2448,12 +2531,20 @@ master_task (void *cls,
     GNUNET_break (0);
     break;  
   }
-  delay = GNUNET_TIME_relative_min (GNUNET_TIME_absolute_get_remaining (n->keep_alive_time),
-                                   delay);
-  GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == n->task);
-  n->task = GNUNET_SCHEDULER_add_delayed (delay,
-                                         &master_task,
-                                         n);
+  if ( (S_CONNECTED_SWITCHING_CONNECT_SENT == n->state) ||
+       (S_CONNECTED_SWITCHING_BLACKLIST == n->state) ||
+       (S_CONNECTED == n->state) )    
+  {
+    /* if we are *now* in one of these three states, we're sending
+       keep alive messages, so we need to consider the keepalive
+       delay, not just the connection timeout */
+    delay = GNUNET_TIME_relative_min (GNUNET_TIME_absolute_get_remaining (n->keep_alive_time),
+                                     delay);
+  }
+  if (GNUNET_SCHEDULER_NO_TASK == n->task)
+    n->task = GNUNET_SCHEDULER_add_delayed (delay,
+                                           &master_task,
+                                           n);
 }
 
 
@@ -2504,6 +2595,7 @@ GST_neighbours_handle_connect_ack (const struct GNUNET_MessageHeader *message,
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "Received CONNECT_ACK message from peer `%s'\n",
               GNUNET_i2s (peer));
+
   if (ntohs (message->size) != sizeof (struct SessionConnectMessage))
   {
     GNUNET_break_op (0);
@@ -2524,7 +2616,7 @@ GST_neighbours_handle_connect_ack (const struct GNUNET_MessageHeader *message,
   {
   case S_NOT_CONNECTED:
     GNUNET_break (0);
-    free_neighbour (n);
+    free_neighbour (n, GNUNET_NO);
     return;
   case S_INIT_ATS:
   case S_INIT_BLACKLIST:
@@ -2542,7 +2634,14 @@ GST_neighbours_handle_connect_ack (const struct GNUNET_MessageHeader *message,
                           gettext_noop ("# peers connected"), 
                           ++neighbours_connected,
                           GNUNET_NO);
-    connect_notify_cb (callback_cls, &n->id, ats, ats_count);
+    connect_notify_cb (callback_cls, &n->id, ats, ats_count,
+                       n->primary_address.bandwidth_in,
+                       n->primary_address.bandwidth_out);
+    /* Tell ATS that the outbound session we created to send CONNECT was successfull */
+    GNUNET_ATS_address_add (GST_ats,
+                            n->primary_address.address,
+                            n->primary_address.session,
+                            ats, ats_count);
     set_address (&n->primary_address,
                 n->primary_address.address,
                 n->primary_address.session,
@@ -2551,6 +2650,7 @@ GST_neighbours_handle_connect_ack (const struct GNUNET_MessageHeader *message,
                 GNUNET_YES);
     send_session_ack_message (n);
     break;
+  case S_CONNECT_RECV_BLACKLIST_INBOUND:
   case S_CONNECT_RECV_ATS:
   case S_CONNECT_RECV_BLACKLIST:
   case S_CONNECT_RECV_ACK:
@@ -2585,7 +2685,11 @@ GST_neighbours_handle_connect_ack (const struct GNUNET_MessageHeader *message,
     /* new address worked; adopt it and go back to connected! */
     n->state = S_CONNECTED;
     n->timeout = GNUNET_TIME_relative_to_absolute (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
-    GNUNET_assert (GNUNET_NO == n->alternative_address.ats_active);
+    GNUNET_break (GNUNET_NO == n->alternative_address.ats_active);
+    GNUNET_ATS_address_add(GST_ats,
+                           n->alternative_address.address,
+                           n->alternative_address.session,
+                           ats, ats_count);
     set_address (&n->primary_address,
                 n->alternative_address.address,
                 n->alternative_address.session,
@@ -2618,8 +2722,10 @@ GST_neighbours_handle_connect_ack (const struct GNUNET_MessageHeader *message,
  *
  * @param peer identity of the peer where the session died
  * @param session session that is gone
+ * @return GNUNET_YES if this was a session used, GNUNET_NO if
+ *        this session was not in use
  */
-void
+int
 GST_neighbours_session_terminated (const struct GNUNET_PeerIdentity *peer,
                                    struct Session *session)
 {
@@ -2643,7 +2749,7 @@ GST_neighbours_session_terminated (const struct GNUNET_PeerIdentity *peer,
     }
   }
   if (NULL == (n = lookup_neighbour (peer)))
-    return; /* can't affect us */
+    return GNUNET_NO; /* can't affect us */
   if (session != n->primary_address.session)
   {
     if (session == n->alternative_address.session)
@@ -2655,21 +2761,20 @@ GST_neighbours_session_terminated (const struct GNUNET_PeerIdentity *peer,
       else
        GNUNET_break (0);
     }
-    return; /* doesn't affect us further */
+    return GNUNET_NO; /* doesn't affect us further */
   }
 
   n->expect_latency_response = GNUNET_NO;
-
   switch (n->state)
   {
   case S_NOT_CONNECTED:
     GNUNET_break (0);
-    free_neighbour (n);
-    return;
+    free_neighbour (n, GNUNET_NO);
+    return GNUNET_YES;
   case S_INIT_ATS:
     GNUNET_break (0);
-    free_neighbour (n);
-    return;
+    free_neighbour (n, GNUNET_NO);
+    return GNUNET_YES;
   case S_INIT_BLACKLIST:
   case S_CONNECT_SENT:
     free_address (&n->primary_address);
@@ -2678,13 +2783,14 @@ GST_neighbours_session_terminated (const struct GNUNET_PeerIdentity *peer,
     // FIXME: need to ask ATS for suggestions again?
     GNUNET_ATS_suggest_address (GST_ats, &n->id);
     break;
+  case S_CONNECT_RECV_BLACKLIST_INBOUND:
   case S_CONNECT_RECV_ATS:    
   case S_CONNECT_RECV_BLACKLIST:
   case S_CONNECT_RECV_ACK:
     /* error on inbound session; free neighbour entirely */
     free_address (&n->primary_address);
-    free_neighbour (n);
-    return;
+    free_neighbour (n, GNUNET_NO);
+    return GNUNET_YES;
   case S_CONNECTED:
     free_address (&n->primary_address);
     n->state = S_RECONNECT_ATS;
@@ -2726,6 +2832,7 @@ GST_neighbours_session_terminated (const struct GNUNET_PeerIdentity *peer,
     break;
   case S_DISCONNECT_FINISHED:
     /* neighbour was freed and plugins told to terminate session */
+    return GNUNET_NO;
     break;
   default:
     GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Unhandled state `%s' \n",print_state (n->state));
@@ -2735,6 +2842,7 @@ GST_neighbours_session_terminated (const struct GNUNET_PeerIdentity *peer,
   if (GNUNET_SCHEDULER_NO_TASK != n->task)
     GNUNET_SCHEDULER_cancel (n->task);
   n->task = GNUNET_SCHEDULER_add_now (&master_task, n);
+  return GNUNET_YES;
 }
 
 
@@ -2788,7 +2896,13 @@ GST_neighbours_handle_session_ack (const struct GNUNET_MessageHeader *message,
                         gettext_noop ("# peers connected"), 
                         ++neighbours_connected,
                         GNUNET_NO);
-  connect_notify_cb (callback_cls, &n->id, ats, ats_count);
+  connect_notify_cb (callback_cls, &n->id, ats, ats_count,
+                     n->primary_address.bandwidth_in,
+                     n->primary_address.bandwidth_out);
+  GNUNET_ATS_address_add(GST_ats,
+                         n->primary_address.address,
+                         n->primary_address.session,
+                         ats, ats_count);
   set_address (&n->primary_address,
               n->primary_address.address,
               n->primary_address.session,
@@ -2862,7 +2976,7 @@ GST_neighbours_handle_disconnect_message (const struct GNUNET_PeerIdentity
 {
   struct NeighbourMapEntry *n;
   const struct SessionDisconnectMessage *sdm;
-  GNUNET_HashCode hc;
+  struct GNUNET_HashCode hc;
 
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "Received DISCONNECT message from peer `%s'\n",
@@ -2946,13 +3060,31 @@ struct IteratorContext
  * @return GNUNET_OK (continue to iterate)
  */
 static int
-neighbours_iterate (void *cls, const GNUNET_HashCode * key, void *value)
+neighbours_iterate (void *cls, const struct GNUNET_HashCode * key, void *value)
 {
   struct IteratorContext *ic = cls;
   struct NeighbourMapEntry *n = value;
 
   if (GNUNET_YES == test_connected (n))
-    ic->cb (ic->cb_cls, &n->id, NULL, 0, n->primary_address.address);
+  {
+    struct GNUNET_BANDWIDTH_Value32NBO bandwidth_in;
+    struct GNUNET_BANDWIDTH_Value32NBO bandwidth_out;
+
+    if (NULL != n->primary_address.address)
+    {
+      bandwidth_in = n->primary_address.bandwidth_in;
+      bandwidth_out = n->primary_address.bandwidth_out;
+    }
+    else
+    {
+      bandwidth_in = GNUNET_CONSTANTS_DEFAULT_BW_IN_OUT;
+      bandwidth_out = GNUNET_CONSTANTS_DEFAULT_BW_IN_OUT;
+    }
+
+    ic->cb (ic->cb_cls, &n->id, NULL, 0,
+            n->primary_address.address,
+            bandwidth_in, bandwidth_out);
+  }
   return GNUNET_OK;
 }
 
@@ -3015,14 +3147,20 @@ GST_neighbour_get_latency (const struct GNUNET_PeerIdentity *peer)
   switch (n->state)
   {
   case S_CONNECTED:
+  case S_CONNECTED_SWITCHING_CONNECT_SENT:
+  case S_CONNECTED_SWITCHING_BLACKLIST:
   case S_RECONNECT_SENT:
   case S_RECONNECT_ATS:
+  case S_RECONNECT_BLACKLIST:
     return n->latency;
   case S_NOT_CONNECTED:
   case S_INIT_BLACKLIST:
   case S_INIT_ATS:
-  case S_CONNECT_SENT:
+  case S_CONNECT_RECV_BLACKLIST_INBOUND:
+  case S_CONNECT_RECV_ATS:
   case S_CONNECT_RECV_BLACKLIST:
+  case S_CONNECT_RECV_ACK:
+  case S_CONNECT_SENT:
   case S_DISCONNECT:
   case S_DISCONNECT_FINISHED:
     return GNUNET_TIME_UNIT_FOREVER_REL;
@@ -3064,7 +3202,7 @@ GST_neighbour_get_current_address (const struct GNUNET_PeerIdentity *peer)
  */
 void
 GST_neighbours_start (void *cls,
-                      GNUNET_TRANSPORT_NotifyConnect connect_cb,
+    NotifyConnect connect_cb,
                       GNUNET_TRANSPORT_NotifyDisconnect disconnect_cb,
                       GNUNET_TRANSPORT_PeerIterateCallback peer_address_cb)
 {
@@ -3072,7 +3210,7 @@ GST_neighbours_start (void *cls,
   connect_notify_cb = connect_cb;
   disconnect_notify_cb = disconnect_cb;
   address_change_cb = peer_address_cb;
-  neighbours = GNUNET_CONTAINER_multihashmap_create (NEIGHBOUR_TABLE_SIZE);
+  neighbours = GNUNET_CONTAINER_multihashmap_create (NEIGHBOUR_TABLE_SIZE, GNUNET_NO);
 }
 
 
@@ -3085,14 +3223,15 @@ GST_neighbours_start (void *cls,
  * @return GNUNET_OK (continue to iterate)
  */
 static int
-disconnect_all_neighbours (void *cls, const GNUNET_HashCode * key, void *value)
+disconnect_all_neighbours (void *cls, const struct GNUNET_HashCode * key, void *value)
 {
   struct NeighbourMapEntry *n = value;
 
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 
              "Disconnecting peer `%4s', %s\n",
               GNUNET_i2s (&n->id), "SHUTDOWN_TASK");
-  free_neighbour (n);
+  n->state = S_DISCONNECT_FINISHED;
+  free_neighbour (n, GNUNET_NO);
   return GNUNET_OK;
 }