2 * This file is part of GNUnet
3 * (C) 2013 Christian Grothoff (and other contributing authors)
5 * GNUnet is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published
7 * by the Free Software Foundation; either version 3, or (at your
8 * option) any later version.
10 * GNUnet is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License for more details.
15 * You should have received a copy of the GNU General Public License
16 * along with GNUnet; see the file COPYING. If not, write to the
17 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 * Boston, MA 02111-1307, USA.
22 * @file psyc/gnunet-service-psyc.c
24 * @author Gabor X Toth
28 #include "gnunet_util_lib.h"
29 #include "gnunet_constants.h"
30 #include "gnunet_protocols.h"
31 #include "gnunet_statistics_service.h"
32 #include "gnunet_multicast_service.h"
33 #include "gnunet_psycstore_service.h"
34 #include "gnunet_psyc_service.h"
39 * Handle to our current configuration.
41 static const struct GNUNET_CONFIGURATION_Handle *cfg;
44 * Handle to the statistics service.
46 static struct GNUNET_STATISTICS_Handle *stats;
49 * Notification context, simplifies client broadcasts.
51 static struct GNUNET_SERVER_NotificationContext *nc;
54 * Handle to the PSYCstore.
56 static struct GNUNET_PSYCSTORE_Handle *store;
59 * channel's pub_key_hash -> struct Channel
61 static struct GNUNET_CONTAINER_MultiHashMap *clients;
64 * Message in the transmission queue.
66 struct TransmitMessage
68 struct TransmitMessage *prev;
69 struct TransmitMessage *next;
77 * Common part of the client context for both a master and slave channel.
81 struct GNUNET_SERVER_Client *client;
83 struct TransmitMessage *tmit_head;
84 struct TransmitMessage *tmit_tail;
87 GNUNET_SCHEDULER_TaskIdentifier tmit_task;
88 uint32_t tmit_mod_count;
89 uint32_t tmit_mod_recvd;
98 * Client context for a channel master.
102 struct Channel channel;
103 struct GNUNET_CRYPTO_EddsaPrivateKey priv_key;
104 struct GNUNET_CRYPTO_EddsaPublicKey pub_key;
105 struct GNUNET_HashCode pub_key_hash;
107 struct GNUNET_MULTICAST_Origin *origin;
108 struct GNUNET_MULTICAST_OriginMessageHandle *tmit_handle;
110 uint64_t max_message_id;
111 uint64_t max_state_message_id;
112 uint64_t max_group_generation;
115 * enum GNUNET_PSYC_Policy
122 * Client context for a channel slave.
126 struct Channel channel;
127 struct GNUNET_CRYPTO_EddsaPrivateKey slave_key;
128 struct GNUNET_CRYPTO_EddsaPublicKey chan_key;
129 struct GNUNET_HashCode chan_key_hash;
131 struct GNUNET_MULTICAST_Member *member;
132 struct GNUNET_MULTICAST_MemberRequestHandle *tmit_handle;
134 struct GNUNET_PeerIdentity origin;
135 struct GNUNET_PeerIdentity *relays;
136 struct GNUNET_MessageHeader *join_req;
138 uint64_t max_message_id;
139 uint64_t max_request_id;
141 uint32_t relay_count;
146 * Task run during shutdown.
152 shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
156 GNUNET_SERVER_notification_context_destroy (nc);
161 GNUNET_STATISTICS_destroy (stats, GNUNET_NO);
167 * Called whenever a client is disconnected.
168 * Frees our resources associated with that client.
170 * @param cls Closure.
171 * @param client Identification of the client.
174 client_disconnect (void *cls, struct GNUNET_SERVER_Client *client)
179 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Client %p disconnected\n", client);
182 = GNUNET_SERVER_client_get_user_context (client, struct Channel);
185 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
186 "User context is NULL in client_disconnect()\n");
191 if (NULL != ch->tmit_buf)
193 GNUNET_free (ch->tmit_buf);
199 struct Master *mst = (struct Master *) ch;
200 if (NULL != mst->origin)
201 GNUNET_MULTICAST_origin_stop (mst->origin);
205 struct Slave *slv = (struct Slave *) ch;
206 if (NULL != slv->join_req)
207 GNUNET_free (slv->join_req);
208 if (NULL != slv->relays)
209 GNUNET_free (slv->relays);
210 if (NULL != slv->member)
211 GNUNET_MULTICAST_member_part (slv->member);
218 join_cb (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *member_key,
219 const struct GNUNET_MessageHeader *join_req,
220 struct GNUNET_MULTICAST_JoinHandle *jh)
226 membership_test_cb (void *cls,
227 const struct GNUNET_CRYPTO_EddsaPublicKey *member_key,
228 uint64_t message_id, uint64_t group_generation,
229 struct GNUNET_MULTICAST_MembershipTestHandle *mth)
235 replay_fragment_cb (void *cls,
236 const struct GNUNET_CRYPTO_EddsaPublicKey *member_key,
237 uint64_t fragment_id, uint64_t flags,
238 struct GNUNET_MULTICAST_ReplayHandle *rh)
244 replay_message_cb (void *cls,
245 const struct GNUNET_CRYPTO_EddsaPublicKey *member_key,
247 uint64_t fragment_offset,
249 struct GNUNET_MULTICAST_ReplayHandle *rh)
255 request_cb (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *member_key,
256 const struct GNUNET_MessageHeader *req,
257 enum GNUNET_MULTICAST_MessageFlags flags)
263 message_cb (void *cls, const struct GNUNET_MessageHeader *msg)
265 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
266 "Received message of type %u from multicast.\n",
271 master_counters_cb (void *cls, int result, uint64_t max_fragment_id,
272 uint64_t max_message_id, uint64_t max_group_generation,
273 uint64_t max_state_message_id)
275 struct Master *mst = cls;
276 struct Channel *ch = &mst->channel;
277 struct CountersResult *res = GNUNET_malloc (sizeof (*res));
278 res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK);
279 res->header.size = htons (sizeof (*res));
280 res->result_code = htonl (result);
281 res->max_message_id = GNUNET_htonll (max_message_id);
283 if (GNUNET_OK == result || GNUNET_NO == result)
285 mst->max_message_id = max_message_id;
286 mst->max_state_message_id = max_state_message_id;
287 mst->max_group_generation = max_group_generation;
289 = GNUNET_MULTICAST_origin_start (cfg, &mst->priv_key,
291 join_cb, membership_test_cb,
292 replay_fragment_cb, replay_message_cb,
293 request_cb, message_cb, ch);
295 GNUNET_SERVER_notification_context_add (nc, ch->client);
296 GNUNET_SERVER_notification_context_unicast (nc, ch->client, &res->header,
303 slave_counters_cb (void *cls, int result, uint64_t max_fragment_id,
304 uint64_t max_message_id, uint64_t max_group_generation,
305 uint64_t max_state_message_id)
307 struct Slave *slv = cls;
308 struct Channel *ch = &slv->channel;
309 struct CountersResult *res = GNUNET_malloc (sizeof (*res));
310 res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK);
311 res->header.size = htons (sizeof (*res));
312 res->result_code = htonl (result);
313 res->max_message_id = GNUNET_htonll (max_message_id);
315 if (GNUNET_OK == result || GNUNET_NO == result)
317 slv->max_message_id = max_message_id;
319 = GNUNET_MULTICAST_member_join (cfg, &slv->chan_key, &slv->slave_key,
321 slv->relay_count, slv->relays,
322 slv->join_req, join_cb,
324 replay_fragment_cb, replay_message_cb,
328 GNUNET_SERVER_notification_context_add (nc, ch->client);
329 GNUNET_SERVER_notification_context_unicast (nc, ch->client, &res->header,
336 handle_master_start (void *cls, struct GNUNET_SERVER_Client *client,
337 const struct GNUNET_MessageHeader *msg)
339 const struct MasterStartRequest *req
340 = (const struct MasterStartRequest *) msg;
341 struct Master *mst = GNUNET_new (struct Master);
342 mst->channel.client = client;
343 mst->channel.is_master = GNUNET_YES;
344 mst->policy = ntohl (req->policy);
345 mst->priv_key = req->channel_key;
346 GNUNET_CRYPTO_eddsa_key_get_public (&mst->priv_key,
348 GNUNET_CRYPTO_hash (&mst->pub_key, sizeof (mst->pub_key), &mst->pub_key_hash);
350 GNUNET_PSYCSTORE_counters_get (store, &mst->pub_key,
351 master_counters_cb, mst);
353 GNUNET_SERVER_client_set_user_context (client, &mst->channel);
354 GNUNET_CONTAINER_multihashmap_put (clients, &mst->pub_key_hash, mst,
355 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
356 GNUNET_SERVER_receive_done (client, GNUNET_OK);
361 handle_slave_join (void *cls, struct GNUNET_SERVER_Client *client,
362 const struct GNUNET_MessageHeader *msg)
364 const struct SlaveJoinRequest *req
365 = (const struct SlaveJoinRequest *) msg;
366 struct Slave *slv = GNUNET_new (struct Slave);
367 slv->channel.client = client;
368 slv->channel.is_master = GNUNET_NO;
369 slv->slave_key = req->slave_key;
370 slv->chan_key = req->channel_key;
371 GNUNET_CRYPTO_hash (&slv->chan_key, sizeof (slv->chan_key),
372 &slv->chan_key_hash);
373 slv->origin = req->origin;
374 slv->relay_count = ntohl (req->relay_count);
376 const struct GNUNET_PeerIdentity *relays
377 = (const struct GNUNET_PeerIdentity *) &req[1];
379 = GNUNET_malloc (slv->relay_count * sizeof (struct GNUNET_PeerIdentity));
381 for (i = 0; i < slv->relay_count; i++)
382 memcpy (&slv->relays[i], &relays[i], sizeof (*relays));
384 GNUNET_PSYCSTORE_counters_get (store, &slv->chan_key,
385 slave_counters_cb, slv);
387 GNUNET_SERVER_client_set_user_context (client, &slv->channel);
388 GNUNET_SERVER_receive_done (client, GNUNET_OK);
393 send_transmit_ack (struct Channel *ch)
395 struct TransmitAck *res = GNUNET_malloc (sizeof (*res));
396 res->header.size = htons (sizeof (*res));
397 res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_TRANSMIT_ACK);
398 res->buf_avail = htons (GNUNET_MULTICAST_FRAGMENT_MAX_SIZE - ch->tmit_size);
400 GNUNET_SERVER_notification_context_add (nc, ch->client);
401 GNUNET_SERVER_notification_context_unicast (nc, ch->client, &res->header,
408 transmit_notify (void *cls, size_t *data_size, void *data)
410 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "transmit_notify()\n");
411 struct Channel *ch = cls;
412 struct TransmitMessage *msg = ch->tmit_head;
414 if (NULL == msg || *data_size < ntohs (msg->size))
420 *data_size = ntohs (msg->size);
421 memcpy (data, msg->buf, *data_size);
423 GNUNET_free (ch->tmit_buf);
425 GNUNET_CONTAINER_DLL_remove (ch->tmit_head, ch->tmit_tail, msg);
427 return (GNUNET_YES == ch->in_transmit) ? GNUNET_NO : GNUNET_YES;
432 master_transmit_message (void *cls,
433 const struct GNUNET_SCHEDULER_TaskContext *tc)
435 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "master_transmit_message()\n");
436 struct Master *mst = cls;
437 mst->channel.tmit_task = 0;
438 if (NULL == mst->tmit_handle)
441 = GNUNET_MULTICAST_origin_to_all (mst->origin, ++mst->max_message_id,
442 mst->max_group_generation,
443 transmit_notify, mst);
447 GNUNET_MULTICAST_origin_to_all_resume (mst->tmit_handle);
453 slave_transmit_message (void *cls,
454 const struct GNUNET_SCHEDULER_TaskContext *tc)
456 struct Slave *slv = cls;
457 slv->channel.tmit_task = 0;
458 if (NULL == slv->tmit_handle)
461 = GNUNET_MULTICAST_member_to_origin(slv->member, ++slv->max_request_id,
462 transmit_notify, slv);
466 GNUNET_MULTICAST_member_to_origin_resume (slv->tmit_handle);
472 buffer_message (struct Channel *ch, const struct GNUNET_MessageHeader *msg)
474 uint16_t size = ntohs (msg->size);
475 struct GNUNET_TIME_Relative tmit_delay = GNUNET_TIME_UNIT_ZERO;
477 if (GNUNET_MULTICAST_FRAGMENT_MAX_SIZE < size)
478 return GNUNET_SYSERR;
480 if (0 == ch->tmit_size)
482 ch->tmit_buf = GNUNET_malloc (size);
483 memcpy (ch->tmit_buf, msg, size);
484 ch->tmit_size = size;
486 else if (GNUNET_MULTICAST_FRAGMENT_MAX_SIZE <= ch->tmit_size + size)
488 ch->tmit_buf = GNUNET_realloc (ch->tmit_buf, ch->tmit_size + size);
489 memcpy (ch->tmit_buf + ch->tmit_size, msg, size);
490 ch->tmit_size += size;
493 if (GNUNET_MULTICAST_FRAGMENT_MAX_SIZE
494 < ch->tmit_size + sizeof (struct GNUNET_PSYC_MessageData))
496 struct TransmitMessage *tmit_msg = GNUNET_new (struct TransmitMessage);
497 tmit_msg->buf = (char *) msg;
498 tmit_msg->size = size;
499 tmit_msg->status = ch->tmit_status;
500 GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, tmit_msg);
501 tmit_delay = GNUNET_TIME_UNIT_ZERO;
504 if (0 != ch->tmit_task)
505 GNUNET_SCHEDULER_cancel (ch->tmit_task);
509 ? GNUNET_SCHEDULER_add_delayed (tmit_delay, master_transmit_message, ch)
510 : GNUNET_SCHEDULER_add_delayed (tmit_delay, slave_transmit_message, ch);
516 handle_transmit_method (void *cls, struct GNUNET_SERVER_Client *client,
517 const struct GNUNET_MessageHeader *msg)
519 const struct GNUNET_PSYC_MessageMethod *meth
520 = (const struct GNUNET_PSYC_MessageMethod *) msg;
522 = GNUNET_SERVER_client_get_user_context (client, struct Channel);
523 GNUNET_assert (NULL != ch);
525 if (GNUNET_NO != ch->in_transmit)
527 // FIXME: already transmitting a message, send back error message.
531 ch->in_transmit = GNUNET_YES;
534 ch->tmit_mod_recvd = 0;
535 ch->tmit_mod_count = ntohl (meth->mod_count);
536 ch->tmit_status = GNUNET_PSYC_DATA_CONT;
538 buffer_message (ch, msg);
540 if (0 == ch->tmit_mod_count)
541 send_transmit_ack (ch);
543 GNUNET_SERVER_receive_done (client, GNUNET_OK);
548 handle_transmit_modifier (void *cls, struct GNUNET_SERVER_Client *client,
549 const struct GNUNET_MessageHeader *msg)
551 const struct GNUNET_PSYC_MessageModifier *mod
552 = (const struct GNUNET_PSYC_MessageModifier *) msg;
554 = GNUNET_SERVER_client_get_user_context (client, struct Channel);
555 GNUNET_assert (NULL != ch);
557 ch->tmit_mod_recvd++;
558 buffer_message (ch, msg);
560 if (ch->tmit_mod_recvd == ch->tmit_mod_count)
561 send_transmit_ack (ch);
563 GNUNET_SERVER_receive_done (client, GNUNET_OK);
568 handle_transmit_data (void *cls, struct GNUNET_SERVER_Client *client,
569 const struct GNUNET_MessageHeader *msg)
571 const struct GNUNET_PSYC_MessageData *data
572 = (const struct GNUNET_PSYC_MessageData *) msg;
574 = GNUNET_SERVER_client_get_user_context (client, struct Channel);
575 GNUNET_assert (NULL != ch);
577 ch->tmit_status = ntohs (data->status);
578 buffer_message (ch, msg);
579 send_transmit_ack (ch);
581 if (GNUNET_PSYC_DATA_CONT != ch->tmit_status)
582 ch->in_transmit = GNUNET_NO;
584 GNUNET_SERVER_receive_done (client, GNUNET_OK);
589 * Initialize the PSYC service.
591 * @param cls Closure.
592 * @param server The initialized server.
593 * @param c Configuration to use.
596 run (void *cls, struct GNUNET_SERVER_Handle *server,
597 const struct GNUNET_CONFIGURATION_Handle *c)
599 static const struct GNUNET_SERVER_MessageHandler handlers[] = {
600 { &handle_master_start, NULL,
601 GNUNET_MESSAGE_TYPE_PSYC_MASTER_START, 0 },
603 { &handle_slave_join, NULL,
604 GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN, 0 },
606 { &handle_transmit_method, NULL,
607 GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD, 0 },
609 { &handle_transmit_modifier, NULL,
610 GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER, 0 },
612 { &handle_transmit_data, NULL,
613 GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA, 0 },
619 store = GNUNET_PSYCSTORE_connect (cfg);
620 stats = GNUNET_STATISTICS_create ("psyc", cfg);
621 clients = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
622 nc = GNUNET_SERVER_notification_context_create (server, 1);
623 GNUNET_SERVER_add_handlers (server, handlers);
624 GNUNET_SERVER_disconnect_notify (server, &client_disconnect, NULL);
625 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL, &shutdown_task,
631 * The main function for the service.
633 * @param argc number of arguments from the command line
634 * @param argv command line arguments
635 * @return 0 ok, 1 on error
638 main (int argc, char *const *argv)
641 GNUNET_SERVICE_run (argc, argv, "psyc",
642 GNUNET_SERVICE_OPTION_NONE,
643 &run, NULL)) ? 0 : 1;
646 /* end of gnunet-service-psycstore.c */