663c090b6326f6b911875de4cf911cde2950ad77
[oweals/gnunet.git] / src / testbed / gnunet-service-testbed_barriers.c
1 /*
2   This file is part of GNUnet.
3   (C) 2008--2013 Christian Grothoff (and other contributing authors)
4
5   GNUnet is free software; you can redistribute it and/or modify
6   it under the terms of the GNU General Public License as published
7   by the Free Software Foundation; either version 3, or (at your
8   option) any later version.
9
10   GNUnet is distributed in the hope that it will be useful, but
11   WITHOUT ANY WARRANTY; without even the implied warranty of
12   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13   General Public License for more details.
14
15   You should have received a copy of the GNU General Public License
16   along with GNUnet; see the file COPYING.  If not, write to the
17   Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18   Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file testbed/gnunet-service-testbed_barriers.c
23  * @brief barrier handling at the testbed controller
24  * @author Sree Harsha Totakura <sreeharsha@totakura.in> 
25  */
26
27 #include "gnunet-service-testbed.h"
28 #include "gnunet-service-testbed_barriers.h"
29
30
31 /**
32  * timeout for outgoing message transmissions in seconds
33  */
34 #define MESSAGE_SEND_TIMEOUT(s) \
35   GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, s)
36
37
38 /**
39  * Test to see if local peers have reached the required quorum of a barrier
40  */
41 #define LOCAL_QUORUM_REACHED(barrier)           \
42   ((barrier->quorum * GST_num_local_peers) <= (barrier->nreached * 100))
43
44
45 #ifdef LOG
46 #undef LOG
47 #endif
48
49 /**
50  * Logging shorthand
51  */
52 #define LOG(kind,...)                                           \
53   GNUNET_log_from (kind, "testbed-barriers", __VA_ARGS__)
54
55
56 /**
57  * Barrier
58  */
59 struct Barrier;
60
61
62 /**
63  * Message queue for transmitting messages
64  */
65 struct MessageQueue
66 {
67   /**
68    * next pointer for DLL
69    */
70   struct MessageQueue *next;
71
72   /**
73    * prev pointer for DLL
74    */
75   struct MessageQueue *prev;
76
77   /**
78    * The message to be sent
79    */
80   struct GNUNET_MessageHeader *msg;
81 };
82
83
84 /**
85  * Context to be associated with each client
86  */
87 struct ClientCtx
88 {
89   /**
90    * The barrier this client is waiting for
91    */
92   struct Barrier *barrier;
93
94   /**
95    * DLL next ptr
96    */
97   struct ClientCtx *next;
98
99   /**
100    * DLL prev ptr
101    */
102   struct ClientCtx *prev;
103
104   /**
105    * The client handle
106    */
107   struct GNUNET_SERVER_Client *client;
108
109   /**
110    * the transmission handle
111    */
112   struct GNUNET_SERVER_TransmitHandle *tx;
113
114   /**
115    * message queue head
116    */
117   struct MessageQueue *mq_head;
118
119   /**
120    * message queue tail
121    */
122   struct MessageQueue *mq_tail;
123 };
124
125
126 /**
127  * Wrapper around Barrier handle
128  */
129 struct WBarrier
130 {
131   /**
132    * DLL next pointer
133    */
134   struct WBarrier *next;
135
136   /**
137    * DLL prev pointer
138    */
139   struct WBarrier *prev;
140
141   /**
142    * The local barrier associated with the creation of this wrapper
143    */
144   struct Barrier *barrier;
145
146   /**
147    * The barrier handle from API
148    */
149   struct GNUNET_TESTBED_Barrier *hbarrier;
150
151   /**
152    * Has this barrier been crossed?
153    */
154   uint8_t reached;
155 };
156
157
158 /**
159  * Barrier
160  */
161 struct Barrier
162 {
163   /**
164    * The hashcode of the barrier name
165    */
166   struct GNUNET_HashCode hash;
167
168   /**
169    * The client handle to the master controller
170    */
171   struct GNUNET_SERVER_Client *mc;
172
173   /**
174    * The name of the barrier
175    */
176   char *name;
177
178   /**
179    * DLL head for the list of clients waiting for this barrier
180    */
181   struct ClientCtx *head;
182
183   /**
184    * DLL tail for the list of clients waiting for this barrier
185    */
186   struct ClientCtx *tail;
187
188   /**
189    * DLL head for the list of barrier handles
190    */
191   struct WBarrier *whead;
192
193   /**
194    * DLL tail for the list of barrier handles
195    */
196   struct WBarrier *wtail;
197
198   /**
199    * Identifier for the timeout task
200    */
201   GNUNET_SCHEDULER_TaskIdentifier tout_task;
202   
203   /**
204    * The status of this barrier
205    */
206   enum GNUNET_TESTBED_BarrierStatus status;
207   
208   /**
209    * Number of barriers wrapped in the above DLL
210    */
211   unsigned int num_wbarriers;
212
213   /**
214    * Number of wrapped barriers reached so far
215    */
216   unsigned int num_wbarriers_reached;
217
218   /**
219    * Number of wrapped barrier initialised so far
220    */
221   unsigned int num_wbarriers_inited;
222
223   /**
224    * Number of peers which have reached this barrier
225    */
226   unsigned int nreached;
227
228   /**
229    * Number of slaves we have initialised this barrier
230    */
231   unsigned int nslaves;
232
233   /**
234    * Quorum percentage to be reached
235    */
236   uint8_t quorum;
237   
238 };
239
240
241 /**
242  * Hashtable handle for storing initialised barriers
243  */
244 static struct GNUNET_CONTAINER_MultiHashMap *barrier_map;
245
246 /**
247  * Service context
248  */
249 static struct GNUNET_SERVICE_Context *ctx;
250
251
252 /**
253  * Function called to notify a client about the connection
254  * begin ready to queue more data.  "buf" will be
255  * NULL and "size" zero if the connection was closed for
256  * writing in the meantime.
257  *
258  * @param cls client context
259  * @param size number of bytes available in buf
260  * @param buf where the callee should write the message
261  * @return number of bytes written to buf
262  */
263 static size_t 
264 transmit_ready_cb (void *cls, size_t size, void *buf)
265 {
266   struct ClientCtx *ctx = cls;
267   struct GNUNET_SERVER_Client *client = ctx->client;
268   struct MessageQueue *mq;
269   struct GNUNET_MessageHeader *msg;
270   size_t wrote;
271
272   ctx->tx = NULL;
273   wrote = 0;
274   if ((0 == size) || (NULL == buf))
275   {
276     GNUNET_assert (NULL != ctx->client);
277     GNUNET_SERVER_client_drop (ctx->client);
278     ctx->client = NULL;    
279     return 0;
280   }
281   mq = ctx->mq_head;
282   msg = mq->msg;
283   wrote = ntohs (msg->size);
284   GNUNET_assert (size >= wrote);
285   (void) memcpy (buf, msg, wrote);
286   GNUNET_CONTAINER_DLL_remove (ctx->mq_head, ctx->mq_tail, mq);
287   GNUNET_free (mq->msg);
288   GNUNET_free (mq);
289   if (NULL != (mq = ctx->mq_head))
290     ctx->tx = GNUNET_SERVER_notify_transmit_ready (client, ntohs (msg->size),
291                                                   MESSAGE_SEND_TIMEOUT (30),
292                                                   &transmit_ready_cb, ctx);
293   return wrote;
294 }
295
296
297 /**
298  * Queue a message into a clients message queue
299  *
300  * @param ctx the context associated with the client
301  * @param msg the message to queue.  Will be consumed
302  */
303 static void
304 queue_message (struct ClientCtx *ctx, struct GNUNET_MessageHeader *msg)
305 {
306   struct MessageQueue *mq;
307   struct GNUNET_SERVER_Client *client = ctx->client;
308   
309   mq = GNUNET_malloc (sizeof (struct MessageQueue));
310   mq->msg = msg;
311   LOG_DEBUG ("Queueing message of type %u, size %u for sending\n",
312              ntohs (msg->type), ntohs (msg->size));
313   GNUNET_CONTAINER_DLL_insert_tail (ctx->mq_head, ctx->mq_tail, mq);
314   if (NULL == ctx->tx)
315    ctx->tx = GNUNET_SERVER_notify_transmit_ready (client, ntohs (msg->size),
316                                                   MESSAGE_SEND_TIMEOUT (30),
317                                                   &transmit_ready_cb, ctx);
318 }
319
320
321 /**
322  * Function to cleanup client context data structure
323  *
324  * @param ctx the client context data structure
325  */
326 static void
327 cleanup_clientctx (struct ClientCtx *ctx)
328 {
329   struct MessageQueue *mq;
330   
331   GNUNET_SERVER_client_drop (ctx->client);
332   if (NULL != ctx->tx)
333     GNUNET_SERVER_notify_transmit_ready_cancel (ctx->tx);
334   if (NULL != (mq = ctx->mq_head))
335   {
336     GNUNET_CONTAINER_DLL_remove (ctx->mq_head, ctx->mq_tail, mq);
337     GNUNET_free (mq->msg);
338     GNUNET_free (mq);
339   }
340   GNUNET_free (ctx);
341 }
342
343
344 /**
345  * Function to remove a barrier from the barrier map and cleanup resources
346  * occupied by a barrier
347  *
348  * @param barrier the barrier handle
349  */
350 static void
351 remove_barrier (struct Barrier *barrier)
352 {
353   struct ClientCtx *ctx;
354   
355   GNUNET_assert (GNUNET_YES == GNUNET_CONTAINER_multihashmap_remove (barrier_map,
356                                                                     &barrier->hash,
357                                                                     barrier));
358   while (NULL != (ctx = barrier->head))
359   {
360     GNUNET_CONTAINER_DLL_remove (barrier->head, barrier->tail, ctx);
361     cleanup_clientctx (ctx);
362   }
363   GNUNET_free (barrier->name);
364   GNUNET_SERVER_client_drop (barrier->mc);
365   GNUNET_free (barrier);
366 }
367
368
369 /**
370  * Cancels all subcontroller barrier handles
371  *
372  * @param barrier the local barrier
373  */
374 static void
375 cancel_wrappers (struct Barrier *barrier)
376 {
377   struct WBarrier *wrapper;
378
379   while (NULL != (wrapper = barrier->whead))
380   {
381     GNUNET_TESTBED_barrier_cancel (wrapper->hbarrier);
382     GNUNET_CONTAINER_DLL_remove (barrier->whead, barrier->wtail, wrapper);
383     GNUNET_free (wrapper);
384   }
385 }
386
387
388 /**
389  * Send a status message about a barrier to the given client
390  *
391  * @param client the client to send the message to
392  * @param name the barrier name
393  * @param status the status of the barrier
394  * @param emsg the error message; should be non-NULL for
395  *   status=BARRIER_STATUS_ERROR 
396  */
397 static void
398 send_client_status_msg (struct GNUNET_SERVER_Client *client,
399                         const char *name,
400                         enum GNUNET_TESTBED_BarrierStatus status,
401                         const char *emsg)
402 {
403   struct GNUNET_TESTBED_BarrierStatusMsg *msg;
404   size_t name_len;
405   uint16_t msize;
406
407   GNUNET_assert ((NULL == emsg) || (BARRIER_STATUS_ERROR == status));
408   name_len = strlen (name);
409   msize = sizeof (struct GNUNET_TESTBED_BarrierStatusMsg)
410       + (name_len + 1)
411       + ((NULL == emsg) ? 0 : (strlen (emsg) + 1));
412   msg = GNUNET_malloc (msize);
413   msg->header.size = htons (msize);
414   msg->header.type = htons (GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_STATUS);
415   msg->status = htons (status);
416   msg->name_len = htons ((uint16_t) name_len);
417   (void) memcpy (msg->data, name, name_len);
418   if (NULL != emsg)
419     (void) memcpy (msg->data + name_len + 1, emsg, strlen (emsg));
420   GST_queue_message (client, &msg->header);
421 }
422
423
424 /**
425  * Sends a barrier failed message
426  *
427  * @param barrier the corresponding barrier
428  * @param emsg the error message; should be non-NULL for
429  *   status=BARRIER_STATUS_ERROR 
430  */
431 static void
432 send_barrier_status_msg (struct Barrier *barrier, const char *emsg)
433 {
434   GNUNET_assert (0 != barrier->status);
435   send_client_status_msg (barrier->mc, barrier->name, barrier->status, emsg);
436 }
437
438
439
440 /**
441  * Task for sending barrier crossed notifications to waiting client
442  *
443  * @param cls the barrier which is crossed
444  * @param tc scheduler task context
445  */
446 static void
447 notify_task_cb (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
448 {
449   struct Barrier *barrier = cls;
450   struct ClientCtx *client_ctx;
451   struct GNUNET_TESTBED_BarrierStatusMsg *msg;
452   struct GNUNET_MessageHeader *dup_msg;
453   uint16_t name_len;
454   uint16_t msize;
455
456   name_len = strlen (barrier->name) + 1;
457   msize = sizeof (struct GNUNET_TESTBED_BarrierStatusMsg) + name_len;  
458   msg = GNUNET_malloc (msize);
459   msg->header.size = htons (msize);
460   msg->header.type = htons (GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_STATUS);
461   msg->status = htons (BARRIER_STATUS_CROSSED);
462   msg->name_len = htons (name_len);
463   (void) memcpy (msg->data, barrier->name, name_len);
464   msg->data[name_len] = '\0';
465   while (NULL != (client_ctx = barrier->head))
466   {
467     dup_msg = GNUNET_copy_message (&msg->header);
468     queue_message (client_ctx, dup_msg);
469     GNUNET_CONTAINER_DLL_remove (barrier->head, barrier->tail, client_ctx);
470   }
471 }
472
473
474 /**
475  * Message handler for GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_WAIT messages.  This
476  * message should come from peers or a shared helper service using the
477  * testbed-barrier client API (@see gnunet_testbed_barrier_service.h)
478  *
479  * This handler is queued in the main service and will handle the messages sent
480  * either from the testbed driver or from a high level controller
481  *
482  * @param cls NULL
483  * @param client identification of the client
484  * @param message the actual message
485  */
486 static void
487 handle_barrier_wait (void *cls, struct GNUNET_SERVER_Client *client,
488                      const struct GNUNET_MessageHeader *message)
489 {
490   const struct GNUNET_TESTBED_BarrierWait *msg;
491   struct Barrier *barrier;
492   char *name;
493   struct ClientCtx *client_ctx;
494   struct GNUNET_HashCode key;
495   size_t name_len;
496   uint16_t msize;
497   
498   msize = ntohs (message->size);
499   if (msize <= sizeof (struct GNUNET_TESTBED_BarrierWait))
500   {
501     GNUNET_break_op (0);
502     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
503     return;
504   }
505   if (NULL == barrier_map)
506   {
507     GNUNET_break (0);
508     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
509     return;
510   }
511   msg = (const struct GNUNET_TESTBED_BarrierWait *) message;
512   name_len = msize - sizeof (struct GNUNET_TESTBED_BarrierWait);
513   name = GNUNET_malloc (name_len + 1);
514   name[name_len] = '\0';
515   (void) memcpy (name, msg->name, name_len);
516   LOG_DEBUG ("Received BARRIER_WAIT for barrier `%s'\n", name);
517   GNUNET_CRYPTO_hash (name, name_len, &key);
518   if (NULL == (barrier = GNUNET_CONTAINER_multihashmap_get (barrier_map, &key)))
519   {
520     GNUNET_break (0);
521     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
522     GNUNET_free (name);
523     return;
524   }
525   client_ctx = GNUNET_SERVER_client_get_user_context (client, struct ClientCtx);
526   if (NULL == client_ctx)
527   {
528     client_ctx = GNUNET_malloc (sizeof (struct ClientCtx));
529     client_ctx->client = client;
530     GNUNET_SERVER_client_keep (client);
531     client_ctx->barrier = barrier;
532     GNUNET_CONTAINER_DLL_insert_tail (barrier->head, barrier->tail, client_ctx);
533     GNUNET_SERVER_client_set_user_context (client, client_ctx); 
534   }
535   barrier->nreached++;
536   if ((barrier->num_wbarriers_reached == barrier->num_wbarriers)
537         && (LOCAL_QUORUM_REACHED (barrier)))
538   {
539     barrier->status = BARRIER_STATUS_CROSSED;
540     send_barrier_status_msg (barrier, NULL);
541     notify_task_cb (barrier, NULL);
542   }
543   GNUNET_SERVER_receive_done (client, GNUNET_OK);
544 }
545
546
547 /**
548  * Functions with this signature are called whenever a client
549  * is disconnected on the network level.
550  *
551  * @param cls closure
552  * @param client identification of the client; NULL
553  *        for the last call when the server is destroyed
554  */
555 static void
556 disconnect_cb (void *cls, struct GNUNET_SERVER_Client *client)
557 {
558   struct ClientCtx *client_ctx;
559   
560   if (NULL == client)
561     return;
562   client_ctx = GNUNET_SERVER_client_get_user_context (client, struct ClientCtx);
563   if (NULL == client_ctx)
564     return;                     /* We only set user context for locally
565                                    connected clients */
566   cleanup_clientctx (client_ctx);
567 }
568
569
570 /**
571  * Function to initialise barrriers component
572  *
573  * @param cfg the configuration to use for initialisation
574  */
575 void
576 GST_barriers_init (struct GNUNET_CONFIGURATION_Handle *cfg)
577 {
578   static const struct GNUNET_SERVER_MessageHandler message_handlers[] = {
579     {&handle_barrier_wait, NULL, GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_WAIT, 0},
580     {NULL, NULL, 0, 0}
581   };
582   struct GNUNET_SERVER_Handle *srv;
583
584   barrier_map = GNUNET_CONTAINER_multihashmap_create (3, GNUNET_YES);
585   ctx = GNUNET_SERVICE_start ("testbed-barrier", cfg,
586                               GNUNET_SERVICE_OPTION_MANUAL_SHUTDOWN);
587   srv = GNUNET_SERVICE_get_server (ctx);
588   GNUNET_SERVER_add_handlers (srv, message_handlers);
589   GNUNET_SERVER_disconnect_notify (srv, &disconnect_cb, NULL);  
590 }
591
592
593 /**
594  * Function to stop the barrier service
595  */
596 void
597 GST_barriers_destroy ()
598 {
599   GNUNET_assert (NULL != barrier_map);
600   GNUNET_CONTAINER_multihashmap_destroy (barrier_map);
601   GNUNET_assert (NULL != ctx);
602   GNUNET_SERVICE_stop (ctx);
603 }
604
605
606 /**
607  * Functions of this type are to be given as callback argument to
608  * GNUNET_TESTBED_barrier_init().  The callback will be called when status
609  * information is available for the barrier.
610  *
611  * @param cls the closure given to GNUNET_TESTBED_barrier_init()
612  * @param name the name of the barrier
613  * @param b_ the barrier handle
614  * @param status status of the barrier; GNUNET_OK if the barrier is crossed;
615  *   GNUNET_SYSERR upon error
616  * @param emsg if the status were to be GNUNET_SYSERR, this parameter has the
617  *   error messsage
618  */
619 static void 
620 wbarrier_status_cb (void *cls, const char *name,
621                     struct GNUNET_TESTBED_Barrier *b_,
622                     enum GNUNET_TESTBED_BarrierStatus status,
623                     const char *emsg)
624 {
625   struct WBarrier *wrapper = cls;
626   struct Barrier *barrier = wrapper->barrier;
627
628   GNUNET_assert (b_ == wrapper->hbarrier);
629   wrapper->hbarrier = NULL;
630   GNUNET_CONTAINER_DLL_remove (barrier->whead, barrier->wtail, wrapper);
631   GNUNET_free (wrapper);
632   switch (status)
633   {
634   case BARRIER_STATUS_ERROR:
635     LOG (GNUNET_ERROR_TYPE_ERROR,
636          "Initialising barrier `%s' failed at a sub-controller: %s\n",
637          barrier->name, (NULL != emsg) ? emsg : "NULL");
638     cancel_wrappers (barrier);
639     if (NULL == emsg)
640       emsg = "Initialisation failed at a sub-controller";
641     barrier->status = BARRIER_STATUS_ERROR;
642     send_barrier_status_msg (barrier, emsg);
643     return;
644   case BARRIER_STATUS_CROSSED:
645     if (BARRIER_STATUS_INITIALISED != barrier->status)
646     {
647       GNUNET_break_op (0);
648       return;
649     }
650     barrier->num_wbarriers_reached++;
651     if ((barrier->num_wbarriers_reached == barrier->num_wbarriers)
652         && (LOCAL_QUORUM_REACHED (barrier)))
653     {
654       barrier->status = BARRIER_STATUS_CROSSED;
655       send_barrier_status_msg (barrier, NULL);
656     }
657     return;
658   case BARRIER_STATUS_INITIALISED:
659     if (0 != barrier->status)
660     {
661       GNUNET_break_op (0);
662       return;
663     }
664     barrier->num_wbarriers_inited++;
665     if (barrier->num_wbarriers_inited == barrier->num_wbarriers)
666     {
667       barrier->status = BARRIER_STATUS_INITIALISED;
668       send_barrier_status_msg (barrier, NULL);
669     }
670     return;
671   }
672 }
673
674
675 /**
676  * Function called upon timeout while waiting for a response from the
677  * subcontrollers to barrier init message
678  *
679  * @param cls barrier
680  * @param tc scheduler task context
681  */
682 static void
683 fwd_tout_barrier_init (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
684 {
685   struct Barrier *barrier = cls;
686   
687   cancel_wrappers (barrier);
688   barrier->status = BARRIER_STATUS_ERROR;
689   send_barrier_status_msg (barrier,
690                            "Timedout while propagating barrier initialisation\n");
691   remove_barrier (barrier);
692 }
693
694
695 /**
696  * Message handler for GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_INIT messages.  This
697  * message should always come from a parent controller or the testbed API if we
698  * are the root controller.
699  *
700  * This handler is queued in the main service and will handle the messages sent
701  * either from the testbed driver or from a high level controller
702  *
703  * @param cls NULL
704  * @param client identification of the client
705  * @param message the actual message
706  */
707 void
708 GST_handle_barrier_init (void *cls, struct GNUNET_SERVER_Client *client,
709                          const struct GNUNET_MessageHeader *message)
710 {
711   const struct GNUNET_TESTBED_BarrierInit *msg;
712   char *name;
713   struct Barrier *barrier;
714   struct Slave *slave;
715   struct WBarrier *wrapper;
716   struct GNUNET_HashCode hash;
717   size_t name_len;
718   unsigned int cnt;
719   uint16_t msize;
720   
721   if (NULL == GST_context)
722   {
723     GNUNET_break_op (0);
724     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
725     return;
726   }
727   if (client != GST_context->client)
728   {
729     GNUNET_break_op (0);
730     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
731     return;
732   }
733   msize = ntohs (message->size);
734   if (msize <= sizeof (struct GNUNET_TESTBED_BarrierInit))
735   {
736     GNUNET_break_op (0);
737     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
738     return;
739   }
740   msg = (const struct GNUNET_TESTBED_BarrierInit *) message;
741   name_len = (size_t) msize - sizeof (struct GNUNET_TESTBED_BarrierInit);
742   name = GNUNET_malloc (name_len + 1);
743   (void) memcpy (name, msg->name, name_len);
744   GNUNET_CRYPTO_hash (name, name_len, &hash);
745   LOG_DEBUG ("Received BARRIER_INIT for barrier `%s'\n", name);
746   if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (barrier_map, &hash))
747   {
748     
749     send_client_status_msg (client, name, BARRIER_STATUS_ERROR,
750                             "A barrier with the same name already exists");
751     GNUNET_free (name);
752     GNUNET_SERVER_receive_done (client, GNUNET_OK);
753     return;
754   }
755   barrier = GNUNET_malloc (sizeof (struct Barrier));
756   (void) memcpy (&barrier->hash, &hash, sizeof (struct GNUNET_HashCode));
757   barrier->quorum = msg->quorum;
758   barrier->name = name;
759   barrier->mc = client;
760   GNUNET_SERVER_client_keep (client);
761   GNUNET_assert (GNUNET_OK ==
762                  GNUNET_CONTAINER_multihashmap_put (barrier_map,
763                                                     &barrier->hash,
764                                                     barrier,
765                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST));
766   GNUNET_SERVER_receive_done (client, GNUNET_OK);
767   /* Propagate barrier init to subcontrollers */
768   for (cnt = 0; cnt < GST_slave_list_size; cnt++)
769   {
770     if (NULL == (slave = GST_slave_list[cnt]))
771       continue;
772     if (NULL == slave->controller)
773     {
774       GNUNET_break (0);/* May happen when we are connecting to the controller */
775       continue;
776     }
777     wrapper = GNUNET_malloc (sizeof (struct WBarrier));
778     wrapper->barrier = barrier;
779     GNUNET_CONTAINER_DLL_insert_tail (barrier->whead, barrier->wtail, wrapper);
780     wrapper->hbarrier = GNUNET_TESTBED_barrier_init (slave->controller,
781                                                      barrier->name,
782                                                      barrier->quorum,
783                                                      &wbarrier_status_cb,
784                                                      wrapper);    
785   }
786   if (NULL == barrier->whead)   /* No further propagation */
787   {
788     barrier->status = BARRIER_STATUS_INITIALISED;
789     LOG_DEBUG ("Sending BARRIER_STATUS_INITIALISED for barrier `%s'\n",
790                barrier->name);
791     send_barrier_status_msg (barrier, NULL);
792   }else
793     barrier->tout_task = GNUNET_SCHEDULER_add_delayed (MESSAGE_SEND_TIMEOUT (30),
794                                                        &fwd_tout_barrier_init,
795                                                        barrier);
796 }
797
798
799 /**
800  * Message handler for GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_CANCEL messages.  This
801  * message should always come from a parent controller or the testbed API if we
802  * are the root controller.
803  *
804  * This handler is queued in the main service and will handle the messages sent
805  * either from the testbed driver or from a high level controller
806  *
807  * @param cls NULL
808  * @param client identification of the client
809  * @param message the actual message
810  */
811 void
812 GST_handle_barrier_cancel (void *cls, struct GNUNET_SERVER_Client *client,
813                            const struct GNUNET_MessageHeader *message)
814 {
815   const struct GNUNET_TESTBED_BarrierCancel *msg;
816   char *name;
817   struct Barrier *barrier;
818   struct GNUNET_HashCode hash;
819   size_t name_len;
820   uint16_t msize;
821
822   if (NULL == GST_context)
823   {
824     GNUNET_break_op (0);
825     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
826     return;
827   }  
828   if (client != GST_context->client)
829   {
830     GNUNET_break_op (0);
831     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
832     return;
833   }
834   msize = ntohs (message->size);
835   if (msize <= sizeof (struct GNUNET_TESTBED_BarrierCancel))
836   {
837     GNUNET_break_op (0);
838     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
839     return;
840   }
841   msg = (const struct GNUNET_TESTBED_BarrierCancel *) message;
842   name_len = msize - sizeof (struct GNUNET_TESTBED_BarrierCancel);
843   name = GNUNET_malloc (name_len + 1);
844   (void) memcpy (name, msg->name, name_len);
845   GNUNET_CRYPTO_hash (name, name_len, &hash);
846   if (GNUNET_NO == GNUNET_CONTAINER_multihashmap_contains (barrier_map, &hash))
847   {
848     GNUNET_break_op (0);
849     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
850     return;
851   }
852   barrier = GNUNET_CONTAINER_multihashmap_get (barrier_map, &hash);
853   GNUNET_assert (NULL != barrier);
854   cancel_wrappers (barrier);
855   remove_barrier (barrier);
856   GNUNET_SERVER_receive_done (client, GNUNET_OK);  
857 }
858
859 /* end of gnunet-service-testbed_barriers.c */