- barrier wait API
[oweals/gnunet.git] / src / testbed / gnunet-service-testbed_barriers.c
1 /*
2   This file is part of GNUnet.
3   (C) 2008--2013 Christian Grothoff (and other contributing authors)
4
5   GNUnet is free software; you can redistribute it and/or modify
6   it under the terms of the GNU General Public License as published
7   by the Free Software Foundation; either version 3, or (at your
8   option) any later version.
9
10   GNUnet is distributed in the hope that it will be useful, but
11   WITHOUT ANY WARRANTY; without even the implied warranty of
12   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13   General Public License for more details.
14
15   You should have received a copy of the GNU General Public License
16   along with GNUnet; see the file COPYING.  If not, write to the
17   Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18   Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file testbed/gnunet-service-testbed_barriers.c
23  * @brief barrier handling at the testbed controller
24  * @author Sree Harsha Totakura <sreeharsha@totakura.in> 
25  */
26
27 #include "gnunet-service-testbed.h"
28 #include "gnunet-service-testbed_barriers.h"
29
30
31 /**
32  * timeout for outgoing message transmissions in seconds
33  */
34 #define MESSAGE_SEND_TIMEOUT(s) \
35   GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, s)
36
37
38 /**
39  * Test to see if local peers have reached the required quorum of a barrier
40  */
41 #define LOCAL_QUORUM_REACHED(barrier)           \
42   ((barrier->quorum * GST_num_local_peers) <= (barrier->nreached * 100))
43
44
45 /**
46  * Barrier
47  */
48 struct Barrier;
49
50
51 /**
52  * Message queue for transmitting messages
53  */
54 struct MessageQueue
55 {
56   /**
57    * next pointer for DLL
58    */
59   struct MessageQueue *next;
60
61   /**
62    * prev pointer for DLL
63    */
64   struct MessageQueue *prev;
65
66   /**
67    * The message to be sent
68    */
69   struct GNUNET_MessageHeader *msg;
70 };
71
72
73 /**
74  * Context to be associated with each client
75  */
76 struct ClientCtx
77 {
78   /**
79    * The barrier this client is waiting for
80    */
81   struct Barrier *barrier;
82
83   /**
84    * DLL next ptr
85    */
86   struct ClientCtx *next;
87
88   /**
89    * DLL prev ptr
90    */
91   struct ClientCtx *prev;
92
93   /**
94    * The client handle
95    */
96   struct GNUNET_SERVER_Client *client;
97
98   /**
99    * the transmission handle
100    */
101   struct GNUNET_SERVER_TransmitHandle *tx;
102
103   /**
104    * message queue head
105    */
106   struct MessageQueue *mq_head;
107
108   /**
109    * message queue tail
110    */
111   struct MessageQueue *mq_tail;
112 };
113
114
115 /**
116  * Wrapper around Barrier handle
117  */
118 struct WBarrier
119 {
120   /**
121    * DLL next pointer
122    */
123   struct WBarrier *next;
124
125   /**
126    * DLL prev pointer
127    */
128   struct WBarrier *prev;
129
130   /**
131    * The local barrier associated with the creation of this wrapper
132    */
133   struct Barrier *barrier;
134
135   /**
136    * The barrier handle from API
137    */
138   struct GNUNET_TESTBED_Barrier *hbarrier;
139
140   /**
141    * Has this barrier been crossed?
142    */
143   uint8_t reached;
144 };
145
146
147 /**
148  * Barrier
149  */
150 struct Barrier
151 {
152   /**
153    * The hashcode of the barrier name
154    */
155   struct GNUNET_HashCode hash;
156
157   /**
158    * The client handle to the master controller
159    */
160   struct GNUNET_SERVER_Client *client;
161
162   /**
163    * The name of the barrier
164    */
165   char *name;
166
167   /**
168    * DLL head for the list of clients waiting for this barrier
169    */
170   struct ClientCtx *head;
171
172   /**
173    * DLL tail for the list of clients waiting for this barrier
174    */
175   struct ClientCtx *tail;
176
177   /**
178    * DLL head for the list of barrier handles
179    */
180   struct WBarrier *whead;
181
182   /**
183    * DLL tail for the list of barrier handles
184    */
185   struct WBarrier *wtail;
186
187   /**
188    * Identifier for the timeout task
189    */
190   GNUNET_SCHEDULER_TaskIdentifier tout_task;
191   
192   /**
193    * The status of this barrier
194    */
195   enum GNUNET_TESTBED_BarrierStatus status;
196   
197   /**
198    * Number of barriers wrapped in the above DLL
199    */
200   unsigned int num_wbarriers;
201
202   /**
203    * Number of wrapped barriers reached so far
204    */
205   unsigned int num_wbarriers_reached;
206
207   /**
208    * Number of wrapped barrier initialised so far
209    */
210   unsigned int num_wbarriers_inited;
211
212   /**
213    * Number of peers which have reached this barrier
214    */
215   unsigned int nreached;
216
217   /**
218    * Number of slaves we have initialised this barrier
219    */
220   unsigned int nslaves;
221
222   /**
223    * Quorum percentage to be reached
224    */
225   uint8_t quorum;
226   
227 };
228
229
230 /**
231  * Hashtable handle for storing initialised barriers
232  */
233 static struct GNUNET_CONTAINER_MultiHashMap *barrier_map;
234
235 /**
236  * Service context
237  */
238 static struct GNUNET_SERVICE_Context *ctx;
239
240
241 /**
242  * Function called to notify a client about the connection
243  * begin ready to queue more data.  "buf" will be
244  * NULL and "size" zero if the connection was closed for
245  * writing in the meantime.
246  *
247  * @param cls client context
248  * @param size number of bytes available in buf
249  * @param buf where the callee should write the message
250  * @return number of bytes written to buf
251  */
252 static size_t 
253 transmit_ready_cb (void *cls, size_t size, void *buf)
254 {
255   struct ClientCtx *ctx = cls;
256   struct GNUNET_SERVER_Client *client = ctx->client;
257   struct MessageQueue *mq;
258   struct GNUNET_MessageHeader *msg;
259   size_t wrote;
260
261   ctx->tx = NULL;
262   wrote = 0;
263   if ((0 == size) || (NULL == buf))
264   {
265     GNUNET_assert (NULL != ctx->client);
266     GNUNET_SERVER_client_drop (ctx->client);
267     ctx->client = NULL;    
268     return 0;
269   }
270   mq = ctx->mq_head;
271   msg = mq->msg;
272   wrote = ntohs (msg->size);
273   GNUNET_assert (size >= wrote);
274   (void) memcpy (buf, msg, wrote);
275   GNUNET_CONTAINER_DLL_remove (ctx->mq_head, ctx->mq_tail, mq);
276   GNUNET_free (mq->msg);
277   GNUNET_free (mq);
278   if (NULL != (mq = ctx->mq_head))
279     ctx->tx = GNUNET_SERVER_notify_transmit_ready (client, ntohs (msg->size),
280                                                   MESSAGE_SEND_TIMEOUT (30),
281                                                   &transmit_ready_cb, ctx);
282   return wrote;
283 }
284
285
286 /**
287  * Queue a message into a clients message queue
288  *
289  * @param ctx the context associated with the client
290  * @param msg the message to queue.  Will be consumed
291  */
292 static void
293 queue_message (struct ClientCtx *ctx, struct GNUNET_MessageHeader *msg)
294 {
295   struct MessageQueue *mq;
296   struct GNUNET_SERVER_Client *client = ctx->client;
297   
298   mq = GNUNET_malloc (sizeof (struct MessageQueue));
299   mq->msg = msg;
300   GNUNET_CONTAINER_DLL_insert_tail (ctx->mq_head, ctx->mq_tail, mq);
301   if (NULL == ctx->tx)
302    ctx->tx = GNUNET_SERVER_notify_transmit_ready (client, ntohs (msg->size),
303                                                   MESSAGE_SEND_TIMEOUT (30),
304                                                   &transmit_ready_cb, ctx);
305 }
306
307
308 /**
309  * Function to cleanup client context data structure
310  *
311  * @param ctx the client context data structure
312  */
313 static void
314 cleanup_clientctx (struct ClientCtx *ctx)
315 {
316   struct MessageQueue *mq;
317   
318   GNUNET_SERVER_client_drop (ctx->client);
319   if (NULL != ctx->tx)
320     GNUNET_SERVER_notify_transmit_ready_cancel (ctx->tx);
321   if (NULL != (mq = ctx->mq_head))
322   {
323     GNUNET_CONTAINER_DLL_remove (ctx->mq_head, ctx->mq_tail, mq);
324     GNUNET_free (mq->msg);
325     GNUNET_free (mq);
326   }
327   GNUNET_free (ctx);
328 }
329
330
331 /**
332  * Function to remove a barrier from the barrier map and cleanup resources
333  * occupied by a barrier
334  *
335  * @param barrier the barrier handle
336  */
337 static void
338 remove_barrier (struct Barrier *barrier)
339 {
340   struct ClientCtx *ctx;
341   
342   GNUNET_assert (GNUNET_YES == GNUNET_CONTAINER_multihashmap_remove (barrier_map,
343                                                                     &barrier->hash,
344                                                                     barrier));
345   while (NULL != (ctx = barrier->head))
346   {
347     GNUNET_CONTAINER_DLL_remove (barrier->head, barrier->tail, ctx);
348     cleanup_clientctx (ctx);
349   }
350   GNUNET_free (barrier->name);
351   GNUNET_SERVER_client_drop (barrier->client);
352   GNUNET_free (barrier);
353 }
354
355
356 /**
357  * Cancels all subcontroller barrier handles
358  *
359  * @param barrier the local barrier
360  */
361 static void
362 cancel_wrappers (struct Barrier *barrier)
363 {
364   struct WBarrier *wrapper;
365
366   while (NULL != (wrapper = barrier->whead))
367   {
368     GNUNET_TESTBED_barrier_cancel (wrapper->hbarrier);
369     GNUNET_CONTAINER_DLL_remove (barrier->whead, barrier->wtail, wrapper);
370     GNUNET_free (wrapper);
371   }
372 }
373
374
375 /**
376  * Send a status message about a barrier to the given client
377  *
378  * @param client the client to send the message to
379  * @param name the barrier name
380  * @param status the status of the barrier
381  * @param emsg the error message; should be non-NULL for
382  *   status=BARRIER_STATUS_ERROR 
383  */
384 static void
385 send_client_status_msg (struct GNUNET_SERVER_Client *client,
386                         const char *name,
387                         enum GNUNET_TESTBED_BarrierStatus status,
388                         const char *emsg)
389 {
390   struct GNUNET_TESTBED_BarrierStatusMsg *msg;
391   size_t name_len;
392   uint16_t msize;
393
394   GNUNET_assert ((NULL == emsg) || (BARRIER_STATUS_ERROR == status));
395   name_len = strlen (name) + 1;
396   msize = sizeof (struct GNUNET_TESTBED_BarrierStatusMsg)
397       + name_len
398       + (NULL == emsg) ? 0 : strlen (emsg) + 1;
399   msg = GNUNET_malloc (msize);
400   msg->status = htons (status);
401   msg->name_len = htons ((uint16_t) name_len);
402   (void) memcpy (msg->data, name, name_len);
403   if (NULL != emsg)
404     (void) memcpy (msg->data + name_len, emsg, strlen (emsg) + 1);
405   GST_queue_message (client, &msg->header);
406 }
407
408
409 /**
410  * Sends a barrier failed message
411  *
412  * @param barrier the corresponding barrier
413  * @param emsg the error message; should be non-NULL for
414  *   status=BARRIER_STATUS_ERROR 
415  */
416 static void
417 send_barrier_status_msg (struct Barrier *barrier, const char *emsg)
418 {
419   GNUNET_assert (0 != barrier->status);
420   send_client_status_msg (barrier->client, barrier->name,
421                           barrier->status, emsg);
422 }
423
424
425
426 /**
427  * Task for sending barrier crossed notifications to waiting client
428  *
429  * @param cls the barrier which is crossed
430  * @param tc scheduler task context
431  */
432 static void
433 notify_task_cb (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
434 {
435   struct Barrier *barrier = cls;
436   struct ClientCtx *client_ctx;
437   struct GNUNET_TESTBED_BarrierStatusMsg *msg;
438   struct GNUNET_MessageHeader *dup_msg;
439   uint16_t name_len;
440   uint16_t msize;
441
442   name_len = strlen (barrier->name) + 1;
443   msize = sizeof (struct GNUNET_TESTBED_BarrierStatusMsg) + name_len;  
444   msg = GNUNET_malloc (msize);
445   msg->header.size = htons (msize);
446   msg->header.type = htons (GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_STATUS);
447   msg->status = 0;
448   msg->name_len = htons (name_len);
449   (void) memcpy (msg->data, barrier->name, name_len);
450   msg->data[name_len] = '\0';
451   while (NULL != (client_ctx = barrier->head))
452   {
453     dup_msg = GNUNET_copy_message (&msg->header);
454     queue_message (client_ctx, dup_msg);
455     GNUNET_CONTAINER_DLL_remove (barrier->head, barrier->tail, client_ctx);
456     GNUNET_SERVER_client_set_user_context_ (client_ctx->client, NULL, 0);
457     GNUNET_free (client_ctx);
458   }
459 }
460
461
462 /**
463  * Message handler for GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_WAIT messages.  This
464  * message should come from peers or a shared helper service using the
465  * testbed-barrier client API (@see gnunet_testbed_barrier_service.h)
466  *
467  * This handler is queued in the main service and will handle the messages sent
468  * either from the testbed driver or from a high level controller
469  *
470  * @param cls NULL
471  * @param client identification of the client
472  * @param message the actual message
473  */
474 static void
475 handle_barrier_wait (void *cls, struct GNUNET_SERVER_Client *client,
476                      const struct GNUNET_MessageHeader *message)
477 {
478   const struct GNUNET_TESTBED_BarrierWait *msg;
479   struct Barrier *barrier;
480   char *name;
481   struct ClientCtx *client_ctx;
482   struct GNUNET_HashCode key;
483   size_t name_len;
484   uint16_t msize;
485   
486   msize = ntohs (message->size);
487   if (msize <= sizeof (struct GNUNET_TESTBED_BarrierWait))
488   {
489     GNUNET_break_op (0);
490     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
491     return;
492   }
493   if (NULL == barrier_map)
494   {
495     GNUNET_break (0);
496     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
497     return;
498   }
499   msg = (const struct GNUNET_TESTBED_BarrierWait *) message;
500   name_len = msize - sizeof (struct GNUNET_TESTBED_BarrierWait);
501   name = GNUNET_malloc (name_len + 1);
502   name[name_len] = '\0';
503   (void) memcpy (name, msg->name, name_len);
504   GNUNET_CRYPTO_hash (name, name_len - 1, &key);
505   if (NULL == (barrier = GNUNET_CONTAINER_multihashmap_get (barrier_map, &key)))
506   {
507     GNUNET_break (0);
508     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
509     GNUNET_free (name);
510     return;
511   }
512   client_ctx = GNUNET_SERVER_client_get_user_context (client, struct ClientCtx);
513   if (NULL == client_ctx)
514   {
515     client_ctx = GNUNET_malloc (sizeof (struct ClientCtx));
516     client_ctx->client = client;
517     GNUNET_SERVER_client_keep (client);
518     client_ctx->barrier = barrier;
519     GNUNET_CONTAINER_DLL_insert_tail (barrier->head, barrier->tail, client_ctx);
520     barrier->nreached++;
521     if (LOCAL_QUORUM_REACHED (barrier))
522       notify_task_cb (barrier, NULL);
523   }
524   GNUNET_SERVER_receive_done (client, GNUNET_OK);
525 }
526
527
528 /**
529  * Functions with this signature are called whenever a client
530  * is disconnected on the network level.
531  *
532  * @param cls closure
533  * @param client identification of the client; NULL
534  *        for the last call when the server is destroyed
535  */
536 static void
537 disconnect_cb (void *cls, struct GNUNET_SERVER_Client *client)
538 {
539   struct ClientCtx *client_ctx;
540   
541   client_ctx = GNUNET_SERVER_client_get_user_context (client, struct ClientCtx);
542   if (NULL == client_ctx)
543     return;                     /* We only set user context for locally
544                                    connected clients */
545   cleanup_clientctx (client_ctx);
546 }
547
548
549 /**
550  * Function to initialise barrriers component
551  */
552 void
553 GST_barriers_init (struct GNUNET_CONFIGURATION_Handle *cfg)
554 {
555   static const struct GNUNET_SERVER_MessageHandler message_handlers[] = {
556     {&handle_barrier_wait, NULL, GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_WAIT, 0},
557     {NULL, NULL, 0, 0}
558   };
559   struct GNUNET_SERVER_Handle *srv;
560
561   barrier_map = GNUNET_CONTAINER_multihashmap_create (3, GNUNET_YES);
562   ctx = GNUNET_SERVICE_start ("testbed-barrier", cfg,
563                               GNUNET_SERVICE_OPTION_MANUAL_SHUTDOWN);
564   srv = GNUNET_SERVICE_get_server (ctx);
565   GNUNET_SERVER_add_handlers (srv, message_handlers);
566   GNUNET_SERVER_disconnect_notify (srv, &disconnect_cb, NULL);  
567 }
568
569
570 /**
571  * Function to stop the barrier service
572  */
573 void
574 GST_barriers_stop ()
575 {
576   GNUNET_assert (NULL != barrier_map);
577   GNUNET_CONTAINER_multihashmap_destroy (barrier_map);
578   GNUNET_assert (NULL != ctx);
579   GNUNET_SERVICE_stop (ctx);
580 }
581
582
583 /**
584  * Functions of this type are to be given as callback argument to
585  * GNUNET_TESTBED_barrier_init().  The callback will be called when status
586  * information is available for the barrier.
587  *
588  * @param cls the closure given to GNUNET_TESTBED_barrier_init()
589  * @param name the name of the barrier
590  * @param barrier the barrier handle
591  * @param status status of the barrier; GNUNET_OK if the barrier is crossed;
592  *   GNUNET_SYSERR upon error
593  * @param emsg if the status were to be GNUNET_SYSERR, this parameter has the
594  *   error messsage
595  */
596 static void 
597 wbarrier_status_cb (void *cls, const char *name,
598                     struct GNUNET_TESTBED_Barrier *b_,
599                     enum GNUNET_TESTBED_BarrierStatus status,
600                     const char *emsg)
601 {
602   struct WBarrier *wrapper = cls;
603   struct Barrier *barrier = wrapper->barrier;
604
605   GNUNET_assert (b_ == wrapper->hbarrier);
606   wrapper->hbarrier = NULL;
607   GNUNET_CONTAINER_DLL_remove (barrier->whead, barrier->wtail, wrapper);
608   GNUNET_free (wrapper);
609   switch (status)
610   {
611   case BARRIER_STATUS_ERROR:
612     LOG (GNUNET_ERROR_TYPE_ERROR,
613          "Initialising barrier `%s' failed at a sub-controller: %s\n",
614          barrier->name, (NULL != emsg) ? emsg : "NULL");
615     cancel_wrappers (barrier);
616     if (NULL == emsg)
617       emsg = "Initialisation failed at a sub-controller";
618     barrier->status = BARRIER_STATUS_ERROR;
619     send_barrier_status_msg (barrier, emsg);
620     return;
621   case BARRIER_STATUS_CROSSED:
622     if (BARRIER_STATUS_INITIALISED != barrier->status)
623     {
624       GNUNET_break_op (0);
625       return;
626     }
627     barrier->num_wbarriers_reached++;
628     if ((barrier->num_wbarriers_reached == barrier->num_wbarriers)
629         && (LOCAL_QUORUM_REACHED (barrier)))
630     {
631       barrier->status = BARRIER_STATUS_CROSSED;
632       send_barrier_status_msg (barrier, NULL);
633     }
634     return;
635   case BARRIER_STATUS_INITIALISED:
636     if (0 != barrier->status)
637     {
638       GNUNET_break_op (0);
639       return;
640     }
641     barrier->num_wbarriers_inited++;
642     if (barrier->num_wbarriers_inited == barrier->num_wbarriers)
643     {
644       barrier->status = BARRIER_STATUS_INITIALISED;
645       send_barrier_status_msg (barrier, NULL);
646     }
647     return;
648   }
649 }
650
651
652 /**
653  * Function called upon timeout while waiting for a response from the
654  * subcontrollers to barrier init message
655  *
656  * @param 
657  * @return 
658  */
659 static void
660 fwd_tout_barrier_init (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
661 {
662   struct Barrier *barrier = cls;
663   
664   cancel_wrappers (barrier);
665   barrier->status = BARRIER_STATUS_ERROR;
666   send_barrier_status_msg (barrier,
667                            "Timedout while propagating barrier initialisation\n");
668   remove_barrier (barrier);
669 }
670
671
672 /**
673  * Message handler for GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_INIT messages.  This
674  * message should always come from a parent controller or the testbed API if we
675  * are the root controller.
676  *
677  * This handler is queued in the main service and will handle the messages sent
678  * either from the testbed driver or from a high level controller
679  *
680  * @param cls NULL
681  * @param client identification of the client
682  * @param message the actual message
683  */
684 void
685 GST_handle_barrier_init (void *cls, struct GNUNET_SERVER_Client *client,
686                          const struct GNUNET_MessageHeader *message)
687 {
688   const struct GNUNET_TESTBED_BarrierInit *msg;
689   char *name;
690   struct Barrier *barrier;
691   struct Slave *slave;
692   struct WBarrier *wrapper;
693   struct GNUNET_HashCode hash;
694   size_t name_len;
695   unsigned int cnt;
696   uint16_t msize;
697   
698   if (NULL == GST_context)
699   {
700     GNUNET_break_op (0);
701     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
702     return;
703   }
704   if (client != GST_context->client)
705   {
706     GNUNET_break_op (0);
707     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
708     return;
709   }
710   msize = ntohs (message->size);
711   if (msize <= sizeof (struct GNUNET_TESTBED_BarrierInit))
712   {
713     GNUNET_break_op (0);
714     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
715     return;
716   }
717   msg = (const struct GNUNET_TESTBED_BarrierInit *) message;
718   name_len = (size_t) msize - sizeof (struct GNUNET_TESTBED_BarrierInit);
719   name = GNUNET_malloc (name_len + 1);
720   (void) memcpy (name, msg->name, name_len);
721   GNUNET_CRYPTO_hash (name, name_len, &hash);
722   if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (barrier_map, &hash))
723   {
724   
725     send_client_status_msg (client, name, BARRIER_STATUS_ERROR,
726                             "A barrier with the same name already exists");
727     GNUNET_free (name);
728     GNUNET_SERVER_receive_done (client, GNUNET_OK);
729     return;
730   }
731   barrier = GNUNET_malloc (sizeof (struct Barrier));
732   (void) memcpy (&barrier->hash, &hash, sizeof (struct GNUNET_HashCode));
733   barrier->quorum = msg->quorum;
734   barrier->name = name;
735   barrier->client = client;
736   GNUNET_SERVER_client_keep (client);
737   GNUNET_assert (GNUNET_OK ==
738                  GNUNET_CONTAINER_multihashmap_put (barrier_map,
739                                                     &barrier->hash,
740                                                     barrier,
741                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST));
742   GNUNET_SERVER_receive_done (client, GNUNET_OK);
743   /* Propagate barrier init to subcontrollers */
744   for (cnt = 0; cnt < GST_slave_list_size; cnt++)
745   {
746     if (NULL == (slave = GST_slave_list[cnt]))
747       continue;
748     if (NULL == slave->controller)
749     {
750       GNUNET_break (0);/* May happen when we are connecting to the controller */
751       continue;
752     }
753     wrapper = GNUNET_malloc (sizeof (struct WBarrier));
754     wrapper->barrier = barrier;
755     GNUNET_CONTAINER_DLL_insert_tail (barrier->whead, barrier->wtail, wrapper);
756     wrapper->hbarrier = GNUNET_TESTBED_barrier_init (slave->controller,
757                                                      barrier->name,
758                                                      barrier->quorum,
759                                                      &wbarrier_status_cb,
760                                                      wrapper);    
761   }
762   if (NULL == barrier->whead)   /* No further propagation */
763   {
764     barrier->status = BARRIER_STATUS_INITIALISED;
765     send_barrier_status_msg (barrier, NULL);
766   }else
767     barrier->tout_task = GNUNET_SCHEDULER_add_delayed (MESSAGE_SEND_TIMEOUT (30),
768                                                        &fwd_tout_barrier_init,
769                                                        barrier);
770 }
771
772
773 /**
774  * Message handler for GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_CANCEL messages.  This
775  * message should always come from a parent controller or the testbed API if we
776  * are the root controller.
777  *
778  * This handler is queued in the main service and will handle the messages sent
779  * either from the testbed driver or from a high level controller
780  *
781  * @param cls NULL
782  * @param client identification of the client
783  * @param message the actual message
784  */
785 void
786 GST_handle_barrier_cancel (void *cls, struct GNUNET_SERVER_Client *client,
787                            const struct GNUNET_MessageHeader *message)
788 {
789   const struct GNUNET_TESTBED_BarrierCancel *msg;
790   char *name;
791   struct Barrier *barrier;
792   struct GNUNET_HashCode hash;
793   size_t name_len;
794   uint16_t msize;
795
796   if (NULL == GST_context)
797   {
798     GNUNET_break_op (0);
799     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
800     return;
801   }  
802   if (client != GST_context->client)
803   {
804     GNUNET_break_op (0);
805     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
806     return;
807   }
808   msize = ntohs (message->size);
809   if (msize <= sizeof (struct GNUNET_TESTBED_BarrierCancel))
810   {
811     GNUNET_break_op (0);
812     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
813     return;
814   }
815   msg = (const struct GNUNET_TESTBED_BarrierCancel *) message;
816   name_len = msize - sizeof (struct GNUNET_TESTBED_BarrierCancel);
817   name = GNUNET_malloc (name_len + 1);
818   (void) memcpy (name, msg->name, name_len);
819   GNUNET_CRYPTO_hash (name, name_len, &hash);
820   if (GNUNET_NO == GNUNET_CONTAINER_multihashmap_contains (barrier_map, &hash))
821   {
822     GNUNET_break_op (0);
823     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
824     return;
825   }
826   barrier = GNUNET_CONTAINER_multihashmap_get (barrier_map, &hash);
827   GNUNET_assert (NULL != barrier);
828   cancel_wrappers (barrier);
829   remove_barrier (barrier);
830   GNUNET_SERVER_receive_done (client, GNUNET_OK);  
831 }
832
833 /* end of gnunet-service-testbed_barriers.c */