*/
#include "platform.h"
#include "gnunet_ats_service.h"
+#include "ats_api.h"
-#define DEBUG_ATS GNUNET_YES
+#define DEBUG_ATS GNUNET_EXTRA_LOGGING
-// NOTE: this implementation is simply supposed
-// to implement a simplistic strategy in-process;
-// in the future, we plan to replace it with a real
-// service implementation
+#define LOG(kind,...) GNUNET_log_from (kind, "ats-api", __VA_ARGS__)
/**
- * Allocation record for a peer's address.
+ * Receive and send buffer windows grow over time. For
+ * how long can 'unused' bandwidth accumulate before we
+ * need to cap it? (specified in seconds).
*/
-struct AllocationRecord
-{
-
- /**
- * Performance information associated with this address (array).
- */
- struct GNUNET_TRANSPORT_ATS_Information *ats;
-
- /**
- * Name of the plugin
- */
- char *plugin_name;
-
- /**
- * Address this record represents, allocated at the end of this struct.
- */
- const void *plugin_addr;
-
- /**
- * Session associated with this record.
- */
- struct Session *session;
-
- /**
- * Number of bytes in plugin_addr.
- */
- size_t plugin_addr_len;
-
- /**
- * Number of entries in 'ats'.
- */
- uint32_t ats_count;
-
- /**
- * Bandwidth assigned to this address right now, 0 for none.
- */
- struct GNUNET_BANDWIDTH_Value32NBO bandwidth;
-
- /**
- * Set to GNUNET_YES if this is the connected address of a connected peer.
- */
- int connected;
+#define MAX_WINDOW_TIME_S (5 * 60)
-};
+// NOTE: this implementation is simply supposed
+// to implement a simplistic strategy in-process;
+// in the future, we plan to replace it with a real
+// service implementation
/**
/**
* Global ATS handle.
*/
- struct GNUNET_ATS_Handle *atc;
+ struct GNUNET_ATS_SchedulingHandle *atc;
/**
* Which peer are we monitoring?
};
-/**
- * Handle to the ATS subsystem.
- */
-struct GNUNET_ATS_Handle
-{
- /**
- * Configuration.
- */
- const struct GNUNET_CONFIGURATION_Handle *cfg;
-
- /**
- * Function to call when the allocation changes.
- */
- GNUNET_TRANSPORT_ATS_AllocationNotification alloc_cb;
-
- /**
- * Closure for 'alloc_cb'.
- */
- void *alloc_cb_cls;
-
- /**
- * Information about all connected peers. Maps peer identities
- * to one or more 'struct AllocationRecord' values.
- */
- struct GNUNET_CONTAINER_MultiHashMap *peers;
-
- /**
- * Map of PeerIdentities to 'struct GNUNET_ATS_SuggestionContext's.
- */
- struct GNUNET_CONTAINER_MultiHashMap *notify_map;
-
-
- /**
- * Task scheduled to update our bandwidth assignment.
- */
- GNUNET_SCHEDULER_TaskIdentifier ba_task;
-
- /**
- * Total bandwidth per configuration.
- */
- unsigned long long total_bps;
-};
-
-
/**
* Count number of connected records.
*
/**
* ATS handle.
*/
- struct GNUNET_ATS_Handle *atc;
+ struct GNUNET_ATS_SchedulingHandle *atc;
/**
- * Bandwidth to assign.
+ * Inbound bandwidth to assign.
*/
- struct GNUNET_BANDWIDTH_Value32NBO bw;
+ struct GNUNET_BANDWIDTH_Value32NBO bw_in;
+
+ /**
+ * Outbound bandwidth to assign.
+ */
+ struct GNUNET_BANDWIDTH_Value32NBO bw_out;
};
struct SetBandwidthContext *sbc = cls;
struct AllocationRecord *ar = value;
- if (GNUNET_YES == ar->connected)
+ GNUNET_assert (GNUNET_SYSERR != ar->connected);
+ /* FIXME: ||1 because we currently NEVER get 'connected' events... */
+ if ((GNUNET_YES == ar->connected) || 1)
{
- ar->bandwidth = sbc->bw;
- sbc->atc->alloc_cb (sbc->atc->alloc_cb_cls,
- (const struct GNUNET_PeerIdentity *) key,
- ar->plugin_name, ar->session, ar->plugin_addr,
- ar->plugin_addr_len, ar->bandwidth);
+ ar->bandwidth_in = sbc->bw_in;
+ ar->bandwidth_out = sbc->bw_out;
+ GNUNET_BANDWIDTH_tracker_update_quota (&ar->available_recv_window,
+ ar->bandwidth_in);
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Bandwidth assigned to peer %s is i:%u/o:%u bytes/s\n",
+ GNUNET_i2s ((const struct GNUNET_PeerIdentity *) key),
+ ntohl (ar->bandwidth_in.value__), ntohl (ar->bandwidth_out.value__));
+ if (NULL != sbc->atc->alloc_cb)
+ sbc->atc->alloc_cb (sbc->atc->alloc_cb_cls,
+ (const struct GNUNET_PeerIdentity *) key,
+ ar->plugin_name, ar->plugin_addr, ar->plugin_addr_len,
+ ar->session, ar->bandwidth_out, ar->bandwidth_in,
+ NULL, 0);
}
- else if (ntohl (ar->bandwidth.value__) > 0)
+ else if (ntohl (ar->bandwidth_out.value__) > 0)
{
- ar->bandwidth = GNUNET_BANDWIDTH_value_init (0);
- sbc->atc->alloc_cb (sbc->atc->alloc_cb_cls,
- (const struct GNUNET_PeerIdentity *) key,
- ar->plugin_name, ar->session, ar->plugin_addr,
- ar->plugin_addr_len, ar->bandwidth);
+ ar->bandwidth_in = GNUNET_BANDWIDTH_value_init (0);
+ ar->bandwidth_out = GNUNET_BANDWIDTH_value_init (0);
+ if (NULL != sbc->atc->alloc_cb)
+ sbc->atc->alloc_cb (sbc->atc->alloc_cb_cls,
+ (const struct GNUNET_PeerIdentity *) key,
+ ar->plugin_name, ar->plugin_addr, ar->plugin_addr_len,
+ ar->session, ar->bandwidth_out, ar->bandwidth_in,
+ NULL, 0);
}
+ else
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Not communicating bandwidth assigned to peer %s: not connected and bw is: i:%u/o:%u bytes/s\n",
+ GNUNET_i2s ((const struct GNUNET_PeerIdentity *) key),
+ ntohl (ar->bandwidth_in.value__), ntohl (ar->bandwidth_out.value__));
+
return GNUNET_YES;
}
/**
* Task run to update bandwidth assignments.
*
- * @param cls the 'struct GNUNET_ATS_Handle'
+ * @param cls the 'struct GNUNET_ATS_SchedulingHandle'
* @param tc scheduler context
*/
static void
update_bandwidth_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
{
- struct GNUNET_ATS_Handle *atc = cls;
+ struct GNUNET_ATS_SchedulingHandle *atc = cls;
unsigned int ac = 0;
struct SetBandwidthContext bwc;
if (ac == 0)
ac++;
GNUNET_assert (ac > 0);
- bwc.bw = GNUNET_BANDWIDTH_value_init (atc->total_bps / ac);
+ bwc.bw_in = GNUNET_BANDWIDTH_value_init (atc->total_bps_in / ac);
+ bwc.bw_out = GNUNET_BANDWIDTH_value_init (atc->total_bps_out / ac);
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Trivial implementation: bandwidth assigned to each peer is i:%u/o:%u bytes/s\n",
+ ntohl (bwc.bw_in.value__), ntohl (bwc.bw_out.value__));
GNUNET_CONTAINER_multihashmap_iterate (atc->peers, &set_bw_connections, &bwc);
}
* @param change which allocation record changed?
*/
static void
-update_bandwidth_assignment (struct GNUNET_ATS_Handle *atc,
+update_bandwidth_assignment (struct GNUNET_ATS_SchedulingHandle *atc,
struct AllocationRecord *change)
{
/* FIXME: based on the 'change', update the LP-problem... */
struct AllocationRecord *ar = value;
#if DEBUG_ATS
- GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, "ats-api",
- "Suggesting address for peer `%s'\n", GNUNET_h2s (key));
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Suggesting address for peer `%s', starting with i:%u/o:%u bytes/s\n",
+ GNUNET_h2s (key), asc->atc->total_bps_in / 32,
+ asc->atc->total_bps_out / 32);
#endif
/* trivial strategy: pick first available address... */
asc->cb (asc->cb_cls, &asc->target, ar->plugin_name, ar->plugin_addr,
ar->plugin_addr_len, ar->session,
- GNUNET_BANDWIDTH_value_init (asc->atc->total_bps / 32), ar->ats,
+ GNUNET_BANDWIDTH_value_init (asc->atc->total_bps_out / 32),
+ GNUNET_BANDWIDTH_value_init (asc->atc->total_bps_in / 32), ar->ats,
ar->ats_count);
asc->cb = NULL;
return GNUNET_NO;
}
+
int
map_it (void *cls, const GNUNET_HashCode * key, void *value)
{
- GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, "ats-api", "Found entry for %s\n",
- GNUNET_h2s (key));
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "Found entry for %s\n", GNUNET_h2s (key));
return GNUNET_YES;
}
* @param cb_cls closure for cb
*/
struct GNUNET_ATS_SuggestionContext *
-GNUNET_ATS_suggest_address (struct GNUNET_ATS_Handle *atc,
+GNUNET_ATS_suggest_address (struct GNUNET_ATS_SchedulingHandle *atc,
const struct GNUNET_PeerIdentity *peer,
GNUNET_ATS_AddressSuggestionCallback cb,
void *cb_cls)
struct GNUNET_ATS_SuggestionContext *asc;
#if DEBUG_ATS
- GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, "ats-api",
- "Looking up suggested address for peer `%s'\n",
- GNUNET_i2s (peer));
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "Looking up suggested address for peer `%s'\n",
+ GNUNET_i2s (peer));
#endif
- int count = 0;
-
asc = GNUNET_malloc (sizeof (struct GNUNET_ATS_SuggestionContext));
asc->cb = cb;
asc->cb_cls = cb_cls;
asc->atc = atc;
asc->target = *peer;
- count =
- GNUNET_CONTAINER_multihashmap_get_multiple (atc->peers, &peer->hashPubKey,
- &suggest_address, asc);
-
-#if DEBUG_ATS
- GNUNET_CONTAINER_multihashmap_iterate (atc->peers, &map_it, (void *) peer);
- GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, "ats-api",
- "Addresses %u (of %i) processed, \n", count,
- GNUNET_CONTAINER_multihashmap_size (atc->peers));
-#endif
+ (void) GNUNET_CONTAINER_multihashmap_get_multiple (atc->peers,
+ &peer->hashPubKey,
+ &suggest_address, asc);
if (NULL == asc->cb)
{
* @param alloc_cb_cls closure for 'alloc_cb'
* @return ats context
*/
-struct GNUNET_ATS_Handle *
+struct GNUNET_ATS_SchedulingHandle *
GNUNET_ATS_init (const struct GNUNET_CONFIGURATION_Handle *cfg,
- GNUNET_TRANSPORT_ATS_AllocationNotification alloc_cb,
+ GNUNET_ATS_AddressSuggestionCallback alloc_cb,
void *alloc_cb_cls)
{
- struct GNUNET_ATS_Handle *atc;
+ struct GNUNET_ATS_SchedulingHandle *atc;
#if DEBUG_ATS
- GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, "ats-api", "ATS init\n");
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "ATS init\n");
#endif
- atc = GNUNET_malloc (sizeof (struct GNUNET_ATS_Handle));
+ atc = GNUNET_malloc (sizeof (struct GNUNET_ATS_SchedulingHandle));
atc->cfg = cfg;
atc->alloc_cb = alloc_cb;
atc->alloc_cb_cls = alloc_cb_cls;
atc->peers = GNUNET_CONTAINER_multihashmap_create (256);
atc->notify_map = GNUNET_CONTAINER_multihashmap_create (256);
- GNUNET_CONFIGURATION_get_value_number (cfg, "core", "TOTAL_QUOTA_OUT",
- &atc->total_bps);
+ GNUNET_CONFIGURATION_get_value_size (cfg, "ats", "WAN_QUOTA_OUT",
+ &atc->total_bps_out);
+ GNUNET_CONFIGURATION_get_value_size (cfg, "ats", "WAN_QUOTA_IN",
+ &atc->total_bps_in);
return atc;
}
* @param atc handle
*/
void
-GNUNET_ATS_shutdown (struct GNUNET_ATS_Handle *atc)
+GNUNET_ATS_shutdown (struct GNUNET_ATS_SchedulingHandle *atc)
{
#if DEBUG_ATS
- GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, "ats-api", "ATS shutdown\n");
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "ATS shutdown\n");
#endif
if (GNUNET_SCHEDULER_NO_TASK != atc->ba_task)
{
/**
* Ats handle.
*/
- struct GNUNET_ATS_Handle *atc;
+ struct GNUNET_ATS_SchedulingHandle *atc;
/**
* Allocation record with new information.
struct UpdateSessionContext *usc = cls;
struct AllocationRecord *arnew = usc->arnew;
struct AllocationRecord *arold = value;
- int change;
+ int c_old;
+ int c_new;
+ int found;
-#if DEBUG_ATS
- GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, "ats-api",
- "Updating session for peer `%s' plugin `%s'\n",
- GNUNET_h2s (key), arold->plugin_name, arnew->session,
- arold->session);
-#endif
if (0 != strcmp (arnew->plugin_name, arold->plugin_name))
return GNUNET_YES;
- if (((arnew->session == arold->session) && (arnew->session != NULL)) ||
- ((arold->session == NULL) &&
- (arold->plugin_addr_len == arnew->plugin_addr_len) &&
- (0 ==
- memcmp (arold->plugin_addr, arnew->plugin_addr,
- arnew->plugin_addr_len))))
+ if (!
+ (((arnew->session == arold->session) && (arnew->session != NULL)) ||
+ ((arold->session == NULL) &&
+ (arold->plugin_addr_len == arnew->plugin_addr_len) &&
+ (0 ==
+ memcmp (arold->plugin_addr, arnew->plugin_addr,
+ arnew->plugin_addr_len)))))
+ return GNUNET_YES; /* no match */
+ /* records match */
+#if DEBUG_ATS
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "Updating session for peer `%s' plugin `%s'\n",
+ GNUNET_h2s (key), arold->plugin_name);
+#endif
+ if (arnew->session != arold->session)
+ {
+ arold->session = arnew->session;
+ }
+ if ((arnew->connected == GNUNET_YES) && (arold->connected == GNUNET_NO))
+ {
+ arold->connected = GNUNET_YES;
+ }
+
+ /* Update existing value */
+ c_new = 0;
+ while (c_new < arnew->ats_count)
{
- change = GNUNET_NO;
- /* records match */
- if (arnew->session != arold->session)
+ c_old = 0;
+ found = GNUNET_NO;
+ while (c_old < arold->ats_count)
{
- arold->session = arnew->session;
- change = GNUNET_YES;
+ if (arold->ats[c_old].type == arnew->ats[c_new].type)
+ {
+#if DEBUG_ATS
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Found type %i, old value=%i new value=%i\n",
+ ntohl (arold->ats[c_old].type), ntohl (arold->ats[c_old].value),
+ ntohl (arnew->ats[c_new].value));
+#endif
+ arold->ats[c_old].value = arnew->ats[c_new].value;
+ found = GNUNET_YES;
+ }
+ c_old++;
}
- if ((arnew->connected == GNUNET_YES) && (arold->connected == GNUNET_NO))
+ /* Add new value */
+ if (found == GNUNET_NO)
{
- arold->connected = GNUNET_YES;
- change = GNUNET_YES;
+#if DEBUG_ATS
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "Added new type %i new value=%i\n",
+ ntohl (arnew->ats[c_new].type), ntohl (arnew->ats[c_new].value));
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "Old array size: %u\n", arold->ats_count);
+#endif
+ GNUNET_array_grow (arold->ats, arold->ats_count, arold->ats_count + 1);
+ GNUNET_assert (arold->ats_count >= 2);
+ arold->ats[arold->ats_count - 2].type = arnew->ats[c_new].type;
+ arold->ats[arold->ats_count - 2].value = arnew->ats[c_new].value;
+ arold->ats[arold->ats_count - 1].type = htonl (0);
+ arold->ats[arold->ats_count - 1].value = htonl (0);
+#if DEBUG_ATS
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "New array size: %i\n", arold->ats_count);
+#endif
}
- // FIXME: merge ats arrays of (arold, arnew);
-
- if (GNUNET_YES == change)
- update_bandwidth_assignment (usc->atc, arold);
- return GNUNET_NO;
+ c_new++;
}
- return GNUNET_YES;
+
+ update_bandwidth_assignment (usc->atc, arold);
+ return GNUNET_NO;
}
static struct AllocationRecord *
create_allocation_record (const char *plugin_name, struct Session *session,
const void *plugin_addr, size_t plugin_addr_len,
- const struct GNUNET_TRANSPORT_ATS_Information *ats,
+ const struct GNUNET_ATS_Information *ats,
uint32_t ats_count)
{
struct AllocationRecord *ar;
memcpy (&ar[1], plugin_addr, plugin_addr_len);
ar->session = session;
ar->plugin_addr_len = plugin_addr_len;
+ GNUNET_BANDWIDTH_tracker_init (&ar->available_recv_window, ar->bandwidth_in,
+ MAX_WINDOW_TIME_S);
+ GNUNET_assert (ats_count > 0);
GNUNET_array_grow (ar->ats, ar->ats_count, ats_count);
- memcpy (ar->ats, ats,
- ats_count * sizeof (struct GNUNET_TRANSPORT_ATS_Information));
+ memcpy (ar->ats, ats, ats_count * sizeof (struct GNUNET_ATS_Information));
+ ar->connected = GNUNET_SYSERR; /* aka: not known / no change */
return ar;
}
static int
disconnect_peer (void *cls, const GNUNET_HashCode * key, void *value)
{
- struct GNUNET_ATS_Handle *atc = cls;
+ struct GNUNET_ATS_SchedulingHandle *atc = cls;
struct AllocationRecord *ar = value;
if (GNUNET_YES == ar->connected)
* @param ats_count number of performance records in 'ats'
*/
void
-GNUNET_ATS_peer_connect (struct GNUNET_ATS_Handle *atc,
+GNUNET_ATS_peer_connect (struct GNUNET_ATS_SchedulingHandle *atc,
const struct GNUNET_PeerIdentity *peer,
const char *plugin_name, struct Session *session,
const void *plugin_addr, size_t plugin_addr_len,
- const struct GNUNET_TRANSPORT_ATS_Information *ats,
+ const struct GNUNET_ATS_Information *ats,
uint32_t ats_count)
{
struct AllocationRecord *ar;
struct UpdateSessionContext usc;
+#if DEBUG_ATS
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "Connected to peer %s\n", GNUNET_i2s (peer));
+#endif
+
(void) GNUNET_CONTAINER_multihashmap_iterate (atc->peers, &disconnect_peer,
atc);
ar = create_allocation_record (plugin_name, session, plugin_addr,
* @param peer identity of the new peer
*/
void
-GNUNET_ATS_peer_disconnect (struct GNUNET_ATS_Handle *atc,
+GNUNET_ATS_peer_disconnect (struct GNUNET_ATS_SchedulingHandle *atc,
const struct GNUNET_PeerIdentity *peer)
{
+#if DEBUG_ATS
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "Disconnected from peer %s\n",
+ GNUNET_i2s (peer));
+#endif
(void) GNUNET_CONTAINER_multihashmap_get_multiple (atc->peers,
&peer->hashPubKey,
&disconnect_peer, atc);
/**
* Ats handle.
*/
- struct GNUNET_ATS_Handle *atc;
+ struct GNUNET_ATS_SchedulingHandle *atc;
/**
* Session being destroyed.
* @param session session handle that is no longer valid
*/
void
-GNUNET_ATS_session_destroyed (struct GNUNET_ATS_Handle *atc,
+GNUNET_ATS_session_destroyed (struct GNUNET_ATS_SchedulingHandle *atc,
const struct GNUNET_PeerIdentity *peer,
const struct Session *session)
{
asc->cb (asc->cb_cls, &asc->target, ar->plugin_name, ar->plugin_addr,
ar->plugin_addr_len, ar->session,
- GNUNET_BANDWIDTH_value_init (asc->atc->total_bps / 32), ar->ats,
+ GNUNET_BANDWIDTH_value_init (asc->atc->total_bps_out / 32),
+ GNUNET_BANDWIDTH_value_init (asc->atc->total_bps_in / 32), ar->ats,
ar->ats_count);
GNUNET_ATS_suggest_address_cancel (asc);
+ asc = NULL;
return GNUNET_OK;
}
* @param ats_count number of performance records in 'ats'
*/
void
-GNUNET_ATS_address_update (struct GNUNET_ATS_Handle *atc,
+GNUNET_ATS_address_update (struct GNUNET_ATS_SchedulingHandle *atc,
const struct GNUNET_PeerIdentity *peer,
struct GNUNET_TIME_Absolute valid_until,
const char *plugin_name, struct Session *session,
const void *plugin_addr, size_t plugin_addr_len,
- const struct GNUNET_TRANSPORT_ATS_Information *ats,
+ const struct GNUNET_ATS_Information *ats,
uint32_t ats_count)
{
struct AllocationRecord *ar;
struct UpdateSessionContext usc;
-
#if DEBUG_ATS
- GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, "ats-api",
- "Updating address for peer `%s', plugin `%s'\n",
- GNUNET_i2s (peer), plugin_name);
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "Updating address for peer `%s', plugin `%s'\n",
+ GNUNET_i2s (peer), plugin_name);
#endif
-
ar = create_allocation_record (plugin_name, session, plugin_addr,
plugin_addr_len, ats, ats_count);
usc.atc = atc;
return;
}
#if DEBUG_ATS
- GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, "ats-api",
- "Adding new address for peer `%s', plugin `%s'\n",
- GNUNET_i2s (peer), plugin_name);
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Adding new address for peer `%s', plugin `%s'\n", GNUNET_i2s (peer),
+ plugin_name);
#endif
+ ar->connected = GNUNET_NO;
GNUNET_assert (GNUNET_OK ==
GNUNET_CONTAINER_multihashmap_put (atc->peers,
&peer->hashPubKey, ar,