/*
* This file is part of GNUnet
- * (C) 2013 Christian Grothoff (and other contributing authors)
+ * Copyright (C) 2013 Christian Grothoff (and other contributing authors)
*
* GNUnet is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published
#include "gnunet_env_lib.h"
#include "gnunet_psyc_util_lib.h"
#include "gnunet_psyc_service.h"
+#include "gnunet_core_service.h"
#define TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 30)
const struct GNUNET_CONFIGURATION_Handle *cfg;
+struct GNUNET_CORE_Handle *core;
+struct GNUNET_PeerIdentity this_peer;
+
/**
* Handle for task for timeout termination.
*/
-GNUNET_SCHEDULER_TaskIdentifier end_badly_task;
+struct GNUNET_SCHEDULER_Task * end_badly_task;
struct GNUNET_PSYC_Master *mst;
struct GNUNET_PSYC_Slave *slv;
+struct GNUNET_PSYC_Channel *mst_chn, *slv_chn;
+
struct GNUNET_CRYPTO_EddsaPrivateKey *channel_key;
struct GNUNET_CRYPTO_EcdsaPrivateKey *slave_key;
struct TransmitClosure *tmit;
-uint8_t join_req_count;
+uint8_t join_req_count, end_count;
enum
{
- TEST_NONE,
- TEST_SLAVE_TRANSMIT,
- TEST_MASTER_TRANSMIT,
+ TEST_NONE = 0,
+ TEST_MASTER_START = 1,
+ TEST_SLAVE_JOIN = 2,
+ TEST_SLAVE_TRANSMIT = 3,
+ TEST_MASTER_TRANSMIT = 4,
+ TEST_MASTER_HISTORY_REPLAY_LATEST = 5,
+ TEST_SLAVE_HISTORY_REPLAY_LATEST = 6,
+ TEST_MASTER_HISTORY_REPLAY = 7,
+ TEST_SLAVE_HISTORY_REPLAY = 8,
+ TEST_MASTER_STATE_GET = 9,
+ TEST_SLAVE_STATE_GET = 10,
+ TEST_MASTER_STATE_GET_PREFIX = 11,
+ TEST_SLAVE_STATE_GET_PREFIX = 12,
} test;
void
master_transmit ();
+void
+master_history_replay_latest ();
+
void master_stopped (void *cls)
{
void
cleanup ()
{
+ if (NULL != core)
+ {
+ GNUNET_CORE_disconnect (core);
+ core = NULL;
+ }
if (NULL != slv)
{
GNUNET_PSYC_slave_part (slv, GNUNET_NO, &slave_parted, NULL);
{
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Ending tests.\n");
- if (end_badly_task != GNUNET_SCHEDULER_NO_TASK)
+ if (end_badly_task != NULL)
{
GNUNET_SCHEDULER_cancel (end_badly_task);
- end_badly_task = GNUNET_SCHEDULER_NO_TASK;
+ end_badly_task = NULL;
}
GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MILLISECONDS,
&end_normally, NULL);
const struct GNUNET_PSYC_MessageHeader *msg)
{
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
- "Master got PSYC message fragment of size %u "
- "belonging to message ID %llu with flags %x\n",
- ntohs (msg->header.size), message_id, flags);
+ "Test #%d: Master got PSYC message fragment of size %u "
+ "belonging to message ID %" PRIu64 " with flags %x\n",
+ test, ntohs (msg->header.size), message_id, flags);
// FIXME
}
if (NULL == msg)
{
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
- "Error while receiving message %llu\n", message_id);
+ "Error while receiving message %" PRIu64 "\n", message_id);
return;
}
uint16_t size = ntohs (msg->size);
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
- "Master got message part of type %u and size %u "
- "belonging to message ID %llu with flags %x\n",
- type, size, message_id, flags);
+ "Test #%d: Master got message part of type %u and size %u "
+ "belonging to message ID %" PRIu64 " with flags %x\n",
+ test, type, size, message_id, flags);
switch (test)
{
break;
case TEST_MASTER_TRANSMIT:
+ if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END == type && 2 == ++end_count)
+ master_history_replay_latest ();
+ break;
+
+ case TEST_MASTER_HISTORY_REPLAY:
+ case TEST_MASTER_HISTORY_REPLAY_LATEST:
+ if (GNUNET_PSYC_MESSAGE_HISTORIC != flags)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ "Test #%d: Unexpected flags for historic message: %x" PRIu32 "\n",
+ test, flags);
+ GNUNET_assert (0);
+ return;
+ }
break;
default:
const struct GNUNET_PSYC_MessageHeader *msg)
{
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
- "Slave got PSYC message fragment of size %u "
- "belonging to message ID %llu with flags %x\n",
- ntohs (msg->header.size), message_id, flags);
+ "Test #%d: Slave got PSYC message fragment of size %u "
+ "belonging to message ID %" PRIu64 " with flags %x\n",
+ test, ntohs (msg->header.size), message_id, flags);
// FIXME
}
if (NULL == msg)
{
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
- "Error while receiving message %llu\n", message_id);
+ "Error while receiving message " PRIu64 "\n", message_id);
return;
}
uint16_t size = ntohs (msg->size);
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
- "Slave got message part of type %u and size %u "
- "belonging to message ID %llu with flags %x\n",
- type, size, message_id, flags);
+ "Test #%d: Slave got message part of type %u and size %u "
+ "belonging to message ID %" PRIu64 " with flags %x\n",
+ test, type, size, message_id, flags);
switch (test)
{
case TEST_MASTER_TRANSMIT:
- if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END == type)
- end ();
+ if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END == type && 2 == ++end_count)
+ master_history_replay_latest ();
+ break;
+
+ case TEST_SLAVE_HISTORY_REPLAY:
+ case TEST_SLAVE_HISTORY_REPLAY_LATEST:
+ if (GNUNET_PSYC_MESSAGE_HISTORIC != flags)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ "Test #%d: Unexpected flags for historic message: %x" PRIu32 "\n",
+ flags);
+ GNUNET_assert (0);
+ return;
+ }
break;
default:
}
+void
+state_get_var (void *cls, const char *name, const void *value, size_t value_size)
+{
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Got state var: %s\n%.*s\n", name, value_size, value);
+}
+
+
+/*** Slave state_get_prefix() ***/
+
+void
+slave_state_get_prefix_result (void *cls, int64_t result,
+ const void *err_msg, uint16_t err_msg_size)
+{
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "slave_state_get_prefix:\t%" PRId64 " (%.s)\n",
+ result, err_msg_size, err_msg);
+ // FIXME: GNUNET_assert (2 == result);
+ end ();
+}
+
+
+void
+slave_state_get_prefix ()
+{
+ test = TEST_SLAVE_STATE_GET_PREFIX;
+ GNUNET_PSYC_channel_state_get_prefix (slv_chn, "_foo", &state_get_var,
+ &slave_state_get_prefix_result, NULL);
+}
+
+
+/*** Master state_get_prefix() ***/
+
+
+void
+master_state_get_prefix_result (void *cls, int64_t result,
+ const void *err_msg, uint16_t err_msg_size)
+{
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "master_state_get_prefix:\t%" PRId64 " (%s)\n", result, err_msg);
+ // FIXME: GNUNET_assert (2 == result);
+ slave_state_get_prefix ();
+}
+
+
+void
+master_state_get_prefix ()
+{
+ test = TEST_MASTER_STATE_GET_PREFIX;
+ GNUNET_PSYC_channel_state_get_prefix (mst_chn, "_foo", &state_get_var,
+ &master_state_get_prefix_result, NULL);
+}
+
+
+/*** Slave state_get() ***/
+
+
+void
+slave_state_get_result (void *cls, int64_t result,
+ const void *err_msg, uint16_t err_msg_size)
+{
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "slave_state_get:\t%" PRId64 " (%.*s)\n",
+ result, err_msg_size, err_msg);
+ // FIXME: GNUNET_assert (2 == result);
+ master_state_get_prefix ();
+}
+
+
+void
+slave_state_get ()
+{
+ test = TEST_SLAVE_STATE_GET;
+ GNUNET_PSYC_channel_state_get (slv_chn, "_foo_bar_baz", &state_get_var,
+ &slave_state_get_result, NULL);
+}
+
+
+/*** Master state_get() ***/
+
+
+void
+master_state_get_result (void *cls, int64_t result,
+ const void *err_msg, uint16_t err_msg_size)
+{
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "master_state_get:\t%" PRId64 " (%.*s)\n",
+ result, err_msg_size, err_msg);
+ // FIXME: GNUNET_assert (1 == result);
+ slave_state_get ();
+}
+
+
+void
+master_state_get ()
+{
+ test = TEST_MASTER_STATE_GET;
+ GNUNET_PSYC_channel_state_get (mst_chn, "_foo_bar_baz", &state_get_var,
+ &master_state_get_result, NULL);
+}
+
+
+/*** Slave history_replay() ***/
+
+void
+slave_history_replay_result (void *cls, int64_t result,
+ const void *err_msg, uint16_t err_msg_size)
+{
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "slave_history_replay:\t%" PRId64 " (%.*s)\n",
+ result, err_msg_size, err_msg);
+ GNUNET_assert (9 == result);
+
+ master_state_get ();
+}
+
+
+void
+slave_history_replay ()
+{
+ test = TEST_SLAVE_HISTORY_REPLAY;
+ GNUNET_PSYC_channel_history_replay (slv_chn, 1, 1, "",
+ GNUNET_PSYC_HISTORY_REPLAY_LOCAL,
+ &slave_message_cb,
+ &slave_message_part_cb,
+ &slave_history_replay_result, NULL);
+}
+
+
+/*** Master history_replay() ***/
+
+
+void
+master_history_replay_result (void *cls, int64_t result,
+ const void *err_msg, uint16_t err_msg_size)
+{
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "master_history_replay:\t%" PRId64 " (%.*s)\n",
+ result, err_msg_size, err_msg);
+ GNUNET_assert (9 == result);
+
+ slave_history_replay ();
+}
+
+
+void
+master_history_replay ()
+{
+ test = TEST_MASTER_HISTORY_REPLAY;
+ GNUNET_PSYC_channel_history_replay (mst_chn, 1, 1, "",
+ GNUNET_PSYC_HISTORY_REPLAY_LOCAL,
+ &master_message_cb,
+ &master_message_part_cb,
+ &master_history_replay_result, NULL);
+}
+
+
+/*** Slave history_replay_latest() ***/
+
+
+void
+slave_history_replay_latest_result (void *cls, int64_t result,
+ const void *err_msg, uint16_t err_msg_size)
+{
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "slave_history_replay_latest:\t%" PRId64 " (%.*s)\n",
+ result, err_msg_size, err_msg);
+ GNUNET_assert (9 == result);
+
+ master_history_replay ();
+}
+
+
+void
+slave_history_replay_latest ()
+{
+ test = TEST_SLAVE_HISTORY_REPLAY_LATEST;
+ GNUNET_PSYC_channel_history_replay_latest (slv_chn, 1, "",
+ GNUNET_PSYC_HISTORY_REPLAY_LOCAL,
+ &slave_message_cb,
+ &slave_message_part_cb,
+ &slave_history_replay_latest_result,
+ NULL);
+}
+
+
+/*** Master history_replay_latest() ***/
+
+
+void
+master_history_replay_latest_result (void *cls, int64_t result,
+ const void *err_msg, uint16_t err_msg_size)
+{
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "master_history_replay_latest:\t%" PRId64 " (%.*s)\n",
+ result, err_msg_size, err_msg);
+ GNUNET_assert (9 == result);
+
+ slave_history_replay_latest ();
+}
+
+
+void
+master_history_replay_latest ()
+{
+ test = TEST_MASTER_HISTORY_REPLAY_LATEST;
+ GNUNET_PSYC_channel_history_replay_latest (mst_chn, 1, "",
+ GNUNET_PSYC_HISTORY_REPLAY_LOCAL,
+ &master_message_cb,
+ &master_message_part_cb,
+ &master_history_replay_latest_result,
+ NULL);
+}
+
+
void
transmit_resume (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
{
}
-void
+static void
slave_join ();
void
-join_decision_cb (void *cls,
- const struct GNUNET_PSYC_JoinDecisionMessage *dcsn,
- int is_admitted,
- const struct GNUNET_PSYC_Message *join_msg)
+slave_transmit ()
{
- GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
- "Slave got join decision: %d\n", is_admitted);
-
- if (GNUNET_YES != is_admitted)
- { /* First join request is refused, retry. */
- GNUNET_assert (1 == join_req_count);
- slave_join ();
- return;
- }
GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Slave sending request to master.\n");
-
test = TEST_SLAVE_TRANSMIT;
tmit = GNUNET_new (struct TransmitClosure);
tmit->data[0] = "slave test";
tmit->data_count = 1;
tmit->slv_tmit
- = GNUNET_PSYC_slave_transmit (slv, "_request_test", tmit_notify_mod,
- tmit_notify_data, tmit,
+ = GNUNET_PSYC_slave_transmit (slv, "_request_test", &tmit_notify_mod,
+ &tmit_notify_data, tmit,
GNUNET_PSYC_SLAVE_TRANSMIT_NONE);
}
void
+slave_remove_cb (void *cls, int64_t result,
+ const void *err_msg, uint16_t err_msg_size)
+{
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "slave_remove:\t%" PRId64 " (%.*s)\n",
+ result, err_msg_size, err_msg);
+
+ slave_transmit ();
+}
+
+
+void
+slave_add_cb (void *cls, int64_t result,
+ const void *err_msg, uint16_t err_msg_size)
+{
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "slave_add:\t%" PRId64 " (%.*s)\n",
+ result, err_msg_size, err_msg);
+
+ struct GNUNET_PSYC_Channel *chn = cls;
+ GNUNET_PSYC_channel_slave_remove (chn, &slave_pub_key, 2,
+ &slave_remove_cb, chn);
+
+}
+
+
+static void
+join_decision_cb (void *cls,
+ const struct GNUNET_PSYC_JoinDecisionMessage *dcsn,
+ int is_admitted,
+ const struct GNUNET_PSYC_Message *join_msg)
+{
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "Slave got join decision: %d\n", is_admitted);
+
+ if (GNUNET_YES != is_admitted)
+ { /* First join request is refused, retry. */
+ GNUNET_assert (1 == join_req_count);
+ slave_join ();
+ return;
+ }
+
+ struct GNUNET_PSYC_Channel *chn = GNUNET_PSYC_master_get_channel (mst);
+ GNUNET_PSYC_channel_slave_add (chn, &slave_pub_key, 2, 2, &slave_add_cb, chn);
+}
+
+
+static void
join_request_cb (void *cls,
const struct GNUNET_PSYC_JoinRequestMessage *req,
const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
}
-void
-slave_connect_cb (void *cls, uint64_t max_message_id)
+static void
+slave_connect_cb (void *cls, int result, uint64_t max_message_id)
{
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
- "Slave connected: %lu\n", max_message_id);
+ "Slave connected: %d, max_message_id: %" PRIu64 "\n",
+ result, max_message_id);
+ GNUNET_assert (TEST_SLAVE_JOIN == test);
+ GNUNET_assert (GNUNET_OK == result || GNUNET_NO == result);
}
-void
+static void
slave_join ()
{
GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Joining slave.\n");
+ test = TEST_SLAVE_JOIN;
- struct GNUNET_PeerIdentity origin = {}; // FIXME: this peer
+ struct GNUNET_PeerIdentity origin = this_peer;
struct GNUNET_ENV_Environment *env = GNUNET_ENV_environment_create ();
GNUNET_ENV_environment_add (env, GNUNET_ENV_OP_ASSIGN,
"_foo", "bar baz", 7);
&slave_message_cb, &slave_message_part_cb,
&slave_connect_cb, &join_decision_cb, NULL,
join_msg);
+ GNUNET_free (join_msg);
+ slv_chn = GNUNET_PSYC_slave_get_channel (slv);
GNUNET_ENV_environment_destroy (env);
}
{
GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Master sending message to all.\n");
test = TEST_MASTER_TRANSMIT;
+ end_count = 0;
+
uint32_t i, j;
char *name_max = "_test_max";
tmit->data_delay[1] = 3;
tmit->data_count = 4;
tmit->mst_tmit
- = GNUNET_PSYC_master_transmit (mst, "_notice_test", tmit_notify_mod,
- tmit_notify_data, tmit,
+ = GNUNET_PSYC_master_transmit (mst, "_notice_test", &tmit_notify_mod,
+ &tmit_notify_data, tmit,
GNUNET_PSYC_MASTER_TRANSMIT_INC_GROUP_GEN);
}
void
-master_start_cb (void *cls, uint64_t max_message_id)
+master_start_cb (void *cls, int result, uint64_t max_message_id)
{
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Master started: %" PRIu64 "\n", max_message_id);
+ "Master started: %d, max_message_id: %" PRIu64 "\n",
+ result, max_message_id);
+ GNUNET_assert (TEST_MASTER_START == test);
+ GNUNET_assert (GNUNET_OK == result || GNUNET_NO == result);
slave_join ();
}
master_start ()
{
GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Starting master.\n");
+ test = TEST_MASTER_START;
mst = GNUNET_PSYC_master_start (cfg, channel_key, GNUNET_PSYC_CHANNEL_PRIVATE,
&master_start_cb, &join_request_cb,
&master_message_cb, &master_message_part_cb,
NULL);
+ mst_chn = GNUNET_PSYC_master_get_channel (mst);
}
void
}
+void
+core_connected (void *cls, const struct GNUNET_PeerIdentity *my_identity)
+{
+ this_peer = *my_identity;
+
+#if DEBUG_TEST_PSYC
+ master_start ();
+#else
+ /* Allow some time for the services to initialize. */
+ GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS,
+ &schedule_master_start, NULL);
+#endif
+
+}
+
/**
* Main function of the test, run from scheduler.
*
GNUNET_CRYPTO_eddsa_key_get_public (channel_key, &channel_pub_key);
GNUNET_CRYPTO_ecdsa_key_get_public (slave_key, &slave_pub_key);
-#if DEBUG_TEST_PSYC
- master_start ();
-#else
- /* Allow some time for the services to initialize. */
- GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS,
- &schedule_master_start, NULL);
-#endif
- return;
+ core = GNUNET_CORE_connect (cfg, NULL, &core_connected, NULL, NULL,
+ NULL, GNUNET_NO, NULL, GNUNET_NO, NULL);
}