- propagate barrier cancel message
[oweals/gnunet.git] / src / testbed / gnunet-service-testbed_barriers.c
1 /*
2   This file is part of GNUnet.
3   (C) 2008--2013 Christian Grothoff (and other contributing authors)
4
5   GNUnet is free software; you can redistribute it and/or modify
6   it under the terms of the GNU General Public License as published
7   by the Free Software Foundation; either version 3, or (at your
8   option) any later version.
9
10   GNUnet is distributed in the hope that it will be useful, but
11   WITHOUT ANY WARRANTY; without even the implied warranty of
12   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13   General Public License for more details.
14
15   You should have received a copy of the GNU General Public License
16   along with GNUnet; see the file COPYING.  If not, write to the
17   Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18   Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file testbed/gnunet-service-testbed_barriers.c
23  * @brief barrier handling at the testbed controller
24  * @author Sree Harsha Totakura <sreeharsha@totakura.in> 
25  */
26
27 #include "gnunet-service-testbed.h"
28 #include "gnunet-service-testbed_barriers.h"
29
30
31 /**
32  * timeout for outgoing message transmissions in seconds
33  */
34 #define MESSAGE_SEND_TIMEOUT(s) \
35   GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, s)
36
37
38 /**
39  * Test to see if local peers have reached the required quorum of a barrier
40  */
41 #define LOCAL_QUORUM_REACHED(barrier)           \
42   ((barrier->quorum * GST_num_local_peers) <= (barrier->nreached * 100))
43
44
45 /**
46  * Barrier
47  */
48 struct Barrier;
49
50
51 /**
52  * Message queue for transmitting messages
53  */
54 struct MessageQueue
55 {
56   /**
57    * next pointer for DLL
58    */
59   struct MessageQueue *next;
60
61   /**
62    * prev pointer for DLL
63    */
64   struct MessageQueue *prev;
65
66   /**
67    * The message to be sent
68    */
69   struct GNUNET_MessageHeader *msg;
70 };
71
72
73 /**
74  * Context to be associated with each client
75  */
76 struct ClientCtx
77 {
78   /**
79    * The barrier this client is waiting for
80    */
81   struct Barrier *barrier;
82
83   /**
84    * DLL next ptr
85    */
86   struct ClientCtx *next;
87
88   /**
89    * DLL prev ptr
90    */
91   struct ClientCtx *prev;
92
93   /**
94    * The client handle
95    */
96   struct GNUNET_SERVER_Client *client;
97
98   /**
99    * the transmission handle
100    */
101   struct GNUNET_SERVER_TransmitHandle *tx;
102
103   /**
104    * message queue head
105    */
106   struct MessageQueue *mq_head;
107
108   /**
109    * message queue tail
110    */
111   struct MessageQueue *mq_tail;
112 };
113
114
115 /**
116  * Wrapper around Barrier handle
117  */
118 struct WBarrier
119 {
120   /**
121    * DLL next pointer
122    */
123   struct WBarrier *next;
124
125   /**
126    * DLL prev pointer
127    */
128   struct WBarrier *prev;
129
130   /**
131    * The local barrier associated with the creation of this wrapper
132    */
133   struct Barrier *barrier;
134
135   /**
136    * The barrier handle from API
137    */
138   struct GNUNET_TESTBED_Barrier *hbarrier;
139
140   /**
141    * Has this barrier been crossed?
142    */
143   uint8_t reached;
144 };
145
146
147 /**
148  * Barrier
149  */
150 struct Barrier
151 {
152   /**
153    * The hashcode of the barrier name
154    */
155   struct GNUNET_HashCode hash;
156
157   /**
158    * The client handle to the master controller
159    */
160   struct GNUNET_SERVER_Client *client;
161
162   /**
163    * The name of the barrier
164    */
165   char *name;
166
167   /**
168    * DLL head for the list of clients waiting for this barrier
169    */
170   struct ClientCtx *head;
171
172   /**
173    * DLL tail for the list of clients waiting for this barrier
174    */
175   struct ClientCtx *tail;
176
177   /**
178    * DLL head for the list of barrier handles
179    */
180   struct WBarrier *whead;
181
182   /**
183    * DLL tail for the list of barrier handles
184    */
185   struct WBarrier *wtail;
186
187   /**
188    * Identifier for the timeout task
189    */
190   GNUNET_SCHEDULER_TaskIdentifier tout_task;
191   
192   /**
193    * The status of this barrier
194    */
195   enum GNUNET_TESTBED_BarrierStatus status;
196   
197   /**
198    * Number of barriers wrapped in the above DLL
199    */
200   unsigned int num_wbarriers;
201
202   /**
203    * Number of wrapped barriers reached so far
204    */
205   unsigned int num_wbarriers_reached;
206
207   /**
208    * Number of wrapped barrier initialised so far
209    */
210   unsigned int num_wbarriers_inited;
211
212   /**
213    * Number of peers which have reached this barrier
214    */
215   unsigned int nreached;
216
217   /**
218    * Number of slaves we have initialised this barrier
219    */
220   unsigned int nslaves;
221
222   /**
223    * Quorum percentage to be reached
224    */
225   uint8_t quorum;
226   
227   /**
228    * Was there a timeout while propagating initialisation
229    */
230   uint8_t timedout;
231 };
232
233
234 /**
235  * Hashtable handle for storing initialised barriers
236  */
237 static struct GNUNET_CONTAINER_MultiHashMap *barrier_map;
238
239 /**
240  * Service context
241  */
242 static struct GNUNET_SERVICE_Context *ctx;
243
244
245 /**
246  * Function called to notify a client about the connection
247  * begin ready to queue more data.  "buf" will be
248  * NULL and "size" zero if the connection was closed for
249  * writing in the meantime.
250  *
251  * @param cls client context
252  * @param size number of bytes available in buf
253  * @param buf where the callee should write the message
254  * @return number of bytes written to buf
255  */
256 static size_t 
257 transmit_ready_cb (void *cls, size_t size, void *buf)
258 {
259   struct ClientCtx *ctx = cls;
260   struct GNUNET_SERVER_Client *client = ctx->client;
261   struct MessageQueue *mq;
262   struct GNUNET_MessageHeader *msg;
263   size_t wrote;
264
265   ctx->tx = NULL;
266   wrote = 0;
267   if ((0 == size) || (NULL == buf))
268   {
269     GNUNET_assert (NULL != ctx->client);
270     GNUNET_SERVER_client_drop (ctx->client);
271     ctx->client = NULL;    
272     return 0;
273   }
274   mq = ctx->mq_head;
275   msg = mq->msg;
276   wrote = ntohs (msg->size);
277   GNUNET_assert (size >= wrote);
278   (void) memcpy (buf, msg, wrote);
279   GNUNET_CONTAINER_DLL_remove (ctx->mq_head, ctx->mq_tail, mq);
280   GNUNET_free (mq->msg);
281   GNUNET_free (mq);
282   if (NULL != (mq = ctx->mq_head))
283     ctx->tx = GNUNET_SERVER_notify_transmit_ready (client, ntohs (msg->size),
284                                                   MESSAGE_SEND_TIMEOUT (30),
285                                                   &transmit_ready_cb, ctx);
286   return wrote;
287 }
288
289
290 /**
291  * Queue a message into a clients message queue
292  *
293  * @param ctx the context associated with the client
294  * @param msg the message to queue.  Will be consumed
295  */
296 static void
297 queue_message (struct ClientCtx *ctx, struct GNUNET_MessageHeader *msg)
298 {
299   struct MessageQueue *mq;
300   struct GNUNET_SERVER_Client *client = ctx->client;
301   
302   mq = GNUNET_malloc (sizeof (struct MessageQueue));
303   mq->msg = msg;
304   GNUNET_CONTAINER_DLL_insert_tail (ctx->mq_head, ctx->mq_tail, mq);
305   if (NULL == ctx->tx)
306    ctx->tx = GNUNET_SERVER_notify_transmit_ready (client, ntohs (msg->size),
307                                                   MESSAGE_SEND_TIMEOUT (30),
308                                                   &transmit_ready_cb, ctx);
309 }
310
311
312 /**
313  * Function to cleanup client context data structure
314  *
315  * @param ctx the client context data structure
316  */
317 static void
318 cleanup_clientctx (struct ClientCtx *ctx)
319 {
320   struct MessageQueue *mq;
321   
322   GNUNET_SERVER_client_drop (ctx->client);
323   if (NULL != ctx->tx)
324     GNUNET_SERVER_notify_transmit_ready_cancel (ctx->tx);
325   if (NULL != (mq = ctx->mq_head))
326   {
327     GNUNET_CONTAINER_DLL_remove (ctx->mq_head, ctx->mq_tail, mq);
328     GNUNET_free (mq->msg);
329     GNUNET_free (mq);
330   }
331   GNUNET_free (ctx);
332 }
333
334
335 /**
336  * Function to remove a barrier from the barrier map and cleanup resources
337  * occupied by a barrier
338  *
339  * @param barrier the barrier handle
340  */
341 static void
342 remove_barrier (struct Barrier *barrier)
343 {
344   struct ClientCtx *ctx;
345   
346   GNUNET_assert (GNUNET_YES == GNUNET_CONTAINER_multihashmap_remove (barrier_map,
347                                                                     &barrier->hash,
348                                                                     barrier));
349   while (NULL != (ctx = barrier->head))
350   {
351     GNUNET_CONTAINER_DLL_remove (barrier->head, barrier->tail, ctx);
352     cleanup_clientctx (ctx);
353   }
354   GNUNET_free (barrier->name);
355   GNUNET_SERVER_client_drop (barrier->client);
356   GNUNET_free (barrier);
357 }
358
359
360 /**
361  * Cancels all subcontroller barrier handles
362  *
363  * @param barrier the local barrier
364  */
365 static void
366 cancel_wrappers (struct Barrier *barrier)
367 {
368   struct WBarrier *wrapper;
369
370   while (NULL != (wrapper = barrier->whead))
371   {
372     GNUNET_TESTBED_barrier_cancel (wrapper->hbarrier);
373     GNUNET_CONTAINER_DLL_remove (barrier->whead, barrier->wtail, wrapper);
374     GNUNET_free (wrapper);
375   }
376 }
377
378
379 /**
380  * Send a status message about a barrier to the given client
381  *
382  * @param client the client to send the message to
383  * @param name the barrier name
384  * @param status the status of the barrier
385  * @param emsg the error message; should be non-NULL for
386  *   status=BARRIER_STATUS_ERROR 
387  */
388 static void
389 send_client_status_msg (struct GNUNET_SERVER_Client *client,
390                         const char *name,
391                         enum GNUNET_TESTBED_BarrierStatus status,
392                         const char *emsg)
393 {
394   struct GNUNET_TESTBED_BarrierStatusMsg *msg;
395   size_t name_len;
396   uint16_t msize;
397
398   GNUNET_assert ((NULL == emsg) || (BARRIER_STATUS_ERROR == status));
399   name_len = strlen (name) + 1;
400   msize = sizeof (struct GNUNET_TESTBED_BarrierStatusMsg)
401       + name_len
402       + (NULL == emsg) ? 0 : strlen (emsg) + 1;
403   msg = GNUNET_malloc (msize);
404   msg->status = htons (status);
405   msg->name_len = htons ((uint16_t) name_len);
406   (void) memcpy (msg->data, name, name_len);
407   if (NULL != emsg)
408     (void) memcpy (msg->data + name_len, emsg, strlen (emsg) + 1);
409   GST_queue_message (client, &msg->header);
410 }
411
412
413 /**
414  * Sends a barrier failed message
415  *
416  * @param barrier the corresponding barrier
417  * @param emsg the error message; should be non-NULL for
418  *   status=BARRIER_STATUS_ERROR 
419  */
420 static void
421 send_barrier_status_msg (struct Barrier *barrier, const char *emsg)
422 {
423   GNUNET_assert (0 != barrier->status);
424   send_client_status_msg (barrier->client, barrier->name,
425                           barrier->status, emsg);
426 }
427
428
429
430 /**
431  * Task for sending barrier crossed notifications to waiting client
432  *
433  * @param cls the barrier which is crossed
434  * @param tc scheduler task context
435  */
436 static void
437 notify_task_cb (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
438 {
439   struct Barrier *barrier = cls;
440   struct ClientCtx *client_ctx;
441   struct GNUNET_TESTBED_BarrierStatusMsg *msg;
442   struct GNUNET_MessageHeader *dup_msg;
443   uint16_t name_len;
444   uint16_t msize;
445
446   name_len = strlen (barrier->name) + 1;
447   msize = sizeof (struct GNUNET_TESTBED_BarrierStatusMsg) + name_len;  
448   msg = GNUNET_malloc (msize);
449   msg->header.size = htons (msize);
450   msg->header.type = htons (GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_STATUS);
451   msg->status = 0;
452   msg->name_len = htons (name_len);
453   (void) memcpy (msg->data, barrier->name, name_len);
454   msg->data[name_len] = '\0';
455   while (NULL != (client_ctx = barrier->head))
456   {
457     dup_msg = GNUNET_copy_message (&msg->header);
458     queue_message (client_ctx, dup_msg);
459     GNUNET_CONTAINER_DLL_remove (barrier->head, barrier->tail, client_ctx);
460     GNUNET_SERVER_client_set_user_context_ (client_ctx->client, NULL, 0);
461     GNUNET_free (client_ctx);
462   }
463 }
464
465
466 /**
467  * Message handler for GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_WAIT messages.  This
468  * message should come from peers or a shared helper service using the
469  * testbed-barrier client API (@see gnunet_testbed_barrier_service.h)
470  *
471  * This handler is queued in the main service and will handle the messages sent
472  * either from the testbed driver or from a high level controller
473  *
474  * @param cls NULL
475  * @param client identification of the client
476  * @param message the actual message
477  */
478 static void
479 handle_barrier_wait (void *cls, struct GNUNET_SERVER_Client *client,
480                      const struct GNUNET_MessageHeader *message)
481 {
482   const struct GNUNET_TESTBED_BarrierWait *msg;
483   struct Barrier *barrier;
484   char *name;
485   struct ClientCtx *client_ctx;
486   struct GNUNET_HashCode key;
487   size_t name_len;
488   uint16_t msize;
489   
490   msize = ntohs (message->size);
491   if (msize <= sizeof (struct GNUNET_TESTBED_BarrierWait))
492   {
493     GNUNET_break_op (0);
494     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
495     return;
496   }
497   if (NULL == barrier_map)
498   {
499     GNUNET_break (0);
500     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
501     return;
502   }
503   msg = (const struct GNUNET_TESTBED_BarrierWait *) message;
504   name_len = msize - sizeof (struct GNUNET_TESTBED_BarrierWait);
505   name = GNUNET_malloc (name_len + 1);
506   name[name_len] = '\0';
507   (void) memcpy (name, msg->name, name_len);
508   GNUNET_CRYPTO_hash (name, name_len - 1, &key);
509   if (NULL == (barrier = GNUNET_CONTAINER_multihashmap_get (barrier_map, &key)))
510   {
511     GNUNET_break (0);
512     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
513     GNUNET_free (name);
514     return;
515   }
516   client_ctx = GNUNET_SERVER_client_get_user_context (client, struct ClientCtx);
517   if (NULL == client_ctx)
518   {
519     client_ctx = GNUNET_malloc (sizeof (struct ClientCtx));
520     client_ctx->client = client;
521     GNUNET_SERVER_client_keep (client);
522     client_ctx->barrier = barrier;
523     GNUNET_CONTAINER_DLL_insert_tail (barrier->head, barrier->tail, client_ctx);
524     barrier->nreached++;
525     if (LOCAL_QUORUM_REACHED (barrier))
526       notify_task_cb (barrier, NULL);
527   }
528   GNUNET_SERVER_receive_done (client, GNUNET_OK);
529 }
530
531
532 /**
533  * Functions with this signature are called whenever a client
534  * is disconnected on the network level.
535  *
536  * @param cls closure
537  * @param client identification of the client; NULL
538  *        for the last call when the server is destroyed
539  */
540 static void
541 disconnect_cb (void *cls, struct GNUNET_SERVER_Client *client)
542 {
543   struct ClientCtx *client_ctx;
544   
545   client_ctx = GNUNET_SERVER_client_get_user_context (client, struct ClientCtx);
546   if (NULL == client_ctx)
547     return;                     /* We only set user context for locally
548                                    connected clients */
549   cleanup_clientctx (client_ctx);
550 }
551
552
553 /**
554  * Function to initialise barrriers component
555  */
556 void
557 GST_barriers_init (struct GNUNET_CONFIGURATION_Handle *cfg)
558 {
559   static const struct GNUNET_SERVER_MessageHandler message_handlers[] = {
560     {&handle_barrier_wait, NULL, GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_WAIT, 0},
561     {NULL, NULL, 0, 0}
562   };
563   struct GNUNET_SERVER_Handle *srv;
564
565   barrier_map = GNUNET_CONTAINER_multihashmap_create (3, GNUNET_YES);
566   ctx = GNUNET_SERVICE_start ("testbed-barrier", cfg,
567                               GNUNET_SERVICE_OPTION_MANUAL_SHUTDOWN);
568   srv = GNUNET_SERVICE_get_server (ctx);
569   GNUNET_SERVER_add_handlers (srv, message_handlers);
570   GNUNET_SERVER_disconnect_notify (srv, &disconnect_cb, NULL);  
571 }
572
573
574 /**
575  * Function to stop the barrier service
576  */
577 void
578 GST_barriers_stop ()
579 {
580   GNUNET_assert (NULL != barrier_map);
581   GNUNET_CONTAINER_multihashmap_destroy (barrier_map);
582   GNUNET_assert (NULL != ctx);
583   GNUNET_SERVICE_stop (ctx);
584 }
585
586
587 /**
588  * Functions of this type are to be given as callback argument to
589  * GNUNET_TESTBED_barrier_init().  The callback will be called when status
590  * information is available for the barrier.
591  *
592  * @param cls the closure given to GNUNET_TESTBED_barrier_init()
593  * @param name the name of the barrier
594  * @param barrier the barrier handle
595  * @param status status of the barrier; GNUNET_OK if the barrier is crossed;
596  *   GNUNET_SYSERR upon error
597  * @param emsg if the status were to be GNUNET_SYSERR, this parameter has the
598  *   error messsage
599  */
600 static void 
601 wbarrier_status_cb (void *cls, const char *name,
602                     struct GNUNET_TESTBED_Barrier *b_,
603                     enum GNUNET_TESTBED_BarrierStatus status,
604                     const char *emsg)
605 {
606   struct WBarrier *wrapper = cls;
607   struct Barrier *barrier = wrapper->barrier;
608
609   GNUNET_assert (b_ == wrapper->hbarrier);
610   wrapper->hbarrier = NULL;
611   GNUNET_CONTAINER_DLL_remove (barrier->whead, barrier->wtail, wrapper);
612   GNUNET_free (wrapper);
613   if (BARRIER_STATUS_ERROR == status)
614   {
615     LOG (GNUNET_ERROR_TYPE_ERROR,
616          "Initialising barrier `%s' failed at a sub-controller: %s\n",
617          barrier->name, (NULL != emsg) ? emsg : "NULL");
618     cancel_wrappers (barrier);
619     if (NULL == emsg)
620       emsg = "Initialisation failed at a sub-controller";
621     barrier->status = BARRIER_STATUS_ERROR;
622     send_barrier_status_msg (barrier, emsg);
623     return;
624   }
625   switch (status)
626   {
627   case BARRIER_STATUS_CROSSED:
628     if (BARRIER_STATUS_INITIALISED != barrier->status)
629     {
630       GNUNET_break_op (0);
631       return;
632     }
633     barrier->num_wbarriers_reached++;
634     if ((barrier->num_wbarriers_reached == barrier->num_wbarriers)
635         && (LOCAL_QUORUM_REACHED (barrier)))
636     {
637       barrier->status = BARRIER_STATUS_CROSSED;
638       send_barrier_status_msg (barrier, NULL);
639     }
640     break;
641   case BARRIER_STATUS_INITIALISED:
642     if (0 != barrier->status)
643     {
644       GNUNET_break_op (0);
645       return;
646     }
647     barrier->num_wbarriers_inited++;
648     if (barrier->num_wbarriers_inited == barrier->num_wbarriers)
649     {
650       barrier->status = BARRIER_STATUS_INITIALISED;
651       send_barrier_status_msg (barrier, NULL);
652     }
653     break;
654   case BARRIER_STATUS_ERROR:
655     GNUNET_assert (0);
656   }
657   return;
658 }
659
660
661 /**
662  * Function called upon timeout while waiting for a response from the
663  * subcontrollers to barrier init message
664  *
665  * @param 
666  * @return 
667  */
668 static void
669 fwd_tout_barrier_init (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
670 {
671   struct Barrier *barrier = cls;
672   
673   barrier->nslaves--;
674   barrier->timedout = GNUNET_YES;
675   cancel_wrappers (barrier);
676   barrier->status = BARRIER_STATUS_ERROR;
677   send_barrier_status_msg (barrier,
678                            "Timedout while propagating barrier initialisation\n");
679   remove_barrier (barrier);
680 }
681
682
683 /**
684  * Message handler for GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_INIT messages.  This
685  * message should always come from a parent controller or the testbed API if we
686  * are the root controller.
687  *
688  * This handler is queued in the main service and will handle the messages sent
689  * either from the testbed driver or from a high level controller
690  *
691  * @param cls NULL
692  * @param client identification of the client
693  * @param message the actual message
694  */
695 void
696 GST_handle_barrier_init (void *cls, struct GNUNET_SERVER_Client *client,
697                          const struct GNUNET_MessageHeader *message)
698 {
699   const struct GNUNET_TESTBED_BarrierInit *msg;
700   char *name;
701   struct Barrier *barrier;
702   struct Slave *slave;
703   struct WBarrier *wrapper;
704   struct GNUNET_HashCode hash;
705   size_t name_len;
706   unsigned int cnt;
707   uint16_t msize;
708   
709   if (NULL == GST_context)
710   {
711     GNUNET_break_op (0);
712     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
713     return;
714   }
715   if (client != GST_context->client)
716   {
717     GNUNET_break_op (0);
718     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
719     return;
720   }
721   msize = ntohs (message->size);
722   if (msize <= sizeof (struct GNUNET_TESTBED_BarrierInit))
723   {
724     GNUNET_break_op (0);
725     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
726     return;
727   }
728   msg = (const struct GNUNET_TESTBED_BarrierInit *) message;
729   name_len = (size_t) msize - sizeof (struct GNUNET_TESTBED_BarrierInit);
730   name = GNUNET_malloc (name_len + 1);
731   (void) memcpy (name, msg->name, name_len);
732   GNUNET_CRYPTO_hash (name, name_len, &hash);
733   if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (barrier_map, &hash))
734   {
735   
736     send_client_status_msg (client, name, BARRIER_STATUS_ERROR,
737                             "A barrier with the same name already exists");
738     GNUNET_free (name);
739     GNUNET_SERVER_receive_done (client, GNUNET_OK);
740     return;
741   }
742   barrier = GNUNET_malloc (sizeof (struct Barrier));
743   (void) memcpy (&barrier->hash, &hash, sizeof (struct GNUNET_HashCode));
744   barrier->quorum = msg->quorum;
745   barrier->name = name;
746   barrier->client = client;
747   GNUNET_SERVER_client_keep (client);
748   GNUNET_assert (GNUNET_OK ==
749                  GNUNET_CONTAINER_multihashmap_put (barrier_map,
750                                                     &barrier->hash,
751                                                     barrier,
752                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST));
753   GNUNET_SERVER_receive_done (client, GNUNET_OK);
754   /* Propagate barrier init to subcontrollers */
755   for (cnt = 0; cnt < GST_slave_list_size; cnt++)
756   {
757     if (NULL == (slave = GST_slave_list[cnt]))
758       continue;
759     if (NULL == slave->controller)
760     {
761       GNUNET_break (0);/* May happen when we are connecting to the controller */
762       continue;
763     }
764     wrapper = GNUNET_malloc (sizeof (struct WBarrier));
765     wrapper->barrier = barrier;
766     GNUNET_CONTAINER_DLL_insert_tail (barrier->whead, barrier->wtail, wrapper);
767     wrapper->hbarrier = GNUNET_TESTBED_barrier_init (slave->controller,
768                                                      barrier->name,
769                                                      barrier->quorum,
770                                                      &wbarrier_status_cb,
771                                                      wrapper);    
772   }
773   if (NULL == barrier->whead)   /* No further propagation */
774   {
775     barrier->status = BARRIER_STATUS_INITIALISED;
776     send_barrier_status_msg (barrier, NULL);
777   }else
778     barrier->tout_task = GNUNET_SCHEDULER_add_delayed (MESSAGE_SEND_TIMEOUT (30),
779                                                        &fwd_tout_barrier_init,
780                                                        barrier);
781 }
782
783
784 /**
785  * Message handler for GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_CANCEL messages.  This
786  * message should always come from a parent controller or the testbed API if we
787  * are the root controller.
788  *
789  * This handler is queued in the main service and will handle the messages sent
790  * either from the testbed driver or from a high level controller
791  *
792  * @param cls NULL
793  * @param client identification of the client
794  * @param message the actual message
795  */
796 void
797 GST_handle_barrier_cancel (void *cls, struct GNUNET_SERVER_Client *client,
798                            const struct GNUNET_MessageHeader *message)
799 {
800   const struct GNUNET_TESTBED_BarrierCancel *msg;
801   char *name;
802   struct Barrier *barrier;
803   struct GNUNET_HashCode hash;
804   size_t name_len;
805   uint16_t msize;
806
807   if (NULL == GST_context)
808   {
809     GNUNET_break_op (0);
810     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
811     return;
812   }  
813   if (client != GST_context->client)
814   {
815     GNUNET_break_op (0);
816     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
817     return;
818   }
819   msize = ntohs (message->size);
820   if (msize <= sizeof (struct GNUNET_TESTBED_BarrierCancel))
821   {
822     GNUNET_break_op (0);
823     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
824     return;
825   }
826   msg = (const struct GNUNET_TESTBED_BarrierCancel *) message;
827   name_len = msize - sizeof (struct GNUNET_TESTBED_BarrierCancel);
828   name = GNUNET_malloc (name_len + 1);
829   (void) memcpy (name, msg->name, name_len);
830   GNUNET_CRYPTO_hash (name, name_len, &hash);
831   if (GNUNET_NO == GNUNET_CONTAINER_multihashmap_contains (barrier_map, &hash))
832   {
833     GNUNET_break_op (0);
834     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
835     return;
836   }
837   barrier = GNUNET_CONTAINER_multihashmap_get (barrier_map, &hash);
838   GNUNET_assert (NULL != barrier);
839   cancel_wrappers (barrier);
840   remove_barrier (barrier);
841   GNUNET_SERVER_receive_done (client, GNUNET_OK);  
842 }
843
844 /* end of gnunet-service-testbed_barriers.c */