-remove trailing whitespace
[oweals/gnunet.git] / src / consensus / consensus_api.c
index 19bf81c86205f88ef91729ced62bcf2625465beb..d8e65c52fd202ecd150737219c9189183b605cb8 100644 (file)
@@ -20,7 +20,7 @@
 
 /**
  * @file consensus/consensus_api.c
- * @brief 
+ * @brief
  * @author Florian Dold
  */
 #include "platform.h"
 
 #define LOG(kind,...) GNUNET_log_from (kind, "consensus-api",__VA_ARGS__)
 
-/**
- * Actions that can be queued.
- */
-struct QueuedMessage
-{
-  /**
-   * Queued messages are stored in a doubly linked list.
-   */
-  struct QueuedMessage *next;
-
-  /**
-   * Queued messages are stored in a doubly linked list.
-   */
-  struct QueuedMessage *prev;
-
-  /**
-   * The actual queued message.
-   */
-  struct GNUNET_MessageHeader *msg;
-
-  /**
-   * Will be called after transmit, if not NULL
-   */
-  GNUNET_CONSENSUS_InsertDoneCallback idc;
-
-  /**
-   * Closure for idc
-   */
-  void *idc_cls;
-};
-
 
 /**
  * Handle for the service.
@@ -95,31 +64,11 @@ struct GNUNET_CONSENSUS_Handle
    */
   struct GNUNET_HashCode session_id;
 
-  /**
-   * Number of peers in the consensus. Optionally includes the local peer.
-   */
-  int num_peers;
-
-  /**
-   * Peer identities of peers participating in the consensus, includes the local peer.
-   */
-  struct GNUNET_PeerIdentity **peers;
-
-  /**
-   * Currently active transmit request.
-   */
-  struct GNUNET_CLIENT_TransmitHandle *th;
-
   /**
    * GNUNES_YES iff the join message has been sent to the service.
    */
   int joined;
 
-  /**
-   * Closure for the insert done callback.
-   */
-  void *idc_cls;
-
   /**
    * Called when the conclude operation finishes or fails.
    */
@@ -135,124 +84,37 @@ struct GNUNET_CONSENSUS_Handle
    */
   struct GNUNET_TIME_Absolute conclude_deadline;
 
-  unsigned int conclude_min_size;
-
-  struct QueuedMessage *messages_head;
-  struct QueuedMessage *messages_tail;
-
   /**
-   * GNUNET_YES when currently in a section where destroy may not be
-   * called.
+   * Message queue for the client.
    */
-  int may_not_destroy;
+  struct GNUNET_MQ_Handle *mq;
 };
 
-
-
-/**
- * Schedule transmitting the next message.
- *
- * @param consensus consensus handle
- */
-static void
-send_next (struct GNUNET_CONSENSUS_Handle *consensus);
-
-
 /**
- * Function called to notify a client about the connection
- * begin ready to queue more data.  "buf" will be
- * NULL and "size" zero if the connection was closed for
- * writing in the meantime.
- *
- * @param cls closure
- * @param size number of bytes available in buf
- * @param buf where the callee should write the message
- * @return number of bytes written to buf
+ * FIXME: this should not bee necessary when the API
+ * issue has been fixed
  */
-static size_t
-transmit_queued (void *cls, size_t size,
-                 void *buf)
+struct InsertDoneInfo
 {
-  struct GNUNET_CONSENSUS_Handle *consensus;
-  struct QueuedMessage *qmsg;
-  size_t msg_size;
-
-  consensus = (struct GNUNET_CONSENSUS_Handle *) cls;
-  consensus->th = NULL;
-
-  qmsg = consensus->messages_head;
-  GNUNET_CONTAINER_DLL_remove (consensus->messages_head, consensus->messages_tail, qmsg);
-
-  if (NULL == buf)
-  {
-    if (NULL != qmsg->idc)
-    {
-      qmsg->idc (qmsg->idc_cls, GNUNET_YES);
-    }
-    return 0;
-  }
-
-  msg_size = ntohs (qmsg->msg->size);
-
-  GNUNET_assert (size >= msg_size);
-
-  memcpy (buf, qmsg->msg, msg_size);
-  if (NULL != qmsg->idc)
-  {
-    qmsg->idc (qmsg->idc_cls, GNUNET_YES);
-  }
-
-  /* FIXME: free the messages */
-
-  send_next (consensus);
-
-  return msg_size;
-}
-
-
-/**
- * Schedule transmitting the next message.
- *
- * @param consensus consensus handle
- */
-static void
-send_next (struct GNUNET_CONSENSUS_Handle *consensus)
-{
-  if (NULL != consensus->th)
-    return;
-
-  if (NULL != consensus->messages_head)
-  {
-    consensus->th = 
-        GNUNET_CLIENT_notify_transmit_ready (consensus->client, ntohs (consensus->messages_head->msg->size),
-                                             GNUNET_TIME_UNIT_FOREVER_REL,
-                                             GNUNET_NO, &transmit_queued, consensus);
-  }
-}
-
-static void
-queue_message (struct GNUNET_CONSENSUS_Handle *consensus, struct GNUNET_MessageHeader *msg)
-{
-  struct QueuedMessage *qm;
-  qm = GNUNET_malloc (sizeof *qm);
-  qm->msg = msg;
-  GNUNET_CONTAINER_DLL_insert_tail (consensus->messages_head, consensus->messages_tail, qm);
-}
+  GNUNET_CONSENSUS_InsertDoneCallback idc;
+  void *cls;
+};
 
 
 /**
  * Called when the server has sent is a new element
- * 
- * @param consensus consensus handle
- * @param msg element message
+ *
+ * @param cls consensus handle
+ * @param mh element message
  */
 static void
-handle_new_element (struct GNUNET_CONSENSUS_Handle *consensus,
-                   struct GNUNET_CONSENSUS_ElementMessage *msg)
+handle_new_element (void *cls,
+                    const struct GNUNET_MessageHeader *mh)
 {
-  struct GNUNET_CONSENSUS_Element element;
-  struct GNUNET_CONSENSUS_AckMessage *ack_msg;
-  int ret;
+  struct GNUNET_CONSENSUS_Handle *consensus = cls;
+  const struct GNUNET_CONSENSUS_ElementMessage *msg
+      = (const struct GNUNET_CONSENSUS_ElementMessage *) mh;
+  struct GNUNET_SET_Element element;
 
   LOG (GNUNET_ERROR_TYPE_DEBUG, "received new element\n");
 
@@ -260,122 +122,30 @@ handle_new_element (struct GNUNET_CONSENSUS_Handle *consensus,
   element.size = ntohs (msg->header.size) - sizeof (struct GNUNET_CONSENSUS_ElementMessage);
   element.data = &msg[1];
 
-  ret = consensus->new_element_cb (consensus->new_element_cls, &element);
-
-  ack_msg = GNUNET_malloc (sizeof *ack_msg);
-  ack_msg->header.size = htons (sizeof *ack_msg);
-  ack_msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_ACK);
-  ack_msg->keep = ret;
-
-  queue_message (consensus, (struct GNUNET_MessageHeader *) ack_msg);
-
-  send_next (consensus);
+  consensus->new_element_cb (consensus->new_element_cls, &element);
 }
 
 
 /**
  * Called when the server has announced
  * that the conclusion is over.
- * 
- * @param consensus consensus handle
- * @param msg conclude done message
- */
-static void
-handle_conclude_done (struct GNUNET_CONSENSUS_Handle *consensus,
-                     struct GNUNET_CONSENSUS_ConcludeDoneMessage *msg)
-{
-  GNUNET_assert (NULL != consensus->conclude_cb);
-  consensus->may_not_destroy = GNUNET_YES;
-  consensus->conclude_cb (consensus->conclude_cls, NULL);
-  consensus->may_not_destroy = GNUNET_NO;
-  consensus->conclude_cb = NULL;
-}
-
-
-
-/**
- * Type of a function to call when we receive a message
- * from the service.
  *
- * @param cls closure
- * @param msg message received, NULL on timeout or fatal error
+ * @param cls consensus handle
+ * @param msg conclude done message
  */
 static void
-message_handler (void *cls, const struct GNUNET_MessageHeader *msg)
+handle_conclude_done (void *cls,
+                     const struct GNUNET_MessageHeader *msg)
 {
   struct GNUNET_CONSENSUS_Handle *consensus = cls;
 
-  LOG (GNUNET_ERROR_TYPE_DEBUG, "received message from consensus service\n");
-
-  if (msg == NULL)
-  {
-    /* Error, timeout, death */
-    LOG (GNUNET_ERROR_TYPE_ERROR, "error receiving\n");
-    GNUNET_CLIENT_disconnect (consensus->client);
-    consensus->client = NULL;
-    consensus->new_element_cb (consensus->new_element_cls, NULL);
-    return;
-  }
+  GNUNET_CONSENSUS_ConcludeCallback cc;
 
-  switch (ntohs (msg->type))
-  {
-    case GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_RECEIVED_ELEMENT:
-      handle_new_element (consensus, (struct GNUNET_CONSENSUS_ElementMessage *) msg);
-      break;
-    case GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE_DONE:
-      handle_conclude_done (consensus, (struct GNUNET_CONSENSUS_ConcludeDoneMessage *) msg);
-      break;
-    default:
-      GNUNET_break (0);
-  }
-  GNUNET_CLIENT_receive (consensus->client, &message_handler, consensus,
-                         GNUNET_TIME_UNIT_FOREVER_REL);
+  GNUNET_assert (NULL != (cc = consensus->conclude_cb));
+  consensus->conclude_cb = NULL;
+  cc (consensus->conclude_cls);
 }
 
-/**
- * Function called to notify a client about the connection
- * begin ready to queue more data.  "buf" will be
- * NULL and "size" zero if the connection was closed for
- * writing in the meantime.
- *
- * @param cls closure
- * @param size number of bytes available in buf
- * @param buf where the callee should write the message
- * @return number of bytes written to buf
- */
-static size_t
-transmit_join (void *cls, size_t size, void *buf)
-{
-  struct GNUNET_CONSENSUS_JoinMessage *msg;
-  struct GNUNET_CONSENSUS_Handle *consensus;
-  int msize;
-
-  GNUNET_assert (NULL != buf);
-
-  LOG (GNUNET_ERROR_TYPE_DEBUG, "transmitting join message\n");
-
-  consensus = cls;
-  consensus->th = NULL;
-  consensus->joined = 1;
-
-  msg = buf;
-
-  msize = sizeof (struct GNUNET_CONSENSUS_JoinMessage) +
-      consensus->num_peers * sizeof (struct GNUNET_PeerIdentity);
-
-  msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_JOIN);
-  msg->header.size = htons (msize);
-  msg->session_id = consensus->session_id;
-  msg->num_peers = htonl (consensus->num_peers);
-  memcpy(&msg[1],
-        consensus->peers,
-        consensus->num_peers * sizeof (struct GNUNET_PeerIdentity));
-  send_next (consensus);
-  GNUNET_CLIENT_receive (consensus->client, &message_handler, consensus,
-                         GNUNET_TIME_UNIT_FOREVER_REL);
-  
-  return msize;
-}
 
 /**
  * Create a consensus session.
@@ -400,40 +170,49 @@ GNUNET_CONSENSUS_create (const struct GNUNET_CONFIGURATION_Handle *cfg,
                          void *new_element_cls)
 {
   struct GNUNET_CONSENSUS_Handle *consensus;
-  size_t join_message_size;
+  struct GNUNET_CONSENSUS_JoinMessage *join_msg;
+  struct GNUNET_MQ_Envelope *ev;
+  const static struct GNUNET_MQ_MessageHandler mq_handlers[] = {
+    {handle_new_element,
+      GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_RECEIVED_ELEMENT, 0},
+    {handle_conclude_done,
+      GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE_DONE, 0},
+    GNUNET_MQ_HANDLERS_END
+  };
 
   consensus = GNUNET_malloc (sizeof (struct GNUNET_CONSENSUS_Handle));
   consensus->cfg = cfg;
   consensus->new_element_cb = new_element_cb;
   consensus->new_element_cls = new_element_cls;
-  consensus->num_peers = num_peers;
   consensus->session_id = *session_id;
-
-  if (0 == num_peers)
-    consensus->peers = NULL;
-  else if (num_peers > 0)
-    consensus->peers =
-        GNUNET_memdup (peers, num_peers * sizeof (struct GNUNET_PeerIdentity));
-
   consensus->client = GNUNET_CLIENT_connect ("consensus", cfg);
+  consensus->mq = GNUNET_MQ_queue_for_connection_client (consensus->client,
+                                                         mq_handlers, NULL, consensus);
 
   GNUNET_assert (consensus->client != NULL);
 
-  join_message_size = (sizeof (struct GNUNET_CONSENSUS_JoinMessage)) +
-      (num_peers * sizeof (struct GNUNET_PeerIdentity));
-
-  consensus->th =
-      GNUNET_CLIENT_notify_transmit_ready (consensus->client,
-                                           join_message_size,
-                                           GNUNET_TIME_UNIT_FOREVER_REL,
-                                           GNUNET_NO, &transmit_join, consensus);
+  ev = GNUNET_MQ_msg_extra (join_msg,
+                            (num_peers * sizeof (struct GNUNET_PeerIdentity)),
+                            GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_JOIN);
 
+  join_msg->session_id = consensus->session_id;
+  join_msg->num_peers = htonl (num_peers);
+  memcpy(&join_msg[1],
+        peers,
+        num_peers * sizeof (struct GNUNET_PeerIdentity));
 
-  GNUNET_assert (consensus->th != NULL);
+  GNUNET_MQ_send (consensus->mq, ev);
   return consensus;
 }
 
 
+static void
+idc_adapter (void *cls)
+{
+  struct InsertDoneInfo *i = cls;
+  i->idc (i->cls, GNUNET_OK);
+  GNUNET_free (i);
+}
 
 /**
  * Insert an element in the set being reconsiled.  Must not be called after
@@ -441,38 +220,35 @@ GNUNET_CONSENSUS_create (const struct GNUNET_CONFIGURATION_Handle *cfg,
  *
  * @param consensus handle for the consensus session
  * @param element the element to be inserted
- * @param idc function called when we are done with this element and it 
+ * @param idc function called when we are done with this element and it
  *            is thus allowed to call GNUNET_CONSENSUS_insert again
  * @param idc_cls closure for 'idc'
  */
 void
 GNUNET_CONSENSUS_insert (struct GNUNET_CONSENSUS_Handle *consensus,
-                        const struct GNUNET_CONSENSUS_Element *element,
+                        const struct GNUNET_SET_Element *element,
                         GNUNET_CONSENSUS_InsertDoneCallback idc,
                         void *idc_cls)
 {
-  struct QueuedMessage *qmsg;
   struct GNUNET_CONSENSUS_ElementMessage *element_msg;
-  size_t element_msg_size;
+  struct GNUNET_MQ_Envelope *ev;
+  struct InsertDoneInfo *i;
 
   LOG (GNUNET_ERROR_TYPE_DEBUG, "inserting, size=%llu\n", element->size);
 
-  element_msg_size = (sizeof (struct GNUNET_CONSENSUS_ElementMessage) +
-                               element->size);
+  ev = GNUNET_MQ_msg_extra (element_msg, element->size,
+                            GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT);
 
-  element_msg = GNUNET_malloc (element_msg_size);
-  element_msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT);
-  element_msg->header.size = htons (element_msg_size);
   memcpy (&element_msg[1], element->data, element->size);
 
-  qmsg = GNUNET_malloc (sizeof (struct QueuedMessage));
-  qmsg->msg = (struct GNUNET_MessageHeader *) element_msg;
-  qmsg->idc = idc;
-  qmsg->idc_cls = idc_cls;
-
-  GNUNET_CONTAINER_DLL_insert_tail (consensus->messages_head, consensus->messages_tail, qmsg);
-
-  send_next (consensus);
+  if (NULL != idc)
+  {
+    i = GNUNET_new (struct InsertDoneInfo);
+    i->idc = idc;
+    i->cls = idc_cls;
+    GNUNET_MQ_notify_sent (ev, idc_adapter, i);
+  }
+  GNUNET_MQ_send (consensus->mq, ev);
 }
 
 
@@ -491,11 +267,10 @@ GNUNET_CONSENSUS_insert (struct GNUNET_CONSENSUS_Handle *consensus,
 void
 GNUNET_CONSENSUS_conclude (struct GNUNET_CONSENSUS_Handle *consensus,
                           struct GNUNET_TIME_Relative timeout,
-                          unsigned int min_group_size_in_consensus,
                           GNUNET_CONSENSUS_ConcludeCallback conclude,
                           void *conclude_cls)
 {
-  struct QueuedMessage *qmsg;
+  struct GNUNET_MQ_Envelope *ev;
   struct GNUNET_CONSENSUS_ConcludeMessage *conclude_msg;
 
   GNUNET_assert (NULL != conclude);
@@ -504,18 +279,10 @@ GNUNET_CONSENSUS_conclude (struct GNUNET_CONSENSUS_Handle *consensus,
   consensus->conclude_cls = conclude_cls;
   consensus->conclude_cb = conclude;
 
-  conclude_msg = GNUNET_malloc (sizeof (struct GNUNET_CONSENSUS_ConcludeMessage));
-  conclude_msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE);
-  conclude_msg->header.size = htons (sizeof (struct GNUNET_CONSENSUS_ConcludeMessage));
+  ev = GNUNET_MQ_msg (conclude_msg, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE);
   conclude_msg->timeout = GNUNET_TIME_relative_hton (timeout);
-  conclude_msg->min_group_size = min_group_size_in_consensus;
-
-  qmsg = GNUNET_malloc (sizeof (struct QueuedMessage));
-  qmsg->msg = (struct GNUNET_MessageHeader *) conclude_msg;
 
-  GNUNET_CONTAINER_DLL_insert_tail (consensus->messages_head, consensus->messages_tail, qmsg);
-
-  send_next (consensus);
+  GNUNET_MQ_send (consensus->mq, ev);
 }
 
 
@@ -528,18 +295,11 @@ GNUNET_CONSENSUS_conclude (struct GNUNET_CONSENSUS_Handle *consensus,
 void
 GNUNET_CONSENSUS_destroy (struct GNUNET_CONSENSUS_Handle *consensus)
 {
-  if (GNUNET_YES == consensus->may_not_destroy)
-  {
-    LOG (GNUNET_ERROR_TYPE_ERROR, "destroy may not be called right now\n");
-    GNUNET_assert (0);
-  }
   if (consensus->client != NULL)
   {
     GNUNET_CLIENT_disconnect (consensus->client);
     consensus->client = NULL;
   }
-  if (NULL != consensus->peers)
-    GNUNET_free (consensus->peers);
   GNUNET_free (consensus);
 }