2 * This file is part of GNUnet
3 * Copyright (C) 2013 GNUnet e.V.
5 * GNUnet is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published
7 * by the Free Software Foundation; either version 3, or (at your
8 * option) any later version.
10 * GNUnet is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License for more details.
15 * You should have received a copy of the GNU General Public License
16 * along with GNUnet; see the file COPYING. If not, write to the
17 * Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18 * Boston, MA 02110-1301, USA.
22 * @file psyc/gnunet-service-psyc.c
24 * @author Gabor X Toth
30 #include "gnunet_util_lib.h"
31 #include "gnunet_constants.h"
32 #include "gnunet_protocols.h"
33 #include "gnunet_statistics_service.h"
34 #include "gnunet_multicast_service.h"
35 #include "gnunet_psycstore_service.h"
36 #include "gnunet_psyc_service.h"
37 #include "gnunet_psyc_util_lib.h"
42 * Handle to our current configuration.
44 static const struct GNUNET_CONFIGURATION_Handle *cfg;
49 static struct GNUNET_SERVICE_Handle *service;
52 * Handle to the statistics service.
54 static struct GNUNET_STATISTICS_Handle *stats;
57 * Handle to the PSYCstore.
59 static struct GNUNET_PSYCSTORE_Handle *store;
62 * All connected masters.
63 * Channel's pub_key_hash -> struct Master
65 static struct GNUNET_CONTAINER_MultiHashMap *masters;
68 * All connected slaves.
69 * Channel's pub_key_hash -> struct Slave
71 static struct GNUNET_CONTAINER_MultiHashMap *slaves;
74 * Connected slaves per channel.
75 * Channel's pub_key_hash -> Slave's pub_key -> struct Slave
77 static struct GNUNET_CONTAINER_MultiHashMap *channel_slaves;
81 * Message in the transmission queue.
83 struct TransmitMessage
85 struct TransmitMessage *prev;
86 struct TransmitMessage *next;
88 struct GNUNET_SERVICE_Client *client;
91 * ID assigned to the message.
101 * Type of first message part.
103 uint16_t first_ptype;
106 * Type of last message part.
110 /* Followed by message */
115 * Cache for received message fragments.
116 * Message fragments are only sent to clients after all modifiers arrived.
118 * chan_key -> MultiHashMap chan_msgs
120 static struct GNUNET_CONTAINER_MultiHashMap *recv_cache;
124 * Entry in the chan_msgs hashmap of @a recv_cache:
125 * fragment_id -> RecvCacheEntry
127 struct RecvCacheEntry
129 struct GNUNET_MULTICAST_MessageHeader *mmsg;
135 * Entry in the @a recv_frags hash map of a @a Channel.
136 * message_id -> FragmentQueue
141 * Fragment IDs stored in @a recv_cache.
143 struct GNUNET_CONTAINER_Heap *fragments;
146 * Total size of received fragments.
151 * Total size of received header fragments (METHOD & MODIFIERs)
153 uint64_t header_size;
156 * The @a state_delta field from struct GNUNET_PSYC_MessageMethod.
158 uint64_t state_delta;
161 * The @a flags field from struct GNUNET_PSYC_MessageMethod.
166 * Receive state of message.
168 * @see MessageFragmentState
173 * Whether the state is already modified in PSYCstore.
175 uint8_t state_is_modified;
178 * Is the message queued for delivery to the client?
179 * i.e. added to the recv_msgs queue
186 * List of connected clients.
190 struct ClientList *prev;
191 struct ClientList *next;
193 struct GNUNET_SERVICE_Client *client;
199 struct Operation *prev;
200 struct Operation *next;
202 struct GNUNET_SERVICE_Client *client;
203 struct Channel *channel;
210 * Common part of the client context for both a channel master and slave.
214 struct ClientList *clients_head;
215 struct ClientList *clients_tail;
217 struct Operation *op_head;
218 struct Operation *op_tail;
220 struct TransmitMessage *tmit_head;
221 struct TransmitMessage *tmit_tail;
224 * Current PSYCstore operation.
226 struct GNUNET_PSYCSTORE_OperationHandle *store_op;
229 * Received fragments not yet sent to the client.
230 * message_id -> FragmentQueue
232 struct GNUNET_CONTAINER_MultiHashMap *recv_frags;
235 * Received message IDs not yet sent to the client.
237 struct GNUNET_CONTAINER_Heap *recv_msgs;
240 * Public key of the channel.
242 struct GNUNET_CRYPTO_EddsaPublicKey pub_key;
245 * Hash of @a pub_key.
247 struct GNUNET_HashCode pub_key_hash;
250 * Last message ID sent to the client.
251 * 0 if there is no such message.
253 uint64_t max_message_id;
256 * ID of the last stateful message, where the state operations has been
257 * processed and saved to PSYCstore and which has been sent to the client.
258 * 0 if there is no such message.
260 uint64_t max_state_message_id;
263 * Expected value size for the modifier being received from the PSYC service.
265 uint32_t tmit_mod_value_size_expected;
268 * Actual value size for the modifier being received from the PSYC service.
270 uint32_t tmit_mod_value_size;
273 * Is this channel ready to receive messages from client?
274 * #GNUNET_YES or #GNUNET_NO
279 * Is the client disconnected?
280 * #GNUNET_YES or #GNUNET_NO
282 uint8_t is_disconnecting;
285 * Is this a channel master (#GNUNET_YES), or slave (#GNUNET_NO)?
290 struct Master *master;
297 * Client context for a channel master.
302 * Channel struct common for Master and Slave
304 struct Channel channel;
307 * Private key of the channel.
309 struct GNUNET_CRYPTO_EddsaPrivateKey priv_key;
312 * Handle for the multicast origin.
314 struct GNUNET_MULTICAST_Origin *origin;
317 * Transmit handle for multicast.
319 struct GNUNET_MULTICAST_OriginTransmitHandle *tmit_handle;
322 * Incoming join requests from multicast.
323 * member_pub_key -> struct GNUNET_MULTICAST_JoinHandle *
325 struct GNUNET_CONTAINER_MultiHashMap *join_reqs;
328 * Last message ID transmitted to this channel.
330 * Incremented before sending a message, thus the message_id in messages sent
333 uint64_t max_message_id;
336 * ID of the last message with state operations transmitted to the channel.
337 * 0 if there is no such message.
339 uint64_t max_state_message_id;
342 * Maximum group generation transmitted to the channel.
344 uint64_t max_group_generation;
347 * @see enum GNUNET_PSYC_Policy
349 enum GNUNET_PSYC_Policy policy;
354 * Client context for a channel slave.
359 * Channel struct common for Master and Slave
361 struct Channel channel;
364 * Private key of the slave.
366 struct GNUNET_CRYPTO_EcdsaPrivateKey priv_key;
369 * Public key of the slave.
371 struct GNUNET_CRYPTO_EcdsaPublicKey pub_key;
374 * Hash of @a pub_key.
376 struct GNUNET_HashCode pub_key_hash;
379 * Handle for the multicast member.
381 struct GNUNET_MULTICAST_Member *member;
384 * Transmit handle for multicast.
386 struct GNUNET_MULTICAST_MemberTransmitHandle *tmit_handle;
389 * Peer identity of the origin.
391 struct GNUNET_PeerIdentity origin;
394 * Number of items in @a relays.
396 uint32_t relay_count;
399 * Relays that multicast can use to connect.
401 struct GNUNET_PeerIdentity *relays;
404 * Join request to be transmitted to the master on join.
406 struct GNUNET_PSYC_Message *join_msg;
409 * Join decision received from multicast.
411 struct GNUNET_PSYC_JoinDecisionMessage *join_dcsn;
414 * Maximum request ID for this channel.
416 uint64_t max_request_id;
421 enum GNUNET_PSYC_SlaveJoinFlags join_flags;
429 struct GNUNET_SERVICE_Client *client;
430 struct Channel *channel;
434 struct ReplayRequestKey
436 uint64_t fragment_id;
438 uint64_t fragment_offset;
444 transmit_message (struct Channel *chn);
447 message_queue_run (struct Channel *chn);
450 message_queue_drop (struct Channel *chn);
454 schedule_transmit_message (void *cls)
456 struct Channel *chn = cls;
458 transmit_message (chn);
463 * Task run during shutdown.
468 shutdown_task (void *cls)
470 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
471 "shutting down...\n");
472 GNUNET_PSYCSTORE_disconnect (store);
475 GNUNET_STATISTICS_destroy (stats, GNUNET_YES);
481 static struct Operation *
482 op_add (struct Channel *chn, struct GNUNET_SERVICE_Client *client,
483 uint64_t op_id, uint32_t flags)
485 struct Operation *op = GNUNET_malloc (sizeof (*op));
490 GNUNET_CONTAINER_DLL_insert (chn->op_head, chn->op_tail, op);
496 op_remove (struct Operation *op)
498 GNUNET_CONTAINER_DLL_remove (op->channel->op_head, op->channel->op_tail, op);
504 * Clean up master data structures after a client disconnected.
507 cleanup_master (struct Master *mst)
509 struct Channel *chn = &mst->channel;
511 GNUNET_CONTAINER_multihashmap_destroy (mst->join_reqs);
512 GNUNET_CONTAINER_multihashmap_remove (masters, &chn->pub_key_hash, mst);
517 * Clean up slave data structures after a client disconnected.
520 cleanup_slave (struct Slave *slv)
522 struct Channel *chn = &slv->channel;
523 struct GNUNET_CONTAINER_MultiHashMap *
524 chn_slv = GNUNET_CONTAINER_multihashmap_get (channel_slaves,
526 GNUNET_assert (NULL != chn_slv);
527 GNUNET_CONTAINER_multihashmap_remove (chn_slv, &slv->pub_key_hash, slv);
529 if (0 == GNUNET_CONTAINER_multihashmap_size (chn_slv))
531 GNUNET_CONTAINER_multihashmap_remove (channel_slaves, &chn->pub_key_hash,
533 GNUNET_CONTAINER_multihashmap_destroy (chn_slv);
535 GNUNET_CONTAINER_multihashmap_remove (slaves, &chn->pub_key_hash, slv);
537 if (NULL != slv->join_msg)
539 GNUNET_free (slv->join_msg);
540 slv->join_msg = NULL;
542 if (NULL != slv->relays)
544 GNUNET_free (slv->relays);
547 GNUNET_CONTAINER_multihashmap_remove (slaves, &chn->pub_key_hash, slv);
552 * Clean up channel data structures after a client disconnected.
555 cleanup_channel (struct Channel *chn)
557 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
558 "%p Cleaning up channel %s. master? %u\n",
560 GNUNET_h2s (&chn->pub_key_hash),
562 message_queue_drop (chn);
563 GNUNET_CONTAINER_multihashmap_destroy (chn->recv_frags);
564 chn->recv_frags = NULL;
566 if (NULL != chn->store_op)
568 GNUNET_PSYCSTORE_operation_cancel (chn->store_op);
569 chn->store_op = NULL;
572 (GNUNET_YES == chn->is_master)
573 ? cleanup_master (chn->master)
574 : cleanup_slave (chn->slave);
580 * Called whenever a client is disconnected.
581 * Frees our resources associated with that client.
584 * @param client identification of the client
585 * @param app_ctx must match @a client
588 client_notify_disconnect (void *cls,
589 struct GNUNET_SERVICE_Client *client,
592 struct Client *c = app_ctx;
593 struct Channel *chn = c->channel;
598 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
599 "%p User context is NULL in client_notify_disconnect ()\n",
605 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
606 "%p Client %p (%s) disconnected from channel %s\n",
609 (GNUNET_YES == chn->is_master) ? "master" : "slave",
610 GNUNET_h2s (&chn->pub_key_hash));
612 struct ClientList *cli = chn->clients_head;
615 if (cli->client == client)
617 GNUNET_CONTAINER_DLL_remove (chn->clients_head, chn->clients_tail, cli);
624 struct Operation *op = chn->op_head;
627 if (op->client == client)
635 if (NULL == chn->clients_head)
636 { /* Last client disconnected. */
637 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
638 "%p Last client (%s) disconnected from channel %s\n",
640 (GNUNET_YES == chn->is_master) ? "master" : "slave",
641 GNUNET_h2s (&chn->pub_key_hash));
642 chn->is_disconnecting = GNUNET_YES;
643 cleanup_channel (chn);
649 * A new client connected.
652 * @param client client to add
653 * @param mq message queue for @a client
657 client_notify_connect (void *cls,
658 struct GNUNET_SERVICE_Client *client,
659 struct GNUNET_MQ_Handle *mq)
661 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Client connected: %p\n", client);
663 struct Client *c = GNUNET_malloc (sizeof (*c));
671 * Send message to all clients connected to the channel.
674 client_send_msg (const struct Channel *chn,
675 const struct GNUNET_MessageHeader *msg)
677 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
678 "Sending message to clients of channel %p.\n",
681 struct ClientList *cli = chn->clients_head;
684 struct GNUNET_MQ_Envelope *
685 env = GNUNET_MQ_msg_copy (msg);
687 GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (cli->client),
695 * Send a result code back to the client.
698 * Client that should receive the result code.
702 * Operation ID in network byte order.
704 * Data payload or NULL.
709 client_send_result (struct GNUNET_SERVICE_Client *client, uint64_t op_id,
710 int64_t result_code, const void *data, uint16_t data_size)
712 struct GNUNET_OperationResultMessage *res;
713 struct GNUNET_MQ_Envelope *
714 env = GNUNET_MQ_msg_extra (res,
716 GNUNET_MESSAGE_TYPE_PSYC_RESULT_CODE);
717 res->result_code = GNUNET_htonll (result_code);
720 GNUNET_memcpy (&res[1], data, data_size);
722 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
723 "%p Sending result to client for OP ID %" PRIu64 ": %" PRId64 " (size: %u)\n",
725 GNUNET_ntohll (op_id),
729 GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (client), env);
734 * Closure for join_mem_test_cb()
736 struct JoinMemTestClosure
738 struct GNUNET_CRYPTO_EcdsaPublicKey slave_pub_key;
739 struct Channel *channel;
740 struct GNUNET_MULTICAST_JoinHandle *join_handle;
741 struct GNUNET_PSYC_JoinRequestMessage *join_msg;
746 * Membership test result callback used for join requests.
749 join_mem_test_cb (void *cls, int64_t result,
750 const char *err_msg, uint16_t err_msg_size)
752 struct JoinMemTestClosure *jcls = cls;
754 if (GNUNET_NO == result && GNUNET_YES == jcls->channel->is_master)
755 { /* Pass on join request to client if this is a master channel */
756 struct Master *mst = jcls->channel->master;
757 struct GNUNET_HashCode slave_pub_hash;
758 GNUNET_CRYPTO_hash (&jcls->slave_pub_key, sizeof (jcls->slave_pub_key),
760 GNUNET_CONTAINER_multihashmap_put (mst->join_reqs, &slave_pub_hash, jcls->join_handle,
761 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
762 client_send_msg (jcls->channel, &jcls->join_msg->header);
766 if (GNUNET_SYSERR == result)
768 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
769 "Could not perform membership test (%.*s)\n",
770 err_msg_size, err_msg);
773 GNUNET_MULTICAST_join_decision (jcls->join_handle, result, 0, NULL, NULL);
775 GNUNET_free (jcls->join_msg);
781 * Incoming join request from multicast.
784 mcast_recv_join_request (void *cls,
785 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_pub_key,
786 const struct GNUNET_MessageHeader *join_msg,
787 struct GNUNET_MULTICAST_JoinHandle *jh)
789 struct Channel *chn = cls;
790 uint16_t join_msg_size = 0;
792 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
793 "%p Got join request.\n",
795 if (NULL != join_msg)
797 if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE == ntohs (join_msg->type))
799 join_msg_size = ntohs (join_msg->size);
803 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
804 "%p Got join message with invalid type %u.\n",
806 ntohs (join_msg->type));
810 struct GNUNET_PSYC_JoinRequestMessage *
811 req = GNUNET_malloc (sizeof (*req) + join_msg_size);
812 req->header.size = htons (sizeof (*req) + join_msg_size);
813 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_JOIN_REQUEST);
814 req->slave_pub_key = *slave_pub_key;
815 if (0 < join_msg_size)
816 GNUNET_memcpy (&req[1], join_msg, join_msg_size);
818 struct JoinMemTestClosure *jcls = GNUNET_malloc (sizeof (*jcls));
819 jcls->slave_pub_key = *slave_pub_key;
821 jcls->join_handle = jh;
822 jcls->join_msg = req;
824 GNUNET_PSYCSTORE_membership_test (store, &chn->pub_key, slave_pub_key,
825 chn->max_message_id, 0,
826 &join_mem_test_cb, jcls);
831 * Join decision received from multicast.
834 mcast_recv_join_decision (void *cls, int is_admitted,
835 const struct GNUNET_PeerIdentity *peer,
836 uint16_t relay_count,
837 const struct GNUNET_PeerIdentity *relays,
838 const struct GNUNET_MessageHeader *join_resp)
840 struct Slave *slv = cls;
841 struct Channel *chn = &slv->channel;
842 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
843 "%p Got join decision: %d\n",
846 if (GNUNET_YES == chn->is_ready)
848 /* Already admitted */
852 uint16_t join_resp_size = (NULL != join_resp) ? ntohs (join_resp->size) : 0;
853 struct GNUNET_PSYC_JoinDecisionMessage *
854 dcsn = slv->join_dcsn = GNUNET_malloc (sizeof (*dcsn) + join_resp_size);
855 dcsn->header.size = htons (sizeof (*dcsn) + join_resp_size);
856 dcsn->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION);
857 dcsn->is_admitted = htonl (is_admitted);
858 if (0 < join_resp_size)
859 GNUNET_memcpy (&dcsn[1], join_resp, join_resp_size);
861 client_send_msg (chn, &dcsn->header);
863 if (GNUNET_YES == is_admitted
864 && ! (GNUNET_PSYC_SLAVE_JOIN_LOCAL & slv->join_flags))
866 chn->is_ready = GNUNET_YES;
872 store_recv_fragment_replay (void *cls,
873 struct GNUNET_MULTICAST_MessageHeader *msg,
874 enum GNUNET_PSYCSTORE_MessageFlags flags)
876 struct GNUNET_MULTICAST_ReplayHandle *rh = cls;
878 GNUNET_MULTICAST_replay_response (rh, &msg->header, GNUNET_MULTICAST_REC_OK);
884 * Received result of GNUNET_PSYCSTORE_fragment_get() for multicast replay.
887 store_recv_fragment_replay_result (void *cls,
890 uint16_t err_msg_size)
892 struct GNUNET_MULTICAST_ReplayHandle *rh = cls;
894 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
895 "%p Fragment replay: PSYCSTORE returned %" PRId64 " (%.*s)\n",
906 GNUNET_MULTICAST_replay_response (rh, NULL,
907 GNUNET_MULTICAST_REC_NOT_FOUND);
910 case GNUNET_PSYCSTORE_MEMBERSHIP_TEST_FAILED:
911 GNUNET_MULTICAST_replay_response (rh, NULL,
912 GNUNET_MULTICAST_REC_ACCESS_DENIED);
916 GNUNET_MULTICAST_replay_response (rh, NULL,
917 GNUNET_MULTICAST_REC_INTERNAL_ERROR);
920 /* GNUNET_MULTICAST_replay_response frees 'rh' when passed
921 * an error code, so it must be ensured no further processing
922 * is attempted on 'rh'. Maybe this should be refactored as
923 * it doesn't look very intuitive. --lynX
925 GNUNET_MULTICAST_replay_response_end (rh);
930 * Incoming fragment replay request from multicast.
933 mcast_recv_replay_fragment (void *cls,
934 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_pub_key,
935 uint64_t fragment_id, uint64_t flags,
936 struct GNUNET_MULTICAST_ReplayHandle *rh)
939 struct Channel *chn = cls;
940 GNUNET_PSYCSTORE_fragment_get (store, &chn->pub_key, slave_pub_key,
941 fragment_id, fragment_id,
942 &store_recv_fragment_replay,
943 &store_recv_fragment_replay_result, rh);
948 * Incoming message replay request from multicast.
951 mcast_recv_replay_message (void *cls,
952 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_pub_key,
954 uint64_t fragment_offset,
956 struct GNUNET_MULTICAST_ReplayHandle *rh)
958 struct Channel *chn = cls;
959 GNUNET_PSYCSTORE_message_get (store, &chn->pub_key, slave_pub_key,
960 message_id, message_id, 1, NULL,
961 &store_recv_fragment_replay,
962 &store_recv_fragment_replay_result, rh);
967 * Convert an uint64_t in network byte order to a HashCode
968 * that can be used as key in a MultiHashMap
971 hash_key_from_nll (struct GNUNET_HashCode *key, uint64_t n)
973 /* use little-endian order, as idx_of MultiHashMap casts key to unsigned int */
974 /* TODO: use built-in byte swap functions if available */
976 n = ((n << 8) & 0xFF00FF00FF00FF00ULL) | ((n >> 8) & 0x00FF00FF00FF00FFULL);
977 n = ((n << 16) & 0xFFFF0000FFFF0000ULL) | ((n >> 16) & 0x0000FFFF0000FFFFULL);
979 *key = (struct GNUNET_HashCode) {};
981 = (n << 32) | (n >> 32);
986 * Convert an uint64_t in host byte order to a HashCode
987 * that can be used as key in a MultiHashMap
990 hash_key_from_hll (struct GNUNET_HashCode *key, uint64_t n)
992 #if __BYTE_ORDER == __BIG_ENDIAN
993 hash_key_from_nll (key, n);
994 #elif __BYTE_ORDER == __LITTLE_ENDIAN
995 *key = (struct GNUNET_HashCode) {};
996 *((uint64_t *) key) = n;
998 #error byteorder undefined
1004 * Initialize PSYC message header.
1007 psyc_msg_init (struct GNUNET_PSYC_MessageHeader *pmsg,
1008 const struct GNUNET_MULTICAST_MessageHeader *mmsg, uint32_t flags)
1010 uint16_t size = ntohs (mmsg->header.size);
1011 uint16_t psize = sizeof (*pmsg) + size - sizeof (*mmsg);
1013 pmsg->header.size = htons (psize);
1014 pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
1015 pmsg->message_id = mmsg->message_id;
1016 pmsg->fragment_offset = mmsg->fragment_offset;
1017 pmsg->flags = htonl (flags);
1019 GNUNET_memcpy (&pmsg[1], &mmsg[1], size - sizeof (*mmsg));
1024 * Create a new PSYC message from a multicast message for sending it to clients.
1026 static inline struct GNUNET_PSYC_MessageHeader *
1027 psyc_msg_new (const struct GNUNET_MULTICAST_MessageHeader *mmsg, uint32_t flags)
1029 struct GNUNET_PSYC_MessageHeader *pmsg;
1030 uint16_t size = ntohs (mmsg->header.size);
1031 uint16_t psize = sizeof (*pmsg) + size - sizeof (*mmsg);
1033 pmsg = GNUNET_malloc (psize);
1034 psyc_msg_init (pmsg, mmsg, flags);
1040 * Send multicast message to all clients connected to the channel.
1043 client_send_mcast_msg (struct Channel *chn,
1044 const struct GNUNET_MULTICAST_MessageHeader *mmsg,
1047 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1048 "%p Sending multicast message to client. fragment_id: %" PRIu64 ", message_id: %" PRIu64 "\n",
1050 GNUNET_ntohll (mmsg->fragment_id),
1051 GNUNET_ntohll (mmsg->message_id));
1053 struct GNUNET_PSYC_MessageHeader *
1054 pmsg = GNUNET_PSYC_message_header_create (mmsg, flags);
1055 client_send_msg (chn, &pmsg->header);
1061 * Send multicast request to all clients connected to the channel.
1064 client_send_mcast_req (struct Master *mst,
1065 const struct GNUNET_MULTICAST_RequestHeader *req)
1067 struct Channel *chn = &mst->channel;
1069 struct GNUNET_PSYC_MessageHeader *pmsg;
1070 uint16_t size = ntohs (req->header.size);
1071 uint16_t psize = sizeof (*pmsg) + size - sizeof (*req);
1073 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1074 "%p Sending multicast request to client. fragment_id: %" PRIu64 ", message_id: %" PRIu64 "\n",
1076 GNUNET_ntohll (req->fragment_id),
1077 GNUNET_ntohll (req->request_id));
1079 pmsg = GNUNET_malloc (psize);
1080 pmsg->header.size = htons (psize);
1081 pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
1082 pmsg->message_id = req->request_id;
1083 pmsg->fragment_offset = req->fragment_offset;
1084 pmsg->flags = htonl (GNUNET_PSYC_MESSAGE_REQUEST);
1085 pmsg->slave_pub_key = req->member_pub_key;
1086 GNUNET_memcpy (&pmsg[1], &req[1], size - sizeof (*req));
1088 client_send_msg (chn, &pmsg->header);
1090 /* FIXME: save req to PSYCstore so that it can be resent later to clients */
1097 * Insert a multicast message fragment into the queue belonging to the message.
1099 * @param chn Channel.
1100 * @param mmsg Multicast message fragment.
1101 * @param msg_id_hash Message ID of @a mmsg in a struct GNUNET_HashCode.
1102 * @param first_ptype First PSYC message part type in @a mmsg.
1103 * @param last_ptype Last PSYC message part type in @a mmsg.
1106 fragment_queue_insert (struct Channel *chn,
1107 const struct GNUNET_MULTICAST_MessageHeader *mmsg,
1108 uint16_t first_ptype, uint16_t last_ptype)
1110 const uint16_t size = ntohs (mmsg->header.size);
1111 const uint64_t frag_offset = GNUNET_ntohll (mmsg->fragment_offset);
1112 struct GNUNET_CONTAINER_MultiHashMap
1113 *chan_msgs = GNUNET_CONTAINER_multihashmap_get (recv_cache,
1114 &chn->pub_key_hash);
1116 struct GNUNET_HashCode msg_id_hash;
1117 hash_key_from_nll (&msg_id_hash, mmsg->message_id);
1119 struct FragmentQueue
1120 *fragq = GNUNET_CONTAINER_multihashmap_get (chn->recv_frags, &msg_id_hash);
1124 fragq = GNUNET_malloc (sizeof (*fragq));
1125 fragq->state = MSG_FRAG_STATE_HEADER;
1127 = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
1129 GNUNET_CONTAINER_multihashmap_put (chn->recv_frags, &msg_id_hash, fragq,
1130 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1132 if (NULL == chan_msgs)
1134 chan_msgs = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
1135 GNUNET_CONTAINER_multihashmap_put (recv_cache, &chn->pub_key_hash, chan_msgs,
1136 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1140 struct GNUNET_HashCode frag_id_hash;
1141 hash_key_from_nll (&frag_id_hash, mmsg->fragment_id);
1142 struct RecvCacheEntry
1143 *cache_entry = GNUNET_CONTAINER_multihashmap_get (chan_msgs, &frag_id_hash);
1144 if (NULL == cache_entry)
1146 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1147 "%p Adding message fragment to cache. message_id: %" PRIu64 ", fragment_id: %" PRIu64 "\n",
1149 GNUNET_ntohll (mmsg->message_id),
1150 GNUNET_ntohll (mmsg->fragment_id));
1151 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1152 "%p header_size: %" PRIu64 " + %u\n",
1156 cache_entry = GNUNET_malloc (sizeof (*cache_entry));
1157 cache_entry->ref_count = 1;
1158 cache_entry->mmsg = GNUNET_malloc (size);
1159 GNUNET_memcpy (cache_entry->mmsg, mmsg, size);
1160 GNUNET_CONTAINER_multihashmap_put (chan_msgs, &frag_id_hash, cache_entry,
1161 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1165 cache_entry->ref_count++;
1166 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1167 "%p Message fragment is already in cache. message_id: %" PRIu64 ", fragment_id: %" PRIu64 ", ref_count: %u\n",
1169 GNUNET_ntohll (mmsg->message_id),
1170 GNUNET_ntohll (mmsg->fragment_id),
1171 cache_entry->ref_count);
1174 if (MSG_FRAG_STATE_HEADER == fragq->state)
1176 if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype)
1178 struct GNUNET_PSYC_MessageMethod *
1179 pmeth = (struct GNUNET_PSYC_MessageMethod *) &mmsg[1];
1180 fragq->state_delta = GNUNET_ntohll (pmeth->state_delta);
1181 fragq->flags = ntohl (pmeth->flags);
1184 if (last_ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA)
1186 fragq->header_size += size;
1188 else if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype
1189 || frag_offset == fragq->header_size)
1190 { /* header is now complete */
1191 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1192 "%p Header of message %" PRIu64 " is complete.\n",
1194 GNUNET_ntohll (mmsg->message_id));
1196 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1197 "%p Adding message %" PRIu64 " to queue.\n",
1199 GNUNET_ntohll (mmsg->message_id));
1200 fragq->state = MSG_FRAG_STATE_DATA;
1204 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1205 "%p Header of message %" PRIu64 " is NOT complete yet: %" PRIu64 " != %" PRIu64 "\n",
1207 GNUNET_ntohll (mmsg->message_id),
1209 fragq->header_size);
1215 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END:
1216 if (frag_offset == fragq->size)
1217 fragq->state = MSG_FRAG_STATE_END;
1219 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1220 "%p Message %" PRIu64 " is NOT complete yet: %" PRIu64 " != %" PRIu64 "\n",
1222 GNUNET_ntohll (mmsg->message_id),
1227 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL:
1228 /* Drop message without delivering to client if it's a single fragment */
1230 (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype)
1231 ? MSG_FRAG_STATE_DROP
1232 : MSG_FRAG_STATE_CANCEL;
1235 switch (fragq->state)
1237 case MSG_FRAG_STATE_DATA:
1238 case MSG_FRAG_STATE_END:
1239 case MSG_FRAG_STATE_CANCEL:
1240 if (GNUNET_NO == fragq->is_queued)
1242 GNUNET_CONTAINER_heap_insert (chn->recv_msgs, NULL,
1243 GNUNET_ntohll (mmsg->message_id));
1244 fragq->is_queued = GNUNET_YES;
1248 fragq->size += size;
1249 GNUNET_CONTAINER_heap_insert (fragq->fragments, NULL,
1250 GNUNET_ntohll (mmsg->fragment_id));
1255 * Run fragment queue of a message.
1257 * Send fragments of a message in order to client, after all modifiers arrived
1263 * ID of the message @a fragq belongs to.
1265 * Fragment queue of the message.
1267 * Drop message without delivering to client?
1268 * #GNUNET_YES or #GNUNET_NO.
1271 fragment_queue_run (struct Channel *chn, uint64_t msg_id,
1272 struct FragmentQueue *fragq, uint8_t drop)
1274 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1275 "%p Running message fragment queue for message %" PRIu64 " (state: %u).\n",
1280 struct GNUNET_CONTAINER_MultiHashMap
1281 *chan_msgs = GNUNET_CONTAINER_multihashmap_get (recv_cache,
1282 &chn->pub_key_hash);
1283 GNUNET_assert (NULL != chan_msgs);
1286 while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (fragq->fragments, NULL,
1289 struct GNUNET_HashCode frag_id_hash;
1290 hash_key_from_hll (&frag_id_hash, frag_id);
1291 struct RecvCacheEntry *cache_entry
1292 = GNUNET_CONTAINER_multihashmap_get (chan_msgs, &frag_id_hash);
1293 if (cache_entry != NULL)
1295 if (GNUNET_NO == drop)
1297 client_send_mcast_msg (chn, cache_entry->mmsg, 0);
1299 if (cache_entry->ref_count <= 1)
1301 GNUNET_CONTAINER_multihashmap_remove (chan_msgs, &frag_id_hash,
1303 GNUNET_free (cache_entry->mmsg);
1304 GNUNET_free (cache_entry);
1308 cache_entry->ref_count--;
1311 #if CACHE_AGING_IMPLEMENTED
1312 else if (GNUNET_NO == drop)
1314 /* TODO: fragment not in cache anymore, retrieve it from PSYCstore */
1318 GNUNET_CONTAINER_heap_remove_root (fragq->fragments);
1321 if (MSG_FRAG_STATE_END <= fragq->state)
1323 struct GNUNET_HashCode msg_id_hash;
1324 hash_key_from_hll (&msg_id_hash, msg_id);
1326 GNUNET_CONTAINER_multihashmap_remove (chn->recv_frags, &msg_id_hash, fragq);
1327 GNUNET_CONTAINER_heap_destroy (fragq->fragments);
1328 GNUNET_free (fragq);
1332 fragq->is_queued = GNUNET_NO;
1337 struct StateModifyClosure
1339 struct Channel *channel;
1341 struct GNUNET_HashCode msg_id_hash;
1346 store_recv_state_modify_result (void *cls, int64_t result,
1347 const char *err_msg, uint16_t err_msg_size)
1349 struct StateModifyClosure *mcls = cls;
1350 struct Channel *chn = mcls->channel;
1351 uint64_t msg_id = mcls->msg_id;
1353 struct FragmentQueue *
1354 fragq = GNUNET_CONTAINER_multihashmap_get (chn->recv_frags, &mcls->msg_id_hash);
1356 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1357 "%p GNUNET_PSYCSTORE_state_modify() returned %" PRId64 " (%.*s)\n",
1358 chn, result, err_msg_size, err_msg);
1365 fragq->state_is_modified = GNUNET_YES;
1366 if (chn->max_state_message_id < msg_id)
1367 chn->max_state_message_id = msg_id;
1368 if (chn->max_message_id < msg_id)
1369 chn->max_message_id = msg_id;
1372 fragment_queue_run (chn, msg_id, fragq, MSG_FRAG_STATE_DROP == fragq->state);
1373 GNUNET_CONTAINER_heap_remove_root (chn->recv_msgs);
1374 message_queue_run (chn);
1378 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1379 "%p GNUNET_PSYCSTORE_state_modify() failed with error %" PRId64 " (%.*s)\n",
1380 chn, result, err_msg_size, err_msg);
1381 /** @todo FIXME: handle state_modify error */
1387 * Run message queue.
1389 * Send messages in queue to client in order after a message has arrived from
1390 * multicast, according to the following:
1391 * - A message is only sent if all of its modifiers arrived.
1392 * - A stateful message is only sent if the previous stateful message
1393 * has already been delivered to the client.
1395 * @param chn Channel.
1397 * @return Number of messages removed from queue and sent to client.
1400 message_queue_run (struct Channel *chn)
1402 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1403 "%p Running message queue.\n", chn);
1407 while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (chn->recv_msgs, NULL,
1410 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1411 "%p Processing message %" PRIu64 " in queue.\n", chn, msg_id);
1412 struct GNUNET_HashCode msg_id_hash;
1413 hash_key_from_hll (&msg_id_hash, msg_id);
1415 struct FragmentQueue *
1416 fragq = GNUNET_CONTAINER_multihashmap_get (chn->recv_frags, &msg_id_hash);
1418 if (NULL == fragq || fragq->state <= MSG_FRAG_STATE_HEADER)
1420 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1421 "%p No fragq (%p) or header not complete.\n",
1426 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1427 "%p Fragment queue entry: state: %u, state delta: "
1428 "%" PRIu64 " - %" PRIu64 " ?= %" PRIu64 "\n",
1429 chn, fragq->state, msg_id, fragq->state_delta, chn->max_state_message_id);
1431 if (MSG_FRAG_STATE_DATA <= fragq->state)
1433 /* Check if there's a missing message before the current one */
1434 if (GNUNET_PSYC_STATE_NOT_MODIFIED == fragq->state_delta)
1436 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p state NOT modified\n", chn);
1438 if (!(fragq->flags & GNUNET_PSYC_MESSAGE_ORDER_ANY)
1439 && (chn->max_message_id != msg_id - 1
1440 && chn->max_message_id != msg_id))
1442 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1443 "%p Out of order message. "
1444 "(%" PRIu64 " != %" PRIu64 " - 1)\n",
1445 chn, chn->max_message_id, msg_id);
1447 // FIXME: keep track of messages processed in this queue run,
1448 // and only stop after reaching the end
1453 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p state modified\n", chn);
1454 if (GNUNET_YES != fragq->state_is_modified)
1456 if (msg_id - fragq->state_delta != chn->max_state_message_id)
1458 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1459 "%p Out of order stateful message. "
1460 "(%" PRIu64 " - %" PRIu64 " != %" PRIu64 ")\n",
1461 chn, msg_id, fragq->state_delta, chn->max_state_message_id);
1463 // FIXME: keep track of messages processed in this queue run,
1464 // and only stop after reaching the end
1467 struct StateModifyClosure *mcls = GNUNET_malloc (sizeof (*mcls));
1468 mcls->channel = chn;
1469 mcls->msg_id = msg_id;
1470 mcls->msg_id_hash = msg_id_hash;
1472 /* Apply modifiers to state in PSYCstore */
1473 GNUNET_PSYCSTORE_state_modify (store, &chn->pub_key, msg_id,
1475 store_recv_state_modify_result, mcls);
1476 break; // continue after asynchronous state modify result
1479 chn->max_message_id = msg_id;
1481 fragment_queue_run (chn, msg_id, fragq, MSG_FRAG_STATE_DROP == fragq->state);
1482 GNUNET_CONTAINER_heap_remove_root (chn->recv_msgs);
1486 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1487 "%p Removed %" PRIu64 " messages from queue.\n", chn, n);
1493 * Drop message queue of a channel.
1495 * Remove all messages in queue without sending it to clients.
1497 * @param chn Channel.
1499 * @return Number of messages removed from queue.
1502 message_queue_drop (struct Channel *chn)
1506 while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (chn->recv_msgs, NULL,
1509 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1510 "%p Dropping message %" PRIu64 " from queue.\n", chn, msg_id);
1511 struct GNUNET_HashCode msg_id_hash;
1512 hash_key_from_hll (&msg_id_hash, msg_id);
1514 struct FragmentQueue *
1515 fragq = GNUNET_CONTAINER_multihashmap_get (chn->recv_frags, &msg_id_hash);
1516 GNUNET_assert (NULL != fragq);
1517 fragment_queue_run (chn, msg_id, fragq, GNUNET_YES);
1518 GNUNET_CONTAINER_heap_remove_root (chn->recv_msgs);
1521 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1522 "%p Removed %" PRIu64 " messages from queue.\n", chn, n);
1528 * Received result of GNUNET_PSYCSTORE_fragment_store().
1531 store_recv_fragment_store_result (void *cls, int64_t result,
1532 const char *err_msg, uint16_t err_msg_size)
1534 struct Channel *chn = cls;
1535 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1536 "%p GNUNET_PSYCSTORE_fragment_store() returned %" PRId64 " (%.*s)\n",
1537 chn, result, err_msg_size, err_msg);
1542 * Handle incoming message fragment from multicast.
1544 * Store it using PSYCstore and send it to the clients of the channel in order.
1547 mcast_recv_message (void *cls, const struct GNUNET_MULTICAST_MessageHeader *mmsg)
1549 struct Channel *chn = cls;
1550 uint16_t size = ntohs (mmsg->header.size);
1552 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1553 "%p Received multicast message of size %u. "
1554 "fragment_id=%" PRIu64 ", message_id=%" PRIu64
1555 ", fragment_offset=%" PRIu64 ", flags=%" PRIu64 "\n",
1557 GNUNET_ntohll (mmsg->fragment_id),
1558 GNUNET_ntohll (mmsg->message_id),
1559 GNUNET_ntohll (mmsg->fragment_offset),
1560 GNUNET_ntohll (mmsg->flags));
1562 GNUNET_PSYCSTORE_fragment_store (store, &chn->pub_key, mmsg, 0,
1563 &store_recv_fragment_store_result, chn);
1565 uint16_t first_ptype = 0, last_ptype = 0;
1566 int check = GNUNET_PSYC_receive_check_parts (size - sizeof (*mmsg),
1567 (const char *) &mmsg[1],
1568 &first_ptype, &last_ptype);
1569 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1570 "%p Message check result %d, first part type %u, last part type %u\n",
1571 chn, check, first_ptype, last_ptype);
1572 if (GNUNET_SYSERR == check)
1574 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1575 "%p Dropping incoming multicast message with invalid parts.\n",
1577 GNUNET_break_op (0);
1581 fragment_queue_insert (chn, mmsg, first_ptype, last_ptype);
1582 message_queue_run (chn);
1587 * Incoming request fragment from multicast for a master.
1589 * @param cls Master.
1590 * @param req The request.
1593 mcast_recv_request (void *cls,
1594 const struct GNUNET_MULTICAST_RequestHeader *req)
1596 struct Master *mst = cls;
1597 uint16_t size = ntohs (req->header.size);
1599 char *str = GNUNET_CRYPTO_ecdsa_public_key_to_string (&req->member_pub_key);
1600 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1601 "%p Received multicast request of size %u from %s.\n",
1605 uint16_t first_ptype = 0, last_ptype = 0;
1607 == GNUNET_PSYC_receive_check_parts (size - sizeof (*req),
1608 (const char *) &req[1],
1609 &first_ptype, &last_ptype))
1611 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1612 "%p Dropping incoming multicast request with invalid parts.\n",
1614 GNUNET_break_op (0);
1618 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1619 "Message parts: first: type %u, last: type %u\n",
1620 first_ptype, last_ptype);
1622 /* FIXME: in-order delivery */
1623 client_send_mcast_req (mst, req);
1628 * Response from PSYCstore with the current counter values for a channel master.
1631 store_recv_master_counters (void *cls, int result, uint64_t max_fragment_id,
1632 uint64_t max_message_id, uint64_t max_group_generation,
1633 uint64_t max_state_message_id)
1635 struct Master *mst = cls;
1636 struct Channel *chn = &mst->channel;
1637 chn->store_op = NULL;
1639 struct GNUNET_PSYC_CountersResultMessage res;
1640 res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK);
1641 res.header.size = htons (sizeof (res));
1642 res.result_code = htonl (result);
1643 res.max_message_id = GNUNET_htonll (max_message_id);
1645 if (GNUNET_OK == result || GNUNET_NO == result)
1647 mst->max_message_id = max_message_id;
1648 chn->max_message_id = max_message_id;
1649 chn->max_state_message_id = max_state_message_id;
1650 mst->max_group_generation = max_group_generation;
1652 = GNUNET_MULTICAST_origin_start (cfg, &mst->priv_key, max_fragment_id,
1653 mcast_recv_join_request,
1654 mcast_recv_replay_fragment,
1655 mcast_recv_replay_message,
1657 mcast_recv_message, chn);
1658 chn->is_ready = GNUNET_YES;
1662 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1663 "%p GNUNET_PSYCSTORE_counters_get() "
1664 "returned %d for channel %s.\n",
1665 chn, result, GNUNET_h2s (&chn->pub_key_hash));
1668 client_send_msg (chn, &res.header);
1673 * Response from PSYCstore with the current counter values for a channel slave.
1676 store_recv_slave_counters (void *cls, int result, uint64_t max_fragment_id,
1677 uint64_t max_message_id, uint64_t max_group_generation,
1678 uint64_t max_state_message_id)
1680 struct Slave *slv = cls;
1681 struct Channel *chn = &slv->channel;
1682 chn->store_op = NULL;
1684 struct GNUNET_PSYC_CountersResultMessage res;
1685 res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK);
1686 res.header.size = htons (sizeof (res));
1687 res.result_code = htonl (result);
1688 res.max_message_id = GNUNET_htonll (max_message_id);
1690 if (GNUNET_YES == result || GNUNET_NO == result)
1692 chn->max_message_id = max_message_id;
1693 chn->max_state_message_id = max_state_message_id;
1695 = GNUNET_MULTICAST_member_join (cfg, &chn->pub_key, &slv->priv_key,
1697 slv->relay_count, slv->relays,
1698 &slv->join_msg->header,
1699 mcast_recv_join_request,
1700 mcast_recv_join_decision,
1701 mcast_recv_replay_fragment,
1702 mcast_recv_replay_message,
1703 mcast_recv_message, chn);
1704 if (NULL != slv->join_msg)
1706 GNUNET_free (slv->join_msg);
1707 slv->join_msg = NULL;
1712 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1713 "%p GNUNET_PSYCSTORE_counters_get() "
1714 "returned %d for channel %s.\n",
1715 chn, result, GNUNET_h2s (&chn->pub_key_hash));
1718 client_send_msg (chn, &res.header);
1723 channel_init (struct Channel *chn)
1726 = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
1727 chn->recv_frags = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
1732 * Handle a connecting client starting a channel master.
1735 handle_client_master_start (void *cls,
1736 const struct MasterStartRequest *req)
1738 struct Client *c = cls;
1739 struct GNUNET_SERVICE_Client *client = c->client;
1741 struct GNUNET_CRYPTO_EddsaPublicKey pub_key;
1742 struct GNUNET_HashCode pub_key_hash;
1744 GNUNET_CRYPTO_eddsa_key_get_public (&req->channel_key, &pub_key);
1745 GNUNET_CRYPTO_hash (&pub_key, sizeof (pub_key), &pub_key_hash);
1748 mst = GNUNET_CONTAINER_multihashmap_get (masters, &pub_key_hash);
1749 struct Channel *chn;
1753 mst = GNUNET_malloc (sizeof (*mst));
1754 mst->policy = ntohl (req->policy);
1755 mst->priv_key = req->channel_key;
1756 mst->join_reqs = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
1758 chn = c->channel = &mst->channel;
1760 chn->is_master = GNUNET_YES;
1761 chn->pub_key = pub_key;
1762 chn->pub_key_hash = pub_key_hash;
1765 GNUNET_CONTAINER_multihashmap_put (masters, &chn->pub_key_hash, chn,
1766 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1767 chn->store_op = GNUNET_PSYCSTORE_counters_get (store, &chn->pub_key,
1768 store_recv_master_counters, mst);
1772 chn = &mst->channel;
1774 struct GNUNET_PSYC_CountersResultMessage *res;
1775 struct GNUNET_MQ_Envelope *
1776 env = GNUNET_MQ_msg (res, GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK);
1777 res->result_code = htonl (GNUNET_OK);
1778 res->max_message_id = GNUNET_htonll (mst->max_message_id);
1780 GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (client), env);
1783 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1784 "%p Client connected as master to channel %s.\n",
1785 mst, GNUNET_h2s (&chn->pub_key_hash));
1787 struct ClientList *cli = GNUNET_malloc (sizeof (*cli));
1788 cli->client = client;
1789 GNUNET_CONTAINER_DLL_insert (chn->clients_head, chn->clients_tail, cli);
1791 GNUNET_SERVICE_client_continue (client);
1796 check_client_slave_join (void *cls,
1797 const struct SlaveJoinRequest *req)
1804 * Handle a connecting client joining as a channel slave.
1807 handle_client_slave_join (void *cls,
1808 const struct SlaveJoinRequest *req)
1810 struct Client *c = cls;
1811 struct GNUNET_SERVICE_Client *client = c->client;
1813 uint16_t req_size = ntohs (req->header.size);
1815 struct GNUNET_CRYPTO_EcdsaPublicKey slv_pub_key;
1816 struct GNUNET_HashCode pub_key_hash, slv_pub_hash;
1818 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1819 "got join request from client %p\n",
1821 GNUNET_CRYPTO_ecdsa_key_get_public (&req->slave_key, &slv_pub_key);
1822 GNUNET_CRYPTO_hash (&slv_pub_key, sizeof (slv_pub_key), &slv_pub_hash);
1823 GNUNET_CRYPTO_hash (&req->channel_pub_key, sizeof (req->channel_pub_key), &pub_key_hash);
1825 struct GNUNET_CONTAINER_MultiHashMap *
1826 chn_slv = GNUNET_CONTAINER_multihashmap_get (channel_slaves, &pub_key_hash);
1827 struct Slave *slv = NULL;
1828 struct Channel *chn;
1830 if (NULL != chn_slv)
1832 slv = GNUNET_CONTAINER_multihashmap_get (chn_slv, &slv_pub_hash);
1836 slv = GNUNET_malloc (sizeof (*slv));
1837 slv->priv_key = req->slave_key;
1838 slv->pub_key = slv_pub_key;
1839 slv->pub_key_hash = slv_pub_hash;
1840 slv->origin = req->origin;
1841 slv->relay_count = ntohl (req->relay_count);
1842 slv->join_flags = ntohl (req->flags);
1844 const struct GNUNET_PeerIdentity *
1845 relays = (const struct GNUNET_PeerIdentity *) &req[1];
1846 uint16_t relay_size = slv->relay_count * sizeof (*relays);
1847 uint16_t join_msg_size = 0;
1849 if (sizeof (*req) + relay_size + sizeof (struct GNUNET_MessageHeader)
1852 struct GNUNET_PSYC_Message *
1853 join_msg = (struct GNUNET_PSYC_Message *) (((char *) &req[1]) + relay_size);
1854 join_msg_size = ntohs (join_msg->header.size);
1855 slv->join_msg = GNUNET_malloc (join_msg_size);
1856 GNUNET_memcpy (slv->join_msg, join_msg, join_msg_size);
1858 if (sizeof (*req) + relay_size + join_msg_size != req_size)
1860 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1861 "%u + %u + %u != %u\n",
1862 (unsigned int) sizeof (*req),
1867 GNUNET_SERVICE_client_drop (client);
1871 if (0 < slv->relay_count)
1873 slv->relays = GNUNET_malloc (relay_size);
1874 GNUNET_memcpy (slv->relays, &req[1], relay_size);
1877 chn = c->channel = &slv->channel;
1879 chn->is_master = GNUNET_NO;
1880 chn->pub_key = req->channel_pub_key;
1881 chn->pub_key_hash = pub_key_hash;
1884 if (NULL == chn_slv)
1886 chn_slv = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
1887 GNUNET_CONTAINER_multihashmap_put (channel_slaves, &chn->pub_key_hash, chn_slv,
1888 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1890 GNUNET_CONTAINER_multihashmap_put (chn_slv, &slv->pub_key_hash, chn,
1891 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1892 GNUNET_CONTAINER_multihashmap_put (slaves, &chn->pub_key_hash, chn,
1893 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1894 chn->store_op = GNUNET_PSYCSTORE_counters_get (store, &chn->pub_key,
1895 &store_recv_slave_counters, slv);
1899 chn = &slv->channel;
1901 struct GNUNET_PSYC_CountersResultMessage *res;
1903 struct GNUNET_MQ_Envelope *
1904 env = GNUNET_MQ_msg (res, GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK);
1905 res->result_code = htonl (GNUNET_OK);
1906 res->max_message_id = GNUNET_htonll (chn->max_message_id);
1908 GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (client), env);
1910 if (GNUNET_PSYC_SLAVE_JOIN_LOCAL & slv->join_flags)
1912 mcast_recv_join_decision (slv, GNUNET_YES,
1913 NULL, 0, NULL, NULL);
1915 else if (NULL == slv->member)
1918 = GNUNET_MULTICAST_member_join (cfg, &chn->pub_key, &slv->priv_key,
1920 slv->relay_count, slv->relays,
1921 &slv->join_msg->header,
1922 &mcast_recv_join_request,
1923 &mcast_recv_join_decision,
1924 &mcast_recv_replay_fragment,
1925 &mcast_recv_replay_message,
1926 &mcast_recv_message, chn);
1927 if (NULL != slv->join_msg)
1929 GNUNET_free (slv->join_msg);
1930 slv->join_msg = NULL;
1933 else if (NULL != slv->join_dcsn)
1935 struct GNUNET_MQ_Envelope *
1936 env = GNUNET_MQ_msg_copy (&slv->join_dcsn->header);
1937 GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (client), env);
1941 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1942 "Client %p connected as slave to channel %s.\n",
1944 GNUNET_h2s (&chn->pub_key_hash));
1946 struct ClientList *cli = GNUNET_malloc (sizeof (*cli));
1947 cli->client = client;
1948 GNUNET_CONTAINER_DLL_insert (chn->clients_head, chn->clients_tail, cli);
1950 GNUNET_SERVICE_client_continue (client);
1954 struct JoinDecisionClosure
1956 int32_t is_admitted;
1957 struct GNUNET_MessageHeader *msg;
1962 * Iterator callback for sending join decisions to multicast.
1965 mcast_send_join_decision (void *cls, const struct GNUNET_HashCode *pub_key_hash,
1968 struct JoinDecisionClosure *jcls = cls;
1969 struct GNUNET_MULTICAST_JoinHandle *jh = value;
1970 // FIXME: add relays
1971 GNUNET_MULTICAST_join_decision (jh, jcls->is_admitted, 0, NULL, jcls->msg);
1977 check_client_join_decision (void *cls,
1978 const struct GNUNET_PSYC_JoinDecisionMessage *dcsn)
1985 * Join decision from client.
1988 handle_client_join_decision (void *cls,
1989 const struct GNUNET_PSYC_JoinDecisionMessage *dcsn)
1991 struct Client *c = cls;
1992 struct GNUNET_SERVICE_Client *client = c->client;
1993 struct Channel *chn = c->channel;
1997 GNUNET_SERVICE_client_drop (client);
2000 GNUNET_assert (GNUNET_YES == chn->is_master);
2001 struct Master *mst = chn->master;
2003 struct JoinDecisionClosure jcls;
2004 jcls.is_admitted = ntohl (dcsn->is_admitted);
2006 = (sizeof (*dcsn) + sizeof (*jcls.msg) <= ntohs (dcsn->header.size))
2007 ? (struct GNUNET_MessageHeader *) &dcsn[1]
2010 struct GNUNET_HashCode slave_pub_hash;
2011 GNUNET_CRYPTO_hash (&dcsn->slave_pub_key, sizeof (dcsn->slave_pub_key),
2014 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2015 "%p Got join decision (%d) from client for channel %s..\n",
2016 mst, jcls.is_admitted, GNUNET_h2s (&chn->pub_key_hash));
2017 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2018 "%p ..and slave %s.\n",
2019 mst, GNUNET_h2s (&slave_pub_hash));
2021 GNUNET_CONTAINER_multihashmap_get_multiple (mst->join_reqs, &slave_pub_hash,
2022 &mcast_send_join_decision, &jcls);
2023 GNUNET_CONTAINER_multihashmap_remove_all (mst->join_reqs, &slave_pub_hash);
2024 GNUNET_SERVICE_client_continue (client);
2029 channel_part_cb (void *cls)
2031 struct GNUNET_SERVICE_Client *client = cls;
2032 struct GNUNET_MQ_Envelope *env;
2034 env = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_PSYC_PART_ACK);
2035 GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (client),
2041 handle_client_part_request (void *cls,
2042 const struct GNUNET_MessageHeader *msg)
2044 struct Client *c = cls;
2046 c->channel->is_disconnecting = GNUNET_YES;
2047 if (GNUNET_YES == c->channel->is_master)
2049 struct Master *mst = (struct Master *) c->channel;
2051 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2052 "Got part request from master %p\n",
2054 GNUNET_assert (NULL != mst->origin);
2055 GNUNET_MULTICAST_origin_stop (mst->origin, channel_part_cb, c->client);
2059 struct Slave *slv = (struct Slave *) c->channel;
2061 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2062 "Got part request from slave %p\n",
2064 GNUNET_assert (NULL != slv->member);
2065 GNUNET_MULTICAST_member_part (slv->member, channel_part_cb, c->client);
2067 GNUNET_SERVICE_client_continue (c->client);
2072 * Send acknowledgement to a client.
2074 * Sent after a message fragment has been passed on to multicast.
2076 * @param chn The channel struct for the client.
2079 send_message_ack (struct Channel *chn, struct GNUNET_SERVICE_Client *client)
2081 struct GNUNET_MessageHeader *res;
2082 struct GNUNET_MQ_Envelope *
2083 env = GNUNET_MQ_msg (res, GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK);
2086 GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (client), env);
2091 * Callback for the transmit functions of multicast.
2094 transmit_notify (void *cls, size_t *data_size, void *data)
2096 struct Channel *chn = cls;
2097 struct TransmitMessage *tmit_msg = chn->tmit_head;
2099 if (NULL == tmit_msg || *data_size < tmit_msg->size)
2101 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2102 "%p transmit_notify: nothing to send.\n", chn);
2103 if (NULL != tmit_msg && *data_size < tmit_msg->size)
2109 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2110 "%p transmit_notify: sending %u bytes.\n", chn, tmit_msg->size);
2112 *data_size = tmit_msg->size;
2113 GNUNET_memcpy (data, &tmit_msg[1], *data_size);
2116 = (tmit_msg->last_ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END)
2120 /* FIXME: handle disconnecting clients */
2121 if (NULL != tmit_msg->client)
2122 send_message_ack (chn, tmit_msg->client);
2124 GNUNET_CONTAINER_DLL_remove (chn->tmit_head, chn->tmit_tail, tmit_msg);
2126 if (NULL != chn->tmit_head)
2128 GNUNET_SCHEDULER_add_now (&schedule_transmit_message, chn);
2130 else if (GNUNET_YES == chn->is_disconnecting
2131 && tmit_msg->last_ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END)
2133 /* FIXME: handle partial message (when still in_transmit) */
2134 GNUNET_free (tmit_msg);
2135 return GNUNET_SYSERR;
2137 GNUNET_free (tmit_msg);
2143 * Callback for the transmit functions of multicast.
2146 master_transmit_notify (void *cls, size_t *data_size, void *data)
2148 int ret = transmit_notify (cls, data_size, data);
2150 if (GNUNET_YES == ret)
2152 struct Master *mst = cls;
2153 mst->tmit_handle = NULL;
2160 * Callback for the transmit functions of multicast.
2163 slave_transmit_notify (void *cls, size_t *data_size, void *data)
2165 int ret = transmit_notify (cls, data_size, data);
2167 if (GNUNET_YES == ret)
2169 struct Slave *slv = cls;
2170 slv->tmit_handle = NULL;
2177 * Transmit a message from a channel master to the multicast group.
2180 master_transmit_message (struct Master *mst)
2182 struct Channel *chn = &mst->channel;
2183 struct TransmitMessage *tmit_msg = chn->tmit_head;
2184 if (NULL == tmit_msg)
2186 if (NULL == mst->tmit_handle)
2188 mst->tmit_handle = GNUNET_MULTICAST_origin_to_all (mst->origin,
2190 mst->max_group_generation,
2191 &master_transmit_notify,
2196 GNUNET_MULTICAST_origin_to_all_resume (mst->tmit_handle);
2202 * Transmit a message from a channel slave to the multicast group.
2205 slave_transmit_message (struct Slave *slv)
2207 if (NULL == slv->channel.tmit_head)
2209 if (NULL == slv->tmit_handle)
2211 slv->tmit_handle = GNUNET_MULTICAST_member_to_origin (slv->member,
2212 slv->channel.tmit_head->id,
2213 &slave_transmit_notify,
2218 GNUNET_MULTICAST_member_to_origin_resume (slv->tmit_handle);
2224 transmit_message (struct Channel *chn)
2227 ? master_transmit_message (chn->master)
2228 : slave_transmit_message (chn->slave);
2233 * Queue a message from a channel master for sending to the multicast group.
2236 master_queue_message (struct Master *mst, struct TransmitMessage *tmit_msg)
2238 if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == tmit_msg->first_ptype)
2240 tmit_msg->id = ++mst->max_message_id;
2241 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2242 "%p master_queue_message: message_id=%" PRIu64 "\n",
2244 struct GNUNET_PSYC_MessageMethod *pmeth
2245 = (struct GNUNET_PSYC_MessageMethod *) &tmit_msg[1];
2247 if (pmeth->flags & GNUNET_PSYC_MASTER_TRANSMIT_STATE_RESET)
2249 pmeth->state_delta = GNUNET_htonll (GNUNET_PSYC_STATE_RESET);
2251 else if (pmeth->flags & GNUNET_PSYC_MASTER_TRANSMIT_STATE_MODIFY)
2253 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2254 "%p master_queue_message: state_delta=%" PRIu64 "\n",
2255 mst, tmit_msg->id - mst->max_state_message_id);
2256 pmeth->state_delta = GNUNET_htonll (tmit_msg->id
2257 - mst->max_state_message_id);
2258 mst->max_state_message_id = tmit_msg->id;
2262 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2263 "%p master_queue_message: state not modified\n", mst);
2264 pmeth->state_delta = GNUNET_htonll (GNUNET_PSYC_STATE_NOT_MODIFIED);
2267 if (pmeth->flags & GNUNET_PSYC_MASTER_TRANSMIT_STATE_HASH)
2269 /// @todo add state_hash to PSYC header
2276 * Queue a message from a channel slave for sending to the multicast group.
2279 slave_queue_message (struct Slave *slv, struct TransmitMessage *tmit_msg)
2281 if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == tmit_msg->first_ptype)
2283 struct GNUNET_PSYC_MessageMethod *pmeth
2284 = (struct GNUNET_PSYC_MessageMethod *) &tmit_msg[1];
2285 pmeth->state_delta = GNUNET_htonll (GNUNET_PSYC_STATE_NOT_MODIFIED);
2286 tmit_msg->id = ++slv->max_request_id;
2292 * Queue PSYC message parts for sending to multicast.
2295 * Channel to send to.
2297 * Client the message originates from.
2301 * Concatenated message parts.
2302 * @param first_ptype
2303 * First message part type in @a data.
2305 * Last message part type in @a data.
2307 static struct TransmitMessage *
2308 queue_message (struct Channel *chn,
2309 struct GNUNET_SERVICE_Client *client,
2312 uint16_t first_ptype, uint16_t last_ptype)
2314 struct TransmitMessage *
2315 tmit_msg = GNUNET_malloc (sizeof (*tmit_msg) + data_size);
2316 GNUNET_memcpy (&tmit_msg[1], data, data_size);
2317 tmit_msg->client = client;
2318 tmit_msg->size = data_size;
2319 tmit_msg->first_ptype = first_ptype;
2320 tmit_msg->last_ptype = last_ptype;
2322 /* FIXME: separate queue per message ID */
2324 GNUNET_CONTAINER_DLL_insert_tail (chn->tmit_head, chn->tmit_tail, tmit_msg);
2327 ? master_queue_message (chn->master, tmit_msg)
2328 : slave_queue_message (chn->slave, tmit_msg);
2334 * Cancel transmission of current message.
2336 * @param chn Channel to send to.
2337 * @param client Client the message originates from.
2340 transmit_cancel (struct Channel *chn, struct GNUNET_SERVICE_Client *client)
2342 uint16_t type = GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL;
2344 struct GNUNET_MessageHeader msg;
2345 msg.size = htons (sizeof (msg));
2346 msg.type = htons (type);
2348 queue_message (chn, client, sizeof (msg), &msg, type, type);
2349 transmit_message (chn);
2351 /* FIXME: cleanup */
2356 check_client_psyc_message (void *cls,
2357 const struct GNUNET_MessageHeader *msg)
2364 * Incoming message from a master or slave client.
2367 handle_client_psyc_message (void *cls,
2368 const struct GNUNET_MessageHeader *msg)
2370 struct Client *c = cls;
2371 struct GNUNET_SERVICE_Client *client = c->client;
2372 struct Channel *chn = c->channel;
2376 GNUNET_SERVICE_client_drop (client);
2380 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2381 "%p Received message from client.\n", chn);
2382 GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG, msg);
2384 if (GNUNET_YES != chn->is_ready)
2386 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2387 "%p Channel is not ready yet, disconnecting client %p.\n",
2391 GNUNET_SERVICE_client_drop (client);
2395 uint16_t size = ntohs (msg->size);
2396 if (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < size - sizeof (*msg))
2398 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2399 "%p Message payload too large: %u < %u.\n",
2401 (unsigned int) GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD,
2402 (unsigned int) (size - sizeof (*msg)));
2404 transmit_cancel (chn, client);
2405 GNUNET_SERVICE_client_drop (client);
2409 uint16_t first_ptype = 0, last_ptype = 0;
2411 == GNUNET_PSYC_receive_check_parts (size - sizeof (*msg),
2412 (const char *) &msg[1],
2413 &first_ptype, &last_ptype))
2415 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2416 "%p Received invalid message part from client.\n", chn);
2418 transmit_cancel (chn, client);
2419 GNUNET_SERVICE_client_drop (client);
2422 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2423 "%p Received message with first part type %u and last part type %u.\n",
2424 chn, first_ptype, last_ptype);
2426 queue_message (chn, client, size - sizeof (*msg), &msg[1],
2427 first_ptype, last_ptype);
2428 transmit_message (chn);
2429 /* FIXME: send a few ACKs even before transmit_notify is called */
2431 GNUNET_SERVICE_client_continue (client);
2436 * Received result of GNUNET_PSYCSTORE_membership_store()
2439 store_recv_membership_store_result (void *cls,
2441 const char *err_msg,
2442 uint16_t err_msg_size)
2444 struct Operation *op = cls;
2445 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2446 "%p GNUNET_PSYCSTORE_membership_store() returned %" PRId64 " (%.*s)\n",
2452 if (NULL != op->client)
2453 client_send_result (op->client, op->op_id, result, err_msg, err_msg_size);
2459 * Client requests to add/remove a slave in the membership database.
2462 handle_client_membership_store (void *cls,
2463 const struct ChannelMembershipStoreRequest *req)
2465 struct Client *c = cls;
2466 struct GNUNET_SERVICE_Client *client = c->client;
2467 struct Channel *chn = c->channel;
2471 GNUNET_SERVICE_client_drop (client);
2475 struct Operation *op = op_add (chn, client, req->op_id, 0);
2477 uint64_t announced_at = GNUNET_ntohll (req->announced_at);
2478 uint64_t effective_since = GNUNET_ntohll (req->effective_since);
2479 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2480 "%p Received membership store request from client.\n", chn);
2481 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2482 "%p did_join: %u, announced_at: %" PRIu64 ", effective_since: %" PRIu64 "\n",
2483 chn, req->did_join, announced_at, effective_since);
2485 GNUNET_PSYCSTORE_membership_store (store, &chn->pub_key, &req->slave_pub_key,
2486 req->did_join, announced_at, effective_since,
2487 0, /* FIXME: group_generation */
2488 &store_recv_membership_store_result, op);
2489 GNUNET_SERVICE_client_continue (client);
2494 * Received a fragment for GNUNET_PSYCSTORE_fragment_get(),
2495 * in response to a history request from a client.
2498 store_recv_fragment_history (void *cls,
2499 struct GNUNET_MULTICAST_MessageHeader *mmsg,
2500 enum GNUNET_PSYCSTORE_MessageFlags flags)
2502 struct Operation *op = cls;
2503 if (NULL == op->client)
2504 { /* Requesting client already disconnected. */
2507 struct Channel *chn = op->channel;
2509 struct GNUNET_PSYC_MessageHeader *pmsg;
2510 uint16_t msize = ntohs (mmsg->header.size);
2511 uint16_t psize = sizeof (*pmsg) + msize - sizeof (*mmsg);
2513 struct GNUNET_OperationResultMessage *
2514 res = GNUNET_malloc (sizeof (*res) + psize);
2515 res->header.size = htons (sizeof (*res) + psize);
2516 res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_HISTORY_RESULT);
2517 res->op_id = op->op_id;
2518 res->result_code = GNUNET_htonll (GNUNET_OK);
2520 pmsg = (struct GNUNET_PSYC_MessageHeader *) &res[1];
2521 GNUNET_PSYC_message_header_init (pmsg, mmsg, flags | GNUNET_PSYC_MESSAGE_HISTORIC);
2522 GNUNET_memcpy (&res[1], pmsg, psize);
2524 /** @todo FIXME: send only to requesting client */
2525 client_send_msg (chn, &res->header);
2533 * Received the result of GNUNET_PSYCSTORE_fragment_get(),
2534 * in response to a history request from a client.
2537 store_recv_fragment_history_result (void *cls, int64_t result,
2538 const char *err_msg, uint16_t err_msg_size)
2540 struct Operation *op = cls;
2541 if (NULL == op->client)
2542 { /* Requesting client already disconnected. */
2546 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2547 "%p History replay #%" PRIu64 ": "
2548 "PSYCSTORE returned %" PRId64 " (%.*s)\n",
2549 op->channel, GNUNET_ntohll (op->op_id), result, err_msg_size, err_msg);
2551 if (op->flags & GNUNET_PSYC_HISTORY_REPLAY_REMOTE)
2553 /** @todo Multicast replay request for messages not found locally. */
2556 client_send_result (op->client, op->op_id, result, err_msg, err_msg_size);
2562 check_client_history_replay (void *cls,
2563 const struct GNUNET_PSYC_HistoryRequestMessage *req)
2570 * Client requests channel history.
2573 handle_client_history_replay (void *cls,
2574 const struct GNUNET_PSYC_HistoryRequestMessage *req)
2576 struct Client *c = cls;
2577 struct GNUNET_SERVICE_Client *client = c->client;
2578 struct Channel *chn = c->channel;
2582 GNUNET_SERVICE_client_drop (client);
2586 uint16_t size = ntohs (req->header.size);
2587 const char *method_prefix = (const char *) &req[1];
2589 if (size < sizeof (*req) + 1
2590 || '\0' != method_prefix[size - sizeof (*req) - 1])
2592 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2593 "%p History replay #%" PRIu64 ": "
2594 "invalid method prefix. size: %u < %u?\n",
2596 GNUNET_ntohll (req->op_id),
2598 (unsigned int) sizeof (*req) + 1);
2600 GNUNET_SERVICE_client_drop (client);
2604 struct Operation *op = op_add (chn, client, req->op_id, ntohl (req->flags));
2606 if (0 == req->message_limit)
2608 GNUNET_PSYCSTORE_message_get (store, &chn->pub_key, NULL,
2609 GNUNET_ntohll (req->start_message_id),
2610 GNUNET_ntohll (req->end_message_id),
2612 &store_recv_fragment_history,
2613 &store_recv_fragment_history_result, op);
2617 GNUNET_PSYCSTORE_message_get_latest (store, &chn->pub_key, NULL,
2618 GNUNET_ntohll (req->message_limit),
2620 &store_recv_fragment_history,
2621 &store_recv_fragment_history_result,
2624 GNUNET_SERVICE_client_continue (client);
2629 * Received state var from PSYCstore, send it to client.
2632 store_recv_state_var (void *cls, const char *name,
2633 const void *value, uint32_t value_size)
2635 struct Operation *op = cls;
2636 struct GNUNET_OperationResultMessage *res;
2637 struct GNUNET_MQ_Envelope *env;
2639 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2640 "%p state_get #%" PRIu64 " - received var from PSYCstore: %s\n",
2641 op->channel, GNUNET_ntohll (op->op_id), name);
2643 if (NULL != name) /* First part */
2645 uint16_t name_size = strnlen (name, GNUNET_PSYC_MODIFIER_MAX_PAYLOAD) + 1;
2646 struct GNUNET_PSYC_MessageModifier *mod;
2647 env = GNUNET_MQ_msg_extra (res,
2648 sizeof (*mod) + name_size + value_size,
2649 GNUNET_MESSAGE_TYPE_PSYC_STATE_RESULT);
2650 res->op_id = op->op_id;
2652 mod = (struct GNUNET_PSYC_MessageModifier *) &res[1];
2653 mod->header.size = htons (sizeof (*mod) + name_size + value_size);
2654 mod->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER);
2655 mod->name_size = htons (name_size);
2656 mod->value_size = htonl (value_size);
2657 mod->oper = htons (GNUNET_PSYC_OP_ASSIGN);
2658 GNUNET_memcpy (&mod[1], name, name_size);
2659 GNUNET_memcpy (((char *) &mod[1]) + name_size, value, value_size);
2661 else /* Continuation */
2663 struct GNUNET_MessageHeader *mod;
2664 env = GNUNET_MQ_msg_extra (res,
2665 sizeof (*mod) + value_size,
2666 GNUNET_MESSAGE_TYPE_PSYC_STATE_RESULT);
2667 res->op_id = op->op_id;
2669 mod = (struct GNUNET_MessageHeader *) &res[1];
2670 mod->size = htons (sizeof (*mod) + value_size);
2671 mod->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT);
2672 GNUNET_memcpy (&mod[1], value, value_size);
2675 // FIXME: client might have been disconnected
2676 GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (op->client), env);
2682 * Received result of GNUNET_PSYCSTORE_state_get()
2683 * or GNUNET_PSYCSTORE_state_get_prefix()
2686 store_recv_state_result (void *cls, int64_t result,
2687 const char *err_msg, uint16_t err_msg_size)
2689 struct Operation *op = cls;
2690 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2691 "%p state_get #%" PRIu64 ": "
2692 "PSYCSTORE returned %" PRId64 " (%.*s)\n",
2693 op->channel, GNUNET_ntohll (op->op_id), result, err_msg_size, err_msg);
2695 // FIXME: client might have been disconnected
2696 client_send_result (op->client, op->op_id, result, err_msg, err_msg_size);
2702 check_client_state_get (void *cls,
2703 const struct StateRequest *req)
2705 struct Client *c = cls;
2706 struct Channel *chn = c->channel;
2710 return GNUNET_SYSERR;
2713 uint16_t name_size = ntohs (req->header.size) - sizeof (*req);
2714 const char *name = (const char *) &req[1];
2715 if (0 == name_size || '\0' != name[name_size - 1])
2718 return GNUNET_SYSERR;
2726 * Client requests best matching state variable from PSYCstore.
2729 handle_client_state_get (void *cls,
2730 const struct StateRequest *req)
2732 struct Client *c = cls;
2733 struct GNUNET_SERVICE_Client *client = c->client;
2734 struct Channel *chn = c->channel;
2736 const char *name = (const char *) &req[1];
2737 struct Operation *op = op_add (chn, client, req->op_id, 0);
2738 GNUNET_PSYCSTORE_state_get (store, &chn->pub_key, name,
2739 &store_recv_state_var,
2740 &store_recv_state_result, op);
2741 GNUNET_SERVICE_client_continue (client);
2746 check_client_state_get_prefix (void *cls,
2747 const struct StateRequest *req)
2749 struct Client *c = cls;
2750 struct Channel *chn = c->channel;
2754 return GNUNET_SYSERR;
2757 uint16_t name_size = ntohs (req->header.size) - sizeof (*req);
2758 const char *name = (const char *) &req[1];
2759 if (0 == name_size || '\0' != name[name_size - 1])
2762 return GNUNET_SYSERR;
2770 * Client requests state variables with a given prefix from PSYCstore.
2773 handle_client_state_get_prefix (void *cls,
2774 const struct StateRequest *req)
2776 struct Client *c = cls;
2777 struct GNUNET_SERVICE_Client *client = c->client;
2778 struct Channel *chn = c->channel;
2780 const char *name = (const char *) &req[1];
2781 struct Operation *op = op_add (chn, client, req->op_id, 0);
2782 GNUNET_PSYCSTORE_state_get_prefix (store, &chn->pub_key, name,
2783 &store_recv_state_var,
2784 &store_recv_state_result, op);
2785 GNUNET_SERVICE_client_continue (client);
2790 * Initialize the PSYC service.
2792 * @param cls Closure.
2793 * @param server The initialized server.
2794 * @param c Configuration to use.
2798 const struct GNUNET_CONFIGURATION_Handle *c,
2799 struct GNUNET_SERVICE_Handle *svc)
2803 store = GNUNET_PSYCSTORE_connect (cfg);
2804 stats = GNUNET_STATISTICS_create ("psyc", cfg);
2805 masters = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
2806 slaves = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
2807 channel_slaves = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
2808 recv_cache = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
2809 GNUNET_SCHEDULER_add_shutdown (&shutdown_task, NULL);
2814 * Define "main" method using service macro.
2818 GNUNET_SERVICE_OPTION_NONE,
2820 &client_notify_connect,
2821 &client_notify_disconnect,
2823 GNUNET_MQ_hd_fixed_size (client_master_start,
2824 GNUNET_MESSAGE_TYPE_PSYC_MASTER_START,
2825 struct MasterStartRequest,
2827 GNUNET_MQ_hd_var_size (client_slave_join,
2828 GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN,
2829 struct SlaveJoinRequest,
2831 GNUNET_MQ_hd_var_size (client_join_decision,
2832 GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION,
2833 struct GNUNET_PSYC_JoinDecisionMessage,
2835 GNUNET_MQ_hd_fixed_size (client_part_request,
2836 GNUNET_MESSAGE_TYPE_PSYC_PART_REQUEST,
2837 struct GNUNET_MessageHeader,
2839 GNUNET_MQ_hd_var_size (client_psyc_message,
2840 GNUNET_MESSAGE_TYPE_PSYC_MESSAGE,
2841 struct GNUNET_MessageHeader,
2843 GNUNET_MQ_hd_fixed_size (client_membership_store,
2844 GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_MEMBERSHIP_STORE,
2845 struct ChannelMembershipStoreRequest,
2847 GNUNET_MQ_hd_var_size (client_history_replay,
2848 GNUNET_MESSAGE_TYPE_PSYC_HISTORY_REPLAY,
2849 struct GNUNET_PSYC_HistoryRequestMessage,
2851 GNUNET_MQ_hd_var_size (client_state_get,
2852 GNUNET_MESSAGE_TYPE_PSYC_STATE_GET,
2853 struct StateRequest,
2855 GNUNET_MQ_hd_var_size (client_state_get_prefix,
2856 GNUNET_MESSAGE_TYPE_PSYC_STATE_GET_PREFIX,
2857 struct StateRequest,
2860 /* end of gnunet-service-psyc.c */