Forget to commit some files
[oweals/gnunet.git] / src / testbed / gnunet-service-testbed_barriers.c
index 71fd8ebceaf63e7a99949bc7e208480de5e47389..4450ddf777c2d59afd01f22e993fdda364be51af 100644 (file)
@@ -1,6 +1,6 @@
 /*
   This file is part of GNUnet.
-  (C) 2008--2013 Christian Grothoff (and other contributing authors)
+  Copyright (C) 2008--2013 Christian Grothoff (and other contributing authors)
 
   GNUnet is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published
 
   You should have received a copy of the GNU General Public License
   along with GNUnet; see the file COPYING.  If not, write to the
-  Free Software Foundation, Inc., 59 Temple Place - Suite 330,
-  Boston, MA 02111-1307, USA.
+  Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
+  Boston, MA 02110-1301, USA.
 */
 
 /**
  * @file testbed/gnunet-service-testbed_barriers.c
  * @brief barrier handling at the testbed controller
- * @author Sree Harsha Totakura <sreeharsha@totakura.in> 
+ * @author Sree Harsha Totakura <sreeharsha@totakura.in>
  */
 
 #include "gnunet-service-testbed.h"
 #include "gnunet-service-testbed_barriers.h"
+#include "testbed_api_barriers.h"
 
 
 /**
   ((barrier->quorum * GST_num_local_peers) <= (barrier->nreached * 100))
 
 
+#ifdef LOG
+#undef LOG
+#endif
+
+/**
+ * Logging shorthand
+ */
+#define LOG(kind,...)                                           \
+  GNUNET_log_from (kind, "testbed-barriers", __VA_ARGS__)
+
+
 /**
  * Barrier
  */
@@ -157,7 +169,7 @@ struct Barrier
   /**
    * The client handle to the master controller
    */
-  struct GNUNET_SERVER_Client *client;
+  struct GNUNET_SERVER_Client *mc;
 
   /**
    * The name of the barrier
@@ -187,13 +199,13 @@ struct Barrier
   /**
    * Identifier for the timeout task
    */
-  GNUNET_SCHEDULER_TaskIdentifier tout_task;
-  
+  struct GNUNET_SCHEDULER_Task * tout_task;
+
   /**
    * The status of this barrier
    */
   enum GNUNET_TESTBED_BarrierStatus status;
-  
+
   /**
    * Number of barriers wrapped in the above DLL
    */
@@ -223,7 +235,7 @@ struct Barrier
    * Quorum percentage to be reached
    */
   uint8_t quorum;
-  
+
 };
 
 
@@ -249,7 +261,7 @@ static struct GNUNET_SERVICE_Context *ctx;
  * @param buf where the callee should write the message
  * @return number of bytes written to buf
  */
-static size_t 
+static size_t
 transmit_ready_cb (void *cls, size_t size, void *buf)
 {
   struct ClientCtx *ctx = cls;
@@ -259,12 +271,11 @@ transmit_ready_cb (void *cls, size_t size, void *buf)
   size_t wrote;
 
   ctx->tx = NULL;
-  wrote = 0;
   if ((0 == size) || (NULL == buf))
   {
     GNUNET_assert (NULL != ctx->client);
     GNUNET_SERVER_client_drop (ctx->client);
-    ctx->client = NULL;    
+    ctx->client = NULL;
     return 0;
   }
   mq = ctx->mq_head;
@@ -294,9 +305,11 @@ queue_message (struct ClientCtx *ctx, struct GNUNET_MessageHeader *msg)
 {
   struct MessageQueue *mq;
   struct GNUNET_SERVER_Client *client = ctx->client;
-  
-  mq = GNUNET_malloc (sizeof (struct MessageQueue));
+
+  mq = GNUNET_new (struct MessageQueue);
   mq->msg = msg;
+  LOG_DEBUG ("Queueing message of type %u, size %u for sending\n",
+             ntohs (msg->type), ntohs (msg->size));
   GNUNET_CONTAINER_DLL_insert_tail (ctx->mq_head, ctx->mq_tail, mq);
   if (NULL == ctx->tx)
    ctx->tx = GNUNET_SERVER_notify_transmit_ready (client, ntohs (msg->size),
@@ -314,8 +327,12 @@ static void
 cleanup_clientctx (struct ClientCtx *ctx)
 {
   struct MessageQueue *mq;
-  
-  GNUNET_SERVER_client_drop (ctx->client);
+
+  if (NULL != ctx->client)
+  {
+    GNUNET_SERVER_client_set_user_context_ (ctx->client, NULL, 0);
+    GNUNET_SERVER_client_drop (ctx->client);
+  }
   if (NULL != ctx->tx)
     GNUNET_SERVER_notify_transmit_ready_cancel (ctx->tx);
   if (NULL != (mq = ctx->mq_head))
@@ -338,7 +355,7 @@ static void
 remove_barrier (struct Barrier *barrier)
 {
   struct ClientCtx *ctx;
-  
+
   GNUNET_assert (GNUNET_YES == GNUNET_CONTAINER_multihashmap_remove (barrier_map,
                                                                     &barrier->hash,
                                                                     barrier));
@@ -348,7 +365,7 @@ remove_barrier (struct Barrier *barrier)
     cleanup_clientctx (ctx);
   }
   GNUNET_free (barrier->name);
-  GNUNET_SERVER_client_drop (barrier->client);
+  GNUNET_SERVER_client_drop (barrier->mc);
   GNUNET_free (barrier);
 }
 
@@ -379,7 +396,7 @@ cancel_wrappers (struct Barrier *barrier)
  * @param name the barrier name
  * @param status the status of the barrier
  * @param emsg the error message; should be non-NULL for
- *   status=BARRIER_STATUS_ERROR 
+ *   status=GNUNET_TESTBED_BARRIERSTATUS_ERROR
  */
 static void
 send_client_status_msg (struct GNUNET_SERVER_Client *client,
@@ -391,17 +408,19 @@ send_client_status_msg (struct GNUNET_SERVER_Client *client,
   size_t name_len;
   uint16_t msize;
 
-  GNUNET_assert ((NULL == emsg) || (BARRIER_STATUS_ERROR == status));
-  name_len = strlen (name) + 1;
+  GNUNET_assert ((NULL == emsg) || (GNUNET_TESTBED_BARRIERSTATUS_ERROR == status));
+  name_len = strlen (name);
   msize = sizeof (struct GNUNET_TESTBED_BarrierStatusMsg)
-      + name_len
-      + (NULL == emsg) ? 0 : strlen (emsg) + 1;
+      + (name_len + 1)
+      + ((NULL == emsg) ? 0 : (strlen (emsg) + 1));
   msg = GNUNET_malloc (msize);
+  msg->header.size = htons (msize);
+  msg->header.type = htons (GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_STATUS);
   msg->status = htons (status);
   msg->name_len = htons ((uint16_t) name_len);
   (void) memcpy (msg->data, name, name_len);
   if (NULL != emsg)
-    (void) memcpy (msg->data + name_len, emsg, strlen (emsg) + 1);
+    (void) memcpy (msg->data + name_len + 1, emsg, strlen (emsg));
   GST_queue_message (client, &msg->header);
 }
 
@@ -411,51 +430,13 @@ send_client_status_msg (struct GNUNET_SERVER_Client *client,
  *
  * @param barrier the corresponding barrier
  * @param emsg the error message; should be non-NULL for
- *   status=BARRIER_STATUS_ERROR 
+ *   status=GNUNET_TESTBED_BARRIERSTATUS_ERROR
  */
 static void
 send_barrier_status_msg (struct Barrier *barrier, const char *emsg)
 {
   GNUNET_assert (0 != barrier->status);
-  send_client_status_msg (barrier->client, barrier->name,
-                          barrier->status, emsg);
-}
-
-
-
-/**
- * Task for sending barrier crossed notifications to waiting client
- *
- * @param cls the barrier which is crossed
- * @param tc scheduler task context
- */
-static void
-notify_task_cb (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
-{
-  struct Barrier *barrier = cls;
-  struct ClientCtx *client_ctx;
-  struct GNUNET_TESTBED_BarrierStatusMsg *msg;
-  struct GNUNET_MessageHeader *dup_msg;
-  uint16_t name_len;
-  uint16_t msize;
-
-  name_len = strlen (barrier->name) + 1;
-  msize = sizeof (struct GNUNET_TESTBED_BarrierStatusMsg) + name_len;  
-  msg = GNUNET_malloc (msize);
-  msg->header.size = htons (msize);
-  msg->header.type = htons (GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_STATUS);
-  msg->status = 0;
-  msg->name_len = htons (name_len);
-  (void) memcpy (msg->data, barrier->name, name_len);
-  msg->data[name_len] = '\0';
-  while (NULL != (client_ctx = barrier->head))
-  {
-    dup_msg = GNUNET_copy_message (&msg->header);
-    queue_message (client_ctx, dup_msg);
-    GNUNET_CONTAINER_DLL_remove (barrier->head, barrier->tail, client_ctx);
-    GNUNET_SERVER_client_set_user_context_ (client_ctx->client, NULL, 0);
-    GNUNET_free (client_ctx);
-  }
+  send_client_status_msg (barrier->mc, barrier->name, barrier->status, emsg);
 }
 
 
@@ -482,7 +463,7 @@ handle_barrier_wait (void *cls, struct GNUNET_SERVER_Client *client,
   struct GNUNET_HashCode key;
   size_t name_len;
   uint16_t msize;
-  
+
   msize = ntohs (message->size);
   if (msize <= sizeof (struct GNUNET_TESTBED_BarrierWait))
   {
@@ -501,25 +482,31 @@ handle_barrier_wait (void *cls, struct GNUNET_SERVER_Client *client,
   name = GNUNET_malloc (name_len + 1);
   name[name_len] = '\0';
   (void) memcpy (name, msg->name, name_len);
-  GNUNET_CRYPTO_hash (name, name_len - 1, &key);
+  LOG_DEBUG ("Received BARRIER_WAIT for barrier `%s'\n", name);
+  GNUNET_CRYPTO_hash (name, name_len, &key);
+  GNUNET_free (name);
   if (NULL == (barrier = GNUNET_CONTAINER_multihashmap_get (barrier_map, &key)))
   {
     GNUNET_break (0);
     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
-    GNUNET_free (name);
     return;
   }
   client_ctx = GNUNET_SERVER_client_get_user_context (client, struct ClientCtx);
   if (NULL == client_ctx)
   {
-    client_ctx = GNUNET_malloc (sizeof (struct ClientCtx));
+    client_ctx = GNUNET_new (struct ClientCtx);
     client_ctx->client = client;
     GNUNET_SERVER_client_keep (client);
     client_ctx->barrier = barrier;
     GNUNET_CONTAINER_DLL_insert_tail (barrier->head, barrier->tail, client_ctx);
-    barrier->nreached++;
-    if (LOCAL_QUORUM_REACHED (barrier))
-      notify_task_cb (barrier, NULL);
+    GNUNET_SERVER_client_set_user_context (client, client_ctx);
+  }
+  barrier->nreached++;
+  if ((barrier->num_wbarriers_reached == barrier->num_wbarriers)
+        && (LOCAL_QUORUM_REACHED (barrier)))
+  {
+    barrier->status = GNUNET_TESTBED_BARRIERSTATUS_CROSSED;
+    send_barrier_status_msg (barrier, NULL);
   }
   GNUNET_SERVER_receive_done (client, GNUNET_OK);
 }
@@ -537,17 +524,20 @@ static void
 disconnect_cb (void *cls, struct GNUNET_SERVER_Client *client)
 {
   struct ClientCtx *client_ctx;
-  
+
+  if (NULL == client)
+    return;
   client_ctx = GNUNET_SERVER_client_get_user_context (client, struct ClientCtx);
   if (NULL == client_ctx)
-    return;                     /* We only set user context for locally
-                                   connected clients */
+    return;
   cleanup_clientctx (client_ctx);
 }
 
 
 /**
  * Function to initialise barrriers component
+ *
+ * @param cfg the configuration to use for initialisation
  */
 void
 GST_barriers_init (struct GNUNET_CONFIGURATION_Handle *cfg)
@@ -563,7 +553,31 @@ GST_barriers_init (struct GNUNET_CONFIGURATION_Handle *cfg)
                               GNUNET_SERVICE_OPTION_MANUAL_SHUTDOWN);
   srv = GNUNET_SERVICE_get_server (ctx);
   GNUNET_SERVER_add_handlers (srv, message_handlers);
-  GNUNET_SERVER_disconnect_notify (srv, &disconnect_cb, NULL);  
+  GNUNET_SERVER_disconnect_notify (srv, &disconnect_cb, NULL);
+}
+
+
+/**
+ * Iterator over hash map entries.
+ *
+ * @param cls closure
+ * @param key current key code
+ * @param value value in the hash map
+ * @return #GNUNET_YES if we should continue to
+ *         iterate,
+ *         #GNUNET_NO if not.
+ */
+static int
+barrier_destroy_iterator (void *cls,
+                          const struct GNUNET_HashCode *key,
+                          void *value)
+{
+  struct Barrier *barrier = value;
+
+  GNUNET_assert (NULL != barrier);
+  cancel_wrappers (barrier);
+  remove_barrier (barrier);
+  return GNUNET_YES;
 }
 
 
@@ -571,9 +585,13 @@ GST_barriers_init (struct GNUNET_CONFIGURATION_Handle *cfg)
  * Function to stop the barrier service
  */
 void
-GST_barriers_stop ()
+GST_barriers_destroy ()
 {
   GNUNET_assert (NULL != barrier_map);
+  GNUNET_assert (GNUNET_SYSERR !=
+                 GNUNET_CONTAINER_multihashmap_iterate (barrier_map,
+                                                        &barrier_destroy_iterator,
+                                                        NULL));
   GNUNET_CONTAINER_multihashmap_destroy (barrier_map);
   GNUNET_assert (NULL != ctx);
   GNUNET_SERVICE_stop (ctx);
@@ -587,13 +605,13 @@ GST_barriers_stop ()
  *
  * @param cls the closure given to GNUNET_TESTBED_barrier_init()
  * @param name the name of the barrier
- * @param barrier the barrier handle
+ * @param b_ the barrier handle
  * @param status status of the barrier; GNUNET_OK if the barrier is crossed;
  *   GNUNET_SYSERR upon error
  * @param emsg if the status were to be GNUNET_SYSERR, this parameter has the
  *   error messsage
  */
-static void 
+static void
 wbarrier_status_cb (void *cls, const char *name,
                     struct GNUNET_TESTBED_Barrier *b_,
                     enum GNUNET_TESTBED_BarrierStatus status,
@@ -608,18 +626,18 @@ wbarrier_status_cb (void *cls, const char *name,
   GNUNET_free (wrapper);
   switch (status)
   {
-  case BARRIER_STATUS_ERROR:
+  case GNUNET_TESTBED_BARRIERSTATUS_ERROR:
     LOG (GNUNET_ERROR_TYPE_ERROR,
          "Initialising barrier `%s' failed at a sub-controller: %s\n",
          barrier->name, (NULL != emsg) ? emsg : "NULL");
     cancel_wrappers (barrier);
     if (NULL == emsg)
       emsg = "Initialisation failed at a sub-controller";
-    barrier->status = BARRIER_STATUS_ERROR;
+    barrier->status = GNUNET_TESTBED_BARRIERSTATUS_ERROR;
     send_barrier_status_msg (barrier, emsg);
     return;
-  case BARRIER_STATUS_CROSSED:
-    if (BARRIER_STATUS_INITIALISED != barrier->status)
+  case GNUNET_TESTBED_BARRIERSTATUS_CROSSED:
+    if (GNUNET_TESTBED_BARRIERSTATUS_INITIALISED != barrier->status)
     {
       GNUNET_break_op (0);
       return;
@@ -628,11 +646,11 @@ wbarrier_status_cb (void *cls, const char *name,
     if ((barrier->num_wbarriers_reached == barrier->num_wbarriers)
         && (LOCAL_QUORUM_REACHED (barrier)))
     {
-      barrier->status = BARRIER_STATUS_CROSSED;
+      barrier->status = GNUNET_TESTBED_BARRIERSTATUS_CROSSED;
       send_barrier_status_msg (barrier, NULL);
     }
     return;
-  case BARRIER_STATUS_INITIALISED:
+  case GNUNET_TESTBED_BARRIERSTATUS_INITIALISED:
     if (0 != barrier->status)
     {
       GNUNET_break_op (0);
@@ -641,7 +659,7 @@ wbarrier_status_cb (void *cls, const char *name,
     barrier->num_wbarriers_inited++;
     if (barrier->num_wbarriers_inited == barrier->num_wbarriers)
     {
-      barrier->status = BARRIER_STATUS_INITIALISED;
+      barrier->status = GNUNET_TESTBED_BARRIERSTATUS_INITIALISED;
       send_barrier_status_msg (barrier, NULL);
     }
     return;
@@ -653,16 +671,16 @@ wbarrier_status_cb (void *cls, const char *name,
  * Function called upon timeout while waiting for a response from the
  * subcontrollers to barrier init message
  *
- * @param 
- * @return 
+ * @param cls barrier
+ * @param tc scheduler task context
  */
 static void
 fwd_tout_barrier_init (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
 {
   struct Barrier *barrier = cls;
-  
+
   cancel_wrappers (barrier);
-  barrier->status = BARRIER_STATUS_ERROR;
+  barrier->status = GNUNET_TESTBED_BARRIERSTATUS_ERROR;
   send_barrier_status_msg (barrier,
                            "Timedout while propagating barrier initialisation\n");
   remove_barrier (barrier);
@@ -694,7 +712,7 @@ GST_handle_barrier_init (void *cls, struct GNUNET_SERVER_Client *client,
   size_t name_len;
   unsigned int cnt;
   uint16_t msize;
-  
+
   if (NULL == GST_context)
   {
     GNUNET_break_op (0);
@@ -719,20 +737,21 @@ GST_handle_barrier_init (void *cls, struct GNUNET_SERVER_Client *client,
   name = GNUNET_malloc (name_len + 1);
   (void) memcpy (name, msg->name, name_len);
   GNUNET_CRYPTO_hash (name, name_len, &hash);
+  LOG_DEBUG ("Received BARRIER_INIT for barrier `%s'\n", name);
   if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (barrier_map, &hash))
   {
-  
-    send_client_status_msg (client, name, BARRIER_STATUS_ERROR,
+
+    send_client_status_msg (client, name, GNUNET_TESTBED_BARRIERSTATUS_ERROR,
                             "A barrier with the same name already exists");
     GNUNET_free (name);
     GNUNET_SERVER_receive_done (client, GNUNET_OK);
     return;
   }
-  barrier = GNUNET_malloc (sizeof (struct Barrier));
+  barrier = GNUNET_new (struct Barrier);
   (void) memcpy (&barrier->hash, &hash, sizeof (struct GNUNET_HashCode));
   barrier->quorum = msg->quorum;
   barrier->name = name;
-  barrier->client = client;
+  barrier->mc = client;
   GNUNET_SERVER_client_keep (client);
   GNUNET_assert (GNUNET_OK ==
                  GNUNET_CONTAINER_multihashmap_put (barrier_map,
@@ -750,18 +769,21 @@ GST_handle_barrier_init (void *cls, struct GNUNET_SERVER_Client *client,
       GNUNET_break (0);/* May happen when we are connecting to the controller */
       continue;
     }
-    wrapper = GNUNET_malloc (sizeof (struct WBarrier));
+    wrapper = GNUNET_new (struct WBarrier);
     wrapper->barrier = barrier;
     GNUNET_CONTAINER_DLL_insert_tail (barrier->whead, barrier->wtail, wrapper);
-    wrapper->hbarrier = GNUNET_TESTBED_barrier_init (slave->controller,
-                                                     barrier->name,
-                                                     barrier->quorum,
-                                                     &wbarrier_status_cb,
-                                                     wrapper);    
+    wrapper->hbarrier = GNUNET_TESTBED_barrier_init_ (slave->controller,
+                                                      barrier->name,
+                                                      barrier->quorum,
+                                                      &wbarrier_status_cb,
+                                                      wrapper,
+                                                      GNUNET_NO);
   }
   if (NULL == barrier->whead)   /* No further propagation */
   {
-    barrier->status = BARRIER_STATUS_INITIALISED;
+    barrier->status = GNUNET_TESTBED_BARRIERSTATUS_INITIALISED;
+    LOG_DEBUG ("Sending GNUNET_TESTBED_BARRIERSTATUS_INITIALISED for barrier `%s'\n",
+               barrier->name);
     send_barrier_status_msg (barrier, NULL);
   }else
     barrier->tout_task = GNUNET_SCHEDULER_add_delayed (MESSAGE_SEND_TIMEOUT (30),
@@ -798,7 +820,7 @@ GST_handle_barrier_cancel (void *cls, struct GNUNET_SERVER_Client *client,
     GNUNET_break_op (0);
     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
     return;
-  }  
+  }
   if (client != GST_context->client)
   {
     GNUNET_break_op (0);
@@ -827,7 +849,88 @@ GST_handle_barrier_cancel (void *cls, struct GNUNET_SERVER_Client *client,
   GNUNET_assert (NULL != barrier);
   cancel_wrappers (barrier);
   remove_barrier (barrier);
-  GNUNET_SERVER_receive_done (client, GNUNET_OK);  
+  GNUNET_SERVER_receive_done (client, GNUNET_OK);
+}
+
+
+/**
+ * Message handler for GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_STATUS messages.
+ * This handler is queued in the main service and will handle the messages sent
+ * either from the testbed driver or from a high level controller
+ *
+ * @param cls NULL
+ * @param client identification of the client
+ * @param message the actual message
+ */
+void
+GST_handle_barrier_status (void *cls, struct GNUNET_SERVER_Client *client,
+                           const struct GNUNET_MessageHeader *message)
+{
+  const struct GNUNET_TESTBED_BarrierStatusMsg *msg;
+  struct Barrier *barrier;
+  struct ClientCtx *client_ctx;
+  const char *name;
+  struct GNUNET_HashCode key;
+  enum GNUNET_TESTBED_BarrierStatus status;
+  uint16_t msize;
+  uint16_t name_len;
+
+  if (NULL == GST_context)
+  {
+    GNUNET_break_op (0);
+    GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
+    return;
+  }
+  if (client != GST_context->client)
+  {
+    GNUNET_break_op (0);
+    GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
+    return;
+  }
+  msize = ntohs (message->size);
+  if (msize <= sizeof (struct GNUNET_TESTBED_BarrierStatusMsg))
+  {
+    GNUNET_break_op (0);
+    GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
+    return;
+  }
+  msg = (const struct GNUNET_TESTBED_BarrierStatusMsg *) message;
+  status = ntohs (msg->status);
+  if (GNUNET_TESTBED_BARRIERSTATUS_CROSSED != status)
+  {
+    GNUNET_break_op (0);        /* current we only expect BARRIER_CROSSED
+                                   status message this way */
+    GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
+    return;
+  }
+  name = msg->data;
+  name_len = ntohs (msg->name_len);
+  if ((sizeof (struct GNUNET_TESTBED_BarrierStatusMsg) + name_len + 1) != msize)
+  {
+    GNUNET_break_op (0);
+    GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
+    return;
+  }
+  if ('\0' != name[name_len])
+  {
+    GNUNET_break_op (0);
+    GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
+    return;
+  }
+  GNUNET_CRYPTO_hash (name, name_len, &key);
+  barrier = GNUNET_CONTAINER_multihashmap_get (barrier_map, &key);
+  if (NULL == barrier)
+  {
+    GNUNET_break_op (0);
+    GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
+    return;
+  }
+  GNUNET_SERVER_receive_done (client, GNUNET_OK);
+  while (NULL != (client_ctx = barrier->head)) /* Notify peers */
+  {
+    queue_message (client_ctx, GNUNET_copy_message (message));
+    GNUNET_CONTAINER_DLL_remove (barrier->head, barrier->tail, client_ctx);
+  }
 }
 
 /* end of gnunet-service-testbed_barriers.c */