/*
 * This file is part of GNUnet
 * (C) 2013 Christian Grothoff (and other contributing authors)
 *
 * GNUnet is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published
 * by the Free Software Foundation; either version 3, or (at your
 * option) any later version.
 *
 * GNUnet is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with GNUnet; see the file COPYING. If not, write to the
 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
 * Boston, MA 02111-1307, USA.
 */

/**
 * @file psyc/gnunet-service-psyc.c
 * @author Gabor X Toth
 */
30 #include "gnunet_util_lib.h"
31 #include "gnunet_constants.h"
32 #include "gnunet_protocols.h"
33 #include "gnunet_statistics_service.h"
34 #include "gnunet_multicast_service.h"
35 #include "gnunet_psycstore_service.h"
36 #include "gnunet_psyc_service.h"
37 #include "gnunet_psyc_util_lib.h"
/**
 * Handle to our current configuration.
 */
static const struct GNUNET_CONFIGURATION_Handle *cfg;

/**
 * Handle to the statistics service.
 */
static struct GNUNET_STATISTICS_Handle *stats;

/**
 * Notification context, simplifies client broadcasts.
 */
static struct GNUNET_SERVER_NotificationContext *nc;

/**
 * Handle to the PSYCstore.
 */
static struct GNUNET_PSYCSTORE_Handle *store;

/**
 * All connected masters.
 * Channel's pub_key_hash -> struct Master
 */
static struct GNUNET_CONTAINER_MultiHashMap *masters;

/**
 * All connected slaves.
 * Channel's pub_key_hash -> struct Slave
 */
static struct GNUNET_CONTAINER_MultiHashMap *slaves;

/**
 * Connected slaves per channel.
 * Channel's pub_key_hash -> hash of the slave's pub_key -> struct Slave
 */
static struct GNUNET_CONTAINER_MultiHashMap *channel_slaves;
/*
 * NOTE(review): struct TransmitMessage is one entry of a channel's
 * transmission queue (the channel context keeps tmit_head / tmit_tail
 * pointers of this type).  Entries are doubly linked through prev / next and
 * remember the client they came from.
 * The remaining text describes an ID, a state (enum MessageState) and an
 * "ACK already sent" flag, but the corresponding member declarations are not
 * present in this copy; the message payload follows the struct in memory.
 * TODO: restore the missing declarations from the upstream file.
 */
81 * Message in the transmission queue.
83 struct TransmitMessage
85 struct TransmitMessage *prev;
86 struct TransmitMessage *next;
88 struct GNUNET_SERVER_Client *client;
91 * ID assigned to the message.
101 * @see enum MessageState
106 * Whether a message ACK has already been sent to the client.
107 * #GNUNET_YES or #GNUNET_NO
111 /* Followed by message */
/**
 * Cache for received message fragments.
 * Message fragments are only sent to clients after all modifiers arrived.
 *
 * chan_key -> MultiHashMap chan_msgs
 * (chan_msgs in turn maps fragment_id -> struct RecvCacheEntry)
 */
static struct GNUNET_CONTAINER_MultiHashMap *recv_cache;
/*
 * NOTE(review): struct RecvCacheEntry is the value type of the per-channel
 * chan_msgs map stored in recv_cache; mmsg points to a heap copy of the
 * cached multicast fragment.
 * fragment_queue_insert() and fragment_queue_run() also read and write a
 * ref_count member of this struct, whose declaration is not present in this
 * copy -- TODO: restore it from the upstream file.
 */
125 * Entry in the chan_msgs hashmap of @a recv_cache:
126 * fragment_id -> RecvCacheEntry
128 struct RecvCacheEntry
130 struct GNUNET_MULTICAST_MessageHeader *mmsg;
/*
 * NOTE(review): this span describes struct FragmentQueue, the per-message
 * bookkeeping kept in a channel's recv_frags map (message_id -> queue).
 * Visible members: the heap of fragment IDs, header_size and state_delta.
 * The struct header itself and the declarations for the members documented as
 * "total size", "flags", "receive state" and "is queued" are not present in
 * this copy, although the code elsewhere uses fragq->size, fragq->flags,
 * fragq->state and fragq->is_queued -- TODO: restore them from upstream.
 */
136 * Entry in the @a recv_frags hash map of a @a Channel.
137 * message_id -> FragmentQueue
142 * Fragment IDs stored in @a recv_cache.
144 struct GNUNET_CONTAINER_Heap *fragments;
147 * Total size of received fragments.
152 * Total size of received header fragments (METHOD & MODIFIERs)
154 uint64_t header_size;
157 * The @a state_delta field from struct GNUNET_PSYC_MessageMethod.
159 uint64_t state_delta;
162 * The @a flags field from struct GNUNET_PSYC_MessageMethod.
167 * Receive state of message.
169 * @see MessageFragmentState
174 * Is the message queued for delivery to the client?
175 * i.e. added to the recv_msgs queue
/**
 * List of connected clients.
 *
 * Node of the doubly linked list anchored at a channel's
 * clients_head / clients_tail.
 */
struct ClientListItem
{
  /** Previous item in the list. */
  struct ClientListItem *prev;

  /** Next item in the list. */
  struct ClientListItem *next;

  /** The connected client this item stands for. */
  struct GNUNET_SERVER_Client *client;
};
/*
 * NOTE(review): this span is the body of struct Channel, the state shared by
 * channel masters and slaves: the list of connected clients, the transmit
 * queue, the pending PSYCstore operation, the receive bookkeeping
 * (recv_frags / recv_msgs), the channel key with its hash, and the message
 * ID counters.
 * The "struct Channel" header line and the declarations of three small
 * members are not present in this copy: the one documented with
 * "@see enum MessageState", plus is_master and is_ready, both of which are
 * used by code elsewhere in this file.  TODO: restore them from upstream.
 */
193 * Common part of the client context for both a channel master and slave.
197 struct ClientListItem *clients_head;
198 struct ClientListItem *clients_tail;
200 struct TransmitMessage *tmit_head;
201 struct TransmitMessage *tmit_tail;
204 * Current PSYCstore operation.
206 struct GNUNET_PSYCSTORE_OperationHandle *store_op;
209 * Received fragments not yet sent to the client.
210 * message_id -> FragmentQueue
212 struct GNUNET_CONTAINER_MultiHashMap *recv_frags;
215 * Received message IDs not yet sent to the client.
217 struct GNUNET_CONTAINER_Heap *recv_msgs;
220 * Public key of the channel.
222 struct GNUNET_CRYPTO_EddsaPublicKey pub_key;
225 * Hash of @a pub_key.
227 struct GNUNET_HashCode pub_key_hash;
230 * Last message ID sent to the client.
231 * 0 if there is no such message.
233 uint64_t max_message_id;
236 * ID of the last stateful message, where the state operations has been
237 * processed and saved to PSYCstore and which has been sent to the client.
238 * 0 if there is no such message.
240 uint64_t max_state_message_id;
243 * Expected value size for the modifier being received from the PSYC service.
245 uint32_t tmit_mod_value_size_expected;
248 * Actual value size for the modifier being received from the PSYC service.
250 uint32_t tmit_mod_value_size;
253 * @see enum MessageState
258 * Is this a channel master (#GNUNET_YES), or slave (#GNUNET_NO)?
263 * Is this channel ready to receive messages from client?
264 * #GNUNET_YES or #GNUNET_NO
269 * Is the client disconnected?
270 * #GNUNET_YES or #GNUNET_NO
272 uint8_t is_disconnected;
277 * Client context for a channel master.
282 * Channel struct common for Master and Slave
287 * Private key of the channel.
289 struct GNUNET_CRYPTO_EddsaPrivateKey priv_key;
292 * Handle for the multicast origin.
294 struct GNUNET_MULTICAST_Origin *origin;
297 * Transmit handle for multicast.
299 struct GNUNET_MULTICAST_OriginTransmitHandle *tmit_handle;
302 * Incoming join requests from multicast.
303 * member_key -> struct GNUNET_MULTICAST_JoinHandle *
305 struct GNUNET_CONTAINER_MultiHashMap *join_reqs;
308 * Last message ID transmitted to this channel.
310 * Incremented before sending a message, thus the message_id in messages sent
313 uint64_t max_message_id;
316 * ID of the last message with state operations transmitted to the channel.
317 * 0 if there is no such message.
319 uint64_t max_state_message_id;
322 * Maximum group generation transmitted to the channel.
324 uint64_t max_group_generation;
327 * @see enum GNUNET_PSYC_Policy
329 enum GNUNET_PSYC_Policy policy;
334 * Client context for a channel slave.
339 * Channel struct common for Master and Slave
344 * Private key of the slave.
346 struct GNUNET_CRYPTO_EcdsaPrivateKey priv_key;
349 * Public key of the slave.
351 struct GNUNET_CRYPTO_EcdsaPublicKey pub_key;
354 * Hash of @a pub_key.
356 struct GNUNET_HashCode pub_key_hash;
359 * Handle for the multicast member.
361 struct GNUNET_MULTICAST_Member *member;
364 * Transmit handle for multicast.
366 struct GNUNET_MULTICAST_MemberTransmitHandle *tmit_handle;
369 * Peer identity of the origin.
371 struct GNUNET_PeerIdentity origin;
374 * Number of items in @a relays.
376 uint32_t relay_count;
379 * Relays that multicast can use to connect.
381 struct GNUNET_PeerIdentity *relays;
384 * Join request to be transmitted to the master on join.
386 struct GNUNET_PSYC_Message *join_msg;
389 * Join decision received from multicast.
391 struct GNUNET_PSYC_JoinDecisionMessage *join_dcsn;
394 * Maximum request ID for this channel.
396 uint64_t max_request_id;
/*
 * NOTE(review): struct OperationClosure carries at least the client an
 * operation belongs to.  Any further members and the closing of the struct
 * are not present in this copy -- TODO: restore them from upstream.
 */
400 struct OperationClosure
402 struct GNUNET_SERVER_Client *client;
/*
 * NOTE(review): forward declarations for transmit_message() and
 * message_queue_drop(), both of which are called before their definitions.
 * The return-type lines are not present in this copy; message_queue_drop()
 * is documented further down as returning the number of removed messages.
 * TODO: restore the full prototypes from upstream.
 */
409 transmit_message (struct Channel *chn);
413 message_queue_drop (struct Channel *chn);
417 * Task run during shutdown.
423 shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
427 GNUNET_SERVER_notification_context_destroy (nc);
432 GNUNET_STATISTICS_destroy (stats, GNUNET_YES);
439 * Clean up master data structures after a client disconnected.
442 cleanup_master (struct Master *mst)
444 struct Channel *chn = &mst->chn;
446 if (NULL != mst->origin)
447 GNUNET_MULTICAST_origin_stop (mst->origin, NULL, NULL); // FIXME
448 GNUNET_CONTAINER_multihashmap_destroy (mst->join_reqs);
449 GNUNET_CONTAINER_multihashmap_remove (masters, &chn->pub_key_hash, chn);
454 * Clean up slave data structures after a client disconnected.
457 cleanup_slave (struct Slave *slv)
459 struct Channel *chn = &slv->chn;
460 struct GNUNET_CONTAINER_MultiHashMap *
461 chn_slv = GNUNET_CONTAINER_multihashmap_get (channel_slaves,
463 GNUNET_assert (NULL != chn_slv);
464 GNUNET_CONTAINER_multihashmap_remove (chn_slv, &slv->pub_key_hash, slv);
466 if (0 == GNUNET_CONTAINER_multihashmap_size (chn_slv))
468 GNUNET_CONTAINER_multihashmap_remove (channel_slaves, &chn->pub_key_hash,
470 GNUNET_CONTAINER_multihashmap_destroy (chn_slv);
472 GNUNET_CONTAINER_multihashmap_remove (slaves, &chn->pub_key_hash, slv);
474 if (NULL != slv->join_msg)
476 GNUNET_free (slv->join_msg);
477 slv->join_msg = NULL;
479 if (NULL != slv->relays)
481 GNUNET_free (slv->relays);
484 if (NULL != slv->member)
486 GNUNET_MULTICAST_member_part (slv->member, NULL, NULL); // FIXME
489 GNUNET_CONTAINER_multihashmap_remove (slaves, &chn->pub_key_hash, chn);
/*
 * NOTE(review): cleanup_channel() tears a channel down: it drops the queue of
 * received messages, removes the channel's entries from recv_cache, cancels a
 * pending PSYCstore operation, and then dispatches to cleanup_master() or
 * cleanup_slave() depending on chn->is_master.
 * The return type, braces and the tail of the function are not present in
 * this copy, so whether the channel itself is released here cannot be
 * confirmed -- TODO: check upstream.
 * Nothing visible here destroys the inner per-channel map that recv_cache
 * pointed to; possibly a leak -- confirm against upstream.
 */
494 * Clean up channel data structures after a client disconnected.
497 cleanup_channel (struct Channel *chn)
499 message_queue_drop (chn);
500 GNUNET_CONTAINER_multihashmap_remove_all (recv_cache, &chn->pub_key_hash);
502 if (NULL != chn->store_op)
504 GNUNET_PSYCSTORE_operation_cancel (chn->store_op);
505 chn->store_op = NULL;
508 (GNUNET_YES == chn->is_master)
509 ? cleanup_master ((struct Master *) chn)
510 : cleanup_slave ((struct Slave *) chn);
/*
 * NOTE(review): client_disconnect() is the handler for a disconnecting
 * client.  From the visible lines it:
 *  - looks up the struct Channel stored as the client's user context and
 *    logs a debug message when there is none;
 *  - logs which channel (master or slave) the client left;
 *  - walks the channel's client list and unlinks the item whose client
 *    matches;
 *  - once the list is empty, pushes still-pending outgoing messages to
 *    multicast via transmit_message() and/or calls cleanup_channel().
 * Local declarations, the loop header, early returns and the exact branching
 * around transmit_message() / cleanup_channel() are not present in this copy
 * -- TODO: restore them from upstream before reasoning about ordering.
 */
516 * Called whenever a client is disconnected.
517 * Frees our resources associated with that client.
519 * @param cls Closure.
520 * @param client Identification of the client.
523 client_disconnect (void *cls, struct GNUNET_SERVER_Client *client)
529 chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
533 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
534 "%p User context is NULL in client_disconnect()\n", chn);
539 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
540 "%p Client (%s) disconnected from channel %s\n",
541 chn, (GNUNET_YES == chn->is_master) ? "master" : "slave",
542 GNUNET_h2s (&chn->pub_key_hash));
544 struct ClientListItem *cli = chn->clients_head;
547 if (cli->client == client)
549 GNUNET_CONTAINER_DLL_remove (chn->clients_head, chn->clients_tail, cli);
556 if (NULL == chn->clients_head)
557 { /* Last client disconnected. */
558 if (NULL != chn->tmit_head)
559 { /* Send pending messages to multicast before cleanup. */
560 transmit_message (chn);
564 cleanup_channel (chn);
571 * Send message to all clients connected to the channel.
574 client_send_msg (const struct Channel *chn,
575 const struct GNUNET_MessageHeader *msg)
577 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
578 "%p Sending message to clients.\n", chn);
580 struct ClientListItem *cli = chn->clients_head;
583 GNUNET_SERVER_notification_context_add (nc, cli->client);
584 GNUNET_SERVER_notification_context_unicast (nc, cli->client, msg, GNUNET_NO);
/*
 * NOTE(review): client_send_result() builds a
 * GNUNET_MESSAGE_TYPE_PSYC_RESULT_CODE message (struct OperationResult
 * followed by an optional, NUL-terminated error string) and sends it to one
 * client through the notification context.
 * The declaration of err_size, any NULL guard around err_msg, the line that
 * stores op_id, and the tail of the function are not present in this copy.
 * Points to confirm against upstream:
 *  - err_msg is documented as possibly NULL, yet the visible strnlen() call
 *    and the "%s" in the debug log use it directly;
 *  - "result_code + INT64_MAX + 1" is evaluated in signed 64-bit arithmetic
 *    and overflows for positive result codes; an unsigned cast before the
 *    addition would avoid that;
 *  - the 16-bit size field limits sizeof (struct OperationResult) + err_size;
 *  - no release of res is visible after it has been sent.
 */
591 * Send a result code back to the client.
594 * Client that should receive the result code.
598 * Operation ID in network byte order.
600 * Error message to include (or NULL for none).
603 client_send_result (struct GNUNET_SERVER_Client *client, uint64_t op_id,
604 int64_t result_code, const char *err_msg)
606 struct OperationResult *res;
610 err_size = strnlen (err_msg,
611 GNUNET_SERVER_MAX_MESSAGE_SIZE - sizeof (*res)) + 1;
612 res = GNUNET_malloc (sizeof (struct OperationResult) + err_size);
613 res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_RESULT_CODE);
614 res->header.size = htons (sizeof (struct OperationResult) + err_size);
615 res->result_code = GNUNET_htonll (result_code + INT64_MAX + 1);
619 memcpy (&res[1], err_msg, err_size);
620 ((char *) &res[1])[err_size - 1] = '\0';
622 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
623 "%p Sending result to client for operation #%" PRIu64 ": "
624 "%" PRId64 " (%s)\n",
625 client, GNUNET_ntohll (op_id), result_code, err_msg);
627 GNUNET_SERVER_notification_context_add (nc, client);
628 GNUNET_SERVER_notification_context_unicast (nc, client, &res->header,
635 * Closure for join_mem_test_cb()
637 struct JoinMemTestClosure
639 struct GNUNET_CRYPTO_EcdsaPublicKey slave_key;
641 struct GNUNET_MULTICAST_JoinHandle *jh;
642 struct GNUNET_PSYC_JoinRequestMessage *join_msg;
647 * Membership test result callback used for join requests.
650 join_mem_test_cb (void *cls, int64_t result, const char *err_msg)
652 struct JoinMemTestClosure *jcls = cls;
654 if (GNUNET_NO == result && GNUNET_YES == jcls->chn->is_master)
655 { /* Pass on join request to client if this is a master channel */
656 struct Master *mst = (struct Master *) jcls->chn;
657 struct GNUNET_HashCode slave_key_hash;
658 GNUNET_CRYPTO_hash (&jcls->slave_key, sizeof (jcls->slave_key),
660 GNUNET_CONTAINER_multihashmap_put (mst->join_reqs, &slave_key_hash, jcls->jh,
661 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
662 client_send_msg (jcls->chn, &jcls->join_msg->header);
667 GNUNET_MULTICAST_join_decision (jcls->jh, result, 0, NULL, NULL);
669 GNUNET_free (jcls->join_msg);
675 * Incoming join request from multicast.
678 mcast_recv_join_request (void *cls,
679 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
680 const struct GNUNET_MessageHeader *join_msg,
681 struct GNUNET_MULTICAST_JoinHandle *jh)
683 struct Channel *chn = cls;
684 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p Got join request.\n", chn);
686 uint16_t join_msg_size = 0;
687 if (NULL != join_msg)
689 if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE == ntohs (join_msg->type))
691 join_msg_size = ntohs (join_msg->size);
695 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
696 "%p Got join message with invalid type %u.\n",
697 chn, ntohs (join_msg->type));
701 struct GNUNET_PSYC_JoinRequestMessage *
702 req = GNUNET_malloc (sizeof (*req) + join_msg_size);
703 req->header.size = htons (sizeof (*req) + join_msg_size);
704 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_JOIN_REQUEST);
705 req->slave_key = *slave_key;
706 if (0 < join_msg_size)
707 memcpy (&req[1], join_msg, join_msg_size);
709 struct JoinMemTestClosure *jcls = GNUNET_malloc (sizeof (*jcls));
710 jcls->slave_key = *slave_key;
713 jcls->join_msg = req;
715 GNUNET_PSYCSTORE_membership_test (store, &chn->pub_key, slave_key,
716 chn->max_message_id, 0,
717 &join_mem_test_cb, jcls);
/*
 * NOTE(review): mcast_recv_join_decision() handles the join decision that
 * multicast delivers to a slave.  It builds a
 * GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION message containing is_admitted and
 * the optional join response, stores it in slv->join_dcsn, sends it to the
 * channel's clients, and marks the channel ready when admitted.
 * The return type, braces and the tail of the function are not present in
 * this copy.  Points to confirm against upstream:
 *  - a previous slv->join_dcsn is overwritten without a visible release;
 *  - sizeof (*dcsn) + join_resp_size is stored in a 16-bit size field
 *    without a visible range check;
 *  - peer, relay_count and relays are not used in the visible lines.
 */
722 * Join decision received from multicast.
725 mcast_recv_join_decision (void *cls, int is_admitted,
726 const struct GNUNET_PeerIdentity *peer,
727 uint16_t relay_count,
728 const struct GNUNET_PeerIdentity *relays,
729 const struct GNUNET_MessageHeader *join_resp)
731 struct Slave *slv = cls;
732 struct Channel *chn = &slv->chn;
733 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
734 "%p Got join decision: %d\n", slv, is_admitted);
736 uint16_t join_resp_size = (NULL != join_resp) ? ntohs (join_resp->size) : 0;
737 struct GNUNET_PSYC_JoinDecisionMessage *
738 dcsn = slv->join_dcsn = GNUNET_malloc (sizeof (*dcsn) + join_resp_size);
739 dcsn->header.size = htons (sizeof (*dcsn) + join_resp_size);
740 dcsn->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION);
741 dcsn->is_admitted = htonl (is_admitted);
742 if (0 < join_resp_size)
743 memcpy (&dcsn[1], join_resp, join_resp_size);
745 client_send_msg (chn, &dcsn->header);
747 if (GNUNET_YES == is_admitted)
749 chn->is_ready = GNUNET_YES;
759 * Received result of GNUNET_PSYCSTORE_membership_test()
762 store_recv_membership_test_result (void *cls, int64_t result, const char *err_msg)
764 struct GNUNET_MULTICAST_MembershipTestHandle *mth = cls;
765 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
766 "%p GNUNET_PSYCSTORE_membership_test() returned %" PRId64 " (%s)\n",
767 mth, result, err_msg);
769 GNUNET_MULTICAST_membership_test_result (mth, result);
774 * Incoming membership test request from multicast.
777 mcast_recv_membership_test (void *cls,
778 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
779 uint64_t message_id, uint64_t group_generation,
780 struct GNUNET_MULTICAST_MembershipTestHandle *mth)
782 struct Channel *chn = cls;
783 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
784 "%p Received membership test request from multicast.\n",
786 GNUNET_PSYCSTORE_membership_test (store, &chn->pub_key, slave_key,
787 message_id, group_generation,
788 &store_recv_membership_test_result, mth);
/*
 * NOTE(review): store_recv_fragment_replay() is the per-fragment callback
 * used for replay requests: each fragment returned by the PSYCstore is handed
 * to multicast as a replay response with GNUNET_MULTICAST_REC_OK.  The flags
 * argument is not used in the visible lines.
 * The return type and any return statement are not present in this copy --
 * TODO: restore them from upstream.
 */
793 store_recv_fragment_replay (void *cls,
794 struct GNUNET_MULTICAST_MessageHeader *msg,
795 enum GNUNET_PSYCSTORE_MessageFlags flags)
797 struct GNUNET_MULTICAST_ReplayHandle *rh = cls;
799 GNUNET_MULTICAST_replay_response (rh, &msg->header, GNUNET_MULTICAST_REC_OK);
/*
 * NOTE(review): store_recv_fragment_replay_result() receives the final result
 * of a PSYCstore lookup made for a multicast replay request.  The visible
 * lines send one of three error responses to multicast (NOT_FOUND,
 * ACCESS_DENIED for GNUNET_PSYCSTORE_MEMBERSHIP_TEST_FAILED, INTERNAL_ERROR)
 * and call GNUNET_MULTICAST_replay_response_end().
 * The switch statement, all case labels but one, and the break / return
 * statements are not present in this copy, so which result values select
 * which response, and whether replay_response_end() also runs after an error
 * response, cannot be determined here -- TODO: restore them from upstream.
 */
805 * Received result of GNUNET_PSYCSTORE_fragment_get() for multicast replay.
808 store_recv_fragment_replay_result (void *cls, int64_t result, const char *err_msg)
810 struct GNUNET_MULTICAST_ReplayHandle *rh = cls;
811 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
812 "%p Fragment replay: PSYCSTORE returned %" PRId64 " (%s)\n",
813 rh, result, err_msg);
821 GNUNET_MULTICAST_replay_response (rh, NULL,
822 GNUNET_MULTICAST_REC_NOT_FOUND);
825 case GNUNET_PSYCSTORE_MEMBERSHIP_TEST_FAILED:
826 GNUNET_MULTICAST_replay_response (rh, NULL,
827 GNUNET_MULTICAST_REC_ACCESS_DENIED);
831 GNUNET_MULTICAST_replay_response (rh, NULL,
832 GNUNET_MULTICAST_REC_INTERNAL_ERROR);
835 GNUNET_MULTICAST_replay_response_end (rh);
840 * Incoming fragment replay request from multicast.
843 mcast_recv_replay_fragment (void *cls,
844 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
845 uint64_t fragment_id, uint64_t flags,
846 struct GNUNET_MULTICAST_ReplayHandle *rh)
849 struct Channel *chn = cls;
850 GNUNET_PSYCSTORE_fragment_get (store, &chn->pub_key, slave_key,
851 fragment_id, fragment_id,
852 &store_recv_fragment_replay,
853 &store_recv_fragment_replay_result, rh);
/*
 * NOTE(review): mcast_recv_replay_message() answers a message replay request
 * from multicast by asking the PSYCstore for the fragments of one message
 * (first and last message ID are the same value) and reusing the fragment
 * replay callbacks for the response.
 * Two parameter lines are not present in this copy (the body uses a
 * message_id that no visible line declares), as are the return type and the
 * braces; fragment_offset is not used in the visible lines.
 * TODO: restore the full signature from upstream.
 */
858 * Incoming message replay request from multicast.
861 mcast_recv_replay_message (void *cls,
862 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
864 uint64_t fragment_offset,
866 struct GNUNET_MULTICAST_ReplayHandle *rh)
868 struct Channel *chn = cls;
869 GNUNET_PSYCSTORE_message_get (store, &chn->pub_key, slave_key,
870 message_id, message_id,
871 &store_recv_fragment_replay,
872 &store_recv_fragment_replay_result, rh);
877 * Convert an uint64_t in network byte order to a HashCode
878 * that can be used as key in a MultiHashMap
881 hash_key_from_nll (struct GNUNET_HashCode *key, uint64_t n)
883 /* use little-endian order, as idx_of MultiHashMap casts key to unsigned int */
884 /* TODO: use built-in byte swap functions if available */
886 n = ((n << 8) & 0xFF00FF00FF00FF00ULL) | ((n >> 8) & 0x00FF00FF00FF00FFULL);
887 n = ((n << 16) & 0xFFFF0000FFFF0000ULL) | ((n >> 16) & 0x0000FFFF0000FFFFULL);
889 *key = (struct GNUNET_HashCode) {};
891 = (n << 32) | (n >> 32);
/**
 * Convert an uint64_t in host byte order to a HashCode
 * that can be used as key in a MultiHashMap
 *
 * Produces the same key as hash_key_from_nll() does for the network byte
 * order representation of the same number.
 *
 * @param key Hash code to fill in.
 * @param n Number in host byte order.
 */
static void
hash_key_from_hll (struct GNUNET_HashCode *key, uint64_t n)
{
#if __BYTE_ORDER == __BIG_ENDIAN
  /* Host order equals network order here. */
  hash_key_from_nll (key, n);
#elif __BYTE_ORDER == __LITTLE_ENDIAN
  /* The in-memory representation already is little-endian: copy it as is. */
  memset (key, 0, sizeof (*key));
  memcpy (key, &n, sizeof (n));
#else
  #error byteorder undefined
#endif
}
/*
 * NOTE(review): client_send_mcast_msg() re-wraps the payload of a multicast
 * message fragment in a GNUNET_MESSAGE_TYPE_PSYC_MESSAGE (copying message_id
 * and fragment_offset unchanged and adding flags) and sends it to the
 * channel's clients via client_send_msg().
 * The line declaring the third parameter (the body uses "flags"), the return
 * type, the braces and the tail of the function are not present in this copy.
 * Points to confirm against upstream:
 *  - psize is computed in 16 bits and assumes size >= sizeof (*mmsg);
 *  - no release of pmsg is visible after it has been sent.
 */
914 * Send multicast message to all clients connected to the channel.
917 client_send_mcast_msg (struct Channel *chn,
918 const struct GNUNET_MULTICAST_MessageHeader *mmsg,
921 struct GNUNET_PSYC_MessageHeader *pmsg;
922 uint16_t size = ntohs (mmsg->header.size);
923 uint16_t psize = sizeof (*pmsg) + size - sizeof (*mmsg);
925 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
926 "%p Sending multicast message to client. "
927 "fragment_id: %" PRIu64 ", message_id: %" PRIu64 "\n",
928 chn, GNUNET_ntohll (mmsg->fragment_id),
929 GNUNET_ntohll (mmsg->message_id));
931 pmsg = GNUNET_malloc (psize);
932 pmsg->header.size = htons (psize);
933 pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
934 pmsg->message_id = mmsg->message_id;
935 pmsg->fragment_offset = mmsg->fragment_offset;
936 pmsg->flags = htonl (flags);
938 memcpy (&pmsg[1], &mmsg[1], size - sizeof (*mmsg));
939 client_send_msg (chn, &pmsg->header);
945 * Send multicast request to all clients connected to the channel.
948 client_send_mcast_req (struct Master *mst,
949 const struct GNUNET_MULTICAST_RequestHeader *req)
951 struct Channel *chn = &mst->chn;
953 struct GNUNET_PSYC_MessageHeader *pmsg;
954 uint16_t size = ntohs (req->header.size);
955 uint16_t psize = sizeof (*pmsg) + size - sizeof (*req);
957 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
958 "%p Sending multicast request to client. "
959 "fragment_id: %" PRIu64 ", message_id: %" PRIu64 "\n",
960 chn, GNUNET_ntohll (req->fragment_id),
961 GNUNET_ntohll (req->request_id));
963 pmsg = GNUNET_malloc (psize);
964 pmsg->header.size = htons (psize);
965 pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
966 pmsg->message_id = req->request_id;
967 pmsg->fragment_offset = req->fragment_offset;
968 pmsg->flags = htonl (GNUNET_PSYC_MESSAGE_REQUEST);
970 memcpy (&pmsg[1], &req[1], size - sizeof (*req));
971 client_send_msg (chn, &pmsg->header);
/*
 * NOTE(review): fragment_queue_insert() registers one received multicast
 * fragment.  From the visible lines it:
 *  - looks up (or creates) the FragmentQueue of the fragment's message in
 *    chn->recv_frags, keyed by the message ID taken from the fragment;
 *  - looks up (or creates) the per-channel fragment map in recv_cache and
 *    stores a heap copy of the fragment there, keyed by fragment ID; when
 *    the fragment is already cached only its ref_count is increased;
 *  - while the queue is in MSG_FRAG_STATE_HEADER, records state_delta and
 *    flags from a METHOD part, adds header-only fragments to header_size and
 *    moves to MSG_FRAG_STATE_DATA once the header is complete;
 *  - reacts to END and CANCEL as last part type by updating fragq->state;
 *  - queues the message ID in chn->recv_msgs once the state is DATA, END or
 *    CANCEL and the message is not queued yet;
 *  - finally adds the fragment size to fragq->size and the fragment ID to
 *    fragq->fragments.
 * The return type, braces, several conditions and some argument lines are
 * not present in this copy -- TODO: restore them from upstream.
 * The parameter list below documents msg_id_hash, but the visible signature
 * has no such parameter; the hash is computed locally.
 * Several routine progress messages are logged at WARNING level.
 */
977 * Insert a multicast message fragment into the queue belonging to the message.
979 * @param chn Channel.
980 * @param mmsg Multicast message fragment.
981 * @param msg_id_hash Message ID of @a mmsg in a struct GNUNET_HashCode.
982 * @param first_ptype First PSYC message part type in @a mmsg.
983 * @param last_ptype Last PSYC message part type in @a mmsg.
986 fragment_queue_insert (struct Channel *chn,
987 const struct GNUNET_MULTICAST_MessageHeader *mmsg,
988 uint16_t first_ptype, uint16_t last_ptype)
990 const uint16_t size = ntohs (mmsg->header.size);
991 const uint64_t frag_offset = GNUNET_ntohll (mmsg->fragment_offset);
992 struct GNUNET_CONTAINER_MultiHashMap
993 *chan_msgs = GNUNET_CONTAINER_multihashmap_get (recv_cache,
996 struct GNUNET_HashCode msg_id_hash;
997 hash_key_from_nll (&msg_id_hash, mmsg->message_id);
1000 *fragq = GNUNET_CONTAINER_multihashmap_get (chn->recv_frags, &msg_id_hash);
1004 fragq = GNUNET_new (struct FragmentQueue);
1005 fragq->state = MSG_FRAG_STATE_HEADER;
1007 = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
1009 GNUNET_CONTAINER_multihashmap_put (chn->recv_frags, &msg_id_hash, fragq,
1010 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1012 if (NULL == chan_msgs)
1014 chan_msgs = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
1015 GNUNET_CONTAINER_multihashmap_put (recv_cache, &chn->pub_key_hash, chan_msgs,
1016 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1020 struct GNUNET_HashCode frag_id_hash;
1021 hash_key_from_nll (&frag_id_hash, mmsg->fragment_id);
1022 struct RecvCacheEntry
1023 *cache_entry = GNUNET_CONTAINER_multihashmap_get (chan_msgs, &frag_id_hash);
1024 if (NULL == cache_entry)
1026 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1027 "%p Adding message fragment to cache. "
1028 "message_id: %" PRIu64 ", fragment_id: %" PRIu64 "\n",
1029 chn, GNUNET_ntohll (mmsg->message_id),
1030 GNUNET_ntohll (mmsg->fragment_id));
1031 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1032 "%p header_size: %" PRIu64 " + %u\n",
1033 chn, fragq->header_size, size);
1034 cache_entry = GNUNET_new (struct RecvCacheEntry);
1035 cache_entry->ref_count = 1;
1036 cache_entry->mmsg = GNUNET_malloc (size);
1037 memcpy (cache_entry->mmsg, mmsg, size);
1038 GNUNET_CONTAINER_multihashmap_put (chan_msgs, &frag_id_hash, cache_entry,
1039 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1043 cache_entry->ref_count++;
1044 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1045 "%p Message fragment is already in cache. "
1046 "message_id: %" PRIu64 ", fragment_id: %" PRIu64
1047 ", ref_count: %u\n",
1048 chn, GNUNET_ntohll (mmsg->message_id),
1049 GNUNET_ntohll (mmsg->fragment_id), cache_entry->ref_count);
1052 if (MSG_FRAG_STATE_HEADER == fragq->state)
1054 if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype)
1056 struct GNUNET_PSYC_MessageMethod *
1057 pmeth = (struct GNUNET_PSYC_MessageMethod *) &mmsg[1];
1058 fragq->state_delta = GNUNET_ntohll (pmeth->state_delta);
1059 fragq->flags = ntohl (pmeth->flags);
1062 if (last_ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA)
1064 fragq->header_size += size;
1066 else if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype
1067 || frag_offset == fragq->header_size)
1068 { /* header is now complete */
1069 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1070 "%p Header of message %" PRIu64 " is complete.\n",
1071 chn, GNUNET_ntohll (mmsg->message_id));
1073 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1074 "%p Adding message %" PRIu64 " to queue.\n",
1075 chn, GNUNET_ntohll (mmsg->message_id));
1076 fragq->state = MSG_FRAG_STATE_DATA;
1080 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1081 "%p Header of message %" PRIu64 " is NOT complete yet: "
1082 "%" PRIu64 " != %" PRIu64 "\n",
1083 chn, GNUNET_ntohll (mmsg->message_id), frag_offset,
1084 fragq->header_size);
1090 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END:
1091 if (frag_offset == fragq->size)
1092 fragq->state = MSG_FRAG_STATE_END;
1094 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1095 "%p Message %" PRIu64 " is NOT complete yet: "
1096 "%" PRIu64 " != %" PRIu64 "\n",
1097 chn, GNUNET_ntohll (mmsg->message_id), frag_offset,
1101 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL:
1102 /* Drop message without delivering to client if it's a single fragment */
1104 (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype)
1105 ? MSG_FRAG_STATE_DROP
1106 : MSG_FRAG_STATE_CANCEL;
1109 switch (fragq->state)
1111 case MSG_FRAG_STATE_DATA:
1112 case MSG_FRAG_STATE_END:
1113 case MSG_FRAG_STATE_CANCEL:
1114 if (GNUNET_NO == fragq->is_queued)
1116 GNUNET_CONTAINER_heap_insert (chn->recv_msgs, NULL,
1117 GNUNET_ntohll (mmsg->message_id));
1118 fragq->is_queued = GNUNET_YES;
1122 fragq->size += size;
1123 GNUNET_CONTAINER_heap_insert (fragq->fragments, NULL,
1124 GNUNET_ntohll (mmsg->fragment_id));
/*
 * NOTE(review): fragment_queue_run() drains the fragment heap of one message.
 * For every fragment ID at the root of fragq->fragments it looks the fragment
 * up in the channel's recv_cache map; a cached fragment is sent to the
 * clients unless drop is GNUNET_YES, and its cache entry is released when
 * ref_count is 1 or less, otherwise ref_count is decreased.  The root of the
 * heap is then removed.
 * After the loop, a queue whose state is MSG_FRAG_STATE_END or greater is
 * removed from chn->recv_frags and freed; the assignment
 * "is_queued = GNUNET_NO" presumably belongs to the other branch (the
 * else line is not visible here -- confirm).
 * fragq is dereferenced immediately (in the log call), so callers must not
 * pass NULL.  The channel must have an entry in recv_cache (asserted).
 * The return type, braces, the declaration of frag_id, part of the log
 * format string and several argument lines are not present in this copy --
 * TODO: restore them from upstream.
 */
1129 * Run fragment queue of a message.
1131 * Send fragments of a message in order to client, after all modifiers arrived
1134 * @param chn Channel.
1135 * @param msg_id ID of the message @a fragq belongs to.
1136 * @param fragq Fragment queue of the message.
1137 * @param drop Drop message without delivering to client?
1138 * #GNUNET_YES or #GNUNET_NO.
1141 fragment_queue_run (struct Channel *chn, uint64_t msg_id,
1142 struct FragmentQueue *fragq, uint8_t drop)
1144 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1145 "%p Running message fragment queue for message %" PRIu64
1147 chn, msg_id, fragq->state);
1149 struct GNUNET_CONTAINER_MultiHashMap
1150 *chan_msgs = GNUNET_CONTAINER_multihashmap_get (recv_cache,
1151 &chn->pub_key_hash);
1152 GNUNET_assert (NULL != chan_msgs);
1155 while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (fragq->fragments, NULL,
1158 struct GNUNET_HashCode frag_id_hash;
1159 hash_key_from_hll (&frag_id_hash, frag_id);
1160 struct RecvCacheEntry *cache_entry
1161 = GNUNET_CONTAINER_multihashmap_get (chan_msgs, &frag_id_hash);
1162 if (cache_entry != NULL)
1164 if (GNUNET_NO == drop)
1166 client_send_mcast_msg (chn, cache_entry->mmsg, 0);
1168 if (cache_entry->ref_count <= 1)
1170 GNUNET_CONTAINER_multihashmap_remove (chan_msgs, &frag_id_hash,
1172 GNUNET_free (cache_entry->mmsg);
1173 GNUNET_free (cache_entry);
1177 cache_entry->ref_count--;
1180 #if CACHE_AGING_IMPLEMENTED
1181 else if (GNUNET_NO == drop)
1183 /* TODO: fragment not in cache anymore, retrieve it from PSYCstore */
1187 GNUNET_CONTAINER_heap_remove_root (fragq->fragments);
1190 if (MSG_FRAG_STATE_END <= fragq->state)
1192 struct GNUNET_HashCode msg_id_hash;
1193 hash_key_from_hll (&msg_id_hash, msg_id);
1195 GNUNET_CONTAINER_multihashmap_remove (chn->recv_frags, &msg_id_hash, fragq);
1196 GNUNET_CONTAINER_heap_destroy (fragq->fragments);
1197 GNUNET_free (fragq);
1201 fragq->is_queued = GNUNET_NO;
/*
 * NOTE(review): message_queue_run() processes chn->recv_msgs in heap order.
 * For the message ID at the root it fetches the FragmentQueue from
 * chn->recv_frags, applies the ordering checks below, hands the queue to
 * fragment_queue_run() (dropping instead of delivering when the state is
 * MSG_FRAG_STATE_DROP) and removes the root.
 * The return type, braces, the declarations of n and msg_id, the statements
 * that leave the loop and some argument lines are not present in this copy
 * -- TODO: restore them from upstream.
 * Points to confirm against upstream:
 *  - the first check fires for "state <= MSG_FRAG_STATE_HEADER"; if it leaves
 *    the loop, the following "MSG_FRAG_STATE_HEADER == fragq->state" block
 *    (ordering checks and the max_message_id / max_state_message_id updates)
 *    can never run;
 *  - the GNUNET_PSYCSTORE_state_modify() call uses message_id and cls, which
 *    no visible line declares; it is presumably disabled by preprocessor
 *    lines that are missing here;
 *  - routine progress messages are logged at WARNING level.
 */
1207 * Run message queue.
1209 * Send messages in queue to client in order after a message has arrived from
1210 * multicast, according to the following:
1211 * - A message is only sent if all of its modifiers arrived.
1212 * - A stateful message is only sent if the previous stateful message
1213 * has already been delivered to the client.
1215 * @param chn Channel.
1217 * @return Number of messages removed from queue and sent to client.
1220 message_queue_run (struct Channel *chn)
1222 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1223 "%p Running message queue.\n", chn);
1226 while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (chn->recv_msgs, NULL,
1229 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1230 "%p Processing message %" PRIu64 " in queue.\n", chn, msg_id);
1231 struct GNUNET_HashCode msg_id_hash;
1232 hash_key_from_hll (&msg_id_hash, msg_id);
1234 struct FragmentQueue *
1235 fragq = GNUNET_CONTAINER_multihashmap_get (chn->recv_frags, &msg_id_hash);
1237 if (NULL == fragq || fragq->state <= MSG_FRAG_STATE_HEADER)
1239 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1240 "%p No fragq (%p) or header not complete.\n",
1245 if (MSG_FRAG_STATE_HEADER == fragq->state)
1247 /* Check if there's a missing message before the current one */
1248 if (GNUNET_PSYC_STATE_NOT_MODIFIED == fragq->state_delta)
1250 if (!(fragq->flags & GNUNET_PSYC_MESSAGE_ORDER_ANY)
1251 && msg_id - 1 != chn->max_message_id)
1253 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1254 "%p Out of order message. "
1255 "(%" PRIu64 " - 1 != %" PRIu64 ")\n",
1256 chn, msg_id, chn->max_message_id);
1262 if (msg_id - fragq->state_delta != chn->max_state_message_id)
1264 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1265 "%p Out of order stateful message. "
1266 "(%" PRIu64 " - %" PRIu64 " != %" PRIu64 ")\n",
1267 chn, msg_id, fragq->state_delta, chn->max_state_message_id);
1271 /* FIXME: apply modifiers to state in PSYCstore */
1272 GNUNET_PSYCSTORE_state_modify (store, &chn->pub_key, message_id,
1273 store_recv_state_modify_result, cls);
1275 chn->max_state_message_id = msg_id;
1277 chn->max_message_id = msg_id;
1279 fragment_queue_run (chn, msg_id, fragq, MSG_FRAG_STATE_DROP == fragq->state);
1280 GNUNET_CONTAINER_heap_remove_root (chn->recv_msgs);
1283 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1284 "%p Removed %" PRIu64 " messages from queue.\n", chn, n);
1290 * Drop message queue of a channel.
1292 * Remove all messages in queue without sending it to clients.
1294 * @param chn Channel.
1296 * @return Number of messages removed from queue.
1299 message_queue_drop (struct Channel *chn)
1301 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1302 "%p Dropping message queue.\n", chn);
1305 while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (chn->recv_msgs, NULL,
1308 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1309 "%p Dropping message %" PRIu64 " from queue.\n", chn, msg_id);
1310 struct GNUNET_HashCode msg_id_hash;
1311 hash_key_from_hll (&msg_id_hash, msg_id);
1313 struct FragmentQueue *
1314 fragq = GNUNET_CONTAINER_multihashmap_get (chn->recv_frags, &msg_id_hash);
1316 fragment_queue_run (chn, msg_id, fragq, GNUNET_YES);
1317 GNUNET_CONTAINER_heap_remove_root (chn->recv_msgs);
1320 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1321 "%p Removed %" PRIu64 " messages from queue.\n", chn, n);
1327 * Received result of GNUNET_PSYCSTORE_fragment_store().
1330 store_recv_fragment_store_result (void *cls, int64_t result, const char *err_msg)
1332 struct Channel *chn = cls;
1333 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1334 "%p GNUNET_PSYCSTORE_fragment_store() returned %" PRId64 " (%s)\n",
1335 chn, result, err_msg);
1340 * Handle incoming message fragment from multicast.
1342 * Store it using PSYCstore and send it to the clients of the channel in order.
1345 mcast_recv_message (void *cls, const struct GNUNET_MULTICAST_MessageHeader *mmsg)
1347 struct Channel *chn = cls;
1348 uint16_t size = ntohs (mmsg->header.size);
1350 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1351 "%p Received multicast message of size %u.\n",
1354 GNUNET_PSYCSTORE_fragment_store (store, &chn->pub_key, mmsg, 0,
1355 &store_recv_fragment_store_result, chn);
1357 uint16_t first_ptype = 0, last_ptype = 0;
1359 == GNUNET_PSYC_receive_check_parts (size - sizeof (*mmsg),
1360 (const char *) &mmsg[1],
1361 &first_ptype, &last_ptype))
1363 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1364 "%p Dropping incoming multicast message with invalid parts.\n",
1366 GNUNET_break_op (0);
1370 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1371 "Message parts: first: type %u, last: type %u\n",
1372 first_ptype, last_ptype);
1374 fragment_queue_insert (chn, mmsg, first_ptype, last_ptype);
1375 message_queue_run (chn);
1380 * Incoming request fragment from multicast for a master.
1382 * @param cls Master.
1383 * @param req The request.
1386 mcast_recv_request (void *cls,
1387 const struct GNUNET_MULTICAST_RequestHeader *req)
1389 struct Master *mst = cls;
1390 uint16_t size = ntohs (req->header.size);
1392 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1393 "%p Received multicast request of size %u.\n",
1396 uint16_t first_ptype = 0, last_ptype = 0;
1398 == GNUNET_PSYC_receive_check_parts (size - sizeof (*req),
1399 (const char *) &req[1],
1400 &first_ptype, &last_ptype))
1402 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1403 "%p Dropping incoming multicast request with invalid parts.\n",
1405 GNUNET_break_op (0);
1409 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1410 "Message parts: first: type %u, last: type %u\n",
1411 first_ptype, last_ptype);
1413 /* FIXME: in-order delivery */
1414 client_send_mcast_req (mst, req);
1419 * Response from PSYCstore with the current counter values for a channel master.
1422 store_recv_master_counters (void *cls, int result, uint64_t max_fragment_id,
1423 uint64_t max_message_id, uint64_t max_group_generation,
1424 uint64_t max_state_message_id)
1426 struct Master *mst = cls;
1427 struct Channel *chn = &mst->chn;
1428 chn->store_op = NULL;
1430 struct GNUNET_PSYC_CountersResultMessage res;
1431 res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK);
1432 res.header.size = htons (sizeof (res));
1433 res.result_code = htonl (result - INT32_MIN);
1434 res.max_message_id = GNUNET_htonll (max_message_id);
1436 if (GNUNET_OK == result || GNUNET_NO == result)
1438 mst->max_message_id = max_message_id;
1439 chn->max_message_id = max_message_id;
1440 chn->max_state_message_id = max_state_message_id;
1441 mst->max_group_generation = max_group_generation;
1443 = GNUNET_MULTICAST_origin_start (cfg, &mst->priv_key, max_fragment_id,
1444 &mcast_recv_join_request,
1445 &mcast_recv_membership_test,
1446 &mcast_recv_replay_fragment,
1447 &mcast_recv_replay_message,
1448 &mcast_recv_request,
1449 &mcast_recv_message, chn);
1450 chn->is_ready = GNUNET_YES;
1454 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1455 "%p GNUNET_PSYCSTORE_counters_get() "
1456 "returned %d for channel %s.\n",
1457 chn, result, GNUNET_h2s (&chn->pub_key_hash));
1460 client_send_msg (chn, &res.header);
1465 * Response from PSYCstore with the current counter values for a channel slave.
1468 store_recv_slave_counters (void *cls, int result, uint64_t max_fragment_id,
1469 uint64_t max_message_id, uint64_t max_group_generation,
1470 uint64_t max_state_message_id)
1472 struct Slave *slv = cls;
1473 struct Channel *chn = &slv->chn;
1474 chn->store_op = NULL;
1476 struct GNUNET_PSYC_CountersResultMessage res;
1477 res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK);
1478 res.header.size = htons (sizeof (res));
1479 res.result_code = htonl (result - INT32_MIN);
1480 res.max_message_id = GNUNET_htonll (max_message_id);
1482 if (GNUNET_OK == result || GNUNET_NO == result)
1484 chn->max_message_id = max_message_id;
1485 chn->max_state_message_id = max_state_message_id;
1487 = GNUNET_MULTICAST_member_join (cfg, &chn->pub_key, &slv->priv_key,
1489 slv->relay_count, slv->relays,
1490 &slv->join_msg->header,
1491 &mcast_recv_join_request,
1492 &mcast_recv_join_decision,
1493 &mcast_recv_membership_test,
1494 &mcast_recv_replay_fragment,
1495 &mcast_recv_replay_message,
1496 &mcast_recv_message, chn);
1497 if (NULL != slv->join_msg)
1499 GNUNET_free (slv->join_msg);
1500 slv->join_msg = NULL;
1505 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1506 "%p GNUNET_PSYCSTORE_counters_get() "
1507 "returned %d for channel %s.\n",
1508 chn, result, GNUNET_h2s (&chn->pub_key_hash));
1511 client_send_msg (chn, &res.header);
1516 channel_init (struct Channel *chn)
1519 = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
1520 chn->recv_frags = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
1525 * Handle a connecting client starting a channel master.
1528 client_recv_master_start (void *cls, struct GNUNET_SERVER_Client *client,
1529 const struct GNUNET_MessageHeader *msg)
1531 const struct MasterStartRequest *req
1532 = (const struct MasterStartRequest *) msg;
1534 struct GNUNET_CRYPTO_EddsaPublicKey pub_key;
1535 struct GNUNET_HashCode pub_key_hash;
1537 GNUNET_CRYPTO_eddsa_key_get_public (&req->channel_key, &pub_key);
1538 GNUNET_CRYPTO_hash (&pub_key, sizeof (pub_key), &pub_key_hash);
1541 mst = GNUNET_CONTAINER_multihashmap_get (masters, &pub_key_hash);
1542 struct Channel *chn;
1546 mst = GNUNET_new (struct Master);
1547 mst->policy = ntohl (req->policy);
1548 mst->priv_key = req->channel_key;
1549 mst->join_reqs = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
1552 chn->is_master = GNUNET_YES;
1553 chn->pub_key = pub_key;
1554 chn->pub_key_hash = pub_key_hash;
1557 GNUNET_CONTAINER_multihashmap_put (masters, &chn->pub_key_hash, chn,
1558 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1559 chn->store_op = GNUNET_PSYCSTORE_counters_get (store, &chn->pub_key,
1560 store_recv_master_counters, mst);
1566 struct GNUNET_PSYC_CountersResultMessage res;
1567 res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK);
1568 res.header.size = htons (sizeof (res));
1569 res.result_code = htonl ((uint32_t) GNUNET_OK + INT32_MIN);
1570 res.max_message_id = GNUNET_htonll (mst->max_message_id);
1572 GNUNET_SERVER_notification_context_add (nc, client);
1573 GNUNET_SERVER_notification_context_unicast (nc, client, &res.header,
1577 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1578 "%p Client connected as master to channel %s.\n",
1579 mst, GNUNET_h2s (&chn->pub_key_hash));
1581 struct ClientListItem *cli = GNUNET_new (struct ClientListItem);
1582 cli->client = client;
1583 GNUNET_CONTAINER_DLL_insert (chn->clients_head, chn->clients_tail, cli);
1585 GNUNET_SERVER_client_set_user_context (client, chn);
1586 GNUNET_SERVER_receive_done (client, GNUNET_OK);
1591 * Handle a connecting client joining as a channel slave.
1594 client_recv_slave_join (void *cls, struct GNUNET_SERVER_Client *client,
1595 const struct GNUNET_MessageHeader *msg)
1597 const struct SlaveJoinRequest *req
1598 = (const struct SlaveJoinRequest *) msg;
1599 uint16_t req_size = ntohs (req->header.size);
1601 struct GNUNET_CRYPTO_EcdsaPublicKey slv_pub_key;
1602 struct GNUNET_HashCode pub_key_hash, slv_pub_key_hash;
1604 GNUNET_CRYPTO_ecdsa_key_get_public (&req->slave_key, &slv_pub_key);
1605 GNUNET_CRYPTO_hash (&slv_pub_key, sizeof (slv_pub_key), &slv_pub_key_hash);
1606 GNUNET_CRYPTO_hash (&req->channel_key, sizeof (req->channel_key), &pub_key_hash);
1608 struct GNUNET_CONTAINER_MultiHashMap *
1609 chn_slv = GNUNET_CONTAINER_multihashmap_get (channel_slaves, &pub_key_hash);
1610 struct Slave *slv = NULL;
1611 struct Channel *chn;
1613 if (NULL != chn_slv)
1615 slv = GNUNET_CONTAINER_multihashmap_get (chn_slv, &slv_pub_key_hash);
1619 slv = GNUNET_new (struct Slave);
1620 slv->priv_key = req->slave_key;
1621 slv->pub_key = slv_pub_key;
1622 slv->pub_key_hash = slv_pub_key_hash;
1623 slv->origin = req->origin;
1624 slv->relay_count = ntohl (req->relay_count);
1626 const struct GNUNET_PeerIdentity *
1627 relays = (const struct GNUNET_PeerIdentity *) &req[1];
1628 uint16_t relay_size = slv->relay_count * sizeof (*relays);
1629 uint16_t join_msg_size = 0;
1631 if (sizeof (*req) + relay_size + sizeof (struct GNUNET_MessageHeader)
1634 join_msg_size = ntohs (slv->join_msg->header.size);
1635 slv->join_msg = GNUNET_malloc (join_msg_size);
1636 memcpy (slv->join_msg, ((char *) &req[1]) + relay_size, join_msg_size);
1638 if (sizeof (*req) + relay_size + join_msg_size != req_size)
1640 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1641 "%u + %u + %u != %u\n",
1642 sizeof (*req), relay_size, join_msg_size, req_size);
1644 GNUNET_SERVER_client_disconnect (client);
1647 if (0 < slv->relay_count)
1649 slv->relays = GNUNET_malloc (relay_size);
1650 memcpy (slv->relays, &req[1], relay_size);
1654 chn->is_master = GNUNET_NO;
1655 chn->pub_key = req->channel_key;
1656 chn->pub_key_hash = pub_key_hash;
1659 if (NULL == chn_slv)
1661 chn_slv = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
1662 GNUNET_CONTAINER_multihashmap_put (channel_slaves, &chn->pub_key_hash, chn_slv,
1663 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1665 GNUNET_CONTAINER_multihashmap_put (chn_slv, &slv->pub_key_hash, chn,
1666 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1667 GNUNET_CONTAINER_multihashmap_put (slaves, &chn->pub_key_hash, chn,
1668 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1669 chn->store_op = GNUNET_PSYCSTORE_counters_get (store, &chn->pub_key,
1670 &store_recv_slave_counters, slv);
1676 struct GNUNET_PSYC_CountersResultMessage res;
1677 res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK);
1678 res.header.size = htons (sizeof (res));
1679 res.result_code = htonl ((uint32_t) GNUNET_OK - INT32_MIN);
1680 res.max_message_id = GNUNET_htonll (chn->max_message_id);
1682 GNUNET_SERVER_notification_context_add (nc, client);
1683 GNUNET_SERVER_notification_context_unicast (nc, client, &res.header,
1686 if (NULL == slv->member)
1689 = GNUNET_MULTICAST_member_join (cfg, &chn->pub_key, &slv->priv_key,
1691 slv->relay_count, slv->relays,
1692 &slv->join_msg->header,
1693 &mcast_recv_join_request,
1694 &mcast_recv_join_decision,
1695 &mcast_recv_membership_test,
1696 &mcast_recv_replay_fragment,
1697 &mcast_recv_replay_message,
1698 &mcast_recv_message, chn);
1699 if (NULL != slv->join_msg)
1701 GNUNET_free (slv->join_msg);
1702 slv->join_msg = NULL;
1705 else if (NULL != slv->join_dcsn)
1707 GNUNET_SERVER_notification_context_add (nc, client);
1708 GNUNET_SERVER_notification_context_unicast (nc, client,
1709 &slv->join_dcsn->header,
1714 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1715 "%p Client connected as slave to channel %s.\n",
1716 slv, GNUNET_h2s (&chn->pub_key_hash));
1718 struct ClientListItem *cli = GNUNET_new (struct ClientListItem);
1719 cli->client = client;
1720 GNUNET_CONTAINER_DLL_insert (chn->clients_head, chn->clients_tail, cli);
1722 GNUNET_SERVER_client_set_user_context (client, chn);
1723 GNUNET_SERVER_receive_done (client, GNUNET_OK);
/**
 * Closure handed to mcast_send_join_decision() for each pending join request.
 */
struct JoinDecisionClosure
{
  /** Admission flag from the client's join decision, in host byte order. */
  int32_t is_admitted;

  /** Message accompanying the decision, or NULL if there is none. */
  struct GNUNET_MessageHeader *msg;
};
1735 * Iterator callback for sending join decisions to multicast.
1738 mcast_send_join_decision (void *cls, const struct GNUNET_HashCode *pub_key_hash,
1741 struct JoinDecisionClosure *jcls = cls;
1742 struct GNUNET_MULTICAST_JoinHandle *jh = value;
1743 // FIXME: add relays
1744 GNUNET_MULTICAST_join_decision (jh, jcls->is_admitted, 0, NULL, jcls->msg);
1750 * Join decision from client.
1753 client_recv_join_decision (void *cls, struct GNUNET_SERVER_Client *client,
1754 const struct GNUNET_MessageHeader *msg)
1757 chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
1758 GNUNET_assert (GNUNET_YES == chn->is_master);
1759 struct Master *mst = (struct Master *) chn;
1761 struct GNUNET_PSYC_JoinDecisionMessage *
1762 dcsn = (struct GNUNET_PSYC_JoinDecisionMessage *) msg;
1763 struct JoinDecisionClosure jcls;
1764 jcls.is_admitted = ntohl (dcsn->is_admitted);
1766 = (sizeof (*dcsn) + sizeof (*jcls.msg) <= ntohs (msg->size))
1767 ? (struct GNUNET_MessageHeader *) &dcsn[1]
1770 struct GNUNET_HashCode slave_key_hash;
1771 GNUNET_CRYPTO_hash (&dcsn->slave_key, sizeof (dcsn->slave_key),
1774 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1775 "%p Got join decision (%d) from client for channel %s..\n",
1776 mst, jcls.is_admitted, GNUNET_h2s (&chn->pub_key_hash));
1777 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1778 "%p ..and slave %s.\n",
1779 mst, GNUNET_h2s (&slave_key_hash));
1781 GNUNET_CONTAINER_multihashmap_get_multiple (mst->join_reqs, &slave_key_hash,
1782 &mcast_send_join_decision, &jcls);
1783 GNUNET_CONTAINER_multihashmap_remove_all (mst->join_reqs, &slave_key_hash);
1784 GNUNET_SERVER_receive_done (client, GNUNET_OK);
1789 * Send acknowledgement to a client.
1791 * Sent after a message fragment has been passed on to multicast.
1793 * @param chn The channel struct for the client.
1796 send_message_ack (struct Channel *chn, struct GNUNET_SERVER_Client *client)
1798 struct GNUNET_MessageHeader res;
1799 res.size = htons (sizeof (res));
1800 res.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK);
1803 GNUNET_SERVER_notification_context_add (nc, client);
1804 GNUNET_SERVER_notification_context_unicast (nc, client, &res, GNUNET_NO);
1809 * Callback for the transmit functions of multicast.
1812 transmit_notify (void *cls, size_t *data_size, void *data)
1814 struct Channel *chn = cls;
1815 struct TransmitMessage *tmit_msg = chn->tmit_head;
1817 if (NULL == tmit_msg || *data_size < tmit_msg->size)
1819 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1820 "%p transmit_notify: nothing to send.\n", chn);
1825 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1826 "%p transmit_notify: sending %u bytes.\n", chn, tmit_msg->size);
1828 *data_size = tmit_msg->size;
1829 memcpy (data, &tmit_msg[1], *data_size);
1831 int ret = (MSG_STATE_END < chn->tmit_state) ? GNUNET_NO : GNUNET_YES;
1833 if (NULL != tmit_msg->client && GNUNET_NO == tmit_msg->ack_sent)
1834 send_message_ack (chn, tmit_msg->client);
1836 GNUNET_CONTAINER_DLL_remove (chn->tmit_head, chn->tmit_tail, tmit_msg);
1837 GNUNET_free (tmit_msg);
1839 if (NULL != chn->tmit_head)
1841 transmit_message (chn);
1843 else if (GNUNET_YES == chn->is_disconnected)
1845 /* FIXME: handle partial message (when still in_transmit) */
1846 cleanup_channel (chn);
1853 * Callback for the transmit functions of multicast.
1856 master_transmit_notify (void *cls, size_t *data_size, void *data)
1858 int ret = transmit_notify (cls, data_size, data);
1860 if (GNUNET_YES == ret)
1862 struct Master *mst = cls;
1863 mst->tmit_handle = NULL;
1870 * Callback for the transmit functions of multicast.
1873 slave_transmit_notify (void *cls, size_t *data_size, void *data)
1875 int ret = transmit_notify (cls, data_size, data);
1877 if (GNUNET_YES == ret)
1879 struct Slave *slv = cls;
1880 slv->tmit_handle = NULL;
1887 * Transmit a message from a channel master to the multicast group.
1890 master_transmit_message (struct Master *mst)
1892 if (NULL == mst->tmit_handle)
1895 = GNUNET_MULTICAST_origin_to_all (mst->origin, mst->max_message_id,
1896 mst->max_group_generation,
1897 master_transmit_notify, mst);
1901 GNUNET_MULTICAST_origin_to_all_resume (mst->tmit_handle);
1907 * Transmit a message from a channel slave to the multicast group.
1910 slave_transmit_message (struct Slave *slv)
1912 if (NULL == slv->tmit_handle)
1915 = GNUNET_MULTICAST_member_to_origin (slv->member, slv->max_request_id,
1916 slave_transmit_notify, slv);
1920 GNUNET_MULTICAST_member_to_origin_resume (slv->tmit_handle);
1926 transmit_message (struct Channel *chn)
1929 ? master_transmit_message ((struct Master *) chn)
1930 : slave_transmit_message ((struct Slave *) chn);
1935 * Queue a message from a channel master for sending to the multicast group.
1938 master_queue_message (struct Master *mst, struct TransmitMessage *tmit_msg,
1939 uint16_t first_ptype, uint16_t last_ptype)
1941 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p master_queue_message()\n", mst);
1943 if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype)
1945 tmit_msg->id = ++mst->max_message_id;
1946 struct GNUNET_PSYC_MessageMethod *pmeth
1947 = (struct GNUNET_PSYC_MessageMethod *) &tmit_msg[1];
1949 if (pmeth->flags & GNUNET_PSYC_MASTER_TRANSMIT_STATE_RESET)
1951 pmeth->state_delta = GNUNET_htonll (GNUNET_PSYC_STATE_RESET);
1953 else if (pmeth->flags & GNUNET_PSYC_MASTER_TRANSMIT_STATE_MODIFY)
1955 pmeth->state_delta = GNUNET_htonll (tmit_msg->id
1956 - mst->max_state_message_id);
1960 pmeth->state_delta = GNUNET_htonll (GNUNET_PSYC_STATE_NOT_MODIFIED);
1967 * Queue a message from a channel slave for sending to the multicast group.
1970 slave_queue_message (struct Slave *slv, struct TransmitMessage *tmit_msg,
1971 uint16_t first_ptype, uint16_t last_ptype)
1973 if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype)
1975 struct GNUNET_PSYC_MessageMethod *pmeth
1976 = (struct GNUNET_PSYC_MessageMethod *) &tmit_msg[1];
1977 pmeth->state_delta = GNUNET_htonll (GNUNET_PSYC_STATE_NOT_MODIFIED);
1978 tmit_msg->id = ++slv->max_request_id;
1984 * Queue PSYC message parts for sending to multicast.
1986 * @param chn Channel to send to.
1987 * @param client Client the message originates from.
1988 * @param data_size Size of @a data.
1989 * @param data Concatenated message parts.
1990 * @param first_ptype First message part type in @a data.
1991 * @param last_ptype Last message part type in @a data.
1993 static struct TransmitMessage *
1994 queue_message (struct Channel *chn,
1995 struct GNUNET_SERVER_Client *client,
1998 uint16_t first_ptype, uint16_t last_ptype)
2000 struct TransmitMessage *
2001 tmit_msg = GNUNET_malloc (sizeof (*tmit_msg) + data_size);
2002 memcpy (&tmit_msg[1], data, data_size);
2003 tmit_msg->client = client;
2004 tmit_msg->size = data_size;
2005 tmit_msg->state = chn->tmit_state;
2007 /* FIXME: separate queue per message ID */
2009 GNUNET_CONTAINER_DLL_insert_tail (chn->tmit_head, chn->tmit_tail, tmit_msg);
2012 ? master_queue_message ((struct Master *) chn, tmit_msg,
2013 first_ptype, last_ptype)
2014 : slave_queue_message ((struct Slave *) chn, tmit_msg,
2015 first_ptype, last_ptype);
2021 * Cancel transmission of current message.
2023 * @param chn Channel to send to.
2024 * @param client Client the message originates from.
2027 transmit_cancel (struct Channel *chn, struct GNUNET_SERVER_Client *client)
2029 uint16_t type = GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL;
2031 struct GNUNET_MessageHeader msg;
2032 msg.size = htons (sizeof (msg));
2033 msg.type = htons (type);
2035 queue_message (chn, client, sizeof (msg), &msg, type, type);
2036 transmit_message (chn);
2038 /* FIXME: cleanup */
2043 * Incoming message from a master or slave client.
2046 client_recv_psyc_message (void *cls, struct GNUNET_SERVER_Client *client,
2047 const struct GNUNET_MessageHeader *msg)
2050 chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
2051 GNUNET_assert (NULL != chn);
2053 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2054 "%p Received message from client.\n", chn);
2055 GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG, msg);
2057 if (GNUNET_YES != chn->is_ready)
2059 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2060 "%p Channel is not ready yet, disconnecting client.\n", chn);
2062 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
2066 uint16_t size = ntohs (msg->size);
2067 if (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < size - sizeof (*msg))
2069 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "%p Message payload too large.\n", chn);
2071 transmit_cancel (chn, client);
2072 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
2076 uint16_t first_ptype = 0, last_ptype = 0;
2078 == GNUNET_PSYC_receive_check_parts (size - sizeof (*msg),
2079 (const char *) &msg[1],
2080 &first_ptype, &last_ptype))
2082 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2083 "%p Received invalid message part from client.\n", chn);
2085 transmit_cancel (chn, client);
2086 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
2089 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2090 "%p Received message with first part type %u and last part type %u.\n",
2091 chn, first_ptype, last_ptype);
2093 queue_message (chn, client, size - sizeof (*msg), &msg[1],
2094 first_ptype, last_ptype);
2095 transmit_message (chn);
2096 /* FIXME: send a few ACKs even before transmit_notify is called */
2098 GNUNET_SERVER_receive_done (client, GNUNET_OK);
/**
 * Closure for the result callback of a membership store request.
 */
struct MembershipStoreClosure
{
  /** Client that issued the request and receives the result. */
  struct GNUNET_SERVER_Client *client;

  /** Channel the request was made on. */
  struct Channel *chn;

  /** Operation ID copied verbatim from the client's request. */
  uint64_t op_id;
};
2111 * Received result of GNUNET_PSYCSTORE_membership_store()
2114 store_recv_membership_store_result (void *cls, int64_t result, const char *err_msg)
2116 struct MembershipStoreClosure *mcls = cls;
2117 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2118 "%p GNUNET_PSYCSTORE_membership_store() returned %" PRId64 " (%s)\n",
2119 mcls->chn, result, err_msg);
2121 client_send_result (mcls->client, mcls->op_id, result, err_msg);
2126 * Client requests to add/remove a slave in the membership database.
2129 client_recv_membership_store (void *cls, struct GNUNET_SERVER_Client *client,
2130 const struct GNUNET_MessageHeader *msg)
2133 chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
2134 GNUNET_assert (NULL != chn);
2136 const struct ChannelMembershipStoreRequest *
2137 req = (const struct ChannelMembershipStoreRequest *) msg;
2139 struct MembershipStoreClosure *mcls = GNUNET_malloc (sizeof (*mcls));
2140 mcls->client = client;
2142 mcls->op_id = req->op_id;
2144 uint64_t announced_at = GNUNET_ntohll (req->announced_at);
2145 uint64_t effective_since = GNUNET_ntohll (req->effective_since);
2146 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2147 "%p Received membership store request from client.\n", chn);
2148 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2149 "%p did_join: %u, announced_at: %" PRIu64 ", effective_since: %" PRIu64 "\n",
2150 chn, req->did_join, announced_at, effective_since);
2152 GNUNET_PSYCSTORE_membership_store (store, &chn->pub_key, &req->slave_key,
2153 req->did_join, announced_at, effective_since,
2154 0, /* FIXME: group_generation */
2155 &store_recv_membership_store_result, mcls);
2156 GNUNET_SERVER_receive_done (client, GNUNET_OK);
2161 store_recv_fragment_history (void *cls,
2162 struct GNUNET_MULTICAST_MessageHeader *msg,
2163 enum GNUNET_PSYCSTORE_MessageFlags flags)
2165 struct OperationClosure *opcls = cls;
2166 struct Channel *chn = opcls->chn;
2167 client_send_mcast_msg (chn, msg, GNUNET_PSYC_MESSAGE_HISTORIC);
2173 * Received result of GNUNET_PSYCSTORE_fragment_get() for multicast replay.
2176 store_recv_fragment_history_result (void *cls, int64_t result, const char *err_msg)
2178 struct OperationClosure *opcls = cls;
2179 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2180 "%p History replay #%" PRIu64 ": "
2181 "PSYCSTORE returned %" PRId64 " (%s)\n",
2182 opcls->chn, opcls->op_id, result, err_msg);
2184 client_send_result (opcls->client, opcls->op_id, result, err_msg);
2189 * Client requests channel history from PSYCstore.
2192 client_recv_history_replay (void *cls, struct GNUNET_SERVER_Client *client,
2193 const struct GNUNET_MessageHeader *msg)
2196 chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
2197 GNUNET_assert (NULL != chn);
2199 const struct HistoryRequest *
2200 req = (const struct HistoryRequest *) msg;
2202 struct OperationClosure *opcls = GNUNET_malloc (sizeof (*opcls));
2203 opcls->client = client;
2205 opcls->op_id = req->op_id;
2207 if (0 == req->message_limit)
2208 GNUNET_PSYCSTORE_message_get (store, &chn->pub_key, NULL,
2209 GNUNET_ntohll (req->start_message_id),
2210 GNUNET_ntohll (req->end_message_id),
2211 &store_recv_fragment_history,
2212 &store_recv_fragment_history_result, opcls);
2214 GNUNET_PSYCSTORE_message_get_latest (store, &chn->pub_key, NULL,
2215 GNUNET_ntohll (req->message_limit),
2216 &store_recv_fragment_history,
2217 &store_recv_fragment_history_result,
2220 GNUNET_SERVER_receive_done (client, GNUNET_OK);
2225 * Received state var from PSYCstore, send it to client.
2228 store_recv_state_var (void *cls, const char *name,
2229 const void *value, size_t value_size)
2231 struct OperationClosure *opcls = cls;
2232 struct OperationResult *op;
2236 uint16_t name_size = strnlen (name, GNUNET_PSYC_MODIFIER_MAX_PAYLOAD) + 1;
2237 struct GNUNET_PSYC_MessageModifier *mod;
2238 op = GNUNET_malloc (sizeof (*op) + sizeof (*mod) + name_size + value_size);
2239 op->header.size = htons (sizeof (*op) + sizeof (*mod) + name_size + value_size);
2240 op->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_STATE_RESULT);
2241 op->op_id = opcls->op_id;
2243 mod = (struct GNUNET_PSYC_MessageModifier *) &op[1];
2244 mod->header.size = htons (sizeof (*mod) + name_size + value_size);
2245 mod->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER);
2246 mod->name_size = htons (name_size);
2247 mod->value_size = htonl (value_size);
2248 mod->oper = htons (GNUNET_ENV_OP_ASSIGN);
2249 memcpy (&mod[1], name, name_size);
2250 memcpy (((char *) &mod[1]) + name_size, value, value_size);
2254 struct GNUNET_MessageHeader *mod;
2255 op = GNUNET_malloc (sizeof (*op) + sizeof (*mod) + value_size);
2256 op->header.size = htons (sizeof (*op) + sizeof (*mod) + value_size);
2257 op->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_STATE_RESULT);
2258 op->op_id = opcls->op_id;
2260 mod = (struct GNUNET_MessageHeader *) &op[1];
2261 mod->size = htons (sizeof (*mod) + value_size);
2262 mod->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT);
2263 memcpy (&mod[1], value, value_size);
2266 GNUNET_SERVER_notification_context_add (nc, opcls->client);
2267 GNUNET_SERVER_notification_context_unicast (nc, opcls->client, &op->header,
2274 * Received result of GNUNET_PSYCSTORE_state_get()
2275 * or GNUNET_PSYCSTORE_state_get_prefix()
2278 store_recv_state_result (void *cls, int64_t result, const char *err_msg)
2280 struct OperationClosure *opcls = cls;
2281 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2282 "%p History replay #%" PRIu64 ": "
2283 "PSYCSTORE returned %" PRId64 " (%s)\n",
2284 opcls->chn, opcls->op_id, result, err_msg);
2286 client_send_result (opcls->client, opcls->op_id, result, err_msg);
2291 * Client requests best matching state variable from PSYCstore.
2294 client_recv_state_get (void *cls, struct GNUNET_SERVER_Client *client,
2295 const struct GNUNET_MessageHeader *msg)
2298 chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
2299 GNUNET_assert (NULL != chn);
2301 const struct StateRequest *
2302 req = (const struct StateRequest *) msg;
2304 uint16_t name_size = ntohs (req->header.size) - sizeof (*req);
2305 const char *name = (const char *) &req[1];
2306 if (0 == name_size || '\0' != name[name_size - 1])
2308 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
2312 struct OperationClosure *opcls = GNUNET_malloc (sizeof (*opcls));
2313 opcls->client = client;
2315 opcls->op_id = req->op_id;
2317 GNUNET_PSYCSTORE_state_get (store, &chn->pub_key, name,
2318 &store_recv_state_var,
2319 &store_recv_state_result, opcls);
2320 GNUNET_SERVER_receive_done (client, GNUNET_OK);
2325 * Client requests state variables with a given prefix from PSYCstore.
2328 client_recv_state_get_prefix (void *cls, struct GNUNET_SERVER_Client *client,
2329 const struct GNUNET_MessageHeader *msg)
2332 chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
2333 GNUNET_assert (NULL != chn);
2335 const struct StateRequest *
2336 req = (const struct StateRequest *) msg;
2338 uint16_t name_size = ntohs (req->header.size) - sizeof (*req);
2339 const char *name = (const char *) &req[1];
2340 if (0 == name_size || '\0' != name[name_size - 1])
2342 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
2346 struct OperationClosure *opcls = GNUNET_malloc (sizeof (*opcls));
2347 opcls->client = client;
2349 opcls->op_id = req->op_id;
2351 GNUNET_PSYCSTORE_state_get_prefix (store, &chn->pub_key, name,
2352 &store_recv_state_var,
2353 &store_recv_state_result, opcls);
2354 GNUNET_SERVER_receive_done (client, GNUNET_OK);
2359 static const struct GNUNET_SERVER_MessageHandler server_handlers[] = {
2360 { &client_recv_master_start, NULL,
2361 GNUNET_MESSAGE_TYPE_PSYC_MASTER_START, 0 },
2363 { &client_recv_slave_join, NULL,
2364 GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN, 0 },
2366 { &client_recv_join_decision, NULL,
2367 GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION, 0 },
2369 { &client_recv_psyc_message, NULL,
2370 GNUNET_MESSAGE_TYPE_PSYC_MESSAGE, 0 },
2372 { &client_recv_membership_store, NULL,
2373 GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_MEMBERSHIP_STORE, 0 },
2375 { &client_recv_history_replay, NULL,
2376 GNUNET_MESSAGE_TYPE_PSYC_HISTORY_REPLAY, 0 },
2378 { &client_recv_state_get, NULL,
2379 GNUNET_MESSAGE_TYPE_PSYC_STATE_GET, 0 },
2381 { &client_recv_state_get_prefix, NULL,
2382 GNUNET_MESSAGE_TYPE_PSYC_STATE_GET_PREFIX, 0 },
2384 { NULL, NULL, 0, 0 }
2389 * Initialize the PSYC service.
2391 * @param cls Closure.
2392 * @param server The initialized server.
2393 * @param c Configuration to use.
2396 run (void *cls, struct GNUNET_SERVER_Handle *server,
2397 const struct GNUNET_CONFIGURATION_Handle *c)
2400 store = GNUNET_PSYCSTORE_connect (cfg);
2401 stats = GNUNET_STATISTICS_create ("psyc", cfg);
2402 masters = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
2403 slaves = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
2404 channel_slaves = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
2405 recv_cache = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
2406 nc = GNUNET_SERVER_notification_context_create (server, 1);
2407 GNUNET_SERVER_add_handlers (server, server_handlers);
2408 GNUNET_SERVER_disconnect_notify (server, &client_disconnect, NULL);
2409 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL,
2410 &shutdown_task, NULL);
2415 * The main function for the service.
2417 * @param argc number of arguments from the command line
2418 * @param argv command line arguments
2419 * @return 0 ok, 1 on error
2422 main (int argc, char *const *argv)
2424 return (GNUNET_OK ==
2425 GNUNET_SERVICE_run (argc, argv, "psyc",
2426 GNUNET_SERVICE_OPTION_NONE,
2427 &run, NULL)) ? 0 : 1;
2430 /* end of gnunet-service-psyc.c */