2 * This file is part of GNUnet
3 * (C) 2013 Christian Grothoff (and other contributing authors)
5 * GNUnet is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published
7 * by the Free Software Foundation; either version 3, or (at your
8 * option) any later version.
10 * GNUnet is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License for more details.
15 * You should have received a copy of the GNU General Public License
16 * along with GNUnet; see the file COPYING. If not, write to the
17 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 * Boston, MA 02111-1307, USA.
22 * @file psyc/gnunet-service-psyc.c
24 * @author Gabor X Toth
30 #include "gnunet_util_lib.h"
31 #include "gnunet_constants.h"
32 #include "gnunet_protocols.h"
33 #include "gnunet_statistics_service.h"
34 #include "gnunet_multicast_service.h"
35 #include "gnunet_psycstore_service.h"
36 #include "gnunet_psyc_service.h"
41 * Handle to our current configuration.
43 static const struct GNUNET_CONFIGURATION_Handle *cfg;
46 * Handle to the statistics service.
48 static struct GNUNET_STATISTICS_Handle *stats;
51 * Notification context, simplifies client broadcasts.
53 static struct GNUNET_SERVER_NotificationContext *nc;
56 * Handle to the PSYCstore.
58 static struct GNUNET_PSYCSTORE_Handle *store;
61 * All connected masters and slaves.
62 * Channel's pub_key_hash -> struct Channel
64 static struct GNUNET_CONTAINER_MultiHashMap *clients;
68 * Message in the transmission queue.
70 struct TransmitMessage
72 struct TransmitMessage *prev;
73 struct TransmitMessage *next;
76 * Buffer with message to be transmitted.
86 * @see enum MessageState
93 * Cache for received message fragments.
94 * Message fragments are only sent to clients after all modifiers arrived.
96 * chan_key -> MultiHashMap chan_msgs
98 static struct GNUNET_CONTAINER_MultiHashMap *recv_cache;
102 * Entry in the chan_msgs hashmap of @a recv_cache:
103 * fragment_id -> FragmentEntry
107 struct GNUNET_MULTICAST_MessageHeader *mmsg;
113 * Entry in the @a recv_msgs hash map of a @a Channel.
114 * message_id -> FragmentCache
119 * Total size of header fragments (METHOD & MODIFIERs)
121 uint64_t header_size;
124 * Fragment IDs stored in @a recv_cache.
126 struct GNUNET_CONTAINER_Heap *fragments;
131 * Common part of the client context for both a master and slave channel.
135 struct GNUNET_SERVER_Client *client;
137 struct TransmitMessage *tmit_head;
138 struct TransmitMessage *tmit_tail;
141 * Received fragments not yet sent to the client.
142 * message_id -> FragmentCache
144 struct GNUNET_CONTAINER_MultiHashMap *recv_msgs;
149 GNUNET_SCHEDULER_TaskIdentifier tmit_task;
152 * Public key of the channel.
154 struct GNUNET_CRYPTO_EddsaPublicKey pub_key;
157 * Hash of @a pub_key.
159 struct GNUNET_HashCode pub_key_hash;
162 * Expected value size for the modifier being received from the PSYC service.
164 uint32_t tmit_mod_value_size_expected;
167 * Actual value size for the modifier being received from the PSYC service.
169 uint32_t tmit_mod_value_size;
172 * @see enum MessageState
182 * Is this a channel master (#GNUNET_YES), or slave (#GNUNET_NO)?
187 * Ready to receive messages from client? #GNUNET_YES or #GNUNET_NO
192 * Is the client disconnected? #GNUNET_YES or #GNUNET_NO
194 uint8_t disconnected;
199 * Client context for a channel master.
204 * Channel struct common for Master and Slave
206 struct Channel channel;
209 * Private key of the channel.
211 struct GNUNET_CRYPTO_EddsaPrivateKey priv_key;
214 * Handle for the multicast origin.
216 struct GNUNET_MULTICAST_Origin *origin;
219 * Transmit handle for multicast.
221 struct GNUNET_MULTICAST_OriginMessageHandle *tmit_handle;
224 * Maximum message ID for this channel.
226 * Incremented before sending a message, thus the message_id in messages sent
229 uint64_t max_message_id;
232 * ID of the last message that contains any state operations.
233 * 0 if there is no such message.
235 uint64_t max_state_message_id;
238 * Maximum group generation for this channel.
240 uint64_t max_group_generation;
243 * @see enum GNUNET_PSYC_Policy
250 * Client context for a channel slave.
255 * Channel struct common for Master and Slave
257 struct Channel channel;
260 * Private key of the slave.
262 struct GNUNET_CRYPTO_EddsaPrivateKey slave_key;
265 * Handle for the multicast member.
267 struct GNUNET_MULTICAST_Member *member;
270 * Transmit handle for multicast.
272 struct GNUNET_MULTICAST_MemberRequestHandle *tmit_handle;
275 * Peer identity of the origin.
277 struct GNUNET_PeerIdentity origin;
280 * Number of items in @a relays.
282 uint32_t relay_count;
285 * Relays that multicast can use to connect.
287 struct GNUNET_PeerIdentity *relays;
290 * Join request to be transmitted to the master on join.
292 struct GNUNET_MessageHeader *join_req;
295 * Maximum message ID for this channel.
297 uint64_t max_message_id;
300 * Maximum request ID for this channel.
302 uint64_t max_request_id;
307 transmit_message (struct Channel *ch, uint8_t inc_msg_id);
311 * Task run during shutdown.
317 shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
321 GNUNET_SERVER_notification_context_destroy (nc);
326 GNUNET_STATISTICS_destroy (stats, GNUNET_NO);
333 client_cleanup (struct Channel *ch)
335 /* FIXME: fragment_cache_clear */
339 struct Master *mst = (struct Master *) ch;
340 if (NULL != mst->origin)
341 GNUNET_MULTICAST_origin_stop (mst->origin);
342 GNUNET_CONTAINER_multihashmap_remove (clients, &ch->pub_key_hash, mst);
346 struct Slave *slv = (struct Slave *) ch;
347 if (NULL != slv->join_req)
348 GNUNET_free (slv->join_req);
349 if (NULL != slv->relays)
350 GNUNET_free (slv->relays);
351 if (NULL != slv->member)
352 GNUNET_MULTICAST_member_part (slv->member);
360 * Called whenever a client is disconnected.
361 * Frees our resources associated with that client.
363 * @param cls Closure.
364 * @param client Identification of the client.
367 client_disconnect (void *cls, struct GNUNET_SERVER_Client *client)
373 = GNUNET_SERVER_client_get_user_context (client, struct Channel);
374 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p Client disconnected\n", ch);
378 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
379 "%p User context is NULL in client_disconnect()\n", ch);
384 ch->disconnected = GNUNET_YES;
386 /* Send pending messages to multicast before cleanup. */
387 if (NULL != ch->tmit_head)
389 transmit_message (ch, GNUNET_NO);
399 * Master receives a join request from a slave.
402 join_cb (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key,
403 const struct GNUNET_MessageHeader *join_req,
404 struct GNUNET_MULTICAST_JoinHandle *jh)
411 membership_test_cb (void *cls,
412 const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key,
413 uint64_t message_id, uint64_t group_generation,
414 struct GNUNET_MULTICAST_MembershipTestHandle *mth)
421 replay_fragment_cb (void *cls,
422 const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key,
423 uint64_t fragment_id, uint64_t flags,
424 struct GNUNET_MULTICAST_ReplayHandle *rh)
431 replay_message_cb (void *cls,
432 const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key,
434 uint64_t fragment_offset,
436 struct GNUNET_MULTICAST_ReplayHandle *rh)
443 fragment_store_result (void *cls, int64_t result, const char *err_msg)
445 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
446 "fragment_store() returned %l (%s)\n", result, err_msg);
451 message_to_client (struct Channel *ch,
452 const struct GNUNET_MULTICAST_MessageHeader *mmsg)
454 uint16_t size = ntohs (mmsg->header.size);
455 struct GNUNET_PSYC_MessageHeader *pmsg;
456 uint16_t psize = sizeof (*pmsg) + size - sizeof (*mmsg);
458 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
459 "%p Sending message to client. "
460 "fragment_id: %" PRIu64 ", message_id: %" PRIu64 "\n",
461 ch, GNUNET_ntohll (mmsg->fragment_id),
462 GNUNET_ntohll (mmsg->message_id));
464 pmsg = GNUNET_malloc (psize);
465 pmsg->header.size = htons (psize);
466 pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
467 pmsg->message_id = mmsg->message_id;
469 memcpy (&pmsg[1], &mmsg[1], size - sizeof (*mmsg));
471 GNUNET_SERVER_notification_context_add (nc, ch->client);
472 GNUNET_SERVER_notification_context_unicast (nc, ch->client,
473 (const struct GNUNET_MessageHeader *) pmsg,
480 * Convert an uint64_t in network byte order to a HashCode
481 * that can be used as key in a MultiHashMap
484 hash_key_from_nll (struct GNUNET_HashCode *key, uint64_t n)
486 /* use little-endian order, as idx_of MultiHashMap casts key to unsigned int */
488 n = ((n << 8) & 0xFF00FF00FF00FF00ULL) | ((n >> 8) & 0x00FF00FF00FF00FFULL);
489 n = ((n << 16) & 0xFFFF0000FFFF0000ULL) | ((n >> 16) & 0x0000FFFF0000FFFFULL);
491 *key = (struct GNUNET_HashCode) {{ 0 }};
493 = (n << 32) | (n >> 32);
/**
 * Convert a uint64_t in host byte order to a HashCode
 * that can be used as key in a MultiHashMap.
 *
 * On big-endian hosts this is the same as hash_key_from_nll(); on
 * little-endian hosts the value is copied as-is into the first eight bytes
 * of the zeroed hash code.
 *
 * @param key Hash code to fill in.
 * @param n Value in host byte order.
 */
static void
hash_key_from_hll (struct GNUNET_HashCode *key, uint64_t n)
{
#if __BYTE_ORDER == __BIG_ENDIAN
  hash_key_from_nll (key, n);
#elif __BYTE_ORDER == __LITTLE_ENDIAN
  *key = (struct GNUNET_HashCode) {{ 0 }};
  /* Copy the bytes instead of storing through a (uint64_t *) cast of the
     hash code pointer: the cast breaks the strict aliasing rule and relies
     on the hash code being suitably aligned for a uint64_t. */
  memcpy (key, &n, sizeof (n));
#else
  #error byteorder undefined
#endif
}
516 fragment_cache_insert (struct Channel *ch,
517 const struct GNUNET_HashCode *msg_id,
518 struct FragmentCache *frag_cache,
519 const struct GNUNET_MULTICAST_MessageHeader *mmsg,
520 uint16_t last_part_type)
522 uint16_t size = ntohs (mmsg->header.size);
523 struct GNUNET_CONTAINER_MultiHashMap
524 *chan_msgs = GNUNET_CONTAINER_multihashmap_get (recv_cache,
527 if (NULL == frag_cache)
529 frag_cache = GNUNET_new (struct FragmentCache);
530 frag_cache->fragments
531 = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
533 if (NULL == ch->recv_msgs)
535 ch->recv_msgs = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
537 GNUNET_CONTAINER_multihashmap_put (ch->recv_msgs, msg_id, frag_cache,
538 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
540 if (NULL == chan_msgs)
542 chan_msgs = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
543 GNUNET_CONTAINER_multihashmap_put (recv_cache, &ch->pub_key_hash, chan_msgs,
544 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
548 struct GNUNET_HashCode *frag_id = GNUNET_new (struct GNUNET_HashCode);
549 hash_key_from_nll (frag_id, mmsg->fragment_id);
551 *frag_entry = GNUNET_CONTAINER_multihashmap_get (chan_msgs, frag_id);
552 if (NULL == frag_entry)
554 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
555 "%p Adding message fragment to cache. "
556 "fragment_id: %" PRIu64 ", "
557 "header_size: %" PRIu64 " + %" PRIu64 ").\n",
558 ch, GNUNET_ntohll (mmsg->fragment_id),
559 frag_cache->header_size, size);
560 frag_entry = GNUNET_new (struct FragmentEntry);
561 frag_entry->ref_count = 1;
562 frag_entry->mmsg = GNUNET_malloc (size);
563 memcpy (frag_entry->mmsg, mmsg, size);
564 GNUNET_CONTAINER_multihashmap_put (chan_msgs, frag_id, frag_entry,
565 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
569 frag_entry->ref_count++;
570 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
571 "%p Message fragment already in cache. "
572 "fragment_id: %" PRIu64 ", ref_count: %u\n",
573 ch, GNUNET_ntohll (mmsg->fragment_id), frag_entry->ref_count);
576 switch (last_part_type)
578 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD:
579 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER:
580 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT:
581 frag_cache->header_size += size;
583 GNUNET_CONTAINER_heap_insert (frag_cache->fragments, frag_id,
584 GNUNET_ntohll (mmsg->fragment_id));
589 fragment_cache_clear (struct Channel *ch,
590 const struct GNUNET_HashCode *msg_id,
591 struct FragmentCache *frag_cache,
592 uint8_t send_to_client)
594 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
595 "%p Clearing message fragment cache.\n", ch);
597 struct GNUNET_CONTAINER_MultiHashMap
598 *chan_msgs = GNUNET_CONTAINER_multihashmap_get (recv_cache,
600 GNUNET_assert (NULL != chan_msgs);
601 struct GNUNET_HashCode *frag_id;
603 while ((frag_id = GNUNET_CONTAINER_heap_remove_root (frag_cache->fragments)))
606 *frag_entry = GNUNET_CONTAINER_multihashmap_get (chan_msgs, frag_id);
607 if (frag_entry != NULL)
609 if (GNUNET_YES == send_to_client)
611 message_to_client (ch, frag_entry->mmsg);
613 if (1 == frag_entry->ref_count)
615 GNUNET_CONTAINER_multihashmap_remove (chan_msgs, frag_id, frag_entry);
616 GNUNET_free (frag_entry->mmsg);
617 GNUNET_free (frag_entry);
621 frag_entry->ref_count--;
624 GNUNET_free (frag_id);
627 GNUNET_CONTAINER_multihashmap_remove (ch->recv_msgs, msg_id, frag_cache);
628 GNUNET_CONTAINER_heap_destroy (frag_cache->fragments);
629 GNUNET_free (frag_cache);
634 * Incoming message fragment from multicast.
636 * Store it using PSYCstore and send it to the client of the channel.
639 message_cb (void *cls, const struct GNUNET_MessageHeader *msg)
641 struct Channel *ch = cls;
642 uint16_t type = ntohs (msg->type);
643 uint16_t size = ntohs (msg->size);
645 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
646 "%p Received message of type %u and size %u from multicast.\n",
651 case GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE:
653 GNUNET_PSYCSTORE_fragment_store (store, &ch->pub_key,
655 GNUNET_MULTICAST_MessageHeader *) msg,
659 /* FIXME: apply modifiers to state in PSYCstore */
660 GNUNET_PSYCSTORE_state_modify (store, &ch->pub_key,
661 GNUNET_ntohll (mmsg->message_id),
662 meth->mod_count, mods,
666 const struct GNUNET_MULTICAST_MessageHeader
667 *mmsg = (const struct GNUNET_MULTICAST_MessageHeader *) msg;
669 uint16_t ptype = GNUNET_PSYC_message_last_part (size - sizeof (*mmsg),
670 (const char *) &mmsg[1]);
671 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
672 "Last message part type %u\n", ptype);
674 if (GNUNET_NO == ptype)
676 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
677 "%p Received message with invalid parts from multicast. "
678 "Dropping message.\n", ch);
683 struct GNUNET_HashCode msg_id;
684 hash_key_from_nll (&msg_id, mmsg->message_id);
686 struct FragmentCache *frag_cache = NULL;
687 if (NULL != ch->recv_msgs)
688 frag_cache = GNUNET_CONTAINER_multihashmap_get (ch->recv_msgs, &msg_id);
692 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA:
693 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END:
694 /* FIXME: check state flag / max_state_message_id */
695 if (NULL == frag_cache)
697 message_to_client (ch, mmsg);
702 if (GNUNET_ntohll (mmsg->fragment_offset) == frag_cache->header_size)
703 { /* first data fragment after the header, send cached fragments */
704 fragment_cache_clear (ch, &msg_id, frag_cache, GNUNET_YES);
705 message_to_client (ch, mmsg);
709 { /* still missing fragments from the header, cache data fragment */
714 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD:
715 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER:
716 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT:
717 /* not all modifiers arrived yet, cache fragment */
718 fragment_cache_insert (ch, &msg_id, frag_cache, mmsg, ptype);
721 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL:
722 if (NULL != frag_cache)
723 { /* fragments not yet sent to client, remove from cache */
724 fragment_cache_clear (ch, &msg_id, frag_cache, GNUNET_NO);
728 message_to_client (ch, mmsg);
735 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
736 "%p Dropping unknown message of type %u and size %u.\n",
743 * Incoming request fragment from multicast for a master.
746 * @param slave_key Sending slave's public key.
747 * @param msg The message.
748 * @param flags Request flags.
751 request_cb (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key,
752 const struct GNUNET_MessageHeader *msg,
753 enum GNUNET_MULTICAST_MessageFlags flags)
755 struct Master *mst = cls;
756 struct Channel *ch = &mst->channel;
758 uint16_t type = ntohs (msg->type);
759 uint16_t size = ntohs (msg->size);
761 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
762 "%p Received request of type %u and size %u from multicast.\n",
767 case GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST:
769 const struct GNUNET_MULTICAST_RequestHeader *req
770 = (const struct GNUNET_MULTICAST_RequestHeader *) msg;
772 /* FIXME: see message_cb() */
773 if (GNUNET_NO == GNUNET_PSYC_message_last_part (size - sizeof (*req),
774 (const char *) &req[1]))
776 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
777 "%p Dropping message with invalid parts "
778 "received from multicast.\n", ch);
783 struct GNUNET_PSYC_MessageHeader *pmsg;
784 uint16_t psize = sizeof (*pmsg) + size - sizeof (*req);
785 pmsg = GNUNET_malloc (psize);
786 pmsg->header.size = htons (psize);
787 pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
788 pmsg->message_id = req->request_id;
789 pmsg->flags = htonl (GNUNET_PSYC_MESSAGE_REQUEST);
791 memcpy (&pmsg[1], &req[1], size - sizeof (*req));
793 GNUNET_SERVER_notification_context_add (nc, ch->client);
794 GNUNET_SERVER_notification_context_unicast (nc, ch->client,
795 (const struct GNUNET_MessageHeader *) pmsg,
801 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
802 "%p Dropping unknown request of type %u and size %u.\n",
810 * Response from PSYCstore with the current counter values for a channel master.
813 master_counters_cb (void *cls, int result, uint64_t max_fragment_id,
814 uint64_t max_message_id, uint64_t max_group_generation,
815 uint64_t max_state_message_id)
817 struct Master *mst = cls;
818 struct Channel *ch = &mst->channel;
819 struct CountersResult *res = GNUNET_malloc (sizeof (*res));
820 res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK);
821 res->header.size = htons (sizeof (*res));
822 res->result_code = htonl (result);
823 res->max_message_id = GNUNET_htonll (max_message_id);
825 if (GNUNET_OK == result || GNUNET_NO == result)
827 mst->max_message_id = max_message_id;
828 mst->max_state_message_id = max_state_message_id;
829 mst->max_group_generation = max_group_generation;
831 = GNUNET_MULTICAST_origin_start (cfg, &mst->priv_key,
833 join_cb, membership_test_cb,
834 replay_fragment_cb, replay_message_cb,
835 request_cb, message_cb, ch);
836 ch->ready = GNUNET_YES;
838 GNUNET_SERVER_notification_context_add (nc, ch->client);
839 GNUNET_SERVER_notification_context_unicast (nc, ch->client, &res->header,
846 * Response from PSYCstore with the current counter values for a channel slave.
849 slave_counters_cb (void *cls, int result, uint64_t max_fragment_id,
850 uint64_t max_message_id, uint64_t max_group_generation,
851 uint64_t max_state_message_id)
853 struct Slave *slv = cls;
854 struct Channel *ch = &slv->channel;
855 struct CountersResult *res = GNUNET_malloc (sizeof (*res));
856 res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK);
857 res->header.size = htons (sizeof (*res));
858 res->result_code = htonl (result);
859 res->max_message_id = GNUNET_htonll (max_message_id);
861 if (GNUNET_OK == result || GNUNET_NO == result)
863 slv->max_message_id = max_message_id;
865 = GNUNET_MULTICAST_member_join (cfg, &ch->pub_key, &slv->slave_key,
867 slv->relay_count, slv->relays,
868 slv->join_req, join_cb,
870 replay_fragment_cb, replay_message_cb,
872 ch->ready = GNUNET_YES;
875 GNUNET_SERVER_notification_context_add (nc, ch->client);
876 GNUNET_SERVER_notification_context_unicast (nc, ch->client, &res->header,
883 * Handle a connecting client starting a channel master.
886 handle_master_start (void *cls, struct GNUNET_SERVER_Client *client,
887 const struct GNUNET_MessageHeader *msg)
889 const struct MasterStartRequest *req
890 = (const struct MasterStartRequest *) msg;
891 struct Master *mst = GNUNET_new (struct Master);
892 struct Channel *ch = &mst->channel;
894 ch->is_master = GNUNET_YES;
895 mst->policy = ntohl (req->policy);
896 mst->priv_key = req->channel_key;
897 GNUNET_CRYPTO_eddsa_key_get_public (&mst->priv_key, &ch->pub_key);
898 GNUNET_CRYPTO_hash (&ch->pub_key, sizeof (ch->pub_key), &ch->pub_key_hash);
899 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
900 "%p Master connected to channel %s.\n",
901 mst, GNUNET_h2s (&ch->pub_key_hash));
903 GNUNET_PSYCSTORE_counters_get (store, &ch->pub_key, master_counters_cb, mst);
905 GNUNET_SERVER_client_set_user_context (client, &mst->channel);
906 GNUNET_CONTAINER_multihashmap_put (clients, &ch->pub_key_hash, mst,
907 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
908 GNUNET_SERVER_receive_done (client, GNUNET_OK);
913 * Handle a connecting client joining as a channel slave.
916 handle_slave_join (void *cls, struct GNUNET_SERVER_Client *client,
917 const struct GNUNET_MessageHeader *msg)
919 const struct SlaveJoinRequest *req
920 = (const struct SlaveJoinRequest *) msg;
921 struct Slave *slv = GNUNET_new (struct Slave);
922 struct Channel *ch = &slv->channel;
923 slv->channel.client = client;
924 slv->channel.is_master = GNUNET_NO;
925 slv->slave_key = req->slave_key;
926 ch->pub_key = req->channel_key;
927 GNUNET_CRYPTO_hash (&ch->pub_key, sizeof (ch->pub_key),
929 slv->origin = req->origin;
930 slv->relay_count = ntohl (req->relay_count);
931 if (0 < slv->relay_count)
933 const struct GNUNET_PeerIdentity *relays
934 = (const struct GNUNET_PeerIdentity *) &req[1];
936 = GNUNET_malloc (slv->relay_count * sizeof (struct GNUNET_PeerIdentity));
938 for (i = 0; i < slv->relay_count; i++)
939 memcpy (&slv->relays[i], &relays[i], sizeof (*relays));
942 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
943 "%p Slave connected to channel %s.\n",
944 slv, GNUNET_h2s (&ch->pub_key_hash));
946 GNUNET_PSYCSTORE_counters_get (store, &ch->pub_key, slave_counters_cb, slv);
948 GNUNET_SERVER_client_set_user_context (client, &slv->channel);
949 GNUNET_SERVER_receive_done (client, GNUNET_OK);
954 * Send acknowledgement to a client.
956 * Sent after a message fragment has been passed on to multicast.
958 * @param ch The channel struct for the client.
961 send_message_ack (struct Channel *ch)
963 struct GNUNET_MessageHeader res;
964 res.size = htons (sizeof (res));
965 res.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK);
967 GNUNET_SERVER_notification_context_add (nc, ch->client);
968 GNUNET_SERVER_notification_context_unicast (nc, ch->client, &res,
974 * Callback for the transmit functions of multicast.
977 transmit_notify (void *cls, size_t *data_size, void *data)
979 struct Channel *ch = cls;
980 struct TransmitMessage *tmit_msg = ch->tmit_head;
982 if (NULL == tmit_msg || *data_size < tmit_msg->size)
984 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
985 "%p transmit_notify: nothing to send.\n", ch);
990 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
991 "%p transmit_notify: sending %u bytes.\n", ch, tmit_msg->size);
993 *data_size = tmit_msg->size;
994 memcpy (data, tmit_msg->buf, *data_size);
996 GNUNET_CONTAINER_DLL_remove (ch->tmit_head, ch->tmit_tail, tmit_msg);
997 GNUNET_free (tmit_msg);
999 int ret = (MSG_STATE_END < ch->tmit_state) ? GNUNET_NO : GNUNET_YES;
1000 send_message_ack (ch);
1002 if (0 == ch->tmit_task)
1004 if (NULL != ch->tmit_head)
1006 transmit_message (ch, GNUNET_NO);
1008 else if (ch->disconnected)
1010 /* FIXME: handle partial message (when still in_transmit) */
1011 client_cleanup (ch);
1020 * Callback for the transmit functions of multicast.
1023 master_transmit_notify (void *cls, size_t *data_size, void *data)
1025 int ret = transmit_notify (cls, data_size, data);
1027 if (GNUNET_YES == ret)
1029 struct Master *mst = cls;
1030 mst->tmit_handle = NULL;
1037 * Callback for the transmit functions of multicast.
1040 slave_transmit_notify (void *cls, size_t *data_size, void *data)
1042 int ret = transmit_notify (cls, data_size, data);
1044 if (GNUNET_YES == ret)
1046 struct Slave *slv = cls;
1047 slv->tmit_handle = NULL;
1054 * Transmit a message from a channel master to the multicast group.
1057 master_transmit_message (struct Master *mst, uint8_t inc_msg_id)
1059 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p master_transmit_message()\n", mst);
1060 mst->channel.tmit_task = 0;
1061 if (NULL == mst->tmit_handle)
1063 if (GNUNET_YES == inc_msg_id)
1064 mst->max_message_id++;
1066 = GNUNET_MULTICAST_origin_to_all (mst->origin, mst->max_message_id,
1067 mst->max_group_generation,
1068 master_transmit_notify, mst);
1072 GNUNET_MULTICAST_origin_to_all_resume (mst->tmit_handle);
1078 * Transmit a message from a channel slave to the multicast group.
1081 slave_transmit_message (struct Slave *slv, uint8_t inc_msg_id)
1083 slv->channel.tmit_task = 0;
1084 if (NULL == slv->tmit_handle)
1086 if (GNUNET_YES == inc_msg_id)
1087 slv->max_message_id++;
1089 = GNUNET_MULTICAST_member_to_origin (slv->member, slv->max_request_id,
1090 slave_transmit_notify, slv);
1094 GNUNET_MULTICAST_member_to_origin_resume (slv->tmit_handle);
1100 transmit_message (struct Channel *ch, uint8_t inc_msg_id)
1103 ? master_transmit_message ((struct Master *) ch, inc_msg_id)
1104 : slave_transmit_message ((struct Slave *) ch, inc_msg_id);
1109 transmit_error (struct Channel *ch)
1111 struct GNUNET_MessageHeader *msg;
1112 struct TransmitMessage *tmit_msg = GNUNET_malloc (sizeof (*tmit_msg)
1114 msg = (struct GNUNET_MessageHeader *) &tmit_msg[1];
1115 msg->size = ntohs (sizeof (*msg));
1116 msg->type = ntohs (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL);
1118 tmit_msg->buf = (char *) &tmit_msg[1];
1119 tmit_msg->size = sizeof (*msg);
1120 tmit_msg->state = ch->tmit_state;
1121 GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, tmit_msg);
1122 transmit_message (ch, GNUNET_NO);
1124 /* FIXME: cleanup */
1129 * Incoming message from a client.
1132 handle_psyc_message (void *cls, struct GNUNET_SERVER_Client *client,
1133 const struct GNUNET_MessageHeader *msg)
1136 = GNUNET_SERVER_client_get_user_context (client, struct Channel);
1137 GNUNET_assert (NULL != ch);
1139 if (GNUNET_YES != ch->ready)
1141 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1142 "%p Dropping message from client, channel is not ready yet.\n",
1144 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1148 uint8_t inc_msg_id = GNUNET_NO;
1149 uint16_t size = ntohs (msg->size);
1150 uint16_t psize = 0, ptype = 0, pos = 0;
1152 if (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < size - sizeof (*msg))
1154 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "%p Message payload too large\n", ch);
1156 transmit_error (ch);
1157 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1161 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1162 "%p Received message from client.\n", ch);
1163 GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG, msg);
1165 for (pos = 0; sizeof (*msg) + pos < size; pos += psize)
1167 const struct GNUNET_MessageHeader *pmsg
1168 = (const struct GNUNET_MessageHeader *) ((char *) &msg[1] + pos);
1169 psize = ntohs (pmsg->size);
1170 ptype = ntohs (pmsg->type);
1171 if (psize < sizeof (*pmsg) || sizeof (*msg) + pos + psize > size)
1173 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1174 "%p Received invalid message part of type %u and size %u "
1175 "from client.\n", ch, ptype, psize);
1177 transmit_error (ch);
1178 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1181 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1182 "%p Received message part from client.\n", ch);
1183 GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG, pmsg);
1185 if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == ptype)
1186 inc_msg_id = GNUNET_YES;
1189 size -= sizeof (*msg);
1190 struct TransmitMessage *tmit_msg = GNUNET_malloc (sizeof (*tmit_msg) + size);
1191 tmit_msg->buf = (char *) &tmit_msg[1];
1192 memcpy (tmit_msg->buf, &msg[1], size);
1193 tmit_msg->size = size;
1194 tmit_msg->state = ch->tmit_state;
1195 GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, tmit_msg);
1196 transmit_message (ch, inc_msg_id);
1198 GNUNET_SERVER_receive_done (client, GNUNET_OK);
1203 * Client requests to add a slave to the membership database.
1206 handle_slave_add (void *cls, struct GNUNET_SERVER_Client *client,
1207 const struct GNUNET_MessageHeader *msg)
1214 * Client requests to remove a slave from the membership database.
1217 handle_slave_remove (void *cls, struct GNUNET_SERVER_Client *client,
1218 const struct GNUNET_MessageHeader *msg)
1225 * Client requests channel history from PSYCstore.
1228 handle_story_request (void *cls, struct GNUNET_SERVER_Client *client,
1229 const struct GNUNET_MessageHeader *msg)
1236 * Client requests best matching state variable from PSYCstore.
1239 handle_state_get (void *cls, struct GNUNET_SERVER_Client *client,
1240 const struct GNUNET_MessageHeader *msg)
1247 * Client requests state variables with a given prefix from PSYCstore.
1250 handle_state_get_prefix (void *cls, struct GNUNET_SERVER_Client *client,
1251 const struct GNUNET_MessageHeader *msg)
1258 * Initialize the PSYC service.
1260 * @param cls Closure.
1261 * @param server The initialized server.
1262 * @param c Configuration to use.
1265 run (void *cls, struct GNUNET_SERVER_Handle *server,
1266 const struct GNUNET_CONFIGURATION_Handle *c)
1268 static const struct GNUNET_SERVER_MessageHandler handlers[] = {
1269 { &handle_master_start, NULL,
1270 GNUNET_MESSAGE_TYPE_PSYC_MASTER_START, 0 },
1272 { &handle_slave_join, NULL,
1273 GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN, 0 },
1275 { &handle_psyc_message, NULL,
1276 GNUNET_MESSAGE_TYPE_PSYC_MESSAGE, 0 },
1278 { &handle_slave_add, NULL,
1279 GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_SLAVE_ADD, 0 },
1281 { &handle_slave_remove, NULL,
1282 GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_SLAVE_RM, 0 },
1284 { &handle_story_request, NULL,
1285 GNUNET_MESSAGE_TYPE_PSYC_STORY_REQUEST, 0 },
1287 { &handle_state_get, NULL,
1288 GNUNET_MESSAGE_TYPE_PSYC_STATE_GET, 0 },
1290 { &handle_state_get_prefix, NULL,
1291 GNUNET_MESSAGE_TYPE_PSYC_STATE_GET_PREFIX, 0 }
1295 store = GNUNET_PSYCSTORE_connect (cfg);
1296 stats = GNUNET_STATISTICS_create ("psyc", cfg);
1297 clients = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
1298 recv_cache = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
1299 nc = GNUNET_SERVER_notification_context_create (server, 1);
1300 GNUNET_SERVER_add_handlers (server, handlers);
1301 GNUNET_SERVER_disconnect_notify (server, &client_disconnect, NULL);
1302 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL,
1303 &shutdown_task, NULL);
1308 * The main function for the service.
1310 * @param argc number of arguments from the command line
1311 * @param argv command line arguments
1312 * @return 0 ok, 1 on error
1315 main (int argc, char *const *argv)
1317 return (GNUNET_OK ==
1318 GNUNET_SERVICE_run (argc, argv, "psyc",
1319 GNUNET_SERVICE_OPTION_NONE,
1320 &run, NULL)) ? 0 : 1;
1323 /* end of gnunet-service-psyc.c */