error handling
[oweals/gnunet.git] / src / util / mq.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2012-2019 GNUnet e.V.
4
5      GNUnet is free software: you can redistribute it and/or modify it
6      under the terms of the GNU Affero General Public License as published
7      by the Free Software Foundation, either version 3 of the License,
8      or (at your option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      Affero General Public License for more details.
14
15      You should have received a copy of the GNU Affero General Public License
16      along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18      SPDX-License-Identifier: AGPL3.0-or-later
19  */
20
21 /**
22  * @author Florian Dold
23  * @file util/mq.c
24  * @brief general purpose request queue
25  */
26 #include "platform.h"
27 #include "gnunet_util_lib.h"
28
29 #define LOG(kind, ...) GNUNET_log_from (kind, "util-mq", __VA_ARGS__)
30
31
32 struct GNUNET_MQ_Envelope
33 {
34   /**
35    * Messages are stored in a linked list.
36    * Each queue has its own list of envelopes.
37    */
38   struct GNUNET_MQ_Envelope *next;
39
40   /**
41    * Messages are stored in a linked list
42    * Each queue has its own list of envelopes.
43    */
44   struct GNUNET_MQ_Envelope *prev;
45
46   /**
47    * Actual allocated message header.
48    * The GNUNET_MQ_Envelope header is allocated at
49    * the end of the message.
50    */
51   struct GNUNET_MessageHeader *mh;
52
53   /**
54    * Queue the message is queued in, NULL if message is not queued.
55    */
56   struct GNUNET_MQ_Handle *parent_queue;
57
58   /**
59    * Called after the message was sent irrevocably.
60    */
61   GNUNET_SCHEDULER_TaskCallback sent_cb;
62
63   /**
64    * Closure for @e send_cb
65    */
66   void *sent_cls;
67
68   /**
69    * Flags that were set for this envelope by
70    * #GNUNET_MQ_env_set_options().   Only valid if
71    * @e have_custom_options is set.
72    */
73   enum GNUNET_MQ_PriorityPreferences priority;
74
75   /**
76    * Did the application call #GNUNET_MQ_env_set_options()?
77    */
78   int have_custom_options;
79 };
80
81
82 /**
83  * Handle to a message queue.
84  */
85 struct GNUNET_MQ_Handle
86 {
87   /**
88    * Handlers array, or NULL if the queue should not receive messages
89    */
90   struct GNUNET_MQ_MessageHandler *handlers;
91
92   /**
93    * Actual implementation of message sending,
94    * called when a message is added
95    */
96   GNUNET_MQ_SendImpl send_impl;
97
98   /**
99    * Implementation-dependent queue destruction function
100    */
101   GNUNET_MQ_DestroyImpl destroy_impl;
102
103   /**
104    * Implementation-dependent send cancel function
105    */
106   GNUNET_MQ_CancelImpl cancel_impl;
107
108   /**
109    * Implementation-specific state
110    */
111   void *impl_state;
112
113   /**
114    * Callback will be called when an error occurs.
115    */
116   GNUNET_MQ_ErrorHandler error_handler;
117
118   /**
119    * Closure for the error handler.
120    */
121   void *error_handler_cls;
122
123   /**
124    * Task to asynchronously run #impl_send_continue().
125    */
126   struct GNUNET_SCHEDULER_Task *send_task;
127
128   /**
129    * Linked list of messages pending to be sent
130    */
131   struct GNUNET_MQ_Envelope *envelope_head;
132
133   /**
134    * Linked list of messages pending to be sent
135    */
136   struct GNUNET_MQ_Envelope *envelope_tail;
137
138   /**
139    * Message that is currently scheduled to be
140    * sent. Not the head of the message queue, as the implementation
141    * needs to know if sending has been already scheduled or not.
142    */
143   struct GNUNET_MQ_Envelope *current_envelope;
144
145   /**
146    * Map of associations, lazily allocated
147    */
148   struct GNUNET_CONTAINER_MultiHashMap32 *assoc_map;
149
150   /**
151    * Functions to call on queue destruction; kept in a DLL.
152    */
153   struct GNUNET_MQ_DestroyNotificationHandle *dnh_head;
154
155   /**
156    * Functions to call on queue destruction; kept in a DLL.
157    */
158   struct GNUNET_MQ_DestroyNotificationHandle *dnh_tail;
159
160   /**
161    * Flags that were set for this queue by
162    * #GNUNET_MQ_set_options().   Default is 0.
163    */
164   enum GNUNET_MQ_PriorityPreferences priority;
165
166   /**
167    * Next id that should be used for the @e assoc_map,
168    * initialized lazily to a random value together with
169    * @e assoc_map
170    */
171   uint32_t assoc_id;
172
173   /**
174    * Number of entries we have in the envelope-DLL.
175    */
176   unsigned int queue_length;
177
178   /**
179    * #GNUNET_YES if GNUNET_MQ_impl_evacuate was called.
180    * FIXME: is this dead?
181    */
182   int evacuate_called;
183
184   /**
185    * #GNUNET_YES if GNUNET_MQ_impl_send_in_flight() was called.
186    */
187   int in_flight;
188 };
189
190
191 /**
192  * Call the message message handler that was registered
193  * for the type of the given message in the given message queue.
194  *
195  * This function is indended to be used for the implementation
196  * of message queues.
197  *
198  * @param mq message queue with the handlers
199  * @param mh message to dispatch
200  */
201 void
202 GNUNET_MQ_inject_message (struct GNUNET_MQ_Handle *mq,
203                           const struct GNUNET_MessageHeader *mh)
204 {
205   int ret;
206
207   ret = GNUNET_MQ_handle_message (mq->handlers, mh);
208   if (GNUNET_SYSERR == ret)
209   {
210     GNUNET_MQ_inject_error (mq, GNUNET_MQ_ERROR_MALFORMED);
211     return;
212   }
213 }
214
215
216 /**
217  * Call the message message handler that was registered
218  * for the type of the given message in the given @a handlers list.
219  *
220  * This function is indended to be used for the implementation
221  * of message queues.
222  *
223  * @param handlers a set of handlers
224  * @param mh message to dispatch
225  * @return #GNUNET_OK on success, #GNUNET_NO if no handler matched,
226  *         #GNUNET_SYSERR if message was rejected by check function
227  */
228 int
229 GNUNET_MQ_handle_message (const struct GNUNET_MQ_MessageHandler *handlers,
230                           const struct GNUNET_MessageHeader *mh)
231 {
232   const struct GNUNET_MQ_MessageHandler *handler;
233   int handled = GNUNET_NO;
234   uint16_t msize = ntohs (mh->size);
235   uint16_t mtype = ntohs (mh->type);
236
237   LOG (GNUNET_ERROR_TYPE_DEBUG,
238        "Received message of type %u and size %u\n",
239        mtype,
240        msize);
241
242   if (NULL == handlers)
243     goto done;
244   for (handler = handlers; NULL != handler->cb; handler++)
245   {
246     if (handler->type == mtype)
247     {
248       handled = GNUNET_YES;
249       if ((handler->expected_size > msize) ||
250           ((handler->expected_size != msize) && (NULL == handler->mv)))
251       {
252         /* Too small, or not an exact size and
253            no 'mv' handler to check rest */
254         LOG (GNUNET_ERROR_TYPE_ERROR,
255              "Received malformed message of type %u\n",
256              (unsigned int) handler->type);
257         return GNUNET_SYSERR;
258       }
259       if ((NULL == handler->mv) ||
260           (GNUNET_OK == handler->mv (handler->cls, mh)))
261       {
262         /* message well-formed, pass to handler */
263         handler->cb (handler->cls, mh);
264       }
265       else
266       {
267         /* Message rejected by check routine */
268         LOG (GNUNET_ERROR_TYPE_ERROR,
269              "Received malformed message of type %u\n",
270              (unsigned int) handler->type);
271         return GNUNET_SYSERR;
272       }
273       break;
274     }
275   }
276 done:
277   if (GNUNET_NO == handled)
278   {
279     LOG (GNUNET_ERROR_TYPE_INFO,
280          "No handler for message of type %u and size %u\n",
281          mtype,
282          msize);
283     return GNUNET_NO;
284   }
285   return GNUNET_OK;
286 }
287
288
289 /**
290  * Call the error handler of a message queue with the given
291  * error code.  If there is no error handler, log a warning.
292  *
293  * This function is intended to be used by the implementation
294  * of message queues.
295  *
296  * @param mq message queue
297  * @param error the error type
298  */
299 void
300 GNUNET_MQ_inject_error (struct GNUNET_MQ_Handle *mq, enum GNUNET_MQ_Error error)
301 {
302   if (NULL == mq->error_handler)
303   {
304     LOG (GNUNET_ERROR_TYPE_WARNING,
305          "Got error %d, but no handler installed\n",
306          (int) error);
307     return;
308   }
309   mq->error_handler (mq->error_handler_cls, error);
310 }
311
312
313 /**
314  * Discard the message queue message, free all
315  * allocated resources. Must be called in the event
316  * that a message is created but should not actually be sent.
317  *
318  * @param mqm the message to discard
319  */
320 void
321 GNUNET_MQ_discard (struct GNUNET_MQ_Envelope *ev)
322 {
323   GNUNET_assert (NULL == ev->parent_queue);
324   GNUNET_free (ev);
325 }
326
327
328 /**
329  * Obtain the current length of the message queue.
330  *
331  * @param mq queue to inspect
332  * @return number of queued, non-transmitted messages
333  */
334 unsigned int
335 GNUNET_MQ_get_length (struct GNUNET_MQ_Handle *mq)
336 {
337   if (GNUNET_YES != mq->in_flight)
338   {
339     return mq->queue_length;
340   }
341   return mq->queue_length - 1;
342 }
343
344
345 /**
346  * Send a message with the given message queue.
347  * May only be called once per message.
348  *
349  * @param mq message queue
350  * @param ev the envelope with the message to send.
351  */
352 void
353 GNUNET_MQ_send (struct GNUNET_MQ_Handle *mq,
354                 struct GNUNET_MQ_Envelope *ev)
355 {
356   GNUNET_assert (NULL != mq);
357   GNUNET_assert (NULL == ev->parent_queue);
358
359   mq->queue_length++;
360   if (mq->queue_length >= 10000)
361   {
362     /* This would seem like a bug... */
363     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
364                 "MQ with %u entries extended by message of type %u (FC broken?)\n",
365                 (unsigned int) mq->queue_length,
366                 (unsigned int) ntohs (ev->mh->type));
367   }
368   ev->parent_queue = mq;
369   /* is the implementation busy? queue it! */
370   if ((NULL != mq->current_envelope) || (NULL != mq->send_task))
371   {
372     GNUNET_CONTAINER_DLL_insert_tail (mq->envelope_head,
373                                       mq->envelope_tail,
374                                       ev);
375     return;
376   }
377   GNUNET_assert (NULL == mq->envelope_head);
378   mq->current_envelope = ev;
379
380   LOG (GNUNET_ERROR_TYPE_DEBUG,
381        "sending message of type %u, queue empty (MQ: %p)\n",
382        ntohs (ev->mh->type),
383        mq);
384
385   mq->send_impl (mq,
386                  ev->mh,
387                  mq->impl_state);
388 }
389
390
391 /**
392  * Remove the first envelope that has not yet been sent from the message
393  * queue and return it.
394  *
395  * @param mq queue to remove envelope from
396  * @return NULL if queue is empty (or has no envelope that is not under transmission)
397  */
398 struct GNUNET_MQ_Envelope *
399 GNUNET_MQ_unsent_head (struct GNUNET_MQ_Handle *mq)
400 {
401   struct GNUNET_MQ_Envelope *env;
402
403   env = mq->envelope_head;
404   GNUNET_CONTAINER_DLL_remove (mq->envelope_head, mq->envelope_tail, env);
405   mq->queue_length--;
406   env->parent_queue = NULL;
407   return env;
408 }
409
410
411 /**
412  * Function to copy an envelope.  The envelope must not yet
413  * be in any queue or have any options or callbacks set.
414  *
415  * @param env envelope to copy
416  * @return copy of @a env
417  */
418 struct GNUNET_MQ_Envelope *
419 GNUNET_MQ_env_copy (struct GNUNET_MQ_Envelope *env)
420 {
421   GNUNET_assert (NULL == env->next);
422   GNUNET_assert (NULL == env->parent_queue);
423   GNUNET_assert (NULL == env->sent_cb);
424   GNUNET_assert (GNUNET_NO == env->have_custom_options);
425   return GNUNET_MQ_msg_copy (env->mh);
426 }
427
428
429 /**
430  * Send a copy of a message with the given message queue.
431  * Can be called repeatedly on the same envelope.
432  *
433  * @param mq message queue
434  * @param ev the envelope with the message to send.
435  */
436 void
437 GNUNET_MQ_send_copy (struct GNUNET_MQ_Handle *mq,
438                      const struct GNUNET_MQ_Envelope *ev)
439 {
440   struct GNUNET_MQ_Envelope *env;
441   uint16_t msize;
442
443   msize = ntohs (ev->mh->size);
444   env = GNUNET_malloc (sizeof(struct GNUNET_MQ_Envelope) + msize);
445   env->mh = (struct GNUNET_MessageHeader *) &env[1];
446   env->sent_cb = ev->sent_cb;
447   env->sent_cls = ev->sent_cls;
448   GNUNET_memcpy (&env[1], ev->mh, msize);
449   GNUNET_MQ_send (mq, env);
450 }
451
452
453 /**
454  * Task run to call the send implementation for the next queued
455  * message, if any.  Only useful for implementing message queues,
456  * results in undefined behavior if not used carefully.
457  *
458  * @param cls message queue to send the next message with
459  */
460 static void
461 impl_send_continue (void *cls)
462 {
463   struct GNUNET_MQ_Handle *mq = cls;
464
465   mq->send_task = NULL;
466   /* call is only valid if we're actually currently sending
467    * a message */
468   if (NULL == mq->envelope_head)
469     return;
470   mq->current_envelope = mq->envelope_head;
471   GNUNET_CONTAINER_DLL_remove (mq->envelope_head,
472                                mq->envelope_tail,
473                                mq->current_envelope);
474
475   LOG (GNUNET_ERROR_TYPE_DEBUG,
476        "sending message of type %u from queue\n",
477        ntohs (mq->current_envelope->mh->type));
478
479   mq->send_impl (mq,
480                  mq->current_envelope->mh,
481                  mq->impl_state);
482 }
483
484
485 /**
486  * Call the send implementation for the next queued message, if any.
487  * Only useful for implementing message queues, results in undefined
488  * behavior if not used carefully.
489  *
490  * @param mq message queue to send the next message with
491  */
492 void
493 GNUNET_MQ_impl_send_continue (struct GNUNET_MQ_Handle *mq)
494 {
495   struct GNUNET_MQ_Envelope *current_envelope;
496   GNUNET_SCHEDULER_TaskCallback cb;
497
498   GNUNET_assert (0 < mq->queue_length);
499   mq->queue_length--;
500   mq->in_flight = GNUNET_NO;
501   current_envelope = mq->current_envelope;
502   current_envelope->parent_queue = NULL;
503   mq->current_envelope = NULL;
504   GNUNET_assert (NULL == mq->send_task);
505   mq->send_task = GNUNET_SCHEDULER_add_now (&impl_send_continue, mq);
506   if (NULL != (cb = current_envelope->sent_cb))
507   {
508     current_envelope->sent_cb = NULL;
509     cb (current_envelope->sent_cls);
510   }
511   GNUNET_free (current_envelope);
512 }
513
514
515 /**
516  * Call the send notification for the current message, but do not
517  * try to send the next message until #GNUNET_MQ_impl_send_continue
518  * is called.
519  *
520  * Only useful for implementing message queues, results in undefined
521  * behavior if not used carefully.
522  *
523  * @param mq message queue to send the next message with
524  */
525 void
526 GNUNET_MQ_impl_send_in_flight (struct GNUNET_MQ_Handle *mq)
527 {
528   struct GNUNET_MQ_Envelope *current_envelope;
529   GNUNET_SCHEDULER_TaskCallback cb;
530
531   mq->in_flight = GNUNET_YES;
532   /* call is only valid if we're actually currently sending
533    * a message */
534   current_envelope = mq->current_envelope;
535   GNUNET_assert (NULL != current_envelope);
536   /* can't call cancel from now on anymore */
537   current_envelope->parent_queue = NULL;
538   if (NULL != (cb = current_envelope->sent_cb))
539   {
540     current_envelope->sent_cb = NULL;
541     cb (current_envelope->sent_cls);
542   }
543 }
544
545
546 /**
547  * Create a message queue for the specified handlers.
548  *
549  * @param send function the implements sending messages
550  * @param destroy function that implements destroying the queue
551  * @param cancel function that implements canceling a message
552  * @param impl_state for the queue, passed to 'send' and 'destroy'
553  * @param handlers array of message handlers
554  * @param error_handler handler for read and write errors
555  * @param error_handler_cls closure for @a error_handler
556  * @return a new message queue
557  */
558 struct GNUNET_MQ_Handle *
559 GNUNET_MQ_queue_for_callbacks (GNUNET_MQ_SendImpl send,
560                                GNUNET_MQ_DestroyImpl destroy,
561                                GNUNET_MQ_CancelImpl cancel,
562                                void *impl_state,
563                                const struct GNUNET_MQ_MessageHandler *handlers,
564                                GNUNET_MQ_ErrorHandler error_handler,
565                                void *error_handler_cls)
566 {
567   struct GNUNET_MQ_Handle *mq;
568
569   mq = GNUNET_new (struct GNUNET_MQ_Handle);
570   mq->send_impl = send;
571   mq->destroy_impl = destroy;
572   mq->cancel_impl = cancel;
573   mq->handlers = GNUNET_MQ_copy_handlers (handlers);
574   mq->error_handler = error_handler;
575   mq->error_handler_cls = error_handler_cls;
576   mq->impl_state = impl_state;
577
578   return mq;
579 }
580
581
582 /**
583  * Change the closure argument in all of the `handlers` of the
584  * @a mq.
585  *
586  * @param mq to modify
587  * @param handlers_cls new closure to use
588  */
589 void
590 GNUNET_MQ_set_handlers_closure (struct GNUNET_MQ_Handle *mq, void *handlers_cls)
591 {
592   if (NULL == mq->handlers)
593     return;
594   for (unsigned int i = 0; NULL != mq->handlers[i].cb; i++)
595     mq->handlers[i].cls = handlers_cls;
596 }
597
598
599 /**
600  * Get the message that should currently be sent.
601  * Fails if there is no current message.
602  * Only useful for implementing message queues,
603  * results in undefined behavior if not used carefully.
604  *
605  * @param mq message queue with the current message
606  * @return message to send, never NULL
607  */
608 const struct GNUNET_MessageHeader *
609 GNUNET_MQ_impl_current (struct GNUNET_MQ_Handle *mq)
610 {
611   GNUNET_assert (NULL != mq->current_envelope);
612   GNUNET_assert (NULL != mq->current_envelope->mh);
613   return mq->current_envelope->mh;
614 }
615
616
617 /**
618  * Get the implementation state associated with the
619  * message queue.
620  *
621  * While the GNUNET_MQ_Impl* callbacks receive the
622  * implementation state, continuations that are scheduled
623  * by the implementation function often only have one closure
624  * argument, with this function it is possible to get at the
625  * implementation state when only passing the GNUNET_MQ_Handle
626  * as closure.
627  *
628  * @param mq message queue with the current message
629  * @return message to send, never NULL
630  */
631 void *
632 GNUNET_MQ_impl_state (struct GNUNET_MQ_Handle *mq)
633 {
634   return mq->impl_state;
635 }
636
637
638 struct GNUNET_MQ_Envelope *
639 GNUNET_MQ_msg_ (struct GNUNET_MessageHeader **mhp, uint16_t size, uint16_t type)
640 {
641   struct GNUNET_MQ_Envelope *ev;
642
643   ev = GNUNET_malloc (size + sizeof(struct GNUNET_MQ_Envelope));
644   ev->mh = (struct GNUNET_MessageHeader *) &ev[1];
645   ev->mh->size = htons (size);
646   ev->mh->type = htons (type);
647   if (NULL != mhp)
648     *mhp = ev->mh;
649   return ev;
650 }
651
652
653 /**
654  * Create a new envelope by copying an existing message.
655  *
656  * @param hdr header of the message to copy
657  * @return envelope containing @a hdr
658  */
659 struct GNUNET_MQ_Envelope *
660 GNUNET_MQ_msg_copy (const struct GNUNET_MessageHeader *hdr)
661 {
662   struct GNUNET_MQ_Envelope *mqm;
663   uint16_t size = ntohs (hdr->size);
664
665   mqm = GNUNET_malloc (sizeof(*mqm) + size);
666   mqm->mh = (struct GNUNET_MessageHeader *) &mqm[1];
667   GNUNET_memcpy (mqm->mh, hdr, size);
668   return mqm;
669 }
670
671
672 /**
673  * Implementation of the #GNUNET_MQ_msg_nested_mh macro.
674  *
675  * @param mhp pointer to the message header pointer that will be changed to allocate at
676  *        the newly allocated space for the message.
677  * @param base_size size of the data before the nested message
678  * @param type type of the message in the envelope
679  * @param nested_mh the message to append to the message after base_size
680  */
681 struct GNUNET_MQ_Envelope *
682 GNUNET_MQ_msg_nested_mh_ (struct GNUNET_MessageHeader **mhp,
683                           uint16_t base_size,
684                           uint16_t type,
685                           const struct GNUNET_MessageHeader *nested_mh)
686 {
687   struct GNUNET_MQ_Envelope *mqm;
688   uint16_t size;
689
690   if (NULL == nested_mh)
691     return GNUNET_MQ_msg_ (mhp, base_size, type);
692
693   size = base_size + ntohs (nested_mh->size);
694
695   /* check for uint16_t overflow */
696   if (size < base_size)
697     return NULL;
698
699   mqm = GNUNET_MQ_msg_ (mhp, size, type);
700   GNUNET_memcpy ((char *) mqm->mh + base_size,
701                  nested_mh,
702                  ntohs (nested_mh->size));
703
704   return mqm;
705 }
706
707
708 /**
709  * Associate the assoc_data in mq with a unique request id.
710  *
711  * @param mq message queue, id will be unique for the queue
712  * @param assoc_data to associate
713  */
714 uint32_t
715 GNUNET_MQ_assoc_add (struct GNUNET_MQ_Handle *mq, void *assoc_data)
716 {
717   uint32_t id;
718
719   if (NULL == mq->assoc_map)
720   {
721     mq->assoc_map = GNUNET_CONTAINER_multihashmap32_create (8);
722     mq->assoc_id = 1;
723   }
724   id = mq->assoc_id++;
725   GNUNET_assert (GNUNET_OK ==
726                  GNUNET_CONTAINER_multihashmap32_put (
727                    mq->assoc_map,
728                    id,
729                    assoc_data,
730                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
731   return id;
732 }
733
734
735 /**
736  * Get the data associated with a @a request_id in a queue
737  *
738  * @param mq the message queue with the association
739  * @param request_id the request id we are interested in
740  * @return the associated data
741  */
742 void *
743 GNUNET_MQ_assoc_get (struct GNUNET_MQ_Handle *mq, uint32_t request_id)
744 {
745   if (NULL == mq->assoc_map)
746     return NULL;
747   return GNUNET_CONTAINER_multihashmap32_get (mq->assoc_map, request_id);
748 }
749
750
751 /**
752  * Remove the association for a @a request_id
753  *
754  * @param mq the message queue with the association
755  * @param request_id the request id we want to remove
756  * @return the associated data
757  */
758 void *
759 GNUNET_MQ_assoc_remove (struct GNUNET_MQ_Handle *mq, uint32_t request_id)
760 {
761   void *val;
762
763   if (NULL == mq->assoc_map)
764     return NULL;
765   val = GNUNET_CONTAINER_multihashmap32_get (mq->assoc_map, request_id);
766   GNUNET_CONTAINER_multihashmap32_remove_all (mq->assoc_map, request_id);
767   return val;
768 }
769
770
771 /**
772  * Call a callback once the envelope has been sent, that is,
773  * sending it can not be canceled anymore.
774  * There can be only one notify sent callback per envelope.
775  *
776  * @param ev message to call the notify callback for
777  * @param cb the notify callback
778  * @param cb_cls closure for the callback
779  */
780 void
781 GNUNET_MQ_notify_sent (struct GNUNET_MQ_Envelope *ev,
782                        GNUNET_SCHEDULER_TaskCallback cb,
783                        void *cb_cls)
784 {
785   /* allow setting *OR* clearing callback */
786   GNUNET_assert ((NULL == ev->sent_cb) || (NULL == cb));
787   ev->sent_cb = cb;
788   ev->sent_cls = cb_cls;
789 }
790
791
792 /**
793  * Handle we return for callbacks registered to be
794  * notified when #GNUNET_MQ_destroy() is called on a queue.
795  */
796 struct GNUNET_MQ_DestroyNotificationHandle
797 {
798   /**
799    * Kept in a DLL.
800    */
801   struct GNUNET_MQ_DestroyNotificationHandle *prev;
802
803   /**
804    * Kept in a DLL.
805    */
806   struct GNUNET_MQ_DestroyNotificationHandle *next;
807
808   /**
809    * Queue to notify about.
810    */
811   struct GNUNET_MQ_Handle *mq;
812
813   /**
814    * Function to call.
815    */
816   GNUNET_SCHEDULER_TaskCallback cb;
817
818   /**
819    * Closure for @e cb.
820    */
821   void *cb_cls;
822 };
823
824
825 /**
826  * Destroy the message queue.
827  *
828  * @param mq message queue to destroy
829  */
830 void
831 GNUNET_MQ_destroy (struct GNUNET_MQ_Handle *mq)
832 {
833   struct GNUNET_MQ_DestroyNotificationHandle *dnh;
834
835   if (NULL != mq->destroy_impl)
836   {
837     mq->destroy_impl (mq, mq->impl_state);
838   }
839   if (NULL != mq->send_task)
840   {
841     GNUNET_SCHEDULER_cancel (mq->send_task);
842     mq->send_task = NULL;
843   }
844   while (NULL != mq->envelope_head)
845   {
846     struct GNUNET_MQ_Envelope *ev;
847
848     ev = mq->envelope_head;
849     ev->parent_queue = NULL;
850     GNUNET_CONTAINER_DLL_remove (mq->envelope_head, mq->envelope_tail, ev);
851     GNUNET_assert (0 < mq->queue_length);
852     mq->queue_length--;
853     LOG (GNUNET_ERROR_TYPE_DEBUG,
854          "MQ destroy drops message of type %u\n",
855          ntohs (ev->mh->type));
856     GNUNET_MQ_discard (ev);
857   }
858   if (NULL != mq->current_envelope)
859   {
860     /* we can only discard envelopes that
861      * are not queued! */
862     mq->current_envelope->parent_queue = NULL;
863     LOG (GNUNET_ERROR_TYPE_DEBUG,
864          "MQ destroy drops current message of type %u\n",
865          ntohs (mq->current_envelope->mh->type));
866     GNUNET_MQ_discard (mq->current_envelope);
867     mq->current_envelope = NULL;
868     GNUNET_assert (0 < mq->queue_length);
869     mq->queue_length--;
870   }
871   GNUNET_assert (0 == mq->queue_length);
872   while (NULL != (dnh = mq->dnh_head))
873   {
874     dnh->cb (dnh->cb_cls);
875     GNUNET_MQ_destroy_notify_cancel (dnh);
876   }
877   if (NULL != mq->assoc_map)
878   {
879     GNUNET_CONTAINER_multihashmap32_destroy (mq->assoc_map);
880     mq->assoc_map = NULL;
881   }
882   GNUNET_free_non_null (mq->handlers);
883   GNUNET_free (mq);
884 }
885
886
887 const struct GNUNET_MessageHeader *
888 GNUNET_MQ_extract_nested_mh_ (const struct GNUNET_MessageHeader *mh,
889                               uint16_t base_size)
890 {
891   uint16_t whole_size;
892   uint16_t nested_size;
893   const struct GNUNET_MessageHeader *nested_msg;
894
895   whole_size = ntohs (mh->size);
896   GNUNET_assert (whole_size >= base_size);
897   nested_size = whole_size - base_size;
898   if (0 == nested_size)
899     return NULL;
900   if (nested_size < sizeof(struct GNUNET_MessageHeader))
901   {
902     GNUNET_break_op (0);
903     return NULL;
904   }
905   nested_msg = (const struct GNUNET_MessageHeader *) ((char *) mh + base_size);
906   if (ntohs (nested_msg->size) != nested_size)
907   {
908     GNUNET_break_op (0);
909     return NULL;
910   }
911   return nested_msg;
912 }
913
914
915 /**
916  * Cancel sending the message. Message must have been sent with
917  * #GNUNET_MQ_send before.  May not be called after the notify sent
918  * callback has been called
919  *
920  * @param ev queued envelope to cancel
921  */
922 void
923 GNUNET_MQ_send_cancel (struct GNUNET_MQ_Envelope *ev)
924 {
925   struct GNUNET_MQ_Handle *mq = ev->parent_queue;
926
927   GNUNET_assert (NULL != mq);
928   GNUNET_assert (NULL != mq->cancel_impl);
929
930   mq->evacuate_called = GNUNET_NO;
931
932   if (mq->current_envelope == ev)
933   {
934     /* complex case, we already started with transmitting
935        the message using the callbacks. */
936     GNUNET_assert (GNUNET_NO == mq->in_flight);
937     GNUNET_assert (0 < mq->queue_length);
938     mq->queue_length--;
939     mq->cancel_impl (mq,
940                      mq->impl_state);
941     /* continue sending the next message, if any */
942     mq->current_envelope = mq->envelope_head;
943     if (NULL != mq->current_envelope)
944     {
945       GNUNET_CONTAINER_DLL_remove (mq->envelope_head,
946                                    mq->envelope_tail,
947                                    mq->current_envelope);
948
949       LOG (GNUNET_ERROR_TYPE_DEBUG,
950            "sending canceled message of type %u queue\n",
951            ntohs (ev->mh->type));
952       mq->send_impl (mq,
953                      mq->current_envelope->mh,
954                      mq->impl_state);
955     }
956   }
957   else
958   {
959     /* simple case, message is still waiting in the queue */
960     GNUNET_CONTAINER_DLL_remove (mq->envelope_head,
961                                  mq->envelope_tail,
962                                  ev);
963     GNUNET_assert (0 < mq->queue_length);
964     mq->queue_length--;
965   }
966
967   if (GNUNET_YES != mq->evacuate_called)
968   {
969     ev->parent_queue = NULL;
970     ev->mh = NULL;
971     /* also frees ev */
972     GNUNET_free (ev);
973   }
974 }
975
976
977 /**
978  * Function to obtain the current envelope
979  * from within #GNUNET_MQ_SendImpl implementations.
980  *
981  * @param mq message queue to interrogate
982  * @return the current envelope
983  */
984 struct GNUNET_MQ_Envelope *
985 GNUNET_MQ_get_current_envelope (struct GNUNET_MQ_Handle *mq)
986 {
987   return mq->current_envelope;
988 }
989
990
991 /**
992  * Function to obtain the last envelope in the queue.
993  *
994  * @param mq message queue to interrogate
995  * @return the last envelope in the queue
996  */
997 struct GNUNET_MQ_Envelope *
998 GNUNET_MQ_get_last_envelope (struct GNUNET_MQ_Handle *mq)
999 {
1000   if (NULL != mq->envelope_tail)
1001     return mq->envelope_tail;
1002
1003   return mq->current_envelope;
1004 }
1005
1006
1007 /**
1008  * Set application-specific preferences for this envelope.
1009  * Overrides the options set for the queue with
1010  * #GNUNET_MQ_set_options() for this message only.
1011  *
1012  * @param env message to set options for
1013  * @param pp priorities and preferences to apply
1014  */
1015 void
1016 GNUNET_MQ_env_set_options (struct GNUNET_MQ_Envelope *env,
1017                            enum GNUNET_MQ_PriorityPreferences pp)
1018 {
1019   env->priority = pp;
1020   env->have_custom_options = GNUNET_YES;
1021 }
1022
1023
1024 /**
1025  * Get application-specific options for this envelope.
1026  *
1027  * @param env message to set options for
1028  * @return priorities and preferences to apply for @a env
1029  */
1030 enum GNUNET_MQ_PriorityPreferences
1031 GNUNET_MQ_env_get_options (struct GNUNET_MQ_Envelope *env)
1032 {
1033   struct GNUNET_MQ_Handle *mq = env->parent_queue;
1034
1035   if (GNUNET_YES == env->have_custom_options)
1036     return env->priority;
1037   if (NULL == mq)
1038     return 0;
1039   return mq->priority;
1040 }
1041
1042
1043 /**
1044  * Combine performance preferences set for different
1045  * envelopes that are being combined into one larger envelope.
1046  *
1047  * @param p1 one set of preferences
1048  * @param p2 second set of preferences
1049  * @return combined priority and preferences to use
1050  */
1051 enum GNUNET_MQ_PriorityPreferences
1052 GNUNET_MQ_env_combine_options (enum GNUNET_MQ_PriorityPreferences p1,
1053                                enum GNUNET_MQ_PriorityPreferences p2)
1054 {
1055   enum GNUNET_MQ_PriorityPreferences ret;
1056
1057   ret = GNUNET_MAX (p1 & GNUNET_MQ_PRIORITY_MASK, p2 & GNUNET_MQ_PRIORITY_MASK);
1058   ret |= ((p1 & GNUNET_MQ_PREF_UNRELIABLE) & (p2 & GNUNET_MQ_PREF_UNRELIABLE));
1059   ret |=
1060     ((p1 & GNUNET_MQ_PREF_LOW_LATENCY) | (p2 & GNUNET_MQ_PREF_LOW_LATENCY));
1061   ret |=
1062     ((p1 & GNUNET_MQ_PREF_CORK_ALLOWED) & (p2 & GNUNET_MQ_PREF_CORK_ALLOWED));
1063   ret |= ((p1 & GNUNET_MQ_PREF_GOODPUT) & (p2 & GNUNET_MQ_PREF_GOODPUT));
1064   ret |=
1065     ((p1 & GNUNET_MQ_PREF_OUT_OF_ORDER) & (p2 & GNUNET_MQ_PREF_OUT_OF_ORDER));
1066   return ret;
1067 }
1068
1069
1070 /**
1071  * Set application-specific default options for this queue.
1072  *
1073  * @param mq message queue to set options for
1074  * @param pp priorities and preferences to apply
1075  */
1076 void
1077 GNUNET_MQ_set_options (struct GNUNET_MQ_Handle *mq,
1078                        enum GNUNET_MQ_PriorityPreferences pp)
1079 {
1080   mq->priority = pp;
1081 }
1082
1083
1084 /**
1085  * Obtain message contained in envelope.
1086  *
1087  * @param env the envelope
1088  * @return message contained in the envelope
1089  */
1090 const struct GNUNET_MessageHeader *
1091 GNUNET_MQ_env_get_msg (const struct GNUNET_MQ_Envelope *env)
1092 {
1093   return env->mh;
1094 }
1095
1096
1097 /**
1098  * Return next envelope in queue.
1099  *
1100  * @param env a queued envelope
1101  * @return next one, or NULL
1102  */
1103 const struct GNUNET_MQ_Envelope *
1104 GNUNET_MQ_env_next (const struct GNUNET_MQ_Envelope *env)
1105 {
1106   return env->next;
1107 }
1108
1109
1110 /**
1111  * Register function to be called whenever @a mq is being
1112  * destroyed.
1113  *
1114  * @param mq message queue to watch
1115  * @param cb function to call on @a mq destruction
1116  * @param cb_cls closure for @a cb
1117  * @return handle for #GNUNET_MQ_destroy_notify_cancel().
1118  */
1119 struct GNUNET_MQ_DestroyNotificationHandle *
1120 GNUNET_MQ_destroy_notify (struct GNUNET_MQ_Handle *mq,
1121                           GNUNET_SCHEDULER_TaskCallback cb,
1122                           void *cb_cls)
1123 {
1124   struct GNUNET_MQ_DestroyNotificationHandle *dnh;
1125
1126   dnh = GNUNET_new (struct GNUNET_MQ_DestroyNotificationHandle);
1127   dnh->mq = mq;
1128   dnh->cb = cb;
1129   dnh->cb_cls = cb_cls;
1130   GNUNET_CONTAINER_DLL_insert (mq->dnh_head, mq->dnh_tail, dnh);
1131   return dnh;
1132 }
1133
1134
1135 /**
1136  * Cancel registration from #GNUNET_MQ_destroy_notify().
1137  *
1138  * @param dnh handle for registration to cancel
1139  */
1140 void
1141 GNUNET_MQ_destroy_notify_cancel (struct
1142                                  GNUNET_MQ_DestroyNotificationHandle *dnh)
1143 {
1144   struct GNUNET_MQ_Handle *mq = dnh->mq;
1145
1146   GNUNET_CONTAINER_DLL_remove (mq->dnh_head, mq->dnh_tail, dnh);
1147   GNUNET_free (dnh);
1148 }
1149
1150
1151 /**
1152  * Insert @a env into the envelope DLL starting at @a env_head
1153  * Note that @a env must not be in any MQ while this function
1154  * is used with DLLs defined outside of the MQ module.  This
1155  * is just in case some application needs to also manage a
1156  * FIFO of envelopes independent of MQ itself and wants to
1157  * re-use the pointers internal to @a env.  Use with caution.
1158  *
1159  * @param[in|out] env_head of envelope DLL
1160  * @param[in|out] env_tail tail of envelope DLL
1161  * @param[in|out] env element to insert at the tail
1162  */
1163 void
1164 GNUNET_MQ_dll_insert_head (struct GNUNET_MQ_Envelope **env_head,
1165                            struct GNUNET_MQ_Envelope **env_tail,
1166                            struct GNUNET_MQ_Envelope *env)
1167 {
1168   GNUNET_CONTAINER_DLL_insert (*env_head, *env_tail, env);
1169 }
1170
1171
1172 /**
1173  * Insert @a env into the envelope DLL starting at @a env_head
1174  * Note that @a env must not be in any MQ while this function
1175  * is used with DLLs defined outside of the MQ module.  This
1176  * is just in case some application needs to also manage a
1177  * FIFO of envelopes independent of MQ itself and wants to
1178  * re-use the pointers internal to @a env.  Use with caution.
1179  *
1180  * @param[in|out] env_head of envelope DLL
1181  * @param[in|out] env_tail tail of envelope DLL
1182  * @param[in|out] env element to insert at the tail
1183  */
1184 void
1185 GNUNET_MQ_dll_insert_tail (struct GNUNET_MQ_Envelope **env_head,
1186                            struct GNUNET_MQ_Envelope **env_tail,
1187                            struct GNUNET_MQ_Envelope *env)
1188 {
1189   GNUNET_CONTAINER_DLL_insert_tail (*env_head, *env_tail, env);
1190 }
1191
1192
1193 /**
1194  * Remove @a env from the envelope DLL starting at @a env_head.
1195  * Note that @a env must not be in any MQ while this function
1196  * is used with DLLs defined outside of the MQ module. This
1197  * is just in case some application needs to also manage a
1198  * FIFO of envelopes independent of MQ itself and wants to
1199  * re-use the pointers internal to @a env.  Use with caution.
1200  *
1201  * @param[in|out] env_head of envelope DLL
1202  * @param[in|out] env_tail tail of envelope DLL
1203  * @param[in|out] env element to remove from the DLL
1204  */
1205 void
1206 GNUNET_MQ_dll_remove (struct GNUNET_MQ_Envelope **env_head,
1207                       struct GNUNET_MQ_Envelope **env_tail,
1208                       struct GNUNET_MQ_Envelope *env)
1209 {
1210   GNUNET_CONTAINER_DLL_remove (*env_head, *env_tail, env);
1211 }
1212
1213
1214 /**
1215  * Copy an array of handlers.
1216  *
1217  * Useful if the array has been delared in local memory and needs to be
1218  * persisted for future use.
1219  *
1220  * @param handlers Array of handlers to be copied. Can be NULL (nothing done).
1221  * @return A newly allocated array of handlers.
1222  *         Needs to be freed with #GNUNET_free.
1223  */
1224 struct GNUNET_MQ_MessageHandler *
1225 GNUNET_MQ_copy_handlers (const struct GNUNET_MQ_MessageHandler *handlers)
1226 {
1227   struct GNUNET_MQ_MessageHandler *copy;
1228   unsigned int count;
1229
1230   if (NULL == handlers)
1231     return NULL;
1232
1233   count = GNUNET_MQ_count_handlers (handlers);
1234   copy = GNUNET_new_array (count + 1, struct GNUNET_MQ_MessageHandler);
1235   GNUNET_memcpy (copy,
1236                  handlers,
1237                  count * sizeof(struct GNUNET_MQ_MessageHandler));
1238   return copy;
1239 }
1240
1241
1242 /**
1243  * Copy an array of handlers, appending AGPL handler.
1244  *
1245  * Useful if the array has been delared in local memory and needs to be
1246  * persisted for future use.
1247  *
1248  * @param handlers Array of handlers to be copied. Can be NULL (nothing done).
1249  * @param agpl_handler function to call for AGPL handling
1250  * @param agpl_cls closure for @a agpl_handler
1251  * @return A newly allocated array of handlers.
1252  *         Needs to be freed with #GNUNET_free.
1253  */
1254 struct GNUNET_MQ_MessageHandler *
1255 GNUNET_MQ_copy_handlers2 (const struct GNUNET_MQ_MessageHandler *handlers,
1256                           GNUNET_MQ_MessageCallback agpl_handler,
1257                           void *agpl_cls)
1258 {
1259   struct GNUNET_MQ_MessageHandler *copy;
1260   unsigned int count;
1261
1262   if (NULL == handlers)
1263     return NULL;
1264   count = GNUNET_MQ_count_handlers (handlers);
1265   copy = GNUNET_new_array (count + 2, struct GNUNET_MQ_MessageHandler);
1266   GNUNET_memcpy (copy,
1267                  handlers,
1268                  count * sizeof(struct GNUNET_MQ_MessageHandler));
1269   copy[count].mv = NULL;
1270   copy[count].cb = agpl_handler;
1271   copy[count].cls = agpl_cls;
1272   copy[count].type = GNUNET_MESSAGE_TYPE_REQUEST_AGPL;
1273   copy[count].expected_size = sizeof(struct GNUNET_MessageHeader);
1274   return copy;
1275 }
1276
1277
1278 /**
1279  * Count the handlers in a handler array.
1280  *
1281  * @param handlers Array of handlers to be counted.
1282  * @return The number of handlers in the array.
1283  */
1284 unsigned int
1285 GNUNET_MQ_count_handlers (const struct GNUNET_MQ_MessageHandler *handlers)
1286 {
1287   unsigned int i;
1288
1289   if (NULL == handlers)
1290     return 0;
1291
1292   for (i = 0; NULL != handlers[i].cb; i++)
1293     ;
1294
1295   return i;
1296 }
1297
1298
1299 /**
1300  * Convert an `enum GNUNET_MQ_PreferenceType` to a string
1301  *
1302  * @param type the preference type
1303  * @return a string or NULL if invalid
1304  */
1305 const char *
1306 GNUNET_MQ_preference_to_string (enum GNUNET_MQ_PreferenceKind type)
1307 {
1308   switch (type)
1309   {
1310   case GNUNET_MQ_PREFERENCE_NONE:
1311     return "NONE";
1312
1313   case GNUNET_MQ_PREFERENCE_BANDWIDTH:
1314     return "BANDWIDTH";
1315
1316   case GNUNET_MQ_PREFERENCE_LATENCY:
1317     return "LATENCY";
1318
1319   case GNUNET_MQ_PREFERENCE_RELIABILITY:
1320     return "RELIABILITY";
1321   }
1322   ;
1323   return NULL;
1324 }
1325
1326
1327 /* end of mq.c */