2 * This file is part of GNUnet
3 * Copyright (C) 2013 GNUnet e.V.
5 * GNUnet is free software: you can redistribute it and/or modify it
6 * under the terms of the GNU Affero General Public License as published
7 * by the Free Software Foundation, either version 3 of the License,
8 * or (at your option) any later version.
10 * GNUnet is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * Affero General Public License for more details.
15 * You should have received a copy of the GNU Affero General Public License
16 * along with this program. If not, see <http://www.gnu.org/licenses/>.
20 * @file psyc/gnunet-service-psyc.c
22 * @author Gabor X Toth
28 #include "gnunet_util_lib.h"
29 #include "gnunet_constants.h"
30 #include "gnunet_protocols.h"
31 #include "gnunet_statistics_service.h"
32 #include "gnunet_multicast_service.h"
33 #include "gnunet_psycstore_service.h"
34 #include "gnunet_psyc_service.h"
35 #include "gnunet_psyc_util_lib.h"
40 * Handle to our current configuration.
42 static const struct GNUNET_CONFIGURATION_Handle *cfg;
47 static struct GNUNET_SERVICE_Handle *service;
50 * Handle to the statistics service.
52 static struct GNUNET_STATISTICS_Handle *stats;
55 * Handle to the PSYCstore.
57 static struct GNUNET_PSYCSTORE_Handle *store;
60 * All connected masters.
61 * Channel's pub_key_hash -> struct Master
63 static struct GNUNET_CONTAINER_MultiHashMap *masters;
66 * All connected slaves.
67 * Channel's pub_key_hash -> struct Slave
69 static struct GNUNET_CONTAINER_MultiHashMap *slaves;
72 * Connected slaves per channel.
73 * Channel's pub_key_hash -> Slave's pub_key -> struct Slave
75 static struct GNUNET_CONTAINER_MultiHashMap *channel_slaves;
79 * Message in the transmission queue.
81 struct TransmitMessage
83 struct TransmitMessage *prev;
84 struct TransmitMessage *next;
86 struct GNUNET_SERVICE_Client *client;
89 * ID assigned to the message.
99 * Type of first message part.
101 uint16_t first_ptype;
104 * Type of last message part.
108 /* Followed by message */
113 * Cache for received message fragments.
114 * Message fragments are only sent to clients after all modifiers arrived.
116 * chan_key -> MultiHashMap chan_msgs
118 static struct GNUNET_CONTAINER_MultiHashMap *recv_cache;
122 * Entry in the chan_msgs hashmap of @a recv_cache:
123 * fragment_id -> RecvCacheEntry
125 struct RecvCacheEntry
127 struct GNUNET_MULTICAST_MessageHeader *mmsg;
133 * Entry in the @a recv_frags hash map of a @a Channel.
134 * message_id -> FragmentQueue
139 * Fragment IDs stored in @a recv_cache.
141 struct GNUNET_CONTAINER_Heap *fragments;
144 * Total size of received fragments.
149 * Total size of received header fragments (METHOD & MODIFIERs)
151 uint64_t header_size;
154 * The @a state_delta field from struct GNUNET_PSYC_MessageMethod.
156 uint64_t state_delta;
159 * The @a flags field from struct GNUNET_PSYC_MessageMethod.
164 * Receive state of message.
166 * @see MessageFragmentState
171 * Whether the state is already modified in PSYCstore.
173 uint8_t state_is_modified;
176 * Is the message queued for delivery to the client?
177 * i.e. added to the recv_msgs queue
184 * List of connected clients.
188 struct ClientList *prev;
189 struct ClientList *next;
191 struct GNUNET_SERVICE_Client *client;
197 struct Operation *prev;
198 struct Operation *next;
200 struct GNUNET_SERVICE_Client *client;
201 struct Channel *channel;
208 * Common part of the client context for both a channel master and slave.
212 struct ClientList *clients_head;
213 struct ClientList *clients_tail;
215 struct Operation *op_head;
216 struct Operation *op_tail;
218 struct TransmitMessage *tmit_head;
219 struct TransmitMessage *tmit_tail;
222 * Current PSYCstore operation.
224 struct GNUNET_PSYCSTORE_OperationHandle *store_op;
227 * Received fragments not yet sent to the client.
228 * message_id -> FragmentQueue
230 struct GNUNET_CONTAINER_MultiHashMap *recv_frags;
233 * Received message IDs not yet sent to the client.
235 struct GNUNET_CONTAINER_Heap *recv_msgs;
238 * Public key of the channel.
240 struct GNUNET_CRYPTO_EddsaPublicKey pub_key;
243 * Hash of @a pub_key.
245 struct GNUNET_HashCode pub_key_hash;
248 * Last message ID sent to the client.
249 * 0 if there is no such message.
251 uint64_t max_message_id;
254 * ID of the last stateful message, where the state operations has been
255 * processed and saved to PSYCstore and which has been sent to the client.
256 * 0 if there is no such message.
258 uint64_t max_state_message_id;
261 * Expected value size for the modifier being received from the PSYC service.
263 uint32_t tmit_mod_value_size_expected;
266 * Actual value size for the modifier being received from the PSYC service.
268 uint32_t tmit_mod_value_size;
271 * Is this channel ready to receive messages from client?
272 * #GNUNET_YES or #GNUNET_NO
277 * Is the client disconnected?
278 * #GNUNET_YES or #GNUNET_NO
280 uint8_t is_disconnecting;
283 * Is this a channel master (#GNUNET_YES), or slave (#GNUNET_NO)?
288 struct Master *master;
295 * Client context for a channel master.
300 * Channel struct common for Master and Slave
302 struct Channel channel;
305 * Private key of the channel.
307 struct GNUNET_CRYPTO_EddsaPrivateKey priv_key;
310 * Handle for the multicast origin.
312 struct GNUNET_MULTICAST_Origin *origin;
315 * Transmit handle for multicast.
317 struct GNUNET_MULTICAST_OriginTransmitHandle *tmit_handle;
320 * Incoming join requests from multicast.
321 * member_pub_key -> struct GNUNET_MULTICAST_JoinHandle *
323 struct GNUNET_CONTAINER_MultiHashMap *join_reqs;
326 * Last message ID transmitted to this channel.
328 * Incremented before sending a message, thus the message_id in messages sent
331 uint64_t max_message_id;
334 * ID of the last message with state operations transmitted to the channel.
335 * 0 if there is no such message.
337 uint64_t max_state_message_id;
340 * Maximum group generation transmitted to the channel.
342 uint64_t max_group_generation;
345 * @see enum GNUNET_PSYC_Policy
347 enum GNUNET_PSYC_Policy policy;
352 * Client context for a channel slave.
357 * Channel struct common for Master and Slave
359 struct Channel channel;
362 * Private key of the slave.
364 struct GNUNET_CRYPTO_EcdsaPrivateKey priv_key;
367 * Public key of the slave.
369 struct GNUNET_CRYPTO_EcdsaPublicKey pub_key;
372 * Hash of @a pub_key.
374 struct GNUNET_HashCode pub_key_hash;
377 * Handle for the multicast member.
379 struct GNUNET_MULTICAST_Member *member;
382 * Transmit handle for multicast.
384 struct GNUNET_MULTICAST_MemberTransmitHandle *tmit_handle;
387 * Peer identity of the origin.
389 struct GNUNET_PeerIdentity origin;
392 * Number of items in @a relays.
394 uint32_t relay_count;
397 * Relays that multicast can use to connect.
399 struct GNUNET_PeerIdentity *relays;
402 * Join request to be transmitted to the master on join.
404 struct GNUNET_PSYC_Message *join_msg;
407 * Join decision received from multicast.
409 struct GNUNET_PSYC_JoinDecisionMessage *join_dcsn;
412 * Maximum request ID for this channel.
414 uint64_t max_request_id;
419 enum GNUNET_PSYC_SlaveJoinFlags join_flags;
427 struct GNUNET_SERVICE_Client *client;
428 struct Channel *channel;
432 struct ReplayRequestKey
434 uint64_t fragment_id;
436 uint64_t fragment_offset;
442 transmit_message (struct Channel *chn);
445 message_queue_run (struct Channel *chn);
448 message_queue_drop (struct Channel *chn);
452 schedule_transmit_message (void *cls)
454 struct Channel *chn = cls;
456 transmit_message (chn);
461 * Task run during shutdown.
466 shutdown_task (void *cls)
468 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
469 "shutting down...\n");
470 GNUNET_PSYCSTORE_disconnect (store);
473 GNUNET_STATISTICS_destroy (stats, GNUNET_YES);
479 static struct Operation *
480 op_add (struct Channel *chn, struct GNUNET_SERVICE_Client *client,
481 uint64_t op_id, uint32_t flags)
483 struct Operation *op = GNUNET_malloc (sizeof (*op));
488 GNUNET_CONTAINER_DLL_insert (chn->op_head, chn->op_tail, op);
494 op_remove (struct Operation *op)
496 GNUNET_CONTAINER_DLL_remove (op->channel->op_head, op->channel->op_tail, op);
502 * Clean up master data structures after a client disconnected.
505 cleanup_master (struct Master *mst)
507 struct Channel *chn = &mst->channel;
509 GNUNET_CONTAINER_multihashmap_destroy (mst->join_reqs);
510 GNUNET_CONTAINER_multihashmap_remove (masters, &chn->pub_key_hash, mst);
515 * Clean up slave data structures after a client disconnected.
518 cleanup_slave (struct Slave *slv)
520 struct Channel *chn = &slv->channel;
521 struct GNUNET_CONTAINER_MultiHashMap *
522 chn_slv = GNUNET_CONTAINER_multihashmap_get (channel_slaves,
524 GNUNET_assert (NULL != chn_slv);
525 GNUNET_CONTAINER_multihashmap_remove (chn_slv, &slv->pub_key_hash, slv);
527 if (0 == GNUNET_CONTAINER_multihashmap_size (chn_slv))
529 GNUNET_CONTAINER_multihashmap_remove (channel_slaves, &chn->pub_key_hash,
531 GNUNET_CONTAINER_multihashmap_destroy (chn_slv);
533 GNUNET_CONTAINER_multihashmap_remove (slaves, &chn->pub_key_hash, slv);
535 if (NULL != slv->join_msg)
537 GNUNET_free (slv->join_msg);
538 slv->join_msg = NULL;
540 if (NULL != slv->relays)
542 GNUNET_free (slv->relays);
545 GNUNET_CONTAINER_multihashmap_remove (slaves, &chn->pub_key_hash, slv);
550 * Clean up channel data structures after a client disconnected.
553 cleanup_channel (struct Channel *chn)
555 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
556 "%p Cleaning up channel %s. master? %u\n",
558 GNUNET_h2s (&chn->pub_key_hash),
560 message_queue_drop (chn);
561 GNUNET_CONTAINER_multihashmap_destroy (chn->recv_frags);
562 chn->recv_frags = NULL;
564 if (NULL != chn->store_op)
566 GNUNET_PSYCSTORE_operation_cancel (chn->store_op);
567 chn->store_op = NULL;
570 (GNUNET_YES == chn->is_master)
571 ? cleanup_master (chn->master)
572 : cleanup_slave (chn->slave);
578 * Called whenever a client is disconnected.
579 * Frees our resources associated with that client.
582 * @param client identification of the client
583 * @param app_ctx must match @a client
586 client_notify_disconnect (void *cls,
587 struct GNUNET_SERVICE_Client *client,
590 struct Client *c = app_ctx;
591 struct Channel *chn = c->channel;
596 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
597 "%p User context is NULL in client_notify_disconnect ()\n",
603 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
604 "%p Client %p (%s) disconnected from channel %s\n",
607 (GNUNET_YES == chn->is_master) ? "master" : "slave",
608 GNUNET_h2s (&chn->pub_key_hash));
610 struct ClientList *cli = chn->clients_head;
613 if (cli->client == client)
615 GNUNET_CONTAINER_DLL_remove (chn->clients_head, chn->clients_tail, cli);
622 struct Operation *op = chn->op_head;
625 if (op->client == client)
633 if (NULL == chn->clients_head)
634 { /* Last client disconnected. */
635 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
636 "%p Last client (%s) disconnected from channel %s\n",
638 (GNUNET_YES == chn->is_master) ? "master" : "slave",
639 GNUNET_h2s (&chn->pub_key_hash));
640 chn->is_disconnecting = GNUNET_YES;
641 cleanup_channel (chn);
647 * A new client connected.
650 * @param client client to add
651 * @param mq message queue for @a client
655 client_notify_connect (void *cls,
656 struct GNUNET_SERVICE_Client *client,
657 struct GNUNET_MQ_Handle *mq)
659 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Client connected: %p\n", client);
661 struct Client *c = GNUNET_malloc (sizeof (*c));
669 * Send message to all clients connected to the channel.
672 client_send_msg (const struct Channel *chn,
673 const struct GNUNET_MessageHeader *msg)
675 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
676 "Sending message to clients of channel %p.\n",
679 struct ClientList *cli = chn->clients_head;
682 struct GNUNET_MQ_Envelope *
683 env = GNUNET_MQ_msg_copy (msg);
685 GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (cli->client),
693 * Send a result code back to the client.
696 * Client that should receive the result code.
700 * Operation ID in network byte order.
702 * Data payload or NULL.
707 client_send_result (struct GNUNET_SERVICE_Client *client, uint64_t op_id,
708 int64_t result_code, const void *data, uint16_t data_size)
710 struct GNUNET_OperationResultMessage *res;
711 struct GNUNET_MQ_Envelope *
712 env = GNUNET_MQ_msg_extra (res,
714 GNUNET_MESSAGE_TYPE_PSYC_RESULT_CODE);
715 res->result_code = GNUNET_htonll (result_code);
718 GNUNET_memcpy (&res[1], data, data_size);
720 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
721 "%p Sending result to client for OP ID %" PRIu64 ": %" PRId64 " (size: %u)\n",
723 GNUNET_ntohll (op_id),
727 GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (client), env);
732 * Closure for join_mem_test_cb()
734 struct JoinMemTestClosure
736 struct GNUNET_CRYPTO_EcdsaPublicKey slave_pub_key;
737 struct Channel *channel;
738 struct GNUNET_MULTICAST_JoinHandle *join_handle;
739 struct GNUNET_PSYC_JoinRequestMessage *join_msg;
744 * Membership test result callback used for join requests.
747 join_mem_test_cb (void *cls, int64_t result,
748 const char *err_msg, uint16_t err_msg_size)
750 struct JoinMemTestClosure *jcls = cls;
752 if (GNUNET_NO == result && GNUNET_YES == jcls->channel->is_master)
753 { /* Pass on join request to client if this is a master channel */
754 struct Master *mst = jcls->channel->master;
755 struct GNUNET_HashCode slave_pub_hash;
756 GNUNET_CRYPTO_hash (&jcls->slave_pub_key, sizeof (jcls->slave_pub_key),
758 GNUNET_CONTAINER_multihashmap_put (mst->join_reqs, &slave_pub_hash, jcls->join_handle,
759 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
760 client_send_msg (jcls->channel, &jcls->join_msg->header);
764 if (GNUNET_SYSERR == result)
766 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
767 "Could not perform membership test (%.*s)\n",
768 err_msg_size, err_msg);
771 GNUNET_MULTICAST_join_decision (jcls->join_handle, result, 0, NULL, NULL);
773 GNUNET_free (jcls->join_msg);
779 * Incoming join request from multicast.
782 mcast_recv_join_request (void *cls,
783 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_pub_key,
784 const struct GNUNET_MessageHeader *join_msg,
785 struct GNUNET_MULTICAST_JoinHandle *jh)
787 struct Channel *chn = cls;
788 uint16_t join_msg_size = 0;
790 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
791 "%p Got join request.\n",
793 if (NULL != join_msg)
795 if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE == ntohs (join_msg->type))
797 join_msg_size = ntohs (join_msg->size);
801 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
802 "%p Got join message with invalid type %u.\n",
804 ntohs (join_msg->type));
808 struct GNUNET_PSYC_JoinRequestMessage *
809 req = GNUNET_malloc (sizeof (*req) + join_msg_size);
810 req->header.size = htons (sizeof (*req) + join_msg_size);
811 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_JOIN_REQUEST);
812 req->slave_pub_key = *slave_pub_key;
813 if (0 < join_msg_size)
814 GNUNET_memcpy (&req[1], join_msg, join_msg_size);
816 struct JoinMemTestClosure *jcls = GNUNET_malloc (sizeof (*jcls));
817 jcls->slave_pub_key = *slave_pub_key;
819 jcls->join_handle = jh;
820 jcls->join_msg = req;
822 GNUNET_PSYCSTORE_membership_test (store, &chn->pub_key, slave_pub_key,
823 chn->max_message_id, 0,
824 &join_mem_test_cb, jcls);
829 * Join decision received from multicast.
832 mcast_recv_join_decision (void *cls, int is_admitted,
833 const struct GNUNET_PeerIdentity *peer,
834 uint16_t relay_count,
835 const struct GNUNET_PeerIdentity *relays,
836 const struct GNUNET_MessageHeader *join_resp)
838 struct Slave *slv = cls;
839 struct Channel *chn = &slv->channel;
840 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
841 "%p Got join decision: %d\n",
844 if (GNUNET_YES == chn->is_ready)
846 /* Already admitted */
850 uint16_t join_resp_size = (NULL != join_resp) ? ntohs (join_resp->size) : 0;
851 struct GNUNET_PSYC_JoinDecisionMessage *
852 dcsn = slv->join_dcsn = GNUNET_malloc (sizeof (*dcsn) + join_resp_size);
853 dcsn->header.size = htons (sizeof (*dcsn) + join_resp_size);
854 dcsn->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION);
855 dcsn->is_admitted = htonl (is_admitted);
856 if (0 < join_resp_size)
857 GNUNET_memcpy (&dcsn[1], join_resp, join_resp_size);
859 client_send_msg (chn, &dcsn->header);
861 if (GNUNET_YES == is_admitted
862 && ! (GNUNET_PSYC_SLAVE_JOIN_LOCAL & slv->join_flags))
864 chn->is_ready = GNUNET_YES;
870 store_recv_fragment_replay (void *cls,
871 struct GNUNET_MULTICAST_MessageHeader *msg,
872 enum GNUNET_PSYCSTORE_MessageFlags flags)
874 struct GNUNET_MULTICAST_ReplayHandle *rh = cls;
876 GNUNET_MULTICAST_replay_response (rh, &msg->header, GNUNET_MULTICAST_REC_OK);
882 * Received result of GNUNET_PSYCSTORE_fragment_get() for multicast replay.
885 store_recv_fragment_replay_result (void *cls,
888 uint16_t err_msg_size)
890 struct GNUNET_MULTICAST_ReplayHandle *rh = cls;
892 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
893 "%p Fragment replay: PSYCSTORE returned %" PRId64 " (%.*s)\n",
904 GNUNET_MULTICAST_replay_response (rh, NULL,
905 GNUNET_MULTICAST_REC_NOT_FOUND);
908 case GNUNET_PSYCSTORE_MEMBERSHIP_TEST_FAILED:
909 GNUNET_MULTICAST_replay_response (rh, NULL,
910 GNUNET_MULTICAST_REC_ACCESS_DENIED);
914 GNUNET_MULTICAST_replay_response (rh, NULL,
915 GNUNET_MULTICAST_REC_INTERNAL_ERROR);
918 /* GNUNET_MULTICAST_replay_response frees 'rh' when passed
919 * an error code, so it must be ensured no further processing
920 * is attempted on 'rh'. Maybe this should be refactored as
921 * it doesn't look very intuitive. --lynX
923 GNUNET_MULTICAST_replay_response_end (rh);
928 * Incoming fragment replay request from multicast.
931 mcast_recv_replay_fragment (void *cls,
932 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_pub_key,
933 uint64_t fragment_id, uint64_t flags,
934 struct GNUNET_MULTICAST_ReplayHandle *rh)
937 struct Channel *chn = cls;
938 GNUNET_PSYCSTORE_fragment_get (store, &chn->pub_key, slave_pub_key,
939 fragment_id, fragment_id,
940 &store_recv_fragment_replay,
941 &store_recv_fragment_replay_result, rh);
946 * Incoming message replay request from multicast.
949 mcast_recv_replay_message (void *cls,
950 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_pub_key,
952 uint64_t fragment_offset,
954 struct GNUNET_MULTICAST_ReplayHandle *rh)
956 struct Channel *chn = cls;
957 GNUNET_PSYCSTORE_message_get (store, &chn->pub_key, slave_pub_key,
958 message_id, message_id, 1, NULL,
959 &store_recv_fragment_replay,
960 &store_recv_fragment_replay_result, rh);
965 * Convert an uint64_t in network byte order to a HashCode
966 * that can be used as key in a MultiHashMap
969 hash_key_from_nll (struct GNUNET_HashCode *key, uint64_t n)
971 /* use little-endian order, as idx_of MultiHashMap casts key to unsigned int */
972 /* TODO: use built-in byte swap functions if available */
974 n = ((n << 8) & 0xFF00FF00FF00FF00ULL) | ((n >> 8) & 0x00FF00FF00FF00FFULL);
975 n = ((n << 16) & 0xFFFF0000FFFF0000ULL) | ((n >> 16) & 0x0000FFFF0000FFFFULL);
977 *key = (struct GNUNET_HashCode) {};
979 = (n << 32) | (n >> 32);
984 * Convert an uint64_t in host byte order to a HashCode
985 * that can be used as key in a MultiHashMap
988 hash_key_from_hll (struct GNUNET_HashCode *key, uint64_t n)
990 #if __BYTE_ORDER == __BIG_ENDIAN
991 hash_key_from_nll (key, n);
992 #elif __BYTE_ORDER == __LITTLE_ENDIAN
993 *key = (struct GNUNET_HashCode) {};
994 *((uint64_t *) key) = n;
996 #error byteorder undefined
1002 * Initialize PSYC message header.
1005 psyc_msg_init (struct GNUNET_PSYC_MessageHeader *pmsg,
1006 const struct GNUNET_MULTICAST_MessageHeader *mmsg, uint32_t flags)
1008 uint16_t size = ntohs (mmsg->header.size);
1009 uint16_t psize = sizeof (*pmsg) + size - sizeof (*mmsg);
1011 pmsg->header.size = htons (psize);
1012 pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
1013 pmsg->message_id = mmsg->message_id;
1014 pmsg->fragment_offset = mmsg->fragment_offset;
1015 pmsg->flags = htonl (flags);
1017 GNUNET_memcpy (&pmsg[1], &mmsg[1], size - sizeof (*mmsg));
1022 * Create a new PSYC message from a multicast message for sending it to clients.
1024 static inline struct GNUNET_PSYC_MessageHeader *
1025 psyc_msg_new (const struct GNUNET_MULTICAST_MessageHeader *mmsg, uint32_t flags)
1027 struct GNUNET_PSYC_MessageHeader *pmsg;
1028 uint16_t size = ntohs (mmsg->header.size);
1029 uint16_t psize = sizeof (*pmsg) + size - sizeof (*mmsg);
1031 pmsg = GNUNET_malloc (psize);
1032 psyc_msg_init (pmsg, mmsg, flags);
1038 * Send multicast message to all clients connected to the channel.
1041 client_send_mcast_msg (struct Channel *chn,
1042 const struct GNUNET_MULTICAST_MessageHeader *mmsg,
1045 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1046 "%p Sending multicast message to client. fragment_id: %" PRIu64 ", message_id: %" PRIu64 "\n",
1048 GNUNET_ntohll (mmsg->fragment_id),
1049 GNUNET_ntohll (mmsg->message_id));
1051 struct GNUNET_PSYC_MessageHeader *
1052 pmsg = GNUNET_PSYC_message_header_create (mmsg, flags);
1053 client_send_msg (chn, &pmsg->header);
1059 * Send multicast request to all clients connected to the channel.
1062 client_send_mcast_req (struct Master *mst,
1063 const struct GNUNET_MULTICAST_RequestHeader *req)
1065 struct Channel *chn = &mst->channel;
1067 struct GNUNET_PSYC_MessageHeader *pmsg;
1068 uint16_t size = ntohs (req->header.size);
1069 uint16_t psize = sizeof (*pmsg) + size - sizeof (*req);
1071 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1072 "%p Sending multicast request to client. fragment_id: %" PRIu64 ", message_id: %" PRIu64 "\n",
1074 GNUNET_ntohll (req->fragment_id),
1075 GNUNET_ntohll (req->request_id));
1077 pmsg = GNUNET_malloc (psize);
1078 pmsg->header.size = htons (psize);
1079 pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
1080 pmsg->message_id = req->request_id;
1081 pmsg->fragment_offset = req->fragment_offset;
1082 pmsg->flags = htonl (GNUNET_PSYC_MESSAGE_REQUEST);
1083 pmsg->slave_pub_key = req->member_pub_key;
1084 GNUNET_memcpy (&pmsg[1], &req[1], size - sizeof (*req));
1086 client_send_msg (chn, &pmsg->header);
1088 /* FIXME: save req to PSYCstore so that it can be resent later to clients */
1095 * Insert a multicast message fragment into the queue belonging to the message.
1097 * @param chn Channel.
1098 * @param mmsg Multicast message fragment.
1099 * @param msg_id_hash Message ID of @a mmsg in a struct GNUNET_HashCode.
1100 * @param first_ptype First PSYC message part type in @a mmsg.
1101 * @param last_ptype Last PSYC message part type in @a mmsg.
1104 fragment_queue_insert (struct Channel *chn,
1105 const struct GNUNET_MULTICAST_MessageHeader *mmsg,
1106 uint16_t first_ptype, uint16_t last_ptype)
1108 const uint16_t size = ntohs (mmsg->header.size);
1109 const uint64_t frag_offset = GNUNET_ntohll (mmsg->fragment_offset);
1110 struct GNUNET_CONTAINER_MultiHashMap
1111 *chan_msgs = GNUNET_CONTAINER_multihashmap_get (recv_cache,
1112 &chn->pub_key_hash);
1114 struct GNUNET_HashCode msg_id_hash;
1115 hash_key_from_nll (&msg_id_hash, mmsg->message_id);
1117 struct FragmentQueue
1118 *fragq = GNUNET_CONTAINER_multihashmap_get (chn->recv_frags, &msg_id_hash);
1122 fragq = GNUNET_malloc (sizeof (*fragq));
1123 fragq->state = MSG_FRAG_STATE_HEADER;
1125 = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
1127 GNUNET_CONTAINER_multihashmap_put (chn->recv_frags, &msg_id_hash, fragq,
1128 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1130 if (NULL == chan_msgs)
1132 chan_msgs = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
1133 GNUNET_CONTAINER_multihashmap_put (recv_cache, &chn->pub_key_hash, chan_msgs,
1134 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1138 struct GNUNET_HashCode frag_id_hash;
1139 hash_key_from_nll (&frag_id_hash, mmsg->fragment_id);
1140 struct RecvCacheEntry
1141 *cache_entry = GNUNET_CONTAINER_multihashmap_get (chan_msgs, &frag_id_hash);
1142 if (NULL == cache_entry)
1144 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1145 "%p Adding message fragment to cache. message_id: %" PRIu64 ", fragment_id: %" PRIu64 "\n",
1147 GNUNET_ntohll (mmsg->message_id),
1148 GNUNET_ntohll (mmsg->fragment_id));
1149 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1150 "%p header_size: %" PRIu64 " + %u\n",
1154 cache_entry = GNUNET_malloc (sizeof (*cache_entry));
1155 cache_entry->ref_count = 1;
1156 cache_entry->mmsg = GNUNET_malloc (size);
1157 GNUNET_memcpy (cache_entry->mmsg, mmsg, size);
1158 GNUNET_CONTAINER_multihashmap_put (chan_msgs, &frag_id_hash, cache_entry,
1159 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1163 cache_entry->ref_count++;
1164 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1165 "%p Message fragment is already in cache. message_id: %" PRIu64 ", fragment_id: %" PRIu64 ", ref_count: %u\n",
1167 GNUNET_ntohll (mmsg->message_id),
1168 GNUNET_ntohll (mmsg->fragment_id),
1169 cache_entry->ref_count);
1172 if (MSG_FRAG_STATE_HEADER == fragq->state)
1174 if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype)
1176 struct GNUNET_PSYC_MessageMethod *
1177 pmeth = (struct GNUNET_PSYC_MessageMethod *) &mmsg[1];
1178 fragq->state_delta = GNUNET_ntohll (pmeth->state_delta);
1179 fragq->flags = ntohl (pmeth->flags);
1182 if (last_ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA)
1184 fragq->header_size += size;
1186 else if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype
1187 || frag_offset == fragq->header_size)
1188 { /* header is now complete */
1189 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1190 "%p Header of message %" PRIu64 " is complete.\n",
1192 GNUNET_ntohll (mmsg->message_id));
1194 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1195 "%p Adding message %" PRIu64 " to queue.\n",
1197 GNUNET_ntohll (mmsg->message_id));
1198 fragq->state = MSG_FRAG_STATE_DATA;
1202 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1203 "%p Header of message %" PRIu64 " is NOT complete yet: %" PRIu64 " != %" PRIu64 "\n",
1205 GNUNET_ntohll (mmsg->message_id),
1207 fragq->header_size);
1213 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END:
1214 if (frag_offset == fragq->size)
1215 fragq->state = MSG_FRAG_STATE_END;
1217 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1218 "%p Message %" PRIu64 " is NOT complete yet: %" PRIu64 " != %" PRIu64 "\n",
1220 GNUNET_ntohll (mmsg->message_id),
1225 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL:
1226 /* Drop message without delivering to client if it's a single fragment */
1228 (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype)
1229 ? MSG_FRAG_STATE_DROP
1230 : MSG_FRAG_STATE_CANCEL;
1233 switch (fragq->state)
1235 case MSG_FRAG_STATE_DATA:
1236 case MSG_FRAG_STATE_END:
1237 case MSG_FRAG_STATE_CANCEL:
1238 if (GNUNET_NO == fragq->is_queued)
1240 GNUNET_CONTAINER_heap_insert (chn->recv_msgs, NULL,
1241 GNUNET_ntohll (mmsg->message_id));
1242 fragq->is_queued = GNUNET_YES;
1246 fragq->size += size;
1247 GNUNET_CONTAINER_heap_insert (fragq->fragments, NULL,
1248 GNUNET_ntohll (mmsg->fragment_id));
1253 * Run fragment queue of a message.
1255 * Send fragments of a message in order to client, after all modifiers arrived
1261 * ID of the message @a fragq belongs to.
1263 * Fragment queue of the message.
1265 * Drop message without delivering to client?
1266 * #GNUNET_YES or #GNUNET_NO.
1269 fragment_queue_run (struct Channel *chn, uint64_t msg_id,
1270 struct FragmentQueue *fragq, uint8_t drop)
1272 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1273 "%p Running message fragment queue for message %" PRIu64 " (state: %u).\n",
1278 struct GNUNET_CONTAINER_MultiHashMap
1279 *chan_msgs = GNUNET_CONTAINER_multihashmap_get (recv_cache,
1280 &chn->pub_key_hash);
1281 GNUNET_assert (NULL != chan_msgs);
1284 while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (fragq->fragments, NULL,
1287 struct GNUNET_HashCode frag_id_hash;
1288 hash_key_from_hll (&frag_id_hash, frag_id);
1289 struct RecvCacheEntry *cache_entry
1290 = GNUNET_CONTAINER_multihashmap_get (chan_msgs, &frag_id_hash);
1291 if (cache_entry != NULL)
1293 if (GNUNET_NO == drop)
1295 client_send_mcast_msg (chn, cache_entry->mmsg, 0);
1297 if (cache_entry->ref_count <= 1)
1299 GNUNET_CONTAINER_multihashmap_remove (chan_msgs, &frag_id_hash,
1301 GNUNET_free (cache_entry->mmsg);
1302 GNUNET_free (cache_entry);
1306 cache_entry->ref_count--;
1309 #if CACHE_AGING_IMPLEMENTED
1310 else if (GNUNET_NO == drop)
1312 /* TODO: fragment not in cache anymore, retrieve it from PSYCstore */
1316 GNUNET_CONTAINER_heap_remove_root (fragq->fragments);
1319 if (MSG_FRAG_STATE_END <= fragq->state)
1321 struct GNUNET_HashCode msg_id_hash;
1322 hash_key_from_hll (&msg_id_hash, msg_id);
1324 GNUNET_CONTAINER_multihashmap_remove (chn->recv_frags, &msg_id_hash, fragq);
1325 GNUNET_CONTAINER_heap_destroy (fragq->fragments);
1326 GNUNET_free (fragq);
1330 fragq->is_queued = GNUNET_NO;
1335 struct StateModifyClosure
1337 struct Channel *channel;
1339 struct GNUNET_HashCode msg_id_hash;
1344 store_recv_state_modify_result (void *cls, int64_t result,
1345 const char *err_msg, uint16_t err_msg_size)
1347 struct StateModifyClosure *mcls = cls;
1348 struct Channel *chn = mcls->channel;
1349 uint64_t msg_id = mcls->msg_id;
1351 struct FragmentQueue *
1352 fragq = GNUNET_CONTAINER_multihashmap_get (chn->recv_frags, &mcls->msg_id_hash);
1354 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1355 "%p GNUNET_PSYCSTORE_state_modify() returned %" PRId64 " (%.*s)\n",
1356 chn, result, err_msg_size, err_msg);
1363 fragq->state_is_modified = GNUNET_YES;
1364 if (chn->max_state_message_id < msg_id)
1365 chn->max_state_message_id = msg_id;
1366 if (chn->max_message_id < msg_id)
1367 chn->max_message_id = msg_id;
1370 fragment_queue_run (chn, msg_id, fragq, MSG_FRAG_STATE_DROP == fragq->state);
1371 GNUNET_CONTAINER_heap_remove_root (chn->recv_msgs);
1372 message_queue_run (chn);
1376 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1377 "%p GNUNET_PSYCSTORE_state_modify() failed with error %" PRId64 " (%.*s)\n",
1378 chn, result, err_msg_size, err_msg);
1379 /** @todo FIXME: handle state_modify error */
1385 * Run message queue.
1387 * Send messages in queue to client in order after a message has arrived from
1388 * multicast, according to the following:
1389 * - A message is only sent if all of its modifiers arrived.
1390 * - A stateful message is only sent if the previous stateful message
1391 * has already been delivered to the client.
1393 * @param chn Channel.
1395 * @return Number of messages removed from queue and sent to client.
1398 message_queue_run (struct Channel *chn)
1400 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1401 "%p Running message queue.\n", chn);
1405 while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (chn->recv_msgs, NULL,
1408 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1409 "%p Processing message %" PRIu64 " in queue.\n", chn, msg_id);
1410 struct GNUNET_HashCode msg_id_hash;
1411 hash_key_from_hll (&msg_id_hash, msg_id);
1413 struct FragmentQueue *
1414 fragq = GNUNET_CONTAINER_multihashmap_get (chn->recv_frags, &msg_id_hash);
1416 if (NULL == fragq || fragq->state <= MSG_FRAG_STATE_HEADER)
1418 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1419 "%p No fragq (%p) or header not complete.\n",
1424 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1425 "%p Fragment queue entry: state: %u, state delta: "
1426 "%" PRIu64 " - %" PRIu64 " ?= %" PRIu64 "\n",
1427 chn, fragq->state, msg_id, fragq->state_delta, chn->max_state_message_id);
1429 if (MSG_FRAG_STATE_DATA <= fragq->state)
1431 /* Check if there's a missing message before the current one */
1432 if (GNUNET_PSYC_STATE_NOT_MODIFIED == fragq->state_delta)
1434 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p state NOT modified\n", chn);
1436 if (!(fragq->flags & GNUNET_PSYC_MESSAGE_ORDER_ANY)
1437 && (chn->max_message_id != msg_id - 1
1438 && chn->max_message_id != msg_id))
1440 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1441 "%p Out of order message. "
1442 "(%" PRIu64 " != %" PRIu64 " - 1)\n",
1443 chn, chn->max_message_id, msg_id);
1445 // FIXME: keep track of messages processed in this queue run,
1446 // and only stop after reaching the end
1451 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p state modified\n", chn);
1452 if (GNUNET_YES != fragq->state_is_modified)
1454 if (msg_id - fragq->state_delta != chn->max_state_message_id)
1456 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1457 "%p Out of order stateful message. "
1458 "(%" PRIu64 " - %" PRIu64 " != %" PRIu64 ")\n",
1459 chn, msg_id, fragq->state_delta, chn->max_state_message_id);
1461 // FIXME: keep track of messages processed in this queue run,
1462 // and only stop after reaching the end
1465 struct StateModifyClosure *mcls = GNUNET_malloc (sizeof (*mcls));
1466 mcls->channel = chn;
1467 mcls->msg_id = msg_id;
1468 mcls->msg_id_hash = msg_id_hash;
1470 /* Apply modifiers to state in PSYCstore */
1471 GNUNET_PSYCSTORE_state_modify (store, &chn->pub_key, msg_id,
1473 store_recv_state_modify_result, mcls);
1474 break; // continue after asynchronous state modify result
1477 chn->max_message_id = msg_id;
1479 fragment_queue_run (chn, msg_id, fragq, MSG_FRAG_STATE_DROP == fragq->state);
1480 GNUNET_CONTAINER_heap_remove_root (chn->recv_msgs);
1484 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1485 "%p Removed %" PRIu64 " messages from queue.\n", chn, n);
1491 * Drop message queue of a channel.
1493 * Remove all messages in queue without sending it to clients.
1495 * @param chn Channel.
1497 * @return Number of messages removed from queue.
1500 message_queue_drop (struct Channel *chn)
1504 while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (chn->recv_msgs, NULL,
1507 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1508 "%p Dropping message %" PRIu64 " from queue.\n", chn, msg_id);
1509 struct GNUNET_HashCode msg_id_hash;
1510 hash_key_from_hll (&msg_id_hash, msg_id);
1512 struct FragmentQueue *
1513 fragq = GNUNET_CONTAINER_multihashmap_get (chn->recv_frags, &msg_id_hash);
1514 GNUNET_assert (NULL != fragq);
1515 fragment_queue_run (chn, msg_id, fragq, GNUNET_YES);
1516 GNUNET_CONTAINER_heap_remove_root (chn->recv_msgs);
1519 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1520 "%p Removed %" PRIu64 " messages from queue.\n", chn, n);
1526 * Received result of GNUNET_PSYCSTORE_fragment_store().
1529 store_recv_fragment_store_result (void *cls, int64_t result,
1530 const char *err_msg, uint16_t err_msg_size)
1532 struct Channel *chn = cls;
1533 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1534 "%p GNUNET_PSYCSTORE_fragment_store() returned %" PRId64 " (%.*s)\n",
1535 chn, result, err_msg_size, err_msg);
1540 * Handle incoming message fragment from multicast.
1542 * Store it using PSYCstore and send it to the clients of the channel in order.
1545 mcast_recv_message (void *cls, const struct GNUNET_MULTICAST_MessageHeader *mmsg)
1547 struct Channel *chn = cls;
1548 uint16_t size = ntohs (mmsg->header.size);
1550 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1551 "%p Received multicast message of size %u. "
1552 "fragment_id=%" PRIu64 ", message_id=%" PRIu64
1553 ", fragment_offset=%" PRIu64 ", flags=%" PRIu64 "\n",
1555 GNUNET_ntohll (mmsg->fragment_id),
1556 GNUNET_ntohll (mmsg->message_id),
1557 GNUNET_ntohll (mmsg->fragment_offset),
1558 GNUNET_ntohll (mmsg->flags));
1560 GNUNET_PSYCSTORE_fragment_store (store, &chn->pub_key, mmsg, 0,
1561 &store_recv_fragment_store_result, chn);
1563 uint16_t first_ptype = 0, last_ptype = 0;
1564 int check = GNUNET_PSYC_receive_check_parts (size - sizeof (*mmsg),
1565 (const char *) &mmsg[1],
1566 &first_ptype, &last_ptype);
1567 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1568 "%p Message check result %d, first part type %u, last part type %u\n",
1569 chn, check, first_ptype, last_ptype);
1570 if (GNUNET_SYSERR == check)
1572 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1573 "%p Dropping incoming multicast message with invalid parts.\n",
1575 GNUNET_break_op (0);
1579 fragment_queue_insert (chn, mmsg, first_ptype, last_ptype);
1580 message_queue_run (chn);
1585 * Incoming request fragment from multicast for a master.
1587 * @param cls Master.
1588 * @param req The request.
1591 mcast_recv_request (void *cls,
1592 const struct GNUNET_MULTICAST_RequestHeader *req)
1594 struct Master *mst = cls;
1595 uint16_t size = ntohs (req->header.size);
1597 char *str = GNUNET_CRYPTO_ecdsa_public_key_to_string (&req->member_pub_key);
1598 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1599 "%p Received multicast request of size %u from %s.\n",
1603 uint16_t first_ptype = 0, last_ptype = 0;
1605 == GNUNET_PSYC_receive_check_parts (size - sizeof (*req),
1606 (const char *) &req[1],
1607 &first_ptype, &last_ptype))
1609 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1610 "%p Dropping incoming multicast request with invalid parts.\n",
1612 GNUNET_break_op (0);
1616 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1617 "Message parts: first: type %u, last: type %u\n",
1618 first_ptype, last_ptype);
1620 /* FIXME: in-order delivery */
1621 client_send_mcast_req (mst, req);
1626 * Response from PSYCstore with the current counter values for a channel master.
1629 store_recv_master_counters (void *cls, int result, uint64_t max_fragment_id,
1630 uint64_t max_message_id, uint64_t max_group_generation,
1631 uint64_t max_state_message_id)
1633 struct Master *mst = cls;
1634 struct Channel *chn = &mst->channel;
1635 chn->store_op = NULL;
1637 struct GNUNET_PSYC_CountersResultMessage res;
1638 res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK);
1639 res.header.size = htons (sizeof (res));
1640 res.result_code = htonl (result);
1641 res.max_message_id = GNUNET_htonll (max_message_id);
1643 if (GNUNET_OK == result || GNUNET_NO == result)
1645 mst->max_message_id = max_message_id;
1646 chn->max_message_id = max_message_id;
1647 chn->max_state_message_id = max_state_message_id;
1648 mst->max_group_generation = max_group_generation;
1650 = GNUNET_MULTICAST_origin_start (cfg, &mst->priv_key, max_fragment_id,
1651 mcast_recv_join_request,
1652 mcast_recv_replay_fragment,
1653 mcast_recv_replay_message,
1655 mcast_recv_message, chn);
1656 chn->is_ready = GNUNET_YES;
1660 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1661 "%p GNUNET_PSYCSTORE_counters_get() "
1662 "returned %d for channel %s.\n",
1663 chn, result, GNUNET_h2s (&chn->pub_key_hash));
1666 client_send_msg (chn, &res.header);
1671 * Response from PSYCstore with the current counter values for a channel slave.
1674 store_recv_slave_counters (void *cls, int result, uint64_t max_fragment_id,
1675 uint64_t max_message_id, uint64_t max_group_generation,
1676 uint64_t max_state_message_id)
1678 struct Slave *slv = cls;
1679 struct Channel *chn = &slv->channel;
1680 chn->store_op = NULL;
1682 struct GNUNET_PSYC_CountersResultMessage res;
1683 res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK);
1684 res.header.size = htons (sizeof (res));
1685 res.result_code = htonl (result);
1686 res.max_message_id = GNUNET_htonll (max_message_id);
1688 if (GNUNET_YES == result || GNUNET_NO == result)
1690 chn->max_message_id = max_message_id;
1691 chn->max_state_message_id = max_state_message_id;
1693 = GNUNET_MULTICAST_member_join (cfg, &chn->pub_key, &slv->priv_key,
1695 slv->relay_count, slv->relays,
1696 &slv->join_msg->header,
1697 mcast_recv_join_request,
1698 mcast_recv_join_decision,
1699 mcast_recv_replay_fragment,
1700 mcast_recv_replay_message,
1701 mcast_recv_message, chn);
1702 if (NULL != slv->join_msg)
1704 GNUNET_free (slv->join_msg);
1705 slv->join_msg = NULL;
1710 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1711 "%p GNUNET_PSYCSTORE_counters_get() "
1712 "returned %d for channel %s.\n",
1713 chn, result, GNUNET_h2s (&chn->pub_key_hash));
1716 client_send_msg (chn, &res.header);
1721 channel_init (struct Channel *chn)
1724 = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
1725 chn->recv_frags = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
1730 * Handle a connecting client starting a channel master.
1733 handle_client_master_start (void *cls,
1734 const struct MasterStartRequest *req)
1736 struct Client *c = cls;
1737 struct GNUNET_SERVICE_Client *client = c->client;
1739 struct GNUNET_CRYPTO_EddsaPublicKey pub_key;
1740 struct GNUNET_HashCode pub_key_hash;
1742 GNUNET_CRYPTO_eddsa_key_get_public (&req->channel_key, &pub_key);
1743 GNUNET_CRYPTO_hash (&pub_key, sizeof (pub_key), &pub_key_hash);
1746 mst = GNUNET_CONTAINER_multihashmap_get (masters, &pub_key_hash);
1747 struct Channel *chn;
1751 mst = GNUNET_malloc (sizeof (*mst));
1752 mst->policy = ntohl (req->policy);
1753 mst->priv_key = req->channel_key;
1754 mst->join_reqs = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
1756 chn = c->channel = &mst->channel;
1758 chn->is_master = GNUNET_YES;
1759 chn->pub_key = pub_key;
1760 chn->pub_key_hash = pub_key_hash;
1763 GNUNET_CONTAINER_multihashmap_put (masters, &chn->pub_key_hash, chn,
1764 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1765 chn->store_op = GNUNET_PSYCSTORE_counters_get (store, &chn->pub_key,
1766 store_recv_master_counters, mst);
1770 chn = &mst->channel;
1772 struct GNUNET_PSYC_CountersResultMessage *res;
1773 struct GNUNET_MQ_Envelope *
1774 env = GNUNET_MQ_msg (res, GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK);
1775 res->result_code = htonl (GNUNET_OK);
1776 res->max_message_id = GNUNET_htonll (mst->max_message_id);
1778 GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (client), env);
1781 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1782 "%p Client connected as master to channel %s.\n",
1783 mst, GNUNET_h2s (&chn->pub_key_hash));
1785 struct ClientList *cli = GNUNET_malloc (sizeof (*cli));
1786 cli->client = client;
1787 GNUNET_CONTAINER_DLL_insert (chn->clients_head, chn->clients_tail, cli);
1789 GNUNET_SERVICE_client_continue (client);
1794 check_client_slave_join (void *cls,
1795 const struct SlaveJoinRequest *req)
1802 * Handle a connecting client joining as a channel slave.
1805 handle_client_slave_join (void *cls,
1806 const struct SlaveJoinRequest *req)
1808 struct Client *c = cls;
1809 struct GNUNET_SERVICE_Client *client = c->client;
1811 uint16_t req_size = ntohs (req->header.size);
1813 struct GNUNET_CRYPTO_EcdsaPublicKey slv_pub_key;
1814 struct GNUNET_HashCode pub_key_hash, slv_pub_hash;
1816 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1817 "got join request from client %p\n",
1819 GNUNET_CRYPTO_ecdsa_key_get_public (&req->slave_key, &slv_pub_key);
1820 GNUNET_CRYPTO_hash (&slv_pub_key, sizeof (slv_pub_key), &slv_pub_hash);
1821 GNUNET_CRYPTO_hash (&req->channel_pub_key, sizeof (req->channel_pub_key), &pub_key_hash);
1823 struct GNUNET_CONTAINER_MultiHashMap *
1824 chn_slv = GNUNET_CONTAINER_multihashmap_get (channel_slaves, &pub_key_hash);
1825 struct Slave *slv = NULL;
1826 struct Channel *chn;
1828 if (NULL != chn_slv)
1830 slv = GNUNET_CONTAINER_multihashmap_get (chn_slv, &slv_pub_hash);
1834 slv = GNUNET_malloc (sizeof (*slv));
1835 slv->priv_key = req->slave_key;
1836 slv->pub_key = slv_pub_key;
1837 slv->pub_key_hash = slv_pub_hash;
1838 slv->origin = req->origin;
1839 slv->relay_count = ntohl (req->relay_count);
1840 slv->join_flags = ntohl (req->flags);
1842 const struct GNUNET_PeerIdentity *
1843 relays = (const struct GNUNET_PeerIdentity *) &req[1];
1844 uint16_t relay_size = slv->relay_count * sizeof (*relays);
1845 uint16_t join_msg_size = 0;
1847 if (sizeof (*req) + relay_size + sizeof (struct GNUNET_MessageHeader)
1850 struct GNUNET_PSYC_Message *
1851 join_msg = (struct GNUNET_PSYC_Message *) (((char *) &req[1]) + relay_size);
1852 join_msg_size = ntohs (join_msg->header.size);
1853 slv->join_msg = GNUNET_malloc (join_msg_size);
1854 GNUNET_memcpy (slv->join_msg, join_msg, join_msg_size);
1856 if (sizeof (*req) + relay_size + join_msg_size != req_size)
1858 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1859 "%u + %u + %u != %u\n",
1860 (unsigned int) sizeof (*req),
1865 GNUNET_SERVICE_client_drop (client);
1869 if (0 < slv->relay_count)
1871 slv->relays = GNUNET_malloc (relay_size);
1872 GNUNET_memcpy (slv->relays, &req[1], relay_size);
1875 chn = c->channel = &slv->channel;
1877 chn->is_master = GNUNET_NO;
1878 chn->pub_key = req->channel_pub_key;
1879 chn->pub_key_hash = pub_key_hash;
1882 if (NULL == chn_slv)
1884 chn_slv = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
1885 GNUNET_CONTAINER_multihashmap_put (channel_slaves, &chn->pub_key_hash, chn_slv,
1886 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1888 GNUNET_CONTAINER_multihashmap_put (chn_slv, &slv->pub_key_hash, chn,
1889 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1890 GNUNET_CONTAINER_multihashmap_put (slaves, &chn->pub_key_hash, chn,
1891 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1892 chn->store_op = GNUNET_PSYCSTORE_counters_get (store, &chn->pub_key,
1893 &store_recv_slave_counters, slv);
1897 chn = &slv->channel;
1899 struct GNUNET_PSYC_CountersResultMessage *res;
1901 struct GNUNET_MQ_Envelope *
1902 env = GNUNET_MQ_msg (res, GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK);
1903 res->result_code = htonl (GNUNET_OK);
1904 res->max_message_id = GNUNET_htonll (chn->max_message_id);
1906 GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (client), env);
1908 if (GNUNET_PSYC_SLAVE_JOIN_LOCAL & slv->join_flags)
1910 mcast_recv_join_decision (slv, GNUNET_YES,
1911 NULL, 0, NULL, NULL);
1913 else if (NULL == slv->member)
1916 = GNUNET_MULTICAST_member_join (cfg, &chn->pub_key, &slv->priv_key,
1918 slv->relay_count, slv->relays,
1919 &slv->join_msg->header,
1920 &mcast_recv_join_request,
1921 &mcast_recv_join_decision,
1922 &mcast_recv_replay_fragment,
1923 &mcast_recv_replay_message,
1924 &mcast_recv_message, chn);
1925 if (NULL != slv->join_msg)
1927 GNUNET_free (slv->join_msg);
1928 slv->join_msg = NULL;
1931 else if (NULL != slv->join_dcsn)
1933 struct GNUNET_MQ_Envelope *
1934 env = GNUNET_MQ_msg_copy (&slv->join_dcsn->header);
1935 GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (client), env);
1939 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1940 "Client %p connected as slave to channel %s.\n",
1942 GNUNET_h2s (&chn->pub_key_hash));
1944 struct ClientList *cli = GNUNET_malloc (sizeof (*cli));
1945 cli->client = client;
1946 GNUNET_CONTAINER_DLL_insert (chn->clients_head, chn->clients_tail, cli);
1948 GNUNET_SERVICE_client_continue (client);
1952 struct JoinDecisionClosure
1954 int32_t is_admitted;
1955 struct GNUNET_MessageHeader *msg;
1960 * Iterator callback for sending join decisions to multicast.
1963 mcast_send_join_decision (void *cls, const struct GNUNET_HashCode *pub_key_hash,
1966 struct JoinDecisionClosure *jcls = cls;
1967 struct GNUNET_MULTICAST_JoinHandle *jh = value;
1968 // FIXME: add relays
1969 GNUNET_MULTICAST_join_decision (jh, jcls->is_admitted, 0, NULL, jcls->msg);
1975 check_client_join_decision (void *cls,
1976 const struct GNUNET_PSYC_JoinDecisionMessage *dcsn)
1983 * Join decision from client.
1986 handle_client_join_decision (void *cls,
1987 const struct GNUNET_PSYC_JoinDecisionMessage *dcsn)
1989 struct Client *c = cls;
1990 struct GNUNET_SERVICE_Client *client = c->client;
1991 struct Channel *chn = c->channel;
1995 GNUNET_SERVICE_client_drop (client);
1998 GNUNET_assert (GNUNET_YES == chn->is_master);
1999 struct Master *mst = chn->master;
2001 struct JoinDecisionClosure jcls;
2002 jcls.is_admitted = ntohl (dcsn->is_admitted);
2004 = (sizeof (*dcsn) + sizeof (*jcls.msg) <= ntohs (dcsn->header.size))
2005 ? (struct GNUNET_MessageHeader *) &dcsn[1]
2008 struct GNUNET_HashCode slave_pub_hash;
2009 GNUNET_CRYPTO_hash (&dcsn->slave_pub_key, sizeof (dcsn->slave_pub_key),
2012 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2013 "%p Got join decision (%d) from client for channel %s..\n",
2014 mst, jcls.is_admitted, GNUNET_h2s (&chn->pub_key_hash));
2015 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2016 "%p ..and slave %s.\n",
2017 mst, GNUNET_h2s (&slave_pub_hash));
2019 GNUNET_CONTAINER_multihashmap_get_multiple (mst->join_reqs, &slave_pub_hash,
2020 &mcast_send_join_decision, &jcls);
2021 GNUNET_CONTAINER_multihashmap_remove_all (mst->join_reqs, &slave_pub_hash);
2022 GNUNET_SERVICE_client_continue (client);
2027 channel_part_cb (void *cls)
2029 struct GNUNET_SERVICE_Client *client = cls;
2030 struct GNUNET_MQ_Envelope *env;
2032 env = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_PSYC_PART_ACK);
2033 GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (client),
2039 handle_client_part_request (void *cls,
2040 const struct GNUNET_MessageHeader *msg)
2042 struct Client *c = cls;
2044 c->channel->is_disconnecting = GNUNET_YES;
2045 if (GNUNET_YES == c->channel->is_master)
2047 struct Master *mst = (struct Master *) c->channel;
2049 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2050 "Got part request from master %p\n",
2052 GNUNET_assert (NULL != mst->origin);
2053 GNUNET_MULTICAST_origin_stop (mst->origin, channel_part_cb, c->client);
2057 struct Slave *slv = (struct Slave *) c->channel;
2059 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2060 "Got part request from slave %p\n",
2062 GNUNET_assert (NULL != slv->member);
2063 GNUNET_MULTICAST_member_part (slv->member, channel_part_cb, c->client);
2065 GNUNET_SERVICE_client_continue (c->client);
2070 * Send acknowledgement to a client.
2072 * Sent after a message fragment has been passed on to multicast.
2074 * @param chn The channel struct for the client.
2077 send_message_ack (struct Channel *chn, struct GNUNET_SERVICE_Client *client)
2079 struct GNUNET_MessageHeader *res;
2080 struct GNUNET_MQ_Envelope *
2081 env = GNUNET_MQ_msg (res, GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK);
2084 GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (client), env);
2089 * Callback for the transmit functions of multicast.
2092 transmit_notify (void *cls, size_t *data_size, void *data)
2094 struct Channel *chn = cls;
2095 struct TransmitMessage *tmit_msg = chn->tmit_head;
2097 if (NULL == tmit_msg || *data_size < tmit_msg->size)
2099 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2100 "%p transmit_notify: nothing to send.\n", chn);
2101 if (NULL != tmit_msg && *data_size < tmit_msg->size)
2107 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2108 "%p transmit_notify: sending %u bytes.\n", chn, tmit_msg->size);
2110 *data_size = tmit_msg->size;
2111 GNUNET_memcpy (data, &tmit_msg[1], *data_size);
2114 = (tmit_msg->last_ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END)
2118 /* FIXME: handle disconnecting clients */
2119 if (NULL != tmit_msg->client)
2120 send_message_ack (chn, tmit_msg->client);
2122 GNUNET_CONTAINER_DLL_remove (chn->tmit_head, chn->tmit_tail, tmit_msg);
2124 if (NULL != chn->tmit_head)
2126 GNUNET_SCHEDULER_add_now (&schedule_transmit_message, chn);
2128 else if (GNUNET_YES == chn->is_disconnecting
2129 && tmit_msg->last_ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END)
2131 /* FIXME: handle partial message (when still in_transmit) */
2132 GNUNET_free (tmit_msg);
2133 return GNUNET_SYSERR;
2135 GNUNET_free (tmit_msg);
2141 * Callback for the transmit functions of multicast.
2144 master_transmit_notify (void *cls, size_t *data_size, void *data)
2146 int ret = transmit_notify (cls, data_size, data);
2148 if (GNUNET_YES == ret)
2150 struct Master *mst = cls;
2151 mst->tmit_handle = NULL;
2158 * Callback for the transmit functions of multicast.
2161 slave_transmit_notify (void *cls, size_t *data_size, void *data)
2163 int ret = transmit_notify (cls, data_size, data);
2165 if (GNUNET_YES == ret)
2167 struct Slave *slv = cls;
2168 slv->tmit_handle = NULL;
2175 * Transmit a message from a channel master to the multicast group.
2178 master_transmit_message (struct Master *mst)
2180 struct Channel *chn = &mst->channel;
2181 struct TransmitMessage *tmit_msg = chn->tmit_head;
2182 if (NULL == tmit_msg)
2184 if (NULL == mst->tmit_handle)
2186 mst->tmit_handle = GNUNET_MULTICAST_origin_to_all (mst->origin,
2188 mst->max_group_generation,
2189 &master_transmit_notify,
2194 GNUNET_MULTICAST_origin_to_all_resume (mst->tmit_handle);
2200 * Transmit a message from a channel slave to the multicast group.
2203 slave_transmit_message (struct Slave *slv)
2205 if (NULL == slv->channel.tmit_head)
2207 if (NULL == slv->tmit_handle)
2209 slv->tmit_handle = GNUNET_MULTICAST_member_to_origin (slv->member,
2210 slv->channel.tmit_head->id,
2211 &slave_transmit_notify,
2216 GNUNET_MULTICAST_member_to_origin_resume (slv->tmit_handle);
2222 transmit_message (struct Channel *chn)
2225 ? master_transmit_message (chn->master)
2226 : slave_transmit_message (chn->slave);
2231 * Queue a message from a channel master for sending to the multicast group.
2234 master_queue_message (struct Master *mst, struct TransmitMessage *tmit_msg)
2236 if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == tmit_msg->first_ptype)
2238 tmit_msg->id = ++mst->max_message_id;
2239 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2240 "%p master_queue_message: message_id=%" PRIu64 "\n",
2242 struct GNUNET_PSYC_MessageMethod *pmeth
2243 = (struct GNUNET_PSYC_MessageMethod *) &tmit_msg[1];
2245 if (pmeth->flags & GNUNET_PSYC_MASTER_TRANSMIT_STATE_RESET)
2247 pmeth->state_delta = GNUNET_htonll (GNUNET_PSYC_STATE_RESET);
2249 else if (pmeth->flags & GNUNET_PSYC_MASTER_TRANSMIT_STATE_MODIFY)
2251 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2252 "%p master_queue_message: state_delta=%" PRIu64 "\n",
2253 mst, tmit_msg->id - mst->max_state_message_id);
2254 pmeth->state_delta = GNUNET_htonll (tmit_msg->id
2255 - mst->max_state_message_id);
2256 mst->max_state_message_id = tmit_msg->id;
2260 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2261 "%p master_queue_message: state not modified\n", mst);
2262 pmeth->state_delta = GNUNET_htonll (GNUNET_PSYC_STATE_NOT_MODIFIED);
2265 if (pmeth->flags & GNUNET_PSYC_MASTER_TRANSMIT_STATE_HASH)
2267 /// @todo add state_hash to PSYC header
2274 * Queue a message from a channel slave for sending to the multicast group.
2277 slave_queue_message (struct Slave *slv, struct TransmitMessage *tmit_msg)
2279 if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == tmit_msg->first_ptype)
2281 struct GNUNET_PSYC_MessageMethod *pmeth
2282 = (struct GNUNET_PSYC_MessageMethod *) &tmit_msg[1];
2283 pmeth->state_delta = GNUNET_htonll (GNUNET_PSYC_STATE_NOT_MODIFIED);
2284 tmit_msg->id = ++slv->max_request_id;
2290 * Queue PSYC message parts for sending to multicast.
2293 * Channel to send to.
2295 * Client the message originates from.
2299 * Concatenated message parts.
2300 * @param first_ptype
2301 * First message part type in @a data.
2303 * Last message part type in @a data.
2305 static struct TransmitMessage *
2306 queue_message (struct Channel *chn,
2307 struct GNUNET_SERVICE_Client *client,
2310 uint16_t first_ptype, uint16_t last_ptype)
2312 struct TransmitMessage *
2313 tmit_msg = GNUNET_malloc (sizeof (*tmit_msg) + data_size);
2314 GNUNET_memcpy (&tmit_msg[1], data, data_size);
2315 tmit_msg->client = client;
2316 tmit_msg->size = data_size;
2317 tmit_msg->first_ptype = first_ptype;
2318 tmit_msg->last_ptype = last_ptype;
2320 /* FIXME: separate queue per message ID */
2322 GNUNET_CONTAINER_DLL_insert_tail (chn->tmit_head, chn->tmit_tail, tmit_msg);
2325 ? master_queue_message (chn->master, tmit_msg)
2326 : slave_queue_message (chn->slave, tmit_msg);
2332 * Cancel transmission of current message.
2334 * @param chn Channel to send to.
2335 * @param client Client the message originates from.
2338 transmit_cancel (struct Channel *chn, struct GNUNET_SERVICE_Client *client)
2340 uint16_t type = GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL;
2342 struct GNUNET_MessageHeader msg;
2343 msg.size = htons (sizeof (msg));
2344 msg.type = htons (type);
2346 queue_message (chn, client, sizeof (msg), &msg, type, type);
2347 transmit_message (chn);
2349 /* FIXME: cleanup */
2354 check_client_psyc_message (void *cls,
2355 const struct GNUNET_MessageHeader *msg)
2362 * Incoming message from a master or slave client.
2365 handle_client_psyc_message (void *cls,
2366 const struct GNUNET_MessageHeader *msg)
2368 struct Client *c = cls;
2369 struct GNUNET_SERVICE_Client *client = c->client;
2370 struct Channel *chn = c->channel;
2374 GNUNET_SERVICE_client_drop (client);
2378 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2379 "%p Received message from client.\n", chn);
2380 GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG, msg);
2382 if (GNUNET_YES != chn->is_ready)
2384 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2385 "%p Channel is not ready yet, disconnecting client %p.\n",
2389 GNUNET_SERVICE_client_drop (client);
2393 uint16_t size = ntohs (msg->size);
2394 if (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < size - sizeof (*msg))
2396 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2397 "%p Message payload too large: %u < %u.\n",
2399 (unsigned int) GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD,
2400 (unsigned int) (size - sizeof (*msg)));
2402 transmit_cancel (chn, client);
2403 GNUNET_SERVICE_client_drop (client);
2407 uint16_t first_ptype = 0, last_ptype = 0;
2409 == GNUNET_PSYC_receive_check_parts (size - sizeof (*msg),
2410 (const char *) &msg[1],
2411 &first_ptype, &last_ptype))
2413 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2414 "%p Received invalid message part from client.\n", chn);
2416 transmit_cancel (chn, client);
2417 GNUNET_SERVICE_client_drop (client);
2420 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2421 "%p Received message with first part type %u and last part type %u.\n",
2422 chn, first_ptype, last_ptype);
2424 queue_message (chn, client, size - sizeof (*msg), &msg[1],
2425 first_ptype, last_ptype);
2426 transmit_message (chn);
2427 /* FIXME: send a few ACKs even before transmit_notify is called */
2429 GNUNET_SERVICE_client_continue (client);
2434 * Received result of GNUNET_PSYCSTORE_membership_store()
2437 store_recv_membership_store_result (void *cls,
2439 const char *err_msg,
2440 uint16_t err_msg_size)
2442 struct Operation *op = cls;
2443 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2444 "%p GNUNET_PSYCSTORE_membership_store() returned %" PRId64 " (%.*s)\n",
2450 if (NULL != op->client)
2451 client_send_result (op->client, op->op_id, result, err_msg, err_msg_size);
2457 * Client requests to add/remove a slave in the membership database.
2460 handle_client_membership_store (void *cls,
2461 const struct ChannelMembershipStoreRequest *req)
2463 struct Client *c = cls;
2464 struct GNUNET_SERVICE_Client *client = c->client;
2465 struct Channel *chn = c->channel;
2469 GNUNET_SERVICE_client_drop (client);
2473 struct Operation *op = op_add (chn, client, req->op_id, 0);
2475 uint64_t announced_at = GNUNET_ntohll (req->announced_at);
2476 uint64_t effective_since = GNUNET_ntohll (req->effective_since);
2477 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2478 "%p Received membership store request from client.\n", chn);
2479 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2480 "%p did_join: %u, announced_at: %" PRIu64 ", effective_since: %" PRIu64 "\n",
2481 chn, req->did_join, announced_at, effective_since);
2483 GNUNET_PSYCSTORE_membership_store (store, &chn->pub_key, &req->slave_pub_key,
2484 req->did_join, announced_at, effective_since,
2485 0, /* FIXME: group_generation */
2486 &store_recv_membership_store_result, op);
2487 GNUNET_SERVICE_client_continue (client);
2492 * Received a fragment for GNUNET_PSYCSTORE_fragment_get(),
2493 * in response to a history request from a client.
2496 store_recv_fragment_history (void *cls,
2497 struct GNUNET_MULTICAST_MessageHeader *mmsg,
2498 enum GNUNET_PSYCSTORE_MessageFlags flags)
2500 struct Operation *op = cls;
2501 if (NULL == op->client)
2502 { /* Requesting client already disconnected. */
2505 struct Channel *chn = op->channel;
2507 struct GNUNET_PSYC_MessageHeader *pmsg;
2508 uint16_t msize = ntohs (mmsg->header.size);
2509 uint16_t psize = sizeof (*pmsg) + msize - sizeof (*mmsg);
2511 struct GNUNET_OperationResultMessage *
2512 res = GNUNET_malloc (sizeof (*res) + psize);
2513 res->header.size = htons (sizeof (*res) + psize);
2514 res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_HISTORY_RESULT);
2515 res->op_id = op->op_id;
2516 res->result_code = GNUNET_htonll (GNUNET_OK);
2518 pmsg = (struct GNUNET_PSYC_MessageHeader *) &res[1];
2519 GNUNET_PSYC_message_header_init (pmsg, mmsg, flags | GNUNET_PSYC_MESSAGE_HISTORIC);
2520 GNUNET_memcpy (&res[1], pmsg, psize);
2522 /** @todo FIXME: send only to requesting client */
2523 client_send_msg (chn, &res->header);
2531 * Received the result of GNUNET_PSYCSTORE_fragment_get(),
2532 * in response to a history request from a client.
2535 store_recv_fragment_history_result (void *cls, int64_t result,
2536 const char *err_msg, uint16_t err_msg_size)
2538 struct Operation *op = cls;
2539 if (NULL == op->client)
2540 { /* Requesting client already disconnected. */
2544 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2545 "%p History replay #%" PRIu64 ": "
2546 "PSYCSTORE returned %" PRId64 " (%.*s)\n",
2547 op->channel, GNUNET_ntohll (op->op_id), result, err_msg_size, err_msg);
2549 if (op->flags & GNUNET_PSYC_HISTORY_REPLAY_REMOTE)
2551 /** @todo Multicast replay request for messages not found locally. */
2554 client_send_result (op->client, op->op_id, result, err_msg, err_msg_size);
2560 check_client_history_replay (void *cls,
2561 const struct GNUNET_PSYC_HistoryRequestMessage *req)
2568 * Client requests channel history.
2571 handle_client_history_replay (void *cls,
2572 const struct GNUNET_PSYC_HistoryRequestMessage *req)
2574 struct Client *c = cls;
2575 struct GNUNET_SERVICE_Client *client = c->client;
2576 struct Channel *chn = c->channel;
2580 GNUNET_SERVICE_client_drop (client);
2584 uint16_t size = ntohs (req->header.size);
2585 const char *method_prefix = (const char *) &req[1];
2587 if (size < sizeof (*req) + 1
2588 || '\0' != method_prefix[size - sizeof (*req) - 1])
2590 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2591 "%p History replay #%" PRIu64 ": "
2592 "invalid method prefix. size: %u < %u?\n",
2594 GNUNET_ntohll (req->op_id),
2596 (unsigned int) sizeof (*req) + 1);
2598 GNUNET_SERVICE_client_drop (client);
2602 struct Operation *op = op_add (chn, client, req->op_id, ntohl (req->flags));
2604 if (0 == req->message_limit)
2606 GNUNET_PSYCSTORE_message_get (store, &chn->pub_key, NULL,
2607 GNUNET_ntohll (req->start_message_id),
2608 GNUNET_ntohll (req->end_message_id),
2610 &store_recv_fragment_history,
2611 &store_recv_fragment_history_result, op);
2615 GNUNET_PSYCSTORE_message_get_latest (store, &chn->pub_key, NULL,
2616 GNUNET_ntohll (req->message_limit),
2618 &store_recv_fragment_history,
2619 &store_recv_fragment_history_result,
2622 GNUNET_SERVICE_client_continue (client);
2627 * Received state var from PSYCstore, send it to client.
2630 store_recv_state_var (void *cls, const char *name,
2631 const void *value, uint32_t value_size)
2633 struct Operation *op = cls;
2634 struct GNUNET_OperationResultMessage *res;
2635 struct GNUNET_MQ_Envelope *env;
2637 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2638 "%p state_get #%" PRIu64 " - received var from PSYCstore: %s\n",
2639 op->channel, GNUNET_ntohll (op->op_id), name);
2641 if (NULL != name) /* First part */
2643 uint16_t name_size = strnlen (name, GNUNET_PSYC_MODIFIER_MAX_PAYLOAD) + 1;
2644 struct GNUNET_PSYC_MessageModifier *mod;
2645 env = GNUNET_MQ_msg_extra (res,
2646 sizeof (*mod) + name_size + value_size,
2647 GNUNET_MESSAGE_TYPE_PSYC_STATE_RESULT);
2648 res->op_id = op->op_id;
2650 mod = (struct GNUNET_PSYC_MessageModifier *) &res[1];
2651 mod->header.size = htons (sizeof (*mod) + name_size + value_size);
2652 mod->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER);
2653 mod->name_size = htons (name_size);
2654 mod->value_size = htonl (value_size);
2655 mod->oper = htons (GNUNET_PSYC_OP_ASSIGN);
2656 GNUNET_memcpy (&mod[1], name, name_size);
2657 GNUNET_memcpy (((char *) &mod[1]) + name_size, value, value_size);
2659 else /* Continuation */
2661 struct GNUNET_MessageHeader *mod;
2662 env = GNUNET_MQ_msg_extra (res,
2663 sizeof (*mod) + value_size,
2664 GNUNET_MESSAGE_TYPE_PSYC_STATE_RESULT);
2665 res->op_id = op->op_id;
2667 mod = (struct GNUNET_MessageHeader *) &res[1];
2668 mod->size = htons (sizeof (*mod) + value_size);
2669 mod->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT);
2670 GNUNET_memcpy (&mod[1], value, value_size);
2673 // FIXME: client might have been disconnected
2674 GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (op->client), env);
2680 * Received result of GNUNET_PSYCSTORE_state_get()
2681 * or GNUNET_PSYCSTORE_state_get_prefix()
2684 store_recv_state_result (void *cls, int64_t result,
2685 const char *err_msg, uint16_t err_msg_size)
2687 struct Operation *op = cls;
2688 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2689 "%p state_get #%" PRIu64 ": "
2690 "PSYCSTORE returned %" PRId64 " (%.*s)\n",
2691 op->channel, GNUNET_ntohll (op->op_id), result, err_msg_size, err_msg);
2693 // FIXME: client might have been disconnected
2694 client_send_result (op->client, op->op_id, result, err_msg, err_msg_size);
2700 check_client_state_get (void *cls,
2701 const struct StateRequest *req)
2703 struct Client *c = cls;
2704 struct Channel *chn = c->channel;
2708 return GNUNET_SYSERR;
2711 uint16_t name_size = ntohs (req->header.size) - sizeof (*req);
2712 const char *name = (const char *) &req[1];
2713 if (0 == name_size || '\0' != name[name_size - 1])
2716 return GNUNET_SYSERR;
2724 * Client requests best matching state variable from PSYCstore.
2727 handle_client_state_get (void *cls,
2728 const struct StateRequest *req)
2730 struct Client *c = cls;
2731 struct GNUNET_SERVICE_Client *client = c->client;
2732 struct Channel *chn = c->channel;
2734 const char *name = (const char *) &req[1];
2735 struct Operation *op = op_add (chn, client, req->op_id, 0);
2736 GNUNET_PSYCSTORE_state_get (store, &chn->pub_key, name,
2737 &store_recv_state_var,
2738 &store_recv_state_result, op);
2739 GNUNET_SERVICE_client_continue (client);
2744 check_client_state_get_prefix (void *cls,
2745 const struct StateRequest *req)
2747 struct Client *c = cls;
2748 struct Channel *chn = c->channel;
2752 return GNUNET_SYSERR;
2755 uint16_t name_size = ntohs (req->header.size) - sizeof (*req);
2756 const char *name = (const char *) &req[1];
2757 if (0 == name_size || '\0' != name[name_size - 1])
2760 return GNUNET_SYSERR;
2768 * Client requests state variables with a given prefix from PSYCstore.
2771 handle_client_state_get_prefix (void *cls,
2772 const struct StateRequest *req)
2774 struct Client *c = cls;
2775 struct GNUNET_SERVICE_Client *client = c->client;
2776 struct Channel *chn = c->channel;
2778 const char *name = (const char *) &req[1];
2779 struct Operation *op = op_add (chn, client, req->op_id, 0);
2780 GNUNET_PSYCSTORE_state_get_prefix (store, &chn->pub_key, name,
2781 &store_recv_state_var,
2782 &store_recv_state_result, op);
2783 GNUNET_SERVICE_client_continue (client);
2788 * Initialize the PSYC service.
2790 * @param cls Closure.
2791 * @param server The initialized server.
2792 * @param c Configuration to use.
2796 const struct GNUNET_CONFIGURATION_Handle *c,
2797 struct GNUNET_SERVICE_Handle *svc)
2801 store = GNUNET_PSYCSTORE_connect (cfg);
2802 stats = GNUNET_STATISTICS_create ("psyc", cfg);
2803 masters = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
2804 slaves = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
2805 channel_slaves = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
2806 recv_cache = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
2807 GNUNET_SCHEDULER_add_shutdown (&shutdown_task, NULL);
2812 * Define "main" method using service macro.
2816 GNUNET_SERVICE_OPTION_NONE,
2818 &client_notify_connect,
2819 &client_notify_disconnect,
2821 GNUNET_MQ_hd_fixed_size (client_master_start,
2822 GNUNET_MESSAGE_TYPE_PSYC_MASTER_START,
2823 struct MasterStartRequest,
2825 GNUNET_MQ_hd_var_size (client_slave_join,
2826 GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN,
2827 struct SlaveJoinRequest,
2829 GNUNET_MQ_hd_var_size (client_join_decision,
2830 GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION,
2831 struct GNUNET_PSYC_JoinDecisionMessage,
2833 GNUNET_MQ_hd_fixed_size (client_part_request,
2834 GNUNET_MESSAGE_TYPE_PSYC_PART_REQUEST,
2835 struct GNUNET_MessageHeader,
2837 GNUNET_MQ_hd_var_size (client_psyc_message,
2838 GNUNET_MESSAGE_TYPE_PSYC_MESSAGE,
2839 struct GNUNET_MessageHeader,
2841 GNUNET_MQ_hd_fixed_size (client_membership_store,
2842 GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_MEMBERSHIP_STORE,
2843 struct ChannelMembershipStoreRequest,
2845 GNUNET_MQ_hd_var_size (client_history_replay,
2846 GNUNET_MESSAGE_TYPE_PSYC_HISTORY_REPLAY,
2847 struct GNUNET_PSYC_HistoryRequestMessage,
2849 GNUNET_MQ_hd_var_size (client_state_get,
2850 GNUNET_MESSAGE_TYPE_PSYC_STATE_GET,
2851 struct StateRequest,
2853 GNUNET_MQ_hd_var_size (client_state_get_prefix,
2854 GNUNET_MESSAGE_TYPE_PSYC_STATE_GET_PREFIX,
2855 struct StateRequest,
2858 /* end of gnunet-service-psyc.c */