/*
 * This file is part of GNUnet
 * (C) 2013 Christian Grothoff (and other contributing authors)
 *
 * GNUnet is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published
 * by the Free Software Foundation; either version 3, or (at your
 * option) any later version.
 *
 * GNUnet is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with GNUnet; see the file COPYING. If not, write to the
 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
 * Boston, MA 02111-1307, USA.
 */

/**
 * @file psyc/gnunet-service-psyc.c
 * @author Gabor X Toth
 */
30 #include "gnunet_util_lib.h"
31 #include "gnunet_constants.h"
32 #include "gnunet_protocols.h"
33 #include "gnunet_statistics_service.h"
34 #include "gnunet_multicast_service.h"
35 #include "gnunet_psycstore_service.h"
36 #include "gnunet_psyc_service.h"
/**
 * Handle to our current configuration.
 */
static const struct GNUNET_CONFIGURATION_Handle *cfg;

/**
 * Handle to the statistics service.
 */
static struct GNUNET_STATISTICS_Handle *stats;

/**
 * Notification context, simplifies client broadcasts.
 */
static struct GNUNET_SERVER_NotificationContext *nc;

/**
 * Handle to the PSYCstore.
 */
static struct GNUNET_PSYCSTORE_Handle *store;

/**
 * All connected masters.
 * Channel's pub_key_hash -> struct Master
 */
static struct GNUNET_CONTAINER_MultiHashMap *masters;

/**
 * All connected slaves.
 * Channel's pub_key_hash -> struct Slave
 */
static struct GNUNET_CONTAINER_MultiHashMap *slaves;

/**
 * Connected slaves per channel.
 * Channel's pub_key_hash -> Slave's pub_key -> struct Slave
 */
static struct GNUNET_CONTAINER_MultiHashMap *channel_slaves;
80 * Message in the transmission queue.
82 struct TransmitMessage
84 struct TransmitMessage *prev;
85 struct TransmitMessage *next;
87 struct GNUNET_SERVER_Client *client;
90 * ID assigned to the message.
100 * @see enum MessageState
104 /* Followed by message */
/**
 * Cache for received message fragments.
 * Message fragments are only sent to clients after all modifiers arrived.
 *
 * chan_key -> MultiHashMap chan_msgs
 */
static struct GNUNET_CONTAINER_MultiHashMap *recv_cache;
118 * Entry in the chan_msgs hashmap of @a recv_cache:
119 * fragment_id -> RecvCacheEntry
121 struct RecvCacheEntry
123 struct GNUNET_MULTICAST_MessageHeader *mmsg;
129 * Entry in the @a recv_frags hash map of a @a Channel.
130 * message_id -> FragmentQueue
135 * Fragment IDs stored in @a recv_cache.
137 struct GNUNET_CONTAINER_Heap *fragments;
140 * Total size of received fragments.
145 * Total size of received header fragments (METHOD & MODIFIERs)
147 uint64_t header_size;
150 * The @a state_delta field from struct GNUNET_PSYC_MessageMethod.
152 uint64_t state_delta;
155 * The @a flags field from struct GNUNET_PSYC_MessageMethod.
160 * Receive state of message.
162 * @see MessageFragmentState
167 * Is the message queued for delivery to the client?
168 * i.e. added to the recv_msgs queue
175 * List of connected clients.
179 struct ClientList *prev;
180 struct ClientList *next;
181 struct GNUNET_SERVER_Client *client;
186 * Common part of the client context for both a channel master and slave.
190 struct ClientList *clients_head;
191 struct ClientList *clients_tail;
193 struct TransmitMessage *tmit_head;
194 struct TransmitMessage *tmit_tail;
197 * Current PSYCstore operation.
199 struct GNUNET_PSYCSTORE_OperationHandle *store_op;
202 * Received fragments not yet sent to the client.
203 * message_id -> FragmentQueue
205 struct GNUNET_CONTAINER_MultiHashMap *recv_frags;
208 * Received message IDs not yet sent to the client.
210 struct GNUNET_CONTAINER_Heap *recv_msgs;
215 GNUNET_SCHEDULER_TaskIdentifier tmit_task;
218 * Public key of the channel.
220 struct GNUNET_CRYPTO_EddsaPublicKey pub_key;
223 * Hash of @a pub_key.
225 struct GNUNET_HashCode pub_key_hash;
228 * Last message ID sent to the client.
229 * 0 if there is no such message.
231 uint64_t max_message_id;
234 * ID of the last stateful message, where the state operations has been
235 * processed and saved to PSYCstore and which has been sent to the client.
236 * 0 if there is no such message.
238 uint64_t max_state_message_id;
241 * Expected value size for the modifier being received from the PSYC service.
243 uint32_t tmit_mod_value_size_expected;
246 * Actual value size for the modifier being received from the PSYC service.
248 uint32_t tmit_mod_value_size;
251 * @see enum MessageState
261 * Is this a channel master (#GNUNET_YES), or slave (#GNUNET_NO)?
266 * Ready to receive messages from client? #GNUNET_YES or #GNUNET_NO
271 * Is the client disconnected? #GNUNET_YES or #GNUNET_NO
273 uint8_t disconnected;
278 * Client context for a channel master.
283 * Channel struct common for Master and Slave
288 * Private key of the channel.
290 struct GNUNET_CRYPTO_EddsaPrivateKey priv_key;
293 * Handle for the multicast origin.
295 struct GNUNET_MULTICAST_Origin *origin;
298 * Transmit handle for multicast.
300 struct GNUNET_MULTICAST_OriginTransmitHandle *tmit_handle;
303 * Incoming join requests from multicast.
304 * member_key -> struct GNUNET_MULTICAST_JoinHandle *
306 struct GNUNET_CONTAINER_MultiHashMap *join_reqs;
309 * Last message ID transmitted to this channel.
311 * Incremented before sending a message, thus the message_id in messages sent
314 uint64_t max_message_id;
317 * ID of the last message with state operations transmitted to the channel.
318 * 0 if there is no such message.
320 uint64_t max_state_message_id;
323 * Maximum group generation transmitted to the channel.
325 uint64_t max_group_generation;
328 * @see enum GNUNET_PSYC_Policy
330 enum GNUNET_PSYC_Policy policy;
335 * Client context for a channel slave.
340 * Channel struct common for Master and Slave
345 * Private key of the slave.
347 struct GNUNET_CRYPTO_EddsaPrivateKey priv_key;
350 * Public key of the slave.
352 struct GNUNET_CRYPTO_EddsaPublicKey pub_key;
355 * Hash of @a pub_key.
357 struct GNUNET_HashCode pub_key_hash;
360 * Handle for the multicast member.
362 struct GNUNET_MULTICAST_Member *member;
365 * Transmit handle for multicast.
367 struct GNUNET_MULTICAST_MemberTransmitHandle *tmit_handle;
370 * Peer identity of the origin.
372 struct GNUNET_PeerIdentity origin;
375 * Number of items in @a relays.
377 uint32_t relay_count;
380 * Relays that multicast can use to connect.
382 struct GNUNET_PeerIdentity *relays;
385 * Join request to be transmitted to the master on join.
387 struct GNUNET_MessageHeader *join_req;
390 * Join decision received from multicast.
392 struct SlaveJoinDecision *join_dcsn;
395 * Maximum request ID for this channel.
397 uint64_t max_request_id;
402 transmit_message (struct Channel *ch);
406 message_queue_drop (struct Channel *ch);
410 * Task run during shutdown.
416 shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
420 GNUNET_SERVER_notification_context_destroy (nc);
425 GNUNET_STATISTICS_destroy (stats, GNUNET_YES);
432 * Clean up master data structures after a client disconnected.
435 cleanup_master (struct Master *mst)
437 struct Channel *ch = &mst->ch;
439 if (NULL != mst->origin)
440 GNUNET_MULTICAST_origin_stop (mst->origin);
441 GNUNET_CONTAINER_multihashmap_destroy (mst->join_reqs);
442 GNUNET_CONTAINER_multihashmap_remove (masters, &ch->pub_key_hash, ch);
447 * Clean up slave data structures after a client disconnected.
450 cleanup_slave (struct Slave *slv)
452 struct Channel *ch = &slv->ch;
453 struct GNUNET_CONTAINER_MultiHashMap *
454 ch_slv = GNUNET_CONTAINER_multihashmap_get (channel_slaves,
456 GNUNET_assert (NULL != ch_slv);
457 GNUNET_CONTAINER_multihashmap_remove (ch_slv, &slv->pub_key_hash, slv);
459 if (0 == GNUNET_CONTAINER_multihashmap_size (ch_slv))
461 GNUNET_CONTAINER_multihashmap_remove (channel_slaves, &ch->pub_key_hash,
463 GNUNET_CONTAINER_multihashmap_destroy (ch_slv);
465 GNUNET_CONTAINER_multihashmap_remove (slaves, &ch->pub_key_hash, slv);
467 if (NULL != slv->join_req)
468 GNUNET_free (slv->join_req);
469 if (NULL != slv->relays)
470 GNUNET_free (slv->relays);
471 if (NULL != slv->member)
472 GNUNET_MULTICAST_member_part (slv->member);
473 GNUNET_CONTAINER_multihashmap_remove (slaves, &ch->pub_key_hash, ch);
478 * Clean up channel data structures after a client disconnected.
481 cleanup_channel (struct Channel *ch)
483 message_queue_drop (ch);
484 GNUNET_CONTAINER_multihashmap_remove_all (recv_cache, &ch->pub_key_hash);
486 if (NULL != ch->store_op)
487 GNUNET_PSYCSTORE_operation_cancel (ch->store_op);
489 (GNUNET_YES == ch->is_master)
490 ? cleanup_master ((struct Master *) ch)
491 : cleanup_slave ((struct Slave *) ch);
497 * Called whenever a client is disconnected.
498 * Frees our resources associated with that client.
500 * @param cls Closure.
501 * @param client Identification of the client.
504 client_disconnect (void *cls, struct GNUNET_SERVER_Client *client)
510 ch = GNUNET_SERVER_client_get_user_context (client, struct Channel);
511 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
512 "%p Client (%s) disconnected from channel %s\n",
513 ch, (GNUNET_YES == ch->is_master) ? "master" : "slave",
514 GNUNET_h2s (&ch->pub_key_hash));
518 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
519 "%p User context is NULL in client_disconnect()\n", ch);
524 struct ClientList *cl = ch->clients_head;
527 if (cl->client == client)
529 GNUNET_CONTAINER_DLL_remove (ch->clients_head, ch->clients_tail, cl);
536 if (NULL == ch->clients_head)
537 { /* Last client disconnected. */
538 if (NULL != ch->tmit_head)
539 { /* Send pending messages to multicast before cleanup. */
540 transmit_message (ch);
544 cleanup_channel (ch);
551 * Send message to all clients connected to the channel.
554 msg_to_clients (const struct Channel *ch,
555 const struct GNUNET_MessageHeader *msg)
557 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
558 "%p Sending message to clients.\n", ch);
560 struct ClientList *cl = ch->clients_head;
563 GNUNET_SERVER_notification_context_add (nc, cl->client);
564 GNUNET_SERVER_notification_context_unicast (nc, cl->client, msg, GNUNET_NO);
571 * Closure for join_mem_test_cb()
573 struct JoinMemTestClosure
575 struct GNUNET_CRYPTO_EddsaPublicKey slave_key;
577 struct GNUNET_MULTICAST_JoinHandle *jh;
578 struct MasterJoinRequest *master_join_req;
583 * Membership test result callback used for join requests.
586 join_mem_test_cb (void *cls, int64_t result, const char *err_msg)
588 struct JoinMemTestClosure *jcls = cls;
590 if (GNUNET_NO == result && GNUNET_YES == jcls->ch->is_master)
591 { /* Pass on join request to client if this is a master channel */
592 struct Master *mst = (struct Master *) jcls->ch;
593 struct GNUNET_HashCode slave_key_hash;
594 GNUNET_CRYPTO_hash (&jcls->slave_key, sizeof (jcls->slave_key),
596 GNUNET_CONTAINER_multihashmap_put (mst->join_reqs, &slave_key_hash, jcls->jh,
597 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
598 msg_to_clients (jcls->ch, &jcls->master_join_req->header);
603 GNUNET_MULTICAST_join_decision (jcls->jh, result, 0, NULL, NULL);
605 GNUNET_free (jcls->master_join_req);
611 * Incoming join request from multicast.
614 mcast_join_request_cb (void *cls,
615 const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key,
616 const struct GNUNET_MessageHeader *join_msg,
617 struct GNUNET_MULTICAST_JoinHandle *jh)
619 struct Channel *ch = cls;
620 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p Got join request.\n", ch);
622 uint16_t join_msg_size = 0;
623 if (NULL != join_msg)
625 if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE == ntohs (join_msg->type))
627 join_msg_size = ntohs (join_msg->size);
631 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
632 "%p Got join message with invalid type %u.\n",
633 ch, ntohs (join_msg->type));
637 struct MasterJoinRequest *req = GNUNET_malloc (sizeof (*req) + join_msg_size);
638 req->header.size = htons (sizeof (*req) + join_msg_size);
639 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_JOIN_REQUEST);
640 req->slave_key = *slave_key;
641 if (0 < join_msg_size)
642 memcpy (&req[1], join_msg, join_msg_size);
644 struct JoinMemTestClosure *jcls = GNUNET_malloc (sizeof (*jcls));
645 jcls->slave_key = *slave_key;
648 jcls->master_join_req = req;
650 GNUNET_PSYCSTORE_membership_test (store, &ch->pub_key, slave_key,
651 ch->max_message_id, 0,
652 &join_mem_test_cb, jcls);
657 * Join decision received from multicast.
660 mcast_join_decision_cb (void *cls, int is_admitted,
661 const struct GNUNET_PeerIdentity *peer,
662 uint16_t relay_count,
663 const struct GNUNET_PeerIdentity *relays,
664 const struct GNUNET_MessageHeader *join_resp)
666 struct Slave *slv = cls;
667 struct Channel *ch = &slv->ch;
668 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
669 "%p Got join decision: %d\n", slv, is_admitted);
671 uint16_t join_resp_size = (NULL != join_resp) ? ntohs (join_resp->size) : 0;
672 struct SlaveJoinDecision *
673 dcsn = slv->join_dcsn = GNUNET_malloc (sizeof (*dcsn) + join_resp_size);
674 dcsn->header.size = htons (sizeof (*dcsn) + join_resp_size);
675 dcsn->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION);
676 dcsn->is_admitted = htonl (is_admitted);
677 if (0 < join_resp_size)
678 memcpy (&dcsn[1], join_resp, join_resp_size);
680 msg_to_clients (ch, &dcsn->header);
682 if (GNUNET_YES == is_admitted)
684 ch->ready = GNUNET_YES;
/**
 * Membership test request from multicast.
 *
 * Not implemented yet: the request is ignored.
 *
 * @param cls Closure.
 * @param slave_key Public key of the slave to test.
 * @param message_id Message ID the test refers to.
 * @param group_generation Group generation the test refers to.
 * @param mth Handle for the membership test.
 */
static void
mcast_membership_test_cb (void *cls,
                          const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key,
                          uint64_t message_id, uint64_t group_generation,
                          struct GNUNET_MULTICAST_MembershipTestHandle *mth)
{

}
/**
 * Request from multicast to replay a message fragment.
 *
 * Not implemented yet: the request is ignored.
 *
 * @param cls Closure.
 * @param slave_key Public key of the requesting slave.
 * @param fragment_id ID of the requested fragment.
 * @param flags Request flags.
 * @param rh Handle for the replay.
 */
static void
mcast_replay_fragment_cb (void *cls,
                          const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key,
                          uint64_t fragment_id, uint64_t flags,
                          struct GNUNET_MULTICAST_ReplayHandle *rh)
{

}
/**
 * Request from multicast to replay a message.
 *
 * Not implemented yet: the request is ignored.
 *
 * NOTE(review): the @a message_id and @a flags parameter lines were elided in
 * the reviewed copy and are reconstructed here -- confirm against the
 * multicast replay callback type.
 *
 * @param cls Closure.
 * @param slave_key Public key of the requesting slave.
 * @param message_id ID of the requested message.
 * @param fragment_offset Offset of the requested fragment in the message.
 * @param flags Request flags.
 * @param rh Handle for the replay.
 */
static void
mcast_replay_message_cb (void *cls,
                         const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key,
                         uint64_t message_id,
                         uint64_t fragment_offset,
                         uint64_t flags,
                         struct GNUNET_MULTICAST_ReplayHandle *rh)
{

}
727 fragment_store_result (void *cls, int64_t result, const char *err_msg)
729 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
730 "fragment_store() returned %l (%s)\n", result, err_msg);
735 * Convert an uint64_t in network byte order to a HashCode
736 * that can be used as key in a MultiHashMap
739 hash_key_from_nll (struct GNUNET_HashCode *key, uint64_t n)
741 /* use little-endian order, as idx_of MultiHashMap casts key to unsigned int */
742 /* TODO: use built-in byte swap functions if available */
744 n = ((n << 8) & 0xFF00FF00FF00FF00ULL) | ((n >> 8) & 0x00FF00FF00FF00FFULL);
745 n = ((n << 16) & 0xFFFF0000FFFF0000ULL) | ((n >> 16) & 0x0000FFFF0000FFFFULL);
747 *key = (struct GNUNET_HashCode) {{ 0 }};
749 = (n << 32) | (n >> 32);
/**
 * Convert an uint64_t in host byte order to a HashCode
 * that can be used as key in a MultiHashMap
 *
 * On big-endian hosts this defers to hash_key_from_nll(); on little-endian
 * hosts @a n is copied as is into the first 8 bytes of @a key and the
 * remaining bytes are zeroed.
 *
 * @param key Hash code to fill in.
 * @param n Number in host byte order.
 */
static void
hash_key_from_hll (struct GNUNET_HashCode *key, uint64_t n)
{
#if __BYTE_ORDER == __BIG_ENDIAN
  hash_key_from_nll (key, n);
#elif __BYTE_ORDER == __LITTLE_ENDIAN
  *key = (struct GNUNET_HashCode) {{ 0 }};
  /* memcpy rather than a store through a (uint64_t *) cast, which would
     break the aliasing rules for the hash code object. */
  memcpy (key, &n, sizeof (n));
#else
  #error byteorder undefined
#endif
}
772 * Send multicast message to all clients connected to the channel.
775 mmsg_to_clients (struct Channel *ch,
776 const struct GNUNET_MULTICAST_MessageHeader *mmsg)
778 uint16_t size = ntohs (mmsg->header.size);
779 struct GNUNET_PSYC_MessageHeader *pmsg;
780 uint16_t psize = sizeof (*pmsg) + size - sizeof (*mmsg);
782 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
783 "%p Sending message to client. "
784 "fragment_id: %" PRIu64 ", message_id: %" PRIu64 "\n",
785 ch, GNUNET_ntohll (mmsg->fragment_id),
786 GNUNET_ntohll (mmsg->message_id));
788 pmsg = GNUNET_malloc (psize);
789 pmsg->header.size = htons (psize);
790 pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
791 pmsg->message_id = mmsg->message_id;
793 memcpy (&pmsg[1], &mmsg[1], size - sizeof (*mmsg));
794 msg_to_clients (ch, &pmsg->header);
800 * Insert a multicast message fragment into the queue belonging to the message.
803 * @param mmsg Multicast message fragment.
804 * @param msg_id_hash Message ID of @a mmsg in a struct GNUNET_HashCode.
805 * @param first_ptype First PSYC message part type in @a mmsg.
806 * @param last_ptype Last PSYC message part type in @a mmsg.
809 fragment_queue_insert (struct Channel *ch,
810 const struct GNUNET_MULTICAST_MessageHeader *mmsg,
811 uint16_t first_ptype, uint16_t last_ptype)
813 const uint16_t size = ntohs (mmsg->header.size);
814 const uint64_t frag_offset = GNUNET_ntohll (mmsg->fragment_offset);
815 struct GNUNET_CONTAINER_MultiHashMap
816 *chan_msgs = GNUNET_CONTAINER_multihashmap_get (recv_cache,
819 struct GNUNET_HashCode msg_id_hash;
820 hash_key_from_nll (&msg_id_hash, mmsg->message_id);
823 *fragq = GNUNET_CONTAINER_multihashmap_get (ch->recv_frags, &msg_id_hash);
827 fragq = GNUNET_new (struct FragmentQueue);
828 fragq->state = MSG_FRAG_STATE_HEADER;
830 = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
832 GNUNET_CONTAINER_multihashmap_put (ch->recv_frags, &msg_id_hash, fragq,
833 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
835 if (NULL == chan_msgs)
837 chan_msgs = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
838 GNUNET_CONTAINER_multihashmap_put (recv_cache, &ch->pub_key_hash, chan_msgs,
839 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
843 struct GNUNET_HashCode frag_id_hash;
844 hash_key_from_nll (&frag_id_hash, mmsg->fragment_id);
845 struct RecvCacheEntry
846 *cache_entry = GNUNET_CONTAINER_multihashmap_get (chan_msgs, &frag_id_hash);
847 if (NULL == cache_entry)
849 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
850 "%p Adding message fragment to cache. "
851 "message_id: %" PRIu64 ", fragment_id: %" PRIu64 ", "
852 "header_size: %" PRIu64 " + %u).\n",
853 ch, GNUNET_ntohll (mmsg->message_id),
854 GNUNET_ntohll (mmsg->fragment_id),
855 fragq->header_size, size);
856 cache_entry = GNUNET_new (struct RecvCacheEntry);
857 cache_entry->ref_count = 1;
858 cache_entry->mmsg = GNUNET_malloc (size);
859 memcpy (cache_entry->mmsg, mmsg, size);
860 GNUNET_CONTAINER_multihashmap_put (chan_msgs, &frag_id_hash, cache_entry,
861 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
865 cache_entry->ref_count++;
866 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
867 "%p Message fragment is already in cache. "
868 "message_id: %" PRIu64 ", fragment_id: %" PRIu64
870 ch, GNUNET_ntohll (mmsg->message_id),
871 GNUNET_ntohll (mmsg->fragment_id), cache_entry->ref_count);
874 if (MSG_FRAG_STATE_HEADER == fragq->state)
876 if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype)
878 struct GNUNET_PSYC_MessageMethod *
879 pmeth = (struct GNUNET_PSYC_MessageMethod *) &mmsg[1];
880 fragq->state_delta = GNUNET_ntohll (pmeth->state_delta);
881 fragq->flags = ntohl (pmeth->flags);
884 if (last_ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA)
886 fragq->header_size += size;
888 else if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype
889 || frag_offset == fragq->header_size)
890 { /* header is now complete */
891 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
892 "%p Header of message %" PRIu64 " is complete.\n",
893 ch, GNUNET_ntohll (mmsg->message_id));
895 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
896 "%p Adding message %" PRIu64 " to queue.\n",
897 ch, GNUNET_ntohll (mmsg->message_id));
898 fragq->state = MSG_FRAG_STATE_DATA;
902 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
903 "%p Header of message %" PRIu64 " is NOT complete yet: "
904 "%" PRIu64 " != %" PRIu64 "\n",
905 ch, GNUNET_ntohll (mmsg->message_id), frag_offset,
912 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END:
913 if (frag_offset == fragq->size)
914 fragq->state = MSG_FRAG_STATE_END;
916 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
917 "%p Message %" PRIu64 " is NOT complete yet: "
918 "%" PRIu64 " != %" PRIu64 "\n",
919 ch, GNUNET_ntohll (mmsg->message_id), frag_offset,
923 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL:
924 /* Drop message without delivering to client if it's a single fragment */
926 (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype)
927 ? MSG_FRAG_STATE_DROP
928 : MSG_FRAG_STATE_CANCEL;
931 switch (fragq->state)
933 case MSG_FRAG_STATE_DATA:
934 case MSG_FRAG_STATE_END:
935 case MSG_FRAG_STATE_CANCEL:
936 if (GNUNET_NO == fragq->queued)
938 GNUNET_CONTAINER_heap_insert (ch->recv_msgs, NULL,
939 GNUNET_ntohll (mmsg->message_id));
940 fragq->queued = GNUNET_YES;
945 GNUNET_CONTAINER_heap_insert (fragq->fragments, NULL,
946 GNUNET_ntohll (mmsg->fragment_id));
951 * Run fragment queue of a message.
953 * Send fragments of a message in order to client, after all modifiers arrived
957 * @param msg_id ID of the message @a fragq belongs to.
958 * @param fragq Fragment queue of the message.
959 * @param drop Drop message without delivering to client?
960 * #GNUNET_YES or #GNUNET_NO.
963 fragment_queue_run (struct Channel *ch, uint64_t msg_id,
964 struct FragmentQueue *fragq, uint8_t drop)
966 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
967 "%p Running message fragment queue for message %" PRIu64
969 ch, msg_id, fragq->state);
971 struct GNUNET_CONTAINER_MultiHashMap
972 *chan_msgs = GNUNET_CONTAINER_multihashmap_get (recv_cache,
974 GNUNET_assert (NULL != chan_msgs);
977 while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (fragq->fragments, NULL,
980 struct GNUNET_HashCode frag_id_hash;
981 hash_key_from_hll (&frag_id_hash, frag_id);
982 struct RecvCacheEntry *cache_entry
983 = GNUNET_CONTAINER_multihashmap_get (chan_msgs, &frag_id_hash);
984 if (cache_entry != NULL)
986 if (GNUNET_NO == drop)
988 mmsg_to_clients (ch, cache_entry->mmsg);
990 if (cache_entry->ref_count <= 1)
992 GNUNET_CONTAINER_multihashmap_remove (chan_msgs, &frag_id_hash,
994 GNUNET_free (cache_entry->mmsg);
995 GNUNET_free (cache_entry);
999 cache_entry->ref_count--;
1002 #if CACHE_AGING_IMPLEMENTED
1003 else if (GNUNET_NO == drop)
1005 /* TODO: fragment not in cache anymore, retrieve it from PSYCstore */
1009 GNUNET_CONTAINER_heap_remove_root (fragq->fragments);
1012 if (MSG_FRAG_STATE_END <= fragq->state)
1014 struct GNUNET_HashCode msg_id_hash;
1015 hash_key_from_nll (&msg_id_hash, msg_id);
1017 GNUNET_CONTAINER_multihashmap_remove (ch->recv_frags, &msg_id_hash, fragq);
1018 GNUNET_CONTAINER_heap_destroy (fragq->fragments);
1019 GNUNET_free (fragq);
1023 fragq->queued = GNUNET_NO;
1029 * Run message queue.
1031 * Send messages in queue to client in order after a message has arrived from
1032 * multicast, according to the following:
1033 * - A message is only sent if all of its modifiers arrived.
1034 * - A stateful message is only sent if the previous stateful message
1035 * has already been delivered to the client.
1037 * @param ch Channel.
1039 * @return Number of messages removed from queue and sent to client.
1042 message_queue_run (struct Channel *ch)
1044 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1045 "%p Running message queue.\n", ch);
1048 while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (ch->recv_msgs, NULL,
1051 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1052 "%p Processing message %" PRIu64 " in queue.\n", ch, msg_id);
1053 struct GNUNET_HashCode msg_id_hash;
1054 hash_key_from_hll (&msg_id_hash, msg_id);
1056 struct FragmentQueue *
1057 fragq = GNUNET_CONTAINER_multihashmap_get (ch->recv_frags, &msg_id_hash);
1059 if (NULL == fragq || fragq->state <= MSG_FRAG_STATE_HEADER)
1061 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1062 "%p No fragq (%p) or header not complete.\n",
1067 if (MSG_FRAG_STATE_HEADER == fragq->state)
1069 /* Check if there's a missing message before the current one */
1070 if (GNUNET_PSYC_STATE_NOT_MODIFIED == fragq->state_delta)
1072 if (!(fragq->flags & GNUNET_PSYC_MESSAGE_ORDER_ANY)
1073 && msg_id - 1 != ch->max_message_id)
1075 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1076 "%p Out of order message. "
1077 "(%" PRIu64 " - 1 != %" PRIu64 ")\n",
1078 ch, msg_id, ch->max_message_id);
1084 if (msg_id - fragq->state_delta != ch->max_state_message_id)
1086 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1087 "%p Out of order stateful message. "
1088 "(%" PRIu64 " - %" PRIu64 " != %" PRIu64 ")\n",
1089 ch, msg_id, fragq->state_delta, ch->max_state_message_id);
1093 /* FIXME: apply modifiers to state in PSYCstore */
1094 GNUNET_PSYCSTORE_state_modify (store, &ch->pub_key, message_id,
1095 state_modify_result_cb, cls);
1097 ch->max_state_message_id = msg_id;
1099 ch->max_message_id = msg_id;
1101 fragment_queue_run (ch, msg_id, fragq, MSG_FRAG_STATE_DROP == fragq->state);
1102 GNUNET_CONTAINER_heap_remove_root (ch->recv_msgs);
1105 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1106 "%p Removed %" PRIu64 " messages from queue.\n", ch, n);
1112 * Drop message queue of a channel.
1114 * Remove all messages in queue without sending it to clients.
1116 * @param ch Channel.
1118 * @return Number of messages removed from queue.
1121 message_queue_drop (struct Channel *ch)
1123 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1124 "%p Dropping message queue.\n", ch);
1127 while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (ch->recv_msgs, NULL,
1130 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1131 "%p Dropping message %" PRIu64 " from queue.\n", ch, msg_id);
1132 struct GNUNET_HashCode msg_id_hash;
1133 hash_key_from_hll (&msg_id_hash, msg_id);
1135 struct FragmentQueue *
1136 fragq = GNUNET_CONTAINER_multihashmap_get (ch->recv_frags, &msg_id_hash);
1138 fragment_queue_run (ch, msg_id, fragq, GNUNET_YES);
1139 GNUNET_CONTAINER_heap_remove_root (ch->recv_msgs);
1142 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1143 "%p Removed %" PRIu64 " messages from queue.\n", ch, n);
1149 * Handle incoming message from multicast.
1151 * @param ch Channel.
1152 * @param mmsg Multicast message.
1154 * @return #GNUNET_OK or #GNUNET_SYSERR
1157 client_multicast_message (struct Channel *ch,
1158 const struct GNUNET_MULTICAST_MessageHeader *mmsg)
1160 GNUNET_PSYCSTORE_fragment_store (store, &ch->pub_key, mmsg, 0, NULL, NULL);
1162 uint16_t size = ntohs (mmsg->header.size);
1163 uint16_t first_ptype = 0, last_ptype = 0;
1166 == GNUNET_PSYC_check_message_parts (size - sizeof (*mmsg),
1167 (const char *) &mmsg[1],
1168 &first_ptype, &last_ptype))
1170 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1171 "%p Received message with invalid parts from multicast. "
1172 "Dropping message.\n", ch);
1173 GNUNET_break_op (0);
1174 return GNUNET_SYSERR;
1177 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1178 "Message parts: first: type %u, last: type %u\n",
1179 first_ptype, last_ptype);
1181 fragment_queue_insert (ch, mmsg, first_ptype, last_ptype);
1182 message_queue_run (ch);
1189 * Incoming message fragment from multicast.
1191 * Store it using PSYCstore and send it to the client of the channel.
1194 mcast_message_cb (void *cls, const struct GNUNET_MessageHeader *msg)
1196 struct Channel *ch = cls;
1197 uint16_t type = ntohs (msg->type);
1198 uint16_t size = ntohs (msg->size);
1200 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1201 "%p Received message of type %u and size %u from multicast.\n",
1206 case GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE:
1208 client_multicast_message (ch, (const struct
1209 GNUNET_MULTICAST_MessageHeader *) msg);
1213 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1214 "%p Dropping unknown message of type %u and size %u.\n",
1221 * Incoming request fragment from multicast for a master.
1223 * @param cls Master.
1224 * @param slave_key Sending slave's public key.
1225 * @param msg The message.
1226 * @param flags Request flags.
1229 mcast_request_cb (void *cls,
1230 const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key,
1231 const struct GNUNET_MessageHeader *msg,
1232 enum GNUNET_MULTICAST_MessageFlags flags)
1234 struct Master *mst = cls;
1235 struct Channel *ch = &mst->ch;
1237 uint16_t type = ntohs (msg->type);
1238 uint16_t size = ntohs (msg->size);
1240 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1241 "%p Received request of type %u and size %u from multicast.\n",
1246 case GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST:
1248 const struct GNUNET_MULTICAST_RequestHeader *req
1249 = (const struct GNUNET_MULTICAST_RequestHeader *) msg;
1251 /* FIXME: see message_cb() */
1252 if (GNUNET_SYSERR == GNUNET_PSYC_check_message_parts (size - sizeof (*req),
1253 (const char *) &req[1],
1256 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1257 "%p Dropping request with invalid parts "
1258 "received from multicast.\n", ch);
1259 GNUNET_break_op (0);
1263 struct GNUNET_PSYC_MessageHeader *pmsg;
1264 uint16_t psize = sizeof (*pmsg) + size - sizeof (*req);
1265 pmsg = GNUNET_malloc (psize);
1266 pmsg->header.size = htons (psize);
1267 pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
1268 pmsg->message_id = req->request_id;
1269 pmsg->flags = htonl (GNUNET_PSYC_MESSAGE_REQUEST);
1271 memcpy (&pmsg[1], &req[1], size - sizeof (*req));
1272 msg_to_clients (ch, &pmsg->header);
1277 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1278 "%p Dropping unknown request of type %u and size %u.\n",
1280 GNUNET_break_op (0);
1286 * Response from PSYCstore with the current counter values for a channel master.
1289 master_counters_cb (void *cls, int result, uint64_t max_fragment_id,
1290 uint64_t max_message_id, uint64_t max_group_generation,
1291 uint64_t max_state_message_id)
1293 struct Master *mst = cls;
1294 struct Channel *ch = &mst->ch;
1295 ch->store_op = NULL;
1297 struct CountersResult res;
1298 res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK);
1299 res.header.size = htons (sizeof (res));
1300 res.result_code = htonl (result);
1301 res.max_message_id = GNUNET_htonll (max_message_id);
1303 if (GNUNET_OK == result || GNUNET_NO == result)
1305 mst->max_message_id = max_message_id;
1306 ch->max_message_id = max_message_id;
1307 ch->max_state_message_id = max_state_message_id;
1308 mst->max_group_generation = max_group_generation;
1310 = GNUNET_MULTICAST_origin_start (cfg, &mst->priv_key, max_fragment_id,
1311 &mcast_join_request_cb,
1312 &mcast_membership_test_cb,
1313 &mcast_replay_fragment_cb,
1314 &mcast_replay_message_cb,
1316 &mcast_message_cb, ch);
1317 ch->ready = GNUNET_YES;
1321 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1322 "%p GNUNET_PSYCSTORE_counters_get() "
1323 "returned %d for channel %s.\n",
1324 ch, result, GNUNET_h2s (&ch->pub_key_hash));
1327 msg_to_clients (ch, &res.header);
1332 * Response from PSYCstore with the current counter values for a channel slave.
1335 slave_counters_cb (void *cls, int result, uint64_t max_fragment_id,
1336 uint64_t max_message_id, uint64_t max_group_generation,
1337 uint64_t max_state_message_id)
1339 struct Slave *slv = cls;
1340 struct Channel *ch = &slv->ch;
1341 ch->store_op = NULL;
1343 struct CountersResult res;
1344 res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK);
1345 res.header.size = htons (sizeof (res));
1346 res.result_code = htonl (result);
1347 res.max_message_id = GNUNET_htonll (max_message_id);
1349 if (GNUNET_OK == result || GNUNET_NO == result)
1351 ch->max_message_id = max_message_id;
1352 ch->max_state_message_id = max_state_message_id;
1354 = GNUNET_MULTICAST_member_join (cfg, &ch->pub_key, &slv->priv_key,
1356 slv->relay_count, slv->relays,
1358 &mcast_join_request_cb,
1359 &mcast_join_decision_cb,
1360 &mcast_membership_test_cb,
1361 &mcast_replay_fragment_cb,
1362 &mcast_replay_message_cb,
1363 &mcast_message_cb, ch);
1367 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1368 "%p GNUNET_PSYCSTORE_counters_get() "
1369 "returned %d for channel %s.\n",
1370 ch, result, GNUNET_h2s (&ch->pub_key_hash));
1373 msg_to_clients (ch, &res.header);
1378 channel_init (struct Channel *ch)
1381 = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
1382 ch->recv_frags = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
1387 * Handle a connecting client starting a channel master.
1390 client_master_start (void *cls, struct GNUNET_SERVER_Client *client,
1391 const struct GNUNET_MessageHeader *msg)
1393 const struct MasterStartRequest *req
1394 = (const struct MasterStartRequest *) msg;
1396 struct GNUNET_CRYPTO_EddsaPublicKey pub_key;
1397 struct GNUNET_HashCode pub_key_hash;
1399 GNUNET_CRYPTO_eddsa_key_get_public (&req->channel_key, &pub_key);
1400 GNUNET_CRYPTO_hash (&pub_key, sizeof (pub_key), &pub_key_hash);
1403 mst = GNUNET_CONTAINER_multihashmap_get (masters, &pub_key_hash);
1408 mst = GNUNET_new (struct Master);
1409 mst->policy = ntohl (req->policy);
1410 mst->priv_key = req->channel_key;
1411 mst->join_reqs = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
1414 ch->is_master = GNUNET_YES;
1415 ch->pub_key = pub_key;
1416 ch->pub_key_hash = pub_key_hash;
1419 GNUNET_CONTAINER_multihashmap_put (masters, &ch->pub_key_hash, ch,
1420 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1421 ch->store_op = GNUNET_PSYCSTORE_counters_get (store, &ch->pub_key,
1422 master_counters_cb, mst);
1428 struct CountersResult res;
1429 res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK);
1430 res.header.size = htons (sizeof (res));
1431 res.result_code = htonl (GNUNET_OK);
1432 res.max_message_id = GNUNET_htonll (mst->max_message_id);
1434 GNUNET_SERVER_notification_context_add (nc, client);
1435 GNUNET_SERVER_notification_context_unicast (nc, client, &res.header,
1439 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1440 "%p Client connected as master to channel %s.\n",
1441 mst, GNUNET_h2s (&ch->pub_key_hash));
1443 struct ClientList *cl = GNUNET_new (struct ClientList);
1444 cl->client = client;
1445 GNUNET_CONTAINER_DLL_insert (ch->clients_head, ch->clients_tail, cl);
1447 GNUNET_SERVER_client_set_user_context (client, ch);
1448 GNUNET_SERVER_receive_done (client, GNUNET_OK);
1453 * Handle a connecting client joining as a channel slave.
/*
 * Request handler for a client that joins a channel as slave.
 *
 * Visible behaviour: hashes the slave's public key (derived from
 * req->slave_key) and the channel key, looks up the per-channel slave map in
 * channel_slaves and the slave inside it, and otherwise creates a new
 * struct Slave from the request (keys, origin, relay list copied from the
 * data following the request struct).  A new slave is registered in the
 * per-channel map and in `slaves`, and a PSYCstore counters lookup is started
 * with slave_counters_cb() as callback.  For an existing slave a
 * SLAVE_JOIN_ACK is unicast to the client; then either the multicast join is
 * issued (when slv->member is NULL) or a stored join decision is forwarded.
 * Finally the client is added to the channel's client list and gets the
 * channel as user context.
 *
 * NOTE(review): short lines are absent from this copy (return type, braces,
 * the `if`/`else` lines that select between the branches described above,
 * several assignment targets and two member_join() arguments).  Restore them
 * from the upstream file before building.
 * NOTE(review): no check of relay_count against the size of the received
 * message is visible; if none exists on the dropped lines, the relay copy
 * loop can read past the end of the message.
 */
1456 client_slave_join (void *cls, struct GNUNET_SERVER_Client *client,
1457 const struct GNUNET_MessageHeader *msg)
1459 const struct SlaveJoinRequest *req
1460 = (const struct SlaveJoinRequest *) msg;
1462 struct GNUNET_CRYPTO_EddsaPublicKey slv_pub_key;
1463 struct GNUNET_HashCode pub_key_hash, slv_pub_key_hash;
/* Hash of the slave's public key and of the channel's key. */
1465 GNUNET_CRYPTO_eddsa_key_get_public (&req->slave_key, &slv_pub_key);
1466 GNUNET_CRYPTO_hash (&slv_pub_key, sizeof (slv_pub_key), &slv_pub_key_hash);
1467 GNUNET_CRYPTO_hash (&req->channel_key, sizeof (req->channel_key), &pub_key_hash);
/* Per-channel map of slaves, keyed by the slave's public-key hash. */
1469 struct GNUNET_CONTAINER_MultiHashMap *
1470 ch_slv = GNUNET_CONTAINER_multihashmap_get (channel_slaves, &pub_key_hash);
1471 struct Slave *slv = NULL;
1476 slv = GNUNET_CONTAINER_multihashmap_get (ch_slv, &slv_pub_key_hash);
/* New slave: copy identity and routing information from the request. */
1480 slv = GNUNET_new (struct Slave);
1481 slv->priv_key = req->slave_key;
1482 slv->pub_key = slv_pub_key;
1483 slv->pub_key_hash = slv_pub_key_hash;
1484 slv->origin = req->origin;
1485 slv->relay_count = ntohl (req->relay_count);
1486 if (0 < slv->relay_count)
/* The relay identities directly follow the fixed-size request. */
1488 const struct GNUNET_PeerIdentity *relays
1489 = (const struct GNUNET_PeerIdentity *) &req[1];
1491 = GNUNET_malloc (slv->relay_count * sizeof (struct GNUNET_PeerIdentity));
1493 for (i = 0; i < slv->relay_count; i++)
1494 memcpy (&slv->relays[i], &relays[i], sizeof (*relays));
1498 ch->is_master = GNUNET_NO;
1499 ch->pub_key = req->channel_key;
1500 ch->pub_key_hash = pub_key_hash;
/* Create and register the per-channel slave map. */
1505 ch_slv = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
1506 GNUNET_CONTAINER_multihashmap_put (channel_slaves, &ch->pub_key_hash, ch_slv,
1507 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1509 GNUNET_CONTAINER_multihashmap_put (ch_slv, &slv->pub_key_hash, ch,
1510 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1511 GNUNET_CONTAINER_multihashmap_put (slaves, &ch->pub_key_hash, ch,
1512 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
/* The multicast join for a new slave is issued from slave_counters_cb(). */
1513 ch->store_op = GNUNET_PSYCSTORE_counters_get (store, &ch->pub_key,
1514 slave_counters_cb, slv);
/* Known slave: acknowledge immediately with the cached counter. */
1520 struct CountersResult res;
1521 res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK);
1522 res.header.size = htons (sizeof (res));
1523 res.result_code = htonl (GNUNET_OK);
1524 res.max_message_id = GNUNET_htonll (ch->max_message_id);
1526 GNUNET_SERVER_notification_context_add (nc, client);
1527 GNUNET_SERVER_notification_context_unicast (nc, client, &res.header,
1530 if (NULL == slv->member)
/* NOTE(review): the left-hand side of this assignment is missing. */
1533 = GNUNET_MULTICAST_member_join (cfg, &ch->pub_key, &slv->priv_key,
1535 slv->relay_count, slv->relays,
1537 &mcast_join_request_cb,
1538 &mcast_join_decision_cb,
1539 &mcast_membership_test_cb,
1540 &mcast_replay_fragment_cb,
1541 &mcast_replay_message_cb,
1542 &mcast_message_cb, ch);
/* A join decision is already stored: forward it to this client. */
1545 else if (NULL != slv->join_dcsn)
1547 GNUNET_SERVER_notification_context_add (nc, client);
1548 GNUNET_SERVER_notification_context_unicast (nc, client,
1549 &slv->join_dcsn->header,
1554 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1555 "%p Client connected as slave to channel %s.\n",
1556 slv, GNUNET_h2s (&ch->pub_key_hash));
/* Remember the client on the channel and attach the channel to it. */
1558 struct ClientList *cl = GNUNET_new (struct ClientList);
1559 cl->client = client;
1560 GNUNET_CONTAINER_DLL_insert (ch->clients_head, ch->clients_tail, cl);
1562 GNUNET_SERVER_client_set_user_context (client, &slv->ch);
1563 GNUNET_SERVER_receive_done (client, GNUNET_OK);
/**
 * Closure handed to send_join_decision_cb() for every pending join request.
 */
struct JoinDecisionClosure
{
  /** Decision taken by the master's client, in host byte order. */
  int32_t is_admitted;

  /** Message to send along with the decision, or NULL if there is none. */
  struct GNUNET_MessageHeader *msg;
};
1575 * Iterator callback for responding to join requests of a slave.
1578 send_join_decision_cb (void *cls, const struct GNUNET_HashCode *pub_key_hash,
1581 struct JoinDecisionClosure *jcls = cls;
1582 // FIXME: add relays
1583 GNUNET_MULTICAST_join_decision (jh, jcls->is_admitted, 0, NULL, jcls->msg);
1589 * Join decision from client.
1592 client_join_decision (void *cls, struct GNUNET_SERVER_Client *client,
1593 const struct GNUNET_MessageHeader *msg)
1596 ch = GNUNET_SERVER_client_get_user_context (client, struct Channel);
1597 GNUNET_assert (GNUNET_YES == ch->is_master);
1598 struct Master *mst = (struct Master *) ch;
1600 struct MasterJoinDecision *dcsn = (struct MasterJoinDecision *) msg;
1601 struct JoinDecisionClosure jcls;
1602 jcls.is_admitted = ntohl (dcsn->is_admitted);
1604 = (sizeof (*dcsn) + sizeof (struct GNUNET_PSYC_MessageHeader)
1605 <= ntohs (msg->size))
1606 ? (struct GNUNET_MessageHeader *) &dcsn[1]
1609 struct GNUNET_HashCode slave_key_hash;
1610 GNUNET_CRYPTO_hash (&dcsn->slave_key, sizeof (dcsn->slave_key),
1613 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1614 "%p Got join decision (%d) from client for channel %s..\n",
1615 mst, jcls.is_admitted, GNUNET_h2s (&ch->pub_key_hash));
1616 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1617 "%p ..and slave %s.\n",
1618 mst, GNUNET_h2s (&slave_key_hash));
1620 GNUNET_CONTAINER_multihashmap_get_multiple (mst->join_reqs, &slave_key_hash,
1621 &send_join_decision_cb, &jcls);
1622 GNUNET_CONTAINER_multihashmap_remove_all (mst->join_reqs, &slave_key_hash);
1623 GNUNET_SERVER_receive_done (client, GNUNET_OK);
1628 * Send acknowledgement to a client.
1630 * Sent after a message fragment has been passed on to multicast.
1632 * @param ch The channel struct for the client.
1635 send_message_ack (struct Channel *ch, struct GNUNET_SERVER_Client *client)
1637 struct GNUNET_MessageHeader res;
1638 res.size = htons (sizeof (res));
1639 res.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK);
1642 GNUNET_SERVER_notification_context_add (nc, client);
1643 GNUNET_SERVER_notification_context_unicast (nc, client, &res, GNUNET_NO);
1648 * Callback for the transmit functions of multicast.
1651 transmit_notify (void *cls, size_t *data_size, void *data)
1653 struct Channel *ch = cls;
1654 struct TransmitMessage *tmit_msg = ch->tmit_head;
1656 if (NULL == tmit_msg || *data_size < tmit_msg->size)
1658 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1659 "%p transmit_notify: nothing to send.\n", ch);
1664 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1665 "%p transmit_notify: sending %u bytes.\n", ch, tmit_msg->size);
1667 *data_size = tmit_msg->size;
1668 memcpy (data, &tmit_msg[1], *data_size);
1670 int ret = (MSG_STATE_END < ch->tmit_state) ? GNUNET_NO : GNUNET_YES;
1671 if (NULL != tmit_msg->client)
1672 send_message_ack (ch, tmit_msg->client);
1674 GNUNET_CONTAINER_DLL_remove (ch->tmit_head, ch->tmit_tail, tmit_msg);
1675 GNUNET_free (tmit_msg);
1677 if (0 == ch->tmit_task)
1679 if (NULL != ch->tmit_head)
1681 transmit_message (ch);
1683 else if (ch->disconnected)
1685 /* FIXME: handle partial message (when still in_transmit) */
1686 cleanup_channel (ch);
1695 * Callback for the transmit functions of multicast.
1698 master_transmit_notify (void *cls, size_t *data_size, void *data)
1700 int ret = transmit_notify (cls, data_size, data);
1702 if (GNUNET_YES == ret)
1704 struct Master *mst = cls;
1705 mst->tmit_handle = NULL;
1712 * Callback for the transmit functions of multicast.
1715 slave_transmit_notify (void *cls, size_t *data_size, void *data)
1717 int ret = transmit_notify (cls, data_size, data);
1719 if (GNUNET_YES == ret)
1721 struct Slave *slv = cls;
1722 slv->tmit_handle = NULL;
1729 * Transmit a message from a channel master to the multicast group.
1732 master_transmit_message (struct Master *mst)
1734 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p master_transmit_message()\n", mst);
1735 mst->ch.tmit_task = 0;
1736 if (NULL == mst->tmit_handle)
1739 = GNUNET_MULTICAST_origin_to_all (mst->origin, mst->max_message_id,
1740 mst->max_group_generation,
1741 master_transmit_notify, mst);
1745 GNUNET_MULTICAST_origin_to_all_resume (mst->tmit_handle);
1751 * Transmit a message from a channel slave to the multicast group.
1754 slave_transmit_message (struct Slave *slv)
1756 slv->ch.tmit_task = 0;
1757 if (NULL == slv->tmit_handle)
1760 = GNUNET_MULTICAST_member_to_origin (slv->member, slv->max_request_id,
1761 slave_transmit_notify, slv);
1765 GNUNET_MULTICAST_member_to_origin_resume (slv->tmit_handle);
1771 transmit_message (struct Channel *ch)
1774 ? master_transmit_message ((struct Master *) ch)
1775 : slave_transmit_message ((struct Slave *) ch);
1780 * Queue a message from a channel master for sending to the multicast group.
1783 master_queue_message (struct Master *mst, struct TransmitMessage *tmit_msg,
1784 uint16_t first_ptype, uint16_t last_ptype)
1786 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p master_queue_message()\n", mst);
1788 if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype)
1790 tmit_msg->id = ++mst->max_message_id;
1791 struct GNUNET_PSYC_MessageMethod *pmeth
1792 = (struct GNUNET_PSYC_MessageMethod *) &tmit_msg[1];
1794 if (pmeth->flags & GNUNET_PSYC_MASTER_TRANSMIT_STATE_RESET)
1796 pmeth->state_delta = GNUNET_htonll (GNUNET_PSYC_STATE_RESET);
1798 else if (pmeth->flags & GNUNET_PSYC_MASTER_TRANSMIT_STATE_MODIFY)
1800 pmeth->state_delta = GNUNET_htonll (tmit_msg->id
1801 - mst->max_state_message_id);
1805 pmeth->state_delta = GNUNET_htonll (GNUNET_PSYC_STATE_NOT_MODIFIED);
1812 * Queue a message from a channel slave for sending to the multicast group.
1815 slave_queue_message (struct Slave *slv, struct TransmitMessage *tmit_msg,
1816 uint16_t first_ptype, uint16_t last_ptype)
1818 if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype)
1820 struct GNUNET_PSYC_MessageMethod *pmeth
1821 = (struct GNUNET_PSYC_MessageMethod *) &tmit_msg[1];
1822 pmeth->state_delta = GNUNET_htonll (GNUNET_PSYC_STATE_NOT_MODIFIED);
1823 tmit_msg->id = ++slv->max_request_id;
1829 * Queue PSYC message parts for sending to multicast.
1831 * @param ch Channel to send to.
1832 * @param client Client the message originates from.
1833 * @param data_size Size of @a data.
1834 * @param data Concatenated message parts.
1835 * @param first_ptype First message part type in @a data.
1836 * @param last_ptype Last message part type in @a data.
1839 queue_message (struct Channel *ch,
1840 struct GNUNET_SERVER_Client *client,
1843 uint16_t first_ptype, uint16_t last_ptype)
1845 struct TransmitMessage *
1846 tmit_msg = GNUNET_malloc (sizeof (*tmit_msg) + data_size);
1847 memcpy (&tmit_msg[1], data, data_size);
1848 tmit_msg->client = client;
1849 tmit_msg->size = data_size;
1850 tmit_msg->state = ch->tmit_state;
1852 GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, tmit_msg);
1855 ? master_queue_message ((struct Master *) ch, tmit_msg,
1856 first_ptype, last_ptype)
1857 : slave_queue_message ((struct Slave *) ch, tmit_msg,
1858 first_ptype, last_ptype);
1863 * Cancel transmission of current message.
1865 * @param ch Channel to send to.
1866 * @param client Client the message originates from.
1869 transmit_cancel (struct Channel *ch, struct GNUNET_SERVER_Client *client)
1871 uint16_t type = GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL;
1873 struct GNUNET_MessageHeader msg;
1874 msg.size = htons (sizeof (msg));
1875 msg.type = htons (type);
1877 queue_message (ch, client, sizeof (msg), &msg, type, type);
1878 transmit_message (ch);
1880 /* FIXME: cleanup */
1885 * Incoming message from a master or slave client.
1888 client_psyc_message (void *cls, struct GNUNET_SERVER_Client *client,
1889 const struct GNUNET_MessageHeader *msg)
1892 ch = GNUNET_SERVER_client_get_user_context (client, struct Channel);
1893 GNUNET_assert (NULL != ch);
1895 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1896 "%p Received message from client.\n", ch);
1897 GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG, msg);
1899 if (GNUNET_YES != ch->ready)
1901 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1902 "%p Channel is not ready, dropping message from client.\n", ch);
1903 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1907 uint16_t size = ntohs (msg->size);
1908 if (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < size - sizeof (*msg))
1910 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "%p Message payload too large.\n", ch);
1912 transmit_cancel (ch, client);
1913 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1917 uint16_t first_ptype = 0, last_ptype = 0;
1919 == GNUNET_PSYC_check_message_parts (size - sizeof (*msg),
1920 (const char *) &msg[1],
1921 &first_ptype, &last_ptype))
1923 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1924 "%p Received invalid message part from client.\n", ch);
1926 transmit_cancel (ch, client);
1927 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1931 queue_message (ch, client, size - sizeof (*msg), &msg[1],
1932 first_ptype, last_ptype);
1933 transmit_message (ch);
1935 GNUNET_SERVER_receive_done (client, GNUNET_OK);
1940 * Client requests to add a slave to the membership database.
1943 client_slave_add (void *cls, struct GNUNET_SERVER_Client *client,
1944 const struct GNUNET_MessageHeader *msg)
1951 * Client requests to remove a slave from the membership database.
1954 client_slave_remove (void *cls, struct GNUNET_SERVER_Client *client,
1955 const struct GNUNET_MessageHeader *msg)
1962 * Client requests channel history from PSYCstore.
1965 client_story_request (void *cls, struct GNUNET_SERVER_Client *client,
1966 const struct GNUNET_MessageHeader *msg)
1973 * Client requests best matching state variable from PSYCstore.
1976 client_state_get (void *cls, struct GNUNET_SERVER_Client *client,
1977 const struct GNUNET_MessageHeader *msg)
1984 * Client requests state variables with a given prefix from PSYCstore.
1987 client_state_get_prefix (void *cls, struct GNUNET_SERVER_Client *client,
1988 const struct GNUNET_MessageHeader *msg)
1995 * Initialize the PSYC service.
1997 * @param cls Closure.
1998 * @param server The initialized server.
1999 * @param c Configuration to use.
2002 run (void *cls, struct GNUNET_SERVER_Handle *server,
2003 const struct GNUNET_CONFIGURATION_Handle *c)
2005 static const struct GNUNET_SERVER_MessageHandler handlers[] = {
2006 { &client_master_start, NULL,
2007 GNUNET_MESSAGE_TYPE_PSYC_MASTER_START, 0 },
2009 { &client_slave_join, NULL,
2010 GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN, 0 },
2012 { &client_join_decision, NULL,
2013 GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION, 0 },
2015 { &client_psyc_message, NULL,
2016 GNUNET_MESSAGE_TYPE_PSYC_MESSAGE, 0 },
2018 { &client_slave_add, NULL,
2019 GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_SLAVE_ADD, 0 },
2021 { &client_slave_remove, NULL,
2022 GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_SLAVE_RM, 0 },
2024 { &client_story_request, NULL,
2025 GNUNET_MESSAGE_TYPE_PSYC_STORY_REQUEST, 0 },
2027 { &client_state_get, NULL,
2028 GNUNET_MESSAGE_TYPE_PSYC_STATE_GET, 0 },
2030 { &client_state_get_prefix, NULL,
2031 GNUNET_MESSAGE_TYPE_PSYC_STATE_GET_PREFIX, 0 }
2035 store = GNUNET_PSYCSTORE_connect (cfg);
2036 stats = GNUNET_STATISTICS_create ("psyc", cfg);
2037 masters = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
2038 slaves = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
2039 channel_slaves = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
2040 recv_cache = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
2041 nc = GNUNET_SERVER_notification_context_create (server, 1);
2042 GNUNET_SERVER_add_handlers (server, handlers);
2043 GNUNET_SERVER_disconnect_notify (server, &client_disconnect, NULL);
2044 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL,
2045 &shutdown_task, NULL);
2050 * The main function for the service.
2052 * @param argc number of arguments from the command line
2053 * @param argv command line arguments
2054 * @return 0 ok, 1 on error
2057 main (int argc, char *const *argv)
2059 return (GNUNET_OK ==
2060 GNUNET_SERVICE_run (argc, argv, "psyc",
2061 GNUNET_SERVICE_OPTION_NONE,
2062 &run, NULL)) ? 0 : 1;
2065 /* end of gnunet-service-psyc.c */