psycstore
[oweals/gnunet.git] / src / util / mq.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2012-2014 GNUnet e.V.
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18      Boston, MA 02110-1301, USA.
19 */
20
21 /**
22  * @author Florian Dold
23  * @file util/mq.c
24  * @brief general purpose request queue
25  */
26 #include "platform.h"
27 #include "gnunet_util_lib.h"
28
29 #define LOG(kind,...) GNUNET_log_from (kind, "mq",__VA_ARGS__)
30
31
32 struct GNUNET_MQ_Envelope
33 {
34   /**
35    * Messages are stored in a linked list.
36    * Each queue has its own list of envelopes.
37    */
38   struct GNUNET_MQ_Envelope *next;
39
40   /**
41    * Messages are stored in a linked list
42    * Each queue has its own list of envelopes.
43    */
44   struct GNUNET_MQ_Envelope *prev;
45
46   /**
47    * Actual allocated message header,
48    * usually points to the end of the containing GNUNET_MQ_Envelope
49    */
50   struct GNUNET_MessageHeader *mh;
51
52   /**
53    * Queue the message is queued in, NULL if message is not queued.
54    */
55   struct GNUNET_MQ_Handle *parent_queue;
56
57   /**
58    * Called after the message was sent irrevocably.
59    */
60   GNUNET_MQ_NotifyCallback sent_cb;
61
62   /**
63    * Closure for @e send_cb
64    */
65   void *sent_cls;
66
67   /**
68    * Flags that were set for this envelope by
69    * #GNUNET_MQ_env_set_options().   Only valid if
70    * @e have_custom_options is set.
71    */
72   uint64_t flags;
73
74   /**
75    * Additional options buffer set for this envelope by
76    * #GNUNET_MQ_env_set_options().  Only valid if
77    * @e have_custom_options is set.
78    */
79   const void *extra;
80
81   /**
82    * Did the application call #GNUNET_MQ_env_set_options()?
83    */
84   int have_custom_options;
85
86 };
87
88
89 /**
90  * Handle to a message queue.
91  */
92 struct GNUNET_MQ_Handle
93 {
94   /**
95    * Handlers array, or NULL if the queue should not receive messages
96    */
97   struct GNUNET_MQ_MessageHandler *handlers;
98
99   /**
100    * Actual implementation of message sending,
101    * called when a message is added
102    */
103   GNUNET_MQ_SendImpl send_impl;
104
105   /**
106    * Implementation-dependent queue destruction function
107    */
108   GNUNET_MQ_DestroyImpl destroy_impl;
109
110   /**
111    * Implementation-dependent send cancel function
112    */
113   GNUNET_MQ_CancelImpl cancel_impl;
114
115   /**
116    * Implementation-specific state
117    */
118   void *impl_state;
119
120   /**
121    * Callback will be called when an error occurs.
122    */
123   GNUNET_MQ_ErrorHandler error_handler;
124
125   /**
126    * Closure for the error handler.
127    */
128   void *error_handler_cls;
129
130   /**
131    * Linked list of messages pending to be sent
132    */
133   struct GNUNET_MQ_Envelope *envelope_head;
134
135   /**
136    * Linked list of messages pending to be sent
137    */
138   struct GNUNET_MQ_Envelope *envelope_tail;
139
140   /**
141    * Message that is currently scheduled to be
142    * sent. Not the head of the message queue, as the implementation
143    * needs to know if sending has been already scheduled or not.
144    */
145   struct GNUNET_MQ_Envelope *current_envelope;
146
147   /**
148    * Map of associations, lazily allocated
149    */
150   struct GNUNET_CONTAINER_MultiHashMap32 *assoc_map;
151
152   /**
153    * Task scheduled during #GNUNET_MQ_impl_send_continue.
154    */
155   struct GNUNET_SCHEDULER_Task *continue_task;
156
157   /**
158    * Functions to call on queue destruction; kept in a DLL.
159    */
160   struct GNUNET_MQ_DestroyNotificationHandle *dnh_head;
161
162   /**
163    * Functions to call on queue destruction; kept in a DLL.
164    */
165   struct GNUNET_MQ_DestroyNotificationHandle *dnh_tail;
166
167   /**
168    * Additional options buffer set for this queue by
169    * #GNUNET_MQ_set_options().  Default is 0.
170    */
171   const void *default_extra;
172
173   /**
174    * Flags that were set for this queue by
175    * #GNUNET_MQ_set_options().   Default is 0.
176    */
177   uint64_t default_flags;
178
179   /**
180    * Next id that should be used for the @e assoc_map,
181    * initialized lazily to a random value together with
182    * @e assoc_map
183    */
184   uint32_t assoc_id;
185
186   /**
187    * Number of entries we have in the envelope-DLL.
188    */
189   unsigned int queue_length;
190 };
191
192
193 /**
194  * Implementation-specific state for connection to
195  * client (MQ for server).
196  */
197 struct ServerClientSocketState
198 {
199   /**
200    * Handle of the client that connected to the server.
201    */
202   struct GNUNET_SERVER_Client *client;
203
204   /**
205    * Active transmission request to the client.
206    */
207   struct GNUNET_SERVER_TransmitHandle *th;
208 };
209
210
211 /**
212  * Implementation-specific state for connection to
213  * service (MQ for clients).
214  */
215 struct ClientConnectionState
216 {
217   /**
218    * Did we call receive alread alreadyy?
219    */
220   int receive_active;
221
222   /**
223    * Do we also want to receive?
224    */
225   int receive_requested;
226
227   /**
228    * Connection to the service.
229    */
230   struct GNUNET_CLIENT_Connection *connection;
231
232   /**
233    * Active transmission request (or NULL).
234    */
235   struct GNUNET_CLIENT_TransmitHandle *th;
236 };
237
238
239 /**
240  * Call the message message handler that was registered
241  * for the type of the given message in the given message queue.
242  *
243  * This function is indended to be used for the implementation
244  * of message queues.
245  *
246  * @param mq message queue with the handlers
247  * @param mh message to dispatch
248  */
249 void
250 GNUNET_MQ_inject_message (struct GNUNET_MQ_Handle *mq,
251                           const struct GNUNET_MessageHeader *mh)
252 {
253   const struct GNUNET_MQ_MessageHandler *handler;
254   int handled = GNUNET_NO;
255   uint16_t ms = ntohs (mh->size);
256
257   if (NULL == mq->handlers)
258     goto done;
259   for (handler = mq->handlers; NULL != handler->cb; handler++)
260   {
261     if (handler->type == ntohs (mh->type))
262     {
263       handled = GNUNET_YES;
264       if ( (handler->expected_size > ms) ||
265            ( (handler->expected_size != ms) &&
266              (NULL == handler->mv) ) )
267       {
268         /* Too small, or not an exact size and
269            no 'mv' handler to check rest */
270         GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
271                     "Received malformed message of type %u\n",
272                     (unsigned int) handler->type);
273         GNUNET_MQ_inject_error (mq,
274                                 GNUNET_MQ_ERROR_MALFORMED);
275         break;
276       }
277       if ( (NULL == handler->mv) ||
278            (GNUNET_OK ==
279             handler->mv (handler->cls, mh)) )
280       {
281         /* message well-formed, pass to handler */
282         handler->cb (handler->cls, mh);
283       }
284       else
285       {
286         /* Message rejected by check routine */
287         GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
288                     "Received malformed message of type %u\n",
289                     (unsigned int) handler->type);
290         GNUNET_MQ_inject_error (mq,
291                                 GNUNET_MQ_ERROR_MALFORMED);
292       }
293       break;
294     }
295   }
296  done:
297   if (GNUNET_NO == handled)
298     LOG (GNUNET_ERROR_TYPE_INFO,
299          "No handler for message of type %d and size %d\n",
300          ntohs (mh->type),
301          ntohs (mh->size));
302 }
303
304
305 /**
306  * Call the error handler of a message queue with the given
307  * error code.  If there is no error handler, log a warning.
308  *
309  * This function is intended to be used by the implementation
310  * of message queues.
311  *
312  * @param mq message queue
313  * @param error the error type
314  */
315 void
316 GNUNET_MQ_inject_error (struct GNUNET_MQ_Handle *mq,
317                         enum GNUNET_MQ_Error error)
318 {
319   if (NULL == mq->error_handler)
320   {
321     LOG (GNUNET_ERROR_TYPE_WARNING,
322          "Got error %d, but no handler installed\n",
323          (int) error);
324     return;
325   }
326   mq->error_handler (mq->error_handler_cls,
327                      error);
328 }
329
330
331 /**
332  * Discard the message queue message, free all
333  * allocated resources. Must be called in the event
334  * that a message is created but should not actually be sent.
335  *
336  * @param mqm the message to discard
337  */
338 void
339 GNUNET_MQ_discard (struct GNUNET_MQ_Envelope *mqm)
340 {
341   GNUNET_assert (NULL == mqm->parent_queue);
342   GNUNET_free (mqm);
343 }
344
345
346 /**
347  * Obtain the current length of the message queue.
348  *
349  * @param mq queue to inspect
350  * @return number of queued, non-transmitted messages
351  */
352 unsigned int
353 GNUNET_MQ_get_length (struct GNUNET_MQ_Handle *mq)
354 {
355   return mq->queue_length;
356 }
357
358
359 /**
360  * Send a message with the given message queue.
361  * May only be called once per message.
362  *
363  * @param mq message queue
364  * @param ev the envelope with the message to send.
365  */
366 void
367 GNUNET_MQ_send (struct GNUNET_MQ_Handle *mq,
368                 struct GNUNET_MQ_Envelope *ev)
369 {
370   GNUNET_assert (NULL != mq);
371   GNUNET_assert (NULL == ev->parent_queue);
372
373   mq->queue_length++;
374   ev->parent_queue = mq;
375   /* is the implementation busy? queue it! */
376   if (NULL != mq->current_envelope)
377   {
378     GNUNET_CONTAINER_DLL_insert_tail (mq->envelope_head,
379                                       mq->envelope_tail,
380                                       ev);
381     return;
382   }
383   mq->current_envelope = ev;
384   mq->send_impl (mq,
385                  ev->mh,
386                  mq->impl_state);
387 }
388
389
390 /**
391  * Send a copy of a message with the given message queue.
392  * Can be called repeatedly on the same envelope.
393  *
394  * @param mq message queue
395  * @param ev the envelope with the message to send.
396  */
397 void
398 GNUNET_MQ_send_copy (struct GNUNET_MQ_Handle *mq,
399                      const struct GNUNET_MQ_Envelope *ev)
400 {
401   struct GNUNET_MQ_Envelope *env;
402   uint16_t msize;
403
404   msize = ntohs (ev->mh->size);
405   env = GNUNET_malloc (sizeof (struct GNUNET_MQ_Envelope) +
406                        msize);
407   env->mh = (struct GNUNET_MessageHeader *) &env[1];
408   env->sent_cb = ev->sent_cb;
409   env->sent_cls = ev->sent_cls;
410   GNUNET_memcpy (&env[1],
411           ev->mh,
412           msize);
413   GNUNET_MQ_send (mq,
414                   env);
415 }
416
417
418
419 /**
420  * Task run to call the send implementation for the next queued
421  * message, if any.  Only useful for implementing message queues,
422  * results in undefined behavior if not used carefully.
423  *
424  * @param cls message queue to send the next message with
425  */
426 static void
427 impl_send_continue (void *cls)
428 {
429   struct GNUNET_MQ_Handle *mq = cls;
430   struct GNUNET_MQ_Envelope *current_envelope;
431
432   mq->continue_task = NULL;
433   /* call is only valid if we're actually currently sending
434    * a message */
435   current_envelope = mq->current_envelope;
436   GNUNET_assert (NULL != current_envelope);
437   current_envelope->parent_queue = NULL;
438   GNUNET_assert (0 < mq->queue_length);
439   mq->queue_length--;
440   if (NULL == mq->envelope_head)
441   {
442     mq->current_envelope = NULL;
443   }
444   else
445   {
446     mq->current_envelope = mq->envelope_head;
447     GNUNET_CONTAINER_DLL_remove (mq->envelope_head,
448                                  mq->envelope_tail,
449                                  mq->current_envelope);
450     mq->send_impl (mq,
451                    mq->current_envelope->mh,
452                    mq->impl_state);
453   }
454   if (NULL != current_envelope->sent_cb)
455     current_envelope->sent_cb (current_envelope->sent_cls);
456   GNUNET_free (current_envelope);
457 }
458
459
460 /**
461  * Call the send implementation for the next queued message, if any.
462  * Only useful for implementing message queues, results in undefined
463  * behavior if not used carefully.
464  *
465  * @param mq message queue to send the next message with
466  */
467 void
468 GNUNET_MQ_impl_send_continue (struct GNUNET_MQ_Handle *mq)
469 {
470   GNUNET_assert (NULL == mq->continue_task);
471   mq->continue_task = GNUNET_SCHEDULER_add_now (&impl_send_continue,
472                                                 mq);
473 }
474
475
476 /**
477  * Create a message queue for the specified handlers.
478  *
479  * @param send function the implements sending messages
480  * @param destroy function that implements destroying the queue
481  * @param cancel function that implements canceling a message
482  * @param impl_state for the queue, passed to 'send' and 'destroy'
483  * @param handlers array of message handlers
484  * @param error_handler handler for read and write errors
485  * @param error_handler_cls closure for @a error_handler
486  * @return a new message queue
487  */
488 struct GNUNET_MQ_Handle *
489 GNUNET_MQ_queue_for_callbacks (GNUNET_MQ_SendImpl send,
490                                GNUNET_MQ_DestroyImpl destroy,
491                                GNUNET_MQ_CancelImpl cancel,
492                                void *impl_state,
493                                const struct GNUNET_MQ_MessageHandler *handlers,
494                                GNUNET_MQ_ErrorHandler error_handler,
495                                void *error_handler_cls)
496 {
497   struct GNUNET_MQ_Handle *mq;
498   unsigned int i;
499
500   mq = GNUNET_new (struct GNUNET_MQ_Handle);
501   mq->send_impl = send;
502   mq->destroy_impl = destroy;
503   mq->cancel_impl = cancel;
504   if (NULL != handlers)
505   {
506     for (i=0;NULL != handlers[i].cb; i++) ;
507     mq->handlers = GNUNET_new_array (i + 1,
508                                      struct GNUNET_MQ_MessageHandler);
509     GNUNET_memcpy (mq->handlers,
510             handlers,
511             i * sizeof (struct GNUNET_MQ_MessageHandler));
512   }
513   mq->error_handler = error_handler;
514   mq->error_handler_cls = error_handler_cls;
515   mq->impl_state = impl_state;
516
517   return mq;
518 }
519
520
521 /**
522  * Change the closure argument in all of the `handlers` of the
523  * @a mq.
524  *
525  * @param mq to modify
526  * @param handlers_cls new closure to use
527  */
528 void
529 GNUNET_MQ_set_handlers_closure (struct GNUNET_MQ_Handle *mq,
530                                 void *handlers_cls)
531 {
532   unsigned int i;
533
534   if (NULL == mq->handlers)
535     return;
536   for (i=0;NULL != mq->handlers[i].cb; i++)
537     mq->handlers[i].cls = handlers_cls;
538 }
539
540
541 /**
542  * Get the message that should currently be sent.
543  * Fails if there is no current message.
544  * Only useful for implementing message queues,
545  * results in undefined behavior if not used carefully.
546  *
547  * @param mq message queue with the current message
548  * @return message to send, never NULL
549  */
550 const struct GNUNET_MessageHeader *
551 GNUNET_MQ_impl_current (struct GNUNET_MQ_Handle *mq)
552 {
553   if (NULL == mq->current_envelope)
554     GNUNET_assert (0);
555   if (NULL == mq->current_envelope->mh)
556     GNUNET_assert (0);
557   return mq->current_envelope->mh;
558 }
559
560
561 /**
562  * Get the implementation state associated with the
563  * message queue.
564  *
565  * While the GNUNET_MQ_Impl* callbacks receive the
566  * implementation state, continuations that are scheduled
567  * by the implementation function often only have one closure
568  * argument, with this function it is possible to get at the
569  * implementation state when only passing the GNUNET_MQ_Handle
570  * as closure.
571  *
572  * @param mq message queue with the current message
573  * @return message to send, never NULL
574  */
575 void *
576 GNUNET_MQ_impl_state (struct GNUNET_MQ_Handle *mq)
577 {
578   return mq->impl_state;
579 }
580
581
582 struct GNUNET_MQ_Envelope *
583 GNUNET_MQ_msg_ (struct GNUNET_MessageHeader **mhp,
584                 uint16_t size,
585                 uint16_t type)
586 {
587   struct GNUNET_MQ_Envelope *mqm;
588
589   mqm = GNUNET_malloc (sizeof *mqm + size);
590   mqm->mh = (struct GNUNET_MessageHeader *) &mqm[1];
591   mqm->mh->size = htons (size);
592   mqm->mh->type = htons (type);
593   if (NULL != mhp)
594     *mhp = mqm->mh;
595   return mqm;
596 }
597
598
599 /**
600  * Create a new envelope by copying an existing message.
601  *
602  * @param hdr header of the message to copy
603  * @return envelope containing @a hdr
604  */
605 struct GNUNET_MQ_Envelope *
606 GNUNET_MQ_msg_copy (const struct GNUNET_MessageHeader *hdr)
607 {
608   struct GNUNET_MQ_Envelope *mqm;
609   uint16_t size = ntohs (hdr->size);
610
611   mqm = GNUNET_malloc (sizeof (*mqm) + size);
612   mqm->mh = (struct GNUNET_MessageHeader *) &mqm[1];
613   GNUNET_memcpy (mqm->mh,
614           hdr,
615           size);
616   return mqm;
617 }
618
619
620 /**
621  * Implementation of the #GNUNET_MQ_msg_nested_mh macro.
622  *
623  * @param mhp pointer to the message header pointer that will be changed to allocate at
624  *        the newly allocated space for the message.
625  * @param base_size size of the data before the nested message
626  * @param type type of the message in the envelope
627  * @param nested_mh the message to append to the message after base_size
628  */
629 struct GNUNET_MQ_Envelope *
630 GNUNET_MQ_msg_nested_mh_ (struct GNUNET_MessageHeader **mhp,
631                           uint16_t base_size,
632                           uint16_t type,
633                           const struct GNUNET_MessageHeader *nested_mh)
634 {
635   struct GNUNET_MQ_Envelope *mqm;
636   uint16_t size;
637
638   if (NULL == nested_mh)
639     return GNUNET_MQ_msg_ (mhp, base_size, type);
640
641   size = base_size + ntohs (nested_mh->size);
642
643   /* check for uint16_t overflow */
644   if (size < base_size)
645     return NULL;
646
647   mqm = GNUNET_MQ_msg_ (mhp, size, type);
648   GNUNET_memcpy ((char *) mqm->mh + base_size,
649                  nested_mh,
650                  ntohs (nested_mh->size));
651
652   return mqm;
653 }
654
655
656 /**
657  * Transmit a queued message to the session's client.
658  *
659  * @param cls consensus session
660  * @param size number of bytes available in @a buf
661  * @param buf where the callee should write the message
662  * @return number of bytes written to @a buf
663  */
664 static size_t
665 transmit_queued (void *cls,
666                  size_t size,
667                  void *buf)
668 {
669   struct GNUNET_MQ_Handle *mq = cls;
670   struct ServerClientSocketState *state = GNUNET_MQ_impl_state (mq);
671   const struct GNUNET_MessageHeader *msg = GNUNET_MQ_impl_current (mq);
672   size_t msg_size;
673
674   GNUNET_assert (NULL != buf);
675   msg_size = ntohs (msg->size);
676   GNUNET_assert (size >= msg_size);
677   GNUNET_memcpy (buf, msg, msg_size);
678   state->th = NULL;
679
680   GNUNET_MQ_impl_send_continue (mq);
681
682   return msg_size;
683 }
684
685
686 static void
687 server_client_destroy_impl (struct GNUNET_MQ_Handle *mq,
688                             void *impl_state)
689 {
690   struct ServerClientSocketState *state = impl_state;
691
692   if (NULL != state->th)
693   {
694     GNUNET_SERVER_notify_transmit_ready_cancel (state->th);
695     state->th = NULL;
696   }
697
698   GNUNET_assert (NULL != mq);
699   GNUNET_assert (NULL != state);
700   GNUNET_SERVER_client_drop (state->client);
701   GNUNET_free (state);
702 }
703
704
705 static void
706 server_client_send_impl (struct GNUNET_MQ_Handle *mq,
707                          const struct GNUNET_MessageHeader *msg,
708                          void *impl_state)
709 {
710   struct ServerClientSocketState *state = impl_state;
711
712   GNUNET_assert (NULL != mq);
713   state->th = GNUNET_SERVER_notify_transmit_ready (state->client,
714                                                    ntohs (msg->size),
715                                                    GNUNET_TIME_UNIT_FOREVER_REL,
716                                                    &transmit_queued, mq);
717 }
718
719
720 struct GNUNET_MQ_Handle *
721 GNUNET_MQ_queue_for_server_client (struct GNUNET_SERVER_Client *client)
722 {
723   struct GNUNET_MQ_Handle *mq;
724   struct ServerClientSocketState *scss;
725
726   mq = GNUNET_new (struct GNUNET_MQ_Handle);
727   scss = GNUNET_new (struct ServerClientSocketState);
728   mq->impl_state = scss;
729   scss->client = client;
730   GNUNET_SERVER_client_keep (client);
731   mq->send_impl = &server_client_send_impl;
732   mq->destroy_impl = &server_client_destroy_impl;
733   return mq;
734 }
735
736
737 /**
738  * Type of a function to call when we receive a message
739  * from the service.
740  *
741  * @param cls closure
742  * @param msg message received, NULL on timeout or fatal error
743  */
744 static void
745 handle_client_message (void *cls,
746                        const struct GNUNET_MessageHeader *msg)
747 {
748   struct GNUNET_MQ_Handle *mq = cls;
749   struct ClientConnectionState *state;
750
751   state = mq->impl_state;
752   if (NULL == msg)
753   {
754     GNUNET_MQ_inject_error (mq, GNUNET_MQ_ERROR_READ);
755     return;
756   }
757   GNUNET_CLIENT_receive (state->connection,
758                          &handle_client_message,
759                          mq,
760                          GNUNET_TIME_UNIT_FOREVER_REL);
761   GNUNET_MQ_inject_message (mq, msg);
762 }
763
764
765 /**
766  * Transmit a queued message to the session's client.
767  *
768  * @param cls consensus session
769  * @param size number of bytes available in @a buf
770  * @param buf where the callee should write the message
771  * @return number of bytes written to buf
772  */
773 static size_t
774 connection_client_transmit_queued (void *cls,
775                                    size_t size,
776                                    void *buf)
777 {
778   struct GNUNET_MQ_Handle *mq = cls;
779   const struct GNUNET_MessageHeader *msg;
780   struct ClientConnectionState *state = mq->impl_state;
781   size_t msg_size;
782
783   GNUNET_assert (NULL != mq);
784   state->th = NULL;
785   msg = GNUNET_MQ_impl_current (mq);
786
787   if (NULL == buf)
788   {
789     GNUNET_MQ_inject_error (mq, GNUNET_MQ_ERROR_READ);
790     return 0;
791   }
792
793   if ( (GNUNET_YES == state->receive_requested) &&
794        (GNUNET_NO == state->receive_active) )
795   {
796     state->receive_active = GNUNET_YES;
797     GNUNET_CLIENT_receive (state->connection,
798                            &handle_client_message,
799                            mq,
800                            GNUNET_TIME_UNIT_FOREVER_REL);
801   }
802
803   msg_size = ntohs (msg->size);
804   GNUNET_assert (size >= msg_size);
805   GNUNET_memcpy (buf, msg, msg_size);
806   state->th = NULL;
807
808   GNUNET_MQ_impl_send_continue (mq);
809
810   return msg_size;
811 }
812
813
814 static void
815 connection_client_destroy_impl (struct GNUNET_MQ_Handle *mq,
816                                 void *impl_state)
817 {
818   struct ClientConnectionState *state = impl_state;
819
820   if (NULL != state->th)
821   {
822     GNUNET_CLIENT_notify_transmit_ready_cancel (state->th);
823     state->th = NULL;
824   }
825   GNUNET_CLIENT_disconnect (state->connection);
826   GNUNET_free (impl_state);
827 }
828
829
830 static void
831 connection_client_send_impl (struct GNUNET_MQ_Handle *mq,
832                              const struct GNUNET_MessageHeader *msg,
833                              void *impl_state)
834 {
835   struct ClientConnectionState *state = impl_state;
836
837   GNUNET_assert (NULL != state);
838   GNUNET_assert (NULL == state->th);
839   state->th =
840       GNUNET_CLIENT_notify_transmit_ready (state->connection,
841                                            ntohs (msg->size),
842                                            GNUNET_TIME_UNIT_FOREVER_REL,
843                                            GNUNET_NO,
844                                            &connection_client_transmit_queued,
845                                            mq);
846   GNUNET_assert (NULL != state->th);
847 }
848
849
850 static void
851 connection_client_cancel_impl (struct GNUNET_MQ_Handle *mq,
852                                void *impl_state)
853 {
854   struct ClientConnectionState *state = impl_state;
855
856   if (NULL != state->th)
857   {
858     GNUNET_CLIENT_notify_transmit_ready_cancel (state->th);
859     state->th = NULL;
860   }
861   else if (NULL != mq->continue_task)
862   {
863     GNUNET_SCHEDULER_cancel (mq->continue_task);
864     mq->continue_task = NULL;
865   }
866   else
867     GNUNET_assert (0);
868 }
869
870
871 struct GNUNET_MQ_Handle *
872 GNUNET_MQ_queue_for_connection_client (struct GNUNET_CLIENT_Connection *connection,
873                                        const struct GNUNET_MQ_MessageHandler *handlers,
874                                        GNUNET_MQ_ErrorHandler error_handler,
875                                        void *error_handler_cls)
876 {
877   struct GNUNET_MQ_Handle *mq;
878   struct ClientConnectionState *state;
879   unsigned int i;
880
881   mq = GNUNET_new (struct GNUNET_MQ_Handle);
882   if (NULL != handlers)
883   {
884     for (i=0;NULL != handlers[i].cb; i++) ;
885     mq->handlers = GNUNET_new_array (i + 1,
886                                      struct GNUNET_MQ_MessageHandler);
887     GNUNET_memcpy (mq->handlers,
888                    handlers,
889                    i * sizeof (struct GNUNET_MQ_MessageHandler));
890   }
891   mq->error_handler = error_handler;
892   mq->error_handler_cls = error_handler_cls;
893   state = GNUNET_new (struct ClientConnectionState);
894   state->connection = connection;
895   mq->impl_state = state;
896   mq->send_impl = &connection_client_send_impl;
897   mq->destroy_impl = &connection_client_destroy_impl;
898   mq->cancel_impl = &connection_client_cancel_impl;
899   if (NULL != handlers)
900     state->receive_requested = GNUNET_YES;
901
902   return mq;
903 }
904
905
906 /**
907  * Associate the assoc_data in mq with a unique request id.
908  *
909  * @param mq message queue, id will be unique for the queue
910  * @param assoc_data to associate
911  */
912 uint32_t
913 GNUNET_MQ_assoc_add (struct GNUNET_MQ_Handle *mq,
914                      void *assoc_data)
915 {
916   uint32_t id;
917
918   if (NULL == mq->assoc_map)
919   {
920     mq->assoc_map = GNUNET_CONTAINER_multihashmap32_create (8);
921     mq->assoc_id = 1;
922   }
923   id = mq->assoc_id++;
924   GNUNET_CONTAINER_multihashmap32_put (mq->assoc_map, id, assoc_data,
925                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
926   return id;
927 }
928
929
930 void *
931 GNUNET_MQ_assoc_get (struct GNUNET_MQ_Handle *mq,
932                      uint32_t request_id)
933 {
934   if (NULL == mq->assoc_map)
935     return NULL;
936   return GNUNET_CONTAINER_multihashmap32_get (mq->assoc_map, request_id);
937 }
938
939
940 void *
941 GNUNET_MQ_assoc_remove (struct GNUNET_MQ_Handle *mq,
942                         uint32_t request_id)
943 {
944   void *val;
945
946   if (NULL == mq->assoc_map)
947     return NULL;
948   val = GNUNET_CONTAINER_multihashmap32_get (mq->assoc_map,
949                                              request_id);
950   GNUNET_CONTAINER_multihashmap32_remove_all (mq->assoc_map,
951                                               request_id);
952   return val;
953 }
954
955
956 /**
957  * Call a callback once the envelope has been sent, that is,
958  * sending it can not be canceled anymore.
959  * There can be only one notify sent callback per envelope.
960  *
961  * @param ev message to call the notify callback for
962  * @param cb the notify callback
963  * @param cb_cls closure for the callback
964  */
965 void
966 GNUNET_MQ_notify_sent (struct GNUNET_MQ_Envelope *mqm,
967                        GNUNET_MQ_NotifyCallback cb,
968                        void *cb_cls)
969 {
970   mqm->sent_cb = cb;
971   mqm->sent_cls = cb_cls;
972 }
973
974
975 /**
976  * Handle we return for callbacks registered to be
977  * notified when #GNUNET_MQ_destroy() is called on a queue.
978  */
979 struct GNUNET_MQ_DestroyNotificationHandle
980 {
981   /**
982    * Kept in a DLL.
983    */
984   struct GNUNET_MQ_DestroyNotificationHandle *prev;
985
986   /**
987    * Kept in a DLL.
988    */
989   struct GNUNET_MQ_DestroyNotificationHandle *next;
990
991   /**
992    * Queue to notify about.
993    */
994   struct GNUNET_MQ_Handle *mq;
995
996   /**
997    * Function to call.
998    */
999   GNUNET_SCHEDULER_TaskCallback cb;
1000
1001   /**
1002    * Closure for @e cb.
1003    */
1004   void *cb_cls;
1005 };
1006
1007
1008 /**
1009  * Destroy the message queue.
1010  *
1011  * @param mq message queue to destroy
1012  */
1013 void
1014 GNUNET_MQ_destroy (struct GNUNET_MQ_Handle *mq)
1015 {
1016   struct GNUNET_MQ_DestroyNotificationHandle *dnh;
1017
1018   if (NULL != mq->destroy_impl)
1019   {
1020     mq->destroy_impl (mq, mq->impl_state);
1021   }
1022   if (NULL != mq->continue_task)
1023   {
1024     GNUNET_SCHEDULER_cancel (mq->continue_task);
1025     mq->continue_task = NULL;
1026   }
1027   while (NULL != mq->envelope_head)
1028   {
1029     struct GNUNET_MQ_Envelope *ev;
1030
1031     ev = mq->envelope_head;
1032     ev->parent_queue = NULL;
1033     GNUNET_CONTAINER_DLL_remove (mq->envelope_head,
1034                                  mq->envelope_tail,
1035                                  ev);
1036     GNUNET_assert (0 < mq->queue_length);
1037     mq->queue_length--;
1038     GNUNET_MQ_discard (ev);
1039   }
1040   if (NULL != mq->current_envelope)
1041   {
1042     /* we can only discard envelopes that
1043      * are not queued! */
1044     mq->current_envelope->parent_queue = NULL;
1045     GNUNET_MQ_discard (mq->current_envelope);
1046     mq->current_envelope = NULL;
1047     GNUNET_assert (0 < mq->queue_length);
1048     mq->queue_length--;
1049   }
1050   GNUNET_assert (0 == mq->queue_length);
1051   while (NULL != (dnh = mq->dnh_head))
1052   {
1053     dnh->cb (dnh->cb_cls);
1054     GNUNET_MQ_destroy_notify_cancel (dnh);
1055   }
1056   if (NULL != mq->assoc_map)
1057   {
1058     GNUNET_CONTAINER_multihashmap32_destroy (mq->assoc_map);
1059     mq->assoc_map = NULL;
1060   }
1061   GNUNET_free_non_null (mq->handlers);
1062   GNUNET_free (mq);
1063 }
1064
1065
1066 const struct GNUNET_MessageHeader *
1067 GNUNET_MQ_extract_nested_mh_ (const struct GNUNET_MessageHeader *mh,
1068                               uint16_t base_size)
1069 {
1070   uint16_t whole_size;
1071   uint16_t nested_size;
1072   const struct GNUNET_MessageHeader *nested_msg;
1073
1074   whole_size = ntohs (mh->size);
1075   GNUNET_assert (whole_size >= base_size);
1076   nested_size = whole_size - base_size;
1077   if (0 == nested_size)
1078     return NULL;
1079   if (nested_size < sizeof (struct GNUNET_MessageHeader))
1080   {
1081     GNUNET_break_op (0);
1082     return NULL;
1083   }
1084   nested_msg = (const struct GNUNET_MessageHeader *) ((char *) mh + base_size);
1085   if (ntohs (nested_msg->size) != nested_size)
1086   {
1087     GNUNET_break_op (0);
1088     return NULL;
1089   }
1090   return nested_msg;
1091 }
1092
1093
1094 /**
1095  * Cancel sending the message. Message must have been sent with
1096  * #GNUNET_MQ_send before.  May not be called after the notify sent
1097  * callback has been called
1098  *
1099  * @param ev queued envelope to cancel
1100  */
1101 void
1102 GNUNET_MQ_send_cancel (struct GNUNET_MQ_Envelope *ev)
1103 {
1104   struct GNUNET_MQ_Handle *mq = ev->parent_queue;
1105
1106   GNUNET_assert (NULL != mq);
1107   GNUNET_assert (NULL != mq->cancel_impl);
1108
1109   if (mq->current_envelope == ev)
1110   {
1111     // complex case, we already started with transmitting
1112     // the message
1113     GNUNET_assert (0 < mq->queue_length);
1114     mq->queue_length--;
1115     mq->cancel_impl (mq,
1116                      mq->impl_state);
1117     // continue sending the next message, if any
1118     if (NULL == mq->envelope_head)
1119     {
1120       mq->current_envelope = NULL;
1121     }
1122     else
1123     {
1124       mq->current_envelope = mq->envelope_head;
1125       GNUNET_CONTAINER_DLL_remove (mq->envelope_head,
1126                                    mq->envelope_tail,
1127                                    mq->current_envelope);
1128       mq->send_impl (mq,
1129                      mq->current_envelope->mh,
1130                      mq->impl_state);
1131     }
1132   }
1133   else
1134   {
1135     // simple case, message is still waiting in the queue
1136     GNUNET_CONTAINER_DLL_remove (mq->envelope_head,
1137                                  mq->envelope_tail,
1138                                  ev);
1139     GNUNET_assert (0 < mq->queue_length);
1140     mq->queue_length--;
1141   }
1142
1143   ev->parent_queue = NULL;
1144   ev->mh = NULL;
1145   GNUNET_free (ev);
1146 }
1147
1148
1149 /**
1150  * Function to obtain the current envelope
1151  * from within #GNUNET_MQ_SendImpl implementations.
1152  *
1153  * @param mq message queue to interrogate
1154  * @return the current envelope
1155  */
1156 struct GNUNET_MQ_Envelope *
1157 GNUNET_MQ_get_current_envelope (struct GNUNET_MQ_Handle *mq)
1158 {
1159   return mq->current_envelope;
1160 }
1161
1162
1163 /**
1164  * Function to obtain the last envelope in the queue.
1165  *
1166  * @param mq message queue to interrogate
1167  * @return the last envelope in the queue
1168  */
1169 struct GNUNET_MQ_Envelope *
1170 GNUNET_MQ_get_last_envelope (struct GNUNET_MQ_Handle *mq)
1171 {
1172   if (NULL != mq->envelope_tail)
1173     return mq->envelope_tail;
1174
1175   return mq->current_envelope;
1176 }
1177
1178
1179 /**
1180  * Set application-specific options for this envelope.
1181  * Overrides the options set for the queue with
1182  * #GNUNET_MQ_set_options() for this message only.
1183  *
1184  * @param env message to set options for
1185  * @param flags flags to use (meaning is queue-specific)
1186  * @param extra additional buffer for further data (also queue-specific)
1187  */
1188 void
1189 GNUNET_MQ_env_set_options (struct GNUNET_MQ_Envelope *env,
1190                            uint64_t flags,
1191                            const void *extra)
1192 {
1193   env->flags = flags;
1194   env->extra = extra;
1195   env->have_custom_options = GNUNET_YES;
1196 }
1197
1198
1199 /**
1200  * Get application-specific options for this envelope.
1201  *
1202  * @param env message to set options for
1203  * @param[out] flags set to flags to use (meaning is queue-specific)
1204  * @return extra additional buffer for further data (also queue-specific)
1205  */
1206 const void *
1207 GNUNET_MQ_env_get_options (struct GNUNET_MQ_Envelope *env,
1208                            uint64_t *flags)
1209 {
1210   struct GNUNET_MQ_Handle *mq = env->parent_queue;
1211
1212   if (GNUNET_YES == env->have_custom_options)
1213   {
1214     *flags = env->flags;
1215     return env->extra;
1216   }
1217   if (NULL == mq)
1218   {
1219     *flags = 0;
1220     return NULL;
1221   }
1222   *flags = mq->default_flags;
1223   return mq->default_extra;
1224 }
1225
1226
1227 /**
1228  * Set application-specific options for this queue.
1229  *
1230  * @param mq message queue to set options for
1231  * @param flags flags to use (meaning is queue-specific)
1232  * @param extra additional buffer for further data (also queue-specific)
1233  */
1234 void
1235 GNUNET_MQ_set_options (struct GNUNET_MQ_Handle *mq,
1236                        uint64_t flags,
1237                        const void *extra)
1238 {
1239   mq->default_flags = flags;
1240   mq->default_extra = extra;
1241 }
1242
1243
1244 /**
1245  * Register function to be called whenever @a mq is being
1246  * destroyed.
1247  *
1248  * @param mq message queue to watch
1249  * @param cb function to call on @a mq destruction
1250  * @param cb_cls closure for @a cb
1251  * @return handle for #GNUNET_MQ_destroy_notify_cancel().
1252  */
1253 struct GNUNET_MQ_DestroyNotificationHandle *
1254 GNUNET_MQ_destroy_notify (struct GNUNET_MQ_Handle *mq,
1255                           GNUNET_SCHEDULER_TaskCallback cb,
1256                           void *cb_cls)
1257 {
1258   struct GNUNET_MQ_DestroyNotificationHandle *dnh;
1259
1260   dnh = GNUNET_new (struct GNUNET_MQ_DestroyNotificationHandle);
1261   dnh->mq = mq;
1262   dnh->cb = cb;
1263   dnh->cb_cls = cb_cls;
1264   GNUNET_CONTAINER_DLL_insert (mq->dnh_head,
1265                                mq->dnh_tail,
1266                                dnh);
1267   return dnh;
1268 }
1269
1270
1271 /**
1272  * Cancel registration from #GNUNET_MQ_destroy_notify().
1273  *
1274  * @param dnh handle for registration to cancel
1275  */
1276 void
1277 GNUNET_MQ_destroy_notify_cancel (struct GNUNET_MQ_DestroyNotificationHandle *dnh)
1278 {
1279   struct GNUNET_MQ_Handle *mq = dnh->mq;
1280
1281   GNUNET_CONTAINER_DLL_remove (mq->dnh_head,
1282                                mq->dnh_tail,
1283                                dnh);
1284   GNUNET_free (dnh);
1285 }
1286
1287
1288 /* end of mq.c */