added logging
[oweals/gnunet.git] / src / util / mq.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2012-2017 GNUnet e.V.
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18      Boston, MA 02110-1301, USA.
19 */
20
21 /**
22  * @author Florian Dold
23  * @file util/mq.c
24  * @brief general purpose request queue
25  */
26 #include "platform.h"
27 #include "gnunet_util_lib.h"
28
29 #define LOG(kind,...) GNUNET_log_from (kind, "util-mq",__VA_ARGS__)
30
31
32 struct GNUNET_MQ_Envelope
33 {
34   /**
35    * Messages are stored in a linked list.
36    * Each queue has its own list of envelopes.
37    */
38   struct GNUNET_MQ_Envelope *next;
39
40   /**
41    * Messages are stored in a linked list
42    * Each queue has its own list of envelopes.
43    */
44   struct GNUNET_MQ_Envelope *prev;
45
46   /**
47    * Actual allocated message header.
48    * The GNUNET_MQ_Envelope header is allocated at
49    * the end of the message.
50    */
51   struct GNUNET_MessageHeader *mh;
52
53   /**
54    * Queue the message is queued in, NULL if message is not queued.
55    */
56   struct GNUNET_MQ_Handle *parent_queue;
57
58   /**
59    * Called after the message was sent irrevocably.
60    */
61   GNUNET_SCHEDULER_TaskCallback sent_cb;
62
63   /**
64    * Closure for @e sent_cb
65    */
66   void *sent_cls;
67
68   /**
69    * Flags that were set for this envelope by
70    * #GNUNET_MQ_env_set_options().   Only valid if
71    * @e have_custom_options is set.
72    */
73   uint64_t flags;
74
75   /**
76    * Additional options buffer set for this envelope by
77    * #GNUNET_MQ_env_set_options().  Only valid if
78    * @e have_custom_options is set.
79    */
80   const void *extra;
81
82   /**
83    * Did the application call #GNUNET_MQ_env_set_options()?
84    */
85   int have_custom_options;
86 };
87
88
89 /**
90  * Handle to a message queue.
91  */
92 struct GNUNET_MQ_Handle
93 {
94   /**
95    * Handlers array, or NULL if the queue should not receive messages
96    */
97   struct GNUNET_MQ_MessageHandler *handlers;
98
99   /**
100    * Actual implementation of message sending,
101    * called when a message is added
102    */
103   GNUNET_MQ_SendImpl send_impl;
104
105   /**
106    * Implementation-dependent queue destruction function
107    */
108   GNUNET_MQ_DestroyImpl destroy_impl;
109
110   /**
111    * Implementation-dependent send cancel function
112    */
113   GNUNET_MQ_CancelImpl cancel_impl;
114
115   /**
116    * Implementation-specific state
117    */
118   void *impl_state;
119
120   /**
121    * Callback will be called when an error occurs.
122    */
123   GNUNET_MQ_ErrorHandler error_handler;
124
125   /**
126    * Closure for the error handler.
127    */
128   void *error_handler_cls;
129
130   /**
131    * Task to asynchronously run #impl_send_continue().
132    */
133   struct GNUNET_SCHEDULER_Task *send_task;
134
135   /**
136    * Linked list of messages pending to be sent
137    */
138   struct GNUNET_MQ_Envelope *envelope_head;
139
140   /**
141    * Linked list of messages pending to be sent
142    */
143   struct GNUNET_MQ_Envelope *envelope_tail;
144
145   /**
146    * Message that is currently scheduled to be
147    * sent. Not the head of the message queue, as the implementation
148    * needs to know if sending has been already scheduled or not.
149    */
150   struct GNUNET_MQ_Envelope *current_envelope;
151
152   /**
153    * Map of associations, lazily allocated
154    */
155   struct GNUNET_CONTAINER_MultiHashMap32 *assoc_map;
156
157   /**
158    * Functions to call on queue destruction; kept in a DLL.
159    */
160   struct GNUNET_MQ_DestroyNotificationHandle *dnh_head;
161
162   /**
163    * Functions to call on queue destruction; kept in a DLL.
164    */
165   struct GNUNET_MQ_DestroyNotificationHandle *dnh_tail;
166
167   /**
168    * Additional options buffer set for this queue by
169    * #GNUNET_MQ_set_options().  Default is 0.
170    */
171   const void *default_extra;
172
173   /**
174    * Flags that were set for this queue by
175    * #GNUNET_MQ_set_options().   Default is 0.
176    */
177   uint64_t default_flags;
178
179   /**
180    * Next id that should be used for the @e assoc_map,
181    * initialized lazily to a random value together with
182    * @e assoc_map
183    */
184   uint32_t assoc_id;
185
186   /**
187    * Number of entries we have in the envelope-DLL.
188    */
189   unsigned int queue_length;
190
191   /**
192    * #GNUNET_YES if GNUNET_MQ_impl_evacuate was called.
193    * FIXME: is this dead?
194    */
195   int evacuate_called;
196
197   /**
198    * #GNUNET_YES if GNUNET_MQ_impl_send_in_flight() was called.
199    */
200   int in_flight;
201 };
202
203
204 /**
205  * Call the message message handler that was registered
206  * for the type of the given message in the given message queue.
207  *
208  * This function is indended to be used for the implementation
209  * of message queues.
210  *
211  * @param mq message queue with the handlers
212  * @param mh message to dispatch
213  */
214 void
215 GNUNET_MQ_inject_message (struct GNUNET_MQ_Handle *mq,
216                           const struct GNUNET_MessageHeader *mh)
217 {
218   const struct GNUNET_MQ_MessageHandler *handler;
219   int handled = GNUNET_NO;
220   uint16_t msize = ntohs (mh->size);
221   uint16_t mtype = ntohs (mh->type);
222
223   LOG (GNUNET_ERROR_TYPE_DEBUG,
224        "Queue %p received message of type %u and size %u\n",
225        mq,
226        mtype,
227        msize);
228
229   if (NULL == mq->handlers)
230     goto done;
231   for (handler = mq->handlers; NULL != handler->cb; handler++)
232   {
233     if (handler->type == mtype)
234     {
235       handled = GNUNET_YES;
236       if ( (handler->expected_size > msize) ||
237            ( (handler->expected_size != msize) &&
238              (NULL == handler->mv) ) )
239       {
240         /* Too small, or not an exact size and
241            no 'mv' handler to check rest */
242         LOG (GNUNET_ERROR_TYPE_ERROR,
243              "Received malformed message of type %u\n",
244              (unsigned int) handler->type);
245         GNUNET_MQ_inject_error (mq,
246                                 GNUNET_MQ_ERROR_MALFORMED);
247         break;
248       }
249       if ( (NULL == handler->mv) ||
250            (GNUNET_OK ==
251             handler->mv (handler->cls, mh)) )
252       {
253         /* message well-formed, pass to handler */
254         handler->cb (handler->cls, mh);
255       }
256       else
257       {
258         /* Message rejected by check routine */
259         LOG (GNUNET_ERROR_TYPE_ERROR,
260              "Received malformed message of type %u\n",
261              (unsigned int) handler->type);
262         GNUNET_MQ_inject_error (mq,
263                                 GNUNET_MQ_ERROR_MALFORMED);
264       }
265       break;
266     }
267   }
268  done:
269   if (GNUNET_NO == handled)
270     LOG (GNUNET_ERROR_TYPE_INFO,
271          "No handler for message of type %u and size %u\n",
272          mtype, msize);
273 }
274
275
276 /**
277  * Call the error handler of a message queue with the given
278  * error code.  If there is no error handler, log a warning.
279  *
280  * This function is intended to be used by the implementation
281  * of message queues.
282  *
283  * @param mq message queue
284  * @param error the error type
285  */
286 void
287 GNUNET_MQ_inject_error (struct GNUNET_MQ_Handle *mq,
288                         enum GNUNET_MQ_Error error)
289 {
290   if (NULL == mq->error_handler)
291   {
292     LOG (GNUNET_ERROR_TYPE_WARNING,
293          "Got error %d, but no handler installed\n",
294          (int) error);
295     return;
296   }
297   mq->error_handler (mq->error_handler_cls,
298                      error);
299 }
300
301
302 /**
303  * Discard the message queue message, free all
304  * allocated resources. Must be called in the event
305  * that a message is created but should not actually be sent.
306  *
307  * @param mqm the message to discard
308  */
309 void
310 GNUNET_MQ_discard (struct GNUNET_MQ_Envelope *ev)
311 {
312   GNUNET_assert (NULL == ev->parent_queue);
313   GNUNET_free (ev);
314 }
315
316
317 /**
318  * Obtain the current length of the message queue.
319  *
320  * @param mq queue to inspect
321  * @return number of queued, non-transmitted messages
322  */
323 unsigned int
324 GNUNET_MQ_get_length (struct GNUNET_MQ_Handle *mq)
325 {
326   if (GNUNET_YES != mq->in_flight)
327   {
328     return mq->queue_length;
329   }
330   return mq->queue_length - 1;
331 }
332
333
334 /**
335  * Send a message with the given message queue.
336  * May only be called once per message.
337  *
338  * @param mq message queue
339  * @param ev the envelope with the message to send.
340  */
341 void
342 GNUNET_MQ_send (struct GNUNET_MQ_Handle *mq,
343                 struct GNUNET_MQ_Envelope *ev)
344 {
345   GNUNET_assert (NULL != mq);
346   GNUNET_assert (NULL == ev->parent_queue);
347
348   mq->queue_length++;
349   GNUNET_break (mq->queue_length < 10000); /* This would seem like a bug... */
350   ev->parent_queue = mq;
351   /* is the implementation busy? queue it! */
352   if ( (NULL != mq->current_envelope) ||
353        (NULL != mq->send_task) )
354   {
355     GNUNET_CONTAINER_DLL_insert_tail (mq->envelope_head,
356                                       mq->envelope_tail,
357                                       ev);
358     return;
359   }
360   GNUNET_assert (NULL == mq->envelope_head);
361   mq->current_envelope = ev;
362
363   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
364               "mq: sending message of type %u, queue empty (MQ: %p)\n",
365               ntohs(ev->mh->type),
366               mq);
367
368   mq->send_impl (mq,
369                  ev->mh,
370                  mq->impl_state);
371 }
372
373
374 /**
375  * Remove the first envelope that has not yet been sent from the message
376  * queue and return it.
377  *
378  * @param mq queue to remove envelope from
379  * @return NULL if queue is empty (or has no envelope that is not under transmission)
380  */
381 struct GNUNET_MQ_Envelope *
382 GNUNET_MQ_unsent_head (struct GNUNET_MQ_Handle *mq)
383 {
384   struct GNUNET_MQ_Envelope *env;
385
386   env = mq->envelope_head;
387   GNUNET_CONTAINER_DLL_remove (mq->envelope_head,
388                                mq->envelope_tail,
389                                env);
390   mq->queue_length--;
391   env->parent_queue = NULL;
392   return env;
393 }
394
395
396 /**
397  * Function to copy an envelope.  The envelope must not yet
398  * be in any queue or have any options or callbacks set.
399  *
400  * @param env envelope to copy
401  * @return copy of @a env
402  */
403 struct GNUNET_MQ_Envelope *
404 GNUNET_MQ_env_copy (struct GNUNET_MQ_Envelope *env)
405 {
406   GNUNET_assert (NULL == env->next);
407   GNUNET_assert (NULL == env->parent_queue);
408   GNUNET_assert (NULL == env->sent_cb);
409   GNUNET_assert (GNUNET_NO == env->have_custom_options);
410   return GNUNET_MQ_msg_copy (env->mh);
411 }
412
413
414 /**
415  * Send a copy of a message with the given message queue.
416  * Can be called repeatedly on the same envelope.
417  *
418  * @param mq message queue
419  * @param ev the envelope with the message to send.
420  */
421 void
422 GNUNET_MQ_send_copy (struct GNUNET_MQ_Handle *mq,
423                      const struct GNUNET_MQ_Envelope *ev)
424 {
425   struct GNUNET_MQ_Envelope *env;
426   uint16_t msize;
427
428   msize = ntohs (ev->mh->size);
429   env = GNUNET_malloc (sizeof (struct GNUNET_MQ_Envelope) +
430                        msize);
431   env->mh = (struct GNUNET_MessageHeader *) &env[1];
432   env->sent_cb = ev->sent_cb;
433   env->sent_cls = ev->sent_cls;
434   GNUNET_memcpy (&env[1],
435           ev->mh,
436           msize);
437   GNUNET_MQ_send (mq,
438                   env);
439 }
440
441
442 /**
443  * Task run to call the send implementation for the next queued
444  * message, if any.  Only useful for implementing message queues,
445  * results in undefined behavior if not used carefully.
446  *
447  * @param cls message queue to send the next message with
448  */
449 static void
450 impl_send_continue (void *cls)
451 {
452   struct GNUNET_MQ_Handle *mq = cls;
453
454   mq->send_task = NULL;
455   /* call is only valid if we're actually currently sending
456    * a message */
457   if (NULL == mq->envelope_head)
458     return;
459   mq->current_envelope = mq->envelope_head;
460   GNUNET_CONTAINER_DLL_remove (mq->envelope_head,
461                                mq->envelope_tail,
462                                mq->current_envelope);
463
464   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
465               "mq: sending message of type %u from queue\n",
466               ntohs(mq->current_envelope->mh->type));
467
468   mq->send_impl (mq,
469                  mq->current_envelope->mh,
470                  mq->impl_state);
471 }
472
473
474 /**
475  * Call the send implementation for the next queued message, if any.
476  * Only useful for implementing message queues, results in undefined
477  * behavior if not used carefully.
478  *
479  * @param mq message queue to send the next message with
480  */
481 void
482 GNUNET_MQ_impl_send_continue (struct GNUNET_MQ_Handle *mq)
483 {
484   struct GNUNET_MQ_Envelope *current_envelope;
485   GNUNET_SCHEDULER_TaskCallback cb;
486
487   GNUNET_assert (0 < mq->queue_length);
488   mq->queue_length--;
489   mq->in_flight = GNUNET_NO;
490   current_envelope = mq->current_envelope;
491   current_envelope->parent_queue = NULL;
492   mq->current_envelope = NULL;
493   GNUNET_assert (NULL == mq->send_task);
494   mq->send_task = GNUNET_SCHEDULER_add_now (&impl_send_continue,
495                                             mq);
496   if (NULL != (cb = current_envelope->sent_cb))
497   {
498     current_envelope->sent_cb = NULL;
499     cb (current_envelope->sent_cls);
500   }
501   GNUNET_free (current_envelope);
502 }
503
504
505 /**
506  * Call the send notification for the current message, but do not
507  * try to send the next message until #GNUNET_MQ_impl_send_continue
508  * is called.
509  *
510  * Only useful for implementing message queues, results in undefined
511  * behavior if not used carefully.
512  *
513  * @param mq message queue to send the next message with
514  */
515 void
516 GNUNET_MQ_impl_send_in_flight (struct GNUNET_MQ_Handle *mq)
517 {
518   struct GNUNET_MQ_Envelope *current_envelope;
519   GNUNET_SCHEDULER_TaskCallback cb;
520
521   mq->in_flight = GNUNET_YES;
522   /* call is only valid if we're actually currently sending
523    * a message */
524   current_envelope = mq->current_envelope;
525   GNUNET_assert (NULL != current_envelope);
526   /* can't call cancel from now on anymore */
527   current_envelope->parent_queue = NULL;
528   if (NULL != (cb = current_envelope->sent_cb))
529   {
530     current_envelope->sent_cb = NULL;
531     cb (current_envelope->sent_cls);
532   }
533 }
534
535
536 /**
537  * Create a message queue for the specified handlers.
538  *
539  * @param send function the implements sending messages
540  * @param destroy function that implements destroying the queue
541  * @param cancel function that implements canceling a message
542  * @param impl_state for the queue, passed to 'send' and 'destroy'
543  * @param handlers array of message handlers
544  * @param error_handler handler for read and write errors
545  * @param error_handler_cls closure for @a error_handler
546  * @return a new message queue
547  */
548 struct GNUNET_MQ_Handle *
549 GNUNET_MQ_queue_for_callbacks (GNUNET_MQ_SendImpl send,
550                                GNUNET_MQ_DestroyImpl destroy,
551                                GNUNET_MQ_CancelImpl cancel,
552                                void *impl_state,
553                                const struct GNUNET_MQ_MessageHandler *handlers,
554                                GNUNET_MQ_ErrorHandler error_handler,
555                                void *error_handler_cls)
556 {
557   struct GNUNET_MQ_Handle *mq;
558
559   mq = GNUNET_new (struct GNUNET_MQ_Handle);
560   mq->send_impl = send;
561   mq->destroy_impl = destroy;
562   mq->cancel_impl = cancel;
563   mq->handlers = GNUNET_MQ_copy_handlers (handlers);
564   mq->error_handler = error_handler;
565   mq->error_handler_cls = error_handler_cls;
566   mq->impl_state = impl_state;
567
568   return mq;
569 }
570
571
572 /**
573  * Change the closure argument in all of the `handlers` of the
574  * @a mq.
575  *
576  * @param mq to modify
577  * @param handlers_cls new closure to use
578  */
579 void
580 GNUNET_MQ_set_handlers_closure (struct GNUNET_MQ_Handle *mq,
581                                 void *handlers_cls)
582 {
583   unsigned int i;
584
585   if (NULL == mq->handlers)
586     return;
587   for (i=0;NULL != mq->handlers[i].cb; i++)
588     mq->handlers[i].cls = handlers_cls;
589 }
590
591
592 /**
593  * Get the message that should currently be sent.
594  * Fails if there is no current message.
595  * Only useful for implementing message queues,
596  * results in undefined behavior if not used carefully.
597  *
598  * @param mq message queue with the current message
599  * @return message to send, never NULL
600  */
601 const struct GNUNET_MessageHeader *
602 GNUNET_MQ_impl_current (struct GNUNET_MQ_Handle *mq)
603 {
604   GNUNET_assert (NULL != mq->current_envelope);
605   GNUNET_assert (NULL != mq->current_envelope->mh);
606   return mq->current_envelope->mh;
607 }
608
609
610 /**
611  * Get the implementation state associated with the
612  * message queue.
613  *
614  * While the GNUNET_MQ_Impl* callbacks receive the
615  * implementation state, continuations that are scheduled
616  * by the implementation function often only have one closure
617  * argument, with this function it is possible to get at the
618  * implementation state when only passing the GNUNET_MQ_Handle
619  * as closure.
620  *
621  * @param mq message queue with the current message
622  * @return message to send, never NULL
623  */
624 void *
625 GNUNET_MQ_impl_state (struct GNUNET_MQ_Handle *mq)
626 {
627   return mq->impl_state;
628 }
629
630
631 struct GNUNET_MQ_Envelope *
632 GNUNET_MQ_msg_ (struct GNUNET_MessageHeader **mhp,
633                 uint16_t size,
634                 uint16_t type)
635 {
636   struct GNUNET_MQ_Envelope *ev;
637
638   ev = GNUNET_malloc (size + sizeof (struct GNUNET_MQ_Envelope));
639   ev->mh = (struct GNUNET_MessageHeader *) &ev[1];
640   ev->mh->size = htons (size);
641   ev->mh->type = htons (type);
642   if (NULL != mhp)
643     *mhp = ev->mh;
644   return ev;
645 }
646
647
648 /**
649  * Create a new envelope by copying an existing message.
650  *
651  * @param hdr header of the message to copy
652  * @return envelope containing @a hdr
653  */
654 struct GNUNET_MQ_Envelope *
655 GNUNET_MQ_msg_copy (const struct GNUNET_MessageHeader *hdr)
656 {
657   struct GNUNET_MQ_Envelope *mqm;
658   uint16_t size = ntohs (hdr->size);
659
660   mqm = GNUNET_malloc (sizeof (*mqm) + size);
661   mqm->mh = (struct GNUNET_MessageHeader *) &mqm[1];
662   GNUNET_memcpy (mqm->mh,
663           hdr,
664           size);
665   return mqm;
666 }
667
668
669 /**
670  * Implementation of the #GNUNET_MQ_msg_nested_mh macro.
671  *
672  * @param mhp pointer to the message header pointer that will be changed to allocate at
673  *        the newly allocated space for the message.
674  * @param base_size size of the data before the nested message
675  * @param type type of the message in the envelope
676  * @param nested_mh the message to append to the message after base_size
677  */
678 struct GNUNET_MQ_Envelope *
679 GNUNET_MQ_msg_nested_mh_ (struct GNUNET_MessageHeader **mhp,
680                           uint16_t base_size,
681                           uint16_t type,
682                           const struct GNUNET_MessageHeader *nested_mh)
683 {
684   struct GNUNET_MQ_Envelope *mqm;
685   uint16_t size;
686
687   if (NULL == nested_mh)
688     return GNUNET_MQ_msg_ (mhp, base_size, type);
689
690   size = base_size + ntohs (nested_mh->size);
691
692   /* check for uint16_t overflow */
693   if (size < base_size)
694     return NULL;
695
696   mqm = GNUNET_MQ_msg_ (mhp, size, type);
697   GNUNET_memcpy ((char *) mqm->mh + base_size,
698                  nested_mh,
699                  ntohs (nested_mh->size));
700
701   return mqm;
702 }
703
704
705 /**
706  * Associate the assoc_data in mq with a unique request id.
707  *
708  * @param mq message queue, id will be unique for the queue
709  * @param assoc_data to associate
710  */
711 uint32_t
712 GNUNET_MQ_assoc_add (struct GNUNET_MQ_Handle *mq,
713                      void *assoc_data)
714 {
715   uint32_t id;
716
717   if (NULL == mq->assoc_map)
718   {
719     mq->assoc_map = GNUNET_CONTAINER_multihashmap32_create (8);
720     mq->assoc_id = 1;
721   }
722   id = mq->assoc_id++;
723   GNUNET_assert (GNUNET_OK ==
724                  GNUNET_CONTAINER_multihashmap32_put (mq->assoc_map,
725                                                       id,
726                                                       assoc_data,
727                                                       GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
728   return id;
729 }
730
731
732 /**
733  * Get the data associated with a @a request_id in a queue
734  *
735  * @param mq the message queue with the association
736  * @param request_id the request id we are interested in
737  * @return the associated data
738  */
739 void *
740 GNUNET_MQ_assoc_get (struct GNUNET_MQ_Handle *mq,
741                      uint32_t request_id)
742 {
743   if (NULL == mq->assoc_map)
744     return NULL;
745   return GNUNET_CONTAINER_multihashmap32_get (mq->assoc_map,
746                                               request_id);
747 }
748
749
750 /**
751  * Remove the association for a @a request_id
752  *
753  * @param mq the message queue with the association
754  * @param request_id the request id we want to remove
755  * @return the associated data
756  */
757 void *
758 GNUNET_MQ_assoc_remove (struct GNUNET_MQ_Handle *mq,
759                         uint32_t request_id)
760 {
761   void *val;
762
763   if (NULL == mq->assoc_map)
764     return NULL;
765   val = GNUNET_CONTAINER_multihashmap32_get (mq->assoc_map,
766                                              request_id);
767   GNUNET_CONTAINER_multihashmap32_remove_all (mq->assoc_map,
768                                               request_id);
769   return val;
770 }
771
772
773 /**
774  * Call a callback once the envelope has been sent, that is,
775  * sending it can not be canceled anymore.
776  * There can be only one notify sent callback per envelope.
777  *
778  * @param ev message to call the notify callback for
779  * @param cb the notify callback
780  * @param cb_cls closure for the callback
781  */
782 void
783 GNUNET_MQ_notify_sent (struct GNUNET_MQ_Envelope *ev,
784                        GNUNET_SCHEDULER_TaskCallback cb,
785                        void *cb_cls)
786 {
787   GNUNET_assert (NULL == ev->sent_cb);
788   ev->sent_cb = cb;
789   ev->sent_cls = cb_cls;
790 }
791
792
793 /**
794  * Handle we return for callbacks registered to be
795  * notified when #GNUNET_MQ_destroy() is called on a queue.
796  */
797 struct GNUNET_MQ_DestroyNotificationHandle
798 {
799   /**
800    * Kept in a DLL.
801    */
802   struct GNUNET_MQ_DestroyNotificationHandle *prev;
803
804   /**
805    * Kept in a DLL.
806    */
807   struct GNUNET_MQ_DestroyNotificationHandle *next;
808
809   /**
810    * Queue to notify about.
811    */
812   struct GNUNET_MQ_Handle *mq;
813
814   /**
815    * Function to call.
816    */
817   GNUNET_SCHEDULER_TaskCallback cb;
818
819   /**
820    * Closure for @e cb.
821    */
822   void *cb_cls;
823 };
824
825
826 /**
827  * Destroy the message queue.
828  *
829  * @param mq message queue to destroy
830  */
831 void
832 GNUNET_MQ_destroy (struct GNUNET_MQ_Handle *mq)
833 {
834   struct GNUNET_MQ_DestroyNotificationHandle *dnh;
835
836   if (NULL != mq->destroy_impl)
837   {
838     mq->destroy_impl (mq, mq->impl_state);
839   }
840   if (NULL != mq->send_task)
841   {
842     GNUNET_SCHEDULER_cancel (mq->send_task);
843     mq->send_task = NULL;
844   }
845   while (NULL != mq->envelope_head)
846   {
847     struct GNUNET_MQ_Envelope *ev;
848
849     ev = mq->envelope_head;
850     ev->parent_queue = NULL;
851     GNUNET_CONTAINER_DLL_remove (mq->envelope_head,
852                                  mq->envelope_tail,
853                                  ev);
854     GNUNET_assert (0 < mq->queue_length);
855     mq->queue_length--;
856     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
857                 "MQ destroy drops message of type %u\n",
858                 ntohs (ev->mh->type));
859     GNUNET_MQ_discard (ev);
860   }
861   if (NULL != mq->current_envelope)
862   {
863     /* we can only discard envelopes that
864      * are not queued! */
865     mq->current_envelope->parent_queue = NULL;
866     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
867                 "MQ destroy drops current message of type %u\n",
868                 ntohs (mq->current_envelope->mh->type));
869     GNUNET_MQ_discard (mq->current_envelope);
870     mq->current_envelope = NULL;
871     GNUNET_assert (0 < mq->queue_length);
872     mq->queue_length--;
873   }
874   GNUNET_assert (0 == mq->queue_length);
875   while (NULL != (dnh = mq->dnh_head))
876   {
877     dnh->cb (dnh->cb_cls);
878     GNUNET_MQ_destroy_notify_cancel (dnh);
879   }
880   if (NULL != mq->assoc_map)
881   {
882     GNUNET_CONTAINER_multihashmap32_destroy (mq->assoc_map);
883     mq->assoc_map = NULL;
884   }
885   GNUNET_free_non_null (mq->handlers);
886   GNUNET_free (mq);
887 }
888
889
890 const struct GNUNET_MessageHeader *
891 GNUNET_MQ_extract_nested_mh_ (const struct GNUNET_MessageHeader *mh,
892                               uint16_t base_size)
893 {
894   uint16_t whole_size;
895   uint16_t nested_size;
896   const struct GNUNET_MessageHeader *nested_msg;
897
898   whole_size = ntohs (mh->size);
899   GNUNET_assert (whole_size >= base_size);
900   nested_size = whole_size - base_size;
901   if (0 == nested_size)
902     return NULL;
903   if (nested_size < sizeof (struct GNUNET_MessageHeader))
904   {
905     GNUNET_break_op (0);
906     return NULL;
907   }
908   nested_msg = (const struct GNUNET_MessageHeader *) ((char *) mh + base_size);
909   if (ntohs (nested_msg->size) != nested_size)
910   {
911     GNUNET_break_op (0);
912     return NULL;
913   }
914   return nested_msg;
915 }
916
917
918 /**
919  * Cancel sending the message. Message must have been sent with
920  * #GNUNET_MQ_send before.  May not be called after the notify sent
921  * callback has been called
922  *
923  * @param ev queued envelope to cancel
924  */
925 void
926 GNUNET_MQ_send_cancel (struct GNUNET_MQ_Envelope *ev)
927 {
928   struct GNUNET_MQ_Handle *mq = ev->parent_queue;
929
930   GNUNET_assert (NULL != mq);
931   GNUNET_assert (NULL != mq->cancel_impl);
932
933   mq->evacuate_called = GNUNET_NO;
934
935   if (mq->current_envelope == ev)
936   {
937     /* complex case, we already started with transmitting
938        the message using the callbacks. */
939     GNUNET_assert (0 < mq->queue_length);
940     mq->queue_length--;
941     mq->cancel_impl (mq,
942                      mq->impl_state);
943     /* continue sending the next message, if any */
944     mq->current_envelope = mq->envelope_head;
945     if (NULL != mq->current_envelope)
946     {
947       GNUNET_CONTAINER_DLL_remove (mq->envelope_head,
948                                    mq->envelope_tail,
949                                    mq->current_envelope);
950
951       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
952                   "mq: sending canceled message of type %u queue\n",
953                   ntohs(ev->mh->type));
954
955       mq->send_impl (mq,
956                      mq->current_envelope->mh,
957                      mq->impl_state);
958     }
959   }
960   else
961   {
962     /* simple case, message is still waiting in the queue */
963     GNUNET_CONTAINER_DLL_remove (mq->envelope_head,
964                                  mq->envelope_tail,
965                                  ev);
966     GNUNET_assert (0 < mq->queue_length);
967     mq->queue_length--;
968   }
969
970   if (GNUNET_YES != mq->evacuate_called)
971   {
972     ev->parent_queue = NULL;
973     ev->mh = NULL;
974     /* also frees ev */
975     GNUNET_free (ev);
976   }
977 }
978
979
980 /**
981  * Function to obtain the current envelope
982  * from within #GNUNET_MQ_SendImpl implementations.
983  *
984  * @param mq message queue to interrogate
985  * @return the current envelope
986  */
987 struct GNUNET_MQ_Envelope *
988 GNUNET_MQ_get_current_envelope (struct GNUNET_MQ_Handle *mq)
989 {
990   return mq->current_envelope;
991 }
992
993
994 /**
995  * Function to obtain the last envelope in the queue.
996  *
997  * @param mq message queue to interrogate
998  * @return the last envelope in the queue
999  */
1000 struct GNUNET_MQ_Envelope *
1001 GNUNET_MQ_get_last_envelope (struct GNUNET_MQ_Handle *mq)
1002 {
1003   if (NULL != mq->envelope_tail)
1004     return mq->envelope_tail;
1005
1006   return mq->current_envelope;
1007 }
1008
1009
1010 /**
1011  * Set application-specific options for this envelope.
1012  * Overrides the options set for the queue with
1013  * #GNUNET_MQ_set_options() for this message only.
1014  *
1015  * @param env message to set options for
1016  * @param flags flags to use (meaning is queue-specific)
1017  * @param extra additional buffer for further data (also queue-specific)
1018  */
1019 void
1020 GNUNET_MQ_env_set_options (struct GNUNET_MQ_Envelope *env,
1021                            uint64_t flags,
1022                            const void *extra)
1023 {
1024   env->flags = flags;
1025   env->extra = extra;
1026   env->have_custom_options = GNUNET_YES;
1027 }
1028
1029
1030 /**
1031  * Get application-specific options for this envelope.
1032  *
1033  * @param env message to set options for
1034  * @param[out] flags set to flags to use (meaning is queue-specific)
1035  * @return extra additional buffer for further data (also queue-specific)
1036  */
1037 const void *
1038 GNUNET_MQ_env_get_options (struct GNUNET_MQ_Envelope *env,
1039                            uint64_t *flags)
1040 {
1041   struct GNUNET_MQ_Handle *mq = env->parent_queue;
1042
1043   if (GNUNET_YES == env->have_custom_options)
1044   {
1045     *flags = env->flags;
1046     return env->extra;
1047   }
1048   if (NULL == mq)
1049   {
1050     *flags = 0;
1051     return NULL;
1052   }
1053   *flags = mq->default_flags;
1054   return mq->default_extra;
1055 }
1056
1057
1058 /**
1059  * Set application-specific options for this queue.
1060  *
1061  * @param mq message queue to set options for
1062  * @param flags flags to use (meaning is queue-specific)
1063  * @param extra additional buffer for further data (also queue-specific)
1064  */
1065 void
1066 GNUNET_MQ_set_options (struct GNUNET_MQ_Handle *mq,
1067                        uint64_t flags,
1068                        const void *extra)
1069 {
1070   mq->default_flags = flags;
1071   mq->default_extra = extra;
1072 }
1073
1074
1075 /**
1076  * Register function to be called whenever @a mq is being
1077  * destroyed.
1078  *
1079  * @param mq message queue to watch
1080  * @param cb function to call on @a mq destruction
1081  * @param cb_cls closure for @a cb
1082  * @return handle for #GNUNET_MQ_destroy_notify_cancel().
1083  */
1084 struct GNUNET_MQ_DestroyNotificationHandle *
1085 GNUNET_MQ_destroy_notify (struct GNUNET_MQ_Handle *mq,
1086                           GNUNET_SCHEDULER_TaskCallback cb,
1087                           void *cb_cls)
1088 {
1089   struct GNUNET_MQ_DestroyNotificationHandle *dnh;
1090
1091   dnh = GNUNET_new (struct GNUNET_MQ_DestroyNotificationHandle);
1092   dnh->mq = mq;
1093   dnh->cb = cb;
1094   dnh->cb_cls = cb_cls;
1095   GNUNET_CONTAINER_DLL_insert (mq->dnh_head,
1096                                mq->dnh_tail,
1097                                dnh);
1098   return dnh;
1099 }
1100
1101
1102 /**
1103  * Cancel registration from #GNUNET_MQ_destroy_notify().
1104  *
1105  * @param dnh handle for registration to cancel
1106  */
1107 void
1108 GNUNET_MQ_destroy_notify_cancel (struct GNUNET_MQ_DestroyNotificationHandle *dnh)
1109 {
1110   struct GNUNET_MQ_Handle *mq = dnh->mq;
1111
1112   GNUNET_CONTAINER_DLL_remove (mq->dnh_head,
1113                                mq->dnh_tail,
1114                                dnh);
1115   GNUNET_free (dnh);
1116 }
1117
1118
1119 /**
1120  * Insert @a env into the envelope DLL starting at @a env_head
1121  * Note that @a env must not be in any MQ while this function
1122  * is used with DLLs defined outside of the MQ module.  This
1123  * is just in case some application needs to also manage a
1124  * FIFO of envelopes independent of MQ itself and wants to
1125  * re-use the pointers internal to @a env.  Use with caution.
1126  *
1127  * @param[in|out] env_head of envelope DLL
1128  * @param[in|out] env_tail tail of envelope DLL
1129  * @param[in|out] env element to insert at the tail
1130  */
1131 void
1132 GNUNET_MQ_dll_insert_tail (struct GNUNET_MQ_Envelope **env_head,
1133                            struct GNUNET_MQ_Envelope **env_tail,
1134                            struct GNUNET_MQ_Envelope *env)
1135 {
1136   GNUNET_CONTAINER_DLL_insert_tail (*env_head,
1137                                     *env_tail,
1138                                     env);
1139 }
1140
1141
1142 /**
1143  * Remove @a env from the envelope DLL starting at @a env_head.
1144  * Note that @a env must not be in any MQ while this function
1145  * is used with DLLs defined outside of the MQ module. This
1146  * is just in case some application needs to also manage a
1147  * FIFO of envelopes independent of MQ itself and wants to
1148  * re-use the pointers internal to @a env.  Use with caution.
1149  *
1150  * @param[in|out] env_head of envelope DLL
1151  * @param[in|out] env_tail tail of envelope DLL
1152  * @param[in|out] env element to remove from the DLL
1153  */
1154 void
1155 GNUNET_MQ_dll_remove (struct GNUNET_MQ_Envelope **env_head,
1156                       struct GNUNET_MQ_Envelope **env_tail,
1157                       struct GNUNET_MQ_Envelope *env)
1158 {
1159   GNUNET_CONTAINER_DLL_remove (*env_head,
1160                                *env_tail,
1161                                env);
1162 }
1163
1164
1165 /**
1166  * Copy an array of handlers.
1167  *
1168  * Useful if the array has been delared in local memory and needs to be
1169  * persisted for future use.
1170  *
1171  * @param handlers Array of handlers to be copied. Can be NULL (nothing done).
1172  * @return A newly allocated array of handlers.
1173  *         Needs to be freed with #GNUNET_free.
1174  */
1175 struct GNUNET_MQ_MessageHandler *
1176 GNUNET_MQ_copy_handlers (const struct GNUNET_MQ_MessageHandler *handlers)
1177 {
1178   struct GNUNET_MQ_MessageHandler *copy;
1179   unsigned int count;
1180
1181   if (NULL == handlers)
1182     return NULL;
1183
1184   count = GNUNET_MQ_count_handlers (handlers);
1185   copy = GNUNET_new_array (count + 1,
1186                            struct GNUNET_MQ_MessageHandler);
1187   GNUNET_memcpy (copy,
1188                  handlers,
1189                  count * sizeof (struct GNUNET_MQ_MessageHandler));
1190   return copy;
1191 }
1192
1193
1194 /**
1195  * Count the handlers in a handler array.
1196  *
1197  * @param handlers Array of handlers to be counted.
1198  * @return The number of handlers in the array.
1199  */
1200 unsigned int
1201 GNUNET_MQ_count_handlers (const struct GNUNET_MQ_MessageHandler *handlers)
1202 {
1203   unsigned int i;
1204
1205   if (NULL == handlers)
1206     return 0;
1207
1208   for (i=0; NULL != handlers[i].cb; i++) ;
1209
1210   return i;
1211 }
1212
1213
1214
1215 /* end of mq.c */