-use UINT32_MAX to mean disconnect, for real
authorChristian Grothoff <christian@grothoff.org>
Sun, 18 Oct 2015 18:57:48 +0000 (18:57 +0000)
committerChristian Grothoff <christian@grothoff.org>
Sun, 18 Oct 2015 18:57:48 +0000 (18:57 +0000)
src/transport/plugin_transport_udp.c

index 2c95918f1d0b845a023cdacc19e7b64c87a781cf..29ade08f04ebbe843061d2ce64b568aa6c8220e9 100644 (file)
@@ -480,6 +480,8 @@ struct UDP_ACK_Message
 
   /**
    * Desired delay for flow control, in us (in NBO).
+   * A value of UINT32_MAX indicates that the other
+   * peer wants us to disconnect.
    */
   uint32_t delay GNUNET_PACKED;
 
@@ -2143,118 +2145,6 @@ udp_plugin_send (void *cls,
 }
 
 
-/**
- * Handle an ACK message.
- *
- * @param plugin the UDP plugin
- * @param msg the (presumed) UDP ACK message
- * @param udp_addr sender address
- * @param udp_addr_len number of bytes in @a udp_addr
- */
-static void
-read_process_ack (struct Plugin *plugin,
-                  const struct GNUNET_MessageHeader *msg,
-                  const union UdpAddress *udp_addr,
-                  socklen_t udp_addr_len)
-{
-  const struct GNUNET_MessageHeader *ack;
-  const struct UDP_ACK_Message *udp_ack;
-  struct GNUNET_HELLO_Address *address;
-  struct GNUNET_ATS_Session *s;
-  struct GNUNET_TIME_Relative flow_delay;
-
-  if (ntohs (msg->size)
-      < sizeof(struct UDP_ACK_Message) + sizeof(struct GNUNET_MessageHeader))
-  {
-    GNUNET_break_op (0);
-    return;
-  }
-  udp_ack = (const struct UDP_ACK_Message *) msg;
-  ack = (const struct GNUNET_MessageHeader *) &udp_ack[1];
-  if (ntohs (ack->size) != ntohs (msg->size) - sizeof(struct UDP_ACK_Message))
-  {
-    GNUNET_break_op(0);
-    return;
-  }
-  address = GNUNET_HELLO_address_allocate (&udp_ack->sender,
-                                           PLUGIN_NAME,
-                                           udp_addr,
-                                           udp_addr_len,
-                                           GNUNET_HELLO_ADDRESS_INFO_NONE);
-  s = udp_plugin_lookup_session (plugin,
-                                 address);
-  if (NULL == s)
-  {
-    LOG (GNUNET_ERROR_TYPE_WARNING,
-         "UDP session of address %s for ACK not found\n",
-         udp_address_to_string (plugin,
-                                address->address,
-                                address->address_length));
-    GNUNET_HELLO_address_free (address);
-    return;
-  }
-  if (NULL == s->frag_ctx)
-  {
-    LOG (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
-         "Fragmentation context of address %s for ACK (%s) not found\n",
-         udp_address_to_string (plugin,
-                                address->address,
-                                address->address_length),
-         GNUNET_FRAGMENT_print_ack (ack));
-    GNUNET_HELLO_address_free (address);
-    return;
-  }
-  GNUNET_HELLO_address_free (address);
-
-  flow_delay.rel_value_us = (uint64_t) ntohl (udp_ack->delay);
-  if (flow_delay.rel_value_us > GNUNET_CONSTANTS_LATENCY_WARN.rel_value_us)
-    LOG (GNUNET_ERROR_TYPE_WARNING,
-         "We received a sending delay of %s for %s\n",
-         GNUNET_STRINGS_relative_time_to_string (flow_delay,
-                                                 GNUNET_YES),
-         GNUNET_i2s (&udp_ack->sender));
-  else
-    LOG (GNUNET_ERROR_TYPE_DEBUG,
-         "We received a sending delay of %s for %s\n",
-         GNUNET_STRINGS_relative_time_to_string (flow_delay,
-                                                 GNUNET_YES),
-         GNUNET_i2s (&udp_ack->sender));
-  /* Flow delay is for the reassembled packet, however, our delay
-     is per packet, so we need to adjust: */
-  flow_delay = GNUNET_TIME_relative_divide (flow_delay,
-                                            1 + (s->frag_ctx->payload_size /
-                                                 UDP_MTU));
-  s->flow_delay_from_other_peer = flow_delay;
-
-
-  if (GNUNET_OK !=
-      GNUNET_FRAGMENT_process_ack (s->frag_ctx->frag,
-                                   ack))
-  {
-    LOG (GNUNET_ERROR_TYPE_DEBUG,
-         "UDP processes %u-byte acknowledgement from `%s' at `%s'\n",
-         (unsigned int) ntohs (msg->size),
-         GNUNET_i2s (&udp_ack->sender),
-         udp_address_to_string (plugin,
-                                udp_addr,
-                                udp_addr_len));
-    /* Expect more ACKs to arrive */
-    return;
-  }
-
-  LOG (GNUNET_ERROR_TYPE_DEBUG,
-       "Message from %s at %s full ACK'ed\n",
-       GNUNET_i2s (&udp_ack->sender),
-       udp_address_to_string (plugin,
-                              udp_addr,
-                              udp_addr_len));
-
-  /* Remove fragmented message after successful sending */
-  fragmented_message_done (s->frag_ctx,
-                           GNUNET_OK);
-}
-
-
 /* ********************** Receiving ********************** */
 
 
@@ -2317,35 +2207,6 @@ find_receive_context (void *cls,
 }
 
 
-/**
- * Message tokenizer has broken up an incomming message. Pass it on
- * to the service.
- *
- * @param cls the `struct Plugin *`
- * @param client the `struct GNUNET_ATS_Session *`
- * @param hdr the actual message
- * @return #GNUNET_OK (always)
- */
-static int
-process_inbound_tokenized_messages (void *cls,
-                                    void *client,
-                                    const struct GNUNET_MessageHeader *hdr)
-{
-  struct Plugin *plugin = cls;
-  struct GNUNET_ATS_Session *session = client;
-
-  if (GNUNET_YES == session->in_destroy)
-    return GNUNET_OK;
-  reschedule_session_timeout (session);
-  session->flow_delay_for_other_peer
-    = plugin->env->receive (plugin->env->cls,
-                            session->address,
-                            session,
-                            hdr);
-  return GNUNET_OK;
-}
-
-
 /**
  * Functions with this signature are called whenever we need to close
  * a session due to a disconnect or failure to establish a connection.
@@ -2463,6 +2324,154 @@ udp_disconnect_session (void *cls,
 }
 
 
+/**
+ * Handle an ACK message.
+ *
+ * @param plugin the UDP plugin
+ * @param msg the (presumed) UDP ACK message
+ * @param udp_addr sender address
+ * @param udp_addr_len number of bytes in @a udp_addr
+ */
+static void
+read_process_ack (struct Plugin *plugin,
+                  const struct GNUNET_MessageHeader *msg,
+                  const union UdpAddress *udp_addr,
+                  socklen_t udp_addr_len)
+{
+  const struct GNUNET_MessageHeader *ack;
+  const struct UDP_ACK_Message *udp_ack;
+  struct GNUNET_HELLO_Address *address;
+  struct GNUNET_ATS_Session *s;
+  struct GNUNET_TIME_Relative flow_delay;
+
+  if (ntohs (msg->size)
+      < sizeof(struct UDP_ACK_Message) + sizeof(struct GNUNET_MessageHeader))
+  {
+    GNUNET_break_op (0);
+    return;
+  }
+  udp_ack = (const struct UDP_ACK_Message *) msg;
+  ack = (const struct GNUNET_MessageHeader *) &udp_ack[1];
+  if (ntohs (ack->size) != ntohs (msg->size) - sizeof(struct UDP_ACK_Message))
+  {
+    GNUNET_break_op(0);
+    return;
+  }
+  address = GNUNET_HELLO_address_allocate (&udp_ack->sender,
+                                           PLUGIN_NAME,
+                                           udp_addr,
+                                           udp_addr_len,
+                                           GNUNET_HELLO_ADDRESS_INFO_NONE);
+  s = udp_plugin_lookup_session (plugin,
+                                 address);
+  if (NULL == s)
+  {
+    LOG (GNUNET_ERROR_TYPE_WARNING,
+         "UDP session of address %s for ACK not found\n",
+         udp_address_to_string (plugin,
+                                address->address,
+                                address->address_length));
+    GNUNET_HELLO_address_free (address);
+    return;
+  }
+  if (NULL == s->frag_ctx)
+  {
+    LOG (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
+         "Fragmentation context of address %s for ACK (%s) not found\n",
+         udp_address_to_string (plugin,
+                                address->address,
+                                address->address_length),
+         GNUNET_FRAGMENT_print_ack (ack));
+    GNUNET_HELLO_address_free (address);
+    return;
+  }
+  GNUNET_HELLO_address_free (address);
+
+  if (UINT32_MAX == ntohl (udp_ack->delay))
+  {
+    /* Other peer asked for us to terminate the session */
+    udp_disconnect_session (plugin,
+                            s);
+    return;
+  }
+  flow_delay.rel_value_us = (uint64_t) ntohl (udp_ack->delay);
+  if (flow_delay.rel_value_us > GNUNET_CONSTANTS_LATENCY_WARN.rel_value_us)
+    LOG (GNUNET_ERROR_TYPE_WARNING,
+         "We received a sending delay of %s for %s\n",
+         GNUNET_STRINGS_relative_time_to_string (flow_delay,
+                                                 GNUNET_YES),
+         GNUNET_i2s (&udp_ack->sender));
+  else
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "We received a sending delay of %s for %s\n",
+         GNUNET_STRINGS_relative_time_to_string (flow_delay,
+                                                 GNUNET_YES),
+         GNUNET_i2s (&udp_ack->sender));
+  /* Flow delay is for the reassembled packet, however, our delay
+     is per packet, so we need to adjust: */
+  flow_delay = GNUNET_TIME_relative_divide (flow_delay,
+                                            1 + (s->frag_ctx->payload_size /
+                                                 UDP_MTU));
+  s->flow_delay_from_other_peer = flow_delay;
+
+
+  if (GNUNET_OK !=
+      GNUNET_FRAGMENT_process_ack (s->frag_ctx->frag,
+                                   ack))
+  {
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "UDP processes %u-byte acknowledgement from `%s' at `%s'\n",
+         (unsigned int) ntohs (msg->size),
+         GNUNET_i2s (&udp_ack->sender),
+         udp_address_to_string (plugin,
+                                udp_addr,
+                                udp_addr_len));
+    /* Expect more ACKs to arrive */
+    return;
+  }
+
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Message from %s at %s full ACK'ed\n",
+       GNUNET_i2s (&udp_ack->sender),
+       udp_address_to_string (plugin,
+                              udp_addr,
+                              udp_addr_len));
+
+  /* Remove fragmented message after successful sending */
+  fragmented_message_done (s->frag_ctx,
+                           GNUNET_OK);
+}
+
+
+/**
+ * Message tokenizer has broken up an incomming message. Pass it on
+ * to the service.
+ *
+ * @param cls the `struct Plugin *`
+ * @param client the `struct GNUNET_ATS_Session *`
+ * @param hdr the actual message
+ * @return #GNUNET_OK (always)
+ */
+static int
+process_inbound_tokenized_messages (void *cls,
+                                    void *client,
+                                    const struct GNUNET_MessageHeader *hdr)
+{
+  struct Plugin *plugin = cls;
+  struct GNUNET_ATS_Session *session = client;
+
+  if (GNUNET_YES == session->in_destroy)
+    return GNUNET_OK;
+  reschedule_session_timeout (session);
+  session->flow_delay_for_other_peer
+    = plugin->env->receive (plugin->env->cls,
+                            session->address,
+                            session,
+                            hdr);
+  return GNUNET_OK;
+}
+
+
 /**
  * Destroy a session, plugin is being unloaded.
  *
@@ -2868,10 +2877,13 @@ ack_proc (void *cls,
                               GNUNET_NO);
     return;
   }
-  if (s->flow_delay_for_other_peer.rel_value_us <= UINT32_MAX)
+  if (GNUNET_TIME_UNIT_FOREVER_REL.rel_value_us ==
+      s->flow_delay_for_other_peer.rel_value_us)
+    delay = UINT32_MAX;
+  else if (s->flow_delay_for_other_peer.rel_value_us < UINT32_MAX)
     delay = s->flow_delay_for_other_peer.rel_value_us;
   else
-    delay = UINT32_MAX;
+    delay = UINT32_MAX - 1; /* largest value we can communicate */
   LOG (GNUNET_ERROR_TYPE_DEBUG,
        "Sending ACK to `%s' including delay of %s\n",
        udp_address_to_string (plugin,
@@ -3078,9 +3090,6 @@ udp_select_read (struct Plugin *plugin,
     return;
   }
 
-
-
-
   msg = (const struct GNUNET_MessageHeader *) buf;
   LOG (GNUNET_ERROR_TYPE_DEBUG,
        "UDP received %u-byte message from `%s' type %u\n",