X-Git-Url: https://git.librecmc.org/?a=blobdiff_plain;f=src%2Fpsyc%2Fgnunet-service-psyc.c;h=2b436aa214008e1ac7dbb0a18f5b262c75c0efef;hb=29e6158507a0758192075ac6ece7ba8e75ddc49a;hp=e7020bc69a72d86a2a98ad29c176268a036de2ab;hpb=b63820a52b63d264bead047d0d6f4b76a94c4030;p=oweals%2Fgnunet.git diff --git a/src/psyc/gnunet-service-psyc.c b/src/psyc/gnunet-service-psyc.c index e7020bc69..2b436aa21 100644 --- a/src/psyc/gnunet-service-psyc.c +++ b/src/psyc/gnunet-service-psyc.c @@ -1,6 +1,6 @@ /* * This file is part of GNUnet - * (C) 2013 Christian Grothoff (and other contributing authors) + * Copyright (C) 2013 GNUnet e.V. * * GNUnet is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published @@ -14,8 +14,8 @@ * * You should have received a copy of the GNU General Public License * along with GNUnet; see the file COPYING. If not, write to the - * Free Software Foundation, Inc., 59 Temple Place - Suite 330, - * Boston, MA 02111-1307, USA. + * Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, + * Boston, MA 02110-1301, USA. */ /** @@ -98,15 +98,14 @@ struct TransmitMessage uint16_t size; /** - * @see enum MessageState + * Type of first message part. */ - uint8_t state; + uint16_t first_ptype; /** - * Whether a message ACK has already been sent to the client. - * #GNUNET_YES or #GNUNET_NO + * Type of last message part. */ - uint8_t ack_sent; + uint16_t last_ptype; /* Followed by message */ }; @@ -170,6 +169,11 @@ struct FragmentQueue */ uint8_t state; + /** + * Whether the state is already modified in PSYCstore. + */ + uint8_t state_is_modified; + /** * Is the message queued for delivery to the client? * i.e. added to the recv_msgs queue @@ -181,21 +185,37 @@ struct FragmentQueue /** * List of connected clients. */ -struct ClientListItem +struct Client { - struct ClientListItem *prev; - struct ClientListItem *next; + struct Client *prev; + struct Client *next; + struct GNUNET_SERVER_Client *client; }; +struct Operation +{ + struct Operation *prev; + struct Operation *next; + + struct GNUNET_SERVER_Client *client; + struct Channel *chn; + uint64_t op_id; + uint32_t flags; +}; + + /** * Common part of the client context for both a channel master and slave. */ struct Channel { - struct ClientListItem *clients_head; - struct ClientListItem *clients_tail; + struct Client *clients_head; + struct Client *clients_tail; + + struct Operation *op_head; + struct Operation *op_tail; struct TransmitMessage *tmit_head; struct TransmitMessage *tmit_tail; @@ -249,16 +269,6 @@ struct Channel */ uint32_t tmit_mod_value_size; - /** - * @see enum MessageState - */ - uint8_t tmit_state; - - /** - * FIXME: needed? - */ - uint8_t in_transmit; - /** * Is this a channel master (#GNUNET_YES), or slave (#GNUNET_NO)? */ @@ -305,7 +315,7 @@ struct Master /** * Incoming join requests from multicast. - * member_key -> struct GNUNET_MULTICAST_JoinHandle * + * member_pub_key -> struct GNUNET_MULTICAST_JoinHandle * */ struct GNUNET_CONTAINER_MultiHashMap *join_reqs; @@ -399,25 +409,40 @@ struct Slave * Maximum request ID for this channel. */ uint64_t max_request_id; + + /** + * Join flags. + */ + enum GNUNET_PSYC_SlaveJoinFlags join_flags; }; static void transmit_message (struct Channel *chn); +static uint64_t +message_queue_run (struct Channel *chn); static uint64_t message_queue_drop (struct Channel *chn); +static void +schedule_transmit_message (void *cls) +{ + struct Channel *chn = cls; + + transmit_message (chn); +} + + /** * Task run during shutdown. * * @param cls unused - * @param tc unused */ static void -shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) +shutdown_task (void *cls) { if (NULL != nc) { @@ -432,6 +457,28 @@ shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) } +static struct Operation * +op_add (struct Channel *chn, struct GNUNET_SERVER_Client *client, + uint64_t op_id, uint32_t flags) +{ + struct Operation *op = GNUNET_malloc (sizeof (*op)); + op->client = client; + op->chn = chn; + op->op_id = op_id; + op->flags = flags; + GNUNET_CONTAINER_DLL_insert (chn->op_head, chn->op_tail, op); + return op; +} + + +static void +op_remove (struct Operation *op) +{ + GNUNET_CONTAINER_DLL_remove (op->chn->op_head, op->chn->op_tail, op); + GNUNET_free (op); +} + + /** * Clean up master data structures after a client disconnected. */ @@ -443,7 +490,7 @@ cleanup_master (struct Master *mst) if (NULL != mst->origin) GNUNET_MULTICAST_origin_stop (mst->origin, NULL, NULL); // FIXME GNUNET_CONTAINER_multihashmap_destroy (mst->join_reqs); - GNUNET_CONTAINER_multihashmap_remove (masters, &chn->pub_key_hash, chn); + GNUNET_CONTAINER_multihashmap_remove (masters, &chn->pub_key_hash, mst); } @@ -483,7 +530,7 @@ cleanup_slave (struct Slave *slv) GNUNET_MULTICAST_member_part (slv->member, NULL, NULL); // FIXME slv->member = NULL; } - GNUNET_CONTAINER_multihashmap_remove (slaves, &chn->pub_key_hash, chn); + GNUNET_CONTAINER_multihashmap_remove (slaves, &chn->pub_key_hash, slv); } @@ -493,8 +540,12 @@ cleanup_slave (struct Slave *slv) static void cleanup_channel (struct Channel *chn) { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "%p Cleaning up channel %s. master? %u\n", + chn, GNUNET_h2s (&chn->pub_key_hash), chn->is_master); message_queue_drop (chn); - GNUNET_CONTAINER_multihashmap_remove_all (recv_cache, &chn->pub_key_hash); + GNUNET_CONTAINER_multihashmap_destroy (chn->recv_frags); + chn->recv_frags = NULL; if (NULL != chn->store_op) { @@ -538,7 +589,7 @@ client_disconnect (void *cls, struct GNUNET_SERVER_Client *client) chn, (GNUNET_YES == chn->is_master) ? "master" : "slave", GNUNET_h2s (&chn->pub_key_hash)); - struct ClientListItem *cli = chn->clients_head; + struct Client *cli = chn->clients_head; while (NULL != cli) { if (cli->client == client) @@ -550,8 +601,24 @@ client_disconnect (void *cls, struct GNUNET_SERVER_Client *client) cli = cli->next; } + struct Operation *op = chn->op_head; + while (NULL != op) + { + if (op->client == client) + { + op->client = NULL; + break; + } + op = op->next; + } + if (NULL == chn->clients_head) { /* Last client disconnected. */ + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "%p Last client (%s) disconnected from channel %s\n", + chn, (GNUNET_YES == chn->is_master) ? "master" : "slave", + GNUNET_h2s (&chn->pub_key_hash)); + chn->is_disconnected = GNUNET_YES; if (NULL != chn->tmit_head) { /* Send pending messages to multicast before cleanup. */ transmit_message (chn); @@ -571,10 +638,10 @@ static void client_send_msg (const struct Channel *chn, const struct GNUNET_MessageHeader *msg) { - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p Sending message to clients.\n", chn); - struct ClientListItem *cli = chn->clients_head; + struct Client *cli = chn->clients_head; while (NULL != cli) { GNUNET_SERVER_notification_context_add (nc, cli->client); @@ -584,12 +651,52 @@ client_send_msg (const struct Channel *chn, } +/** + * Send a result code back to the client. + * + * @param client + * Client that should receive the result code. + * @param result_code + * Code to transmit. + * @param op_id + * Operation ID in network byte order. + * @param data + * Data payload or NULL. + * @param data_size + * Size of @a data. + */ +static void +client_send_result (struct GNUNET_SERVER_Client *client, uint64_t op_id, + int64_t result_code, const void *data, uint16_t data_size) +{ + struct GNUNET_OperationResultMessage *res; + + res = GNUNET_malloc (sizeof (*res) + data_size); + res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_RESULT_CODE); + res->header.size = htons (sizeof (*res) + data_size); + res->result_code = GNUNET_htonll (result_code); + res->op_id = op_id; + if (0 < data_size) + memcpy (&res[1], data, data_size); + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "%p Sending result to client for operation #%" PRIu64 ": " + "%" PRId64 " (size: %u)\n", + client, GNUNET_ntohll (op_id), result_code, data_size); + + GNUNET_SERVER_notification_context_add (nc, client); + GNUNET_SERVER_notification_context_unicast (nc, client, &res->header, + GNUNET_NO); + GNUNET_free (res); +} + + /** * Closure for join_mem_test_cb() */ struct JoinMemTestClosure { - struct GNUNET_CRYPTO_EcdsaPublicKey slave_key; + struct GNUNET_CRYPTO_EcdsaPublicKey slave_pub_key; struct Channel *chn; struct GNUNET_MULTICAST_JoinHandle *jh; struct GNUNET_PSYC_JoinRequestMessage *join_msg; @@ -600,22 +707,29 @@ struct JoinMemTestClosure * Membership test result callback used for join requests. */ static void -join_mem_test_cb (void *cls, int64_t result, const char *err_msg) +join_mem_test_cb (void *cls, int64_t result, + const char *err_msg, uint16_t err_msg_size) { struct JoinMemTestClosure *jcls = cls; if (GNUNET_NO == result && GNUNET_YES == jcls->chn->is_master) { /* Pass on join request to client if this is a master channel */ struct Master *mst = (struct Master *) jcls->chn; - struct GNUNET_HashCode slave_key_hash; - GNUNET_CRYPTO_hash (&jcls->slave_key, sizeof (jcls->slave_key), - &slave_key_hash); - GNUNET_CONTAINER_multihashmap_put (mst->join_reqs, &slave_key_hash, jcls->jh, + struct GNUNET_HashCode slave_pub_hash; + GNUNET_CRYPTO_hash (&jcls->slave_pub_key, sizeof (jcls->slave_pub_key), + &slave_pub_hash); + GNUNET_CONTAINER_multihashmap_put (mst->join_reqs, &slave_pub_hash, jcls->jh, GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); client_send_msg (jcls->chn, &jcls->join_msg->header); } else { + if (GNUNET_SYSERR == result) + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Could not perform membership test (%.*s)\n", + err_msg_size, err_msg); + } // FIXME: add relays GNUNET_MULTICAST_join_decision (jcls->jh, result, 0, NULL, NULL); } @@ -629,7 +743,7 @@ join_mem_test_cb (void *cls, int64_t result, const char *err_msg) */ static void mcast_recv_join_request (void *cls, - const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key, + const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_pub_key, const struct GNUNET_MessageHeader *join_msg, struct GNUNET_MULTICAST_JoinHandle *jh) { @@ -655,17 +769,17 @@ mcast_recv_join_request (void *cls, req = GNUNET_malloc (sizeof (*req) + join_msg_size); req->header.size = htons (sizeof (*req) + join_msg_size); req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_JOIN_REQUEST); - req->slave_key = *slave_key; + req->slave_pub_key = *slave_pub_key; if (0 < join_msg_size) memcpy (&req[1], join_msg, join_msg_size); struct JoinMemTestClosure *jcls = GNUNET_malloc (sizeof (*jcls)); - jcls->slave_key = *slave_key; + jcls->slave_pub_key = *slave_pub_key; jcls->chn = chn; jcls->jh = jh; jcls->join_msg = req; - GNUNET_PSYCSTORE_membership_test (store, &chn->pub_key, slave_key, + GNUNET_PSYCSTORE_membership_test (store, &chn->pub_key, slave_pub_key, chn->max_message_id, 0, &join_mem_test_cb, jcls); } @@ -685,6 +799,11 @@ mcast_recv_join_decision (void *cls, int is_admitted, struct Channel *chn = &slv->chn; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p Got join decision: %d\n", slv, is_admitted); + if (GNUNET_YES == chn->is_ready) + { + /* Already admitted */ + return; + } uint16_t join_resp_size = (NULL != join_resp) ? ntohs (join_resp->size) : 0; struct GNUNET_PSYC_JoinDecisionMessage * @@ -697,48 +816,11 @@ mcast_recv_join_decision (void *cls, int is_admitted, client_send_msg (chn, &dcsn->header); - if (GNUNET_YES == is_admitted) + if (GNUNET_YES == is_admitted + && ! (GNUNET_PSYC_SLAVE_JOIN_LOCAL & slv->join_flags)) { chn->is_ready = GNUNET_YES; } - else - { - slv->member = NULL; - } -} - - -/** - * Received result of GNUNET_PSYCSTORE_membership_test() - */ -static void -store_recv_membership_test_result (void *cls, int64_t result, const char *err_msg) -{ - struct GNUNET_MULTICAST_MembershipTestHandle *mth = cls; - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "%p GNUNET_PSYCSTORE_membership_test() returned %" PRId64 " (%s)\n", - mth, result, err_msg); - - GNUNET_MULTICAST_membership_test_result (mth, result); -} - - -/** - * Incoming membership test request from multicast. - */ -static void -mcast_recv_membership_test (void *cls, - const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key, - uint64_t message_id, uint64_t group_generation, - struct GNUNET_MULTICAST_MembershipTestHandle *mth) -{ - struct Channel *chn = cls; - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "%p Received membership test request from multicast.\n", - mth); - GNUNET_PSYCSTORE_membership_test (store, &chn->pub_key, slave_key, - message_id, group_generation, - &store_recv_membership_test_result, mth); } @@ -758,12 +840,13 @@ store_recv_fragment_replay (void *cls, * Received result of GNUNET_PSYCSTORE_fragment_get() for multicast replay. */ static void -store_recv_fragment_replay_result (void *cls, int64_t result, const char *err_msg) +store_recv_fragment_replay_result (void *cls, int64_t result, + const char *err_msg, uint16_t err_msg_size) { struct GNUNET_MULTICAST_ReplayHandle *rh = cls; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "%p Fragment replay: PSYCSTORE returned %" PRId64 " (%s)\n", - rh, result, err_msg); + "%p Fragment replay: PSYCSTORE returned %" PRId64 " (%.*s)\n", + rh, result, err_msg_size, err_msg); switch (result) { @@ -778,6 +861,7 @@ store_recv_fragment_replay_result (void *cls, int64_t result, const char *err_ms case GNUNET_PSYCSTORE_MEMBERSHIP_TEST_FAILED: GNUNET_MULTICAST_replay_response (rh, NULL, GNUNET_MULTICAST_REC_ACCESS_DENIED); + break; case GNUNET_SYSERR: GNUNET_MULTICAST_replay_response (rh, NULL, @@ -793,13 +877,14 @@ store_recv_fragment_replay_result (void *cls, int64_t result, const char *err_ms */ static void mcast_recv_replay_fragment (void *cls, - const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key, + const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_pub_key, uint64_t fragment_id, uint64_t flags, struct GNUNET_MULTICAST_ReplayHandle *rh) { struct Channel *chn = cls; - GNUNET_PSYCSTORE_fragment_get (store, &chn->pub_key, slave_key, fragment_id, + GNUNET_PSYCSTORE_fragment_get (store, &chn->pub_key, slave_pub_key, + fragment_id, fragment_id, &store_recv_fragment_replay, &store_recv_fragment_replay_result, rh); } @@ -810,14 +895,15 @@ mcast_recv_replay_fragment (void *cls, */ static void mcast_recv_replay_message (void *cls, - const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key, + const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_pub_key, uint64_t message_id, uint64_t fragment_offset, uint64_t flags, struct GNUNET_MULTICAST_ReplayHandle *rh) { struct Channel *chn = cls; - GNUNET_PSYCSTORE_message_get (store, &chn->pub_key, slave_key, message_id, + GNUNET_PSYCSTORE_message_get (store, &chn->pub_key, slave_pub_key, + message_id, message_id, 1, NULL, &store_recv_fragment_replay, &store_recv_fragment_replay_result, rh); } @@ -861,29 +947,57 @@ hash_key_from_hll (struct GNUNET_HashCode *key, uint64_t n) /** - * Send multicast message to all clients connected to the channel. + * Initialize PSYC message header. */ -static void -client_send_mcast_msg (struct Channel *chn, - const struct GNUNET_MULTICAST_MessageHeader *mmsg) +static inline void +psyc_msg_init (struct GNUNET_PSYC_MessageHeader *pmsg, + const struct GNUNET_MULTICAST_MessageHeader *mmsg, uint32_t flags) +{ + uint16_t size = ntohs (mmsg->header.size); + uint16_t psize = sizeof (*pmsg) + size - sizeof (*mmsg); + + pmsg->header.size = htons (psize); + pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE); + pmsg->message_id = mmsg->message_id; + pmsg->fragment_offset = mmsg->fragment_offset; + pmsg->flags = htonl (flags); + + memcpy (&pmsg[1], &mmsg[1], size - sizeof (*mmsg)); +} + + +/** + * Create a new PSYC message from a multicast message for sending it to clients. + */ +static inline struct GNUNET_PSYC_MessageHeader * +psyc_msg_new (const struct GNUNET_MULTICAST_MessageHeader *mmsg, uint32_t flags) { struct GNUNET_PSYC_MessageHeader *pmsg; uint16_t size = ntohs (mmsg->header.size); uint16_t psize = sizeof (*pmsg) + size - sizeof (*mmsg); + pmsg = GNUNET_malloc (psize); + psyc_msg_init (pmsg, mmsg, flags); + return pmsg; +} + + +/** + * Send multicast message to all clients connected to the channel. + */ +static void +client_send_mcast_msg (struct Channel *chn, + const struct GNUNET_MULTICAST_MessageHeader *mmsg, + uint32_t flags) +{ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p Sending multicast message to client. " "fragment_id: %" PRIu64 ", message_id: %" PRIu64 "\n", chn, GNUNET_ntohll (mmsg->fragment_id), GNUNET_ntohll (mmsg->message_id)); - pmsg = GNUNET_malloc (psize); - pmsg->header.size = htons (psize); - pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE); - pmsg->message_id = mmsg->message_id; - pmsg->fragment_offset = mmsg->fragment_offset; - - memcpy (&pmsg[1], &mmsg[1], size - sizeof (*mmsg)); + struct GNUNET_PSYC_MessageHeader * + pmsg = GNUNET_PSYC_message_header_create (mmsg, flags); client_send_msg (chn, &pmsg->header); GNUNET_free (pmsg); } @@ -914,9 +1028,13 @@ client_send_mcast_req (struct Master *mst, pmsg->message_id = req->request_id; pmsg->fragment_offset = req->fragment_offset; pmsg->flags = htonl (GNUNET_PSYC_MESSAGE_REQUEST); - + pmsg->slave_pub_key = req->member_pub_key; memcpy (&pmsg[1], &req[1], size - sizeof (*req)); + client_send_msg (chn, &pmsg->header); + + /* FIXME: save req to PSYCstore so that it can be resent later to clients */ + GNUNET_free (pmsg); } @@ -924,7 +1042,7 @@ client_send_mcast_req (struct Master *mst, /** * Insert a multicast message fragment into the queue belonging to the message. * - * @param chn Channel. + * @param chn Channel. * @param mmsg Multicast message fragment. * @param msg_id_hash Message ID of @a mmsg in a struct GNUNET_HashCode. * @param first_ptype First PSYC message part type in @a mmsg. @@ -1028,8 +1146,8 @@ fragment_queue_insert (struct Channel *chn, GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "%p Header of message %" PRIu64 " is NOT complete yet: " "%" PRIu64 " != %" PRIu64 "\n", - chn, GNUNET_ntohll (mmsg->message_id), frag_offset, - fragq->header_size); + chn, GNUNET_ntohll (mmsg->message_id), + frag_offset, fragq->header_size); } } @@ -1042,8 +1160,8 @@ fragment_queue_insert (struct Channel *chn, GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "%p Message %" PRIu64 " is NOT complete yet: " "%" PRIu64 " != %" PRIu64 "\n", - chn, GNUNET_ntohll (mmsg->message_id), frag_offset, - fragq->size); + chn, GNUNET_ntohll (mmsg->message_id), + frag_offset, fragq->size); break; case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL: @@ -1079,11 +1197,15 @@ fragment_queue_insert (struct Channel *chn, * Send fragments of a message in order to client, after all modifiers arrived * from multicast. * - * @param chn Channel. - * @param msg_id ID of the message @a fragq belongs to. - * @param fragq Fragment queue of the message. - * @param drop Drop message without delivering to client? - * #GNUNET_YES or #GNUNET_NO. + * @param chn + * Channel. + * @param msg_id + * ID of the message @a fragq belongs to. + * @param fragq + * Fragment queue of the message. + * @param drop + * Drop message without delivering to client? + * #GNUNET_YES or #GNUNET_NO. */ static void fragment_queue_run (struct Channel *chn, uint64_t msg_id, @@ -1111,7 +1233,7 @@ fragment_queue_run (struct Channel *chn, uint64_t msg_id, { if (GNUNET_NO == drop) { - client_send_mcast_msg (chn, cache_entry->mmsg); + client_send_mcast_msg (chn, cache_entry->mmsg, 0); } if (cache_entry->ref_count <= 1) { @@ -1151,6 +1273,55 @@ fragment_queue_run (struct Channel *chn, uint64_t msg_id, } +struct StateModifyClosure +{ + struct Channel *chn; + uint64_t msg_id; + struct GNUNET_HashCode msg_id_hash; +}; + + +void +store_recv_state_modify_result (void *cls, int64_t result, + const char *err_msg, uint16_t err_msg_size) +{ + struct StateModifyClosure *mcls = cls; + struct Channel *chn = mcls->chn; + uint64_t msg_id = mcls->msg_id; + + struct FragmentQueue * + fragq = GNUNET_CONTAINER_multihashmap_get (chn->recv_frags, &mcls->msg_id_hash); + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "%p GNUNET_PSYCSTORE_state_modify() returned %" PRId64 " (%.*s)\n", + chn, result, err_msg_size, err_msg); + + switch (result) + { + case GNUNET_OK: + case GNUNET_NO: + if (NULL != fragq) + fragq->state_is_modified = GNUNET_YES; + if (chn->max_state_message_id < msg_id) + chn->max_state_message_id = msg_id; + if (chn->max_message_id < msg_id) + chn->max_message_id = msg_id; + + if (NULL != fragq) + fragment_queue_run (chn, msg_id, fragq, MSG_FRAG_STATE_DROP == fragq->state); + GNUNET_CONTAINER_heap_remove_root (chn->recv_msgs); + message_queue_run (chn); + break; + + default: + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "%p GNUNET_PSYCSTORE_state_modify() failed with error %" PRId64 " (%.*s)\n", + chn, result, err_msg_size, err_msg); + /** @todo FIXME: handle state_modify error */ + } +} + + /** * Run message queue. * @@ -1171,6 +1342,7 @@ message_queue_run (struct Channel *chn) "%p Running message queue.\n", chn); uint64_t n = 0; uint64_t msg_id; + while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (chn->recv_msgs, NULL, &msg_id)) { @@ -1190,37 +1362,58 @@ message_queue_run (struct Channel *chn) break; } - if (MSG_FRAG_STATE_HEADER == fragq->state) + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "%p Fragment queue entry: state: %u, state delta: " + "%" PRIu64 " - %" PRIu64 " ?= %" PRIu64 "\n", + chn, fragq->state, msg_id, fragq->state_delta, chn->max_state_message_id); + + if (MSG_FRAG_STATE_DATA <= fragq->state) { /* Check if there's a missing message before the current one */ if (GNUNET_PSYC_STATE_NOT_MODIFIED == fragq->state_delta) { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p state NOT modified\n", chn); + if (!(fragq->flags & GNUNET_PSYC_MESSAGE_ORDER_ANY) - && msg_id - 1 != chn->max_message_id) + && (chn->max_message_id != msg_id - 1 + && chn->max_message_id != msg_id)) { GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "%p Out of order message. " - "(%" PRIu64 " - 1 != %" PRIu64 ")\n", - chn, msg_id, chn->max_message_id); + "(%" PRIu64 " != %" PRIu64 " - 1)\n", + chn, chn->max_message_id, msg_id); break; + // FIXME: keep track of messages processed in this queue run, + // and only stop after reaching the end } } else { - if (msg_id - fragq->state_delta != chn->max_state_message_id) + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p state modified\n", chn); + if (GNUNET_YES != fragq->state_is_modified) { - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, - "%p Out of order stateful message. " - "(%" PRIu64 " - %" PRIu64 " != %" PRIu64 ")\n", - chn, msg_id, fragq->state_delta, chn->max_state_message_id); - break; + if (msg_id - fragq->state_delta != chn->max_state_message_id) + { + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "%p Out of order stateful message. " + "(%" PRIu64 " - %" PRIu64 " != %" PRIu64 ")\n", + chn, msg_id, fragq->state_delta, chn->max_state_message_id); + break; + // FIXME: keep track of messages processed in this queue run, + // and only stop after reaching the end + } + + struct StateModifyClosure *mcls = GNUNET_malloc (sizeof (*mcls)); + mcls->chn = chn; + mcls->msg_id = msg_id; + mcls->msg_id_hash = msg_id_hash; + + /* Apply modifiers to state in PSYCstore */ + GNUNET_PSYCSTORE_state_modify (store, &chn->pub_key, msg_id, + fragq->state_delta, + store_recv_state_modify_result, mcls); + break; // continue after asynchronous state modify result } -#if TODO - /* FIXME: apply modifiers to state in PSYCstore */ - GNUNET_PSYCSTORE_state_modify (store, &chn->pub_key, message_id, - store_recv_state_modify_result, cls); -#endif - chn->max_state_message_id = msg_id; } chn->max_message_id = msg_id; } @@ -1228,6 +1421,7 @@ message_queue_run (struct Channel *chn) GNUNET_CONTAINER_heap_remove_root (chn->recv_msgs); n++; } + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p Removed %" PRIu64 " messages from queue.\n", chn, n); return n; @@ -1260,7 +1454,7 @@ message_queue_drop (struct Channel *chn) struct FragmentQueue * fragq = GNUNET_CONTAINER_multihashmap_get (chn->recv_frags, &msg_id_hash); - + GNUNET_assert (NULL != fragq); fragment_queue_run (chn, msg_id, fragq, GNUNET_YES); GNUNET_CONTAINER_heap_remove_root (chn->recv_msgs); n++; @@ -1275,12 +1469,13 @@ message_queue_drop (struct Channel *chn) * Received result of GNUNET_PSYCSTORE_fragment_store(). */ static void -store_recv_fragment_store_result (void *cls, int64_t result, const char *err_msg) +store_recv_fragment_store_result (void *cls, int64_t result, + const char *err_msg, uint16_t err_msg_size) { struct Channel *chn = cls; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "%p GNUNET_PSYCSTORE_fragment_store() returned %" PRId64 " (%s)\n", - chn, result, err_msg); + "%p GNUNET_PSYCSTORE_fragment_store() returned %" PRId64 " (%.*s)\n", + chn, result, err_msg_size, err_msg); } @@ -1296,17 +1491,26 @@ mcast_recv_message (void *cls, const struct GNUNET_MULTICAST_MessageHeader *mmsg uint16_t size = ntohs (mmsg->header.size); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "%p Received multicast message of size %u.\n", - chn, size); + "%p Received multicast message of size %u. " + "fragment_id=%" PRIu64 ", message_id=%" PRIu64 + ", fragment_offset=%" PRIu64 ", flags=%" PRIu64 "\n", + chn, size, + GNUNET_ntohll (mmsg->fragment_id), + GNUNET_ntohll (mmsg->message_id), + GNUNET_ntohll (mmsg->fragment_offset), + GNUNET_ntohll (mmsg->flags)); GNUNET_PSYCSTORE_fragment_store (store, &chn->pub_key, mmsg, 0, &store_recv_fragment_store_result, chn); uint16_t first_ptype = 0, last_ptype = 0; - if (GNUNET_SYSERR - == GNUNET_PSYC_receive_check_parts (size - sizeof (*mmsg), - (const char *) &mmsg[1], - &first_ptype, &last_ptype)) + int check = GNUNET_PSYC_receive_check_parts (size - sizeof (*mmsg), + (const char *) &mmsg[1], + &first_ptype, &last_ptype); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "%p Message check result %d, first part type %u, last part type %u\n", + chn, check, first_ptype, last_ptype); + if (GNUNET_SYSERR == check) { GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "%p Dropping incoming multicast message with invalid parts.\n", @@ -1315,10 +1519,6 @@ mcast_recv_message (void *cls, const struct GNUNET_MULTICAST_MessageHeader *mmsg return; } - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Message parts: first: type %u, last: type %u\n", - first_ptype, last_ptype); - fragment_queue_insert (chn, mmsg, first_ptype, last_ptype); message_queue_run (chn); } @@ -1337,9 +1537,11 @@ mcast_recv_request (void *cls, struct Master *mst = cls; uint16_t size = ntohs (req->header.size); + char *str = GNUNET_CRYPTO_ecdsa_public_key_to_string (&req->member_pub_key); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "%p Received multicast request of size %u.\n", - mst, size); + "%p Received multicast request of size %u from %s.\n", + mst, size, str); + GNUNET_free (str); uint16_t first_ptype = 0, last_ptype = 0; if (GNUNET_SYSERR @@ -1375,7 +1577,7 @@ store_recv_master_counters (void *cls, int result, uint64_t max_fragment_id, struct Channel *chn = &mst->chn; chn->store_op = NULL; - struct CountersResult res; + struct GNUNET_PSYC_CountersResultMessage res; res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK); res.header.size = htons (sizeof (res)); res.result_code = htonl (result); @@ -1389,12 +1591,11 @@ store_recv_master_counters (void *cls, int result, uint64_t max_fragment_id, mst->max_group_generation = max_group_generation; mst->origin = GNUNET_MULTICAST_origin_start (cfg, &mst->priv_key, max_fragment_id, - &mcast_recv_join_request, - &mcast_recv_membership_test, - &mcast_recv_replay_fragment, - &mcast_recv_replay_message, - &mcast_recv_request, - &mcast_recv_message, chn); + mcast_recv_join_request, + mcast_recv_replay_fragment, + mcast_recv_replay_message, + mcast_recv_request, + mcast_recv_message, chn); chn->is_ready = GNUNET_YES; } else @@ -1421,7 +1622,7 @@ store_recv_slave_counters (void *cls, int result, uint64_t max_fragment_id, struct Channel *chn = &slv->chn; chn->store_op = NULL; - struct CountersResult res; + struct GNUNET_PSYC_CountersResultMessage res; res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK); res.header.size = htons (sizeof (res)); res.result_code = htonl (result); @@ -1436,12 +1637,11 @@ store_recv_slave_counters (void *cls, int result, uint64_t max_fragment_id, &slv->origin, slv->relay_count, slv->relays, &slv->join_msg->header, - &mcast_recv_join_request, - &mcast_recv_join_decision, - &mcast_recv_membership_test, - &mcast_recv_replay_fragment, - &mcast_recv_replay_message, - &mcast_recv_message, chn); + mcast_recv_join_request, + mcast_recv_join_decision, + mcast_recv_replay_fragment, + mcast_recv_replay_message, + mcast_recv_message, chn); if (NULL != slv->join_msg) { GNUNET_free (slv->join_msg); @@ -1511,7 +1711,7 @@ client_recv_master_start (void *cls, struct GNUNET_SERVER_Client *client, { chn = &mst->chn; - struct CountersResult res; + struct GNUNET_PSYC_CountersResultMessage res; res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK); res.header.size = htons (sizeof (res)); res.result_code = htonl (GNUNET_OK); @@ -1526,7 +1726,7 @@ client_recv_master_start (void *cls, struct GNUNET_SERVER_Client *client, "%p Client connected as master to channel %s.\n", mst, GNUNET_h2s (&chn->pub_key_hash)); - struct ClientListItem *cli = GNUNET_new (struct ClientListItem); + struct Client *cli = GNUNET_new (struct Client); cli->client = client; GNUNET_CONTAINER_DLL_insert (chn->clients_head, chn->clients_tail, cli); @@ -1547,11 +1747,11 @@ client_recv_slave_join (void *cls, struct GNUNET_SERVER_Client *client, uint16_t req_size = ntohs (req->header.size); struct GNUNET_CRYPTO_EcdsaPublicKey slv_pub_key; - struct GNUNET_HashCode pub_key_hash, slv_pub_key_hash; + struct GNUNET_HashCode pub_key_hash, slv_pub_hash; GNUNET_CRYPTO_ecdsa_key_get_public (&req->slave_key, &slv_pub_key); - GNUNET_CRYPTO_hash (&slv_pub_key, sizeof (slv_pub_key), &slv_pub_key_hash); - GNUNET_CRYPTO_hash (&req->channel_key, sizeof (req->channel_key), &pub_key_hash); + GNUNET_CRYPTO_hash (&slv_pub_key, sizeof (slv_pub_key), &slv_pub_hash); + GNUNET_CRYPTO_hash (&req->channel_pub_key, sizeof (req->channel_pub_key), &pub_key_hash); struct GNUNET_CONTAINER_MultiHashMap * chn_slv = GNUNET_CONTAINER_multihashmap_get (channel_slaves, &pub_key_hash); @@ -1560,16 +1760,17 @@ client_recv_slave_join (void *cls, struct GNUNET_SERVER_Client *client, if (NULL != chn_slv) { - slv = GNUNET_CONTAINER_multihashmap_get (chn_slv, &slv_pub_key_hash); + slv = GNUNET_CONTAINER_multihashmap_get (chn_slv, &slv_pub_hash); } if (NULL == slv) { slv = GNUNET_new (struct Slave); slv->priv_key = req->slave_key; slv->pub_key = slv_pub_key; - slv->pub_key_hash = slv_pub_key_hash; + slv->pub_key_hash = slv_pub_hash; slv->origin = req->origin; slv->relay_count = ntohl (req->relay_count); + slv->join_flags = ntohl (req->flags); const struct GNUNET_PeerIdentity * relays = (const struct GNUNET_PeerIdentity *) &req[1]; @@ -1579,9 +1780,11 @@ client_recv_slave_join (void *cls, struct GNUNET_SERVER_Client *client, if (sizeof (*req) + relay_size + sizeof (struct GNUNET_MessageHeader) <= req_size) { - join_msg_size = ntohs (slv->join_msg->header.size); + struct GNUNET_PSYC_Message * + join_msg = (struct GNUNET_PSYC_Message *) (((char *) &req[1]) + relay_size); + join_msg_size = ntohs (join_msg->header.size); slv->join_msg = GNUNET_malloc (join_msg_size); - memcpy (slv->join_msg, ((char *) &req[1]) + relay_size, join_msg_size); + memcpy (slv->join_msg, join_msg, join_msg_size); } if (sizeof (*req) + relay_size + join_msg_size != req_size) { @@ -1590,6 +1793,7 @@ client_recv_slave_join (void *cls, struct GNUNET_SERVER_Client *client, sizeof (*req), relay_size, join_msg_size, req_size); GNUNET_break (0); GNUNET_SERVER_client_disconnect (client); + GNUNET_free (slv); return; } if (0 < slv->relay_count) @@ -1600,7 +1804,7 @@ client_recv_slave_join (void *cls, struct GNUNET_SERVER_Client *client, chn = &slv->chn; chn->is_master = GNUNET_NO; - chn->pub_key = req->channel_key; + chn->pub_key = req->channel_pub_key; chn->pub_key_hash = pub_key_hash; channel_init (chn); @@ -1621,7 +1825,7 @@ client_recv_slave_join (void *cls, struct GNUNET_SERVER_Client *client, { chn = &slv->chn; - struct CountersResult res; + struct GNUNET_PSYC_CountersResultMessage res; res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK); res.header.size = htons (sizeof (res)); res.result_code = htonl (GNUNET_OK); @@ -1631,7 +1835,12 @@ client_recv_slave_join (void *cls, struct GNUNET_SERVER_Client *client, GNUNET_SERVER_notification_context_unicast (nc, client, &res.header, GNUNET_NO); - if (NULL == slv->member) + if (GNUNET_PSYC_SLAVE_JOIN_LOCAL & slv->join_flags) + { + mcast_recv_join_decision (slv, GNUNET_YES, + NULL, 0, NULL, NULL); + } + else if (NULL == slv->member) { slv->member = GNUNET_MULTICAST_member_join (cfg, &chn->pub_key, &slv->priv_key, @@ -1640,7 +1849,6 @@ client_recv_slave_join (void *cls, struct GNUNET_SERVER_Client *client, &slv->join_msg->header, &mcast_recv_join_request, &mcast_recv_join_decision, - &mcast_recv_membership_test, &mcast_recv_replay_fragment, &mcast_recv_replay_message, &mcast_recv_message, chn); @@ -1663,7 +1871,7 @@ client_recv_slave_join (void *cls, struct GNUNET_SERVER_Client *client, "%p Client connected as slave to channel %s.\n", slv, GNUNET_h2s (&chn->pub_key_hash)); - struct ClientListItem *cli = GNUNET_new (struct ClientListItem); + struct Client *cli = GNUNET_new (struct Client); cli->client = client; GNUNET_CONTAINER_DLL_insert (chn->clients_head, chn->clients_tail, cli); @@ -1701,34 +1909,41 @@ static void client_recv_join_decision (void *cls, struct GNUNET_SERVER_Client *client, const struct GNUNET_MessageHeader *msg) { - struct Channel * - chn = GNUNET_SERVER_client_get_user_context (client, struct Channel); - GNUNET_assert (GNUNET_YES == chn->is_master); - struct Master *mst = (struct Master *) chn; - - struct GNUNET_PSYC_JoinDecisionMessage * - dcsn = (struct GNUNET_PSYC_JoinDecisionMessage *) msg; + const struct GNUNET_PSYC_JoinDecisionMessage *dcsn + = (const struct GNUNET_PSYC_JoinDecisionMessage *) msg; + struct Channel *chn; + struct Master *mst; struct JoinDecisionClosure jcls; + + chn = GNUNET_SERVER_client_get_user_context (client, struct Channel); + if (NULL == chn) + { + GNUNET_break (0); + GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); + return; + } + GNUNET_assert (GNUNET_YES == chn->is_master); + mst = (struct Master *) chn; jcls.is_admitted = ntohl (dcsn->is_admitted); jcls.msg = (sizeof (*dcsn) + sizeof (*jcls.msg) <= ntohs (msg->size)) ? (struct GNUNET_MessageHeader *) &dcsn[1] : NULL; - struct GNUNET_HashCode slave_key_hash; - GNUNET_CRYPTO_hash (&dcsn->slave_key, sizeof (dcsn->slave_key), - &slave_key_hash); + struct GNUNET_HashCode slave_pub_hash; + GNUNET_CRYPTO_hash (&dcsn->slave_pub_key, sizeof (dcsn->slave_pub_key), + &slave_pub_hash); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p Got join decision (%d) from client for channel %s..\n", mst, jcls.is_admitted, GNUNET_h2s (&chn->pub_key_hash)); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p ..and slave %s.\n", - mst, GNUNET_h2s (&slave_key_hash)); + mst, GNUNET_h2s (&slave_pub_hash)); - GNUNET_CONTAINER_multihashmap_get_multiple (mst->join_reqs, &slave_key_hash, + GNUNET_CONTAINER_multihashmap_get_multiple (mst->join_reqs, &slave_pub_hash, &mcast_send_join_decision, &jcls); - GNUNET_CONTAINER_multihashmap_remove_all (mst->join_reqs, &slave_key_hash); + GNUNET_CONTAINER_multihashmap_remove_all (mst->join_reqs, &slave_pub_hash); GNUNET_SERVER_receive_done (client, GNUNET_OK); } @@ -1766,6 +1981,8 @@ transmit_notify (void *cls, size_t *data_size, void *data) { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p transmit_notify: nothing to send.\n", chn); + if (NULL != tmit_msg && *data_size < tmit_msg->size) + GNUNET_break (0); *data_size = 0; return GNUNET_NO; } @@ -1776,9 +1993,13 @@ transmit_notify (void *cls, size_t *data_size, void *data) *data_size = tmit_msg->size; memcpy (data, &tmit_msg[1], *data_size); - int ret = (MSG_STATE_END < chn->tmit_state) ? GNUNET_NO : GNUNET_YES; + int ret + = (tmit_msg->last_ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END) + ? GNUNET_NO + : GNUNET_YES; - if (NULL != tmit_msg->client && GNUNET_NO == tmit_msg->ack_sent) + /* FIXME: handle disconnecting clients */ + if (NULL != tmit_msg->client) send_message_ack (chn, tmit_msg->client); GNUNET_CONTAINER_DLL_remove (chn->tmit_head, chn->tmit_tail, tmit_msg); @@ -1786,12 +2007,13 @@ transmit_notify (void *cls, size_t *data_size, void *data) if (NULL != chn->tmit_head) { - transmit_message (chn); + GNUNET_SCHEDULER_add_now (schedule_transmit_message, chn); } - else if (GNUNET_YES == chn->is_disconnected) + else if (GNUNET_YES == chn->is_disconnected + && tmit_msg->last_ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END) { /* FIXME: handle partial message (when still in_transmit) */ - cleanup_channel (chn); + return GNUNET_SYSERR; } return ret; } @@ -1837,10 +2059,14 @@ slave_transmit_notify (void *cls, size_t *data_size, void *data) static void master_transmit_message (struct Master *mst) { + struct Channel *chn = &mst->chn; + struct TransmitMessage *tmit_msg = chn->tmit_head; + if (NULL == tmit_msg) + return; if (NULL == mst->tmit_handle) { mst->tmit_handle - = GNUNET_MULTICAST_origin_to_all (mst->origin, mst->max_message_id, + = GNUNET_MULTICAST_origin_to_all (mst->origin, tmit_msg->id, mst->max_group_generation, master_transmit_notify, mst); } @@ -1857,10 +2083,12 @@ master_transmit_message (struct Master *mst) static void slave_transmit_message (struct Slave *slv) { + if (NULL == slv->chn.tmit_head) + return; if (NULL == slv->tmit_handle) { slv->tmit_handle - = GNUNET_MULTICAST_member_to_origin (slv->member, slv->max_request_id, + = GNUNET_MULTICAST_member_to_origin (slv->member, slv->chn.tmit_head->id, slave_transmit_notify, slv); } else @@ -1883,14 +2111,16 @@ transmit_message (struct Channel *chn) * Queue a message from a channel master for sending to the multicast group. */ static void -master_queue_message (struct Master *mst, struct TransmitMessage *tmit_msg, - uint16_t first_ptype, uint16_t last_ptype) +master_queue_message (struct Master *mst, struct TransmitMessage *tmit_msg) { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p master_queue_message()\n", mst); + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "%p master_queue_message()\n", mst); - if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype) + if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == tmit_msg->first_ptype) { tmit_msg->id = ++mst->max_message_id; + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "%p master_queue_message: message_id=%" PRIu64 "\n", + mst, tmit_msg->id); struct GNUNET_PSYC_MessageMethod *pmeth = (struct GNUNET_PSYC_MessageMethod *) &tmit_msg[1]; @@ -1900,13 +2130,24 @@ master_queue_message (struct Master *mst, struct TransmitMessage *tmit_msg, } else if (pmeth->flags & GNUNET_PSYC_MASTER_TRANSMIT_STATE_MODIFY) { + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "%p master_queue_message: state_delta=%" PRIu64 "\n", + mst, tmit_msg->id - mst->max_state_message_id); pmeth->state_delta = GNUNET_htonll (tmit_msg->id - mst->max_state_message_id); + mst->max_state_message_id = tmit_msg->id; } else { + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "%p master_queue_message: state not modified\n", mst); pmeth->state_delta = GNUNET_htonll (GNUNET_PSYC_STATE_NOT_MODIFIED); } + + if (pmeth->flags & GNUNET_PSYC_MASTER_TRANSMIT_STATE_HASH) + { + /// @todo add state_hash to PSYC header + } } } @@ -1915,10 +2156,9 @@ master_queue_message (struct Master *mst, struct TransmitMessage *tmit_msg, * Queue a message from a channel slave for sending to the multicast group. */ static void -slave_queue_message (struct Slave *slv, struct TransmitMessage *tmit_msg, - uint16_t first_ptype, uint16_t last_ptype) +slave_queue_message (struct Slave *slv, struct TransmitMessage *tmit_msg) { - if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype) + if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == tmit_msg->first_ptype) { struct GNUNET_PSYC_MessageMethod *pmeth = (struct GNUNET_PSYC_MessageMethod *) &tmit_msg[1]; @@ -1931,12 +2171,18 @@ slave_queue_message (struct Slave *slv, struct TransmitMessage *tmit_msg, /** * Queue PSYC message parts for sending to multicast. * - * @param chn Channel to send to. - * @param client Client the message originates from. - * @param data_size Size of @a data. - * @param data Concatenated message parts. - * @param first_ptype First message part type in @a data. - * @param last_ptype Last message part type in @a data. + * @param chn + * Channel to send to. + * @param client + * Client the message originates from. + * @param data_size + * Size of @a data. + * @param data + * Concatenated message parts. + * @param first_ptype + * First message part type in @a data. + * @param last_ptype + * Last message part type in @a data. */ static struct TransmitMessage * queue_message (struct Channel *chn, @@ -1950,17 +2196,16 @@ queue_message (struct Channel *chn, memcpy (&tmit_msg[1], data, data_size); tmit_msg->client = client; tmit_msg->size = data_size; - tmit_msg->state = chn->tmit_state; + tmit_msg->first_ptype = first_ptype; + tmit_msg->last_ptype = last_ptype; /* FIXME: separate queue per message ID */ GNUNET_CONTAINER_DLL_insert_tail (chn->tmit_head, chn->tmit_tail, tmit_msg); chn->is_master - ? master_queue_message ((struct Master *) chn, tmit_msg, - first_ptype, last_ptype) - : slave_queue_message ((struct Slave *) chn, tmit_msg, - first_ptype, last_ptype); + ? master_queue_message ((struct Master *) chn, tmit_msg) + : slave_queue_message ((struct Slave *) chn, tmit_msg); return tmit_msg; } @@ -2014,7 +2259,9 @@ client_recv_psyc_message (void *cls, struct GNUNET_SERVER_Client *client, uint16_t size = ntohs (msg->size); if (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < size - sizeof (*msg)) { - GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "%p Message payload too large.\n", chn); + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "%p Message payload too large: %u < %u.\n", + chn, GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD, size - sizeof (*msg)); GNUNET_break (0); transmit_cancel (chn, client); GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); @@ -2051,13 +2298,17 @@ client_recv_psyc_message (void *cls, struct GNUNET_SERVER_Client *client, * Received result of GNUNET_PSYCSTORE_membership_store() */ static void -store_recv_membership_store_result (void *cls, int64_t result, const char *err_msg) +store_recv_membership_store_result (void *cls, int64_t result, + const char *err_msg, uint16_t err_msg_size) { - struct GNUNET_MULTICAST_MembershipTestHandle *mth = cls; + struct Operation *op = cls; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "%p GNUNET_PSYCSTORE_membership_store() returned %" PRId64 " (%s)\n", - mth, result, err_msg); - /* FIXME: send result to client */ + "%p GNUNET_PSYCSTORE_membership_store() returned %" PRId64 " (%.s)\n", + op->chn, result, err_msg_size, err_msg); + + if (NULL != op->client) + client_send_result (op->client, op->op_id, result, err_msg, err_msg_size); + op_remove (op); } @@ -2075,6 +2326,8 @@ client_recv_membership_store (void *cls, struct GNUNET_SERVER_Client *client, const struct ChannelMembershipStoreRequest * req = (const struct ChannelMembershipStoreRequest *) msg; + struct Operation *op = op_add (chn, client, req->op_id, 0); + uint64_t announced_at = GNUNET_ntohll (req->announced_at); uint64_t effective_since = GNUNET_ntohll (req->effective_since); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, @@ -2083,22 +2336,200 @@ client_recv_membership_store (void *cls, struct GNUNET_SERVER_Client *client, "%p did_join: %u, announced_at: %" PRIu64 ", effective_since: %" PRIu64 "\n", chn, req->did_join, announced_at, effective_since); - GNUNET_PSYCSTORE_membership_store (store, &chn->pub_key, &req->slave_key, + GNUNET_PSYCSTORE_membership_store (store, &chn->pub_key, &req->slave_pub_key, req->did_join, announced_at, effective_since, 0, /* FIXME: group_generation */ - &store_recv_membership_store_result, chn); + &store_recv_membership_store_result, op); GNUNET_SERVER_receive_done (client, GNUNET_OK); } /** - * Client requests channel history from PSYCstore. + * Received a fragment for GNUNET_PSYCSTORE_fragment_get(), + * in response to a history request from a client. + */ +static int +store_recv_fragment_history (void *cls, + struct GNUNET_MULTICAST_MessageHeader *mmsg, + enum GNUNET_PSYCSTORE_MessageFlags flags) +{ + struct Operation *op = cls; + if (NULL == op->client) + { /* Requesting client already disconnected. */ + return GNUNET_NO; + } + struct Channel *chn = op->chn; + + struct GNUNET_PSYC_MessageHeader *pmsg; + uint16_t msize = ntohs (mmsg->header.size); + uint16_t psize = sizeof (*pmsg) + msize - sizeof (*mmsg); + + struct GNUNET_OperationResultMessage * + res = GNUNET_malloc (sizeof (*res) + psize); + res->header.size = htons (sizeof (*res) + psize); + res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_HISTORY_RESULT); + res->op_id = op->op_id; + res->result_code = GNUNET_htonll (GNUNET_OK); + + pmsg = (struct GNUNET_PSYC_MessageHeader *) &res[1]; + GNUNET_PSYC_message_header_init (pmsg, mmsg, flags | GNUNET_PSYC_MESSAGE_HISTORIC); + memcpy (&res[1], pmsg, psize); + + /** @todo FIXME: send only to requesting client */ + client_send_msg (chn, &res->header); + return GNUNET_YES; +} + + +/** + * Received the result of GNUNET_PSYCSTORE_fragment_get(), + * in response to a history request from a client. */ static void -client_recv_story_request (void *cls, struct GNUNET_SERVER_Client *client, - const struct GNUNET_MessageHeader *msg) +store_recv_fragment_history_result (void *cls, int64_t result, + const char *err_msg, uint16_t err_msg_size) { + struct Operation *op = cls; + if (NULL == op->client) + { /* Requesting client already disconnected. */ + return; + } + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "%p History replay #%" PRIu64 ": " + "PSYCSTORE returned %" PRId64 " (%.*s)\n", + op->chn, GNUNET_ntohll (op->op_id), result, err_msg_size, err_msg); + if (op->flags & GNUNET_PSYC_HISTORY_REPLAY_REMOTE) + { + /** @todo Multicast replay request for messages not found locally. */ + } + + client_send_result (op->client, op->op_id, result, err_msg, err_msg_size); + op_remove (op); +} + + +/** + * Client requests channel history. + */ +static void +client_recv_history_replay (void *cls, struct GNUNET_SERVER_Client *client, + const struct GNUNET_MessageHeader *msg) +{ + struct Channel * + chn = GNUNET_SERVER_client_get_user_context (client, struct Channel); + GNUNET_assert (NULL != chn); + + const struct GNUNET_PSYC_HistoryRequestMessage * + req = (const struct GNUNET_PSYC_HistoryRequestMessage *) msg; + uint16_t size = ntohs (msg->size); + const char *method_prefix = (const char *) &req[1]; + + if (size < sizeof (*req) + 1 + || '\0' != method_prefix[size - sizeof (*req) - 1]) + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "%p History replay #%" PRIu64 ": " + "invalid method prefix. size: %u < %u?\n", + chn, GNUNET_ntohll (req->op_id), size, sizeof (*req) + 1); + GNUNET_break (0); + GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); + return; + } + + struct Operation *op = op_add (chn, client, req->op_id, ntohl (req->flags)); + + if (0 == req->message_limit) + GNUNET_PSYCSTORE_message_get (store, &chn->pub_key, NULL, + GNUNET_ntohll (req->start_message_id), + GNUNET_ntohll (req->end_message_id), + 0, method_prefix, + &store_recv_fragment_history, + &store_recv_fragment_history_result, op); + else + GNUNET_PSYCSTORE_message_get_latest (store, &chn->pub_key, NULL, + GNUNET_ntohll (req->message_limit), + method_prefix, + &store_recv_fragment_history, + &store_recv_fragment_history_result, + op); + + GNUNET_SERVER_receive_done (client, GNUNET_OK); +} + + +/** + * Received state var from PSYCstore, send it to client. + */ +static int +store_recv_state_var (void *cls, const char *name, + const void *value, uint32_t value_size) +{ + struct Operation *op = cls; + struct GNUNET_OperationResultMessage *res; + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "%p state_get #%" PRIu64 " - received var from PSYCstore: %s\n", + op->chn, GNUNET_ntohll (op->op_id), name); + + if (NULL != name) /* First part */ + { + uint16_t name_size = strnlen (name, GNUNET_PSYC_MODIFIER_MAX_PAYLOAD) + 1; + struct GNUNET_PSYC_MessageModifier *mod; + res = GNUNET_malloc (sizeof (*res) + sizeof (*mod) + name_size + value_size); + res->header.size = htons (sizeof (*res) + sizeof (*mod) + name_size + value_size); + res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_STATE_RESULT); + res->op_id = op->op_id; + + mod = (struct GNUNET_PSYC_MessageModifier *) &res[1]; + mod->header.size = htons (sizeof (*mod) + name_size + value_size); + mod->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER); + mod->name_size = htons (name_size); + mod->value_size = htonl (value_size); + mod->oper = htons (GNUNET_PSYC_OP_ASSIGN); + memcpy (&mod[1], name, name_size); + memcpy (((char *) &mod[1]) + name_size, value, value_size); + } + else /* Continuation */ + { + struct GNUNET_MessageHeader *mod; + res = GNUNET_malloc (sizeof (*res) + sizeof (*mod) + value_size); + res->header.size = htons (sizeof (*res) + sizeof (*mod) + value_size); + res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_STATE_RESULT); + res->op_id = op->op_id; + + mod = (struct GNUNET_MessageHeader *) &res[1]; + mod->size = htons (sizeof (*mod) + value_size); + mod->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT); + memcpy (&mod[1], value, value_size); + } + + // FIXME: client might have been disconnected + GNUNET_SERVER_notification_context_add (nc, op->client); + GNUNET_SERVER_notification_context_unicast (nc, op->client, &res->header, + GNUNET_NO); + return GNUNET_YES; +} + + +/** + * Received result of GNUNET_PSYCSTORE_state_get() + * or GNUNET_PSYCSTORE_state_get_prefix() + */ +static void +store_recv_state_result (void *cls, int64_t result, + const char *err_msg, uint16_t err_msg_size) +{ + struct Operation *op = cls; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "%p state_get #%" PRIu64 ": " + "PSYCSTORE returned %" PRId64 " (%.*s)\n", + op->chn, GNUNET_ntohll (op->op_id), result, err_msg_size, err_msg); + + // FIXME: client might have been disconnected + client_send_result (op->client, op->op_id, result, err_msg, err_msg_size); + op_remove (op); } @@ -2109,7 +2540,27 @@ static void client_recv_state_get (void *cls, struct GNUNET_SERVER_Client *client, const struct GNUNET_MessageHeader *msg) { + struct Channel * + chn = GNUNET_SERVER_client_get_user_context (client, struct Channel); + GNUNET_assert (NULL != chn); + + const struct StateRequest * + req = (const struct StateRequest *) msg; + uint16_t name_size = ntohs (req->header.size) - sizeof (*req); + const char *name = (const char *) &req[1]; + if (0 == name_size || '\0' != name[name_size - 1]) + { + GNUNET_break (0); + GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); + return; + } + + struct Operation *op = op_add (chn, client, req->op_id, 0); + GNUNET_PSYCSTORE_state_get (store, &chn->pub_key, name, + &store_recv_state_var, + &store_recv_state_result, op); + GNUNET_SERVER_receive_done (client, GNUNET_OK); } @@ -2120,7 +2571,27 @@ static void client_recv_state_get_prefix (void *cls, struct GNUNET_SERVER_Client *client, const struct GNUNET_MessageHeader *msg) { + struct Channel * + chn = GNUNET_SERVER_client_get_user_context (client, struct Channel); + GNUNET_assert (NULL != chn); + + const struct StateRequest * + req = (const struct StateRequest *) msg; + uint16_t name_size = ntohs (req->header.size) - sizeof (*req); + const char *name = (const char *) &req[1]; + if (0 == name_size || '\0' != name[name_size - 1]) + { + GNUNET_break (0); + GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); + return; + } + + struct Operation *op = op_add (chn, client, req->op_id, 0); + GNUNET_PSYCSTORE_state_get_prefix (store, &chn->pub_key, name, + &store_recv_state_var, + &store_recv_state_result, op); + GNUNET_SERVER_receive_done (client, GNUNET_OK); } @@ -2140,8 +2611,8 @@ static const struct GNUNET_SERVER_MessageHandler server_handlers[] = { { &client_recv_membership_store, NULL, GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_MEMBERSHIP_STORE, 0 }, - { &client_recv_story_request, NULL, - GNUNET_MESSAGE_TYPE_PSYC_STORY_REQUEST, 0 }, + { &client_recv_history_replay, NULL, + GNUNET_MESSAGE_TYPE_PSYC_HISTORY_REPLAY, 0 }, { &client_recv_state_get, NULL, GNUNET_MESSAGE_TYPE_PSYC_STATE_GET, 0 }, @@ -2170,7 +2641,7 @@ run (void *cls, struct GNUNET_SERVER_Handle *server, masters = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES); slaves = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES); channel_slaves = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO); - recv_cache = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES); + recv_cache = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO); nc = GNUNET_SERVER_notification_context_create (server, 1); GNUNET_SERVER_add_handlers (server, server_handlers); GNUNET_SERVER_disconnect_notify (server, &client_disconnect, NULL);