malloc -> new
[oweals/gnunet.git] / src / testbed / gnunet-service-testbed_barriers.c
1 /*
2   This file is part of GNUnet.
3   (C) 2008--2013 Christian Grothoff (and other contributing authors)
4
5   GNUnet is free software; you can redistribute it and/or modify
6   it under the terms of the GNU General Public License as published
7   by the Free Software Foundation; either version 3, or (at your
8   option) any later version.
9
10   GNUnet is distributed in the hope that it will be useful, but
11   WITHOUT ANY WARRANTY; without even the implied warranty of
12   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13   General Public License for more details.
14
15   You should have received a copy of the GNU General Public License
16   along with GNUnet; see the file COPYING.  If not, write to the
17   Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18   Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file testbed/gnunet-service-testbed_barriers.c
23  * @brief barrier handling at the testbed controller
24  * @author Sree Harsha Totakura <sreeharsha@totakura.in>
25  */
26
27 #include "gnunet-service-testbed.h"
28 #include "gnunet-service-testbed_barriers.h"
29 #include "testbed_api_barriers.h"
30
31
32 /**
33  * timeout for outgoing message transmissions in seconds
34  */
35 #define MESSAGE_SEND_TIMEOUT(s) \
36   GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, s)
37
38
39 /**
40  * Test to see if local peers have reached the required quorum of a barrier
41  */
42 #define LOCAL_QUORUM_REACHED(barrier)           \
43   ((barrier->quorum * GST_num_local_peers) <= (barrier->nreached * 100))
44
45
46 #ifdef LOG
47 #undef LOG
48 #endif
49
50 /**
51  * Logging shorthand
52  */
53 #define LOG(kind,...)                                           \
54   GNUNET_log_from (kind, "testbed-barriers", __VA_ARGS__)
55
56
57 /**
58  * Barrier
59  */
60 struct Barrier;
61
62
63 /**
64  * Message queue for transmitting messages
65  */
66 struct MessageQueue
67 {
68   /**
69    * next pointer for DLL
70    */
71   struct MessageQueue *next;
72
73   /**
74    * prev pointer for DLL
75    */
76   struct MessageQueue *prev;
77
78   /**
79    * The message to be sent
80    */
81   struct GNUNET_MessageHeader *msg;
82 };
83
84
85 /**
86  * Context to be associated with each client
87  */
88 struct ClientCtx
89 {
90   /**
91    * The barrier this client is waiting for
92    */
93   struct Barrier *barrier;
94
95   /**
96    * DLL next ptr
97    */
98   struct ClientCtx *next;
99
100   /**
101    * DLL prev ptr
102    */
103   struct ClientCtx *prev;
104
105   /**
106    * The client handle
107    */
108   struct GNUNET_SERVER_Client *client;
109
110   /**
111    * the transmission handle
112    */
113   struct GNUNET_SERVER_TransmitHandle *tx;
114
115   /**
116    * message queue head
117    */
118   struct MessageQueue *mq_head;
119
120   /**
121    * message queue tail
122    */
123   struct MessageQueue *mq_tail;
124 };
125
126
127 /**
128  * Wrapper around Barrier handle
129  */
130 struct WBarrier
131 {
132   /**
133    * DLL next pointer
134    */
135   struct WBarrier *next;
136
137   /**
138    * DLL prev pointer
139    */
140   struct WBarrier *prev;
141
142   /**
143    * The local barrier associated with the creation of this wrapper
144    */
145   struct Barrier *barrier;
146
147   /**
148    * The barrier handle from API
149    */
150   struct GNUNET_TESTBED_Barrier *hbarrier;
151
152   /**
153    * Has this barrier been crossed?
154    */
155   uint8_t reached;
156 };
157
158
159 /**
160  * Barrier
161  */
162 struct Barrier
163 {
164   /**
165    * The hashcode of the barrier name
166    */
167   struct GNUNET_HashCode hash;
168
169   /**
170    * The client handle to the master controller
171    */
172   struct GNUNET_SERVER_Client *mc;
173
174   /**
175    * The name of the barrier
176    */
177   char *name;
178
179   /**
180    * DLL head for the list of clients waiting for this barrier
181    */
182   struct ClientCtx *head;
183
184   /**
185    * DLL tail for the list of clients waiting for this barrier
186    */
187   struct ClientCtx *tail;
188
189   /**
190    * DLL head for the list of barrier handles
191    */
192   struct WBarrier *whead;
193
194   /**
195    * DLL tail for the list of barrier handles
196    */
197   struct WBarrier *wtail;
198
199   /**
200    * Identifier for the timeout task
201    */
202   GNUNET_SCHEDULER_TaskIdentifier tout_task;
203
204   /**
205    * The status of this barrier
206    */
207   enum GNUNET_TESTBED_BarrierStatus status;
208
209   /**
210    * Number of barriers wrapped in the above DLL
211    */
212   unsigned int num_wbarriers;
213
214   /**
215    * Number of wrapped barriers reached so far
216    */
217   unsigned int num_wbarriers_reached;
218
219   /**
220    * Number of wrapped barrier initialised so far
221    */
222   unsigned int num_wbarriers_inited;
223
224   /**
225    * Number of peers which have reached this barrier
226    */
227   unsigned int nreached;
228
229   /**
230    * Number of slaves we have initialised this barrier
231    */
232   unsigned int nslaves;
233
234   /**
235    * Quorum percentage to be reached
236    */
237   uint8_t quorum;
238
239 };
240
241
242 /**
243  * Hashtable handle for storing initialised barriers
244  */
245 static struct GNUNET_CONTAINER_MultiHashMap *barrier_map;
246
247 /**
248  * Service context
249  */
250 static struct GNUNET_SERVICE_Context *ctx;
251
252
253 /**
254  * Function called to notify a client about the connection
255  * begin ready to queue more data.  "buf" will be
256  * NULL and "size" zero if the connection was closed for
257  * writing in the meantime.
258  *
259  * @param cls client context
260  * @param size number of bytes available in buf
261  * @param buf where the callee should write the message
262  * @return number of bytes written to buf
263  */
264 static size_t
265 transmit_ready_cb (void *cls, size_t size, void *buf)
266 {
267   struct ClientCtx *ctx = cls;
268   struct GNUNET_SERVER_Client *client = ctx->client;
269   struct MessageQueue *mq;
270   struct GNUNET_MessageHeader *msg;
271   size_t wrote;
272
273   ctx->tx = NULL;
274   if ((0 == size) || (NULL == buf))
275   {
276     GNUNET_assert (NULL != ctx->client);
277     GNUNET_SERVER_client_drop (ctx->client);
278     ctx->client = NULL;
279     return 0;
280   }
281   mq = ctx->mq_head;
282   msg = mq->msg;
283   wrote = ntohs (msg->size);
284   GNUNET_assert (size >= wrote);
285   (void) memcpy (buf, msg, wrote);
286   GNUNET_CONTAINER_DLL_remove (ctx->mq_head, ctx->mq_tail, mq);
287   GNUNET_free (mq->msg);
288   GNUNET_free (mq);
289   if (NULL != (mq = ctx->mq_head))
290     ctx->tx = GNUNET_SERVER_notify_transmit_ready (client, ntohs (msg->size),
291                                                   MESSAGE_SEND_TIMEOUT (30),
292                                                   &transmit_ready_cb, ctx);
293   return wrote;
294 }
295
296
297 /**
298  * Queue a message into a clients message queue
299  *
300  * @param ctx the context associated with the client
301  * @param msg the message to queue.  Will be consumed
302  */
303 static void
304 queue_message (struct ClientCtx *ctx, struct GNUNET_MessageHeader *msg)
305 {
306   struct MessageQueue *mq;
307   struct GNUNET_SERVER_Client *client = ctx->client;
308
309   mq = GNUNET_new (struct MessageQueue);
310   mq->msg = msg;
311   LOG_DEBUG ("Queueing message of type %u, size %u for sending\n",
312              ntohs (msg->type), ntohs (msg->size));
313   GNUNET_CONTAINER_DLL_insert_tail (ctx->mq_head, ctx->mq_tail, mq);
314   if (NULL == ctx->tx)
315    ctx->tx = GNUNET_SERVER_notify_transmit_ready (client, ntohs (msg->size),
316                                                   MESSAGE_SEND_TIMEOUT (30),
317                                                   &transmit_ready_cb, ctx);
318 }
319
320
321 /**
322  * Function to cleanup client context data structure
323  *
324  * @param ctx the client context data structure
325  */
326 static void
327 cleanup_clientctx (struct ClientCtx *ctx)
328 {
329   struct MessageQueue *mq;
330
331   if (NULL != ctx->client)
332   {
333     GNUNET_SERVER_client_set_user_context_ (ctx->client, NULL, 0);
334     GNUNET_SERVER_client_drop (ctx->client);
335   }
336   if (NULL != ctx->tx)
337     GNUNET_SERVER_notify_transmit_ready_cancel (ctx->tx);
338   if (NULL != (mq = ctx->mq_head))
339   {
340     GNUNET_CONTAINER_DLL_remove (ctx->mq_head, ctx->mq_tail, mq);
341     GNUNET_free (mq->msg);
342     GNUNET_free (mq);
343   }
344   GNUNET_free (ctx);
345 }
346
347
348 /**
349  * Function to remove a barrier from the barrier map and cleanup resources
350  * occupied by a barrier
351  *
352  * @param barrier the barrier handle
353  */
354 static void
355 remove_barrier (struct Barrier *barrier)
356 {
357   struct ClientCtx *ctx;
358
359   GNUNET_assert (GNUNET_YES == GNUNET_CONTAINER_multihashmap_remove (barrier_map,
360                                                                     &barrier->hash,
361                                                                     barrier));
362   while (NULL != (ctx = barrier->head))
363   {
364     GNUNET_CONTAINER_DLL_remove (barrier->head, barrier->tail, ctx);
365     cleanup_clientctx (ctx);
366   }
367   GNUNET_free (barrier->name);
368   GNUNET_SERVER_client_drop (barrier->mc);
369   GNUNET_free (barrier);
370 }
371
372
373 /**
374  * Cancels all subcontroller barrier handles
375  *
376  * @param barrier the local barrier
377  */
378 static void
379 cancel_wrappers (struct Barrier *barrier)
380 {
381   struct WBarrier *wrapper;
382
383   while (NULL != (wrapper = barrier->whead))
384   {
385     GNUNET_TESTBED_barrier_cancel (wrapper->hbarrier);
386     GNUNET_CONTAINER_DLL_remove (barrier->whead, barrier->wtail, wrapper);
387     GNUNET_free (wrapper);
388   }
389 }
390
391
392 /**
393  * Send a status message about a barrier to the given client
394  *
395  * @param client the client to send the message to
396  * @param name the barrier name
397  * @param status the status of the barrier
398  * @param emsg the error message; should be non-NULL for
399  *   status=GNUNET_TESTBED_BARRIERSTATUS_ERROR
400  */
401 static void
402 send_client_status_msg (struct GNUNET_SERVER_Client *client,
403                         const char *name,
404                         enum GNUNET_TESTBED_BarrierStatus status,
405                         const char *emsg)
406 {
407   struct GNUNET_TESTBED_BarrierStatusMsg *msg;
408   size_t name_len;
409   uint16_t msize;
410
411   GNUNET_assert ((NULL == emsg) || (GNUNET_TESTBED_BARRIERSTATUS_ERROR == status));
412   name_len = strlen (name);
413   msize = sizeof (struct GNUNET_TESTBED_BarrierStatusMsg)
414       + (name_len + 1)
415       + ((NULL == emsg) ? 0 : (strlen (emsg) + 1));
416   msg = GNUNET_malloc (msize);
417   msg->header.size = htons (msize);
418   msg->header.type = htons (GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_STATUS);
419   msg->status = htons (status);
420   msg->name_len = htons ((uint16_t) name_len);
421   (void) memcpy (msg->data, name, name_len);
422   if (NULL != emsg)
423     (void) memcpy (msg->data + name_len + 1, emsg, strlen (emsg));
424   GST_queue_message (client, &msg->header);
425 }
426
427
428 /**
429  * Sends a barrier failed message
430  *
431  * @param barrier the corresponding barrier
432  * @param emsg the error message; should be non-NULL for
433  *   status=GNUNET_TESTBED_BARRIERSTATUS_ERROR
434  */
435 static void
436 send_barrier_status_msg (struct Barrier *barrier, const char *emsg)
437 {
438   GNUNET_assert (0 != barrier->status);
439   send_client_status_msg (barrier->mc, barrier->name, barrier->status, emsg);
440 }
441
442
443 /**
444  * Message handler for GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_WAIT messages.  This
445  * message should come from peers or a shared helper service using the
446  * testbed-barrier client API (@see gnunet_testbed_barrier_service.h)
447  *
448  * This handler is queued in the main service and will handle the messages sent
449  * either from the testbed driver or from a high level controller
450  *
451  * @param cls NULL
452  * @param client identification of the client
453  * @param message the actual message
454  */
455 static void
456 handle_barrier_wait (void *cls, struct GNUNET_SERVER_Client *client,
457                      const struct GNUNET_MessageHeader *message)
458 {
459   const struct GNUNET_TESTBED_BarrierWait *msg;
460   struct Barrier *barrier;
461   char *name;
462   struct ClientCtx *client_ctx;
463   struct GNUNET_HashCode key;
464   size_t name_len;
465   uint16_t msize;
466
467   msize = ntohs (message->size);
468   if (msize <= sizeof (struct GNUNET_TESTBED_BarrierWait))
469   {
470     GNUNET_break_op (0);
471     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
472     return;
473   }
474   if (NULL == barrier_map)
475   {
476     GNUNET_break (0);
477     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
478     return;
479   }
480   msg = (const struct GNUNET_TESTBED_BarrierWait *) message;
481   name_len = msize - sizeof (struct GNUNET_TESTBED_BarrierWait);
482   name = GNUNET_malloc (name_len + 1);
483   name[name_len] = '\0';
484   (void) memcpy (name, msg->name, name_len);
485   LOG_DEBUG ("Received BARRIER_WAIT for barrier `%s'\n", name);
486   GNUNET_CRYPTO_hash (name, name_len, &key);
487   GNUNET_free (name);
488   if (NULL == (barrier = GNUNET_CONTAINER_multihashmap_get (barrier_map, &key)))
489   {
490     GNUNET_break (0);
491     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
492     return;
493   }
494   client_ctx = GNUNET_SERVER_client_get_user_context (client, struct ClientCtx);
495   if (NULL == client_ctx)
496   {
497     client_ctx = GNUNET_new (struct ClientCtx);
498     client_ctx->client = client;
499     GNUNET_SERVER_client_keep (client);
500     client_ctx->barrier = barrier;
501     GNUNET_CONTAINER_DLL_insert_tail (barrier->head, barrier->tail, client_ctx);
502     GNUNET_SERVER_client_set_user_context (client, client_ctx);
503   }
504   barrier->nreached++;
505   if ((barrier->num_wbarriers_reached == barrier->num_wbarriers)
506         && (LOCAL_QUORUM_REACHED (barrier)))
507   {
508     barrier->status = GNUNET_TESTBED_BARRIERSTATUS_CROSSED;
509     send_barrier_status_msg (barrier, NULL);
510   }
511   GNUNET_SERVER_receive_done (client, GNUNET_OK);
512 }
513
514
515 /**
516  * Functions with this signature are called whenever a client
517  * is disconnected on the network level.
518  *
519  * @param cls closure
520  * @param client identification of the client; NULL
521  *        for the last call when the server is destroyed
522  */
523 static void
524 disconnect_cb (void *cls, struct GNUNET_SERVER_Client *client)
525 {
526   struct ClientCtx *client_ctx;
527
528   if (NULL == client)
529     return;
530   client_ctx = GNUNET_SERVER_client_get_user_context (client, struct ClientCtx);
531   if (NULL == client_ctx)
532     return;
533   cleanup_clientctx (client_ctx);
534 }
535
536
537 /**
538  * Function to initialise barrriers component
539  *
540  * @param cfg the configuration to use for initialisation
541  */
542 void
543 GST_barriers_init (struct GNUNET_CONFIGURATION_Handle *cfg)
544 {
545   static const struct GNUNET_SERVER_MessageHandler message_handlers[] = {
546     {&handle_barrier_wait, NULL, GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_WAIT, 0},
547     {NULL, NULL, 0, 0}
548   };
549   struct GNUNET_SERVER_Handle *srv;
550
551   barrier_map = GNUNET_CONTAINER_multihashmap_create (3, GNUNET_YES);
552   ctx = GNUNET_SERVICE_start ("testbed-barrier", cfg,
553                               GNUNET_SERVICE_OPTION_MANUAL_SHUTDOWN);
554   srv = GNUNET_SERVICE_get_server (ctx);
555   GNUNET_SERVER_add_handlers (srv, message_handlers);
556   GNUNET_SERVER_disconnect_notify (srv, &disconnect_cb, NULL);
557 }
558
559
560 /**
561  * Iterator over hash map entries.
562  *
563  * @param cls closure
564  * @param key current key code
565  * @param value value in the hash map
566  * @return #GNUNET_YES if we should continue to
567  *         iterate,
568  *         #GNUNET_NO if not.
569  */
570 static int
571 barrier_destroy_iterator (void *cls,
572                           const struct GNUNET_HashCode *key,
573                           void *value)
574 {
575   struct Barrier *barrier = value;
576
577   GNUNET_assert (NULL != barrier);
578   cancel_wrappers (barrier);
579   remove_barrier (barrier);
580   return GNUNET_YES;
581 }
582
583
584 /**
585  * Function to stop the barrier service
586  */
587 void
588 GST_barriers_destroy ()
589 {
590   GNUNET_assert (NULL != barrier_map);
591   GNUNET_assert (GNUNET_SYSERR !=
592                  GNUNET_CONTAINER_multihashmap_iterate (barrier_map,
593                                                         &barrier_destroy_iterator,
594                                                         NULL));
595   GNUNET_CONTAINER_multihashmap_destroy (barrier_map);
596   GNUNET_assert (NULL != ctx);
597   GNUNET_SERVICE_stop (ctx);
598 }
599
600
601 /**
602  * Functions of this type are to be given as callback argument to
603  * GNUNET_TESTBED_barrier_init().  The callback will be called when status
604  * information is available for the barrier.
605  *
606  * @param cls the closure given to GNUNET_TESTBED_barrier_init()
607  * @param name the name of the barrier
608  * @param b_ the barrier handle
609  * @param status status of the barrier; GNUNET_OK if the barrier is crossed;
610  *   GNUNET_SYSERR upon error
611  * @param emsg if the status were to be GNUNET_SYSERR, this parameter has the
612  *   error messsage
613  */
614 static void
615 wbarrier_status_cb (void *cls, const char *name,
616                     struct GNUNET_TESTBED_Barrier *b_,
617                     enum GNUNET_TESTBED_BarrierStatus status,
618                     const char *emsg)
619 {
620   struct WBarrier *wrapper = cls;
621   struct Barrier *barrier = wrapper->barrier;
622
623   GNUNET_assert (b_ == wrapper->hbarrier);
624   wrapper->hbarrier = NULL;
625   GNUNET_CONTAINER_DLL_remove (barrier->whead, barrier->wtail, wrapper);
626   GNUNET_free (wrapper);
627   switch (status)
628   {
629   case GNUNET_TESTBED_BARRIERSTATUS_ERROR:
630     LOG (GNUNET_ERROR_TYPE_ERROR,
631          "Initialising barrier `%s' failed at a sub-controller: %s\n",
632          barrier->name, (NULL != emsg) ? emsg : "NULL");
633     cancel_wrappers (barrier);
634     if (NULL == emsg)
635       emsg = "Initialisation failed at a sub-controller";
636     barrier->status = GNUNET_TESTBED_BARRIERSTATUS_ERROR;
637     send_barrier_status_msg (barrier, emsg);
638     return;
639   case GNUNET_TESTBED_BARRIERSTATUS_CROSSED:
640     if (GNUNET_TESTBED_BARRIERSTATUS_INITIALISED != barrier->status)
641     {
642       GNUNET_break_op (0);
643       return;
644     }
645     barrier->num_wbarriers_reached++;
646     if ((barrier->num_wbarriers_reached == barrier->num_wbarriers)
647         && (LOCAL_QUORUM_REACHED (barrier)))
648     {
649       barrier->status = GNUNET_TESTBED_BARRIERSTATUS_CROSSED;
650       send_barrier_status_msg (barrier, NULL);
651     }
652     return;
653   case GNUNET_TESTBED_BARRIERSTATUS_INITIALISED:
654     if (0 != barrier->status)
655     {
656       GNUNET_break_op (0);
657       return;
658     }
659     barrier->num_wbarriers_inited++;
660     if (barrier->num_wbarriers_inited == barrier->num_wbarriers)
661     {
662       barrier->status = GNUNET_TESTBED_BARRIERSTATUS_INITIALISED;
663       send_barrier_status_msg (barrier, NULL);
664     }
665     return;
666   }
667 }
668
669
670 /**
671  * Function called upon timeout while waiting for a response from the
672  * subcontrollers to barrier init message
673  *
674  * @param cls barrier
675  * @param tc scheduler task context
676  */
677 static void
678 fwd_tout_barrier_init (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
679 {
680   struct Barrier *barrier = cls;
681
682   cancel_wrappers (barrier);
683   barrier->status = GNUNET_TESTBED_BARRIERSTATUS_ERROR;
684   send_barrier_status_msg (barrier,
685                            "Timedout while propagating barrier initialisation\n");
686   remove_barrier (barrier);
687 }
688
689
690 /**
691  * Message handler for GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_INIT messages.  This
692  * message should always come from a parent controller or the testbed API if we
693  * are the root controller.
694  *
695  * This handler is queued in the main service and will handle the messages sent
696  * either from the testbed driver or from a high level controller
697  *
698  * @param cls NULL
699  * @param client identification of the client
700  * @param message the actual message
701  */
702 void
703 GST_handle_barrier_init (void *cls, struct GNUNET_SERVER_Client *client,
704                          const struct GNUNET_MessageHeader *message)
705 {
706   const struct GNUNET_TESTBED_BarrierInit *msg;
707   char *name;
708   struct Barrier *barrier;
709   struct Slave *slave;
710   struct WBarrier *wrapper;
711   struct GNUNET_HashCode hash;
712   size_t name_len;
713   unsigned int cnt;
714   uint16_t msize;
715
716   if (NULL == GST_context)
717   {
718     GNUNET_break_op (0);
719     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
720     return;
721   }
722   if (client != GST_context->client)
723   {
724     GNUNET_break_op (0);
725     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
726     return;
727   }
728   msize = ntohs (message->size);
729   if (msize <= sizeof (struct GNUNET_TESTBED_BarrierInit))
730   {
731     GNUNET_break_op (0);
732     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
733     return;
734   }
735   msg = (const struct GNUNET_TESTBED_BarrierInit *) message;
736   name_len = (size_t) msize - sizeof (struct GNUNET_TESTBED_BarrierInit);
737   name = GNUNET_malloc (name_len + 1);
738   (void) memcpy (name, msg->name, name_len);
739   GNUNET_CRYPTO_hash (name, name_len, &hash);
740   LOG_DEBUG ("Received BARRIER_INIT for barrier `%s'\n", name);
741   if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (barrier_map, &hash))
742   {
743
744     send_client_status_msg (client, name, GNUNET_TESTBED_BARRIERSTATUS_ERROR,
745                             "A barrier with the same name already exists");
746     GNUNET_free (name);
747     GNUNET_SERVER_receive_done (client, GNUNET_OK);
748     return;
749   }
750   barrier = GNUNET_new (struct Barrier);
751   (void) memcpy (&barrier->hash, &hash, sizeof (struct GNUNET_HashCode));
752   barrier->quorum = msg->quorum;
753   barrier->name = name;
754   barrier->mc = client;
755   GNUNET_SERVER_client_keep (client);
756   GNUNET_assert (GNUNET_OK ==
757                  GNUNET_CONTAINER_multihashmap_put (barrier_map,
758                                                     &barrier->hash,
759                                                     barrier,
760                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST));
761   GNUNET_SERVER_receive_done (client, GNUNET_OK);
762   /* Propagate barrier init to subcontrollers */
763   for (cnt = 0; cnt < GST_slave_list_size; cnt++)
764   {
765     if (NULL == (slave = GST_slave_list[cnt]))
766       continue;
767     if (NULL == slave->controller)
768     {
769       GNUNET_break (0);/* May happen when we are connecting to the controller */
770       continue;
771     }
772     wrapper = GNUNET_new (struct WBarrier);
773     wrapper->barrier = barrier;
774     GNUNET_CONTAINER_DLL_insert_tail (barrier->whead, barrier->wtail, wrapper);
775     wrapper->hbarrier = GNUNET_TESTBED_barrier_init_ (slave->controller,
776                                                       barrier->name,
777                                                       barrier->quorum,
778                                                       &wbarrier_status_cb,
779                                                       wrapper,
780                                                       GNUNET_NO);
781   }
782   if (NULL == barrier->whead)   /* No further propagation */
783   {
784     barrier->status = GNUNET_TESTBED_BARRIERSTATUS_INITIALISED;
785     LOG_DEBUG ("Sending GNUNET_TESTBED_BARRIERSTATUS_INITIALISED for barrier `%s'\n",
786                barrier->name);
787     send_barrier_status_msg (barrier, NULL);
788   }else
789     barrier->tout_task = GNUNET_SCHEDULER_add_delayed (MESSAGE_SEND_TIMEOUT (30),
790                                                        &fwd_tout_barrier_init,
791                                                        barrier);
792 }
793
794
795 /**
796  * Message handler for GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_CANCEL messages.  This
797  * message should always come from a parent controller or the testbed API if we
798  * are the root controller.
799  *
800  * This handler is queued in the main service and will handle the messages sent
801  * either from the testbed driver or from a high level controller
802  *
803  * @param cls NULL
804  * @param client identification of the client
805  * @param message the actual message
806  */
807 void
808 GST_handle_barrier_cancel (void *cls, struct GNUNET_SERVER_Client *client,
809                            const struct GNUNET_MessageHeader *message)
810 {
811   const struct GNUNET_TESTBED_BarrierCancel *msg;
812   char *name;
813   struct Barrier *barrier;
814   struct GNUNET_HashCode hash;
815   size_t name_len;
816   uint16_t msize;
817
818   if (NULL == GST_context)
819   {
820     GNUNET_break_op (0);
821     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
822     return;
823   }
824   if (client != GST_context->client)
825   {
826     GNUNET_break_op (0);
827     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
828     return;
829   }
830   msize = ntohs (message->size);
831   if (msize <= sizeof (struct GNUNET_TESTBED_BarrierCancel))
832   {
833     GNUNET_break_op (0);
834     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
835     return;
836   }
837   msg = (const struct GNUNET_TESTBED_BarrierCancel *) message;
838   name_len = msize - sizeof (struct GNUNET_TESTBED_BarrierCancel);
839   name = GNUNET_malloc (name_len + 1);
840   (void) memcpy (name, msg->name, name_len);
841   GNUNET_CRYPTO_hash (name, name_len, &hash);
842   if (GNUNET_NO == GNUNET_CONTAINER_multihashmap_contains (barrier_map, &hash))
843   {
844     GNUNET_break_op (0);
845     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
846     return;
847   }
848   barrier = GNUNET_CONTAINER_multihashmap_get (barrier_map, &hash);
849   GNUNET_assert (NULL != barrier);
850   cancel_wrappers (barrier);
851   remove_barrier (barrier);
852   GNUNET_SERVER_receive_done (client, GNUNET_OK);
853 }
854
855
856 /**
857  * Message handler for GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_STATUS messages.
858  * This handler is queued in the main service and will handle the messages sent
859  * either from the testbed driver or from a high level controller
860  *
861  * @param cls NULL
862  * @param client identification of the client
863  * @param message the actual message
864  */
865 void
866 GST_handle_barrier_status (void *cls, struct GNUNET_SERVER_Client *client,
867                            const struct GNUNET_MessageHeader *message)
868 {
869   const struct GNUNET_TESTBED_BarrierStatusMsg *msg;
870   struct Barrier *barrier;
871   struct ClientCtx *client_ctx;
872   const char *name;
873   struct GNUNET_HashCode key;
874   enum GNUNET_TESTBED_BarrierStatus status;
875   uint16_t msize;
876   uint16_t name_len;
877
878   if (NULL == GST_context)
879   {
880     GNUNET_break_op (0);
881     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
882     return;
883   }
884   if (client != GST_context->client)
885   {
886     GNUNET_break_op (0);
887     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
888     return;
889   }
890   msize = ntohs (message->size);
891   if (msize <= sizeof (struct GNUNET_TESTBED_BarrierStatusMsg))
892   {
893     GNUNET_break_op (0);
894     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
895     return;
896   }
897   msg = (const struct GNUNET_TESTBED_BarrierStatusMsg *) message;
898   status = ntohs (msg->status);
899   if (GNUNET_TESTBED_BARRIERSTATUS_CROSSED != status)
900   {
901     GNUNET_break_op (0);        /* current we only expect BARRIER_CROSSED
902                                    status message this way */
903     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
904     return;
905   }
906   name = msg->data;
907   name_len = ntohs (msg->name_len);
908   if ((sizeof (struct GNUNET_TESTBED_BarrierStatusMsg) + name_len + 1) != msize)
909   {
910     GNUNET_break_op (0);
911     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
912     return;
913   }
914   if ('\0' != name[name_len])
915   {
916     GNUNET_break_op (0);
917     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
918     return;
919   }
920   GNUNET_CRYPTO_hash (name, name_len, &key);
921   barrier = GNUNET_CONTAINER_multihashmap_get (barrier_map, &key);
922   if (NULL == barrier)
923   {
924     GNUNET_break_op (0);
925     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
926     return;
927   }
928   GNUNET_SERVER_receive_done (client, GNUNET_OK);
929   while (NULL != (client_ctx = barrier->head)) /* Notify peers */
930   {
931     queue_message (client_ctx, GNUNET_copy_message (message));
932     GNUNET_CONTAINER_DLL_remove (barrier->head, barrier->tail, client_ctx);
933   }
934 }
935
936 /* end of gnunet-service-testbed_barriers.c */