log
[oweals/gnunet.git] / src / util / mq.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2012-2014 GNUnet e.V.
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18      Boston, MA 02110-1301, USA.
19 */
20
21 /**
22  * @author Florian Dold
23  * @file util/mq.c
24  * @brief general purpose request queue
25  */
26 #include "platform.h"
27 #include "gnunet_util_lib.h"
28
29 #define LOG(kind,...) GNUNET_log_from (kind, "mq",__VA_ARGS__)
30
31
32 struct GNUNET_MQ_Envelope
33 {
34   /**
35    * Messages are stored in a linked list.
36    * Each queue has its own list of envelopes.
37    */
38   struct GNUNET_MQ_Envelope *next;
39
40   /**
41    * Messages are stored in a linked list
42    * Each queue has its own list of envelopes.
43    */
44   struct GNUNET_MQ_Envelope *prev;
45
46   /**
47    * Actual allocated message header,
48    * usually points to the end of the containing GNUNET_MQ_Envelope
49    */
50   struct GNUNET_MessageHeader *mh;
51
52   /**
53    * Queue the message is queued in, NULL if message is not queued.
54    */
55   struct GNUNET_MQ_Handle *parent_queue;
56
57   /**
58    * Called after the message was sent irrevocably.
59    */
60   GNUNET_MQ_NotifyCallback sent_cb;
61
62   /**
63    * Closure for @e send_cb
64    */
65   void *sent_cls;
66 };
67
68
69 /**
70  * Handle to a message queue.
71  */
72 struct GNUNET_MQ_Handle
73 {
74   /**
75    * Handlers array, or NULL if the queue should not receive messages
76    */
77   struct GNUNET_MQ_MessageHandler *handlers;
78
79   /**
80    * Actual implementation of message sending,
81    * called when a message is added
82    */
83   GNUNET_MQ_SendImpl send_impl;
84
85   /**
86    * Implementation-dependent queue destruction function
87    */
88   GNUNET_MQ_DestroyImpl destroy_impl;
89
90   /**
91    * Implementation-dependent send cancel function
92    */
93   GNUNET_MQ_CancelImpl cancel_impl;
94
95   /**
96    * Implementation-specific state
97    */
98   void *impl_state;
99
100   /**
101    * Callback will be called when an error occurs.
102    */
103   GNUNET_MQ_ErrorHandler error_handler;
104
105   /**
106    * Closure for the error handler.
107    */
108   void *error_handler_cls;
109
110   /**
111    * Linked list of messages pending to be sent
112    */
113   struct GNUNET_MQ_Envelope *envelope_head;
114
115   /**
116    * Linked list of messages pending to be sent
117    */
118   struct GNUNET_MQ_Envelope *envelope_tail;
119
120   /**
121    * Message that is currently scheduled to be
122    * sent. Not the head of the message queue, as the implementation
123    * needs to know if sending has been already scheduled or not.
124    */
125   struct GNUNET_MQ_Envelope *current_envelope;
126
127   /**
128    * Map of associations, lazily allocated
129    */
130   struct GNUNET_CONTAINER_MultiHashMap32 *assoc_map;
131
132   /**
133    * Task scheduled during #GNUNET_MQ_impl_send_continue.
134    */
135   struct GNUNET_SCHEDULER_Task *continue_task;
136
137   /**
138    * Next id that should be used for the @e assoc_map,
139    * initialized lazily to a random value together with
140    * @e assoc_map
141    */
142   uint32_t assoc_id;
143 };
144
145
146 /**
147  * Implementation-specific state for connection to
148  * client (MQ for server).
149  */
150 struct ServerClientSocketState
151 {
152   /**
153    * Handle of the client that connected to the server.
154    */
155   struct GNUNET_SERVER_Client *client;
156
157   /**
158    * Active transmission request to the client.
159    */
160   struct GNUNET_SERVER_TransmitHandle* th;
161 };
162
163
164 /**
165  * Implementation-specific state for connection to
166  * service (MQ for clients).
167  */
168 struct ClientConnectionState
169 {
170   /**
171    * Did we call receive alread alreadyy?
172    */
173   int receive_active;
174
175   /**
176    * Do we also want to receive?
177    */
178   int receive_requested;
179
180   /**
181    * Connection to the service.
182    */
183   struct GNUNET_CLIENT_Connection *connection;
184
185   /**
186    * Active transmission request (or NULL).
187    */
188   struct GNUNET_CLIENT_TransmitHandle *th;
189 };
190
191
192 /**
193  * Call the message message handler that was registered
194  * for the type of the given message in the given message queue.
195  *
196  * This function is indended to be used for the implementation
197  * of message queues.
198  *
199  * @param mq message queue with the handlers
200  * @param mh message to dispatch
201  */
202 void
203 GNUNET_MQ_inject_message (struct GNUNET_MQ_Handle *mq,
204                           const struct GNUNET_MessageHeader *mh)
205 {
206   const struct GNUNET_MQ_MessageHandler *handler;
207   int handled = GNUNET_NO;
208   uint16_t ms = ntohs (mh->size);
209
210   if (NULL == mq->handlers)
211     goto done;
212   for (handler = mq->handlers; NULL != handler->cb; handler++)
213   {
214     if (handler->type == ntohs (mh->type))
215     {
216       handled = GNUNET_YES;
217       if ( (handler->expected_size > ms) ||
218            ( (handler->expected_size != ms) &&
219              (NULL == handler->mv) ) )
220       {
221         /* Too small, or not an exact size and
222            no 'mv' handler to check rest */
223         GNUNET_MQ_inject_error (mq,
224                                 GNUNET_MQ_ERROR_MALFORMED);
225         break;
226       }
227       if ( (NULL == handler->mv) ||
228            (GNUNET_OK ==
229             handler->mv (handler->cls, mh)) )
230       {
231         /* message well-formed, pass to handler */
232         handler->cb (handler->cls, mh);
233       }
234       else
235       {
236         /* Message rejected by check routine */
237         GNUNET_MQ_inject_error (mq,
238                                 GNUNET_MQ_ERROR_MALFORMED);
239       }
240       break;
241     }
242   }
243  done:
244   if (GNUNET_NO == handled)
245     LOG (GNUNET_ERROR_TYPE_WARNING,
246          "No handler for message of type %d\n",
247          ntohs (mh->type));
248 }
249
250
251 /**
252  * Call the error handler of a message queue with the given
253  * error code.  If there is no error handler, log a warning.
254  *
255  * This function is intended to be used by the implementation
256  * of message queues.
257  *
258  * @param mq message queue
259  * @param error the error type
260  */
261 void
262 GNUNET_MQ_inject_error (struct GNUNET_MQ_Handle *mq,
263                         enum GNUNET_MQ_Error error)
264 {
265   if (NULL == mq->error_handler)
266   {
267     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
268                 "mq: got error %d, but no handler installed\n",
269                 (int) error);
270     return;
271   }
272   mq->error_handler (mq->error_handler_cls, error);
273 }
274
275
276 void
277 GNUNET_MQ_discard (struct GNUNET_MQ_Envelope *mqm)
278 {
279   GNUNET_assert (NULL == mqm->parent_queue);
280   GNUNET_free (mqm);
281 }
282
283
284 /**
285  * Send a message with the give message queue.
286  * May only be called once per message.
287  *
288  * @param mq message queue
289  * @param ev the envelope with the message to send.
290  */
291 void
292 GNUNET_MQ_send (struct GNUNET_MQ_Handle *mq,
293                 struct GNUNET_MQ_Envelope *ev)
294 {
295   GNUNET_assert (NULL != mq);
296   GNUNET_assert (NULL == ev->parent_queue);
297
298   ev->parent_queue = mq;
299   /* is the implementation busy? queue it! */
300   if (NULL != mq->current_envelope)
301   {
302     GNUNET_CONTAINER_DLL_insert_tail (mq->envelope_head,
303                                       mq->envelope_tail,
304                                       ev);
305     return;
306   }
307   mq->current_envelope = ev;
308   mq->send_impl (mq, ev->mh, mq->impl_state);
309 }
310
311
312 /**
313  * Task run to call the send implementation for the next queued
314  * message, if any.  Only useful for implementing message queues,
315  * results in undefined behavior if not used carefully.
316  *
317  * @param cls message queue to send the next message with
318  */
319 static void
320 impl_send_continue (void *cls)
321 {
322   struct GNUNET_MQ_Handle *mq = cls;
323   struct GNUNET_MQ_Envelope *current_envelope;
324
325   mq->continue_task = NULL;
326   /* call is only valid if we're actually currently sending
327    * a message */
328   current_envelope = mq->current_envelope;
329   GNUNET_assert (NULL != current_envelope);
330   current_envelope->parent_queue = NULL;
331   if (NULL == mq->envelope_head)
332   {
333     mq->current_envelope = NULL;
334   }
335   else
336   {
337     mq->current_envelope = mq->envelope_head;
338     GNUNET_CONTAINER_DLL_remove (mq->envelope_head,
339                                  mq->envelope_tail,
340                                  mq->current_envelope);
341     mq->send_impl (mq,
342                    mq->current_envelope->mh,
343                    mq->impl_state);
344   }
345   if (NULL != current_envelope->sent_cb)
346     current_envelope->sent_cb (current_envelope->sent_cls);
347   GNUNET_free (current_envelope);
348 }
349
350
351 /**
352  * Call the send implementation for the next queued message, if any.
353  * Only useful for implementing message queues, results in undefined
354  * behavior if not used carefully.
355  *
356  * @param mq message queue to send the next message with
357  */
358 void
359 GNUNET_MQ_impl_send_continue (struct GNUNET_MQ_Handle *mq)
360 {
361   GNUNET_assert (NULL == mq->continue_task);
362   mq->continue_task = GNUNET_SCHEDULER_add_now (&impl_send_continue,
363                                                 mq);
364 }
365
366
367 /**
368  * Create a message queue for the specified handlers.
369  *
370  * @param send function the implements sending messages
371  * @param destroy function that implements destroying the queue
372  * @param cancel function that implements canceling a message
373  * @param impl_state for the queue, passed to 'send' and 'destroy'
374  * @param handlers array of message handlers
375  * @param error_handler handler for read and write errors
376  * @param error_handler_cls closure for @a error_handler
377  * @return a new message queue
378  */
379 struct GNUNET_MQ_Handle *
380 GNUNET_MQ_queue_for_callbacks (GNUNET_MQ_SendImpl send,
381                                GNUNET_MQ_DestroyImpl destroy,
382                                GNUNET_MQ_CancelImpl cancel,
383                                void *impl_state,
384                                const struct GNUNET_MQ_MessageHandler *handlers,
385                                GNUNET_MQ_ErrorHandler error_handler,
386                                void *error_handler_cls)
387 {
388   struct GNUNET_MQ_Handle *mq;
389   unsigned int i;
390
391   mq = GNUNET_new (struct GNUNET_MQ_Handle);
392   mq->send_impl = send;
393   mq->destroy_impl = destroy;
394   mq->cancel_impl = cancel;
395   if (NULL != handlers)
396   {
397     for (i=0;NULL != handlers[i].cb; i++) ;
398     mq->handlers = GNUNET_new_array (i,
399                                      struct GNUNET_MQ_MessageHandler);
400     memcpy (mq->handlers,
401             handlers,
402             i * sizeof (struct GNUNET_MQ_MessageHandler));
403   }
404   mq->error_handler = error_handler;
405   mq->error_handler_cls = error_handler_cls;
406   mq->impl_state = impl_state;
407
408   return mq;
409 }
410
411
412 /**
413  * Get the message that should currently be sent.
414  * Fails if there is no current message.
415  * Only useful for implementing message queues,
416  * results in undefined behavior if not used carefully.
417  *
418  * @param mq message queue with the current message
419  * @return message to send, never NULL
420  */
421 const struct GNUNET_MessageHeader *
422 GNUNET_MQ_impl_current (struct GNUNET_MQ_Handle *mq)
423 {
424   if (NULL == mq->current_envelope)
425     GNUNET_assert (0);
426   if (NULL == mq->current_envelope->mh)
427     GNUNET_assert (0);
428   return mq->current_envelope->mh;
429 }
430
431
432 /**
433  * Get the implementation state associated with the
434  * message queue.
435  *
436  * While the GNUNET_MQ_Impl* callbacks receive the
437  * implementation state, continuations that are scheduled
438  * by the implementation function often only have one closure
439  * argument, with this function it is possible to get at the
440  * implementation state when only passing the GNUNET_MQ_Handle
441  * as closure.
442  *
443  * @param mq message queue with the current message
444  * @return message to send, never NULL
445  */
446 void *
447 GNUNET_MQ_impl_state (struct GNUNET_MQ_Handle *mq)
448 {
449   return mq->impl_state;
450 }
451
452
453 struct GNUNET_MQ_Envelope *
454 GNUNET_MQ_msg_ (struct GNUNET_MessageHeader **mhp,
455                 uint16_t size,
456                 uint16_t type)
457 {
458   struct GNUNET_MQ_Envelope *mqm;
459
460   mqm = GNUNET_malloc (sizeof *mqm + size);
461   mqm->mh = (struct GNUNET_MessageHeader *) &mqm[1];
462   mqm->mh->size = htons (size);
463   mqm->mh->type = htons (type);
464   if (NULL != mhp)
465     *mhp = mqm->mh;
466   return mqm;
467 }
468
469
470 /**
471  * Implementation of the GNUNET_MQ_msg_nested_mh macro.
472  *
473  * @param mhp pointer to the message header pointer that will be changed to allocate at
474  *        the newly allocated space for the message.
475  * @param base_size size of the data before the nested message
476  * @param type type of the message in the envelope
477  * @param nested_mh the message to append to the message after base_size
478  */
479 struct GNUNET_MQ_Envelope *
480 GNUNET_MQ_msg_nested_mh_ (struct GNUNET_MessageHeader **mhp,
481                           uint16_t base_size,
482                           uint16_t type,
483                           const struct GNUNET_MessageHeader *nested_mh)
484 {
485   struct GNUNET_MQ_Envelope *mqm;
486   uint16_t size;
487
488   if (NULL == nested_mh)
489     return GNUNET_MQ_msg_ (mhp, base_size, type);
490
491   size = base_size + ntohs (nested_mh->size);
492
493   /* check for uint16_t overflow */
494   if (size < base_size)
495     return NULL;
496
497   mqm = GNUNET_MQ_msg_ (mhp, size, type);
498   memcpy ((char *) mqm->mh + base_size,
499           nested_mh,
500           ntohs (nested_mh->size));
501
502   return mqm;
503 }
504
505
506 /**
507  * Transmit a queued message to the session's client.
508  *
509  * @param cls consensus session
510  * @param size number of bytes available in @a buf
511  * @param buf where the callee should write the message
512  * @return number of bytes written to @a buf
513  */
514 static size_t
515 transmit_queued (void *cls,
516                  size_t size,
517                  void *buf)
518 {
519   struct GNUNET_MQ_Handle *mq = cls;
520   struct ServerClientSocketState *state = GNUNET_MQ_impl_state (mq);
521   const struct GNUNET_MessageHeader *msg = GNUNET_MQ_impl_current (mq);
522   size_t msg_size;
523
524   GNUNET_assert (NULL != buf);
525   msg_size = ntohs (msg->size);
526   GNUNET_assert (size >= msg_size);
527   memcpy (buf, msg, msg_size);
528   state->th = NULL;
529
530   GNUNET_MQ_impl_send_continue (mq);
531
532   return msg_size;
533 }
534
535
536 static void
537 server_client_destroy_impl (struct GNUNET_MQ_Handle *mq,
538                             void *impl_state)
539 {
540   struct ServerClientSocketState *state = impl_state;
541
542   if (NULL != state->th)
543   {
544     GNUNET_SERVER_notify_transmit_ready_cancel (state->th);
545     state->th = NULL;
546   }
547
548   GNUNET_assert (NULL != mq);
549   GNUNET_assert (NULL != state);
550   GNUNET_SERVER_client_drop (state->client);
551   GNUNET_free (state);
552 }
553
554
555 static void
556 server_client_send_impl (struct GNUNET_MQ_Handle *mq,
557                          const struct GNUNET_MessageHeader *msg,
558                          void *impl_state)
559 {
560   struct ServerClientSocketState *state = impl_state;
561
562   GNUNET_assert (NULL != mq);
563   GNUNET_assert (NULL != state);
564   state->th = GNUNET_SERVER_notify_transmit_ready (state->client,
565                                                    ntohs (msg->size),
566                                                    GNUNET_TIME_UNIT_FOREVER_REL,
567                                                    &transmit_queued, mq);
568 }
569
570
571 struct GNUNET_MQ_Handle *
572 GNUNET_MQ_queue_for_server_client (struct GNUNET_SERVER_Client *client)
573 {
574   struct GNUNET_MQ_Handle *mq;
575   struct ServerClientSocketState *scss;
576
577   mq = GNUNET_new (struct GNUNET_MQ_Handle);
578   scss = GNUNET_new (struct ServerClientSocketState);
579   mq->impl_state = scss;
580   scss->client = client;
581   GNUNET_SERVER_client_keep (client);
582   mq->send_impl = server_client_send_impl;
583   mq->destroy_impl = server_client_destroy_impl;
584   return mq;
585 }
586
587
588 /**
589  * Type of a function to call when we receive a message
590  * from the service.
591  *
592  * @param cls closure
593  * @param msg message received, NULL on timeout or fatal error
594  */
595 static void
596 handle_client_message (void *cls,
597                        const struct GNUNET_MessageHeader *msg)
598 {
599   struct GNUNET_MQ_Handle *mq = cls;
600   struct ClientConnectionState *state;
601
602   state = mq->impl_state;
603   if (NULL == msg)
604   {
605     GNUNET_MQ_inject_error (mq, GNUNET_MQ_ERROR_READ);
606     return;
607   }
608   GNUNET_CLIENT_receive (state->connection,
609                          &handle_client_message,
610                          mq,
611                          GNUNET_TIME_UNIT_FOREVER_REL);
612   GNUNET_MQ_inject_message (mq, msg);
613 }
614
615
616 /**
617  * Transmit a queued message to the session's client.
618  *
619  * @param cls consensus session
620  * @param size number of bytes available in @a buf
621  * @param buf where the callee should write the message
622  * @return number of bytes written to buf
623  */
624 static size_t
625 connection_client_transmit_queued (void *cls,
626                                    size_t size,
627                                    void *buf)
628 {
629   struct GNUNET_MQ_Handle *mq = cls;
630   const struct GNUNET_MessageHeader *msg;
631   struct ClientConnectionState *state = mq->impl_state;
632   size_t msg_size;
633
634   GNUNET_assert (NULL != mq);
635   state->th = NULL;
636   msg = GNUNET_MQ_impl_current (mq);
637
638   if (NULL == buf)
639   {
640     GNUNET_MQ_inject_error (mq, GNUNET_MQ_ERROR_READ);
641     return 0;
642   }
643
644   if ( (GNUNET_YES == state->receive_requested) &&
645        (GNUNET_NO == state->receive_active) )
646   {
647     state->receive_active = GNUNET_YES;
648     GNUNET_CLIENT_receive (state->connection,
649                            &handle_client_message,
650                            mq,
651                            GNUNET_TIME_UNIT_FOREVER_REL);
652   }
653
654   msg_size = ntohs (msg->size);
655   GNUNET_assert (size >= msg_size);
656   memcpy (buf, msg, msg_size);
657   state->th = NULL;
658
659   GNUNET_MQ_impl_send_continue (mq);
660
661   return msg_size;
662 }
663
664
665 static void
666 connection_client_destroy_impl (struct GNUNET_MQ_Handle *mq,
667                                 void *impl_state)
668 {
669   struct ClientConnectionState *state = impl_state;
670
671   if (NULL != state->th)
672   {
673     GNUNET_CLIENT_notify_transmit_ready_cancel (state->th);
674     state->th = NULL;
675   }
676   GNUNET_CLIENT_disconnect (state->connection);
677   GNUNET_free (impl_state);
678 }
679
680
681 static void
682 connection_client_send_impl (struct GNUNET_MQ_Handle *mq,
683                              const struct GNUNET_MessageHeader *msg,
684                              void *impl_state)
685 {
686   struct ClientConnectionState *state = impl_state;
687
688   GNUNET_assert (NULL != state);
689   GNUNET_assert (NULL == state->th);
690   state->th =
691       GNUNET_CLIENT_notify_transmit_ready (state->connection,
692                                            ntohs (msg->size),
693                                            GNUNET_TIME_UNIT_FOREVER_REL,
694                                            GNUNET_NO,
695                                            &connection_client_transmit_queued,
696                                            mq);
697   GNUNET_assert (NULL != state->th);
698 }
699
700
701 static void
702 connection_client_cancel_impl (struct GNUNET_MQ_Handle *mq,
703                                void *impl_state)
704 {
705   struct ClientConnectionState *state = impl_state;
706
707   GNUNET_assert (NULL != state->th);
708   GNUNET_CLIENT_notify_transmit_ready_cancel (state->th);
709   state->th = NULL;
710 }
711
712
713 struct GNUNET_MQ_Handle *
714 GNUNET_MQ_queue_for_connection_client (struct GNUNET_CLIENT_Connection *connection,
715                                        const struct GNUNET_MQ_MessageHandler *handlers,
716                                        GNUNET_MQ_ErrorHandler error_handler,
717                                        void *error_handler_cls)
718 {
719   struct GNUNET_MQ_Handle *mq;
720   struct ClientConnectionState *state;
721   unsigned int i;
722
723   mq = GNUNET_new (struct GNUNET_MQ_Handle);
724   if (NULL != handlers)
725   {
726     for (i=0;NULL != handlers[i].cb; i++) ;
727     mq->handlers = GNUNET_new_array (i,
728                                      struct GNUNET_MQ_MessageHandler);
729     memcpy (mq->handlers,
730             handlers,
731             i * sizeof (struct GNUNET_MQ_MessageHandler));
732   }
733   mq->error_handler = error_handler;
734   mq->error_handler_cls = error_handler_cls;
735   state = GNUNET_new (struct ClientConnectionState);
736   state->connection = connection;
737   mq->impl_state = state;
738   mq->send_impl = &connection_client_send_impl;
739   mq->destroy_impl = &connection_client_destroy_impl;
740   mq->cancel_impl = &connection_client_cancel_impl;
741   if (NULL != handlers)
742     state->receive_requested = GNUNET_YES;
743
744   return mq;
745 }
746
747
748 /**
749  * Associate the assoc_data in mq with a unique request id.
750  *
751  * @param mq message queue, id will be unique for the queue
752  * @param assoc_data to associate
753  */
754 uint32_t
755 GNUNET_MQ_assoc_add (struct GNUNET_MQ_Handle *mq,
756                      void *assoc_data)
757 {
758   uint32_t id;
759
760   if (NULL == mq->assoc_map)
761   {
762     mq->assoc_map = GNUNET_CONTAINER_multihashmap32_create (8);
763     mq->assoc_id = 1;
764   }
765   id = mq->assoc_id++;
766   GNUNET_CONTAINER_multihashmap32_put (mq->assoc_map, id, assoc_data,
767                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
768   return id;
769 }
770
771
772 void *
773 GNUNET_MQ_assoc_get (struct GNUNET_MQ_Handle *mq,
774                      uint32_t request_id)
775 {
776   if (NULL == mq->assoc_map)
777     return NULL;
778   return GNUNET_CONTAINER_multihashmap32_get (mq->assoc_map, request_id);
779 }
780
781
782 void *
783 GNUNET_MQ_assoc_remove (struct GNUNET_MQ_Handle *mq,
784                         uint32_t request_id)
785 {
786   void *val;
787
788   if (NULL == mq->assoc_map)
789     return NULL;
790   val = GNUNET_CONTAINER_multihashmap32_get (mq->assoc_map,
791                                              request_id);
792   GNUNET_CONTAINER_multihashmap32_remove_all (mq->assoc_map,
793                                               request_id);
794   return val;
795 }
796
797
798 void
799 GNUNET_MQ_notify_sent (struct GNUNET_MQ_Envelope *mqm,
800                        GNUNET_MQ_NotifyCallback cb,
801                        void *cls)
802 {
803   mqm->sent_cb = cb;
804   mqm->sent_cls = cls;
805 }
806
807
808 void
809 GNUNET_MQ_destroy (struct GNUNET_MQ_Handle *mq)
810 {
811   if (NULL != mq->destroy_impl)
812   {
813     mq->destroy_impl (mq, mq->impl_state);
814   }
815   if (NULL != mq->continue_task)
816   {
817     GNUNET_SCHEDULER_cancel (mq->continue_task);
818     mq->continue_task = NULL;
819   }
820   while (NULL != mq->envelope_head)
821   {
822     struct GNUNET_MQ_Envelope *ev;
823
824     ev = mq->envelope_head;
825     ev->parent_queue = NULL;
826     GNUNET_CONTAINER_DLL_remove (mq->envelope_head,
827                                  mq->envelope_tail,
828                                  ev);
829     GNUNET_MQ_discard (ev);
830   }
831   if (NULL != mq->current_envelope)
832   {
833     /* we can only discard envelopes that
834      * are not queued! */
835     mq->current_envelope->parent_queue = NULL;
836     GNUNET_MQ_discard (mq->current_envelope);
837     mq->current_envelope = NULL;
838   }
839   if (NULL != mq->assoc_map)
840   {
841     GNUNET_CONTAINER_multihashmap32_destroy (mq->assoc_map);
842     mq->assoc_map = NULL;
843   }
844   GNUNET_free_non_null (mq->handlers);
845   GNUNET_free (mq);
846 }
847
848
849 const struct GNUNET_MessageHeader *
850 GNUNET_MQ_extract_nested_mh_ (const struct GNUNET_MessageHeader *mh,
851                               uint16_t base_size)
852 {
853   uint16_t whole_size;
854   uint16_t nested_size;
855   const struct GNUNET_MessageHeader *nested_msg;
856
857   whole_size = ntohs (mh->size);
858   GNUNET_assert (whole_size >= base_size);
859   nested_size = whole_size - base_size;
860   if (0 == nested_size)
861     return NULL;
862   if (nested_size < sizeof (struct GNUNET_MessageHeader))
863   {
864     GNUNET_break_op (0);
865     return NULL;
866   }
867   nested_msg = (const struct GNUNET_MessageHeader *) ((char *) mh + base_size);
868   if (ntohs (nested_msg->size) != nested_size)
869   {
870     GNUNET_break_op (0);
871     return NULL;
872   }
873   return nested_msg;
874 }
875
876
877 /**
878  * Cancel sending the message. Message must have been sent with
879  * #GNUNET_MQ_send before.  May not be called after the notify sent
880  * callback has been called
881  *
882  * @param ev queued envelope to cancel
883  */
884 void
885 GNUNET_MQ_send_cancel (struct GNUNET_MQ_Envelope *ev)
886 {
887   struct GNUNET_MQ_Handle *mq = ev->parent_queue;
888
889   GNUNET_assert (NULL != mq);
890   GNUNET_assert (NULL != mq->cancel_impl);
891
892   if (mq->current_envelope == ev)
893   {
894     // complex case, we already started with transmitting
895     // the message
896     mq->cancel_impl (mq,
897                      mq->impl_state);
898     // continue sending the next message, if any
899     if (NULL == mq->envelope_head)
900     {
901       mq->current_envelope = NULL;
902     }
903     else
904     {
905       mq->current_envelope = mq->envelope_head;
906       GNUNET_CONTAINER_DLL_remove (mq->envelope_head,
907                                    mq->envelope_tail,
908                                    mq->current_envelope);
909       mq->send_impl (mq,
910                      mq->current_envelope->mh,
911                      mq->impl_state);
912     }
913   }
914   else
915   {
916     // simple case, message is still waiting in the queue
917     GNUNET_CONTAINER_DLL_remove (mq->envelope_head,
918                                  mq->envelope_tail,
919                                  ev);
920   }
921
922   ev->parent_queue = NULL;
923   ev->mh = NULL;
924   GNUNET_free (ev);
925 }
926
927 /* end of mq.c */