towards adding mq destruction notification
[oweals/gnunet.git] / src / util / mq.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2012-2014 GNUnet e.V.
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18      Boston, MA 02110-1301, USA.
19 */
20
21 /**
22  * @author Florian Dold
23  * @file util/mq.c
24  * @brief general purpose request queue
25  */
26 #include "platform.h"
27 #include "gnunet_util_lib.h"
28
29 #define LOG(kind,...) GNUNET_log_from (kind, "mq",__VA_ARGS__)
30
31
32 struct GNUNET_MQ_Envelope
33 {
34   /**
35    * Messages are stored in a linked list.
36    * Each queue has its own list of envelopes.
37    */
38   struct GNUNET_MQ_Envelope *next;
39
40   /**
41    * Messages are stored in a linked list
42    * Each queue has its own list of envelopes.
43    */
44   struct GNUNET_MQ_Envelope *prev;
45
46   /**
47    * Actual allocated message header,
48    * usually points to the end of the containing GNUNET_MQ_Envelope
49    */
50   struct GNUNET_MessageHeader *mh;
51
52   /**
53    * Queue the message is queued in, NULL if message is not queued.
54    */
55   struct GNUNET_MQ_Handle *parent_queue;
56
57   /**
58    * Called after the message was sent irrevocably.
59    */
60   GNUNET_MQ_NotifyCallback sent_cb;
61
62   /**
63    * Closure for @e send_cb
64    */
65   void *sent_cls;
66
67   /**
68    * Flags that were set for this envelope by
69    * #GNUNET_MQ_env_set_options().   Only valid if
70    * @e have_custom_options is set.
71    */
72   uint64_t flags;
73
74   /**
75    * Additional options buffer set for this envelope by
76    * #GNUNET_MQ_env_set_options().  Only valid if
77    * @e have_custom_options is set.
78    */
79   const void *extra;
80
81   /**
82    * Did the application call #GNUNET_MQ_env_set_options()?
83    */
84   int have_custom_options;
85
86 };
87
88
89 /**
90  * Handle to a message queue.
91  */
92 struct GNUNET_MQ_Handle
93 {
94   /**
95    * Handlers array, or NULL if the queue should not receive messages
96    */
97   struct GNUNET_MQ_MessageHandler *handlers;
98
99   /**
100    * Actual implementation of message sending,
101    * called when a message is added
102    */
103   GNUNET_MQ_SendImpl send_impl;
104
105   /**
106    * Implementation-dependent queue destruction function
107    */
108   GNUNET_MQ_DestroyImpl destroy_impl;
109
110   /**
111    * Implementation-dependent send cancel function
112    */
113   GNUNET_MQ_CancelImpl cancel_impl;
114
115   /**
116    * Implementation-specific state
117    */
118   void *impl_state;
119
120   /**
121    * Callback will be called when an error occurs.
122    */
123   GNUNET_MQ_ErrorHandler error_handler;
124
125   /**
126    * Closure for the error handler.
127    */
128   void *error_handler_cls;
129
130   /**
131    * Linked list of messages pending to be sent
132    */
133   struct GNUNET_MQ_Envelope *envelope_head;
134
135   /**
136    * Linked list of messages pending to be sent
137    */
138   struct GNUNET_MQ_Envelope *envelope_tail;
139
140   /**
141    * Message that is currently scheduled to be
142    * sent. Not the head of the message queue, as the implementation
143    * needs to know if sending has been already scheduled or not.
144    */
145   struct GNUNET_MQ_Envelope *current_envelope;
146
147   /**
148    * Map of associations, lazily allocated
149    */
150   struct GNUNET_CONTAINER_MultiHashMap32 *assoc_map;
151
152   /**
153    * Task scheduled during #GNUNET_MQ_impl_send_continue.
154    */
155   struct GNUNET_SCHEDULER_Task *continue_task;
156
157   /**
158    * Functions to call on queue destruction; kept in a DLL.
159    */
160   struct GNUNET_MQ_DestroyNotificationHandle *dnh_head;
161
162   /**
163    * Functions to call on queue destruction; kept in a DLL.
164    */
165   struct GNUNET_MQ_DestroyNotificationHandle *dnh_tail;
166
167   /**
168    * Additional options buffer set for this queue by
169    * #GNUNET_MQ_set_options().  Default is 0.
170    */
171   const void *default_extra;
172
173   /**
174    * Flags that were set for this queue by
175    * #GNUNET_MQ_set_options().   Default is 0.
176    */
177   uint64_t default_flags;
178
179   /**
180    * Next id that should be used for the @e assoc_map,
181    * initialized lazily to a random value together with
182    * @e assoc_map
183    */
184   uint32_t assoc_id;
185
186   /**
187    * Number of entries we have in the envelope-DLL.
188    */
189   unsigned int queue_length;
190 };
191
192
193 /**
194  * Implementation-specific state for connection to
195  * client (MQ for server).
196  */
197 struct ServerClientSocketState
198 {
199   /**
200    * Handle of the client that connected to the server.
201    */
202   struct GNUNET_SERVER_Client *client;
203
204   /**
205    * Active transmission request to the client.
206    */
207   struct GNUNET_SERVER_TransmitHandle *th;
208 };
209
210
211 /**
212  * Implementation-specific state for connection to
213  * service (MQ for clients).
214  */
215 struct ClientConnectionState
216 {
217   /**
218    * Did we call receive alread alreadyy?
219    */
220   int receive_active;
221
222   /**
223    * Do we also want to receive?
224    */
225   int receive_requested;
226
227   /**
228    * Connection to the service.
229    */
230   struct GNUNET_CLIENT_Connection *connection;
231
232   /**
233    * Active transmission request (or NULL).
234    */
235   struct GNUNET_CLIENT_TransmitHandle *th;
236 };
237
238
239 /**
240  * Call the message message handler that was registered
241  * for the type of the given message in the given message queue.
242  *
243  * This function is indended to be used for the implementation
244  * of message queues.
245  *
246  * @param mq message queue with the handlers
247  * @param mh message to dispatch
248  */
249 void
250 GNUNET_MQ_inject_message (struct GNUNET_MQ_Handle *mq,
251                           const struct GNUNET_MessageHeader *mh)
252 {
253   const struct GNUNET_MQ_MessageHandler *handler;
254   int handled = GNUNET_NO;
255   uint16_t ms = ntohs (mh->size);
256
257   if (NULL == mq->handlers)
258     goto done;
259   for (handler = mq->handlers; NULL != handler->cb; handler++)
260   {
261     if (handler->type == ntohs (mh->type))
262     {
263       handled = GNUNET_YES;
264       if ( (handler->expected_size > ms) ||
265            ( (handler->expected_size != ms) &&
266              (NULL == handler->mv) ) )
267       {
268         /* Too small, or not an exact size and
269            no 'mv' handler to check rest */
270         GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
271                     "Received malformed message of type %u\n",
272                     (unsigned int) handler->type);
273         GNUNET_MQ_inject_error (mq,
274                                 GNUNET_MQ_ERROR_MALFORMED);
275         break;
276       }
277       if ( (NULL == handler->mv) ||
278            (GNUNET_OK ==
279             handler->mv (handler->cls, mh)) )
280       {
281         /* message well-formed, pass to handler */
282         handler->cb (handler->cls, mh);
283       }
284       else
285       {
286         /* Message rejected by check routine */
287         GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
288                     "Received malformed message of type %u\n",
289                     (unsigned int) handler->type);
290         GNUNET_MQ_inject_error (mq,
291                                 GNUNET_MQ_ERROR_MALFORMED);
292       }
293       break;
294     }
295   }
296  done:
297   if (GNUNET_NO == handled)
298     LOG (GNUNET_ERROR_TYPE_DEBUG,
299          "No handler for message of type %d\n",
300          ntohs (mh->type));
301 }
302
303
304 /**
305  * Call the error handler of a message queue with the given
306  * error code.  If there is no error handler, log a warning.
307  *
308  * This function is intended to be used by the implementation
309  * of message queues.
310  *
311  * @param mq message queue
312  * @param error the error type
313  */
314 void
315 GNUNET_MQ_inject_error (struct GNUNET_MQ_Handle *mq,
316                         enum GNUNET_MQ_Error error)
317 {
318   if (NULL == mq->error_handler)
319   {
320     LOG (GNUNET_ERROR_TYPE_WARNING,
321          "Got error %d, but no handler installed\n",
322          (int) error);
323     return;
324   }
325   mq->error_handler (mq->error_handler_cls,
326                      error);
327 }
328
329
330 /**
331  * Discard the message queue message, free all
332  * allocated resources. Must be called in the event
333  * that a message is created but should not actually be sent.
334  *
335  * @param mqm the message to discard
336  */
337 void
338 GNUNET_MQ_discard (struct GNUNET_MQ_Envelope *mqm)
339 {
340   GNUNET_assert (NULL == mqm->parent_queue);
341   GNUNET_free (mqm);
342 }
343
344
345 /**
346  * Obtain the current length of the message queue.
347  *
348  * @param mq queue to inspect
349  * @return number of queued, non-transmitted messages
350  */
351 unsigned int
352 GNUNET_MQ_get_length (struct GNUNET_MQ_Handle *mq)
353 {
354   return mq->queue_length;
355 }
356
357
358 /**
359  * Send a message with the given message queue.
360  * May only be called once per message.
361  *
362  * @param mq message queue
363  * @param ev the envelope with the message to send.
364  */
365 void
366 GNUNET_MQ_send (struct GNUNET_MQ_Handle *mq,
367                 struct GNUNET_MQ_Envelope *ev)
368 {
369   GNUNET_assert (NULL != mq);
370   GNUNET_assert (NULL == ev->parent_queue);
371
372   mq->queue_length++;
373   ev->parent_queue = mq;
374   /* is the implementation busy? queue it! */
375   if (NULL != mq->current_envelope)
376   {
377     GNUNET_CONTAINER_DLL_insert_tail (mq->envelope_head,
378                                       mq->envelope_tail,
379                                       ev);
380     return;
381   }
382   mq->current_envelope = ev;
383   mq->send_impl (mq,
384                  ev->mh,
385                  mq->impl_state);
386 }
387
388
389 /**
390  * Send a copy of a message with the given message queue.
391  * Can be called repeatedly on the same envelope.
392  *
393  * @param mq message queue
394  * @param ev the envelope with the message to send.
395  */
396 void
397 GNUNET_MQ_send_copy (struct GNUNET_MQ_Handle *mq,
398                      const struct GNUNET_MQ_Envelope *ev)
399 {
400   struct GNUNET_MQ_Envelope *env;
401   uint16_t msize;
402
403   msize = ntohs (ev->mh->size);
404   env = GNUNET_malloc (sizeof (struct GNUNET_MQ_Envelope) +
405                        msize);
406   env->mh = (struct GNUNET_MessageHeader *) &env[1];
407   env->sent_cb = ev->sent_cb;
408   env->sent_cls = ev->sent_cls;
409   GNUNET_memcpy (&env[1],
410           ev->mh,
411           msize);
412   GNUNET_MQ_send (mq,
413                   env);
414 }
415
416
417
418 /**
419  * Task run to call the send implementation for the next queued
420  * message, if any.  Only useful for implementing message queues,
421  * results in undefined behavior if not used carefully.
422  *
423  * @param cls message queue to send the next message with
424  */
425 static void
426 impl_send_continue (void *cls)
427 {
428   struct GNUNET_MQ_Handle *mq = cls;
429   struct GNUNET_MQ_Envelope *current_envelope;
430
431   mq->continue_task = NULL;
432   /* call is only valid if we're actually currently sending
433    * a message */
434   current_envelope = mq->current_envelope;
435   GNUNET_assert (NULL != current_envelope);
436   current_envelope->parent_queue = NULL;
437   GNUNET_assert (0 < mq->queue_length);
438   mq->queue_length--;
439   if (NULL == mq->envelope_head)
440   {
441     mq->current_envelope = NULL;
442   }
443   else
444   {
445     mq->current_envelope = mq->envelope_head;
446     GNUNET_CONTAINER_DLL_remove (mq->envelope_head,
447                                  mq->envelope_tail,
448                                  mq->current_envelope);
449     mq->send_impl (mq,
450                    mq->current_envelope->mh,
451                    mq->impl_state);
452   }
453   if (NULL != current_envelope->sent_cb)
454     current_envelope->sent_cb (current_envelope->sent_cls);
455   GNUNET_free (current_envelope);
456 }
457
458
459 /**
460  * Call the send implementation for the next queued message, if any.
461  * Only useful for implementing message queues, results in undefined
462  * behavior if not used carefully.
463  *
464  * @param mq message queue to send the next message with
465  */
466 void
467 GNUNET_MQ_impl_send_continue (struct GNUNET_MQ_Handle *mq)
468 {
469   GNUNET_assert (NULL == mq->continue_task);
470   mq->continue_task = GNUNET_SCHEDULER_add_now (&impl_send_continue,
471                                                 mq);
472 }
473
474
475 /**
476  * Create a message queue for the specified handlers.
477  *
478  * @param send function the implements sending messages
479  * @param destroy function that implements destroying the queue
480  * @param cancel function that implements canceling a message
481  * @param impl_state for the queue, passed to 'send' and 'destroy'
482  * @param handlers array of message handlers
483  * @param error_handler handler for read and write errors
484  * @param error_handler_cls closure for @a error_handler
485  * @return a new message queue
486  */
487 struct GNUNET_MQ_Handle *
488 GNUNET_MQ_queue_for_callbacks (GNUNET_MQ_SendImpl send,
489                                GNUNET_MQ_DestroyImpl destroy,
490                                GNUNET_MQ_CancelImpl cancel,
491                                void *impl_state,
492                                const struct GNUNET_MQ_MessageHandler *handlers,
493                                GNUNET_MQ_ErrorHandler error_handler,
494                                void *error_handler_cls)
495 {
496   struct GNUNET_MQ_Handle *mq;
497   unsigned int i;
498
499   mq = GNUNET_new (struct GNUNET_MQ_Handle);
500   mq->send_impl = send;
501   mq->destroy_impl = destroy;
502   mq->cancel_impl = cancel;
503   if (NULL != handlers)
504   {
505     for (i=0;NULL != handlers[i].cb; i++) ;
506     mq->handlers = GNUNET_new_array (i + 1,
507                                      struct GNUNET_MQ_MessageHandler);
508     GNUNET_memcpy (mq->handlers,
509             handlers,
510             i * sizeof (struct GNUNET_MQ_MessageHandler));
511   }
512   mq->error_handler = error_handler;
513   mq->error_handler_cls = error_handler_cls;
514   mq->impl_state = impl_state;
515
516   return mq;
517 }
518
519
520 /**
521  * Change the closure argument in all of the `handlers` of the
522  * @a mq.
523  *
524  * @param mq to modify
525  * @param handlers_cls new closure to use
526  */
527 void
528 GNUNET_MQ_set_handlers_closure (struct GNUNET_MQ_Handle *mq,
529                                 void *handlers_cls)
530 {
531   unsigned int i;
532
533   if (NULL == mq->handlers)
534     return;
535   for (i=0;NULL != mq->handlers[i].cb; i++)
536     mq->handlers[i].cls = handlers_cls;
537 }
538
539
540 /**
541  * Get the message that should currently be sent.
542  * Fails if there is no current message.
543  * Only useful for implementing message queues,
544  * results in undefined behavior if not used carefully.
545  *
546  * @param mq message queue with the current message
547  * @return message to send, never NULL
548  */
549 const struct GNUNET_MessageHeader *
550 GNUNET_MQ_impl_current (struct GNUNET_MQ_Handle *mq)
551 {
552   if (NULL == mq->current_envelope)
553     GNUNET_assert (0);
554   if (NULL == mq->current_envelope->mh)
555     GNUNET_assert (0);
556   return mq->current_envelope->mh;
557 }
558
559
560 /**
561  * Get the implementation state associated with the
562  * message queue.
563  *
564  * While the GNUNET_MQ_Impl* callbacks receive the
565  * implementation state, continuations that are scheduled
566  * by the implementation function often only have one closure
567  * argument, with this function it is possible to get at the
568  * implementation state when only passing the GNUNET_MQ_Handle
569  * as closure.
570  *
571  * @param mq message queue with the current message
572  * @return message to send, never NULL
573  */
574 void *
575 GNUNET_MQ_impl_state (struct GNUNET_MQ_Handle *mq)
576 {
577   return mq->impl_state;
578 }
579
580
581 struct GNUNET_MQ_Envelope *
582 GNUNET_MQ_msg_ (struct GNUNET_MessageHeader **mhp,
583                 uint16_t size,
584                 uint16_t type)
585 {
586   struct GNUNET_MQ_Envelope *mqm;
587
588   mqm = GNUNET_malloc (sizeof *mqm + size);
589   mqm->mh = (struct GNUNET_MessageHeader *) &mqm[1];
590   mqm->mh->size = htons (size);
591   mqm->mh->type = htons (type);
592   if (NULL != mhp)
593     *mhp = mqm->mh;
594   return mqm;
595 }
596
597
598 /**
599  * Create a new envelope by copying an existing message.
600  *
601  * @param hdr header of the message to copy
602  * @return envelope containing @a hdr
603  */
604 struct GNUNET_MQ_Envelope *
605 GNUNET_MQ_msg_copy (const struct GNUNET_MessageHeader *hdr)
606 {
607   struct GNUNET_MQ_Envelope *mqm;
608   uint16_t size = ntohs (hdr->size);
609
610   mqm = GNUNET_malloc (sizeof (*mqm) + size);
611   mqm->mh = (struct GNUNET_MessageHeader *) &mqm[1];
612   GNUNET_memcpy (mqm->mh,
613           hdr,
614           size);
615   return mqm;
616 }
617
618
619 /**
620  * Implementation of the #GNUNET_MQ_msg_nested_mh macro.
621  *
622  * @param mhp pointer to the message header pointer that will be changed to allocate at
623  *        the newly allocated space for the message.
624  * @param base_size size of the data before the nested message
625  * @param type type of the message in the envelope
626  * @param nested_mh the message to append to the message after base_size
627  */
628 struct GNUNET_MQ_Envelope *
629 GNUNET_MQ_msg_nested_mh_ (struct GNUNET_MessageHeader **mhp,
630                           uint16_t base_size,
631                           uint16_t type,
632                           const struct GNUNET_MessageHeader *nested_mh)
633 {
634   struct GNUNET_MQ_Envelope *mqm;
635   uint16_t size;
636
637   if (NULL == nested_mh)
638     return GNUNET_MQ_msg_ (mhp, base_size, type);
639
640   size = base_size + ntohs (nested_mh->size);
641
642   /* check for uint16_t overflow */
643   if (size < base_size)
644     return NULL;
645
646   mqm = GNUNET_MQ_msg_ (mhp, size, type);
647   GNUNET_memcpy ((char *) mqm->mh + base_size,
648                  nested_mh,
649                  ntohs (nested_mh->size));
650
651   return mqm;
652 }
653
654
655 /**
656  * Transmit a queued message to the session's client.
657  *
658  * @param cls consensus session
659  * @param size number of bytes available in @a buf
660  * @param buf where the callee should write the message
661  * @return number of bytes written to @a buf
662  */
663 static size_t
664 transmit_queued (void *cls,
665                  size_t size,
666                  void *buf)
667 {
668   struct GNUNET_MQ_Handle *mq = cls;
669   struct ServerClientSocketState *state = GNUNET_MQ_impl_state (mq);
670   const struct GNUNET_MessageHeader *msg = GNUNET_MQ_impl_current (mq);
671   size_t msg_size;
672
673   GNUNET_assert (NULL != buf);
674   msg_size = ntohs (msg->size);
675   GNUNET_assert (size >= msg_size);
676   GNUNET_memcpy (buf, msg, msg_size);
677   state->th = NULL;
678
679   GNUNET_MQ_impl_send_continue (mq);
680
681   return msg_size;
682 }
683
684
685 static void
686 server_client_destroy_impl (struct GNUNET_MQ_Handle *mq,
687                             void *impl_state)
688 {
689   struct ServerClientSocketState *state = impl_state;
690
691   if (NULL != state->th)
692   {
693     GNUNET_SERVER_notify_transmit_ready_cancel (state->th);
694     state->th = NULL;
695   }
696
697   GNUNET_assert (NULL != mq);
698   GNUNET_assert (NULL != state);
699   GNUNET_SERVER_client_drop (state->client);
700   GNUNET_free (state);
701 }
702
703
704 static void
705 server_client_send_impl (struct GNUNET_MQ_Handle *mq,
706                          const struct GNUNET_MessageHeader *msg,
707                          void *impl_state)
708 {
709   struct ServerClientSocketState *state = impl_state;
710
711   GNUNET_assert (NULL != mq);
712   state->th = GNUNET_SERVER_notify_transmit_ready (state->client,
713                                                    ntohs (msg->size),
714                                                    GNUNET_TIME_UNIT_FOREVER_REL,
715                                                    &transmit_queued, mq);
716 }
717
718
719 struct GNUNET_MQ_Handle *
720 GNUNET_MQ_queue_for_server_client (struct GNUNET_SERVER_Client *client)
721 {
722   struct GNUNET_MQ_Handle *mq;
723   struct ServerClientSocketState *scss;
724
725   mq = GNUNET_new (struct GNUNET_MQ_Handle);
726   scss = GNUNET_new (struct ServerClientSocketState);
727   mq->impl_state = scss;
728   scss->client = client;
729   GNUNET_SERVER_client_keep (client);
730   mq->send_impl = &server_client_send_impl;
731   mq->destroy_impl = &server_client_destroy_impl;
732   return mq;
733 }
734
735
736 /**
737  * Type of a function to call when we receive a message
738  * from the service.
739  *
740  * @param cls closure
741  * @param msg message received, NULL on timeout or fatal error
742  */
743 static void
744 handle_client_message (void *cls,
745                        const struct GNUNET_MessageHeader *msg)
746 {
747   struct GNUNET_MQ_Handle *mq = cls;
748   struct ClientConnectionState *state;
749
750   state = mq->impl_state;
751   if (NULL == msg)
752   {
753     GNUNET_MQ_inject_error (mq, GNUNET_MQ_ERROR_READ);
754     return;
755   }
756   GNUNET_CLIENT_receive (state->connection,
757                          &handle_client_message,
758                          mq,
759                          GNUNET_TIME_UNIT_FOREVER_REL);
760   GNUNET_MQ_inject_message (mq, msg);
761 }
762
763
764 /**
765  * Transmit a queued message to the session's client.
766  *
767  * @param cls consensus session
768  * @param size number of bytes available in @a buf
769  * @param buf where the callee should write the message
770  * @return number of bytes written to buf
771  */
772 static size_t
773 connection_client_transmit_queued (void *cls,
774                                    size_t size,
775                                    void *buf)
776 {
777   struct GNUNET_MQ_Handle *mq = cls;
778   const struct GNUNET_MessageHeader *msg;
779   struct ClientConnectionState *state = mq->impl_state;
780   size_t msg_size;
781
782   GNUNET_assert (NULL != mq);
783   state->th = NULL;
784   msg = GNUNET_MQ_impl_current (mq);
785
786   if (NULL == buf)
787   {
788     GNUNET_MQ_inject_error (mq, GNUNET_MQ_ERROR_READ);
789     return 0;
790   }
791
792   if ( (GNUNET_YES == state->receive_requested) &&
793        (GNUNET_NO == state->receive_active) )
794   {
795     state->receive_active = GNUNET_YES;
796     GNUNET_CLIENT_receive (state->connection,
797                            &handle_client_message,
798                            mq,
799                            GNUNET_TIME_UNIT_FOREVER_REL);
800   }
801
802   msg_size = ntohs (msg->size);
803   GNUNET_assert (size >= msg_size);
804   GNUNET_memcpy (buf, msg, msg_size);
805   state->th = NULL;
806
807   GNUNET_MQ_impl_send_continue (mq);
808
809   return msg_size;
810 }
811
812
813 static void
814 connection_client_destroy_impl (struct GNUNET_MQ_Handle *mq,
815                                 void *impl_state)
816 {
817   struct ClientConnectionState *state = impl_state;
818
819   if (NULL != state->th)
820   {
821     GNUNET_CLIENT_notify_transmit_ready_cancel (state->th);
822     state->th = NULL;
823   }
824   GNUNET_CLIENT_disconnect (state->connection);
825   GNUNET_free (impl_state);
826 }
827
828
829 static void
830 connection_client_send_impl (struct GNUNET_MQ_Handle *mq,
831                              const struct GNUNET_MessageHeader *msg,
832                              void *impl_state)
833 {
834   struct ClientConnectionState *state = impl_state;
835
836   GNUNET_assert (NULL != state);
837   GNUNET_assert (NULL == state->th);
838   state->th =
839       GNUNET_CLIENT_notify_transmit_ready (state->connection,
840                                            ntohs (msg->size),
841                                            GNUNET_TIME_UNIT_FOREVER_REL,
842                                            GNUNET_NO,
843                                            &connection_client_transmit_queued,
844                                            mq);
845   GNUNET_assert (NULL != state->th);
846 }
847
848
849 static void
850 connection_client_cancel_impl (struct GNUNET_MQ_Handle *mq,
851                                void *impl_state)
852 {
853   struct ClientConnectionState *state = impl_state;
854
855   if (NULL != state->th)
856   {
857     GNUNET_CLIENT_notify_transmit_ready_cancel (state->th);
858     state->th = NULL;
859   }
860   else if (NULL != mq->continue_task)
861   {
862     GNUNET_SCHEDULER_cancel (mq->continue_task);
863     mq->continue_task = NULL;
864   }
865   else
866     GNUNET_assert (0);
867 }
868
869
870 struct GNUNET_MQ_Handle *
871 GNUNET_MQ_queue_for_connection_client (struct GNUNET_CLIENT_Connection *connection,
872                                        const struct GNUNET_MQ_MessageHandler *handlers,
873                                        GNUNET_MQ_ErrorHandler error_handler,
874                                        void *error_handler_cls)
875 {
876   struct GNUNET_MQ_Handle *mq;
877   struct ClientConnectionState *state;
878   unsigned int i;
879
880   mq = GNUNET_new (struct GNUNET_MQ_Handle);
881   if (NULL != handlers)
882   {
883     for (i=0;NULL != handlers[i].cb; i++) ;
884     mq->handlers = GNUNET_new_array (i + 1,
885                                      struct GNUNET_MQ_MessageHandler);
886     GNUNET_memcpy (mq->handlers,
887                    handlers,
888                    i * sizeof (struct GNUNET_MQ_MessageHandler));
889   }
890   mq->error_handler = error_handler;
891   mq->error_handler_cls = error_handler_cls;
892   state = GNUNET_new (struct ClientConnectionState);
893   state->connection = connection;
894   mq->impl_state = state;
895   mq->send_impl = &connection_client_send_impl;
896   mq->destroy_impl = &connection_client_destroy_impl;
897   mq->cancel_impl = &connection_client_cancel_impl;
898   if (NULL != handlers)
899     state->receive_requested = GNUNET_YES;
900
901   return mq;
902 }
903
904
905 /**
906  * Associate the assoc_data in mq with a unique request id.
907  *
908  * @param mq message queue, id will be unique for the queue
909  * @param assoc_data to associate
910  */
911 uint32_t
912 GNUNET_MQ_assoc_add (struct GNUNET_MQ_Handle *mq,
913                      void *assoc_data)
914 {
915   uint32_t id;
916
917   if (NULL == mq->assoc_map)
918   {
919     mq->assoc_map = GNUNET_CONTAINER_multihashmap32_create (8);
920     mq->assoc_id = 1;
921   }
922   id = mq->assoc_id++;
923   GNUNET_CONTAINER_multihashmap32_put (mq->assoc_map, id, assoc_data,
924                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
925   return id;
926 }
927
928
929 void *
930 GNUNET_MQ_assoc_get (struct GNUNET_MQ_Handle *mq,
931                      uint32_t request_id)
932 {
933   if (NULL == mq->assoc_map)
934     return NULL;
935   return GNUNET_CONTAINER_multihashmap32_get (mq->assoc_map, request_id);
936 }
937
938
939 void *
940 GNUNET_MQ_assoc_remove (struct GNUNET_MQ_Handle *mq,
941                         uint32_t request_id)
942 {
943   void *val;
944
945   if (NULL == mq->assoc_map)
946     return NULL;
947   val = GNUNET_CONTAINER_multihashmap32_get (mq->assoc_map,
948                                              request_id);
949   GNUNET_CONTAINER_multihashmap32_remove_all (mq->assoc_map,
950                                               request_id);
951   return val;
952 }
953
954
955 void
956 GNUNET_MQ_notify_sent (struct GNUNET_MQ_Envelope *mqm,
957                        GNUNET_MQ_NotifyCallback cb,
958                        void *cls)
959 {
960   mqm->sent_cb = cb;
961   mqm->sent_cls = cls;
962 }
963
964
965 void
966 GNUNET_MQ_destroy (struct GNUNET_MQ_Handle *mq)
967 {
968   if (NULL != mq->destroy_impl)
969   {
970     mq->destroy_impl (mq, mq->impl_state);
971   }
972   if (NULL != mq->continue_task)
973   {
974     GNUNET_SCHEDULER_cancel (mq->continue_task);
975     mq->continue_task = NULL;
976   }
977   while (NULL != mq->envelope_head)
978   {
979     struct GNUNET_MQ_Envelope *ev;
980
981     ev = mq->envelope_head;
982     ev->parent_queue = NULL;
983     GNUNET_CONTAINER_DLL_remove (mq->envelope_head,
984                                  mq->envelope_tail,
985                                  ev);
986     mq->queue_length--;
987     GNUNET_MQ_discard (ev);
988   }
989   if (NULL != mq->current_envelope)
990   {
991     /* we can only discard envelopes that
992      * are not queued! */
993     mq->current_envelope->parent_queue = NULL;
994     GNUNET_MQ_discard (mq->current_envelope);
995     mq->current_envelope = NULL;
996     mq->queue_length--;
997   }
998   GNUNET_assert (0 == mq->queue_length);
999   if (NULL != mq->assoc_map)
1000   {
1001     GNUNET_CONTAINER_multihashmap32_destroy (mq->assoc_map);
1002     mq->assoc_map = NULL;
1003   }
1004   GNUNET_free_non_null (mq->handlers);
1005   GNUNET_free (mq);
1006 }
1007
1008
1009 const struct GNUNET_MessageHeader *
1010 GNUNET_MQ_extract_nested_mh_ (const struct GNUNET_MessageHeader *mh,
1011                               uint16_t base_size)
1012 {
1013   uint16_t whole_size;
1014   uint16_t nested_size;
1015   const struct GNUNET_MessageHeader *nested_msg;
1016
1017   whole_size = ntohs (mh->size);
1018   GNUNET_assert (whole_size >= base_size);
1019   nested_size = whole_size - base_size;
1020   if (0 == nested_size)
1021     return NULL;
1022   if (nested_size < sizeof (struct GNUNET_MessageHeader))
1023   {
1024     GNUNET_break_op (0);
1025     return NULL;
1026   }
1027   nested_msg = (const struct GNUNET_MessageHeader *) ((char *) mh + base_size);
1028   if (ntohs (nested_msg->size) != nested_size)
1029   {
1030     GNUNET_break_op (0);
1031     return NULL;
1032   }
1033   return nested_msg;
1034 }
1035
1036
1037 /**
1038  * Cancel sending the message. Message must have been sent with
1039  * #GNUNET_MQ_send before.  May not be called after the notify sent
1040  * callback has been called
1041  *
1042  * @param ev queued envelope to cancel
1043  */
1044 void
1045 GNUNET_MQ_send_cancel (struct GNUNET_MQ_Envelope *ev)
1046 {
1047   struct GNUNET_MQ_Handle *mq = ev->parent_queue;
1048
1049   GNUNET_assert (NULL != mq);
1050   GNUNET_assert (NULL != mq->cancel_impl);
1051
1052   if (mq->current_envelope == ev)
1053   {
1054     // complex case, we already started with transmitting
1055     // the message
1056     mq->queue_length--;
1057     mq->cancel_impl (mq,
1058                      mq->impl_state);
1059     // continue sending the next message, if any
1060     if (NULL == mq->envelope_head)
1061     {
1062       mq->current_envelope = NULL;
1063     }
1064     else
1065     {
1066       mq->current_envelope = mq->envelope_head;
1067       GNUNET_CONTAINER_DLL_remove (mq->envelope_head,
1068                                    mq->envelope_tail,
1069                                    mq->current_envelope);
1070       mq->queue_length--;
1071       mq->send_impl (mq,
1072                      mq->current_envelope->mh,
1073                      mq->impl_state);
1074     }
1075   }
1076   else
1077   {
1078     // simple case, message is still waiting in the queue
1079     GNUNET_CONTAINER_DLL_remove (mq->envelope_head,
1080                                  mq->envelope_tail,
1081                                  ev);
1082     mq->queue_length--;
1083   }
1084
1085   ev->parent_queue = NULL;
1086   ev->mh = NULL;
1087   GNUNET_free (ev);
1088 }
1089
1090
1091 /**
1092  * Function to obtain the current envelope
1093  * from within #GNUNET_MQ_SendImpl implementations.
1094  *
1095  * @param mq message queue to interrogate
1096  * @return the current envelope
1097  */
1098 struct GNUNET_MQ_Envelope *
1099 GNUNET_MQ_get_current_envelope (struct GNUNET_MQ_Handle *mq)
1100 {
1101   return mq->current_envelope;
1102 }
1103
1104
1105 /**
1106  * Function to obtain the last envelope in the queue.
1107  *
1108  * @param mq message queue to interrogate
1109  * @return the last envelope in the queue
1110  */
1111 struct GNUNET_MQ_Envelope *
1112 GNUNET_MQ_get_last_envelope (struct GNUNET_MQ_Handle *mq)
1113 {
1114   if (NULL != mq->envelope_tail)
1115     return mq->envelope_tail;
1116
1117   return mq->current_envelope;
1118 }
1119
1120
1121 /**
1122  * Set application-specific options for this envelope.
1123  * Overrides the options set for the queue with
1124  * #GNUNET_MQ_set_options() for this message only.
1125  *
1126  * @param env message to set options for
1127  * @param flags flags to use (meaning is queue-specific)
1128  * @param extra additional buffer for further data (also queue-specific)
1129  */
1130 void
1131 GNUNET_MQ_env_set_options (struct GNUNET_MQ_Envelope *env,
1132                            uint64_t flags,
1133                            const void *extra)
1134 {
1135   env->flags = flags;
1136   env->extra = extra;
1137   env->have_custom_options = GNUNET_YES;
1138 }
1139
1140
1141 /**
1142  * Get application-specific options for this envelope.
1143  *
1144  * @param env message to set options for
1145  * @param[out] flags set to flags to use (meaning is queue-specific)
1146  * @return extra additional buffer for further data (also queue-specific)
1147  */
1148 const void *
1149 GNUNET_MQ_env_get_options (struct GNUNET_MQ_Envelope *env,
1150                            uint64_t *flags)
1151 {
1152   struct GNUNET_MQ_Handle *mq = env->parent_queue;
1153
1154   if (GNUNET_YES == env->have_custom_options)
1155   {
1156     *flags = env->flags;
1157     return env->extra;
1158   }
1159   if (NULL == mq)
1160   {
1161     *flags = 0;
1162     return NULL;
1163   }
1164   *flags = mq->default_flags;
1165   return mq->default_extra;
1166 }
1167
1168
1169 /**
1170  * Set application-specific options for this queue.
1171  *
1172  * @param mq message queue to set options for
1173  * @param flags flags to use (meaning is queue-specific)
1174  * @param extra additional buffer for further data (also queue-specific)
1175  */
1176 void
1177 GNUNET_MQ_set_options (struct GNUNET_MQ_Handle *mq,
1178                        uint64_t flags,
1179                        const void *extra)
1180 {
1181   mq->default_flags = flags;
1182   mq->default_extra = extra;
1183 }
1184
1185
1186 /**
1187  * Handle we return for callbacks registered to be
1188  * notified when #GNUNET_MQ_destroy() is called on a queue.
1189  */
1190 struct GNUNET_MQ_DestroyNotificationHandle
1191 {
1192   /**
1193    * Kept in a DLL.
1194    */
1195   struct GNUNET_MQ_DestroyNotificationHandle *prev;
1196
1197   /**
1198    * Kept in a DLL.
1199    */
1200   struct GNUNET_MQ_DestroyNotificationHandle *next;
1201
1202   /**
1203    * Queue to notify about.
1204    */
1205   struct GNUNET_MQ_Handle *mq;
1206
1207   /**
1208    * Function to call.
1209    */
1210   GNUNET_SCHEDULER_TaskCallback cb;
1211
1212   /**
1213    * Closure for @e cb.
1214    */
1215   void *cb_cls;
1216 };
1217
1218
1219 /**
1220  * Register function to be called whenever @a mq is being
1221  * destroyed.
1222  *
1223  * @param mq message queue to watch
1224  * @param cb function to call on @a mq destruction
1225  * @param cb_cls closure for @a cb
1226  * @return handle for #GNUNET_MQ_destroy_notify_cancel().
1227  */
1228 struct GNUNET_MQ_DestroyNotificationHandle *
1229 GNUNET_MQ_destroy_notify (struct GNUNET_MQ_Handle *mq,
1230                           GNUNET_SCHEDULER_TaskCallback cb,
1231                           void *cb_cls)
1232 {
1233   struct GNUNET_MQ_DestroyNotificationHandle *dnh;
1234
1235   dnh = GNUNET_new (struct GNUNET_MQ_DestroyNotificationHandle);
1236   dnh->mq = mq;
1237   dnh->cb = cb;
1238   dnh->cb_cls = cb_cls;
1239   GNUNET_CONTAINER_DLL_insert (mq->dnh_head,
1240                                mq->dnh_tail,
1241                                dnh);
1242   return dnh;
1243 }
1244
1245
1246 /**
1247  * Cancel registration from #GNUNET_MQ_destroy_notify().
1248  *
1249  * @param dnh handle for registration to cancel
1250  */
1251 void
1252 GNUNET_MQ_destroy_notify_cancel (struct GNUNET_MQ_DestroyNotificationHandle *dnh)
1253 {
1254   struct GNUNET_MQ_Handle *mq = dnh->mq;
1255   
1256   GNUNET_CONTAINER_DLL_remove (mq->dnh_head,
1257                                mq->dnh_tail,
1258                                dnh);
1259   GNUNET_free (dnh);
1260 }
1261
1262
1263 /* end of mq.c */