2 This file is part of GNUnet.
3 (C) 2008--2013 Christian Grothoff (and other contributing authors)
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your
8 option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA.
22 * @file testbed/gnunet-service-testbed_barriers.c
23 * @brief barrier handling at the testbed controller
24 * @author Sree Harsha Totakura <sreeharsha@totakura.in>
27 #include "gnunet-service-testbed.h"
28 #include "gnunet-service-testbed_barriers.h"
32 * timeout for outgoing message transmissions in seconds
34 #define MESSAGE_SEND_TIMEOUT(s) \
35 GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, s)
39 * Test to see if local peers have reached the required quorum of a barrier
41 #define LOCAL_QUORUM_REACHED(barrier) \
42 ((barrier->quorum * GST_num_local_peers) <= (barrier->nreached * 100))
52 * Message queue for transmitting messages
57 * next pointer for DLL
59 struct MessageQueue *next;
62 * prev pointer for DLL
64 struct MessageQueue *prev;
67 * The message to be sent
69 struct GNUNET_MessageHeader *msg;
74 * Context to be associated with each client
79 * The barrier this client is waiting for
81 struct Barrier *barrier;
86 struct ClientCtx *next;
91 struct ClientCtx *prev;
96 struct GNUNET_SERVER_Client *client;
99 * the transmission handle
101 struct GNUNET_SERVER_TransmitHandle *tx;
106 struct MessageQueue *mq_head;
111 struct MessageQueue *mq_tail;
116 * Wrapper around Barrier handle
123 struct WBarrier *next;
128 struct WBarrier *prev;
131 * The local barrier associated with the creation of this wrapper
133 struct Barrier *barrier;
136 * The barrier handle from API
138 struct GNUNET_TESTBED_Barrier *hbarrier;
141 * Has this barrier been crossed?
153 * The hashcode of the barrier name
155 struct GNUNET_HashCode hash;
158 * The client handle to the master controller
160 struct GNUNET_SERVER_Client *client;
163 * The name of the barrier
168 * DLL head for the list of clients waiting for this barrier
170 struct ClientCtx *head;
173 * DLL tail for the list of clients waiting for this barrier
175 struct ClientCtx *tail;
178 * DLL head for the list of barrier handles
180 struct WBarrier *whead;
183 * DLL tail for the list of barrier handles
185 struct WBarrier *wtail;
188 * Identifier for the timeout task
190 GNUNET_SCHEDULER_TaskIdentifier tout_task;
193 * The status of this barrier
195 enum GNUNET_TESTBED_BarrierStatus status;
198 * Number of barriers wrapped in the above DLL
200 unsigned int num_wbarriers;
203 * Number of wrapped barriers reached so far
205 unsigned int num_wbarriers_reached;
208 * Number of wrapped barrier initialised so far
210 unsigned int num_wbarriers_inited;
213 * Number of peers which have reached this barrier
215 unsigned int nreached;
218 * Number of slaves we have initialised this barrier
220 unsigned int nslaves;
223 * Quorum percentage to be reached
228 * Was there a timeout while propagating initialisation
235 * Hashtable handle for storing initialised barriers
237 static struct GNUNET_CONTAINER_MultiHashMap *barrier_map;
242 static struct GNUNET_SERVICE_Context *ctx;
246 * Function called to notify a client about the connection
247 * begin ready to queue more data. "buf" will be
248 * NULL and "size" zero if the connection was closed for
249 * writing in the meantime.
251 * @param cls client context
252 * @param size number of bytes available in buf
253 * @param buf where the callee should write the message
254 * @return number of bytes written to buf
257 transmit_ready_cb (void *cls, size_t size, void *buf)
259 struct ClientCtx *ctx = cls;
260 struct GNUNET_SERVER_Client *client = ctx->client;
261 struct MessageQueue *mq;
262 struct GNUNET_MessageHeader *msg;
267 if ((0 == size) || (NULL == buf))
269 GNUNET_assert (NULL != ctx->client);
270 GNUNET_SERVER_client_drop (ctx->client);
276 wrote = ntohs (msg->size);
277 GNUNET_assert (size >= wrote);
278 (void) memcpy (buf, msg, wrote);
279 GNUNET_CONTAINER_DLL_remove (ctx->mq_head, ctx->mq_tail, mq);
280 GNUNET_free (mq->msg);
282 if (NULL != (mq = ctx->mq_head))
283 ctx->tx = GNUNET_SERVER_notify_transmit_ready (client, ntohs (msg->size),
284 MESSAGE_SEND_TIMEOUT (30),
285 &transmit_ready_cb, ctx);
291 * Queue a message into a clients message queue
293 * @param ctx the context associated with the client
294 * @param msg the message to queue. Will be consumed
297 queue_message (struct ClientCtx *ctx, struct GNUNET_MessageHeader *msg)
299 struct MessageQueue *mq;
300 struct GNUNET_SERVER_Client *client = ctx->client;
302 mq = GNUNET_malloc (sizeof (struct MessageQueue));
304 GNUNET_CONTAINER_DLL_insert_tail (ctx->mq_head, ctx->mq_tail, mq);
306 ctx->tx = GNUNET_SERVER_notify_transmit_ready (client, ntohs (msg->size),
307 MESSAGE_SEND_TIMEOUT (30),
308 &transmit_ready_cb, ctx);
313 * Function to cleanup client context data structure
315 * @param ctx the client context data structure
318 cleanup_clientctx (struct ClientCtx *ctx)
320 struct MessageQueue *mq;
322 GNUNET_SERVER_client_drop (ctx->client);
324 GNUNET_SERVER_notify_transmit_ready_cancel (ctx->tx);
325 if (NULL != (mq = ctx->mq_head))
327 GNUNET_CONTAINER_DLL_remove (ctx->mq_head, ctx->mq_tail, mq);
328 GNUNET_free (mq->msg);
336 * Function to remove a barrier from the barrier map and cleanup resources
337 * occupied by a barrier
339 * @param barrier the barrier handle
342 remove_barrier (struct Barrier *barrier)
344 struct ClientCtx *ctx;
346 GNUNET_assert (GNUNET_YES == GNUNET_CONTAINER_multihashmap_remove (barrier_map,
349 while (NULL != (ctx = barrier->head))
351 GNUNET_CONTAINER_DLL_remove (barrier->head, barrier->tail, ctx);
352 cleanup_clientctx (ctx);
354 GNUNET_free (barrier->name);
355 GNUNET_SERVER_client_drop (barrier->client);
356 GNUNET_free (barrier);
361 * Cancels all subcontroller barrier handles
363 * @param barrier the local barrier
366 cancel_wrappers (struct Barrier *barrier)
368 struct WBarrier *wrapper;
370 while (NULL != (wrapper = barrier->whead))
372 GNUNET_TESTBED_barrier_cancel (wrapper->hbarrier);
373 GNUNET_CONTAINER_DLL_remove (barrier->whead, barrier->wtail, wrapper);
374 GNUNET_free (wrapper);
380 * Send a status message about a barrier to the given client
382 * @param client the client to send the message to
383 * @param name the barrier name
384 * @param status the status of the barrier
385 * @param emsg the error message; should be non-NULL for
386 * status=BARRIER_STATUS_ERROR
389 send_client_status_msg (struct GNUNET_SERVER_Client *client,
391 enum GNUNET_TESTBED_BarrierStatus status,
394 struct GNUNET_TESTBED_BarrierStatusMsg *msg;
398 GNUNET_assert ((NULL == emsg) || (BARRIER_STATUS_ERROR == status));
399 name_len = strlen (name) + 1;
400 msize = sizeof (struct GNUNET_TESTBED_BarrierStatusMsg)
402 + (NULL == emsg) ? 0 : strlen (emsg) + 1;
403 msg = GNUNET_malloc (msize);
404 msg->status = htons (status);
405 msg->name_len = htons ((uint16_t) name_len);
406 (void) memcpy (msg->data, name, name_len);
408 (void) memcpy (msg->data + name_len, emsg, strlen (emsg) + 1);
409 GST_queue_message (client, &msg->header);
414 * Sends a barrier failed message
416 * @param barrier the corresponding barrier
417 * @param emsg the error message; should be non-NULL for
418 * status=BARRIER_STATUS_ERROR
421 send_barrier_status_msg (struct Barrier *barrier, const char *emsg)
423 GNUNET_assert (0 != barrier->status);
424 send_client_status_msg (barrier->client, barrier->name,
425 barrier->status, emsg);
431 * Task for sending barrier crossed notifications to waiting client
433 * @param cls the barrier which is crossed
434 * @param tc scheduler task context
437 notify_task_cb (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
439 struct Barrier *barrier = cls;
440 struct ClientCtx *client_ctx;
441 struct GNUNET_TESTBED_BarrierStatusMsg *msg;
442 struct GNUNET_MessageHeader *dup_msg;
446 name_len = strlen (barrier->name) + 1;
447 msize = sizeof (struct GNUNET_TESTBED_BarrierStatusMsg) + name_len;
448 msg = GNUNET_malloc (msize);
449 msg->header.size = htons (msize);
450 msg->header.type = htons (GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_STATUS);
452 msg->name_len = htons (name_len);
453 (void) memcpy (msg->data, barrier->name, name_len);
454 msg->data[name_len] = '\0';
455 while (NULL != (client_ctx = barrier->head))
457 dup_msg = GNUNET_copy_message (&msg->header);
458 queue_message (client_ctx, dup_msg);
459 GNUNET_CONTAINER_DLL_remove (barrier->head, barrier->tail, client_ctx);
460 GNUNET_SERVER_client_set_user_context_ (client_ctx->client, NULL, 0);
461 GNUNET_free (client_ctx);
467 * Message handler for GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_WAIT messages. This
468 * message should come from peers or a shared helper service using the
469 * testbed-barrier client API (@see gnunet_testbed_barrier_service.h)
471 * This handler is queued in the main service and will handle the messages sent
472 * either from the testbed driver or from a high level controller
475 * @param client identification of the client
476 * @param message the actual message
479 handle_barrier_wait (void *cls, struct GNUNET_SERVER_Client *client,
480 const struct GNUNET_MessageHeader *message)
482 const struct GNUNET_TESTBED_BarrierWait *msg;
483 struct Barrier *barrier;
485 struct ClientCtx *client_ctx;
486 struct GNUNET_HashCode key;
490 msize = ntohs (message->size);
491 if (msize <= sizeof (struct GNUNET_TESTBED_BarrierWait))
494 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
497 if (NULL == barrier_map)
500 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
503 msg = (const struct GNUNET_TESTBED_BarrierWait *) message;
504 name_len = msize - sizeof (struct GNUNET_TESTBED_BarrierWait);
505 name = GNUNET_malloc (name_len + 1);
506 name[name_len] = '\0';
507 (void) memcpy (name, msg->name, name_len);
508 GNUNET_CRYPTO_hash (name, name_len - 1, &key);
509 if (NULL == (barrier = GNUNET_CONTAINER_multihashmap_get (barrier_map, &key)))
512 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
516 client_ctx = GNUNET_SERVER_client_get_user_context (client, struct ClientCtx);
517 if (NULL == client_ctx)
519 client_ctx = GNUNET_malloc (sizeof (struct ClientCtx));
520 client_ctx->client = client;
521 GNUNET_SERVER_client_keep (client);
522 client_ctx->barrier = barrier;
523 GNUNET_CONTAINER_DLL_insert_tail (barrier->head, barrier->tail, client_ctx);
525 if (LOCAL_QUORUM_REACHED (barrier))
526 notify_task_cb (barrier, NULL);
528 GNUNET_SERVER_receive_done (client, GNUNET_OK);
533 * Functions with this signature are called whenever a client
534 * is disconnected on the network level.
537 * @param client identification of the client; NULL
538 * for the last call when the server is destroyed
541 disconnect_cb (void *cls, struct GNUNET_SERVER_Client *client)
543 struct ClientCtx *client_ctx;
545 client_ctx = GNUNET_SERVER_client_get_user_context (client, struct ClientCtx);
546 if (NULL == client_ctx)
547 return; /* We only set user context for locally
549 cleanup_clientctx (client_ctx);
554 * Function to initialise barrriers component
557 GST_barriers_init (struct GNUNET_CONFIGURATION_Handle *cfg)
559 static const struct GNUNET_SERVER_MessageHandler message_handlers[] = {
560 {&handle_barrier_wait, NULL, GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_WAIT, 0},
563 struct GNUNET_SERVER_Handle *srv;
565 barrier_map = GNUNET_CONTAINER_multihashmap_create (3, GNUNET_YES);
566 ctx = GNUNET_SERVICE_start ("testbed-barrier", cfg,
567 GNUNET_SERVICE_OPTION_MANUAL_SHUTDOWN);
568 srv = GNUNET_SERVICE_get_server (ctx);
569 GNUNET_SERVER_add_handlers (srv, message_handlers);
570 GNUNET_SERVER_disconnect_notify (srv, &disconnect_cb, NULL);
575 * Function to stop the barrier service
580 GNUNET_assert (NULL != barrier_map);
581 GNUNET_CONTAINER_multihashmap_destroy (barrier_map);
582 GNUNET_assert (NULL != ctx);
583 GNUNET_SERVICE_stop (ctx);
588 * Functions of this type are to be given as callback argument to
589 * GNUNET_TESTBED_barrier_init(). The callback will be called when status
590 * information is available for the barrier.
592 * @param cls the closure given to GNUNET_TESTBED_barrier_init()
593 * @param name the name of the barrier
594 * @param barrier the barrier handle
595 * @param status status of the barrier; GNUNET_OK if the barrier is crossed;
596 * GNUNET_SYSERR upon error
597 * @param emsg if the status were to be GNUNET_SYSERR, this parameter has the
601 wbarrier_status_cb (void *cls, const char *name,
602 struct GNUNET_TESTBED_Barrier *b_,
603 enum GNUNET_TESTBED_BarrierStatus status,
606 struct WBarrier *wrapper = cls;
607 struct Barrier *barrier = wrapper->barrier;
609 GNUNET_assert (b_ == wrapper->hbarrier);
610 wrapper->hbarrier = NULL;
611 GNUNET_CONTAINER_DLL_remove (barrier->whead, barrier->wtail, wrapper);
612 GNUNET_free (wrapper);
613 if (BARRIER_STATUS_ERROR == status)
615 LOG (GNUNET_ERROR_TYPE_ERROR,
616 "Initialising barrier `%s' failed at a sub-controller: %s\n",
617 barrier->name, (NULL != emsg) ? emsg : "NULL");
618 cancel_wrappers (barrier);
620 emsg = "Initialisation failed at a sub-controller";
621 barrier->status = BARRIER_STATUS_ERROR;
622 send_barrier_status_msg (barrier, emsg);
627 case BARRIER_STATUS_CROSSED:
628 if (BARRIER_STATUS_INITIALISED != barrier->status)
633 barrier->num_wbarriers_reached++;
634 if ((barrier->num_wbarriers_reached == barrier->num_wbarriers)
635 && (LOCAL_QUORUM_REACHED (barrier)))
637 barrier->status = BARRIER_STATUS_CROSSED;
638 send_barrier_status_msg (barrier, NULL);
641 case BARRIER_STATUS_INITIALISED:
642 if (0 != barrier->status)
647 barrier->num_wbarriers_inited++;
648 if (barrier->num_wbarriers_inited == barrier->num_wbarriers)
650 barrier->status = BARRIER_STATUS_INITIALISED;
651 send_barrier_status_msg (barrier, NULL);
654 case BARRIER_STATUS_ERROR:
662 * Function called upon timeout while waiting for a response from the
663 * subcontrollers to barrier init message
669 fwd_tout_barrier_init (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
671 struct Barrier *barrier = cls;
674 barrier->timedout = GNUNET_YES;
675 cancel_wrappers (barrier);
676 barrier->status = BARRIER_STATUS_ERROR;
677 send_barrier_status_msg (barrier,
678 "Timedout while propagating barrier initialisation\n");
679 remove_barrier (barrier);
684 * Message handler for GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_INIT messages. This
685 * message should always come from a parent controller or the testbed API if we
686 * are the root controller.
688 * This handler is queued in the main service and will handle the messages sent
689 * either from the testbed driver or from a high level controller
692 * @param client identification of the client
693 * @param message the actual message
696 GST_handle_barrier_init (void *cls, struct GNUNET_SERVER_Client *client,
697 const struct GNUNET_MessageHeader *message)
699 const struct GNUNET_TESTBED_BarrierInit *msg;
701 struct Barrier *barrier;
703 struct WBarrier *wrapper;
704 struct GNUNET_HashCode hash;
709 if (NULL == GST_context)
712 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
715 if (client != GST_context->client)
718 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
721 msize = ntohs (message->size);
722 if (msize <= sizeof (struct GNUNET_TESTBED_BarrierInit))
725 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
728 msg = (const struct GNUNET_TESTBED_BarrierInit *) message;
729 name_len = (size_t) msize - sizeof (struct GNUNET_TESTBED_BarrierInit);
730 name = GNUNET_malloc (name_len + 1);
731 (void) memcpy (name, msg->name, name_len);
732 GNUNET_CRYPTO_hash (name, name_len, &hash);
733 if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (barrier_map, &hash))
736 send_client_status_msg (client, name, BARRIER_STATUS_ERROR,
737 "A barrier with the same name already exists");
739 GNUNET_SERVER_receive_done (client, GNUNET_OK);
742 barrier = GNUNET_malloc (sizeof (struct Barrier));
743 (void) memcpy (&barrier->hash, &hash, sizeof (struct GNUNET_HashCode));
744 barrier->quorum = msg->quorum;
745 barrier->name = name;
746 barrier->client = client;
747 GNUNET_SERVER_client_keep (client);
748 GNUNET_assert (GNUNET_OK ==
749 GNUNET_CONTAINER_multihashmap_put (barrier_map,
752 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST));
753 GNUNET_SERVER_receive_done (client, GNUNET_OK);
754 /* Propagate barrier init to subcontrollers */
755 for (cnt = 0; cnt < GST_slave_list_size; cnt++)
757 if (NULL == (slave = GST_slave_list[cnt]))
759 if (NULL == slave->controller)
761 GNUNET_break (0);/* May happen when we are connecting to the controller */
764 wrapper = GNUNET_malloc (sizeof (struct WBarrier));
765 wrapper->barrier = barrier;
766 GNUNET_CONTAINER_DLL_insert_tail (barrier->whead, barrier->wtail, wrapper);
767 wrapper->hbarrier = GNUNET_TESTBED_barrier_init (slave->controller,
773 if (NULL == barrier->whead) /* No further propagation */
775 barrier->status = BARRIER_STATUS_INITIALISED;
776 send_barrier_status_msg (barrier, NULL);
778 barrier->tout_task = GNUNET_SCHEDULER_add_delayed (MESSAGE_SEND_TIMEOUT (30),
779 &fwd_tout_barrier_init,
785 * Message handler for GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_CANCEL messages. This
786 * message should always come from a parent controller or the testbed API if we
787 * are the root controller.
789 * This handler is queued in the main service and will handle the messages sent
790 * either from the testbed driver or from a high level controller
793 * @param client identification of the client
794 * @param message the actual message
797 GST_handle_barrier_cancel (void *cls, struct GNUNET_SERVER_Client *client,
798 const struct GNUNET_MessageHeader *message)
800 const struct GNUNET_TESTBED_BarrierCancel *msg;
802 struct Barrier *barrier;
803 struct GNUNET_HashCode hash;
807 if (NULL == GST_context)
810 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
813 if (client != GST_context->client)
816 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
819 msize = ntohs (message->size);
820 if (msize <= sizeof (struct GNUNET_TESTBED_BarrierCancel))
823 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
826 msg = (const struct GNUNET_TESTBED_BarrierCancel *) message;
827 name_len = msize - sizeof (struct GNUNET_TESTBED_BarrierCancel);
828 name = GNUNET_malloc (name_len + 1);
829 (void) memcpy (name, msg->name, name_len);
830 GNUNET_CRYPTO_hash (name, name_len, &hash);
831 if (GNUNET_NO == GNUNET_CONTAINER_multihashmap_contains (barrier_map, &hash))
834 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
837 barrier = GNUNET_CONTAINER_multihashmap_get (barrier_map, &hash);
838 GNUNET_assert (NULL != barrier);
839 cancel_wrappers (barrier);
840 remove_barrier (barrier);
841 GNUNET_SERVER_receive_done (client, GNUNET_OK);
844 /* end of gnunet-service-testbed_barriers.c */