- fix 2699
[oweals/gnunet.git] / src / consensus / consensus_api.c
index 2479c019c571f125e8b5e5a2d849a194e1a633c9..7ebb0a9d94c23b4f4cad390c97a2999462dcdab2 100644 (file)
 
 #define LOG(kind,...) GNUNET_log_from (kind, "consensus-api",__VA_ARGS__)
 
-struct ElementAck
+/**
+ * Actions that can be queued.
+ */
+struct QueuedMessage
 {
-  struct ElementAck *next;
-  struct ElementAck *prev;
-  int keep;
-  struct GNUNET_CONSENSUS_Element *element;
+  /**
+   * Queued messages are stored in a doubly linked list.
+   */
+  struct QueuedMessage *next;
+
+  /**
+   * Queued messages are stored in a doubly linked list.
+   */
+  struct QueuedMessage *prev;
+
+  /**
+   * The actual queued message.
+   */
+  struct GNUNET_MessageHeader *msg;
+
+  /**
+   * Will be called after transmit, if not NULL
+   */
+  GNUNET_CONSENSUS_InsertDoneCallback idc;
+
+  /**
+   * Closure for idc
+   */
+  void *idc_cls;
 };
 
+
 /**
  * Handle for the service.
  */
@@ -52,14 +76,14 @@ struct GNUNET_CONSENSUS_Handle
   const struct GNUNET_CONFIGURATION_Handle *cfg;
 
   /**
-   * Socket (if available).
+   * Client connected to the consensus service, may be NULL if not connected.
    */
   struct GNUNET_CLIENT_Connection *client;
 
   /**
    * Callback for new elements. Not called for elements added locally.
    */
-  GNUNET_CONSENSUS_NewElementCallback new_element_cb;
+  GNUNET_CONSENSUS_ElementCallback new_element_cb;
 
   /**
    * Closure for new_element_cb
@@ -67,7 +91,7 @@ struct GNUNET_CONSENSUS_Handle
   void *new_element_cls;
 
   /**
-   * Session identifier for the consensus session.
+   * The (local) session identifier for the consensus session.
    */
   struct GNUNET_HashCode session_id;
 
@@ -77,9 +101,9 @@ struct GNUNET_CONSENSUS_Handle
   int num_peers;
 
   /**
-   * Peer identities of peers in the consensus. Optionally includes the local peer.
+   * Peer identities of peers participating in the consensus, includes the local peer.
    */
-  struct GNUNET_PeerIdentity *peers;
+  struct GNUNET_PeerIdentity **peers;
 
   /**
    * Currently active transmit request.
@@ -91,22 +115,11 @@ struct GNUNET_CONSENSUS_Handle
    */
   int joined;
 
-  /**
-   * Called when the current insertion operation finishes.
-   * NULL if there is no insert operation active.
-   */
-  GNUNET_CONSENSUS_InsertDoneCallback idc;
-
   /**
    * Closure for the insert done callback.
    */
   void *idc_cls;
 
-  /**
-   * An element that was requested to be inserted.
-   */
-  struct GNUNET_CONSENSUS_Element *insert_element;
-
   /**
    * Called when the conclude operation finishes or fails.
    */
@@ -122,103 +135,105 @@ struct GNUNET_CONSENSUS_Handle
    */
   struct GNUNET_TIME_Absolute conclude_deadline;
 
-  struct ElementAck *ack_head;
-  struct ElementAck *ack_tail;
-
-  /**
-   * Set to GNUNET_YES if the begin message has been transmitted to the service
-   */
-  int begin_sent;
+  unsigned int conclude_min_size;
 
-  /**
-   * Set to GNUNET_YES it the begin message should be transmitted to the service
-   */
-  int begin_requested;
+  struct QueuedMessage *messages_head;
+  struct QueuedMessage *messages_tail;
 };
 
 
-static size_t
-transmit_ack (void *cls, size_t size, void *buf);
-
-static size_t
-transmit_insert (void *cls, size_t size, void *buf);
-
-static size_t
-transmit_conclude (void *cls, size_t size, void *buf);
-
-static size_t
-transmit_begin (void *cls, size_t size, void *buf);
-
 
 /**
- * Call notify_transmit_ready for ack if necessary and possible.
+ * Schedule transmitting the next message.
+ *
+ * @param consensus consensus handle
  */
 static void
-ntr_ack (struct GNUNET_CONSENSUS_Handle *consensus)
-{
-  if ((NULL == consensus->th) && (NULL != consensus->ack_head))
-  {
-    consensus->th =
-        GNUNET_CLIENT_notify_transmit_ready (consensus->client,
-                                             sizeof (struct GNUNET_CONSENSUS_AckMessage),
-                                             GNUNET_TIME_UNIT_FOREVER_REL,
-                                             GNUNET_NO, &transmit_ack, consensus);
-  }
-}
+send_next (struct GNUNET_CONSENSUS_Handle *consensus);
 
 
 /**
- * Call notify_transmit_ready for ack if necessary and possible.
+ * Function called to notify a client about the connection
+ * begin ready to queue more data.  "buf" will be
+ * NULL and "size" zero if the connection was closed for
+ * writing in the meantime.
+ *
+ * @param cls closure
+ * @param size number of bytes available in buf
+ * @param buf where the callee should write the message
+ * @return number of bytes written to buf
  */
-static void
-ntr_insert (struct GNUNET_CONSENSUS_Handle *consensus)
+static size_t
+transmit_queued (void *cls, size_t size,
+                 void *buf)
 {
-  if ((NULL == consensus->th) && (NULL != consensus->insert_element))
+  struct GNUNET_CONSENSUS_Handle *consensus;
+  struct QueuedMessage *qmsg;
+  size_t msg_size;
+
+  consensus = (struct GNUNET_CONSENSUS_Handle *) cls;
+  consensus->th = NULL;
+
+  qmsg = consensus->messages_head;
+  GNUNET_CONTAINER_DLL_remove (consensus->messages_head, consensus->messages_tail, qmsg);
+
+  if (NULL == buf)
   {
-    consensus->th =
-        GNUNET_CLIENT_notify_transmit_ready (consensus->client,
-                                             sizeof (struct GNUNET_CONSENSUS_ElementMessage) + 
-                                                consensus->insert_element->size,
-                                             GNUNET_TIME_UNIT_FOREVER_REL,
-                                             GNUNET_NO, &transmit_insert, consensus);
+    if (NULL != qmsg->idc)
+    {
+      qmsg->idc (qmsg->idc_cls, GNUNET_YES);
+    }
+    return 0;
   }
-}
 
+  msg_size = ntohs (qmsg->msg->size);
 
-/**
- * Call notify_transmit_ready for ack if necessary and possible.
- */
-static void
-ntr_conclude (struct GNUNET_CONSENSUS_Handle *consensus)
-{
-  if ((NULL == consensus->th) && (NULL != consensus->conclude_cb))
+  GNUNET_assert (size >= msg_size);
+
+  memcpy (buf, qmsg->msg, msg_size);
+  if (NULL != qmsg->idc)
   {
-    consensus->th =
-        GNUNET_CLIENT_notify_transmit_ready (consensus->client,
-                                             sizeof (struct GNUNET_CONSENSUS_ConcludeMessage),
-                                             GNUNET_TIME_absolute_get_remaining (consensus->conclude_deadline),
-                                             GNUNET_NO, &transmit_conclude, consensus);
+    qmsg->idc (qmsg->idc_cls, GNUNET_YES);
   }
+
+  /* FIXME: free the messages */
+
+  send_next (consensus);
+
+  return msg_size;
 }
 
 
 /**
- * Call notify_transmit_ready for ack if necessary and possible.
+ * Schedule transmitting the next message.
+ *
+ * @param consensus consensus handle
  */
 static void
-ntr_begin (struct GNUNET_CONSENSUS_Handle *consensus)
+send_next (struct GNUNET_CONSENSUS_Handle *consensus)
 {
-  if ((NULL == consensus->th) && (GNUNET_YES == consensus->begin_requested) &&
-      (GNUNET_NO == consensus->begin_sent))
+  if (NULL != consensus->th)
+    return;
+
+  if (NULL != consensus->messages_head)
   {
-    consensus->th =
-        GNUNET_CLIENT_notify_transmit_ready (consensus->client,
-                                             sizeof (struct GNUNET_MessageHeader),
+    consensus->th = 
+        GNUNET_CLIENT_notify_transmit_ready (consensus->client, ntohs (consensus->messages_head->msg->size),
                                              GNUNET_TIME_UNIT_FOREVER_REL,
-                                             GNUNET_NO, &transmit_begin, consensus);
+                                             GNUNET_NO, &transmit_queued, consensus);
   }
 }
 
+static void
+queue_message (struct GNUNET_CONSENSUS_Handle *consensus, struct GNUNET_MessageHeader *msg)
+{
+  struct QueuedMessage *qm;
+  qm = GNUNET_malloc (sizeof *qm);
+  qm->msg = msg;
+  GNUNET_CONTAINER_DLL_insert_tail (consensus->messages_head, consensus->messages_tail, qm);
+}
+
+
 /**
  * Called when the server has sent is a new element
  * 
@@ -226,23 +241,29 @@ ntr_begin (struct GNUNET_CONSENSUS_Handle *consensus)
  * @param msg element message
  */
 static void
-handle_new_element(struct GNUNET_CONSENSUS_Handle *consensus,
+handle_new_element (struct GNUNET_CONSENSUS_Handle *consensus,
                    struct GNUNET_CONSENSUS_ElementMessage *msg)
 {
   struct GNUNET_CONSENSUS_Element element;
-  struct ElementAck *ack;
+  struct GNUNET_CONSENSUS_AckMessage *ack_msg;
   int ret;
 
+  LOG (GNUNET_ERROR_TYPE_DEBUG, "received new element\n");
+
   element.type = msg->element_type;
-  element.size = msg->header.size - sizeof (struct GNUNET_CONSENSUS_ElementMessage);
+  element.size = ntohs (msg->header.size) - sizeof (struct GNUNET_CONSENSUS_ElementMessage);
   element.data = &msg[1];
 
   ret = consensus->new_element_cb (consensus->new_element_cls, &element);
-  ack = GNUNET_malloc (sizeof (struct ElementAck));
-  ack->keep = ret;
-  GNUNET_CONTAINER_DLL_insert_tail (consensus->ack_head, consensus->ack_tail,ack);
 
-  ntr_ack (consensus);
+  ack_msg = GNUNET_malloc (sizeof *ack_msg);
+  ack_msg->header.size = htons (sizeof *ack_msg);
+  ack_msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_ACK);
+  ack_msg->keep = ret;
+
+  queue_message (consensus, (struct GNUNET_MessageHeader *) ack_msg);
+
+  send_next (consensus);
 }
 
 
@@ -254,13 +275,11 @@ handle_new_element(struct GNUNET_CONSENSUS_Handle *consensus,
  * @param msg conclude done message
  */
 static void
-handle_conclude_done(struct GNUNET_CONSENSUS_Handle *consensus,
+handle_conclude_done (struct GNUNET_CONSENSUS_Handle *consensus,
                      struct GNUNET_CONSENSUS_ConcludeDoneMessage *msg)
 {
   GNUNET_assert (NULL != consensus->conclude_cb);
-  consensus->conclude_cb(consensus->conclude_cls,
-                         msg->num_peers,
-                         (struct GNUNET_PeerIdentity *) &msg[1]);
+  consensus->conclude_cb (consensus->conclude_cls, NULL);
   consensus->conclude_cb = NULL;
 }
 
@@ -278,7 +297,7 @@ message_handler (void *cls, const struct GNUNET_MessageHeader *msg)
 {
   struct GNUNET_CONSENSUS_Handle *consensus = cls;
 
-  LOG (GNUNET_ERROR_TYPE_INFO, "received message from consensus service\n");
+  LOG (GNUNET_ERROR_TYPE_DEBUG, "received message from consensus service\n");
 
   if (msg == NULL)
   {
@@ -287,12 +306,6 @@ message_handler (void *cls, const struct GNUNET_MessageHeader *msg)
     GNUNET_CLIENT_disconnect (consensus->client);
     consensus->client = NULL;
     consensus->new_element_cb (NULL, NULL);
-    if (NULL != consensus->idc)
-    {
-      consensus->idc(consensus->idc_cls, GNUNET_NO);
-      consensus->idc = NULL;
-      consensus->idc_cls = NULL;
-    }
     return;
   }
 
@@ -305,108 +318,12 @@ message_handler (void *cls, const struct GNUNET_MessageHeader *msg)
       handle_conclude_done (consensus, (struct GNUNET_CONSENSUS_ConcludeDoneMessage *) msg);
       break;
     default:
-      LOG(GNUNET_ERROR_TYPE_WARNING, "did not understand message type sent by service, ignoring");
+      GNUNET_break (0);
   }
   GNUNET_CLIENT_receive (consensus->client, &message_handler, consensus,
                          GNUNET_TIME_UNIT_FOREVER_REL);
 }
 
-
-
-
-/**
- * Function called to notify a client about the connection
- * begin ready to queue more data.  "buf" will be
- * NULL and "size" zero if the connection was closed for
- * writing in the meantime.
- *
- * @param cls closure
- * @param size number of bytes available in buf
- * @param buf where the callee should write the message
- * @return number of bytes written to buf
- */
-static size_t
-transmit_ack (void *cls, size_t size, void *buf)
-{
-  struct GNUNET_CONSENSUS_AckMessage *msg;
-  struct GNUNET_CONSENSUS_Handle *consensus;
-
-  consensus = (struct GNUNET_CONSENSUS_Handle *) cls;
-
-  GNUNET_assert (NULL != consensus->ack_head);
-
-  msg = (struct GNUNET_CONSENSUS_AckMessage *) buf;
-  msg->keep = consensus->ack_head->keep;
-  msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_ACK);
-  msg->header.size = htons (sizeof (struct GNUNET_CONSENSUS_AckMessage));
-
-  consensus->ack_head = consensus->ack_head->next;
-
-  consensus->th = NULL;
-
-  ntr_insert (consensus);
-  ntr_ack (consensus);
-  ntr_conclude (consensus);
-
-  return sizeof (struct GNUNET_CONSENSUS_AckMessage);
-}
-
-/**
- * Function called to notify a client about the connection
- * begin ready to queue more data.  "buf" will be
- * NULL and "size" zero if the connection was closed for
- * writing in the meantime.
- *
- * @param cls closure
- * @param size number of bytes available in buf
- * @param buf where the callee should write the message
- * @return number of bytes written to buf
- */
-static size_t
-transmit_insert (void *cls, size_t size, void *buf)
-{
-  struct GNUNET_CONSENSUS_ElementMessage *msg;
-  struct GNUNET_CONSENSUS_Handle *consensus;
-  GNUNET_CONSENSUS_InsertDoneCallback idc;
-  int msize;
-  void *idc_cls;
-
-  GNUNET_assert (NULL != buf);
-
-  consensus = cls;
-
-  GNUNET_assert (NULL != consensus->insert_element);
-
-  consensus->th = NULL;
-
-  msg = buf;
-
-  msize = sizeof (struct GNUNET_CONSENSUS_ElementMessage) +
-      consensus->insert_element->size;
-
-  msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT);
-  msg->header.size = htons (msize);
-  memcpy (&msg[1],
-          consensus->insert_element->data,
-          consensus->insert_element->size);
-
-  consensus->insert_element = NULL;
-
-  idc = consensus->idc;
-  consensus->idc = NULL;
-  idc_cls = consensus->idc_cls;
-  consensus->idc_cls = NULL;
-  idc (idc_cls, GNUNET_YES);
-
-
-  ntr_ack (consensus);
-  ntr_insert (consensus);
-  ntr_conclude (consensus);
-
-  return msize;
-}
-
-
 /**
  * Function called to notify a client about the connection
  * begin ready to queue more data.  "buf" will be
@@ -447,9 +364,7 @@ transmit_join (void *cls, size_t size, void *buf)
            consensus->peers,
            consensus->num_peers * sizeof (struct GNUNET_PeerIdentity));
 
-  ntr_insert (consensus);
-  ntr_begin (consensus);
-  ntr_conclude (consensus);
+  send_next (consensus);
 
   GNUNET_CLIENT_receive (consensus->client, &message_handler, consensus,
                          GNUNET_TIME_UNIT_FOREVER_REL);
@@ -457,88 +372,11 @@ transmit_join (void *cls, size_t size, void *buf)
   return msize;
 }
 
-
-/**
- * Function called to notify a client about the connection
- * begin ready to queue more data.  "buf" will be
- * NULL and "size" zero if the connection was closed for
- * writing in the meantime.
- *
- * @param cls closure
- * @param size number of bytes available in buf
- * @param buf where the callee should write the message
- * @return number of bytes written to buf
- */
-static size_t
-transmit_conclude (void *cls, size_t size, void *buf)
-{
-  struct GNUNET_CONSENSUS_ConcludeMessage *msg;
-  struct GNUNET_CONSENSUS_Handle *consensus;
-  int msize;
-
-  GNUNET_assert (NULL != buf);
-
-  consensus = cls;
-  consensus->th = NULL;
-
-  msg = buf;
-
-  msize = sizeof (struct GNUNET_CONSENSUS_ConcludeMessage);
-
-  msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE);
-  msg->header.size = htons (msize);
-  msg->timeout =
-      GNUNET_TIME_relative_hton (GNUNET_TIME_absolute_get_remaining(consensus->conclude_deadline));
-
-  ntr_ack (consensus);
-
-  return msize;
-}
-
-
-/**
- * Function called to notify a client about the connection
- * begin ready to queue more data.  "buf" will be
- * NULL and "size" zero if the connection was closed for
- * writing in the meantime.
- *
- * @param cls the consensus handle
- * @param size number of bytes available in buf
- * @param buf where the callee should write the message
- * @return number of bytes written to buf
- */
-static size_t
-transmit_begin (void *cls, size_t size, void *buf)
-{
-  struct GNUNET_MessageHeader *msg;
-  struct GNUNET_CONSENSUS_Handle *consensus;
-  int msize;
-
-  GNUNET_assert (NULL != buf);
-
-  consensus = cls;
-  consensus->th = NULL;
-
-  msg = buf;
-
-  msize = sizeof (struct GNUNET_MessageHeader);
-
-  msg->type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_BEGIN);
-  msg->size = htons (msize);
-
-  ntr_ack (consensus);
-  ntr_insert (consensus);
-  ntr_conclude (consensus);
-
-  return msize;
-}
-
-
 /**
  * Create a consensus session.
  *
- * @param cfg
- * @param num_peers
+ * @param cfg configuration to use for connecting to the consensus service
+ * @param num_peers number of peers in the peers array
  * @param peers array of peers participating in this consensus session
  *              Inclusion of the local peer is optional.
  * @param session_id session identifier
@@ -553,7 +391,7 @@ GNUNET_CONSENSUS_create (const struct GNUNET_CONFIGURATION_Handle *cfg,
                         unsigned int num_peers,
                         const struct GNUNET_PeerIdentity *peers,
                          const struct GNUNET_HashCode *session_id,
-                         GNUNET_CONSENSUS_NewElementCallback new_element_cb,
+                         GNUNET_CONSENSUS_ElementCallback new_element_cb,
                          void *new_element_cls)
 {
   struct GNUNET_CONSENSUS_Handle *consensus;
@@ -567,17 +405,10 @@ GNUNET_CONSENSUS_create (const struct GNUNET_CONFIGURATION_Handle *cfg,
   consensus->session_id = *session_id;
 
   if (0 == num_peers)
-  {
     consensus->peers = NULL;
-  }
   else if (num_peers > 0)
-  {
-    consensus->peers = GNUNET_memdup (peers, num_peers * sizeof (struct GNUNET_PeerIdentity));
-  }
-  else
-  {
-    GNUNET_break (0);
-  }
+    consensus->peers =
+        GNUNET_memdup (peers, num_peers * sizeof (struct GNUNET_PeerIdentity));
 
   consensus->client = GNUNET_CLIENT_connect ("consensus", cfg);
 
@@ -615,45 +446,36 @@ GNUNET_CONSENSUS_insert (struct GNUNET_CONSENSUS_Handle *consensus,
                         GNUNET_CONSENSUS_InsertDoneCallback idc,
                         void *idc_cls)
 {
-  GNUNET_assert (NULL == consensus->idc);
-  GNUNET_assert (NULL == consensus->insert_element);
-  GNUNET_assert (NULL == consensus->conclude_cb);
+  struct QueuedMessage *qmsg;
+  struct GNUNET_CONSENSUS_ElementMessage *element_msg;
+  size_t element_msg_size;
 
-  consensus->idc = idc;
-  consensus->idc_cls = idc_cls;
-  consensus->insert_element = GNUNET_memdup(element, sizeof (struct GNUNET_CONSENSUS_Element) + element->size);
+  LOG (GNUNET_ERROR_TYPE_DEBUG, "inserting, size=%llu\n", element->size);
 
-  if (consensus->joined == 0)
-  {
-    return;
-  }
+  element_msg_size = (sizeof (struct GNUNET_CONSENSUS_ElementMessage) +
+                               element->size);
 
-  ntr_insert (consensus);
-}
+  element_msg = GNUNET_malloc (element_msg_size);
+  element_msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT);
+  element_msg->header.size = htons (element_msg_size);
+  memcpy (&element_msg[1], element->data, element->size);
 
+  qmsg = GNUNET_malloc (sizeof (struct QueuedMessage));
+  qmsg->msg = (struct GNUNET_MessageHeader *) element_msg;
+  qmsg->idc = idc;
+  qmsg->idc_cls = idc_cls;
 
-/**
- * Begin reconciling elements with other peers.
- *
- * @param consensus handle for the consensus session
- */
-void
-GNUNET_CONSENSUS_begin (struct GNUNET_CONSENSUS_Handle *consensus)
-{
-  GNUNET_assert (NULL == consensus->idc);
-  GNUNET_assert (NULL == consensus->insert_element);
-  GNUNET_assert (GNUNET_NO == consensus->begin_requested);
-  GNUNET_assert (GNUNET_NO == consensus->begin_sent);
-
-  consensus->begin_requested = GNUNET_YES;
+  GNUNET_CONTAINER_DLL_insert_tail (consensus->messages_head, consensus->messages_tail, qmsg);
 
-  ntr_begin (consensus);
+  send_next (consensus);
 }
 
 
 /**
- * We are finished inserting new elements into the consensus;
+ * We are done with inserting new elements into the consensus;
  * try to conclude the consensus within a given time window.
+ * After conclude has been called, no further elements may be
+ * inserted by the client.
  *
  * @param consensus consensus session
  * @param timeout timeout after which the conculde callback
@@ -664,20 +486,31 @@ GNUNET_CONSENSUS_begin (struct GNUNET_CONSENSUS_Handle *consensus)
 void
 GNUNET_CONSENSUS_conclude (struct GNUNET_CONSENSUS_Handle *consensus,
                           struct GNUNET_TIME_Relative timeout,
+                          unsigned int min_group_size_in_consensus,
                           GNUNET_CONSENSUS_ConcludeCallback conclude,
                           void *conclude_cls)
 {
+  struct QueuedMessage *qmsg;
+  struct GNUNET_CONSENSUS_ConcludeMessage *conclude_msg;
+
   GNUNET_assert (NULL != conclude);
   GNUNET_assert (NULL == consensus->conclude_cb);
 
   consensus->conclude_cls = conclude_cls;
   consensus->conclude_cb = conclude;
-  consensus->conclude_deadline = GNUNET_TIME_relative_to_absolute(timeout);
 
+  conclude_msg = GNUNET_malloc (sizeof (struct GNUNET_CONSENSUS_ConcludeMessage));
+  conclude_msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE);
+  conclude_msg->header.size = htons (sizeof (struct GNUNET_CONSENSUS_ConcludeMessage));
+  conclude_msg->timeout = GNUNET_TIME_relative_hton (timeout);
+  conclude_msg->min_group_size = min_group_size_in_consensus;
+
+  qmsg = GNUNET_malloc (sizeof (struct QueuedMessage));
+  qmsg->msg = (struct GNUNET_MessageHeader *) conclude_msg;
+
+  GNUNET_CONTAINER_DLL_insert_tail (consensus->messages_head, consensus->messages_tail, qmsg);
 
-  /* if transmitting the conclude message is not possible right now, transmit_join
-   * or transmit_ack will handle it */
-  ntr_conclude (consensus);
+  send_next (consensus);
 }