941a5c43eeca094d3f74509f14a42d8594980b8a
[oweals/gnunet.git] / src / util / mq.c
1 /*
2      This file is part of GNUnet.
3      (C) 2012 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @author Florian Dold
23  * @file util/mq.c
24  * @brief general purpose request queue
25  */
26 #include "platform.h"
27 #include "gnunet_util_lib.h"
28
29 #define LOG(kind,...) GNUNET_log_from (kind, "mq",__VA_ARGS__)
30
31
32 struct GNUNET_MQ_Envelope
33 {
34   /**
35    * Messages are stored in a linked list.
36    * Each queue has its own list of envelopes.
37    */
38   struct GNUNET_MQ_Envelope *next;
39
40   /**
41    * Messages are stored in a linked list
42    * Each queue has its own list of envelopes.
43    */
44   struct GNUNET_MQ_Envelope *prev;
45
46   /**
47    * Actual allocated message header,
48    * usually points to the end of the containing GNUNET_MQ_Envelope
49    */
50   struct GNUNET_MessageHeader *mh;
51
52   /**
53    * Queue the message is queued in, NULL if message is not queued.
54    */
55   struct GNUNET_MQ_Handle *parent_queue;
56
57   /**
58    * Called after the message was sent irrevocably.
59    */
60   GNUNET_MQ_NotifyCallback sent_cb;
61
62   /**
63    * Closure for send_cb
64    */
65   void *sent_cls;
66 };
67
68
69 /**
70  * Handle to a message queue.
71  */
72 struct GNUNET_MQ_Handle
73 {
74   /**
75    * Handlers array, or NULL if the queue should not receive messages
76    */
77   const struct GNUNET_MQ_MessageHandler *handlers;
78
79   /**
80    * Closure for the handler callbacks,
81    * as well as for the error handler.
82    */
83   void *handlers_cls;
84
85   /**
86    * Actual implementation of message sending,
87    * called when a message is added
88    */
89   GNUNET_MQ_SendImpl send_impl;
90
91   /**
92    * Implementation-dependent queue destruction function
93    */
94   GNUNET_MQ_DestroyImpl destroy_impl;
95
96   /**
97    * Implementation-dependent send cancel function
98    */
99   GNUNET_MQ_CancelImpl cancel_impl;
100
101   /**
102    * Implementation-specific state
103    */
104   void *impl_state;
105
106   /**
107    * Callback will be called when an error occurs.
108    */
109   GNUNET_MQ_ErrorHandler error_handler;
110
111   /**
112    * Linked list of messages pending to be sent
113    */
114   struct GNUNET_MQ_Envelope *envelope_head;
115
116   /**
117    * Linked list of messages pending to be sent
118    */
119   struct GNUNET_MQ_Envelope *envelope_tail;
120
121   /**
122    * Message that is currently scheduled to be
123    * sent. Not the head of the message queue, as the implementation
124    * needs to know if sending has been already scheduled or not.
125    */
126   struct GNUNET_MQ_Envelope *current_envelope;
127
128   /**
129    * Map of associations, lazily allocated
130    */
131   struct GNUNET_CONTAINER_MultiHashMap32 *assoc_map;
132
133   /**
134    * Task scheduled during #GNUNET_MQ_impl_send_continue.
135    */
136   GNUNET_SCHEDULER_TaskIdentifier continue_task;
137
138   /**
139    * Next id that should be used for the assoc_map,
140    * initialized lazily to a random value together with
141    * assoc_map
142    */
143   uint32_t assoc_id;
144 };
145
146
147
148
149 struct ServerClientSocketState
150 {
151   struct GNUNET_SERVER_Client *client;
152   struct GNUNET_SERVER_TransmitHandle* th;
153 };
154
155
156 struct ClientConnectionState
157 {
158   /**
159    * Did we call receive alread alreadyy?
160    */
161   int receive_active;
162
163   /**
164    * Do we also want to receive?
165    */
166   int receive_requested;
167   struct GNUNET_CLIENT_Connection *connection;
168   struct GNUNET_CLIENT_TransmitHandle *th;
169 };
170
171
172 /**
173  * Call the message message handler that was registered
174  * for the type of the given message in the given message queue.
175  *
176  * This function is indended to be used for the implementation
177  * of message queues.
178  *
179  * @param mq message queue with the handlers
180  * @param mh message to dispatch
181  */
182 void
183 GNUNET_MQ_inject_message (struct GNUNET_MQ_Handle *mq, const struct GNUNET_MessageHeader *mh)
184 {
185   const struct GNUNET_MQ_MessageHandler *handler;
186   int handled = GNUNET_NO;
187
188   handler = mq->handlers;
189   if (NULL == handler)
190     return;
191   for (; NULL != handler->cb; handler++)
192   {
193     if (handler->type == ntohs (mh->type))
194     {
195       handler->cb (mq->handlers_cls, mh);
196       handled = GNUNET_YES;
197     }
198   }
199
200   if (GNUNET_NO == handled)
201     LOG (GNUNET_ERROR_TYPE_WARNING, "no handler for message of type %d\n", ntohs (mh->type));
202 }
203
204
205 /**
206  * Call the error handler of a message queue with the given
207  * error code.  If there is no error handler, log a warning.
208  *
209  * This function is intended to be used by the implementation
210  * of message queues.
211  *
212  * @param mq message queue
213  * @param error the error type
214  */
215 void
216 GNUNET_MQ_inject_error (struct GNUNET_MQ_Handle *mq,
217                         enum GNUNET_MQ_Error error)
218 {
219   if (NULL == mq->error_handler)
220   {
221     /* FIXME: log what kind of error occured */
222     GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "mq: got error, but no handler installed\n");
223     return;
224   }
225   mq->error_handler (mq->handlers_cls, error);
226 }
227
228
229 void
230 GNUNET_MQ_discard (struct GNUNET_MQ_Envelope *mqm)
231 {
232   GNUNET_assert (NULL == mqm->parent_queue);
233   GNUNET_free (mqm);
234 }
235
236
237 /**
238  * Send a message with the give message queue.
239  * May only be called once per message.
240  *
241  * @param mq message queue
242  * @param ev the envelope with the message to send.
243  */
244 void
245 GNUNET_MQ_send (struct GNUNET_MQ_Handle *mq, struct GNUNET_MQ_Envelope *ev)
246 {
247   GNUNET_assert (NULL != mq);
248   GNUNET_assert (NULL == ev->parent_queue);
249
250   ev->parent_queue = mq;
251
252   /* is the implementation busy? queue it! */
253   if (NULL != mq->current_envelope)
254   {
255     GNUNET_CONTAINER_DLL_insert_tail (mq->envelope_head, mq->envelope_tail, ev);
256     return;
257   }
258   mq->current_envelope = ev;
259   mq->send_impl (mq, ev->mh, mq->impl_state);
260 }
261
262
263 /**
264  * Task run to call the send implementation for the next queued
265  * message, if any.  Only useful for implementing message queues,
266  * results in undefined behavior if not used carefully.
267  *
268  * @param cls message queue to send the next message with
269  * @param tc scheduler context
270  */
271 static void
272 impl_send_continue (void *cls,
273                     const struct GNUNET_SCHEDULER_TaskContext *tc)
274 {
275   struct GNUNET_MQ_Handle *mq = cls;
276   struct GNUNET_MQ_Envelope *current_envelope;
277
278   if ((tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN) != 0)
279     return;
280
281   mq->continue_task = GNUNET_SCHEDULER_NO_TASK;
282   /* call is only valid if we're actually currently sending
283    * a message */
284   current_envelope = mq->current_envelope;
285   GNUNET_assert (NULL != current_envelope);
286   current_envelope->parent_queue = NULL;
287   if (NULL == mq->envelope_head)
288   {
289     mq->current_envelope = NULL;
290   }
291   else
292   {
293     mq->current_envelope = mq->envelope_head;
294     GNUNET_CONTAINER_DLL_remove (mq->envelope_head,
295                                  mq->envelope_tail,
296                                  mq->current_envelope);
297     mq->send_impl (mq, mq->current_envelope->mh, mq->impl_state);
298   }
299   if (NULL != current_envelope->sent_cb)
300     current_envelope->sent_cb (current_envelope->sent_cls);
301   GNUNET_free (current_envelope);
302 }
303
304
305 /**
306  * Call the send implementation for the next queued message,
307  * if any.
308  * Only useful for implementing message queues,
309  * results in undefined behavior if not used carefully.
310  *
311  * @param mq message queue to send the next message with
312  */
313 void
314 GNUNET_MQ_impl_send_continue (struct GNUNET_MQ_Handle *mq)
315 {
316   GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == mq->continue_task);
317   mq->continue_task = GNUNET_SCHEDULER_add_now (&impl_send_continue,
318                                                 mq);
319 }
320
321
322 /**
323  * Create a message queue for the specified handlers.
324  *
325  * @param send function the implements sending messages
326  * @param destroy function that implements destroying the queue
327  * @param cancel function that implements canceling a message
328  * @param impl_state for the queue, passed to 'send' and 'destroy'
329  * @param handlers array of message handlers
330  * @param error_handler handler for read and write errors
331  * @param cls closure for message handlers and error handler
332  * @return a new message queue
333  */
334 struct GNUNET_MQ_Handle *
335 GNUNET_MQ_queue_for_callbacks (GNUNET_MQ_SendImpl send,
336                                GNUNET_MQ_DestroyImpl destroy,
337                                GNUNET_MQ_CancelImpl cancel,
338                                void *impl_state,
339                                const struct GNUNET_MQ_MessageHandler *handlers,
340                                GNUNET_MQ_ErrorHandler error_handler,
341                                void *cls)
342 {
343   struct GNUNET_MQ_Handle *mq;
344
345   mq = GNUNET_new (struct GNUNET_MQ_Handle);
346   mq->send_impl = send;
347   mq->destroy_impl = destroy;
348   mq->cancel_impl = cancel;
349   mq->handlers = handlers;
350   mq->handlers_cls = cls;
351   mq->impl_state = impl_state;
352
353   return mq;
354 }
355
356
357 /**
358  * Get the message that should currently be sent.
359  * Fails if there is no current message.
360  * Only useful for implementing message queues,
361  * results in undefined behavior if not used carefully.
362  *
363  * @param mq message queue with the current message
364  * @return message to send, never NULL
365  */
366 const struct GNUNET_MessageHeader *
367 GNUNET_MQ_impl_current (struct GNUNET_MQ_Handle *mq)
368 {
369   if (NULL == mq->current_envelope)
370     GNUNET_abort ();
371   if (NULL == mq->current_envelope->mh)
372     GNUNET_abort ();
373   return mq->current_envelope->mh;
374 }
375
376
377 /**
378  * Get the implementation state associated with the
379  * message queue.
380  *
381  * While the GNUNET_MQ_Impl* callbacks receive the
382  * implementation state, continuations that are scheduled
383  * by the implementation function often only have one closure
384  * argument, with this function it is possible to get at the
385  * implementation state when only passing the GNUNET_MQ_Handle
386  * as closure.
387  *
388  * @param mq message queue with the current message
389  * @return message to send, never NULL
390  */
391 void *
392 GNUNET_MQ_impl_state (struct GNUNET_MQ_Handle *mq)
393 {
394   return mq->impl_state;
395 }
396
397
398 struct GNUNET_MQ_Envelope *
399 GNUNET_MQ_msg_ (struct GNUNET_MessageHeader **mhp, uint16_t size, uint16_t type)
400 {
401   struct GNUNET_MQ_Envelope *mqm;
402
403   mqm = GNUNET_malloc (sizeof *mqm + size);
404   mqm->mh = (struct GNUNET_MessageHeader *) &mqm[1];
405   mqm->mh->size = htons (size);
406   mqm->mh->type = htons (type);
407   if (NULL != mhp)
408     *mhp = mqm->mh;
409   return mqm;
410 }
411
412
413 /**
414  * Implementation of the GNUNET_MQ_msg_nested_mh macro.
415  *
416  * @param mhp pointer to the message header pointer that will be changed to allocate at
417  *        the newly allocated space for the message.
418  * @param base_size size of the data before the nested message
419  * @param type type of the message in the envelope
420  * @param nested_mh the message to append to the message after base_size
421  */
422 struct GNUNET_MQ_Envelope *
423 GNUNET_MQ_msg_nested_mh_ (struct GNUNET_MessageHeader **mhp, uint16_t base_size, uint16_t type,
424                           const struct GNUNET_MessageHeader *nested_mh)
425 {
426   struct GNUNET_MQ_Envelope *mqm;
427   uint16_t size;
428
429   if (NULL == nested_mh)
430     return GNUNET_MQ_msg_ (mhp, base_size, type);
431
432   size = base_size + ntohs (nested_mh->size);
433
434   /* check for uint16_t overflow */
435   if (size < base_size)
436     return NULL;
437
438   mqm = GNUNET_MQ_msg_ (mhp, size, type);
439   memcpy ((char *) mqm->mh + base_size, nested_mh, ntohs (nested_mh->size));
440
441   return mqm;
442 }
443
444
445 /**
446  * Transmit a queued message to the session's client.
447  *
448  * @param cls consensus session
449  * @param size number of bytes available in buf
450  * @param buf where the callee should write the message
451  * @return number of bytes written to buf
452  */
453 static size_t
454 transmit_queued (void *cls, size_t size,
455                  void *buf)
456 {
457   struct GNUNET_MQ_Handle *mq = cls;
458   struct ServerClientSocketState *state = GNUNET_MQ_impl_state (mq);
459   const struct GNUNET_MessageHeader *msg = GNUNET_MQ_impl_current (mq);
460   size_t msg_size;
461
462   GNUNET_assert (NULL != buf);
463
464   msg_size = ntohs (msg->size);
465   GNUNET_assert (size >= msg_size);
466   memcpy (buf, msg, msg_size);
467   state->th = NULL;
468
469   GNUNET_MQ_impl_send_continue (mq);
470
471   return msg_size;
472 }
473
474
475 static void
476 server_client_destroy_impl (struct GNUNET_MQ_Handle *mq,
477                             void *impl_state)
478 {
479   struct ServerClientSocketState *state = impl_state;
480
481   if (NULL != state->th)
482   {
483     GNUNET_SERVER_notify_transmit_ready_cancel (state->th);
484     state->th = NULL;
485   }
486
487   GNUNET_assert (NULL != mq);
488   GNUNET_assert (NULL != state);
489   GNUNET_SERVER_client_drop (state->client);
490   GNUNET_free (state);
491 }
492
493
494 static void
495 server_client_send_impl (struct GNUNET_MQ_Handle *mq,
496                          const struct GNUNET_MessageHeader *msg, void *impl_state)
497 {
498   struct ServerClientSocketState *state = impl_state;
499
500   GNUNET_assert (NULL != mq);
501   GNUNET_assert (NULL != state);
502   state->th =
503       GNUNET_SERVER_notify_transmit_ready (state->client, ntohs (msg->size),
504                                            GNUNET_TIME_UNIT_FOREVER_REL,
505                                            &transmit_queued, mq);
506 }
507
508
509 struct GNUNET_MQ_Handle *
510 GNUNET_MQ_queue_for_server_client (struct GNUNET_SERVER_Client *client)
511 {
512   struct GNUNET_MQ_Handle *mq;
513   struct ServerClientSocketState *scss;
514
515   mq = GNUNET_new (struct GNUNET_MQ_Handle);
516   scss = GNUNET_new (struct ServerClientSocketState);
517   mq->impl_state = scss;
518   scss->client = client;
519   GNUNET_SERVER_client_keep (client);
520   mq->send_impl = server_client_send_impl;
521   mq->destroy_impl = server_client_destroy_impl;
522   return mq;
523 }
524
525
526 /**
527  * Type of a function to call when we receive a message
528  * from the service.
529  *
530  * @param cls closure
531  * @param msg message received, NULL on timeout or fatal error
532  */
533 static void
534 handle_client_message (void *cls,
535                        const struct GNUNET_MessageHeader *msg)
536 {
537   struct GNUNET_MQ_Handle *mq = cls;
538   struct ClientConnectionState *state;
539
540   state = mq->impl_state;
541
542   if (NULL == msg)
543   {
544     GNUNET_MQ_inject_error (mq, GNUNET_MQ_ERROR_READ);
545     return;
546   }
547
548   GNUNET_CLIENT_receive (state->connection, handle_client_message, mq,
549                          GNUNET_TIME_UNIT_FOREVER_REL);
550
551   GNUNET_MQ_inject_message (mq, msg);
552 }
553
554
555 /**
556  * Transmit a queued message to the session's client.
557  *
558  * @param cls consensus session
559  * @param size number of bytes available in @a buf
560  * @param buf where the callee should write the message
561  * @return number of bytes written to buf
562  */
563 static size_t
564 connection_client_transmit_queued (void *cls,
565                                    size_t size,
566                                    void *buf)
567 {
568   struct GNUNET_MQ_Handle *mq = cls;
569   const struct GNUNET_MessageHeader *msg;
570   struct ClientConnectionState *state = mq->impl_state;
571   size_t msg_size;
572
573   GNUNET_assert (NULL != mq);
574   msg = GNUNET_MQ_impl_current (mq);
575
576   if (NULL == buf)
577   {
578     GNUNET_MQ_inject_error (mq, GNUNET_MQ_ERROR_READ);
579     return 0;
580   }
581
582   if ( (GNUNET_YES == state->receive_requested) &&
583        (GNUNET_NO == state->receive_active) )
584   {
585     state->receive_active = GNUNET_YES;
586     GNUNET_CLIENT_receive (state->connection, handle_client_message, mq,
587                            GNUNET_TIME_UNIT_FOREVER_REL);
588   }
589
590
591   msg_size = ntohs (msg->size);
592   GNUNET_assert (size >= msg_size);
593   memcpy (buf, msg, msg_size);
594   state->th = NULL;
595
596   GNUNET_MQ_impl_send_continue (mq);
597
598   return msg_size;
599 }
600
601
602 static void
603 connection_client_destroy_impl (struct GNUNET_MQ_Handle *mq, void *impl_state)
604 {
605   GNUNET_free (impl_state);
606 }
607
608
609 static void
610 connection_client_send_impl (struct GNUNET_MQ_Handle *mq,
611                              const struct GNUNET_MessageHeader *msg, void *impl_state)
612 {
613   struct ClientConnectionState *state = impl_state;
614
615   GNUNET_assert (NULL != state);
616   GNUNET_assert (NULL == state->th);
617   state->th =
618       GNUNET_CLIENT_notify_transmit_ready (state->connection, ntohs (msg->size),
619                                            GNUNET_TIME_UNIT_FOREVER_REL, GNUNET_NO,
620                                            &connection_client_transmit_queued, mq);
621   GNUNET_assert (NULL != state->th);
622 }
623
624
625 static void
626 connection_client_cancel_impl (struct GNUNET_MQ_Handle *mq,
627                                void *impl_state)
628 {
629   struct ClientConnectionState *state = impl_state;
630   GNUNET_assert (NULL != state->th);
631   GNUNET_CLIENT_notify_transmit_ready_cancel (state->th);
632   state->th = NULL;
633 }
634
635
636 struct GNUNET_MQ_Handle *
637 GNUNET_MQ_queue_for_connection_client (struct GNUNET_CLIENT_Connection *connection,
638                                        const struct GNUNET_MQ_MessageHandler *handlers,
639                                        GNUNET_MQ_ErrorHandler error_handler,
640                                        void *cls)
641 {
642   struct GNUNET_MQ_Handle *mq;
643   struct ClientConnectionState *state;
644
645   GNUNET_assert (NULL != connection);
646
647   mq = GNUNET_new (struct GNUNET_MQ_Handle);
648   mq->handlers = handlers;
649   mq->error_handler = error_handler;
650   mq->handlers_cls = cls;
651   state = GNUNET_new (struct ClientConnectionState);
652   state->connection = connection;
653   mq->impl_state = state;
654   mq->send_impl = connection_client_send_impl;
655   mq->destroy_impl = connection_client_destroy_impl;
656   mq->cancel_impl = connection_client_cancel_impl;
657   if (NULL != handlers)
658     state->receive_requested = GNUNET_YES;
659
660   return mq;
661 }
662
663
664 void
665 GNUNET_MQ_replace_handlers (struct GNUNET_MQ_Handle *mq,
666                             const struct GNUNET_MQ_MessageHandler *new_handlers,
667                             void *cls)
668 {
669   /* FIXME: notify implementation? */
670   /* FIXME: what about NULL handlers? abort receive? */
671   mq->handlers = new_handlers;
672   mq->handlers_cls = cls;
673 }
674
675
676 /**
677  * Associate the assoc_data in mq with a unique request id.
678  *
679  * @param mq message queue, id will be unique for the queue
680  * @param assoc_data to associate
681  */
682 uint32_t
683 GNUNET_MQ_assoc_add (struct GNUNET_MQ_Handle *mq,
684                      void *assoc_data)
685 {
686   uint32_t id;
687
688   if (NULL == mq->assoc_map)
689   {
690     mq->assoc_map = GNUNET_CONTAINER_multihashmap32_create (8);
691     mq->assoc_id = 1;
692   }
693   id = mq->assoc_id++;
694   GNUNET_CONTAINER_multihashmap32_put (mq->assoc_map, id, assoc_data,
695                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
696   return id;
697 }
698
699
700 void *
701 GNUNET_MQ_assoc_get (struct GNUNET_MQ_Handle *mq, uint32_t request_id)
702 {
703   if (NULL == mq->assoc_map)
704     return NULL;
705   return GNUNET_CONTAINER_multihashmap32_get (mq->assoc_map, request_id);
706 }
707
708
709 void *
710 GNUNET_MQ_assoc_remove (struct GNUNET_MQ_Handle *mq, uint32_t request_id)
711 {
712   void *val;
713
714   if (NULL == mq->assoc_map)
715     return NULL;
716   val = GNUNET_CONTAINER_multihashmap32_get (mq->assoc_map, request_id);
717   GNUNET_CONTAINER_multihashmap32_remove_all (mq->assoc_map, request_id);
718   return val;
719 }
720
721
722 void
723 GNUNET_MQ_notify_sent (struct GNUNET_MQ_Envelope *mqm,
724                        GNUNET_MQ_NotifyCallback cb,
725                        void *cls)
726 {
727   mqm->sent_cb = cb;
728   mqm->sent_cls = cls;
729 }
730
731
732 void
733 GNUNET_MQ_destroy (struct GNUNET_MQ_Handle *mq)
734 {
735   if (NULL != mq->destroy_impl)
736   {
737     mq->destroy_impl (mq, mq->impl_state);
738   }
739   if (GNUNET_SCHEDULER_NO_TASK != mq->continue_task)
740   {
741     GNUNET_SCHEDULER_cancel (mq->continue_task);
742     mq->continue_task = GNUNET_SCHEDULER_NO_TASK;
743   }
744   while (NULL != mq->envelope_head)
745   {
746     struct GNUNET_MQ_Envelope *ev;
747     ev = mq->envelope_head;
748     GNUNET_CONTAINER_DLL_remove (mq->envelope_head, mq->envelope_tail, ev);
749     GNUNET_MQ_discard (ev);
750   }
751
752   if (NULL != mq->current_envelope)
753   {
754     GNUNET_MQ_discard (mq->current_envelope);
755     mq->current_envelope = NULL;
756   }
757
758   if (NULL != mq->assoc_map)
759   {
760     GNUNET_CONTAINER_multihashmap32_destroy (mq->assoc_map);
761     mq->assoc_map = NULL;
762   }
763
764   GNUNET_free (mq);
765 }
766
767
768 struct GNUNET_MessageHeader *
769 GNUNET_MQ_extract_nested_mh_ (const struct GNUNET_MessageHeader *mh, uint16_t base_size)
770 {
771   uint16_t whole_size;
772   uint16_t nested_size;
773   struct GNUNET_MessageHeader *nested_msg;
774
775   whole_size = ntohs (mh->size);
776   GNUNET_assert (whole_size >= base_size);
777
778   nested_size = whole_size - base_size;
779
780   if (0 == nested_size)
781     return NULL;
782
783   if (nested_size < sizeof (struct GNUNET_MessageHeader))
784   {
785     GNUNET_break_op (0);
786     return NULL;
787   }
788
789   nested_msg = (struct GNUNET_MessageHeader *) ((char *) mh + base_size);
790
791   if (ntohs (nested_msg->size) != nested_size)
792   {
793     GNUNET_break_op (0);
794     nested_msg->size = htons (nested_size);
795   }
796
797   return nested_msg;
798 }
799
800
801 /**
802  * Cancel sending the message. Message must have been sent with
803  * #GNUNET_MQ_send before.  May not be called after the notify sent
804  * callback has been called
805  *
806  * @param ev queued envelope to cancel
807  */
808 void
809 GNUNET_MQ_send_cancel (struct GNUNET_MQ_Envelope *ev)
810 {
811   struct GNUNET_MQ_Handle *mq = ev->parent_queue;
812
813   GNUNET_assert (NULL != mq);
814   GNUNET_assert (NULL != mq->cancel_impl);
815
816   if (mq->current_envelope == ev) {
817     // complex case, we already started with transmitting
818     // the message
819     mq->cancel_impl (mq, mq->impl_state);
820     // continue sending the next message, if any
821     if (NULL == mq->envelope_head)
822     {
823       mq->current_envelope = NULL;
824     }
825     else
826     {
827       mq->current_envelope = mq->envelope_head;
828       GNUNET_CONTAINER_DLL_remove (mq->envelope_head,
829                                    mq->envelope_tail,
830                                    mq->current_envelope);
831       mq->send_impl (mq, mq->current_envelope->mh, mq->impl_state);
832     }
833   } else {
834     // simple case, message is still waiting in the queue
835     GNUNET_CONTAINER_DLL_remove (mq->envelope_head, mq->envelope_tail, ev);
836   }
837
838   ev->parent_queue = NULL;
839   ev->mh = NULL;
840   GNUNET_free (ev);
841 }
842