2 This file is part of GNUnet.
3 (C) 2012 Christian Grothoff (and other contributing authors)
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 2, or (at your
8 option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA.
22 * @author Florian Dold
24 * @brief general purpose request queue
28 #include "gnunet_common.h"
29 #include "gnunet_util_lib.h"
31 #define LOG(kind,...) GNUNET_log_from (kind, "mq",__VA_ARGS__)
/**
 * Implementation state of a message queue that transmits to a
 * client of one of our servers.
 */
struct ServerClientSocketState
{
  /**
   * Client the queued messages are transmitted to.
   */
  struct GNUNET_SERVER_Client *client;

  /**
   * Handle for the transmission request; the send path treats a
   * non-NULL value as "a transmission is already pending" and only
   * enqueues further messages.
   */
  struct GNUNET_SERVER_TransmitHandle *th;
};
/**
 * Implementation state of a message queue that talks to a service
 * through a client connection.
 */
struct ClientConnectionState
{
  /**
   * Did we call receive?  Compared against GNUNET_NO and set to
   * GNUNET_YES by the transmit callback once it has issued the
   * receive request.
   */
  int receive_active;

  /**
   * Connection used for both sending and receiving.
   */
  struct GNUNET_CLIENT_Connection *connection;

  /**
   * Handle for the transmission request; the send path treats a
   * non-NULL value as "a transmission is already pending" and only
   * enqueues further messages.
   */
  struct GNUNET_CLIENT_TransmitHandle *th;
};
56 * Call the right callback for a message.
58 * @param mq message queue with the handlers
59 * @param mh message to dispatch
62 GNUNET_MQ_dispatch (struct GNUNET_MQ_MessageQueue *mq, const struct GNUNET_MessageHeader *mh)
64 const struct GNUNET_MQ_Handler *handler;
65 int handled = GNUNET_NO;
67 handler = mq->handlers;
70 for (; NULL != handler->cb; handler++)
72 if (handler->type == ntohs (mh->type))
74 handler->cb (mq->handlers_cls, mh);
79 if (GNUNET_NO == handled)
80 LOG (GNUNET_ERROR_TYPE_WARNING, "no handler for message of type %d\n", ntohs (mh->type));
85 GNUNET_MQ_discard (struct GNUNET_MQ_Message *mqm)
87 GNUNET_assert (NULL == mqm->parent_queue);
93 * Send a message with the give message queue.
94 * May only be called once per message.
96 * @param mq message queue
97 * @param mqm the message to send.
100 GNUNET_MQ_send (struct GNUNET_MQ_MessageQueue *mq, struct GNUNET_MQ_Message *mqm)
102 GNUNET_assert (NULL != mq);
103 mq->send_impl (mq, mqm);
107 struct GNUNET_MQ_Message *
108 GNUNET_MQ_msg_ (struct GNUNET_MessageHeader **mhp, uint16_t size, uint16_t type)
110 struct GNUNET_MQ_Message *mqm;
112 mqm = GNUNET_malloc (sizeof *mqm + size);
113 mqm->mh = (struct GNUNET_MessageHeader *) &mqm[1];
114 mqm->mh->size = htons (size);
115 mqm->mh->type = htons (type);
123 GNUNET_MQ_nest_ (struct GNUNET_MQ_Message **mqmp,
124 const void *data, uint16_t len)
129 GNUNET_assert (NULL != mqmp);
130 /* there's no data to append => do nothing */
133 old_size = ntohs ((*mqmp)->mh->size);
134 /* message too large to concatenate? */
135 if (((uint16_t) (old_size + len)) < len)
136 return GNUNET_SYSERR;
137 new_size = old_size + len;
138 *mqmp = GNUNET_realloc (*mqmp, sizeof (struct GNUNET_MQ_Message) + new_size);
139 (*mqmp)->mh = (struct GNUNET_MessageHeader *) &(*mqmp)[1];
140 memcpy (((void *) (*mqmp)->mh) + old_size, data, new_size - old_size);
141 (*mqmp)->mh->size = htons (new_size);
148 /*** Transmit a queued message to the session's client.
150 * @param cls consensus session
151 * @param size number of bytes available in buf
152 * @param buf where the callee should write the message
153 * @return number of bytes written to buf
156 transmit_queued (void *cls, size_t size,
159 struct GNUNET_MQ_MessageQueue *mq = cls;
160 struct GNUNET_MQ_Message *mqm = mq->current_msg;
161 struct ServerClientSocketState *state = mq->impl_state;
164 GNUNET_assert (NULL != buf);
166 if (NULL != mqm->sent_cb)
168 mqm->sent_cb (mqm->sent_cls);
171 mq->current_msg = NULL;
172 GNUNET_assert (NULL != mqm);
173 msg_size = ntohs (mqm->mh->size);
174 GNUNET_assert (size >= msg_size);
175 memcpy (buf, mqm->mh, msg_size);
179 if (NULL != mq->msg_head)
181 mq->current_msg = mq->msg_head;
182 GNUNET_CONTAINER_DLL_remove (mq->msg_head, mq->msg_tail, mq->current_msg);
184 GNUNET_SERVER_notify_transmit_ready (state->client, msg_size,
185 GNUNET_TIME_UNIT_FOREVER_REL,
186 &transmit_queued, mq);
194 server_client_destroy_impl (struct GNUNET_MQ_MessageQueue *mq)
196 struct ServerClientSocketState *state;
198 GNUNET_assert (NULL != mq);
199 state = mq->impl_state;
200 GNUNET_assert (NULL != state);
201 GNUNET_SERVER_client_drop (state->client);
206 server_client_send_impl (struct GNUNET_MQ_MessageQueue *mq, struct GNUNET_MQ_Message *mqm)
208 struct ServerClientSocketState *state;
211 GNUNET_assert (NULL != mq);
212 state = mq->impl_state;
213 GNUNET_assert (NULL != state);
215 if (NULL != state->th)
217 GNUNET_CONTAINER_DLL_insert_tail (mq->msg_head, mq->msg_tail, mqm);
220 GNUNET_assert (NULL == mq->msg_head);
221 GNUNET_assert (NULL == mq->current_msg);
222 msize = ntohs (mqm->mh->size);
223 mq->current_msg = mqm;
225 GNUNET_SERVER_notify_transmit_ready (state->client, msize,
226 GNUNET_TIME_UNIT_FOREVER_REL,
227 &transmit_queued, mq);
231 struct GNUNET_MQ_MessageQueue *
232 GNUNET_MQ_queue_for_server_client (struct GNUNET_SERVER_Client *client)
234 struct GNUNET_MQ_MessageQueue *mq;
235 struct ServerClientSocketState *scss;
237 mq = GNUNET_new (struct GNUNET_MQ_MessageQueue);
238 scss = GNUNET_new (struct ServerClientSocketState);
239 mq->impl_state = scss;
240 scss->client = client;
241 GNUNET_SERVER_client_keep (client);
242 mq->send_impl = server_client_send_impl;
243 mq->destroy_impl = server_client_destroy_impl;
249 * Type of a function to call when we receive a message
253 * @param msg message received, NULL on timeout or fatal error
256 handle_client_message (void *cls,
257 const struct GNUNET_MessageHeader *msg)
259 struct GNUNET_MQ_MessageQueue *mq = cls;
260 struct ClientConnectionState *state;
262 state = mq->impl_state;
266 if (NULL == mq->error_handler)
267 LOG (GNUNET_ERROR_TYPE_WARNING, "ignoring read error (no handler installed)\n");
268 mq->error_handler (mq->handlers_cls, GNUNET_MQ_ERROR_READ);
272 GNUNET_CLIENT_receive (state->connection, handle_client_message, mq,
273 GNUNET_TIME_UNIT_FOREVER_REL);
275 GNUNET_MQ_dispatch (mq, msg);
280 * Transmit a queued message to the session's client.
282 * @param cls consensus session
283 * @param size number of bytes available in buf
284 * @param buf where the callee should write the message
285 * @return number of bytes written to buf
288 connection_client_transmit_queued (void *cls, size_t size,
291 struct GNUNET_MQ_MessageQueue *mq = cls;
292 struct GNUNET_MQ_Message *mqm = mq->current_msg;
293 struct ClientConnectionState *state = mq->impl_state;
298 if (NULL == mq->error_handler)
300 LOG (GNUNET_ERROR_TYPE_WARNING, "read error, but no error handler installed\n");
303 mq->error_handler (mq->handlers_cls, GNUNET_MQ_ERROR_READ);
307 if ((NULL != mq->handlers) && (GNUNET_NO == state->receive_active))
309 state->receive_active = GNUNET_YES;
310 GNUNET_CLIENT_receive (state->connection, handle_client_message, mq,
311 GNUNET_TIME_UNIT_FOREVER_REL);
315 GNUNET_assert (NULL != mqm);
317 if (NULL != mqm->sent_cb)
319 mqm->sent_cb (mqm->sent_cls);
322 mq->current_msg = NULL;
323 GNUNET_assert (NULL != buf);
324 msg_size = ntohs (mqm->mh->size);
325 GNUNET_assert (size >= msg_size);
326 memcpy (buf, mqm->mh, msg_size);
329 if (NULL != mq->msg_head)
331 mq->current_msg = mq->msg_head;
332 GNUNET_CONTAINER_DLL_remove (mq->msg_head, mq->msg_tail, mq->current_msg);
334 GNUNET_CLIENT_notify_transmit_ready (state->connection, ntohs (mq->current_msg->mh->size),
335 GNUNET_TIME_UNIT_FOREVER_REL, GNUNET_NO,
336 &connection_client_transmit_queued, mq);
344 connection_client_destroy_impl (struct GNUNET_MQ_MessageQueue *mq)
346 GNUNET_free (mq->impl_state);
350 connection_client_send_impl (struct GNUNET_MQ_MessageQueue *mq,
351 struct GNUNET_MQ_Message *mqm)
353 struct ClientConnectionState *state = mq->impl_state;
356 GNUNET_assert (NULL != state);
358 if (NULL != state->th)
360 GNUNET_CONTAINER_DLL_insert_tail (mq->msg_head, mq->msg_tail, mqm);
363 GNUNET_assert (NULL == mq->current_msg);
364 mq->current_msg = mqm;
365 msize = ntohs (mqm->mh->size);
367 GNUNET_CLIENT_notify_transmit_ready (state->connection, msize,
368 GNUNET_TIME_UNIT_FOREVER_REL, GNUNET_NO,
369 &connection_client_transmit_queued, mq);
376 struct GNUNET_MQ_MessageQueue *
377 GNUNET_MQ_queue_for_connection_client (struct GNUNET_CLIENT_Connection *connection,
378 const struct GNUNET_MQ_Handler *handlers,
381 struct GNUNET_MQ_MessageQueue *mq;
382 struct ClientConnectionState *state;
384 GNUNET_assert (NULL != connection);
386 mq = GNUNET_new (struct GNUNET_MQ_MessageQueue);
387 mq->handlers = handlers;
388 mq->handlers_cls = cls;
389 state = GNUNET_new (struct ClientConnectionState);
390 state->connection = connection;
391 mq->impl_state = state;
392 mq->send_impl = connection_client_send_impl;
393 mq->destroy_impl = connection_client_destroy_impl;
400 GNUNET_MQ_replace_handlers (struct GNUNET_MQ_MessageQueue *mq,
401 const struct GNUNET_MQ_Handler *new_handlers,
404 mq->handlers = new_handlers;
405 mq->handlers_cls = cls;
410 * Associate the assoc_data in mq with a unique request id.
412 * @param mq message queue, id will be unique for the queue
413 * @param mqm message to associate
414 * @param assoc_data to associate
417 GNUNET_MQ_assoc_add (struct GNUNET_MQ_MessageQueue *mq,
418 struct GNUNET_MQ_Message *mqm,
423 if (NULL == mq->assoc_map)
425 mq->assoc_map = GNUNET_CONTAINER_multihashmap32_create (8);
429 GNUNET_CONTAINER_multihashmap32_put (mq->assoc_map, id, assoc_data,
430 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
437 GNUNET_MQ_assoc_get (struct GNUNET_MQ_MessageQueue *mq, uint32_t request_id)
439 if (NULL == mq->assoc_map)
441 return GNUNET_CONTAINER_multihashmap32_get (mq->assoc_map, request_id);
446 GNUNET_MQ_assoc_remove (struct GNUNET_MQ_MessageQueue *mq, uint32_t request_id)
450 if (NULL == mq->assoc_map)
452 val = GNUNET_CONTAINER_multihashmap32_get (mq->assoc_map, request_id);
453 GNUNET_assert (NULL != val);
454 GNUNET_CONTAINER_multihashmap32_remove (mq->assoc_map, request_id, val);
460 GNUNET_MQ_notify_sent (struct GNUNET_MQ_Message *mqm,
461 GNUNET_MQ_NotifyCallback cb,
470 GNUNET_MQ_destroy (struct GNUNET_MQ_MessageQueue *mq)
472 /* FIXME: destroy all pending messages in the queue */
474 if (NULL != mq->destroy_impl)
476 mq->destroy_impl (mq);