2 This file is part of GNUnet.
3 (C) 2012 Christian Grothoff (and other contributing authors)
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 2, or (at your
8 option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA.
22 * @author Florian Dold
24 * @brief general purpose request queue
30 #define LOG(kind,...) GNUNET_log_from (kind, "mq",__VA_ARGS__)
33 * Signature of functions implementing the
34 * sending part of a message queue
36 * @param q the message queue
37 * @param m the message
39 typedef void (*SendImpl) (struct GNUNET_MQ_MessageQueue *q, struct GNUNET_MQ_Message *m);
42 typedef void (*DestroyImpl) (struct GNUNET_MQ_MessageQueue *q);
46 * Collection of the state necessary to read and write gnunet messages
47 * to a stream socket. Should be used as closure for stream_data_processor.
49 struct MessageStreamState
51 struct GNUNET_SERVER_MessageStreamTokenizer *mst;
52 struct MessageQueue *mq;
53 struct GNUNET_STREAM_Socket *socket;
54 struct GNUNET_STREAM_ReadHandle *rh;
55 struct GNUNET_STREAM_WriteHandle *wh;
59 struct ServerClientSocketState
61 struct GNUNET_SERVER_Client *client;
62 struct GNUNET_SERVER_TransmitHandle* th;
66 struct ClientConnectionState
68 struct GNUNET_CLIENT_Connection *connection;
69 struct GNUNET_CLIENT_TransmitHandle *th;
73 struct GNUNET_MQ_MessageQueue
76 * Handlers array, or NULL if the queue should not receive messages
78 const struct GNUNET_MQ_Handler *handlers;
81 * Closure for the handler callbacks
86 * Actual implementation of message sending,
87 * called when a message is added
92 * Implementation-dependent queue destruction function
94 DestroyImpl destroy_impl;
97 * Implementation-specific state
102 * Callback will be called when the message queue is empty
104 GNUNET_MQ_NotifyCallback empty_cb;
107 * Closure for empty_cb
112 * Callback will be called when a read error occurs.
114 GNUNET_MQ_NotifyCallback read_error_cb;
117 * Closure for read_error_cb
119 void *read_error_cls;
122 * Linked list of messages pending to be sent
124 struct GNUNET_MQ_Message *msg_head;
127 * Linked list of messages pending to be sent
129 struct GNUNET_MQ_Message *msg_tail;
132 * Message that is currently scheduled to be
133 * sent. Not the head of the message queue, as the implementation
134 * needs to know if sending has been already scheduled or not.
136 struct GNUNET_MQ_Message *current_msg;
139 * Map of associations, lazily allocated
141 struct GNUNET_CONTAINER_MultiHashMap32 *assoc_map;
144 * Next id that should be used for the assoc_map,
145 * initialized lazily to a random value together with
152 struct GNUNET_MQ_Message
155 * Messages are stored in a linked list
157 struct GNUNET_MQ_Message *next;
160 * Messages are stored in a linked list
162 struct GNUNET_MQ_Message *prev;
165 * Actual allocated message header,
166 * usually points to the end of the containing GNUNET_MQ_Message
168 struct GNUNET_MessageHeader *mh;
171 * Queue the message is queued in, NULL if message is not queued.
173 struct GNUNET_MQ_MessageQueue *parent_queue;
176 * Called after the message was sent irrevokably
178 GNUNET_MQ_NotifyCallback sent_cb;
181 * Closure for send_cb
188 * Call the right callback for a message received
192 dispatch_message (struct GNUNET_MQ_MessageQueue *mq, const struct GNUNET_MessageHeader *mh)
194 const struct GNUNET_MQ_Handler *handler;
195 int handled = GNUNET_NO;
197 handler = mq->handlers;
200 for (; NULL != handler->cb; handler++)
202 if (handler->type == ntohs (mh->type))
204 handler->cb (mq->handlers_cls, mh);
205 handled = GNUNET_YES;
209 if (GNUNET_NO == handled)
210 LOG (GNUNET_ERROR_TYPE_WARNING, "no handler for message of type %d\n", ntohs (mh->type));
215 GNUNET_MQ_discard (struct GNUNET_MQ_Message *mqm)
217 GNUNET_assert (NULL == mqm->parent_queue);
223 * Send a message with the give message queue.
224 * May only be called once per message.
226 * @param mq message queue
227 * @param mqm the message to send.
230 GNUNET_MQ_send (struct GNUNET_MQ_MessageQueue *mq, struct GNUNET_MQ_Message *mqm)
232 GNUNET_assert (NULL != mq);
233 mq->send_impl (mq, mqm);
237 struct GNUNET_MQ_Message *
238 GNUNET_MQ_msg_ (struct GNUNET_MessageHeader **mhp, uint16_t size, uint16_t type)
240 struct GNUNET_MQ_Message *mqm;
242 mqm = GNUNET_malloc (sizeof *mqm + size);
243 mqm->mh = (struct GNUNET_MessageHeader *) &mqm[1];
244 mqm->mh->size = htons (size);
245 mqm->mh->type = htons (type);
253 GNUNET_MQ_nest_ (struct GNUNET_MQ_Message **mqmp,
254 const void *data, uint16_t len)
259 GNUNET_assert (NULL != mqmp);
260 /* there's no data to append => do nothing */
263 old_size = ntohs ((*mqmp)->mh->size);
264 /* message too large to concatenate? */
265 if (((uint16_t) (old_size + len)) < len)
266 return GNUNET_SYSERR;
267 new_size = old_size + len;
268 *mqmp = GNUNET_realloc (*mqmp, sizeof (struct GNUNET_MQ_Message) + new_size);
269 (*mqmp)->mh = (struct GNUNET_MessageHeader *) &(*mqmp)[1];
270 memcpy (((void *) (*mqmp)->mh) + old_size, data, new_size - old_size);
271 (*mqmp)->mh->size = htons (new_size);
277 * Functions of this signature are called whenever writing operations
278 * on a stream are executed
280 * @param cls the closure from GNUNET_STREAM_write
281 * @param status the status of the stream at the time this function is called;
282 * GNUNET_STREAM_OK if writing to stream was completed successfully;
283 * GNUNET_STREAM_TIMEOUT if the given data is not sent successfully
284 * (this doesn't mean that the data is never sent, the receiver may
285 * have read the data but its ACKs may have been lost);
286 * GNUNET_STREAM_SHUTDOWN if the stream is shutdown for writing in the
287 * mean time; GNUNET_STREAM_SYSERR if the stream is broken and cannot
289 * @param size the number of bytes written
292 stream_write_queued (void *cls, enum GNUNET_STREAM_Status status, size_t size)
294 struct GNUNET_MQ_MessageQueue *mq = cls;
295 struct MessageStreamState *mss = (struct MessageStreamState *) mq->impl_state;
296 struct GNUNET_MQ_Message *mqm;
298 GNUNET_assert (GNUNET_STREAM_OK == status);
300 /* call cb for message we finished sending */
301 mqm = mq->current_msg;
302 GNUNET_assert (NULL != mq->current_msg);
303 if (NULL != mqm->sent_cb)
304 mqm->sent_cb (mqm->sent_cls);
310 mq->current_msg = mqm;
313 if (NULL != mq->empty_cb)
314 mq->empty_cb (mq->empty_cls);
317 GNUNET_CONTAINER_DLL_remove (mq->msg_head, mq->msg_tail, mqm);
318 mss->wh = GNUNET_STREAM_write (mss->socket, mqm->mh, ntohs (mqm->mh->size),
319 GNUNET_TIME_UNIT_FOREVER_REL,
320 stream_write_queued, mq);
321 GNUNET_assert (NULL != mss->wh);
326 stream_socket_send_impl (struct GNUNET_MQ_MessageQueue *mq,
327 struct GNUNET_MQ_Message *mqm)
329 struct MessageStreamState *mss = (struct MessageStreamState *) mq->impl_state;
330 if (NULL != mq->current_msg)
332 GNUNET_CONTAINER_DLL_insert_tail (mq->msg_head, mq->msg_tail, mqm);
335 mq->current_msg = mqm;
336 mss->wh = GNUNET_STREAM_write (mss->socket, mqm->mh, ntohs (mqm->mh->size),
337 GNUNET_TIME_UNIT_FOREVER_REL,
338 stream_write_queued, mq);
343 * Functions with this signature are called whenever a
344 * complete message is received by the tokenizer.
346 * Do not call GNUNET_SERVER_mst_destroy in callback
349 * @param client identification of the client
350 * @param message the actual message
352 * @return GNUNET_OK on success, GNUNET_SYSERR to stop further processing
355 stream_mst_callback (void *cls, void *client,
356 const struct GNUNET_MessageHeader *message)
358 struct GNUNET_MQ_MessageQueue *mq = cls;
360 GNUNET_assert (NULL != message);
361 dispatch_message (mq, message);
367 * Functions of this signature are called whenever data is available from the
370 * @param cls the closure from GNUNET_STREAM_read
371 * @param status the status of the stream at the time this function is called
372 * @param data traffic from the other side
373 * @param size the number of bytes available in data read; will be 0 on timeout
374 * @return number of bytes of processed from 'data' (any data remaining should be
375 * given to the next time the read processor is called).
378 stream_data_processor (void *cls,
379 enum GNUNET_STREAM_Status status,
383 struct GNUNET_MQ_MessageQueue *mq = cls;
384 struct MessageStreamState *mss;
387 mss = (struct MessageStreamState *) mq->impl_state;
388 GNUNET_assert (GNUNET_STREAM_OK == status);
389 ret = GNUNET_SERVER_mst_receive (mss->mst, NULL, data, size, GNUNET_NO, GNUNET_NO);
390 GNUNET_assert (GNUNET_OK == ret);
391 /* we always read all data */
392 mss->rh = GNUNET_STREAM_read (mss->socket, GNUNET_TIME_UNIT_FOREVER_REL,
393 stream_data_processor, mq);
399 stream_socket_destroy_impl (struct GNUNET_MQ_MessageQueue *mq)
401 struct MessageStreamState *mss = (struct MessageStreamState *) mq->impl_state;
405 GNUNET_STREAM_read_cancel (mss->rh);
411 GNUNET_STREAM_write_cancel (mss->wh);
415 if (NULL != mss->mst)
417 GNUNET_SERVER_mst_destroy (mss->mst);
427 struct GNUNET_MQ_MessageQueue *
428 GNUNET_MQ_queue_for_stream_socket (struct GNUNET_STREAM_Socket *socket,
429 const struct GNUNET_MQ_Handler *handlers,
432 struct GNUNET_MQ_MessageQueue *mq;
433 struct MessageStreamState *mss;
435 mq = GNUNET_new (struct GNUNET_MQ_MessageQueue);
436 mss = GNUNET_new (struct MessageStreamState);
437 mss->socket = socket;
438 mq->impl_state = mss;
439 mq->send_impl = stream_socket_send_impl;
440 mq->destroy_impl = &stream_socket_destroy_impl;
441 mq->handlers = handlers;
442 mq->handlers_cls = cls;
443 if (NULL != handlers)
445 mss->mst = GNUNET_SERVER_mst_create (stream_mst_callback, mq);
446 mss->rh = GNUNET_STREAM_read (socket, GNUNET_TIME_UNIT_FOREVER_REL,
447 stream_data_processor, mq);
453 /*** Transmit a queued message to the session's client.
455 * @param cls consensus session
456 * @param size number of bytes available in buf
457 * @param buf where the callee should write the message
458 * @return number of bytes written to buf
461 transmit_queued (void *cls, size_t size,
464 struct GNUNET_MQ_MessageQueue *mq = cls;
465 struct GNUNET_MQ_Message *mqm = mq->current_msg;
466 struct ServerClientSocketState *state = mq->impl_state;
469 GNUNET_assert (NULL != buf);
471 if (NULL != mqm->sent_cb)
473 mqm->sent_cb (mqm->sent_cls);
476 mq->current_msg = NULL;
477 GNUNET_assert (NULL != mqm);
478 msg_size = ntohs (mqm->mh->size);
479 GNUNET_assert (size >= msg_size);
480 memcpy (buf, mqm->mh, msg_size);
484 if (NULL != mq->msg_head)
486 mq->current_msg = mq->msg_head;
487 GNUNET_CONTAINER_DLL_remove (mq->msg_head, mq->msg_tail, mq->current_msg);
489 GNUNET_SERVER_notify_transmit_ready (state->client, msg_size,
490 GNUNET_TIME_UNIT_FOREVER_REL,
491 &transmit_queued, mq);
493 else if (NULL != mq->empty_cb)
494 mq->empty_cb (mq->empty_cls);
501 server_client_destroy_impl (struct GNUNET_MQ_MessageQueue *mq)
503 struct ServerClientSocketState *state;
505 GNUNET_assert (NULL != mq);
506 state = mq->impl_state;
507 GNUNET_assert (NULL != state);
508 GNUNET_SERVER_client_drop (state->client);
513 server_client_send_impl (struct GNUNET_MQ_MessageQueue *mq, struct GNUNET_MQ_Message *mqm)
515 struct ServerClientSocketState *state;
518 GNUNET_assert (NULL != mq);
519 state = mq->impl_state;
520 GNUNET_assert (NULL != state);
522 if (NULL != state->th)
524 GNUNET_CONTAINER_DLL_insert_tail (mq->msg_head, mq->msg_tail, mqm);
527 GNUNET_assert (NULL == mq->msg_head);
528 GNUNET_assert (NULL == mq->current_msg);
529 msize = ntohs (mqm->mh->size);
530 mq->current_msg = mqm;
532 GNUNET_SERVER_notify_transmit_ready (state->client, msize,
533 GNUNET_TIME_UNIT_FOREVER_REL,
534 &transmit_queued, mq);
538 struct GNUNET_MQ_MessageQueue *
539 GNUNET_MQ_queue_for_server_client (struct GNUNET_SERVER_Client *client)
541 struct GNUNET_MQ_MessageQueue *mq;
542 struct ServerClientSocketState *scss;
544 mq = GNUNET_new (struct GNUNET_MQ_MessageQueue);
545 scss = GNUNET_new (struct ServerClientSocketState);
546 mq->impl_state = scss;
547 scss->client = client;
548 GNUNET_SERVER_client_keep (client);
549 mq->send_impl = server_client_send_impl;
550 mq->destroy_impl = server_client_destroy_impl;
556 * Transmit a queued message to the session's client.
558 * @param cls consensus session
559 * @param size number of bytes available in buf
560 * @param buf where the callee should write the message
561 * @return number of bytes written to buf
564 connection_client_transmit_queued (void *cls, size_t size,
567 struct GNUNET_MQ_MessageQueue *mq = cls;
568 struct GNUNET_MQ_Message *mqm = mq->current_msg;
569 struct ClientConnectionState *state = mq->impl_state;
573 GNUNET_assert (NULL != mqm);
575 if (NULL != mqm->sent_cb)
577 mqm->sent_cb (mqm->sent_cls);
580 mq->current_msg = NULL;
581 GNUNET_assert (NULL != buf);
582 msg_size = ntohs (mqm->mh->size);
583 GNUNET_assert (size >= msg_size);
584 memcpy (buf, mqm->mh, msg_size);
587 if (NULL != mq->msg_head)
589 mq->current_msg = mq->msg_head;
590 GNUNET_CONTAINER_DLL_remove (mq->msg_head, mq->msg_tail, mq->current_msg);
592 GNUNET_CLIENT_notify_transmit_ready (state->connection, ntohs (mq->current_msg->mh->size),
593 GNUNET_TIME_UNIT_FOREVER_REL, GNUNET_NO,
594 &connection_client_transmit_queued, mq);
596 else if (NULL != mq->empty_cb)
597 mq->empty_cb (mq->empty_cls);
604 connection_client_destroy_impl (struct GNUNET_MQ_MessageQueue *mq)
606 GNUNET_free (mq->impl_state);
610 connection_client_send_impl (struct GNUNET_MQ_MessageQueue *mq,
611 struct GNUNET_MQ_Message *mqm)
613 struct ClientConnectionState *state = mq->impl_state;
616 GNUNET_assert (NULL != state);
618 if (NULL != state->th)
620 GNUNET_CONTAINER_DLL_insert_tail (mq->msg_head, mq->msg_tail, mqm);
623 GNUNET_assert (NULL == mq->current_msg);
624 mq->current_msg = mqm;
625 msize = ntohs (mqm->mh->size);
627 GNUNET_CLIENT_notify_transmit_ready (state->connection, msize,
628 GNUNET_TIME_UNIT_FOREVER_REL, GNUNET_NO,
629 &connection_client_transmit_queued, mq);
635 * Type of a function to call when we receive a message
639 * @param msg message received, NULL on timeout or fatal error
642 handle_client_message (void *cls,
643 const struct GNUNET_MessageHeader *msg)
645 struct GNUNET_MQ_MessageQueue *mq = cls;
646 struct ClientConnectionState *state;
648 state = mq->impl_state;
652 if (NULL == mq->read_error_cb)
653 LOG (GNUNET_ERROR_TYPE_WARNING, "ignoring read error (no handler installed)\n");
654 mq->read_error_cb (mq->read_error_cls);
658 GNUNET_CLIENT_receive (state->connection, handle_client_message, mq,
659 GNUNET_TIME_UNIT_FOREVER_REL);
661 dispatch_message (mq, msg);
665 struct GNUNET_MQ_MessageQueue *
666 GNUNET_MQ_queue_for_connection_client (struct GNUNET_CLIENT_Connection *connection,
667 const struct GNUNET_MQ_Handler *handlers,
670 struct GNUNET_MQ_MessageQueue *mq;
671 struct ClientConnectionState *state;
673 GNUNET_assert (NULL != connection);
675 mq = GNUNET_new (struct GNUNET_MQ_MessageQueue);
676 mq->handlers = handlers;
677 mq->handlers_cls = cls;
678 state = GNUNET_new (struct ClientConnectionState);
679 state->connection = connection;
680 mq->impl_state = state;
681 mq->send_impl = connection_client_send_impl;
682 mq->destroy_impl = connection_client_destroy_impl;
684 if (NULL != handlers)
686 GNUNET_CLIENT_receive (connection, handle_client_message, mq,
687 GNUNET_TIME_UNIT_FOREVER_REL);
695 GNUNET_MQ_replace_handlers (struct GNUNET_MQ_MessageQueue *mq,
696 const struct GNUNET_MQ_Handler *new_handlers,
699 mq->handlers = new_handlers;
700 mq->handlers_cls = cls;
705 * Associate the assoc_data in mq with a unique request id.
707 * @param mq message queue, id will be unique for the queue
708 * @param mqm message to associate
709 * @param assoc_data to associate
712 GNUNET_MQ_assoc_add (struct GNUNET_MQ_MessageQueue *mq,
713 struct GNUNET_MQ_Message *mqm,
718 if (NULL == mq->assoc_map)
720 mq->assoc_map = GNUNET_CONTAINER_multihashmap32_create (8);
724 GNUNET_CONTAINER_multihashmap32_put (mq->assoc_map, id, assoc_data,
725 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
732 GNUNET_MQ_assoc_get (struct GNUNET_MQ_MessageQueue *mq, uint32_t request_id)
734 if (NULL == mq->assoc_map)
736 return GNUNET_CONTAINER_multihashmap32_get (mq->assoc_map, request_id);
741 GNUNET_MQ_assoc_remove (struct GNUNET_MQ_MessageQueue *mq, uint32_t request_id)
745 if (NULL == mq->assoc_map)
747 val = GNUNET_CONTAINER_multihashmap32_get (mq->assoc_map, request_id);
748 GNUNET_assert (NULL != val);
749 GNUNET_CONTAINER_multihashmap32_remove (mq->assoc_map, request_id, val);
755 GNUNET_MQ_notify_sent (struct GNUNET_MQ_Message *mqm,
756 GNUNET_MQ_NotifyCallback cb,
765 GNUNET_MQ_destroy (struct GNUNET_MQ_MessageQueue *mq)
767 /* FIXME: destroy all pending messages in the queue */
769 if (NULL != mq->destroy_impl)
771 mq->destroy_impl (mq);
779 * Call a callback once all messages queued have been sent,
780 * i.e. the message queue is empty.
782 * @param mqm the message queue to send the notification for
783 * @param cb the callback to call on an empty queue
784 * @param cls closure for cb
787 GNUNET_MQ_notify_empty (struct GNUNET_MQ_MessageQueue *mqm,
788 GNUNET_MQ_NotifyCallback cb,
792 mqm->empty_cls = cls;