2 * This file is part of GNUnet
3 * (C) 2013 Christian Grothoff (and other contributing authors)
5 * GNUnet is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published
7 * by the Free Software Foundation; either version 3, or (at your
8 * option) any later version.
10 * GNUnet is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License for more details.
15 * You should have received a copy of the GNU General Public License
16 * along with GNUnet; see the file COPYING. If not, write to the
17 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 * Boston, MA 02111-1307, USA.
22 * @file psyc/gnunet-service-psyc.c
24 * @author Gabor X Toth
30 #include "gnunet_util_lib.h"
31 #include "gnunet_constants.h"
32 #include "gnunet_protocols.h"
33 #include "gnunet_statistics_service.h"
34 #include "gnunet_multicast_service.h"
35 #include "gnunet_psycstore_service.h"
36 #include "gnunet_psyc_service.h"
/* File-scope service state: every handle and map below is static and shared by all channels. */
41 * Handle to our current configuration.
43 static const struct GNUNET_CONFIGURATION_Handle *cfg;
46 * Handle to the statistics service.
48 static struct GNUNET_STATISTICS_Handle *stats;
51 * Notification context, simplifies client broadcasts.
53 static struct GNUNET_SERVER_NotificationContext *nc;
56 * Handle to the PSYCstore.
58 static struct GNUNET_PSYCSTORE_Handle *store;
61 * All connected masters.
62 * Channel's pub_key_hash -> struct Master
64 static struct GNUNET_CONTAINER_MultiHashMap *masters;
67 * All connected slaves.
68 * Channel's pub_key_hash -> struct Slave
70 static struct GNUNET_CONTAINER_MultiHashMap *slaves;
/* Two-level map: the value stored per channel is itself a MultiHashMap that
   cleanup_slave() and handle_slave_join() index by the hash of the slave's public key. */
73 * Connected slaves per channel.
74 * Channel's pub_key_hash -> Slave's pub_key -> struct Slave
76 static struct GNUNET_CONTAINER_MultiHashMap *channel_slaves;
80 * Message in the transmission queue.
82 struct TransmitMessage
/* prev/next have the same type as Channel.tmit_head / tmit_tail, which anchor this queue. */
84 struct TransmitMessage *prev;
85 struct TransmitMessage *next;
/* NOTE(review): presumably the client that submitted the message -- confirm against the sender code. */
87 struct GNUNET_SERVER_Client *client;
90 * ID assigned to the message.
100 * @see enum MessageState
104 /* Followed by message */
/* Looked up with the channel's pub_key_hash; fragment_queue_insert() creates the
   per-channel inner map on demand. */
109 * Cache for received message fragments.
110 * Message fragments are only sent to clients after all modifiers arrived.
112 * chan_key -> MultiHashMap chan_msgs
114 static struct GNUNET_CONTAINER_MultiHashMap *recv_cache;
118 * Entry in the chan_msgs hashmap of @a recv_cache:
119 * fragment_id -> RecvCacheEntry
121 struct RecvCacheEntry
/* Heap copy of the fragment made in fragment_queue_insert(); freed together with
   the entry in fragment_queue_run(). */
123 struct GNUNET_MULTICAST_MessageHeader *mmsg;
/* Per-message bookkeeping used by fragment_queue_insert() and fragment_queue_run(). */
129 * Entry in the @a recv_frags hash map of a @a Channel.
130 * message_id -> FragmentQueue
135 * Fragment IDs stored in @a recv_cache.
/* Created with GNUNET_CONTAINER_HEAP_ORDER_MIN and filled with host-order fragment IDs as cost. */
137 struct GNUNET_CONTAINER_Heap *fragments;
140 * Total size of received fragments.
145 * Total size of received header fragments (METHOD & MODIFIERs)
147 uint64_t header_size;
150 * The @a state_delta field from struct GNUNET_PSYC_MessageMethod.
152 uint64_t state_delta;
155 * The @a flags field from struct GNUNET_PSYC_MessageMethod.
160 * Receive state of message.
162 * @see MessageFragmentState
167 * Is the message queued for delivery to the client?
168 * i.e. added to the recv_msgs queue
175 * List of connected clients.
/* prev/next link the entries into the list anchored at Channel.clients_head / clients_tail. */
179 struct ClientList *prev;
180 struct ClientList *next;
181 struct GNUNET_SERVER_Client *client;
/* Stored as the user context of each connected client (see handle_master_start()
   and handle_slave_join()). */
186 * Common part of the client context for both a channel master and slave.
190 struct ClientList *clients_head;
191 struct ClientList *clients_tail;
193 struct TransmitMessage *tmit_head;
194 struct TransmitMessage *tmit_tail;
197 * Current PSYCstore operation.
/* Set from GNUNET_PSYCSTORE_counters_get(), cleared in the counters callbacks,
   cancelled in cleanup_channel() if still pending. */
199 struct GNUNET_PSYCSTORE_OperationHandle *store_op;
202 * Received fragments not yet sent to the client.
203 * message_id -> FragmentQueue
205 struct GNUNET_CONTAINER_MultiHashMap *recv_frags;
208 * Received message IDs not yet sent to the client.
/* Min-heap (see channel_init()); message IDs are inserted in host byte order. */
210 struct GNUNET_CONTAINER_Heap *recv_msgs;
/* NOTE(review): no documentation is visible for tmit_task in this listing; presumably
   the scheduler task driving transmit_message() -- confirm. */
215 GNUNET_SCHEDULER_TaskIdentifier tmit_task;
218 * Public key of the channel.
220 struct GNUNET_CRYPTO_EddsaPublicKey pub_key;
223 * Hash of @a pub_key.
225 struct GNUNET_HashCode pub_key_hash;
228 * Last message ID sent to the client.
229 * 0 if there is no such message.
231 uint64_t max_message_id;
234 * ID of the last stateful message, where the state operations has been
235 * processed and saved to PSYCstore and which has been sent to the client.
236 * 0 if there is no such message.
238 uint64_t max_state_message_id;
241 * Expected value size for the modifier being received from the PSYC service.
243 uint32_t tmit_mod_value_size_expected;
246 * Actual value size for the modifier being received from the PSYC service.
248 uint32_t tmit_mod_value_size;
251 * @see enum MessageState
261 * Is this a channel master (#GNUNET_YES), or slave (#GNUNET_NO)?
266 * Ready to receive messages from client? #GNUNET_YES or #GNUNET_NO
271 * Is the client disconnected? #GNUNET_YES or #GNUNET_NO
273 uint8_t disconnected;
/* cleanup_channel() and join_mem_test_cb() cast a struct Channel * to struct Master *.
   NOTE(review): that requires the common Channel member to come first; its line is
   not visible in this listing -- confirm. */
278 * Client context for a channel master.
283 * Channel struct common for Master and Slave
288 * Private key of the channel.
290 struct GNUNET_CRYPTO_EddsaPrivateKey priv_key;
293 * Handle for the multicast origin.
/* Assigned from GNUNET_MULTICAST_origin_start() in master_counters_cb(); NULL until then. */
295 struct GNUNET_MULTICAST_Origin *origin;
298 * Transmit handle for multicast.
300 struct GNUNET_MULTICAST_OriginTransmitHandle *tmit_handle;
303 * Incoming join requests from multicast.
304 * member_key -> struct GNUNET_MULTICAST_JoinHandle *
/* Filled by join_mem_test_cb() with MULTIHASHMAPOPTION_MULTIPLE, so one key may hold several handles. */
306 struct GNUNET_CONTAINER_MultiHashMap *join_reqs;
309 * Last message ID transmitted to this channel.
311 * Incremented before sending a message, thus the message_id in messages sent
314 uint64_t max_message_id;
317 * ID of the last message with state operations transmitted to the channel.
318 * 0 if there is no such message.
320 uint64_t max_state_message_id;
323 * Maximum group generation transmitted to the channel.
325 uint64_t max_group_generation;
328 * @see enum GNUNET_PSYC_Policy
/* cleanup_channel() casts a struct Channel * to struct Slave *.
   NOTE(review): that requires the common Channel member to come first; its line is
   not visible in this listing -- confirm. */
335 * Client context for a channel slave.
340 * Channel struct common for Master and Slave
345 * Private key of the slave.
347 struct GNUNET_CRYPTO_EddsaPrivateKey priv_key;
350 * Public key of the slave.
352 struct GNUNET_CRYPTO_EddsaPublicKey pub_key;
355 * Hash of @a pub_key.
357 struct GNUNET_HashCode pub_key_hash;
360 * Handle for the multicast member.
/* Assigned from GNUNET_MULTICAST_member_join() in slave_counters_cb(); parted in cleanup_slave(). */
362 struct GNUNET_MULTICAST_Member *member;
365 * Transmit handle for multicast.
367 struct GNUNET_MULTICAST_MemberTransmitHandle *tmit_handle;
370 * Peer identity of the origin.
372 struct GNUNET_PeerIdentity origin;
375 * Number of items in @a relays.
377 uint32_t relay_count;
380 * Relays that multicast can use to connect.
/* Heap array allocated in handle_slave_join(); freed in cleanup_slave(). */
382 struct GNUNET_PeerIdentity *relays;
385 * Join request to be transmitted to the master on join.
387 struct GNUNET_MessageHeader *join_req;
390 * Maximum request ID for this channel.
392 uint64_t max_request_id;
/* Declared here because client_disconnect() below calls it; no definition is visible in this listing. */
397 transmit_message (struct Channel *ch);
401 * Task run during shutdown.
/* The visible statements release the client notification context and the statistics handle. */
407 shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
411 GNUNET_SERVER_notification_context_destroy (nc);
/* NOTE(review): the GNUNET_NO argument is presumably the "sync first" flag of the statistics API -- confirm. */
416 GNUNET_STATISTICS_destroy (stats, GNUNET_NO);
423 * Clean up master data structures after a client disconnected.
/* Stops the multicast origin if one was started, destroys the join-request map
   and unregisters the channel from `masters`. */
426 cleanup_master (struct Master *mst)
428 struct Channel *ch = &mst->ch;
/* origin stays NULL until master_counters_cb() has run. */
430 if (NULL != mst->origin)
431 GNUNET_MULTICAST_origin_stop (mst->origin);
/* NOTE(review): join handles still stored in join_reqs are not visibly released before the map is destroyed -- confirm. */
432 GNUNET_CONTAINER_multihashmap_destroy (mst->join_reqs);
/* Same key/value pair that handle_master_start() registered. */
433 GNUNET_CONTAINER_multihashmap_remove (masters, &ch->pub_key_hash, ch);
438 * Clean up slave data structures after a client disconnected.
441 cleanup_slave (struct Slave *slv)
443 struct Channel *ch = &slv->ch;
444 struct GNUNET_CONTAINER_MultiHashMap *
445 ch_slv = GNUNET_CONTAINER_multihashmap_get (channel_slaves,
447 GNUNET_assert (NULL != ch_slv);
448 GNUNET_CONTAINER_multihashmap_remove (ch_slv, &slv->pub_key_hash, slv);
450 if (0 == GNUNET_CONTAINER_multihashmap_size (ch_slv))
452 GNUNET_CONTAINER_multihashmap_remove (channel_slaves, &ch->pub_key_hash,
454 GNUNET_CONTAINER_multihashmap_destroy (ch_slv);
456 GNUNET_CONTAINER_multihashmap_remove (slaves, &ch->pub_key_hash, slv);
458 if (NULL != slv->join_req)
459 GNUNET_free (slv->join_req);
460 if (NULL != slv->relays)
461 GNUNET_free (slv->relays);
462 if (NULL != slv->member)
463 GNUNET_MULTICAST_member_part (slv->member);
464 GNUNET_CONTAINER_multihashmap_remove (slaves, &ch->pub_key_hash, ch);
469 * Clean up channel data structures after a client disconnected.
/* Cancels a pending PSYCstore operation, then dispatches on is_master to the
   master- or slave-specific cleanup. */
472 cleanup_channel (struct Channel *ch)
474 /* FIXME: fragment_cache_clear */
/* store_op is non-NULL only while a counters_get request is outstanding. */
476 if (NULL != ch->store_op)
477 GNUNET_PSYCSTORE_operation_cancel (ch->store_op);
/* The casts treat the Channel pointer as the enclosing Master/Slave. */
479 (GNUNET_YES == ch->is_master)
480 ? cleanup_master ((struct Master *) ch)
481 : cleanup_slave ((struct Slave *) ch);
487 * Called whenever a client is disconnected.
488 * Frees our resources associated with that client.
490 * @param cls Closure.
491 * @param client Identification of the client.
494 client_disconnect (void *cls, struct GNUNET_SERVER_Client *client)
500 ch = GNUNET_SERVER_client_get_user_context (client, struct Channel);
501 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
502 "%p Client (%s) disconnected from channel %s\n",
503 ch, (GNUNET_YES == ch->is_master) ? "master" : "slave",
504 GNUNET_h2s (&ch->pub_key_hash));
508 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
509 "%p User context is NULL in client_disconnect()\n", ch);
514 struct ClientList *cl = ch->clients_head;
517 if (cl->client == client)
519 GNUNET_CONTAINER_DLL_remove (ch->clients_head, ch->clients_tail, cl);
526 if (NULL == ch->clients_head)
527 { /* Last client disconnected. */
528 if (NULL != ch->tmit_head)
529 { /* Send pending messages to multicast before cleanup. */
530 transmit_message (ch);
534 cleanup_channel (ch);
541 * Send message to all clients connected to the channel.
544 msg_to_clients (const struct Channel *ch,
545 const struct GNUNET_MessageHeader *msg)
547 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
548 "%p Sending message to clients.\n", ch);
550 struct ClientList *cl = ch->clients_head;
553 GNUNET_SERVER_notification_context_add (nc, cl->client);
554 GNUNET_SERVER_notification_context_unicast (nc, cl->client, msg, GNUNET_NO);
/* Allocated in join_cb() and consumed by join_mem_test_cb(). */
561 * Closure for join_mem_test_cb()
563 struct JoinMemTestClosure
565 struct GNUNET_CRYPTO_EddsaPublicKey slave_key;
/* jh: multicast join handle that is either stored in Master.join_reqs or answered directly. */
567 struct GNUNET_MULTICAST_JoinHandle *jh;
/* Heap-allocated request built in join_cb(); freed by join_mem_test_cb(). */
568 struct MasterJoinRequest *master_join_req;
573 * Membership test result callback used for join requests.
/* cls is the JoinMemTestClosure built by join_cb(); result is the outcome of the
   PSYCstore membership test. */
576 join_mem_test_cb (void *cls, int64_t result, const char *err_msg)
578 struct JoinMemTestClosure *jcls = cls;
/* Only a GNUNET_NO result on a master channel is escalated to the clients;
   the other visible path answers the join with `result` directly. */
580 if (GNUNET_NO == result && GNUNET_YES == jcls->ch->is_master)
581 { /* Pass on join request to client if this is a master channel */
582 struct Master *mst = (struct Master *) jcls->ch;
583 struct GNUNET_HashCode slave_key_hash;
584 GNUNET_CRYPTO_hash (&jcls->slave_key, sizeof (jcls->slave_key),
/* Keep the join handle, keyed by the hash of the slave key, for a later decision. */
586 GNUNET_CONTAINER_multihashmap_put (mst->join_reqs, &slave_key_hash, jcls->jh,
587 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
588 msg_to_clients (jcls->ch,
589 (struct GNUNET_MessageHeader *) jcls->master_join_req);
594 GNUNET_MULTICAST_join_decision (jcls->jh, result, 0, NULL, NULL);
/* The request is released in either case; msg_to_clients() has already been called by this point. */
596 GNUNET_free (jcls->master_join_req);
602 * Incoming join request from multicast.
605 join_cb (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key,
606 const struct GNUNET_MessageHeader *join_msg,
607 struct GNUNET_MULTICAST_JoinHandle *jh)
609 struct Channel *ch = cls;
610 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p Got join request.\n", ch);
612 uint16_t join_msg_size = 0;
613 if (NULL != join_msg)
615 if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE == ntohs (join_msg->type))
617 join_msg_size = ntohs (join_msg->size);
621 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
622 "%p Got join message with invalid type %u.\n",
623 ch, ntohs (join_msg->type));
627 struct MasterJoinRequest *req = GNUNET_malloc (sizeof (*req) + join_msg_size);
628 req->header.size = htons (sizeof (*req) + join_msg_size);
629 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_JOIN_REQUEST);
630 req->slave_key = *slave_key;
631 memcpy (&req[1], join_msg, join_msg_size);
633 struct JoinMemTestClosure *jcls = GNUNET_malloc (sizeof (*jcls));
634 jcls->slave_key = *slave_key;
637 jcls->master_join_req = req;
639 GNUNET_PSYCSTORE_membership_test (store, &ch->pub_key, slave_key,
640 ch->max_message_id, 0,
641 &join_mem_test_cb, jcls);
/* Registered with GNUNET_MULTICAST_origin_start() in master_counters_cb().
   NOTE(review): no body statements are visible in this listing; looks like an unimplemented stub -- confirm. */
646 membership_test_cb (void *cls,
647 const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key,
648 uint64_t message_id, uint64_t group_generation,
649 struct GNUNET_MULTICAST_MembershipTestHandle *mth)
/* Registered with multicast in master_counters_cb() and slave_counters_cb().
   NOTE(review): no body statements are visible in this listing; looks like an unimplemented stub -- confirm. */
656 replay_fragment_cb (void *cls,
657 const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key,
658 uint64_t fragment_id, uint64_t flags,
659 struct GNUNET_MULTICAST_ReplayHandle *rh)
/* Registered with multicast in master_counters_cb() and slave_counters_cb().
   NOTE(review): part of the parameter list and the whole body are not visible in this listing -- confirm it is a stub. */
667 replay_message_cb (void *cls,
668 const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key,
670 uint64_t fragment_offset,
672 struct GNUNET_MULTICAST_ReplayHandle *rh)
679 fragment_store_result (void *cls, int64_t result, const char *err_msg)
681 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
682 "fragment_store() returned %l (%s)\n", result, err_msg);
687 * Convert an uint64_t in network byte order to a HashCode
688 * that can be used as key in a MultiHashMap
691 hash_key_from_nll (struct GNUNET_HashCode *key, uint64_t n)
693 /* use little-endian order, as idx_of MultiHashMap casts key to unsigned int */
694 /* TODO: use built-in byte swap functions if available */
696 n = ((n << 8) & 0xFF00FF00FF00FF00ULL) | ((n >> 8) & 0x00FF00FF00FF00FFULL);
697 n = ((n << 16) & 0xFFFF0000FFFF0000ULL) | ((n >> 16) & 0x0000FFFF0000FFFFULL);
699 *key = (struct GNUNET_HashCode) {{ 0 }};
701 = (n << 32) | (n >> 32);
/**
 * Convert an uint64_t in host byte order to a HashCode
 * that can be used as key in a MultiHashMap
 *
 * Yields the same key as hash_key_from_nll() does for the network byte
 * order representation of the same number.
 *
 * @param key Hash code to fill in.
 * @param n   Value to convert.
 */
static void
hash_key_from_hll (struct GNUNET_HashCode *key, uint64_t n)
{
#if __BYTE_ORDER == __BIG_ENDIAN
  hash_key_from_nll (key, n);
#elif __BYTE_ORDER == __LITTLE_ENDIAN
  *key = (struct GNUNET_HashCode) {{ 0 }};
  /* Byte copy rather than a store through a cast pointer (aliasing, alignment). */
  memcpy (key, &n, sizeof (n));
#else
  #error byteorder undefined
#endif
}
724 * Send multicast message to all clients connected to the channel.
/* Re-wraps the payload that follows the multicast header into a freshly allocated
   GNUNET_PSYC_MessageHeader and hands it to msg_to_clients(). */
727 mmsg_to_clients (struct Channel *ch,
728 const struct GNUNET_MULTICAST_MessageHeader *mmsg)
730 uint16_t size = ntohs (mmsg->header.size);
731 struct GNUNET_PSYC_MessageHeader *pmsg;
/* PSYC header + payload; NOTE(review): if size were smaller than sizeof (*mmsg) this
   subtraction would wrap -- no guard is visible in this function. */
732 uint16_t psize = sizeof (*pmsg) + size - sizeof (*mmsg);
734 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
735 "%p Sending message to client. "
736 "fragment_id: %" PRIu64 ", message_id: %" PRIu64 "\n",
737 ch, GNUNET_ntohll (mmsg->fragment_id),
738 GNUNET_ntohll (mmsg->message_id));
740 pmsg = GNUNET_malloc (psize);
741 pmsg->header.size = htons (psize);
742 pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
/* Copied without conversion, so it keeps the byte order it has in mmsg. */
743 pmsg->message_id = mmsg->message_id;
745 memcpy (&pmsg[1], &mmsg[1], size - sizeof (*mmsg));
746 msg_to_clients (ch, (const struct GNUNET_MessageHeader *) pmsg);
752 * Insert a multicast message fragment into the queue belonging to the message.
755 * @param mmsg Multicast message fragment.
756 * (The message ID hash of @a mmsg is computed inside the function; it is not a parameter.)
757 * @param first_ptype First PSYC message part type in @a mmsg.
758 * @param last_ptype Last PSYC message part type in @a mmsg.
/* Caches a copy of the fragment in recv_cache (or bumps the ref_count of an existing
   entry), updates the header/message completeness state kept in the message's
   FragmentQueue, and queues the message ID in ch->recv_msgs once the state is
   DATA, END or CANCEL. */
761 fragment_queue_insert (struct Channel *ch,
762 const struct GNUNET_MULTICAST_MessageHeader *mmsg,
763 uint16_t first_ptype, uint16_t last_ptype)
765 const uint16_t size = ntohs (mmsg->header.size);
766 const uint64_t frag_offset = GNUNET_ntohll (mmsg->fragment_offset);
767 struct GNUNET_CONTAINER_MultiHashMap
768 *chan_msgs = GNUNET_CONTAINER_multihashmap_get (recv_cache,
771 struct GNUNET_HashCode msg_id_hash;
/* mmsg->message_id is passed unconverted (network byte order), hence the _nll variant. */
772 hash_key_from_nll (&msg_id_hash, mmsg->message_id);
775 *fragq = GNUNET_CONTAINER_multihashmap_get (ch->recv_frags, &msg_id_hash);
/* New queues start in the HEADER state with an empty min-heap of fragment IDs. */
779 fragq = GNUNET_new (struct FragmentQueue);
780 fragq->state = MSG_FRAG_STATE_HEADER;
782 = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
784 GNUNET_CONTAINER_multihashmap_put (ch->recv_frags, &msg_id_hash, fragq,
785 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
/* The per-channel cache map is created on first use. */
787 if (NULL == chan_msgs)
789 chan_msgs = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
790 GNUNET_CONTAINER_multihashmap_put (recv_cache, &ch->pub_key_hash, chan_msgs,
791 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
795 struct GNUNET_HashCode frag_id_hash;
796 hash_key_from_nll (&frag_id_hash, mmsg->fragment_id);
797 struct RecvCacheEntry
798 *cache_entry = GNUNET_CONTAINER_multihashmap_get (chan_msgs, &frag_id_hash);
799 if (NULL == cache_entry)
801 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
802 "%p Adding message fragment to cache. "
803 "message_id: %" PRIu64 ", fragment_id: %" PRIu64 ", "
/* NOTE(review): this format string ends in ")" with no matching "(" in the visible lines. */
804 "header_size: %" PRIu64 " + %u).\n",
805 ch, GNUNET_ntohll (mmsg->message_id),
806 GNUNET_ntohll (mmsg->fragment_id),
807 fragq->header_size, size);
808 cache_entry = GNUNET_new (struct RecvCacheEntry);
809 cache_entry->ref_count = 1;
/* The cache owns a private copy of the whole fragment. */
810 cache_entry->mmsg = GNUNET_malloc (size);
811 memcpy (cache_entry->mmsg, mmsg, size);
812 GNUNET_CONTAINER_multihashmap_put (chan_msgs, &frag_id_hash, cache_entry,
813 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
817 cache_entry->ref_count++;
818 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
819 "%p Message fragment is already in cache. "
820 "message_id: %" PRIu64 ", fragment_id: %" PRIu64
822 ch, GNUNET_ntohll (mmsg->message_id),
823 GNUNET_ntohll (mmsg->fragment_id), cache_entry->ref_count);
/* Header tracking: only relevant while the queue is still in the HEADER state. */
826 if (MSG_FRAG_STATE_HEADER == fragq->state)
828 if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype)
830 struct GNUNET_PSYC_MessageMethod *
831 pmeth = (struct GNUNET_PSYC_MessageMethod *) &mmsg[1];
832 fragq->state_delta = GNUNET_ntohll (pmeth->state_delta);
833 fragq->flags = ntohl (pmeth->flags);
/* A fragment whose last part type is below ..._DATA adds to header_size. */
836 if (last_ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA)
838 fragq->header_size += size;
840 else if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype
841 || frag_offset == fragq->header_size)
842 { /* header is now complete */
843 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
844 "%p Header of message %" PRIu64 " is complete.\n",
845 ch, GNUNET_ntohll (mmsg->message_id));
847 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
848 "%p Adding message %" PRIu64 " to queue.\n",
849 ch, GNUNET_ntohll (mmsg->message_id));
850 fragq->state = MSG_FRAG_STATE_DATA;
854 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
855 "%p Header of message %" PRIu64 " is NOT complete yet: "
856 "%" PRIu64 " != %" PRIu64 "\n",
857 ch, GNUNET_ntohll (mmsg->message_id), frag_offset,
/* End-of-message handling keyed on the last part type of this fragment. */
864 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END:
865 if (frag_offset == fragq->size)
866 fragq->state = MSG_FRAG_STATE_END;
868 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
869 "%p Message %" PRIu64 " is NOT complete yet: "
870 "%" PRIu64 " != %" PRIu64 "\n",
871 ch, GNUNET_ntohll (mmsg->message_id), frag_offset,
875 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL:
876 /* Drop message without delivering to client if it's a single fragment */
878 (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype)
879 ? MSG_FRAG_STATE_DROP
880 : MSG_FRAG_STATE_CANCEL;
/* Queue the message ID for message_queue_run() once, guarded by fragq->queued. */
883 switch (fragq->state)
885 case MSG_FRAG_STATE_DATA:
886 case MSG_FRAG_STATE_END:
887 case MSG_FRAG_STATE_CANCEL:
888 if (GNUNET_NO == fragq->queued)
890 GNUNET_CONTAINER_heap_insert (ch->recv_msgs, NULL,
891 GNUNET_ntohll (mmsg->message_id));
892 fragq->queued = GNUNET_YES;
/* The fragment ID is stored as heap cost in host byte order; the element pointer is NULL. */
897 GNUNET_CONTAINER_heap_insert (fragq->fragments, NULL,
898 GNUNET_ntohll (mmsg->fragment_id));
903 * Run fragment queue of a message.
905 * Send fragments of a message in order to client, after all modifiers arrived
909 * @param msg_id ID of the message @a fragq belongs to.
910 * @param fragq Fragment queue of the message.
911 * @param drop Drop message without delivering to client?
912 * #GNUNET_YES or #GNUNET_NO.
915 fragment_queue_run (struct Channel *ch, uint64_t msg_id,
916 struct FragmentQueue *fragq, uint8_t drop)
918 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
919 "%p Running message fragment queue for message %" PRIu64
921 ch, msg_id, fragq->state);
923 struct GNUNET_CONTAINER_MultiHashMap
924 *chan_msgs = GNUNET_CONTAINER_multihashmap_get (recv_cache,
926 GNUNET_assert (NULL != chan_msgs);
929 while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (fragq->fragments, NULL,
932 struct GNUNET_HashCode frag_id_hash;
933 hash_key_from_hll (&frag_id_hash, frag_id);
934 struct RecvCacheEntry *cache_entry
935 = GNUNET_CONTAINER_multihashmap_get (chan_msgs, &frag_id_hash);
936 if (cache_entry != NULL)
938 if (GNUNET_NO == drop)
940 mmsg_to_clients (ch, cache_entry->mmsg);
942 if (cache_entry->ref_count <= 1)
944 GNUNET_CONTAINER_multihashmap_remove (chan_msgs, &frag_id_hash,
946 GNUNET_free (cache_entry->mmsg);
947 GNUNET_free (cache_entry);
951 cache_entry->ref_count--;
954 #if CACHE_AGING_IMPLEMENTED
955 else if (GNUNET_NO == drop)
957 /* TODO: fragment not in cache anymore, retrieve it from PSYCstore */
961 GNUNET_CONTAINER_heap_remove_root (fragq->fragments);
964 if (MSG_FRAG_STATE_END <= fragq->state)
966 struct GNUNET_HashCode msg_id_hash;
967 hash_key_from_nll (&msg_id_hash, msg_id);
969 GNUNET_CONTAINER_multihashmap_remove (ch->recv_frags, &msg_id_hash, fragq);
970 GNUNET_CONTAINER_heap_destroy (fragq->fragments);
975 fragq->queued = GNUNET_NO;
983 * Send messages in queue to client in order after a message has arrived from
984 * multicast, according to the following:
985 * - A message is only sent if all of its modifiers arrived.
986 * - A stateful message is only sent if the previous stateful message
987 * has already been delivered to the client.
990 * @return Number of messages removed from queue and sent to client.
/* Walks ch->recv_msgs from the lowest message ID upwards and runs the fragment
   queue of each message that is ready. */
993 message_queue_run (struct Channel *ch)
995 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
996 "%p Running message queue.\n", ch);
999 while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (ch->recv_msgs, NULL,
1002 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1003 "%p Processing message %" PRIu64 " in queue.\n", ch, msg_id);
1004 struct GNUNET_HashCode msg_id_hash;
/* msg_id comes from the heap in host byte order, hence the _hll variant. */
1005 hash_key_from_hll (&msg_id_hash, msg_id);
1007 struct FragmentQueue *
1008 fragq = GNUNET_CONTAINER_multihashmap_get (ch->recv_frags, &msg_id_hash);
1010 if (NULL == fragq || fragq->state <= MSG_FRAG_STATE_HEADER)
1012 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1013 "%p No fragq (%p) or header not complete.\n",
/* NOTE(review): the test at 1010 already covers state == MSG_FRAG_STATE_HEADER. If that
   branch leaves the loop (its body is not fully visible here), the block below can never
   be entered, and the ordering checks and max_*_message_id updates never run -- confirm intent. */
1018 if (MSG_FRAG_STATE_HEADER == fragq->state)
1020 /* Check if there's a missing message before the current one */
1021 if (GNUNET_PSYC_STATE_NOT_MODIFIED == fragq->state_delta)
1023 if (!(fragq->flags & GNUNET_PSYC_MESSAGE_ORDER_ANY)
1024 && msg_id - 1 != ch->max_message_id)
1026 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1027 "%p Out of order message. "
1028 "(%" PRIu64 " - 1 != %" PRIu64 ")\n",
1029 ch, msg_id, ch->max_message_id);
/* Stateful message: state_delta must point back at the last delivered stateful message. */
1035 if (msg_id - fragq->state_delta != ch->max_state_message_id)
1037 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1038 "%p Out of order stateful message. "
1039 "(%" PRIu64 " - %" PRIu64 " != %" PRIu64 ")\n",
1040 ch, msg_id, fragq->state_delta, ch->max_state_message_id);
/* NOTE(review): message_id, state_modify_result_cb and cls are not declared in any visible
   line; presumably this call is compiled out by a preprocessor guard not shown here -- confirm. */
1044 /* FIXME: apply modifiers to state in PSYCstore */
1045 GNUNET_PSYCSTORE_state_modify (store, &ch->pub_key, message_id,
1046 state_modify_result_cb, cls);
1048 ch->max_state_message_id = msg_id;
1050 ch->max_message_id = msg_id;
/* The last argument asks fragment_queue_run() to discard instead of deliver. */
1052 fragment_queue_run (ch, msg_id, fragq, MSG_FRAG_STATE_DROP == fragq->state);
1053 GNUNET_CONTAINER_heap_remove_root (ch->recv_msgs);
1056 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1057 "%p Removed %" PRIu64 " messages from queue.\n", ch, n);
1063 * Handle incoming message from multicast.
1065 * @param ch Channel.
1066 * @param mmsg Multicast message.
1068 * @return #GNUNET_OK or #GNUNET_SYSERR
1071 handle_multicast_message (struct Channel *ch,
1072 const struct GNUNET_MULTICAST_MessageHeader *mmsg)
1074 GNUNET_PSYCSTORE_fragment_store (store, &ch->pub_key, mmsg, 0, NULL, NULL);
1076 uint16_t size = ntohs (mmsg->header.size);
1077 uint16_t first_ptype = 0, last_ptype = 0;
1080 == GNUNET_PSYC_check_message_parts (size - sizeof (*mmsg),
1081 (const char *) &mmsg[1],
1082 &first_ptype, &last_ptype))
1084 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1085 "%p Received message with invalid parts from multicast. "
1086 "Dropping message.\n", ch);
1087 GNUNET_break_op (0);
1088 return GNUNET_SYSERR;
1091 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1092 "Message parts: first: type %u, last: type %u\n",
1093 first_ptype, last_ptype);
1095 fragment_queue_insert (ch, mmsg, first_ptype, last_ptype);
1096 message_queue_run (ch);
1103 * Incoming message fragment from multicast.
1105 * Store it using PSYCstore and send it to the client of the channel.
/* cls is the Channel that was registered with multicast in the counters callbacks. */
1108 message_cb (void *cls, const struct GNUNET_MessageHeader *msg)
1110 struct Channel *ch = cls;
1111 uint16_t type = ntohs (msg->type);
1112 uint16_t size = ntohs (msg->size);
1114 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1115 "%p Received message of type %u and size %u from multicast.\n",
/* Only GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE is processed; the result of
   handle_multicast_message() is not used in the visible lines. */
1120 case GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE:
1122 handle_multicast_message (ch, (const struct
1123 GNUNET_MULTICAST_MessageHeader *) msg);
1127 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1128 "%p Dropping unknown message of type %u and size %u.\n",
1135 * Incoming request fragment from multicast for a master.
1137 * @param cls Master.
1138 * @param slave_key Sending slave's public key.
1139 * @param msg The message.
1140 * @param flags Request flags.
/* Validates the PSYC parts of a multicast request and forwards it to the master's
   clients as a GNUNET_PSYC_MessageHeader flagged GNUNET_PSYC_MESSAGE_REQUEST. */
1143 request_cb (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key,
1144 const struct GNUNET_MessageHeader *msg,
1145 enum GNUNET_MULTICAST_MessageFlags flags)
1147 struct Master *mst = cls;
1148 struct Channel *ch = &mst->ch;
1150 uint16_t type = ntohs (msg->type);
1151 uint16_t size = ntohs (msg->size);
1153 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1154 "%p Received request of type %u and size %u from multicast.\n",
1159 case GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST:
1161 const struct GNUNET_MULTICAST_RequestHeader *req
1162 = (const struct GNUNET_MULTICAST_RequestHeader *) msg;
/* NOTE(review): size - sizeof (*req) would wrap if size were smaller than the request
   header; no lower-bound check is visible in this listing -- confirm. */
1164 /* FIXME: see message_cb() */
1165 if (GNUNET_SYSERR == GNUNET_PSYC_check_message_parts (size - sizeof (*req),
1166 (const char *) &req[1],
1169 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1170 "%p Dropping request with invalid parts "
1171 "received from multicast.\n", ch);
1172 GNUNET_break_op (0);
/* The PSYC header replaces the multicast request header; the payload is copied unchanged. */
1176 struct GNUNET_PSYC_MessageHeader *pmsg;
1177 uint16_t psize = sizeof (*pmsg) + size - sizeof (*req);
1178 pmsg = GNUNET_malloc (psize);
1179 pmsg->header.size = htons (psize);
1180 pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
/* The request ID is carried in the message_id field, copied without byte-order conversion. */
1181 pmsg->message_id = req->request_id;
1182 pmsg->flags = htonl (GNUNET_PSYC_MESSAGE_REQUEST);
1184 memcpy (&pmsg[1], &req[1], size - sizeof (*req));
1185 msg_to_clients (ch, (const struct GNUNET_MessageHeader *) pmsg);
1190 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1191 "%p Dropping unknown request of type %u and size %u.\n",
1193 GNUNET_break_op (0);
1199 * Response from PSYCstore with the current counter values for a channel master.
/* cls is the Master passed to GNUNET_PSYCSTORE_counters_get() in handle_master_start().
   An ACK carrying the result code and max_message_id is always sent to the clients. */
1202 master_counters_cb (void *cls, int result, uint64_t max_fragment_id,
1203 uint64_t max_message_id, uint64_t max_group_generation,
1204 uint64_t max_state_message_id)
1206 struct Master *mst = cls;
1207 struct Channel *ch = &mst->ch;
/* The store operation has completed, so there is nothing left to cancel. */
1208 ch->store_op = NULL;
1210 struct CountersResult res;
1211 res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK);
1212 res.header.size = htons (sizeof (res));
1213 res.result_code = htonl (result);
1214 res.max_message_id = GNUNET_htonll (max_message_id);
/* Both GNUNET_OK and GNUNET_NO take the branch that adopts the counters and starts the origin. */
1216 if (GNUNET_OK == result || GNUNET_NO == result)
1218 mst->max_message_id = max_message_id;
1219 ch->max_message_id = max_message_id;
1220 ch->max_state_message_id = max_state_message_id;
1221 mst->max_group_generation = max_group_generation;
1223 = GNUNET_MULTICAST_origin_start (cfg, &mst->priv_key,
1225 join_cb, membership_test_cb,
1226 replay_fragment_cb, replay_message_cb,
1227 request_cb, message_cb, ch);
1228 ch->ready = GNUNET_YES;
1232 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1233 "%p GNUNET_PSYCSTORE_counters_get() "
1234 "returned %d for channel %s.\n",
1235 ch, result, GNUNET_h2s (&ch->pub_key_hash));
1238 msg_to_clients (ch, &res.header);
1243 * Response from PSYCstore with the current counter values for a channel slave.
/* cls is the Slave passed to GNUNET_PSYCSTORE_counters_get() in handle_slave_join().
   An ACK carrying the result code and max_message_id is always sent to the clients. */
1246 slave_counters_cb (void *cls, int result, uint64_t max_fragment_id,
1247 uint64_t max_message_id, uint64_t max_group_generation,
1248 uint64_t max_state_message_id)
1250 struct Slave *slv = cls;
1251 struct Channel *ch = &slv->ch;
/* The store operation has completed, so there is nothing left to cancel. */
1252 ch->store_op = NULL;
1254 struct CountersResult res;
1255 res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK);
1256 res.header.size = htons (sizeof (res));
1257 res.result_code = htonl (result);
1258 res.max_message_id = GNUNET_htonll (max_message_id);
/* Both GNUNET_OK and GNUNET_NO take the branch that adopts the counters and joins multicast. */
1260 if (GNUNET_OK == result || GNUNET_NO == result)
1262 ch->max_message_id = max_message_id;
1263 ch->max_state_message_id = max_state_message_id;
1265 = GNUNET_MULTICAST_member_join (cfg, &ch->pub_key, &slv->priv_key,
1267 slv->relay_count, slv->relays,
1268 slv->join_req, join_cb,
1270 replay_fragment_cb, replay_message_cb,
1272 ch->ready = GNUNET_YES;
1276 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1277 "%p GNUNET_PSYCSTORE_counters_get() "
1278 "returned %d for channel %s.\n",
1279 ch, result, GNUNET_h2s (&ch->pub_key_hash));
1282 msg_to_clients (ch, &res.header);
/* Sets up the receive-side containers of a channel: a min-heap and the recv_frags map
   used by fragment_queue_insert(). */
1287 channel_init (struct Channel *ch)
1290 = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
1291 ch->recv_frags = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
1296 * Handle a connecting client starting a channel master.
/* Derives the channel's public key from the private key in the request and looks the
   master up in `masters` by the hash of that public key. The client is then appended
   to the channel's client list and the channel becomes its user context. */
1299 handle_master_start (void *cls, struct GNUNET_SERVER_Client *client,
1300 const struct GNUNET_MessageHeader *msg)
1302 const struct MasterStartRequest *req
1303 = (const struct MasterStartRequest *) msg;
1305 struct GNUNET_CRYPTO_EddsaPublicKey pub_key;
1306 struct GNUNET_HashCode pub_key_hash;
1308 GNUNET_CRYPTO_eddsa_key_get_public (&req->channel_key, &pub_key);
1309 GNUNET_CRYPTO_hash (&pub_key, sizeof (pub_key), &pub_key_hash);
1312 mst = GNUNET_CONTAINER_multihashmap_get (masters, &pub_key_hash);
/* NOTE(review): presumably the allocation below runs only when no Master exists yet for
   this key; the branch line itself is not visible in this listing -- confirm. */
1317 mst = GNUNET_new (struct Master);
1318 mst->policy = ntohl (req->policy);
1319 mst->priv_key = req->channel_key;
1320 mst->join_reqs = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
1323 ch->is_master = GNUNET_YES;
1324 ch->pub_key = pub_key;
1325 ch->pub_key_hash = pub_key_hash;
1328 GNUNET_CONTAINER_multihashmap_put (masters, &ch->pub_key_hash, ch,
1329 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
/* The ACK for this path is sent later from master_counters_cb(). */
1330 ch->store_op = GNUNET_PSYCSTORE_counters_get (store, &ch->pub_key,
1331 master_counters_cb, mst);
/* Direct ACK using the max_message_id already known for the master. */
1337 struct CountersResult res;
1338 res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK);
1339 res.header.size = htons (sizeof (res));
1340 res.result_code = htonl (GNUNET_OK);
1341 res.max_message_id = GNUNET_htonll (mst->max_message_id);
1343 GNUNET_SERVER_notification_context_add (nc, client);
1344 GNUNET_SERVER_notification_context_unicast (nc, client, &res.header,
1348 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1349 "%p Client connected as master to channel %s.\n",
1350 mst, GNUNET_h2s (&ch->pub_key_hash));
1352 struct ClientList *cl = GNUNET_new (struct ClientList);
1353 cl->client = client;
1354 GNUNET_CONTAINER_DLL_insert (ch->clients_head, ch->clients_tail, cl);
/* client_disconnect() retrieves this context again. */
1356 GNUNET_SERVER_client_set_user_context (client, ch);
1357 GNUNET_SERVER_receive_done (client, GNUNET_OK);
1362 * Handle a connecting client joining as a channel slave.
/* Looks the slave up by channel key hash and slave public key hash in `channel_slaves`.
   The client is then appended to the channel's client list and the channel becomes its
   user context. */
1365 handle_slave_join (void *cls, struct GNUNET_SERVER_Client *client,
1366 const struct GNUNET_MessageHeader *msg)
1368 const struct SlaveJoinRequest *req
1369 = (const struct SlaveJoinRequest *) msg;
1371 struct GNUNET_CRYPTO_EddsaPublicKey slv_pub_key;
1372 struct GNUNET_HashCode pub_key_hash, slv_pub_key_hash;
/* The slave's public key is derived from its private key; channel_key in the request is
   hashed and stored as the channel's public key as-is. */
1374 GNUNET_CRYPTO_eddsa_key_get_public (&req->slave_key, &slv_pub_key);
1375 GNUNET_CRYPTO_hash (&slv_pub_key, sizeof (slv_pub_key), &slv_pub_key_hash);
1376 GNUNET_CRYPTO_hash (&req->channel_key, sizeof (req->channel_key), &pub_key_hash);
1378 struct GNUNET_CONTAINER_MultiHashMap *
1379 ch_slv = GNUNET_CONTAINER_multihashmap_get (channel_slaves, &pub_key_hash);
1380 struct Slave *slv = NULL;
1385 slv = GNUNET_CONTAINER_multihashmap_get (ch_slv, &slv_pub_key_hash);
/* NOTE(review): presumably the allocation below runs only when no Slave was found; the
   branch line itself is not visible in this listing -- confirm. */
1389 slv = GNUNET_new (struct Slave);
1390 slv->priv_key = req->slave_key;
1391 slv->origin = req->origin;
/* NOTE(review): relay_count comes from the request and sizes the copy from &req[1]; no
   check against the received message size is visible in this listing -- confirm one exists. */
1392 slv->relay_count = ntohl (req->relay_count);
1393 if (0 < slv->relay_count)
1395 const struct GNUNET_PeerIdentity *relays
1396 = (const struct GNUNET_PeerIdentity *) &req[1];
1398 = GNUNET_malloc (slv->relay_count * sizeof (struct GNUNET_PeerIdentity));
1400 for (i = 0; i < slv->relay_count; i++)
1401 memcpy (&slv->relays[i], &relays[i], sizeof (*relays));
1405 ch->is_master = GNUNET_NO;
1406 ch->pub_key = req->channel_key;
1407 ch->pub_key_hash = pub_key_hash;
/* The per-channel slave map is created on first use and registered under the channel key hash. */
1412 ch_slv = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
1413 GNUNET_CONTAINER_multihashmap_put (channel_slaves, &pub_key_hash, ch_slv,
1414 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1416 GNUNET_CONTAINER_multihashmap_put (ch_slv, &slv_pub_key_hash, ch,
1417 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1418 GNUNET_CONTAINER_multihashmap_put (slaves, &ch->pub_key_hash, ch,
1419 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
/* The ACK for this path is sent later from slave_counters_cb(). */
1420 ch->store_op = GNUNET_PSYCSTORE_counters_get (store, &ch->pub_key,
1421 slave_counters_cb, slv);
/* Direct ACK using the max_message_id already known for the channel. */
1427 struct CountersResult res;
1428 res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK);
1429 res.header.size = htons (sizeof (res));
1430 res.result_code = htonl (GNUNET_OK);
1431 res.max_message_id = GNUNET_htonll (ch->max_message_id);
1433 GNUNET_SERVER_notification_context_add (nc, client);
1434 GNUNET_SERVER_notification_context_unicast (nc, client, &res.header,
1438 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1439 "%p Client connected as slave to channel %s.\n",
1440 slv, GNUNET_h2s (&ch->pub_key_hash));
1442 struct ClientList *cl = GNUNET_new (struct ClientList);
1443 cl->client = client;
1444 GNUNET_CONTAINER_DLL_insert (ch->clients_head, ch->clients_tail, cl);
/* client_disconnect() retrieves this context again. */
1446 GNUNET_SERVER_client_set_user_context (client, &slv->ch);
1447 GNUNET_SERVER_receive_done (client, GNUNET_OK);
/* Closure passed to join_decision_cb while iterating over the pending join
 * requests of a slave. */
1451 struct JoinDecisionClosure
/* Admission flag copied from the client's MasterJoinDecision message. */
1453 uint8_t is_admitted;
/* Message handed on to GNUNET_MULTICAST_join_decision; points behind the
 * MasterJoinDecision when the message is large enough (presumably NULL
 * otherwise -- that branch is not visible in this chunk). */
1454 struct GNUNET_MessageHeader *msg;
1459 * Iterator callback for responding to join requests of a slave.
/*
 * Hash map iterator used by handle_join_decision on a master's join_reqs
 * map: forwards the decision stored in the closure to multicast.
 *
 * NOTE(review): the last parameter and the return statement are not visible
 * in this chunk; `jh` used below is presumably that parameter (the map
 * value, a multicast join handle) -- confirm against the full file.
 *
 * @param cls           A struct JoinDecisionClosure.
 * @param pub_key_hash  Map key of the current entry; unused in visible lines.
 */
1462 join_decision_cb (void *cls, const struct GNUNET_HashCode *pub_key_hash,
1465 struct JoinDecisionClosure *jcls = cls;
1466 // FIXME: add relays
/* The literal 0 and NULL are presumably the relay count and relay list that
 * the FIXME above refers to. */
1467 GNUNET_MULTICAST_join_decision (jh, jcls->is_admitted, 0, NULL, jcls->msg);
1473 * Join decision from client.
1476 handle_join_decision (void *cls, struct GNUNET_SERVER_Client *client,
1477 const struct GNUNET_MessageHeader *msg)
1480 ch = GNUNET_SERVER_client_get_user_context (client, struct Channel);
1481 GNUNET_assert (GNUNET_YES == ch->is_master);
1482 struct Master *mst = (struct Master *) ch;
1484 struct MasterJoinDecision *dcsn = (struct MasterJoinDecision *) msg;
1485 struct JoinDecisionClosure jcls;
1486 jcls.is_admitted = dcsn->is_admitted;
1488 = (sizeof (*dcsn) + sizeof (struct GNUNET_PSYC_MessageHeader)
1489 <= ntohs (msg->size))
1490 ? (struct GNUNET_MessageHeader *) &dcsn[1]
1493 struct GNUNET_HashCode slave_key_hash;
1494 GNUNET_CRYPTO_hash (&dcsn->slave_key, sizeof (dcsn->slave_key),
1496 GNUNET_CONTAINER_multihashmap_get_multiple (mst->join_reqs, &slave_key_hash,
1497 &join_decision_cb, &jcls);
1498 GNUNET_SERVER_receive_done (client, GNUNET_OK);
1503 * Send acknowledgement to a client.
1505 * Sent after a message fragment has been passed on to multicast.
1507 * @param ch The channel struct for the client.
/*
 * Send a header-only GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK to one client.
 *
 * @param ch      Channel of the client; not used in the visible lines.
 * @param client  Client to notify.
 */
1510 send_message_ack (struct Channel *ch, struct GNUNET_SERVER_Client *client)
/* The acknowledgement has no payload, so size is just the header. */
1512 struct GNUNET_MessageHeader res;
1513 res.size = htons (sizeof (res));
1514 res.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK);
/* The client is added to the notification context before the unicast. */
1517 GNUNET_SERVER_notification_context_add (nc, client);
1518 GNUNET_SERVER_notification_context_unicast (nc, client, &res, GNUNET_NO);
1523 * Callback for the transmit functions of multicast.
/*
 * Common body of master_transmit_notify and slave_transmit_notify: copies
 * the message at the head of the channel's transmit queue into the buffer
 * offered by multicast, acknowledges it to the originating client and
 * removes it from the queue.
 *
 * NOTE(review): the return statements are not visible in this chunk; the
 * wrappers use the result as `ret` and compare it against GNUNET_YES.
 *
 * @param cls        The struct Channel being transmitted on.
 * @param data_size  In: capacity of @a data (compared against the message
 *                   size).  Out: set to the size of the copied message.
 * @param data       Destination buffer.
 */
1526 transmit_notify (void *cls, size_t *data_size, void *data)
1528 struct Channel *ch = cls;
1529 struct TransmitMessage *tmit_msg = ch->tmit_head;
/* Queue empty, or the head message does not fit into the offered buffer. */
1531 if (NULL == tmit_msg || *data_size < tmit_msg->size)
1533 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1534 "%p transmit_notify: nothing to send.\n", ch);
1539 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1540 "%p transmit_notify: sending %u bytes.\n", ch, tmit_msg->size);
/* The payload is stored directly behind the TransmitMessage struct. */
1542 *data_size = tmit_msg->size;
1543 memcpy (data, &tmit_msg[1], *data_size);
/* GNUNET_NO once the channel's transmit state is past MSG_STATE_END,
 * GNUNET_YES otherwise. */
1545 int ret = (MSG_STATE_END < ch->tmit_state) ? GNUNET_NO : GNUNET_YES;
/* Messages queued without a client get no acknowledgement. */
1546 if (NULL != tmit_msg->client)
1547 send_message_ack (ch, tmit_msg->client);
1549 GNUNET_CONTAINER_DLL_remove (ch->tmit_head, ch->tmit_tail, tmit_msg);
1550 GNUNET_free (tmit_msg);
/* With no transmit task pending, either continue with the next queued
 * message or, for a channel flagged as disconnected, clean it up.
 * NOTE(review): the wrappers write to tmit_handle through `cls` after this
 * function returns; if cleanup_channel releases `ch`, that would be a
 * use-after-free -- verify. */
1552 if (0 == ch->tmit_task)
1554 if (NULL != ch->tmit_head)
1556 transmit_message (ch);
1558 else if (ch->disconnected)
1560 /* FIXME: handle partial message (when still in_transmit) */
1561 cleanup_channel (ch);
1570 * Callback for the transmit functions of multicast.
/*
 * Transmit callback registered by master_transmit_message with
 * GNUNET_MULTICAST_origin_to_all.  Delegates to transmit_notify and forgets
 * the master's transmit handle when that returns GNUNET_YES.
 *
 * @param cls        The struct Master (also used as struct Channel by
 *                   transmit_notify).
 * @param data_size  Passed through to transmit_notify.
 * @param data       Passed through to transmit_notify.
 */
1573 master_transmit_notify (void *cls, size_t *data_size, void *data)
1575 int ret = transmit_notify (cls, data_size, data);
/* A NULL handle makes master_transmit_message start a new transmission
 * instead of resuming the old one. */
1577 if (GNUNET_YES == ret)
1579 struct Master *mst = cls;
1580 mst->tmit_handle = NULL;
1587 * Callback for the transmit functions of multicast.
/*
 * Transmit callback registered by slave_transmit_message with
 * GNUNET_MULTICAST_member_to_origin.  Delegates to transmit_notify and
 * forgets the slave's transmit handle when that returns GNUNET_YES.
 *
 * @param cls        The struct Slave (also used as struct Channel by
 *                   transmit_notify).
 * @param data_size  Passed through to transmit_notify.
 * @param data       Passed through to transmit_notify.
 */
1590 slave_transmit_notify (void *cls, size_t *data_size, void *data)
1592 int ret = transmit_notify (cls, data_size, data);
/* A NULL handle makes slave_transmit_message start a new transmission
 * instead of resuming the old one. */
1594 if (GNUNET_YES == ret)
1596 struct Slave *slv = cls;
1597 slv->tmit_handle = NULL;
1604 * Transmit a message from a channel master to the multicast group.
/*
 * Start or resume sending queued messages of a master to the multicast
 * group.
 *
 * @param mst  Master whose queue should be transmitted.
 */
1607 master_transmit_message (struct Master *mst)
1609 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p master_transmit_message()\n", mst);
/* Mark that no transmit task is pending for this channel. */
1610 mst->ch.tmit_task = 0;
/* Without a handle a new origin-to-all transmission is requested with
 * master_transmit_notify as data callback (the result is presumably stored
 * in mst->tmit_handle; that line is not visible here); otherwise the
 * existing transmission is resumed. */
1611 if (NULL == mst->tmit_handle)
1614 = GNUNET_MULTICAST_origin_to_all (mst->origin, mst->max_message_id,
1615 mst->max_group_generation,
1616 master_transmit_notify, mst);
1620 GNUNET_MULTICAST_origin_to_all_resume (mst->tmit_handle);
1626 * Transmit a message from a channel slave to the multicast group.
/*
 * Start or resume sending queued requests of a slave to the channel origin.
 *
 * @param slv  Slave whose queue should be transmitted.
 */
1629 slave_transmit_message (struct Slave *slv)
/* Mark that no transmit task is pending for this channel. */
1631 slv->ch.tmit_task = 0;
/* Without a handle a new member-to-origin transmission is requested with
 * slave_transmit_notify as data callback (the result is presumably stored in
 * slv->tmit_handle; that line is not visible here); otherwise the existing
 * transmission is resumed. */
1632 if (NULL == slv->tmit_handle)
1635 = GNUNET_MULTICAST_member_to_origin (slv->member, slv->max_request_id,
1636 slave_transmit_notify, slv);
1640 GNUNET_MULTICAST_member_to_origin_resume (slv->tmit_handle);
/*
 * Dispatch transmission to the master or slave variant.  The condition of
 * the conditional expression is not visible in this chunk (presumably
 * ch->is_master, as the casts below suggest).
 *
 * @param ch  Channel to transmit on; cast to struct Master or struct Slave.
 */
1646 transmit_message (struct Channel *ch)
1649 ? master_transmit_message ((struct Master *) ch)
1650 : slave_transmit_message ((struct Slave *) ch);
1655 * Queue a message from a channel master for sending to the multicast group.
/*
 * Master-specific part of queueing a message: when the message starts with
 * a method part, assign the next message ID and fill in the state_delta
 * field of the method header.
 *
 * @param mst          Master the message is queued for.
 * @param tmit_msg     Queued message; its payload follows the struct.
 * @param first_ptype  Type of the first message part.
 * @param last_ptype   Type of the last message part; unused in the visible
 *                     lines.
 */
1658 master_queue_message (struct Master *mst, struct TransmitMessage *tmit_msg,
1659 uint16_t first_ptype, uint16_t last_ptype)
1661 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p master_queue_message()\n", mst);
1663 if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype)
/* Message IDs are allocated by pre-incrementing the master's counter. */
1665 tmit_msg->id = ++mst->max_message_id;
1666 struct GNUNET_PSYC_MessageMethod *pmeth
1667 = (struct GNUNET_PSYC_MessageMethod *) &tmit_msg[1];
/* state_delta is written in network byte order in all three cases below.
 * NOTE(review): pmeth->flags is tested without a byte-order conversion --
 * confirm which byte order the client uses for this field. */
1669 if (pmeth->flags & GNUNET_PSYC_MASTER_TRANSMIT_STATE_RESET)
1671 pmeth->state_delta = GNUNET_htonll (GNUNET_PSYC_STATE_RESET);
/* For a state modification the delta is the distance to
 * mst->max_state_message_id. */
1673 else if (pmeth->flags & GNUNET_PSYC_MASTER_TRANSMIT_STATE_MODIFY)
1675 pmeth->state_delta = GNUNET_htonll (tmit_msg->id
1676 - mst->max_state_message_id);
1680 pmeth->state_delta = GNUNET_htonll (GNUNET_PSYC_STATE_NOT_MODIFIED);
1687 * Queue a message from a channel slave for sending to the multicast group.
/*
 * Slave-specific part of queueing a message: when the message starts with a
 * method part, mark the state as not modified and assign the next request
 * ID.
 *
 * @param slv          Slave the message is queued for.
 * @param tmit_msg     Queued message; its payload follows the struct.
 * @param first_ptype  Type of the first message part.
 * @param last_ptype   Type of the last message part; unused in the visible
 *                     lines.
 */
1690 slave_queue_message (struct Slave *slv, struct TransmitMessage *tmit_msg,
1691 uint16_t first_ptype, uint16_t last_ptype)
1693 if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype)
1695 struct GNUNET_PSYC_MessageMethod *pmeth
1696 = (struct GNUNET_PSYC_MessageMethod *) &tmit_msg[1];
/* Slave messages always carry GNUNET_PSYC_STATE_NOT_MODIFIED here. */
1697 pmeth->state_delta = GNUNET_htonll (GNUNET_PSYC_STATE_NOT_MODIFIED);
/* Request IDs are allocated by pre-incrementing the slave's counter. */
1698 tmit_msg->id = ++slv->max_request_id;
/*
 * Copy the payload of a client message into a newly allocated
 * TransmitMessage, append it to the channel's transmit queue and run the
 * master or slave specific queueing step.
 *
 * @param ch           Channel to queue the message on.
 * @param client       Client the message came from; stored with the queued
 *                     message.
 * @param msg          Message whose payload (everything behind the header)
 *                     is queued.
 * @param first_ptype  Type of the first message part.
 * @param last_ptype   Type of the last message part.
 */
1704 queue_message (struct Channel *ch,
1705 struct GNUNET_SERVER_Client *client,
1706 const struct GNUNET_MessageHeader *msg,
1707 uint16_t first_ptype, uint16_t last_ptype)
/* Only the bytes behind the message header are kept.  The subtraction
 * assumes msg->size is at least the header size. */
1709 uint16_t size = ntohs (msg->size) - sizeof (*msg);
1710 struct TransmitMessage *tmit_msg = GNUNET_malloc (sizeof (*tmit_msg) + size);
1711 memcpy (&tmit_msg[1], &msg[1], size);
1712 tmit_msg->client = client;
1713 tmit_msg->size = size;
1714 tmit_msg->state = ch->tmit_state;
1716 GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, tmit_msg);
/* The condition selecting between the two variants is not visible in this
 * chunk (presumably ch->is_master). */
1719 ? master_queue_message ((struct Master *) ch, tmit_msg,
1720 first_ptype, last_ptype)
1721 : slave_queue_message ((struct Slave *) ch, tmit_msg,
1722 first_ptype, last_ptype);
1727 transmit_error (struct Channel *ch, struct GNUNET_SERVER_Client *client)
1729 uint16_t type = GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL;
1731 struct GNUNET_MessageHeader msg;
1732 msg.size = ntohs (sizeof (msg));
1733 msg.type = ntohs (type);
1735 queue_message (ch, client, &msg, type, type);
1736 transmit_message (ch);
1738 /* FIXME: cleanup */
1743 * Incoming message from a client.
/*
 * Server handler for a PSYC message sent by a client: validates the
 * message, queues it on the client's channel and triggers transmission.
 *
 * NOTE(review): some short lines (braces, `return`, the start of the
 * message-parts check) are not visible in this chunk.
 *
 * @param cls     Unused in the visible lines.
 * @param client  Client that sent the message.
 * @param msg     Message whose payload consists of message parts.
 */
1746 handle_psyc_message (void *cls, struct GNUNET_SERVER_Client *client,
1747 const struct GNUNET_MessageHeader *msg)
/* The channel was attached to the client as user context when it connected
 * as master or slave.
 * NOTE(review): a client that sends this message before joining has no user
 * context, and the assertion below would then abort the service -- consider
 * rejecting the message instead. */
1750 ch = GNUNET_SERVER_client_get_user_context (client, struct Channel);
1751 GNUNET_assert (NULL != ch);
1753 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1754 "%p Received message from client.\n", ch);
1755 GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG, msg);
/* Messages are refused until the channel is flagged as ready. */
1757 if (GNUNET_YES != ch->ready)
1759 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1760 "%p Dropping message from client, channel is not ready yet.\n",
1762 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
/* The payload (message minus header) must fit into one multicast
 * fragment; otherwise a cancel is transmitted and the client is failed. */
1766 uint16_t size = ntohs (msg->size);
1767 if (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < size - sizeof (*msg))
1769 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "%p Message payload too large\n", ch);
1771 transmit_error (ch, client);
1772 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
/* Check the structure of the contained message parts; on success the types
 * of the first and last part are reported back. */
1776 uint16_t first_ptype = 0, last_ptype = 0;
1778 == GNUNET_PSYC_check_message_parts (size - sizeof (*msg),
1779 (const char *) &msg[1],
1780 &first_ptype, &last_ptype))
1782 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1783 "%p Received invalid message part from client.\n", ch);
1785 transmit_error (ch, client);
1786 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
/* Valid message: queue it and kick off transmission. */
1790 queue_message (ch, client, msg, first_ptype, last_ptype);
1791 transmit_message (ch);
1793 GNUNET_SERVER_receive_done (client, GNUNET_OK);
1798 * Client requests to add a slave to the membership database.
/*
 * Server handler registered for GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_SLAVE_ADD.
 *
 * NOTE(review): the body is not visible in this chunk and appears to be
 * empty or nearly so; confirm that it eventually calls
 * GNUNET_SERVER_receive_done so the client is not left waiting.
 */
1801 handle_slave_add (void *cls, struct GNUNET_SERVER_Client *client,
1802 const struct GNUNET_MessageHeader *msg)
1809 * Client requests to remove a slave from the membership database.
/*
 * Server handler registered for GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_SLAVE_RM.
 *
 * NOTE(review): the body is not visible in this chunk and appears to be
 * empty or nearly so; confirm that it eventually calls
 * GNUNET_SERVER_receive_done so the client is not left waiting.
 */
1812 handle_slave_remove (void *cls, struct GNUNET_SERVER_Client *client,
1813 const struct GNUNET_MessageHeader *msg)
1820 * Client requests channel history from PSYCstore.
/*
 * Server handler registered for GNUNET_MESSAGE_TYPE_PSYC_STORY_REQUEST.
 *
 * NOTE(review): the body is not visible in this chunk and appears to be
 * empty or nearly so; confirm that it eventually calls
 * GNUNET_SERVER_receive_done so the client is not left waiting.
 */
1823 handle_story_request (void *cls, struct GNUNET_SERVER_Client *client,
1824 const struct GNUNET_MessageHeader *msg)
1831 * Client requests best matching state variable from PSYCstore.
/*
 * Server handler registered for GNUNET_MESSAGE_TYPE_PSYC_STATE_GET.
 *
 * NOTE(review): the body is not visible in this chunk and appears to be
 * empty or nearly so; confirm that it eventually calls
 * GNUNET_SERVER_receive_done so the client is not left waiting.
 */
1834 handle_state_get (void *cls, struct GNUNET_SERVER_Client *client,
1835 const struct GNUNET_MessageHeader *msg)
1842 * Client requests state variables with a given prefix from PSYCstore.
/*
 * Server handler registered for GNUNET_MESSAGE_TYPE_PSYC_STATE_GET_PREFIX.
 *
 * NOTE(review): the body is not visible in this chunk and appears to be
 * empty or nearly so; confirm that it eventually calls
 * GNUNET_SERVER_receive_done so the client is not left waiting.
 */
1845 handle_state_get_prefix (void *cls, struct GNUNET_SERVER_Client *client,
1846 const struct GNUNET_MessageHeader *msg)
1853 * Initialize the PSYC service.
1855 * @param cls Closure.
1856 * @param server The initialized server.
1857 * @param c Configuration to use.
1860 run (void *cls, struct GNUNET_SERVER_Handle *server,
1861 const struct GNUNET_CONFIGURATION_Handle *c)
1863 static const struct GNUNET_SERVER_MessageHandler handlers[] = {
1864 { &handle_master_start, NULL,
1865 GNUNET_MESSAGE_TYPE_PSYC_MASTER_START, 0 },
1867 { &handle_slave_join, NULL,
1868 GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN, 0 },
1870 { &handle_join_decision, NULL,
1871 GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION, 0 },
1873 { &handle_psyc_message, NULL,
1874 GNUNET_MESSAGE_TYPE_PSYC_MESSAGE, 0 },
1876 { &handle_slave_add, NULL,
1877 GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_SLAVE_ADD, 0 },
1879 { &handle_slave_remove, NULL,
1880 GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_SLAVE_RM, 0 },
1882 { &handle_story_request, NULL,
1883 GNUNET_MESSAGE_TYPE_PSYC_STORY_REQUEST, 0 },
1885 { &handle_state_get, NULL,
1886 GNUNET_MESSAGE_TYPE_PSYC_STATE_GET, 0 },
1888 { &handle_state_get_prefix, NULL,
1889 GNUNET_MESSAGE_TYPE_PSYC_STATE_GET_PREFIX, 0 }
1893 store = GNUNET_PSYCSTORE_connect (cfg);
1894 stats = GNUNET_STATISTICS_create ("psyc", cfg);
1895 masters = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
1896 slaves = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
1897 channel_slaves = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
1898 recv_cache = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
1899 nc = GNUNET_SERVER_notification_context_create (server, 1);
1900 GNUNET_SERVER_add_handlers (server, handlers);
1901 GNUNET_SERVER_disconnect_notify (server, &client_disconnect, NULL);
1902 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL,
1903 &shutdown_task, NULL);
1908 * The main function for the service.
1910 * @param argc number of arguments from the command line
1911 * @param argv command line arguments
1912 * @return 0 ok, 1 on error
1915 main (int argc, char *const *argv)
1917 return (GNUNET_OK ==
1918 GNUNET_SERVICE_run (argc, argv, "psyc",
1919 GNUNET_SERVICE_OPTION_NONE,
1920 &run, NULL)) ? 0 : 1;
1923 /* end of gnunet-service-psyc.c */