+void
+master_message_cb (void *cls, uint64_t message_id, uint32_t flags,
+ const struct GNUNET_PSYC_MessageHeader *msg)
+{
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "Test #%d: Master got PSYC message fragment of size %u "
+ "belonging to message ID %" PRIu64 " with flags %x\n",
+ test, ntohs (msg->header.size), message_id, flags);
+ // FIXME
+}
+
+
+void
+master_message_part_cb (void *cls, uint64_t message_id,
+ uint64_t data_offset, uint32_t flags,
+ const struct GNUNET_MessageHeader *msg)
+{
+ if (NULL == msg)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ "Error while receiving message %" PRIu64 "\n", message_id);
+ return;
+ }
+
+ uint16_t type = ntohs (msg->type);
+ uint16_t size = ntohs (msg->size);
+
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "Test #%d: Master got message part of type %u and size %u "
+ "belonging to message ID %" PRIu64 " with flags %x\n",
+ test, type, size, message_id, flags);
+
+ switch (test)
+ {
+ case TEST_SLAVE_TRANSMIT:
+ if (GNUNET_PSYC_MESSAGE_REQUEST != flags)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ "Unexpected request flags: %x" PRIu32 "\n", flags);
+ GNUNET_assert (0);
+ return;
+ }
+ // FIXME: check rest of message
+
+ if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END == type)
+ master_transmit ();
+ break;
+
+ case TEST_MASTER_TRANSMIT:
+ if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END == type && 2 == ++end_count)
+ master_history_replay_latest ();
+ break;
+
+ case TEST_MASTER_HISTORY_REPLAY:
+ case TEST_MASTER_HISTORY_REPLAY_LATEST:
+ if (GNUNET_PSYC_MESSAGE_HISTORIC != flags)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ "Test #%d: Unexpected flags for historic message: %x" PRIu32 "\n",
+ test, flags);
+ GNUNET_assert (0);
+ return;
+ }
+ break;
+
+ default:
+ GNUNET_assert (0);
+ }
+}
+
+
+void
+slave_message_cb (void *cls, uint64_t message_id, uint32_t flags,
+ const struct GNUNET_PSYC_MessageHeader *msg)
+{
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "Test #%d: Slave got PSYC message fragment of size %u "
+ "belonging to message ID %" PRIu64 " with flags %x\n",
+ test, ntohs (msg->header.size), message_id, flags);
+ // FIXME
+}
+
+
+void
+slave_message_part_cb (void *cls, uint64_t message_id,
+ uint64_t data_offset, uint32_t flags,
+ const struct GNUNET_MessageHeader *msg)
+{
+ if (NULL == msg)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ "Error while receiving message " PRIu64 "\n", message_id);
+ return;
+ }
+
+ uint16_t type = ntohs (msg->type);
+ uint16_t size = ntohs (msg->size);
+
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "Test #%d: Slave got message part of type %u and size %u "
+ "belonging to message ID %" PRIu64 " with flags %x\n",
+ test, type, size, message_id, flags);
+
+ switch (test)
+ {
+ case TEST_MASTER_TRANSMIT:
+ if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END == type && 2 == ++end_count)
+ master_history_replay_latest ();
+ break;
+
+ case TEST_SLAVE_HISTORY_REPLAY:
+ case TEST_SLAVE_HISTORY_REPLAY_LATEST:
+ if (GNUNET_PSYC_MESSAGE_HISTORIC != flags)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ "Test #%d: Unexpected flags for historic message: %x" PRIu32 "\n",
+ flags);
+ GNUNET_assert (0);
+ return;
+ }
+ break;
+
+ default:
+ GNUNET_assert (0);
+ }
+}
+
+
+void
+state_get_var (void *cls, const char *name, const void *value, size_t value_size)
+{
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Got state var: %s\n%.*s\n", name, value_size, value);
+}
+
+
+/*** Slave state_get_prefix() ***/
+
+void
+slave_state_get_prefix_result (void *cls, int64_t result,
+ const void *err_msg, uint16_t err_msg_size)
+{
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "slave_state_get_prefix:\t%" PRId64 " (%.s)\n",
+ result, err_msg_size, err_msg);
+ // FIXME: GNUNET_assert (2 == result);
+ end ();
+}
+
+
+void
+slave_state_get_prefix ()
+{
+ test = TEST_SLAVE_STATE_GET_PREFIX;
+ GNUNET_PSYC_channel_state_get_prefix (slv_chn, "_foo", &state_get_var,
+ &slave_state_get_prefix_result, NULL);
+}
+
+
+/*** Master state_get_prefix() ***/
+
+
+void
+master_state_get_prefix_result (void *cls, int64_t result,
+ const void *err_msg, uint16_t err_msg_size)
+{
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "master_state_get_prefix:\t%" PRId64 " (%s)\n", result, err_msg);
+ // FIXME: GNUNET_assert (2 == result);
+ slave_state_get_prefix ();
+}
+
+
+void
+master_state_get_prefix ()
+{
+ test = TEST_MASTER_STATE_GET_PREFIX;
+ GNUNET_PSYC_channel_state_get_prefix (mst_chn, "_foo", &state_get_var,
+ &master_state_get_prefix_result, NULL);
+}
+
+
+/*** Slave state_get() ***/
+
+
+void
+slave_state_get_result (void *cls, int64_t result,
+ const void *err_msg, uint16_t err_msg_size)
+{
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "slave_state_get:\t%" PRId64 " (%.*s)\n",
+ result, err_msg_size, err_msg);
+ // FIXME: GNUNET_assert (2 == result);
+ master_state_get_prefix ();
+}
+
+
+void
+slave_state_get ()
+{
+ test = TEST_SLAVE_STATE_GET;
+ GNUNET_PSYC_channel_state_get (slv_chn, "_foo_bar_baz", &state_get_var,
+ &slave_state_get_result, NULL);
+}
+
+
+/*** Master state_get() ***/
+
+
+void
+master_state_get_result (void *cls, int64_t result,
+ const void *err_msg, uint16_t err_msg_size)
+{
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "master_state_get:\t%" PRId64 " (%.*s)\n",
+ result, err_msg_size, err_msg);
+ // FIXME: GNUNET_assert (1 == result);
+ slave_state_get ();
+}
+
+
+void
+master_state_get ()
+{
+ test = TEST_MASTER_STATE_GET;
+ GNUNET_PSYC_channel_state_get (mst_chn, "_foo_bar_baz", &state_get_var,
+ &master_state_get_result, NULL);
+}
+
+
+/*** Slave history_replay() ***/
+
+void
+slave_history_replay_result (void *cls, int64_t result,
+ const void *err_msg, uint16_t err_msg_size)
+{
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "slave_history_replay:\t%" PRId64 " (%.*s)\n",
+ result, err_msg_size, err_msg);
+ GNUNET_assert (9 == result);
+
+ master_state_get ();
+}
+
+
+void
+slave_history_replay ()
+{
+ test = TEST_SLAVE_HISTORY_REPLAY;
+ GNUNET_PSYC_channel_history_replay (slv_chn, 1, 1, "",
+ GNUNET_PSYC_HISTORY_REPLAY_LOCAL,
+ &slave_message_cb,
+ &slave_message_part_cb,
+ &slave_history_replay_result, NULL);
+}
+
+
+/*** Master history_replay() ***/
+
+
+void
+master_history_replay_result (void *cls, int64_t result,
+ const void *err_msg, uint16_t err_msg_size)
+{
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "master_history_replay:\t%" PRId64 " (%.*s)\n",
+ result, err_msg_size, err_msg);
+ GNUNET_assert (9 == result);
+
+ slave_history_replay ();
+}
+
+
+void
+master_history_replay ()
+{
+ test = TEST_MASTER_HISTORY_REPLAY;
+ GNUNET_PSYC_channel_history_replay (mst_chn, 1, 1, "",
+ GNUNET_PSYC_HISTORY_REPLAY_LOCAL,
+ &master_message_cb,
+ &master_message_part_cb,
+ &master_history_replay_result, NULL);
+}
+
+
+/*** Slave history_replay_latest() ***/
+
+
+void
+slave_history_replay_latest_result (void *cls, int64_t result,
+ const void *err_msg, uint16_t err_msg_size)
+{
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "slave_history_replay_latest:\t%" PRId64 " (%.*s)\n",
+ result, err_msg_size, err_msg);
+ GNUNET_assert (9 == result);
+
+ master_history_replay ();
+}
+
+
+void
+slave_history_replay_latest ()
+{
+ test = TEST_SLAVE_HISTORY_REPLAY_LATEST;
+ GNUNET_PSYC_channel_history_replay_latest (slv_chn, 1, "",
+ GNUNET_PSYC_HISTORY_REPLAY_LOCAL,
+ &slave_message_cb,
+ &slave_message_part_cb,
+ &slave_history_replay_latest_result,
+ NULL);
+}
+
+
+/*** Master history_replay_latest() ***/
+
+
+void
+master_history_replay_latest_result (void *cls, int64_t result,
+ const void *err_msg, uint16_t err_msg_size)
+{
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "master_history_replay_latest:\t%" PRId64 " (%.*s)\n",
+ result, err_msg_size, err_msg);
+ GNUNET_assert (9 == result);
+
+ slave_history_replay_latest ();
+}
+
+
+void
+master_history_replay_latest ()
+{
+ test = TEST_MASTER_HISTORY_REPLAY_LATEST;
+ GNUNET_PSYC_channel_history_replay_latest (mst_chn, 1, "",
+ GNUNET_PSYC_HISTORY_REPLAY_LOCAL,
+ &master_message_cb,
+ &master_message_part_cb,
+ &master_history_replay_latest_result,
+ NULL);
+}
+
+
+void
+transmit_resume (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Transmission resumed.\n");
+ struct TransmitClosure *tmit = cls;
+ if (NULL != tmit->mst_tmit)
+ GNUNET_PSYC_master_transmit_resume (tmit->mst_tmit);
+ else
+ GNUNET_PSYC_slave_transmit_resume (tmit->slv_tmit);
+}
+
+
+int
+tmit_notify_data (void *cls, uint16_t *data_size, void *data)
+{
+ struct TransmitClosure *tmit = cls;
+ if (0 == tmit->data_count)
+ {
+ *data_size = 0;
+ return GNUNET_YES;
+ }
+
+ uint16_t size = strlen (tmit->data[tmit->n]);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Transmit notify data: %u bytes available, "
+ "processing fragment %u/%u (size %u).\n",
+ *data_size, tmit->n + 1, tmit->data_count, size);
+ if (*data_size < size)
+ {
+ *data_size = 0;
+ GNUNET_assert (0);
+ return GNUNET_SYSERR;
+ }
+
+ if (GNUNET_YES != tmit->paused && 0 < tmit->data_delay[tmit->n])
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Transmission paused.\n");
+ tmit->paused = GNUNET_YES;
+ GNUNET_SCHEDULER_add_delayed (
+ GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
+ tmit->data_delay[tmit->n]),
+ &transmit_resume, tmit);
+ *data_size = 0;
+ return GNUNET_NO;
+ }
+ tmit->paused = GNUNET_NO;
+
+ *data_size = size;
+ memcpy (data, tmit->data[tmit->n], size);
+
+ return ++tmit->n < tmit->data_count ? GNUNET_NO : GNUNET_YES;
+}
+
+
+int
+tmit_notify_mod (void *cls, uint16_t *data_size, void *data, uint8_t *oper,
+ uint32_t *full_value_size)