h_addr is a define in in netdb.h
[oweals/gnunet.git] / src / transport / gnunet-service-transport_manipulation.c
index 76278aa4bfe7be0ff0689ca7a1d3a3bb2825621e..f66d15d824ba0340a4d07671e6ca1a53c1d082f2 100644 (file)
 #include "gnunet-service-transport.h"
 #include "transport.h"
 
-#define DELAY 0
-#define DISTANCE 1
-
-
 enum TRAFFIC_METRIC_DIRECTION
 {
        TM_SEND = 0,
@@ -45,8 +41,6 @@ enum TRAFFIC_METRIC_DIRECTION
        TM_BOTH = 2
 };
 
-struct GST_ManipulationHandle man_handle;
-
 
 /**
  * Struct containing information about manipulations to a specific peer
@@ -147,9 +141,16 @@ struct DelayQueueEntry
 
        /**
         * Peer this entry is belonging to
+        * if (NULL == tmp): enqueued in generic DLL and scheduled by generic_send_delay_task
+        * else: enqueued in tmp->send_head and tmp->send_tail and scheduled by tmp->send_delay_task
         */
        struct TM_Peer *tmp;
 
+       /**
+        * Peer ID
+        */
+       struct GNUNET_PeerIdentity id;
+
        /**
         * Absolute time when to send
         */
@@ -181,6 +182,22 @@ struct DelayQueueEntry
        void *cont_cls;
 };
 
+struct GST_ManipulationHandle man_handle;
+
+/**
+ * DLL head for delayed messages based on general delay
+ */
+struct DelayQueueEntry *generic_dqe_head;
+
+/**
+ * DLL tail for delayed messages based on general delay
+ */
+struct DelayQueueEntry *generic_dqe_tail;
+
+/**
+ * Task to schedule delayed sending based on general delay
+ */
+GNUNET_SCHEDULER_TaskIdentifier generic_send_delay_task;
 
 static void
 set_metric (struct TM_Peer *dest, int direction, uint32_t type, uint32_t value)
@@ -340,18 +357,37 @@ send_delayed (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
        struct DelayQueueEntry *next;
        struct TM_Peer *tmp = dqe->tmp;
        struct GNUNET_TIME_Relative delay;
-       tmp->send_delay_task = GNUNET_SCHEDULER_NO_TASK;
-       GNUNET_CONTAINER_DLL_remove (tmp->send_head, tmp->send_tail, dqe);
-       GST_neighbours_send (&tmp->peer, dqe->msg, dqe->msg_size, dqe->timeout, dqe->cont, dqe->cont_cls);
 
-       next = tmp->send_head;
-       if (NULL != next)
+       if (NULL != tmp)
        {
-                       /* More delayed messages */
-                       delay = GNUNET_TIME_absolute_get_remaining (next->sent_at);
-                       tmp->send_delay_task = GNUNET_SCHEDULER_add_delayed (delay, &send_delayed, dqe);
-       }
+                       GNUNET_break (GNUNET_YES == GST_neighbours_test_connected (&dqe->id));
+                       tmp->send_delay_task = GNUNET_SCHEDULER_NO_TASK;
+                       GNUNET_CONTAINER_DLL_remove (tmp->send_head, tmp->send_tail, dqe);
+                       GST_neighbours_send (&dqe->id, dqe->msg, dqe->msg_size, dqe->timeout, dqe->cont, dqe->cont_cls);
 
+                       next = tmp->send_head;
+                       if (NULL != next)
+                       {
+                               /* More delayed messages */
+                               delay = GNUNET_TIME_absolute_get_remaining (next->sent_at);
+                               tmp->send_delay_task = GNUNET_SCHEDULER_add_delayed (delay, &send_delayed, next);
+                       }
+       }
+       else
+       {
+                       /* Remove from generic queue */
+                       GNUNET_break (GNUNET_YES == GST_neighbours_test_connected (&dqe->id));
+                       generic_send_delay_task = GNUNET_SCHEDULER_NO_TASK;
+                       GNUNET_CONTAINER_DLL_remove (generic_dqe_head, generic_dqe_tail, dqe);
+                       GST_neighbours_send (&dqe->id, dqe->msg, dqe->msg_size, dqe->timeout, dqe->cont, dqe->cont_cls);
+                       next = generic_dqe_head;
+                       if (NULL != next)
+                       {
+                               /* More delayed messages */
+                               delay = GNUNET_TIME_absolute_get_remaining (next->sent_at);
+                               generic_send_delay_task = GNUNET_SCHEDULER_add_delayed (delay, &send_delayed, next);
+                       }
+       }
        GNUNET_free (dqe);
 }
 
@@ -377,13 +413,15 @@ GST_manipulation_send (const struct GNUNET_PeerIdentity *target, const void *msg
 
        if (NULL != (tmp = GNUNET_CONTAINER_multihashmap_get (man_handle.peers, &target->hashPubKey)))
        {
+                       GNUNET_break (GNUNET_YES == GST_neighbours_test_connected(target));
                        /* Manipulate here */
                        /* Delay */
                        if (UINT32_MAX != find_metric(tmp, GNUNET_ATS_QUALITY_NET_DELAY, TM_SEND))
                        {
                                        /* We have a delay */
-                                       delay.rel_value = find_metric(tmp, GNUNET_ATS_QUALITY_NET_DELAY, TM_SEND);
+                                       delay.rel_value_us = find_metric (tmp, GNUNET_ATS_QUALITY_NET_DELAY, TM_SEND);
                                        dqe = GNUNET_malloc (sizeof (struct DelayQueueEntry) + msg_size);
+                                       dqe->id = *target;
                                        dqe->tmp = tmp;
                                        dqe->sent_at = GNUNET_TIME_absolute_add (GNUNET_TIME_absolute_get(), delay);
                                        dqe->cont = cont;
@@ -395,15 +433,21 @@ GST_manipulation_send (const struct GNUNET_PeerIdentity *target, const void *msg
                                        GNUNET_CONTAINER_DLL_insert_tail (tmp->send_head, tmp->send_tail, dqe);
                                        if (GNUNET_SCHEDULER_NO_TASK == tmp->send_delay_task)
                                                tmp->send_delay_task =GNUNET_SCHEDULER_add_delayed (delay, &send_delayed, dqe);
+                                       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                                                       "Delaying %u byte message to peer `%s' with generic delay for %ms\n",
+                                                       msg_size, GNUNET_i2s (target),
+                                                   GNUNET_STRINGS_relative_time_to_string (delay, GNUNET_YES));
                                        return;
                        }
        }
        else if (UINT32_MAX != find_metric (&man_handle.general, GNUNET_ATS_QUALITY_NET_DELAY, TM_SEND))
        {
+                       GNUNET_break (GNUNET_YES == GST_neighbours_test_connected(target));
                        /* We have a delay */
-                       delay.rel_value = find_metric (&man_handle.general, GNUNET_ATS_QUALITY_NET_DELAY, TM_SEND);
+                       delay.rel_value_us = find_metric (&man_handle.general, GNUNET_ATS_QUALITY_NET_DELAY, TM_SEND);
                        dqe = GNUNET_malloc (sizeof (struct DelayQueueEntry) + msg_size);
-                       dqe->tmp = tmp;
+                       dqe->id = *target;
+                       dqe->tmp = NULL;
                        dqe->sent_at = GNUNET_TIME_absolute_add (GNUNET_TIME_absolute_get(), delay);
                        dqe->cont = cont;
                        dqe->cont_cls = cont_cls;
@@ -411,9 +455,15 @@ GST_manipulation_send (const struct GNUNET_PeerIdentity *target, const void *msg
                        dqe->msg_size = msg_size;
                        dqe->timeout = timeout;
                        memcpy (dqe->msg, msg, msg_size);
-                       GNUNET_CONTAINER_DLL_insert_tail (tmp->send_head, tmp->send_tail, dqe);
-                       if (GNUNET_SCHEDULER_NO_TASK == tmp->send_delay_task)
-                               tmp->send_delay_task =GNUNET_SCHEDULER_add_delayed (delay, &send_delayed, dqe);
+                       GNUNET_CONTAINER_DLL_insert_tail (generic_dqe_head, generic_dqe_tail, dqe);
+                       if (GNUNET_SCHEDULER_NO_TASK == generic_send_delay_task)
+                       {
+                               generic_send_delay_task = GNUNET_SCHEDULER_add_delayed (delay, &send_delayed, dqe);
+                       }
+                       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                                   "Delaying %u byte message to peer `%s' with peer specific delay for %s\n",
+                                   msg_size, GNUNET_i2s (target),
+                                   GNUNET_STRINGS_relative_time_to_string (delay, GNUNET_YES));
                        return;
        }
 
@@ -450,7 +500,6 @@ GST_manipulation_manipulate_metrics (const struct GNUNET_PeerIdentity *peer,
        {
                ats_new[d] = ats[d];
                m_tmp = UINT32_MAX;
-               g_tmp = UINT32_MAX;
                if (NULL != tmp)
                        m_tmp = find_metric (tmp, ntohl(ats[d].type), TM_RECEIVE);
                g_tmp = find_metric (&man_handle.general, ntohl(ats[d].type), TM_RECEIVE);
@@ -492,25 +541,31 @@ GST_manipulation_recv (void *cls,
        struct GNUNET_TIME_Relative m_delay;
 
        g_recv_delay = find_metric (&man_handle.general, GNUNET_ATS_QUALITY_NET_DELAY, TM_RECEIVE);
-       if ((g_recv_delay >= GNUNET_TIME_UNIT_ZERO.rel_value) && (UINT32_MAX != g_recv_delay))
-               m_delay.rel_value = g_recv_delay; /* Global delay */
+       if ((g_recv_delay >= GNUNET_TIME_UNIT_ZERO.rel_value_us) && (UINT32_MAX != g_recv_delay))
+         m_delay.rel_value_us = g_recv_delay; /* Global delay */
        else
-               m_delay = GNUNET_TIME_UNIT_ZERO;
+         m_delay = GNUNET_TIME_UNIT_ZERO;
 
        if (NULL != (tmp = GNUNET_CONTAINER_multihashmap_get (man_handle.peers, &peer->hashPubKey)))
        {
-                       /* Manipulate receive delay */
-                       p_recv_delay = find_metric (tmp, GNUNET_ATS_QUALITY_NET_DELAY, TM_RECEIVE);
-                       if (UINT32_MAX != p_recv_delay)
-                                       m_delay.rel_value = p_recv_delay; /* Peer specific delay */
+         /* Manipulate receive delay */
+         p_recv_delay = find_metric (tmp, GNUNET_ATS_QUALITY_NET_DELAY, TM_RECEIVE);
+         if (UINT32_MAX != p_recv_delay)
+           m_delay.rel_value_us = p_recv_delay; /* Peer specific delay */
        }
 
        quota_delay = GST_receive_callback (cls, peer, message,
                        session, sender_address, sender_address_len);
-       if (quota_delay.rel_value > m_delay.rel_value)
-               return quota_delay;
-       else
-               return m_delay;
+
+       if (quota_delay.rel_value_us > m_delay.rel_value_us)
+               m_delay = quota_delay;
+
+       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                   "Delaying next receive for peer `%s' for %s\n",
+                   GNUNET_i2s (peer),
+                   GNUNET_STRINGS_relative_time_to_string (m_delay, GNUNET_YES));
+       return m_delay;
+
 }
 
 
@@ -522,42 +577,62 @@ GST_manipulation_recv (void *cls,
 void
 GST_manipulation_init (const struct GNUNET_CONFIGURATION_Handle *GST_cfg)
 {
-       unsigned long long tmp;
-
-       if (GNUNET_OK == GNUNET_CONFIGURATION_get_value_number (GST_cfg,
-                       "transport", "MANIPULATE_DISTANCE_IN", &tmp))
-       {
-               GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Setting inbound distance_in to %u\n",
-                               (unsigned long long) tmp);
-               set_metric (&man_handle.general, TM_RECEIVE, GNUNET_ATS_QUALITY_NET_DISTANCE, tmp);
-       }
-
-       if (GNUNET_OK == GNUNET_CONFIGURATION_get_value_number (GST_cfg,
-                       "transport", "MANIPULATE_DISTANCE_OUT", &tmp))
-       {
-               GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Setting outbound distance_in to %u\n",
-                               (unsigned long long) tmp);
-               set_metric (&man_handle.general, TM_SEND, GNUNET_ATS_QUALITY_NET_DISTANCE, tmp);
-       }
-
-       if (GNUNET_OK == GNUNET_CONFIGURATION_get_value_number (GST_cfg,
-                       "transport", "MANIPULATE_DELAY_IN", &tmp))
-       {
-               GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Delaying inbound traffic for %llu ms\n",
-                               (unsigned long long) tmp);
-               set_metric (&man_handle.general, TM_RECEIVE, GNUNET_ATS_QUALITY_NET_DELAY, tmp);
-       }
-
-
-       if (GNUNET_OK == GNUNET_CONFIGURATION_get_value_number (GST_cfg,
-                       "transport", "MANIPULATE_DELAY_OUT", &tmp))
-       {
-               GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Delaying outbound traffic for %llu ms\n",
-                               (unsigned long long) tmp);
-               set_metric (&man_handle.general, TM_SEND, GNUNET_ATS_QUALITY_NET_DELAY, tmp);
-       }
-
-       man_handle.peers = GNUNET_CONTAINER_multihashmap_create (10, GNUNET_NO);
+  unsigned long long tmp;
+  struct GNUNET_TIME_Relative delay;
+  
+  if ( (GNUNET_OK == GNUNET_CONFIGURATION_get_value_number (GST_cfg,
+                                                         "transport",
+                                                         "MANIPULATE_DISTANCE_IN",
+                                                           &tmp)) && 
+       (tmp > 0) )
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_INFO, 
+               "Setting inbound distance_in to %llu\n",
+               (unsigned long long) tmp);
+    set_metric (&man_handle.general, TM_RECEIVE, GNUNET_ATS_QUALITY_NET_DISTANCE, tmp);
+  }
+       
+  if ( (GNUNET_OK == GNUNET_CONFIGURATION_get_value_number (GST_cfg,
+                                                           "transport", 
+                                                           "MANIPULATE_DISTANCE_OUT",
+                                                           &tmp)) &&
+       (tmp > 0) )
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_INFO, 
+               "Setting outbound distance_in to %llu\n",
+               (unsigned long long) tmp);
+    set_metric (&man_handle.general, TM_SEND, 
+               GNUNET_ATS_QUALITY_NET_DISTANCE, tmp);
+  }
+  
+  if ( (GNUNET_OK == GNUNET_CONFIGURATION_get_value_time (GST_cfg,
+                                                         "transport",
+                                                         "MANIPULATE_DELAY_IN",
+                                                         &delay)) && 
+       (delay.rel_value_us > 0) )
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+               "Delaying inbound traffic for %s\n",
+               GNUNET_STRINGS_relative_time_to_string (delay, GNUNET_YES));
+    set_metric (&man_handle.general, TM_RECEIVE,
+               GNUNET_ATS_QUALITY_NET_DELAY, 
+               delay.rel_value_us);
+  }
+  if ( (GNUNET_OK == GNUNET_CONFIGURATION_get_value_time (GST_cfg,
+                                                         "transport",
+                                                         "MANIPULATE_DELAY_OUT",
+                                                         &delay)) && 
+       (delay.rel_value_us > 0) )
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_INFO, 
+               "Delaying outbound traffic for %s\n",
+               GNUNET_STRINGS_relative_time_to_string (delay, GNUNET_YES));
+    set_metric (&man_handle.general, 
+               TM_SEND,
+               GNUNET_ATS_QUALITY_NET_DELAY, 
+               delay.rel_value_us);
+  }  
+  man_handle.peers = GNUNET_CONTAINER_multihashmap_create (10, GNUNET_NO);
 }
 
 
@@ -566,28 +641,83 @@ free_tmps (void *cls,
           const struct GNUNET_HashCode * key,
           void *value)
 {
+  struct DelayQueueEntry *dqe;
+  struct DelayQueueEntry *next;
+
+  if (NULL != value)
+  {
+    struct TM_Peer *tmp = (struct TM_Peer *) value;
+
+    if (GNUNET_YES != GNUNET_CONTAINER_multihashmap_remove (man_handle.peers, key, value))
+      GNUNET_break (0);
+    free_metric (tmp);
+    next = tmp->send_head;
+    while (NULL != (dqe = next))
+      {
+       next = dqe->next;
+       GNUNET_CONTAINER_DLL_remove (tmp->send_head, tmp->send_tail, dqe);
+       if (NULL != dqe->cont)
+         dqe->cont (dqe->cont_cls, GNUNET_SYSERR, dqe->msg_size, 0);
+       GNUNET_free (dqe);
+      }
+    if (GNUNET_SCHEDULER_NO_TASK != tmp->send_delay_task)
+      {
+       GNUNET_SCHEDULER_cancel (tmp->send_delay_task);
+       tmp->send_delay_task = GNUNET_SCHEDULER_NO_TASK;
+      }
+    GNUNET_free (tmp);
+  }
+  return GNUNET_OK;
+}
+
+
+/**
+ * Notify manipulation about disconnect so it can discard queued messages
+ *
+ * @param peer the disconnecting peer
+ */
+void
+GST_manipulation_peer_disconnect (const struct GNUNET_PeerIdentity *peer)
+{
+       struct TM_Peer *tmp;
        struct DelayQueueEntry *dqe;
        struct DelayQueueEntry *next;
-       if (NULL != value)
+
+       if (NULL != (tmp = GNUNET_CONTAINER_multihashmap_get (man_handle.peers, &peer->hashPubKey)))
        {
-                       struct TM_Peer *tmp = (struct TM_Peer *) value;
-                       GNUNET_CONTAINER_multihashmap_remove (man_handle.peers, key, value);
-                       free_metric (tmp);
                        next = tmp->send_head;
                        while (NULL != (dqe = next))
                        {
                                        next = dqe->next;
                                        GNUNET_CONTAINER_DLL_remove (tmp->send_head, tmp->send_tail, dqe);
+                                       if (NULL != dqe->cont)
+                                                       dqe->cont (dqe->cont_cls, GNUNET_SYSERR, dqe->msg_size, 0);
                                        GNUNET_free (dqe);
                        }
-                       if (GNUNET_SCHEDULER_NO_TASK != tmp->send_delay_task)
+       }
+       else if (UINT32_MAX != find_metric (&man_handle.general, GNUNET_ATS_QUALITY_NET_DELAY, TM_SEND))
+       {
+                       next = generic_dqe_head;
+                       while (NULL != (dqe = next))
+                       {
+                                       next = dqe->next;
+                                       if (0 == memcmp (peer, &dqe->id, sizeof (dqe->id)))
+                                       {
+                                                       GNUNET_CONTAINER_DLL_remove (generic_dqe_head, generic_dqe_tail, dqe);
+                                                       if (NULL != dqe->cont)
+                                                               dqe->cont (dqe->cont_cls, GNUNET_SYSERR, dqe->msg_size, 0);
+                                                       GNUNET_free (dqe);
+                                       }
+                       }
+                       if (GNUNET_SCHEDULER_NO_TASK != generic_send_delay_task)
                        {
-                                       GNUNET_SCHEDULER_cancel (tmp->send_delay_task);
-                                       tmp->send_delay_task = GNUNET_SCHEDULER_NO_TASK;
+                                       GNUNET_SCHEDULER_cancel (generic_send_delay_task);
+                                       if (NULL != generic_dqe_head)
+                                               generic_send_delay_task = GNUNET_SCHEDULER_add_delayed (
+                                                               GNUNET_TIME_absolute_get_remaining(generic_dqe_head->sent_at),
+                                                               &send_delayed, generic_dqe_head);
                        }
-                       GNUNET_free (tmp);
        }
-       return GNUNET_OK;
 }
 
 
@@ -597,9 +727,26 @@ free_tmps (void *cls,
 void
 GST_manipulation_stop ()
 {
+       struct DelayQueueEntry *cur;
+       struct DelayQueueEntry *next;
        GNUNET_CONTAINER_multihashmap_iterate (man_handle.peers, &free_tmps,NULL);
-
        GNUNET_CONTAINER_multihashmap_destroy (man_handle.peers);
+
+       next = generic_dqe_head;
+       while (NULL != (cur = next))
+       {
+                       next = cur->next;
+                       GNUNET_CONTAINER_DLL_remove (generic_dqe_head, generic_dqe_tail, cur);
+                       if (NULL != cur->cont)
+                               cur->cont (cur->cont_cls, GNUNET_SYSERR, cur->msg_size, 0);
+                       GNUNET_free (cur);
+       }
+       if (GNUNET_SCHEDULER_NO_TASK != generic_send_delay_task)
+       {
+                       GNUNET_SCHEDULER_cancel (generic_send_delay_task);
+                       generic_send_delay_task = GNUNET_SCHEDULER_NO_TASK;
+       }
+
        free_metric (&man_handle.general);
        man_handle.peers = NULL;
 }