2 This file is part of GNUnet.
3 (C) 2012 Christian Grothoff (and other contributing authors)
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 2, or (at your
8 option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA.
22 * @author Florian Dold
24 * @brief general purpose request queue
30 #define LOG(kind,...) GNUNET_log_from (kind, "mq",__VA_ARGS__)
33 * Signature of functions implementing the
34 * sending part of a message queue
36 * @param q the message queue
37 * @param m the message
39 typedef void (*SendImpl) (struct GNUNET_MQ_MessageQueue *q, struct GNUNET_MQ_Message *m);
42 typedef void (*DestroyImpl) (struct GNUNET_MQ_MessageQueue *q);
46 * Collection of the state necessary to read and write gnunet messages
47 * to a stream socket. Should be used as closure for stream_data_processor.
49 struct MessageStreamState
51 struct GNUNET_SERVER_MessageStreamTokenizer *mst;
52 struct MessageQueue *mq;
53 struct GNUNET_STREAM_Socket *socket;
54 struct GNUNET_STREAM_ReadHandle *rh;
55 struct GNUNET_STREAM_WriteHandle *wh;
59 struct ServerClientSocketState
61 struct GNUNET_SERVER_Client *client;
62 struct GNUNET_SERVER_TransmitHandle* th;
66 struct ClientConnectionState
68 struct GNUNET_CLIENT_Connection *connection;
69 struct GNUNET_CLIENT_TransmitHandle *th;
73 struct GNUNET_MQ_MessageQueue
76 * Handlers array, or NULL if the queue should not receive messages
78 const struct GNUNET_MQ_Handler *handlers;
81 * Closure for the handler callbacks
86 * Actual implementation of message sending,
87 * called when a message is added
92 * Implementation-dependent queue destruction function
94 DestroyImpl destroy_impl;
97 * Implementation-specific state
102 * Callback will be called when the message queue is empty
104 GNUNET_MQ_NotifyCallback empty_cb;
107 * Closure for empty_cb
112 * Callback will be called when a read error occurs.
114 GNUNET_MQ_NotifyCallback read_error_cb;
117 * Closure for read_error_cb
119 void *read_error_cls;
122 * Linked list of messages pending to be sent
124 struct GNUNET_MQ_Message *msg_head;
127 * Linked list of messages pending to be sent
129 struct GNUNET_MQ_Message *msg_tail;
132 * Message that is currently scheduled to be
133 * sent. Not the head of the message queue, as the implementation
134 * needs to know if sending has been already scheduled or not.
136 struct GNUNET_MQ_Message *current_msg;
139 * Map of associations, lazily allocated
141 struct GNUNET_CONTAINER_MultiHashMap32 *assoc_map;
144 * Next id that should be used for the assoc_map,
145 * initialized lazily to a random value together with
152 struct GNUNET_MQ_Message
155 * Messages are stored in a linked list
157 struct GNUNET_MQ_Message *next;
160 * Messages are stored in a linked list
162 struct GNUNET_MQ_Message *prev;
165 * Actual allocated message header,
166 * usually points to the end of the containing GNUNET_MQ_Message
168 struct GNUNET_MessageHeader *mh;
171 * Queue the message is queued in, NULL if message is not queued.
173 struct GNUNET_MQ_MessageQueue *parent_queue;
176 * Called after the message was sent irrevokably
178 GNUNET_MQ_NotifyCallback sent_cb;
181 * Closure for send_cb
188 * Call the right callback for a message received
192 dispatch_message (struct GNUNET_MQ_MessageQueue *mq, const struct GNUNET_MessageHeader *mh)
194 const struct GNUNET_MQ_Handler *handler;
196 handler = mq->handlers;
199 for (; NULL != handler->cb; handler++)
200 if (handler->type == ntohs (mh->type))
201 handler->cb (mq->handlers_cls, mh);
206 GNUNET_MQ_discard (struct GNUNET_MQ_Message *mqm)
208 GNUNET_assert (NULL == mqm->parent_queue);
214 * Send a message with the give message queue.
215 * May only be called once per message.
217 * @param mq message queue
218 * @param mqm the message to send.
221 GNUNET_MQ_send (struct GNUNET_MQ_MessageQueue *mq, struct GNUNET_MQ_Message *mqm)
223 mq->send_impl (mq, mqm);
227 struct GNUNET_MQ_Message *
228 GNUNET_MQ_msg_ (struct GNUNET_MessageHeader **mhp, uint16_t size, uint16_t type)
230 struct GNUNET_MQ_Message *mqm;
231 mqm = GNUNET_malloc (sizeof *mqm + size);
232 mqm->mh = (struct GNUNET_MessageHeader *) &mqm[1];
233 mqm->mh->size = htons (size);
234 mqm->mh->type = htons (type);
242 GNUNET_MQ_nest_ (struct GNUNET_MQ_Message **mqmp,
243 const void *data, uint16_t len)
250 GNUNET_assert (NULL != mqmp);
251 old_size = ntohs ((*mqmp)->mh->size);
252 /* message too large to concatenate? */
253 if (ntohs ((*mqmp)->mh->size) + len < len)
254 return GNUNET_SYSERR;
255 new_size = old_size + len;
256 *mqmp = GNUNET_realloc (mqmp, sizeof (struct GNUNET_MQ_Message) + new_size);
257 memcpy ((*mqmp)->mh + old_size, data, new_size - old_size);
258 (*mqmp)->mh->size = htons (new_size);
264 * Functions of this signature are called whenever writing operations
265 * on a stream are executed
267 * @param cls the closure from GNUNET_STREAM_write
268 * @param status the status of the stream at the time this function is called;
269 * GNUNET_STREAM_OK if writing to stream was completed successfully;
270 * GNUNET_STREAM_TIMEOUT if the given data is not sent successfully
271 * (this doesn't mean that the data is never sent, the receiver may
272 * have read the data but its ACKs may have been lost);
273 * GNUNET_STREAM_SHUTDOWN if the stream is shutdown for writing in the
274 * mean time; GNUNET_STREAM_SYSERR if the stream is broken and cannot
276 * @param size the number of bytes written
279 stream_write_queued (void *cls, enum GNUNET_STREAM_Status status, size_t size)
281 struct GNUNET_MQ_MessageQueue *mq = cls;
282 struct MessageStreamState *mss = (struct MessageStreamState *) mq->impl_state;
283 struct GNUNET_MQ_Message *mqm;
285 GNUNET_assert (GNUNET_STREAM_OK == status);
287 /* call cb for message we finished sending */
288 mqm = mq->current_msg;
291 if (NULL != mqm->sent_cb)
292 mqm->sent_cb (mqm->sent_cls);
299 mq->current_msg = mqm;
302 if (NULL != mq->empty_cb)
303 mq->empty_cb (mq->empty_cls);
306 GNUNET_CONTAINER_DLL_remove (mq->msg_head, mq->msg_tail, mqm);
307 mss->wh = GNUNET_STREAM_write (mss->socket, mqm->mh, ntohs (mqm->mh->size),
308 GNUNET_TIME_UNIT_FOREVER_REL,
309 stream_write_queued, mq);
310 GNUNET_assert (NULL != mss->wh);
315 stream_socket_send_impl (struct GNUNET_MQ_MessageQueue *mq,
316 struct GNUNET_MQ_Message *mqm)
318 struct MessageStreamState *mss = (struct MessageStreamState *) mq->impl_state;
319 if (NULL != mq->current_msg)
321 GNUNET_CONTAINER_DLL_insert_tail (mq->msg_head, mq->msg_tail, mqm);
324 mq->current_msg = mqm;
325 mss->wh = GNUNET_STREAM_write (mss->socket, mqm->mh, ntohs (mqm->mh->size),
326 GNUNET_TIME_UNIT_FOREVER_REL,
327 stream_write_queued, mq);
332 * Functions with this signature are called whenever a
333 * complete message is received by the tokenizer.
335 * Do not call GNUNET_SERVER_mst_destroy in callback
338 * @param client identification of the client
339 * @param message the actual message
341 * @return GNUNET_OK on success, GNUNET_SYSERR to stop further processing
344 stream_mst_callback (void *cls, void *client,
345 const struct GNUNET_MessageHeader *message)
347 struct GNUNET_MQ_MessageQueue *mq = cls;
349 GNUNET_assert (NULL != message);
350 dispatch_message (mq, message);
356 * Functions of this signature are called whenever data is available from the
359 * @param cls the closure from GNUNET_STREAM_read
360 * @param status the status of the stream at the time this function is called
361 * @param data traffic from the other side
362 * @param size the number of bytes available in data read; will be 0 on timeout
363 * @return number of bytes of processed from 'data' (any data remaining should be
364 * given to the next time the read processor is called).
367 stream_data_processor (void *cls,
368 enum GNUNET_STREAM_Status status,
372 struct GNUNET_MQ_MessageQueue *mq = cls;
373 struct MessageStreamState *mss;
376 mss = (struct MessageStreamState *) mq->impl_state;
377 GNUNET_assert (GNUNET_STREAM_OK == status);
378 ret = GNUNET_SERVER_mst_receive (mss->mst, NULL, data, size, GNUNET_NO, GNUNET_NO);
379 GNUNET_assert (GNUNET_OK == ret);
380 /* we always read all data */
381 mss->rh = GNUNET_STREAM_read (mss->socket, GNUNET_TIME_UNIT_FOREVER_REL,
382 stream_data_processor, mq);
387 struct GNUNET_MQ_MessageQueue *
388 GNUNET_MQ_queue_for_stream_socket (struct GNUNET_STREAM_Socket *socket,
389 const struct GNUNET_MQ_Handler *handlers,
392 struct GNUNET_MQ_MessageQueue *mq;
393 struct MessageStreamState *mss;
395 mq = GNUNET_new (struct GNUNET_MQ_MessageQueue);
396 mss = GNUNET_new (struct MessageStreamState);
397 mss->socket = socket;
398 mq->impl_state = mss;
399 mq->send_impl = stream_socket_send_impl;
400 mq->handlers = handlers;
401 mq->handlers_cls = cls;
402 if (NULL != handlers)
404 mss->mst = GNUNET_SERVER_mst_create (stream_mst_callback, mq);
405 mss->rh = GNUNET_STREAM_read (socket, GNUNET_TIME_UNIT_FOREVER_REL,
406 stream_data_processor, mq);
412 /*** Transmit a queued message to the session's client.
414 * @param cls consensus session
415 * @param size number of bytes available in buf
416 * @param buf where the callee should write the message
417 * @return number of bytes written to buf
420 transmit_queued (void *cls, size_t size,
423 struct GNUNET_MQ_MessageQueue *mq = cls;
424 struct GNUNET_MQ_Message *mqm = mq->current_msg;
425 struct ServerClientSocketState *state = mq->impl_state;
428 mq->current_msg = NULL;
429 GNUNET_assert (NULL != mqm);
430 GNUNET_assert (NULL != buf);
431 msg_size = ntohs (mqm->mh->size);
432 GNUNET_assert (size >= msg_size);
433 memcpy (buf, mqm->mh, msg_size);
436 if (NULL != mq->msg_head)
438 mq->current_msg = mq->msg_head;
439 GNUNET_CONTAINER_DLL_remove (mq->msg_head, mq->msg_tail, mq->current_msg);
441 GNUNET_SERVER_notify_transmit_ready (state->client, msg_size,
442 GNUNET_TIME_UNIT_FOREVER_REL,
443 &transmit_queued, mq);
445 else if (NULL != mq->empty_cb)
446 mq->empty_cb (mq->empty_cls);
452 server_client_send_impl (struct GNUNET_MQ_MessageQueue *mq, struct GNUNET_MQ_Message *mqm)
454 struct ServerClientSocketState *state = mq->impl_state;
457 GNUNET_assert (NULL != state);
459 if (NULL != state->th)
461 GNUNET_CONTAINER_DLL_insert_tail (mq->msg_head, mq->msg_tail, mqm);
464 GNUNET_assert (NULL == mq->current_msg);
465 msize = ntohs (mq->msg_head->mh->size);
466 mq->current_msg = mqm;
468 GNUNET_SERVER_notify_transmit_ready (state->client, msize,
469 GNUNET_TIME_UNIT_FOREVER_REL,
470 &transmit_queued, mq);
474 struct GNUNET_MQ_MessageQueue *
475 GNUNET_MQ_queue_for_server_client (struct GNUNET_SERVER_Client *client)
477 struct GNUNET_MQ_MessageQueue *mq;
478 struct ServerClientSocketState *scss;
480 mq = GNUNET_new (struct GNUNET_MQ_MessageQueue);
481 scss = GNUNET_new (struct ServerClientSocketState);
482 mq->impl_state = scss;
483 mq->send_impl = server_client_send_impl;
489 * Transmit a queued message to the session's client.
491 * @param cls consensus session
492 * @param size number of bytes available in buf
493 * @param buf where the callee should write the message
494 * @return number of bytes written to buf
497 connection_client_transmit_queued (void *cls, size_t size,
500 struct GNUNET_MQ_MessageQueue *mq = cls;
501 struct GNUNET_MQ_Message *mqm = mq->current_msg;
502 struct ClientConnectionState *state = mq->impl_state;
505 mq->current_msg = NULL;
506 GNUNET_assert (NULL != mqm);
507 GNUNET_assert (NULL != buf);
508 msg_size = ntohs (mqm->mh->size);
509 GNUNET_assert (size >= msg_size);
510 memcpy (buf, mqm->mh, msg_size);
513 if (NULL != mq->msg_head)
515 mq->current_msg = mq->msg_head;
516 GNUNET_CONTAINER_DLL_remove (mq->msg_head, mq->msg_tail, mq->current_msg);
518 GNUNET_CLIENT_notify_transmit_ready (state->connection, htons (mq->current_msg->mh->size),
519 GNUNET_TIME_UNIT_FOREVER_REL, GNUNET_NO,
520 &connection_client_transmit_queued, mq);
522 else if (NULL != mq->empty_cb)
523 mq->empty_cb (mq->empty_cls);
529 connection_client_send_impl (struct GNUNET_MQ_MessageQueue *mq,
530 struct GNUNET_MQ_Message *mqm)
532 struct ClientConnectionState *state = mq->impl_state;
535 GNUNET_assert (NULL != state);
537 if (NULL != state->th)
539 GNUNET_CONTAINER_DLL_insert_tail (mq->msg_head, mq->msg_tail, mqm);
542 GNUNET_assert (NULL == mq->current_msg);
543 mq->current_msg = mqm;
544 msize = ntohs (mqm->mh->size);
546 GNUNET_CLIENT_notify_transmit_ready (state->connection, msize,
547 GNUNET_TIME_UNIT_FOREVER_REL, GNUNET_NO,
548 &connection_client_transmit_queued, mq);
553 * Type of a function to call when we receive a message
557 * @param msg message received, NULL on timeout or fatal error
560 handle_client_message (void *cls,
561 const struct GNUNET_MessageHeader *msg)
563 struct GNUNET_MQ_MessageQueue *mq = cls;
567 if (NULL == mq->read_error_cb)
568 LOG (GNUNET_ERROR_TYPE_WARNING, "ignoring read error (no handler installed)\n");
569 mq->read_error_cb (mq->read_error_cls);
572 dispatch_message (mq, msg);
576 struct GNUNET_MQ_MessageQueue *
577 GNUNET_MQ_queue_for_connection_client (struct GNUNET_CLIENT_Connection *connection,
578 const struct GNUNET_MQ_Handler *handlers,
581 struct GNUNET_MQ_MessageQueue *mq;
582 struct ClientConnectionState *state;
584 GNUNET_assert (NULL != connection);
586 mq = GNUNET_new (struct GNUNET_MQ_MessageQueue);
587 mq->handlers = handlers;
588 mq->handlers_cls = cls;
589 state = GNUNET_new (struct ClientConnectionState);
590 state->connection = connection;
591 mq->impl_state = state;
592 mq->send_impl = connection_client_send_impl;
594 if (NULL != handlers)
596 GNUNET_CLIENT_receive (connection, handle_client_message, mq,
597 GNUNET_TIME_UNIT_FOREVER_REL);
605 GNUNET_MQ_replace_handlers (struct GNUNET_MQ_MessageQueue *mq,
606 const struct GNUNET_MQ_Handler *new_handlers,
609 mq->handlers = new_handlers;
610 mq->handlers_cls = cls;
615 * Associate the assoc_data in mq with a unique request id.
617 * @param mq message queue, id will be unique for the queue
618 * @param mqm message to associate
619 * @param assoc_data to associate
622 GNUNET_MQ_assoc_add (struct GNUNET_MQ_MessageQueue *mq,
623 struct GNUNET_MQ_Message *mqm,
628 if (NULL == mq->assoc_map)
629 mq->assoc_map = GNUNET_CONTAINER_multihashmap32_create (8);
631 GNUNET_CONTAINER_multihashmap32_put (mq->assoc_map, id, assoc_data,
632 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
639 GNUNET_MQ_assoc_get (struct GNUNET_MQ_MessageQueue *mq, uint32_t request_id)
641 if (NULL == mq->assoc_map)
643 return GNUNET_CONTAINER_multihashmap32_get (mq->assoc_map, request_id);
648 GNUNET_MQ_assoc_remove (struct GNUNET_MQ_MessageQueue *mq, uint32_t request_id)
652 if (NULL == mq->assoc_map)
654 val = GNUNET_CONTAINER_multihashmap32_get (mq->assoc_map, request_id);
655 GNUNET_CONTAINER_multihashmap32_remove (mq->assoc_map, request_id, val);
661 GNUNET_MQ_notify_sent (struct GNUNET_MQ_Message *mqm,
662 GNUNET_MQ_NotifyCallback cb,
671 GNUNET_MQ_destroy (struct GNUNET_MQ_MessageQueue *mq)
673 /* FIXME: destroy all pending messages in the queue */
679 * Call a callback once all messages queued have been sent,
680 * i.e. the message queue is empty.
682 * @param mqm the message queue to send the notification for
683 * @param cb the callback to call on an empty queue
684 * @param cls closure for cb
687 GNUNET_MQ_notify_empty (struct GNUNET_MQ_MessageQueue *mqm,
688 GNUNET_MQ_NotifyCallback cb,
692 mqm->empty_cls = cls;