- experiment driver propagates barrier crossed notification
[oweals/gnunet.git] / src / testbed / gnunet-service-testbed_barriers.c
1 /*
2   This file is part of GNUnet.
3   (C) 2008--2013 Christian Grothoff (and other contributing authors)
4
5   GNUnet is free software; you can redistribute it and/or modify
6   it under the terms of the GNU General Public License as published
7   by the Free Software Foundation; either version 3, or (at your
8   option) any later version.
9
10   GNUnet is distributed in the hope that it will be useful, but
11   WITHOUT ANY WARRANTY; without even the implied warranty of
12   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13   General Public License for more details.
14
15   You should have received a copy of the GNU General Public License
16   along with GNUnet; see the file COPYING.  If not, write to the
17   Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18   Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file testbed/gnunet-service-testbed_barriers.c
23  * @brief barrier handling at the testbed controller
24  * @author Sree Harsha Totakura <sreeharsha@totakura.in> 
25  */
26
27 #include "gnunet-service-testbed.h"
28 #include "gnunet-service-testbed_barriers.h"
29 #include "testbed_api_barriers.h"
30
31
32 /**
33  * timeout for outgoing message transmissions in seconds
34  */
35 #define MESSAGE_SEND_TIMEOUT(s) \
36   GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, s)
37
38
39 /**
40  * Test to see if local peers have reached the required quorum of a barrier
41  */
42 #define LOCAL_QUORUM_REACHED(barrier)           \
43   ((barrier->quorum * GST_num_local_peers) <= (barrier->nreached * 100))
44
45
46 #ifdef LOG
47 #undef LOG
48 #endif
49
50 /**
51  * Logging shorthand
52  */
53 #define LOG(kind,...)                                           \
54   GNUNET_log_from (kind, "testbed-barriers", __VA_ARGS__)
55
56
57 /**
58  * Barrier
59  */
60 struct Barrier;
61
62
63 /**
64  * Message queue for transmitting messages
65  */
66 struct MessageQueue
67 {
68   /**
69    * next pointer for DLL
70    */
71   struct MessageQueue *next;
72
73   /**
74    * prev pointer for DLL
75    */
76   struct MessageQueue *prev;
77
78   /**
79    * The message to be sent
80    */
81   struct GNUNET_MessageHeader *msg;
82 };
83
84
85 /**
86  * Context to be associated with each client
87  */
88 struct ClientCtx
89 {
90   /**
91    * The barrier this client is waiting for
92    */
93   struct Barrier *barrier;
94
95   /**
96    * DLL next ptr
97    */
98   struct ClientCtx *next;
99
100   /**
101    * DLL prev ptr
102    */
103   struct ClientCtx *prev;
104
105   /**
106    * The client handle
107    */
108   struct GNUNET_SERVER_Client *client;
109
110   /**
111    * the transmission handle
112    */
113   struct GNUNET_SERVER_TransmitHandle *tx;
114
115   /**
116    * message queue head
117    */
118   struct MessageQueue *mq_head;
119
120   /**
121    * message queue tail
122    */
123   struct MessageQueue *mq_tail;
124 };
125
126
127 /**
128  * Wrapper around Barrier handle
129  */
130 struct WBarrier
131 {
132   /**
133    * DLL next pointer
134    */
135   struct WBarrier *next;
136
137   /**
138    * DLL prev pointer
139    */
140   struct WBarrier *prev;
141
142   /**
143    * The local barrier associated with the creation of this wrapper
144    */
145   struct Barrier *barrier;
146
147   /**
148    * The barrier handle from API
149    */
150   struct GNUNET_TESTBED_Barrier *hbarrier;
151
152   /**
153    * Has this barrier been crossed?
154    */
155   uint8_t reached;
156 };
157
158
159 /**
160  * Barrier
161  */
162 struct Barrier
163 {
164   /**
165    * The hashcode of the barrier name
166    */
167   struct GNUNET_HashCode hash;
168
169   /**
170    * The client handle to the master controller
171    */
172   struct GNUNET_SERVER_Client *mc;
173
174   /**
175    * The name of the barrier
176    */
177   char *name;
178
179   /**
180    * DLL head for the list of clients waiting for this barrier
181    */
182   struct ClientCtx *head;
183
184   /**
185    * DLL tail for the list of clients waiting for this barrier
186    */
187   struct ClientCtx *tail;
188
189   /**
190    * DLL head for the list of barrier handles
191    */
192   struct WBarrier *whead;
193
194   /**
195    * DLL tail for the list of barrier handles
196    */
197   struct WBarrier *wtail;
198
199   /**
200    * Identifier for the timeout task
201    */
202   GNUNET_SCHEDULER_TaskIdentifier tout_task;
203   
204   /**
205    * The status of this barrier
206    */
207   enum GNUNET_TESTBED_BarrierStatus status;
208   
209   /**
210    * Number of barriers wrapped in the above DLL
211    */
212   unsigned int num_wbarriers;
213
214   /**
215    * Number of wrapped barriers reached so far
216    */
217   unsigned int num_wbarriers_reached;
218
219   /**
220    * Number of wrapped barrier initialised so far
221    */
222   unsigned int num_wbarriers_inited;
223
224   /**
225    * Number of peers which have reached this barrier
226    */
227   unsigned int nreached;
228
229   /**
230    * Number of slaves we have initialised this barrier
231    */
232   unsigned int nslaves;
233
234   /**
235    * Quorum percentage to be reached
236    */
237   uint8_t quorum;
238   
239 };
240
241
242 /**
243  * Hashtable handle for storing initialised barriers
244  */
245 static struct GNUNET_CONTAINER_MultiHashMap *barrier_map;
246
247 /**
248  * Service context
249  */
250 static struct GNUNET_SERVICE_Context *ctx;
251
252
253 /**
254  * Function called to notify a client about the connection
255  * begin ready to queue more data.  "buf" will be
256  * NULL and "size" zero if the connection was closed for
257  * writing in the meantime.
258  *
259  * @param cls client context
260  * @param size number of bytes available in buf
261  * @param buf where the callee should write the message
262  * @return number of bytes written to buf
263  */
264 static size_t 
265 transmit_ready_cb (void *cls, size_t size, void *buf)
266 {
267   struct ClientCtx *ctx = cls;
268   struct GNUNET_SERVER_Client *client = ctx->client;
269   struct MessageQueue *mq;
270   struct GNUNET_MessageHeader *msg;
271   size_t wrote;
272
273   ctx->tx = NULL;
274   wrote = 0;
275   if ((0 == size) || (NULL == buf))
276   {
277     GNUNET_assert (NULL != ctx->client);
278     GNUNET_SERVER_client_drop (ctx->client);
279     ctx->client = NULL;    
280     return 0;
281   }
282   mq = ctx->mq_head;
283   msg = mq->msg;
284   wrote = ntohs (msg->size);
285   GNUNET_assert (size >= wrote);
286   (void) memcpy (buf, msg, wrote);
287   GNUNET_CONTAINER_DLL_remove (ctx->mq_head, ctx->mq_tail, mq);
288   GNUNET_free (mq->msg);
289   GNUNET_free (mq);
290   if (NULL != (mq = ctx->mq_head))
291     ctx->tx = GNUNET_SERVER_notify_transmit_ready (client, ntohs (msg->size),
292                                                   MESSAGE_SEND_TIMEOUT (30),
293                                                   &transmit_ready_cb, ctx);
294   return wrote;
295 }
296
297
298 /**
299  * Queue a message into a clients message queue
300  *
301  * @param ctx the context associated with the client
302  * @param msg the message to queue.  Will be consumed
303  */
304 static void
305 queue_message (struct ClientCtx *ctx, struct GNUNET_MessageHeader *msg)
306 {
307   struct MessageQueue *mq;
308   struct GNUNET_SERVER_Client *client = ctx->client;
309   
310   mq = GNUNET_malloc (sizeof (struct MessageQueue));
311   mq->msg = msg;
312   LOG_DEBUG ("Queueing message of type %u, size %u for sending\n",
313              ntohs (msg->type), ntohs (msg->size));
314   GNUNET_CONTAINER_DLL_insert_tail (ctx->mq_head, ctx->mq_tail, mq);
315   if (NULL == ctx->tx)
316    ctx->tx = GNUNET_SERVER_notify_transmit_ready (client, ntohs (msg->size),
317                                                   MESSAGE_SEND_TIMEOUT (30),
318                                                   &transmit_ready_cb, ctx);
319 }
320
321
322 /**
323  * Function to cleanup client context data structure
324  *
325  * @param ctx the client context data structure
326  */
327 static void
328 cleanup_clientctx (struct ClientCtx *ctx)
329 {
330   struct MessageQueue *mq;
331   
332   if (NULL != ctx->client)
333     GNUNET_SERVER_client_drop (ctx->client);
334   if (NULL != ctx->tx)
335     GNUNET_SERVER_notify_transmit_ready_cancel (ctx->tx);
336   if (NULL != (mq = ctx->mq_head))
337   {
338     GNUNET_CONTAINER_DLL_remove (ctx->mq_head, ctx->mq_tail, mq);
339     GNUNET_free (mq->msg);
340     GNUNET_free (mq);
341   }
342   GNUNET_free (ctx);
343 }
344
345
346 /**
347  * Function to remove a barrier from the barrier map and cleanup resources
348  * occupied by a barrier
349  *
350  * @param barrier the barrier handle
351  */
352 static void
353 remove_barrier (struct Barrier *barrier)
354 {
355   struct ClientCtx *ctx;
356   
357   GNUNET_assert (GNUNET_YES == GNUNET_CONTAINER_multihashmap_remove (barrier_map,
358                                                                     &barrier->hash,
359                                                                     barrier));
360   while (NULL != (ctx = barrier->head))
361   {
362     GNUNET_CONTAINER_DLL_remove (barrier->head, barrier->tail, ctx);
363     GNUNET_SERVER_client_drop (ctx->client);
364     ctx->client = NULL;
365     if (NULL != ctx->tx)
366     {
367       GNUNET_SERVER_notify_transmit_ready_cancel (ctx->tx);
368       ctx->tx = NULL;
369     }
370   }
371   GNUNET_free (barrier->name);
372   GNUNET_SERVER_client_drop (barrier->mc);
373   GNUNET_free (barrier);
374 }
375
376
377 /**
378  * Cancels all subcontroller barrier handles
379  *
380  * @param barrier the local barrier
381  */
382 static void
383 cancel_wrappers (struct Barrier *barrier)
384 {
385   struct WBarrier *wrapper;
386
387   while (NULL != (wrapper = barrier->whead))
388   {
389     GNUNET_TESTBED_barrier_cancel (wrapper->hbarrier);
390     GNUNET_CONTAINER_DLL_remove (barrier->whead, barrier->wtail, wrapper);
391     GNUNET_free (wrapper);
392   }
393 }
394
395
396 /**
397  * Send a status message about a barrier to the given client
398  *
399  * @param client the client to send the message to
400  * @param name the barrier name
401  * @param status the status of the barrier
402  * @param emsg the error message; should be non-NULL for
403  *   status=BARRIER_STATUS_ERROR 
404  */
405 static void
406 send_client_status_msg (struct GNUNET_SERVER_Client *client,
407                         const char *name,
408                         enum GNUNET_TESTBED_BarrierStatus status,
409                         const char *emsg)
410 {
411   struct GNUNET_TESTBED_BarrierStatusMsg *msg;
412   size_t name_len;
413   uint16_t msize;
414
415   GNUNET_assert ((NULL == emsg) || (BARRIER_STATUS_ERROR == status));
416   name_len = strlen (name);
417   msize = sizeof (struct GNUNET_TESTBED_BarrierStatusMsg)
418       + (name_len + 1)
419       + ((NULL == emsg) ? 0 : (strlen (emsg) + 1));
420   msg = GNUNET_malloc (msize);
421   msg->header.size = htons (msize);
422   msg->header.type = htons (GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_STATUS);
423   msg->status = htons (status);
424   msg->name_len = htons ((uint16_t) name_len);
425   (void) memcpy (msg->data, name, name_len);
426   if (NULL != emsg)
427     (void) memcpy (msg->data + name_len + 1, emsg, strlen (emsg));
428   GST_queue_message (client, &msg->header);
429 }
430
431
432 /**
433  * Sends a barrier failed message
434  *
435  * @param barrier the corresponding barrier
436  * @param emsg the error message; should be non-NULL for
437  *   status=BARRIER_STATUS_ERROR 
438  */
439 static void
440 send_barrier_status_msg (struct Barrier *barrier, const char *emsg)
441 {
442   GNUNET_assert (0 != barrier->status);
443   send_client_status_msg (barrier->mc, barrier->name, barrier->status, emsg);
444 }
445
446
447 /**
448  * Message handler for GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_WAIT messages.  This
449  * message should come from peers or a shared helper service using the
450  * testbed-barrier client API (@see gnunet_testbed_barrier_service.h)
451  *
452  * This handler is queued in the main service and will handle the messages sent
453  * either from the testbed driver or from a high level controller
454  *
455  * @param cls NULL
456  * @param client identification of the client
457  * @param message the actual message
458  */
459 static void
460 handle_barrier_wait (void *cls, struct GNUNET_SERVER_Client *client,
461                      const struct GNUNET_MessageHeader *message)
462 {
463   const struct GNUNET_TESTBED_BarrierWait *msg;
464   struct Barrier *barrier;
465   char *name;
466   struct ClientCtx *client_ctx;
467   struct GNUNET_HashCode key;
468   size_t name_len;
469   uint16_t msize;
470   
471   msize = ntohs (message->size);
472   if (msize <= sizeof (struct GNUNET_TESTBED_BarrierWait))
473   {
474     GNUNET_break_op (0);
475     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
476     return;
477   }
478   if (NULL == barrier_map)
479   {
480     GNUNET_break (0);
481     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
482     return;
483   }
484   msg = (const struct GNUNET_TESTBED_BarrierWait *) message;
485   name_len = msize - sizeof (struct GNUNET_TESTBED_BarrierWait);
486   name = GNUNET_malloc (name_len + 1);
487   name[name_len] = '\0';
488   (void) memcpy (name, msg->name, name_len);
489   LOG_DEBUG ("Received BARRIER_WAIT for barrier `%s'\n", name);
490   GNUNET_CRYPTO_hash (name, name_len, &key);
491   if (NULL == (barrier = GNUNET_CONTAINER_multihashmap_get (barrier_map, &key)))
492   {
493     GNUNET_break (0);
494     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
495     GNUNET_free (name);
496     return;
497   }
498   client_ctx = GNUNET_SERVER_client_get_user_context (client, struct ClientCtx);
499   if (NULL == client_ctx)
500   {
501     client_ctx = GNUNET_malloc (sizeof (struct ClientCtx));
502     client_ctx->client = client;
503     GNUNET_SERVER_client_keep (client);
504     client_ctx->barrier = barrier;
505     GNUNET_CONTAINER_DLL_insert_tail (barrier->head, barrier->tail, client_ctx);
506     GNUNET_SERVER_client_set_user_context (client, client_ctx); 
507   }
508   barrier->nreached++;
509   if ((barrier->num_wbarriers_reached == barrier->num_wbarriers)
510         && (LOCAL_QUORUM_REACHED (barrier)))
511   {
512     barrier->status = BARRIER_STATUS_CROSSED;
513     send_barrier_status_msg (barrier, NULL);
514   }
515   GNUNET_SERVER_receive_done (client, GNUNET_OK);
516 }
517
518
519 /**
520  * Functions with this signature are called whenever a client
521  * is disconnected on the network level.
522  *
523  * @param cls closure
524  * @param client identification of the client; NULL
525  *        for the last call when the server is destroyed
526  */
527 static void
528 disconnect_cb (void *cls, struct GNUNET_SERVER_Client *client)
529 {
530   struct ClientCtx *client_ctx;
531   
532   if (NULL == client)
533     return;
534   client_ctx = GNUNET_SERVER_client_get_user_context (client, struct ClientCtx);
535   cleanup_clientctx (client_ctx);
536 }
537
538
539 /**
540  * Function to initialise barrriers component
541  *
542  * @param cfg the configuration to use for initialisation
543  */
544 void
545 GST_barriers_init (struct GNUNET_CONFIGURATION_Handle *cfg)
546 {
547   static const struct GNUNET_SERVER_MessageHandler message_handlers[] = {
548     {&handle_barrier_wait, NULL, GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_WAIT, 0},
549     {NULL, NULL, 0, 0}
550   };
551   struct GNUNET_SERVER_Handle *srv;
552
553   barrier_map = GNUNET_CONTAINER_multihashmap_create (3, GNUNET_YES);
554   ctx = GNUNET_SERVICE_start ("testbed-barrier", cfg,
555                               GNUNET_SERVICE_OPTION_MANUAL_SHUTDOWN);
556   srv = GNUNET_SERVICE_get_server (ctx);
557   GNUNET_SERVER_add_handlers (srv, message_handlers);
558   GNUNET_SERVER_disconnect_notify (srv, &disconnect_cb, NULL);  
559 }
560
561
562 /**
563  * Function to stop the barrier service
564  */
565 void
566 GST_barriers_destroy ()
567 {
568   GNUNET_assert (NULL != barrier_map);
569   GNUNET_CONTAINER_multihashmap_destroy (barrier_map);
570   GNUNET_assert (NULL != ctx);
571   GNUNET_SERVICE_stop (ctx);
572 }
573
574
575 /**
576  * Functions of this type are to be given as callback argument to
577  * GNUNET_TESTBED_barrier_init().  The callback will be called when status
578  * information is available for the barrier.
579  *
580  * @param cls the closure given to GNUNET_TESTBED_barrier_init()
581  * @param name the name of the barrier
582  * @param b_ the barrier handle
583  * @param status status of the barrier; GNUNET_OK if the barrier is crossed;
584  *   GNUNET_SYSERR upon error
585  * @param emsg if the status were to be GNUNET_SYSERR, this parameter has the
586  *   error messsage
587  */
588 static void 
589 wbarrier_status_cb (void *cls, const char *name,
590                     struct GNUNET_TESTBED_Barrier *b_,
591                     enum GNUNET_TESTBED_BarrierStatus status,
592                     const char *emsg)
593 {
594   struct WBarrier *wrapper = cls;
595   struct Barrier *barrier = wrapper->barrier;
596
597   GNUNET_assert (b_ == wrapper->hbarrier);
598   wrapper->hbarrier = NULL;
599   GNUNET_CONTAINER_DLL_remove (barrier->whead, barrier->wtail, wrapper);
600   GNUNET_free (wrapper);
601   switch (status)
602   {
603   case BARRIER_STATUS_ERROR:
604     LOG (GNUNET_ERROR_TYPE_ERROR,
605          "Initialising barrier `%s' failed at a sub-controller: %s\n",
606          barrier->name, (NULL != emsg) ? emsg : "NULL");
607     cancel_wrappers (barrier);
608     if (NULL == emsg)
609       emsg = "Initialisation failed at a sub-controller";
610     barrier->status = BARRIER_STATUS_ERROR;
611     send_barrier_status_msg (barrier, emsg);
612     return;
613   case BARRIER_STATUS_CROSSED:
614     if (BARRIER_STATUS_INITIALISED != barrier->status)
615     {
616       GNUNET_break_op (0);
617       return;
618     }
619     barrier->num_wbarriers_reached++;
620     if ((barrier->num_wbarriers_reached == barrier->num_wbarriers)
621         && (LOCAL_QUORUM_REACHED (barrier)))
622     {
623       barrier->status = BARRIER_STATUS_CROSSED;
624       send_barrier_status_msg (barrier, NULL);
625     }
626     return;
627   case BARRIER_STATUS_INITIALISED:
628     if (0 != barrier->status)
629     {
630       GNUNET_break_op (0);
631       return;
632     }
633     barrier->num_wbarriers_inited++;
634     if (barrier->num_wbarriers_inited == barrier->num_wbarriers)
635     {
636       barrier->status = BARRIER_STATUS_INITIALISED;
637       send_barrier_status_msg (barrier, NULL);
638     }
639     return;
640   }
641 }
642
643
644 /**
645  * Function called upon timeout while waiting for a response from the
646  * subcontrollers to barrier init message
647  *
648  * @param cls barrier
649  * @param tc scheduler task context
650  */
651 static void
652 fwd_tout_barrier_init (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
653 {
654   struct Barrier *barrier = cls;
655   
656   cancel_wrappers (barrier);
657   barrier->status = BARRIER_STATUS_ERROR;
658   send_barrier_status_msg (barrier,
659                            "Timedout while propagating barrier initialisation\n");
660   remove_barrier (barrier);
661 }
662
663
664 /**
665  * Message handler for GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_INIT messages.  This
666  * message should always come from a parent controller or the testbed API if we
667  * are the root controller.
668  *
669  * This handler is queued in the main service and will handle the messages sent
670  * either from the testbed driver or from a high level controller
671  *
672  * @param cls NULL
673  * @param client identification of the client
674  * @param message the actual message
675  */
676 void
677 GST_handle_barrier_init (void *cls, struct GNUNET_SERVER_Client *client,
678                          const struct GNUNET_MessageHeader *message)
679 {
680   const struct GNUNET_TESTBED_BarrierInit *msg;
681   char *name;
682   struct Barrier *barrier;
683   struct Slave *slave;
684   struct WBarrier *wrapper;
685   struct GNUNET_HashCode hash;
686   size_t name_len;
687   unsigned int cnt;
688   uint16_t msize;
689   
690   if (NULL == GST_context)
691   {
692     GNUNET_break_op (0);
693     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
694     return;
695   }
696   if (client != GST_context->client)
697   {
698     GNUNET_break_op (0);
699     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
700     return;
701   }
702   msize = ntohs (message->size);
703   if (msize <= sizeof (struct GNUNET_TESTBED_BarrierInit))
704   {
705     GNUNET_break_op (0);
706     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
707     return;
708   }
709   msg = (const struct GNUNET_TESTBED_BarrierInit *) message;
710   name_len = (size_t) msize - sizeof (struct GNUNET_TESTBED_BarrierInit);
711   name = GNUNET_malloc (name_len + 1);
712   (void) memcpy (name, msg->name, name_len);
713   GNUNET_CRYPTO_hash (name, name_len, &hash);
714   LOG_DEBUG ("Received BARRIER_INIT for barrier `%s'\n", name);
715   if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (barrier_map, &hash))
716   {
717     
718     send_client_status_msg (client, name, BARRIER_STATUS_ERROR,
719                             "A barrier with the same name already exists");
720     GNUNET_free (name);
721     GNUNET_SERVER_receive_done (client, GNUNET_OK);
722     return;
723   }
724   barrier = GNUNET_malloc (sizeof (struct Barrier));
725   (void) memcpy (&barrier->hash, &hash, sizeof (struct GNUNET_HashCode));
726   barrier->quorum = msg->quorum;
727   barrier->name = name;
728   barrier->mc = client;
729   GNUNET_SERVER_client_keep (client);
730   GNUNET_assert (GNUNET_OK ==
731                  GNUNET_CONTAINER_multihashmap_put (barrier_map,
732                                                     &barrier->hash,
733                                                     barrier,
734                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST));
735   GNUNET_SERVER_receive_done (client, GNUNET_OK);
736   /* Propagate barrier init to subcontrollers */
737   for (cnt = 0; cnt < GST_slave_list_size; cnt++)
738   {
739     if (NULL == (slave = GST_slave_list[cnt]))
740       continue;
741     if (NULL == slave->controller)
742     {
743       GNUNET_break (0);/* May happen when we are connecting to the controller */
744       continue;
745     }
746     wrapper = GNUNET_malloc (sizeof (struct WBarrier));
747     wrapper->barrier = barrier;
748     GNUNET_CONTAINER_DLL_insert_tail (barrier->whead, barrier->wtail, wrapper);
749     wrapper->hbarrier = GNUNET_TESTBED_barrier_init_ (slave->controller,
750                                                       barrier->name,
751                                                       barrier->quorum,
752                                                       &wbarrier_status_cb,
753                                                       wrapper,
754                                                       GNUNET_NO);
755   }
756   if (NULL == barrier->whead)   /* No further propagation */
757   {
758     barrier->status = BARRIER_STATUS_INITIALISED;
759     LOG_DEBUG ("Sending BARRIER_STATUS_INITIALISED for barrier `%s'\n",
760                barrier->name);
761     send_barrier_status_msg (barrier, NULL);
762   }else
763     barrier->tout_task = GNUNET_SCHEDULER_add_delayed (MESSAGE_SEND_TIMEOUT (30),
764                                                        &fwd_tout_barrier_init,
765                                                        barrier);
766 }
767
768
769 /**
770  * Message handler for GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_CANCEL messages.  This
771  * message should always come from a parent controller or the testbed API if we
772  * are the root controller.
773  *
774  * This handler is queued in the main service and will handle the messages sent
775  * either from the testbed driver or from a high level controller
776  *
777  * @param cls NULL
778  * @param client identification of the client
779  * @param message the actual message
780  */
781 void
782 GST_handle_barrier_cancel (void *cls, struct GNUNET_SERVER_Client *client,
783                            const struct GNUNET_MessageHeader *message)
784 {
785   const struct GNUNET_TESTBED_BarrierCancel *msg;
786   char *name;
787   struct Barrier *barrier;
788   struct GNUNET_HashCode hash;
789   size_t name_len;
790   uint16_t msize;
791
792   if (NULL == GST_context)
793   {
794     GNUNET_break_op (0);
795     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
796     return;
797   }  
798   if (client != GST_context->client)
799   {
800     GNUNET_break_op (0);
801     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
802     return;
803   }
804   msize = ntohs (message->size);
805   if (msize <= sizeof (struct GNUNET_TESTBED_BarrierCancel))
806   {
807     GNUNET_break_op (0);
808     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
809     return;
810   }
811   msg = (const struct GNUNET_TESTBED_BarrierCancel *) message;
812   name_len = msize - sizeof (struct GNUNET_TESTBED_BarrierCancel);
813   name = GNUNET_malloc (name_len + 1);
814   (void) memcpy (name, msg->name, name_len);
815   GNUNET_CRYPTO_hash (name, name_len, &hash);
816   if (GNUNET_NO == GNUNET_CONTAINER_multihashmap_contains (barrier_map, &hash))
817   {
818     GNUNET_break_op (0);
819     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
820     return;
821   }
822   barrier = GNUNET_CONTAINER_multihashmap_get (barrier_map, &hash);
823   GNUNET_assert (NULL != barrier);
824   cancel_wrappers (barrier);
825   remove_barrier (barrier);
826   GNUNET_SERVER_receive_done (client, GNUNET_OK);  
827 }
828
829
830 /**
831  * Message handler for GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_STATUS messages.
832  * This handler is queued in the main service and will handle the messages sent
833  * either from the testbed driver or from a high level controller
834  *
835  * @param cls NULL
836  * @param client identification of the client
837  * @param message the actual message
838  */
839 void
840 GST_handle_barrier_status (void *cls, struct GNUNET_SERVER_Client *client,
841                            const struct GNUNET_MessageHeader *message)
842 {
843   const struct GNUNET_TESTBED_BarrierStatusMsg *msg;
844   struct Barrier *barrier; 
845   struct ClientCtx *client_ctx;
846   const char *name;
847   struct GNUNET_HashCode key;
848   enum GNUNET_TESTBED_BarrierStatus status;
849   uint16_t msize;
850   uint16_t name_len;
851   
852   if (NULL == GST_context)
853   {
854     GNUNET_break_op (0);
855     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
856     return;
857   }  
858   if (client != GST_context->client)
859   {
860     GNUNET_break_op (0);
861     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
862     return;
863   }
864   msize = ntohs (message->size);
865   if (msize <= sizeof (struct GNUNET_TESTBED_BarrierStatusMsg))
866   {
867     GNUNET_break_op (0);
868     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
869     return;
870   }
871   msg = (const struct GNUNET_TESTBED_BarrierStatusMsg *) message;
872   status = ntohs (msg->status);
873   if (BARRIER_STATUS_CROSSED != status)
874   {
875     GNUNET_break_op (0);        /* current we only expect BARRIER_CROSSED
876                                    status message this way */
877     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
878     return;
879   }
880   name = msg->data;
881   name_len = ntohs (msg->name_len);
882   if ((sizeof (struct GNUNET_TESTBED_BarrierStatusMsg) + name_len + 1) != msize)
883   {
884     GNUNET_break_op (0);
885     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
886     return;
887   }
888   if ('\0' != name[name_len])
889   {
890     GNUNET_break_op (0);
891     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
892     return;
893   }
894   GNUNET_CRYPTO_hash (name, name_len, &key);
895   barrier = GNUNET_CONTAINER_multihashmap_get (barrier_map, &key);
896   if (NULL == barrier)
897   {
898     GNUNET_break_op (0);
899     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
900     return;
901   }
902   GNUNET_SERVER_receive_done (client, GNUNET_OK);
903   while (NULL != (client_ctx = barrier->head)) /* Notify peers */
904   {
905     queue_message (client_ctx, GNUNET_copy_message (message));
906     GNUNET_CONTAINER_DLL_remove (barrier->head, barrier->tail, client_ctx);
907   }
908 }
909
910 /* end of gnunet-service-testbed_barriers.c */