-doxygen, error messages
[oweals/gnunet.git] / src / util / mq.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2012-2014 GNUnet e.V.
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18      Boston, MA 02110-1301, USA.
19 */
20
21 /**
22  * @author Florian Dold
23  * @file util/mq.c
24  * @brief general purpose request queue
25  */
26 #include "platform.h"
27 #include "gnunet_util_lib.h"
28
29 #define LOG(kind,...) GNUNET_log_from (kind, "mq",__VA_ARGS__)
30
31
32 struct GNUNET_MQ_Envelope
33 {
34   /**
35    * Messages are stored in a linked list.
36    * Each queue has its own list of envelopes.
37    */
38   struct GNUNET_MQ_Envelope *next;
39
40   /**
41    * Messages are stored in a linked list
42    * Each queue has its own list of envelopes.
43    */
44   struct GNUNET_MQ_Envelope *prev;
45
46   /**
47    * Actual allocated message header,
48    * usually points to the end of the containing GNUNET_MQ_Envelope
49    */
50   struct GNUNET_MessageHeader *mh;
51
52   /**
53    * Queue the message is queued in, NULL if message is not queued.
54    */
55   struct GNUNET_MQ_Handle *parent_queue;
56
57   /**
58    * Called after the message was sent irrevocably.
59    */
60   GNUNET_MQ_NotifyCallback sent_cb;
61
62   /**
63    * Closure for @e send_cb
64    */
65   void *sent_cls;
66 };
67
68
69 /**
70  * Handle to a message queue.
71  */
72 struct GNUNET_MQ_Handle
73 {
74   /**
75    * Handlers array, or NULL if the queue should not receive messages
76    */
77   struct GNUNET_MQ_MessageHandler *handlers;
78
79   /**
80    * Actual implementation of message sending,
81    * called when a message is added
82    */
83   GNUNET_MQ_SendImpl send_impl;
84
85   /**
86    * Implementation-dependent queue destruction function
87    */
88   GNUNET_MQ_DestroyImpl destroy_impl;
89
90   /**
91    * Implementation-dependent send cancel function
92    */
93   GNUNET_MQ_CancelImpl cancel_impl;
94
95   /**
96    * Implementation-specific state
97    */
98   void *impl_state;
99
100   /**
101    * Callback will be called when an error occurs.
102    */
103   GNUNET_MQ_ErrorHandler error_handler;
104
105   /**
106    * Closure for the error handler.
107    */
108   void *error_handler_cls;
109
110   /**
111    * Linked list of messages pending to be sent
112    */
113   struct GNUNET_MQ_Envelope *envelope_head;
114
115   /**
116    * Linked list of messages pending to be sent
117    */
118   struct GNUNET_MQ_Envelope *envelope_tail;
119
120   /**
121    * Message that is currently scheduled to be
122    * sent. Not the head of the message queue, as the implementation
123    * needs to know if sending has been already scheduled or not.
124    */
125   struct GNUNET_MQ_Envelope *current_envelope;
126
127   /**
128    * Map of associations, lazily allocated
129    */
130   struct GNUNET_CONTAINER_MultiHashMap32 *assoc_map;
131
132   /**
133    * Task scheduled during #GNUNET_MQ_impl_send_continue.
134    */
135   struct GNUNET_SCHEDULER_Task *continue_task;
136
137   /**
138    * Next id that should be used for the @e assoc_map,
139    * initialized lazily to a random value together with
140    * @e assoc_map
141    */
142   uint32_t assoc_id;
143
144   /**
145    * Number of entries we have in the envelope-DLL.
146    */
147   unsigned int queue_length;
148 };
149
150
151 /**
152  * Implementation-specific state for connection to
153  * client (MQ for server).
154  */
155 struct ServerClientSocketState
156 {
157   /**
158    * Handle of the client that connected to the server.
159    */
160   struct GNUNET_SERVER_Client *client;
161
162   /**
163    * Active transmission request to the client.
164    */
165   struct GNUNET_SERVER_TransmitHandle* th;
166 };
167
168
169 /**
170  * Implementation-specific state for connection to
171  * service (MQ for clients).
172  */
173 struct ClientConnectionState
174 {
175   /**
176    * Did we call receive alread alreadyy?
177    */
178   int receive_active;
179
180   /**
181    * Do we also want to receive?
182    */
183   int receive_requested;
184
185   /**
186    * Connection to the service.
187    */
188   struct GNUNET_CLIENT_Connection *connection;
189
190   /**
191    * Active transmission request (or NULL).
192    */
193   struct GNUNET_CLIENT_TransmitHandle *th;
194 };
195
196
197 /**
198  * Call the message message handler that was registered
199  * for the type of the given message in the given message queue.
200  *
201  * This function is indended to be used for the implementation
202  * of message queues.
203  *
204  * @param mq message queue with the handlers
205  * @param mh message to dispatch
206  */
207 void
208 GNUNET_MQ_inject_message (struct GNUNET_MQ_Handle *mq,
209                           const struct GNUNET_MessageHeader *mh)
210 {
211   const struct GNUNET_MQ_MessageHandler *handler;
212   int handled = GNUNET_NO;
213   uint16_t ms = ntohs (mh->size);
214
215   if (NULL == mq->handlers)
216     goto done;
217   for (handler = mq->handlers; NULL != handler->cb; handler++)
218   {
219     if (handler->type == ntohs (mh->type))
220     {
221       handled = GNUNET_YES;
222       if ( (handler->expected_size > ms) ||
223            ( (handler->expected_size != ms) &&
224              (NULL == handler->mv) ) )
225       {
226         /* Too small, or not an exact size and
227            no 'mv' handler to check rest */
228         GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
229                     "Received malformed message of type %u\n",
230                     (unsigned int) handler->type);
231         GNUNET_MQ_inject_error (mq,
232                                 GNUNET_MQ_ERROR_MALFORMED);
233         break;
234       }
235       if ( (NULL == handler->mv) ||
236            (GNUNET_OK ==
237             handler->mv (handler->cls, mh)) )
238       {
239         /* message well-formed, pass to handler */
240         handler->cb (handler->cls, mh);
241       }
242       else
243       {
244         /* Message rejected by check routine */
245         GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
246                     "Received malformed message of type %u\n",
247                     (unsigned int) handler->type);
248         GNUNET_MQ_inject_error (mq,
249                                 GNUNET_MQ_ERROR_MALFORMED);
250       }
251       break;
252     }
253   }
254  done:
255   if (GNUNET_NO == handled)
256     LOG (GNUNET_ERROR_TYPE_WARNING,
257          "No handler for message of type %d\n",
258          ntohs (mh->type));
259 }
260
261
262 /**
263  * Call the error handler of a message queue with the given
264  * error code.  If there is no error handler, log a warning.
265  *
266  * This function is intended to be used by the implementation
267  * of message queues.
268  *
269  * @param mq message queue
270  * @param error the error type
271  */
272 void
273 GNUNET_MQ_inject_error (struct GNUNET_MQ_Handle *mq,
274                         enum GNUNET_MQ_Error error)
275 {
276   if (NULL == mq->error_handler)
277   {
278     LOG (GNUNET_ERROR_TYPE_WARNING,
279          "Got error %d, but no handler installed\n",
280          (int) error);
281     return;
282   }
283   mq->error_handler (mq->error_handler_cls,
284                      error);
285 }
286
287
288 /**
289  * Discard the message queue message, free all
290  * allocated resources. Must be called in the event
291  * that a message is created but should not actually be sent.
292  *
293  * @param mqm the message to discard
294  */
295 void
296 GNUNET_MQ_discard (struct GNUNET_MQ_Envelope *mqm)
297 {
298   GNUNET_assert (NULL == mqm->parent_queue);
299   GNUNET_free (mqm);
300 }
301
302
303 /**
304  * Obtain the current length of the message queue.
305  *
306  * @param mq queue to inspect
307  * @return number of queued, non-transmitted messages
308  */
309 unsigned int
310 GNUNET_MQ_get_length (struct GNUNET_MQ_Handle *mq)
311 {
312   return mq->queue_length;
313 }
314
315
316 /**
317  * Send a message with the give message queue.
318  * May only be called once per message.
319  *
320  * @param mq message queue
321  * @param ev the envelope with the message to send.
322  */
323 void
324 GNUNET_MQ_send (struct GNUNET_MQ_Handle *mq,
325                 struct GNUNET_MQ_Envelope *ev)
326 {
327   GNUNET_assert (NULL != mq);
328   GNUNET_assert (NULL == ev->parent_queue);
329
330   ev->parent_queue = mq;
331   /* is the implementation busy? queue it! */
332   if (NULL != mq->current_envelope)
333   {
334     GNUNET_CONTAINER_DLL_insert_tail (mq->envelope_head,
335                                       mq->envelope_tail,
336                                       ev);
337     mq->queue_length++;
338     return;
339   }
340   mq->current_envelope = ev;
341   mq->send_impl (mq, ev->mh, mq->impl_state);
342 }
343
344
345 /**
346  * Send a copy of a message with the give message queue.
347  * Can be called repeatedly on the same envelope.
348  *
349  * @param mq message queue
350  * @param ev the envelope with the message to send.
351  */
352 void
353 GNUNET_MQ_send_copy (struct GNUNET_MQ_Handle *mq,
354                      const struct GNUNET_MQ_Envelope *ev)
355 {
356   struct GNUNET_MQ_Envelope *env;
357   uint16_t msize;
358
359   msize = ntohs (ev->mh->size);
360   env = GNUNET_malloc (sizeof (struct GNUNET_MQ_Envelope) +
361                        msize);
362   env->mh = (struct GNUNET_MessageHeader *) &env[1];
363   env->sent_cb = ev->sent_cb;
364   env->sent_cls = ev->sent_cls;
365   memcpy (&env[1],
366           ev->mh,
367           msize);
368   GNUNET_MQ_send (mq,
369                   env);
370 }
371
372
373
374 /**
375  * Task run to call the send implementation for the next queued
376  * message, if any.  Only useful for implementing message queues,
377  * results in undefined behavior if not used carefully.
378  *
379  * @param cls message queue to send the next message with
380  */
381 static void
382 impl_send_continue (void *cls)
383 {
384   struct GNUNET_MQ_Handle *mq = cls;
385   struct GNUNET_MQ_Envelope *current_envelope;
386
387   mq->continue_task = NULL;
388   /* call is only valid if we're actually currently sending
389    * a message */
390   current_envelope = mq->current_envelope;
391   GNUNET_assert (NULL != current_envelope);
392   current_envelope->parent_queue = NULL;
393   if (NULL == mq->envelope_head)
394   {
395     mq->current_envelope = NULL;
396   }
397   else
398   {
399     mq->current_envelope = mq->envelope_head;
400     GNUNET_CONTAINER_DLL_remove (mq->envelope_head,
401                                  mq->envelope_tail,
402                                  mq->current_envelope);
403     mq->queue_length--;
404     mq->send_impl (mq,
405                    mq->current_envelope->mh,
406                    mq->impl_state);
407   }
408   if (NULL != current_envelope->sent_cb)
409     current_envelope->sent_cb (current_envelope->sent_cls);
410   GNUNET_free (current_envelope);
411 }
412
413
414 /**
415  * Call the send implementation for the next queued message, if any.
416  * Only useful for implementing message queues, results in undefined
417  * behavior if not used carefully.
418  *
419  * @param mq message queue to send the next message with
420  */
421 void
422 GNUNET_MQ_impl_send_continue (struct GNUNET_MQ_Handle *mq)
423 {
424   GNUNET_assert (NULL == mq->continue_task);
425   mq->continue_task = GNUNET_SCHEDULER_add_now (&impl_send_continue,
426                                                 mq);
427 }
428
429
430 /**
431  * Create a message queue for the specified handlers.
432  *
433  * @param send function the implements sending messages
434  * @param destroy function that implements destroying the queue
435  * @param cancel function that implements canceling a message
436  * @param impl_state for the queue, passed to 'send' and 'destroy'
437  * @param handlers array of message handlers
438  * @param error_handler handler for read and write errors
439  * @param error_handler_cls closure for @a error_handler
440  * @return a new message queue
441  */
442 struct GNUNET_MQ_Handle *
443 GNUNET_MQ_queue_for_callbacks (GNUNET_MQ_SendImpl send,
444                                GNUNET_MQ_DestroyImpl destroy,
445                                GNUNET_MQ_CancelImpl cancel,
446                                void *impl_state,
447                                const struct GNUNET_MQ_MessageHandler *handlers,
448                                GNUNET_MQ_ErrorHandler error_handler,
449                                void *error_handler_cls)
450 {
451   struct GNUNET_MQ_Handle *mq;
452   unsigned int i;
453
454   mq = GNUNET_new (struct GNUNET_MQ_Handle);
455   mq->send_impl = send;
456   mq->destroy_impl = destroy;
457   mq->cancel_impl = cancel;
458   if (NULL != handlers)
459   {
460     for (i=0;NULL != handlers[i].cb; i++) ;
461     mq->handlers = GNUNET_new_array (i + 1,
462                                      struct GNUNET_MQ_MessageHandler);
463     memcpy (mq->handlers,
464             handlers,
465             i * sizeof (struct GNUNET_MQ_MessageHandler));
466   }
467   mq->error_handler = error_handler;
468   mq->error_handler_cls = error_handler_cls;
469   mq->impl_state = impl_state;
470
471   return mq;
472 }
473
474
475 /**
476  * Get the message that should currently be sent.
477  * Fails if there is no current message.
478  * Only useful for implementing message queues,
479  * results in undefined behavior if not used carefully.
480  *
481  * @param mq message queue with the current message
482  * @return message to send, never NULL
483  */
484 const struct GNUNET_MessageHeader *
485 GNUNET_MQ_impl_current (struct GNUNET_MQ_Handle *mq)
486 {
487   if (NULL == mq->current_envelope)
488     GNUNET_assert (0);
489   if (NULL == mq->current_envelope->mh)
490     GNUNET_assert (0);
491   return mq->current_envelope->mh;
492 }
493
494
495 /**
496  * Get the implementation state associated with the
497  * message queue.
498  *
499  * While the GNUNET_MQ_Impl* callbacks receive the
500  * implementation state, continuations that are scheduled
501  * by the implementation function often only have one closure
502  * argument, with this function it is possible to get at the
503  * implementation state when only passing the GNUNET_MQ_Handle
504  * as closure.
505  *
506  * @param mq message queue with the current message
507  * @return message to send, never NULL
508  */
509 void *
510 GNUNET_MQ_impl_state (struct GNUNET_MQ_Handle *mq)
511 {
512   return mq->impl_state;
513 }
514
515
516 struct GNUNET_MQ_Envelope *
517 GNUNET_MQ_msg_ (struct GNUNET_MessageHeader **mhp,
518                 uint16_t size,
519                 uint16_t type)
520 {
521   struct GNUNET_MQ_Envelope *mqm;
522
523   mqm = GNUNET_malloc (sizeof *mqm + size);
524   mqm->mh = (struct GNUNET_MessageHeader *) &mqm[1];
525   mqm->mh->size = htons (size);
526   mqm->mh->type = htons (type);
527   if (NULL != mhp)
528     *mhp = mqm->mh;
529   return mqm;
530 }
531
532
533 /**
534  * Create a new envelope by copying an existing message.
535  *
536  * @param hdr header of the message to copy
537  * @return envelope containing @a hdr
538  */
539 struct GNUNET_MQ_Envelope *
540 GNUNET_MQ_msg_copy (const struct GNUNET_MessageHeader *hdr)
541 {
542   struct GNUNET_MQ_Envelope *mqm;
543   uint16_t size = ntohs (hdr->size);
544
545   mqm = GNUNET_malloc (sizeof (*mqm) + size);
546   mqm->mh = (struct GNUNET_MessageHeader *) &mqm[1];
547   memcpy (mqm->mh,
548           hdr,
549           size);
550   return mqm;
551 }
552
553
554 /**
555  * Implementation of the #GNUNET_MQ_msg_nested_mh macro.
556  *
557  * @param mhp pointer to the message header pointer that will be changed to allocate at
558  *        the newly allocated space for the message.
559  * @param base_size size of the data before the nested message
560  * @param type type of the message in the envelope
561  * @param nested_mh the message to append to the message after base_size
562  */
563 struct GNUNET_MQ_Envelope *
564 GNUNET_MQ_msg_nested_mh_ (struct GNUNET_MessageHeader **mhp,
565                           uint16_t base_size,
566                           uint16_t type,
567                           const struct GNUNET_MessageHeader *nested_mh)
568 {
569   struct GNUNET_MQ_Envelope *mqm;
570   uint16_t size;
571
572   if (NULL == nested_mh)
573     return GNUNET_MQ_msg_ (mhp, base_size, type);
574
575   size = base_size + ntohs (nested_mh->size);
576
577   /* check for uint16_t overflow */
578   if (size < base_size)
579     return NULL;
580
581   mqm = GNUNET_MQ_msg_ (mhp, size, type);
582   memcpy ((char *) mqm->mh + base_size,
583           nested_mh,
584           ntohs (nested_mh->size));
585
586   return mqm;
587 }
588
589
590 /**
591  * Transmit a queued message to the session's client.
592  *
593  * @param cls consensus session
594  * @param size number of bytes available in @a buf
595  * @param buf where the callee should write the message
596  * @return number of bytes written to @a buf
597  */
598 static size_t
599 transmit_queued (void *cls,
600                  size_t size,
601                  void *buf)
602 {
603   struct GNUNET_MQ_Handle *mq = cls;
604   struct ServerClientSocketState *state = GNUNET_MQ_impl_state (mq);
605   const struct GNUNET_MessageHeader *msg = GNUNET_MQ_impl_current (mq);
606   size_t msg_size;
607
608   GNUNET_assert (NULL != buf);
609   msg_size = ntohs (msg->size);
610   GNUNET_assert (size >= msg_size);
611   memcpy (buf, msg, msg_size);
612   state->th = NULL;
613
614   GNUNET_MQ_impl_send_continue (mq);
615
616   return msg_size;
617 }
618
619
620 static void
621 server_client_destroy_impl (struct GNUNET_MQ_Handle *mq,
622                             void *impl_state)
623 {
624   struct ServerClientSocketState *state = impl_state;
625
626   if (NULL != state->th)
627   {
628     GNUNET_SERVER_notify_transmit_ready_cancel (state->th);
629     state->th = NULL;
630   }
631
632   GNUNET_assert (NULL != mq);
633   GNUNET_assert (NULL != state);
634   GNUNET_SERVER_client_drop (state->client);
635   GNUNET_free (state);
636 }
637
638
639 static void
640 server_client_send_impl (struct GNUNET_MQ_Handle *mq,
641                          const struct GNUNET_MessageHeader *msg,
642                          void *impl_state)
643 {
644   struct ServerClientSocketState *state = impl_state;
645
646   GNUNET_assert (NULL != mq);
647   GNUNET_assert (NULL != state);
648   state->th = GNUNET_SERVER_notify_transmit_ready (state->client,
649                                                    ntohs (msg->size),
650                                                    GNUNET_TIME_UNIT_FOREVER_REL,
651                                                    &transmit_queued, mq);
652 }
653
654
655 struct GNUNET_MQ_Handle *
656 GNUNET_MQ_queue_for_server_client (struct GNUNET_SERVER_Client *client)
657 {
658   struct GNUNET_MQ_Handle *mq;
659   struct ServerClientSocketState *scss;
660
661   mq = GNUNET_new (struct GNUNET_MQ_Handle);
662   scss = GNUNET_new (struct ServerClientSocketState);
663   mq->impl_state = scss;
664   scss->client = client;
665   GNUNET_SERVER_client_keep (client);
666   mq->send_impl = server_client_send_impl;
667   mq->destroy_impl = server_client_destroy_impl;
668   return mq;
669 }
670
671
672 /**
673  * Type of a function to call when we receive a message
674  * from the service.
675  *
676  * @param cls closure
677  * @param msg message received, NULL on timeout or fatal error
678  */
679 static void
680 handle_client_message (void *cls,
681                        const struct GNUNET_MessageHeader *msg)
682 {
683   struct GNUNET_MQ_Handle *mq = cls;
684   struct ClientConnectionState *state;
685
686   state = mq->impl_state;
687   if (NULL == msg)
688   {
689     GNUNET_MQ_inject_error (mq, GNUNET_MQ_ERROR_READ);
690     return;
691   }
692   GNUNET_CLIENT_receive (state->connection,
693                          &handle_client_message,
694                          mq,
695                          GNUNET_TIME_UNIT_FOREVER_REL);
696   GNUNET_MQ_inject_message (mq, msg);
697 }
698
699
700 /**
701  * Transmit a queued message to the session's client.
702  *
703  * @param cls consensus session
704  * @param size number of bytes available in @a buf
705  * @param buf where the callee should write the message
706  * @return number of bytes written to buf
707  */
708 static size_t
709 connection_client_transmit_queued (void *cls,
710                                    size_t size,
711                                    void *buf)
712 {
713   struct GNUNET_MQ_Handle *mq = cls;
714   const struct GNUNET_MessageHeader *msg;
715   struct ClientConnectionState *state = mq->impl_state;
716   size_t msg_size;
717
718   GNUNET_assert (NULL != mq);
719   state->th = NULL;
720   msg = GNUNET_MQ_impl_current (mq);
721
722   if (NULL == buf)
723   {
724     GNUNET_MQ_inject_error (mq, GNUNET_MQ_ERROR_READ);
725     return 0;
726   }
727
728   if ( (GNUNET_YES == state->receive_requested) &&
729        (GNUNET_NO == state->receive_active) )
730   {
731     state->receive_active = GNUNET_YES;
732     GNUNET_CLIENT_receive (state->connection,
733                            &handle_client_message,
734                            mq,
735                            GNUNET_TIME_UNIT_FOREVER_REL);
736   }
737
738   msg_size = ntohs (msg->size);
739   GNUNET_assert (size >= msg_size);
740   memcpy (buf, msg, msg_size);
741   state->th = NULL;
742
743   GNUNET_MQ_impl_send_continue (mq);
744
745   return msg_size;
746 }
747
748
749 static void
750 connection_client_destroy_impl (struct GNUNET_MQ_Handle *mq,
751                                 void *impl_state)
752 {
753   struct ClientConnectionState *state = impl_state;
754
755   if (NULL != state->th)
756   {
757     GNUNET_CLIENT_notify_transmit_ready_cancel (state->th);
758     state->th = NULL;
759   }
760   GNUNET_CLIENT_disconnect (state->connection);
761   GNUNET_free (impl_state);
762 }
763
764
765 static void
766 connection_client_send_impl (struct GNUNET_MQ_Handle *mq,
767                              const struct GNUNET_MessageHeader *msg,
768                              void *impl_state)
769 {
770   struct ClientConnectionState *state = impl_state;
771
772   GNUNET_assert (NULL != state);
773   GNUNET_assert (NULL == state->th);
774   state->th =
775       GNUNET_CLIENT_notify_transmit_ready (state->connection,
776                                            ntohs (msg->size),
777                                            GNUNET_TIME_UNIT_FOREVER_REL,
778                                            GNUNET_NO,
779                                            &connection_client_transmit_queued,
780                                            mq);
781   GNUNET_assert (NULL != state->th);
782 }
783
784
785 static void
786 connection_client_cancel_impl (struct GNUNET_MQ_Handle *mq,
787                                void *impl_state)
788 {
789   struct ClientConnectionState *state = impl_state;
790
791   GNUNET_assert (NULL != state->th);
792   GNUNET_CLIENT_notify_transmit_ready_cancel (state->th);
793   state->th = NULL;
794 }
795
796
797 struct GNUNET_MQ_Handle *
798 GNUNET_MQ_queue_for_connection_client (struct GNUNET_CLIENT_Connection *connection,
799                                        const struct GNUNET_MQ_MessageHandler *handlers,
800                                        GNUNET_MQ_ErrorHandler error_handler,
801                                        void *error_handler_cls)
802 {
803   struct GNUNET_MQ_Handle *mq;
804   struct ClientConnectionState *state;
805   unsigned int i;
806
807   mq = GNUNET_new (struct GNUNET_MQ_Handle);
808   if (NULL != handlers)
809   {
810     for (i=0;NULL != handlers[i].cb; i++) ;
811     mq->handlers = GNUNET_new_array (i,
812                                      struct GNUNET_MQ_MessageHandler);
813     memcpy (mq->handlers,
814             handlers,
815             i * sizeof (struct GNUNET_MQ_MessageHandler));
816   }
817   mq->error_handler = error_handler;
818   mq->error_handler_cls = error_handler_cls;
819   state = GNUNET_new (struct ClientConnectionState);
820   state->connection = connection;
821   mq->impl_state = state;
822   mq->send_impl = &connection_client_send_impl;
823   mq->destroy_impl = &connection_client_destroy_impl;
824   mq->cancel_impl = &connection_client_cancel_impl;
825   if (NULL != handlers)
826     state->receive_requested = GNUNET_YES;
827
828   return mq;
829 }
830
831
832 /**
833  * Associate the assoc_data in mq with a unique request id.
834  *
835  * @param mq message queue, id will be unique for the queue
836  * @param assoc_data to associate
837  */
838 uint32_t
839 GNUNET_MQ_assoc_add (struct GNUNET_MQ_Handle *mq,
840                      void *assoc_data)
841 {
842   uint32_t id;
843
844   if (NULL == mq->assoc_map)
845   {
846     mq->assoc_map = GNUNET_CONTAINER_multihashmap32_create (8);
847     mq->assoc_id = 1;
848   }
849   id = mq->assoc_id++;
850   GNUNET_CONTAINER_multihashmap32_put (mq->assoc_map, id, assoc_data,
851                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
852   return id;
853 }
854
855
856 void *
857 GNUNET_MQ_assoc_get (struct GNUNET_MQ_Handle *mq,
858                      uint32_t request_id)
859 {
860   if (NULL == mq->assoc_map)
861     return NULL;
862   return GNUNET_CONTAINER_multihashmap32_get (mq->assoc_map, request_id);
863 }
864
865
866 void *
867 GNUNET_MQ_assoc_remove (struct GNUNET_MQ_Handle *mq,
868                         uint32_t request_id)
869 {
870   void *val;
871
872   if (NULL == mq->assoc_map)
873     return NULL;
874   val = GNUNET_CONTAINER_multihashmap32_get (mq->assoc_map,
875                                              request_id);
876   GNUNET_CONTAINER_multihashmap32_remove_all (mq->assoc_map,
877                                               request_id);
878   return val;
879 }
880
881
882 void
883 GNUNET_MQ_notify_sent (struct GNUNET_MQ_Envelope *mqm,
884                        GNUNET_MQ_NotifyCallback cb,
885                        void *cls)
886 {
887   mqm->sent_cb = cb;
888   mqm->sent_cls = cls;
889 }
890
891
892 void
893 GNUNET_MQ_destroy (struct GNUNET_MQ_Handle *mq)
894 {
895   if (NULL != mq->destroy_impl)
896   {
897     mq->destroy_impl (mq, mq->impl_state);
898   }
899   if (NULL != mq->continue_task)
900   {
901     GNUNET_SCHEDULER_cancel (mq->continue_task);
902     mq->continue_task = NULL;
903   }
904   while (NULL != mq->envelope_head)
905   {
906     struct GNUNET_MQ_Envelope *ev;
907
908     ev = mq->envelope_head;
909     ev->parent_queue = NULL;
910     GNUNET_CONTAINER_DLL_remove (mq->envelope_head,
911                                  mq->envelope_tail,
912                                  ev);
913     mq->queue_length--;
914     GNUNET_MQ_discard (ev);
915   }
916   GNUNET_assert (0 == mq->queue_length);
917   if (NULL != mq->current_envelope)
918   {
919     /* we can only discard envelopes that
920      * are not queued! */
921     mq->current_envelope->parent_queue = NULL;
922     GNUNET_MQ_discard (mq->current_envelope);
923     mq->current_envelope = NULL;
924   }
925   if (NULL != mq->assoc_map)
926   {
927     GNUNET_CONTAINER_multihashmap32_destroy (mq->assoc_map);
928     mq->assoc_map = NULL;
929   }
930   GNUNET_free_non_null (mq->handlers);
931   GNUNET_free (mq);
932 }
933
934
935 const struct GNUNET_MessageHeader *
936 GNUNET_MQ_extract_nested_mh_ (const struct GNUNET_MessageHeader *mh,
937                               uint16_t base_size)
938 {
939   uint16_t whole_size;
940   uint16_t nested_size;
941   const struct GNUNET_MessageHeader *nested_msg;
942
943   whole_size = ntohs (mh->size);
944   GNUNET_assert (whole_size >= base_size);
945   nested_size = whole_size - base_size;
946   if (0 == nested_size)
947     return NULL;
948   if (nested_size < sizeof (struct GNUNET_MessageHeader))
949   {
950     GNUNET_break_op (0);
951     return NULL;
952   }
953   nested_msg = (const struct GNUNET_MessageHeader *) ((char *) mh + base_size);
954   if (ntohs (nested_msg->size) != nested_size)
955   {
956     GNUNET_break_op (0);
957     return NULL;
958   }
959   return nested_msg;
960 }
961
962
963 /**
964  * Cancel sending the message. Message must have been sent with
965  * #GNUNET_MQ_send before.  May not be called after the notify sent
966  * callback has been called
967  *
968  * @param ev queued envelope to cancel
969  */
970 void
971 GNUNET_MQ_send_cancel (struct GNUNET_MQ_Envelope *ev)
972 {
973   struct GNUNET_MQ_Handle *mq = ev->parent_queue;
974
975   GNUNET_assert (NULL != mq);
976   GNUNET_assert (NULL != mq->cancel_impl);
977
978   if (mq->current_envelope == ev)
979   {
980     // complex case, we already started with transmitting
981     // the message
982     mq->cancel_impl (mq,
983                      mq->impl_state);
984     // continue sending the next message, if any
985     if (NULL == mq->envelope_head)
986     {
987       mq->current_envelope = NULL;
988     }
989     else
990     {
991       mq->current_envelope = mq->envelope_head;
992       GNUNET_CONTAINER_DLL_remove (mq->envelope_head,
993                                    mq->envelope_tail,
994                                    mq->current_envelope);
995       mq->queue_length--;
996       mq->send_impl (mq,
997                      mq->current_envelope->mh,
998                      mq->impl_state);
999     }
1000   }
1001   else
1002   {
1003     // simple case, message is still waiting in the queue
1004     GNUNET_CONTAINER_DLL_remove (mq->envelope_head,
1005                                  mq->envelope_tail,
1006                                  ev);
1007     mq->queue_length--;
1008   }
1009
1010   ev->parent_queue = NULL;
1011   ev->mh = NULL;
1012   GNUNET_free (ev);
1013 }
1014
1015 /* end of mq.c */