2 * This file is part of GNUnet
3 * (C) 2013 Christian Grothoff (and other contributing authors)
5 * GNUnet is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published
7 * by the Free Software Foundation; either version 3, or (at your
8 * option) any later version.
10 * GNUnet is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License for more details.
15 * You should have received a copy of the GNU General Public License
16 * along with GNUnet; see the file COPYING. If not, write to the
17 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 * Boston, MA 02111-1307, USA.
22 * @file psyc/gnunet-service-psyc.c
24 * @author Gabor X Toth
30 #include "gnunet_util_lib.h"
31 #include "gnunet_constants.h"
32 #include "gnunet_protocols.h"
33 #include "gnunet_statistics_service.h"
34 #include "gnunet_multicast_service.h"
35 #include "gnunet_psycstore_service.h"
36 #include "gnunet_psyc_service.h"
37 #include "gnunet_psyc_util_lib.h"
/* Service-wide state shared by the handlers in this file; every object
 * below is file-local. */
42 * Handle to our current configuration.
/* Passed to GNUNET_MULTICAST_origin_start() in store_recv_master_counters(). */
44 static const struct GNUNET_CONFIGURATION_Handle *cfg;
47 * Handle to the statistics service.
/* Released in shutdown_task(). */
49 static struct GNUNET_STATISTICS_Handle *stats;
52 * Notification context, simplifies client broadcasts.
/* Clients are registered with it immediately before each unicast in
 * client_send_msg() and client_send_result(); released in shutdown_task(). */
54 static struct GNUNET_SERVER_NotificationContext *nc;
57 * Handle to the PSYCstore.
/* Used for membership tests, fragment/message retrieval and fragment storage. */
59 static struct GNUNET_PSYCSTORE_Handle *store;
62 * All connected masters.
63 * Channel's pub_key_hash -> struct Master
/* Entries are removed in cleanup_master(). */
65 static struct GNUNET_CONTAINER_MultiHashMap *masters;
68 * All connected slaves.
69 * Channel's pub_key_hash -> struct Slave
/* Entries are removed in cleanup_slave(). */
71 static struct GNUNET_CONTAINER_MultiHashMap *slaves;
74 * Connected slaves per channel.
75 * Channel's pub_key_hash -> Slave's pub_key -> struct Slave
/* Two-level map: the value stored per channel is itself a MultiHashMap keyed
 * by the slave's pub_key_hash; cleanup_slave() destroys the inner map once it
 * becomes empty. */
77 static struct GNUNET_CONTAINER_MultiHashMap *channel_slaves;
/* Node of a channel's transmission queue (see the tmit_head / tmit_tail
 * members of the channel struct).
 * NOTE(review): only the prev, next and client members are visible in this
 * view; the ID, state and ACK members described below are documented but
 * their declarations are not shown -- confirm against the full file. */
81 * Message in the transmission queue.
83 struct TransmitMessage
85 struct TransmitMessage *prev;
86 struct TransmitMessage *next;
/* presumably the client that submitted the message -- TODO confirm */
88 struct GNUNET_SERVER_Client *client;
91 * ID assigned to the message.
101 * @see enum MessageState
106 * Whether a message ACK has already been sent to the client.
107 * #GNUNET_YES or #GNUNET_NO
111 /* Followed by message */
/* Looked up by the channel's pub_key_hash in fragment_queue_insert() and
 * fragment_queue_run(); cleanup_channel() removes the channel's entry. */
116 * Cache for received message fragments.
117 * Message fragments are only sent to clients after all modifiers arrived.
119 * chan_key -> MultiHashMap chan_msgs
121 static struct GNUNET_CONTAINER_MultiHashMap *recv_cache;
/* One cached multicast fragment.
 * mmsg is a heap copy made in fragment_queue_insert() and freed in
 * fragment_queue_run().
 * NOTE(review): the code also reads and writes a ref_count member whose
 * declaration is not visible in this view. */
125 * Entry in the chan_msgs hashmap of @a recv_cache:
126 * fragment_id -> RecvCacheEntry
128 struct RecvCacheEntry
130 struct GNUNET_MULTICAST_MessageHeader *mmsg;
/* Per-message reassembly state, created in fragment_queue_insert() and freed
 * in fragment_queue_run().
 * NOTE(review): the struct header and the size, flags, state and is_queued
 * members used by the code are documented below but their declarations are
 * not visible in this view. */
136 * Entry in the @a recv_frags hash map of a @a Channel.
137 * message_id -> FragmentQueue
142 * Fragment IDs stored in @a recv_cache.
/* Created with GNUNET_CONTAINER_HEAP_ORDER_MIN; fragments are inserted with
 * the host-order fragment ID as heap cost and a NULL element. */
144 struct GNUNET_CONTAINER_Heap *fragments;
147 * Total size of received fragments.
152 * Total size of received header fragments (METHOD & MODIFIERs)
/* Compared against a fragment's offset to decide whether the header is
 * complete (see fragment_queue_insert()). */
154 uint64_t header_size;
157 * The @a state_delta field from struct GNUNET_PSYC_MessageMethod.
/* Stored in host byte order. */
159 uint64_t state_delta;
162 * The @a flags field from struct GNUNET_PSYC_MessageMethod.
167 * Receive state of message.
169 * @see MessageFragmentState
174 * Is the message queued for delivery to the client?
175 * i.e. added to the recv_msgs queue
/* Doubly-linked list node holding one connected client of a channel;
 * client_disconnect() unlinks the node whose client matches the
 * disconnecting one. */
182 * List of connected clients.
184 struct ClientListItem
186 struct ClientListItem *prev;
187 struct ClientListItem *next;
188 struct GNUNET_SERVER_Client *client;
/* State shared by master and slave channels.  cleanup_channel() casts a
 * pointer to this struct to struct Master / struct Slave, so it is expected
 * to be the first member of both.
 * NOTE(review): the struct header and several short member declarations
 * (e.g. is_master, is_ready) are not visible in this view. */
193 * Common part of the client context for both a channel master and slave.
/* Head and tail of the list of connected clients. */
197 struct ClientListItem *clients_head;
198 struct ClientListItem *clients_tail;
/* Head and tail of the transmission queue; client_disconnect() checks
 * tmit_head for pending messages. */
200 struct TransmitMessage *tmit_head;
201 struct TransmitMessage *tmit_tail;
204 * Current PSYCstore operation.
/* Cancelled and reset to NULL in cleanup_channel(). */
206 struct GNUNET_PSYCSTORE_OperationHandle *store_op;
209 * Received fragments not yet sent to the client.
210 * message_id -> FragmentQueue
212 struct GNUNET_CONTAINER_MultiHashMap *recv_frags;
215 * Received message IDs not yet sent to the client.
/* Message IDs are inserted as heap cost (host byte order) with a NULL
 * element, see fragment_queue_insert(). */
217 struct GNUNET_CONTAINER_Heap *recv_msgs;
220 * Public key of the channel.
222 struct GNUNET_CRYPTO_EddsaPublicKey pub_key;
225 * Hash of @a pub_key.
/* Key used for the masters, slaves, channel_slaves and recv_cache maps. */
227 struct GNUNET_HashCode pub_key_hash;
230 * Last message ID sent to the client.
231 * 0 if there is no such message.
233 uint64_t max_message_id;
236 * ID of the last stateful message, where the state operations has been
237 * processed and saved to PSYCstore and which has been sent to the client.
238 * 0 if there is no such message.
240 uint64_t max_state_message_id;
243 * Expected value size for the modifier being received from the PSYC service.
245 uint32_t tmit_mod_value_size_expected;
248 * Actual value size for the modifier being received from the PSYC service.
250 uint32_t tmit_mod_value_size;
253 * @see enum MessageState
258 * Is this a channel master (#GNUNET_YES), or slave (#GNUNET_NO)?
263 * Is this channel ready to receive messages from client?
264 * #GNUNET_YES or #GNUNET_NO
269 * Is the client disconnected?
270 * #GNUNET_YES or #GNUNET_NO
272 uint8_t is_disconnected;
/* Master-side channel context.
 * NOTE(review): the struct header and the embedded channel member (accessed
 * as mst->chn) are not visible in this view. */
277 * Client context for a channel master.
282 * Channel struct common for Master and Slave
287 * Private key of the channel.
/* Handed to GNUNET_MULTICAST_origin_start() in store_recv_master_counters(). */
289 struct GNUNET_CRYPTO_EddsaPrivateKey priv_key;
292 * Handle for the multicast origin.
/* Stopped in cleanup_master() when not NULL. */
294 struct GNUNET_MULTICAST_Origin *origin;
297 * Transmit handle for multicast.
299 struct GNUNET_MULTICAST_OriginTransmitHandle *tmit_handle;
302 * Incoming join requests from multicast.
303 * member_key -> struct GNUNET_MULTICAST_JoinHandle *
/* Filled by join_mem_test_cb(); destroyed in cleanup_master(). */
305 struct GNUNET_CONTAINER_MultiHashMap *join_reqs;
308 * Last message ID transmitted to this channel.
310 * Incremented before sending a message, thus the message_id in messages sent
/* Distinct from the channel-level max_message_id; both are initialised from
 * the PSYCstore counters in store_recv_master_counters(). */
313 uint64_t max_message_id;
316 * ID of the last message with state operations transmitted to the channel.
317 * 0 if there is no such message.
319 uint64_t max_state_message_id;
322 * Maximum group generation transmitted to the channel.
324 uint64_t max_group_generation;
327 * @see enum GNUNET_PSYC_Policy
329 enum GNUNET_PSYC_Policy policy;
/* Slave-side channel context.
 * NOTE(review): the struct header and the embedded channel member (accessed
 * as slv->chn) are not visible in this view. */
334 * Client context for a channel slave.
339 * Channel struct common for Master and Slave
344 * Private key of the slave.
346 struct GNUNET_CRYPTO_EcdsaPrivateKey priv_key;
349 * Public key of the slave.
351 struct GNUNET_CRYPTO_EcdsaPublicKey pub_key;
354 * Hash of @a pub_key.
/* Key of this slave inside the per-channel map of channel_slaves. */
356 struct GNUNET_HashCode pub_key_hash;
359 * Handle for the multicast member.
/* Parted in cleanup_slave() when not NULL. */
361 struct GNUNET_MULTICAST_Member *member;
364 * Transmit handle for multicast.
366 struct GNUNET_MULTICAST_MemberTransmitHandle *tmit_handle;
369 * Peer identity of the origin.
371 struct GNUNET_PeerIdentity origin;
374 * Number of items in @a relays.
376 uint32_t relay_count;
379 * Relays that multicast can use to connect.
/* Freed in cleanup_slave(). */
381 struct GNUNET_PeerIdentity *relays;
384 * Join request to be transmitted to the master on join.
/* Freed and reset to NULL in cleanup_slave(). */
386 struct GNUNET_PSYC_Message *join_msg;
389 * Join decision received from multicast.
/* Allocated in mcast_recv_join_decision(). */
391 struct GNUNET_PSYC_JoinDecisionMessage *join_dcsn;
394 * Maximum request ID for this channel.
396 uint64_t max_request_id;
/* Closure carrying the client an operation belongs to.
 * NOTE(review): further members may exist on lines not visible in this
 * view, and no user of this struct appears in the visible part of the file. */
400 struct OperationClosure
402 struct GNUNET_SERVER_Client *client;
/* Forward declarations: transmit_message() is called from
 * client_disconnect() and message_queue_drop() from cleanup_channel(), both
 * of which precede the definitions.  The return-type lines are not visible
 * in this view. */
409 transmit_message (struct Channel *chn);
413 message_queue_drop (struct Channel *chn);
/* Scheduler shutdown hook: releases the notification context and the
 * statistics handle.
 * NOTE(review): GNUNET_YES is passed as the second argument of
 * GNUNET_STATISTICS_destroy() -- presumably "sync before destroying"; confirm
 * against the statistics API.  Further teardown steps may exist on lines not
 * visible in this view. */
417 * Task run during shutdown.
423 shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
427 GNUNET_SERVER_notification_context_destroy (nc);
432 GNUNET_STATISTICS_destroy (stats, GNUNET_YES);
/* Releases the master-specific parts of a channel: stops the multicast
 * origin (if one was started), destroys the join request map and removes the
 * channel from the masters table.
 * NOTE(review): join_reqs is destroyed without releasing the join handles
 * stored in it by join_mem_test_cb() -- TODO confirm they are owned
 * elsewhere. */
439 * Clean up master data structures after a client disconnected.
442 cleanup_master (struct Master *mst)
444 struct Channel *chn = &mst->chn;
446 if (NULL != mst->origin)
/* No stop callback or closure is supplied (see the FIXME). */
447 GNUNET_MULTICAST_origin_stop (mst->origin, NULL, NULL); // FIXME
448 GNUNET_CONTAINER_multihashmap_destroy (mst->join_reqs);
449 GNUNET_CONTAINER_multihashmap_remove (masters, &chn->pub_key_hash, chn);
/* Releases the slave-specific parts of a channel:
 *  - removes the slave from its channel's map inside channel_slaves and
 *    destroys that inner map once it is empty,
 *  - removes the slave from the global slaves table (once),
 *  - frees the pending join message and the relay list,
 *  - parts the multicast group if a member handle exists.
 *
 * The channel must have an entry in channel_slaves; this is asserted.
 *
 * @param slv Slave to clean up. */
454 * Clean up slave data structures after a client disconnected.
457 cleanup_slave (struct Slave *slv)
459 struct Channel *chn = &slv->chn;
460 struct GNUNET_CONTAINER_MultiHashMap *
461 chn_slv = GNUNET_CONTAINER_multihashmap_get (channel_slaves,
463 GNUNET_assert (NULL != chn_slv);
464 GNUNET_CONTAINER_multihashmap_remove (chn_slv, &slv->pub_key_hash, slv);
466 if (0 == GNUNET_CONTAINER_multihashmap_size (chn_slv))
468 GNUNET_CONTAINER_multihashmap_remove (channel_slaves, &chn->pub_key_hash,
470 GNUNET_CONTAINER_multihashmap_destroy (chn_slv);
/* Single removal from the global table.  A second removal with the same key
 * used to follow the multicast part below; since cleanup_channel() casts the
 * channel pointer to struct Slave, chn and slv are the same address, so that
 * call could never match anything and has been dropped. */
472 GNUNET_CONTAINER_multihashmap_remove (slaves, &chn->pub_key_hash, slv);
474 if (NULL != slv->join_msg)
476 GNUNET_free (slv->join_msg);
477 slv->join_msg = NULL;
479 if (NULL != slv->relays)
481 GNUNET_free (slv->relays);
484 if (NULL != slv->member)
/* No part callback or closure is supplied (see the FIXME). */
486 GNUNET_MULTICAST_member_part (slv->member, NULL, NULL); // FIXME
/* Tears down a channel: drops all queued incoming messages, removes the
 * channel's entry from recv_cache, cancels a pending PSYCstore operation and
 * then dispatches to the master or slave specific cleanup based on
 * is_master.
 *
 * @param chn Channel to clean up; it is cast to struct Master or
 *            struct Slave, so it must be embedded at the start of one. */
494 * Clean up channel data structures after a client disconnected.
497 cleanup_channel (struct Channel *chn)
499 message_queue_drop (chn);
/* NOTE(review): this only unlinks the per-channel fragment map from
 * recv_cache; the visible lines neither destroy that inner map nor free
 * entries still in it -- TODO confirm this is not a leak. */
500 GNUNET_CONTAINER_multihashmap_remove_all (recv_cache, &chn->pub_key_hash);
502 if (NULL != chn->store_op)
504 GNUNET_PSYCSTORE_operation_cancel (chn->store_op);
505 chn->store_op = NULL;
508 (GNUNET_YES == chn->is_master)
509 ? cleanup_master ((struct Master *) chn)
510 : cleanup_slave ((struct Slave *) chn);
/* Server disconnect handler.  Looks up the channel stored as the client's
 * user context, unlinks the client from the channel's client list and, once
 * no clients remain, either flushes pending transmissions or cleans the
 * channel up.
 * NOTE(review): several short lines (early returns, loop header, else
 * branches) are not visible in this view; the control flow described here is
 * inferred from the visible statements only. */
516 * Called whenever a client is disconnected.
517 * Frees our resources associated with that client.
519 * @param cls Closure.
520 * @param client Identification of the client.
523 client_disconnect (void *cls, struct GNUNET_SERVER_Client *client)
529 chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
/* The pointer logged here is the (NULL) user context itself. */
533 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
534 "%p User context is NULL in client_disconnect()\n", chn);
539 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
540 "%p Client (%s) disconnected from channel %s\n",
541 chn, (GNUNET_YES == chn->is_master) ? "master" : "slave",
542 GNUNET_h2s (&chn->pub_key_hash));
/* Find this client's list item and unlink it. */
544 struct ClientListItem *cli = chn->clients_head;
547 if (cli->client == client)
549 GNUNET_CONTAINER_DLL_remove (chn->clients_head, chn->clients_tail, cli);
556 if (NULL == chn->clients_head)
557 { /* Last client disconnected. */
558 if (NULL != chn->tmit_head)
559 { /* Send pending messages to multicast before cleanup. */
560 transmit_message (chn);
/* presumably only reached when nothing is pending (the branch structure is
 * not visible) -- TODO confirm */
564 cleanup_channel (chn);
/* Sends msg to the clients on the channel's client list, starting at
 * clients_head.  Each client is added to the notification context right
 * before the unicast; GNUNET_NO is passed as the last unicast argument.
 *
 * @param chn Channel whose clients receive the message.
 * @param msg Message to send.
 *
 * NOTE(review): this routine event is logged at WARNING level while
 * comparable paths (e.g. client_send_mcast_msg()) log at DEBUG -- looks like
 * a debugging leftover.  The loop header and advance statement are not
 * visible in this view. */
571 * Send message to all clients connected to the channel.
574 client_send_msg (const struct Channel *chn,
575 const struct GNUNET_MessageHeader *msg)
577 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
578 "%p Sending message to clients.\n", chn);
580 struct ClientListItem *cli = chn->clients_head;
583 GNUNET_SERVER_notification_context_add (nc, cli->client);
584 GNUNET_SERVER_notification_context_unicast (nc, cli->client, msg, GNUNET_NO);
/* Builds a GNUNET_MESSAGE_TYPE_PSYC_RESULT_CODE message, optionally followed
 * by a NUL-terminated error string, and unicasts it to one client.
 *
 * @param client      Client that receives the result.
 * @param op_id       Operation ID in network byte order.
 * @param result_code Signed result; transmitted as an unsigned 64-bit value
 *                    biased by 2^63 (INT64_MIN maps to 0).
 * @param err_msg     Error text to append, or NULL for none.  It is
 *                    truncated so the whole message stays below
 *                    GNUNET_SERVER_MAX_MESSAGE_SIZE.
 *
 * NOTE(review): the declaration of err_size and the guards around the
 * strnlen() / memcpy() statements are not visible in this view. */
591 * Send a result code back to the client.
594 * Client that should receive the result code.
598 * Operation ID in network byte order.
600 * Error message to include (or NULL for none).
603 client_send_result (struct GNUNET_SERVER_Client *client, uint64_t op_id,
604 int64_t result_code, const char *err_msg)
606 struct OperationResult *res;
/* +1 accounts for the terminating NUL written below. */
610 err_size = strnlen (err_msg,
611 GNUNET_SERVER_MAX_MESSAGE_SIZE - sizeof (*res)) + 1;
612 res = GNUNET_malloc (sizeof (struct OperationResult) + err_size);
613 res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_RESULT_CODE);
614 res->header.size = htons (sizeof (struct OperationResult) + err_size);
/* Apply the 2^63 bias in unsigned arithmetic: the signed expression
 * result_code + INT64_MAX overflows (undefined behaviour) for every
 * positive result_code, while the unsigned sum wraps to the same value. */
615 res->result_code = GNUNET_htonll ((uint64_t) result_code + (uint64_t) INT64_MAX + 1);
619 memcpy (&res[1], err_msg, err_size);
/* Force termination in case err_msg was truncated by strnlen(). */
620 ((char *) &res[1])[err_size - 1] = '\0';
622 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
623 "%p Sending result to client for operation #%" PRIu64 ": "
624 "%" PRId64 " (%s)\n",
/* err_msg may be NULL; never hand a NULL pointer to %s. */
625 client, GNUNET_ntohll (op_id), result_code, (NULL != err_msg) ? err_msg : "");
627 GNUNET_SERVER_notification_context_add (nc, client);
628 GNUNET_SERVER_notification_context_unicast (nc, client, &res->header,
/* State carried from mcast_recv_join_request() to join_mem_test_cb().
 * NOTE(review): a chn member is used by the callback (jcls->chn) but its
 * declaration is not visible in this view. */
635 * Closure for join_mem_test_cb()
637 struct JoinMemTestClosure
/* Key of the slave asking to join. */
639 struct GNUNET_CRYPTO_EcdsaPublicKey slave_key;
/* Multicast handle used to answer the join request. */
641 struct GNUNET_MULTICAST_JoinHandle *jh;
/* Heap-allocated request forwarded to clients; freed by the callback. */
642 struct GNUNET_PSYC_JoinRequestMessage *join_msg;
/* PSYCstore membership test callback for a pending join request.
 * If the test result is GNUNET_NO and the channel is a master, the join
 * handle is remembered under the hash of the slave key and the request is
 * forwarded to the channel's clients.  A join decision carrying the test
 * result is also sent via multicast -- presumably only in the other case
 * (the branch structure is not visible) -- TODO confirm.
 *
 * @param cls     struct JoinMemTestClosure of the request.
 * @param result  Result of the membership test.
 * @param err_msg Error message from PSYCstore; unused in the visible lines. */
647 * Membership test result callback used for join requests.
650 join_mem_test_cb (void *cls, int64_t result, const char *err_msg)
652 struct JoinMemTestClosure *jcls = cls;
654 if (GNUNET_NO == result && GNUNET_YES == jcls->chn->is_master)
655 { /* Pass on join request to client if this is a master channel */
656 struct Master *mst = (struct Master *) jcls->chn;
657 struct GNUNET_HashCode slave_key_hash;
658 GNUNET_CRYPTO_hash (&jcls->slave_key, sizeof (jcls->slave_key),
/* MULTIPLE allows several pending requests from the same slave key. */
660 GNUNET_CONTAINER_multihashmap_put (mst->join_reqs, &slave_key_hash, jcls->jh,
661 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
662 client_send_msg (jcls->chn, &jcls->join_msg->header);
667 GNUNET_MULTICAST_join_decision (jcls->jh, result, 0, NULL, NULL);
/* NOTE(review): the release of jcls itself is not visible in this view. */
669 GNUNET_free (jcls->join_msg);
/* Multicast callback for an incoming join request.  Wraps the optional join
 * message in a GNUNET_PSYC_JoinRequestMessage and starts a PSYCstore
 * membership test whose result is handled by join_mem_test_cb().
 *
 * @param cls       Channel the request arrived on.
 * @param slave_key Public key of the joining slave.
 * @param join_msg  Optional join message, may be NULL.
 * @param jh        Join handle for answering the request. */
675 * Incoming join request from multicast.
678 mcast_recv_join_request (void *cls,
679 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
680 const struct GNUNET_MessageHeader *join_msg,
681 struct GNUNET_MULTICAST_JoinHandle *jh)
683 struct Channel *chn = cls;
684 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p Got join request.\n", chn);
/* Stays 0 unless a join message of the expected type is present, in which
 * case no payload is appended to the request below. */
686 uint16_t join_msg_size = 0;
687 if (NULL != join_msg)
689 if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE == ntohs (join_msg->type))
691 join_msg_size = ntohs (join_msg->size);
695 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
696 "%p Got join message with invalid type %u.\n",
697 chn, ntohs (join_msg->type));
/* NOTE(review): sizeof (*req) + join_msg_size is stored in a 16-bit size
 * field; no upper bound check is visible. */
701 struct GNUNET_PSYC_JoinRequestMessage *
702 req = GNUNET_malloc (sizeof (*req) + join_msg_size);
703 req->header.size = htons (sizeof (*req) + join_msg_size);
704 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_JOIN_REQUEST);
705 req->slave_key = *slave_key;
706 if (0 < join_msg_size)
707 memcpy (&req[1], join_msg, join_msg_size);
/* The remaining closure members are presumably set on lines not visible
 * here -- TODO confirm. */
709 struct JoinMemTestClosure *jcls = GNUNET_malloc (sizeof (*jcls));
710 jcls->slave_key = *slave_key;
713 jcls->join_msg = req;
/* Test membership at the channel's latest delivered message ID, with 0 as
 * the following argument. */
715 GNUNET_PSYCSTORE_membership_test (store, &chn->pub_key, slave_key,
716 chn->max_message_id, 0,
717 &join_mem_test_cb, jcls);
/* Multicast callback delivering the join decision to a slave.  Builds a
 * GNUNET_PSYC_JoinDecisionMessage (decision plus optional response), keeps
 * it in slv->join_dcsn, forwards it to the channel's clients and marks the
 * channel ready when admitted.
 *
 * @param cls         Slave the decision is for.
 * @param is_admitted Decision; compared against GNUNET_YES.
 * @param peer        Unused in the visible lines.
 * @param relay_count Unused in the visible lines.
 * @param relays      Unused in the visible lines.
 * @param join_resp   Optional response message, may be NULL.
 *
 * NOTE(review): slv->join_dcsn is overwritten without releasing a previous
 * value in the visible lines -- TODO confirm the callback fires only once. */
722 * Join decision received from multicast.
725 mcast_recv_join_decision (void *cls, int is_admitted,
726 const struct GNUNET_PeerIdentity *peer,
727 uint16_t relay_count,
728 const struct GNUNET_PeerIdentity *relays,
729 const struct GNUNET_MessageHeader *join_resp)
731 struct Slave *slv = cls;
732 struct Channel *chn = &slv->chn;
733 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
734 "%p Got join decision: %d\n", slv, is_admitted);
736 uint16_t join_resp_size = (NULL != join_resp) ? ntohs (join_resp->size) : 0;
737 struct GNUNET_PSYC_JoinDecisionMessage *
738 dcsn = slv->join_dcsn = GNUNET_malloc (sizeof (*dcsn) + join_resp_size);
739 dcsn->header.size = htons (sizeof (*dcsn) + join_resp_size);
740 dcsn->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION);
741 dcsn->is_admitted = htonl (is_admitted);
/* The response, if any, is appended directly after the fixed part. */
742 if (0 < join_resp_size)
743 memcpy (&dcsn[1], join_resp, join_resp_size);
745 client_send_msg (chn, &dcsn->header);
747 if (GNUNET_YES == is_admitted)
749 chn->is_ready = GNUNET_YES;
/* PSYCstore callback: logs the membership test outcome and relays the result
 * unchanged to multicast.
 *
 * @param cls     Multicast membership test handle to answer.
 * @param result  Result reported by PSYCstore.
 * @param err_msg Error message from PSYCstore; only logged. */
759 * Received result of GNUNET_PSYCSTORE_membership_test()
762 store_recv_membership_test_result (void *cls, int64_t result, const char *err_msg)
764 struct GNUNET_MULTICAST_MembershipTestHandle *mth = cls;
765 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
766 "%p GNUNET_PSYCSTORE_membership_test() returned %" PRId64 " (%s)\n",
767 mth, result, err_msg);
769 GNUNET_MULTICAST_membership_test_result (mth, result);
/* Multicast callback: forwards a membership test to PSYCstore; the answer is
 * returned through store_recv_membership_test_result().
 *
 * @param cls              Channel the request concerns.
 * @param slave_key        Key of the slave whose membership is tested.
 * @param message_id       Message ID passed through to PSYCstore.
 * @param group_generation Group generation passed through to PSYCstore.
 * @param mth              Handle used to deliver the result to multicast. */
774 * Incoming membership test request from multicast.
777 mcast_recv_membership_test (void *cls,
778 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
779 uint64_t message_id, uint64_t group_generation,
780 struct GNUNET_MULTICAST_MembershipTestHandle *mth)
782 struct Channel *chn = cls;
783 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
784 "%p Received membership test request from multicast.\n",
786 GNUNET_PSYCSTORE_membership_test (store, &chn->pub_key, slave_key,
787 message_id, group_generation,
788 &store_recv_membership_test_result, mth);
/* PSYCstore fragment callback used for multicast replay: hands each
 * retrieved fragment to multicast with GNUNET_MULTICAST_REC_OK.
 *
 * @param cls   Multicast replay handle.
 * @param msg   Fragment retrieved from PSYCstore.
 * @param flags PSYCstore flags of the fragment; unused in the visible lines.
 *
 * NOTE(review): the return type and return statement are not visible. */
793 store_recv_fragment_replay (void *cls,
794 struct GNUNET_MULTICAST_MessageHeader *msg,
795 enum GNUNET_PSYCSTORE_MessageFlags flags)
797 struct GNUNET_MULTICAST_ReplayHandle *rh = cls;
799 GNUNET_MULTICAST_replay_response (rh, &msg->header, GNUNET_MULTICAST_REC_OK);
/* PSYCstore completion callback for a replay request.  Maps the PSYCstore
 * result to a multicast replay error code where applicable and then closes
 * the replay response.
 *
 * @param cls     Multicast replay handle.
 * @param result  Result reported by PSYCstore.
 * @param err_msg Error message from PSYCstore; only logged.
 *
 * NOTE(review): the switch header and most case labels are not visible in
 * this view; only the membership-test-failed case label can be read. */
805 * Received result of GNUNET_PSYCSTORE_fragment_get() for multicast replay.
808 store_recv_fragment_replay_result (void *cls, int64_t result, const char *err_msg)
810 struct GNUNET_MULTICAST_ReplayHandle *rh = cls;
811 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
812 "%p Fragment replay: PSYCSTORE returned %" PRId64 " (%s)\n",
813 rh, result, err_msg);
821 GNUNET_MULTICAST_replay_response (rh, NULL,
822 GNUNET_MULTICAST_REC_NOT_FOUND);
825 case GNUNET_PSYCSTORE_MEMBERSHIP_TEST_FAILED:
826 GNUNET_MULTICAST_replay_response (rh, NULL,
827 GNUNET_MULTICAST_REC_ACCESS_DENIED);
830 GNUNET_MULTICAST_replay_response (rh, NULL,
831 GNUNET_MULTICAST_REC_INTERNAL_ERROR);
/* Signals the end of the replay response to multicast. */
834 GNUNET_MULTICAST_replay_response_end (rh);
/* Multicast callback: asks PSYCstore for a single fragment on behalf of a
 * slave.  Fragments are delivered through store_recv_fragment_replay() and
 * the final status through store_recv_fragment_replay_result().
 *
 * @param cls         Channel the request concerns.
 * @param slave_key   Key of the requesting slave.
 * @param fragment_id Fragment to replay; used as both range bounds.
 * @param flags       Unused in the visible lines.
 * @param rh          Replay handle passed on as callback closure. */
839 * Incoming fragment replay request from multicast.
842 mcast_recv_replay_fragment (void *cls,
843 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
844 uint64_t fragment_id, uint64_t flags,
845 struct GNUNET_MULTICAST_ReplayHandle *rh)
848 struct Channel *chn = cls;
849 GNUNET_PSYCSTORE_fragment_get (store, &chn->pub_key, slave_key,
850 fragment_id, fragment_id,
851 &store_recv_fragment_replay,
852 &store_recv_fragment_replay_result, rh);
/* Multicast callback: asks PSYCstore for all fragments of one message on
 * behalf of a slave, reusing the fragment replay callbacks.
 *
 * @param cls             Channel the request concerns.
 * @param slave_key       Key of the requesting slave.
 * @param fragment_offset Unused in the visible lines.
 * @param rh              Replay handle passed on as callback closure.
 *
 * NOTE(review): message_id is used below (as both range bounds) but its
 * parameter declaration, and that of at least one further parameter, is not
 * visible in this view. */
857 * Incoming message replay request from multicast.
860 mcast_recv_replay_message (void *cls,
861 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
863 uint64_t fragment_offset,
865 struct GNUNET_MULTICAST_ReplayHandle *rh)
867 struct Channel *chn = cls;
868 GNUNET_PSYCSTORE_message_get (store, &chn->pub_key, slave_key,
869 message_id, message_id,
870 &store_recv_fragment_replay,
871 &store_recv_fragment_replay_result, rh);
876 * Convert an uint64_t in network byte order to a HashCode
877 * that can be used as key in a MultiHashMap
880 hash_key_from_nll (struct GNUNET_HashCode *key, uint64_t n)
882 /* use little-endian order, as idx_of MultiHashMap casts key to unsigned int */
883 /* TODO: use built-in byte swap functions if available */
885 n = ((n << 8) & 0xFF00FF00FF00FF00ULL) | ((n >> 8) & 0x00FF00FF00FF00FFULL);
886 n = ((n << 16) & 0xFFFF0000FFFF0000ULL) | ((n >> 16) & 0x0000FFFF0000FFFFULL);
888 *key = (struct GNUNET_HashCode) {};
890 = (n << 32) | (n >> 32);
/* Builds a hash map key from a 64-bit value in host byte order.
 * On big-endian hosts, host order equals network order, so the work is
 * delegated to hash_key_from_nll().  On little-endian hosts the value is
 * copied as-is to the start of a zeroed key.  Any other byte order is a
 * compile-time error.
 *
 * @param key Key to fill.
 * @param n   Value in host byte order.
 *
 * The little-endian branch uses memset()/memcpy() instead of an empty-brace
 * compound literal and a store through a uint64_t pointer cast, for the same
 * portability and aliasing reasons as in hash_key_from_nll(). */
895 * Convert an uint64_t in host byte order to a HashCode
896 * that can be used as key in a MultiHashMap
899 hash_key_from_hll (struct GNUNET_HashCode *key, uint64_t n)
901 #if __BYTE_ORDER == __BIG_ENDIAN
902 hash_key_from_nll (key, n);
903 #elif __BYTE_ORDER == __LITTLE_ENDIAN
904 memset (key, 0, sizeof (*key));
905 memcpy (key, &n, sizeof (n));
907 #error byteorder undefined
/* Re-wraps a multicast message fragment as a GNUNET_PSYC_MessageHeader and
 * sends it to all clients of the channel.  The multicast header is replaced
 * by the PSYC header; the payload following it is copied unchanged.
 *
 * @param chn  Channel whose clients receive the message.
 * @param mmsg Multicast fragment to forward.
 *
 * NOTE(review): a flags value is written into the PSYC header below, but its
 * parameter declaration is not visible in this view; neither is the release
 * of pmsg.  The size arithmetic assumes the fragment is at least
 * sizeof (*mmsg) bytes long -- presumably guaranteed by the caller. */
913 * Send multicast message to all clients connected to the channel.
916 client_send_mcast_msg (struct Channel *chn,
917 const struct GNUNET_MULTICAST_MessageHeader *mmsg,
920 struct GNUNET_PSYC_MessageHeader *pmsg;
921 uint16_t size = ntohs (mmsg->header.size);
/* PSYC header plus the payload that followed the multicast header. */
922 uint16_t psize = sizeof (*pmsg) + size - sizeof (*mmsg);
924 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
925 "%p Sending multicast message to client. "
926 "fragment_id: %" PRIu64 ", message_id: %" PRIu64 "\n",
927 chn, GNUNET_ntohll (mmsg->fragment_id),
928 GNUNET_ntohll (mmsg->message_id));
930 pmsg = GNUNET_malloc (psize);
931 pmsg->header.size = htons (psize);
932 pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
/* Copied without conversion, i.e. left in the byte order of mmsg. */
933 pmsg->message_id = mmsg->message_id;
934 pmsg->fragment_offset = mmsg->fragment_offset;
935 pmsg->flags = htonl (flags);
937 memcpy (&pmsg[1], &mmsg[1], size - sizeof (*mmsg));
938 client_send_msg (chn, &pmsg->header);
/* Re-wraps a multicast request fragment as a GNUNET_PSYC_MessageHeader
 * flagged GNUNET_PSYC_MESSAGE_REQUEST and sends it to all clients of the
 * master's channel.  The request ID is carried in the message_id field.
 *
 * @param mst Master whose clients receive the request.
 * @param req Multicast request fragment to forward.
 *
 * NOTE(review): the release of pmsg is not visible in this view.  The size
 * arithmetic assumes the request is at least sizeof (*req) bytes long --
 * presumably guaranteed by the caller. */
944 * Send multicast request to all clients connected to the channel.
947 client_send_mcast_req (struct Master *mst,
948 const struct GNUNET_MULTICAST_RequestHeader *req)
950 struct Channel *chn = &mst->chn;
952 struct GNUNET_PSYC_MessageHeader *pmsg;
953 uint16_t size = ntohs (req->header.size);
/* PSYC header plus the payload that followed the request header. */
954 uint16_t psize = sizeof (*pmsg) + size - sizeof (*req);
/* The value logged under the "message_id" label is the request ID. */
956 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
957 "%p Sending multicast request to client. "
958 "fragment_id: %" PRIu64 ", message_id: %" PRIu64 "\n",
959 chn, GNUNET_ntohll (req->fragment_id),
960 GNUNET_ntohll (req->request_id));
962 pmsg = GNUNET_malloc (psize);
963 pmsg->header.size = htons (psize);
964 pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
965 pmsg->message_id = req->request_id;
966 pmsg->fragment_offset = req->fragment_offset;
967 pmsg->flags = htonl (GNUNET_PSYC_MESSAGE_REQUEST);
969 memcpy (&pmsg[1], &req[1], size - sizeof (*req));
970 client_send_msg (chn, &pmsg->header);
/* Records one received multicast fragment:
 *  1. finds or creates the FragmentQueue of the fragment's message,
 *  2. finds or creates the channel's fragment map in recv_cache and stores a
 *     copy of the fragment there (or bumps its reference count),
 *  3. advances the queue's receive state from the first/last PSYC part
 *     types,
 *  4. queues the message ID in chn->recv_msgs once the state allows delivery,
 *  5. adds the fragment's size and ID to the queue.
 *
 * NOTE(review): the "@param msg_id_hash" entry below has no matching
 * parameter; the hash is computed locally.  Several short lines (braces,
 * else, switch header, break) are not visible in this view, so the exact
 * branch structure cannot be confirmed here. */
976 * Insert a multicast message fragment into the queue belonging to the message.
978 * @param chn Channel.
979 * @param mmsg Multicast message fragment.
980 * @param msg_id_hash Message ID of @a mmsg in a struct GNUNET_HashCode.
981 * @param first_ptype First PSYC message part type in @a mmsg.
982 * @param last_ptype Last PSYC message part type in @a mmsg.
985 fragment_queue_insert (struct Channel *chn,
986 const struct GNUNET_MULTICAST_MessageHeader *mmsg,
987 uint16_t first_ptype, uint16_t last_ptype)
989 const uint16_t size = ntohs (mmsg->header.size);
990 const uint64_t frag_offset = GNUNET_ntohll (mmsg->fragment_offset);
991 struct GNUNET_CONTAINER_MultiHashMap
992 *chan_msgs = GNUNET_CONTAINER_multihashmap_get (recv_cache,
/* message_id is still in network byte order here, hence the _nll variant. */
995 struct GNUNET_HashCode msg_id_hash;
996 hash_key_from_nll (&msg_id_hash, mmsg->message_id);
999 *fragq = GNUNET_CONTAINER_multihashmap_get (chn->recv_frags, &msg_id_hash);
/* New queues start in the HEADER state with an empty min-heap of fragments. */
1003 fragq = GNUNET_new (struct FragmentQueue);
1004 fragq->state = MSG_FRAG_STATE_HEADER;
1006 = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
1008 GNUNET_CONTAINER_multihashmap_put (chn->recv_frags, &msg_id_hash, fragq,
1009 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
/* Lazily create the channel's fragment map inside recv_cache. */
1011 if (NULL == chan_msgs)
1013 chan_msgs = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
1014 GNUNET_CONTAINER_multihashmap_put (recv_cache, &chn->pub_key_hash, chan_msgs,
1015 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1019 struct GNUNET_HashCode frag_id_hash;
1020 hash_key_from_nll (&frag_id_hash, mmsg->fragment_id);
1021 struct RecvCacheEntry
1022 *cache_entry = GNUNET_CONTAINER_multihashmap_get (chan_msgs, &frag_id_hash);
1023 if (NULL == cache_entry)
1025 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1026 "%p Adding message fragment to cache. "
1027 "message_id: %" PRIu64 ", fragment_id: %" PRIu64 "\n",
1028 chn, GNUNET_ntohll (mmsg->message_id),
1029 GNUNET_ntohll (mmsg->fragment_id));
1030 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1031 "%p header_size: %" PRIu64 " + %u\n",
1032 chn, fragq->header_size, size);
/* The cache keeps its own copy; the caller retains ownership of mmsg. */
1033 cache_entry = GNUNET_new (struct RecvCacheEntry);
1034 cache_entry->ref_count = 1;
1035 cache_entry->mmsg = GNUNET_malloc (size);
1036 memcpy (cache_entry->mmsg, mmsg, size);
1037 GNUNET_CONTAINER_multihashmap_put (chan_msgs, &frag_id_hash, cache_entry,
1038 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
/* Fragment already cached: only count the additional reference. */
1042 cache_entry->ref_count++;
1043 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1044 "%p Message fragment is already in cache. "
1045 "message_id: %" PRIu64 ", fragment_id: %" PRIu64
1046 ", ref_count: %u\n",
1047 chn, GNUNET_ntohll (mmsg->message_id),
1048 GNUNET_ntohll (mmsg->fragment_id), cache_entry->ref_count);
1051 if (MSG_FRAG_STATE_HEADER == fragq->state)
1053 if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype)
/* A fragment starting with METHOD carries the message's state_delta and
 * flags; remember them in host byte order. */
1055 struct GNUNET_PSYC_MessageMethod *
1056 pmeth = (struct GNUNET_PSYC_MessageMethod *) &mmsg[1];
1057 fragq->state_delta = GNUNET_ntohll (pmeth->state_delta);
1058 fragq->flags = ntohl (pmeth->flags);
/* Fragment ends before any DATA part: it counts towards the header size. */
1061 if (last_ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA)
1063 fragq->header_size += size;
1065 else if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype
1066 || frag_offset == fragq->header_size)
1067 { /* header is now complete */
1068 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1069 "%p Header of message %" PRIu64 " is complete.\n",
1070 chn, GNUNET_ntohll (mmsg->message_id));
1072 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1073 "%p Adding message %" PRIu64 " to queue.\n",
1074 chn, GNUNET_ntohll (mmsg->message_id));
1075 fragq->state = MSG_FRAG_STATE_DATA;
1079 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1080 "%p Header of message %" PRIu64 " is NOT complete yet: "
1081 "%" PRIu64 " != %" PRIu64 "\n",
1082 chn, GNUNET_ntohll (mmsg->message_id), frag_offset,
1083 fragq->header_size);
/* presumably inside a switch on last_ptype (header not visible) -- TODO
 * confirm */
1089 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END:
/* END only completes the message when every earlier byte has arrived. */
1090 if (frag_offset == fragq->size)
1091 fragq->state = MSG_FRAG_STATE_END;
1093 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1094 "%p Message %" PRIu64 " is NOT complete yet: "
1095 "%" PRIu64 " != %" PRIu64 "\n",
1096 chn, GNUNET_ntohll (mmsg->message_id), frag_offset,
1100 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL:
1101 /* Drop message without delivering to client if it's a single fragment */
1103 (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype)
1104 ? MSG_FRAG_STATE_DROP
1105 : MSG_FRAG_STATE_CANCEL;
1108 switch (fragq->state)
1110 case MSG_FRAG_STATE_DATA:
1111 case MSG_FRAG_STATE_END:
1112 case MSG_FRAG_STATE_CANCEL:
/* Queue the message ID at most once while it is pending delivery. */
1113 if (GNUNET_NO == fragq->is_queued)
1115 GNUNET_CONTAINER_heap_insert (chn->recv_msgs, NULL,
1116 GNUNET_ntohll (mmsg->message_id));
1117 fragq->is_queued = GNUNET_YES;
/* Fragment IDs are used as heap cost so fragments come out in ID order. */
1121 fragq->size += size;
1122 GNUNET_CONTAINER_heap_insert (fragq->fragments, NULL,
1123 GNUNET_ntohll (mmsg->fragment_id));
/* Drains the fragment heap of one message in ascending fragment ID order.
 * Each fragment still present in the channel's cache is sent to the clients
 * (unless drop is set) and then released from the cache or has its
 * reference count decremented.  Once the message has reached the END state
 * or beyond, the queue itself is removed from chn->recv_frags and freed;
 * otherwise it is only marked as no longer queued.
 *
 * The channel must already have a fragment map in recv_cache; this is
 * asserted.
 *
 * NOTE(review): fragq is dereferenced unconditionally, so callers must not
 * pass NULL.  Several short lines (loop condition tail, else, braces) are
 * not visible in this view. */
1128 * Run fragment queue of a message.
1130 * Send fragments of a message in order to client, after all modifiers arrived
1133 * @param chn Channel.
1134 * @param msg_id ID of the message @a fragq belongs to.
1135 * @param fragq Fragment queue of the message.
1136 * @param drop Drop message without delivering to client?
1137 * #GNUNET_YES or #GNUNET_NO.
1140 fragment_queue_run (struct Channel *chn, uint64_t msg_id,
1141 struct FragmentQueue *fragq, uint8_t drop)
1143 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1144 "%p Running message fragment queue for message %" PRIu64
1146 chn, msg_id, fragq->state);
1148 struct GNUNET_CONTAINER_MultiHashMap
1149 *chan_msgs = GNUNET_CONTAINER_multihashmap_get (recv_cache,
1150 &chn->pub_key_hash);
1151 GNUNET_assert (NULL != chan_msgs);
1154 while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (fragq->fragments, NULL,
/* The heap holds host-order IDs, hence the _hll variant. */
1157 struct GNUNET_HashCode frag_id_hash;
1158 hash_key_from_hll (&frag_id_hash, frag_id);
1159 struct RecvCacheEntry *cache_entry
1160 = GNUNET_CONTAINER_multihashmap_get (chan_msgs, &frag_id_hash);
1161 if (cache_entry != NULL)
1163 if (GNUNET_NO == drop)
/* Forwarded to the clients with flags 0. */
1165 client_send_mcast_msg (chn, cache_entry->mmsg, 0);
/* Last reference: remove the entry from the cache and free it. */
1167 if (cache_entry->ref_count <= 1)
1169 GNUNET_CONTAINER_multihashmap_remove (chan_msgs, &frag_id_hash,
1171 GNUNET_free (cache_entry->mmsg);
1172 GNUNET_free (cache_entry);
1176 cache_entry->ref_count--;
1179 #if CACHE_AGING_IMPLEMENTED
1180 else if (GNUNET_NO == drop)
1182 /* TODO: fragment not in cache anymore, retrieve it from PSYCstore */
1186 GNUNET_CONTAINER_heap_remove_root (fragq->fragments);
/* Relies on the numeric ordering of the fragment state values. */
1189 if (MSG_FRAG_STATE_END <= fragq->state)
1191 struct GNUNET_HashCode msg_id_hash;
1192 hash_key_from_hll (&msg_id_hash, msg_id);
1194 GNUNET_CONTAINER_multihashmap_remove (chn->recv_frags, &msg_id_hash, fragq);
1195 GNUNET_CONTAINER_heap_destroy (fragq->fragments);
1196 GNUNET_free (fragq);
/* Message not finished: allow it to be queued again by later fragments. */
1200 fragq->is_queued = GNUNET_NO;
/* Processes queued message IDs in ascending order.  For each ID the
 * matching FragmentQueue is looked up; its fragments are then run through
 * fragment_queue_run() (dropping them when the message is in the DROP state)
 * and the ID is removed from the queue.
 *
 * NOTE(review): the counter and loop-exit statements are not visible in this
 * view; the log line at the end reports a count n. */
1206 * Run message queue.
1208 * Send messages in queue to client in order after a message has arrived from
1209 * multicast, according to the following:
1210 * - A message is only sent if all of its modifiers arrived.
1211 * - A stateful message is only sent if the previous stateful message
1212 * has already been delivered to the client.
1214 * @param chn Channel.
1216 * @return Number of messages removed from queue and sent to client.
1219 message_queue_run (struct Channel *chn)
1221 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1222 "%p Running message queue.\n", chn);
1225 while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (chn->recv_msgs, NULL,
1228 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1229 "%p Processing message %" PRIu64 " in queue.\n", chn, msg_id);
1230 struct GNUNET_HashCode msg_id_hash;
1231 hash_key_from_hll (&msg_id_hash, msg_id);
1233 struct FragmentQueue *
1234 fragq = GNUNET_CONTAINER_multihashmap_get (chn->recv_frags, &msg_id_hash);
1236 if (NULL == fragq || fragq->state <= MSG_FRAG_STATE_HEADER)
1238 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1239 "%p No fragq (%p) or header not complete.\n",
/* NOTE(review): if the branch above leaves the loop (its final statement is
 * not visible here), the state can no longer equal MSG_FRAG_STATE_HEADER at
 * this point, so the ordering checks and the max_message_id /
 * max_state_message_id updates below would never run -- TODO confirm. */
1244 if (MSG_FRAG_STATE_HEADER == fragq->state)
1246 /* Check if there's a missing message before the current one */
1247 if (GNUNET_PSYC_STATE_NOT_MODIFIED == fragq->state_delta)
1249 if (!(fragq->flags & GNUNET_PSYC_MESSAGE_ORDER_ANY)
1250 && msg_id - 1 != chn->max_message_id)
1252 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1253 "%p Out of order message. "
1254 "(%" PRIu64 " - 1 != %" PRIu64 ")\n",
1255 chn, msg_id, chn->max_message_id);
/* Stateful message: state_delta is the distance to the previous stateful
 * message, which must be the last one delivered. */
1261 if (msg_id - fragq->state_delta != chn->max_state_message_id)
1263 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1264 "%p Out of order stateful message. "
1265 "(%" PRIu64 " - %" PRIu64 " != %" PRIu64 ")\n",
1266 chn, msg_id, fragq->state_delta, chn->max_state_message_id);
/* NOTE(review): the call below references message_id and cls, which are not
 * parameters of this function; presumably it is compiled out by a
 * preprocessor guard on lines not visible here -- TODO confirm. */
1270 /* FIXME: apply modifiers to state in PSYCstore */
1271 GNUNET_PSYCSTORE_state_modify (store, &chn->pub_key, message_id,
1272 store_recv_state_modify_result, cls);
1274 chn->max_state_message_id = msg_id;
1276 chn->max_message_id = msg_id;
1278 fragment_queue_run (chn, msg_id, fragq, MSG_FRAG_STATE_DROP == fragq->state);
1279 GNUNET_CONTAINER_heap_remove_root (chn->recv_msgs);
1282 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1283 "%p Removed %" PRIu64 " messages from queue.\n", chn, n);
/* Empties the channel's queue of pending message IDs.  Every queued message
 * has its fragment queue run with drop set to GNUNET_YES, so cached
 * fragments are released without being delivered.
 *
 * NOTE(review): the looked-up fragq is passed to fragment_queue_run()
 * without a NULL check in the visible lines, and fragment_queue_run()
 * dereferences it -- this relies on every queued ID having an entry in
 * chn->recv_frags.  The counter statements are not visible in this view. */
1289 * Drop message queue of a channel.
1291 * Remove all messages in queue without sending it to clients.
1293 * @param chn Channel.
1295 * @return Number of messages removed from queue.
1298 message_queue_drop (struct Channel *chn)
1300 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1301 "%p Dropping message queue.\n", chn);
1304 while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (chn->recv_msgs, NULL,
1307 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1308 "%p Dropping message %" PRIu64 " from queue.\n", chn, msg_id);
1309 struct GNUNET_HashCode msg_id_hash;
1310 hash_key_from_hll (&msg_id_hash, msg_id);
1312 struct FragmentQueue *
1313 fragq = GNUNET_CONTAINER_multihashmap_get (chn->recv_frags, &msg_id_hash);
1315 fragment_queue_run (chn, msg_id, fragq, GNUNET_YES);
1316 GNUNET_CONTAINER_heap_remove_root (chn->recv_msgs);
1319 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1320 "%p Removed %" PRIu64 " messages from queue.\n", chn, n);
/* PSYCstore callback for a stored fragment.  The visible lines only log the
 * outcome; no error handling is performed.
 *
 * @param cls     Channel the fragment belongs to.
 * @param result  Result reported by PSYCstore.
 * @param err_msg Error message from PSYCstore; only logged. */
1326 * Received result of GNUNET_PSYCSTORE_fragment_store().
1329 store_recv_fragment_store_result (void *cls, int64_t result, const char *err_msg)
1331 struct Channel *chn = cls;
1332 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1333 "%p GNUNET_PSYCSTORE_fragment_store() returned %" PRId64 " (%s)\n",
1334 chn, result, err_msg);
/* Multicast callback for an incoming message fragment.  The fragment is
 * handed to PSYCstore, its PSYC parts are validated, and valid fragments are
 * inserted into the per-message queue before the message queue is run.
 *
 * @param cls  Channel the fragment arrived on.
 * @param mmsg Received multicast fragment.
 *
 * NOTE(review): the fragment is stored in PSYCstore before its parts are
 * validated, so fragments that are dropped below as invalid have already
 * been persisted -- TODO confirm this is intended.  The size arithmetic
 * assumes the fragment is at least sizeof (*mmsg) bytes long. */
1339 * Handle incoming message fragment from multicast.
1341 * Store it using PSYCstore and send it to the clients of the channel in order.
1344 mcast_recv_message (void *cls, const struct GNUNET_MULTICAST_MessageHeader *mmsg)
1346 struct Channel *chn = cls;
1347 uint16_t size = ntohs (mmsg->header.size);
1349 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1350 "%p Received multicast message of size %u.\n",
1353 GNUNET_PSYCSTORE_fragment_store (store, &chn->pub_key, mmsg, 0,
1354 &store_recv_fragment_store_result, chn);
/* Filled in by the part check with the first and last PSYC part types. */
1356 uint16_t first_ptype = 0, last_ptype = 0;
1358 == GNUNET_PSYC_receive_check_parts (size - sizeof (*mmsg),
1359 (const char *) &mmsg[1],
1360 &first_ptype, &last_ptype))
1362 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1363 "%p Dropping incoming multicast message with invalid parts.\n",
1365 GNUNET_break_op (0);
1369 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1370 "Message parts: first: type %u, last: type %u\n",
1371 first_ptype, last_ptype);
1373 fragment_queue_insert (chn, mmsg, first_ptype, last_ptype);
1374 message_queue_run (chn);
/* Multicast callback for a request fragment addressed to a master.  The
 * PSYC parts are validated and valid requests are forwarded to the master's
 * clients immediately, without reordering (see the FIXME).
 *
 * NOTE(review): the size arithmetic assumes the request is at least
 * sizeof (*req) bytes long -- presumably guaranteed by multicast. */
1379 * Incoming request fragment from multicast for a master.
1381 * @param cls Master.
1382 * @param req The request.
1385 mcast_recv_request (void *cls,
1386 const struct GNUNET_MULTICAST_RequestHeader *req)
1388 struct Master *mst = cls;
1389 uint16_t size = ntohs (req->header.size);
1391 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1392 "%p Received multicast request of size %u.\n",
/* Filled in by the part check; only logged here. */
1395 uint16_t first_ptype = 0, last_ptype = 0;
1397 == GNUNET_PSYC_receive_check_parts (size - sizeof (*req),
1398 (const char *) &req[1],
1399 &first_ptype, &last_ptype))
1401 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1402 "%p Dropping incoming multicast request with invalid parts.\n",
1404 GNUNET_break_op (0);
1408 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1409 "Message parts: first: type %u, last: type %u\n",
1410 first_ptype, last_ptype);
1412 /* FIXME: in-order delivery */
1413 client_send_mcast_req (mst, req);
1418 * Response from PSYCstore with the current counter values for a channel master.
1421 store_recv_master_counters (void *cls, int result, uint64_t max_fragment_id,
1422 uint64_t max_message_id, uint64_t max_group_generation,
1423 uint64_t max_state_message_id)
1425 struct Master *mst = cls;
1426 struct Channel *chn = &mst->chn;
1427 chn->store_op = NULL;
1429 struct GNUNET_PSYC_CountersResultMessage res;
1430 res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK);
1431 res.header.size = htons (sizeof (res));
1432 res.result_code = htonl (result - INT32_MIN);
1433 res.max_message_id = GNUNET_htonll (max_message_id);
1435 if (GNUNET_OK == result || GNUNET_NO == result)
1437 mst->max_message_id = max_message_id;
1438 chn->max_message_id = max_message_id;
1439 chn->max_state_message_id = max_state_message_id;
1440 mst->max_group_generation = max_group_generation;
1442 = GNUNET_MULTICAST_origin_start (cfg, &mst->priv_key, max_fragment_id,
1443 &mcast_recv_join_request,
1444 &mcast_recv_membership_test,
1445 &mcast_recv_replay_fragment,
1446 &mcast_recv_replay_message,
1447 &mcast_recv_request,
1448 &mcast_recv_message, chn);
1449 chn->is_ready = GNUNET_YES;
1453 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1454 "%p GNUNET_PSYCSTORE_counters_get() "
1455 "returned %d for channel %s.\n",
1456 chn, result, GNUNET_h2s (&chn->pub_key_hash));
1459 client_send_msg (chn, &res.header);
1464 * Response from PSYCstore with the current counter values for a channel slave.
1467 store_recv_slave_counters (void *cls, int result, uint64_t max_fragment_id,
1468 uint64_t max_message_id, uint64_t max_group_generation,
1469 uint64_t max_state_message_id)
1471 struct Slave *slv = cls;
1472 struct Channel *chn = &slv->chn;
1473 chn->store_op = NULL;
1475 struct GNUNET_PSYC_CountersResultMessage res;
1476 res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK);
1477 res.header.size = htons (sizeof (res));
1478 res.result_code = htonl (result - INT32_MIN);
1479 res.max_message_id = GNUNET_htonll (max_message_id);
1481 if (GNUNET_OK == result || GNUNET_NO == result)
1483 chn->max_message_id = max_message_id;
1484 chn->max_state_message_id = max_state_message_id;
1486 = GNUNET_MULTICAST_member_join (cfg, &chn->pub_key, &slv->priv_key,
1488 slv->relay_count, slv->relays,
1489 &slv->join_msg->header,
1490 &mcast_recv_join_request,
1491 &mcast_recv_join_decision,
1492 &mcast_recv_membership_test,
1493 &mcast_recv_replay_fragment,
1494 &mcast_recv_replay_message,
1495 &mcast_recv_message, chn);
1496 if (NULL != slv->join_msg)
1498 GNUNET_free (slv->join_msg);
1499 slv->join_msg = NULL;
1504 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1505 "%p GNUNET_PSYCSTORE_counters_get() "
1506 "returned %d for channel %s.\n",
1507 chn, result, GNUNET_h2s (&chn->pub_key_hash));
1510 client_send_msg (chn, &res.header);
1515 channel_init (struct Channel *chn)
1518 = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
1519 chn->recv_frags = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
1524 * Handle a connecting client starting a channel master.
1527 client_recv_master_start (void *cls, struct GNUNET_SERVER_Client *client,
1528 const struct GNUNET_MessageHeader *msg)
1530 const struct MasterStartRequest *req
1531 = (const struct MasterStartRequest *) msg;
1533 struct GNUNET_CRYPTO_EddsaPublicKey pub_key;
1534 struct GNUNET_HashCode pub_key_hash;
1536 GNUNET_CRYPTO_eddsa_key_get_public (&req->channel_key, &pub_key);
1537 GNUNET_CRYPTO_hash (&pub_key, sizeof (pub_key), &pub_key_hash);
1540 mst = GNUNET_CONTAINER_multihashmap_get (masters, &pub_key_hash);
1541 struct Channel *chn;
1545 mst = GNUNET_new (struct Master);
1546 mst->policy = ntohl (req->policy);
1547 mst->priv_key = req->channel_key;
1548 mst->join_reqs = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
1551 chn->is_master = GNUNET_YES;
1552 chn->pub_key = pub_key;
1553 chn->pub_key_hash = pub_key_hash;
1556 GNUNET_CONTAINER_multihashmap_put (masters, &chn->pub_key_hash, chn,
1557 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1558 chn->store_op = GNUNET_PSYCSTORE_counters_get (store, &chn->pub_key,
1559 store_recv_master_counters, mst);
1565 struct GNUNET_PSYC_CountersResultMessage res;
1566 res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK);
1567 res.header.size = htons (sizeof (res));
1568 res.result_code = htonl ((uint32_t) GNUNET_OK + INT32_MIN);
1569 res.max_message_id = GNUNET_htonll (mst->max_message_id);
1571 GNUNET_SERVER_notification_context_add (nc, client);
1572 GNUNET_SERVER_notification_context_unicast (nc, client, &res.header,
1576 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1577 "%p Client connected as master to channel %s.\n",
1578 mst, GNUNET_h2s (&chn->pub_key_hash));
1580 struct ClientListItem *cli = GNUNET_new (struct ClientListItem);
1581 cli->client = client;
1582 GNUNET_CONTAINER_DLL_insert (chn->clients_head, chn->clients_tail, cli);
1584 GNUNET_SERVER_client_set_user_context (client, chn);
1585 GNUNET_SERVER_receive_done (client, GNUNET_OK);
1590 * Handle a connecting client joining as a channel slave.
1593 client_recv_slave_join (void *cls, struct GNUNET_SERVER_Client *client,
1594 const struct GNUNET_MessageHeader *msg)
1596 const struct SlaveJoinRequest *req
1597 = (const struct SlaveJoinRequest *) msg;
1598 uint16_t req_size = ntohs (req->header.size);
1600 struct GNUNET_CRYPTO_EcdsaPublicKey slv_pub_key;
1601 struct GNUNET_HashCode pub_key_hash, slv_pub_key_hash;
1603 GNUNET_CRYPTO_ecdsa_key_get_public (&req->slave_key, &slv_pub_key);
1604 GNUNET_CRYPTO_hash (&slv_pub_key, sizeof (slv_pub_key), &slv_pub_key_hash);
1605 GNUNET_CRYPTO_hash (&req->channel_key, sizeof (req->channel_key), &pub_key_hash);
1607 struct GNUNET_CONTAINER_MultiHashMap *
1608 chn_slv = GNUNET_CONTAINER_multihashmap_get (channel_slaves, &pub_key_hash);
1609 struct Slave *slv = NULL;
1610 struct Channel *chn;
1612 if (NULL != chn_slv)
1614 slv = GNUNET_CONTAINER_multihashmap_get (chn_slv, &slv_pub_key_hash);
1618 slv = GNUNET_new (struct Slave);
1619 slv->priv_key = req->slave_key;
1620 slv->pub_key = slv_pub_key;
1621 slv->pub_key_hash = slv_pub_key_hash;
1622 slv->origin = req->origin;
1623 slv->relay_count = ntohl (req->relay_count);
1625 const struct GNUNET_PeerIdentity *
1626 relays = (const struct GNUNET_PeerIdentity *) &req[1];
1627 uint16_t relay_size = slv->relay_count * sizeof (*relays);
1628 uint16_t join_msg_size = 0;
1630 if (sizeof (*req) + relay_size + sizeof (struct GNUNET_MessageHeader)
1633 join_msg_size = ntohs (slv->join_msg->header.size);
1634 slv->join_msg = GNUNET_malloc (join_msg_size);
1635 memcpy (slv->join_msg, ((char *) &req[1]) + relay_size, join_msg_size);
1637 if (sizeof (*req) + relay_size + join_msg_size != req_size)
1639 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1640 "%u + %u + %u != %u\n",
1641 sizeof (*req), relay_size, join_msg_size, req_size);
1643 GNUNET_SERVER_client_disconnect (client);
1646 if (0 < slv->relay_count)
1648 slv->relays = GNUNET_malloc (relay_size);
1649 memcpy (slv->relays, &req[1], relay_size);
1653 chn->is_master = GNUNET_NO;
1654 chn->pub_key = req->channel_key;
1655 chn->pub_key_hash = pub_key_hash;
1658 if (NULL == chn_slv)
1660 chn_slv = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
1661 GNUNET_CONTAINER_multihashmap_put (channel_slaves, &chn->pub_key_hash, chn_slv,
1662 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1664 GNUNET_CONTAINER_multihashmap_put (chn_slv, &slv->pub_key_hash, chn,
1665 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1666 GNUNET_CONTAINER_multihashmap_put (slaves, &chn->pub_key_hash, chn,
1667 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1668 chn->store_op = GNUNET_PSYCSTORE_counters_get (store, &chn->pub_key,
1669 &store_recv_slave_counters, slv);
1675 struct GNUNET_PSYC_CountersResultMessage res;
1676 res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK);
1677 res.header.size = htons (sizeof (res));
1678 res.result_code = htonl ((uint32_t) GNUNET_OK - INT32_MIN);
1679 res.max_message_id = GNUNET_htonll (chn->max_message_id);
1681 GNUNET_SERVER_notification_context_add (nc, client);
1682 GNUNET_SERVER_notification_context_unicast (nc, client, &res.header,
1685 if (NULL == slv->member)
1688 = GNUNET_MULTICAST_member_join (cfg, &chn->pub_key, &slv->priv_key,
1690 slv->relay_count, slv->relays,
1691 &slv->join_msg->header,
1692 &mcast_recv_join_request,
1693 &mcast_recv_join_decision,
1694 &mcast_recv_membership_test,
1695 &mcast_recv_replay_fragment,
1696 &mcast_recv_replay_message,
1697 &mcast_recv_message, chn);
1698 if (NULL != slv->join_msg)
1700 GNUNET_free (slv->join_msg);
1701 slv->join_msg = NULL;
1704 else if (NULL != slv->join_dcsn)
1706 GNUNET_SERVER_notification_context_add (nc, client);
1707 GNUNET_SERVER_notification_context_unicast (nc, client,
1708 &slv->join_dcsn->header,
1713 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1714 "%p Client connected as slave to channel %s.\n",
1715 slv, GNUNET_h2s (&chn->pub_key_hash));
1717 struct ClientListItem *cli = GNUNET_new (struct ClientListItem);
1718 cli->client = client;
1719 GNUNET_CONTAINER_DLL_insert (chn->clients_head, chn->clients_tail, cli);
1721 GNUNET_SERVER_client_set_user_context (client, chn);
1722 GNUNET_SERVER_receive_done (client, GNUNET_OK);
/**
 * Closure handed to mcast_send_join_decision() for every pending
 * join request of a slave.
 */
struct JoinDecisionClosure
{
  /** Admission flag taken from the client's join decision. */
  int32_t is_admitted;

  /** Message that follows the join decision, or NULL if there is none. */
  struct GNUNET_MessageHeader *msg;
};
1734 * Iterator callback for sending join decisions to multicast.
1737 mcast_send_join_decision (void *cls, const struct GNUNET_HashCode *pub_key_hash,
1740 struct JoinDecisionClosure *jcls = cls;
1741 struct GNUNET_MULTICAST_JoinHandle *jh = value;
1742 // FIXME: add relays
1743 GNUNET_MULTICAST_join_decision (jh, jcls->is_admitted, 0, NULL, jcls->msg);
1749 * Join decision from client.
1752 client_recv_join_decision (void *cls, struct GNUNET_SERVER_Client *client,
1753 const struct GNUNET_MessageHeader *msg)
1756 chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
1757 GNUNET_assert (GNUNET_YES == chn->is_master);
1758 struct Master *mst = (struct Master *) chn;
1760 struct GNUNET_PSYC_JoinDecisionMessage *
1761 dcsn = (struct GNUNET_PSYC_JoinDecisionMessage *) msg;
1762 struct JoinDecisionClosure jcls;
1763 jcls.is_admitted = ntohl (dcsn->is_admitted);
1765 = (sizeof (*dcsn) + sizeof (*jcls.msg) <= ntohs (msg->size))
1766 ? (struct GNUNET_MessageHeader *) &dcsn[1]
1769 struct GNUNET_HashCode slave_key_hash;
1770 GNUNET_CRYPTO_hash (&dcsn->slave_key, sizeof (dcsn->slave_key),
1773 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1774 "%p Got join decision (%d) from client for channel %s..\n",
1775 mst, jcls.is_admitted, GNUNET_h2s (&chn->pub_key_hash));
1776 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1777 "%p ..and slave %s.\n",
1778 mst, GNUNET_h2s (&slave_key_hash));
1780 GNUNET_CONTAINER_multihashmap_get_multiple (mst->join_reqs, &slave_key_hash,
1781 &mcast_send_join_decision, &jcls);
1782 GNUNET_CONTAINER_multihashmap_remove_all (mst->join_reqs, &slave_key_hash);
1783 GNUNET_SERVER_receive_done (client, GNUNET_OK);
1788 * Send acknowledgement to a client.
1790 * Sent after a message fragment has been passed on to multicast.
1792 * @param chn The channel struct for the client.
1795 send_message_ack (struct Channel *chn, struct GNUNET_SERVER_Client *client)
1797 struct GNUNET_MessageHeader res;
1798 res.size = htons (sizeof (res));
1799 res.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK);
1802 GNUNET_SERVER_notification_context_add (nc, client);
1803 GNUNET_SERVER_notification_context_unicast (nc, client, &res, GNUNET_NO);
1808 * Callback for the transmit functions of multicast.
1811 transmit_notify (void *cls, size_t *data_size, void *data)
1813 struct Channel *chn = cls;
1814 struct TransmitMessage *tmit_msg = chn->tmit_head;
1816 if (NULL == tmit_msg || *data_size < tmit_msg->size)
1818 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1819 "%p transmit_notify: nothing to send.\n", chn);
1824 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1825 "%p transmit_notify: sending %u bytes.\n", chn, tmit_msg->size);
1827 *data_size = tmit_msg->size;
1828 memcpy (data, &tmit_msg[1], *data_size);
1830 int ret = (MSG_STATE_END < chn->tmit_state) ? GNUNET_NO : GNUNET_YES;
1832 if (NULL != tmit_msg->client && GNUNET_NO == tmit_msg->ack_sent)
1833 send_message_ack (chn, tmit_msg->client);
1835 GNUNET_CONTAINER_DLL_remove (chn->tmit_head, chn->tmit_tail, tmit_msg);
1836 GNUNET_free (tmit_msg);
1838 if (NULL != chn->tmit_head)
1840 transmit_message (chn);
1842 else if (GNUNET_YES == chn->is_disconnected)
1844 /* FIXME: handle partial message (when still in_transmit) */
1845 cleanup_channel (chn);
1852 * Callback for the transmit functions of multicast.
1855 master_transmit_notify (void *cls, size_t *data_size, void *data)
1857 int ret = transmit_notify (cls, data_size, data);
1859 if (GNUNET_YES == ret)
1861 struct Master *mst = cls;
1862 mst->tmit_handle = NULL;
1869 * Callback for the transmit functions of multicast.
1872 slave_transmit_notify (void *cls, size_t *data_size, void *data)
1874 int ret = transmit_notify (cls, data_size, data);
1876 if (GNUNET_YES == ret)
1878 struct Slave *slv = cls;
1879 slv->tmit_handle = NULL;
1886 * Transmit a message from a channel master to the multicast group.
1889 master_transmit_message (struct Master *mst)
1891 if (NULL == mst->tmit_handle)
1894 = GNUNET_MULTICAST_origin_to_all (mst->origin, mst->max_message_id,
1895 mst->max_group_generation,
1896 master_transmit_notify, mst);
1900 GNUNET_MULTICAST_origin_to_all_resume (mst->tmit_handle);
1906 * Transmit a message from a channel slave to the multicast group.
1909 slave_transmit_message (struct Slave *slv)
1911 if (NULL == slv->tmit_handle)
1914 = GNUNET_MULTICAST_member_to_origin (slv->member, slv->max_request_id,
1915 slave_transmit_notify, slv);
1919 GNUNET_MULTICAST_member_to_origin_resume (slv->tmit_handle);
1925 transmit_message (struct Channel *chn)
1928 ? master_transmit_message ((struct Master *) chn)
1929 : slave_transmit_message ((struct Slave *) chn);
1934 * Queue a message from a channel master for sending to the multicast group.
1937 master_queue_message (struct Master *mst, struct TransmitMessage *tmit_msg,
1938 uint16_t first_ptype, uint16_t last_ptype)
1940 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p master_queue_message()\n", mst);
1942 if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype)
1944 tmit_msg->id = ++mst->max_message_id;
1945 struct GNUNET_PSYC_MessageMethod *pmeth
1946 = (struct GNUNET_PSYC_MessageMethod *) &tmit_msg[1];
1948 if (pmeth->flags & GNUNET_PSYC_MASTER_TRANSMIT_STATE_RESET)
1950 pmeth->state_delta = GNUNET_htonll (GNUNET_PSYC_STATE_RESET);
1952 else if (pmeth->flags & GNUNET_PSYC_MASTER_TRANSMIT_STATE_MODIFY)
1954 pmeth->state_delta = GNUNET_htonll (tmit_msg->id
1955 - mst->max_state_message_id);
1959 pmeth->state_delta = GNUNET_htonll (GNUNET_PSYC_STATE_NOT_MODIFIED);
1966 * Queue a message from a channel slave for sending to the multicast group.
1969 slave_queue_message (struct Slave *slv, struct TransmitMessage *tmit_msg,
1970 uint16_t first_ptype, uint16_t last_ptype)
1972 if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype)
1974 struct GNUNET_PSYC_MessageMethod *pmeth
1975 = (struct GNUNET_PSYC_MessageMethod *) &tmit_msg[1];
1976 pmeth->state_delta = GNUNET_htonll (GNUNET_PSYC_STATE_NOT_MODIFIED);
1977 tmit_msg->id = ++slv->max_request_id;
1983 * Queue PSYC message parts for sending to multicast.
1985 * @param chn Channel to send to.
1986 * @param client Client the message originates from.
1987 * @param data_size Size of @a data.
1988 * @param data Concatenated message parts.
1989 * @param first_ptype First message part type in @a data.
1990 * @param last_ptype Last message part type in @a data.
1992 static struct TransmitMessage *
1993 queue_message (struct Channel *chn,
1994 struct GNUNET_SERVER_Client *client,
1997 uint16_t first_ptype, uint16_t last_ptype)
1999 struct TransmitMessage *
2000 tmit_msg = GNUNET_malloc (sizeof (*tmit_msg) + data_size);
2001 memcpy (&tmit_msg[1], data, data_size);
2002 tmit_msg->client = client;
2003 tmit_msg->size = data_size;
2004 tmit_msg->state = chn->tmit_state;
2006 /* FIXME: separate queue per message ID */
2008 GNUNET_CONTAINER_DLL_insert_tail (chn->tmit_head, chn->tmit_tail, tmit_msg);
2011 ? master_queue_message ((struct Master *) chn, tmit_msg,
2012 first_ptype, last_ptype)
2013 : slave_queue_message ((struct Slave *) chn, tmit_msg,
2014 first_ptype, last_ptype);
2020 * Cancel transmission of current message.
2022 * @param chn Channel to send to.
2023 * @param client Client the message originates from.
2026 transmit_cancel (struct Channel *chn, struct GNUNET_SERVER_Client *client)
2028 uint16_t type = GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL;
2030 struct GNUNET_MessageHeader msg;
2031 msg.size = htons (sizeof (msg));
2032 msg.type = htons (type);
2034 queue_message (chn, client, sizeof (msg), &msg, type, type);
2035 transmit_message (chn);
2037 /* FIXME: cleanup */
2042 * Incoming message from a master or slave client.
2045 client_recv_psyc_message (void *cls, struct GNUNET_SERVER_Client *client,
2046 const struct GNUNET_MessageHeader *msg)
2049 chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
2050 GNUNET_assert (NULL != chn);
2052 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2053 "%p Received message from client.\n", chn);
2054 GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG, msg);
2056 if (GNUNET_YES != chn->is_ready)
2058 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2059 "%p Channel is not ready yet, disconnecting client.\n", chn);
2061 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
2065 uint16_t size = ntohs (msg->size);
2066 if (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < size - sizeof (*msg))
2068 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "%p Message payload too large.\n", chn);
2070 transmit_cancel (chn, client);
2071 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
2075 uint16_t first_ptype = 0, last_ptype = 0;
2077 == GNUNET_PSYC_receive_check_parts (size - sizeof (*msg),
2078 (const char *) &msg[1],
2079 &first_ptype, &last_ptype))
2081 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2082 "%p Received invalid message part from client.\n", chn);
2084 transmit_cancel (chn, client);
2085 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
2088 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2089 "%p Received message with first part type %u and last part type %u.\n",
2090 chn, first_ptype, last_ptype);
2092 queue_message (chn, client, size - sizeof (*msg), &msg[1],
2093 first_ptype, last_ptype);
2094 transmit_message (chn);
2095 /* FIXME: send a few ACKs even before transmit_notify is called */
2097 GNUNET_SERVER_receive_done (client, GNUNET_OK);
/**
 * Closure for store_recv_membership_store_result().
 */
struct MembershipStoreClosure
{
  /** Client that sent the membership store request. */
  struct GNUNET_SERVER_Client *client;

  /** Channel the request was made on. */
  struct Channel *chn;

  /** Operation ID copied unchanged from the client's request. */
  uint64_t op_id;
};
2110 * Received result of GNUNET_PSYCSTORE_membership_store()
2113 store_recv_membership_store_result (void *cls, int64_t result, const char *err_msg)
2115 struct MembershipStoreClosure *mcls = cls;
2116 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2117 "%p GNUNET_PSYCSTORE_membership_store() returned %" PRId64 " (%s)\n",
2118 mcls->chn, result, err_msg);
2120 client_send_result (mcls->client, mcls->op_id, result, err_msg);
2125 * Client requests to add/remove a slave in the membership database.
2128 client_recv_membership_store (void *cls, struct GNUNET_SERVER_Client *client,
2129 const struct GNUNET_MessageHeader *msg)
2132 chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
2133 GNUNET_assert (NULL != chn);
2135 const struct ChannelMembershipStoreRequest *
2136 req = (const struct ChannelMembershipStoreRequest *) msg;
2138 struct MembershipStoreClosure *mcls = GNUNET_malloc (sizeof (*mcls));
2139 mcls->client = client;
2141 mcls->op_id = req->op_id;
2143 uint64_t announced_at = GNUNET_ntohll (req->announced_at);
2144 uint64_t effective_since = GNUNET_ntohll (req->effective_since);
2145 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2146 "%p Received membership store request from client.\n", chn);
2147 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2148 "%p did_join: %u, announced_at: %" PRIu64 ", effective_since: %" PRIu64 "\n",
2149 chn, req->did_join, announced_at, effective_since);
2151 GNUNET_PSYCSTORE_membership_store (store, &chn->pub_key, &req->slave_key,
2152 req->did_join, announced_at, effective_since,
2153 0, /* FIXME: group_generation */
2154 &store_recv_membership_store_result, mcls);
2155 GNUNET_SERVER_receive_done (client, GNUNET_OK);
2160 store_recv_fragment_history (void *cls,
2161 struct GNUNET_MULTICAST_MessageHeader *msg,
2162 enum GNUNET_PSYCSTORE_MessageFlags flags)
2164 struct OperationClosure *opcls = cls;
2165 struct Channel *chn = opcls->chn;
2166 client_send_mcast_msg (chn, msg, GNUNET_PSYC_MESSAGE_HISTORIC);
2172 * Received result of GNUNET_PSYCSTORE_fragment_get() for multicast replay.
2175 store_recv_fragment_history_result (void *cls, int64_t result, const char *err_msg)
2177 struct OperationClosure *opcls = cls;
2178 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2179 "%p History replay #%" PRIu64 ": "
2180 "PSYCSTORE returned %" PRId64 " (%s)\n",
2181 opcls->chn, opcls->op_id, result, err_msg);
2183 client_send_result (opcls->client, opcls->op_id, result, err_msg);
2188 * Client requests channel history from PSYCstore.
2191 client_recv_history_replay (void *cls, struct GNUNET_SERVER_Client *client,
2192 const struct GNUNET_MessageHeader *msg)
2195 chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
2196 GNUNET_assert (NULL != chn);
2198 const struct HistoryRequest *
2199 req = (const struct HistoryRequest *) msg;
2201 struct OperationClosure *opcls = GNUNET_malloc (sizeof (*opcls));
2202 opcls->client = client;
2204 opcls->op_id = req->op_id;
2206 if (0 == req->message_limit)
2207 GNUNET_PSYCSTORE_message_get (store, &chn->pub_key, NULL,
2208 GNUNET_ntohll (req->start_message_id),
2209 GNUNET_ntohll (req->end_message_id),
2210 &store_recv_fragment_history,
2211 &store_recv_fragment_history_result, opcls);
2213 GNUNET_PSYCSTORE_message_get_latest (store, &chn->pub_key, NULL,
2214 GNUNET_ntohll (req->message_limit),
2215 &store_recv_fragment_history,
2216 &store_recv_fragment_history_result,
2219 GNUNET_SERVER_receive_done (client, GNUNET_OK);
2224 * Received state var from PSYCstore, send it to client.
2227 store_recv_state_var (void *cls, const char *name,
2228 const void *value, size_t value_size)
2230 struct OperationClosure *opcls = cls;
2231 struct OperationResult *op;
2235 uint16_t name_size = strnlen (name, GNUNET_PSYC_MODIFIER_MAX_PAYLOAD) + 1;
2236 struct GNUNET_PSYC_MessageModifier *mod;
2237 op = GNUNET_malloc (sizeof (*op) + sizeof (*mod) + name_size + value_size);
2238 op->header.size = htons (sizeof (*op) + sizeof (*mod) + name_size + value_size);
2239 op->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_STATE_RESULT);
2240 op->op_id = opcls->op_id;
2242 mod = (struct GNUNET_PSYC_MessageModifier *) &op[1];
2243 mod->header.size = htons (sizeof (*mod) + name_size + value_size);
2244 mod->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER);
2245 mod->name_size = htons (name_size);
2246 mod->value_size = htonl (value_size);
2247 mod->oper = htons (GNUNET_ENV_OP_ASSIGN);
2248 memcpy (&mod[1], name, name_size);
2249 memcpy (((char *) &mod[1]) + name_size, value, value_size);
2253 struct GNUNET_MessageHeader *mod;
2254 op = GNUNET_malloc (sizeof (*op) + sizeof (*mod) + value_size);
2255 op->header.size = htons (sizeof (*op) + sizeof (*mod) + value_size);
2256 op->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_STATE_RESULT);
2257 op->op_id = opcls->op_id;
2259 mod = (struct GNUNET_MessageHeader *) &op[1];
2260 mod->size = htons (sizeof (*mod) + value_size);
2261 mod->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT);
2262 memcpy (&mod[1], value, value_size);
2265 GNUNET_SERVER_notification_context_add (nc, opcls->client);
2266 GNUNET_SERVER_notification_context_unicast (nc, opcls->client, &op->header,
2273 * Received result of GNUNET_PSYCSTORE_state_get()
2274 * or GNUNET_PSYCSTORE_state_get_prefix()
2277 store_recv_state_result (void *cls, int64_t result, const char *err_msg)
2279 struct OperationClosure *opcls = cls;
2280 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2281 "%p History replay #%" PRIu64 ": "
2282 "PSYCSTORE returned %" PRId64 " (%s)\n",
2283 opcls->chn, opcls->op_id, result, err_msg);
2285 client_send_result (opcls->client, opcls->op_id, result, err_msg);
2290 * Client requests best matching state variable from PSYCstore.
2293 client_recv_state_get (void *cls, struct GNUNET_SERVER_Client *client,
2294 const struct GNUNET_MessageHeader *msg)
2297 chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
2298 GNUNET_assert (NULL != chn);
2300 const struct StateRequest *
2301 req = (const struct StateRequest *) msg;
2303 uint16_t name_size = ntohs (req->header.size) - sizeof (*req);
2304 const char *name = (const char *) &req[1];
2305 if (0 == name_size || '\0' != name[name_size - 1])
2307 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
2311 struct OperationClosure *opcls = GNUNET_malloc (sizeof (*opcls));
2312 opcls->client = client;
2314 opcls->op_id = req->op_id;
2316 GNUNET_PSYCSTORE_state_get (store, &chn->pub_key, name,
2317 &store_recv_state_var,
2318 &store_recv_state_result, opcls);
2319 GNUNET_SERVER_receive_done (client, GNUNET_OK);
2324 * Client requests state variables with a given prefix from PSYCstore.
2327 client_recv_state_get_prefix (void *cls, struct GNUNET_SERVER_Client *client,
2328 const struct GNUNET_MessageHeader *msg)
2331 chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
2332 GNUNET_assert (NULL != chn);
2334 const struct StateRequest *
2335 req = (const struct StateRequest *) msg;
2337 uint16_t name_size = ntohs (req->header.size) - sizeof (*req);
2338 const char *name = (const char *) &req[1];
2339 if (0 == name_size || '\0' != name[name_size - 1])
2341 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
2345 struct OperationClosure *opcls = GNUNET_malloc (sizeof (*opcls));
2346 opcls->client = client;
2348 opcls->op_id = req->op_id;
2350 GNUNET_PSYCSTORE_state_get_prefix (store, &chn->pub_key, name,
2351 &store_recv_state_var,
2352 &store_recv_state_result, opcls);
2353 GNUNET_SERVER_receive_done (client, GNUNET_OK);
2358 static const struct GNUNET_SERVER_MessageHandler server_handlers[] = {
2359 { &client_recv_master_start, NULL,
2360 GNUNET_MESSAGE_TYPE_PSYC_MASTER_START, 0 },
2362 { &client_recv_slave_join, NULL,
2363 GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN, 0 },
2365 { &client_recv_join_decision, NULL,
2366 GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION, 0 },
2368 { &client_recv_psyc_message, NULL,
2369 GNUNET_MESSAGE_TYPE_PSYC_MESSAGE, 0 },
2371 { &client_recv_membership_store, NULL,
2372 GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_MEMBERSHIP_STORE, 0 },
2374 { &client_recv_history_replay, NULL,
2375 GNUNET_MESSAGE_TYPE_PSYC_HISTORY_REPLAY, 0 },
2377 { &client_recv_state_get, NULL,
2378 GNUNET_MESSAGE_TYPE_PSYC_STATE_GET, 0 },
2380 { &client_recv_state_get_prefix, NULL,
2381 GNUNET_MESSAGE_TYPE_PSYC_STATE_GET_PREFIX, 0 },
2383 { NULL, NULL, 0, 0 }
2388 * Initialize the PSYC service.
2390 * @param cls Closure.
2391 * @param server The initialized server.
2392 * @param c Configuration to use.
2395 run (void *cls, struct GNUNET_SERVER_Handle *server,
2396 const struct GNUNET_CONFIGURATION_Handle *c)
2399 store = GNUNET_PSYCSTORE_connect (cfg);
2400 stats = GNUNET_STATISTICS_create ("psyc", cfg);
2401 masters = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
2402 slaves = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
2403 channel_slaves = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
2404 recv_cache = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
2405 nc = GNUNET_SERVER_notification_context_create (server, 1);
2406 GNUNET_SERVER_add_handlers (server, server_handlers);
2407 GNUNET_SERVER_disconnect_notify (server, &client_disconnect, NULL);
2408 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL,
2409 &shutdown_task, NULL);
2414 * The main function for the service.
2416 * @param argc number of arguments from the command line
2417 * @param argv command line arguments
2418 * @return 0 ok, 1 on error
2421 main (int argc, char *const *argv)
2423 return (GNUNET_OK ==
2424 GNUNET_SERVICE_run (argc, argv, "psyc",
2425 GNUNET_SERVICE_OPTION_NONE,
2426 &run, NULL)) ? 0 : 1;
2429 /* end of gnunet-service-psyc.c */