-avoid calling memcpy() with NULL argument, even if len is 0
[oweals/gnunet.git] / src / util / mq.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2012-2014 GNUnet e.V.
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18      Boston, MA 02110-1301, USA.
19 */
20
21 /**
22  * @author Florian Dold
23  * @file util/mq.c
24  * @brief general purpose request queue
25  */
26 #include "platform.h"
27 #include "gnunet_util_lib.h"
28
29 #define LOG(kind,...) GNUNET_log_from (kind, "mq",__VA_ARGS__)
30
31
32 struct GNUNET_MQ_Envelope
33 {
34   /**
35    * Messages are stored in a linked list.
36    * Each queue has its own list of envelopes.
37    */
38   struct GNUNET_MQ_Envelope *next;
39
40   /**
41    * Messages are stored in a linked list
42    * Each queue has its own list of envelopes.
43    */
44   struct GNUNET_MQ_Envelope *prev;
45
46   /**
47    * Actual allocated message header,
48    * usually points to the end of the containing GNUNET_MQ_Envelope
49    */
50   struct GNUNET_MessageHeader *mh;
51
52   /**
53    * Queue the message is queued in, NULL if message is not queued.
54    */
55   struct GNUNET_MQ_Handle *parent_queue;
56
57   /**
58    * Called after the message was sent irrevocably.
59    */
60   GNUNET_MQ_NotifyCallback sent_cb;
61
62   /**
63    * Closure for @e send_cb
64    */
65   void *sent_cls;
66 };
67
68
69 /**
70  * Handle to a message queue.
71  */
72 struct GNUNET_MQ_Handle
73 {
74   /**
75    * Handlers array, or NULL if the queue should not receive messages
76    */
77   struct GNUNET_MQ_MessageHandler *handlers;
78
79   /**
80    * Actual implementation of message sending,
81    * called when a message is added
82    */
83   GNUNET_MQ_SendImpl send_impl;
84
85   /**
86    * Implementation-dependent queue destruction function
87    */
88   GNUNET_MQ_DestroyImpl destroy_impl;
89
90   /**
91    * Implementation-dependent send cancel function
92    */
93   GNUNET_MQ_CancelImpl cancel_impl;
94
95   /**
96    * Implementation-specific state
97    */
98   void *impl_state;
99
100   /**
101    * Callback will be called when an error occurs.
102    */
103   GNUNET_MQ_ErrorHandler error_handler;
104
105   /**
106    * Closure for the error handler.
107    */
108   void *error_handler_cls;
109
110   /**
111    * Linked list of messages pending to be sent
112    */
113   struct GNUNET_MQ_Envelope *envelope_head;
114
115   /**
116    * Linked list of messages pending to be sent
117    */
118   struct GNUNET_MQ_Envelope *envelope_tail;
119
120   /**
121    * Message that is currently scheduled to be
122    * sent. Not the head of the message queue, as the implementation
123    * needs to know if sending has been already scheduled or not.
124    */
125   struct GNUNET_MQ_Envelope *current_envelope;
126
127   /**
128    * Map of associations, lazily allocated
129    */
130   struct GNUNET_CONTAINER_MultiHashMap32 *assoc_map;
131
132   /**
133    * Task scheduled during #GNUNET_MQ_impl_send_continue.
134    */
135   struct GNUNET_SCHEDULER_Task *continue_task;
136
137   /**
138    * Next id that should be used for the @e assoc_map,
139    * initialized lazily to a random value together with
140    * @e assoc_map
141    */
142   uint32_t assoc_id;
143
144   /**
145    * Number of entries we have in the envelope-DLL.
146    */
147   unsigned int queue_length;
148 };
149
150
151 /**
152  * Implementation-specific state for connection to
153  * client (MQ for server).
154  */
155 struct ServerClientSocketState
156 {
157   /**
158    * Handle of the client that connected to the server.
159    */
160   struct GNUNET_SERVER_Client *client;
161
162   /**
163    * Active transmission request to the client.
164    */
165   struct GNUNET_SERVER_TransmitHandle *th;
166 };
167
168
169 /**
170  * Implementation-specific state for connection to
171  * service (MQ for clients).
172  */
173 struct ClientConnectionState
174 {
175   /**
176    * Did we call receive alread alreadyy?
177    */
178   int receive_active;
179
180   /**
181    * Do we also want to receive?
182    */
183   int receive_requested;
184
185   /**
186    * Connection to the service.
187    */
188   struct GNUNET_CLIENT_Connection *connection;
189
190   /**
191    * Active transmission request (or NULL).
192    */
193   struct GNUNET_CLIENT_TransmitHandle *th;
194 };
195
196
197 /**
198  * Call the message message handler that was registered
199  * for the type of the given message in the given message queue.
200  *
201  * This function is indended to be used for the implementation
202  * of message queues.
203  *
204  * @param mq message queue with the handlers
205  * @param mh message to dispatch
206  */
207 void
208 GNUNET_MQ_inject_message (struct GNUNET_MQ_Handle *mq,
209                           const struct GNUNET_MessageHeader *mh)
210 {
211   const struct GNUNET_MQ_MessageHandler *handler;
212   int handled = GNUNET_NO;
213   uint16_t ms = ntohs (mh->size);
214
215   if (NULL == mq->handlers)
216     goto done;
217   for (handler = mq->handlers; NULL != handler->cb; handler++)
218   {
219     if (handler->type == ntohs (mh->type))
220     {
221       handled = GNUNET_YES;
222       if ( (handler->expected_size > ms) ||
223            ( (handler->expected_size != ms) &&
224              (NULL == handler->mv) ) )
225       {
226         /* Too small, or not an exact size and
227            no 'mv' handler to check rest */
228         GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
229                     "Received malformed message of type %u\n",
230                     (unsigned int) handler->type);
231         GNUNET_MQ_inject_error (mq,
232                                 GNUNET_MQ_ERROR_MALFORMED);
233         break;
234       }
235       if ( (NULL == handler->mv) ||
236            (GNUNET_OK ==
237             handler->mv (handler->cls, mh)) )
238       {
239         /* message well-formed, pass to handler */
240         handler->cb (handler->cls, mh);
241       }
242       else
243       {
244         /* Message rejected by check routine */
245         GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
246                     "Received malformed message of type %u\n",
247                     (unsigned int) handler->type);
248         GNUNET_MQ_inject_error (mq,
249                                 GNUNET_MQ_ERROR_MALFORMED);
250       }
251       break;
252     }
253   }
254  done:
255   if (GNUNET_NO == handled)
256     LOG (GNUNET_ERROR_TYPE_DEBUG,
257          "No handler for message of type %d\n",
258          ntohs (mh->type));
259 }
260
261
262 /**
263  * Call the error handler of a message queue with the given
264  * error code.  If there is no error handler, log a warning.
265  *
266  * This function is intended to be used by the implementation
267  * of message queues.
268  *
269  * @param mq message queue
270  * @param error the error type
271  */
272 void
273 GNUNET_MQ_inject_error (struct GNUNET_MQ_Handle *mq,
274                         enum GNUNET_MQ_Error error)
275 {
276   if (NULL == mq->error_handler)
277   {
278     LOG (GNUNET_ERROR_TYPE_WARNING,
279          "Got error %d, but no handler installed\n",
280          (int) error);
281     return;
282   }
283   mq->error_handler (mq->error_handler_cls,
284                      error);
285 }
286
287
288 /**
289  * Discard the message queue message, free all
290  * allocated resources. Must be called in the event
291  * that a message is created but should not actually be sent.
292  *
293  * @param mqm the message to discard
294  */
295 void
296 GNUNET_MQ_discard (struct GNUNET_MQ_Envelope *mqm)
297 {
298   GNUNET_assert (NULL == mqm->parent_queue);
299   GNUNET_free (mqm);
300 }
301
302
303 /**
304  * Obtain the current length of the message queue.
305  *
306  * @param mq queue to inspect
307  * @return number of queued, non-transmitted messages
308  */
309 unsigned int
310 GNUNET_MQ_get_length (struct GNUNET_MQ_Handle *mq)
311 {
312   return mq->queue_length;
313 }
314
315
316 /**
317  * Send a message with the given message queue.
318  * May only be called once per message.
319  *
320  * @param mq message queue
321  * @param ev the envelope with the message to send.
322  */
323 void
324 GNUNET_MQ_send (struct GNUNET_MQ_Handle *mq,
325                 struct GNUNET_MQ_Envelope *ev)
326 {
327   GNUNET_assert (NULL != mq);
328   GNUNET_assert (NULL == ev->parent_queue);
329
330   ev->parent_queue = mq;
331   /* is the implementation busy? queue it! */
332   if (NULL != mq->current_envelope)
333   {
334     GNUNET_CONTAINER_DLL_insert_tail (mq->envelope_head,
335                                       mq->envelope_tail,
336                                       ev);
337     mq->queue_length++;
338     return;
339   }
340   mq->current_envelope = ev;
341   mq->send_impl (mq, ev->mh, mq->impl_state);
342 }
343
344
345 /**
346  * Send a copy of a message with the given message queue.
347  * Can be called repeatedly on the same envelope.
348  *
349  * @param mq message queue
350  * @param ev the envelope with the message to send.
351  */
352 void
353 GNUNET_MQ_send_copy (struct GNUNET_MQ_Handle *mq,
354                      const struct GNUNET_MQ_Envelope *ev)
355 {
356   struct GNUNET_MQ_Envelope *env;
357   uint16_t msize;
358
359   msize = ntohs (ev->mh->size);
360   env = GNUNET_malloc (sizeof (struct GNUNET_MQ_Envelope) +
361                        msize);
362   env->mh = (struct GNUNET_MessageHeader *) &env[1];
363   env->sent_cb = ev->sent_cb;
364   env->sent_cls = ev->sent_cls;
365   GNUNET_memcpy (&env[1],
366           ev->mh,
367           msize);
368   GNUNET_MQ_send (mq,
369                   env);
370 }
371
372
373
374 /**
375  * Task run to call the send implementation for the next queued
376  * message, if any.  Only useful for implementing message queues,
377  * results in undefined behavior if not used carefully.
378  *
379  * @param cls message queue to send the next message with
380  */
381 static void
382 impl_send_continue (void *cls)
383 {
384   struct GNUNET_MQ_Handle *mq = cls;
385   struct GNUNET_MQ_Envelope *current_envelope;
386
387   mq->continue_task = NULL;
388   /* call is only valid if we're actually currently sending
389    * a message */
390   current_envelope = mq->current_envelope;
391   GNUNET_assert (NULL != current_envelope);
392   current_envelope->parent_queue = NULL;
393   if (NULL == mq->envelope_head)
394   {
395     mq->current_envelope = NULL;
396   }
397   else
398   {
399     mq->current_envelope = mq->envelope_head;
400     GNUNET_CONTAINER_DLL_remove (mq->envelope_head,
401                                  mq->envelope_tail,
402                                  mq->current_envelope);
403     mq->queue_length--;
404     mq->send_impl (mq,
405                    mq->current_envelope->mh,
406                    mq->impl_state);
407   }
408   if (NULL != current_envelope->sent_cb)
409     current_envelope->sent_cb (current_envelope->sent_cls);
410   GNUNET_free (current_envelope);
411 }
412
413
414 /**
415  * Call the send implementation for the next queued message, if any.
416  * Only useful for implementing message queues, results in undefined
417  * behavior if not used carefully.
418  *
419  * @param mq message queue to send the next message with
420  */
421 void
422 GNUNET_MQ_impl_send_continue (struct GNUNET_MQ_Handle *mq)
423 {
424   GNUNET_assert (NULL == mq->continue_task);
425   mq->continue_task = GNUNET_SCHEDULER_add_now (&impl_send_continue,
426                                                 mq);
427 }
428
429
430 /**
431  * Create a message queue for the specified handlers.
432  *
433  * @param send function the implements sending messages
434  * @param destroy function that implements destroying the queue
435  * @param cancel function that implements canceling a message
436  * @param impl_state for the queue, passed to 'send' and 'destroy'
437  * @param handlers array of message handlers
438  * @param error_handler handler for read and write errors
439  * @param error_handler_cls closure for @a error_handler
440  * @return a new message queue
441  */
442 struct GNUNET_MQ_Handle *
443 GNUNET_MQ_queue_for_callbacks (GNUNET_MQ_SendImpl send,
444                                GNUNET_MQ_DestroyImpl destroy,
445                                GNUNET_MQ_CancelImpl cancel,
446                                void *impl_state,
447                                const struct GNUNET_MQ_MessageHandler *handlers,
448                                GNUNET_MQ_ErrorHandler error_handler,
449                                void *error_handler_cls)
450 {
451   struct GNUNET_MQ_Handle *mq;
452   unsigned int i;
453
454   mq = GNUNET_new (struct GNUNET_MQ_Handle);
455   mq->send_impl = send;
456   mq->destroy_impl = destroy;
457   mq->cancel_impl = cancel;
458   if (NULL != handlers)
459   {
460     for (i=0;NULL != handlers[i].cb; i++) ;
461     mq->handlers = GNUNET_new_array (i + 1,
462                                      struct GNUNET_MQ_MessageHandler);
463     GNUNET_memcpy (mq->handlers,
464             handlers,
465             i * sizeof (struct GNUNET_MQ_MessageHandler));
466   }
467   mq->error_handler = error_handler;
468   mq->error_handler_cls = error_handler_cls;
469   mq->impl_state = impl_state;
470
471   return mq;
472 }
473
474
475 /**
476  * Change the closure argument in all of the `handlers` of the
477  * @a mq.
478  *
479  * @param mq to modify
480  * @param handlers_cls new closure to use
481  */
482 void
483 GNUNET_MQ_set_handlers_closure (struct GNUNET_MQ_Handle *mq,
484                                 void *handlers_cls)
485 {
486   unsigned int i;
487
488   if (NULL == mq->handlers)
489     return;
490   for (i=0;NULL != mq->handlers[i].cb; i++)
491     mq->handlers[i].cls = handlers_cls;
492 }
493
494
495 /**
496  * Get the message that should currently be sent.
497  * Fails if there is no current message.
498  * Only useful for implementing message queues,
499  * results in undefined behavior if not used carefully.
500  *
501  * @param mq message queue with the current message
502  * @return message to send, never NULL
503  */
504 const struct GNUNET_MessageHeader *
505 GNUNET_MQ_impl_current (struct GNUNET_MQ_Handle *mq)
506 {
507   if (NULL == mq->current_envelope)
508     GNUNET_assert (0);
509   if (NULL == mq->current_envelope->mh)
510     GNUNET_assert (0);
511   return mq->current_envelope->mh;
512 }
513
514
515 /**
516  * Get the implementation state associated with the
517  * message queue.
518  *
519  * While the GNUNET_MQ_Impl* callbacks receive the
520  * implementation state, continuations that are scheduled
521  * by the implementation function often only have one closure
522  * argument, with this function it is possible to get at the
523  * implementation state when only passing the GNUNET_MQ_Handle
524  * as closure.
525  *
526  * @param mq message queue with the current message
527  * @return message to send, never NULL
528  */
529 void *
530 GNUNET_MQ_impl_state (struct GNUNET_MQ_Handle *mq)
531 {
532   return mq->impl_state;
533 }
534
535
536 struct GNUNET_MQ_Envelope *
537 GNUNET_MQ_msg_ (struct GNUNET_MessageHeader **mhp,
538                 uint16_t size,
539                 uint16_t type)
540 {
541   struct GNUNET_MQ_Envelope *mqm;
542
543   mqm = GNUNET_malloc (sizeof *mqm + size);
544   mqm->mh = (struct GNUNET_MessageHeader *) &mqm[1];
545   mqm->mh->size = htons (size);
546   mqm->mh->type = htons (type);
547   if (NULL != mhp)
548     *mhp = mqm->mh;
549   return mqm;
550 }
551
552
553 /**
554  * Create a new envelope by copying an existing message.
555  *
556  * @param hdr header of the message to copy
557  * @return envelope containing @a hdr
558  */
559 struct GNUNET_MQ_Envelope *
560 GNUNET_MQ_msg_copy (const struct GNUNET_MessageHeader *hdr)
561 {
562   struct GNUNET_MQ_Envelope *mqm;
563   uint16_t size = ntohs (hdr->size);
564
565   mqm = GNUNET_malloc (sizeof (*mqm) + size);
566   mqm->mh = (struct GNUNET_MessageHeader *) &mqm[1];
567   GNUNET_memcpy (mqm->mh,
568           hdr,
569           size);
570   return mqm;
571 }
572
573
574 /**
575  * Implementation of the #GNUNET_MQ_msg_nested_mh macro.
576  *
577  * @param mhp pointer to the message header pointer that will be changed to allocate at
578  *        the newly allocated space for the message.
579  * @param base_size size of the data before the nested message
580  * @param type type of the message in the envelope
581  * @param nested_mh the message to append to the message after base_size
582  */
583 struct GNUNET_MQ_Envelope *
584 GNUNET_MQ_msg_nested_mh_ (struct GNUNET_MessageHeader **mhp,
585                           uint16_t base_size,
586                           uint16_t type,
587                           const struct GNUNET_MessageHeader *nested_mh)
588 {
589   struct GNUNET_MQ_Envelope *mqm;
590   uint16_t size;
591
592   if (NULL == nested_mh)
593     return GNUNET_MQ_msg_ (mhp, base_size, type);
594
595   size = base_size + ntohs (nested_mh->size);
596
597   /* check for uint16_t overflow */
598   if (size < base_size)
599     return NULL;
600
601   mqm = GNUNET_MQ_msg_ (mhp, size, type);
602   GNUNET_memcpy ((char *) mqm->mh + base_size,
603           nested_mh,
604           ntohs (nested_mh->size));
605
606   return mqm;
607 }
608
609
610 /**
611  * Transmit a queued message to the session's client.
612  *
613  * @param cls consensus session
614  * @param size number of bytes available in @a buf
615  * @param buf where the callee should write the message
616  * @return number of bytes written to @a buf
617  */
618 static size_t
619 transmit_queued (void *cls,
620                  size_t size,
621                  void *buf)
622 {
623   struct GNUNET_MQ_Handle *mq = cls;
624   struct ServerClientSocketState *state = GNUNET_MQ_impl_state (mq);
625   const struct GNUNET_MessageHeader *msg = GNUNET_MQ_impl_current (mq);
626   size_t msg_size;
627
628   GNUNET_assert (NULL != buf);
629   msg_size = ntohs (msg->size);
630   GNUNET_assert (size >= msg_size);
631   GNUNET_memcpy (buf, msg, msg_size);
632   state->th = NULL;
633
634   GNUNET_MQ_impl_send_continue (mq);
635
636   return msg_size;
637 }
638
639
640 static void
641 server_client_destroy_impl (struct GNUNET_MQ_Handle *mq,
642                             void *impl_state)
643 {
644   struct ServerClientSocketState *state = impl_state;
645
646   if (NULL != state->th)
647   {
648     GNUNET_SERVER_notify_transmit_ready_cancel (state->th);
649     state->th = NULL;
650   }
651
652   GNUNET_assert (NULL != mq);
653   GNUNET_assert (NULL != state);
654   GNUNET_SERVER_client_drop (state->client);
655   GNUNET_free (state);
656 }
657
658
659 static void
660 server_client_send_impl (struct GNUNET_MQ_Handle *mq,
661                          const struct GNUNET_MessageHeader *msg,
662                          void *impl_state)
663 {
664   struct ServerClientSocketState *state = impl_state;
665
666   GNUNET_assert (NULL != mq);
667   state->th = GNUNET_SERVER_notify_transmit_ready (state->client,
668                                                    ntohs (msg->size),
669                                                    GNUNET_TIME_UNIT_FOREVER_REL,
670                                                    &transmit_queued, mq);
671 }
672
673
674 struct GNUNET_MQ_Handle *
675 GNUNET_MQ_queue_for_server_client (struct GNUNET_SERVER_Client *client)
676 {
677   struct GNUNET_MQ_Handle *mq;
678   struct ServerClientSocketState *scss;
679
680   mq = GNUNET_new (struct GNUNET_MQ_Handle);
681   scss = GNUNET_new (struct ServerClientSocketState);
682   mq->impl_state = scss;
683   scss->client = client;
684   GNUNET_SERVER_client_keep (client);
685   mq->send_impl = &server_client_send_impl;
686   mq->destroy_impl = &server_client_destroy_impl;
687   return mq;
688 }
689
690
691 /**
692  * Type of a function to call when we receive a message
693  * from the service.
694  *
695  * @param cls closure
696  * @param msg message received, NULL on timeout or fatal error
697  */
698 static void
699 handle_client_message (void *cls,
700                        const struct GNUNET_MessageHeader *msg)
701 {
702   struct GNUNET_MQ_Handle *mq = cls;
703   struct ClientConnectionState *state;
704
705   state = mq->impl_state;
706   if (NULL == msg)
707   {
708     GNUNET_MQ_inject_error (mq, GNUNET_MQ_ERROR_READ);
709     return;
710   }
711   GNUNET_CLIENT_receive (state->connection,
712                          &handle_client_message,
713                          mq,
714                          GNUNET_TIME_UNIT_FOREVER_REL);
715   GNUNET_MQ_inject_message (mq, msg);
716 }
717
718
719 /**
720  * Transmit a queued message to the session's client.
721  *
722  * @param cls consensus session
723  * @param size number of bytes available in @a buf
724  * @param buf where the callee should write the message
725  * @return number of bytes written to buf
726  */
727 static size_t
728 connection_client_transmit_queued (void *cls,
729                                    size_t size,
730                                    void *buf)
731 {
732   struct GNUNET_MQ_Handle *mq = cls;
733   const struct GNUNET_MessageHeader *msg;
734   struct ClientConnectionState *state = mq->impl_state;
735   size_t msg_size;
736
737   GNUNET_assert (NULL != mq);
738   state->th = NULL;
739   msg = GNUNET_MQ_impl_current (mq);
740
741   if (NULL == buf)
742   {
743     GNUNET_MQ_inject_error (mq, GNUNET_MQ_ERROR_READ);
744     return 0;
745   }
746
747   if ( (GNUNET_YES == state->receive_requested) &&
748        (GNUNET_NO == state->receive_active) )
749   {
750     state->receive_active = GNUNET_YES;
751     GNUNET_CLIENT_receive (state->connection,
752                            &handle_client_message,
753                            mq,
754                            GNUNET_TIME_UNIT_FOREVER_REL);
755   }
756
757   msg_size = ntohs (msg->size);
758   GNUNET_assert (size >= msg_size);
759   GNUNET_memcpy (buf, msg, msg_size);
760   state->th = NULL;
761
762   GNUNET_MQ_impl_send_continue (mq);
763
764   return msg_size;
765 }
766
767
768 static void
769 connection_client_destroy_impl (struct GNUNET_MQ_Handle *mq,
770                                 void *impl_state)
771 {
772   struct ClientConnectionState *state = impl_state;
773
774   if (NULL != state->th)
775   {
776     GNUNET_CLIENT_notify_transmit_ready_cancel (state->th);
777     state->th = NULL;
778   }
779   GNUNET_CLIENT_disconnect (state->connection);
780   GNUNET_free (impl_state);
781 }
782
783
784 static void
785 connection_client_send_impl (struct GNUNET_MQ_Handle *mq,
786                              const struct GNUNET_MessageHeader *msg,
787                              void *impl_state)
788 {
789   struct ClientConnectionState *state = impl_state;
790
791   GNUNET_assert (NULL != state);
792   GNUNET_assert (NULL == state->th);
793   state->th =
794       GNUNET_CLIENT_notify_transmit_ready (state->connection,
795                                            ntohs (msg->size),
796                                            GNUNET_TIME_UNIT_FOREVER_REL,
797                                            GNUNET_NO,
798                                            &connection_client_transmit_queued,
799                                            mq);
800   GNUNET_assert (NULL != state->th);
801 }
802
803
804 static void
805 connection_client_cancel_impl (struct GNUNET_MQ_Handle *mq,
806                                void *impl_state)
807 {
808   struct ClientConnectionState *state = impl_state;
809
810   GNUNET_assert (NULL != state->th);
811   GNUNET_CLIENT_notify_transmit_ready_cancel (state->th);
812   state->th = NULL;
813 }
814
815
816 struct GNUNET_MQ_Handle *
817 GNUNET_MQ_queue_for_connection_client (struct GNUNET_CLIENT_Connection *connection,
818                                        const struct GNUNET_MQ_MessageHandler *handlers,
819                                        GNUNET_MQ_ErrorHandler error_handler,
820                                        void *error_handler_cls)
821 {
822   struct GNUNET_MQ_Handle *mq;
823   struct ClientConnectionState *state;
824   unsigned int i;
825
826   mq = GNUNET_new (struct GNUNET_MQ_Handle);
827   if (NULL != handlers)
828   {
829     for (i=0;NULL != handlers[i].cb; i++) ;
830     mq->handlers = GNUNET_new_array (i,
831                                      struct GNUNET_MQ_MessageHandler);
832     GNUNET_memcpy (mq->handlers,
833             handlers,
834             i * sizeof (struct GNUNET_MQ_MessageHandler));
835   }
836   mq->error_handler = error_handler;
837   mq->error_handler_cls = error_handler_cls;
838   state = GNUNET_new (struct ClientConnectionState);
839   state->connection = connection;
840   mq->impl_state = state;
841   mq->send_impl = &connection_client_send_impl;
842   mq->destroy_impl = &connection_client_destroy_impl;
843   mq->cancel_impl = &connection_client_cancel_impl;
844   if (NULL != handlers)
845     state->receive_requested = GNUNET_YES;
846
847   return mq;
848 }
849
850
851 /**
852  * Associate the assoc_data in mq with a unique request id.
853  *
854  * @param mq message queue, id will be unique for the queue
855  * @param assoc_data to associate
856  */
857 uint32_t
858 GNUNET_MQ_assoc_add (struct GNUNET_MQ_Handle *mq,
859                      void *assoc_data)
860 {
861   uint32_t id;
862
863   if (NULL == mq->assoc_map)
864   {
865     mq->assoc_map = GNUNET_CONTAINER_multihashmap32_create (8);
866     mq->assoc_id = 1;
867   }
868   id = mq->assoc_id++;
869   GNUNET_CONTAINER_multihashmap32_put (mq->assoc_map, id, assoc_data,
870                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
871   return id;
872 }
873
874
875 void *
876 GNUNET_MQ_assoc_get (struct GNUNET_MQ_Handle *mq,
877                      uint32_t request_id)
878 {
879   if (NULL == mq->assoc_map)
880     return NULL;
881   return GNUNET_CONTAINER_multihashmap32_get (mq->assoc_map, request_id);
882 }
883
884
885 void *
886 GNUNET_MQ_assoc_remove (struct GNUNET_MQ_Handle *mq,
887                         uint32_t request_id)
888 {
889   void *val;
890
891   if (NULL == mq->assoc_map)
892     return NULL;
893   val = GNUNET_CONTAINER_multihashmap32_get (mq->assoc_map,
894                                              request_id);
895   GNUNET_CONTAINER_multihashmap32_remove_all (mq->assoc_map,
896                                               request_id);
897   return val;
898 }
899
900
901 void
902 GNUNET_MQ_notify_sent (struct GNUNET_MQ_Envelope *mqm,
903                        GNUNET_MQ_NotifyCallback cb,
904                        void *cls)
905 {
906   mqm->sent_cb = cb;
907   mqm->sent_cls = cls;
908 }
909
910
911 void
912 GNUNET_MQ_destroy (struct GNUNET_MQ_Handle *mq)
913 {
914   if (NULL != mq->destroy_impl)
915   {
916     mq->destroy_impl (mq, mq->impl_state);
917   }
918   if (NULL != mq->continue_task)
919   {
920     GNUNET_SCHEDULER_cancel (mq->continue_task);
921     mq->continue_task = NULL;
922   }
923   while (NULL != mq->envelope_head)
924   {
925     struct GNUNET_MQ_Envelope *ev;
926
927     ev = mq->envelope_head;
928     ev->parent_queue = NULL;
929     GNUNET_CONTAINER_DLL_remove (mq->envelope_head,
930                                  mq->envelope_tail,
931                                  ev);
932     mq->queue_length--;
933     GNUNET_MQ_discard (ev);
934   }
935   GNUNET_assert (0 == mq->queue_length);
936   if (NULL != mq->current_envelope)
937   {
938     /* we can only discard envelopes that
939      * are not queued! */
940     mq->current_envelope->parent_queue = NULL;
941     GNUNET_MQ_discard (mq->current_envelope);
942     mq->current_envelope = NULL;
943   }
944   if (NULL != mq->assoc_map)
945   {
946     GNUNET_CONTAINER_multihashmap32_destroy (mq->assoc_map);
947     mq->assoc_map = NULL;
948   }
949   GNUNET_free_non_null (mq->handlers);
950   GNUNET_free (mq);
951 }
952
953
954 const struct GNUNET_MessageHeader *
955 GNUNET_MQ_extract_nested_mh_ (const struct GNUNET_MessageHeader *mh,
956                               uint16_t base_size)
957 {
958   uint16_t whole_size;
959   uint16_t nested_size;
960   const struct GNUNET_MessageHeader *nested_msg;
961
962   whole_size = ntohs (mh->size);
963   GNUNET_assert (whole_size >= base_size);
964   nested_size = whole_size - base_size;
965   if (0 == nested_size)
966     return NULL;
967   if (nested_size < sizeof (struct GNUNET_MessageHeader))
968   {
969     GNUNET_break_op (0);
970     return NULL;
971   }
972   nested_msg = (const struct GNUNET_MessageHeader *) ((char *) mh + base_size);
973   if (ntohs (nested_msg->size) != nested_size)
974   {
975     GNUNET_break_op (0);
976     return NULL;
977   }
978   return nested_msg;
979 }
980
981
982 /**
983  * Cancel sending the message. Message must have been sent with
984  * #GNUNET_MQ_send before.  May not be called after the notify sent
985  * callback has been called
986  *
987  * @param ev queued envelope to cancel
988  */
989 void
990 GNUNET_MQ_send_cancel (struct GNUNET_MQ_Envelope *ev)
991 {
992   struct GNUNET_MQ_Handle *mq = ev->parent_queue;
993
994   GNUNET_assert (NULL != mq);
995   GNUNET_assert (NULL != mq->cancel_impl);
996
997   if (mq->current_envelope == ev)
998   {
999     // complex case, we already started with transmitting
1000     // the message
1001     mq->cancel_impl (mq,
1002                      mq->impl_state);
1003     // continue sending the next message, if any
1004     if (NULL == mq->envelope_head)
1005     {
1006       mq->current_envelope = NULL;
1007     }
1008     else
1009     {
1010       mq->current_envelope = mq->envelope_head;
1011       GNUNET_CONTAINER_DLL_remove (mq->envelope_head,
1012                                    mq->envelope_tail,
1013                                    mq->current_envelope);
1014       mq->queue_length--;
1015       mq->send_impl (mq,
1016                      mq->current_envelope->mh,
1017                      mq->impl_state);
1018     }
1019   }
1020   else
1021   {
1022     // simple case, message is still waiting in the queue
1023     GNUNET_CONTAINER_DLL_remove (mq->envelope_head,
1024                                  mq->envelope_tail,
1025                                  ev);
1026     mq->queue_length--;
1027   }
1028
1029   ev->parent_queue = NULL;
1030   ev->mh = NULL;
1031   GNUNET_free (ev);
1032 }
1033
1034 /* end of mq.c */