auto-provide OS_IPK paths in [paths] of config
[oweals/gnunet.git] / src / util / mq.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2012-2019 GNUnet e.V.
4
5      GNUnet is free software: you can redistribute it and/or modify it
6      under the terms of the GNU Affero General Public License as published
7      by the Free Software Foundation, either version 3 of the License,
8      or (at your option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      Affero General Public License for more details.
14
15      You should have received a copy of the GNU Affero General Public License
16      along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18      SPDX-License-Identifier: AGPL3.0-or-later
19  */
20
21 /**
22  * @author Florian Dold
23  * @file util/mq.c
24  * @brief general purpose request queue
25  */
26 #include "platform.h"
27 #include "gnunet_util_lib.h"
28
29 #define LOG(kind, ...) GNUNET_log_from (kind, "util-mq", __VA_ARGS__)
30
31
32 struct GNUNET_MQ_Envelope
33 {
34   /**
35    * Messages are stored in a linked list.
36    * Each queue has its own list of envelopes.
37    */
38   struct GNUNET_MQ_Envelope *next;
39
40   /**
41    * Messages are stored in a linked list
42    * Each queue has its own list of envelopes.
43    */
44   struct GNUNET_MQ_Envelope *prev;
45
46   /**
47    * Actual allocated message header.
48    * The GNUNET_MQ_Envelope header is allocated at
49    * the end of the message.
50    */
51   struct GNUNET_MessageHeader *mh;
52
53   /**
54    * Queue the message is queued in, NULL if message is not queued.
55    */
56   struct GNUNET_MQ_Handle *parent_queue;
57
58   /**
59    * Called after the message was sent irrevocably.
60    */
61   GNUNET_SCHEDULER_TaskCallback sent_cb;
62
63   /**
64    * Closure for @e send_cb
65    */
66   void *sent_cls;
67
68   /**
69    * Flags that were set for this envelope by
70    * #GNUNET_MQ_env_set_options().   Only valid if
71    * @e have_custom_options is set.
72    */
73   enum GNUNET_MQ_PriorityPreferences priority;
74
75   /**
76    * Did the application call #GNUNET_MQ_env_set_options()?
77    */
78   int have_custom_options;
79 };
80
81
82 /**
83  * Handle to a message queue.
84  */
85 struct GNUNET_MQ_Handle
86 {
87   /**
88    * Handlers array, or NULL if the queue should not receive messages
89    */
90   struct GNUNET_MQ_MessageHandler *handlers;
91
92   /**
93    * Actual implementation of message sending,
94    * called when a message is added
95    */
96   GNUNET_MQ_SendImpl send_impl;
97
98   /**
99    * Implementation-dependent queue destruction function
100    */
101   GNUNET_MQ_DestroyImpl destroy_impl;
102
103   /**
104    * Implementation-dependent send cancel function
105    */
106   GNUNET_MQ_CancelImpl cancel_impl;
107
108   /**
109    * Implementation-specific state
110    */
111   void *impl_state;
112
113   /**
114    * Callback will be called when an error occurs.
115    */
116   GNUNET_MQ_ErrorHandler error_handler;
117
118   /**
119    * Closure for the error handler.
120    */
121   void *error_handler_cls;
122
123   /**
124    * Task to asynchronously run #impl_send_continue().
125    */
126   struct GNUNET_SCHEDULER_Task *send_task;
127
128   /**
129    * Linked list of messages pending to be sent
130    */
131   struct GNUNET_MQ_Envelope *envelope_head;
132
133   /**
134    * Linked list of messages pending to be sent
135    */
136   struct GNUNET_MQ_Envelope *envelope_tail;
137
138   /**
139    * Message that is currently scheduled to be
140    * sent. Not the head of the message queue, as the implementation
141    * needs to know if sending has been already scheduled or not.
142    */
143   struct GNUNET_MQ_Envelope *current_envelope;
144
145   /**
146    * Map of associations, lazily allocated
147    */
148   struct GNUNET_CONTAINER_MultiHashMap32 *assoc_map;
149
150   /**
151    * Functions to call on queue destruction; kept in a DLL.
152    */
153   struct GNUNET_MQ_DestroyNotificationHandle *dnh_head;
154
155   /**
156    * Functions to call on queue destruction; kept in a DLL.
157    */
158   struct GNUNET_MQ_DestroyNotificationHandle *dnh_tail;
159
160   /**
161    * Flags that were set for this queue by
162    * #GNUNET_MQ_set_options().   Default is 0.
163    */
164   enum GNUNET_MQ_PriorityPreferences priority;
165
166   /**
167    * Next id that should be used for the @e assoc_map,
168    * initialized lazily to a random value together with
169    * @e assoc_map
170    */
171   uint32_t assoc_id;
172
173   /**
174    * Number of entries we have in the envelope-DLL.
175    */
176   unsigned int queue_length;
177
178   /**
179    * #GNUNET_YES if GNUNET_MQ_impl_evacuate was called.
180    * FIXME: is this dead?
181    */
182   int evacuate_called;
183
184   /**
185    * #GNUNET_YES if GNUNET_MQ_impl_send_in_flight() was called.
186    */
187   int in_flight;
188 };
189
190
191 /**
192  * Call the message message handler that was registered
193  * for the type of the given message in the given message queue.
194  *
195  * This function is indended to be used for the implementation
196  * of message queues.
197  *
198  * @param mq message queue with the handlers
199  * @param mh message to dispatch
200  */
201 void
202 GNUNET_MQ_inject_message (struct GNUNET_MQ_Handle *mq,
203                           const struct GNUNET_MessageHeader *mh)
204 {
205   int ret;
206
207   ret = GNUNET_MQ_handle_message (mq->handlers, mh);
208   if (GNUNET_SYSERR == ret)
209   {
210     GNUNET_MQ_inject_error (mq, GNUNET_MQ_ERROR_MALFORMED);
211     return;
212   }
213 }
214
215
216 /**
217  * Call the message message handler that was registered
218  * for the type of the given message in the given @a handlers list.
219  *
220  * This function is indended to be used for the implementation
221  * of message queues.
222  *
223  * @param handlers a set of handlers
224  * @param mh message to dispatch
225  * @return #GNUNET_OK on success, #GNUNET_NO if no handler matched,
226  *         #GNUNET_SYSERR if message was rejected by check function
227  */
228 int
229 GNUNET_MQ_handle_message (const struct GNUNET_MQ_MessageHandler *handlers,
230                           const struct GNUNET_MessageHeader *mh)
231 {
232   const struct GNUNET_MQ_MessageHandler *handler;
233   int handled = GNUNET_NO;
234   uint16_t msize = ntohs (mh->size);
235   uint16_t mtype = ntohs (mh->type);
236
237   LOG (GNUNET_ERROR_TYPE_DEBUG,
238        "Received message of type %u and size %u\n",
239        mtype,
240        msize);
241
242   if (NULL == handlers)
243     goto done;
244   for (handler = handlers; NULL != handler->cb; handler++)
245   {
246     if (handler->type == mtype)
247     {
248       handled = GNUNET_YES;
249       if ((handler->expected_size > msize) ||
250           ((handler->expected_size != msize) && (NULL == handler->mv)))
251       {
252         /* Too small, or not an exact size and
253            no 'mv' handler to check rest */
254         LOG (GNUNET_ERROR_TYPE_ERROR,
255              "Received malformed message of type %u\n",
256              (unsigned int) handler->type);
257         return GNUNET_SYSERR;
258       }
259       if ((NULL == handler->mv) ||
260           (GNUNET_OK == handler->mv (handler->cls, mh)))
261       {
262         /* message well-formed, pass to handler */
263         handler->cb (handler->cls, mh);
264       }
265       else
266       {
267         /* Message rejected by check routine */
268         LOG (GNUNET_ERROR_TYPE_ERROR,
269              "Received malformed message of type %u\n",
270              (unsigned int) handler->type);
271         return GNUNET_SYSERR;
272       }
273       break;
274     }
275   }
276 done:
277   if (GNUNET_NO == handled)
278   {
279     LOG (GNUNET_ERROR_TYPE_INFO,
280          "No handler for message of type %u and size %u\n",
281          mtype,
282          msize);
283     return GNUNET_NO;
284   }
285   return GNUNET_OK;
286 }
287
288
289 /**
290  * Call the error handler of a message queue with the given
291  * error code.  If there is no error handler, log a warning.
292  *
293  * This function is intended to be used by the implementation
294  * of message queues.
295  *
296  * @param mq message queue
297  * @param error the error type
298  */
299 void
300 GNUNET_MQ_inject_error (struct GNUNET_MQ_Handle *mq, enum GNUNET_MQ_Error error)
301 {
302   if (NULL == mq->error_handler)
303   {
304     LOG (GNUNET_ERROR_TYPE_WARNING,
305          "Got error %d, but no handler installed\n",
306          (int) error);
307     return;
308   }
309   mq->error_handler (mq->error_handler_cls, error);
310 }
311
312
313 /**
314  * Discard the message queue message, free all
315  * allocated resources. Must be called in the event
316  * that a message is created but should not actually be sent.
317  *
318  * @param mqm the message to discard
319  */
320 void
321 GNUNET_MQ_discard (struct GNUNET_MQ_Envelope *ev)
322 {
323   GNUNET_assert (NULL == ev->parent_queue);
324   GNUNET_free (ev);
325 }
326
327
328 /**
329  * Obtain the current length of the message queue.
330  *
331  * @param mq queue to inspect
332  * @return number of queued, non-transmitted messages
333  */
334 unsigned int
335 GNUNET_MQ_get_length (struct GNUNET_MQ_Handle *mq)
336 {
337   if (GNUNET_YES != mq->in_flight)
338   {
339     return mq->queue_length;
340   }
341   return mq->queue_length - 1;
342 }
343
344
345 /**
346  * Send a message with the given message queue.
347  * May only be called once per message.
348  *
349  * @param mq message queue
350  * @param ev the envelope with the message to send.
351  */
352 void
353 GNUNET_MQ_send (struct GNUNET_MQ_Handle *mq, struct GNUNET_MQ_Envelope *ev)
354 {
355   GNUNET_assert (NULL != mq);
356   GNUNET_assert (NULL == ev->parent_queue);
357
358   mq->queue_length++;
359   if (mq->queue_length >= 10000)
360   {
361     /* This would seem like a bug... */
362     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
363                 "MQ with %u entries extended by message of type %u (FC broken?)\n",
364                 (unsigned int) mq->queue_length,
365                 (unsigned int) ntohs (ev->mh->type));
366   }
367   ev->parent_queue = mq;
368   /* is the implementation busy? queue it! */
369   if ((NULL != mq->current_envelope) || (NULL != mq->send_task))
370   {
371     GNUNET_CONTAINER_DLL_insert_tail (mq->envelope_head, mq->envelope_tail, ev);
372     return;
373   }
374   GNUNET_assert (NULL == mq->envelope_head);
375   mq->current_envelope = ev;
376
377   LOG (GNUNET_ERROR_TYPE_DEBUG,
378        "sending message of type %u, queue empty (MQ: %p)\n",
379        ntohs (ev->mh->type),
380        mq);
381
382   mq->send_impl (mq, ev->mh, mq->impl_state);
383 }
384
385
386 /**
387  * Remove the first envelope that has not yet been sent from the message
388  * queue and return it.
389  *
390  * @param mq queue to remove envelope from
391  * @return NULL if queue is empty (or has no envelope that is not under transmission)
392  */
393 struct GNUNET_MQ_Envelope *
394 GNUNET_MQ_unsent_head (struct GNUNET_MQ_Handle *mq)
395 {
396   struct GNUNET_MQ_Envelope *env;
397
398   env = mq->envelope_head;
399   GNUNET_CONTAINER_DLL_remove (mq->envelope_head, mq->envelope_tail, env);
400   mq->queue_length--;
401   env->parent_queue = NULL;
402   return env;
403 }
404
405
406 /**
407  * Function to copy an envelope.  The envelope must not yet
408  * be in any queue or have any options or callbacks set.
409  *
410  * @param env envelope to copy
411  * @return copy of @a env
412  */
413 struct GNUNET_MQ_Envelope *
414 GNUNET_MQ_env_copy (struct GNUNET_MQ_Envelope *env)
415 {
416   GNUNET_assert (NULL == env->next);
417   GNUNET_assert (NULL == env->parent_queue);
418   GNUNET_assert (NULL == env->sent_cb);
419   GNUNET_assert (GNUNET_NO == env->have_custom_options);
420   return GNUNET_MQ_msg_copy (env->mh);
421 }
422
423
424 /**
425  * Send a copy of a message with the given message queue.
426  * Can be called repeatedly on the same envelope.
427  *
428  * @param mq message queue
429  * @param ev the envelope with the message to send.
430  */
431 void
432 GNUNET_MQ_send_copy (struct GNUNET_MQ_Handle *mq,
433                      const struct GNUNET_MQ_Envelope *ev)
434 {
435   struct GNUNET_MQ_Envelope *env;
436   uint16_t msize;
437
438   msize = ntohs (ev->mh->size);
439   env = GNUNET_malloc (sizeof(struct GNUNET_MQ_Envelope) + msize);
440   env->mh = (struct GNUNET_MessageHeader *) &env[1];
441   env->sent_cb = ev->sent_cb;
442   env->sent_cls = ev->sent_cls;
443   GNUNET_memcpy (&env[1], ev->mh, msize);
444   GNUNET_MQ_send (mq, env);
445 }
446
447
448 /**
449  * Task run to call the send implementation for the next queued
450  * message, if any.  Only useful for implementing message queues,
451  * results in undefined behavior if not used carefully.
452  *
453  * @param cls message queue to send the next message with
454  */
455 static void
456 impl_send_continue (void *cls)
457 {
458   struct GNUNET_MQ_Handle *mq = cls;
459
460   mq->send_task = NULL;
461   /* call is only valid if we're actually currently sending
462    * a message */
463   if (NULL == mq->envelope_head)
464     return;
465   mq->current_envelope = mq->envelope_head;
466   GNUNET_CONTAINER_DLL_remove (mq->envelope_head,
467                                mq->envelope_tail,
468                                mq->current_envelope);
469
470   LOG (GNUNET_ERROR_TYPE_DEBUG,
471        "sending message of type %u from queue\n",
472        ntohs (mq->current_envelope->mh->type));
473
474   mq->send_impl (mq, mq->current_envelope->mh, mq->impl_state);
475 }
476
477
478 /**
479  * Call the send implementation for the next queued message, if any.
480  * Only useful for implementing message queues, results in undefined
481  * behavior if not used carefully.
482  *
483  * @param mq message queue to send the next message with
484  */
485 void
486 GNUNET_MQ_impl_send_continue (struct GNUNET_MQ_Handle *mq)
487 {
488   struct GNUNET_MQ_Envelope *current_envelope;
489   GNUNET_SCHEDULER_TaskCallback cb;
490
491   GNUNET_assert (0 < mq->queue_length);
492   mq->queue_length--;
493   mq->in_flight = GNUNET_NO;
494   current_envelope = mq->current_envelope;
495   current_envelope->parent_queue = NULL;
496   mq->current_envelope = NULL;
497   GNUNET_assert (NULL == mq->send_task);
498   mq->send_task = GNUNET_SCHEDULER_add_now (&impl_send_continue, mq);
499   if (NULL != (cb = current_envelope->sent_cb))
500   {
501     current_envelope->sent_cb = NULL;
502     cb (current_envelope->sent_cls);
503   }
504   GNUNET_free (current_envelope);
505 }
506
507
508 /**
509  * Call the send notification for the current message, but do not
510  * try to send the next message until #GNUNET_MQ_impl_send_continue
511  * is called.
512  *
513  * Only useful for implementing message queues, results in undefined
514  * behavior if not used carefully.
515  *
516  * @param mq message queue to send the next message with
517  */
518 void
519 GNUNET_MQ_impl_send_in_flight (struct GNUNET_MQ_Handle *mq)
520 {
521   struct GNUNET_MQ_Envelope *current_envelope;
522   GNUNET_SCHEDULER_TaskCallback cb;
523
524   mq->in_flight = GNUNET_YES;
525   /* call is only valid if we're actually currently sending
526    * a message */
527   current_envelope = mq->current_envelope;
528   GNUNET_assert (NULL != current_envelope);
529   /* can't call cancel from now on anymore */
530   current_envelope->parent_queue = NULL;
531   if (NULL != (cb = current_envelope->sent_cb))
532   {
533     current_envelope->sent_cb = NULL;
534     cb (current_envelope->sent_cls);
535   }
536 }
537
538
539 /**
540  * Create a message queue for the specified handlers.
541  *
542  * @param send function the implements sending messages
543  * @param destroy function that implements destroying the queue
544  * @param cancel function that implements canceling a message
545  * @param impl_state for the queue, passed to 'send' and 'destroy'
546  * @param handlers array of message handlers
547  * @param error_handler handler for read and write errors
548  * @param error_handler_cls closure for @a error_handler
549  * @return a new message queue
550  */
551 struct GNUNET_MQ_Handle *
552 GNUNET_MQ_queue_for_callbacks (GNUNET_MQ_SendImpl send,
553                                GNUNET_MQ_DestroyImpl destroy,
554                                GNUNET_MQ_CancelImpl cancel,
555                                void *impl_state,
556                                const struct GNUNET_MQ_MessageHandler *handlers,
557                                GNUNET_MQ_ErrorHandler error_handler,
558                                void *error_handler_cls)
559 {
560   struct GNUNET_MQ_Handle *mq;
561
562   mq = GNUNET_new (struct GNUNET_MQ_Handle);
563   mq->send_impl = send;
564   mq->destroy_impl = destroy;
565   mq->cancel_impl = cancel;
566   mq->handlers = GNUNET_MQ_copy_handlers (handlers);
567   mq->error_handler = error_handler;
568   mq->error_handler_cls = error_handler_cls;
569   mq->impl_state = impl_state;
570
571   return mq;
572 }
573
574
575 /**
576  * Change the closure argument in all of the `handlers` of the
577  * @a mq.
578  *
579  * @param mq to modify
580  * @param handlers_cls new closure to use
581  */
582 void
583 GNUNET_MQ_set_handlers_closure (struct GNUNET_MQ_Handle *mq, void *handlers_cls)
584 {
585   if (NULL == mq->handlers)
586     return;
587   for (unsigned int i = 0; NULL != mq->handlers[i].cb; i++)
588     mq->handlers[i].cls = handlers_cls;
589 }
590
591
592 /**
593  * Get the message that should currently be sent.
594  * Fails if there is no current message.
595  * Only useful for implementing message queues,
596  * results in undefined behavior if not used carefully.
597  *
598  * @param mq message queue with the current message
599  * @return message to send, never NULL
600  */
601 const struct GNUNET_MessageHeader *
602 GNUNET_MQ_impl_current (struct GNUNET_MQ_Handle *mq)
603 {
604   GNUNET_assert (NULL != mq->current_envelope);
605   GNUNET_assert (NULL != mq->current_envelope->mh);
606   return mq->current_envelope->mh;
607 }
608
609
610 /**
611  * Get the implementation state associated with the
612  * message queue.
613  *
614  * While the GNUNET_MQ_Impl* callbacks receive the
615  * implementation state, continuations that are scheduled
616  * by the implementation function often only have one closure
617  * argument, with this function it is possible to get at the
618  * implementation state when only passing the GNUNET_MQ_Handle
619  * as closure.
620  *
621  * @param mq message queue with the current message
622  * @return message to send, never NULL
623  */
624 void *
625 GNUNET_MQ_impl_state (struct GNUNET_MQ_Handle *mq)
626 {
627   return mq->impl_state;
628 }
629
630
631 struct GNUNET_MQ_Envelope *
632 GNUNET_MQ_msg_ (struct GNUNET_MessageHeader **mhp, uint16_t size, uint16_t type)
633 {
634   struct GNUNET_MQ_Envelope *ev;
635
636   ev = GNUNET_malloc (size + sizeof(struct GNUNET_MQ_Envelope));
637   ev->mh = (struct GNUNET_MessageHeader *) &ev[1];
638   ev->mh->size = htons (size);
639   ev->mh->type = htons (type);
640   if (NULL != mhp)
641     *mhp = ev->mh;
642   return ev;
643 }
644
645
646 /**
647  * Create a new envelope by copying an existing message.
648  *
649  * @param hdr header of the message to copy
650  * @return envelope containing @a hdr
651  */
652 struct GNUNET_MQ_Envelope *
653 GNUNET_MQ_msg_copy (const struct GNUNET_MessageHeader *hdr)
654 {
655   struct GNUNET_MQ_Envelope *mqm;
656   uint16_t size = ntohs (hdr->size);
657
658   mqm = GNUNET_malloc (sizeof(*mqm) + size);
659   mqm->mh = (struct GNUNET_MessageHeader *) &mqm[1];
660   GNUNET_memcpy (mqm->mh, hdr, size);
661   return mqm;
662 }
663
664
665 /**
666  * Implementation of the #GNUNET_MQ_msg_nested_mh macro.
667  *
668  * @param mhp pointer to the message header pointer that will be changed to allocate at
669  *        the newly allocated space for the message.
670  * @param base_size size of the data before the nested message
671  * @param type type of the message in the envelope
672  * @param nested_mh the message to append to the message after base_size
673  */
674 struct GNUNET_MQ_Envelope *
675 GNUNET_MQ_msg_nested_mh_ (struct GNUNET_MessageHeader **mhp,
676                           uint16_t base_size,
677                           uint16_t type,
678                           const struct GNUNET_MessageHeader *nested_mh)
679 {
680   struct GNUNET_MQ_Envelope *mqm;
681   uint16_t size;
682
683   if (NULL == nested_mh)
684     return GNUNET_MQ_msg_ (mhp, base_size, type);
685
686   size = base_size + ntohs (nested_mh->size);
687
688   /* check for uint16_t overflow */
689   if (size < base_size)
690     return NULL;
691
692   mqm = GNUNET_MQ_msg_ (mhp, size, type);
693   GNUNET_memcpy ((char *) mqm->mh + base_size,
694                  nested_mh,
695                  ntohs (nested_mh->size));
696
697   return mqm;
698 }
699
700
701 /**
702  * Associate the assoc_data in mq with a unique request id.
703  *
704  * @param mq message queue, id will be unique for the queue
705  * @param assoc_data to associate
706  */
707 uint32_t
708 GNUNET_MQ_assoc_add (struct GNUNET_MQ_Handle *mq, void *assoc_data)
709 {
710   uint32_t id;
711
712   if (NULL == mq->assoc_map)
713   {
714     mq->assoc_map = GNUNET_CONTAINER_multihashmap32_create (8);
715     mq->assoc_id = 1;
716   }
717   id = mq->assoc_id++;
718   GNUNET_assert (GNUNET_OK ==
719                  GNUNET_CONTAINER_multihashmap32_put (
720                    mq->assoc_map,
721                    id,
722                    assoc_data,
723                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
724   return id;
725 }
726
727
728 /**
729  * Get the data associated with a @a request_id in a queue
730  *
731  * @param mq the message queue with the association
732  * @param request_id the request id we are interested in
733  * @return the associated data
734  */
735 void *
736 GNUNET_MQ_assoc_get (struct GNUNET_MQ_Handle *mq, uint32_t request_id)
737 {
738   if (NULL == mq->assoc_map)
739     return NULL;
740   return GNUNET_CONTAINER_multihashmap32_get (mq->assoc_map, request_id);
741 }
742
743
744 /**
745  * Remove the association for a @a request_id
746  *
747  * @param mq the message queue with the association
748  * @param request_id the request id we want to remove
749  * @return the associated data
750  */
751 void *
752 GNUNET_MQ_assoc_remove (struct GNUNET_MQ_Handle *mq, uint32_t request_id)
753 {
754   void *val;
755
756   if (NULL == mq->assoc_map)
757     return NULL;
758   val = GNUNET_CONTAINER_multihashmap32_get (mq->assoc_map, request_id);
759   GNUNET_CONTAINER_multihashmap32_remove_all (mq->assoc_map, request_id);
760   return val;
761 }
762
763
764 /**
765  * Call a callback once the envelope has been sent, that is,
766  * sending it can not be canceled anymore.
767  * There can be only one notify sent callback per envelope.
768  *
769  * @param ev message to call the notify callback for
770  * @param cb the notify callback
771  * @param cb_cls closure for the callback
772  */
773 void
774 GNUNET_MQ_notify_sent (struct GNUNET_MQ_Envelope *ev,
775                        GNUNET_SCHEDULER_TaskCallback cb,
776                        void *cb_cls)
777 {
778   /* allow setting *OR* clearing callback */
779   GNUNET_assert ((NULL == ev->sent_cb) || (NULL == cb));
780   ev->sent_cb = cb;
781   ev->sent_cls = cb_cls;
782 }
783
784
785 /**
786  * Handle we return for callbacks registered to be
787  * notified when #GNUNET_MQ_destroy() is called on a queue.
788  */
789 struct GNUNET_MQ_DestroyNotificationHandle
790 {
791   /**
792    * Kept in a DLL.
793    */
794   struct GNUNET_MQ_DestroyNotificationHandle *prev;
795
796   /**
797    * Kept in a DLL.
798    */
799   struct GNUNET_MQ_DestroyNotificationHandle *next;
800
801   /**
802    * Queue to notify about.
803    */
804   struct GNUNET_MQ_Handle *mq;
805
806   /**
807    * Function to call.
808    */
809   GNUNET_SCHEDULER_TaskCallback cb;
810
811   /**
812    * Closure for @e cb.
813    */
814   void *cb_cls;
815 };
816
817
818 /**
819  * Destroy the message queue.
820  *
821  * @param mq message queue to destroy
822  */
823 void
824 GNUNET_MQ_destroy (struct GNUNET_MQ_Handle *mq)
825 {
826   struct GNUNET_MQ_DestroyNotificationHandle *dnh;
827
828   if (NULL != mq->destroy_impl)
829   {
830     mq->destroy_impl (mq, mq->impl_state);
831   }
832   if (NULL != mq->send_task)
833   {
834     GNUNET_SCHEDULER_cancel (mq->send_task);
835     mq->send_task = NULL;
836   }
837   while (NULL != mq->envelope_head)
838   {
839     struct GNUNET_MQ_Envelope *ev;
840
841     ev = mq->envelope_head;
842     ev->parent_queue = NULL;
843     GNUNET_CONTAINER_DLL_remove (mq->envelope_head, mq->envelope_tail, ev);
844     GNUNET_assert (0 < mq->queue_length);
845     mq->queue_length--;
846     LOG (GNUNET_ERROR_TYPE_DEBUG,
847          "MQ destroy drops message of type %u\n",
848          ntohs (ev->mh->type));
849     GNUNET_MQ_discard (ev);
850   }
851   if (NULL != mq->current_envelope)
852   {
853     /* we can only discard envelopes that
854      * are not queued! */
855     mq->current_envelope->parent_queue = NULL;
856     LOG (GNUNET_ERROR_TYPE_DEBUG,
857          "MQ destroy drops current message of type %u\n",
858          ntohs (mq->current_envelope->mh->type));
859     GNUNET_MQ_discard (mq->current_envelope);
860     mq->current_envelope = NULL;
861     GNUNET_assert (0 < mq->queue_length);
862     mq->queue_length--;
863   }
864   GNUNET_assert (0 == mq->queue_length);
865   while (NULL != (dnh = mq->dnh_head))
866   {
867     dnh->cb (dnh->cb_cls);
868     GNUNET_MQ_destroy_notify_cancel (dnh);
869   }
870   if (NULL != mq->assoc_map)
871   {
872     GNUNET_CONTAINER_multihashmap32_destroy (mq->assoc_map);
873     mq->assoc_map = NULL;
874   }
875   GNUNET_free_non_null (mq->handlers);
876   GNUNET_free (mq);
877 }
878
879
880 const struct GNUNET_MessageHeader *
881 GNUNET_MQ_extract_nested_mh_ (const struct GNUNET_MessageHeader *mh,
882                               uint16_t base_size)
883 {
884   uint16_t whole_size;
885   uint16_t nested_size;
886   const struct GNUNET_MessageHeader *nested_msg;
887
888   whole_size = ntohs (mh->size);
889   GNUNET_assert (whole_size >= base_size);
890   nested_size = whole_size - base_size;
891   if (0 == nested_size)
892     return NULL;
893   if (nested_size < sizeof(struct GNUNET_MessageHeader))
894   {
895     GNUNET_break_op (0);
896     return NULL;
897   }
898   nested_msg = (const struct GNUNET_MessageHeader *) ((char *) mh + base_size);
899   if (ntohs (nested_msg->size) != nested_size)
900   {
901     GNUNET_break_op (0);
902     return NULL;
903   }
904   return nested_msg;
905 }
906
907
908 /**
909  * Cancel sending the message. Message must have been sent with
910  * #GNUNET_MQ_send before.  May not be called after the notify sent
911  * callback has been called
912  *
913  * @param ev queued envelope to cancel
914  */
915 void
916 GNUNET_MQ_send_cancel (struct GNUNET_MQ_Envelope *ev)
917 {
918   struct GNUNET_MQ_Handle *mq = ev->parent_queue;
919
920   GNUNET_assert (NULL != mq);
921   GNUNET_assert (NULL != mq->cancel_impl);
922
923   mq->evacuate_called = GNUNET_NO;
924
925   if (mq->current_envelope == ev)
926   {
927     /* complex case, we already started with transmitting
928        the message using the callbacks. */
929     GNUNET_assert (GNUNET_NO == mq->in_flight);
930     GNUNET_assert (0 < mq->queue_length);
931     mq->queue_length--;
932     mq->cancel_impl (mq, mq->impl_state);
933     /* continue sending the next message, if any */
934     mq->current_envelope = mq->envelope_head;
935     if (NULL != mq->current_envelope)
936     {
937       GNUNET_CONTAINER_DLL_remove (mq->envelope_head,
938                                    mq->envelope_tail,
939                                    mq->current_envelope);
940
941       LOG (GNUNET_ERROR_TYPE_DEBUG,
942            "sending canceled message of type %u queue\n",
943            ntohs (ev->mh->type));
944
945       mq->send_impl (mq, mq->current_envelope->mh, mq->impl_state);
946     }
947   }
948   else
949   {
950     /* simple case, message is still waiting in the queue */
951     GNUNET_CONTAINER_DLL_remove (mq->envelope_head, mq->envelope_tail, ev);
952     GNUNET_assert (0 < mq->queue_length);
953     mq->queue_length--;
954   }
955
956   if (GNUNET_YES != mq->evacuate_called)
957   {
958     ev->parent_queue = NULL;
959     ev->mh = NULL;
960     /* also frees ev */
961     GNUNET_free (ev);
962   }
963 }
964
965
966 /**
967  * Function to obtain the current envelope
968  * from within #GNUNET_MQ_SendImpl implementations.
969  *
970  * @param mq message queue to interrogate
971  * @return the current envelope
972  */
973 struct GNUNET_MQ_Envelope *
974 GNUNET_MQ_get_current_envelope (struct GNUNET_MQ_Handle *mq)
975 {
976   return mq->current_envelope;
977 }
978
979
980 /**
981  * Function to obtain the last envelope in the queue.
982  *
983  * @param mq message queue to interrogate
984  * @return the last envelope in the queue
985  */
986 struct GNUNET_MQ_Envelope *
987 GNUNET_MQ_get_last_envelope (struct GNUNET_MQ_Handle *mq)
988 {
989   if (NULL != mq->envelope_tail)
990     return mq->envelope_tail;
991
992   return mq->current_envelope;
993 }
994
995
996 /**
997  * Set application-specific preferences for this envelope.
998  * Overrides the options set for the queue with
999  * #GNUNET_MQ_set_options() for this message only.
1000  *
1001  * @param env message to set options for
1002  * @param pp priorities and preferences to apply
1003  */
1004 void
1005 GNUNET_MQ_env_set_options (struct GNUNET_MQ_Envelope *env,
1006                            enum GNUNET_MQ_PriorityPreferences pp)
1007 {
1008   env->priority = pp;
1009   env->have_custom_options = GNUNET_YES;
1010 }
1011
1012
1013 /**
1014  * Get application-specific options for this envelope.
1015  *
1016  * @param env message to set options for
1017  * @return priorities and preferences to apply for @a env
1018  */
1019 enum GNUNET_MQ_PriorityPreferences
1020 GNUNET_MQ_env_get_options (struct GNUNET_MQ_Envelope *env)
1021 {
1022   struct GNUNET_MQ_Handle *mq = env->parent_queue;
1023
1024   if (GNUNET_YES == env->have_custom_options)
1025     return env->priority;
1026   if (NULL == mq)
1027     return 0;
1028   return mq->priority;
1029 }
1030
1031
1032 /**
1033  * Combine performance preferences set for different
1034  * envelopes that are being combined into one larger envelope.
1035  *
1036  * @param p1 one set of preferences
1037  * @param p2 second set of preferences
1038  * @return combined priority and preferences to use
1039  */
1040 enum GNUNET_MQ_PriorityPreferences
1041 GNUNET_MQ_env_combine_options (enum GNUNET_MQ_PriorityPreferences p1,
1042                                enum GNUNET_MQ_PriorityPreferences p2)
1043 {
1044   enum GNUNET_MQ_PriorityPreferences ret;
1045
1046   ret = GNUNET_MAX (p1 & GNUNET_MQ_PRIORITY_MASK, p2 & GNUNET_MQ_PRIORITY_MASK);
1047   ret |= ((p1 & GNUNET_MQ_PREF_UNRELIABLE) & (p2 & GNUNET_MQ_PREF_UNRELIABLE));
1048   ret |=
1049     ((p1 & GNUNET_MQ_PREF_LOW_LATENCY) | (p2 & GNUNET_MQ_PREF_LOW_LATENCY));
1050   ret |=
1051     ((p1 & GNUNET_MQ_PREF_CORK_ALLOWED) & (p2 & GNUNET_MQ_PREF_CORK_ALLOWED));
1052   ret |= ((p1 & GNUNET_MQ_PREF_GOODPUT) & (p2 & GNUNET_MQ_PREF_GOODPUT));
1053   ret |=
1054     ((p1 & GNUNET_MQ_PREF_OUT_OF_ORDER) & (p2 & GNUNET_MQ_PREF_OUT_OF_ORDER));
1055   return ret;
1056 }
1057
1058
1059 /**
1060  * Set application-specific default options for this queue.
1061  *
1062  * @param mq message queue to set options for
1063  * @param pp priorities and preferences to apply
1064  */
1065 void
1066 GNUNET_MQ_set_options (struct GNUNET_MQ_Handle *mq,
1067                        enum GNUNET_MQ_PriorityPreferences pp)
1068 {
1069   mq->priority = pp;
1070 }
1071
1072
1073 /**
1074  * Obtain message contained in envelope.
1075  *
1076  * @param env the envelope
1077  * @return message contained in the envelope
1078  */
1079 const struct GNUNET_MessageHeader *
1080 GNUNET_MQ_env_get_msg (const struct GNUNET_MQ_Envelope *env)
1081 {
1082   return env->mh;
1083 }
1084
1085
1086 /**
1087  * Return next envelope in queue.
1088  *
1089  * @param env a queued envelope
1090  * @return next one, or NULL
1091  */
1092 const struct GNUNET_MQ_Envelope *
1093 GNUNET_MQ_env_next (const struct GNUNET_MQ_Envelope *env)
1094 {
1095   return env->next;
1096 }
1097
1098
1099 /**
1100  * Register function to be called whenever @a mq is being
1101  * destroyed.
1102  *
1103  * @param mq message queue to watch
1104  * @param cb function to call on @a mq destruction
1105  * @param cb_cls closure for @a cb
1106  * @return handle for #GNUNET_MQ_destroy_notify_cancel().
1107  */
1108 struct GNUNET_MQ_DestroyNotificationHandle *
1109 GNUNET_MQ_destroy_notify (struct GNUNET_MQ_Handle *mq,
1110                           GNUNET_SCHEDULER_TaskCallback cb,
1111                           void *cb_cls)
1112 {
1113   struct GNUNET_MQ_DestroyNotificationHandle *dnh;
1114
1115   dnh = GNUNET_new (struct GNUNET_MQ_DestroyNotificationHandle);
1116   dnh->mq = mq;
1117   dnh->cb = cb;
1118   dnh->cb_cls = cb_cls;
1119   GNUNET_CONTAINER_DLL_insert (mq->dnh_head, mq->dnh_tail, dnh);
1120   return dnh;
1121 }
1122
1123
1124 /**
1125  * Cancel registration from #GNUNET_MQ_destroy_notify().
1126  *
1127  * @param dnh handle for registration to cancel
1128  */
1129 void
1130 GNUNET_MQ_destroy_notify_cancel (
1131   struct GNUNET_MQ_DestroyNotificationHandle *dnh)
1132 {
1133   struct GNUNET_MQ_Handle *mq = dnh->mq;
1134
1135   GNUNET_CONTAINER_DLL_remove (mq->dnh_head, mq->dnh_tail, dnh);
1136   GNUNET_free (dnh);
1137 }
1138
1139
1140 /**
1141  * Insert @a env into the envelope DLL starting at @a env_head
1142  * Note that @a env must not be in any MQ while this function
1143  * is used with DLLs defined outside of the MQ module.  This
1144  * is just in case some application needs to also manage a
1145  * FIFO of envelopes independent of MQ itself and wants to
1146  * re-use the pointers internal to @a env.  Use with caution.
1147  *
1148  * @param[in|out] env_head of envelope DLL
1149  * @param[in|out] env_tail tail of envelope DLL
1150  * @param[in|out] env element to insert at the tail
1151  */
1152 void
1153 GNUNET_MQ_dll_insert_head (struct GNUNET_MQ_Envelope **env_head,
1154                            struct GNUNET_MQ_Envelope **env_tail,
1155                            struct GNUNET_MQ_Envelope *env)
1156 {
1157   GNUNET_CONTAINER_DLL_insert (*env_head, *env_tail, env);
1158 }
1159
1160
1161 /**
1162  * Insert @a env into the envelope DLL starting at @a env_head
1163  * Note that @a env must not be in any MQ while this function
1164  * is used with DLLs defined outside of the MQ module.  This
1165  * is just in case some application needs to also manage a
1166  * FIFO of envelopes independent of MQ itself and wants to
1167  * re-use the pointers internal to @a env.  Use with caution.
1168  *
1169  * @param[in|out] env_head of envelope DLL
1170  * @param[in|out] env_tail tail of envelope DLL
1171  * @param[in|out] env element to insert at the tail
1172  */
1173 void
1174 GNUNET_MQ_dll_insert_tail (struct GNUNET_MQ_Envelope **env_head,
1175                            struct GNUNET_MQ_Envelope **env_tail,
1176                            struct GNUNET_MQ_Envelope *env)
1177 {
1178   GNUNET_CONTAINER_DLL_insert_tail (*env_head, *env_tail, env);
1179 }
1180
1181
1182 /**
1183  * Remove @a env from the envelope DLL starting at @a env_head.
1184  * Note that @a env must not be in any MQ while this function
1185  * is used with DLLs defined outside of the MQ module. This
1186  * is just in case some application needs to also manage a
1187  * FIFO of envelopes independent of MQ itself and wants to
1188  * re-use the pointers internal to @a env.  Use with caution.
1189  *
1190  * @param[in|out] env_head of envelope DLL
1191  * @param[in|out] env_tail tail of envelope DLL
1192  * @param[in|out] env element to remove from the DLL
1193  */
1194 void
1195 GNUNET_MQ_dll_remove (struct GNUNET_MQ_Envelope **env_head,
1196                       struct GNUNET_MQ_Envelope **env_tail,
1197                       struct GNUNET_MQ_Envelope *env)
1198 {
1199   GNUNET_CONTAINER_DLL_remove (*env_head, *env_tail, env);
1200 }
1201
1202
1203 /**
1204  * Copy an array of handlers.
1205  *
1206  * Useful if the array has been delared in local memory and needs to be
1207  * persisted for future use.
1208  *
1209  * @param handlers Array of handlers to be copied. Can be NULL (nothing done).
1210  * @return A newly allocated array of handlers.
1211  *         Needs to be freed with #GNUNET_free.
1212  */
1213 struct GNUNET_MQ_MessageHandler *
1214 GNUNET_MQ_copy_handlers (const struct GNUNET_MQ_MessageHandler *handlers)
1215 {
1216   struct GNUNET_MQ_MessageHandler *copy;
1217   unsigned int count;
1218
1219   if (NULL == handlers)
1220     return NULL;
1221
1222   count = GNUNET_MQ_count_handlers (handlers);
1223   copy = GNUNET_new_array (count + 1, struct GNUNET_MQ_MessageHandler);
1224   GNUNET_memcpy (copy,
1225                  handlers,
1226                  count * sizeof(struct GNUNET_MQ_MessageHandler));
1227   return copy;
1228 }
1229
1230
1231 /**
1232  * Copy an array of handlers, appending AGPL handler.
1233  *
1234  * Useful if the array has been delared in local memory and needs to be
1235  * persisted for future use.
1236  *
1237  * @param handlers Array of handlers to be copied. Can be NULL (nothing done).
1238  * @param agpl_handler function to call for AGPL handling
1239  * @param agpl_cls closure for @a agpl_handler
1240  * @return A newly allocated array of handlers.
1241  *         Needs to be freed with #GNUNET_free.
1242  */
1243 struct GNUNET_MQ_MessageHandler *
1244 GNUNET_MQ_copy_handlers2 (const struct GNUNET_MQ_MessageHandler *handlers,
1245                           GNUNET_MQ_MessageCallback agpl_handler,
1246                           void *agpl_cls)
1247 {
1248   struct GNUNET_MQ_MessageHandler *copy;
1249   unsigned int count;
1250
1251   if (NULL == handlers)
1252     return NULL;
1253   count = GNUNET_MQ_count_handlers (handlers);
1254   copy = GNUNET_new_array (count + 2, struct GNUNET_MQ_MessageHandler);
1255   GNUNET_memcpy (copy,
1256                  handlers,
1257                  count * sizeof(struct GNUNET_MQ_MessageHandler));
1258   copy[count].mv = NULL;
1259   copy[count].cb = agpl_handler;
1260   copy[count].cls = agpl_cls;
1261   copy[count].type = GNUNET_MESSAGE_TYPE_REQUEST_AGPL;
1262   copy[count].expected_size = sizeof(struct GNUNET_MessageHeader);
1263   return copy;
1264 }
1265
1266
1267 /**
1268  * Count the handlers in a handler array.
1269  *
1270  * @param handlers Array of handlers to be counted.
1271  * @return The number of handlers in the array.
1272  */
1273 unsigned int
1274 GNUNET_MQ_count_handlers (const struct GNUNET_MQ_MessageHandler *handlers)
1275 {
1276   unsigned int i;
1277
1278   if (NULL == handlers)
1279     return 0;
1280
1281   for (i = 0; NULL != handlers[i].cb; i++)
1282     ;
1283
1284   return i;
1285 }
1286
1287
1288 /**
1289  * Convert an `enum GNUNET_MQ_PreferenceType` to a string
1290  *
1291  * @param type the preference type
1292  * @return a string or NULL if invalid
1293  */
1294 const char *
1295 GNUNET_MQ_preference_to_string (enum GNUNET_MQ_PreferenceKind type)
1296 {
1297   switch (type)
1298   {
1299   case GNUNET_MQ_PREFERENCE_NONE:
1300     return "NONE";
1301
1302   case GNUNET_MQ_PREFERENCE_BANDWIDTH:
1303     return "BANDWIDTH";
1304
1305   case GNUNET_MQ_PREFERENCE_LATENCY:
1306     return "LATENCY";
1307
1308   case GNUNET_MQ_PREFERENCE_RELIABILITY:
1309     return "RELIABILITY";
1310   }
1311   ;
1312   return NULL;
1313 }
1314
1315
1316 /* end of mq.c */