- logging
[oweals/gnunet.git] / src / util / mq.c
1 /*
2      This file is part of GNUnet.
3      (C) 2012 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 2, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @author Florian Dold
23  * @file util/mq.c
24  * @brief general purpose request queue
25  */
26
27 #include "platform.h"
28 #include "gnunet_common.h"
29 #include "gnunet_util_lib.h"
30
31 #define LOG(kind,...) GNUNET_log_from (kind, "mq",__VA_ARGS__)
32
33
34 struct GNUNET_MQ_Envelope
35 {
36   /**
37    * Messages are stored in a linked list.
38    * Each queue has its own list of envelopes.
39    */
40   struct GNUNET_MQ_Envelope *next;
41
42   /**
43    * Messages are stored in a linked list
44    * Each queue has its own list of envelopes.
45    */
46   struct GNUNET_MQ_Envelope *prev;
47
48   /**
49    * Actual allocated message header,
50    * usually points to the end of the containing GNUNET_MQ_Envelope
51    */
52   struct GNUNET_MessageHeader *mh;
53
54   /**
55    * Queue the message is queued in, NULL if message is not queued.
56    */
57   struct GNUNET_MQ_Handle *parent_queue;
58
59   /**
60    * Called after the message was sent irrevocably.
61    */
62   GNUNET_MQ_NotifyCallback sent_cb;
63
64   /**
65    * Closure for send_cb
66    */
67   void *sent_cls;
68 };
69
70
71 /**
72  * Handle to a message queue.
73  */
74 struct GNUNET_MQ_Handle
75 {
76   /**
77    * Handlers array, or NULL if the queue should not receive messages
78    */
79   const struct GNUNET_MQ_MessageHandler *handlers;
80
81   /**
82    * Closure for the handler callbacks,
83    * as well as for the error handler.
84    */
85   void *handlers_cls;
86
87   /**
88    * Actual implementation of message sending,
89    * called when a message is added
90    */
91   GNUNET_MQ_SendImpl send_impl;
92
93   /**
94    * Implementation-dependent queue destruction function
95    */
96   GNUNET_MQ_DestroyImpl destroy_impl;
97
98   /**
99    * Implementation-specific state
100    */
101   void *impl_state;
102
103   /**
104    * Callback will be called when an error occurs.
105    */
106   GNUNET_MQ_ErrorHandler error_handler;
107
108   /**
109    * Linked list of messages pending to be sent
110    */
111   struct GNUNET_MQ_Envelope *envelope_head;
112
113   /**
114    * Linked list of messages pending to be sent
115    */
116   struct GNUNET_MQ_Envelope *envelope_tail;
117
118   /**
119    * Message that is currently scheduled to be
120    * sent. Not the head of the message queue, as the implementation
121    * needs to know if sending has been already scheduled or not.
122    */
123   struct GNUNET_MQ_Envelope *current_envelope;
124
125   /**
126    * Has the current envelope been commited?
127    * Either GNUNET_YES or GNUNET_NO.
128    */
129   int commited;
130
131   /**
132    * Map of associations, lazily allocated
133    */
134   struct GNUNET_CONTAINER_MultiHashMap32 *assoc_map;
135
136   /**
137    * Next id that should be used for the assoc_map,
138    * initialized lazily to a random value together with
139    * assoc_map
140    */
141   uint32_t assoc_id;
142 };
143
144
145
146
147 struct ServerClientSocketState
148 {
149   struct GNUNET_SERVER_Client *client;
150   struct GNUNET_SERVER_TransmitHandle* th;
151 };
152
153
154 struct ClientConnectionState
155 {
156   /**
157    * Did we call receive alread alreadyy?
158    */
159   int receive_active;
160
161   /**
162    * Do we also want to receive?
163    */
164   int receive_requested;
165   struct GNUNET_CLIENT_Connection *connection;
166   struct GNUNET_CLIENT_TransmitHandle *th;
167 };
168
169
170 /**
171  * Call the message message handler that was registered
172  * for the type of the given message in the given message queue.
173  *
174  * This function is indended to be used for the implementation
175  * of message queues.
176  *
177  * @param mq message queue with the handlers
178  * @param mh message to dispatch
179  */
180 void
181 GNUNET_MQ_inject_message (struct GNUNET_MQ_Handle *mq, const struct GNUNET_MessageHeader *mh)
182 {
183   const struct GNUNET_MQ_MessageHandler *handler;
184   int handled = GNUNET_NO;
185
186   handler = mq->handlers;
187   if (NULL == handler)
188     return;
189   for (; NULL != handler->cb; handler++)
190   {
191     if (handler->type == ntohs (mh->type))
192     {
193       handler->cb (mq->handlers_cls, mh);
194       handled = GNUNET_YES;
195     }
196   }
197   
198   if (GNUNET_NO == handled)
199     LOG (GNUNET_ERROR_TYPE_WARNING, "no handler for message of type %d\n", ntohs (mh->type));
200 }
201
202
203 /**
204  * Call the error handler of a message queue with the given
205  * error code.  If there is no error handler, log a warning.
206  *
207  * This function is intended to be used by the implementation
208  * of message queues.
209  *
210  * @param mq message queue
211  * @param error the error type
212  */
213 void
214 GNUNET_MQ_inject_error (struct GNUNET_MQ_Handle *mq,
215                         enum GNUNET_MQ_Error error)
216 {
217   if (NULL == mq->error_handler)
218   {
219     /* FIXME: log what kind of error occured */
220     GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "mq: got error, but no handler installed\n");
221     return;
222   }
223   mq->error_handler (mq->handlers_cls, error);
224 }
225
226
227 void
228 GNUNET_MQ_discard (struct GNUNET_MQ_Envelope *mqm)
229 {
230   GNUNET_assert (NULL == mqm->parent_queue);
231   GNUNET_free (mqm);
232 }
233
234
235 /**
236  * Send a message with the give message queue.
237  * May only be called once per message.
238  * 
239  * @param mq message queue
240  * @param ev the envelope with the message to send.
241  */
242 void
243 GNUNET_MQ_send (struct GNUNET_MQ_Handle *mq, struct GNUNET_MQ_Envelope *ev)
244 {
245   GNUNET_assert (NULL != mq);
246   GNUNET_assert (NULL == ev->parent_queue);
247   
248   /* is the implementation busy? queue it! */
249   if (NULL != mq->current_envelope)
250   {
251     GNUNET_CONTAINER_DLL_insert_tail (mq->envelope_head, mq->envelope_tail, ev);
252     return;
253   }
254   mq->current_envelope = ev;
255   mq->send_impl (mq, ev->mh, mq->impl_state);
256 }
257
258
259 /**
260  * Call the send implementation for the next queued message,
261  * if any.
262  * Only useful for implementing message queues,
263  * results in undefined behavior if not used carefully.
264  *
265  * @param mq message queue to send the next message with
266  */
267 void
268 GNUNET_MQ_impl_send_continue (struct GNUNET_MQ_Handle *mq)
269 {
270   /* call is only valid if we're actually currently sending
271    * a message */
272   GNUNET_assert (NULL != mq);
273   GNUNET_assert (NULL != mq->current_envelope);
274   GNUNET_assert (GNUNET_YES == mq->commited);
275   mq->commited = GNUNET_NO;
276   GNUNET_free (mq->current_envelope);
277   if (NULL == mq->envelope_head)
278   {
279     mq->current_envelope = NULL;
280     return;
281   }
282
283
284   GNUNET_assert (NULL != mq->envelope_tail);
285   GNUNET_assert (NULL != mq->envelope_head);
286   mq->current_envelope = mq->envelope_head;
287   GNUNET_CONTAINER_DLL_remove (mq->envelope_head, mq->envelope_tail,
288                                mq->current_envelope);
289   mq->send_impl (mq, mq->current_envelope->mh, mq->impl_state);
290 }
291
292
293 /**
294  * Create a message queue for the specified handlers.
295  *
296  * @param send function the implements sending messages
297  * @param destroy function that implements destroying the queue
298  * @param cancel function that implements canceling a message
299  * @param impl_state for the queue, passed to 'send' and 'destroy'
300  * @param handlers array of message handlers
301  * @param error_handler handler for read and write errors
302  * @param cls closure for message handlers and error handler
303  * @return a new message queue
304  */
305 struct GNUNET_MQ_Handle *
306 GNUNET_MQ_queue_for_callbacks (GNUNET_MQ_SendImpl send,
307                                GNUNET_MQ_DestroyImpl destroy,
308                                GNUNET_MQ_CancelImpl cancel,
309                                void *impl_state,
310                                const struct GNUNET_MQ_MessageHandler *handlers,
311                                GNUNET_MQ_ErrorHandler error_handler,
312                                void *cls)
313 {
314   struct GNUNET_MQ_Handle *mq;
315
316   mq = GNUNET_new (struct GNUNET_MQ_Handle);
317   mq->send_impl = send;
318   mq->destroy_impl = destroy;
319   mq->handlers = handlers;
320   mq->handlers_cls = cls;
321   mq->impl_state = impl_state;
322
323   return mq;
324 }
325
326
327 /**
328  * Get the message that should currently be sent.
329  * Fails if there is no current message.
330  * Only useful for implementing message queues,
331  * results in undefined behavior if not used carefully.
332  *
333  * @param mq message queue with the current message
334  * @return message to send, never NULL
335  */
336 const struct GNUNET_MessageHeader *
337 GNUNET_MQ_impl_current (struct GNUNET_MQ_Handle *mq)
338 {
339   if (NULL == mq->current_envelope)
340     GNUNET_abort ();
341   if (NULL == mq->current_envelope->mh)
342     GNUNET_abort ();
343   return mq->current_envelope->mh;
344 }
345
346
347 /**
348  * Get the implementation state associated with the
349  * message queue.
350  *
351  * While the GNUNET_MQ_Impl* callbacks receive the
352  * implementation state, continuations that are scheduled
353  * by the implementation function often only have one closure
354  * argument, with this function it is possible to get at the
355  * implementation state when only passing the GNUNET_MQ_Handle
356  * as closure.
357  *
358  * @param mq message queue with the current message
359  * @return message to send, never NULL
360  */
361 void *
362 GNUNET_MQ_impl_state (struct GNUNET_MQ_Handle *mq)
363 {
364   return mq->impl_state;
365 }
366
367
368
369 /**
370  * Mark the current message as irrevocably sent, but do not
371  * proceed with sending the next message.
372  * Will call the appropriate GNUNET_MQ_NotifyCallback, if any.
373  *
374  * @param mq message queue
375  */
376 void
377 GNUNET_MQ_impl_send_commit (struct GNUNET_MQ_Handle *mq)
378 {
379   GNUNET_assert (NULL != mq->current_envelope);
380   GNUNET_assert (GNUNET_NO == mq->commited);
381   mq->commited = GNUNET_YES;
382   if (NULL != mq->current_envelope->sent_cb)
383     mq->current_envelope->sent_cb (mq->current_envelope->sent_cls);
384 }
385
386
387 struct GNUNET_MQ_Envelope *
388 GNUNET_MQ_msg_ (struct GNUNET_MessageHeader **mhp, uint16_t size, uint16_t type)
389 {
390   struct GNUNET_MQ_Envelope *mqm;
391
392   mqm = GNUNET_malloc (sizeof *mqm + size);
393   mqm->mh = (struct GNUNET_MessageHeader *) &mqm[1];
394   mqm->mh->size = htons (size);
395   mqm->mh->type = htons (type);
396   if (NULL != mhp)
397     *mhp = mqm->mh;
398   return mqm;
399 }
400
401
402 /**
403  * Implementation of the GNUNET_MQ_msg_nested_mh macro.
404  *
405  * @param mhp pointer to the message header pointer that will be changed to allocate at
406  *        the newly allocated space for the message.
407  * @param base_size size of the data before the nested message
408  * @param type type of the message in the envelope
409  * @param nested_mh the message to append to the message after base_size
410  */
411 struct GNUNET_MQ_Envelope *
412 GNUNET_MQ_msg_nested_mh_ (struct GNUNET_MessageHeader **mhp, uint16_t base_size, uint16_t type,
413                           const struct GNUNET_MessageHeader *nested_mh)
414 {
415   struct GNUNET_MQ_Envelope *mqm;
416   uint16_t size;
417
418   if (NULL == nested_mh)
419     return GNUNET_MQ_msg_ (mhp, base_size, type);
420
421   size = base_size + ntohs (nested_mh->size);
422
423   /* check for uint16_t overflow */
424   if (size < base_size)
425     return NULL;
426
427   mqm = GNUNET_MQ_msg_ (mhp, size, type);
428   memcpy ((char *) mqm->mh + base_size, nested_mh, ntohs (nested_mh->size));
429
430   return mqm;
431 }
432
433
434 /**
435  * Transmit a queued message to the session's client.
436  *
437  * @param cls consensus session
438  * @param size number of bytes available in buf
439  * @param buf where the callee should write the message
440  * @return number of bytes written to buf
441  */
442 static size_t
443 transmit_queued (void *cls, size_t size,
444                  void *buf)
445 {
446   struct GNUNET_MQ_Handle *mq = cls;
447   struct ServerClientSocketState *state = GNUNET_MQ_impl_state (mq);
448   const struct GNUNET_MessageHeader *msg = GNUNET_MQ_impl_current (mq);
449   size_t msg_size;
450
451   GNUNET_assert (NULL != buf);
452
453   msg_size = ntohs (msg->size);
454   GNUNET_assert (size >= msg_size);
455   memcpy (buf, msg, msg_size);
456   state->th = NULL;
457
458   GNUNET_MQ_impl_send_continue (mq);
459
460   return msg_size;
461 }
462
463
464
465 static void
466 server_client_destroy_impl (struct GNUNET_MQ_Handle *mq,
467                             void *impl_state)
468 {
469   struct ServerClientSocketState *state = impl_state;
470   
471   GNUNET_assert (NULL != mq);
472   GNUNET_assert (NULL != state);
473   GNUNET_SERVER_client_drop (state->client);
474   GNUNET_free (state);
475 }
476
477 static void
478 server_client_send_impl (struct GNUNET_MQ_Handle *mq,
479                          const struct GNUNET_MessageHeader *msg, void *impl_state)
480 {
481   struct ServerClientSocketState *state = impl_state;
482
483   GNUNET_assert (NULL != mq);
484   GNUNET_assert (NULL != state);
485
486   GNUNET_MQ_impl_send_commit (mq);
487
488   state->th = 
489       GNUNET_SERVER_notify_transmit_ready (state->client, ntohs (msg->size),
490                                            GNUNET_TIME_UNIT_FOREVER_REL,
491                                            &transmit_queued, mq);
492 }
493
494
495 struct GNUNET_MQ_Handle *
496 GNUNET_MQ_queue_for_server_client (struct GNUNET_SERVER_Client *client)
497 {
498   struct GNUNET_MQ_Handle *mq;
499   struct ServerClientSocketState *scss;
500
501   mq = GNUNET_new (struct GNUNET_MQ_Handle);
502   scss = GNUNET_new (struct ServerClientSocketState);
503   mq->impl_state = scss;
504   scss->client = client;
505   GNUNET_SERVER_client_keep (client);
506   mq->send_impl = server_client_send_impl;
507   mq->destroy_impl = server_client_destroy_impl;
508   return mq;
509 }
510
511
512 /**
513  * Type of a function to call when we receive a message
514  * from the service.
515  *
516  * @param cls closure
517  * @param msg message received, NULL on timeout or fatal error
518  */
519 static void
520 handle_client_message (void *cls,
521                        const struct GNUNET_MessageHeader *msg)
522 {
523   struct GNUNET_MQ_Handle *mq = cls;
524   struct ClientConnectionState *state;
525
526   state = mq->impl_state;
527   
528   if (NULL == msg)
529   {
530     GNUNET_MQ_inject_error (mq, GNUNET_MQ_ERROR_READ);
531     return;
532   }
533
534   GNUNET_CLIENT_receive (state->connection, handle_client_message, mq,
535                          GNUNET_TIME_UNIT_FOREVER_REL);
536
537   GNUNET_MQ_inject_message (mq, msg);
538 }
539
540
541 /**
542  * Transmit a queued message to the session's client.
543  *
544  * @param cls consensus session
545  * @param size number of bytes available in buf
546  * @param buf where the callee should write the message
547  * @return number of bytes written to buf
548  */
549 static size_t
550 connection_client_transmit_queued (void *cls, size_t size,
551                  void *buf)
552 {
553   struct GNUNET_MQ_Handle *mq = cls;
554   const struct GNUNET_MessageHeader *msg;
555   struct ClientConnectionState *state = mq->impl_state;
556   size_t msg_size;
557
558   GNUNET_assert (NULL != mq);
559   msg = GNUNET_MQ_impl_current (mq);
560
561   if (NULL == buf)
562   {
563     GNUNET_MQ_inject_error (mq, GNUNET_MQ_ERROR_READ);
564     return 0;
565   }
566
567   if ( (GNUNET_YES == state->receive_requested) &&
568        (GNUNET_NO == state->receive_active) )
569   {
570     state->receive_active = GNUNET_YES;
571     GNUNET_CLIENT_receive (state->connection, handle_client_message, mq,
572                            GNUNET_TIME_UNIT_FOREVER_REL);
573   }
574
575
576   msg_size = ntohs (msg->size);
577   GNUNET_assert (size >= msg_size);
578   memcpy (buf, msg, msg_size);
579   state->th = NULL;
580
581   GNUNET_MQ_impl_send_continue (mq);
582
583   return msg_size;
584 }
585
586
587
588 static void
589 connection_client_destroy_impl (struct GNUNET_MQ_Handle *mq, void *impl_state)
590 {
591   GNUNET_free (impl_state);
592 }
593
594 static void
595 connection_client_send_impl (struct GNUNET_MQ_Handle *mq,
596                              const struct GNUNET_MessageHeader *msg, void *impl_state)
597 {
598   struct ClientConnectionState *state = impl_state;
599
600   GNUNET_assert (NULL != state);
601   GNUNET_assert (NULL == state->th);
602
603   GNUNET_MQ_impl_send_commit (mq);
604
605   state->th = 
606       GNUNET_CLIENT_notify_transmit_ready (state->connection, ntohs (msg->size), 
607                                            GNUNET_TIME_UNIT_FOREVER_REL, GNUNET_NO,
608                                            &connection_client_transmit_queued, mq);
609 }
610
611
612 struct GNUNET_MQ_Handle *
613 GNUNET_MQ_queue_for_connection_client (struct GNUNET_CLIENT_Connection *connection,
614                                        const struct GNUNET_MQ_MessageHandler *handlers,
615                                        GNUNET_MQ_ErrorHandler error_handler,
616                                        void *cls)
617 {
618   struct GNUNET_MQ_Handle *mq;
619   struct ClientConnectionState *state;
620
621   GNUNET_assert (NULL != connection);
622
623   mq = GNUNET_new (struct GNUNET_MQ_Handle);
624   mq->handlers = handlers;
625   mq->error_handler = error_handler;
626   mq->handlers_cls = cls;
627   state = GNUNET_new (struct ClientConnectionState);
628   state->connection = connection;
629   mq->impl_state = state;
630   mq->send_impl = connection_client_send_impl;
631   mq->destroy_impl = connection_client_destroy_impl;
632   if (NULL != handlers)
633     state->receive_requested = GNUNET_YES;
634
635   return mq;
636 }
637
638
639 void
640 GNUNET_MQ_replace_handlers (struct GNUNET_MQ_Handle *mq,
641                             const struct GNUNET_MQ_MessageHandler *new_handlers,
642                             void *cls)
643 {
644   /* FIXME: notify implementation? */
645   /* FIXME: what about NULL handlers? abort receive? */
646   mq->handlers = new_handlers;
647   mq->handlers_cls = cls;
648 }
649
650
651 /**
652  * Associate the assoc_data in mq with a unique request id.
653  *
654  * @param mq message queue, id will be unique for the queue
655  * @param assoc_data to associate
656  */
657 uint32_t
658 GNUNET_MQ_assoc_add (struct GNUNET_MQ_Handle *mq,
659                      void *assoc_data)
660 {
661   uint32_t id;
662
663   if (NULL == mq->assoc_map)
664   {
665     mq->assoc_map = GNUNET_CONTAINER_multihashmap32_create (8);
666     mq->assoc_id = 1;
667   }
668   id = mq->assoc_id++;
669   GNUNET_CONTAINER_multihashmap32_put (mq->assoc_map, id, assoc_data,
670                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
671   return id;
672 }
673
674
675
676 void *
677 GNUNET_MQ_assoc_get (struct GNUNET_MQ_Handle *mq, uint32_t request_id)
678 {
679   if (NULL == mq->assoc_map)
680     return NULL;
681   return GNUNET_CONTAINER_multihashmap32_get (mq->assoc_map, request_id);
682 }
683
684
685 void *
686 GNUNET_MQ_assoc_remove (struct GNUNET_MQ_Handle *mq, uint32_t request_id)
687 {
688   void *val;
689
690   if (NULL == mq->assoc_map)
691     return NULL;
692   val = GNUNET_CONTAINER_multihashmap32_get (mq->assoc_map, request_id);
693   GNUNET_CONTAINER_multihashmap32_remove_all (mq->assoc_map, request_id);
694   return val;
695 }
696
697
698 void
699 GNUNET_MQ_notify_sent (struct GNUNET_MQ_Envelope *mqm,
700                        GNUNET_MQ_NotifyCallback cb,
701                        void *cls)
702 {
703   mqm->sent_cb = cb;
704   mqm->sent_cls = cls;
705 }
706
707
708 void
709 GNUNET_MQ_destroy (struct GNUNET_MQ_Handle *mq)
710 {
711   if (NULL != mq->destroy_impl)
712   {
713     mq->destroy_impl (mq, mq->impl_state);
714   }
715
716   while (NULL != mq->envelope_head)
717   {
718     struct GNUNET_MQ_Envelope *ev;
719     ev = mq->envelope_head;
720     GNUNET_CONTAINER_DLL_remove (mq->envelope_head, mq->envelope_tail, ev);
721     GNUNET_MQ_discard (ev);
722   }
723
724   if (NULL != mq->current_envelope)
725   {
726     GNUNET_MQ_discard (mq->current_envelope);
727     mq->current_envelope = NULL;
728   }
729
730   GNUNET_free (mq);
731 }
732
733
734 struct GNUNET_MessageHeader *
735 GNUNET_MQ_extract_nested_mh_ (const struct GNUNET_MessageHeader *mh, uint16_t base_size)
736 {
737   uint16_t whole_size;
738   uint16_t nested_size;
739   struct GNUNET_MessageHeader *nested_msg;
740
741   whole_size = ntohs (mh->size);
742   GNUNET_assert (whole_size >= base_size);
743
744   nested_size = whole_size - base_size;
745
746   if (0 == nested_size)
747     return NULL;
748
749   if (nested_size < sizeof (struct GNUNET_MessageHeader))
750   {
751     GNUNET_break_op (0);
752     return NULL;
753   }
754
755   nested_msg = (struct GNUNET_MessageHeader *) ((char *) mh + base_size);
756
757   if (ntohs (nested_msg->size) != nested_size)
758   {
759     GNUNET_break_op (0);
760     nested_msg->size = htons (nested_size);
761   }
762
763   return nested_msg;
764 }
765
766