new operation queue for limiting overlay connects
[oweals/gnunet.git] / src / transport / gnunet-service-transport_neighbours.c
index 5ef6f0455a56884080a58079e0e4d44c3dc7f2df..232c77c2f7a675e81a94c54de125e8c9f87b07bf 100644 (file)
@@ -40,6 +40,7 @@
 #include "transport.h"
 
 
+
 /**
  * Size of the neighbour hash map.
  */
@@ -595,7 +596,7 @@ static void *callback_cls;
 /**
  * Function to call when we connected to a neighbour.
  */
-static GNUNET_TRANSPORT_NotifyConnect connect_notify_cb;
+static NotifyConnect connect_notify_cb;
 
 /**
  * Function to call when we disconnected from a neighbour.
@@ -640,58 +641,40 @@ print_state (int state)
   {
   case S_NOT_CONNECTED:
     return "S_NOT_CONNECTED";
-    break;
   case S_INIT_ATS:
     return "S_INIT_ATS";
-    break;
   case S_INIT_BLACKLIST:
     return "S_INIT_BLACKLIST";
-    break;
   case S_CONNECT_SENT:
     return "S_CONNECT_SENT";
-    break;
   case S_CONNECT_RECV_BLACKLIST_INBOUND:
     return "S_CONNECT_RECV_BLACKLIST_INBOUND";
-    break;
   case S_CONNECT_RECV_ATS:
     return "S_CONNECT_RECV_ATS";
-    break;
   case S_CONNECT_RECV_BLACKLIST:
     return "S_CONNECT_RECV_BLACKLIST";
-    break;
   case S_CONNECT_RECV_ACK:
     return "S_CONNECT_RECV_ACK";
-    break;
   case S_CONNECTED:
     return "S_CONNECTED";
-    break;
   case S_RECONNECT_ATS:
     return "S_RECONNECT_ATS";
-    break;
   case S_RECONNECT_BLACKLIST:
     return "S_RECONNECT_BLACKLIST";
-    break;
   case S_RECONNECT_SENT:
     return "S_RECONNECT_SENT";
-    break;
   case S_CONNECTED_SWITCHING_BLACKLIST:
     return "S_CONNECTED_SWITCHING_BLACKLIST";
-    break;
   case S_CONNECTED_SWITCHING_CONNECT_SENT:
     return "S_CONNECTED_SWITCHING_CONNECT_SENT";
-    break;
   case S_DISCONNECT:
     return "S_DISCONNECT";
-    break;
   case S_DISCONNECT_FINISHED:
     return "S_DISCONNECT_FINISHED";
-    break;
   default:
-    return "UNDEFINED";
     GNUNET_break (0);
-    break;
+    return "UNDEFINED";
   }
-  GNUNET_break (0);
   return "UNDEFINED";
 }
 
@@ -879,7 +862,7 @@ free_neighbour (struct NeighbourMapEntry *n, int keep_sessions)
   {
     GNUNET_CONTAINER_DLL_remove (n->messages_head, n->messages_tail, mq);
     if (NULL != mq->cont)
-      mq->cont (mq->cont_cls, GNUNET_SYSERR);
+      mq->cont (mq->cont_cls, GNUNET_SYSERR, mq->message_buf_size, 0);
     GNUNET_free (mq);
   }
   /* It is too late to send other peer disconnect notifications, but at
@@ -941,7 +924,6 @@ free_neighbour (struct NeighbourMapEntry *n, int keep_sessions)
   GNUNET_free (n);
 }
 
-
 /**
  * Transmit a message using the current session of the given
  * neighbour.
@@ -965,15 +947,15 @@ send_with_session (struct NeighbourMapEntry *n,
   struct GNUNET_TRANSPORT_PluginFunctions *papi;
 
   GNUNET_assert (n->primary_address.session != NULL);
-  if ( ( (NULL == (papi = GST_plugins_find (n->primary_address.address->transport_name))) ||
+  if ( ((NULL == (papi = GST_plugins_find (n->primary_address.address->transport_name)) ||
         (-1 == papi->send (papi->cls,
                            n->primary_address.session,
                            msgbuf, msgbuf_size,
                            priority,
                            timeout,
-                           cont, cont_cls))) &&
-       (NULL != cont) )
-    cont (cont_cls, &n->id, GNUNET_SYSERR);
+                           cont, cont_cls)))) &&
+       (NULL != cont))
+    cont (cont_cls, &n->id, GNUNET_SYSERR, msgbuf_size, 0);
   GNUNET_break (NULL != papi);
 }
 
@@ -998,10 +980,12 @@ master_task (void *cls,
  * @param cls NULL
  * @param target identity of the neighbour that was disconnected
  * @param result GNUNET_OK if the disconnect got out successfully
+ * @param payload bytes payload
+ * @param physical bytes physical
  */
 static void
 send_disconnect_cont (void *cls, const struct GNUNET_PeerIdentity *target,
-                      int result)
+                      int result, size_t payload, size_t physical)
 {
   struct NeighbourMapEntry *n;
 
@@ -1144,11 +1128,13 @@ disconnect_neighbour (struct NeighbourMapEntry *n)
  * @param cls the 'struct MessageQueue' of the message
  * @param receiver intended receiver
  * @param success whether it worked or not
+ * @param size_payload bytes payload sent
+ * @param physical bytes sent on wire
  */
 static void
 transmit_send_continuation (void *cls,
                             const struct GNUNET_PeerIdentity *receiver,
-                            int success)
+                            int success, size_t size_payload, size_t physical)
 {
   struct MessageQueue *mq = cls;
   struct NeighbourMapEntry *n;
@@ -1167,7 +1153,16 @@ transmit_send_continuation (void *cls,
       GNUNET_SCHEDULER_cancel (n->task);
     n->task = GNUNET_SCHEDULER_add_now (&master_task, n);    
   }
-  GNUNET_assert (bytes_in_send_queue >= mq->message_buf_size);
+  if (bytes_in_send_queue < mq->message_buf_size)
+  {
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                  "bytes_in_send_queue `%u' mq->message_buf_size %u\n",
+                  bytes_in_send_queue, mq->message_buf_size );
+      GNUNET_break (0);
+  }
+
+
+  GNUNET_break (size_payload == mq->message_buf_size);
   bytes_in_send_queue -= mq->message_buf_size;
   GNUNET_STATISTICS_set (GST_stats,
                         gettext_noop
@@ -1189,7 +1184,7 @@ transmit_send_continuation (void *cls,
               ntohs (((struct GNUNET_MessageHeader *) mq->message_buf)->type),
               (success == GNUNET_OK) ? "success" : "FAILURE");
   if (NULL != mq->cont)
-    mq->cont (mq->cont_cls, success);
+    mq->cont (mq->cont_cls, success, size_payload, physical);
   GNUNET_free (mq);
 }
 
@@ -1242,7 +1237,7 @@ try_transmission_to_peer (struct NeighbourMapEntry *n)
                              1, GNUNET_NO);
     GNUNET_CONTAINER_DLL_remove (n->messages_head, n->messages_tail, mq);
     n->is_active = mq;
-    transmit_send_continuation (mq, &n->id, GNUNET_SYSERR);     /* timeout */
+    transmit_send_continuation (mq, &n->id, GNUNET_SYSERR, mq->message_buf_size, 0);     /* timeout */
   }
   if (NULL == mq)
     return;                     /* no more messages */
@@ -1495,14 +1490,14 @@ GST_neighbours_send (const struct GNUNET_PeerIdentity *target, const void *msg,
   {
     GNUNET_break (0);
     if (NULL != cont)
-      cont (cont_cls, GNUNET_SYSERR);
+      cont (cont_cls, GNUNET_SYSERR, msg_size, 0);
     return;
   }
   if (GNUNET_YES != test_connected (n))
   {
     GNUNET_break (0);
     if (NULL != cont)
-      cont (cont_cls, GNUNET_SYSERR);
+      cont (cont_cls, GNUNET_SYSERR, msg_size, 0);
     return;
   }
   bytes_in_send_queue += msg_size;
@@ -1537,7 +1532,7 @@ send_session_connect (struct NeighbourAddress *na)
 {
   struct GNUNET_TRANSPORT_PluginFunctions *papi;
   struct SessionConnectMessage connect_msg;
-  
+
   if (NULL == (papi = GST_plugins_find (na->address->transport_name)))  
   {
     GNUNET_break (0);
@@ -1561,6 +1556,7 @@ send_session_connect (struct NeighbourAddress *na)
                     UINT_MAX,
                     GNUNET_TIME_UNIT_FOREVER_REL,
                     NULL, NULL);
+
 }
 
 
@@ -1578,7 +1574,7 @@ send_session_connect_ack_message (const struct GNUNET_HELLO_Address *address,
 {
   struct GNUNET_TRANSPORT_PluginFunctions *papi;
   struct SessionConnectMessage connect_msg;
-  
+
   if (NULL == (papi = GST_plugins_find (address->transport_name)))  
   {
     GNUNET_break (0);
@@ -1601,6 +1597,7 @@ send_session_connect_ack_message (const struct GNUNET_HELLO_Address *address,
                     UINT_MAX,
                     GNUNET_TIME_UNIT_FOREVER_REL,
                     NULL, NULL);
+
 }
 
 
@@ -2064,6 +2061,7 @@ GST_neighbours_handle_connect (const struct GNUNET_MessageHeader *message,
     /* fallthrough */
   case S_INIT_BLACKLIST:
   case S_CONNECT_SENT:
+  case S_CONNECT_RECV_BLACKLIST_INBOUND:
   case S_CONNECT_RECV_ATS:
   case S_CONNECT_RECV_BLACKLIST:
   case S_CONNECT_RECV_ACK:
@@ -2169,11 +2167,14 @@ GST_neighbours_switch_to_address (const struct GNUNET_PeerIdentity *peer,
   }
 
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "ATS tells us to switch to address '%s' session %p for peer `%s' in state %s\n",
+              "ATS tells us to switch to address '%s' session %p for "
+              "peer `%s' in state %s (quota in/out %u %u )\n",
               (address->address_length != 0) ? GST_plugins_a2s (address): "<inbound>",
               session,
               GNUNET_i2s (peer),
-              print_state (n->state));
+              print_state (n->state),
+              ntohl (bandwidth_in.value__),
+              ntohl (bandwidth_out.value__));
 
   if (NULL == session)
   {
@@ -2633,7 +2634,9 @@ GST_neighbours_handle_connect_ack (const struct GNUNET_MessageHeader *message,
                           gettext_noop ("# peers connected"), 
                           ++neighbours_connected,
                           GNUNET_NO);
-    connect_notify_cb (callback_cls, &n->id, ats, ats_count);
+    connect_notify_cb (callback_cls, &n->id, ats, ats_count,
+                       n->primary_address.bandwidth_in,
+                       n->primary_address.bandwidth_out);
     /* Tell ATS that the outbound session we created to send CONNECT was successfull */
     GNUNET_ATS_address_add (GST_ats,
                             n->primary_address.address,
@@ -2893,7 +2896,9 @@ GST_neighbours_handle_session_ack (const struct GNUNET_MessageHeader *message,
                         gettext_noop ("# peers connected"), 
                         ++neighbours_connected,
                         GNUNET_NO);
-  connect_notify_cb (callback_cls, &n->id, ats, ats_count);
+  connect_notify_cb (callback_cls, &n->id, ats, ats_count,
+                     n->primary_address.bandwidth_in,
+                     n->primary_address.bandwidth_out);
   GNUNET_ATS_address_add(GST_ats,
                          n->primary_address.address,
                          n->primary_address.session,
@@ -3061,7 +3066,25 @@ neighbours_iterate (void *cls, const struct GNUNET_HashCode * key, void *value)
   struct NeighbourMapEntry *n = value;
 
   if (GNUNET_YES == test_connected (n))
-    ic->cb (ic->cb_cls, &n->id, NULL, 0, n->primary_address.address);
+  {
+    struct GNUNET_BANDWIDTH_Value32NBO bandwidth_in;
+    struct GNUNET_BANDWIDTH_Value32NBO bandwidth_out;
+
+    if (NULL != n->primary_address.address)
+    {
+      bandwidth_in = n->primary_address.bandwidth_in;
+      bandwidth_out = n->primary_address.bandwidth_out;
+    }
+    else
+    {
+      bandwidth_in = GNUNET_CONSTANTS_DEFAULT_BW_IN_OUT;
+      bandwidth_out = GNUNET_CONSTANTS_DEFAULT_BW_IN_OUT;
+    }
+
+    ic->cb (ic->cb_cls, &n->id, NULL, 0,
+            n->primary_address.address,
+            bandwidth_in, bandwidth_out);
+  }
   return GNUNET_OK;
 }
 
@@ -3179,7 +3202,7 @@ GST_neighbour_get_current_address (const struct GNUNET_PeerIdentity *peer)
  */
 void
 GST_neighbours_start (void *cls,
-                      GNUNET_TRANSPORT_NotifyConnect connect_cb,
+    NotifyConnect connect_cb,
                       GNUNET_TRANSPORT_NotifyDisconnect disconnect_cb,
                       GNUNET_TRANSPORT_PeerIterateCallback peer_address_cb)
 {
@@ -3187,7 +3210,7 @@ GST_neighbours_start (void *cls,
   connect_notify_cb = connect_cb;
   disconnect_notify_cb = disconnect_cb;
   address_change_cb = peer_address_cb;
-  neighbours = GNUNET_CONTAINER_multihashmap_create (NEIGHBOUR_TABLE_SIZE);
+  neighbours = GNUNET_CONTAINER_multihashmap_create (NEIGHBOUR_TABLE_SIZE, GNUNET_NO);
 }