2 * This file is part of GNUnet
3 * (C) 2013 Christian Grothoff (and other contributing authors)
5 * GNUnet is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published
7 * by the Free Software Foundation; either version 3, or (at your
8 * option) any later version.
10 * GNUnet is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License for more details.
15 * You should have received a copy of the GNU General Public License
16 * along with GNUnet; see the file COPYING. If not, write to the
17 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 * Boston, MA 02111-1307, USA.
22 * @file psyc/gnunet-service-psyc.c
24 * @author Gabor X Toth
30 #include "gnunet_util_lib.h"
31 #include "gnunet_constants.h"
32 #include "gnunet_protocols.h"
33 #include "gnunet_statistics_service.h"
34 #include "gnunet_multicast_service.h"
35 #include "gnunet_psycstore_service.h"
36 #include "gnunet_psyc_service.h"
37 #include "gnunet_psyc_util_lib.h"
/* Process-wide state of the PSYC service: configuration, service handles and
 * the lookup tables for connected masters and slaves.  The three maps are
 * keyed by the hash of a channel public key, as the notes beside them say;
 * the values of channel_slaves are themselves maps (see cleanup_slave()). */
42 * Handle to our current configuration.
44 static const struct GNUNET_CONFIGURATION_Handle *cfg;
47 * Handle to the statistics service.
49 static struct GNUNET_STATISTICS_Handle *stats;
52 * Notification context, simplifies client broadcasts.
54 static struct GNUNET_SERVER_NotificationContext *nc;
57 * Handle to the PSYCstore.
59 static struct GNUNET_PSYCSTORE_Handle *store;
62 * All connected masters.
63 * Channel's pub_key_hash -> struct Master
65 static struct GNUNET_CONTAINER_MultiHashMap *masters;
68 * All connected slaves.
69 * Channel's pub_key_hash -> struct Slave
71 static struct GNUNET_CONTAINER_MultiHashMap *slaves;
74 * Connected slaves per channel.
75 * Channel's pub_key_hash -> Slave's pub_key -> struct Slave
77 static struct GNUNET_CONTAINER_MultiHashMap *channel_slaves;
/* Queue node for a message awaiting transmission.  Nodes are linked through
 * prev/next (struct Channel keeps tmit_head/tmit_tail pointers of this type)
 * and the message bytes are stored directly after the struct.
 * NOTE(review): several members are not visible in this view of the file. */
81 * Message in the transmission queue.
83 struct TransmitMessage
85 struct TransmitMessage *prev;
86 struct TransmitMessage *next;
88 struct GNUNET_SERVER_Client *client;
91 * ID assigned to the message.
101 * @see enum MessageState
105 /* Followed by message */
/* Two-level cache: the outer map is keyed by the channel public key hash and
 * each value is a per-channel map that fragment_queue_insert() creates on
 * demand and fills with RecvCacheEntry values keyed by fragment ID. */
110 * Cache for received message fragments.
111 * Message fragments are only sent to clients after all modifiers arrived.
113 * chan_key -> MultiHashMap chan_msgs
115 static struct GNUNET_CONTAINER_MultiHashMap *recv_cache;
/* One cached fragment.  fragment_queue_insert() stores a private copy of the
 * received fragment in mmsg and maintains a ref_count member (not visible in
 * this view); fragment_queue_run() frees the entry when ref_count is at most
 * 1 and decrements it otherwise. */
119 * Entry in the chan_msgs hashmap of @a recv_cache:
120 * fragment_id -> RecvCacheEntry
122 struct RecvCacheEntry
124 struct GNUNET_MULTICAST_MessageHeader *mmsg;
/* Per-message bookkeeping for fragments received from multicast.  Instances
 * are created in fragment_queue_insert() and released in fragment_queue_run().
 * The fragments heap holds fragment IDs as heap costs with NULL elements.
 * NOTE(review): the struct header and some members (size, flags, state,
 * queued) fall on lines that are not visible in this view; their types are
 * therefore not documented here. */
130 * Entry in the @a recv_frags hash map of a @a Channel.
131 * message_id -> FragmentQueue
136 * Fragment IDs stored in @a recv_cache.
138 struct GNUNET_CONTAINER_Heap *fragments;
141 * Total size of received fragments.
146 * Total size of received header fragments (METHOD & MODIFIERs)
148 uint64_t header_size;
151 * The @a state_delta field from struct GNUNET_PSYC_MessageMethod.
153 uint64_t state_delta;
156 * The @a flags field from struct GNUNET_PSYC_MessageMethod.
161 * Receive state of message.
163 * @see MessageFragmentState
168 * Is the message queued for delivery to the client?
169 * i.e. added to the recv_msgs queue
/* Doubly-linked list node holding one connected client.  struct Channel keeps
 * the list ends in clients_head/clients_tail; nodes are added in
 * client_recv_master_start() and unlinked in client_disconnect(). */
176 * List of connected clients.
178 struct ClientListItem
180 struct ClientListItem *prev;
181 struct ClientListItem *next;
182 struct GNUNET_SERVER_Client *client;
/* State shared by masters and slaves.  Code in this file converts between a
 * struct Channel pointer and the enclosing struct Master / struct Slave with
 * plain casts (see cleanup_channel()), so this struct is expected to sit at
 * the start of both.
 * NOTE(review): the struct header and several members (is_master, ready and
 * the members the @see notes refer to) are on lines not visible in this view. */
187 * Common part of the client context for both a channel master and slave.
/* Clients attached to this channel. */
191 struct ClientListItem *clients_head;
192 struct ClientListItem *clients_tail;
/* Outgoing messages queued for transmission. */
194 struct TransmitMessage *tmit_head;
195 struct TransmitMessage *tmit_tail;
198 * Current PSYCstore operation.
200 struct GNUNET_PSYCSTORE_OperationHandle *store_op;
203 * Received fragments not yet sent to the client.
204 * message_id -> FragmentQueue
206 struct GNUNET_CONTAINER_MultiHashMap *recv_frags;
209 * Received message IDs not yet sent to the client.
/* Min-heap; message IDs are stored as heap costs (see fragment_queue_insert()). */
211 struct GNUNET_CONTAINER_Heap *recv_msgs;
216 GNUNET_SCHEDULER_TaskIdentifier tmit_task;
219 * Public key of the channel.
221 struct GNUNET_CRYPTO_EddsaPublicKey pub_key;
224 * Hash of @a pub_key.
226 struct GNUNET_HashCode pub_key_hash;
229 * Last message ID sent to the client.
230 * 0 if there is no such message.
232 uint64_t max_message_id;
235 * ID of the last stateful message, where the state operations has been
236 * processed and saved to PSYCstore and which has been sent to the client.
237 * 0 if there is no such message.
239 uint64_t max_state_message_id;
242 * Expected value size for the modifier being received from the PSYC service.
244 uint32_t tmit_mod_value_size_expected;
247 * Actual value size for the modifier being received from the PSYC service.
249 uint32_t tmit_mod_value_size;
252 * @see enum MessageState
262 * Is this a channel master (#GNUNET_YES), or slave (#GNUNET_NO)?
267 * Ready to receive messages from client? #GNUNET_YES or #GNUNET_NO
272 * Is the client disconnected? #GNUNET_YES or #GNUNET_NO
274 uint8_t disconnected;
/* Per-channel state of a master.  It embeds the common struct Channel (the
 * member itself is on a line not visible here) and adds the multicast origin
 * side.  Note that max_message_id and max_state_message_id exist both here
 * and in struct Channel: the ones here track what was transmitted to the
 * channel, the ones in struct Channel what was delivered to the client. */
279 * Client context for a channel master.
284 * Channel struct common for Master and Slave
289 * Private key of the channel.
291 struct GNUNET_CRYPTO_EddsaPrivateKey priv_key;
294 * Handle for the multicast origin.
296 struct GNUNET_MULTICAST_Origin *origin;
299 * Transmit handle for multicast.
301 struct GNUNET_MULTICAST_OriginTransmitHandle *tmit_handle;
304 * Incoming join requests from multicast.
305 * member_key -> struct GNUNET_MULTICAST_JoinHandle *
/* Filled by join_mem_test_cb(), keyed by the hash of the slave public key. */
307 struct GNUNET_CONTAINER_MultiHashMap *join_reqs;
310 * Last message ID transmitted to this channel.
312 * Incremented before sending a message, thus the message_id in messages sent
315 uint64_t max_message_id;
318 * ID of the last message with state operations transmitted to the channel.
319 * 0 if there is no such message.
321 uint64_t max_state_message_id;
324 * Maximum group generation transmitted to the channel.
326 uint64_t max_group_generation;
329 * @see enum GNUNET_PSYC_Policy
331 enum GNUNET_PSYC_Policy policy;
/* Per-channel state of a slave.  It embeds the common struct Channel (the
 * member itself is on a line not visible here) and adds the multicast member
 * side.  pub_key and pub_key_hash below identify the slave, whereas the
 * members of the same name in struct Channel identify the channel.
 * join_req and relays are freed in cleanup_slave(); join_dcsn is allocated in
 * mcast_recv_join_decision(). */
336 * Client context for a channel slave.
341 * Channel struct common for Master and Slave
346 * Private key of the slave.
348 struct GNUNET_CRYPTO_EddsaPrivateKey priv_key;
351 * Public key of the slave.
353 struct GNUNET_CRYPTO_EddsaPublicKey pub_key;
356 * Hash of @a pub_key.
358 struct GNUNET_HashCode pub_key_hash;
361 * Handle for the multicast member.
363 struct GNUNET_MULTICAST_Member *member;
366 * Transmit handle for multicast.
368 struct GNUNET_MULTICAST_MemberTransmitHandle *tmit_handle;
371 * Peer identity of the origin.
373 struct GNUNET_PeerIdentity origin;
376 * Number of items in @a relays.
378 uint32_t relay_count;
381 * Relays that multicast can use to connect.
383 struct GNUNET_PeerIdentity *relays;
386 * Join request to be transmitted to the master on join.
388 struct GNUNET_MessageHeader *join_req;
391 * Join decision received from multicast.
393 struct SlaveJoinDecision *join_dcsn;
396 * Maximum request ID for this channel.
398 uint64_t max_request_id;
/* Forward declarations: both functions are called before their definitions,
 * transmit_message() from client_disconnect() and message_queue_drop() from
 * cleanup_channel().  The return types are on lines not visible in this view. */
403 transmit_message (struct Channel *chn);
407 message_queue_drop (struct Channel *chn);
/* Scheduler task for service shutdown: destroys the notification context nc
 * and the statistics handle stats.  Neither cls nor tc is used in the lines
 * visible here.
 * NOTE(review): any NULL checks around the two calls are on lines that are
 * not visible in this view. */
411 * Task run during shutdown.
417 shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
421 GNUNET_SERVER_notification_context_destroy (nc);
/* NOTE(review): GNUNET_YES presumably asks for pending statistics to be
 * flushed before the handle goes away -- confirm against the statistics API. */
426 GNUNET_STATISTICS_destroy (stats, GNUNET_YES);
/* Master-specific part of cleanup_channel(): stops the multicast origin if
 * one was started, destroys the map of pending join requests and removes the
 * channel from the global masters map.
 * NOTE(review): join handles still stored in join_reqs are not released in
 * the visible lines -- confirm who owns them. */
433 * Clean up master data structures after a client disconnected.
436 cleanup_master (struct Master *mst)
438 struct Channel *chn = &mst->chn;
/* origin is only set once PSYCstore returned the counters
 * (see store_recv_master_counters()), so it may still be NULL here. */
440 if (NULL != mst->origin)
441 GNUNET_MULTICAST_origin_stop (mst->origin);
442 GNUNET_CONTAINER_multihashmap_destroy (mst->join_reqs);
443 GNUNET_CONTAINER_multihashmap_remove (masters, &chn->pub_key_hash, chn);
448 * Clean up slave data structures after a client disconnected.
451 cleanup_slave (struct Slave *slv)
453 struct Channel *chn = &slv->chn;
454 struct GNUNET_CONTAINER_MultiHashMap *
455 chn_slv = GNUNET_CONTAINER_multihashmap_get (channel_slaves,
457 GNUNET_assert (NULL != chn_slv);
458 GNUNET_CONTAINER_multihashmap_remove (chn_slv, &slv->pub_key_hash, slv);
460 if (0 == GNUNET_CONTAINER_multihashmap_size (chn_slv))
462 GNUNET_CONTAINER_multihashmap_remove (channel_slaves, &chn->pub_key_hash,
464 GNUNET_CONTAINER_multihashmap_destroy (chn_slv);
466 GNUNET_CONTAINER_multihashmap_remove (slaves, &chn->pub_key_hash, slv);
468 if (NULL != slv->join_req)
469 GNUNET_free (slv->join_req);
470 if (NULL != slv->relays)
471 GNUNET_free (slv->relays);
472 if (NULL != slv->member)
473 GNUNET_MULTICAST_member_part (slv->member);
474 GNUNET_CONTAINER_multihashmap_remove (slaves, &chn->pub_key_hash, chn);
/* Tears down a channel: drops queued incoming messages, removes the channel
 * entry from recv_cache, cancels a pending PSYCstore operation and finally
 * runs the master- or slave-specific cleanup selected by chn->is_master.
 * The casts rely on struct Channel being located at the start of
 * struct Master and struct Slave. */
479 * Clean up channel data structures after a client disconnected.
482 cleanup_channel (struct Channel *chn)
484 message_queue_drop (chn);
/* NOTE(review): this removes the map entries for the channel key only;
 * whether the per-channel map stored as the value is destroyed anywhere is
 * not visible here. */
485 GNUNET_CONTAINER_multihashmap_remove_all (recv_cache, &chn->pub_key_hash);
487 if (NULL != chn->store_op)
488 GNUNET_PSYCSTORE_operation_cancel (chn->store_op);
490 (GNUNET_YES == chn->is_master)
491 ? cleanup_master ((struct Master *) chn)
492 : cleanup_slave ((struct Slave *) chn);
/* Server disconnect callback.  Fetches the channel stored as the user context
 * of the client, unlinks the list entry whose client matches, and once the
 * client list is empty either hands pending outgoing messages to
 * transmit_message() or tears the channel down with cleanup_channel().
 * NOTE(review): the control flow joining these steps (loop, else branches,
 * early return, release of the list entry) is on lines not visible here. */
498 * Called whenever a client is disconnected.
499 * Frees our resources associated with that client.
501 * @param cls Closure.
502 * @param client Identification of the client.
505 client_disconnect (void *cls, struct GNUNET_SERVER_Client *client)
511 chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
/* Presumably reached only when no channel was attached to the client. */
515 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
516 "%p User context is NULL in client_disconnect()\n", chn);
520 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
521 "%p Client (%s) disconnected from channel %s\n",
522 chn, (GNUNET_YES == chn->is_master) ? "master" : "slave",
523 GNUNET_h2s (&chn->pub_key_hash));
/* Search the client list of the channel for the disconnecting client. */
525 struct ClientListItem *cli = chn->clients_head;
528 if (cli->client == client)
530 GNUNET_CONTAINER_DLL_remove (chn->clients_head, chn->clients_tail, cli);
537 if (NULL == chn->clients_head)
538 { /* Last client disconnected. */
539 if (NULL != chn->tmit_head)
540 { /* Send pending messages to multicast before cleanup. */
541 transmit_message (chn);
545 cleanup_channel (chn);
/* Delivers msg to the clients of the channel, starting from
 * chn->clients_head.  Each client is registered with the notification
 * context nc and then sent the message with a unicast call.
 * NOTE(review): the loop that advances cli is on lines not visible here. */
552 * Send message to all clients connected to the channel.
555 client_send_msg (const struct Channel *chn,
556 const struct GNUNET_MessageHeader *msg)
/* NOTE(review): a routine send is logged at WARNING level, which looks like
 * leftover debugging -- confirm the intended level. */
558 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
559 "%p Sending message to clients.\n", chn);
561 struct ClientListItem *cli = chn->clients_head;
/* NOTE(review): the final GNUNET_NO presumably means the message must not be
 * dropped under load -- confirm against the server notification API. */
564 GNUNET_SERVER_notification_context_add (nc, cli->client);
565 GNUNET_SERVER_notification_context_unicast (nc, cli->client, msg, GNUNET_NO);
/* State carried from mcast_recv_join_request() to join_mem_test_cb():
 * the key of the joining slave, the multicast join handle to answer and the
 * prepared request message for the master client.
 * NOTE(review): a chn member is read by join_mem_test_cb() but its
 * declaration is on a line not visible in this view. */
572 * Closure for join_mem_test_cb()
574 struct JoinMemTestClosure
576 struct GNUNET_CRYPTO_EddsaPublicKey slave_key;
578 struct GNUNET_MULTICAST_JoinHandle *jh;
579 struct MasterJoinRequest *master_join_req;
/* PSYCstore callback with the outcome of the membership test started in
 * mcast_recv_join_request().  cls is the JoinMemTestClosure allocated there.
 * If the result is GNUNET_NO and the channel is a master, the join handle is
 * remembered in mst->join_reqs under the hash of the slave key and the
 * prepared request is forwarded to the clients; the visible alternative
 * passes result on as the join decision with no relays and no response.
 * NOTE(review): how the two paths are separated (else branch) and whether
 * the closure itself is freed is on lines not visible here. */
584 * Membership test result callback used for join requests.
587 join_mem_test_cb (void *cls, int64_t result, const char *err_msg)
589 struct JoinMemTestClosure *jcls = cls;
591 if (GNUNET_NO == result && GNUNET_YES == jcls->chn->is_master)
592 { /* Pass on join request to client if this is a master channel */
593 struct Master *mst = (struct Master *) jcls->chn;
594 struct GNUNET_HashCode slave_key_hash;
595 GNUNET_CRYPTO_hash (&jcls->slave_key, sizeof (jcls->slave_key),
/* MULTIPLE allows several pending handles under the same slave key. */
597 GNUNET_CONTAINER_multihashmap_put (mst->join_reqs, &slave_key_hash, jcls->jh,
598 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
599 client_send_msg (jcls->chn, &jcls->master_join_req->header);
604 GNUNET_MULTICAST_join_decision (jcls->jh, result, 0, NULL, NULL);
/* The request message was allocated by mcast_recv_join_request(). */
606 GNUNET_free (jcls->master_join_req);
/* Multicast callback for a join request.  Builds a MasterJoinRequest message
 * (header, slave key and the optional join message as payload), stores it
 * together with the join handle in a JoinMemTestClosure and asks PSYCstore
 * whether the slave is already a member; join_mem_test_cb() continues from
 * there and frees the request message. */
612 * Incoming join request from multicast.
615 mcast_recv_join_request (void *cls,
616 const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key,
617 const struct GNUNET_MessageHeader *join_msg,
618 struct GNUNET_MULTICAST_JoinHandle *jh)
620 struct Channel *chn = cls;
621 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p Got join request.\n", chn);
/* Only a join message of type GNUNET_MESSAGE_TYPE_PSYC_MESSAGE contributes
 * its size; for any other type the size stays 0 and no payload is copied. */
623 uint16_t join_msg_size = 0;
624 if (NULL != join_msg)
626 if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE == ntohs (join_msg->type))
628 join_msg_size = ntohs (join_msg->size);
632 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
633 "%p Got join message with invalid type %u.\n",
634 chn, ntohs (join_msg->type));
/* NOTE(review): the total size is passed through htons(); a sum larger than
 * 65535 would not fit into the 16-bit size field -- confirm the input bound. */
638 struct MasterJoinRequest *req = GNUNET_malloc (sizeof (*req) + join_msg_size);
639 req->header.size = htons (sizeof (*req) + join_msg_size);
640 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_JOIN_REQUEST);
641 req->slave_key = *slave_key;
642 if (0 < join_msg_size)
643 memcpy (&req[1], join_msg, join_msg_size);
645 struct JoinMemTestClosure *jcls = GNUNET_malloc (sizeof (*jcls));
646 jcls->slave_key = *slave_key;
649 jcls->master_join_req = req;
/* The test is made for the last message ID known for this channel. */
651 GNUNET_PSYCSTORE_membership_test (store, &chn->pub_key, slave_key,
652 chn->max_message_id, 0,
653 &join_mem_test_cb, jcls);
658 * Join decision received from multicast.
661 mcast_recv_join_decision (void *cls, int is_admitted,
662 const struct GNUNET_PeerIdentity *peer,
663 uint16_t relay_count,
664 const struct GNUNET_PeerIdentity *relays,
665 const struct GNUNET_MessageHeader *join_resp)
667 struct Slave *slv = cls;
668 struct Channel *chn = &slv->chn;
669 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
670 "%p Got join decision: %d\n", slv, is_admitted);
672 uint16_t join_resp_size = (NULL != join_resp) ? ntohs (join_resp->size) : 0;
673 struct SlaveJoinDecision *
674 dcsn = slv->join_dcsn = GNUNET_malloc (sizeof (*dcsn) + join_resp_size);
675 dcsn->header.size = htons (sizeof (*dcsn) + join_resp_size);
676 dcsn->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION);
677 dcsn->is_admitted = htonl (is_admitted);
678 if (0 < join_resp_size)
679 memcpy (&dcsn[1], join_resp, join_resp_size);
681 client_send_msg (chn, &dcsn->header);
683 if (GNUNET_YES == is_admitted)
685 chn->ready = GNUNET_YES;
/* Multicast callback for membership tests; registered with multicast in
 * store_recv_master_counters() and store_recv_slave_counters().
 * NOTE(review): no body statements are visible in this view, so this may be
 * an empty placeholder -- confirm. */
695 mcast_recv_membership_test (void *cls,
696 const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key,
697 uint64_t message_id, uint64_t group_generation,
698 struct GNUNET_MULTICAST_MembershipTestHandle *mth)
/* Multicast callback for replay requests addressed by fragment ID;
 * registered with multicast in store_recv_master_counters() and
 * store_recv_slave_counters().
 * NOTE(review): no body statements are visible in this view, so this may be
 * an empty placeholder -- confirm. */
705 mcast_recv_replay_fragment (void *cls,
706 const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key,
707 uint64_t fragment_id, uint64_t flags,
708 struct GNUNET_MULTICAST_ReplayHandle *rh)
/* Multicast callback for replay requests addressed by a fragment offset
 * within a message; registered with multicast in
 * store_recv_master_counters() and store_recv_slave_counters().
 * NOTE(review): part of the parameter list and all body statements are on
 * lines not visible in this view -- this may be an empty placeholder. */
716 mcast_recv_replay_message (void *cls,
717 const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key,
719 uint64_t fragment_offset,
721 struct GNUNET_MULTICAST_ReplayHandle *rh)
728 * Convert an uint64_t in network byte order to a HashCode
729 * that can be used as key in a MultiHashMap
732 hash_key_from_nll (struct GNUNET_HashCode *key, uint64_t n)
734 /* use little-endian order, as idx_of MultiHashMap casts key to unsigned int */
735 /* TODO: use built-in byte swap functions if available */
737 n = ((n << 8) & 0xFF00FF00FF00FF00ULL) | ((n >> 8) & 0x00FF00FF00FF00FFULL);
738 n = ((n << 16) & 0xFFFF0000FFFF0000ULL) | ((n >> 16) & 0x0000FFFF0000FFFFULL);
740 *key = (struct GNUNET_HashCode) {{ 0 }};
742 = (n << 32) | (n >> 32);
/**
 * Convert an uint64_t in host byte order to a HashCode
 * that can be used as key in a MultiHashMap
 *
 * Produces the same key as hash_key_from_nll() does for the network byte
 * order representation of the same number.
 *
 * @param key Hash code to fill.
 * @param n Number in host byte order.
 */
static void
hash_key_from_hll (struct GNUNET_HashCode *key, uint64_t n)
{
#if __BYTE_ORDER == __BIG_ENDIAN
  hash_key_from_nll (key, n);
#elif __BYTE_ORDER == __LITTLE_ENDIAN
  *key = (struct GNUNET_HashCode) {{ 0 }};
  /* Copy instead of storing through a cast pointer: the hash code is not
   * declared as an uint64_t, so a cast store breaks the aliasing rules and
   * may be misaligned. */
  memcpy (key, &n, sizeof (n));
#else
  #error byteorder undefined
#endif
}
/* Repackages a multicast message as a PSYC message for the clients: the
 * multicast header is replaced by a GNUNET_PSYC_MessageHeader and the payload
 * that follows it is copied unchanged, then the result goes to
 * client_send_msg().
 * NOTE(review): the release of pmsg after sending is not visible in this
 * view -- confirm it is freed. */
765 * Send multicast message to all clients connected to the channel.
768 client_send_mcast_msg (struct Channel *chn,
769 const struct GNUNET_MULTICAST_MessageHeader *mmsg)
771 struct GNUNET_PSYC_MessageHeader *pmsg;
/* psize = PSYC header plus the payload length (total size minus the
 * multicast header).  NOTE(review): assumes size >= sizeof (*mmsg); a
 * smaller value would wrap around -- confirm callers validate this. */
772 uint16_t size = ntohs (mmsg->header.size);
773 uint16_t psize = sizeof (*pmsg) + size - sizeof (*mmsg);
775 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
776 "%p Sending multicast message to client. "
777 "fragment_id: %" PRIu64 ", message_id: %" PRIu64 "\n",
778 chn, GNUNET_ntohll (mmsg->fragment_id),
779 GNUNET_ntohll (mmsg->message_id));
781 pmsg = GNUNET_malloc (psize);
782 pmsg->header.size = htons (psize);
783 pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
/* Copied as is, i.e. the ID stays in network byte order. */
784 pmsg->message_id = mmsg->message_id;
786 memcpy (&pmsg[1], &mmsg[1], size - sizeof (*mmsg));
787 client_send_msg (chn, &pmsg->header);
/* Repackages a multicast request as a PSYC message for the clients of a
 * master: the request header is replaced by a GNUNET_PSYC_MessageHeader
 * flagged with GNUNET_PSYC_MESSAGE_REQUEST, the request ID is used as the
 * message ID, and the payload is copied unchanged.
 * NOTE(review): the release of pmsg after sending is not visible in this
 * view -- confirm it is freed. */
793 * Send multicast request to all clients connected to the channel.
796 client_send_mcast_req (struct Master *mst,
797 const struct GNUNET_MULTICAST_RequestHeader *req)
799 struct Channel *chn = &mst->chn;
801 struct GNUNET_PSYC_MessageHeader *pmsg;
/* psize = PSYC header plus the payload length (total size minus the request
 * header).  NOTE(review): assumes size >= sizeof (*req) -- confirm. */
802 uint16_t size = ntohs (req->header.size);
803 uint16_t psize = sizeof (*pmsg) + size - sizeof (*req);
805 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
806 "%p Sending multicast request to client. "
807 "fragment_id: %" PRIu64 ", message_id: %" PRIu64 "\n",
808 chn, GNUNET_ntohll (req->fragment_id),
809 GNUNET_ntohll (req->request_id));
811 pmsg = GNUNET_malloc (psize);
812 pmsg->header.size = htons (psize);
813 pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
/* Copied as is, i.e. the ID stays in network byte order. */
814 pmsg->message_id = req->request_id;
815 pmsg->flags = htonl (GNUNET_PSYC_MESSAGE_REQUEST);
817 memcpy (&pmsg[1], &req[1], size - sizeof (*req));
818 client_send_msg (chn, &pmsg->header);
/* Records one received fragment: it makes sure a FragmentQueue exists for
 * the message, stores a copy of the fragment in the per-channel cache (or
 * bumps the reference count of an existing copy), advances the receive state
 * of the message from the part types found in the fragment, and queues the
 * message ID and the fragment ID in the respective heaps.
 * NOTE(review): the parameter notes below mention msg_id_hash, but the
 * signature has no such parameter; the hash is computed locally.
 * NOTE(review): several conditions, else branches and the switch header are
 * on lines not visible in this view, so the exact branch structure cannot be
 * confirmed from here. */
824 * Insert a multicast message fragment into the queue belonging to the message.
826 * @param chn Channel.
827 * @param mmsg Multicast message fragment.
828 * @param msg_id_hash Message ID of @a mmsg in a struct GNUNET_HashCode.
829 * @param first_ptype First PSYC message part type in @a mmsg.
830 * @param last_ptype Last PSYC message part type in @a mmsg.
833 fragment_queue_insert (struct Channel *chn,
834 const struct GNUNET_MULTICAST_MessageHeader *mmsg,
835 uint16_t first_ptype, uint16_t last_ptype)
837 const uint16_t size = ntohs (mmsg->header.size);
838 const uint64_t frag_offset = GNUNET_ntohll (mmsg->fragment_offset);
/* Per-channel fragment cache; created further down when it is still NULL. */
839 struct GNUNET_CONTAINER_MultiHashMap
840 *chan_msgs = GNUNET_CONTAINER_multihashmap_get (recv_cache,
/* message_id is still in network byte order here, hence the _nll variant. */
843 struct GNUNET_HashCode msg_id_hash;
844 hash_key_from_nll (&msg_id_hash, mmsg->message_id);
847 *fragq = GNUNET_CONTAINER_multihashmap_get (chn->recv_frags, &msg_id_hash);
/* A new queue starts in the HEADER state with a min-heap for fragment IDs. */
851 fragq = GNUNET_new (struct FragmentQueue);
852 fragq->state = MSG_FRAG_STATE_HEADER;
854 = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
856 GNUNET_CONTAINER_multihashmap_put (chn->recv_frags, &msg_id_hash, fragq,
857 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
859 if (NULL == chan_msgs)
861 chan_msgs = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
862 GNUNET_CONTAINER_multihashmap_put (recv_cache, &chn->pub_key_hash, chan_msgs,
863 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
/* Cache lookup by fragment ID (network byte order, hence _nll again). */
867 struct GNUNET_HashCode frag_id_hash;
868 hash_key_from_nll (&frag_id_hash, mmsg->fragment_id);
869 struct RecvCacheEntry
870 *cache_entry = GNUNET_CONTAINER_multihashmap_get (chan_msgs, &frag_id_hash);
/* Not cached yet: keep a private copy with an initial reference count of 1. */
871 if (NULL == cache_entry)
873 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
874 "%p Adding message fragment to cache. "
875 "message_id: %" PRIu64 ", fragment_id: %" PRIu64 ", "
876 "header_size: %" PRIu64 " + %u).\n",
877 chn, GNUNET_ntohll (mmsg->message_id),
878 GNUNET_ntohll (mmsg->fragment_id),
879 fragq->header_size, size);
880 cache_entry = GNUNET_new (struct RecvCacheEntry);
881 cache_entry->ref_count = 1;
882 cache_entry->mmsg = GNUNET_malloc (size);
883 memcpy (cache_entry->mmsg, mmsg, size);
884 GNUNET_CONTAINER_multihashmap_put (chan_msgs, &frag_id_hash, cache_entry,
885 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
/* Already cached: only the reference count changes. */
889 cache_entry->ref_count++;
890 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
891 "%p Message fragment is already in cache. "
892 "message_id: %" PRIu64 ", fragment_id: %" PRIu64
894 chn, GNUNET_ntohll (mmsg->message_id),
895 GNUNET_ntohll (mmsg->fragment_id), cache_entry->ref_count);
/* While the header is incomplete: pick up state_delta and flags from a
 * METHOD part, and accumulate header_size as long as the last part of the
 * fragment is still below the DATA part type. */
898 if (MSG_FRAG_STATE_HEADER == fragq->state)
900 if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype)
902 struct GNUNET_PSYC_MessageMethod *
903 pmeth = (struct GNUNET_PSYC_MessageMethod *) &mmsg[1];
904 fragq->state_delta = GNUNET_ntohll (pmeth->state_delta);
905 fragq->flags = ntohl (pmeth->flags);
908 if (last_ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA)
910 fragq->header_size += size;
912 else if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype
913 || frag_offset == fragq->header_size)
914 { /* header is now complete */
915 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
916 "%p Header of message %" PRIu64 " is complete.\n",
917 chn, GNUNET_ntohll (mmsg->message_id));
919 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
920 "%p Adding message %" PRIu64 " to queue.\n",
921 chn, GNUNET_ntohll (mmsg->message_id));
922 fragq->state = MSG_FRAG_STATE_DATA;
926 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
927 "%p Header of message %" PRIu64 " is NOT complete yet: "
928 "%" PRIu64 " != %" PRIu64 "\n",
929 chn, GNUNET_ntohll (mmsg->message_id), frag_offset,
/* An END part completes the message only when the fragment offset matches
 * the total size received so far. */
936 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END:
937 if (frag_offset == fragq->size)
938 fragq->state = MSG_FRAG_STATE_END;
940 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
941 "%p Message %" PRIu64 " is NOT complete yet: "
942 "%" PRIu64 " != %" PRIu64 "\n",
943 chn, GNUNET_ntohll (mmsg->message_id), frag_offset,
947 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL:
948 /* Drop message without delivering to client if it's a single fragment */
950 (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype)
951 ? MSG_FRAG_STATE_DROP
952 : MSG_FRAG_STATE_CANCEL;
/* In the DATA, END and CANCEL states the message ID is queued once for
 * delivery; the ID (host byte order) is the heap cost, the element is NULL.
 * NOTE(review): MSG_FRAG_STATE_DROP is not among the visible cases. */
955 switch (fragq->state)
957 case MSG_FRAG_STATE_DATA:
958 case MSG_FRAG_STATE_END:
959 case MSG_FRAG_STATE_CANCEL:
960 if (GNUNET_NO == fragq->queued)
962 GNUNET_CONTAINER_heap_insert (chn->recv_msgs, NULL,
963 GNUNET_ntohll (mmsg->message_id));
964 fragq->queued = GNUNET_YES;
/* The fragment ID (host byte order) is the heap cost in the message queue. */
969 GNUNET_CONTAINER_heap_insert (fragq->fragments, NULL,
970 GNUNET_ntohll (mmsg->fragment_id));
975 * Run fragment queue of a message.
977 * Send fragments of a message in order to client, after all modifiers arrived
980 * @param chn Channel.
981 * @param msg_id ID of the message @a fragq belongs to.
982 * @param fragq Fragment queue of the message.
983 * @param drop Drop message without delivering to client?
984 * #GNUNET_YES or #GNUNET_NO.
987 fragment_queue_run (struct Channel *chn, uint64_t msg_id,
988 struct FragmentQueue *fragq, uint8_t drop)
990 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
991 "%p Running message fragment queue for message %" PRIu64
993 chn, msg_id, fragq->state);
995 struct GNUNET_CONTAINER_MultiHashMap
996 *chan_msgs = GNUNET_CONTAINER_multihashmap_get (recv_cache,
998 GNUNET_assert (NULL != chan_msgs);
1001 while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (fragq->fragments, NULL,
1004 struct GNUNET_HashCode frag_id_hash;
1005 hash_key_from_hll (&frag_id_hash, frag_id);
1006 struct RecvCacheEntry *cache_entry
1007 = GNUNET_CONTAINER_multihashmap_get (chan_msgs, &frag_id_hash);
1008 if (cache_entry != NULL)
1010 if (GNUNET_NO == drop)
1012 client_send_mcast_msg (chn, cache_entry->mmsg);
1014 if (cache_entry->ref_count <= 1)
1016 GNUNET_CONTAINER_multihashmap_remove (chan_msgs, &frag_id_hash,
1018 GNUNET_free (cache_entry->mmsg);
1019 GNUNET_free (cache_entry);
1023 cache_entry->ref_count--;
1026 #if CACHE_AGING_IMPLEMENTED
1027 else if (GNUNET_NO == drop)
1029 /* TODO: fragment not in cache anymore, retrieve it from PSYCstore */
1033 GNUNET_CONTAINER_heap_remove_root (fragq->fragments);
1036 if (MSG_FRAG_STATE_END <= fragq->state)
1038 struct GNUNET_HashCode msg_id_hash;
1039 hash_key_from_nll (&msg_id_hash, msg_id);
1041 GNUNET_CONTAINER_multihashmap_remove (chn->recv_frags, &msg_id_hash, fragq);
1042 GNUNET_CONTAINER_heap_destroy (fragq->fragments);
1043 GNUNET_free (fragq);
1047 fragq->queued = GNUNET_NO;
/* Walks the queue of received message IDs in ascending order and hands each
 * deliverable message to fragment_queue_run(), updating max_message_id and
 * max_state_message_id of the channel on the way.
 * NOTE(review): declarations of msg_id and n, the break statements, else
 * branches and the return are on lines not visible in this view, so the
 * exact exit conditions cannot be confirmed from here. */
1053 * Run message queue.
1055 * Send messages in queue to client in order after a message has arrived from
1056 * multicast, according to the following:
1057 * - A message is only sent if all of its modifiers arrived.
1058 * - A stateful message is only sent if the previous stateful message
1059 * has already been delivered to the client.
1061 * @param chn Channel.
1063 * @return Number of messages removed from queue and sent to client.
1066 message_queue_run (struct Channel *chn)
1068 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1069 "%p Running message queue.\n", chn);
/* The heap cost of the root is the smallest queued message ID. */
1072 while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (chn->recv_msgs, NULL,
1075 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1076 "%p Processing message %" PRIu64 " in queue.\n", chn, msg_id);
/* msg_id is in host byte order, hence the _hll variant. */
1077 struct GNUNET_HashCode msg_id_hash;
1078 hash_key_from_hll (&msg_id_hash, msg_id);
1080 struct FragmentQueue *
1081 fragq = GNUNET_CONTAINER_multihashmap_get (chn->recv_frags, &msg_id_hash);
/* Handles a missing queue or a header that is still incomplete. */
1083 if (NULL == fragq || fragq->state <= MSG_FRAG_STATE_HEADER)
1085 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1086 "%p No fragq (%p) or header not complete.\n",
/* NOTE(review): the check above already covers the HEADER state; if that
 * path leaves the loop, the block below cannot be entered -- confirm the
 * intended condition. */
1091 if (MSG_FRAG_STATE_HEADER == fragq->state)
1093 /* Check if there's a missing message before the current one */
1094 if (GNUNET_PSYC_STATE_NOT_MODIFIED == fragq->state_delta)
1096 if (!(fragq->flags & GNUNET_PSYC_MESSAGE_ORDER_ANY)
1097 && msg_id - 1 != chn->max_message_id)
1099 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1100 "%p Out of order message. "
1101 "(%" PRIu64 " - 1 != %" PRIu64 ")\n",
1102 chn, msg_id, chn->max_message_id);
/* Stateful message: state_delta must point back to the last stateful
 * message that was delivered. */
1108 if (msg_id - fragq->state_delta != chn->max_state_message_id)
1110 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1111 "%p Out of order stateful message. "
1112 "(%" PRIu64 " - %" PRIu64 " != %" PRIu64 ")\n",
1113 chn, msg_id, fragq->state_delta, chn->max_state_message_id);
/* NOTE(review): the call below uses message_id,
 * store_recv_state_modify_result and cls, none of which are declared in the
 * visible lines; it is presumably disabled by a preprocessor guard on a line
 * that is not visible here -- confirm. */
1117 /* FIXME: apply modifiers to state in PSYCstore */
1118 GNUNET_PSYCSTORE_state_modify (store, &chn->pub_key, message_id,
1119 store_recv_state_modify_result, cls);
1121 chn->max_state_message_id = msg_id;
1123 chn->max_message_id = msg_id;
/* The last argument asks fragment_queue_run() to discard the fragments when
 * the message is in the DROP state. */
1125 fragment_queue_run (chn, msg_id, fragq, MSG_FRAG_STATE_DROP == fragq->state);
1126 GNUNET_CONTAINER_heap_remove_root (chn->recv_msgs);
1129 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1130 "%p Removed %" PRIu64 " messages from queue.\n", chn, n);
1136 * Drop message queue of a channel.
1138 * Remove all messages in queue without sending it to clients.
1140 * @param chn Channel.
1142 * @return Number of messages removed from queue.
1145 message_queue_drop (struct Channel *chn)
1147 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1148 "%p Dropping message queue.\n", chn);
1151 while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (chn->recv_msgs, NULL,
1154 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1155 "%p Dropping message %" PRIu64 " from queue.\n", chn, msg_id);
1156 struct GNUNET_HashCode msg_id_hash;
1157 hash_key_from_hll (&msg_id_hash, msg_id);
1159 struct FragmentQueue *
1160 fragq = GNUNET_CONTAINER_multihashmap_get (chn->recv_frags, &msg_id_hash);
1162 fragment_queue_run (chn, msg_id, fragq, GNUNET_YES);
1163 GNUNET_CONTAINER_heap_remove_root (chn->recv_msgs);
1166 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1167 "%p Removed %" PRIu64 " messages from queue.\n", chn, n);
/* Completion callback passed to GNUNET_PSYCSTORE_fragment_store() by
 * mcast_recv_message().  In the visible lines it only logs the result code
 * and the error message; cls is the channel the fragment belongs to. */
1173 * Handle the result of a GNUNET_PSYCSTORE_fragment_store() operation.
1176 store_recv_fragment_store_result (void *cls, int64_t result, const char *err_msg)
1178 struct Channel *chn = cls;
1179 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1180 "%p GNUNET_PSYCSTORE_fragment_store() returned %" PRId64 " (%s)\n",
1181 chn, result, err_msg);
/* Multicast callback for an incoming message fragment; cls is the channel.
 * The fragment is handed to PSYCstore, its PSYC parts are checked, and a
 * fragment that passes is inserted into the fragment queue before the
 * message queue is run.
 * NOTE(review): the store request is issued before the parts are validated,
 * so a fragment that is rejected below has already been submitted for
 * storage -- confirm this is intended. */
1186 * Handle incoming message fragment from multicast.
1188 * Store it using PSYCstore and send it to the clients of the channel in order.
1191 mcast_recv_message (void *cls, const struct GNUNET_MULTICAST_MessageHeader *mmsg)
1193 struct Channel *chn = cls;
1194 uint16_t size = ntohs (mmsg->header.size);
1196 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1197 "%p Received multicast message of size %u.\n",
1200 GNUNET_PSYCSTORE_fragment_store (store, &chn->pub_key, mmsg, 0,
1201 &store_recv_fragment_store_result, chn);
/* The check covers the payload behind the multicast header and reports the
 * first and last part type found.  NOTE(review): assumes
 * size >= sizeof (*mmsg) -- confirm multicast guarantees this. */
1203 uint16_t first_ptype = 0, last_ptype = 0;
1205 == GNUNET_PSYC_receive_check_parts (size - sizeof (*mmsg),
1206 (const char *) &mmsg[1],
1207 &first_ptype, &last_ptype))
1209 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1210 "%p Dropping incoming multicast message with invalid parts.\n",
1212 GNUNET_break_op (0);
1216 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1217 "Message parts: first: type %u, last: type %u\n",
1218 first_ptype, last_ptype);
1220 fragment_queue_insert (chn, mmsg, first_ptype, last_ptype);
1221 message_queue_run (chn);
/* Multicast callback for a request fragment sent to a master; cls is the
 * master.  The PSYC parts of the payload are checked and a request that
 * passes is forwarded to the clients via client_send_mcast_req(), without
 * any reordering (see the FIXME near the end).
 * NOTE(review): the parameter notes below list slave_key, msg and flags,
 * but the signature only takes cls and req.
 * NOTE(review): the size passed to the parts check assumes
 * size >= sizeof (*req) -- confirm multicast guarantees this. */
1226 * Incoming request fragment from multicast for a master.
1228 * @param cls Master.
1229 * @param slave_key Sending slave's public key.
1230 * @param msg The message.
1231 * @param flags Request flags.
1234 mcast_recv_request (void *cls,
1235 const struct GNUNET_MULTICAST_RequestHeader *req)
1237 struct Master *mst = cls;
1238 uint16_t size = ntohs (req->header.size);
1240 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1241 "%p Received multicast request of size %u.\n",
1244 uint16_t first_ptype = 0, last_ptype = 0;
1246 == GNUNET_PSYC_receive_check_parts (size - sizeof (*req),
1247 (const char *) &req[1],
1248 &first_ptype, &last_ptype))
1250 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1251 "%p Dropping incoming multicast request with invalid parts.\n",
1253 GNUNET_break_op (0);
1257 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1258 "Message parts: first: type %u, last: type %u\n",
1259 first_ptype, last_ptype);
1261 /* FIXME: in-order delivery */
1262 client_send_mcast_req (mst, req);
/* PSYCstore callback started from client_recv_master_start(); cls is the
 * master.  Clears the pending operation handle, prepares a
 * MASTER_START_ACK carrying the result code and max_message_id, and for a
 * result of GNUNET_OK or GNUNET_NO takes over the counters, starts the
 * multicast origin and marks the channel ready.  The acknowledgement is
 * sent to the clients of the channel.
 * NOTE(review): the else branch structure around the error log is on lines
 * not visible in this view. */
1267 * Response from PSYCstore with the current counter values for a channel master.
1270 store_recv_master_counters (void *cls, int result, uint64_t max_fragment_id,
1271 uint64_t max_message_id, uint64_t max_group_generation,
1272 uint64_t max_state_message_id)
1274 struct Master *mst = cls;
1275 struct Channel *chn = &mst->chn;
/* The operation has completed, so there is nothing left to cancel. */
1276 chn->store_op = NULL;
1278 struct CountersResult res;
1279 res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK);
1280 res.header.size = htons (sizeof (res));
1281 res.result_code = htonl (result);
1282 res.max_message_id = GNUNET_htonll (max_message_id);
1284 if (GNUNET_OK == result || GNUNET_NO == result)
/* The same value seeds both the transmit side counter of the master and the
 * delivery side counter of the channel. */
1286 mst->max_message_id = max_message_id;
1287 chn->max_message_id = max_message_id;
1288 chn->max_state_message_id = max_state_message_id;
1289 mst->max_group_generation = max_group_generation;
/* chn is the closure handed to all multicast callbacks. */
1291 = GNUNET_MULTICAST_origin_start (cfg, &mst->priv_key, max_fragment_id,
1292 &mcast_recv_join_request,
1293 &mcast_recv_membership_test,
1294 &mcast_recv_replay_fragment,
1295 &mcast_recv_replay_message,
1296 &mcast_recv_request,
1297 &mcast_recv_message, chn);
1298 chn->ready = GNUNET_YES;
1302 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1303 "%p GNUNET_PSYCSTORE_counters_get() "
1304 "returned %d for channel %s.\n",
1305 chn, result, GNUNET_h2s (&chn->pub_key_hash));
1308 client_send_msg (chn, &res.header);
/* PSYCstore callback for a slave; cls is the slave.  Clears the pending
 * operation handle, prepares a SLAVE_JOIN_ACK carrying the result code and
 * max_message_id, and for a result of GNUNET_OK or GNUNET_NO takes over the
 * counters and joins the multicast group.  The acknowledgement is sent to
 * the clients of the channel.  Unlike the master variant, the visible lines
 * do not set chn->ready here; that happens in mcast_recv_join_decision().
 * NOTE(review): part of the argument list of the join call and the else
 * branch structure are on lines not visible in this view. */
1313 * Response from PSYCstore with the current counter values for a channel slave.
1316 store_recv_slave_counters (void *cls, int result, uint64_t max_fragment_id,
1317 uint64_t max_message_id, uint64_t max_group_generation,
1318 uint64_t max_state_message_id)
1320 struct Slave *slv = cls;
1321 struct Channel *chn = &slv->chn;
/* The operation has completed, so there is nothing left to cancel. */
1322 chn->store_op = NULL;
1324 struct CountersResult res;
1325 res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK);
1326 res.header.size = htons (sizeof (res));
1327 res.result_code = htonl (result);
1328 res.max_message_id = GNUNET_htonll (max_message_id);
1330 if (GNUNET_OK == result || GNUNET_NO == result)
1332 chn->max_message_id = max_message_id;
1333 chn->max_state_message_id = max_state_message_id;
/* chn is the closure handed to all multicast callbacks. */
1335 = GNUNET_MULTICAST_member_join (cfg, &chn->pub_key, &slv->priv_key,
1337 slv->relay_count, slv->relays,
1339 &mcast_recv_join_request,
1340 &mcast_recv_join_decision,
1341 &mcast_recv_membership_test,
1342 &mcast_recv_replay_fragment,
1343 &mcast_recv_replay_message,
1344 &mcast_recv_message, chn);
1348 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1349 "%p GNUNET_PSYCSTORE_counters_get() "
1350 "returned %d for channel %s.\n",
1351 chn, result, GNUNET_h2s (&chn->pub_key_hash));
1354 client_send_msg (chn, &res.header);
/* Sets up the receive side containers of a channel: a min-heap (its target
 * member is on a line not visible here; message_queue_run() reads message
 * IDs from chn->recv_msgs) and the recv_frags map of fragment queues. */
1359 channel_init (struct Channel *chn)
1362 = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
1363 chn->recv_frags = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
/* Handler for a client that wants to act as master of a channel.  The public
 * key is derived from the private channel key in the request and hashed to
 * look up an existing master.  The visible lines show two paths: creating a
 * new master, registering it in the masters map and requesting the counters
 * from PSYCstore (answered in store_recv_master_counters()), and answering
 * right away with a MASTER_START_ACK built from the stored max_message_id.
 * In both cases the client is appended to the client list of the channel and
 * the channel becomes the user context of the client.
 * NOTE(review): the condition selecting between the two paths and the
 * assignments of chn are on lines not visible in this view. */
1368 * Handle a connecting client starting a channel master.
1371 client_recv_master_start (void *cls, struct GNUNET_SERVER_Client *client,
1372 const struct GNUNET_MessageHeader *msg)
1374 const struct MasterStartRequest *req
1375 = (const struct MasterStartRequest *) msg;
1377 struct GNUNET_CRYPTO_EddsaPublicKey pub_key;
1378 struct GNUNET_HashCode pub_key_hash;
1380 GNUNET_CRYPTO_eddsa_key_get_public (&req->channel_key, &pub_key);
1381 GNUNET_CRYPTO_hash (&pub_key, sizeof (pub_key), &pub_key_hash);
1384 mst = GNUNET_CONTAINER_multihashmap_get (masters, &pub_key_hash);
1385 struct Channel *chn;
/* New master: take policy and private key from the request. */
1389 mst = GNUNET_new (struct Master);
1390 mst->policy = ntohl (req->policy);
1391 mst->priv_key = req->channel_key;
1392 mst->join_reqs = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
1395 chn->is_master = GNUNET_YES;
1396 chn->pub_key = pub_key;
1397 chn->pub_key_hash = pub_key_hash;
1400 GNUNET_CONTAINER_multihashmap_put (masters, &chn->pub_key_hash, chn,
1401 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
/* The handle is kept so that cleanup_channel() can cancel the request. */
1402 chn->store_op = GNUNET_PSYCSTORE_counters_get (store, &chn->pub_key,
1403 store_recv_master_counters, mst);
/* Immediate acknowledgement for this client only. */
1409 struct CountersResult res;
1410 res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK);
1411 res.header.size = htons (sizeof (res));
1412 res.result_code = htonl (GNUNET_OK);
1413 res.max_message_id = GNUNET_htonll (mst->max_message_id);
1415 GNUNET_SERVER_notification_context_add (nc, client);
1416 GNUNET_SERVER_notification_context_unicast (nc, client, &res.header,
1420 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1421 "%p Client connected as master to channel %s.\n",
1422 mst, GNUNET_h2s (&chn->pub_key_hash));
1424 struct ClientListItem *cli = GNUNET_new (struct ClientListItem);
1425 cli->client = client;
1426 GNUNET_CONTAINER_DLL_insert (chn->clients_head, chn->clients_tail, cli);
/* client_disconnect() retrieves the channel through this user context. */
1428 GNUNET_SERVER_client_set_user_context (client, chn);
1429 GNUNET_SERVER_receive_done (client, GNUNET_OK);
1434 * Handle a connecting client joining as a channel slave.
/* Handler for GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN (see the handler table in
 * run()). Finds the slave identified by (channel key, slave key); the visible
 * lines either create and register a new slave and request its counters from
 * PSYCstore, or acknowledge an existing slave and (re)join the multicast
 * group / replay the stored join decision. In both cases the client is
 * appended to the channel's client list.
 *
 * NOTE(review): lossy listing -- the return type, braces, several branch
 * conditions, assignment targets and short argument lines are missing. */
1437 client_recv_slave_join (void *cls, struct GNUNET_SERVER_Client *client,
1438 const struct GNUNET_MessageHeader *msg)
1440 const struct SlaveJoinRequest *req
1441 = (const struct SlaveJoinRequest *) msg;
1443 struct GNUNET_CRYPTO_EddsaPublicKey slv_pub_key;
1444 struct GNUNET_HashCode pub_key_hash, slv_pub_key_hash;
/* The request carries the slave's private key (public key derived here) and
 * the channel key, which is hashed as-is and later stored as chn->pub_key. */
1446 GNUNET_CRYPTO_eddsa_key_get_public (&req->slave_key, &slv_pub_key);
1447 GNUNET_CRYPTO_hash (&slv_pub_key, sizeof (slv_pub_key), &slv_pub_key_hash);
1448 GNUNET_CRYPTO_hash (&req->channel_key, sizeof (req->channel_key), &pub_key_hash);
/* Two-level lookup: channel hash -> per-channel map -> slave by key hash. */
1450 struct GNUNET_CONTAINER_MultiHashMap *
1451 chn_slv = GNUNET_CONTAINER_multihashmap_get (channel_slaves, &pub_key_hash);
1452 struct Slave *slv = NULL;
1453 struct Channel *chn;
1455 if (NULL != chn_slv)
1457 slv = GNUNET_CONTAINER_multihashmap_get (chn_slv, &slv_pub_key_hash);
/* Presumably guarded by a "slave not found" check on a missing line:
 * allocate the slave and copy the request's fields into it. */
1461 slv = GNUNET_new (struct Slave);
1462 slv->priv_key = req->slave_key;
1463 slv->pub_key = slv_pub_key;
1464 slv->pub_key_hash = slv_pub_key_hash;
1465 slv->origin = req->origin;
1466 slv->relay_count = ntohl (req->relay_count);
/* The relay identities trail the fixed-size request.
 * NOTE(review): relay_count comes straight from the message and no check
 * against the message size is visible in this listing -- confirm that the
 * length is validated before the allocation and copy below. */
1467 if (0 < slv->relay_count)
1469 const struct GNUNET_PeerIdentity *relays
1470 = (const struct GNUNET_PeerIdentity *) &req[1];
1472 = GNUNET_malloc (slv->relay_count * sizeof (struct GNUNET_PeerIdentity));
1474 for (i = 0; i < slv->relay_count; i++)
1475 memcpy (&slv->relays[i], &relays[i], sizeof (*relays));
1479 chn->is_master = GNUNET_NO;
1480 chn->pub_key = req->channel_key;
1481 chn->pub_key_hash = pub_key_hash;
/* First slave of this channel: create the per-channel slave map. */
1484 if (NULL == chn_slv)
1486 chn_slv = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
1487 GNUNET_CONTAINER_multihashmap_put (channel_slaves, &chn->pub_key_hash, chn_slv,
1488 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
/* Register the slave in both indexes, then request its counters;
 * store_recv_slave_counters() receives the answer with `slv` as closure. */
1490 GNUNET_CONTAINER_multihashmap_put (chn_slv, &slv->pub_key_hash, chn,
1491 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1492 GNUNET_CONTAINER_multihashmap_put (slaves, &chn->pub_key_hash, chn,
1493 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1494 chn->store_op = GNUNET_PSYCSTORE_counters_get (store, &chn->pub_key,
1495 &store_recv_slave_counters, slv);
/* Presumably the branch for an already known slave: acknowledge at once with
 * GNUNET_OK and the channel's current max_message_id. */
1501 struct CountersResult res;
1502 res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK);
1503 res.header.size = htons (sizeof (res));
1504 res.result_code = htonl (GNUNET_OK);
1505 res.max_message_id = GNUNET_htonll (chn->max_message_id);
1507 GNUNET_SERVER_notification_context_add (nc, client);
1508 GNUNET_SERVER_notification_context_unicast (nc, client, &res.header,
/* No multicast membership handle yet: join the group.
 * NOTE(review): the assignment target and the arguments on original lines
 * 1515 and 1517 are missing from the listing. */
1511 if (NULL == slv->member)
1514 = GNUNET_MULTICAST_member_join (cfg, &chn->pub_key, &slv->priv_key,
1516 slv->relay_count, slv->relays,
1518 &mcast_recv_join_request,
1519 &mcast_recv_join_decision,
1520 &mcast_recv_membership_test,
1521 &mcast_recv_replay_fragment,
1522 &mcast_recv_replay_message,
1523 &mcast_recv_message, chn);
/* Already a member and a join decision is stored: send it to this client. */
1526 else if (NULL != slv->join_dcsn)
1528 GNUNET_SERVER_notification_context_add (nc, client);
1529 GNUNET_SERVER_notification_context_unicast (nc, client,
1530 &slv->join_dcsn->header,
1535 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1536 "%p Client connected as slave to channel %s.\n",
1537 slv, GNUNET_h2s (&chn->pub_key_hash));
/* Track the client in the channel's client list. */
1539 struct ClientListItem *cli = GNUNET_new (struct ClientListItem);
1540 cli->client = client;
1541 GNUNET_CONTAINER_DLL_insert (chn->clients_head, chn->clients_tail, cli);
/* Later handlers fetch the channel through the client's user context. */
1543 GNUNET_SERVER_client_set_user_context (client, &slv->chn);
1544 GNUNET_SERVER_receive_done (client, GNUNET_OK);
/**
 * Closure handed to mcast_send_join_decision() while iterating over the
 * pending join requests of a slave.
 */
struct JoinDecisionClosure
{
  /** Decision of the master, already converted to host byte order. */
  int32_t is_admitted;

  /** Message to pass along with the decision, or NULL if there is none. */
  struct GNUNET_MessageHeader *msg;
};
1556 * Iterator callback for responding to join requests of a slave.
/* Iterator passed to GNUNET_CONTAINER_multihashmap_get_multiple() by
 * client_recv_join_decision(); cls is the struct JoinDecisionClosure holding
 * the decision and the optional message to deliver with it.
 * NOTE(review): the parameter line that declares `jh`, the return type, the
 * braces and the return statement are missing from this listing. */
1559 mcast_send_join_decision (void *cls, const struct GNUNET_HashCode *pub_key_hash,
1562 struct JoinDecisionClosure *jcls = cls;
1563 // FIXME: add relays
/* The `0, NULL` pair is presumably the (empty) relay list the FIXME above
 * refers to -- TODO confirm against the multicast API. */
1564 GNUNET_MULTICAST_join_decision (jh, jcls->is_admitted, 0, NULL, jcls->msg);
1570 * Join decision from client.
1573 client_recv_join_decision (void *cls, struct GNUNET_SERVER_Client *client,
1574 const struct GNUNET_MessageHeader *msg)
1577 chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
1578 GNUNET_assert (GNUNET_YES == chn->is_master);
1579 struct Master *mst = (struct Master *) chn;
1581 struct MasterJoinDecision *dcsn = (struct MasterJoinDecision *) msg;
1582 struct JoinDecisionClosure jcls;
1583 jcls.is_admitted = ntohl (dcsn->is_admitted);
1585 = (sizeof (*dcsn) + sizeof (struct GNUNET_PSYC_MessageHeader)
1586 <= ntohs (msg->size))
1587 ? (struct GNUNET_MessageHeader *) &dcsn[1]
1590 struct GNUNET_HashCode slave_key_hash;
1591 GNUNET_CRYPTO_hash (&dcsn->slave_key, sizeof (dcsn->slave_key),
1594 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1595 "%p Got join decision (%d) from client for channel %s..\n",
1596 mst, jcls.is_admitted, GNUNET_h2s (&chn->pub_key_hash));
1597 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1598 "%p ..and slave %s.\n",
1599 mst, GNUNET_h2s (&slave_key_hash));
1601 GNUNET_CONTAINER_multihashmap_get_multiple (mst->join_reqs, &slave_key_hash,
1602 &mcast_send_join_decision, &jcls);
1603 GNUNET_CONTAINER_multihashmap_remove_all (mst->join_reqs, &slave_key_hash);
1604 GNUNET_SERVER_receive_done (client, GNUNET_OK);
1609 * Send acknowledgement to a client.
1611 * Sent after a message fragment has been passed on to multicast.
1613 * @param chn The channel struct for the client.
/* Send a bare GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK header to one client.
 * `chn` is not used in the visible lines.
 * NOTE(review): lossy listing -- the return type, the braces and original
 * lines 1621-1622 are missing. */
1616 send_message_ack (struct Channel *chn, struct GNUNET_SERVER_Client *client)
/* The acknowledgement consists of the message header only. */
1618 struct GNUNET_MessageHeader res;
1619 res.size = htons (sizeof (res));
1620 res.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK);
/* The client is added to the notification context before the unicast. */
1623 GNUNET_SERVER_notification_context_add (nc, client);
1624 GNUNET_SERVER_notification_context_unicast (nc, client, &res, GNUNET_NO);
1629 * Callback for the transmit functions of multicast.
/* Shared body of master_transmit_notify() and slave_transmit_notify(); cls is
 * the struct Channel. Copies the message at the head of the channel's
 * transmit queue into `data`, acknowledges it to the originating client,
 * removes it from the queue and then either continues with the next queued
 * message or cleans up a disconnected channel.
 *
 * NOTE(review): lossy listing -- the return type, braces, the statements that
 * end the "nothing to send" branch and the final return are missing. */
1632 transmit_notify (void *cls, size_t *data_size, void *data)
1634 struct Channel *chn = cls;
1635 struct TransmitMessage *tmit_msg = chn->tmit_head;
/* Nothing is queued, or the offered buffer is smaller than the message. */
1637 if (NULL == tmit_msg || *data_size < tmit_msg->size)
1639 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1640 "%p transmit_notify: nothing to send.\n", chn);
1645 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1646 "%p transmit_notify: sending %u bytes.\n", chn, tmit_msg->size);
/* Report the number of bytes written; the payload directly follows the
 * TransmitMessage header in memory. */
1648 *data_size = tmit_msg->size;
1649 memcpy (data, &tmit_msg[1], *data_size);
/* The result depends on the channel's transmit state; the two wrappers clear
 * their transmit handle when it is GNUNET_YES. */
1651 int ret = (MSG_STATE_END < chn->tmit_state) ? GNUNET_NO : GNUNET_YES;
/* Acknowledge only when the queue entry records an originating client. */
1652 if (NULL != tmit_msg->client)
1653 send_message_ack (chn, tmit_msg->client);
1655 GNUNET_CONTAINER_DLL_remove (chn->tmit_head, chn->tmit_tail, tmit_msg);
1656 GNUNET_free (tmit_msg);
/* Presumably tmit_task == 0 means that no transmission is scheduled -- TODO
 * confirm. Then send the next queued message, or clean up the channel once
 * its queue is empty and it is flagged as disconnected. */
1658 if (0 == chn->tmit_task)
1660 if (NULL != chn->tmit_head)
1662 transmit_message (chn);
1664 else if (chn->disconnected)
1666 /* FIXME: handle partial message (when still in_transmit) */
1667 cleanup_channel (chn);
1676 * Callback for the transmit functions of multicast.
1679 master_transmit_notify (void *cls, size_t *data_size, void *data)
1681 int ret = transmit_notify (cls, data_size, data);
1683 if (GNUNET_YES == ret)
1685 struct Master *mst = cls;
1686 mst->tmit_handle = NULL;
1693 * Callback for the transmit functions of multicast.
1696 slave_transmit_notify (void *cls, size_t *data_size, void *data)
1698 int ret = transmit_notify (cls, data_size, data);
1700 if (GNUNET_YES == ret)
1702 struct Slave *slv = cls;
1703 slv->tmit_handle = NULL;
1710 * Transmit a message from a channel master to the multicast group.
1713 master_transmit_message (struct Master *mst)
1715 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p master_transmit_message()\n", mst);
1716 mst->chn.tmit_task = 0;
1717 if (NULL == mst->tmit_handle)
1720 = GNUNET_MULTICAST_origin_to_all (mst->origin, mst->max_message_id,
1721 mst->max_group_generation,
1722 master_transmit_notify, mst);
1726 GNUNET_MULTICAST_origin_to_all_resume (mst->tmit_handle);
1732 * Transmit a message from a channel slave to the multicast group.
1735 slave_transmit_message (struct Slave *slv)
1737 slv->chn.tmit_task = 0;
1738 if (NULL == slv->tmit_handle)
1741 = GNUNET_MULTICAST_member_to_origin (slv->member, slv->max_request_id,
1742 slave_transmit_notify, slv);
1746 GNUNET_MULTICAST_member_to_origin_resume (slv->tmit_handle);
1752 transmit_message (struct Channel *chn)
1755 ? master_transmit_message ((struct Master *) chn)
1756 : slave_transmit_message ((struct Slave *) chn);
1761 * Queue a message from a channel master for sending to the multicast group.
1764 master_queue_message (struct Master *mst, struct TransmitMessage *tmit_msg,
1765 uint16_t first_ptype, uint16_t last_ptype)
1767 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p master_queue_message()\n", mst);
1769 if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype)
1771 tmit_msg->id = ++mst->max_message_id;
1772 struct GNUNET_PSYC_MessageMethod *pmeth
1773 = (struct GNUNET_PSYC_MessageMethod *) &tmit_msg[1];
1775 if (pmeth->flags & GNUNET_PSYC_MASTER_TRANSMIT_STATE_RESET)
1777 pmeth->state_delta = GNUNET_htonll (GNUNET_PSYC_STATE_RESET);
1779 else if (pmeth->flags & GNUNET_PSYC_MASTER_TRANSMIT_STATE_MODIFY)
1781 pmeth->state_delta = GNUNET_htonll (tmit_msg->id
1782 - mst->max_state_message_id);
1786 pmeth->state_delta = GNUNET_htonll (GNUNET_PSYC_STATE_NOT_MODIFIED);
1793 * Queue a message from a channel slave for sending to the multicast group.
1796 slave_queue_message (struct Slave *slv, struct TransmitMessage *tmit_msg,
1797 uint16_t first_ptype, uint16_t last_ptype)
1799 if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype)
1801 struct GNUNET_PSYC_MessageMethod *pmeth
1802 = (struct GNUNET_PSYC_MessageMethod *) &tmit_msg[1];
1803 pmeth->state_delta = GNUNET_htonll (GNUNET_PSYC_STATE_NOT_MODIFIED);
1804 tmit_msg->id = ++slv->max_request_id;
1810 * Queue PSYC message parts for sending to multicast.
1812 * @param chn Channel to send to.
1813 * @param client Client the message originates from.
1814 * @param data_size Size of @a data.
1815 * @param data Concatenated message parts.
1816 * @param first_ptype First message part type in @a data.
1817 * @param last_ptype Last message part type in @a data.
/* Copy `data` into a freshly allocated TransmitMessage, append it to the
 * channel's transmit queue and let the master or slave variant fill in the
 * role-specific fields.
 * NOTE(review): lossy listing -- the return type, the braces, the parameter
 * lines declaring `data_size` and `data` (original lines 1822-1823) and the
 * condition of the final conditional expression are missing. */
1820 queue_message (struct Channel *chn,
1821 struct GNUNET_SERVER_Client *client,
1824 uint16_t first_ptype, uint16_t last_ptype)
/* Header and payload share one allocation; the payload follows the header. */
1826 struct TransmitMessage *
1827 tmit_msg = GNUNET_malloc (sizeof (*tmit_msg) + data_size);
1828 memcpy (&tmit_msg[1], data, data_size);
/* transmit_notify() later uses `client` to decide whom to acknowledge. */
1829 tmit_msg->client = client;
1830 tmit_msg->size = data_size;
1831 tmit_msg->state = chn->tmit_state;
1833 GNUNET_CONTAINER_DLL_insert_tail (chn->tmit_head, chn->tmit_tail, tmit_msg);
/* Role-specific post-processing of the entry that was just queued. */
1836 ? master_queue_message ((struct Master *) chn, tmit_msg,
1837 first_ptype, last_ptype)
1838 : slave_queue_message ((struct Slave *) chn, tmit_msg,
1839 first_ptype, last_ptype);
1844 * Cancel transmission of current message.
1846 * @param chn Channel to send to.
1847 * @param client Client the message originates from.
1850 transmit_cancel (struct Channel *chn, struct GNUNET_SERVER_Client *client)
1852 uint16_t type = GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL;
1854 struct GNUNET_MessageHeader msg;
1855 msg.size = htons (sizeof (msg));
1856 msg.type = htons (type);
1858 queue_message (chn, client, sizeof (msg), &msg, type, type);
1859 transmit_message (chn);
1861 /* FIXME: cleanup */
1866 * Incoming message from a master or slave client.
/* Handler for GNUNET_MESSAGE_TYPE_PSYC_MESSAGE (see the handler table in
 * run()). Validates the message parts sent by a master or slave client,
 * queues them for the client's channel and triggers transmission.
 *
 * NOTE(review): lossy listing -- the return type, braces, the declaration of
 * `chn`, the head of the part-check condition (original line 1899) and the
 * short statements that end the error branches are missing. */
1869 client_recv_psyc_message (void *cls, struct GNUNET_SERVER_Client *client,
1870 const struct GNUNET_MessageHeader *msg)
/* The channel was stored as the client's user context when the client
 * connected as master or slave. */
1873 chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
1874 GNUNET_assert (NULL != chn);
1876 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1877 "%p Received message from client.\n", chn);
1878 GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG, msg);
/* `ready` is set by the PSYCstore counters callback (visible for the master
 * variant); until then messages are refused. */
1880 if (GNUNET_YES != chn->ready)
1882 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1883 "%p Channel is not ready, dropping message from client.\n", chn);
1884 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
/* The payload (everything after the header) must fit one multicast fragment. */
1888 uint16_t size = ntohs (msg->size);
1889 if (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < size - sizeof (*msg))
1891 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "%p Message payload too large.\n", chn);
/* On error a cancel part is queued and the request ends with GNUNET_SYSERR. */
1893 transmit_cancel (chn, client);
1894 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
/* Check the concatenated parts and learn the first and last part type. */
1898 uint16_t first_ptype = 0, last_ptype = 0;
1900 == GNUNET_PSYC_receive_check_parts (size - sizeof (*msg),
1901 (const char *) &msg[1],
1902 &first_ptype, &last_ptype))
1904 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1905 "%p Received invalid message part from client.\n", chn);
1907 transmit_cancel (chn, client);
1908 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
/* Valid message: queue the parts without the outer header and send. */
1912 queue_message (chn, client, size - sizeof (*msg), &msg[1],
1913 first_ptype, last_ptype);
1914 transmit_message (chn);
1916 GNUNET_SERVER_receive_done (client, GNUNET_OK);
1921 * Client requests to add a slave to the membership database.
/* Handler registered in run() for GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_SLAVE_ADD.
 * NOTE(review): no body statements appear in this listing; if the body is
 * really empty the handler never calls GNUNET_SERVER_receive_done() --
 * confirm against the upstream file. */
1924 client_recv_slave_add (void *cls, struct GNUNET_SERVER_Client *client,
1925 const struct GNUNET_MessageHeader *msg)
1932 * Client requests to remove a slave from the membership database.
/* Handler registered in run() for GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_SLAVE_RM.
 * NOTE(review): no body statements appear in this listing; if the body is
 * really empty the handler never calls GNUNET_SERVER_receive_done() --
 * confirm against the upstream file. */
1935 client_recv_slave_remove (void *cls, struct GNUNET_SERVER_Client *client,
1936 const struct GNUNET_MessageHeader *msg)
1943 * Client requests channel history from PSYCstore.
/* Handler registered in run() for GNUNET_MESSAGE_TYPE_PSYC_STORY_REQUEST.
 * NOTE(review): no body statements appear in this listing; if the body is
 * really empty the handler never calls GNUNET_SERVER_receive_done() --
 * confirm against the upstream file. */
1946 client_recv_story_request (void *cls, struct GNUNET_SERVER_Client *client,
1947 const struct GNUNET_MessageHeader *msg)
1954 * Client requests best matching state variable from PSYCstore.
/* Handler registered in run() for GNUNET_MESSAGE_TYPE_PSYC_STATE_GET.
 * NOTE(review): no body statements appear in this listing; if the body is
 * really empty the handler never calls GNUNET_SERVER_receive_done() --
 * confirm against the upstream file. */
1957 client_recv_state_get (void *cls, struct GNUNET_SERVER_Client *client,
1958 const struct GNUNET_MessageHeader *msg)
1965 * Client requests state variables with a given prefix from PSYCstore.
/* Handler registered in run() for GNUNET_MESSAGE_TYPE_PSYC_STATE_GET_PREFIX.
 * NOTE(review): no body statements appear in this listing; if the body is
 * really empty the handler never calls GNUNET_SERVER_receive_done() --
 * confirm against the upstream file. */
1968 client_recv_state_get_prefix (void *cls, struct GNUNET_SERVER_Client *client,
1969 const struct GNUNET_MessageHeader *msg)
1976 * Initialize the PSYC service.
1978 * @param cls Closure.
1979 * @param server The initialized server.
1980 * @param c Configuration to use.
1983 run (void *cls, struct GNUNET_SERVER_Handle *server,
1984 const struct GNUNET_CONFIGURATION_Handle *c)
1986 static const struct GNUNET_SERVER_MessageHandler handlers[] = {
1987 { &client_recv_master_start, NULL,
1988 GNUNET_MESSAGE_TYPE_PSYC_MASTER_START, 0 },
1990 { &client_recv_slave_join, NULL,
1991 GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN, 0 },
1993 { &client_recv_join_decision, NULL,
1994 GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION, 0 },
1996 { &client_recv_psyc_message, NULL,
1997 GNUNET_MESSAGE_TYPE_PSYC_MESSAGE, 0 },
1999 { &client_recv_slave_add, NULL,
2000 GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_SLAVE_ADD, 0 },
2002 { &client_recv_slave_remove, NULL,
2003 GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_SLAVE_RM, 0 },
2005 { &client_recv_story_request, NULL,
2006 GNUNET_MESSAGE_TYPE_PSYC_STORY_REQUEST, 0 },
2008 { &client_recv_state_get, NULL,
2009 GNUNET_MESSAGE_TYPE_PSYC_STATE_GET, 0 },
2011 { &client_recv_state_get_prefix, NULL,
2012 GNUNET_MESSAGE_TYPE_PSYC_STATE_GET_PREFIX, 0 },
2014 { NULL, NULL, 0, 0 }
2018 store = GNUNET_PSYCSTORE_connect (cfg);
2019 stats = GNUNET_STATISTICS_create ("psyc", cfg);
2020 masters = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
2021 slaves = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
2022 channel_slaves = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
2023 recv_cache = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
2024 nc = GNUNET_SERVER_notification_context_create (server, 1);
2025 GNUNET_SERVER_add_handlers (server, handlers);
2026 GNUNET_SERVER_disconnect_notify (server, &client_disconnect, NULL);
2027 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL,
2028 &shutdown_task, NULL);
2033 * The main function for the service.
2035 * @param argc number of arguments from the command line
2036 * @param argv command line arguments
2037 * @return 0 ok, 1 on error
2040 main (int argc, char *const *argv)
2042 return (GNUNET_OK ==
2043 GNUNET_SERVICE_run (argc, argv, "psyc",
2044 GNUNET_SERVICE_OPTION_NONE,
2045 &run, NULL)) ? 0 : 1;
2048 /* end of gnunet-service-psyc.c */