2 This file is part of GNUnet.
3 Copyright (C) 2008--2016 GNUnet e.V.
5 GNUnet is free software: you can redistribute it and/or modify it
6 under the terms of the GNU Affero General Public License as published
7 by the Free Software Foundation, either version 3 of the License,
8 or (at your option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details.
15 You should have received a copy of the GNU Affero General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
20 * @file testbed/gnunet-service-testbed_barriers.c
21 * @brief barrier handling at the testbed controller
22 * @author Sree Harsha Totakura <sreeharsha@totakura.in>
25 #include "gnunet-service-testbed.h"
26 #include "gnunet-service-testbed_barriers.h"
27 #include "testbed_api.h"
/**
 * Timeout for outgoing message transmissions, given in seconds.
 * Expands to GNUNET_TIME_UNIT_SECONDS multiplied by @a s.
 *
 * @param s the number of seconds
 */
#define MESSAGE_SEND_TIMEOUT(s) \
  GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, s)
/**
 * Test to see if local peers have reached the required quorum of a barrier.
 *
 * The quorum is treated as a percentage: the test holds when
 * `quorum * GST_num_local_peers <= nreached * 100`.
 *
 * The argument is parenthesized so that any pointer expression (for example
 * `&some_barrier`) can be passed safely.
 *
 * @param barrier pointer to the barrier to test; evaluated twice
 */
#define LOCAL_QUORUM_REACHED(barrier) \
  (((barrier)->quorum * GST_num_local_peers) <= ((barrier)->nreached * 100))
/**
 * Logging shorthand: logs via GNUNET_log_from() under the
 * "testbed-barriers" component name.
 *
 * @param kind the log level
 */
#define LOG(kind,...) \
  GNUNET_log_from (kind, "testbed-barriers", __VA_ARGS__)
/**
 * Context to be associated with each client of the testbed-barrier
 * service.  One instance is allocated in connect_cb() and released in
 * disconnect_cb().
 */
struct ClientCtx
{
  /**
   * The barrier this client is waiting for; NULL while the client is not
   * queued in any barrier's waiting list
   */
  struct Barrier *barrier;

  /**
   * DLL next ptr (list anchored at `Barrier::head` / `Barrier::tail`)
   */
  struct ClientCtx *next;

  /**
   * DLL prev ptr (list anchored at `Barrier::head` / `Barrier::tail`)
   */
  struct ClientCtx *prev;

  /**
   * The client handle
   */
  struct GNUNET_SERVICE_Client *client;
};
/**
 * Wrapper around Barrier handle.  One wrapper is created per slave
 * controller to which a barrier initialisation is propagated.
 */
struct WBarrier
{
  /**
   * DLL next pointer (list anchored at `Barrier::whead` / `Barrier::wtail`)
   */
  struct WBarrier *next;

  /**
   * DLL prev pointer (list anchored at `Barrier::whead` / `Barrier::wtail`)
   */
  struct WBarrier *prev;

  /**
   * The local barrier associated with the creation of this wrapper
   */
  struct Barrier *barrier;

  /**
   * Handle to the slave controller where this wrapper creates a barrier
   */
  struct GNUNET_TESTBED_Controller *controller;

  /**
   * The barrier handle from API
   */
  struct GNUNET_TESTBED_Barrier *hbarrier;

  /**
   * Has this barrier been crossed?
   */
  uint8_t reached;
};
/**
 * Barrier
 */
struct Barrier
{
  /**
   * The hashcode of the barrier name; used as the key in the barrier map
   */
  struct GNUNET_HashCode hash;

  /**
   * The client handle to the master controller
   */
  struct GNUNET_SERVICE_Client *mc;

  /**
   * The name of the barrier (heap allocated, released in remove_barrier())
   */
  char *name;

  /**
   * DLL head for the list of clients waiting for this barrier
   */
  struct ClientCtx *head;

  /**
   * DLL tail for the list of clients waiting for this barrier
   */
  struct ClientCtx *tail;

  /**
   * DLL head for the list of barrier handles
   */
  struct WBarrier *whead;

  /**
   * DLL tail for the list of barrier handles
   */
  struct WBarrier *wtail;

  /**
   * Identifier for the timeout task
   */
  struct GNUNET_SCHEDULER_Task *tout_task;

  /**
   * The status of this barrier; 0 until the barrier has been initialised
   */
  enum GNUNET_TESTBED_BarrierStatus status;

  /**
   * Number of barriers wrapped in the above DLL
   */
  unsigned int num_wbarriers;

  /**
   * Number of wrapped barriers reached so far
   */
  unsigned int num_wbarriers_reached;

  /**
   * Number of wrapped barrier initialised so far
   */
  unsigned int num_wbarriers_inited;

  /**
   * Number of peers which have reached this barrier
   */
  unsigned int nreached;

  /**
   * Number of slaves we have initialised this barrier
   */
  unsigned int nslaves;

  /**
   * Quorum percentage to be reached
   */
  uint8_t quorum;
};
/**
 * Hashtable handle for storing initialised barriers; keyed by the hash of
 * the barrier name
 */
static struct GNUNET_CONTAINER_MultiHashMap *barrier_map;

/**
 * Service context: handle of the "testbed-barrier" service started in
 * GST_barriers_init()
 */
static struct GNUNET_SERVICE_Handle *ctx;
221 * Function to remove a barrier from the barrier map and cleanup resources
222 * occupied by a barrier
224 * @param barrier the barrier handle
227 remove_barrier (struct Barrier *barrier)
229 struct ClientCtx *ctx;
231 GNUNET_assert (GNUNET_YES ==
232 GNUNET_CONTAINER_multihashmap_remove (barrier_map,
235 while (NULL != (ctx = barrier->head))
237 GNUNET_CONTAINER_DLL_remove (barrier->head,
242 GNUNET_free (barrier->name);
243 GNUNET_free (barrier);
248 * Cancels all subcontroller barrier handles
250 * @param barrier the local barrier
253 cancel_wrappers (struct Barrier *barrier)
255 struct WBarrier *wrapper;
257 while (NULL != (wrapper = barrier->whead))
259 GNUNET_TESTBED_barrier_cancel (wrapper->hbarrier);
260 GNUNET_CONTAINER_DLL_remove (barrier->whead,
263 GNUNET_free (wrapper);
269 * Send a status message about a barrier to the given client
271 * @param client the client to send the message to
272 * @param name the barrier name
273 * @param status the status of the barrier
274 * @param emsg the error message; should be non-NULL for
275 * status=GNUNET_TESTBED_BARRIERSTATUS_ERROR
278 send_client_status_msg (struct GNUNET_SERVICE_Client *client,
280 enum GNUNET_TESTBED_BarrierStatus status,
283 struct GNUNET_MQ_Envelope *env;
284 struct GNUNET_TESTBED_BarrierStatusMsg *msg;
288 GNUNET_assert ( (NULL == emsg) ||
289 (GNUNET_TESTBED_BARRIERSTATUS_ERROR == status) );
290 name_len = strlen (name) + 1;
291 err_len = ((NULL == emsg) ? 0 : (strlen (emsg) + 1));
292 env = GNUNET_MQ_msg_extra (msg,
294 GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_STATUS);
295 msg->status = htons (status);
296 msg->name_len = htons ((uint16_t) name_len - 1);
297 GNUNET_memcpy (msg->data,
300 GNUNET_memcpy (msg->data + name_len,
303 GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (client),
309 * Sends a barrier failed message
311 * @param barrier the corresponding barrier
312 * @param emsg the error message; should be non-NULL for
313 * status=GNUNET_TESTBED_BARRIERSTATUS_ERROR
316 send_barrier_status_msg (struct Barrier *barrier,
319 GNUNET_assert (0 != barrier->status);
320 send_client_status_msg (barrier->mc,
328 * Check #GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_WAIT messages.
330 * @param cls identification of the client
331 * @param message the actual message
334 check_barrier_wait (void *cls,
335 const struct GNUNET_TESTBED_BarrierWait *msg)
337 return GNUNET_OK; /* always well-formed */
342 * Message handler for #GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_WAIT messages. This
343 * message should come from peers or a shared helper service using the
344 * testbed-barrier client API (@see gnunet_testbed_barrier_service.h)
346 * This handler is queued in the main service and will handle the messages sent
347 * either from the testbed driver or from a high level controller
349 * @param cls identification of the client
350 * @param message the actual message
353 handle_barrier_wait (void *cls,
354 const struct GNUNET_TESTBED_BarrierWait *msg)
356 struct ClientCtx *client_ctx = cls;
357 struct Barrier *barrier;
359 struct GNUNET_HashCode key;
363 msize = ntohs (msg->header.size);
364 if (NULL == barrier_map)
367 GNUNET_SERVICE_client_drop (client_ctx->client);
370 name_len = msize - sizeof (struct GNUNET_TESTBED_BarrierWait);
371 name = GNUNET_malloc (name_len + 1);
372 name[name_len] = '\0';
376 LOG_DEBUG ("Received BARRIER_WAIT for barrier `%s'\n",
378 GNUNET_CRYPTO_hash (name,
382 if (NULL == (barrier = GNUNET_CONTAINER_multihashmap_get (barrier_map, &key)))
385 GNUNET_SERVICE_client_drop (client_ctx->client);
388 if (NULL != client_ctx->barrier)
391 GNUNET_SERVICE_client_drop (client_ctx->client);
394 client_ctx->barrier = barrier;
395 GNUNET_CONTAINER_DLL_insert_tail (barrier->head,
399 if ( (barrier->num_wbarriers_reached == barrier->num_wbarriers) &&
400 (LOCAL_QUORUM_REACHED (barrier)) )
402 barrier->status = GNUNET_TESTBED_BARRIERSTATUS_CROSSED;
403 send_barrier_status_msg (barrier,
406 GNUNET_SERVICE_client_continue (client_ctx->client);
411 * Function called when a client connects to the testbed-barrier service.
414 * @param client the connecting client
415 * @param mq queue to talk to @a client
416 * @return our `struct ClientCtx`
419 connect_cb (void *cls,
420 struct GNUNET_SERVICE_Client *client,
421 struct GNUNET_MQ_Handle *mq)
423 struct ClientCtx *client_ctx;
425 LOG_DEBUG ("Client connected to testbed-barrier service\n");
426 client_ctx = GNUNET_new (struct ClientCtx);
427 client_ctx->client = client;
433 * Functions with this signature are called whenever a client
434 * is disconnected on the network level.
437 * @param client identification of the client; NULL
438 * for the last call when the server is destroyed
441 disconnect_cb (void *cls,
442 struct GNUNET_SERVICE_Client *client,
445 struct ClientCtx *client_ctx = app_ctx;
446 struct Barrier *barrier = client_ctx->barrier;
450 GNUNET_CONTAINER_DLL_remove (barrier->head,
453 client_ctx->barrier = NULL;
455 GNUNET_free (client_ctx);
456 LOG_DEBUG ("Client disconnected from testbed-barrier service\n");
461 * Function to initialise barrriers component
463 * @param cfg the configuration to use for initialisation
466 GST_barriers_init (struct GNUNET_CONFIGURATION_Handle *cfg)
468 struct GNUNET_MQ_MessageHandler message_handlers[] = {
469 GNUNET_MQ_hd_var_size (barrier_wait,
470 GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_WAIT,
471 struct GNUNET_TESTBED_BarrierWait,
473 GNUNET_MQ_handler_end ()
476 LOG_DEBUG ("Launching testbed-barrier service\n");
477 barrier_map = GNUNET_CONTAINER_multihashmap_create (3,
479 ctx = GNUNET_SERVICE_start ("testbed-barrier",
489 * Iterator over hash map entries.
492 * @param key current key code
493 * @param value value in the hash map
494 * @return #GNUNET_YES if we should continue to
499 barrier_destroy_iterator (void *cls,
500 const struct GNUNET_HashCode *key,
503 struct Barrier *barrier = value;
505 GNUNET_assert (NULL != barrier);
506 cancel_wrappers (barrier);
507 remove_barrier (barrier);
513 * Function to stop the barrier service
516 GST_barriers_destroy ()
518 GNUNET_assert (NULL != barrier_map);
519 GNUNET_assert (GNUNET_SYSERR !=
520 GNUNET_CONTAINER_multihashmap_iterate (barrier_map,
521 &barrier_destroy_iterator,
523 GNUNET_CONTAINER_multihashmap_destroy (barrier_map);
524 GNUNET_assert (NULL != ctx);
525 GNUNET_SERVICE_stop (ctx);
530 * Functions of this type are to be given as callback argument to
531 * GNUNET_TESTBED_barrier_init(). The callback will be called when status
532 * information is available for the barrier.
534 * @param cls the closure given to GNUNET_TESTBED_barrier_init()
535 * @param name the name of the barrier
536 * @param b_ the barrier handle
537 * @param status status of the barrier; #GNUNET_OK if the barrier is crossed;
538 * #GNUNET_SYSERR upon error
539 * @param emsg if the status were to be #GNUNET_SYSERR, this parameter has the
543 wbarrier_status_cb (void *cls,
545 struct GNUNET_TESTBED_Barrier *b_,
546 enum GNUNET_TESTBED_BarrierStatus status,
549 struct WBarrier *wrapper = cls;
550 struct Barrier *barrier = wrapper->barrier;
552 GNUNET_assert (b_ == wrapper->hbarrier);
555 case GNUNET_TESTBED_BARRIERSTATUS_ERROR:
556 LOG (GNUNET_ERROR_TYPE_ERROR,
557 "Initialising barrier `%s' failed at a sub-controller: %s\n",
559 (NULL != emsg) ? emsg : "NULL");
560 cancel_wrappers (barrier);
562 emsg = "Initialisation failed at a sub-controller";
563 barrier->status = GNUNET_TESTBED_BARRIERSTATUS_ERROR;
564 send_barrier_status_msg (barrier, emsg);
566 case GNUNET_TESTBED_BARRIERSTATUS_CROSSED:
567 if (GNUNET_TESTBED_BARRIERSTATUS_INITIALISED != barrier->status)
572 barrier->num_wbarriers_reached++;
573 if ((barrier->num_wbarriers_reached == barrier->num_wbarriers)
574 && (LOCAL_QUORUM_REACHED (barrier)))
576 barrier->status = GNUNET_TESTBED_BARRIERSTATUS_CROSSED;
577 send_barrier_status_msg (barrier, NULL);
580 case GNUNET_TESTBED_BARRIERSTATUS_INITIALISED:
581 if (0 != barrier->status)
586 barrier->num_wbarriers_inited++;
587 if (barrier->num_wbarriers_inited == barrier->num_wbarriers)
589 barrier->status = GNUNET_TESTBED_BARRIERSTATUS_INITIALISED;
590 send_barrier_status_msg (barrier, NULL);
598 * Function called upon timeout while waiting for a response from the
599 * subcontrollers to barrier init message
604 fwd_tout_barrier_init (void *cls)
606 struct Barrier *barrier = cls;
608 cancel_wrappers (barrier);
609 barrier->status = GNUNET_TESTBED_BARRIERSTATUS_ERROR;
610 send_barrier_status_msg (barrier,
611 "Timedout while propagating barrier initialisation\n");
612 remove_barrier (barrier);
618 * Check #GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_INIT messages.
620 * @param cls identification of the client
621 * @param msg the actual message
622 * @return #GNUNET_OK if @a msg is well-formed
625 check_barrier_init (void *cls,
626 const struct GNUNET_TESTBED_BarrierInit *msg)
628 return GNUNET_OK; /* always well-formed */
633 * Message handler for #GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_INIT messages. This
634 * message should always come from a parent controller or the testbed API if we
635 * are the root controller.
637 * This handler is queued in the main service and will handle the messages sent
638 * either from the testbed driver or from a high level controller
640 * @param cls identification of the client
641 * @param msg the actual message
644 handle_barrier_init (void *cls,
645 const struct GNUNET_TESTBED_BarrierInit *msg)
647 struct GNUNET_SERVICE_Client *client = cls;
649 struct Barrier *barrier;
651 struct WBarrier *wrapper;
652 struct GNUNET_HashCode hash;
657 if (NULL == GST_context)
660 GNUNET_SERVICE_client_drop (client);
663 if (client != GST_context->client)
666 GNUNET_SERVICE_client_drop (client);
669 msize = ntohs (msg->header.size);
670 name_len = (size_t) msize - sizeof (struct GNUNET_TESTBED_BarrierInit);
671 name = GNUNET_malloc (name_len + 1);
672 GNUNET_memcpy (name, msg->name, name_len);
673 GNUNET_CRYPTO_hash (name, name_len, &hash);
674 LOG_DEBUG ("Received BARRIER_INIT for barrier `%s'\n",
677 GNUNET_CONTAINER_multihashmap_contains (barrier_map,
680 send_client_status_msg (client,
682 GNUNET_TESTBED_BARRIERSTATUS_ERROR,
683 "A barrier with the same name already exists");
685 GNUNET_SERVICE_client_continue (client);
688 barrier = GNUNET_new (struct Barrier);
689 barrier->hash = hash;
690 barrier->quorum = msg->quorum;
691 barrier->name = name;
692 barrier->mc = client;
693 GNUNET_assert (GNUNET_OK ==
694 GNUNET_CONTAINER_multihashmap_put (barrier_map,
697 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST));
698 GNUNET_SERVICE_client_continue (client);
699 /* Propagate barrier init to subcontrollers */
700 for (cnt = 0; cnt < GST_slave_list_size; cnt++)
702 if (NULL == (slave = GST_slave_list[cnt]))
704 if (NULL == slave->controller)
706 GNUNET_break (0);/* May happen when we are connecting to the controller */
709 wrapper = GNUNET_new (struct WBarrier);
710 wrapper->barrier = barrier;
711 wrapper->controller = slave->controller;
712 GNUNET_CONTAINER_DLL_insert_tail (barrier->whead,
715 barrier->num_wbarriers++;
716 wrapper->hbarrier = GNUNET_TESTBED_barrier_init_ (wrapper->controller,
723 if (NULL == barrier->whead) /* No further propagation */
725 barrier->status = GNUNET_TESTBED_BARRIERSTATUS_INITIALISED;
726 LOG_DEBUG ("Sending GNUNET_TESTBED_BARRIERSTATUS_INITIALISED for barrier `%s'\n",
728 send_barrier_status_msg (barrier, NULL);
730 barrier->tout_task = GNUNET_SCHEDULER_add_delayed (MESSAGE_SEND_TIMEOUT (30),
731 &fwd_tout_barrier_init,
737 * Check #GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_CANCEL messages.
739 * @param cls identification of the client
740 * @param msg the actual message
741 * @return #GNUNET_OK if @a msg is well-formed
744 check_barrier_cancel (void *cls,
745 const struct GNUNET_TESTBED_BarrierCancel *msg)
747 return GNUNET_OK; /* all are well-formed */
752 * Message handler for #GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_CANCEL messages. This
753 * message should always come from a parent controller or the testbed API if we
754 * are the root controller.
756 * This handler is queued in the main service and will handle the messages sent
757 * either from the testbed driver or from a high level controller
759 * @param cls identification of the client
760 * @param msg the actual message
763 handle_barrier_cancel (void *cls,
764 const struct GNUNET_TESTBED_BarrierCancel *msg)
766 struct GNUNET_SERVICE_Client *client = cls;
768 struct Barrier *barrier;
769 struct GNUNET_HashCode hash;
773 if (NULL == GST_context)
776 GNUNET_SERVICE_client_drop (client);
779 if (client != GST_context->client)
782 GNUNET_SERVICE_client_drop (client);
785 msize = ntohs (msg->header.size);
786 name_len = msize - sizeof (struct GNUNET_TESTBED_BarrierCancel);
787 name = GNUNET_malloc (name_len + 1);
791 LOG_DEBUG ("Received BARRIER_CANCEL for barrier `%s'\n",
793 GNUNET_CRYPTO_hash (name,
797 GNUNET_CONTAINER_multihashmap_contains (barrier_map,
801 GNUNET_SERVICE_client_drop (client);
804 barrier = GNUNET_CONTAINER_multihashmap_get (barrier_map,
806 GNUNET_assert (NULL != barrier);
807 cancel_wrappers (barrier);
808 remove_barrier (barrier);
809 GNUNET_SERVICE_client_continue (client);
814 * Check #GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_STATUS messages.
816 * @param cls identification of the client
817 * @param msg the actual message
818 * @return #GNUNET_OK if @a msg is well-formed
821 check_barrier_status (void *cls,
822 const struct GNUNET_TESTBED_BarrierStatusMsg *msg)
827 enum GNUNET_TESTBED_BarrierStatus status;
829 msize = ntohs (msg->header.size) - sizeof (*msg);
830 status = ntohs (msg->status);
831 if (GNUNET_TESTBED_BARRIERSTATUS_CROSSED != status)
833 GNUNET_break_op (0); /* current we only expect BARRIER_CROSSED
834 status message this way */
835 return GNUNET_SYSERR;
838 name_len = ntohs (msg->name_len);
839 if ((name_len + 1) != msize)
842 return GNUNET_SYSERR;
844 if ('\0' != name[name_len])
847 return GNUNET_SYSERR;
854 * Message handler for #GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_STATUS messages.
855 * This handler is queued in the main service and will handle the messages sent
856 * either from the testbed driver or from a high level controller
858 * @param cls identification of the client
859 * @param msg the actual message
862 handle_barrier_status (void *cls,
863 const struct GNUNET_TESTBED_BarrierStatusMsg *msg)
865 struct GNUNET_SERVICE_Client *client = cls;
866 struct Barrier *barrier;
867 struct ClientCtx *client_ctx;
868 struct WBarrier *wrapper;
870 struct GNUNET_HashCode key;
872 struct GNUNET_MQ_Envelope *env;
874 if (NULL == GST_context)
877 GNUNET_SERVICE_client_drop (client);
880 if (client != GST_context->client)
883 GNUNET_SERVICE_client_drop (client);
887 name_len = ntohs (msg->name_len);
888 LOG_DEBUG ("Received BARRIER_STATUS for barrier `%s'\n",
890 GNUNET_CRYPTO_hash (name,
893 barrier = GNUNET_CONTAINER_multihashmap_get (barrier_map,
898 GNUNET_SERVICE_client_drop (client);
901 GNUNET_SERVICE_client_continue (client);
902 for(client_ctx = barrier->head; NULL != client_ctx; client_ctx = client_ctx->next) /* Notify peers */
904 env = GNUNET_MQ_msg_copy (&msg->header);
905 GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (client_ctx->client),
909 * The wrapper barriers do not echo the barrier status, so we have to do it
912 for (wrapper = barrier->whead; NULL != wrapper; wrapper = wrapper->next)
914 GNUNET_TESTBED_queue_message_ (wrapper->controller,
915 GNUNET_copy_message (&msg->header));
919 /* end of gnunet-service-testbed_barriers.c */