+ tmit->paused = GNUNET_NO;
+
+ *data_size = size;
+ GNUNET_memcpy (data, tmit->data[tmit->n], size);
+
+ return ++tmit->n < tmit->data_count ? GNUNET_NO : GNUNET_YES;
+}
+
+
+static int
+tmit_notify_mod (void *cls, uint16_t *data_size, void *data, uint8_t *oper,
+ uint32_t *full_value_size)
+{
+ struct TransmitClosure *tmit = cls;
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Test #%d: Transmit notify modifier: %u bytes available, "
+ "%u modifiers left to process.\n",
+ test, *data_size, (unsigned int) GNUNET_PSYC_env_get_count (tmit->env));
+
+ uint16_t name_size = 0;
+ size_t value_size = 0;
+ const char *value = NULL;
+
+ if (NULL != oper && NULL != tmit->mod)
+ { /* New modifier */
+ tmit->mod = tmit->mod->next;
+ if (NULL == tmit->mod)
+ { /* No more modifiers, continue with data */
+ *data_size = 0;
+ return GNUNET_YES;
+ }
+
+ GNUNET_assert (tmit->mod->value_size < UINT32_MAX);
+ *full_value_size = tmit->mod->value_size;
+ *oper = tmit->mod->oper;
+ name_size = strlen (tmit->mod->name);
+
+ if (name_size + 1 + tmit->mod->value_size <= *data_size)
+ {
+ *data_size = name_size + 1 + tmit->mod->value_size;
+ }
+ else
+ {
+ tmit->mod_value_size = tmit->mod->value_size;
+ value_size = *data_size - name_size - 1;
+ tmit->mod_value_size -= value_size;
+ tmit->mod_value = tmit->mod->value + value_size;
+ }
+
+ GNUNET_memcpy (data, tmit->mod->name, name_size);
+ ((char *)data)[name_size] = '\0';
+ GNUNET_memcpy ((char *)data + name_size + 1, tmit->mod->value, value_size);
+ }
+ else if (NULL != tmit->mod_value && 0 < tmit->mod_value_size)
+ { /* Modifier continuation */
+ value = tmit->mod_value;
+ if (tmit->mod_value_size <= *data_size)
+ {
+ value_size = tmit->mod_value_size;
+ tmit->mod_value = NULL;
+ }
+ else
+ {
+ value_size = *data_size;
+ tmit->mod_value += value_size;
+ }
+ tmit->mod_value_size -= value_size;
+
+ if (*data_size < value_size)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "value larger than buffer: %u < %zu\n",
+ *data_size, value_size);
+ *data_size = 0;
+ return GNUNET_NO;
+ }
+
+ *data_size = value_size;
+ GNUNET_memcpy (data, value, value_size);
+ }
+
+ return GNUNET_NO;
+}
+
+
+static void
+slave_join ();
+
+
+static void
+slave_transmit ()
+{
+ test = TEST_SLAVE_TRANSMIT;
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Test #%d: Slave sending request to master.\n", test);
+
+ tmit = GNUNET_new (struct TransmitClosure);
+ tmit->env = GNUNET_PSYC_env_create ();
+ GNUNET_PSYC_env_add (tmit->env, GNUNET_PSYC_OP_ASSIGN,
+ "_abc", "abc def", 7);
+ GNUNET_PSYC_env_add (tmit->env, GNUNET_PSYC_OP_ASSIGN,
+ "_abc_def", "abc def ghi", 11);
+ tmit->mod = GNUNET_PSYC_env_head (tmit->env);
+ tmit->n = 0;
+ tmit->data[0] = "slave test";
+ tmit->data_count = 1;
+ tmit->slv_tmit
+ = GNUNET_PSYC_slave_transmit (slv, "_request_test", &tmit_notify_mod,
+ &tmit_notify_data, tmit,
+ GNUNET_PSYC_SLAVE_TRANSMIT_NONE);
+}
+
+
+static void
+slave_remove_cb (void *cls, int64_t result,
+ const void *err_msg, uint16_t err_msg_size)
+{
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Test #%d: slave_remove:\t%" PRId64 " (%.*s)\n",
+ test, result, err_msg_size, (char *) err_msg);
+
+ slave_transmit ();
+}
+
+
+static void
+slave_remove ()
+{
+ test = TEST_SLAVE_REMOVE;
+ struct GNUNET_PSYC_Channel *chn = GNUNET_PSYC_master_get_channel (mst);
+ GNUNET_PSYC_channel_slave_remove (chn, &slave_pub_key, 2,
+ &slave_remove_cb, chn);
+}
+
+
+static void
+slave_add_cb (void *cls, int64_t result,
+ const void *err_msg, uint16_t err_msg_size)
+{
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Test #%d: slave_add:\t%" PRId64 " (%.*s)\n",
+ test, result, err_msg_size, (char *) err_msg);
+ slave_remove ();
+}
+
+
+static void
+slave_add ()
+{
+ test = TEST_SLAVE_ADD;
+ struct GNUNET_PSYC_Channel *chn = GNUNET_PSYC_master_get_channel (mst);
+ GNUNET_PSYC_channel_slave_add (chn, &slave_pub_key, 2, 2, &slave_add_cb, chn);
+}
+
+
+static void
+first_slave_parted (void *cls)
+{
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "First slave parted.\n");
+ slave_join (TEST_SLAVE_JOIN_ACCEPT);
+}
+
+
+static void
+schedule_slave_part (void *cls)
+{
+ GNUNET_PSYC_slave_part (slv, GNUNET_NO, &first_slave_parted, NULL);
+}
+
+
+static void
+join_decision_cb (void *cls,
+ const struct GNUNET_PSYC_JoinDecisionMessage *dcsn,
+ int is_admitted,
+ const struct GNUNET_PSYC_Message *join_msg)
+{
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Test #%d: Slave got join decision: %d\n", test, is_admitted);
+
+ switch (test)
+ {
+ case TEST_SLAVE_JOIN_REJECT:
+ GNUNET_assert (0 == is_admitted);
+ GNUNET_assert (1 == join_req_count);
+ GNUNET_SCHEDULER_add_now (&schedule_slave_part, NULL);
+ break;
+
+ case TEST_SLAVE_JOIN_ACCEPT:
+ GNUNET_assert (1 == is_admitted);
+ GNUNET_assert (2 == join_req_count);
+ slave_add ();
+ break;
+
+ default:
+ GNUNET_break (0);
+ }
+}
+
+
+static void
+join_request_cb (void *cls,
+ const struct GNUNET_PSYC_JoinRequestMessage *req,
+ const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
+ const struct GNUNET_PSYC_Message *join_msg,
+ struct GNUNET_PSYC_JoinHandle *jh)
+{
+ struct GNUNET_HashCode slave_key_hash;
+ GNUNET_CRYPTO_hash (slave_key, sizeof (*slave_key), &slave_key_hash);
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Test #%d: Got join request #%u from %s.\n",
+ test, join_req_count, GNUNET_h2s (&slave_key_hash));
+
+ /* Reject first request */
+ int is_admitted = (0 < join_req_count++) ? GNUNET_YES : GNUNET_NO;
+ GNUNET_PSYC_join_decision (jh, is_admitted, 0, NULL, NULL);
+}
+
+
+static void
+slave_connect_cb (void *cls, int result, uint64_t max_message_id)
+{
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Test #%d: Slave connected: %d, max_message_id: %" PRIu64 "\n",
+ test, result, max_message_id);
+ GNUNET_assert (TEST_SLAVE_JOIN_REJECT == test || TEST_SLAVE_JOIN_ACCEPT == test);
+ GNUNET_assert (GNUNET_OK == result || GNUNET_NO == result);