X-Git-Url: https://git.librecmc.org/?a=blobdiff_plain;f=src%2Fpsyc%2Fgnunet-service-psyc.c;h=2b436aa214008e1ac7dbb0a18f5b262c75c0efef;hb=29e6158507a0758192075ac6ece7ba8e75ddc49a;hp=dcb5031f175385b6f2ab89716e118f0548f9018c;hpb=8a0b8a4604526e5f832c4971f9c3b1b48d79bea4;p=oweals%2Fgnunet.git diff --git a/src/psyc/gnunet-service-psyc.c b/src/psyc/gnunet-service-psyc.c index dcb5031f1..2b436aa21 100644 --- a/src/psyc/gnunet-service-psyc.c +++ b/src/psyc/gnunet-service-psyc.c @@ -1,6 +1,6 @@ /* * This file is part of GNUnet - * (C) 2013 Christian Grothoff (and other contributing authors) + * Copyright (C) 2013 GNUnet e.V. * * GNUnet is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published @@ -14,8 +14,8 @@ * * You should have received a copy of the GNU General Public License * along with GNUnet; see the file COPYING. If not, write to the - * Free Software Foundation, Inc., 59 Temple Place - Suite 330, - * Boston, MA 02111-1307, USA. + * Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, + * Boston, MA 02110-1301, USA. */ /** @@ -24,6 +24,8 @@ * @author Gabor X Toth */ +#include + #include "platform.h" #include "gnunet_util_lib.h" #include "gnunet_constants.h" @@ -32,6 +34,7 @@ #include "gnunet_multicast_service.h" #include "gnunet_psycstore_service.h" #include "gnunet_psyc_service.h" +#include "gnunet_psyc_util_lib.h" #include "psyc.h" @@ -56,10 +59,23 @@ static struct GNUNET_SERVER_NotificationContext *nc; static struct GNUNET_PSYCSTORE_Handle *store; /** - * All connected masters and slaves. - * Channel's pub_key_hash -> struct Channel + * All connected masters. + * Channel's pub_key_hash -> struct Master + */ +static struct GNUNET_CONTAINER_MultiHashMap *masters; + +/** + * All connected slaves. + * Channel's pub_key_hash -> struct Slave */ -static struct GNUNET_CONTAINER_MultiHashMap *clients; +static struct GNUNET_CONTAINER_MultiHashMap *slaves; + +/** + * Connected slaves per channel. + * Channel's pub_key_hash -> Slave's pub_key -> struct Slave + */ +static struct GNUNET_CONTAINER_MultiHashMap *channel_slaves; + /** * Message in the transmission queue. @@ -69,25 +85,179 @@ struct TransmitMessage struct TransmitMessage *prev; struct TransmitMessage *next; - char *buf; + struct GNUNET_SERVER_Client *client; + + /** + * ID assigned to the message. + */ + uint64_t id; + + /** + * Size of message. + */ uint16_t size; + + /** + * Type of first message part. + */ + uint16_t first_ptype; + + /** + * Type of last message part. + */ + uint16_t last_ptype; + + /* Followed by message */ +}; + + +/** + * Cache for received message fragments. + * Message fragments are only sent to clients after all modifiers arrived. + * + * chan_key -> MultiHashMap chan_msgs + */ +static struct GNUNET_CONTAINER_MultiHashMap *recv_cache; + + +/** + * Entry in the chan_msgs hashmap of @a recv_cache: + * fragment_id -> RecvCacheEntry + */ +struct RecvCacheEntry +{ + struct GNUNET_MULTICAST_MessageHeader *mmsg; + uint16_t ref_count; +}; + + +/** + * Entry in the @a recv_frags hash map of a @a Channel. + * message_id -> FragmentQueue + */ +struct FragmentQueue +{ + /** + * Fragment IDs stored in @a recv_cache. + */ + struct GNUNET_CONTAINER_Heap *fragments; + + /** + * Total size of received fragments. + */ + uint64_t size; + + /** + * Total size of received header fragments (METHOD & MODIFIERs) + */ + uint64_t header_size; + /** - * enum MessageState + * The @a state_delta field from struct GNUNET_PSYC_MessageMethod. + */ + uint64_t state_delta; + + /** + * The @a flags field from struct GNUNET_PSYC_MessageMethod. + */ + uint32_t flags; + + /** + * Receive state of message. + * + * @see MessageFragmentState */ uint8_t state; + + /** + * Whether the state is already modified in PSYCstore. + */ + uint8_t state_is_modified; + + /** + * Is the message queued for delivery to the client? + * i.e. added to the recv_msgs queue + */ + uint8_t is_queued; }; + /** - * Common part of the client context for both a master and slave channel. + * List of connected clients. */ -struct Channel +struct Client +{ + struct Client *prev; + struct Client *next; + + struct GNUNET_SERVER_Client *client; +}; + + +struct Operation { + struct Operation *prev; + struct Operation *next; + struct GNUNET_SERVER_Client *client; + struct Channel *chn; + uint64_t op_id; + uint32_t flags; +}; + + +/** + * Common part of the client context for both a channel master and slave. + */ +struct Channel +{ + struct Client *clients_head; + struct Client *clients_tail; + + struct Operation *op_head; + struct Operation *op_tail; struct TransmitMessage *tmit_head; struct TransmitMessage *tmit_tail; - GNUNET_SCHEDULER_TaskIdentifier tmit_task; + /** + * Current PSYCstore operation. + */ + struct GNUNET_PSYCSTORE_OperationHandle *store_op; + + /** + * Received fragments not yet sent to the client. + * message_id -> FragmentQueue + */ + struct GNUNET_CONTAINER_MultiHashMap *recv_frags; + + /** + * Received message IDs not yet sent to the client. + */ + struct GNUNET_CONTAINER_Heap *recv_msgs; + + /** + * Public key of the channel. + */ + struct GNUNET_CRYPTO_EddsaPublicKey pub_key; + + /** + * Hash of @a pub_key. + */ + struct GNUNET_HashCode pub_key_hash; + + /** + * Last message ID sent to the client. + * 0 if there is no such message. + */ + uint64_t max_message_id; + + /** + * ID of the last stateful message, where the state operations has been + * processed and saved to PSYCstore and which has been sent to the client. + * 0 if there is no such message. + */ + uint64_t max_state_message_id; /** * Expected value size for the modifier being received from the PSYC service. @@ -100,38 +270,57 @@ struct Channel uint32_t tmit_mod_value_size; /** - * enum MessageState + * Is this a channel master (#GNUNET_YES), or slave (#GNUNET_NO)? */ - uint8_t tmit_state; - - uint8_t in_transmit; uint8_t is_master; /** - * Ready to receive messages from client. + * Is this channel ready to receive messages from client? + * #GNUNET_YES or #GNUNET_NO */ - uint8_t ready; + uint8_t is_ready; /** - * Client disconnected. + * Is the client disconnected? + * #GNUNET_YES or #GNUNET_NO */ - uint8_t disconnected; + uint8_t is_disconnected; }; + /** * Client context for a channel master. */ struct Master { - struct Channel channel; + /** + * Channel struct common for Master and Slave + */ + struct Channel chn; + + /** + * Private key of the channel. + */ struct GNUNET_CRYPTO_EddsaPrivateKey priv_key; - struct GNUNET_CRYPTO_EddsaPublicKey pub_key; + /** + * Handle for the multicast origin. + */ struct GNUNET_MULTICAST_Origin *origin; - struct GNUNET_MULTICAST_OriginMessageHandle *tmit_handle; /** - * Maximum message ID for this channel. + * Transmit handle for multicast. + */ + struct GNUNET_MULTICAST_OriginTransmitHandle *tmit_handle; + + /** + * Incoming join requests from multicast. + * member_pub_key -> struct GNUNET_MULTICAST_JoinHandle * + */ + struct GNUNET_CONTAINER_MultiHashMap *join_reqs; + + /** + * Last message ID transmitted to this channel. * * Incremented before sending a message, thus the message_id in messages sent * starts from 1. @@ -139,22 +328,20 @@ struct Master uint64_t max_message_id; /** - * ID of the last message that contains any state operations. + * ID of the last message with state operations transmitted to the channel. * 0 if there is no such message. */ uint64_t max_state_message_id; /** - * Maximum group generation for this channel. + * Maximum group generation transmitted to the channel. */ uint64_t max_group_generation; /** * @see enum GNUNET_PSYC_Policy */ - uint32_t policy; - - struct GNUNET_HashCode pub_key_hash; + enum GNUNET_PSYC_Policy policy; }; @@ -163,39 +350,99 @@ struct Master */ struct Slave { - struct Channel channel; - struct GNUNET_CRYPTO_EddsaPrivateKey slave_key; - struct GNUNET_CRYPTO_EddsaPublicKey chan_key; + /** + * Channel struct common for Master and Slave + */ + struct Channel chn; + + /** + * Private key of the slave. + */ + struct GNUNET_CRYPTO_EcdsaPrivateKey priv_key; + + /** + * Public key of the slave. + */ + struct GNUNET_CRYPTO_EcdsaPublicKey pub_key; + + /** + * Hash of @a pub_key. + */ + struct GNUNET_HashCode pub_key_hash; + /** + * Handle for the multicast member. + */ struct GNUNET_MULTICAST_Member *member; - struct GNUNET_MULTICAST_MemberRequestHandle *tmit_handle; + /** + * Transmit handle for multicast. + */ + struct GNUNET_MULTICAST_MemberTransmitHandle *tmit_handle; + + /** + * Peer identity of the origin. + */ struct GNUNET_PeerIdentity origin; + /** + * Number of items in @a relays. + */ uint32_t relay_count; + + /** + * Relays that multicast can use to connect. + */ struct GNUNET_PeerIdentity *relays; - struct GNUNET_MessageHeader *join_req; + /** + * Join request to be transmitted to the master on join. + */ + struct GNUNET_PSYC_Message *join_msg; + + /** + * Join decision received from multicast. + */ + struct GNUNET_PSYC_JoinDecisionMessage *join_dcsn; - uint64_t max_message_id; + /** + * Maximum request ID for this channel. + */ uint64_t max_request_id; - struct GNUNET_HashCode chan_key_hash; + /** + * Join flags. + */ + enum GNUNET_PSYC_SlaveJoinFlags join_flags; }; -static inline void -transmit_message (struct Channel *ch, uint8_t inc_msg_id); +static void +transmit_message (struct Channel *chn); + +static uint64_t +message_queue_run (struct Channel *chn); + +static uint64_t +message_queue_drop (struct Channel *chn); + + +static void +schedule_transmit_message (void *cls) +{ + struct Channel *chn = cls; + + transmit_message (chn); +} /** * Task run during shutdown. * * @param cls unused - * @param tc unused */ static void -shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) +shutdown_task (void *cls) { if (NULL != nc) { @@ -204,34 +451,112 @@ shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) } if (NULL != stats) { - GNUNET_STATISTICS_destroy (stats, GNUNET_NO); + GNUNET_STATISTICS_destroy (stats, GNUNET_YES); stats = NULL; } } +static struct Operation * +op_add (struct Channel *chn, struct GNUNET_SERVER_Client *client, + uint64_t op_id, uint32_t flags) +{ + struct Operation *op = GNUNET_malloc (sizeof (*op)); + op->client = client; + op->chn = chn; + op->op_id = op_id; + op->flags = flags; + GNUNET_CONTAINER_DLL_insert (chn->op_head, chn->op_tail, op); + return op; +} + + +static void +op_remove (struct Operation *op) +{ + GNUNET_CONTAINER_DLL_remove (op->chn->op_head, op->chn->op_tail, op); + GNUNET_free (op); +} + + +/** + * Clean up master data structures after a client disconnected. + */ +static void +cleanup_master (struct Master *mst) +{ + struct Channel *chn = &mst->chn; + + if (NULL != mst->origin) + GNUNET_MULTICAST_origin_stop (mst->origin, NULL, NULL); // FIXME + GNUNET_CONTAINER_multihashmap_destroy (mst->join_reqs); + GNUNET_CONTAINER_multihashmap_remove (masters, &chn->pub_key_hash, mst); +} + + +/** + * Clean up slave data structures after a client disconnected. + */ static void -client_cleanup (struct Channel *ch) +cleanup_slave (struct Slave *slv) { - if (ch->is_master) + struct Channel *chn = &slv->chn; + struct GNUNET_CONTAINER_MultiHashMap * + chn_slv = GNUNET_CONTAINER_multihashmap_get (channel_slaves, + &chn->pub_key_hash); + GNUNET_assert (NULL != chn_slv); + GNUNET_CONTAINER_multihashmap_remove (chn_slv, &slv->pub_key_hash, slv); + + if (0 == GNUNET_CONTAINER_multihashmap_size (chn_slv)) { - struct Master *mst = (struct Master *) ch; - if (NULL != mst->origin) - GNUNET_MULTICAST_origin_stop (mst->origin); - GNUNET_CONTAINER_multihashmap_remove (clients, &mst->pub_key_hash, mst); + GNUNET_CONTAINER_multihashmap_remove (channel_slaves, &chn->pub_key_hash, + chn_slv); + GNUNET_CONTAINER_multihashmap_destroy (chn_slv); } - else + GNUNET_CONTAINER_multihashmap_remove (slaves, &chn->pub_key_hash, slv); + + if (NULL != slv->join_msg) + { + GNUNET_free (slv->join_msg); + slv->join_msg = NULL; + } + if (NULL != slv->relays) + { + GNUNET_free (slv->relays); + slv->relays = NULL; + } + if (NULL != slv->member) + { + GNUNET_MULTICAST_member_part (slv->member, NULL, NULL); // FIXME + slv->member = NULL; + } + GNUNET_CONTAINER_multihashmap_remove (slaves, &chn->pub_key_hash, slv); +} + + +/** + * Clean up channel data structures after a client disconnected. + */ +static void +cleanup_channel (struct Channel *chn) +{ + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "%p Cleaning up channel %s. master? %u\n", + chn, GNUNET_h2s (&chn->pub_key_hash), chn->is_master); + message_queue_drop (chn); + GNUNET_CONTAINER_multihashmap_destroy (chn->recv_frags); + chn->recv_frags = NULL; + + if (NULL != chn->store_op) { - struct Slave *slv = (struct Slave *) ch; - if (NULL != slv->join_req) - GNUNET_free (slv->join_req); - if (NULL != slv->relays) - GNUNET_free (slv->relays); - if (NULL != slv->member) - GNUNET_MULTICAST_member_part (slv->member); + GNUNET_PSYCSTORE_operation_cancel (chn->store_op); + chn->store_op = NULL; } - GNUNET_free (ch); + (GNUNET_YES == chn->is_master) + ? cleanup_master ((struct Master *) chn) + : cleanup_slave ((struct Slave *) chn); + GNUNET_free (chn); } @@ -248,248 +573,995 @@ client_disconnect (void *cls, struct GNUNET_SERVER_Client *client) if (NULL == client) return; - struct Channel *ch - = GNUNET_SERVER_client_get_user_context (client, struct Channel); - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p Client disconnected\n", ch); + struct Channel * + chn = GNUNET_SERVER_client_get_user_context (client, struct Channel); - if (NULL == ch) + if (NULL == chn) { - GNUNET_log (GNUNET_ERROR_TYPE_ERROR, - "%p User context is NULL in client_disconnect()\n", ch); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "%p User context is NULL in client_disconnect()\n", chn); GNUNET_break (0); return; } - ch->disconnected = GNUNET_YES; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "%p Client (%s) disconnected from channel %s\n", + chn, (GNUNET_YES == chn->is_master) ? "master" : "slave", + GNUNET_h2s (&chn->pub_key_hash)); - /* Send pending messages to multicast before cleanup. */ - if (NULL != ch->tmit_head) + struct Client *cli = chn->clients_head; + while (NULL != cli) { - transmit_message (ch, GNUNET_NO); + if (cli->client == client) + { + GNUNET_CONTAINER_DLL_remove (chn->clients_head, chn->clients_tail, cli); + GNUNET_free (cli); + break; + } + cli = cli->next; } - else + + struct Operation *op = chn->op_head; + while (NULL != op) { - client_cleanup (ch); + if (op->client == client) + { + op->client = NULL; + break; + } + op = op->next; + } + + if (NULL == chn->clients_head) + { /* Last client disconnected. */ + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "%p Last client (%s) disconnected from channel %s\n", + chn, (GNUNET_YES == chn->is_master) ? "master" : "slave", + GNUNET_h2s (&chn->pub_key_hash)); + chn->is_disconnected = GNUNET_YES; + if (NULL != chn->tmit_head) + { /* Send pending messages to multicast before cleanup. */ + transmit_message (chn); + } + else + { + cleanup_channel (chn); + } } } +/** + * Send message to all clients connected to the channel. + */ static void -join_cb (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *member_key, - const struct GNUNET_MessageHeader *join_req, - struct GNUNET_MULTICAST_JoinHandle *jh) +client_send_msg (const struct Channel *chn, + const struct GNUNET_MessageHeader *msg) { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "%p Sending message to clients.\n", chn); + struct Client *cli = chn->clients_head; + while (NULL != cli) + { + GNUNET_SERVER_notification_context_add (nc, cli->client); + GNUNET_SERVER_notification_context_unicast (nc, cli->client, msg, GNUNET_NO); + cli = cli->next; + } } +/** + * Send a result code back to the client. + * + * @param client + * Client that should receive the result code. + * @param result_code + * Code to transmit. + * @param op_id + * Operation ID in network byte order. + * @param data + * Data payload or NULL. + * @param data_size + * Size of @a data. + */ static void -membership_test_cb (void *cls, - const struct GNUNET_CRYPTO_EddsaPublicKey *member_key, - uint64_t message_id, uint64_t group_generation, - struct GNUNET_MULTICAST_MembershipTestHandle *mth) +client_send_result (struct GNUNET_SERVER_Client *client, uint64_t op_id, + int64_t result_code, const void *data, uint16_t data_size) { + struct GNUNET_OperationResultMessage *res; -} - + res = GNUNET_malloc (sizeof (*res) + data_size); + res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_RESULT_CODE); + res->header.size = htons (sizeof (*res) + data_size); + res->result_code = GNUNET_htonll (result_code); + res->op_id = op_id; + if (0 < data_size) + memcpy (&res[1], data, data_size); -static void -replay_fragment_cb (void *cls, - const struct GNUNET_CRYPTO_EddsaPublicKey *member_key, - uint64_t fragment_id, uint64_t flags, - struct GNUNET_MULTICAST_ReplayHandle *rh) + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "%p Sending result to client for operation #%" PRIu64 ": " + "%" PRId64 " (size: %u)\n", + client, GNUNET_ntohll (op_id), result_code, data_size); -{ + GNUNET_SERVER_notification_context_add (nc, client); + GNUNET_SERVER_notification_context_unicast (nc, client, &res->header, + GNUNET_NO); + GNUNET_free (res); } -static void -replay_message_cb (void *cls, - const struct GNUNET_CRYPTO_EddsaPublicKey *member_key, - uint64_t message_id, - uint64_t fragment_offset, - uint64_t flags, - struct GNUNET_MULTICAST_ReplayHandle *rh) +/** + * Closure for join_mem_test_cb() + */ +struct JoinMemTestClosure { - -} + struct GNUNET_CRYPTO_EcdsaPublicKey slave_pub_key; + struct Channel *chn; + struct GNUNET_MULTICAST_JoinHandle *jh; + struct GNUNET_PSYC_JoinRequestMessage *join_msg; +}; +/** + * Membership test result callback used for join requests. + */ static void -fragment_store_result (void *cls, int64_t result, const char *err_msg) +join_mem_test_cb (void *cls, int64_t result, + const char *err_msg, uint16_t err_msg_size) { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "fragment_store() returned %l (%s)\n", result, err_msg); + struct JoinMemTestClosure *jcls = cls; + + if (GNUNET_NO == result && GNUNET_YES == jcls->chn->is_master) + { /* Pass on join request to client if this is a master channel */ + struct Master *mst = (struct Master *) jcls->chn; + struct GNUNET_HashCode slave_pub_hash; + GNUNET_CRYPTO_hash (&jcls->slave_pub_key, sizeof (jcls->slave_pub_key), + &slave_pub_hash); + GNUNET_CONTAINER_multihashmap_put (mst->join_reqs, &slave_pub_hash, jcls->jh, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); + client_send_msg (jcls->chn, &jcls->join_msg->header); + } + else + { + if (GNUNET_SYSERR == result) + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Could not perform membership test (%.*s)\n", + err_msg_size, err_msg); + } + // FIXME: add relays + GNUNET_MULTICAST_join_decision (jcls->jh, result, 0, NULL, NULL); + } + GNUNET_free (jcls->join_msg); + GNUNET_free (jcls); } /** - * Incoming message fragment from multicast. - * - * Store it using PSYCstore and send it to the client of the channel. + * Incoming join request from multicast. */ static void -message_cb (struct Channel *ch, - const struct GNUNET_CRYPTO_EddsaPublicKey *chan_key, - const struct GNUNET_HashCode *chan_key_hash, - const struct GNUNET_MessageHeader *msg) +mcast_recv_join_request (void *cls, + const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_pub_key, + const struct GNUNET_MessageHeader *join_msg, + struct GNUNET_MULTICAST_JoinHandle *jh) { - uint16_t type = ntohs (msg->type); - uint16_t size = ntohs (msg->size); + struct Channel *chn = cls; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p Got join request.\n", chn); - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "%p Received message of type %u and size %u from multicast.\n", - ch, type, size); - - switch (type) + uint16_t join_msg_size = 0; + if (NULL != join_msg) { - case GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE: - { - GNUNET_PSYCSTORE_fragment_store (store, chan_key, - (const struct - GNUNET_MULTICAST_MessageHeader *) msg, - 0, NULL, NULL); - -#if TODO - /* FIXME: apply modifiers to state in PSYCstore */ - GNUNET_PSYCSTORE_state_modify (store, chan_key, - GNUNET_ntohll (mmsg->message_id), - meth->mod_count, mods, - rcb, rcb_cls); -#endif - - const struct GNUNET_MULTICAST_MessageHeader *mmsg - = (const struct GNUNET_MULTICAST_MessageHeader *) msg; - - if (GNUNET_YES != GNUNET_PSYC_check_message_parts (size - sizeof (*mmsg), - (const char *) &mmsg[1])) + if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE == ntohs (join_msg->type)) { - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, - "%p Received message with invalid parts from multicast. " - "Dropping message.\n", ch); - GNUNET_break_op (0); - break; + join_msg_size = ntohs (join_msg->size); + } + else + { + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "%p Got join message with invalid type %u.\n", + chn, ntohs (join_msg->type)); } + } - struct GNUNET_PSYC_MessageHeader *pmsg; - uint16_t psize = sizeof (*pmsg) + size - sizeof (*mmsg); - pmsg = GNUNET_malloc (psize); - pmsg->header.size = htons (psize); - pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE); - pmsg->message_id = mmsg->message_id; + struct GNUNET_PSYC_JoinRequestMessage * + req = GNUNET_malloc (sizeof (*req) + join_msg_size); + req->header.size = htons (sizeof (*req) + join_msg_size); + req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_JOIN_REQUEST); + req->slave_pub_key = *slave_pub_key; + if (0 < join_msg_size) + memcpy (&req[1], join_msg, join_msg_size); + + struct JoinMemTestClosure *jcls = GNUNET_malloc (sizeof (*jcls)); + jcls->slave_pub_key = *slave_pub_key; + jcls->chn = chn; + jcls->jh = jh; + jcls->join_msg = req; + + GNUNET_PSYCSTORE_membership_test (store, &chn->pub_key, slave_pub_key, + chn->max_message_id, 0, + &join_mem_test_cb, jcls); +} - memcpy (&pmsg[1], &mmsg[1], size - sizeof (*mmsg)); - GNUNET_SERVER_notification_context_add (nc, ch->client); - GNUNET_SERVER_notification_context_unicast (nc, ch->client, - (const struct GNUNET_MessageHeader *) pmsg, - GNUNET_NO); - GNUNET_free (pmsg); - break; +/** + * Join decision received from multicast. + */ +static void +mcast_recv_join_decision (void *cls, int is_admitted, + const struct GNUNET_PeerIdentity *peer, + uint16_t relay_count, + const struct GNUNET_PeerIdentity *relays, + const struct GNUNET_MessageHeader *join_resp) +{ + struct Slave *slv = cls; + struct Channel *chn = &slv->chn; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "%p Got join decision: %d\n", slv, is_admitted); + if (GNUNET_YES == chn->is_ready) + { + /* Already admitted */ + return; } - default: - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, - "%p Dropping unknown message of type %u and size %u.\n", - ch, type, size); + + uint16_t join_resp_size = (NULL != join_resp) ? ntohs (join_resp->size) : 0; + struct GNUNET_PSYC_JoinDecisionMessage * + dcsn = slv->join_dcsn = GNUNET_malloc (sizeof (*dcsn) + join_resp_size); + dcsn->header.size = htons (sizeof (*dcsn) + join_resp_size); + dcsn->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION); + dcsn->is_admitted = htonl (is_admitted); + if (0 < join_resp_size) + memcpy (&dcsn[1], join_resp, join_resp_size); + + client_send_msg (chn, &dcsn->header); + + if (GNUNET_YES == is_admitted + && ! (GNUNET_PSYC_SLAVE_JOIN_LOCAL & slv->join_flags)) + { + chn->is_ready = GNUNET_YES; } } +static int +store_recv_fragment_replay (void *cls, + struct GNUNET_MULTICAST_MessageHeader *msg, + enum GNUNET_PSYCSTORE_MessageFlags flags) +{ + struct GNUNET_MULTICAST_ReplayHandle *rh = cls; + + GNUNET_MULTICAST_replay_response (rh, &msg->header, GNUNET_MULTICAST_REC_OK); + return GNUNET_YES; +} + + /** - * Incoming message fragment from multicast for a master. + * Received result of GNUNET_PSYCSTORE_fragment_get() for multicast replay. */ static void -master_message_cb (void *cls, const struct GNUNET_MessageHeader *msg) +store_recv_fragment_replay_result (void *cls, int64_t result, + const char *err_msg, uint16_t err_msg_size) { - struct Master *mst = cls; - GNUNET_assert (NULL != mst); + struct GNUNET_MULTICAST_ReplayHandle *rh = cls; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "%p Fragment replay: PSYCSTORE returned %" PRId64 " (%.*s)\n", + rh, result, err_msg_size, err_msg); + + switch (result) + { + case GNUNET_YES: + break; + + case GNUNET_NO: + GNUNET_MULTICAST_replay_response (rh, NULL, + GNUNET_MULTICAST_REC_NOT_FOUND); + break; - struct GNUNET_CRYPTO_EddsaPublicKey *chan_key = &mst->pub_key; - struct GNUNET_HashCode *chan_key_hash = &mst->pub_key_hash; + case GNUNET_PSYCSTORE_MEMBERSHIP_TEST_FAILED: + GNUNET_MULTICAST_replay_response (rh, NULL, + GNUNET_MULTICAST_REC_ACCESS_DENIED); + break; - message_cb (&mst->channel, chan_key, chan_key_hash, msg); + case GNUNET_SYSERR: + GNUNET_MULTICAST_replay_response (rh, NULL, + GNUNET_MULTICAST_REC_INTERNAL_ERROR); + break; + } + GNUNET_MULTICAST_replay_response_end (rh); } /** - * Incoming message fragment from multicast for a slave. + * Incoming fragment replay request from multicast. */ static void -slave_message_cb (void *cls, const struct GNUNET_MessageHeader *msg) +mcast_recv_replay_fragment (void *cls, + const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_pub_key, + uint64_t fragment_id, uint64_t flags, + struct GNUNET_MULTICAST_ReplayHandle *rh) + { - struct Slave *slv = cls; - GNUNET_assert (NULL != slv); + struct Channel *chn = cls; + GNUNET_PSYCSTORE_fragment_get (store, &chn->pub_key, slave_pub_key, + fragment_id, fragment_id, + &store_recv_fragment_replay, + &store_recv_fragment_replay_result, rh); +} - struct GNUNET_CRYPTO_EddsaPublicKey *chan_key = &slv->chan_key; - struct GNUNET_HashCode *chan_key_hash = &slv->chan_key_hash; - message_cb (&slv->channel, chan_key, chan_key_hash, msg); +/** + * Incoming message replay request from multicast. + */ +static void +mcast_recv_replay_message (void *cls, + const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_pub_key, + uint64_t message_id, + uint64_t fragment_offset, + uint64_t flags, + struct GNUNET_MULTICAST_ReplayHandle *rh) +{ + struct Channel *chn = cls; + GNUNET_PSYCSTORE_message_get (store, &chn->pub_key, slave_pub_key, + message_id, message_id, 1, NULL, + &store_recv_fragment_replay, + &store_recv_fragment_replay_result, rh); } /** - * Incoming request fragment from multicast for a master. - * - * @param cls Master. - * @param member_key Sending member's public key. - * @param msg The message. - * @param flags Request flags. + * Convert an uint64_t in network byte order to a HashCode + * that can be used as key in a MultiHashMap + */ +static inline void +hash_key_from_nll (struct GNUNET_HashCode *key, uint64_t n) +{ + /* use little-endian order, as idx_of MultiHashMap casts key to unsigned int */ + /* TODO: use built-in byte swap functions if available */ + + n = ((n << 8) & 0xFF00FF00FF00FF00ULL) | ((n >> 8) & 0x00FF00FF00FF00FFULL); + n = ((n << 16) & 0xFFFF0000FFFF0000ULL) | ((n >> 16) & 0x0000FFFF0000FFFFULL); + + *key = (struct GNUNET_HashCode) {}; + *((uint64_t *) key) + = (n << 32) | (n >> 32); +} + + +/** + * Convert an uint64_t in host byte order to a HashCode + * that can be used as key in a MultiHashMap + */ +static inline void +hash_key_from_hll (struct GNUNET_HashCode *key, uint64_t n) +{ +#if __BYTE_ORDER == __BIG_ENDIAN + hash_key_from_nll (key, n); +#elif __BYTE_ORDER == __LITTLE_ENDIAN + *key = (struct GNUNET_HashCode) {}; + *((uint64_t *) key) = n; +#else + #error byteorder undefined +#endif +} + + +/** + * Initialize PSYC message header. + */ +static inline void +psyc_msg_init (struct GNUNET_PSYC_MessageHeader *pmsg, + const struct GNUNET_MULTICAST_MessageHeader *mmsg, uint32_t flags) +{ + uint16_t size = ntohs (mmsg->header.size); + uint16_t psize = sizeof (*pmsg) + size - sizeof (*mmsg); + + pmsg->header.size = htons (psize); + pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE); + pmsg->message_id = mmsg->message_id; + pmsg->fragment_offset = mmsg->fragment_offset; + pmsg->flags = htonl (flags); + + memcpy (&pmsg[1], &mmsg[1], size - sizeof (*mmsg)); +} + + +/** + * Create a new PSYC message from a multicast message for sending it to clients. + */ +static inline struct GNUNET_PSYC_MessageHeader * +psyc_msg_new (const struct GNUNET_MULTICAST_MessageHeader *mmsg, uint32_t flags) +{ + struct GNUNET_PSYC_MessageHeader *pmsg; + uint16_t size = ntohs (mmsg->header.size); + uint16_t psize = sizeof (*pmsg) + size - sizeof (*mmsg); + + pmsg = GNUNET_malloc (psize); + psyc_msg_init (pmsg, mmsg, flags); + return pmsg; +} + + +/** + * Send multicast message to all clients connected to the channel. */ static void -request_cb (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *member_key, - const struct GNUNET_MessageHeader *msg, - enum GNUNET_MULTICAST_MessageFlags flags) +client_send_mcast_msg (struct Channel *chn, + const struct GNUNET_MULTICAST_MessageHeader *mmsg, + uint32_t flags) { - struct Master *mst = cls; - struct Channel *ch = &mst->channel; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "%p Sending multicast message to client. " + "fragment_id: %" PRIu64 ", message_id: %" PRIu64 "\n", + chn, GNUNET_ntohll (mmsg->fragment_id), + GNUNET_ntohll (mmsg->message_id)); + + struct GNUNET_PSYC_MessageHeader * + pmsg = GNUNET_PSYC_message_header_create (mmsg, flags); + client_send_msg (chn, &pmsg->header); + GNUNET_free (pmsg); +} - uint16_t type = ntohs (msg->type); - uint16_t size = ntohs (msg->size); + +/** + * Send multicast request to all clients connected to the channel. + */ +static void +client_send_mcast_req (struct Master *mst, + const struct GNUNET_MULTICAST_RequestHeader *req) +{ + struct Channel *chn = &mst->chn; + + struct GNUNET_PSYC_MessageHeader *pmsg; + uint16_t size = ntohs (req->header.size); + uint16_t psize = sizeof (*pmsg) + size - sizeof (*req); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "%p Received request of type %u and size %u from multicast.\n", - ch, type, size); + "%p Sending multicast request to client. " + "fragment_id: %" PRIu64 ", message_id: %" PRIu64 "\n", + chn, GNUNET_ntohll (req->fragment_id), + GNUNET_ntohll (req->request_id)); + + pmsg = GNUNET_malloc (psize); + pmsg->header.size = htons (psize); + pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE); + pmsg->message_id = req->request_id; + pmsg->fragment_offset = req->fragment_offset; + pmsg->flags = htonl (GNUNET_PSYC_MESSAGE_REQUEST); + pmsg->slave_pub_key = req->member_pub_key; + memcpy (&pmsg[1], &req[1], size - sizeof (*req)); + + client_send_msg (chn, &pmsg->header); + + /* FIXME: save req to PSYCstore so that it can be resent later to clients */ + + GNUNET_free (pmsg); +} + + +/** + * Insert a multicast message fragment into the queue belonging to the message. + * + * @param chn Channel. + * @param mmsg Multicast message fragment. + * @param msg_id_hash Message ID of @a mmsg in a struct GNUNET_HashCode. + * @param first_ptype First PSYC message part type in @a mmsg. + * @param last_ptype Last PSYC message part type in @a mmsg. + */ +static void +fragment_queue_insert (struct Channel *chn, + const struct GNUNET_MULTICAST_MessageHeader *mmsg, + uint16_t first_ptype, uint16_t last_ptype) +{ + const uint16_t size = ntohs (mmsg->header.size); + const uint64_t frag_offset = GNUNET_ntohll (mmsg->fragment_offset); + struct GNUNET_CONTAINER_MultiHashMap + *chan_msgs = GNUNET_CONTAINER_multihashmap_get (recv_cache, + &chn->pub_key_hash); + + struct GNUNET_HashCode msg_id_hash; + hash_key_from_nll (&msg_id_hash, mmsg->message_id); + + struct FragmentQueue + *fragq = GNUNET_CONTAINER_multihashmap_get (chn->recv_frags, &msg_id_hash); + + if (NULL == fragq) + { + fragq = GNUNET_new (struct FragmentQueue); + fragq->state = MSG_FRAG_STATE_HEADER; + fragq->fragments + = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN); + + GNUNET_CONTAINER_multihashmap_put (chn->recv_frags, &msg_id_hash, fragq, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST); + + if (NULL == chan_msgs) + { + chan_msgs = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO); + GNUNET_CONTAINER_multihashmap_put (recv_cache, &chn->pub_key_hash, chan_msgs, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST); + } + } - switch (type) + struct GNUNET_HashCode frag_id_hash; + hash_key_from_nll (&frag_id_hash, mmsg->fragment_id); + struct RecvCacheEntry + *cache_entry = GNUNET_CONTAINER_multihashmap_get (chan_msgs, &frag_id_hash); + if (NULL == cache_entry) { - case GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST: + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "%p Adding message fragment to cache. " + "message_id: %" PRIu64 ", fragment_id: %" PRIu64 "\n", + chn, GNUNET_ntohll (mmsg->message_id), + GNUNET_ntohll (mmsg->fragment_id)); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "%p header_size: %" PRIu64 " + %u\n", + chn, fragq->header_size, size); + cache_entry = GNUNET_new (struct RecvCacheEntry); + cache_entry->ref_count = 1; + cache_entry->mmsg = GNUNET_malloc (size); + memcpy (cache_entry->mmsg, mmsg, size); + GNUNET_CONTAINER_multihashmap_put (chan_msgs, &frag_id_hash, cache_entry, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST); + } + else { - const struct GNUNET_MULTICAST_RequestHeader *req - = (const struct GNUNET_MULTICAST_RequestHeader *) msg; + cache_entry->ref_count++; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "%p Message fragment is already in cache. " + "message_id: %" PRIu64 ", fragment_id: %" PRIu64 + ", ref_count: %u\n", + chn, GNUNET_ntohll (mmsg->message_id), + GNUNET_ntohll (mmsg->fragment_id), cache_entry->ref_count); + } - if (GNUNET_YES != GNUNET_PSYC_check_message_parts (size - sizeof (*req), - (const char *) &req[1])) + if (MSG_FRAG_STATE_HEADER == fragq->state) + { + if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype) { - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, - "%p Dropping message with invalid parts " - "received from multicast.\n", ch); - GNUNET_break_op (0); - break; + struct GNUNET_PSYC_MessageMethod * + pmeth = (struct GNUNET_PSYC_MessageMethod *) &mmsg[1]; + fragq->state_delta = GNUNET_ntohll (pmeth->state_delta); + fragq->flags = ntohl (pmeth->flags); } - struct GNUNET_PSYC_MessageHeader *pmsg; - uint16_t psize = sizeof (*pmsg) + size - sizeof (*req); - pmsg = GNUNET_malloc (psize); - pmsg->header.size = htons (psize); - pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE); - pmsg->message_id = req->request_id; - pmsg->flags = htonl (GNUNET_PSYC_MESSAGE_REQUEST); + if (last_ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA) + { + fragq->header_size += size; + } + else if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype + || frag_offset == fragq->header_size) + { /* header is now complete */ + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "%p Header of message %" PRIu64 " is complete.\n", + chn, GNUNET_ntohll (mmsg->message_id)); - memcpy (&pmsg[1], &req[1], size - sizeof (*req)); + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "%p Adding message %" PRIu64 " to queue.\n", + chn, GNUNET_ntohll (mmsg->message_id)); + fragq->state = MSG_FRAG_STATE_DATA; + } + else + { + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "%p Header of message %" PRIu64 " is NOT complete yet: " + "%" PRIu64 " != %" PRIu64 "\n", + chn, GNUNET_ntohll (mmsg->message_id), + frag_offset, fragq->header_size); + } + } - GNUNET_SERVER_notification_context_add (nc, ch->client); - GNUNET_SERVER_notification_context_unicast (nc, ch->client, - (const struct GNUNET_MessageHeader *) pmsg, - GNUNET_NO); - GNUNET_free (pmsg); + switch (last_ptype) + { + case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END: + if (frag_offset == fragq->size) + fragq->state = MSG_FRAG_STATE_END; + else + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "%p Message %" PRIu64 " is NOT complete yet: " + "%" PRIu64 " != %" PRIu64 "\n", + chn, GNUNET_ntohll (mmsg->message_id), + frag_offset, fragq->size); break; + + case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL: + /* Drop message without delivering to client if it's a single fragment */ + fragq->state = + (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype) + ? MSG_FRAG_STATE_DROP + : MSG_FRAG_STATE_CANCEL; + } + + switch (fragq->state) + { + case MSG_FRAG_STATE_DATA: + case MSG_FRAG_STATE_END: + case MSG_FRAG_STATE_CANCEL: + if (GNUNET_NO == fragq->is_queued) + { + GNUNET_CONTAINER_heap_insert (chn->recv_msgs, NULL, + GNUNET_ntohll (mmsg->message_id)); + fragq->is_queued = GNUNET_YES; + } + } + + fragq->size += size; + GNUNET_CONTAINER_heap_insert (fragq->fragments, NULL, + GNUNET_ntohll (mmsg->fragment_id)); +} + + +/** + * Run fragment queue of a message. + * + * Send fragments of a message in order to client, after all modifiers arrived + * from multicast. + * + * @param chn + * Channel. + * @param msg_id + * ID of the message @a fragq belongs to. + * @param fragq + * Fragment queue of the message. + * @param drop + * Drop message without delivering to client? + * #GNUNET_YES or #GNUNET_NO. + */ +static void +fragment_queue_run (struct Channel *chn, uint64_t msg_id, + struct FragmentQueue *fragq, uint8_t drop) +{ + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "%p Running message fragment queue for message %" PRIu64 + " (state: %u).\n", + chn, msg_id, fragq->state); + + struct GNUNET_CONTAINER_MultiHashMap + *chan_msgs = GNUNET_CONTAINER_multihashmap_get (recv_cache, + &chn->pub_key_hash); + GNUNET_assert (NULL != chan_msgs); + uint64_t frag_id; + + while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (fragq->fragments, NULL, + &frag_id)) + { + struct GNUNET_HashCode frag_id_hash; + hash_key_from_hll (&frag_id_hash, frag_id); + struct RecvCacheEntry *cache_entry + = GNUNET_CONTAINER_multihashmap_get (chan_msgs, &frag_id_hash); + if (cache_entry != NULL) + { + if (GNUNET_NO == drop) + { + client_send_mcast_msg (chn, cache_entry->mmsg, 0); + } + if (cache_entry->ref_count <= 1) + { + GNUNET_CONTAINER_multihashmap_remove (chan_msgs, &frag_id_hash, + cache_entry); + GNUNET_free (cache_entry->mmsg); + GNUNET_free (cache_entry); + } + else + { + cache_entry->ref_count--; + } + } +#if CACHE_AGING_IMPLEMENTED + else if (GNUNET_NO == drop) + { + /* TODO: fragment not in cache anymore, retrieve it from PSYCstore */ + } +#endif + + GNUNET_CONTAINER_heap_remove_root (fragq->fragments); + } + + if (MSG_FRAG_STATE_END <= fragq->state) + { + struct GNUNET_HashCode msg_id_hash; + hash_key_from_hll (&msg_id_hash, msg_id); + + GNUNET_CONTAINER_multihashmap_remove (chn->recv_frags, &msg_id_hash, fragq); + GNUNET_CONTAINER_heap_destroy (fragq->fragments); + GNUNET_free (fragq); + } + else + { + fragq->is_queued = GNUNET_NO; } +} + + +struct StateModifyClosure +{ + struct Channel *chn; + uint64_t msg_id; + struct GNUNET_HashCode msg_id_hash; +}; + + +void +store_recv_state_modify_result (void *cls, int64_t result, + const char *err_msg, uint16_t err_msg_size) +{ + struct StateModifyClosure *mcls = cls; + struct Channel *chn = mcls->chn; + uint64_t msg_id = mcls->msg_id; + + struct FragmentQueue * + fragq = GNUNET_CONTAINER_multihashmap_get (chn->recv_frags, &mcls->msg_id_hash); + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "%p GNUNET_PSYCSTORE_state_modify() returned %" PRId64 " (%.*s)\n", + chn, result, err_msg_size, err_msg); + + switch (result) + { + case GNUNET_OK: + case GNUNET_NO: + if (NULL != fragq) + fragq->state_is_modified = GNUNET_YES; + if (chn->max_state_message_id < msg_id) + chn->max_state_message_id = msg_id; + if (chn->max_message_id < msg_id) + chn->max_message_id = msg_id; + + if (NULL != fragq) + fragment_queue_run (chn, msg_id, fragq, MSG_FRAG_STATE_DROP == fragq->state); + GNUNET_CONTAINER_heap_remove_root (chn->recv_msgs); + message_queue_run (chn); + break; + default: + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "%p GNUNET_PSYCSTORE_state_modify() failed with error %" PRId64 " (%.*s)\n", + chn, result, err_msg_size, err_msg); + /** @todo FIXME: handle state_modify error */ + } +} + + +/** + * Run message queue. + * + * Send messages in queue to client in order after a message has arrived from + * multicast, according to the following: + * - A message is only sent if all of its modifiers arrived. + * - A stateful message is only sent if the previous stateful message + * has already been delivered to the client. + * + * @param chn Channel. + * + * @return Number of messages removed from queue and sent to client. + */ +static uint64_t +message_queue_run (struct Channel *chn) +{ + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "%p Running message queue.\n", chn); + uint64_t n = 0; + uint64_t msg_id; + + while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (chn->recv_msgs, NULL, + &msg_id)) + { + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "%p Processing message %" PRIu64 " in queue.\n", chn, msg_id); + struct GNUNET_HashCode msg_id_hash; + hash_key_from_hll (&msg_id_hash, msg_id); + + struct FragmentQueue * + fragq = GNUNET_CONTAINER_multihashmap_get (chn->recv_frags, &msg_id_hash); + + if (NULL == fragq || fragq->state <= MSG_FRAG_STATE_HEADER) + { + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "%p No fragq (%p) or header not complete.\n", + chn, fragq); + break; + } + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "%p Dropping unknown request of type %u and size %u.\n", - ch, type, size); + "%p Fragment queue entry: state: %u, state delta: " + "%" PRIu64 " - %" PRIu64 " ?= %" PRIu64 "\n", + chn, fragq->state, msg_id, fragq->state_delta, chn->max_state_message_id); + + if (MSG_FRAG_STATE_DATA <= fragq->state) + { + /* Check if there's a missing message before the current one */ + if (GNUNET_PSYC_STATE_NOT_MODIFIED == fragq->state_delta) + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p state NOT modified\n", chn); + + if (!(fragq->flags & GNUNET_PSYC_MESSAGE_ORDER_ANY) + && (chn->max_message_id != msg_id - 1 + && chn->max_message_id != msg_id)) + { + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "%p Out of order message. " + "(%" PRIu64 " != %" PRIu64 " - 1)\n", + chn, chn->max_message_id, msg_id); + break; + // FIXME: keep track of messages processed in this queue run, + // and only stop after reaching the end + } + } + else + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p state modified\n", chn); + if (GNUNET_YES != fragq->state_is_modified) + { + if (msg_id - fragq->state_delta != chn->max_state_message_id) + { + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "%p Out of order stateful message. " + "(%" PRIu64 " - %" PRIu64 " != %" PRIu64 ")\n", + chn, msg_id, fragq->state_delta, chn->max_state_message_id); + break; + // FIXME: keep track of messages processed in this queue run, + // and only stop after reaching the end + } + + struct StateModifyClosure *mcls = GNUNET_malloc (sizeof (*mcls)); + mcls->chn = chn; + mcls->msg_id = msg_id; + mcls->msg_id_hash = msg_id_hash; + + /* Apply modifiers to state in PSYCstore */ + GNUNET_PSYCSTORE_state_modify (store, &chn->pub_key, msg_id, + fragq->state_delta, + store_recv_state_modify_result, mcls); + break; // continue after asynchronous state modify result + } + } + chn->max_message_id = msg_id; + } + fragment_queue_run (chn, msg_id, fragq, MSG_FRAG_STATE_DROP == fragq->state); + GNUNET_CONTAINER_heap_remove_root (chn->recv_msgs); + n++; + } + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "%p Removed %" PRIu64 " messages from queue.\n", chn, n); + return n; +} + + +/** + * Drop message queue of a channel. + * + * Remove all messages in queue without sending it to clients. + * + * @param chn Channel. + * + * @return Number of messages removed from queue. + */ +static uint64_t +message_queue_drop (struct Channel *chn) +{ + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "%p Dropping message queue.\n", chn); + uint64_t n = 0; + uint64_t msg_id; + while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (chn->recv_msgs, NULL, + &msg_id)) + { + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "%p Dropping message %" PRIu64 " from queue.\n", chn, msg_id); + struct GNUNET_HashCode msg_id_hash; + hash_key_from_hll (&msg_id_hash, msg_id); + + struct FragmentQueue * + fragq = GNUNET_CONTAINER_multihashmap_get (chn->recv_frags, &msg_id_hash); + GNUNET_assert (NULL != fragq); + fragment_queue_run (chn, msg_id, fragq, GNUNET_YES); + GNUNET_CONTAINER_heap_remove_root (chn->recv_msgs); + n++; + } + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "%p Removed %" PRIu64 " messages from queue.\n", chn, n); + return n; +} + + +/** + * Received result of GNUNET_PSYCSTORE_fragment_store(). + */ +static void +store_recv_fragment_store_result (void *cls, int64_t result, + const char *err_msg, uint16_t err_msg_size) +{ + struct Channel *chn = cls; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "%p GNUNET_PSYCSTORE_fragment_store() returned %" PRId64 " (%.*s)\n", + chn, result, err_msg_size, err_msg); +} + + +/** + * Handle incoming message fragment from multicast. + * + * Store it using PSYCstore and send it to the clients of the channel in order. + */ +static void +mcast_recv_message (void *cls, const struct GNUNET_MULTICAST_MessageHeader *mmsg) +{ + struct Channel *chn = cls; + uint16_t size = ntohs (mmsg->header.size); + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "%p Received multicast message of size %u. " + "fragment_id=%" PRIu64 ", message_id=%" PRIu64 + ", fragment_offset=%" PRIu64 ", flags=%" PRIu64 "\n", + chn, size, + GNUNET_ntohll (mmsg->fragment_id), + GNUNET_ntohll (mmsg->message_id), + GNUNET_ntohll (mmsg->fragment_offset), + GNUNET_ntohll (mmsg->flags)); + + GNUNET_PSYCSTORE_fragment_store (store, &chn->pub_key, mmsg, 0, + &store_recv_fragment_store_result, chn); + + uint16_t first_ptype = 0, last_ptype = 0; + int check = GNUNET_PSYC_receive_check_parts (size - sizeof (*mmsg), + (const char *) &mmsg[1], + &first_ptype, &last_ptype); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "%p Message check result %d, first part type %u, last part type %u\n", + chn, check, first_ptype, last_ptype); + if (GNUNET_SYSERR == check) + { + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "%p Dropping incoming multicast message with invalid parts.\n", + chn); GNUNET_break_op (0); + return; + } + + fragment_queue_insert (chn, mmsg, first_ptype, last_ptype); + message_queue_run (chn); +} + + +/** + * Incoming request fragment from multicast for a master. + * + * @param cls Master. + * @param req The request. + */ +static void +mcast_recv_request (void *cls, + const struct GNUNET_MULTICAST_RequestHeader *req) +{ + struct Master *mst = cls; + uint16_t size = ntohs (req->header.size); + + char *str = GNUNET_CRYPTO_ecdsa_public_key_to_string (&req->member_pub_key); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "%p Received multicast request of size %u from %s.\n", + mst, size, str); + GNUNET_free (str); + + uint16_t first_ptype = 0, last_ptype = 0; + if (GNUNET_SYSERR + == GNUNET_PSYC_receive_check_parts (size - sizeof (*req), + (const char *) &req[1], + &first_ptype, &last_ptype)) + { + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "%p Dropping incoming multicast request with invalid parts.\n", + mst); + GNUNET_break_op (0); + return; } + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Message parts: first: type %u, last: type %u\n", + first_ptype, last_ptype); + + /* FIXME: in-order delivery */ + client_send_mcast_req (mst, req); } @@ -497,35 +1569,44 @@ request_cb (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *member_key, * Response from PSYCstore with the current counter values for a channel master. */ static void -master_counters_cb (void *cls, int result, uint64_t max_fragment_id, - uint64_t max_message_id, uint64_t max_group_generation, - uint64_t max_state_message_id) +store_recv_master_counters (void *cls, int result, uint64_t max_fragment_id, + uint64_t max_message_id, uint64_t max_group_generation, + uint64_t max_state_message_id) { struct Master *mst = cls; - struct Channel *ch = &mst->channel; - struct CountersResult *res = GNUNET_malloc (sizeof (*res)); - res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK); - res->header.size = htons (sizeof (*res)); - res->result_code = htonl (result); - res->max_message_id = GNUNET_htonll (max_message_id); + struct Channel *chn = &mst->chn; + chn->store_op = NULL; + + struct GNUNET_PSYC_CountersResultMessage res; + res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK); + res.header.size = htons (sizeof (res)); + res.result_code = htonl (result); + res.max_message_id = GNUNET_htonll (max_message_id); if (GNUNET_OK == result || GNUNET_NO == result) { mst->max_message_id = max_message_id; - mst->max_state_message_id = max_state_message_id; + chn->max_message_id = max_message_id; + chn->max_state_message_id = max_state_message_id; mst->max_group_generation = max_group_generation; mst->origin - = GNUNET_MULTICAST_origin_start (cfg, &mst->priv_key, - max_fragment_id + 1, - join_cb, membership_test_cb, - replay_fragment_cb, replay_message_cb, - request_cb, master_message_cb, ch); - ch->ready = GNUNET_YES; - } - GNUNET_SERVER_notification_context_add (nc, ch->client); - GNUNET_SERVER_notification_context_unicast (nc, ch->client, &res->header, - GNUNET_NO); - GNUNET_free (res); + = GNUNET_MULTICAST_origin_start (cfg, &mst->priv_key, max_fragment_id, + mcast_recv_join_request, + mcast_recv_replay_fragment, + mcast_recv_replay_message, + mcast_recv_request, + mcast_recv_message, chn); + chn->is_ready = GNUNET_YES; + } + else + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "%p GNUNET_PSYCSTORE_counters_get() " + "returned %d for channel %s.\n", + chn, result, GNUNET_h2s (&chn->pub_key_hash)); + } + + client_send_msg (chn, &res.header); } @@ -533,106 +1614,336 @@ master_counters_cb (void *cls, int result, uint64_t max_fragment_id, * Response from PSYCstore with the current counter values for a channel slave. */ void -slave_counters_cb (void *cls, int result, uint64_t max_fragment_id, - uint64_t max_message_id, uint64_t max_group_generation, - uint64_t max_state_message_id) +store_recv_slave_counters (void *cls, int result, uint64_t max_fragment_id, + uint64_t max_message_id, uint64_t max_group_generation, + uint64_t max_state_message_id) { struct Slave *slv = cls; - struct Channel *ch = &slv->channel; - struct CountersResult *res = GNUNET_malloc (sizeof (*res)); - res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK); - res->header.size = htons (sizeof (*res)); - res->result_code = htonl (result); - res->max_message_id = GNUNET_htonll (max_message_id); + struct Channel *chn = &slv->chn; + chn->store_op = NULL; + + struct GNUNET_PSYC_CountersResultMessage res; + res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK); + res.header.size = htons (sizeof (res)); + res.result_code = htonl (result); + res.max_message_id = GNUNET_htonll (max_message_id); if (GNUNET_OK == result || GNUNET_NO == result) { - slv->max_message_id = max_message_id; - slv->member - = GNUNET_MULTICAST_member_join (cfg, &slv->chan_key, &slv->slave_key, - &slv->origin, - slv->relay_count, slv->relays, - slv->join_req, join_cb, - membership_test_cb, - replay_fragment_cb, replay_message_cb, - slave_message_cb, ch); - ch->ready = GNUNET_YES; + chn->max_message_id = max_message_id; + chn->max_state_message_id = max_state_message_id; + slv->member + = GNUNET_MULTICAST_member_join (cfg, &chn->pub_key, &slv->priv_key, + &slv->origin, + slv->relay_count, slv->relays, + &slv->join_msg->header, + mcast_recv_join_request, + mcast_recv_join_decision, + mcast_recv_replay_fragment, + mcast_recv_replay_message, + mcast_recv_message, chn); + if (NULL != slv->join_msg) + { + GNUNET_free (slv->join_msg); + slv->join_msg = NULL; + } + } + else + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "%p GNUNET_PSYCSTORE_counters_get() " + "returned %d for channel %s.\n", + chn, result, GNUNET_h2s (&chn->pub_key_hash)); + } + + client_send_msg (chn, &res.header); +} + + +static void +channel_init (struct Channel *chn) +{ + chn->recv_msgs + = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN); + chn->recv_frags = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO); +} + + +/** + * Handle a connecting client starting a channel master. + */ +static void +client_recv_master_start (void *cls, struct GNUNET_SERVER_Client *client, + const struct GNUNET_MessageHeader *msg) +{ + const struct MasterStartRequest *req + = (const struct MasterStartRequest *) msg; + + struct GNUNET_CRYPTO_EddsaPublicKey pub_key; + struct GNUNET_HashCode pub_key_hash; + + GNUNET_CRYPTO_eddsa_key_get_public (&req->channel_key, &pub_key); + GNUNET_CRYPTO_hash (&pub_key, sizeof (pub_key), &pub_key_hash); + + struct Master * + mst = GNUNET_CONTAINER_multihashmap_get (masters, &pub_key_hash); + struct Channel *chn; + + if (NULL == mst) + { + mst = GNUNET_new (struct Master); + mst->policy = ntohl (req->policy); + mst->priv_key = req->channel_key; + mst->join_reqs = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO); + + chn = &mst->chn; + chn->is_master = GNUNET_YES; + chn->pub_key = pub_key; + chn->pub_key_hash = pub_key_hash; + channel_init (chn); + + GNUNET_CONTAINER_multihashmap_put (masters, &chn->pub_key_hash, chn, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); + chn->store_op = GNUNET_PSYCSTORE_counters_get (store, &chn->pub_key, + store_recv_master_counters, mst); + } + else + { + chn = &mst->chn; + + struct GNUNET_PSYC_CountersResultMessage res; + res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK); + res.header.size = htons (sizeof (res)); + res.result_code = htonl (GNUNET_OK); + res.max_message_id = GNUNET_htonll (mst->max_message_id); + + GNUNET_SERVER_notification_context_add (nc, client); + GNUNET_SERVER_notification_context_unicast (nc, client, &res.header, + GNUNET_NO); + } + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "%p Client connected as master to channel %s.\n", + mst, GNUNET_h2s (&chn->pub_key_hash)); + + struct Client *cli = GNUNET_new (struct Client); + cli->client = client; + GNUNET_CONTAINER_DLL_insert (chn->clients_head, chn->clients_tail, cli); + + GNUNET_SERVER_client_set_user_context (client, chn); + GNUNET_SERVER_receive_done (client, GNUNET_OK); +} + + +/** + * Handle a connecting client joining as a channel slave. + */ +static void +client_recv_slave_join (void *cls, struct GNUNET_SERVER_Client *client, + const struct GNUNET_MessageHeader *msg) +{ + const struct SlaveJoinRequest *req + = (const struct SlaveJoinRequest *) msg; + uint16_t req_size = ntohs (req->header.size); + + struct GNUNET_CRYPTO_EcdsaPublicKey slv_pub_key; + struct GNUNET_HashCode pub_key_hash, slv_pub_hash; + + GNUNET_CRYPTO_ecdsa_key_get_public (&req->slave_key, &slv_pub_key); + GNUNET_CRYPTO_hash (&slv_pub_key, sizeof (slv_pub_key), &slv_pub_hash); + GNUNET_CRYPTO_hash (&req->channel_pub_key, sizeof (req->channel_pub_key), &pub_key_hash); + + struct GNUNET_CONTAINER_MultiHashMap * + chn_slv = GNUNET_CONTAINER_multihashmap_get (channel_slaves, &pub_key_hash); + struct Slave *slv = NULL; + struct Channel *chn; + + if (NULL != chn_slv) + { + slv = GNUNET_CONTAINER_multihashmap_get (chn_slv, &slv_pub_hash); + } + if (NULL == slv) + { + slv = GNUNET_new (struct Slave); + slv->priv_key = req->slave_key; + slv->pub_key = slv_pub_key; + slv->pub_key_hash = slv_pub_hash; + slv->origin = req->origin; + slv->relay_count = ntohl (req->relay_count); + slv->join_flags = ntohl (req->flags); + + const struct GNUNET_PeerIdentity * + relays = (const struct GNUNET_PeerIdentity *) &req[1]; + uint16_t relay_size = slv->relay_count * sizeof (*relays); + uint16_t join_msg_size = 0; + + if (sizeof (*req) + relay_size + sizeof (struct GNUNET_MessageHeader) + <= req_size) + { + struct GNUNET_PSYC_Message * + join_msg = (struct GNUNET_PSYC_Message *) (((char *) &req[1]) + relay_size); + join_msg_size = ntohs (join_msg->header.size); + slv->join_msg = GNUNET_malloc (join_msg_size); + memcpy (slv->join_msg, join_msg, join_msg_size); + } + if (sizeof (*req) + relay_size + join_msg_size != req_size) + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "%u + %u + %u != %u\n", + sizeof (*req), relay_size, join_msg_size, req_size); + GNUNET_break (0); + GNUNET_SERVER_client_disconnect (client); + GNUNET_free (slv); + return; + } + if (0 < slv->relay_count) + { + slv->relays = GNUNET_malloc (relay_size); + memcpy (slv->relays, &req[1], relay_size); + } + + chn = &slv->chn; + chn->is_master = GNUNET_NO; + chn->pub_key = req->channel_pub_key; + chn->pub_key_hash = pub_key_hash; + channel_init (chn); + + if (NULL == chn_slv) + { + chn_slv = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES); + GNUNET_CONTAINER_multihashmap_put (channel_slaves, &chn->pub_key_hash, chn_slv, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST); + } + GNUNET_CONTAINER_multihashmap_put (chn_slv, &slv->pub_key_hash, chn, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST); + GNUNET_CONTAINER_multihashmap_put (slaves, &chn->pub_key_hash, chn, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); + chn->store_op = GNUNET_PSYCSTORE_counters_get (store, &chn->pub_key, + &store_recv_slave_counters, slv); + } + else + { + chn = &slv->chn; + + struct GNUNET_PSYC_CountersResultMessage res; + res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK); + res.header.size = htons (sizeof (res)); + res.result_code = htonl (GNUNET_OK); + res.max_message_id = GNUNET_htonll (chn->max_message_id); + + GNUNET_SERVER_notification_context_add (nc, client); + GNUNET_SERVER_notification_context_unicast (nc, client, &res.header, + GNUNET_NO); + + if (GNUNET_PSYC_SLAVE_JOIN_LOCAL & slv->join_flags) + { + mcast_recv_join_decision (slv, GNUNET_YES, + NULL, 0, NULL, NULL); + } + else if (NULL == slv->member) + { + slv->member + = GNUNET_MULTICAST_member_join (cfg, &chn->pub_key, &slv->priv_key, + &slv->origin, + slv->relay_count, slv->relays, + &slv->join_msg->header, + &mcast_recv_join_request, + &mcast_recv_join_decision, + &mcast_recv_replay_fragment, + &mcast_recv_replay_message, + &mcast_recv_message, chn); + if (NULL != slv->join_msg) + { + GNUNET_free (slv->join_msg); + slv->join_msg = NULL; + } + } + else if (NULL != slv->join_dcsn) + { + GNUNET_SERVER_notification_context_add (nc, client); + GNUNET_SERVER_notification_context_unicast (nc, client, + &slv->join_dcsn->header, + GNUNET_NO); + } } - GNUNET_SERVER_notification_context_add (nc, ch->client); - GNUNET_SERVER_notification_context_unicast (nc, ch->client, &res->header, - GNUNET_NO); - GNUNET_free (res); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "%p Client connected as slave to channel %s.\n", + slv, GNUNET_h2s (&chn->pub_key_hash)); + + struct Client *cli = GNUNET_new (struct Client); + cli->client = client; + GNUNET_CONTAINER_DLL_insert (chn->clients_head, chn->clients_tail, cli); + + GNUNET_SERVER_client_set_user_context (client, chn); + GNUNET_SERVER_receive_done (client, GNUNET_OK); } -/** - * Handle a connecting client starting a channel master. - */ -static void -handle_master_start (void *cls, struct GNUNET_SERVER_Client *client, - const struct GNUNET_MessageHeader *msg) +struct JoinDecisionClosure { - const struct MasterStartRequest *req - = (const struct MasterStartRequest *) msg; - struct Master *mst = GNUNET_new (struct Master); - mst->channel.client = client; - mst->channel.is_master = GNUNET_YES; - mst->policy = ntohl (req->policy); - mst->priv_key = req->channel_key; - GNUNET_CRYPTO_eddsa_key_get_public (&mst->priv_key, &mst->pub_key); - GNUNET_CRYPTO_hash (&mst->pub_key, sizeof (mst->pub_key), &mst->pub_key_hash); - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "%p Master connected to channel %s.\n", - mst, GNUNET_h2s (&mst->pub_key_hash)); + int32_t is_admitted; + struct GNUNET_MessageHeader *msg; +}; - GNUNET_PSYCSTORE_counters_get (store, &mst->pub_key, - master_counters_cb, mst); - GNUNET_SERVER_client_set_user_context (client, &mst->channel); - GNUNET_CONTAINER_multihashmap_put (clients, &mst->pub_key_hash, mst, - GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); - GNUNET_SERVER_receive_done (client, GNUNET_OK); +/** + * Iterator callback for sending join decisions to multicast. + */ +static int +mcast_send_join_decision (void *cls, const struct GNUNET_HashCode *pub_key_hash, + void *value) +{ + struct JoinDecisionClosure *jcls = cls; + struct GNUNET_MULTICAST_JoinHandle *jh = value; + // FIXME: add relays + GNUNET_MULTICAST_join_decision (jh, jcls->is_admitted, 0, NULL, jcls->msg); + return GNUNET_YES; } /** - * Handle a connecting client joining as a channel slave. + * Join decision from client. */ static void -handle_slave_join (void *cls, struct GNUNET_SERVER_Client *client, - const struct GNUNET_MessageHeader *msg) +client_recv_join_decision (void *cls, struct GNUNET_SERVER_Client *client, + const struct GNUNET_MessageHeader *msg) { - const struct SlaveJoinRequest *req - = (const struct SlaveJoinRequest *) msg; - struct Slave *slv = GNUNET_new (struct Slave); - slv->channel.client = client; - slv->channel.is_master = GNUNET_NO; - slv->slave_key = req->slave_key; - slv->chan_key = req->channel_key; - GNUNET_CRYPTO_hash (&slv->chan_key, sizeof (slv->chan_key), - &slv->chan_key_hash); - slv->origin = req->origin; - slv->relay_count = ntohl (req->relay_count); - if (0 < slv->relay_count) - { - const struct GNUNET_PeerIdentity *relays - = (const struct GNUNET_PeerIdentity *) &req[1]; - slv->relays - = GNUNET_malloc (slv->relay_count * sizeof (struct GNUNET_PeerIdentity)); - uint32_t i; - for (i = 0; i < slv->relay_count; i++) - memcpy (&slv->relays[i], &relays[i], sizeof (*relays)); + const struct GNUNET_PSYC_JoinDecisionMessage *dcsn + = (const struct GNUNET_PSYC_JoinDecisionMessage *) msg; + struct Channel *chn; + struct Master *mst; + struct JoinDecisionClosure jcls; + + chn = GNUNET_SERVER_client_get_user_context (client, struct Channel); + if (NULL == chn) + { + GNUNET_break (0); + GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); + return; } + GNUNET_assert (GNUNET_YES == chn->is_master); + mst = (struct Master *) chn; + jcls.is_admitted = ntohl (dcsn->is_admitted); + jcls.msg + = (sizeof (*dcsn) + sizeof (*jcls.msg) <= ntohs (msg->size)) + ? (struct GNUNET_MessageHeader *) &dcsn[1] + : NULL; + + struct GNUNET_HashCode slave_pub_hash; + GNUNET_CRYPTO_hash (&dcsn->slave_pub_key, sizeof (dcsn->slave_pub_key), + &slave_pub_hash); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "%p Slave connected to channel %s.\n", - slv, GNUNET_h2s (&slv->chan_key_hash)); - - GNUNET_PSYCSTORE_counters_get (store, &slv->chan_key, - slave_counters_cb, slv); + "%p Got join decision (%d) from client for channel %s..\n", + mst, jcls.is_admitted, GNUNET_h2s (&chn->pub_key_hash)); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "%p ..and slave %s.\n", + mst, GNUNET_h2s (&slave_pub_hash)); - GNUNET_SERVER_client_set_user_context (client, &slv->channel); + GNUNET_CONTAINER_multihashmap_get_multiple (mst->join_reqs, &slave_pub_hash, + &mcast_send_join_decision, &jcls); + GNUNET_CONTAINER_multihashmap_remove_all (mst->join_reqs, &slave_pub_hash); GNUNET_SERVER_receive_done (client, GNUNET_OK); } @@ -642,18 +1953,18 @@ handle_slave_join (void *cls, struct GNUNET_SERVER_Client *client, * * Sent after a message fragment has been passed on to multicast. * - * @param ch The channel struct for the client. + * @param chn The channel struct for the client. */ static void -send_message_ack (struct Channel *ch) +send_message_ack (struct Channel *chn, struct GNUNET_SERVER_Client *client) { struct GNUNET_MessageHeader res; res.size = htons (sizeof (res)); res.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK); - GNUNET_SERVER_notification_context_add (nc, ch->client); - GNUNET_SERVER_notification_context_unicast (nc, ch->client, &res, - GNUNET_NO); + /* FIXME */ + GNUNET_SERVER_notification_context_add (nc, client); + GNUNET_SERVER_notification_context_unicast (nc, client, &res, GNUNET_NO); } @@ -663,42 +1974,47 @@ send_message_ack (struct Channel *ch) static int transmit_notify (void *cls, size_t *data_size, void *data) { - struct Channel *ch = cls; - struct TransmitMessage *tmit_msg = ch->tmit_head; + struct Channel *chn = cls; + struct TransmitMessage *tmit_msg = chn->tmit_head; if (NULL == tmit_msg || *data_size < tmit_msg->size) { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "%p transmit_notify: nothing to send.\n", ch); + "%p transmit_notify: nothing to send.\n", chn); + if (NULL != tmit_msg && *data_size < tmit_msg->size) + GNUNET_break (0); *data_size = 0; return GNUNET_NO; } GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "%p transmit_notify: sending %u bytes.\n", ch, tmit_msg->size); + "%p transmit_notify: sending %u bytes.\n", chn, tmit_msg->size); *data_size = tmit_msg->size; - memcpy (data, tmit_msg->buf, *data_size); + memcpy (data, &tmit_msg[1], *data_size); - GNUNET_CONTAINER_DLL_remove (ch->tmit_head, ch->tmit_tail, tmit_msg); - GNUNET_free (tmit_msg); + int ret + = (tmit_msg->last_ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END) + ? GNUNET_NO + : GNUNET_YES; + + /* FIXME: handle disconnecting clients */ + if (NULL != tmit_msg->client) + send_message_ack (chn, tmit_msg->client); - int ret = (MSG_STATE_END < ch->tmit_state) ? GNUNET_NO : GNUNET_YES; - send_message_ack (ch); + GNUNET_CONTAINER_DLL_remove (chn->tmit_head, chn->tmit_tail, tmit_msg); + GNUNET_free (tmit_msg); - if (0 == ch->tmit_task) + if (NULL != chn->tmit_head) { - if (NULL != ch->tmit_head) - { - transmit_message (ch, GNUNET_NO); - } - else if (ch->disconnected) - { - /* FIXME: handle partial message (when still in_transmit) */ - client_cleanup (ch); - } + GNUNET_SCHEDULER_add_now (schedule_transmit_message, chn); + } + else if (GNUNET_YES == chn->is_disconnected + && tmit_msg->last_ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END) + { + /* FIXME: handle partial message (when still in_transmit) */ + return GNUNET_SYSERR; } - return ret; } @@ -741,16 +2057,16 @@ slave_transmit_notify (void *cls, size_t *data_size, void *data) * Transmit a message from a channel master to the multicast group. */ static void -master_transmit_message (struct Master *mst, uint8_t inc_msg_id) +master_transmit_message (struct Master *mst) { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p master_transmit_message()\n", mst); - mst->channel.tmit_task = 0; + struct Channel *chn = &mst->chn; + struct TransmitMessage *tmit_msg = chn->tmit_head; + if (NULL == tmit_msg) + return; if (NULL == mst->tmit_handle) { - if (GNUNET_YES == inc_msg_id) - mst->max_message_id++; mst->tmit_handle - = GNUNET_MULTICAST_origin_to_all (mst->origin, mst->max_message_id, + = GNUNET_MULTICAST_origin_to_all (mst->origin, tmit_msg->id, mst->max_group_generation, master_transmit_notify, mst); } @@ -765,15 +2081,14 @@ master_transmit_message (struct Master *mst, uint8_t inc_msg_id) * Transmit a message from a channel slave to the multicast group. */ static void -slave_transmit_message (struct Slave *slv, uint8_t inc_msg_id) +slave_transmit_message (struct Slave *slv) { - slv->channel.tmit_task = 0; + if (NULL == slv->chn.tmit_head) + return; if (NULL == slv->tmit_handle) { - if (GNUNET_YES == inc_msg_id) - slv->max_message_id++; slv->tmit_handle - = GNUNET_MULTICAST_member_to_origin (slv->member, slv->max_request_id, + = GNUNET_MULTICAST_member_to_origin (slv->member, slv->chn.tmit_head->id, slave_transmit_notify, slv); } else @@ -783,106 +2098,529 @@ slave_transmit_message (struct Slave *slv, uint8_t inc_msg_id) } -static inline void -transmit_message (struct Channel *ch, uint8_t inc_msg_id) +static void +transmit_message (struct Channel *chn) { - ch->is_master - ? master_transmit_message ((struct Master *) ch, inc_msg_id) - : slave_transmit_message ((struct Slave *) ch, inc_msg_id); + chn->is_master + ? master_transmit_message ((struct Master *) chn) + : slave_transmit_message ((struct Slave *) chn); } +/** + * Queue a message from a channel master for sending to the multicast group. + */ static void -transmit_error (struct Channel *ch) +master_queue_message (struct Master *mst, struct TransmitMessage *tmit_msg) { - struct GNUNET_MessageHeader *msg; - struct TransmitMessage *tmit_msg = GNUNET_malloc (sizeof (*tmit_msg) - + sizeof (*msg)); - msg = (struct GNUNET_MessageHeader *) &tmit_msg[1]; - msg->size = ntohs (sizeof (*msg)); - msg->type = ntohs (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL); - - tmit_msg->buf = (char *) &tmit_msg[1]; - tmit_msg->size = sizeof (*msg); - tmit_msg->state = ch->tmit_state; - GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, tmit_msg); - transmit_message (ch, GNUNET_NO); + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "%p master_queue_message()\n", mst); + + if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == tmit_msg->first_ptype) + { + tmit_msg->id = ++mst->max_message_id; + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "%p master_queue_message: message_id=%" PRIu64 "\n", + mst, tmit_msg->id); + struct GNUNET_PSYC_MessageMethod *pmeth + = (struct GNUNET_PSYC_MessageMethod *) &tmit_msg[1]; + + if (pmeth->flags & GNUNET_PSYC_MASTER_TRANSMIT_STATE_RESET) + { + pmeth->state_delta = GNUNET_htonll (GNUNET_PSYC_STATE_RESET); + } + else if (pmeth->flags & GNUNET_PSYC_MASTER_TRANSMIT_STATE_MODIFY) + { + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "%p master_queue_message: state_delta=%" PRIu64 "\n", + mst, tmit_msg->id - mst->max_state_message_id); + pmeth->state_delta = GNUNET_htonll (tmit_msg->id + - mst->max_state_message_id); + mst->max_state_message_id = tmit_msg->id; + } + else + { + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "%p master_queue_message: state not modified\n", mst); + pmeth->state_delta = GNUNET_htonll (GNUNET_PSYC_STATE_NOT_MODIFIED); + } + + if (pmeth->flags & GNUNET_PSYC_MASTER_TRANSMIT_STATE_HASH) + { + /// @todo add state_hash to PSYC header + } + } +} + + +/** + * Queue a message from a channel slave for sending to the multicast group. + */ +static void +slave_queue_message (struct Slave *slv, struct TransmitMessage *tmit_msg) +{ + if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == tmit_msg->first_ptype) + { + struct GNUNET_PSYC_MessageMethod *pmeth + = (struct GNUNET_PSYC_MessageMethod *) &tmit_msg[1]; + pmeth->state_delta = GNUNET_htonll (GNUNET_PSYC_STATE_NOT_MODIFIED); + tmit_msg->id = ++slv->max_request_id; + } +} + + +/** + * Queue PSYC message parts for sending to multicast. + * + * @param chn + * Channel to send to. + * @param client + * Client the message originates from. + * @param data_size + * Size of @a data. + * @param data + * Concatenated message parts. + * @param first_ptype + * First message part type in @a data. + * @param last_ptype + * Last message part type in @a data. + */ +static struct TransmitMessage * +queue_message (struct Channel *chn, + struct GNUNET_SERVER_Client *client, + size_t data_size, + const void *data, + uint16_t first_ptype, uint16_t last_ptype) +{ + struct TransmitMessage * + tmit_msg = GNUNET_malloc (sizeof (*tmit_msg) + data_size); + memcpy (&tmit_msg[1], data, data_size); + tmit_msg->client = client; + tmit_msg->size = data_size; + tmit_msg->first_ptype = first_ptype; + tmit_msg->last_ptype = last_ptype; + + /* FIXME: separate queue per message ID */ + + GNUNET_CONTAINER_DLL_insert_tail (chn->tmit_head, chn->tmit_tail, tmit_msg); + + chn->is_master + ? master_queue_message ((struct Master *) chn, tmit_msg) + : slave_queue_message ((struct Slave *) chn, tmit_msg); + return tmit_msg; +} + + +/** + * Cancel transmission of current message. + * + * @param chn Channel to send to. + * @param client Client the message originates from. + */ +static void +transmit_cancel (struct Channel *chn, struct GNUNET_SERVER_Client *client) +{ + uint16_t type = GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL; + + struct GNUNET_MessageHeader msg; + msg.size = htons (sizeof (msg)); + msg.type = htons (type); + + queue_message (chn, client, sizeof (msg), &msg, type, type); + transmit_message (chn); /* FIXME: cleanup */ } /** - * Incoming message from a client. + * Incoming message from a master or slave client. */ static void -handle_psyc_message (void *cls, struct GNUNET_SERVER_Client *client, - const struct GNUNET_MessageHeader *msg) +client_recv_psyc_message (void *cls, struct GNUNET_SERVER_Client *client, + const struct GNUNET_MessageHeader *msg) { - struct Channel *ch - = GNUNET_SERVER_client_get_user_context (client, struct Channel); - GNUNET_assert (NULL != ch); + struct Channel * + chn = GNUNET_SERVER_client_get_user_context (client, struct Channel); + GNUNET_assert (NULL != chn); + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "%p Received message from client.\n", chn); + GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG, msg); - if (GNUNET_YES != ch->ready) + if (GNUNET_YES != chn->is_ready) { GNUNET_log (GNUNET_ERROR_TYPE_WARNING, - "%p Ignoring message from client, channel is not ready yet.\n", - ch); + "%p Channel is not ready yet, disconnecting client.\n", chn); + GNUNET_break (0); GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); return; } - uint8_t inc_msg_id = GNUNET_NO; uint16_t size = ntohs (msg->size); - uint16_t psize = 0, ptype = 0, pos = 0; - if (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < size - sizeof (*msg)) { - GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "%p Message payload too large\n", ch); + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "%p Message payload too large: %u < %u.\n", + chn, GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD, size - sizeof (*msg)); + GNUNET_break (0); + transmit_cancel (chn, client); + GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); + return; + } + + uint16_t first_ptype = 0, last_ptype = 0; + if (GNUNET_SYSERR + == GNUNET_PSYC_receive_check_parts (size - sizeof (*msg), + (const char *) &msg[1], + &first_ptype, &last_ptype)) + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "%p Received invalid message part from client.\n", chn); GNUNET_break (0); - transmit_error (ch); + transmit_cancel (chn, client); GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); return; } + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "%p Received message with first part type %u and last part type %u.\n", + chn, first_ptype, last_ptype); + + queue_message (chn, client, size - sizeof (*msg), &msg[1], + first_ptype, last_ptype); + transmit_message (chn); + /* FIXME: send a few ACKs even before transmit_notify is called */ + + GNUNET_SERVER_receive_done (client, GNUNET_OK); +}; + +/** + * Received result of GNUNET_PSYCSTORE_membership_store() + */ +static void +store_recv_membership_store_result (void *cls, int64_t result, + const char *err_msg, uint16_t err_msg_size) +{ + struct Operation *op = cls; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "%p Received message from client.\n", ch); - GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG, msg); + "%p GNUNET_PSYCSTORE_membership_store() returned %" PRId64 " (%.s)\n", + op->chn, result, err_msg_size, err_msg); + + if (NULL != op->client) + client_send_result (op->client, op->op_id, result, err_msg, err_msg_size); + op_remove (op); +} - for (pos = 0; sizeof (*msg) + pos < size; pos += psize) + +/** + * Client requests to add/remove a slave in the membership database. + */ +static void +client_recv_membership_store (void *cls, struct GNUNET_SERVER_Client *client, + const struct GNUNET_MessageHeader *msg) +{ + struct Channel * + chn = GNUNET_SERVER_client_get_user_context (client, struct Channel); + GNUNET_assert (NULL != chn); + + const struct ChannelMembershipStoreRequest * + req = (const struct ChannelMembershipStoreRequest *) msg; + + struct Operation *op = op_add (chn, client, req->op_id, 0); + + uint64_t announced_at = GNUNET_ntohll (req->announced_at); + uint64_t effective_since = GNUNET_ntohll (req->effective_since); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "%p Received membership store request from client.\n", chn); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "%p did_join: %u, announced_at: %" PRIu64 ", effective_since: %" PRIu64 "\n", + chn, req->did_join, announced_at, effective_since); + + GNUNET_PSYCSTORE_membership_store (store, &chn->pub_key, &req->slave_pub_key, + req->did_join, announced_at, effective_since, + 0, /* FIXME: group_generation */ + &store_recv_membership_store_result, op); + GNUNET_SERVER_receive_done (client, GNUNET_OK); +} + + +/** + * Received a fragment for GNUNET_PSYCSTORE_fragment_get(), + * in response to a history request from a client. + */ +static int +store_recv_fragment_history (void *cls, + struct GNUNET_MULTICAST_MessageHeader *mmsg, + enum GNUNET_PSYCSTORE_MessageFlags flags) +{ + struct Operation *op = cls; + if (NULL == op->client) + { /* Requesting client already disconnected. */ + return GNUNET_NO; + } + struct Channel *chn = op->chn; + + struct GNUNET_PSYC_MessageHeader *pmsg; + uint16_t msize = ntohs (mmsg->header.size); + uint16_t psize = sizeof (*pmsg) + msize - sizeof (*mmsg); + + struct GNUNET_OperationResultMessage * + res = GNUNET_malloc (sizeof (*res) + psize); + res->header.size = htons (sizeof (*res) + psize); + res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_HISTORY_RESULT); + res->op_id = op->op_id; + res->result_code = GNUNET_htonll (GNUNET_OK); + + pmsg = (struct GNUNET_PSYC_MessageHeader *) &res[1]; + GNUNET_PSYC_message_header_init (pmsg, mmsg, flags | GNUNET_PSYC_MESSAGE_HISTORIC); + memcpy (&res[1], pmsg, psize); + + /** @todo FIXME: send only to requesting client */ + client_send_msg (chn, &res->header); + return GNUNET_YES; +} + + +/** + * Received the result of GNUNET_PSYCSTORE_fragment_get(), + * in response to a history request from a client. + */ +static void +store_recv_fragment_history_result (void *cls, int64_t result, + const char *err_msg, uint16_t err_msg_size) +{ + struct Operation *op = cls; + if (NULL == op->client) + { /* Requesting client already disconnected. */ + return; + } + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "%p History replay #%" PRIu64 ": " + "PSYCSTORE returned %" PRId64 " (%.*s)\n", + op->chn, GNUNET_ntohll (op->op_id), result, err_msg_size, err_msg); + + if (op->flags & GNUNET_PSYC_HISTORY_REPLAY_REMOTE) { - const struct GNUNET_MessageHeader *pmsg - = (const struct GNUNET_MessageHeader *) ((char *) &msg[1] + pos); - psize = ntohs (pmsg->size); - ptype = ntohs (pmsg->type); - if (psize < sizeof (*pmsg) || sizeof (*msg) + pos + psize > size) - { - GNUNET_log (GNUNET_ERROR_TYPE_ERROR, - "%p Received invalid message part of type %u and size %u " - "from client.\n", ch, ptype, psize); - GNUNET_break (0); - transmit_error (ch); - GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); - return; - } - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "%p Received message part from client.\n", ch); - GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG, pmsg); + /** @todo Multicast replay request for messages not found locally. */ + } + + client_send_result (op->client, op->op_id, result, err_msg, err_msg_size); + op_remove (op); +} + + +/** + * Client requests channel history. + */ +static void +client_recv_history_replay (void *cls, struct GNUNET_SERVER_Client *client, + const struct GNUNET_MessageHeader *msg) +{ + struct Channel * + chn = GNUNET_SERVER_client_get_user_context (client, struct Channel); + GNUNET_assert (NULL != chn); + + const struct GNUNET_PSYC_HistoryRequestMessage * + req = (const struct GNUNET_PSYC_HistoryRequestMessage *) msg; + uint16_t size = ntohs (msg->size); + const char *method_prefix = (const char *) &req[1]; + + if (size < sizeof (*req) + 1 + || '\0' != method_prefix[size - sizeof (*req) - 1]) + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "%p History replay #%" PRIu64 ": " + "invalid method prefix. size: %u < %u?\n", + chn, GNUNET_ntohll (req->op_id), size, sizeof (*req) + 1); + GNUNET_break (0); + GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); + return; + } + + struct Operation *op = op_add (chn, client, req->op_id, ntohl (req->flags)); + + if (0 == req->message_limit) + GNUNET_PSYCSTORE_message_get (store, &chn->pub_key, NULL, + GNUNET_ntohll (req->start_message_id), + GNUNET_ntohll (req->end_message_id), + 0, method_prefix, + &store_recv_fragment_history, + &store_recv_fragment_history_result, op); + else + GNUNET_PSYCSTORE_message_get_latest (store, &chn->pub_key, NULL, + GNUNET_ntohll (req->message_limit), + method_prefix, + &store_recv_fragment_history, + &store_recv_fragment_history_result, + op); + + GNUNET_SERVER_receive_done (client, GNUNET_OK); +} + - if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == ptype) - inc_msg_id = GNUNET_YES; +/** + * Received state var from PSYCstore, send it to client. + */ +static int +store_recv_state_var (void *cls, const char *name, + const void *value, uint32_t value_size) +{ + struct Operation *op = cls; + struct GNUNET_OperationResultMessage *res; + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "%p state_get #%" PRIu64 " - received var from PSYCstore: %s\n", + op->chn, GNUNET_ntohll (op->op_id), name); + + if (NULL != name) /* First part */ + { + uint16_t name_size = strnlen (name, GNUNET_PSYC_MODIFIER_MAX_PAYLOAD) + 1; + struct GNUNET_PSYC_MessageModifier *mod; + res = GNUNET_malloc (sizeof (*res) + sizeof (*mod) + name_size + value_size); + res->header.size = htons (sizeof (*res) + sizeof (*mod) + name_size + value_size); + res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_STATE_RESULT); + res->op_id = op->op_id; + + mod = (struct GNUNET_PSYC_MessageModifier *) &res[1]; + mod->header.size = htons (sizeof (*mod) + name_size + value_size); + mod->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER); + mod->name_size = htons (name_size); + mod->value_size = htonl (value_size); + mod->oper = htons (GNUNET_PSYC_OP_ASSIGN); + memcpy (&mod[1], name, name_size); + memcpy (((char *) &mod[1]) + name_size, value, value_size); + } + else /* Continuation */ + { + struct GNUNET_MessageHeader *mod; + res = GNUNET_malloc (sizeof (*res) + sizeof (*mod) + value_size); + res->header.size = htons (sizeof (*res) + sizeof (*mod) + value_size); + res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_STATE_RESULT); + res->op_id = op->op_id; + + mod = (struct GNUNET_MessageHeader *) &res[1]; + mod->size = htons (sizeof (*mod) + value_size); + mod->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT); + memcpy (&mod[1], value, value_size); + } + + // FIXME: client might have been disconnected + GNUNET_SERVER_notification_context_add (nc, op->client); + GNUNET_SERVER_notification_context_unicast (nc, op->client, &res->header, + GNUNET_NO); + return GNUNET_YES; +} + + +/** + * Received result of GNUNET_PSYCSTORE_state_get() + * or GNUNET_PSYCSTORE_state_get_prefix() + */ +static void +store_recv_state_result (void *cls, int64_t result, + const char *err_msg, uint16_t err_msg_size) +{ + struct Operation *op = cls; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "%p state_get #%" PRIu64 ": " + "PSYCSTORE returned %" PRId64 " (%.*s)\n", + op->chn, GNUNET_ntohll (op->op_id), result, err_msg_size, err_msg); + + // FIXME: client might have been disconnected + client_send_result (op->client, op->op_id, result, err_msg, err_msg_size); + op_remove (op); +} + + +/** + * Client requests best matching state variable from PSYCstore. + */ +static void +client_recv_state_get (void *cls, struct GNUNET_SERVER_Client *client, + const struct GNUNET_MessageHeader *msg) +{ + struct Channel * + chn = GNUNET_SERVER_client_get_user_context (client, struct Channel); + GNUNET_assert (NULL != chn); + + const struct StateRequest * + req = (const struct StateRequest *) msg; + + uint16_t name_size = ntohs (req->header.size) - sizeof (*req); + const char *name = (const char *) &req[1]; + if (0 == name_size || '\0' != name[name_size - 1]) + { + GNUNET_break (0); + GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); + return; } - size -= sizeof (*msg); - struct TransmitMessage *tmit_msg = GNUNET_malloc (sizeof (*tmit_msg) + size); - tmit_msg->buf = (char *) &tmit_msg[1]; - memcpy (tmit_msg->buf, &msg[1], size); - tmit_msg->size = size; - tmit_msg->state = ch->tmit_state; - GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, tmit_msg); - transmit_message (ch, inc_msg_id); + struct Operation *op = op_add (chn, client, req->op_id, 0); + GNUNET_PSYCSTORE_state_get (store, &chn->pub_key, name, + &store_recv_state_var, + &store_recv_state_result, op); + GNUNET_SERVER_receive_done (client, GNUNET_OK); +} + + +/** + * Client requests state variables with a given prefix from PSYCstore. + */ +static void +client_recv_state_get_prefix (void *cls, struct GNUNET_SERVER_Client *client, + const struct GNUNET_MessageHeader *msg) +{ + struct Channel * + chn = GNUNET_SERVER_client_get_user_context (client, struct Channel); + GNUNET_assert (NULL != chn); + + const struct StateRequest * + req = (const struct StateRequest *) msg; + + uint16_t name_size = ntohs (req->header.size) - sizeof (*req); + const char *name = (const char *) &req[1]; + if (0 == name_size || '\0' != name[name_size - 1]) + { + GNUNET_break (0); + GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); + return; + } + struct Operation *op = op_add (chn, client, req->op_id, 0); + GNUNET_PSYCSTORE_state_get_prefix (store, &chn->pub_key, name, + &store_recv_state_var, + &store_recv_state_result, op); GNUNET_SERVER_receive_done (client, GNUNET_OK); +} + + +static const struct GNUNET_SERVER_MessageHandler server_handlers[] = { + { &client_recv_master_start, NULL, + GNUNET_MESSAGE_TYPE_PSYC_MASTER_START, 0 }, + + { &client_recv_slave_join, NULL, + GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN, 0 }, + + { &client_recv_join_decision, NULL, + GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION, 0 }, + + { &client_recv_psyc_message, NULL, + GNUNET_MESSAGE_TYPE_PSYC_MESSAGE, 0 }, + + { &client_recv_membership_store, NULL, + GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_MEMBERSHIP_STORE, 0 }, + + { &client_recv_history_replay, NULL, + GNUNET_MESSAGE_TYPE_PSYC_HISTORY_REPLAY, 0 }, + + { &client_recv_state_get, NULL, + GNUNET_MESSAGE_TYPE_PSYC_STATE_GET, 0 }, + + { &client_recv_state_get_prefix, NULL, + GNUNET_MESSAGE_TYPE_PSYC_STATE_GET_PREFIX, 0 }, + + { NULL, NULL, 0, 0 } }; @@ -897,26 +2635,18 @@ static void run (void *cls, struct GNUNET_SERVER_Handle *server, const struct GNUNET_CONFIGURATION_Handle *c) { - static const struct GNUNET_SERVER_MessageHandler handlers[] = { - { &handle_master_start, NULL, - GNUNET_MESSAGE_TYPE_PSYC_MASTER_START, 0 }, - - { &handle_slave_join, NULL, - GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN, 0 }, - - { &handle_psyc_message, NULL, - GNUNET_MESSAGE_TYPE_PSYC_MESSAGE, 0 }, - }; - cfg = c; store = GNUNET_PSYCSTORE_connect (cfg); stats = GNUNET_STATISTICS_create ("psyc", cfg); - clients = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES); + masters = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES); + slaves = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES); + channel_slaves = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO); + recv_cache = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO); nc = GNUNET_SERVER_notification_context_create (server, 1); - GNUNET_SERVER_add_handlers (server, handlers); + GNUNET_SERVER_add_handlers (server, server_handlers); GNUNET_SERVER_disconnect_notify (server, &client_disconnect, NULL); - GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL, &shutdown_task, - NULL); + GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL, + &shutdown_task, NULL); } @@ -936,4 +2666,4 @@ main (int argc, char *const *argv) &run, NULL)) ? 0 : 1; } -/* end of gnunet-service-psycstore.c */ +/* end of gnunet-service-psyc.c */