glitch in the license text detected by hyazinthe, thank you!
[oweals/gnunet.git] / src / util / mq.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2012-2017 GNUnet e.V.
4
5      GNUnet is free software: you can redistribute it and/or modify it
6      under the terms of the GNU Affero General Public License as published
7      by the Free Software Foundation, either version 3 of the License,
8      or (at your option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      Affero General Public License for more details.
14 */
15
16 /**
17  * @author Florian Dold
18  * @file util/mq.c
19  * @brief general purpose request queue
20  */
21 #include "platform.h"
22 #include "gnunet_util_lib.h"
23
24 #define LOG(kind,...) GNUNET_log_from (kind, "util-mq",__VA_ARGS__)
25
26
27 struct GNUNET_MQ_Envelope
28 {
29   /**
30    * Messages are stored in a linked list.
31    * Each queue has its own list of envelopes.
32    */
33   struct GNUNET_MQ_Envelope *next;
34
35   /**
36    * Messages are stored in a linked list
37    * Each queue has its own list of envelopes.
38    */
39   struct GNUNET_MQ_Envelope *prev;
40
41   /**
42    * Actual allocated message header.
43    * The GNUNET_MQ_Envelope header is allocated at
44    * the end of the message.
45    */
46   struct GNUNET_MessageHeader *mh;
47
48   /**
49    * Queue the message is queued in, NULL if message is not queued.
50    */
51   struct GNUNET_MQ_Handle *parent_queue;
52
53   /**
54    * Called after the message was sent irrevocably.
55    */
56   GNUNET_SCHEDULER_TaskCallback sent_cb;
57
58   /**
59    * Closure for @e send_cb
60    */
61   void *sent_cls;
62
63   /**
64    * Flags that were set for this envelope by
65    * #GNUNET_MQ_env_set_options().   Only valid if
66    * @e have_custom_options is set.
67    */
68   uint64_t flags;
69
70   /**
71    * Additional options buffer set for this envelope by
72    * #GNUNET_MQ_env_set_options().  Only valid if
73    * @e have_custom_options is set.
74    */
75   const void *extra;
76
77   /**
78    * Did the application call #GNUNET_MQ_env_set_options()?
79    */
80   int have_custom_options;
81 };
82
83
84 /**
85  * Handle to a message queue.
86  */
87 struct GNUNET_MQ_Handle
88 {
89   /**
90    * Handlers array, or NULL if the queue should not receive messages
91    */
92   struct GNUNET_MQ_MessageHandler *handlers;
93
94   /**
95    * Actual implementation of message sending,
96    * called when a message is added
97    */
98   GNUNET_MQ_SendImpl send_impl;
99
100   /**
101    * Implementation-dependent queue destruction function
102    */
103   GNUNET_MQ_DestroyImpl destroy_impl;
104
105   /**
106    * Implementation-dependent send cancel function
107    */
108   GNUNET_MQ_CancelImpl cancel_impl;
109
110   /**
111    * Implementation-specific state
112    */
113   void *impl_state;
114
115   /**
116    * Callback will be called when an error occurs.
117    */
118   GNUNET_MQ_ErrorHandler error_handler;
119
120   /**
121    * Closure for the error handler.
122    */
123   void *error_handler_cls;
124
125   /**
126    * Task to asynchronously run #impl_send_continue().
127    */
128   struct GNUNET_SCHEDULER_Task *send_task;
129
130   /**
131    * Linked list of messages pending to be sent
132    */
133   struct GNUNET_MQ_Envelope *envelope_head;
134
135   /**
136    * Linked list of messages pending to be sent
137    */
138   struct GNUNET_MQ_Envelope *envelope_tail;
139
140   /**
141    * Message that is currently scheduled to be
142    * sent. Not the head of the message queue, as the implementation
143    * needs to know if sending has been already scheduled or not.
144    */
145   struct GNUNET_MQ_Envelope *current_envelope;
146
147   /**
148    * Map of associations, lazily allocated
149    */
150   struct GNUNET_CONTAINER_MultiHashMap32 *assoc_map;
151
152   /**
153    * Functions to call on queue destruction; kept in a DLL.
154    */
155   struct GNUNET_MQ_DestroyNotificationHandle *dnh_head;
156
157   /**
158    * Functions to call on queue destruction; kept in a DLL.
159    */
160   struct GNUNET_MQ_DestroyNotificationHandle *dnh_tail;
161
162   /**
163    * Additional options buffer set for this queue by
164    * #GNUNET_MQ_set_options().  Default is 0.
165    */
166   const void *default_extra;
167
168   /**
169    * Flags that were set for this queue by
170    * #GNUNET_MQ_set_options().   Default is 0.
171    */
172   uint64_t default_flags;
173
174   /**
175    * Next id that should be used for the @e assoc_map,
176    * initialized lazily to a random value together with
177    * @e assoc_map
178    */
179   uint32_t assoc_id;
180
181   /**
182    * Number of entries we have in the envelope-DLL.
183    */
184   unsigned int queue_length;
185
186   /**
187    * #GNUNET_YES if GNUNET_MQ_impl_evacuate was called.
188    * FIXME: is this dead?
189    */
190   int evacuate_called;
191
192   /**
193    * #GNUNET_YES if GNUNET_MQ_impl_send_in_flight() was called.
194    */
195   int in_flight;
196 };
197
198
199 /**
200  * Call the message message handler that was registered
201  * for the type of the given message in the given message queue.
202  *
203  * This function is indended to be used for the implementation
204  * of message queues.
205  *
206  * @param mq message queue with the handlers
207  * @param mh message to dispatch
208  */
209 void
210 GNUNET_MQ_inject_message (struct GNUNET_MQ_Handle *mq,
211                           const struct GNUNET_MessageHeader *mh)
212 {
213   const struct GNUNET_MQ_MessageHandler *handler;
214   int handled = GNUNET_NO;
215   uint16_t msize = ntohs (mh->size);
216   uint16_t mtype = ntohs (mh->type);
217
218   LOG (GNUNET_ERROR_TYPE_DEBUG,
219        "Received message of type %u and size %u\n",
220        mtype, msize);
221
222   if (NULL == mq->handlers)
223     goto done;
224   for (handler = mq->handlers; NULL != handler->cb; handler++)
225   {
226     if (handler->type == mtype)
227     {
228       handled = GNUNET_YES;
229       if ( (handler->expected_size > msize) ||
230            ( (handler->expected_size != msize) &&
231              (NULL == handler->mv) ) )
232       {
233         /* Too small, or not an exact size and
234            no 'mv' handler to check rest */
235         LOG (GNUNET_ERROR_TYPE_ERROR,
236              "Received malformed message of type %u\n",
237              (unsigned int) handler->type);
238         GNUNET_MQ_inject_error (mq,
239                                 GNUNET_MQ_ERROR_MALFORMED);
240         break;
241       }
242       if ( (NULL == handler->mv) ||
243            (GNUNET_OK ==
244             handler->mv (handler->cls, mh)) )
245       {
246         /* message well-formed, pass to handler */
247         handler->cb (handler->cls, mh);
248       }
249       else
250       {
251         /* Message rejected by check routine */
252         LOG (GNUNET_ERROR_TYPE_ERROR,
253              "Received malformed message of type %u\n",
254              (unsigned int) handler->type);
255         GNUNET_MQ_inject_error (mq,
256                                 GNUNET_MQ_ERROR_MALFORMED);
257       }
258       break;
259     }
260   }
261  done:
262   if (GNUNET_NO == handled)
263     LOG (GNUNET_ERROR_TYPE_INFO,
264          "No handler for message of type %u and size %u\n",
265          mtype, msize);
266 }
267
268
269 /**
270  * Call the error handler of a message queue with the given
271  * error code.  If there is no error handler, log a warning.
272  *
273  * This function is intended to be used by the implementation
274  * of message queues.
275  *
276  * @param mq message queue
277  * @param error the error type
278  */
279 void
280 GNUNET_MQ_inject_error (struct GNUNET_MQ_Handle *mq,
281                         enum GNUNET_MQ_Error error)
282 {
283   if (NULL == mq->error_handler)
284   {
285     LOG (GNUNET_ERROR_TYPE_WARNING,
286          "Got error %d, but no handler installed\n",
287          (int) error);
288     return;
289   }
290   mq->error_handler (mq->error_handler_cls,
291                      error);
292 }
293
294
295 /**
296  * Discard the message queue message, free all
297  * allocated resources. Must be called in the event
298  * that a message is created but should not actually be sent.
299  *
300  * @param mqm the message to discard
301  */
302 void
303 GNUNET_MQ_discard (struct GNUNET_MQ_Envelope *ev)
304 {
305   GNUNET_assert (NULL == ev->parent_queue);
306   GNUNET_free (ev);
307 }
308
309
310 /**
311  * Obtain the current length of the message queue.
312  *
313  * @param mq queue to inspect
314  * @return number of queued, non-transmitted messages
315  */
316 unsigned int
317 GNUNET_MQ_get_length (struct GNUNET_MQ_Handle *mq)
318 {
319   if (GNUNET_YES != mq->in_flight)
320   {
321     return mq->queue_length;
322   }
323   return mq->queue_length - 1;
324 }
325
326
327 /**
328  * Send a message with the given message queue.
329  * May only be called once per message.
330  *
331  * @param mq message queue
332  * @param ev the envelope with the message to send.
333  */
334 void
335 GNUNET_MQ_send (struct GNUNET_MQ_Handle *mq,
336                 struct GNUNET_MQ_Envelope *ev)
337 {
338   GNUNET_assert (NULL != mq);
339   GNUNET_assert (NULL == ev->parent_queue);
340
341   mq->queue_length++;
342   GNUNET_break (mq->queue_length < 10000); /* This would seem like a bug... */
343   ev->parent_queue = mq;
344   /* is the implementation busy? queue it! */
345   if ( (NULL != mq->current_envelope) ||
346        (NULL != mq->send_task) )
347   {
348     GNUNET_CONTAINER_DLL_insert_tail (mq->envelope_head,
349                                       mq->envelope_tail,
350                                       ev);
351     return;
352   }
353   GNUNET_assert (NULL == mq->envelope_head);
354   mq->current_envelope = ev;
355
356   LOG (GNUNET_ERROR_TYPE_DEBUG,
357        "sending message of type %u, queue empty (MQ: %p)\n",
358        ntohs(ev->mh->type),
359        mq);
360
361   mq->send_impl (mq,
362                  ev->mh,
363                  mq->impl_state);
364 }
365
366
367 /**
368  * Remove the first envelope that has not yet been sent from the message
369  * queue and return it.
370  *
371  * @param mq queue to remove envelope from
372  * @return NULL if queue is empty (or has no envelope that is not under transmission)
373  */
374 struct GNUNET_MQ_Envelope *
375 GNUNET_MQ_unsent_head (struct GNUNET_MQ_Handle *mq)
376 {
377   struct GNUNET_MQ_Envelope *env;
378
379   env = mq->envelope_head;
380   GNUNET_CONTAINER_DLL_remove (mq->envelope_head,
381                                mq->envelope_tail,
382                                env);
383   mq->queue_length--;
384   env->parent_queue = NULL;
385   return env;
386 }
387
388
389 /**
390  * Function to copy an envelope.  The envelope must not yet
391  * be in any queue or have any options or callbacks set.
392  *
393  * @param env envelope to copy
394  * @return copy of @a env
395  */
396 struct GNUNET_MQ_Envelope *
397 GNUNET_MQ_env_copy (struct GNUNET_MQ_Envelope *env)
398 {
399   GNUNET_assert (NULL == env->next);
400   GNUNET_assert (NULL == env->parent_queue);
401   GNUNET_assert (NULL == env->sent_cb);
402   GNUNET_assert (GNUNET_NO == env->have_custom_options);
403   return GNUNET_MQ_msg_copy (env->mh);
404 }
405
406
407 /**
408  * Send a copy of a message with the given message queue.
409  * Can be called repeatedly on the same envelope.
410  *
411  * @param mq message queue
412  * @param ev the envelope with the message to send.
413  */
414 void
415 GNUNET_MQ_send_copy (struct GNUNET_MQ_Handle *mq,
416                      const struct GNUNET_MQ_Envelope *ev)
417 {
418   struct GNUNET_MQ_Envelope *env;
419   uint16_t msize;
420
421   msize = ntohs (ev->mh->size);
422   env = GNUNET_malloc (sizeof (struct GNUNET_MQ_Envelope) +
423                        msize);
424   env->mh = (struct GNUNET_MessageHeader *) &env[1];
425   env->sent_cb = ev->sent_cb;
426   env->sent_cls = ev->sent_cls;
427   GNUNET_memcpy (&env[1],
428           ev->mh,
429           msize);
430   GNUNET_MQ_send (mq,
431                   env);
432 }
433
434
435 /**
436  * Task run to call the send implementation for the next queued
437  * message, if any.  Only useful for implementing message queues,
438  * results in undefined behavior if not used carefully.
439  *
440  * @param cls message queue to send the next message with
441  */
442 static void
443 impl_send_continue (void *cls)
444 {
445   struct GNUNET_MQ_Handle *mq = cls;
446
447   mq->send_task = NULL;
448   /* call is only valid if we're actually currently sending
449    * a message */
450   if (NULL == mq->envelope_head)
451     return;
452   mq->current_envelope = mq->envelope_head;
453   GNUNET_CONTAINER_DLL_remove (mq->envelope_head,
454                                mq->envelope_tail,
455                                mq->current_envelope);
456
457   LOG (GNUNET_ERROR_TYPE_DEBUG,
458        "sending message of type %u from queue\n",
459        ntohs(mq->current_envelope->mh->type));
460
461   mq->send_impl (mq,
462                  mq->current_envelope->mh,
463                  mq->impl_state);
464 }
465
466
467 /**
468  * Call the send implementation for the next queued message, if any.
469  * Only useful for implementing message queues, results in undefined
470  * behavior if not used carefully.
471  *
472  * @param mq message queue to send the next message with
473  */
474 void
475 GNUNET_MQ_impl_send_continue (struct GNUNET_MQ_Handle *mq)
476 {
477   struct GNUNET_MQ_Envelope *current_envelope;
478   GNUNET_SCHEDULER_TaskCallback cb;
479
480   GNUNET_assert (0 < mq->queue_length);
481   mq->queue_length--;
482   mq->in_flight = GNUNET_NO;
483   current_envelope = mq->current_envelope;
484   current_envelope->parent_queue = NULL;
485   mq->current_envelope = NULL;
486   GNUNET_assert (NULL == mq->send_task);
487   mq->send_task = GNUNET_SCHEDULER_add_now (&impl_send_continue,
488                                             mq);
489   if (NULL != (cb = current_envelope->sent_cb))
490   {
491     current_envelope->sent_cb = NULL;
492     cb (current_envelope->sent_cls);
493   }
494   GNUNET_free (current_envelope);
495 }
496
497
498 /**
499  * Call the send notification for the current message, but do not
500  * try to send the next message until #GNUNET_MQ_impl_send_continue
501  * is called.
502  *
503  * Only useful for implementing message queues, results in undefined
504  * behavior if not used carefully.
505  *
506  * @param mq message queue to send the next message with
507  */
508 void
509 GNUNET_MQ_impl_send_in_flight (struct GNUNET_MQ_Handle *mq)
510 {
511   struct GNUNET_MQ_Envelope *current_envelope;
512   GNUNET_SCHEDULER_TaskCallback cb;
513
514   mq->in_flight = GNUNET_YES;
515   /* call is only valid if we're actually currently sending
516    * a message */
517   current_envelope = mq->current_envelope;
518   GNUNET_assert (NULL != current_envelope);
519   /* can't call cancel from now on anymore */
520   current_envelope->parent_queue = NULL;
521   if (NULL != (cb = current_envelope->sent_cb))
522   {
523     current_envelope->sent_cb = NULL;
524     cb (current_envelope->sent_cls);
525   }
526 }
527
528
529 /**
530  * Create a message queue for the specified handlers.
531  *
532  * @param send function the implements sending messages
533  * @param destroy function that implements destroying the queue
534  * @param cancel function that implements canceling a message
535  * @param impl_state for the queue, passed to 'send' and 'destroy'
536  * @param handlers array of message handlers
537  * @param error_handler handler for read and write errors
538  * @param error_handler_cls closure for @a error_handler
539  * @return a new message queue
540  */
541 struct GNUNET_MQ_Handle *
542 GNUNET_MQ_queue_for_callbacks (GNUNET_MQ_SendImpl send,
543                                GNUNET_MQ_DestroyImpl destroy,
544                                GNUNET_MQ_CancelImpl cancel,
545                                void *impl_state,
546                                const struct GNUNET_MQ_MessageHandler *handlers,
547                                GNUNET_MQ_ErrorHandler error_handler,
548                                void *error_handler_cls)
549 {
550   struct GNUNET_MQ_Handle *mq;
551
552   mq = GNUNET_new (struct GNUNET_MQ_Handle);
553   mq->send_impl = send;
554   mq->destroy_impl = destroy;
555   mq->cancel_impl = cancel;
556   mq->handlers = GNUNET_MQ_copy_handlers (handlers);
557   mq->error_handler = error_handler;
558   mq->error_handler_cls = error_handler_cls;
559   mq->impl_state = impl_state;
560
561   return mq;
562 }
563
564
565 /**
566  * Change the closure argument in all of the `handlers` of the
567  * @a mq.
568  *
569  * @param mq to modify
570  * @param handlers_cls new closure to use
571  */
572 void
573 GNUNET_MQ_set_handlers_closure (struct GNUNET_MQ_Handle *mq,
574                                 void *handlers_cls)
575 {
576   if (NULL == mq->handlers)
577     return;
578   for (unsigned int i=0;NULL != mq->handlers[i].cb; i++)
579     mq->handlers[i].cls = handlers_cls;
580 }
581
582
583 /**
584  * Get the message that should currently be sent.
585  * Fails if there is no current message.
586  * Only useful for implementing message queues,
587  * results in undefined behavior if not used carefully.
588  *
589  * @param mq message queue with the current message
590  * @return message to send, never NULL
591  */
592 const struct GNUNET_MessageHeader *
593 GNUNET_MQ_impl_current (struct GNUNET_MQ_Handle *mq)
594 {
595   GNUNET_assert (NULL != mq->current_envelope);
596   GNUNET_assert (NULL != mq->current_envelope->mh);
597   return mq->current_envelope->mh;
598 }
599
600
601 /**
602  * Get the implementation state associated with the
603  * message queue.
604  *
605  * While the GNUNET_MQ_Impl* callbacks receive the
606  * implementation state, continuations that are scheduled
607  * by the implementation function often only have one closure
608  * argument, with this function it is possible to get at the
609  * implementation state when only passing the GNUNET_MQ_Handle
610  * as closure.
611  *
612  * @param mq message queue with the current message
613  * @return message to send, never NULL
614  */
615 void *
616 GNUNET_MQ_impl_state (struct GNUNET_MQ_Handle *mq)
617 {
618   return mq->impl_state;
619 }
620
621
622 struct GNUNET_MQ_Envelope *
623 GNUNET_MQ_msg_ (struct GNUNET_MessageHeader **mhp,
624                 uint16_t size,
625                 uint16_t type)
626 {
627   struct GNUNET_MQ_Envelope *ev;
628
629   ev = GNUNET_malloc (size + sizeof (struct GNUNET_MQ_Envelope));
630   ev->mh = (struct GNUNET_MessageHeader *) &ev[1];
631   ev->mh->size = htons (size);
632   ev->mh->type = htons (type);
633   if (NULL != mhp)
634     *mhp = ev->mh;
635   return ev;
636 }
637
638
639 /**
640  * Create a new envelope by copying an existing message.
641  *
642  * @param hdr header of the message to copy
643  * @return envelope containing @a hdr
644  */
645 struct GNUNET_MQ_Envelope *
646 GNUNET_MQ_msg_copy (const struct GNUNET_MessageHeader *hdr)
647 {
648   struct GNUNET_MQ_Envelope *mqm;
649   uint16_t size = ntohs (hdr->size);
650
651   mqm = GNUNET_malloc (sizeof (*mqm) + size);
652   mqm->mh = (struct GNUNET_MessageHeader *) &mqm[1];
653   GNUNET_memcpy (mqm->mh,
654           hdr,
655           size);
656   return mqm;
657 }
658
659
660 /**
661  * Implementation of the #GNUNET_MQ_msg_nested_mh macro.
662  *
663  * @param mhp pointer to the message header pointer that will be changed to allocate at
664  *        the newly allocated space for the message.
665  * @param base_size size of the data before the nested message
666  * @param type type of the message in the envelope
667  * @param nested_mh the message to append to the message after base_size
668  */
669 struct GNUNET_MQ_Envelope *
670 GNUNET_MQ_msg_nested_mh_ (struct GNUNET_MessageHeader **mhp,
671                           uint16_t base_size,
672                           uint16_t type,
673                           const struct GNUNET_MessageHeader *nested_mh)
674 {
675   struct GNUNET_MQ_Envelope *mqm;
676   uint16_t size;
677
678   if (NULL == nested_mh)
679     return GNUNET_MQ_msg_ (mhp, base_size, type);
680
681   size = base_size + ntohs (nested_mh->size);
682
683   /* check for uint16_t overflow */
684   if (size < base_size)
685     return NULL;
686
687   mqm = GNUNET_MQ_msg_ (mhp, size, type);
688   GNUNET_memcpy ((char *) mqm->mh + base_size,
689                  nested_mh,
690                  ntohs (nested_mh->size));
691
692   return mqm;
693 }
694
695
696 /**
697  * Associate the assoc_data in mq with a unique request id.
698  *
699  * @param mq message queue, id will be unique for the queue
700  * @param assoc_data to associate
701  */
702 uint32_t
703 GNUNET_MQ_assoc_add (struct GNUNET_MQ_Handle *mq,
704                      void *assoc_data)
705 {
706   uint32_t id;
707
708   if (NULL == mq->assoc_map)
709   {
710     mq->assoc_map = GNUNET_CONTAINER_multihashmap32_create (8);
711     mq->assoc_id = 1;
712   }
713   id = mq->assoc_id++;
714   GNUNET_assert (GNUNET_OK ==
715                  GNUNET_CONTAINER_multihashmap32_put (mq->assoc_map,
716                                                       id,
717                                                       assoc_data,
718                                                       GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
719   return id;
720 }
721
722
723 /**
724  * Get the data associated with a @a request_id in a queue
725  *
726  * @param mq the message queue with the association
727  * @param request_id the request id we are interested in
728  * @return the associated data
729  */
730 void *
731 GNUNET_MQ_assoc_get (struct GNUNET_MQ_Handle *mq,
732                      uint32_t request_id)
733 {
734   if (NULL == mq->assoc_map)
735     return NULL;
736   return GNUNET_CONTAINER_multihashmap32_get (mq->assoc_map,
737                                               request_id);
738 }
739
740
741 /**
742  * Remove the association for a @a request_id
743  *
744  * @param mq the message queue with the association
745  * @param request_id the request id we want to remove
746  * @return the associated data
747  */
748 void *
749 GNUNET_MQ_assoc_remove (struct GNUNET_MQ_Handle *mq,
750                         uint32_t request_id)
751 {
752   void *val;
753
754   if (NULL == mq->assoc_map)
755     return NULL;
756   val = GNUNET_CONTAINER_multihashmap32_get (mq->assoc_map,
757                                              request_id);
758   GNUNET_CONTAINER_multihashmap32_remove_all (mq->assoc_map,
759                                               request_id);
760   return val;
761 }
762
763
764 /**
765  * Call a callback once the envelope has been sent, that is,
766  * sending it can not be canceled anymore.
767  * There can be only one notify sent callback per envelope.
768  *
769  * @param ev message to call the notify callback for
770  * @param cb the notify callback
771  * @param cb_cls closure for the callback
772  */
773 void
774 GNUNET_MQ_notify_sent (struct GNUNET_MQ_Envelope *ev,
775                        GNUNET_SCHEDULER_TaskCallback cb,
776                        void *cb_cls)
777 {
778   /* allow setting *OR* clearing callback */
779   GNUNET_assert ( (NULL == ev->sent_cb) ||
780                   (NULL == cb) );
781   ev->sent_cb = cb;
782   ev->sent_cls = cb_cls;
783 }
784
785
786 /**
787  * Handle we return for callbacks registered to be
788  * notified when #GNUNET_MQ_destroy() is called on a queue.
789  */
790 struct GNUNET_MQ_DestroyNotificationHandle
791 {
792   /**
793    * Kept in a DLL.
794    */
795   struct GNUNET_MQ_DestroyNotificationHandle *prev;
796
797   /**
798    * Kept in a DLL.
799    */
800   struct GNUNET_MQ_DestroyNotificationHandle *next;
801
802   /**
803    * Queue to notify about.
804    */
805   struct GNUNET_MQ_Handle *mq;
806
807   /**
808    * Function to call.
809    */
810   GNUNET_SCHEDULER_TaskCallback cb;
811
812   /**
813    * Closure for @e cb.
814    */
815   void *cb_cls;
816 };
817
818
819 /**
820  * Destroy the message queue.
821  *
822  * @param mq message queue to destroy
823  */
824 void
825 GNUNET_MQ_destroy (struct GNUNET_MQ_Handle *mq)
826 {
827   struct GNUNET_MQ_DestroyNotificationHandle *dnh;
828
829   if (NULL != mq->destroy_impl)
830   {
831     mq->destroy_impl (mq, mq->impl_state);
832   }
833   if (NULL != mq->send_task)
834   {
835     GNUNET_SCHEDULER_cancel (mq->send_task);
836     mq->send_task = NULL;
837   }
838   while (NULL != mq->envelope_head)
839   {
840     struct GNUNET_MQ_Envelope *ev;
841
842     ev = mq->envelope_head;
843     ev->parent_queue = NULL;
844     GNUNET_CONTAINER_DLL_remove (mq->envelope_head,
845                                  mq->envelope_tail,
846                                  ev);
847     GNUNET_assert (0 < mq->queue_length);
848     mq->queue_length--;
849     LOG (GNUNET_ERROR_TYPE_DEBUG,
850          "MQ destroy drops message of type %u\n",
851          ntohs (ev->mh->type));
852     GNUNET_MQ_discard (ev);
853   }
854   if (NULL != mq->current_envelope)
855   {
856     /* we can only discard envelopes that
857      * are not queued! */
858     mq->current_envelope->parent_queue = NULL;
859     LOG (GNUNET_ERROR_TYPE_DEBUG,
860          "MQ destroy drops current message of type %u\n",
861          ntohs (mq->current_envelope->mh->type));
862     GNUNET_MQ_discard (mq->current_envelope);
863     mq->current_envelope = NULL;
864     GNUNET_assert (0 < mq->queue_length);
865     mq->queue_length--;
866   }
867   GNUNET_assert (0 == mq->queue_length);
868   while (NULL != (dnh = mq->dnh_head))
869   {
870     dnh->cb (dnh->cb_cls);
871     GNUNET_MQ_destroy_notify_cancel (dnh);
872   }
873   if (NULL != mq->assoc_map)
874   {
875     GNUNET_CONTAINER_multihashmap32_destroy (mq->assoc_map);
876     mq->assoc_map = NULL;
877   }
878   GNUNET_free_non_null (mq->handlers);
879   GNUNET_free (mq);
880 }
881
882
883 const struct GNUNET_MessageHeader *
884 GNUNET_MQ_extract_nested_mh_ (const struct GNUNET_MessageHeader *mh,
885                               uint16_t base_size)
886 {
887   uint16_t whole_size;
888   uint16_t nested_size;
889   const struct GNUNET_MessageHeader *nested_msg;
890
891   whole_size = ntohs (mh->size);
892   GNUNET_assert (whole_size >= base_size);
893   nested_size = whole_size - base_size;
894   if (0 == nested_size)
895     return NULL;
896   if (nested_size < sizeof (struct GNUNET_MessageHeader))
897   {
898     GNUNET_break_op (0);
899     return NULL;
900   }
901   nested_msg = (const struct GNUNET_MessageHeader *) ((char *) mh + base_size);
902   if (ntohs (nested_msg->size) != nested_size)
903   {
904     GNUNET_break_op (0);
905     return NULL;
906   }
907   return nested_msg;
908 }
909
910
911 /**
912  * Cancel sending the message. Message must have been sent with
913  * #GNUNET_MQ_send before.  May not be called after the notify sent
914  * callback has been called
915  *
916  * @param ev queued envelope to cancel
917  */
918 void
919 GNUNET_MQ_send_cancel (struct GNUNET_MQ_Envelope *ev)
920 {
921   struct GNUNET_MQ_Handle *mq = ev->parent_queue;
922
923   GNUNET_assert (NULL != mq);
924   GNUNET_assert (NULL != mq->cancel_impl);
925
926   mq->evacuate_called = GNUNET_NO;
927
928   if (mq->current_envelope == ev)
929   {
930     /* complex case, we already started with transmitting
931        the message using the callbacks. */
932     GNUNET_assert (0 < mq->queue_length);
933     mq->queue_length--;
934     mq->cancel_impl (mq,
935                      mq->impl_state);
936     /* continue sending the next message, if any */
937     mq->current_envelope = mq->envelope_head;
938     if (NULL != mq->current_envelope)
939     {
940       GNUNET_CONTAINER_DLL_remove (mq->envelope_head,
941                                    mq->envelope_tail,
942                                    mq->current_envelope);
943
944       LOG (GNUNET_ERROR_TYPE_DEBUG,
945            "sending canceled message of type %u queue\n",
946            ntohs(ev->mh->type));
947
948       mq->send_impl (mq,
949                      mq->current_envelope->mh,
950                      mq->impl_state);
951     }
952   }
953   else
954   {
955     /* simple case, message is still waiting in the queue */
956     GNUNET_CONTAINER_DLL_remove (mq->envelope_head,
957                                  mq->envelope_tail,
958                                  ev);
959     GNUNET_assert (0 < mq->queue_length);
960     mq->queue_length--;
961   }
962
963   if (GNUNET_YES != mq->evacuate_called)
964   {
965     ev->parent_queue = NULL;
966     ev->mh = NULL;
967     /* also frees ev */
968     GNUNET_free (ev);
969   }
970 }
971
972
973 /**
974  * Function to obtain the current envelope
975  * from within #GNUNET_MQ_SendImpl implementations.
976  *
977  * @param mq message queue to interrogate
978  * @return the current envelope
979  */
980 struct GNUNET_MQ_Envelope *
981 GNUNET_MQ_get_current_envelope (struct GNUNET_MQ_Handle *mq)
982 {
983   return mq->current_envelope;
984 }
985
986
987 /**
988  * Function to obtain the last envelope in the queue.
989  *
990  * @param mq message queue to interrogate
991  * @return the last envelope in the queue
992  */
993 struct GNUNET_MQ_Envelope *
994 GNUNET_MQ_get_last_envelope (struct GNUNET_MQ_Handle *mq)
995 {
996   if (NULL != mq->envelope_tail)
997     return mq->envelope_tail;
998
999   return mq->current_envelope;
1000 }
1001
1002
1003 /**
1004  * Set application-specific options for this envelope.
1005  * Overrides the options set for the queue with
1006  * #GNUNET_MQ_set_options() for this message only.
1007  *
1008  * @param env message to set options for
1009  * @param flags flags to use (meaning is queue-specific)
1010  * @param extra additional buffer for further data (also queue-specific)
1011  */
1012 void
1013 GNUNET_MQ_env_set_options (struct GNUNET_MQ_Envelope *env,
1014                            uint64_t flags,
1015                            const void *extra)
1016 {
1017   env->flags = flags;
1018   env->extra = extra;
1019   env->have_custom_options = GNUNET_YES;
1020 }
1021
1022
1023 /**
1024  * Get application-specific options for this envelope.
1025  *
1026  * @param env message to set options for
1027  * @param[out] flags set to flags to use (meaning is queue-specific)
1028  * @return extra additional buffer for further data (also queue-specific)
1029  */
1030 const void *
1031 GNUNET_MQ_env_get_options (struct GNUNET_MQ_Envelope *env,
1032                            uint64_t *flags)
1033 {
1034   struct GNUNET_MQ_Handle *mq = env->parent_queue;
1035
1036   if (GNUNET_YES == env->have_custom_options)
1037   {
1038     *flags = env->flags;
1039     return env->extra;
1040   }
1041   if (NULL == mq)
1042   {
1043     *flags = 0;
1044     return NULL;
1045   }
1046   *flags = mq->default_flags;
1047   return mq->default_extra;
1048 }
1049
1050
1051 /**
1052  * Set application-specific options for this queue.
1053  *
1054  * @param mq message queue to set options for
1055  * @param flags flags to use (meaning is queue-specific)
1056  * @param extra additional buffer for further data (also queue-specific)
1057  */
1058 void
1059 GNUNET_MQ_set_options (struct GNUNET_MQ_Handle *mq,
1060                        uint64_t flags,
1061                        const void *extra)
1062 {
1063   mq->default_flags = flags;
1064   mq->default_extra = extra;
1065 }
1066
1067
1068 /**
1069  * Obtain message contained in envelope.
1070  *
1071  * @param env the envelope
1072  * @return message contained in the envelope
1073  */
1074 const struct GNUNET_MessageHeader *
1075 GNUNET_MQ_env_get_msg (const struct GNUNET_MQ_Envelope *env)
1076 {
1077   return env->mh;
1078 }
1079
1080
1081 /**
1082  * Return next envelope in queue.
1083  *
1084  * @param env a queued envelope
1085  * @return next one, or NULL
1086  */
1087 const struct GNUNET_MQ_Envelope *
1088 GNUNET_MQ_env_next (const struct GNUNET_MQ_Envelope *env)
1089 {
1090   return env->next;
1091 }
1092
1093
1094 /**
1095  * Register function to be called whenever @a mq is being
1096  * destroyed.
1097  *
1098  * @param mq message queue to watch
1099  * @param cb function to call on @a mq destruction
1100  * @param cb_cls closure for @a cb
1101  * @return handle for #GNUNET_MQ_destroy_notify_cancel().
1102  */
1103 struct GNUNET_MQ_DestroyNotificationHandle *
1104 GNUNET_MQ_destroy_notify (struct GNUNET_MQ_Handle *mq,
1105                           GNUNET_SCHEDULER_TaskCallback cb,
1106                           void *cb_cls)
1107 {
1108   struct GNUNET_MQ_DestroyNotificationHandle *dnh;
1109
1110   dnh = GNUNET_new (struct GNUNET_MQ_DestroyNotificationHandle);
1111   dnh->mq = mq;
1112   dnh->cb = cb;
1113   dnh->cb_cls = cb_cls;
1114   GNUNET_CONTAINER_DLL_insert (mq->dnh_head,
1115                                mq->dnh_tail,
1116                                dnh);
1117   return dnh;
1118 }
1119
1120
1121 /**
1122  * Cancel registration from #GNUNET_MQ_destroy_notify().
1123  *
1124  * @param dnh handle for registration to cancel
1125  */
1126 void
1127 GNUNET_MQ_destroy_notify_cancel (struct GNUNET_MQ_DestroyNotificationHandle *dnh)
1128 {
1129   struct GNUNET_MQ_Handle *mq = dnh->mq;
1130
1131   GNUNET_CONTAINER_DLL_remove (mq->dnh_head,
1132                                mq->dnh_tail,
1133                                dnh);
1134   GNUNET_free (dnh);
1135 }
1136
1137
1138 /**
1139  * Insert @a env into the envelope DLL starting at @a env_head
1140  * Note that @a env must not be in any MQ while this function
1141  * is used with DLLs defined outside of the MQ module.  This
1142  * is just in case some application needs to also manage a
1143  * FIFO of envelopes independent of MQ itself and wants to
1144  * re-use the pointers internal to @a env.  Use with caution.
1145  *
1146  * @param[in|out] env_head of envelope DLL
1147  * @param[in|out] env_tail tail of envelope DLL
1148  * @param[in|out] env element to insert at the tail
1149  */
1150 void
1151 GNUNET_MQ_dll_insert_tail (struct GNUNET_MQ_Envelope **env_head,
1152                            struct GNUNET_MQ_Envelope **env_tail,
1153                            struct GNUNET_MQ_Envelope *env)
1154 {
1155   GNUNET_CONTAINER_DLL_insert_tail (*env_head,
1156                                     *env_tail,
1157                                     env);
1158 }
1159
1160
1161 /**
1162  * Remove @a env from the envelope DLL starting at @a env_head.
1163  * Note that @a env must not be in any MQ while this function
1164  * is used with DLLs defined outside of the MQ module. This
1165  * is just in case some application needs to also manage a
1166  * FIFO of envelopes independent of MQ itself and wants to
1167  * re-use the pointers internal to @a env.  Use with caution.
1168  *
1169  * @param[in|out] env_head of envelope DLL
1170  * @param[in|out] env_tail tail of envelope DLL
1171  * @param[in|out] env element to remove from the DLL
1172  */
1173 void
1174 GNUNET_MQ_dll_remove (struct GNUNET_MQ_Envelope **env_head,
1175                       struct GNUNET_MQ_Envelope **env_tail,
1176                       struct GNUNET_MQ_Envelope *env)
1177 {
1178   GNUNET_CONTAINER_DLL_remove (*env_head,
1179                                *env_tail,
1180                                env);
1181 }
1182
1183
1184 /**
1185  * Copy an array of handlers.
1186  *
1187  * Useful if the array has been delared in local memory and needs to be
1188  * persisted for future use.
1189  *
1190  * @param handlers Array of handlers to be copied. Can be NULL (nothing done).
1191  * @return A newly allocated array of handlers.
1192  *         Needs to be freed with #GNUNET_free.
1193  */
1194 struct GNUNET_MQ_MessageHandler *
1195 GNUNET_MQ_copy_handlers (const struct GNUNET_MQ_MessageHandler *handlers)
1196 {
1197   struct GNUNET_MQ_MessageHandler *copy;
1198   unsigned int count;
1199
1200   if (NULL == handlers)
1201     return NULL;
1202
1203   count = GNUNET_MQ_count_handlers (handlers);
1204   copy = GNUNET_new_array (count + 1,
1205                            struct GNUNET_MQ_MessageHandler);
1206   GNUNET_memcpy (copy,
1207                  handlers,
1208                  count * sizeof (struct GNUNET_MQ_MessageHandler));
1209   return copy;
1210 }
1211
1212
1213 /**
1214  * Copy an array of handlers, appending AGPL handler.
1215  *
1216  * Useful if the array has been delared in local memory and needs to be
1217  * persisted for future use.
1218  *
1219  * @param handlers Array of handlers to be copied. Can be NULL (nothing done).
1220  * @param agpl_handler function to call for AGPL handling
1221  * @param agpl_cls closure for @a agpl_handler
1222  * @return A newly allocated array of handlers.
1223  *         Needs to be freed with #GNUNET_free.
1224  */
1225 struct GNUNET_MQ_MessageHandler *
1226 GNUNET_MQ_copy_handlers2 (const struct GNUNET_MQ_MessageHandler *handlers,
1227                           GNUNET_MQ_MessageCallback agpl_handler,
1228                           void *agpl_cls)
1229 {
1230   struct GNUNET_MQ_MessageHandler *copy;
1231   unsigned int count;
1232
1233   if (NULL == handlers)
1234     return NULL;
1235   count = GNUNET_MQ_count_handlers (handlers);
1236   copy = GNUNET_new_array (count + 2,
1237                            struct GNUNET_MQ_MessageHandler);
1238   GNUNET_memcpy (copy,
1239                  handlers,
1240                  count * sizeof (struct GNUNET_MQ_MessageHandler));
1241   copy[count].mv = NULL;
1242   copy[count].cb = agpl_handler;
1243   copy[count].cls = agpl_cls;
1244   copy[count].type = GNUNET_MESSAGE_TYPE_REQUEST_AGPL;
1245   copy[count].expected_size = sizeof (struct GNUNET_MessageHeader);
1246   return copy;
1247 }
1248
1249
1250 /**
1251  * Count the handlers in a handler array.
1252  *
1253  * @param handlers Array of handlers to be counted.
1254  * @return The number of handlers in the array.
1255  */
1256 unsigned int
1257 GNUNET_MQ_count_handlers (const struct GNUNET_MQ_MessageHandler *handlers)
1258 {
1259   unsigned int i;
1260
1261   if (NULL == handlers)
1262     return 0;
1263
1264   for (i=0; NULL != handlers[i].cb; i++) ;
1265
1266   return i;
1267 }
1268
1269
1270
1271 /* end of mq.c */