3 * This file is part of GNUnet
4 * Copyright (C) 2013 Christian Grothoff (and other contributing authors)
6 * GNUnet is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published
8 * by the Free Software Foundation; either version 3, or (at your
9 * option) any later version.
11 * GNUnet is distributed in the hope that it will be useful, but
12 * WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 * General Public License for more details.
16 * You should have received a copy of the GNU General Public License
17 * along with GNUnet; see the file COPYING. If not, write to the
18 * Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
19 * Boston, MA 02110-1301, USA.
23 * @file psyc/gnunet-service-psyc.c
25 * @author Gabor X Toth
31 #include "gnunet_util_lib.h"
32 #include "gnunet_constants.h"
33 #include "gnunet_protocols.h"
34 #include "gnunet_statistics_service.h"
35 #include "gnunet_multicast_service.h"
36 #include "gnunet_psycstore_service.h"
37 #include "gnunet_psyc_service.h"
38 #include "gnunet_psyc_util_lib.h"
43 * Handle to our current configuration.
45 static const struct GNUNET_CONFIGURATION_Handle *cfg;
48 * Handle to the statistics service.
50 static struct GNUNET_STATISTICS_Handle *stats;
53 * Notification context, simplifies client broadcasts.
55 static struct GNUNET_SERVER_NotificationContext *nc;
58 * Handle to the PSYCstore.
60 static struct GNUNET_PSYCSTORE_Handle *store;
63 * All connected masters.
64 * Channel's pub_key_hash -> struct Master
66 static struct GNUNET_CONTAINER_MultiHashMap *masters;
69 * All connected slaves.
70 * Channel's pub_key_hash -> struct Slave
72 static struct GNUNET_CONTAINER_MultiHashMap *slaves;
75 * Connected slaves per channel.
76 * Channel's pub_key_hash -> Slave's pub_key -> struct Slave
78 static struct GNUNET_CONTAINER_MultiHashMap *channel_slaves;
82 * Message in the transmission queue.
84 struct TransmitMessage
86 struct TransmitMessage *prev;
87 struct TransmitMessage *next;
89 struct GNUNET_SERVER_Client *client;
92 * ID assigned to the message.
102 * @see enum MessageState
107 * Whether a message ACK has already been sent to the client.
108 * #GNUNET_YES or #GNUNET_NO
112 /* Followed by message */
117 * Cache for received message fragments.
118 * Message fragments are only sent to clients after all modifiers arrived.
120 * chan_key -> MultiHashMap chan_msgs
122 static struct GNUNET_CONTAINER_MultiHashMap *recv_cache;
126 * Entry in the chan_msgs hashmap of @a recv_cache:
127 * fragment_id -> RecvCacheEntry
129 struct RecvCacheEntry
131 struct GNUNET_MULTICAST_MessageHeader *mmsg;
137 * Entry in the @a recv_frags hash map of a @a Channel.
138 * message_id -> FragmentQueue
143 * Fragment IDs stored in @a recv_cache.
145 struct GNUNET_CONTAINER_Heap *fragments;
148 * Total size of received fragments.
153 * Total size of received header fragments (METHOD & MODIFIERs)
155 uint64_t header_size;
158 * The @a state_delta field from struct GNUNET_PSYC_MessageMethod.
160 uint64_t state_delta;
163 * The @a flags field from struct GNUNET_PSYC_MessageMethod.
168 * Receive state of message.
170 * @see MessageFragmentState
175 * Whether the state is already modified in PSYCstore.
177 uint8_t state_is_modified;
180 * Is the message queued for delivery to the client?
181 * i.e. added to the recv_msgs queue
188 * List of connected clients.
195 struct GNUNET_SERVER_Client *client;
201 struct Operation *prev;
202 struct Operation *next;
204 struct GNUNET_SERVER_Client *client;
212 * Common part of the client context for both a channel master and slave.
216 struct Client *clients_head;
217 struct Client *clients_tail;
219 struct Operation *op_head;
220 struct Operation *op_tail;
222 struct TransmitMessage *tmit_head;
223 struct TransmitMessage *tmit_tail;
226 * Current PSYCstore operation.
228 struct GNUNET_PSYCSTORE_OperationHandle *store_op;
231 * Received fragments not yet sent to the client.
232 * message_id -> FragmentQueue
234 struct GNUNET_CONTAINER_MultiHashMap *recv_frags;
237 * Received message IDs not yet sent to the client.
239 struct GNUNET_CONTAINER_Heap *recv_msgs;
242 * Public key of the channel.
244 struct GNUNET_CRYPTO_EddsaPublicKey pub_key;
247 * Hash of @a pub_key.
249 struct GNUNET_HashCode pub_key_hash;
252 * Last message ID sent to the client.
253 * 0 if there is no such message.
255 uint64_t max_message_id;
258 * ID of the last stateful message, where the state operations has been
259 * processed and saved to PSYCstore and which has been sent to the client.
260 * 0 if there is no such message.
262 uint64_t max_state_message_id;
265 * Expected value size for the modifier being received from the PSYC service.
267 uint32_t tmit_mod_value_size_expected;
270 * Actual value size for the modifier being received from the PSYC service.
272 uint32_t tmit_mod_value_size;
275 * @see enum MessageState
280 * Is this a channel master (#GNUNET_YES), or slave (#GNUNET_NO)?
285 * Is this channel ready to receive messages from client?
286 * #GNUNET_YES or #GNUNET_NO
291 * Is the client disconnected?
292 * #GNUNET_YES or #GNUNET_NO
294 uint8_t is_disconnected;
299 * Client context for a channel master.
304 * Channel struct common for Master and Slave
309 * Private key of the channel.
311 struct GNUNET_CRYPTO_EddsaPrivateKey priv_key;
314 * Handle for the multicast origin.
316 struct GNUNET_MULTICAST_Origin *origin;
319 * Transmit handle for multicast.
321 struct GNUNET_MULTICAST_OriginTransmitHandle *tmit_handle;
324 * Incoming join requests from multicast.
325 * member_key -> struct GNUNET_MULTICAST_JoinHandle *
327 struct GNUNET_CONTAINER_MultiHashMap *join_reqs;
330 * Last message ID transmitted to this channel.
332 * Incremented before sending a message, thus the message_id in messages sent
335 uint64_t max_message_id;
338 * ID of the last message with state operations transmitted to the channel.
339 * 0 if there is no such message.
341 uint64_t max_state_message_id;
344 * Maximum group generation transmitted to the channel.
346 uint64_t max_group_generation;
349 * @see enum GNUNET_PSYC_Policy
351 enum GNUNET_PSYC_Policy policy;
356 * Client context for a channel slave.
361 * Channel struct common for Master and Slave
366 * Private key of the slave.
368 struct GNUNET_CRYPTO_EcdsaPrivateKey priv_key;
371 * Public key of the slave.
373 struct GNUNET_CRYPTO_EcdsaPublicKey pub_key;
376 * Hash of @a pub_key.
378 struct GNUNET_HashCode pub_key_hash;
381 * Handle for the multicast member.
383 struct GNUNET_MULTICAST_Member *member;
386 * Transmit handle for multicast.
388 struct GNUNET_MULTICAST_MemberTransmitHandle *tmit_handle;
391 * Peer identity of the origin.
393 struct GNUNET_PeerIdentity origin;
396 * Number of items in @a relays.
398 uint32_t relay_count;
401 * Relays that multicast can use to connect.
403 struct GNUNET_PeerIdentity *relays;
406 * Join request to be transmitted to the master on join.
408 struct GNUNET_PSYC_Message *join_msg;
411 * Join decision received from multicast.
413 struct GNUNET_PSYC_JoinDecisionMessage *join_dcsn;
416 * Maximum request ID for this channel.
418 uint64_t max_request_id;
423 transmit_message (struct Channel *chn);
426 message_queue_run (struct Channel *chn);
429 message_queue_drop (struct Channel *chn);
433 * Task run during shutdown.
439 shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
443 GNUNET_SERVER_notification_context_destroy (nc);
448 GNUNET_STATISTICS_destroy (stats, GNUNET_YES);
454 static struct Operation *
455 op_add (struct Channel *chn, struct GNUNET_SERVER_Client *client,
456 uint64_t op_id, uint32_t flags)
458 struct Operation *op = GNUNET_malloc (sizeof (*op));
463 GNUNET_CONTAINER_DLL_insert (chn->op_head, chn->op_tail, op);
469 op_remove (struct Operation *op)
471 GNUNET_CONTAINER_DLL_remove (op->chn->op_head, op->chn->op_tail, op);
477 * Clean up master data structures after a client disconnected.
480 cleanup_master (struct Master *mst)
482 struct Channel *chn = &mst->chn;
484 if (NULL != mst->origin)
485 GNUNET_MULTICAST_origin_stop (mst->origin, NULL, NULL); // FIXME
486 GNUNET_CONTAINER_multihashmap_destroy (mst->join_reqs);
487 GNUNET_CONTAINER_multihashmap_remove (masters, &chn->pub_key_hash, chn);
492 * Clean up slave data structures after a client disconnected.
495 cleanup_slave (struct Slave *slv)
497 struct Channel *chn = &slv->chn;
498 struct GNUNET_CONTAINER_MultiHashMap *
499 chn_slv = GNUNET_CONTAINER_multihashmap_get (channel_slaves,
501 GNUNET_assert (NULL != chn_slv);
502 GNUNET_CONTAINER_multihashmap_remove (chn_slv, &slv->pub_key_hash, slv);
504 if (0 == GNUNET_CONTAINER_multihashmap_size (chn_slv))
506 GNUNET_CONTAINER_multihashmap_remove (channel_slaves, &chn->pub_key_hash,
508 GNUNET_CONTAINER_multihashmap_destroy (chn_slv);
510 GNUNET_CONTAINER_multihashmap_remove (slaves, &chn->pub_key_hash, slv);
512 if (NULL != slv->join_msg)
514 GNUNET_free (slv->join_msg);
515 slv->join_msg = NULL;
517 if (NULL != slv->relays)
519 GNUNET_free (slv->relays);
522 if (NULL != slv->member)
524 GNUNET_MULTICAST_member_part (slv->member, NULL, NULL); // FIXME
527 GNUNET_CONTAINER_multihashmap_remove (slaves, &chn->pub_key_hash, chn);
532 * Clean up channel data structures after a client disconnected.
535 cleanup_channel (struct Channel *chn)
537 message_queue_drop (chn);
538 GNUNET_CONTAINER_multihashmap_remove_all (recv_cache, &chn->pub_key_hash);
540 if (NULL != chn->store_op)
542 GNUNET_PSYCSTORE_operation_cancel (chn->store_op);
543 chn->store_op = NULL;
546 (GNUNET_YES == chn->is_master)
547 ? cleanup_master ((struct Master *) chn)
548 : cleanup_slave ((struct Slave *) chn);
554 * Called whenever a client is disconnected.
555 * Frees our resources associated with that client.
557 * @param cls Closure.
558 * @param client Identification of the client.
561 client_disconnect (void *cls, struct GNUNET_SERVER_Client *client)
567 chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
571 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
572 "%p User context is NULL in client_disconnect()\n", chn);
577 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
578 "%p Client (%s) disconnected from channel %s\n",
579 chn, (GNUNET_YES == chn->is_master) ? "master" : "slave",
580 GNUNET_h2s (&chn->pub_key_hash));
582 struct Client *cli = chn->clients_head;
585 if (cli->client == client)
587 GNUNET_CONTAINER_DLL_remove (chn->clients_head, chn->clients_tail, cli);
594 struct Operation *op = chn->op_head;
597 if (op->client == client)
605 if (NULL == chn->clients_head)
606 { /* Last client disconnected. */
607 if (NULL != chn->tmit_head)
608 { /* Send pending messages to multicast before cleanup. */
609 transmit_message (chn);
613 cleanup_channel (chn);
620 * Send message to all clients connected to the channel.
623 client_send_msg (const struct Channel *chn,
624 const struct GNUNET_MessageHeader *msg)
626 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
627 "%p Sending message to clients.\n", chn);
629 struct Client *cli = chn->clients_head;
632 GNUNET_SERVER_notification_context_add (nc, cli->client);
633 GNUNET_SERVER_notification_context_unicast (nc, cli->client, msg, GNUNET_NO);
640 * Send a result code back to the client.
643 * Client that should receive the result code.
647 * Operation ID in network byte order.
649 * Data payload or NULL.
654 client_send_result (struct GNUNET_SERVER_Client *client, uint64_t op_id,
655 int64_t result_code, const void *data, uint16_t data_size)
657 struct GNUNET_OperationResultMessage *res;
659 res = GNUNET_malloc (sizeof (*res) + data_size);
660 res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_RESULT_CODE);
661 res->header.size = htons (sizeof (*res) + data_size);
662 res->result_code = GNUNET_htonll (result_code);
665 memcpy (&res[1], data, data_size);
667 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
668 "%p Sending result to client for operation #%" PRIu64 ": "
669 "%" PRId64 " (size: %u)\n",
670 client, GNUNET_ntohll (op_id), result_code, data_size);
672 GNUNET_SERVER_notification_context_add (nc, client);
673 GNUNET_SERVER_notification_context_unicast (nc, client, &res->header,
680 * Closure for join_mem_test_cb()
682 struct JoinMemTestClosure
684 struct GNUNET_CRYPTO_EcdsaPublicKey slave_key;
686 struct GNUNET_MULTICAST_JoinHandle *jh;
687 struct GNUNET_PSYC_JoinRequestMessage *join_msg;
692 * Membership test result callback used for join requests.
695 join_mem_test_cb (void *cls, int64_t result,
696 const char *err_msg, uint16_t err_msg_size)
698 struct JoinMemTestClosure *jcls = cls;
700 if (GNUNET_NO == result && GNUNET_YES == jcls->chn->is_master)
701 { /* Pass on join request to client if this is a master channel */
702 struct Master *mst = (struct Master *) jcls->chn;
703 struct GNUNET_HashCode slave_key_hash;
704 GNUNET_CRYPTO_hash (&jcls->slave_key, sizeof (jcls->slave_key),
706 GNUNET_CONTAINER_multihashmap_put (mst->join_reqs, &slave_key_hash, jcls->jh,
707 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
708 client_send_msg (jcls->chn, &jcls->join_msg->header);
712 if (GNUNET_SYSERR == result)
714 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
715 "Could not perform membership test (%.*s)\n",
716 err_msg_size, err_msg);
719 GNUNET_MULTICAST_join_decision (jcls->jh, result, 0, NULL, NULL);
721 GNUNET_free (jcls->join_msg);
727 * Incoming join request from multicast.
730 mcast_recv_join_request (void *cls,
731 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
732 const struct GNUNET_MessageHeader *join_msg,
733 struct GNUNET_MULTICAST_JoinHandle *jh)
735 struct Channel *chn = cls;
736 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p Got join request.\n", chn);
738 uint16_t join_msg_size = 0;
739 if (NULL != join_msg)
741 if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE == ntohs (join_msg->type))
743 join_msg_size = ntohs (join_msg->size);
747 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
748 "%p Got join message with invalid type %u.\n",
749 chn, ntohs (join_msg->type));
753 struct GNUNET_PSYC_JoinRequestMessage *
754 req = GNUNET_malloc (sizeof (*req) + join_msg_size);
755 req->header.size = htons (sizeof (*req) + join_msg_size);
756 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_JOIN_REQUEST);
757 req->slave_key = *slave_key;
758 if (0 < join_msg_size)
759 memcpy (&req[1], join_msg, join_msg_size);
761 struct JoinMemTestClosure *jcls = GNUNET_malloc (sizeof (*jcls));
762 jcls->slave_key = *slave_key;
765 jcls->join_msg = req;
767 GNUNET_PSYCSTORE_membership_test (store, &chn->pub_key, slave_key,
768 chn->max_message_id, 0,
769 &join_mem_test_cb, jcls);
774 * Join decision received from multicast.
777 mcast_recv_join_decision (void *cls, int is_admitted,
778 const struct GNUNET_PeerIdentity *peer,
779 uint16_t relay_count,
780 const struct GNUNET_PeerIdentity *relays,
781 const struct GNUNET_MessageHeader *join_resp)
783 struct Slave *slv = cls;
784 struct Channel *chn = &slv->chn;
785 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
786 "%p Got join decision: %d\n", slv, is_admitted);
788 uint16_t join_resp_size = (NULL != join_resp) ? ntohs (join_resp->size) : 0;
789 struct GNUNET_PSYC_JoinDecisionMessage *
790 dcsn = slv->join_dcsn = GNUNET_malloc (sizeof (*dcsn) + join_resp_size);
791 dcsn->header.size = htons (sizeof (*dcsn) + join_resp_size);
792 dcsn->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION);
793 dcsn->is_admitted = htonl (is_admitted);
794 if (0 < join_resp_size)
795 memcpy (&dcsn[1], join_resp, join_resp_size);
797 client_send_msg (chn, &dcsn->header);
799 if (GNUNET_YES == is_admitted)
801 chn->is_ready = GNUNET_YES;
811 * Received result of GNUNET_PSYCSTORE_membership_test()
814 store_recv_membership_test_result (void *cls, int64_t result,
815 const char *err_msg, uint16_t err_msg_size)
817 struct GNUNET_MULTICAST_MembershipTestHandle *mth = cls;
818 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
819 "%p GNUNET_PSYCSTORE_membership_test() returned %" PRId64 " (%.*s)\n",
820 mth, result, err_msg_size, err_msg);
822 GNUNET_MULTICAST_membership_test_result (mth, result);
827 * Incoming membership test request from multicast.
830 mcast_recv_membership_test (void *cls,
831 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
832 uint64_t message_id, uint64_t group_generation,
833 struct GNUNET_MULTICAST_MembershipTestHandle *mth)
835 struct Channel *chn = cls;
836 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
837 "%p Received membership test request from multicast.\n",
839 GNUNET_PSYCSTORE_membership_test (store, &chn->pub_key, slave_key,
840 message_id, group_generation,
841 &store_recv_membership_test_result, mth);
846 store_recv_fragment_replay (void *cls,
847 struct GNUNET_MULTICAST_MessageHeader *msg,
848 enum GNUNET_PSYCSTORE_MessageFlags flags)
850 struct GNUNET_MULTICAST_ReplayHandle *rh = cls;
852 GNUNET_MULTICAST_replay_response (rh, &msg->header, GNUNET_MULTICAST_REC_OK);
858 * Received result of GNUNET_PSYCSTORE_fragment_get() for multicast replay.
861 store_recv_fragment_replay_result (void *cls, int64_t result,
862 const char *err_msg, uint16_t err_msg_size)
864 struct GNUNET_MULTICAST_ReplayHandle *rh = cls;
865 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
866 "%p Fragment replay: PSYCSTORE returned %" PRId64 " (%.*s)\n",
867 rh, result, err_msg_size, err_msg);
875 GNUNET_MULTICAST_replay_response (rh, NULL,
876 GNUNET_MULTICAST_REC_NOT_FOUND);
879 case GNUNET_PSYCSTORE_MEMBERSHIP_TEST_FAILED:
880 GNUNET_MULTICAST_replay_response (rh, NULL,
881 GNUNET_MULTICAST_REC_ACCESS_DENIED);
885 GNUNET_MULTICAST_replay_response (rh, NULL,
886 GNUNET_MULTICAST_REC_INTERNAL_ERROR);
889 GNUNET_MULTICAST_replay_response_end (rh);
894 * Incoming fragment replay request from multicast.
897 mcast_recv_replay_fragment (void *cls,
898 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
899 uint64_t fragment_id, uint64_t flags,
900 struct GNUNET_MULTICAST_ReplayHandle *rh)
903 struct Channel *chn = cls;
904 GNUNET_PSYCSTORE_fragment_get (store, &chn->pub_key, slave_key,
905 fragment_id, fragment_id,
906 &store_recv_fragment_replay,
907 &store_recv_fragment_replay_result, rh);
912 * Incoming message replay request from multicast.
915 mcast_recv_replay_message (void *cls,
916 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
918 uint64_t fragment_offset,
920 struct GNUNET_MULTICAST_ReplayHandle *rh)
922 struct Channel *chn = cls;
923 GNUNET_PSYCSTORE_message_get (store, &chn->pub_key, slave_key,
924 message_id, message_id, NULL,
925 &store_recv_fragment_replay,
926 &store_recv_fragment_replay_result, rh);
931 * Convert an uint64_t in network byte order to a HashCode
932 * that can be used as key in a MultiHashMap
935 hash_key_from_nll (struct GNUNET_HashCode *key, uint64_t n)
937 /* use little-endian order, as idx_of MultiHashMap casts key to unsigned int */
938 /* TODO: use built-in byte swap functions if available */
940 n = ((n << 8) & 0xFF00FF00FF00FF00ULL) | ((n >> 8) & 0x00FF00FF00FF00FFULL);
941 n = ((n << 16) & 0xFFFF0000FFFF0000ULL) | ((n >> 16) & 0x0000FFFF0000FFFFULL);
943 *key = (struct GNUNET_HashCode) {};
945 = (n << 32) | (n >> 32);
950 * Convert an uint64_t in host byte order to a HashCode
951 * that can be used as key in a MultiHashMap
954 hash_key_from_hll (struct GNUNET_HashCode *key, uint64_t n)
956 #if __BYTE_ORDER == __BIG_ENDIAN
957 hash_key_from_nll (key, n);
958 #elif __BYTE_ORDER == __LITTLE_ENDIAN
959 *key = (struct GNUNET_HashCode) {};
960 *((uint64_t *) key) = n;
962 #error byteorder undefined
968 * Initialize PSYC message header.
971 psyc_msg_init (struct GNUNET_PSYC_MessageHeader *pmsg,
972 const struct GNUNET_MULTICAST_MessageHeader *mmsg, uint32_t flags)
974 uint16_t size = ntohs (mmsg->header.size);
975 uint16_t psize = sizeof (*pmsg) + size - sizeof (*mmsg);
977 pmsg->header.size = htons (psize);
978 pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
979 pmsg->message_id = mmsg->message_id;
980 pmsg->fragment_offset = mmsg->fragment_offset;
981 pmsg->flags = htonl (flags);
983 memcpy (&pmsg[1], &mmsg[1], size - sizeof (*mmsg));
988 * Create a new PSYC message from a multicast message for sending it to clients.
990 static inline struct GNUNET_PSYC_MessageHeader *
991 psyc_msg_new (const struct GNUNET_MULTICAST_MessageHeader *mmsg, uint32_t flags)
993 struct GNUNET_PSYC_MessageHeader *pmsg;
994 uint16_t size = ntohs (mmsg->header.size);
995 uint16_t psize = sizeof (*pmsg) + size - sizeof (*mmsg);
997 pmsg = GNUNET_malloc (psize);
998 psyc_msg_init (pmsg, mmsg, flags);
1004 * Send multicast message to all clients connected to the channel.
1007 client_send_mcast_msg (struct Channel *chn,
1008 const struct GNUNET_MULTICAST_MessageHeader *mmsg,
1011 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1012 "%p Sending multicast message to client. "
1013 "fragment_id: %" PRIu64 ", message_id: %" PRIu64 "\n",
1014 chn, GNUNET_ntohll (mmsg->fragment_id),
1015 GNUNET_ntohll (mmsg->message_id));
1017 struct GNUNET_PSYC_MessageHeader *
1018 pmsg = GNUNET_PSYC_message_header_create (mmsg, flags);
1019 client_send_msg (chn, &pmsg->header);
1025 * Send multicast request to all clients connected to the channel.
1028 client_send_mcast_req (struct Master *mst,
1029 const struct GNUNET_MULTICAST_RequestHeader *req)
1031 struct Channel *chn = &mst->chn;
1033 struct GNUNET_PSYC_MessageHeader *pmsg;
1034 uint16_t size = ntohs (req->header.size);
1035 uint16_t psize = sizeof (*pmsg) + size - sizeof (*req);
1037 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1038 "%p Sending multicast request to client. "
1039 "fragment_id: %" PRIu64 ", message_id: %" PRIu64 "\n",
1040 chn, GNUNET_ntohll (req->fragment_id),
1041 GNUNET_ntohll (req->request_id));
1043 pmsg = GNUNET_malloc (psize);
1044 pmsg->header.size = htons (psize);
1045 pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
1046 pmsg->message_id = req->request_id;
1047 pmsg->fragment_offset = req->fragment_offset;
1048 pmsg->flags = htonl (GNUNET_PSYC_MESSAGE_REQUEST);
1050 memcpy (&pmsg[1], &req[1], size - sizeof (*req));
1051 client_send_msg (chn, &pmsg->header);
1057 * Insert a multicast message fragment into the queue belonging to the message.
1059 * @param chn Channel.
1060 * @param mmsg Multicast message fragment.
1061 * @param msg_id_hash Message ID of @a mmsg in a struct GNUNET_HashCode.
1062 * @param first_ptype First PSYC message part type in @a mmsg.
1063 * @param last_ptype Last PSYC message part type in @a mmsg.
1066 fragment_queue_insert (struct Channel *chn,
1067 const struct GNUNET_MULTICAST_MessageHeader *mmsg,
1068 uint16_t first_ptype, uint16_t last_ptype)
1070 const uint16_t size = ntohs (mmsg->header.size);
1071 const uint64_t frag_offset = GNUNET_ntohll (mmsg->fragment_offset);
1072 struct GNUNET_CONTAINER_MultiHashMap
1073 *chan_msgs = GNUNET_CONTAINER_multihashmap_get (recv_cache,
1074 &chn->pub_key_hash);
1076 struct GNUNET_HashCode msg_id_hash;
1077 hash_key_from_nll (&msg_id_hash, mmsg->message_id);
1079 struct FragmentQueue
1080 *fragq = GNUNET_CONTAINER_multihashmap_get (chn->recv_frags, &msg_id_hash);
1084 fragq = GNUNET_new (struct FragmentQueue);
1085 fragq->state = MSG_FRAG_STATE_HEADER;
1087 = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
1089 GNUNET_CONTAINER_multihashmap_put (chn->recv_frags, &msg_id_hash, fragq,
1090 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1092 if (NULL == chan_msgs)
1094 chan_msgs = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
1095 GNUNET_CONTAINER_multihashmap_put (recv_cache, &chn->pub_key_hash, chan_msgs,
1096 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1100 struct GNUNET_HashCode frag_id_hash;
1101 hash_key_from_nll (&frag_id_hash, mmsg->fragment_id);
1102 struct RecvCacheEntry
1103 *cache_entry = GNUNET_CONTAINER_multihashmap_get (chan_msgs, &frag_id_hash);
1104 if (NULL == cache_entry)
1106 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1107 "%p Adding message fragment to cache. "
1108 "message_id: %" PRIu64 ", fragment_id: %" PRIu64 "\n",
1109 chn, GNUNET_ntohll (mmsg->message_id),
1110 GNUNET_ntohll (mmsg->fragment_id));
1111 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1112 "%p header_size: %" PRIu64 " + %u\n",
1113 chn, fragq->header_size, size);
1114 cache_entry = GNUNET_new (struct RecvCacheEntry);
1115 cache_entry->ref_count = 1;
1116 cache_entry->mmsg = GNUNET_malloc (size);
1117 memcpy (cache_entry->mmsg, mmsg, size);
1118 GNUNET_CONTAINER_multihashmap_put (chan_msgs, &frag_id_hash, cache_entry,
1119 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1123 cache_entry->ref_count++;
1124 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1125 "%p Message fragment is already in cache. "
1126 "message_id: %" PRIu64 ", fragment_id: %" PRIu64
1127 ", ref_count: %u\n",
1128 chn, GNUNET_ntohll (mmsg->message_id),
1129 GNUNET_ntohll (mmsg->fragment_id), cache_entry->ref_count);
1132 if (MSG_FRAG_STATE_HEADER == fragq->state)
1134 if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype)
1136 struct GNUNET_PSYC_MessageMethod *
1137 pmeth = (struct GNUNET_PSYC_MessageMethod *) &mmsg[1];
1138 fragq->state_delta = GNUNET_ntohll (pmeth->state_delta);
1139 fragq->flags = ntohl (pmeth->flags);
1142 if (last_ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA)
1144 fragq->header_size += size;
1146 else if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype
1147 || frag_offset == fragq->header_size)
1148 { /* header is now complete */
1149 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1150 "%p Header of message %" PRIu64 " is complete.\n",
1151 chn, GNUNET_ntohll (mmsg->message_id));
1153 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1154 "%p Adding message %" PRIu64 " to queue.\n",
1155 chn, GNUNET_ntohll (mmsg->message_id));
1156 fragq->state = MSG_FRAG_STATE_DATA;
1160 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1161 "%p Header of message %" PRIu64 " is NOT complete yet: "
1162 "%" PRIu64 " != %" PRIu64 "\n",
1163 chn, GNUNET_ntohll (mmsg->message_id), frag_offset,
1164 fragq->header_size);
1170 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END:
1171 if (frag_offset == fragq->size)
1172 fragq->state = MSG_FRAG_STATE_END;
1174 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1175 "%p Message %" PRIu64 " is NOT complete yet: "
1176 "%" PRIu64 " != %" PRIu64 "\n",
1177 chn, GNUNET_ntohll (mmsg->message_id), frag_offset,
1181 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL:
1182 /* Drop message without delivering to client if it's a single fragment */
1184 (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype)
1185 ? MSG_FRAG_STATE_DROP
1186 : MSG_FRAG_STATE_CANCEL;
1189 switch (fragq->state)
1191 case MSG_FRAG_STATE_DATA:
1192 case MSG_FRAG_STATE_END:
1193 case MSG_FRAG_STATE_CANCEL:
1194 if (GNUNET_NO == fragq->is_queued)
1196 GNUNET_CONTAINER_heap_insert (chn->recv_msgs, NULL,
1197 GNUNET_ntohll (mmsg->message_id));
1198 fragq->is_queued = GNUNET_YES;
1202 fragq->size += size;
1203 GNUNET_CONTAINER_heap_insert (fragq->fragments, NULL,
1204 GNUNET_ntohll (mmsg->fragment_id));
1209 * Run fragment queue of a message.
1211 * Send fragments of a message in order to client, after all modifiers arrived
1214 * @param chn Channel.
1215 * @param msg_id ID of the message @a fragq belongs to.
1216 * @param fragq Fragment queue of the message.
1217 * @param drop Drop message without delivering to client?
1218 * #GNUNET_YES or #GNUNET_NO.
1221 fragment_queue_run (struct Channel *chn, uint64_t msg_id,
1222 struct FragmentQueue *fragq, uint8_t drop)
1224 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1225 "%p Running message fragment queue for message %" PRIu64
1227 chn, msg_id, fragq->state);
1229 struct GNUNET_CONTAINER_MultiHashMap
1230 *chan_msgs = GNUNET_CONTAINER_multihashmap_get (recv_cache,
1231 &chn->pub_key_hash);
1232 GNUNET_assert (NULL != chan_msgs); // FIXME
1235 while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (fragq->fragments, NULL,
1238 struct GNUNET_HashCode frag_id_hash;
1239 hash_key_from_hll (&frag_id_hash, frag_id);
1240 struct RecvCacheEntry *cache_entry
1241 = GNUNET_CONTAINER_multihashmap_get (chan_msgs, &frag_id_hash);
1242 if (cache_entry != NULL)
1244 if (GNUNET_NO == drop)
1246 client_send_mcast_msg (chn, cache_entry->mmsg, 0);
1248 if (cache_entry->ref_count <= 1)
1250 GNUNET_CONTAINER_multihashmap_remove (chan_msgs, &frag_id_hash,
1252 GNUNET_free (cache_entry->mmsg);
1253 GNUNET_free (cache_entry);
1257 cache_entry->ref_count--;
1260 #if CACHE_AGING_IMPLEMENTED
1261 else if (GNUNET_NO == drop)
1263 /* TODO: fragment not in cache anymore, retrieve it from PSYCstore */
1267 GNUNET_CONTAINER_heap_remove_root (fragq->fragments);
1270 if (MSG_FRAG_STATE_END <= fragq->state)
1272 struct GNUNET_HashCode msg_id_hash;
1273 hash_key_from_hll (&msg_id_hash, msg_id);
1275 GNUNET_CONTAINER_multihashmap_remove (chn->recv_frags, &msg_id_hash, fragq);
1276 GNUNET_CONTAINER_heap_destroy (fragq->fragments);
1277 GNUNET_free (fragq);
1281 fragq->is_queued = GNUNET_NO;
1286 struct StateModifyClosure
1288 struct Channel *chn;
1290 struct GNUNET_HashCode msg_id_hash;
1295 store_recv_state_modify_result (void *cls, int64_t result,
1296 const char *err_msg, uint16_t err_msg_size)
1298 struct StateModifyClosure *mcls = cls;
1299 struct Channel *chn = mcls->chn;
1300 uint64_t msg_id = mcls->msg_id;
1302 struct FragmentQueue *
1303 fragq = GNUNET_CONTAINER_multihashmap_get (chn->recv_frags, &mcls->msg_id_hash);
1305 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1306 "%p GNUNET_PSYCSTORE_state_modify() returned %" PRId64 " (%.*s)\n",
1307 chn, result, err_msg_size, err_msg);
1314 fragq->state_is_modified = GNUNET_YES;
1315 if (chn->max_state_message_id < msg_id)
1316 chn->max_state_message_id = msg_id;
1317 if (chn->max_message_id < msg_id)
1318 chn->max_message_id = msg_id;
1321 fragment_queue_run (chn, msg_id, fragq, MSG_FRAG_STATE_DROP == fragq->state);
1322 GNUNET_CONTAINER_heap_remove_root (chn->recv_msgs);
1323 message_queue_run (chn);
1327 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1328 "%p GNUNET_PSYCSTORE_state_modify() failed with error %" PRId64 " (%.*s)\n",
1329 chn, result, err_msg_size, err_msg);
1330 /** @todo FIXME: handle state_modify error */
1336 * Run message queue.
1338 * Send messages in queue to client in order after a message has arrived from
1339 * multicast, according to the following:
1340 * - A message is only sent if all of its modifiers arrived.
1341 * - A stateful message is only sent if the previous stateful message
1342 * has already been delivered to the client.
1344 * @param chn Channel.
1346 * @return Number of messages removed from queue and sent to client.
1349 message_queue_run (struct Channel *chn)
1351 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1352 "%p Running message queue.\n", chn);
1356 while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (chn->recv_msgs, NULL,
1359 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1360 "%p Processing message %" PRIu64 " in queue.\n", chn, msg_id);
1361 struct GNUNET_HashCode msg_id_hash;
1362 hash_key_from_hll (&msg_id_hash, msg_id);
1364 struct FragmentQueue *
1365 fragq = GNUNET_CONTAINER_multihashmap_get (chn->recv_frags, &msg_id_hash);
1367 if (NULL == fragq || fragq->state <= MSG_FRAG_STATE_HEADER)
1369 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1370 "%p No fragq (%p) or header not complete.\n",
1375 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1376 "%p Fragment queue entry: state: %u, state delta: "
1377 "%" PRIu64 " - %" PRIu64 " ?= %" PRIu64 "\n",
1378 chn, fragq->state, msg_id, fragq->state_delta, chn->max_state_message_id);
1380 if (MSG_FRAG_STATE_DATA <= fragq->state)
1382 /* Check if there's a missing message before the current one */
1383 if (GNUNET_PSYC_STATE_NOT_MODIFIED == fragq->state_delta)
1385 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p state NOT modified\n");
1387 if (!(fragq->flags & GNUNET_PSYC_MESSAGE_ORDER_ANY)
1388 && (chn->max_message_id != msg_id - 1
1389 && chn->max_message_id != msg_id))
1391 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1392 "%p Out of order message. "
1393 "(%" PRIu64 " != %" PRIu64 " - 1)\n",
1394 chn, chn->max_message_id, msg_id);
1396 // FIXME: keep track of messages processed in this queue run,
1397 // and only stop after reaching the end
1402 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p state modified\n");
1403 if (GNUNET_YES != fragq->state_is_modified)
1405 if (msg_id - fragq->state_delta != chn->max_state_message_id)
1407 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1408 "%p Out of order stateful message. "
1409 "(%" PRIu64 " - %" PRIu64 " != %" PRIu64 ")\n",
1410 chn, msg_id, fragq->state_delta, chn->max_state_message_id);
1412 // FIXME: keep track of messages processed in this queue run,
1413 // and only stop after reaching the end
1416 struct StateModifyClosure *mcls = GNUNET_malloc (sizeof (*mcls));
1418 mcls->msg_id = msg_id;
1419 mcls->msg_id_hash = msg_id_hash;
1421 /* Apply modifiers to state in PSYCstore */
1422 GNUNET_PSYCSTORE_state_modify (store, &chn->pub_key, msg_id,
1424 store_recv_state_modify_result, mcls);
1425 break; // continue after asynchronous state modify result
1428 chn->max_message_id = msg_id;
1430 fragment_queue_run (chn, msg_id, fragq, MSG_FRAG_STATE_DROP == fragq->state);
1431 GNUNET_CONTAINER_heap_remove_root (chn->recv_msgs);
1435 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1436 "%p Removed %" PRIu64 " messages from queue.\n", chn, n);
1442 * Drop message queue of a channel.
1444 * Remove all messages in queue without sending it to clients.
1446 * @param chn Channel.
1448 * @return Number of messages removed from queue.
1451 message_queue_drop (struct Channel *chn)
1453 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1454 "%p Dropping message queue.\n", chn);
1457 while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (chn->recv_msgs, NULL,
1460 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1461 "%p Dropping message %" PRIu64 " from queue.\n", chn, msg_id);
1462 struct GNUNET_HashCode msg_id_hash;
1463 hash_key_from_hll (&msg_id_hash, msg_id);
1465 struct FragmentQueue *
1466 fragq = GNUNET_CONTAINER_multihashmap_get (chn->recv_frags, &msg_id_hash);
1467 GNUNET_assert (NULL != fragq);
1468 fragment_queue_run (chn, msg_id, fragq, GNUNET_YES);
1469 GNUNET_CONTAINER_heap_remove_root (chn->recv_msgs);
1472 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1473 "%p Removed %" PRIu64 " messages from queue.\n", chn, n);
1479 * Received result of GNUNET_PSYCSTORE_fragment_store().
1482 store_recv_fragment_store_result (void *cls, int64_t result,
1483 const char *err_msg, uint16_t err_msg_size)
1485 struct Channel *chn = cls;
1486 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1487 "%p GNUNET_PSYCSTORE_fragment_store() returned %" PRId64 " (%.*s)\n",
1488 chn, result, err_msg_size, err_msg);
1493 * Handle incoming message fragment from multicast.
1495 * Store it using PSYCstore and send it to the clients of the channel in order.
1498 mcast_recv_message (void *cls, const struct GNUNET_MULTICAST_MessageHeader *mmsg)
1500 struct Channel *chn = cls;
1501 uint16_t size = ntohs (mmsg->header.size);
1503 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1504 "%p Received multicast message of size %u.\n",
1507 GNUNET_PSYCSTORE_fragment_store (store, &chn->pub_key, mmsg, 0,
1508 &store_recv_fragment_store_result, chn);
1510 uint16_t first_ptype = 0, last_ptype = 0;
1512 == GNUNET_PSYC_receive_check_parts (size - sizeof (*mmsg),
1513 (const char *) &mmsg[1],
1514 &first_ptype, &last_ptype))
1516 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1517 "%p Dropping incoming multicast message with invalid parts.\n",
1519 GNUNET_break_op (0);
1523 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1524 "Message parts: first: type %u, last: type %u\n",
1525 first_ptype, last_ptype);
1527 fragment_queue_insert (chn, mmsg, first_ptype, last_ptype);
1528 message_queue_run (chn);
1533 * Incoming request fragment from multicast for a master.
1535 * @param cls Master.
1536 * @param req The request.
1539 mcast_recv_request (void *cls,
1540 const struct GNUNET_MULTICAST_RequestHeader *req)
1542 struct Master *mst = cls;
1543 uint16_t size = ntohs (req->header.size);
1545 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1546 "%p Received multicast request of size %u.\n",
1549 uint16_t first_ptype = 0, last_ptype = 0;
1551 == GNUNET_PSYC_receive_check_parts (size - sizeof (*req),
1552 (const char *) &req[1],
1553 &first_ptype, &last_ptype))
1555 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1556 "%p Dropping incoming multicast request with invalid parts.\n",
1558 GNUNET_break_op (0);
1562 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1563 "Message parts: first: type %u, last: type %u\n",
1564 first_ptype, last_ptype);
1566 /* FIXME: in-order delivery */
1567 client_send_mcast_req (mst, req);
1572 * Response from PSYCstore with the current counter values for a channel master.
1575 store_recv_master_counters (void *cls, int result, uint64_t max_fragment_id,
1576 uint64_t max_message_id, uint64_t max_group_generation,
1577 uint64_t max_state_message_id)
1579 struct Master *mst = cls;
1580 struct Channel *chn = &mst->chn;
1581 chn->store_op = NULL;
1583 struct GNUNET_PSYC_CountersResultMessage res;
1584 res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK);
1585 res.header.size = htons (sizeof (res));
1586 res.result_code = htonl (result);
1587 res.max_message_id = GNUNET_htonll (max_message_id);
1589 if (GNUNET_OK == result || GNUNET_NO == result)
1591 mst->max_message_id = max_message_id;
1592 chn->max_message_id = max_message_id;
1593 chn->max_state_message_id = max_state_message_id;
1594 mst->max_group_generation = max_group_generation;
1596 = GNUNET_MULTICAST_origin_start (cfg, &mst->priv_key, max_fragment_id,
1597 &mcast_recv_join_request,
1598 &mcast_recv_membership_test,
1599 &mcast_recv_replay_fragment,
1600 &mcast_recv_replay_message,
1601 &mcast_recv_request,
1602 &mcast_recv_message, chn);
1603 chn->is_ready = GNUNET_YES;
1607 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1608 "%p GNUNET_PSYCSTORE_counters_get() "
1609 "returned %d for channel %s.\n",
1610 chn, result, GNUNET_h2s (&chn->pub_key_hash));
1613 client_send_msg (chn, &res.header);
1618 * Response from PSYCstore with the current counter values for a channel slave.
1621 store_recv_slave_counters (void *cls, int result, uint64_t max_fragment_id,
1622 uint64_t max_message_id, uint64_t max_group_generation,
1623 uint64_t max_state_message_id)
1625 struct Slave *slv = cls;
1626 struct Channel *chn = &slv->chn;
1627 chn->store_op = NULL;
1629 struct GNUNET_PSYC_CountersResultMessage res;
1630 res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK);
1631 res.header.size = htons (sizeof (res));
1632 res.result_code = htonl (result);
1633 res.max_message_id = GNUNET_htonll (max_message_id);
1635 if (GNUNET_OK == result || GNUNET_NO == result)
1637 chn->max_message_id = max_message_id;
1638 chn->max_state_message_id = max_state_message_id;
1640 = GNUNET_MULTICAST_member_join (cfg, &chn->pub_key, &slv->priv_key,
1642 slv->relay_count, slv->relays,
1643 &slv->join_msg->header,
1644 &mcast_recv_join_request,
1645 &mcast_recv_join_decision,
1646 &mcast_recv_membership_test,
1647 &mcast_recv_replay_fragment,
1648 &mcast_recv_replay_message,
1649 &mcast_recv_message, chn);
1650 if (NULL != slv->join_msg)
1652 GNUNET_free (slv->join_msg);
1653 slv->join_msg = NULL;
1658 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1659 "%p GNUNET_PSYCSTORE_counters_get() "
1660 "returned %d for channel %s.\n",
1661 chn, result, GNUNET_h2s (&chn->pub_key_hash));
1664 client_send_msg (chn, &res.header);
1669 channel_init (struct Channel *chn)
1672 = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
1673 chn->recv_frags = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
1678 * Handle a connecting client starting a channel master.
1681 client_recv_master_start (void *cls, struct GNUNET_SERVER_Client *client,
1682 const struct GNUNET_MessageHeader *msg)
1684 const struct MasterStartRequest *req
1685 = (const struct MasterStartRequest *) msg;
1687 struct GNUNET_CRYPTO_EddsaPublicKey pub_key;
1688 struct GNUNET_HashCode pub_key_hash;
1690 GNUNET_CRYPTO_eddsa_key_get_public (&req->channel_key, &pub_key);
1691 GNUNET_CRYPTO_hash (&pub_key, sizeof (pub_key), &pub_key_hash);
1694 mst = GNUNET_CONTAINER_multihashmap_get (masters, &pub_key_hash);
1695 struct Channel *chn;
1699 mst = GNUNET_new (struct Master);
1700 mst->policy = ntohl (req->policy);
1701 mst->priv_key = req->channel_key;
1702 mst->join_reqs = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
1705 chn->is_master = GNUNET_YES;
1706 chn->pub_key = pub_key;
1707 chn->pub_key_hash = pub_key_hash;
1710 GNUNET_CONTAINER_multihashmap_put (masters, &chn->pub_key_hash, chn,
1711 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1712 chn->store_op = GNUNET_PSYCSTORE_counters_get (store, &chn->pub_key,
1713 store_recv_master_counters, mst);
1719 struct GNUNET_PSYC_CountersResultMessage res;
1720 res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK);
1721 res.header.size = htons (sizeof (res));
1722 res.result_code = htonl (GNUNET_OK);
1723 res.max_message_id = GNUNET_htonll (mst->max_message_id);
1725 GNUNET_SERVER_notification_context_add (nc, client);
1726 GNUNET_SERVER_notification_context_unicast (nc, client, &res.header,
1730 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1731 "%p Client connected as master to channel %s.\n",
1732 mst, GNUNET_h2s (&chn->pub_key_hash));
1734 struct Client *cli = GNUNET_new (struct Client);
1735 cli->client = client;
1736 GNUNET_CONTAINER_DLL_insert (chn->clients_head, chn->clients_tail, cli);
1738 GNUNET_SERVER_client_set_user_context (client, chn);
1739 GNUNET_SERVER_receive_done (client, GNUNET_OK);
1744 * Handle a connecting client joining as a channel slave.
1747 client_recv_slave_join (void *cls, struct GNUNET_SERVER_Client *client,
1748 const struct GNUNET_MessageHeader *msg)
1750 const struct SlaveJoinRequest *req
1751 = (const struct SlaveJoinRequest *) msg;
1752 uint16_t req_size = ntohs (req->header.size);
1754 struct GNUNET_CRYPTO_EcdsaPublicKey slv_pub_key;
1755 struct GNUNET_HashCode pub_key_hash, slv_pub_key_hash;
1757 GNUNET_CRYPTO_ecdsa_key_get_public (&req->slave_key, &slv_pub_key);
1758 GNUNET_CRYPTO_hash (&slv_pub_key, sizeof (slv_pub_key), &slv_pub_key_hash);
1759 GNUNET_CRYPTO_hash (&req->channel_key, sizeof (req->channel_key), &pub_key_hash);
1761 struct GNUNET_CONTAINER_MultiHashMap *
1762 chn_slv = GNUNET_CONTAINER_multihashmap_get (channel_slaves, &pub_key_hash);
1763 struct Slave *slv = NULL;
1764 struct Channel *chn;
1766 if (NULL != chn_slv)
1768 slv = GNUNET_CONTAINER_multihashmap_get (chn_slv, &slv_pub_key_hash);
1772 slv = GNUNET_new (struct Slave);
1773 slv->priv_key = req->slave_key;
1774 slv->pub_key = slv_pub_key;
1775 slv->pub_key_hash = slv_pub_key_hash;
1776 slv->origin = req->origin;
1777 slv->relay_count = ntohl (req->relay_count);
1779 const struct GNUNET_PeerIdentity *
1780 relays = (const struct GNUNET_PeerIdentity *) &req[1];
1781 uint16_t relay_size = slv->relay_count * sizeof (*relays);
1782 uint16_t join_msg_size = 0;
1784 if (sizeof (*req) + relay_size + sizeof (struct GNUNET_MessageHeader)
1787 join_msg_size = ntohs (slv->join_msg->header.size);
1788 slv->join_msg = GNUNET_malloc (join_msg_size);
1789 memcpy (slv->join_msg, ((char *) &req[1]) + relay_size, join_msg_size);
1791 if (sizeof (*req) + relay_size + join_msg_size != req_size)
1793 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1794 "%u + %u + %u != %u\n",
1795 sizeof (*req), relay_size, join_msg_size, req_size);
1797 GNUNET_SERVER_client_disconnect (client);
1801 if (0 < slv->relay_count)
1803 slv->relays = GNUNET_malloc (relay_size);
1804 memcpy (slv->relays, &req[1], relay_size);
1808 chn->is_master = GNUNET_NO;
1809 chn->pub_key = req->channel_key;
1810 chn->pub_key_hash = pub_key_hash;
1813 if (NULL == chn_slv)
1815 chn_slv = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
1816 GNUNET_CONTAINER_multihashmap_put (channel_slaves, &chn->pub_key_hash, chn_slv,
1817 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1819 GNUNET_CONTAINER_multihashmap_put (chn_slv, &slv->pub_key_hash, chn,
1820 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1821 GNUNET_CONTAINER_multihashmap_put (slaves, &chn->pub_key_hash, chn,
1822 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1823 chn->store_op = GNUNET_PSYCSTORE_counters_get (store, &chn->pub_key,
1824 &store_recv_slave_counters, slv);
1830 struct GNUNET_PSYC_CountersResultMessage res;
1831 res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK);
1832 res.header.size = htons (sizeof (res));
1833 res.result_code = htonl (GNUNET_OK);
1834 res.max_message_id = GNUNET_htonll (chn->max_message_id);
1836 GNUNET_SERVER_notification_context_add (nc, client);
1837 GNUNET_SERVER_notification_context_unicast (nc, client, &res.header,
1840 if (NULL == slv->member)
1843 = GNUNET_MULTICAST_member_join (cfg, &chn->pub_key, &slv->priv_key,
1845 slv->relay_count, slv->relays,
1846 &slv->join_msg->header,
1847 &mcast_recv_join_request,
1848 &mcast_recv_join_decision,
1849 &mcast_recv_membership_test,
1850 &mcast_recv_replay_fragment,
1851 &mcast_recv_replay_message,
1852 &mcast_recv_message, chn);
1853 if (NULL != slv->join_msg)
1855 GNUNET_free (slv->join_msg);
1856 slv->join_msg = NULL;
1859 else if (NULL != slv->join_dcsn)
1861 GNUNET_SERVER_notification_context_add (nc, client);
1862 GNUNET_SERVER_notification_context_unicast (nc, client,
1863 &slv->join_dcsn->header,
1868 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1869 "%p Client connected as slave to channel %s.\n",
1870 slv, GNUNET_h2s (&chn->pub_key_hash));
1872 struct Client *cli = GNUNET_new (struct Client);
1873 cli->client = client;
1874 GNUNET_CONTAINER_DLL_insert (chn->clients_head, chn->clients_tail, cli);
1876 GNUNET_SERVER_client_set_user_context (client, chn);
1877 GNUNET_SERVER_receive_done (client, GNUNET_OK);
1881 struct JoinDecisionClosure
1883 int32_t is_admitted;
1884 struct GNUNET_MessageHeader *msg;
1889 * Iterator callback for sending join decisions to multicast.
1892 mcast_send_join_decision (void *cls, const struct GNUNET_HashCode *pub_key_hash,
1895 struct JoinDecisionClosure *jcls = cls;
1896 struct GNUNET_MULTICAST_JoinHandle *jh = value;
1897 // FIXME: add relays
1898 GNUNET_MULTICAST_join_decision (jh, jcls->is_admitted, 0, NULL, jcls->msg);
1904 * Join decision from client.
1907 client_recv_join_decision (void *cls, struct GNUNET_SERVER_Client *client,
1908 const struct GNUNET_MessageHeader *msg)
1910 const struct GNUNET_PSYC_JoinDecisionMessage *dcsn
1911 = (const struct GNUNET_PSYC_JoinDecisionMessage *) msg;
1912 struct Channel *chn;
1914 struct JoinDecisionClosure jcls;
1916 chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
1920 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1923 GNUNET_assert (GNUNET_YES == chn->is_master);
1924 mst = (struct Master *) chn;
1925 jcls.is_admitted = ntohl (dcsn->is_admitted);
1927 = (sizeof (*dcsn) + sizeof (*jcls.msg) <= ntohs (msg->size))
1928 ? (struct GNUNET_MessageHeader *) &dcsn[1]
1931 struct GNUNET_HashCode slave_key_hash;
1932 GNUNET_CRYPTO_hash (&dcsn->slave_key, sizeof (dcsn->slave_key),
1935 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1936 "%p Got join decision (%d) from client for channel %s..\n",
1937 mst, jcls.is_admitted, GNUNET_h2s (&chn->pub_key_hash));
1938 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1939 "%p ..and slave %s.\n",
1940 mst, GNUNET_h2s (&slave_key_hash));
1942 GNUNET_CONTAINER_multihashmap_get_multiple (mst->join_reqs, &slave_key_hash,
1943 &mcast_send_join_decision, &jcls);
1944 GNUNET_CONTAINER_multihashmap_remove_all (mst->join_reqs, &slave_key_hash);
1945 GNUNET_SERVER_receive_done (client, GNUNET_OK);
1950 * Send acknowledgement to a client.
1952 * Sent after a message fragment has been passed on to multicast.
1954 * @param chn The channel struct for the client.
1957 send_message_ack (struct Channel *chn, struct GNUNET_SERVER_Client *client)
1959 struct GNUNET_MessageHeader res;
1960 res.size = htons (sizeof (res));
1961 res.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK);
1964 GNUNET_SERVER_notification_context_add (nc, client);
1965 GNUNET_SERVER_notification_context_unicast (nc, client, &res, GNUNET_NO);
1970 * Callback for the transmit functions of multicast.
1973 transmit_notify (void *cls, size_t *data_size, void *data)
1975 struct Channel *chn = cls;
1976 struct TransmitMessage *tmit_msg = chn->tmit_head;
1978 if (NULL == tmit_msg || *data_size < tmit_msg->size)
1980 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1981 "%p transmit_notify: nothing to send.\n", chn);
1986 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1987 "%p transmit_notify: sending %u bytes.\n", chn, tmit_msg->size);
1989 *data_size = tmit_msg->size;
1990 memcpy (data, &tmit_msg[1], *data_size);
1992 int ret = (MSG_STATE_END < chn->tmit_state) ? GNUNET_NO : GNUNET_YES;
1994 if (NULL != tmit_msg->client && GNUNET_NO == tmit_msg->ack_sent)
1995 send_message_ack (chn, tmit_msg->client);
1997 GNUNET_CONTAINER_DLL_remove (chn->tmit_head, chn->tmit_tail, tmit_msg);
1998 GNUNET_free (tmit_msg);
2000 if (NULL != chn->tmit_head)
2002 transmit_message (chn);
2004 else if (GNUNET_YES == chn->is_disconnected)
2006 /* FIXME: handle partial message (when still in_transmit) */
2007 cleanup_channel (chn);
2014 * Callback for the transmit functions of multicast.
2017 master_transmit_notify (void *cls, size_t *data_size, void *data)
2019 int ret = transmit_notify (cls, data_size, data);
2021 if (GNUNET_YES == ret)
2023 struct Master *mst = cls;
2024 mst->tmit_handle = NULL;
2031 * Callback for the transmit functions of multicast.
2034 slave_transmit_notify (void *cls, size_t *data_size, void *data)
2036 int ret = transmit_notify (cls, data_size, data);
2038 if (GNUNET_YES == ret)
2040 struct Slave *slv = cls;
2041 slv->tmit_handle = NULL;
2048 * Transmit a message from a channel master to the multicast group.
2051 master_transmit_message (struct Master *mst)
2053 if (NULL == mst->tmit_handle)
2056 = GNUNET_MULTICAST_origin_to_all (mst->origin, mst->max_message_id,
2057 mst->max_group_generation,
2058 master_transmit_notify, mst);
2062 GNUNET_MULTICAST_origin_to_all_resume (mst->tmit_handle);
2068 * Transmit a message from a channel slave to the multicast group.
2071 slave_transmit_message (struct Slave *slv)
2073 if (NULL == slv->tmit_handle)
2076 = GNUNET_MULTICAST_member_to_origin (slv->member, slv->max_request_id,
2077 slave_transmit_notify, slv);
2081 GNUNET_MULTICAST_member_to_origin_resume (slv->tmit_handle);
2087 transmit_message (struct Channel *chn)
2090 ? master_transmit_message ((struct Master *) chn)
2091 : slave_transmit_message ((struct Slave *) chn);
2096 * Queue a message from a channel master for sending to the multicast group.
2099 master_queue_message (struct Master *mst, struct TransmitMessage *tmit_msg,
2100 uint16_t first_ptype, uint16_t last_ptype)
2102 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "%p master_queue_message()\n", mst);
2104 if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype)
2106 tmit_msg->id = ++mst->max_message_id;
2107 struct GNUNET_PSYC_MessageMethod *pmeth
2108 = (struct GNUNET_PSYC_MessageMethod *) &tmit_msg[1];
2110 if (pmeth->flags & GNUNET_PSYC_MASTER_TRANSMIT_STATE_RESET)
2112 pmeth->state_delta = GNUNET_htonll (GNUNET_PSYC_STATE_RESET);
2114 else if (pmeth->flags & GNUNET_PSYC_MASTER_TRANSMIT_STATE_MODIFY)
2116 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "%p master_queue_message: setting state_modify flag\n", mst);
2117 pmeth->state_delta = GNUNET_htonll (tmit_msg->id
2118 - mst->max_state_message_id);
2122 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "%p master_queue_message: setting state_not_modified flag\n", mst);
2123 pmeth->state_delta = GNUNET_htonll (GNUNET_PSYC_STATE_NOT_MODIFIED);
2126 if (pmeth->flags & GNUNET_PSYC_MASTER_TRANSMIT_STATE_HASH)
2128 /// @todo add state_hash to PSYC header
2135 * Queue a message from a channel slave for sending to the multicast group.
2138 slave_queue_message (struct Slave *slv, struct TransmitMessage *tmit_msg,
2139 uint16_t first_ptype, uint16_t last_ptype)
2141 if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype)
2143 struct GNUNET_PSYC_MessageMethod *pmeth
2144 = (struct GNUNET_PSYC_MessageMethod *) &tmit_msg[1];
2145 pmeth->state_delta = GNUNET_htonll (GNUNET_PSYC_STATE_NOT_MODIFIED);
2146 tmit_msg->id = ++slv->max_request_id;
2152 * Queue PSYC message parts for sending to multicast.
2154 * @param chn Channel to send to.
2155 * @param client Client the message originates from.
2156 * @param data_size Size of @a data.
2157 * @param data Concatenated message parts.
2158 * @param first_ptype First message part type in @a data.
2159 * @param last_ptype Last message part type in @a data.
2161 static struct TransmitMessage *
2162 queue_message (struct Channel *chn,
2163 struct GNUNET_SERVER_Client *client,
2166 uint16_t first_ptype, uint16_t last_ptype)
2168 struct TransmitMessage *
2169 tmit_msg = GNUNET_malloc (sizeof (*tmit_msg) + data_size);
2170 memcpy (&tmit_msg[1], data, data_size);
2171 tmit_msg->client = client;
2172 tmit_msg->size = data_size;
2173 tmit_msg->state = chn->tmit_state;
2175 /* FIXME: separate queue per message ID */
2177 GNUNET_CONTAINER_DLL_insert_tail (chn->tmit_head, chn->tmit_tail, tmit_msg);
2180 ? master_queue_message ((struct Master *) chn, tmit_msg,
2181 first_ptype, last_ptype)
2182 : slave_queue_message ((struct Slave *) chn, tmit_msg,
2183 first_ptype, last_ptype);
2189 * Cancel transmission of current message.
2191 * @param chn Channel to send to.
2192 * @param client Client the message originates from.
2195 transmit_cancel (struct Channel *chn, struct GNUNET_SERVER_Client *client)
2197 uint16_t type = GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL;
2199 struct GNUNET_MessageHeader msg;
2200 msg.size = htons (sizeof (msg));
2201 msg.type = htons (type);
2203 queue_message (chn, client, sizeof (msg), &msg, type, type);
2204 transmit_message (chn);
2206 /* FIXME: cleanup */
2211 * Incoming message from a master or slave client.
2214 client_recv_psyc_message (void *cls, struct GNUNET_SERVER_Client *client,
2215 const struct GNUNET_MessageHeader *msg)
2218 chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
2219 GNUNET_assert (NULL != chn);
2221 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2222 "%p Received message from client.\n", chn);
2223 GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG, msg);
2225 if (GNUNET_YES != chn->is_ready)
2227 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2228 "%p Channel is not ready yet, disconnecting client.\n", chn);
2230 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
2234 uint16_t size = ntohs (msg->size);
2235 if (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < size - sizeof (*msg))
2237 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "%p Message payload too large.\n", chn);
2239 transmit_cancel (chn, client);
2240 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
2244 uint16_t first_ptype = 0, last_ptype = 0;
2246 == GNUNET_PSYC_receive_check_parts (size - sizeof (*msg),
2247 (const char *) &msg[1],
2248 &first_ptype, &last_ptype))
2250 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2251 "%p Received invalid message part from client.\n", chn);
2253 transmit_cancel (chn, client);
2254 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
2257 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2258 "%p Received message with first part type %u and last part type %u.\n",
2259 chn, first_ptype, last_ptype);
2261 queue_message (chn, client, size - sizeof (*msg), &msg[1],
2262 first_ptype, last_ptype);
2263 transmit_message (chn);
2264 /* FIXME: send a few ACKs even before transmit_notify is called */
2266 GNUNET_SERVER_receive_done (client, GNUNET_OK);
2271 * Received result of GNUNET_PSYCSTORE_membership_store()
2274 store_recv_membership_store_result (void *cls, int64_t result,
2275 const char *err_msg, uint16_t err_msg_size)
2277 struct Operation *op = cls;
2278 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2279 "%p GNUNET_PSYCSTORE_membership_store() returned %" PRId64 " (%.s)\n",
2280 op->chn, result, err_msg_size, err_msg);
2282 client_send_result (op->client, op->op_id, result, err_msg, err_msg_size);
2288 * Client requests to add/remove a slave in the membership database.
2291 client_recv_membership_store (void *cls, struct GNUNET_SERVER_Client *client,
2292 const struct GNUNET_MessageHeader *msg)
2295 chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
2296 GNUNET_assert (NULL != chn);
2298 const struct ChannelMembershipStoreRequest *
2299 req = (const struct ChannelMembershipStoreRequest *) msg;
2301 struct Operation *op = op_add (chn, client, req->op_id, 0);
2303 uint64_t announced_at = GNUNET_ntohll (req->announced_at);
2304 uint64_t effective_since = GNUNET_ntohll (req->effective_since);
2305 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2306 "%p Received membership store request from client.\n", chn);
2307 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2308 "%p did_join: %u, announced_at: %" PRIu64 ", effective_since: %" PRIu64 "\n",
2309 chn, req->did_join, announced_at, effective_since);
2311 GNUNET_PSYCSTORE_membership_store (store, &chn->pub_key, &req->slave_key,
2312 req->did_join, announced_at, effective_since,
2313 0, /* FIXME: group_generation */
2314 &store_recv_membership_store_result, op);
2315 GNUNET_SERVER_receive_done (client, GNUNET_OK);
2320 * Received a fragment for GNUNET_PSYCSTORE_fragment_get(),
2321 * in response to a history request from a client.
2324 store_recv_fragment_history (void *cls,
2325 struct GNUNET_MULTICAST_MessageHeader *mmsg,
2326 enum GNUNET_PSYCSTORE_MessageFlags flags)
2328 struct Operation *op = cls;
2329 if (NULL == op->client)
2330 { /* Requesting client already disconnected. */
2333 struct Channel *chn = op->chn;
2335 struct GNUNET_PSYC_MessageHeader *pmsg;
2336 uint16_t msize = ntohs (mmsg->header.size);
2337 uint16_t psize = sizeof (*pmsg) + msize - sizeof (*mmsg);
2339 struct GNUNET_OperationResultMessage *
2340 res = GNUNET_malloc (sizeof (*res) + psize);
2341 res->header.size = htons (sizeof (*res) + psize);
2342 res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_HISTORY_RESULT);
2343 res->op_id = op->op_id;
2344 res->result_code = GNUNET_htonll (GNUNET_OK);
2346 pmsg = (struct GNUNET_PSYC_MessageHeader *) &res[1];
2347 GNUNET_PSYC_message_header_init (pmsg, mmsg, flags | GNUNET_PSYC_MESSAGE_HISTORIC);
2348 memcpy (&res[1], pmsg, psize);
2350 /** @todo FIXME: send only to requesting client */
2351 client_send_msg (chn, &res->header);
2357 * Received the result of GNUNET_PSYCSTORE_fragment_get(),
2358 * in response to a history request from a client.
2361 store_recv_fragment_history_result (void *cls, int64_t result,
2362 const char *err_msg, uint16_t err_msg_size)
2364 struct Operation *op = cls;
2365 if (NULL == op->client)
2366 { /* Requesting client already disconnected. */
2370 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2371 "%p History replay #%" PRIu64 ": "
2372 "PSYCSTORE returned %" PRId64 " (%.*s)\n",
2373 op->chn, GNUNET_ntohll (op->op_id), result, err_msg_size, err_msg);
2375 if (op->flags & GNUNET_PSYC_HISTORY_REPLAY_REMOTE)
2377 /** @todo Multicast replay request for messages not found locally. */
2380 client_send_result (op->client, op->op_id, result, err_msg, err_msg_size);
2386 * Client requests channel history.
2389 client_recv_history_replay (void *cls, struct GNUNET_SERVER_Client *client,
2390 const struct GNUNET_MessageHeader *msg)
2393 chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
2394 GNUNET_assert (NULL != chn);
2396 const struct GNUNET_PSYC_HistoryRequestMessage *
2397 req = (const struct GNUNET_PSYC_HistoryRequestMessage *) msg;
2398 uint16_t size = ntohs (msg->size);
2399 const char *method_prefix = (const char *) &req[1];
2401 if (size < sizeof (*req) + 1
2402 || '\0' != method_prefix[size - sizeof (*req) - 1])
2404 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2405 "%p History replay #%" PRIu64 ": "
2406 "invalid method prefix. size: %u < %u?\n",
2407 chn, GNUNET_ntohll (req->op_id), size, sizeof (*req) + 1);
2409 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
2413 struct Operation *op = op_add (chn, client, req->op_id, ntohl (req->flags));
2415 if (0 == req->message_limit)
2416 GNUNET_PSYCSTORE_message_get (store, &chn->pub_key, NULL,
2417 GNUNET_ntohll (req->start_message_id),
2418 GNUNET_ntohll (req->end_message_id),
2420 &store_recv_fragment_history,
2421 &store_recv_fragment_history_result, op);
2423 GNUNET_PSYCSTORE_message_get_latest (store, &chn->pub_key, NULL,
2424 GNUNET_ntohll (req->message_limit),
2426 &store_recv_fragment_history,
2427 &store_recv_fragment_history_result,
2430 GNUNET_SERVER_receive_done (client, GNUNET_OK);
2435 * Received state var from PSYCstore, send it to client.
2438 store_recv_state_var (void *cls, const char *name,
2439 const void *value, uint32_t value_size)
2441 struct Operation *op = cls;
2442 struct GNUNET_OperationResultMessage *res;
2444 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2445 "%p state_get #%" PRIu64 " - received var from PSYCstore: %s\n",
2446 op->chn, GNUNET_ntohll (op->op_id), name);
2448 if (NULL != name) /* First part */
2450 uint16_t name_size = strnlen (name, GNUNET_PSYC_MODIFIER_MAX_PAYLOAD) + 1;
2451 struct GNUNET_PSYC_MessageModifier *mod;
2452 res = GNUNET_malloc (sizeof (*res) + sizeof (*mod) + name_size + value_size);
2453 res->header.size = htons (sizeof (*res) + sizeof (*mod) + name_size + value_size);
2454 res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_STATE_RESULT);
2455 res->op_id = op->op_id;
2457 mod = (struct GNUNET_PSYC_MessageModifier *) &res[1];
2458 mod->header.size = htons (sizeof (*mod) + name_size + value_size);
2459 mod->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER);
2460 mod->name_size = htons (name_size);
2461 mod->value_size = htonl (value_size);
2462 mod->oper = htons (GNUNET_ENV_OP_ASSIGN);
2463 memcpy (&mod[1], name, name_size);
2464 memcpy (((char *) &mod[1]) + name_size, value, value_size);
2466 else /* Continuation */
2468 struct GNUNET_MessageHeader *mod;
2469 res = GNUNET_malloc (sizeof (*res) + sizeof (*mod) + value_size);
2470 res->header.size = htons (sizeof (*res) + sizeof (*mod) + value_size);
2471 res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_STATE_RESULT);
2472 res->op_id = op->op_id;
2474 mod = (struct GNUNET_MessageHeader *) &res[1];
2475 mod->size = htons (sizeof (*mod) + value_size);
2476 mod->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT);
2477 memcpy (&mod[1], value, value_size);
2480 // FIXME: client might have been disconnected
2481 GNUNET_SERVER_notification_context_add (nc, op->client);
2482 GNUNET_SERVER_notification_context_unicast (nc, op->client, &res->header,
2489 * Received result of GNUNET_PSYCSTORE_state_get()
2490 * or GNUNET_PSYCSTORE_state_get_prefix()
2493 store_recv_state_result (void *cls, int64_t result,
2494 const char *err_msg, uint16_t err_msg_size)
2496 struct Operation *op = cls;
2497 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2498 "%p state_get #%" PRIu64 ": "
2499 "PSYCSTORE returned %" PRId64 " (%.*s)\n",
2500 op->chn, GNUNET_ntohll (op->op_id), result, err_msg_size, err_msg);
2502 // FIXME: client might have been disconnected
2503 client_send_result (op->client, op->op_id, result, err_msg, err_msg_size);
2509 * Client requests best matching state variable from PSYCstore.
2512 client_recv_state_get (void *cls, struct GNUNET_SERVER_Client *client,
2513 const struct GNUNET_MessageHeader *msg)
2516 chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
2517 GNUNET_assert (NULL != chn);
2519 const struct StateRequest *
2520 req = (const struct StateRequest *) msg;
2522 uint16_t name_size = ntohs (req->header.size) - sizeof (*req);
2523 const char *name = (const char *) &req[1];
2524 if (0 == name_size || '\0' != name[name_size - 1])
2527 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
2531 struct Operation *op = op_add (chn, client, req->op_id, 0);
2532 GNUNET_PSYCSTORE_state_get (store, &chn->pub_key, name,
2533 &store_recv_state_var,
2534 &store_recv_state_result, op);
2535 GNUNET_SERVER_receive_done (client, GNUNET_OK);
2540 * Client requests state variables with a given prefix from PSYCstore.
2543 client_recv_state_get_prefix (void *cls, struct GNUNET_SERVER_Client *client,
2544 const struct GNUNET_MessageHeader *msg)
2547 chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
2548 GNUNET_assert (NULL != chn);
2550 const struct StateRequest *
2551 req = (const struct StateRequest *) msg;
2553 uint16_t name_size = ntohs (req->header.size) - sizeof (*req);
2554 const char *name = (const char *) &req[1];
2555 if (0 == name_size || '\0' != name[name_size - 1])
2558 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
2562 struct Operation *op = op_add (chn, client, req->op_id, 0);
2563 GNUNET_PSYCSTORE_state_get_prefix (store, &chn->pub_key, name,
2564 &store_recv_state_var,
2565 &store_recv_state_result, op);
2566 GNUNET_SERVER_receive_done (client, GNUNET_OK);
2570 static const struct GNUNET_SERVER_MessageHandler server_handlers[] = {
2571 { &client_recv_master_start, NULL,
2572 GNUNET_MESSAGE_TYPE_PSYC_MASTER_START, 0 },
2574 { &client_recv_slave_join, NULL,
2575 GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN, 0 },
2577 { &client_recv_join_decision, NULL,
2578 GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION, 0 },
2580 { &client_recv_psyc_message, NULL,
2581 GNUNET_MESSAGE_TYPE_PSYC_MESSAGE, 0 },
2583 { &client_recv_membership_store, NULL,
2584 GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_MEMBERSHIP_STORE, 0 },
2586 { &client_recv_history_replay, NULL,
2587 GNUNET_MESSAGE_TYPE_PSYC_HISTORY_REPLAY, 0 },
2589 { &client_recv_state_get, NULL,
2590 GNUNET_MESSAGE_TYPE_PSYC_STATE_GET, 0 },
2592 { &client_recv_state_get_prefix, NULL,
2593 GNUNET_MESSAGE_TYPE_PSYC_STATE_GET_PREFIX, 0 },
2595 { NULL, NULL, 0, 0 }
2600 * Initialize the PSYC service.
2602 * @param cls Closure.
2603 * @param server The initialized server.
2604 * @param c Configuration to use.
2607 run (void *cls, struct GNUNET_SERVER_Handle *server,
2608 const struct GNUNET_CONFIGURATION_Handle *c)
2611 store = GNUNET_PSYCSTORE_connect (cfg);
2612 stats = GNUNET_STATISTICS_create ("psyc", cfg);
2613 masters = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
2614 slaves = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
2615 channel_slaves = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
2616 recv_cache = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
2617 nc = GNUNET_SERVER_notification_context_create (server, 1);
2618 GNUNET_SERVER_add_handlers (server, server_handlers);
2619 GNUNET_SERVER_disconnect_notify (server, &client_disconnect, NULL);
2620 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL,
2621 &shutdown_task, NULL);
2626 * The main function for the service.
2628 * @param argc number of arguments from the command line
2629 * @param argv command line arguments
2630 * @return 0 ok, 1 on error
2633 main (int argc, char *const *argv)
2635 return (GNUNET_OK ==
2636 GNUNET_SERVICE_run (argc, argv, "psyc",
2637 GNUNET_SERVICE_OPTION_NONE,
2638 &run, NULL)) ? 0 : 1;
2641 /* end of gnunet-service-psyc.c */