dc87b97114e466bb268426a38d92fb0e4b6b8560
[oweals/gnunet.git] / src / util / mq.c
1 /*
2      This file is part of GNUnet.
3      (C) 2012 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 2, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @author Florian Dold
23  * @file util/mq.c
24  * @brief general purpose request queue
25  */
26
27 #include "platform.h"
28 #include "gnunet_common.h"
29 #include "gnunet_util_lib.h"
30
31 #define LOG(kind,...) GNUNET_log_from (kind, "mq",__VA_ARGS__)
32
33
34
35 struct ServerClientSocketState
36 {
37   struct GNUNET_SERVER_Client *client;
38   struct GNUNET_SERVER_TransmitHandle* th;
39 };
40
41
42 struct ClientConnectionState
43 {
44   /**
45    * Did we call receive?
46    */
47   int receive_active;
48   struct GNUNET_CLIENT_Connection *connection;
49   struct GNUNET_CLIENT_TransmitHandle *th;
50 };
51
52
53
54
55 /**
56  * Call the right callback for a message.
57  *
58  * @param mq message queue with the handlers
59  * @param mh message to dispatch
60  */
61 void
62 GNUNET_MQ_dispatch (struct GNUNET_MQ_MessageQueue *mq, const struct GNUNET_MessageHeader *mh)
63 {
64   const struct GNUNET_MQ_Handler *handler;
65   int handled = GNUNET_NO;
66
67   handler = mq->handlers;
68   if (NULL == handler)
69     return;
70   for (; NULL != handler->cb; handler++)
71   {
72     if (handler->type == ntohs (mh->type))
73     {
74       handler->cb (mq->handlers_cls, mh);
75       handled = GNUNET_YES;
76     }
77   }
78   
79   if (GNUNET_NO == handled)
80     LOG (GNUNET_ERROR_TYPE_WARNING, "no handler for message of type %d\n", ntohs (mh->type));
81 }
82
83
84 void
85 GNUNET_MQ_discard (struct GNUNET_MQ_Message *mqm)
86 {
87   GNUNET_assert (NULL == mqm->parent_queue);
88   GNUNET_free (mqm);
89 }
90
91
92 /**
93  * Send a message with the give message queue.
94  * May only be called once per message.
95  * 
96  * @param mq message queue
97  * @param mqm the message to send.
98  */
99 void
100 GNUNET_MQ_send (struct GNUNET_MQ_MessageQueue *mq, struct GNUNET_MQ_Message *mqm)
101 {
102   GNUNET_assert (NULL != mq);
103   mq->send_impl (mq, mqm);
104 }
105
106
107 struct GNUNET_MQ_Message *
108 GNUNET_MQ_msg_ (struct GNUNET_MessageHeader **mhp, uint16_t size, uint16_t type)
109 {
110   struct GNUNET_MQ_Message *mqm;
111
112   mqm = GNUNET_malloc (sizeof *mqm + size);
113   mqm->mh = (struct GNUNET_MessageHeader *) &mqm[1];
114   mqm->mh->size = htons (size);
115   mqm->mh->type = htons (type);
116   if (NULL != mhp)
117     *mhp = mqm->mh;
118   return mqm;
119 }
120
121
122 struct GNUNET_MQ_Message *
123 GNUNET_MQ_msg_nested_mh_ (struct GNUNET_MessageHeader **mhp, uint16_t base_size, uint16_t type,
124                           const struct GNUNET_MessageHeader *nested_mh)
125 {
126   struct GNUNET_MQ_Message *mqm;
127   uint16_t size;
128
129   if (NULL == nested_mh)
130     return GNUNET_MQ_msg_ (mhp, base_size, type);
131
132   size = base_size + ntohs (nested_mh->size);
133
134   /* check for uint16_t overflow */
135   if (size < base_size)
136     return NULL;
137
138   mqm = GNUNET_MQ_msg_ (mhp, size, type);
139   memcpy ((char *) mqm->mh + base_size, nested_mh, ntohs (nested_mh->size));
140
141   return mqm;
142 }
143
144
145 /**
146  * Transmit a queued message to the session's client.
147  *
148  * @param cls consensus session
149  * @param size number of bytes available in buf
150  * @param buf where the callee should write the message
151  * @return number of bytes written to buf
152  */
153 static size_t
154 transmit_queued (void *cls, size_t size,
155                  void *buf)
156 {
157   struct GNUNET_MQ_MessageQueue *mq = cls;
158   struct GNUNET_MQ_Message *mqm = mq->current_msg;
159   struct ServerClientSocketState *state = mq->impl_state;
160   size_t msg_size;
161
162   GNUNET_assert (NULL != buf);
163
164   if (NULL != mqm->sent_cb)
165   {
166     mqm->sent_cb (mqm->sent_cls);
167   }
168
169   mq->current_msg = NULL;
170   GNUNET_assert (NULL != mqm);
171   msg_size = ntohs (mqm->mh->size);
172   GNUNET_assert (size >= msg_size);
173   memcpy (buf, mqm->mh, msg_size);
174   GNUNET_free (mqm);
175   state->th = NULL;
176
177   if (NULL != mq->msg_head)
178   {
179     mq->current_msg = mq->msg_head;
180     GNUNET_CONTAINER_DLL_remove (mq->msg_head, mq->msg_tail, mq->current_msg);
181     state->th = 
182         GNUNET_SERVER_notify_transmit_ready (state->client, msg_size, 
183                                              GNUNET_TIME_UNIT_FOREVER_REL,
184                                              &transmit_queued, mq);
185   }
186   return msg_size;
187 }
188
189
190
191 static void
192 server_client_destroy_impl (struct GNUNET_MQ_MessageQueue *mq)
193 {
194   struct ServerClientSocketState *state;
195   
196   GNUNET_assert (NULL != mq);
197   state = mq->impl_state;
198   GNUNET_assert (NULL != state);
199   GNUNET_SERVER_client_drop (state->client);
200   GNUNET_free (state);
201 }
202
203 static void
204 server_client_send_impl (struct GNUNET_MQ_MessageQueue *mq, struct GNUNET_MQ_Message *mqm)
205 {
206   struct ServerClientSocketState *state;
207   int msize;
208
209   GNUNET_assert (NULL != mq);
210   state = mq->impl_state;
211   GNUNET_assert (NULL != state);
212
213   if (NULL != state->th)
214   {
215     GNUNET_CONTAINER_DLL_insert_tail (mq->msg_head, mq->msg_tail, mqm);
216     return;
217   }
218   GNUNET_assert (NULL == mq->msg_head);
219   GNUNET_assert (NULL == mq->current_msg);
220   msize = ntohs (mqm->mh->size);
221   mq->current_msg = mqm;
222   state->th = 
223       GNUNET_SERVER_notify_transmit_ready (state->client, msize, 
224                                            GNUNET_TIME_UNIT_FOREVER_REL,
225                                            &transmit_queued, mq);
226 }
227
228
229 struct GNUNET_MQ_MessageQueue *
230 GNUNET_MQ_queue_for_server_client (struct GNUNET_SERVER_Client *client)
231 {
232   struct GNUNET_MQ_MessageQueue *mq;
233   struct ServerClientSocketState *scss;
234
235   mq = GNUNET_new (struct GNUNET_MQ_MessageQueue);
236   scss = GNUNET_new (struct ServerClientSocketState);
237   mq->impl_state = scss;
238   scss->client = client;
239   GNUNET_SERVER_client_keep (client);
240   mq->send_impl = server_client_send_impl;
241   mq->destroy_impl = server_client_destroy_impl;
242   return mq;
243 }
244
245
246 /**
247  * Type of a function to call when we receive a message
248  * from the service.
249  *
250  * @param cls closure
251  * @param msg message received, NULL on timeout or fatal error
252  */
253 static void
254 handle_client_message (void *cls,
255                        const struct GNUNET_MessageHeader *msg)
256 {
257   struct GNUNET_MQ_MessageQueue *mq = cls;
258   struct ClientConnectionState *state;
259
260   state = mq->impl_state;
261   
262   if (NULL == msg)
263   {
264     if (NULL == mq->error_handler)
265       LOG (GNUNET_ERROR_TYPE_WARNING, "ignoring read error (no handler installed)\n");
266     else
267       mq->error_handler (mq->handlers_cls, GNUNET_MQ_ERROR_READ);
268     return;
269   }
270
271   GNUNET_CLIENT_receive (state->connection, handle_client_message, mq,
272                          GNUNET_TIME_UNIT_FOREVER_REL);
273
274   GNUNET_MQ_dispatch (mq, msg);
275 }
276
277
278 /**
279  * Transmit a queued message to the session's client.
280  *
281  * @param cls consensus session
282  * @param size number of bytes available in buf
283  * @param buf where the callee should write the message
284  * @return number of bytes written to buf
285  */
286 static size_t
287 connection_client_transmit_queued (void *cls, size_t size,
288                  void *buf)
289 {
290   struct GNUNET_MQ_MessageQueue *mq = cls;
291   struct GNUNET_MQ_Message *mqm = mq->current_msg;
292   struct ClientConnectionState *state = mq->impl_state;
293   size_t msg_size;
294
295   if (NULL == buf)
296   {
297     if (NULL == mq->error_handler)
298     {
299       LOG (GNUNET_ERROR_TYPE_WARNING, "read error, but no error handler installed\n");
300       return 0;
301     }
302     mq->error_handler (mq->handlers_cls, GNUNET_MQ_ERROR_READ);
303     return 0;
304   }
305
306   if ((NULL != mq->handlers) && (GNUNET_NO == state->receive_active))
307   {
308     state->receive_active = GNUNET_YES;
309     GNUNET_CLIENT_receive (state->connection, handle_client_message, mq,
310                            GNUNET_TIME_UNIT_FOREVER_REL);
311   }
312
313
314   GNUNET_assert (NULL != mqm);
315
316   if (NULL != mqm->sent_cb)
317   {
318     mqm->sent_cb (mqm->sent_cls);
319   }
320
321   mq->current_msg = NULL;
322   GNUNET_assert (NULL != buf);
323   msg_size = ntohs (mqm->mh->size);
324   GNUNET_assert (size >= msg_size);
325   memcpy (buf, mqm->mh, msg_size);
326   GNUNET_free (mqm);
327   state->th = NULL;
328   if (NULL != mq->msg_head)
329   {
330     mq->current_msg = mq->msg_head;
331     GNUNET_CONTAINER_DLL_remove (mq->msg_head, mq->msg_tail, mq->current_msg);
332     state->th = 
333       GNUNET_CLIENT_notify_transmit_ready (state->connection, ntohs (mq->current_msg->mh->size), 
334                                              GNUNET_TIME_UNIT_FOREVER_REL, GNUNET_NO,
335                                              &connection_client_transmit_queued, mq);
336   }
337   return msg_size;
338 }
339
340
341
342 static void
343 connection_client_destroy_impl (struct GNUNET_MQ_MessageQueue *mq)
344 {
345   GNUNET_free (mq->impl_state);
346 }
347
348 static void
349 connection_client_send_impl (struct GNUNET_MQ_MessageQueue *mq,
350                              struct GNUNET_MQ_Message *mqm)
351 {
352   struct ClientConnectionState *state = mq->impl_state;
353   int msize;
354
355   GNUNET_assert (NULL != state);
356
357   if (NULL != state->th)
358   {
359     GNUNET_CONTAINER_DLL_insert_tail (mq->msg_head, mq->msg_tail, mqm);
360     return;
361   }
362   GNUNET_assert (NULL == mq->current_msg);
363   mq->current_msg = mqm;
364   msize = ntohs (mqm->mh->size);
365   state->th = 
366       GNUNET_CLIENT_notify_transmit_ready (state->connection, msize, 
367                                            GNUNET_TIME_UNIT_FOREVER_REL, GNUNET_NO,
368                                            &connection_client_transmit_queued, mq);
369 }
370
371
372
373
374
375 struct GNUNET_MQ_MessageQueue *
376 GNUNET_MQ_queue_for_connection_client (struct GNUNET_CLIENT_Connection *connection,
377                                        const struct GNUNET_MQ_Handler *handlers,
378                                        void *cls)
379 {
380   struct GNUNET_MQ_MessageQueue *mq;
381   struct ClientConnectionState *state;
382
383   GNUNET_assert (NULL != connection);
384
385   mq = GNUNET_new (struct GNUNET_MQ_MessageQueue);
386   mq->handlers = handlers;
387   mq->handlers_cls = cls;
388   state = GNUNET_new (struct ClientConnectionState);
389   state->connection = connection;
390   mq->impl_state = state;
391   mq->send_impl = connection_client_send_impl;
392   mq->destroy_impl = connection_client_destroy_impl;
393
394   return mq;
395 }
396
397
398 void
399 GNUNET_MQ_replace_handlers (struct GNUNET_MQ_MessageQueue *mq,
400                             const struct GNUNET_MQ_Handler *new_handlers,
401                             void *cls)
402 {
403   mq->handlers = new_handlers;
404   mq->handlers_cls = cls;
405 }
406
407
408 /**
409  * Associate the assoc_data in mq with a unique request id.
410  *
411  * @param mq message queue, id will be unique for the queue
412  * @param mqm message to associate
413  * @param assoc_data to associate
414  */
415 uint32_t
416 GNUNET_MQ_assoc_add (struct GNUNET_MQ_MessageQueue *mq,
417                      struct GNUNET_MQ_Message *mqm,
418                      void *assoc_data)
419 {
420   uint32_t id;
421
422   if (NULL == mq->assoc_map)
423   {
424     mq->assoc_map = GNUNET_CONTAINER_multihashmap32_create (8);
425     mq->assoc_id = 1;
426   }
427   id = mq->assoc_id++;
428   GNUNET_CONTAINER_multihashmap32_put (mq->assoc_map, id, assoc_data,
429                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
430   return id;
431 }
432
433
434
435 void *
436 GNUNET_MQ_assoc_get (struct GNUNET_MQ_MessageQueue *mq, uint32_t request_id)
437 {
438   if (NULL == mq->assoc_map)
439     return NULL;
440   return GNUNET_CONTAINER_multihashmap32_get (mq->assoc_map, request_id);
441 }
442
443
444 void *
445 GNUNET_MQ_assoc_remove (struct GNUNET_MQ_MessageQueue *mq, uint32_t request_id)
446 {
447   void *val;
448
449   if (NULL == mq->assoc_map)
450     return NULL;
451   val = GNUNET_CONTAINER_multihashmap32_get (mq->assoc_map, request_id);
452   GNUNET_assert (NULL != val);
453   GNUNET_CONTAINER_multihashmap32_remove (mq->assoc_map, request_id, val);
454   return val;
455 }
456
457
458 void
459 GNUNET_MQ_notify_sent (struct GNUNET_MQ_Message *mqm,
460                        GNUNET_MQ_NotifyCallback cb,
461                        void *cls)
462 {
463   mqm->sent_cb = cb;
464   mqm->sent_cls = cls;
465 }
466
467
468 void
469 GNUNET_MQ_destroy (struct GNUNET_MQ_MessageQueue *mq)
470 {
471   /* FIXME: destroy all pending messages in the queue */
472
473   if (NULL != mq->destroy_impl)
474   {
475     mq->destroy_impl (mq);
476   }
477
478   GNUNET_free (mq);
479 }
480
481
482
483
484 struct GNUNET_MessageHeader *
485 GNUNET_MQ_extract_nested_mh_ (const struct GNUNET_MessageHeader *mh, uint16_t base_size)
486 {
487   uint16_t whole_size;
488   uint16_t nested_size;
489   struct GNUNET_MessageHeader *nested_msg;
490
491   whole_size = ntohs (mh->size);
492   GNUNET_assert (whole_size >= base_size);
493
494   nested_size = whole_size - base_size;
495
496   if (0 == nested_size)
497     return NULL;
498
499   if (nested_size < sizeof (struct GNUNET_MessageHeader))
500   {
501     GNUNET_break_op (0);
502     return NULL;
503   }
504
505   nested_msg = (struct GNUNET_MessageHeader *) ((char *) mh + base_size);
506
507   if (ntohs (nested_msg->size) != nested_size)
508   {
509     GNUNET_break_op (0);
510     nested_msg->size = htons (nested_size);
511   }
512
513   return nested_msg;
514 }
515
516