2 * This file is part of GNUnet
3 * Copyright (C) 2013 GNUnet e.V.
5 * GNUnet is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published
7 * by the Free Software Foundation; either version 3, or (at your
8 * option) any later version.
10 * GNUnet is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License for more details.
15 * You should have received a copy of the GNU General Public License
16 * along with GNUnet; see the file COPYING. If not, write to the
17 * Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18 * Boston, MA 02110-1301, USA.
22 * @file psyc/gnunet-service-psyc.c
24 * @author Gabor X Toth
30 #include "gnunet_util_lib.h"
31 #include "gnunet_constants.h"
32 #include "gnunet_protocols.h"
33 #include "gnunet_statistics_service.h"
34 #include "gnunet_multicast_service.h"
35 #include "gnunet_psycstore_service.h"
36 #include "gnunet_psyc_service.h"
37 #include "gnunet_psyc_util_lib.h"
42 * Handle to our current configuration.
44 static const struct GNUNET_CONFIGURATION_Handle *cfg;
49 static struct GNUNET_SERVICE_Handle *service;
52 * Handle to the statistics service.
54 static struct GNUNET_STATISTICS_Handle *stats;
57 * Handle to the PSYCstore.
59 static struct GNUNET_PSYCSTORE_Handle *store;
62 * All connected masters.
63 * Channel's pub_key_hash -> struct Master
65 static struct GNUNET_CONTAINER_MultiHashMap *masters;
68 * All connected slaves.
69 * Channel's pub_key_hash -> struct Slave
71 static struct GNUNET_CONTAINER_MultiHashMap *slaves;
74 * Connected slaves per channel.
75 * Channel's pub_key_hash -> Slave's pub_key -> struct Slave
77 static struct GNUNET_CONTAINER_MultiHashMap *channel_slaves;
81 * Message in the transmission queue.
83 struct TransmitMessage
85 struct TransmitMessage *prev;
86 struct TransmitMessage *next;
88 struct GNUNET_SERVICE_Client *client;
91 * ID assigned to the message.
101 * Type of first message part.
103 uint16_t first_ptype;
106 * Type of last message part.
110 /* Followed by message */
115 * Cache for received message fragments.
116 * Message fragments are only sent to clients after all modifiers arrived.
118 * chan_key -> MultiHashMap chan_msgs
120 static struct GNUNET_CONTAINER_MultiHashMap *recv_cache;
124 * Entry in the chan_msgs hashmap of @a recv_cache:
125 * fragment_id -> RecvCacheEntry
127 struct RecvCacheEntry
129 struct GNUNET_MULTICAST_MessageHeader *mmsg;
135 * Entry in the @a recv_frags hash map of a @a Channel.
136 * message_id -> FragmentQueue
141 * Fragment IDs stored in @a recv_cache.
143 struct GNUNET_CONTAINER_Heap *fragments;
146 * Total size of received fragments.
151 * Total size of received header fragments (METHOD & MODIFIERs)
153 uint64_t header_size;
156 * The @a state_delta field from struct GNUNET_PSYC_MessageMethod.
158 uint64_t state_delta;
161 * The @a flags field from struct GNUNET_PSYC_MessageMethod.
166 * Receive state of message.
168 * @see MessageFragmentState
173 * Whether the state is already modified in PSYCstore.
175 uint8_t state_is_modified;
178 * Is the message queued for delivery to the client?
179 * i.e. added to the recv_msgs queue
186 * List of connected clients.
190 struct ClientList *prev;
191 struct ClientList *next;
193 struct GNUNET_SERVICE_Client *client;
199 struct Operation *prev;
200 struct Operation *next;
202 struct GNUNET_SERVICE_Client *client;
203 struct Channel *channel;
210 * Common part of the client context for both a channel master and slave.
214 struct ClientList *clients_head;
215 struct ClientList *clients_tail;
217 struct Operation *op_head;
218 struct Operation *op_tail;
220 struct TransmitMessage *tmit_head;
221 struct TransmitMessage *tmit_tail;
224 * Current PSYCstore operation.
226 struct GNUNET_PSYCSTORE_OperationHandle *store_op;
229 * Received fragments not yet sent to the client.
230 * message_id -> FragmentQueue
232 struct GNUNET_CONTAINER_MultiHashMap *recv_frags;
235 * Received message IDs not yet sent to the client.
237 struct GNUNET_CONTAINER_Heap *recv_msgs;
240 * Public key of the channel.
242 struct GNUNET_CRYPTO_EddsaPublicKey pub_key;
245 * Hash of @a pub_key.
247 struct GNUNET_HashCode pub_key_hash;
250 * Last message ID sent to the client.
251 * 0 if there is no such message.
253 uint64_t max_message_id;
256 * ID of the last stateful message, where the state operations have been
257 * processed and saved to PSYCstore and which has been sent to the client.
258 * 0 if there is no such message.
260 uint64_t max_state_message_id;
263 * Expected value size for the modifier being received from the PSYC service.
265 uint32_t tmit_mod_value_size_expected;
268 * Actual value size for the modifier being received from the PSYC service.
270 uint32_t tmit_mod_value_size;
273 * Is this channel ready to receive messages from client?
274 * #GNUNET_YES or #GNUNET_NO
279 * Is the client disconnected?
280 * #GNUNET_YES or #GNUNET_NO
282 uint8_t is_disconnected;
285 * Is this a channel master (#GNUNET_YES), or slave (#GNUNET_NO)?
290 struct Master *master;
297 * Client context for a channel master.
302 * Channel struct common for Master and Slave
304 struct Channel channel;
307 * Private key of the channel.
309 struct GNUNET_CRYPTO_EddsaPrivateKey priv_key;
312 * Handle for the multicast origin.
314 struct GNUNET_MULTICAST_Origin *origin;
317 * Transmit handle for multicast.
319 struct GNUNET_MULTICAST_OriginTransmitHandle *tmit_handle;
322 * Incoming join requests from multicast.
323 * member_pub_key -> struct GNUNET_MULTICAST_JoinHandle *
325 struct GNUNET_CONTAINER_MultiHashMap *join_reqs;
328 * Last message ID transmitted to this channel.
330 * Incremented before sending a message, thus the message_id in messages sent
333 uint64_t max_message_id;
336 * ID of the last message with state operations transmitted to the channel.
337 * 0 if there is no such message.
339 uint64_t max_state_message_id;
342 * Maximum group generation transmitted to the channel.
344 uint64_t max_group_generation;
347 * @see enum GNUNET_PSYC_Policy
349 enum GNUNET_PSYC_Policy policy;
354 * Client context for a channel slave.
359 * Channel struct common for Master and Slave
361 struct Channel channel;
364 * Private key of the slave.
366 struct GNUNET_CRYPTO_EcdsaPrivateKey priv_key;
369 * Public key of the slave.
371 struct GNUNET_CRYPTO_EcdsaPublicKey pub_key;
374 * Hash of @a pub_key.
376 struct GNUNET_HashCode pub_key_hash;
379 * Handle for the multicast member.
381 struct GNUNET_MULTICAST_Member *member;
384 * Transmit handle for multicast.
386 struct GNUNET_MULTICAST_MemberTransmitHandle *tmit_handle;
389 * Peer identity of the origin.
391 struct GNUNET_PeerIdentity origin;
394 * Number of items in @a relays.
396 uint32_t relay_count;
399 * Relays that multicast can use to connect.
401 struct GNUNET_PeerIdentity *relays;
404 * Join request to be transmitted to the master on join.
406 struct GNUNET_PSYC_Message *join_msg;
409 * Join decision received from multicast.
411 struct GNUNET_PSYC_JoinDecisionMessage *join_dcsn;
414 * Maximum request ID for this channel.
416 uint64_t max_request_id;
421 enum GNUNET_PSYC_SlaveJoinFlags join_flags;
429 struct GNUNET_SERVICE_Client *client;
430 struct Channel *channel;
434 struct ReplayRequestKey
436 uint64_t fragment_id;
438 uint64_t fragment_offset;
444 transmit_message (struct Channel *chn);
447 message_queue_run (struct Channel *chn);
450 message_queue_drop (struct Channel *chn);
/**
 * Adapter with a generic closure signature that forwards to
 * transmit_message().
 *
 * @param cls The `struct Channel` to transmit for.
 */
static void
schedule_transmit_message (void *cls)
{
  transmit_message ((struct Channel *) cls);
}
463 * Task run during shutdown.
468 shutdown_task (void *cls)
470 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
471 "shutting down...\n");
472 GNUNET_PSYCSTORE_disconnect (store);
475 GNUNET_STATISTICS_destroy (stats, GNUNET_YES);
481 static struct Operation *
482 op_add (struct Channel *chn, struct GNUNET_SERVICE_Client *client,
483 uint64_t op_id, uint32_t flags)
485 struct Operation *op = GNUNET_malloc (sizeof (*op));
490 GNUNET_CONTAINER_DLL_insert (chn->op_head, chn->op_tail, op);
496 op_remove (struct Operation *op)
498 GNUNET_CONTAINER_DLL_remove (op->channel->op_head, op->channel->op_tail, op);
504 * Clean up master data structures after a client disconnected.
507 cleanup_master (struct Master *mst)
509 struct Channel *chn = &mst->channel;
511 if (NULL != mst->origin)
512 GNUNET_MULTICAST_origin_stop (mst->origin, NULL, NULL); // FIXME
513 GNUNET_CONTAINER_multihashmap_destroy (mst->join_reqs);
514 GNUNET_CONTAINER_multihashmap_remove (masters, &chn->pub_key_hash, mst);
519 * Clean up slave data structures after a client disconnected.
522 cleanup_slave (struct Slave *slv)
524 struct Channel *chn = &slv->channel;
525 struct GNUNET_CONTAINER_MultiHashMap *
526 chn_slv = GNUNET_CONTAINER_multihashmap_get (channel_slaves,
528 GNUNET_assert (NULL != chn_slv);
529 GNUNET_CONTAINER_multihashmap_remove (chn_slv, &slv->pub_key_hash, slv);
531 if (0 == GNUNET_CONTAINER_multihashmap_size (chn_slv))
533 GNUNET_CONTAINER_multihashmap_remove (channel_slaves, &chn->pub_key_hash,
535 GNUNET_CONTAINER_multihashmap_destroy (chn_slv);
537 GNUNET_CONTAINER_multihashmap_remove (slaves, &chn->pub_key_hash, slv);
539 if (NULL != slv->join_msg)
541 GNUNET_free (slv->join_msg);
542 slv->join_msg = NULL;
544 if (NULL != slv->relays)
546 GNUNET_free (slv->relays);
549 if (NULL != slv->member)
551 GNUNET_MULTICAST_member_part (slv->member, NULL, NULL); // FIXME
554 GNUNET_CONTAINER_multihashmap_remove (slaves, &chn->pub_key_hash, slv);
559 * Clean up channel data structures after a client disconnected.
562 cleanup_channel (struct Channel *chn)
564 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
565 "%p Cleaning up channel %s. master? %u\n",
567 GNUNET_h2s (&chn->pub_key_hash),
569 message_queue_drop (chn);
570 GNUNET_CONTAINER_multihashmap_destroy (chn->recv_frags);
571 chn->recv_frags = NULL;
573 if (NULL != chn->store_op)
575 GNUNET_PSYCSTORE_operation_cancel (chn->store_op);
576 chn->store_op = NULL;
579 (GNUNET_YES == chn->is_master)
580 ? cleanup_master (chn->master)
581 : cleanup_slave (chn->slave);
587 * Called whenever a client is disconnected.
588 * Frees our resources associated with that client.
591 * @param client identification of the client
592 * @param app_ctx must match @a client
595 client_notify_disconnect (void *cls,
596 struct GNUNET_SERVICE_Client *client,
599 struct Client *c = app_ctx;
600 struct Channel *chn = c->channel;
605 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
606 "%p User context is NULL in client_disconnect()\n",
612 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
613 "%p Client (%s) disconnected from channel %s\n",
615 (GNUNET_YES == chn->is_master) ? "master" : "slave",
616 GNUNET_h2s (&chn->pub_key_hash));
618 struct ClientList *cli = chn->clients_head;
621 if (cli->client == client)
623 GNUNET_CONTAINER_DLL_remove (chn->clients_head, chn->clients_tail, cli);
630 struct Operation *op = chn->op_head;
633 if (op->client == client)
641 if (NULL == chn->clients_head)
642 { /* Last client disconnected. */
643 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
644 "%p Last client (%s) disconnected from channel %s\n",
646 (GNUNET_YES == chn->is_master) ? "master" : "slave",
647 GNUNET_h2s (&chn->pub_key_hash));
648 chn->is_disconnected = GNUNET_YES;
649 if (NULL != chn->tmit_head)
650 { /* Send pending messages to multicast before cleanup. */
651 transmit_message (chn);
655 cleanup_channel (chn);
662 * A new client connected.
665 * @param client client to add
666 * @param mq message queue for @a client
670 client_notify_connect (void *cls,
671 struct GNUNET_SERVICE_Client *client,
672 struct GNUNET_MQ_Handle *mq)
674 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Client connected: %p\n", client);
676 struct Client *c = GNUNET_malloc (sizeof (*c));
684 * Send message to all clients connected to the channel.
687 client_send_msg (const struct Channel *chn,
688 const struct GNUNET_MessageHeader *msg)
690 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
691 "%p Sending message to clients.\n",
694 struct ClientList *cli = chn->clients_head;
697 struct GNUNET_MQ_Envelope *
698 env = GNUNET_MQ_msg_copy (msg);
700 GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (cli->client),
709 * Send a result code back to the client.
712 * Client that should receive the result code.
716 * Operation ID in network byte order.
718 * Data payload or NULL.
723 client_send_result (struct GNUNET_SERVICE_Client *client, uint64_t op_id,
724 int64_t result_code, const void *data, uint16_t data_size)
726 struct GNUNET_OperationResultMessage *res;
727 struct GNUNET_MQ_Envelope *
728 env = GNUNET_MQ_msg_extra (res,
730 GNUNET_MESSAGE_TYPE_PSYC_RESULT_CODE);
731 res->result_code = GNUNET_htonll (result_code);
734 GNUNET_memcpy (&res[1], data, data_size);
736 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
737 "%p Sending result to client for operation #%" PRIu64 ": %" PRId64 " (size: %u)\n",
739 GNUNET_ntohll (op_id),
743 GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (client), env);
748 * Closure for join_mem_test_cb()
750 struct JoinMemTestClosure
752 struct GNUNET_CRYPTO_EcdsaPublicKey slave_pub_key;
753 struct Channel *channel;
754 struct GNUNET_MULTICAST_JoinHandle *join_handle;
755 struct GNUNET_PSYC_JoinRequestMessage *join_msg;
760 * Membership test result callback used for join requests.
763 join_mem_test_cb (void *cls, int64_t result,
764 const char *err_msg, uint16_t err_msg_size)
766 struct JoinMemTestClosure *jcls = cls;
768 if (GNUNET_NO == result && GNUNET_YES == jcls->channel->is_master)
769 { /* Pass on join request to client if this is a master channel */
770 struct Master *mst = jcls->channel->master;
771 struct GNUNET_HashCode slave_pub_hash;
772 GNUNET_CRYPTO_hash (&jcls->slave_pub_key, sizeof (jcls->slave_pub_key),
774 GNUNET_CONTAINER_multihashmap_put (mst->join_reqs, &slave_pub_hash, jcls->join_handle,
775 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
776 client_send_msg (jcls->channel, &jcls->join_msg->header);
780 if (GNUNET_SYSERR == result)
782 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
783 "Could not perform membership test (%.*s)\n",
784 err_msg_size, err_msg);
787 GNUNET_MULTICAST_join_decision (jcls->join_handle, result, 0, NULL, NULL);
789 GNUNET_free (jcls->join_msg);
795 * Incoming join request from multicast.
798 mcast_recv_join_request (void *cls,
799 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_pub_key,
800 const struct GNUNET_MessageHeader *join_msg,
801 struct GNUNET_MULTICAST_JoinHandle *jh)
803 struct Channel *chn = cls;
804 uint16_t join_msg_size = 0;
806 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
807 "%p Got join request.\n",
809 if (NULL != join_msg)
811 if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE == ntohs (join_msg->type))
813 join_msg_size = ntohs (join_msg->size);
817 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
818 "%p Got join message with invalid type %u.\n",
820 ntohs (join_msg->type));
824 struct GNUNET_PSYC_JoinRequestMessage *
825 req = GNUNET_malloc (sizeof (*req) + join_msg_size);
826 req->header.size = htons (sizeof (*req) + join_msg_size);
827 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_JOIN_REQUEST);
828 req->slave_pub_key = *slave_pub_key;
829 if (0 < join_msg_size)
830 GNUNET_memcpy (&req[1], join_msg, join_msg_size);
832 struct JoinMemTestClosure *jcls = GNUNET_malloc (sizeof (*jcls));
833 jcls->slave_pub_key = *slave_pub_key;
835 jcls->join_handle = jh;
836 jcls->join_msg = req;
838 GNUNET_PSYCSTORE_membership_test (store, &chn->pub_key, slave_pub_key,
839 chn->max_message_id, 0,
840 &join_mem_test_cb, jcls);
845 * Join decision received from multicast.
848 mcast_recv_join_decision (void *cls, int is_admitted,
849 const struct GNUNET_PeerIdentity *peer,
850 uint16_t relay_count,
851 const struct GNUNET_PeerIdentity *relays,
852 const struct GNUNET_MessageHeader *join_resp)
854 struct Slave *slv = cls;
855 struct Channel *chn = &slv->channel;
856 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
857 "%p Got join decision: %d\n",
860 if (GNUNET_YES == chn->is_ready)
862 /* Already admitted */
866 uint16_t join_resp_size = (NULL != join_resp) ? ntohs (join_resp->size) : 0;
867 struct GNUNET_PSYC_JoinDecisionMessage *
868 dcsn = slv->join_dcsn = GNUNET_malloc (sizeof (*dcsn) + join_resp_size);
869 dcsn->header.size = htons (sizeof (*dcsn) + join_resp_size);
870 dcsn->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION);
871 dcsn->is_admitted = htonl (is_admitted);
872 if (0 < join_resp_size)
873 GNUNET_memcpy (&dcsn[1], join_resp, join_resp_size);
875 client_send_msg (chn, &dcsn->header);
877 if (GNUNET_YES == is_admitted
878 && ! (GNUNET_PSYC_SLAVE_JOIN_LOCAL & slv->join_flags))
880 chn->is_ready = GNUNET_YES;
886 store_recv_fragment_replay (void *cls,
887 struct GNUNET_MULTICAST_MessageHeader *msg,
888 enum GNUNET_PSYCSTORE_MessageFlags flags)
890 struct GNUNET_MULTICAST_ReplayHandle *rh = cls;
892 GNUNET_MULTICAST_replay_response (rh, &msg->header, GNUNET_MULTICAST_REC_OK);
898 * Received result of GNUNET_PSYCSTORE_fragment_get() for multicast replay.
901 store_recv_fragment_replay_result (void *cls,
904 uint16_t err_msg_size)
906 struct GNUNET_MULTICAST_ReplayHandle *rh = cls;
908 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
909 "%p Fragment replay: PSYCSTORE returned %" PRId64 " (%.*s)\n",
920 GNUNET_MULTICAST_replay_response (rh, NULL,
921 GNUNET_MULTICAST_REC_NOT_FOUND);
924 case GNUNET_PSYCSTORE_MEMBERSHIP_TEST_FAILED:
925 GNUNET_MULTICAST_replay_response (rh, NULL,
926 GNUNET_MULTICAST_REC_ACCESS_DENIED);
930 GNUNET_MULTICAST_replay_response (rh, NULL,
931 GNUNET_MULTICAST_REC_INTERNAL_ERROR);
934 /* GNUNET_MULTICAST_replay_response frees 'rh' when passed
935 * an error code, so it must be ensured no further processing
936 * is attempted on 'rh'. Maybe this should be refactored as
937 * it doesn't look very intuitive. --lynX
939 GNUNET_MULTICAST_replay_response_end (rh);
944 * Incoming fragment replay request from multicast.
947 mcast_recv_replay_fragment (void *cls,
948 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_pub_key,
949 uint64_t fragment_id, uint64_t flags,
950 struct GNUNET_MULTICAST_ReplayHandle *rh)
953 struct Channel *chn = cls;
954 GNUNET_PSYCSTORE_fragment_get (store, &chn->pub_key, slave_pub_key,
955 fragment_id, fragment_id,
956 &store_recv_fragment_replay,
957 &store_recv_fragment_replay_result, rh);
962 * Incoming message replay request from multicast.
965 mcast_recv_replay_message (void *cls,
966 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_pub_key,
968 uint64_t fragment_offset,
970 struct GNUNET_MULTICAST_ReplayHandle *rh)
972 struct Channel *chn = cls;
973 GNUNET_PSYCSTORE_message_get (store, &chn->pub_key, slave_pub_key,
974 message_id, message_id, 1, NULL,
975 &store_recv_fragment_replay,
976 &store_recv_fragment_replay_result, rh);
981 * Convert an uint64_t in network byte order to a HashCode
982 * that can be used as key in a MultiHashMap
985 hash_key_from_nll (struct GNUNET_HashCode *key, uint64_t n)
987 /* use little-endian order, as idx_of MultiHashMap casts key to unsigned int */
988 /* TODO: use built-in byte swap functions if available */
990 n = ((n << 8) & 0xFF00FF00FF00FF00ULL) | ((n >> 8) & 0x00FF00FF00FF00FFULL);
991 n = ((n << 16) & 0xFFFF0000FFFF0000ULL) | ((n >> 16) & 0x0000FFFF0000FFFFULL);
993 *key = (struct GNUNET_HashCode) {};
995 = (n << 32) | (n >> 32);
/**
 * Convert a uint64_t in host byte order to a HashCode
 * that can be used as key in a MultiHashMap.
 *
 * On big-endian hosts the work is delegated to hash_key_from_nll(); on
 * little-endian hosts the bytes of @a n are copied unchanged into the first
 * eight bytes of @a key and the rest of @a key is set to zero.
 *
 * @param[out] key Hash code to initialize.
 * @param n Value in host byte order.
 */
static inline void
hash_key_from_hll (struct GNUNET_HashCode *key, uint64_t n)
{
#if __BYTE_ORDER == __BIG_ENDIAN
  hash_key_from_nll (key, n);
#elif __BYTE_ORDER == __LITTLE_ENDIAN
  *key = (struct GNUNET_HashCode) { 0 };
  /* Copy the bytes instead of storing through a casted uint64_t pointer:
   * that avoids relying on the alignment and effective type of *key. */
  GNUNET_memcpy (key, &n, sizeof (n));
#else
  #error byteorder undefined
#endif
}
1018 * Initialize PSYC message header.
1021 psyc_msg_init (struct GNUNET_PSYC_MessageHeader *pmsg,
1022 const struct GNUNET_MULTICAST_MessageHeader *mmsg, uint32_t flags)
1024 uint16_t size = ntohs (mmsg->header.size);
1025 uint16_t psize = sizeof (*pmsg) + size - sizeof (*mmsg);
1027 pmsg->header.size = htons (psize);
1028 pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
1029 pmsg->message_id = mmsg->message_id;
1030 pmsg->fragment_offset = mmsg->fragment_offset;
1031 pmsg->flags = htonl (flags);
1033 GNUNET_memcpy (&pmsg[1], &mmsg[1], size - sizeof (*mmsg));
1038 * Create a new PSYC message from a multicast message for sending it to clients.
1040 static inline struct GNUNET_PSYC_MessageHeader *
1041 psyc_msg_new (const struct GNUNET_MULTICAST_MessageHeader *mmsg, uint32_t flags)
1043 struct GNUNET_PSYC_MessageHeader *pmsg;
1044 uint16_t size = ntohs (mmsg->header.size);
1045 uint16_t psize = sizeof (*pmsg) + size - sizeof (*mmsg);
1047 pmsg = GNUNET_malloc (psize);
1048 psyc_msg_init (pmsg, mmsg, flags);
1054 * Send multicast message to all clients connected to the channel.
1057 client_send_mcast_msg (struct Channel *chn,
1058 const struct GNUNET_MULTICAST_MessageHeader *mmsg,
1061 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1062 "%p Sending multicast message to client. fragment_id: %" PRIu64 ", message_id: %" PRIu64 "\n",
1064 GNUNET_ntohll (mmsg->fragment_id),
1065 GNUNET_ntohll (mmsg->message_id));
1067 struct GNUNET_PSYC_MessageHeader *
1068 pmsg = GNUNET_PSYC_message_header_create (mmsg, flags);
1069 client_send_msg (chn, &pmsg->header);
1075 * Send multicast request to all clients connected to the channel.
1078 client_send_mcast_req (struct Master *mst,
1079 const struct GNUNET_MULTICAST_RequestHeader *req)
1081 struct Channel *chn = &mst->channel;
1083 struct GNUNET_PSYC_MessageHeader *pmsg;
1084 uint16_t size = ntohs (req->header.size);
1085 uint16_t psize = sizeof (*pmsg) + size - sizeof (*req);
1087 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1088 "%p Sending multicast request to client. fragment_id: %" PRIu64 ", message_id: %" PRIu64 "\n",
1090 GNUNET_ntohll (req->fragment_id),
1091 GNUNET_ntohll (req->request_id));
1093 pmsg = GNUNET_malloc (psize);
1094 pmsg->header.size = htons (psize);
1095 pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
1096 pmsg->message_id = req->request_id;
1097 pmsg->fragment_offset = req->fragment_offset;
1098 pmsg->flags = htonl (GNUNET_PSYC_MESSAGE_REQUEST);
1099 pmsg->slave_pub_key = req->member_pub_key;
1100 GNUNET_memcpy (&pmsg[1], &req[1], size - sizeof (*req));
1102 client_send_msg (chn, &pmsg->header);
1104 /* FIXME: save req to PSYCstore so that it can be resent later to clients */
1111 * Insert a multicast message fragment into the queue belonging to the message.
1113 * @param chn Channel.
1114 * @param mmsg Multicast message fragment.
1115 * @param msg_id_hash Message ID of @a mmsg in a struct GNUNET_HashCode.
1116 * @param first_ptype First PSYC message part type in @a mmsg.
1117 * @param last_ptype Last PSYC message part type in @a mmsg.
1120 fragment_queue_insert (struct Channel *chn,
1121 const struct GNUNET_MULTICAST_MessageHeader *mmsg,
1122 uint16_t first_ptype, uint16_t last_ptype)
1124 const uint16_t size = ntohs (mmsg->header.size);
1125 const uint64_t frag_offset = GNUNET_ntohll (mmsg->fragment_offset);
1126 struct GNUNET_CONTAINER_MultiHashMap
1127 *chan_msgs = GNUNET_CONTAINER_multihashmap_get (recv_cache,
1128 &chn->pub_key_hash);
1130 struct GNUNET_HashCode msg_id_hash;
1131 hash_key_from_nll (&msg_id_hash, mmsg->message_id);
1133 struct FragmentQueue
1134 *fragq = GNUNET_CONTAINER_multihashmap_get (chn->recv_frags, &msg_id_hash);
1138 fragq = GNUNET_malloc (sizeof (*fragq));
1139 fragq->state = MSG_FRAG_STATE_HEADER;
1141 = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
1143 GNUNET_CONTAINER_multihashmap_put (chn->recv_frags, &msg_id_hash, fragq,
1144 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1146 if (NULL == chan_msgs)
1148 chan_msgs = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
1149 GNUNET_CONTAINER_multihashmap_put (recv_cache, &chn->pub_key_hash, chan_msgs,
1150 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1154 struct GNUNET_HashCode frag_id_hash;
1155 hash_key_from_nll (&frag_id_hash, mmsg->fragment_id);
1156 struct RecvCacheEntry
1157 *cache_entry = GNUNET_CONTAINER_multihashmap_get (chan_msgs, &frag_id_hash);
1158 if (NULL == cache_entry)
1160 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1161 "%p Adding message fragment to cache. message_id: %" PRIu64 ", fragment_id: %" PRIu64 "\n",
1163 GNUNET_ntohll (mmsg->message_id),
1164 GNUNET_ntohll (mmsg->fragment_id));
1165 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1166 "%p header_size: %" PRIu64 " + %u\n",
1170 cache_entry = GNUNET_malloc (sizeof (*cache_entry));
1171 cache_entry->ref_count = 1;
1172 cache_entry->mmsg = GNUNET_malloc (size);
1173 GNUNET_memcpy (cache_entry->mmsg, mmsg, size);
1174 GNUNET_CONTAINER_multihashmap_put (chan_msgs, &frag_id_hash, cache_entry,
1175 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1179 cache_entry->ref_count++;
1180 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1181 "%p Message fragment is already in cache. message_id: %" PRIu64 ", fragment_id: %" PRIu64 ", ref_count: %u\n",
1183 GNUNET_ntohll (mmsg->message_id),
1184 GNUNET_ntohll (mmsg->fragment_id),
1185 cache_entry->ref_count);
1188 if (MSG_FRAG_STATE_HEADER == fragq->state)
1190 if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype)
1192 struct GNUNET_PSYC_MessageMethod *
1193 pmeth = (struct GNUNET_PSYC_MessageMethod *) &mmsg[1];
1194 fragq->state_delta = GNUNET_ntohll (pmeth->state_delta);
1195 fragq->flags = ntohl (pmeth->flags);
1198 if (last_ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA)
1200 fragq->header_size += size;
1202 else if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype
1203 || frag_offset == fragq->header_size)
1204 { /* header is now complete */
1205 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1206 "%p Header of message %" PRIu64 " is complete.\n",
1208 GNUNET_ntohll (mmsg->message_id));
1210 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1211 "%p Adding message %" PRIu64 " to queue.\n",
1213 GNUNET_ntohll (mmsg->message_id));
1214 fragq->state = MSG_FRAG_STATE_DATA;
1218 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1219 "%p Header of message %" PRIu64 " is NOT complete yet: %" PRIu64 " != %" PRIu64 "\n",
1221 GNUNET_ntohll (mmsg->message_id),
1223 fragq->header_size);
1229 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END:
1230 if (frag_offset == fragq->size)
1231 fragq->state = MSG_FRAG_STATE_END;
1233 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1234 "%p Message %" PRIu64 " is NOT complete yet: %" PRIu64 " != %" PRIu64 "\n",
1236 GNUNET_ntohll (mmsg->message_id),
1241 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL:
1242 /* Drop message without delivering to client if it's a single fragment */
1244 (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype)
1245 ? MSG_FRAG_STATE_DROP
1246 : MSG_FRAG_STATE_CANCEL;
1249 switch (fragq->state)
1251 case MSG_FRAG_STATE_DATA:
1252 case MSG_FRAG_STATE_END:
1253 case MSG_FRAG_STATE_CANCEL:
1254 if (GNUNET_NO == fragq->is_queued)
1256 GNUNET_CONTAINER_heap_insert (chn->recv_msgs, NULL,
1257 GNUNET_ntohll (mmsg->message_id));
1258 fragq->is_queued = GNUNET_YES;
1262 fragq->size += size;
1263 GNUNET_CONTAINER_heap_insert (fragq->fragments, NULL,
1264 GNUNET_ntohll (mmsg->fragment_id));
1269 * Run fragment queue of a message.
1271 * Send fragments of a message in order to client, after all modifiers arrived
1277 * ID of the message @a fragq belongs to.
1279 * Fragment queue of the message.
1281 * Drop message without delivering to client?
1282 * #GNUNET_YES or #GNUNET_NO.
1285 fragment_queue_run (struct Channel *chn, uint64_t msg_id,
1286 struct FragmentQueue *fragq, uint8_t drop)
1288 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1289 "%p Running message fragment queue for message %" PRIu64 " (state: %u).\n",
1294 struct GNUNET_CONTAINER_MultiHashMap
1295 *chan_msgs = GNUNET_CONTAINER_multihashmap_get (recv_cache,
1296 &chn->pub_key_hash);
1297 GNUNET_assert (NULL != chan_msgs);
1300 while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (fragq->fragments, NULL,
1303 struct GNUNET_HashCode frag_id_hash;
1304 hash_key_from_hll (&frag_id_hash, frag_id);
1305 struct RecvCacheEntry *cache_entry
1306 = GNUNET_CONTAINER_multihashmap_get (chan_msgs, &frag_id_hash);
1307 if (cache_entry != NULL)
1309 if (GNUNET_NO == drop)
1311 client_send_mcast_msg (chn, cache_entry->mmsg, 0);
1313 if (cache_entry->ref_count <= 1)
1315 GNUNET_CONTAINER_multihashmap_remove (chan_msgs, &frag_id_hash,
1317 GNUNET_free (cache_entry->mmsg);
1318 GNUNET_free (cache_entry);
1322 cache_entry->ref_count--;
1325 #if CACHE_AGING_IMPLEMENTED
1326 else if (GNUNET_NO == drop)
1328 /* TODO: fragment not in cache anymore, retrieve it from PSYCstore */
1332 GNUNET_CONTAINER_heap_remove_root (fragq->fragments);
1335 if (MSG_FRAG_STATE_END <= fragq->state)
1337 struct GNUNET_HashCode msg_id_hash;
1338 hash_key_from_hll (&msg_id_hash, msg_id);
1340 GNUNET_CONTAINER_multihashmap_remove (chn->recv_frags, &msg_id_hash, fragq);
1341 GNUNET_CONTAINER_heap_destroy (fragq->fragments);
1342 GNUNET_free (fragq);
1346 fragq->is_queued = GNUNET_NO;
1351 struct StateModifyClosure
1353 struct Channel *channel;
1355 struct GNUNET_HashCode msg_id_hash;
1360 store_recv_state_modify_result (void *cls, int64_t result,
1361 const char *err_msg, uint16_t err_msg_size)
1363 struct StateModifyClosure *mcls = cls;
1364 struct Channel *chn = mcls->channel;
1365 uint64_t msg_id = mcls->msg_id;
1367 struct FragmentQueue *
1368 fragq = GNUNET_CONTAINER_multihashmap_get (chn->recv_frags, &mcls->msg_id_hash);
1370 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1371 "%p GNUNET_PSYCSTORE_state_modify() returned %" PRId64 " (%.*s)\n",
1372 chn, result, err_msg_size, err_msg);
1379 fragq->state_is_modified = GNUNET_YES;
1380 if (chn->max_state_message_id < msg_id)
1381 chn->max_state_message_id = msg_id;
1382 if (chn->max_message_id < msg_id)
1383 chn->max_message_id = msg_id;
1386 fragment_queue_run (chn, msg_id, fragq, MSG_FRAG_STATE_DROP == fragq->state);
1387 GNUNET_CONTAINER_heap_remove_root (chn->recv_msgs);
1388 message_queue_run (chn);
1392 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1393 "%p GNUNET_PSYCSTORE_state_modify() failed with error %" PRId64 " (%.*s)\n",
1394 chn, result, err_msg_size, err_msg);
1395 /** @todo FIXME: handle state_modify error */
1401 * Run message queue.
1403 * Send messages in queue to client in order after a message has arrived from
1404 * multicast, according to the following:
1405 * - A message is only sent if all of its modifiers arrived.
1406 * - A stateful message is only sent if the previous stateful message
1407 * has already been delivered to the client.
1409 * @param chn Channel.
1411 * @return Number of messages removed from queue and sent to client.
/* Work through chn->recv_msgs starting at the heap root and hand deliverable
 * messages to fragment_queue_run(); the individual ordering checks are
 * annotated below. */
1414 message_queue_run (struct Channel *chn)
/* NOTE(review): this trace and the per-message trace below use WARNING level
 * although they report normal progress, while the summary at the end uses
 * DEBUG -- confirm the intended level. */
1416 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1417 "%p Running message queue.\n", chn);
/* Continue for as long as the heap still has a root element to peek at. */
1421 while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (chn->recv_msgs, NULL,
1424 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1425 "%p Processing message %" PRIu64 " in queue.\n", chn, msg_id);
/* chn->recv_frags is looked up with a hash computed from the message ID. */
1426 struct GNUNET_HashCode msg_id_hash;
1427 hash_key_from_hll (&msg_id_hash, msg_id);
1429 struct FragmentQueue *
1430 fragq = GNUNET_CONTAINER_multihashmap_get (chn->recv_frags, &msg_id_hash);
/* Nothing can be delivered until a fragment queue exists for this message and
 * its state has advanced past MSG_FRAG_STATE_HEADER. */
1432 if (NULL == fragq || fragq->state <= MSG_FRAG_STATE_HEADER)
1434 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1435 "%p No fragq (%p) or header not complete.\n",
1440 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1441 "%p Fragment queue entry: state: %u, state delta: "
1442 "%" PRIu64 " - %" PRIu64 " ?= %" PRIu64 "\n",
1443 chn, fragq->state, msg_id, fragq->state_delta, chn->max_state_message_id);
/* Ordering constraints are only evaluated from MSG_FRAG_STATE_DATA onwards. */
1445 if (MSG_FRAG_STATE_DATA <= fragq->state)
1447 /* Check if there's a missing message before the current one */
1448 if (GNUNET_PSYC_STATE_NOT_MODIFIED == fragq->state_delta)
1450 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p state NOT modified\n", chn);
/* Stateless message: unless it is flagged ORDER_ANY, max_message_id must be
 * either this ID or its direct predecessor. */
1452 if (!(fragq->flags & GNUNET_PSYC_MESSAGE_ORDER_ANY)
1453 && (chn->max_message_id != msg_id - 1
1454 && chn->max_message_id != msg_id))
1456 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1457 "%p Out of order message. "
1458 "(%" PRIu64 " != %" PRIu64 " - 1)\n",
1459 chn, chn->max_message_id, msg_id);
1461 // FIXME: keep track of messages processed in this queue run,
1462 // and only stop after reaching the end
1467 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p state modified\n", chn);
/* Stateful message whose modifiers have not been applied yet: state_delta
 * must point back exactly at the last stateful message recorded for the
 * channel. */
1468 if (GNUNET_YES != fragq->state_is_modified)
1470 if (msg_id - fragq->state_delta != chn->max_state_message_id)
1472 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1473 "%p Out of order stateful message. "
1474 "(%" PRIu64 " - %" PRIu64 " != %" PRIu64 ")\n",
1475 chn, msg_id, fragq->state_delta, chn->max_state_message_id);
1477 // FIXME: keep track of messages processed in this queue run,
1478 // and only stop after reaching the end
/* Context for the asynchronous state-modify callback.
 * NOTE(review): where mcls is released is not visible in this block --
 * confirm that store_recv_state_modify_result() frees it. */
1481 struct StateModifyClosure *mcls = GNUNET_malloc (sizeof (*mcls));
1482 mcls->channel = chn;
1483 mcls->msg_id = msg_id;
1484 mcls->msg_id_hash = msg_id_hash;
1486 /* Apply modifiers to state in PSYCstore */
1487 GNUNET_PSYCSTORE_state_modify (store, &chn->pub_key, msg_id,
1489 store_recv_state_modify_result, mcls);
1490 break; // continue after asynchronous state modify result
/* Record progress, flush the fragments (dropping them when the queue is in
 * MSG_FRAG_STATE_DROP) and pop the message from the heap. */
1493 chn->max_message_id = msg_id;
1495 fragment_queue_run (chn, msg_id, fragq, MSG_FRAG_STATE_DROP == fragq->state);
1496 GNUNET_CONTAINER_heap_remove_root (chn->recv_msgs);
1500 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1501 "%p Removed %" PRIu64 " messages from queue.\n", chn, n);
1507 * Drop message queue of a channel.
1509 * Remove all messages in queue without sending them to clients.
1511 * @param chn Channel.
1513 * @return Number of messages removed from queue.
/* Empty chn->recv_msgs: every queued message is passed to fragment_queue_run()
 * with the drop argument set to GNUNET_YES and then removed from the heap. */
1516 message_queue_drop (struct Channel *chn)
/* NOTE(review): normal progress is logged at WARNING level here and inside
 * the loop -- confirm the intended level. */
1518 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1519 "%p Dropping message queue.\n", chn);
1522 while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (chn->recv_msgs, NULL,
1525 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1526 "%p Dropping message %" PRIu64 " from queue.\n", chn, msg_id);
1527 struct GNUNET_HashCode msg_id_hash;
1528 hash_key_from_hll (&msg_id_hash, msg_id);
1530 struct FragmentQueue *
1531 fragq = GNUNET_CONTAINER_multihashmap_get (chn->recv_frags, &msg_id_hash);
/* NOTE(review): message_queue_run() tolerates a missing fragment queue for
 * the heap root, whereas this aborts on it -- confirm that a heap entry
 * without a recv_frags entry cannot occur on this path. */
1532 GNUNET_assert (NULL != fragq);
1533 fragment_queue_run (chn, msg_id, fragq, GNUNET_YES);
1534 GNUNET_CONTAINER_heap_remove_root (chn->recv_msgs);
1537 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1538 "%p Removed %" PRIu64 " messages from queue.\n", chn, n);
1544 * Received result of GNUNET_PSYCSTORE_fragment_store().
/* Completion callback passed to GNUNET_PSYCSTORE_fragment_store(); cls is the
 * struct Channel. The visible code only logs the outcome, so a failed store
 * is not acted upon here. */
1547 store_recv_fragment_store_result (void *cls, int64_t result,
1548 const char *err_msg, uint16_t err_msg_size)
1550 struct Channel *chn = cls;
/* err_msg is printed with an explicit length (%.*s). */
1551 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1552 "%p GNUNET_PSYCSTORE_fragment_store() returned %" PRId64 " (%.*s)\n",
1553 chn, result, err_msg_size, err_msg);
1558 * Handle incoming message fragment from multicast.
1560 * Store it using PSYCstore and send it to the clients of the channel in order.
/* Multicast message callback; cls is the struct Channel. The fragment is
 * handed to PSYCstore, its parts are validated, and it is then queued and the
 * message queue is run. */
1563 mcast_recv_message (void *cls, const struct GNUNET_MULTICAST_MessageHeader *mmsg)
1565 struct Channel *chn = cls;
1566 uint16_t size = ntohs (mmsg->header.size);
/* NOTE(review): flags is converted with GNUNET_ntohll() and printed as a
 * 64-bit value -- confirm the field really is 64 bits wide. */
1568 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1569 "%p Received multicast message of size %u. "
1570 "fragment_id=%" PRIu64 ", message_id=%" PRIu64
1571 ", fragment_offset=%" PRIu64 ", flags=%" PRIu64 "\n",
1573 GNUNET_ntohll (mmsg->fragment_id),
1574 GNUNET_ntohll (mmsg->message_id),
1575 GNUNET_ntohll (mmsg->fragment_offset),
1576 GNUNET_ntohll (mmsg->flags));
/* NOTE(review): the fragment is passed to the store before the part check
 * below, so a fragment that is rejected there has already been submitted for
 * storage -- confirm this is intended. */
1578 GNUNET_PSYCSTORE_fragment_store (store, &chn->pub_key, mmsg, 0,
1579 &store_recv_fragment_store_result, chn);
/* NOTE(review): size - sizeof (*mmsg) wraps around if size is smaller than
 * the header; presumably multicast guarantees the minimum size -- confirm. */
1581 uint16_t first_ptype = 0, last_ptype = 0;
1582 int check = GNUNET_PSYC_receive_check_parts (size - sizeof (*mmsg),
1583 (const char *) &mmsg[1],
1584 &first_ptype, &last_ptype);
1585 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1586 "%p Message check result %d, first part type %u, last part type %u\n",
1587 chn, check, first_ptype, last_ptype);
1588 if (GNUNET_SYSERR == check)
1590 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1591 "%p Dropping incoming multicast message with invalid parts.\n",
1593 GNUNET_break_op (0);
/* Valid fragment: queue it, then try to deliver whatever is now in order. */
1597 fragment_queue_insert (chn, mmsg, first_ptype, last_ptype);
1598 message_queue_run (chn);
1603 * Incoming request fragment from multicast for a master.
1605 * @param cls Master.
1606 * @param req The request.
/* Multicast request callback for a master; cls is the struct Master. The
 * request parts are validated and the request is forwarded to the clients. */
1609 mcast_recv_request (void *cls,
1610 const struct GNUNET_MULTICAST_RequestHeader *req)
1612 struct Master *mst = cls;
1613 uint16_t size = ntohs (req->header.size);
/* String form of the sender key, used for the log line below.
 * NOTE(review): presumably a heap string that has to be freed; the release
 * is not visible in this block -- confirm. */
1615 char *str = GNUNET_CRYPTO_ecdsa_public_key_to_string (&req->member_pub_key);
1616 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1617 "%p Received multicast request of size %u from %s.\n",
/* NOTE(review): size - sizeof (*req) wraps around if size is smaller than the
 * header -- confirm the minimum size is enforced before this callback. */
1621 uint16_t first_ptype = 0, last_ptype = 0;
1623 == GNUNET_PSYC_receive_check_parts (size - sizeof (*req),
1624 (const char *) &req[1],
1625 &first_ptype, &last_ptype))
1627 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1628 "%p Dropping incoming multicast request with invalid parts.\n",
1630 GNUNET_break_op (0);
1634 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1635 "Message parts: first: type %u, last: type %u\n",
1636 first_ptype, last_ptype);
1638 /* FIXME: in-order delivery */
1639 client_send_mcast_req (mst, req);
1644 * Response from PSYCstore with the current counter values for a channel master.
/* Callback for the counters request issued in handle_client_master_start();
 * cls is the struct Master. Builds the MASTER_START_ACK, starts the multicast
 * origin when the counters could be obtained, and sends the ack to the
 * channel's clients. */
1647 store_recv_master_counters (void *cls, int result, uint64_t max_fragment_id,
1648 uint64_t max_message_id, uint64_t max_group_generation,
1649 uint64_t max_state_message_id)
1651 struct Master *mst = cls;
1652 struct Channel *chn = &mst->channel;
/* The store operation has completed, so the handle is cleared. */
1653 chn->store_op = NULL;
/* NOTE(review): res lives on the stack and is not zero-initialized; any field
 * or padding not assigned below would be sent as-is -- confirm against the
 * struct definition. */
1655 struct GNUNET_PSYC_CountersResultMessage res;
1656 res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK);
1657 res.header.size = htons (sizeof (res));
1658 res.result_code = htonl (result);
1659 res.max_message_id = GNUNET_htonll (max_message_id);
/* Both GNUNET_OK and GNUNET_NO are treated as success; presumably GNUNET_NO
 * means that no counters are stored yet -- confirm. */
1661 if (GNUNET_OK == result || GNUNET_NO == result)
1663 mst->max_message_id = max_message_id;
1664 chn->max_message_id = max_message_id;
1665 chn->max_state_message_id = max_state_message_id;
1666 mst->max_group_generation = max_group_generation;
1668 = GNUNET_MULTICAST_origin_start (cfg, &mst->priv_key, max_fragment_id,
1669 mcast_recv_join_request,
1670 mcast_recv_replay_fragment,
1671 mcast_recv_replay_message,
1673 mcast_recv_message, chn);
/* handle_client_psyc_message() rejects client messages until this is set. */
1674 chn->is_ready = GNUNET_YES;
1678 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1679 "%p GNUNET_PSYCSTORE_counters_get() "
1680 "returned %d for channel %s.\n",
1681 chn, result, GNUNET_h2s (&chn->pub_key_hash));
/* The ack carries result_code, so it is sent in the failure case as well. */
1684 client_send_msg (chn, &res.header);
1689 * Response from PSYCstore with the current counter values for a channel slave.
/* Callback for the counters request issued in handle_client_slave_join();
 * cls is the struct Slave. Builds the SLAVE_JOIN_ACK, joins the multicast
 * group when the counters could be obtained, and sends the ack to the
 * channel's clients. */
1692 store_recv_slave_counters (void *cls, int result, uint64_t max_fragment_id,
1693 uint64_t max_message_id, uint64_t max_group_generation,
1694 uint64_t max_state_message_id)
1696 struct Slave *slv = cls;
1697 struct Channel *chn = &slv->channel;
/* The store operation has completed, so the handle is cleared. */
1698 chn->store_op = NULL;
/* NOTE(review): res lives on the stack and is not zero-initialized; any field
 * or padding not assigned below would be sent as-is -- confirm against the
 * struct definition. */
1700 struct GNUNET_PSYC_CountersResultMessage res;
1701 res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK);
1702 res.header.size = htons (sizeof (res));
1703 res.result_code = htonl (result);
1704 res.max_message_id = GNUNET_htonll (max_message_id);
1706 if (GNUNET_OK == result || GNUNET_NO == result)
1708 chn->max_message_id = max_message_id;
1709 chn->max_state_message_id = max_state_message_id;
/* NOTE(review): &slv->join_msg->header is formed even when join_msg is NULL
 * (the NULL test only follows after the call); this only yields a NULL
 * argument if header is the first member -- confirm. */
1711 = GNUNET_MULTICAST_member_join (cfg, &chn->pub_key, &slv->priv_key,
1713 slv->relay_count, slv->relays,
1714 &slv->join_msg->header,
1715 mcast_recv_join_request,
1716 mcast_recv_join_decision,
1717 mcast_recv_replay_fragment,
1718 mcast_recv_replay_message,
1719 mcast_recv_message, chn);
/* The copied join message is released once it has been handed to multicast. */
1720 if (NULL != slv->join_msg)
1722 GNUNET_free (slv->join_msg);
1723 slv->join_msg = NULL;
1728 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1729 "%p GNUNET_PSYCSTORE_counters_get() "
1730 "returned %d for channel %s.\n",
1731 chn, result, GNUNET_h2s (&chn->pub_key_hash));
/* The ack carries result_code, so it is sent in the failure case as well. */
1734 client_send_msg (chn, &res.header);
/* Create the per-channel receive containers: a min-ordered heap and the
 * recv_frags hash map that message_queue_run() looks fragment queues up in. */
1739 channel_init (struct Channel *chn)
/* Min-ordering puts the entry with the smallest cost at the root; presumably
 * the cost is the message ID -- confirm at the insertion site. */
1742 = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
1743 chn->recv_frags = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
1748 * Handle a connecting client starting a channel master.
/* Client request to start (or attach to) a channel master. A new master is
 * registered and its counters are requested from PSYCstore; for an already
 * known master the ack is sent right away. In both cases the client is added
 * to the channel's client list. */
1751 handle_client_master_start (void *cls,
1752 const struct MasterStartRequest *req)
1754 struct Client *c = cls;
1755 struct GNUNET_SERVICE_Client *client = c->client;
1757 struct GNUNET_CRYPTO_EddsaPublicKey pub_key;
1758 struct GNUNET_HashCode pub_key_hash;
/* Masters are indexed by the hash of the channel's public key, which is
 * derived from the private key supplied in the request. */
1760 GNUNET_CRYPTO_eddsa_key_get_public (&req->channel_key, &pub_key);
1761 GNUNET_CRYPTO_hash (&pub_key, sizeof (pub_key), &pub_key_hash);
1764 mst = GNUNET_CONTAINER_multihashmap_get (masters, &pub_key_hash);
1765 struct Channel *chn;
/* New master: allocate, fill in from the request and register it. */
1769 mst = GNUNET_malloc (sizeof (*mst));
1770 mst->policy = ntohl (req->policy);
1771 mst->priv_key = req->channel_key;
1772 mst->join_reqs = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
1774 chn = c->channel = &mst->channel;
1776 chn->is_master = GNUNET_YES;
1777 chn->pub_key = pub_key;
1778 chn->pub_key_hash = pub_key_hash;
/* NOTE(review): the return value of the put is not checked. */
1781 GNUNET_CONTAINER_multihashmap_put (masters, &chn->pub_key_hash, chn,
1782 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
/* The start ack for a new master is sent from store_recv_master_counters(). */
1783 chn->store_op = GNUNET_PSYCSTORE_counters_get (store, &chn->pub_key,
1784 store_recv_master_counters, mst);
/* Existing master: acknowledge immediately with the cached counter.
 * NOTE(review): if the counters request of the first client is still pending,
 * mst->max_message_id has not been filled in yet -- confirm. */
1788 chn = &mst->channel;
1790 struct GNUNET_PSYC_CountersResultMessage *res;
1791 struct GNUNET_MQ_Envelope *
1792 env = GNUNET_MQ_msg (res, GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK);
1793 res->result_code = htonl (GNUNET_OK);
1794 res->max_message_id = GNUNET_htonll (mst->max_message_id);
1796 GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (client), env);
1799 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1800 "%p Client connected as master to channel %s.\n",
1801 mst, GNUNET_h2s (&chn->pub_key_hash));
/* Remember the client in the channel's client list. */
1803 struct ClientList *cli = GNUNET_malloc (sizeof (*cli));
1804 cli->client = client;
1805 GNUNET_CONTAINER_DLL_insert (chn->clients_head, chn->clients_tail, cli);
1807 GNUNET_SERVICE_client_continue (client);
/* Validation hook for SlaveJoinRequest messages. Only the signature is
 * visible here. NOTE(review): handle_client_slave_join() performs its own
 * size consistency check on the variable-length payload -- confirm whether
 * that check belongs in this function instead. */
1812 check_client_slave_join (void *cls,
1813 const struct SlaveJoinRequest *req)
1820 * Handle a connecting client joining as a channel slave.
/* Client request to join a channel as slave. A new slave is created from the
 * request (relay list and optional join message follow the fixed header),
 * registered, and its counters are requested from PSYCstore. For an already
 * known slave the ack is sent right away and the join state is replayed. In
 * both cases the client is added to the channel's client list. */
1823 handle_client_slave_join (void *cls,
1824 const struct SlaveJoinRequest *req)
1826 struct Client *c = cls;
1827 struct GNUNET_SERVICE_Client *client = c->client;
1829 uint16_t req_size = ntohs (req->header.size);
1831 struct GNUNET_CRYPTO_EcdsaPublicKey slv_pub_key;
1832 struct GNUNET_HashCode pub_key_hash, slv_pub_hash;
/* Two lookup keys: the hash of the slave's public key (derived from the
 * private key in the request) and the hash of the channel's public key. */
1834 GNUNET_CRYPTO_ecdsa_key_get_public (&req->slave_key, &slv_pub_key);
1835 GNUNET_CRYPTO_hash (&slv_pub_key, sizeof (slv_pub_key), &slv_pub_hash);
1836 GNUNET_CRYPTO_hash (&req->channel_pub_key, sizeof (req->channel_pub_key), &pub_key_hash);
/* channel_slaves maps channel hash -> (slave hash -> entry). */
1838 struct GNUNET_CONTAINER_MultiHashMap *
1839 chn_slv = GNUNET_CONTAINER_multihashmap_get (channel_slaves, &pub_key_hash);
1840 struct Slave *slv = NULL;
1841 struct Channel *chn;
1843 if (NULL != chn_slv)
1845 slv = GNUNET_CONTAINER_multihashmap_get (chn_slv, &slv_pub_hash);
/* New slave: allocate and fill in from the request. */
1849 slv = GNUNET_malloc (sizeof (*slv));
1850 slv->priv_key = req->slave_key;
1851 slv->pub_key = slv_pub_key;
1852 slv->pub_key_hash = slv_pub_hash;
1853 slv->origin = req->origin;
1854 slv->relay_count = ntohl (req->relay_count);
1855 slv->join_flags = ntohl (req->flags);
/* NOTE(review): relay_count is a 32-bit value taken from the client, but the
 * product below is stored in 16 bits, so a large count silently truncates
 * relay_size and can defeat the size checks that follow. */
1857 const struct GNUNET_PeerIdentity *
1858 relays = (const struct GNUNET_PeerIdentity *) &req[1];
1859 uint16_t relay_size = slv->relay_count * sizeof (*relays);
1860 uint16_t join_msg_size = 0;
/* A join message is present only if there is room for at least a message
 * header after the relay list. */
1862 if (sizeof (*req) + relay_size + sizeof (struct GNUNET_MessageHeader)
1865 struct GNUNET_PSYC_Message *
1866 join_msg = (struct GNUNET_PSYC_Message *) (((char *) &req[1]) + relay_size);
/* NOTE(review): join_msg_size comes from the embedded header and is used for
 * the copy before the total-size check below, so an oversized value reads
 * past the end of the request. */
1867 join_msg_size = ntohs (join_msg->header.size);
1868 slv->join_msg = GNUNET_malloc (join_msg_size);
1869 GNUNET_memcpy (slv->join_msg, join_msg, join_msg_size);
/* The three parts must add up exactly to the announced request size.
 * NOTE(review): release of slv and slv->join_msg on this error path is not
 * visible in this block -- confirm both are freed. */
1871 if (sizeof (*req) + relay_size + join_msg_size != req_size)
1873 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1874 "%u + %u + %u != %u\n",
1875 (unsigned int) sizeof (*req),
1880 GNUNET_SERVICE_client_drop (client);
/* Keep a private copy of the relay list. */
1884 if (0 < slv->relay_count)
1886 slv->relays = GNUNET_malloc (relay_size);
1887 GNUNET_memcpy (slv->relays, &req[1], relay_size);
1890 chn = c->channel = &slv->channel;
1892 chn->is_master = GNUNET_NO;
1893 chn->pub_key = req->channel_pub_key;
1894 chn->pub_key_hash = pub_key_hash;
/* Register the slave: create the per-channel map on first use, then add the
 * entry to it and to the global slaves map. */
1897 if (NULL == chn_slv)
1899 chn_slv = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
1900 GNUNET_CONTAINER_multihashmap_put (channel_slaves, &chn->pub_key_hash, chn_slv,
1901 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1903 GNUNET_CONTAINER_multihashmap_put (chn_slv, &slv->pub_key_hash, chn,
1904 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1905 GNUNET_CONTAINER_multihashmap_put (slaves, &chn->pub_key_hash, chn,
1906 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
/* The join ack for a new slave is sent from store_recv_slave_counters(). */
1907 chn->store_op = GNUNET_PSYCSTORE_counters_get (store, &chn->pub_key,
1908 &store_recv_slave_counters, slv);
/* Existing slave: acknowledge immediately with the cached counter. */
1912 chn = &slv->channel;
1914 struct GNUNET_PSYC_CountersResultMessage *res;
1916 struct GNUNET_MQ_Envelope *
1917 env = GNUNET_MQ_msg (res, GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK);
1918 res->result_code = htonl (GNUNET_OK);
1919 res->max_message_id = GNUNET_htonll (chn->max_message_id);
1921 GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (client), env);
/* A local join is answered directly with a positive decision. */
1923 if (GNUNET_PSYC_SLAVE_JOIN_LOCAL & slv->join_flags)
1925 mcast_recv_join_decision (slv, GNUNET_YES,
1926 NULL, 0, NULL, NULL);
/* Not joined to multicast yet: join now.
 * NOTE(review): &slv->join_msg->header is formed even when join_msg is NULL
 * (the NULL test only follows after the call) -- confirm this is safe. */
1928 else if (NULL == slv->member)
1931 = GNUNET_MULTICAST_member_join (cfg, &chn->pub_key, &slv->priv_key,
1933 slv->relay_count, slv->relays,
1934 &slv->join_msg->header,
1935 &mcast_recv_join_request,
1936 &mcast_recv_join_decision,
1937 &mcast_recv_replay_fragment,
1938 &mcast_recv_replay_message,
1939 &mcast_recv_message, chn);
1940 if (NULL != slv->join_msg)
1942 GNUNET_free (slv->join_msg);
1943 slv->join_msg = NULL;
/* Already joined and a decision is cached: replay it to the new client. */
1946 else if (NULL != slv->join_dcsn)
1948 struct GNUNET_MQ_Envelope *
1949 env = GNUNET_MQ_msg_copy (&slv->join_dcsn->header);
1950 GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (client), env);
1954 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1955 "%p Client connected as slave to channel %s.\n",
1956 slv, GNUNET_h2s (&chn->pub_key_hash));
/* Remember the client in the channel's client list. */
1958 struct ClientList *cli = GNUNET_malloc (sizeof (*cli));
1959 cli->client = client;
1960 GNUNET_CONTAINER_DLL_insert (chn->clients_head, chn->clients_tail, cli);
1962 GNUNET_SERVICE_client_continue (client);
/* Arguments handed to mcast_send_join_decision() for every pending join
 * request of a slave. */
1966 struct JoinDecisionClosure
/* Decision value, converted to host byte order by the caller. */
1968 int32_t is_admitted;
/* Message accompanying the decision; the caller may leave this unset when
 * the client sent none. */
1969 struct GNUNET_MessageHeader *msg;
1974 * Iterator callback for sending join decisions to multicast.
/* Hash map iterator used by handle_client_join_decision(): cls is a
 * struct JoinDecisionClosure and value is the multicast join handle the
 * decision is delivered on. */
1977 mcast_send_join_decision (void *cls, const struct GNUNET_HashCode *pub_key_hash,
1980 struct JoinDecisionClosure *jcls = cls;
1981 struct GNUNET_MULTICAST_JoinHandle *jh = value;
1982 // FIXME: add relays
/* No relays are passed along yet (count 0, list NULL). */
1983 GNUNET_MULTICAST_join_decision (jh, jcls->is_admitted, 0, NULL, jcls->msg);
/* Validation hook for join decision messages from a client. Only the
 * signature is visible here. */
1989 check_client_join_decision (void *cls,
1990 const struct GNUNET_PSYC_JoinDecisionMessage *dcsn)
1997 * Join decision from client.
/* Join decision sent by a master's client: forward it to every pending join
 * request of the named slave and forget those requests. */
2000 handle_client_join_decision (void *cls,
2001 const struct GNUNET_PSYC_JoinDecisionMessage *dcsn)
2003 struct Client *c = cls;
2004 struct GNUNET_SERVICE_Client *client = c->client;
2005 struct Channel *chn = c->channel;
/* The condition guarding this drop is not visible here; presumably it covers
 * a client that has no channel yet -- confirm. */
2009 GNUNET_SERVICE_client_drop (client);
/* NOTE(review): this assertion depends on which kind of client sent the
 * message, so a slave client sending a join decision would abort the
 * service -- consider dropping the client instead. */
2012 GNUNET_assert (GNUNET_YES == chn->is_master);
2013 struct Master *mst = chn->master;
2015 struct JoinDecisionClosure jcls;
2016 jcls.is_admitted = ntohl (dcsn->is_admitted);
/* A message trailing the fixed part is used only if the announced size
 * leaves room for at least a message header. */
2018 = (sizeof (*dcsn) + sizeof (*jcls.msg) <= ntohs (dcsn->header.size))
2019 ? (struct GNUNET_MessageHeader *) &dcsn[1]
/* Pending join requests are keyed by the hash of the slave's public key. */
2022 struct GNUNET_HashCode slave_pub_hash;
2023 GNUNET_CRYPTO_hash (&dcsn->slave_pub_key, sizeof (dcsn->slave_pub_key),
2026 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2027 "%p Got join decision (%d) from client for channel %s..\n",
2028 mst, jcls.is_admitted, GNUNET_h2s (&chn->pub_key_hash));
2029 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2030 "%p ..and slave %s.\n",
2031 mst, GNUNET_h2s (&slave_pub_hash));
/* Deliver the decision on every matching join handle, then remove them. */
2033 GNUNET_CONTAINER_multihashmap_get_multiple (mst->join_reqs, &slave_pub_hash,
2034 &mcast_send_join_decision, &jcls);
2035 GNUNET_CONTAINER_multihashmap_remove_all (mst->join_reqs, &slave_pub_hash);
2036 GNUNET_SERVICE_client_continue (client);
2041 * Send acknowledgement to a client.
2043 * Sent after a message fragment has been passed on to multicast.
2045 * @param chn The channel struct for the client.
/* Send a bare PSYC_MESSAGE_ACK header to the given client. The chn argument
 * is not referenced by the statements visible here. */
2048 send_message_ack (struct Channel *chn, struct GNUNET_SERVICE_Client *client)
2050 struct GNUNET_MessageHeader *res;
2051 struct GNUNET_MQ_Envelope *
2052 env = GNUNET_MQ_msg (res, GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK);
2055 GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (client), env);
2060 * Callback for the transmit functions of multicast.
/* Shared transmit callback body for masters and slaves; cls is the
 * struct Channel. Copies the head of the transmit queue into the buffer
 * provided by multicast, acknowledges it to the originating client and
 * removes it from the queue. */
2063 transmit_notify (void *cls, size_t *data_size, void *data)
2065 struct Channel *chn = cls;
2066 struct TransmitMessage *tmit_msg = chn->tmit_head;
/* Nothing is copied if the queue is empty or the buffer is too small for the
 * head message. */
2068 if (NULL == tmit_msg || *data_size < tmit_msg->size)
2070 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2071 "%p transmit_notify: nothing to send.\n", chn);
2072 if (NULL != tmit_msg && *data_size < tmit_msg->size)
2078 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2079 "%p transmit_notify: sending %u bytes.\n", chn, tmit_msg->size);
/* The payload is stored directly behind the TransmitMessage struct. */
2081 *data_size = tmit_msg->size;
2082 GNUNET_memcpy (data, &tmit_msg[1], *data_size);
/* A last part type below MESSAGE_END means the message is not finished. */
2085 = (tmit_msg->last_ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END)
2089 /* FIXME: handle disconnecting clients */
2090 if (NULL != tmit_msg->client)
2091 send_message_ack (chn, tmit_msg->client);
2093 GNUNET_CONTAINER_DLL_remove (chn->tmit_head, chn->tmit_tail, tmit_msg);
/* More queued: schedule the next transmission. */
2095 if (NULL != chn->tmit_head)
2097 GNUNET_SCHEDULER_add_now (&schedule_transmit_message, chn);
/* Queue drained on a disconnected channel with an unfinished message:
 * report an error to the caller. */
2099 else if (GNUNET_YES == chn->is_disconnected
2100 && tmit_msg->last_ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END)
2102 /* FIXME: handle partial message (when still in_transmit) */
2103 GNUNET_free (tmit_msg);
2104 return GNUNET_SYSERR;
2106 GNUNET_free (tmit_msg);
2112 * Callback for the transmit functions of multicast.
/* Transmit callback registered by master_transmit_message(); cls is the
 * struct Master. Delegates to transmit_notify() and clears the transmit
 * handle when that returns GNUNET_YES.
 * NOTE(review): the same cls is read as struct Channel inside
 * transmit_notify(), which only works if the channel is the first member of
 * struct Master -- confirm. */
2115 master_transmit_notify (void *cls, size_t *data_size, void *data)
2117 int ret = transmit_notify (cls, data_size, data);
2119 if (GNUNET_YES == ret)
2121 struct Master *mst = cls;
2122 mst->tmit_handle = NULL;
2129 * Callback for the transmit functions of multicast.
/* Transmit callback registered by slave_transmit_message(); cls is the
 * struct Slave. Delegates to transmit_notify() and clears the transmit handle
 * when that returns GNUNET_YES.
 * NOTE(review): the same cls is read as struct Channel inside
 * transmit_notify(), which only works if the channel is the first member of
 * struct Slave -- confirm. */
2132 slave_transmit_notify (void *cls, size_t *data_size, void *data)
2134 int ret = transmit_notify (cls, data_size, data);
2136 if (GNUNET_YES == ret)
2138 struct Slave *slv = cls;
2139 slv->tmit_handle = NULL;
2146 * Transmit a message from a channel master to the multicast group.
/* Start transmitting the head of the master's queue to the multicast group,
 * or resume an existing transmission if a handle is already set. */
2149 master_transmit_message (struct Master *mst)
2151 struct Channel *chn = &mst->channel;
2152 struct TransmitMessage *tmit_msg = chn->tmit_head;
2153 if (NULL == tmit_msg)
2155 if (NULL == mst->tmit_handle)
/* Placeholder value: master_transmit_notify() resets tmit_handle to NULL when
 * a message completes, so if the field is NULL after origin_to_all() returns
 * the new handle is not stored. Presumably this covers the callback running
 * before origin_to_all() returns -- confirm. */
2157 mst->tmit_handle = (void *) &mst->tmit_handle;
2158 struct GNUNET_MULTICAST_OriginTransmitHandle *
2159 tmit_handle = GNUNET_MULTICAST_origin_to_all (mst->origin, tmit_msg->id,
2160 mst->max_group_generation,
2161 master_transmit_notify, mst);
2162 if (NULL != mst->tmit_handle)
2163 mst->tmit_handle = tmit_handle;
2167 GNUNET_MULTICAST_origin_to_all_resume (mst->tmit_handle);
2173 * Transmit a message from a channel slave to the multicast group.
/* Start transmitting the head of the slave's queue to the origin, or resume
 * an existing transmission if a handle is already set. */
2176 slave_transmit_message (struct Slave *slv)
2178 if (NULL == slv->channel.tmit_head)
2180 if (NULL == slv->tmit_handle)
/* Placeholder value: slave_transmit_notify() resets tmit_handle to NULL when
 * a message completes, so if the field is NULL after member_to_origin()
 * returns the new handle is not stored. Presumably this covers the callback
 * running before member_to_origin() returns -- confirm. */
2182 slv->tmit_handle = (void *) &slv->tmit_handle;
2183 struct GNUNET_MULTICAST_MemberTransmitHandle *
2184 tmit_handle = GNUNET_MULTICAST_member_to_origin (slv->member, slv->channel.tmit_head->id,
2185 slave_transmit_notify, slv);
2186 if (NULL != slv->tmit_handle)
2187 slv->tmit_handle = tmit_handle;
2191 GNUNET_MULTICAST_member_to_origin_resume (slv->tmit_handle);
/* Dispatch to the master or slave transmit routine for this channel. The
 * selecting condition is not visible here; presumably it is chn->is_master --
 * confirm. */
2197 transmit_message (struct Channel *chn)
2200 ? master_transmit_message (chn->master)
2201 : slave_transmit_message (chn->slave);
2206 * Queue a message from a channel master for sending to the multicast group.
/* Master-specific preparation of a queued message: a message starting with a
 * method part gets the next message ID and its state_delta field filled in
 * according to the transmit flags. */
2209 master_queue_message (struct Master *mst, struct TransmitMessage *tmit_msg)
/* NOTE(review): the traces in this function use WARNING level although they
 * report normal progress -- confirm the intended level. */
2211 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "%p master_queue_message()\n", mst);
2213 if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == tmit_msg->first_ptype)
2215 tmit_msg->id = ++mst->max_message_id;
2216 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2217 "%p master_queue_message: message_id=%" PRIu64 "\n",
/* The payload behind the TransmitMessage struct starts with the method part. */
2219 struct GNUNET_PSYC_MessageMethod *pmeth
2220 = (struct GNUNET_PSYC_MessageMethod *) &tmit_msg[1];
/* NOTE(review): pmeth->flags is tested without byte-order conversion while
 * state_delta in the same struct is written with GNUNET_htonll() -- confirm
 * which byte order flags is in at this point. */
2222 if (pmeth->flags & GNUNET_PSYC_MASTER_TRANSMIT_STATE_RESET)
2224 pmeth->state_delta = GNUNET_htonll (GNUNET_PSYC_STATE_RESET);
2226 else if (pmeth->flags & GNUNET_PSYC_MASTER_TRANSMIT_STATE_MODIFY)
2228 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2229 "%p master_queue_message: state_delta=%" PRIu64 "\n",
2230 mst, tmit_msg->id - mst->max_state_message_id);
/* The delta is the distance back to the previous stateful message; this
 * message then becomes the latest stateful one. */
2231 pmeth->state_delta = GNUNET_htonll (tmit_msg->id
2232 - mst->max_state_message_id);
2233 mst->max_state_message_id = tmit_msg->id;
2237 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2238 "%p master_queue_message: state not modified\n", mst);
2239 pmeth->state_delta = GNUNET_htonll (GNUNET_PSYC_STATE_NOT_MODIFIED);
2242 if (pmeth->flags & GNUNET_PSYC_MASTER_TRANSMIT_STATE_HASH)
2244 /// @todo add state_hash to PSYC header
2251 * Queue a message from a channel slave for sending to the multicast group.
/* Slave-specific preparation of a queued message: a message starting with a
 * method part is marked as not modifying state and gets the next request
 * ID. */
2254 slave_queue_message (struct Slave *slv, struct TransmitMessage *tmit_msg)
2256 if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == tmit_msg->first_ptype)
/* The payload behind the TransmitMessage struct starts with the method part. */
2258 struct GNUNET_PSYC_MessageMethod *pmeth
2259 = (struct GNUNET_PSYC_MessageMethod *) &tmit_msg[1];
2260 pmeth->state_delta = GNUNET_htonll (GNUNET_PSYC_STATE_NOT_MODIFIED);
2261 tmit_msg->id = ++slv->max_request_id;
2267 * Queue PSYC message parts for sending to multicast.
2270 * Channel to send to.
2272 * Client the message originates from.
2276 * Concatenated message parts.
2277 * @param first_ptype
2278 * First message part type in @a data.
2280 * Last message part type in @a data.
/* Copy the given message parts into a newly allocated TransmitMessage, append
 * it to the channel's transmit queue and run the master- or slave-specific
 * queueing step on it. */
2282 static struct TransmitMessage *
2283 queue_message (struct Channel *chn,
2284 struct GNUNET_SERVICE_Client *client,
2287 uint16_t first_ptype, uint16_t last_ptype)
/* One allocation holds the struct followed directly by the payload copy. */
2289 struct TransmitMessage *
2290 tmit_msg = GNUNET_malloc (sizeof (*tmit_msg) + data_size);
2291 GNUNET_memcpy (&tmit_msg[1], data, data_size);
/* The originating client is kept so that transmit_notify() can acknowledge
 * the message to it. */
2292 tmit_msg->client = client;
2293 tmit_msg->size = data_size;
2294 tmit_msg->first_ptype = first_ptype;
2295 tmit_msg->last_ptype = last_ptype;
2297 /* FIXME: separate queue per message ID */
2299 GNUNET_CONTAINER_DLL_insert_tail (chn->tmit_head, chn->tmit_tail, tmit_msg);
2302 ? master_queue_message (chn->master, tmit_msg)
2303 : slave_queue_message (chn->slave, tmit_msg);
2309 * Cancel transmission of current message.
2311 * @param chn Channel to send to.
2312 * @param client Client the message originates from.
/* Queue a bare PSYC_MESSAGE_CANCEL part on behalf of the client and kick off
 * transmission. The cancel type is passed as both first and last part type. */
2315 transmit_cancel (struct Channel *chn, struct GNUNET_SERVICE_Client *client)
2317 uint16_t type = GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL;
/* The stack copy is sufficient because queue_message() copies the data. */
2319 struct GNUNET_MessageHeader msg;
2320 msg.size = htons (sizeof (msg));
2321 msg.type = htons (type);
2323 queue_message (chn, client, sizeof (msg), &msg, type, type);
2324 transmit_message (chn);
2326 /* FIXME: cleanup */
/* Validation hook for PSYC messages from a client. Only the signature is
 * visible here; size and part checks are done in
 * handle_client_psyc_message(). */
2331 check_client_psyc_message (void *cls,
2332 const struct GNUNET_MessageHeader *msg)
2339 * Incoming message from a master or slave client.
/* Message from a master or slave client destined for the multicast group.
 * The payload is validated, queued and transmission is started; invalid input
 * cancels the current transmission and drops the client. */
2342 handle_client_psyc_message (void *cls,
2343 const struct GNUNET_MessageHeader *msg)
2345 struct Client *c = cls;
2346 struct GNUNET_SERVICE_Client *client = c->client;
2347 struct Channel *chn = c->channel;
/* The condition guarding this drop is not visible here; presumably it covers
 * a client that has no channel yet -- confirm. */
2351 GNUNET_SERVICE_client_drop (client);
2355 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2356 "%p Received message from client.\n", chn);
2357 GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG, msg);
/* is_ready is set in store_recv_master_counters() once the counters have
 * arrived; earlier messages disconnect the client. */
2359 if (GNUNET_YES != chn->is_ready)
2361 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2362 "%p Channel is not ready yet, disconnecting client.\n", chn);
2364 GNUNET_SERVICE_client_drop (client);
/* The payload (everything after the header) must fit into one multicast
 * fragment. */
2368 uint16_t size = ntohs (msg->size);
2369 if (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < size - sizeof (*msg))
2371 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2372 "%p Message payload too large: %u < %u.\n",
2374 (unsigned int) GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD,
2375 (unsigned int) (size - sizeof (*msg)));
2377 transmit_cancel (chn, client);
2378 GNUNET_SERVICE_client_drop (client);
/* Validate the parts and learn the first and last part type. */
2382 uint16_t first_ptype = 0, last_ptype = 0;
2384 == GNUNET_PSYC_receive_check_parts (size - sizeof (*msg),
2385 (const char *) &msg[1],
2386 &first_ptype, &last_ptype))
2388 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2389 "%p Received invalid message part from client.\n", chn);
2391 transmit_cancel (chn, client);
2392 GNUNET_SERVICE_client_drop (client);
2395 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2396 "%p Received message with first part type %u and last part type %u.\n",
2397 chn, first_ptype, last_ptype);
/* Only the payload after the header is queued. */
2399 queue_message (chn, client, size - sizeof (*msg), &msg[1],
2400 first_ptype, last_ptype);
2401 transmit_message (chn);
2402 /* FIXME: send a few ACKs even before transmit_notify is called */
2404 GNUNET_SERVICE_client_continue (client);
2409 * Received result of GNUNET_PSYCSTORE_membership_store()
/* Completion callback for GNUNET_PSYCSTORE_membership_store(); cls is the
 * struct Operation created by handle_client_membership_store(). */
2412 store_recv_membership_store_result (void *cls,
2414 const char *err_msg,
2415 uint16_t err_msg_size)
2417 struct Operation *op = cls;
2418 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2419 "%p GNUNET_PSYCSTORE_membership_store() returned %" PRId64 " (%.*s)\n",
/* The result is forwarded only while the operation still has a client. */
2425 if (NULL != op->client)
2426 client_send_result (op->client, op->op_id, result, err_msg, err_msg_size);
2432 * Client requests to add/remove a slave in the membership database.
/* Client request to record a slave joining or leaving in the membership
 * database; the result is reported back through
 * store_recv_membership_store_result(). */
2435 handle_client_membership_store (void *cls,
2436 const struct ChannelMembershipStoreRequest *req)
2438 struct Client *c = cls;
2439 struct GNUNET_SERVICE_Client *client = c->client;
2440 struct Channel *chn = c->channel;
/* The condition guarding this drop is not visible here; presumably it covers
 * a client that has no channel yet -- confirm. */
2444 GNUNET_SERVICE_client_drop (client);
/* op_id is stored as received, without byte-order conversion. */
2448 struct Operation *op = op_add (chn, client, req->op_id, 0);
2450 uint64_t announced_at = GNUNET_ntohll (req->announced_at);
2451 uint64_t effective_since = GNUNET_ntohll (req->effective_since);
2452 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2453 "%p Received membership store request from client.\n", chn);
2454 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2455 "%p did_join: %u, announced_at: %" PRIu64 ", effective_since: %" PRIu64 "\n",
2456 chn, req->did_join, announced_at, effective_since);
/* did_join is passed through unconverted. */
2458 GNUNET_PSYCSTORE_membership_store (store, &chn->pub_key, &req->slave_pub_key,
2459 req->did_join, announced_at, effective_since,
2460 0, /* FIXME: group_generation */
2461 &store_recv_membership_store_result, op);
2462 GNUNET_SERVICE_client_continue (client);
2467 * Received a fragment for GNUNET_PSYCSTORE_fragment_get(),
2468 * in response to a history request from a client.
/* Per-fragment callback of a history request; cls is the struct Operation.
 * The stored multicast fragment is wrapped into a HISTORY_RESULT message and
 * sent to the channel's clients. */
2471 store_recv_fragment_history (void *cls,
2472 struct GNUNET_MULTICAST_MessageHeader *mmsg,
2473 enum GNUNET_PSYCSTORE_MessageFlags flags)
2475 struct Operation *op = cls;
2476 if (NULL == op->client)
2477 { /* Requesting client already disconnected. */
2480 struct Channel *chn = op->channel;
/* psize is the PSYC header plus the fragment payload.
 * NOTE(review): the sizes are computed in 16 bits, so sizeof (*res) + psize
 * could exceed the 16-bit header size field for large fragments -- confirm
 * the fragment size bound. */
2482 struct GNUNET_PSYC_MessageHeader *pmsg;
2483 uint16_t msize = ntohs (mmsg->header.size);
2484 uint16_t psize = sizeof (*pmsg) + msize - sizeof (*mmsg);
2486 struct GNUNET_OperationResultMessage *
2487 res = GNUNET_malloc (sizeof (*res) + psize);
2488 res->header.size = htons (sizeof (*res) + psize);
2489 res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_HISTORY_RESULT);
2490 res->op_id = op->op_id;
2491 res->result_code = GNUNET_htonll (GNUNET_OK);
/* The PSYC message is built in place, directly behind the result header.
 * NOTE(review): pmsg already points at &res[1], so the copy below has
 * identical source and destination; it is a no-op at best and overlapping
 * memcpy is undefined -- it looks removable. */
2493 pmsg = (struct GNUNET_PSYC_MessageHeader *) &res[1];
2494 GNUNET_PSYC_message_header_init (pmsg, mmsg, flags | GNUNET_PSYC_MESSAGE_HISTORIC);
2495 GNUNET_memcpy (&res[1], pmsg, psize);
2497 /** @todo FIXME: send only to requesting client */
2498 client_send_msg (chn, &res->header);
2506 * Received the result of GNUNET_PSYCSTORE_fragment_get(),
2507 * in response to a history request from a client.
/* Final callback of a history request; cls is the struct Operation. The
 * result is logged and passed on to the requesting client. */
2510 store_recv_fragment_history_result (void *cls, int64_t result,
2511 const char *err_msg, uint16_t err_msg_size)
2513 struct Operation *op = cls;
2514 if (NULL == op->client)
2515 { /* Requesting client already disconnected. */
/* op_id is kept in network byte order and converted only for logging. */
2519 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2520 "%p History replay #%" PRIu64 ": "
2521 "PSYCSTORE returned %" PRId64 " (%.*s)\n",
2522 op->channel, GNUNET_ntohll (op->op_id), result, err_msg_size, err_msg);
2524 if (op->flags & GNUNET_PSYC_HISTORY_REPLAY_REMOTE)
2526 /** @todo Multicast replay request for messages not found locally. */
2529 client_send_result (op->client, op->op_id, result, err_msg, err_msg_size);
/* Validation hook for history requests from a client. Only the signature is
 * visible here; the method prefix is checked in
 * handle_client_history_replay(). */
2535 check_client_history_replay (void *cls,
2536 const struct GNUNET_PSYC_HistoryRequestMessage *req)
2543 * Client requests channel history.
/* Client request for channel history: either a message ID range or the
 * latest message_limit messages are fetched from PSYCstore and delivered
 * through store_recv_fragment_history(). */
2546 handle_client_history_replay (void *cls,
2547 const struct GNUNET_PSYC_HistoryRequestMessage *req)
2549 struct Client *c = cls;
2550 struct GNUNET_SERVICE_Client *client = c->client;
2551 struct Channel *chn = c->channel;
/* The condition guarding this drop is not visible here; presumably it covers
 * a client that has no channel yet -- confirm. */
2555 GNUNET_SERVICE_client_drop (client);
/* The method prefix follows the fixed part and must be at least one byte
 * long and NUL-terminated. */
2559 uint16_t size = ntohs (req->header.size);
2560 const char *method_prefix = (const char *) &req[1];
2562 if (size < sizeof (*req) + 1
2563 || '\0' != method_prefix[size - sizeof (*req) - 1])
2565 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2566 "%p History replay #%" PRIu64 ": "
2567 "invalid method prefix. size: %u < %u?\n",
2569 GNUNET_ntohll (req->op_id),
2571 (unsigned int) sizeof (*req) + 1);
2573 GNUNET_SERVICE_client_drop (client);
2577 struct Operation *op = op_add (chn, client, req->op_id, ntohl (req->flags));
/* A zero limit selects the ID range variant; the comparison with zero does
 * not depend on byte order. */
2579 if (0 == req->message_limit)
2581 GNUNET_PSYCSTORE_message_get (store, &chn->pub_key, NULL,
2582 GNUNET_ntohll (req->start_message_id),
2583 GNUNET_ntohll (req->end_message_id),
2585 &store_recv_fragment_history,
2586 &store_recv_fragment_history_result, op);
2590 GNUNET_PSYCSTORE_message_get_latest (store, &chn->pub_key, NULL,
2591 GNUNET_ntohll (req->message_limit),
2593 &store_recv_fragment_history,
2594 &store_recv_fragment_history_result,
2597 GNUNET_SERVICE_client_continue (client);
2602 * Received state var from PSYCstore, send it to client.
/* Per-variable callback of a state request; cls is the struct Operation.
 * A non-NULL name starts a variable and is sent as a MODIFIER part; a NULL
 * name is a continuation of the previous value and is sent as a MOD_CONT
 * part. Either way the part is wrapped in a STATE_RESULT message for the
 * requesting client. */
2605 store_recv_state_var (void *cls, const char *name,
2606 const void *value, uint32_t value_size)
2608 struct Operation *op = cls;
2609 struct GNUNET_OperationResultMessage *res;
2610 struct GNUNET_MQ_Envelope *env;
/* NOTE(review): name is printed with %s here although it can be NULL (the
 * continuation branch below handles that case); passing NULL to %s is
 * undefined -- log a placeholder for NULL. */
2612 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2613 "%p state_get #%" PRIu64 " - received var from PSYCstore: %s\n",
2614 op->channel, GNUNET_ntohll (op->op_id), name);
2616 if (NULL != name) /* First part */
/* name_size includes the terminating NUL. NOTE(review): the extra size is
 * the sum of a 32-bit value_size and the name length, while the part's
 * header size is 16 bits -- confirm the store bounds value_size. */
2618 uint16_t name_size = strnlen (name, GNUNET_PSYC_MODIFIER_MAX_PAYLOAD) + 1;
2619 struct GNUNET_PSYC_MessageModifier *mod;
2620 env = GNUNET_MQ_msg_extra (res,
2621 sizeof (*mod) + name_size + value_size,
2622 GNUNET_MESSAGE_TYPE_PSYC_STATE_RESULT);
2623 res->op_id = op->op_id;
/* Layout: result header, modifier header, name (with NUL), value. */
2625 mod = (struct GNUNET_PSYC_MessageModifier *) &res[1];
2626 mod->header.size = htons (sizeof (*mod) + name_size + value_size);
2627 mod->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER);
2628 mod->name_size = htons (name_size);
2629 mod->value_size = htonl (value_size);
2630 mod->oper = htons (GNUNET_PSYC_OP_ASSIGN);
2631 GNUNET_memcpy (&mod[1], name, name_size);
2632 GNUNET_memcpy (((char *) &mod[1]) + name_size, value, value_size);
2634 else /* Continuation */
/* Layout: result header, plain message header, value bytes. */
2636 struct GNUNET_MessageHeader *mod;
2637 env = GNUNET_MQ_msg_extra (res,
2638 sizeof (*mod) + value_size,
2639 GNUNET_MESSAGE_TYPE_PSYC_STATE_RESULT);
2640 res->op_id = op->op_id;
2642 mod = (struct GNUNET_MessageHeader *) &res[1];
2643 mod->size = htons (sizeof (*mod) + value_size);
2644 mod->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT);
2645 GNUNET_memcpy (&mod[1], value, value_size);
/* NOTE(review): op->client is used without a NULL check, unlike
 * store_recv_membership_store_result() -- see the FIXME below. */
2648 // FIXME: client might have been disconnected
2649 GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (op->client), env);
2655 * Received result of GNUNET_PSYCSTORE_state_get()
2656 * or GNUNET_PSYCSTORE_state_get_prefix()
/* Final callback of a state request; cls is the struct Operation. The result
 * is logged and passed on to the requesting client. */
2659 store_recv_state_result (void *cls, int64_t result,
2660 const char *err_msg, uint16_t err_msg_size)
2662 struct Operation *op = cls;
2663 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2664 "%p state_get #%" PRIu64 ": "
2665 "PSYCSTORE returned %" PRId64 " (%.*s)\n",
2666 op->channel, GNUNET_ntohll (op->op_id), result, err_msg_size, err_msg);
/* NOTE(review): op->client is used without a NULL check, unlike
 * store_recv_membership_store_result() -- see the FIXME below. */
2668 // FIXME: client might have been disconnected
2669 client_send_result (op->client, op->op_id, result, err_msg, err_msg_size);
2675 check_client_state_get (void *cls,
2676 const struct StateRequest *req)
// Validation step for GNUNET_MESSAGE_TYPE_PSYC_STATE_GET requests (see the
// GNUNET_MQ_hd_var_size (client_state_get, ...) registration at the end of
// this file). cls is used as a struct Client.
2678 struct Client *c = cls;
2679 struct Channel *chn = c->channel;
// NOTE(review): the condition guarding this early return is not visible in
// this listing; presumably it rejects a client whose chn is NULL -- confirm.
2683 return GNUNET_SYSERR;
// The variable name is the payload following the fixed-size StateRequest, so
// its length is the total message size minus sizeof (*req).
2686 uint16_t name_size = ntohs (req->header.size) - sizeof (*req);
2687 const char *name = (const char *) &req[1];
// Reject an empty payload and a name whose last byte is not the 0-terminator.
2688 if (0 == name_size || '\0' != name[name_size - 1])
2691 return GNUNET_SYSERR;
2699 * Client requests best matching state variable from PSYCstore.
2702 handle_client_state_get (void *cls,
2703 const struct StateRequest *req)
// Handles a state-get request: registers an operation for it and forwards the
// query to PSYCstore. cls is used as a struct Client.
2705 struct Client *c = cls;
2706 struct GNUNET_SERVICE_Client *client = c->client;
2707 struct Channel *chn = c->channel;
// The requested variable name starts right after the fixed-size request.
2709 const char *name = (const char *) &req[1];
// Record the pending request (channel, client, op_id taken from the request);
// the resulting struct Operation is handed to PSYCstore as callback closure.
// NOTE(review): the meaning of op_add()'s last argument (0) is not visible here.
2710 struct Operation *op = op_add (chn, client, req->op_id, 0);
// store_recv_state_var and store_recv_state_result are the callbacks that
// report back to the client.
2711 GNUNET_PSYCSTORE_state_get (store, &chn->pub_key, name,
2712 &store_recv_state_var,
2713 &store_recv_state_result, op);
// Presumably signals the service layer that this message is done so the next
// one from this client can be delivered -- confirm against the GNUnet service API.
2714 GNUNET_SERVICE_client_continue (client);
2719 check_client_state_get_prefix (void *cls,
2720 const struct StateRequest *req)
// Validation step for GNUNET_MESSAGE_TYPE_PSYC_STATE_GET_PREFIX requests (see
// the GNUNET_MQ_hd_var_size (client_state_get_prefix, ...) registration at the
// end of this file). The visible checks are the same as in
// check_client_state_get(). cls is used as a struct Client.
2722 struct Client *c = cls;
2723 struct Channel *chn = c->channel;
// NOTE(review): the condition guarding this early return is not visible in
// this listing; presumably it rejects a client whose chn is NULL -- confirm.
2727 return GNUNET_SYSERR;
// The prefix string is the payload following the fixed-size StateRequest, so
// its length is the total message size minus sizeof (*req).
2730 uint16_t name_size = ntohs (req->header.size) - sizeof (*req);
2731 const char *name = (const char *) &req[1];
// Reject an empty payload and a string whose last byte is not the 0-terminator.
2732 if (0 == name_size || '\0' != name[name_size - 1])
2735 return GNUNET_SYSERR;
2743 * Client requests state variables with a given prefix from PSYCstore.
2746 handle_client_state_get_prefix (void *cls,
2747 const struct StateRequest *req)
// Handles a state-get-prefix request: registers an operation for it and
// forwards the prefix query to PSYCstore. cls is used as a struct Client.
2749 struct Client *c = cls;
2750 struct GNUNET_SERVICE_Client *client = c->client;
2751 struct Channel *chn = c->channel;
// The requested name prefix starts right after the fixed-size request.
2753 const char *name = (const char *) &req[1];
// Record the pending request (channel, client, op_id taken from the request);
// the resulting struct Operation is handed to PSYCstore as callback closure.
// NOTE(review): the meaning of op_add()'s last argument (0) is not visible here.
2754 struct Operation *op = op_add (chn, client, req->op_id, 0);
// Same callbacks as the single-variable query in handle_client_state_get().
2755 GNUNET_PSYCSTORE_state_get_prefix (store, &chn->pub_key, name,
2756 &store_recv_state_var,
2757 &store_recv_state_result, op);
// Presumably signals the service layer that this message is done so the next
// one from this client can be delivered -- confirm against the GNUnet service API.
2758 GNUNET_SERVICE_client_continue (client);
2763 * Initialize the PSYC service.
2765 * @param cls Closure.
2766 * @param c Configuration to use.
2767 * @param svc The initialized service.
2771 const struct GNUNET_CONFIGURATION_Handle *c,
2772 struct GNUNET_SERVICE_Handle *svc)
// NOTE(review): the statements between the signature and the first call below
// are not visible in this listing; cfg (the file-level configuration handle) is
// presumably set from c there -- confirm.
// Connect to PSYCstore and create the statistics handle under the name "psyc".
2776 store = GNUNET_PSYCSTORE_connect (cfg);
2777 stats = GNUNET_STATISTICS_create ("psyc", cfg);
// Create the global lookup tables, each with an initial size of 1. The second
// argument differs: GNUNET_YES for masters/slaves, GNUNET_NO for
// channel_slaves/recv_cache.
// NOTE(review): per the GNUnet container API this looks like the
// do_not_copy_keys flag, in which case the keys of masters/slaves must stay
// valid for as long as the entries exist -- confirm.
2778 masters = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
2779 slaves = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
2780 channel_slaves = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
2781 recv_cache = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
// Register shutdown_task with the scheduler's shutdown hook (no closure).
2782 GNUNET_SCHEDULER_add_shutdown (&shutdown_task, NULL);
2787 * Define "main" method using service macro.
2791 GNUNET_SERVICE_OPTION_NONE,
// Callbacks invoked when a client connects / disconnects.
2793 client_notify_connect,
2794 client_notify_disconnect,
// Client message handlers. Each entry names the handler stem, the message
// type and the struct at the start of the message. For var_size entries the
// stem X corresponds to a check_X()/handle_X() pair (cf.
// check_client_state_get() and handle_client_state_get() in this file).
// NOTE(review): fixed_size entries presumably need only handle_X() -- confirm
// against the GNUnet MQ documentation.
// Master start request.
2796 GNUNET_MQ_hd_fixed_size (client_master_start,
2797 GNUNET_MESSAGE_TYPE_PSYC_MASTER_START,
2798 struct MasterStartRequest,
// Slave join request.
2800 GNUNET_MQ_hd_var_size (client_slave_join,
2801 GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN,
2802 struct SlaveJoinRequest,
// Join decision.
2804 GNUNET_MQ_hd_var_size (client_join_decision,
2805 GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION,
2806 struct GNUNET_PSYC_JoinDecisionMessage,
// PSYC message from a client; only the generic message header is fixed.
2808 GNUNET_MQ_hd_var_size (client_psyc_message,
2809 GNUNET_MESSAGE_TYPE_PSYC_MESSAGE,
2810 struct GNUNET_MessageHeader,
// Channel membership store request.
2812 GNUNET_MQ_hd_fixed_size (client_membership_store,
2813 GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_MEMBERSHIP_STORE,
2814 struct ChannelMembershipStoreRequest,
// History replay request.
2816 GNUNET_MQ_hd_var_size (client_history_replay,
2817 GNUNET_MESSAGE_TYPE_PSYC_HISTORY_REPLAY,
2818 struct GNUNET_PSYC_HistoryRequestMessage,
// State variable lookup by name.
2820 GNUNET_MQ_hd_var_size (client_state_get,
2821 GNUNET_MESSAGE_TYPE_PSYC_STATE_GET,
2822 struct StateRequest,
// State variable lookup by name prefix.
2824 GNUNET_MQ_hd_var_size (client_state_get_prefix,
2825 GNUNET_MESSAGE_TYPE_PSYC_STATE_GET_PREFIX,
2826 struct StateRequest,
2829 /* end of gnunet-service-psyc.c */