};
+/**
+ * Collection of the state necessary to read and write gnunet messages
+ * to a stream socket. Should be used as closure for stream_data_processor.
+ */
+struct MQStreamState
+{
+ /**
+ * Message stream tokenizer for the data received from the
+ * stream socket.
+ */
+ struct GNUNET_SERVER_MessageStreamTokenizer *mst;
+
+ /**
+ * The stream socket to use for receiving and transmitting
+ * messages with the message queue.
+ */
+ struct GNUNET_STREAM_Socket *socket;
+
+ /**
+ * Current read handle, NULL if no read active.
+ */
+ struct GNUNET_STREAM_ReadHandle *rh;
+
+ /**
+ * Current write handle, NULL if no write active.
+ */
+ struct GNUNET_STREAM_WriteHandle *wh;
+};
+
+
+
/**
* Default value in seconds for various timeouts
*/
cleanup_read_handle (socket);
}
+
+/**
+ * Functions of this signature are called whenever writing operations
+ * on a stream are executed
+ *
+ * @param cls the closure from GNUNET_STREAM_write
+ * @param status the status of the stream at the time this function is called;
+ * GNUNET_STREAM_OK if writing to stream was completed successfully;
+ * GNUNET_STREAM_TIMEOUT if the given data is not sent successfully
+ * (this doesn't mean that the data is never sent, the receiver may
+ * have read the data but its ACKs may have been lost);
+ * GNUNET_STREAM_SHUTDOWN if the stream is shutdown for writing in the
+ * mean time; GNUNET_STREAM_SYSERR if the stream is broken and cannot
+ * be processed.
+ * @param size the number of bytes written
+ */
+static void
+mq_stream_write_queued (void *cls, enum GNUNET_STREAM_Status status, size_t size)
+{
+ struct GNUNET_MQ_MessageQueue *mq = cls;
+ struct MQStreamState *mss = (struct MQStreamState *) mq->impl_state;
+ struct GNUNET_MQ_Message *mqm;
+
+ GNUNET_assert (GNUNET_STREAM_OK == status);
+
+ /* call cb for message we finished sending */
+ mqm = mq->current_msg;
+ GNUNET_assert (NULL != mq->current_msg);
+ if (NULL != mqm->sent_cb)
+ mqm->sent_cb (mqm->sent_cls);
+ GNUNET_free (mqm);
+
+ mss->wh = NULL;
+
+ mqm = mq->msg_head;
+ mq->current_msg = mqm;
+ if (NULL == mqm)
+ return;
+ GNUNET_CONTAINER_DLL_remove (mq->msg_head, mq->msg_tail, mqm);
+ mss->wh = GNUNET_STREAM_write (mss->socket, mqm->mh, ntohs (mqm->mh->size),
+ GNUNET_TIME_UNIT_FOREVER_REL,
+ mq_stream_write_queued, mq);
+ GNUNET_assert (NULL != mss->wh);
+}
+
+
+static void
+mq_stream_send_impl (struct GNUNET_MQ_MessageQueue *mq,
+ struct GNUNET_MQ_Message *mqm)
+{
+ struct MQStreamState *mss = (struct MQStreamState *) mq->impl_state;
+
+ if (NULL != mq->current_msg)
+ {
+ GNUNET_CONTAINER_DLL_insert_tail (mq->msg_head, mq->msg_tail, mqm);
+ return;
+ }
+ mq->current_msg = mqm;
+ mss->wh = GNUNET_STREAM_write (mss->socket, mqm->mh, ntohs (mqm->mh->size),
+ GNUNET_TIME_UNIT_FOREVER_REL,
+ mq_stream_write_queued, mq);
+}
+
+
+/**
+ * Functions with this signature are called whenever a
+ * complete message is received by the tokenizer.
+ *
+ * Do not call GNUNET_SERVER_mst_destroy in callback
+ *
+ * @param cls closure
+ * @param client identification of the client
+ * @param message the actual message
+ *
+ * @return GNUNET_OK on success, GNUNET_SYSERR to stop further processing
+ */
+static int
+mq_stream_mst_callback (void *cls, void *client,
+ const struct GNUNET_MessageHeader *message)
+{
+ struct GNUNET_MQ_MessageQueue *mq = cls;
+
+ GNUNET_assert (NULL != message);
+ GNUNET_MQ_dispatch (mq, message);
+ return GNUNET_OK;
+}
+
+
+/**
+ * Functions of this signature are called whenever data is available from the
+ * stream.
+ *
+ * @param cls the closure from GNUNET_STREAM_read
+ * @param status the status of the stream at the time this function is called
+ * @param data traffic from the other side
+ * @param size the number of bytes available in data read; will be 0 on timeout
+ * @return number of bytes of processed from 'data' (any data remaining should be
+ * given to the next time the read processor is called).
+ */
+static size_t
+mq_stream_data_processor (void *cls,
+ enum GNUNET_STREAM_Status status,
+ const void *data,
+ size_t size)
+{
+ struct GNUNET_MQ_MessageQueue *mq = cls;
+ struct MQStreamState *mss;
+ int ret;
+
+ mss = (struct MQStreamState *) mq->impl_state;
+ GNUNET_assert (GNUNET_STREAM_OK == status);
+ ret = GNUNET_SERVER_mst_receive (mss->mst, NULL, data, size, GNUNET_NO, GNUNET_NO);
+ GNUNET_assert (GNUNET_OK == ret);
+ /* we always read all data */
+ mss->rh = GNUNET_STREAM_read (mss->socket, GNUNET_TIME_UNIT_FOREVER_REL,
+ mq_stream_data_processor, mq);
+ return size;
+}
+
+
+static void
+mq_stream_destroy_impl (struct GNUNET_MQ_MessageQueue *mq)
+{
+ struct MQStreamState *mss = (struct MQStreamState *) mq->impl_state;
+
+ if (NULL != mss->rh)
+ {
+ GNUNET_STREAM_read_cancel (mss->rh);
+ mss->rh = NULL;
+ }
+
+ if (NULL != mss->wh)
+ {
+ GNUNET_STREAM_write_cancel (mss->wh);
+ mss->wh = NULL;
+ }
+
+ if (NULL != mss->mst)
+ {
+ GNUNET_SERVER_mst_destroy (mss->mst);
+ mss->mst = NULL;
+ }
+
+ GNUNET_free (mss);
+}
+
+
+
+/**
+ * Create a message queue for a stream socket.
+ *
+ * @param socket the socket to read/write in the message queue
+ * @param msg_handlers message handler array
+ * @param error_handler callback for errors
+ * @return the message queue for the socket
+ */
+struct GNUNET_MQ_MessageQueue *
+GNUNET_STREAM_mq_create (struct GNUNET_STREAM_Socket *socket,
+ const struct GNUNET_MQ_Handler *msg_handlers,
+ GNUNET_MQ_ErrorHandler error_handler,
+ void *cls)
+{
+ struct GNUNET_MQ_MessageQueue *mq;
+ struct MQStreamState *mss;
+
+ mq = GNUNET_new (struct GNUNET_MQ_MessageQueue);
+ mss = GNUNET_new (struct MQStreamState);
+ mss->socket = socket;
+ mq->impl_state = mss;
+ mq->send_impl = mq_stream_send_impl;
+ mq->destroy_impl = mq_stream_destroy_impl;
+ mq->handlers = msg_handlers;
+ mq->handlers_cls = cls;
+ if (NULL != msg_handlers)
+ {
+ mss->mst = GNUNET_SERVER_mst_create (mq_stream_mst_callback, mq);
+ mss->rh = GNUNET_STREAM_read (socket, GNUNET_TIME_UNIT_FOREVER_REL,
+ mq_stream_data_processor, mq);
+ }
+ return mq;
+}
+
/* end of stream_api.c */