#include "gnunet-service-transport.h"
#include "transport.h"
-#define DELAY 0
-#define DISTANCE 1
-
-
enum TRAFFIC_METRIC_DIRECTION
{
TM_SEND = 0,
TM_BOTH = 2
};
-struct GST_ManipulationHandle man_handle;
-
-
/**
* Struct containing information about manipulations to a specific peer
/**
* Peer this entry is belonging to
+ * if (NULL == tmp): enqueued in generic DLL and scheduled by generic_send_delay_task
+ * else: enqueued in tmp->send_head and tmp->send_tail and scheduled by tmp->send_delay_task
*/
struct TM_Peer *tmp;
void *cont_cls;
};
+struct GST_ManipulationHandle man_handle;
+/**
+ * DLL head for delayed messages based on general delay
+ */
struct DelayQueueEntry *generic_dqe_head;
+
+/**
+ * DLL tail for delayed messages based on general delay
+ */
struct DelayQueueEntry *generic_dqe_tail;
/**
- * Task to schedule delayed sendding
+ * Task to schedule delayed sending based on general delay
*/
GNUNET_SCHEDULER_TaskIdentifier generic_send_delay_task;
if (NULL != tmp)
{
+ GNUNET_break (GNUNET_YES == GST_neighbours_test_connected (&dqe->id));
tmp->send_delay_task = GNUNET_SCHEDULER_NO_TASK;
GNUNET_CONTAINER_DLL_remove (tmp->send_head, tmp->send_tail, dqe);
GST_neighbours_send (&dqe->id, dqe->msg, dqe->msg_size, dqe->timeout, dqe->cont, dqe->cont_cls);
{
/* More delayed messages */
delay = GNUNET_TIME_absolute_get_remaining (next->sent_at);
- tmp->send_delay_task = GNUNET_SCHEDULER_add_delayed (delay, &send_delayed, dqe);
+ tmp->send_delay_task = GNUNET_SCHEDULER_add_delayed (delay, &send_delayed, next);
}
}
else
{
- /* Remove from generic queue */
+ /* Remove from generic queue */
+ GNUNET_break (GNUNET_YES == GST_neighbours_test_connected (&dqe->id));
generic_send_delay_task = GNUNET_SCHEDULER_NO_TASK;
GNUNET_CONTAINER_DLL_remove (generic_dqe_head, generic_dqe_tail, dqe);
GST_neighbours_send (&dqe->id, dqe->msg, dqe->msg_size, dqe->timeout, dqe->cont, dqe->cont_cls);
{
/* More delayed messages */
delay = GNUNET_TIME_absolute_get_remaining (next->sent_at);
- generic_send_delay_task = GNUNET_SCHEDULER_add_delayed (delay, &send_delayed, dqe);
+ generic_send_delay_task = GNUNET_SCHEDULER_add_delayed (delay, &send_delayed, next);
}
}
-
GNUNET_free (dqe);
}
if (NULL != (tmp = GNUNET_CONTAINER_multihashmap_get (man_handle.peers, &target->hashPubKey)))
{
+ GNUNET_break (GNUNET_YES == GST_neighbours_test_connected(target));
/* Manipulate here */
/* Delay */
if (UINT32_MAX != find_metric(tmp, GNUNET_ATS_QUALITY_NET_DELAY, TM_SEND))
GNUNET_CONTAINER_DLL_insert_tail (tmp->send_head, tmp->send_tail, dqe);
if (GNUNET_SCHEDULER_NO_TASK == tmp->send_delay_task)
tmp->send_delay_task =GNUNET_SCHEDULER_add_delayed (delay, &send_delayed, dqe);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Delaying %u byte message to peer `%s' with generic delay for %llu ms\n",
+ msg_size, GNUNET_i2s (target), (long long unsigned int) delay.rel_value);
return;
}
}
else if (UINT32_MAX != find_metric (&man_handle.general, GNUNET_ATS_QUALITY_NET_DELAY, TM_SEND))
{
+ GNUNET_break (GNUNET_YES == GST_neighbours_test_connected(target));
/* We have a delay */
delay.rel_value = find_metric (&man_handle.general, GNUNET_ATS_QUALITY_NET_DELAY, TM_SEND);
dqe = GNUNET_malloc (sizeof (struct DelayQueueEntry) + msg_size);
memcpy (dqe->msg, msg, msg_size);
GNUNET_CONTAINER_DLL_insert_tail (generic_dqe_head, generic_dqe_tail, dqe);
if (GNUNET_SCHEDULER_NO_TASK == generic_send_delay_task)
+ {
generic_send_delay_task = GNUNET_SCHEDULER_add_delayed (delay, &send_delayed, dqe);
+ }
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Delaying %u byte message to peer `%s' with peer specific delay for %llu ms\n",
+ msg_size, GNUNET_i2s (target), (long long unsigned int) delay.rel_value);
return;
}
quota_delay = GST_receive_callback (cls, peer, message,
session, sender_address, sender_address_len);
+
if (quota_delay.rel_value > m_delay.rel_value)
- return quota_delay;
- else
- return m_delay;
+ m_delay = quota_delay;
+
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Delaying next receive for peer `%s' for %llu ms\n",
+ GNUNET_i2s (peer), (long long unsigned int) m_delay.rel_value);
+ return m_delay;
+
}
unsigned long long tmp;
if (GNUNET_OK == GNUNET_CONFIGURATION_get_value_number (GST_cfg,
- "transport", "MANIPULATE_DISTANCE_IN", &tmp))
+ "transport", "MANIPULATE_DISTANCE_IN", &tmp) && (tmp > 0))
{
GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Setting inbound distance_in to %u\n",
(unsigned long long) tmp);
}
if (GNUNET_OK == GNUNET_CONFIGURATION_get_value_number (GST_cfg,
- "transport", "MANIPULATE_DISTANCE_OUT", &tmp))
+ "transport", "MANIPULATE_DISTANCE_OUT", &tmp) && (tmp > 0))
{
GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Setting outbound distance_in to %u\n",
(unsigned long long) tmp);
}
if (GNUNET_OK == GNUNET_CONFIGURATION_get_value_number (GST_cfg,
- "transport", "MANIPULATE_DELAY_IN", &tmp))
+ "transport", "MANIPULATE_DELAY_IN", &tmp) && (tmp > 0))
{
GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Delaying inbound traffic for %llu ms\n",
(unsigned long long) tmp);
if (GNUNET_OK == GNUNET_CONFIGURATION_get_value_number (GST_cfg,
- "transport", "MANIPULATE_DELAY_OUT", &tmp))
+ "transport", "MANIPULATE_DELAY_OUT", &tmp) && (tmp > 0))
{
GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Delaying outbound traffic for %llu ms\n",
(unsigned long long) tmp);
if (NULL != value)
{
struct TM_Peer *tmp = (struct TM_Peer *) value;
- GNUNET_CONTAINER_multihashmap_remove (man_handle.peers, key, value);
+ if (GNUNET_YES != GNUNET_CONTAINER_multihashmap_remove (man_handle.peers, key, value))
+ GNUNET_break (0);
free_metric (tmp);
next = tmp->send_head;
while (NULL != (dqe = next))
{
next = dqe->next;
GNUNET_CONTAINER_DLL_remove (tmp->send_head, tmp->send_tail, dqe);
+ if (NULL != dqe->cont)
+ dqe->cont (dqe->cont_cls, GNUNET_SYSERR, dqe->msg_size, 0);
GNUNET_free (dqe);
}
if (GNUNET_SCHEDULER_NO_TASK != tmp->send_delay_task)
return GNUNET_OK;
}
+/**
+ * Notify manipulation about disconnect so it can discard queued messages
+ *
+ * @param peer the disconnecting peer
+ */
+void
+GST_manipulation_peer_disconnect (const struct GNUNET_PeerIdentity *peer)
+{
+ struct TM_Peer *tmp;
+ struct DelayQueueEntry *dqe;
+ struct DelayQueueEntry *next;
+
+ if (NULL != (tmp = GNUNET_CONTAINER_multihashmap_get (man_handle.peers, &peer->hashPubKey)))
+ {
+ next = tmp->send_head;
+ while (NULL != (dqe = next))
+ {
+ next = dqe->next;
+ GNUNET_CONTAINER_DLL_remove (tmp->send_head, tmp->send_tail, dqe);
+ if (NULL != dqe->cont)
+ dqe->cont (dqe->cont_cls, GNUNET_SYSERR, dqe->msg_size, 0);
+ GNUNET_free (dqe);
+ }
+ }
+ else if (UINT32_MAX != find_metric (&man_handle.general, GNUNET_ATS_QUALITY_NET_DELAY, TM_SEND))
+ {
+ next = generic_dqe_head;
+ while (NULL != (dqe = next))
+ {
+ next = dqe->next;
+ if (0 == memcmp (peer, &dqe->id, sizeof (dqe->id)))
+ {
+ GNUNET_CONTAINER_DLL_remove (generic_dqe_head, generic_dqe_tail, dqe);
+ if (NULL != dqe->cont)
+ dqe->cont (dqe->cont_cls, GNUNET_SYSERR, dqe->msg_size, 0);
+ GNUNET_free (dqe);
+ }
+ }
+ if (GNUNET_SCHEDULER_NO_TASK != generic_send_delay_task)
+ {
+ GNUNET_SCHEDULER_cancel (generic_send_delay_task);
+ generic_send_delay_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_absolute_get_remaining(generic_dqe_head->sent_at), &send_delayed, generic_dqe_head);
+ }
+ }
+}
+
/**
* Stop traffic manipulation
{
next = cur->next;
GNUNET_CONTAINER_DLL_remove (generic_dqe_head, generic_dqe_tail, cur);
+ if (NULL != cur->cont)
+ cur->cont (cur->cont_cls, GNUNET_SYSERR, cur->msg_size, 0);
GNUNET_free (cur);
}
if (GNUNET_SCHEDULER_NO_TASK != generic_send_delay_task)