-fix the fix
[oweals/gnunet.git] / src / util / mq.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2012-2014 GNUnet e.V.
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18      Boston, MA 02110-1301, USA.
19 */
20
21 /**
22  * @author Florian Dold
23  * @file util/mq.c
24  * @brief general purpose request queue
25  */
26 #include "platform.h"
27 #include "gnunet_util_lib.h"
28
29 #define LOG(kind,...) GNUNET_log_from (kind, "mq",__VA_ARGS__)
30
31
32 struct GNUNET_MQ_Envelope
33 {
34   /**
35    * Messages are stored in a linked list.
36    * Each queue has its own list of envelopes.
37    */
38   struct GNUNET_MQ_Envelope *next;
39
40   /**
41    * Messages are stored in a linked list
42    * Each queue has its own list of envelopes.
43    */
44   struct GNUNET_MQ_Envelope *prev;
45
46   /**
47    * Actual allocated message header,
48    * usually points to the end of the containing GNUNET_MQ_Envelope
49    */
50   struct GNUNET_MessageHeader *mh;
51
52   /**
53    * Queue the message is queued in, NULL if message is not queued.
54    */
55   struct GNUNET_MQ_Handle *parent_queue;
56
57   /**
58    * Called after the message was sent irrevocably.
59    */
60   GNUNET_MQ_NotifyCallback sent_cb;
61
62   /**
63    * Closure for @e send_cb
64    */
65   void *sent_cls;
66 };
67
68
69 /**
70  * Handle to a message queue.
71  */
72 struct GNUNET_MQ_Handle
73 {
74   /**
75    * Handlers array, or NULL if the queue should not receive messages
76    */
77   struct GNUNET_MQ_MessageHandler *handlers;
78
79   /**
80    * Actual implementation of message sending,
81    * called when a message is added
82    */
83   GNUNET_MQ_SendImpl send_impl;
84
85   /**
86    * Implementation-dependent queue destruction function
87    */
88   GNUNET_MQ_DestroyImpl destroy_impl;
89
90   /**
91    * Implementation-dependent send cancel function
92    */
93   GNUNET_MQ_CancelImpl cancel_impl;
94
95   /**
96    * Implementation-specific state
97    */
98   void *impl_state;
99
100   /**
101    * Callback will be called when an error occurs.
102    */
103   GNUNET_MQ_ErrorHandler error_handler;
104
105   /**
106    * Closure for the error handler.
107    */
108   void *error_handler_cls;
109
110   /**
111    * Linked list of messages pending to be sent
112    */
113   struct GNUNET_MQ_Envelope *envelope_head;
114
115   /**
116    * Linked list of messages pending to be sent
117    */
118   struct GNUNET_MQ_Envelope *envelope_tail;
119
120   /**
121    * Message that is currently scheduled to be
122    * sent. Not the head of the message queue, as the implementation
123    * needs to know if sending has been already scheduled or not.
124    */
125   struct GNUNET_MQ_Envelope *current_envelope;
126
127   /**
128    * Map of associations, lazily allocated
129    */
130   struct GNUNET_CONTAINER_MultiHashMap32 *assoc_map;
131
132   /**
133    * Task scheduled during #GNUNET_MQ_impl_send_continue.
134    */
135   struct GNUNET_SCHEDULER_Task *continue_task;
136
137   /**
138    * Next id that should be used for the @e assoc_map,
139    * initialized lazily to a random value together with
140    * @e assoc_map
141    */
142   uint32_t assoc_id;
143 };
144
145
146 /**
147  * Implementation-specific state for connection to
148  * client (MQ for server).
149  */
150 struct ServerClientSocketState
151 {
152   /**
153    * Handle of the client that connected to the server.
154    */
155   struct GNUNET_SERVER_Client *client;
156
157   /**
158    * Active transmission request to the client.
159    */
160   struct GNUNET_SERVER_TransmitHandle* th;
161 };
162
163
164 /**
165  * Implementation-specific state for connection to
166  * service (MQ for clients).
167  */
168 struct ClientConnectionState
169 {
170   /**
171    * Did we call receive alread alreadyy?
172    */
173   int receive_active;
174
175   /**
176    * Do we also want to receive?
177    */
178   int receive_requested;
179
180   /**
181    * Connection to the service.
182    */
183   struct GNUNET_CLIENT_Connection *connection;
184
185   /**
186    * Active transmission request (or NULL).
187    */
188   struct GNUNET_CLIENT_TransmitHandle *th;
189 };
190
191
192 /**
193  * Call the message message handler that was registered
194  * for the type of the given message in the given message queue.
195  *
196  * This function is indended to be used for the implementation
197  * of message queues.
198  *
199  * @param mq message queue with the handlers
200  * @param mh message to dispatch
201  */
202 void
203 GNUNET_MQ_inject_message (struct GNUNET_MQ_Handle *mq,
204                           const struct GNUNET_MessageHeader *mh)
205 {
206   const struct GNUNET_MQ_MessageHandler *handler;
207   int handled = GNUNET_NO;
208   uint16_t ms = ntohs (mh->size);
209
210   if (NULL == mq->handlers)
211     goto done;
212   for (handler = mq->handlers; NULL != handler->cb; handler++)
213   {
214     if (handler->type == ntohs (mh->type))
215     {
216       handled = GNUNET_YES;
217       if ( (handler->expected_size > ms) ||
218            ( (handler->expected_size != ms) &&
219              (NULL == handler->mv) ) )
220       {
221         /* Too small, or not an exact size and
222            no 'mv' handler to check rest */
223         GNUNET_MQ_inject_error (mq,
224                                 GNUNET_MQ_ERROR_MALFORMED);
225         break;
226       }
227       if ( (NULL == handler->mv) ||
228            (GNUNET_OK ==
229             handler->mv (handler->cls, mh)) )
230       {
231         /* message well-formed, pass to handler */
232         handler->cb (handler->cls, mh);
233       }
234       else
235       {
236         /* Message rejected by check routine */
237         GNUNET_MQ_inject_error (mq,
238                                 GNUNET_MQ_ERROR_MALFORMED);
239       }
240       break;
241     }
242   }
243  done:
244   if (GNUNET_NO == handled)
245     LOG (GNUNET_ERROR_TYPE_WARNING,
246          "No handler for message of type %d\n",
247          ntohs (mh->type));
248 }
249
250
251 /**
252  * Call the error handler of a message queue with the given
253  * error code.  If there is no error handler, log a warning.
254  *
255  * This function is intended to be used by the implementation
256  * of message queues.
257  *
258  * @param mq message queue
259  * @param error the error type
260  */
261 void
262 GNUNET_MQ_inject_error (struct GNUNET_MQ_Handle *mq,
263                         enum GNUNET_MQ_Error error)
264 {
265   if (NULL == mq->error_handler)
266   {
267     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
268                 "mq: got error %d, but no handler installed\n",
269                 (int) error);
270     return;
271   }
272   mq->error_handler (mq->error_handler_cls, error);
273 }
274
275
276 void
277 GNUNET_MQ_discard (struct GNUNET_MQ_Envelope *mqm)
278 {
279   GNUNET_assert (NULL == mqm->parent_queue);
280   GNUNET_free (mqm);
281 }
282
283
284 /**
285  * Send a message with the give message queue.
286  * May only be called once per message.
287  *
288  * @param mq message queue
289  * @param ev the envelope with the message to send.
290  */
291 void
292 GNUNET_MQ_send (struct GNUNET_MQ_Handle *mq,
293                 struct GNUNET_MQ_Envelope *ev)
294 {
295   GNUNET_assert (NULL != mq);
296   GNUNET_assert (NULL == ev->parent_queue);
297
298   ev->parent_queue = mq;
299   /* is the implementation busy? queue it! */
300   if (NULL != mq->current_envelope)
301   {
302     GNUNET_CONTAINER_DLL_insert_tail (mq->envelope_head,
303                                       mq->envelope_tail,
304                                       ev);
305     return;
306   }
307   mq->current_envelope = ev;
308   mq->send_impl (mq, ev->mh, mq->impl_state);
309 }
310
311
312 /**
313  * Task run to call the send implementation for the next queued
314  * message, if any.  Only useful for implementing message queues,
315  * results in undefined behavior if not used carefully.
316  *
317  * @param cls message queue to send the next message with
318  */
319 static void
320 impl_send_continue (void *cls)
321 {
322   struct GNUNET_MQ_Handle *mq = cls;
323   struct GNUNET_MQ_Envelope *current_envelope;
324
325   mq->continue_task = NULL;
326   /* call is only valid if we're actually currently sending
327    * a message */
328   current_envelope = mq->current_envelope;
329   GNUNET_assert (NULL != current_envelope);
330   current_envelope->parent_queue = NULL;
331   if (NULL == mq->envelope_head)
332   {
333     mq->current_envelope = NULL;
334   }
335   else
336   {
337     mq->current_envelope = mq->envelope_head;
338     GNUNET_CONTAINER_DLL_remove (mq->envelope_head,
339                                  mq->envelope_tail,
340                                  mq->current_envelope);
341     mq->send_impl (mq,
342                    mq->current_envelope->mh,
343                    mq->impl_state);
344   }
345   if (NULL != current_envelope->sent_cb)
346     current_envelope->sent_cb (current_envelope->sent_cls);
347   GNUNET_free (current_envelope);
348 }
349
350
351 /**
352  * Call the send implementation for the next queued message, if any.
353  * Only useful for implementing message queues, results in undefined
354  * behavior if not used carefully.
355  *
356  * @param mq message queue to send the next message with
357  */
358 void
359 GNUNET_MQ_impl_send_continue (struct GNUNET_MQ_Handle *mq)
360 {
361   GNUNET_assert (NULL == mq->continue_task);
362   mq->continue_task = GNUNET_SCHEDULER_add_now (&impl_send_continue,
363                                                 mq);
364 }
365
366
367 /**
368  * Create a message queue for the specified handlers.
369  *
370  * @param send function the implements sending messages
371  * @param destroy function that implements destroying the queue
372  * @param cancel function that implements canceling a message
373  * @param impl_state for the queue, passed to 'send' and 'destroy'
374  * @param handlers array of message handlers
375  * @param error_handler handler for read and write errors
376  * @param error_handler_cls closure for @a error_handler
377  * @return a new message queue
378  */
379 struct GNUNET_MQ_Handle *
380 GNUNET_MQ_queue_for_callbacks (GNUNET_MQ_SendImpl send,
381                                GNUNET_MQ_DestroyImpl destroy,
382                                GNUNET_MQ_CancelImpl cancel,
383                                void *impl_state,
384                                const struct GNUNET_MQ_MessageHandler *handlers,
385                                GNUNET_MQ_ErrorHandler error_handler,
386                                void *error_handler_cls)
387 {
388   struct GNUNET_MQ_Handle *mq;
389   unsigned int i;
390
391   mq = GNUNET_new (struct GNUNET_MQ_Handle);
392   mq->send_impl = send;
393   mq->destroy_impl = destroy;
394   mq->cancel_impl = cancel;
395   if (NULL != handlers)
396   {
397     for (i=0;NULL != handlers[i].cb; i++) ;
398     mq->handlers = GNUNET_new_array (i,
399                                      struct GNUNET_MQ_MessageHandler);
400     memcpy (mq->handlers,
401             handlers,
402             i * sizeof (struct GNUNET_MQ_MessageHandler));
403   }
404   mq->error_handler = error_handler;
405   mq->error_handler_cls = error_handler_cls;
406   mq->impl_state = impl_state;
407
408   return mq;
409 }
410
411
412 /**
413  * Get the message that should currently be sent.
414  * Fails if there is no current message.
415  * Only useful for implementing message queues,
416  * results in undefined behavior if not used carefully.
417  *
418  * @param mq message queue with the current message
419  * @return message to send, never NULL
420  */
421 const struct GNUNET_MessageHeader *
422 GNUNET_MQ_impl_current (struct GNUNET_MQ_Handle *mq)
423 {
424   if (NULL == mq->current_envelope)
425     GNUNET_assert (0);
426   if (NULL == mq->current_envelope->mh)
427     GNUNET_assert (0);
428   return mq->current_envelope->mh;
429 }
430
431
432 /**
433  * Get the implementation state associated with the
434  * message queue.
435  *
436  * While the GNUNET_MQ_Impl* callbacks receive the
437  * implementation state, continuations that are scheduled
438  * by the implementation function often only have one closure
439  * argument, with this function it is possible to get at the
440  * implementation state when only passing the GNUNET_MQ_Handle
441  * as closure.
442  *
443  * @param mq message queue with the current message
444  * @return message to send, never NULL
445  */
446 void *
447 GNUNET_MQ_impl_state (struct GNUNET_MQ_Handle *mq)
448 {
449   return mq->impl_state;
450 }
451
452
453 struct GNUNET_MQ_Envelope *
454 GNUNET_MQ_msg_ (struct GNUNET_MessageHeader **mhp,
455                 uint16_t size,
456                 uint16_t type)
457 {
458   struct GNUNET_MQ_Envelope *mqm;
459
460   mqm = GNUNET_malloc (sizeof *mqm + size);
461   mqm->mh = (struct GNUNET_MessageHeader *) &mqm[1];
462   mqm->mh->size = htons (size);
463   mqm->mh->type = htons (type);
464   if (NULL != mhp)
465     *mhp = mqm->mh;
466   return mqm;
467 }
468
469
470 /**
471  * Implementation of the GNUNET_MQ_msg_nested_mh macro.
472  *
473  * @param mhp pointer to the message header pointer that will be changed to allocate at
474  *        the newly allocated space for the message.
475  * @param base_size size of the data before the nested message
476  * @param type type of the message in the envelope
477  * @param nested_mh the message to append to the message after base_size
478  */
479 struct GNUNET_MQ_Envelope *
480 GNUNET_MQ_msg_nested_mh_ (struct GNUNET_MessageHeader **mhp,
481                           uint16_t base_size,
482                           uint16_t type,
483                           const struct GNUNET_MessageHeader *nested_mh)
484 {
485   struct GNUNET_MQ_Envelope *mqm;
486   uint16_t size;
487
488   if (NULL == nested_mh)
489     return GNUNET_MQ_msg_ (mhp, base_size, type);
490
491   size = base_size + ntohs (nested_mh->size);
492
493   /* check for uint16_t overflow */
494   if (size < base_size)
495     return NULL;
496
497   mqm = GNUNET_MQ_msg_ (mhp, size, type);
498   memcpy ((char *) mqm->mh + base_size,
499           nested_mh,
500           ntohs (nested_mh->size));
501
502   return mqm;
503 }
504
505
506 /**
507  * Transmit a queued message to the session's client.
508  *
509  * @param cls consensus session
510  * @param size number of bytes available in @a buf
511  * @param buf where the callee should write the message
512  * @return number of bytes written to @a buf
513  */
514 static size_t
515 transmit_queued (void *cls, size_t size,
516                  void *buf)
517 {
518   struct GNUNET_MQ_Handle *mq = cls;
519   struct ServerClientSocketState *state = GNUNET_MQ_impl_state (mq);
520   const struct GNUNET_MessageHeader *msg = GNUNET_MQ_impl_current (mq);
521   size_t msg_size;
522
523   GNUNET_assert (NULL != buf);
524
525   msg_size = ntohs (msg->size);
526   GNUNET_assert (size >= msg_size);
527   memcpy (buf, msg, msg_size);
528   state->th = NULL;
529
530   GNUNET_MQ_impl_send_continue (mq);
531
532   return msg_size;
533 }
534
535
536 static void
537 server_client_destroy_impl (struct GNUNET_MQ_Handle *mq,
538                             void *impl_state)
539 {
540   struct ServerClientSocketState *state = impl_state;
541
542   if (NULL != state->th)
543   {
544     GNUNET_SERVER_notify_transmit_ready_cancel (state->th);
545     state->th = NULL;
546   }
547
548   GNUNET_assert (NULL != mq);
549   GNUNET_assert (NULL != state);
550   GNUNET_SERVER_client_drop (state->client);
551   GNUNET_free (state);
552 }
553
554
555 static void
556 server_client_send_impl (struct GNUNET_MQ_Handle *mq,
557                          const struct GNUNET_MessageHeader *msg,
558                          void *impl_state)
559 {
560   struct ServerClientSocketState *state = impl_state;
561
562   GNUNET_assert (NULL != mq);
563   GNUNET_assert (NULL != state);
564   state->th = GNUNET_SERVER_notify_transmit_ready (state->client,
565                                                    ntohs (msg->size),
566                                                    GNUNET_TIME_UNIT_FOREVER_REL,
567                                                    &transmit_queued, mq);
568 }
569
570
571 struct GNUNET_MQ_Handle *
572 GNUNET_MQ_queue_for_server_client (struct GNUNET_SERVER_Client *client)
573 {
574   struct GNUNET_MQ_Handle *mq;
575   struct ServerClientSocketState *scss;
576
577   mq = GNUNET_new (struct GNUNET_MQ_Handle);
578   scss = GNUNET_new (struct ServerClientSocketState);
579   mq->impl_state = scss;
580   scss->client = client;
581   GNUNET_SERVER_client_keep (client);
582   mq->send_impl = server_client_send_impl;
583   mq->destroy_impl = server_client_destroy_impl;
584   return mq;
585 }
586
587
588 /**
589  * Type of a function to call when we receive a message
590  * from the service.
591  *
592  * @param cls closure
593  * @param msg message received, NULL on timeout or fatal error
594  */
595 static void
596 handle_client_message (void *cls,
597                        const struct GNUNET_MessageHeader *msg)
598 {
599   struct GNUNET_MQ_Handle *mq = cls;
600   struct ClientConnectionState *state;
601
602   state = mq->impl_state;
603   if (NULL == msg)
604   {
605     GNUNET_MQ_inject_error (mq, GNUNET_MQ_ERROR_READ);
606     return;
607   }
608   GNUNET_CLIENT_receive (state->connection,
609                          &handle_client_message,
610                          mq,
611                          GNUNET_TIME_UNIT_FOREVER_REL);
612   GNUNET_MQ_inject_message (mq, msg);
613 }
614
615
616 /**
617  * Transmit a queued message to the session's client.
618  *
619  * @param cls consensus session
620  * @param size number of bytes available in @a buf
621  * @param buf where the callee should write the message
622  * @return number of bytes written to buf
623  */
624 static size_t
625 connection_client_transmit_queued (void *cls,
626                                    size_t size,
627                                    void *buf)
628 {
629   struct GNUNET_MQ_Handle *mq = cls;
630   const struct GNUNET_MessageHeader *msg;
631   struct ClientConnectionState *state = mq->impl_state;
632   size_t msg_size;
633
634   GNUNET_assert (NULL != mq);
635   msg = GNUNET_MQ_impl_current (mq);
636
637   if (NULL == buf)
638   {
639     GNUNET_MQ_inject_error (mq, GNUNET_MQ_ERROR_READ);
640     return 0;
641   }
642
643   if ( (GNUNET_YES == state->receive_requested) &&
644        (GNUNET_NO == state->receive_active) )
645   {
646     state->receive_active = GNUNET_YES;
647     GNUNET_CLIENT_receive (state->connection,
648                            &handle_client_message,
649                            mq,
650                            GNUNET_TIME_UNIT_FOREVER_REL);
651   }
652
653   msg_size = ntohs (msg->size);
654   GNUNET_assert (size >= msg_size);
655   memcpy (buf, msg, msg_size);
656   state->th = NULL;
657
658   GNUNET_MQ_impl_send_continue (mq);
659
660   return msg_size;
661 }
662
663
664 static void
665 connection_client_destroy_impl (struct GNUNET_MQ_Handle *mq,
666                                 void *impl_state)
667 {
668   struct ClientConnectionState *state = impl_state;
669
670   GNUNET_CLIENT_disconnect (state->connection);
671   GNUNET_free (impl_state);
672 }
673
674
675 static void
676 connection_client_send_impl (struct GNUNET_MQ_Handle *mq,
677                              const struct GNUNET_MessageHeader *msg,
678                              void *impl_state)
679 {
680   struct ClientConnectionState *state = impl_state;
681
682   GNUNET_assert (NULL != state);
683   GNUNET_assert (NULL == state->th);
684   state->th =
685       GNUNET_CLIENT_notify_transmit_ready (state->connection,
686                                            ntohs (msg->size),
687                                            GNUNET_TIME_UNIT_FOREVER_REL, GNUNET_NO,
688                                            &connection_client_transmit_queued, mq);
689   GNUNET_assert (NULL != state->th);
690 }
691
692
693 static void
694 connection_client_cancel_impl (struct GNUNET_MQ_Handle *mq,
695                                void *impl_state)
696 {
697   struct ClientConnectionState *state = impl_state;
698
699   GNUNET_assert (NULL != state->th);
700   GNUNET_CLIENT_notify_transmit_ready_cancel (state->th);
701   state->th = NULL;
702 }
703
704
705 struct GNUNET_MQ_Handle *
706 GNUNET_MQ_queue_for_connection_client (struct GNUNET_CLIENT_Connection *connection,
707                                        const struct GNUNET_MQ_MessageHandler *handlers,
708                                        GNUNET_MQ_ErrorHandler error_handler,
709                                        void *error_handler_cls)
710 {
711   struct GNUNET_MQ_Handle *mq;
712   struct ClientConnectionState *state;
713   unsigned int i;
714
715   mq = GNUNET_new (struct GNUNET_MQ_Handle);
716   if (NULL != handlers)
717   {
718     for (i=0;NULL != handlers[i].cb; i++) ;
719     mq->handlers = GNUNET_new_array (i,
720                                      struct GNUNET_MQ_MessageHandler);
721     memcpy (mq->handlers,
722             handlers,
723             i * sizeof (struct GNUNET_MQ_MessageHandler));
724   }
725   mq->error_handler = error_handler;
726   mq->error_handler_cls = error_handler_cls;
727   state = GNUNET_new (struct ClientConnectionState);
728   state->connection = connection;
729   mq->impl_state = state;
730   mq->send_impl = &connection_client_send_impl;
731   mq->destroy_impl = &connection_client_destroy_impl;
732   mq->cancel_impl = &connection_client_cancel_impl;
733   if (NULL != handlers)
734     state->receive_requested = GNUNET_YES;
735
736   return mq;
737 }
738
739
740 /**
741  * Associate the assoc_data in mq with a unique request id.
742  *
743  * @param mq message queue, id will be unique for the queue
744  * @param assoc_data to associate
745  */
746 uint32_t
747 GNUNET_MQ_assoc_add (struct GNUNET_MQ_Handle *mq,
748                      void *assoc_data)
749 {
750   uint32_t id;
751
752   if (NULL == mq->assoc_map)
753   {
754     mq->assoc_map = GNUNET_CONTAINER_multihashmap32_create (8);
755     mq->assoc_id = 1;
756   }
757   id = mq->assoc_id++;
758   GNUNET_CONTAINER_multihashmap32_put (mq->assoc_map, id, assoc_data,
759                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
760   return id;
761 }
762
763
764 void *
765 GNUNET_MQ_assoc_get (struct GNUNET_MQ_Handle *mq,
766                      uint32_t request_id)
767 {
768   if (NULL == mq->assoc_map)
769     return NULL;
770   return GNUNET_CONTAINER_multihashmap32_get (mq->assoc_map, request_id);
771 }
772
773
774 void *
775 GNUNET_MQ_assoc_remove (struct GNUNET_MQ_Handle *mq,
776                         uint32_t request_id)
777 {
778   void *val;
779
780   if (NULL == mq->assoc_map)
781     return NULL;
782   val = GNUNET_CONTAINER_multihashmap32_get (mq->assoc_map,
783                                              request_id);
784   GNUNET_CONTAINER_multihashmap32_remove_all (mq->assoc_map,
785                                               request_id);
786   return val;
787 }
788
789
790 void
791 GNUNET_MQ_notify_sent (struct GNUNET_MQ_Envelope *mqm,
792                        GNUNET_MQ_NotifyCallback cb,
793                        void *cls)
794 {
795   mqm->sent_cb = cb;
796   mqm->sent_cls = cls;
797 }
798
799
800 void
801 GNUNET_MQ_destroy (struct GNUNET_MQ_Handle *mq)
802 {
803   if (NULL != mq->destroy_impl)
804   {
805     mq->destroy_impl (mq, mq->impl_state);
806   }
807   if (NULL != mq->continue_task)
808   {
809     GNUNET_SCHEDULER_cancel (mq->continue_task);
810     mq->continue_task = NULL;
811   }
812   while (NULL != mq->envelope_head)
813   {
814     struct GNUNET_MQ_Envelope *ev;
815
816     ev = mq->envelope_head;
817     ev->parent_queue = NULL;
818     GNUNET_CONTAINER_DLL_remove (mq->envelope_head,
819                                  mq->envelope_tail,
820                                  ev);
821     GNUNET_MQ_discard (ev);
822   }
823   if (NULL != mq->current_envelope)
824   {
825     /* we can only discard envelopes that
826      * are not queued! */
827     mq->current_envelope->parent_queue = NULL;
828     GNUNET_MQ_discard (mq->current_envelope);
829     mq->current_envelope = NULL;
830   }
831   if (NULL != mq->assoc_map)
832   {
833     GNUNET_CONTAINER_multihashmap32_destroy (mq->assoc_map);
834     mq->assoc_map = NULL;
835   }
836   GNUNET_free_non_null (mq->handlers);
837   GNUNET_free (mq);
838 }
839
840
841 const struct GNUNET_MessageHeader *
842 GNUNET_MQ_extract_nested_mh_ (const struct GNUNET_MessageHeader *mh,
843                               uint16_t base_size)
844 {
845   uint16_t whole_size;
846   uint16_t nested_size;
847   const struct GNUNET_MessageHeader *nested_msg;
848
849   whole_size = ntohs (mh->size);
850   GNUNET_assert (whole_size >= base_size);
851   nested_size = whole_size - base_size;
852   if (0 == nested_size)
853     return NULL;
854   if (nested_size < sizeof (struct GNUNET_MessageHeader))
855   {
856     GNUNET_break_op (0);
857     return NULL;
858   }
859   nested_msg = (const struct GNUNET_MessageHeader *) ((char *) mh + base_size);
860   if (ntohs (nested_msg->size) != nested_size)
861   {
862     GNUNET_break_op (0);
863     return NULL;
864   }
865   return nested_msg;
866 }
867
868
869 /**
870  * Cancel sending the message. Message must have been sent with
871  * #GNUNET_MQ_send before.  May not be called after the notify sent
872  * callback has been called
873  *
874  * @param ev queued envelope to cancel
875  */
876 void
877 GNUNET_MQ_send_cancel (struct GNUNET_MQ_Envelope *ev)
878 {
879   struct GNUNET_MQ_Handle *mq = ev->parent_queue;
880
881   GNUNET_assert (NULL != mq);
882   GNUNET_assert (NULL != mq->cancel_impl);
883
884   if (mq->current_envelope == ev)
885   {
886     // complex case, we already started with transmitting
887     // the message
888     mq->cancel_impl (mq,
889                      mq->impl_state);
890     // continue sending the next message, if any
891     if (NULL == mq->envelope_head)
892     {
893       mq->current_envelope = NULL;
894     }
895     else
896     {
897       mq->current_envelope = mq->envelope_head;
898       GNUNET_CONTAINER_DLL_remove (mq->envelope_head,
899                                    mq->envelope_tail,
900                                    mq->current_envelope);
901       mq->send_impl (mq,
902                      mq->current_envelope->mh,
903                      mq->impl_state);
904     }
905   }
906   else
907   {
908     // simple case, message is still waiting in the queue
909     GNUNET_CONTAINER_DLL_remove (mq->envelope_head,
910                                  mq->envelope_tail,
911                                  ev);
912   }
913
914   ev->parent_queue = NULL;
915   ev->mh = NULL;
916   GNUNET_free (ev);
917 }
918
919 /* end of mq.c */