2 This file is part of GNUnet.
3 Copyright (C) 2008--2016 GNUnet e.V.
5 GNUnet is free software: you can redistribute it and/or modify it
6 under the terms of the GNU Affero General Public License as published
7 by the Free Software Foundation, either version 3 of the License,
8 or (at your option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details.
15 You should have received a copy of the GNU Affero General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
18 SPDX-License-Identifier: AGPL3.0-or-later
22 * @file testbed/gnunet-service-testbed_barriers.c
23 * @brief barrier handling at the testbed controller
24 * @author Sree Harsha Totakura <sreeharsha@totakura.in>
27 #include "gnunet-service-testbed.h"
28 #include "gnunet-service-testbed_barriers.h"
29 #include "testbed_api.h"
33 * timeout for outgoing message transmissions in seconds
35 #define MESSAGE_SEND_TIMEOUT(s) \
36 GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, s)
40 * Test to see if local peers have reached the required quorum of a barrier
42 #define LOCAL_QUORUM_REACHED(barrier) \
43 ((barrier->quorum * GST_num_local_peers) <= (barrier->nreached * 100))
53 #define LOG(kind,...) \
54 GNUNET_log_from (kind, "testbed-barriers", __VA_ARGS__)
64 * Context to be associated with each client
69 * The barrier this client is waiting for
71 struct Barrier *barrier;
76 struct ClientCtx *next;
81 struct ClientCtx *prev;
86 struct GNUNET_SERVICE_Client *client;
92 * Wrapper around Barrier handle
99 struct WBarrier *next;
104 struct WBarrier *prev;
107 * The local barrier associated with the creation of this wrapper
109 struct Barrier *barrier;
112 * Handle to the slave controller where this wrapper creates a barrier
114 struct GNUNET_TESTBED_Controller *controller;
117 * The barrier handle from API
119 struct GNUNET_TESTBED_Barrier *hbarrier;
122 * Has this barrier been crossed?
134 * The hashcode of the barrier name
136 struct GNUNET_HashCode hash;
139 * The client handle to the master controller
141 struct GNUNET_SERVICE_Client *mc;
144 * The name of the barrier
149 * DLL head for the list of clients waiting for this barrier
151 struct ClientCtx *head;
154 * DLL tail for the list of clients waiting for this barrier
156 struct ClientCtx *tail;
159 * DLL head for the list of barrier handles
161 struct WBarrier *whead;
164 * DLL tail for the list of barrier handles
166 struct WBarrier *wtail;
169 * Identifier for the timeout task
171 struct GNUNET_SCHEDULER_Task *tout_task;
174 * The status of this barrier
176 enum GNUNET_TESTBED_BarrierStatus status;
179 * Number of barriers wrapped in the above DLL
181 unsigned int num_wbarriers;
184 * Number of wrapped barriers reached so far
186 unsigned int num_wbarriers_reached;
189 * Number of wrapped barrier initialised so far
191 unsigned int num_wbarriers_inited;
194 * Number of peers which have reached this barrier
196 unsigned int nreached;
199 * Number of slaves we have initialised this barrier
201 unsigned int nslaves;
204 * Quorum percentage to be reached
212 * Hashtable handle for storing initialised barriers
214 static struct GNUNET_CONTAINER_MultiHashMap *barrier_map;
219 static struct GNUNET_SERVICE_Handle *ctx;
223 * Function to remove a barrier from the barrier map and cleanup resources
224 * occupied by a barrier
226 * @param barrier the barrier handle
229 remove_barrier (struct Barrier *barrier)
231 struct ClientCtx *ctx;
233 GNUNET_assert (GNUNET_YES ==
234 GNUNET_CONTAINER_multihashmap_remove (barrier_map,
237 while (NULL != (ctx = barrier->head))
239 GNUNET_CONTAINER_DLL_remove (barrier->head,
244 GNUNET_free (barrier->name);
245 GNUNET_free (barrier);
250 * Cancels all subcontroller barrier handles
252 * @param barrier the local barrier
255 cancel_wrappers (struct Barrier *barrier)
257 struct WBarrier *wrapper;
259 while (NULL != (wrapper = barrier->whead))
261 GNUNET_TESTBED_barrier_cancel (wrapper->hbarrier);
262 GNUNET_CONTAINER_DLL_remove (barrier->whead,
265 GNUNET_free (wrapper);
271 * Send a status message about a barrier to the given client
273 * @param client the client to send the message to
274 * @param name the barrier name
275 * @param status the status of the barrier
276 * @param emsg the error message; should be non-NULL for
277 * status=GNUNET_TESTBED_BARRIERSTATUS_ERROR
280 send_client_status_msg (struct GNUNET_SERVICE_Client *client,
282 enum GNUNET_TESTBED_BarrierStatus status,
285 struct GNUNET_MQ_Envelope *env;
286 struct GNUNET_TESTBED_BarrierStatusMsg *msg;
290 GNUNET_assert ( (NULL == emsg) ||
291 (GNUNET_TESTBED_BARRIERSTATUS_ERROR == status) );
292 name_len = strlen (name) + 1;
293 err_len = ((NULL == emsg) ? 0 : (strlen (emsg) + 1));
294 env = GNUNET_MQ_msg_extra (msg,
296 GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_STATUS);
297 msg->status = htons (status);
298 msg->name_len = htons ((uint16_t) name_len - 1);
299 GNUNET_memcpy (msg->data,
302 GNUNET_memcpy (msg->data + name_len,
305 GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (client),
311 * Sends a barrier failed message
313 * @param barrier the corresponding barrier
314 * @param emsg the error message; should be non-NULL for
315 * status=GNUNET_TESTBED_BARRIERSTATUS_ERROR
318 send_barrier_status_msg (struct Barrier *barrier,
321 GNUNET_assert (0 != barrier->status);
322 send_client_status_msg (barrier->mc,
330 * Check #GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_WAIT messages.
332 * @param cls identification of the client
333 * @param message the actual message
336 check_barrier_wait (void *cls,
337 const struct GNUNET_TESTBED_BarrierWait *msg)
339 return GNUNET_OK; /* always well-formed */
344 * Message handler for #GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_WAIT messages. This
345 * message should come from peers or a shared helper service using the
346 * testbed-barrier client API (@see gnunet_testbed_barrier_service.h)
348 * This handler is queued in the main service and will handle the messages sent
349 * either from the testbed driver or from a high level controller
351 * @param cls identification of the client
352 * @param message the actual message
355 handle_barrier_wait (void *cls,
356 const struct GNUNET_TESTBED_BarrierWait *msg)
358 struct ClientCtx *client_ctx = cls;
359 struct Barrier *barrier;
361 struct GNUNET_HashCode key;
365 msize = ntohs (msg->header.size);
366 if (NULL == barrier_map)
369 GNUNET_SERVICE_client_drop (client_ctx->client);
372 name_len = msize - sizeof (struct GNUNET_TESTBED_BarrierWait);
373 name = GNUNET_malloc (name_len + 1);
374 name[name_len] = '\0';
378 LOG_DEBUG ("Received BARRIER_WAIT for barrier `%s'\n",
380 GNUNET_CRYPTO_hash (name,
384 if (NULL == (barrier = GNUNET_CONTAINER_multihashmap_get (barrier_map, &key)))
387 GNUNET_SERVICE_client_drop (client_ctx->client);
390 if (NULL != client_ctx->barrier)
393 GNUNET_SERVICE_client_drop (client_ctx->client);
396 client_ctx->barrier = barrier;
397 GNUNET_CONTAINER_DLL_insert_tail (barrier->head,
401 if ( (barrier->num_wbarriers_reached == barrier->num_wbarriers) &&
402 (LOCAL_QUORUM_REACHED (barrier)) )
404 barrier->status = GNUNET_TESTBED_BARRIERSTATUS_CROSSED;
405 send_barrier_status_msg (barrier,
408 GNUNET_SERVICE_client_continue (client_ctx->client);
413 * Function called when a client connects to the testbed-barrier service.
416 * @param client the connecting client
417 * @param mq queue to talk to @a client
418 * @return our `struct ClientCtx`
421 connect_cb (void *cls,
422 struct GNUNET_SERVICE_Client *client,
423 struct GNUNET_MQ_Handle *mq)
425 struct ClientCtx *client_ctx;
427 LOG_DEBUG ("Client connected to testbed-barrier service\n");
428 client_ctx = GNUNET_new (struct ClientCtx);
429 client_ctx->client = client;
435 * Functions with this signature are called whenever a client
436 * is disconnected on the network level.
439 * @param client identification of the client; NULL
440 * for the last call when the server is destroyed
443 disconnect_cb (void *cls,
444 struct GNUNET_SERVICE_Client *client,
447 struct ClientCtx *client_ctx = app_ctx;
448 struct Barrier *barrier = client_ctx->barrier;
452 GNUNET_CONTAINER_DLL_remove (barrier->head,
455 client_ctx->barrier = NULL;
457 GNUNET_free (client_ctx);
458 LOG_DEBUG ("Client disconnected from testbed-barrier service\n");
463 * Function to initialise barrriers component
465 * @param cfg the configuration to use for initialisation
468 GST_barriers_init (struct GNUNET_CONFIGURATION_Handle *cfg)
470 struct GNUNET_MQ_MessageHandler message_handlers[] = {
471 GNUNET_MQ_hd_var_size (barrier_wait,
472 GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_WAIT,
473 struct GNUNET_TESTBED_BarrierWait,
475 GNUNET_MQ_handler_end ()
478 LOG_DEBUG ("Launching testbed-barrier service\n");
479 barrier_map = GNUNET_CONTAINER_multihashmap_create (3,
481 ctx = GNUNET_SERVICE_start ("testbed-barrier",
491 * Iterator over hash map entries.
494 * @param key current key code
495 * @param value value in the hash map
496 * @return #GNUNET_YES if we should continue to
501 barrier_destroy_iterator (void *cls,
502 const struct GNUNET_HashCode *key,
505 struct Barrier *barrier = value;
507 GNUNET_assert (NULL != barrier);
508 cancel_wrappers (barrier);
509 remove_barrier (barrier);
515 * Function to stop the barrier service
518 GST_barriers_destroy ()
520 GNUNET_assert (NULL != barrier_map);
521 GNUNET_assert (GNUNET_SYSERR !=
522 GNUNET_CONTAINER_multihashmap_iterate (barrier_map,
523 &barrier_destroy_iterator,
525 GNUNET_CONTAINER_multihashmap_destroy (barrier_map);
526 GNUNET_assert (NULL != ctx);
527 GNUNET_SERVICE_stop (ctx);
532 * Functions of this type are to be given as callback argument to
533 * GNUNET_TESTBED_barrier_init(). The callback will be called when status
534 * information is available for the barrier.
536 * @param cls the closure given to GNUNET_TESTBED_barrier_init()
537 * @param name the name of the barrier
538 * @param b_ the barrier handle
539 * @param status status of the barrier; #GNUNET_OK if the barrier is crossed;
540 * #GNUNET_SYSERR upon error
541 * @param emsg if the status were to be #GNUNET_SYSERR, this parameter has the
545 wbarrier_status_cb (void *cls,
547 struct GNUNET_TESTBED_Barrier *b_,
548 enum GNUNET_TESTBED_BarrierStatus status,
551 struct WBarrier *wrapper = cls;
552 struct Barrier *barrier = wrapper->barrier;
554 GNUNET_assert (b_ == wrapper->hbarrier);
557 case GNUNET_TESTBED_BARRIERSTATUS_ERROR:
558 LOG (GNUNET_ERROR_TYPE_ERROR,
559 "Initialising barrier `%s' failed at a sub-controller: %s\n",
561 (NULL != emsg) ? emsg : "NULL");
562 cancel_wrappers (barrier);
564 emsg = "Initialisation failed at a sub-controller";
565 barrier->status = GNUNET_TESTBED_BARRIERSTATUS_ERROR;
566 send_barrier_status_msg (barrier, emsg);
568 case GNUNET_TESTBED_BARRIERSTATUS_CROSSED:
569 if (GNUNET_TESTBED_BARRIERSTATUS_INITIALISED != barrier->status)
574 barrier->num_wbarriers_reached++;
575 if ((barrier->num_wbarriers_reached == barrier->num_wbarriers)
576 && (LOCAL_QUORUM_REACHED (barrier)))
578 barrier->status = GNUNET_TESTBED_BARRIERSTATUS_CROSSED;
579 send_barrier_status_msg (barrier, NULL);
582 case GNUNET_TESTBED_BARRIERSTATUS_INITIALISED:
583 if (0 != barrier->status)
588 barrier->num_wbarriers_inited++;
589 if (barrier->num_wbarriers_inited == barrier->num_wbarriers)
591 barrier->status = GNUNET_TESTBED_BARRIERSTATUS_INITIALISED;
592 send_barrier_status_msg (barrier, NULL);
600 * Function called upon timeout while waiting for a response from the
601 * subcontrollers to barrier init message
606 fwd_tout_barrier_init (void *cls)
608 struct Barrier *barrier = cls;
610 cancel_wrappers (barrier);
611 barrier->status = GNUNET_TESTBED_BARRIERSTATUS_ERROR;
612 send_barrier_status_msg (barrier,
613 "Timedout while propagating barrier initialisation\n");
614 remove_barrier (barrier);
620 * Check #GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_INIT messages.
622 * @param cls identification of the client
623 * @param msg the actual message
624 * @return #GNUNET_OK if @a msg is well-formed
627 check_barrier_init (void *cls,
628 const struct GNUNET_TESTBED_BarrierInit *msg)
630 return GNUNET_OK; /* always well-formed */
635 * Message handler for #GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_INIT messages. This
636 * message should always come from a parent controller or the testbed API if we
637 * are the root controller.
639 * This handler is queued in the main service and will handle the messages sent
640 * either from the testbed driver or from a high level controller
642 * @param cls identification of the client
643 * @param msg the actual message
646 handle_barrier_init (void *cls,
647 const struct GNUNET_TESTBED_BarrierInit *msg)
649 struct GNUNET_SERVICE_Client *client = cls;
651 struct Barrier *barrier;
653 struct WBarrier *wrapper;
654 struct GNUNET_HashCode hash;
659 if (NULL == GST_context)
662 GNUNET_SERVICE_client_drop (client);
665 if (client != GST_context->client)
668 GNUNET_SERVICE_client_drop (client);
671 msize = ntohs (msg->header.size);
672 name_len = (size_t) msize - sizeof (struct GNUNET_TESTBED_BarrierInit);
673 name = GNUNET_malloc (name_len + 1);
674 GNUNET_memcpy (name, msg->name, name_len);
675 GNUNET_CRYPTO_hash (name, name_len, &hash);
676 LOG_DEBUG ("Received BARRIER_INIT for barrier `%s'\n",
679 GNUNET_CONTAINER_multihashmap_contains (barrier_map,
682 send_client_status_msg (client,
684 GNUNET_TESTBED_BARRIERSTATUS_ERROR,
685 "A barrier with the same name already exists");
687 GNUNET_SERVICE_client_continue (client);
690 barrier = GNUNET_new (struct Barrier);
691 barrier->hash = hash;
692 barrier->quorum = msg->quorum;
693 barrier->name = name;
694 barrier->mc = client;
695 GNUNET_assert (GNUNET_OK ==
696 GNUNET_CONTAINER_multihashmap_put (barrier_map,
699 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST));
700 GNUNET_SERVICE_client_continue (client);
701 /* Propagate barrier init to subcontrollers */
702 for (cnt = 0; cnt < GST_slave_list_size; cnt++)
704 if (NULL == (slave = GST_slave_list[cnt]))
706 if (NULL == slave->controller)
708 GNUNET_break (0);/* May happen when we are connecting to the controller */
711 wrapper = GNUNET_new (struct WBarrier);
712 wrapper->barrier = barrier;
713 wrapper->controller = slave->controller;
714 GNUNET_CONTAINER_DLL_insert_tail (barrier->whead,
717 barrier->num_wbarriers++;
718 wrapper->hbarrier = GNUNET_TESTBED_barrier_init_ (wrapper->controller,
725 if (NULL == barrier->whead) /* No further propagation */
727 barrier->status = GNUNET_TESTBED_BARRIERSTATUS_INITIALISED;
728 LOG_DEBUG ("Sending GNUNET_TESTBED_BARRIERSTATUS_INITIALISED for barrier `%s'\n",
730 send_barrier_status_msg (barrier, NULL);
732 barrier->tout_task = GNUNET_SCHEDULER_add_delayed (MESSAGE_SEND_TIMEOUT (30),
733 &fwd_tout_barrier_init,
739 * Check #GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_CANCEL messages.
741 * @param cls identification of the client
742 * @param msg the actual message
743 * @return #GNUNET_OK if @a msg is well-formed
746 check_barrier_cancel (void *cls,
747 const struct GNUNET_TESTBED_BarrierCancel *msg)
749 return GNUNET_OK; /* all are well-formed */
754 * Message handler for #GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_CANCEL messages. This
755 * message should always come from a parent controller or the testbed API if we
756 * are the root controller.
758 * This handler is queued in the main service and will handle the messages sent
759 * either from the testbed driver or from a high level controller
761 * @param cls identification of the client
762 * @param msg the actual message
765 handle_barrier_cancel (void *cls,
766 const struct GNUNET_TESTBED_BarrierCancel *msg)
768 struct GNUNET_SERVICE_Client *client = cls;
770 struct Barrier *barrier;
771 struct GNUNET_HashCode hash;
775 if (NULL == GST_context)
778 GNUNET_SERVICE_client_drop (client);
781 if (client != GST_context->client)
784 GNUNET_SERVICE_client_drop (client);
787 msize = ntohs (msg->header.size);
788 name_len = msize - sizeof (struct GNUNET_TESTBED_BarrierCancel);
789 name = GNUNET_malloc (name_len + 1);
793 LOG_DEBUG ("Received BARRIER_CANCEL for barrier `%s'\n",
795 GNUNET_CRYPTO_hash (name,
799 GNUNET_CONTAINER_multihashmap_contains (barrier_map,
803 GNUNET_SERVICE_client_drop (client);
806 barrier = GNUNET_CONTAINER_multihashmap_get (barrier_map,
808 GNUNET_assert (NULL != barrier);
809 cancel_wrappers (barrier);
810 remove_barrier (barrier);
811 GNUNET_SERVICE_client_continue (client);
816 * Check #GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_STATUS messages.
818 * @param cls identification of the client
819 * @param msg the actual message
820 * @return #GNUNET_OK if @a msg is well-formed
823 check_barrier_status (void *cls,
824 const struct GNUNET_TESTBED_BarrierStatusMsg *msg)
829 enum GNUNET_TESTBED_BarrierStatus status;
831 msize = ntohs (msg->header.size) - sizeof (*msg);
832 status = ntohs (msg->status);
833 if (GNUNET_TESTBED_BARRIERSTATUS_CROSSED != status)
835 GNUNET_break_op (0); /* current we only expect BARRIER_CROSSED
836 status message this way */
837 return GNUNET_SYSERR;
840 name_len = ntohs (msg->name_len);
841 if ((name_len + 1) != msize)
844 return GNUNET_SYSERR;
846 if ('\0' != name[name_len])
849 return GNUNET_SYSERR;
856 * Message handler for #GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_STATUS messages.
857 * This handler is queued in the main service and will handle the messages sent
858 * either from the testbed driver or from a high level controller
860 * @param cls identification of the client
861 * @param msg the actual message
864 handle_barrier_status (void *cls,
865 const struct GNUNET_TESTBED_BarrierStatusMsg *msg)
867 struct GNUNET_SERVICE_Client *client = cls;
868 struct Barrier *barrier;
869 struct ClientCtx *client_ctx;
870 struct WBarrier *wrapper;
872 struct GNUNET_HashCode key;
874 struct GNUNET_MQ_Envelope *env;
876 if (NULL == GST_context)
879 GNUNET_SERVICE_client_drop (client);
882 if (client != GST_context->client)
885 GNUNET_SERVICE_client_drop (client);
889 name_len = ntohs (msg->name_len);
890 LOG_DEBUG ("Received BARRIER_STATUS for barrier `%s'\n",
892 GNUNET_CRYPTO_hash (name,
895 barrier = GNUNET_CONTAINER_multihashmap_get (barrier_map,
900 GNUNET_SERVICE_client_drop (client);
903 GNUNET_SERVICE_client_continue (client);
904 for(client_ctx = barrier->head; NULL != client_ctx; client_ctx = client_ctx->next) /* Notify peers */
906 env = GNUNET_MQ_msg_copy (&msg->header);
907 GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (client_ctx->client),
911 * The wrapper barriers do not echo the barrier status, so we have to do it
914 for (wrapper = barrier->whead; NULL != wrapper; wrapper = wrapper->next)
916 GNUNET_TESTBED_queue_message_ (wrapper->controller,
917 GNUNET_copy_message (&msg->header));
921 /* end of gnunet-service-testbed_barriers.c */