95bcd71cb42d6efc07e127f9098024ef3358f4d0
[oweals/gnunet.git] / src / util / mq.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2012-2014 GNUnet e.V.
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18      Boston, MA 02110-1301, USA.
19 */
20
21 /**
22  * @author Florian Dold
23  * @file util/mq.c
24  * @brief general purpose request queue
25  */
26 #include "platform.h"
27 #include "gnunet_util_lib.h"
28
29 #define LOG(kind,...) GNUNET_log_from (kind, "mq",__VA_ARGS__)
30
31
32 struct GNUNET_MQ_Envelope
33 {
34   /**
35    * Messages are stored in a linked list.
36    * Each queue has its own list of envelopes.
37    */
38   struct GNUNET_MQ_Envelope *next;
39
40   /**
41    * Messages are stored in a linked list
42    * Each queue has its own list of envelopes.
43    */
44   struct GNUNET_MQ_Envelope *prev;
45
46   /**
47    * Actual allocated message header.
48    * The GNUNET_MQ_Envelope header is allocated at
49    * the end of the message.
50    */
51   struct GNUNET_MessageHeader *mh;
52
53   /**
54    * Queue the message is queued in, NULL if message is not queued.
55    */
56   struct GNUNET_MQ_Handle *parent_queue;
57
58   /**
59    * Called after the message was sent irrevocably.
60    */
61   GNUNET_SCHEDULER_TaskCallback sent_cb;
62
63   /**
64    * Closure for @e send_cb
65    */
66   void *sent_cls;
67
68   /**
69    * Flags that were set for this envelope by
70    * #GNUNET_MQ_env_set_options().   Only valid if
71    * @e have_custom_options is set.
72    */
73   uint64_t flags;
74
75   /**
76    * Additional options buffer set for this envelope by
77    * #GNUNET_MQ_env_set_options().  Only valid if
78    * @e have_custom_options is set.
79    */
80   const void *extra;
81
82   /**
83    * Did the application call #GNUNET_MQ_env_set_options()?
84    */
85   int have_custom_options;
86 };
87
88
89 /**
90  * Handle to a message queue.
91  */
92 struct GNUNET_MQ_Handle
93 {
94   /**
95    * Handlers array, or NULL if the queue should not receive messages
96    */
97   struct GNUNET_MQ_MessageHandler *handlers;
98
99   /**
100    * Actual implementation of message sending,
101    * called when a message is added
102    */
103   GNUNET_MQ_SendImpl send_impl;
104
105   /**
106    * Implementation-dependent queue destruction function
107    */
108   GNUNET_MQ_DestroyImpl destroy_impl;
109
110   /**
111    * Implementation-dependent send cancel function
112    */
113   GNUNET_MQ_CancelImpl cancel_impl;
114
115   /**
116    * Implementation-specific state
117    */
118   void *impl_state;
119
120   /**
121    * Callback will be called when an error occurs.
122    */
123   GNUNET_MQ_ErrorHandler error_handler;
124
125   /**
126    * Closure for the error handler.
127    */
128   void *error_handler_cls;
129
130   /**
131    * Task to asynchronously run #impl_send_continue().
132    */
133   struct GNUNET_SCHEDULER_Task *send_task;
134
135   /**
136    * Linked list of messages pending to be sent
137    */
138   struct GNUNET_MQ_Envelope *envelope_head;
139
140   /**
141    * Linked list of messages pending to be sent
142    */
143   struct GNUNET_MQ_Envelope *envelope_tail;
144
145   /**
146    * Message that is currently scheduled to be
147    * sent. Not the head of the message queue, as the implementation
148    * needs to know if sending has been already scheduled or not.
149    */
150   struct GNUNET_MQ_Envelope *current_envelope;
151
152   /**
153    * Map of associations, lazily allocated
154    */
155   struct GNUNET_CONTAINER_MultiHashMap32 *assoc_map;
156
157   /**
158    * Functions to call on queue destruction; kept in a DLL.
159    */
160   struct GNUNET_MQ_DestroyNotificationHandle *dnh_head;
161
162   /**
163    * Functions to call on queue destruction; kept in a DLL.
164    */
165   struct GNUNET_MQ_DestroyNotificationHandle *dnh_tail;
166
167   /**
168    * Additional options buffer set for this queue by
169    * #GNUNET_MQ_set_options().  Default is 0.
170    */
171   const void *default_extra;
172
173   /**
174    * Flags that were set for this queue by
175    * #GNUNET_MQ_set_options().   Default is 0.
176    */
177   uint64_t default_flags;
178
179   /**
180    * Next id that should be used for the @e assoc_map,
181    * initialized lazily to a random value together with
182    * @e assoc_map
183    */
184   uint32_t assoc_id;
185
186   /**
187    * Number of entries we have in the envelope-DLL.
188    */
189   unsigned int queue_length;
190
191   /**
192    * #GNUNET_YES if GNUNET_MQ_impl_evacuate was called.
193    * FIXME: is this dead?
194    */
195   int evacuate_called;
196
197   /**
198    * #GNUNET_YES if GNUNET_MQ_impl_send_in_flight() was called.
199    */
200   int in_flight;
201 };
202
203
204 /**
205  * Implementation-specific state for connection to
206  * client (MQ for server).
207  */
208 struct ServerClientSocketState
209 {
210   /**
211    * Handle of the client that connected to the server.
212    */
213   struct GNUNET_SERVER_Client *client;
214
215   /**
216    * Active transmission request to the client.
217    */
218   struct GNUNET_SERVER_TransmitHandle *th;
219 };
220
221
222 /**
223  * Call the message message handler that was registered
224  * for the type of the given message in the given message queue.
225  *
226  * This function is indended to be used for the implementation
227  * of message queues.
228  *
229  * @param mq message queue with the handlers
230  * @param mh message to dispatch
231  */
232 void
233 GNUNET_MQ_inject_message (struct GNUNET_MQ_Handle *mq,
234                           const struct GNUNET_MessageHeader *mh)
235 {
236   const struct GNUNET_MQ_MessageHandler *handler;
237   int handled = GNUNET_NO;
238   uint16_t ms = ntohs (mh->size);
239
240   if (NULL == mq->handlers)
241     goto done;
242   for (handler = mq->handlers; NULL != handler->cb; handler++)
243   {
244     if (handler->type == ntohs (mh->type))
245     {
246       handled = GNUNET_YES;
247       if ( (handler->expected_size > ms) ||
248            ( (handler->expected_size != ms) &&
249              (NULL == handler->mv) ) )
250       {
251         /* Too small, or not an exact size and
252            no 'mv' handler to check rest */
253         GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
254                     "Received malformed message of type %u\n",
255                     (unsigned int) handler->type);
256         GNUNET_MQ_inject_error (mq,
257                                 GNUNET_MQ_ERROR_MALFORMED);
258         break;
259       }
260       if ( (NULL == handler->mv) ||
261            (GNUNET_OK ==
262             handler->mv (handler->cls, mh)) )
263       {
264         /* message well-formed, pass to handler */
265         handler->cb (handler->cls, mh);
266       }
267       else
268       {
269         /* Message rejected by check routine */
270         GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
271                     "Received malformed message of type %u\n",
272                     (unsigned int) handler->type);
273         GNUNET_MQ_inject_error (mq,
274                                 GNUNET_MQ_ERROR_MALFORMED);
275       }
276       break;
277     }
278   }
279  done:
280   if (GNUNET_NO == handled)
281     LOG (GNUNET_ERROR_TYPE_INFO,
282          "No handler for message of type %d and size %d\n",
283          ntohs (mh->type),
284          ntohs (mh->size));
285 }
286
287
288 /**
289  * Call the error handler of a message queue with the given
290  * error code.  If there is no error handler, log a warning.
291  *
292  * This function is intended to be used by the implementation
293  * of message queues.
294  *
295  * @param mq message queue
296  * @param error the error type
297  */
298 void
299 GNUNET_MQ_inject_error (struct GNUNET_MQ_Handle *mq,
300                         enum GNUNET_MQ_Error error)
301 {
302   if (NULL == mq->error_handler)
303   {
304     LOG (GNUNET_ERROR_TYPE_WARNING,
305          "Got error %d, but no handler installed\n",
306          (int) error);
307     return;
308   }
309   mq->error_handler (mq->error_handler_cls,
310                      error);
311 }
312
313
314 /**
315  * Discard the message queue message, free all
316  * allocated resources. Must be called in the event
317  * that a message is created but should not actually be sent.
318  *
319  * @param mqm the message to discard
320  */
321 void
322 GNUNET_MQ_discard (struct GNUNET_MQ_Envelope *ev)
323 {
324   GNUNET_assert (NULL == ev->parent_queue);
325   GNUNET_free (ev);
326 }
327
328
329 /**
330  * Obtain the current length of the message queue.
331  *
332  * @param mq queue to inspect
333  * @return number of queued, non-transmitted messages
334  */
335 unsigned int
336 GNUNET_MQ_get_length (struct GNUNET_MQ_Handle *mq)
337 {
338   if (GNUNET_YES != mq->in_flight)
339   {
340     return mq->queue_length;
341   }
342   return mq->queue_length - 1;
343 }
344
345
346 /**
347  * Send a message with the given message queue.
348  * May only be called once per message.
349  *
350  * @param mq message queue
351  * @param ev the envelope with the message to send.
352  */
353 void
354 GNUNET_MQ_send (struct GNUNET_MQ_Handle *mq,
355                 struct GNUNET_MQ_Envelope *ev)
356 {
357   GNUNET_assert (NULL != mq);
358   GNUNET_assert (NULL == ev->parent_queue);
359
360   mq->queue_length++;
361   ev->parent_queue = mq;
362   /* is the implementation busy? queue it! */
363   if ( (NULL != mq->current_envelope) ||
364        (NULL != mq->send_task) )
365   {
366     GNUNET_CONTAINER_DLL_insert_tail (mq->envelope_head,
367                                       mq->envelope_tail,
368                                       ev);
369     return;
370   }
371   GNUNET_assert (NULL == mq->envelope_head);
372   mq->current_envelope = ev;
373   mq->send_impl (mq,
374                  ev->mh,
375                  mq->impl_state);
376 }
377
378
379 /**
380  * Remove the first envelope that has not yet been sent from the message
381  * queue and return it.
382  *
383  * @param mq queue to remove envelope from
384  * @return NULL if queue is empty (or has no envelope that is not under transmission)
385  */
386 struct GNUNET_MQ_Envelope *
387 GNUNET_MQ_unsent_head (struct GNUNET_MQ_Handle *mq)
388 {
389   struct GNUNET_MQ_Envelope *env;
390
391   env = mq->envelope_head;
392   GNUNET_CONTAINER_DLL_remove (mq->envelope_head,
393                                mq->envelope_tail,
394                                env);
395   mq->queue_length--;
396   env->parent_queue = NULL;
397   return env;
398 }
399
400
401 /**
402  * Function to copy an envelope.  The envelope must not yet
403  * be in any queue or have any options or callbacks set.
404  *
405  * @param env envelope to copy
406  * @return copy of @a env
407  */
408 struct GNUNET_MQ_Envelope *
409 GNUNET_MQ_env_copy (struct GNUNET_MQ_Envelope *env)
410 {
411   GNUNET_assert (NULL == env->next);
412   GNUNET_assert (NULL == env->parent_queue);
413   GNUNET_assert (NULL == env->sent_cb);
414   GNUNET_assert (GNUNET_NO == env->have_custom_options);
415   return GNUNET_MQ_msg_copy (env->mh);
416 }
417
418
419 /**
420  * Send a copy of a message with the given message queue.
421  * Can be called repeatedly on the same envelope.
422  *
423  * @param mq message queue
424  * @param ev the envelope with the message to send.
425  */
426 void
427 GNUNET_MQ_send_copy (struct GNUNET_MQ_Handle *mq,
428                      const struct GNUNET_MQ_Envelope *ev)
429 {
430   struct GNUNET_MQ_Envelope *env;
431   uint16_t msize;
432
433   msize = ntohs (ev->mh->size);
434   env = GNUNET_malloc (sizeof (struct GNUNET_MQ_Envelope) +
435                        msize);
436   env->mh = (struct GNUNET_MessageHeader *) &env[1];
437   env->sent_cb = ev->sent_cb;
438   env->sent_cls = ev->sent_cls;
439   GNUNET_memcpy (&env[1],
440           ev->mh,
441           msize);
442   GNUNET_MQ_send (mq,
443                   env);
444 }
445
446
447 /**
448  * Task run to call the send implementation for the next queued
449  * message, if any.  Only useful for implementing message queues,
450  * results in undefined behavior if not used carefully.
451  *
452  * @param cls message queue to send the next message with
453  */
454 static void
455 impl_send_continue (void *cls)
456 {
457   struct GNUNET_MQ_Handle *mq = cls;
458
459   mq->send_task = NULL;
460   /* call is only valid if we're actually currently sending
461    * a message */
462   if (NULL == mq->envelope_head)
463     return;
464   mq->current_envelope = mq->envelope_head;
465   GNUNET_CONTAINER_DLL_remove (mq->envelope_head,
466                                mq->envelope_tail,
467                                mq->current_envelope);
468   mq->send_impl (mq,
469                  mq->current_envelope->mh,
470                  mq->impl_state);
471 }
472
473
474 /**
475  * Call the send implementation for the next queued message, if any.
476  * Only useful for implementing message queues, results in undefined
477  * behavior if not used carefully.
478  *
479  * @param mq message queue to send the next message with
480  */
481 void
482 GNUNET_MQ_impl_send_continue (struct GNUNET_MQ_Handle *mq)
483 {
484   struct GNUNET_MQ_Envelope *current_envelope;
485   GNUNET_SCHEDULER_TaskCallback cb;
486
487   GNUNET_assert (0 < mq->queue_length);
488   mq->queue_length--;
489   mq->in_flight = GNUNET_NO;
490   current_envelope = mq->current_envelope;
491   current_envelope->parent_queue = NULL;
492   mq->current_envelope = NULL;
493   GNUNET_assert (NULL == mq->send_task);
494   mq->send_task = GNUNET_SCHEDULER_add_now (&impl_send_continue,
495                                             mq);
496   if (NULL != (cb = current_envelope->sent_cb))
497   {
498     current_envelope->sent_cb = NULL;
499     cb (current_envelope->sent_cls);
500   }
501   GNUNET_free (current_envelope);
502 }
503
504
505 /**
506  * Call the send notification for the current message, but do not
507  * try to send the next message until #GNUNET_MQ_impl_send_continue
508  * is called.
509  *
510  * Only useful for implementing message queues, results in undefined
511  * behavior if not used carefully.
512  *
513  * @param mq message queue to send the next message with
514  */
515 void
516 GNUNET_MQ_impl_send_in_flight (struct GNUNET_MQ_Handle *mq)
517 {
518   struct GNUNET_MQ_Envelope *current_envelope;
519   GNUNET_SCHEDULER_TaskCallback cb;
520
521   mq->in_flight = GNUNET_YES;
522   /* call is only valid if we're actually currently sending
523    * a message */
524   current_envelope = mq->current_envelope;
525   GNUNET_assert (NULL != current_envelope);
526   /* can't call cancel from now on anymore */
527   current_envelope->parent_queue = NULL;
528   if (NULL != (cb = current_envelope->sent_cb))
529   {
530     current_envelope->sent_cb = NULL;
531     cb (current_envelope->sent_cls);
532   }
533 }
534
535
536 /**
537  * Create a message queue for the specified handlers.
538  *
539  * @param send function the implements sending messages
540  * @param destroy function that implements destroying the queue
541  * @param cancel function that implements canceling a message
542  * @param impl_state for the queue, passed to 'send' and 'destroy'
543  * @param handlers array of message handlers
544  * @param error_handler handler for read and write errors
545  * @param error_handler_cls closure for @a error_handler
546  * @return a new message queue
547  */
548 struct GNUNET_MQ_Handle *
549 GNUNET_MQ_queue_for_callbacks (GNUNET_MQ_SendImpl send,
550                                GNUNET_MQ_DestroyImpl destroy,
551                                GNUNET_MQ_CancelImpl cancel,
552                                void *impl_state,
553                                const struct GNUNET_MQ_MessageHandler *handlers,
554                                GNUNET_MQ_ErrorHandler error_handler,
555                                void *error_handler_cls)
556 {
557   struct GNUNET_MQ_Handle *mq;
558
559   mq = GNUNET_new (struct GNUNET_MQ_Handle);
560   mq->send_impl = send;
561   mq->destroy_impl = destroy;
562   mq->cancel_impl = cancel;
563   mq->handlers = GNUNET_MQ_copy_handlers (handlers);
564   mq->error_handler = error_handler;
565   mq->error_handler_cls = error_handler_cls;
566   mq->impl_state = impl_state;
567
568   return mq;
569 }
570
571
572 /**
573  * Change the closure argument in all of the `handlers` of the
574  * @a mq.
575  *
576  * @param mq to modify
577  * @param handlers_cls new closure to use
578  */
579 void
580 GNUNET_MQ_set_handlers_closure (struct GNUNET_MQ_Handle *mq,
581                                 void *handlers_cls)
582 {
583   unsigned int i;
584
585   if (NULL == mq->handlers)
586     return;
587   for (i=0;NULL != mq->handlers[i].cb; i++)
588     mq->handlers[i].cls = handlers_cls;
589 }
590
591
592 /**
593  * Get the message that should currently be sent.
594  * Fails if there is no current message.
595  * Only useful for implementing message queues,
596  * results in undefined behavior if not used carefully.
597  *
598  * @param mq message queue with the current message
599  * @return message to send, never NULL
600  */
601 const struct GNUNET_MessageHeader *
602 GNUNET_MQ_impl_current (struct GNUNET_MQ_Handle *mq)
603 {
604   GNUNET_assert (NULL != mq->current_envelope);
605   GNUNET_assert (NULL != mq->current_envelope->mh);
606   return mq->current_envelope->mh;
607 }
608
609
610 /**
611  * Get the implementation state associated with the
612  * message queue.
613  *
614  * While the GNUNET_MQ_Impl* callbacks receive the
615  * implementation state, continuations that are scheduled
616  * by the implementation function often only have one closure
617  * argument, with this function it is possible to get at the
618  * implementation state when only passing the GNUNET_MQ_Handle
619  * as closure.
620  *
621  * @param mq message queue with the current message
622  * @return message to send, never NULL
623  */
624 void *
625 GNUNET_MQ_impl_state (struct GNUNET_MQ_Handle *mq)
626 {
627   return mq->impl_state;
628 }
629
630
631 struct GNUNET_MQ_Envelope *
632 GNUNET_MQ_msg_ (struct GNUNET_MessageHeader **mhp,
633                 uint16_t size,
634                 uint16_t type)
635 {
636   struct GNUNET_MQ_Envelope *ev;
637
638   ev = GNUNET_malloc (size + sizeof (struct GNUNET_MQ_Envelope));
639   ev->mh = (struct GNUNET_MessageHeader *) &ev[1];
640   ev->mh->size = htons (size);
641   ev->mh->type = htons (type);
642   if (NULL != mhp)
643     *mhp = ev->mh;
644   return ev;
645 }
646
647
648 /**
649  * Create a new envelope by copying an existing message.
650  *
651  * @param hdr header of the message to copy
652  * @return envelope containing @a hdr
653  */
654 struct GNUNET_MQ_Envelope *
655 GNUNET_MQ_msg_copy (const struct GNUNET_MessageHeader *hdr)
656 {
657   struct GNUNET_MQ_Envelope *mqm;
658   uint16_t size = ntohs (hdr->size);
659
660   mqm = GNUNET_malloc (sizeof (*mqm) + size);
661   mqm->mh = (struct GNUNET_MessageHeader *) &mqm[1];
662   GNUNET_memcpy (mqm->mh,
663           hdr,
664           size);
665   return mqm;
666 }
667
668
669 /**
670  * Implementation of the #GNUNET_MQ_msg_nested_mh macro.
671  *
672  * @param mhp pointer to the message header pointer that will be changed to allocate at
673  *        the newly allocated space for the message.
674  * @param base_size size of the data before the nested message
675  * @param type type of the message in the envelope
676  * @param nested_mh the message to append to the message after base_size
677  */
678 struct GNUNET_MQ_Envelope *
679 GNUNET_MQ_msg_nested_mh_ (struct GNUNET_MessageHeader **mhp,
680                           uint16_t base_size,
681                           uint16_t type,
682                           const struct GNUNET_MessageHeader *nested_mh)
683 {
684   struct GNUNET_MQ_Envelope *mqm;
685   uint16_t size;
686
687   if (NULL == nested_mh)
688     return GNUNET_MQ_msg_ (mhp, base_size, type);
689
690   size = base_size + ntohs (nested_mh->size);
691
692   /* check for uint16_t overflow */
693   if (size < base_size)
694     return NULL;
695
696   mqm = GNUNET_MQ_msg_ (mhp, size, type);
697   GNUNET_memcpy ((char *) mqm->mh + base_size,
698                  nested_mh,
699                  ntohs (nested_mh->size));
700
701   return mqm;
702 }
703
704
705 /**
706  * Transmit a queued message to the session's client.
707  *
708  * @param cls consensus session
709  * @param size number of bytes available in @a buf
710  * @param buf where the callee should write the message
711  * @return number of bytes written to @a buf
712  */
713 static size_t
714 transmit_queued (void *cls,
715                  size_t size,
716                  void *buf)
717 {
718   struct GNUNET_MQ_Handle *mq = cls;
719   struct ServerClientSocketState *state = GNUNET_MQ_impl_state (mq);
720   const struct GNUNET_MessageHeader *msg = GNUNET_MQ_impl_current (mq);
721   size_t msg_size;
722
723   GNUNET_assert (NULL != buf);
724   msg_size = ntohs (msg->size);
725   GNUNET_assert (size >= msg_size);
726   GNUNET_memcpy (buf, msg, msg_size);
727   state->th = NULL;
728
729   GNUNET_MQ_impl_send_continue (mq);
730
731   return msg_size;
732 }
733
734
735 static void
736 server_client_destroy_impl (struct GNUNET_MQ_Handle *mq,
737                             void *impl_state)
738 {
739   struct ServerClientSocketState *state = impl_state;
740
741   if (NULL != state->th)
742   {
743     GNUNET_SERVER_notify_transmit_ready_cancel (state->th);
744     state->th = NULL;
745   }
746
747   GNUNET_assert (NULL != mq);
748   GNUNET_assert (NULL != state);
749   GNUNET_SERVER_client_drop (state->client);
750   GNUNET_free (state);
751 }
752
753
754 static void
755 server_client_send_impl (struct GNUNET_MQ_Handle *mq,
756                          const struct GNUNET_MessageHeader *msg,
757                          void *impl_state)
758 {
759   struct ServerClientSocketState *state = impl_state;
760
761   GNUNET_assert (NULL != mq);
762   state->th = GNUNET_SERVER_notify_transmit_ready (state->client,
763                                                    ntohs (msg->size),
764                                                    GNUNET_TIME_UNIT_FOREVER_REL,
765                                                    &transmit_queued,
766                                                    mq);
767 }
768
769
770 struct GNUNET_MQ_Handle *
771 GNUNET_MQ_queue_for_server_client (struct GNUNET_SERVER_Client *client)
772 {
773   struct GNUNET_MQ_Handle *mq;
774   struct ServerClientSocketState *scss;
775
776   mq = GNUNET_new (struct GNUNET_MQ_Handle);
777   scss = GNUNET_new (struct ServerClientSocketState);
778   mq->impl_state = scss;
779   scss->client = client;
780   GNUNET_SERVER_client_keep (client);
781   mq->send_impl = &server_client_send_impl;
782   mq->destroy_impl = &server_client_destroy_impl;
783   return mq;
784 }
785
786
787 /**
788  * Associate the assoc_data in mq with a unique request id.
789  *
790  * @param mq message queue, id will be unique for the queue
791  * @param assoc_data to associate
792  */
793 uint32_t
794 GNUNET_MQ_assoc_add (struct GNUNET_MQ_Handle *mq,
795                      void *assoc_data)
796 {
797   uint32_t id;
798
799   if (NULL == mq->assoc_map)
800   {
801     mq->assoc_map = GNUNET_CONTAINER_multihashmap32_create (8);
802     mq->assoc_id = 1;
803   }
804   id = mq->assoc_id++;
805   GNUNET_CONTAINER_multihashmap32_put (mq->assoc_map, id, assoc_data,
806                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
807   return id;
808 }
809
810
811 void *
812 GNUNET_MQ_assoc_get (struct GNUNET_MQ_Handle *mq,
813                      uint32_t request_id)
814 {
815   if (NULL == mq->assoc_map)
816     return NULL;
817   return GNUNET_CONTAINER_multihashmap32_get (mq->assoc_map, request_id);
818 }
819
820
821 void *
822 GNUNET_MQ_assoc_remove (struct GNUNET_MQ_Handle *mq,
823                         uint32_t request_id)
824 {
825   void *val;
826
827   if (NULL == mq->assoc_map)
828     return NULL;
829   val = GNUNET_CONTAINER_multihashmap32_get (mq->assoc_map,
830                                              request_id);
831   GNUNET_CONTAINER_multihashmap32_remove_all (mq->assoc_map,
832                                               request_id);
833   return val;
834 }
835
836
837 /**
838  * Call a callback once the envelope has been sent, that is,
839  * sending it can not be canceled anymore.
840  * There can be only one notify sent callback per envelope.
841  *
842  * @param ev message to call the notify callback for
843  * @param cb the notify callback
844  * @param cb_cls closure for the callback
845  */
846 void
847 GNUNET_MQ_notify_sent (struct GNUNET_MQ_Envelope *ev,
848                        GNUNET_SCHEDULER_TaskCallback cb,
849                        void *cb_cls)
850 {
851   GNUNET_assert (NULL == ev->sent_cb);
852   ev->sent_cb = cb;
853   ev->sent_cls = cb_cls;
854 }
855
856
857 /**
858  * Handle we return for callbacks registered to be
859  * notified when #GNUNET_MQ_destroy() is called on a queue.
860  */
861 struct GNUNET_MQ_DestroyNotificationHandle
862 {
863   /**
864    * Kept in a DLL.
865    */
866   struct GNUNET_MQ_DestroyNotificationHandle *prev;
867
868   /**
869    * Kept in a DLL.
870    */
871   struct GNUNET_MQ_DestroyNotificationHandle *next;
872
873   /**
874    * Queue to notify about.
875    */
876   struct GNUNET_MQ_Handle *mq;
877
878   /**
879    * Function to call.
880    */
881   GNUNET_SCHEDULER_TaskCallback cb;
882
883   /**
884    * Closure for @e cb.
885    */
886   void *cb_cls;
887 };
888
889
890 /**
891  * Destroy the message queue.
892  *
893  * @param mq message queue to destroy
894  */
895 void
896 GNUNET_MQ_destroy (struct GNUNET_MQ_Handle *mq)
897 {
898   struct GNUNET_MQ_DestroyNotificationHandle *dnh;
899
900   if (NULL != mq->destroy_impl)
901   {
902     mq->destroy_impl (mq, mq->impl_state);
903   }
904   if (NULL != mq->send_task)
905   {
906     GNUNET_SCHEDULER_cancel (mq->send_task);
907     mq->send_task = NULL;
908   }
909   while (NULL != mq->envelope_head)
910   {
911     struct GNUNET_MQ_Envelope *ev;
912
913     ev = mq->envelope_head;
914     ev->parent_queue = NULL;
915     GNUNET_CONTAINER_DLL_remove (mq->envelope_head,
916                                  mq->envelope_tail,
917                                  ev);
918     GNUNET_assert (0 < mq->queue_length);
919     mq->queue_length--;
920     GNUNET_MQ_discard (ev);
921   }
922   if (NULL != mq->current_envelope)
923   {
924     /* we can only discard envelopes that
925      * are not queued! */
926     mq->current_envelope->parent_queue = NULL;
927     GNUNET_MQ_discard (mq->current_envelope);
928     mq->current_envelope = NULL;
929     GNUNET_assert (0 < mq->queue_length);
930     mq->queue_length--;
931   }
932   GNUNET_assert (0 == mq->queue_length);
933   while (NULL != (dnh = mq->dnh_head))
934   {
935     dnh->cb (dnh->cb_cls);
936     GNUNET_MQ_destroy_notify_cancel (dnh);
937   }
938   if (NULL != mq->assoc_map)
939   {
940     GNUNET_CONTAINER_multihashmap32_destroy (mq->assoc_map);
941     mq->assoc_map = NULL;
942   }
943   GNUNET_free_non_null (mq->handlers);
944   GNUNET_free (mq);
945 }
946
947
948 const struct GNUNET_MessageHeader *
949 GNUNET_MQ_extract_nested_mh_ (const struct GNUNET_MessageHeader *mh,
950                               uint16_t base_size)
951 {
952   uint16_t whole_size;
953   uint16_t nested_size;
954   const struct GNUNET_MessageHeader *nested_msg;
955
956   whole_size = ntohs (mh->size);
957   GNUNET_assert (whole_size >= base_size);
958   nested_size = whole_size - base_size;
959   if (0 == nested_size)
960     return NULL;
961   if (nested_size < sizeof (struct GNUNET_MessageHeader))
962   {
963     GNUNET_break_op (0);
964     return NULL;
965   }
966   nested_msg = (const struct GNUNET_MessageHeader *) ((char *) mh + base_size);
967   if (ntohs (nested_msg->size) != nested_size)
968   {
969     GNUNET_break_op (0);
970     return NULL;
971   }
972   return nested_msg;
973 }
974
975
976 /**
977  * Cancel sending the message. Message must have been sent with
978  * #GNUNET_MQ_send before.  May not be called after the notify sent
979  * callback has been called
980  *
981  * @param ev queued envelope to cancel
982  */
983 void
984 GNUNET_MQ_send_cancel (struct GNUNET_MQ_Envelope *ev)
985 {
986   struct GNUNET_MQ_Handle *mq = ev->parent_queue;
987
988   GNUNET_assert (NULL != mq);
989   GNUNET_assert (NULL != mq->cancel_impl);
990
991   mq->evacuate_called = GNUNET_NO;
992
993   if (mq->current_envelope == ev)
994   {
995     /* complex case, we already started with transmitting
996        the message using the callbacks. */
997     GNUNET_assert (0 < mq->queue_length);
998     mq->queue_length--;
999     mq->cancel_impl (mq,
1000                      mq->impl_state);
1001     /* continue sending the next message, if any */
1002     mq->current_envelope = mq->envelope_head;
1003     if (NULL != mq->current_envelope)
1004     {
1005       GNUNET_CONTAINER_DLL_remove (mq->envelope_head,
1006                                    mq->envelope_tail,
1007                                    mq->current_envelope);
1008       mq->send_impl (mq,
1009                      mq->current_envelope->mh,
1010                      mq->impl_state);
1011     }
1012   }
1013   else
1014   {
1015     /* simple case, message is still waiting in the queue */
1016     GNUNET_CONTAINER_DLL_remove (mq->envelope_head,
1017                                  mq->envelope_tail,
1018                                  ev);
1019     GNUNET_assert (0 < mq->queue_length);
1020     mq->queue_length--;
1021   }
1022
1023   if (GNUNET_YES != mq->evacuate_called)
1024   {
1025     ev->parent_queue = NULL;
1026     ev->mh = NULL;
1027     /* also frees ev */
1028     GNUNET_free (ev);
1029   }
1030 }
1031
1032
1033 /**
1034  * Function to obtain the current envelope
1035  * from within #GNUNET_MQ_SendImpl implementations.
1036  *
1037  * @param mq message queue to interrogate
1038  * @return the current envelope
1039  */
1040 struct GNUNET_MQ_Envelope *
1041 GNUNET_MQ_get_current_envelope (struct GNUNET_MQ_Handle *mq)
1042 {
1043   return mq->current_envelope;
1044 }
1045
1046
1047 /**
1048  * Function to obtain the last envelope in the queue.
1049  *
1050  * @param mq message queue to interrogate
1051  * @return the last envelope in the queue
1052  */
1053 struct GNUNET_MQ_Envelope *
1054 GNUNET_MQ_get_last_envelope (struct GNUNET_MQ_Handle *mq)
1055 {
1056   if (NULL != mq->envelope_tail)
1057     return mq->envelope_tail;
1058
1059   return mq->current_envelope;
1060 }
1061
1062
1063 /**
1064  * Set application-specific options for this envelope.
1065  * Overrides the options set for the queue with
1066  * #GNUNET_MQ_set_options() for this message only.
1067  *
1068  * @param env message to set options for
1069  * @param flags flags to use (meaning is queue-specific)
1070  * @param extra additional buffer for further data (also queue-specific)
1071  */
1072 void
1073 GNUNET_MQ_env_set_options (struct GNUNET_MQ_Envelope *env,
1074                            uint64_t flags,
1075                            const void *extra)
1076 {
1077   env->flags = flags;
1078   env->extra = extra;
1079   env->have_custom_options = GNUNET_YES;
1080 }
1081
1082
1083 /**
1084  * Get application-specific options for this envelope.
1085  *
1086  * @param env message to set options for
1087  * @param[out] flags set to flags to use (meaning is queue-specific)
1088  * @return extra additional buffer for further data (also queue-specific)
1089  */
1090 const void *
1091 GNUNET_MQ_env_get_options (struct GNUNET_MQ_Envelope *env,
1092                            uint64_t *flags)
1093 {
1094   struct GNUNET_MQ_Handle *mq = env->parent_queue;
1095
1096   if (GNUNET_YES == env->have_custom_options)
1097   {
1098     *flags = env->flags;
1099     return env->extra;
1100   }
1101   if (NULL == mq)
1102   {
1103     *flags = 0;
1104     return NULL;
1105   }
1106   *flags = mq->default_flags;
1107   return mq->default_extra;
1108 }
1109
1110
1111 /**
1112  * Set application-specific options for this queue.
1113  *
1114  * @param mq message queue to set options for
1115  * @param flags flags to use (meaning is queue-specific)
1116  * @param extra additional buffer for further data (also queue-specific)
1117  */
1118 void
1119 GNUNET_MQ_set_options (struct GNUNET_MQ_Handle *mq,
1120                        uint64_t flags,
1121                        const void *extra)
1122 {
1123   mq->default_flags = flags;
1124   mq->default_extra = extra;
1125 }
1126
1127
1128 /**
1129  * Register function to be called whenever @a mq is being
1130  * destroyed.
1131  *
1132  * @param mq message queue to watch
1133  * @param cb function to call on @a mq destruction
1134  * @param cb_cls closure for @a cb
1135  * @return handle for #GNUNET_MQ_destroy_notify_cancel().
1136  */
1137 struct GNUNET_MQ_DestroyNotificationHandle *
1138 GNUNET_MQ_destroy_notify (struct GNUNET_MQ_Handle *mq,
1139                           GNUNET_SCHEDULER_TaskCallback cb,
1140                           void *cb_cls)
1141 {
1142   struct GNUNET_MQ_DestroyNotificationHandle *dnh;
1143
1144   dnh = GNUNET_new (struct GNUNET_MQ_DestroyNotificationHandle);
1145   dnh->mq = mq;
1146   dnh->cb = cb;
1147   dnh->cb_cls = cb_cls;
1148   GNUNET_CONTAINER_DLL_insert (mq->dnh_head,
1149                                mq->dnh_tail,
1150                                dnh);
1151   return dnh;
1152 }
1153
1154
1155 /**
1156  * Cancel registration from #GNUNET_MQ_destroy_notify().
1157  *
1158  * @param dnh handle for registration to cancel
1159  */
1160 void
1161 GNUNET_MQ_destroy_notify_cancel (struct GNUNET_MQ_DestroyNotificationHandle *dnh)
1162 {
1163   struct GNUNET_MQ_Handle *mq = dnh->mq;
1164
1165   GNUNET_CONTAINER_DLL_remove (mq->dnh_head,
1166                                mq->dnh_tail,
1167                                dnh);
1168   GNUNET_free (dnh);
1169 }
1170
1171
1172 /**
1173  * Insert @a env into the envelope DLL starting at @a env_head
1174  * Note that @a env must not be in any MQ while this function
1175  * is used with DLLs defined outside of the MQ module.  This
1176  * is just in case some application needs to also manage a
1177  * FIFO of envelopes independent of MQ itself and wants to
1178  * re-use the pointers internal to @a env.  Use with caution.
1179  *
1180  * @param[in|out] env_head of envelope DLL
1181  * @param[in|out] env_tail tail of envelope DLL
1182  * @param[in|out] env element to insert at the tail
1183  */
1184 void
1185 GNUNET_MQ_dll_insert_tail (struct GNUNET_MQ_Envelope **env_head,
1186                            struct GNUNET_MQ_Envelope **env_tail,
1187                            struct GNUNET_MQ_Envelope *env)
1188 {
1189   GNUNET_CONTAINER_DLL_insert_tail (*env_head,
1190                                     *env_tail,
1191                                     env);
1192 }
1193
1194
1195 /**
1196  * Remove @a env from the envelope DLL starting at @a env_head.
1197  * Note that @a env must not be in any MQ while this function
1198  * is used with DLLs defined outside of the MQ module. This
1199  * is just in case some application needs to also manage a
1200  * FIFO of envelopes independent of MQ itself and wants to
1201  * re-use the pointers internal to @a env.  Use with caution.
1202  *
1203  * @param[in|out] env_head of envelope DLL
1204  * @param[in|out] env_tail tail of envelope DLL
1205  * @param[in|out] env element to remove from the DLL
1206  */
1207 void
1208 GNUNET_MQ_dll_remove (struct GNUNET_MQ_Envelope **env_head,
1209                       struct GNUNET_MQ_Envelope **env_tail,
1210                       struct GNUNET_MQ_Envelope *env)
1211 {
1212   GNUNET_CONTAINER_DLL_remove (*env_head,
1213                                *env_tail,
1214                                env);
1215 }
1216
1217
1218 /**
1219  * Copy an array of handlers.
1220  *
1221  * Useful if the array has been delared in local memory and needs to be
1222  * persisted for future use.
1223  *
1224  * @param handlers Array of handlers to be copied. Can be NULL (nothing done).
1225  * @return A newly allocated array of handlers.
1226  *         Needs to be freed with #GNUNET_free.
1227  */
1228 struct GNUNET_MQ_MessageHandler *
1229 GNUNET_MQ_copy_handlers (const struct GNUNET_MQ_MessageHandler *handlers)
1230 {
1231   struct GNUNET_MQ_MessageHandler *copy;
1232   unsigned int count;
1233
1234   if (NULL == handlers)
1235     return NULL;
1236
1237   count = GNUNET_MQ_count_handlers (handlers);
1238   copy = GNUNET_new_array (count + 1,
1239                            struct GNUNET_MQ_MessageHandler);
1240   GNUNET_memcpy (copy,
1241                  handlers,
1242                  count * sizeof (struct GNUNET_MQ_MessageHandler));
1243   return copy;
1244 }
1245
1246
1247 /**
1248  * Count the handlers in a handler array.
1249  *
1250  * @param handlers Array of handlers to be counted.
1251  * @return The number of handlers in the array.
1252  */
1253 unsigned int
1254 GNUNET_MQ_count_handlers (const struct GNUNET_MQ_MessageHandler *handlers)
1255 {
1256   unsigned int i;
1257
1258   if (NULL == handlers)
1259     return 0;
1260
1261   for (i=0; NULL != handlers[i].cb; i++) ;
1262
1263   return i;
1264 }
1265
1266
1267
1268 /* end of mq.c */