/*
This file is part of GNUnet.
- (C) 2010,2011 Christian Grothoff (and other contributing authors)
+ (C) 2010-2013 Christian Grothoff (and other contributing authors)
GNUnet is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published
#include "gnunet-service-transport.h"
#include "transport.h"
-#define DELAY 0
-#define DISTANCE 1
+enum TRAFFIC_METRIC_DIRECTION
+{
+ TM_SEND = 0,
+ TM_RECEIVE = 1,
+ TM_BOTH = 2
+};
-struct GST_ManipulationHandle man_handle;
+/**
+ * Struct containing information about manipulations to a specific peer
+ */
+struct TM_Peer;
-struct GST_ManipulationHandle
+/**
+ * Manipulation entry
+ */
+struct PropManipulationEntry
{
- struct GNUNET_CONTAINER_MultiHashMap *peers;
+ /**
+ * Next in DLL
+ */
+ struct PropManipulationEntry *next;
+
+ /**
+ * Previous in DLL
+ */
+ struct PropManipulationEntry *prev;
/**
- * General inbound delay
+ * ATS type in HBO
*/
- struct GNUNET_TIME_Relative delay_in;
+ uint32_t type;
+
+ /**
+ * Value in HBO
+ */
+ uint32_t metrics[TM_BOTH];
+
+};
+
+/**
+ * Struct containing information about manipulations to a specific peer
+ */
+struct TM_Peer
+{
+ /**
+ * Peer ID
+ */
+ struct GNUNET_PeerIdentity peer;
+
+ struct PropManipulationEntry *head;
+ struct PropManipulationEntry *tail;
/**
- * General outbound delay
+ * Peer specific manipulation metrics
*/
- struct GNUNET_TIME_Relative delay_out;
+ uint32_t metrics [TM_BOTH][GNUNET_ATS_QualityPropertiesCount];
/**
- * General inbound distance
+ * Task to schedule delayed sendding
*/
- unsigned long long distance_in;
+ GNUNET_SCHEDULER_TaskIdentifier send_delay_task;
/**
- * General outbound distance
+ * Send queue DLL head
*/
- unsigned long long distance_out;
+ struct DelayQueueEntry *send_head;
+ /**
+ * Send queue DLL tail
+ */
+ struct DelayQueueEntry *send_tail;
};
-struct TM_Peer;
+struct GST_ManipulationHandle
+{
+ /**
+ * Hashmap contain all peers currently manipulated
+ */
+ struct GNUNET_CONTAINER_MultiHashMap *peers;
+
+ /**
+ * Peer containing information for general manipulation
+ */
+ struct TM_Peer general;
+};
+/**
+ * Entry in the delay queue for an outbound delayed message
+ */
struct DelayQueueEntry
{
+ /**
+ * Next in DLL
+ */
struct DelayQueueEntry *prev;
+
+ /**
+ * Previous in DLL
+ */
struct DelayQueueEntry *next;
+
+ /**
+ * Peer this entry is belonging to
+ * if (NULL == tmp): enqueued in generic DLL and scheduled by generic_send_delay_task
+ * else: enqueued in tmp->send_head and tmp->send_tail and scheduled by tmp->send_delay_task
+ */
struct TM_Peer *tmp;
+
+ /**
+ * Peer ID
+ */
+ struct GNUNET_PeerIdentity id;
+
+ /**
+ * Absolute time when to send
+ */
struct GNUNET_TIME_Absolute sent_at;
+
+ /**
+ * The message
+ */
void *msg;
+
+ /**
+ * The message size
+ */
size_t msg_size;
+
+ /**
+ * Message timeout
+ */
struct GNUNET_TIME_Relative timeout;
+
+ /**
+ * Transports send continuation
+ */
GST_NeighbourSendContinuation cont;
+
+ /**
+ * Transports send continuation cls
+ */
void *cont_cls;
};
-struct TM_Peer
-{
- struct GNUNET_PeerIdentity peer;
- uint32_t metrics [TM_BOTH][GNUNET_ATS_QualityPropertiesCount];
- GNUNET_SCHEDULER_TaskIdentifier send_delay_task;
- struct DelayQueueEntry *send_head;
- struct DelayQueueEntry *send_tail;
-};
+struct GST_ManipulationHandle man_handle;
+
+/**
+ * DLL head for delayed messages based on general delay
+ */
+struct DelayQueueEntry *generic_dqe_head;
+/**
+ * DLL tail for delayed messages based on general delay
+ */
+struct DelayQueueEntry *generic_dqe_tail;
+/**
+ * Task to schedule delayed sending based on general delay
+ */
+GNUNET_SCHEDULER_TaskIdentifier generic_send_delay_task;
static void
-set_delay(struct TM_Peer *tmp, struct GNUNET_PeerIdentity *peer, int direction, uint32_t value)
+set_metric (struct TM_Peer *dest, int direction, uint32_t type, uint32_t value)
{
- uint32_t val;
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Set traffic metrics %s for peer `%s' in direction %s to %u\n",
- "DELAY", GNUNET_i2s(peer),
- (TM_BOTH == direction) ? "BOTH" : (TM_SEND == direction) ? "SEND": "RECEIVE", value);
-
- if (UINT32_MAX == value)
- val = UINT32_MAX - 1; /* prevent overflow */
- else if (0 == value)
- val = UINT32_MAX; /* disable */
- else
- val = value;
+ struct PropManipulationEntry *cur;
+ for (cur = dest->head; NULL != cur; cur = cur->next)
+ {
+ if (cur->type == type)
+ break;
+ }
+ if (NULL == cur)
+ {
+ cur = GNUNET_malloc (sizeof (struct PropManipulationEntry));
+ GNUNET_CONTAINER_DLL_insert (dest->head, dest->tail, cur);
+ cur->type = type;
+ cur->metrics[TM_SEND] = UINT32_MAX;
+ cur->metrics[TM_RECEIVE] = UINT32_MAX;
+ }
+
switch (direction) {
case TM_BOTH:
- tmp->metrics[TM_SEND][DELAY] = val;
- tmp->metrics[TM_RECEIVE][DELAY] = val;
+ cur->metrics[TM_SEND] = value;
+ cur->metrics[TM_RECEIVE] = value;
break;
case TM_SEND:
- tmp->metrics[TM_SEND][DELAY] = val;
+ cur->metrics[TM_SEND] = value;
break;
case TM_RECEIVE:
- tmp->metrics[TM_RECEIVE][DELAY] = val;
+ cur->metrics[TM_RECEIVE] = value;
break;
default:
break;
}
+static uint32_t
+find_metric (struct TM_Peer *dest, uint32_t type, int direction)
+{
+ struct PropManipulationEntry *cur;
+
+ for (cur = dest->head; NULL != cur; cur = cur->next)
+ {
+ if (cur->type == type)
+ return cur->metrics[direction];
+
+ }
+ return UINT32_MAX;
+}
+
+/**
+ * Clean up metrics for a peer
+ */
+
static void
-set_distance (struct TM_Peer *tmp, struct GNUNET_PeerIdentity *peer, int direction, uint32_t value)
+free_metric (struct TM_Peer *dest)
{
- uint32_t val;
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Set traffic metrics %s for peer `%s' in direction %s to %u\n",
- "DISTANCE", GNUNET_i2s(peer),
- (TM_BOTH == direction) ? "BOTH" : (TM_SEND == direction) ? "SEND": "RECEIVE", value);
-
- if (UINT32_MAX == value)
- val = UINT32_MAX - 1; /* prevent overflow */
- else if (0 == value)
- val = UINT32_MAX; /* disable */
- else
- val = value;
+ struct PropManipulationEntry *cur;
+ struct PropManipulationEntry *next;
- switch (direction) {
- case TM_BOTH:
- tmp->metrics[TM_SEND][DISTANCE] = val;
- tmp->metrics[TM_RECEIVE][DISTANCE] = val;
- break;
- case TM_SEND:
- tmp->metrics[TM_SEND][DISTANCE] = val;
- break;
- case TM_RECEIVE:
- tmp->metrics[TM_RECEIVE][DISTANCE] = val;
- break;
- default:
- break;
+ for (cur = dest->head; NULL != cur; cur = next)
+ {
+ next = cur->next;
+ GNUNET_CONTAINER_DLL_remove (dest->head, dest->tail, cur);
+ GNUNET_free (cur);
}
}
+
+/**
+ * Set traffic metric to manipulate
+ *
+ * @param cls closure
+ * @param client client sending message
+ * @param message containing information
+ */
void
GST_manipulation_set_metric (void *cls, struct GNUNET_SERVER_Client *client,
const struct GNUNET_MessageHeader *message)
if (0 == ntohs (tm->ats_count))
GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
+ direction = TM_BOTH;
+ switch (ntohs(tm->direction)) {
+ case 1:
+ direction = TM_SEND;
+ break;
+ case 2:
+ direction = TM_RECEIVE;
+ break;
+ case 3:
+ direction = TM_BOTH;
+ break;
+ default:
+ break;
+ }
+
memset (&dummy, '\0', sizeof (struct GNUNET_PeerIdentity));
if (0 == memcmp (&tm->peer, &dummy, sizeof (struct GNUNET_PeerIdentity)))
{
{
type = htonl (ats[c].type);
value = htonl (ats[c].value);
- direction = ntohs (tm->direction);
- switch (type) {
- case GNUNET_ATS_QUALITY_NET_DELAY:
-
- if ((TM_RECEIVE == direction) || (TM_BOTH == direction))
- man_handle.delay_in.rel_value = value;
- if ((TM_SEND == direction) || (TM_BOTH == direction))
- man_handle.delay_out.rel_value = value;
- break;
- case GNUNET_ATS_QUALITY_NET_DISTANCE:
-
- break;
- default:
- break;
- }
-
+ set_metric (&man_handle.general, direction, type, value);
}
return;
}
{
type = htonl (ats[c].type);
value = htonl (ats[c].value);
- switch (type) {
- case GNUNET_ATS_QUALITY_NET_DELAY:
- set_delay (tmp, &tm->peer, ntohs (tm->direction), value);
- break;
- case GNUNET_ATS_QUALITY_NET_DISTANCE:
- set_distance (tmp, &tm->peer, ntohs (tm->direction), value);
- break;
- default:
- break;
- }
+ set_metric (tmp, direction, type, value);
}
GNUNET_SERVER_receive_done (client, GNUNET_OK);
struct DelayQueueEntry *next;
struct TM_Peer *tmp = dqe->tmp;
struct GNUNET_TIME_Relative delay;
- tmp->send_delay_task = GNUNET_SCHEDULER_NO_TASK;
- GNUNET_CONTAINER_DLL_remove (tmp->send_head, tmp->send_tail, dqe);
- GST_neighbours_send (&tmp->peer, dqe->msg, dqe->msg_size, dqe->timeout, dqe->cont, dqe->cont_cls);
- next = tmp->send_head;
- if (NULL != next)
+ if (NULL != tmp)
{
- /* More delayed messages */
- delay = GNUNET_TIME_absolute_get_remaining (next->sent_at);
- tmp->send_delay_task = GNUNET_SCHEDULER_add_delayed (delay, &send_delayed, dqe);
- }
+ GNUNET_break (GNUNET_YES == GST_neighbours_test_connected (&dqe->id));
+ tmp->send_delay_task = GNUNET_SCHEDULER_NO_TASK;
+ GNUNET_CONTAINER_DLL_remove (tmp->send_head, tmp->send_tail, dqe);
+ GST_neighbours_send (&dqe->id, dqe->msg, dqe->msg_size, dqe->timeout, dqe->cont, dqe->cont_cls);
+ next = tmp->send_head;
+ if (NULL != next)
+ {
+ /* More delayed messages */
+ delay = GNUNET_TIME_absolute_get_remaining (next->sent_at);
+ tmp->send_delay_task = GNUNET_SCHEDULER_add_delayed (delay, &send_delayed, next);
+ }
+ }
+ else
+ {
+ /* Remove from generic queue */
+ GNUNET_break (GNUNET_YES == GST_neighbours_test_connected (&dqe->id));
+ generic_send_delay_task = GNUNET_SCHEDULER_NO_TASK;
+ GNUNET_CONTAINER_DLL_remove (generic_dqe_head, generic_dqe_tail, dqe);
+ GST_neighbours_send (&dqe->id, dqe->msg, dqe->msg_size, dqe->timeout, dqe->cont, dqe->cont_cls);
+ next = generic_dqe_head;
+ if (NULL != next)
+ {
+ /* More delayed messages */
+ delay = GNUNET_TIME_absolute_get_remaining (next->sent_at);
+ generic_send_delay_task = GNUNET_SCHEDULER_add_delayed (delay, &send_delayed, next);
+ }
+ }
GNUNET_free (dqe);
}
+
+/**
+ * Adapter function between transport's send function and transport plugins
+ *
+ * @param target the peer the message to send to
+ * @param msg the message received
+ * @param msg_size message size
+ * @param timeout timeout
+ * @param cont the continuation to call after sending
+ * @param cont_cls cls for continuation
+ */
void
GST_manipulation_send (const struct GNUNET_PeerIdentity *target, const void *msg,
size_t msg_size, struct GNUNET_TIME_Relative timeout,
if (NULL != (tmp = GNUNET_CONTAINER_multihashmap_get (man_handle.peers, &target->hashPubKey)))
{
+ GNUNET_break (GNUNET_YES == GST_neighbours_test_connected(target));
/* Manipulate here */
/* Delay */
- if (UINT32_MAX != tmp->metrics[TM_SEND][DELAY])
+ if (UINT32_MAX != find_metric(tmp, GNUNET_ATS_QUALITY_NET_DELAY, TM_SEND))
{
/* We have a delay */
- delay.rel_value = tmp->metrics[TM_SEND][DELAY];
+ delay.rel_value = find_metric(tmp, GNUNET_ATS_QUALITY_NET_DELAY, TM_SEND);
dqe = GNUNET_malloc (sizeof (struct DelayQueueEntry) + msg_size);
+ dqe->id = *target;
dqe->tmp = tmp;
dqe->sent_at = GNUNET_TIME_absolute_add (GNUNET_TIME_absolute_get(), delay);
dqe->cont = cont;
GNUNET_CONTAINER_DLL_insert_tail (tmp->send_head, tmp->send_tail, dqe);
if (GNUNET_SCHEDULER_NO_TASK == tmp->send_delay_task)
tmp->send_delay_task =GNUNET_SCHEDULER_add_delayed (delay, &send_delayed, dqe);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Delaying %u byte message to peer `%s' with generic delay for %llu ms\n",
+ msg_size, GNUNET_i2s (target), (long long unsigned int) delay.rel_value);
return;
}
}
- else if (man_handle.delay_out.rel_value != 0)
+ else if (UINT32_MAX != find_metric (&man_handle.general, GNUNET_ATS_QUALITY_NET_DELAY, TM_SEND))
{
+ GNUNET_break (GNUNET_YES == GST_neighbours_test_connected(target));
/* We have a delay */
- delay = man_handle.delay_out;
+ delay.rel_value = find_metric (&man_handle.general, GNUNET_ATS_QUALITY_NET_DELAY, TM_SEND);
dqe = GNUNET_malloc (sizeof (struct DelayQueueEntry) + msg_size);
- dqe->tmp = tmp;
+ dqe->id = *target;
+ dqe->tmp = NULL;
dqe->sent_at = GNUNET_TIME_absolute_add (GNUNET_TIME_absolute_get(), delay);
dqe->cont = cont;
dqe->cont_cls = cont_cls;
dqe->msg_size = msg_size;
dqe->timeout = timeout;
memcpy (dqe->msg, msg, msg_size);
- GNUNET_CONTAINER_DLL_insert_tail (tmp->send_head, tmp->send_tail, dqe);
- if (GNUNET_SCHEDULER_NO_TASK == tmp->send_delay_task)
- tmp->send_delay_task =GNUNET_SCHEDULER_add_delayed (delay, &send_delayed, dqe);
+ GNUNET_CONTAINER_DLL_insert_tail (generic_dqe_head, generic_dqe_tail, dqe);
+ if (GNUNET_SCHEDULER_NO_TASK == generic_send_delay_task)
+ {
+ generic_send_delay_task = GNUNET_SCHEDULER_add_delayed (delay, &send_delayed, dqe);
+ }
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Delaying %u byte message to peer `%s' with peer specific delay for %llu ms\n",
+ msg_size, GNUNET_i2s (target), (long long unsigned int) delay.rel_value);
return;
}
GST_neighbours_send (target, msg, msg_size, timeout, cont, cont_cls);
}
+
+/**
+ * Function that will be called to manipulate ATS information according to
+ * current manipulation settings
+ *
+ * @param peer the peer
+ * @param address binary address
+ * @param session the session
+ * @param ats the ats information
+ * @param ats_count the number of ats information
+ */
+struct GNUNET_ATS_Information *
+GST_manipulation_manipulate_metrics (const struct GNUNET_PeerIdentity *peer,
+ const struct GNUNET_HELLO_Address *address,
+ struct Session *session,
+ const struct GNUNET_ATS_Information *ats,
+ uint32_t ats_count)
+{
+ struct GNUNET_ATS_Information *ats_new = GNUNET_malloc (sizeof (struct GNUNET_ATS_Information) *ats_count);
+ struct TM_Peer *tmp;
+ uint32_t m_tmp;
+ uint32_t g_tmp;
+ int d;
+ tmp = GNUNET_CONTAINER_multihashmap_get (man_handle.peers, &peer->hashPubKey);
+
+ for (d = 0; d < ats_count; d++)
+ {
+ ats_new[d] = ats[d];
+ m_tmp = UINT32_MAX;
+ g_tmp = UINT32_MAX;
+ if (NULL != tmp)
+ m_tmp = find_metric (tmp, ntohl(ats[d].type), TM_RECEIVE);
+ g_tmp = find_metric (&man_handle.general, ntohl(ats[d].type), TM_RECEIVE);
+
+ if (UINT32_MAX != g_tmp)
+ ats_new[d].value = htonl(g_tmp);
+ if (UINT32_MAX != m_tmp)
+ ats_new[d].value = htonl(m_tmp);
+ }
+
+ return ats_new;
+}
+
+
+/**
+ * Adapter function between transport plugins and transport receive function
+ * manipulation delays for next send.
+ *
+ * @param cls the closure for transport
+ * @param peer the peer the message was received from
+ * @param message the message received
+ * @param session the session the message was received on
+ * @param sender_address the sender address
+ * @param sender_address_len the length of the sender address
+ * @return manipulated delay for next receive
+ */
struct GNUNET_TIME_Relative
-GST_manipulation_recv (void *cls, const struct GNUNET_PeerIdentity *peer,
+GST_manipulation_recv (void *cls,
+ const struct GNUNET_PeerIdentity *peer,
const struct GNUNET_MessageHeader *message,
- const struct GNUNET_ATS_Information *ats,
- uint32_t ats_count, struct Session *session,
+ struct Session *session,
const char *sender_address,
uint16_t sender_address_len)
{
struct TM_Peer *tmp;
- int d;
- struct GNUNET_ATS_Information ats_new[ats_count];
+ uint32_t p_recv_delay;
+ uint32_t g_recv_delay;
struct GNUNET_TIME_Relative quota_delay;
struct GNUNET_TIME_Relative m_delay;
- if (man_handle.delay_in.rel_value > GNUNET_TIME_UNIT_ZERO.rel_value)
- m_delay = man_handle.delay_in; /* Global delay */
+ g_recv_delay = find_metric (&man_handle.general, GNUNET_ATS_QUALITY_NET_DELAY, TM_RECEIVE);
+ if ((g_recv_delay >= GNUNET_TIME_UNIT_ZERO.rel_value) && (UINT32_MAX != g_recv_delay))
+ m_delay.rel_value = g_recv_delay; /* Global delay */
else
m_delay = GNUNET_TIME_UNIT_ZERO;
-
- for (d = 0; d < ats_count; d++)
- {
- ats_new[d] = ats[d];
- if ((ntohl(ats[d].type) == GNUNET_ATS_QUALITY_NET_DISTANCE) &&
- (man_handle.distance_in > 0))
- ats_new[d].value = htonl(man_handle.distance_in); /* Global inbound distance */
- }
-
if (NULL != (tmp = GNUNET_CONTAINER_multihashmap_get (man_handle.peers, &peer->hashPubKey)))
{
- /* Manipulate distance */
- for (d = 0; d < ats_count; d++)
- {
- ats_new[d] = ats[d];
- /* Set distance */
- if ((ntohl(ats[d].type) == GNUNET_ATS_QUALITY_NET_DISTANCE) &&
- (UINT32_MAX != tmp->metrics[TM_RECEIVE][DISTANCE]))
- ats_new[d].value = htonl(tmp->metrics[TM_RECEIVE][DISTANCE]);
- }
/* Manipulate receive delay */
- if (UINT32_MAX != tmp->metrics[TM_RECEIVE][DELAY])
- m_delay.rel_value = tmp->metrics[TM_RECEIVE][DELAY]; /* Peer specific delay */
+ p_recv_delay = find_metric (tmp, GNUNET_ATS_QUALITY_NET_DELAY, TM_RECEIVE);
+ if (UINT32_MAX != p_recv_delay)
+ m_delay.rel_value = p_recv_delay; /* Peer specific delay */
}
- quota_delay = GST_receive_callback (cls, peer, message, ats_new, ats_count,
+ quota_delay = GST_receive_callback (cls, peer, message,
session, sender_address, sender_address_len);
+
if (quota_delay.rel_value > m_delay.rel_value)
- return quota_delay;
- else
- return m_delay;
+ m_delay = quota_delay;
+
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Delaying next receive for peer `%s' for %llu ms\n",
+ GNUNET_i2s (peer), (long long unsigned int) m_delay.rel_value);
+ return m_delay;
+
}
+
+/**
+ * Initialize traffic manipulation
+ *
+ * @param GST_cfg configuration handle
+ */
void
GST_manipulation_init (const struct GNUNET_CONFIGURATION_Handle *GST_cfg)
{
+ unsigned long long tmp;
if (GNUNET_OK == GNUNET_CONFIGURATION_get_value_number (GST_cfg,
- "transport", "MANIPULATE_DISTANCE_IN", &man_handle.distance_in))
+ "transport", "MANIPULATE_DISTANCE_IN", &tmp) && (tmp > 0))
+ {
GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Setting inbound distance_in to %u\n",
- (unsigned long long) man_handle.distance_in);
- else
- man_handle.distance_in = 0;
+ (unsigned long long) tmp);
+ set_metric (&man_handle.general, TM_RECEIVE, GNUNET_ATS_QUALITY_NET_DISTANCE, tmp);
+ }
if (GNUNET_OK == GNUNET_CONFIGURATION_get_value_number (GST_cfg,
- "transport", "MANIPULATE_DISTANCE_OUT", &man_handle.distance_out))
+ "transport", "MANIPULATE_DISTANCE_OUT", &tmp) && (tmp > 0))
+ {
GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Setting outbound distance_in to %u\n",
- (unsigned long long) man_handle.distance_out);
- else
- man_handle.distance_out = 0;
+ (unsigned long long) tmp);
+ set_metric (&man_handle.general, TM_SEND, GNUNET_ATS_QUALITY_NET_DISTANCE, tmp);
+ }
- if (GNUNET_OK == GNUNET_CONFIGURATION_get_value_time (GST_cfg,
- "transport", "MANIPULATE_DELAY_IN", &man_handle.delay_in))
+ if (GNUNET_OK == GNUNET_CONFIGURATION_get_value_number (GST_cfg,
+ "transport", "MANIPULATE_DELAY_IN", &tmp) && (tmp > 0))
+ {
GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Delaying inbound traffic for %llu ms\n",
- (unsigned long long) man_handle.delay_in.rel_value);
- else
- man_handle.delay_in.rel_value = 0;
+ (unsigned long long) tmp);
+ set_metric (&man_handle.general, TM_RECEIVE, GNUNET_ATS_QUALITY_NET_DELAY, tmp);
+ }
- if (GNUNET_OK == GNUNET_CONFIGURATION_get_value_time (GST_cfg,
- "transport", "MANIPULATE_DELAY_OUT", &man_handle.delay_out))
+
+ if (GNUNET_OK == GNUNET_CONFIGURATION_get_value_number (GST_cfg,
+ "transport", "MANIPULATE_DELAY_OUT", &tmp) && (tmp > 0))
+ {
GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Delaying outbound traffic for %llu ms\n",
- (unsigned long long) man_handle.delay_out.rel_value);
- else
- man_handle.delay_out.rel_value = 0;
+ (unsigned long long) tmp);
+ set_metric (&man_handle.general, TM_SEND, GNUNET_ATS_QUALITY_NET_DELAY, tmp);
+ }
man_handle.peers = GNUNET_CONTAINER_multihashmap_create (10, GNUNET_NO);
}
-int free_tmps (void *cls,
- const struct GNUNET_HashCode * key,
- void *value)
+
+static int
+free_tmps (void *cls,
+ const struct GNUNET_HashCode * key,
+ void *value)
{
struct DelayQueueEntry *dqe;
struct DelayQueueEntry *next;
if (NULL != value)
{
struct TM_Peer *tmp = (struct TM_Peer *) value;
- GNUNET_CONTAINER_multihashmap_remove (man_handle.peers, key, value);
+ if (GNUNET_YES != GNUNET_CONTAINER_multihashmap_remove (man_handle.peers, key, value))
+ GNUNET_break (0);
+ free_metric (tmp);
next = tmp->send_head;
while (NULL != (dqe = next))
{
next = dqe->next;
GNUNET_CONTAINER_DLL_remove (tmp->send_head, tmp->send_tail, dqe);
+ if (NULL != dqe->cont)
+ dqe->cont (dqe->cont_cls, GNUNET_SYSERR, dqe->msg_size, 0);
GNUNET_free (dqe);
}
if (GNUNET_SCHEDULER_NO_TASK != tmp->send_delay_task)
return GNUNET_OK;
}
+/**
+ * Notify manipulation about disconnect so it can discard queued messages
+ *
+ * @param peer the disconnecting peer
+ */
+void
+GST_manipulation_peer_disconnect (const struct GNUNET_PeerIdentity *peer)
+{
+ struct TM_Peer *tmp;
+ struct DelayQueueEntry *dqe;
+ struct DelayQueueEntry *next;
+
+ if (NULL != (tmp = GNUNET_CONTAINER_multihashmap_get (man_handle.peers, &peer->hashPubKey)))
+ {
+ next = tmp->send_head;
+ while (NULL != (dqe = next))
+ {
+ next = dqe->next;
+ GNUNET_CONTAINER_DLL_remove (tmp->send_head, tmp->send_tail, dqe);
+ if (NULL != dqe->cont)
+ dqe->cont (dqe->cont_cls, GNUNET_SYSERR, dqe->msg_size, 0);
+ GNUNET_free (dqe);
+ }
+ }
+ else if (UINT32_MAX != find_metric (&man_handle.general, GNUNET_ATS_QUALITY_NET_DELAY, TM_SEND))
+ {
+ next = generic_dqe_head;
+ while (NULL != (dqe = next))
+ {
+ next = dqe->next;
+ if (0 == memcmp (peer, &dqe->id, sizeof (dqe->id)))
+ {
+ GNUNET_CONTAINER_DLL_remove (generic_dqe_head, generic_dqe_tail, dqe);
+ if (NULL != dqe->cont)
+ dqe->cont (dqe->cont_cls, GNUNET_SYSERR, dqe->msg_size, 0);
+ GNUNET_free (dqe);
+ }
+ }
+ if (GNUNET_SCHEDULER_NO_TASK != generic_send_delay_task)
+ {
+ GNUNET_SCHEDULER_cancel (generic_send_delay_task);
+ generic_send_delay_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_absolute_get_remaining(generic_dqe_head->sent_at), &send_delayed, generic_dqe_head);
+ }
+ }
+}
+
+
+/**
+ * Stop traffic manipulation
+ */
void
GST_manipulation_stop ()
{
+ struct DelayQueueEntry *cur;
+ struct DelayQueueEntry *next;
GNUNET_CONTAINER_multihashmap_iterate (man_handle.peers, &free_tmps,NULL);
-
GNUNET_CONTAINER_multihashmap_destroy (man_handle.peers);
+
+ next = generic_dqe_head;
+ while (NULL != (cur = next))
+ {
+ next = cur->next;
+ GNUNET_CONTAINER_DLL_remove (generic_dqe_head, generic_dqe_tail, cur);
+ if (NULL != cur->cont)
+ cur->cont (cur->cont_cls, GNUNET_SYSERR, cur->msg_size, 0);
+ GNUNET_free (cur);
+ }
+ if (GNUNET_SCHEDULER_NO_TASK != generic_send_delay_task)
+ {
+ GNUNET_SCHEDULER_cancel (generic_send_delay_task);
+ generic_send_delay_task = GNUNET_SCHEDULER_NO_TASK;
+ }
+
+ free_metric (&man_handle.general);
man_handle.peers = NULL;
}