- moved MQ to util
[oweals/gnunet.git] / src / stream / stream_api.c
index 8994afc243a2299e5559ffbb5b56c3454d4ae71c..b4a47b53d4152f46d5cdae840ff7634831a5bccd 100644 (file)
@@ -578,6 +578,37 @@ struct GNUNET_STREAM_ShutdownHandle
 };
 
 
+/**
+ * Collection of the state necessary to read and write gnunet messages 
+ * to a stream socket. Should be used as closure for stream_data_processor.
+ */
+struct MQStreamState
+{
+  /**
+   * Message stream tokenizer for the data received from the
+   * stream socket.
+   */
+  struct GNUNET_SERVER_MessageStreamTokenizer *mst;
+
+  /**
+   * The stream socket to use for receiving and transmitting
+   * messages with the message queue.
+   */
+  struct GNUNET_STREAM_Socket *socket;
+
+  /**
+   * Current read handle, NULL if no read active.
+   */
+  struct GNUNET_STREAM_ReadHandle *rh;
+
+  /**
+   * Current write handle, NULL if no write active.
+   */
+  struct GNUNET_STREAM_WriteHandle *wh;
+};
+
+
+
 /**
  * Default value in seconds for various timeouts
  */
@@ -3731,4 +3762,186 @@ GNUNET_STREAM_read_cancel (struct GNUNET_STREAM_ReadHandle *rh)
   cleanup_read_handle (socket);
 }
 
+
+/**
+ * Functions of this signature are called whenever writing operations
+ * on a stream are executed
+ *
+ * @param cls the closure from GNUNET_STREAM_write
+ * @param status the status of the stream at the time this function is called;
+ *          GNUNET_STREAM_OK if writing to stream was completed successfully;
+ *          GNUNET_STREAM_TIMEOUT if the given data is not sent successfully
+ *          (this doesn't mean that the data is never sent, the receiver may
+ *          have read the data but its ACKs may have been lost);
+ *          GNUNET_STREAM_SHUTDOWN if the stream is shutdown for writing in the
+ *          mean time; GNUNET_STREAM_SYSERR if the stream is broken and cannot
+ *          be processed.
+ * @param size the number of bytes written
+ */
+static void 
+mq_stream_write_queued (void *cls, enum GNUNET_STREAM_Status status, size_t size)
+{
+  struct GNUNET_MQ_MessageQueue *mq = cls;
+  struct MQStreamState *mss = (struct MQStreamState *) mq->impl_state;
+  struct GNUNET_MQ_Message *mqm;
+
+  GNUNET_assert (GNUNET_STREAM_OK == status);
+  
+  /* call cb for message we finished sending */
+  mqm = mq->current_msg;
+  GNUNET_assert (NULL != mq->current_msg);
+  if (NULL != mqm->sent_cb)
+    mqm->sent_cb (mqm->sent_cls);
+  GNUNET_free (mqm);
+
+  mss->wh = NULL;
+
+  mqm = mq->msg_head;
+  mq->current_msg = mqm;
+  if (NULL == mqm)
+    return;
+  GNUNET_CONTAINER_DLL_remove (mq->msg_head, mq->msg_tail, mqm);
+  mss->wh = GNUNET_STREAM_write (mss->socket, mqm->mh, ntohs (mqm->mh->size),
+                                 GNUNET_TIME_UNIT_FOREVER_REL,
+                                 mq_stream_write_queued, mq);
+  GNUNET_assert (NULL != mss->wh);
+}
+
+
+static void
+mq_stream_send_impl (struct GNUNET_MQ_MessageQueue *mq,
+                     struct GNUNET_MQ_Message *mqm)
+{
+  struct MQStreamState *mss = (struct MQStreamState *) mq->impl_state;
+
+  if (NULL != mq->current_msg)
+  {
+    GNUNET_CONTAINER_DLL_insert_tail (mq->msg_head, mq->msg_tail, mqm);
+    return;
+  }
+  mq->current_msg = mqm;
+  mss->wh = GNUNET_STREAM_write (mss->socket, mqm->mh, ntohs (mqm->mh->size),
+                                 GNUNET_TIME_UNIT_FOREVER_REL,
+                                 mq_stream_write_queued, mq);
+}
+
+
+/**
+ * Functions with this signature are called whenever a
+ * complete message is received by the tokenizer.
+ *
+ * Do not call GNUNET_SERVER_mst_destroy in callback
+ *
+ * @param cls closure
+ * @param client identification of the client
+ * @param message the actual message
+ *
+ * @return GNUNET_OK on success, GNUNET_SYSERR to stop further processing
+ */
+static int
+mq_stream_mst_callback (void *cls, void *client,
+                     const struct GNUNET_MessageHeader *message)
+{
+  struct GNUNET_MQ_MessageQueue *mq = cls;
+
+  GNUNET_assert (NULL != message);
+  GNUNET_MQ_dispatch (mq, message);
+  return GNUNET_OK;
+}
+
+
+/**
+ * Functions of this signature are called whenever data is available from the
+ * stream.
+ *
+ * @param cls the closure from GNUNET_STREAM_read
+ * @param status the status of the stream at the time this function is called
+ * @param data traffic from the other side
+ * @param size the number of bytes available in data read; will be 0 on timeout 
+ * @return number of bytes of processed from 'data' (any data remaining should be
+ *         given to the next time the read processor is called).
+ */
+static size_t
+mq_stream_data_processor (void *cls,
+                       enum GNUNET_STREAM_Status status,
+                       const void *data,
+                       size_t size)
+{
+  struct GNUNET_MQ_MessageQueue *mq = cls;
+  struct MQStreamState *mss;
+  int ret;
+  
+  mss = (struct MQStreamState *) mq->impl_state;
+  GNUNET_assert (GNUNET_STREAM_OK == status);
+  ret = GNUNET_SERVER_mst_receive (mss->mst, NULL, data, size, GNUNET_NO, GNUNET_NO);
+  GNUNET_assert (GNUNET_OK == ret);
+  /* we always read all data */
+    mss->rh = GNUNET_STREAM_read (mss->socket, GNUNET_TIME_UNIT_FOREVER_REL, 
+                                  mq_stream_data_processor, mq);
+  return size;
+}
+
+
+static void
+mq_stream_destroy_impl (struct GNUNET_MQ_MessageQueue *mq)
+{
+  struct MQStreamState *mss = (struct MQStreamState *) mq->impl_state;
+
+  if (NULL != mss->rh)
+  {
+    GNUNET_STREAM_read_cancel (mss->rh);
+    mss->rh = NULL;
+  }
+
+  if (NULL != mss->wh)
+  {
+    GNUNET_STREAM_write_cancel (mss->wh);
+    mss->wh = NULL;
+  }
+
+  if (NULL != mss->mst)
+  {
+    GNUNET_SERVER_mst_destroy (mss->mst);
+    mss->mst = NULL;
+  }
+
+  GNUNET_free (mss);
+}
+
+
+
+/**
+ * Create a message queue for a stream socket.
+ *
+ * @param socket the socket to read/write in the message queue
+ * @param msg_handlers message handler array
+ * @param error_handler callback for errors
+ * @return the message queue for the socket
+ */
+struct GNUNET_MQ_MessageQueue *
+GNUNET_STREAM_mq_create (struct GNUNET_STREAM_Socket *socket, 
+                         const struct GNUNET_MQ_Handler *msg_handlers,
+                         GNUNET_MQ_ErrorHandler error_handler,
+                         void *cls)
+{
+  struct GNUNET_MQ_MessageQueue *mq;
+  struct MQStreamState *mss;
+
+  mq = GNUNET_new (struct GNUNET_MQ_MessageQueue);
+  mss = GNUNET_new (struct MQStreamState);
+  mss->socket = socket;
+  mq->impl_state = mss;
+  mq->send_impl = mq_stream_send_impl;
+  mq->destroy_impl = mq_stream_destroy_impl;
+  mq->handlers = msg_handlers;
+  mq->handlers_cls = cls;
+  if (NULL != msg_handlers)
+  {
+    mss->mst = GNUNET_SERVER_mst_create (mq_stream_mst_callback, mq);
+    mss->rh = GNUNET_STREAM_read (socket, GNUNET_TIME_UNIT_FOREVER_REL, 
+                                  mq_stream_data_processor, mq);
+  }
+  return mq;
+}
+
 /* end of stream_api.c */