2 * This file is part of GNUnet
3 * Copyright (C) 2013 GNUnet e.V.
5 * GNUnet is free software: you can redistribute it and/or modify it
6 * under the terms of the GNU Affero General Public License as published
7 * by the Free Software Foundation, either version 3 of the License,
8 * or (at your option) any later version.
10 * GNUnet is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * Affero General Public License for more details.
17 * @file psyc/gnunet-service-psyc.c
19 * @author Gabor X Toth
25 #include "gnunet_util_lib.h"
26 #include "gnunet_constants.h"
27 #include "gnunet_protocols.h"
28 #include "gnunet_statistics_service.h"
29 #include "gnunet_multicast_service.h"
30 #include "gnunet_psycstore_service.h"
31 #include "gnunet_psyc_service.h"
32 #include "gnunet_psyc_util_lib.h"
37 * Handle to our current configuration.
39 static const struct GNUNET_CONFIGURATION_Handle *cfg;
44 static struct GNUNET_SERVICE_Handle *service;
47 * Handle to the statistics service.
49 static struct GNUNET_STATISTICS_Handle *stats;
52 * Handle to the PSYCstore.
54 static struct GNUNET_PSYCSTORE_Handle *store;
57 * All connected masters.
58 * Channel's pub_key_hash -> struct Master
60 static struct GNUNET_CONTAINER_MultiHashMap *masters;
63 * All connected slaves.
64 * Channel's pub_key_hash -> struct Slave
66 static struct GNUNET_CONTAINER_MultiHashMap *slaves;
69 * Connected slaves per channel.
70 * Channel's pub_key_hash -> Slave's pub_key -> struct Slave
72 static struct GNUNET_CONTAINER_MultiHashMap *channel_slaves;
76 * Message in the transmission queue.
78 struct TransmitMessage
80 struct TransmitMessage *prev;
81 struct TransmitMessage *next;
83 struct GNUNET_SERVICE_Client *client;
86 * ID assigned to the message.
96 * Type of first message part.
101 * Type of last message part.
105 /* Followed by message */
110 * Cache for received message fragments.
111 * Message fragments are only sent to clients after all modifiers arrived.
113 * chan_key -> MultiHashMap chan_msgs
115 static struct GNUNET_CONTAINER_MultiHashMap *recv_cache;
119 * Entry in the chan_msgs hashmap of @a recv_cache:
120 * fragment_id -> RecvCacheEntry
122 struct RecvCacheEntry
124 struct GNUNET_MULTICAST_MessageHeader *mmsg;
130 * Entry in the @a recv_frags hash map of a @a Channel.
131 * message_id -> FragmentQueue
136 * Fragment IDs stored in @a recv_cache.
138 struct GNUNET_CONTAINER_Heap *fragments;
141 * Total size of received fragments.
146 * Total size of received header fragments (METHOD & MODIFIERs)
148 uint64_t header_size;
151 * The @a state_delta field from struct GNUNET_PSYC_MessageMethod.
153 uint64_t state_delta;
156 * The @a flags field from struct GNUNET_PSYC_MessageMethod.
161 * Receive state of message.
163 * @see MessageFragmentState
168 * Whether the state is already modified in PSYCstore.
170 uint8_t state_is_modified;
173 * Is the message queued for delivery to the client?
174 * i.e. added to the recv_msgs queue
181 * List of connected clients.
185 struct ClientList *prev;
186 struct ClientList *next;
188 struct GNUNET_SERVICE_Client *client;
194 struct Operation *prev;
195 struct Operation *next;
197 struct GNUNET_SERVICE_Client *client;
198 struct Channel *channel;
205 * Common part of the client context for both a channel master and slave.
209 struct ClientList *clients_head;
210 struct ClientList *clients_tail;
212 struct Operation *op_head;
213 struct Operation *op_tail;
215 struct TransmitMessage *tmit_head;
216 struct TransmitMessage *tmit_tail;
219 * Current PSYCstore operation.
221 struct GNUNET_PSYCSTORE_OperationHandle *store_op;
224 * Received fragments not yet sent to the client.
225 * message_id -> FragmentQueue
227 struct GNUNET_CONTAINER_MultiHashMap *recv_frags;
230 * Received message IDs not yet sent to the client.
232 struct GNUNET_CONTAINER_Heap *recv_msgs;
235 * Public key of the channel.
237 struct GNUNET_CRYPTO_EddsaPublicKey pub_key;
240 * Hash of @a pub_key.
242 struct GNUNET_HashCode pub_key_hash;
245 * Last message ID sent to the client.
246 * 0 if there is no such message.
248 uint64_t max_message_id;
251 * ID of the last stateful message, where the state operations have been
252 * processed and saved to PSYCstore and which has been sent to the client.
253 * 0 if there is no such message.
255 uint64_t max_state_message_id;
258 * Expected value size for the modifier being received from the PSYC service.
260 uint32_t tmit_mod_value_size_expected;
263 * Actual value size for the modifier being received from the PSYC service.
265 uint32_t tmit_mod_value_size;
268 * Is this channel ready to receive messages from client?
269 * #GNUNET_YES or #GNUNET_NO
274 * Is the client disconnected?
275 * #GNUNET_YES or #GNUNET_NO
277 uint8_t is_disconnecting;
280 * Is this a channel master (#GNUNET_YES), or slave (#GNUNET_NO)?
285 struct Master *master;
292 * Client context for a channel master.
297 * Channel struct common for Master and Slave
299 struct Channel channel;
302 * Private key of the channel.
304 struct GNUNET_CRYPTO_EddsaPrivateKey priv_key;
307 * Handle for the multicast origin.
309 struct GNUNET_MULTICAST_Origin *origin;
312 * Transmit handle for multicast.
314 struct GNUNET_MULTICAST_OriginTransmitHandle *tmit_handle;
317 * Incoming join requests from multicast.
318 * member_pub_key -> struct GNUNET_MULTICAST_JoinHandle *
320 struct GNUNET_CONTAINER_MultiHashMap *join_reqs;
323 * Last message ID transmitted to this channel.
325 * Incremented before sending a message, thus the message_id in messages sent
328 uint64_t max_message_id;
331 * ID of the last message with state operations transmitted to the channel.
332 * 0 if there is no such message.
334 uint64_t max_state_message_id;
337 * Maximum group generation transmitted to the channel.
339 uint64_t max_group_generation;
342 * @see enum GNUNET_PSYC_Policy
344 enum GNUNET_PSYC_Policy policy;
349 * Client context for a channel slave.
354 * Channel struct common for Master and Slave
356 struct Channel channel;
359 * Private key of the slave.
361 struct GNUNET_CRYPTO_EcdsaPrivateKey priv_key;
364 * Public key of the slave.
366 struct GNUNET_CRYPTO_EcdsaPublicKey pub_key;
369 * Hash of @a pub_key.
371 struct GNUNET_HashCode pub_key_hash;
374 * Handle for the multicast member.
376 struct GNUNET_MULTICAST_Member *member;
379 * Transmit handle for multicast.
381 struct GNUNET_MULTICAST_MemberTransmitHandle *tmit_handle;
384 * Peer identity of the origin.
386 struct GNUNET_PeerIdentity origin;
389 * Number of items in @a relays.
391 uint32_t relay_count;
394 * Relays that multicast can use to connect.
396 struct GNUNET_PeerIdentity *relays;
399 * Join request to be transmitted to the master on join.
401 struct GNUNET_PSYC_Message *join_msg;
404 * Join decision received from multicast.
406 struct GNUNET_PSYC_JoinDecisionMessage *join_dcsn;
409 * Maximum request ID for this channel.
411 uint64_t max_request_id;
416 enum GNUNET_PSYC_SlaveJoinFlags join_flags;
424 struct GNUNET_SERVICE_Client *client;
425 struct Channel *channel;
429 struct ReplayRequestKey
431 uint64_t fragment_id;
433 uint64_t fragment_offset;
439 transmit_message (struct Channel *chn);
442 message_queue_run (struct Channel *chn);
445 message_queue_drop (struct Channel *chn);
/* Callback wrapper: treats the closure @a cls as the channel and forwards
 * it to transmit_message(). */
449 schedule_transmit_message (void *cls)
451 struct Channel *chn = cls;
453 transmit_message (chn);
/* Shutdown handler: logs the event, then releases the file-scope PSYCstore
 * handle (`store`) and statistics handle (`stats`). */
458 * Task run during shutdown.
463 shutdown_task (void *cls)
465 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
466 "shutting down...\n");
/* Disconnect from the PSYCstore service. */
467 GNUNET_PSYCSTORE_disconnect (store);
/* Tear down the statistics handle; GNUNET_YES is passed as the second
 * argument (presumably "sync first" -- confirm against the statistics API). */
470 GNUNET_STATISTICS_destroy (stats, GNUNET_YES);
/* Allocate an Operation and insert it into the operation list
 * (op_head/op_tail) of channel @a chn. */
476 static struct Operation *
477 op_add (struct Channel *chn, struct GNUNET_SERVICE_Client *client,
478 uint64_t op_id, uint32_t flags)
480 struct Operation *op = GNUNET_malloc (sizeof (*op));
/* Link the new operation into the channel's doubly-linked operation list. */
485 GNUNET_CONTAINER_DLL_insert (chn->op_head, chn->op_tail, op);
/* Unlink @a op from the operation list of the channel it refers to
 * (op->channel). */
491 op_remove (struct Operation *op)
493 GNUNET_CONTAINER_DLL_remove (op->channel->op_head, op->channel->op_tail, op);
/* Master-specific teardown: destroys the pending join request map and
 * removes the master from the global `masters` map. */
499 * Clean up master data structures after a client disconnected.
502 cleanup_master (struct Master *mst)
504 struct Channel *chn = &mst->channel;
/* Only the map itself is destroyed here.
 * NOTE(review): confirm the stored join handles need no explicit release. */
506 GNUNET_CONTAINER_multihashmap_destroy (mst->join_reqs);
/* The master is registered under the hash of its channel's public key. */
507 GNUNET_CONTAINER_multihashmap_remove (masters, &chn->pub_key_hash, mst);
/* Slave-specific teardown: unregisters the slave from the per-channel slave
 * map and from the global `slaves` map, and frees its join message and
 * relay list. */
512 * Clean up slave data structures after a client disconnected.
515 cleanup_slave (struct Slave *slv)
517 struct Channel *chn = &slv->channel;
/* Look up the per-channel map of slaves in `channel_slaves`; it is
 * asserted to exist. */
518 struct GNUNET_CONTAINER_MultiHashMap *
519 chn_slv = GNUNET_CONTAINER_multihashmap_get (channel_slaves,
521 GNUNET_assert (NULL != chn_slv);
/* Within that map the slave is keyed by the hash of its own public key. */
522 GNUNET_CONTAINER_multihashmap_remove (chn_slv, &slv->pub_key_hash, slv);
/* When no slave is left for this channel, drop the per-channel map too. */
524 if (0 == GNUNET_CONTAINER_multihashmap_size (chn_slv))
526 GNUNET_CONTAINER_multihashmap_remove (channel_slaves, &chn->pub_key_hash,
528 GNUNET_CONTAINER_multihashmap_destroy (chn_slv);
/* Remove the slave from the global `slaves` map (keyed by channel hash). */
530 GNUNET_CONTAINER_multihashmap_remove (slaves, &chn->pub_key_hash, slv);
/* Release the stored join request, if any. */
532 if (NULL != slv->join_msg)
534 GNUNET_free (slv->join_msg);
535 slv->join_msg = NULL;
/* Release the relay list, if any. */
537 if (NULL != slv->relays)
539 GNUNET_free (slv->relays);
/* NOTE(review): this repeats the removal from `slaves` already performed
 * above with the same key and value; it looks redundant -- confirm. */
542 GNUNET_CONTAINER_multihashmap_remove (slaves, &chn->pub_key_hash, slv);
/* Common channel teardown: drops queued incoming messages, destroys the
 * fragment queue map, cancels a pending PSYCstore operation and then runs
 * the master- or slave-specific cleanup. */
547 * Clean up channel data structures after a client disconnected.
550 cleanup_channel (struct Channel *chn)
552 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
553 "%p Cleaning up channel %s. master? %u\n",
555 GNUNET_h2s (&chn->pub_key_hash),
/* Discard everything still waiting in the receive queue without
 * delivering it to clients. */
557 message_queue_drop (chn);
558 GNUNET_CONTAINER_multihashmap_destroy (chn->recv_frags);
559 chn->recv_frags = NULL;
/* Cancel the PSYCstore operation that is still in flight, if any. */
561 if (NULL != chn->store_op)
563 GNUNET_PSYCSTORE_operation_cancel (chn->store_op);
564 chn->store_op = NULL;
/* Dispatch to the role-specific cleanup depending on chn->is_master. */
567 (GNUNET_YES == chn->is_master)
568 ? cleanup_master (chn->master)
569 : cleanup_slave (chn->slave);
/* Disconnect handler: removes @a client from the client list of its
 * channel, inspects the channel's pending operations for entries owned by
 * that client, and cleans up the whole channel once the last client is
 * gone. */
575 * Called whenever a client is disconnected.
576 * Frees our resources associated with that client.
579 * @param client identification of the client
580 * @param app_ctx must match @a client
583 client_notify_disconnect (void *cls,
584 struct GNUNET_SERVICE_Client *client,
/* The application context is the per-client struct Client.
 * NOTE(review): c->channel is read here before any NULL check on the
 * context is visible -- confirm app_ctx can never be NULL. */
587 struct Client *c = app_ctx;
588 struct Channel *chn = c->channel;
593 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
594 "%p User context is NULL in client_notify_disconnect ()\n",
600 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
601 "%p Client %p (%s) disconnected from channel %s\n",
604 (GNUNET_YES == chn->is_master) ? "master" : "slave",
605 GNUNET_h2s (&chn->pub_key_hash));
/* Find the list entry for this client and unlink it from the channel. */
607 struct ClientList *cli = chn->clients_head;
610 if (cli->client == client)
612 GNUNET_CONTAINER_DLL_remove (chn->clients_head, chn->clients_tail, cli);
/* Walk the channel's operation list looking for operations that belong to
 * the disconnected client. */
619 struct Operation *op = chn->op_head;
622 if (op->client == client)
/* An empty client list means nobody is attached to the channel any more. */
630 if (NULL == chn->clients_head)
631 { /* Last client disconnected. */
632 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
633 "%p Last client (%s) disconnected from channel %s\n",
635 (GNUNET_YES == chn->is_master) ? "master" : "slave",
636 GNUNET_h2s (&chn->pub_key_hash));
/* Flag the channel as disconnecting before tearing it down. */
637 chn->is_disconnecting = GNUNET_YES;
638 cleanup_channel (chn);
/* Connect handler: logs the new client and allocates the per-client
 * struct Client context for it. */
644 * A new client connected.
647 * @param client client to add
648 * @param mq message queue for @a client
652 client_notify_connect (void *cls,
653 struct GNUNET_SERVICE_Client *client,
654 struct GNUNET_MQ_Handle *mq)
656 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Client connected: %p\n", client);
/* Per-client context; client_notify_disconnect() reads its `channel`
 * member. */
658 struct Client *c = GNUNET_malloc (sizeof (*c));
/* Broadcast helper: starting from chn->clients_head, a copy of @a msg is
 * wrapped in an envelope and queued on the client's message queue. */
666 * Send message to all clients connected to the channel.
669 client_send_msg (const struct Channel *chn,
670 const struct GNUNET_MessageHeader *msg)
672 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
673 "Sending message to clients of channel %p.\n",
676 struct ClientList *cli = chn->clients_head;
/* The message is copied into a fresh envelope, so the caller keeps
 * ownership of @a msg. */
679 struct GNUNET_MQ_Envelope *
680 env = GNUNET_MQ_msg_copy (msg);
682 GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (cli->client),
/* Builds a GNUNET_MESSAGE_TYPE_PSYC_RESULT_CODE message carrying
 * @a result_code and an optional payload of @a data_size bytes, and queues
 * it on the message queue of @a client. */
690 * Send a result code back to the client.
693 * Client that should receive the result code.
697 * Operation ID in network byte order.
699 * Data payload or NULL.
704 client_send_result (struct GNUNET_SERVICE_Client *client, uint64_t op_id,
705 int64_t result_code, const void *data, uint16_t data_size)
707 struct GNUNET_OperationResultMessage *res;
/* Allocate the envelope together with extra room behind the fixed header. */
708 struct GNUNET_MQ_Envelope *
709 env = GNUNET_MQ_msg_extra (res,
711 GNUNET_MESSAGE_TYPE_PSYC_RESULT_CODE);
/* The result code is converted to network byte order for the wire. */
712 res->result_code = GNUNET_htonll (result_code);
/* The payload is placed directly behind the fixed-size message struct. */
715 GNUNET_memcpy (&res[1], data, data_size);
717 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
718 "%p Sending result to client for OP ID %" PRIu64 ": %" PRId64 " (size: %u)\n",
720 GNUNET_ntohll (op_id),
724 GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (client), env);
/* State carried from mcast_recv_join_request() to join_mem_test_cb(). */
729 * Closure for join_mem_test_cb()
731 struct JoinMemTestClosure
/* Public key of the slave that asked to join. */
733 struct GNUNET_CRYPTO_EcdsaPublicKey slave_pub_key;
/* Channel the join request was received for. */
734 struct Channel *channel;
/* Multicast handle used to answer the join request. */
735 struct GNUNET_MULTICAST_JoinHandle *join_handle;
/* Heap-allocated join request message; freed in join_mem_test_cb(). */
736 struct GNUNET_PSYC_JoinRequestMessage *join_msg;
/* Handles the PSYCstore membership test result for a join request.  When
 * the test returned GNUNET_NO and the channel is a master, the join handle
 * is remembered and the request is forwarded to the channel's clients.
 * The visible code also reports GNUNET_SYSERR results and answers multicast
 * with @a result as the decision; the join message is freed at the end. */
741 * Membership test result callback used for join requests.
744 join_mem_test_cb (void *cls, int64_t result,
745 const char *err_msg, uint16_t err_msg_size)
747 struct JoinMemTestClosure *jcls = cls;
749 if (GNUNET_NO == result && GNUNET_YES == jcls->channel->is_master)
750 { /* Pass on join request to client if this is a master channel */
751 struct Master *mst = jcls->channel->master;
/* Pending join handles are keyed by the hash of the slave's public key;
 * several requests may be stored under the same key (OPTION_MULTIPLE). */
752 struct GNUNET_HashCode slave_pub_hash;
753 GNUNET_CRYPTO_hash (&jcls->slave_pub_key, sizeof (jcls->slave_pub_key),
755 GNUNET_CONTAINER_multihashmap_put (mst->join_reqs, &slave_pub_hash, jcls->join_handle,
756 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
757 client_send_msg (jcls->channel, &jcls->join_msg->header);
/* A GNUNET_SYSERR result means the membership test itself failed. */
761 if (GNUNET_SYSERR == result)
763 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
764 "Could not perform membership test (%.*s)\n",
765 err_msg_size, err_msg);
/* The test result is passed on as the admission decision, with no relays
 * and no response message (presumably only on the non-forwarding path --
 * confirm the branch structure). */
768 GNUNET_MULTICAST_join_decision (jcls->join_handle, result, 0, NULL, NULL);
770 GNUNET_free (jcls->join_msg);
/* Wraps an incoming multicast join request into a
 * GNUNET_PSYC_JoinRequestMessage and starts a PSYCstore membership test;
 * processing continues in join_mem_test_cb(). */
776 * Incoming join request from multicast.
779 mcast_recv_join_request (void *cls,
780 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_pub_key,
781 const struct GNUNET_MessageHeader *join_msg,
782 struct GNUNET_MULTICAST_JoinHandle *jh)
784 struct Channel *chn = cls;
/* Stays 0 unless a join message of the expected type is attached. */
785 uint16_t join_msg_size = 0;
787 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
788 "%p Got join request.\n",
790 if (NULL != join_msg)
/* Only a payload of type PSYC_MESSAGE is accepted; its size is taken from
 * the message header. */
792 if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE == ntohs (join_msg->type))
794 join_msg_size = ntohs (join_msg->size);
798 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
799 "%p Got join message with invalid type %u.\n",
801 ntohs (join_msg->type));
/* Build the request for the clients: fixed header followed by the optional
 * join message.
 * NOTE(review): the size sum is narrowed to 16 bits by htons(); verify the
 * payload size is bounded upstream. */
805 struct GNUNET_PSYC_JoinRequestMessage *
806 req = GNUNET_malloc (sizeof (*req) + join_msg_size);
807 req->header.size = htons (sizeof (*req) + join_msg_size);
808 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_JOIN_REQUEST);
809 req->slave_pub_key = *slave_pub_key;
810 if (0 < join_msg_size)
811 GNUNET_memcpy (&req[1], join_msg, join_msg_size);
/* The closure takes over `req`; join_mem_test_cb() frees it. */
813 struct JoinMemTestClosure *jcls = GNUNET_malloc (sizeof (*jcls));
814 jcls->slave_pub_key = *slave_pub_key;
816 jcls->join_handle = jh;
817 jcls->join_msg = req;
/* Ask PSYCstore about the slave's membership, passing the channel's
 * max_message_id. */
819 GNUNET_PSYCSTORE_membership_test (store, &chn->pub_key, slave_pub_key,
820 chn->max_message_id, 0,
821 &join_mem_test_cb, jcls);
/* Converts a join decision from multicast into a
 * GNUNET_PSYC_JoinDecisionMessage, stores it in the slave, sends it to the
 * channel's clients and marks the channel ready when admitted. */
826 * Join decision received from multicast.
829 mcast_recv_join_decision (void *cls, int is_admitted,
830 const struct GNUNET_PeerIdentity *peer,
831 uint16_t relay_count,
832 const struct GNUNET_PeerIdentity *relays,
833 const struct GNUNET_MessageHeader *join_resp)
835 struct Slave *slv = cls;
836 struct Channel *chn = &slv->channel;
837 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
838 "%p Got join decision: %d\n",
841 if (GNUNET_YES == chn->is_ready)
843 /* Already admitted */
/* The optional response message is appended behind the decision header. */
847 uint16_t join_resp_size = (NULL != join_resp) ? ntohs (join_resp->size) : 0;
/* NOTE(review): this overwrites slv->join_dcsn; confirm that an earlier
 * decision cannot be leaked here. */
848 struct GNUNET_PSYC_JoinDecisionMessage *
849 dcsn = slv->join_dcsn = GNUNET_malloc (sizeof (*dcsn) + join_resp_size);
850 dcsn->header.size = htons (sizeof (*dcsn) + join_resp_size);
851 dcsn->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION);
852 dcsn->is_admitted = htonl (is_admitted);
853 if (0 < join_resp_size)
854 GNUNET_memcpy (&dcsn[1], join_resp, join_resp_size);
856 client_send_msg (chn, &dcsn->header);
/* The channel becomes ready only when admitted and the slave did not join
 * with the GNUNET_PSYC_SLAVE_JOIN_LOCAL flag. */
858 if (GNUNET_YES == is_admitted
859 && ! (GNUNET_PSYC_SLAVE_JOIN_LOCAL & slv->join_flags))
861 chn->is_ready = GNUNET_YES;
/* PSYCstore fragment callback for replay requests: forwards the retrieved
 * fragment to the multicast replay handle passed as closure, with status
 * GNUNET_MULTICAST_REC_OK. */
867 store_recv_fragment_replay (void *cls,
868 struct GNUNET_MULTICAST_MessageHeader *msg,
869 enum GNUNET_PSYCSTORE_MessageFlags flags)
871 struct GNUNET_MULTICAST_ReplayHandle *rh = cls;
873 GNUNET_MULTICAST_replay_response (rh, &msg->header, GNUNET_MULTICAST_REC_OK);
/* Final PSYCstore callback of a replay request.  The visible code sends
 * replay error codes (NOT_FOUND, ACCESS_DENIED, INTERNAL_ERROR) with a NULL
 * message, and ends the replay response with
 * GNUNET_MULTICAST_replay_response_end(). */
879 * Received result of GNUNET_PSYCSTORE_fragment_get() for multicast replay.
882 store_recv_fragment_replay_result (void *cls,
885 uint16_t err_msg_size)
887 struct GNUNET_MULTICAST_ReplayHandle *rh = cls;
889 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
890 "%p Fragment replay: PSYCSTORE returned %" PRId64 " (%.*s)\n",
/* Error response: not found.  NOTE(review): the selecting case label is
 * not visible next to this call -- confirm which result maps to it. */
901 GNUNET_MULTICAST_replay_response (rh, NULL,
902 GNUNET_MULTICAST_REC_NOT_FOUND);
/* A failed membership test is reported as "access denied". */
905 case GNUNET_PSYCSTORE_MEMBERSHIP_TEST_FAILED:
906 GNUNET_MULTICAST_replay_response (rh, NULL,
907 GNUNET_MULTICAST_REC_ACCESS_DENIED);
/* Error response: internal error. */
911 GNUNET_MULTICAST_replay_response (rh, NULL,
912 GNUNET_MULTICAST_REC_INTERNAL_ERROR);
915 /* GNUNET_MULTICAST_replay_response frees 'rh' when passed
916 * an error code, so it must be ensured no further processing
917 * is attempted on 'rh'. Maybe this should be refactored as
918 * it doesn't look very intuitive. --lynX
920 GNUNET_MULTICAST_replay_response_end (rh);
// Serves a fragment replay request by asking PSYCstore for the fragment;
// results are delivered to store_recv_fragment_replay() and
// store_recv_fragment_replay_result() with the replay handle as closure.
925 * Incoming fragment replay request from multicast.
928 mcast_recv_replay_fragment (void *cls,
929 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_pub_key,
930 uint64_t fragment_id, uint64_t flags,
931 struct GNUNET_MULTICAST_ReplayHandle *rh)
934 struct Channel *chn = cls;
// fragment_id is passed twice, presumably as first and last ID of the
// requested range, i.e. exactly one fragment -- confirm against the API.
935 GNUNET_PSYCSTORE_fragment_get (store, &chn->pub_key, slave_pub_key,
936 fragment_id, fragment_id,
937 &store_recv_fragment_replay,
938 &store_recv_fragment_replay_result, rh);
// Serves a message replay request by asking PSYCstore for the message;
// the same callbacks as for fragment replay are used, with the replay
// handle as closure.
943 * Incoming message replay request from multicast.
946 mcast_recv_replay_message (void *cls,
947 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_pub_key,
949 uint64_t fragment_offset,
951 struct GNUNET_MULTICAST_ReplayHandle *rh)
953 struct Channel *chn = cls;
// message_id is passed twice, presumably as first and last ID of the range.
// NOTE(review): fragment_offset is not used in the visible call -- confirm
// whether that is intended.
954 GNUNET_PSYCSTORE_message_get (store, &chn->pub_key, slave_pub_key,
955 message_id, message_id, 1, NULL,
956 &store_recv_fragment_replay,
957 &store_recv_fragment_replay_result, rh);
// Builds a hash map key from a 64-bit value given in network byte order:
// the value is byte-reversed and the key is zeroed before the value is
// stored into it.
962 * Convert an uint64_t in network byte order to a HashCode
963 * that can be used as key in a MultiHashMap
966 hash_key_from_nll (struct GNUNET_HashCode *key, uint64_t n)
968 /* use little-endian order, as idx_of MultiHashMap casts key to unsigned int */
969 /* TODO: use built-in byte swap functions if available */
/* Byte reversal in stages: swap adjacent bytes, ... */
971 n = ((n << 8) & 0xFF00FF00FF00FF00ULL) | ((n >> 8) & 0x00FF00FF00FF00FFULL);
/* ... then adjacent 16-bit words, ... */
972 n = ((n << 16) & 0xFFFF0000FFFF0000ULL) | ((n >> 16) & 0x0000FFFF0000FFFFULL);
/* Clear the whole key first so the bytes beyond the value are zero. */
974 *key = (struct GNUNET_HashCode) {};
/* ... and finally the two 32-bit halves. */
976 = (n << 32) | (n >> 32);
/* Builds a hash map key from a 64-bit value given in host byte order.  The
 * implementation is selected at compile time from __BYTE_ORDER. */
981 * Convert an uint64_t in host byte order to a HashCode
982 * that can be used as key in a MultiHashMap
985 hash_key_from_hll (struct GNUNET_HashCode *key, uint64_t n)
/* Big-endian host: host order equals network order, so reuse the
 * network-order conversion. */
987 #if __BYTE_ORDER == __BIG_ENDIAN
988 hash_key_from_nll (key, n);
/* Little-endian host: the value can be stored as is into the first eight
 * bytes of the zeroed key. */
989 #elif __BYTE_ORDER == __LITTLE_ENDIAN
990 *key = (struct GNUNET_HashCode) {};
991 *((uint64_t *) key) = n;
993 #error byteorder undefined
/* Fills a caller-provided PSYC message header from a multicast message:
 * size, type, message ID, fragment offset and flags are set, and the payload
 * following the multicast header is copied behind the PSYC header. */
999 * Initialize PSYC message header.
1002 psyc_msg_init (struct GNUNET_PSYC_MessageHeader *pmsg,
1003 const struct GNUNET_MULTICAST_MessageHeader *mmsg, uint32_t flags)
1005 uint16_t size = ntohs (mmsg->header.size);
/* PSYC size = multicast size with the multicast header swapped for the
 * PSYC header.
 * NOTE(review): no check that size >= sizeof (*mmsg) is visible, and the
 * result is narrowed to 16 bits -- confirm callers validate the size. */
1006 uint16_t psize = sizeof (*pmsg) + size - sizeof (*mmsg);
1008 pmsg->header.size = htons (psize);
1009 pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
/* These two fields are copied without byte order conversion. */
1010 pmsg->message_id = mmsg->message_id;
1011 pmsg->fragment_offset = mmsg->fragment_offset;
1012 pmsg->flags = htonl (flags);
1014 GNUNET_memcpy (&pmsg[1], &mmsg[1], size - sizeof (*mmsg));
/* Allocates a PSYC message large enough for the payload of @a mmsg and
 * initializes it with psyc_msg_init(). */
1019 * Create a new PSYC message from a multicast message for sending it to clients.
1021 static inline struct GNUNET_PSYC_MessageHeader *
1022 psyc_msg_new (const struct GNUNET_MULTICAST_MessageHeader *mmsg, uint32_t flags)
1024 struct GNUNET_PSYC_MessageHeader *pmsg;
1025 uint16_t size = ntohs (mmsg->header.size);
/* Same size computation as in psyc_msg_init(). */
1026 uint16_t psize = sizeof (*pmsg) + size - sizeof (*mmsg);
1028 pmsg = GNUNET_malloc (psize);
1029 psyc_msg_init (pmsg, mmsg, flags);
/* Converts a multicast message into a PSYC message with
 * GNUNET_PSYC_message_header_create() and sends it to the channel's
 * clients via client_send_msg(). */
1035 * Send multicast message to all clients connected to the channel.
1038 client_send_mcast_msg (struct Channel *chn,
1039 const struct GNUNET_MULTICAST_MessageHeader *mmsg,
1042 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1043 "%p Sending multicast message to client. fragment_id: %" PRIu64 ", message_id: %" PRIu64 "\n",
1045 GNUNET_ntohll (mmsg->fragment_id),
1046 GNUNET_ntohll (mmsg->message_id));
/* client_send_msg() copies the message for each recipient.
 * NOTE(review): confirm that pmsg is released after sending. */
1048 struct GNUNET_PSYC_MessageHeader *
1049 pmsg = GNUNET_PSYC_message_header_create (mmsg, flags);
1050 client_send_msg (chn, &pmsg->header);
/* Converts a multicast request into a PSYC message flagged
 * GNUNET_PSYC_MESSAGE_REQUEST, carrying the requesting member's public key,
 * and sends it to the clients of the master's channel. */
1056 * Send multicast request to all clients connected to the channel.
1059 client_send_mcast_req (struct Master *mst,
1060 const struct GNUNET_MULTICAST_RequestHeader *req)
1062 struct Channel *chn = &mst->channel;
1064 struct GNUNET_PSYC_MessageHeader *pmsg;
1065 uint16_t size = ntohs (req->header.size);
/* PSYC size = request size with the request header swapped for the PSYC
 * header.
 * NOTE(review): no check that size >= sizeof (*req) is visible here. */
1066 uint16_t psize = sizeof (*pmsg) + size - sizeof (*req);
1068 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1069 "%p Sending multicast request to client. fragment_id: %" PRIu64 ", message_id: %" PRIu64 "\n",
1071 GNUNET_ntohll (req->fragment_id),
1072 GNUNET_ntohll (req->request_id));
1074 pmsg = GNUNET_malloc (psize);
1075 pmsg->header.size = htons (psize);
1076 pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
/* The request ID takes the place of the message ID. */
1077 pmsg->message_id = req->request_id;
1078 pmsg->fragment_offset = req->fragment_offset;
1079 pmsg->flags = htonl (GNUNET_PSYC_MESSAGE_REQUEST);
1080 pmsg->slave_pub_key = req->member_pub_key;
/* Copy the payload that follows the request header. */
1081 GNUNET_memcpy (&pmsg[1], &req[1], size - sizeof (*req));
1083 client_send_msg (chn, &pmsg->header);
1085 /* FIXME: save req to PSYCstore so that it can be resent later to clients */
/* Records an incoming multicast fragment.  The per-message FragmentQueue is
 * looked up in chn->recv_frags (and created when missing), a copy of the
 * fragment is stored in the channel's part of recv_cache (or the reference
 * count of an existing entry is raised), the queue state is advanced from
 * the first/last PSYC part types, and the message ID is queued in
 * chn->recv_msgs once the state is DATA, END or CANCEL.
 * NOTE(review): the @param msg_id_hash entry below does not match the
 * visible signature; the hash is computed locally from mmsg->message_id. */
1092 * Insert a multicast message fragment into the queue belonging to the message.
1094 * @param chn Channel.
1095 * @param mmsg Multicast message fragment.
1096 * @param msg_id_hash Message ID of @a mmsg in a struct GNUNET_HashCode.
1097 * @param first_ptype First PSYC message part type in @a mmsg.
1098 * @param last_ptype Last PSYC message part type in @a mmsg.
1101 fragment_queue_insert (struct Channel *chn,
1102 const struct GNUNET_MULTICAST_MessageHeader *mmsg,
1103 uint16_t first_ptype, uint16_t last_ptype)
1105 const uint16_t size = ntohs (mmsg->header.size);
1106 const uint64_t frag_offset = GNUNET_ntohll (mmsg->fragment_offset);
/* Fragment cache of this channel, keyed by the channel's public key hash;
 * may still be NULL at this point. */
1107 struct GNUNET_CONTAINER_MultiHashMap
1108 *chan_msgs = GNUNET_CONTAINER_multihashmap_get (recv_cache,
1109 &chn->pub_key_hash);
/* The message ID is still in network byte order here, hence the _nll
 * variant of the key helper. */
1111 struct GNUNET_HashCode msg_id_hash;
1112 hash_key_from_nll (&msg_id_hash, mmsg->message_id);
1114 struct FragmentQueue
1115 *fragq = GNUNET_CONTAINER_multihashmap_get (chn->recv_frags, &msg_id_hash);
/* A new queue starts in the HEADER state and is registered under the
 * message ID. */
1119 fragq = GNUNET_malloc (sizeof (*fragq));
1120 fragq->state = MSG_FRAG_STATE_HEADER;
1122 = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
1124 GNUNET_CONTAINER_multihashmap_put (chn->recv_frags, &msg_id_hash, fragq,
1125 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
/* Create the per-channel fragment cache on first use. */
1127 if (NULL == chan_msgs)
1129 chan_msgs = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
1130 GNUNET_CONTAINER_multihashmap_put (recv_cache, &chn->pub_key_hash, chan_msgs,
1131 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
/* Cache entries are keyed by fragment ID. */
1135 struct GNUNET_HashCode frag_id_hash;
1136 hash_key_from_nll (&frag_id_hash, mmsg->fragment_id);
1137 struct RecvCacheEntry
1138 *cache_entry = GNUNET_CONTAINER_multihashmap_get (chan_msgs, &frag_id_hash);
1139 if (NULL == cache_entry)
1141 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1142 "%p Adding message fragment to cache. message_id: %" PRIu64 ", fragment_id: %" PRIu64 "\n",
1144 GNUNET_ntohll (mmsg->message_id),
1145 GNUNET_ntohll (mmsg->fragment_id));
1146 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1147 "%p header_size: %" PRIu64 " + %u\n",
/* First sighting of this fragment: cache a private copy with one
 * reference. */
1151 cache_entry = GNUNET_malloc (sizeof (*cache_entry));
1152 cache_entry->ref_count = 1;
1153 cache_entry->mmsg = GNUNET_malloc (size);
1154 GNUNET_memcpy (cache_entry->mmsg, mmsg, size);
1155 GNUNET_CONTAINER_multihashmap_put (chan_msgs, &frag_id_hash, cache_entry,
1156 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
/* Fragment already cached: only take another reference. */
1160 cache_entry->ref_count++;
1161 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1162 "%p Message fragment is already in cache. message_id: %" PRIu64 ", fragment_id: %" PRIu64 ", ref_count: %u\n",
1164 GNUNET_ntohll (mmsg->message_id),
1165 GNUNET_ntohll (mmsg->fragment_id),
1166 cache_entry->ref_count);
/* While the header is incomplete, track how much header data has arrived. */
1169 if (MSG_FRAG_STATE_HEADER == fragq->state)
1171 if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype)
/* A fragment starting with the METHOD part carries state_delta and
 * flags. */
1173 struct GNUNET_PSYC_MessageMethod *
1174 pmeth = (struct GNUNET_PSYC_MessageMethod *) &mmsg[1];
1175 fragq->state_delta = GNUNET_ntohll (pmeth->state_delta);
1176 fragq->flags = ntohl (pmeth->flags);
/* The last part type is below DATA: the fragment counts towards the header
 * size. */
1179 if (last_ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA)
1181 fragq->header_size += size;
/* Otherwise the header is complete when this fragment starts with METHOD
 * or begins exactly at the header bytes received so far. */
1183 else if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype
1184 || frag_offset == fragq->header_size)
1185 { /* header is now complete */
1186 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1187 "%p Header of message %" PRIu64 " is complete.\n",
1189 GNUNET_ntohll (mmsg->message_id));
1191 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1192 "%p Adding message %" PRIu64 " to queue.\n",
1194 GNUNET_ntohll (mmsg->message_id));
1195 fragq->state = MSG_FRAG_STATE_DATA;
1199 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1200 "%p Header of message %" PRIu64 " is NOT complete yet: %" PRIu64 " != %" PRIu64 "\n",
1202 GNUNET_ntohll (mmsg->message_id),
1204 fragq->header_size);
/* END part: the message is complete only if this fragment begins at the
 * number of bytes received so far. */
1210 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END:
1211 if (frag_offset == fragq->size)
1212 fragq->state = MSG_FRAG_STATE_END;
1214 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1215 "%p Message %" PRIu64 " is NOT complete yet: %" PRIu64 " != %" PRIu64 "\n",
1217 GNUNET_ntohll (mmsg->message_id),
/* CANCEL part: select DROP when the same fragment also starts the message,
 * CANCEL otherwise. */
1222 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL:
1223 /* Drop message without delivering to client if it's a single fragment */
1225 (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype)
1226 ? MSG_FRAG_STATE_DROP
1227 : MSG_FRAG_STATE_CANCEL;
/* Queue the message for delivery once, as soon as its state allows it. */
1230 switch (fragq->state)
1232 case MSG_FRAG_STATE_DATA:
1233 case MSG_FRAG_STATE_END:
1234 case MSG_FRAG_STATE_CANCEL:
1235 if (GNUNET_NO == fragq->is_queued)
/* The heap stores only the host-order message ID as cost; the element
 * pointer is NULL. */
1237 GNUNET_CONTAINER_heap_insert (chn->recv_msgs, NULL,
1238 GNUNET_ntohll (mmsg->message_id));
1239 fragq->is_queued = GNUNET_YES;
/* Account for the fragment and remember its host-order ID in the
 * per-message fragment heap. */
1243 fragq->size += size;
1244 GNUNET_CONTAINER_heap_insert (fragq->fragments, NULL,
1245 GNUNET_ntohll (mmsg->fragment_id));
/* Drains the fragment heap of one message.  Every fragment still present in
 * the channel's cache is sent to the clients unless @a drop is GNUNET_YES,
 * and its cache reference is released.  Once the message state is END or
 * beyond, the queue is removed from chn->recv_frags and freed. */
1250 * Run fragment queue of a message.
1252 * Send fragments of a message in order to client, after all modifiers arrived
1258 * ID of the message @a fragq belongs to.
1260 * Fragment queue of the message.
1262 * Drop message without delivering to client?
1263 * #GNUNET_YES or #GNUNET_NO.
1266 fragment_queue_run (struct Channel *chn, uint64_t msg_id,
1267 struct FragmentQueue *fragq, uint8_t drop)
1269 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1270 "%p Running message fragment queue for message %" PRIu64 " (state: %u).\n",
/* The channel's fragment cache must exist when a queue is run. */
1275 struct GNUNET_CONTAINER_MultiHashMap
1276 *chan_msgs = GNUNET_CONTAINER_multihashmap_get (recv_cache,
1277 &chn->pub_key_hash);
1278 GNUNET_assert (NULL != chan_msgs);
/* Process fragment IDs from the heap root until the heap is empty. */
1281 while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (fragq->fragments, NULL,
/* IDs were stored in the heap in host byte order, hence the _hll variant
 * of the key helper. */
1284 struct GNUNET_HashCode frag_id_hash;
1285 hash_key_from_hll (&frag_id_hash, frag_id);
1286 struct RecvCacheEntry *cache_entry
1287 = GNUNET_CONTAINER_multihashmap_get (chan_msgs, &frag_id_hash);
1288 if (cache_entry != NULL)
1290 if (GNUNET_NO == drop)
1292 client_send_mcast_msg (chn, cache_entry->mmsg, 0);
/* Last reference: remove the entry from the cache and free it. */
1294 if (cache_entry->ref_count <= 1)
1296 GNUNET_CONTAINER_multihashmap_remove (chan_msgs, &frag_id_hash,
1298 GNUNET_free (cache_entry->mmsg);
1299 GNUNET_free (cache_entry);
/* Other references remain: only drop ours. */
1303 cache_entry->ref_count--;
1306 #if CACHE_AGING_IMPLEMENTED
1307 else if (GNUNET_NO == drop)
1309 /* TODO: fragment not in cache anymore, retrieve it from PSYCstore */
1313 GNUNET_CONTAINER_heap_remove_root (fragq->fragments);
/* States END and above need no further fragments, so the queue is
 * released. */
1316 if (MSG_FRAG_STATE_END <= fragq->state)
1318 struct GNUNET_HashCode msg_id_hash;
1319 hash_key_from_hll (&msg_id_hash, msg_id);
1321 GNUNET_CONTAINER_multihashmap_remove (chn->recv_frags, &msg_id_hash, fragq);
1322 GNUNET_CONTAINER_heap_destroy (fragq->fragments);
1323 GNUNET_free (fragq);
/* Presumably the alternative branch (fragq is freed above): the queue
 * stays, but is marked as no longer queued for delivery. */
1327 fragq->is_queued = GNUNET_NO;
/* Closure handed to store_recv_state_modify_result(). */
1332 struct StateModifyClosure
/* Channel the state modification belongs to. */
1334 struct Channel *channel;
/* Hash map key of the message ID, used to look up the FragmentQueue in
 * channel->recv_frags. */
1336 struct GNUNET_HashCode msg_id_hash;
/* Result callback of GNUNET_PSYCSTORE_state_modify().  In the visible code,
 * the fragment queue is marked as state-modified, the channel's
 * max_state_message_id / max_message_id are raised to the message ID, the
 * message's fragments are delivered and the message queue continues.  An
 * error log statement for failures is also present. */
1341 store_recv_state_modify_result (void *cls, int64_t result,
1342 const char *err_msg, uint16_t err_msg_size)
1344 struct StateModifyClosure *mcls = cls;
1345 struct Channel *chn = mcls->channel;
1346 uint64_t msg_id = mcls->msg_id;
/* Look the queue up again by message ID; the pointer is not carried in the
 * closure. */
1348 struct FragmentQueue *
1349 fragq = GNUNET_CONTAINER_multihashmap_get (chn->recv_frags, &mcls->msg_id_hash);
1351 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1352 "%p GNUNET_PSYCSTORE_state_modify() returned %" PRId64 " (%.*s)\n",
1353 chn, result, err_msg_size, err_msg);
/* NOTE(review): confirm fragq cannot be NULL here, or that a check guards
 * this write. */
1360 fragq->state_is_modified = GNUNET_YES;
/* Both counters only ever move forward. */
1361 if (chn->max_state_message_id < msg_id)
1362 chn->max_state_message_id = msg_id;
1363 if (chn->max_message_id < msg_id)
1364 chn->max_message_id = msg_id;
/* Deliver (or drop) the message, take it off the message heap and continue
 * with the next queued message. */
1367 fragment_queue_run (chn, msg_id, fragq, MSG_FRAG_STATE_DROP == fragq->state);
1368 GNUNET_CONTAINER_heap_remove_root (chn->recv_msgs);
1369 message_queue_run (chn);
1373 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1374 "%p GNUNET_PSYCSTORE_state_modify() failed with error %" PRId64 " (%.*s)\n",
1375 chn, result, err_msg_size, err_msg);
1376 /** @todo FIXME: handle state_modify error */
/* Processes chn->recv_msgs from the heap root.  A message is delivered via
 * fragment_queue_run() once its header is complete and the ordering checks
 * below pass.  A stateful message first has its modifiers applied in
 * PSYCstore; processing then resumes from the asynchronous result
 * callback. */
1382 * Run message queue.
1384 * Send messages in queue to client in order after a message has arrived from
1385 * multicast, according to the following:
1386 * - A message is only sent if all of its modifiers arrived.
1387 * - A stateful message is only sent if the previous stateful message
1388 * has already been delivered to the client.
1390 * @param chn Channel.
1392 * @return Number of messages removed from queue and sent to client.
1395 message_queue_run (struct Channel *chn)
1397 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1398 "%p Running message queue.\n", chn);
/* Look at the message ID at the heap root while the heap is non-empty. */
1402 while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (chn->recv_msgs, NULL,
1405 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1406 "%p Processing message %" PRIu64 " in queue.\n", chn, msg_id);
/* Heap entries are host-order message IDs, hence the _hll key helper. */
1407 struct GNUNET_HashCode msg_id_hash;
1408 hash_key_from_hll (&msg_id_hash, msg_id);
1410 struct FragmentQueue *
1411 fragq = GNUNET_CONTAINER_multihashmap_get (chn->recv_frags, &msg_id_hash);
/* Without a fragment queue, or with an incomplete header, the message
 * cannot be delivered yet. */
1413 if (NULL == fragq || fragq->state <= MSG_FRAG_STATE_HEADER)
1415 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1416 "%p No fragq (%p) or header not complete.\n",
1421 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1422 "%p Fragment queue entry: state: %u, state delta: "
1423 "%" PRIu64 " - %" PRIu64 " ?= %" PRIu64 "\n",
1424 chn, fragq->state, msg_id, fragq->state_delta, chn->max_state_message_id);
1426 if (MSG_FRAG_STATE_DATA <= fragq->state)
1428 /* Check if there's a missing message before the current one */
1429 if (GNUNET_PSYC_STATE_NOT_MODIFIED == fragq->state_delta)
1431 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p state NOT modified\n", chn);
/* Stateless message: unless the ORDER_ANY flag is set, it must be the
 * current or the direct successor of the last delivered message ID. */
1433 if (!(fragq->flags & GNUNET_PSYC_MESSAGE_ORDER_ANY)
1434 && (chn->max_message_id != msg_id - 1
1435 && chn->max_message_id != msg_id))
1437 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1438 "%p Out of order message. "
1439 "(%" PRIu64 " != %" PRIu64 " - 1)\n",
1440 chn, chn->max_message_id, msg_id);
1442 // FIXME: keep track of messages processed in this queue run,
1443 // and only stop after reaching the end
1448 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p state modified\n", chn);
/* Stateful message whose modifiers have not been applied yet. */
1449 if (GNUNET_YES != fragq->state_is_modified)
/* state_delta is the distance to the previous stateful message, which must
 * be the last one whose state was processed. */
1451 if (msg_id - fragq->state_delta != chn->max_state_message_id)
1453 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1454 "%p Out of order stateful message. "
1455 "(%" PRIu64 " - %" PRIu64 " != %" PRIu64 ")\n",
1456 chn, msg_id, fragq->state_delta, chn->max_state_message_id);
1458 // FIXME: keep track of messages processed in this queue run,
1459 // and only stop after reaching the end
/* The closure carries what the result callback needs to find the message
 * again. */
1462 struct StateModifyClosure *mcls = GNUNET_malloc (sizeof (*mcls));
1463 mcls->channel = chn;
1464 mcls->msg_id = msg_id;
1465 mcls->msg_id_hash = msg_id_hash;
1467 /* Apply modifiers to state in PSYCstore */
1468 GNUNET_PSYCSTORE_state_modify (store, &chn->pub_key, msg_id,
1470 store_recv_state_modify_result, mcls);
1471 break; // continue after asynchronous state modify result
/* Record the message as the latest one handled for this channel. */
1474 chn->max_message_id = msg_id;
/* Deliver the fragments (or drop them for state DROP) and take the message
 * off the heap. */
1476 fragment_queue_run (chn, msg_id, fragq, MSG_FRAG_STATE_DROP == fragq->state);
1477 GNUNET_CONTAINER_heap_remove_root (chn->recv_msgs);
1481 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1482 "%p Removed %" PRIu64 " messages from queue.\n", chn, n);
1488 * Drop message queue of a channel.
1490 * Remove all messages in queue without sending it to clients.
1492 * @param chn Channel.
1494 * @return Number of messages removed from queue.
1497 message_queue_drop (struct Channel *chn)
1501 while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (chn->recv_msgs, NULL,
1504 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1505 "%p Dropping message %" PRIu64 " from queue.\n", chn, msg_id);
1506 struct GNUNET_HashCode msg_id_hash;
1507 hash_key_from_hll (&msg_id_hash, msg_id);
1509 struct FragmentQueue *
1510 fragq = GNUNET_CONTAINER_multihashmap_get (chn->recv_frags, &msg_id_hash);
1511 GNUNET_assert (NULL != fragq);
1512 fragment_queue_run (chn, msg_id, fragq, GNUNET_YES);
1513 GNUNET_CONTAINER_heap_remove_root (chn->recv_msgs);
1516 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1517 "%p Removed %" PRIu64 " messages from queue.\n", chn, n);
1523 * Received result of GNUNET_PSYCSTORE_fragment_store().
1526 store_recv_fragment_store_result (void *cls, int64_t result,
1527 const char *err_msg, uint16_t err_msg_size)
1529 struct Channel *chn = cls;
1530 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1531 "%p GNUNET_PSYCSTORE_fragment_store() returned %" PRId64 " (%.*s)\n",
1532 chn, result, err_msg_size, err_msg);
1537 * Handle incoming message fragment from multicast.
1539 * Store it using PSYCstore and send it to the clients of the channel in order.
1542 mcast_recv_message (void *cls, const struct GNUNET_MULTICAST_MessageHeader *mmsg)
1544 struct Channel *chn = cls;
1545 uint16_t size = ntohs (mmsg->header.size);
1547 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1548 "%p Received multicast message of size %u. "
1549 "fragment_id=%" PRIu64 ", message_id=%" PRIu64
1550 ", fragment_offset=%" PRIu64 ", flags=%" PRIu64 "\n",
1552 GNUNET_ntohll (mmsg->fragment_id),
1553 GNUNET_ntohll (mmsg->message_id),
1554 GNUNET_ntohll (mmsg->fragment_offset),
1555 GNUNET_ntohll (mmsg->flags));
1557 GNUNET_PSYCSTORE_fragment_store (store, &chn->pub_key, mmsg, 0,
1558 &store_recv_fragment_store_result, chn);
1560 uint16_t first_ptype = 0, last_ptype = 0;
1561 int check = GNUNET_PSYC_receive_check_parts (size - sizeof (*mmsg),
1562 (const char *) &mmsg[1],
1563 &first_ptype, &last_ptype);
1564 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1565 "%p Message check result %d, first part type %u, last part type %u\n",
1566 chn, check, first_ptype, last_ptype);
1567 if (GNUNET_SYSERR == check)
1569 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1570 "%p Dropping incoming multicast message with invalid parts.\n",
1572 GNUNET_break_op (0);
1576 fragment_queue_insert (chn, mmsg, first_ptype, last_ptype);
1577 message_queue_run (chn);
1582 * Incoming request fragment from multicast for a master.
1584 * @param cls Master.
1585 * @param req The request.
1588 mcast_recv_request (void *cls,
1589 const struct GNUNET_MULTICAST_RequestHeader *req)
1591 struct Master *mst = cls;
1592 uint16_t size = ntohs (req->header.size);
1594 char *str = GNUNET_CRYPTO_ecdsa_public_key_to_string (&req->member_pub_key);
1595 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1596 "%p Received multicast request of size %u from %s.\n",
1600 uint16_t first_ptype = 0, last_ptype = 0;
1602 == GNUNET_PSYC_receive_check_parts (size - sizeof (*req),
1603 (const char *) &req[1],
1604 &first_ptype, &last_ptype))
1606 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1607 "%p Dropping incoming multicast request with invalid parts.\n",
1609 GNUNET_break_op (0);
1613 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1614 "Message parts: first: type %u, last: type %u\n",
1615 first_ptype, last_ptype);
1617 /* FIXME: in-order delivery */
1618 client_send_mcast_req (mst, req);
1623 * Response from PSYCstore with the current counter values for a channel master.
1626 store_recv_master_counters (void *cls, int result, uint64_t max_fragment_id,
1627 uint64_t max_message_id, uint64_t max_group_generation,
1628 uint64_t max_state_message_id)
1630 struct Master *mst = cls;
1631 struct Channel *chn = &mst->channel;
1632 chn->store_op = NULL;
1634 struct GNUNET_PSYC_CountersResultMessage res;
1635 res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK);
1636 res.header.size = htons (sizeof (res));
1637 res.result_code = htonl (result);
1638 res.max_message_id = GNUNET_htonll (max_message_id);
1640 if (GNUNET_OK == result || GNUNET_NO == result)
1642 mst->max_message_id = max_message_id;
1643 chn->max_message_id = max_message_id;
1644 chn->max_state_message_id = max_state_message_id;
1645 mst->max_group_generation = max_group_generation;
1647 = GNUNET_MULTICAST_origin_start (cfg, &mst->priv_key, max_fragment_id,
1648 mcast_recv_join_request,
1649 mcast_recv_replay_fragment,
1650 mcast_recv_replay_message,
1652 mcast_recv_message, chn);
1653 chn->is_ready = GNUNET_YES;
1657 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1658 "%p GNUNET_PSYCSTORE_counters_get() "
1659 "returned %d for channel %s.\n",
1660 chn, result, GNUNET_h2s (&chn->pub_key_hash));
1663 client_send_msg (chn, &res.header);
1668 * Response from PSYCstore with the current counter values for a channel slave.
1671 store_recv_slave_counters (void *cls, int result, uint64_t max_fragment_id,
1672 uint64_t max_message_id, uint64_t max_group_generation,
1673 uint64_t max_state_message_id)
1675 struct Slave *slv = cls;
1676 struct Channel *chn = &slv->channel;
1677 chn->store_op = NULL;
1679 struct GNUNET_PSYC_CountersResultMessage res;
1680 res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK);
1681 res.header.size = htons (sizeof (res));
1682 res.result_code = htonl (result);
1683 res.max_message_id = GNUNET_htonll (max_message_id);
1685 if (GNUNET_YES == result || GNUNET_NO == result)
1687 chn->max_message_id = max_message_id;
1688 chn->max_state_message_id = max_state_message_id;
1690 = GNUNET_MULTICAST_member_join (cfg, &chn->pub_key, &slv->priv_key,
1692 slv->relay_count, slv->relays,
1693 &slv->join_msg->header,
1694 mcast_recv_join_request,
1695 mcast_recv_join_decision,
1696 mcast_recv_replay_fragment,
1697 mcast_recv_replay_message,
1698 mcast_recv_message, chn);
1699 if (NULL != slv->join_msg)
1701 GNUNET_free (slv->join_msg);
1702 slv->join_msg = NULL;
1707 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1708 "%p GNUNET_PSYCSTORE_counters_get() "
1709 "returned %d for channel %s.\n",
1710 chn, result, GNUNET_h2s (&chn->pub_key_hash));
1713 client_send_msg (chn, &res.header);
1718 channel_init (struct Channel *chn)
1721 = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
1722 chn->recv_frags = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
/*
 * handle_client_master_start(): a client asks to act as master of a channel.
 *
 * The channel public key and its hash are derived from req->channel_key and
 * the hash is looked up in `masters`.  Two paths are visible: one allocates
 * and registers a new struct Master and asks PSYCstore for its counters
 * (answered later by store_recv_master_counters()); the other replies at
 * once with a MASTER_START_ACK built from the cached mst->max_message_id.
 * In both cases the client is added to the channel's client list.
 */
1727 * Handle a connecting client starting a channel master.
1730 handle_client_master_start (void *cls,
1731 const struct MasterStartRequest *req)
1733 struct Client *c = cls;
1734 struct GNUNET_SERVICE_Client *client = c->client;
1736 struct GNUNET_CRYPTO_EddsaPublicKey pub_key;
1737 struct GNUNET_HashCode pub_key_hash;
1739 GNUNET_CRYPTO_eddsa_key_get_public (&req->channel_key, &pub_key);
1740 GNUNET_CRYPTO_hash (&pub_key, sizeof (pub_key), &pub_key_hash);
1743 mst = GNUNET_CONTAINER_multihashmap_get (masters, &pub_key_hash);
1744 struct Channel *chn;
/* New master: remember the policy and private key sent by the client. */
1748 mst = GNUNET_malloc (sizeof (*mst));
1749 mst->policy = ntohl (req->policy);
1750 mst->priv_key = req->channel_key;
1751 mst->join_reqs = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
1753 chn = c->channel = &mst->channel;
1755 chn->is_master = GNUNET_YES;
1756 chn->pub_key = pub_key;
1757 chn->pub_key_hash = pub_key_hash;
/* NOTE(review): the map value is the Channel pointer, not the Master;
   presumably the channel is the first member of struct Master -- confirm. */
1760 GNUNET_CONTAINER_multihashmap_put (masters, &chn->pub_key_hash, chn,
1761 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1762 chn->store_op = GNUNET_PSYCSTORE_counters_get (store, &chn->pub_key,
1763 store_recv_master_counters, mst);
/* Master already known: acknowledge directly with the cached counter. */
1767 chn = &mst->channel;
1769 struct GNUNET_PSYC_CountersResultMessage *res;
1770 struct GNUNET_MQ_Envelope *
1771 env = GNUNET_MQ_msg (res, GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK);
1772 res->result_code = htonl (GNUNET_OK);
1773 res->max_message_id = GNUNET_htonll (mst->max_message_id);
1775 GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (client), env);
1778 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1779 "%p Client connected as master to channel %s.\n",
1780 mst, GNUNET_h2s (&chn->pub_key_hash));
/* Register the client with the channel. */
1782 struct ClientList *cli = GNUNET_malloc (sizeof (*cli));
1783 cli->client = client;
1784 GNUNET_CONTAINER_DLL_insert (chn->clients_head, chn->clients_tail, cli);
1786 GNUNET_SERVICE_client_continue (client);
/* check_client_slave_join(): presumably the validation callback paired with
   handle_client_slave_join(); its body is not visible in this excerpt. */
1791 check_client_slave_join (void *cls,
1792 const struct SlaveJoinRequest *req)
/*
 * handle_client_slave_join(): a client asks to join a channel as a slave.
 *
 * The slave public key is derived from req->slave_key; its hash and the
 * hash of req->channel_pub_key are used to look up an existing struct Slave
 * through `channel_slaves`.  Two paths are visible: one builds a new slave
 * from the request (relay list and optional join message follow the fixed
 * part of the request), registers it and asks PSYCstore for the counters;
 * the other acknowledges at once and, depending on the slave's state,
 * reports a local join decision, (re)joins multicast, or replays the stored
 * join decision.  In both cases the client is added to the channel's list.
 */
1799 * Handle a connecting client joining as a channel slave.
1802 handle_client_slave_join (void *cls,
1803 const struct SlaveJoinRequest *req)
1805 struct Client *c = cls;
1806 struct GNUNET_SERVICE_Client *client = c->client;
1808 uint16_t req_size = ntohs (req->header.size);
1810 struct GNUNET_CRYPTO_EcdsaPublicKey slv_pub_key;
1811 struct GNUNET_HashCode pub_key_hash, slv_pub_hash;
1813 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1814 "got join request from client %p\n",
1816 GNUNET_CRYPTO_ecdsa_key_get_public (&req->slave_key, &slv_pub_key);
1817 GNUNET_CRYPTO_hash (&slv_pub_key, sizeof (slv_pub_key), &slv_pub_hash);
1818 GNUNET_CRYPTO_hash (&req->channel_pub_key, sizeof (req->channel_pub_key), &pub_key_hash);
/* channel_slaves maps the channel hash to a per-channel map of slaves. */
1820 struct GNUNET_CONTAINER_MultiHashMap *
1821 chn_slv = GNUNET_CONTAINER_multihashmap_get (channel_slaves, &pub_key_hash);
1822 struct Slave *slv = NULL;
1823 struct Channel *chn;
1825 if (NULL != chn_slv)
1827 slv = GNUNET_CONTAINER_multihashmap_get (chn_slv, &slv_pub_hash);
/* New slave: copy keys, origin, relay count and join flags from the request. */
1831 slv = GNUNET_malloc (sizeof (*slv));
1832 slv->priv_key = req->slave_key;
1833 slv->pub_key = slv_pub_key;
1834 slv->pub_key_hash = slv_pub_hash;
1835 slv->origin = req->origin;
1836 slv->relay_count = ntohl (req->relay_count);
1837 slv->join_flags = ntohl (req->flags);
1839 const struct GNUNET_PeerIdentity *
1840 relays = (const struct GNUNET_PeerIdentity *) &req[1];
/* NOTE(review): relay_count is a 32-bit value supplied by the client while
   relay_size is uint16_t, so the product may be truncated before the size
   checks below -- confirm the request is validated elsewhere. */
1841 uint16_t relay_size = slv->relay_count * sizeof (*relays);
1842 uint16_t join_msg_size = 0;
/* A join message is present when at least a message header fits after the relays. */
1844 if (sizeof (*req) + relay_size + sizeof (struct GNUNET_MessageHeader)
1847 struct GNUNET_PSYC_Message *
1848 join_msg = (struct GNUNET_PSYC_Message *) (((char *) &req[1]) + relay_size);
/* NOTE(review): join_msg_size is read from the embedded header and used for
   the copy before the total-size consistency check that follows -- confirm
   the copy cannot read past the end of the request. */
1849 join_msg_size = ntohs (join_msg->header.size);
1850 slv->join_msg = GNUNET_malloc (join_msg_size);
1851 GNUNET_memcpy (slv->join_msg, join_msg, join_msg_size);
/* The three parts must add up to exactly the announced request size. */
1853 if (sizeof (*req) + relay_size + join_msg_size != req_size)
1855 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1856 "%u + %u + %u != %u\n",
1857 (unsigned int) sizeof (*req),
/* NOTE(review): slv->join_msg may already be allocated on this error path --
   check that it is released together with slv. */
1862 GNUNET_SERVICE_client_drop (client);
1866 if (0 < slv->relay_count)
1868 slv->relays = GNUNET_malloc (relay_size);
1869 GNUNET_memcpy (slv->relays, &req[1], relay_size);
1872 chn = c->channel = &slv->channel;
1874 chn->is_master = GNUNET_NO;
1875 chn->pub_key = req->channel_pub_key;
1876 chn->pub_key_hash = pub_key_hash;
/* First slave of this channel: create the per-channel slave map. */
1879 if (NULL == chn_slv)
1881 chn_slv = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
1882 GNUNET_CONTAINER_multihashmap_put (channel_slaves, &chn->pub_key_hash, chn_slv,
1883 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1885 GNUNET_CONTAINER_multihashmap_put (chn_slv, &slv->pub_key_hash, chn,
1886 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1887 GNUNET_CONTAINER_multihashmap_put (slaves, &chn->pub_key_hash, chn,
1888 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1889 chn->store_op = GNUNET_PSYCSTORE_counters_get (store, &chn->pub_key,
1890 &store_recv_slave_counters, slv);
/* Slave already known: acknowledge directly with the cached counter. */
1894 chn = &slv->channel;
1896 struct GNUNET_PSYC_CountersResultMessage *res;
1898 struct GNUNET_MQ_Envelope *
1899 env = GNUNET_MQ_msg (res, GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK);
1900 res->result_code = htonl (GNUNET_OK);
1901 res->max_message_id = GNUNET_htonll (chn->max_message_id);
1903 GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (client), env);
/* Local join: report an admitting decision without contacting multicast. */
1905 if (GNUNET_PSYC_SLAVE_JOIN_LOCAL & slv->join_flags)
1907 mcast_recv_join_decision (slv, GNUNET_YES,
1908 NULL, 0, NULL, NULL);
/* Not a multicast member yet: join now. */
1910 else if (NULL == slv->member)
1913 = GNUNET_MULTICAST_member_join (cfg, &chn->pub_key, &slv->priv_key,
1915 slv->relay_count, slv->relays,
/* NOTE(review): slv->join_msg may be NULL here (it is NULL-checked right
   after the call) -- confirm taking the address of its header is intended. */
1916 &slv->join_msg->header,
1917 &mcast_recv_join_request,
1918 &mcast_recv_join_decision,
1919 &mcast_recv_replay_fragment,
1920 &mcast_recv_replay_message,
1921 &mcast_recv_message, chn);
1922 if (NULL != slv->join_msg)
1924 GNUNET_free (slv->join_msg);
1925 slv->join_msg = NULL;
/* A join decision is already stored: send a copy to the new client. */
1928 else if (NULL != slv->join_dcsn)
1930 struct GNUNET_MQ_Envelope *
1931 env = GNUNET_MQ_msg_copy (&slv->join_dcsn->header);
1932 GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (client), env);
1936 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1937 "Client %p connected as slave to channel %s.\n",
1939 GNUNET_h2s (&chn->pub_key_hash));
/* Register the client with the channel. */
1941 struct ClientList *cli = GNUNET_malloc (sizeof (*cli));
1942 cli->client = client;
1943 GNUNET_CONTAINER_DLL_insert (chn->clients_head, chn->clients_tail, cli);
1945 GNUNET_SERVICE_client_continue (client);
/**
 * Closure for mcast_send_join_decision().
 */
struct JoinDecisionClosure
{
  /**
   * Admission flag handed to GNUNET_MULTICAST_join_decision().
   */
  int32_t is_admitted;

  /**
   * Message handed to GNUNET_MULTICAST_join_decision(); may be NULL.
   */
  struct GNUNET_MessageHeader *msg;
};
1957 * Iterator callback for sending join decisions to multicast.
1960 mcast_send_join_decision (void *cls, const struct GNUNET_HashCode *pub_key_hash,
1963 struct JoinDecisionClosure *jcls = cls;
1964 struct GNUNET_MULTICAST_JoinHandle *jh = value;
1965 // FIXME: add relays
1966 GNUNET_MULTICAST_join_decision (jh, jcls->is_admitted, 0, NULL, jcls->msg);
/* check_client_join_decision(): presumably the validation callback paired with
   handle_client_join_decision(); its body is not visible in this excerpt. */
1972 check_client_join_decision (void *cls,
1973 const struct GNUNET_PSYC_JoinDecisionMessage *dcsn)
1980 * Join decision from client.
1983 handle_client_join_decision (void *cls,
1984 const struct GNUNET_PSYC_JoinDecisionMessage *dcsn)
1986 struct Client *c = cls;
1987 struct GNUNET_SERVICE_Client *client = c->client;
1988 struct Channel *chn = c->channel;
1992 GNUNET_SERVICE_client_drop (client);
1995 GNUNET_assert (GNUNET_YES == chn->is_master);
1996 struct Master *mst = chn->master;
1998 struct JoinDecisionClosure jcls;
1999 jcls.is_admitted = ntohl (dcsn->is_admitted);
2001 = (sizeof (*dcsn) + sizeof (*jcls.msg) <= ntohs (dcsn->header.size))
2002 ? (struct GNUNET_MessageHeader *) &dcsn[1]
2005 struct GNUNET_HashCode slave_pub_hash;
2006 GNUNET_CRYPTO_hash (&dcsn->slave_pub_key, sizeof (dcsn->slave_pub_key),
2009 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2010 "%p Got join decision (%d) from client for channel %s..\n",
2011 mst, jcls.is_admitted, GNUNET_h2s (&chn->pub_key_hash));
2012 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2013 "%p ..and slave %s.\n",
2014 mst, GNUNET_h2s (&slave_pub_hash));
2016 GNUNET_CONTAINER_multihashmap_get_multiple (mst->join_reqs, &slave_pub_hash,
2017 &mcast_send_join_decision, &jcls);
2018 GNUNET_CONTAINER_multihashmap_remove_all (mst->join_reqs, &slave_pub_hash);
2019 GNUNET_SERVICE_client_continue (client);
2024 channel_part_cb (void *cls)
2026 struct GNUNET_SERVICE_Client *client = cls;
2027 struct GNUNET_MQ_Envelope *env;
2029 env = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_PSYC_PART_ACK);
2030 GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (client),
2036 handle_client_part_request (void *cls,
2037 const struct GNUNET_MessageHeader *msg)
2039 struct Client *c = cls;
2041 c->channel->is_disconnecting = GNUNET_YES;
2042 if (GNUNET_YES == c->channel->is_master)
2044 struct Master *mst = (struct Master *) c->channel;
2046 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2047 "Got part request from master %p\n",
2049 GNUNET_assert (NULL != mst->origin);
2050 GNUNET_MULTICAST_origin_stop (mst->origin, channel_part_cb, c->client);
2054 struct Slave *slv = (struct Slave *) c->channel;
2056 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2057 "Got part request from slave %p\n",
2059 GNUNET_assert (NULL != slv->member);
2060 GNUNET_MULTICAST_member_part (slv->member, channel_part_cb, c->client);
2062 GNUNET_SERVICE_client_continue (c->client);
2067 * Send acknowledgement to a client.
2069 * Sent after a message fragment has been passed on to multicast.
2071 * @param chn The channel struct for the client.
2074 send_message_ack (struct Channel *chn, struct GNUNET_SERVICE_Client *client)
2076 struct GNUNET_MessageHeader *res;
2077 struct GNUNET_MQ_Envelope *
2078 env = GNUNET_MQ_msg (res, GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK);
2081 GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (client), env);
/*
 * transmit_notify(): copy the message at the head of the channel's transmit
 * queue into the buffer offered by multicast.
 *
 * cls is the struct Channel; on entry *data_size is compared against the
 * size of the queued message, on the send path it is set to the number of
 * bytes copied into data.  The visible code acknowledges the message to its
 * originating client, unlinks it from the queue, schedules another
 * transmission when more messages are waiting, and frees the entry.
 * GNUNET_SYSERR is returned when the queue ran empty on a disconnecting
 * channel in the middle of a message; the other return values are not
 * visible in this excerpt.
 */
2086 * Callback for the transmit functions of multicast.
2089 transmit_notify (void *cls, size_t *data_size, void *data)
2091 struct Channel *chn = cls;
2092 struct TransmitMessage *tmit_msg = chn->tmit_head;
/* Nothing queued, or the head of the queue does not fit into the buffer. */
2094 if (NULL == tmit_msg || *data_size < tmit_msg->size)
2096 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2097 "%p transmit_notify: nothing to send.\n", chn);
2098 if (NULL != tmit_msg && *data_size < tmit_msg->size)
2104 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2105 "%p transmit_notify: sending %u bytes.\n", chn, tmit_msg->size);
/* The payload is stored directly behind the TransmitMessage struct. */
2107 *data_size = tmit_msg->size;
2108 GNUNET_memcpy (data, &tmit_msg[1], *data_size);
/* A last part type below MESSAGE_END means the message is not finished yet. */
2111 = (tmit_msg->last_ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END)
2115 /* FIXME: handle disconnecting clients */
2116 if (NULL != tmit_msg->client)
2117 send_message_ack (chn, tmit_msg->client);
2119 GNUNET_CONTAINER_DLL_remove (chn->tmit_head, chn->tmit_tail, tmit_msg);
/* More messages waiting: schedule the next transmission. */
2121 if (NULL != chn->tmit_head)
2123 GNUNET_SCHEDULER_add_now (&schedule_transmit_message, chn);
/* Queue empty on a disconnecting channel while the message is unfinished. */
2125 else if (GNUNET_YES == chn->is_disconnecting
2126 && tmit_msg->last_ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END)
2128 /* FIXME: handle partial message (when still in_transmit) */
2129 GNUNET_free (tmit_msg);
2130 return GNUNET_SYSERR;
2132 GNUNET_free (tmit_msg);
2138 * Callback for the transmit functions of multicast.
2141 master_transmit_notify (void *cls, size_t *data_size, void *data)
2143 int ret = transmit_notify (cls, data_size, data);
2145 if (GNUNET_YES == ret)
2147 struct Master *mst = cls;
2148 mst->tmit_handle = NULL;
2155 * Callback for the transmit functions of multicast.
2158 slave_transmit_notify (void *cls, size_t *data_size, void *data)
2160 int ret = transmit_notify (cls, data_size, data);
2162 if (GNUNET_YES == ret)
2164 struct Slave *slv = cls;
2165 slv->tmit_handle = NULL;
2172 * Transmit a message from a channel master to the multicast group.
2175 master_transmit_message (struct Master *mst)
2177 struct Channel *chn = &mst->channel;
2178 struct TransmitMessage *tmit_msg = chn->tmit_head;
2179 if (NULL == tmit_msg)
2181 if (NULL == mst->tmit_handle)
2183 mst->tmit_handle = GNUNET_MULTICAST_origin_to_all (mst->origin,
2185 mst->max_group_generation,
2186 &master_transmit_notify,
2191 GNUNET_MULTICAST_origin_to_all_resume (mst->tmit_handle);
2197 * Transmit a message from a channel slave to the multicast group.
2200 slave_transmit_message (struct Slave *slv)
2202 if (NULL == slv->channel.tmit_head)
2204 if (NULL == slv->tmit_handle)
2206 slv->tmit_handle = GNUNET_MULTICAST_member_to_origin (slv->member,
2207 slv->channel.tmit_head->id,
2208 &slave_transmit_notify,
2213 GNUNET_MULTICAST_member_to_origin_resume (slv->tmit_handle);
2219 transmit_message (struct Channel *chn)
2222 ? master_transmit_message (chn->master)
2223 : slave_transmit_message (chn->slave);
2228 * Queue a message from a channel master for sending to the multicast group.
2231 master_queue_message (struct Master *mst, struct TransmitMessage *tmit_msg)
2233 if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == tmit_msg->first_ptype)
2235 tmit_msg->id = ++mst->max_message_id;
2236 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2237 "%p master_queue_message: message_id=%" PRIu64 "\n",
2239 struct GNUNET_PSYC_MessageMethod *pmeth
2240 = (struct GNUNET_PSYC_MessageMethod *) &tmit_msg[1];
2242 if (pmeth->flags & GNUNET_PSYC_MASTER_TRANSMIT_STATE_RESET)
2244 pmeth->state_delta = GNUNET_htonll (GNUNET_PSYC_STATE_RESET);
2246 else if (pmeth->flags & GNUNET_PSYC_MASTER_TRANSMIT_STATE_MODIFY)
2248 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2249 "%p master_queue_message: state_delta=%" PRIu64 "\n",
2250 mst, tmit_msg->id - mst->max_state_message_id);
2251 pmeth->state_delta = GNUNET_htonll (tmit_msg->id
2252 - mst->max_state_message_id);
2253 mst->max_state_message_id = tmit_msg->id;
2257 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2258 "%p master_queue_message: state not modified\n", mst);
2259 pmeth->state_delta = GNUNET_htonll (GNUNET_PSYC_STATE_NOT_MODIFIED);
2262 if (pmeth->flags & GNUNET_PSYC_MASTER_TRANSMIT_STATE_HASH)
2264 /// @todo add state_hash to PSYC header
2271 * Queue a message from a channel slave for sending to the multicast group.
2274 slave_queue_message (struct Slave *slv, struct TransmitMessage *tmit_msg)
2276 if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == tmit_msg->first_ptype)
2278 struct GNUNET_PSYC_MessageMethod *pmeth
2279 = (struct GNUNET_PSYC_MessageMethod *) &tmit_msg[1];
2280 pmeth->state_delta = GNUNET_htonll (GNUNET_PSYC_STATE_NOT_MODIFIED);
2281 tmit_msg->id = ++slv->max_request_id;
/*
 * queue_message(): allocate a TransmitMessage with a copy of the given
 * message parts stored directly behind the struct, append it to the
 * channel's transmit queue, and let master_queue_message() or
 * slave_queue_message() assign IDs depending on chn->is_master.
 * The declared return type is a pointer to struct TransmitMessage.
 */
2287 * Queue PSYC message parts for sending to multicast.
2290 * Channel to send to.
2292 * Client the message originates from.
2296 * Concatenated message parts.
2297 * @param first_ptype
2298 * First message part type in @a data.
2300 * Last message part type in @a data.
2302 static struct TransmitMessage *
2303 queue_message (struct Channel *chn,
2304 struct GNUNET_SERVICE_Client *client,
2307 uint16_t first_ptype, uint16_t last_ptype)
/* One allocation holds both the queue entry and the payload copy. */
2309 struct TransmitMessage *
2310 tmit_msg = GNUNET_malloc (sizeof (*tmit_msg) + data_size);
2311 GNUNET_memcpy (&tmit_msg[1], data, data_size);
2312 tmit_msg->client = client;
2313 tmit_msg->size = data_size;
2314 tmit_msg->first_ptype = first_ptype;
2315 tmit_msg->last_ptype = last_ptype;
2317 /* FIXME: separate queue per message ID */
2319 GNUNET_CONTAINER_DLL_insert_tail (chn->tmit_head, chn->tmit_tail, tmit_msg);
2322 ? master_queue_message (chn->master, tmit_msg)
2323 : slave_queue_message (chn->slave, tmit_msg);
2329 * Cancel transmission of current message.
2331 * @param chn Channel to send to.
2332 * @param client Client the message originates from.
2335 transmit_cancel (struct Channel *chn, struct GNUNET_SERVICE_Client *client)
2337 uint16_t type = GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL;
2339 struct GNUNET_MessageHeader msg;
2340 msg.size = htons (sizeof (msg));
2341 msg.type = htons (type);
2343 queue_message (chn, client, sizeof (msg), &msg, type, type);
2344 transmit_message (chn);
2346 /* FIXME: cleanup */
/* check_client_psyc_message(): presumably the validation callback paired with
   handle_client_psyc_message(); its body is not visible in this excerpt. */
2351 check_client_psyc_message (void *cls,
2352 const struct GNUNET_MessageHeader *msg)
2359 * Incoming message from a master or slave client.
2362 handle_client_psyc_message (void *cls,
2363 const struct GNUNET_MessageHeader *msg)
2365 struct Client *c = cls;
2366 struct GNUNET_SERVICE_Client *client = c->client;
2367 struct Channel *chn = c->channel;
2371 GNUNET_SERVICE_client_drop (client);
2375 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2376 "%p Received message from client.\n", chn);
2377 GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG, msg);
2379 if (GNUNET_YES != chn->is_ready)
2381 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2382 "%p Channel is not ready yet, disconnecting client %p.\n",
2386 GNUNET_SERVICE_client_drop (client);
2390 uint16_t size = ntohs (msg->size);
2391 if (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < size - sizeof (*msg))
2393 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2394 "%p Message payload too large: %u < %u.\n",
2396 (unsigned int) GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD,
2397 (unsigned int) (size - sizeof (*msg)));
2399 transmit_cancel (chn, client);
2400 GNUNET_SERVICE_client_drop (client);
2404 uint16_t first_ptype = 0, last_ptype = 0;
2406 == GNUNET_PSYC_receive_check_parts (size - sizeof (*msg),
2407 (const char *) &msg[1],
2408 &first_ptype, &last_ptype))
2410 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2411 "%p Received invalid message part from client.\n", chn);
2413 transmit_cancel (chn, client);
2414 GNUNET_SERVICE_client_drop (client);
2417 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2418 "%p Received message with first part type %u and last part type %u.\n",
2419 chn, first_ptype, last_ptype);
2421 queue_message (chn, client, size - sizeof (*msg), &msg[1],
2422 first_ptype, last_ptype);
2423 transmit_message (chn);
2424 /* FIXME: send a few ACKs even before transmit_notify is called */
2426 GNUNET_SERVICE_client_continue (client);
/*
 * store_recv_membership_store_result(): cls is the struct Operation created
 * by handle_client_membership_store().  The result is logged and, unless
 * the operation has no client attached, reported to that client via
 * client_send_result().
 */
2431 * Received result of GNUNET_PSYCSTORE_membership_store()
2434 store_recv_membership_store_result (void *cls,
2436 const char *err_msg,
2437 uint16_t err_msg_size)
2439 struct Operation *op = cls;
2440 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2441 "%p GNUNET_PSYCSTORE_membership_store() returned %" PRId64 " (%.*s)\n",
/* op->client is NULL-checked here, presumably for clients that disconnected meanwhile. */
2447 if (NULL != op->client)
2448 client_send_result (op->client, op->op_id, result, err_msg, err_msg_size);
2454 * Client requests to add/remove a slave in the membership database.
2457 handle_client_membership_store (void *cls,
2458 const struct ChannelMembershipStoreRequest *req)
2460 struct Client *c = cls;
2461 struct GNUNET_SERVICE_Client *client = c->client;
2462 struct Channel *chn = c->channel;
2466 GNUNET_SERVICE_client_drop (client);
2470 struct Operation *op = op_add (chn, client, req->op_id, 0);
2472 uint64_t announced_at = GNUNET_ntohll (req->announced_at);
2473 uint64_t effective_since = GNUNET_ntohll (req->effective_since);
2474 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2475 "%p Received membership store request from client.\n", chn);
2476 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2477 "%p did_join: %u, announced_at: %" PRIu64 ", effective_since: %" PRIu64 "\n",
2478 chn, req->did_join, announced_at, effective_since);
2480 GNUNET_PSYCSTORE_membership_store (store, &chn->pub_key, &req->slave_pub_key,
2481 req->did_join, announced_at, effective_since,
2482 0, /* FIXME: group_generation */
2483 &store_recv_membership_store_result, op);
2484 GNUNET_SERVICE_client_continue (client);
2489 * Received a fragment for GNUNET_PSYCSTORE_fragment_get(),
2490 * in response to a history request from a client.
2493 store_recv_fragment_history (void *cls,
2494 struct GNUNET_MULTICAST_MessageHeader *mmsg,
2495 enum GNUNET_PSYCSTORE_MessageFlags flags)
2497 struct Operation *op = cls;
2498 if (NULL == op->client)
2499 { /* Requesting client already disconnected. */
2502 struct Channel *chn = op->channel;
2504 struct GNUNET_PSYC_MessageHeader *pmsg;
2505 uint16_t msize = ntohs (mmsg->header.size);
2506 uint16_t psize = sizeof (*pmsg) + msize - sizeof (*mmsg);
2508 struct GNUNET_OperationResultMessage *
2509 res = GNUNET_malloc (sizeof (*res) + psize);
2510 res->header.size = htons (sizeof (*res) + psize);
2511 res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_HISTORY_RESULT);
2512 res->op_id = op->op_id;
2513 res->result_code = GNUNET_htonll (GNUNET_OK);
2515 pmsg = (struct GNUNET_PSYC_MessageHeader *) &res[1];
2516 GNUNET_PSYC_message_header_init (pmsg, mmsg, flags | GNUNET_PSYC_MESSAGE_HISTORIC);
2517 GNUNET_memcpy (&res[1], pmsg, psize);
2519 /** @todo FIXME: send only to requesting client */
2520 client_send_msg (chn, &res->header);
/*
 * store_recv_fragment_history_result(): final result of a history replay.
 * cls is the struct Operation of the request.  The result is logged and
 * reported to the requesting client via client_send_result(); what happens
 * when the client is already gone is not visible in this excerpt.
 */
2528 * Received the result of GNUNET_PSYCSTORE_fragment_get(),
2529 * in response to a history request from a client.
2532 store_recv_fragment_history_result (void *cls, int64_t result,
2533 const char *err_msg, uint16_t err_msg_size)
2535 struct Operation *op = cls;
2536 if (NULL == op->client)
2537 { /* Requesting client already disconnected. */
2541 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2542 "%p History replay #%" PRIu64 ": "
2543 "PSYCSTORE returned %" PRId64 " (%.*s)\n",
2544 op->channel, GNUNET_ntohll (op->op_id), result, err_msg_size, err_msg);
/* Remote replay is requested through the flags stored with the operation. */
2546 if (op->flags & GNUNET_PSYC_HISTORY_REPLAY_REMOTE)
2548 /** @todo Multicast replay request for messages not found locally. */
2551 client_send_result (op->client, op->op_id, result, err_msg, err_msg_size);
/* check_client_history_replay(): presumably the validation callback paired with
   handle_client_history_replay(); its body is not visible in this excerpt. */
2557 check_client_history_replay (void *cls,
2558 const struct GNUNET_PSYC_HistoryRequestMessage *req)
/*
 * handle_client_history_replay(): the request carries a method prefix string
 * behind its fixed-size part.  After the prefix has been validated an
 * operation is registered and PSYCstore is asked either for a range of
 * messages (message_limit == 0) or for the latest message_limit messages.
 * Fragments arrive in store_recv_fragment_history(), the final result in
 * store_recv_fragment_history_result().
 */
2565 * Client requests channel history.
2568 handle_client_history_replay (void *cls,
2569 const struct GNUNET_PSYC_HistoryRequestMessage *req)
2571 struct Client *c = cls;
2572 struct GNUNET_SERVICE_Client *client = c->client;
2573 struct Channel *chn = c->channel;
2577 GNUNET_SERVICE_client_drop (client);
2581 uint16_t size = ntohs (req->header.size);
2582 const char *method_prefix = (const char *) &req[1];
/* The prefix must be at least one byte long and end in a NUL terminator. */
2584 if (size < sizeof (*req) + 1
2585 || '\0' != method_prefix[size - sizeof (*req) - 1])
2587 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2588 "%p History replay #%" PRIu64 ": "
2589 "invalid method prefix. size: %u < %u?\n",
2591 GNUNET_ntohll (req->op_id),
2593 (unsigned int) sizeof (*req) + 1);
2595 GNUNET_SERVICE_client_drop (client);
2599 struct Operation *op = op_add (chn, client, req->op_id, ntohl (req->flags));
/* Zero is the same in either byte order, so no conversion is needed for this test. */
2601 if (0 == req->message_limit)
2603 GNUNET_PSYCSTORE_message_get (store, &chn->pub_key, NULL,
2604 GNUNET_ntohll (req->start_message_id),
2605 GNUNET_ntohll (req->end_message_id),
2607 &store_recv_fragment_history,
2608 &store_recv_fragment_history_result, op);
2612 GNUNET_PSYCSTORE_message_get_latest (store, &chn->pub_key, NULL,
2613 GNUNET_ntohll (req->message_limit),
2615 &store_recv_fragment_history,
2616 &store_recv_fragment_history_result,
2619 GNUNET_SERVICE_client_continue (client);
/*
 * store_recv_state_var(): cls is the struct Operation of a state request.
 * Each call produces one STATE_RESULT message for op->client.  With a name,
 * the payload is a MESSAGE_MODIFIER part (ASSIGN operator, name including
 * its NUL terminator, then the value); without a name it is a
 * MESSAGE_MOD_CONT part carrying only the value.
 */
2624 * Received state var from PSYCstore, send it to client.
2627 store_recv_state_var (void *cls, const char *name,
2628 const void *value, uint32_t value_size)
2630 struct Operation *op = cls;
2631 struct GNUNET_OperationResultMessage *res;
2632 struct GNUNET_MQ_Envelope *env;
/* NOTE(review): name is printed with %s before the NULL test below, although
   NULL is a handled value (continuation) -- confirm the log backend copes. */
2634 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2635 "%p state_get #%" PRIu64 " - received var from PSYCstore: %s\n",
2636 op->channel, GNUNET_ntohll (op->op_id), name);
2638 if (NULL != name) /* First part */
/* name_size counts the terminator; the length scan is capped at the modifier payload limit. */
2640 uint16_t name_size = strnlen (name, GNUNET_PSYC_MODIFIER_MAX_PAYLOAD) + 1;
2641 struct GNUNET_PSYC_MessageModifier *mod;
2642 env = GNUNET_MQ_msg_extra (res,
2643 sizeof (*mod) + name_size + value_size,
2644 GNUNET_MESSAGE_TYPE_PSYC_STATE_RESULT);
2645 res->op_id = op->op_id;
/* The modifier part is laid out directly behind the result header. */
2647 mod = (struct GNUNET_PSYC_MessageModifier *) &res[1];
2648 mod->header.size = htons (sizeof (*mod) + name_size + value_size);
2649 mod->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER);
2650 mod->name_size = htons (name_size);
2651 mod->value_size = htonl (value_size);
2652 mod->oper = htons (GNUNET_PSYC_OP_ASSIGN);
2653 GNUNET_memcpy (&mod[1], name, name_size);
2654 GNUNET_memcpy (((char *) &mod[1]) + name_size, value, value_size);
2656 else /* Continuation */
2658 struct GNUNET_MessageHeader *mod;
2659 env = GNUNET_MQ_msg_extra (res,
2660 sizeof (*mod) + value_size,
2661 GNUNET_MESSAGE_TYPE_PSYC_STATE_RESULT);
2662 res->op_id = op->op_id;
2664 mod = (struct GNUNET_MessageHeader *) &res[1];
2665 mod->size = htons (sizeof (*mod) + value_size);
2666 mod->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT);
2667 GNUNET_memcpy (&mod[1], value, value_size);
2670 // FIXME: client might have been disconnected
2671 GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (op->client), env);
/*
 * store_recv_state_result(): final result of a state request.  cls is the
 * struct Operation of the request; the result is logged and reported to
 * op->client via client_send_result().
 */
2677 * Received result of GNUNET_PSYCSTORE_state_get()
2678 * or GNUNET_PSYCSTORE_state_get_prefix()
2681 store_recv_state_result (void *cls, int64_t result,
2682 const char *err_msg, uint16_t err_msg_size)
2684 struct Operation *op = cls;
2685 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2686 "%p state_get #%" PRIu64 ": "
2687 "PSYCSTORE returned %" PRId64 " (%.*s)\n",
2688 op->channel, GNUNET_ntohll (op->op_id), result, err_msg_size, err_msg);
/* Unlike the membership and history callbacks, op->client is not NULL-checked here. */
2690 // FIXME: client might have been disconnected
2691 client_send_result (op->client, op->op_id, result, err_msg, err_msg_size);
/* Validation step for a state-get request (struct StateRequest followed by
 * a variable name). Returns GNUNET_SYSERR when the request is rejected. */
2697 check_client_state_get (void *cls,
2698 const struct StateRequest *req)
2700 struct Client *c = cls;
2701 struct Channel *chn = c->channel;
/* NOTE(review): presumably taken when the client has no channel (chn is not
 * used for anything else here) -- confirm against the guarding condition. */
2705 return GNUNET_SYSERR;
/* Length of the name: total message size from the header minus the
 * fixed-size StateRequest part. */
2708 uint16_t name_size = ntohs (req->header.size) - sizeof (*req);
/* The name starts directly behind the fixed-size request struct. */
2709 const char *name = (const char *) &req[1];
/* Reject an empty name and a name whose last byte is not '\0', so the
 * handler can treat it as a C string. */
2710 if (0 == name_size || '\0' != name[name_size - 1])
2713 return GNUNET_SYSERR;
2721 * Client requests best matching state variable from PSYCstore.
/* Handler for a state-get request: starts a GNUNET_PSYCSTORE_state_get()
 * query for the name carried in the request on the client's channel. */
2724 handle_client_state_get (void *cls,
2725 const struct StateRequest *req)
2727 struct Client *c = cls;
2728 struct GNUNET_SERVICE_Client *client = c->client;
2729 struct Channel *chn = c->channel;
/* The variable name follows the fixed-size request struct. */
2731 const char *name = (const char *) &req[1];
/* Record an operation for this channel, client and request op_id; the
 * returned handle is used as closure for both PSYCstore callbacks. */
2732 struct Operation *op = op_add (chn, client, req->op_id, 0);
/* store_recv_state_var is the per-variable callback,
 * store_recv_state_result the final result callback. */
2733 GNUNET_PSYCSTORE_state_get (store, &chn->pub_key, name,
2734 &store_recv_state_var,
2735 &store_recv_state_result, op);
/* Tell the service library that this message has been handled so that
 * further messages from this client can be received. */
2736 GNUNET_SERVICE_client_continue (client);
/* Validation step for a state-get-prefix request (struct StateRequest
 * followed by a name prefix). Returns GNUNET_SYSERR when the request is
 * rejected. */
2741 check_client_state_get_prefix (void *cls,
2742 const struct StateRequest *req)
2744 struct Client *c = cls;
2745 struct Channel *chn = c->channel;
/* NOTE(review): presumably taken when the client has no channel (chn is not
 * used for anything else here) -- confirm against the guarding condition. */
2749 return GNUNET_SYSERR;
/* Length of the name: total message size from the header minus the
 * fixed-size StateRequest part. */
2752 uint16_t name_size = ntohs (req->header.size) - sizeof (*req);
/* The name starts directly behind the fixed-size request struct. */
2753 const char *name = (const char *) &req[1];
/* Reject an empty name and a name whose last byte is not '\0', so the
 * handler can treat it as a C string. */
2754 if (0 == name_size || '\0' != name[name_size - 1])
2757 return GNUNET_SYSERR;
2765 * Client requests state variables with a given prefix from PSYCstore.
/* Handler for a state-get-prefix request: starts a
 * GNUNET_PSYCSTORE_state_get_prefix() query for the name prefix carried in
 * the request on the client's channel. */
2768 handle_client_state_get_prefix (void *cls,
2769 const struct StateRequest *req)
2771 struct Client *c = cls;
2772 struct GNUNET_SERVICE_Client *client = c->client;
2773 struct Channel *chn = c->channel;
/* The name prefix follows the fixed-size request struct. */
2775 const char *name = (const char *) &req[1];
/* Record an operation for this channel, client and request op_id; the
 * returned handle is used as closure for both PSYCstore callbacks. */
2776 struct Operation *op = op_add (chn, client, req->op_id, 0);
/* Same callbacks as for the single-variable query: store_recv_state_var
 * per variable, store_recv_state_result for the final result. */
2777 GNUNET_PSYCSTORE_state_get_prefix (store, &chn->pub_key, name,
2778 &store_recv_state_var,
2779 &store_recv_state_result, op);
/* Tell the service library that this message has been handled so that
 * further messages from this client can be received. */
2780 GNUNET_SERVICE_client_continue (client);
2785 * Initialize the PSYC service.
2787 * @param cls Closure.
2788 * @param svc The initialized service.
2789 * @param c Configuration to use.
/* Service start-up: connects to the helper services and creates the
 * file-level lookup tables. */
2793 const struct GNUNET_CONFIGURATION_Handle *c,
2794 struct GNUNET_SERVICE_Handle *svc)
/* Connect to the PSYCstore and create the statistics handle (subsystem
 * name "psyc"), both using the configuration in cfg. */
2798 store = GNUNET_PSYCSTORE_connect (cfg);
2799 stats = GNUNET_STATISTICS_create ("psyc", cfg);
/* Hash maps keyed by the channel's pub_key_hash, each created with an
 * initial size of 1. masters and slaves pass GNUNET_YES as second argument,
 * channel_slaves and recv_cache pass GNUNET_NO (the do_not_copy_keys flag
 * of GNUNET_CONTAINER_multihashmap_create()). */
2800 masters = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
2801 slaves = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
2802 channel_slaves = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
2803 recv_cache = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
/* Register shutdown_task with the scheduler to be run on shutdown. */
2804 GNUNET_SCHEDULER_add_shutdown (&shutdown_task, NULL);
2809 * Define "main" method using service macro.
/* Arguments of the service definition: service options, the client
 * connect/disconnect callbacks, and one handler entry per client message
 * type. Each entry names the handler (e.g. client_state_get corresponds to
 * check_client_state_get / handle_client_state_get in this file), the
 * message type constant and the struct that starts the message.
 * GNUNET_MQ_hd_fixed_size is used for messages that consist of the struct
 * only, GNUNET_MQ_hd_var_size for messages with a variable-size payload
 * that is validated by the check_ function before the handle_ function
 * runs. */
2813 GNUNET_SERVICE_OPTION_NONE,
2815 &client_notify_connect,
2816 &client_notify_disconnect,
2818 GNUNET_MQ_hd_fixed_size (client_master_start,
2819 GNUNET_MESSAGE_TYPE_PSYC_MASTER_START,
2820 struct MasterStartRequest,
2822 GNUNET_MQ_hd_var_size (client_slave_join,
2823 GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN,
2824 struct SlaveJoinRequest,
2826 GNUNET_MQ_hd_var_size (client_join_decision,
2827 GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION,
2828 struct GNUNET_PSYC_JoinDecisionMessage,
2830 GNUNET_MQ_hd_fixed_size (client_part_request,
2831 GNUNET_MESSAGE_TYPE_PSYC_PART_REQUEST,
2832 struct GNUNET_MessageHeader,
2834 GNUNET_MQ_hd_var_size (client_psyc_message,
2835 GNUNET_MESSAGE_TYPE_PSYC_MESSAGE,
2836 struct GNUNET_MessageHeader,
2838 GNUNET_MQ_hd_fixed_size (client_membership_store,
2839 GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_MEMBERSHIP_STORE,
2840 struct ChannelMembershipStoreRequest,
2842 GNUNET_MQ_hd_var_size (client_history_replay,
2843 GNUNET_MESSAGE_TYPE_PSYC_HISTORY_REPLAY,
2844 struct GNUNET_PSYC_HistoryRequestMessage,
/* Both state queries share struct StateRequest as their message layout. */
2846 GNUNET_MQ_hd_var_size (client_state_get,
2847 GNUNET_MESSAGE_TYPE_PSYC_STATE_GET,
2848 struct StateRequest,
2850 GNUNET_MQ_hd_var_size (client_state_get_prefix,
2851 GNUNET_MESSAGE_TYPE_PSYC_STATE_GET_PREFIX,
2852 struct StateRequest,
2855 /* end of gnunet-service-psyc.c */