2 This file is part of GNUnet.
3 (C) 2008--2013 Christian Grothoff (and other contributing authors)
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your
8 option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA.
22 * @file testbed/gnunet-service-testbed_barriers.c
23 * @brief barrier handling at the testbed controller
24 * @author Sree Harsha Totakura <sreeharsha@totakura.in>
27 #include "gnunet-service-testbed.h"
28 #include "gnunet-service-testbed_barriers.h"
32 * timeout for outgoing message transmissions in seconds
/**
 * Build a relative time of `s` seconds.  Used in this file as the timeout for
 * outgoing message transmissions and for the barrier-init propagation task.
 *
 * The parameter is parenthesized so it is always passed on as one expression.
 */
#define MESSAGE_SEND_TIMEOUT(s) \
  GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, (s))
39 * Test to see if local peers have reached the required quorum of a barrier
/**
 * Test to see if local peers have reached the required quorum of a barrier:
 * true when quorum * GST_num_local_peers <= nreached * 100, i.e. the quorum
 * is treated as a percentage of the local peers.
 *
 * The argument is parenthesized so that any pointer expression (not only a
 * plain identifier) can be passed.
 */
#define LOCAL_QUORUM_REACHED(barrier) \
  (((barrier)->quorum * GST_num_local_peers) <= ((barrier)->nreached * 100))
/* Logging shorthand for this file: forwards to GNUNET_log_from() with the
   fixed component name "testbed-barriers". */
52 #define LOG(kind,...) \
53 GNUNET_log_from (kind, "testbed-barriers", __VA_ARGS__)
/* Entry of a per-client doubly-linked list of messages waiting to be sent
   (the list is anchored at ClientCtx mq_head / mq_tail). */
63 * Message queue for transmitting messages
68 * next pointer for DLL
70 struct MessageQueue *next;
73 * prev pointer for DLL
75 struct MessageQueue *prev;
78 * The message to be sent
/* Released with GNUNET_free() by transmit_ready_cb() and
   cleanup_clientctx(). */
80 struct GNUNET_MessageHeader *msg;
/* Per-client state; stored as the client's user context by
   handle_barrier_wait(). */
85 * Context to be associated with each client
90 * The barrier this client is waiting for
92 struct Barrier *barrier;
/* next / prev: links in the barrier's list of waiting clients
   (Barrier head / tail). */
97 struct ClientCtx *next;
102 struct ClientCtx *prev;
/* The client; a reference is taken with GNUNET_SERVER_client_keep() when the
   context is created. */
107 struct GNUNET_SERVER_Client *client;
110 * the transmission handle
/* Value returned by GNUNET_SERVER_notify_transmit_ready(). */
112 struct GNUNET_SERVER_TransmitHandle *tx;
/* mq_head / mq_tail: list of messages queued for this client. */
117 struct MessageQueue *mq_head;
122 struct MessageQueue *mq_tail;
/* One entry per sub-controller the barrier initialisation was forwarded to;
   created in GST_handle_barrier_init(). */
127 * Wrapper around Barrier handle
/* next / prev: links in the owning barrier's whead / wtail list. */
134 struct WBarrier *next;
139 struct WBarrier *prev;
142 * The local barrier associated with the creation of this wrapper
144 struct Barrier *barrier;
147 * The barrier handle from API
/* Returned by GNUNET_TESTBED_barrier_init(); passed to
   GNUNET_TESTBED_barrier_cancel() by cancel_wrappers(). */
149 struct GNUNET_TESTBED_Barrier *hbarrier;
152 * Has this barrier been crossed?
/* NOTE(review): the field this description belongs to is not present in this
   view. */
/* State of one barrier.  Instances are allocated in
   GST_handle_barrier_init(), stored in barrier_map and released by
   remove_barrier(). */
164 * The hashcode of the barrier name
/* Computed with GNUNET_CRYPTO_hash() over the name bytes without the
   terminating NUL. */
166 struct GNUNET_HashCode hash;
169 * The client handle to the master controller
/* The client that sent BARRIER_INIT; kept with GNUNET_SERVER_client_keep()
   and dropped in remove_barrier().  Status messages are sent to it. */
171 struct GNUNET_SERVER_Client *mc;
174 * The name of the barrier
/* NOTE(review): the declaration of the name field is not present in this
   view; the code uses barrier->name as a NUL-terminated string. */
179 * DLL head for the list of clients waiting for this barrier
181 struct ClientCtx *head;
184 * DLL tail for the list of clients waiting for this barrier
186 struct ClientCtx *tail;
189 * DLL head for the list of barrier handles
191 struct WBarrier *whead;
194 * DLL tail for the list of barrier handles
196 struct WBarrier *wtail;
199 * Identifier for the timeout task
/* Set from GNUNET_SCHEDULER_add_delayed() with fwd_tout_barrier_init() as the
   task. */
201 GNUNET_SCHEDULER_TaskIdentifier tout_task;
204 * The status of this barrier
/* Compared against 0 to detect "no status set yet"; otherwise one of the
   BARRIER_STATUS_* values. */
206 enum GNUNET_TESTBED_BarrierStatus status;
209 * Number of barriers wrapped in the above DLL
/* NOTE(review): no visible line assigns or increments this counter, although
   it is compared against num_wbarriers_inited / num_wbarriers_reached --
   confirm it is maintained when wrappers are added. */
211 unsigned int num_wbarriers;
214 * Number of wrapped barriers reached so far
216 unsigned int num_wbarriers_reached;
219 * Number of wrapped barrier initialised so far
221 unsigned int num_wbarriers_inited;
224 * Number of peers which have reached this barrier
226 unsigned int nreached;
229 * Number of slaves we have initialised this barrier
/* NOTE(review): not referenced by any visible line in this file. */
231 unsigned int nslaves;
234 * Quorum percentage to be reached
/* NOTE(review): the declaration of the quorum field is not present in this
   view; it is assigned from the BARRIER_INIT message and read by
   LOCAL_QUORUM_REACHED. */
242 * Hashtable handle for storing initialised barriers
/* Created in GST_barriers_init(), destroyed in GST_barriers_destroy(); keys
   are the hashes of the barrier names. */
244 static struct GNUNET_CONTAINER_MultiHashMap *barrier_map;
/* Context of the "testbed-barrier" service started in GST_barriers_init().
   Several functions in this file use a local variable or parameter that is
   also named `ctx` (a struct ClientCtx pointer) and shadows this one. */
249 static struct GNUNET_SERVICE_Context *ctx;
/**
 * Function called to notify a client about the connection
 * being ready to queue more data.  "buf" will be
 * NULL and "size" zero if the connection was closed for
 * writing in the meantime.
 *
 * @param cls client context
 * @param size number of bytes available in buf
 * @param buf where the callee should write the message
 * @return number of bytes written to buf
 */
/* Transmit-ready callback for a client's message queue: copies the message
   `msg` into `buf`, unlinks the queue entry `mq` and, if another entry is
   queued, asks the server for a new transmit slot.
   cls is the struct ClientCtx; the local `ctx` shadows the file-level service
   context of the same name. */
264 transmit_ready_cb (void *cls, size_t size, void *buf)
266 struct ClientCtx *ctx = cls;
267 struct GNUNET_SERVER_Client *client = ctx->client;
268 struct MessageQueue *mq;
269 struct GNUNET_MessageHeader *msg;
/* size == 0 or buf == NULL: nothing can be written; the reference held on the
   client is released. */
274 if ((0 == size) || (NULL == buf))
276 GNUNET_assert (NULL != ctx->client);
277 GNUNET_SERVER_client_drop (ctx->client);
/* The length is taken from the message's own header (network byte order) and
   must fit into the space offered. */
283 wrote = ntohs (msg->size);
284 GNUNET_assert (size >= wrote);
285 (void) memcpy (buf, msg, wrote);
286 GNUNET_CONTAINER_DLL_remove (ctx->mq_head, ctx->mq_tail, mq);
287 GNUNET_free (mq->msg);
/* NOTE(review): `msg` is not reassigned in the visible lines after the
   message was released above.  If it still points at the message just sent,
   the call below reads freed memory and requests the size of the wrong
   message; the size of the new head, ntohs (mq->msg->size), looks intended.
   TODO confirm. */
289 if (NULL != (mq = ctx->mq_head))
290 ctx->tx = GNUNET_SERVER_notify_transmit_ready (client, ntohs (msg->size),
291 MESSAGE_SEND_TIMEOUT (30),
292 &transmit_ready_cb, ctx);
/**
 * Queue a message into a client's message queue
 *
 * @param ctx the context associated with the client
 * @param msg the message to queue.  Will be consumed
 */
/* Appends a new queue entry to the tail of the client's message queue and
   requests a transmit slot sized for `msg`, with transmit_ready_cb() as the
   callback and a 30 second timeout. */
304 queue_message (struct ClientCtx *ctx, struct GNUNET_MessageHeader *msg)
306 struct MessageQueue *mq;
307 struct GNUNET_SERVER_Client *client = ctx->client;
309 mq = GNUNET_malloc (sizeof (struct MessageQueue));
311 LOG_DEBUG ("Queueing message of type %u, size %u for sending\n",
312 ntohs (msg->type), ntohs (msg->size));
313 GNUNET_CONTAINER_DLL_insert_tail (ctx->mq_head, ctx->mq_tail, mq);
/* NOTE(review): whether this request is guarded (e.g. only issued when no
   request is already pending in ctx->tx) is not visible here -- confirm, as
   an unguarded call would overwrite a pending handle. */
315 ctx->tx = GNUNET_SERVER_notify_transmit_ready (client, ntohs (msg->size),
316 MESSAGE_SEND_TIMEOUT (30),
317 &transmit_ready_cb, ctx);
322 * Function to cleanup client context data structure
324 * @param ctx the client context data structure
/* Releases what a client context holds: the client reference, a transmit
   request and queued message data. */
327 cleanup_clientctx (struct ClientCtx *ctx)
329 struct MessageQueue *mq;
/* NOTE(review): transmit_ready_cb() also drops ctx->client when the
   connection is closed for writing; unless the pointer is reset there (not
   visible), this would be a second drop.  TODO confirm. */
331 GNUNET_SERVER_client_drop (ctx->client);
333 GNUNET_SERVER_notify_transmit_ready_cancel (ctx->tx);
/* NOTE(review): this is an `if`, so only the head entry is removed; if more
   than one message can be queued, the remaining entries are not released
   here.  A `while` loop looks intended -- confirm. */
334 if (NULL != (mq = ctx->mq_head))
336 GNUNET_CONTAINER_DLL_remove (ctx->mq_head, ctx->mq_tail, mq);
337 GNUNET_free (mq->msg);
345 * Function to remove a barrier from the barrier map and cleanup resources
346 * occupied by a barrier
348 * @param barrier the barrier handle
/* Removes the barrier from barrier_map (asserting that it was present),
   cleans up every waiting client context, drops the reference on the master
   controller client and frees the barrier together with its name.
   NOTE(review): no visible line cancels barrier->tout_task; if that task can
   still be pending when this runs (e.g. via GST_handle_barrier_cancel()),
   fwd_tout_barrier_init() would later be invoked with a freed barrier.
   TODO confirm. */
351 remove_barrier (struct Barrier *barrier)
353 struct ClientCtx *ctx;
355 GNUNET_assert (GNUNET_YES == GNUNET_CONTAINER_multihashmap_remove (barrier_map,
358 while (NULL != (ctx = barrier->head))
360 GNUNET_CONTAINER_DLL_remove (barrier->head, barrier->tail, ctx);
361 cleanup_clientctx (ctx);
363 GNUNET_free (barrier->name);
364 GNUNET_SERVER_client_drop (barrier->mc);
365 GNUNET_free (barrier);
370 * Cancels all subcontroller barrier handles
372 * @param barrier the local barrier
/* Empties the barrier's wrapper list: for each wrapper the sub-controller
   barrier handle is cancelled, then the wrapper is unlinked and freed. */
375 cancel_wrappers (struct Barrier *barrier)
377 struct WBarrier *wrapper;
379 while (NULL != (wrapper = barrier->whead))
381 GNUNET_TESTBED_barrier_cancel (wrapper->hbarrier);
382 GNUNET_CONTAINER_DLL_remove (barrier->whead, barrier->wtail, wrapper);
383 GNUNET_free (wrapper);
389 * Send a status message about a barrier to the given client
391 * @param client the client to send the message to
392 * @param name the barrier name
393 * @param status the status of the barrier
394 * @param emsg the error message; should be non-NULL for
395 * status=BARRIER_STATUS_ERROR
/* Builds a GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_STATUS message carrying the
   barrier name and, optionally, an error message, and hands it to
   GST_queue_message() for the given client. */
398 send_client_status_msg (struct GNUNET_SERVER_Client *client,
400 enum GNUNET_TESTBED_BarrierStatus status,
403 struct GNUNET_TESTBED_BarrierStatusMsg *msg;
/* An error message is only permitted together with BARRIER_STATUS_ERROR;
   a NULL emsg is accepted for every status. */
407 GNUNET_assert ((NULL == emsg) || (BARRIER_STATUS_ERROR == status));
408 name_len = strlen (name);
409 msize = sizeof (struct GNUNET_TESTBED_BarrierStatusMsg)
411 + ((NULL == emsg) ? 0 : (strlen (emsg) + 1));
412 msg = GNUNET_malloc (msize);
413 msg->header.size = htons (msize);
414 msg->header.type = htons (GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_STATUS);
415 msg->status = htons (status);
/* NOTE(review): name_len here excludes the terminating NUL, whereas
   notify_task_cb() sends strlen + 1 in the same field -- confirm which
   convention the receiver expects. */
416 msg->name_len = htons ((uint16_t) name_len);
417 (void) memcpy (msg->data, name, name_len);
/* The error text is placed after the name and one separating byte.
   NOTE(review): strlen (emsg) requires emsg != NULL; the guard is not visible
   in this view -- confirm. */
419 (void) memcpy (msg->data + name_len + 1, emsg, strlen (emsg));
420 GST_queue_message (client, &msg->header);
/**
 * Sends the current status of a barrier to the client that initialised it
 *
 * @param barrier the corresponding barrier
 * @param emsg the error message; should be non-NULL for
 *   status=BARRIER_STATUS_ERROR
 */
/* Sends barrier->status (which must already be set, i.e. non-zero) together
   with the barrier name and optional error message to the barrier's master
   controller client. */
432 send_barrier_status_msg (struct Barrier *barrier, const char *emsg)
434 GNUNET_assert (0 != barrier->status);
435 send_client_status_msg (barrier->mc, barrier->name, barrier->status, emsg);
441 * Task for sending barrier crossed notifications to waiting client
443 * @param cls the barrier which is crossed
444 * @param tc scheduler task context
/* Builds one BARRIER_STATUS_CROSSED message for the barrier passed in cls and
   queues a copy of it to every client in the barrier's waiting list, removing
   each client from that list.  handle_barrier_wait() calls this directly with
   tc == NULL. */
447 notify_task_cb (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
449 struct Barrier *barrier = cls;
450 struct ClientCtx *client_ctx;
451 struct GNUNET_TESTBED_BarrierStatusMsg *msg;
452 struct GNUNET_MessageHeader *dup_msg;
/* name_len includes the terminating NUL here. */
456 name_len = strlen (barrier->name) + 1;
457 msize = sizeof (struct GNUNET_TESTBED_BarrierStatusMsg) + name_len;
458 msg = GNUNET_malloc (msize);
459 msg->header.size = htons (msize);
460 msg->header.type = htons (GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_STATUS);
461 msg->status = htons (BARRIER_STATUS_CROSSED);
462 msg->name_len = htons (name_len);
463 (void) memcpy (msg->data, barrier->name, name_len);
/* NOTE(review): only name_len payload bytes were allocated above and the
   memcpy already copied the terminator, so index name_len appears to be one
   byte past the allocation (assuming `data` is the trailing payload of the
   message struct) -- this store looks like an out-of-bounds write; confirm
   and drop it or use name_len - 1. */
464 msg->data[name_len] = '\0';
465 while (NULL != (client_ctx = barrier->head))
467 dup_msg = GNUNET_copy_message (&msg->header);
468 queue_message (client_ctx, dup_msg);
469 GNUNET_CONTAINER_DLL_remove (barrier->head, barrier->tail, client_ctx);
/* NOTE(review): only copies of `msg` are queued; no visible line frees the
   template `msg` itself -- confirm it is released. */
475 * Message handler for GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_WAIT messages. This
476 * message should come from peers or a shared helper service using the
477 * testbed-barrier client API (@see gnunet_testbed_barrier_service.h)
479 * This handler is queued in the main service and will handle the messages sent
480 * either from the testbed driver or from a high level controller
483 * @param client identification of the client
484 * @param message the actual message
/* Handles a BARRIER_WAIT message: validates it, looks up the named barrier,
   attaches a client context to first-time clients and, when the crossing
   condition holds, marks the barrier crossed and notifies the master
   controller and all waiting clients. */
487 handle_barrier_wait (void *cls, struct GNUNET_SERVER_Client *client,
488 const struct GNUNET_MessageHeader *message)
490 const struct GNUNET_TESTBED_BarrierWait *msg;
491 struct Barrier *barrier;
493 struct ClientCtx *client_ctx;
494 struct GNUNET_HashCode key;
/* The message must carry at least one byte of barrier name after the fixed
   header. */
498 msize = ntohs (message->size);
499 if (msize <= sizeof (struct GNUNET_TESTBED_BarrierWait))
502 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
505 if (NULL == barrier_map)
508 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
/* The name is everything after the fixed header; a NUL-terminated local copy
   is made, and the hash is taken over the name bytes without the terminator
   (the same way GST_handle_barrier_init() computes the map key). */
511 msg = (const struct GNUNET_TESTBED_BarrierWait *) message;
512 name_len = msize - sizeof (struct GNUNET_TESTBED_BarrierWait);
513 name = GNUNET_malloc (name_len + 1);
514 name[name_len] = '\0';
515 (void) memcpy (name, msg->name, name_len);
516 LOG_DEBUG ("Received BARRIER_WAIT for barrier `%s'\n", name);
517 GNUNET_CRYPTO_hash (name, name_len, &key);
518 if (NULL == (barrier = GNUNET_CONTAINER_multihashmap_get (barrier_map, &key)))
521 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
/* First BARRIER_WAIT from this client: create its context, take a reference
   on the client and add it to the barrier's waiting list. */
525 client_ctx = GNUNET_SERVER_client_get_user_context (client, struct ClientCtx);
526 if (NULL == client_ctx)
528 client_ctx = GNUNET_malloc (sizeof (struct ClientCtx));
529 client_ctx->client = client;
530 GNUNET_SERVER_client_keep (client);
531 client_ctx->barrier = barrier;
532 GNUNET_CONTAINER_DLL_insert_tail (barrier->head, barrier->tail, client_ctx);
533 GNUNET_SERVER_client_set_user_context (client, client_ctx);
/* Crossed when all wrapped (sub-controller) barriers have been reached and
   the local quorum is met.
   NOTE(review): no visible line increments barrier->nreached or frees `name`
   in this function -- confirm both happen in lines not shown here. */
536 if ((barrier->num_wbarriers_reached == barrier->num_wbarriers)
537 && (LOCAL_QUORUM_REACHED (barrier)))
539 barrier->status = BARRIER_STATUS_CROSSED;
540 send_barrier_status_msg (barrier, NULL);
541 notify_task_cb (barrier, NULL);
543 GNUNET_SERVER_receive_done (client, GNUNET_OK);
548 * Functions with this signature are called whenever a client
549 * is disconnected on the network level.
552 * @param client identification of the client; NULL
553 * for the last call when the server is destroyed
/* Disconnect notification for the barrier service: if the client has a
   ClientCtx attached as user context, that context is cleaned up; clients
   without one are ignored.
   NOTE(review): the visible lines do not remove the context from its
   barrier's head/tail list before cleanup_clientctx() runs -- confirm the
   barrier cannot be left with a stale list entry.  How a NULL `client` is
   handled is also not visible here. */
556 disconnect_cb (void *cls, struct GNUNET_SERVER_Client *client)
558 struct ClientCtx *client_ctx;
562 client_ctx = GNUNET_SERVER_client_get_user_context (client, struct ClientCtx);
563 if (NULL == client_ctx)
564 return; /* We only set user context for locally
566 cleanup_clientctx (client_ctx);
/**
 * Function to initialise the barriers component
 *
 * @param cfg the configuration to use for initialisation
 */
/* Creates the barrier map, starts the "testbed-barrier" service with manual
   shutdown, and registers the BARRIER_WAIT handler and the disconnect
   callback on that service's server. */
576 GST_barriers_init (struct GNUNET_CONFIGURATION_Handle *cfg)
578 static const struct GNUNET_SERVER_MessageHandler message_handlers[] = {
579 {&handle_barrier_wait, NULL, GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_WAIT, 0},
582 struct GNUNET_SERVER_Handle *srv;
584 barrier_map = GNUNET_CONTAINER_multihashmap_create (3, GNUNET_YES);
/* NOTE(review): the result of GNUNET_SERVICE_start() is used below without a
   visible NULL check -- confirm failure is handled. */
585 ctx = GNUNET_SERVICE_start ("testbed-barrier", cfg,
586 GNUNET_SERVICE_OPTION_MANUAL_SHUTDOWN);
587 srv = GNUNET_SERVICE_get_server (ctx);
588 GNUNET_SERVER_add_handlers (srv, message_handlers);
589 GNUNET_SERVER_disconnect_notify (srv, &disconnect_cb, NULL);
594 * Function to stop the barrier service
/* Destroys the barrier map and stops the barrier service; both must have been
   set up by GST_barriers_init().
   NOTE(review): the visible lines destroy the map without releasing barriers
   that may still be stored in it, and do not reset the two file-level
   pointers -- confirm.  The empty parameter list `()` declares unspecified
   parameters in C; `(void)` would be the explicit form. */
597 GST_barriers_destroy ()
599 GNUNET_assert (NULL != barrier_map);
600 GNUNET_CONTAINER_multihashmap_destroy (barrier_map);
601 GNUNET_assert (NULL != ctx);
602 GNUNET_SERVICE_stop (ctx);
/**
 * Functions of this type are to be given as callback argument to
 * GNUNET_TESTBED_barrier_init().  The callback will be called when status
 * information is available for the barrier.
 *
 * @param cls the closure given to GNUNET_TESTBED_barrier_init()
 * @param name the name of the barrier
 * @param b_ the barrier handle
 * @param status status of the barrier: BARRIER_STATUS_INITIALISED,
 *   BARRIER_STATUS_CROSSED or BARRIER_STATUS_ERROR
 * @param emsg if the status is BARRIER_STATUS_ERROR, this parameter has the
 *   error message; may be NULL
 */
/* Status callback for a barrier forwarded to a sub-controller; cls is the
   struct WBarrier created for that sub-controller.  Updates the local
   barrier's counters/status and reports status changes to the master
   controller. */
620 wbarrier_status_cb (void *cls, const char *name,
621 struct GNUNET_TESTBED_Barrier *b_,
622 enum GNUNET_TESTBED_BarrierStatus status,
625 struct WBarrier *wrapper = cls;
626 struct Barrier *barrier = wrapper->barrier;
/* NOTE(review): in the visible lines the wrapper is unlinked and freed before
   the status is examined, i.e. on every callback.  If the same handle reports
   INITIALISED first and CROSSED later, the second call would receive a freed
   `cls` -- confirm the intended lifetime. */
628 GNUNET_assert (b_ == wrapper->hbarrier);
629 wrapper->hbarrier = NULL;
630 GNUNET_CONTAINER_DLL_remove (barrier->whead, barrier->wtail, wrapper);
631 GNUNET_free (wrapper);
/* Error at a sub-controller: cancel the remaining wrappers and report an
   error status upstream. */
634 case BARRIER_STATUS_ERROR:
635 LOG (GNUNET_ERROR_TYPE_ERROR,
636 "Initialising barrier `%s' failed at a sub-controller: %s\n",
637 barrier->name, (NULL != emsg) ? emsg : "NULL");
638 cancel_wrappers (barrier);
640 emsg = "Initialisation failed at a sub-controller";
641 barrier->status = BARRIER_STATUS_ERROR;
642 send_barrier_status_msg (barrier, emsg);
/* A sub-controller reports its barrier crossed; the local barrier counts it
   and becomes crossed once all wrapped barriers are reached and the local
   quorum is met. */
644 case BARRIER_STATUS_CROSSED:
645 if (BARRIER_STATUS_INITIALISED != barrier->status)
650 barrier->num_wbarriers_reached++;
651 if ((barrier->num_wbarriers_reached == barrier->num_wbarriers)
652 && (LOCAL_QUORUM_REACHED (barrier)))
654 barrier->status = BARRIER_STATUS_CROSSED;
655 send_barrier_status_msg (barrier, NULL);
/* A sub-controller finished initialising; once all have, the barrier is
   reported as initialised. */
658 case BARRIER_STATUS_INITIALISED:
659 if (0 != barrier->status)
664 barrier->num_wbarriers_inited++;
665 if (barrier->num_wbarriers_inited == barrier->num_wbarriers)
667 barrier->status = BARRIER_STATUS_INITIALISED;
668 send_barrier_status_msg (barrier, NULL);
676 * Function called upon timeout while waiting for a response from the
677 * subcontrollers to barrier init message
680 * @param tc scheduler task context
/* Timeout task for barrier-init propagation; cls is the barrier.  Cancels all
   sub-controller wrappers, reports BARRIER_STATUS_ERROR with a timeout
   message to the master controller and removes the barrier. */
683 fwd_tout_barrier_init (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
685 struct Barrier *barrier = cls;
687 cancel_wrappers (barrier);
688 barrier->status = BARRIER_STATUS_ERROR;
689 send_barrier_status_msg (barrier,
690 "Timedout while propagating barrier initialisation\n");
691 remove_barrier (barrier);
696 * Message handler for GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_INIT messages. This
697 * message should always come from a parent controller or the testbed API if we
698 * are the root controller.
700 * This handler is queued in the main service and will handle the messages sent
701 * either from the testbed driver or from a high level controller
704 * @param client identification of the client
705 * @param message the actual message
/* Handles a BARRIER_INIT message: validates the sender and the message,
   rejects duplicate names, creates the barrier and stores it in barrier_map,
   then forwards the initialisation to every connected sub-controller. */
708 GST_handle_barrier_init (void *cls, struct GNUNET_SERVER_Client *client,
709 const struct GNUNET_MessageHeader *message)
711 const struct GNUNET_TESTBED_BarrierInit *msg;
713 struct Barrier *barrier;
715 struct WBarrier *wrapper;
716 struct GNUNET_HashCode hash;
/* Only the client recorded in GST_context is accepted as sender. */
721 if (NULL == GST_context)
724 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
727 if (client != GST_context->client)
730 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
/* The message must carry at least one byte of barrier name. */
733 msize = ntohs (message->size);
734 if (msize <= sizeof (struct GNUNET_TESTBED_BarrierInit))
737 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
740 msg = (const struct GNUNET_TESTBED_BarrierInit *) message;
741 name_len = (size_t) msize - sizeof (struct GNUNET_TESTBED_BarrierInit);
/* NOTE(review): unlike handle_barrier_wait(), no terminator is written
   explicitly here; this presumably relies on GNUNET_malloc() returning zeroed
   memory -- confirm. */
742 name = GNUNET_malloc (name_len + 1);
743 (void) memcpy (name, msg->name, name_len);
744 GNUNET_CRYPTO_hash (name, name_len, &hash);
745 LOG_DEBUG ("Received BARRIER_INIT for barrier `%s'\n", name);
746 if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (barrier_map, &hash))
749 send_client_status_msg (client, name, BARRIER_STATUS_ERROR,
750 "A barrier with the same name already exists");
752 GNUNET_SERVER_receive_done (client, GNUNET_OK);
/* The barrier takes ownership of `name` and keeps a reference on the
   initiating client. */
755 barrier = GNUNET_malloc (sizeof (struct Barrier));
756 (void) memcpy (&barrier->hash, &hash, sizeof (struct GNUNET_HashCode));
757 barrier->quorum = msg->quorum;
758 barrier->name = name;
759 barrier->mc = client;
760 GNUNET_SERVER_client_keep (client);
761 GNUNET_assert (GNUNET_OK ==
762 GNUNET_CONTAINER_multihashmap_put (barrier_map,
765 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST));
766 GNUNET_SERVER_receive_done (client, GNUNET_OK);
767 /* Propagate barrier init to subcontrollers */
768 for (cnt = 0; cnt < GST_slave_list_size; cnt++)
770 if (NULL == (slave = GST_slave_list[cnt]))
772 if (NULL == slave->controller)
774 GNUNET_break (0);/* May happen when we are connecting to the controller */
/* NOTE(review): a wrapper is added per sub-controller, but no visible line
   increments barrier->num_wbarriers, which wbarrier_status_cb() compares the
   inited/reached counters against -- confirm. */
777 wrapper = GNUNET_malloc (sizeof (struct WBarrier));
778 wrapper->barrier = barrier;
779 GNUNET_CONTAINER_DLL_insert_tail (barrier->whead, barrier->wtail, wrapper);
780 wrapper->hbarrier = GNUNET_TESTBED_barrier_init (slave->controller,
786 if (NULL == barrier->whead) /* No further propagation */
788 barrier->status = BARRIER_STATUS_INITIALISED;
789 LOG_DEBUG ("Sending BARRIER_STATUS_INITIALISED for barrier `%s'\n",
791 send_barrier_status_msg (barrier, NULL);
/* Timeout (30 seconds) for the sub-controllers to answer; on expiry
   fwd_tout_barrier_init() runs. */
793 barrier->tout_task = GNUNET_SCHEDULER_add_delayed (MESSAGE_SEND_TIMEOUT (30),
794 &fwd_tout_barrier_init,
800 * Message handler for GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_CANCEL messages. This
801 * message should always come from a parent controller or the testbed API if we
802 * are the root controller.
804 * This handler is queued in the main service and will handle the messages sent
805 * either from the testbed driver or from a high level controller
808 * @param client identification of the client
809 * @param message the actual message
/* Handles a BARRIER_CANCEL message: validates the sender and the message,
   looks up the named barrier, cancels its sub-controller wrappers and removes
   it. */
812 GST_handle_barrier_cancel (void *cls, struct GNUNET_SERVER_Client *client,
813 const struct GNUNET_MessageHeader *message)
815 const struct GNUNET_TESTBED_BarrierCancel *msg;
817 struct Barrier *barrier;
818 struct GNUNET_HashCode hash;
/* Only the client recorded in GST_context is accepted as sender. */
822 if (NULL == GST_context)
825 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
828 if (client != GST_context->client)
831 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
/* The message must carry at least one byte of barrier name. */
834 msize = ntohs (message->size);
835 if (msize <= sizeof (struct GNUNET_TESTBED_BarrierCancel))
838 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
841 msg = (const struct GNUNET_TESTBED_BarrierCancel *) message;
842 name_len = msize - sizeof (struct GNUNET_TESTBED_BarrierCancel);
/* NOTE(review): `name` is only needed to compute the hash; no visible line
   frees it on the success path -- confirm it is released. */
843 name = GNUNET_malloc (name_len + 1);
844 (void) memcpy (name, msg->name, name_len);
845 GNUNET_CRYPTO_hash (name, name_len, &hash);
846 if (GNUNET_NO == GNUNET_CONTAINER_multihashmap_contains (barrier_map, &hash))
849 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
852 barrier = GNUNET_CONTAINER_multihashmap_get (barrier_map, &hash);
853 GNUNET_assert (NULL != barrier);
854 cancel_wrappers (barrier);
855 remove_barrier (barrier);
856 GNUNET_SERVER_receive_done (client, GNUNET_OK);
859 /* end of gnunet-service-testbed_barriers.c */