2 * This file is part of GNUnet
3 * Copyright (C) 2013 GNUnet e.V.
5 * GNUnet is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published
7 * by the Free Software Foundation; either version 3, or (at your
8 * option) any later version.
10 * GNUnet is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License for more details.
15 * You should have received a copy of the GNU General Public License
16 * along with GNUnet; see the file COPYING. If not, write to the
17 * Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18 * Boston, MA 02110-1301, USA.
22 * @file psyc/gnunet-service-psyc.c
24 * @author Gabor X Toth
30 #include "gnunet_util_lib.h"
31 #include "gnunet_constants.h"
32 #include "gnunet_protocols.h"
33 #include "gnunet_statistics_service.h"
34 #include "gnunet_multicast_service.h"
35 #include "gnunet_psycstore_service.h"
36 #include "gnunet_psyc_service.h"
37 #include "gnunet_psyc_util_lib.h"
42 * Handle to our current configuration.
44 static const struct GNUNET_CONFIGURATION_Handle *cfg;
49 static struct GNUNET_SERVICE_Handle *service;
52 * Handle to the statistics service.
54 static struct GNUNET_STATISTICS_Handle *stats;
57 * Handle to the PSYCstore.
59 static struct GNUNET_PSYCSTORE_Handle *store;
62 * All connected masters.
63 * Channel's pub_key_hash -> struct Master
65 static struct GNUNET_CONTAINER_MultiHashMap *masters;
68 * All connected slaves.
69 * Channel's pub_key_hash -> struct Slave
71 static struct GNUNET_CONTAINER_MultiHashMap *slaves;
74 * Connected slaves per channel.
75 * Channel's pub_key_hash -> Slave's pub_key -> struct Slave
77 static struct GNUNET_CONTAINER_MultiHashMap *channel_slaves;
81 * Message in the transmission queue.
83 struct TransmitMessage
85 struct TransmitMessage *prev;
86 struct TransmitMessage *next;
88 struct GNUNET_SERVICE_Client *client;
91 * ID assigned to the message.
101 * Type of first message part.
103 uint16_t first_ptype;
106 * Type of last message part.
110 /* Followed by message */
115 * Cache for received message fragments.
116 * Message fragments are only sent to clients after all modifiers arrived.
118 * chan_key -> MultiHashMap chan_msgs
120 static struct GNUNET_CONTAINER_MultiHashMap *recv_cache;
124 * Entry in the chan_msgs hashmap of @a recv_cache:
125 * fragment_id -> RecvCacheEntry
127 struct RecvCacheEntry
129 struct GNUNET_MULTICAST_MessageHeader *mmsg;
135 * Entry in the @a recv_frags hash map of a @a Channel.
136 * message_id -> FragmentQueue
141 * Fragment IDs stored in @a recv_cache.
143 struct GNUNET_CONTAINER_Heap *fragments;
146 * Total size of received fragments.
151 * Total size of received header fragments (METHOD & MODIFIERs)
153 uint64_t header_size;
156 * The @a state_delta field from struct GNUNET_PSYC_MessageMethod.
158 uint64_t state_delta;
161 * The @a flags field from struct GNUNET_PSYC_MessageMethod.
166 * Receive state of message.
168 * @see MessageFragmentState
173 * Whether the state is already modified in PSYCstore.
175 uint8_t state_is_modified;
178 * Is the message queued for delivery to the client?
179 * i.e. added to the recv_msgs queue
186 * List of connected clients.
190 struct ClientList *prev;
191 struct ClientList *next;
193 struct GNUNET_SERVICE_Client *client;
199 struct Operation *prev;
200 struct Operation *next;
202 struct GNUNET_SERVICE_Client *client;
203 struct Channel *channel;
210 * Common part of the client context for both a channel master and slave.
214 struct ClientList *clients_head;
215 struct ClientList *clients_tail;
217 struct Operation *op_head;
218 struct Operation *op_tail;
220 struct TransmitMessage *tmit_head;
221 struct TransmitMessage *tmit_tail;
224 * Current PSYCstore operation.
226 struct GNUNET_PSYCSTORE_OperationHandle *store_op;
229 * Received fragments not yet sent to the client.
230 * message_id -> FragmentQueue
232 struct GNUNET_CONTAINER_MultiHashMap *recv_frags;
235 * Received message IDs not yet sent to the client.
237 struct GNUNET_CONTAINER_Heap *recv_msgs;
240 * Public key of the channel.
242 struct GNUNET_CRYPTO_EddsaPublicKey pub_key;
245 * Hash of @a pub_key.
247 struct GNUNET_HashCode pub_key_hash;
250 * Last message ID sent to the client.
251 * 0 if there is no such message.
253 uint64_t max_message_id;
256 * ID of the last stateful message, where the state operations has been
257 * processed and saved to PSYCstore and which has been sent to the client.
258 * 0 if there is no such message.
260 uint64_t max_state_message_id;
263 * Expected value size for the modifier being received from the PSYC service.
265 uint32_t tmit_mod_value_size_expected;
268 * Actual value size for the modifier being received from the PSYC service.
270 uint32_t tmit_mod_value_size;
273 * Is this channel ready to receive messages from client?
274 * #GNUNET_YES or #GNUNET_NO
279 * Is the client disconnected?
280 * #GNUNET_YES or #GNUNET_NO
282 uint8_t is_disconnected;
285 * Is this a channel master (#GNUNET_YES), or slave (#GNUNET_NO)?
290 struct Master *master;
297 * Client context for a channel master.
302 * Channel struct common for Master and Slave
304 struct Channel channel;
307 * Private key of the channel.
309 struct GNUNET_CRYPTO_EddsaPrivateKey priv_key;
312 * Handle for the multicast origin.
314 struct GNUNET_MULTICAST_Origin *origin;
317 * Transmit handle for multicast.
319 struct GNUNET_MULTICAST_OriginTransmitHandle *tmit_handle;
322 * Incoming join requests from multicast.
323 * member_pub_key -> struct GNUNET_MULTICAST_JoinHandle *
325 struct GNUNET_CONTAINER_MultiHashMap *join_reqs;
328 * Last message ID transmitted to this channel.
330 * Incremented before sending a message, thus the message_id in messages sent
333 uint64_t max_message_id;
336 * ID of the last message with state operations transmitted to the channel.
337 * 0 if there is no such message.
339 uint64_t max_state_message_id;
342 * Maximum group generation transmitted to the channel.
344 uint64_t max_group_generation;
347 * @see enum GNUNET_PSYC_Policy
349 enum GNUNET_PSYC_Policy policy;
354 * Client context for a channel slave.
359 * Channel struct common for Master and Slave
361 struct Channel channel;
364 * Private key of the slave.
366 struct GNUNET_CRYPTO_EcdsaPrivateKey priv_key;
369 * Public key of the slave.
371 struct GNUNET_CRYPTO_EcdsaPublicKey pub_key;
374 * Hash of @a pub_key.
376 struct GNUNET_HashCode pub_key_hash;
379 * Handle for the multicast member.
381 struct GNUNET_MULTICAST_Member *member;
384 * Transmit handle for multicast.
386 struct GNUNET_MULTICAST_MemberTransmitHandle *tmit_handle;
389 * Peer identity of the origin.
391 struct GNUNET_PeerIdentity origin;
394 * Number of items in @a relays.
396 uint32_t relay_count;
399 * Relays that multicast can use to connect.
401 struct GNUNET_PeerIdentity *relays;
404 * Join request to be transmitted to the master on join.
406 struct GNUNET_PSYC_Message *join_msg;
409 * Join decision received from multicast.
411 struct GNUNET_PSYC_JoinDecisionMessage *join_dcsn;
414 * Maximum request ID for this channel.
416 uint64_t max_request_id;
421 enum GNUNET_PSYC_SlaveJoinFlags join_flags;
429 struct GNUNET_SERVICE_Client *client;
430 struct Channel *channel;
434 struct ReplayRequestKey
436 uint64_t fragment_id;
438 uint64_t fragment_offset;
444 transmit_message (struct Channel *chn);
447 message_queue_run (struct Channel *chn);
450 message_queue_drop (struct Channel *chn);
454 schedule_transmit_message (void *cls)
456 struct Channel *chn = cls;
458 transmit_message (chn);
463 * Task run during shutdown.
468 shutdown_task (void *cls)
472 GNUNET_STATISTICS_destroy (stats, GNUNET_YES);
478 static struct Operation *
479 op_add (struct Channel *chn, struct GNUNET_SERVICE_Client *client,
480 uint64_t op_id, uint32_t flags)
482 struct Operation *op = GNUNET_malloc (sizeof (*op));
487 GNUNET_CONTAINER_DLL_insert (chn->op_head, chn->op_tail, op);
493 op_remove (struct Operation *op)
495 GNUNET_CONTAINER_DLL_remove (op->channel->op_head, op->channel->op_tail, op);
501 * Clean up master data structures after a client disconnected.
504 cleanup_master (struct Master *mst)
506 struct Channel *chn = &mst->channel;
508 if (NULL != mst->origin)
509 GNUNET_MULTICAST_origin_stop (mst->origin, NULL, NULL); // FIXME
510 GNUNET_CONTAINER_multihashmap_destroy (mst->join_reqs);
511 GNUNET_CONTAINER_multihashmap_remove (masters, &chn->pub_key_hash, mst);
516 * Clean up slave data structures after a client disconnected.
519 cleanup_slave (struct Slave *slv)
521 struct Channel *chn = &slv->channel;
522 struct GNUNET_CONTAINER_MultiHashMap *
523 chn_slv = GNUNET_CONTAINER_multihashmap_get (channel_slaves,
525 GNUNET_assert (NULL != chn_slv);
526 GNUNET_CONTAINER_multihashmap_remove (chn_slv, &slv->pub_key_hash, slv);
528 if (0 == GNUNET_CONTAINER_multihashmap_size (chn_slv))
530 GNUNET_CONTAINER_multihashmap_remove (channel_slaves, &chn->pub_key_hash,
532 GNUNET_CONTAINER_multihashmap_destroy (chn_slv);
534 GNUNET_CONTAINER_multihashmap_remove (slaves, &chn->pub_key_hash, slv);
536 if (NULL != slv->join_msg)
538 GNUNET_free (slv->join_msg);
539 slv->join_msg = NULL;
541 if (NULL != slv->relays)
543 GNUNET_free (slv->relays);
546 if (NULL != slv->member)
548 GNUNET_MULTICAST_member_part (slv->member, NULL, NULL); // FIXME
551 GNUNET_CONTAINER_multihashmap_remove (slaves, &chn->pub_key_hash, slv);
556 * Clean up channel data structures after a client disconnected.
559 cleanup_channel (struct Channel *chn)
561 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
562 "%p Cleaning up channel %s. master? %u\n",
564 GNUNET_h2s (&chn->pub_key_hash),
566 message_queue_drop (chn);
567 GNUNET_CONTAINER_multihashmap_destroy (chn->recv_frags);
568 chn->recv_frags = NULL;
570 if (NULL != chn->store_op)
572 GNUNET_PSYCSTORE_operation_cancel (chn->store_op);
573 chn->store_op = NULL;
576 (GNUNET_YES == chn->is_master)
577 ? cleanup_master (chn->master)
578 : cleanup_slave (chn->slave);
584 * Called whenever a client is disconnected.
585 * Frees our resources associated with that client.
588 * @param client identification of the client
589 * @param app_ctx must match @a client
592 client_notify_disconnect (void *cls,
593 struct GNUNET_SERVICE_Client *client,
596 struct Client *c = app_ctx;
597 struct Channel *chn = c->channel;
602 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
603 "%p User context is NULL in client_disconnect()\n",
609 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
610 "%p Client (%s) disconnected from channel %s\n",
612 (GNUNET_YES == chn->is_master) ? "master" : "slave",
613 GNUNET_h2s (&chn->pub_key_hash));
615 struct ClientList *cli = chn->clients_head;
618 if (cli->client == client)
620 GNUNET_CONTAINER_DLL_remove (chn->clients_head, chn->clients_tail, cli);
627 struct Operation *op = chn->op_head;
630 if (op->client == client)
638 if (NULL == chn->clients_head)
639 { /* Last client disconnected. */
640 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
641 "%p Last client (%s) disconnected from channel %s\n",
643 (GNUNET_YES == chn->is_master) ? "master" : "slave",
644 GNUNET_h2s (&chn->pub_key_hash));
645 chn->is_disconnected = GNUNET_YES;
646 if (NULL != chn->tmit_head)
647 { /* Send pending messages to multicast before cleanup. */
648 transmit_message (chn);
652 cleanup_channel (chn);
659 * A new client connected.
662 * @param client client to add
663 * @param mq message queue for @a client
667 client_notify_connect (void *cls,
668 struct GNUNET_SERVICE_Client *client,
669 struct GNUNET_MQ_Handle *mq)
671 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Client connected: %p\n", client);
673 struct Client *c = GNUNET_malloc (sizeof (*c));
681 * Send message to all clients connected to the channel.
684 client_send_msg (const struct Channel *chn,
685 const struct GNUNET_MessageHeader *msg)
687 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
688 "%p Sending message to clients.\n",
691 struct ClientList *cli = chn->clients_head;
694 struct GNUNET_MQ_Envelope *
695 env = GNUNET_MQ_msg_copy (msg);
697 GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (cli->client),
706 * Send a result code back to the client.
709 * Client that should receive the result code.
713 * Operation ID in network byte order.
715 * Data payload or NULL.
720 client_send_result (struct GNUNET_SERVICE_Client *client, uint64_t op_id,
721 int64_t result_code, const void *data, uint16_t data_size)
723 struct GNUNET_OperationResultMessage *res;
724 struct GNUNET_MQ_Envelope *
725 env = GNUNET_MQ_msg_extra (res,
727 GNUNET_MESSAGE_TYPE_PSYC_RESULT_CODE);
728 res->result_code = GNUNET_htonll (result_code);
731 GNUNET_memcpy (&res[1], data, data_size);
733 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
734 "%p Sending result to client for operation #%" PRIu64 ": %" PRId64 " (size: %u)\n",
736 GNUNET_ntohll (op_id),
740 GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (client), env);
745 * Closure for join_mem_test_cb()
747 struct JoinMemTestClosure
749 struct GNUNET_CRYPTO_EcdsaPublicKey slave_pub_key;
750 struct Channel *channel;
751 struct GNUNET_MULTICAST_JoinHandle *join_handle;
752 struct GNUNET_PSYC_JoinRequestMessage *join_msg;
757 * Membership test result callback used for join requests.
760 join_mem_test_cb (void *cls, int64_t result,
761 const char *err_msg, uint16_t err_msg_size)
763 struct JoinMemTestClosure *jcls = cls;
765 if (GNUNET_NO == result && GNUNET_YES == jcls->channel->is_master)
766 { /* Pass on join request to client if this is a master channel */
767 struct Master *mst = jcls->channel->master;
768 struct GNUNET_HashCode slave_pub_hash;
769 GNUNET_CRYPTO_hash (&jcls->slave_pub_key, sizeof (jcls->slave_pub_key),
771 GNUNET_CONTAINER_multihashmap_put (mst->join_reqs, &slave_pub_hash, jcls->join_handle,
772 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
773 client_send_msg (jcls->channel, &jcls->join_msg->header);
777 if (GNUNET_SYSERR == result)
779 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
780 "Could not perform membership test (%.*s)\n",
781 err_msg_size, err_msg);
784 GNUNET_MULTICAST_join_decision (jcls->join_handle, result, 0, NULL, NULL);
786 GNUNET_free (jcls->join_msg);
792 * Incoming join request from multicast.
795 mcast_recv_join_request (void *cls,
796 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_pub_key,
797 const struct GNUNET_MessageHeader *join_msg,
798 struct GNUNET_MULTICAST_JoinHandle *jh)
800 struct Channel *chn = cls;
801 uint16_t join_msg_size = 0;
803 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
804 "%p Got join request.\n",
806 if (NULL != join_msg)
808 if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE == ntohs (join_msg->type))
810 join_msg_size = ntohs (join_msg->size);
814 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
815 "%p Got join message with invalid type %u.\n",
817 ntohs (join_msg->type));
821 struct GNUNET_PSYC_JoinRequestMessage *
822 req = GNUNET_malloc (sizeof (*req) + join_msg_size);
823 req->header.size = htons (sizeof (*req) + join_msg_size);
824 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_JOIN_REQUEST);
825 req->slave_pub_key = *slave_pub_key;
826 if (0 < join_msg_size)
827 GNUNET_memcpy (&req[1], join_msg, join_msg_size);
829 struct JoinMemTestClosure *jcls = GNUNET_malloc (sizeof (*jcls));
830 jcls->slave_pub_key = *slave_pub_key;
832 jcls->join_handle = jh;
833 jcls->join_msg = req;
835 GNUNET_PSYCSTORE_membership_test (store, &chn->pub_key, slave_pub_key,
836 chn->max_message_id, 0,
837 &join_mem_test_cb, jcls);
842 * Join decision received from multicast.
845 mcast_recv_join_decision (void *cls, int is_admitted,
846 const struct GNUNET_PeerIdentity *peer,
847 uint16_t relay_count,
848 const struct GNUNET_PeerIdentity *relays,
849 const struct GNUNET_MessageHeader *join_resp)
851 struct Slave *slv = cls;
852 struct Channel *chn = &slv->channel;
853 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
854 "%p Got join decision: %d\n",
857 if (GNUNET_YES == chn->is_ready)
859 /* Already admitted */
863 uint16_t join_resp_size = (NULL != join_resp) ? ntohs (join_resp->size) : 0;
864 struct GNUNET_PSYC_JoinDecisionMessage *
865 dcsn = slv->join_dcsn = GNUNET_malloc (sizeof (*dcsn) + join_resp_size);
866 dcsn->header.size = htons (sizeof (*dcsn) + join_resp_size);
867 dcsn->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION);
868 dcsn->is_admitted = htonl (is_admitted);
869 if (0 < join_resp_size)
870 GNUNET_memcpy (&dcsn[1], join_resp, join_resp_size);
872 client_send_msg (chn, &dcsn->header);
874 if (GNUNET_YES == is_admitted
875 && ! (GNUNET_PSYC_SLAVE_JOIN_LOCAL & slv->join_flags))
877 chn->is_ready = GNUNET_YES;
883 store_recv_fragment_replay (void *cls,
884 struct GNUNET_MULTICAST_MessageHeader *msg,
885 enum GNUNET_PSYCSTORE_MessageFlags flags)
887 struct GNUNET_MULTICAST_ReplayHandle *rh = cls;
889 GNUNET_MULTICAST_replay_response (rh, &msg->header, GNUNET_MULTICAST_REC_OK);
895 * Received result of GNUNET_PSYCSTORE_fragment_get() for multicast replay.
898 store_recv_fragment_replay_result (void *cls,
901 uint16_t err_msg_size)
903 struct GNUNET_MULTICAST_ReplayHandle *rh = cls;
905 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
906 "%p Fragment replay: PSYCSTORE returned %" PRId64 " (%.*s)\n",
917 GNUNET_MULTICAST_replay_response (rh, NULL,
918 GNUNET_MULTICAST_REC_NOT_FOUND);
921 case GNUNET_PSYCSTORE_MEMBERSHIP_TEST_FAILED:
922 GNUNET_MULTICAST_replay_response (rh, NULL,
923 GNUNET_MULTICAST_REC_ACCESS_DENIED);
927 GNUNET_MULTICAST_replay_response (rh, NULL,
928 GNUNET_MULTICAST_REC_INTERNAL_ERROR);
931 /* GNUNET_MULTICAST_replay_response frees 'rh' when passed
932 * an error code, so it must be ensured no further processing
933 * is attempted on 'rh'. Maybe this should be refactored as
934 * it doesn't look very intuitive. --lynX
936 GNUNET_MULTICAST_replay_response_end (rh);
941 * Incoming fragment replay request from multicast.
944 mcast_recv_replay_fragment (void *cls,
945 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_pub_key,
946 uint64_t fragment_id, uint64_t flags,
947 struct GNUNET_MULTICAST_ReplayHandle *rh)
950 struct Channel *chn = cls;
951 GNUNET_PSYCSTORE_fragment_get (store, &chn->pub_key, slave_pub_key,
952 fragment_id, fragment_id,
953 &store_recv_fragment_replay,
954 &store_recv_fragment_replay_result, rh);
959 * Incoming message replay request from multicast.
962 mcast_recv_replay_message (void *cls,
963 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_pub_key,
965 uint64_t fragment_offset,
967 struct GNUNET_MULTICAST_ReplayHandle *rh)
969 struct Channel *chn = cls;
970 GNUNET_PSYCSTORE_message_get (store, &chn->pub_key, slave_pub_key,
971 message_id, message_id, 1, NULL,
972 &store_recv_fragment_replay,
973 &store_recv_fragment_replay_result, rh);
978 * Convert an uint64_t in network byte order to a HashCode
979 * that can be used as key in a MultiHashMap
982 hash_key_from_nll (struct GNUNET_HashCode *key, uint64_t n)
984 /* use little-endian order, as idx_of MultiHashMap casts key to unsigned int */
985 /* TODO: use built-in byte swap functions if available */
987 n = ((n << 8) & 0xFF00FF00FF00FF00ULL) | ((n >> 8) & 0x00FF00FF00FF00FFULL);
988 n = ((n << 16) & 0xFFFF0000FFFF0000ULL) | ((n >> 16) & 0x0000FFFF0000FFFFULL);
990 *key = (struct GNUNET_HashCode) {};
992 = (n << 32) | (n >> 32);
997 * Convert an uint64_t in host byte order to a HashCode
998 * that can be used as key in a MultiHashMap
1001 hash_key_from_hll (struct GNUNET_HashCode *key, uint64_t n)
1003 #if __BYTE_ORDER == __BIG_ENDIAN
1004 hash_key_from_nll (key, n);
1005 #elif __BYTE_ORDER == __LITTLE_ENDIAN
1006 *key = (struct GNUNET_HashCode) {};
1007 *((uint64_t *) key) = n;
1009 #error byteorder undefined
1015 * Initialize PSYC message header.
1018 psyc_msg_init (struct GNUNET_PSYC_MessageHeader *pmsg,
1019 const struct GNUNET_MULTICAST_MessageHeader *mmsg, uint32_t flags)
1021 uint16_t size = ntohs (mmsg->header.size);
1022 uint16_t psize = sizeof (*pmsg) + size - sizeof (*mmsg);
1024 pmsg->header.size = htons (psize);
1025 pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
1026 pmsg->message_id = mmsg->message_id;
1027 pmsg->fragment_offset = mmsg->fragment_offset;
1028 pmsg->flags = htonl (flags);
1030 GNUNET_memcpy (&pmsg[1], &mmsg[1], size - sizeof (*mmsg));
1035 * Create a new PSYC message from a multicast message for sending it to clients.
1037 static inline struct GNUNET_PSYC_MessageHeader *
1038 psyc_msg_new (const struct GNUNET_MULTICAST_MessageHeader *mmsg, uint32_t flags)
1040 struct GNUNET_PSYC_MessageHeader *pmsg;
1041 uint16_t size = ntohs (mmsg->header.size);
1042 uint16_t psize = sizeof (*pmsg) + size - sizeof (*mmsg);
1044 pmsg = GNUNET_malloc (psize);
1045 psyc_msg_init (pmsg, mmsg, flags);
1051 * Send multicast message to all clients connected to the channel.
1054 client_send_mcast_msg (struct Channel *chn,
1055 const struct GNUNET_MULTICAST_MessageHeader *mmsg,
1058 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1059 "%p Sending multicast message to client. fragment_id: %" PRIu64 ", message_id: %" PRIu64 "\n",
1061 GNUNET_ntohll (mmsg->fragment_id),
1062 GNUNET_ntohll (mmsg->message_id));
1064 struct GNUNET_PSYC_MessageHeader *
1065 pmsg = GNUNET_PSYC_message_header_create (mmsg, flags);
1066 client_send_msg (chn, &pmsg->header);
1072 * Send multicast request to all clients connected to the channel.
1075 client_send_mcast_req (struct Master *mst,
1076 const struct GNUNET_MULTICAST_RequestHeader *req)
1078 struct Channel *chn = &mst->channel;
1080 struct GNUNET_PSYC_MessageHeader *pmsg;
1081 uint16_t size = ntohs (req->header.size);
1082 uint16_t psize = sizeof (*pmsg) + size - sizeof (*req);
1084 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1085 "%p Sending multicast request to client. fragment_id: %" PRIu64 ", message_id: %" PRIu64 "\n",
1087 GNUNET_ntohll (req->fragment_id),
1088 GNUNET_ntohll (req->request_id));
1090 pmsg = GNUNET_malloc (psize);
1091 pmsg->header.size = htons (psize);
1092 pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
1093 pmsg->message_id = req->request_id;
1094 pmsg->fragment_offset = req->fragment_offset;
1095 pmsg->flags = htonl (GNUNET_PSYC_MESSAGE_REQUEST);
1096 pmsg->slave_pub_key = req->member_pub_key;
1097 GNUNET_memcpy (&pmsg[1], &req[1], size - sizeof (*req));
1099 client_send_msg (chn, &pmsg->header);
1101 /* FIXME: save req to PSYCstore so that it can be resent later to clients */
1108 * Insert a multicast message fragment into the queue belonging to the message.
1110 * @param chn Channel.
1111 * @param mmsg Multicast message fragment.
1112 * @param msg_id_hash Message ID of @a mmsg in a struct GNUNET_HashCode.
1113 * @param first_ptype First PSYC message part type in @a mmsg.
1114 * @param last_ptype Last PSYC message part type in @a mmsg.
1117 fragment_queue_insert (struct Channel *chn,
1118 const struct GNUNET_MULTICAST_MessageHeader *mmsg,
1119 uint16_t first_ptype, uint16_t last_ptype)
1121 const uint16_t size = ntohs (mmsg->header.size);
1122 const uint64_t frag_offset = GNUNET_ntohll (mmsg->fragment_offset);
1123 struct GNUNET_CONTAINER_MultiHashMap
1124 *chan_msgs = GNUNET_CONTAINER_multihashmap_get (recv_cache,
1125 &chn->pub_key_hash);
1127 struct GNUNET_HashCode msg_id_hash;
1128 hash_key_from_nll (&msg_id_hash, mmsg->message_id);
1130 struct FragmentQueue
1131 *fragq = GNUNET_CONTAINER_multihashmap_get (chn->recv_frags, &msg_id_hash);
1135 fragq = GNUNET_malloc (sizeof (*fragq));
1136 fragq->state = MSG_FRAG_STATE_HEADER;
1138 = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
1140 GNUNET_CONTAINER_multihashmap_put (chn->recv_frags, &msg_id_hash, fragq,
1141 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1143 if (NULL == chan_msgs)
1145 chan_msgs = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
1146 GNUNET_CONTAINER_multihashmap_put (recv_cache, &chn->pub_key_hash, chan_msgs,
1147 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1151 struct GNUNET_HashCode frag_id_hash;
1152 hash_key_from_nll (&frag_id_hash, mmsg->fragment_id);
1153 struct RecvCacheEntry
1154 *cache_entry = GNUNET_CONTAINER_multihashmap_get (chan_msgs, &frag_id_hash);
1155 if (NULL == cache_entry)
1157 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1158 "%p Adding message fragment to cache. message_id: %" PRIu64 ", fragment_id: %" PRIu64 "\n",
1160 GNUNET_ntohll (mmsg->message_id),
1161 GNUNET_ntohll (mmsg->fragment_id));
1162 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1163 "%p header_size: %" PRIu64 " + %u\n",
1167 cache_entry = GNUNET_malloc (sizeof (*cache_entry));
1168 cache_entry->ref_count = 1;
1169 cache_entry->mmsg = GNUNET_malloc (size);
1170 GNUNET_memcpy (cache_entry->mmsg, mmsg, size);
1171 GNUNET_CONTAINER_multihashmap_put (chan_msgs, &frag_id_hash, cache_entry,
1172 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1176 cache_entry->ref_count++;
1177 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1178 "%p Message fragment is already in cache. message_id: %" PRIu64 ", fragment_id: %" PRIu64 ", ref_count: %u\n",
1180 GNUNET_ntohll (mmsg->message_id),
1181 GNUNET_ntohll (mmsg->fragment_id),
1182 cache_entry->ref_count);
1185 if (MSG_FRAG_STATE_HEADER == fragq->state)
1187 if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype)
1189 struct GNUNET_PSYC_MessageMethod *
1190 pmeth = (struct GNUNET_PSYC_MessageMethod *) &mmsg[1];
1191 fragq->state_delta = GNUNET_ntohll (pmeth->state_delta);
1192 fragq->flags = ntohl (pmeth->flags);
1195 if (last_ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA)
1197 fragq->header_size += size;
1199 else if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype
1200 || frag_offset == fragq->header_size)
1201 { /* header is now complete */
1202 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1203 "%p Header of message %" PRIu64 " is complete.\n",
1205 GNUNET_ntohll (mmsg->message_id));
1207 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1208 "%p Adding message %" PRIu64 " to queue.\n",
1210 GNUNET_ntohll (mmsg->message_id));
1211 fragq->state = MSG_FRAG_STATE_DATA;
1215 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1216 "%p Header of message %" PRIu64 " is NOT complete yet: %" PRIu64 " != %" PRIu64 "\n",
1218 GNUNET_ntohll (mmsg->message_id),
1220 fragq->header_size);
1226 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END:
1227 if (frag_offset == fragq->size)
1228 fragq->state = MSG_FRAG_STATE_END;
1230 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1231 "%p Message %" PRIu64 " is NOT complete yet: %" PRIu64 " != %" PRIu64 "\n",
1233 GNUNET_ntohll (mmsg->message_id),
1238 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL:
1239 /* Drop message without delivering to client if it's a single fragment */
1241 (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype)
1242 ? MSG_FRAG_STATE_DROP
1243 : MSG_FRAG_STATE_CANCEL;
1246 switch (fragq->state)
1248 case MSG_FRAG_STATE_DATA:
1249 case MSG_FRAG_STATE_END:
1250 case MSG_FRAG_STATE_CANCEL:
1251 if (GNUNET_NO == fragq->is_queued)
1253 GNUNET_CONTAINER_heap_insert (chn->recv_msgs, NULL,
1254 GNUNET_ntohll (mmsg->message_id));
1255 fragq->is_queued = GNUNET_YES;
1259 fragq->size += size;
1260 GNUNET_CONTAINER_heap_insert (fragq->fragments, NULL,
1261 GNUNET_ntohll (mmsg->fragment_id));
1266 * Run fragment queue of a message.
1268 * Send fragments of a message in order to client, after all modifiers arrived
1274 * ID of the message @a fragq belongs to.
1276 * Fragment queue of the message.
1278 * Drop message without delivering to client?
1279 * #GNUNET_YES or #GNUNET_NO.
1282 fragment_queue_run (struct Channel *chn, uint64_t msg_id,
1283 struct FragmentQueue *fragq, uint8_t drop)
1285 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1286 "%p Running message fragment queue for message %" PRIu64 " (state: %u).\n",
1291 struct GNUNET_CONTAINER_MultiHashMap
1292 *chan_msgs = GNUNET_CONTAINER_multihashmap_get (recv_cache,
1293 &chn->pub_key_hash);
1294 GNUNET_assert (NULL != chan_msgs);
1297 while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (fragq->fragments, NULL,
1300 struct GNUNET_HashCode frag_id_hash;
1301 hash_key_from_hll (&frag_id_hash, frag_id);
1302 struct RecvCacheEntry *cache_entry
1303 = GNUNET_CONTAINER_multihashmap_get (chan_msgs, &frag_id_hash);
1304 if (cache_entry != NULL)
1306 if (GNUNET_NO == drop)
1308 client_send_mcast_msg (chn, cache_entry->mmsg, 0);
1310 if (cache_entry->ref_count <= 1)
1312 GNUNET_CONTAINER_multihashmap_remove (chan_msgs, &frag_id_hash,
1314 GNUNET_free (cache_entry->mmsg);
1315 GNUNET_free (cache_entry);
1319 cache_entry->ref_count--;
1322 #if CACHE_AGING_IMPLEMENTED
1323 else if (GNUNET_NO == drop)
1325 /* TODO: fragment not in cache anymore, retrieve it from PSYCstore */
1329 GNUNET_CONTAINER_heap_remove_root (fragq->fragments);
1332 if (MSG_FRAG_STATE_END <= fragq->state)
1334 struct GNUNET_HashCode msg_id_hash;
1335 hash_key_from_hll (&msg_id_hash, msg_id);
1337 GNUNET_CONTAINER_multihashmap_remove (chn->recv_frags, &msg_id_hash, fragq);
1338 GNUNET_CONTAINER_heap_destroy (fragq->fragments);
1339 GNUNET_free (fragq);
1343 fragq->is_queued = GNUNET_NO;
1348 struct StateModifyClosure
1350 struct Channel *channel;
1352 struct GNUNET_HashCode msg_id_hash;
1357 store_recv_state_modify_result (void *cls, int64_t result,
1358 const char *err_msg, uint16_t err_msg_size)
1360 struct StateModifyClosure *mcls = cls;
1361 struct Channel *chn = mcls->channel;
1362 uint64_t msg_id = mcls->msg_id;
1364 struct FragmentQueue *
1365 fragq = GNUNET_CONTAINER_multihashmap_get (chn->recv_frags, &mcls->msg_id_hash);
1367 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1368 "%p GNUNET_PSYCSTORE_state_modify() returned %" PRId64 " (%.*s)\n",
1369 chn, result, err_msg_size, err_msg);
1376 fragq->state_is_modified = GNUNET_YES;
1377 if (chn->max_state_message_id < msg_id)
1378 chn->max_state_message_id = msg_id;
1379 if (chn->max_message_id < msg_id)
1380 chn->max_message_id = msg_id;
1383 fragment_queue_run (chn, msg_id, fragq, MSG_FRAG_STATE_DROP == fragq->state);
1384 GNUNET_CONTAINER_heap_remove_root (chn->recv_msgs);
1385 message_queue_run (chn);
1389 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1390 "%p GNUNET_PSYCSTORE_state_modify() failed with error %" PRId64 " (%.*s)\n",
1391 chn, result, err_msg_size, err_msg);
1392 /** @todo FIXME: handle state_modify error */
1398 * Run message queue.
1400 * Send messages in queue to client in order after a message has arrived from
1401 * multicast, according to the following:
1402 * - A message is only sent if all of its modifiers arrived.
1403 * - A stateful message is only sent if the previous stateful message
1404 * has already been delivered to the client.
1406 * @param chn Channel.
1408 * @return Number of messages removed from queue and sent to client.
1411 message_queue_run (struct Channel *chn)
1413 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1414 "%p Running message queue.\n", chn);
1418 while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (chn->recv_msgs, NULL,
1421 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1422 "%p Processing message %" PRIu64 " in queue.\n", chn, msg_id);
1423 struct GNUNET_HashCode msg_id_hash;
1424 hash_key_from_hll (&msg_id_hash, msg_id);
1426 struct FragmentQueue *
1427 fragq = GNUNET_CONTAINER_multihashmap_get (chn->recv_frags, &msg_id_hash);
1429 if (NULL == fragq || fragq->state <= MSG_FRAG_STATE_HEADER)
1431 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1432 "%p No fragq (%p) or header not complete.\n",
1437 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1438 "%p Fragment queue entry: state: %u, state delta: "
1439 "%" PRIu64 " - %" PRIu64 " ?= %" PRIu64 "\n",
1440 chn, fragq->state, msg_id, fragq->state_delta, chn->max_state_message_id);
1442 if (MSG_FRAG_STATE_DATA <= fragq->state)
1444 /* Check if there's a missing message before the current one */
1445 if (GNUNET_PSYC_STATE_NOT_MODIFIED == fragq->state_delta)
1447 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p state NOT modified\n", chn);
1449 if (!(fragq->flags & GNUNET_PSYC_MESSAGE_ORDER_ANY)
1450 && (chn->max_message_id != msg_id - 1
1451 && chn->max_message_id != msg_id))
1453 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1454 "%p Out of order message. "
1455 "(%" PRIu64 " != %" PRIu64 " - 1)\n",
1456 chn, chn->max_message_id, msg_id);
1458 // FIXME: keep track of messages processed in this queue run,
1459 // and only stop after reaching the end
1464 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p state modified\n", chn);
1465 if (GNUNET_YES != fragq->state_is_modified)
1467 if (msg_id - fragq->state_delta != chn->max_state_message_id)
1469 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1470 "%p Out of order stateful message. "
1471 "(%" PRIu64 " - %" PRIu64 " != %" PRIu64 ")\n",
1472 chn, msg_id, fragq->state_delta, chn->max_state_message_id);
1474 // FIXME: keep track of messages processed in this queue run,
1475 // and only stop after reaching the end
1478 struct StateModifyClosure *mcls = GNUNET_malloc (sizeof (*mcls));
1479 mcls->channel = chn;
1480 mcls->msg_id = msg_id;
1481 mcls->msg_id_hash = msg_id_hash;
1483 /* Apply modifiers to state in PSYCstore */
1484 GNUNET_PSYCSTORE_state_modify (store, &chn->pub_key, msg_id,
1486 store_recv_state_modify_result, mcls);
1487 break; // continue after asynchronous state modify result
1490 chn->max_message_id = msg_id;
1492 fragment_queue_run (chn, msg_id, fragq, MSG_FRAG_STATE_DROP == fragq->state);
1493 GNUNET_CONTAINER_heap_remove_root (chn->recv_msgs);
1497 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1498 "%p Removed %" PRIu64 " messages from queue.\n", chn, n);
1504 * Drop message queue of a channel.
1506 * Remove all messages in queue without sending it to clients.
1508 * @param chn Channel.
1510 * @return Number of messages removed from queue.
1513 message_queue_drop (struct Channel *chn)
1515 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1516 "%p Dropping message queue.\n", chn);
1519 while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (chn->recv_msgs, NULL,
1522 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1523 "%p Dropping message %" PRIu64 " from queue.\n", chn, msg_id);
1524 struct GNUNET_HashCode msg_id_hash;
1525 hash_key_from_hll (&msg_id_hash, msg_id);
1527 struct FragmentQueue *
1528 fragq = GNUNET_CONTAINER_multihashmap_get (chn->recv_frags, &msg_id_hash);
1529 GNUNET_assert (NULL != fragq);
1530 fragment_queue_run (chn, msg_id, fragq, GNUNET_YES);
1531 GNUNET_CONTAINER_heap_remove_root (chn->recv_msgs);
1534 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1535 "%p Removed %" PRIu64 " messages from queue.\n", chn, n);
1541 * Received result of GNUNET_PSYCSTORE_fragment_store().
1544 store_recv_fragment_store_result (void *cls, int64_t result,
1545 const char *err_msg, uint16_t err_msg_size)
1547 struct Channel *chn = cls;
1548 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1549 "%p GNUNET_PSYCSTORE_fragment_store() returned %" PRId64 " (%.*s)\n",
1550 chn, result, err_msg_size, err_msg);
1555 * Handle incoming message fragment from multicast.
1557 * Store it using PSYCstore and send it to the clients of the channel in order.
1560 mcast_recv_message (void *cls, const struct GNUNET_MULTICAST_MessageHeader *mmsg)
1562 struct Channel *chn = cls;
1563 uint16_t size = ntohs (mmsg->header.size);
1565 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1566 "%p Received multicast message of size %u. "
1567 "fragment_id=%" PRIu64 ", message_id=%" PRIu64
1568 ", fragment_offset=%" PRIu64 ", flags=%" PRIu64 "\n",
1570 GNUNET_ntohll (mmsg->fragment_id),
1571 GNUNET_ntohll (mmsg->message_id),
1572 GNUNET_ntohll (mmsg->fragment_offset),
1573 GNUNET_ntohll (mmsg->flags));
1575 GNUNET_PSYCSTORE_fragment_store (store, &chn->pub_key, mmsg, 0,
1576 &store_recv_fragment_store_result, chn);
1578 uint16_t first_ptype = 0, last_ptype = 0;
1579 int check = GNUNET_PSYC_receive_check_parts (size - sizeof (*mmsg),
1580 (const char *) &mmsg[1],
1581 &first_ptype, &last_ptype);
1582 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1583 "%p Message check result %d, first part type %u, last part type %u\n",
1584 chn, check, first_ptype, last_ptype);
1585 if (GNUNET_SYSERR == check)
1587 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1588 "%p Dropping incoming multicast message with invalid parts.\n",
1590 GNUNET_break_op (0);
1594 fragment_queue_insert (chn, mmsg, first_ptype, last_ptype);
1595 message_queue_run (chn);
1600 * Incoming request fragment from multicast for a master.
1602 * @param cls Master.
1603 * @param req The request.
1606 mcast_recv_request (void *cls,
1607 const struct GNUNET_MULTICAST_RequestHeader *req)
1609 struct Master *mst = cls;
1610 uint16_t size = ntohs (req->header.size);
1612 char *str = GNUNET_CRYPTO_ecdsa_public_key_to_string (&req->member_pub_key);
1613 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1614 "%p Received multicast request of size %u from %s.\n",
1618 uint16_t first_ptype = 0, last_ptype = 0;
1620 == GNUNET_PSYC_receive_check_parts (size - sizeof (*req),
1621 (const char *) &req[1],
1622 &first_ptype, &last_ptype))
1624 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1625 "%p Dropping incoming multicast request with invalid parts.\n",
1627 GNUNET_break_op (0);
1631 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1632 "Message parts: first: type %u, last: type %u\n",
1633 first_ptype, last_ptype);
1635 /* FIXME: in-order delivery */
1636 client_send_mcast_req (mst, req);
1641 * Response from PSYCstore with the current counter values for a channel master.
1644 store_recv_master_counters (void *cls, int result, uint64_t max_fragment_id,
1645 uint64_t max_message_id, uint64_t max_group_generation,
1646 uint64_t max_state_message_id)
1648 struct Master *mst = cls;
1649 struct Channel *chn = &mst->channel;
1650 chn->store_op = NULL;
1652 struct GNUNET_PSYC_CountersResultMessage res;
1653 res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK);
1654 res.header.size = htons (sizeof (res));
1655 res.result_code = htonl (result);
1656 res.max_message_id = GNUNET_htonll (max_message_id);
1658 if (GNUNET_OK == result || GNUNET_NO == result)
1660 mst->max_message_id = max_message_id;
1661 chn->max_message_id = max_message_id;
1662 chn->max_state_message_id = max_state_message_id;
1663 mst->max_group_generation = max_group_generation;
1665 = GNUNET_MULTICAST_origin_start (cfg, &mst->priv_key, max_fragment_id,
1666 mcast_recv_join_request,
1667 mcast_recv_replay_fragment,
1668 mcast_recv_replay_message,
1670 mcast_recv_message, chn);
1671 chn->is_ready = GNUNET_YES;
1675 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1676 "%p GNUNET_PSYCSTORE_counters_get() "
1677 "returned %d for channel %s.\n",
1678 chn, result, GNUNET_h2s (&chn->pub_key_hash));
1681 client_send_msg (chn, &res.header);
1686 * Response from PSYCstore with the current counter values for a channel slave.
1689 store_recv_slave_counters (void *cls, int result, uint64_t max_fragment_id,
1690 uint64_t max_message_id, uint64_t max_group_generation,
1691 uint64_t max_state_message_id)
1693 struct Slave *slv = cls;
1694 struct Channel *chn = &slv->channel;
1695 chn->store_op = NULL;
1697 struct GNUNET_PSYC_CountersResultMessage res;
1698 res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK);
1699 res.header.size = htons (sizeof (res));
1700 res.result_code = htonl (result);
1701 res.max_message_id = GNUNET_htonll (max_message_id);
1703 if (GNUNET_OK == result || GNUNET_NO == result)
1705 chn->max_message_id = max_message_id;
1706 chn->max_state_message_id = max_state_message_id;
1708 = GNUNET_MULTICAST_member_join (cfg, &chn->pub_key, &slv->priv_key,
1710 slv->relay_count, slv->relays,
1711 &slv->join_msg->header,
1712 mcast_recv_join_request,
1713 mcast_recv_join_decision,
1714 mcast_recv_replay_fragment,
1715 mcast_recv_replay_message,
1716 mcast_recv_message, chn);
1717 if (NULL != slv->join_msg)
1719 GNUNET_free (slv->join_msg);
1720 slv->join_msg = NULL;
1725 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1726 "%p GNUNET_PSYCSTORE_counters_get() "
1727 "returned %d for channel %s.\n",
1728 chn, result, GNUNET_h2s (&chn->pub_key_hash));
1731 client_send_msg (chn, &res.header);
1736 channel_init (struct Channel *chn)
1739 = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
1740 chn->recv_frags = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
1745 * Handle a connecting client starting a channel master.
1748 handle_client_master_start (void *cls,
1749 const struct MasterStartRequest *req)
1751 struct Client *c = cls;
1752 struct GNUNET_SERVICE_Client *client = c->client;
1754 struct GNUNET_CRYPTO_EddsaPublicKey pub_key;
1755 struct GNUNET_HashCode pub_key_hash;
1757 GNUNET_CRYPTO_eddsa_key_get_public (&req->channel_key, &pub_key);
1758 GNUNET_CRYPTO_hash (&pub_key, sizeof (pub_key), &pub_key_hash);
1761 mst = GNUNET_CONTAINER_multihashmap_get (masters, &pub_key_hash);
1762 struct Channel *chn;
1766 mst = GNUNET_malloc (sizeof (*mst));
1767 mst->policy = ntohl (req->policy);
1768 mst->priv_key = req->channel_key;
1769 mst->join_reqs = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
1771 chn = c->channel = &mst->channel;
1773 chn->is_master = GNUNET_YES;
1774 chn->pub_key = pub_key;
1775 chn->pub_key_hash = pub_key_hash;
1778 GNUNET_CONTAINER_multihashmap_put (masters, &chn->pub_key_hash, chn,
1779 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1780 chn->store_op = GNUNET_PSYCSTORE_counters_get (store, &chn->pub_key,
1781 store_recv_master_counters, mst);
1785 chn = &mst->channel;
1787 struct GNUNET_PSYC_CountersResultMessage *res;
1788 struct GNUNET_MQ_Envelope *
1789 env = GNUNET_MQ_msg (res, GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK);
1790 res->result_code = htonl (GNUNET_OK);
1791 res->max_message_id = GNUNET_htonll (mst->max_message_id);
1793 GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (client), env);
1796 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1797 "%p Client connected as master to channel %s.\n",
1798 mst, GNUNET_h2s (&chn->pub_key_hash));
1800 struct ClientList *cli = GNUNET_malloc (sizeof (*cli));
1801 cli->client = client;
1802 GNUNET_CONTAINER_DLL_insert (chn->clients_head, chn->clients_tail, cli);
1804 GNUNET_SERVICE_client_continue (client);
1809 check_client_slave_join (void *cls,
1810 const struct SlaveJoinRequest *req)
1817 * Handle a connecting client joining as a channel slave.
1820 handle_client_slave_join (void *cls,
1821 const struct SlaveJoinRequest *req)
1823 struct Client *c = cls;
1824 struct GNUNET_SERVICE_Client *client = c->client;
1826 uint16_t req_size = ntohs (req->header.size);
1828 struct GNUNET_CRYPTO_EcdsaPublicKey slv_pub_key;
1829 struct GNUNET_HashCode pub_key_hash, slv_pub_hash;
1831 GNUNET_CRYPTO_ecdsa_key_get_public (&req->slave_key, &slv_pub_key);
1832 GNUNET_CRYPTO_hash (&slv_pub_key, sizeof (slv_pub_key), &slv_pub_hash);
1833 GNUNET_CRYPTO_hash (&req->channel_pub_key, sizeof (req->channel_pub_key), &pub_key_hash);
1835 struct GNUNET_CONTAINER_MultiHashMap *
1836 chn_slv = GNUNET_CONTAINER_multihashmap_get (channel_slaves, &pub_key_hash);
1837 struct Slave *slv = NULL;
1838 struct Channel *chn;
1840 if (NULL != chn_slv)
1842 slv = GNUNET_CONTAINER_multihashmap_get (chn_slv, &slv_pub_hash);
1846 slv = GNUNET_malloc (sizeof (*slv));
1847 slv->priv_key = req->slave_key;
1848 slv->pub_key = slv_pub_key;
1849 slv->pub_key_hash = slv_pub_hash;
1850 slv->origin = req->origin;
1851 slv->relay_count = ntohl (req->relay_count);
1852 slv->join_flags = ntohl (req->flags);
1854 const struct GNUNET_PeerIdentity *
1855 relays = (const struct GNUNET_PeerIdentity *) &req[1];
1856 uint16_t relay_size = slv->relay_count * sizeof (*relays);
1857 uint16_t join_msg_size = 0;
1859 if (sizeof (*req) + relay_size + sizeof (struct GNUNET_MessageHeader)
1862 struct GNUNET_PSYC_Message *
1863 join_msg = (struct GNUNET_PSYC_Message *) (((char *) &req[1]) + relay_size);
1864 join_msg_size = ntohs (join_msg->header.size);
1865 slv->join_msg = GNUNET_malloc (join_msg_size);
1866 GNUNET_memcpy (slv->join_msg, join_msg, join_msg_size);
1868 if (sizeof (*req) + relay_size + join_msg_size != req_size)
1870 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1871 "%u + %u + %u != %u\n",
1872 (unsigned int) sizeof (*req),
1877 GNUNET_SERVICE_client_drop (client);
1881 if (0 < slv->relay_count)
1883 slv->relays = GNUNET_malloc (relay_size);
1884 GNUNET_memcpy (slv->relays, &req[1], relay_size);
1887 chn = c->channel = &slv->channel;
1889 chn->is_master = GNUNET_NO;
1890 chn->pub_key = req->channel_pub_key;
1891 chn->pub_key_hash = pub_key_hash;
1894 if (NULL == chn_slv)
1896 chn_slv = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
1897 GNUNET_CONTAINER_multihashmap_put (channel_slaves, &chn->pub_key_hash, chn_slv,
1898 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1900 GNUNET_CONTAINER_multihashmap_put (chn_slv, &slv->pub_key_hash, chn,
1901 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1902 GNUNET_CONTAINER_multihashmap_put (slaves, &chn->pub_key_hash, chn,
1903 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1904 chn->store_op = GNUNET_PSYCSTORE_counters_get (store, &chn->pub_key,
1905 &store_recv_slave_counters, slv);
1909 chn = &slv->channel;
1911 struct GNUNET_PSYC_CountersResultMessage *res;
1913 struct GNUNET_MQ_Envelope *
1914 env = GNUNET_MQ_msg (res, GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK);
1915 res->result_code = htonl (GNUNET_OK);
1916 res->max_message_id = GNUNET_htonll (chn->max_message_id);
1918 GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (client), env);
1920 if (GNUNET_PSYC_SLAVE_JOIN_LOCAL & slv->join_flags)
1922 mcast_recv_join_decision (slv, GNUNET_YES,
1923 NULL, 0, NULL, NULL);
1925 else if (NULL == slv->member)
1928 = GNUNET_MULTICAST_member_join (cfg, &chn->pub_key, &slv->priv_key,
1930 slv->relay_count, slv->relays,
1931 &slv->join_msg->header,
1932 &mcast_recv_join_request,
1933 &mcast_recv_join_decision,
1934 &mcast_recv_replay_fragment,
1935 &mcast_recv_replay_message,
1936 &mcast_recv_message, chn);
1937 if (NULL != slv->join_msg)
1939 GNUNET_free (slv->join_msg);
1940 slv->join_msg = NULL;
1943 else if (NULL != slv->join_dcsn)
1945 struct GNUNET_MQ_Envelope *
1946 env = GNUNET_MQ_msg_copy (&slv->join_dcsn->header);
1947 GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (client), env);
1951 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1952 "%p Client connected as slave to channel %s.\n",
1953 slv, GNUNET_h2s (&chn->pub_key_hash));
1955 struct ClientList *cli = GNUNET_malloc (sizeof (*cli));
1956 cli->client = client;
1957 GNUNET_CONTAINER_DLL_insert (chn->clients_head, chn->clients_tail, cli);
1959 GNUNET_SERVICE_client_continue (client);
1963 struct JoinDecisionClosure
1965 int32_t is_admitted;
1966 struct GNUNET_MessageHeader *msg;
1971 * Iterator callback for sending join decisions to multicast.
1974 mcast_send_join_decision (void *cls, const struct GNUNET_HashCode *pub_key_hash,
1977 struct JoinDecisionClosure *jcls = cls;
1978 struct GNUNET_MULTICAST_JoinHandle *jh = value;
1979 // FIXME: add relays
1980 GNUNET_MULTICAST_join_decision (jh, jcls->is_admitted, 0, NULL, jcls->msg);
1986 check_client_join_decision (void *cls,
1987 const struct GNUNET_PSYC_JoinDecisionMessage *dcsn)
1994 * Join decision from client.
1997 handle_client_join_decision (void *cls,
1998 const struct GNUNET_PSYC_JoinDecisionMessage *dcsn)
2000 struct Client *c = cls;
2001 struct GNUNET_SERVICE_Client *client = c->client;
2002 struct Channel *chn = c->channel;
2006 GNUNET_SERVICE_client_drop (client);
2009 GNUNET_assert (GNUNET_YES == chn->is_master);
2010 struct Master *mst = chn->master;
2012 struct JoinDecisionClosure jcls;
2013 jcls.is_admitted = ntohl (dcsn->is_admitted);
2015 = (sizeof (*dcsn) + sizeof (*jcls.msg) <= ntohs (dcsn->header.size))
2016 ? (struct GNUNET_MessageHeader *) &dcsn[1]
2019 struct GNUNET_HashCode slave_pub_hash;
2020 GNUNET_CRYPTO_hash (&dcsn->slave_pub_key, sizeof (dcsn->slave_pub_key),
2023 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2024 "%p Got join decision (%d) from client for channel %s..\n",
2025 mst, jcls.is_admitted, GNUNET_h2s (&chn->pub_key_hash));
2026 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2027 "%p ..and slave %s.\n",
2028 mst, GNUNET_h2s (&slave_pub_hash));
2030 GNUNET_CONTAINER_multihashmap_get_multiple (mst->join_reqs, &slave_pub_hash,
2031 &mcast_send_join_decision, &jcls);
2032 GNUNET_CONTAINER_multihashmap_remove_all (mst->join_reqs, &slave_pub_hash);
2033 GNUNET_SERVICE_client_continue (client);
2038 * Send acknowledgement to a client.
2040 * Sent after a message fragment has been passed on to multicast.
2042 * @param chn The channel struct for the client.
2045 send_message_ack (struct Channel *chn, struct GNUNET_SERVICE_Client *client)
2047 struct GNUNET_MessageHeader *res;
2048 struct GNUNET_MQ_Envelope *
2049 env = GNUNET_MQ_msg (res, GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK);
2052 GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (client), env);
2057 * Callback for the transmit functions of multicast.
2060 transmit_notify (void *cls, size_t *data_size, void *data)
2062 struct Channel *chn = cls;
2063 struct TransmitMessage *tmit_msg = chn->tmit_head;
2065 if (NULL == tmit_msg || *data_size < tmit_msg->size)
2067 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2068 "%p transmit_notify: nothing to send.\n", chn);
2069 if (NULL != tmit_msg && *data_size < tmit_msg->size)
2075 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2076 "%p transmit_notify: sending %u bytes.\n", chn, tmit_msg->size);
2078 *data_size = tmit_msg->size;
2079 GNUNET_memcpy (data, &tmit_msg[1], *data_size);
2082 = (tmit_msg->last_ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END)
2086 /* FIXME: handle disconnecting clients */
2087 if (NULL != tmit_msg->client)
2088 send_message_ack (chn, tmit_msg->client);
2090 GNUNET_CONTAINER_DLL_remove (chn->tmit_head, chn->tmit_tail, tmit_msg);
2092 if (NULL != chn->tmit_head)
2094 GNUNET_SCHEDULER_add_now (&schedule_transmit_message, chn);
2096 else if (GNUNET_YES == chn->is_disconnected
2097 && tmit_msg->last_ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END)
2099 /* FIXME: handle partial message (when still in_transmit) */
2100 GNUNET_free (tmit_msg);
2101 return GNUNET_SYSERR;
2103 GNUNET_free (tmit_msg);
2109 * Callback for the transmit functions of multicast.
2112 master_transmit_notify (void *cls, size_t *data_size, void *data)
2114 int ret = transmit_notify (cls, data_size, data);
2116 if (GNUNET_YES == ret)
2118 struct Master *mst = cls;
2119 mst->tmit_handle = NULL;
2126 * Callback for the transmit functions of multicast.
2129 slave_transmit_notify (void *cls, size_t *data_size, void *data)
2131 int ret = transmit_notify (cls, data_size, data);
2133 if (GNUNET_YES == ret)
2135 struct Slave *slv = cls;
2136 slv->tmit_handle = NULL;
2143 * Transmit a message from a channel master to the multicast group.
2146 master_transmit_message (struct Master *mst)
2148 struct Channel *chn = &mst->channel;
2149 struct TransmitMessage *tmit_msg = chn->tmit_head;
2150 if (NULL == tmit_msg)
2152 if (NULL == mst->tmit_handle)
2154 mst->tmit_handle = (void *) &mst->tmit_handle;
2155 struct GNUNET_MULTICAST_OriginTransmitHandle *
2156 tmit_handle = GNUNET_MULTICAST_origin_to_all (mst->origin, tmit_msg->id,
2157 mst->max_group_generation,
2158 master_transmit_notify, mst);
2159 if (NULL != mst->tmit_handle)
2160 mst->tmit_handle = tmit_handle;
2164 GNUNET_MULTICAST_origin_to_all_resume (mst->tmit_handle);
2170 * Transmit a message from a channel slave to the multicast group.
2173 slave_transmit_message (struct Slave *slv)
2175 if (NULL == slv->channel.tmit_head)
2177 if (NULL == slv->tmit_handle)
2179 slv->tmit_handle = (void *) &slv->tmit_handle;
2180 struct GNUNET_MULTICAST_MemberTransmitHandle *
2181 tmit_handle = GNUNET_MULTICAST_member_to_origin (slv->member, slv->channel.tmit_head->id,
2182 slave_transmit_notify, slv);
2183 if (NULL != slv->tmit_handle)
2184 slv->tmit_handle = tmit_handle;
2188 GNUNET_MULTICAST_member_to_origin_resume (slv->tmit_handle);
2194 transmit_message (struct Channel *chn)
2197 ? master_transmit_message (chn->master)
2198 : slave_transmit_message (chn->slave);
2203 * Queue a message from a channel master for sending to the multicast group.
2206 master_queue_message (struct Master *mst, struct TransmitMessage *tmit_msg)
2208 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "%p master_queue_message()\n", mst);
2210 if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == tmit_msg->first_ptype)
2212 tmit_msg->id = ++mst->max_message_id;
2213 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2214 "%p master_queue_message: message_id=%" PRIu64 "\n",
2216 struct GNUNET_PSYC_MessageMethod *pmeth
2217 = (struct GNUNET_PSYC_MessageMethod *) &tmit_msg[1];
2219 if (pmeth->flags & GNUNET_PSYC_MASTER_TRANSMIT_STATE_RESET)
2221 pmeth->state_delta = GNUNET_htonll (GNUNET_PSYC_STATE_RESET);
2223 else if (pmeth->flags & GNUNET_PSYC_MASTER_TRANSMIT_STATE_MODIFY)
2225 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2226 "%p master_queue_message: state_delta=%" PRIu64 "\n",
2227 mst, tmit_msg->id - mst->max_state_message_id);
2228 pmeth->state_delta = GNUNET_htonll (tmit_msg->id
2229 - mst->max_state_message_id);
2230 mst->max_state_message_id = tmit_msg->id;
2234 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2235 "%p master_queue_message: state not modified\n", mst);
2236 pmeth->state_delta = GNUNET_htonll (GNUNET_PSYC_STATE_NOT_MODIFIED);
2239 if (pmeth->flags & GNUNET_PSYC_MASTER_TRANSMIT_STATE_HASH)
2241 /// @todo add state_hash to PSYC header
2248 * Queue a message from a channel slave for sending to the multicast group.
2251 slave_queue_message (struct Slave *slv, struct TransmitMessage *tmit_msg)
2253 if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == tmit_msg->first_ptype)
2255 struct GNUNET_PSYC_MessageMethod *pmeth
2256 = (struct GNUNET_PSYC_MessageMethod *) &tmit_msg[1];
2257 pmeth->state_delta = GNUNET_htonll (GNUNET_PSYC_STATE_NOT_MODIFIED);
2258 tmit_msg->id = ++slv->max_request_id;
2264 * Queue PSYC message parts for sending to multicast.
2267 * Channel to send to.
2269 * Client the message originates from.
2273 * Concatenated message parts.
2274 * @param first_ptype
2275 * First message part type in @a data.
2277 * Last message part type in @a data.
2279 static struct TransmitMessage *
2280 queue_message (struct Channel *chn,
2281 struct GNUNET_SERVICE_Client *client,
2284 uint16_t first_ptype, uint16_t last_ptype)
2286 struct TransmitMessage *
2287 tmit_msg = GNUNET_malloc (sizeof (*tmit_msg) + data_size);
2288 GNUNET_memcpy (&tmit_msg[1], data, data_size);
2289 tmit_msg->client = client;
2290 tmit_msg->size = data_size;
2291 tmit_msg->first_ptype = first_ptype;
2292 tmit_msg->last_ptype = last_ptype;
2294 /* FIXME: separate queue per message ID */
2296 GNUNET_CONTAINER_DLL_insert_tail (chn->tmit_head, chn->tmit_tail, tmit_msg);
2299 ? master_queue_message (chn->master, tmit_msg)
2300 : slave_queue_message (chn->slave, tmit_msg);
2306 * Cancel transmission of current message.
2308 * @param chn Channel to send to.
2309 * @param client Client the message originates from.
2312 transmit_cancel (struct Channel *chn, struct GNUNET_SERVICE_Client *client)
2314 uint16_t type = GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL;
2316 struct GNUNET_MessageHeader msg;
2317 msg.size = htons (sizeof (msg));
2318 msg.type = htons (type);
2320 queue_message (chn, client, sizeof (msg), &msg, type, type);
2321 transmit_message (chn);
2323 /* FIXME: cleanup */
2328 check_client_psyc_message (void *cls,
2329 const struct GNUNET_MessageHeader *msg)
2336 * Incoming message from a master or slave client.
2339 handle_client_psyc_message (void *cls,
2340 const struct GNUNET_MessageHeader *msg)
2342 struct Client *c = cls;
2343 struct GNUNET_SERVICE_Client *client = c->client;
2344 struct Channel *chn = c->channel;
2348 GNUNET_SERVICE_client_drop (client);
2352 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2353 "%p Received message from client.\n", chn);
2354 GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG, msg);
2356 if (GNUNET_YES != chn->is_ready)
2358 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2359 "%p Channel is not ready yet, disconnecting client.\n", chn);
2361 GNUNET_SERVICE_client_drop (client);
2365 uint16_t size = ntohs (msg->size);
2366 if (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < size - sizeof (*msg))
2368 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2369 "%p Message payload too large: %u < %u.\n",
2371 (unsigned int) GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD,
2372 (unsigned int) (size - sizeof (*msg)));
2374 transmit_cancel (chn, client);
2375 GNUNET_SERVICE_client_drop (client);
2379 uint16_t first_ptype = 0, last_ptype = 0;
2381 == GNUNET_PSYC_receive_check_parts (size - sizeof (*msg),
2382 (const char *) &msg[1],
2383 &first_ptype, &last_ptype))
2385 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2386 "%p Received invalid message part from client.\n", chn);
2388 transmit_cancel (chn, client);
2389 GNUNET_SERVICE_client_drop (client);
2392 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2393 "%p Received message with first part type %u and last part type %u.\n",
2394 chn, first_ptype, last_ptype);
2396 queue_message (chn, client, size - sizeof (*msg), &msg[1],
2397 first_ptype, last_ptype);
2398 transmit_message (chn);
2399 /* FIXME: send a few ACKs even before transmit_notify is called */
2401 GNUNET_SERVICE_client_continue (client);
2406 * Received result of GNUNET_PSYCSTORE_membership_store()
2409 store_recv_membership_store_result (void *cls,
2411 const char *err_msg,
2412 uint16_t err_msg_size)
2414 struct Operation *op = cls;
2415 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2416 "%p GNUNET_PSYCSTORE_membership_store() returned %" PRId64 " (%.*s)\n",
2422 if (NULL != op->client)
2423 client_send_result (op->client, op->op_id, result, err_msg, err_msg_size);
2429 * Client requests to add/remove a slave in the membership database.
2432 handle_client_membership_store (void *cls,
2433 const struct ChannelMembershipStoreRequest *req)
2435 struct Client *c = cls;
2436 struct GNUNET_SERVICE_Client *client = c->client;
2437 struct Channel *chn = c->channel;
2441 GNUNET_SERVICE_client_drop (client);
2445 struct Operation *op = op_add (chn, client, req->op_id, 0);
2447 uint64_t announced_at = GNUNET_ntohll (req->announced_at);
2448 uint64_t effective_since = GNUNET_ntohll (req->effective_since);
2449 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2450 "%p Received membership store request from client.\n", chn);
2451 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2452 "%p did_join: %u, announced_at: %" PRIu64 ", effective_since: %" PRIu64 "\n",
2453 chn, req->did_join, announced_at, effective_since);
2455 GNUNET_PSYCSTORE_membership_store (store, &chn->pub_key, &req->slave_pub_key,
2456 req->did_join, announced_at, effective_since,
2457 0, /* FIXME: group_generation */
2458 &store_recv_membership_store_result, op);
2459 GNUNET_SERVICE_client_continue (client);
2464 * Received a fragment for GNUNET_PSYCSTORE_fragment_get(),
2465 * in response to a history request from a client.
2468 store_recv_fragment_history (void *cls,
2469 struct GNUNET_MULTICAST_MessageHeader *mmsg,
2470 enum GNUNET_PSYCSTORE_MessageFlags flags)
2472 struct Operation *op = cls;
2473 if (NULL == op->client)
2474 { /* Requesting client already disconnected. */
2477 struct Channel *chn = op->channel;
2479 struct GNUNET_PSYC_MessageHeader *pmsg;
2480 uint16_t msize = ntohs (mmsg->header.size);
2481 uint16_t psize = sizeof (*pmsg) + msize - sizeof (*mmsg);
2483 struct GNUNET_OperationResultMessage *
2484 res = GNUNET_malloc (sizeof (*res) + psize);
2485 res->header.size = htons (sizeof (*res) + psize);
2486 res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_HISTORY_RESULT);
2487 res->op_id = op->op_id;
2488 res->result_code = GNUNET_htonll (GNUNET_OK);
2490 pmsg = (struct GNUNET_PSYC_MessageHeader *) &res[1];
2491 GNUNET_PSYC_message_header_init (pmsg, mmsg, flags | GNUNET_PSYC_MESSAGE_HISTORIC);
2492 GNUNET_memcpy (&res[1], pmsg, psize);
2494 /** @todo FIXME: send only to requesting client */
2495 client_send_msg (chn, &res->header);
2503 * Received the result of GNUNET_PSYCSTORE_fragment_get(),
2504 * in response to a history request from a client.
2507 store_recv_fragment_history_result (void *cls, int64_t result,
2508 const char *err_msg, uint16_t err_msg_size)
2510 struct Operation *op = cls;
2511 if (NULL == op->client)
2512 { /* Requesting client already disconnected. */
2516 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2517 "%p History replay #%" PRIu64 ": "
2518 "PSYCSTORE returned %" PRId64 " (%.*s)\n",
2519 op->channel, GNUNET_ntohll (op->op_id), result, err_msg_size, err_msg);
2521 if (op->flags & GNUNET_PSYC_HISTORY_REPLAY_REMOTE)
2523 /** @todo Multicast replay request for messages not found locally. */
2526 client_send_result (op->client, op->op_id, result, err_msg, err_msg_size);
2532 check_client_history_replay (void *cls,
2533 const struct GNUNET_PSYC_HistoryRequestMessage *req)
2540 * Client requests channel history.
2543 handle_client_history_replay (void *cls,
2544 const struct GNUNET_PSYC_HistoryRequestMessage *req)
2546 struct Client *c = cls;
2547 struct GNUNET_SERVICE_Client *client = c->client;
2548 struct Channel *chn = c->channel;
2552 GNUNET_SERVICE_client_drop (client);
2556 uint16_t size = ntohs (req->header.size);
2557 const char *method_prefix = (const char *) &req[1];
2559 if (size < sizeof (*req) + 1
2560 || '\0' != method_prefix[size - sizeof (*req) - 1])
2562 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2563 "%p History replay #%" PRIu64 ": "
2564 "invalid method prefix. size: %u < %u?\n",
2566 GNUNET_ntohll (req->op_id),
2568 (unsigned int) sizeof (*req) + 1);
2570 GNUNET_SERVICE_client_drop (client);
2574 struct Operation *op = op_add (chn, client, req->op_id, ntohl (req->flags));
2576 if (0 == req->message_limit)
2578 GNUNET_PSYCSTORE_message_get (store, &chn->pub_key, NULL,
2579 GNUNET_ntohll (req->start_message_id),
2580 GNUNET_ntohll (req->end_message_id),
2582 &store_recv_fragment_history,
2583 &store_recv_fragment_history_result, op);
2587 GNUNET_PSYCSTORE_message_get_latest (store, &chn->pub_key, NULL,
2588 GNUNET_ntohll (req->message_limit),
2590 &store_recv_fragment_history,
2591 &store_recv_fragment_history_result,
2594 GNUNET_SERVICE_client_continue (client);
2599 * Received state var from PSYCstore, send it to client.
2602 store_recv_state_var (void *cls, const char *name,
2603 const void *value, uint32_t value_size)
2605 struct Operation *op = cls;
2606 struct GNUNET_OperationResultMessage *res;
2607 struct GNUNET_MQ_Envelope *env;
2609 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2610 "%p state_get #%" PRIu64 " - received var from PSYCstore: %s\n",
2611 op->channel, GNUNET_ntohll (op->op_id), name);
2613 if (NULL != name) /* First part */
2615 uint16_t name_size = strnlen (name, GNUNET_PSYC_MODIFIER_MAX_PAYLOAD) + 1;
2616 struct GNUNET_PSYC_MessageModifier *mod;
2617 env = GNUNET_MQ_msg_extra (res,
2618 sizeof (*mod) + name_size + value_size,
2619 GNUNET_MESSAGE_TYPE_PSYC_STATE_RESULT);
2620 res->op_id = op->op_id;
2622 mod = (struct GNUNET_PSYC_MessageModifier *) &res[1];
2623 mod->header.size = htons (sizeof (*mod) + name_size + value_size);
2624 mod->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER);
2625 mod->name_size = htons (name_size);
2626 mod->value_size = htonl (value_size);
2627 mod->oper = htons (GNUNET_PSYC_OP_ASSIGN);
2628 GNUNET_memcpy (&mod[1], name, name_size);
2629 GNUNET_memcpy (((char *) &mod[1]) + name_size, value, value_size);
2631 else /* Continuation */
2633 struct GNUNET_MessageHeader *mod;
2634 env = GNUNET_MQ_msg_extra (res,
2635 sizeof (*mod) + value_size,
2636 GNUNET_MESSAGE_TYPE_PSYC_STATE_RESULT);
2637 res->op_id = op->op_id;
2639 mod = (struct GNUNET_MessageHeader *) &res[1];
2640 mod->size = htons (sizeof (*mod) + value_size);
2641 mod->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT);
2642 GNUNET_memcpy (&mod[1], value, value_size);
2645 // FIXME: client might have been disconnected
2646 GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (op->client), env);
2652 * Received result of GNUNET_PSYCSTORE_state_get()
2653 * or GNUNET_PSYCSTORE_state_get_prefix()
2656 store_recv_state_result (void *cls, int64_t result,
2657 const char *err_msg, uint16_t err_msg_size)
2659 struct Operation *op = cls;
2660 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2661 "%p state_get #%" PRIu64 ": "
2662 "PSYCSTORE returned %" PRId64 " (%.*s)\n",
2663 op->channel, GNUNET_ntohll (op->op_id), result, err_msg_size, err_msg);
2665 // FIXME: client might have been disconnected
2666 client_send_result (op->client, op->op_id, result, err_msg, err_msg_size);
2672 check_client_state_get (void *cls,
2673 const struct StateRequest *req)
2675 struct Client *c = cls;
2676 struct Channel *chn = c->channel;
2680 return GNUNET_SYSERR;
2683 uint16_t name_size = ntohs (req->header.size) - sizeof (*req);
2684 const char *name = (const char *) &req[1];
2685 if (0 == name_size || '\0' != name[name_size - 1])
2688 return GNUNET_SYSERR;
2696 * Client requests best matching state variable from PSYCstore.
2699 handle_client_state_get (void *cls,
2700 const struct StateRequest *req)
2702 struct Client *c = cls;
2703 struct GNUNET_SERVICE_Client *client = c->client;
2704 struct Channel *chn = c->channel;
2706 const char *name = (const char *) &req[1];
2707 struct Operation *op = op_add (chn, client, req->op_id, 0);
2708 GNUNET_PSYCSTORE_state_get (store, &chn->pub_key, name,
2709 &store_recv_state_var,
2710 &store_recv_state_result, op);
2711 GNUNET_SERVICE_client_continue (client);
2716 check_client_state_get_prefix (void *cls,
2717 const struct StateRequest *req)
2719 struct Client *c = cls;
2720 struct Channel *chn = c->channel;
2724 return GNUNET_SYSERR;
2727 uint16_t name_size = ntohs (req->header.size) - sizeof (*req);
2728 const char *name = (const char *) &req[1];
2729 if (0 == name_size || '\0' != name[name_size - 1])
2732 return GNUNET_SYSERR;
2740 * Client requests state variables with a given prefix from PSYCstore.
2743 handle_client_state_get_prefix (void *cls,
2744 const struct StateRequest *req)
2746 struct Client *c = cls;
2747 struct GNUNET_SERVICE_Client *client = c->client;
2748 struct Channel *chn = c->channel;
2750 const char *name = (const char *) &req[1];
2751 struct Operation *op = op_add (chn, client, req->op_id, 0);
2752 GNUNET_PSYCSTORE_state_get_prefix (store, &chn->pub_key, name,
2753 &store_recv_state_var,
2754 &store_recv_state_result, op);
2755 GNUNET_SERVICE_client_continue (client);
2760 * Initialize the PSYC service.
2762 * @param cls Closure.
2763 * @param server The initialized server.
2764 * @param c Configuration to use.
2768 const struct GNUNET_CONFIGURATION_Handle *c,
2769 struct GNUNET_SERVICE_Handle *svc)
2773 store = GNUNET_PSYCSTORE_connect (cfg);
2774 stats = GNUNET_STATISTICS_create ("psyc", cfg);
2775 masters = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
2776 slaves = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
2777 channel_slaves = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
2778 recv_cache = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
2779 GNUNET_SCHEDULER_add_shutdown (&shutdown_task, NULL);
2784 * Define "main" method using service macro.
2788 GNUNET_SERVICE_OPTION_NONE,
2790 client_notify_connect,
2791 client_notify_disconnect,
2793 GNUNET_MQ_hd_fixed_size (client_master_start,
2794 GNUNET_MESSAGE_TYPE_PSYC_MASTER_START,
2795 struct MasterStartRequest,
2797 GNUNET_MQ_hd_var_size (client_slave_join,
2798 GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN,
2799 struct SlaveJoinRequest,
2801 GNUNET_MQ_hd_var_size (client_join_decision,
2802 GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION,
2803 struct GNUNET_PSYC_JoinDecisionMessage,
2805 GNUNET_MQ_hd_var_size (client_psyc_message,
2806 GNUNET_MESSAGE_TYPE_PSYC_MESSAGE,
2807 struct GNUNET_MessageHeader,
2809 GNUNET_MQ_hd_fixed_size (client_membership_store,
2810 GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_MEMBERSHIP_STORE,
2811 struct ChannelMembershipStoreRequest,
2813 GNUNET_MQ_hd_var_size (client_history_replay,
2814 GNUNET_MESSAGE_TYPE_PSYC_HISTORY_REPLAY,
2815 struct GNUNET_PSYC_HistoryRequestMessage,
2817 GNUNET_MQ_hd_var_size (client_state_get,
2818 GNUNET_MESSAGE_TYPE_PSYC_STATE_GET,
2819 struct StateRequest,
2821 GNUNET_MQ_hd_var_size (client_state_get_prefix,
2822 GNUNET_MESSAGE_TYPE_PSYC_STATE_GET_PREFIX,
2823 struct StateRequest,
2826 /* end of gnunet-service-psyc.c */