plibc: win32 related, socket
[oweals/gnunet.git] / src / util / mq.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2012-2019 GNUnet e.V.
4
5      GNUnet is free software: you can redistribute it and/or modify it
6      under the terms of the GNU Affero General Public License as published
7      by the Free Software Foundation, either version 3 of the License,
8      or (at your option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      Affero General Public License for more details.
14
15      You should have received a copy of the GNU Affero General Public License
16      along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18      SPDX-License-Identifier: AGPL3.0-or-later
19  */
20
21 /**
22  * @author Florian Dold
23  * @file util/mq.c
24  * @brief general purpose request queue
25  */
26 #include "platform.h"
27 #include "gnunet_util_lib.h"
28
29 #define LOG(kind, ...) GNUNET_log_from(kind, "util-mq", __VA_ARGS__)
30
31
32 struct GNUNET_MQ_Envelope {
33   /**
34    * Messages are stored in a linked list.
35    * Each queue has its own list of envelopes.
36    */
37   struct GNUNET_MQ_Envelope *next;
38
39   /**
40    * Messages are stored in a linked list
41    * Each queue has its own list of envelopes.
42    */
43   struct GNUNET_MQ_Envelope *prev;
44
45   /**
46    * Actual allocated message header.
47    * The GNUNET_MQ_Envelope header is allocated at
48    * the end of the message.
49    */
50   struct GNUNET_MessageHeader *mh;
51
52   /**
53    * Queue the message is queued in, NULL if message is not queued.
54    */
55   struct GNUNET_MQ_Handle *parent_queue;
56
57   /**
58    * Called after the message was sent irrevocably.
59    */
60   GNUNET_SCHEDULER_TaskCallback sent_cb;
61
62   /**
63    * Closure for @e send_cb
64    */
65   void *sent_cls;
66
67   /**
68    * Flags that were set for this envelope by
69    * #GNUNET_MQ_env_set_options().   Only valid if
70    * @e have_custom_options is set.
71    */
72   enum GNUNET_MQ_PriorityPreferences priority;
73
74   /**
75    * Did the application call #GNUNET_MQ_env_set_options()?
76    */
77   int have_custom_options;
78 };
79
80
81 /**
82  * Handle to a message queue.
83  */
84 struct GNUNET_MQ_Handle {
85   /**
86    * Handlers array, or NULL if the queue should not receive messages
87    */
88   struct GNUNET_MQ_MessageHandler *handlers;
89
90   /**
91    * Actual implementation of message sending,
92    * called when a message is added
93    */
94   GNUNET_MQ_SendImpl send_impl;
95
96   /**
97    * Implementation-dependent queue destruction function
98    */
99   GNUNET_MQ_DestroyImpl destroy_impl;
100
101   /**
102    * Implementation-dependent send cancel function
103    */
104   GNUNET_MQ_CancelImpl cancel_impl;
105
106   /**
107    * Implementation-specific state
108    */
109   void *impl_state;
110
111   /**
112    * Callback will be called when an error occurs.
113    */
114   GNUNET_MQ_ErrorHandler error_handler;
115
116   /**
117    * Closure for the error handler.
118    */
119   void *error_handler_cls;
120
121   /**
122    * Task to asynchronously run #impl_send_continue().
123    */
124   struct GNUNET_SCHEDULER_Task *send_task;
125
126   /**
127    * Linked list of messages pending to be sent
128    */
129   struct GNUNET_MQ_Envelope *envelope_head;
130
131   /**
132    * Linked list of messages pending to be sent
133    */
134   struct GNUNET_MQ_Envelope *envelope_tail;
135
136   /**
137    * Message that is currently scheduled to be
138    * sent. Not the head of the message queue, as the implementation
139    * needs to know if sending has been already scheduled or not.
140    */
141   struct GNUNET_MQ_Envelope *current_envelope;
142
143   /**
144    * Map of associations, lazily allocated
145    */
146   struct GNUNET_CONTAINER_MultiHashMap32 *assoc_map;
147
148   /**
149    * Functions to call on queue destruction; kept in a DLL.
150    */
151   struct GNUNET_MQ_DestroyNotificationHandle *dnh_head;
152
153   /**
154    * Functions to call on queue destruction; kept in a DLL.
155    */
156   struct GNUNET_MQ_DestroyNotificationHandle *dnh_tail;
157
158   /**
159    * Flags that were set for this queue by
160    * #GNUNET_MQ_set_options().   Default is 0.
161    */
162   enum GNUNET_MQ_PriorityPreferences priority;
163
164   /**
165    * Next id that should be used for the @e assoc_map,
166    * initialized lazily to a random value together with
167    * @e assoc_map
168    */
169   uint32_t assoc_id;
170
171   /**
172    * Number of entries we have in the envelope-DLL.
173    */
174   unsigned int queue_length;
175
176   /**
177    * #GNUNET_YES if GNUNET_MQ_impl_evacuate was called.
178    * FIXME: is this dead?
179    */
180   int evacuate_called;
181
182   /**
183    * #GNUNET_YES if GNUNET_MQ_impl_send_in_flight() was called.
184    */
185   int in_flight;
186 };
187
188
189 /**
190  * Call the message message handler that was registered
191  * for the type of the given message in the given message queue.
192  *
193  * This function is indended to be used for the implementation
194  * of message queues.
195  *
196  * @param mq message queue with the handlers
197  * @param mh message to dispatch
198  */
199 void
200 GNUNET_MQ_inject_message(struct GNUNET_MQ_Handle *mq,
201                          const struct GNUNET_MessageHeader *mh)
202 {
203   int ret;
204
205   ret = GNUNET_MQ_handle_message(mq->handlers, mh);
206   if (GNUNET_SYSERR == ret)
207     {
208       GNUNET_MQ_inject_error(mq, GNUNET_MQ_ERROR_MALFORMED);
209       return;
210     }
211 }
212
213
214 /**
215  * Call the message message handler that was registered
216  * for the type of the given message in the given @a handlers list.
217  *
218  * This function is indended to be used for the implementation
219  * of message queues.
220  *
221  * @param handlers a set of handlers
222  * @param mh message to dispatch
223  * @return #GNUNET_OK on success, #GNUNET_NO if no handler matched,
224  *         #GNUNET_SYSERR if message was rejected by check function
225  */
226 int
227 GNUNET_MQ_handle_message(const struct GNUNET_MQ_MessageHandler *handlers,
228                          const struct GNUNET_MessageHeader *mh)
229 {
230   const struct GNUNET_MQ_MessageHandler *handler;
231   int handled = GNUNET_NO;
232   uint16_t msize = ntohs(mh->size);
233   uint16_t mtype = ntohs(mh->type);
234
235   LOG(GNUNET_ERROR_TYPE_DEBUG,
236       "Received message of type %u and size %u\n",
237       mtype,
238       msize);
239
240   if (NULL == handlers)
241     goto done;
242   for (handler = handlers; NULL != handler->cb; handler++)
243     {
244       if (handler->type == mtype)
245         {
246           handled = GNUNET_YES;
247           if ((handler->expected_size > msize) ||
248               ((handler->expected_size != msize) && (NULL == handler->mv)))
249             {
250               /* Too small, or not an exact size and
251                  no 'mv' handler to check rest */
252               LOG(GNUNET_ERROR_TYPE_ERROR,
253                   "Received malformed message of type %u\n",
254                   (unsigned int)handler->type);
255               return GNUNET_SYSERR;
256             }
257           if ((NULL == handler->mv) ||
258               (GNUNET_OK == handler->mv(handler->cls, mh)))
259             {
260               /* message well-formed, pass to handler */
261               handler->cb(handler->cls, mh);
262             }
263           else
264             {
265               /* Message rejected by check routine */
266               LOG(GNUNET_ERROR_TYPE_ERROR,
267                   "Received malformed message of type %u\n",
268                   (unsigned int)handler->type);
269               return GNUNET_SYSERR;
270             }
271           break;
272         }
273     }
274 done:
275   if (GNUNET_NO == handled)
276     {
277       LOG(GNUNET_ERROR_TYPE_INFO,
278           "No handler for message of type %u and size %u\n",
279           mtype,
280           msize);
281       return GNUNET_NO;
282     }
283   return GNUNET_OK;
284 }
285
286
287 /**
288  * Call the error handler of a message queue with the given
289  * error code.  If there is no error handler, log a warning.
290  *
291  * This function is intended to be used by the implementation
292  * of message queues.
293  *
294  * @param mq message queue
295  * @param error the error type
296  */
297 void
298 GNUNET_MQ_inject_error(struct GNUNET_MQ_Handle *mq, enum GNUNET_MQ_Error error)
299 {
300   if (NULL == mq->error_handler)
301     {
302       LOG(GNUNET_ERROR_TYPE_WARNING,
303           "Got error %d, but no handler installed\n",
304           (int)error);
305       return;
306     }
307   mq->error_handler(mq->error_handler_cls, error);
308 }
309
310
311 /**
312  * Discard the message queue message, free all
313  * allocated resources. Must be called in the event
314  * that a message is created but should not actually be sent.
315  *
316  * @param mqm the message to discard
317  */
318 void
319 GNUNET_MQ_discard(struct GNUNET_MQ_Envelope *ev)
320 {
321   GNUNET_assert(NULL == ev->parent_queue);
322   GNUNET_free(ev);
323 }
324
325
326 /**
327  * Obtain the current length of the message queue.
328  *
329  * @param mq queue to inspect
330  * @return number of queued, non-transmitted messages
331  */
332 unsigned int
333 GNUNET_MQ_get_length(struct GNUNET_MQ_Handle *mq)
334 {
335   if (GNUNET_YES != mq->in_flight)
336     {
337       return mq->queue_length;
338     }
339   return mq->queue_length - 1;
340 }
341
342
343 /**
344  * Send a message with the given message queue.
345  * May only be called once per message.
346  *
347  * @param mq message queue
348  * @param ev the envelope with the message to send.
349  */
350 void
351 GNUNET_MQ_send(struct GNUNET_MQ_Handle *mq, struct GNUNET_MQ_Envelope *ev)
352 {
353   GNUNET_assert(NULL != mq);
354   GNUNET_assert(NULL == ev->parent_queue);
355
356   mq->queue_length++;
357   if (mq->queue_length >= 10000)
358     {
359       /* This would seem like a bug... */
360       GNUNET_log(GNUNET_ERROR_TYPE_ERROR,
361                  "MQ with %u entries extended by message of type %u (FC broken?)\n",
362                  (unsigned int)mq->queue_length,
363                  (unsigned int)ntohs(ev->mh->type));
364     }
365   ev->parent_queue = mq;
366   /* is the implementation busy? queue it! */
367   if ((NULL != mq->current_envelope) || (NULL != mq->send_task))
368     {
369       GNUNET_CONTAINER_DLL_insert_tail(mq->envelope_head, mq->envelope_tail, ev);
370       return;
371     }
372   GNUNET_assert(NULL == mq->envelope_head);
373   mq->current_envelope = ev;
374
375   LOG(GNUNET_ERROR_TYPE_DEBUG,
376       "sending message of type %u, queue empty (MQ: %p)\n",
377       ntohs(ev->mh->type),
378       mq);
379
380   mq->send_impl(mq, ev->mh, mq->impl_state);
381 }
382
383
384 /**
385  * Remove the first envelope that has not yet been sent from the message
386  * queue and return it.
387  *
388  * @param mq queue to remove envelope from
389  * @return NULL if queue is empty (or has no envelope that is not under transmission)
390  */
391 struct GNUNET_MQ_Envelope *
392 GNUNET_MQ_unsent_head(struct GNUNET_MQ_Handle *mq)
393 {
394   struct GNUNET_MQ_Envelope *env;
395
396   env = mq->envelope_head;
397   GNUNET_CONTAINER_DLL_remove(mq->envelope_head, mq->envelope_tail, env);
398   mq->queue_length--;
399   env->parent_queue = NULL;
400   return env;
401 }
402
403
404 /**
405  * Function to copy an envelope.  The envelope must not yet
406  * be in any queue or have any options or callbacks set.
407  *
408  * @param env envelope to copy
409  * @return copy of @a env
410  */
411 struct GNUNET_MQ_Envelope *
412 GNUNET_MQ_env_copy(struct GNUNET_MQ_Envelope *env)
413 {
414   GNUNET_assert(NULL == env->next);
415   GNUNET_assert(NULL == env->parent_queue);
416   GNUNET_assert(NULL == env->sent_cb);
417   GNUNET_assert(GNUNET_NO == env->have_custom_options);
418   return GNUNET_MQ_msg_copy(env->mh);
419 }
420
421
422 /**
423  * Send a copy of a message with the given message queue.
424  * Can be called repeatedly on the same envelope.
425  *
426  * @param mq message queue
427  * @param ev the envelope with the message to send.
428  */
429 void
430 GNUNET_MQ_send_copy(struct GNUNET_MQ_Handle *mq,
431                     const struct GNUNET_MQ_Envelope *ev)
432 {
433   struct GNUNET_MQ_Envelope *env;
434   uint16_t msize;
435
436   msize = ntohs(ev->mh->size);
437   env = GNUNET_malloc(sizeof(struct GNUNET_MQ_Envelope) + msize);
438   env->mh = (struct GNUNET_MessageHeader *)&env[1];
439   env->sent_cb = ev->sent_cb;
440   env->sent_cls = ev->sent_cls;
441   GNUNET_memcpy(&env[1], ev->mh, msize);
442   GNUNET_MQ_send(mq, env);
443 }
444
445
446 /**
447  * Task run to call the send implementation for the next queued
448  * message, if any.  Only useful for implementing message queues,
449  * results in undefined behavior if not used carefully.
450  *
451  * @param cls message queue to send the next message with
452  */
453 static void
454 impl_send_continue(void *cls)
455 {
456   struct GNUNET_MQ_Handle *mq = cls;
457
458   mq->send_task = NULL;
459   /* call is only valid if we're actually currently sending
460    * a message */
461   if (NULL == mq->envelope_head)
462     return;
463   mq->current_envelope = mq->envelope_head;
464   GNUNET_CONTAINER_DLL_remove(mq->envelope_head,
465                               mq->envelope_tail,
466                               mq->current_envelope);
467
468   LOG(GNUNET_ERROR_TYPE_DEBUG,
469       "sending message of type %u from queue\n",
470       ntohs(mq->current_envelope->mh->type));
471
472   mq->send_impl(mq, mq->current_envelope->mh, mq->impl_state);
473 }
474
475
476 /**
477  * Call the send implementation for the next queued message, if any.
478  * Only useful for implementing message queues, results in undefined
479  * behavior if not used carefully.
480  *
481  * @param mq message queue to send the next message with
482  */
483 void
484 GNUNET_MQ_impl_send_continue(struct GNUNET_MQ_Handle *mq)
485 {
486   struct GNUNET_MQ_Envelope *current_envelope;
487   GNUNET_SCHEDULER_TaskCallback cb;
488
489   GNUNET_assert(0 < mq->queue_length);
490   mq->queue_length--;
491   mq->in_flight = GNUNET_NO;
492   current_envelope = mq->current_envelope;
493   current_envelope->parent_queue = NULL;
494   mq->current_envelope = NULL;
495   GNUNET_assert(NULL == mq->send_task);
496   mq->send_task = GNUNET_SCHEDULER_add_now(&impl_send_continue, mq);
497   if (NULL != (cb = current_envelope->sent_cb))
498     {
499       current_envelope->sent_cb = NULL;
500       cb(current_envelope->sent_cls);
501     }
502   GNUNET_free(current_envelope);
503 }
504
505
506 /**
507  * Call the send notification for the current message, but do not
508  * try to send the next message until #GNUNET_MQ_impl_send_continue
509  * is called.
510  *
511  * Only useful for implementing message queues, results in undefined
512  * behavior if not used carefully.
513  *
514  * @param mq message queue to send the next message with
515  */
516 void
517 GNUNET_MQ_impl_send_in_flight(struct GNUNET_MQ_Handle *mq)
518 {
519   struct GNUNET_MQ_Envelope *current_envelope;
520   GNUNET_SCHEDULER_TaskCallback cb;
521
522   mq->in_flight = GNUNET_YES;
523   /* call is only valid if we're actually currently sending
524    * a message */
525   current_envelope = mq->current_envelope;
526   GNUNET_assert(NULL != current_envelope);
527   /* can't call cancel from now on anymore */
528   current_envelope->parent_queue = NULL;
529   if (NULL != (cb = current_envelope->sent_cb))
530     {
531       current_envelope->sent_cb = NULL;
532       cb(current_envelope->sent_cls);
533     }
534 }
535
536
537 /**
538  * Create a message queue for the specified handlers.
539  *
540  * @param send function the implements sending messages
541  * @param destroy function that implements destroying the queue
542  * @param cancel function that implements canceling a message
543  * @param impl_state for the queue, passed to 'send' and 'destroy'
544  * @param handlers array of message handlers
545  * @param error_handler handler for read and write errors
546  * @param error_handler_cls closure for @a error_handler
547  * @return a new message queue
548  */
549 struct GNUNET_MQ_Handle *
550 GNUNET_MQ_queue_for_callbacks(GNUNET_MQ_SendImpl send,
551                               GNUNET_MQ_DestroyImpl destroy,
552                               GNUNET_MQ_CancelImpl cancel,
553                               void *impl_state,
554                               const struct GNUNET_MQ_MessageHandler *handlers,
555                               GNUNET_MQ_ErrorHandler error_handler,
556                               void *error_handler_cls)
557 {
558   struct GNUNET_MQ_Handle *mq;
559
560   mq = GNUNET_new(struct GNUNET_MQ_Handle);
561   mq->send_impl = send;
562   mq->destroy_impl = destroy;
563   mq->cancel_impl = cancel;
564   mq->handlers = GNUNET_MQ_copy_handlers(handlers);
565   mq->error_handler = error_handler;
566   mq->error_handler_cls = error_handler_cls;
567   mq->impl_state = impl_state;
568
569   return mq;
570 }
571
572
573 /**
574  * Change the closure argument in all of the `handlers` of the
575  * @a mq.
576  *
577  * @param mq to modify
578  * @param handlers_cls new closure to use
579  */
580 void
581 GNUNET_MQ_set_handlers_closure(struct GNUNET_MQ_Handle *mq, void *handlers_cls)
582 {
583   if (NULL == mq->handlers)
584     return;
585   for (unsigned int i = 0; NULL != mq->handlers[i].cb; i++)
586     mq->handlers[i].cls = handlers_cls;
587 }
588
589
590 /**
591  * Get the message that should currently be sent.
592  * Fails if there is no current message.
593  * Only useful for implementing message queues,
594  * results in undefined behavior if not used carefully.
595  *
596  * @param mq message queue with the current message
597  * @return message to send, never NULL
598  */
599 const struct GNUNET_MessageHeader *
600 GNUNET_MQ_impl_current(struct GNUNET_MQ_Handle *mq)
601 {
602   GNUNET_assert(NULL != mq->current_envelope);
603   GNUNET_assert(NULL != mq->current_envelope->mh);
604   return mq->current_envelope->mh;
605 }
606
607
608 /**
609  * Get the implementation state associated with the
610  * message queue.
611  *
612  * While the GNUNET_MQ_Impl* callbacks receive the
613  * implementation state, continuations that are scheduled
614  * by the implementation function often only have one closure
615  * argument, with this function it is possible to get at the
616  * implementation state when only passing the GNUNET_MQ_Handle
617  * as closure.
618  *
619  * @param mq message queue with the current message
620  * @return message to send, never NULL
621  */
622 void *
623 GNUNET_MQ_impl_state(struct GNUNET_MQ_Handle *mq)
624 {
625   return mq->impl_state;
626 }
627
628
629 struct GNUNET_MQ_Envelope *
630 GNUNET_MQ_msg_(struct GNUNET_MessageHeader **mhp, uint16_t size, uint16_t type)
631 {
632   struct GNUNET_MQ_Envelope *ev;
633
634   ev = GNUNET_malloc(size + sizeof(struct GNUNET_MQ_Envelope));
635   ev->mh = (struct GNUNET_MessageHeader *)&ev[1];
636   ev->mh->size = htons(size);
637   ev->mh->type = htons(type);
638   if (NULL != mhp)
639     *mhp = ev->mh;
640   return ev;
641 }
642
643
644 /**
645  * Create a new envelope by copying an existing message.
646  *
647  * @param hdr header of the message to copy
648  * @return envelope containing @a hdr
649  */
650 struct GNUNET_MQ_Envelope *
651 GNUNET_MQ_msg_copy(const struct GNUNET_MessageHeader *hdr)
652 {
653   struct GNUNET_MQ_Envelope *mqm;
654   uint16_t size = ntohs(hdr->size);
655
656   mqm = GNUNET_malloc(sizeof(*mqm) + size);
657   mqm->mh = (struct GNUNET_MessageHeader *)&mqm[1];
658   GNUNET_memcpy(mqm->mh, hdr, size);
659   return mqm;
660 }
661
662
663 /**
664  * Implementation of the #GNUNET_MQ_msg_nested_mh macro.
665  *
666  * @param mhp pointer to the message header pointer that will be changed to allocate at
667  *        the newly allocated space for the message.
668  * @param base_size size of the data before the nested message
669  * @param type type of the message in the envelope
670  * @param nested_mh the message to append to the message after base_size
671  */
672 struct GNUNET_MQ_Envelope *
673 GNUNET_MQ_msg_nested_mh_(struct GNUNET_MessageHeader **mhp,
674                          uint16_t base_size,
675                          uint16_t type,
676                          const struct GNUNET_MessageHeader *nested_mh)
677 {
678   struct GNUNET_MQ_Envelope *mqm;
679   uint16_t size;
680
681   if (NULL == nested_mh)
682     return GNUNET_MQ_msg_(mhp, base_size, type);
683
684   size = base_size + ntohs(nested_mh->size);
685
686   /* check for uint16_t overflow */
687   if (size < base_size)
688     return NULL;
689
690   mqm = GNUNET_MQ_msg_(mhp, size, type);
691   GNUNET_memcpy((char *)mqm->mh + base_size,
692                 nested_mh,
693                 ntohs(nested_mh->size));
694
695   return mqm;
696 }
697
698
699 /**
700  * Associate the assoc_data in mq with a unique request id.
701  *
702  * @param mq message queue, id will be unique for the queue
703  * @param assoc_data to associate
704  */
705 uint32_t
706 GNUNET_MQ_assoc_add(struct GNUNET_MQ_Handle *mq, void *assoc_data)
707 {
708   uint32_t id;
709
710   if (NULL == mq->assoc_map)
711     {
712       mq->assoc_map = GNUNET_CONTAINER_multihashmap32_create(8);
713       mq->assoc_id = 1;
714     }
715   id = mq->assoc_id++;
716   GNUNET_assert(GNUNET_OK ==
717                 GNUNET_CONTAINER_multihashmap32_put(
718                   mq->assoc_map,
719                   id,
720                   assoc_data,
721                   GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
722   return id;
723 }
724
725
726 /**
727  * Get the data associated with a @a request_id in a queue
728  *
729  * @param mq the message queue with the association
730  * @param request_id the request id we are interested in
731  * @return the associated data
732  */
733 void *
734 GNUNET_MQ_assoc_get(struct GNUNET_MQ_Handle *mq, uint32_t request_id)
735 {
736   if (NULL == mq->assoc_map)
737     return NULL;
738   return GNUNET_CONTAINER_multihashmap32_get(mq->assoc_map, request_id);
739 }
740
741
742 /**
743  * Remove the association for a @a request_id
744  *
745  * @param mq the message queue with the association
746  * @param request_id the request id we want to remove
747  * @return the associated data
748  */
749 void *
750 GNUNET_MQ_assoc_remove(struct GNUNET_MQ_Handle *mq, uint32_t request_id)
751 {
752   void *val;
753
754   if (NULL == mq->assoc_map)
755     return NULL;
756   val = GNUNET_CONTAINER_multihashmap32_get(mq->assoc_map, request_id);
757   GNUNET_CONTAINER_multihashmap32_remove_all(mq->assoc_map, request_id);
758   return val;
759 }
760
761
762 /**
763  * Call a callback once the envelope has been sent, that is,
764  * sending it can not be canceled anymore.
765  * There can be only one notify sent callback per envelope.
766  *
767  * @param ev message to call the notify callback for
768  * @param cb the notify callback
769  * @param cb_cls closure for the callback
770  */
771 void
772 GNUNET_MQ_notify_sent(struct GNUNET_MQ_Envelope *ev,
773                       GNUNET_SCHEDULER_TaskCallback cb,
774                       void *cb_cls)
775 {
776   /* allow setting *OR* clearing callback */
777   GNUNET_assert((NULL == ev->sent_cb) || (NULL == cb));
778   ev->sent_cb = cb;
779   ev->sent_cls = cb_cls;
780 }
781
782
783 /**
784  * Handle we return for callbacks registered to be
785  * notified when #GNUNET_MQ_destroy() is called on a queue.
786  */
787 struct GNUNET_MQ_DestroyNotificationHandle {
788   /**
789    * Kept in a DLL.
790    */
791   struct GNUNET_MQ_DestroyNotificationHandle *prev;
792
793   /**
794    * Kept in a DLL.
795    */
796   struct GNUNET_MQ_DestroyNotificationHandle *next;
797
798   /**
799    * Queue to notify about.
800    */
801   struct GNUNET_MQ_Handle *mq;
802
803   /**
804    * Function to call.
805    */
806   GNUNET_SCHEDULER_TaskCallback cb;
807
808   /**
809    * Closure for @e cb.
810    */
811   void *cb_cls;
812 };
813
814
815 /**
816  * Destroy the message queue.
817  *
818  * @param mq message queue to destroy
819  */
820 void
821 GNUNET_MQ_destroy(struct GNUNET_MQ_Handle *mq)
822 {
823   struct GNUNET_MQ_DestroyNotificationHandle *dnh;
824
825   if (NULL != mq->destroy_impl)
826     {
827       mq->destroy_impl(mq, mq->impl_state);
828     }
829   if (NULL != mq->send_task)
830     {
831       GNUNET_SCHEDULER_cancel(mq->send_task);
832       mq->send_task = NULL;
833     }
834   while (NULL != mq->envelope_head)
835     {
836       struct GNUNET_MQ_Envelope *ev;
837
838       ev = mq->envelope_head;
839       ev->parent_queue = NULL;
840       GNUNET_CONTAINER_DLL_remove(mq->envelope_head, mq->envelope_tail, ev);
841       GNUNET_assert(0 < mq->queue_length);
842       mq->queue_length--;
843       LOG(GNUNET_ERROR_TYPE_DEBUG,
844           "MQ destroy drops message of type %u\n",
845           ntohs(ev->mh->type));
846       GNUNET_MQ_discard(ev);
847     }
848   if (NULL != mq->current_envelope)
849     {
850       /* we can only discard envelopes that
851        * are not queued! */
852       mq->current_envelope->parent_queue = NULL;
853       LOG(GNUNET_ERROR_TYPE_DEBUG,
854           "MQ destroy drops current message of type %u\n",
855           ntohs(mq->current_envelope->mh->type));
856       GNUNET_MQ_discard(mq->current_envelope);
857       mq->current_envelope = NULL;
858       GNUNET_assert(0 < mq->queue_length);
859       mq->queue_length--;
860     }
861   GNUNET_assert(0 == mq->queue_length);
862   while (NULL != (dnh = mq->dnh_head))
863     {
864       dnh->cb(dnh->cb_cls);
865       GNUNET_MQ_destroy_notify_cancel(dnh);
866     }
867   if (NULL != mq->assoc_map)
868     {
869       GNUNET_CONTAINER_multihashmap32_destroy(mq->assoc_map);
870       mq->assoc_map = NULL;
871     }
872   GNUNET_free_non_null(mq->handlers);
873   GNUNET_free(mq);
874 }
875
876
877 const struct GNUNET_MessageHeader *
878 GNUNET_MQ_extract_nested_mh_(const struct GNUNET_MessageHeader *mh,
879                              uint16_t base_size)
880 {
881   uint16_t whole_size;
882   uint16_t nested_size;
883   const struct GNUNET_MessageHeader *nested_msg;
884
885   whole_size = ntohs(mh->size);
886   GNUNET_assert(whole_size >= base_size);
887   nested_size = whole_size - base_size;
888   if (0 == nested_size)
889     return NULL;
890   if (nested_size < sizeof(struct GNUNET_MessageHeader))
891     {
892       GNUNET_break_op(0);
893       return NULL;
894     }
895   nested_msg = (const struct GNUNET_MessageHeader *)((char *)mh + base_size);
896   if (ntohs(nested_msg->size) != nested_size)
897     {
898       GNUNET_break_op(0);
899       return NULL;
900     }
901   return nested_msg;
902 }
903
904
905 /**
906  * Cancel sending the message. Message must have been sent with
907  * #GNUNET_MQ_send before.  May not be called after the notify sent
908  * callback has been called
909  *
910  * @param ev queued envelope to cancel
911  */
912 void
913 GNUNET_MQ_send_cancel(struct GNUNET_MQ_Envelope *ev)
914 {
915   struct GNUNET_MQ_Handle *mq = ev->parent_queue;
916
917   GNUNET_assert(NULL != mq);
918   GNUNET_assert(NULL != mq->cancel_impl);
919
920   mq->evacuate_called = GNUNET_NO;
921
922   if (mq->current_envelope == ev)
923     {
924       /* complex case, we already started with transmitting
925          the message using the callbacks. */
926       GNUNET_assert(GNUNET_NO == mq->in_flight);
927       GNUNET_assert(0 < mq->queue_length);
928       mq->queue_length--;
929       mq->cancel_impl(mq, mq->impl_state);
930       /* continue sending the next message, if any */
931       mq->current_envelope = mq->envelope_head;
932       if (NULL != mq->current_envelope)
933         {
934           GNUNET_CONTAINER_DLL_remove(mq->envelope_head,
935                                       mq->envelope_tail,
936                                       mq->current_envelope);
937
938           LOG(GNUNET_ERROR_TYPE_DEBUG,
939               "sending canceled message of type %u queue\n",
940               ntohs(ev->mh->type));
941
942           mq->send_impl(mq, mq->current_envelope->mh, mq->impl_state);
943         }
944     }
945   else
946     {
947       /* simple case, message is still waiting in the queue */
948       GNUNET_CONTAINER_DLL_remove(mq->envelope_head, mq->envelope_tail, ev);
949       GNUNET_assert(0 < mq->queue_length);
950       mq->queue_length--;
951     }
952
953   if (GNUNET_YES != mq->evacuate_called)
954     {
955       ev->parent_queue = NULL;
956       ev->mh = NULL;
957       /* also frees ev */
958       GNUNET_free(ev);
959     }
960 }
961
962
963 /**
964  * Function to obtain the current envelope
965  * from within #GNUNET_MQ_SendImpl implementations.
966  *
967  * @param mq message queue to interrogate
968  * @return the current envelope
969  */
970 struct GNUNET_MQ_Envelope *
971 GNUNET_MQ_get_current_envelope(struct GNUNET_MQ_Handle *mq)
972 {
973   return mq->current_envelope;
974 }
975
976
977 /**
978  * Function to obtain the last envelope in the queue.
979  *
980  * @param mq message queue to interrogate
981  * @return the last envelope in the queue
982  */
983 struct GNUNET_MQ_Envelope *
984 GNUNET_MQ_get_last_envelope(struct GNUNET_MQ_Handle *mq)
985 {
986   if (NULL != mq->envelope_tail)
987     return mq->envelope_tail;
988
989   return mq->current_envelope;
990 }
991
992
993 /**
994  * Set application-specific preferences for this envelope.
995  * Overrides the options set for the queue with
996  * #GNUNET_MQ_set_options() for this message only.
997  *
998  * @param env message to set options for
999  * @param pp priorities and preferences to apply
1000  */
1001 void
1002 GNUNET_MQ_env_set_options(struct GNUNET_MQ_Envelope *env,
1003                           enum GNUNET_MQ_PriorityPreferences pp)
1004 {
1005   env->priority = pp;
1006   env->have_custom_options = GNUNET_YES;
1007 }
1008
1009
1010 /**
1011  * Get application-specific options for this envelope.
1012  *
1013  * @param env message to set options for
1014  * @return priorities and preferences to apply for @a env
1015  */
1016 enum GNUNET_MQ_PriorityPreferences
1017 GNUNET_MQ_env_get_options(struct GNUNET_MQ_Envelope *env)
1018 {
1019   struct GNUNET_MQ_Handle *mq = env->parent_queue;
1020
1021   if (GNUNET_YES == env->have_custom_options)
1022     return env->priority;
1023   if (NULL == mq)
1024     return 0;
1025   return mq->priority;
1026 }
1027
1028
1029 /**
1030  * Combine performance preferences set for different
1031  * envelopes that are being combined into one larger envelope.
1032  *
1033  * @param p1 one set of preferences
1034  * @param p2 second set of preferences
1035  * @return combined priority and preferences to use
1036  */
1037 enum GNUNET_MQ_PriorityPreferences
1038 GNUNET_MQ_env_combine_options(enum GNUNET_MQ_PriorityPreferences p1,
1039                               enum GNUNET_MQ_PriorityPreferences p2)
1040 {
1041   enum GNUNET_MQ_PriorityPreferences ret;
1042
1043   ret = GNUNET_MAX(p1 & GNUNET_MQ_PRIORITY_MASK, p2 & GNUNET_MQ_PRIORITY_MASK);
1044   ret |= ((p1 & GNUNET_MQ_PREF_UNRELIABLE) & (p2 & GNUNET_MQ_PREF_UNRELIABLE));
1045   ret |=
1046     ((p1 & GNUNET_MQ_PREF_LOW_LATENCY) | (p2 & GNUNET_MQ_PREF_LOW_LATENCY));
1047   ret |=
1048     ((p1 & GNUNET_MQ_PREF_CORK_ALLOWED) & (p2 & GNUNET_MQ_PREF_CORK_ALLOWED));
1049   ret |= ((p1 & GNUNET_MQ_PREF_GOODPUT) & (p2 & GNUNET_MQ_PREF_GOODPUT));
1050   ret |=
1051     ((p1 & GNUNET_MQ_PREF_OUT_OF_ORDER) & (p2 & GNUNET_MQ_PREF_OUT_OF_ORDER));
1052   return ret;
1053 }
1054
1055
1056 /**
1057  * Set application-specific default options for this queue.
1058  *
1059  * @param mq message queue to set options for
1060  * @param pp priorities and preferences to apply
1061  */
1062 void
1063 GNUNET_MQ_set_options(struct GNUNET_MQ_Handle *mq,
1064                       enum GNUNET_MQ_PriorityPreferences pp)
1065 {
1066   mq->priority = pp;
1067 }
1068
1069
1070 /**
1071  * Obtain message contained in envelope.
1072  *
1073  * @param env the envelope
1074  * @return message contained in the envelope
1075  */
1076 const struct GNUNET_MessageHeader *
1077 GNUNET_MQ_env_get_msg(const struct GNUNET_MQ_Envelope *env)
1078 {
1079   return env->mh;
1080 }
1081
1082
1083 /**
1084  * Return next envelope in queue.
1085  *
1086  * @param env a queued envelope
1087  * @return next one, or NULL
1088  */
1089 const struct GNUNET_MQ_Envelope *
1090 GNUNET_MQ_env_next(const struct GNUNET_MQ_Envelope *env)
1091 {
1092   return env->next;
1093 }
1094
1095
1096 /**
1097  * Register function to be called whenever @a mq is being
1098  * destroyed.
1099  *
1100  * @param mq message queue to watch
1101  * @param cb function to call on @a mq destruction
1102  * @param cb_cls closure for @a cb
1103  * @return handle for #GNUNET_MQ_destroy_notify_cancel().
1104  */
1105 struct GNUNET_MQ_DestroyNotificationHandle *
1106 GNUNET_MQ_destroy_notify(struct GNUNET_MQ_Handle *mq,
1107                          GNUNET_SCHEDULER_TaskCallback cb,
1108                          void *cb_cls)
1109 {
1110   struct GNUNET_MQ_DestroyNotificationHandle *dnh;
1111
1112   dnh = GNUNET_new(struct GNUNET_MQ_DestroyNotificationHandle);
1113   dnh->mq = mq;
1114   dnh->cb = cb;
1115   dnh->cb_cls = cb_cls;
1116   GNUNET_CONTAINER_DLL_insert(mq->dnh_head, mq->dnh_tail, dnh);
1117   return dnh;
1118 }
1119
1120
1121 /**
1122  * Cancel registration from #GNUNET_MQ_destroy_notify().
1123  *
1124  * @param dnh handle for registration to cancel
1125  */
1126 void
1127 GNUNET_MQ_destroy_notify_cancel(
1128   struct GNUNET_MQ_DestroyNotificationHandle *dnh)
1129 {
1130   struct GNUNET_MQ_Handle *mq = dnh->mq;
1131
1132   GNUNET_CONTAINER_DLL_remove(mq->dnh_head, mq->dnh_tail, dnh);
1133   GNUNET_free(dnh);
1134 }
1135
1136
1137 /**
1138  * Insert @a env into the envelope DLL starting at @a env_head
1139  * Note that @a env must not be in any MQ while this function
1140  * is used with DLLs defined outside of the MQ module.  This
1141  * is just in case some application needs to also manage a
1142  * FIFO of envelopes independent of MQ itself and wants to
1143  * re-use the pointers internal to @a env.  Use with caution.
1144  *
1145  * @param[in|out] env_head of envelope DLL
1146  * @param[in|out] env_tail tail of envelope DLL
1147  * @param[in|out] env element to insert at the tail
1148  */
1149 void
1150 GNUNET_MQ_dll_insert_head(struct GNUNET_MQ_Envelope **env_head,
1151                           struct GNUNET_MQ_Envelope **env_tail,
1152                           struct GNUNET_MQ_Envelope *env)
1153 {
1154   GNUNET_CONTAINER_DLL_insert(*env_head, *env_tail, env);
1155 }
1156
1157
1158 /**
1159  * Insert @a env into the envelope DLL starting at @a env_head
1160  * Note that @a env must not be in any MQ while this function
1161  * is used with DLLs defined outside of the MQ module.  This
1162  * is just in case some application needs to also manage a
1163  * FIFO of envelopes independent of MQ itself and wants to
1164  * re-use the pointers internal to @a env.  Use with caution.
1165  *
1166  * @param[in|out] env_head of envelope DLL
1167  * @param[in|out] env_tail tail of envelope DLL
1168  * @param[in|out] env element to insert at the tail
1169  */
1170 void
1171 GNUNET_MQ_dll_insert_tail(struct GNUNET_MQ_Envelope **env_head,
1172                           struct GNUNET_MQ_Envelope **env_tail,
1173                           struct GNUNET_MQ_Envelope *env)
1174 {
1175   GNUNET_CONTAINER_DLL_insert_tail(*env_head, *env_tail, env);
1176 }
1177
1178
1179 /**
1180  * Remove @a env from the envelope DLL starting at @a env_head.
1181  * Note that @a env must not be in any MQ while this function
1182  * is used with DLLs defined outside of the MQ module. This
1183  * is just in case some application needs to also manage a
1184  * FIFO of envelopes independent of MQ itself and wants to
1185  * re-use the pointers internal to @a env.  Use with caution.
1186  *
1187  * @param[in|out] env_head of envelope DLL
1188  * @param[in|out] env_tail tail of envelope DLL
1189  * @param[in|out] env element to remove from the DLL
1190  */
1191 void
1192 GNUNET_MQ_dll_remove(struct GNUNET_MQ_Envelope **env_head,
1193                      struct GNUNET_MQ_Envelope **env_tail,
1194                      struct GNUNET_MQ_Envelope *env)
1195 {
1196   GNUNET_CONTAINER_DLL_remove(*env_head, *env_tail, env);
1197 }
1198
1199
1200 /**
1201  * Copy an array of handlers.
1202  *
1203  * Useful if the array has been delared in local memory and needs to be
1204  * persisted for future use.
1205  *
1206  * @param handlers Array of handlers to be copied. Can be NULL (nothing done).
1207  * @return A newly allocated array of handlers.
1208  *         Needs to be freed with #GNUNET_free.
1209  */
1210 struct GNUNET_MQ_MessageHandler *
1211 GNUNET_MQ_copy_handlers(const struct GNUNET_MQ_MessageHandler *handlers)
1212 {
1213   struct GNUNET_MQ_MessageHandler *copy;
1214   unsigned int count;
1215
1216   if (NULL == handlers)
1217     return NULL;
1218
1219   count = GNUNET_MQ_count_handlers(handlers);
1220   copy = GNUNET_new_array(count + 1, struct GNUNET_MQ_MessageHandler);
1221   GNUNET_memcpy(copy,
1222                 handlers,
1223                 count * sizeof(struct GNUNET_MQ_MessageHandler));
1224   return copy;
1225 }
1226
1227
1228 /**
1229  * Copy an array of handlers, appending AGPL handler.
1230  *
1231  * Useful if the array has been delared in local memory and needs to be
1232  * persisted for future use.
1233  *
1234  * @param handlers Array of handlers to be copied. Can be NULL (nothing done).
1235  * @param agpl_handler function to call for AGPL handling
1236  * @param agpl_cls closure for @a agpl_handler
1237  * @return A newly allocated array of handlers.
1238  *         Needs to be freed with #GNUNET_free.
1239  */
1240 struct GNUNET_MQ_MessageHandler *
1241 GNUNET_MQ_copy_handlers2(const struct GNUNET_MQ_MessageHandler *handlers,
1242                          GNUNET_MQ_MessageCallback agpl_handler,
1243                          void *agpl_cls)
1244 {
1245   struct GNUNET_MQ_MessageHandler *copy;
1246   unsigned int count;
1247
1248   if (NULL == handlers)
1249     return NULL;
1250   count = GNUNET_MQ_count_handlers(handlers);
1251   copy = GNUNET_new_array(count + 2, struct GNUNET_MQ_MessageHandler);
1252   GNUNET_memcpy(copy,
1253                 handlers,
1254                 count * sizeof(struct GNUNET_MQ_MessageHandler));
1255   copy[count].mv = NULL;
1256   copy[count].cb = agpl_handler;
1257   copy[count].cls = agpl_cls;
1258   copy[count].type = GNUNET_MESSAGE_TYPE_REQUEST_AGPL;
1259   copy[count].expected_size = sizeof(struct GNUNET_MessageHeader);
1260   return copy;
1261 }
1262
1263
1264 /**
1265  * Count the handlers in a handler array.
1266  *
1267  * @param handlers Array of handlers to be counted.
1268  * @return The number of handlers in the array.
1269  */
1270 unsigned int
1271 GNUNET_MQ_count_handlers(const struct GNUNET_MQ_MessageHandler *handlers)
1272 {
1273   unsigned int i;
1274
1275   if (NULL == handlers)
1276     return 0;
1277
1278   for (i = 0; NULL != handlers[i].cb; i++)
1279     ;
1280
1281   return i;
1282 }
1283
1284
1285 /**
1286  * Convert an `enum GNUNET_MQ_PreferenceType` to a string
1287  *
1288  * @param type the preference type
1289  * @return a string or NULL if invalid
1290  */
1291 const char *
1292 GNUNET_MQ_preference_to_string(enum GNUNET_MQ_PreferenceKind type)
1293 {
1294   switch (type)
1295     {
1296     case GNUNET_MQ_PREFERENCE_NONE:
1297       return "NONE";
1298
1299     case GNUNET_MQ_PREFERENCE_BANDWIDTH:
1300       return "BANDWIDTH";
1301
1302     case GNUNET_MQ_PREFERENCE_LATENCY:
1303       return "LATENCY";
1304
1305     case GNUNET_MQ_PREFERENCE_RELIABILITY:
1306       return "RELIABILITY";
1307     }
1308   ;
1309   return NULL;
1310 }
1311
1312
1313 /* end of mq.c */