#include "testbed_api.h"
#include "testbed_api_hosts.h"
+#include "testbed_api_operations.h"
+#include "testbed_api_sd.h"
/**
* Generic logging shorthand
};
+/**
+ * A slot to record time taken by an overlay connect operation
+ */
+struct TimeSlot
+{
+ /**
+ * A key to identify this timeslot
+ */
+ void *key;
+
+ /**
+ * Time
+ */
+ struct GNUNET_TIME_Relative time;
+
+ /**
+ * Number of timing values accumulated
+ */
+ unsigned int nvals;
+};
+
+
/**
* Opaque handle to a host running experiments managed by the testing framework.
* The master process must be able to SSH to this host without password (via
*/
struct RegisteredController *rc_tail;
+ /**
+ * Operation queue for simultaneous overlay connect operations target at this
+ * host
+ */
+ struct OperationQueue *opq_parallel_overlay_connect_operations;
+
+ /**
+ * An array of timing slots; size should be equal to the current number of parallel
+ * overlay connects
+ */
+ struct TimeSlot *tslots;
+
+ /**
+ * Handle for SD calculations amount parallel overlay connect operation finish
+ * times
+ */
+ struct SDHandle *poc_sd;
+
+ /**
+ * The number of parallel overlay connects we do currently
+ */
+ unsigned int num_parallel_connects;
+
+ /**
+ * Counter to indicate when all the available time slots are filled
+ */
+ unsigned int tslots_filled;
+
/**
* Global ID we use to refer to a host on the network
*/
/**
* The size of the available hosts list
*/
-static uint32_t host_list_size;
+static unsigned int host_list_size;
/**
* @return hostname of the host
*/
const char *
-GNUNET_TESTBED_host_get_hostname_ (const struct GNUNET_TESTBED_Host *host)
+GNUNET_TESTBED_host_get_hostname (const struct GNUNET_TESTBED_Host *host)
{
return host->hostname;
}
const char *username, uint16_t port)
{
struct GNUNET_TESTBED_Host *host;
- uint32_t new_size;
+ unsigned int new_size;
if ((id < host_list_size) && (NULL != host_list[id]))
{
host->hostname = (NULL != hostname) ? GNUNET_strdup (hostname) : NULL;
host->username = (NULL != username) ? GNUNET_strdup (username) : NULL;
host->id = id;
- host->port = (0 == port) ? 22 : port;
+ host->port = (0 == port) ? 22 : port;
+ host->opq_parallel_overlay_connect_operations =
+ GNUNET_TESTBED_operation_queue_create_ (0);
+ GNUNET_TESTBED_set_num_parallel_overlay_connects_ (host, 1);
+ host->poc_sd = GNUNET_TESTBED_SD_init_ (10);
new_size = host_list_size;
while (id >= new_size)
new_size += HOST_LIST_GROW_STEP;
if (new_size != host_list_size)
- {
- host_list =
- GNUNET_realloc (host_list,
- sizeof (struct GNUNET_TESTBED_Host *) * new_size);
- (void) memset (&host_list[host_list_size], 0,
- sizeof (struct GNUNET_TESTBED_Host *) * (new_size -
- host_list_size));
- host_list_size = new_size;
- }
+ GNUNET_array_grow (host_list, host_list_size, new_size);
+ GNUNET_assert (id < host_list_size);
LOG (GNUNET_ERROR_TYPE_DEBUG, "Adding host with id: %u\n", host->id);
host_list[id] = host;
return host;
struct GNUNET_TESTBED_Host *starting_host;
char *data;
char *buf;
- char *username;
- char *hostname;
+ char username[256];
+ char hostname[256];
uint64_t fs;
short int port;
int ret;
unsigned int offset;
unsigned int count;
-
+
GNUNET_assert (NULL != filename);
if (GNUNET_YES != GNUNET_DISK_file_test (filename))
LOG (GNUNET_ERROR_TYPE_WARNING, _("Hosts file %s not found\n"), filename);
return 0;
}
- if (GNUNET_OK !=
+ if (GNUNET_OK !=
GNUNET_DISK_file_size (filename, &fs, GNUNET_YES, GNUNET_YES))
fs = 0;
if (0 == fs)
LOG (GNUNET_ERROR_TYPE_WARNING, _("Hosts file %s has no data\n"), filename);
return 0;
}
- data = GNUNET_malloc (fs);
+ data = GNUNET_malloc (fs);
if (fs != GNUNET_DISK_fn_read (filename, data, fs))
{
GNUNET_free (data);
if (((data[offset] == '\n')) && (buf != &data[offset]))
{
data[offset] = '\0';
- username = NULL;
- hostname = NULL;
- ret = SSCANF (buf, "%a[a-zA-Z0-9_]@%a[a-zA-Z0-9.]:%hd",
- &username, &hostname, &port);
- if (3 == ret)
+ ret =
+ SSCANF (buf, "%255[a-zA-Z0-9_]@%255[a-zA-Z0-9.]:%5hd", username,
+ hostname, &port);
+ if (3 == ret)
{
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Successfully read host %s, port %d and user %s from file\n",
hostname, port, username);
/* We store hosts in a static list; hence we only require the starting
- host pointer in that list to access the newly created list of hosts */
+ * host pointer in that list to access the newly created list of hosts */
if (NULL == starting_host)
- starting_host = GNUNET_TESTBED_host_create (hostname, username,
- port);
+ starting_host = GNUNET_TESTBED_host_create (hostname, username, port);
else
(void) GNUNET_TESTBED_host_create (hostname, username, port);
count++;
else
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
"Error reading line `%s' in hostfile\n", buf);
- GNUNET_free_non_null (hostname);
- GNUNET_free_non_null (username);
buf = &data[offset + 1];
}
else if ((data[offset] == '\n') || (data[offset] == '\0'))
- buf = &data[offset + 1];
+ buf = &data[offset + 1];
}
GNUNET_free (data);
if (NULL == starting_host)
- return 0;
+ return 0;
*hosts = GNUNET_malloc (sizeof (struct GNUNET_TESTBED_Host *) * count);
- memcpy (*hosts,
- &host_list[GNUNET_TESTBED_host_get_id_ (starting_host)],
+ memcpy (*hosts, &host_list[GNUNET_TESTBED_host_get_id_ (starting_host)],
sizeof (struct GNUNET_TESTBED_Host *) * count);
return count;
}
}
GNUNET_free_non_null ((char *) host->username);
GNUNET_free_non_null ((char *) host->hostname);
+ GNUNET_TESTBED_operation_queue_destroy_
+ (host->opq_parallel_overlay_connect_operations);
+ GNUNET_TESTBED_SD_destroy_ (host->poc_sd);
+ GNUNET_free_non_null (host->tslots);
GNUNET_free (host);
while (host_list_size >= HOST_LIST_GROW_STEP)
{
}
rc = GNUNET_malloc (sizeof (struct RegisteredController));
rc->controller = controller;
- //host->controller = controller;
GNUNET_CONTAINER_DLL_insert_tail (host->rc_head, host->rc_tail, rc);
}
}
+/**
+ * The handle for whether a host is habitable or not
+ */
+struct GNUNET_TESTBED_HostHabitableCheckHandle
+{
+ /**
+ * The host to check
+ */
+ const struct GNUNET_TESTBED_Host *host;
+
+ /* /\** */
+ /* * the configuration handle to lookup the path of the testbed helper */
+ /* *\/ */
+ /* const struct GNUNET_CONFIGURATION_Handle *cfg; */
+
+ /**
+ * The callback to call once we have the status
+ */
+ GNUNET_TESTBED_HostHabitableCallback cb;
+
+ /**
+ * The callback closure
+ */
+ void *cb_cls;
+
+ /**
+ * The process handle for the SSH process
+ */
+ struct GNUNET_OS_Process *auxp;
+
+ /**
+ * The SSH destination address string
+ */
+ char *ssh_addr;
+
+ /**
+ * The destination port string
+ */
+ char *portstr;
+
+ /**
+ * The path for hte testbed helper binary
+ */
+ char *helper_binary_path;
+
+ /**
+ * Task id for the habitability check task
+ */
+ GNUNET_SCHEDULER_TaskIdentifier habitability_check_task;
+
+ /**
+ * How long we wait before checking the process status. Should grow
+ * exponentially
+ */
+ struct GNUNET_TIME_Relative wait_time;
+
+};
+
+
+/**
+ * Task for checking whether a host is habitable or not
+ *
+ * @param cls GNUNET_TESTBED_HostHabitableCheckHandle
+ * @param tc the scheduler task context
+ */
+static void
+habitability_check (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ struct GNUNET_TESTBED_HostHabitableCheckHandle *h = cls;
+ void *cb_cls;
+ GNUNET_TESTBED_HostHabitableCallback cb;
+ const struct GNUNET_TESTBED_Host *host;
+ unsigned long code;
+ enum GNUNET_OS_ProcessStatusType type;
+ int ret;
+
+ h->habitability_check_task = GNUNET_SCHEDULER_NO_TASK;
+ ret = GNUNET_OS_process_status (h->auxp, &type, &code);
+ if (GNUNET_SYSERR == ret)
+ {
+ GNUNET_break (0);
+ ret = GNUNET_NO;
+ goto call_cb;
+ }
+ if (GNUNET_NO == ret)
+ {
+ h->wait_time = GNUNET_TIME_STD_BACKOFF (h->wait_time);
+ h->habitability_check_task =
+ GNUNET_SCHEDULER_add_delayed (h->wait_time, &habitability_check, h);
+ return;
+ }
+ GNUNET_OS_process_destroy (h->auxp);
+ h->auxp = NULL;
+ ret = (0 != code) ? GNUNET_NO : GNUNET_YES;
+
+call_cb:
+ GNUNET_free (h->ssh_addr);
+ GNUNET_free (h->portstr);
+ GNUNET_free (h->helper_binary_path);
+ if (NULL != h->auxp)
+ GNUNET_OS_process_destroy (h->auxp);
+ cb = h->cb;
+ cb_cls = h->cb_cls;
+ host = h->host;
+ GNUNET_free (h);
+ if (NULL != cb)
+ cb (cb_cls, host, ret);
+}
+
+
/**
* Checks whether a host can be used to start testbed service
*
* @param host the host to check
- * @return GNUNET_YES if testbed service can be started on the given host
- * remotely; GNUNET_NO if not
+ * @param config the configuration handle to lookup the path of the testbed
+ * helper
+ * @param cb the callback to call to inform about habitability of the given host
+ * @param cb_cls the closure for the callback
+ * @return NULL upon any error or a handle which can be passed to
+ * GNUNET_TESTBED_is_host_habitable_cancel()
*/
-int
-GNUNET_TESTBED_is_host_habitable (const struct GNUNET_TESTBED_Host *host)
+struct GNUNET_TESTBED_HostHabitableCheckHandle *
+GNUNET_TESTBED_is_host_habitable (const struct GNUNET_TESTBED_Host *host,
+ const struct GNUNET_CONFIGURATION_Handle
+ *config,
+ GNUNET_TESTBED_HostHabitableCallback cb,
+ void *cb_cls)
{
+ struct GNUNET_TESTBED_HostHabitableCheckHandle *h;
char *remote_args[11];
- char *portstr;
- char *ssh_addr;
const char *hostname;
- struct GNUNET_OS_Process *auxp;
- unsigned long code;
- enum GNUNET_OS_ProcessStatusType type;
- int ret;
unsigned int argp;
- portstr = NULL;
- ssh_addr = NULL;
+ h = GNUNET_malloc (sizeof (struct GNUNET_TESTBED_HostHabitableCheckHandle));
+ h->cb = cb;
+ h->cb_cls = cb_cls;
+ h->host = host;
hostname = (NULL == host->hostname) ? "127.0.0.1" : host->hostname;
if (NULL == host->username)
- ssh_addr = GNUNET_strdup (hostname);
+ h->ssh_addr = GNUNET_strdup (hostname);
else
- GNUNET_asprintf (&ssh_addr, "%s@%s", host->username, hostname);
+ GNUNET_asprintf (&h->ssh_addr, "%s@%s", host->username, hostname);
+ if (GNUNET_OK !=
+ GNUNET_CONFIGURATION_get_value_string (config, "testbed",
+ "HELPER_BINARY_PATH",
+ &h->helper_binary_path))
+ h->helper_binary_path =
+ GNUNET_OS_get_libexec_binary_path (HELPER_TESTBED_BINARY);
argp = 0;
remote_args[argp++] = "ssh";
- GNUNET_asprintf (&portstr, "%u", host->port);
+ GNUNET_asprintf (&h->portstr, "%u", host->port);
remote_args[argp++] = "-p";
- remote_args[argp++] = portstr;
+ remote_args[argp++] = h->portstr;
remote_args[argp++] = "-o";
remote_args[argp++] = "BatchMode=yes";
remote_args[argp++] = "-o";
remote_args[argp++] = "NoHostAuthenticationForLocalhost=yes";
- remote_args[argp++] = ssh_addr;
- remote_args[argp++] = "which";
- remote_args[argp++] = "gnunet-helper-testbed";
+ remote_args[argp++] = h->ssh_addr;
+ remote_args[argp++] = "stat";
+ remote_args[argp++] = h->helper_binary_path;
remote_args[argp++] = NULL;
GNUNET_assert (argp == 11);
- auxp =
- GNUNET_OS_start_process_vap (GNUNET_NO, GNUNET_OS_INHERIT_STD_ALL, NULL,
+ h->auxp =
+ GNUNET_OS_start_process_vap (GNUNET_NO, GNUNET_OS_INHERIT_STD_ERR, NULL,
NULL, "ssh", remote_args);
- if (NULL == auxp)
+ if (NULL == h->auxp)
{
- GNUNET_free (ssh_addr);
- GNUNET_free (portstr);
+ GNUNET_break (0); /* Cannot exec SSH? */
+ GNUNET_free (h->ssh_addr);
+ GNUNET_free (h->portstr);
+ GNUNET_free (h->helper_binary_path);
+ GNUNET_free (h);
+ return NULL;
+ }
+ h->wait_time = GNUNET_TIME_STD_BACKOFF (h->wait_time);
+ h->habitability_check_task =
+ GNUNET_SCHEDULER_add_delayed (h->wait_time, &habitability_check, h);
+ return h;
+}
+
+
+/**
+ * Function to cancel a request started using GNUNET_TESTBED_is_host_habitable()
+ *
+ * @param handle the habitability check handle
+ */
+void
+GNUNET_TESTBED_is_host_habitable_cancel (struct
+ GNUNET_TESTBED_HostHabitableCheckHandle
+ *handle)
+{
+ GNUNET_SCHEDULER_cancel (handle->habitability_check_task);
+ (void) GNUNET_OS_process_kill (handle->auxp, SIGTERM);
+ (void) GNUNET_OS_process_wait (handle->auxp);
+ GNUNET_OS_process_destroy (handle->auxp);
+ GNUNET_free (handle->ssh_addr);
+ GNUNET_free (handle->portstr);
+ GNUNET_free (handle->helper_binary_path);
+ GNUNET_free (handle);
+}
+
+
+/**
+ * Initializes the operation queue for parallel overlay connects
+ *
+ * @param h the host handle
+ * @param npoc the number of parallel overlay connects - the queue size
+ */
+void
+GNUNET_TESTBED_set_num_parallel_overlay_connects_ (struct
+ GNUNET_TESTBED_Host *h,
+ unsigned int npoc)
+{
+ //fprintf (stderr, "%d", npoc);
+ GNUNET_free_non_null (h->tslots);
+ h->tslots_filled = 0;
+ h->num_parallel_connects = npoc;
+ h->tslots = GNUNET_malloc (npoc * sizeof (struct TimeSlot));
+ GNUNET_TESTBED_operation_queue_reset_max_active_
+ (h->opq_parallel_overlay_connect_operations, npoc);
+}
+
+
+/**
+ * Returns a timing slot which will be exclusively locked
+ *
+ * @param h the host handle
+ * @param key a pointer which is associated to the returned slot; should not be
+ * NULL. It serves as a key to determine the correct owner of the slot
+ * @return the time slot index in the array of time slots in the controller
+ * handle
+ */
+unsigned int
+GNUNET_TESTBED_get_tslot_ (struct GNUNET_TESTBED_Host *h, void *key)
+{
+ unsigned int slot;
+
+ GNUNET_assert (NULL != h->tslots);
+ GNUNET_assert (NULL != key);
+ for (slot = 0; slot < h->num_parallel_connects; slot++)
+ if (NULL == h->tslots[slot].key)
+ {
+ h->tslots[slot].key = key;
+ return slot;
+ }
+ GNUNET_assert (0); /* We should always find a free tslot */
+}
+
+
+/**
+ * Decides whether any change in the number of parallel overlay connects is
+ * necessary to adapt to the load on the system
+ *
+ * @param h the host handle
+ */
+static void
+decide_npoc (struct GNUNET_TESTBED_Host *h)
+{
+ struct GNUNET_TIME_Relative avg;
+ int sd;
+ unsigned int slot;
+ unsigned int nvals;
+
+ if (h->tslots_filled != h->num_parallel_connects)
+ return;
+ avg = GNUNET_TIME_UNIT_ZERO;
+ nvals = 0;
+ for (slot = 0; slot < h->num_parallel_connects; slot++)
+ {
+ avg = GNUNET_TIME_relative_add (avg, h->tslots[slot].time);
+ nvals += h->tslots[slot].nvals;
+ }
+ GNUNET_assert (nvals >= h->num_parallel_connects);
+ avg = GNUNET_TIME_relative_divide (avg, nvals);
+ GNUNET_assert (GNUNET_TIME_UNIT_FOREVER_REL.rel_value != avg.rel_value);
+ sd = GNUNET_TESTBED_SD_deviation_factor_ (h->poc_sd, (unsigned int) avg.rel_value);
+ if ( (sd <= 5) ||
+ (0 == GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
+ h->num_parallel_connects)) )
+ GNUNET_TESTBED_SD_add_data_ (h->poc_sd, (unsigned int) avg.rel_value);
+ if (GNUNET_SYSERR == sd)
+ {
+ GNUNET_TESTBED_set_num_parallel_overlay_connects_ (h,
+ h->num_parallel_connects);
+ return;
+ }
+ GNUNET_assert (0 <= sd);
+ if (0 == sd)
+ {
+ GNUNET_TESTBED_set_num_parallel_overlay_connects_ (h,
+ h->num_parallel_connects
+ * 2);
+ return;
+ }
+ if (1 == sd)
+ {
+ GNUNET_TESTBED_set_num_parallel_overlay_connects_ (h,
+ h->num_parallel_connects
+ + 1);
+ return;
+ }
+ if (1 == h->num_parallel_connects)
+ {
+ GNUNET_TESTBED_set_num_parallel_overlay_connects_ (h, 1);
+ return;
+ }
+ if (2 == sd)
+ {
+ GNUNET_TESTBED_set_num_parallel_overlay_connects_ (h,
+ h->num_parallel_connects
+ - 1);
+ return;
+ }
+ GNUNET_TESTBED_set_num_parallel_overlay_connects_ (h,
+ h->num_parallel_connects /
+ 2);
+}
+
+
+/**
+ * Releases a time slot thus making it available for be used again
+ *
+ * @param h the host handle
+ * @param index the index of the the time slot
+ * @param key the key to prove ownership of the timeslot
+ * @return GNUNET_YES if the time slot is successfully removed; GNUNET_NO if the
+ * time slot cannot be removed - this could be because of the index
+ * greater than existing number of time slots or `key' being different
+ */
+int
+GNUNET_TESTBED_release_time_slot_ (struct GNUNET_TESTBED_Host *h,
+ unsigned int index, void *key)
+{
+ struct TimeSlot *slot;
+
+ GNUNET_assert (NULL != key);
+ if (index >= h->num_parallel_connects)
return GNUNET_NO;
+ slot = &h->tslots[index];
+ if (key != slot->key)
+ return GNUNET_NO;
+ slot->key = NULL;
+ return GNUNET_YES;
+}
+
+
+/**
+ * Function to update a time slot
+ *
+ * @param h the host handle
+ * @param index the index of the time slot to update
+ * @param key the key to identify ownership of the slot
+ * @param time the new time
+ * @param failed should this reading be treated as coming from a fail event
+ */
+void
+GNUNET_TESTBED_update_time_slot_ (struct GNUNET_TESTBED_Host *h,
+ unsigned int index, void *key,
+ struct GNUNET_TIME_Relative time, int failed)
+{
+ struct TimeSlot *slot;
+
+ if (GNUNET_YES == failed)
+ {
+ if (1 == h->num_parallel_connects)
+ {
+ GNUNET_TESTBED_set_num_parallel_overlay_connects_ (h, 1);
+ return;
+ }
+ GNUNET_TESTBED_set_num_parallel_overlay_connects_ (h,
+ h->num_parallel_connects
+ - 1);
}
- do
+ if (GNUNET_NO == GNUNET_TESTBED_release_time_slot_ (h, index, key))
+ return;
+ slot = &h->tslots[index];
+ slot->nvals++;
+ if (GNUNET_TIME_UNIT_ZERO.rel_value == slot->time.rel_value)
{
- ret = GNUNET_OS_process_status (auxp, &type, &code);
- GNUNET_assert (GNUNET_SYSERR != ret);
- (void) usleep (300);
+ slot->time = time;
+ h->tslots_filled++;
+ decide_npoc (h);
+ return;
}
- while (GNUNET_NO == ret);
- //(void) GNUNET_OS_process_wait (auxp);
- GNUNET_OS_process_destroy (auxp);
- GNUNET_free (ssh_addr);
- GNUNET_free (portstr);
- return (0 != code) ? GNUNET_NO : GNUNET_YES;
+ slot->time = GNUNET_TIME_relative_add (slot->time, time);
+}
+
+
+/**
+ * Queues the given operation in the queue for parallel overlay connects of the
+ * given host
+ *
+ * @param h the host handle
+ * @param op the operation to queue in the given host's parally overlay connect
+ * queue
+ */
+void
+GNUNET_TESTBED_host_queue_oc (struct GNUNET_TESTBED_Host *h,
+ struct GNUNET_TESTBED_Operation *op)
+{
+ GNUNET_TESTBED_operation_queue_insert_
+ (h->opq_parallel_overlay_connect_operations, op);
}
/* end of testbed_api_hosts.c */