793fd70aeccd2c870eba51919501e565bbe151fc
[oweals/gnunet.git] / src / util / mq.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2012-2017 GNUnet e.V.
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18      Boston, MA 02110-1301, USA.
19 */
20
21 /**
22  * @author Florian Dold
23  * @file util/mq.c
24  * @brief general purpose request queue
25  */
26 #include "platform.h"
27 #include "gnunet_util_lib.h"
28
29 #define LOG(kind,...) GNUNET_log_from (kind, "util-mq",__VA_ARGS__)
30
31
32 struct GNUNET_MQ_Envelope
33 {
34   /**
35    * Messages are stored in a linked list.
36    * Each queue has its own list of envelopes.
37    */
38   struct GNUNET_MQ_Envelope *next;
39
40   /**
41    * Messages are stored in a linked list
42    * Each queue has its own list of envelopes.
43    */
44   struct GNUNET_MQ_Envelope *prev;
45
46   /**
47    * Actual allocated message header.
48    * The GNUNET_MQ_Envelope header is allocated at
49    * the end of the message.
50    */
51   struct GNUNET_MessageHeader *mh;
52
53   /**
54    * Queue the message is queued in, NULL if message is not queued.
55    */
56   struct GNUNET_MQ_Handle *parent_queue;
57
58   /**
59    * Called after the message was sent irrevocably.
60    */
61   GNUNET_SCHEDULER_TaskCallback sent_cb;
62
63   /**
64    * Closure for @e sent_cb
65    */
66   void *sent_cls;
67
68   /**
69    * Flags that were set for this envelope by
70    * #GNUNET_MQ_env_set_options().   Only valid if
71    * @e have_custom_options is set.
72    */
73   uint64_t flags;
74
75   /**
76    * Additional options buffer set for this envelope by
77    * #GNUNET_MQ_env_set_options().  Only valid if
78    * @e have_custom_options is set.
79    */
80   const void *extra;
81
82   /**
83    * Did the application call #GNUNET_MQ_env_set_options()?
84    */
85   int have_custom_options;
86 };
87
88
89 /**
90  * Handle to a message queue.
91  */
92 struct GNUNET_MQ_Handle
93 {
94   /**
95    * Handlers array, or NULL if the queue should not receive messages
96    */
97   struct GNUNET_MQ_MessageHandler *handlers;
98
99   /**
100    * Actual implementation of message sending,
101    * called when a message is added
102    */
103   GNUNET_MQ_SendImpl send_impl;
104
105   /**
106    * Implementation-dependent queue destruction function
107    */
108   GNUNET_MQ_DestroyImpl destroy_impl;
109
110   /**
111    * Implementation-dependent send cancel function
112    */
113   GNUNET_MQ_CancelImpl cancel_impl;
114
115   /**
116    * Implementation-specific state
117    */
118   void *impl_state;
119
120   /**
121    * Callback will be called when an error occurs.
122    */
123   GNUNET_MQ_ErrorHandler error_handler;
124
125   /**
126    * Closure for the error handler.
127    */
128   void *error_handler_cls;
129
130   /**
131    * Task to asynchronously run #impl_send_continue().
132    */
133   struct GNUNET_SCHEDULER_Task *send_task;
134
135   /**
136    * Linked list of messages pending to be sent
137    */
138   struct GNUNET_MQ_Envelope *envelope_head;
139
140   /**
141    * Linked list of messages pending to be sent
142    */
143   struct GNUNET_MQ_Envelope *envelope_tail;
144
145   /**
146    * Message that is currently scheduled to be
147    * sent. Not the head of the message queue, as the implementation
148    * needs to know if sending has been already scheduled or not.
149    */
150   struct GNUNET_MQ_Envelope *current_envelope;
151
152   /**
153    * Map of associations, lazily allocated
154    */
155   struct GNUNET_CONTAINER_MultiHashMap32 *assoc_map;
156
157   /**
158    * Functions to call on queue destruction; kept in a DLL.
159    */
160   struct GNUNET_MQ_DestroyNotificationHandle *dnh_head;
161
162   /**
163    * Functions to call on queue destruction; kept in a DLL.
164    */
165   struct GNUNET_MQ_DestroyNotificationHandle *dnh_tail;
166
167   /**
168    * Additional options buffer set for this queue by
169    * #GNUNET_MQ_set_options().  Default is 0.
170    */
171   const void *default_extra;
172
173   /**
174    * Flags that were set for this queue by
175    * #GNUNET_MQ_set_options().   Default is 0.
176    */
177   uint64_t default_flags;
178
179   /**
180    * Next id that should be used for the @e assoc_map,
181    * initialized lazily to a random value together with
182    * @e assoc_map
183    */
184   uint32_t assoc_id;
185
186   /**
187    * Number of entries we have in the envelope-DLL.
188    */
189   unsigned int queue_length;
190
191   /**
192    * #GNUNET_YES if GNUNET_MQ_impl_evacuate was called.
193    * FIXME: is this dead?
194    */
195   int evacuate_called;
196
197   /**
198    * #GNUNET_YES if GNUNET_MQ_impl_send_in_flight() was called.
199    */
200   int in_flight;
201 };
202
203
204 /**
205  * Call the message message handler that was registered
206  * for the type of the given message in the given message queue.
207  *
208  * This function is indended to be used for the implementation
209  * of message queues.
210  *
211  * @param mq message queue with the handlers
212  * @param mh message to dispatch
213  */
214 void
215 GNUNET_MQ_inject_message (struct GNUNET_MQ_Handle *mq,
216                           const struct GNUNET_MessageHeader *mh)
217 {
218   const struct GNUNET_MQ_MessageHandler *handler;
219   int handled = GNUNET_NO;
220   uint16_t msize = ntohs (mh->size);
221   uint16_t mtype = ntohs (mh->type);
222
223   LOG (GNUNET_ERROR_TYPE_DEBUG,
224        "Queue %p received message of type %u and size %u\n",
225        mq,
226        mtype,
227        msize);
228
229   if (NULL == mq->handlers)
230     goto done;
231   for (handler = mq->handlers; NULL != handler->cb; handler++)
232   {
233     if (handler->type == mtype)
234     {
235       handled = GNUNET_YES;
236       if ( (handler->expected_size > msize) ||
237            ( (handler->expected_size != msize) &&
238              (NULL == handler->mv) ) )
239       {
240         /* Too small, or not an exact size and
241            no 'mv' handler to check rest */
242         LOG (GNUNET_ERROR_TYPE_ERROR,
243              "Received malformed message of type %u\n",
244              (unsigned int) handler->type);
245         GNUNET_MQ_inject_error (mq,
246                                 GNUNET_MQ_ERROR_MALFORMED);
247         break;
248       }
249       if ( (NULL == handler->mv) ||
250            (GNUNET_OK ==
251             handler->mv (handler->cls, mh)) )
252       {
253         /* message well-formed, pass to handler */
254         handler->cb (handler->cls, mh);
255       }
256       else
257       {
258         /* Message rejected by check routine */
259         LOG (GNUNET_ERROR_TYPE_ERROR,
260              "Received malformed message of type %u\n",
261              (unsigned int) handler->type);
262         GNUNET_MQ_inject_error (mq,
263                                 GNUNET_MQ_ERROR_MALFORMED);
264       }
265       break;
266     }
267   }
268  done:
269   if (GNUNET_NO == handled)
270     LOG (GNUNET_ERROR_TYPE_INFO,
271          "No handler for message of type %u and size %u\n",
272          mtype, msize);
273 }
274
275
276 /**
277  * Call the error handler of a message queue with the given
278  * error code.  If there is no error handler, log a warning.
279  *
280  * This function is intended to be used by the implementation
281  * of message queues.
282  *
283  * @param mq message queue
284  * @param error the error type
285  */
286 void
287 GNUNET_MQ_inject_error (struct GNUNET_MQ_Handle *mq,
288                         enum GNUNET_MQ_Error error)
289 {
290   if (NULL == mq->error_handler)
291   {
292     LOG (GNUNET_ERROR_TYPE_WARNING,
293          "Got error %d, but no handler installed\n",
294          (int) error);
295     return;
296   }
297   mq->error_handler (mq->error_handler_cls,
298                      error);
299 }
300
301
302 /**
303  * Discard the message queue message, free all
304  * allocated resources. Must be called in the event
305  * that a message is created but should not actually be sent.
306  *
307  * @param mqm the message to discard
308  */
309 void
310 GNUNET_MQ_discard (struct GNUNET_MQ_Envelope *ev)
311 {
312   GNUNET_assert (NULL == ev->parent_queue);
313   GNUNET_free (ev);
314 }
315
316
317 /**
318  * Obtain the current length of the message queue.
319  *
320  * @param mq queue to inspect
321  * @return number of queued, non-transmitted messages
322  */
323 unsigned int
324 GNUNET_MQ_get_length (struct GNUNET_MQ_Handle *mq)
325 {
326   if (GNUNET_YES != mq->in_flight)
327   {
328     return mq->queue_length;
329   }
330   return mq->queue_length - 1;
331 }
332
333
334 /**
335  * Send a message with the given message queue.
336  * May only be called once per message.
337  *
338  * @param mq message queue
339  * @param ev the envelope with the message to send.
340  */
341 void
342 GNUNET_MQ_send (struct GNUNET_MQ_Handle *mq,
343                 struct GNUNET_MQ_Envelope *ev)
344 {
345   GNUNET_assert (NULL != mq);
346   GNUNET_assert (NULL == ev->parent_queue);
347
348   mq->queue_length++;
349   GNUNET_break (mq->queue_length < 10000); /* This would seem like a bug... */
350   ev->parent_queue = mq;
351   /* is the implementation busy? queue it! */
352   if ( (NULL != mq->current_envelope) ||
353        (NULL != mq->send_task) )
354   {
355     GNUNET_CONTAINER_DLL_insert_tail (mq->envelope_head,
356                                       mq->envelope_tail,
357                                       ev);
358     return;
359   }
360   GNUNET_assert (NULL == mq->envelope_head);
361   mq->current_envelope = ev;
362
363   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
364               "mq: sending message of type %u, queue empty\n",
365               ntohs(ev->mh->type));
366
367   mq->send_impl (mq,
368                  ev->mh,
369                  mq->impl_state);
370 }
371
372
373 /**
374  * Remove the first envelope that has not yet been sent from the message
375  * queue and return it.
376  *
377  * @param mq queue to remove envelope from
378  * @return NULL if queue is empty (or has no envelope that is not under transmission)
379  */
380 struct GNUNET_MQ_Envelope *
381 GNUNET_MQ_unsent_head (struct GNUNET_MQ_Handle *mq)
382 {
383   struct GNUNET_MQ_Envelope *env;
384
385   env = mq->envelope_head;
386   GNUNET_CONTAINER_DLL_remove (mq->envelope_head,
387                                mq->envelope_tail,
388                                env);
389   mq->queue_length--;
390   env->parent_queue = NULL;
391   return env;
392 }
393
394
395 /**
396  * Function to copy an envelope.  The envelope must not yet
397  * be in any queue or have any options or callbacks set.
398  *
399  * @param env envelope to copy
400  * @return copy of @a env
401  */
402 struct GNUNET_MQ_Envelope *
403 GNUNET_MQ_env_copy (struct GNUNET_MQ_Envelope *env)
404 {
405   GNUNET_assert (NULL == env->next);
406   GNUNET_assert (NULL == env->parent_queue);
407   GNUNET_assert (NULL == env->sent_cb);
408   GNUNET_assert (GNUNET_NO == env->have_custom_options);
409   return GNUNET_MQ_msg_copy (env->mh);
410 }
411
412
413 /**
414  * Send a copy of a message with the given message queue.
415  * Can be called repeatedly on the same envelope.
416  *
417  * @param mq message queue
418  * @param ev the envelope with the message to send.
419  */
420 void
421 GNUNET_MQ_send_copy (struct GNUNET_MQ_Handle *mq,
422                      const struct GNUNET_MQ_Envelope *ev)
423 {
424   struct GNUNET_MQ_Envelope *env;
425   uint16_t msize;
426
427   msize = ntohs (ev->mh->size);
428   env = GNUNET_malloc (sizeof (struct GNUNET_MQ_Envelope) +
429                        msize);
430   env->mh = (struct GNUNET_MessageHeader *) &env[1];
431   env->sent_cb = ev->sent_cb;
432   env->sent_cls = ev->sent_cls;
433   GNUNET_memcpy (&env[1],
434           ev->mh,
435           msize);
436   GNUNET_MQ_send (mq,
437                   env);
438 }
439
440
441 /**
442  * Task run to call the send implementation for the next queued
443  * message, if any.  Only useful for implementing message queues,
444  * results in undefined behavior if not used carefully.
445  *
446  * @param cls message queue to send the next message with
447  */
448 static void
449 impl_send_continue (void *cls)
450 {
451   struct GNUNET_MQ_Handle *mq = cls;
452
453   mq->send_task = NULL;
454   /* call is only valid if we're actually currently sending
455    * a message */
456   if (NULL == mq->envelope_head)
457     return;
458   mq->current_envelope = mq->envelope_head;
459   GNUNET_CONTAINER_DLL_remove (mq->envelope_head,
460                                mq->envelope_tail,
461                                mq->current_envelope);
462
463   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
464               "mq: sending message of type %u from queue\n",
465               ntohs(mq->current_envelope->mh->type));
466
467   mq->send_impl (mq,
468                  mq->current_envelope->mh,
469                  mq->impl_state);
470 }
471
472
473 /**
474  * Call the send implementation for the next queued message, if any.
475  * Only useful for implementing message queues, results in undefined
476  * behavior if not used carefully.
477  *
478  * @param mq message queue to send the next message with
479  */
480 void
481 GNUNET_MQ_impl_send_continue (struct GNUNET_MQ_Handle *mq)
482 {
483   struct GNUNET_MQ_Envelope *current_envelope;
484   GNUNET_SCHEDULER_TaskCallback cb;
485
486   GNUNET_assert (0 < mq->queue_length);
487   mq->queue_length--;
488   mq->in_flight = GNUNET_NO;
489   current_envelope = mq->current_envelope;
490   current_envelope->parent_queue = NULL;
491   mq->current_envelope = NULL;
492   GNUNET_assert (NULL == mq->send_task);
493   mq->send_task = GNUNET_SCHEDULER_add_now (&impl_send_continue,
494                                             mq);
495   if (NULL != (cb = current_envelope->sent_cb))
496   {
497     current_envelope->sent_cb = NULL;
498     cb (current_envelope->sent_cls);
499   }
500   GNUNET_free (current_envelope);
501 }
502
503
504 /**
505  * Call the send notification for the current message, but do not
506  * try to send the next message until #GNUNET_MQ_impl_send_continue
507  * is called.
508  *
509  * Only useful for implementing message queues, results in undefined
510  * behavior if not used carefully.
511  *
512  * @param mq message queue to send the next message with
513  */
514 void
515 GNUNET_MQ_impl_send_in_flight (struct GNUNET_MQ_Handle *mq)
516 {
517   struct GNUNET_MQ_Envelope *current_envelope;
518   GNUNET_SCHEDULER_TaskCallback cb;
519
520   mq->in_flight = GNUNET_YES;
521   /* call is only valid if we're actually currently sending
522    * a message */
523   current_envelope = mq->current_envelope;
524   GNUNET_assert (NULL != current_envelope);
525   /* can't call cancel from now on anymore */
526   current_envelope->parent_queue = NULL;
527   if (NULL != (cb = current_envelope->sent_cb))
528   {
529     current_envelope->sent_cb = NULL;
530     cb (current_envelope->sent_cls);
531   }
532 }
533
534
535 /**
536  * Create a message queue for the specified handlers.
537  *
538  * @param send function the implements sending messages
539  * @param destroy function that implements destroying the queue
540  * @param cancel function that implements canceling a message
541  * @param impl_state for the queue, passed to 'send' and 'destroy'
542  * @param handlers array of message handlers
543  * @param error_handler handler for read and write errors
544  * @param error_handler_cls closure for @a error_handler
545  * @return a new message queue
546  */
547 struct GNUNET_MQ_Handle *
548 GNUNET_MQ_queue_for_callbacks (GNUNET_MQ_SendImpl send,
549                                GNUNET_MQ_DestroyImpl destroy,
550                                GNUNET_MQ_CancelImpl cancel,
551                                void *impl_state,
552                                const struct GNUNET_MQ_MessageHandler *handlers,
553                                GNUNET_MQ_ErrorHandler error_handler,
554                                void *error_handler_cls)
555 {
556   struct GNUNET_MQ_Handle *mq;
557
558   mq = GNUNET_new (struct GNUNET_MQ_Handle);
559   mq->send_impl = send;
560   mq->destroy_impl = destroy;
561   mq->cancel_impl = cancel;
562   mq->handlers = GNUNET_MQ_copy_handlers (handlers);
563   mq->error_handler = error_handler;
564   mq->error_handler_cls = error_handler_cls;
565   mq->impl_state = impl_state;
566
567   return mq;
568 }
569
570
571 /**
572  * Change the closure argument in all of the `handlers` of the
573  * @a mq.
574  *
575  * @param mq to modify
576  * @param handlers_cls new closure to use
577  */
578 void
579 GNUNET_MQ_set_handlers_closure (struct GNUNET_MQ_Handle *mq,
580                                 void *handlers_cls)
581 {
582   unsigned int i;
583
584   if (NULL == mq->handlers)
585     return;
586   for (i=0;NULL != mq->handlers[i].cb; i++)
587     mq->handlers[i].cls = handlers_cls;
588 }
589
590
591 /**
592  * Get the message that should currently be sent.
593  * Fails if there is no current message.
594  * Only useful for implementing message queues,
595  * results in undefined behavior if not used carefully.
596  *
597  * @param mq message queue with the current message
598  * @return message to send, never NULL
599  */
600 const struct GNUNET_MessageHeader *
601 GNUNET_MQ_impl_current (struct GNUNET_MQ_Handle *mq)
602 {
603   GNUNET_assert (NULL != mq->current_envelope);
604   GNUNET_assert (NULL != mq->current_envelope->mh);
605   return mq->current_envelope->mh;
606 }
607
608
609 /**
610  * Get the implementation state associated with the
611  * message queue.
612  *
613  * While the GNUNET_MQ_Impl* callbacks receive the
614  * implementation state, continuations that are scheduled
615  * by the implementation function often only have one closure
616  * argument, with this function it is possible to get at the
617  * implementation state when only passing the GNUNET_MQ_Handle
618  * as closure.
619  *
620  * @param mq message queue with the current message
621  * @return message to send, never NULL
622  */
623 void *
624 GNUNET_MQ_impl_state (struct GNUNET_MQ_Handle *mq)
625 {
626   return mq->impl_state;
627 }
628
629
630 struct GNUNET_MQ_Envelope *
631 GNUNET_MQ_msg_ (struct GNUNET_MessageHeader **mhp,
632                 uint16_t size,
633                 uint16_t type)
634 {
635   struct GNUNET_MQ_Envelope *ev;
636
637   ev = GNUNET_malloc (size + sizeof (struct GNUNET_MQ_Envelope));
638   ev->mh = (struct GNUNET_MessageHeader *) &ev[1];
639   ev->mh->size = htons (size);
640   ev->mh->type = htons (type);
641   if (NULL != mhp)
642     *mhp = ev->mh;
643   return ev;
644 }
645
646
647 /**
648  * Create a new envelope by copying an existing message.
649  *
650  * @param hdr header of the message to copy
651  * @return envelope containing @a hdr
652  */
653 struct GNUNET_MQ_Envelope *
654 GNUNET_MQ_msg_copy (const struct GNUNET_MessageHeader *hdr)
655 {
656   struct GNUNET_MQ_Envelope *mqm;
657   uint16_t size = ntohs (hdr->size);
658
659   mqm = GNUNET_malloc (sizeof (*mqm) + size);
660   mqm->mh = (struct GNUNET_MessageHeader *) &mqm[1];
661   GNUNET_memcpy (mqm->mh,
662           hdr,
663           size);
664   return mqm;
665 }
666
667
668 /**
669  * Implementation of the #GNUNET_MQ_msg_nested_mh macro.
670  *
671  * @param mhp pointer to the message header pointer that will be changed to allocate at
672  *        the newly allocated space for the message.
673  * @param base_size size of the data before the nested message
674  * @param type type of the message in the envelope
675  * @param nested_mh the message to append to the message after base_size
676  */
677 struct GNUNET_MQ_Envelope *
678 GNUNET_MQ_msg_nested_mh_ (struct GNUNET_MessageHeader **mhp,
679                           uint16_t base_size,
680                           uint16_t type,
681                           const struct GNUNET_MessageHeader *nested_mh)
682 {
683   struct GNUNET_MQ_Envelope *mqm;
684   uint16_t size;
685
686   if (NULL == nested_mh)
687     return GNUNET_MQ_msg_ (mhp, base_size, type);
688
689   size = base_size + ntohs (nested_mh->size);
690
691   /* check for uint16_t overflow */
692   if (size < base_size)
693     return NULL;
694
695   mqm = GNUNET_MQ_msg_ (mhp, size, type);
696   GNUNET_memcpy ((char *) mqm->mh + base_size,
697                  nested_mh,
698                  ntohs (nested_mh->size));
699
700   return mqm;
701 }
702
703
704 /**
705  * Associate the assoc_data in mq with a unique request id.
706  *
707  * @param mq message queue, id will be unique for the queue
708  * @param assoc_data to associate
709  */
710 uint32_t
711 GNUNET_MQ_assoc_add (struct GNUNET_MQ_Handle *mq,
712                      void *assoc_data)
713 {
714   uint32_t id;
715
716   if (NULL == mq->assoc_map)
717   {
718     mq->assoc_map = GNUNET_CONTAINER_multihashmap32_create (8);
719     mq->assoc_id = 1;
720   }
721   id = mq->assoc_id++;
722   GNUNET_assert (GNUNET_OK ==
723                  GNUNET_CONTAINER_multihashmap32_put (mq->assoc_map,
724                                                       id,
725                                                       assoc_data,
726                                                       GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
727   return id;
728 }
729
730
731 /**
732  * Get the data associated with a @a request_id in a queue
733  *
734  * @param mq the message queue with the association
735  * @param request_id the request id we are interested in
736  * @return the associated data
737  */
738 void *
739 GNUNET_MQ_assoc_get (struct GNUNET_MQ_Handle *mq,
740                      uint32_t request_id)
741 {
742   if (NULL == mq->assoc_map)
743     return NULL;
744   return GNUNET_CONTAINER_multihashmap32_get (mq->assoc_map,
745                                               request_id);
746 }
747
748
749 /**
750  * Remove the association for a @a request_id
751  *
752  * @param mq the message queue with the association
753  * @param request_id the request id we want to remove
754  * @return the associated data
755  */
756 void *
757 GNUNET_MQ_assoc_remove (struct GNUNET_MQ_Handle *mq,
758                         uint32_t request_id)
759 {
760   void *val;
761
762   if (NULL == mq->assoc_map)
763     return NULL;
764   val = GNUNET_CONTAINER_multihashmap32_get (mq->assoc_map,
765                                              request_id);
766   GNUNET_CONTAINER_multihashmap32_remove_all (mq->assoc_map,
767                                               request_id);
768   return val;
769 }
770
771
772 /**
773  * Call a callback once the envelope has been sent, that is,
774  * sending it can not be canceled anymore.
775  * There can be only one notify sent callback per envelope.
776  *
777  * @param ev message to call the notify callback for
778  * @param cb the notify callback
779  * @param cb_cls closure for the callback
780  */
781 void
782 GNUNET_MQ_notify_sent (struct GNUNET_MQ_Envelope *ev,
783                        GNUNET_SCHEDULER_TaskCallback cb,
784                        void *cb_cls)
785 {
786   GNUNET_assert (NULL == ev->sent_cb);
787   ev->sent_cb = cb;
788   ev->sent_cls = cb_cls;
789 }
790
791
792 /**
793  * Handle we return for callbacks registered to be
794  * notified when #GNUNET_MQ_destroy() is called on a queue.
795  */
796 struct GNUNET_MQ_DestroyNotificationHandle
797 {
798   /**
799    * Kept in a DLL.
800    */
801   struct GNUNET_MQ_DestroyNotificationHandle *prev;
802
803   /**
804    * Kept in a DLL.
805    */
806   struct GNUNET_MQ_DestroyNotificationHandle *next;
807
808   /**
809    * Queue to notify about.
810    */
811   struct GNUNET_MQ_Handle *mq;
812
813   /**
814    * Function to call.
815    */
816   GNUNET_SCHEDULER_TaskCallback cb;
817
818   /**
819    * Closure for @e cb.
820    */
821   void *cb_cls;
822 };
823
824
825 /**
826  * Destroy the message queue.
827  *
828  * @param mq message queue to destroy
829  */
830 void
831 GNUNET_MQ_destroy (struct GNUNET_MQ_Handle *mq)
832 {
833   struct GNUNET_MQ_DestroyNotificationHandle *dnh;
834
835   if (NULL != mq->destroy_impl)
836   {
837     mq->destroy_impl (mq, mq->impl_state);
838   }
839   if (NULL != mq->send_task)
840   {
841     GNUNET_SCHEDULER_cancel (mq->send_task);
842     mq->send_task = NULL;
843   }
844   while (NULL != mq->envelope_head)
845   {
846     struct GNUNET_MQ_Envelope *ev;
847
848     ev = mq->envelope_head;
849     ev->parent_queue = NULL;
850     GNUNET_CONTAINER_DLL_remove (mq->envelope_head,
851                                  mq->envelope_tail,
852                                  ev);
853     GNUNET_assert (0 < mq->queue_length);
854     mq->queue_length--;
855     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
856                 "MQ destroy drops message of type %u\n",
857                 ntohs (ev->mh->type));
858     GNUNET_MQ_discard (ev);
859   }
860   if (NULL != mq->current_envelope)
861   {
862     /* we can only discard envelopes that
863      * are not queued! */
864     mq->current_envelope->parent_queue = NULL;
865     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
866                 "MQ destroy drops message of type %u\n",
867                 ntohs (mq->current_envelope->mh->type));
868     GNUNET_MQ_discard (mq->current_envelope);
869     mq->current_envelope = NULL;
870     GNUNET_assert (0 < mq->queue_length);
871     mq->queue_length--;
872   }
873   GNUNET_assert (0 == mq->queue_length);
874   while (NULL != (dnh = mq->dnh_head))
875   {
876     dnh->cb (dnh->cb_cls);
877     GNUNET_MQ_destroy_notify_cancel (dnh);
878   }
879   if (NULL != mq->assoc_map)
880   {
881     GNUNET_CONTAINER_multihashmap32_destroy (mq->assoc_map);
882     mq->assoc_map = NULL;
883   }
884   GNUNET_free_non_null (mq->handlers);
885   GNUNET_free (mq);
886 }
887
888
889 const struct GNUNET_MessageHeader *
890 GNUNET_MQ_extract_nested_mh_ (const struct GNUNET_MessageHeader *mh,
891                               uint16_t base_size)
892 {
893   uint16_t whole_size;
894   uint16_t nested_size;
895   const struct GNUNET_MessageHeader *nested_msg;
896
897   whole_size = ntohs (mh->size);
898   GNUNET_assert (whole_size >= base_size);
899   nested_size = whole_size - base_size;
900   if (0 == nested_size)
901     return NULL;
902   if (nested_size < sizeof (struct GNUNET_MessageHeader))
903   {
904     GNUNET_break_op (0);
905     return NULL;
906   }
907   nested_msg = (const struct GNUNET_MessageHeader *) ((char *) mh + base_size);
908   if (ntohs (nested_msg->size) != nested_size)
909   {
910     GNUNET_break_op (0);
911     return NULL;
912   }
913   return nested_msg;
914 }
915
916
917 /**
918  * Cancel sending the message. Message must have been sent with
919  * #GNUNET_MQ_send before.  May not be called after the notify sent
920  * callback has been called
921  *
922  * @param ev queued envelope to cancel
923  */
924 void
925 GNUNET_MQ_send_cancel (struct GNUNET_MQ_Envelope *ev)
926 {
927   struct GNUNET_MQ_Handle *mq = ev->parent_queue;
928
929   GNUNET_assert (NULL != mq);
930   GNUNET_assert (NULL != mq->cancel_impl);
931
932   mq->evacuate_called = GNUNET_NO;
933
934   if (mq->current_envelope == ev)
935   {
936     /* complex case, we already started with transmitting
937        the message using the callbacks. */
938     GNUNET_assert (0 < mq->queue_length);
939     mq->queue_length--;
940     mq->cancel_impl (mq,
941                      mq->impl_state);
942     /* continue sending the next message, if any */
943     mq->current_envelope = mq->envelope_head;
944     if (NULL != mq->current_envelope)
945     {
946       GNUNET_CONTAINER_DLL_remove (mq->envelope_head,
947                                    mq->envelope_tail,
948                                    mq->current_envelope);
949
950       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
951                   "mq: sending canceled message of type %u queue\n",
952                   ntohs(ev->mh->type));
953
954       mq->send_impl (mq,
955                      mq->current_envelope->mh,
956                      mq->impl_state);
957     }
958   }
959   else
960   {
961     /* simple case, message is still waiting in the queue */
962     GNUNET_CONTAINER_DLL_remove (mq->envelope_head,
963                                  mq->envelope_tail,
964                                  ev);
965     GNUNET_assert (0 < mq->queue_length);
966     mq->queue_length--;
967   }
968
969   if (GNUNET_YES != mq->evacuate_called)
970   {
971     ev->parent_queue = NULL;
972     ev->mh = NULL;
973     /* also frees ev */
974     GNUNET_free (ev);
975   }
976 }
977
978
979 /**
980  * Function to obtain the current envelope
981  * from within #GNUNET_MQ_SendImpl implementations.
982  *
983  * @param mq message queue to interrogate
984  * @return the current envelope
985  */
986 struct GNUNET_MQ_Envelope *
987 GNUNET_MQ_get_current_envelope (struct GNUNET_MQ_Handle *mq)
988 {
989   return mq->current_envelope;
990 }
991
992
993 /**
994  * Function to obtain the last envelope in the queue.
995  *
996  * @param mq message queue to interrogate
997  * @return the last envelope in the queue
998  */
999 struct GNUNET_MQ_Envelope *
1000 GNUNET_MQ_get_last_envelope (struct GNUNET_MQ_Handle *mq)
1001 {
1002   if (NULL != mq->envelope_tail)
1003     return mq->envelope_tail;
1004
1005   return mq->current_envelope;
1006 }
1007
1008
1009 /**
1010  * Set application-specific options for this envelope.
1011  * Overrides the options set for the queue with
1012  * #GNUNET_MQ_set_options() for this message only.
1013  *
1014  * @param env message to set options for
1015  * @param flags flags to use (meaning is queue-specific)
1016  * @param extra additional buffer for further data (also queue-specific)
1017  */
1018 void
1019 GNUNET_MQ_env_set_options (struct GNUNET_MQ_Envelope *env,
1020                            uint64_t flags,
1021                            const void *extra)
1022 {
1023   env->flags = flags;
1024   env->extra = extra;
1025   env->have_custom_options = GNUNET_YES;
1026 }
1027
1028
1029 /**
1030  * Get application-specific options for this envelope.
1031  *
1032  * @param env message to set options for
1033  * @param[out] flags set to flags to use (meaning is queue-specific)
1034  * @return extra additional buffer for further data (also queue-specific)
1035  */
1036 const void *
1037 GNUNET_MQ_env_get_options (struct GNUNET_MQ_Envelope *env,
1038                            uint64_t *flags)
1039 {
1040   struct GNUNET_MQ_Handle *mq = env->parent_queue;
1041
1042   if (GNUNET_YES == env->have_custom_options)
1043   {
1044     *flags = env->flags;
1045     return env->extra;
1046   }
1047   if (NULL == mq)
1048   {
1049     *flags = 0;
1050     return NULL;
1051   }
1052   *flags = mq->default_flags;
1053   return mq->default_extra;
1054 }
1055
1056
1057 /**
1058  * Set application-specific options for this queue.
1059  *
1060  * @param mq message queue to set options for
1061  * @param flags flags to use (meaning is queue-specific)
1062  * @param extra additional buffer for further data (also queue-specific)
1063  */
1064 void
1065 GNUNET_MQ_set_options (struct GNUNET_MQ_Handle *mq,
1066                        uint64_t flags,
1067                        const void *extra)
1068 {
1069   mq->default_flags = flags;
1070   mq->default_extra = extra;
1071 }
1072
1073
1074 /**
1075  * Register function to be called whenever @a mq is being
1076  * destroyed.
1077  *
1078  * @param mq message queue to watch
1079  * @param cb function to call on @a mq destruction
1080  * @param cb_cls closure for @a cb
1081  * @return handle for #GNUNET_MQ_destroy_notify_cancel().
1082  */
1083 struct GNUNET_MQ_DestroyNotificationHandle *
1084 GNUNET_MQ_destroy_notify (struct GNUNET_MQ_Handle *mq,
1085                           GNUNET_SCHEDULER_TaskCallback cb,
1086                           void *cb_cls)
1087 {
1088   struct GNUNET_MQ_DestroyNotificationHandle *dnh;
1089
1090   dnh = GNUNET_new (struct GNUNET_MQ_DestroyNotificationHandle);
1091   dnh->mq = mq;
1092   dnh->cb = cb;
1093   dnh->cb_cls = cb_cls;
1094   GNUNET_CONTAINER_DLL_insert (mq->dnh_head,
1095                                mq->dnh_tail,
1096                                dnh);
1097   return dnh;
1098 }
1099
1100
1101 /**
1102  * Cancel registration from #GNUNET_MQ_destroy_notify().
1103  *
1104  * @param dnh handle for registration to cancel
1105  */
1106 void
1107 GNUNET_MQ_destroy_notify_cancel (struct GNUNET_MQ_DestroyNotificationHandle *dnh)
1108 {
1109   struct GNUNET_MQ_Handle *mq = dnh->mq;
1110
1111   GNUNET_CONTAINER_DLL_remove (mq->dnh_head,
1112                                mq->dnh_tail,
1113                                dnh);
1114   GNUNET_free (dnh);
1115 }
1116
1117
1118 /**
1119  * Insert @a env into the envelope DLL starting at @a env_head
1120  * Note that @a env must not be in any MQ while this function
1121  * is used with DLLs defined outside of the MQ module.  This
1122  * is just in case some application needs to also manage a
1123  * FIFO of envelopes independent of MQ itself and wants to
1124  * re-use the pointers internal to @a env.  Use with caution.
1125  *
1126  * @param[in|out] env_head of envelope DLL
1127  * @param[in|out] env_tail tail of envelope DLL
1128  * @param[in|out] env element to insert at the tail
1129  */
1130 void
1131 GNUNET_MQ_dll_insert_tail (struct GNUNET_MQ_Envelope **env_head,
1132                            struct GNUNET_MQ_Envelope **env_tail,
1133                            struct GNUNET_MQ_Envelope *env)
1134 {
1135   GNUNET_CONTAINER_DLL_insert_tail (*env_head,
1136                                     *env_tail,
1137                                     env);
1138 }
1139
1140
1141 /**
1142  * Remove @a env from the envelope DLL starting at @a env_head.
1143  * Note that @a env must not be in any MQ while this function
1144  * is used with DLLs defined outside of the MQ module. This
1145  * is just in case some application needs to also manage a
1146  * FIFO of envelopes independent of MQ itself and wants to
1147  * re-use the pointers internal to @a env.  Use with caution.
1148  *
1149  * @param[in|out] env_head of envelope DLL
1150  * @param[in|out] env_tail tail of envelope DLL
1151  * @param[in|out] env element to remove from the DLL
1152  */
1153 void
1154 GNUNET_MQ_dll_remove (struct GNUNET_MQ_Envelope **env_head,
1155                       struct GNUNET_MQ_Envelope **env_tail,
1156                       struct GNUNET_MQ_Envelope *env)
1157 {
1158   GNUNET_CONTAINER_DLL_remove (*env_head,
1159                                *env_tail,
1160                                env);
1161 }
1162
1163
1164 /**
1165  * Copy an array of handlers.
1166  *
1167  * Useful if the array has been delared in local memory and needs to be
1168  * persisted for future use.
1169  *
1170  * @param handlers Array of handlers to be copied. Can be NULL (nothing done).
1171  * @return A newly allocated array of handlers.
1172  *         Needs to be freed with #GNUNET_free.
1173  */
1174 struct GNUNET_MQ_MessageHandler *
1175 GNUNET_MQ_copy_handlers (const struct GNUNET_MQ_MessageHandler *handlers)
1176 {
1177   struct GNUNET_MQ_MessageHandler *copy;
1178   unsigned int count;
1179
1180   if (NULL == handlers)
1181     return NULL;
1182
1183   count = GNUNET_MQ_count_handlers (handlers);
1184   copy = GNUNET_new_array (count + 1,
1185                            struct GNUNET_MQ_MessageHandler);
1186   GNUNET_memcpy (copy,
1187                  handlers,
1188                  count * sizeof (struct GNUNET_MQ_MessageHandler));
1189   return copy;
1190 }
1191
1192
1193 /**
1194  * Count the handlers in a handler array.
1195  *
1196  * @param handlers Array of handlers to be counted.
1197  * @return The number of handlers in the array.
1198  */
1199 unsigned int
1200 GNUNET_MQ_count_handlers (const struct GNUNET_MQ_MessageHandler *handlers)
1201 {
1202   unsigned int i;
1203
1204   if (NULL == handlers)
1205     return 0;
1206
1207   for (i=0; NULL != handlers[i].cb; i++) ;
1208
1209   return i;
1210 }
1211
1212
1213
1214 /* end of mq.c */