trying to fix #1813 by actually respecting delay values, also some minor code cleanup
[oweals/gnunet.git] / src / transport / plugin_transport_udp.c
index e0ef50acfe08aa9ee26c13bebf608cacd0b0b671..ad5b165b184265e4519290fc08e10dca892a3a37 100644 (file)
@@ -93,6 +93,28 @@ struct UDPMessage
 };
 
 
+/**
+ * UDP ACK Message-Packet header (after defragmentation).
+ */
+struct UDP_ACK_Message
+{
+  /**
+   * Message header.
+   */
+  struct GNUNET_MessageHeader header;
+
+  /**
+   * Desired delay for flow control
+   */
+  uint32_t delay;
+
+  /**
+   * What is the identity of the sender
+   */
+  struct GNUNET_PeerIdentity sender;
+};
+
+
 /**
  * Network format for IPv4 addresses.
  */
@@ -174,6 +196,18 @@ struct Session
   struct GNUNET_TIME_Absolute valid_until;
 
   GNUNET_SCHEDULER_TaskIdentifier invalidation_task;
+
+  GNUNET_SCHEDULER_TaskIdentifier delayed_cont_task;
+
+  /**
+   * Desired delay for next sending we send to other peer
+   */
+  struct GNUNET_TIME_Relative flow_delay_for_other_peer;
+
+  /**
+   * Desired delay for next sending we received from other peer
+   */
+  struct GNUNET_TIME_Absolute flow_delay_from_other_peer;
 };
 
 
@@ -210,6 +244,8 @@ struct ReceiveContext
    */
   size_t addr_len;
 
+  struct GNUNET_PeerIdentity id;
+
 };
 
 
@@ -318,16 +354,18 @@ struct PeerSessionIteratorContext
  * @param peer peer's identity
  * @return NULL if we have no session
  */
-struct Session *
+static struct Session *
 find_session (struct Plugin *plugin, const struct GNUNET_PeerIdentity *peer)
 {
   return GNUNET_CONTAINER_multihashmap_get (plugin->sessions,
                                             &peer->hashPubKey);
 }
 
-int inbound_session_iterator (void *cls,
-                             const GNUNET_HashCode * key,
-                             void *value)
+
+static int 
+inbound_session_iterator (void *cls,
+                         const GNUNET_HashCode * key,
+                         void *value)
 {
   struct PeerSessionIteratorContext *psc = cls;
   struct Session *s = value;
@@ -338,9 +376,9 @@ int inbound_session_iterator (void *cls,
   }
   if (psc->result != NULL)
     return GNUNET_NO;
-  else
-    return GNUNET_YES;
-};
+  return GNUNET_YES;
+}
+
 
 /**
  * Lookup the session for the given peer.
@@ -349,7 +387,7 @@ int inbound_session_iterator (void *cls,
  * @param peer peer's identity
  * @return NULL if we have no session
  */
-struct Session *
+static struct Session *
 find_inbound_session (struct Plugin *plugin,
                       const struct GNUNET_PeerIdentity *peer,
                       const void * addr, size_t addrlen)
@@ -364,6 +402,45 @@ find_inbound_session (struct Plugin *plugin,
 }
 
 
+static int 
+inbound_session_by_addr_iterator (void *cls,
+                                 const GNUNET_HashCode * key,
+                                 void *value)
+{
+  struct PeerSessionIteratorContext *psc = cls;
+  struct Session *s = value;
+  if (s->addrlen == psc->addrlen)
+  {
+    if (0 == memcmp (&s[1], psc->addr, s->addrlen))
+      psc->result = s;
+  }
+  if (psc->result != NULL)
+    return GNUNET_NO;
+  else
+    return GNUNET_YES;
+};
+
+/**
+ * Lookup the session for the given peer just by address.
+ *
+ * @param plugin the plugin
+ * @param addr address
+ * @param addrlen address length
+ * @return NULL if we have no session
+ */
+static struct Session *
+find_inbound_session_by_addr (struct Plugin *plugin, const void * addr, size_t addrlen)
+{
+  struct PeerSessionIteratorContext psc;
+  psc.result = NULL;
+  psc.addrlen = addrlen;
+  psc.addr = addr;
+
+  GNUNET_CONTAINER_multihashmap_iterate (plugin->inbound_sessions, &inbound_session_by_addr_iterator, &psc);
+  return psc.result;
+}
+
+
 /**
  * Destroy a session, plugin is being unloaded.
  *
@@ -379,6 +456,8 @@ destroy_session (void *cls, const GNUNET_HashCode * key, void *value)
 
   if (peer_session->frag != NULL)
     GNUNET_FRAGMENT_context_destroy (peer_session->frag);
+  if (GNUNET_SCHEDULER_NO_TASK != peer_session->delayed_cont_task)
+    GNUNET_SCHEDULER_cancel (peer_session->delayed_cont_task);
   GNUNET_free (peer_session);
   return GNUNET_OK;
 }
@@ -398,6 +477,8 @@ destroy_inbound_session (void *cls, const GNUNET_HashCode * key, void *value)
 
   if (s->invalidation_task != GNUNET_SCHEDULER_NO_TASK)
     GNUNET_SCHEDULER_cancel(s->invalidation_task);
+  if (GNUNET_SCHEDULER_NO_TASK != s->delayed_cont_task)
+    GNUNET_SCHEDULER_cancel (s->delayed_cont_task);
   GNUNET_free (s);
   return GNUNET_OK;
 }
@@ -431,6 +512,8 @@ udp_disconnect (void *cls, const struct GNUNET_PeerIdentity *target)
               "UDP DISCONNECT\n");
 
   plugin->last_expected_delay = GNUNET_FRAGMENT_context_destroy (session->frag);
+  if (GNUNET_SCHEDULER_NO_TASK != session->delayed_cont_task)
+    GNUNET_SCHEDULER_cancel (session->delayed_cont_task);
   if (session->cont != NULL)
     session->cont (session->cont_cls, target, GNUNET_SYSERR);
   GNUNET_free (session);
@@ -507,6 +590,7 @@ send_fragment (void *cls, const struct GNUNET_MessageHeader *msg)
   GNUNET_FRAGMENT_context_transmission_done (session->frag);
 }
 
+
 static struct Session *
 create_session (struct Plugin *plugin, const struct GNUNET_PeerIdentity *target,
     const void *addr, size_t addrlen,
@@ -578,6 +662,20 @@ create_session (struct Plugin *plugin, const struct GNUNET_PeerIdentity *target,
 static const char *
 udp_address_to_string (void *cls, const void *addr, size_t addrlen);
 
+
+static void
+udp_call_continuation (void *cls,
+                      const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+  struct Session *s = cls;
+  GNUNET_TRANSPORT_TransmitContinuation cont = s->cont;
+
+  s->delayed_cont_task = GNUNET_SCHEDULER_NO_TASK;
+  s->cont = NULL;
+  cont (s->cont_cls, &s->target, GNUNET_OK);
+}
+
+
 /**
  * Function that can be used by the transport service to transmit
  * a message using the plugin.
@@ -617,6 +715,7 @@ udp_plugin_send (void *cls, const struct GNUNET_PeerIdentity *target,
   size_t mlen = msgbuf_size + sizeof (struct UDPMessage);
   char mbuf[mlen];
   struct UDPMessage *udp;
+  struct GNUNET_TIME_Relative delta;
 
   if (mlen >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
   {
@@ -633,6 +732,7 @@ udp_plugin_send (void *cls, const struct GNUNET_PeerIdentity *target,
   if ((force_address == GNUNET_SYSERR) && (session == NULL))
     return GNUNET_SYSERR;
 
+  s = NULL;
   /* safety check: comparing address to address stored in session */
   if ((session != NULL) && (addr != NULL) && (addrlen != 0))
   {
@@ -699,11 +799,27 @@ udp_plugin_send (void *cls, const struct GNUNET_PeerIdentity *target,
   udp->sender = *plugin->env->my_identity;
   memcpy (&udp[1], msgbuf, msgbuf_size);
 
+  if (s != NULL)  
+    delta = GNUNET_TIME_absolute_get_remaining (s->flow_delay_from_other_peer);
+  else
+    delta = GNUNET_TIME_UNIT_ZERO;
   if (mlen <= UDP_MTU)
   {
     mlen = udp_send (plugin, peer_session->sock_addr, &udp->header);
     if (cont != NULL)
-      cont (cont_cls, target, (mlen > 0) ? GNUNET_OK : GNUNET_SYSERR);
+    {
+      if ( (delta.rel_value > 0) &&
+          (mlen > 0) )
+      {
+       s->cont = cont;
+       s->cont_cls = cont_cls;
+       s->delayed_cont_task = GNUNET_SCHEDULER_add_delayed (delta,
+                                                            &udp_call_continuation,
+                                                            s);
+      }
+      else
+       cont (cont_cls, target, (mlen > 0) ? GNUNET_OK : GNUNET_SYSERR);
+    }
     GNUNET_free_non_null (peer_session);
   }
   else
@@ -763,6 +879,7 @@ process_inbound_tokenized_messages (void *cls, void *client,
   struct Plugin *plugin = cls;
   struct SourceInformation *si = client;
   struct GNUNET_ATS_Information distance;
+  struct GNUNET_TIME_Relative delay;
 
   /* setup ATS */
   distance.type = htonl (GNUNET_ATS_QUALITY_NET_DISTANCE);
@@ -770,8 +887,9 @@ process_inbound_tokenized_messages (void *cls, void *client,
 
   LOG (GNUNET_ERROR_TYPE_DEBUG,
                    "Giving Session %X %s  to transport\n", si->session, GNUNET_i2s(&si->session->target));
-  plugin->env->receive (plugin->env->cls, &si->sender, hdr, &distance, 1, si->session,
+  delay = plugin->env->receive (plugin->env->cls, &si->sender, hdr, &distance, 1, si->session,
                         si->arg, si->args);
+  si->session->flow_delay_for_other_peer = delay;
 }
 
 static void
@@ -880,7 +998,7 @@ process_udp_message (struct Plugin *plugin, const struct UDPMessage *msg,
                                                       s,
                                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
   }
-  s->valid_until = GNUNET_TIME_absolute_add(GNUNET_TIME_absolute_get(), GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
+  s->valid_until = GNUNET_TIME_relative_to_absolute (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
   if (s->invalidation_task != GNUNET_SCHEDULER_NO_TASK)
   {
     GNUNET_SCHEDULER_cancel(s->invalidation_task);
@@ -938,24 +1056,38 @@ static void
 ack_proc (void *cls, uint32_t id, const struct GNUNET_MessageHeader *msg)
 {
   struct ReceiveContext *rc = cls;
-  size_t msize = sizeof (struct UDPMessage) + ntohs (msg->size);
+
+  size_t msize = sizeof (struct UDP_ACK_Message) + ntohs (msg->size);
   char buf[msize];
-  struct UDPMessage *udp;
+  struct UDP_ACK_Message *udp_ack;
+  uint32_t delay = 0;
+
+  struct Session *s;
+  s = find_inbound_session_by_addr (rc->plugin, rc->src_addr, rc->addr_len);
+  if (s != NULL)
+  {
+    if (s->flow_delay_for_other_peer.rel_value <= UINT32_MAX)
+      delay = s->flow_delay_for_other_peer.rel_value;
+    else
+      delay = UINT32_MAX;
+  }
+
 
 #if DEBUG_UDP
-  LOG (GNUNET_ERROR_TYPE_DEBUG, "Sending ACK to `%s'\n",
+  LOG (GNUNET_ERROR_TYPE_DEBUG, "Sending ACK to `%s' including delay of %u ms\n",
                    GNUNET_a2s (rc->src_addr,
                                (rc->src_addr->sa_family ==
                                 AF_INET) ? sizeof (struct sockaddr_in) :
-                               sizeof (struct sockaddr_in6)));
+                               sizeof (struct sockaddr_in6)),
+                               delay);
 #endif
-  udp = (struct UDPMessage *) buf;
-  udp->header.size = htons ((uint16_t) msize);
-  udp->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_ACK);
-  udp->reserved = htonl (0);
-  udp->sender = *rc->plugin->env->my_identity;
-  memcpy (&udp[1], msg, ntohs (msg->size));
-  (void) udp_send (rc->plugin, rc->src_addr, &udp->header);
+  udp_ack = (struct UDP_ACK_Message *) buf;
+  udp_ack->header.size = htons ((uint16_t) msize);
+  udp_ack->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_ACK);
+  udp_ack->delay = htonl (delay);
+  udp_ack->sender = *rc->plugin->env->my_identity;
+  memcpy (&udp_ack[1], msg, ntohs (msg->size));
+  (void) udp_send (rc->plugin, rc->src_addr, &udp_ack->header);
 }
 
 
@@ -978,6 +1110,8 @@ struct FindReceiveContext
    * Number of bytes in 'addr'.
    */
   socklen_t addr_len;
+
+  struct Session * session;
 };
 
 
@@ -1024,10 +1158,12 @@ udp_read (struct Plugin *plugin, struct GNUNET_NETWORK_Handle *rsock)
   const struct GNUNET_MessageHeader *msg;
   const struct GNUNET_MessageHeader *ack;
   struct Session *peer_session;
-  const struct UDPMessage *udp;
+  const struct UDP_ACK_Message *udp_ack;
   struct ReceiveContext *rc;
   struct GNUNET_TIME_Absolute now;
   struct FindReceiveContext frc;
+  struct Session * s = NULL;
+  struct GNUNET_TIME_Relative flow_delay;
 
   fromlen = sizeof (addr);
   memset (&addr, 0, sizeof (addr));
@@ -1062,20 +1198,26 @@ udp_read (struct Plugin *plugin, struct GNUNET_NETWORK_Handle *rsock)
                          (const struct sockaddr *) addr, fromlen);
     return;
   case GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_ACK:
+
     if (ntohs (msg->size) <
-        sizeof (struct UDPMessage) + sizeof (struct GNUNET_MessageHeader))
+        sizeof (struct UDP_ACK_Message) + sizeof (struct GNUNET_MessageHeader))
     {
       GNUNET_break_op (0);
       return;
     }
-    udp = (const struct UDPMessage *) msg;
-    if (ntohl (udp->reserved) != 0)
+    udp_ack = (const struct UDP_ACK_Message *) msg;
+    s = find_inbound_session(plugin, &udp_ack->sender, addr, fromlen);
+    if (s != NULL)
     {
-      GNUNET_break_op (0);
-      return;
+      flow_delay.rel_value = (uint64_t) ntohl(udp_ack->delay);
+
+      LOG (GNUNET_ERROR_TYPE_DEBUG,
+                  "We received a sending delay of %llu\n", flow_delay.rel_value);
+
+      s->flow_delay_from_other_peer = GNUNET_TIME_relative_to_absolute (flow_delay);
     }
-    ack = (const struct GNUNET_MessageHeader *) &udp[1];
-    if (ntohs (ack->size) != ntohs (msg->size) - sizeof (struct UDPMessage))
+    ack = (const struct GNUNET_MessageHeader *) &udp_ack[1];
+    if (ntohs (ack->size) != ntohs (msg->size) - sizeof (struct UDP_ACK_Message))
     {
       GNUNET_break_op (0);
       return;
@@ -1083,11 +1225,11 @@ udp_read (struct Plugin *plugin, struct GNUNET_NETWORK_Handle *rsock)
 #if DEBUG_UDP
     LOG (GNUNET_ERROR_TYPE_DEBUG,
                 "UDP processes %u-byte acknowledgement from `%s' at `%s'\n",
-                (unsigned int) ntohs (msg->size), GNUNET_i2s (&udp->sender),
+                (unsigned int) ntohs (msg->size), GNUNET_i2s (&udp_ack->sender),
                 GNUNET_a2s ((const struct sockaddr *) addr, fromlen));
 #endif
 
-    peer_session = find_session (plugin, &udp->sender);
+    peer_session = find_session (plugin, &udp_ack->sender);
     if (NULL == peer_session)
     {
 #if DEBUG_UDP
@@ -1100,13 +1242,13 @@ udp_read (struct Plugin *plugin, struct GNUNET_NETWORK_Handle *rsock)
       return;
     GNUNET_assert (GNUNET_OK ==
                    GNUNET_CONTAINER_multihashmap_remove (plugin->sessions,
-                                                         &udp->
+                                                         &udp_ack->
                                                          sender.hashPubKey,
                                                          peer_session));
     plugin->last_expected_delay =
         GNUNET_FRAGMENT_context_destroy (peer_session->frag);
     if (peer_session->cont != NULL)
-      peer_session->cont (peer_session->cont_cls, &udp->sender, GNUNET_OK);
+      peer_session->cont (peer_session->cont_cls, &udp_ack->sender, GNUNET_OK);
     GNUNET_free (peer_session);
     return;
   case GNUNET_MESSAGE_TYPE_FRAGMENT:
@@ -1717,27 +1859,6 @@ libgnunet_plugin_transport_udp_init (void *cls)
   return api;
 }
 
-/*
-
-static void invalidation_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
-{
-  struct Session * s = cls;
-  struct Plugin * plugin = s->plugin;
-
-  s->invalidation_task = GNUNET_SCHEDULER_NO_TASK;
-
-  GNUNET_CONTAINER_multihashmap_remove (plugin->inbound_sessions, &s->target.hashPubKey, s);
-
-
-  plugin->env->session_end (plugin->env->cls, &s->target, s);
-  LOG (GNUNET_ERROR_TYPE_ERROR,
-              "Session %X is now invalid\n", s);
-  destroy_session(s, &s->target.hashPubKey, s);
-}
-*/
-
-
-
 /**
  * Shutdown the plugin.
  *