2 * This file is part of GNUnet
3 * Copyright (C) 2013 Christian Grothoff (and other contributing authors)
5 * GNUnet is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published
7 * by the Free Software Foundation; either version 3, or (at your
8 * option) any later version.
10 * GNUnet is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License for more details.
15 * You should have received a copy of the GNU General Public License
16 * along with GNUnet; see the file COPYING. If not, write to the
17 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 * Boston, MA 02111-1307, USA.
22 * @file psyc/gnunet-service-psyc.c
24 * @author Gabor X Toth
30 #include "gnunet_util_lib.h"
31 #include "gnunet_constants.h"
32 #include "gnunet_protocols.h"
33 #include "gnunet_statistics_service.h"
34 #include "gnunet_multicast_service.h"
35 #include "gnunet_psycstore_service.h"
36 #include "gnunet_psyc_service.h"
37 #include "gnunet_psyc_util_lib.h"
42 * Handle to our current configuration.
44 static const struct GNUNET_CONFIGURATION_Handle *cfg;
47 * Handle to the statistics service.
49 static struct GNUNET_STATISTICS_Handle *stats;
52 * Notification context, simplifies client broadcasts.
54 static struct GNUNET_SERVER_NotificationContext *nc;
57 * Handle to the PSYCstore.
59 static struct GNUNET_PSYCSTORE_Handle *store;
62 * All connected masters.
63 * Channel's pub_key_hash -> struct Master
65 static struct GNUNET_CONTAINER_MultiHashMap *masters;
68 * All connected slaves.
69 * Channel's pub_key_hash -> struct Slave
71 static struct GNUNET_CONTAINER_MultiHashMap *slaves;
74 * Connected slaves per channel.
75 * Channel's pub_key_hash -> Slave's pub_key -> struct Slave
77 static struct GNUNET_CONTAINER_MultiHashMap *channel_slaves;
81 * Message in the transmission queue.
83 struct TransmitMessage
85 struct TransmitMessage *prev;
86 struct TransmitMessage *next;
88 struct GNUNET_SERVER_Client *client;
91 * ID assigned to the message.
101 * @see enum MessageState
106 * Whether a message ACK has already been sent to the client.
107 * #GNUNET_YES or #GNUNET_NO
111 /* Followed by message */
116 * Cache for received message fragments.
117 * Message fragments are only sent to clients after all modifiers arrived.
119 * chan_key -> MultiHashMap chan_msgs
121 static struct GNUNET_CONTAINER_MultiHashMap *recv_cache;
125 * Entry in the chan_msgs hashmap of @a recv_cache:
126 * fragment_id -> RecvCacheEntry
128 struct RecvCacheEntry
130 struct GNUNET_MULTICAST_MessageHeader *mmsg;
136 * Entry in the @a recv_frags hash map of a @a Channel.
137 * message_id -> FragmentQueue
142 * Fragment IDs stored in @a recv_cache.
144 struct GNUNET_CONTAINER_Heap *fragments;
147 * Total size of received fragments.
152 * Total size of received header fragments (METHOD & MODIFIERs)
154 uint64_t header_size;
157 * The @a state_delta field from struct GNUNET_PSYC_MessageMethod.
159 uint64_t state_delta;
162 * The @a flags field from struct GNUNET_PSYC_MessageMethod.
167 * Receive state of message.
169 * @see MessageFragmentState
174 * Is the message queued for delivery to the client?
175 * i.e. added to the recv_msgs queue
182 * List of connected clients.
189 struct GNUNET_SERVER_Client *client;
195 struct Operation *prev;
196 struct Operation *next;
198 struct GNUNET_SERVER_Client *client;
206 * Common part of the client context for both a channel master and slave.
210 struct Client *clients_head;
211 struct Client *clients_tail;
213 struct Operation *op_head;
214 struct Operation *op_tail;
216 struct TransmitMessage *tmit_head;
217 struct TransmitMessage *tmit_tail;
220 * Current PSYCstore operation.
222 struct GNUNET_PSYCSTORE_OperationHandle *store_op;
225 * Received fragments not yet sent to the client.
226 * message_id -> FragmentQueue
228 struct GNUNET_CONTAINER_MultiHashMap *recv_frags;
231 * Received message IDs not yet sent to the client.
233 struct GNUNET_CONTAINER_Heap *recv_msgs;
236 * Public key of the channel.
238 struct GNUNET_CRYPTO_EddsaPublicKey pub_key;
241 * Hash of @a pub_key.
243 struct GNUNET_HashCode pub_key_hash;
246 * Last message ID sent to the client.
247 * 0 if there is no such message.
249 uint64_t max_message_id;
252 * ID of the last stateful message, where the state operations have been
253 * processed and saved to PSYCstore and which has been sent to the client.
254 * 0 if there is no such message.
256 uint64_t max_state_message_id;
259 * Expected value size for the modifier being received from the PSYC service.
261 uint32_t tmit_mod_value_size_expected;
264 * Actual value size for the modifier being received from the PSYC service.
266 uint32_t tmit_mod_value_size;
269 * @see enum MessageState
274 * Is this a channel master (#GNUNET_YES), or slave (#GNUNET_NO)?
279 * Is this channel ready to receive messages from client?
280 * #GNUNET_YES or #GNUNET_NO
285 * Is the client disconnected?
286 * #GNUNET_YES or #GNUNET_NO
288 uint8_t is_disconnected;
293 * Client context for a channel master.
298 * Channel struct common for Master and Slave
303 * Private key of the channel.
305 struct GNUNET_CRYPTO_EddsaPrivateKey priv_key;
308 * Handle for the multicast origin.
310 struct GNUNET_MULTICAST_Origin *origin;
313 * Transmit handle for multicast.
315 struct GNUNET_MULTICAST_OriginTransmitHandle *tmit_handle;
318 * Incoming join requests from multicast.
319 * member_key -> struct GNUNET_MULTICAST_JoinHandle *
321 struct GNUNET_CONTAINER_MultiHashMap *join_reqs;
324 * Last message ID transmitted to this channel.
326 * Incremented before sending a message, thus the message_id in messages sent
329 uint64_t max_message_id;
332 * ID of the last message with state operations transmitted to the channel.
333 * 0 if there is no such message.
335 uint64_t max_state_message_id;
338 * Maximum group generation transmitted to the channel.
340 uint64_t max_group_generation;
343 * @see enum GNUNET_PSYC_Policy
345 enum GNUNET_PSYC_Policy policy;
350 * Client context for a channel slave.
355 * Channel struct common for Master and Slave
360 * Private key of the slave.
362 struct GNUNET_CRYPTO_EcdsaPrivateKey priv_key;
365 * Public key of the slave.
367 struct GNUNET_CRYPTO_EcdsaPublicKey pub_key;
370 * Hash of @a pub_key.
372 struct GNUNET_HashCode pub_key_hash;
375 * Handle for the multicast member.
377 struct GNUNET_MULTICAST_Member *member;
380 * Transmit handle for multicast.
382 struct GNUNET_MULTICAST_MemberTransmitHandle *tmit_handle;
385 * Peer identity of the origin.
387 struct GNUNET_PeerIdentity origin;
390 * Number of items in @a relays.
392 uint32_t relay_count;
395 * Relays that multicast can use to connect.
397 struct GNUNET_PeerIdentity *relays;
400 * Join request to be transmitted to the master on join.
402 struct GNUNET_PSYC_Message *join_msg;
405 * Join decision received from multicast.
407 struct GNUNET_PSYC_JoinDecisionMessage *join_dcsn;
410 * Maximum request ID for this channel.
412 uint64_t max_request_id;
417 transmit_message (struct Channel *chn);
421 message_queue_drop (struct Channel *chn);
425 * Task run during shutdown.
/*
 * Scheduler task run at service shutdown.  The statements visible here tear
 * down the file-level client notification context (nc) and the statistics
 * handle (stats).
 */
431 shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
435 GNUNET_SERVER_notification_context_destroy (nc);
440 GNUNET_STATISTICS_destroy (stats, GNUNET_YES);
/*
 * Allocate a new Operation record and link it into the channel's operation
 * list (chn->op_head / chn->op_tail).
 * NOTE(review): client, op_id and flags are presumably stored in the new
 * record and the record returned -- confirm against the full function body.
 */
446 static struct Operation *
447 op_add (struct Channel *chn, struct GNUNET_SERVER_Client *client,
448 uint64_t op_id, uint32_t flags)
450 struct Operation *op = GNUNET_malloc (sizeof (*op));
455 GNUNET_CONTAINER_DLL_insert (chn->op_head, chn->op_tail, op);
/* Unlink op from the channel's operation list (chn->op_head / chn->op_tail). */
461 op_remove (struct Channel *chn, struct Operation *op)
463 GNUNET_CONTAINER_DLL_remove (chn->op_head, chn->op_tail, op);
469 * Clean up master data structures after a client disconnected.
/*
 * Master-specific teardown: stop the multicast origin if one exists, destroy
 * the map of pending join requests and drop the channel from the global
 * masters map (keyed by the channel's public key hash).
 */
472 cleanup_master (struct Master *mst)
474 struct Channel *chn = &mst->chn;
/* The origin handle is only stopped when it was actually created. */
476 if (NULL != mst->origin)
477 GNUNET_MULTICAST_origin_stop (mst->origin, NULL, NULL); // FIXME
478 GNUNET_CONTAINER_multihashmap_destroy (mst->join_reqs);
479 GNUNET_CONTAINER_multihashmap_remove (masters, &chn->pub_key_hash, chn);
484 * Clean up slave data structures after a client disconnected.
/*
 * Slave-specific teardown: unregister the slave from the per-channel slave
 * map and from the global slaves map, release the stored join message and
 * relay list, and part from the multicast group.
 */
487 cleanup_slave (struct Slave *slv)
489 struct Channel *chn = &slv->chn;
/* Per-channel map of slaves; it must exist for a registered slave. */
490 struct GNUNET_CONTAINER_MultiHashMap *
491 chn_slv = GNUNET_CONTAINER_multihashmap_get (channel_slaves,
493 GNUNET_assert (NULL != chn_slv);
494 GNUNET_CONTAINER_multihashmap_remove (chn_slv, &slv->pub_key_hash, slv);
/* Once the last slave of the channel is gone, drop the per-channel map too. */
496 if (0 == GNUNET_CONTAINER_multihashmap_size (chn_slv))
498 GNUNET_CONTAINER_multihashmap_remove (channel_slaves, &chn->pub_key_hash,
500 GNUNET_CONTAINER_multihashmap_destroy (chn_slv);
502 GNUNET_CONTAINER_multihashmap_remove (slaves, &chn->pub_key_hash, slv);
/* Release the heap-allocated join message and relay array, if present. */
504 if (NULL != slv->join_msg)
506 GNUNET_free (slv->join_msg);
507 slv->join_msg = NULL;
509 if (NULL != slv->relays)
511 GNUNET_free (slv->relays);
514 if (NULL != slv->member)
516 GNUNET_MULTICAST_member_part (slv->member, NULL, NULL); // FIXME
/*
 * NOTE(review): the entry for this key was already removed from `slaves`
 * above (with slv as value); this second removal looks redundant -- confirm.
 */
519 GNUNET_CONTAINER_multihashmap_remove (slaves, &chn->pub_key_hash, chn);
524 * Clean up channel data structures after a client disconnected.
/*
 * Common channel teardown: drop all queued incoming messages, remove the
 * channel's entry from recv_cache, cancel a pending PSYCstore operation and
 * then run the master- or slave-specific cleanup depending on is_master.
 */
527 cleanup_channel (struct Channel *chn)
529 message_queue_drop (chn);
/*
 * NOTE(review): only the recv_cache entry is removed here; whether the
 * per-channel fragment map stored as its value is destroyed elsewhere is not
 * visible in this listing -- confirm.
 */
530 GNUNET_CONTAINER_multihashmap_remove_all (recv_cache, &chn->pub_key_hash);
532 if (NULL != chn->store_op)
534 GNUNET_PSYCSTORE_operation_cancel (chn->store_op);
535 chn->store_op = NULL;
/* The Channel is the common part of Master and Slave, hence the casts. */
538 (GNUNET_YES == chn->is_master)
539 ? cleanup_master ((struct Master *) chn)
540 : cleanup_slave ((struct Slave *) chn);
546 * Called whenever a client is disconnected.
547 * Frees our resources associated with that client.
549 * @param cls Closure.
550 * @param client Identification of the client.
/*
 * Server callback for a disconnecting client.  Looks up the Channel stored as
 * the client's user context, unlinks the matching entry from the channel's
 * client list, inspects the channel's operations for ones owned by this
 * client, and handles the "last client gone" case.
 */
553 client_disconnect (void *cls, struct GNUNET_SERVER_Client *client)
559 chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
/* A client without a user context never joined a channel; only log it. */
563 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
564 "%p User context is NULL in client_disconnect()\n", chn);
569 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
570 "%p Client (%s) disconnected from channel %s\n",
571 chn, (GNUNET_YES == chn->is_master) ? "master" : "slave",
572 GNUNET_h2s (&chn->pub_key_hash));
/* Find this client's entry in the channel's client list and unlink it. */
574 struct Client *cli = chn->clients_head;
577 if (cli->client == client)
579 GNUNET_CONTAINER_DLL_remove (chn->clients_head, chn->clients_tail, cli);
/* Operations started by this client are matched by their client pointer. */
586 struct Operation *op = chn->op_head;
589 if (op->client == client)
/*
 * NOTE(review): the branch structure between transmit_message() and
 * cleanup_channel() is not visible here; presumably cleanup is deferred while
 * messages are still pending transmission -- confirm.
 */
597 if (NULL == chn->clients_head)
598 { /* Last client disconnected. */
599 if (NULL != chn->tmit_head)
600 { /* Send pending messages to multicast before cleanup. */
601 transmit_message (chn);
605 cleanup_channel (chn);
612 * Send message to all clients connected to the channel.
/*
 * Deliver msg to the clients of a channel, starting from chn->clients_head.
 * For each client handled, the client is registered with the notification
 * context nc and the message is unicast to it.
 */
615 client_send_msg (const struct Channel *chn,
616 const struct GNUNET_MessageHeader *msg)
618 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
619 "%p Sending message to clients.\n", chn);
621 struct Client *cli = chn->clients_head;
624 GNUNET_SERVER_notification_context_add (nc, cli->client);
625 GNUNET_SERVER_notification_context_unicast (nc, cli->client, msg, GNUNET_NO);
632 * Send a result code back to the client.
635 * Client that should receive the result code.
639 * Operation ID in network byte order.
641 * Data payload or NULL.
/*
 * Build a GNUNET_MESSAGE_TYPE_PSYC_RESULT_CODE message and unicast it to one
 * client.  The message consists of the fixed result header followed by
 * data_size bytes of payload; op_id is treated as already being in network
 * byte order (it is converted with GNUNET_ntohll only for logging).
 */
646 client_send_result (struct GNUNET_SERVER_Client *client, uint64_t op_id,
647 int64_t result_code, const void *data, uint16_t data_size)
649 struct GNUNET_OperationResultMessage *res;
/*
 * NOTE(review): the header size field is 16 bits wide; no check is visible
 * that sizeof (*res) + data_size still fits -- confirm callers bound data_size.
 */
651 res = GNUNET_malloc (sizeof (*res) + data_size);
652 res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_RESULT_CODE);
653 res->header.size = htons (sizeof (*res) + data_size);
654 res->result_code = GNUNET_htonll_signed (result_code);
/* The payload is placed directly behind the fixed-size result header. */
657 memcpy (&res[1], data, data_size);
659 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
660 "%p Sending result to client for operation #%" PRIu64 ": "
661 "%" PRId64 " (size: %u)\n",
662 client, GNUNET_ntohll (op_id), result_code, data_size);
664 GNUNET_SERVER_notification_context_add (nc, client);
665 GNUNET_SERVER_notification_context_unicast (nc, client, &res->header,
672 * Closure for join_mem_test_cb()
/*
 * State carried from mcast_recv_join_request() to join_mem_test_cb().
 * The callback also reads a `chn` member (the channel the request is for),
 * whose declaration is not visible in this listing.
 */
674 struct JoinMemTestClosure
/* Public key of the slave that asked to join. */
676 struct GNUNET_CRYPTO_EcdsaPublicKey slave_key;
/* Multicast join handle used to answer the request. */
678 struct GNUNET_MULTICAST_JoinHandle *jh;
/* Join request message that is forwarded to the channel's clients. */
679 struct GNUNET_PSYC_JoinRequestMessage *join_msg;
684 * Membership test result callback used for join requests.
/*
 * Callback with the PSYCstore membership test result for a join request.
 * cls is the JoinMemTestClosure created in mcast_recv_join_request().
 * NOTE(review): the else-structure between the branches below is not visible
 * in this listing; the comments describe each visible branch on its own.
 */
687 join_mem_test_cb (void *cls, int64_t result,
688 const char *err_msg, uint16_t err_msg_size)
690 struct JoinMemTestClosure *jcls = cls;
/*
 * Not a known member and we are the master: remember the join handle under
 * the hash of the slave's key and let the clients decide.
 */
692 if (GNUNET_NO == result && GNUNET_YES == jcls->chn->is_master)
693 { /* Pass on join request to client if this is a master channel */
694 struct Master *mst = (struct Master *) jcls->chn;
695 struct GNUNET_HashCode slave_key_hash;
696 GNUNET_CRYPTO_hash (&jcls->slave_key, sizeof (jcls->slave_key),
698 GNUNET_CONTAINER_multihashmap_put (mst->join_reqs, &slave_key_hash, jcls->jh,
699 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
700 client_send_msg (jcls->chn, &jcls->join_msg->header);
/* A failed membership test is only logged here. */
704 if (GNUNET_SYSERR == result)
706 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
707 "Could not perform membership test (%.*s)\n",
708 err_msg_size, err_msg);
/* Answer the join request directly with the test result, without relays. */
711 GNUNET_MULTICAST_join_decision (jcls->jh, result, 0, NULL, NULL);
713 GNUNET_free (jcls->join_msg);
719 * Incoming join request from multicast.
/*
 * Multicast callback for an incoming join request.  cls is the Channel.
 * Wraps the optional join message into a GNUNET_PSYC_JoinRequestMessage and
 * starts a PSYCstore membership test; join_mem_test_cb() continues from there.
 */
722 mcast_recv_join_request (void *cls,
723 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
724 const struct GNUNET_MessageHeader *join_msg,
725 struct GNUNET_MULTICAST_JoinHandle *jh)
727 struct Channel *chn = cls;
728 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p Got join request.\n", chn);
/*
 * Only a join message of type PSYC_MESSAGE is accepted as payload; for any
 * other type join_msg_size stays 0 and nothing is appended below.
 */
730 uint16_t join_msg_size = 0;
731 if (NULL != join_msg)
733 if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE == ntohs (join_msg->type))
735 join_msg_size = ntohs (join_msg->size);
739 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
740 "%p Got join message with invalid type %u.\n",
741 chn, ntohs (join_msg->type));
/* Request header followed by a copy of the join message (if any). */
745 struct GNUNET_PSYC_JoinRequestMessage *
746 req = GNUNET_malloc (sizeof (*req) + join_msg_size);
747 req->header.size = htons (sizeof (*req) + join_msg_size);
748 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_JOIN_REQUEST);
749 req->slave_key = *slave_key;
750 if (0 < join_msg_size)
751 memcpy (&req[1], join_msg, join_msg_size);
/* Closure handed to the membership test callback; it takes over req. */
753 struct JoinMemTestClosure *jcls = GNUNET_malloc (sizeof (*jcls));
754 jcls->slave_key = *slave_key;
757 jcls->join_msg = req;
/* Test membership as of the channel's latest message ID, generation 0. */
759 GNUNET_PSYCSTORE_membership_test (store, &chn->pub_key, slave_key,
760 chn->max_message_id, 0,
761 &join_mem_test_cb, jcls);
766 * Join decision received from multicast.
/*
 * Multicast callback delivering the join decision for a slave.  cls is the
 * Slave.  Builds a GNUNET_PSYC_JoinDecisionMessage (with the optional join
 * response appended), keeps it in slv->join_dcsn and sends it to the clients.
 * peer, relay_count and relays are not used by the statements visible here.
 */
769 mcast_recv_join_decision (void *cls, int is_admitted,
770 const struct GNUNET_PeerIdentity *peer,
771 uint16_t relay_count,
772 const struct GNUNET_PeerIdentity *relays,
773 const struct GNUNET_MessageHeader *join_resp)
775 struct Slave *slv = cls;
776 struct Channel *chn = &slv->chn;
777 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
778 "%p Got join decision: %d\n", slv, is_admitted);
/*
 * NOTE(review): slv->join_dcsn is overwritten below; no release of a
 * previously stored decision is visible -- confirm it cannot be set already.
 */
780 uint16_t join_resp_size = (NULL != join_resp) ? ntohs (join_resp->size) : 0;
781 struct GNUNET_PSYC_JoinDecisionMessage *
782 dcsn = slv->join_dcsn = GNUNET_malloc (sizeof (*dcsn) + join_resp_size);
783 dcsn->header.size = htons (sizeof (*dcsn) + join_resp_size);
784 dcsn->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION);
785 dcsn->is_admitted = htonl (is_admitted);
786 if (0 < join_resp_size)
787 memcpy (&dcsn[1], join_resp, join_resp_size);
789 client_send_msg (chn, &dcsn->header);
/* An admitted slave's channel is marked ready. */
791 if (GNUNET_YES == is_admitted)
793 chn->is_ready = GNUNET_YES;
803 * Received result of GNUNET_PSYCSTORE_membership_test()
/*
 * PSYCstore callback for a membership test started on behalf of multicast.
 * cls is the multicast membership test handle; the result is logged and
 * passed on to multicast unchanged.
 */
806 store_recv_membership_test_result (void *cls, int64_t result,
807 const char *err_msg, uint16_t err_msg_size)
809 struct GNUNET_MULTICAST_MembershipTestHandle *mth = cls;
810 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
811 "%p GNUNET_PSYCSTORE_membership_test() returned %" PRId64 " (%.*s)\n",
812 mth, result, err_msg_size, err_msg);
814 GNUNET_MULTICAST_membership_test_result (mth, result);
819 * Incoming membership test request from multicast.
/*
 * Multicast callback requesting a membership test.  cls is the Channel.
 * The request is delegated to PSYCstore with the given message ID and group
 * generation; store_recv_membership_test_result() reports back using mth.
 */
822 mcast_recv_membership_test (void *cls,
823 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
824 uint64_t message_id, uint64_t group_generation,
825 struct GNUNET_MULTICAST_MembershipTestHandle *mth)
827 struct Channel *chn = cls;
828 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
829 "%p Received membership test request from multicast.\n",
831 GNUNET_PSYCSTORE_membership_test (store, &chn->pub_key, slave_key,
832 message_id, group_generation,
833 &store_recv_membership_test_result, mth);
/*
 * PSYCstore callback delivering one stored fragment for a replay request.
 * cls is the multicast replay handle; the fragment is handed to multicast
 * with status GNUNET_MULTICAST_REC_OK.  flags is not used by the statements
 * visible here.
 */
838 store_recv_fragment_replay (void *cls,
839 struct GNUNET_MULTICAST_MessageHeader *msg,
840 enum GNUNET_PSYCSTORE_MessageFlags flags)
842 struct GNUNET_MULTICAST_ReplayHandle *rh = cls;
844 GNUNET_MULTICAST_replay_response (rh, &msg->header, GNUNET_MULTICAST_REC_OK);
850 * Received result of GNUNET_PSYCSTORE_fragment_get() for multicast replay.
/*
 * PSYCstore callback with the final result of a replay lookup.  cls is the
 * multicast replay handle.  Depending on the result, an error response
 * (not found / access denied / internal error) is sent, and the replay is
 * then closed with GNUNET_MULTICAST_replay_response_end().
 * NOTE(review): apart from MEMBERSHIP_TEST_FAILED, the result values that
 * select each response are not visible in this listing.
 */
853 store_recv_fragment_replay_result (void *cls, int64_t result,
854 const char *err_msg, uint16_t err_msg_size)
856 struct GNUNET_MULTICAST_ReplayHandle *rh = cls;
857 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
858 "%p Fragment replay: PSYCSTORE returned %" PRId64 " (%.*s)\n",
859 rh, result, err_msg_size, err_msg);
867 GNUNET_MULTICAST_replay_response (rh, NULL,
868 GNUNET_MULTICAST_REC_NOT_FOUND);
871 case GNUNET_PSYCSTORE_MEMBERSHIP_TEST_FAILED:
872 GNUNET_MULTICAST_replay_response (rh, NULL,
873 GNUNET_MULTICAST_REC_ACCESS_DENIED);
877 GNUNET_MULTICAST_replay_response (rh, NULL,
878 GNUNET_MULTICAST_REC_INTERNAL_ERROR);
881 GNUNET_MULTICAST_replay_response_end (rh);
886 * Incoming fragment replay request from multicast.
/*
 * Multicast callback requesting replay of a single fragment.  cls is the
 * Channel.  The fragment is looked up in PSYCstore; fragment_id is passed as
 * both range arguments, so exactly that one fragment is requested.
 * flags is not used by the statements visible here.
 */
889 mcast_recv_replay_fragment (void *cls,
890 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
891 uint64_t fragment_id, uint64_t flags,
892 struct GNUNET_MULTICAST_ReplayHandle *rh)
895 struct Channel *chn = cls;
896 GNUNET_PSYCSTORE_fragment_get (store, &chn->pub_key, slave_key,
897 fragment_id, fragment_id,
898 &store_recv_fragment_replay,
899 &store_recv_fragment_replay_result, rh);
904 * Incoming message replay request from multicast.
/*
 * Multicast callback requesting replay of a whole message.  cls is the
 * Channel.  The message is looked up in PSYCstore; message_id is passed as
 * both range arguments, so exactly that one message is requested.
 * fragment_offset is not used by the statements visible here.
 */
907 mcast_recv_replay_message (void *cls,
908 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
910 uint64_t fragment_offset,
912 struct GNUNET_MULTICAST_ReplayHandle *rh)
914 struct Channel *chn = cls;
915 GNUNET_PSYCSTORE_message_get (store, &chn->pub_key, slave_key,
916 message_id, message_id, NULL,
917 &store_recv_fragment_replay,
918 &store_recv_fragment_replay_result, rh);
923 * Convert a uint64_t in network byte order to a HashCode
924 * that can be used as key in a MultiHashMap
/*
 * Build a hash map key from a 64-bit value: the value's byte order is
 * reversed in three steps (adjacent bytes, adjacent 16-bit words, then the
 * two 32-bit halves) and *key is zeroed before the result is stored.
 * NOTE(review): the line naming the assignment target of the final
 * expression is not visible here; presumably the first 8 bytes of *key.
 */
927 hash_key_from_nll (struct GNUNET_HashCode *key, uint64_t n)
929 /* use little-endian order, as idx_of MultiHashMap casts key to unsigned int */
930 /* TODO: use built-in byte swap functions if available */
/* Step 1 and 2: swap bytes within 16-bit words, then words within 32 bits. */
932 n = ((n << 8) & 0xFF00FF00FF00FF00ULL) | ((n >> 8) & 0x00FF00FF00FF00FFULL);
933 n = ((n << 16) & 0xFFFF0000FFFF0000ULL) | ((n >> 16) & 0x0000FFFF0000FFFFULL);
/* Clear the whole key, then step 3: swap the 32-bit halves. */
935 *key = (struct GNUNET_HashCode) {};
937 = (n << 32) | (n >> 32);
942 * Convert a uint64_t in host byte order to a HashCode
943 * that can be used as key in a MultiHashMap
/*
 * Build a hash map key from a 64-bit value in host byte order.  On
 * big-endian hosts the byte-swapping variant hash_key_from_nll() is used; on
 * little-endian hosts the key is zeroed and n is stored as-is in its first
 * 8 bytes.  Any other byte order is a compile-time error.
 */
946 hash_key_from_hll (struct GNUNET_HashCode *key, uint64_t n)
948 #if __BYTE_ORDER == __BIG_ENDIAN
949 hash_key_from_nll (key, n);
950 #elif __BYTE_ORDER == __LITTLE_ENDIAN
951 *key = (struct GNUNET_HashCode) {};
952 *((uint64_t *) key) = n;
954 #error byteorder undefined
960 * Initialize PSYC message header.
/*
 * Fill a caller-provided PSYC message from a multicast message: the PSYC
 * header is written and everything following the multicast header is copied
 * behind it.  pmsg must have room for the PSYC header plus that payload.
 * message_id and fragment_offset are copied verbatim (no byte-order
 * conversion); flags is converted to network byte order.
 * NOTE(review): no check is visible that the multicast message is at least
 * sizeof (*mmsg) bytes long, and psize is a 16-bit value -- confirm callers
 * validate the size.
 */
963 psyc_msg_init (struct GNUNET_PSYC_MessageHeader *pmsg,
964 const struct GNUNET_MULTICAST_MessageHeader *mmsg, uint32_t flags)
966 uint16_t size = ntohs (mmsg->header.size);
967 uint16_t psize = sizeof (*pmsg) + size - sizeof (*mmsg);
969 pmsg->header.size = htons (psize);
970 pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
971 pmsg->message_id = mmsg->message_id;
972 pmsg->fragment_offset = mmsg->fragment_offset;
973 pmsg->flags = htonl (flags);
/* Payload: all bytes after the multicast header. */
975 memcpy (&pmsg[1], &mmsg[1], size - sizeof (*mmsg));
980 * Create a new PSYC message from a multicast message for sending it to clients.
/*
 * Allocate a PSYC message large enough for the payload of mmsg and fill it
 * with psyc_msg_init().
 * NOTE(review): the return statement is not visible here; presumably the
 * newly allocated message is returned and owned by the caller -- confirm.
 */
982 static inline struct GNUNET_PSYC_MessageHeader *
983 psyc_msg_new (const struct GNUNET_MULTICAST_MessageHeader *mmsg, uint32_t flags)
985 struct GNUNET_PSYC_MessageHeader *pmsg;
986 uint16_t size = ntohs (mmsg->header.size);
/* Same size computation as in psyc_msg_init(). */
987 uint16_t psize = sizeof (*pmsg) + size - sizeof (*mmsg);
989 pmsg = GNUNET_malloc (psize);
990 psyc_msg_init (pmsg, mmsg, flags);
996 * Send multicast message to all clients connected to the channel.
/*
 * Convert a multicast message fragment into a PSYC message and send it to
 * the clients of the channel.  The flags value passed to psyc_msg_new() is a
 * parameter whose declaration line is not visible in this listing.
 * NOTE(review): release of the temporary PSYC message is not visible here.
 */
999 client_send_mcast_msg (struct Channel *chn,
1000 const struct GNUNET_MULTICAST_MessageHeader *mmsg,
1003 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1004 "%p Sending multicast message to client. "
1005 "fragment_id: %" PRIu64 ", message_id: %" PRIu64 "\n",
1006 chn, GNUNET_ntohll (mmsg->fragment_id),
1007 GNUNET_ntohll (mmsg->message_id));
1009 struct GNUNET_PSYC_MessageHeader *pmsg = psyc_msg_new (mmsg, flags);
1010 client_send_msg (chn, &pmsg->header);
1016 * Send multicast request to all clients connected to the channel.
/*
 * Convert a multicast request into a PSYC message flagged as
 * GNUNET_PSYC_MESSAGE_REQUEST and send it to the clients of the master's
 * channel.  The request ID takes the place of the message ID; it and the
 * fragment offset are copied verbatim (no byte-order conversion).
 * NOTE(review): as in psyc_msg_init(), no check is visible that the request
 * is at least sizeof (*req) bytes long; release of pmsg is not visible.
 */
1019 client_send_mcast_req (struct Master *mst,
1020 const struct GNUNET_MULTICAST_RequestHeader *req)
1022 struct Channel *chn = &mst->chn;
1024 struct GNUNET_PSYC_MessageHeader *pmsg;
1025 uint16_t size = ntohs (req->header.size);
1026 uint16_t psize = sizeof (*pmsg) + size - sizeof (*req);
1028 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1029 "%p Sending multicast request to client. "
1030 "fragment_id: %" PRIu64 ", message_id: %" PRIu64 "\n",
1031 chn, GNUNET_ntohll (req->fragment_id),
1032 GNUNET_ntohll (req->request_id));
/* PSYC header followed by all bytes after the multicast request header. */
1034 pmsg = GNUNET_malloc (psize);
1035 pmsg->header.size = htons (psize);
1036 pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
1037 pmsg->message_id = req->request_id;
1038 pmsg->fragment_offset = req->fragment_offset;
1039 pmsg->flags = htonl (GNUNET_PSYC_MESSAGE_REQUEST);
1041 memcpy (&pmsg[1], &req[1], size - sizeof (*req));
1042 client_send_msg (chn, &pmsg->header);
1048 * Insert a multicast message fragment into the queue belonging to the message.
1050 * @param chn Channel.
1051 * @param mmsg Multicast message fragment.
1052 * (The hash key for the message ID is derived from @a mmsg inside the function; it is not a parameter.)
1053 * @param first_ptype First PSYC message part type in @a mmsg.
1054 * @param last_ptype Last PSYC message part type in @a mmsg.
/*
 * Record one received multicast fragment:
 *  - find or create the FragmentQueue of the fragment's message,
 *  - find or create the channel's fragment cache and store a copy of the
 *    fragment there (or bump the reference count of an existing copy),
 *  - advance the queue's receive state from the first/last part types,
 *  - put the message on the channel's delivery queue once it is deliverable,
 *  - account the fragment's size and remember its ID in the queue's heap.
 * NOTE(review): several guarding conditions (for example the check that no
 * queue exists yet, and the switch on the last part type) are not visible in
 * this listing; the comments below describe the visible statements only.
 */
1057 fragment_queue_insert (struct Channel *chn,
1058 const struct GNUNET_MULTICAST_MessageHeader *mmsg,
1059 uint16_t first_ptype, uint16_t last_ptype)
1061 const uint16_t size = ntohs (mmsg->header.size);
1062 const uint64_t frag_offset = GNUNET_ntohll (mmsg->fragment_offset);
1063 struct GNUNET_CONTAINER_MultiHashMap
1064 *chan_msgs = GNUNET_CONTAINER_multihashmap_get (recv_cache,
1065 &chn->pub_key_hash);
/* Map key for the message ID; the ID is still in network byte order here. */
1067 struct GNUNET_HashCode msg_id_hash;
1068 hash_key_from_nll (&msg_id_hash, mmsg->message_id);
1070 struct FragmentQueue
1071 *fragq = GNUNET_CONTAINER_multihashmap_get (chn->recv_frags, &msg_id_hash);
/* New queue: starts in the HEADER state with an empty min-heap of fragments. */
1075 fragq = GNUNET_new (struct FragmentQueue);
1076 fragq->state = MSG_FRAG_STATE_HEADER;
1078 = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
1080 GNUNET_CONTAINER_multihashmap_put (chn->recv_frags, &msg_id_hash, fragq,
1081 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
/* First cached fragment of this channel: create its per-channel cache map. */
1083 if (NULL == chan_msgs)
1085 chan_msgs = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
1086 GNUNET_CONTAINER_multihashmap_put (recv_cache, &chn->pub_key_hash, chan_msgs,
1087 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
/*
 * The cache is keyed by fragment ID.  A fragment seen for the first time is
 * copied into a new entry with ref_count 1; otherwise the existing entry's
 * ref_count is incremented.
 */
1091 struct GNUNET_HashCode frag_id_hash;
1092 hash_key_from_nll (&frag_id_hash, mmsg->fragment_id);
1093 struct RecvCacheEntry
1094 *cache_entry = GNUNET_CONTAINER_multihashmap_get (chan_msgs, &frag_id_hash);
1095 if (NULL == cache_entry)
1097 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1098 "%p Adding message fragment to cache. "
1099 "message_id: %" PRIu64 ", fragment_id: %" PRIu64 "\n",
1100 chn, GNUNET_ntohll (mmsg->message_id),
1101 GNUNET_ntohll (mmsg->fragment_id));
1102 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1103 "%p header_size: %" PRIu64 " + %u\n",
1104 chn, fragq->header_size, size);
1105 cache_entry = GNUNET_new (struct RecvCacheEntry);
1106 cache_entry->ref_count = 1;
1107 cache_entry->mmsg = GNUNET_malloc (size);
1108 memcpy (cache_entry->mmsg, mmsg, size);
1109 GNUNET_CONTAINER_multihashmap_put (chan_msgs, &frag_id_hash, cache_entry,
1110 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1114 cache_entry->ref_count++;
1115 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1116 "%p Message fragment is already in cache. "
1117 "message_id: %" PRIu64 ", fragment_id: %" PRIu64
1118 ", ref_count: %u\n",
1119 chn, GNUNET_ntohll (mmsg->message_id),
1120 GNUNET_ntohll (mmsg->fragment_id), cache_entry->ref_count);
/*
 * Header tracking.  A fragment starting with a METHOD part supplies the
 * message's state_delta and flags.  Fragments whose last part type is below
 * MESSAGE_DATA count towards header_size; the header is treated as complete
 * when the fragment starts with METHOD or its offset equals the header bytes
 * seen so far.
 */
1123 if (MSG_FRAG_STATE_HEADER == fragq->state)
1125 if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype)
1127 struct GNUNET_PSYC_MessageMethod *
1128 pmeth = (struct GNUNET_PSYC_MessageMethod *) &mmsg[1];
1129 fragq->state_delta = GNUNET_ntohll (pmeth->state_delta);
1130 fragq->flags = ntohl (pmeth->flags);
1133 if (last_ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA)
1135 fragq->header_size += size;
1137 else if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype
1138 || frag_offset == fragq->header_size)
1139 { /* header is now complete */
1140 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1141 "%p Header of message %" PRIu64 " is complete.\n",
1142 chn, GNUNET_ntohll (mmsg->message_id));
1144 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1145 "%p Adding message %" PRIu64 " to queue.\n",
1146 chn, GNUNET_ntohll (mmsg->message_id));
1147 fragq->state = MSG_FRAG_STATE_DATA;
1151 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1152 "%p Header of message %" PRIu64 " is NOT complete yet: "
1153 "%" PRIu64 " != %" PRIu64 "\n",
1154 chn, GNUNET_ntohll (mmsg->message_id), frag_offset,
1155 fragq->header_size);
/*
 * End-of-message handling.  An END part finishes the message only when this
 * fragment's offset equals the bytes received so far (fragq->size is compared
 * before this fragment's size is added further below).  A CANCEL part either
 * drops the message entirely (single-fragment message) or marks it cancelled.
 */
1161 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END:
1162 if (frag_offset == fragq->size)
1163 fragq->state = MSG_FRAG_STATE_END;
1165 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1166 "%p Message %" PRIu64 " is NOT complete yet: "
1167 "%" PRIu64 " != %" PRIu64 "\n",
1168 chn, GNUNET_ntohll (mmsg->message_id), frag_offset,
1172 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL:
1173 /* Drop message without delivering to client if it's a single fragment */
1175 (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype)
1176 ? MSG_FRAG_STATE_DROP
1177 : MSG_FRAG_STATE_CANCEL;
/*
 * A message in the DATA, END or CANCEL state is put on the channel's message
 * heap (ordered by message ID) exactly once; is_queued guards re-insertion.
 */
1180 switch (fragq->state)
1182 case MSG_FRAG_STATE_DATA:
1183 case MSG_FRAG_STATE_END:
1184 case MSG_FRAG_STATE_CANCEL:
1185 if (GNUNET_NO == fragq->is_queued)
1187 GNUNET_CONTAINER_heap_insert (chn->recv_msgs, NULL,
1188 GNUNET_ntohll (mmsg->message_id));
1189 fragq->is_queued = GNUNET_YES;
/*
 * Account the fragment and remember its ID (host byte order) as heap cost;
 * the heap element itself is NULL, the ID is the only information stored.
 */
1193 fragq->size += size;
1194 GNUNET_CONTAINER_heap_insert (fragq->fragments, NULL,
1195 GNUNET_ntohll (mmsg->fragment_id));
1200 * Run fragment queue of a message.
1202 * Send fragments of a message in order to client, after all modifiers arrived
1205 * @param chn Channel.
1206 * @param msg_id ID of the message @a fragq belongs to.
1207 * @param fragq Fragment queue of the message.
1208 * @param drop Drop message without delivering to client?
1209 * #GNUNET_YES or #GNUNET_NO.
/*
 * Process the fragments queued for one message in ascending fragment ID
 * order.  Each cached fragment is sent to the clients unless drop is set,
 * and its cache reference is released.  When the message has reached the END
 * state (or a later one) the queue itself is removed and freed; otherwise it
 * is only marked as no longer queued.
 */
1212 fragment_queue_run (struct Channel *chn, uint64_t msg_id,
1213 struct FragmentQueue *fragq, uint8_t drop)
1215 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1216 "%p Running message fragment queue for message %" PRIu64
1218 chn, msg_id, fragq->state);
/* The channel's fragment cache must exist if fragments were queued. */
1220 struct GNUNET_CONTAINER_MultiHashMap
1221 *chan_msgs = GNUNET_CONTAINER_multihashmap_get (recv_cache,
1222 &chn->pub_key_hash);
1223 GNUNET_assert (NULL != chan_msgs);
/*
 * Fragment IDs come from the heap in host byte order, hence
 * hash_key_from_hll(); insertion built the same keys from network byte order
 * IDs with hash_key_from_nll().
 */
1226 while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (fragq->fragments, NULL,
1229 struct GNUNET_HashCode frag_id_hash;
1230 hash_key_from_hll (&frag_id_hash, frag_id);
1231 struct RecvCacheEntry *cache_entry
1232 = GNUNET_CONTAINER_multihashmap_get (chan_msgs, &frag_id_hash);
1233 if (cache_entry != NULL)
1235 if (GNUNET_NO == drop)
1237 client_send_mcast_msg (chn, cache_entry->mmsg, 0);
/* Last reference: remove the entry and free the fragment copy. */
1239 if (cache_entry->ref_count <= 1)
1241 GNUNET_CONTAINER_multihashmap_remove (chan_msgs, &frag_id_hash,
1243 GNUNET_free (cache_entry->mmsg);
1244 GNUNET_free (cache_entry);
1248 cache_entry->ref_count--;
1251 #if CACHE_AGING_IMPLEMENTED
1252 else if (GNUNET_NO == drop)
1254 /* TODO: fragment not in cache anymore, retrieve it from PSYCstore */
/* The processed fragment ID is taken off the heap. */
1258 GNUNET_CONTAINER_heap_remove_root (fragq->fragments);
/* Finished (END or a later state): the queue itself is released. */
1261 if (MSG_FRAG_STATE_END <= fragq->state)
1263 struct GNUNET_HashCode msg_id_hash;
1264 hash_key_from_hll (&msg_id_hash, msg_id);
1266 GNUNET_CONTAINER_multihashmap_remove (chn->recv_frags, &msg_id_hash, fragq);
1267 GNUNET_CONTAINER_heap_destroy (fragq->fragments);
1268 GNUNET_free (fragq);
/* Not finished yet: allow the message to be queued again later. */
1272 fragq->is_queued = GNUNET_NO;
1278 * Run message queue.
1280 * Send messages in queue to client in order after a message has arrived from
1281 * multicast, according to the following:
1282 * - A message is only sent if all of its modifiers arrived.
1283 * - A stateful message is only sent if the previous stateful message
1284 * has already been delivered to the client.
1286 * @param chn Channel.
1288 * @return Number of messages removed from queue and sent to client.
/*
 * Process the channel's queue of received messages in ascending message ID
 * order.  For each message its fragment queue is looked up, ordering
 * constraints are checked, the channel's max_message_id /
 * max_state_message_id are advanced, and the fragment queue is run.
 * NOTE(review): loop exits, the counter n and the return statement are not
 * visible in this listing; routine progress is logged at WARNING level.
 */
1291 message_queue_run (struct Channel *chn)
1293 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1294 "%p Running message queue.\n", chn);
/* The heap yields the smallest queued message ID first. */
1297 while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (chn->recv_msgs, NULL,
1300 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1301 "%p Processing message %" PRIu64 " in queue.\n", chn, msg_id);
1302 struct GNUNET_HashCode msg_id_hash;
1303 hash_key_from_hll (&msg_id_hash, msg_id);
1305 struct FragmentQueue *
1306 fragq = GNUNET_CONTAINER_multihashmap_get (chn->recv_frags, &msg_id_hash);
/* Nothing can be delivered without a queue or before the header is complete. */
1308 if (NULL == fragq || fragq->state <= MSG_FRAG_STATE_HEADER)
1310 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1311 "%p No fragq (%p) or header not complete.\n",
/*
 * NOTE(review): the previous check already covers state == HEADER; if that
 * branch leaves the loop, this condition can never hold -- confirm which
 * state was intended here.
 */
1316 if (MSG_FRAG_STATE_HEADER == fragq->state)
1318 /* Check if there's a missing message before the current one */
/*
 * Stateless message: unless it allows any order, it must directly follow
 * the last message delivered to the client.
 */
1319 if (GNUNET_PSYC_STATE_NOT_MODIFIED == fragq->state_delta)
1321 if (!(fragq->flags & GNUNET_PSYC_MESSAGE_ORDER_ANY)
1322 && msg_id - 1 != chn->max_message_id)
1324 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1325 "%p Out of order message. "
1326 "(%" PRIu64 " - 1 != %" PRIu64 ")\n",
1327 chn, msg_id, chn->max_message_id);
/*
 * Stateful message: state_delta must point back exactly to the last
 * stateful message that was delivered.
 */
1333 if (msg_id - fragq->state_delta != chn->max_state_message_id)
1335 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1336 "%p Out of order stateful message. "
1337 "(%" PRIu64 " - %" PRIu64 " != %" PRIu64 ")\n",
1338 chn, msg_id, fragq->state_delta, chn->max_state_message_id);
/*
 * NOTE(review): the call below uses names (message_id, cls,
 * store_recv_state_modify_result) that are not declared in the visible code;
 * presumably it is compiled out by a preprocessor guard -- confirm.
 */
1342 /* FIXME: apply modifiers to state in PSYCstore */
1343 GNUNET_PSYCSTORE_state_modify (store, &chn->pub_key, message_id,
1344 store_recv_state_modify_result, cls);
1346 chn->max_state_message_id = msg_id;
1348 chn->max_message_id = msg_id;
/* Deliver the fragments (or discard them for a dropped message) and dequeue. */
1350 fragment_queue_run (chn, msg_id, fragq, MSG_FRAG_STATE_DROP == fragq->state);
1351 GNUNET_CONTAINER_heap_remove_root (chn->recv_msgs);
1354 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1355 "%p Removed %" PRIu64 " messages from queue.\n", chn, n);
1361 * Drop message queue of a channel.
1363 * Remove all messages in queue without sending them to clients.
1365 * @param chn Channel.
1367 * @return Number of messages removed from queue.
/*
 * Empty the channel's queue of received messages without delivering
 * anything: every queued message's fragment queue is run with drop set to
 * GNUNET_YES and the message is removed from the heap.
 * NOTE(review): the counter n and the return statement are not visible in
 * this listing.
 */
1370 message_queue_drop (struct Channel *chn)
1372 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1373 "%p Dropping message queue.\n", chn);
1376 while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (chn->recv_msgs, NULL,
1379 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1380 "%p Dropping message %" PRIu64 " from queue.\n", chn, msg_id);
1381 struct GNUNET_HashCode msg_id_hash;
1382 hash_key_from_hll (&msg_id_hash, msg_id);
/* Every queued message ID is expected to have a fragment queue. */
1384 struct FragmentQueue *
1385 fragq = GNUNET_CONTAINER_multihashmap_get (chn->recv_frags, &msg_id_hash);
1386 GNUNET_assert (NULL != fragq);
1387 fragment_queue_run (chn, msg_id, fragq, GNUNET_YES);
1388 GNUNET_CONTAINER_heap_remove_root (chn->recv_msgs);
1391 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1392 "%p Removed %" PRIu64 " messages from queue.\n", chn, n);
1398 * Received result of GNUNET_PSYCSTORE_fragment_store().
/*
 * PSYCstore callback for GNUNET_PSYCSTORE_fragment_store().  cls is the
 * Channel.  The statements visible here only log the result; a failure to
 * store a fragment is not acted upon.
 */
1401 store_recv_fragment_store_result (void *cls, int64_t result,
1402 const char *err_msg, uint16_t err_msg_size)
1404 struct Channel *chn = cls;
1405 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1406 "%p GNUNET_PSYCSTORE_fragment_store() returned %" PRId64 " (%.*s)\n",
1407 chn, result, err_msg_size, err_msg);
1412 * Handle incoming message fragment from multicast.
1414 * Store it using PSYCstore and send it to the clients of the channel in order.
1417 mcast_recv_message (void *cls, const struct GNUNET_MULTICAST_MessageHeader *mmsg)
1419 struct Channel *chn = cls;
1420 uint16_t size = ntohs (mmsg->header.size);
1422 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1423 "%p Received multicast message of size %u.\n",
1426 GNUNET_PSYCSTORE_fragment_store (store, &chn->pub_key, mmsg, 0,
1427 &store_recv_fragment_store_result, chn);
1429 uint16_t first_ptype = 0, last_ptype = 0;
1431 == GNUNET_PSYC_receive_check_parts (size - sizeof (*mmsg),
1432 (const char *) &mmsg[1],
1433 &first_ptype, &last_ptype))
1435 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1436 "%p Dropping incoming multicast message with invalid parts.\n",
1438 GNUNET_break_op (0);
1442 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1443 "Message parts: first: type %u, last: type %u\n",
1444 first_ptype, last_ptype);
1446 fragment_queue_insert (chn, mmsg, first_ptype, last_ptype);
1447 message_queue_run (chn);
1452 * Incoming request fragment from multicast for a master.
1454 * @param cls Master.
1455 * @param req The request.
1458 mcast_recv_request (void *cls,
1459 const struct GNUNET_MULTICAST_RequestHeader *req)
1461 struct Master *mst = cls;
1462 uint16_t size = ntohs (req->header.size);
1464 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1465 "%p Received multicast request of size %u.\n",
1468 uint16_t first_ptype = 0, last_ptype = 0;
1470 == GNUNET_PSYC_receive_check_parts (size - sizeof (*req),
1471 (const char *) &req[1],
1472 &first_ptype, &last_ptype))
1474 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1475 "%p Dropping incoming multicast request with invalid parts.\n",
1477 GNUNET_break_op (0);
1481 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1482 "Message parts: first: type %u, last: type %u\n",
1483 first_ptype, last_ptype);
1485 /* FIXME: in-order delivery */
1486 client_send_mcast_req (mst, req);
1491 * Response from PSYCstore with the current counter values for a channel master.
1494 store_recv_master_counters (void *cls, int result, uint64_t max_fragment_id,
1495 uint64_t max_message_id, uint64_t max_group_generation,
1496 uint64_t max_state_message_id)
1498 struct Master *mst = cls;
1499 struct Channel *chn = &mst->chn;
1500 chn->store_op = NULL;
1502 struct GNUNET_PSYC_CountersResultMessage res;
1503 res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK);
1504 res.header.size = htons (sizeof (res));
1505 res.result_code = GNUNET_htonl_signed (result);
1506 res.max_message_id = GNUNET_htonll (max_message_id);
1508 if (GNUNET_OK == result || GNUNET_NO == result)
1510 mst->max_message_id = max_message_id;
1511 chn->max_message_id = max_message_id;
1512 chn->max_state_message_id = max_state_message_id;
1513 mst->max_group_generation = max_group_generation;
1515 = GNUNET_MULTICAST_origin_start (cfg, &mst->priv_key, max_fragment_id,
1516 &mcast_recv_join_request,
1517 &mcast_recv_membership_test,
1518 &mcast_recv_replay_fragment,
1519 &mcast_recv_replay_message,
1520 &mcast_recv_request,
1521 &mcast_recv_message, chn);
1522 chn->is_ready = GNUNET_YES;
1526 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1527 "%p GNUNET_PSYCSTORE_counters_get() "
1528 "returned %d for channel %s.\n",
1529 chn, result, GNUNET_h2s (&chn->pub_key_hash));
1532 client_send_msg (chn, &res.header);
1537 * Response from PSYCstore with the current counter values for a channel slave.
1540 store_recv_slave_counters (void *cls, int result, uint64_t max_fragment_id,
1541 uint64_t max_message_id, uint64_t max_group_generation,
1542 uint64_t max_state_message_id)
1544 struct Slave *slv = cls;
1545 struct Channel *chn = &slv->chn;
1546 chn->store_op = NULL;
1548 struct GNUNET_PSYC_CountersResultMessage res;
1549 res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK);
1550 res.header.size = htons (sizeof (res));
1551 res.result_code = GNUNET_htonl_signed (result);
1552 res.max_message_id = GNUNET_htonll (max_message_id);
1554 if (GNUNET_OK == result || GNUNET_NO == result)
1556 chn->max_message_id = max_message_id;
1557 chn->max_state_message_id = max_state_message_id;
1559 = GNUNET_MULTICAST_member_join (cfg, &chn->pub_key, &slv->priv_key,
1561 slv->relay_count, slv->relays,
1562 &slv->join_msg->header,
1563 &mcast_recv_join_request,
1564 &mcast_recv_join_decision,
1565 &mcast_recv_membership_test,
1566 &mcast_recv_replay_fragment,
1567 &mcast_recv_replay_message,
1568 &mcast_recv_message, chn);
1569 if (NULL != slv->join_msg)
1571 GNUNET_free (slv->join_msg);
1572 slv->join_msg = NULL;
1577 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1578 "%p GNUNET_PSYCSTORE_counters_get() "
1579 "returned %d for channel %s.\n",
1580 chn, result, GNUNET_h2s (&chn->pub_key_hash));
1583 client_send_msg (chn, &res.header);
1588 channel_init (struct Channel *chn)
1591 = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
1592 chn->recv_frags = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
1597 * Handle a connecting client starting a channel master.
1600 client_recv_master_start (void *cls, struct GNUNET_SERVER_Client *client,
1601 const struct GNUNET_MessageHeader *msg)
1603 const struct MasterStartRequest *req
1604 = (const struct MasterStartRequest *) msg;
1606 struct GNUNET_CRYPTO_EddsaPublicKey pub_key;
1607 struct GNUNET_HashCode pub_key_hash;
1609 GNUNET_CRYPTO_eddsa_key_get_public (&req->channel_key, &pub_key);
1610 GNUNET_CRYPTO_hash (&pub_key, sizeof (pub_key), &pub_key_hash);
1613 mst = GNUNET_CONTAINER_multihashmap_get (masters, &pub_key_hash);
1614 struct Channel *chn;
1618 mst = GNUNET_new (struct Master);
1619 mst->policy = ntohl (req->policy);
1620 mst->priv_key = req->channel_key;
1621 mst->join_reqs = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
1624 chn->is_master = GNUNET_YES;
1625 chn->pub_key = pub_key;
1626 chn->pub_key_hash = pub_key_hash;
1629 GNUNET_CONTAINER_multihashmap_put (masters, &chn->pub_key_hash, chn,
1630 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1631 chn->store_op = GNUNET_PSYCSTORE_counters_get (store, &chn->pub_key,
1632 store_recv_master_counters, mst);
1638 struct GNUNET_PSYC_CountersResultMessage res;
1639 res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK);
1640 res.header.size = htons (sizeof (res));
1641 res.result_code = GNUNET_htonl_signed (GNUNET_OK);
1642 res.max_message_id = GNUNET_htonll (mst->max_message_id);
1644 GNUNET_SERVER_notification_context_add (nc, client);
1645 GNUNET_SERVER_notification_context_unicast (nc, client, &res.header,
1649 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1650 "%p Client connected as master to channel %s.\n",
1651 mst, GNUNET_h2s (&chn->pub_key_hash));
1653 struct Client *cli = GNUNET_new (struct Client);
1654 cli->client = client;
1655 GNUNET_CONTAINER_DLL_insert (chn->clients_head, chn->clients_tail, cli);
1657 GNUNET_SERVER_client_set_user_context (client, chn);
1658 GNUNET_SERVER_receive_done (client, GNUNET_OK);
1663 * Handle a connecting client joining as a channel slave.
1666 client_recv_slave_join (void *cls, struct GNUNET_SERVER_Client *client,
1667 const struct GNUNET_MessageHeader *msg)
1669 const struct SlaveJoinRequest *req
1670 = (const struct SlaveJoinRequest *) msg;
1671 uint16_t req_size = ntohs (req->header.size);
1673 struct GNUNET_CRYPTO_EcdsaPublicKey slv_pub_key;
1674 struct GNUNET_HashCode pub_key_hash, slv_pub_key_hash;
1676 GNUNET_CRYPTO_ecdsa_key_get_public (&req->slave_key, &slv_pub_key);
1677 GNUNET_CRYPTO_hash (&slv_pub_key, sizeof (slv_pub_key), &slv_pub_key_hash);
1678 GNUNET_CRYPTO_hash (&req->channel_key, sizeof (req->channel_key), &pub_key_hash);
1680 struct GNUNET_CONTAINER_MultiHashMap *
1681 chn_slv = GNUNET_CONTAINER_multihashmap_get (channel_slaves, &pub_key_hash);
1682 struct Slave *slv = NULL;
1683 struct Channel *chn;
1685 if (NULL != chn_slv)
1687 slv = GNUNET_CONTAINER_multihashmap_get (chn_slv, &slv_pub_key_hash);
1691 slv = GNUNET_new (struct Slave);
1692 slv->priv_key = req->slave_key;
1693 slv->pub_key = slv_pub_key;
1694 slv->pub_key_hash = slv_pub_key_hash;
1695 slv->origin = req->origin;
1696 slv->relay_count = ntohl (req->relay_count);
1698 const struct GNUNET_PeerIdentity *
1699 relays = (const struct GNUNET_PeerIdentity *) &req[1];
1700 uint16_t relay_size = slv->relay_count * sizeof (*relays);
1701 uint16_t join_msg_size = 0;
1703 if (sizeof (*req) + relay_size + sizeof (struct GNUNET_MessageHeader)
1706 join_msg_size = ntohs (slv->join_msg->header.size);
1707 slv->join_msg = GNUNET_malloc (join_msg_size);
1708 memcpy (slv->join_msg, ((char *) &req[1]) + relay_size, join_msg_size);
1710 if (sizeof (*req) + relay_size + join_msg_size != req_size)
1712 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1713 "%u + %u + %u != %u\n",
1714 sizeof (*req), relay_size, join_msg_size, req_size);
1716 GNUNET_SERVER_client_disconnect (client);
1720 if (0 < slv->relay_count)
1722 slv->relays = GNUNET_malloc (relay_size);
1723 memcpy (slv->relays, &req[1], relay_size);
1727 chn->is_master = GNUNET_NO;
1728 chn->pub_key = req->channel_key;
1729 chn->pub_key_hash = pub_key_hash;
1732 if (NULL == chn_slv)
1734 chn_slv = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
1735 GNUNET_CONTAINER_multihashmap_put (channel_slaves, &chn->pub_key_hash, chn_slv,
1736 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1738 GNUNET_CONTAINER_multihashmap_put (chn_slv, &slv->pub_key_hash, chn,
1739 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1740 GNUNET_CONTAINER_multihashmap_put (slaves, &chn->pub_key_hash, chn,
1741 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1742 chn->store_op = GNUNET_PSYCSTORE_counters_get (store, &chn->pub_key,
1743 &store_recv_slave_counters, slv);
1749 struct GNUNET_PSYC_CountersResultMessage res;
1750 res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK);
1751 res.header.size = htons (sizeof (res));
1752 res.result_code = GNUNET_htonl_signed (GNUNET_OK);
1753 res.max_message_id = GNUNET_htonll (chn->max_message_id);
1755 GNUNET_SERVER_notification_context_add (nc, client);
1756 GNUNET_SERVER_notification_context_unicast (nc, client, &res.header,
1759 if (NULL == slv->member)
1762 = GNUNET_MULTICAST_member_join (cfg, &chn->pub_key, &slv->priv_key,
1764 slv->relay_count, slv->relays,
1765 &slv->join_msg->header,
1766 &mcast_recv_join_request,
1767 &mcast_recv_join_decision,
1768 &mcast_recv_membership_test,
1769 &mcast_recv_replay_fragment,
1770 &mcast_recv_replay_message,
1771 &mcast_recv_message, chn);
1772 if (NULL != slv->join_msg)
1774 GNUNET_free (slv->join_msg);
1775 slv->join_msg = NULL;
1778 else if (NULL != slv->join_dcsn)
1780 GNUNET_SERVER_notification_context_add (nc, client);
1781 GNUNET_SERVER_notification_context_unicast (nc, client,
1782 &slv->join_dcsn->header,
1787 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1788 "%p Client connected as slave to channel %s.\n",
1789 slv, GNUNET_h2s (&chn->pub_key_hash));
1791 struct Client *cli = GNUNET_new (struct Client);
1792 cli->client = client;
1793 GNUNET_CONTAINER_DLL_insert (chn->clients_head, chn->clients_tail, cli);
1795 GNUNET_SERVER_client_set_user_context (client, chn);
1796 GNUNET_SERVER_receive_done (client, GNUNET_OK);
/**
 * Closure for mcast_send_join_decision().
 */
struct JoinDecisionClosure
{
  /** Decision value taken from the client's join decision message. */
  int32_t is_admitted;

  /** Message accompanying the decision, or NULL if there is none. */
  struct GNUNET_MessageHeader *msg;
};
1808 * Iterator callback for sending join decisions to multicast.
1811 mcast_send_join_decision (void *cls, const struct GNUNET_HashCode *pub_key_hash,
1814 struct JoinDecisionClosure *jcls = cls;
1815 struct GNUNET_MULTICAST_JoinHandle *jh = value;
1816 // FIXME: add relays
1817 GNUNET_MULTICAST_join_decision (jh, jcls->is_admitted, 0, NULL, jcls->msg);
1823 * Join decision from client.
1826 client_recv_join_decision (void *cls, struct GNUNET_SERVER_Client *client,
1827 const struct GNUNET_MessageHeader *msg)
1829 const struct GNUNET_PSYC_JoinDecisionMessage *dcsn
1830 = (const struct GNUNET_PSYC_JoinDecisionMessage *) msg;
1831 struct Channel *chn;
1833 struct JoinDecisionClosure jcls;
1835 chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
1839 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1842 GNUNET_assert (GNUNET_YES == chn->is_master);
1843 mst = (struct Master *) chn;
1844 jcls.is_admitted = ntohl (dcsn->is_admitted);
1846 = (sizeof (*dcsn) + sizeof (*jcls.msg) <= ntohs (msg->size))
1847 ? (struct GNUNET_MessageHeader *) &dcsn[1]
1850 struct GNUNET_HashCode slave_key_hash;
1851 GNUNET_CRYPTO_hash (&dcsn->slave_key, sizeof (dcsn->slave_key),
1854 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1855 "%p Got join decision (%d) from client for channel %s..\n",
1856 mst, jcls.is_admitted, GNUNET_h2s (&chn->pub_key_hash));
1857 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1858 "%p ..and slave %s.\n",
1859 mst, GNUNET_h2s (&slave_key_hash));
1861 GNUNET_CONTAINER_multihashmap_get_multiple (mst->join_reqs, &slave_key_hash,
1862 &mcast_send_join_decision, &jcls);
1863 GNUNET_CONTAINER_multihashmap_remove_all (mst->join_reqs, &slave_key_hash);
1864 GNUNET_SERVER_receive_done (client, GNUNET_OK);
1869 * Send acknowledgement to a client.
1871 * Sent after a message fragment has been passed on to multicast.
1873 * @param chn The channel struct for the client.
1876 send_message_ack (struct Channel *chn, struct GNUNET_SERVER_Client *client)
1878 struct GNUNET_MessageHeader res;
1879 res.size = htons (sizeof (res));
1880 res.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK);
1883 GNUNET_SERVER_notification_context_add (nc, client);
1884 GNUNET_SERVER_notification_context_unicast (nc, client, &res, GNUNET_NO);
1889 * Callback for the transmit functions of multicast.
1892 transmit_notify (void *cls, size_t *data_size, void *data)
1894 struct Channel *chn = cls;
1895 struct TransmitMessage *tmit_msg = chn->tmit_head;
1897 if (NULL == tmit_msg || *data_size < tmit_msg->size)
1899 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1900 "%p transmit_notify: nothing to send.\n", chn);
1905 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1906 "%p transmit_notify: sending %u bytes.\n", chn, tmit_msg->size);
1908 *data_size = tmit_msg->size;
1909 memcpy (data, &tmit_msg[1], *data_size);
1911 int ret = (MSG_STATE_END < chn->tmit_state) ? GNUNET_NO : GNUNET_YES;
1913 if (NULL != tmit_msg->client && GNUNET_NO == tmit_msg->ack_sent)
1914 send_message_ack (chn, tmit_msg->client);
1916 GNUNET_CONTAINER_DLL_remove (chn->tmit_head, chn->tmit_tail, tmit_msg);
1917 GNUNET_free (tmit_msg);
1919 if (NULL != chn->tmit_head)
1921 transmit_message (chn);
1923 else if (GNUNET_YES == chn->is_disconnected)
1925 /* FIXME: handle partial message (when still in_transmit) */
1926 cleanup_channel (chn);
1933 * Callback for the transmit functions of multicast.
1936 master_transmit_notify (void *cls, size_t *data_size, void *data)
1938 int ret = transmit_notify (cls, data_size, data);
1940 if (GNUNET_YES == ret)
1942 struct Master *mst = cls;
1943 mst->tmit_handle = NULL;
1950 * Callback for the transmit functions of multicast.
1953 slave_transmit_notify (void *cls, size_t *data_size, void *data)
1955 int ret = transmit_notify (cls, data_size, data);
1957 if (GNUNET_YES == ret)
1959 struct Slave *slv = cls;
1960 slv->tmit_handle = NULL;
1967 * Transmit a message from a channel master to the multicast group.
1970 master_transmit_message (struct Master *mst)
1972 if (NULL == mst->tmit_handle)
1975 = GNUNET_MULTICAST_origin_to_all (mst->origin, mst->max_message_id,
1976 mst->max_group_generation,
1977 master_transmit_notify, mst);
1981 GNUNET_MULTICAST_origin_to_all_resume (mst->tmit_handle);
1987 * Transmit a message from a channel slave to the multicast group.
1990 slave_transmit_message (struct Slave *slv)
1992 if (NULL == slv->tmit_handle)
1995 = GNUNET_MULTICAST_member_to_origin (slv->member, slv->max_request_id,
1996 slave_transmit_notify, slv);
2000 GNUNET_MULTICAST_member_to_origin_resume (slv->tmit_handle);
2006 transmit_message (struct Channel *chn)
2009 ? master_transmit_message ((struct Master *) chn)
2010 : slave_transmit_message ((struct Slave *) chn);
2015 * Queue a message from a channel master for sending to the multicast group.
2018 master_queue_message (struct Master *mst, struct TransmitMessage *tmit_msg,
2019 uint16_t first_ptype, uint16_t last_ptype)
2021 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p master_queue_message()\n", mst);
2023 if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype)
2025 tmit_msg->id = ++mst->max_message_id;
2026 struct GNUNET_PSYC_MessageMethod *pmeth
2027 = (struct GNUNET_PSYC_MessageMethod *) &tmit_msg[1];
2029 if (pmeth->flags & GNUNET_PSYC_MASTER_TRANSMIT_STATE_RESET)
2031 pmeth->state_delta = GNUNET_htonll (GNUNET_PSYC_STATE_RESET);
2033 else if (pmeth->flags & GNUNET_PSYC_MASTER_TRANSMIT_STATE_MODIFY)
2035 pmeth->state_delta = GNUNET_htonll (tmit_msg->id
2036 - mst->max_state_message_id);
2040 pmeth->state_delta = GNUNET_htonll (GNUNET_PSYC_STATE_NOT_MODIFIED);
2047 * Queue a message from a channel slave for sending to the multicast group.
2050 slave_queue_message (struct Slave *slv, struct TransmitMessage *tmit_msg,
2051 uint16_t first_ptype, uint16_t last_ptype)
2053 if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype)
2055 struct GNUNET_PSYC_MessageMethod *pmeth
2056 = (struct GNUNET_PSYC_MessageMethod *) &tmit_msg[1];
2057 pmeth->state_delta = GNUNET_htonll (GNUNET_PSYC_STATE_NOT_MODIFIED);
2058 tmit_msg->id = ++slv->max_request_id;
2064 * Queue PSYC message parts for sending to multicast.
2066 * @param chn Channel to send to.
2067 * @param client Client the message originates from.
2068 * @param data_size Size of @a data.
2069 * @param data Concatenated message parts.
2070 * @param first_ptype First message part type in @a data.
2071 * @param last_ptype Last message part type in @a data.
2073 static struct TransmitMessage *
2074 queue_message (struct Channel *chn,
2075 struct GNUNET_SERVER_Client *client,
2078 uint16_t first_ptype, uint16_t last_ptype)
2080 struct TransmitMessage *
2081 tmit_msg = GNUNET_malloc (sizeof (*tmit_msg) + data_size);
2082 memcpy (&tmit_msg[1], data, data_size);
2083 tmit_msg->client = client;
2084 tmit_msg->size = data_size;
2085 tmit_msg->state = chn->tmit_state;
2087 /* FIXME: separate queue per message ID */
2089 GNUNET_CONTAINER_DLL_insert_tail (chn->tmit_head, chn->tmit_tail, tmit_msg);
2092 ? master_queue_message ((struct Master *) chn, tmit_msg,
2093 first_ptype, last_ptype)
2094 : slave_queue_message ((struct Slave *) chn, tmit_msg,
2095 first_ptype, last_ptype);
2101 * Cancel transmission of current message.
2103 * @param chn Channel to send to.
2104 * @param client Client the message originates from.
2107 transmit_cancel (struct Channel *chn, struct GNUNET_SERVER_Client *client)
2109 uint16_t type = GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL;
2111 struct GNUNET_MessageHeader msg;
2112 msg.size = htons (sizeof (msg));
2113 msg.type = htons (type);
2115 queue_message (chn, client, sizeof (msg), &msg, type, type);
2116 transmit_message (chn);
2118 /* FIXME: cleanup */
2123 * Incoming message from a master or slave client.
2126 client_recv_psyc_message (void *cls, struct GNUNET_SERVER_Client *client,
2127 const struct GNUNET_MessageHeader *msg)
2130 chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
2131 GNUNET_assert (NULL != chn);
2133 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2134 "%p Received message from client.\n", chn);
2135 GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG, msg);
2137 if (GNUNET_YES != chn->is_ready)
2139 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2140 "%p Channel is not ready yet, disconnecting client.\n", chn);
2142 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
2146 uint16_t size = ntohs (msg->size);
2147 if (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < size - sizeof (*msg))
2149 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "%p Message payload too large.\n", chn);
2151 transmit_cancel (chn, client);
2152 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
2156 uint16_t first_ptype = 0, last_ptype = 0;
2158 == GNUNET_PSYC_receive_check_parts (size - sizeof (*msg),
2159 (const char *) &msg[1],
2160 &first_ptype, &last_ptype))
2162 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2163 "%p Received invalid message part from client.\n", chn);
2165 transmit_cancel (chn, client);
2166 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
2169 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2170 "%p Received message with first part type %u and last part type %u.\n",
2171 chn, first_ptype, last_ptype);
2173 queue_message (chn, client, size - sizeof (*msg), &msg[1],
2174 first_ptype, last_ptype);
2175 transmit_message (chn);
2176 /* FIXME: send a few ACKs even before transmit_notify is called */
2178 GNUNET_SERVER_receive_done (client, GNUNET_OK);
/**
 * Closure for store_recv_membership_store_result().
 */
struct MembershipStoreClosure
{
  /** Client that requested the membership change. */
  struct GNUNET_SERVER_Client *client;

  /** Channel the request was made on. */
  struct Channel *chn;

  /** Operation ID, copied unchanged from the client's request. */
  uint64_t op_id;
};
2191 * Received result of GNUNET_PSYCSTORE_membership_store()
2194 store_recv_membership_store_result (void *cls, int64_t result,
2195 const char *err_msg, uint16_t err_msg_size)
2197 struct MembershipStoreClosure *mcls = cls;
2198 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2199 "%p GNUNET_PSYCSTORE_membership_store() returned %" PRId64 " (%.s)\n",
2200 mcls->chn, result, err_msg_size, err_msg);
2202 client_send_result (mcls->client, mcls->op_id, result, err_msg, err_msg_size);
2207 * Client requests to add/remove a slave in the membership database.
2210 client_recv_membership_store (void *cls, struct GNUNET_SERVER_Client *client,
2211 const struct GNUNET_MessageHeader *msg)
2214 chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
2215 GNUNET_assert (NULL != chn);
2217 const struct ChannelMembershipStoreRequest *
2218 req = (const struct ChannelMembershipStoreRequest *) msg;
2220 struct MembershipStoreClosure *mcls = GNUNET_malloc (sizeof (*mcls));
2221 mcls->client = client;
2223 mcls->op_id = req->op_id;
2225 uint64_t announced_at = GNUNET_ntohll (req->announced_at);
2226 uint64_t effective_since = GNUNET_ntohll (req->effective_since);
2227 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2228 "%p Received membership store request from client.\n", chn);
2229 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2230 "%p did_join: %u, announced_at: %" PRIu64 ", effective_since: %" PRIu64 "\n",
2231 chn, req->did_join, announced_at, effective_since);
2233 GNUNET_PSYCSTORE_membership_store (store, &chn->pub_key, &req->slave_key,
2234 req->did_join, announced_at, effective_since,
2235 0, /* FIXME: group_generation */
2236 &store_recv_membership_store_result, mcls);
2237 GNUNET_SERVER_receive_done (client, GNUNET_OK);
2242 * Received a fragment for GNUNET_PSYCSTORE_fragment_get(),
2243 * in response to a history request from a client.
2246 store_recv_fragment_history (void *cls,
2247 struct GNUNET_MULTICAST_MessageHeader *mmsg,
2248 enum GNUNET_PSYCSTORE_MessageFlags flags)
2250 struct Operation *op = cls;
2251 if (NULL == op->client)
2252 { /* Requesting client already disconnected. */
2255 struct Channel *chn = op->chn;
2257 struct GNUNET_PSYC_MessageHeader *pmsg;
2258 uint16_t msize = ntohs (mmsg->header.size);
2259 uint16_t psize = sizeof (*pmsg) + msize - sizeof (*mmsg);
2261 struct GNUNET_OperationResultMessage *
2262 res = GNUNET_malloc (sizeof (*res) + psize);
2263 res->header.size = htons (sizeof (*res) + psize);
2264 res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_HISTORY_RESULT);
2265 res->op_id = op->op_id;
2266 res->result_code = GNUNET_htonll_signed (GNUNET_OK);
2268 pmsg = (struct GNUNET_PSYC_MessageHeader *) &res[1];
2269 psyc_msg_init (pmsg, mmsg, flags | GNUNET_PSYC_MESSAGE_HISTORIC);
2270 memcpy (&res[1], pmsg, psize);
2272 /** @todo FIXME: send only to requesting client */
2273 client_send_msg (chn, &res->header);
2279 * Received the result of GNUNET_PSYCSTORE_fragment_get(),
2280 * in response to a history request from a client.
2283 store_recv_fragment_history_result (void *cls, int64_t result,
2284 const char *err_msg, uint16_t err_msg_size)
2286 struct Operation *op = cls;
2287 if (NULL == op->client)
2288 { /* Requesting client already disconnected. */
2292 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2293 "%p History replay #%" PRIu64 ": "
2294 "PSYCSTORE returned %" PRId64 " (%.*s)\n",
2295 op->chn, op->op_id, result, err_msg_size, err_msg);
2297 if (op->flags & GNUNET_PSYC_HISTORY_REPLAY_REMOTE)
2299 /** @todo Multicast replay request for messages not found locally. */
2302 client_send_result (op->client, op->op_id, result, err_msg, err_msg_size);
2307 * Client requests channel history.
2310 client_recv_history_replay (void *cls, struct GNUNET_SERVER_Client *client,
2311 const struct GNUNET_MessageHeader *msg)
2314 chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
2315 GNUNET_assert (NULL != chn);
2317 const struct GNUNET_PSYC_HistoryRequestMessage *
2318 req = (const struct GNUNET_PSYC_HistoryRequestMessage *) msg;
2319 uint16_t size = ntohs (msg->size);
2320 const char *method_prefix = (const char *) &req[1];
2322 if (size < sizeof (*req) + 1
2323 || '\0' != method_prefix[size - sizeof (*req) - 1])
2325 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2326 "%p History replay #%" PRIu64 ": "
2327 "invalid method prefix. size: %u < %u?\n",
2328 chn, GNUNET_ntohll (req->op_id), size, sizeof (*req) + 1);
2330 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
2334 struct Operation *op = op_add (chn, client, req->op_id, ntohl (req->flags));
2336 if (0 == req->message_limit)
2337 GNUNET_PSYCSTORE_message_get (store, &chn->pub_key, NULL,
2338 GNUNET_ntohll (req->start_message_id),
2339 GNUNET_ntohll (req->end_message_id),
2341 &store_recv_fragment_history,
2342 &store_recv_fragment_history_result, op);
2344 GNUNET_PSYCSTORE_message_get_latest (store, &chn->pub_key, NULL,
2345 GNUNET_ntohll (req->message_limit),
2347 &store_recv_fragment_history,
2348 &store_recv_fragment_history_result,
2351 GNUNET_SERVER_receive_done (client, GNUNET_OK);
2356 * Received state var from PSYCstore, send it to client.
2359 store_recv_state_var (void *cls, const char *name,
2360 const void *value, size_t value_size)
2362 struct Operation *op = cls;
2363 struct GNUNET_OperationResultMessage *res;
2367 uint16_t name_size = strnlen (name, GNUNET_PSYC_MODIFIER_MAX_PAYLOAD) + 1;
2368 struct GNUNET_PSYC_MessageModifier *mod;
2369 res = GNUNET_malloc (sizeof (*res) + sizeof (*mod) + name_size + value_size);
2370 res->header.size = htons (sizeof (*res) + sizeof (*mod) + name_size + value_size);
2371 res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_STATE_RESULT);
2372 res->op_id = op->op_id;
2374 mod = (struct GNUNET_PSYC_MessageModifier *) &res[1];
2375 mod->header.size = htons (sizeof (*mod) + name_size + value_size);
2376 mod->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER);
2377 mod->name_size = htons (name_size);
2378 mod->value_size = htonl (value_size);
2379 mod->oper = htons (GNUNET_ENV_OP_ASSIGN);
2380 memcpy (&mod[1], name, name_size);
2381 memcpy (((char *) &mod[1]) + name_size, value, value_size);
2385 struct GNUNET_MessageHeader *mod;
2386 res = GNUNET_malloc (sizeof (*res) + sizeof (*mod) + value_size);
2387 res->header.size = htons (sizeof (*res) + sizeof (*mod) + value_size);
2388 res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_STATE_RESULT);
2389 res->op_id = op->op_id;
2391 mod = (struct GNUNET_MessageHeader *) &res[1];
2392 mod->size = htons (sizeof (*mod) + value_size);
2393 mod->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT);
2394 memcpy (&mod[1], value, value_size);
2397 // FIXME: client might have been disconnected
2398 GNUNET_SERVER_notification_context_add (nc, op->client);
2399 GNUNET_SERVER_notification_context_unicast (nc, op->client, &res->header,
2407 * Received result of GNUNET_PSYCSTORE_state_get()
2408 * or GNUNET_PSYCSTORE_state_get_prefix()
2411 store_recv_state_result (void *cls, int64_t result,
2412 const char *err_msg, uint16_t err_msg_size)
2414 struct Operation *op = cls;
2415 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2416 "%p History replay #%" PRIu64 ": "
2417 "PSYCSTORE returned %" PRId64 " (%.*s)\n",
2418 op->chn, op->op_id, result, err_msg_size, err_msg);
2420 // FIXME: client might have been disconnected
2421 client_send_result (op->client, op->op_id, result, err_msg, err_msg_size);
2426 * Client requests best matching state variable from PSYCstore.
2429 client_recv_state_get (void *cls, struct GNUNET_SERVER_Client *client,
2430 const struct GNUNET_MessageHeader *msg)
2433 chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
2434 GNUNET_assert (NULL != chn);
2436 const struct StateRequest *
2437 req = (const struct StateRequest *) msg;
2439 uint16_t name_size = ntohs (req->header.size) - sizeof (*req);
2440 const char *name = (const char *) &req[1];
2441 if (0 == name_size || '\0' != name[name_size - 1])
2444 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
2448 struct Operation *op = op_add (chn, client, req->op_id, 0);
2449 GNUNET_PSYCSTORE_state_get (store, &chn->pub_key, name,
2450 &store_recv_state_var,
2451 &store_recv_state_result, op);
2452 GNUNET_SERVER_receive_done (client, GNUNET_OK);
2457 * Client requests state variables with a given prefix from PSYCstore.
2460 client_recv_state_get_prefix (void *cls, struct GNUNET_SERVER_Client *client,
2461 const struct GNUNET_MessageHeader *msg)
2464 chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
2465 GNUNET_assert (NULL != chn);
2467 const struct StateRequest *
2468 req = (const struct StateRequest *) msg;
2470 uint16_t name_size = ntohs (req->header.size) - sizeof (*req);
2471 const char *name = (const char *) &req[1];
2472 if (0 == name_size || '\0' != name[name_size - 1])
2475 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
2479 struct Operation *op = op_add (chn, client, req->op_id, 0);
2480 GNUNET_PSYCSTORE_state_get_prefix (store, &chn->pub_key, name,
2481 &store_recv_state_var,
2482 &store_recv_state_result, op);
2483 GNUNET_SERVER_receive_done (client, GNUNET_OK);
2487 static const struct GNUNET_SERVER_MessageHandler server_handlers[] = {
2488 { &client_recv_master_start, NULL,
2489 GNUNET_MESSAGE_TYPE_PSYC_MASTER_START, 0 },
2491 { &client_recv_slave_join, NULL,
2492 GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN, 0 },
2494 { &client_recv_join_decision, NULL,
2495 GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION, 0 },
2497 { &client_recv_psyc_message, NULL,
2498 GNUNET_MESSAGE_TYPE_PSYC_MESSAGE, 0 },
2500 { &client_recv_membership_store, NULL,
2501 GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_MEMBERSHIP_STORE, 0 },
2503 { &client_recv_history_replay, NULL,
2504 GNUNET_MESSAGE_TYPE_PSYC_HISTORY_REPLAY, 0 },
2506 { &client_recv_state_get, NULL,
2507 GNUNET_MESSAGE_TYPE_PSYC_STATE_GET, 0 },
2509 { &client_recv_state_get_prefix, NULL,
2510 GNUNET_MESSAGE_TYPE_PSYC_STATE_GET_PREFIX, 0 },
2512 { NULL, NULL, 0, 0 }
2517 * Initialize the PSYC service.
2519 * @param cls Closure.
2520 * @param server The initialized server.
2521 * @param c Configuration to use.
2524 run (void *cls, struct GNUNET_SERVER_Handle *server,
2525 const struct GNUNET_CONFIGURATION_Handle *c)
2528 store = GNUNET_PSYCSTORE_connect (cfg);
2529 stats = GNUNET_STATISTICS_create ("psyc", cfg);
2530 masters = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
2531 slaves = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
2532 channel_slaves = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
2533 recv_cache = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
2534 nc = GNUNET_SERVER_notification_context_create (server, 1);
2535 GNUNET_SERVER_add_handlers (server, server_handlers);
2536 GNUNET_SERVER_disconnect_notify (server, &client_disconnect, NULL);
2537 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL,
2538 &shutdown_task, NULL);
2543 * The main function for the service.
2545 * @param argc number of arguments from the command line
2546 * @param argv command line arguments
2547 * @return 0 ok, 1 on error
2550 main (int argc, char *const *argv)
2552 return (GNUNET_OK ==
2553 GNUNET_SERVICE_run (argc, argv, "psyc",
2554 GNUNET_SERVICE_OPTION_NONE,
2555 &run, NULL)) ? 0 : 1;
2558 /* end of gnunet-service-psyc.c */