-rps: merge duplicate functions
[oweals/gnunet.git] / src / testbed / gnunet-service-testbed_barriers.c
1 /*
2   This file is part of GNUnet.
3   Copyright (C) 2008--2013 GNUnet e.V.
4
5   GNUnet is free software; you can redistribute it and/or modify
6   it under the terms of the GNU General Public License as published
7   by the Free Software Foundation; either version 3, or (at your
8   option) any later version.
9
10   GNUnet is distributed in the hope that it will be useful, but
11   WITHOUT ANY WARRANTY; without even the implied warranty of
12   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13   General Public License for more details.
14
15   You should have received a copy of the GNU General Public License
16   along with GNUnet; see the file COPYING.  If not, write to the
17   Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18   Boston, MA 02110-1301, USA.
19 */
20
21 /**
22  * @file testbed/gnunet-service-testbed_barriers.c
23  * @brief barrier handling at the testbed controller
24  * @author Sree Harsha Totakura <sreeharsha@totakura.in>
25  */
26
27 #include "gnunet-service-testbed.h"
28 #include "gnunet-service-testbed_barriers.h"
29 #include "testbed_api_barriers.h"
30
31
32 /**
33  * timeout for outgoing message transmissions in seconds
34  */
35 #define MESSAGE_SEND_TIMEOUT(s) \
36   GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, s)
37
38
39 /**
40  * Test to see if local peers have reached the required quorum of a barrier
41  */
42 #define LOCAL_QUORUM_REACHED(barrier)           \
43   ((barrier->quorum * GST_num_local_peers) <= (barrier->nreached * 100))
44
45
46 #ifdef LOG
47 #undef LOG
48 #endif
49
50 /**
51  * Logging shorthand
52  */
53 #define LOG(kind,...)                                           \
54   GNUNET_log_from (kind, "testbed-barriers", __VA_ARGS__)
55
56
57 /**
58  * Barrier
59  */
60 struct Barrier;
61
62
63 /**
64  * Message queue for transmitting messages
65  */
66 struct MessageQueue
67 {
68   /**
69    * next pointer for DLL
70    */
71   struct MessageQueue *next;
72
73   /**
74    * prev pointer for DLL
75    */
76   struct MessageQueue *prev;
77
78   /**
79    * The message to be sent
80    */
81   struct GNUNET_MessageHeader *msg;
82 };
83
84
85 /**
86  * Context to be associated with each client
87  */
88 struct ClientCtx
89 {
90   /**
91    * The barrier this client is waiting for
92    */
93   struct Barrier *barrier;
94
95   /**
96    * DLL next ptr
97    */
98   struct ClientCtx *next;
99
100   /**
101    * DLL prev ptr
102    */
103   struct ClientCtx *prev;
104
105   /**
106    * The client handle
107    */
108   struct GNUNET_SERVER_Client *client;
109
110   /**
111    * the transmission handle
112    */
113   struct GNUNET_SERVER_TransmitHandle *tx;
114
115   /**
116    * message queue head
117    */
118   struct MessageQueue *mq_head;
119
120   /**
121    * message queue tail
122    */
123   struct MessageQueue *mq_tail;
124 };
125
126
127 /**
128  * Wrapper around Barrier handle
129  */
130 struct WBarrier
131 {
132   /**
133    * DLL next pointer
134    */
135   struct WBarrier *next;
136
137   /**
138    * DLL prev pointer
139    */
140   struct WBarrier *prev;
141
142   /**
143    * The local barrier associated with the creation of this wrapper
144    */
145   struct Barrier *barrier;
146
147   /**
148    * The barrier handle from API
149    */
150   struct GNUNET_TESTBED_Barrier *hbarrier;
151
152   /**
153    * Has this barrier been crossed?
154    */
155   uint8_t reached;
156 };
157
158
159 /**
160  * Barrier
161  */
162 struct Barrier
163 {
164   /**
165    * The hashcode of the barrier name
166    */
167   struct GNUNET_HashCode hash;
168
169   /**
170    * The client handle to the master controller
171    */
172   struct GNUNET_SERVER_Client *mc;
173
174   /**
175    * The name of the barrier
176    */
177   char *name;
178
179   /**
180    * DLL head for the list of clients waiting for this barrier
181    */
182   struct ClientCtx *head;
183
184   /**
185    * DLL tail for the list of clients waiting for this barrier
186    */
187   struct ClientCtx *tail;
188
189   /**
190    * DLL head for the list of barrier handles
191    */
192   struct WBarrier *whead;
193
194   /**
195    * DLL tail for the list of barrier handles
196    */
197   struct WBarrier *wtail;
198
199   /**
200    * Identifier for the timeout task
201    */
202   struct GNUNET_SCHEDULER_Task * tout_task;
203
204   /**
205    * The status of this barrier
206    */
207   enum GNUNET_TESTBED_BarrierStatus status;
208
209   /**
210    * Number of barriers wrapped in the above DLL
211    */
212   unsigned int num_wbarriers;
213
214   /**
215    * Number of wrapped barriers reached so far
216    */
217   unsigned int num_wbarriers_reached;
218
219   /**
220    * Number of wrapped barrier initialised so far
221    */
222   unsigned int num_wbarriers_inited;
223
224   /**
225    * Number of peers which have reached this barrier
226    */
227   unsigned int nreached;
228
229   /**
230    * Number of slaves we have initialised this barrier
231    */
232   unsigned int nslaves;
233
234   /**
235    * Quorum percentage to be reached
236    */
237   uint8_t quorum;
238
239 };
240
241
242 /**
243  * Hashtable handle for storing initialised barriers
244  */
245 static struct GNUNET_CONTAINER_MultiHashMap *barrier_map;
246
247 /**
248  * Service context
249  */
250 static struct GNUNET_SERVICE_Context *ctx;
251
252
253 /**
254  * Function called to notify a client about the connection
255  * begin ready to queue more data.  "buf" will be
256  * NULL and "size" zero if the connection was closed for
257  * writing in the meantime.
258  *
259  * @param cls client context
260  * @param size number of bytes available in buf
261  * @param buf where the callee should write the message
262  * @return number of bytes written to buf
263  */
264 static size_t
265 transmit_ready_cb (void *cls, size_t size, void *buf)
266 {
267   struct ClientCtx *ctx = cls;
268   struct GNUNET_SERVER_Client *client = ctx->client;
269   struct MessageQueue *mq;
270   struct GNUNET_MessageHeader *msg;
271   size_t wrote;
272
273   ctx->tx = NULL;
274   if ((0 == size) || (NULL == buf))
275   {
276     GNUNET_assert (NULL != ctx->client);
277     GNUNET_SERVER_client_drop (ctx->client);
278     ctx->client = NULL;
279     return 0;
280   }
281   mq = ctx->mq_head;
282   msg = mq->msg;
283   wrote = ntohs (msg->size);
284   GNUNET_assert (size >= wrote);
285   (void) memcpy (buf, msg, wrote);
286   GNUNET_CONTAINER_DLL_remove (ctx->mq_head, ctx->mq_tail, mq);
287   GNUNET_free (mq->msg);
288   GNUNET_free (mq);
289   if (NULL != (mq = ctx->mq_head))
290     ctx->tx = GNUNET_SERVER_notify_transmit_ready (client, ntohs (msg->size),
291                                                   MESSAGE_SEND_TIMEOUT (30),
292                                                   &transmit_ready_cb, ctx);
293   return wrote;
294 }
295
296
297 /**
298  * Queue a message into a clients message queue
299  *
300  * @param ctx the context associated with the client
301  * @param msg the message to queue.  Will be consumed
302  */
303 static void
304 queue_message (struct ClientCtx *ctx, struct GNUNET_MessageHeader *msg)
305 {
306   struct MessageQueue *mq;
307   struct GNUNET_SERVER_Client *client = ctx->client;
308
309   mq = GNUNET_new (struct MessageQueue);
310   mq->msg = msg;
311   LOG_DEBUG ("Queueing message of type %u, size %u for sending\n",
312              ntohs (msg->type), ntohs (msg->size));
313   GNUNET_CONTAINER_DLL_insert_tail (ctx->mq_head, ctx->mq_tail, mq);
314   if (NULL == ctx->tx)
315    ctx->tx = GNUNET_SERVER_notify_transmit_ready (client, ntohs (msg->size),
316                                                   MESSAGE_SEND_TIMEOUT (30),
317                                                   &transmit_ready_cb, ctx);
318 }
319
320
321 /**
322  * Function to cleanup client context data structure
323  *
324  * @param ctx the client context data structure
325  */
326 static void
327 cleanup_clientctx (struct ClientCtx *ctx)
328 {
329   struct MessageQueue *mq;
330
331   if (NULL != ctx->client)
332   {
333     GNUNET_SERVER_client_set_user_context_ (ctx->client, NULL, 0);
334     GNUNET_SERVER_client_drop (ctx->client);
335   }
336   if (NULL != ctx->tx)
337     GNUNET_SERVER_notify_transmit_ready_cancel (ctx->tx);
338   if (NULL != (mq = ctx->mq_head))
339   {
340     GNUNET_CONTAINER_DLL_remove (ctx->mq_head, ctx->mq_tail, mq);
341     GNUNET_free (mq->msg);
342     GNUNET_free (mq);
343   }
344   GNUNET_free (ctx);
345 }
346
347
348 /**
349  * Function to remove a barrier from the barrier map and cleanup resources
350  * occupied by a barrier
351  *
352  * @param barrier the barrier handle
353  */
354 static void
355 remove_barrier (struct Barrier *barrier)
356 {
357   struct ClientCtx *ctx;
358
359   GNUNET_assert (GNUNET_YES == GNUNET_CONTAINER_multihashmap_remove (barrier_map,
360                                                                     &barrier->hash,
361                                                                     barrier));
362   while (NULL != (ctx = barrier->head))
363   {
364     GNUNET_CONTAINER_DLL_remove (barrier->head, barrier->tail, ctx);
365     cleanup_clientctx (ctx);
366   }
367   GNUNET_free (barrier->name);
368   GNUNET_SERVER_client_drop (barrier->mc);
369   GNUNET_free (barrier);
370 }
371
372
373 /**
374  * Cancels all subcontroller barrier handles
375  *
376  * @param barrier the local barrier
377  */
378 static void
379 cancel_wrappers (struct Barrier *barrier)
380 {
381   struct WBarrier *wrapper;
382
383   while (NULL != (wrapper = barrier->whead))
384   {
385     GNUNET_TESTBED_barrier_cancel (wrapper->hbarrier);
386     GNUNET_CONTAINER_DLL_remove (barrier->whead, barrier->wtail, wrapper);
387     GNUNET_free (wrapper);
388   }
389 }
390
391
392 /**
393  * Send a status message about a barrier to the given client
394  *
395  * @param client the client to send the message to
396  * @param name the barrier name
397  * @param status the status of the barrier
398  * @param emsg the error message; should be non-NULL for
399  *   status=GNUNET_TESTBED_BARRIERSTATUS_ERROR
400  */
401 static void
402 send_client_status_msg (struct GNUNET_SERVER_Client *client,
403                         const char *name,
404                         enum GNUNET_TESTBED_BarrierStatus status,
405                         const char *emsg)
406 {
407   struct GNUNET_TESTBED_BarrierStatusMsg *msg;
408   size_t name_len;
409   uint16_t msize;
410
411   GNUNET_assert ((NULL == emsg) || (GNUNET_TESTBED_BARRIERSTATUS_ERROR == status));
412   name_len = strlen (name);
413   msize = sizeof (struct GNUNET_TESTBED_BarrierStatusMsg)
414       + (name_len + 1)
415       + ((NULL == emsg) ? 0 : (strlen (emsg) + 1));
416   msg = GNUNET_malloc (msize);
417   msg->header.size = htons (msize);
418   msg->header.type = htons (GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_STATUS);
419   msg->status = htons (status);
420   msg->name_len = htons ((uint16_t) name_len);
421   (void) memcpy (msg->data, name, name_len);
422   if (NULL != emsg)
423     (void) memcpy (msg->data + name_len + 1, emsg, strlen (emsg));
424   GST_queue_message (client, &msg->header);
425 }
426
427
428 /**
429  * Sends a barrier failed message
430  *
431  * @param barrier the corresponding barrier
432  * @param emsg the error message; should be non-NULL for
433  *   status=GNUNET_TESTBED_BARRIERSTATUS_ERROR
434  */
435 static void
436 send_barrier_status_msg (struct Barrier *barrier, const char *emsg)
437 {
438   GNUNET_assert (0 != barrier->status);
439   send_client_status_msg (barrier->mc, barrier->name, barrier->status, emsg);
440 }
441
442
443 /**
444  * Message handler for GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_WAIT messages.  This
445  * message should come from peers or a shared helper service using the
446  * testbed-barrier client API (@see gnunet_testbed_barrier_service.h)
447  *
448  * This handler is queued in the main service and will handle the messages sent
449  * either from the testbed driver or from a high level controller
450  *
451  * @param cls NULL
452  * @param client identification of the client
453  * @param message the actual message
454  */
455 static void
456 handle_barrier_wait (void *cls, struct GNUNET_SERVER_Client *client,
457                      const struct GNUNET_MessageHeader *message)
458 {
459   const struct GNUNET_TESTBED_BarrierWait *msg;
460   struct Barrier *barrier;
461   char *name;
462   struct ClientCtx *client_ctx;
463   struct GNUNET_HashCode key;
464   size_t name_len;
465   uint16_t msize;
466
467   msize = ntohs (message->size);
468   if (msize <= sizeof (struct GNUNET_TESTBED_BarrierWait))
469   {
470     GNUNET_break_op (0);
471     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
472     return;
473   }
474   if (NULL == barrier_map)
475   {
476     GNUNET_break (0);
477     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
478     return;
479   }
480   msg = (const struct GNUNET_TESTBED_BarrierWait *) message;
481   name_len = msize - sizeof (struct GNUNET_TESTBED_BarrierWait);
482   name = GNUNET_malloc (name_len + 1);
483   name[name_len] = '\0';
484   (void) memcpy (name, msg->name, name_len);
485   LOG_DEBUG ("Received BARRIER_WAIT for barrier `%s'\n", name);
486   GNUNET_CRYPTO_hash (name, name_len, &key);
487   GNUNET_free (name);
488   if (NULL == (barrier = GNUNET_CONTAINER_multihashmap_get (barrier_map, &key)))
489   {
490     GNUNET_break (0);
491     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
492     return;
493   }
494   client_ctx = GNUNET_SERVER_client_get_user_context (client, struct ClientCtx);
495   if (NULL == client_ctx)
496   {
497     client_ctx = GNUNET_new (struct ClientCtx);
498     client_ctx->client = client;
499     GNUNET_SERVER_client_keep (client);
500     client_ctx->barrier = barrier;
501     GNUNET_CONTAINER_DLL_insert_tail (barrier->head, barrier->tail, client_ctx);
502     GNUNET_SERVER_client_set_user_context (client, client_ctx);
503   }
504   barrier->nreached++;
505   if ((barrier->num_wbarriers_reached == barrier->num_wbarriers)
506         && (LOCAL_QUORUM_REACHED (barrier)))
507   {
508     barrier->status = GNUNET_TESTBED_BARRIERSTATUS_CROSSED;
509     send_barrier_status_msg (barrier, NULL);
510   }
511   GNUNET_SERVER_receive_done (client, GNUNET_OK);
512 }
513
514
515 /**
516  * Functions with this signature are called whenever a client
517  * is disconnected on the network level.
518  *
519  * @param cls closure
520  * @param client identification of the client; NULL
521  *        for the last call when the server is destroyed
522  */
523 static void
524 disconnect_cb (void *cls, struct GNUNET_SERVER_Client *client)
525 {
526   struct ClientCtx *client_ctx;
527
528   if (NULL == client)
529     return;
530   client_ctx = GNUNET_SERVER_client_get_user_context (client, struct ClientCtx);
531   if (NULL == client_ctx)
532     return;
533   cleanup_clientctx (client_ctx);
534 }
535
536
537 /**
538  * Function to initialise barrriers component
539  *
540  * @param cfg the configuration to use for initialisation
541  */
542 void
543 GST_barriers_init (struct GNUNET_CONFIGURATION_Handle *cfg)
544 {
545   static const struct GNUNET_SERVER_MessageHandler message_handlers[] = {
546     {&handle_barrier_wait, NULL, GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_WAIT, 0},
547     {NULL, NULL, 0, 0}
548   };
549   struct GNUNET_SERVER_Handle *srv;
550
551   barrier_map = GNUNET_CONTAINER_multihashmap_create (3, GNUNET_YES);
552   ctx = GNUNET_SERVICE_start ("testbed-barrier", cfg,
553                               GNUNET_SERVICE_OPTION_MANUAL_SHUTDOWN);
554   srv = GNUNET_SERVICE_get_server (ctx);
555   GNUNET_SERVER_add_handlers (srv, message_handlers);
556   GNUNET_SERVER_disconnect_notify (srv, &disconnect_cb, NULL);
557 }
558
559
560 /**
561  * Iterator over hash map entries.
562  *
563  * @param cls closure
564  * @param key current key code
565  * @param value value in the hash map
566  * @return #GNUNET_YES if we should continue to
567  *         iterate,
568  *         #GNUNET_NO if not.
569  */
570 static int
571 barrier_destroy_iterator (void *cls,
572                           const struct GNUNET_HashCode *key,
573                           void *value)
574 {
575   struct Barrier *barrier = value;
576
577   GNUNET_assert (NULL != barrier);
578   cancel_wrappers (barrier);
579   remove_barrier (barrier);
580   return GNUNET_YES;
581 }
582
583
584 /**
585  * Function to stop the barrier service
586  */
587 void
588 GST_barriers_destroy ()
589 {
590   GNUNET_assert (NULL != barrier_map);
591   GNUNET_assert (GNUNET_SYSERR !=
592                  GNUNET_CONTAINER_multihashmap_iterate (barrier_map,
593                                                         &barrier_destroy_iterator,
594                                                         NULL));
595   GNUNET_CONTAINER_multihashmap_destroy (barrier_map);
596   GNUNET_assert (NULL != ctx);
597   GNUNET_SERVICE_stop (ctx);
598 }
599
600
601 /**
602  * Functions of this type are to be given as callback argument to
603  * GNUNET_TESTBED_barrier_init().  The callback will be called when status
604  * information is available for the barrier.
605  *
606  * @param cls the closure given to GNUNET_TESTBED_barrier_init()
607  * @param name the name of the barrier
608  * @param b_ the barrier handle
609  * @param status status of the barrier; GNUNET_OK if the barrier is crossed;
610  *   GNUNET_SYSERR upon error
611  * @param emsg if the status were to be GNUNET_SYSERR, this parameter has the
612  *   error messsage
613  */
614 static void
615 wbarrier_status_cb (void *cls, const char *name,
616                     struct GNUNET_TESTBED_Barrier *b_,
617                     enum GNUNET_TESTBED_BarrierStatus status,
618                     const char *emsg)
619 {
620   struct WBarrier *wrapper = cls;
621   struct Barrier *barrier = wrapper->barrier;
622
623   GNUNET_assert (b_ == wrapper->hbarrier);
624   wrapper->hbarrier = NULL;
625   GNUNET_CONTAINER_DLL_remove (barrier->whead, barrier->wtail, wrapper);
626   GNUNET_free (wrapper);
627   switch (status)
628   {
629   case GNUNET_TESTBED_BARRIERSTATUS_ERROR:
630     LOG (GNUNET_ERROR_TYPE_ERROR,
631          "Initialising barrier `%s' failed at a sub-controller: %s\n",
632          barrier->name, (NULL != emsg) ? emsg : "NULL");
633     cancel_wrappers (barrier);
634     if (NULL == emsg)
635       emsg = "Initialisation failed at a sub-controller";
636     barrier->status = GNUNET_TESTBED_BARRIERSTATUS_ERROR;
637     send_barrier_status_msg (barrier, emsg);
638     return;
639   case GNUNET_TESTBED_BARRIERSTATUS_CROSSED:
640     if (GNUNET_TESTBED_BARRIERSTATUS_INITIALISED != barrier->status)
641     {
642       GNUNET_break_op (0);
643       return;
644     }
645     barrier->num_wbarriers_reached++;
646     if ((barrier->num_wbarriers_reached == barrier->num_wbarriers)
647         && (LOCAL_QUORUM_REACHED (barrier)))
648     {
649       barrier->status = GNUNET_TESTBED_BARRIERSTATUS_CROSSED;
650       send_barrier_status_msg (barrier, NULL);
651     }
652     return;
653   case GNUNET_TESTBED_BARRIERSTATUS_INITIALISED:
654     if (0 != barrier->status)
655     {
656       GNUNET_break_op (0);
657       return;
658     }
659     barrier->num_wbarriers_inited++;
660     if (barrier->num_wbarriers_inited == barrier->num_wbarriers)
661     {
662       barrier->status = GNUNET_TESTBED_BARRIERSTATUS_INITIALISED;
663       send_barrier_status_msg (barrier, NULL);
664     }
665     return;
666   }
667 }
668
669
670 /**
671  * Function called upon timeout while waiting for a response from the
672  * subcontrollers to barrier init message
673  *
674  * @param cls barrier
675  */
676 static void
677 fwd_tout_barrier_init (void *cls)
678 {
679   struct Barrier *barrier = cls;
680
681   cancel_wrappers (barrier);
682   barrier->status = GNUNET_TESTBED_BARRIERSTATUS_ERROR;
683   send_barrier_status_msg (barrier,
684                            "Timedout while propagating barrier initialisation\n");
685   remove_barrier (barrier);
686 }
687
688
689 /**
690  * Message handler for GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_INIT messages.  This
691  * message should always come from a parent controller or the testbed API if we
692  * are the root controller.
693  *
694  * This handler is queued in the main service and will handle the messages sent
695  * either from the testbed driver or from a high level controller
696  *
697  * @param cls NULL
698  * @param client identification of the client
699  * @param message the actual message
700  */
701 void
702 GST_handle_barrier_init (void *cls, struct GNUNET_SERVER_Client *client,
703                          const struct GNUNET_MessageHeader *message)
704 {
705   const struct GNUNET_TESTBED_BarrierInit *msg;
706   char *name;
707   struct Barrier *barrier;
708   struct Slave *slave;
709   struct WBarrier *wrapper;
710   struct GNUNET_HashCode hash;
711   size_t name_len;
712   unsigned int cnt;
713   uint16_t msize;
714
715   if (NULL == GST_context)
716   {
717     GNUNET_break_op (0);
718     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
719     return;
720   }
721   if (client != GST_context->client)
722   {
723     GNUNET_break_op (0);
724     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
725     return;
726   }
727   msize = ntohs (message->size);
728   if (msize <= sizeof (struct GNUNET_TESTBED_BarrierInit))
729   {
730     GNUNET_break_op (0);
731     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
732     return;
733   }
734   msg = (const struct GNUNET_TESTBED_BarrierInit *) message;
735   name_len = (size_t) msize - sizeof (struct GNUNET_TESTBED_BarrierInit);
736   name = GNUNET_malloc (name_len + 1);
737   (void) memcpy (name, msg->name, name_len);
738   GNUNET_CRYPTO_hash (name, name_len, &hash);
739   LOG_DEBUG ("Received BARRIER_INIT for barrier `%s'\n", name);
740   if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (barrier_map, &hash))
741   {
742
743     send_client_status_msg (client, name, GNUNET_TESTBED_BARRIERSTATUS_ERROR,
744                             "A barrier with the same name already exists");
745     GNUNET_free (name);
746     GNUNET_SERVER_receive_done (client, GNUNET_OK);
747     return;
748   }
749   barrier = GNUNET_new (struct Barrier);
750   (void) memcpy (&barrier->hash, &hash, sizeof (struct GNUNET_HashCode));
751   barrier->quorum = msg->quorum;
752   barrier->name = name;
753   barrier->mc = client;
754   GNUNET_SERVER_client_keep (client);
755   GNUNET_assert (GNUNET_OK ==
756                  GNUNET_CONTAINER_multihashmap_put (barrier_map,
757                                                     &barrier->hash,
758                                                     barrier,
759                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST));
760   GNUNET_SERVER_receive_done (client, GNUNET_OK);
761   /* Propagate barrier init to subcontrollers */
762   for (cnt = 0; cnt < GST_slave_list_size; cnt++)
763   {
764     if (NULL == (slave = GST_slave_list[cnt]))
765       continue;
766     if (NULL == slave->controller)
767     {
768       GNUNET_break (0);/* May happen when we are connecting to the controller */
769       continue;
770     }
771     wrapper = GNUNET_new (struct WBarrier);
772     wrapper->barrier = barrier;
773     GNUNET_CONTAINER_DLL_insert_tail (barrier->whead, barrier->wtail, wrapper);
774     wrapper->hbarrier = GNUNET_TESTBED_barrier_init_ (slave->controller,
775                                                       barrier->name,
776                                                       barrier->quorum,
777                                                       &wbarrier_status_cb,
778                                                       wrapper,
779                                                       GNUNET_NO);
780   }
781   if (NULL == barrier->whead)   /* No further propagation */
782   {
783     barrier->status = GNUNET_TESTBED_BARRIERSTATUS_INITIALISED;
784     LOG_DEBUG ("Sending GNUNET_TESTBED_BARRIERSTATUS_INITIALISED for barrier `%s'\n",
785                barrier->name);
786     send_barrier_status_msg (barrier, NULL);
787   }else
788     barrier->tout_task = GNUNET_SCHEDULER_add_delayed (MESSAGE_SEND_TIMEOUT (30),
789                                                        &fwd_tout_barrier_init,
790                                                        barrier);
791 }
792
793
794 /**
795  * Message handler for GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_CANCEL messages.  This
796  * message should always come from a parent controller or the testbed API if we
797  * are the root controller.
798  *
799  * This handler is queued in the main service and will handle the messages sent
800  * either from the testbed driver or from a high level controller
801  *
802  * @param cls NULL
803  * @param client identification of the client
804  * @param message the actual message
805  */
806 void
807 GST_handle_barrier_cancel (void *cls, struct GNUNET_SERVER_Client *client,
808                            const struct GNUNET_MessageHeader *message)
809 {
810   const struct GNUNET_TESTBED_BarrierCancel *msg;
811   char *name;
812   struct Barrier *barrier;
813   struct GNUNET_HashCode hash;
814   size_t name_len;
815   uint16_t msize;
816
817   if (NULL == GST_context)
818   {
819     GNUNET_break_op (0);
820     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
821     return;
822   }
823   if (client != GST_context->client)
824   {
825     GNUNET_break_op (0);
826     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
827     return;
828   }
829   msize = ntohs (message->size);
830   if (msize <= sizeof (struct GNUNET_TESTBED_BarrierCancel))
831   {
832     GNUNET_break_op (0);
833     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
834     return;
835   }
836   msg = (const struct GNUNET_TESTBED_BarrierCancel *) message;
837   name_len = msize - sizeof (struct GNUNET_TESTBED_BarrierCancel);
838   name = GNUNET_malloc (name_len + 1);
839   (void) memcpy (name, msg->name, name_len);
840   GNUNET_CRYPTO_hash (name, name_len, &hash);
841   if (GNUNET_NO == GNUNET_CONTAINER_multihashmap_contains (barrier_map, &hash))
842   {
843     GNUNET_break_op (0);
844     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
845     return;
846   }
847   barrier = GNUNET_CONTAINER_multihashmap_get (barrier_map, &hash);
848   GNUNET_assert (NULL != barrier);
849   cancel_wrappers (barrier);
850   remove_barrier (barrier);
851   GNUNET_SERVER_receive_done (client, GNUNET_OK);
852 }
853
854
855 /**
856  * Message handler for GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_STATUS messages.
857  * This handler is queued in the main service and will handle the messages sent
858  * either from the testbed driver or from a high level controller
859  *
860  * @param cls NULL
861  * @param client identification of the client
862  * @param message the actual message
863  */
864 void
865 GST_handle_barrier_status (void *cls, struct GNUNET_SERVER_Client *client,
866                            const struct GNUNET_MessageHeader *message)
867 {
868   const struct GNUNET_TESTBED_BarrierStatusMsg *msg;
869   struct Barrier *barrier;
870   struct ClientCtx *client_ctx;
871   const char *name;
872   struct GNUNET_HashCode key;
873   enum GNUNET_TESTBED_BarrierStatus status;
874   uint16_t msize;
875   uint16_t name_len;
876
877   if (NULL == GST_context)
878   {
879     GNUNET_break_op (0);
880     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
881     return;
882   }
883   if (client != GST_context->client)
884   {
885     GNUNET_break_op (0);
886     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
887     return;
888   }
889   msize = ntohs (message->size);
890   if (msize <= sizeof (struct GNUNET_TESTBED_BarrierStatusMsg))
891   {
892     GNUNET_break_op (0);
893     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
894     return;
895   }
896   msg = (const struct GNUNET_TESTBED_BarrierStatusMsg *) message;
897   status = ntohs (msg->status);
898   if (GNUNET_TESTBED_BARRIERSTATUS_CROSSED != status)
899   {
900     GNUNET_break_op (0);        /* current we only expect BARRIER_CROSSED
901                                    status message this way */
902     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
903     return;
904   }
905   name = msg->data;
906   name_len = ntohs (msg->name_len);
907   if ((sizeof (struct GNUNET_TESTBED_BarrierStatusMsg) + name_len + 1) != msize)
908   {
909     GNUNET_break_op (0);
910     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
911     return;
912   }
913   if ('\0' != name[name_len])
914   {
915     GNUNET_break_op (0);
916     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
917     return;
918   }
919   GNUNET_CRYPTO_hash (name, name_len, &key);
920   barrier = GNUNET_CONTAINER_multihashmap_get (barrier_map, &key);
921   if (NULL == barrier)
922   {
923     GNUNET_break_op (0);
924     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
925     return;
926   }
927   GNUNET_SERVER_receive_done (client, GNUNET_OK);
928   while (NULL != (client_ctx = barrier->head)) /* Notify peers */
929   {
930     queue_message (client_ctx, GNUNET_copy_message (message));
931     GNUNET_CONTAINER_DLL_remove (barrier->head, barrier->tail, client_ctx);
932   }
933 }
934
935 /* end of gnunet-service-testbed_barriers.c */