0ced014ddec81d9f05e0187125a28a3c373e64ce
[oweals/gnunet.git] / src / set / mq.c
1 /*
2      This file is part of GNUnet.
3      (C) 2012 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 2, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @author Florian Dold
23  * @file set/mq.c
24  * @brief general purpose request queue
25  */
26
27 #include "mq.h"
28
29
30 #define LOG(kind,...) GNUNET_log_from (kind, "mq",__VA_ARGS__)
31
32 /**
33  * Signature of functions implementing the
34  * sending part of a message queue
35  *
36  * @param q the message queue
37  * @param m the message
38  */
39 typedef void (*SendImpl) (struct GNUNET_MQ_MessageQueue *q, struct GNUNET_MQ_Message *m);
40
41
42 typedef void (*DestroyImpl) (struct GNUNET_MQ_MessageQueue *q);
43
44
45 /**
46  * Collection of the state necessary to read and write gnunet messages 
47  * to a stream socket. Should be used as closure for stream_data_processor.
48  */
49 struct MessageStreamState
50 {
51   struct GNUNET_SERVER_MessageStreamTokenizer *mst;
52   struct MessageQueue *mq;
53   struct GNUNET_STREAM_Socket *socket;
54   struct GNUNET_STREAM_ReadHandle *rh;
55   struct GNUNET_STREAM_WriteHandle *wh;
56 };
57
58
59 struct ServerClientSocketState
60 {
61   struct GNUNET_SERVER_Client *client;
62   struct GNUNET_SERVER_TransmitHandle* th;
63 };
64
65
66 struct ClientConnectionState
67 {
68   struct GNUNET_CLIENT_Connection *connection;
69   struct GNUNET_CLIENT_TransmitHandle *th;
70 };
71
72
73 struct GNUNET_MQ_MessageQueue
74 {
75   /**
76    * Handlers array, or NULL if the queue should not receive messages
77    */
78   const struct GNUNET_MQ_Handler *handlers;
79
80   /**
81    * Closure for the handler callbacks
82    */
83   void *handlers_cls;
84
85   /**
86    * Actual implementation of message sending,
87    * called when a message is added
88    */
89   SendImpl send_impl;
90
91   /**
92    * Implementation-dependent queue destruction function
93    */
94   DestroyImpl destroy_impl;
95
96   /**
97    * Implementation-specific state
98    */
99   void *impl_state;
100
101   /**
102    * Callback will be called when the message queue is empty
103    */
104   GNUNET_MQ_NotifyCallback empty_cb;
105
106   /**
107    * Closure for empty_cb
108    */
109   void *empty_cls;
110
111   /**
112    * Callback will be called when a read error occurs.
113    */
114   GNUNET_MQ_NotifyCallback read_error_cb;
115
116   /**
117    * Closure for read_error_cb
118    */
119   void *read_error_cls;
120
121   /**
122    * Linked list of messages pending to be sent
123    */
124   struct GNUNET_MQ_Message *msg_head;
125
126   /**
127    * Linked list of messages pending to be sent
128    */
129   struct GNUNET_MQ_Message *msg_tail;
130
131   /**
132    * Message that is currently scheduled to be
133    * sent. Not the head of the message queue, as the implementation
134    * needs to know if sending has been already scheduled or not.
135    */
136   struct GNUNET_MQ_Message *current_msg;
137
138   /**
139    * Map of associations, lazily allocated
140    */
141   struct GNUNET_CONTAINER_MultiHashMap32 *assoc_map;
142
143   /**
144    * Next id that should be used for the assoc_map,
145    * initialized lazily to a random value together with
146    * assoc_map
147    */
148   uint32_t assoc_id;
149 };
150
151
152 struct GNUNET_MQ_Message
153 {
154   /**
155    * Messages are stored in a linked list
156    */
157   struct GNUNET_MQ_Message *next;
158
159   /**
160    * Messages are stored in a linked list
161    */
162   struct GNUNET_MQ_Message *prev;
163
164   /**
165    * Actual allocated message header,
166    * usually points to the end of the containing GNUNET_MQ_Message
167    */
168   struct GNUNET_MessageHeader *mh;
169
170   /**
171    * Queue the message is queued in, NULL if message is not queued.
172    */
173   struct GNUNET_MQ_MessageQueue *parent_queue;
174
175   /**
176    * Called after the message was sent irrevokably
177    */
178   GNUNET_MQ_NotifyCallback sent_cb;
179
180   /**
181    * Closure for send_cb
182    */
183   void *sent_cls;
184 };
185
186
187 /**
188  * Call the right callback for a message received
189  * by a queue
190  */
191 static void
192 dispatch_message (struct GNUNET_MQ_MessageQueue *mq, const struct GNUNET_MessageHeader *mh)
193 {
194   const struct GNUNET_MQ_Handler *handler;
195   int handled = GNUNET_NO;
196
197   handler = mq->handlers;
198   if (NULL == handler)
199     return;
200   for (; NULL != handler->cb; handler++)
201   {
202     if (handler->type == ntohs (mh->type))
203     {
204       handler->cb (mq->handlers_cls, mh);
205       handled = GNUNET_YES;
206     }
207   }
208   
209   if (GNUNET_NO == handled)
210     LOG (GNUNET_ERROR_TYPE_WARNING, "no handler for message of type %d\n", ntohs (mh->type));
211 }
212
213
214 void
215 GNUNET_MQ_discard (struct GNUNET_MQ_Message *mqm)
216 {
217   GNUNET_assert (NULL == mqm->parent_queue);
218   GNUNET_free (mqm);
219 }
220
221
222 /**
223  * Send a message with the give message queue.
224  * May only be called once per message.
225  * 
226  * @param mq message queue
227  * @param mqm the message to send.
228  */
229 void
230 GNUNET_MQ_send (struct GNUNET_MQ_MessageQueue *mq, struct GNUNET_MQ_Message *mqm)
231 {
232   GNUNET_assert (NULL != mq);
233   mq->send_impl (mq, mqm);
234 }
235
236
237 struct GNUNET_MQ_Message *
238 GNUNET_MQ_msg_ (struct GNUNET_MessageHeader **mhp, uint16_t size, uint16_t type)
239 {
240   struct GNUNET_MQ_Message *mqm;
241
242   mqm = GNUNET_malloc (sizeof *mqm + size);
243   mqm->mh = (struct GNUNET_MessageHeader *) &mqm[1];
244   mqm->mh->size = htons (size);
245   mqm->mh->type = htons (type);
246   if (NULL != mhp)
247     *mhp = mqm->mh;
248   return mqm;
249 }
250
251
252 int
253 GNUNET_MQ_nest_ (struct GNUNET_MQ_Message **mqmp,
254                  const void *data, uint16_t len)
255 {
256   size_t new_size;
257   size_t old_size;
258
259   GNUNET_assert (NULL != mqmp);
260   /* there's no data to append => do nothing */
261   if (NULL == data)
262     return GNUNET_OK;
263   old_size = ntohs ((*mqmp)->mh->size);
264   /* message too large to concatenate? */
265   if (((uint16_t) (old_size + len)) < len)
266     return GNUNET_SYSERR;
267   new_size = old_size + len;
268   *mqmp = GNUNET_realloc (*mqmp, sizeof (struct GNUNET_MQ_Message) + new_size);
269   (*mqmp)->mh = (struct GNUNET_MessageHeader *) &(*mqmp)[1];
270   memcpy (((void *) (*mqmp)->mh) + old_size, data, new_size - old_size);
271   (*mqmp)->mh->size = htons (new_size);
272   return GNUNET_OK;
273 }
274
275
276 /**
277  * Functions of this signature are called whenever writing operations
278  * on a stream are executed
279  *
280  * @param cls the closure from GNUNET_STREAM_write
281  * @param status the status of the stream at the time this function is called;
282  *          GNUNET_STREAM_OK if writing to stream was completed successfully;
283  *          GNUNET_STREAM_TIMEOUT if the given data is not sent successfully
284  *          (this doesn't mean that the data is never sent, the receiver may
285  *          have read the data but its ACKs may have been lost);
286  *          GNUNET_STREAM_SHUTDOWN if the stream is shutdown for writing in the
287  *          mean time; GNUNET_STREAM_SYSERR if the stream is broken and cannot
288  *          be processed.
289  * @param size the number of bytes written
290  */
291 static void 
292 stream_write_queued (void *cls, enum GNUNET_STREAM_Status status, size_t size)
293 {
294   struct GNUNET_MQ_MessageQueue *mq = cls;
295   struct MessageStreamState *mss = (struct MessageStreamState *) mq->impl_state;
296   struct GNUNET_MQ_Message *mqm;
297
298   GNUNET_assert (GNUNET_STREAM_OK == status);
299   
300   /* call cb for message we finished sending */
301   mqm = mq->current_msg;
302   GNUNET_assert (NULL != mq->current_msg);
303   if (NULL != mqm->sent_cb)
304     mqm->sent_cb (mqm->sent_cls);
305   GNUNET_free (mqm);
306
307   mss->wh = NULL;
308
309   mqm = mq->msg_head;
310   mq->current_msg = mqm;
311   if (NULL == mqm)
312   {
313     if (NULL != mq->empty_cb)
314       mq->empty_cb (mq->empty_cls);
315     return;
316   }
317   GNUNET_CONTAINER_DLL_remove (mq->msg_head, mq->msg_tail, mqm);
318   mss->wh = GNUNET_STREAM_write (mss->socket, mqm->mh, ntohs (mqm->mh->size),
319                                  GNUNET_TIME_UNIT_FOREVER_REL,
320                                  stream_write_queued, mq);
321   GNUNET_assert (NULL != mss->wh);
322 }
323
324
325 static void
326 stream_socket_send_impl (struct GNUNET_MQ_MessageQueue *mq,
327                          struct GNUNET_MQ_Message *mqm)
328 {
329   struct MessageStreamState *mss = (struct MessageStreamState *) mq->impl_state;
330   if (NULL != mq->current_msg)
331   {
332     GNUNET_CONTAINER_DLL_insert_tail (mq->msg_head, mq->msg_tail, mqm);
333     return;
334   }
335   mq->current_msg = mqm;
336   mss->wh = GNUNET_STREAM_write (mss->socket, mqm->mh, ntohs (mqm->mh->size),
337                                  GNUNET_TIME_UNIT_FOREVER_REL,
338                                  stream_write_queued, mq);
339 }
340
341
342 /**
343  * Functions with this signature are called whenever a
344  * complete message is received by the tokenizer.
345  *
346  * Do not call GNUNET_SERVER_mst_destroy in callback
347  *
348  * @param cls closure
349  * @param client identification of the client
350  * @param message the actual message
351  *
352  * @return GNUNET_OK on success, GNUNET_SYSERR to stop further processing
353  */
354 static int
355 stream_mst_callback (void *cls, void *client,
356                      const struct GNUNET_MessageHeader *message)
357 {
358   struct GNUNET_MQ_MessageQueue *mq = cls;
359
360   GNUNET_assert (NULL != message);
361   dispatch_message (mq, message);
362   return GNUNET_OK;
363 }
364
365
366 /**
367  * Functions of this signature are called whenever data is available from the
368  * stream.
369  *
370  * @param cls the closure from GNUNET_STREAM_read
371  * @param status the status of the stream at the time this function is called
372  * @param data traffic from the other side
373  * @param size the number of bytes available in data read; will be 0 on timeout 
374  * @return number of bytes of processed from 'data' (any data remaining should be
375  *         given to the next time the read processor is called).
376  */
377 static size_t
378 stream_data_processor (void *cls,
379                        enum GNUNET_STREAM_Status status,
380                        const void *data,
381                        size_t size)
382 {
383   struct GNUNET_MQ_MessageQueue *mq = cls;
384   struct MessageStreamState *mss;
385   int ret;
386   
387   mss = (struct MessageStreamState *) mq->impl_state;
388   GNUNET_assert (GNUNET_STREAM_OK == status);
389   ret = GNUNET_SERVER_mst_receive (mss->mst, NULL, data, size, GNUNET_NO, GNUNET_NO);
390   GNUNET_assert (GNUNET_OK == ret);
391   /* we always read all data */
392     mss->rh = GNUNET_STREAM_read (mss->socket, GNUNET_TIME_UNIT_FOREVER_REL, 
393                                   stream_data_processor, mq);
394   return size;
395 }
396
397
398 static void
399 stream_socket_destroy_impl (struct GNUNET_MQ_MessageQueue *mq)
400 {
401   struct MessageStreamState *mss = (struct MessageStreamState *) mq->impl_state;
402
403   if (NULL != mss->rh)
404   {
405     GNUNET_STREAM_read_cancel (mss->rh);
406     mss->rh = NULL;
407   }
408
409   if (NULL != mss->wh)
410   {
411     GNUNET_STREAM_write_cancel (mss->wh);
412     mss->wh = NULL;
413   }
414
415   if (NULL != mss->mst)
416   {
417     GNUNET_SERVER_mst_destroy (mss->mst);
418     mss->mst = NULL;
419   }
420
421   GNUNET_free (mss);
422 }
423
424
425
426
427 struct GNUNET_MQ_MessageQueue *
428 GNUNET_MQ_queue_for_stream_socket (struct GNUNET_STREAM_Socket *socket,
429                                    const struct GNUNET_MQ_Handler *handlers,
430                                    void *cls)
431 {
432   struct GNUNET_MQ_MessageQueue *mq;
433   struct MessageStreamState *mss;
434
435   mq = GNUNET_new (struct GNUNET_MQ_MessageQueue);
436   mss = GNUNET_new (struct MessageStreamState);
437   mss->socket = socket;
438   mq->impl_state = mss;
439   mq->send_impl = stream_socket_send_impl;
440   mq->destroy_impl = &stream_socket_destroy_impl;
441   mq->handlers = handlers;
442   mq->handlers_cls = cls;
443   if (NULL != handlers)
444   {
445     mss->mst = GNUNET_SERVER_mst_create (stream_mst_callback, mq);
446     mss->rh = GNUNET_STREAM_read (socket, GNUNET_TIME_UNIT_FOREVER_REL, 
447                                   stream_data_processor, mq);
448   }
449   return mq;
450 }
451
452
453 /*** Transmit a queued message to the session's client.
454  *
455  * @param cls consensus session
456  * @param size number of bytes available in buf
457  * @param buf where the callee should write the message
458  * @return number of bytes written to buf
459  */
460 static size_t
461 transmit_queued (void *cls, size_t size,
462                  void *buf)
463 {
464   struct GNUNET_MQ_MessageQueue *mq = cls;
465   struct GNUNET_MQ_Message *mqm = mq->current_msg;
466   struct ServerClientSocketState *state = mq->impl_state;
467   size_t msg_size;
468
469   GNUNET_assert (NULL != buf);
470
471   if (NULL != mqm->sent_cb)
472   {
473     mqm->sent_cb (mqm->sent_cls);
474   }
475
476   mq->current_msg = NULL;
477   GNUNET_assert (NULL != mqm);
478   msg_size = ntohs (mqm->mh->size);
479   GNUNET_assert (size >= msg_size);
480   memcpy (buf, mqm->mh, msg_size);
481   GNUNET_free (mqm);
482   state->th = NULL;
483
484   if (NULL != mq->msg_head)
485   {
486     mq->current_msg = mq->msg_head;
487     GNUNET_CONTAINER_DLL_remove (mq->msg_head, mq->msg_tail, mq->current_msg);
488     state->th = 
489         GNUNET_SERVER_notify_transmit_ready (state->client, msg_size, 
490                                              GNUNET_TIME_UNIT_FOREVER_REL,
491                                              &transmit_queued, mq);
492   }
493   else if (NULL != mq->empty_cb)
494     mq->empty_cb (mq->empty_cls);
495   return msg_size;
496 }
497
498
499
500 static void
501 server_client_destroy_impl (struct GNUNET_MQ_MessageQueue *mq)
502 {
503   struct ServerClientSocketState *state;
504   
505   GNUNET_assert (NULL != mq);
506   state = mq->impl_state;
507   GNUNET_assert (NULL != state);
508   GNUNET_SERVER_client_drop (state->client);
509   GNUNET_free (state);
510 }
511
512 static void
513 server_client_send_impl (struct GNUNET_MQ_MessageQueue *mq, struct GNUNET_MQ_Message *mqm)
514 {
515   struct ServerClientSocketState *state;
516   int msize;
517
518   GNUNET_assert (NULL != mq);
519   state = mq->impl_state;
520   GNUNET_assert (NULL != state);
521
522   if (NULL != state->th)
523   {
524     GNUNET_CONTAINER_DLL_insert_tail (mq->msg_head, mq->msg_tail, mqm);
525     return;
526   }
527   GNUNET_assert (NULL == mq->msg_head);
528   GNUNET_assert (NULL == mq->current_msg);
529   msize = ntohs (mqm->mh->size);
530   mq->current_msg = mqm;
531   state->th = 
532       GNUNET_SERVER_notify_transmit_ready (state->client, msize, 
533                                            GNUNET_TIME_UNIT_FOREVER_REL,
534                                            &transmit_queued, mq);
535 }
536
537
538 struct GNUNET_MQ_MessageQueue *
539 GNUNET_MQ_queue_for_server_client (struct GNUNET_SERVER_Client *client)
540 {
541   struct GNUNET_MQ_MessageQueue *mq;
542   struct ServerClientSocketState *scss;
543
544   mq = GNUNET_new (struct GNUNET_MQ_MessageQueue);
545   scss = GNUNET_new (struct ServerClientSocketState);
546   mq->impl_state = scss;
547   scss->client = client;
548   GNUNET_SERVER_client_keep (client);
549   mq->send_impl = server_client_send_impl;
550   mq->destroy_impl = server_client_destroy_impl;
551   return mq;
552 }
553
554
555 /**
556  * Transmit a queued message to the session's client.
557  *
558  * @param cls consensus session
559  * @param size number of bytes available in buf
560  * @param buf where the callee should write the message
561  * @return number of bytes written to buf
562  */
563 static size_t
564 connection_client_transmit_queued (void *cls, size_t size,
565                  void *buf)
566 {
567   struct GNUNET_MQ_MessageQueue *mq = cls;
568   struct GNUNET_MQ_Message *mqm = mq->current_msg;
569   struct ClientConnectionState *state = mq->impl_state;
570   size_t msg_size;
571
572
573   GNUNET_assert (NULL != mqm);
574
575   if (NULL != mqm->sent_cb)
576   {
577     mqm->sent_cb (mqm->sent_cls);
578   }
579
580   mq->current_msg = NULL;
581   GNUNET_assert (NULL != buf);
582   msg_size = ntohs (mqm->mh->size);
583   GNUNET_assert (size >= msg_size);
584   memcpy (buf, mqm->mh, msg_size);
585   GNUNET_free (mqm);
586   state->th = NULL;
587   if (NULL != mq->msg_head)
588   {
589     mq->current_msg = mq->msg_head;
590     GNUNET_CONTAINER_DLL_remove (mq->msg_head, mq->msg_tail, mq->current_msg);
591     state->th = 
592       GNUNET_CLIENT_notify_transmit_ready (state->connection, ntohs (mq->current_msg->mh->size), 
593                                              GNUNET_TIME_UNIT_FOREVER_REL, GNUNET_NO,
594                                              &connection_client_transmit_queued, mq);
595   }
596   else if (NULL != mq->empty_cb)
597     mq->empty_cb (mq->empty_cls);
598   return msg_size;
599 }
600
601
602
603 static void
604 connection_client_destroy_impl (struct GNUNET_MQ_MessageQueue *mq)
605 {
606   GNUNET_free (mq->impl_state);
607 }
608
609 static void
610 connection_client_send_impl (struct GNUNET_MQ_MessageQueue *mq,
611                              struct GNUNET_MQ_Message *mqm)
612 {
613   struct ClientConnectionState *state = mq->impl_state;
614   int msize;
615
616   GNUNET_assert (NULL != state);
617
618   if (NULL != state->th)
619   {
620     GNUNET_CONTAINER_DLL_insert_tail (mq->msg_head, mq->msg_tail, mqm);
621     return;
622   }
623   GNUNET_assert (NULL == mq->current_msg);
624   mq->current_msg = mqm;
625   msize = ntohs (mqm->mh->size);
626   state->th = 
627       GNUNET_CLIENT_notify_transmit_ready (state->connection, msize, 
628                                            GNUNET_TIME_UNIT_FOREVER_REL, GNUNET_NO,
629                                            &connection_client_transmit_queued, mq);
630 }
631
632
633
634 /**
635  * Type of a function to call when we receive a message
636  * from the service.
637  *
638  * @param cls closure
639  * @param msg message received, NULL on timeout or fatal error
640  */
641 static void
642 handle_client_message (void *cls,
643                        const struct GNUNET_MessageHeader *msg)
644 {
645   struct GNUNET_MQ_MessageQueue *mq = cls;
646   struct ClientConnectionState *state;
647
648   state = mq->impl_state;
649   
650   if (NULL == msg)
651   {
652     if (NULL == mq->read_error_cb)
653       LOG (GNUNET_ERROR_TYPE_WARNING, "ignoring read error (no handler installed)\n");
654     mq->read_error_cb (mq->read_error_cls);
655     return;
656   }
657
658   GNUNET_CLIENT_receive (state->connection, handle_client_message, mq,
659                          GNUNET_TIME_UNIT_FOREVER_REL);
660
661   dispatch_message (mq, msg);
662 }
663
664
665 struct GNUNET_MQ_MessageQueue *
666 GNUNET_MQ_queue_for_connection_client (struct GNUNET_CLIENT_Connection *connection,
667                                        const struct GNUNET_MQ_Handler *handlers,
668                                        void *cls)
669 {
670   struct GNUNET_MQ_MessageQueue *mq;
671   struct ClientConnectionState *state;
672
673   GNUNET_assert (NULL != connection);
674
675   mq = GNUNET_new (struct GNUNET_MQ_MessageQueue);
676   mq->handlers = handlers;
677   mq->handlers_cls = cls;
678   state = GNUNET_new (struct ClientConnectionState);
679   state->connection = connection;
680   mq->impl_state = state;
681   mq->send_impl = connection_client_send_impl;
682   mq->destroy_impl = connection_client_destroy_impl;
683
684   if (NULL != handlers)
685   {
686     GNUNET_CLIENT_receive (connection, handle_client_message, mq,
687                            GNUNET_TIME_UNIT_FOREVER_REL);
688   }
689
690   return mq;
691 }
692
693
694 void
695 GNUNET_MQ_replace_handlers (struct GNUNET_MQ_MessageQueue *mq,
696                             const struct GNUNET_MQ_Handler *new_handlers,
697                             void *cls)
698 {
699   mq->handlers = new_handlers;
700   mq->handlers_cls = cls;
701 }
702
703
704 /**
705  * Associate the assoc_data in mq with a unique request id.
706  *
707  * @param mq message queue, id will be unique for the queue
708  * @param mqm message to associate
709  * @param assoc_data to associate
710  */
711 uint32_t
712 GNUNET_MQ_assoc_add (struct GNUNET_MQ_MessageQueue *mq,
713                      struct GNUNET_MQ_Message *mqm,
714                      void *assoc_data)
715 {
716   uint32_t id;
717
718   if (NULL == mq->assoc_map)
719   {
720     mq->assoc_map = GNUNET_CONTAINER_multihashmap32_create (8);
721     mq->assoc_id = 1;
722   }
723   id = mq->assoc_id++;
724   GNUNET_CONTAINER_multihashmap32_put (mq->assoc_map, id, assoc_data,
725                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
726   return id;
727 }
728
729
730
731 void *
732 GNUNET_MQ_assoc_get (struct GNUNET_MQ_MessageQueue *mq, uint32_t request_id)
733 {
734   if (NULL == mq->assoc_map)
735     return NULL;
736   return GNUNET_CONTAINER_multihashmap32_get (mq->assoc_map, request_id);
737 }
738
739
740 void *
741 GNUNET_MQ_assoc_remove (struct GNUNET_MQ_MessageQueue *mq, uint32_t request_id)
742 {
743   void *val;
744
745   if (NULL == mq->assoc_map)
746     return NULL;
747   val = GNUNET_CONTAINER_multihashmap32_get (mq->assoc_map, request_id);
748   GNUNET_assert (NULL != val);
749   GNUNET_CONTAINER_multihashmap32_remove (mq->assoc_map, request_id, val);
750   return val;
751 }
752
753
754 void
755 GNUNET_MQ_notify_sent (struct GNUNET_MQ_Message *mqm,
756                        GNUNET_MQ_NotifyCallback cb,
757                        void *cls)
758 {
759   mqm->sent_cb = cb;
760   mqm->sent_cls = cls;
761 }
762
763
764 void
765 GNUNET_MQ_destroy (struct GNUNET_MQ_MessageQueue *mq)
766 {
767   /* FIXME: destroy all pending messages in the queue */
768
769   if (NULL != mq->destroy_impl)
770   {
771     mq->destroy_impl (mq);
772   }
773
774   GNUNET_free (mq);
775 }
776
777
778 /**
779  * Call a callback once all messages queued have been sent,
780  * i.e. the message queue is empty.
781  *
782  * @param mqm the message queue to send the notification for
783  * @param cb the callback to call on an empty queue
784  * @param cls closure for cb
785  */
786 void
787 GNUNET_MQ_notify_empty (struct GNUNET_MQ_MessageQueue *mqm,
788                         GNUNET_MQ_NotifyCallback cb,
789                         void *cls)
790 {
791   mqm->empty_cb = cb;
792   mqm->empty_cls = cls;
793 }