2 * This file is part of GNUnet
3 * (C) 2013 Christian Grothoff (and other contributing authors)
5 * GNUnet is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published
7 * by the Free Software Foundation; either version 3, or (at your
8 * option) any later version.
10 * GNUnet is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License for more details.
15 * You should have received a copy of the GNU General Public License
16 * along with GNUnet; see the file COPYING. If not, write to the
17 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 * Boston, MA 02111-1307, USA.
22 * @file psyc/gnunet-service-psyc.c
24 * @author Gabor X Toth
30 #include "gnunet_util_lib.h"
31 #include "gnunet_constants.h"
32 #include "gnunet_protocols.h"
33 #include "gnunet_statistics_service.h"
34 #include "gnunet_multicast_service.h"
35 #include "gnunet_psycstore_service.h"
36 #include "gnunet_psyc_service.h"
37 #include "gnunet_psyc_util_lib.h"
42 * Handle to our current configuration.
44 static const struct GNUNET_CONFIGURATION_Handle *cfg;
47 * Handle to the statistics service.
49 static struct GNUNET_STATISTICS_Handle *stats;
52 * Notification context, simplifies client broadcasts.
54 static struct GNUNET_SERVER_NotificationContext *nc;
57 * Handle to the PSYCstore.
59 static struct GNUNET_PSYCSTORE_Handle *store;
62 * All connected masters.
63 * Channel's pub_key_hash -> struct Master
65 static struct GNUNET_CONTAINER_MultiHashMap *masters;
68 * All connected slaves.
69 * Channel's pub_key_hash -> struct Slave
71 static struct GNUNET_CONTAINER_MultiHashMap *slaves;
74 * Connected slaves per channel.
75 * Channel's pub_key_hash -> Slave's pub_key -> struct Slave
77 static struct GNUNET_CONTAINER_MultiHashMap *channel_slaves;
81 * Message in the transmission queue.
83 struct TransmitMessage
85 struct TransmitMessage *prev;
86 struct TransmitMessage *next;
88 struct GNUNET_SERVER_Client *client;
91 * ID assigned to the message.
101 * @see enum MessageState
106 * Whether a message ACK has already been sent to the client.
107 * #GNUNET_YES or #GNUNET_NO
111 /* Followed by message */
116 * Cache for received message fragments.
117 * Message fragments are only sent to clients after all modifiers arrived.
119 * chan_key -> MultiHashMap chan_msgs
121 static struct GNUNET_CONTAINER_MultiHashMap *recv_cache;
125 * Entry in the chan_msgs hashmap of @a recv_cache:
126 * fragment_id -> RecvCacheEntry
128 struct RecvCacheEntry
130 struct GNUNET_MULTICAST_MessageHeader *mmsg;
136 * Entry in the @a recv_frags hash map of a @a Channel.
137 * message_id -> FragmentQueue
142 * Fragment IDs stored in @a recv_cache.
144 struct GNUNET_CONTAINER_Heap *fragments;
147 * Total size of received fragments.
152 * Total size of received header fragments (METHOD & MODIFIERs)
154 uint64_t header_size;
157 * The @a state_delta field from struct GNUNET_PSYC_MessageMethod.
159 uint64_t state_delta;
162 * The @a flags field from struct GNUNET_PSYC_MessageMethod.
167 * Receive state of message.
169 * @see MessageFragmentState
174 * Is the message queued for delivery to the client?
175 * i.e. added to the recv_msgs queue
182 * List of connected clients.
184 struct ClientListItem
186 struct ClientListItem *prev;
187 struct ClientListItem *next;
188 struct GNUNET_SERVER_Client *client;
193 * Common part of the client context for both a channel master and slave.
197 struct ClientListItem *clients_head;
198 struct ClientListItem *clients_tail;
200 struct TransmitMessage *tmit_head;
201 struct TransmitMessage *tmit_tail;
204 * Current PSYCstore operation.
206 struct GNUNET_PSYCSTORE_OperationHandle *store_op;
209 * Received fragments not yet sent to the client.
210 * message_id -> FragmentQueue
212 struct GNUNET_CONTAINER_MultiHashMap *recv_frags;
215 * Received message IDs not yet sent to the client.
217 struct GNUNET_CONTAINER_Heap *recv_msgs;
220 * Public key of the channel.
222 struct GNUNET_CRYPTO_EddsaPublicKey pub_key;
225 * Hash of @a pub_key.
227 struct GNUNET_HashCode pub_key_hash;
230 * Last message ID sent to the client.
231 * 0 if there is no such message.
233 uint64_t max_message_id;
236 * ID of the last stateful message, where the state operations has been
237 * processed and saved to PSYCstore and which has been sent to the client.
238 * 0 if there is no such message.
240 uint64_t max_state_message_id;
243 * Expected value size for the modifier being received from the PSYC service.
245 uint32_t tmit_mod_value_size_expected;
248 * Actual value size for the modifier being received from the PSYC service.
250 uint32_t tmit_mod_value_size;
253 * @see enum MessageState
263 * Is this a channel master (#GNUNET_YES), or slave (#GNUNET_NO)?
268 * Is this channel ready to receive messages from client?
269 * #GNUNET_YES or #GNUNET_NO
274 * Is the client disconnected?
275 * #GNUNET_YES or #GNUNET_NO
277 uint8_t is_disconnected;
282 * Client context for a channel master.
287 * Channel struct common for Master and Slave
292 * Private key of the channel.
294 struct GNUNET_CRYPTO_EddsaPrivateKey priv_key;
297 * Handle for the multicast origin.
299 struct GNUNET_MULTICAST_Origin *origin;
302 * Transmit handle for multicast.
304 struct GNUNET_MULTICAST_OriginTransmitHandle *tmit_handle;
307 * Incoming join requests from multicast.
308 * member_key -> struct GNUNET_MULTICAST_JoinHandle *
310 struct GNUNET_CONTAINER_MultiHashMap *join_reqs;
313 * Last message ID transmitted to this channel.
315 * Incremented before sending a message, thus the message_id in messages sent
318 uint64_t max_message_id;
321 * ID of the last message with state operations transmitted to the channel.
322 * 0 if there is no such message.
324 uint64_t max_state_message_id;
327 * Maximum group generation transmitted to the channel.
329 uint64_t max_group_generation;
332 * @see enum GNUNET_PSYC_Policy
334 enum GNUNET_PSYC_Policy policy;
339 * Client context for a channel slave.
344 * Channel struct common for Master and Slave
349 * Private key of the slave.
351 struct GNUNET_CRYPTO_EcdsaPrivateKey priv_key;
354 * Public key of the slave.
356 struct GNUNET_CRYPTO_EcdsaPublicKey pub_key;
359 * Hash of @a pub_key.
361 struct GNUNET_HashCode pub_key_hash;
364 * Handle for the multicast member.
366 struct GNUNET_MULTICAST_Member *member;
369 * Transmit handle for multicast.
371 struct GNUNET_MULTICAST_MemberTransmitHandle *tmit_handle;
374 * Peer identity of the origin.
376 struct GNUNET_PeerIdentity origin;
379 * Number of items in @a relays.
381 uint32_t relay_count;
384 * Relays that multicast can use to connect.
386 struct GNUNET_PeerIdentity *relays;
389 * Join request to be transmitted to the master on join.
391 struct GNUNET_PSYC_Message *join_msg;
394 * Join decision received from multicast.
396 struct GNUNET_PSYC_JoinDecisionMessage *join_dcsn;
399 * Maximum request ID for this channel.
401 uint64_t max_request_id;
406 transmit_message (struct Channel *chn);
410 message_queue_drop (struct Channel *chn);
414 * Task run during shutdown.
420 shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
424 GNUNET_SERVER_notification_context_destroy (nc);
429 GNUNET_STATISTICS_destroy (stats, GNUNET_YES);
436 * Clean up master data structures after a client disconnected.
439 cleanup_master (struct Master *mst)
441 struct Channel *chn = &mst->chn;
443 if (NULL != mst->origin)
444 GNUNET_MULTICAST_origin_stop (mst->origin, NULL, NULL); // FIXME
445 GNUNET_CONTAINER_multihashmap_destroy (mst->join_reqs);
446 GNUNET_CONTAINER_multihashmap_remove (masters, &chn->pub_key_hash, chn);
451 * Clean up slave data structures after a client disconnected.
454 cleanup_slave (struct Slave *slv)
456 struct Channel *chn = &slv->chn;
457 struct GNUNET_CONTAINER_MultiHashMap *
458 chn_slv = GNUNET_CONTAINER_multihashmap_get (channel_slaves,
460 GNUNET_assert (NULL != chn_slv);
461 GNUNET_CONTAINER_multihashmap_remove (chn_slv, &slv->pub_key_hash, slv);
463 if (0 == GNUNET_CONTAINER_multihashmap_size (chn_slv))
465 GNUNET_CONTAINER_multihashmap_remove (channel_slaves, &chn->pub_key_hash,
467 GNUNET_CONTAINER_multihashmap_destroy (chn_slv);
469 GNUNET_CONTAINER_multihashmap_remove (slaves, &chn->pub_key_hash, slv);
471 if (NULL != slv->join_msg)
473 GNUNET_free (slv->join_msg);
474 slv->join_msg = NULL;
476 if (NULL != slv->relays)
478 GNUNET_free (slv->relays);
481 if (NULL != slv->member)
483 GNUNET_MULTICAST_member_part (slv->member, NULL, NULL); // FIXME
486 GNUNET_CONTAINER_multihashmap_remove (slaves, &chn->pub_key_hash, chn);
491 * Clean up channel data structures after a client disconnected.
494 cleanup_channel (struct Channel *chn)
496 message_queue_drop (chn);
497 GNUNET_CONTAINER_multihashmap_remove_all (recv_cache, &chn->pub_key_hash);
499 if (NULL != chn->store_op)
501 GNUNET_PSYCSTORE_operation_cancel (chn->store_op);
502 chn->store_op = NULL;
505 (GNUNET_YES == chn->is_master)
506 ? cleanup_master ((struct Master *) chn)
507 : cleanup_slave ((struct Slave *) chn);
513 * Called whenever a client is disconnected.
514 * Frees our resources associated with that client.
516 * @param cls Closure.
517 * @param client Identification of the client.
520 client_disconnect (void *cls, struct GNUNET_SERVER_Client *client)
526 chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
530 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
531 "%p User context is NULL in client_disconnect()\n", chn);
536 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
537 "%p Client (%s) disconnected from channel %s\n",
538 chn, (GNUNET_YES == chn->is_master) ? "master" : "slave",
539 GNUNET_h2s (&chn->pub_key_hash));
541 struct ClientListItem *cli = chn->clients_head;
544 if (cli->client == client)
546 GNUNET_CONTAINER_DLL_remove (chn->clients_head, chn->clients_tail, cli);
553 if (NULL == chn->clients_head)
554 { /* Last client disconnected. */
555 if (NULL != chn->tmit_head)
556 { /* Send pending messages to multicast before cleanup. */
557 transmit_message (chn);
561 cleanup_channel (chn);
568 * Send message to all clients connected to the channel.
571 client_send_msg (const struct Channel *chn,
572 const struct GNUNET_MessageHeader *msg)
574 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
575 "%p Sending message to clients.\n", chn);
577 struct ClientListItem *cli = chn->clients_head;
580 GNUNET_SERVER_notification_context_add (nc, cli->client);
581 GNUNET_SERVER_notification_context_unicast (nc, cli->client, msg, GNUNET_NO);
588 * Closure for join_mem_test_cb()
590 struct JoinMemTestClosure
592 struct GNUNET_CRYPTO_EcdsaPublicKey slave_key;
594 struct GNUNET_MULTICAST_JoinHandle *jh;
595 struct GNUNET_PSYC_JoinRequestMessage *join_msg;
600 * Membership test result callback used for join requests.
603 join_mem_test_cb (void *cls, int64_t result, const char *err_msg)
605 struct JoinMemTestClosure *jcls = cls;
607 if (GNUNET_NO == result && GNUNET_YES == jcls->chn->is_master)
608 { /* Pass on join request to client if this is a master channel */
609 struct Master *mst = (struct Master *) jcls->chn;
610 struct GNUNET_HashCode slave_key_hash;
611 GNUNET_CRYPTO_hash (&jcls->slave_key, sizeof (jcls->slave_key),
613 GNUNET_CONTAINER_multihashmap_put (mst->join_reqs, &slave_key_hash, jcls->jh,
614 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
615 client_send_msg (jcls->chn, &jcls->join_msg->header);
620 GNUNET_MULTICAST_join_decision (jcls->jh, result, 0, NULL, NULL);
622 GNUNET_free (jcls->join_msg);
628 * Incoming join request from multicast.
631 mcast_recv_join_request (void *cls,
632 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
633 const struct GNUNET_MessageHeader *join_msg,
634 struct GNUNET_MULTICAST_JoinHandle *jh)
636 struct Channel *chn = cls;
637 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p Got join request.\n", chn);
639 uint16_t join_msg_size = 0;
640 if (NULL != join_msg)
642 if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE == ntohs (join_msg->type))
644 join_msg_size = ntohs (join_msg->size);
648 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
649 "%p Got join message with invalid type %u.\n",
650 chn, ntohs (join_msg->type));
654 struct GNUNET_PSYC_JoinRequestMessage *
655 req = GNUNET_malloc (sizeof (*req) + join_msg_size);
656 req->header.size = htons (sizeof (*req) + join_msg_size);
657 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_JOIN_REQUEST);
658 req->slave_key = *slave_key;
659 if (0 < join_msg_size)
660 memcpy (&req[1], join_msg, join_msg_size);
662 struct JoinMemTestClosure *jcls = GNUNET_malloc (sizeof (*jcls));
663 jcls->slave_key = *slave_key;
666 jcls->join_msg = req;
668 GNUNET_PSYCSTORE_membership_test (store, &chn->pub_key, slave_key,
669 chn->max_message_id, 0,
670 &join_mem_test_cb, jcls);
675 * Join decision received from multicast.
678 mcast_recv_join_decision (void *cls, int is_admitted,
679 const struct GNUNET_PeerIdentity *peer,
680 uint16_t relay_count,
681 const struct GNUNET_PeerIdentity *relays,
682 const struct GNUNET_MessageHeader *join_resp)
684 struct Slave *slv = cls;
685 struct Channel *chn = &slv->chn;
686 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
687 "%p Got join decision: %d\n", slv, is_admitted);
689 uint16_t join_resp_size = (NULL != join_resp) ? ntohs (join_resp->size) : 0;
690 struct GNUNET_PSYC_JoinDecisionMessage *
691 dcsn = slv->join_dcsn = GNUNET_malloc (sizeof (*dcsn) + join_resp_size);
692 dcsn->header.size = htons (sizeof (*dcsn) + join_resp_size);
693 dcsn->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION);
694 dcsn->is_admitted = htonl (is_admitted);
695 if (0 < join_resp_size)
696 memcpy (&dcsn[1], join_resp, join_resp_size);
698 client_send_msg (chn, &dcsn->header);
700 if (GNUNET_YES == is_admitted)
702 chn->is_ready = GNUNET_YES;
712 * Received result of GNUNET_PSYCSTORE_membership_test()
715 store_recv_membership_test_result (void *cls, int64_t result, const char *err_msg)
717 struct GNUNET_MULTICAST_MembershipTestHandle *mth = cls;
718 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
719 "%p GNUNET_PSYCSTORE_membership_test() returned %" PRId64 " (%s)\n",
720 mth, result, err_msg);
722 GNUNET_MULTICAST_membership_test_result (mth, result);
727 * Incoming membership test request from multicast.
730 mcast_recv_membership_test (void *cls,
731 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
732 uint64_t message_id, uint64_t group_generation,
733 struct GNUNET_MULTICAST_MembershipTestHandle *mth)
735 struct Channel *chn = cls;
736 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
737 "%p Received membership test request from multicast.\n",
739 GNUNET_PSYCSTORE_membership_test (store, &chn->pub_key, slave_key,
740 message_id, group_generation,
741 &store_recv_membership_test_result, mth);
746 store_recv_fragment_replay (void *cls,
747 struct GNUNET_MULTICAST_MessageHeader *msg,
748 enum GNUNET_PSYCSTORE_MessageFlags flags)
750 struct GNUNET_MULTICAST_ReplayHandle *rh = cls;
752 GNUNET_MULTICAST_replay_response (rh, &msg->header, GNUNET_MULTICAST_REC_OK);
758 * Received result of GNUNET_PSYCSTORE_fragment_get() for multicast replay.
761 store_recv_fragment_replay_result (void *cls, int64_t result, const char *err_msg)
763 struct GNUNET_MULTICAST_ReplayHandle *rh = cls;
764 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
765 "%p Fragment replay: PSYCSTORE returned %" PRId64 " (%s)\n",
766 rh, result, err_msg);
774 GNUNET_MULTICAST_replay_response (rh, NULL,
775 GNUNET_MULTICAST_REC_NOT_FOUND);
778 case GNUNET_PSYCSTORE_MEMBERSHIP_TEST_FAILED:
779 GNUNET_MULTICAST_replay_response (rh, NULL,
780 GNUNET_MULTICAST_REC_ACCESS_DENIED);
783 GNUNET_MULTICAST_replay_response (rh, NULL,
784 GNUNET_MULTICAST_REC_INTERNAL_ERROR);
787 GNUNET_MULTICAST_replay_response_end (rh);
792 * Incoming fragment replay request from multicast.
795 mcast_recv_replay_fragment (void *cls,
796 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
797 uint64_t fragment_id, uint64_t flags,
798 struct GNUNET_MULTICAST_ReplayHandle *rh)
801 struct Channel *chn = cls;
802 GNUNET_PSYCSTORE_fragment_get (store, &chn->pub_key, slave_key, fragment_id,
803 &store_recv_fragment_replay,
804 &store_recv_fragment_replay_result, rh);
809 * Incoming message replay request from multicast.
812 mcast_recv_replay_message (void *cls,
813 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
815 uint64_t fragment_offset,
817 struct GNUNET_MULTICAST_ReplayHandle *rh)
819 struct Channel *chn = cls;
820 GNUNET_PSYCSTORE_message_get (store, &chn->pub_key, slave_key, message_id,
821 &store_recv_fragment_replay,
822 &store_recv_fragment_replay_result, rh);
827 * Convert an uint64_t in network byte order to a HashCode
828 * that can be used as key in a MultiHashMap
831 hash_key_from_nll (struct GNUNET_HashCode *key, uint64_t n)
833 /* use little-endian order, as idx_of MultiHashMap casts key to unsigned int */
834 /* TODO: use built-in byte swap functions if available */
836 n = ((n << 8) & 0xFF00FF00FF00FF00ULL) | ((n >> 8) & 0x00FF00FF00FF00FFULL);
837 n = ((n << 16) & 0xFFFF0000FFFF0000ULL) | ((n >> 16) & 0x0000FFFF0000FFFFULL);
839 *key = (struct GNUNET_HashCode) {};
841 = (n << 32) | (n >> 32);
846 * Convert an uint64_t in host byte order to a HashCode
847 * that can be used as key in a MultiHashMap
850 hash_key_from_hll (struct GNUNET_HashCode *key, uint64_t n)
852 #if __BYTE_ORDER == __BIG_ENDIAN
853 hash_key_from_nll (key, n);
854 #elif __BYTE_ORDER == __LITTLE_ENDIAN
855 *key = (struct GNUNET_HashCode) {};
856 *((uint64_t *) key) = n;
858 #error byteorder undefined
864 * Send multicast message to all clients connected to the channel.
867 client_send_mcast_msg (struct Channel *chn,
868 const struct GNUNET_MULTICAST_MessageHeader *mmsg)
870 struct GNUNET_PSYC_MessageHeader *pmsg;
871 uint16_t size = ntohs (mmsg->header.size);
872 uint16_t psize = sizeof (*pmsg) + size - sizeof (*mmsg);
874 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
875 "%p Sending multicast message to client. "
876 "fragment_id: %" PRIu64 ", message_id: %" PRIu64 "\n",
877 chn, GNUNET_ntohll (mmsg->fragment_id),
878 GNUNET_ntohll (mmsg->message_id));
880 pmsg = GNUNET_malloc (psize);
881 pmsg->header.size = htons (psize);
882 pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
883 pmsg->message_id = mmsg->message_id;
884 pmsg->fragment_offset = mmsg->fragment_offset;
886 memcpy (&pmsg[1], &mmsg[1], size - sizeof (*mmsg));
887 client_send_msg (chn, &pmsg->header);
893 * Send multicast request to all clients connected to the channel.
896 client_send_mcast_req (struct Master *mst,
897 const struct GNUNET_MULTICAST_RequestHeader *req)
899 struct Channel *chn = &mst->chn;
901 struct GNUNET_PSYC_MessageHeader *pmsg;
902 uint16_t size = ntohs (req->header.size);
903 uint16_t psize = sizeof (*pmsg) + size - sizeof (*req);
905 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
906 "%p Sending multicast request to client. "
907 "fragment_id: %" PRIu64 ", message_id: %" PRIu64 "\n",
908 chn, GNUNET_ntohll (req->fragment_id),
909 GNUNET_ntohll (req->request_id));
911 pmsg = GNUNET_malloc (psize);
912 pmsg->header.size = htons (psize);
913 pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
914 pmsg->message_id = req->request_id;
915 pmsg->fragment_offset = req->fragment_offset;
916 pmsg->flags = htonl (GNUNET_PSYC_MESSAGE_REQUEST);
918 memcpy (&pmsg[1], &req[1], size - sizeof (*req));
919 client_send_msg (chn, &pmsg->header);
925 * Insert a multicast message fragment into the queue belonging to the message.
927 * @param chn Channel.
928 * @param mmsg Multicast message fragment.
929 * @param msg_id_hash Message ID of @a mmsg in a struct GNUNET_HashCode.
930 * @param first_ptype First PSYC message part type in @a mmsg.
931 * @param last_ptype Last PSYC message part type in @a mmsg.
934 fragment_queue_insert (struct Channel *chn,
935 const struct GNUNET_MULTICAST_MessageHeader *mmsg,
936 uint16_t first_ptype, uint16_t last_ptype)
938 const uint16_t size = ntohs (mmsg->header.size);
939 const uint64_t frag_offset = GNUNET_ntohll (mmsg->fragment_offset);
940 struct GNUNET_CONTAINER_MultiHashMap
941 *chan_msgs = GNUNET_CONTAINER_multihashmap_get (recv_cache,
944 struct GNUNET_HashCode msg_id_hash;
945 hash_key_from_nll (&msg_id_hash, mmsg->message_id);
948 *fragq = GNUNET_CONTAINER_multihashmap_get (chn->recv_frags, &msg_id_hash);
952 fragq = GNUNET_new (struct FragmentQueue);
953 fragq->state = MSG_FRAG_STATE_HEADER;
955 = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
957 GNUNET_CONTAINER_multihashmap_put (chn->recv_frags, &msg_id_hash, fragq,
958 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
960 if (NULL == chan_msgs)
962 chan_msgs = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
963 GNUNET_CONTAINER_multihashmap_put (recv_cache, &chn->pub_key_hash, chan_msgs,
964 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
968 struct GNUNET_HashCode frag_id_hash;
969 hash_key_from_nll (&frag_id_hash, mmsg->fragment_id);
970 struct RecvCacheEntry
971 *cache_entry = GNUNET_CONTAINER_multihashmap_get (chan_msgs, &frag_id_hash);
972 if (NULL == cache_entry)
974 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
975 "%p Adding message fragment to cache. "
976 "message_id: %" PRIu64 ", fragment_id: %" PRIu64 "\n",
977 chn, GNUNET_ntohll (mmsg->message_id),
978 GNUNET_ntohll (mmsg->fragment_id));
979 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
980 "%p header_size: %" PRIu64 " + %u\n",
981 chn, fragq->header_size, size);
982 cache_entry = GNUNET_new (struct RecvCacheEntry);
983 cache_entry->ref_count = 1;
984 cache_entry->mmsg = GNUNET_malloc (size);
985 memcpy (cache_entry->mmsg, mmsg, size);
986 GNUNET_CONTAINER_multihashmap_put (chan_msgs, &frag_id_hash, cache_entry,
987 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
991 cache_entry->ref_count++;
992 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
993 "%p Message fragment is already in cache. "
994 "message_id: %" PRIu64 ", fragment_id: %" PRIu64
996 chn, GNUNET_ntohll (mmsg->message_id),
997 GNUNET_ntohll (mmsg->fragment_id), cache_entry->ref_count);
1000 if (MSG_FRAG_STATE_HEADER == fragq->state)
1002 if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype)
1004 struct GNUNET_PSYC_MessageMethod *
1005 pmeth = (struct GNUNET_PSYC_MessageMethod *) &mmsg[1];
1006 fragq->state_delta = GNUNET_ntohll (pmeth->state_delta);
1007 fragq->flags = ntohl (pmeth->flags);
1010 if (last_ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA)
1012 fragq->header_size += size;
1014 else if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype
1015 || frag_offset == fragq->header_size)
1016 { /* header is now complete */
1017 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1018 "%p Header of message %" PRIu64 " is complete.\n",
1019 chn, GNUNET_ntohll (mmsg->message_id));
1021 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1022 "%p Adding message %" PRIu64 " to queue.\n",
1023 chn, GNUNET_ntohll (mmsg->message_id));
1024 fragq->state = MSG_FRAG_STATE_DATA;
1028 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1029 "%p Header of message %" PRIu64 " is NOT complete yet: "
1030 "%" PRIu64 " != %" PRIu64 "\n",
1031 chn, GNUNET_ntohll (mmsg->message_id), frag_offset,
1032 fragq->header_size);
1038 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END:
1039 if (frag_offset == fragq->size)
1040 fragq->state = MSG_FRAG_STATE_END;
1042 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1043 "%p Message %" PRIu64 " is NOT complete yet: "
1044 "%" PRIu64 " != %" PRIu64 "\n",
1045 chn, GNUNET_ntohll (mmsg->message_id), frag_offset,
1049 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL:
1050 /* Drop message without delivering to client if it's a single fragment */
1052 (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype)
1053 ? MSG_FRAG_STATE_DROP
1054 : MSG_FRAG_STATE_CANCEL;
1057 switch (fragq->state)
1059 case MSG_FRAG_STATE_DATA:
1060 case MSG_FRAG_STATE_END:
1061 case MSG_FRAG_STATE_CANCEL:
1062 if (GNUNET_NO == fragq->is_queued)
1064 GNUNET_CONTAINER_heap_insert (chn->recv_msgs, NULL,
1065 GNUNET_ntohll (mmsg->message_id));
1066 fragq->is_queued = GNUNET_YES;
1070 fragq->size += size;
1071 GNUNET_CONTAINER_heap_insert (fragq->fragments, NULL,
1072 GNUNET_ntohll (mmsg->fragment_id));
1077 * Run fragment queue of a message.
1079 * Send fragments of a message in order to client, after all modifiers arrived
1082 * @param chn Channel.
1083 * @param msg_id ID of the message @a fragq belongs to.
1084 * @param fragq Fragment queue of the message.
1085 * @param drop Drop message without delivering to client?
1086 * #GNUNET_YES or #GNUNET_NO.
1089 fragment_queue_run (struct Channel *chn, uint64_t msg_id,
1090 struct FragmentQueue *fragq, uint8_t drop)
1092 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1093 "%p Running message fragment queue for message %" PRIu64
1095 chn, msg_id, fragq->state);
1097 struct GNUNET_CONTAINER_MultiHashMap
1098 *chan_msgs = GNUNET_CONTAINER_multihashmap_get (recv_cache,
1099 &chn->pub_key_hash);
1100 GNUNET_assert (NULL != chan_msgs);
1103 while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (fragq->fragments, NULL,
1106 struct GNUNET_HashCode frag_id_hash;
1107 hash_key_from_hll (&frag_id_hash, frag_id);
1108 struct RecvCacheEntry *cache_entry
1109 = GNUNET_CONTAINER_multihashmap_get (chan_msgs, &frag_id_hash);
1110 if (cache_entry != NULL)
1112 if (GNUNET_NO == drop)
1114 client_send_mcast_msg (chn, cache_entry->mmsg);
1116 if (cache_entry->ref_count <= 1)
1118 GNUNET_CONTAINER_multihashmap_remove (chan_msgs, &frag_id_hash,
1120 GNUNET_free (cache_entry->mmsg);
1121 GNUNET_free (cache_entry);
1125 cache_entry->ref_count--;
1128 #if CACHE_AGING_IMPLEMENTED
1129 else if (GNUNET_NO == drop)
1131 /* TODO: fragment not in cache anymore, retrieve it from PSYCstore */
1135 GNUNET_CONTAINER_heap_remove_root (fragq->fragments);
1138 if (MSG_FRAG_STATE_END <= fragq->state)
1140 struct GNUNET_HashCode msg_id_hash;
1141 hash_key_from_hll (&msg_id_hash, msg_id);
1143 GNUNET_CONTAINER_multihashmap_remove (chn->recv_frags, &msg_id_hash, fragq);
1144 GNUNET_CONTAINER_heap_destroy (fragq->fragments);
1145 GNUNET_free (fragq);
1149 fragq->is_queued = GNUNET_NO;
1155 * Run message queue.
1157 * Send messages in queue to client in order after a message has arrived from
1158 * multicast, according to the following:
1159 * - A message is only sent if all of its modifiers arrived.
1160 * - A stateful message is only sent if the previous stateful message
1161 * has already been delivered to the client.
1163 * @param chn Channel.
1165 * @return Number of messages removed from queue and sent to client.
1168 message_queue_run (struct Channel *chn)
1170 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1171 "%p Running message queue.\n", chn);
1174 while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (chn->recv_msgs, NULL,
1177 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1178 "%p Processing message %" PRIu64 " in queue.\n", chn, msg_id);
1179 struct GNUNET_HashCode msg_id_hash;
1180 hash_key_from_hll (&msg_id_hash, msg_id);
1182 struct FragmentQueue *
1183 fragq = GNUNET_CONTAINER_multihashmap_get (chn->recv_frags, &msg_id_hash);
1185 if (NULL == fragq || fragq->state <= MSG_FRAG_STATE_HEADER)
1187 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1188 "%p No fragq (%p) or header not complete.\n",
1193 if (MSG_FRAG_STATE_HEADER == fragq->state)
1195 /* Check if there's a missing message before the current one */
1196 if (GNUNET_PSYC_STATE_NOT_MODIFIED == fragq->state_delta)
1198 if (!(fragq->flags & GNUNET_PSYC_MESSAGE_ORDER_ANY)
1199 && msg_id - 1 != chn->max_message_id)
1201 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1202 "%p Out of order message. "
1203 "(%" PRIu64 " - 1 != %" PRIu64 ")\n",
1204 chn, msg_id, chn->max_message_id);
1210 if (msg_id - fragq->state_delta != chn->max_state_message_id)
1212 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1213 "%p Out of order stateful message. "
1214 "(%" PRIu64 " - %" PRIu64 " != %" PRIu64 ")\n",
1215 chn, msg_id, fragq->state_delta, chn->max_state_message_id);
1219 /* FIXME: apply modifiers to state in PSYCstore */
1220 GNUNET_PSYCSTORE_state_modify (store, &chn->pub_key, message_id,
1221 store_recv_state_modify_result, cls);
1223 chn->max_state_message_id = msg_id;
1225 chn->max_message_id = msg_id;
1227 fragment_queue_run (chn, msg_id, fragq, MSG_FRAG_STATE_DROP == fragq->state);
1228 GNUNET_CONTAINER_heap_remove_root (chn->recv_msgs);
1231 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1232 "%p Removed %" PRIu64 " messages from queue.\n", chn, n);
1238 * Drop message queue of a channel.
1240 * Remove all messages in queue without sending it to clients.
1242 * @param chn Channel.
1244 * @return Number of messages removed from queue.
1247 message_queue_drop (struct Channel *chn)
1249 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1250 "%p Dropping message queue.\n", chn);
1253 while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (chn->recv_msgs, NULL,
1256 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1257 "%p Dropping message %" PRIu64 " from queue.\n", chn, msg_id);
1258 struct GNUNET_HashCode msg_id_hash;
1259 hash_key_from_hll (&msg_id_hash, msg_id);
1261 struct FragmentQueue *
1262 fragq = GNUNET_CONTAINER_multihashmap_get (chn->recv_frags, &msg_id_hash);
1264 fragment_queue_run (chn, msg_id, fragq, GNUNET_YES);
1265 GNUNET_CONTAINER_heap_remove_root (chn->recv_msgs);
1268 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1269 "%p Removed %" PRIu64 " messages from queue.\n", chn, n);
1275 * Received result of GNUNET_PSYCSTORE_fragment_store().
1278 store_recv_fragment_store_result (void *cls, int64_t result, const char *err_msg)
1280 struct Channel *chn = cls;
1281 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1282 "%p GNUNET_PSYCSTORE_fragment_store() returned %" PRId64 " (%s)\n",
1283 chn, result, err_msg);
1288 * Handle incoming message fragment from multicast.
1290 * Store it using PSYCstore and send it to the clients of the channel in order.
1293 mcast_recv_message (void *cls, const struct GNUNET_MULTICAST_MessageHeader *mmsg)
1295 struct Channel *chn = cls;
1296 uint16_t size = ntohs (mmsg->header.size);
1298 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1299 "%p Received multicast message of size %u.\n",
1302 GNUNET_PSYCSTORE_fragment_store (store, &chn->pub_key, mmsg, 0,
1303 &store_recv_fragment_store_result, chn);
1305 uint16_t first_ptype = 0, last_ptype = 0;
1307 == GNUNET_PSYC_receive_check_parts (size - sizeof (*mmsg),
1308 (const char *) &mmsg[1],
1309 &first_ptype, &last_ptype))
1311 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1312 "%p Dropping incoming multicast message with invalid parts.\n",
1314 GNUNET_break_op (0);
1318 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1319 "Message parts: first: type %u, last: type %u\n",
1320 first_ptype, last_ptype);
1322 fragment_queue_insert (chn, mmsg, first_ptype, last_ptype);
1323 message_queue_run (chn);
1328 * Incoming request fragment from multicast for a master.
1330 * @param cls Master.
1331 * @param req The request.
1334 mcast_recv_request (void *cls,
1335 const struct GNUNET_MULTICAST_RequestHeader *req)
1337 struct Master *mst = cls;
1338 uint16_t size = ntohs (req->header.size);
1340 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1341 "%p Received multicast request of size %u.\n",
1344 uint16_t first_ptype = 0, last_ptype = 0;
1346 == GNUNET_PSYC_receive_check_parts (size - sizeof (*req),
1347 (const char *) &req[1],
1348 &first_ptype, &last_ptype))
1350 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1351 "%p Dropping incoming multicast request with invalid parts.\n",
1353 GNUNET_break_op (0);
1357 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1358 "Message parts: first: type %u, last: type %u\n",
1359 first_ptype, last_ptype);
1361 /* FIXME: in-order delivery */
1362 client_send_mcast_req (mst, req);
1367 * Response from PSYCstore with the current counter values for a channel master.
1370 store_recv_master_counters (void *cls, int result, uint64_t max_fragment_id,
1371 uint64_t max_message_id, uint64_t max_group_generation,
1372 uint64_t max_state_message_id)
1374 struct Master *mst = cls;
1375 struct Channel *chn = &mst->chn;
1376 chn->store_op = NULL;
1378 struct CountersResult res;
1379 res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK);
1380 res.header.size = htons (sizeof (res));
1381 res.result_code = htonl (result);
1382 res.max_message_id = GNUNET_htonll (max_message_id);
1384 if (GNUNET_OK == result || GNUNET_NO == result)
1386 mst->max_message_id = max_message_id;
1387 chn->max_message_id = max_message_id;
1388 chn->max_state_message_id = max_state_message_id;
1389 mst->max_group_generation = max_group_generation;
1391 = GNUNET_MULTICAST_origin_start (cfg, &mst->priv_key, max_fragment_id,
1392 &mcast_recv_join_request,
1393 &mcast_recv_membership_test,
1394 &mcast_recv_replay_fragment,
1395 &mcast_recv_replay_message,
1396 &mcast_recv_request,
1397 &mcast_recv_message, chn);
1398 chn->is_ready = GNUNET_YES;
1402 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1403 "%p GNUNET_PSYCSTORE_counters_get() "
1404 "returned %d for channel %s.\n",
1405 chn, result, GNUNET_h2s (&chn->pub_key_hash));
1408 client_send_msg (chn, &res.header);
1413 * Response from PSYCstore with the current counter values for a channel slave.
1416 store_recv_slave_counters (void *cls, int result, uint64_t max_fragment_id,
1417 uint64_t max_message_id, uint64_t max_group_generation,
1418 uint64_t max_state_message_id)
1420 struct Slave *slv = cls;
1421 struct Channel *chn = &slv->chn;
1422 chn->store_op = NULL;
1424 struct CountersResult res;
1425 res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK);
1426 res.header.size = htons (sizeof (res));
1427 res.result_code = htonl (result);
1428 res.max_message_id = GNUNET_htonll (max_message_id);
1430 if (GNUNET_OK == result || GNUNET_NO == result)
1432 chn->max_message_id = max_message_id;
1433 chn->max_state_message_id = max_state_message_id;
1435 = GNUNET_MULTICAST_member_join (cfg, &chn->pub_key, &slv->priv_key,
1437 slv->relay_count, slv->relays,
1438 &slv->join_msg->header,
1439 &mcast_recv_join_request,
1440 &mcast_recv_join_decision,
1441 &mcast_recv_membership_test,
1442 &mcast_recv_replay_fragment,
1443 &mcast_recv_replay_message,
1444 &mcast_recv_message, chn);
1445 if (NULL != slv->join_msg)
1447 GNUNET_free (slv->join_msg);
1448 slv->join_msg = NULL;
1453 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1454 "%p GNUNET_PSYCSTORE_counters_get() "
1455 "returned %d for channel %s.\n",
1456 chn, result, GNUNET_h2s (&chn->pub_key_hash));
1459 client_send_msg (chn, &res.header);
1464 channel_init (struct Channel *chn)
1467 = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
1468 chn->recv_frags = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
1473 * Handle a connecting client starting a channel master.
1476 client_recv_master_start (void *cls, struct GNUNET_SERVER_Client *client,
1477 const struct GNUNET_MessageHeader *msg)
1479 const struct MasterStartRequest *req
1480 = (const struct MasterStartRequest *) msg;
1482 struct GNUNET_CRYPTO_EddsaPublicKey pub_key;
1483 struct GNUNET_HashCode pub_key_hash;
1485 GNUNET_CRYPTO_eddsa_key_get_public (&req->channel_key, &pub_key);
1486 GNUNET_CRYPTO_hash (&pub_key, sizeof (pub_key), &pub_key_hash);
1489 mst = GNUNET_CONTAINER_multihashmap_get (masters, &pub_key_hash);
1490 struct Channel *chn;
1494 mst = GNUNET_new (struct Master);
1495 mst->policy = ntohl (req->policy);
1496 mst->priv_key = req->channel_key;
1497 mst->join_reqs = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
1500 chn->is_master = GNUNET_YES;
1501 chn->pub_key = pub_key;
1502 chn->pub_key_hash = pub_key_hash;
1505 GNUNET_CONTAINER_multihashmap_put (masters, &chn->pub_key_hash, chn,
1506 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1507 chn->store_op = GNUNET_PSYCSTORE_counters_get (store, &chn->pub_key,
1508 store_recv_master_counters, mst);
1514 struct CountersResult res;
1515 res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK);
1516 res.header.size = htons (sizeof (res));
1517 res.result_code = htonl (GNUNET_OK);
1518 res.max_message_id = GNUNET_htonll (mst->max_message_id);
1520 GNUNET_SERVER_notification_context_add (nc, client);
1521 GNUNET_SERVER_notification_context_unicast (nc, client, &res.header,
1525 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1526 "%p Client connected as master to channel %s.\n",
1527 mst, GNUNET_h2s (&chn->pub_key_hash));
1529 struct ClientListItem *cli = GNUNET_new (struct ClientListItem);
1530 cli->client = client;
1531 GNUNET_CONTAINER_DLL_insert (chn->clients_head, chn->clients_tail, cli);
1533 GNUNET_SERVER_client_set_user_context (client, chn);
1534 GNUNET_SERVER_receive_done (client, GNUNET_OK);
1539 * Handle a connecting client joining as a channel slave.
1542 client_recv_slave_join (void *cls, struct GNUNET_SERVER_Client *client,
1543 const struct GNUNET_MessageHeader *msg)
1545 const struct SlaveJoinRequest *req
1546 = (const struct SlaveJoinRequest *) msg;
1547 uint16_t req_size = ntohs (req->header.size);
1549 struct GNUNET_CRYPTO_EcdsaPublicKey slv_pub_key;
1550 struct GNUNET_HashCode pub_key_hash, slv_pub_key_hash;
1552 GNUNET_CRYPTO_ecdsa_key_get_public (&req->slave_key, &slv_pub_key);
1553 GNUNET_CRYPTO_hash (&slv_pub_key, sizeof (slv_pub_key), &slv_pub_key_hash);
1554 GNUNET_CRYPTO_hash (&req->channel_key, sizeof (req->channel_key), &pub_key_hash);
1556 struct GNUNET_CONTAINER_MultiHashMap *
1557 chn_slv = GNUNET_CONTAINER_multihashmap_get (channel_slaves, &pub_key_hash);
1558 struct Slave *slv = NULL;
1559 struct Channel *chn;
1561 if (NULL != chn_slv)
1563 slv = GNUNET_CONTAINER_multihashmap_get (chn_slv, &slv_pub_key_hash);
1567 slv = GNUNET_new (struct Slave);
1568 slv->priv_key = req->slave_key;
1569 slv->pub_key = slv_pub_key;
1570 slv->pub_key_hash = slv_pub_key_hash;
1571 slv->origin = req->origin;
1572 slv->relay_count = ntohl (req->relay_count);
1574 const struct GNUNET_PeerIdentity *
1575 relays = (const struct GNUNET_PeerIdentity *) &req[1];
1576 uint16_t relay_size = slv->relay_count * sizeof (*relays);
1577 uint16_t join_msg_size = 0;
1579 if (sizeof (*req) + relay_size + sizeof (struct GNUNET_MessageHeader)
1582 join_msg_size = ntohs (slv->join_msg->header.size);
1583 slv->join_msg = GNUNET_malloc (join_msg_size);
1584 memcpy (slv->join_msg, ((char *) &req[1]) + relay_size, join_msg_size);
1586 if (sizeof (*req) + relay_size + join_msg_size != req_size)
1588 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1589 "%u + %u + %u != %u\n",
1590 sizeof (*req), relay_size, join_msg_size, req_size);
1592 GNUNET_SERVER_client_disconnect (client);
1595 if (0 < slv->relay_count)
1597 slv->relays = GNUNET_malloc (relay_size);
1598 memcpy (slv->relays, &req[1], relay_size);
1602 chn->is_master = GNUNET_NO;
1603 chn->pub_key = req->channel_key;
1604 chn->pub_key_hash = pub_key_hash;
1607 if (NULL == chn_slv)
1609 chn_slv = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
1610 GNUNET_CONTAINER_multihashmap_put (channel_slaves, &chn->pub_key_hash, chn_slv,
1611 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1613 GNUNET_CONTAINER_multihashmap_put (chn_slv, &slv->pub_key_hash, chn,
1614 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1615 GNUNET_CONTAINER_multihashmap_put (slaves, &chn->pub_key_hash, chn,
1616 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1617 chn->store_op = GNUNET_PSYCSTORE_counters_get (store, &chn->pub_key,
1618 &store_recv_slave_counters, slv);
1624 struct CountersResult res;
1625 res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK);
1626 res.header.size = htons (sizeof (res));
1627 res.result_code = htonl (GNUNET_OK);
1628 res.max_message_id = GNUNET_htonll (chn->max_message_id);
1630 GNUNET_SERVER_notification_context_add (nc, client);
1631 GNUNET_SERVER_notification_context_unicast (nc, client, &res.header,
1634 if (NULL == slv->member)
1637 = GNUNET_MULTICAST_member_join (cfg, &chn->pub_key, &slv->priv_key,
1639 slv->relay_count, slv->relays,
1640 &slv->join_msg->header,
1641 &mcast_recv_join_request,
1642 &mcast_recv_join_decision,
1643 &mcast_recv_membership_test,
1644 &mcast_recv_replay_fragment,
1645 &mcast_recv_replay_message,
1646 &mcast_recv_message, chn);
1647 if (NULL != slv->join_msg)
1649 GNUNET_free (slv->join_msg);
1650 slv->join_msg = NULL;
1653 else if (NULL != slv->join_dcsn)
1655 GNUNET_SERVER_notification_context_add (nc, client);
1656 GNUNET_SERVER_notification_context_unicast (nc, client,
1657 &slv->join_dcsn->header,
1662 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1663 "%p Client connected as slave to channel %s.\n",
1664 slv, GNUNET_h2s (&chn->pub_key_hash));
1666 struct ClientListItem *cli = GNUNET_new (struct ClientListItem);
1667 cli->client = client;
1668 GNUNET_CONTAINER_DLL_insert (chn->clients_head, chn->clients_tail, cli);
1670 GNUNET_SERVER_client_set_user_context (client, chn);
1671 GNUNET_SERVER_receive_done (client, GNUNET_OK);
1675 struct JoinDecisionClosure
1677 int32_t is_admitted;
1678 struct GNUNET_MessageHeader *msg;
1683 * Iterator callback for sending join decisions to multicast.
1686 mcast_send_join_decision (void *cls, const struct GNUNET_HashCode *pub_key_hash,
1689 struct JoinDecisionClosure *jcls = cls;
1690 struct GNUNET_MULTICAST_JoinHandle *jh = value;
1691 // FIXME: add relays
1692 GNUNET_MULTICAST_join_decision (jh, jcls->is_admitted, 0, NULL, jcls->msg);
1698 * Join decision from client.
1701 client_recv_join_decision (void *cls, struct GNUNET_SERVER_Client *client,
1702 const struct GNUNET_MessageHeader *msg)
1705 chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
1706 GNUNET_assert (GNUNET_YES == chn->is_master);
1707 struct Master *mst = (struct Master *) chn;
1709 struct GNUNET_PSYC_JoinDecisionMessage *
1710 dcsn = (struct GNUNET_PSYC_JoinDecisionMessage *) msg;
1711 struct JoinDecisionClosure jcls;
1712 jcls.is_admitted = ntohl (dcsn->is_admitted);
1714 = (sizeof (*dcsn) + sizeof (*jcls.msg) <= ntohs (msg->size))
1715 ? (struct GNUNET_MessageHeader *) &dcsn[1]
1718 struct GNUNET_HashCode slave_key_hash;
1719 GNUNET_CRYPTO_hash (&dcsn->slave_key, sizeof (dcsn->slave_key),
1722 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1723 "%p Got join decision (%d) from client for channel %s..\n",
1724 mst, jcls.is_admitted, GNUNET_h2s (&chn->pub_key_hash));
1725 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1726 "%p ..and slave %s.\n",
1727 mst, GNUNET_h2s (&slave_key_hash));
1729 GNUNET_CONTAINER_multihashmap_get_multiple (mst->join_reqs, &slave_key_hash,
1730 &mcast_send_join_decision, &jcls);
1731 GNUNET_CONTAINER_multihashmap_remove_all (mst->join_reqs, &slave_key_hash);
1732 GNUNET_SERVER_receive_done (client, GNUNET_OK);
1737 * Send acknowledgement to a client.
1739 * Sent after a message fragment has been passed on to multicast.
1741 * @param chn The channel struct for the client.
1744 send_message_ack (struct Channel *chn, struct GNUNET_SERVER_Client *client)
1746 struct GNUNET_MessageHeader res;
1747 res.size = htons (sizeof (res));
1748 res.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK);
1751 GNUNET_SERVER_notification_context_add (nc, client);
1752 GNUNET_SERVER_notification_context_unicast (nc, client, &res, GNUNET_NO);
1757 * Callback for the transmit functions of multicast.
1760 transmit_notify (void *cls, size_t *data_size, void *data)
1762 struct Channel *chn = cls;
1763 struct TransmitMessage *tmit_msg = chn->tmit_head;
1765 if (NULL == tmit_msg || *data_size < tmit_msg->size)
1767 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1768 "%p transmit_notify: nothing to send.\n", chn);
1773 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1774 "%p transmit_notify: sending %u bytes.\n", chn, tmit_msg->size);
1776 *data_size = tmit_msg->size;
1777 memcpy (data, &tmit_msg[1], *data_size);
1779 int ret = (MSG_STATE_END < chn->tmit_state) ? GNUNET_NO : GNUNET_YES;
1781 if (NULL != tmit_msg->client && GNUNET_NO == tmit_msg->ack_sent)
1782 send_message_ack (chn, tmit_msg->client);
1784 GNUNET_CONTAINER_DLL_remove (chn->tmit_head, chn->tmit_tail, tmit_msg);
1785 GNUNET_free (tmit_msg);
1787 if (NULL != chn->tmit_head)
1789 transmit_message (chn);
1791 else if (GNUNET_YES == chn->is_disconnected)
1793 /* FIXME: handle partial message (when still in_transmit) */
1794 cleanup_channel (chn);
1801 * Callback for the transmit functions of multicast.
1804 master_transmit_notify (void *cls, size_t *data_size, void *data)
1806 int ret = transmit_notify (cls, data_size, data);
1808 if (GNUNET_YES == ret)
1810 struct Master *mst = cls;
1811 mst->tmit_handle = NULL;
1818 * Callback for the transmit functions of multicast.
1821 slave_transmit_notify (void *cls, size_t *data_size, void *data)
1823 int ret = transmit_notify (cls, data_size, data);
1825 if (GNUNET_YES == ret)
1827 struct Slave *slv = cls;
1828 slv->tmit_handle = NULL;
1835 * Transmit a message from a channel master to the multicast group.
1838 master_transmit_message (struct Master *mst)
1840 if (NULL == mst->tmit_handle)
1843 = GNUNET_MULTICAST_origin_to_all (mst->origin, mst->max_message_id,
1844 mst->max_group_generation,
1845 master_transmit_notify, mst);
1849 GNUNET_MULTICAST_origin_to_all_resume (mst->tmit_handle);
1855 * Transmit a message from a channel slave to the multicast group.
1858 slave_transmit_message (struct Slave *slv)
1860 if (NULL == slv->tmit_handle)
1863 = GNUNET_MULTICAST_member_to_origin (slv->member, slv->max_request_id,
1864 slave_transmit_notify, slv);
1868 GNUNET_MULTICAST_member_to_origin_resume (slv->tmit_handle);
1874 transmit_message (struct Channel *chn)
1877 ? master_transmit_message ((struct Master *) chn)
1878 : slave_transmit_message ((struct Slave *) chn);
1883 * Queue a message from a channel master for sending to the multicast group.
1886 master_queue_message (struct Master *mst, struct TransmitMessage *tmit_msg,
1887 uint16_t first_ptype, uint16_t last_ptype)
1889 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p master_queue_message()\n", mst);
1891 if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype)
1893 tmit_msg->id = ++mst->max_message_id;
1894 struct GNUNET_PSYC_MessageMethod *pmeth
1895 = (struct GNUNET_PSYC_MessageMethod *) &tmit_msg[1];
1897 if (pmeth->flags & GNUNET_PSYC_MASTER_TRANSMIT_STATE_RESET)
1899 pmeth->state_delta = GNUNET_htonll (GNUNET_PSYC_STATE_RESET);
1901 else if (pmeth->flags & GNUNET_PSYC_MASTER_TRANSMIT_STATE_MODIFY)
1903 pmeth->state_delta = GNUNET_htonll (tmit_msg->id
1904 - mst->max_state_message_id);
1908 pmeth->state_delta = GNUNET_htonll (GNUNET_PSYC_STATE_NOT_MODIFIED);
1915 * Queue a message from a channel slave for sending to the multicast group.
1918 slave_queue_message (struct Slave *slv, struct TransmitMessage *tmit_msg,
1919 uint16_t first_ptype, uint16_t last_ptype)
1921 if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype)
1923 struct GNUNET_PSYC_MessageMethod *pmeth
1924 = (struct GNUNET_PSYC_MessageMethod *) &tmit_msg[1];
1925 pmeth->state_delta = GNUNET_htonll (GNUNET_PSYC_STATE_NOT_MODIFIED);
1926 tmit_msg->id = ++slv->max_request_id;
1932 * Queue PSYC message parts for sending to multicast.
1934 * @param chn Channel to send to.
1935 * @param client Client the message originates from.
1936 * @param data_size Size of @a data.
1937 * @param data Concatenated message parts.
1938 * @param first_ptype First message part type in @a data.
1939 * @param last_ptype Last message part type in @a data.
1941 static struct TransmitMessage *
1942 queue_message (struct Channel *chn,
1943 struct GNUNET_SERVER_Client *client,
1946 uint16_t first_ptype, uint16_t last_ptype)
1948 struct TransmitMessage *
1949 tmit_msg = GNUNET_malloc (sizeof (*tmit_msg) + data_size);
1950 memcpy (&tmit_msg[1], data, data_size);
1951 tmit_msg->client = client;
1952 tmit_msg->size = data_size;
1953 tmit_msg->state = chn->tmit_state;
1955 /* FIXME: separate queue per message ID */
1957 GNUNET_CONTAINER_DLL_insert_tail (chn->tmit_head, chn->tmit_tail, tmit_msg);
1960 ? master_queue_message ((struct Master *) chn, tmit_msg,
1961 first_ptype, last_ptype)
1962 : slave_queue_message ((struct Slave *) chn, tmit_msg,
1963 first_ptype, last_ptype);
1969 * Cancel transmission of current message.
1971 * @param chn Channel to send to.
1972 * @param client Client the message originates from.
1975 transmit_cancel (struct Channel *chn, struct GNUNET_SERVER_Client *client)
1977 uint16_t type = GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL;
1979 struct GNUNET_MessageHeader msg;
1980 msg.size = htons (sizeof (msg));
1981 msg.type = htons (type);
1983 queue_message (chn, client, sizeof (msg), &msg, type, type);
1984 transmit_message (chn);
1986 /* FIXME: cleanup */
1991 * Incoming message from a master or slave client.
1994 client_recv_psyc_message (void *cls, struct GNUNET_SERVER_Client *client,
1995 const struct GNUNET_MessageHeader *msg)
1998 chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
1999 GNUNET_assert (NULL != chn);
2001 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2002 "%p Received message from client.\n", chn);
2003 GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG, msg);
2005 if (GNUNET_YES != chn->is_ready)
2007 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2008 "%p Channel is not ready yet, disconnecting client.\n", chn);
2010 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
2014 uint16_t size = ntohs (msg->size);
2015 if (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < size - sizeof (*msg))
2017 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "%p Message payload too large.\n", chn);
2019 transmit_cancel (chn, client);
2020 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
2024 uint16_t first_ptype = 0, last_ptype = 0;
2026 == GNUNET_PSYC_receive_check_parts (size - sizeof (*msg),
2027 (const char *) &msg[1],
2028 &first_ptype, &last_ptype))
2030 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2031 "%p Received invalid message part from client.\n", chn);
2033 transmit_cancel (chn, client);
2034 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
2037 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2038 "%p Received message with first part type %u and last part type %u.\n",
2039 chn, first_ptype, last_ptype);
2041 queue_message (chn, client, size - sizeof (*msg), &msg[1],
2042 first_ptype, last_ptype);
2043 transmit_message (chn);
2044 /* FIXME: send a few ACKs even before transmit_notify is called */
2046 GNUNET_SERVER_receive_done (client, GNUNET_OK);
2051 * Received result of GNUNET_PSYCSTORE_membership_store()
2054 store_recv_membership_store_result (void *cls, int64_t result, const char *err_msg)
2056 struct GNUNET_MULTICAST_MembershipTestHandle *mth = cls;
2057 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2058 "%p GNUNET_PSYCSTORE_membership_store() returned %" PRId64 " (%s)\n",
2059 mth, result, err_msg);
2060 /* FIXME: send result to client */
2065 * Client requests to add/remove a slave in the membership database.
2068 client_recv_membership_store (void *cls, struct GNUNET_SERVER_Client *client,
2069 const struct GNUNET_MessageHeader *msg)
2072 chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
2073 GNUNET_assert (NULL != chn);
2075 const struct ChannelMembershipStoreRequest *
2076 req = (const struct ChannelMembershipStoreRequest *) msg;
2078 uint64_t announced_at = GNUNET_ntohll (req->announced_at);
2079 uint64_t effective_since = GNUNET_ntohll (req->effective_since);
2080 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2081 "%p Received membership store request from client.\n", chn);
2082 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2083 "%p did_join: %u, announced_at: %" PRIu64 ", effective_since: %" PRIu64 "\n",
2084 chn, req->did_join, announced_at, effective_since);
2086 GNUNET_PSYCSTORE_membership_store (store, &chn->pub_key, &req->slave_key,
2087 req->did_join, announced_at, effective_since,
2088 0, /* FIXME: group_generation */
2089 &store_recv_membership_store_result, chn);
2090 GNUNET_SERVER_receive_done (client, GNUNET_OK);
2095 * Client requests channel history from PSYCstore.
2098 client_recv_story_request (void *cls, struct GNUNET_SERVER_Client *client,
2099 const struct GNUNET_MessageHeader *msg)
2106 * Client requests best matching state variable from PSYCstore.
2109 client_recv_state_get (void *cls, struct GNUNET_SERVER_Client *client,
2110 const struct GNUNET_MessageHeader *msg)
2117 * Client requests state variables with a given prefix from PSYCstore.
2120 client_recv_state_get_prefix (void *cls, struct GNUNET_SERVER_Client *client,
2121 const struct GNUNET_MessageHeader *msg)
2127 static const struct GNUNET_SERVER_MessageHandler server_handlers[] = {
2128 { &client_recv_master_start, NULL,
2129 GNUNET_MESSAGE_TYPE_PSYC_MASTER_START, 0 },
2131 { &client_recv_slave_join, NULL,
2132 GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN, 0 },
2134 { &client_recv_join_decision, NULL,
2135 GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION, 0 },
2137 { &client_recv_psyc_message, NULL,
2138 GNUNET_MESSAGE_TYPE_PSYC_MESSAGE, 0 },
2140 { &client_recv_membership_store, NULL,
2141 GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_MEMBERSHIP_STORE, 0 },
2143 { &client_recv_story_request, NULL,
2144 GNUNET_MESSAGE_TYPE_PSYC_STORY_REQUEST, 0 },
2146 { &client_recv_state_get, NULL,
2147 GNUNET_MESSAGE_TYPE_PSYC_STATE_GET, 0 },
2149 { &client_recv_state_get_prefix, NULL,
2150 GNUNET_MESSAGE_TYPE_PSYC_STATE_GET_PREFIX, 0 },
2152 { NULL, NULL, 0, 0 }
2157 * Initialize the PSYC service.
2159 * @param cls Closure.
2160 * @param server The initialized server.
2161 * @param c Configuration to use.
2164 run (void *cls, struct GNUNET_SERVER_Handle *server,
2165 const struct GNUNET_CONFIGURATION_Handle *c)
2168 store = GNUNET_PSYCSTORE_connect (cfg);
2169 stats = GNUNET_STATISTICS_create ("psyc", cfg);
2170 masters = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
2171 slaves = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
2172 channel_slaves = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
2173 recv_cache = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
2174 nc = GNUNET_SERVER_notification_context_create (server, 1);
2175 GNUNET_SERVER_add_handlers (server, server_handlers);
2176 GNUNET_SERVER_disconnect_notify (server, &client_disconnect, NULL);
2177 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL,
2178 &shutdown_task, NULL);
2183 * The main function for the service.
2185 * @param argc number of arguments from the command line
2186 * @param argv command line arguments
2187 * @return 0 ok, 1 on error
2190 main (int argc, char *const *argv)
2192 return (GNUNET_OK ==
2193 GNUNET_SERVICE_run (argc, argv, "psyc",
2194 GNUNET_SERVICE_OPTION_NONE,
2195 &run, NULL)) ? 0 : 1;
2198 /* end of gnunet-service-psyc.c */