This probably fixes #3944
[oweals/gnunet.git] / src / transport / gnunet-service-transport_neighbours.c
index 561a62396ea72933078c792a9b7da85a8684c639..01546ded4d059e1a58fa928012b364ae55419fe2 100644 (file)
@@ -14,8 +14,8 @@
 
      You should have received a copy of the GNU General Public License
      along with GNUnet; see the file COPYING.  If not, write to the
-     Free Software Foundation, Inc., 59 Temple Place - Suite 330,
-     Boston, MA 02111-1307, USA.
+     Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
+     Boston, MA 02110-1301, USA.
 */
 
 /**
@@ -157,6 +157,26 @@ struct SessionKeepAliveMessage
   uint32_t nonce GNUNET_PACKED;
 };
 
+
+/**
+ * Message a peer sends to another when connected to indicate that
+ * the other peer should limit transmissions to the indicated
+ * quota.
+ */
+struct SessionQuotaMessage
+{
+  /**
+   * Header of type #GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_QUOTA.
+   */
+  struct GNUNET_MessageHeader header;
+
+  /**
+   * Quota to use (for sending), in bytes per second.
+   */
+  uint32_t quota GNUNET_PACKED;
+};
+
+
 /**
  * Message we send to the other peer to notify him that we intentionally
  * are disconnecting (to reduce timeouts).  This is just a friendly
@@ -336,7 +356,7 @@ struct NeighbourMapEntry
 
   /**
    * Main task that drives this peer (timeouts, keepalives, etc.).
-   * Always runs the 'master_task'.
+   * Always runs the #master_task().
    */
   struct GNUNET_SCHEDULER_Task *task;
 
@@ -371,7 +391,7 @@ struct NeighbourMapEntry
   /**
    * Time where we should cut the connection (timeout) if we don't
    * make progress in the state machine (or get a KEEPALIVE_RESPONSE
-   * if we are in #S_CONNECTED).
+   * if we are in #GNUNET_TRANSPORT_PS_CONNECTED).
    */
   struct GNUNET_TIME_Absolute timeout;
 
@@ -387,6 +407,13 @@ struct NeighbourMapEntry
    */
   unsigned int quota_violation_count;
 
+  /**
+   * Latest quota the other peer send us in bytes per second.
+   * We should not send more, least the other peer throttle
+   * receiving our traffic.
+   */
+  struct GNUNET_BANDWIDTH_Value32NBO neighbour_receive_quota;
+
   /**
    * The current state of the peer.
    */
@@ -641,13 +668,15 @@ test_connected (struct NeighbourMapEntry *n)
 
 /**
  * Send information about a new outbound quota to our clients.
+ * Note that the outbound quota is enforced client-side (i.e.
+ * in libgnunettransport).
  *
  * @param target affected peer
  * @param quota new quota
  */
 static void
-send_outbound_quota (const struct GNUNET_PeerIdentity *target,
-                     struct GNUNET_BANDWIDTH_Value32NBO quota)
+send_outbound_quota_to_clients (const struct GNUNET_PeerIdentity *target,
+                                struct GNUNET_BANDWIDTH_Value32NBO quota)
 {
   struct QuotaSetMessage q_msg;
 
@@ -863,8 +892,8 @@ set_primary_address (struct NeighbourMapEntry *n,
     if (n->primary_address.bandwidth_out.value__ != bandwidth_out.value__)
     {
       n->primary_address.bandwidth_out = bandwidth_out;
-      send_outbound_quota (&address->peer,
-                           bandwidth_out);
+      send_outbound_quota_to_clients (&address->peer,
+                                      bandwidth_out);
     }
     return;
   }
@@ -901,8 +930,8 @@ set_primary_address (struct NeighbourMapEntry *n,
                                   GNUNET_YES);
   GST_neighbours_set_incoming_quota (&address->peer,
                                      bandwidth_in);
-  send_outbound_quota (&address->peer,
-                       bandwidth_out);
+  send_outbound_quota_to_clients (&address->peer,
+                                  bandwidth_out);
   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
               "Neighbour `%s' switched to address `%s'\n",
               GNUNET_i2s (&n->id),
@@ -1254,7 +1283,8 @@ transmit_send_continuation (void *cls,
     n->is_active = NULL;
     if (NULL != n->task)
       GNUNET_SCHEDULER_cancel (n->task);
-    n->task = GNUNET_SCHEDULER_add_now (&master_task, n);
+    n->task = GNUNET_SCHEDULER_add_now (&master_task,
+                                        n);
   }
   if (bytes_in_send_queue < mq->message_buf_size)
   {
@@ -1271,14 +1301,14 @@ transmit_send_continuation (void *cls,
   GNUNET_break (size_payload == mq->message_buf_size);
   bytes_in_send_queue -= mq->message_buf_size;
   GNUNET_STATISTICS_set (GST_stats,
-                        gettext_noop
-                        ("# bytes in message queue for other peers"),
-                        bytes_in_send_queue, GNUNET_NO);
+                         gettext_noop ("# bytes in message queue for other peers"),
+                        bytes_in_send_queue,
+                         GNUNET_NO);
   if (GNUNET_OK == success)
     GNUNET_STATISTICS_update (GST_stats,
-                             gettext_noop
-                             ("# messages transmitted to other peers"),
-                             1, GNUNET_NO);
+                             gettext_noop ("# messages transmitted to other peers"),
+                             1,
+                              GNUNET_NO);
   else
     GNUNET_STATISTICS_update (GST_stats,
                              gettext_noop
@@ -1291,7 +1321,10 @@ transmit_send_continuation (void *cls,
               mq->message_buf_size,
               (success == GNUNET_OK) ? "success" : "FAILURE");
   if (NULL != mq->cont)
-    mq->cont (mq->cont_cls, success, size_payload, physical);
+    mq->cont (mq->cont_cls,
+              success,
+              size_payload,
+              physical);
   GNUNET_free (mq);
 }
 
@@ -2107,6 +2140,7 @@ setup_neighbour (const struct GNUNET_PeerIdentity *peer)
   n->id = *peer;
   n->ack_state = ACK_UNDEFINED;
   n->last_util_transmission = GNUNET_TIME_absolute_get();
+  n->neighbour_receive_quota = GNUNET_CONSTANTS_DEFAULT_BW_IN_OUT;
   GNUNET_BANDWIDTH_tracker_init (&n->in_tracker,
                                  &inbound_bw_tracker_update,
                                  n,
@@ -2296,7 +2330,7 @@ GST_neighbours_try_connect (const struct GNUNET_PeerIdentity *target)
  * We received a 'SYN' message from the other peer.
  * Consider switching to it.
  *
- * @param message possibly a 'struct TransportSynMessage' (check format)
+ * @param message possibly a `struct TransportSynMessage` (check format)
  * @param peer identity of the peer to switch the address for
  * @return #GNUNET_OK if the message was fine, #GNUNET_SYSERR on serious error
  */
@@ -2447,6 +2481,7 @@ try_run_fast_ats_update (const struct GNUNET_HELLO_Address *address,
                          struct GNUNET_BANDWIDTH_Value32NBO bandwidth_out)
 {
   struct NeighbourMapEntry *n;
+  struct GNUNET_BANDWIDTH_Value32NBO bandwidth_min;
 
   n = lookup_neighbour (&address->peer);
   if ( (NULL == n) ||
@@ -2465,12 +2500,20 @@ try_run_fast_ats_update (const struct GNUNET_HELLO_Address *address,
                    GST_ats_is_known (n->primary_address.address,
                                      n->primary_address.session));
   }
-  n->primary_address.bandwidth_in = bandwidth_in;
-  n->primary_address.bandwidth_out = bandwidth_out;
-  GST_neighbours_set_incoming_quota (&address->peer,
-                                     bandwidth_in);
-  send_outbound_quota (&address->peer,
-                       bandwidth_out);
+  if (n->primary_address.bandwidth_in.value__ != bandwidth_in.value__)
+  {
+    n->primary_address.bandwidth_in = bandwidth_in;
+    GST_neighbours_set_incoming_quota (&address->peer,
+                                       bandwidth_in);
+  }
+  if (n->primary_address.bandwidth_out.value__ != bandwidth_out.value__)
+  {
+    n->primary_address.bandwidth_out = bandwidth_out;
+    bandwidth_min = GNUNET_BANDWIDTH_value_min (bandwidth_out,
+                                                n->neighbour_receive_quota);
+    send_outbound_quota_to_clients (&address->peer,
+                                    bandwidth_min);
+  }
   return GNUNET_OK;
 }
 
@@ -3485,8 +3528,9 @@ GST_neighbours_handle_session_ack (const struct GNUNET_MessageHeader *message,
      now wait for the ACK to finally be connected
      - If we sent a SYN_ACK to this peer before */
 
-  if ( (GNUNET_TRANSPORT_PS_SYN_RECV_ACK != n->state) &&
-       (ACK_SEND_ACK != n->ack_state))
+  if ( ( (GNUNET_TRANSPORT_PS_SYN_RECV_ACK != n->state) &&
+        (ACK_SEND_ACK != n->ack_state) ) ||
+       (NULL == n->primary_address.address) ) 
   {
     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
                 "Received unexpected ACK message from peer `%s' in state %s/%s\n",
@@ -3495,7 +3539,8 @@ GST_neighbours_handle_session_ack (const struct GNUNET_MessageHeader *message,
                 print_ack_state (n->ack_state));
 
     GNUNET_STATISTICS_update (GST_stats,
-                              gettext_noop ("# unexpected ACK messages"), 1,
+                              gettext_noop ("# unexpected ACK messages"),
+                              1,
                               GNUNET_NO);
     return GNUNET_OK;
   }
@@ -3510,7 +3555,19 @@ GST_neighbours_handle_session_ack (const struct GNUNET_MessageHeader *message,
                          GNUNET_TRANSPORT_PS_CONNECTED,
                          GNUNET_TIME_relative_to_absolute (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT));
 
-  /* Reset backoff for primary address  */
+  if (NULL == n->primary_address.address) {
+    /* See issue #3693.
+     * We are in state = PSY_SYN_RECV_ACK or ack_state = ACK_SEND_ACK, which
+     * really means we did try (and succeed) to send a SYN and are waiting for
+     * an ACK.
+     * That suggests that the primary_address used to be non-NULL, but maybe it
+     * got reset to NULL without the state being changed appropriately?
+     */
+    GNUNET_break (0);
+    return GNUNET_OK;
+  }
+
+  /* Reset backoff for primary address */
   GST_ats_block_reset (n->primary_address.address,
                        n->primary_address.session);
   return GNUNET_OK;
@@ -3531,7 +3588,9 @@ GST_neighbours_test_connected (const struct GNUNET_PeerIdentity *target)
 
 
 /**
- * Change the incoming quota for the given peer.
+ * Change the incoming quota for the given peer.  Updates
+ * our own receive rate and informs the neighbour about
+ * the new quota.
  *
  * @param neighbour identity of peer to change qutoa for
  * @param quota new quota
@@ -3555,7 +3614,21 @@ GST_neighbours_set_incoming_quota (const struct GNUNET_PeerIdentity *neighbour,
               ntohl (quota.value__), GNUNET_i2s (&n->id));
   GNUNET_BANDWIDTH_tracker_update_quota (&n->in_tracker, quota);
   if (0 != ntohl (quota.value__))
+  {
+    struct SessionQuotaMessage sqm;
+
+    sqm.header.size = htons (sizeof (struct SessionQuotaMessage));
+    sqm.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_QUOTA);
+    sqm.quota = quota.value__;
+    (void) send_with_session (n,
+                              &sqm,
+                              sizeof (sqm),
+                              UINT32_MAX - 1,
+                              GNUNET_TIME_UNIT_FOREVER_REL,
+                              GNUNET_NO,
+                              NULL, NULL);
     return;
+  }
   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
               "Disconnecting peer `%4s' due to SET_QUOTA\n",
               GNUNET_i2s (&n->id));
@@ -3587,6 +3660,54 @@ delayed_disconnect (void *cls,
 }
 
 
+/**
+ * We received a quoat message from the given peer,
+ * validate and process.
+ *
+ * @param peer sender of the message
+ * @param msg the quota message
+ */
+void
+GST_neighbours_handle_quota_message (const struct GNUNET_PeerIdentity *peer,
+                                     const struct GNUNET_MessageHeader *msg)
+{
+  struct NeighbourMapEntry *n;
+  const struct SessionQuotaMessage *sqm;
+  struct GNUNET_BANDWIDTH_Value32NBO bandwidth_min;
+
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Received QUOTA message from peer `%s'\n",
+              GNUNET_i2s (peer));
+  if (ntohs (msg->size) != sizeof (struct SessionQuotaMessage))
+  {
+    GNUNET_break_op (0);
+    GNUNET_STATISTICS_update (GST_stats,
+                              gettext_noop ("# quota messages ignored (malformed)"),
+                              1,
+                              GNUNET_NO);
+    return;
+  }
+  GNUNET_STATISTICS_update (GST_stats,
+                            gettext_noop
+                            ("# QUOTA messages received"),
+                            1, GNUNET_NO);
+  sqm = (const struct SessionQuotaMessage *) msg;
+  if (NULL == (n = lookup_neighbour (peer)))
+  {
+    /* gone already */
+    return;
+  }
+  n->neighbour_receive_quota
+    = GNUNET_BANDWIDTH_value_max (GNUNET_CONSTANTS_DEFAULT_BW_IN_OUT,
+                                  GNUNET_BANDWIDTH_value_init (ntohl (sqm->quota)));
+
+  bandwidth_min = GNUNET_BANDWIDTH_value_min (n->primary_address.bandwidth_out,
+                                              n->neighbour_receive_quota);
+  send_outbound_quota_to_clients (peer,
+                                  bandwidth_min);
+}
+
+
 /**
  * We received a disconnect message from the given peer,
  * validate and process.
@@ -3601,7 +3722,7 @@ GST_neighbours_handle_disconnect_message (const struct GNUNET_PeerIdentity *peer
   struct NeighbourMapEntry *n;
   const struct SessionDisconnectMessage *sdm;
 
-  GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "Received DISCONNECT message from peer `%s'\n",
               GNUNET_i2s (peer));
   if (ntohs (msg->size) != sizeof (struct SessionDisconnectMessage))
@@ -3609,7 +3730,8 @@ GST_neighbours_handle_disconnect_message (const struct GNUNET_PeerIdentity *peer
     GNUNET_break_op (0);
     GNUNET_STATISTICS_update (GST_stats,
                               gettext_noop
-                              ("# disconnect messages ignored (malformed)"), 1,
+                              ("# disconnect messages ignored (malformed)"),
+                              1,
                               GNUNET_NO);
     return;
   }
@@ -3661,7 +3783,11 @@ GST_neighbours_handle_disconnect_message (const struct GNUNET_PeerIdentity *peer
     GNUNET_break_op (0);
     return;
   }
-  n->delayed_disconnect_task = GNUNET_SCHEDULER_add_now (&delayed_disconnect, n);
+  if (NULL == n->delayed_disconnect_task)
+  {
+    n->delayed_disconnect_task = GNUNET_SCHEDULER_add_now (&delayed_disconnect,
+                                                           n);
+  }
 }