693396ef6e42170b30b406cc150f391f74ec68a1
[oweals/gnunet.git] / src / testbed / gnunet-service-testbed_barriers.c
1 /*
2   This file is part of GNUnet.
3   (C) 2008--2013 Christian Grothoff (and other contributing authors)
4
5   GNUnet is free software; you can redistribute it and/or modify
6   it under the terms of the GNU General Public License as published
7   by the Free Software Foundation; either version 3, or (at your
8   option) any later version.
9
10   GNUnet is distributed in the hope that it will be useful, but
11   WITHOUT ANY WARRANTY; without even the implied warranty of
12   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13   General Public License for more details.
14
15   You should have received a copy of the GNU General Public License
16   along with GNUnet; see the file COPYING.  If not, write to the
17   Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18   Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file testbed/gnunet-service-testbed_barriers.c
23  * @brief barrier handling at the testbed controller
24  * @author Sree Harsha Totakura <sreeharsha@totakura.in> 
25  */
26
27 #include "gnunet-service-testbed.h"
28 #include "gnunet-service-testbed_barriers.h"
29
30
31 /**
32  * timeout for outgoing message transmissions in seconds
33  */
34 #define MESSAGE_SEND_TIMEOUT(s) \
35   GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, s)
36
37
38 /**
39  * Test to see if local peers have reached the required quorum of a barrier
40  */
41 #define LOCAL_QUORUM_REACHED(barrier)           \
42   ((barrier->quorum * GST_num_local_peers) <= (barrier->nreached * 100))
43
44
45 #ifdef LOG
46 #undef LOG
47 #endif
48
49 /**
50  * Logging shorthand
51  */
52 #define LOG(kind,...)                                           \
53   GNUNET_log_from (kind, "testbed-barriers", __VA_ARGS__)
54
55
56 /**
57  * Barrier
58  */
59 struct Barrier;
60
61
62 /**
63  * Message queue for transmitting messages
64  */
65 struct MessageQueue
66 {
67   /**
68    * next pointer for DLL
69    */
70   struct MessageQueue *next;
71
72   /**
73    * prev pointer for DLL
74    */
75   struct MessageQueue *prev;
76
77   /**
78    * The message to be sent
79    */
80   struct GNUNET_MessageHeader *msg;
81 };
82
83
84 /**
85  * Context to be associated with each client
86  */
87 struct ClientCtx
88 {
89   /**
90    * The barrier this client is waiting for
91    */
92   struct Barrier *barrier;
93
94   /**
95    * DLL next ptr
96    */
97   struct ClientCtx *next;
98
99   /**
100    * DLL prev ptr
101    */
102   struct ClientCtx *prev;
103
104   /**
105    * The client handle
106    */
107   struct GNUNET_SERVER_Client *client;
108
109   /**
110    * the transmission handle
111    */
112   struct GNUNET_SERVER_TransmitHandle *tx;
113
114   /**
115    * message queue head
116    */
117   struct MessageQueue *mq_head;
118
119   /**
120    * message queue tail
121    */
122   struct MessageQueue *mq_tail;
123 };
124
125
126 /**
127  * Wrapper around Barrier handle
128  */
129 struct WBarrier
130 {
131   /**
132    * DLL next pointer
133    */
134   struct WBarrier *next;
135
136   /**
137    * DLL prev pointer
138    */
139   struct WBarrier *prev;
140
141   /**
142    * The local barrier associated with the creation of this wrapper
143    */
144   struct Barrier *barrier;
145
146   /**
147    * The barrier handle from API
148    */
149   struct GNUNET_TESTBED_Barrier *hbarrier;
150
151   /**
152    * Has this barrier been crossed?
153    */
154   uint8_t reached;
155 };
156
157
158 /**
159  * Barrier
160  */
161 struct Barrier
162 {
163   /**
164    * The hashcode of the barrier name
165    */
166   struct GNUNET_HashCode hash;
167
168   /**
169    * The client handle to the master controller
170    */
171   struct GNUNET_SERVER_Client *client;
172
173   /**
174    * The name of the barrier
175    */
176   char *name;
177
178   /**
179    * DLL head for the list of clients waiting for this barrier
180    */
181   struct ClientCtx *head;
182
183   /**
184    * DLL tail for the list of clients waiting for this barrier
185    */
186   struct ClientCtx *tail;
187
188   /**
189    * DLL head for the list of barrier handles
190    */
191   struct WBarrier *whead;
192
193   /**
194    * DLL tail for the list of barrier handles
195    */
196   struct WBarrier *wtail;
197
198   /**
199    * Identifier for the timeout task
200    */
201   GNUNET_SCHEDULER_TaskIdentifier tout_task;
202   
203   /**
204    * The status of this barrier
205    */
206   enum GNUNET_TESTBED_BarrierStatus status;
207   
208   /**
209    * Number of barriers wrapped in the above DLL
210    */
211   unsigned int num_wbarriers;
212
213   /**
214    * Number of wrapped barriers reached so far
215    */
216   unsigned int num_wbarriers_reached;
217
218   /**
219    * Number of wrapped barrier initialised so far
220    */
221   unsigned int num_wbarriers_inited;
222
223   /**
224    * Number of peers which have reached this barrier
225    */
226   unsigned int nreached;
227
228   /**
229    * Number of slaves we have initialised this barrier
230    */
231   unsigned int nslaves;
232
233   /**
234    * Quorum percentage to be reached
235    */
236   uint8_t quorum;
237   
238 };
239
240
241 /**
242  * Hashtable handle for storing initialised barriers
243  */
244 static struct GNUNET_CONTAINER_MultiHashMap *barrier_map;
245
246 /**
247  * Service context
248  */
249 static struct GNUNET_SERVICE_Context *ctx;
250
251
252 /**
253  * Function called to notify a client about the connection
254  * begin ready to queue more data.  "buf" will be
255  * NULL and "size" zero if the connection was closed for
256  * writing in the meantime.
257  *
258  * @param cls client context
259  * @param size number of bytes available in buf
260  * @param buf where the callee should write the message
261  * @return number of bytes written to buf
262  */
263 static size_t 
264 transmit_ready_cb (void *cls, size_t size, void *buf)
265 {
266   struct ClientCtx *ctx = cls;
267   struct GNUNET_SERVER_Client *client = ctx->client;
268   struct MessageQueue *mq;
269   struct GNUNET_MessageHeader *msg;
270   size_t wrote;
271
272   ctx->tx = NULL;
273   wrote = 0;
274   if ((0 == size) || (NULL == buf))
275   {
276     GNUNET_assert (NULL != ctx->client);
277     GNUNET_SERVER_client_drop (ctx->client);
278     ctx->client = NULL;    
279     return 0;
280   }
281   mq = ctx->mq_head;
282   msg = mq->msg;
283   wrote = ntohs (msg->size);
284   GNUNET_assert (size >= wrote);
285   (void) memcpy (buf, msg, wrote);
286   GNUNET_CONTAINER_DLL_remove (ctx->mq_head, ctx->mq_tail, mq);
287   GNUNET_free (mq->msg);
288   GNUNET_free (mq);
289   if (NULL != (mq = ctx->mq_head))
290     ctx->tx = GNUNET_SERVER_notify_transmit_ready (client, ntohs (msg->size),
291                                                   MESSAGE_SEND_TIMEOUT (30),
292                                                   &transmit_ready_cb, ctx);
293   return wrote;
294 }
295
296
297 /**
298  * Queue a message into a clients message queue
299  *
300  * @param ctx the context associated with the client
301  * @param msg the message to queue.  Will be consumed
302  */
303 static void
304 queue_message (struct ClientCtx *ctx, struct GNUNET_MessageHeader *msg)
305 {
306   struct MessageQueue *mq;
307   struct GNUNET_SERVER_Client *client = ctx->client;
308   
309   mq = GNUNET_malloc (sizeof (struct MessageQueue));
310   mq->msg = msg;
311   GNUNET_CONTAINER_DLL_insert_tail (ctx->mq_head, ctx->mq_tail, mq);
312   if (NULL == ctx->tx)
313    ctx->tx = GNUNET_SERVER_notify_transmit_ready (client, ntohs (msg->size),
314                                                   MESSAGE_SEND_TIMEOUT (30),
315                                                   &transmit_ready_cb, ctx);
316 }
317
318
319 /**
320  * Function to cleanup client context data structure
321  *
322  * @param ctx the client context data structure
323  */
324 static void
325 cleanup_clientctx (struct ClientCtx *ctx)
326 {
327   struct MessageQueue *mq;
328   
329   GNUNET_SERVER_client_drop (ctx->client);
330   if (NULL != ctx->tx)
331     GNUNET_SERVER_notify_transmit_ready_cancel (ctx->tx);
332   if (NULL != (mq = ctx->mq_head))
333   {
334     GNUNET_CONTAINER_DLL_remove (ctx->mq_head, ctx->mq_tail, mq);
335     GNUNET_free (mq->msg);
336     GNUNET_free (mq);
337   }
338   GNUNET_free (ctx);
339 }
340
341
342 /**
343  * Function to remove a barrier from the barrier map and cleanup resources
344  * occupied by a barrier
345  *
346  * @param barrier the barrier handle
347  */
348 static void
349 remove_barrier (struct Barrier *barrier)
350 {
351   struct ClientCtx *ctx;
352   
353   GNUNET_assert (GNUNET_YES == GNUNET_CONTAINER_multihashmap_remove (barrier_map,
354                                                                     &barrier->hash,
355                                                                     barrier));
356   while (NULL != (ctx = barrier->head))
357   {
358     GNUNET_CONTAINER_DLL_remove (barrier->head, barrier->tail, ctx);
359     cleanup_clientctx (ctx);
360   }
361   GNUNET_free (barrier->name);
362   GNUNET_SERVER_client_drop (barrier->client);
363   GNUNET_free (barrier);
364 }
365
366
367 /**
368  * Cancels all subcontroller barrier handles
369  *
370  * @param barrier the local barrier
371  */
372 static void
373 cancel_wrappers (struct Barrier *barrier)
374 {
375   struct WBarrier *wrapper;
376
377   while (NULL != (wrapper = barrier->whead))
378   {
379     GNUNET_TESTBED_barrier_cancel (wrapper->hbarrier);
380     GNUNET_CONTAINER_DLL_remove (barrier->whead, barrier->wtail, wrapper);
381     GNUNET_free (wrapper);
382   }
383 }
384
385
386 /**
387  * Send a status message about a barrier to the given client
388  *
389  * @param client the client to send the message to
390  * @param name the barrier name
391  * @param status the status of the barrier
392  * @param emsg the error message; should be non-NULL for
393  *   status=BARRIER_STATUS_ERROR 
394  */
395 static void
396 send_client_status_msg (struct GNUNET_SERVER_Client *client,
397                         const char *name,
398                         enum GNUNET_TESTBED_BarrierStatus status,
399                         const char *emsg)
400 {
401   struct GNUNET_TESTBED_BarrierStatusMsg *msg;
402   size_t name_len;
403   uint16_t msize;
404
405   GNUNET_assert ((NULL == emsg) || (BARRIER_STATUS_ERROR == status));
406   name_len = strlen (name);
407   msize = sizeof (struct GNUNET_TESTBED_BarrierStatusMsg)
408       + (name_len + 1)
409       + ((NULL == emsg) ? 0 : (strlen (emsg) + 1));
410   msg = GNUNET_malloc (msize);
411   msg->header.size = htons (msize);
412   msg->header.type = htons (GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_STATUS);
413   msg->status = htons (status);
414   msg->name_len = htons ((uint16_t) name_len);
415   (void) memcpy (msg->data, name, name_len);
416   if (NULL != emsg)
417     (void) memcpy (msg->data + name_len + 1, emsg, strlen (emsg));
418   GST_queue_message (client, &msg->header);
419 }
420
421
422 /**
423  * Sends a barrier failed message
424  *
425  * @param barrier the corresponding barrier
426  * @param emsg the error message; should be non-NULL for
427  *   status=BARRIER_STATUS_ERROR 
428  */
429 static void
430 send_barrier_status_msg (struct Barrier *barrier, const char *emsg)
431 {
432   GNUNET_assert (0 != barrier->status);
433   send_client_status_msg (barrier->client, barrier->name,
434                           barrier->status, emsg);
435 }
436
437
438
439 /**
440  * Task for sending barrier crossed notifications to waiting client
441  *
442  * @param cls the barrier which is crossed
443  * @param tc scheduler task context
444  */
445 static void
446 notify_task_cb (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
447 {
448   struct Barrier *barrier = cls;
449   struct ClientCtx *client_ctx;
450   struct GNUNET_TESTBED_BarrierStatusMsg *msg;
451   struct GNUNET_MessageHeader *dup_msg;
452   uint16_t name_len;
453   uint16_t msize;
454
455   name_len = strlen (barrier->name) + 1;
456   msize = sizeof (struct GNUNET_TESTBED_BarrierStatusMsg) + name_len;  
457   msg = GNUNET_malloc (msize);
458   msg->header.size = htons (msize);
459   msg->header.type = htons (GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_STATUS);
460   msg->status = htons (BARRIER_STATUS_CROSSED);
461   msg->name_len = htons (name_len);
462   (void) memcpy (msg->data, barrier->name, name_len);
463   msg->data[name_len] = '\0';
464   while (NULL != (client_ctx = barrier->head))
465   {
466     dup_msg = GNUNET_copy_message (&msg->header);
467     queue_message (client_ctx, dup_msg);
468     GNUNET_CONTAINER_DLL_remove (barrier->head, barrier->tail, client_ctx);
469   }
470 }
471
472
473 /**
474  * Message handler for GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_WAIT messages.  This
475  * message should come from peers or a shared helper service using the
476  * testbed-barrier client API (@see gnunet_testbed_barrier_service.h)
477  *
478  * This handler is queued in the main service and will handle the messages sent
479  * either from the testbed driver or from a high level controller
480  *
481  * @param cls NULL
482  * @param client identification of the client
483  * @param message the actual message
484  */
485 static void
486 handle_barrier_wait (void *cls, struct GNUNET_SERVER_Client *client,
487                      const struct GNUNET_MessageHeader *message)
488 {
489   const struct GNUNET_TESTBED_BarrierWait *msg;
490   struct Barrier *barrier;
491   char *name;
492   struct ClientCtx *client_ctx;
493   struct GNUNET_HashCode key;
494   size_t name_len;
495   uint16_t msize;
496   
497   msize = ntohs (message->size);
498   if (msize <= sizeof (struct GNUNET_TESTBED_BarrierWait))
499   {
500     GNUNET_break_op (0);
501     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
502     return;
503   }
504   if (NULL == barrier_map)
505   {
506     GNUNET_break (0);
507     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
508     return;
509   }
510   msg = (const struct GNUNET_TESTBED_BarrierWait *) message;
511   name_len = msize - sizeof (struct GNUNET_TESTBED_BarrierWait);
512   name = GNUNET_malloc (name_len + 1);
513   name[name_len] = '\0';
514   (void) memcpy (name, msg->name, name_len);
515   LOG_DEBUG ("Received BARRIER_WAIT for barrier `%s'\n", name);
516   GNUNET_CRYPTO_hash (name, name_len, &key);
517   if (NULL == (barrier = GNUNET_CONTAINER_multihashmap_get (barrier_map, &key)))
518   {
519     GNUNET_break (0);
520     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
521     GNUNET_free (name);
522     return;
523   }
524   client_ctx = GNUNET_SERVER_client_get_user_context (client, struct ClientCtx);
525   if (NULL == client_ctx)
526   {
527     client_ctx = GNUNET_malloc (sizeof (struct ClientCtx));
528     client_ctx->client = client;
529     GNUNET_SERVER_client_keep (client);
530     client_ctx->barrier = barrier;
531     GNUNET_CONTAINER_DLL_insert_tail (barrier->head, barrier->tail, client_ctx);
532   }
533   barrier->nreached++;
534   if ((barrier->num_wbarriers_reached == barrier->num_wbarriers)
535         && (LOCAL_QUORUM_REACHED (barrier)))
536   {
537     barrier->status = BARRIER_STATUS_CROSSED;
538     send_barrier_status_msg (barrier, NULL);
539     notify_task_cb (barrier, NULL);
540   }
541   GNUNET_SERVER_receive_done (client, GNUNET_OK);
542 }
543
544
545 /**
546  * Functions with this signature are called whenever a client
547  * is disconnected on the network level.
548  *
549  * @param cls closure
550  * @param client identification of the client; NULL
551  *        for the last call when the server is destroyed
552  */
553 static void
554 disconnect_cb (void *cls, struct GNUNET_SERVER_Client *client)
555 {
556   struct ClientCtx *client_ctx;
557   
558   if (NULL == client)
559     return;
560   client_ctx = GNUNET_SERVER_client_get_user_context (client, struct ClientCtx);
561   if (NULL == client_ctx)
562     return;                     /* We only set user context for locally
563                                    connected clients */
564   cleanup_clientctx (client_ctx);
565 }
566
567
568 /**
569  * Function to initialise barrriers component
570  *
571  * @param cfg the configuration to use for initialisation
572  */
573 void
574 GST_barriers_init (struct GNUNET_CONFIGURATION_Handle *cfg)
575 {
576   static const struct GNUNET_SERVER_MessageHandler message_handlers[] = {
577     {&handle_barrier_wait, NULL, GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_WAIT, 0},
578     {NULL, NULL, 0, 0}
579   };
580   struct GNUNET_SERVER_Handle *srv;
581
582   barrier_map = GNUNET_CONTAINER_multihashmap_create (3, GNUNET_YES);
583   ctx = GNUNET_SERVICE_start ("testbed-barrier", cfg,
584                               GNUNET_SERVICE_OPTION_MANUAL_SHUTDOWN);
585   srv = GNUNET_SERVICE_get_server (ctx);
586   GNUNET_SERVER_add_handlers (srv, message_handlers);
587   GNUNET_SERVER_disconnect_notify (srv, &disconnect_cb, NULL);  
588 }
589
590
591 /**
592  * Function to stop the barrier service
593  */
594 void
595 GST_barriers_destroy ()
596 {
597   GNUNET_assert (NULL != barrier_map);
598   GNUNET_CONTAINER_multihashmap_destroy (barrier_map);
599   GNUNET_assert (NULL != ctx);
600   GNUNET_SERVICE_stop (ctx);
601 }
602
603
604 /**
605  * Functions of this type are to be given as callback argument to
606  * GNUNET_TESTBED_barrier_init().  The callback will be called when status
607  * information is available for the barrier.
608  *
609  * @param cls the closure given to GNUNET_TESTBED_barrier_init()
610  * @param name the name of the barrier
611  * @param b_ the barrier handle
612  * @param status status of the barrier; GNUNET_OK if the barrier is crossed;
613  *   GNUNET_SYSERR upon error
614  * @param emsg if the status were to be GNUNET_SYSERR, this parameter has the
615  *   error messsage
616  */
617 static void 
618 wbarrier_status_cb (void *cls, const char *name,
619                     struct GNUNET_TESTBED_Barrier *b_,
620                     enum GNUNET_TESTBED_BarrierStatus status,
621                     const char *emsg)
622 {
623   struct WBarrier *wrapper = cls;
624   struct Barrier *barrier = wrapper->barrier;
625
626   GNUNET_assert (b_ == wrapper->hbarrier);
627   wrapper->hbarrier = NULL;
628   GNUNET_CONTAINER_DLL_remove (barrier->whead, barrier->wtail, wrapper);
629   GNUNET_free (wrapper);
630   switch (status)
631   {
632   case BARRIER_STATUS_ERROR:
633     LOG (GNUNET_ERROR_TYPE_ERROR,
634          "Initialising barrier `%s' failed at a sub-controller: %s\n",
635          barrier->name, (NULL != emsg) ? emsg : "NULL");
636     cancel_wrappers (barrier);
637     if (NULL == emsg)
638       emsg = "Initialisation failed at a sub-controller";
639     barrier->status = BARRIER_STATUS_ERROR;
640     send_barrier_status_msg (barrier, emsg);
641     return;
642   case BARRIER_STATUS_CROSSED:
643     if (BARRIER_STATUS_INITIALISED != barrier->status)
644     {
645       GNUNET_break_op (0);
646       return;
647     }
648     barrier->num_wbarriers_reached++;
649     if ((barrier->num_wbarriers_reached == barrier->num_wbarriers)
650         && (LOCAL_QUORUM_REACHED (barrier)))
651     {
652       barrier->status = BARRIER_STATUS_CROSSED;
653       send_barrier_status_msg (barrier, NULL);
654     }
655     return;
656   case BARRIER_STATUS_INITIALISED:
657     if (0 != barrier->status)
658     {
659       GNUNET_break_op (0);
660       return;
661     }
662     barrier->num_wbarriers_inited++;
663     if (barrier->num_wbarriers_inited == barrier->num_wbarriers)
664     {
665       barrier->status = BARRIER_STATUS_INITIALISED;
666       send_barrier_status_msg (barrier, NULL);
667     }
668     return;
669   }
670 }
671
672
673 /**
674  * Function called upon timeout while waiting for a response from the
675  * subcontrollers to barrier init message
676  *
677  * @param cls barrier
678  * @param tc scheduler task context
679  */
680 static void
681 fwd_tout_barrier_init (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
682 {
683   struct Barrier *barrier = cls;
684   
685   cancel_wrappers (barrier);
686   barrier->status = BARRIER_STATUS_ERROR;
687   send_barrier_status_msg (barrier,
688                            "Timedout while propagating barrier initialisation\n");
689   remove_barrier (barrier);
690 }
691
692
693 /**
694  * Message handler for GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_INIT messages.  This
695  * message should always come from a parent controller or the testbed API if we
696  * are the root controller.
697  *
698  * This handler is queued in the main service and will handle the messages sent
699  * either from the testbed driver or from a high level controller
700  *
701  * @param cls NULL
702  * @param client identification of the client
703  * @param message the actual message
704  */
705 void
706 GST_handle_barrier_init (void *cls, struct GNUNET_SERVER_Client *client,
707                          const struct GNUNET_MessageHeader *message)
708 {
709   const struct GNUNET_TESTBED_BarrierInit *msg;
710   char *name;
711   struct Barrier *barrier;
712   struct Slave *slave;
713   struct WBarrier *wrapper;
714   struct GNUNET_HashCode hash;
715   size_t name_len;
716   unsigned int cnt;
717   uint16_t msize;
718   
719   if (NULL == GST_context)
720   {
721     GNUNET_break_op (0);
722     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
723     return;
724   }
725   if (client != GST_context->client)
726   {
727     GNUNET_break_op (0);
728     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
729     return;
730   }
731   msize = ntohs (message->size);
732   if (msize <= sizeof (struct GNUNET_TESTBED_BarrierInit))
733   {
734     GNUNET_break_op (0);
735     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
736     return;
737   }
738   msg = (const struct GNUNET_TESTBED_BarrierInit *) message;
739   name_len = (size_t) msize - sizeof (struct GNUNET_TESTBED_BarrierInit);
740   name = GNUNET_malloc (name_len + 1);
741   (void) memcpy (name, msg->name, name_len);
742   GNUNET_CRYPTO_hash (name, name_len, &hash);
743   LOG_DEBUG ("Received BARRIER_INIT for barrier `%s'\n", name);
744   if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (barrier_map, &hash))
745   {
746     
747     send_client_status_msg (client, name, BARRIER_STATUS_ERROR,
748                             "A barrier with the same name already exists");
749     GNUNET_free (name);
750     GNUNET_SERVER_receive_done (client, GNUNET_OK);
751     return;
752   }
753   barrier = GNUNET_malloc (sizeof (struct Barrier));
754   (void) memcpy (&barrier->hash, &hash, sizeof (struct GNUNET_HashCode));
755   barrier->quorum = msg->quorum;
756   barrier->name = name;
757   barrier->client = client;
758   GNUNET_SERVER_client_keep (client);
759   GNUNET_assert (GNUNET_OK ==
760                  GNUNET_CONTAINER_multihashmap_put (barrier_map,
761                                                     &barrier->hash,
762                                                     barrier,
763                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST));
764   GNUNET_SERVER_receive_done (client, GNUNET_OK);
765   /* Propagate barrier init to subcontrollers */
766   for (cnt = 0; cnt < GST_slave_list_size; cnt++)
767   {
768     if (NULL == (slave = GST_slave_list[cnt]))
769       continue;
770     if (NULL == slave->controller)
771     {
772       GNUNET_break (0);/* May happen when we are connecting to the controller */
773       continue;
774     }
775     wrapper = GNUNET_malloc (sizeof (struct WBarrier));
776     wrapper->barrier = barrier;
777     GNUNET_CONTAINER_DLL_insert_tail (barrier->whead, barrier->wtail, wrapper);
778     wrapper->hbarrier = GNUNET_TESTBED_barrier_init (slave->controller,
779                                                      barrier->name,
780                                                      barrier->quorum,
781                                                      &wbarrier_status_cb,
782                                                      wrapper);    
783   }
784   if (NULL == barrier->whead)   /* No further propagation */
785   {
786     barrier->status = BARRIER_STATUS_INITIALISED;
787     LOG_DEBUG ("Sending BARRIER_STATUS_INITIALISED for barrier `%s'\n",
788                barrier->name);
789     send_barrier_status_msg (barrier, NULL);
790   }else
791     barrier->tout_task = GNUNET_SCHEDULER_add_delayed (MESSAGE_SEND_TIMEOUT (30),
792                                                        &fwd_tout_barrier_init,
793                                                        barrier);
794 }
795
796
797 /**
798  * Message handler for GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_CANCEL messages.  This
799  * message should always come from a parent controller or the testbed API if we
800  * are the root controller.
801  *
802  * This handler is queued in the main service and will handle the messages sent
803  * either from the testbed driver or from a high level controller
804  *
805  * @param cls NULL
806  * @param client identification of the client
807  * @param message the actual message
808  */
809 void
810 GST_handle_barrier_cancel (void *cls, struct GNUNET_SERVER_Client *client,
811                            const struct GNUNET_MessageHeader *message)
812 {
813   const struct GNUNET_TESTBED_BarrierCancel *msg;
814   char *name;
815   struct Barrier *barrier;
816   struct GNUNET_HashCode hash;
817   size_t name_len;
818   uint16_t msize;
819
820   if (NULL == GST_context)
821   {
822     GNUNET_break_op (0);
823     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
824     return;
825   }  
826   if (client != GST_context->client)
827   {
828     GNUNET_break_op (0);
829     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
830     return;
831   }
832   msize = ntohs (message->size);
833   if (msize <= sizeof (struct GNUNET_TESTBED_BarrierCancel))
834   {
835     GNUNET_break_op (0);
836     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
837     return;
838   }
839   msg = (const struct GNUNET_TESTBED_BarrierCancel *) message;
840   name_len = msize - sizeof (struct GNUNET_TESTBED_BarrierCancel);
841   name = GNUNET_malloc (name_len + 1);
842   (void) memcpy (name, msg->name, name_len);
843   GNUNET_CRYPTO_hash (name, name_len, &hash);
844   if (GNUNET_NO == GNUNET_CONTAINER_multihashmap_contains (barrier_map, &hash))
845   {
846     GNUNET_break_op (0);
847     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
848     return;
849   }
850   barrier = GNUNET_CONTAINER_multihashmap_get (barrier_map, &hash);
851   GNUNET_assert (NULL != barrier);
852   cancel_wrappers (barrier);
853   remove_barrier (barrier);
854   GNUNET_SERVER_receive_done (client, GNUNET_OK);  
855 }
856
857 /* end of gnunet-service-testbed_barriers.c */