-remove trailing whitespace
[oweals/gnunet.git] / src / util / mq.c
1 /*
2      This file is part of GNUnet.
3      (C) 2012 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @author Florian Dold
23  * @file util/mq.c
24  * @brief general purpose request queue
25  */
26 #include "platform.h"
27 #include "gnunet_util_lib.h"
28
29 #define LOG(kind,...) GNUNET_log_from (kind, "mq",__VA_ARGS__)
30
31
32 struct GNUNET_MQ_Envelope
33 {
34   /**
35    * Messages are stored in a linked list.
36    * Each queue has its own list of envelopes.
37    */
38   struct GNUNET_MQ_Envelope *next;
39
40   /**
41    * Messages are stored in a linked list
42    * Each queue has its own list of envelopes.
43    */
44   struct GNUNET_MQ_Envelope *prev;
45
46   /**
47    * Actual allocated message header,
48    * usually points to the end of the containing GNUNET_MQ_Envelope
49    */
50   struct GNUNET_MessageHeader *mh;
51
52   /**
53    * Queue the message is queued in, NULL if message is not queued.
54    */
55   struct GNUNET_MQ_Handle *parent_queue;
56
57   /**
58    * Called after the message was sent irrevocably.
59    */
60   GNUNET_MQ_NotifyCallback sent_cb;
61
62   /**
63    * Closure for send_cb
64    */
65   void *sent_cls;
66 };
67
68
69 /**
70  * Handle to a message queue.
71  */
72 struct GNUNET_MQ_Handle
73 {
74   /**
75    * Handlers array, or NULL if the queue should not receive messages
76    */
77   const struct GNUNET_MQ_MessageHandler *handlers;
78
79   /**
80    * Closure for the handler callbacks,
81    * as well as for the error handler.
82    */
83   void *handlers_cls;
84
85   /**
86    * Actual implementation of message sending,
87    * called when a message is added
88    */
89   GNUNET_MQ_SendImpl send_impl;
90
91   /**
92    * Implementation-dependent queue destruction function
93    */
94   GNUNET_MQ_DestroyImpl destroy_impl;
95
96   /**
97    * Implementation-specific state
98    */
99   void *impl_state;
100
101   /**
102    * Callback will be called when an error occurs.
103    */
104   GNUNET_MQ_ErrorHandler error_handler;
105
106   /**
107    * Linked list of messages pending to be sent
108    */
109   struct GNUNET_MQ_Envelope *envelope_head;
110
111   /**
112    * Linked list of messages pending to be sent
113    */
114   struct GNUNET_MQ_Envelope *envelope_tail;
115
116   /**
117    * Message that is currently scheduled to be
118    * sent. Not the head of the message queue, as the implementation
119    * needs to know if sending has been already scheduled or not.
120    */
121   struct GNUNET_MQ_Envelope *current_envelope;
122
123   /**
124    * Has the current envelope been commited?
125    * Either GNUNET_YES or GNUNET_NO.
126    */
127   int commited;
128
129   /**
130    * Map of associations, lazily allocated
131    */
132   struct GNUNET_CONTAINER_MultiHashMap32 *assoc_map;
133
134   /**
135    * Next id that should be used for the assoc_map,
136    * initialized lazily to a random value together with
137    * assoc_map
138    */
139   uint32_t assoc_id;
140 };
141
142
143
144
145 struct ServerClientSocketState
146 {
147   struct GNUNET_SERVER_Client *client;
148   struct GNUNET_SERVER_TransmitHandle* th;
149 };
150
151
152 struct ClientConnectionState
153 {
154   /**
155    * Did we call receive alread alreadyy?
156    */
157   int receive_active;
158
159   /**
160    * Do we also want to receive?
161    */
162   int receive_requested;
163   struct GNUNET_CLIENT_Connection *connection;
164   struct GNUNET_CLIENT_TransmitHandle *th;
165 };
166
167
168 /**
169  * Call the message message handler that was registered
170  * for the type of the given message in the given message queue.
171  *
172  * This function is indended to be used for the implementation
173  * of message queues.
174  *
175  * @param mq message queue with the handlers
176  * @param mh message to dispatch
177  */
178 void
179 GNUNET_MQ_inject_message (struct GNUNET_MQ_Handle *mq, const struct GNUNET_MessageHeader *mh)
180 {
181   const struct GNUNET_MQ_MessageHandler *handler;
182   int handled = GNUNET_NO;
183
184   handler = mq->handlers;
185   if (NULL == handler)
186     return;
187   for (; NULL != handler->cb; handler++)
188   {
189     if (handler->type == ntohs (mh->type))
190     {
191       handler->cb (mq->handlers_cls, mh);
192       handled = GNUNET_YES;
193     }
194   }
195
196   if (GNUNET_NO == handled)
197     LOG (GNUNET_ERROR_TYPE_WARNING, "no handler for message of type %d\n", ntohs (mh->type));
198 }
199
200
201 /**
202  * Call the error handler of a message queue with the given
203  * error code.  If there is no error handler, log a warning.
204  *
205  * This function is intended to be used by the implementation
206  * of message queues.
207  *
208  * @param mq message queue
209  * @param error the error type
210  */
211 void
212 GNUNET_MQ_inject_error (struct GNUNET_MQ_Handle *mq,
213                         enum GNUNET_MQ_Error error)
214 {
215   if (NULL == mq->error_handler)
216   {
217     /* FIXME: log what kind of error occured */
218     GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "mq: got error, but no handler installed\n");
219     return;
220   }
221   mq->error_handler (mq->handlers_cls, error);
222 }
223
224
225 void
226 GNUNET_MQ_discard (struct GNUNET_MQ_Envelope *mqm)
227 {
228   GNUNET_assert (NULL == mqm->parent_queue);
229   GNUNET_free (mqm);
230 }
231
232
233 /**
234  * Send a message with the give message queue.
235  * May only be called once per message.
236  *
237  * @param mq message queue
238  * @param ev the envelope with the message to send.
239  */
240 void
241 GNUNET_MQ_send (struct GNUNET_MQ_Handle *mq, struct GNUNET_MQ_Envelope *ev)
242 {
243   GNUNET_assert (NULL != mq);
244   GNUNET_assert (NULL == ev->parent_queue);
245
246   /* is the implementation busy? queue it! */
247   if (NULL != mq->current_envelope)
248   {
249     GNUNET_CONTAINER_DLL_insert_tail (mq->envelope_head, mq->envelope_tail, ev);
250     return;
251   }
252   mq->current_envelope = ev;
253   mq->send_impl (mq, ev->mh, mq->impl_state);
254 }
255
256
257 /**
258  * Call the send implementation for the next queued message,
259  * if any.
260  * Only useful for implementing message queues,
261  * results in undefined behavior if not used carefully.
262  *
263  * @param mq message queue to send the next message with
264  */
265 void
266 GNUNET_MQ_impl_send_continue (struct GNUNET_MQ_Handle *mq)
267 {
268   /* call is only valid if we're actually currently sending
269    * a message */
270   GNUNET_assert (NULL != mq);
271   GNUNET_assert (NULL != mq->current_envelope);
272   GNUNET_assert (GNUNET_YES == mq->commited);
273   mq->commited = GNUNET_NO;
274   GNUNET_free (mq->current_envelope);
275   if (NULL == mq->envelope_head)
276   {
277     mq->current_envelope = NULL;
278     return;
279   }
280
281
282   GNUNET_assert (NULL != mq->envelope_tail);
283   GNUNET_assert (NULL != mq->envelope_head);
284   mq->current_envelope = mq->envelope_head;
285   GNUNET_CONTAINER_DLL_remove (mq->envelope_head, mq->envelope_tail,
286                                mq->current_envelope);
287   mq->send_impl (mq, mq->current_envelope->mh, mq->impl_state);
288 }
289
290
291 /**
292  * Create a message queue for the specified handlers.
293  *
294  * @param send function the implements sending messages
295  * @param destroy function that implements destroying the queue
296  * @param cancel function that implements canceling a message
297  * @param impl_state for the queue, passed to 'send' and 'destroy'
298  * @param handlers array of message handlers
299  * @param error_handler handler for read and write errors
300  * @param cls closure for message handlers and error handler
301  * @return a new message queue
302  */
303 struct GNUNET_MQ_Handle *
304 GNUNET_MQ_queue_for_callbacks (GNUNET_MQ_SendImpl send,
305                                GNUNET_MQ_DestroyImpl destroy,
306                                GNUNET_MQ_CancelImpl cancel,
307                                void *impl_state,
308                                const struct GNUNET_MQ_MessageHandler *handlers,
309                                GNUNET_MQ_ErrorHandler error_handler,
310                                void *cls)
311 {
312   struct GNUNET_MQ_Handle *mq;
313
314   mq = GNUNET_new (struct GNUNET_MQ_Handle);
315   mq->send_impl = send;
316   mq->destroy_impl = destroy;
317   mq->handlers = handlers;
318   mq->handlers_cls = cls;
319   mq->impl_state = impl_state;
320
321   return mq;
322 }
323
324
325 /**
326  * Get the message that should currently be sent.
327  * Fails if there is no current message.
328  * Only useful for implementing message queues,
329  * results in undefined behavior if not used carefully.
330  *
331  * @param mq message queue with the current message
332  * @return message to send, never NULL
333  */
334 const struct GNUNET_MessageHeader *
335 GNUNET_MQ_impl_current (struct GNUNET_MQ_Handle *mq)
336 {
337   if (NULL == mq->current_envelope)
338     GNUNET_abort ();
339   if (NULL == mq->current_envelope->mh)
340     GNUNET_abort ();
341   return mq->current_envelope->mh;
342 }
343
344
345 /**
346  * Get the implementation state associated with the
347  * message queue.
348  *
349  * While the GNUNET_MQ_Impl* callbacks receive the
350  * implementation state, continuations that are scheduled
351  * by the implementation function often only have one closure
352  * argument, with this function it is possible to get at the
353  * implementation state when only passing the GNUNET_MQ_Handle
354  * as closure.
355  *
356  * @param mq message queue with the current message
357  * @return message to send, never NULL
358  */
359 void *
360 GNUNET_MQ_impl_state (struct GNUNET_MQ_Handle *mq)
361 {
362   return mq->impl_state;
363 }
364
365
366
367 /**
368  * Mark the current message as irrevocably sent, but do not
369  * proceed with sending the next message.
370  * Will call the appropriate GNUNET_MQ_NotifyCallback, if any.
371  *
372  * @param mq message queue
373  */
374 void
375 GNUNET_MQ_impl_send_commit (struct GNUNET_MQ_Handle *mq)
376 {
377   GNUNET_assert (NULL != mq->current_envelope);
378   GNUNET_assert (GNUNET_NO == mq->commited);
379   mq->commited = GNUNET_YES;
380   if (NULL != mq->current_envelope->sent_cb)
381     mq->current_envelope->sent_cb (mq->current_envelope->sent_cls);
382 }
383
384
385 struct GNUNET_MQ_Envelope *
386 GNUNET_MQ_msg_ (struct GNUNET_MessageHeader **mhp, uint16_t size, uint16_t type)
387 {
388   struct GNUNET_MQ_Envelope *mqm;
389
390   mqm = GNUNET_malloc (sizeof *mqm + size);
391   mqm->mh = (struct GNUNET_MessageHeader *) &mqm[1];
392   mqm->mh->size = htons (size);
393   mqm->mh->type = htons (type);
394   if (NULL != mhp)
395     *mhp = mqm->mh;
396   return mqm;
397 }
398
399
400 /**
401  * Implementation of the GNUNET_MQ_msg_nested_mh macro.
402  *
403  * @param mhp pointer to the message header pointer that will be changed to allocate at
404  *        the newly allocated space for the message.
405  * @param base_size size of the data before the nested message
406  * @param type type of the message in the envelope
407  * @param nested_mh the message to append to the message after base_size
408  */
409 struct GNUNET_MQ_Envelope *
410 GNUNET_MQ_msg_nested_mh_ (struct GNUNET_MessageHeader **mhp, uint16_t base_size, uint16_t type,
411                           const struct GNUNET_MessageHeader *nested_mh)
412 {
413   struct GNUNET_MQ_Envelope *mqm;
414   uint16_t size;
415
416   if (NULL == nested_mh)
417     return GNUNET_MQ_msg_ (mhp, base_size, type);
418
419   size = base_size + ntohs (nested_mh->size);
420
421   /* check for uint16_t overflow */
422   if (size < base_size)
423     return NULL;
424
425   mqm = GNUNET_MQ_msg_ (mhp, size, type);
426   memcpy ((char *) mqm->mh + base_size, nested_mh, ntohs (nested_mh->size));
427
428   return mqm;
429 }
430
431
432 /**
433  * Transmit a queued message to the session's client.
434  *
435  * @param cls consensus session
436  * @param size number of bytes available in buf
437  * @param buf where the callee should write the message
438  * @return number of bytes written to buf
439  */
440 static size_t
441 transmit_queued (void *cls, size_t size,
442                  void *buf)
443 {
444   struct GNUNET_MQ_Handle *mq = cls;
445   struct ServerClientSocketState *state = GNUNET_MQ_impl_state (mq);
446   const struct GNUNET_MessageHeader *msg = GNUNET_MQ_impl_current (mq);
447   size_t msg_size;
448
449   GNUNET_assert (NULL != buf);
450
451   msg_size = ntohs (msg->size);
452   GNUNET_assert (size >= msg_size);
453   memcpy (buf, msg, msg_size);
454   state->th = NULL;
455
456   GNUNET_MQ_impl_send_continue (mq);
457
458   return msg_size;
459 }
460
461
462
463 static void
464 server_client_destroy_impl (struct GNUNET_MQ_Handle *mq,
465                             void *impl_state)
466 {
467   struct ServerClientSocketState *state = impl_state;
468
469   GNUNET_assert (NULL != mq);
470   GNUNET_assert (NULL != state);
471   GNUNET_SERVER_client_drop (state->client);
472   GNUNET_free (state);
473 }
474
475 static void
476 server_client_send_impl (struct GNUNET_MQ_Handle *mq,
477                          const struct GNUNET_MessageHeader *msg, void *impl_state)
478 {
479   struct ServerClientSocketState *state = impl_state;
480
481   GNUNET_assert (NULL != mq);
482   GNUNET_assert (NULL != state);
483
484   GNUNET_MQ_impl_send_commit (mq);
485
486   state->th =
487       GNUNET_SERVER_notify_transmit_ready (state->client, ntohs (msg->size),
488                                            GNUNET_TIME_UNIT_FOREVER_REL,
489                                            &transmit_queued, mq);
490 }
491
492
493 struct GNUNET_MQ_Handle *
494 GNUNET_MQ_queue_for_server_client (struct GNUNET_SERVER_Client *client)
495 {
496   struct GNUNET_MQ_Handle *mq;
497   struct ServerClientSocketState *scss;
498
499   mq = GNUNET_new (struct GNUNET_MQ_Handle);
500   scss = GNUNET_new (struct ServerClientSocketState);
501   mq->impl_state = scss;
502   scss->client = client;
503   GNUNET_SERVER_client_keep (client);
504   mq->send_impl = server_client_send_impl;
505   mq->destroy_impl = server_client_destroy_impl;
506   return mq;
507 }
508
509
510 /**
511  * Type of a function to call when we receive a message
512  * from the service.
513  *
514  * @param cls closure
515  * @param msg message received, NULL on timeout or fatal error
516  */
517 static void
518 handle_client_message (void *cls,
519                        const struct GNUNET_MessageHeader *msg)
520 {
521   struct GNUNET_MQ_Handle *mq = cls;
522   struct ClientConnectionState *state;
523
524   state = mq->impl_state;
525
526   if (NULL == msg)
527   {
528     GNUNET_MQ_inject_error (mq, GNUNET_MQ_ERROR_READ);
529     return;
530   }
531
532   GNUNET_CLIENT_receive (state->connection, handle_client_message, mq,
533                          GNUNET_TIME_UNIT_FOREVER_REL);
534
535   GNUNET_MQ_inject_message (mq, msg);
536 }
537
538
539 /**
540  * Transmit a queued message to the session's client.
541  *
542  * @param cls consensus session
543  * @param size number of bytes available in buf
544  * @param buf where the callee should write the message
545  * @return number of bytes written to buf
546  */
547 static size_t
548 connection_client_transmit_queued (void *cls, size_t size,
549                  void *buf)
550 {
551   struct GNUNET_MQ_Handle *mq = cls;
552   const struct GNUNET_MessageHeader *msg;
553   struct ClientConnectionState *state = mq->impl_state;
554   size_t msg_size;
555
556   GNUNET_assert (NULL != mq);
557   msg = GNUNET_MQ_impl_current (mq);
558
559   if (NULL == buf)
560   {
561     GNUNET_MQ_inject_error (mq, GNUNET_MQ_ERROR_READ);
562     return 0;
563   }
564
565   if ( (GNUNET_YES == state->receive_requested) &&
566        (GNUNET_NO == state->receive_active) )
567   {
568     state->receive_active = GNUNET_YES;
569     GNUNET_CLIENT_receive (state->connection, handle_client_message, mq,
570                            GNUNET_TIME_UNIT_FOREVER_REL);
571   }
572
573
574   msg_size = ntohs (msg->size);
575   GNUNET_assert (size >= msg_size);
576   memcpy (buf, msg, msg_size);
577   state->th = NULL;
578
579   GNUNET_MQ_impl_send_continue (mq);
580
581   return msg_size;
582 }
583
584
585
586 static void
587 connection_client_destroy_impl (struct GNUNET_MQ_Handle *mq, void *impl_state)
588 {
589   GNUNET_free (impl_state);
590 }
591
592 static void
593 connection_client_send_impl (struct GNUNET_MQ_Handle *mq,
594                              const struct GNUNET_MessageHeader *msg, void *impl_state)
595 {
596   struct ClientConnectionState *state = impl_state;
597
598   GNUNET_assert (NULL != state);
599   GNUNET_assert (NULL == state->th);
600
601   GNUNET_MQ_impl_send_commit (mq);
602
603   state->th =
604       GNUNET_CLIENT_notify_transmit_ready (state->connection, ntohs (msg->size),
605                                            GNUNET_TIME_UNIT_FOREVER_REL, GNUNET_NO,
606                                            &connection_client_transmit_queued, mq);
607 }
608
609
610 struct GNUNET_MQ_Handle *
611 GNUNET_MQ_queue_for_connection_client (struct GNUNET_CLIENT_Connection *connection,
612                                        const struct GNUNET_MQ_MessageHandler *handlers,
613                                        GNUNET_MQ_ErrorHandler error_handler,
614                                        void *cls)
615 {
616   struct GNUNET_MQ_Handle *mq;
617   struct ClientConnectionState *state;
618
619   GNUNET_assert (NULL != connection);
620
621   mq = GNUNET_new (struct GNUNET_MQ_Handle);
622   mq->handlers = handlers;
623   mq->error_handler = error_handler;
624   mq->handlers_cls = cls;
625   state = GNUNET_new (struct ClientConnectionState);
626   state->connection = connection;
627   mq->impl_state = state;
628   mq->send_impl = connection_client_send_impl;
629   mq->destroy_impl = connection_client_destroy_impl;
630   if (NULL != handlers)
631     state->receive_requested = GNUNET_YES;
632
633   return mq;
634 }
635
636
637 void
638 GNUNET_MQ_replace_handlers (struct GNUNET_MQ_Handle *mq,
639                             const struct GNUNET_MQ_MessageHandler *new_handlers,
640                             void *cls)
641 {
642   /* FIXME: notify implementation? */
643   /* FIXME: what about NULL handlers? abort receive? */
644   mq->handlers = new_handlers;
645   mq->handlers_cls = cls;
646 }
647
648
649 /**
650  * Associate the assoc_data in mq with a unique request id.
651  *
652  * @param mq message queue, id will be unique for the queue
653  * @param assoc_data to associate
654  */
655 uint32_t
656 GNUNET_MQ_assoc_add (struct GNUNET_MQ_Handle *mq,
657                      void *assoc_data)
658 {
659   uint32_t id;
660
661   if (NULL == mq->assoc_map)
662   {
663     mq->assoc_map = GNUNET_CONTAINER_multihashmap32_create (8);
664     mq->assoc_id = 1;
665   }
666   id = mq->assoc_id++;
667   GNUNET_CONTAINER_multihashmap32_put (mq->assoc_map, id, assoc_data,
668                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
669   return id;
670 }
671
672
673
674 void *
675 GNUNET_MQ_assoc_get (struct GNUNET_MQ_Handle *mq, uint32_t request_id)
676 {
677   if (NULL == mq->assoc_map)
678     return NULL;
679   return GNUNET_CONTAINER_multihashmap32_get (mq->assoc_map, request_id);
680 }
681
682
683 void *
684 GNUNET_MQ_assoc_remove (struct GNUNET_MQ_Handle *mq, uint32_t request_id)
685 {
686   void *val;
687
688   if (NULL == mq->assoc_map)
689     return NULL;
690   val = GNUNET_CONTAINER_multihashmap32_get (mq->assoc_map, request_id);
691   GNUNET_CONTAINER_multihashmap32_remove_all (mq->assoc_map, request_id);
692   return val;
693 }
694
695
696 void
697 GNUNET_MQ_notify_sent (struct GNUNET_MQ_Envelope *mqm,
698                        GNUNET_MQ_NotifyCallback cb,
699                        void *cls)
700 {
701   mqm->sent_cb = cb;
702   mqm->sent_cls = cls;
703 }
704
705
706 void
707 GNUNET_MQ_destroy (struct GNUNET_MQ_Handle *mq)
708 {
709   if (NULL != mq->destroy_impl)
710   {
711     mq->destroy_impl (mq, mq->impl_state);
712   }
713
714   while (NULL != mq->envelope_head)
715   {
716     struct GNUNET_MQ_Envelope *ev;
717     ev = mq->envelope_head;
718     GNUNET_CONTAINER_DLL_remove (mq->envelope_head, mq->envelope_tail, ev);
719     GNUNET_MQ_discard (ev);
720   }
721
722   if (NULL != mq->current_envelope)
723   {
724     GNUNET_MQ_discard (mq->current_envelope);
725     mq->current_envelope = NULL;
726   }
727
728   if (NULL != mq->assoc_map)
729   {
730     GNUNET_CONTAINER_multihashmap32_destroy (mq->assoc_map);
731     mq->assoc_map = NULL;
732   }
733
734   GNUNET_free (mq);
735 }
736
737
738 struct GNUNET_MessageHeader *
739 GNUNET_MQ_extract_nested_mh_ (const struct GNUNET_MessageHeader *mh, uint16_t base_size)
740 {
741   uint16_t whole_size;
742   uint16_t nested_size;
743   struct GNUNET_MessageHeader *nested_msg;
744
745   whole_size = ntohs (mh->size);
746   GNUNET_assert (whole_size >= base_size);
747
748   nested_size = whole_size - base_size;
749
750   if (0 == nested_size)
751     return NULL;
752
753   if (nested_size < sizeof (struct GNUNET_MessageHeader))
754   {
755     GNUNET_break_op (0);
756     return NULL;
757   }
758
759   nested_msg = (struct GNUNET_MessageHeader *) ((char *) mh + base_size);
760
761   if (ntohs (nested_msg->size) != nested_size)
762   {
763     GNUNET_break_op (0);
764     nested_msg->size = htons (nested_size);
765   }
766
767   return nested_msg;
768 }
769
770