2 * This file is part of GNUnet
3 * (C) 2013 Christian Grothoff (and other contributing authors)
5 * GNUnet is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published
7 * by the Free Software Foundation; either version 3, or (at your
8 * option) any later version.
10 * GNUnet is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License for more details.
15 * You should have received a copy of the GNU General Public License
16 * along with GNUnet; see the file COPYING. If not, write to the
17 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 * Boston, MA 02111-1307, USA.
22 * @file psyc/gnunet-service-psyc.c
24 * @author Gabor X Toth
30 #include "gnunet_util_lib.h"
31 #include "gnunet_constants.h"
32 #include "gnunet_protocols.h"
33 #include "gnunet_statistics_service.h"
34 #include "gnunet_multicast_service.h"
35 #include "gnunet_psycstore_service.h"
36 #include "gnunet_psyc_service.h"
41 * Handle to our current configuration.
43 static const struct GNUNET_CONFIGURATION_Handle *cfg;
46 * Handle to the statistics service.
48 static struct GNUNET_STATISTICS_Handle *stats;
51 * Notification context, simplifies client broadcasts.
53 static struct GNUNET_SERVER_NotificationContext *nc;
56 * Handle to the PSYCstore.
58 static struct GNUNET_PSYCSTORE_Handle *store;
61 * All connected masters.
62 * Channel's pub_key_hash -> struct Channel
64 static struct GNUNET_CONTAINER_MultiHashMap *masters;
67 * All connected slaves.
68 * Channel's pub_key_hash -> struct Channel
70 static struct GNUNET_CONTAINER_MultiHashMap *slaves;
74 * Message in the transmission queue.
76 struct TransmitMessage
78 struct TransmitMessage *prev;
79 struct TransmitMessage *next;
82 * ID assigned to the message.
92 * @see enum MessageState
96 /* Followed by message */
101 * Cache for received message fragments.
102 * Message fragments are only sent to clients after all modifiers arrived.
104 * chan_key -> MultiHashMap chan_msgs
106 static struct GNUNET_CONTAINER_MultiHashMap *recv_cache;
110 * Entry in the chan_msgs hashmap of @a recv_cache:
111 * fragment_id -> RecvCacheEntry
113 struct RecvCacheEntry
115 struct GNUNET_MULTICAST_MessageHeader *mmsg;
121 * Entry in the @a recv_frags hash map of a @a Channel.
122 * message_id -> FragmentQueue
127 * Fragment IDs stored in @a recv_cache.
129 struct GNUNET_CONTAINER_Heap *fragments;
132 * Total size of received fragments.
137 * Total size of received header fragments (METHOD & MODIFIERs)
139 uint64_t header_size;
142 * The @a state_delta field from struct GNUNET_PSYC_MessageMethod.
144 uint64_t state_delta;
147 * The @a flags field from struct GNUNET_PSYC_MessageMethod.
152 * Receive state of message.
154 * @see MessageFragmentState
159 * Is the message queued for delivery to the client?
160 * i.e. added to the recv_msgs queue
167 * Common part of the client context for both a channel master and slave.
171 struct GNUNET_SERVER_Client *client;
173 struct TransmitMessage *tmit_head;
174 struct TransmitMessage *tmit_tail;
177 * Received fragments not yet sent to the client.
178 * message_id -> FragmentQueue
180 struct GNUNET_CONTAINER_MultiHashMap *recv_frags;
183 * Received message IDs not yet sent to the client.
185 struct GNUNET_CONTAINER_Heap *recv_msgs;
190 GNUNET_SCHEDULER_TaskIdentifier tmit_task;
193 * Public key of the channel.
195 struct GNUNET_CRYPTO_EddsaPublicKey pub_key;
198 * Hash of @a pub_key.
200 struct GNUNET_HashCode pub_key_hash;
203 * Last message ID sent to the client.
204 * 0 if there is no such message.
206 uint64_t max_message_id;
209 * ID of the last stateful message, where the state operations have been
210 * processed and saved to PSYCstore and which has been sent to the client.
211 * 0 if there is no such message.
213 uint64_t max_state_message_id;
216 * Expected value size for the modifier being received from the PSYC service.
218 uint32_t tmit_mod_value_size_expected;
221 * Actual value size for the modifier being received from the PSYC service.
223 uint32_t tmit_mod_value_size;
226 * @see enum MessageState
236 * Is this a channel master (#GNUNET_YES), or slave (#GNUNET_NO)?
241 * Ready to receive messages from client? #GNUNET_YES or #GNUNET_NO
246 * Is the client disconnected? #GNUNET_YES or #GNUNET_NO
248 uint8_t disconnected;
253 * Client context for a channel master.
258 * Channel struct common for Master and Slave
260 struct Channel channel;
263 * Private key of the channel.
265 struct GNUNET_CRYPTO_EddsaPrivateKey priv_key;
268 * Handle for the multicast origin.
270 struct GNUNET_MULTICAST_Origin *origin;
273 * Transmit handle for multicast.
275 struct GNUNET_MULTICAST_OriginTransmitHandle *tmit_handle;
278 * Last message ID transmitted to this channel.
280 * Incremented before sending a message, thus the message_id in messages sent
283 uint64_t max_message_id;
286 * ID of the last message with state operations transmitted to the channel.
287 * 0 if there is no such message.
289 uint64_t max_state_message_id;
292 * Maximum group generation transmitted to the channel.
294 uint64_t max_group_generation;
297 * @see enum GNUNET_PSYC_Policy
304 * Client context for a channel slave.
309 * Channel struct common for Master and Slave
311 struct Channel channel;
314 * Private key of the slave.
316 struct GNUNET_CRYPTO_EddsaPrivateKey priv_key;
319 * Handle for the multicast member.
321 struct GNUNET_MULTICAST_Member *member;
324 * Transmit handle for multicast.
326 struct GNUNET_MULTICAST_MemberTransmitHandle *tmit_handle;
329 * Peer identity of the origin.
331 struct GNUNET_PeerIdentity origin;
334 * Number of items in @a relays.
336 uint32_t relay_count;
339 * Relays that multicast can use to connect.
341 struct GNUNET_PeerIdentity *relays;
344 * Join request to be transmitted to the master on join.
346 struct GNUNET_MessageHeader *join_req;
349 * Maximum request ID for this channel.
351 uint64_t max_request_id;
356 transmit_message (struct Channel *ch);
360 * Task run during shutdown.
366 shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
370 GNUNET_SERVER_notification_context_destroy (nc);
375 GNUNET_STATISTICS_destroy (stats, GNUNET_NO);
382 client_cleanup (struct Channel *ch)
384 /* FIXME: fragment_cache_clear */
388 struct Master *mst = (struct Master *) ch;
389 if (NULL != mst->origin)
390 GNUNET_MULTICAST_origin_stop (mst->origin);
391 GNUNET_CONTAINER_multihashmap_remove (masters, &ch->pub_key_hash, ch);
395 struct Slave *slv = (struct Slave *) ch;
396 if (NULL != slv->join_req)
397 GNUNET_free (slv->join_req);
398 if (NULL != slv->relays)
399 GNUNET_free (slv->relays);
400 if (NULL != slv->member)
401 GNUNET_MULTICAST_member_part (slv->member);
402 GNUNET_CONTAINER_multihashmap_remove (slaves, &ch->pub_key_hash, ch);
410 * Called whenever a client is disconnected.
411 * Frees our resources associated with that client.
413 * @param cls Closure.
414 * @param client Identification of the client.
417 client_disconnect (void *cls, struct GNUNET_SERVER_Client *client)
423 = GNUNET_SERVER_client_get_user_context (client, struct Channel);
424 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p Client disconnected\n", ch);
428 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
429 "%p User context is NULL in client_disconnect()\n", ch);
434 ch->disconnected = GNUNET_YES;
436 /* Send pending messages to multicast before cleanup. */
437 if (NULL != ch->tmit_head)
439 transmit_message (ch);
449 * Master receives a join request from a slave.
/*
 * Callback for a join request that a slave sent to a channel master.
 * It is handed the requesting slave's public key, the join request message
 * and a multicast join handle.
 *
 * NOTE(review): the return type and the body of this function are not
 * visible in this excerpt -- confirm whether a join decision is ever
 * issued on @a jh.
 */
452 join_cb (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key,
453 const struct GNUNET_MessageHeader *join_req,
454 struct GNUNET_MULTICAST_JoinHandle *jh)
/*
 * Callback taking a slave's public key, a message ID, a group generation
 * and a membership test handle.
 *
 * NOTE(review): presumably multicast uses this to ask whether the slave was
 * a member at the given message/generation -- confirm against the multicast
 * API.  The return type and body are not visible in this excerpt.
 */
461 membership_test_cb (void *cls,
462 const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key,
463 uint64_t message_id, uint64_t group_generation,
464 struct GNUNET_MULTICAST_MembershipTestHandle *mth)
/*
 * Callback taking a slave's public key, a fragment ID, flags and a replay
 * handle.
 *
 * NOTE(review): presumably a request to replay one stored fragment,
 * addressed by fragment ID -- confirm against the multicast API.  The return
 * type and body are not visible in this excerpt.
 */
471 replay_fragment_cb (void *cls,
472 const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key,
473 uint64_t fragment_id, uint64_t flags,
474 struct GNUNET_MULTICAST_ReplayHandle *rh)
/*
 * Callback taking a slave's public key, a fragment offset and a replay
 * handle.
 *
 * NOTE(review): part of the parameter list (between the visible lines), the
 * return type and the body are not visible in this excerpt; presumably a
 * request to replay a fragment addressed by message and offset -- confirm
 * against the multicast API.
 */
481 replay_message_cb (void *cls,
482 const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key,
484 uint64_t fragment_offset,
486 struct GNUNET_MULTICAST_ReplayHandle *rh)
493 fragment_store_result (void *cls, int64_t result, const char *err_msg)
495 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
496 "fragment_store() returned %l (%s)\n", result, err_msg);
501 message_to_client (struct Channel *ch,
502 const struct GNUNET_MULTICAST_MessageHeader *mmsg)
504 uint16_t size = ntohs (mmsg->header.size);
505 struct GNUNET_PSYC_MessageHeader *pmsg;
506 uint16_t psize = sizeof (*pmsg) + size - sizeof (*mmsg);
508 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
509 "%p Sending message to client. "
510 "fragment_id: %" PRIu64 ", message_id: %" PRIu64 "\n",
511 ch, GNUNET_ntohll (mmsg->fragment_id),
512 GNUNET_ntohll (mmsg->message_id));
514 pmsg = GNUNET_malloc (psize);
515 pmsg->header.size = htons (psize);
516 pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
517 pmsg->message_id = mmsg->message_id;
519 memcpy (&pmsg[1], &mmsg[1], size - sizeof (*mmsg));
521 GNUNET_SERVER_notification_context_add (nc, ch->client);
522 GNUNET_SERVER_notification_context_unicast (nc, ch->client,
523 (const struct GNUNET_MessageHeader *) pmsg,
530 * Convert an uint64_t in network byte order to a HashCode
531 * that can be used as key in a MultiHashMap
534 hash_key_from_nll (struct GNUNET_HashCode *key, uint64_t n)
536 /* use little-endian order, as idx_of MultiHashMap casts key to unsigned int */
537 /* TODO: use built-in byte swap functions if available */
539 n = ((n << 8) & 0xFF00FF00FF00FF00ULL) | ((n >> 8) & 0x00FF00FF00FF00FFULL);
540 n = ((n << 16) & 0xFFFF0000FFFF0000ULL) | ((n >> 16) & 0x0000FFFF0000FFFFULL);
542 *key = (struct GNUNET_HashCode) {{ 0 }};
544 = (n << 32) | (n >> 32);
/**
 * Convert an uint64_t in host byte order to a HashCode
 * that can be used as key in a MultiHashMap.
 *
 * Produces the same key as hash_key_from_nll() does for the same number
 * given in network byte order.
 *
 * @param key Hash code to fill in.
 * @param n Value in host byte order.
 */
static void
hash_key_from_hll (struct GNUNET_HashCode *key, uint64_t n)
{
#if __BYTE_ORDER == __BIG_ENDIAN
  /* Host order equals network order here. */
  hash_key_from_nll (key, n);
#elif __BYTE_ORDER == __LITTLE_ENDIAN
  *key = (struct GNUNET_HashCode) {{ 0 }};
  /* Copy the bytes instead of storing through a (uint64_t *) cast of the
     hash code, which violates the aliasing rules and may be misaligned. */
  memcpy (key, &n, sizeof (n));
#else
  #error byteorder undefined
#endif
}
567 * Insert a multicast message fragment into the queue belonging to the message.
570 * @param mmsg Multicast message fragment.
571 * (The message ID of @a mmsg is hashed into a local struct GNUNET_HashCode; it is not a parameter.)
572 * @param first_ptype First PSYC message part type in @a mmsg.
573 * @param last_ptype Last PSYC message part type in @a mmsg.
/*
 * Record one received multicast fragment for later in-order delivery.
 *
 * The fragment is copied into the per-channel cache in recv_cache (or the
 * reference count of an existing cache entry is incremented), its fragment
 * ID is pushed onto the fragment heap of the message it belongs to, and the
 * receive state of that message is advanced.  Once the state is DATA, END
 * or CANCEL the message ID is pushed onto ch->recv_msgs, unless the message
 * is already marked as queued.
 *
 * Parameters: ch is the channel the fragment arrived on, mmsg the fragment,
 * first_ptype / last_ptype the types of its first and last PSYC part.
 *
 * NOTE(review): several short lines of this function (braces, a few
 * declarations and assignment targets) are not visible in this excerpt.
 */
576 fragment_queue_insert (struct Channel *ch,
577 const struct GNUNET_MULTICAST_MessageHeader *mmsg,
578 uint16_t first_ptype, uint16_t last_ptype)
580 const uint16_t size = ntohs (mmsg->header.size);
581 const uint64_t frag_offset = GNUNET_ntohll (mmsg->fragment_offset);
/* Fragment cache of this channel; the key argument is not visible here. */
582 struct GNUNET_CONTAINER_MultiHashMap
583 *chan_msgs = GNUNET_CONTAINER_multihashmap_get (recv_cache,
/* message_id is still in network byte order, hence the _nll variant. */
586 struct GNUNET_HashCode msg_id_hash;
587 hash_key_from_nll (&msg_id_hash, mmsg->message_id);
590 *fragq = GNUNET_CONTAINER_multihashmap_get (ch->recv_frags, &msg_id_hash);
/* A new fragment queue starts in HEADER state with a min-ordered heap and
   is registered in ch->recv_frags under the hashed message ID. */
594 fragq = GNUNET_new (struct FragmentQueue);
595 fragq->state = MSG_FRAG_STATE_HEADER;
597 = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
599 GNUNET_CONTAINER_multihashmap_put (ch->recv_frags, &msg_id_hash, fragq,
600 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
/* Create the per-channel fragment cache on first use. */
602 if (NULL == chan_msgs)
604 chan_msgs = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
605 GNUNET_CONTAINER_multihashmap_put (recv_cache, &ch->pub_key_hash, chan_msgs,
606 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
/* Cache a private copy of the fragment keyed by its fragment ID, or bump
   the reference count when the fragment is already cached. */
610 struct GNUNET_HashCode frag_id_hash;
611 hash_key_from_nll (&frag_id_hash, mmsg->fragment_id);
612 struct RecvCacheEntry
613 *cache_entry = GNUNET_CONTAINER_multihashmap_get (chan_msgs, &frag_id_hash);
614 if (NULL == cache_entry)
616 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
617 "%p Adding message fragment to cache. "
618 "message_id: %" PRIu64 ", fragment_id: %" PRIu64 ", "
619 "header_size: %" PRIu64 " + %u).\n",
620 ch, GNUNET_ntohll (mmsg->message_id),
621 GNUNET_ntohll (mmsg->fragment_id),
622 fragq->header_size, size);
623 cache_entry = GNUNET_new (struct RecvCacheEntry);
624 cache_entry->ref_count = 1;
625 cache_entry->mmsg = GNUNET_malloc (size);
626 memcpy (cache_entry->mmsg, mmsg, size);
627 GNUNET_CONTAINER_multihashmap_put (chan_msgs, &frag_id_hash, cache_entry,
628 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
632 cache_entry->ref_count++;
633 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
634 "%p Message fragment is already in cache. "
635 "message_id: %" PRIu64 ", fragment_id: %" PRIu64
637 ch, GNUNET_ntohll (mmsg->message_id),
638 GNUNET_ntohll (mmsg->fragment_id), cache_entry->ref_count);
/* Header tracking: while in HEADER state, a fragment starting with a METHOD
   part supplies state_delta and flags; a fragment whose last part type is
   below DATA adds its size to header_size; otherwise the header is treated
   as complete when the fragment starts with METHOD or its offset equals the
   header size accumulated so far. */
641 if (MSG_FRAG_STATE_HEADER == fragq->state)
643 if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype)
645 struct GNUNET_PSYC_MessageMethod *
646 pmeth = (struct GNUNET_PSYC_MessageMethod *) &mmsg[1];
647 fragq->state_delta = GNUNET_ntohll (pmeth->state_delta);
648 fragq->flags = ntohl (pmeth->flags);
651 if (last_ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA)
653 fragq->header_size += size;
655 else if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype
656 || frag_offset == fragq->header_size)
657 { /* header is now complete */
658 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
659 "%p Header of message %" PRIu64 " is complete.\n",
660 ch, GNUNET_ntohll (mmsg->message_id));
662 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
663 "%p Adding message %" PRIu64 " to queue.\n",
664 ch, GNUNET_ntohll (mmsg->message_id));
665 fragq->state = MSG_FRAG_STATE_DATA;
669 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
670 "%p Header of message %" PRIu64 " is NOT complete yet: "
671 "%" PRIu64 " != %" PRIu64 "\n",
672 ch, GNUNET_ntohll (mmsg->message_id), frag_offset,
/* End-of-message handling keyed on the last part type (the switch header
   itself is not visible in this excerpt): END only completes the message
   when this fragment's offset equals the size accumulated so far. */
679 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END:
680 if (frag_offset == fragq->size)
681 fragq->state = MSG_FRAG_STATE_END;
683 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
684 "%p Message %" PRIu64 " is NOT complete yet: "
685 "%" PRIu64 " != %" PRIu64 "\n",
686 ch, GNUNET_ntohll (mmsg->message_id), frag_offset,
/* NOTE(review): the target of the conditional below is not visible;
   presumably it is assigned to fragq->state -- confirm. */
690 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL:
691 /* Drop message without delivering to client if it's a single fragment */
693 (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype)
694 ? MSG_FRAG_STATE_DROP
695 : MSG_FRAG_STATE_CANCEL;
/* Queue the message ID (host byte order as heap cost) for delivery once the
   header is complete, unless it is already queued. */
698 switch (fragq->state)
700 case MSG_FRAG_STATE_DATA:
701 case MSG_FRAG_STATE_END:
702 case MSG_FRAG_STATE_CANCEL:
703 if (GNUNET_NO == fragq->queued)
705 GNUNET_CONTAINER_heap_insert (ch->recv_msgs, NULL,
706 GNUNET_ntohll (mmsg->message_id));
707 fragq->queued = GNUNET_YES;
/* Remember the fragment ID (host byte order as heap cost) so that fragments
   can later be taken from the heap in ascending ID order. */
712 GNUNET_CONTAINER_heap_insert (fragq->fragments, NULL,
713 GNUNET_ntohll (mmsg->fragment_id));
718 * Run fragment queue of a message.
720 * Send fragments of a message in order to client, after all modifiers arrived
724 * @param msg_id ID of the message @a fragq belongs to.
725 * @param fragq Fragment queue of the message.
726 * @param drop Drop message without delivering to client?
727 * #GNUNET_YES or #GNUNET_NO.
730 fragment_queue_run (struct Channel *ch, uint64_t msg_id,
731 struct FragmentQueue *fragq, uint8_t drop)
733 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
734 "%p Running message fragment queue for message %" PRIu64
736 ch, msg_id, fragq->state);
738 struct GNUNET_CONTAINER_MultiHashMap
739 *chan_msgs = GNUNET_CONTAINER_multihashmap_get (recv_cache,
741 GNUNET_assert (NULL != chan_msgs);
744 while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (fragq->fragments, NULL,
747 struct GNUNET_HashCode frag_id_hash;
748 hash_key_from_hll (&frag_id_hash, frag_id);
749 struct RecvCacheEntry *cache_entry
750 = GNUNET_CONTAINER_multihashmap_get (chan_msgs, &frag_id_hash);
751 if (cache_entry != NULL)
753 if (GNUNET_NO == drop)
755 message_to_client (ch, cache_entry->mmsg);
757 if (cache_entry->ref_count <= 1)
759 GNUNET_CONTAINER_multihashmap_remove (chan_msgs, &frag_id_hash,
761 GNUNET_free (cache_entry->mmsg);
762 GNUNET_free (cache_entry);
766 cache_entry->ref_count--;
769 #if CACHE_AGING_IMPLEMENTED
770 else if (GNUNET_NO == drop)
772 /* TODO: fragment not in cache anymore, retrieve it from PSYCstore */
776 GNUNET_CONTAINER_heap_remove_root (fragq->fragments);
779 if (MSG_FRAG_STATE_END <= fragq->state)
781 struct GNUNET_HashCode msg_id_hash;
782 hash_key_from_nll (&msg_id_hash, msg_id);
784 GNUNET_CONTAINER_multihashmap_remove (ch->recv_frags, &msg_id_hash, fragq);
785 GNUNET_CONTAINER_heap_destroy (fragq->fragments);
790 fragq->queued = GNUNET_NO;
798 * Send messages in queue to client in order after a message has arrived from
799 * multicast, according to the following:
800 * - A message is only sent if all of its modifiers arrived.
801 * - A stateful message is only sent if the previous stateful message
802 * has already been delivered to the client.
805 * @return Number of messages removed from queue and sent to client.
/*
 * Deliver queued messages of a channel to its client.
 *
 * Repeatedly looks at the root of the ch->recv_msgs heap, looks up the
 * fragment queue of that message ID and, unless one of the checks below
 * stops processing, runs the fragment queue and removes the message ID from
 * the heap.  The final log line reports the counter n.
 *
 * NOTE(review): several short lines (declarations of n / msg_id, braces,
 * break / return statements, possibly preprocessor lines) are not visible
 * in this excerpt.
 */
808 message_queue_run (struct Channel *ch)
810 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
811 "%p Running message queue.\n", ch);
814 while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (ch->recv_msgs, NULL,
817 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
818 "%p Processing message %" PRIu64 " in queue.\n", ch, msg_id);
/* msg_id comes from the heap in host byte order, hence the _hll variant. */
819 struct GNUNET_HashCode msg_id_hash;
820 hash_key_from_hll (&msg_id_hash, msg_id);
822 struct FragmentQueue *
823 fragq = GNUNET_CONTAINER_multihashmap_get (ch->recv_frags, &msg_id_hash);
/* Nothing can be delivered while the header is still incomplete. */
825 if (NULL == fragq || fragq->state <= MSG_FRAG_STATE_HEADER)
827 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
828 "%p No fragq (%p) or header not complete.\n",
/* NOTE(review): if the branch above leaves the loop (not visible here), the
   state can no longer be MSG_FRAG_STATE_HEADER at this point, which would
   make the ordering checks below and both max_*_message_id updates
   unreachable -- confirm the intended state to test for. */
833 if (MSG_FRAG_STATE_HEADER == fragq->state)
835 /* Check if there's a missing message before the current one */
836 if (GNUNET_PSYC_STATE_NOT_MODIFIED == fragq->state_delta)
/* Stateless message: unless it may arrive in any order, it must directly
   follow the last message sent to the client. */
838 if (!(fragq->flags & GNUNET_PSYC_MESSAGE_ORDER_ANY)
839 && msg_id - 1 != ch->max_message_id)
841 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
842 "%p Out of order message. "
843 "(%" PRIu64 " - 1 != %" PRIu64 ")\n",
844 ch, msg_id, ch->max_message_id);
/* Stateful message: state_delta must point back at the last stateful
   message that was delivered. */
850 if (msg_id - fragq->state_delta != ch->max_state_message_id)
852 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
853 "%p Out of order stateful message. "
854 "(%" PRIu64 " - %" PRIu64 " != %" PRIu64 ")\n",
855 ch, msg_id, fragq->state_delta, ch->max_state_message_id);
/* NOTE(review): message_id, state_modify_result_cb and cls are not declared
   in the visible part of this function; presumably this call is compiled
   out by a preprocessor guard that is not visible here -- confirm. */
859 /* FIXME: apply modifiers to state in PSYCstore */
860 GNUNET_PSYCSTORE_state_modify (store, &ch->pub_key, message_id,
861 state_modify_result_cb, cls);
863 ch->max_state_message_id = msg_id;
865 ch->max_message_id = msg_id;
/* The drop decision is taken from the state before the queue is run. */
867 fragment_queue_run (ch, msg_id, fragq, MSG_FRAG_STATE_DROP == fragq->state);
868 GNUNET_CONTAINER_heap_remove_root (ch->recv_msgs);
871 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
872 "%p Removed %" PRIu64 " messages from queue.\n", ch, n);
878 * Handle incoming message from multicast.
881 * @param mmsg Multicast message.
883 * @return #GNUNET_OK or #GNUNET_SYSERR
886 handle_multicast_message (struct Channel *ch,
887 const struct GNUNET_MULTICAST_MessageHeader *mmsg)
889 GNUNET_PSYCSTORE_fragment_store (store, &ch->pub_key, mmsg, 0, NULL, NULL);
891 uint16_t size = ntohs (mmsg->header.size);
892 uint16_t first_ptype = 0, last_ptype = 0;
895 == GNUNET_PSYC_check_message_parts (size - sizeof (*mmsg),
896 (const char *) &mmsg[1],
897 &first_ptype, &last_ptype))
899 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
900 "%p Received message with invalid parts from multicast. "
901 "Dropping message.\n", ch);
903 return GNUNET_SYSERR;
906 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
907 "Message parts: first: type %u, last: type %u\n",
908 first_ptype, last_ptype);
910 fragment_queue_insert (ch, mmsg, first_ptype, last_ptype);
911 message_queue_run (ch);
918 * Incoming message fragment from multicast.
920 * Store it using PSYCstore and send it to the client of the channel.
923 message_cb (void *cls, const struct GNUNET_MessageHeader *msg)
925 struct Channel *ch = cls;
926 uint16_t type = ntohs (msg->type);
927 uint16_t size = ntohs (msg->size);
929 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
930 "%p Received message of type %u and size %u from multicast.\n",
935 case GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE:
937 handle_multicast_message (ch, (const struct
938 GNUNET_MULTICAST_MessageHeader *) msg);
942 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
943 "%p Dropping unknown message of type %u and size %u.\n",
950 * Incoming request fragment from multicast for a master.
953 * @param slave_key Sending slave's public key.
954 * @param msg The message.
955 * @param flags Request flags.
958 request_cb (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key,
959 const struct GNUNET_MessageHeader *msg,
960 enum GNUNET_MULTICAST_MessageFlags flags)
962 struct Master *mst = cls;
963 struct Channel *ch = &mst->channel;
965 uint16_t type = ntohs (msg->type);
966 uint16_t size = ntohs (msg->size);
968 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
969 "%p Received request of type %u and size %u from multicast.\n",
974 case GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST:
976 const struct GNUNET_MULTICAST_RequestHeader *req
977 = (const struct GNUNET_MULTICAST_RequestHeader *) msg;
979 /* FIXME: see message_cb() */
980 if (GNUNET_SYSERR == GNUNET_PSYC_check_message_parts (size - sizeof (*req),
981 (const char *) &req[1],
984 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
985 "%p Dropping request with invalid parts "
986 "received from multicast.\n", ch);
991 struct GNUNET_PSYC_MessageHeader *pmsg;
992 uint16_t psize = sizeof (*pmsg) + size - sizeof (*req);
993 pmsg = GNUNET_malloc (psize);
994 pmsg->header.size = htons (psize);
995 pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
996 pmsg->message_id = req->request_id;
997 pmsg->flags = htonl (GNUNET_PSYC_MESSAGE_REQUEST);
999 memcpy (&pmsg[1], &req[1], size - sizeof (*req));
1001 GNUNET_SERVER_notification_context_add (nc, ch->client);
1002 GNUNET_SERVER_notification_context_unicast (nc, ch->client,
1003 (const struct GNUNET_MessageHeader *) pmsg,
1009 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1010 "%p Dropping unknown request of type %u and size %u.\n",
1012 GNUNET_break_op (0);
1018 * Response from PSYCstore with the current counter values for a channel master.
1021 master_counters_cb (void *cls, int result, uint64_t max_fragment_id,
1022 uint64_t max_message_id, uint64_t max_group_generation,
1023 uint64_t max_state_message_id)
1025 struct Master *mst = cls;
1026 struct Channel *ch = &mst->channel;
1028 struct CountersResult *res = GNUNET_malloc (sizeof (*res));
1029 res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK);
1030 res->header.size = htons (sizeof (*res));
1031 res->result_code = htonl (result);
1032 res->max_message_id = GNUNET_htonll (max_message_id);
1034 if (GNUNET_OK == result || GNUNET_NO == result)
1036 mst->max_message_id = max_message_id;
1037 ch->max_message_id = max_message_id;
1038 ch->max_state_message_id = max_state_message_id;
1039 mst->max_group_generation = max_group_generation;
1041 = GNUNET_MULTICAST_origin_start (cfg, &mst->priv_key,
1043 join_cb, membership_test_cb,
1044 replay_fragment_cb, replay_message_cb,
1045 request_cb, message_cb, ch);
1046 ch->ready = GNUNET_YES;
1050 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1051 "%p GNUNET_PSYCSTORE_counters_get() "
1052 "returned %d for channel %s.\n",
1053 ch, result, GNUNET_h2s (&ch->pub_key_hash));
1056 GNUNET_SERVER_notification_context_add (nc, ch->client);
1057 GNUNET_SERVER_notification_context_unicast (nc, ch->client, &res->header,
1064 * Response from PSYCstore with the current counter values for a channel slave.
1067 slave_counters_cb (void *cls, int result, uint64_t max_fragment_id,
1068 uint64_t max_message_id, uint64_t max_group_generation,
1069 uint64_t max_state_message_id)
1071 struct Slave *slv = cls;
1072 struct Channel *ch = &slv->channel;
1074 struct CountersResult *res = GNUNET_malloc (sizeof (*res));
1075 res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK);
1076 res->header.size = htons (sizeof (*res));
1077 res->result_code = htonl (result);
1078 res->max_message_id = GNUNET_htonll (max_message_id);
1080 if (GNUNET_OK == result || GNUNET_NO == result)
1082 ch->max_message_id = max_message_id;
1083 ch->max_state_message_id = max_state_message_id;
1085 = GNUNET_MULTICAST_member_join (cfg, &ch->pub_key, &slv->priv_key,
1087 slv->relay_count, slv->relays,
1088 slv->join_req, join_cb,
1090 replay_fragment_cb, replay_message_cb,
1092 ch->ready = GNUNET_YES;
1096 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1097 "%p GNUNET_PSYCSTORE_counters_get() "
1098 "returned %d for channel %s.\n",
1099 ch, result, GNUNET_h2s (&ch->pub_key_hash));
1102 GNUNET_SERVER_notification_context_add (nc, ch->client);
1103 GNUNET_SERVER_notification_context_unicast (nc, ch->client, &res->header,
1110 channel_init (struct Channel *ch)
1113 = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
1114 ch->recv_frags = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
1119 * Handle a connecting client starting a channel master.
1122 handle_master_start (void *cls, struct GNUNET_SERVER_Client *client,
1123 const struct GNUNET_MessageHeader *msg)
1125 const struct MasterStartRequest *req
1126 = (const struct MasterStartRequest *) msg;
1128 struct Master *mst = GNUNET_new (struct Master);
1129 mst->policy = ntohl (req->policy);
1130 mst->priv_key = req->channel_key;
1132 struct Channel *ch = &mst->channel;
1133 ch->client = client;
1134 ch->is_master = GNUNET_YES;
1135 GNUNET_CRYPTO_eddsa_key_get_public (&mst->priv_key, &ch->pub_key);
1136 GNUNET_CRYPTO_hash (&ch->pub_key, sizeof (ch->pub_key), &ch->pub_key_hash);
1139 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1140 "%p Master connected to channel %s.\n",
1141 mst, GNUNET_h2s (&ch->pub_key_hash));
1143 GNUNET_PSYCSTORE_counters_get (store, &ch->pub_key, master_counters_cb, mst);
1145 GNUNET_CONTAINER_multihashmap_put (masters, &ch->pub_key_hash, ch,
1146 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1147 GNUNET_SERVER_client_set_user_context (client, ch);
1148 GNUNET_SERVER_receive_done (client, GNUNET_OK);
1153 * Handle a connecting client joining as a channel slave.
1156 handle_slave_join (void *cls, struct GNUNET_SERVER_Client *client,
1157 const struct GNUNET_MessageHeader *msg)
1159 const struct SlaveJoinRequest *req
1160 = (const struct SlaveJoinRequest *) msg;
1161 struct Slave *slv = GNUNET_new (struct Slave);
1162 slv->priv_key = req->slave_key;
1163 slv->origin = req->origin;
1164 slv->relay_count = ntohl (req->relay_count);
1165 if (0 < slv->relay_count)
1167 const struct GNUNET_PeerIdentity *relays
1168 = (const struct GNUNET_PeerIdentity *) &req[1];
1170 = GNUNET_malloc (slv->relay_count * sizeof (struct GNUNET_PeerIdentity));
1172 for (i = 0; i < slv->relay_count; i++)
1173 memcpy (&slv->relays[i], &relays[i], sizeof (*relays));
1176 struct Channel *ch = &slv->channel;
1177 ch->client = client;
1178 ch->is_master = GNUNET_NO;
1179 ch->pub_key = req->channel_key;
1180 GNUNET_CRYPTO_hash (&ch->pub_key, sizeof (ch->pub_key),
1184 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1185 "%p Slave connected to channel %s.\n",
1186 slv, GNUNET_h2s (&ch->pub_key_hash));
1188 GNUNET_PSYCSTORE_counters_get (store, &ch->pub_key, slave_counters_cb, slv);
1190 GNUNET_CONTAINER_multihashmap_put (slaves, &ch->pub_key_hash, ch,
1191 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1192 GNUNET_SERVER_client_set_user_context (client, &slv->channel);
1193 GNUNET_SERVER_receive_done (client, GNUNET_OK);
1198 * Send acknowledgement to a client.
1200 * Sent after a message fragment has been passed on to multicast.
1202 * @param ch The channel struct for the client.
1205 send_message_ack (struct Channel *ch)
1207 struct GNUNET_MessageHeader res;
1208 res.size = htons (sizeof (res));
1209 res.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK);
1211 GNUNET_SERVER_notification_context_add (nc, ch->client);
1212 GNUNET_SERVER_notification_context_unicast (nc, ch->client, &res, GNUNET_NO);
1217 * Callback for the transmit functions of multicast.
1220 transmit_notify (void *cls, size_t *data_size, void *data)
1222 struct Channel *ch = cls;
1223 struct TransmitMessage *tmit_msg = ch->tmit_head;
1225 if (NULL == tmit_msg || *data_size < tmit_msg->size)
1227 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1228 "%p transmit_notify: nothing to send.\n", ch);
1233 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1234 "%p transmit_notify: sending %u bytes.\n", ch, tmit_msg->size);
1236 *data_size = tmit_msg->size;
1237 memcpy (data, &tmit_msg[1], *data_size);
1239 GNUNET_CONTAINER_DLL_remove (ch->tmit_head, ch->tmit_tail, tmit_msg);
1240 GNUNET_free (tmit_msg);
1242 int ret = (MSG_STATE_END < ch->tmit_state) ? GNUNET_NO : GNUNET_YES;
1243 send_message_ack (ch);
1245 if (0 == ch->tmit_task)
1247 if (NULL != ch->tmit_head)
1249 transmit_message (ch);
1251 else if (ch->disconnected)
1253 /* FIXME: handle partial message (when still in_transmit) */
1254 client_cleanup (ch);
1263 * Callback for the transmit functions of multicast.
1266 master_transmit_notify (void *cls, size_t *data_size, void *data)
1268 int ret = transmit_notify (cls, data_size, data);
1270 if (GNUNET_YES == ret)
1272 struct Master *mst = cls;
1273 mst->tmit_handle = NULL;
1280 * Callback for the transmit functions of multicast.
1283 slave_transmit_notify (void *cls, size_t *data_size, void *data)
1285 int ret = transmit_notify (cls, data_size, data);
1287 if (GNUNET_YES == ret)
1289 struct Slave *slv = cls;
1290 slv->tmit_handle = NULL;
1297 * Transmit a message from a channel master to the multicast group.
1300 master_transmit_message (struct Master *mst)
1302 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p master_transmit_message()\n", mst);
1303 mst->channel.tmit_task = 0;
1304 if (NULL == mst->tmit_handle)
1307 = GNUNET_MULTICAST_origin_to_all (mst->origin, mst->max_message_id,
1308 mst->max_group_generation,
1309 master_transmit_notify, mst);
1313 GNUNET_MULTICAST_origin_to_all_resume (mst->tmit_handle);
1319 * Transmit a message from a channel slave to the multicast group.
1322 slave_transmit_message (struct Slave *slv)
1324 slv->channel.tmit_task = 0;
1325 if (NULL == slv->tmit_handle)
1328 = GNUNET_MULTICAST_member_to_origin (slv->member, slv->max_request_id,
1329 slave_transmit_notify, slv);
1333 GNUNET_MULTICAST_member_to_origin_resume (slv->tmit_handle);
1339 transmit_message (struct Channel *ch)
1342 ? master_transmit_message ((struct Master *) ch)
1343 : slave_transmit_message ((struct Slave *) ch);
1348 * Queue a message from a channel master for sending to the multicast group.
1351 master_queue_message (struct Master *mst, struct TransmitMessage *tmit_msg,
1352 uint16_t first_ptype, uint16_t last_ptype)
1354 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p master_queue_message()\n", mst);
1356 if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype)
1358 tmit_msg->id = ++mst->max_message_id;
1359 struct GNUNET_PSYC_MessageMethod *pmeth
1360 = (struct GNUNET_PSYC_MessageMethod *) &tmit_msg[1];
1362 if (pmeth->flags & GNUNET_PSYC_MASTER_TRANSMIT_STATE_RESET)
1364 pmeth->state_delta = GNUNET_htonll (GNUNET_PSYC_STATE_RESET);
1366 else if (pmeth->flags & GNUNET_PSYC_MASTER_TRANSMIT_STATE_MODIFY)
1368 pmeth->state_delta = GNUNET_htonll (tmit_msg->id
1369 - mst->max_state_message_id);
1373 pmeth->state_delta = GNUNET_htonll (GNUNET_PSYC_STATE_NOT_MODIFIED);
1380 * Queue a message from a channel slave for sending to the multicast group.
1383 slave_queue_message (struct Slave *slv, struct TransmitMessage *tmit_msg,
1384 uint16_t first_ptype, uint16_t last_ptype)
1386 if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype)
1388 struct GNUNET_PSYC_MessageMethod *pmeth
1389 = (struct GNUNET_PSYC_MessageMethod *) &tmit_msg[1];
1390 pmeth->state_delta = GNUNET_htonll (GNUNET_PSYC_STATE_NOT_MODIFIED);
1391 tmit_msg->id = ++slv->max_request_id;
1397 queue_message (struct Channel *ch, const struct GNUNET_MessageHeader *msg,
1398 uint16_t first_ptype, uint16_t last_ptype)
1400 uint16_t size = ntohs (msg->size) - sizeof (*msg);
1401 struct TransmitMessage *tmit_msg = GNUNET_malloc (sizeof (*tmit_msg) + size);
1402 memcpy (&tmit_msg[1], &msg[1], size);
1403 tmit_msg->size = size;
1404 tmit_msg->state = ch->tmit_state;
1406 GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, tmit_msg);
1409 ? master_queue_message ((struct Master *) ch, tmit_msg,
1410 first_ptype, last_ptype)
1411 : slave_queue_message ((struct Slave *) ch, tmit_msg,
1412 first_ptype, last_ptype);
1417 transmit_error (struct Channel *ch)
1419 uint16_t type = GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL;
1421 struct GNUNET_MessageHeader msg;
1422 msg.size = ntohs (sizeof (msg));
1423 msg.type = ntohs (type);
1425 queue_message (ch, &msg, type, type);
1426 transmit_message (ch);
1428 /* FIXME: cleanup */
1433 * Incoming message from a client.
1436 handle_psyc_message (void *cls, struct GNUNET_SERVER_Client *client,
1437 const struct GNUNET_MessageHeader *msg)
1440 = GNUNET_SERVER_client_get_user_context (client, struct Channel);
1441 GNUNET_assert (NULL != ch);
1443 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1444 "%p Received message from client.\n", ch);
1445 GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG, msg);
1447 if (GNUNET_YES != ch->ready)
1449 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1450 "%p Dropping message from client, channel is not ready yet.\n",
1452 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1456 uint16_t size = ntohs (msg->size);
1457 if (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < size - sizeof (*msg))
1459 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "%p Message payload too large\n", ch);
1461 transmit_error (ch);
1462 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1466 uint16_t first_ptype = 0, last_ptype = 0;
1468 == GNUNET_PSYC_check_message_parts (size - sizeof (*msg),
1469 (const char *) &msg[1],
1470 &first_ptype, &last_ptype))
1472 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1473 "%p Received invalid message part from client.\n", ch);
1475 transmit_error (ch);
1476 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1480 queue_message (ch, msg, first_ptype, last_ptype);
1481 transmit_message (ch);
1483 GNUNET_SERVER_receive_done (client, GNUNET_OK);
/**
 * Client requests to add a slave to the membership database.
 *
 * NOTE(review): stub, the request is not processed yet; confirm whether
 * the client should still be acknowledged here.
 *
 * @param cls Closure (not used).
 * @param client Client that sent the request.
 * @param msg The request message.
 */
static void
handle_slave_add (void *cls, struct GNUNET_SERVER_Client *client,
                  const struct GNUNET_MessageHeader *msg)
{
}
/**
 * Client requests to remove a slave from the membership database.
 *
 * NOTE(review): stub, the request is not processed yet; confirm whether
 * the client should still be acknowledged here.
 *
 * @param cls Closure (not used).
 * @param client Client that sent the request.
 * @param msg The request message.
 */
static void
handle_slave_remove (void *cls, struct GNUNET_SERVER_Client *client,
                     const struct GNUNET_MessageHeader *msg)
{
}
/**
 * Client requests channel history from PSYCstore.
 *
 * NOTE(review): stub, the request is not processed yet; confirm whether
 * the client should still be acknowledged here.
 *
 * @param cls Closure (not used).
 * @param client Client that sent the request.
 * @param msg The request message.
 */
static void
handle_story_request (void *cls, struct GNUNET_SERVER_Client *client,
                      const struct GNUNET_MessageHeader *msg)
{
}
/**
 * Client requests best matching state variable from PSYCstore.
 *
 * NOTE(review): stub, the request is not processed yet; confirm whether
 * the client should still be acknowledged here.
 *
 * @param cls Closure (not used).
 * @param client Client that sent the request.
 * @param msg The request message.
 */
static void
handle_state_get (void *cls, struct GNUNET_SERVER_Client *client,
                  const struct GNUNET_MessageHeader *msg)
{
}
/**
 * Client requests state variables with a given prefix from PSYCstore.
 *
 * NOTE(review): stub, the request is not processed yet; confirm whether
 * the client should still be acknowledged here.
 *
 * @param cls Closure (not used).
 * @param client Client that sent the request.
 * @param msg The request message.
 */
static void
handle_state_get_prefix (void *cls, struct GNUNET_SERVER_Client *client,
                         const struct GNUNET_MessageHeader *msg)
{
}
1543 * Initialize the PSYC service.
1545 * @param cls Closure.
1546 * @param server The initialized server.
1547 * @param c Configuration to use.
1550 run (void *cls, struct GNUNET_SERVER_Handle *server,
1551 const struct GNUNET_CONFIGURATION_Handle *c)
1553 static const struct GNUNET_SERVER_MessageHandler handlers[] = {
1554 { &handle_master_start, NULL,
1555 GNUNET_MESSAGE_TYPE_PSYC_MASTER_START, 0 },
1557 { &handle_slave_join, NULL,
1558 GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN, 0 },
1560 { &handle_psyc_message, NULL,
1561 GNUNET_MESSAGE_TYPE_PSYC_MESSAGE, 0 },
1563 { &handle_slave_add, NULL,
1564 GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_SLAVE_ADD, 0 },
1566 { &handle_slave_remove, NULL,
1567 GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_SLAVE_RM, 0 },
1569 { &handle_story_request, NULL,
1570 GNUNET_MESSAGE_TYPE_PSYC_STORY_REQUEST, 0 },
1572 { &handle_state_get, NULL,
1573 GNUNET_MESSAGE_TYPE_PSYC_STATE_GET, 0 },
1575 { &handle_state_get_prefix, NULL,
1576 GNUNET_MESSAGE_TYPE_PSYC_STATE_GET_PREFIX, 0 }
1580 store = GNUNET_PSYCSTORE_connect (cfg);
1581 stats = GNUNET_STATISTICS_create ("psyc", cfg);
1582 masters = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
1583 slaves = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
1584 recv_cache = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
1585 nc = GNUNET_SERVER_notification_context_create (server, 1);
1586 GNUNET_SERVER_add_handlers (server, handlers);
1587 GNUNET_SERVER_disconnect_notify (server, &client_disconnect, NULL);
1588 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL,
1589 &shutdown_task, NULL);
1594 * The main function for the service.
1596 * @param argc number of arguments from the command line
1597 * @param argv command line arguments
1598 * @return 0 ok, 1 on error
1601 main (int argc, char *const *argv)
1603 return (GNUNET_OK ==
1604 GNUNET_SERVICE_run (argc, argv, "psyc",
1605 GNUNET_SERVICE_OPTION_NONE,
1606 &run, NULL)) ? 0 : 1;
/* end of gnunet-service-psyc.c */