2 * This file is part of GNUnet
3 * Copyright (C) 2013 GNUnet e.V.
5 * GNUnet is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published
7 * by the Free Software Foundation; either version 3, or (at your
8 * option) any later version.
10 * GNUnet is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License for more details.
15 * You should have received a copy of the GNU General Public License
16 * along with GNUnet; see the file COPYING. If not, write to the
17 * Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18 * Boston, MA 02110-1301, USA.
22 * @file psyc/gnunet-service-psyc.c
24 * @author Gabor X Toth
30 #include "gnunet_util_lib.h"
31 #include "gnunet_constants.h"
32 #include "gnunet_protocols.h"
33 #include "gnunet_statistics_service.h"
34 #include "gnunet_multicast_service.h"
35 #include "gnunet_psycstore_service.h"
36 #include "gnunet_psyc_service.h"
37 #include "gnunet_psyc_util_lib.h"
42 * Handle to our current configuration.
44 static const struct GNUNET_CONFIGURATION_Handle *cfg;
47 * Handle to the statistics service.
49 static struct GNUNET_STATISTICS_Handle *stats;
52 * Notification context, simplifies client broadcasts.
54 static struct GNUNET_SERVER_NotificationContext *nc;
57 * Handle to the PSYCstore.
59 static struct GNUNET_PSYCSTORE_Handle *store;
62 * All connected masters.
63 * Channel's pub_key_hash -> struct Master
65 static struct GNUNET_CONTAINER_MultiHashMap *masters;
68 * All connected slaves.
69 * Channel's pub_key_hash -> struct Slave
71 static struct GNUNET_CONTAINER_MultiHashMap *slaves;
74 * Connected slaves per channel.
75 * Channel's pub_key_hash -> Slave's pub_key -> struct Slave
77 static struct GNUNET_CONTAINER_MultiHashMap *channel_slaves;
81 * Message in the transmission queue.
83 struct TransmitMessage
85 struct TransmitMessage *prev;
86 struct TransmitMessage *next;
88 struct GNUNET_SERVER_Client *client;
91 * ID assigned to the message.
101 * Type of first message part.
103 uint16_t first_ptype;
106 * Type of last message part.
110 /* Followed by message */
115 * Cache for received message fragments.
116 * Message fragments are only sent to clients after all modifiers arrived.
118 * chan_key -> MultiHashMap chan_msgs
120 static struct GNUNET_CONTAINER_MultiHashMap *recv_cache;
124 * Entry in the chan_msgs hashmap of @a recv_cache:
125 * fragment_id -> RecvCacheEntry
127 struct RecvCacheEntry
129 struct GNUNET_MULTICAST_MessageHeader *mmsg;
135 * Entry in the @a recv_frags hash map of a @a Channel.
136 * message_id -> FragmentQueue
141 * Fragment IDs stored in @a recv_cache.
143 struct GNUNET_CONTAINER_Heap *fragments;
146 * Total size of received fragments.
151 * Total size of received header fragments (METHOD & MODIFIERs)
153 uint64_t header_size;
156 * The @a state_delta field from struct GNUNET_PSYC_MessageMethod.
158 uint64_t state_delta;
161 * The @a flags field from struct GNUNET_PSYC_MessageMethod.
166 * Receive state of message.
168 * @see MessageFragmentState
173 * Whether the state is already modified in PSYCstore.
175 uint8_t state_is_modified;
178 * Is the message queued for delivery to the client?
179 * i.e. added to the recv_msgs queue
186 * List of connected clients.
193 struct GNUNET_SERVER_Client *client;
199 struct Operation *prev;
200 struct Operation *next;
202 struct GNUNET_SERVER_Client *client;
210 * Common part of the client context for both a channel master and slave.
214 struct Client *clients_head;
215 struct Client *clients_tail;
217 struct Operation *op_head;
218 struct Operation *op_tail;
220 struct TransmitMessage *tmit_head;
221 struct TransmitMessage *tmit_tail;
224 * Current PSYCstore operation.
226 struct GNUNET_PSYCSTORE_OperationHandle *store_op;
229 * Received fragments not yet sent to the client.
230 * message_id -> FragmentQueue
232 struct GNUNET_CONTAINER_MultiHashMap *recv_frags;
235 * Received message IDs not yet sent to the client.
237 struct GNUNET_CONTAINER_Heap *recv_msgs;
240 * Public key of the channel.
242 struct GNUNET_CRYPTO_EddsaPublicKey pub_key;
245 * Hash of @a pub_key.
247 struct GNUNET_HashCode pub_key_hash;
250 * Last message ID sent to the client.
251 * 0 if there is no such message.
253 uint64_t max_message_id;
256 * ID of the last stateful message, where the state operations has been
257 * processed and saved to PSYCstore and which has been sent to the client.
258 * 0 if there is no such message.
260 uint64_t max_state_message_id;
263 * Expected value size for the modifier being received from the PSYC service.
265 uint32_t tmit_mod_value_size_expected;
268 * Actual value size for the modifier being received from the PSYC service.
270 uint32_t tmit_mod_value_size;
273 * Is this a channel master (#GNUNET_YES), or slave (#GNUNET_NO)?
278 * Is this channel ready to receive messages from client?
279 * #GNUNET_YES or #GNUNET_NO
284 * Is the client disconnected?
285 * #GNUNET_YES or #GNUNET_NO
287 uint8_t is_disconnected;
292 * Client context for a channel master.
297 * Channel struct common for Master and Slave
302 * Private key of the channel.
304 struct GNUNET_CRYPTO_EddsaPrivateKey priv_key;
307 * Handle for the multicast origin.
309 struct GNUNET_MULTICAST_Origin *origin;
312 * Transmit handle for multicast.
314 struct GNUNET_MULTICAST_OriginTransmitHandle *tmit_handle;
317 * Incoming join requests from multicast.
318 * member_pub_key -> struct GNUNET_MULTICAST_JoinHandle *
320 struct GNUNET_CONTAINER_MultiHashMap *join_reqs;
323 * Last message ID transmitted to this channel.
325 * Incremented before sending a message, thus the message_id in messages sent
328 uint64_t max_message_id;
331 * ID of the last message with state operations transmitted to the channel.
332 * 0 if there is no such message.
334 uint64_t max_state_message_id;
337 * Maximum group generation transmitted to the channel.
339 uint64_t max_group_generation;
342 * @see enum GNUNET_PSYC_Policy
344 enum GNUNET_PSYC_Policy policy;
349 * Client context for a channel slave.
354 * Channel struct common for Master and Slave
359 * Private key of the slave.
361 struct GNUNET_CRYPTO_EcdsaPrivateKey priv_key;
364 * Public key of the slave.
366 struct GNUNET_CRYPTO_EcdsaPublicKey pub_key;
369 * Hash of @a pub_key.
371 struct GNUNET_HashCode pub_key_hash;
374 * Handle for the multicast member.
376 struct GNUNET_MULTICAST_Member *member;
379 * Transmit handle for multicast.
381 struct GNUNET_MULTICAST_MemberTransmitHandle *tmit_handle;
384 * Peer identity of the origin.
386 struct GNUNET_PeerIdentity origin;
389 * Number of items in @a relays.
391 uint32_t relay_count;
394 * Relays that multicast can use to connect.
396 struct GNUNET_PeerIdentity *relays;
399 * Join request to be transmitted to the master on join.
401 struct GNUNET_PSYC_Message *join_msg;
404 * Join decision received from multicast.
406 struct GNUNET_PSYC_JoinDecisionMessage *join_dcsn;
409 * Maximum request ID for this channel.
411 uint64_t max_request_id;
416 enum GNUNET_PSYC_SlaveJoinFlags join_flags;
421 transmit_message (struct Channel *chn);
424 message_queue_run (struct Channel *chn);
427 message_queue_drop (struct Channel *chn);
/**
 * Closure-taking wrapper around transmit_message().
 *
 * @param cls  The `struct Channel` to hand to transmit_message().
 */
static void
schedule_transmit_message (void *cls)
{
  transmit_message ((struct Channel *) cls);
}
440 * Task run during shutdown.
445 shutdown_task (void *cls)
449 GNUNET_SERVER_notification_context_destroy (nc);
454 GNUNET_STATISTICS_destroy (stats, GNUNET_YES);
460 static struct Operation *
461 op_add (struct Channel *chn, struct GNUNET_SERVER_Client *client,
462 uint64_t op_id, uint32_t flags)
464 struct Operation *op = GNUNET_malloc (sizeof (*op));
469 GNUNET_CONTAINER_DLL_insert (chn->op_head, chn->op_tail, op);
475 op_remove (struct Operation *op)
477 GNUNET_CONTAINER_DLL_remove (op->chn->op_head, op->chn->op_tail, op);
483 * Clean up master data structures after a client disconnected.
486 cleanup_master (struct Master *mst)
488 struct Channel *chn = &mst->chn;
490 if (NULL != mst->origin)
491 GNUNET_MULTICAST_origin_stop (mst->origin, NULL, NULL); // FIXME
492 GNUNET_CONTAINER_multihashmap_destroy (mst->join_reqs);
493 GNUNET_CONTAINER_multihashmap_remove (masters, &chn->pub_key_hash, mst);
498 * Clean up slave data structures after a client disconnected.
501 cleanup_slave (struct Slave *slv)
503 struct Channel *chn = &slv->chn;
504 struct GNUNET_CONTAINER_MultiHashMap *
505 chn_slv = GNUNET_CONTAINER_multihashmap_get (channel_slaves,
507 GNUNET_assert (NULL != chn_slv);
508 GNUNET_CONTAINER_multihashmap_remove (chn_slv, &slv->pub_key_hash, slv);
510 if (0 == GNUNET_CONTAINER_multihashmap_size (chn_slv))
512 GNUNET_CONTAINER_multihashmap_remove (channel_slaves, &chn->pub_key_hash,
514 GNUNET_CONTAINER_multihashmap_destroy (chn_slv);
516 GNUNET_CONTAINER_multihashmap_remove (slaves, &chn->pub_key_hash, slv);
518 if (NULL != slv->join_msg)
520 GNUNET_free (slv->join_msg);
521 slv->join_msg = NULL;
523 if (NULL != slv->relays)
525 GNUNET_free (slv->relays);
528 if (NULL != slv->member)
530 GNUNET_MULTICAST_member_part (slv->member, NULL, NULL); // FIXME
533 GNUNET_CONTAINER_multihashmap_remove (slaves, &chn->pub_key_hash, slv);
538 * Clean up channel data structures after a client disconnected.
541 cleanup_channel (struct Channel *chn)
543 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
544 "%p Cleaning up channel %s. master? %u\n",
546 GNUNET_h2s (&chn->pub_key_hash),
548 message_queue_drop (chn);
549 GNUNET_CONTAINER_multihashmap_destroy (chn->recv_frags);
550 chn->recv_frags = NULL;
552 if (NULL != chn->store_op)
554 GNUNET_PSYCSTORE_operation_cancel (chn->store_op);
555 chn->store_op = NULL;
558 (GNUNET_YES == chn->is_master)
559 ? cleanup_master ((struct Master *) chn)
560 : cleanup_slave ((struct Slave *) chn);
566 * Called whenever a client is disconnected.
567 * Frees our resources associated with that client.
569 * @param cls Closure.
570 * @param client Identification of the client.
573 client_disconnect (void *cls, struct GNUNET_SERVER_Client *client)
579 chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
583 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
584 "%p User context is NULL in client_disconnect()\n",
590 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
591 "%p Client (%s) disconnected from channel %s\n",
593 (GNUNET_YES == chn->is_master) ? "master" : "slave",
594 GNUNET_h2s (&chn->pub_key_hash));
596 struct Client *cli = chn->clients_head;
599 if (cli->client == client)
601 GNUNET_CONTAINER_DLL_remove (chn->clients_head, chn->clients_tail, cli);
608 struct Operation *op = chn->op_head;
611 if (op->client == client)
619 if (NULL == chn->clients_head)
620 { /* Last client disconnected. */
621 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
622 "%p Last client (%s) disconnected from channel %s\n",
624 (GNUNET_YES == chn->is_master) ? "master" : "slave",
625 GNUNET_h2s (&chn->pub_key_hash));
626 chn->is_disconnected = GNUNET_YES;
627 if (NULL != chn->tmit_head)
628 { /* Send pending messages to multicast before cleanup. */
629 transmit_message (chn);
633 cleanup_channel (chn);
640 * Send message to all clients connected to the channel.
643 client_send_msg (const struct Channel *chn,
644 const struct GNUNET_MessageHeader *msg)
646 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
647 "%p Sending message to clients.\n",
650 struct Client *cli = chn->clients_head;
653 GNUNET_SERVER_notification_context_add (nc, cli->client);
654 GNUNET_SERVER_notification_context_unicast (nc, cli->client, msg, GNUNET_NO);
661 * Send a result code back to the client.
664 * Client that should receive the result code.
668 * Operation ID in network byte order.
670 * Data payload or NULL.
675 client_send_result (struct GNUNET_SERVER_Client *client, uint64_t op_id,
676 int64_t result_code, const void *data, uint16_t data_size)
678 struct GNUNET_OperationResultMessage *res;
680 res = GNUNET_malloc (sizeof (*res) + data_size);
681 res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_RESULT_CODE);
682 res->header.size = htons (sizeof (*res) + data_size);
683 res->result_code = GNUNET_htonll (result_code);
686 GNUNET_memcpy (&res[1], data, data_size);
688 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
689 "%p Sending result to client for operation #%" PRIu64 ": %" PRId64 " (size: %u)\n",
691 GNUNET_ntohll (op_id),
695 GNUNET_SERVER_notification_context_add (nc, client);
696 GNUNET_SERVER_notification_context_unicast (nc, client, &res->header,
703 * Closure for join_mem_test_cb()
705 struct JoinMemTestClosure
707 struct GNUNET_CRYPTO_EcdsaPublicKey slave_pub_key;
709 struct GNUNET_MULTICAST_JoinHandle *jh;
710 struct GNUNET_PSYC_JoinRequestMessage *join_msg;
715 * Membership test result callback used for join requests.
718 join_mem_test_cb (void *cls, int64_t result,
719 const char *err_msg, uint16_t err_msg_size)
721 struct JoinMemTestClosure *jcls = cls;
723 if (GNUNET_NO == result && GNUNET_YES == jcls->chn->is_master)
724 { /* Pass on join request to client if this is a master channel */
725 struct Master *mst = (struct Master *) jcls->chn;
726 struct GNUNET_HashCode slave_pub_hash;
727 GNUNET_CRYPTO_hash (&jcls->slave_pub_key, sizeof (jcls->slave_pub_key),
729 GNUNET_CONTAINER_multihashmap_put (mst->join_reqs, &slave_pub_hash, jcls->jh,
730 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
731 client_send_msg (jcls->chn, &jcls->join_msg->header);
735 if (GNUNET_SYSERR == result)
737 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
738 "Could not perform membership test (%.*s)\n",
739 err_msg_size, err_msg);
742 GNUNET_MULTICAST_join_decision (jcls->jh, result, 0, NULL, NULL);
744 GNUNET_free (jcls->join_msg);
750 * Incoming join request from multicast.
753 mcast_recv_join_request (void *cls,
754 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_pub_key,
755 const struct GNUNET_MessageHeader *join_msg,
756 struct GNUNET_MULTICAST_JoinHandle *jh)
758 struct Channel *chn = cls;
759 uint16_t join_msg_size = 0;
761 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
762 "%p Got join request.\n",
764 if (NULL != join_msg)
766 if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE == ntohs (join_msg->type))
768 join_msg_size = ntohs (join_msg->size);
772 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
773 "%p Got join message with invalid type %u.\n",
775 ntohs (join_msg->type));
779 struct GNUNET_PSYC_JoinRequestMessage *
780 req = GNUNET_malloc (sizeof (*req) + join_msg_size);
781 req->header.size = htons (sizeof (*req) + join_msg_size);
782 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_JOIN_REQUEST);
783 req->slave_pub_key = *slave_pub_key;
784 if (0 < join_msg_size)
785 GNUNET_memcpy (&req[1], join_msg, join_msg_size);
787 struct JoinMemTestClosure *jcls = GNUNET_malloc (sizeof (*jcls));
788 jcls->slave_pub_key = *slave_pub_key;
791 jcls->join_msg = req;
793 GNUNET_PSYCSTORE_membership_test (store, &chn->pub_key, slave_pub_key,
794 chn->max_message_id, 0,
795 &join_mem_test_cb, jcls);
800 * Join decision received from multicast.
803 mcast_recv_join_decision (void *cls, int is_admitted,
804 const struct GNUNET_PeerIdentity *peer,
805 uint16_t relay_count,
806 const struct GNUNET_PeerIdentity *relays,
807 const struct GNUNET_MessageHeader *join_resp)
809 struct Slave *slv = cls;
810 struct Channel *chn = &slv->chn;
811 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
812 "%p Got join decision: %d\n",
815 if (GNUNET_YES == chn->is_ready)
817 /* Already admitted */
821 uint16_t join_resp_size = (NULL != join_resp) ? ntohs (join_resp->size) : 0;
822 struct GNUNET_PSYC_JoinDecisionMessage *
823 dcsn = slv->join_dcsn = GNUNET_malloc (sizeof (*dcsn) + join_resp_size);
824 dcsn->header.size = htons (sizeof (*dcsn) + join_resp_size);
825 dcsn->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION);
826 dcsn->is_admitted = htonl (is_admitted);
827 if (0 < join_resp_size)
828 GNUNET_memcpy (&dcsn[1], join_resp, join_resp_size);
830 client_send_msg (chn, &dcsn->header);
832 if (GNUNET_YES == is_admitted
833 && ! (GNUNET_PSYC_SLAVE_JOIN_LOCAL & slv->join_flags))
835 chn->is_ready = GNUNET_YES;
841 store_recv_fragment_replay (void *cls,
842 struct GNUNET_MULTICAST_MessageHeader *msg,
843 enum GNUNET_PSYCSTORE_MessageFlags flags)
845 struct GNUNET_MULTICAST_ReplayHandle *rh = cls;
847 GNUNET_MULTICAST_replay_response (rh, &msg->header, GNUNET_MULTICAST_REC_OK);
853 * Received result of GNUNET_PSYCSTORE_fragment_get() for multicast replay.
856 store_recv_fragment_replay_result (void *cls,
859 uint16_t err_msg_size)
861 struct GNUNET_MULTICAST_ReplayHandle *rh = cls;
863 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
864 "%p Fragment replay: PSYCSTORE returned %" PRId64 " (%.*s)\n",
875 GNUNET_MULTICAST_replay_response (rh, NULL,
876 GNUNET_MULTICAST_REC_NOT_FOUND);
879 case GNUNET_PSYCSTORE_MEMBERSHIP_TEST_FAILED:
880 GNUNET_MULTICAST_replay_response (rh, NULL,
881 GNUNET_MULTICAST_REC_ACCESS_DENIED);
885 GNUNET_MULTICAST_replay_response (rh, NULL,
886 GNUNET_MULTICAST_REC_INTERNAL_ERROR);
889 /* GNUNET_MULTICAST_replay_response frees 'rh' when passed
890 * an error code, so it must be ensured no further processing
891 * is attempted on 'rh'. Maybe this should be refactored as
892 * it doesn't look very intuitive. --lynX
894 GNUNET_MULTICAST_replay_response_end (rh);
899 * Incoming fragment replay request from multicast.
902 mcast_recv_replay_fragment (void *cls,
903 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_pub_key,
904 uint64_t fragment_id, uint64_t flags,
905 struct GNUNET_MULTICAST_ReplayHandle *rh)
908 struct Channel *chn = cls;
909 GNUNET_PSYCSTORE_fragment_get (store, &chn->pub_key, slave_pub_key,
910 fragment_id, fragment_id,
911 &store_recv_fragment_replay,
912 &store_recv_fragment_replay_result, rh);
917 * Incoming message replay request from multicast.
920 mcast_recv_replay_message (void *cls,
921 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_pub_key,
923 uint64_t fragment_offset,
925 struct GNUNET_MULTICAST_ReplayHandle *rh)
927 struct Channel *chn = cls;
928 GNUNET_PSYCSTORE_message_get (store, &chn->pub_key, slave_pub_key,
929 message_id, message_id, 1, NULL,
930 &store_recv_fragment_replay,
931 &store_recv_fragment_replay_result, rh);
936 * Convert an uint64_t in network byte order to a HashCode
937 * that can be used as key in a MultiHashMap
940 hash_key_from_nll (struct GNUNET_HashCode *key, uint64_t n)
942 /* use little-endian order, as idx_of MultiHashMap casts key to unsigned int */
943 /* TODO: use built-in byte swap functions if available */
945 n = ((n << 8) & 0xFF00FF00FF00FF00ULL) | ((n >> 8) & 0x00FF00FF00FF00FFULL);
946 n = ((n << 16) & 0xFFFF0000FFFF0000ULL) | ((n >> 16) & 0x0000FFFF0000FFFFULL);
948 *key = (struct GNUNET_HashCode) {};
950 = (n << 32) | (n >> 32);
/**
 * Convert an uint64_t in host byte order to a HashCode
 * that can be used as key in a MultiHashMap
 *
 * On big-endian hosts this defers to hash_key_from_nll().  On little-endian
 * hosts @a n is stored as-is in the first 8 bytes of @a key and all
 * remaining bytes are zero.
 *
 * @param[out] key  Hash code to fill in.
 * @param n         Value to convert.
 */
static inline void
hash_key_from_hll (struct GNUNET_HashCode *key, uint64_t n)
{
#if __BYTE_ORDER == __BIG_ENDIAN
  hash_key_from_nll (key, n);
#elif __BYTE_ORDER == __LITTLE_ENDIAN
  *key = (struct GNUNET_HashCode) { 0 };
  /* Copy the bytes rather than storing through a casted uint64_t pointer:
   * that would violate the aliasing rules and could be a misaligned write. */
  GNUNET_memcpy (key, &n, sizeof (n));
#else
  #error byteorder undefined
#endif
}
973 * Initialize PSYC message header.
976 psyc_msg_init (struct GNUNET_PSYC_MessageHeader *pmsg,
977 const struct GNUNET_MULTICAST_MessageHeader *mmsg, uint32_t flags)
979 uint16_t size = ntohs (mmsg->header.size);
980 uint16_t psize = sizeof (*pmsg) + size - sizeof (*mmsg);
982 pmsg->header.size = htons (psize);
983 pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
984 pmsg->message_id = mmsg->message_id;
985 pmsg->fragment_offset = mmsg->fragment_offset;
986 pmsg->flags = htonl (flags);
988 GNUNET_memcpy (&pmsg[1], &mmsg[1], size - sizeof (*mmsg));
993 * Create a new PSYC message from a multicast message for sending it to clients.
995 static inline struct GNUNET_PSYC_MessageHeader *
996 psyc_msg_new (const struct GNUNET_MULTICAST_MessageHeader *mmsg, uint32_t flags)
998 struct GNUNET_PSYC_MessageHeader *pmsg;
999 uint16_t size = ntohs (mmsg->header.size);
1000 uint16_t psize = sizeof (*pmsg) + size - sizeof (*mmsg);
1002 pmsg = GNUNET_malloc (psize);
1003 psyc_msg_init (pmsg, mmsg, flags);
1009 * Send multicast message to all clients connected to the channel.
1012 client_send_mcast_msg (struct Channel *chn,
1013 const struct GNUNET_MULTICAST_MessageHeader *mmsg,
1016 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1017 "%p Sending multicast message to client. fragment_id: %" PRIu64 ", message_id: %" PRIu64 "\n",
1019 GNUNET_ntohll (mmsg->fragment_id),
1020 GNUNET_ntohll (mmsg->message_id));
1022 struct GNUNET_PSYC_MessageHeader *
1023 pmsg = GNUNET_PSYC_message_header_create (mmsg, flags);
1024 client_send_msg (chn, &pmsg->header);
1030 * Send multicast request to all clients connected to the channel.
1033 client_send_mcast_req (struct Master *mst,
1034 const struct GNUNET_MULTICAST_RequestHeader *req)
1036 struct Channel *chn = &mst->chn;
1038 struct GNUNET_PSYC_MessageHeader *pmsg;
1039 uint16_t size = ntohs (req->header.size);
1040 uint16_t psize = sizeof (*pmsg) + size - sizeof (*req);
1042 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1043 "%p Sending multicast request to client. fragment_id: %" PRIu64 ", message_id: %" PRIu64 "\n",
1045 GNUNET_ntohll (req->fragment_id),
1046 GNUNET_ntohll (req->request_id));
1048 pmsg = GNUNET_malloc (psize);
1049 pmsg->header.size = htons (psize);
1050 pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
1051 pmsg->message_id = req->request_id;
1052 pmsg->fragment_offset = req->fragment_offset;
1053 pmsg->flags = htonl (GNUNET_PSYC_MESSAGE_REQUEST);
1054 pmsg->slave_pub_key = req->member_pub_key;
1055 GNUNET_memcpy (&pmsg[1], &req[1], size - sizeof (*req));
1057 client_send_msg (chn, &pmsg->header);
1059 /* FIXME: save req to PSYCstore so that it can be resent later to clients */
1066 * Insert a multicast message fragment into the queue belonging to the message.
1068 * @param chn Channel.
1069 * @param mmsg Multicast message fragment.
1070 * @param msg_id_hash Message ID of @a mmsg in a struct GNUNET_HashCode.
1071 * @param first_ptype First PSYC message part type in @a mmsg.
1072 * @param last_ptype Last PSYC message part type in @a mmsg.
1075 fragment_queue_insert (struct Channel *chn,
1076 const struct GNUNET_MULTICAST_MessageHeader *mmsg,
1077 uint16_t first_ptype, uint16_t last_ptype)
1079 const uint16_t size = ntohs (mmsg->header.size);
1080 const uint64_t frag_offset = GNUNET_ntohll (mmsg->fragment_offset);
1081 struct GNUNET_CONTAINER_MultiHashMap
1082 *chan_msgs = GNUNET_CONTAINER_multihashmap_get (recv_cache,
1083 &chn->pub_key_hash);
1085 struct GNUNET_HashCode msg_id_hash;
1086 hash_key_from_nll (&msg_id_hash, mmsg->message_id);
1088 struct FragmentQueue
1089 *fragq = GNUNET_CONTAINER_multihashmap_get (chn->recv_frags, &msg_id_hash);
1093 fragq = GNUNET_new (struct FragmentQueue);
1094 fragq->state = MSG_FRAG_STATE_HEADER;
1096 = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
1098 GNUNET_CONTAINER_multihashmap_put (chn->recv_frags, &msg_id_hash, fragq,
1099 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1101 if (NULL == chan_msgs)
1103 chan_msgs = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
1104 GNUNET_CONTAINER_multihashmap_put (recv_cache, &chn->pub_key_hash, chan_msgs,
1105 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1109 struct GNUNET_HashCode frag_id_hash;
1110 hash_key_from_nll (&frag_id_hash, mmsg->fragment_id);
1111 struct RecvCacheEntry
1112 *cache_entry = GNUNET_CONTAINER_multihashmap_get (chan_msgs, &frag_id_hash);
1113 if (NULL == cache_entry)
1115 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1116 "%p Adding message fragment to cache. message_id: %" PRIu64 ", fragment_id: %" PRIu64 "\n",
1118 GNUNET_ntohll (mmsg->message_id),
1119 GNUNET_ntohll (mmsg->fragment_id));
1120 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1121 "%p header_size: %" PRIu64 " + %u\n",
1125 cache_entry = GNUNET_new (struct RecvCacheEntry);
1126 cache_entry->ref_count = 1;
1127 cache_entry->mmsg = GNUNET_malloc (size);
1128 GNUNET_memcpy (cache_entry->mmsg, mmsg, size);
1129 GNUNET_CONTAINER_multihashmap_put (chan_msgs, &frag_id_hash, cache_entry,
1130 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1134 cache_entry->ref_count++;
1135 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1136 "%p Message fragment is already in cache. message_id: %" PRIu64 ", fragment_id: %" PRIu64 ", ref_count: %u\n",
1138 GNUNET_ntohll (mmsg->message_id),
1139 GNUNET_ntohll (mmsg->fragment_id),
1140 cache_entry->ref_count);
1143 if (MSG_FRAG_STATE_HEADER == fragq->state)
1145 if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype)
1147 struct GNUNET_PSYC_MessageMethod *
1148 pmeth = (struct GNUNET_PSYC_MessageMethod *) &mmsg[1];
1149 fragq->state_delta = GNUNET_ntohll (pmeth->state_delta);
1150 fragq->flags = ntohl (pmeth->flags);
1153 if (last_ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA)
1155 fragq->header_size += size;
1157 else if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype
1158 || frag_offset == fragq->header_size)
1159 { /* header is now complete */
1160 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1161 "%p Header of message %" PRIu64 " is complete.\n",
1163 GNUNET_ntohll (mmsg->message_id));
1165 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1166 "%p Adding message %" PRIu64 " to queue.\n",
1168 GNUNET_ntohll (mmsg->message_id));
1169 fragq->state = MSG_FRAG_STATE_DATA;
1173 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1174 "%p Header of message %" PRIu64 " is NOT complete yet: %" PRIu64 " != %" PRIu64 "\n",
1176 GNUNET_ntohll (mmsg->message_id),
1178 fragq->header_size);
1184 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END:
1185 if (frag_offset == fragq->size)
1186 fragq->state = MSG_FRAG_STATE_END;
1188 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1189 "%p Message %" PRIu64 " is NOT complete yet: %" PRIu64 " != %" PRIu64 "\n",
1191 GNUNET_ntohll (mmsg->message_id),
1196 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL:
1197 /* Drop message without delivering to client if it's a single fragment */
1199 (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype)
1200 ? MSG_FRAG_STATE_DROP
1201 : MSG_FRAG_STATE_CANCEL;
1204 switch (fragq->state)
1206 case MSG_FRAG_STATE_DATA:
1207 case MSG_FRAG_STATE_END:
1208 case MSG_FRAG_STATE_CANCEL:
1209 if (GNUNET_NO == fragq->is_queued)
1211 GNUNET_CONTAINER_heap_insert (chn->recv_msgs, NULL,
1212 GNUNET_ntohll (mmsg->message_id));
1213 fragq->is_queued = GNUNET_YES;
1217 fragq->size += size;
1218 GNUNET_CONTAINER_heap_insert (fragq->fragments, NULL,
1219 GNUNET_ntohll (mmsg->fragment_id));
1224 * Run fragment queue of a message.
1226 * Send fragments of a message in order to client, after all modifiers arrived
1232 * ID of the message @a fragq belongs to.
1234 * Fragment queue of the message.
1236 * Drop message without delivering to client?
1237 * #GNUNET_YES or #GNUNET_NO.
1240 fragment_queue_run (struct Channel *chn, uint64_t msg_id,
1241 struct FragmentQueue *fragq, uint8_t drop)
1243 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1244 "%p Running message fragment queue for message %" PRIu64 " (state: %u).\n",
1249 struct GNUNET_CONTAINER_MultiHashMap
1250 *chan_msgs = GNUNET_CONTAINER_multihashmap_get (recv_cache,
1251 &chn->pub_key_hash);
1252 GNUNET_assert (NULL != chan_msgs);
1255 while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (fragq->fragments, NULL,
1258 struct GNUNET_HashCode frag_id_hash;
1259 hash_key_from_hll (&frag_id_hash, frag_id);
1260 struct RecvCacheEntry *cache_entry
1261 = GNUNET_CONTAINER_multihashmap_get (chan_msgs, &frag_id_hash);
1262 if (cache_entry != NULL)
1264 if (GNUNET_NO == drop)
1266 client_send_mcast_msg (chn, cache_entry->mmsg, 0);
1268 if (cache_entry->ref_count <= 1)
1270 GNUNET_CONTAINER_multihashmap_remove (chan_msgs, &frag_id_hash,
1272 GNUNET_free (cache_entry->mmsg);
1273 GNUNET_free (cache_entry);
1277 cache_entry->ref_count--;
1280 #if CACHE_AGING_IMPLEMENTED
1281 else if (GNUNET_NO == drop)
1283 /* TODO: fragment not in cache anymore, retrieve it from PSYCstore */
1287 GNUNET_CONTAINER_heap_remove_root (fragq->fragments);
1290 if (MSG_FRAG_STATE_END <= fragq->state)
1292 struct GNUNET_HashCode msg_id_hash;
1293 hash_key_from_hll (&msg_id_hash, msg_id);
1295 GNUNET_CONTAINER_multihashmap_remove (chn->recv_frags, &msg_id_hash, fragq);
1296 GNUNET_CONTAINER_heap_destroy (fragq->fragments);
1297 GNUNET_free (fragq);
1301 fragq->is_queued = GNUNET_NO;
1306 struct StateModifyClosure
1308 struct Channel *chn;
1310 struct GNUNET_HashCode msg_id_hash;
1315 store_recv_state_modify_result (void *cls, int64_t result,
1316 const char *err_msg, uint16_t err_msg_size)
1318 struct StateModifyClosure *mcls = cls;
1319 struct Channel *chn = mcls->chn;
1320 uint64_t msg_id = mcls->msg_id;
1322 struct FragmentQueue *
1323 fragq = GNUNET_CONTAINER_multihashmap_get (chn->recv_frags, &mcls->msg_id_hash);
1325 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1326 "%p GNUNET_PSYCSTORE_state_modify() returned %" PRId64 " (%.*s)\n",
1327 chn, result, err_msg_size, err_msg);
1334 fragq->state_is_modified = GNUNET_YES;
1335 if (chn->max_state_message_id < msg_id)
1336 chn->max_state_message_id = msg_id;
1337 if (chn->max_message_id < msg_id)
1338 chn->max_message_id = msg_id;
1341 fragment_queue_run (chn, msg_id, fragq, MSG_FRAG_STATE_DROP == fragq->state);
1342 GNUNET_CONTAINER_heap_remove_root (chn->recv_msgs);
1343 message_queue_run (chn);
1347 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1348 "%p GNUNET_PSYCSTORE_state_modify() failed with error %" PRId64 " (%.*s)\n",
1349 chn, result, err_msg_size, err_msg);
1350 /** @todo FIXME: handle state_modify error */
1356 * Run message queue.
1358 * Send messages in queue to client in order after a message has arrived from
1359 * multicast, according to the following:
1360 * - A message is only sent if all of its modifiers arrived.
1361 * - A stateful message is only sent if the previous stateful message
1362 * has already been delivered to the client.
1364 * @param chn Channel.
1366 * @return Number of messages removed from queue and sent to client.
1369 message_queue_run (struct Channel *chn)
1371 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1372 "%p Running message queue.\n", chn);
1376 while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (chn->recv_msgs, NULL,
1379 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1380 "%p Processing message %" PRIu64 " in queue.\n", chn, msg_id);
1381 struct GNUNET_HashCode msg_id_hash;
1382 hash_key_from_hll (&msg_id_hash, msg_id);
1384 struct FragmentQueue *
1385 fragq = GNUNET_CONTAINER_multihashmap_get (chn->recv_frags, &msg_id_hash);
1387 if (NULL == fragq || fragq->state <= MSG_FRAG_STATE_HEADER)
1389 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1390 "%p No fragq (%p) or header not complete.\n",
1395 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1396 "%p Fragment queue entry: state: %u, state delta: "
1397 "%" PRIu64 " - %" PRIu64 " ?= %" PRIu64 "\n",
1398 chn, fragq->state, msg_id, fragq->state_delta, chn->max_state_message_id);
1400 if (MSG_FRAG_STATE_DATA <= fragq->state)
1402 /* Check if there's a missing message before the current one */
1403 if (GNUNET_PSYC_STATE_NOT_MODIFIED == fragq->state_delta)
1405 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p state NOT modified\n", chn);
1407 if (!(fragq->flags & GNUNET_PSYC_MESSAGE_ORDER_ANY)
1408 && (chn->max_message_id != msg_id - 1
1409 && chn->max_message_id != msg_id))
1411 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1412 "%p Out of order message. "
1413 "(%" PRIu64 " != %" PRIu64 " - 1)\n",
1414 chn, chn->max_message_id, msg_id);
1416 // FIXME: keep track of messages processed in this queue run,
1417 // and only stop after reaching the end
1422 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p state modified\n", chn);
1423 if (GNUNET_YES != fragq->state_is_modified)
1425 if (msg_id - fragq->state_delta != chn->max_state_message_id)
1427 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1428 "%p Out of order stateful message. "
1429 "(%" PRIu64 " - %" PRIu64 " != %" PRIu64 ")\n",
1430 chn, msg_id, fragq->state_delta, chn->max_state_message_id);
1432 // FIXME: keep track of messages processed in this queue run,
1433 // and only stop after reaching the end
1436 struct StateModifyClosure *mcls = GNUNET_malloc (sizeof (*mcls));
1438 mcls->msg_id = msg_id;
1439 mcls->msg_id_hash = msg_id_hash;
1441 /* Apply modifiers to state in PSYCstore */
1442 GNUNET_PSYCSTORE_state_modify (store, &chn->pub_key, msg_id,
1444 store_recv_state_modify_result, mcls);
1445 break; // continue after asynchronous state modify result
1448 chn->max_message_id = msg_id;
1450 fragment_queue_run (chn, msg_id, fragq, MSG_FRAG_STATE_DROP == fragq->state);
1451 GNUNET_CONTAINER_heap_remove_root (chn->recv_msgs);
1455 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1456 "%p Removed %" PRIu64 " messages from queue.\n", chn, n);
/* Review summary: drains chn->recv_msgs. Each pass peeks the root message ID,
 * looks up the matching fragment queue in chn->recv_frags, runs that queue with
 * the last argument set to GNUNET_YES, and then removes the heap root.
 * NOTE(review): short lines of this block (braces, the counter n, the return
 * statement) are not visible in this view. */
1462 * Drop message queue of a channel.
1464 * Remove all messages in queue without sending it to clients.
1466 * @param chn Channel.
1468 * @return Number of messages removed from queue.
1471 message_queue_drop (struct Channel *chn)
1473 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1474 "%p Dropping message queue.\n", chn);
/* Keep going for as long as the heap reports a root element. */
1477 while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (chn->recv_msgs, NULL,
1480 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1481 "%p Dropping message %" PRIu64 " from queue.\n", chn, msg_id);
/* The fragment map is keyed by a hash derived from the message ID. */
1482 struct GNUNET_HashCode msg_id_hash;
1483 hash_key_from_hll (&msg_id_hash, msg_id);
1485 struct FragmentQueue *
1486 fragq = GNUNET_CONTAINER_multihashmap_get (chn->recv_frags, &msg_id_hash);
/* Every queued message ID is required to have a fragment queue entry. */
1487 GNUNET_assert (NULL != fragq);
1488 fragment_queue_run (chn, msg_id, fragq, GNUNET_YES);
1489 GNUNET_CONTAINER_heap_remove_root (chn->recv_msgs);
1492 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1493 "%p Removed %" PRIu64 " messages from queue.\n", chn, n);
1499 * Received result of GNUNET_PSYCSTORE_fragment_store().
1502 store_recv_fragment_store_result (void *cls, int64_t result,
1503 const char *err_msg, uint16_t err_msg_size)
1505 struct Channel *chn = cls;
1506 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1507 "%p GNUNET_PSYCSTORE_fragment_store() returned %" PRId64 " (%.*s)\n",
1508 chn, result, err_msg_size, err_msg);
/* Review summary: multicast message callback; cls is the struct Channel the
 * fragment belongs to. Visible flow: log the fragment header fields, submit
 * the fragment to PSYCstore, validate its PSYC parts, then insert it into the
 * fragment queue and run the message queue of the channel. */
1513 * Handle incoming message fragment from multicast.
1515 * Store it using PSYCstore and send it to the clients of the channel in order.
1518 mcast_recv_message (void *cls, const struct GNUNET_MULTICAST_MessageHeader *mmsg)
1520 struct Channel *chn = cls;
1521 uint16_t size = ntohs (mmsg->header.size);
1523 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1524 "%p Received multicast message of size %u. "
1525 "fragment_id=%" PRIu64 ", message_id=%" PRIu64
1526 ", fragment_offset=%" PRIu64 ", flags=%" PRIu64 "\n",
1528 GNUNET_ntohll (mmsg->fragment_id),
1529 GNUNET_ntohll (mmsg->message_id),
1530 GNUNET_ntohll (mmsg->fragment_offset),
1531 GNUNET_ntohll (mmsg->flags));
/* The store request is issued before the parts are checked below, so a
 * fragment that later fails validation has already been submitted. */
1533 GNUNET_PSYCSTORE_fragment_store (store, &chn->pub_key, mmsg, 0,
1534 &store_recv_fragment_store_result, chn);
/* NOTE(review): size - sizeof (*mmsg) assumes size is at least the multicast
 * header size; presumably guaranteed by the multicast layer - confirm. */
1536 uint16_t first_ptype = 0, last_ptype = 0;
1537 int check = GNUNET_PSYC_receive_check_parts (size - sizeof (*mmsg),
1538 (const char *) &mmsg[1],
1539 &first_ptype, &last_ptype);
1540 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1541 "%p Message check result %d, first part type %u, last part type %u\n",
1542 chn, check, first_ptype, last_ptype);
/* Only GNUNET_SYSERR is treated as invalid here. */
1543 if (GNUNET_SYSERR == check)
1545 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1546 "%p Dropping incoming multicast message with invalid parts.\n",
1548 GNUNET_break_op (0);
/* Queue the fragment, then deliver whatever the queue can now release. */
1552 fragment_queue_insert (chn, mmsg, first_ptype, last_ptype);
1553 message_queue_run (chn);
/* Review summary: multicast request callback for a master. Validates the PSYC
 * parts carried by the request and, if valid, forwards the request to the
 * clients of the master through client_send_mcast_req(). */
1558 * Incoming request fragment from multicast for a master.
1560 * @param cls Master.
1561 * @param req The request.
1564 mcast_recv_request (void *cls,
1565 const struct GNUNET_MULTICAST_RequestHeader *req)
1567 struct Master *mst = cls;
1568 uint16_t size = ntohs (req->header.size);
/* NOTE(review): str is only needed for the debug log below; it is presumably
 * heap-allocated and its release is not visible in this view - confirm. */
1570 char *str = GNUNET_CRYPTO_ecdsa_public_key_to_string (&req->member_pub_key);
1571 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1572 "%p Received multicast request of size %u from %s.\n",
/* NOTE(review): size - sizeof (*req) assumes size is at least the request
 * header size - confirm this is guaranteed by the caller. */
1576 uint16_t first_ptype = 0, last_ptype = 0;
1578 == GNUNET_PSYC_receive_check_parts (size - sizeof (*req),
1579 (const char *) &req[1],
1580 &first_ptype, &last_ptype))
1582 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1583 "%p Dropping incoming multicast request with invalid parts.\n",
1585 GNUNET_break_op (0);
1589 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1590 "Message parts: first: type %u, last: type %u\n",
1591 first_ptype, last_ptype);
/* Requests are forwarded immediately, without any reordering. */
1593 /* FIXME: in-order delivery */
1594 client_send_mcast_req (mst, req);
1599 * Response from PSYCstore with the current counter values for a channel master.
1602 store_recv_master_counters (void *cls, int result, uint64_t max_fragment_id,
1603 uint64_t max_message_id, uint64_t max_group_generation,
1604 uint64_t max_state_message_id)
1606 struct Master *mst = cls;
1607 struct Channel *chn = &mst->chn;
1608 chn->store_op = NULL;
1610 struct GNUNET_PSYC_CountersResultMessage res;
1611 res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK);
1612 res.header.size = htons (sizeof (res));
1613 res.result_code = htonl (result);
1614 res.max_message_id = GNUNET_htonll (max_message_id);
1616 if (GNUNET_OK == result || GNUNET_NO == result)
1618 mst->max_message_id = max_message_id;
1619 chn->max_message_id = max_message_id;
1620 chn->max_state_message_id = max_state_message_id;
1621 mst->max_group_generation = max_group_generation;
1623 = GNUNET_MULTICAST_origin_start (cfg, &mst->priv_key, max_fragment_id,
1624 mcast_recv_join_request,
1625 mcast_recv_replay_fragment,
1626 mcast_recv_replay_message,
1628 mcast_recv_message, chn);
1629 chn->is_ready = GNUNET_YES;
1633 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1634 "%p GNUNET_PSYCSTORE_counters_get() "
1635 "returned %d for channel %s.\n",
1636 chn, result, GNUNET_h2s (&chn->pub_key_hash));
1639 client_send_msg (chn, &res.header);
/* Review summary: PSYCstore counters callback for a slave. On GNUNET_OK or
 * GNUNET_NO it copies the message counters into the channel and joins the
 * multicast group; the stored join message is released afterwards. In every
 * case a SLAVE_JOIN_ACK with the result code and max_message_id is sent to the
 * clients of the channel. */
1644 * Response from PSYCstore with the current counter values for a channel slave.
1647 store_recv_slave_counters (void *cls, int result, uint64_t max_fragment_id,
1648 uint64_t max_message_id, uint64_t max_group_generation,
1649 uint64_t max_state_message_id)
1651 struct Slave *slv = cls;
1652 struct Channel *chn = &slv->chn;
/* The pending store operation has completed. */
1653 chn->store_op = NULL;
1655 struct GNUNET_PSYC_CountersResultMessage res;
1656 res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK);
1657 res.header.size = htons (sizeof (res));
1658 res.result_code = htonl (result);
1659 res.max_message_id = GNUNET_htonll (max_message_id);
1661 if (GNUNET_OK == result || GNUNET_NO == result)
1663 chn->max_message_id = max_message_id;
1664 chn->max_state_message_id = max_state_message_id;
/* NOTE(review): slv->join_msg may be NULL (see the check below), in which
 * case the header address is formed from a NULL pointer - consider passing
 * NULL explicitly. */
1666 = GNUNET_MULTICAST_member_join (cfg, &chn->pub_key, &slv->priv_key,
1668 slv->relay_count, slv->relays,
1669 &slv->join_msg->header,
1670 mcast_recv_join_request,
1671 mcast_recv_join_decision,
1672 mcast_recv_replay_fragment,
1673 mcast_recv_replay_message,
1674 mcast_recv_message, chn);
/* The join message is not needed after the join call. */
1675 if (NULL != slv->join_msg)
1677 GNUNET_free (slv->join_msg);
1678 slv->join_msg = NULL;
1683 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1684 "%p GNUNET_PSYCSTORE_counters_get() "
1685 "returned %d for channel %s.\n",
1686 chn, result, GNUNET_h2s (&chn->pub_key_hash));
1689 client_send_msg (chn, &res.header);
/* Review summary: creates the receive-side containers of a channel: a
 * min-ordered heap and the recv_frags hash map.
 * NOTE(review): the field the heap is assigned to is on a line not visible in
 * this view; presumably chn->recv_msgs - confirm. */
1694 channel_init (struct Channel *chn)
1697 = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
1698 chn->recv_frags = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
/* Review summary: handles a client that wants to act as master of a channel.
 * The channel public key is derived from the private key in the request and
 * hashed; the hash is looked up in the masters map. A new struct Master is
 * created and its counters requested from PSYCstore, or, for a known master,
 * a MASTER_START_ACK is sent to the client right away. Finally the client is
 * added to the client list of the channel and bound to it as user context.
 * NOTE(review): the handler table registers this message type with size 0, and
 * no check of msg->size against sizeof (*req) is visible here - confirm the
 * request cannot be shorter than struct MasterStartRequest. */
1703 * Handle a connecting client starting a channel master.
1706 client_recv_master_start (void *cls, struct GNUNET_SERVER_Client *client,
1707 const struct GNUNET_MessageHeader *msg)
1709 const struct MasterStartRequest *req
1710 = (const struct MasterStartRequest *) msg;
1712 struct GNUNET_CRYPTO_EddsaPublicKey pub_key;
1713 struct GNUNET_HashCode pub_key_hash;
1715 GNUNET_CRYPTO_eddsa_key_get_public (&req->channel_key, &pub_key);
1716 GNUNET_CRYPTO_hash (&pub_key, sizeof (pub_key), &pub_key_hash);
1719 mst = GNUNET_CONTAINER_multihashmap_get (masters, &pub_key_hash);
1720 struct Channel *chn;
/* New master: copy policy and key from the request. */
1724 mst = GNUNET_new (struct Master);
1725 mst->policy = ntohl (req->policy);
1726 mst->priv_key = req->channel_key;
1727 mst->join_reqs = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
1730 chn->is_master = GNUNET_YES;
1731 chn->pub_key = pub_key;
1732 chn->pub_key_hash = pub_key_hash;
/* Register the master and ask PSYCstore for its counters; the reply is
 * handled in store_recv_master_counters(). */
1735 GNUNET_CONTAINER_multihashmap_put (masters, &chn->pub_key_hash, chn,
1736 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1737 chn->store_op = GNUNET_PSYCSTORE_counters_get (store, &chn->pub_key,
1738 store_recv_master_counters, mst);
/* Known master: acknowledge immediately with the cached max_message_id. */
1744 struct GNUNET_PSYC_CountersResultMessage res;
1745 res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK);
1746 res.header.size = htons (sizeof (res));
1747 res.result_code = htonl (GNUNET_OK);
1748 res.max_message_id = GNUNET_htonll (mst->max_message_id);
1750 GNUNET_SERVER_notification_context_add (nc, client);
1751 GNUNET_SERVER_notification_context_unicast (nc, client, &res.header,
1755 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1756 "%p Client connected as master to channel %s.\n",
1757 mst, GNUNET_h2s (&chn->pub_key_hash));
/* Track the client on the channel and bind the channel to the client. */
1759 struct Client *cli = GNUNET_new (struct Client);
1760 cli->client = client;
1761 GNUNET_CONTAINER_DLL_insert (chn->clients_head, chn->clients_tail, cli);
1763 GNUNET_SERVER_client_set_user_context (client, chn);
1764 GNUNET_SERVER_receive_done (client, GNUNET_OK);
1769 * Handle a connecting client joining as a channel slave.
1772 client_recv_slave_join (void *cls, struct GNUNET_SERVER_Client *client,
1773 const struct GNUNET_MessageHeader *msg)
1775 const struct SlaveJoinRequest *req
1776 = (const struct SlaveJoinRequest *) msg;
1777 uint16_t req_size = ntohs (req->header.size);
1779 struct GNUNET_CRYPTO_EcdsaPublicKey slv_pub_key;
1780 struct GNUNET_HashCode pub_key_hash, slv_pub_hash;
1782 GNUNET_CRYPTO_ecdsa_key_get_public (&req->slave_key, &slv_pub_key);
1783 GNUNET_CRYPTO_hash (&slv_pub_key, sizeof (slv_pub_key), &slv_pub_hash);
1784 GNUNET_CRYPTO_hash (&req->channel_pub_key, sizeof (req->channel_pub_key), &pub_key_hash);
1786 struct GNUNET_CONTAINER_MultiHashMap *
1787 chn_slv = GNUNET_CONTAINER_multihashmap_get (channel_slaves, &pub_key_hash);
1788 struct Slave *slv = NULL;
1789 struct Channel *chn;
1791 if (NULL != chn_slv)
1793 slv = GNUNET_CONTAINER_multihashmap_get (chn_slv, &slv_pub_hash);
1797 slv = GNUNET_new (struct Slave);
1798 slv->priv_key = req->slave_key;
1799 slv->pub_key = slv_pub_key;
1800 slv->pub_key_hash = slv_pub_hash;
1801 slv->origin = req->origin;
1802 slv->relay_count = ntohl (req->relay_count);
1803 slv->join_flags = ntohl (req->flags);
1805 const struct GNUNET_PeerIdentity *
1806 relays = (const struct GNUNET_PeerIdentity *) &req[1];
1807 uint16_t relay_size = slv->relay_count * sizeof (*relays);
1808 uint16_t join_msg_size = 0;
1810 if (sizeof (*req) + relay_size + sizeof (struct GNUNET_MessageHeader)
1813 struct GNUNET_PSYC_Message *
1814 join_msg = (struct GNUNET_PSYC_Message *) (((char *) &req[1]) + relay_size);
1815 join_msg_size = ntohs (join_msg->header.size);
1816 slv->join_msg = GNUNET_malloc (join_msg_size);
1817 GNUNET_memcpy (slv->join_msg, join_msg, join_msg_size);
1819 if (sizeof (*req) + relay_size + join_msg_size != req_size)
1821 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1822 "%u + %u + %u != %u\n",
1823 (unsigned int) sizeof (*req),
1828 GNUNET_SERVER_client_disconnect (client);
1832 if (0 < slv->relay_count)
1834 slv->relays = GNUNET_malloc (relay_size);
1835 GNUNET_memcpy (slv->relays, &req[1], relay_size);
1839 chn->is_master = GNUNET_NO;
1840 chn->pub_key = req->channel_pub_key;
1841 chn->pub_key_hash = pub_key_hash;
1844 if (NULL == chn_slv)
1846 chn_slv = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
1847 GNUNET_CONTAINER_multihashmap_put (channel_slaves, &chn->pub_key_hash, chn_slv,
1848 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1850 GNUNET_CONTAINER_multihashmap_put (chn_slv, &slv->pub_key_hash, chn,
1851 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1852 GNUNET_CONTAINER_multihashmap_put (slaves, &chn->pub_key_hash, chn,
1853 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1854 chn->store_op = GNUNET_PSYCSTORE_counters_get (store, &chn->pub_key,
1855 &store_recv_slave_counters, slv);
1861 struct GNUNET_PSYC_CountersResultMessage res;
1862 res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK);
1863 res.header.size = htons (sizeof (res));
1864 res.result_code = htonl (GNUNET_OK);
1865 res.max_message_id = GNUNET_htonll (chn->max_message_id);
1867 GNUNET_SERVER_notification_context_add (nc, client);
1868 GNUNET_SERVER_notification_context_unicast (nc, client, &res.header,
1871 if (GNUNET_PSYC_SLAVE_JOIN_LOCAL & slv->join_flags)
1873 mcast_recv_join_decision (slv, GNUNET_YES,
1874 NULL, 0, NULL, NULL);
1876 else if (NULL == slv->member)
1879 = GNUNET_MULTICAST_member_join (cfg, &chn->pub_key, &slv->priv_key,
1881 slv->relay_count, slv->relays,
1882 &slv->join_msg->header,
1883 &mcast_recv_join_request,
1884 &mcast_recv_join_decision,
1885 &mcast_recv_replay_fragment,
1886 &mcast_recv_replay_message,
1887 &mcast_recv_message, chn);
1888 if (NULL != slv->join_msg)
1890 GNUNET_free (slv->join_msg);
1891 slv->join_msg = NULL;
1894 else if (NULL != slv->join_dcsn)
1896 GNUNET_SERVER_notification_context_add (nc, client);
1897 GNUNET_SERVER_notification_context_unicast (nc, client,
1898 &slv->join_dcsn->header,
1903 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1904 "%p Client connected as slave to channel %s.\n",
1905 slv, GNUNET_h2s (&chn->pub_key_hash));
1907 struct Client *cli = GNUNET_new (struct Client);
1908 cli->client = client;
1909 GNUNET_CONTAINER_DLL_insert (chn->clients_head, chn->clients_tail, cli);
1911 GNUNET_SERVER_client_set_user_context (client, chn);
1912 GNUNET_SERVER_receive_done (client, GNUNET_OK);
/* Closure passed to mcast_send_join_decision() for every pending join request
 * of a slave. */
1916 struct JoinDecisionClosure
/* Admission flag, forwarded to GNUNET_MULTICAST_join_decision(). */
1918 int32_t is_admitted;
/* Message forwarded together with the decision; NULL when the client sent
 * none (see client_recv_join_decision). */
1919 struct GNUNET_MessageHeader *msg;
/* Review summary: hash map iterator; cls is a struct JoinDecisionClosure and
 * value is the multicast join handle the decision is sent on. No relays are
 * passed along (relay count 0, relay list NULL).
 * NOTE(review): the value parameter line and the return statement are not
 * visible in this view. */
1924 * Iterator callback for sending join decisions to multicast.
1927 mcast_send_join_decision (void *cls, const struct GNUNET_HashCode *pub_key_hash,
1930 struct JoinDecisionClosure *jcls = cls;
1931 struct GNUNET_MULTICAST_JoinHandle *jh = value;
1932 // FIXME: add relays
1933 GNUNET_MULTICAST_join_decision (jh, jcls->is_admitted, 0, NULL, jcls->msg);
1939 * Join decision from client.
1942 client_recv_join_decision (void *cls, struct GNUNET_SERVER_Client *client,
1943 const struct GNUNET_MessageHeader *msg)
1945 const struct GNUNET_PSYC_JoinDecisionMessage *dcsn
1946 = (const struct GNUNET_PSYC_JoinDecisionMessage *) msg;
1947 struct Channel *chn;
1949 struct JoinDecisionClosure jcls;
1951 chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
1955 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1958 GNUNET_assert (GNUNET_YES == chn->is_master);
1959 mst = (struct Master *) chn;
1960 jcls.is_admitted = ntohl (dcsn->is_admitted);
1962 = (sizeof (*dcsn) + sizeof (*jcls.msg) <= ntohs (msg->size))
1963 ? (struct GNUNET_MessageHeader *) &dcsn[1]
1966 struct GNUNET_HashCode slave_pub_hash;
1967 GNUNET_CRYPTO_hash (&dcsn->slave_pub_key, sizeof (dcsn->slave_pub_key),
1970 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1971 "%p Got join decision (%d) from client for channel %s..\n",
1972 mst, jcls.is_admitted, GNUNET_h2s (&chn->pub_key_hash));
1973 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1974 "%p ..and slave %s.\n",
1975 mst, GNUNET_h2s (&slave_pub_hash));
1977 GNUNET_CONTAINER_multihashmap_get_multiple (mst->join_reqs, &slave_pub_hash,
1978 &mcast_send_join_decision, &jcls);
1979 GNUNET_CONTAINER_multihashmap_remove_all (mst->join_reqs, &slave_pub_hash);
1980 GNUNET_SERVER_receive_done (client, GNUNET_OK);
1985 * Send acknowledgement to a client.
1987 * Sent after a message fragment has been passed on to multicast.
1989 * @param chn The channel struct for the client.
1992 send_message_ack (struct Channel *chn, struct GNUNET_SERVER_Client *client)
1994 struct GNUNET_MessageHeader res;
1995 res.size = htons (sizeof (res));
1996 res.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK);
1999 GNUNET_SERVER_notification_context_add (nc, client);
2000 GNUNET_SERVER_notification_context_unicast (nc, client, &res, GNUNET_NO);
/* Review summary: fills the multicast transmit buffer with the message at the
 * head of the transmit queue of the channel, acknowledges it to the
 * originating client, unlinks it from the queue and frees it. If more
 * messages are queued, another transmission is scheduled.
 * NOTE(review): the return statements other than the GNUNET_SYSERR one, and
 * the body of the buffer-too-small branch, are on lines not visible here. */
2005 * Callback for the transmit functions of multicast.
2008 transmit_notify (void *cls, size_t *data_size, void *data)
2010 struct Channel *chn = cls;
2011 struct TransmitMessage *tmit_msg = chn->tmit_head;
/* Nothing queued, or the buffer offered by multicast is too small. */
2013 if (NULL == tmit_msg || *data_size < tmit_msg->size)
2015 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2016 "%p transmit_notify: nothing to send.\n", chn);
2017 if (NULL != tmit_msg && *data_size < tmit_msg->size)
2023 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2024 "%p transmit_notify: sending %u bytes.\n", chn, tmit_msg->size);
/* Report the number of bytes written and copy the payload stored directly
 * behind the queue entry. */
2026 *data_size = tmit_msg->size;
2027 GNUNET_memcpy (data, &tmit_msg[1], *data_size);
/* A last part type below MESSAGE_END means the message is not complete. */
2030 = (tmit_msg->last_ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END)
2034 /* FIXME: handle disconnecting clients */
2035 if (NULL != tmit_msg->client)
2036 send_message_ack (chn, tmit_msg->client);
2038 GNUNET_CONTAINER_DLL_remove (chn->tmit_head, chn->tmit_tail, tmit_msg);
2040 if (NULL != chn->tmit_head)
2042 GNUNET_SCHEDULER_add_now (&schedule_transmit_message, chn);
/* Queue drained on a disconnected channel while the message is incomplete. */
2044 else if (GNUNET_YES == chn->is_disconnected
2045 && tmit_msg->last_ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END)
2047 /* FIXME: handle partial message (when still in_transmit) */
2048 GNUNET_free (tmit_msg);
2049 return GNUNET_SYSERR;
2051 GNUNET_free (tmit_msg);
/* Review summary: master-side wrapper around transmit_notify(). When the
 * shared callback yields GNUNET_YES, the stored transmit handle of the master
 * is cleared.
 * NOTE(review): the return statement is not visible in this view; presumably
 * the result of transmit_notify() is passed through - confirm. */
2057 * Callback for the transmit functions of multicast.
2060 master_transmit_notify (void *cls, size_t *data_size, void *data)
2062 int ret = transmit_notify (cls, data_size, data);
2064 if (GNUNET_YES == ret)
/* cls is used both as struct Channel (inside transmit_notify) and as struct
 * Master here. */
2066 struct Master *mst = cls;
2067 mst->tmit_handle = NULL;
/* Review summary: slave-side wrapper around transmit_notify(). When the
 * shared callback yields GNUNET_YES, the stored transmit handle of the slave
 * is cleared.
 * NOTE(review): the return statement is not visible in this view; presumably
 * the result of transmit_notify() is passed through - confirm. */
2074 * Callback for the transmit functions of multicast.
2077 slave_transmit_notify (void *cls, size_t *data_size, void *data)
2079 int ret = transmit_notify (cls, data_size, data);
2081 if (GNUNET_YES == ret)
/* cls is used both as struct Channel (inside transmit_notify) and as struct
 * Slave here. */
2083 struct Slave *slv = cls;
2084 slv->tmit_handle = NULL;
/* Review summary: starts or resumes transmission of the head of the transmit
 * queue of a master. Without a transmit handle a new origin_to_all
 * transmission is started; otherwise the existing one is resumed. */
2091 * Transmit a message from a channel master to the multicast group.
2094 master_transmit_message (struct Master *mst)
2096 struct Channel *chn = &mst->chn;
2097 struct TransmitMessage *tmit_msg = chn->tmit_head;
2098 if (NULL == tmit_msg)
2100 if (NULL == mst->tmit_handle)
/* Temporary non-NULL marker: master_transmit_notify() sets tmit_handle back
 * to NULL when it completes, so the check after the call below only stores
 * the real handle if the transmission did not already finish inside
 * GNUNET_MULTICAST_origin_to_all(). */
2102 mst->tmit_handle = (void *) &mst->tmit_handle;
2103 struct GNUNET_MULTICAST_OriginTransmitHandle *
2104 tmit_handle = GNUNET_MULTICAST_origin_to_all (mst->origin, tmit_msg->id,
2105 mst->max_group_generation,
2106 master_transmit_notify, mst);
2107 if (NULL != mst->tmit_handle)
2108 mst->tmit_handle = tmit_handle;
2112 GNUNET_MULTICAST_origin_to_all_resume (mst->tmit_handle);
/* Review summary: starts or resumes transmission of the head of the transmit
 * queue of a slave towards the origin. Without a transmit handle a new
 * member_to_origin transmission is started; otherwise the existing one is
 * resumed. */
2118 * Transmit a message from a channel slave to the multicast group.
2121 slave_transmit_message (struct Slave *slv)
2123 if (NULL == slv->chn.tmit_head)
2125 if (NULL == slv->tmit_handle)
/* Temporary non-NULL marker: slave_transmit_notify() sets tmit_handle back to
 * NULL when it completes, so the check after the call below only stores the
 * real handle if the transmission did not already finish inside
 * GNUNET_MULTICAST_member_to_origin(). */
2127 slv->tmit_handle = (void *) &slv->tmit_handle;
2128 struct GNUNET_MULTICAST_MemberTransmitHandle *
2129 tmit_handle = GNUNET_MULTICAST_member_to_origin (slv->member, slv->chn.tmit_head->id,
2130 slave_transmit_notify, slv);
2131 if (NULL != slv->tmit_handle)
2132 slv->tmit_handle = tmit_handle;
2136 GNUNET_MULTICAST_member_to_origin_resume (slv->tmit_handle);
/* Review summary: dispatches to master_transmit_message() or
 * slave_transmit_message(), casting the channel to the matching type.
 * NOTE(review): the selecting condition is on a line not visible in this
 * view; presumably chn->is_master - confirm. */
2142 transmit_message (struct Channel *chn)
2145 ? master_transmit_message ((struct Master *) chn)
2146 : slave_transmit_message ((struct Slave *) chn);
/* Review summary: prepares a queued message of a master. For a message whose
 * first part is a method part, the next message ID is assigned and the
 * state_delta field of the method part is filled in according to the flags of
 * the method part: reset, modify (distance to the previous state-modifying
 * message), or not modified. */
2151 * Queue a message from a channel master for sending to the multicast group.
2154 master_queue_message (struct Master *mst, struct TransmitMessage *tmit_msg)
2156 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "%p master_queue_message()\n", mst);
2158 if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == tmit_msg->first_ptype)
2160 tmit_msg->id = ++mst->max_message_id;
2161 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2162 "%p master_queue_message: message_id=%" PRIu64 "\n",
/* The method part is the payload stored directly behind the queue entry. */
2164 struct GNUNET_PSYC_MessageMethod *pmeth
2165 = (struct GNUNET_PSYC_MessageMethod *) &tmit_msg[1];
/* NOTE(review): flags is tested without a byte-order conversion while
 * state_delta is written in network byte order - confirm the expected byte
 * order of pmeth->flags. */
2167 if (pmeth->flags & GNUNET_PSYC_MASTER_TRANSMIT_STATE_RESET)
2169 pmeth->state_delta = GNUNET_htonll (GNUNET_PSYC_STATE_RESET);
2171 else if (pmeth->flags & GNUNET_PSYC_MASTER_TRANSMIT_STATE_MODIFY)
2173 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2174 "%p master_queue_message: state_delta=%" PRIu64 "\n",
2175 mst, tmit_msg->id - mst->max_state_message_id);
2176 pmeth->state_delta = GNUNET_htonll (tmit_msg->id
2177 - mst->max_state_message_id);
/* This message becomes the latest state-modifying message. */
2178 mst->max_state_message_id = tmit_msg->id;
2182 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2183 "%p master_queue_message: state not modified\n", mst);
2184 pmeth->state_delta = GNUNET_htonll (GNUNET_PSYC_STATE_NOT_MODIFIED);
2187 if (pmeth->flags & GNUNET_PSYC_MASTER_TRANSMIT_STATE_HASH)
2189 /// @todo add state_hash to PSYC header
2196 * Queue a message from a channel slave for sending to the multicast group.
2199 slave_queue_message (struct Slave *slv, struct TransmitMessage *tmit_msg)
2201 if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == tmit_msg->first_ptype)
2203 struct GNUNET_PSYC_MessageMethod *pmeth
2204 = (struct GNUNET_PSYC_MessageMethod *) &tmit_msg[1];
2205 pmeth->state_delta = GNUNET_htonll (GNUNET_PSYC_STATE_NOT_MODIFIED);
2206 tmit_msg->id = ++slv->max_request_id;
/* Review summary: allocates a transmit queue entry with the message parts
 * copied directly behind it, records the client, size and first/last part
 * types, appends the entry to the transmit queue of the channel and lets the
 * master or slave specific function finish it.
 * NOTE(review): the data_size and data parameter lines, the condition of the
 * final dispatch and the return statement are not visible in this view. */
2212 * Queue PSYC message parts for sending to multicast.
2215 * Channel to send to.
2217 * Client the message originates from.
2221 * Concatenated message parts.
2222 * @param first_ptype
2223 * First message part type in @a data.
2225 * Last message part type in @a data.
2227 static struct TransmitMessage *
2228 queue_message (struct Channel *chn,
2229 struct GNUNET_SERVER_Client *client,
2232 uint16_t first_ptype, uint16_t last_ptype)
/* Single allocation: queue entry header plus data_size payload bytes. */
2234 struct TransmitMessage *
2235 tmit_msg = GNUNET_malloc (sizeof (*tmit_msg) + data_size);
2236 GNUNET_memcpy (&tmit_msg[1], data, data_size);
2237 tmit_msg->client = client;
2238 tmit_msg->size = data_size;
2239 tmit_msg->first_ptype = first_ptype;
2240 tmit_msg->last_ptype = last_ptype;
2242 /* FIXME: separate queue per message ID */
2244 GNUNET_CONTAINER_DLL_insert_tail (chn->tmit_head, chn->tmit_tail, tmit_msg);
2247 ? master_queue_message ((struct Master *) chn, tmit_msg)
2248 : slave_queue_message ((struct Slave *) chn, tmit_msg);
2254 * Cancel transmission of current message.
2256 * @param chn Channel to send to.
2257 * @param client Client the message originates from.
2260 transmit_cancel (struct Channel *chn, struct GNUNET_SERVER_Client *client)
2262 uint16_t type = GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL;
2264 struct GNUNET_MessageHeader msg;
2265 msg.size = htons (sizeof (msg));
2266 msg.type = htons (type);
2268 queue_message (chn, client, sizeof (msg), &msg, type, type);
2269 transmit_message (chn);
2271 /* FIXME: cleanup */
2276 * Incoming message from a master or slave client.
2279 client_recv_psyc_message (void *cls, struct GNUNET_SERVER_Client *client,
2280 const struct GNUNET_MessageHeader *msg)
2283 chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
2284 GNUNET_assert (NULL != chn);
2286 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2287 "%p Received message from client.\n", chn);
2288 GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG, msg);
2290 if (GNUNET_YES != chn->is_ready)
2292 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2293 "%p Channel is not ready yet, disconnecting client.\n", chn);
2295 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
2299 uint16_t size = ntohs (msg->size);
2300 if (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < size - sizeof (*msg))
2302 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2303 "%p Message payload too large: %u < %u.\n",
2305 (unsigned int) GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD,
2306 (unsigned int) (size - sizeof (*msg)));
2308 transmit_cancel (chn, client);
2309 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
2313 uint16_t first_ptype = 0, last_ptype = 0;
2315 == GNUNET_PSYC_receive_check_parts (size - sizeof (*msg),
2316 (const char *) &msg[1],
2317 &first_ptype, &last_ptype))
2319 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2320 "%p Received invalid message part from client.\n", chn);
2322 transmit_cancel (chn, client);
2323 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
2326 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2327 "%p Received message with first part type %u and last part type %u.\n",
2328 chn, first_ptype, last_ptype);
2330 queue_message (chn, client, size - sizeof (*msg), &msg[1],
2331 first_ptype, last_ptype);
2332 transmit_message (chn);
2333 /* FIXME: send a few ACKs even before transmit_notify is called */
2335 GNUNET_SERVER_receive_done (client, GNUNET_OK);
/* Review summary: result callback for a membership store request; cls is the
 * struct Operation created for it. The result is logged and, if the
 * requesting client is still known, reported back to it.
 * NOTE(review): the result parameter line, the log arguments and whatever
 * follows the client notification (e.g. releasing op) are on lines not
 * visible in this view. */
2340 * Received result of GNUNET_PSYCSTORE_membership_store()
2343 store_recv_membership_store_result (void *cls,
2345 const char *err_msg,
2346 uint16_t err_msg_size)
2348 struct Operation *op = cls;
2349 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2350 "%p GNUNET_PSYCSTORE_membership_store() returned %" PRId64 " (%.*s)\n",
/* op->client is NULL when the requesting client is no longer available. */
2356 if (NULL != op->client)
2357 client_send_result (op->client, op->op_id, result, err_msg, err_msg_size);
2363 * Client requests to add/remove a slave in the membership database.
2366 client_recv_membership_store (void *cls, struct GNUNET_SERVER_Client *client,
2367 const struct GNUNET_MessageHeader *msg)
2370 chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
2371 GNUNET_assert (NULL != chn);
2373 const struct ChannelMembershipStoreRequest *
2374 req = (const struct ChannelMembershipStoreRequest *) msg;
2376 struct Operation *op = op_add (chn, client, req->op_id, 0);
2378 uint64_t announced_at = GNUNET_ntohll (req->announced_at);
2379 uint64_t effective_since = GNUNET_ntohll (req->effective_since);
2380 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2381 "%p Received membership store request from client.\n", chn);
2382 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2383 "%p did_join: %u, announced_at: %" PRIu64 ", effective_since: %" PRIu64 "\n",
2384 chn, req->did_join, announced_at, effective_since);
2386 GNUNET_PSYCSTORE_membership_store (store, &chn->pub_key, &req->slave_pub_key,
2387 req->did_join, announced_at, effective_since,
2388 0, /* FIXME: group_generation */
2389 &store_recv_membership_store_result, op);
2390 GNUNET_SERVER_receive_done (client, GNUNET_OK);
/* Review summary: called for each fragment found for a history request; cls
 * is the struct Operation of that request. The fragment is wrapped into a
 * HISTORY_RESULT message (operation result header followed by a PSYC message
 * header built from the multicast fragment) and sent to the clients of the
 * channel.
 * NOTE(review): the return statements and the release of res are on lines not
 * visible in this view. */
2395 * Received a fragment for GNUNET_PSYCSTORE_fragment_get(),
2396 * in response to a history request from a client.
2399 store_recv_fragment_history (void *cls,
2400 struct GNUNET_MULTICAST_MessageHeader *mmsg,
2401 enum GNUNET_PSYCSTORE_MessageFlags flags)
2403 struct Operation *op = cls;
2404 if (NULL == op->client)
2405 { /* Requesting client already disconnected. */
2408 struct Channel *chn = op->chn;
/* psize is the PSYC header size plus the fragment payload (fragment size
 * minus the multicast header). NOTE(review): computed in uint16_t; assumes
 * msize is at least sizeof (*mmsg) and the sum fits - confirm. */
2410 struct GNUNET_PSYC_MessageHeader *pmsg;
2411 uint16_t msize = ntohs (mmsg->header.size);
2412 uint16_t psize = sizeof (*pmsg) + msize - sizeof (*mmsg);
2414 struct GNUNET_OperationResultMessage *
2415 res = GNUNET_malloc (sizeof (*res) + psize);
2416 res->header.size = htons (sizeof (*res) + psize);
2417 res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_HISTORY_RESULT);
2418 res->op_id = op->op_id;
2419 res->result_code = GNUNET_htonll (GNUNET_OK);
/* pmsg aliases the payload area of res, so the header is built in place. */
2421 pmsg = (struct GNUNET_PSYC_MessageHeader *) &res[1];
2422 GNUNET_PSYC_message_header_init (pmsg, mmsg, flags | GNUNET_PSYC_MESSAGE_HISTORIC);
/* NOTE(review): source and destination of this copy are the same address
 * (pmsg was set to &res[1] above), so the copy is redundant and overlaps
 * exactly - consider removing it. */
2423 GNUNET_memcpy (&res[1], pmsg, psize);
2425 /** @todo FIXME: send only to requesting client */
2426 client_send_msg (chn, &res->header);
/* Review summary: final result callback of a history request; cls is the
 * struct Operation of that request. The result is logged and reported to the
 * requesting client.
 * NOTE(review): the body of the disconnected-client branch and whatever
 * follows client_send_result() are not visible in this view - confirm that
 * op is released on both paths. */
2434 * Received the result of GNUNET_PSYCSTORE_fragment_get(),
2435 * in response to a history request from a client.
2438 store_recv_fragment_history_result (void *cls, int64_t result,
2439 const char *err_msg, uint16_t err_msg_size)
2441 struct Operation *op = cls;
2442 if (NULL == op->client)
2443 { /* Requesting client already disconnected. */
2447 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2448 "%p History replay #%" PRIu64 ": "
2449 "PSYCSTORE returned %" PRId64 " (%.*s)\n",
2450 op->chn, GNUNET_ntohll (op->op_id), result, err_msg_size, err_msg);
/* Remote replay is requested through the operation flags but not
 * implemented yet. */
2452 if (op->flags & GNUNET_PSYC_HISTORY_REPLAY_REMOTE)
2454 /** @todo Multicast replay request for messages not found locally. */
2457 client_send_result (op->client, op->op_id, result, err_msg, err_msg_size);
/* Review summary: handles a history request of a client. The request carries
 * a method prefix string behind the fixed struct; it must be present and
 * end in a 0 byte. With message_limit equal to 0 the range from
 * start_message_id to end_message_id is requested from PSYCstore, otherwise
 * the latest message_limit messages. Fragments arrive in
 * store_recv_fragment_history(), the final result in
 * store_recv_fragment_history_result().
 * NOTE(review): the assertion below aborts the service when a client sends
 * this message before being bound to a channel - consider failing the request
 * instead. Some argument lines of the calls are not visible in this view. */
2463 * Client requests channel history.
2466 client_recv_history_replay (void *cls, struct GNUNET_SERVER_Client *client,
2467 const struct GNUNET_MessageHeader *msg)
2470 chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
2471 GNUNET_assert (NULL != chn);
2473 const struct GNUNET_PSYC_HistoryRequestMessage *
2474 req = (const struct GNUNET_PSYC_HistoryRequestMessage *) msg;
2475 uint16_t size = ntohs (msg->size);
2476 const char *method_prefix = (const char *) &req[1];
/* At least one byte of prefix is required, and its last byte must be 0. */
2478 if (size < sizeof (*req) + 1
2479 || '\0' != method_prefix[size - sizeof (*req) - 1])
2481 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2482 "%p History replay #%" PRIu64 ": "
2483 "invalid method prefix. size: %u < %u?\n",
2485 GNUNET_ntohll (req->op_id),
2487 (unsigned int) sizeof (*req) + 1);
2489 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
2493 struct Operation *op = op_add (chn, client, req->op_id, ntohl (req->flags));
/* Comparing against 0 needs no byte-order conversion. */
2495 if (0 == req->message_limit)
2496 GNUNET_PSYCSTORE_message_get (store, &chn->pub_key, NULL,
2497 GNUNET_ntohll (req->start_message_id),
2498 GNUNET_ntohll (req->end_message_id),
2500 &store_recv_fragment_history,
2501 &store_recv_fragment_history_result, op);
2503 GNUNET_PSYCSTORE_message_get_latest (store, &chn->pub_key, NULL,
2504 GNUNET_ntohll (req->message_limit),
2506 &store_recv_fragment_history,
2507 &store_recv_fragment_history_result,
2510 GNUNET_SERVER_receive_done (client, GNUNET_OK);
/* Review summary: called by PSYCstore for each piece of a state variable; cls
 * is the struct Operation of the request. A non-NULL name starts a variable
 * and yields a STATE_RESULT carrying a modifier part (assign operator, name,
 * value); a NULL name yields a STATE_RESULT carrying a modifier continuation
 * part with only the value. The result is sent to the requesting client.
 * NOTE(review): the final argument of the unicast call, the release of res
 * and the return statement are on lines not visible in this view. */
2515 * Received state var from PSYCstore, send it to client.
2518 store_recv_state_var (void *cls, const char *name,
2519 const void *value, uint32_t value_size)
2521 struct Operation *op = cls;
2522 struct GNUNET_OperationResultMessage *res;
/* NOTE(review): name is formatted with a string conversion here although it
 * can be NULL (see the check below) - confirm the log macro tolerates that. */
2524 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2525 "%p state_get #%" PRIu64 " - received var from PSYCstore: %s\n",
2526 op->chn, GNUNET_ntohll (op->op_id), name);
2528 if (NULL != name) /* First part */
/* name_size includes the terminating 0 byte. NOTE(review): the sizes written
 * with htons below are sums that include the 32-bit value_size; presumably
 * PSYCstore limits value_size so that they fit into 16 bits - confirm. */
2530 uint16_t name_size = strnlen (name, GNUNET_PSYC_MODIFIER_MAX_PAYLOAD) + 1;
2531 struct GNUNET_PSYC_MessageModifier *mod;
2532 res = GNUNET_malloc (sizeof (*res) + sizeof (*mod) + name_size + value_size);
2533 res->header.size = htons (sizeof (*res) + sizeof (*mod) + name_size + value_size);
2534 res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_STATE_RESULT);
2535 res->op_id = op->op_id;
2537 mod = (struct GNUNET_PSYC_MessageModifier *) &res[1];
2538 mod->header.size = htons (sizeof (*mod) + name_size + value_size);
2539 mod->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER);
2540 mod->name_size = htons (name_size);
2541 mod->value_size = htonl (value_size);
2542 mod->oper = htons (GNUNET_PSYC_OP_ASSIGN);
/* Layout behind the modifier struct: name, then value. */
2543 GNUNET_memcpy (&mod[1], name, name_size);
2544 GNUNET_memcpy (((char *) &mod[1]) + name_size, value, value_size);
2546 else /* Continuation */
2548 struct GNUNET_MessageHeader *mod;
2549 res = GNUNET_malloc (sizeof (*res) + sizeof (*mod) + value_size);
2550 res->header.size = htons (sizeof (*res) + sizeof (*mod) + value_size);
2551 res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_STATE_RESULT);
2552 res->op_id = op->op_id;
2554 mod = (struct GNUNET_MessageHeader *) &res[1];
2555 mod->size = htons (sizeof (*mod) + value_size);
2556 mod->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT);
2557 GNUNET_memcpy (&mod[1], value, value_size);
/* Unlike the history callbacks, op->client is not checked for NULL here. */
2560 // FIXME: client might have been disconnected
2561 GNUNET_SERVER_notification_context_add (nc, op->client);
2562 GNUNET_SERVER_notification_context_unicast (nc, op->client, &res->header,
/* Review summary: final result callback of a state request; cls is the struct
 * Operation of that request. The result is logged and reported to the
 * requesting client.
 * NOTE(review): whatever follows client_send_result() (e.g. releasing op) is
 * on lines not visible in this view. */
2570 * Received result of GNUNET_PSYCSTORE_state_get()
2571 * or GNUNET_PSYCSTORE_state_get_prefix()
2574 store_recv_state_result (void *cls, int64_t result,
2575 const char *err_msg, uint16_t err_msg_size)
2577 struct Operation *op = cls;
2578 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2579 "%p state_get #%" PRIu64 ": "
2580 "PSYCSTORE returned %" PRId64 " (%.*s)\n",
2581 op->chn, GNUNET_ntohll (op->op_id), result, err_msg_size, err_msg);
/* Unlike the history callbacks, op->client is not checked for NULL here. */
2583 // FIXME: client might have been disconnected
2584 client_send_result (op->client, op->op_id, result, err_msg, err_msg_size);
2590 * Client requests best matching state variable from PSYCstore.
/*
 * client_recv_state_get: server message handler for a client's request to
 * look up a state variable by name in PSYCstore.
 *
 * cls    - handler closure; not referenced in the visible lines.
 * client - the requesting client; its user context supplies the Channel.
 * msg    - the received message, interpreted as a struct StateRequest
 *          followed by the variable name.
 *
 * Flow visible here: fetch the channel from the client's user context,
 * validate the trailing name, register an operation with op_add(), start
 * GNUNET_PSYCSTORE_state_get() with store_recv_state_var /
 * store_recv_state_result as callbacks and the operation as their closure,
 * then acknowledge the message with GNUNET_SERVER_receive_done().
 *
 * NOTE(review): the embedded line numbering skips values inside this
 * function (e.g. where `chn` would be declared and around the error
 * branch), so the declaration, braces and any early `return` are not shown
 * in this listing - confirm against the full file.
 */
2593 client_recv_state_get (void *cls, struct GNUNET_SERVER_Client *client,
2594 const struct GNUNET_MessageHeader *msg)
2597 chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
/* A client without an attached channel context aborts via the assertion. */
2598 GNUNET_assert (NULL != chn);
2600 const struct StateRequest *
2601 req = (const struct StateRequest *) msg;
/* The name occupies the bytes after the fixed StateRequest part.
 * NOTE(review): no check that header.size >= sizeof (*req) is visible
 * before this subtraction; if the size were smaller, the unsigned result
 * would wrap and name[name_size - 1] would index far past the message.
 * Confirm whether the size is validated elsewhere. */
2603 uint16_t name_size = ntohs (req->header.size) - sizeof (*req);
2604 const char *name = (const char *) &req[1];
/* Reject an empty name or one that is not NUL-terminated. */
2605 if (0 == name_size || '\0' != name[name_size - 1])
2608 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
/* Track the request so the PSYCstore callbacks can reach the client and
 * op_id again; the operation is passed as their closure. */
2612 struct Operation *op = op_add (chn, client, req->op_id, 0);
2613 GNUNET_PSYCSTORE_state_get (store, &chn->pub_key, name,
2614 &store_recv_state_var,
2615 &store_recv_state_result, op);
2616 GNUNET_SERVER_receive_done (client, GNUNET_OK);
2621 * Client requests state variables with a given prefix from PSYCstore.
/*
 * client_recv_state_get_prefix: server message handler for a client's
 * request to fetch the state variables matching a name prefix from
 * PSYCstore.
 *
 * cls    - handler closure; not referenced in the visible lines.
 * client - the requesting client; its user context supplies the Channel.
 * msg    - the received message, interpreted as a struct StateRequest
 *          followed by the prefix string.
 *
 * The visible steps mirror client_recv_state_get; the only difference is
 * that the query is started with GNUNET_PSYCSTORE_state_get_prefix().
 *
 * NOTE(review): the embedded line numbering skips values inside this
 * function (e.g. where `chn` would be declared and around the error
 * branch), so the declaration, braces and any early `return` are not shown
 * in this listing - confirm against the full file.
 */
2624 client_recv_state_get_prefix (void *cls, struct GNUNET_SERVER_Client *client,
2625 const struct GNUNET_MessageHeader *msg)
2628 chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
/* A client without an attached channel context aborts via the assertion. */
2629 GNUNET_assert (NULL != chn);
2631 const struct StateRequest *
2632 req = (const struct StateRequest *) msg;
/* The prefix occupies the bytes after the fixed StateRequest part.
 * NOTE(review): no check that header.size >= sizeof (*req) is visible
 * before this subtraction; if the size were smaller, the unsigned result
 * would wrap and name[name_size - 1] would index far past the message.
 * Confirm whether the size is validated elsewhere. */
2634 uint16_t name_size = ntohs (req->header.size) - sizeof (*req);
2635 const char *name = (const char *) &req[1];
/* Reject an empty prefix or one that is not NUL-terminated. */
2636 if (0 == name_size || '\0' != name[name_size - 1])
2639 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
/* Track the request so the PSYCstore callbacks can reach the client and
 * op_id again; the operation is passed as their closure. */
2643 struct Operation *op = op_add (chn, client, req->op_id, 0);
2644 GNUNET_PSYCSTORE_state_get_prefix (store, &chn->pub_key, name,
2645 &store_recv_state_var,
2646 &store_recv_state_result, op);
2647 GNUNET_SERVER_receive_done (client, GNUNET_OK);
/*
 * server_handlers: dispatch table registered in run() through
 * GNUNET_SERVER_add_handlers(). Each entry pairs one PSYC client message
 * type with the function in this file that handles it; the list ends with
 * an all-NULL/zero sentinel entry.
 *
 * NOTE(review): the four fields are presumably {callback, callback closure,
 * message type, expected size}, with 0 in the last field meaning the size
 * is not fixed - confirm against struct GNUNET_SERVER_MessageHandler.
 * NOTE(review): the closing of the initializer is not visible in this
 * listing (the embedded line numbering skips it).
 */
2651 static const struct GNUNET_SERVER_MessageHandler server_handlers[] = {
2652 { &client_recv_master_start, NULL,
2653 GNUNET_MESSAGE_TYPE_PSYC_MASTER_START, 0 },
2655 { &client_recv_slave_join, NULL,
2656 GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN, 0 },
2658 { &client_recv_join_decision, NULL,
2659 GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION, 0 },
2661 { &client_recv_psyc_message, NULL,
2662 GNUNET_MESSAGE_TYPE_PSYC_MESSAGE, 0 },
2664 { &client_recv_membership_store, NULL,
2665 GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_MEMBERSHIP_STORE, 0 },
2667 { &client_recv_history_replay, NULL,
2668 GNUNET_MESSAGE_TYPE_PSYC_HISTORY_REPLAY, 0 },
2670 { &client_recv_state_get, NULL,
2671 GNUNET_MESSAGE_TYPE_PSYC_STATE_GET, 0 },
2673 { &client_recv_state_get_prefix, NULL,
2674 GNUNET_MESSAGE_TYPE_PSYC_STATE_GET_PREFIX, 0 },
/* Sentinel entry terminating the table. */
2676 { NULL, NULL, 0, 0 }
2681 * Initialize the PSYC service.
2683 * @param cls Closure.
2684 * @param server The initialized server.
2685 * @param c Configuration to use.
/*
 * run: service initialization callback (passed to GNUNET_SERVICE_run() by
 * main()). Sets up the file-level state: the PSYCstore and statistics
 * handles, the channel hash maps, the notification context, the client
 * message handlers, the disconnect callback and the shutdown task.
 *
 * cls    - closure; not referenced in the visible lines.
 * server - the initialized server the handlers are attached to.
 * c      - configuration to use.
 *
 * NOTE(review): the calls below read the file-level `cfg`, but the
 * assignment from `c` is not visible in this listing (the embedded line
 * numbering skips the lines right after the signature) - confirm that
 * `cfg` is set before GNUNET_PSYCSTORE_connect() is called.
 */
2688 run (void *cls, struct GNUNET_SERVER_Handle *server,
2689 const struct GNUNET_CONFIGURATION_Handle *c)
2692 store = GNUNET_PSYCSTORE_connect (cfg);
2693 stats = GNUNET_STATISTICS_create ("psyc", cfg);
/* All maps start with an initial size of 1. masters/slaves are keyed by the
 * channel's pub_key_hash; channel_slaves maps pub_key_hash to a per-channel
 * map of slaves. NOTE(review): the second argument differs between the
 * maps (GNUNET_YES vs GNUNET_NO) - presumably the key-copy policy of the
 * multihashmap; confirm against the container API. */
2694 masters = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
2695 slaves = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
2696 channel_slaves = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
2697 recv_cache = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
/* Notification context used for sending messages to clients. */
2698 nc = GNUNET_SERVER_notification_context_create (server, 1);
2699 GNUNET_SERVER_add_handlers (server, server_handlers);
/* client_disconnect is invoked when a client goes away; shutdown_task is
 * registered to run at scheduler shutdown. */
2700 GNUNET_SERVER_disconnect_notify (server, &client_disconnect, NULL);
2701 GNUNET_SCHEDULER_add_shutdown (&shutdown_task, NULL);
2706 * The main function for the service.
2708 * @param argc number of arguments from the command line
2709 * @param argv command line arguments
2710 * @return 0 ok, 1 on error
/*
 * main: program entry point. Hands control to GNUNET_SERVICE_run() for the
 * service named "psyc" with no special service options and run() as the
 * initialization callback (NULL closure).
 *
 * Returns 0 when GNUNET_SERVICE_run() reports GNUNET_OK, 1 otherwise.
 *
 * NOTE(review): the return type and braces are not visible in this listing
 * (the embedded line numbering skips those lines).
 */
2713 main (int argc, char *const *argv)
2715 return (GNUNET_OK ==
2716 GNUNET_SERVICE_run (argc, argv, "psyc",
2717 GNUNET_SERVICE_OPTION_NONE,
2718 &run, NULL)) ? 0 : 1;
2721 /* end of gnunet-service-psyc.c */