/*
     This file is part of GNUnet.
     (C) 2012 Christian Grothoff (and other contributing authors)

     GNUnet is free software; you can redistribute it and/or modify
     it under the terms of the GNU General Public License as published
     by the Free Software Foundation; either version 2, or (at your
     option) any later version.

     GNUnet is distributed in the hope that it will be useful, but
     WITHOUT ANY WARRANTY; without even the implied warranty of
     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
     General Public License for more details.

     You should have received a copy of the GNU General Public License
     along with GNUnet; see the file COPYING.  If not, write to the
     Free Software Foundation, Inc., 59 Temple Place - Suite 330,
     Boston, MA 02111-1307, USA.
*/
/**
 * @author Florian Dold
 * @brief general purpose request queue
 */
28 #include "gnunet_common.h"
29 #include "gnunet_util_lib.h"
/* Log a message under the "mq" component. */
#define LOG(kind, ...) GNUNET_log_from (kind, "mq", __VA_ARGS__)
/**
 * Implementation state of a message queue that transmits to a
 * client of a server.
 */
struct ServerClientSocketState
{
  /**
   * Client the queue transmits to.
   */
  struct GNUNET_SERVER_Client *client;

  /**
   * Handle of the pending transmit request; the send function
   * treats a non-NULL value as "a transmission is in progress".
   */
  struct GNUNET_SERVER_TransmitHandle *th;
};
/**
 * Implementation state of a message queue that runs on top of a
 * client connection.
 */
struct ClientConnectionState
{
  /**
   * Did we call receive?
   * Holds GNUNET_YES or GNUNET_NO.
   * NOTE(review): this member is used by the transmit callback but
   * its declaration is absent from the damaged listing; the type
   * 'int' is an assumption -- confirm against upstream.
   */
  int receive_active;

  /**
   * Connection messages are sent on and received from.
   */
  struct GNUNET_CLIENT_Connection *connection;

  /**
   * Handle of the pending transmit request; the send function
   * treats a non-NULL value as "a transmission is in progress".
   */
  struct GNUNET_CLIENT_TransmitHandle *th;
};
56 * Call the right callback for a message.
58 * @param mq message queue with the handlers
59 * @param mh message to dispatch
62 GNUNET_MQ_dispatch (struct GNUNET_MQ_MessageQueue *mq, const struct GNUNET_MessageHeader *mh)
64 const struct GNUNET_MQ_Handler *handler;
65 int handled = GNUNET_NO;
67 handler = mq->handlers;
70 for (; NULL != handler->cb; handler++)
72 if (handler->type == ntohs (mh->type))
74 handler->cb (mq->handlers_cls, mh);
79 if (GNUNET_NO == handled)
80 LOG (GNUNET_ERROR_TYPE_WARNING, "no handler for message of type %d\n", ntohs (mh->type));
85 GNUNET_MQ_discard (struct GNUNET_MQ_Message *mqm)
87 GNUNET_assert (NULL == mqm->parent_queue);
93 * Send a message with the give message queue.
94 * May only be called once per message.
96 * @param mq message queue
97 * @param mqm the message to send.
100 GNUNET_MQ_send (struct GNUNET_MQ_MessageQueue *mq, struct GNUNET_MQ_Message *mqm)
102 GNUNET_assert (NULL != mq);
103 mq->send_impl (mq, mqm);
107 struct GNUNET_MQ_Message *
108 GNUNET_MQ_msg_ (struct GNUNET_MessageHeader **mhp, uint16_t size, uint16_t type)
110 struct GNUNET_MQ_Message *mqm;
112 mqm = GNUNET_malloc (sizeof *mqm + size);
113 mqm->mh = (struct GNUNET_MessageHeader *) &mqm[1];
114 mqm->mh->size = htons (size);
115 mqm->mh->type = htons (type);
122 struct GNUNET_MQ_Message *
123 GNUNET_MQ_msg_nested_mh_ (struct GNUNET_MessageHeader **mhp, uint16_t base_size, uint16_t type,
124 const struct GNUNET_MessageHeader *nested_mh)
126 struct GNUNET_MQ_Message *mqm;
129 if (NULL == nested_mh)
130 return GNUNET_MQ_msg_ (mhp, base_size, type);
132 size = base_size + ntohs (nested_mh->size);
134 /* check for uint16_t overflow */
135 if (size < base_size)
138 mqm = GNUNET_MQ_msg_ (mhp, size, type);
139 memcpy ((char *) mqm->mh + base_size, nested_mh, ntohs (nested_mh->size));
146 * Transmit a queued message to the session's client.
148 * @param cls consensus session
149 * @param size number of bytes available in buf
150 * @param buf where the callee should write the message
151 * @return number of bytes written to buf
154 transmit_queued (void *cls, size_t size,
157 struct GNUNET_MQ_MessageQueue *mq = cls;
158 struct GNUNET_MQ_Message *mqm = mq->current_msg;
159 struct ServerClientSocketState *state = mq->impl_state;
162 GNUNET_assert (NULL != buf);
164 if (NULL != mqm->sent_cb)
166 mqm->sent_cb (mqm->sent_cls);
169 mq->current_msg = NULL;
170 GNUNET_assert (NULL != mqm);
171 msg_size = ntohs (mqm->mh->size);
172 GNUNET_assert (size >= msg_size);
173 memcpy (buf, mqm->mh, msg_size);
177 if (NULL != mq->msg_head)
179 mq->current_msg = mq->msg_head;
180 GNUNET_CONTAINER_DLL_remove (mq->msg_head, mq->msg_tail, mq->current_msg);
182 GNUNET_SERVER_notify_transmit_ready (state->client, msg_size,
183 GNUNET_TIME_UNIT_FOREVER_REL,
184 &transmit_queued, mq);
192 server_client_destroy_impl (struct GNUNET_MQ_MessageQueue *mq)
194 struct ServerClientSocketState *state;
196 GNUNET_assert (NULL != mq);
197 state = mq->impl_state;
198 GNUNET_assert (NULL != state);
199 GNUNET_SERVER_client_drop (state->client);
204 server_client_send_impl (struct GNUNET_MQ_MessageQueue *mq, struct GNUNET_MQ_Message *mqm)
206 struct ServerClientSocketState *state;
209 GNUNET_assert (NULL != mq);
210 state = mq->impl_state;
211 GNUNET_assert (NULL != state);
213 if (NULL != state->th)
215 GNUNET_CONTAINER_DLL_insert_tail (mq->msg_head, mq->msg_tail, mqm);
218 GNUNET_assert (NULL == mq->msg_head);
219 GNUNET_assert (NULL == mq->current_msg);
220 msize = ntohs (mqm->mh->size);
221 mq->current_msg = mqm;
223 GNUNET_SERVER_notify_transmit_ready (state->client, msize,
224 GNUNET_TIME_UNIT_FOREVER_REL,
225 &transmit_queued, mq);
229 struct GNUNET_MQ_MessageQueue *
230 GNUNET_MQ_queue_for_server_client (struct GNUNET_SERVER_Client *client)
232 struct GNUNET_MQ_MessageQueue *mq;
233 struct ServerClientSocketState *scss;
235 mq = GNUNET_new (struct GNUNET_MQ_MessageQueue);
236 scss = GNUNET_new (struct ServerClientSocketState);
237 mq->impl_state = scss;
238 scss->client = client;
239 GNUNET_SERVER_client_keep (client);
240 mq->send_impl = server_client_send_impl;
241 mq->destroy_impl = server_client_destroy_impl;
247 * Type of a function to call when we receive a message
251 * @param msg message received, NULL on timeout or fatal error
254 handle_client_message (void *cls,
255 const struct GNUNET_MessageHeader *msg)
257 struct GNUNET_MQ_MessageQueue *mq = cls;
258 struct ClientConnectionState *state;
260 state = mq->impl_state;
264 if (NULL == mq->error_handler)
265 LOG (GNUNET_ERROR_TYPE_WARNING, "ignoring read error (no handler installed)\n");
267 mq->error_handler (mq->handlers_cls, GNUNET_MQ_ERROR_READ);
271 GNUNET_CLIENT_receive (state->connection, handle_client_message, mq,
272 GNUNET_TIME_UNIT_FOREVER_REL);
274 GNUNET_MQ_dispatch (mq, msg);
279 * Transmit a queued message to the session's client.
281 * @param cls consensus session
282 * @param size number of bytes available in buf
283 * @param buf where the callee should write the message
284 * @return number of bytes written to buf
287 connection_client_transmit_queued (void *cls, size_t size,
290 struct GNUNET_MQ_MessageQueue *mq = cls;
291 struct GNUNET_MQ_Message *mqm = mq->current_msg;
292 struct ClientConnectionState *state = mq->impl_state;
297 if (NULL == mq->error_handler)
299 LOG (GNUNET_ERROR_TYPE_WARNING, "read error, but no error handler installed\n");
302 mq->error_handler (mq->handlers_cls, GNUNET_MQ_ERROR_READ);
306 if ((NULL != mq->handlers) && (GNUNET_NO == state->receive_active))
308 state->receive_active = GNUNET_YES;
309 GNUNET_CLIENT_receive (state->connection, handle_client_message, mq,
310 GNUNET_TIME_UNIT_FOREVER_REL);
314 GNUNET_assert (NULL != mqm);
316 if (NULL != mqm->sent_cb)
318 mqm->sent_cb (mqm->sent_cls);
321 mq->current_msg = NULL;
322 GNUNET_assert (NULL != buf);
323 msg_size = ntohs (mqm->mh->size);
324 GNUNET_assert (size >= msg_size);
325 memcpy (buf, mqm->mh, msg_size);
328 if (NULL != mq->msg_head)
330 mq->current_msg = mq->msg_head;
331 GNUNET_CONTAINER_DLL_remove (mq->msg_head, mq->msg_tail, mq->current_msg);
333 GNUNET_CLIENT_notify_transmit_ready (state->connection, ntohs (mq->current_msg->mh->size),
334 GNUNET_TIME_UNIT_FOREVER_REL, GNUNET_NO,
335 &connection_client_transmit_queued, mq);
343 connection_client_destroy_impl (struct GNUNET_MQ_MessageQueue *mq)
345 GNUNET_free (mq->impl_state);
349 connection_client_send_impl (struct GNUNET_MQ_MessageQueue *mq,
350 struct GNUNET_MQ_Message *mqm)
352 struct ClientConnectionState *state = mq->impl_state;
355 GNUNET_assert (NULL != state);
357 if (NULL != state->th)
359 GNUNET_CONTAINER_DLL_insert_tail (mq->msg_head, mq->msg_tail, mqm);
362 GNUNET_assert (NULL == mq->current_msg);
363 mq->current_msg = mqm;
364 msize = ntohs (mqm->mh->size);
366 GNUNET_CLIENT_notify_transmit_ready (state->connection, msize,
367 GNUNET_TIME_UNIT_FOREVER_REL, GNUNET_NO,
368 &connection_client_transmit_queued, mq);
375 struct GNUNET_MQ_MessageQueue *
376 GNUNET_MQ_queue_for_connection_client (struct GNUNET_CLIENT_Connection *connection,
377 const struct GNUNET_MQ_Handler *handlers,
380 struct GNUNET_MQ_MessageQueue *mq;
381 struct ClientConnectionState *state;
383 GNUNET_assert (NULL != connection);
385 mq = GNUNET_new (struct GNUNET_MQ_MessageQueue);
386 mq->handlers = handlers;
387 mq->handlers_cls = cls;
388 state = GNUNET_new (struct ClientConnectionState);
389 state->connection = connection;
390 mq->impl_state = state;
391 mq->send_impl = connection_client_send_impl;
392 mq->destroy_impl = connection_client_destroy_impl;
399 GNUNET_MQ_replace_handlers (struct GNUNET_MQ_MessageQueue *mq,
400 const struct GNUNET_MQ_Handler *new_handlers,
403 mq->handlers = new_handlers;
404 mq->handlers_cls = cls;
/* NOTE(review): this part of the listing is damaged.  Every line
   carries a stray leading number, and the short lines are absent:
   comment delimiters, the return type, the last parameter (the
   assoc_data pointer), the braces, the declaration and computation
   of 'id', and the return statement.  Restore them from the upstream
   file before building; how 'id' is chosen cannot be told from here. */
409 * Associate the assoc_data in mq with a unique request id.
411 * @param mq message queue, id will be unique for the queue
412 * @param mqm message to associate
413 * @param assoc_data to associate
416 GNUNET_MQ_assoc_add (struct GNUNET_MQ_MessageQueue *mq,
417 struct GNUNET_MQ_Message *mqm,
/* The association map is created lazily, on the first association. */
422 if (NULL == mq->assoc_map)
424 mq->assoc_map = GNUNET_CONTAINER_multihashmap32_create (8);
/* Store assoc_data under id.  The result of the put is not checked
   in the visible lines; UNIQUE_ONLY presumably refuses an id that is
   already present -- confirm against the container documentation. */
428 GNUNET_CONTAINER_multihashmap32_put (mq->assoc_map, id, assoc_data,
429 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
436 GNUNET_MQ_assoc_get (struct GNUNET_MQ_MessageQueue *mq, uint32_t request_id)
438 if (NULL == mq->assoc_map)
440 return GNUNET_CONTAINER_multihashmap32_get (mq->assoc_map, request_id);
445 GNUNET_MQ_assoc_remove (struct GNUNET_MQ_MessageQueue *mq, uint32_t request_id)
449 if (NULL == mq->assoc_map)
451 val = GNUNET_CONTAINER_multihashmap32_get (mq->assoc_map, request_id);
452 GNUNET_assert (NULL != val);
453 GNUNET_CONTAINER_multihashmap32_remove (mq->assoc_map, request_id, val);
/* NOTE(review): this part of the listing is damaged.  Only the first
   two lines of the signature survive (each with a stray leading
   number); the return type, the rest of the parameter list and the
   whole body are absent.  The transmit callbacks in this file call
   mqm->sent_cb (mqm->sent_cls) for each message they send, so
   presumably this function stores cb and a closure in those two
   members -- confirm against the upstream file. */
459 GNUNET_MQ_notify_sent (struct GNUNET_MQ_Message *mqm,
460 GNUNET_MQ_NotifyCallback cb,
469 GNUNET_MQ_destroy (struct GNUNET_MQ_MessageQueue *mq)
471 /* FIXME: destroy all pending messages in the queue */
473 if (NULL != mq->destroy_impl)
475 mq->destroy_impl (mq);
484 struct GNUNET_MessageHeader *
485 GNUNET_MQ_extract_nested_mh_ (const struct GNUNET_MessageHeader *mh, uint16_t base_size)
488 uint16_t nested_size;
489 struct GNUNET_MessageHeader *nested_msg;
491 whole_size = ntohs (mh->size);
492 GNUNET_assert (whole_size >= base_size);
494 nested_size = whole_size - base_size;
496 if (0 == nested_size)
499 if (nested_size < sizeof (struct GNUNET_MessageHeader))
505 nested_msg = (struct GNUNET_MessageHeader *) ((char *) mh + base_size);
507 if (ntohs (nested_msg->size) != nested_size)
510 nested_msg->size = htons (nested_size);