- fix misc. memleaks
[oweals/gnunet.git] / src / testbed / gnunet-service-testbed_barriers.c
1 /*
2   This file is part of GNUnet.
3   (C) 2008--2013 Christian Grothoff (and other contributing authors)
4
5   GNUnet is free software; you can redistribute it and/or modify
6   it under the terms of the GNU General Public License as published
7   by the Free Software Foundation; either version 3, or (at your
8   option) any later version.
9
10   GNUnet is distributed in the hope that it will be useful, but
11   WITHOUT ANY WARRANTY; without even the implied warranty of
12   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13   General Public License for more details.
14
15   You should have received a copy of the GNU General Public License
16   along with GNUnet; see the file COPYING.  If not, write to the
17   Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18   Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file testbed/gnunet-service-testbed_barriers.c
23  * @brief barrier handling at the testbed controller
24  * @author Sree Harsha Totakura <sreeharsha@totakura.in> 
25  */
26
27 #include "gnunet-service-testbed.h"
28 #include "gnunet-service-testbed_barriers.h"
29 #include "testbed_api_barriers.h"
30
31
32 /**
33  * timeout for outgoing message transmissions in seconds
34  */
35 #define MESSAGE_SEND_TIMEOUT(s) \
36   GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, s)
37
38
39 /**
40  * Test to see if local peers have reached the required quorum of a barrier
41  */
42 #define LOCAL_QUORUM_REACHED(barrier)           \
43   ((barrier->quorum * GST_num_local_peers) <= (barrier->nreached * 100))
44
45
46 #ifdef LOG
47 #undef LOG
48 #endif
49
50 /**
51  * Logging shorthand
52  */
53 #define LOG(kind,...)                                           \
54   GNUNET_log_from (kind, "testbed-barriers", __VA_ARGS__)
55
56
57 /**
58  * Barrier
59  */
60 struct Barrier;
61
62
63 /**
64  * Message queue for transmitting messages
65  */
66 struct MessageQueue
67 {
68   /**
69    * next pointer for DLL
70    */
71   struct MessageQueue *next;
72
73   /**
74    * prev pointer for DLL
75    */
76   struct MessageQueue *prev;
77
78   /**
79    * The message to be sent
80    */
81   struct GNUNET_MessageHeader *msg;
82 };
83
84
85 /**
86  * Context to be associated with each client
87  */
88 struct ClientCtx
89 {
90   /**
91    * The barrier this client is waiting for
92    */
93   struct Barrier *barrier;
94
95   /**
96    * DLL next ptr
97    */
98   struct ClientCtx *next;
99
100   /**
101    * DLL prev ptr
102    */
103   struct ClientCtx *prev;
104
105   /**
106    * The client handle
107    */
108   struct GNUNET_SERVER_Client *client;
109
110   /**
111    * the transmission handle
112    */
113   struct GNUNET_SERVER_TransmitHandle *tx;
114
115   /**
116    * message queue head
117    */
118   struct MessageQueue *mq_head;
119
120   /**
121    * message queue tail
122    */
123   struct MessageQueue *mq_tail;
124 };
125
126
127 /**
128  * Wrapper around Barrier handle
129  */
130 struct WBarrier
131 {
132   /**
133    * DLL next pointer
134    */
135   struct WBarrier *next;
136
137   /**
138    * DLL prev pointer
139    */
140   struct WBarrier *prev;
141
142   /**
143    * The local barrier associated with the creation of this wrapper
144    */
145   struct Barrier *barrier;
146
147   /**
148    * The barrier handle from API
149    */
150   struct GNUNET_TESTBED_Barrier *hbarrier;
151
152   /**
153    * Has this barrier been crossed?
154    */
155   uint8_t reached;
156 };
157
158
159 /**
160  * Barrier
161  */
162 struct Barrier
163 {
164   /**
165    * The hashcode of the barrier name
166    */
167   struct GNUNET_HashCode hash;
168
169   /**
170    * The client handle to the master controller
171    */
172   struct GNUNET_SERVER_Client *mc;
173
174   /**
175    * The name of the barrier
176    */
177   char *name;
178
179   /**
180    * DLL head for the list of clients waiting for this barrier
181    */
182   struct ClientCtx *head;
183
184   /**
185    * DLL tail for the list of clients waiting for this barrier
186    */
187   struct ClientCtx *tail;
188
189   /**
190    * DLL head for the list of barrier handles
191    */
192   struct WBarrier *whead;
193
194   /**
195    * DLL tail for the list of barrier handles
196    */
197   struct WBarrier *wtail;
198
199   /**
200    * Identifier for the timeout task
201    */
202   GNUNET_SCHEDULER_TaskIdentifier tout_task;
203   
204   /**
205    * The status of this barrier
206    */
207   enum GNUNET_TESTBED_BarrierStatus status;
208   
209   /**
210    * Number of barriers wrapped in the above DLL
211    */
212   unsigned int num_wbarriers;
213
214   /**
215    * Number of wrapped barriers reached so far
216    */
217   unsigned int num_wbarriers_reached;
218
219   /**
220    * Number of wrapped barrier initialised so far
221    */
222   unsigned int num_wbarriers_inited;
223
224   /**
225    * Number of peers which have reached this barrier
226    */
227   unsigned int nreached;
228
229   /**
230    * Number of slaves we have initialised this barrier
231    */
232   unsigned int nslaves;
233
234   /**
235    * Quorum percentage to be reached
236    */
237   uint8_t quorum;
238   
239 };
240
241
242 /**
243  * Hashtable handle for storing initialised barriers
244  */
245 static struct GNUNET_CONTAINER_MultiHashMap *barrier_map;
246
247 /**
248  * Service context
249  */
250 static struct GNUNET_SERVICE_Context *ctx;
251
252
253 /**
254  * Function called to notify a client about the connection
255  * begin ready to queue more data.  "buf" will be
256  * NULL and "size" zero if the connection was closed for
257  * writing in the meantime.
258  *
259  * @param cls client context
260  * @param size number of bytes available in buf
261  * @param buf where the callee should write the message
262  * @return number of bytes written to buf
263  */
264 static size_t 
265 transmit_ready_cb (void *cls, size_t size, void *buf)
266 {
267   struct ClientCtx *ctx = cls;
268   struct GNUNET_SERVER_Client *client = ctx->client;
269   struct MessageQueue *mq;
270   struct GNUNET_MessageHeader *msg;
271   size_t wrote;
272
273   ctx->tx = NULL;
274   wrote = 0;
275   if ((0 == size) || (NULL == buf))
276   {
277     GNUNET_assert (NULL != ctx->client);
278     GNUNET_SERVER_client_drop (ctx->client);
279     ctx->client = NULL;    
280     return 0;
281   }
282   mq = ctx->mq_head;
283   msg = mq->msg;
284   wrote = ntohs (msg->size);
285   GNUNET_assert (size >= wrote);
286   (void) memcpy (buf, msg, wrote);
287   GNUNET_CONTAINER_DLL_remove (ctx->mq_head, ctx->mq_tail, mq);
288   GNUNET_free (mq->msg);
289   GNUNET_free (mq);
290   if (NULL != (mq = ctx->mq_head))
291     ctx->tx = GNUNET_SERVER_notify_transmit_ready (client, ntohs (msg->size),
292                                                   MESSAGE_SEND_TIMEOUT (30),
293                                                   &transmit_ready_cb, ctx);
294   return wrote;
295 }
296
297
298 /**
299  * Queue a message into a clients message queue
300  *
301  * @param ctx the context associated with the client
302  * @param msg the message to queue.  Will be consumed
303  */
304 static void
305 queue_message (struct ClientCtx *ctx, struct GNUNET_MessageHeader *msg)
306 {
307   struct MessageQueue *mq;
308   struct GNUNET_SERVER_Client *client = ctx->client;
309   
310   mq = GNUNET_malloc (sizeof (struct MessageQueue));
311   mq->msg = msg;
312   LOG_DEBUG ("Queueing message of type %u, size %u for sending\n",
313              ntohs (msg->type), ntohs (msg->size));
314   GNUNET_CONTAINER_DLL_insert_tail (ctx->mq_head, ctx->mq_tail, mq);
315   if (NULL == ctx->tx)
316    ctx->tx = GNUNET_SERVER_notify_transmit_ready (client, ntohs (msg->size),
317                                                   MESSAGE_SEND_TIMEOUT (30),
318                                                   &transmit_ready_cb, ctx);
319 }
320
321
322 /**
323  * Function to cleanup client context data structure
324  *
325  * @param ctx the client context data structure
326  */
327 static void
328 cleanup_clientctx (struct ClientCtx *ctx)
329 {
330   struct MessageQueue *mq;
331   
332   if (NULL != ctx->client)
333   {
334     GNUNET_SERVER_client_set_user_context_ (ctx->client, NULL, 0);
335     GNUNET_SERVER_client_drop (ctx->client);
336   } 
337   if (NULL != ctx->tx)
338     GNUNET_SERVER_notify_transmit_ready_cancel (ctx->tx);
339   if (NULL != (mq = ctx->mq_head))
340   {
341     GNUNET_CONTAINER_DLL_remove (ctx->mq_head, ctx->mq_tail, mq);
342     GNUNET_free (mq->msg);
343     GNUNET_free (mq);
344   }
345   GNUNET_free (ctx);
346 }
347
348
349 /**
350  * Function to remove a barrier from the barrier map and cleanup resources
351  * occupied by a barrier
352  *
353  * @param barrier the barrier handle
354  */
355 static void
356 remove_barrier (struct Barrier *barrier)
357 {
358   struct ClientCtx *ctx;
359   
360   GNUNET_assert (GNUNET_YES == GNUNET_CONTAINER_multihashmap_remove (barrier_map,
361                                                                     &barrier->hash,
362                                                                     barrier));
363   while (NULL != (ctx = barrier->head))
364   {
365     GNUNET_CONTAINER_DLL_remove (barrier->head, barrier->tail, ctx);
366     cleanup_clientctx (ctx);
367   }
368   GNUNET_free (barrier->name);
369   GNUNET_SERVER_client_drop (barrier->mc);
370   GNUNET_free (barrier);
371 }
372
373
374 /**
375  * Cancels all subcontroller barrier handles
376  *
377  * @param barrier the local barrier
378  */
379 static void
380 cancel_wrappers (struct Barrier *barrier)
381 {
382   struct WBarrier *wrapper;
383
384   while (NULL != (wrapper = barrier->whead))
385   {
386     GNUNET_TESTBED_barrier_cancel (wrapper->hbarrier);
387     GNUNET_CONTAINER_DLL_remove (barrier->whead, barrier->wtail, wrapper);
388     GNUNET_free (wrapper);
389   }
390 }
391
392
393 /**
394  * Send a status message about a barrier to the given client
395  *
396  * @param client the client to send the message to
397  * @param name the barrier name
398  * @param status the status of the barrier
399  * @param emsg the error message; should be non-NULL for
400  *   status=GNUNET_TESTBED_BARRIERSTATUS_ERROR 
401  */
402 static void
403 send_client_status_msg (struct GNUNET_SERVER_Client *client,
404                         const char *name,
405                         enum GNUNET_TESTBED_BarrierStatus status,
406                         const char *emsg)
407 {
408   struct GNUNET_TESTBED_BarrierStatusMsg *msg;
409   size_t name_len;
410   uint16_t msize;
411
412   GNUNET_assert ((NULL == emsg) || (GNUNET_TESTBED_BARRIERSTATUS_ERROR == status));
413   name_len = strlen (name);
414   msize = sizeof (struct GNUNET_TESTBED_BarrierStatusMsg)
415       + (name_len + 1)
416       + ((NULL == emsg) ? 0 : (strlen (emsg) + 1));
417   msg = GNUNET_malloc (msize);
418   msg->header.size = htons (msize);
419   msg->header.type = htons (GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_STATUS);
420   msg->status = htons (status);
421   msg->name_len = htons ((uint16_t) name_len);
422   (void) memcpy (msg->data, name, name_len);
423   if (NULL != emsg)
424     (void) memcpy (msg->data + name_len + 1, emsg, strlen (emsg));
425   GST_queue_message (client, &msg->header);
426 }
427
428
429 /**
430  * Sends a barrier failed message
431  *
432  * @param barrier the corresponding barrier
433  * @param emsg the error message; should be non-NULL for
434  *   status=GNUNET_TESTBED_BARRIERSTATUS_ERROR 
435  */
436 static void
437 send_barrier_status_msg (struct Barrier *barrier, const char *emsg)
438 {
439   GNUNET_assert (0 != barrier->status);
440   send_client_status_msg (barrier->mc, barrier->name, barrier->status, emsg);
441 }
442
443
444 /**
445  * Message handler for GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_WAIT messages.  This
446  * message should come from peers or a shared helper service using the
447  * testbed-barrier client API (@see gnunet_testbed_barrier_service.h)
448  *
449  * This handler is queued in the main service and will handle the messages sent
450  * either from the testbed driver or from a high level controller
451  *
452  * @param cls NULL
453  * @param client identification of the client
454  * @param message the actual message
455  */
456 static void
457 handle_barrier_wait (void *cls, struct GNUNET_SERVER_Client *client,
458                      const struct GNUNET_MessageHeader *message)
459 {
460   const struct GNUNET_TESTBED_BarrierWait *msg;
461   struct Barrier *barrier;
462   char *name;
463   struct ClientCtx *client_ctx;
464   struct GNUNET_HashCode key;
465   size_t name_len;
466   uint16_t msize;
467   
468   msize = ntohs (message->size);
469   if (msize <= sizeof (struct GNUNET_TESTBED_BarrierWait))
470   {
471     GNUNET_break_op (0);
472     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
473     return;
474   }
475   if (NULL == barrier_map)
476   {
477     GNUNET_break (0);
478     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
479     return;
480   }
481   msg = (const struct GNUNET_TESTBED_BarrierWait *) message;
482   name_len = msize - sizeof (struct GNUNET_TESTBED_BarrierWait);
483   name = GNUNET_malloc (name_len + 1);
484   name[name_len] = '\0';
485   (void) memcpy (name, msg->name, name_len);
486   LOG_DEBUG ("Received BARRIER_WAIT for barrier `%s'\n", name);
487   GNUNET_CRYPTO_hash (name, name_len, &key);
488   GNUNET_free (name);
489   if (NULL == (barrier = GNUNET_CONTAINER_multihashmap_get (barrier_map, &key)))
490   {
491     GNUNET_break (0);
492     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
493     return;
494   }
495   client_ctx = GNUNET_SERVER_client_get_user_context (client, struct ClientCtx);
496   if (NULL == client_ctx)
497   {
498     client_ctx = GNUNET_malloc (sizeof (struct ClientCtx));
499     client_ctx->client = client;
500     GNUNET_SERVER_client_keep (client);
501     client_ctx->barrier = barrier;
502     GNUNET_CONTAINER_DLL_insert_tail (barrier->head, barrier->tail, client_ctx);
503     GNUNET_SERVER_client_set_user_context (client, client_ctx); 
504   }
505   barrier->nreached++;
506   if ((barrier->num_wbarriers_reached == barrier->num_wbarriers)
507         && (LOCAL_QUORUM_REACHED (barrier)))
508   {
509     barrier->status = GNUNET_TESTBED_BARRIERSTATUS_CROSSED;
510     send_barrier_status_msg (barrier, NULL);
511   }
512   GNUNET_SERVER_receive_done (client, GNUNET_OK);
513 }
514
515
516 /**
517  * Functions with this signature are called whenever a client
518  * is disconnected on the network level.
519  *
520  * @param cls closure
521  * @param client identification of the client; NULL
522  *        for the last call when the server is destroyed
523  */
524 static void
525 disconnect_cb (void *cls, struct GNUNET_SERVER_Client *client)
526 {
527   struct ClientCtx *client_ctx;
528   
529   if (NULL == client)
530     return;
531   client_ctx = GNUNET_SERVER_client_get_user_context (client, struct ClientCtx);
532   if (NULL == client_ctx)
533     return;
534   cleanup_clientctx (client_ctx);
535 }
536
537
538 /**
539  * Function to initialise barrriers component
540  *
541  * @param cfg the configuration to use for initialisation
542  */
543 void
544 GST_barriers_init (struct GNUNET_CONFIGURATION_Handle *cfg)
545 {
546   static const struct GNUNET_SERVER_MessageHandler message_handlers[] = {
547     {&handle_barrier_wait, NULL, GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_WAIT, 0},
548     {NULL, NULL, 0, 0}
549   };
550   struct GNUNET_SERVER_Handle *srv;
551
552   barrier_map = GNUNET_CONTAINER_multihashmap_create (3, GNUNET_YES);
553   ctx = GNUNET_SERVICE_start ("testbed-barrier", cfg,
554                               GNUNET_SERVICE_OPTION_MANUAL_SHUTDOWN);
555   srv = GNUNET_SERVICE_get_server (ctx);
556   GNUNET_SERVER_add_handlers (srv, message_handlers);
557   GNUNET_SERVER_disconnect_notify (srv, &disconnect_cb, NULL);  
558 }
559
560
561 /**
562  * Iterator over hash map entries.
563  *
564  * @param cls closure
565  * @param key current key code
566  * @param value value in the hash map
567  * @return #GNUNET_YES if we should continue to
568  *         iterate,
569  *         #GNUNET_NO if not.
570  */
571 static int 
572 barrier_destroy_iterator (void *cls,
573                           const struct GNUNET_HashCode *key,
574                           void *value)
575 {
576   struct Barrier *barrier = value;
577
578   GNUNET_assert (NULL != barrier);
579   cancel_wrappers (barrier);
580   remove_barrier (barrier);
581   return GNUNET_YES;
582 }
583
584
585 /**
586  * Function to stop the barrier service
587  */
588 void
589 GST_barriers_destroy ()
590 {
591   GNUNET_assert (NULL != barrier_map);
592   GNUNET_assert (GNUNET_SYSERR !=
593                  GNUNET_CONTAINER_multihashmap_iterate (barrier_map,
594                                                         &barrier_destroy_iterator,
595                                                         NULL));
596   GNUNET_CONTAINER_multihashmap_destroy (barrier_map);
597   GNUNET_assert (NULL != ctx);
598   GNUNET_SERVICE_stop (ctx);
599 }
600
601
602 /**
603  * Functions of this type are to be given as callback argument to
604  * GNUNET_TESTBED_barrier_init().  The callback will be called when status
605  * information is available for the barrier.
606  *
607  * @param cls the closure given to GNUNET_TESTBED_barrier_init()
608  * @param name the name of the barrier
609  * @param b_ the barrier handle
610  * @param status status of the barrier; GNUNET_OK if the barrier is crossed;
611  *   GNUNET_SYSERR upon error
612  * @param emsg if the status were to be GNUNET_SYSERR, this parameter has the
613  *   error messsage
614  */
615 static void 
616 wbarrier_status_cb (void *cls, const char *name,
617                     struct GNUNET_TESTBED_Barrier *b_,
618                     enum GNUNET_TESTBED_BarrierStatus status,
619                     const char *emsg)
620 {
621   struct WBarrier *wrapper = cls;
622   struct Barrier *barrier = wrapper->barrier;
623
624   GNUNET_assert (b_ == wrapper->hbarrier);
625   wrapper->hbarrier = NULL;
626   GNUNET_CONTAINER_DLL_remove (barrier->whead, barrier->wtail, wrapper);
627   GNUNET_free (wrapper);
628   switch (status)
629   {
630   case GNUNET_TESTBED_BARRIERSTATUS_ERROR:
631     LOG (GNUNET_ERROR_TYPE_ERROR,
632          "Initialising barrier `%s' failed at a sub-controller: %s\n",
633          barrier->name, (NULL != emsg) ? emsg : "NULL");
634     cancel_wrappers (barrier);
635     if (NULL == emsg)
636       emsg = "Initialisation failed at a sub-controller";
637     barrier->status = GNUNET_TESTBED_BARRIERSTATUS_ERROR;
638     send_barrier_status_msg (barrier, emsg);
639     return;
640   case GNUNET_TESTBED_BARRIERSTATUS_CROSSED:
641     if (GNUNET_TESTBED_BARRIERSTATUS_INITIALISED != barrier->status)
642     {
643       GNUNET_break_op (0);
644       return;
645     }
646     barrier->num_wbarriers_reached++;
647     if ((barrier->num_wbarriers_reached == barrier->num_wbarriers)
648         && (LOCAL_QUORUM_REACHED (barrier)))
649     {
650       barrier->status = GNUNET_TESTBED_BARRIERSTATUS_CROSSED;
651       send_barrier_status_msg (barrier, NULL);
652     }
653     return;
654   case GNUNET_TESTBED_BARRIERSTATUS_INITIALISED:
655     if (0 != barrier->status)
656     {
657       GNUNET_break_op (0);
658       return;
659     }
660     barrier->num_wbarriers_inited++;
661     if (barrier->num_wbarriers_inited == barrier->num_wbarriers)
662     {
663       barrier->status = GNUNET_TESTBED_BARRIERSTATUS_INITIALISED;
664       send_barrier_status_msg (barrier, NULL);
665     }
666     return;
667   }
668 }
669
670
671 /**
672  * Function called upon timeout while waiting for a response from the
673  * subcontrollers to barrier init message
674  *
675  * @param cls barrier
676  * @param tc scheduler task context
677  */
678 static void
679 fwd_tout_barrier_init (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
680 {
681   struct Barrier *barrier = cls;
682   
683   cancel_wrappers (barrier);
684   barrier->status = GNUNET_TESTBED_BARRIERSTATUS_ERROR;
685   send_barrier_status_msg (barrier,
686                            "Timedout while propagating barrier initialisation\n");
687   remove_barrier (barrier);
688 }
689
690
691 /**
692  * Message handler for GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_INIT messages.  This
693  * message should always come from a parent controller or the testbed API if we
694  * are the root controller.
695  *
696  * This handler is queued in the main service and will handle the messages sent
697  * either from the testbed driver or from a high level controller
698  *
699  * @param cls NULL
700  * @param client identification of the client
701  * @param message the actual message
702  */
703 void
704 GST_handle_barrier_init (void *cls, struct GNUNET_SERVER_Client *client,
705                          const struct GNUNET_MessageHeader *message)
706 {
707   const struct GNUNET_TESTBED_BarrierInit *msg;
708   char *name;
709   struct Barrier *barrier;
710   struct Slave *slave;
711   struct WBarrier *wrapper;
712   struct GNUNET_HashCode hash;
713   size_t name_len;
714   unsigned int cnt;
715   uint16_t msize;
716   
717   if (NULL == GST_context)
718   {
719     GNUNET_break_op (0);
720     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
721     return;
722   }
723   if (client != GST_context->client)
724   {
725     GNUNET_break_op (0);
726     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
727     return;
728   }
729   msize = ntohs (message->size);
730   if (msize <= sizeof (struct GNUNET_TESTBED_BarrierInit))
731   {
732     GNUNET_break_op (0);
733     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
734     return;
735   }
736   msg = (const struct GNUNET_TESTBED_BarrierInit *) message;
737   name_len = (size_t) msize - sizeof (struct GNUNET_TESTBED_BarrierInit);
738   name = GNUNET_malloc (name_len + 1);
739   (void) memcpy (name, msg->name, name_len);
740   GNUNET_CRYPTO_hash (name, name_len, &hash);
741   LOG_DEBUG ("Received BARRIER_INIT for barrier `%s'\n", name);
742   if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (barrier_map, &hash))
743   {
744     
745     send_client_status_msg (client, name, GNUNET_TESTBED_BARRIERSTATUS_ERROR,
746                             "A barrier with the same name already exists");
747     GNUNET_free (name);
748     GNUNET_SERVER_receive_done (client, GNUNET_OK);
749     return;
750   }
751   barrier = GNUNET_malloc (sizeof (struct Barrier));
752   (void) memcpy (&barrier->hash, &hash, sizeof (struct GNUNET_HashCode));
753   barrier->quorum = msg->quorum;
754   barrier->name = name;
755   barrier->mc = client;
756   GNUNET_SERVER_client_keep (client);
757   GNUNET_assert (GNUNET_OK ==
758                  GNUNET_CONTAINER_multihashmap_put (barrier_map,
759                                                     &barrier->hash,
760                                                     barrier,
761                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST));
762   GNUNET_SERVER_receive_done (client, GNUNET_OK);
763   /* Propagate barrier init to subcontrollers */
764   for (cnt = 0; cnt < GST_slave_list_size; cnt++)
765   {
766     if (NULL == (slave = GST_slave_list[cnt]))
767       continue;
768     if (NULL == slave->controller)
769     {
770       GNUNET_break (0);/* May happen when we are connecting to the controller */
771       continue;
772     }
773     wrapper = GNUNET_malloc (sizeof (struct WBarrier));
774     wrapper->barrier = barrier;
775     GNUNET_CONTAINER_DLL_insert_tail (barrier->whead, barrier->wtail, wrapper);
776     wrapper->hbarrier = GNUNET_TESTBED_barrier_init_ (slave->controller,
777                                                       barrier->name,
778                                                       barrier->quorum,
779                                                       &wbarrier_status_cb,
780                                                       wrapper,
781                                                       GNUNET_NO);
782   }
783   if (NULL == barrier->whead)   /* No further propagation */
784   {
785     barrier->status = GNUNET_TESTBED_BARRIERSTATUS_INITIALISED;
786     LOG_DEBUG ("Sending GNUNET_TESTBED_BARRIERSTATUS_INITIALISED for barrier `%s'\n",
787                barrier->name);
788     send_barrier_status_msg (barrier, NULL);
789   }else
790     barrier->tout_task = GNUNET_SCHEDULER_add_delayed (MESSAGE_SEND_TIMEOUT (30),
791                                                        &fwd_tout_barrier_init,
792                                                        barrier);
793 }
794
795
796 /**
797  * Message handler for GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_CANCEL messages.  This
798  * message should always come from a parent controller or the testbed API if we
799  * are the root controller.
800  *
801  * This handler is queued in the main service and will handle the messages sent
802  * either from the testbed driver or from a high level controller
803  *
804  * @param cls NULL
805  * @param client identification of the client
806  * @param message the actual message
807  */
808 void
809 GST_handle_barrier_cancel (void *cls, struct GNUNET_SERVER_Client *client,
810                            const struct GNUNET_MessageHeader *message)
811 {
812   const struct GNUNET_TESTBED_BarrierCancel *msg;
813   char *name;
814   struct Barrier *barrier;
815   struct GNUNET_HashCode hash;
816   size_t name_len;
817   uint16_t msize;
818
819   if (NULL == GST_context)
820   {
821     GNUNET_break_op (0);
822     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
823     return;
824   }  
825   if (client != GST_context->client)
826   {
827     GNUNET_break_op (0);
828     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
829     return;
830   }
831   msize = ntohs (message->size);
832   if (msize <= sizeof (struct GNUNET_TESTBED_BarrierCancel))
833   {
834     GNUNET_break_op (0);
835     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
836     return;
837   }
838   msg = (const struct GNUNET_TESTBED_BarrierCancel *) message;
839   name_len = msize - sizeof (struct GNUNET_TESTBED_BarrierCancel);
840   name = GNUNET_malloc (name_len + 1);
841   (void) memcpy (name, msg->name, name_len);
842   GNUNET_CRYPTO_hash (name, name_len, &hash);
843   if (GNUNET_NO == GNUNET_CONTAINER_multihashmap_contains (barrier_map, &hash))
844   {
845     GNUNET_break_op (0);
846     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
847     return;
848   }
849   barrier = GNUNET_CONTAINER_multihashmap_get (barrier_map, &hash);
850   GNUNET_assert (NULL != barrier);
851   cancel_wrappers (barrier);
852   remove_barrier (barrier);
853   GNUNET_SERVER_receive_done (client, GNUNET_OK);  
854 }
855
856
857 /**
858  * Message handler for GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_STATUS messages.
859  * This handler is queued in the main service and will handle the messages sent
860  * either from the testbed driver or from a high level controller
861  *
862  * @param cls NULL
863  * @param client identification of the client
864  * @param message the actual message
865  */
866 void
867 GST_handle_barrier_status (void *cls, struct GNUNET_SERVER_Client *client,
868                            const struct GNUNET_MessageHeader *message)
869 {
870   const struct GNUNET_TESTBED_BarrierStatusMsg *msg;
871   struct Barrier *barrier; 
872   struct ClientCtx *client_ctx;
873   const char *name;
874   struct GNUNET_HashCode key;
875   enum GNUNET_TESTBED_BarrierStatus status;
876   uint16_t msize;
877   uint16_t name_len;
878   
879   if (NULL == GST_context)
880   {
881     GNUNET_break_op (0);
882     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
883     return;
884   }  
885   if (client != GST_context->client)
886   {
887     GNUNET_break_op (0);
888     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
889     return;
890   }
891   msize = ntohs (message->size);
892   if (msize <= sizeof (struct GNUNET_TESTBED_BarrierStatusMsg))
893   {
894     GNUNET_break_op (0);
895     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
896     return;
897   }
898   msg = (const struct GNUNET_TESTBED_BarrierStatusMsg *) message;
899   status = ntohs (msg->status);
900   if (GNUNET_TESTBED_BARRIERSTATUS_CROSSED != status)
901   {
902     GNUNET_break_op (0);        /* current we only expect BARRIER_CROSSED
903                                    status message this way */
904     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
905     return;
906   }
907   name = msg->data;
908   name_len = ntohs (msg->name_len);
909   if ((sizeof (struct GNUNET_TESTBED_BarrierStatusMsg) + name_len + 1) != msize)
910   {
911     GNUNET_break_op (0);
912     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
913     return;
914   }
915   if ('\0' != name[name_len])
916   {
917     GNUNET_break_op (0);
918     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
919     return;
920   }
921   GNUNET_CRYPTO_hash (name, name_len, &key);
922   barrier = GNUNET_CONTAINER_multihashmap_get (barrier_map, &key);
923   if (NULL == barrier)
924   {
925     GNUNET_break_op (0);
926     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
927     return;
928   }
929   GNUNET_SERVER_receive_done (client, GNUNET_OK);
930   while (NULL != (client_ctx = barrier->head)) /* Notify peers */
931   {
932     queue_message (client_ctx, GNUNET_copy_message (message));
933     GNUNET_CONTAINER_DLL_remove (barrier->head, barrier->tail, client_ctx);
934   }
935 }
936
937 /* end of gnunet-service-testbed_barriers.c */