From 38963d1e81332032e0ac774f4f2c6b804c38802a Mon Sep 17 00:00:00 2001 From: Gabor X Toth <*@tg-x.net> Date: Fri, 28 Aug 2015 13:33:43 +0000 Subject: [PATCH] psyc/social: get state from psycstore --- src/include/gnunet_psyc_service.h | 28 +++- src/include/gnunet_psyc_util_lib.h | 17 +++ src/include/gnunet_psycstore_service.h | 2 +- src/psyc/gnunet-service-psyc.c | 156 ++++++++++++++--------- src/psyc/psyc_api.c | 22 ++-- src/psyc/test_psyc.c | 20 +-- src/psycstore/gnunet-service-psycstore.c | 78 +++++++----- src/psycstore/plugin_psycstore_sqlite.c | 38 ++++-- src/psycstore/psyc_util_lib.c | 40 +++++- src/psycstore/test_plugin_psycstore.c | 2 +- src/psycstore/test_psycstore.c | 2 +- src/social/gnunet-service-social.c | 133 +++++++++++++++++-- src/social/social_api.c | 68 +++++++--- src/social/test_social.c | 125 +++++++++++++++--- 14 files changed, 556 insertions(+), 175 deletions(-) diff --git a/src/include/gnunet_psyc_service.h b/src/include/gnunet_psyc_service.h index 92397ec2e..1346e77c7 100644 --- a/src/include/gnunet_psyc_service.h +++ b/src/include/gnunet_psyc_service.h @@ -220,7 +220,8 @@ struct GNUNET_PSYC_Message /** * Header of a PSYC message. * - * Only present when receiving a message. + * The PSYC service adds this when delivering the message to local clients, + * not present on the multicast layer. */ struct GNUNET_PSYC_MessageHeader { @@ -1193,17 +1194,30 @@ GNUNET_PSYC_channel_history_replay_cancel (struct GNUNET_PSYC_Channel *channel, /** * Function called to inform a member about stored state values for a channel. * - * @param cls Closure. - * @param name Name of the state variable. A NULL value indicates that there - * are no more state variables to be returned. - * @param value Value of the state variable. - * @param value_size Number of bytes in @a value. + * If @a full_value_size > value_size then this function is called multiple + * times until the whole value arrived. + * + * @param cls + * Closure. + * @param name + * Name of the state variable. + * NULL if there are no more state variables to be returned. + * @param value + * Value of the state variable. + * @param value_size + * Number of bytes in @a value. + * @param full_value_size + * Number of bytes in the full value, including continuations. + * Only set for the first part of a variable, + * in case of a continuation it is 0. */ typedef void (*GNUNET_PSYC_StateVarCallback) (void *cls, + const struct GNUNET_MessageHeader *mod, const char *name, const void *value, - size_t value_size); + uint32_t value_size, + uint32_t full_value_size); /** diff --git a/src/include/gnunet_psyc_util_lib.h b/src/include/gnunet_psyc_util_lib.h index c5dae975f..504476573 100644 --- a/src/include/gnunet_psyc_util_lib.h +++ b/src/include/gnunet_psyc_util_lib.h @@ -230,6 +230,23 @@ GNUNET_PSYC_receive_check_parts (uint16_t data_size, const char *data, uint16_t *first_ptype, uint16_t *last_ptype); +/** + * Initialize PSYC message header. + */ +void +GNUNET_PSYC_message_header_init (struct GNUNET_PSYC_MessageHeader *pmsg, + const struct GNUNET_MULTICAST_MessageHeader *mmsg, + uint32_t flags); + + +/** + * Create a new PSYC message header from a multicast message for sending it to clients. + */ +struct GNUNET_PSYC_MessageHeader * +GNUNET_PSYC_message_header_create (const struct GNUNET_MULTICAST_MessageHeader *mmsg, + uint32_t flags); + + #if 0 /* keep Emacsens' auto-indent happy */ { #endif diff --git a/src/include/gnunet_psycstore_service.h b/src/include/gnunet_psycstore_service.h index 8f3866bdb..17d53b34d 100644 --- a/src/include/gnunet_psycstore_service.h +++ b/src/include/gnunet_psycstore_service.h @@ -612,7 +612,7 @@ GNUNET_PSYCSTORE_state_hash_update (struct GNUNET_PSYCSTORE_Handle *h, */; typedef int (*GNUNET_PSYCSTORE_StateCallback) (void *cls, const char *name, - const void *value, size_t value_size); + const void *value, uint32_t value_size); /** diff --git a/src/psyc/gnunet-service-psyc.c b/src/psyc/gnunet-service-psyc.c index 4c34f6108..2afc98040 100644 --- a/src/psyc/gnunet-service-psyc.c +++ b/src/psyc/gnunet-service-psyc.c @@ -1,3 +1,4 @@ + /* * This file is part of GNUnet * Copyright (C) 2013 Christian Grothoff (and other contributing authors) @@ -170,6 +171,11 @@ struct FragmentQueue */ uint8_t state; + /** + * Whether the state is already modified in PSYCstore. + */ + uint8_t state_is_modified; + /** * Is the message queued for delivery to the client? * i.e. added to the recv_msgs queue @@ -460,9 +466,9 @@ op_add (struct Channel *chn, struct GNUNET_SERVER_Client *client, static void -op_remove (struct Channel *chn, struct Operation *op) +op_remove (struct Operation *op) { - GNUNET_CONTAINER_DLL_remove (chn->op_head, chn->op_tail, op); + GNUNET_CONTAINER_DLL_remove (op->chn->op_head, op->chn->op_tail, op); GNUNET_free (op); } @@ -1008,7 +1014,8 @@ client_send_mcast_msg (struct Channel *chn, chn, GNUNET_ntohll (mmsg->fragment_id), GNUNET_ntohll (mmsg->message_id)); - struct GNUNET_PSYC_MessageHeader *pmsg = psyc_msg_new (mmsg, flags); + struct GNUNET_PSYC_MessageHeader * + pmsg = GNUNET_PSYC_message_header_create (mmsg, flags); client_send_msg (chn, &pmsg->header); GNUNET_free (pmsg); } @@ -1049,7 +1056,7 @@ client_send_mcast_req (struct Master *mst, /** * Insert a multicast message fragment into the queue belonging to the message. * - * @param chn Channel. + * @param chn Channel. * @param mmsg Multicast message fragment. * @param msg_id_hash Message ID of @a mmsg in a struct GNUNET_HashCode. * @param first_ptype First PSYC message part type in @a mmsg. @@ -1222,7 +1229,7 @@ fragment_queue_run (struct Channel *chn, uint64_t msg_id, struct GNUNET_CONTAINER_MultiHashMap *chan_msgs = GNUNET_CONTAINER_multihashmap_get (recv_cache, &chn->pub_key_hash); - GNUNET_assert (NULL != chan_msgs); + GNUNET_assert (NULL != chan_msgs); // FIXME uint64_t frag_id; while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (fragq->fragments, NULL, @@ -1279,8 +1286,8 @@ fragment_queue_run (struct Channel *chn, uint64_t msg_id, struct StateModifyClosure { struct Channel *chn; - struct FragmentQueue *fragq; - uint64_t message_id; + uint64_t msg_id; + struct GNUNET_HashCode msg_id_hash; }; @@ -1290,21 +1297,37 @@ store_recv_state_modify_result (void *cls, int64_t result, { struct StateModifyClosure *mcls = cls; struct Channel *chn = mcls->chn; - struct FragmentQueue *fragq = mcls->fragq; - uint64_t msg_id = mcls->message_id; + uint64_t msg_id = mcls->msg_id; + + struct FragmentQueue * + fragq = GNUNET_CONTAINER_multihashmap_get (chn->recv_frags, &mcls->msg_id_hash); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p GNUNET_PSYCSTORE_state_modify() returned %" PRId64 " (%.*s)\n", chn, result, err_msg_size, err_msg); - if (GNUNET_OK == result) + switch (result) { - chn->max_state_message_id = msg_id; - chn->max_message_id = msg_id; + case GNUNET_OK: + case GNUNET_NO: + if (NULL != fragq) + fragq->state_is_modified = GNUNET_YES; + if (chn->max_state_message_id < msg_id) + chn->max_state_message_id = msg_id; + if (chn->max_message_id < msg_id) + chn->max_message_id = msg_id; - fragment_queue_run (chn, msg_id, fragq, MSG_FRAG_STATE_DROP == fragq->state); + if (NULL != fragq) + fragment_queue_run (chn, msg_id, fragq, MSG_FRAG_STATE_DROP == fragq->state); GNUNET_CONTAINER_heap_remove_root (chn->recv_msgs); message_queue_run (chn); + break; + + default: + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "%p GNUNET_PSYCSTORE_state_modify() failed with error %" PRId64 " (%.*s)\n", + chn, result, err_msg_size, err_msg); + /** @todo FIXME: handle state_modify error */ } } @@ -1349,42 +1372,58 @@ message_queue_run (struct Channel *chn) break; } - if (MSG_FRAG_STATE_HEADER == fragq->state) + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "%p Fragment queue entry: state: %u, state delta: " + "%" PRIu64 " - %" PRIu64 " ?= %" PRIu64 "\n", + chn, fragq->state, msg_id, fragq->state_delta, chn->max_state_message_id); + + if (MSG_FRAG_STATE_DATA <= fragq->state) { /* Check if there's a missing message before the current one */ if (GNUNET_PSYC_STATE_NOT_MODIFIED == fragq->state_delta) { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p state NOT modified\n"); + if (!(fragq->flags & GNUNET_PSYC_MESSAGE_ORDER_ANY) - && msg_id - 1 != chn->max_message_id) + && (chn->max_message_id != msg_id - 1 + && chn->max_message_id != msg_id)) { GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "%p Out of order message. " - "(%" PRIu64 " - 1 != %" PRIu64 ")\n", - chn, msg_id, chn->max_message_id); - continue; + "(%" PRIu64 " != %" PRIu64 " - 1)\n", + chn, chn->max_message_id, msg_id); + break; + // FIXME: keep track of messages processed in this queue run, + // and only stop after reaching the end } } else { - if (msg_id - fragq->state_delta != chn->max_state_message_id) + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p state modified\n"); + if (GNUNET_YES != fragq->state_is_modified) { - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, - "%p Out of order stateful message. " - "(%" PRIu64 " - %" PRIu64 " != %" PRIu64 ")\n", - chn, msg_id, fragq->state_delta, chn->max_state_message_id); - continue; + if (msg_id - fragq->state_delta != chn->max_state_message_id) + { + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "%p Out of order stateful message. " + "(%" PRIu64 " - %" PRIu64 " != %" PRIu64 ")\n", + chn, msg_id, fragq->state_delta, chn->max_state_message_id); + break; + // FIXME: keep track of messages processed in this queue run, + // and only stop after reaching the end + } + + struct StateModifyClosure *mcls = GNUNET_malloc (sizeof (*mcls)); + mcls->chn = chn; + mcls->msg_id = msg_id; + mcls->msg_id_hash = msg_id_hash; + + /* Apply modifiers to state in PSYCstore */ + GNUNET_PSYCSTORE_state_modify (store, &chn->pub_key, msg_id, + fragq->state_delta, + store_recv_state_modify_result, mcls); + break; // continue after asynchronous state modify result } - - struct StateModifyClosure *mcls = GNUNET_malloc (sizeof (*mcls)); - mcls->chn = chn; - mcls->fragq = fragq; - mcls->message_id = msg_id; - - /* Apply modifiers to state in PSYCstore */ - GNUNET_PSYCSTORE_state_modify (store, &chn->pub_key, msg_id, - fragq->state_delta, - store_recv_state_modify_result, mcls); - break; } chn->max_message_id = msg_id; } @@ -2060,7 +2099,7 @@ static void master_queue_message (struct Master *mst, struct TransmitMessage *tmit_msg, uint16_t first_ptype, uint16_t last_ptype) { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p master_queue_message()\n", mst); + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "%p master_queue_message()\n", mst); if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype) { @@ -2074,11 +2113,13 @@ master_queue_message (struct Master *mst, struct TransmitMessage *tmit_msg, } else if (pmeth->flags & GNUNET_PSYC_MASTER_TRANSMIT_STATE_MODIFY) { + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "%p master_queue_message: setting state_modify flag\n", mst); pmeth->state_delta = GNUNET_htonll (tmit_msg->id - mst->max_state_message_id); } else { + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "%p master_queue_message: setting state_not_modified flag\n", mst); pmeth->state_delta = GNUNET_htonll (GNUNET_PSYC_STATE_NOT_MODIFIED); } @@ -2226,14 +2267,6 @@ client_recv_psyc_message (void *cls, struct GNUNET_SERVER_Client *client, }; -struct MembershipStoreClosure -{ - struct GNUNET_SERVER_Client *client; - struct Channel *chn; - uint64_t op_id; -}; - - /** * Received result of GNUNET_PSYCSTORE_membership_store() */ @@ -2241,12 +2274,13 @@ static void store_recv_membership_store_result (void *cls, int64_t result, const char *err_msg, uint16_t err_msg_size) { - struct MembershipStoreClosure *mcls = cls; + struct Operation *op = cls; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p GNUNET_PSYCSTORE_membership_store() returned %" PRId64 " (%.s)\n", - mcls->chn, result, err_msg_size, err_msg); + op->chn, result, err_msg_size, err_msg); - client_send_result (mcls->client, mcls->op_id, result, err_msg, err_msg_size); + client_send_result (op->client, op->op_id, result, err_msg, err_msg_size); + op_remove (op); } @@ -2264,10 +2298,7 @@ client_recv_membership_store (void *cls, struct GNUNET_SERVER_Client *client, const struct ChannelMembershipStoreRequest * req = (const struct ChannelMembershipStoreRequest *) msg; - struct MembershipStoreClosure *mcls = GNUNET_malloc (sizeof (*mcls)); - mcls->client = client; - mcls->chn = chn; - mcls->op_id = req->op_id; + struct Operation *op = op_add (chn, client, req->op_id, 0); uint64_t announced_at = GNUNET_ntohll (req->announced_at); uint64_t effective_since = GNUNET_ntohll (req->effective_since); @@ -2280,7 +2311,7 @@ client_recv_membership_store (void *cls, struct GNUNET_SERVER_Client *client, GNUNET_PSYCSTORE_membership_store (store, &chn->pub_key, &req->slave_key, req->did_join, announced_at, effective_since, 0, /* FIXME: group_generation */ - &store_recv_membership_store_result, mcls); + &store_recv_membership_store_result, op); GNUNET_SERVER_receive_done (client, GNUNET_OK); } @@ -2313,7 +2344,7 @@ store_recv_fragment_history (void *cls, res->result_code = GNUNET_htonll (GNUNET_OK); pmsg = (struct GNUNET_PSYC_MessageHeader *) &res[1]; - psyc_msg_init (pmsg, mmsg, flags | GNUNET_PSYC_MESSAGE_HISTORIC); + GNUNET_PSYC_message_header_init (pmsg, mmsg, flags | GNUNET_PSYC_MESSAGE_HISTORIC); memcpy (&res[1], pmsg, psize); /** @todo FIXME: send only to requesting client */ @@ -2339,7 +2370,7 @@ store_recv_fragment_history_result (void *cls, int64_t result, GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p History replay #%" PRIu64 ": " "PSYCSTORE returned %" PRId64 " (%.*s)\n", - op->chn, op->op_id, result, err_msg_size, err_msg); + op->chn, GNUNET_ntohll (op->op_id), result, err_msg_size, err_msg); if (op->flags & GNUNET_PSYC_HISTORY_REPLAY_REMOTE) { @@ -2347,6 +2378,7 @@ store_recv_fragment_history_result (void *cls, int64_t result, } client_send_result (op->client, op->op_id, result, err_msg, err_msg_size); + op_remove (op); } @@ -2404,12 +2436,16 @@ client_recv_history_replay (void *cls, struct GNUNET_SERVER_Client *client, */ static int store_recv_state_var (void *cls, const char *name, - const void *value, size_t value_size) + const void *value, uint32_t value_size) { struct Operation *op = cls; struct GNUNET_OperationResultMessage *res; - if (NULL != name) + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "%p state_get #%" PRIu64 " - received var from PSYCstore: %s\n", + op->chn, GNUNET_ntohll (op->op_id), name); + + if (NULL != name) /* First part */ { uint16_t name_size = strnlen (name, GNUNET_PSYC_MODIFIER_MAX_PAYLOAD) + 1; struct GNUNET_PSYC_MessageModifier *mod; @@ -2427,7 +2463,7 @@ store_recv_state_var (void *cls, const char *name, memcpy (&mod[1], name, name_size); memcpy (((char *) &mod[1]) + name_size, value, value_size); } - else + else /* Continuation */ { struct GNUNET_MessageHeader *mod; res = GNUNET_malloc (sizeof (*res) + sizeof (*mod) + value_size); @@ -2445,7 +2481,6 @@ store_recv_state_var (void *cls, const char *name, GNUNET_SERVER_notification_context_add (nc, op->client); GNUNET_SERVER_notification_context_unicast (nc, op->client, &res->header, GNUNET_NO); - GNUNET_free (op); return GNUNET_YES; } @@ -2460,12 +2495,13 @@ store_recv_state_result (void *cls, int64_t result, { struct Operation *op = cls; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "%p History replay #%" PRIu64 ": " + "%p state_get #%" PRIu64 ": " "PSYCSTORE returned %" PRId64 " (%.*s)\n", - op->chn, op->op_id, result, err_msg_size, err_msg); + op->chn, GNUNET_ntohll (op->op_id), result, err_msg_size, err_msg); // FIXME: client might have been disconnected client_send_result (op->client, op->op_id, result, err_msg, err_msg_size); + op_remove (op); } diff --git a/src/psyc/psyc_api.c b/src/psyc/psyc_api.c index 5fc5391a0..b862eee9c 100644 --- a/src/psyc/psyc_api.c +++ b/src/psyc/psyc_api.c @@ -366,34 +366,36 @@ channel_recv_state_result (void *cls, } const struct GNUNET_MessageHeader * - modc = (struct GNUNET_MessageHeader *) &res[1]; - uint16_t modc_size = ntohs (modc->size); - if (ntohs (msg->size) - sizeof (*msg) != modc_size) + mod = (struct GNUNET_MessageHeader *) &res[1]; + uint16_t mod_size = ntohs (mod->size); + if (ntohs (msg->size) - sizeof (*res) != mod_size) { GNUNET_break (0); return; } - switch (ntohs (modc->type)) + switch (ntohs (mod->type)) { case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER: { const struct GNUNET_PSYC_MessageModifier * - mod = (const struct GNUNET_PSYC_MessageModifier *) modc; + pmod = (const struct GNUNET_PSYC_MessageModifier *) mod; - const char *name = (const char *) &mod[1]; - uint16_t name_size = ntohs (mod->name_size); + const char *name = (const char *) &pmod[1]; + uint16_t name_size = ntohs (pmod->name_size); if ('\0' != name[name_size - 1]) { GNUNET_break (0); return; } - sr->var_cb (sr->cls, name, name + name_size, ntohs (mod->value_size)); + sr->var_cb (sr->cls, mod, name, name + name_size, + ntohs (pmod->header.size) - sizeof (*pmod), + ntohs (pmod->value_size)); break; } case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT: - sr->var_cb (sr->cls, NULL, (const char *) &modc[1], - modc_size - sizeof (*modc)); + sr->var_cb (sr->cls, mod, NULL, (const char *) &mod[1], + mod_size - sizeof (*mod), 0); break; } } diff --git a/src/psyc/test_psyc.c b/src/psyc/test_psyc.c index d62474afb..4e7979a4d 100644 --- a/src/psyc/test_psyc.c +++ b/src/psyc/test_psyc.c @@ -329,7 +329,9 @@ slave_message_part_cb (void *cls, uint64_t message_id, void -state_get_var (void *cls, const char *name, const void *value, size_t value_size) +state_get_var (void *cls, const struct GNUNET_MessageHeader *mod, + const char *name, const void *value, + uint32_t value_size, uint32_t full_value_size) { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Got state var: %s\n%.*s\n", name, value_size, value); @@ -354,8 +356,8 @@ void slave_state_get_prefix () { test = TEST_SLAVE_STATE_GET_PREFIX; - GNUNET_PSYC_channel_state_get_prefix (slv_chn, "_foo", &state_get_var, - &slave_state_get_prefix_result, NULL); + GNUNET_PSYC_channel_state_get_prefix (slv_chn, "_foo", state_get_var, + slave_state_get_prefix_result, NULL); } @@ -377,8 +379,8 @@ void master_state_get_prefix () { test = TEST_MASTER_STATE_GET_PREFIX; - GNUNET_PSYC_channel_state_get_prefix (mst_chn, "_foo", &state_get_var, - &master_state_get_prefix_result, NULL); + GNUNET_PSYC_channel_state_get_prefix (mst_chn, "_foo", state_get_var, + master_state_get_prefix_result, NULL); } @@ -401,8 +403,8 @@ void slave_state_get () { test = TEST_SLAVE_STATE_GET; - GNUNET_PSYC_channel_state_get (slv_chn, "_foo_bar_baz", &state_get_var, - &slave_state_get_result, NULL); + GNUNET_PSYC_channel_state_get (slv_chn, "_foo_bar_baz", state_get_var, + slave_state_get_result, NULL); } @@ -425,8 +427,8 @@ void master_state_get () { test = TEST_MASTER_STATE_GET; - GNUNET_PSYC_channel_state_get (mst_chn, "_foo_bar_baz", &state_get_var, - &master_state_get_result, NULL); + GNUNET_PSYC_channel_state_get (mst_chn, "_foo_bar_baz", state_get_var, + master_state_get_result, NULL); } diff --git a/src/psycstore/gnunet-service-psycstore.c b/src/psycstore/gnunet-service-psycstore.c index 6e40e7849..1f9de54f8 100644 --- a/src/psycstore/gnunet-service-psycstore.c +++ b/src/psycstore/gnunet-service-psycstore.c @@ -217,7 +217,7 @@ send_fragment (void *cls, struct GNUNET_MULTICAST_MessageHeader *msg, static int send_state_var (void *cls, const char *name, - const void *value, size_t value_size) + const void *value, uint32_t value_size) { struct SendClosure *sc = cls; struct StateResult *res; @@ -496,14 +496,14 @@ handle_counters_get (void *cls, struct StateModifyClosure { - const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key; + const struct GNUNET_CRYPTO_EddsaPublicKey channel_key; struct GNUNET_PSYC_ReceiveHandle *recv; enum GNUNET_PSYC_MessageState msg_state; char mod_oper; char *mod_name; char *mod_value; - uint64_t mod_value_size; - uint64_t mod_value_remaining; + uint32_t mod_value_size; + uint32_t mod_value_remaining; }; @@ -513,6 +513,12 @@ recv_state_message_part (void *cls, uint64_t message_id, uint64_t data_offset, { struct StateModifyClosure *scls = cls; uint16_t psize; + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "recv_state_message_part() message_id: %" PRIu64 + ", data_offset: %" PRIu64 ", flags: %u\n", + message_id, data_offset, flags); + if (NULL == msg) { scls->msg_state = GNUNET_PSYC_MESSAGE_STATE_ERROR; @@ -533,7 +539,7 @@ recv_state_message_part (void *cls, uint64_t message_id, uint64_t data_offset, pmod = (struct GNUNET_PSYC_MessageModifier *) msg; psize = ntohs (pmod->header.size); uint16_t name_size = ntohs (pmod->name_size); - uint16_t value_size = ntohs (pmod->value_size); + uint32_t value_size = ntohl (pmod->value_size); const char *name = (const char *) &pmod[1]; const void *value = name + name_size; @@ -542,7 +548,7 @@ recv_state_message_part (void *cls, uint64_t message_id, uint64_t data_offset, { // Apply non-transient operation. if (psize == sizeof (*pmod) + name_size + value_size) { - db->state_modify_op (db->cls, scls->channel_key, + db->state_modify_op (db->cls, &scls->channel_key, pmod->oper, name, value, value_size); } else @@ -576,7 +582,7 @@ recv_state_message_part (void *cls, uint64_t message_id, uint64_t data_offset, scls->mod_value_remaining -= psize - sizeof (*msg); if (0 == scls->mod_value_remaining) { - db->state_modify_op (db->cls, scls->channel_key, + db->state_modify_op (db->cls, &scls->channel_key, scls->mod_oper, scls->mod_name, scls->mod_value, scls->mod_value_size); GNUNET_free (scls->mod_name); @@ -616,9 +622,13 @@ recv_state_fragment (void *cls, struct GNUNET_MULTICAST_MessageHeader *msg, scls); } - const struct GNUNET_PSYC_MessageHeader * - pmsg = (const struct GNUNET_PSYC_MessageHeader *) &msg[1]; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "recv_state_fragment: %" PRIu64 "\n", GNUNET_ntohll (msg->fragment_id)); + + struct GNUNET_PSYC_MessageHeader * + pmsg = GNUNET_PSYC_message_header_create (msg, flags); GNUNET_PSYC_receive_message (scls->recv, pmsg); + GNUNET_free (pmsg); return GNUNET_YES; } @@ -635,31 +645,41 @@ handle_state_modify (void *cls, uint64_t message_id = GNUNET_ntohll (req->message_id); uint64_t state_delta = GNUNET_ntohll (req->state_delta); uint64_t ret_frags = 0; + struct StateModifyClosure + scls = { .channel_key = req->channel_key }; - struct StateModifyClosure scls = { 0 }; - - if (GNUNET_OK != db->state_modify_begin (db->cls, &req->channel_key, - message_id, state_delta)) - { - GNUNET_log (GNUNET_ERROR_TYPE_ERROR, - _("Failed to begin modifying state!\n")); - GNUNET_break (0); - } - - int ret = db->message_get (db->cls, &req->channel_key, - message_id, message_id, - &ret_frags, &recv_state_fragment, &scls); + int ret = db->state_modify_begin (db->cls, &req->channel_key, + message_id, state_delta); - if (GNUNET_OK != db->state_modify_end (db->cls, &req->channel_key, message_id)) + if (GNUNET_OK != ret) { - GNUNET_log (GNUNET_ERROR_TYPE_ERROR, - _("Failed to end modifying state!\n")); - GNUNET_break (0); + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + _("Failed to begin modifying state: %d\n"), ret); } - - if (NULL != scls.recv) + else { - GNUNET_PSYC_receive_destroy (scls.recv); + ret = db->message_get (db->cls, &req->channel_key, + message_id, message_id, + &ret_frags, &recv_state_fragment, &scls); + if (GNUNET_OK != ret) + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + _("Failed to modify state: %d\n"), ret); + GNUNET_break (0); + } + else + { + if (GNUNET_OK != db->state_modify_end (db->cls, &req->channel_key, message_id)) + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + _("Failed to end modifying state!\n")); + GNUNET_break (0); + } + } + if (NULL != scls.recv) + { + GNUNET_PSYC_receive_destroy (scls.recv); + } } send_result_code (client, req->op_id, ret, NULL); diff --git a/src/psycstore/plugin_psycstore_sqlite.c b/src/psycstore/plugin_psycstore_sqlite.c index 1abc479d2..1bf14644b 100644 --- a/src/psycstore/plugin_psycstore_sqlite.c +++ b/src/psycstore/plugin_psycstore_sqlite.c @@ -64,7 +64,8 @@ enum Transactions { TRANSACTION_NONE = 0, - TRANSACTION_STATE_MODIFY + TRANSACTION_STATE_MODIFY, + TRANSACTION_STATE_SYNC, }; /** @@ -1522,18 +1523,27 @@ state_modify_begin (void *cls, uint64_t max_state_message_id = 0; int ret = counters_state_get (plugin, channel_key, &max_state_message_id); - if (GNUNET_OK != ret) + switch (ret) + { + case GNUNET_OK: + case GNUNET_NO: // no state yet + ret = GNUNET_OK; + break; + default: return ret; + } - if (message_id - state_delta != max_state_message_id) - return GNUNET_NO; + if (max_state_message_id < message_id - state_delta) + return GNUNET_NO; /* some stateful messages not yet applied */ + else if (message_id - state_delta < max_state_message_id) + return GNUNET_NO; /* changes already applied */ } - // Make sure no other transaction is going on. if (TRANSACTION_NONE != plugin->transaction) - if (GNUNET_OK != transaction_rollback (plugin)) - return GNUNET_SYSERR; - + { + /** @todo FIXME: wait for other transaction to finish */ + return GNUNET_SYSERR; + } return transaction_begin (plugin, TRANSACTION_STATE_MODIFY); } @@ -1560,8 +1570,8 @@ state_modify_op (void *cls, return state_assign (plugin, plugin->insert_state_current, channel_key, name, value, value_size); - /// @todo implement more state operations - default: + default: /** @todo implement more state operations */ + GNUNET_break (0); return GNUNET_SYSERR; } } @@ -1630,7 +1640,13 @@ state_sync_end (void *cls, struct Plugin *plugin = cls; int ret = GNUNET_SYSERR; - GNUNET_OK == transaction_begin (plugin, TRANSACTION_NONE) + if (TRANSACTION_NONE != plugin->transaction) + { + /** @todo FIXME: wait for other transaction to finish */ + return GNUNET_SYSERR; + } + + GNUNET_OK == transaction_begin (plugin, TRANSACTION_STATE_SYNC) && GNUNET_OK == exec_channel (plugin, plugin->delete_state, channel_key) && GNUNET_OK == exec_channel (plugin, plugin->insert_state_from_sync, channel_key) diff --git a/src/psycstore/psyc_util_lib.c b/src/psycstore/psyc_util_lib.c index 75f97aad0..f6dd4e593 100644 --- a/src/psycstore/psyc_util_lib.c +++ b/src/psycstore/psyc_util_lib.c @@ -570,7 +570,7 @@ transmit_notify_env (void *cls, uint16_t *data_size, void *data, uint8_t *oper, { struct GNUNET_PSYC_TransmitHandle *tmit = cls; uint16_t name_size = 0; - size_t value_size = 0; + uint32_t value_size = 0; const char *value = NULL; if (NULL != oper) @@ -1231,3 +1231,41 @@ GNUNET_PSYC_message_parse (const struct GNUNET_PSYC_Message *msg, ? GNUNET_OK : GNUNET_SYSERR; } + + +/** + * Initialize PSYC message header. + */ +void +GNUNET_PSYC_message_header_init (struct GNUNET_PSYC_MessageHeader *pmsg, + const struct GNUNET_MULTICAST_MessageHeader *mmsg, + uint32_t flags) +{ + uint16_t size = ntohs (mmsg->header.size); + uint16_t psize = sizeof (*pmsg) + size - sizeof (*mmsg); + + pmsg->header.size = htons (psize); + pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE); + pmsg->message_id = mmsg->message_id; + pmsg->fragment_offset = mmsg->fragment_offset; + pmsg->flags = htonl (flags); + + memcpy (&pmsg[1], &mmsg[1], size - sizeof (*mmsg)); +} + + +/** + * Create a new PSYC message header from a multicast message for sending it to clients. + */ +struct GNUNET_PSYC_MessageHeader * +GNUNET_PSYC_message_header_create (const struct GNUNET_MULTICAST_MessageHeader *mmsg, + uint32_t flags) +{ + struct GNUNET_PSYC_MessageHeader *pmsg; + uint16_t size = ntohs (mmsg->header.size); + uint16_t psize = sizeof (*pmsg) + size - sizeof (*mmsg); + + pmsg = GNUNET_malloc (psize); + GNUNET_PSYC_message_header_init (pmsg, mmsg, flags); + return pmsg; +} diff --git a/src/psycstore/test_plugin_psycstore.c b/src/psycstore/test_plugin_psycstore.c index 0a7824929..5a5f970b9 100644 --- a/src/psycstore/test_plugin_psycstore.c +++ b/src/psycstore/test_plugin_psycstore.c @@ -141,7 +141,7 @@ struct StateClosure { }; static int -state_cb (void *cls, const char *name, const void *value, size_t value_size) +state_cb (void *cls, const char *name, const void *value, uint32_t value_size) { struct StateClosure *scls = cls; const void *val = scls->value[scls->n]; diff --git a/src/psycstore/test_psycstore.c b/src/psycstore/test_psycstore.c index c869a862f..58e6243b7 100644 --- a/src/psycstore/test_psycstore.c +++ b/src/psycstore/test_psycstore.c @@ -170,7 +170,7 @@ state_reset_result (void *cls, int64_t result, static int -state_result (void *cls, const char *name, const void *value, size_t value_size) +state_result (void *cls, const char *name, const void *value, uint32_t value_size) { struct StateClosure *scls = cls; const char *nam = scls->name[scls->n]; diff --git a/src/social/gnunet-service-social.c b/src/social/gnunet-service-social.c index 4211772f1..56612b488 100644 --- a/src/social/gnunet-service-social.c +++ b/src/social/gnunet-service-social.c @@ -350,7 +350,7 @@ cleanup_guest (struct Guest *gst) struct GNUNET_CONTAINER_MultiHashMap * plc_gst = GNUNET_CONTAINER_multihashmap_get (place_guests, &plc->pub_key_hash); - GNUNET_assert (NULL != plc_gst); + GNUNET_assert (NULL != plc_gst); // FIXME GNUNET_CONTAINER_multihashmap_remove (plc_gst, &gst->pub_key_hash, gst); if (0 == GNUNET_CONTAINER_multihashmap_size (plc_gst)) @@ -1302,7 +1302,7 @@ psyc_transmit_queue_next_method (struct Place *plc, { GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "%p psyc_transmit_queue_next_method: unexpected message part of type %u.\n", - plc, ntohs (pmsg->type)); + plc, NULL != pmsg ? ntohs (pmsg->type) : 0); GNUNET_break (0); return GNUNET_SYSERR; } @@ -1536,12 +1536,10 @@ client_recv_psyc_message (void *cls, struct GNUNET_SERVER_Client *client, /** - * A historic message result arrived from PSYC. + * A historic message arrived from PSYC. */ static void -psyc_recv_history_message (void *cls, - uint64_t message_id, - uint32_t flags, +psyc_recv_history_message (void *cls, uint64_t message_id, uint32_t flags, const struct GNUNET_PSYC_MessageHeader *msg) { struct OperationClosure *opcls = cls; @@ -1567,6 +1565,9 @@ psyc_recv_history_message (void *cls, } +/** + * Result of message history replay from PSYC. + */ static void psyc_recv_history_result (void *cls, int64_t result, const void *err_msg, uint16_t err_msg_size) @@ -1574,7 +1575,7 @@ psyc_recv_history_result (void *cls, int64_t result, struct OperationClosure *opcls = cls; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p History replay #%" PRIu64 ": " - "PSYCSTORE returned %" PRId64 " (%.*s)\n", + "PSYCstore returned %" PRId64 " (%.*s)\n", opcls->plc, GNUNET_ntohll (opcls->op_id), result, err_msg_size, err_msg); // FIXME: place might have been destroyed @@ -1635,6 +1636,118 @@ client_recv_history_replay (void *cls, struct GNUNET_SERVER_Client *client, } +/** + * A state variable part arrived from PSYC. + */ +void +psyc_recv_state_var (void *cls, + const struct GNUNET_MessageHeader *mod, + const char *name, + const void *value, + uint32_t value_size, + uint32_t full_value_size) +{ + struct OperationClosure *opcls = cls; + struct Place *plc = opcls->plc; + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "%p Received state variable %s from PSYC\n", + plc, name); + + uint16_t size = ntohs (mod->size); + + struct GNUNET_OperationResultMessage * + res = GNUNET_malloc (sizeof (*res) + size); + res->header.size = htons (sizeof (*res) + size); + res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_STATE_RESULT); + res->op_id = opcls->op_id; + res->result_code = GNUNET_htonll (GNUNET_OK); + + memcpy (&res[1], mod, size); + + /** @todo FIXME: send only to requesting client */ + client_send_msg (plc, &res->header); +} + + +/** + * Result of retrieving state variable from PSYC. + */ +static void +psyc_recv_state_result (void *cls, int64_t result, + const void *err_msg, uint16_t err_msg_size) +{ + struct OperationClosure *opcls = cls; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "%p State get #%" PRIu64 ": " + "PSYCstore returned %" PRId64 " (%.*s)\n", + opcls->plc, GNUNET_ntohll (opcls->op_id), result, err_msg_size, err_msg); + + // FIXME: place might have been destroyed + client_send_result (opcls->client, opcls->op_id, result, err_msg, err_msg_size); +} + + +/** + * Client requests channel history. + */ +static void +client_recv_state_get (void *cls, struct GNUNET_SERVER_Client *client, + const struct GNUNET_MessageHeader *msg) +{ + struct Client * + ctx = GNUNET_SERVER_client_get_user_context (client, struct Client); + GNUNET_assert (NULL != ctx); + struct Place *plc = ctx->plc; + + const struct GNUNET_PSYC_StateRequestMessage * + req = (const struct GNUNET_PSYC_StateRequestMessage *) msg; + uint16_t size = ntohs (msg->size); + const char *name = (const char *) &req[1]; + + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "%p State get #%" PRIu64 ": %s\n", + plc, GNUNET_ntohll (req->op_id), name); + + if (size < sizeof (*req) + 1 + || '\0' != name[size - sizeof (*req) - 1]) + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "%p State get #%" PRIu64 ": " + "invalid name. size: %u < %u?\n", + plc, GNUNET_ntohll (req->op_id), size, sizeof (*req) + 1); + GNUNET_break (0); + GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); + return; + } + + struct OperationClosure *opcls = GNUNET_malloc (sizeof (*opcls)); + opcls->client = client; + opcls->plc = plc; + opcls->op_id = req->op_id; + + switch (ntohs (msg->type)) + { + case GNUNET_MESSAGE_TYPE_PSYC_STATE_GET: + GNUNET_PSYC_channel_state_get (plc->channel, name, + psyc_recv_state_var, + psyc_recv_state_result, opcls); + break; + + case GNUNET_MESSAGE_TYPE_PSYC_STATE_GET_PREFIX: + GNUNET_PSYC_channel_state_get_prefix (plc->channel, name, + psyc_recv_state_var, + psyc_recv_state_result, opcls); + break; + + default: + GNUNET_assert (0); + } + + GNUNET_SERVER_receive_done (client, GNUNET_OK); +} + + static const struct GNUNET_SERVER_MessageHandler handlers[] = { { &client_recv_host_enter, NULL, GNUNET_MESSAGE_TYPE_SOCIAL_HOST_ENTER, 0 }, @@ -1650,13 +1763,13 @@ static const struct GNUNET_SERVER_MessageHandler handlers[] = { { &client_recv_history_replay, NULL, GNUNET_MESSAGE_TYPE_PSYC_HISTORY_REPLAY, 0 }, -#if FIXME + { &client_recv_state_get, NULL, GNUNET_MESSAGE_TYPE_PSYC_STATE_GET, 0 }, - { &client_recv_state_get_prefix, NULL, + { &client_recv_state_get, NULL, GNUNET_MESSAGE_TYPE_PSYC_STATE_GET_PREFIX, 0 }, -#endif + { NULL, NULL, 0, 0 } }; diff --git a/src/social/social_api.c b/src/social/social_api.c index 74a13cc35..20d0dc8b8 100644 --- a/src/social/social_api.c +++ b/src/social/social_api.c @@ -309,6 +309,21 @@ struct GNUNET_SOCIAL_LookHandle */ GNUNET_ResultCallback result_cb; + /** + * Name of current modifier being received. + */ + char *mod_name; + + /** + * Size of current modifier value being received. + */ + size_t mod_value_size; + + /** + * Remaining size of current modifier value still to be received. + */ + size_t mod_value_remaining; + /** * Closure for @a result_cb. */ @@ -753,49 +768,66 @@ place_recv_state_result (void *cls, const struct GNUNET_OperationResultMessage * res = (const struct GNUNET_OperationResultMessage *) msg; -#if FIXME GNUNET_ResultCallback result_cb = NULL; - struct GNUNET_PSYC_StateRequest *sr = NULL; + struct GNUNET_SOCIAL_LookHandle *look = NULL; if (GNUNET_YES != GNUNET_CLIENT_MANAGER_op_find (plc->client, GNUNET_ntohll (res->op_id), - &result_cb, (void *) &sr)) + &result_cb, (void *) &look)) { /* Operation not found. */ return; } const struct GNUNET_MessageHeader * - modc = (struct GNUNET_MessageHeader *) &res[1]; - uint16_t modc_size = ntohs (modc->size); - if (ntohs (msg->size) - sizeof (*msg) != modc_size) + mod = (struct GNUNET_MessageHeader *) &res[1]; + uint16_t mod_size = ntohs (mod->size); + if (ntohs (msg->size) - sizeof (*res) != mod_size) { - GNUNET_break (0); + GNUNET_break_op (0); + LOG (GNUNET_ERROR_TYPE_WARNING, + "Invalid modifier size in state result: %u - %u != %u\n", + ntohs (msg->size), sizeof (*res), mod_size); return; } - switch (ntohs (modc->type)) + switch (ntohs (mod->type)) { case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER: { const struct GNUNET_PSYC_MessageModifier * - mod = (const struct GNUNET_PSYC_MessageModifier *) modc; + pmod = (const struct GNUNET_PSYC_MessageModifier *) mod; - const char *name = (const char *) &mod[1]; - uint16_t name_size = ntohs (mod->name_size); + const char *name = (const char *) &pmod[1]; + uint16_t name_size = ntohs (pmod->name_size); if ('\0' != name[name_size - 1]) { - GNUNET_break (0); + GNUNET_break_op (0); + LOG (GNUNET_ERROR_TYPE_WARNING, + "Invalid modifier name in state result\n"); return; } - sr->var_cb (sr->cls, name, name + name_size, ntohs (mod->value_size)); + look->mod_value_size = ntohs (pmod->value_size); + look->var_cb (look->cls, mod, name, name + name_size, + mod_size - sizeof (*mod) - name_size, + look->mod_value_size); + if (look->mod_value_size > mod_size - sizeof (*mod) - name_size) + { + look->mod_value_remaining = look->mod_value_size; + look->mod_name = GNUNET_malloc (name_size); + memcpy (look->mod_name, name, name_size); + } break; } case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT: - sr->var_cb (sr->cls, NULL, (const char *) &modc[1], - modc_size - sizeof (*modc)); + look->var_cb (look->cls, mod, look->mod_name, (const char *) &mod[1], + mod_size - sizeof (*mod), look->mod_value_size); + look->mod_value_remaining -= mod_size - sizeof (*mod); + if (0 == look->mod_value_remaining) + { + GNUNET_free (look->mod_name); + } break; } -#endif } @@ -1980,7 +2012,7 @@ place_state_get (struct GNUNET_SOCIAL_Place *plc, * what was requested). * * @param place - * The place to look the object at. + * The place where to look. * @param full_name * Full name of the object. * @param value_size @@ -2004,7 +2036,7 @@ GNUNET_SOCIAL_place_look_at (struct GNUNET_SOCIAL_Place *plc, * Look for objects in the place with a matching name prefix. * * @param place - * The place to look its objects at. + * The place where to look. * @param name_prefix * Look at objects with names beginning with this value. * @param var_cb diff --git a/src/social/test_social.c b/src/social/test_social.c index 19a81f43d..dbcf822f8 100644 --- a/src/social/test_social.c +++ b/src/social/test_social.c @@ -36,7 +36,7 @@ #include "gnunet_core_service.h" #include "gnunet_identity_service.h" -#define TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 30) +#define TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 300) #define DATA2ARG(data) data, sizeof (data) @@ -99,6 +99,10 @@ struct TransmitClosure uint8_t n; } tmit; +struct ResultClosure { + uint32_t n; +}; + uint8_t join_req_count; struct GNUNET_PSYC_Message *join_resp; @@ -107,19 +111,21 @@ uint32_t counter; enum { TEST_NONE = 0, - TEST_HOST_ANSWER_DOOR_REFUSE = 1, - TEST_GUEST_RECV_ENTRY_DCSN_REFUSE = 2, - TEST_HOST_ANSWER_DOOR_ADMIT = 3, - TEST_GUEST_RECV_ENTRY_DCSN_ADMIT = 4, - TEST_HOST_ANNOUNCE = 5, - TEST_HOST_ANNOUNCE_END = 6, - TEST_HOST_ANNOUNCE2 = 7, - TEST_HOST_ANNOUNCE2_END = 8, - TEST_GUEST_TALK = 9, - TEST_GUEST_HISTORY_REPLAY = 10, - TEST_GUEST_HISTORY_REPLAY_LATEST = 11, - TEST_GUEST_LEAVE = 12, - TEST_HOST_LEAVE = 13, + TEST_HOST_ANSWER_DOOR_REFUSE = 1, + TEST_GUEST_RECV_ENTRY_DCSN_REFUSE = 2, + TEST_HOST_ANSWER_DOOR_ADMIT = 3, + TEST_GUEST_RECV_ENTRY_DCSN_ADMIT = 4, + TEST_HOST_ANNOUNCE = 5, + TEST_HOST_ANNOUNCE_END = 6, + TEST_HOST_ANNOUNCE2 = 7, + TEST_HOST_ANNOUNCE2_END = 8, + TEST_GUEST_TALK = 9, + TEST_GUEST_HISTORY_REPLAY = 10, + TEST_GUEST_HISTORY_REPLAY_LATEST = 11, + TEST_GUEST_LOOK_AT = 12, + TEST_GUEST_LOOK_FOR = 13, + TEST_GUEST_LEAVE = 14, + TEST_HOST_LEAVE = 15, } test; @@ -350,6 +356,86 @@ schedule_guest_leave (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) } +static void +guest_look_for_result (void *cls, int64_t result_code, + const void *data, uint16_t data_size) +{ + struct ResultClosure *rcls = cls; + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "guest_look_for_result: %d\n", result_code); + GNUNET_assert (GNUNET_OK == result_code); + GNUNET_assert (3 == rcls->n); + GNUNET_free (rcls); + GNUNET_SCHEDULER_add_now (&schedule_guest_leave, NULL); +} + + +static void +guest_look_for_var (void *cls, + const struct GNUNET_MessageHeader *mod, + const char *name, + const void *value, + uint32_t value_size, + uint32_t full_value_size) +{ + struct ResultClosure *rcls = cls; + rcls->n++; + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "guest_look_for_var: %s\n%.*s\n", + name, value_size, value); +} + + +static void +guest_look_for () +{ + test = TEST_GUEST_LOOK_FOR; + struct ResultClosure *rcls = GNUNET_malloc (sizeof (*rcls)); + GNUNET_SOCIAL_place_look_for (gst_plc, "_foo", guest_look_for_var, guest_look_for_result, rcls); +} + + +static void +guest_look_at_result (void *cls, int64_t result_code, + const void *data, uint16_t data_size) +{ + struct ResultClosure *rcls = cls; + + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "guest_look_at_result: %d\n", result_code); + GNUNET_assert (GNUNET_OK == result_code); + GNUNET_assert (1 == rcls->n); + GNUNET_free (rcls); + guest_look_for (); +} + + +static void +guest_look_at_var (void *cls, + const struct GNUNET_MessageHeader *mod, + const char *name, + const void *value, + uint32_t value_size, + uint32_t full_value_size) +{ + struct ResultClosure *rcls = cls; + rcls->n++; + + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "guest_look_at_var: %s\n%.*s\n", + name, value_size, value); +} + + +static void +guest_look_at () +{ + test = TEST_GUEST_LOOK_AT; + struct ResultClosure *rcls = GNUNET_malloc (sizeof (*rcls)); + GNUNET_SOCIAL_place_look_at (gst_plc, "_foo_bar", guest_look_at_var, guest_look_at_result, rcls); +} + + static void guest_recv_history_replay_latest_result (void *cls, int64_t result, const void *data, uint16_t data_size) @@ -361,7 +447,7 @@ guest_recv_history_replay_latest_result (void *cls, int64_t result, GNUNET_assert (2 == counter); /* message count */ GNUNET_assert (7 == result); /* fragment count */ - GNUNET_SCHEDULER_add_now (&schedule_guest_leave, NULL); + guest_look_at (); } @@ -488,6 +574,7 @@ guest_recv_eom (void *cls, break; default: + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "invalid test: %d\n", test); GNUNET_assert (0); } } @@ -570,10 +657,11 @@ host_recv_eom (void *cls, break; case TEST_GUEST_TALK: - guest_history_replay (); + guest_history_replay (); break; default: + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "invalid test: %d\n", test); GNUNET_assert (0); } } @@ -624,7 +712,8 @@ host_announce () tmit.host_ann = GNUNET_SOCIAL_host_announce (hst, "_message_host", tmit.env, ¬ify_data, &tmit, - GNUNET_SOCIAL_ANNOUNCE_NONE); + GNUNET_SOCIAL_ANNOUNCE_NONE + | GNUNET_PSYC_MASTER_TRANSMIT_STATE_MODIFY); } @@ -689,6 +778,7 @@ guest_recv_entry_decision (void *cls, break; default: + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "invalid test: %d\n", test); GNUNET_assert (0); } } @@ -728,6 +818,7 @@ host_answer_door (void *cls, break; default: + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "invalid test: %d\n", test); GNUNET_assert (0); } } -- 2.25.1