-fixes
[oweals/gnunet.git] / src / util / mq.c
1 /*
2      This file is part of GNUnet.
3      (C) 2012 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @author Florian Dold
23  * @file util/mq.c
24  * @brief general purpose request queue
25  */
26 #include "platform.h"
27 #include "gnunet_util_lib.h"
28
29 #define LOG(kind,...) GNUNET_log_from (kind, "mq",__VA_ARGS__)
30
31
32 struct GNUNET_MQ_Envelope
33 {
34   /**
35    * Messages are stored in a linked list.
36    * Each queue has its own list of envelopes.
37    */
38   struct GNUNET_MQ_Envelope *next;
39
40   /**
41    * Messages are stored in a linked list
42    * Each queue has its own list of envelopes.
43    */
44   struct GNUNET_MQ_Envelope *prev;
45
46   /**
47    * Actual allocated message header,
48    * usually points to the end of the containing GNUNET_MQ_Envelope
49    */
50   struct GNUNET_MessageHeader *mh;
51
52   /**
53    * Queue the message is queued in, NULL if message is not queued.
54    */
55   struct GNUNET_MQ_Handle *parent_queue;
56
57   /**
58    * Called after the message was sent irrevocably.
59    */
60   GNUNET_MQ_NotifyCallback sent_cb;
61
62   /**
63    * Closure for send_cb
64    */
65   void *sent_cls;
66 };
67
68
69 /**
70  * Handle to a message queue.
71  */
72 struct GNUNET_MQ_Handle
73 {
74   /**
75    * Handlers array, or NULL if the queue should not receive messages
76    */
77   const struct GNUNET_MQ_MessageHandler *handlers;
78
79   /**
80    * Closure for the handler callbacks,
81    * as well as for the error handler.
82    */
83   void *handlers_cls;
84
85   /**
86    * Actual implementation of message sending,
87    * called when a message is added
88    */
89   GNUNET_MQ_SendImpl send_impl;
90
91   /**
92    * Implementation-dependent queue destruction function
93    */
94   GNUNET_MQ_DestroyImpl destroy_impl;
95
96   /**
97    * Implementation-specific state
98    */
99   void *impl_state;
100
101   /**
102    * Callback will be called when an error occurs.
103    */
104   GNUNET_MQ_ErrorHandler error_handler;
105
106   /**
107    * Linked list of messages pending to be sent
108    */
109   struct GNUNET_MQ_Envelope *envelope_head;
110
111   /**
112    * Linked list of messages pending to be sent
113    */
114   struct GNUNET_MQ_Envelope *envelope_tail;
115
116   /**
117    * Message that is currently scheduled to be
118    * sent. Not the head of the message queue, as the implementation
119    * needs to know if sending has been already scheduled or not.
120    */
121   struct GNUNET_MQ_Envelope *current_envelope;
122
123   /**
124    * Has the current envelope been commited?
125    * Either GNUNET_YES or GNUNET_NO.
126    */
127   int commited;
128
129   /**
130    * Map of associations, lazily allocated
131    */
132   struct GNUNET_CONTAINER_MultiHashMap32 *assoc_map;
133
134   /**
135    * Next id that should be used for the assoc_map,
136    * initialized lazily to a random value together with
137    * assoc_map
138    */
139   uint32_t assoc_id;
140 };
141
142
143
144
145 struct ServerClientSocketState
146 {
147   struct GNUNET_SERVER_Client *client;
148   struct GNUNET_SERVER_TransmitHandle* th;
149 };
150
151
152 struct ClientConnectionState
153 {
154   /**
155    * Did we call receive alread alreadyy?
156    */
157   int receive_active;
158
159   /**
160    * Do we also want to receive?
161    */
162   int receive_requested;
163   struct GNUNET_CLIENT_Connection *connection;
164   struct GNUNET_CLIENT_TransmitHandle *th;
165 };
166
167
168 /**
169  * Call the message message handler that was registered
170  * for the type of the given message in the given message queue.
171  *
172  * This function is indended to be used for the implementation
173  * of message queues.
174  *
175  * @param mq message queue with the handlers
176  * @param mh message to dispatch
177  */
178 void
179 GNUNET_MQ_inject_message (struct GNUNET_MQ_Handle *mq, const struct GNUNET_MessageHeader *mh)
180 {
181   const struct GNUNET_MQ_MessageHandler *handler;
182   int handled = GNUNET_NO;
183
184   handler = mq->handlers;
185   if (NULL == handler)
186     return;
187   for (; NULL != handler->cb; handler++)
188   {
189     if (handler->type == ntohs (mh->type))
190     {
191       handler->cb (mq->handlers_cls, mh);
192       handled = GNUNET_YES;
193     }
194   }
195
196   if (GNUNET_NO == handled)
197     LOG (GNUNET_ERROR_TYPE_WARNING, "no handler for message of type %d\n", ntohs (mh->type));
198 }
199
200
201 /**
202  * Call the error handler of a message queue with the given
203  * error code.  If there is no error handler, log a warning.
204  *
205  * This function is intended to be used by the implementation
206  * of message queues.
207  *
208  * @param mq message queue
209  * @param error the error type
210  */
211 void
212 GNUNET_MQ_inject_error (struct GNUNET_MQ_Handle *mq,
213                         enum GNUNET_MQ_Error error)
214 {
215   if (NULL == mq->error_handler)
216   {
217     /* FIXME: log what kind of error occured */
218     GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "mq: got error, but no handler installed\n");
219     return;
220   }
221   mq->error_handler (mq->handlers_cls, error);
222 }
223
224
225 void
226 GNUNET_MQ_discard (struct GNUNET_MQ_Envelope *mqm)
227 {
228   GNUNET_assert (NULL == mqm->parent_queue);
229   GNUNET_free (mqm);
230 }
231
232
233 /**
234  * Send a message with the give message queue.
235  * May only be called once per message.
236  *
237  * @param mq message queue
238  * @param ev the envelope with the message to send.
239  */
240 void
241 GNUNET_MQ_send (struct GNUNET_MQ_Handle *mq, struct GNUNET_MQ_Envelope *ev)
242 {
243   GNUNET_assert (NULL != mq);
244   GNUNET_assert (NULL == ev->parent_queue);
245
246   /* is the implementation busy? queue it! */
247   if (NULL != mq->current_envelope)
248   {
249     GNUNET_CONTAINER_DLL_insert_tail (mq->envelope_head, mq->envelope_tail, ev);
250     return;
251   }
252   mq->current_envelope = ev;
253   mq->send_impl (mq, ev->mh, mq->impl_state);
254 }
255
256
257 /**
258  * Call the send implementation for the next queued message,
259  * if any.
260  * Only useful for implementing message queues,
261  * results in undefined behavior if not used carefully.
262  *
263  * @param mq message queue to send the next message with
264  */
265 void
266 GNUNET_MQ_impl_send_continue (struct GNUNET_MQ_Handle *mq)
267 {
268   /* call is only valid if we're actually currently sending
269    * a message */
270   GNUNET_assert (NULL != mq);
271   GNUNET_assert (NULL != mq->current_envelope);
272   GNUNET_assert (GNUNET_YES == mq->commited);
273   mq->commited = GNUNET_NO;
274   GNUNET_free (mq->current_envelope);
275   if (NULL == mq->envelope_head)
276   {
277     mq->current_envelope = NULL;
278     return;
279   }
280
281   GNUNET_assert (NULL != mq->envelope_tail);
282   GNUNET_assert (NULL != mq->envelope_head);
283   mq->current_envelope = mq->envelope_head;
284   GNUNET_CONTAINER_DLL_remove (mq->envelope_head, mq->envelope_tail,
285                                mq->current_envelope);
286   mq->send_impl (mq, mq->current_envelope->mh, mq->impl_state);
287 }
288
289
290 /**
291  * Create a message queue for the specified handlers.
292  *
293  * @param send function the implements sending messages
294  * @param destroy function that implements destroying the queue
295  * @param cancel function that implements canceling a message
296  * @param impl_state for the queue, passed to 'send' and 'destroy'
297  * @param handlers array of message handlers
298  * @param error_handler handler for read and write errors
299  * @param cls closure for message handlers and error handler
300  * @return a new message queue
301  */
302 struct GNUNET_MQ_Handle *
303 GNUNET_MQ_queue_for_callbacks (GNUNET_MQ_SendImpl send,
304                                GNUNET_MQ_DestroyImpl destroy,
305                                GNUNET_MQ_CancelImpl cancel,
306                                void *impl_state,
307                                const struct GNUNET_MQ_MessageHandler *handlers,
308                                GNUNET_MQ_ErrorHandler error_handler,
309                                void *cls)
310 {
311   struct GNUNET_MQ_Handle *mq;
312
313   mq = GNUNET_new (struct GNUNET_MQ_Handle);
314   mq->send_impl = send;
315   mq->destroy_impl = destroy;
316   mq->handlers = handlers;
317   mq->handlers_cls = cls;
318   mq->impl_state = impl_state;
319
320   return mq;
321 }
322
323
324 /**
325  * Get the message that should currently be sent.
326  * Fails if there is no current message.
327  * Only useful for implementing message queues,
328  * results in undefined behavior if not used carefully.
329  *
330  * @param mq message queue with the current message
331  * @return message to send, never NULL
332  */
333 const struct GNUNET_MessageHeader *
334 GNUNET_MQ_impl_current (struct GNUNET_MQ_Handle *mq)
335 {
336   if (NULL == mq->current_envelope)
337     GNUNET_abort ();
338   if (NULL == mq->current_envelope->mh)
339     GNUNET_abort ();
340   return mq->current_envelope->mh;
341 }
342
343
344 /**
345  * Get the implementation state associated with the
346  * message queue.
347  *
348  * While the GNUNET_MQ_Impl* callbacks receive the
349  * implementation state, continuations that are scheduled
350  * by the implementation function often only have one closure
351  * argument, with this function it is possible to get at the
352  * implementation state when only passing the GNUNET_MQ_Handle
353  * as closure.
354  *
355  * @param mq message queue with the current message
356  * @return message to send, never NULL
357  */
358 void *
359 GNUNET_MQ_impl_state (struct GNUNET_MQ_Handle *mq)
360 {
361   return mq->impl_state;
362 }
363
364
365
366 /**
367  * Mark the current message as irrevocably sent, but do not
368  * proceed with sending the next message.
369  * Will call the appropriate GNUNET_MQ_NotifyCallback, if any.
370  *
371  * @param mq message queue
372  */
373 void
374 GNUNET_MQ_impl_send_commit (struct GNUNET_MQ_Handle *mq)
375 {
376   GNUNET_assert (NULL != mq->current_envelope);
377   GNUNET_assert (GNUNET_NO == mq->commited);
378   mq->commited = GNUNET_YES;
379   if (NULL != mq->current_envelope->sent_cb)
380     mq->current_envelope->sent_cb (mq->current_envelope->sent_cls);
381 }
382
383
384 struct GNUNET_MQ_Envelope *
385 GNUNET_MQ_msg_ (struct GNUNET_MessageHeader **mhp, uint16_t size, uint16_t type)
386 {
387   struct GNUNET_MQ_Envelope *mqm;
388
389   mqm = GNUNET_malloc (sizeof *mqm + size);
390   mqm->mh = (struct GNUNET_MessageHeader *) &mqm[1];
391   mqm->mh->size = htons (size);
392   mqm->mh->type = htons (type);
393   if (NULL != mhp)
394     *mhp = mqm->mh;
395   return mqm;
396 }
397
398
399 /**
400  * Implementation of the GNUNET_MQ_msg_nested_mh macro.
401  *
402  * @param mhp pointer to the message header pointer that will be changed to allocate at
403  *        the newly allocated space for the message.
404  * @param base_size size of the data before the nested message
405  * @param type type of the message in the envelope
406  * @param nested_mh the message to append to the message after base_size
407  */
408 struct GNUNET_MQ_Envelope *
409 GNUNET_MQ_msg_nested_mh_ (struct GNUNET_MessageHeader **mhp, uint16_t base_size, uint16_t type,
410                           const struct GNUNET_MessageHeader *nested_mh)
411 {
412   struct GNUNET_MQ_Envelope *mqm;
413   uint16_t size;
414
415   if (NULL == nested_mh)
416     return GNUNET_MQ_msg_ (mhp, base_size, type);
417
418   size = base_size + ntohs (nested_mh->size);
419
420   /* check for uint16_t overflow */
421   if (size < base_size)
422     return NULL;
423
424   mqm = GNUNET_MQ_msg_ (mhp, size, type);
425   memcpy ((char *) mqm->mh + base_size, nested_mh, ntohs (nested_mh->size));
426
427   return mqm;
428 }
429
430
431 /**
432  * Transmit a queued message to the session's client.
433  *
434  * @param cls consensus session
435  * @param size number of bytes available in buf
436  * @param buf where the callee should write the message
437  * @return number of bytes written to buf
438  */
439 static size_t
440 transmit_queued (void *cls, size_t size,
441                  void *buf)
442 {
443   struct GNUNET_MQ_Handle *mq = cls;
444   struct ServerClientSocketState *state = GNUNET_MQ_impl_state (mq);
445   const struct GNUNET_MessageHeader *msg = GNUNET_MQ_impl_current (mq);
446   size_t msg_size;
447
448   GNUNET_assert (NULL != buf);
449
450   msg_size = ntohs (msg->size);
451   GNUNET_assert (size >= msg_size);
452   memcpy (buf, msg, msg_size);
453   state->th = NULL;
454
455   GNUNET_MQ_impl_send_continue (mq);
456
457   return msg_size;
458 }
459
460
461
462 static void
463 server_client_destroy_impl (struct GNUNET_MQ_Handle *mq,
464                             void *impl_state)
465 {
466   struct ServerClientSocketState *state = impl_state;
467
468   GNUNET_assert (NULL != mq);
469   GNUNET_assert (NULL != state);
470   GNUNET_SERVER_client_drop (state->client);
471   GNUNET_free (state);
472 }
473
474 static void
475 server_client_send_impl (struct GNUNET_MQ_Handle *mq,
476                          const struct GNUNET_MessageHeader *msg, void *impl_state)
477 {
478   struct ServerClientSocketState *state = impl_state;
479
480   GNUNET_assert (NULL != mq);
481   GNUNET_assert (NULL != state);
482
483   GNUNET_MQ_impl_send_commit (mq);
484
485   state->th =
486       GNUNET_SERVER_notify_transmit_ready (state->client, ntohs (msg->size),
487                                            GNUNET_TIME_UNIT_FOREVER_REL,
488                                            &transmit_queued, mq);
489 }
490
491
492 struct GNUNET_MQ_Handle *
493 GNUNET_MQ_queue_for_server_client (struct GNUNET_SERVER_Client *client)
494 {
495   struct GNUNET_MQ_Handle *mq;
496   struct ServerClientSocketState *scss;
497
498   mq = GNUNET_new (struct GNUNET_MQ_Handle);
499   scss = GNUNET_new (struct ServerClientSocketState);
500   mq->impl_state = scss;
501   scss->client = client;
502   GNUNET_SERVER_client_keep (client);
503   mq->send_impl = server_client_send_impl;
504   mq->destroy_impl = server_client_destroy_impl;
505   return mq;
506 }
507
508
509 /**
510  * Type of a function to call when we receive a message
511  * from the service.
512  *
513  * @param cls closure
514  * @param msg message received, NULL on timeout or fatal error
515  */
516 static void
517 handle_client_message (void *cls,
518                        const struct GNUNET_MessageHeader *msg)
519 {
520   struct GNUNET_MQ_Handle *mq = cls;
521   struct ClientConnectionState *state;
522
523   state = mq->impl_state;
524
525   if (NULL == msg)
526   {
527     GNUNET_MQ_inject_error (mq, GNUNET_MQ_ERROR_READ);
528     return;
529   }
530
531   GNUNET_CLIENT_receive (state->connection, handle_client_message, mq,
532                          GNUNET_TIME_UNIT_FOREVER_REL);
533
534   GNUNET_MQ_inject_message (mq, msg);
535 }
536
537
538 /**
539  * Transmit a queued message to the session's client.
540  *
541  * @param cls consensus session
542  * @param size number of bytes available in buf
543  * @param buf where the callee should write the message
544  * @return number of bytes written to buf
545  */
546 static size_t
547 connection_client_transmit_queued (void *cls, size_t size,
548                  void *buf)
549 {
550   struct GNUNET_MQ_Handle *mq = cls;
551   const struct GNUNET_MessageHeader *msg;
552   struct ClientConnectionState *state = mq->impl_state;
553   size_t msg_size;
554
555   GNUNET_assert (NULL != mq);
556   msg = GNUNET_MQ_impl_current (mq);
557
558   if (NULL == buf)
559   {
560     GNUNET_MQ_inject_error (mq, GNUNET_MQ_ERROR_READ);
561     return 0;
562   }
563
564   if ( (GNUNET_YES == state->receive_requested) &&
565        (GNUNET_NO == state->receive_active) )
566   {
567     state->receive_active = GNUNET_YES;
568     GNUNET_CLIENT_receive (state->connection, handle_client_message, mq,
569                            GNUNET_TIME_UNIT_FOREVER_REL);
570   }
571
572
573   msg_size = ntohs (msg->size);
574   GNUNET_assert (size >= msg_size);
575   memcpy (buf, msg, msg_size);
576   state->th = NULL;
577
578   GNUNET_MQ_impl_send_continue (mq);
579
580   return msg_size;
581 }
582
583
584
585 static void
586 connection_client_destroy_impl (struct GNUNET_MQ_Handle *mq, void *impl_state)
587 {
588   GNUNET_free (impl_state);
589 }
590
591 static void
592 connection_client_send_impl (struct GNUNET_MQ_Handle *mq,
593                              const struct GNUNET_MessageHeader *msg, void *impl_state)
594 {
595   struct ClientConnectionState *state = impl_state;
596
597   GNUNET_assert (NULL != state);
598   GNUNET_assert (NULL == state->th);
599
600   GNUNET_MQ_impl_send_commit (mq);
601
602   state->th =
603       GNUNET_CLIENT_notify_transmit_ready (state->connection, ntohs (msg->size),
604                                            GNUNET_TIME_UNIT_FOREVER_REL, GNUNET_NO,
605                                            &connection_client_transmit_queued, mq);
606 }
607
608
609 struct GNUNET_MQ_Handle *
610 GNUNET_MQ_queue_for_connection_client (struct GNUNET_CLIENT_Connection *connection,
611                                        const struct GNUNET_MQ_MessageHandler *handlers,
612                                        GNUNET_MQ_ErrorHandler error_handler,
613                                        void *cls)
614 {
615   struct GNUNET_MQ_Handle *mq;
616   struct ClientConnectionState *state;
617
618   GNUNET_assert (NULL != connection);
619
620   mq = GNUNET_new (struct GNUNET_MQ_Handle);
621   mq->handlers = handlers;
622   mq->error_handler = error_handler;
623   mq->handlers_cls = cls;
624   state = GNUNET_new (struct ClientConnectionState);
625   state->connection = connection;
626   mq->impl_state = state;
627   mq->send_impl = connection_client_send_impl;
628   mq->destroy_impl = connection_client_destroy_impl;
629   if (NULL != handlers)
630     state->receive_requested = GNUNET_YES;
631
632   return mq;
633 }
634
635
636 void
637 GNUNET_MQ_replace_handlers (struct GNUNET_MQ_Handle *mq,
638                             const struct GNUNET_MQ_MessageHandler *new_handlers,
639                             void *cls)
640 {
641   /* FIXME: notify implementation? */
642   /* FIXME: what about NULL handlers? abort receive? */
643   mq->handlers = new_handlers;
644   mq->handlers_cls = cls;
645 }
646
647
648 /**
649  * Associate the assoc_data in mq with a unique request id.
650  *
651  * @param mq message queue, id will be unique for the queue
652  * @param assoc_data to associate
653  */
654 uint32_t
655 GNUNET_MQ_assoc_add (struct GNUNET_MQ_Handle *mq,
656                      void *assoc_data)
657 {
658   uint32_t id;
659
660   if (NULL == mq->assoc_map)
661   {
662     mq->assoc_map = GNUNET_CONTAINER_multihashmap32_create (8);
663     mq->assoc_id = 1;
664   }
665   id = mq->assoc_id++;
666   GNUNET_CONTAINER_multihashmap32_put (mq->assoc_map, id, assoc_data,
667                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
668   return id;
669 }
670
671
672
673 void *
674 GNUNET_MQ_assoc_get (struct GNUNET_MQ_Handle *mq, uint32_t request_id)
675 {
676   if (NULL == mq->assoc_map)
677     return NULL;
678   return GNUNET_CONTAINER_multihashmap32_get (mq->assoc_map, request_id);
679 }
680
681
682 void *
683 GNUNET_MQ_assoc_remove (struct GNUNET_MQ_Handle *mq, uint32_t request_id)
684 {
685   void *val;
686
687   if (NULL == mq->assoc_map)
688     return NULL;
689   val = GNUNET_CONTAINER_multihashmap32_get (mq->assoc_map, request_id);
690   GNUNET_CONTAINER_multihashmap32_remove_all (mq->assoc_map, request_id);
691   return val;
692 }
693
694
695 void
696 GNUNET_MQ_notify_sent (struct GNUNET_MQ_Envelope *mqm,
697                        GNUNET_MQ_NotifyCallback cb,
698                        void *cls)
699 {
700   mqm->sent_cb = cb;
701   mqm->sent_cls = cls;
702 }
703
704
705 void
706 GNUNET_MQ_destroy (struct GNUNET_MQ_Handle *mq)
707 {
708   if (NULL != mq->destroy_impl)
709   {
710     mq->destroy_impl (mq, mq->impl_state);
711   }
712
713   while (NULL != mq->envelope_head)
714   {
715     struct GNUNET_MQ_Envelope *ev;
716     ev = mq->envelope_head;
717     GNUNET_CONTAINER_DLL_remove (mq->envelope_head, mq->envelope_tail, ev);
718     GNUNET_MQ_discard (ev);
719   }
720
721   if (NULL != mq->current_envelope)
722   {
723     GNUNET_MQ_discard (mq->current_envelope);
724     mq->current_envelope = NULL;
725   }
726
727   if (NULL != mq->assoc_map)
728   {
729     GNUNET_CONTAINER_multihashmap32_destroy (mq->assoc_map);
730     mq->assoc_map = NULL;
731   }
732
733   GNUNET_free (mq);
734 }
735
736
737 struct GNUNET_MessageHeader *
738 GNUNET_MQ_extract_nested_mh_ (const struct GNUNET_MessageHeader *mh, uint16_t base_size)
739 {
740   uint16_t whole_size;
741   uint16_t nested_size;
742   struct GNUNET_MessageHeader *nested_msg;
743
744   whole_size = ntohs (mh->size);
745   GNUNET_assert (whole_size >= base_size);
746
747   nested_size = whole_size - base_size;
748
749   if (0 == nested_size)
750     return NULL;
751
752   if (nested_size < sizeof (struct GNUNET_MessageHeader))
753   {
754     GNUNET_break_op (0);
755     return NULL;
756   }
757
758   nested_msg = (struct GNUNET_MessageHeader *) ((char *) mh + base_size);
759
760   if (ntohs (nested_msg->size) != nested_size)
761   {
762     GNUNET_break_op (0);
763     nested_msg->size = htons (nested_size);
764   }
765
766   return nested_msg;
767 }
768
769