2 This file is part of GNUnet.
3 (C) 2008--2013 Christian Grothoff (and other contributing authors)
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your
8 option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA.
22 * @file testbed/gnunet-service-testbed_barriers.c
23 * @brief barrier handling at the testbed controller
24 * @author Sree Harsha Totakura <sreeharsha@totakura.in>
27 #include "gnunet-service-testbed.h"
30 * timeout for outgoing message transmissions in seconds
32 #define MESSAGE_SEND_TIMEOUT(s) \
33 GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, s)
37 * Test to see if local peers have reached the required quorum of a barrier
39 #define LOCAL_QUORUM_REACHED(barrier) \
40 ((barrier->quorum * GST_num_local_peers) <= (barrier->nreached * 100))
50 * Message queue for transmitting messages
55 * next pointer for DLL
57 struct MessageQueue *next;
60 * prev pointer for DLL
62 struct MessageQueue *prev;
65 * The message to be sent
67 struct GNUNET_MessageHeader *msg;
72 * Context to be associated with each client
77 * The barrier this client is waiting for
79 struct Barrier *barrier;
84 struct ClientCtx *next;
89 struct ClientCtx *prev;
94 struct GNUNET_SERVER_Client *client;
97 * the transmission handle
99 struct GNUNET_SERVER_TransmitHandle *tx;
104 struct MessageQueue *mq_head;
109 struct MessageQueue *mq_tail;
114 * Wrapper around Barrier handle
121 struct WBarrier *next;
126 struct WBarrier *prev;
129 * The local barrier associated with the creation of this wrapper
131 struct Barrier *barrier;
134 * The barrier handle from API
136 struct GNUNET_TESTBED_Barrier *hbarrier;
139 * Has this barrier been crossed?
151 * The hashcode of the barrier name
153 struct GNUNET_HashCode hash;
156 * The client handle to the master controller
158 struct GNUNET_SERVER_Client *client;
161 * The name of the barrier
166 * DLL head for the list of clients waiting for this barrier
168 struct ClientCtx *head;
171 * DLL tail for the list of clients waiting for this barrier
173 struct ClientCtx *tail;
176 * DLL head for the list of barrier handles
178 struct WBarrier *whead;
181 * DLL tail for the list of barrier handles
183 struct WBarrier *wtail;
186 * Identifier for the timeout task
188 GNUNET_SCHEDULER_TaskIdentifier tout_task;
191 * The status of this barrier
193 enum GNUNET_TESTBED_BarrierStatus status;
196 * Number of barriers wrapped in the above DLL
198 unsigned int num_wbarriers;
201 * Number of wrapped barriers reached so far
203 unsigned int num_wbarriers_reached;
206 * Number of wrapped barrier initialised so far
208 unsigned int num_wbarriers_inited;
211 * Number of peers which have reached this barrier
213 unsigned int nreached;
216 * Number of slaves we have initialised this barrier
218 unsigned int nslaves;
221 * Quorum percentage to be reached
226 * Was there a timeout while propagating initialisation
233 * Hashtable handle for storing initialised barriers
235 static struct GNUNET_CONTAINER_MultiHashMap *barrier_map;
240 static struct GNUNET_SERVICE_Context *ctx;
244 * Function called to notify a client about the connection
245 * begin ready to queue more data. "buf" will be
246 * NULL and "size" zero if the connection was closed for
247 * writing in the meantime.
249 * @param cls client context
250 * @param size number of bytes available in buf
251 * @param buf where the callee should write the message
252 * @return number of bytes written to buf
255 transmit_ready_cb (void *cls, size_t size, void *buf)
257 struct ClientCtx *ctx = cls;
258 struct GNUNET_SERVER_Client *client = ctx->client;
259 struct MessageQueue *mq;
260 struct GNUNET_MessageHeader *msg;
265 if ((0 == size) || (NULL == buf))
267 GNUNET_assert (NULL != ctx->client);
268 GNUNET_SERVER_client_drop (ctx->client);
274 wrote = ntohs (msg->size);
275 GNUNET_assert (size >= wrote);
276 (void) memcpy (buf, msg, wrote);
277 GNUNET_CONTAINER_DLL_remove (ctx->mq_head, ctx->mq_tail, mq);
278 GNUNET_free (mq->msg);
280 if (NULL != (mq = ctx->mq_head))
281 ctx->tx = GNUNET_SERVER_notify_transmit_ready (client, ntohs (msg->size),
282 MESSAGE_SEND_TIMEOUT (30),
283 &transmit_ready_cb, ctx);
289 * Queue a message into a clients message queue
291 * @param ctx the context associated with the client
292 * @param msg the message to queue. Will be consumed
295 queue_message (struct ClientCtx *ctx, struct GNUNET_MessageHeader *msg)
297 struct MessageQueue *mq;
298 struct GNUNET_SERVER_Client *client = ctx->client;
300 mq = GNUNET_malloc (sizeof (struct MessageQueue));
302 GNUNET_CONTAINER_DLL_insert_tail (ctx->mq_head, ctx->mq_tail, mq);
304 ctx->tx = GNUNET_SERVER_notify_transmit_ready (client, ntohs (msg->size),
305 MESSAGE_SEND_TIMEOUT (30),
306 &transmit_ready_cb, ctx);
311 * Function to remove a barrier from the barrier map and cleanup resources
312 * occupied by a barrier
314 * @param barrier the barrier handle
317 remove_barrier (struct Barrier *barrier)
319 GNUNET_assert (GNUNET_YES == GNUNET_CONTAINER_multihashmap_remove (barrier_map,
322 GNUNET_free (barrier->name);
323 if (NULL != barrier->client)
324 GNUNET_SERVER_client_drop (barrier->client);
325 GNUNET_free (barrier);
330 * Cancels all subcontroller barrier handles
332 * @param barrier the local barrier
335 cancel_wrappers (struct Barrier *barrier)
337 struct WBarrier *wrapper;
339 while (NULL != (wrapper = barrier->whead))
341 GNUNET_TESTBED_barrier_cancel (wrapper->hbarrier);
342 GNUNET_CONTAINER_DLL_remove (barrier->whead, barrier->wtail, wrapper);
343 GNUNET_free (wrapper);
349 * Sends a barrier failed message
351 * @param barrier the corresponding barrier
352 * @param status the status of the barrier
353 * @param emsg the error message; should be non-NULL for
354 * status=BARRIER_STATUS_ERROR
357 send_barrier_status_msg (struct Barrier *barrier,
358 enum GNUNET_TESTBED_BarrierStatus status,
361 struct GNUNET_TESTBED_BarrierStatusMsg *msg;
365 GNUNET_assert ((NULL == emsg) || (BARRIER_STATUS_ERROR == status));
366 name_len = strlen (barrier->name) + 1;
367 msize = sizeof (struct GNUNET_TESTBED_BarrierStatusMsg)
369 + (NULL == emsg) ? 0 : strlen (emsg) + 1;
370 msg = GNUNET_malloc (msize);
371 msg->status = htons (status);
372 msg->name_len = htons (name_len);
373 (void) memcpy (msg->data, barrier->name, name_len);
375 (void) memcpy (msg->data + name_len, emsg, strlen (emsg) + 1);
376 GST_queue_message (barrier->client, &msg->header);
382 * Task for sending barrier crossed notifications to waiting client
384 * @param cls the barrier which is crossed
385 * @param tc scheduler task context
388 notify_task_cb (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
390 struct Barrier *barrier = cls;
391 struct ClientCtx *client_ctx;
392 struct GNUNET_TESTBED_BarrierStatusMsg *msg;
393 struct GNUNET_MessageHeader *dup_msg;
397 name_len = strlen (barrier->name) + 1;
398 msize = sizeof (struct GNUNET_TESTBED_BarrierStatusMsg) + name_len;
399 msg = GNUNET_malloc (msize);
400 msg->header.size = htons (msize);
401 msg->header.type = htons (GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_STATUS);
403 msg->name_len = htons (name_len);
404 (void) memcpy (msg->data, barrier->name, name_len);
405 msg->data[name_len] = '\0';
406 while (NULL != (client_ctx = barrier->head))
408 dup_msg = GNUNET_copy_message (&msg->header);
409 queue_message (client_ctx, dup_msg);
410 GNUNET_CONTAINER_DLL_remove (barrier->head, barrier->tail, client_ctx);
411 GNUNET_SERVER_client_set_user_context_ (client_ctx->client, NULL, 0);
412 GNUNET_free (client_ctx);
418 * Message handler for GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_WAIT messages. This
419 * message should come from peers or a shared helper service using the
420 * testbed-barrier client API (@see gnunet_testbed_barrier_service.h)
422 * This handler is queued in the main service and will handle the messages sent
423 * either from the testbed driver or from a high level controller
426 * @param client identification of the client
427 * @param message the actual message
430 handle_barrier_wait (void *cls, struct GNUNET_SERVER_Client *client,
431 const struct GNUNET_MessageHeader *message)
433 const struct GNUNET_TESTBED_BarrierWait *msg;
434 struct Barrier *barrier;
436 struct ClientCtx *client_ctx;
437 struct GNUNET_HashCode key;
441 msize = ntohs (message->size);
442 if (msize <= sizeof (struct GNUNET_TESTBED_BarrierWait))
445 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
448 if (NULL == barrier_map)
451 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
454 msg = (const struct GNUNET_TESTBED_BarrierWait *) message;
455 name_len = msize - sizeof (struct GNUNET_TESTBED_BarrierWait);
456 name = GNUNET_malloc (name_len + 1);
457 name[name_len] = '\0';
458 (void) memcpy (name, msg->name, name_len);
459 GNUNET_CRYPTO_hash (name, name_len - 1, &key);
460 if (NULL == (barrier = GNUNET_CONTAINER_multihashmap_get (barrier_map, &key)))
463 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
467 client_ctx = GNUNET_SERVER_client_get_user_context (client, struct ClientCtx);
468 if (NULL == client_ctx)
470 client_ctx = GNUNET_malloc (sizeof (struct ClientCtx));
471 client_ctx->client = client;
472 GNUNET_SERVER_client_keep (client);
473 client_ctx->barrier = barrier;
474 GNUNET_CONTAINER_DLL_insert_tail (barrier->head, barrier->tail, client_ctx);
476 if (LOCAL_QUORUM_REACHED (barrier))
477 notify_task_cb (barrier, NULL);
479 GNUNET_SERVER_receive_done (client, GNUNET_OK);
484 * Functions with this signature are called whenever a client
485 * is disconnected on the network level.
488 * @param client identification of the client; NULL
489 * for the last call when the server is destroyed
492 disconnect_cb (void *cls, struct GNUNET_SERVER_Client *client)
494 struct ClientCtx *client_ctx;
495 struct Barrier *barrier;
497 client_ctx = GNUNET_SERVER_client_get_user_context (client, struct ClientCtx);
498 if (NULL == client_ctx)
500 barrier = client_ctx->barrier;
501 GNUNET_CONTAINER_DLL_remove (barrier->head, barrier->tail, client_ctx);
502 if (NULL != client_ctx->tx)
503 GNUNET_SERVER_notify_transmit_ready_cancel (client_ctx->tx);
509 * Function to initialise barrriers component
512 GST_barriers_init (struct GNUNET_CONFIGURATION_Handle *cfg)
514 static const struct GNUNET_SERVER_MessageHandler message_handlers[] = {
515 {&handle_barrier_wait, NULL, GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_WAIT, 0},
518 struct GNUNET_SERVER_Handle *srv;
520 barrier_map = GNUNET_CONTAINER_multihashmap_create (3, GNUNET_YES);
521 ctx = GNUNET_SERVICE_start ("testbed-barrier", cfg,
522 GNUNET_SERVICE_OPTION_MANUAL_SHUTDOWN);
523 srv = GNUNET_SERVICE_get_server (ctx);
524 GNUNET_SERVER_add_handlers (srv, message_handlers);
525 GNUNET_SERVER_disconnect_notify (srv, &disconnect_cb, NULL);
530 * Function to stop the barrier service
535 GNUNET_assert (NULL != barrier_map);
536 GNUNET_CONTAINER_multihashmap_destroy (barrier_map);
537 GNUNET_assert (NULL != ctx);
538 GNUNET_SERVICE_stop (ctx);
543 * Functions of this type are to be given as callback argument to
544 * GNUNET_TESTBED_barrier_init(). The callback will be called when status
545 * information is available for the barrier.
547 * @param cls the closure given to GNUNET_TESTBED_barrier_init()
548 * @param name the name of the barrier
549 * @param barrier the barrier handle
550 * @param status status of the barrier; GNUNET_OK if the barrier is crossed;
551 * GNUNET_SYSERR upon error
552 * @param emsg if the status were to be GNUNET_SYSERR, this parameter has the
556 wbarrier_status_cb (void *cls, const char *name,
557 struct GNUNET_TESTBED_Barrier *b_,
558 enum GNUNET_TESTBED_BarrierStatus status,
561 struct WBarrier *wrapper = cls;
562 struct Barrier *barrier = wrapper->barrier;
564 GNUNET_assert (b_ == wrapper->hbarrier);
565 wrapper->hbarrier = NULL;
566 GNUNET_CONTAINER_DLL_remove (barrier->whead, barrier->wtail, wrapper);
567 GNUNET_free (wrapper);
568 if (BARRIER_STATUS_ERROR == status)
570 LOG (GNUNET_ERROR_TYPE_ERROR,
571 "Initialising barrier `%s' failed at a sub-controller: %s\n",
572 barrier->name, (NULL != emsg) ? emsg : "NULL");
573 cancel_wrappers (barrier);
575 emsg = "Initialisation failed at a sub-controller";
576 send_barrier_status_msg (barrier, BARRIER_STATUS_ERROR, emsg);
581 case BARRIER_STATUS_CROSSED:
582 barrier->num_wbarriers_reached++;
583 if ((barrier->num_wbarriers_reached == barrier->num_wbarriers)
584 && (LOCAL_QUORUM_REACHED (barrier)))
585 send_barrier_status_msg (barrier, BARRIER_STATUS_CROSSED, NULL);
587 case BARRIER_STATUS_INITIALISED:
588 barrier->num_wbarriers_inited++;
589 if (barrier->num_wbarriers_inited == barrier->num_wbarriers)
590 send_barrier_status_msg (barrier, BARRIER_STATUS_INITIALISED, NULL);
592 case BARRIER_STATUS_ERROR:
600 * Function called upon timeout while waiting for a response from the
601 * subcontrollers to barrier init message
607 fwd_tout_barrier_init (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
609 struct Barrier *barrier = cls;
612 barrier->timedout = GNUNET_YES;
613 cancel_wrappers (barrier);
614 send_barrier_status_msg (barrier, BARRIER_STATUS_ERROR,
615 "Timedout while propagating barrier initialisation\n");
616 remove_barrier (barrier);
621 * Message handler for GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_INIT messages. This
622 * message should always come from a parent controller or the testbed API if we
623 * are the root controller.
625 * This handler is queued in the main service and will handle the messages sent
626 * either from the testbed driver or from a high level controller
629 * @param client identification of the client
630 * @param message the actual message
633 GST_handle_barrier_init (void *cls, struct GNUNET_SERVER_Client *client,
634 const struct GNUNET_MessageHeader *message)
636 const struct GNUNET_TESTBED_BarrierInit *msg;
638 struct Barrier *barrier;
640 struct WBarrier *wrapper;
641 struct GNUNET_HashCode hash;
647 if (NULL == GST_context)
650 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
653 if (client != GST_context->client)
656 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
659 msize = ntohs (message->size);
660 if (msize <= sizeof (struct GNUNET_TESTBED_BarrierInit))
663 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
666 msg = (const struct GNUNET_TESTBED_BarrierInit *) message;
667 op_id = GNUNET_ntohll (msg->op_id);
669 name_len = (size_t) msize - sizeof (struct GNUNET_TESTBED_BarrierInit);
670 GNUNET_CRYPTO_hash (name, name_len, &hash);
671 if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (barrier_map, &hash))
673 GST_send_operation_fail_msg (client, op_id, "Barrier already initialised");
674 GNUNET_SERVER_receive_done (client, GNUNET_OK);
677 barrier = GNUNET_malloc (sizeof (struct Barrier));
678 (void) memcpy (&barrier->hash, &hash, sizeof (struct GNUNET_HashCode));
679 barrier->quorum = msg->quorum;
680 barrier->name = GNUNET_malloc (name_len + 1);
681 barrier->name[name_len] = '\0';
682 (void) memcpy (barrier->name, name, name_len);
683 barrier->client = client;
684 GNUNET_SERVER_client_keep (client);
685 GNUNET_assert (GNUNET_OK ==
686 GNUNET_CONTAINER_multihashmap_put (barrier_map,
689 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST));
690 GNUNET_SERVER_receive_done (client, GNUNET_OK);
691 /* Propagate barrier init to subcontrollers */
692 for (cnt = 0; cnt < GST_slave_list_size; cnt++)
694 if (NULL == (slave = GST_slave_list[cnt]))
696 if (NULL == slave->controller)
698 GNUNET_break (0);/* May happen when we are connecting to the controller */
701 wrapper = GNUNET_malloc (sizeof (struct WBarrier));
702 wrapper->barrier = barrier;
703 GNUNET_CONTAINER_DLL_insert_tail (barrier->whead, barrier->wtail, wrapper);
704 wrapper->hbarrier = GNUNET_TESTBED_barrier_init (slave->controller,
710 if (NULL == barrier->whead) /* No further propagation */
711 send_barrier_status_msg (barrier, BARRIER_STATUS_INITIALISED, NULL);
713 barrier->tout_task = GNUNET_SCHEDULER_add_delayed (MESSAGE_SEND_TIMEOUT (30),
714 &fwd_tout_barrier_init,
718 /* end of gnunet-service-testbed_barriers.c */