2 * This file is part of GNUnet
3 * (C) 2013 Christian Grothoff (and other contributing authors)
5 * GNUnet is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published
7 * by the Free Software Foundation; either version 3, or (at your
8 * option) any later version.
10 * GNUnet is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License for more details.
15 * You should have received a copy of the GNU General Public License
16 * along with GNUnet; see the file COPYING. If not, write to the
17 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 * Boston, MA 02111-1307, USA.
22 * @file psyc/gnunet-service-psyc.c
24 * @author Gabor X Toth
28 #include "gnunet_util_lib.h"
29 #include "gnunet_constants.h"
30 #include "gnunet_protocols.h"
31 #include "gnunet_statistics_service.h"
32 #include "gnunet_multicast_service.h"
33 #include "gnunet_psycstore_service.h"
34 #include "gnunet_psyc_service.h"
/* Service-wide state shared by all handlers in this file.
   NOTE(review): this listing carries embedded line numbers and has lost its
   short lines (comment delimiters among them). */
39 * Handle to our current configuration.
41 static const struct GNUNET_CONFIGURATION_Handle *cfg;
/* Created in run() via GNUNET_STATISTICS_create ("psyc", cfg) and destroyed
   in shutdown_task(). */
44 * Handle to the statistics service.
46 static struct GNUNET_STATISTICS_Handle *stats;
/* Created in run(); every reply to a client in this file goes through
   GNUNET_SERVER_notification_context_add() + _unicast() on this context. */
49 * Notification context, simplifies client broadcasts.
51 static struct GNUNET_SERVER_NotificationContext *nc;
/* Connected in run() via GNUNET_PSYCSTORE_connect (cfg). */
54 * Handle to the PSYCstore.
56 static struct GNUNET_PSYCSTORE_Handle *store;
/* Multi-value map: handle_master_start() inserts with
   GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE and message_cb() looks entries
   up with GNUNET_CONTAINER_multihashmap_get_multiple().
   NOTE(review): no insertion of slave contexts is visible in this listing. */
59 * Channel's pub_key_hash -> struct Channel
61 static struct GNUNET_CONTAINER_MultiHashMap *clients;
/* Node of a channel's transmission queue (ch->tmit_head / ch->tmit_tail).
   Other code in this file also accesses the members buf, size and state of
   this struct; their declarations are not visible in this listing. */
64 * Message in the transmission queue.
66 struct TransmitMessage
/* Queue links; presumably the fields the GNUNET_CONTAINER_DLL_* macros
   operate on — TODO confirm against the macro definitions. */
68 struct TransmitMessage *prev;
69 struct TransmitMessage *next;
/* Shared leading part of struct Master and struct Slave (both embed it as
   their `channel` member and are cast to/from struct Channel * in this file).
   NOTE(review): the `struct Channel` header line and several members that the
   rest of the file uses (is_master, tmit_state) are not visible in this
   listing. */
80 * Common part of the client context for both a master and slave channel.
/* The connected client this context belongs to; target of all unicasts. */
84 struct GNUNET_SERVER_Client *client;
/* Head and tail of the queue of messages waiting to be handed to multicast. */
86 struct TransmitMessage *tmit_head;
87 struct TransmitMessage *tmit_tail;
/* Reset to 0 by master_transmit_message() / slave_transmit_message() and
   compared against 0 in transmit_notify(). */
89 GNUNET_SCHEDULER_TaskIdentifier tmit_task;
92 * Expected value size for the modifier being received from the PSYC service.
94 uint32_t tmit_mod_value_size_expected;
97 * Actual value size for the modifier being received from the PSYC service.
99 uint32_t tmit_mod_value_size;
/* Set to GNUNET_YES in client_disconnect(); checked in transmit_notify()
   once the transmission queue has been drained. */
108 uint8_t disconnected;
/* Per-client state of a channel master, created in handle_master_start().
   NOTE(review): the `struct Master` header line and the `policy` member
   (assigned in handle_master_start()) are not visible in this listing. */
112 * Client context for a channel master.
/* Kept as a leading member: the file casts struct Channel * to
   struct Master * and back. */
116 struct Channel channel;
/* priv_key is copied from the client's request; pub_key is derived from it
   and pub_key_hash is the hash of pub_key (see handle_master_start()). */
117 struct GNUNET_CRYPTO_EddsaPrivateKey priv_key;
118 struct GNUNET_CRYPTO_EddsaPublicKey pub_key;
119 struct GNUNET_HashCode pub_key_hash;
/* Multicast origin handle; stopped in client_cleanup() when non-NULL. */
121 struct GNUNET_MULTICAST_Origin *origin;
/* Handle of an ongoing origin_to_all transmission; when non-NULL,
   master_transmit_message() resumes it instead of starting a new one. */
122 struct GNUNET_MULTICAST_OriginMessageHandle *tmit_handle;
125 * Maximum message ID for this channel.
127 * Incremented before sending a message, thus the message_id in messages sent
130 uint64_t max_message_id;
133 * ID of the last message that contains any state operations.
134 * 0 if there is no such message.
136 uint64_t max_state_message_id;
139 * Maximum group generation for this channel.
141 uint64_t max_group_generation;
144 * @see enum GNUNET_PSYC_Policy
/* Per-client state of a channel slave, created in handle_slave_join().
   NOTE(review): the `struct Slave` header line is not visible in this
   listing. */
151 * Client context for a channel slave.
/* Kept as a leading member: the file casts struct Channel * to
   struct Slave * and back. */
155 struct Channel channel;
/* slave_key and chan_key are copied from the join request; chan_key_hash is
   the hash of chan_key (see handle_slave_join()). */
156 struct GNUNET_CRYPTO_EddsaPrivateKey slave_key;
157 struct GNUNET_CRYPTO_EddsaPublicKey chan_key;
158 struct GNUNET_HashCode chan_key_hash;
/* Multicast membership handle; parted in client_cleanup() when non-NULL. */
160 struct GNUNET_MULTICAST_Member *member;
/* Handle of an ongoing member_to_origin transmission; when non-NULL,
   slave_transmit_message() resumes it instead of starting a new one. */
161 struct GNUNET_MULTICAST_MemberRequestHandle *tmit_handle;
/* origin is copied from the join request; relays is a heap array of
   relay_count entries; relays and join_req are freed in client_cleanup()
   when non-NULL.
   NOTE(review): no assignment to join_req is visible in this listing. */
163 struct GNUNET_PeerIdentity origin;
164 struct GNUNET_PeerIdentity *relays;
165 struct GNUNET_MessageHeader *join_req;
/* max_message_id is set from the PSYCstore counters in slave_counters_cb();
   max_request_id is pre-incremented for each new member_to_origin call. */
167 uint64_t max_message_id;
168 uint64_t max_request_id;
170 uint32_t relay_count;
/* Forward declaration: client_disconnect() and transmit_notify() call
   transmit_message() before its definition further down in the file. */
175 transmit_message (struct Channel *ch);
/* Scheduler task registered by run(); tears down the service-wide handles.
   NOTE(review): only the two destroy calls below are visible; any guards or
   further cleanup are not visible in this listing. */
179 * Task run during shutdown.
185 shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
189 GNUNET_SERVER_notification_context_destroy (nc);
194 GNUNET_STATISTICS_destroy (stats, GNUNET_NO);
/* Release the multicast handle and the per-role allocations of a channel.
   Both a struct Master * and a struct Slave * view of ch appear below; the
   branch that selects between them is not visible in this listing —
   presumably ch->is_master, TODO confirm. */
201 client_cleanup (struct Channel *ch)
205 struct Master *mst = (struct Master *) ch;
/* Master: stop the multicast origin if one was started, then remove this
   context from the clients map. */
206 if (NULL != mst->origin)
207 GNUNET_MULTICAST_origin_stop (mst->origin);
208 GNUNET_CONTAINER_multihashmap_remove (clients, &mst->pub_key_hash, mst);
212 struct Slave *slv = (struct Slave *) ch;
/* Slave: free the optional join request and relay list, then part from the
   multicast group if membership was established. */
213 if (NULL != slv->join_req)
214 GNUNET_free (slv->join_req);
215 if (NULL != slv->relays)
216 GNUNET_free (slv->relays);
217 if (NULL != slv->member)
218 GNUNET_MULTICAST_member_part (slv->member);
/* Disconnect notification registered in run() through
   GNUNET_SERVER_disconnect_notify(). */
226 * Called whenever a client is disconnected.
227 * Frees our resources associated with that client.
229 * @param cls Closure.
230 * @param client Identification of the client.
233 client_disconnect (void *cls, struct GNUNET_SERVER_Client *client)
238 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Client %p disconnected\n", client);
/* Look up the struct Channel stored as this client's user context by
   handle_master_start() / handle_slave_join(). */
241 = GNUNET_SERVER_client_get_user_context (client, struct Channel);
/* Error report for a client without a user context (the guarding condition
   is not visible in this listing). */
244 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
245 "User context is NULL in client_disconnect()\n");
/* Mark the channel so transmit_notify() can react once the queue is empty. */
250 ch->disconnected = GNUNET_YES;
252 /* Send pending messages to multicast before cleanup. */
253 if (NULL != ch->tmit_head)
255 transmit_message (ch);
/* Join-request callback passed to GNUNET_MULTICAST_origin_start() and
   GNUNET_MULTICAST_member_join().
   NOTE(review): the body is not visible in this listing. */
265 join_cb (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *member_key,
266 const struct GNUNET_MessageHeader *join_req,
267 struct GNUNET_MULTICAST_JoinHandle *jh)
/* Membership-test callback passed to GNUNET_MULTICAST_origin_start().
   NOTE(review): the body is not visible in this listing. */
274 membership_test_cb (void *cls,
275 const struct GNUNET_CRYPTO_EddsaPublicKey *member_key,
276 uint64_t message_id, uint64_t group_generation,
277 struct GNUNET_MULTICAST_MembershipTestHandle *mth)
/* Fragment-replay callback passed to GNUNET_MULTICAST_origin_start() and
   GNUNET_MULTICAST_member_join().
   NOTE(review): the body is not visible in this listing. */
284 replay_fragment_cb (void *cls,
285 const struct GNUNET_CRYPTO_EddsaPublicKey *member_key,
286 uint64_t fragment_id, uint64_t flags,
287 struct GNUNET_MULTICAST_ReplayHandle *rh)
/* Message-replay callback passed to GNUNET_MULTICAST_origin_start() and
   GNUNET_MULTICAST_member_join().
   NOTE(review): part of the parameter list and the whole body are not
   visible in this listing. */
294 replay_message_cb (void *cls,
295 const struct GNUNET_CRYPTO_EddsaPublicKey *member_key,
297 uint64_t fragment_offset,
299 struct GNUNET_MULTICAST_ReplayHandle *rh)
306 fragment_store_result (void *cls, int64_t result, const char *err_msg)
308 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
309 "fragment_store() returned %l (%s)\n", result, err_msg);
314 * Iterator callback for sending a message to a client.
319 message_to_client (void *cls, const struct GNUNET_HashCode *chan_key_hash,
322 const struct GNUNET_MessageHeader *msg = cls;
323 struct Channel *ch = chan;
325 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
326 "Sending message of type %u and size %u to client 0x%zx.\n",
327 ntohs (msg->type), ntohs (msg->size), ch->client);
329 GNUNET_SERVER_notification_context_add (nc, ch->client);
330 GNUNET_SERVER_notification_context_unicast (nc, ch->client, msg, GNUNET_NO);
/* Message callback handed to GNUNET_MULTICAST_origin_start() with a
   struct Channel * as closure.
   NOTE(review): the switch statement, several declarations and all braces
   are not visible in this listing; comments below describe only what the
   remaining lines show. */
337 * Incoming message fragment from multicast.
339 * Store it using PSYCstore and send it to all clients of the channel.
342 message_cb (void *cls, const struct GNUNET_MessageHeader *msg)
/* Header fields arrive in network byte order. */
344 uint16_t type = ntohs (msg->type);
345 uint16_t size = ntohs (msg->size);
347 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
348 "Received message of type %u and size %u from multicast.\n",
/* Three views of the same closure pointer; which of mst / slv is meaningful
   depends on ch->is_master. */
351 struct Channel *ch = cls;
352 struct Master *mst = cls;
353 struct Slave *slv = cls;
355 /* const struct GNUNET_MULTICAST_MessageHeader *mmsg
356 = (const struct GNUNET_MULTICAST_MessageHeader *) msg; */
/* Channel public key and its hash, taken from the role-specific context. */
357 struct GNUNET_CRYPTO_EddsaPublicKey *chan_key
358 = ch->is_master ? &mst->pub_key : &slv->chan_key;
359 struct GNUNET_HashCode *chan_key_hash
360 = ch->is_master ? &mst->pub_key_hash : &slv->chan_key_hash;
364 case GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE:
/* Persist the fragment via PSYCstore (trailing arguments not visible). */
366 GNUNET_PSYCSTORE_fragment_store (store, chan_key,
368 GNUNET_MULTICAST_MessageHeader *) msg,
372 /* FIXME: apply modifiers to state in PSYCstore */
/* NOTE(review): meth and mods are not declared in any visible line;
   presumably this call sits in a disabled region — TODO confirm. */
373 GNUNET_PSYCSTORE_state_modify (store, chan_key,
374 GNUNET_ntohll (mmsg->message_id),
375 meth->mod_count, mods,
379 const struct GNUNET_MULTICAST_MessageHeader *mmsg
380 = (const struct GNUNET_MULTICAST_MessageHeader *) msg;
381 struct GNUNET_PSYC_MessageHeader *pmsg;
/* NOTE(review): re-declares `size` with the same value as the outer one. */
383 uint16_t size = ntohs (msg->size);
/* Walk the message parts that follow the multicast header and verify that
   each part is at least a header long and ends inside the fragment. */
387 for (pos = 0; sizeof (*mmsg) + pos < size; pos += psize)
/* NOTE(review): this inner pmsg has a different type from the
   struct GNUNET_PSYC_MessageHeader *pmsg declared above. */
389 const struct GNUNET_MessageHeader *pmsg
390 = (const struct GNUNET_MessageHeader *) ((char *) &mmsg[1] + pos);
391 psize = ntohs (pmsg->size);
392 if (psize < sizeof (*pmsg) || sizeof (*mmsg) + pos + psize > size)
394 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
395 "Received invalid message part of type %u and size %u "
396 "from multicast. Not sending to clients.\n",
397 ntohs (pmsg->type), psize);
/* Build the client-facing PSYC message: a PSYC header followed by the
   payload that came after the multicast header. */
403 psize = sizeof (*pmsg) + size - sizeof (*mmsg);
404 pmsg = GNUNET_malloc (psize);
405 pmsg->header.size = htons (psize);
406 pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
/* Copied without byte-order conversion (stays in the order it arrived in). */
407 pmsg->message_id = mmsg->message_id;
409 memcpy (&pmsg[1], &mmsg[1], size - sizeof (*mmsg));
/* Look up the map entries stored under this channel's key hash (iterator
   and closure arguments not visible in this listing). */
411 GNUNET_CONTAINER_multihashmap_get_multiple (clients, chan_key_hash,
418 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
419 "Discarding unknown message of type %u and size %u.\n",
/* NOTE(review): the rest of the parameter list and the whole body are not
   visible in this listing. */
426 * Send a request received from multicast to a client.
429 request_to_client (void *cls, const struct GNUNET_HashCode *chan_key_hash,
/* Request callback passed to GNUNET_MULTICAST_origin_start().
   NOTE(review): the body is not visible in this listing. */
439 request_cb (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *member_key,
440 const struct GNUNET_MessageHeader *req,
441 enum GNUNET_MULTICAST_MessageFlags flags)
/* Callback given to GNUNET_PSYCSTORE_counters_get() by handle_master_start();
   cls is the struct Master created there. */
448 * Response from PSYCstore with the current counter values for a channel master.
451 master_counters_cb (void *cls, int result, uint64_t max_fragment_id,
452 uint64_t max_message_id, uint64_t max_group_generation,
453 uint64_t max_state_message_id)
455 struct Master *mst = cls;
456 struct Channel *ch = &mst->channel;
/* Build the MASTER_START_ACK reply; multi-byte fields are converted to
   network byte order.
   NOTE(review): the release of res is not visible in this listing. */
457 struct CountersResult *res = GNUNET_malloc (sizeof (*res));
458 res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK);
459 res->header.size = htons (sizeof (*res));
460 res->result_code = htonl (result);
461 res->max_message_id = GNUNET_htonll (max_message_id);
/* Both GNUNET_OK and GNUNET_NO are accepted: adopt the stored counters and
   start the multicast origin (assignment target and one argument are not
   visible in this listing). */
463 if (GNUNET_OK == result || GNUNET_NO == result)
465 mst->max_message_id = max_message_id;
466 mst->max_state_message_id = max_state_message_id;
467 mst->max_group_generation = max_group_generation;
469 = GNUNET_MULTICAST_origin_start (cfg, &mst->priv_key,
471 join_cb, membership_test_cb,
472 replay_fragment_cb, replay_message_cb,
473 request_cb, message_cb, ch);
/* Send the reply to the requesting client only. */
475 GNUNET_SERVER_notification_context_add (nc, ch->client);
476 GNUNET_SERVER_notification_context_unicast (nc, ch->client, &res->header,
/* Callback given to GNUNET_PSYCSTORE_counters_get() by handle_slave_join();
   cls is the struct Slave created there. */
483 * Response from PSYCstore with the current counter values for a channel slave.
486 slave_counters_cb (void *cls, int result, uint64_t max_fragment_id,
487 uint64_t max_message_id, uint64_t max_group_generation,
488 uint64_t max_state_message_id)
490 struct Slave *slv = cls;
491 struct Channel *ch = &slv->channel;
/* Build the SLAVE_JOIN_ACK reply; multi-byte fields are converted to
   network byte order.
   NOTE(review): the release of res is not visible in this listing. */
492 struct CountersResult *res = GNUNET_malloc (sizeof (*res));
493 res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK);
494 res->header.size = htons (sizeof (*res));
495 res->result_code = htonl (result);
496 res->max_message_id = GNUNET_htonll (max_message_id);
/* Both GNUNET_OK and GNUNET_NO are accepted: adopt the stored message ID
   and join the multicast group (assignment target and some arguments are
   not visible in this listing). */
498 if (GNUNET_OK == result || GNUNET_NO == result)
500 slv->max_message_id = max_message_id;
502 = GNUNET_MULTICAST_member_join (cfg, &slv->chan_key, &slv->slave_key,
504 slv->relay_count, slv->relays,
505 slv->join_req, join_cb,
507 replay_fragment_cb, replay_message_cb,
/* Send the reply to the requesting client only. */
511 GNUNET_SERVER_notification_context_add (nc, ch->client);
512 GNUNET_SERVER_notification_context_unicast (nc, ch->client, &res->header,
519 * Handle a connecting client starting a channel master.
522 handle_master_start (void *cls, struct GNUNET_SERVER_Client *client,
523 const struct GNUNET_MessageHeader *msg)
525 const struct MasterStartRequest *req
526 = (const struct MasterStartRequest *) msg;
527 struct Master *mst = GNUNET_new (struct Master);
528 mst->channel.client = client;
529 mst->channel.is_master = GNUNET_YES;
530 mst->policy = ntohl (req->policy);
531 mst->priv_key = req->channel_key;
532 GNUNET_CRYPTO_eddsa_key_get_public (&mst->priv_key,
534 GNUNET_CRYPTO_hash (&mst->pub_key, sizeof (mst->pub_key), &mst->pub_key_hash);
536 GNUNET_PSYCSTORE_counters_get (store, &mst->pub_key,
537 master_counters_cb, mst);
539 GNUNET_SERVER_client_set_user_context (client, &mst->channel);
540 GNUNET_CONTAINER_multihashmap_put (clients, &mst->pub_key_hash, mst,
541 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
542 GNUNET_SERVER_receive_done (client, GNUNET_OK);
547 * Handle a connecting client joining as a channel slave.
550 handle_slave_join (void *cls, struct GNUNET_SERVER_Client *client,
551 const struct GNUNET_MessageHeader *msg)
553 const struct SlaveJoinRequest *req
554 = (const struct SlaveJoinRequest *) msg;
555 struct Slave *slv = GNUNET_new (struct Slave);
556 slv->channel.client = client;
557 slv->channel.is_master = GNUNET_NO;
558 slv->slave_key = req->slave_key;
559 slv->chan_key = req->channel_key;
560 GNUNET_CRYPTO_hash (&slv->chan_key, sizeof (slv->chan_key),
561 &slv->chan_key_hash);
562 slv->origin = req->origin;
563 slv->relay_count = ntohl (req->relay_count);
565 const struct GNUNET_PeerIdentity *relays
566 = (const struct GNUNET_PeerIdentity *) &req[1];
568 = GNUNET_malloc (slv->relay_count * sizeof (struct GNUNET_PeerIdentity));
570 for (i = 0; i < slv->relay_count; i++)
571 memcpy (&slv->relays[i], &relays[i], sizeof (*relays));
573 GNUNET_PSYCSTORE_counters_get (store, &slv->chan_key,
574 slave_counters_cb, slv);
576 GNUNET_SERVER_client_set_user_context (client, &slv->channel);
577 GNUNET_SERVER_receive_done (client, GNUNET_OK);
/* Called from transmit_notify() after a queued message has been copied into
   the multicast buffer. */
582 * Send acknowledgement to a client.
584 * Sent after a message fragment has been passed on to multicast.
586 * @param ch The channel struct for the client.
589 send_message_ack (struct Channel *ch)
/* The acknowledgement is a bare message header with no payload; size and
   type are stored in network byte order. */
591 struct GNUNET_MessageHeader res;
592 res.size = htons (sizeof (res));
593 res.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK);
595 GNUNET_SERVER_notification_context_add (nc, ch->client);
596 GNUNET_SERVER_notification_context_unicast (nc, ch->client, &res,
/* Passed as the notify callback to GNUNET_MULTICAST_origin_to_all() and
   GNUNET_MULTICAST_member_to_origin(); cls is the channel context.
   NOTE(review): return statements and braces are not visible in this
   listing. */
602 * Callback for the transmit functions of multicast.
605 transmit_notify (void *cls, size_t *data_size, void *data)
607 struct Channel *ch = cls;
/* Always serve the oldest queued message. */
608 struct TransmitMessage *tmit_msg = ch->tmit_head;
/* Nothing queued, or the offered buffer is too small for the head message. */
610 if (NULL == tmit_msg || *data_size < tmit_msg->size)
612 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "transmit_notify: nothing to send.\n");
617 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
618 "transmit_notify: sending %u bytes.\n", tmit_msg->size);
/* Report the number of bytes written and copy the payload out. */
620 *data_size = tmit_msg->size;
621 memcpy (data, tmit_msg->buf, *data_size);
/* The message has been handed over: unlink it from the queue and free it. */
623 GNUNET_CONTAINER_DLL_remove (ch->tmit_head, ch->tmit_tail, tmit_msg);
624 GNUNET_free (tmit_msg);
/* ret is GNUNET_NO once tmit_state is past MSG_STATE_END, GNUNET_YES
   otherwise; presumably this is the function's return value — TODO confirm. */
626 int ret = (MSG_STATE_END < ch->tmit_state) ? GNUNET_NO : GNUNET_YES;
627 send_message_ack (ch);
/* With no transmit task pending: continue with the next queued message, or,
   if the client is already gone, fall into the disconnected branch. */
629 if (0 == ch->tmit_task)
631 if (NULL != ch->tmit_head)
633 transmit_message (ch);
635 else if (ch->disconnected)
637 /* FIXME: handle partial message (when still in_transmit) */
/* NOTE(review): the assignment target of the origin_to_all() result and the
   else keyword are not visible in this listing. */
647 * Transmit a message from a channel master to the multicast group.
650 master_transmit_message (struct Master *mst)
652 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "master_transmit_message()\n");
653 mst->channel.tmit_task = 0;
/* No transmission in progress: start a new one, pre-incrementing
   max_message_id and passing transmit_notify() as the data callback. */
654 if (NULL == mst->tmit_handle)
657 = GNUNET_MULTICAST_origin_to_all (mst->origin, ++mst->max_message_id,
658 mst->max_group_generation,
659 transmit_notify, mst);
/* Otherwise resume the existing transmission. */
663 GNUNET_MULTICAST_origin_to_all_resume (mst->tmit_handle);
/* NOTE(review): the assignment target of the member_to_origin() result and
   the else keyword are not visible in this listing. */
669 * Transmit a message from a channel slave to the multicast group.
672 slave_transmit_message (struct Slave *slv)
674 slv->channel.tmit_task = 0;
/* No transmission in progress: start a new request, pre-incrementing
   max_request_id and passing transmit_notify() as the data callback. */
675 if (NULL == slv->tmit_handle)
678 = GNUNET_MULTICAST_member_to_origin(slv->member, ++slv->max_request_id,
679 transmit_notify, slv);
/* Otherwise resume the existing transmission. */
683 GNUNET_MULTICAST_member_to_origin_resume (slv->tmit_handle);
/* Dispatch to the master or the slave transmit routine for this channel.
   The condition of the conditional expression is not visible in this
   listing — presumably ch->is_master, TODO confirm. */
689 transmit_message (struct Channel *ch)
692 ? master_transmit_message ((struct Master *) ch)
693 : slave_transmit_message ((struct Slave *) ch);
698 transmit_error (struct Channel *ch)
700 struct GNUNET_MessageHeader *msg;
701 struct TransmitMessage *tmit_msg = GNUNET_malloc (sizeof (*tmit_msg)
703 msg = (struct GNUNET_MessageHeader *) &tmit_msg[1];
704 msg->size = ntohs (sizeof (*msg));
705 msg->type = ntohs (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL);
707 tmit_msg->buf = (char *) &tmit_msg[1];
708 tmit_msg->size = sizeof (*msg);
709 tmit_msg->state = ch->tmit_state;
710 GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, tmit_msg);
711 transmit_message (ch);
714 GNUNET_SERVER_client_disconnect (ch->client);
/* Handler registered in run() for GNUNET_MESSAGE_TYPE_PSYC_MESSAGE.
   NOTE(review): the bodies of the two error branches (after the ERROR log
   lines) are not visible in this listing. */
719 * Incoming message from a client.
722 handle_psyc_message (void *cls, struct GNUNET_SERVER_Client *client,
723 const struct GNUNET_MessageHeader *msg)
/* The channel context was stored as user context when the client started
   or joined the channel. */
726 = GNUNET_SERVER_client_get_user_context (client, struct Channel);
727 GNUNET_assert (NULL != ch);
729 uint16_t size = ntohs (msg->size);
730 uint16_t psize = 0, pos = 0;
/* The payload (everything after the outer header) must fit into a single
   multicast fragment. */
732 if (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < size - sizeof (*msg))
734 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Message payload too large\n");
/* Walk the message parts packed after the outer header and verify that each
   one is at least a header long and ends inside the received message. */
740 for (pos = 0; sizeof (*msg) + pos < size; pos += psize)
742 const struct GNUNET_MessageHeader *pmsg
743 = (const struct GNUNET_MessageHeader *) ((char *) &msg[1] + pos);
744 psize = ntohs (pmsg->size);
745 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
746 "Received message part of type %u and size %u "
747 "from client.\n", ntohs (pmsg->type), psize);
748 if (psize < sizeof (*pmsg) || sizeof (*msg) + pos + psize > size)
750 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
751 "Received invalid message part of type %u and size %u "
752 "from client.\n", ntohs (pmsg->type), psize);
/* From here on, size is the payload length only.  The payload is copied
   into a queue node allocated together with its buffer, appended to the
   transmission queue, and transmission is kicked off. */
759 size -= sizeof (*msg);
760 struct TransmitMessage *tmit_msg = GNUNET_malloc (sizeof (*tmit_msg) + size);
761 tmit_msg->buf = (char *) &tmit_msg[1];
762 memcpy (tmit_msg->buf, &msg[1], size);
763 tmit_msg->size = size;
764 tmit_msg->state = ch->tmit_state;
765 GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, tmit_msg);
766 transmit_message (ch);
768 GNUNET_SERVER_receive_done (client, GNUNET_OK);
/* Service entry point handed to GNUNET_SERVICE_run() in main().
   NOTE(review): the handler-table terminator, the assignment of cfg and the
   last argument of the scheduler call are not visible in this listing. */
773 * Initialize the PSYC service.
775 * @param cls Closure.
776 * @param server The initialized server.
777 * @param c Configuration to use.
780 run (void *cls, struct GNUNET_SERVER_Handle *server,
781 const struct GNUNET_CONFIGURATION_Handle *c)
/* All three handlers are registered with expected size 0, so each handler
   is responsible for validating the length of what it receives. */
783 static const struct GNUNET_SERVER_MessageHandler handlers[] = {
784 { &handle_master_start, NULL,
785 GNUNET_MESSAGE_TYPE_PSYC_MASTER_START, 0 },
787 { &handle_slave_join, NULL,
788 GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN, 0 },
790 { &handle_psyc_message, NULL,
791 GNUNET_MESSAGE_TYPE_PSYC_MESSAGE, 0 },
/* Set up the service-wide handles declared at the top of the file. */
795 store = GNUNET_PSYCSTORE_connect (cfg);
796 stats = GNUNET_STATISTICS_create ("psyc", cfg);
797 clients = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
798 nc = GNUNET_SERVER_notification_context_create (server, 1);
799 GNUNET_SERVER_add_handlers (server, handlers);
800 GNUNET_SERVER_disconnect_notify (server, &client_disconnect, NULL);
/* Schedule shutdown_task() with a "forever" delay. */
801 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL, &shutdown_task,
/* NOTE(review): the start of the return expression is not visible in this
   listing; the visible tail maps the comparison result to exit code 0 or 1. */
807 * The main function for the service.
809 * @param argc number of arguments from the command line
810 * @param argv command line arguments
811 * @return 0 ok, 1 on error
814 main (int argc, char *const *argv)
/* Run the service under the name "psyc" with run() as its initializer. */
817 GNUNET_SERVICE_run (argc, argv, "psyc",
818 GNUNET_SERVICE_OPTION_NONE,
819 &run, NULL)) ? 0 : 1;
822 /* end of gnunet-service-psyc.c */