+void
+slave_history_replay_latest ()
+{
+ test = TEST_SLAVE_HISTORY_REPLAY_LATEST;
+ GNUNET_PSYC_channel_history_replay_latest (slv_chn, 1, "",
+ GNUNET_PSYC_HISTORY_REPLAY_LOCAL,
+ &slave_message_cb,
+ &slave_message_part_cb,
+ &slave_history_replay_latest_result,
+ NULL);
+}
+
+
+/*** Master history_replay_latest() ***/
+
+
+void
+master_history_replay_latest_result (void *cls, int64_t result,
+ const void *err_msg, uint16_t err_msg_size)
+{
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "master_history_replay_latest:\t%" PRId64 " (%.*s)\n",
+ result, err_msg_size, err_msg);
+ GNUNET_assert (9 == result);
+
+ slave_history_replay_latest ();
+}
+
+
+void
+master_history_replay_latest ()
+{
+ test = TEST_MASTER_HISTORY_REPLAY_LATEST;
+ GNUNET_PSYC_channel_history_replay_latest (mst_chn, 1, "",
+ GNUNET_PSYC_HISTORY_REPLAY_LOCAL,
+ &master_message_cb,
+ &master_message_part_cb,
+ &master_history_replay_latest_result,
+ NULL);
+}
+
+
+void
+transmit_resume (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Transmission resumed.\n");
+ struct TransmitClosure *tmit = cls;
+ if (NULL != tmit->mst_tmit)
+ GNUNET_PSYC_master_transmit_resume (tmit->mst_tmit);
+ else
+ GNUNET_PSYC_slave_transmit_resume (tmit->slv_tmit);
+}
+
+
+int
+tmit_notify_data (void *cls, uint16_t *data_size, void *data)
+{
+ struct TransmitClosure *tmit = cls;
+ if (0 == tmit->data_count)
+ {
+ *data_size = 0;
+ return GNUNET_YES;
+ }
+
+ uint16_t size = strlen (tmit->data[tmit->n]);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Transmit notify data: %u bytes available, "
+ "processing fragment %u/%u (size %u).\n",
+ *data_size, tmit->n + 1, tmit->data_count, size);
+ if (*data_size < size)
+ {
+ *data_size = 0;
+ GNUNET_assert (0);
+ return GNUNET_SYSERR;
+ }
+
+ if (GNUNET_YES != tmit->paused && 0 < tmit->data_delay[tmit->n])
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Transmission paused.\n");
+ tmit->paused = GNUNET_YES;
+ GNUNET_SCHEDULER_add_delayed (
+ GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
+ tmit->data_delay[tmit->n]),
+ &transmit_resume, tmit);
+ *data_size = 0;
+ return GNUNET_NO;
+ }
+ tmit->paused = GNUNET_NO;
+
+ *data_size = size;
+ memcpy (data, tmit->data[tmit->n], size);
+
+ return ++tmit->n < tmit->data_count ? GNUNET_NO : GNUNET_YES;
+}
+
+
+int
+tmit_notify_mod (void *cls, uint16_t *data_size, void *data, uint8_t *oper,
+ uint32_t *full_value_size)
+{
+ struct TransmitClosure *tmit = cls;
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Transmit notify modifier: %lu bytes available, "
+ "%u modifiers left to process.\n",
+ *data_size, GNUNET_ENV_environment_get_count (tmit->env));
+
+ uint16_t name_size = 0;
+ size_t value_size = 0;
+ const char *value = NULL;
+
+ if (NULL != oper && NULL != tmit->mod)
+ { /* New modifier */
+ tmit->mod = tmit->mod->next;
+ if (NULL == tmit->mod)
+ { /* No more modifiers, continue with data */
+ *data_size = 0;
+ return GNUNET_YES;
+ }
+
+ GNUNET_assert (tmit->mod->value_size < UINT32_MAX);
+ *full_value_size = tmit->mod->value_size;
+ *oper = tmit->mod->oper;
+ name_size = strlen (tmit->mod->name);
+
+ if (name_size + 1 + tmit->mod->value_size <= *data_size)
+ {
+ *data_size = name_size + 1 + tmit->mod->value_size;
+ }
+ else
+ {
+ tmit->mod_value_size = tmit->mod->value_size;
+ value_size = *data_size - name_size - 1;
+ tmit->mod_value_size -= value_size;
+ tmit->mod_value = tmit->mod->value + value_size;
+ }
+
+ memcpy (data, tmit->mod->name, name_size);
+ ((char *)data)[name_size] = '\0';
+ memcpy ((char *)data + name_size + 1, tmit->mod->value, value_size);
+ }
+ else if (NULL != tmit->mod_value && 0 < tmit->mod_value_size)
+ { /* Modifier continuation */
+ value = tmit->mod_value;
+ if (tmit->mod_value_size <= *data_size)
+ {
+ value_size = tmit->mod_value_size;
+ tmit->mod_value = NULL;
+ }
+ else
+ {
+ value_size = *data_size;
+ tmit->mod_value += value_size;
+ }
+ tmit->mod_value_size -= value_size;
+
+ if (*data_size < value_size)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "value larger than buffer: %u < %zu\n",
+ *data_size, value_size);
+ *data_size = 0;
+ return GNUNET_NO;
+ }
+
+ *data_size = value_size;
+ memcpy (data, value, value_size);
+ }
+
+ return GNUNET_NO;
+}
+
+
+static void
+slave_join ();
+
+
+void
+slave_transmit ()
+{
+
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Slave sending request to master.\n");
+ test = TEST_SLAVE_TRANSMIT;
+
+ tmit = GNUNET_new (struct TransmitClosure);
+ tmit->env = GNUNET_ENV_environment_create ();
+ GNUNET_ENV_environment_add (tmit->env, GNUNET_ENV_OP_ASSIGN,
+ "_abc", "abc def", 7);
+ GNUNET_ENV_environment_add (tmit->env, GNUNET_ENV_OP_ASSIGN,
+ "_abc_def", "abc def ghi", 11);
+ tmit->mod = GNUNET_ENV_environment_head (tmit->env);
+ tmit->n = 0;
+ tmit->data[0] = "slave test";
+ tmit->data_count = 1;
+ tmit->slv_tmit
+ = GNUNET_PSYC_slave_transmit (slv, "_request_test", &tmit_notify_mod,
+ &tmit_notify_data, tmit,
+ GNUNET_PSYC_SLAVE_TRANSMIT_NONE);
+}
+
+
+void
+slave_remove_cb (void *cls, int64_t result,
+ const void *err_msg, uint16_t err_msg_size)
+{
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "slave_remove:\t%" PRId64 " (%.*s)\n",
+ result, err_msg_size, err_msg);
+
+ slave_transmit ();
+}
+
+
+void
+slave_add_cb (void *cls, int64_t result,
+ const void *err_msg, uint16_t err_msg_size)
+{
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "slave_add:\t%" PRId64 " (%.*s)\n",
+ result, err_msg_size, err_msg);
+
+ struct GNUNET_PSYC_Channel *chn = cls;
+ GNUNET_PSYC_channel_slave_remove (chn, &slave_pub_key, 2,
+ &slave_remove_cb, chn);
+
+}
+
+
+static void
+join_decision_cb (void *cls,
+ const struct GNUNET_PSYC_JoinDecisionMessage *dcsn,
+ int is_admitted,
+ const struct GNUNET_PSYC_Message *join_msg)
+{
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "Slave got join decision: %d\n", is_admitted);
+
+ if (GNUNET_YES != is_admitted)
+ { /* First join request is refused, retry. */
+ GNUNET_assert (1 == join_req_count);
+ slave_join ();
+ return;
+ }
+
+ struct GNUNET_PSYC_Channel *chn = GNUNET_PSYC_master_get_channel (mst);
+ GNUNET_PSYC_channel_slave_add (chn, &slave_pub_key, 2, 2, &slave_add_cb, chn);
+}
+
+
+static void
+join_request_cb (void *cls,
+ const struct GNUNET_PSYC_JoinRequestMessage *req,
+ const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
+ const struct GNUNET_PSYC_Message *join_msg,
+ struct GNUNET_PSYC_JoinHandle *jh)
+{
+ struct GNUNET_HashCode slave_key_hash;
+ GNUNET_CRYPTO_hash (slave_key, sizeof (*slave_key), &slave_key_hash);
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "Got join request #%u from %s.\n",
+ join_req_count, GNUNET_h2s (&slave_key_hash));
+
+ /* Reject first request */
+ int is_admitted = (0 < join_req_count++) ? GNUNET_YES : GNUNET_NO;
+ GNUNET_PSYC_join_decision (jh, is_admitted, 0, NULL, NULL);
+}
+
+
+static void
+slave_connect_cb (void *cls, int result, uint64_t max_message_id)
+{
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "Slave connected: %d, max_message_id: %" PRIu64 "\n",
+ result, max_message_id);
+ GNUNET_assert (TEST_SLAVE_JOIN == test);
+ GNUNET_assert (GNUNET_OK == result || GNUNET_NO == result);
+}
+
+
+static void
+slave_join ()
+{
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Joining slave.\n");
+ test = TEST_SLAVE_JOIN;
+
+ struct GNUNET_PeerIdentity origin = this_peer;
+ struct GNUNET_ENV_Environment *env = GNUNET_ENV_environment_create ();
+ GNUNET_ENV_environment_add (env, GNUNET_ENV_OP_ASSIGN,
+ "_foo", "bar baz", 7);
+ GNUNET_ENV_environment_add (env, GNUNET_ENV_OP_ASSIGN,
+ "_foo_bar", "foo bar baz", 11);
+ struct GNUNET_PSYC_Message *
+ join_msg = GNUNET_PSYC_message_create ("_request_join", env, "some data", 9);
+
+ slv = GNUNET_PSYC_slave_join (cfg, &channel_pub_key, slave_key, &origin, 0, NULL,
+ &slave_message_cb, &slave_message_part_cb,
+ &slave_connect_cb, &join_decision_cb, NULL,
+ join_msg);
+ GNUNET_free (join_msg);
+ slv_chn = GNUNET_PSYC_slave_get_channel (slv);
+ GNUNET_ENV_environment_destroy (env);
+}
+
+
+void
+master_transmit ()
+{
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Master sending message to all.\n");
+ test = TEST_MASTER_TRANSMIT;
+ end_count = 0;
+
+ uint32_t i, j;
+
+ char *name_max = "_test_max";
+ uint8_t name_max_size = sizeof ("_test_max");
+ char *val_max = GNUNET_malloc (GNUNET_PSYC_MODIFIER_MAX_PAYLOAD);
+ for (i = 0; i < GNUNET_PSYC_MODIFIER_MAX_PAYLOAD; i++)
+ val_max[i] = (0 == i % 10000) ? '0' + i / 10000 : '.';
+
+ char *name_cont = "_test_cont";
+ uint8_t name_cont_size = sizeof ("_test_cont");
+ char *val_cont = GNUNET_malloc (GNUNET_PSYC_MODIFIER_MAX_PAYLOAD
+ + GNUNET_PSYC_MOD_CONT_MAX_PAYLOAD);
+ for (i = 0; i < GNUNET_PSYC_MODIFIER_MAX_PAYLOAD - name_cont_size; i++)
+ val_cont[i] = (0 == i % 10000) ? '0' + i / 10000 : ':';
+ for (j = 0; j < GNUNET_PSYC_MOD_CONT_MAX_PAYLOAD; j++, i++)
+ val_cont[i] = (0 == j % 10000) ? '0' + j / 10000 : '!';
+
+ tmit = GNUNET_new (struct TransmitClosure);
+ tmit->env = GNUNET_ENV_environment_create ();
+ GNUNET_ENV_environment_add (tmit->env, GNUNET_ENV_OP_ASSIGN,
+ "_foo", "bar baz", 7);
+ GNUNET_ENV_environment_add (tmit->env, GNUNET_ENV_OP_ASSIGN,
+ name_max, val_max,
+ GNUNET_PSYC_MODIFIER_MAX_PAYLOAD
+ - name_max_size);
+ GNUNET_ENV_environment_add (tmit->env, GNUNET_ENV_OP_ASSIGN,
+ "_foo_bar", "foo bar baz", 11);
+ GNUNET_ENV_environment_add (tmit->env, GNUNET_ENV_OP_ASSIGN,
+ name_cont, val_cont,
+ GNUNET_PSYC_MODIFIER_MAX_PAYLOAD - name_cont_size
+ + GNUNET_PSYC_MOD_CONT_MAX_PAYLOAD);
+ tmit->mod = GNUNET_ENV_environment_head (tmit->env);
+ tmit->data[0] = "foo";
+ tmit->data[1] = GNUNET_malloc (GNUNET_PSYC_DATA_MAX_PAYLOAD + 1);
+ for (i = 0; i < GNUNET_PSYC_DATA_MAX_PAYLOAD; i++)
+ tmit->data[1][i] = (0 == i % 10000) ? '0' + i / 10000 : '_';
+ tmit->data[2] = "foo bar";
+ tmit->data[3] = "foo bar baz";
+ tmit->data_delay[1] = 3;
+ tmit->data_count = 4;
+ tmit->mst_tmit
+ = GNUNET_PSYC_master_transmit (mst, "_notice_test", &tmit_notify_mod,
+ &tmit_notify_data, tmit,
+ GNUNET_PSYC_MASTER_TRANSMIT_INC_GROUP_GEN);
+}
+
+
+void
+master_start_cb (void *cls, int result, uint64_t max_message_id)
+{
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Master started: %d, max_message_id: %" PRIu64 "\n",
+ result, max_message_id);
+ GNUNET_assert (TEST_MASTER_START == test);
+ GNUNET_assert (GNUNET_OK == result || GNUNET_NO == result);
+ slave_join ();
+}
+
+
+void
+master_start ()
+{
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Starting master.\n");
+ test = TEST_MASTER_START;
+ mst = GNUNET_PSYC_master_start (cfg, channel_key, GNUNET_PSYC_CHANNEL_PRIVATE,
+ &master_start_cb, &join_request_cb,
+ &master_message_cb, &master_message_part_cb,
+ NULL);
+ mst_chn = GNUNET_PSYC_master_get_channel (mst);
+}
+
+void
+schedule_master_start (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ master_start ();
+}
+
+
+void
+core_connected (void *cls, const struct GNUNET_PeerIdentity *my_identity)
+{
+ this_peer = *my_identity;
+
+#if DEBUG_TEST_PSYC
+ master_start ();
+#else
+ /* Allow some time for the services to initialize. */
+ GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS,
+ &schedule_master_start, NULL);
+#endif
+
+}
+