-remove debug message
[oweals/gnunet.git] / src / util / mq.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2012-2019 GNUnet e.V.
4
5      GNUnet is free software: you can redistribute it and/or modify it
6      under the terms of the GNU Affero General Public License as published
7      by the Free Software Foundation, either version 3 of the License,
8      or (at your option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      Affero General Public License for more details.
14
15      You should have received a copy of the GNU Affero General Public License
16      along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18      SPDX-License-Identifier: AGPL3.0-or-later
19  */
20
21 /**
22  * @author Florian Dold
23  * @file util/mq.c
24  * @brief general purpose request queue
25  */
26 #include "platform.h"
27 #include "gnunet_util_lib.h"
28
29 #define LOG(kind, ...) GNUNET_log_from (kind, "util-mq", __VA_ARGS__)
30
31
32 struct GNUNET_MQ_Envelope
33 {
34   /**
35    * Messages are stored in a linked list.
36    * Each queue has its own list of envelopes.
37    */
38   struct GNUNET_MQ_Envelope *next;
39
40   /**
41    * Messages are stored in a linked list
42    * Each queue has its own list of envelopes.
43    */
44   struct GNUNET_MQ_Envelope *prev;
45
46   /**
47    * Actual allocated message header.
48    * The GNUNET_MQ_Envelope header is allocated at
49    * the end of the message.
50    */
51   struct GNUNET_MessageHeader *mh;
52
53   /**
54    * Queue the message is queued in, NULL if message is not queued.
55    */
56   struct GNUNET_MQ_Handle *parent_queue;
57
58   /**
59    * Called after the message was sent irrevocably.
60    */
61   GNUNET_SCHEDULER_TaskCallback sent_cb;
62
63   /**
64    * Closure for @e send_cb
65    */
66   void *sent_cls;
67
68   /**
69    * Flags that were set for this envelope by
70    * #GNUNET_MQ_env_set_options().   Only valid if
71    * @e have_custom_options is set.
72    */
73   enum GNUNET_MQ_PriorityPreferences priority;
74
75   /**
76    * Did the application call #GNUNET_MQ_env_set_options()?
77    */
78   int have_custom_options;
79 };
80
81
82 /**
83  * Handle to a message queue.
84  */
85 struct GNUNET_MQ_Handle
86 {
87   /**
88    * Handlers array, or NULL if the queue should not receive messages
89    */
90   struct GNUNET_MQ_MessageHandler *handlers;
91
92   /**
93    * Actual implementation of message sending,
94    * called when a message is added
95    */
96   GNUNET_MQ_SendImpl send_impl;
97
98   /**
99    * Implementation-dependent queue destruction function
100    */
101   GNUNET_MQ_DestroyImpl destroy_impl;
102
103   /**
104    * Implementation-dependent send cancel function
105    */
106   GNUNET_MQ_CancelImpl cancel_impl;
107
108   /**
109    * Implementation-specific state
110    */
111   void *impl_state;
112
113   /**
114    * Callback will be called when an error occurs.
115    */
116   GNUNET_MQ_ErrorHandler error_handler;
117
118   /**
119    * Closure for the error handler.
120    */
121   void *error_handler_cls;
122
123   /**
124    * Task to asynchronously run #impl_send_continue().
125    */
126   struct GNUNET_SCHEDULER_Task *send_task;
127
128   /**
129    * Linked list of messages pending to be sent
130    */
131   struct GNUNET_MQ_Envelope *envelope_head;
132
133   /**
134    * Linked list of messages pending to be sent
135    */
136   struct GNUNET_MQ_Envelope *envelope_tail;
137
138   /**
139    * Message that is currently scheduled to be
140    * sent. Not the head of the message queue, as the implementation
141    * needs to know if sending has been already scheduled or not.
142    */
143   struct GNUNET_MQ_Envelope *current_envelope;
144
145   /**
146    * Map of associations, lazily allocated
147    */
148   struct GNUNET_CONTAINER_MultiHashMap32 *assoc_map;
149
150   /**
151    * Functions to call on queue destruction; kept in a DLL.
152    */
153   struct GNUNET_MQ_DestroyNotificationHandle *dnh_head;
154
155   /**
156    * Functions to call on queue destruction; kept in a DLL.
157    */
158   struct GNUNET_MQ_DestroyNotificationHandle *dnh_tail;
159
160   /**
161    * Flags that were set for this queue by
162    * #GNUNET_MQ_set_options().   Default is 0.
163    */
164   enum GNUNET_MQ_PriorityPreferences priority;
165
166   /**
167    * Next id that should be used for the @e assoc_map,
168    * initialized lazily to a random value together with
169    * @e assoc_map
170    */
171   uint32_t assoc_id;
172
173   /**
174    * Number of entries we have in the envelope-DLL.
175    */
176   unsigned int queue_length;
177
178   /**
179    * #GNUNET_YES if GNUNET_MQ_impl_evacuate was called.
180    * FIXME: is this dead?
181    */
182   int evacuate_called;
183
184   /**
185    * #GNUNET_YES if GNUNET_MQ_impl_send_in_flight() was called.
186    */
187   int in_flight;
188 };
189
190
191 /**
192  * Call the message message handler that was registered
193  * for the type of the given message in the given message queue.
194  *
195  * This function is indended to be used for the implementation
196  * of message queues.
197  *
198  * @param mq message queue with the handlers
199  * @param mh message to dispatch
200  */
201 void
202 GNUNET_MQ_inject_message (struct GNUNET_MQ_Handle *mq,
203                           const struct GNUNET_MessageHeader *mh)
204 {
205   int ret;
206
207   ret = GNUNET_MQ_handle_message (mq->handlers, mh);
208   if (GNUNET_SYSERR == ret)
209   {
210     GNUNET_MQ_inject_error (mq, GNUNET_MQ_ERROR_MALFORMED);
211     return;
212   }
213 }
214
215
216 /**
217  * Call the message message handler that was registered
218  * for the type of the given message in the given @a handlers list.
219  *
220  * This function is indended to be used for the implementation
221  * of message queues.
222  *
223  * @param handlers a set of handlers
224  * @param mh message to dispatch
225  * @return #GNUNET_OK on success, #GNUNET_NO if no handler matched,
226  *         #GNUNET_SYSERR if message was rejected by check function
227  */
228 int
229 GNUNET_MQ_handle_message (const struct GNUNET_MQ_MessageHandler *handlers,
230                           const struct GNUNET_MessageHeader *mh)
231 {
232   const struct GNUNET_MQ_MessageHandler *handler;
233   int handled = GNUNET_NO;
234   uint16_t msize = ntohs (mh->size);
235   uint16_t mtype = ntohs (mh->type);
236
237   LOG (GNUNET_ERROR_TYPE_DEBUG,
238        "Received message of type %u and size %u\n",
239        mtype,
240        msize);
241
242   if (NULL == handlers)
243     goto done;
244   for (handler = handlers; NULL != handler->cb; handler++)
245   {
246     if (handler->type == mtype)
247     {
248       handled = GNUNET_YES;
249       if ((handler->expected_size > msize) ||
250           ((handler->expected_size != msize) && (NULL == handler->mv)))
251       {
252         /* Too small, or not an exact size and
253            no 'mv' handler to check rest */
254         LOG (GNUNET_ERROR_TYPE_ERROR,
255              "Received malformed message of type %u\n",
256              (unsigned int) handler->type);
257         return GNUNET_SYSERR;
258       }
259       if ((NULL == handler->mv) ||
260           (GNUNET_OK == handler->mv (handler->cls, mh)))
261       {
262         /* message well-formed, pass to handler */
263         handler->cb (handler->cls, mh);
264       }
265       else
266       {
267         /* Message rejected by check routine */
268         LOG (GNUNET_ERROR_TYPE_ERROR,
269              "Received malformed message of type %u\n",
270              (unsigned int) handler->type);
271         return GNUNET_SYSERR;
272       }
273       break;
274     }
275   }
276 done:
277   if (GNUNET_NO == handled)
278   {
279     LOG (GNUNET_ERROR_TYPE_INFO,
280          "No handler for message of type %u and size %u\n",
281          mtype,
282          msize);
283     return GNUNET_NO;
284   }
285   return GNUNET_OK;
286 }
287
288
289 /**
290  * Call the error handler of a message queue with the given
291  * error code.  If there is no error handler, log a warning.
292  *
293  * This function is intended to be used by the implementation
294  * of message queues.
295  *
296  * @param mq message queue
297  * @param error the error type
298  */
299 void
300 GNUNET_MQ_inject_error (struct GNUNET_MQ_Handle *mq,
301                         enum GNUNET_MQ_Error error)
302 {
303   if (NULL == mq->error_handler)
304   {
305     LOG (GNUNET_ERROR_TYPE_WARNING,
306          "Got error %d, but no handler installed\n",
307          (int) error);
308     return;
309   }
310   mq->error_handler (mq->error_handler_cls,
311                      error);
312 }
313
314
315 /**
316  * Discard the message queue message, free all
317  * allocated resources. Must be called in the event
318  * that a message is created but should not actually be sent.
319  *
320  * @param mqm the message to discard
321  */
322 void
323 GNUNET_MQ_discard (struct GNUNET_MQ_Envelope *ev)
324 {
325   GNUNET_assert (NULL == ev->parent_queue);
326   GNUNET_free (ev);
327 }
328
329
330 /**
331  * Obtain the current length of the message queue.
332  *
333  * @param mq queue to inspect
334  * @return number of queued, non-transmitted messages
335  */
336 unsigned int
337 GNUNET_MQ_get_length (struct GNUNET_MQ_Handle *mq)
338 {
339   if (GNUNET_YES != mq->in_flight)
340   {
341     return mq->queue_length;
342   }
343   return mq->queue_length - 1;
344 }
345
346
347 /**
348  * Send a message with the given message queue.
349  * May only be called once per message.
350  *
351  * @param mq message queue
352  * @param ev the envelope with the message to send.
353  */
354 void
355 GNUNET_MQ_send (struct GNUNET_MQ_Handle *mq,
356                 struct GNUNET_MQ_Envelope *ev)
357 {
358   GNUNET_assert (NULL != mq);
359   GNUNET_assert (NULL == ev->parent_queue);
360
361   mq->queue_length++;
362   if (mq->queue_length >= 10000)
363   {
364     /* This would seem like a bug... */
365     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
366                 "MQ with %u entries extended by message of type %u (FC broken?)\n",
367                 (unsigned int) mq->queue_length,
368                 (unsigned int) ntohs (ev->mh->type));
369   }
370   ev->parent_queue = mq;
371   /* is the implementation busy? queue it! */
372   if ((NULL != mq->current_envelope) || (NULL != mq->send_task))
373   {
374     GNUNET_CONTAINER_DLL_insert_tail (mq->envelope_head,
375                                       mq->envelope_tail,
376                                       ev);
377     return;
378   }
379   GNUNET_assert (NULL == mq->envelope_head);
380   mq->current_envelope = ev;
381
382   LOG (GNUNET_ERROR_TYPE_DEBUG,
383        "sending message of type %u, queue empty (MQ: %p)\n",
384        ntohs (ev->mh->type),
385        mq);
386
387   mq->send_impl (mq,
388                  ev->mh,
389                  mq->impl_state);
390 }
391
392
393 /**
394  * Remove the first envelope that has not yet been sent from the message
395  * queue and return it.
396  *
397  * @param mq queue to remove envelope from
398  * @return NULL if queue is empty (or has no envelope that is not under transmission)
399  */
400 struct GNUNET_MQ_Envelope *
401 GNUNET_MQ_unsent_head (struct GNUNET_MQ_Handle *mq)
402 {
403   struct GNUNET_MQ_Envelope *env;
404
405   env = mq->envelope_head;
406   GNUNET_CONTAINER_DLL_remove (mq->envelope_head, mq->envelope_tail, env);
407   mq->queue_length--;
408   env->parent_queue = NULL;
409   return env;
410 }
411
412
413 /**
414  * Function to copy an envelope.  The envelope must not yet
415  * be in any queue or have any options or callbacks set.
416  *
417  * @param env envelope to copy
418  * @return copy of @a env
419  */
420 struct GNUNET_MQ_Envelope *
421 GNUNET_MQ_env_copy (struct GNUNET_MQ_Envelope *env)
422 {
423   GNUNET_assert (NULL == env->next);
424   GNUNET_assert (NULL == env->parent_queue);
425   GNUNET_assert (NULL == env->sent_cb);
426   GNUNET_assert (GNUNET_NO == env->have_custom_options);
427   return GNUNET_MQ_msg_copy (env->mh);
428 }
429
430
431 /**
432  * Send a copy of a message with the given message queue.
433  * Can be called repeatedly on the same envelope.
434  *
435  * @param mq message queue
436  * @param ev the envelope with the message to send.
437  */
438 void
439 GNUNET_MQ_send_copy (struct GNUNET_MQ_Handle *mq,
440                      const struct GNUNET_MQ_Envelope *ev)
441 {
442   struct GNUNET_MQ_Envelope *env;
443   uint16_t msize;
444
445   msize = ntohs (ev->mh->size);
446   env = GNUNET_malloc (sizeof(struct GNUNET_MQ_Envelope) + msize);
447   env->mh = (struct GNUNET_MessageHeader *) &env[1];
448   env->sent_cb = ev->sent_cb;
449   env->sent_cls = ev->sent_cls;
450   GNUNET_memcpy (&env[1], ev->mh, msize);
451   GNUNET_MQ_send (mq, env);
452 }
453
454
455 /**
456  * Task run to call the send implementation for the next queued
457  * message, if any.  Only useful for implementing message queues,
458  * results in undefined behavior if not used carefully.
459  *
460  * @param cls message queue to send the next message with
461  */
462 static void
463 impl_send_continue (void *cls)
464 {
465   struct GNUNET_MQ_Handle *mq = cls;
466
467   mq->send_task = NULL;
468   /* call is only valid if we're actually currently sending
469    * a message */
470   if (NULL == mq->envelope_head)
471     return;
472   mq->current_envelope = mq->envelope_head;
473   GNUNET_CONTAINER_DLL_remove (mq->envelope_head,
474                                mq->envelope_tail,
475                                mq->current_envelope);
476
477   LOG (GNUNET_ERROR_TYPE_DEBUG,
478        "sending message of type %u from queue\n",
479        ntohs (mq->current_envelope->mh->type));
480
481   mq->send_impl (mq,
482                  mq->current_envelope->mh,
483                  mq->impl_state);
484 }
485
486
487 /**
488  * Call the send implementation for the next queued message, if any.
489  * Only useful for implementing message queues, results in undefined
490  * behavior if not used carefully.
491  *
492  * @param mq message queue to send the next message with
493  */
494 void
495 GNUNET_MQ_impl_send_continue (struct GNUNET_MQ_Handle *mq)
496 {
497   struct GNUNET_MQ_Envelope *current_envelope;
498   GNUNET_SCHEDULER_TaskCallback cb;
499
500   GNUNET_assert (0 < mq->queue_length);
501   mq->queue_length--;
502   mq->in_flight = GNUNET_NO;
503   current_envelope = mq->current_envelope;
504   current_envelope->parent_queue = NULL;
505   mq->current_envelope = NULL;
506   GNUNET_assert (NULL == mq->send_task);
507   mq->send_task = GNUNET_SCHEDULER_add_now (&impl_send_continue, mq);
508   if (NULL != (cb = current_envelope->sent_cb))
509   {
510     current_envelope->sent_cb = NULL;
511     cb (current_envelope->sent_cls);
512   }
513   GNUNET_free (current_envelope);
514 }
515
516
517 /**
518  * Call the send notification for the current message, but do not
519  * try to send the next message until #GNUNET_MQ_impl_send_continue
520  * is called.
521  *
522  * Only useful for implementing message queues, results in undefined
523  * behavior if not used carefully.
524  *
525  * @param mq message queue to send the next message with
526  */
527 void
528 GNUNET_MQ_impl_send_in_flight (struct GNUNET_MQ_Handle *mq)
529 {
530   struct GNUNET_MQ_Envelope *current_envelope;
531   GNUNET_SCHEDULER_TaskCallback cb;
532
533   mq->in_flight = GNUNET_YES;
534   /* call is only valid if we're actually currently sending
535    * a message */
536   current_envelope = mq->current_envelope;
537   GNUNET_assert (NULL != current_envelope);
538   /* can't call cancel from now on anymore */
539   current_envelope->parent_queue = NULL;
540   if (NULL != (cb = current_envelope->sent_cb))
541   {
542     current_envelope->sent_cb = NULL;
543     cb (current_envelope->sent_cls);
544   }
545 }
546
547
548 /**
549  * Create a message queue for the specified handlers.
550  *
551  * @param send function the implements sending messages
552  * @param destroy function that implements destroying the queue
553  * @param cancel function that implements canceling a message
554  * @param impl_state for the queue, passed to 'send' and 'destroy'
555  * @param handlers array of message handlers
556  * @param error_handler handler for read and write errors
557  * @param error_handler_cls closure for @a error_handler
558  * @return a new message queue
559  */
560 struct GNUNET_MQ_Handle *
561 GNUNET_MQ_queue_for_callbacks (GNUNET_MQ_SendImpl send,
562                                GNUNET_MQ_DestroyImpl destroy,
563                                GNUNET_MQ_CancelImpl cancel,
564                                void *impl_state,
565                                const struct GNUNET_MQ_MessageHandler *handlers,
566                                GNUNET_MQ_ErrorHandler error_handler,
567                                void *error_handler_cls)
568 {
569   struct GNUNET_MQ_Handle *mq;
570
571   mq = GNUNET_new (struct GNUNET_MQ_Handle);
572   mq->send_impl = send;
573   mq->destroy_impl = destroy;
574   mq->cancel_impl = cancel;
575   mq->handlers = GNUNET_MQ_copy_handlers (handlers);
576   mq->error_handler = error_handler;
577   mq->error_handler_cls = error_handler_cls;
578   mq->impl_state = impl_state;
579
580   return mq;
581 }
582
583
584 /**
585  * Change the closure argument in all of the `handlers` of the
586  * @a mq.
587  *
588  * @param mq to modify
589  * @param handlers_cls new closure to use
590  */
591 void
592 GNUNET_MQ_set_handlers_closure (struct GNUNET_MQ_Handle *mq, void *handlers_cls)
593 {
594   if (NULL == mq->handlers)
595     return;
596   for (unsigned int i = 0; NULL != mq->handlers[i].cb; i++)
597     mq->handlers[i].cls = handlers_cls;
598 }
599
600
601 /**
602  * Get the message that should currently be sent.
603  * Fails if there is no current message.
604  * Only useful for implementing message queues,
605  * results in undefined behavior if not used carefully.
606  *
607  * @param mq message queue with the current message
608  * @return message to send, never NULL
609  */
610 const struct GNUNET_MessageHeader *
611 GNUNET_MQ_impl_current (struct GNUNET_MQ_Handle *mq)
612 {
613   GNUNET_assert (NULL != mq->current_envelope);
614   GNUNET_assert (NULL != mq->current_envelope->mh);
615   return mq->current_envelope->mh;
616 }
617
618
619 /**
620  * Get the implementation state associated with the
621  * message queue.
622  *
623  * While the GNUNET_MQ_Impl* callbacks receive the
624  * implementation state, continuations that are scheduled
625  * by the implementation function often only have one closure
626  * argument, with this function it is possible to get at the
627  * implementation state when only passing the GNUNET_MQ_Handle
628  * as closure.
629  *
630  * @param mq message queue with the current message
631  * @return message to send, never NULL
632  */
633 void *
634 GNUNET_MQ_impl_state (struct GNUNET_MQ_Handle *mq)
635 {
636   return mq->impl_state;
637 }
638
639
640 struct GNUNET_MQ_Envelope *
641 GNUNET_MQ_msg_ (struct GNUNET_MessageHeader **mhp, uint16_t size, uint16_t type)
642 {
643   struct GNUNET_MQ_Envelope *ev;
644
645   ev = GNUNET_malloc (size + sizeof(struct GNUNET_MQ_Envelope));
646   ev->mh = (struct GNUNET_MessageHeader *) &ev[1];
647   ev->mh->size = htons (size);
648   ev->mh->type = htons (type);
649   if (NULL != mhp)
650     *mhp = ev->mh;
651   return ev;
652 }
653
654
655 /**
656  * Create a new envelope by copying an existing message.
657  *
658  * @param hdr header of the message to copy
659  * @return envelope containing @a hdr
660  */
661 struct GNUNET_MQ_Envelope *
662 GNUNET_MQ_msg_copy (const struct GNUNET_MessageHeader *hdr)
663 {
664   struct GNUNET_MQ_Envelope *mqm;
665   uint16_t size = ntohs (hdr->size);
666
667   mqm = GNUNET_malloc (sizeof(*mqm) + size);
668   mqm->mh = (struct GNUNET_MessageHeader *) &mqm[1];
669   GNUNET_memcpy (mqm->mh, hdr, size);
670   return mqm;
671 }
672
673
674 /**
675  * Implementation of the #GNUNET_MQ_msg_nested_mh macro.
676  *
677  * @param mhp pointer to the message header pointer that will be changed to allocate at
678  *        the newly allocated space for the message.
679  * @param base_size size of the data before the nested message
680  * @param type type of the message in the envelope
681  * @param nested_mh the message to append to the message after base_size
682  */
683 struct GNUNET_MQ_Envelope *
684 GNUNET_MQ_msg_nested_mh_ (struct GNUNET_MessageHeader **mhp,
685                           uint16_t base_size,
686                           uint16_t type,
687                           const struct GNUNET_MessageHeader *nested_mh)
688 {
689   struct GNUNET_MQ_Envelope *mqm;
690   uint16_t size;
691
692   if (NULL == nested_mh)
693     return GNUNET_MQ_msg_ (mhp, base_size, type);
694
695   size = base_size + ntohs (nested_mh->size);
696
697   /* check for uint16_t overflow */
698   if (size < base_size)
699     return NULL;
700
701   mqm = GNUNET_MQ_msg_ (mhp, size, type);
702   GNUNET_memcpy ((char *) mqm->mh + base_size,
703                  nested_mh,
704                  ntohs (nested_mh->size));
705
706   return mqm;
707 }
708
709
710 /**
711  * Associate the assoc_data in mq with a unique request id.
712  *
713  * @param mq message queue, id will be unique for the queue
714  * @param assoc_data to associate
715  */
716 uint32_t
717 GNUNET_MQ_assoc_add (struct GNUNET_MQ_Handle *mq, void *assoc_data)
718 {
719   uint32_t id;
720
721   if (NULL == mq->assoc_map)
722   {
723     mq->assoc_map = GNUNET_CONTAINER_multihashmap32_create (8);
724     mq->assoc_id = 1;
725   }
726   id = mq->assoc_id++;
727   GNUNET_assert (GNUNET_OK ==
728                  GNUNET_CONTAINER_multihashmap32_put (
729                    mq->assoc_map,
730                    id,
731                    assoc_data,
732                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
733   return id;
734 }
735
736
737 /**
738  * Get the data associated with a @a request_id in a queue
739  *
740  * @param mq the message queue with the association
741  * @param request_id the request id we are interested in
742  * @return the associated data
743  */
744 void *
745 GNUNET_MQ_assoc_get (struct GNUNET_MQ_Handle *mq, uint32_t request_id)
746 {
747   if (NULL == mq->assoc_map)
748     return NULL;
749   return GNUNET_CONTAINER_multihashmap32_get (mq->assoc_map, request_id);
750 }
751
752
753 /**
754  * Remove the association for a @a request_id
755  *
756  * @param mq the message queue with the association
757  * @param request_id the request id we want to remove
758  * @return the associated data
759  */
760 void *
761 GNUNET_MQ_assoc_remove (struct GNUNET_MQ_Handle *mq, uint32_t request_id)
762 {
763   void *val;
764
765   if (NULL == mq->assoc_map)
766     return NULL;
767   val = GNUNET_CONTAINER_multihashmap32_get (mq->assoc_map, request_id);
768   GNUNET_CONTAINER_multihashmap32_remove_all (mq->assoc_map, request_id);
769   return val;
770 }
771
772
773 /**
774  * Call a callback once the envelope has been sent, that is,
775  * sending it can not be canceled anymore.
776  * There can be only one notify sent callback per envelope.
777  *
778  * @param ev message to call the notify callback for
779  * @param cb the notify callback
780  * @param cb_cls closure for the callback
781  */
782 void
783 GNUNET_MQ_notify_sent (struct GNUNET_MQ_Envelope *ev,
784                        GNUNET_SCHEDULER_TaskCallback cb,
785                        void *cb_cls)
786 {
787   /* allow setting *OR* clearing callback */
788   GNUNET_assert ((NULL == ev->sent_cb) || (NULL == cb));
789   ev->sent_cb = cb;
790   ev->sent_cls = cb_cls;
791 }
792
793
794 /**
795  * Handle we return for callbacks registered to be
796  * notified when #GNUNET_MQ_destroy() is called on a queue.
797  */
798 struct GNUNET_MQ_DestroyNotificationHandle
799 {
800   /**
801    * Kept in a DLL.
802    */
803   struct GNUNET_MQ_DestroyNotificationHandle *prev;
804
805   /**
806    * Kept in a DLL.
807    */
808   struct GNUNET_MQ_DestroyNotificationHandle *next;
809
810   /**
811    * Queue to notify about.
812    */
813   struct GNUNET_MQ_Handle *mq;
814
815   /**
816    * Function to call.
817    */
818   GNUNET_SCHEDULER_TaskCallback cb;
819
820   /**
821    * Closure for @e cb.
822    */
823   void *cb_cls;
824 };
825
826
827 /**
828  * Destroy the message queue.
829  *
830  * @param mq message queue to destroy
831  */
832 void
833 GNUNET_MQ_destroy (struct GNUNET_MQ_Handle *mq)
834 {
835   struct GNUNET_MQ_DestroyNotificationHandle *dnh;
836
837   if (NULL != mq->destroy_impl)
838   {
839     mq->destroy_impl (mq, mq->impl_state);
840   }
841   if (NULL != mq->send_task)
842   {
843     GNUNET_SCHEDULER_cancel (mq->send_task);
844     mq->send_task = NULL;
845   }
846   while (NULL != mq->envelope_head)
847   {
848     struct GNUNET_MQ_Envelope *ev;
849
850     ev = mq->envelope_head;
851     ev->parent_queue = NULL;
852     GNUNET_CONTAINER_DLL_remove (mq->envelope_head, mq->envelope_tail, ev);
853     GNUNET_assert (0 < mq->queue_length);
854     mq->queue_length--;
855     LOG (GNUNET_ERROR_TYPE_DEBUG,
856          "MQ destroy drops message of type %u\n",
857          ntohs (ev->mh->type));
858     GNUNET_MQ_discard (ev);
859   }
860   if (NULL != mq->current_envelope)
861   {
862     /* we can only discard envelopes that
863      * are not queued! */
864     mq->current_envelope->parent_queue = NULL;
865     LOG (GNUNET_ERROR_TYPE_DEBUG,
866          "MQ destroy drops current message of type %u\n",
867          ntohs (mq->current_envelope->mh->type));
868     GNUNET_MQ_discard (mq->current_envelope);
869     mq->current_envelope = NULL;
870     GNUNET_assert (0 < mq->queue_length);
871     mq->queue_length--;
872   }
873   GNUNET_assert (0 == mq->queue_length);
874   while (NULL != (dnh = mq->dnh_head))
875   {
876     dnh->cb (dnh->cb_cls);
877     GNUNET_MQ_destroy_notify_cancel (dnh);
878   }
879   if (NULL != mq->assoc_map)
880   {
881     GNUNET_CONTAINER_multihashmap32_destroy (mq->assoc_map);
882     mq->assoc_map = NULL;
883   }
884   GNUNET_free_non_null (mq->handlers);
885   GNUNET_free (mq);
886 }
887
888
889 const struct GNUNET_MessageHeader *
890 GNUNET_MQ_extract_nested_mh_ (const struct GNUNET_MessageHeader *mh,
891                               uint16_t base_size)
892 {
893   uint16_t whole_size;
894   uint16_t nested_size;
895   const struct GNUNET_MessageHeader *nested_msg;
896
897   whole_size = ntohs (mh->size);
898   GNUNET_assert (whole_size >= base_size);
899   nested_size = whole_size - base_size;
900   if (0 == nested_size)
901     return NULL;
902   if (nested_size < sizeof(struct GNUNET_MessageHeader))
903   {
904     GNUNET_break_op (0);
905     return NULL;
906   }
907   nested_msg = (const struct GNUNET_MessageHeader *) ((char *) mh + base_size);
908   if (ntohs (nested_msg->size) != nested_size)
909   {
910     GNUNET_break_op (0);
911     return NULL;
912   }
913   return nested_msg;
914 }
915
916
917 /**
918  * Cancel sending the message. Message must have been sent with
919  * #GNUNET_MQ_send before.  May not be called after the notify sent
920  * callback has been called
921  *
922  * @param ev queued envelope to cancel
923  */
924 void
925 GNUNET_MQ_send_cancel (struct GNUNET_MQ_Envelope *ev)
926 {
927   struct GNUNET_MQ_Handle *mq = ev->parent_queue;
928
929   GNUNET_assert (NULL != mq);
930   GNUNET_assert (NULL != mq->cancel_impl);
931
932   mq->evacuate_called = GNUNET_NO;
933
934   if (mq->current_envelope == ev)
935   {
936     /* complex case, we already started with transmitting
937        the message using the callbacks. */
938     GNUNET_assert (GNUNET_NO == mq->in_flight);
939     GNUNET_assert (0 < mq->queue_length);
940     mq->queue_length--;
941     mq->cancel_impl (mq,
942                      mq->impl_state);
943     /* continue sending the next message, if any */
944     mq->current_envelope = mq->envelope_head;
945     if (NULL != mq->current_envelope)
946     {
947       GNUNET_CONTAINER_DLL_remove (mq->envelope_head,
948                                    mq->envelope_tail,
949                                    mq->current_envelope);
950
951       LOG (GNUNET_ERROR_TYPE_DEBUG,
952            "sending canceled message of type %u queue\n",
953            ntohs (ev->mh->type));
954       mq->send_impl (mq,
955                      mq->current_envelope->mh,
956                      mq->impl_state);
957     }
958   }
959   else
960   {
961     /* simple case, message is still waiting in the queue */
962     GNUNET_CONTAINER_DLL_remove (mq->envelope_head,
963                                  mq->envelope_tail,
964                                  ev);
965     GNUNET_assert (0 < mq->queue_length);
966     mq->queue_length--;
967   }
968
969   if (GNUNET_YES != mq->evacuate_called)
970   {
971     ev->parent_queue = NULL;
972     ev->mh = NULL;
973     /* also frees ev */
974     GNUNET_free (ev);
975   }
976 }
977
978
979 /**
980  * Function to obtain the current envelope
981  * from within #GNUNET_MQ_SendImpl implementations.
982  *
983  * @param mq message queue to interrogate
984  * @return the current envelope
985  */
986 struct GNUNET_MQ_Envelope *
987 GNUNET_MQ_get_current_envelope (struct GNUNET_MQ_Handle *mq)
988 {
989   return mq->current_envelope;
990 }
991
992
993 /**
994  * Function to obtain the last envelope in the queue.
995  *
996  * @param mq message queue to interrogate
997  * @return the last envelope in the queue
998  */
999 struct GNUNET_MQ_Envelope *
1000 GNUNET_MQ_get_last_envelope (struct GNUNET_MQ_Handle *mq)
1001 {
1002   if (NULL != mq->envelope_tail)
1003     return mq->envelope_tail;
1004
1005   return mq->current_envelope;
1006 }
1007
1008
1009 /**
1010  * Set application-specific preferences for this envelope.
1011  * Overrides the options set for the queue with
1012  * #GNUNET_MQ_set_options() for this message only.
1013  *
1014  * @param env message to set options for
1015  * @param pp priorities and preferences to apply
1016  */
1017 void
1018 GNUNET_MQ_env_set_options (struct GNUNET_MQ_Envelope *env,
1019                            enum GNUNET_MQ_PriorityPreferences pp)
1020 {
1021   env->priority = pp;
1022   env->have_custom_options = GNUNET_YES;
1023 }
1024
1025
1026 /**
1027  * Get application-specific options for this envelope.
1028  *
1029  * @param env message to set options for
1030  * @return priorities and preferences to apply for @a env
1031  */
1032 enum GNUNET_MQ_PriorityPreferences
1033 GNUNET_MQ_env_get_options (struct GNUNET_MQ_Envelope *env)
1034 {
1035   struct GNUNET_MQ_Handle *mq = env->parent_queue;
1036
1037   if (GNUNET_YES == env->have_custom_options)
1038     return env->priority;
1039   if (NULL == mq)
1040     return 0;
1041   return mq->priority;
1042 }
1043
1044
1045 /**
1046  * Combine performance preferences set for different
1047  * envelopes that are being combined into one larger envelope.
1048  *
1049  * @param p1 one set of preferences
1050  * @param p2 second set of preferences
1051  * @return combined priority and preferences to use
1052  */
1053 enum GNUNET_MQ_PriorityPreferences
1054 GNUNET_MQ_env_combine_options (enum GNUNET_MQ_PriorityPreferences p1,
1055                                enum GNUNET_MQ_PriorityPreferences p2)
1056 {
1057   enum GNUNET_MQ_PriorityPreferences ret;
1058
1059   ret = GNUNET_MAX (p1 & GNUNET_MQ_PRIORITY_MASK, p2 & GNUNET_MQ_PRIORITY_MASK);
1060   ret |= ((p1 & GNUNET_MQ_PREF_UNRELIABLE) & (p2 & GNUNET_MQ_PREF_UNRELIABLE));
1061   ret |=
1062     ((p1 & GNUNET_MQ_PREF_LOW_LATENCY) | (p2 & GNUNET_MQ_PREF_LOW_LATENCY));
1063   ret |=
1064     ((p1 & GNUNET_MQ_PREF_CORK_ALLOWED) & (p2 & GNUNET_MQ_PREF_CORK_ALLOWED));
1065   ret |= ((p1 & GNUNET_MQ_PREF_GOODPUT) & (p2 & GNUNET_MQ_PREF_GOODPUT));
1066   ret |=
1067     ((p1 & GNUNET_MQ_PREF_OUT_OF_ORDER) & (p2 & GNUNET_MQ_PREF_OUT_OF_ORDER));
1068   return ret;
1069 }
1070
1071
1072 /**
1073  * Set application-specific default options for this queue.
1074  *
1075  * @param mq message queue to set options for
1076  * @param pp priorities and preferences to apply
1077  */
1078 void
1079 GNUNET_MQ_set_options (struct GNUNET_MQ_Handle *mq,
1080                        enum GNUNET_MQ_PriorityPreferences pp)
1081 {
1082   mq->priority = pp;
1083 }
1084
1085
1086 /**
1087  * Obtain message contained in envelope.
1088  *
1089  * @param env the envelope
1090  * @return message contained in the envelope
1091  */
1092 const struct GNUNET_MessageHeader *
1093 GNUNET_MQ_env_get_msg (const struct GNUNET_MQ_Envelope *env)
1094 {
1095   return env->mh;
1096 }
1097
1098
1099 /**
1100  * Return next envelope in queue.
1101  *
1102  * @param env a queued envelope
1103  * @return next one, or NULL
1104  */
1105 const struct GNUNET_MQ_Envelope *
1106 GNUNET_MQ_env_next (const struct GNUNET_MQ_Envelope *env)
1107 {
1108   return env->next;
1109 }
1110
1111
1112 /**
1113  * Register function to be called whenever @a mq is being
1114  * destroyed.
1115  *
1116  * @param mq message queue to watch
1117  * @param cb function to call on @a mq destruction
1118  * @param cb_cls closure for @a cb
1119  * @return handle for #GNUNET_MQ_destroy_notify_cancel().
1120  */
1121 struct GNUNET_MQ_DestroyNotificationHandle *
1122 GNUNET_MQ_destroy_notify (struct GNUNET_MQ_Handle *mq,
1123                           GNUNET_SCHEDULER_TaskCallback cb,
1124                           void *cb_cls)
1125 {
1126   struct GNUNET_MQ_DestroyNotificationHandle *dnh;
1127
1128   dnh = GNUNET_new (struct GNUNET_MQ_DestroyNotificationHandle);
1129   dnh->mq = mq;
1130   dnh->cb = cb;
1131   dnh->cb_cls = cb_cls;
1132   GNUNET_CONTAINER_DLL_insert (mq->dnh_head, mq->dnh_tail, dnh);
1133   return dnh;
1134 }
1135
1136
1137 /**
1138  * Cancel registration from #GNUNET_MQ_destroy_notify().
1139  *
1140  * @param dnh handle for registration to cancel
1141  */
1142 void
1143 GNUNET_MQ_destroy_notify_cancel (struct
1144                                  GNUNET_MQ_DestroyNotificationHandle *dnh)
1145 {
1146   struct GNUNET_MQ_Handle *mq = dnh->mq;
1147
1148   GNUNET_CONTAINER_DLL_remove (mq->dnh_head, mq->dnh_tail, dnh);
1149   GNUNET_free (dnh);
1150 }
1151
1152
1153 /**
1154  * Insert @a env into the envelope DLL starting at @a env_head
1155  * Note that @a env must not be in any MQ while this function
1156  * is used with DLLs defined outside of the MQ module.  This
1157  * is just in case some application needs to also manage a
1158  * FIFO of envelopes independent of MQ itself and wants to
1159  * re-use the pointers internal to @a env.  Use with caution.
1160  *
1161  * @param[in|out] env_head of envelope DLL
1162  * @param[in|out] env_tail tail of envelope DLL
1163  * @param[in|out] env element to insert at the tail
1164  */
1165 void
1166 GNUNET_MQ_dll_insert_head (struct GNUNET_MQ_Envelope **env_head,
1167                            struct GNUNET_MQ_Envelope **env_tail,
1168                            struct GNUNET_MQ_Envelope *env)
1169 {
1170   GNUNET_CONTAINER_DLL_insert (*env_head, *env_tail, env);
1171 }
1172
1173
1174 /**
1175  * Insert @a env into the envelope DLL starting at @a env_head
1176  * Note that @a env must not be in any MQ while this function
1177  * is used with DLLs defined outside of the MQ module.  This
1178  * is just in case some application needs to also manage a
1179  * FIFO of envelopes independent of MQ itself and wants to
1180  * re-use the pointers internal to @a env.  Use with caution.
1181  *
1182  * @param[in|out] env_head of envelope DLL
1183  * @param[in|out] env_tail tail of envelope DLL
1184  * @param[in|out] env element to insert at the tail
1185  */
1186 void
1187 GNUNET_MQ_dll_insert_tail (struct GNUNET_MQ_Envelope **env_head,
1188                            struct GNUNET_MQ_Envelope **env_tail,
1189                            struct GNUNET_MQ_Envelope *env)
1190 {
1191   GNUNET_CONTAINER_DLL_insert_tail (*env_head, *env_tail, env);
1192 }
1193
1194
1195 /**
1196  * Remove @a env from the envelope DLL starting at @a env_head.
1197  * Note that @a env must not be in any MQ while this function
1198  * is used with DLLs defined outside of the MQ module. This
1199  * is just in case some application needs to also manage a
1200  * FIFO of envelopes independent of MQ itself and wants to
1201  * re-use the pointers internal to @a env.  Use with caution.
1202  *
1203  * @param[in|out] env_head of envelope DLL
1204  * @param[in|out] env_tail tail of envelope DLL
1205  * @param[in|out] env element to remove from the DLL
1206  */
1207 void
1208 GNUNET_MQ_dll_remove (struct GNUNET_MQ_Envelope **env_head,
1209                       struct GNUNET_MQ_Envelope **env_tail,
1210                       struct GNUNET_MQ_Envelope *env)
1211 {
1212   GNUNET_CONTAINER_DLL_remove (*env_head, *env_tail, env);
1213 }
1214
1215
1216 /**
1217  * Copy an array of handlers.
1218  *
1219  * Useful if the array has been delared in local memory and needs to be
1220  * persisted for future use.
1221  *
1222  * @param handlers Array of handlers to be copied. Can be NULL (nothing done).
1223  * @return A newly allocated array of handlers.
1224  *         Needs to be freed with #GNUNET_free.
1225  */
1226 struct GNUNET_MQ_MessageHandler *
1227 GNUNET_MQ_copy_handlers (const struct GNUNET_MQ_MessageHandler *handlers)
1228 {
1229   struct GNUNET_MQ_MessageHandler *copy;
1230   unsigned int count;
1231
1232   if (NULL == handlers)
1233     return NULL;
1234
1235   count = GNUNET_MQ_count_handlers (handlers);
1236   copy = GNUNET_new_array (count + 1, struct GNUNET_MQ_MessageHandler);
1237   GNUNET_memcpy (copy,
1238                  handlers,
1239                  count * sizeof(struct GNUNET_MQ_MessageHandler));
1240   return copy;
1241 }
1242
1243
1244 /**
1245  * Copy an array of handlers, appending AGPL handler.
1246  *
1247  * Useful if the array has been delared in local memory and needs to be
1248  * persisted for future use.
1249  *
1250  * @param handlers Array of handlers to be copied. Can be NULL (nothing done).
1251  * @param agpl_handler function to call for AGPL handling
1252  * @param agpl_cls closure for @a agpl_handler
1253  * @return A newly allocated array of handlers.
1254  *         Needs to be freed with #GNUNET_free.
1255  */
1256 struct GNUNET_MQ_MessageHandler *
1257 GNUNET_MQ_copy_handlers2 (const struct GNUNET_MQ_MessageHandler *handlers,
1258                           GNUNET_MQ_MessageCallback agpl_handler,
1259                           void *agpl_cls)
1260 {
1261   struct GNUNET_MQ_MessageHandler *copy;
1262   unsigned int count;
1263
1264   if (NULL == handlers)
1265     return NULL;
1266   count = GNUNET_MQ_count_handlers (handlers);
1267   copy = GNUNET_new_array (count + 2, struct GNUNET_MQ_MessageHandler);
1268   GNUNET_memcpy (copy,
1269                  handlers,
1270                  count * sizeof(struct GNUNET_MQ_MessageHandler));
1271   copy[count].mv = NULL;
1272   copy[count].cb = agpl_handler;
1273   copy[count].cls = agpl_cls;
1274   copy[count].type = GNUNET_MESSAGE_TYPE_REQUEST_AGPL;
1275   copy[count].expected_size = sizeof(struct GNUNET_MessageHeader);
1276   return copy;
1277 }
1278
1279
1280 /**
1281  * Count the handlers in a handler array.
1282  *
1283  * @param handlers Array of handlers to be counted.
1284  * @return The number of handlers in the array.
1285  */
1286 unsigned int
1287 GNUNET_MQ_count_handlers (const struct GNUNET_MQ_MessageHandler *handlers)
1288 {
1289   unsigned int i;
1290
1291   if (NULL == handlers)
1292     return 0;
1293
1294   for (i = 0; NULL != handlers[i].cb; i++)
1295     ;
1296
1297   return i;
1298 }
1299
1300
1301 /**
1302  * Convert an `enum GNUNET_MQ_PreferenceType` to a string
1303  *
1304  * @param type the preference type
1305  * @return a string or NULL if invalid
1306  */
1307 const char *
1308 GNUNET_MQ_preference_to_string (enum GNUNET_MQ_PreferenceKind type)
1309 {
1310   switch (type)
1311   {
1312   case GNUNET_MQ_PREFERENCE_NONE:
1313     return "NONE";
1314
1315   case GNUNET_MQ_PREFERENCE_BANDWIDTH:
1316     return "BANDWIDTH";
1317
1318   case GNUNET_MQ_PREFERENCE_LATENCY:
1319     return "LATENCY";
1320
1321   case GNUNET_MQ_PREFERENCE_RELIABILITY:
1322     return "RELIABILITY";
1323   }
1324   ;
1325   return NULL;
1326 }
1327
1328
1329 /* end of mq.c */