2 * This file is part of GNUnet
3 * Copyright (C) 2013 Christian Grothoff (and other contributing authors)
5 * GNUnet is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published
7 * by the Free Software Foundation; either version 3, or (at your
8 * option) any later version.
10 * GNUnet is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License for more details.
15 * You should have received a copy of the GNU General Public License
16 * along with GNUnet; see the file COPYING. If not, write to the
17 * Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18 * Boston, MA 02110-1301, USA.
22 * @file psyc/gnunet-service-psyc.c
24 * @author Gabor X Toth
30 #include "gnunet_util_lib.h"
31 #include "gnunet_constants.h"
32 #include "gnunet_protocols.h"
33 #include "gnunet_statistics_service.h"
34 #include "gnunet_multicast_service.h"
35 #include "gnunet_psycstore_service.h"
36 #include "gnunet_psyc_service.h"
37 #include "gnunet_psyc_util_lib.h"
42 * Handle to our current configuration.
44 static const struct GNUNET_CONFIGURATION_Handle *cfg;
47 * Handle to the statistics service.
49 static struct GNUNET_STATISTICS_Handle *stats;
52 * Notification context, simplifies client broadcasts.
54 static struct GNUNET_SERVER_NotificationContext *nc;
57 * Handle to the PSYCstore.
59 static struct GNUNET_PSYCSTORE_Handle *store;
62 * All connected masters.
63 * Channel's pub_key_hash -> struct Master
65 static struct GNUNET_CONTAINER_MultiHashMap *masters;
68 * All connected slaves.
69 * Channel's pub_key_hash -> struct Slave
71 static struct GNUNET_CONTAINER_MultiHashMap *slaves;
74 * Connected slaves per channel.
75 * Channel's pub_key_hash -> Slave's pub_key -> struct Slave
77 static struct GNUNET_CONTAINER_MultiHashMap *channel_slaves;
81 * Message in the transmission queue.
83 struct TransmitMessage
85 struct TransmitMessage *prev;
86 struct TransmitMessage *next;
88 struct GNUNET_SERVER_Client *client;
91 * ID assigned to the message.
101 * Type of first message part.
103 uint16_t first_ptype;
106 * Type of last message part.
111 * @see enum MessageState
116 * Whether a message ACK has already been sent to the client.
117 * #GNUNET_YES or #GNUNET_NO
121 /* Followed by message */
126 * Cache for received message fragments.
127 * Message fragments are only sent to clients after all modifiers have arrived.
129 * chan_key -> MultiHashMap chan_msgs
131 static struct GNUNET_CONTAINER_MultiHashMap *recv_cache;
135 * Entry in the chan_msgs hashmap of @a recv_cache:
136 * fragment_id -> RecvCacheEntry
138 struct RecvCacheEntry
140 struct GNUNET_MULTICAST_MessageHeader *mmsg;
146 * Entry in the @a recv_frags hash map of a @a Channel.
147 * message_id -> FragmentQueue
152 * Fragment IDs stored in @a recv_cache.
154 struct GNUNET_CONTAINER_Heap *fragments;
157 * Total size of received fragments.
162 * Total size of received header fragments (METHOD & MODIFIERs)
164 uint64_t header_size;
167 * The @a state_delta field from struct GNUNET_PSYC_MessageMethod.
169 uint64_t state_delta;
172 * The @a flags field from struct GNUNET_PSYC_MessageMethod.
177 * Receive state of message.
179 * @see MessageFragmentState
184 * Whether the state is already modified in PSYCstore.
186 uint8_t state_is_modified;
189 * Is the message queued for delivery to the client?
190 * i.e. added to the recv_msgs queue
197 * List of connected clients.
204 struct GNUNET_SERVER_Client *client;
210 struct Operation *prev;
211 struct Operation *next;
213 struct GNUNET_SERVER_Client *client;
221 * Common part of the client context for both a channel master and slave.
225 struct Client *clients_head;
226 struct Client *clients_tail;
228 struct Operation *op_head;
229 struct Operation *op_tail;
231 struct TransmitMessage *tmit_head;
232 struct TransmitMessage *tmit_tail;
235 * Current PSYCstore operation.
237 struct GNUNET_PSYCSTORE_OperationHandle *store_op;
240 * Received fragments not yet sent to the client.
241 * message_id -> FragmentQueue
243 struct GNUNET_CONTAINER_MultiHashMap *recv_frags;
246 * Received message IDs not yet sent to the client.
248 struct GNUNET_CONTAINER_Heap *recv_msgs;
251 * Public key of the channel.
253 struct GNUNET_CRYPTO_EddsaPublicKey pub_key;
256 * Hash of @a pub_key.
258 struct GNUNET_HashCode pub_key_hash;
261 * Last message ID sent to the client.
262 * 0 if there is no such message.
264 uint64_t max_message_id;
267 * ID of the last stateful message, where the state operations have been
268 * processed and saved to PSYCstore and which has been sent to the client.
269 * 0 if there is no such message.
271 uint64_t max_state_message_id;
274 * Expected value size for the modifier being received from the PSYC service.
276 uint32_t tmit_mod_value_size_expected;
279 * Actual value size for the modifier being received from the PSYC service.
281 uint32_t tmit_mod_value_size;
284 * @see enum MessageState
289 * Is this a channel master (#GNUNET_YES), or slave (#GNUNET_NO)?
294 * Is this channel ready to receive messages from client?
295 * #GNUNET_YES or #GNUNET_NO
300 * Is the client disconnected?
301 * #GNUNET_YES or #GNUNET_NO
303 uint8_t is_disconnected;
308 * Client context for a channel master.
313 * Channel struct common for Master and Slave
318 * Private key of the channel.
320 struct GNUNET_CRYPTO_EddsaPrivateKey priv_key;
323 * Handle for the multicast origin.
325 struct GNUNET_MULTICAST_Origin *origin;
328 * Transmit handle for multicast.
330 struct GNUNET_MULTICAST_OriginTransmitHandle *tmit_handle;
333 * Incoming join requests from multicast.
334 * member_key -> struct GNUNET_MULTICAST_JoinHandle *
336 struct GNUNET_CONTAINER_MultiHashMap *join_reqs;
339 * Last message ID transmitted to this channel.
341 * Incremented before sending a message, thus the message_id in messages sent
344 uint64_t max_message_id;
347 * ID of the last message with state operations transmitted to the channel.
348 * 0 if there is no such message.
350 uint64_t max_state_message_id;
353 * Maximum group generation transmitted to the channel.
355 uint64_t max_group_generation;
358 * @see enum GNUNET_PSYC_Policy
360 enum GNUNET_PSYC_Policy policy;
365 * Client context for a channel slave.
370 * Channel struct common for Master and Slave
375 * Private key of the slave.
377 struct GNUNET_CRYPTO_EcdsaPrivateKey priv_key;
380 * Public key of the slave.
382 struct GNUNET_CRYPTO_EcdsaPublicKey pub_key;
385 * Hash of @a pub_key.
387 struct GNUNET_HashCode pub_key_hash;
390 * Handle for the multicast member.
392 struct GNUNET_MULTICAST_Member *member;
395 * Transmit handle for multicast.
397 struct GNUNET_MULTICAST_MemberTransmitHandle *tmit_handle;
400 * Peer identity of the origin.
402 struct GNUNET_PeerIdentity origin;
405 * Number of items in @a relays.
407 uint32_t relay_count;
410 * Relays that multicast can use to connect.
412 struct GNUNET_PeerIdentity *relays;
415 * Join request to be transmitted to the master on join.
417 struct GNUNET_PSYC_Message *join_msg;
420 * Join decision received from multicast.
422 struct GNUNET_PSYC_JoinDecisionMessage *join_dcsn;
425 * Maximum request ID for this channel.
427 uint64_t max_request_id;
432 transmit_message (struct Channel *chn);
435 message_queue_run (struct Channel *chn);
438 message_queue_drop (struct Channel *chn);
442 * Task run during shutdown.
448 shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
452 GNUNET_SERVER_notification_context_destroy (nc);
457 GNUNET_STATISTICS_destroy (stats, GNUNET_YES);
463 static struct Operation *
464 op_add (struct Channel *chn, struct GNUNET_SERVER_Client *client,
465 uint64_t op_id, uint32_t flags)
467 struct Operation *op = GNUNET_malloc (sizeof (*op));
472 GNUNET_CONTAINER_DLL_insert (chn->op_head, chn->op_tail, op);
478 op_remove (struct Operation *op)
480 GNUNET_CONTAINER_DLL_remove (op->chn->op_head, op->chn->op_tail, op);
486 * Clean up master data structures after a client disconnected.
489 cleanup_master (struct Master *mst)
491 struct Channel *chn = &mst->chn;
493 if (NULL != mst->origin)
494 GNUNET_MULTICAST_origin_stop (mst->origin, NULL, NULL); // FIXME
495 GNUNET_CONTAINER_multihashmap_destroy (mst->join_reqs);
496 GNUNET_CONTAINER_multihashmap_remove (masters, &chn->pub_key_hash, mst);
501 * Clean up slave data structures after a client disconnected.
504 cleanup_slave (struct Slave *slv)
506 struct Channel *chn = &slv->chn;
507 struct GNUNET_CONTAINER_MultiHashMap *
508 chn_slv = GNUNET_CONTAINER_multihashmap_get (channel_slaves,
510 GNUNET_assert (NULL != chn_slv);
511 GNUNET_CONTAINER_multihashmap_remove (chn_slv, &slv->pub_key_hash, slv);
513 if (0 == GNUNET_CONTAINER_multihashmap_size (chn_slv))
515 GNUNET_CONTAINER_multihashmap_remove (channel_slaves, &chn->pub_key_hash,
517 GNUNET_CONTAINER_multihashmap_destroy (chn_slv);
519 GNUNET_CONTAINER_multihashmap_remove (slaves, &chn->pub_key_hash, slv);
521 if (NULL != slv->join_msg)
523 GNUNET_free (slv->join_msg);
524 slv->join_msg = NULL;
526 if (NULL != slv->relays)
528 GNUNET_free (slv->relays);
531 if (NULL != slv->member)
533 GNUNET_MULTICAST_member_part (slv->member, NULL, NULL); // FIXME
536 GNUNET_CONTAINER_multihashmap_remove (slaves, &chn->pub_key_hash, slv);
541 * Clean up channel data structures after a client disconnected.
544 cleanup_channel (struct Channel *chn)
546 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
547 "%p Cleaning up channel %s. master? %u\n",
548 chn, GNUNET_h2s (&chn->pub_key_hash), chn->is_master);
549 message_queue_drop (chn);
550 GNUNET_CONTAINER_multihashmap_destroy (chn->recv_frags);
551 chn->recv_frags = NULL;
553 if (NULL != chn->store_op)
555 GNUNET_PSYCSTORE_operation_cancel (chn->store_op);
556 chn->store_op = NULL;
559 (GNUNET_YES == chn->is_master)
560 ? cleanup_master ((struct Master *) chn)
561 : cleanup_slave ((struct Slave *) chn);
567 * Called whenever a client is disconnected.
568 * Frees our resources associated with that client.
570 * @param cls Closure.
571 * @param client Identification of the client.
574 client_disconnect (void *cls, struct GNUNET_SERVER_Client *client)
580 chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
584 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
585 "%p User context is NULL in client_disconnect()\n", chn);
590 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
591 "%p Client (%s) disconnected from channel %s\n",
592 chn, (GNUNET_YES == chn->is_master) ? "master" : "slave",
593 GNUNET_h2s (&chn->pub_key_hash));
595 struct Client *cli = chn->clients_head;
598 if (cli->client == client)
600 GNUNET_CONTAINER_DLL_remove (chn->clients_head, chn->clients_tail, cli);
607 struct Operation *op = chn->op_head;
610 if (op->client == client)
618 if (NULL == chn->clients_head)
619 { /* Last client disconnected. */
620 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
621 "%p Last client (%s) disconnected from channel %s\n",
622 chn, (GNUNET_YES == chn->is_master) ? "master" : "slave",
623 GNUNET_h2s (&chn->pub_key_hash));
624 chn->is_disconnected = GNUNET_YES;
625 if (NULL != chn->tmit_head)
626 { /* Send pending messages to multicast before cleanup. */
627 transmit_message (chn);
631 cleanup_channel (chn);
638 * Send message to all clients connected to the channel.
641 client_send_msg (const struct Channel *chn,
642 const struct GNUNET_MessageHeader *msg)
644 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
645 "%p Sending message to clients.\n", chn);
647 struct Client *cli = chn->clients_head;
650 GNUNET_SERVER_notification_context_add (nc, cli->client);
651 GNUNET_SERVER_notification_context_unicast (nc, cli->client, msg, GNUNET_NO);
658 * Send a result code back to the client.
661 * Client that should receive the result code.
665 * Operation ID in network byte order.
667 * Data payload or NULL.
672 client_send_result (struct GNUNET_SERVER_Client *client, uint64_t op_id,
673 int64_t result_code, const void *data, uint16_t data_size)
675 struct GNUNET_OperationResultMessage *res;
677 res = GNUNET_malloc (sizeof (*res) + data_size);
678 res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_RESULT_CODE);
679 res->header.size = htons (sizeof (*res) + data_size);
680 res->result_code = GNUNET_htonll (result_code);
683 memcpy (&res[1], data, data_size);
685 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
686 "%p Sending result to client for operation #%" PRIu64 ": "
687 "%" PRId64 " (size: %u)\n",
688 client, GNUNET_ntohll (op_id), result_code, data_size);
690 GNUNET_SERVER_notification_context_add (nc, client);
691 GNUNET_SERVER_notification_context_unicast (nc, client, &res->header,
698 * Closure for join_mem_test_cb()
700 struct JoinMemTestClosure
702 struct GNUNET_CRYPTO_EcdsaPublicKey slave_key;
704 struct GNUNET_MULTICAST_JoinHandle *jh;
705 struct GNUNET_PSYC_JoinRequestMessage *join_msg;
710 * Membership test result callback used for join requests.
713 join_mem_test_cb (void *cls, int64_t result,
714 const char *err_msg, uint16_t err_msg_size)
716 struct JoinMemTestClosure *jcls = cls;
718 if (GNUNET_NO == result && GNUNET_YES == jcls->chn->is_master)
719 { /* Pass on join request to client if this is a master channel */
720 struct Master *mst = (struct Master *) jcls->chn;
721 struct GNUNET_HashCode slave_key_hash;
722 GNUNET_CRYPTO_hash (&jcls->slave_key, sizeof (jcls->slave_key),
724 GNUNET_CONTAINER_multihashmap_put (mst->join_reqs, &slave_key_hash, jcls->jh,
725 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
726 client_send_msg (jcls->chn, &jcls->join_msg->header);
730 if (GNUNET_SYSERR == result)
732 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
733 "Could not perform membership test (%.*s)\n",
734 err_msg_size, err_msg);
737 GNUNET_MULTICAST_join_decision (jcls->jh, result, 0, NULL, NULL);
739 GNUNET_free (jcls->join_msg);
745 * Incoming join request from multicast.
748 mcast_recv_join_request (void *cls,
749 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
750 const struct GNUNET_MessageHeader *join_msg,
751 struct GNUNET_MULTICAST_JoinHandle *jh)
753 struct Channel *chn = cls;
754 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p Got join request.\n", chn);
756 uint16_t join_msg_size = 0;
757 if (NULL != join_msg)
759 if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE == ntohs (join_msg->type))
761 join_msg_size = ntohs (join_msg->size);
765 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
766 "%p Got join message with invalid type %u.\n",
767 chn, ntohs (join_msg->type));
771 struct GNUNET_PSYC_JoinRequestMessage *
772 req = GNUNET_malloc (sizeof (*req) + join_msg_size);
773 req->header.size = htons (sizeof (*req) + join_msg_size);
774 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_JOIN_REQUEST);
775 req->slave_key = *slave_key;
776 if (0 < join_msg_size)
777 memcpy (&req[1], join_msg, join_msg_size);
779 struct JoinMemTestClosure *jcls = GNUNET_malloc (sizeof (*jcls));
780 jcls->slave_key = *slave_key;
783 jcls->join_msg = req;
785 GNUNET_PSYCSTORE_membership_test (store, &chn->pub_key, slave_key,
786 chn->max_message_id, 0,
787 &join_mem_test_cb, jcls);
792 * Join decision received from multicast.
795 mcast_recv_join_decision (void *cls, int is_admitted,
796 const struct GNUNET_PeerIdentity *peer,
797 uint16_t relay_count,
798 const struct GNUNET_PeerIdentity *relays,
799 const struct GNUNET_MessageHeader *join_resp)
801 struct Slave *slv = cls;
802 struct Channel *chn = &slv->chn;
803 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
804 "%p Got join decision: %d\n", slv, is_admitted);
805 if (GNUNET_YES == chn->is_ready)
807 /* Already admitted */
811 uint16_t join_resp_size = (NULL != join_resp) ? ntohs (join_resp->size) : 0;
812 struct GNUNET_PSYC_JoinDecisionMessage *
813 dcsn = slv->join_dcsn = GNUNET_malloc (sizeof (*dcsn) + join_resp_size);
814 dcsn->header.size = htons (sizeof (*dcsn) + join_resp_size);
815 dcsn->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION);
816 dcsn->is_admitted = htonl (is_admitted);
817 if (0 < join_resp_size)
818 memcpy (&dcsn[1], join_resp, join_resp_size);
820 client_send_msg (chn, &dcsn->header);
822 if (GNUNET_YES == is_admitted)
824 chn->is_ready = GNUNET_YES;
830 store_recv_fragment_replay (void *cls,
831 struct GNUNET_MULTICAST_MessageHeader *msg,
832 enum GNUNET_PSYCSTORE_MessageFlags flags)
834 struct GNUNET_MULTICAST_ReplayHandle *rh = cls;
836 GNUNET_MULTICAST_replay_response (rh, &msg->header, GNUNET_MULTICAST_REC_OK);
842 * Received result of GNUNET_PSYCSTORE_fragment_get() for multicast replay.
845 store_recv_fragment_replay_result (void *cls, int64_t result,
846 const char *err_msg, uint16_t err_msg_size)
848 struct GNUNET_MULTICAST_ReplayHandle *rh = cls;
849 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
850 "%p Fragment replay: PSYCSTORE returned %" PRId64 " (%.*s)\n",
851 rh, result, err_msg_size, err_msg);
859 GNUNET_MULTICAST_replay_response (rh, NULL,
860 GNUNET_MULTICAST_REC_NOT_FOUND);
863 case GNUNET_PSYCSTORE_MEMBERSHIP_TEST_FAILED:
864 GNUNET_MULTICAST_replay_response (rh, NULL,
865 GNUNET_MULTICAST_REC_ACCESS_DENIED);
869 GNUNET_MULTICAST_replay_response (rh, NULL,
870 GNUNET_MULTICAST_REC_INTERNAL_ERROR);
873 GNUNET_MULTICAST_replay_response_end (rh);
878 * Incoming fragment replay request from multicast.
881 mcast_recv_replay_fragment (void *cls,
882 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
883 uint64_t fragment_id, uint64_t flags,
884 struct GNUNET_MULTICAST_ReplayHandle *rh)
887 struct Channel *chn = cls;
888 GNUNET_PSYCSTORE_fragment_get (store, &chn->pub_key, slave_key,
889 fragment_id, fragment_id,
890 &store_recv_fragment_replay,
891 &store_recv_fragment_replay_result, rh);
896 * Incoming message replay request from multicast.
899 mcast_recv_replay_message (void *cls,
900 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
902 uint64_t fragment_offset,
904 struct GNUNET_MULTICAST_ReplayHandle *rh)
906 struct Channel *chn = cls;
907 GNUNET_PSYCSTORE_message_get (store, &chn->pub_key, slave_key,
908 message_id, message_id, 1, NULL,
909 &store_recv_fragment_replay,
910 &store_recv_fragment_replay_result, rh);
915 * Convert a uint64_t in network byte order to a HashCode
916 * that can be used as key in a MultiHashMap
919 hash_key_from_nll (struct GNUNET_HashCode *key, uint64_t n)
921 /* use little-endian order, as idx_of MultiHashMap casts key to unsigned int */
922 /* TODO: use built-in byte swap functions if available */
924 n = ((n << 8) & 0xFF00FF00FF00FF00ULL) | ((n >> 8) & 0x00FF00FF00FF00FFULL);
925 n = ((n << 16) & 0xFFFF0000FFFF0000ULL) | ((n >> 16) & 0x0000FFFF0000FFFFULL);
927 *key = (struct GNUNET_HashCode) {};
929 = (n << 32) | (n >> 32);
934 * Convert a uint64_t in host byte order to a HashCode
935 * that can be used as key in a MultiHashMap
938 hash_key_from_hll (struct GNUNET_HashCode *key, uint64_t n)
940 #if __BYTE_ORDER == __BIG_ENDIAN
941 hash_key_from_nll (key, n);
942 #elif __BYTE_ORDER == __LITTLE_ENDIAN
943 *key = (struct GNUNET_HashCode) {};
944 *((uint64_t *) key) = n;
946 #error byteorder undefined
952 * Initialize PSYC message header.
955 psyc_msg_init (struct GNUNET_PSYC_MessageHeader *pmsg,
956 const struct GNUNET_MULTICAST_MessageHeader *mmsg, uint32_t flags)
958 uint16_t size = ntohs (mmsg->header.size);
959 uint16_t psize = sizeof (*pmsg) + size - sizeof (*mmsg);
961 pmsg->header.size = htons (psize);
962 pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
963 pmsg->message_id = mmsg->message_id;
964 pmsg->fragment_offset = mmsg->fragment_offset;
965 pmsg->flags = htonl (flags);
967 memcpy (&pmsg[1], &mmsg[1], size - sizeof (*mmsg));
972 * Create a new PSYC message from a multicast message for sending it to clients.
974 static inline struct GNUNET_PSYC_MessageHeader *
975 psyc_msg_new (const struct GNUNET_MULTICAST_MessageHeader *mmsg, uint32_t flags)
977 struct GNUNET_PSYC_MessageHeader *pmsg;
978 uint16_t size = ntohs (mmsg->header.size);
979 uint16_t psize = sizeof (*pmsg) + size - sizeof (*mmsg);
981 pmsg = GNUNET_malloc (psize);
982 psyc_msg_init (pmsg, mmsg, flags);
988 * Send multicast message to all clients connected to the channel.
991 client_send_mcast_msg (struct Channel *chn,
992 const struct GNUNET_MULTICAST_MessageHeader *mmsg,
995 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
996 "%p Sending multicast message to client. "
997 "fragment_id: %" PRIu64 ", message_id: %" PRIu64 "\n",
998 chn, GNUNET_ntohll (mmsg->fragment_id),
999 GNUNET_ntohll (mmsg->message_id));
1001 struct GNUNET_PSYC_MessageHeader *
1002 pmsg = GNUNET_PSYC_message_header_create (mmsg, flags);
1003 client_send_msg (chn, &pmsg->header);
1009 * Send multicast request to all clients connected to the channel.
1012 client_send_mcast_req (struct Master *mst,
1013 const struct GNUNET_MULTICAST_RequestHeader *req)
1015 struct Channel *chn = &mst->chn;
1017 struct GNUNET_PSYC_MessageHeader *pmsg;
1018 uint16_t size = ntohs (req->header.size);
1019 uint16_t psize = sizeof (*pmsg) + size - sizeof (*req);
1021 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1022 "%p Sending multicast request to client. "
1023 "fragment_id: %" PRIu64 ", message_id: %" PRIu64 "\n",
1024 chn, GNUNET_ntohll (req->fragment_id),
1025 GNUNET_ntohll (req->request_id));
1027 pmsg = GNUNET_malloc (psize);
1028 pmsg->header.size = htons (psize);
1029 pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
1030 pmsg->message_id = req->request_id;
1031 pmsg->fragment_offset = req->fragment_offset;
1032 pmsg->flags = htonl (GNUNET_PSYC_MESSAGE_REQUEST);
1033 pmsg->slave_key = req->member_key;
1035 memcpy (&pmsg[1], &req[1], size - sizeof (*req));
1036 client_send_msg (chn, &pmsg->header);
1042 * Insert a multicast message fragment into the queue belonging to the message.
1044 * @param chn Channel.
1045 * @param mmsg Multicast message fragment.
1046 * The message ID of @a mmsg is hashed internally into a struct GNUNET_HashCode.
1047 * @param first_ptype First PSYC message part type in @a mmsg.
1048 * @param last_ptype Last PSYC message part type in @a mmsg.
1051 fragment_queue_insert (struct Channel *chn,
1052 const struct GNUNET_MULTICAST_MessageHeader *mmsg,
1053 uint16_t first_ptype, uint16_t last_ptype)
1055 const uint16_t size = ntohs (mmsg->header.size);
1056 const uint64_t frag_offset = GNUNET_ntohll (mmsg->fragment_offset);
1057 struct GNUNET_CONTAINER_MultiHashMap
1058 *chan_msgs = GNUNET_CONTAINER_multihashmap_get (recv_cache,
1059 &chn->pub_key_hash);
1061 struct GNUNET_HashCode msg_id_hash;
1062 hash_key_from_nll (&msg_id_hash, mmsg->message_id);
1064 struct FragmentQueue
1065 *fragq = GNUNET_CONTAINER_multihashmap_get (chn->recv_frags, &msg_id_hash);
1069 fragq = GNUNET_new (struct FragmentQueue);
1070 fragq->state = MSG_FRAG_STATE_HEADER;
1072 = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
1074 GNUNET_CONTAINER_multihashmap_put (chn->recv_frags, &msg_id_hash, fragq,
1075 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1077 if (NULL == chan_msgs)
1079 chan_msgs = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
1080 GNUNET_CONTAINER_multihashmap_put (recv_cache, &chn->pub_key_hash, chan_msgs,
1081 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1085 struct GNUNET_HashCode frag_id_hash;
1086 hash_key_from_nll (&frag_id_hash, mmsg->fragment_id);
1087 struct RecvCacheEntry
1088 *cache_entry = GNUNET_CONTAINER_multihashmap_get (chan_msgs, &frag_id_hash);
1089 if (NULL == cache_entry)
1091 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1092 "%p Adding message fragment to cache. "
1093 "message_id: %" PRIu64 ", fragment_id: %" PRIu64 "\n",
1094 chn, GNUNET_ntohll (mmsg->message_id),
1095 GNUNET_ntohll (mmsg->fragment_id));
1096 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1097 "%p header_size: %" PRIu64 " + %u\n",
1098 chn, fragq->header_size, size);
1099 cache_entry = GNUNET_new (struct RecvCacheEntry);
1100 cache_entry->ref_count = 1;
1101 cache_entry->mmsg = GNUNET_malloc (size);
1102 memcpy (cache_entry->mmsg, mmsg, size);
1103 GNUNET_CONTAINER_multihashmap_put (chan_msgs, &frag_id_hash, cache_entry,
1104 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1108 cache_entry->ref_count++;
1109 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1110 "%p Message fragment is already in cache. "
1111 "message_id: %" PRIu64 ", fragment_id: %" PRIu64
1112 ", ref_count: %u\n",
1113 chn, GNUNET_ntohll (mmsg->message_id),
1114 GNUNET_ntohll (mmsg->fragment_id), cache_entry->ref_count);
1117 if (MSG_FRAG_STATE_HEADER == fragq->state)
1119 if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype)
1121 struct GNUNET_PSYC_MessageMethod *
1122 pmeth = (struct GNUNET_PSYC_MessageMethod *) &mmsg[1];
1123 fragq->state_delta = GNUNET_ntohll (pmeth->state_delta);
1124 fragq->flags = ntohl (pmeth->flags);
1127 if (last_ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA)
1129 fragq->header_size += size;
1131 else if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype
1132 || frag_offset == fragq->header_size)
1133 { /* header is now complete */
1134 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1135 "%p Header of message %" PRIu64 " is complete.\n",
1136 chn, GNUNET_ntohll (mmsg->message_id));
1138 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1139 "%p Adding message %" PRIu64 " to queue.\n",
1140 chn, GNUNET_ntohll (mmsg->message_id));
1141 fragq->state = MSG_FRAG_STATE_DATA;
1145 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1146 "%p Header of message %" PRIu64 " is NOT complete yet: "
1147 "%" PRIu64 " != %" PRIu64 "\n",
1148 chn, GNUNET_ntohll (mmsg->message_id), frag_offset,
1149 fragq->header_size);
1155 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END:
1156 if (frag_offset == fragq->size)
1157 fragq->state = MSG_FRAG_STATE_END;
1159 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1160 "%p Message %" PRIu64 " is NOT complete yet: "
1161 "%" PRIu64 " != %" PRIu64 "\n",
1162 chn, GNUNET_ntohll (mmsg->message_id), frag_offset,
1166 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL:
1167 /* Drop message without delivering to client if it's a single fragment */
1169 (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype)
1170 ? MSG_FRAG_STATE_DROP
1171 : MSG_FRAG_STATE_CANCEL;
1174 switch (fragq->state)
1176 case MSG_FRAG_STATE_DATA:
1177 case MSG_FRAG_STATE_END:
1178 case MSG_FRAG_STATE_CANCEL:
1179 if (GNUNET_NO == fragq->is_queued)
1181 GNUNET_CONTAINER_heap_insert (chn->recv_msgs, NULL,
1182 GNUNET_ntohll (mmsg->message_id));
1183 fragq->is_queued = GNUNET_YES;
1187 fragq->size += size;
1188 GNUNET_CONTAINER_heap_insert (fragq->fragments, NULL,
1189 GNUNET_ntohll (mmsg->fragment_id));
1194 * Run fragment queue of a message.
1196 * Send fragments of a message in order to client, after all modifiers arrived
1199 * @param chn Channel.
1200 * @param msg_id ID of the message @a fragq belongs to.
1201 * @param fragq Fragment queue of the message.
1202 * @param drop Drop message without delivering to client?
1203 * #GNUNET_YES or #GNUNET_NO.
1206 fragment_queue_run (struct Channel *chn, uint64_t msg_id,
1207 struct FragmentQueue *fragq, uint8_t drop)
1209 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1210 "%p Running message fragment queue for message %" PRIu64
1212 chn, msg_id, fragq->state);
1214 struct GNUNET_CONTAINER_MultiHashMap
1215 *chan_msgs = GNUNET_CONTAINER_multihashmap_get (recv_cache,
1216 &chn->pub_key_hash);
1217 GNUNET_assert (NULL != chan_msgs);
1220 while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (fragq->fragments, NULL,
1223 struct GNUNET_HashCode frag_id_hash;
1224 hash_key_from_hll (&frag_id_hash, frag_id);
1225 struct RecvCacheEntry *cache_entry
1226 = GNUNET_CONTAINER_multihashmap_get (chan_msgs, &frag_id_hash);
1227 if (cache_entry != NULL)
1229 if (GNUNET_NO == drop)
1231 client_send_mcast_msg (chn, cache_entry->mmsg, 0);
1233 if (cache_entry->ref_count <= 1)
1235 GNUNET_CONTAINER_multihashmap_remove (chan_msgs, &frag_id_hash,
1237 GNUNET_free (cache_entry->mmsg);
1238 GNUNET_free (cache_entry);
1242 cache_entry->ref_count--;
1245 #if CACHE_AGING_IMPLEMENTED
1246 else if (GNUNET_NO == drop)
1248 /* TODO: fragment not in cache anymore, retrieve it from PSYCstore */
1252 GNUNET_CONTAINER_heap_remove_root (fragq->fragments);
1255 if (MSG_FRAG_STATE_END <= fragq->state)
1257 struct GNUNET_HashCode msg_id_hash;
1258 hash_key_from_hll (&msg_id_hash, msg_id);
1260 GNUNET_CONTAINER_multihashmap_remove (chn->recv_frags, &msg_id_hash, fragq);
1261 GNUNET_CONTAINER_heap_destroy (fragq->fragments);
1262 GNUNET_free (fragq);
1266 fragq->is_queued = GNUNET_NO;
1271 struct StateModifyClosure
1273 struct Channel *chn;
1275 struct GNUNET_HashCode msg_id_hash;
1280 store_recv_state_modify_result (void *cls, int64_t result,
1281 const char *err_msg, uint16_t err_msg_size)
1283 struct StateModifyClosure *mcls = cls;
1284 struct Channel *chn = mcls->chn;
1285 uint64_t msg_id = mcls->msg_id;
1287 struct FragmentQueue *
1288 fragq = GNUNET_CONTAINER_multihashmap_get (chn->recv_frags, &mcls->msg_id_hash);
1290 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1291 "%p GNUNET_PSYCSTORE_state_modify() returned %" PRId64 " (%.*s)\n",
1292 chn, result, err_msg_size, err_msg);
1299 fragq->state_is_modified = GNUNET_YES;
1300 if (chn->max_state_message_id < msg_id)
1301 chn->max_state_message_id = msg_id;
1302 if (chn->max_message_id < msg_id)
1303 chn->max_message_id = msg_id;
1306 fragment_queue_run (chn, msg_id, fragq, MSG_FRAG_STATE_DROP == fragq->state);
1307 GNUNET_CONTAINER_heap_remove_root (chn->recv_msgs);
1308 message_queue_run (chn);
1312 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1313 "%p GNUNET_PSYCSTORE_state_modify() failed with error %" PRId64 " (%.*s)\n",
1314 chn, result, err_msg_size, err_msg);
1315 /** @todo FIXME: handle state_modify error */
1321 * Run message queue.
1323 * Send messages in queue to client in order after a message has arrived from
1324 * multicast, according to the following:
1325 * - A message is only sent if all of its modifiers have arrived.
1326 * - A stateful message is only sent if the previous stateful message
1327 * has already been delivered to the client.
1329 * @param chn Channel.
1331 * @return Number of messages removed from queue and sent to client.
1334 message_queue_run (struct Channel *chn)
1336 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1337 "%p Running message queue.\n", chn);
1341 while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (chn->recv_msgs, NULL,
1344 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1345 "%p Processing message %" PRIu64 " in queue.\n", chn, msg_id);
1346 struct GNUNET_HashCode msg_id_hash;
1347 hash_key_from_hll (&msg_id_hash, msg_id);
1349 struct FragmentQueue *
1350 fragq = GNUNET_CONTAINER_multihashmap_get (chn->recv_frags, &msg_id_hash);
1352 if (NULL == fragq || fragq->state <= MSG_FRAG_STATE_HEADER)
1354 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1355 "%p No fragq (%p) or header not complete.\n",
1360 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1361 "%p Fragment queue entry: state: %u, state delta: "
1362 "%" PRIu64 " - %" PRIu64 " ?= %" PRIu64 "\n",
1363 chn, fragq->state, msg_id, fragq->state_delta, chn->max_state_message_id);
1365 if (MSG_FRAG_STATE_DATA <= fragq->state)
1367 /* Check if there's a missing message before the current one */
1368 if (GNUNET_PSYC_STATE_NOT_MODIFIED == fragq->state_delta)
1370 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p state NOT modified\n", chn);
1372 if (!(fragq->flags & GNUNET_PSYC_MESSAGE_ORDER_ANY)
1373 && (chn->max_message_id != msg_id - 1
1374 && chn->max_message_id != msg_id))
1376 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1377 "%p Out of order message. "
1378 "(%" PRIu64 " != %" PRIu64 " - 1)\n",
1379 chn, chn->max_message_id, msg_id);
1381 // FIXME: keep track of messages processed in this queue run,
1382 // and only stop after reaching the end
1387 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p state modified\n", chn);
1388 if (GNUNET_YES != fragq->state_is_modified)
1390 if (msg_id - fragq->state_delta != chn->max_state_message_id)
1392 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1393 "%p Out of order stateful message. "
1394 "(%" PRIu64 " - %" PRIu64 " != %" PRIu64 ")\n",
1395 chn, msg_id, fragq->state_delta, chn->max_state_message_id);
1397 // FIXME: keep track of messages processed in this queue run,
1398 // and only stop after reaching the end
1401 struct StateModifyClosure *mcls = GNUNET_malloc (sizeof (*mcls));
1403 mcls->msg_id = msg_id;
1404 mcls->msg_id_hash = msg_id_hash;
1406 /* Apply modifiers to state in PSYCstore */
1407 GNUNET_PSYCSTORE_state_modify (store, &chn->pub_key, msg_id,
1409 store_recv_state_modify_result, mcls);
1410 break; // continue after asynchronous state modify result
1413 chn->max_message_id = msg_id;
1415 fragment_queue_run (chn, msg_id, fragq, MSG_FRAG_STATE_DROP == fragq->state);
1416 GNUNET_CONTAINER_heap_remove_root (chn->recv_msgs);
1420 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1421 "%p Removed %" PRIu64 " messages from queue.\n", chn, n);
1427 * Drop message queue of a channel.
1429 * Remove all messages in the queue without sending them to clients.
1431 * @param chn Channel.
1433 * @return Number of messages removed from queue.
/*
 * Walks chn->recv_msgs from the root.  For every queued message ID the
 * matching fragment queue is looked up in chn->recv_frags (keyed by the
 * hashed ID) and run with GNUNET_YES as the last argument (presumably the
 * "drop" flag), after which the heap root is removed.
 *
 * NOTE(review): the routine progress messages are logged at WARNING level.
 */
1436 message_queue_drop (struct Channel *chn)
1438 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1439 "%p Dropping message queue.\n", chn);
/* Peek at the root of the heap until the heap is empty. */
1442 while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (chn->recv_msgs, NULL,
1445 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1446 "%p Dropping message %" PRIu64 " from queue.\n", chn, msg_id);
1447 struct GNUNET_HashCode msg_id_hash;
1448 hash_key_from_hll (&msg_id_hash, msg_id);
1450 struct FragmentQueue *
1451 fragq = GNUNET_CONTAINER_multihashmap_get (chn->recv_frags, &msg_id_hash);
/* Every message ID in the heap is expected to have a fragment queue. */
1452 GNUNET_assert (NULL != fragq);
1453 fragment_queue_run (chn, msg_id, fragq, GNUNET_YES);
1454 GNUNET_CONTAINER_heap_remove_root (chn->recv_msgs);
1457 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1458 "%p Removed %" PRIu64 " messages from queue.\n", chn, n);
1464 * Received result of GNUNET_PSYCSTORE_fragment_store().
/*
 * Result callback registered by mcast_recv_message() for its fragment store
 * request.  @a cls is the channel; the outcome is only logged.
 */
1467 store_recv_fragment_store_result (void *cls, int64_t result,
1468 const char *err_msg, uint16_t err_msg_size)
1470 struct Channel *chn = cls;
/* "%.*s" limits the printed error text to err_msg_size bytes. */
1471 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1472 "%p GNUNET_PSYCSTORE_fragment_store() returned %" PRId64 " (%.*s)\n",
1473 chn, result, err_msg_size, err_msg);
1478 * Handle incoming message fragment from multicast.
1480 * Store it using PSYCstore and send it to the clients of the channel in order.
/*
 * @param cls  The channel (struct Channel *) the fragment was received for.
 * @param mmsg The multicast fragment; its payload starts right after the
 *             multicast header.
 */
1483 mcast_recv_message (void *cls, const struct GNUNET_MULTICAST_MessageHeader *mmsg)
1485 struct Channel *chn = cls;
/* Total fragment size, converted from network byte order. */
1486 uint16_t size = ntohs (mmsg->header.size);
1488 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1489 "%p Received multicast message of size %u.\n",
/* The fragment is handed to PSYCstore before its parts are validated below. */
1492 GNUNET_PSYCSTORE_fragment_store (store, &chn->pub_key, mmsg, 0,
1493 &store_recv_fragment_store_result, chn);
1495 uint16_t first_ptype = 0, last_ptype = 0;
/* Validate the message parts in the payload and learn the first and last
   part type. */
1497 == GNUNET_PSYC_receive_check_parts (size - sizeof (*mmsg),
1498 (const char *) &mmsg[1],
1499 &first_ptype, &last_ptype))
1501 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1502 "%p Dropping incoming multicast message with invalid parts.\n",
1504 GNUNET_break_op (0);
1508 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1509 "Message parts: first: type %u, last: type %u\n",
1510 first_ptype, last_ptype);
/* Queue the fragment, then deliver whatever has become deliverable. */
1512 fragment_queue_insert (chn, mmsg, first_ptype, last_ptype);
1513 message_queue_run (chn);
1518 * Incoming request fragment from multicast for a master.
1520 * @param cls Master.
1521 * @param req The request.
/*
 * Validates the parts of an incoming multicast request and passes the
 * request on through client_send_mcast_req().
 */
1524 mcast_recv_request (void *cls,
1525 const struct GNUNET_MULTICAST_RequestHeader *req)
1527 struct Master *mst = cls;
1528 uint16_t size = ntohs (req->header.size);
/* String form of the sender's key for the log line below.
   NOTE(review): presumably heap-allocated - confirm it is freed after use. */
1530 char *str = GNUNET_CRYPTO_ecdsa_public_key_to_string (&req->member_key);
1531 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1532 "%p Received multicast request of size %u from %s.\n",
1536 uint16_t first_ptype = 0, last_ptype = 0;
/* The parts to check start right after the request header. */
1538 == GNUNET_PSYC_receive_check_parts (size - sizeof (*req),
1539 (const char *) &req[1],
1540 &first_ptype, &last_ptype))
1542 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1543 "%p Dropping incoming multicast request with invalid parts.\n",
1545 GNUNET_break_op (0);
1549 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1550 "Message parts: first: type %u, last: type %u\n",
1551 first_ptype, last_ptype);
1553 /* FIXME: in-order delivery */
1554 client_send_mcast_req (mst, req);
1559 * Response from PSYCstore with the current counter values for a channel master.
/*
 * Builds a MASTER_START_ACK carrying @a result and @a max_message_id.  When
 * the result is GNUNET_OK or GNUNET_NO the counters are copied into the
 * master and its channel, the multicast origin is started and the channel is
 * marked ready.  The error log below reports any other result.
 *
 * @param cls The master (struct Master *).
 */
1562 store_recv_master_counters (void *cls, int result, uint64_t max_fragment_id,
1563 uint64_t max_message_id, uint64_t max_group_generation,
1564 uint64_t max_state_message_id)
1566 struct Master *mst = cls;
1567 struct Channel *chn = &mst->chn;
/* The pending PSYCstore operation has completed. */
1568 chn->store_op = NULL;
1570 struct GNUNET_PSYC_CountersResultMessage res;
1571 res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK);
1572 res.header.size = htons (sizeof (res));
1573 res.result_code = htonl (result);
1574 res.max_message_id = GNUNET_htonll (max_message_id);
1576 if (GNUNET_OK == result || GNUNET_NO == result)
1578 mst->max_message_id = max_message_id;
1579 chn->max_message_id = max_message_id;
1580 chn->max_state_message_id = max_state_message_id;
1581 mst->max_group_generation = max_group_generation;
/* Start the multicast origin, resuming from the stored fragment counter. */
1583 = GNUNET_MULTICAST_origin_start (cfg, &mst->priv_key, max_fragment_id,
1584 mcast_recv_join_request,
1585 mcast_recv_replay_fragment,
1586 mcast_recv_replay_message,
1588 mcast_recv_message, chn);
1589 chn->is_ready = GNUNET_YES;
1593 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1594 "%p GNUNET_PSYCSTORE_counters_get() "
1595 "returned %d for channel %s.\n",
1596 chn, result, GNUNET_h2s (&chn->pub_key_hash));
/* Hand the acknowledgement to client_send_msg() for this channel. */
1599 client_send_msg (chn, &res.header);
1604 * Response from PSYCstore with the current counter values for a channel slave.
/*
 * Builds a SLAVE_JOIN_ACK carrying @a result and @a max_message_id.  When the
 * result is GNUNET_OK or GNUNET_NO the counters are copied into the channel
 * and the slave joins the multicast group; the stored join message is freed
 * afterwards.  The error log below reports any other result.
 *
 * @param cls The slave (struct Slave *).
 */
1607 store_recv_slave_counters (void *cls, int result, uint64_t max_fragment_id,
1608 uint64_t max_message_id, uint64_t max_group_generation,
1609 uint64_t max_state_message_id)
1611 struct Slave *slv = cls;
1612 struct Channel *chn = &slv->chn;
/* The pending PSYCstore operation has completed. */
1613 chn->store_op = NULL;
1615 struct GNUNET_PSYC_CountersResultMessage res;
1616 res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK);
1617 res.header.size = htons (sizeof (res));
1618 res.result_code = htonl (result);
1619 res.max_message_id = GNUNET_htonll (max_message_id);
1621 if (GNUNET_OK == result || GNUNET_NO == result)
1623 chn->max_message_id = max_message_id;
1624 chn->max_state_message_id = max_state_message_id;
/* NOTE(review): &slv->join_msg->header is formed before the NULL check on
   slv->join_msg further down - confirm join_msg cannot be NULL here, or
   pass NULL explicitly in that case. */
1626 = GNUNET_MULTICAST_member_join (cfg, &chn->pub_key, &slv->priv_key,
1628 slv->relay_count, slv->relays,
1629 &slv->join_msg->header,
1630 mcast_recv_join_request,
1631 mcast_recv_join_decision,
1632 mcast_recv_replay_fragment,
1633 mcast_recv_replay_message,
1634 mcast_recv_message, chn);
/* The join message is no longer kept once it was passed to multicast. */
1635 if (NULL != slv->join_msg)
1637 GNUNET_free (slv->join_msg);
1638 slv->join_msg = NULL;
1643 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1644 "%p GNUNET_PSYCSTORE_counters_get() "
1645 "returned %d for channel %s.\n",
1646 chn, result, GNUNET_h2s (&chn->pub_key_hash));
/* Hand the acknowledgement to client_send_msg() for this channel. */
1649 client_send_msg (chn, &res.header);
/*
 * Sets up the receive-side containers of a channel: a min-ordered heap and
 * the recv_frags hash map (initial size 1).
 */
1654 channel_init (struct Channel *chn)
1657 = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
1658 chn->recv_frags = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
1663 * Handle a connecting client starting a channel master.
/*
 * Derives the channel's public key and its hash from the private key in the
 * request and looks the master up in the masters map.  A new master is
 * registered and its counters are requested from PSYCstore; for a known
 * master the MASTER_START_ACK is sent to the client directly.  In both cases
 * the client is added to the channel's client list.
 *
 * NOTE(review): msg is cast to struct MasterStartRequest without a visible
 * size check, and the handler table registers this type with size 0 -
 * confirm the size is validated.
 */
1666 client_recv_master_start (void *cls, struct GNUNET_SERVER_Client *client,
1667 const struct GNUNET_MessageHeader *msg)
1669 const struct MasterStartRequest *req
1670 = (const struct MasterStartRequest *) msg;
1672 struct GNUNET_CRYPTO_EddsaPublicKey pub_key;
1673 struct GNUNET_HashCode pub_key_hash;
1675 GNUNET_CRYPTO_eddsa_key_get_public (&req->channel_key, &pub_key);
1676 GNUNET_CRYPTO_hash (&pub_key, sizeof (pub_key), &pub_key_hash);
1679 mst = GNUNET_CONTAINER_multihashmap_get (masters, &pub_key_hash);
1680 struct Channel *chn;
/* New master (presumably taken when the lookup above found nothing). */
1684 mst = GNUNET_new (struct Master);
1685 mst->policy = ntohl (req->policy);
1686 mst->priv_key = req->channel_key;
1687 mst->join_reqs = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
1690 chn->is_master = GNUNET_YES;
1691 chn->pub_key = pub_key;
1692 chn->pub_key_hash = pub_key_hash;
1695 GNUNET_CONTAINER_multihashmap_put (masters, &chn->pub_key_hash, chn,
1696 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
/* The ACK for a new master is sent from store_recv_master_counters(). */
1697 chn->store_op = GNUNET_PSYCSTORE_counters_get (store, &chn->pub_key,
1698 store_recv_master_counters, mst);
/* Known master: acknowledge right away with its current max_message_id. */
1704 struct GNUNET_PSYC_CountersResultMessage res;
1705 res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK);
1706 res.header.size = htons (sizeof (res));
1707 res.result_code = htonl (GNUNET_OK);
1708 res.max_message_id = GNUNET_htonll (mst->max_message_id);
1710 GNUNET_SERVER_notification_context_add (nc, client);
1711 GNUNET_SERVER_notification_context_unicast (nc, client, &res.header,
1715 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1716 "%p Client connected as master to channel %s.\n",
1717 mst, GNUNET_h2s (&chn->pub_key_hash));
/* Track the client on the channel and bind the channel to the connection. */
1719 struct Client *cli = GNUNET_new (struct Client);
1720 cli->client = client;
1721 GNUNET_CONTAINER_DLL_insert (chn->clients_head, chn->clients_tail, cli);
1723 GNUNET_SERVER_client_set_user_context (client, chn);
1724 GNUNET_SERVER_receive_done (client, GNUNET_OK);
1729 * Handle a connecting client joining as a channel slave.
/*
 * Looks up the slave by the hash of the channel key and of the slave's
 * public key.  A new slave is created from the request (relay list and
 * optional join message follow the fixed header), registered in the maps and
 * its counters are requested from PSYCstore.  For a known slave the
 * SLAVE_JOIN_ACK is sent to the client directly, and either the multicast
 * join is started or a stored join decision is replayed.  Finally the client
 * is added to the channel's client list.
 */
1732 client_recv_slave_join (void *cls, struct GNUNET_SERVER_Client *client,
1733 const struct GNUNET_MessageHeader *msg)
1735 const struct SlaveJoinRequest *req
1736 = (const struct SlaveJoinRequest *) msg;
1737 uint16_t req_size = ntohs (req->header.size);
1739 struct GNUNET_CRYPTO_EcdsaPublicKey slv_pub_key;
1740 struct GNUNET_HashCode pub_key_hash, slv_pub_key_hash;
1742 GNUNET_CRYPTO_ecdsa_key_get_public (&req->slave_key, &slv_pub_key);
1743 GNUNET_CRYPTO_hash (&slv_pub_key, sizeof (slv_pub_key), &slv_pub_key_hash);
1744 GNUNET_CRYPTO_hash (&req->channel_key, sizeof (req->channel_key), &pub_key_hash);
/* Two-level lookup: channel hash -> map of slaves -> slave key hash. */
1746 struct GNUNET_CONTAINER_MultiHashMap *
1747 chn_slv = GNUNET_CONTAINER_multihashmap_get (channel_slaves, &pub_key_hash);
1748 struct Slave *slv = NULL;
1749 struct Channel *chn;
1751 if (NULL != chn_slv)
1753 slv = GNUNET_CONTAINER_multihashmap_get (chn_slv, &slv_pub_key_hash);
/* New slave (presumably taken when the lookup above found nothing). */
1757 slv = GNUNET_new (struct Slave);
1758 slv->priv_key = req->slave_key;
1759 slv->pub_key = slv_pub_key;
1760 slv->pub_key_hash = slv_pub_key_hash;
1761 slv->origin = req->origin;
1762 slv->relay_count = ntohl (req->relay_count);
/* The relay list starts right after the fixed request header.
   NOTE(review): relay_count is a client-supplied 32-bit value but relay_size
   is only 16 bits wide, so the product can be truncated - validate
   relay_count against req_size before using it. */
1764 const struct GNUNET_PeerIdentity *
1765 relays = (const struct GNUNET_PeerIdentity *) &req[1];
1766 uint16_t relay_size = slv->relay_count * sizeof (*relays);
1767 uint16_t join_msg_size = 0;
1769 if (sizeof (*req) + relay_size + sizeof (struct GNUNET_MessageHeader)
/* NOTE(review): join_msg_size comes from the embedded header and is used for
   the copy below before the total is compared with req_size further down; a
   malformed request could make memcpy read past the received message. */
1772 struct GNUNET_PSYC_Message *
1773 join_msg = (struct GNUNET_PSYC_Message *) (((char *) &req[1]) + relay_size);
1774 join_msg_size = ntohs (join_msg->header.size);
1775 slv->join_msg = GNUNET_malloc (join_msg_size);
1776 memcpy (slv->join_msg, join_msg, join_msg_size);
/* The three parts must add up to exactly the announced message size.
   NOTE(review): confirm slv and slv->join_msg are released on this path. */
1778 if (sizeof (*req) + relay_size + join_msg_size != req_size)
1780 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1781 "%u + %u + %u != %u\n",
1782 (unsigned int) sizeof (*req), relay_size, join_msg_size, req_size);
1784 GNUNET_SERVER_client_disconnect (client);
1788 if (0 < slv->relay_count)
1790 slv->relays = GNUNET_malloc (relay_size);
1791 memcpy (slv->relays, &req[1], relay_size);
1795 chn->is_master = GNUNET_NO;
1796 chn->pub_key = req->channel_key;
1797 chn->pub_key_hash = pub_key_hash;
/* Create the per-channel slave map on first use. */
1800 if (NULL == chn_slv)
1802 chn_slv = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
1803 GNUNET_CONTAINER_multihashmap_put (channel_slaves, &chn->pub_key_hash, chn_slv,
1804 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1806 GNUNET_CONTAINER_multihashmap_put (chn_slv, &slv->pub_key_hash, chn,
1807 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1808 GNUNET_CONTAINER_multihashmap_put (slaves, &chn->pub_key_hash, chn,
1809 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
/* The ACK for a new slave is sent from store_recv_slave_counters(). */
1810 chn->store_op = GNUNET_PSYCSTORE_counters_get (store, &chn->pub_key,
1811 &store_recv_slave_counters, slv);
/* Known slave: acknowledge right away with the channel's max_message_id. */
1817 struct GNUNET_PSYC_CountersResultMessage res;
1818 res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK);
1819 res.header.size = htons (sizeof (res));
1820 res.result_code = htonl (GNUNET_OK);
1821 res.max_message_id = GNUNET_htonll (chn->max_message_id);
1823 GNUNET_SERVER_notification_context_add (nc, client);
1824 GNUNET_SERVER_notification_context_unicast (nc, client, &res.header,
/* No multicast membership yet: join now. */
1827 if (NULL == slv->member)
1830 = GNUNET_MULTICAST_member_join (cfg, &chn->pub_key, &slv->priv_key,
1832 slv->relay_count, slv->relays,
1833 &slv->join_msg->header,
1834 &mcast_recv_join_request,
1835 &mcast_recv_join_decision,
1836 &mcast_recv_replay_fragment,
1837 &mcast_recv_replay_message,
1838 &mcast_recv_message, chn);
1839 if (NULL != slv->join_msg)
1841 GNUNET_free (slv->join_msg);
1842 slv->join_msg = NULL;
/* Already a member with a stored join decision: replay it to this client. */
1845 else if (NULL != slv->join_dcsn)
1847 GNUNET_SERVER_notification_context_add (nc, client);
1848 GNUNET_SERVER_notification_context_unicast (nc, client,
1849 &slv->join_dcsn->header,
1854 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1855 "%p Client connected as slave to channel %s.\n",
1856 slv, GNUNET_h2s (&chn->pub_key_hash));
/* Track the client on the channel and bind the channel to the connection. */
1858 struct Client *cli = GNUNET_new (struct Client);
1859 cli->client = client;
1860 GNUNET_CONTAINER_DLL_insert (chn->clients_head, chn->clients_tail, cli);
1862 GNUNET_SERVER_client_set_user_context (client, chn);
1863 GNUNET_SERVER_receive_done (client, GNUNET_OK);
/* Closure handed to mcast_send_join_decision() for each pending request. */
1867 struct JoinDecisionClosure
/* Decision value, passed on to GNUNET_MULTICAST_join_decision(). */
1869 int32_t is_admitted;
/* Message that accompanies the decision. */
1870 struct GNUNET_MessageHeader *msg;
1875 * Iterator callback for sending join decisions to multicast.
/*
 * @param cls   The struct JoinDecisionClosure with the decision to send.
 * @param value The multicast join handle stored for the pending request.
 *
 * The decision is sent with an empty relay list (0, NULL).
 */
1878 mcast_send_join_decision (void *cls, const struct GNUNET_HashCode *pub_key_hash,
1881 struct JoinDecisionClosure *jcls = cls;
1882 struct GNUNET_MULTICAST_JoinHandle *jh = value;
1883 // FIXME: add relays
1884 GNUNET_MULTICAST_join_decision (jh, jcls->is_admitted, 0, NULL, jcls->msg);
1890 * Join decision from client.
/*
 * Takes the decision from a master's client and sends it to every pending
 * join request stored for the slave key named in the message; those
 * requests are then removed from mst->join_reqs.
 */
1893 client_recv_join_decision (void *cls, struct GNUNET_SERVER_Client *client,
1894 const struct GNUNET_MessageHeader *msg)
1896 const struct GNUNET_PSYC_JoinDecisionMessage *dcsn
1897 = (const struct GNUNET_PSYC_JoinDecisionMessage *) msg;
1898 struct Channel *chn;
1900 struct JoinDecisionClosure jcls;
/* The channel was bound to the connection when the client connected. */
1902 chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
1906 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
/* Only a master may decide on join requests.
   NOTE(review): a slave's client sending this message trips the assertion -
   consider rejecting the message instead. */
1909 GNUNET_assert (GNUNET_YES == chn->is_master);
1910 mst = (struct Master *) chn;
1911 jcls.is_admitted = ntohl (dcsn->is_admitted);
/* A message follows the fixed part only when the announced size leaves room
   for at least a message header. */
1913 = (sizeof (*dcsn) + sizeof (*jcls.msg) <= ntohs (msg->size))
1914 ? (struct GNUNET_MessageHeader *) &dcsn[1]
1917 struct GNUNET_HashCode slave_key_hash;
1918 GNUNET_CRYPTO_hash (&dcsn->slave_key, sizeof (dcsn->slave_key),
1921 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1922 "%p Got join decision (%d) from client for channel %s..\n",
1923 mst, jcls.is_admitted, GNUNET_h2s (&chn->pub_key_hash));
1924 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1925 "%p ..and slave %s.\n",
1926 mst, GNUNET_h2s (&slave_key_hash));
/* Deliver the decision to each pending request, then drop those requests. */
1928 GNUNET_CONTAINER_multihashmap_get_multiple (mst->join_reqs, &slave_key_hash,
1929 &mcast_send_join_decision, &jcls);
1930 GNUNET_CONTAINER_multihashmap_remove_all (mst->join_reqs, &slave_key_hash);
1931 GNUNET_SERVER_receive_done (client, GNUNET_OK);
1936 * Send acknowledgement to a client.
1938 * Sent after a message fragment has been passed on to multicast.
1940 * @param chn The channel struct for the client.
/*
 * @param client The client to acknowledge to.
 *
 * The acknowledgement is a bare message header of type
 * GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK, unicast through the notification
 * context.
 */
1943 send_message_ack (struct Channel *chn, struct GNUNET_SERVER_Client *client)
1945 struct GNUNET_MessageHeader res;
1946 res.size = htons (sizeof (res));
1947 res.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK);
/* The client is added to the notification context before the unicast. */
1950 GNUNET_SERVER_notification_context_add (nc, client);
1951 GNUNET_SERVER_notification_context_unicast (nc, client, &res, GNUNET_NO);
1956 * Callback for the transmit functions of multicast.
1959 transmit_notify (void *cls, size_t *data_size, void *data)
1961 struct Channel *chn = cls;
1962 struct TransmitMessage *tmit_msg = chn->tmit_head;
1964 if (NULL == tmit_msg || *data_size < tmit_msg->size)
1966 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1967 "%p transmit_notify: nothing to send.\n", chn);
1972 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1973 "%p transmit_notify: sending %u bytes.\n", chn, tmit_msg->size);
1975 *data_size = tmit_msg->size;
1976 memcpy (data, &tmit_msg[1], *data_size);
1978 int ret = (MSG_STATE_END < chn->tmit_state) ? GNUNET_NO : GNUNET_YES;
1980 if (NULL != tmit_msg->client && GNUNET_NO == tmit_msg->ack_sent)
1981 send_message_ack (chn, tmit_msg->client);
1983 GNUNET_CONTAINER_DLL_remove (chn->tmit_head, chn->tmit_tail, tmit_msg);
1984 GNUNET_free (tmit_msg);
1986 if (NULL != chn->tmit_head)
1988 transmit_message (chn);
1990 else if (GNUNET_YES == chn->is_disconnected
1991 && tmit_msg->last_ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END)
1993 /* FIXME: handle partial message (when still in_transmit) */
1994 return GNUNET_SYSERR;
2001 * Callback for the transmit functions of multicast.
/*
 * Delegates to transmit_notify() and clears the master's transmit handle
 * when that returns GNUNET_YES.
 *
 * NOTE(review): cls is used both as struct Channel * (in transmit_notify)
 * and as struct Master * here - presumably the channel is the first member
 * of struct Master; confirm.
 */
2004 master_transmit_notify (void *cls, size_t *data_size, void *data)
2006 int ret = transmit_notify (cls, data_size, data);
2008 if (GNUNET_YES == ret)
2010 struct Master *mst = cls;
2011 mst->tmit_handle = NULL;
2018 * Callback for the transmit functions of multicast.
/*
 * Delegates to transmit_notify() and clears the slave's transmit handle when
 * that returns GNUNET_YES.
 *
 * NOTE(review): cls is used both as struct Channel * (in transmit_notify)
 * and as struct Slave * here - presumably the channel is the first member of
 * struct Slave; confirm.
 */
2021 slave_transmit_notify (void *cls, size_t *data_size, void *data)
2023 int ret = transmit_notify (cls, data_size, data);
2025 if (GNUNET_YES == ret)
2027 struct Slave *slv = cls;
2028 slv->tmit_handle = NULL;
2035 * Transmit a message from a channel master to the multicast group.
/*
 * Starts a new origin-to-all transmission when no transmit handle exists;
 * the resume call below applies to an existing handle.
 */
2038 master_transmit_message (struct Master *mst)
2040 if (NULL == mst->tmit_handle)
/* New transmission, tagged with the current message ID and group generation;
   data is pulled through master_transmit_notify(). */
2043 = GNUNET_MULTICAST_origin_to_all (mst->origin, mst->max_message_id,
2044 mst->max_group_generation,
2045 master_transmit_notify, mst);
2049 GNUNET_MULTICAST_origin_to_all_resume (mst->tmit_handle);
2055 * Transmit a message from a channel slave to the multicast group.
/*
 * Starts a new member-to-origin transmission when no transmit handle exists;
 * the resume call below applies to an existing handle.
 */
2058 slave_transmit_message (struct Slave *slv)
2060 if (NULL == slv->tmit_handle)
/* New transmission, tagged with the current request ID; data is pulled
   through slave_transmit_notify(). */
2063 = GNUNET_MULTICAST_member_to_origin (slv->member, slv->max_request_id,
2064 slave_transmit_notify, slv);
2068 GNUNET_MULTICAST_member_to_origin_resume (slv->tmit_handle);
/*
 * Dispatches to the master or the slave transmit function; the channel
 * pointer is cast to the respective containing type.
 */
2074 transmit_message (struct Channel *chn)
2077 ? master_transmit_message ((struct Master *) chn)
2078 : slave_transmit_message ((struct Slave *) chn);
2083 * Queue a message from a channel master for sending to the multicast group.
/*
 * For a message starting with a method part: assigns the next message ID
 * and fills in the state_delta field of the method part (stored right behind
 * the TransmitMessage struct) according to the transmit flags.
 *
 * NOTE(review): pmeth->flags is tested without a byte order conversion while
 * state_delta is written in network byte order - confirm the byte order of
 * flags.  The progress messages are logged at WARNING level.
 */
2086 master_queue_message (struct Master *mst, struct TransmitMessage *tmit_msg)
2088 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "%p master_queue_message()\n", mst);
2090 if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == tmit_msg->first_ptype)
2092 tmit_msg->id = ++mst->max_message_id;
2093 struct GNUNET_PSYC_MessageMethod *pmeth
2094 = (struct GNUNET_PSYC_MessageMethod *) &tmit_msg[1];
/* State reset: marked with the special GNUNET_PSYC_STATE_RESET value. */
2096 if (pmeth->flags & GNUNET_PSYC_MASTER_TRANSMIT_STATE_RESET)
2098 pmeth->state_delta = GNUNET_htonll (GNUNET_PSYC_STATE_RESET);
/* State modification: the delta is the distance to the previous
   state-modifying message, which this message then becomes. */
2100 else if (pmeth->flags & GNUNET_PSYC_MASTER_TRANSMIT_STATE_MODIFY)
2102 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2103 "%p master_queue_message: state_delta=%" PRIu64 "\n",
2104 mst, tmit_msg->id - mst->max_state_message_id);
2105 pmeth->state_delta = GNUNET_htonll (tmit_msg->id
2106 - mst->max_state_message_id);
2107 mst->max_state_message_id = tmit_msg->id;
/* State left untouched by this message. */
2111 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2112 "%p master_queue_message: state not modified\n", mst);
2113 pmeth->state_delta = GNUNET_htonll (GNUNET_PSYC_STATE_NOT_MODIFIED);
2116 if (pmeth->flags & GNUNET_PSYC_MASTER_TRANSMIT_STATE_HASH)
2118 /// @todo add state_hash to PSYC header
2125 * Queue a message from a channel slave for sending to the multicast group.
/*
 * For a message starting with a method part: marks the state as not modified
 * in the method part and assigns the next request ID.
 */
2128 slave_queue_message (struct Slave *slv, struct TransmitMessage *tmit_msg)
2130 if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == tmit_msg->first_ptype)
/* The method part is stored right behind the TransmitMessage struct. */
2132 struct GNUNET_PSYC_MessageMethod *pmeth
2133 = (struct GNUNET_PSYC_MessageMethod *) &tmit_msg[1];
2134 pmeth->state_delta = GNUNET_htonll (GNUNET_PSYC_STATE_NOT_MODIFIED);
2135 tmit_msg->id = ++slv->max_request_id;
2141 * Queue PSYC message parts for sending to multicast.
2143 * @param chn Channel to send to.
2144 * @param client Client the message originates from.
2145 * @param data_size Size of @a data.
2146 * @param data Concatenated message parts.
2147 * @param first_ptype First message part type in @a data.
2148 * @param last_ptype Last message part type in @a data.
/*
 * Allocates a TransmitMessage with the data copied right behind the struct,
 * appends it to the tail of the channel's transmit queue and lets the master
 * or slave specific function fill in the remaining fields.
 */
2150 static struct TransmitMessage *
2151 queue_message (struct Channel *chn,
2152 struct GNUNET_SERVER_Client *client,
2155 uint16_t first_ptype, uint16_t last_ptype)
2157 struct TransmitMessage *
2158 tmit_msg = GNUNET_malloc (sizeof (*tmit_msg) + data_size);
2159 memcpy (&tmit_msg[1], data, data_size);
2160 tmit_msg->client = client;
2161 tmit_msg->size = data_size;
/* Snapshot of the channel's transmit state at queueing time. */
2162 tmit_msg->state = chn->tmit_state;
2163 tmit_msg->first_ptype = first_ptype;
2164 tmit_msg->last_ptype = last_ptype;
2166 /* FIXME: separate queue per message ID */
2168 GNUNET_CONTAINER_DLL_insert_tail (chn->tmit_head, chn->tmit_tail, tmit_msg);
2171 ? master_queue_message ((struct Master *) chn, tmit_msg)
2172 : slave_queue_message ((struct Slave *) chn, tmit_msg);
2178 * Cancel transmission of current message.
2180 * @param chn Channel to send to.
2181 * @param client Client the message originates from.
/*
 * Queues a bare header of type GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL, used
 * as both first and last part type, and triggers transmission.
 */
2184 transmit_cancel (struct Channel *chn, struct GNUNET_SERVER_Client *client)
2186 uint16_t type = GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL;
2188 struct GNUNET_MessageHeader msg;
2189 msg.size = htons (sizeof (msg));
2190 msg.type = htons (type);
/* queue_message() copies the header, so the local variable may go away. */
2192 queue_message (chn, client, sizeof (msg), &msg, type, type);
2193 transmit_message (chn);
2195 /* FIXME: cleanup */
2200 * Incoming message from a master or slave client.
/*
 * Checks that the channel is ready, that the payload fits into a multicast
 * fragment and that the message parts are valid.  The parts are then queued
 * and transmission is triggered.  The oversize and invalid-parts paths call
 * transmit_cancel().
 */
2203 client_recv_psyc_message (void *cls, struct GNUNET_SERVER_Client *client,
2204 const struct GNUNET_MessageHeader *msg)
2207 chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
2208 GNUNET_assert (NULL != chn);
2210 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2211 "%p Received message from client.\n", chn);
2212 GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG, msg);
/* is_ready is set once the counters have been received from PSYCstore. */
2214 if (GNUNET_YES != chn->is_ready)
2216 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2217 "%p Channel is not ready yet, disconnecting client.\n", chn);
2219 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
/* The payload is everything behind the message header. */
2223 uint16_t size = ntohs (msg->size);
2224 if (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < size - sizeof (*msg))
/* Both values are cast because "%u" expects unsigned int, while the size
   expression has type size_t. */
2226 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2227 "%p Message payload too large: %u < %u.\n",
2228 chn, (unsigned int) GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD, (unsigned int) (size - sizeof (*msg)));
2230 transmit_cancel (chn, client);
2231 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
2235 uint16_t first_ptype = 0, last_ptype = 0;
2237 == GNUNET_PSYC_receive_check_parts (size - sizeof (*msg),
2238 (const char *) &msg[1],
2239 &first_ptype, &last_ptype))
2241 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2242 "%p Received invalid message part from client.\n", chn);
2244 transmit_cancel (chn, client);
2245 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
2248 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2249 "%p Received message with first part type %u and last part type %u.\n",
2250 chn, first_ptype, last_ptype);
/* Queue the parts without the enclosing header and start transmitting. */
2252 queue_message (chn, client, size - sizeof (*msg), &msg[1],
2253 first_ptype, last_ptype);
2254 transmit_message (chn);
2255 /* FIXME: send a few ACKs even before transmit_notify is called */
2257 GNUNET_SERVER_receive_done (client, GNUNET_OK);
2262 * Received result of GNUNET_PSYCSTORE_membership_store()
/*
 * Logs the result and, when the requesting client is still known, forwards
 * it with client_send_result().
 *
 * @param cls The struct Operation created for the request.
 */
2265 store_recv_membership_store_result (void *cls, int64_t result,
2266 const char *err_msg, uint16_t err_msg_size)
2268 struct Operation *op = cls;
/* "%.*s" takes err_msg_size as the precision and err_msg as the string; with
   "%.s" the size would be consumed as the string pointer. */
2269 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2270 "%p GNUNET_PSYCSTORE_membership_store() returned %" PRId64 " (%.*s)\n",
2271 op->chn, result, err_msg_size, err_msg);
2273 if (NULL != op->client)
2274 client_send_result (op->client, op->op_id, result, err_msg, err_msg_size);
2280 * Client requests to add/remove a slave in the membership database.
/*
 * Registers an operation for the request and passes the membership change
 * on to PSYCstore; the outcome arrives in
 * store_recv_membership_store_result().
 *
 * NOTE(review): msg is cast to the request struct without a visible size
 * check, and req->did_join is used without a byte order conversion - confirm
 * both are intended.
 */
2283 client_recv_membership_store (void *cls, struct GNUNET_SERVER_Client *client,
2284 const struct GNUNET_MessageHeader *msg)
2287 chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
2288 GNUNET_assert (NULL != chn);
2290 const struct ChannelMembershipStoreRequest *
2291 req = (const struct ChannelMembershipStoreRequest *) msg;
/* op_id is stored as received, without byte order conversion. */
2293 struct Operation *op = op_add (chn, client, req->op_id, 0);
2295 uint64_t announced_at = GNUNET_ntohll (req->announced_at);
2296 uint64_t effective_since = GNUNET_ntohll (req->effective_since);
2297 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2298 "%p Received membership store request from client.\n", chn);
2299 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2300 "%p did_join: %u, announced_at: %" PRIu64 ", effective_since: %" PRIu64 "\n",
2301 chn, req->did_join, announced_at, effective_since);
2303 GNUNET_PSYCSTORE_membership_store (store, &chn->pub_key, &req->slave_key,
2304 req->did_join, announced_at, effective_since,
2305 0, /* FIXME: group_generation */
2306 &store_recv_membership_store_result, op);
2307 GNUNET_SERVER_receive_done (client, GNUNET_OK);
2312 * Received a fragment for GNUNET_PSYCSTORE_fragment_get(),
2313 * in response to a history request from a client.
/*
 * Wraps a stored multicast fragment into a HISTORY_RESULT message: the
 * operation result header is followed by a PSYC message header initialised
 * from @a mmsg with the HISTORIC flag added.
 *
 * @param cls The struct Operation created for the history request.
 */
2316 store_recv_fragment_history (void *cls,
2317 struct GNUNET_MULTICAST_MessageHeader *mmsg,
2318 enum GNUNET_PSYCSTORE_MessageFlags flags)
2320 struct Operation *op = cls;
2321 if (NULL == op->client)
2322 { /* Requesting client already disconnected. */
2325 struct Channel *chn = op->chn;
/* The PSYC message replaces the multicast header with a PSYC header. */
2327 struct GNUNET_PSYC_MessageHeader *pmsg;
2328 uint16_t msize = ntohs (mmsg->header.size);
2329 uint16_t psize = sizeof (*pmsg) + msize - sizeof (*mmsg);
2331 struct GNUNET_OperationResultMessage *
2332 res = GNUNET_malloc (sizeof (*res) + psize);
2333 res->header.size = htons (sizeof (*res) + psize);
2334 res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_HISTORY_RESULT);
2335 res->op_id = op->op_id;
2336 res->result_code = GNUNET_htonll (GNUNET_OK);
/* pmsg points directly into the result buffer, so the initialised message is
   already in place.  The former memcpy from pmsg to &res[1] copied the
   buffer onto itself, which is an overlapping copy, and was dropped. */
2338 pmsg = (struct GNUNET_PSYC_MessageHeader *) &res[1];
2339 GNUNET_PSYC_message_header_init (pmsg, mmsg, flags | GNUNET_PSYC_MESSAGE_HISTORIC);
2342 /** @todo FIXME: send only to requesting client */
2343 client_send_msg (chn, &res->header);
2349 * Received the result of GNUNET_PSYCSTORE_fragment_get(),
2350 * in response to a history request from a client.
/*
 * Logs the final result of a history request and forwards it to the
 * requesting client with client_send_result().
 *
 * @param cls The struct Operation created for the history request.
 */
2353 store_recv_fragment_history_result (void *cls, int64_t result,
2354 const char *err_msg, uint16_t err_msg_size)
2356 struct Operation *op = cls;
2357 if (NULL == op->client)
2358 { /* Requesting client already disconnected. */
/* op_id is kept in network byte order and converted only for logging. */
2362 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2363 "%p History replay #%" PRIu64 ": "
2364 "PSYCSTORE returned %" PRId64 " (%.*s)\n",
2365 op->chn, GNUNET_ntohll (op->op_id), result, err_msg_size, err_msg);
2367 if (op->flags & GNUNET_PSYC_HISTORY_REPLAY_REMOTE)
2369 /** @todo Multicast replay request for messages not found locally. */
2372 client_send_result (op->client, op->op_id, result, err_msg, err_msg_size);
2378 * Client requests channel history.
/*
 * Validates the method prefix that follows the fixed request and asks
 * PSYCstore either for a range of message IDs (message_limit is 0) or for
 * the latest message_limit messages.  Fragments arrive in
 * store_recv_fragment_history(), the final result in
 * store_recv_fragment_history_result().
 */
2381 client_recv_history_replay (void *cls, struct GNUNET_SERVER_Client *client,
2382 const struct GNUNET_MessageHeader *msg)
2385 chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
2386 GNUNET_assert (NULL != chn);
2388 const struct GNUNET_PSYC_HistoryRequestMessage *
2389 req = (const struct GNUNET_PSYC_HistoryRequestMessage *) msg;
2390 uint16_t size = ntohs (msg->size);
2391 const char *method_prefix = (const char *) &req[1];
/* The prefix needs at least its terminating NUL, which must be the last byte
   of the message. */
2393 if (size < sizeof (*req) + 1
2394 || '\0' != method_prefix[size - sizeof (*req) - 1])
/* The size_t expression is cast because "%u" expects unsigned int. */
2396 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2397 "%p History replay #%" PRIu64 ": "
2398 "invalid method prefix. size: %u < %u?\n",
2399 chn, GNUNET_ntohll (req->op_id), size, (unsigned int) (sizeof (*req) + 1));
2401 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
2405 struct Operation *op = op_add (chn, client, req->op_id, ntohl (req->flags));
/* Zero reads the same in either byte order, so no conversion is needed for
   this comparison. */
2407 if (0 == req->message_limit)
2408 GNUNET_PSYCSTORE_message_get (store, &chn->pub_key, NULL,
2409 GNUNET_ntohll (req->start_message_id),
2410 GNUNET_ntohll (req->end_message_id),
2412 &store_recv_fragment_history,
2413 &store_recv_fragment_history_result, op);
2415 GNUNET_PSYCSTORE_message_get_latest (store, &chn->pub_key, NULL,
2416 GNUNET_ntohll (req->message_limit),
2418 &store_recv_fragment_history,
2419 &store_recv_fragment_history_result,
2422 GNUNET_SERVER_receive_done (client, GNUNET_OK);
2427 * Received state var from PSYCstore, send it to client.
/*
 * Wraps a state variable (or a continuation of its value) into a
 * STATE_RESULT message and unicasts it to the requesting client.
 *
 * @param cls        The struct Operation created for the state request.
 * @param name       Variable name, or NULL when @a value continues the
 *                   previous variable.
 * @param value      Value bytes.
 * @param value_size Number of bytes in @a value.
 *
 * NOTE(review): the total size is written with htons() although value_size
 * is 32 bits wide - confirm callers keep the sum within 16 bits.
 */
2430 store_recv_state_var (void *cls, const char *name,
2431 const void *value, uint32_t value_size)
2433 struct Operation *op = cls;
2434 struct GNUNET_OperationResultMessage *res;
/* name is NULL for continuations (see below), so it is not passed to "%s"
   directly. */
2436 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2437 "%p state_get #%" PRIu64 " - received var from PSYCstore: %s\n",
2438 op->chn, GNUNET_ntohll (op->op_id), (NULL != name) ? name : "(null)");
2440 if (NULL != name) /* First part */
/* Layout: result header, modifier header, name (with NUL), value. */
2442 uint16_t name_size = strnlen (name, GNUNET_PSYC_MODIFIER_MAX_PAYLOAD) + 1;
2443 struct GNUNET_PSYC_MessageModifier *mod;
2444 res = GNUNET_malloc (sizeof (*res) + sizeof (*mod) + name_size + value_size);
2445 res->header.size = htons (sizeof (*res) + sizeof (*mod) + name_size + value_size);
2446 res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_STATE_RESULT);
2447 res->op_id = op->op_id;
2449 mod = (struct GNUNET_PSYC_MessageModifier *) &res[1];
2450 mod->header.size = htons (sizeof (*mod) + name_size + value_size);
2451 mod->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER);
2452 mod->name_size = htons (name_size);
2453 mod->value_size = htonl (value_size);
2454 mod->oper = htons (GNUNET_ENV_OP_ASSIGN);
2455 memcpy (&mod[1], name, name_size);
2456 memcpy (((char *) &mod[1]) + name_size, value, value_size);
2458 else /* Continuation */
/* Layout: result header, plain message header, value. */
2460 struct GNUNET_MessageHeader *mod;
2461 res = GNUNET_malloc (sizeof (*res) + sizeof (*mod) + value_size);
2462 res->header.size = htons (sizeof (*res) + sizeof (*mod) + value_size);
2463 res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_STATE_RESULT);
2464 res->op_id = op->op_id;
2466 mod = (struct GNUNET_MessageHeader *) &res[1];
2467 mod->size = htons (sizeof (*mod) + value_size);
2468 mod->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT);
2469 memcpy (&mod[1], value, value_size);
2472 // FIXME: client might have been disconnected
2473 GNUNET_SERVER_notification_context_add (nc, op->client);
2474 GNUNET_SERVER_notification_context_unicast (nc, op->client, &res->header,
2481 * Received result of GNUNET_PSYCSTORE_state_get()
2482 * or GNUNET_PSYCSTORE_state_get_prefix()
/*
 * Logs the final result of a state request and forwards it to the client
 * with client_send_result().
 *
 * @param cls The struct Operation created for the state request.
 *
 * NOTE(review): unlike the history result callback, op->client is not
 * checked for NULL before use (see the FIXME below).
 */
2485 store_recv_state_result (void *cls, int64_t result,
2486 const char *err_msg, uint16_t err_msg_size)
2488 struct Operation *op = cls;
2489 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2490 "%p state_get #%" PRIu64 ": "
2491 "PSYCSTORE returned %" PRId64 " (%.*s)\n",
2492 op->chn, GNUNET_ntohll (op->op_id), result, err_msg_size, err_msg);
2494 // FIXME: client might have been disconnected
2495 client_send_result (op->client, op->op_id, result, err_msg, err_msg_size);
2501 * Client requests best matching state variable from PSYCstore.
2504 client_recv_state_get (void *cls, struct GNUNET_SERVER_Client *client,
2505 const struct GNUNET_MessageHeader *msg)
2508 chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
2509 GNUNET_assert (NULL != chn);
2511 const struct StateRequest *
2512 req = (const struct StateRequest *) msg;
2514 uint16_t name_size = ntohs (req->header.size) - sizeof (*req);
2515 const char *name = (const char *) &req[1];
2516 if (0 == name_size || '\0' != name[name_size - 1])
2519 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
2523 struct Operation *op = op_add (chn, client, req->op_id, 0);
2524 GNUNET_PSYCSTORE_state_get (store, &chn->pub_key, name,
2525 &store_recv_state_var,
2526 &store_recv_state_result, op);
2527 GNUNET_SERVER_receive_done (client, GNUNET_OK);
2532 * Client requests state variables with a given prefix from PSYCstore.
2535 client_recv_state_get_prefix (void *cls, struct GNUNET_SERVER_Client *client,
2536 const struct GNUNET_MessageHeader *msg)
2539 chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
2540 GNUNET_assert (NULL != chn);
2542 const struct StateRequest *
2543 req = (const struct StateRequest *) msg;
2545 uint16_t name_size = ntohs (req->header.size) - sizeof (*req);
2546 const char *name = (const char *) &req[1];
2547 if (0 == name_size || '\0' != name[name_size - 1])
2550 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
2554 struct Operation *op = op_add (chn, client, req->op_id, 0);
2555 GNUNET_PSYCSTORE_state_get_prefix (store, &chn->pub_key, name,
2556 &store_recv_state_var,
2557 &store_recv_state_result, op);
2558 GNUNET_SERVER_receive_done (client, GNUNET_OK);
2562 static const struct GNUNET_SERVER_MessageHandler server_handlers[] = {
2563 { &client_recv_master_start, NULL,
2564 GNUNET_MESSAGE_TYPE_PSYC_MASTER_START, 0 },
2566 { &client_recv_slave_join, NULL,
2567 GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN, 0 },
2569 { &client_recv_join_decision, NULL,
2570 GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION, 0 },
2572 { &client_recv_psyc_message, NULL,
2573 GNUNET_MESSAGE_TYPE_PSYC_MESSAGE, 0 },
2575 { &client_recv_membership_store, NULL,
2576 GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_MEMBERSHIP_STORE, 0 },
2578 { &client_recv_history_replay, NULL,
2579 GNUNET_MESSAGE_TYPE_PSYC_HISTORY_REPLAY, 0 },
2581 { &client_recv_state_get, NULL,
2582 GNUNET_MESSAGE_TYPE_PSYC_STATE_GET, 0 },
2584 { &client_recv_state_get_prefix, NULL,
2585 GNUNET_MESSAGE_TYPE_PSYC_STATE_GET_PREFIX, 0 },
2587 { NULL, NULL, 0, 0 }
2592 * Initialize the PSYC service.
2594 * @param cls Closure.
2595 * @param server The initialized server.
2596 * @param c Configuration to use.
/*
 * Service start-up routine; main() passes it to GNUNET_SERVICE_run().
 * Sets up the file-level handles and maps and registers the client
 * message handlers.  The statements are order-dependent (the handles are
 * created before clients can be served), so keep the sequence intact.
 */
2599 run (void *cls, struct GNUNET_SERVER_Handle *server,
2600 const struct GNUNET_CONFIGURATION_Handle *c)
/* NOTE(review): cfg is read below, but its assignment from `c` is not
 * visible in this excerpt -- confirm it is set before these calls. */
2603 store = GNUNET_PSYCSTORE_connect (cfg);
2604 stats = GNUNET_STATISTICS_create ("psyc", cfg);
/* Lookup tables of the service.  masters and slaves are created with
 * GNUNET_YES as second argument, channel_slaves and recv_cache with
 * GNUNET_NO; see the multihashmap documentation for the meaning of that
 * flag before changing it. */
2605 masters = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
2606 slaves = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
2607 channel_slaves = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
2608 recv_cache = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
/* Notification context used to send messages to clients. */
2609 nc = GNUNET_SERVER_notification_context_create (server, 1);
/* Hook up the dispatch table and the per-client disconnect callback. */
2610 GNUNET_SERVER_add_handlers (server, server_handlers);
2611 GNUNET_SERVER_disconnect_notify (server, &client_disconnect, NULL);
/* shutdown_task is scheduled with an unlimited delay.
 * NOTE(review): presumably it therefore only runs when the scheduler is
 * shut down -- confirm against the scheduler documentation. */
2612 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL,
2613 &shutdown_task, NULL);
2618 * The main function for the service.
2620 * @param argc number of arguments from the command line
2621 * @param argv command line arguments
2622 * @return 0 ok, 1 on error
2625 main (int argc, char *const *argv)
2627 return (GNUNET_OK ==
2628 GNUNET_SERVICE_run (argc, argv, "psyc",
2629 GNUNET_SERVICE_OPTION_NONE,
2630 &run, NULL)) ? 0 : 1;
2633 /* end of gnunet-service-psyc.c */