2 * This file is part of GNUnet
3 * Copyright (C) 2013 Christian Grothoff (and other contributing authors)
5 * GNUnet is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published
7 * by the Free Software Foundation; either version 3, or (at your
8 * option) any later version.
10 * GNUnet is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License for more details.
15 * You should have received a copy of the GNU General Public License
16 * along with GNUnet; see the file COPYING. If not, write to the
17 * Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18 * Boston, MA 02110-1301, USA.
22 * @file psyc/gnunet-service-psyc.c
24 * @author Gabor X Toth
30 #include "gnunet_util_lib.h"
31 #include "gnunet_constants.h"
32 #include "gnunet_protocols.h"
33 #include "gnunet_statistics_service.h"
34 #include "gnunet_multicast_service.h"
35 #include "gnunet_psycstore_service.h"
36 #include "gnunet_psyc_service.h"
37 #include "gnunet_psyc_util_lib.h"
42 * Handle to our current configuration.
44 static const struct GNUNET_CONFIGURATION_Handle *cfg;
47 * Handle to the statistics service.
49 static struct GNUNET_STATISTICS_Handle *stats;
52 * Notification context, simplifies client broadcasts.
54 static struct GNUNET_SERVER_NotificationContext *nc;
57 * Handle to the PSYCstore.
59 static struct GNUNET_PSYCSTORE_Handle *store;
62 * All connected masters.
63 * Channel's pub_key_hash -> struct Master
65 static struct GNUNET_CONTAINER_MultiHashMap *masters;
68 * All connected slaves.
69 * Channel's pub_key_hash -> struct Slave
71 static struct GNUNET_CONTAINER_MultiHashMap *slaves;
74 * Connected slaves per channel.
75 * Channel's pub_key_hash -> Slave's pub_key -> struct Slave
77 static struct GNUNET_CONTAINER_MultiHashMap *channel_slaves;
81 * Message in the transmission queue.
83 struct TransmitMessage
85 struct TransmitMessage *prev;
86 struct TransmitMessage *next;
88 struct GNUNET_SERVER_Client *client;
91 * ID assigned to the message.
101 * @see enum MessageState
106 * Whether a message ACK has already been sent to the client.
107 * #GNUNET_YES or #GNUNET_NO
111 /* Followed by message */
116 * Cache for received message fragments.
117 * Message fragments are only sent to clients after all modifiers arrived.
119 * chan_key -> MultiHashMap chan_msgs
121 static struct GNUNET_CONTAINER_MultiHashMap *recv_cache;
125 * Entry in the chan_msgs hashmap of @a recv_cache:
126 * fragment_id -> RecvCacheEntry
128 struct RecvCacheEntry
130 struct GNUNET_MULTICAST_MessageHeader *mmsg;
136 * Entry in the @a recv_frags hash map of a @a Channel.
137 * message_id -> FragmentQueue
142 * Fragment IDs stored in @a recv_cache.
144 struct GNUNET_CONTAINER_Heap *fragments;
147 * Total size of received fragments.
152 * Total size of received header fragments (METHOD & MODIFIERs)
154 uint64_t header_size;
157 * The @a state_delta field from struct GNUNET_PSYC_MessageMethod.
159 uint64_t state_delta;
162 * The @a flags field from struct GNUNET_PSYC_MessageMethod.
167 * Receive state of message.
169 * @see MessageFragmentState
174 * Whether the state is already modified in PSYCstore.
176 uint8_t state_is_modified;
179 * Is the message queued for delivery to the client?
180 * i.e. added to the recv_msgs queue
187 * List of connected clients.
194 struct GNUNET_SERVER_Client *client;
200 struct Operation *prev;
201 struct Operation *next;
203 struct GNUNET_SERVER_Client *client;
211 * Common part of the client context for both a channel master and slave.
215 struct Client *clients_head;
216 struct Client *clients_tail;
218 struct Operation *op_head;
219 struct Operation *op_tail;
221 struct TransmitMessage *tmit_head;
222 struct TransmitMessage *tmit_tail;
225 * Current PSYCstore operation.
227 struct GNUNET_PSYCSTORE_OperationHandle *store_op;
230 * Received fragments not yet sent to the client.
231 * message_id -> FragmentQueue
233 struct GNUNET_CONTAINER_MultiHashMap *recv_frags;
236 * Received message IDs not yet sent to the client.
238 struct GNUNET_CONTAINER_Heap *recv_msgs;
241 * Public key of the channel.
243 struct GNUNET_CRYPTO_EddsaPublicKey pub_key;
246 * Hash of @a pub_key.
248 struct GNUNET_HashCode pub_key_hash;
251 * Last message ID sent to the client.
252 * 0 if there is no such message.
254 uint64_t max_message_id;
257 * ID of the last stateful message, where the state operations has been
258 * processed and saved to PSYCstore and which has been sent to the client.
259 * 0 if there is no such message.
261 uint64_t max_state_message_id;
264 * Expected value size for the modifier being received from the PSYC service.
266 uint32_t tmit_mod_value_size_expected;
269 * Actual value size for the modifier being received from the PSYC service.
271 uint32_t tmit_mod_value_size;
274 * @see enum MessageState
279 * Is this a channel master (#GNUNET_YES), or slave (#GNUNET_NO)?
284 * Is this channel ready to receive messages from client?
285 * #GNUNET_YES or #GNUNET_NO
290 * Is the client disconnected?
291 * #GNUNET_YES or #GNUNET_NO
293 uint8_t is_disconnected;
298 * Client context for a channel master.
303 * Channel struct common for Master and Slave
308 * Private key of the channel.
310 struct GNUNET_CRYPTO_EddsaPrivateKey priv_key;
313 * Handle for the multicast origin.
315 struct GNUNET_MULTICAST_Origin *origin;
318 * Transmit handle for multicast.
320 struct GNUNET_MULTICAST_OriginTransmitHandle *tmit_handle;
323 * Incoming join requests from multicast.
324 * member_key -> struct GNUNET_MULTICAST_JoinHandle *
326 struct GNUNET_CONTAINER_MultiHashMap *join_reqs;
329 * Last message ID transmitted to this channel.
331 * Incremented before sending a message, thus the message_id in messages sent
334 uint64_t max_message_id;
337 * ID of the last message with state operations transmitted to the channel.
338 * 0 if there is no such message.
340 uint64_t max_state_message_id;
343 * Maximum group generation transmitted to the channel.
345 uint64_t max_group_generation;
348 * @see enum GNUNET_PSYC_Policy
350 enum GNUNET_PSYC_Policy policy;
355 * Client context for a channel slave.
360 * Channel struct common for Master and Slave
365 * Private key of the slave.
367 struct GNUNET_CRYPTO_EcdsaPrivateKey priv_key;
370 * Public key of the slave.
372 struct GNUNET_CRYPTO_EcdsaPublicKey pub_key;
375 * Hash of @a pub_key.
377 struct GNUNET_HashCode pub_key_hash;
380 * Handle for the multicast member.
382 struct GNUNET_MULTICAST_Member *member;
385 * Transmit handle for multicast.
387 struct GNUNET_MULTICAST_MemberTransmitHandle *tmit_handle;
390 * Peer identity of the origin.
392 struct GNUNET_PeerIdentity origin;
395 * Number of items in @a relays.
397 uint32_t relay_count;
400 * Relays that multicast can use to connect.
402 struct GNUNET_PeerIdentity *relays;
405 * Join request to be transmitted to the master on join.
407 struct GNUNET_PSYC_Message *join_msg;
410 * Join decision received from multicast.
412 struct GNUNET_PSYC_JoinDecisionMessage *join_dcsn;
415 * Maximum request ID for this channel.
417 uint64_t max_request_id;
422 transmit_message (struct Channel *chn);
425 message_queue_run (struct Channel *chn);
428 message_queue_drop (struct Channel *chn);
432 * Task run during shutdown.
438 shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
442 GNUNET_SERVER_notification_context_destroy (nc);
447 GNUNET_STATISTICS_destroy (stats, GNUNET_YES);
453 static struct Operation *
454 op_add (struct Channel *chn, struct GNUNET_SERVER_Client *client,
455 uint64_t op_id, uint32_t flags)
457 struct Operation *op = GNUNET_malloc (sizeof (*op));
462 GNUNET_CONTAINER_DLL_insert (chn->op_head, chn->op_tail, op);
468 op_remove (struct Operation *op)
470 GNUNET_CONTAINER_DLL_remove (op->chn->op_head, op->chn->op_tail, op);
476 * Clean up master data structures after a client disconnected.
479 cleanup_master (struct Master *mst)
481 struct Channel *chn = &mst->chn;
483 if (NULL != mst->origin)
484 GNUNET_MULTICAST_origin_stop (mst->origin, NULL, NULL); // FIXME
485 GNUNET_CONTAINER_multihashmap_destroy (mst->join_reqs);
486 GNUNET_CONTAINER_multihashmap_remove (masters, &chn->pub_key_hash, chn);
491 * Clean up slave data structures after a client disconnected.
494 cleanup_slave (struct Slave *slv)
496 struct Channel *chn = &slv->chn;
497 struct GNUNET_CONTAINER_MultiHashMap *
498 chn_slv = GNUNET_CONTAINER_multihashmap_get (channel_slaves,
500 GNUNET_assert (NULL != chn_slv);
501 GNUNET_CONTAINER_multihashmap_remove (chn_slv, &slv->pub_key_hash, slv);
503 if (0 == GNUNET_CONTAINER_multihashmap_size (chn_slv))
505 GNUNET_CONTAINER_multihashmap_remove (channel_slaves, &chn->pub_key_hash,
507 GNUNET_CONTAINER_multihashmap_destroy (chn_slv);
509 GNUNET_CONTAINER_multihashmap_remove (slaves, &chn->pub_key_hash, slv);
511 if (NULL != slv->join_msg)
513 GNUNET_free (slv->join_msg);
514 slv->join_msg = NULL;
516 if (NULL != slv->relays)
518 GNUNET_free (slv->relays);
521 if (NULL != slv->member)
523 GNUNET_MULTICAST_member_part (slv->member, NULL, NULL); // FIXME
526 GNUNET_CONTAINER_multihashmap_remove (slaves, &chn->pub_key_hash, chn);
531 * Clean up channel data structures after a client disconnected.
534 cleanup_channel (struct Channel *chn)
536 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
537 "%p Cleaning up channel %s. master? %u\n",
538 chn, GNUNET_h2s (&chn->pub_key_hash), chn->is_master);
539 message_queue_drop (chn);
540 GNUNET_CONTAINER_multihashmap_destroy (chn->recv_frags);
541 chn->recv_frags = NULL;
543 if (NULL != chn->store_op)
545 GNUNET_PSYCSTORE_operation_cancel (chn->store_op);
546 chn->store_op = NULL;
549 (GNUNET_YES == chn->is_master)
550 ? cleanup_master ((struct Master *) chn)
551 : cleanup_slave ((struct Slave *) chn);
557 * Called whenever a client is disconnected.
558 * Frees our resources associated with that client.
560 * @param cls Closure.
561 * @param client Identification of the client.
564 client_disconnect (void *cls, struct GNUNET_SERVER_Client *client)
570 chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
574 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
575 "%p User context is NULL in client_disconnect()\n", chn);
580 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
581 "%p Client (%s) disconnected from channel %s\n",
582 chn, (GNUNET_YES == chn->is_master) ? "master" : "slave",
583 GNUNET_h2s (&chn->pub_key_hash));
585 chn->is_disconnected = GNUNET_YES;
587 struct Client *cli = chn->clients_head;
590 if (cli->client == client)
592 GNUNET_CONTAINER_DLL_remove (chn->clients_head, chn->clients_tail, cli);
599 struct Operation *op = chn->op_head;
602 if (op->client == client)
610 if (NULL == chn->clients_head)
611 { /* Last client disconnected. */
612 if (NULL != chn->tmit_head)
613 { /* Send pending messages to multicast before cleanup. */
614 transmit_message (chn);
618 cleanup_channel (chn);
625 * Send message to all clients connected to the channel.
628 client_send_msg (const struct Channel *chn,
629 const struct GNUNET_MessageHeader *msg)
631 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
632 "%p Sending message to clients.\n", chn);
634 struct Client *cli = chn->clients_head;
637 GNUNET_SERVER_notification_context_add (nc, cli->client);
638 GNUNET_SERVER_notification_context_unicast (nc, cli->client, msg, GNUNET_NO);
645 * Send a result code back to the client.
648 * Client that should receive the result code.
652 * Operation ID in network byte order.
654 * Data payload or NULL.
659 client_send_result (struct GNUNET_SERVER_Client *client, uint64_t op_id,
660 int64_t result_code, const void *data, uint16_t data_size)
662 struct GNUNET_OperationResultMessage *res;
664 res = GNUNET_malloc (sizeof (*res) + data_size);
665 res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_RESULT_CODE);
666 res->header.size = htons (sizeof (*res) + data_size);
667 res->result_code = GNUNET_htonll (result_code);
670 memcpy (&res[1], data, data_size);
672 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
673 "%p Sending result to client for operation #%" PRIu64 ": "
674 "%" PRId64 " (size: %u)\n",
675 client, GNUNET_ntohll (op_id), result_code, data_size);
677 GNUNET_SERVER_notification_context_add (nc, client);
678 GNUNET_SERVER_notification_context_unicast (nc, client, &res->header,
685 * Closure for join_mem_test_cb()
687 struct JoinMemTestClosure
689 struct GNUNET_CRYPTO_EcdsaPublicKey slave_key;
691 struct GNUNET_MULTICAST_JoinHandle *jh;
692 struct GNUNET_PSYC_JoinRequestMessage *join_msg;
697 * Membership test result callback used for join requests.
700 join_mem_test_cb (void *cls, int64_t result,
701 const char *err_msg, uint16_t err_msg_size)
703 struct JoinMemTestClosure *jcls = cls;
705 if (GNUNET_NO == result && GNUNET_YES == jcls->chn->is_master)
706 { /* Pass on join request to client if this is a master channel */
707 struct Master *mst = (struct Master *) jcls->chn;
708 struct GNUNET_HashCode slave_key_hash;
709 GNUNET_CRYPTO_hash (&jcls->slave_key, sizeof (jcls->slave_key),
711 GNUNET_CONTAINER_multihashmap_put (mst->join_reqs, &slave_key_hash, jcls->jh,
712 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
713 client_send_msg (jcls->chn, &jcls->join_msg->header);
717 if (GNUNET_SYSERR == result)
719 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
720 "Could not perform membership test (%.*s)\n",
721 err_msg_size, err_msg);
724 GNUNET_MULTICAST_join_decision (jcls->jh, result, 0, NULL, NULL);
726 GNUNET_free (jcls->join_msg);
732 * Incoming join request from multicast.
735 mcast_recv_join_request (void *cls,
736 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
737 const struct GNUNET_MessageHeader *join_msg,
738 struct GNUNET_MULTICAST_JoinHandle *jh)
740 struct Channel *chn = cls;
741 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p Got join request.\n", chn);
743 uint16_t join_msg_size = 0;
744 if (NULL != join_msg)
746 if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE == ntohs (join_msg->type))
748 join_msg_size = ntohs (join_msg->size);
752 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
753 "%p Got join message with invalid type %u.\n",
754 chn, ntohs (join_msg->type));
758 struct GNUNET_PSYC_JoinRequestMessage *
759 req = GNUNET_malloc (sizeof (*req) + join_msg_size);
760 req->header.size = htons (sizeof (*req) + join_msg_size);
761 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_JOIN_REQUEST);
762 req->slave_key = *slave_key;
763 if (0 < join_msg_size)
764 memcpy (&req[1], join_msg, join_msg_size);
766 struct JoinMemTestClosure *jcls = GNUNET_malloc (sizeof (*jcls));
767 jcls->slave_key = *slave_key;
770 jcls->join_msg = req;
772 GNUNET_PSYCSTORE_membership_test (store, &chn->pub_key, slave_key,
773 chn->max_message_id, 0,
774 &join_mem_test_cb, jcls);
779 * Join decision received from multicast.
782 mcast_recv_join_decision (void *cls, int is_admitted,
783 const struct GNUNET_PeerIdentity *peer,
784 uint16_t relay_count,
785 const struct GNUNET_PeerIdentity *relays,
786 const struct GNUNET_MessageHeader *join_resp)
788 struct Slave *slv = cls;
789 struct Channel *chn = &slv->chn;
790 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
791 "%p Got join decision: %d\n", slv, is_admitted);
793 uint16_t join_resp_size = (NULL != join_resp) ? ntohs (join_resp->size) : 0;
794 struct GNUNET_PSYC_JoinDecisionMessage *
795 dcsn = slv->join_dcsn = GNUNET_malloc (sizeof (*dcsn) + join_resp_size);
796 dcsn->header.size = htons (sizeof (*dcsn) + join_resp_size);
797 dcsn->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION);
798 dcsn->is_admitted = htonl (is_admitted);
799 if (0 < join_resp_size)
800 memcpy (&dcsn[1], join_resp, join_resp_size);
802 client_send_msg (chn, &dcsn->header);
804 if (GNUNET_YES == is_admitted)
806 chn->is_ready = GNUNET_YES;
816 * Received result of GNUNET_PSYCSTORE_membership_test()
819 store_recv_membership_test_result (void *cls, int64_t result,
820 const char *err_msg, uint16_t err_msg_size)
822 struct GNUNET_MULTICAST_MembershipTestHandle *mth = cls;
823 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
824 "%p GNUNET_PSYCSTORE_membership_test() returned %" PRId64 " (%.*s)\n",
825 mth, result, err_msg_size, err_msg);
827 GNUNET_MULTICAST_membership_test_result (mth, result);
832 * Incoming membership test request from multicast.
835 mcast_recv_membership_test (void *cls,
836 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
837 uint64_t message_id, uint64_t group_generation,
838 struct GNUNET_MULTICAST_MembershipTestHandle *mth)
840 struct Channel *chn = cls;
841 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
842 "%p Received membership test request from multicast.\n",
844 GNUNET_PSYCSTORE_membership_test (store, &chn->pub_key, slave_key,
845 message_id, group_generation,
846 &store_recv_membership_test_result, mth);
851 store_recv_fragment_replay (void *cls,
852 struct GNUNET_MULTICAST_MessageHeader *msg,
853 enum GNUNET_PSYCSTORE_MessageFlags flags)
855 struct GNUNET_MULTICAST_ReplayHandle *rh = cls;
857 GNUNET_MULTICAST_replay_response (rh, &msg->header, GNUNET_MULTICAST_REC_OK);
863 * Received result of GNUNET_PSYCSTORE_fragment_get() for multicast replay.
866 store_recv_fragment_replay_result (void *cls, int64_t result,
867 const char *err_msg, uint16_t err_msg_size)
869 struct GNUNET_MULTICAST_ReplayHandle *rh = cls;
870 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
871 "%p Fragment replay: PSYCSTORE returned %" PRId64 " (%.*s)\n",
872 rh, result, err_msg_size, err_msg);
880 GNUNET_MULTICAST_replay_response (rh, NULL,
881 GNUNET_MULTICAST_REC_NOT_FOUND);
884 case GNUNET_PSYCSTORE_MEMBERSHIP_TEST_FAILED:
885 GNUNET_MULTICAST_replay_response (rh, NULL,
886 GNUNET_MULTICAST_REC_ACCESS_DENIED);
890 GNUNET_MULTICAST_replay_response (rh, NULL,
891 GNUNET_MULTICAST_REC_INTERNAL_ERROR);
894 GNUNET_MULTICAST_replay_response_end (rh);
899 * Incoming fragment replay request from multicast.
902 mcast_recv_replay_fragment (void *cls,
903 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
904 uint64_t fragment_id, uint64_t flags,
905 struct GNUNET_MULTICAST_ReplayHandle *rh)
908 struct Channel *chn = cls;
909 GNUNET_PSYCSTORE_fragment_get (store, &chn->pub_key, slave_key,
910 fragment_id, fragment_id,
911 &store_recv_fragment_replay,
912 &store_recv_fragment_replay_result, rh);
917 * Incoming message replay request from multicast.
920 mcast_recv_replay_message (void *cls,
921 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
923 uint64_t fragment_offset,
925 struct GNUNET_MULTICAST_ReplayHandle *rh)
927 struct Channel *chn = cls;
928 GNUNET_PSYCSTORE_message_get (store, &chn->pub_key, slave_key,
929 message_id, message_id, 1, NULL,
930 &store_recv_fragment_replay,
931 &store_recv_fragment_replay_result, rh);
936 * Convert an uint64_t in network byte order to a HashCode
937 * that can be used as key in a MultiHashMap
940 hash_key_from_nll (struct GNUNET_HashCode *key, uint64_t n)
942 /* use little-endian order, as idx_of MultiHashMap casts key to unsigned int */
943 /* TODO: use built-in byte swap functions if available */
945 n = ((n << 8) & 0xFF00FF00FF00FF00ULL) | ((n >> 8) & 0x00FF00FF00FF00FFULL);
946 n = ((n << 16) & 0xFFFF0000FFFF0000ULL) | ((n >> 16) & 0x0000FFFF0000FFFFULL);
948 *key = (struct GNUNET_HashCode) {};
950 = (n << 32) | (n >> 32);
955 * Convert an uint64_t in host byte order to a HashCode
956 * that can be used as key in a MultiHashMap
959 hash_key_from_hll (struct GNUNET_HashCode *key, uint64_t n)
961 #if __BYTE_ORDER == __BIG_ENDIAN
962 hash_key_from_nll (key, n);
963 #elif __BYTE_ORDER == __LITTLE_ENDIAN
964 *key = (struct GNUNET_HashCode) {};
965 *((uint64_t *) key) = n;
967 #error byteorder undefined
973 * Initialize PSYC message header.
976 psyc_msg_init (struct GNUNET_PSYC_MessageHeader *pmsg,
977 const struct GNUNET_MULTICAST_MessageHeader *mmsg, uint32_t flags)
979 uint16_t size = ntohs (mmsg->header.size);
980 uint16_t psize = sizeof (*pmsg) + size - sizeof (*mmsg);
982 pmsg->header.size = htons (psize);
983 pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
984 pmsg->message_id = mmsg->message_id;
985 pmsg->fragment_offset = mmsg->fragment_offset;
986 pmsg->flags = htonl (flags);
988 memcpy (&pmsg[1], &mmsg[1], size - sizeof (*mmsg));
993 * Create a new PSYC message from a multicast message for sending it to clients.
995 static inline struct GNUNET_PSYC_MessageHeader *
996 psyc_msg_new (const struct GNUNET_MULTICAST_MessageHeader *mmsg, uint32_t flags)
998 struct GNUNET_PSYC_MessageHeader *pmsg;
999 uint16_t size = ntohs (mmsg->header.size);
1000 uint16_t psize = sizeof (*pmsg) + size - sizeof (*mmsg);
1002 pmsg = GNUNET_malloc (psize);
1003 psyc_msg_init (pmsg, mmsg, flags);
1009 * Send multicast message to all clients connected to the channel.
1012 client_send_mcast_msg (struct Channel *chn,
1013 const struct GNUNET_MULTICAST_MessageHeader *mmsg,
1016 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1017 "%p Sending multicast message to client. "
1018 "fragment_id: %" PRIu64 ", message_id: %" PRIu64 "\n",
1019 chn, GNUNET_ntohll (mmsg->fragment_id),
1020 GNUNET_ntohll (mmsg->message_id));
1022 struct GNUNET_PSYC_MessageHeader *
1023 pmsg = GNUNET_PSYC_message_header_create (mmsg, flags);
1024 client_send_msg (chn, &pmsg->header);
1030 * Send multicast request to all clients connected to the channel.
1033 client_send_mcast_req (struct Master *mst,
1034 const struct GNUNET_MULTICAST_RequestHeader *req)
1036 struct Channel *chn = &mst->chn;
1038 struct GNUNET_PSYC_MessageHeader *pmsg;
1039 uint16_t size = ntohs (req->header.size);
1040 uint16_t psize = sizeof (*pmsg) + size - sizeof (*req);
1042 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1043 "%p Sending multicast request to client. "
1044 "fragment_id: %" PRIu64 ", message_id: %" PRIu64 "\n",
1045 chn, GNUNET_ntohll (req->fragment_id),
1046 GNUNET_ntohll (req->request_id));
1048 pmsg = GNUNET_malloc (psize);
1049 pmsg->header.size = htons (psize);
1050 pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
1051 pmsg->message_id = req->request_id;
1052 pmsg->fragment_offset = req->fragment_offset;
1053 pmsg->flags = htonl (GNUNET_PSYC_MESSAGE_REQUEST);
1054 pmsg->slave_key = req->member_key;
1056 memcpy (&pmsg[1], &req[1], size - sizeof (*req));
1057 client_send_msg (chn, &pmsg->header);
1063 * Insert a multicast message fragment into the queue belonging to the message.
1065 * @param chn Channel.
1066 * @param mmsg Multicast message fragment.
1067 * @param msg_id_hash Message ID of @a mmsg in a struct GNUNET_HashCode.
1068 * @param first_ptype First PSYC message part type in @a mmsg.
1069 * @param last_ptype Last PSYC message part type in @a mmsg.
1072 fragment_queue_insert (struct Channel *chn,
1073 const struct GNUNET_MULTICAST_MessageHeader *mmsg,
1074 uint16_t first_ptype, uint16_t last_ptype)
1076 const uint16_t size = ntohs (mmsg->header.size);
1077 const uint64_t frag_offset = GNUNET_ntohll (mmsg->fragment_offset);
1078 struct GNUNET_CONTAINER_MultiHashMap
1079 *chan_msgs = GNUNET_CONTAINER_multihashmap_get (recv_cache,
1080 &chn->pub_key_hash);
1082 struct GNUNET_HashCode msg_id_hash;
1083 hash_key_from_nll (&msg_id_hash, mmsg->message_id);
1085 struct FragmentQueue
1086 *fragq = GNUNET_CONTAINER_multihashmap_get (chn->recv_frags, &msg_id_hash);
1090 fragq = GNUNET_new (struct FragmentQueue);
1091 fragq->state = MSG_FRAG_STATE_HEADER;
1093 = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
1095 GNUNET_CONTAINER_multihashmap_put (chn->recv_frags, &msg_id_hash, fragq,
1096 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1098 if (NULL == chan_msgs)
1100 chan_msgs = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
1101 GNUNET_CONTAINER_multihashmap_put (recv_cache, &chn->pub_key_hash, chan_msgs,
1102 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1106 struct GNUNET_HashCode frag_id_hash;
1107 hash_key_from_nll (&frag_id_hash, mmsg->fragment_id);
1108 struct RecvCacheEntry
1109 *cache_entry = GNUNET_CONTAINER_multihashmap_get (chan_msgs, &frag_id_hash);
1110 if (NULL == cache_entry)
1112 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1113 "%p Adding message fragment to cache. "
1114 "message_id: %" PRIu64 ", fragment_id: %" PRIu64 "\n",
1115 chn, GNUNET_ntohll (mmsg->message_id),
1116 GNUNET_ntohll (mmsg->fragment_id));
1117 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1118 "%p header_size: %" PRIu64 " + %u\n",
1119 chn, fragq->header_size, size);
1120 cache_entry = GNUNET_new (struct RecvCacheEntry);
1121 cache_entry->ref_count = 1;
1122 cache_entry->mmsg = GNUNET_malloc (size);
1123 memcpy (cache_entry->mmsg, mmsg, size);
1124 GNUNET_CONTAINER_multihashmap_put (chan_msgs, &frag_id_hash, cache_entry,
1125 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1129 cache_entry->ref_count++;
1130 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1131 "%p Message fragment is already in cache. "
1132 "message_id: %" PRIu64 ", fragment_id: %" PRIu64
1133 ", ref_count: %u\n",
1134 chn, GNUNET_ntohll (mmsg->message_id),
1135 GNUNET_ntohll (mmsg->fragment_id), cache_entry->ref_count);
1138 if (MSG_FRAG_STATE_HEADER == fragq->state)
1140 if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype)
1142 struct GNUNET_PSYC_MessageMethod *
1143 pmeth = (struct GNUNET_PSYC_MessageMethod *) &mmsg[1];
1144 fragq->state_delta = GNUNET_ntohll (pmeth->state_delta);
1145 fragq->flags = ntohl (pmeth->flags);
1148 if (last_ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA)
1150 fragq->header_size += size;
1152 else if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype
1153 || frag_offset == fragq->header_size)
1154 { /* header is now complete */
1155 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1156 "%p Header of message %" PRIu64 " is complete.\n",
1157 chn, GNUNET_ntohll (mmsg->message_id));
1159 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1160 "%p Adding message %" PRIu64 " to queue.\n",
1161 chn, GNUNET_ntohll (mmsg->message_id));
1162 fragq->state = MSG_FRAG_STATE_DATA;
1166 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1167 "%p Header of message %" PRIu64 " is NOT complete yet: "
1168 "%" PRIu64 " != %" PRIu64 "\n",
1169 chn, GNUNET_ntohll (mmsg->message_id), frag_offset,
1170 fragq->header_size);
1176 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END:
1177 if (frag_offset == fragq->size)
1178 fragq->state = MSG_FRAG_STATE_END;
1180 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1181 "%p Message %" PRIu64 " is NOT complete yet: "
1182 "%" PRIu64 " != %" PRIu64 "\n",
1183 chn, GNUNET_ntohll (mmsg->message_id), frag_offset,
1187 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL:
1188 /* Drop message without delivering to client if it's a single fragment */
1190 (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype)
1191 ? MSG_FRAG_STATE_DROP
1192 : MSG_FRAG_STATE_CANCEL;
1195 switch (fragq->state)
1197 case MSG_FRAG_STATE_DATA:
1198 case MSG_FRAG_STATE_END:
1199 case MSG_FRAG_STATE_CANCEL:
1200 if (GNUNET_NO == fragq->is_queued)
1202 GNUNET_CONTAINER_heap_insert (chn->recv_msgs, NULL,
1203 GNUNET_ntohll (mmsg->message_id));
1204 fragq->is_queued = GNUNET_YES;
1208 fragq->size += size;
1209 GNUNET_CONTAINER_heap_insert (fragq->fragments, NULL,
1210 GNUNET_ntohll (mmsg->fragment_id));
1215 * Run fragment queue of a message.
1217 * Send fragments of a message in order to client, after all modifiers arrived
1220 * @param chn Channel.
1221 * @param msg_id ID of the message @a fragq belongs to.
1222 * @param fragq Fragment queue of the message.
1223 * @param drop Drop message without delivering to client?
1224 * #GNUNET_YES or #GNUNET_NO.
1227 fragment_queue_run (struct Channel *chn, uint64_t msg_id,
1228 struct FragmentQueue *fragq, uint8_t drop)
1230 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1231 "%p Running message fragment queue for message %" PRIu64
1233 chn, msg_id, fragq->state);
1235 struct GNUNET_CONTAINER_MultiHashMap
1236 *chan_msgs = GNUNET_CONTAINER_multihashmap_get (recv_cache,
1237 &chn->pub_key_hash);
1238 GNUNET_assert (NULL != chan_msgs);
1241 while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (fragq->fragments, NULL,
1244 struct GNUNET_HashCode frag_id_hash;
1245 hash_key_from_hll (&frag_id_hash, frag_id);
1246 struct RecvCacheEntry *cache_entry
1247 = GNUNET_CONTAINER_multihashmap_get (chan_msgs, &frag_id_hash);
1248 if (cache_entry != NULL)
1250 if (GNUNET_NO == drop)
1252 client_send_mcast_msg (chn, cache_entry->mmsg, 0);
1254 if (cache_entry->ref_count <= 1)
1256 GNUNET_CONTAINER_multihashmap_remove (chan_msgs, &frag_id_hash,
1258 GNUNET_free (cache_entry->mmsg);
1259 GNUNET_free (cache_entry);
1263 cache_entry->ref_count--;
1266 #if CACHE_AGING_IMPLEMENTED
1267 else if (GNUNET_NO == drop)
1269 /* TODO: fragment not in cache anymore, retrieve it from PSYCstore */
1273 GNUNET_CONTAINER_heap_remove_root (fragq->fragments);
1276 if (MSG_FRAG_STATE_END <= fragq->state)
1278 struct GNUNET_HashCode msg_id_hash;
1279 hash_key_from_hll (&msg_id_hash, msg_id);
1281 GNUNET_CONTAINER_multihashmap_remove (chn->recv_frags, &msg_id_hash, fragq);
1282 GNUNET_CONTAINER_heap_destroy (fragq->fragments);
1283 GNUNET_free (fragq);
1287 fragq->is_queued = GNUNET_NO;
1292 struct StateModifyClosure
1294 struct Channel *chn;
1296 struct GNUNET_HashCode msg_id_hash;
1301 store_recv_state_modify_result (void *cls, int64_t result,
1302 const char *err_msg, uint16_t err_msg_size)
1304 struct StateModifyClosure *mcls = cls;
1305 struct Channel *chn = mcls->chn;
1306 uint64_t msg_id = mcls->msg_id;
1308 struct FragmentQueue *
1309 fragq = GNUNET_CONTAINER_multihashmap_get (chn->recv_frags, &mcls->msg_id_hash);
1311 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1312 "%p GNUNET_PSYCSTORE_state_modify() returned %" PRId64 " (%.*s)\n",
1313 chn, result, err_msg_size, err_msg);
1320 fragq->state_is_modified = GNUNET_YES;
1321 if (chn->max_state_message_id < msg_id)
1322 chn->max_state_message_id = msg_id;
1323 if (chn->max_message_id < msg_id)
1324 chn->max_message_id = msg_id;
1327 fragment_queue_run (chn, msg_id, fragq, MSG_FRAG_STATE_DROP == fragq->state);
1328 GNUNET_CONTAINER_heap_remove_root (chn->recv_msgs);
1329 message_queue_run (chn);
1333 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1334 "%p GNUNET_PSYCSTORE_state_modify() failed with error %" PRId64 " (%.*s)\n",
1335 chn, result, err_msg_size, err_msg);
1336 /** @todo FIXME: handle state_modify error */
1342 * Run message queue.
1344 * Send messages in queue to client in order after a message has arrived from
1345 * multicast, according to the following:
1346 * - A message is only sent if all of its modifiers arrived.
1347 * - A stateful message is only sent if the previous stateful message
1348 * has already been delivered to the client.
1350 * @param chn Channel.
1352 * @return Number of messages removed from queue and sent to client.
1355 message_queue_run (struct Channel *chn)
1357 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1358 "%p Running message queue.\n", chn);
1362 while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (chn->recv_msgs, NULL,
1365 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1366 "%p Processing message %" PRIu64 " in queue.\n", chn, msg_id);
1367 struct GNUNET_HashCode msg_id_hash;
1368 hash_key_from_hll (&msg_id_hash, msg_id);
1370 struct FragmentQueue *
1371 fragq = GNUNET_CONTAINER_multihashmap_get (chn->recv_frags, &msg_id_hash);
1373 if (NULL == fragq || fragq->state <= MSG_FRAG_STATE_HEADER)
1375 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1376 "%p No fragq (%p) or header not complete.\n",
1381 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1382 "%p Fragment queue entry: state: %u, state delta: "
1383 "%" PRIu64 " - %" PRIu64 " ?= %" PRIu64 "\n",
1384 chn, fragq->state, msg_id, fragq->state_delta, chn->max_state_message_id);
1386 if (MSG_FRAG_STATE_DATA <= fragq->state)
1388 /* Check if there's a missing message before the current one */
1389 if (GNUNET_PSYC_STATE_NOT_MODIFIED == fragq->state_delta)
1391 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p state NOT modified\n", chn);
1393 if (!(fragq->flags & GNUNET_PSYC_MESSAGE_ORDER_ANY)
1394 && (chn->max_message_id != msg_id - 1
1395 && chn->max_message_id != msg_id))
1397 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1398 "%p Out of order message. "
1399 "(%" PRIu64 " != %" PRIu64 " - 1)\n",
1400 chn, chn->max_message_id, msg_id);
1402 // FIXME: keep track of messages processed in this queue run,
1403 // and only stop after reaching the end
1408 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p state modified\n", chn);
1409 if (GNUNET_YES != fragq->state_is_modified)
1411 if (msg_id - fragq->state_delta != chn->max_state_message_id)
1413 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1414 "%p Out of order stateful message. "
1415 "(%" PRIu64 " - %" PRIu64 " != %" PRIu64 ")\n",
1416 chn, msg_id, fragq->state_delta, chn->max_state_message_id);
1418 // FIXME: keep track of messages processed in this queue run,
1419 // and only stop after reaching the end
1422 struct StateModifyClosure *mcls = GNUNET_malloc (sizeof (*mcls));
1424 mcls->msg_id = msg_id;
1425 mcls->msg_id_hash = msg_id_hash;
1427 /* Apply modifiers to state in PSYCstore */
1428 GNUNET_PSYCSTORE_state_modify (store, &chn->pub_key, msg_id,
1430 store_recv_state_modify_result, mcls);
1431 break; // continue after asynchronous state modify result
1434 chn->max_message_id = msg_id;
1436 fragment_queue_run (chn, msg_id, fragq, MSG_FRAG_STATE_DROP == fragq->state);
1437 GNUNET_CONTAINER_heap_remove_root (chn->recv_msgs);
1441 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1442 "%p Removed %" PRIu64 " messages from queue.\n", chn, n);
1448 * Drop message queue of a channel.
1450 * Remove all messages in queue without sending it to clients.
1452 * @param chn Channel.
1454 * @return Number of messages removed from queue.
1457 message_queue_drop (struct Channel *chn)
1459 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1460 "%p Dropping message queue.\n", chn);
1463 while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (chn->recv_msgs, NULL,
1466 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1467 "%p Dropping message %" PRIu64 " from queue.\n", chn, msg_id);
1468 struct GNUNET_HashCode msg_id_hash;
1469 hash_key_from_hll (&msg_id_hash, msg_id);
1471 struct FragmentQueue *
1472 fragq = GNUNET_CONTAINER_multihashmap_get (chn->recv_frags, &msg_id_hash);
1473 GNUNET_assert (NULL != fragq);
1474 fragment_queue_run (chn, msg_id, fragq, GNUNET_YES);
1475 GNUNET_CONTAINER_heap_remove_root (chn->recv_msgs);
1478 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1479 "%p Removed %" PRIu64 " messages from queue.\n", chn, n);
1485 * Received result of GNUNET_PSYCSTORE_fragment_store().
1488 store_recv_fragment_store_result (void *cls, int64_t result,
1489 const char *err_msg, uint16_t err_msg_size)
1491 struct Channel *chn = cls;
1492 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1493 "%p GNUNET_PSYCSTORE_fragment_store() returned %" PRId64 " (%.*s)\n",
1494 chn, result, err_msg_size, err_msg);
1499 * Handle incoming message fragment from multicast.
1501 * Store it using PSYCstore and send it to the clients of the channel in order.
1504 mcast_recv_message (void *cls, const struct GNUNET_MULTICAST_MessageHeader *mmsg)
1506 struct Channel *chn = cls;
1507 uint16_t size = ntohs (mmsg->header.size);
1509 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1510 "%p Received multicast message of size %u.\n",
1513 GNUNET_PSYCSTORE_fragment_store (store, &chn->pub_key, mmsg, 0,
1514 &store_recv_fragment_store_result, chn);
1516 uint16_t first_ptype = 0, last_ptype = 0;
1518 == GNUNET_PSYC_receive_check_parts (size - sizeof (*mmsg),
1519 (const char *) &mmsg[1],
1520 &first_ptype, &last_ptype))
1522 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1523 "%p Dropping incoming multicast message with invalid parts.\n",
1525 GNUNET_break_op (0);
1529 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1530 "Message parts: first: type %u, last: type %u\n",
1531 first_ptype, last_ptype);
1533 fragment_queue_insert (chn, mmsg, first_ptype, last_ptype);
1534 message_queue_run (chn);
1539 * Incoming request fragment from multicast for a master.
1541 * @param cls Master.
1542 * @param req The request.
1545 mcast_recv_request (void *cls,
1546 const struct GNUNET_MULTICAST_RequestHeader *req)
1548 struct Master *mst = cls;
1549 uint16_t size = ntohs (req->header.size);
1551 char *str = GNUNET_CRYPTO_ecdsa_public_key_to_string (&req->member_key);
1552 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1553 "%p Received multicast request of size %u from %s.\n",
1557 uint16_t first_ptype = 0, last_ptype = 0;
1559 == GNUNET_PSYC_receive_check_parts (size - sizeof (*req),
1560 (const char *) &req[1],
1561 &first_ptype, &last_ptype))
1563 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1564 "%p Dropping incoming multicast request with invalid parts.\n",
1566 GNUNET_break_op (0);
1570 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1571 "Message parts: first: type %u, last: type %u\n",
1572 first_ptype, last_ptype);
1574 /* FIXME: in-order delivery */
1575 client_send_mcast_req (mst, req);
1580 * Response from PSYCstore with the current counter values for a channel master.
1583 store_recv_master_counters (void *cls, int result, uint64_t max_fragment_id,
1584 uint64_t max_message_id, uint64_t max_group_generation,
1585 uint64_t max_state_message_id)
1587 struct Master *mst = cls;
1588 struct Channel *chn = &mst->chn;
1589 chn->store_op = NULL;
1591 struct GNUNET_PSYC_CountersResultMessage res;
1592 res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK);
1593 res.header.size = htons (sizeof (res));
1594 res.result_code = htonl (result);
1595 res.max_message_id = GNUNET_htonll (max_message_id);
1597 if (GNUNET_OK == result || GNUNET_NO == result)
1599 mst->max_message_id = max_message_id;
1600 chn->max_message_id = max_message_id;
1601 chn->max_state_message_id = max_state_message_id;
1602 mst->max_group_generation = max_group_generation;
1604 = GNUNET_MULTICAST_origin_start (cfg, &mst->priv_key, max_fragment_id,
1605 mcast_recv_join_request,
1606 mcast_recv_membership_test,
1607 mcast_recv_replay_fragment,
1608 mcast_recv_replay_message,
1610 mcast_recv_message, chn);
1611 chn->is_ready = GNUNET_YES;
1615 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1616 "%p GNUNET_PSYCSTORE_counters_get() "
1617 "returned %d for channel %s.\n",
1618 chn, result, GNUNET_h2s (&chn->pub_key_hash));
1621 client_send_msg (chn, &res.header);
1626 * Response from PSYCstore with the current counter values for a channel slave.
1629 store_recv_slave_counters (void *cls, int result, uint64_t max_fragment_id,
1630 uint64_t max_message_id, uint64_t max_group_generation,
1631 uint64_t max_state_message_id)
1633 struct Slave *slv = cls;
1634 struct Channel *chn = &slv->chn;
1635 chn->store_op = NULL;
1637 struct GNUNET_PSYC_CountersResultMessage res;
1638 res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK);
1639 res.header.size = htons (sizeof (res));
1640 res.result_code = htonl (result);
1641 res.max_message_id = GNUNET_htonll (max_message_id);
1643 if (GNUNET_OK == result || GNUNET_NO == result)
1645 chn->max_message_id = max_message_id;
1646 chn->max_state_message_id = max_state_message_id;
1648 = GNUNET_MULTICAST_member_join (cfg, &chn->pub_key, &slv->priv_key,
1650 slv->relay_count, slv->relays,
1651 &slv->join_msg->header,
1652 mcast_recv_join_request,
1653 mcast_recv_join_decision,
1654 mcast_recv_membership_test,
1655 mcast_recv_replay_fragment,
1656 mcast_recv_replay_message,
1657 mcast_recv_message, chn);
1658 if (NULL != slv->join_msg)
1660 GNUNET_free (slv->join_msg);
1661 slv->join_msg = NULL;
1666 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1667 "%p GNUNET_PSYCSTORE_counters_get() "
1668 "returned %d for channel %s.\n",
1669 chn, result, GNUNET_h2s (&chn->pub_key_hash));
1672 client_send_msg (chn, &res.header);
1677 channel_init (struct Channel *chn)
1680 = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
1681 chn->recv_frags = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
1686 * Handle a connecting client starting a channel master.
1689 client_recv_master_start (void *cls, struct GNUNET_SERVER_Client *client,
1690 const struct GNUNET_MessageHeader *msg)
1692 const struct MasterStartRequest *req
1693 = (const struct MasterStartRequest *) msg;
1695 struct GNUNET_CRYPTO_EddsaPublicKey pub_key;
1696 struct GNUNET_HashCode pub_key_hash;
1698 GNUNET_CRYPTO_eddsa_key_get_public (&req->channel_key, &pub_key);
1699 GNUNET_CRYPTO_hash (&pub_key, sizeof (pub_key), &pub_key_hash);
1702 mst = GNUNET_CONTAINER_multihashmap_get (masters, &pub_key_hash);
1703 struct Channel *chn;
1707 mst = GNUNET_new (struct Master);
1708 mst->policy = ntohl (req->policy);
1709 mst->priv_key = req->channel_key;
1710 mst->join_reqs = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
1713 chn->is_master = GNUNET_YES;
1714 chn->pub_key = pub_key;
1715 chn->pub_key_hash = pub_key_hash;
1718 GNUNET_CONTAINER_multihashmap_put (masters, &chn->pub_key_hash, chn,
1719 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1720 chn->store_op = GNUNET_PSYCSTORE_counters_get (store, &chn->pub_key,
1721 store_recv_master_counters, mst);
1727 struct GNUNET_PSYC_CountersResultMessage res;
1728 res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK);
1729 res.header.size = htons (sizeof (res));
1730 res.result_code = htonl (GNUNET_OK);
1731 res.max_message_id = GNUNET_htonll (mst->max_message_id);
1733 GNUNET_SERVER_notification_context_add (nc, client);
1734 GNUNET_SERVER_notification_context_unicast (nc, client, &res.header,
1738 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1739 "%p Client connected as master to channel %s.\n",
1740 mst, GNUNET_h2s (&chn->pub_key_hash));
1742 struct Client *cli = GNUNET_new (struct Client);
1743 cli->client = client;
1744 GNUNET_CONTAINER_DLL_insert (chn->clients_head, chn->clients_tail, cli);
1746 GNUNET_SERVER_client_set_user_context (client, chn);
1747 GNUNET_SERVER_receive_done (client, GNUNET_OK);
1752 * Handle a connecting client joining as a channel slave.
1755 client_recv_slave_join (void *cls, struct GNUNET_SERVER_Client *client,
1756 const struct GNUNET_MessageHeader *msg)
1758 const struct SlaveJoinRequest *req
1759 = (const struct SlaveJoinRequest *) msg;
1760 uint16_t req_size = ntohs (req->header.size);
1762 struct GNUNET_CRYPTO_EcdsaPublicKey slv_pub_key;
1763 struct GNUNET_HashCode pub_key_hash, slv_pub_key_hash;
1765 GNUNET_CRYPTO_ecdsa_key_get_public (&req->slave_key, &slv_pub_key);
1766 GNUNET_CRYPTO_hash (&slv_pub_key, sizeof (slv_pub_key), &slv_pub_key_hash);
1767 GNUNET_CRYPTO_hash (&req->channel_key, sizeof (req->channel_key), &pub_key_hash);
1769 struct GNUNET_CONTAINER_MultiHashMap *
1770 chn_slv = GNUNET_CONTAINER_multihashmap_get (channel_slaves, &pub_key_hash);
1771 struct Slave *slv = NULL;
1772 struct Channel *chn;
1774 if (NULL != chn_slv)
1776 slv = GNUNET_CONTAINER_multihashmap_get (chn_slv, &slv_pub_key_hash);
1780 slv = GNUNET_new (struct Slave);
1781 slv->priv_key = req->slave_key;
1782 slv->pub_key = slv_pub_key;
1783 slv->pub_key_hash = slv_pub_key_hash;
1784 slv->origin = req->origin;
1785 slv->relay_count = ntohl (req->relay_count);
1787 const struct GNUNET_PeerIdentity *
1788 relays = (const struct GNUNET_PeerIdentity *) &req[1];
1789 uint16_t relay_size = slv->relay_count * sizeof (*relays);
1790 uint16_t join_msg_size = 0;
1792 if (sizeof (*req) + relay_size + sizeof (struct GNUNET_MessageHeader)
1795 struct GNUNET_PSYC_Message *
1796 join_msg = (struct GNUNET_PSYC_Message *) (((char *) &req[1]) + relay_size);
1797 join_msg_size = ntohs (join_msg->header.size);
1798 slv->join_msg = GNUNET_malloc (join_msg_size);
1799 memcpy (slv->join_msg, join_msg, join_msg_size);
1801 if (sizeof (*req) + relay_size + join_msg_size != req_size)
1803 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1804 "%u + %u + %u != %u\n",
1805 sizeof (*req), relay_size, join_msg_size, req_size);
1807 GNUNET_SERVER_client_disconnect (client);
1811 if (0 < slv->relay_count)
1813 slv->relays = GNUNET_malloc (relay_size);
1814 memcpy (slv->relays, &req[1], relay_size);
1818 chn->is_master = GNUNET_NO;
1819 chn->pub_key = req->channel_key;
1820 chn->pub_key_hash = pub_key_hash;
1823 if (NULL == chn_slv)
1825 chn_slv = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
1826 GNUNET_CONTAINER_multihashmap_put (channel_slaves, &chn->pub_key_hash, chn_slv,
1827 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1829 GNUNET_CONTAINER_multihashmap_put (chn_slv, &slv->pub_key_hash, chn,
1830 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1831 GNUNET_CONTAINER_multihashmap_put (slaves, &chn->pub_key_hash, chn,
1832 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1833 chn->store_op = GNUNET_PSYCSTORE_counters_get (store, &chn->pub_key,
1834 &store_recv_slave_counters, slv);
1840 struct GNUNET_PSYC_CountersResultMessage res;
1841 res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK);
1842 res.header.size = htons (sizeof (res));
1843 res.result_code = htonl (GNUNET_OK);
1844 res.max_message_id = GNUNET_htonll (chn->max_message_id);
1846 GNUNET_SERVER_notification_context_add (nc, client);
1847 GNUNET_SERVER_notification_context_unicast (nc, client, &res.header,
1850 if (NULL == slv->member)
1853 = GNUNET_MULTICAST_member_join (cfg, &chn->pub_key, &slv->priv_key,
1855 slv->relay_count, slv->relays,
1856 &slv->join_msg->header,
1857 &mcast_recv_join_request,
1858 &mcast_recv_join_decision,
1859 &mcast_recv_membership_test,
1860 &mcast_recv_replay_fragment,
1861 &mcast_recv_replay_message,
1862 &mcast_recv_message, chn);
1863 if (NULL != slv->join_msg)
1865 GNUNET_free (slv->join_msg);
1866 slv->join_msg = NULL;
1869 else if (NULL != slv->join_dcsn)
1871 GNUNET_SERVER_notification_context_add (nc, client);
1872 GNUNET_SERVER_notification_context_unicast (nc, client,
1873 &slv->join_dcsn->header,
1878 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1879 "%p Client connected as slave to channel %s.\n",
1880 slv, GNUNET_h2s (&chn->pub_key_hash));
1882 struct Client *cli = GNUNET_new (struct Client);
1883 cli->client = client;
1884 GNUNET_CONTAINER_DLL_insert (chn->clients_head, chn->clients_tail, cli);
1886 GNUNET_SERVER_client_set_user_context (client, chn);
1887 GNUNET_SERVER_receive_done (client, GNUNET_OK);
1891 struct JoinDecisionClosure
1893 int32_t is_admitted;
1894 struct GNUNET_MessageHeader *msg;
1899 * Iterator callback for sending join decisions to multicast.
1902 mcast_send_join_decision (void *cls, const struct GNUNET_HashCode *pub_key_hash,
1905 struct JoinDecisionClosure *jcls = cls;
1906 struct GNUNET_MULTICAST_JoinHandle *jh = value;
1907 // FIXME: add relays
1908 GNUNET_MULTICAST_join_decision (jh, jcls->is_admitted, 0, NULL, jcls->msg);
1914 * Join decision from client.
1917 client_recv_join_decision (void *cls, struct GNUNET_SERVER_Client *client,
1918 const struct GNUNET_MessageHeader *msg)
1920 const struct GNUNET_PSYC_JoinDecisionMessage *dcsn
1921 = (const struct GNUNET_PSYC_JoinDecisionMessage *) msg;
1922 struct Channel *chn;
1924 struct JoinDecisionClosure jcls;
1926 chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
1930 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1933 GNUNET_assert (GNUNET_YES == chn->is_master);
1934 mst = (struct Master *) chn;
1935 jcls.is_admitted = ntohl (dcsn->is_admitted);
1937 = (sizeof (*dcsn) + sizeof (*jcls.msg) <= ntohs (msg->size))
1938 ? (struct GNUNET_MessageHeader *) &dcsn[1]
1941 struct GNUNET_HashCode slave_key_hash;
1942 GNUNET_CRYPTO_hash (&dcsn->slave_key, sizeof (dcsn->slave_key),
1945 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1946 "%p Got join decision (%d) from client for channel %s..\n",
1947 mst, jcls.is_admitted, GNUNET_h2s (&chn->pub_key_hash));
1948 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1949 "%p ..and slave %s.\n",
1950 mst, GNUNET_h2s (&slave_key_hash));
1952 GNUNET_CONTAINER_multihashmap_get_multiple (mst->join_reqs, &slave_key_hash,
1953 &mcast_send_join_decision, &jcls);
1954 GNUNET_CONTAINER_multihashmap_remove_all (mst->join_reqs, &slave_key_hash);
1955 GNUNET_SERVER_receive_done (client, GNUNET_OK);
1960 * Send acknowledgement to a client.
1962 * Sent after a message fragment has been passed on to multicast.
1964 * @param chn The channel struct for the client.
1967 send_message_ack (struct Channel *chn, struct GNUNET_SERVER_Client *client)
1969 struct GNUNET_MessageHeader res;
1970 res.size = htons (sizeof (res));
1971 res.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK);
1974 GNUNET_SERVER_notification_context_add (nc, client);
1975 GNUNET_SERVER_notification_context_unicast (nc, client, &res, GNUNET_NO);
1980 * Callback for the transmit functions of multicast.
1983 transmit_notify (void *cls, size_t *data_size, void *data)
1985 struct Channel *chn = cls;
1986 struct TransmitMessage *tmit_msg = chn->tmit_head;
1988 if (NULL == tmit_msg || *data_size < tmit_msg->size)
1990 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1991 "%p transmit_notify: nothing to send.\n", chn);
1996 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1997 "%p transmit_notify: sending %u bytes.\n", chn, tmit_msg->size);
1999 *data_size = tmit_msg->size;
2000 memcpy (data, &tmit_msg[1], *data_size);
2002 int ret = (MSG_STATE_END < chn->tmit_state) ? GNUNET_NO : GNUNET_YES;
2004 if (NULL != tmit_msg->client && GNUNET_NO == tmit_msg->ack_sent)
2005 send_message_ack (chn, tmit_msg->client);
2007 GNUNET_CONTAINER_DLL_remove (chn->tmit_head, chn->tmit_tail, tmit_msg);
2008 GNUNET_free (tmit_msg);
2010 if (NULL != chn->tmit_head)
2012 transmit_message (chn);
2014 else if (GNUNET_YES == chn->is_disconnected)
2016 /* FIXME: handle partial message (when still in_transmit) */
2017 return GNUNET_SYSERR;
2024 * Callback for the transmit functions of multicast.
2027 master_transmit_notify (void *cls, size_t *data_size, void *data)
2029 int ret = transmit_notify (cls, data_size, data);
2031 if (GNUNET_YES == ret)
2033 struct Master *mst = cls;
2034 mst->tmit_handle = NULL;
2041 * Callback for the transmit functions of multicast.
2044 slave_transmit_notify (void *cls, size_t *data_size, void *data)
2046 int ret = transmit_notify (cls, data_size, data);
2048 if (GNUNET_YES == ret)
2050 struct Slave *slv = cls;
2051 slv->tmit_handle = NULL;
2058 * Transmit a message from a channel master to the multicast group.
2061 master_transmit_message (struct Master *mst)
2063 if (NULL == mst->tmit_handle)
2066 = GNUNET_MULTICAST_origin_to_all (mst->origin, mst->max_message_id,
2067 mst->max_group_generation,
2068 master_transmit_notify, mst);
2072 GNUNET_MULTICAST_origin_to_all_resume (mst->tmit_handle);
2078 * Transmit a message from a channel slave to the multicast group.
2081 slave_transmit_message (struct Slave *slv)
2083 if (NULL == slv->tmit_handle)
2086 = GNUNET_MULTICAST_member_to_origin (slv->member, slv->max_request_id,
2087 slave_transmit_notify, slv);
2091 GNUNET_MULTICAST_member_to_origin_resume (slv->tmit_handle);
2097 transmit_message (struct Channel *chn)
2100 ? master_transmit_message ((struct Master *) chn)
2101 : slave_transmit_message ((struct Slave *) chn);
2106 * Queue a message from a channel master for sending to the multicast group.
2109 master_queue_message (struct Master *mst, struct TransmitMessage *tmit_msg,
2110 uint16_t first_ptype, uint16_t last_ptype)
2112 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "%p master_queue_message()\n", mst);
2114 if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype)
2116 tmit_msg->id = ++mst->max_message_id;
2117 struct GNUNET_PSYC_MessageMethod *pmeth
2118 = (struct GNUNET_PSYC_MessageMethod *) &tmit_msg[1];
2120 if (pmeth->flags & GNUNET_PSYC_MASTER_TRANSMIT_STATE_RESET)
2122 pmeth->state_delta = GNUNET_htonll (GNUNET_PSYC_STATE_RESET);
2124 else if (pmeth->flags & GNUNET_PSYC_MASTER_TRANSMIT_STATE_MODIFY)
2126 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2127 "%p master_queue_message: state_delta=%" PRIu64 "\n",
2128 mst, tmit_msg->id - mst->max_state_message_id);
2129 pmeth->state_delta = GNUNET_htonll (tmit_msg->id
2130 - mst->max_state_message_id);
2131 mst->max_state_message_id = tmit_msg->id;
2135 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2136 "%p master_queue_message: state not modified\n", mst);
2137 pmeth->state_delta = GNUNET_htonll (GNUNET_PSYC_STATE_NOT_MODIFIED);
2140 if (pmeth->flags & GNUNET_PSYC_MASTER_TRANSMIT_STATE_HASH)
2142 /// @todo add state_hash to PSYC header
2149 * Queue a message from a channel slave for sending to the multicast group.
2152 slave_queue_message (struct Slave *slv, struct TransmitMessage *tmit_msg,
2153 uint16_t first_ptype, uint16_t last_ptype)
2155 if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype)
2157 struct GNUNET_PSYC_MessageMethod *pmeth
2158 = (struct GNUNET_PSYC_MessageMethod *) &tmit_msg[1];
2159 pmeth->state_delta = GNUNET_htonll (GNUNET_PSYC_STATE_NOT_MODIFIED);
2160 tmit_msg->id = ++slv->max_request_id;
2166 * Queue PSYC message parts for sending to multicast.
2168 * @param chn Channel to send to.
2169 * @param client Client the message originates from.
2170 * @param data_size Size of @a data.
2171 * @param data Concatenated message parts.
2172 * @param first_ptype First message part type in @a data.
2173 * @param last_ptype Last message part type in @a data.
2175 static struct TransmitMessage *
2176 queue_message (struct Channel *chn,
2177 struct GNUNET_SERVER_Client *client,
2180 uint16_t first_ptype, uint16_t last_ptype)
2182 struct TransmitMessage *
2183 tmit_msg = GNUNET_malloc (sizeof (*tmit_msg) + data_size);
2184 memcpy (&tmit_msg[1], data, data_size);
2185 tmit_msg->client = client;
2186 tmit_msg->size = data_size;
2187 tmit_msg->state = chn->tmit_state;
2189 /* FIXME: separate queue per message ID */
2191 GNUNET_CONTAINER_DLL_insert_tail (chn->tmit_head, chn->tmit_tail, tmit_msg);
2194 ? master_queue_message ((struct Master *) chn, tmit_msg,
2195 first_ptype, last_ptype)
2196 : slave_queue_message ((struct Slave *) chn, tmit_msg,
2197 first_ptype, last_ptype);
2203 * Cancel transmission of current message.
2205 * @param chn Channel to send to.
2206 * @param client Client the message originates from.
2209 transmit_cancel (struct Channel *chn, struct GNUNET_SERVER_Client *client)
2211 uint16_t type = GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL;
2213 struct GNUNET_MessageHeader msg;
2214 msg.size = htons (sizeof (msg));
2215 msg.type = htons (type);
2217 queue_message (chn, client, sizeof (msg), &msg, type, type);
2218 transmit_message (chn);
2220 /* FIXME: cleanup */
2225 * Incoming message from a master or slave client.
2228 client_recv_psyc_message (void *cls, struct GNUNET_SERVER_Client *client,
2229 const struct GNUNET_MessageHeader *msg)
2232 chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
2233 GNUNET_assert (NULL != chn);
2235 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2236 "%p Received message from client.\n", chn);
2237 GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG, msg);
2239 if (GNUNET_YES != chn->is_ready)
2241 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2242 "%p Channel is not ready yet, disconnecting client.\n", chn);
2244 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
2248 uint16_t size = ntohs (msg->size);
2249 if (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < size - sizeof (*msg))
2251 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2252 "%p Message payload too large: %u < %u.\n",
2253 chn, GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD, size - sizeof (*msg));
2255 transmit_cancel (chn, client);
2256 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
2260 uint16_t first_ptype = 0, last_ptype = 0;
2262 == GNUNET_PSYC_receive_check_parts (size - sizeof (*msg),
2263 (const char *) &msg[1],
2264 &first_ptype, &last_ptype))
2266 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2267 "%p Received invalid message part from client.\n", chn);
2269 transmit_cancel (chn, client);
2270 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
2273 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2274 "%p Received message with first part type %u and last part type %u.\n",
2275 chn, first_ptype, last_ptype);
2277 queue_message (chn, client, size - sizeof (*msg), &msg[1],
2278 first_ptype, last_ptype);
2279 transmit_message (chn);
2280 /* FIXME: send a few ACKs even before transmit_notify is called */
2282 GNUNET_SERVER_receive_done (client, GNUNET_OK);
2287 * Received result of GNUNET_PSYCSTORE_membership_store()
2290 store_recv_membership_store_result (void *cls, int64_t result,
2291 const char *err_msg, uint16_t err_msg_size)
2293 struct Operation *op = cls;
2294 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2295 "%p GNUNET_PSYCSTORE_membership_store() returned %" PRId64 " (%.s)\n",
2296 op->chn, result, err_msg_size, err_msg);
2298 client_send_result (op->client, op->op_id, result, err_msg, err_msg_size);
2304 * Client requests to add/remove a slave in the membership database.
2307 client_recv_membership_store (void *cls, struct GNUNET_SERVER_Client *client,
2308 const struct GNUNET_MessageHeader *msg)
2311 chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
2312 GNUNET_assert (NULL != chn);
2314 const struct ChannelMembershipStoreRequest *
2315 req = (const struct ChannelMembershipStoreRequest *) msg;
2317 struct Operation *op = op_add (chn, client, req->op_id, 0);
2319 uint64_t announced_at = GNUNET_ntohll (req->announced_at);
2320 uint64_t effective_since = GNUNET_ntohll (req->effective_since);
2321 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2322 "%p Received membership store request from client.\n", chn);
2323 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2324 "%p did_join: %u, announced_at: %" PRIu64 ", effective_since: %" PRIu64 "\n",
2325 chn, req->did_join, announced_at, effective_since);
2327 GNUNET_PSYCSTORE_membership_store (store, &chn->pub_key, &req->slave_key,
2328 req->did_join, announced_at, effective_since,
2329 0, /* FIXME: group_generation */
2330 &store_recv_membership_store_result, op);
2331 GNUNET_SERVER_receive_done (client, GNUNET_OK);
2336 * Received a fragment for GNUNET_PSYCSTORE_fragment_get(),
2337 * in response to a history request from a client.
2340 store_recv_fragment_history (void *cls,
2341 struct GNUNET_MULTICAST_MessageHeader *mmsg,
2342 enum GNUNET_PSYCSTORE_MessageFlags flags)
2344 struct Operation *op = cls;
2345 if (NULL == op->client)
2346 { /* Requesting client already disconnected. */
2349 struct Channel *chn = op->chn;
2351 struct GNUNET_PSYC_MessageHeader *pmsg;
2352 uint16_t msize = ntohs (mmsg->header.size);
2353 uint16_t psize = sizeof (*pmsg) + msize - sizeof (*mmsg);
2355 struct GNUNET_OperationResultMessage *
2356 res = GNUNET_malloc (sizeof (*res) + psize);
2357 res->header.size = htons (sizeof (*res) + psize);
2358 res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_HISTORY_RESULT);
2359 res->op_id = op->op_id;
2360 res->result_code = GNUNET_htonll (GNUNET_OK);
2362 pmsg = (struct GNUNET_PSYC_MessageHeader *) &res[1];
2363 GNUNET_PSYC_message_header_init (pmsg, mmsg, flags | GNUNET_PSYC_MESSAGE_HISTORIC);
2364 memcpy (&res[1], pmsg, psize);
2366 /** @todo FIXME: send only to requesting client */
2367 client_send_msg (chn, &res->header);
2373 * Received the result of GNUNET_PSYCSTORE_fragment_get(),
2374 * in response to a history request from a client.
2377 store_recv_fragment_history_result (void *cls, int64_t result,
2378 const char *err_msg, uint16_t err_msg_size)
2380 struct Operation *op = cls;
2381 if (NULL == op->client)
2382 { /* Requesting client already disconnected. */
2386 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2387 "%p History replay #%" PRIu64 ": "
2388 "PSYCSTORE returned %" PRId64 " (%.*s)\n",
2389 op->chn, GNUNET_ntohll (op->op_id), result, err_msg_size, err_msg);
2391 if (op->flags & GNUNET_PSYC_HISTORY_REPLAY_REMOTE)
2393 /** @todo Multicast replay request for messages not found locally. */
2396 client_send_result (op->client, op->op_id, result, err_msg, err_msg_size);
2402 * Client requests channel history.
2405 client_recv_history_replay (void *cls, struct GNUNET_SERVER_Client *client,
2406 const struct GNUNET_MessageHeader *msg)
2409 chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
2410 GNUNET_assert (NULL != chn);
2412 const struct GNUNET_PSYC_HistoryRequestMessage *
2413 req = (const struct GNUNET_PSYC_HistoryRequestMessage *) msg;
2414 uint16_t size = ntohs (msg->size);
2415 const char *method_prefix = (const char *) &req[1];
2417 if (size < sizeof (*req) + 1
2418 || '\0' != method_prefix[size - sizeof (*req) - 1])
2420 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2421 "%p History replay #%" PRIu64 ": "
2422 "invalid method prefix. size: %u < %u?\n",
2423 chn, GNUNET_ntohll (req->op_id), size, sizeof (*req) + 1);
2425 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
2429 struct Operation *op = op_add (chn, client, req->op_id, ntohl (req->flags));
2431 if (0 == req->message_limit)
2432 GNUNET_PSYCSTORE_message_get (store, &chn->pub_key, NULL,
2433 GNUNET_ntohll (req->start_message_id),
2434 GNUNET_ntohll (req->end_message_id),
2436 &store_recv_fragment_history,
2437 &store_recv_fragment_history_result, op);
2439 GNUNET_PSYCSTORE_message_get_latest (store, &chn->pub_key, NULL,
2440 GNUNET_ntohll (req->message_limit),
2442 &store_recv_fragment_history,
2443 &store_recv_fragment_history_result,
2446 GNUNET_SERVER_receive_done (client, GNUNET_OK);
2451 * Received state var from PSYCstore, send it to client.
2454 store_recv_state_var (void *cls, const char *name,
2455 const void *value, uint32_t value_size)
2457 struct Operation *op = cls;
2458 struct GNUNET_OperationResultMessage *res;
2460 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2461 "%p state_get #%" PRIu64 " - received var from PSYCstore: %s\n",
2462 op->chn, GNUNET_ntohll (op->op_id), name);
2464 if (NULL != name) /* First part */
2466 uint16_t name_size = strnlen (name, GNUNET_PSYC_MODIFIER_MAX_PAYLOAD) + 1;
2467 struct GNUNET_PSYC_MessageModifier *mod;
2468 res = GNUNET_malloc (sizeof (*res) + sizeof (*mod) + name_size + value_size);
2469 res->header.size = htons (sizeof (*res) + sizeof (*mod) + name_size + value_size);
2470 res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_STATE_RESULT);
2471 res->op_id = op->op_id;
2473 mod = (struct GNUNET_PSYC_MessageModifier *) &res[1];
2474 mod->header.size = htons (sizeof (*mod) + name_size + value_size);
2475 mod->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER);
2476 mod->name_size = htons (name_size);
2477 mod->value_size = htonl (value_size);
2478 mod->oper = htons (GNUNET_ENV_OP_ASSIGN);
2479 memcpy (&mod[1], name, name_size);
2480 memcpy (((char *) &mod[1]) + name_size, value, value_size);
2482 else /* Continuation */
2484 struct GNUNET_MessageHeader *mod;
2485 res = GNUNET_malloc (sizeof (*res) + sizeof (*mod) + value_size);
2486 res->header.size = htons (sizeof (*res) + sizeof (*mod) + value_size);
2487 res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_STATE_RESULT);
2488 res->op_id = op->op_id;
2490 mod = (struct GNUNET_MessageHeader *) &res[1];
2491 mod->size = htons (sizeof (*mod) + value_size);
2492 mod->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT);
2493 memcpy (&mod[1], value, value_size);
2496 // FIXME: client might have been disconnected
2497 GNUNET_SERVER_notification_context_add (nc, op->client);
2498 GNUNET_SERVER_notification_context_unicast (nc, op->client, &res->header,
2505 * Received result of GNUNET_PSYCSTORE_state_get()
2506 * or GNUNET_PSYCSTORE_state_get_prefix()
2509 store_recv_state_result (void *cls, int64_t result,
2510 const char *err_msg, uint16_t err_msg_size)
2512 struct Operation *op = cls;
2513 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2514 "%p state_get #%" PRIu64 ": "
2515 "PSYCSTORE returned %" PRId64 " (%.*s)\n",
2516 op->chn, GNUNET_ntohll (op->op_id), result, err_msg_size, err_msg);
2518 // FIXME: client might have been disconnected
2519 client_send_result (op->client, op->op_id, result, err_msg, err_msg_size);
2525 * Client requests best matching state variable from PSYCstore.
2528 client_recv_state_get (void *cls, struct GNUNET_SERVER_Client *client,
2529 const struct GNUNET_MessageHeader *msg)
2532 chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
2533 GNUNET_assert (NULL != chn);
2535 const struct StateRequest *
2536 req = (const struct StateRequest *) msg;
2538 uint16_t name_size = ntohs (req->header.size) - sizeof (*req);
2539 const char *name = (const char *) &req[1];
2540 if (0 == name_size || '\0' != name[name_size - 1])
2543 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
2547 struct Operation *op = op_add (chn, client, req->op_id, 0);
2548 GNUNET_PSYCSTORE_state_get (store, &chn->pub_key, name,
2549 &store_recv_state_var,
2550 &store_recv_state_result, op);
2551 GNUNET_SERVER_receive_done (client, GNUNET_OK);
2556 * Client requests state variables with a given prefix from PSYCstore.
2559 client_recv_state_get_prefix (void *cls, struct GNUNET_SERVER_Client *client,
2560 const struct GNUNET_MessageHeader *msg)
2563 chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
2564 GNUNET_assert (NULL != chn);
2566 const struct StateRequest *
2567 req = (const struct StateRequest *) msg;
2569 uint16_t name_size = ntohs (req->header.size) - sizeof (*req);
2570 const char *name = (const char *) &req[1];
2571 if (0 == name_size || '\0' != name[name_size - 1])
2574 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
2578 struct Operation *op = op_add (chn, client, req->op_id, 0);
2579 GNUNET_PSYCSTORE_state_get_prefix (store, &chn->pub_key, name,
2580 &store_recv_state_var,
2581 &store_recv_state_result, op);
2582 GNUNET_SERVER_receive_done (client, GNUNET_OK);
2586 static const struct GNUNET_SERVER_MessageHandler server_handlers[] = {
2587 { &client_recv_master_start, NULL,
2588 GNUNET_MESSAGE_TYPE_PSYC_MASTER_START, 0 },
2590 { &client_recv_slave_join, NULL,
2591 GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN, 0 },
2593 { &client_recv_join_decision, NULL,
2594 GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION, 0 },
2596 { &client_recv_psyc_message, NULL,
2597 GNUNET_MESSAGE_TYPE_PSYC_MESSAGE, 0 },
2599 { &client_recv_membership_store, NULL,
2600 GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_MEMBERSHIP_STORE, 0 },
2602 { &client_recv_history_replay, NULL,
2603 GNUNET_MESSAGE_TYPE_PSYC_HISTORY_REPLAY, 0 },
2605 { &client_recv_state_get, NULL,
2606 GNUNET_MESSAGE_TYPE_PSYC_STATE_GET, 0 },
2608 { &client_recv_state_get_prefix, NULL,
2609 GNUNET_MESSAGE_TYPE_PSYC_STATE_GET_PREFIX, 0 },
2611 { NULL, NULL, 0, 0 }
2616 * Initialize the PSYC service.
2618 * @param cls Closure.
2619 * @param server The initialized server.
2620 * @param c Configuration to use.
2623 run (void *cls, struct GNUNET_SERVER_Handle *server,
2624 const struct GNUNET_CONFIGURATION_Handle *c)
2627 store = GNUNET_PSYCSTORE_connect (cfg);
2628 stats = GNUNET_STATISTICS_create ("psyc", cfg);
2629 masters = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
2630 slaves = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
2631 channel_slaves = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
2632 recv_cache = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
2633 nc = GNUNET_SERVER_notification_context_create (server, 1);
2634 GNUNET_SERVER_add_handlers (server, server_handlers);
2635 GNUNET_SERVER_disconnect_notify (server, &client_disconnect, NULL);
2636 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL,
2637 &shutdown_task, NULL);
2642 * The main function for the service.
2644 * @param argc number of arguments from the command line
2645 * @param argv command line arguments
2646 * @return 0 ok, 1 on error
2649 main (int argc, char *const *argv)
2651 return (GNUNET_OK ==
2652 GNUNET_SERVICE_run (argc, argv, "psyc",
2653 GNUNET_SERVICE_OPTION_NONE,
2654 &run, NULL)) ? 0 : 1;
2657 /* end of gnunet-service-psyc.c */