Merge branch 'master' of gnunet.org:gnunet
[oweals/gnunet.git] / src / util / mq.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2012-2017 GNUnet e.V.
4
5      GNUnet is free software: you can redistribute it and/or modify it
6      under the terms of the GNU Affero General Public License as published
7      by the Free Software Foundation, either version 3 of the License,
8      or (at your option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      Affero General Public License for more details.
14     
15      You should have received a copy of the GNU Affero General Public License
16      along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 */
18
19 /**
20  * @author Florian Dold
21  * @file util/mq.c
22  * @brief general purpose request queue
23  */
24 #include "platform.h"
25 #include "gnunet_util_lib.h"
26
27 #define LOG(kind,...) GNUNET_log_from (kind, "util-mq",__VA_ARGS__)
28
29
30 struct GNUNET_MQ_Envelope
31 {
32   /**
33    * Messages are stored in a linked list.
34    * Each queue has its own list of envelopes.
35    */
36   struct GNUNET_MQ_Envelope *next;
37
38   /**
39    * Messages are stored in a linked list
40    * Each queue has its own list of envelopes.
41    */
42   struct GNUNET_MQ_Envelope *prev;
43
44   /**
45    * Actual allocated message header.
46    * The GNUNET_MQ_Envelope header is allocated at
47    * the end of the message.
48    */
49   struct GNUNET_MessageHeader *mh;
50
51   /**
52    * Queue the message is queued in, NULL if message is not queued.
53    */
54   struct GNUNET_MQ_Handle *parent_queue;
55
56   /**
57    * Called after the message was sent irrevocably.
58    */
59   GNUNET_SCHEDULER_TaskCallback sent_cb;
60
61   /**
62    * Closure for @e send_cb
63    */
64   void *sent_cls;
65
66   /**
67    * Flags that were set for this envelope by
68    * #GNUNET_MQ_env_set_options().   Only valid if
69    * @e have_custom_options is set.
70    */
71   uint64_t flags;
72
73   /**
74    * Additional options buffer set for this envelope by
75    * #GNUNET_MQ_env_set_options().  Only valid if
76    * @e have_custom_options is set.
77    */
78   const void *extra;
79
80   /**
81    * Did the application call #GNUNET_MQ_env_set_options()?
82    */
83   int have_custom_options;
84 };
85
86
87 /**
88  * Handle to a message queue.
89  */
90 struct GNUNET_MQ_Handle
91 {
92   /**
93    * Handlers array, or NULL if the queue should not receive messages
94    */
95   struct GNUNET_MQ_MessageHandler *handlers;
96
97   /**
98    * Actual implementation of message sending,
99    * called when a message is added
100    */
101   GNUNET_MQ_SendImpl send_impl;
102
103   /**
104    * Implementation-dependent queue destruction function
105    */
106   GNUNET_MQ_DestroyImpl destroy_impl;
107
108   /**
109    * Implementation-dependent send cancel function
110    */
111   GNUNET_MQ_CancelImpl cancel_impl;
112
113   /**
114    * Implementation-specific state
115    */
116   void *impl_state;
117
118   /**
119    * Callback will be called when an error occurs.
120    */
121   GNUNET_MQ_ErrorHandler error_handler;
122
123   /**
124    * Closure for the error handler.
125    */
126   void *error_handler_cls;
127
128   /**
129    * Task to asynchronously run #impl_send_continue().
130    */
131   struct GNUNET_SCHEDULER_Task *send_task;
132
133   /**
134    * Linked list of messages pending to be sent
135    */
136   struct GNUNET_MQ_Envelope *envelope_head;
137
138   /**
139    * Linked list of messages pending to be sent
140    */
141   struct GNUNET_MQ_Envelope *envelope_tail;
142
143   /**
144    * Message that is currently scheduled to be
145    * sent. Not the head of the message queue, as the implementation
146    * needs to know if sending has been already scheduled or not.
147    */
148   struct GNUNET_MQ_Envelope *current_envelope;
149
150   /**
151    * Map of associations, lazily allocated
152    */
153   struct GNUNET_CONTAINER_MultiHashMap32 *assoc_map;
154
155   /**
156    * Functions to call on queue destruction; kept in a DLL.
157    */
158   struct GNUNET_MQ_DestroyNotificationHandle *dnh_head;
159
160   /**
161    * Functions to call on queue destruction; kept in a DLL.
162    */
163   struct GNUNET_MQ_DestroyNotificationHandle *dnh_tail;
164
165   /**
166    * Additional options buffer set for this queue by
167    * #GNUNET_MQ_set_options().  Default is 0.
168    */
169   const void *default_extra;
170
171   /**
172    * Flags that were set for this queue by
173    * #GNUNET_MQ_set_options().   Default is 0.
174    */
175   uint64_t default_flags;
176
177   /**
178    * Next id that should be used for the @e assoc_map,
179    * initialized lazily to a random value together with
180    * @e assoc_map
181    */
182   uint32_t assoc_id;
183
184   /**
185    * Number of entries we have in the envelope-DLL.
186    */
187   unsigned int queue_length;
188
189   /**
190    * #GNUNET_YES if GNUNET_MQ_impl_evacuate was called.
191    * FIXME: is this dead?
192    */
193   int evacuate_called;
194
195   /**
196    * #GNUNET_YES if GNUNET_MQ_impl_send_in_flight() was called.
197    */
198   int in_flight;
199 };
200
201
202 /**
203  * Call the message message handler that was registered
204  * for the type of the given message in the given message queue.
205  *
206  * This function is indended to be used for the implementation
207  * of message queues.
208  *
209  * @param mq message queue with the handlers
210  * @param mh message to dispatch
211  */
212 void
213 GNUNET_MQ_inject_message (struct GNUNET_MQ_Handle *mq,
214                           const struct GNUNET_MessageHeader *mh)
215 {
216   const struct GNUNET_MQ_MessageHandler *handler;
217   int handled = GNUNET_NO;
218   uint16_t msize = ntohs (mh->size);
219   uint16_t mtype = ntohs (mh->type);
220
221   LOG (GNUNET_ERROR_TYPE_DEBUG,
222        "Received message of type %u and size %u\n",
223        mtype, msize);
224
225   if (NULL == mq->handlers)
226     goto done;
227   for (handler = mq->handlers; NULL != handler->cb; handler++)
228   {
229     if (handler->type == mtype)
230     {
231       handled = GNUNET_YES;
232       if ( (handler->expected_size > msize) ||
233            ( (handler->expected_size != msize) &&
234              (NULL == handler->mv) ) )
235       {
236         /* Too small, or not an exact size and
237            no 'mv' handler to check rest */
238         LOG (GNUNET_ERROR_TYPE_ERROR,
239              "Received malformed message of type %u\n",
240              (unsigned int) handler->type);
241         GNUNET_MQ_inject_error (mq,
242                                 GNUNET_MQ_ERROR_MALFORMED);
243         break;
244       }
245       if ( (NULL == handler->mv) ||
246            (GNUNET_OK ==
247             handler->mv (handler->cls, mh)) )
248       {
249         /* message well-formed, pass to handler */
250         handler->cb (handler->cls, mh);
251       }
252       else
253       {
254         /* Message rejected by check routine */
255         LOG (GNUNET_ERROR_TYPE_ERROR,
256              "Received malformed message of type %u\n",
257              (unsigned int) handler->type);
258         GNUNET_MQ_inject_error (mq,
259                                 GNUNET_MQ_ERROR_MALFORMED);
260       }
261       break;
262     }
263   }
264  done:
265   if (GNUNET_NO == handled)
266     LOG (GNUNET_ERROR_TYPE_INFO,
267          "No handler for message of type %u and size %u\n",
268          mtype, msize);
269 }
270
271
272 /**
273  * Call the error handler of a message queue with the given
274  * error code.  If there is no error handler, log a warning.
275  *
276  * This function is intended to be used by the implementation
277  * of message queues.
278  *
279  * @param mq message queue
280  * @param error the error type
281  */
282 void
283 GNUNET_MQ_inject_error (struct GNUNET_MQ_Handle *mq,
284                         enum GNUNET_MQ_Error error)
285 {
286   if (NULL == mq->error_handler)
287   {
288     LOG (GNUNET_ERROR_TYPE_WARNING,
289          "Got error %d, but no handler installed\n",
290          (int) error);
291     return;
292   }
293   mq->error_handler (mq->error_handler_cls,
294                      error);
295 }
296
297
298 /**
299  * Discard the message queue message, free all
300  * allocated resources. Must be called in the event
301  * that a message is created but should not actually be sent.
302  *
303  * @param mqm the message to discard
304  */
305 void
306 GNUNET_MQ_discard (struct GNUNET_MQ_Envelope *ev)
307 {
308   GNUNET_assert (NULL == ev->parent_queue);
309   GNUNET_free (ev);
310 }
311
312
313 /**
314  * Obtain the current length of the message queue.
315  *
316  * @param mq queue to inspect
317  * @return number of queued, non-transmitted messages
318  */
319 unsigned int
320 GNUNET_MQ_get_length (struct GNUNET_MQ_Handle *mq)
321 {
322   if (GNUNET_YES != mq->in_flight)
323   {
324     return mq->queue_length;
325   }
326   return mq->queue_length - 1;
327 }
328
329
330 /**
331  * Send a message with the given message queue.
332  * May only be called once per message.
333  *
334  * @param mq message queue
335  * @param ev the envelope with the message to send.
336  */
337 void
338 GNUNET_MQ_send (struct GNUNET_MQ_Handle *mq,
339                 struct GNUNET_MQ_Envelope *ev)
340 {
341   GNUNET_assert (NULL != mq);
342   GNUNET_assert (NULL == ev->parent_queue);
343
344   mq->queue_length++;
345   GNUNET_break (mq->queue_length < 10000); /* This would seem like a bug... */
346   ev->parent_queue = mq;
347   /* is the implementation busy? queue it! */
348   if ( (NULL != mq->current_envelope) ||
349        (NULL != mq->send_task) )
350   {
351     GNUNET_CONTAINER_DLL_insert_tail (mq->envelope_head,
352                                       mq->envelope_tail,
353                                       ev);
354     return;
355   }
356   GNUNET_assert (NULL == mq->envelope_head);
357   mq->current_envelope = ev;
358
359   LOG (GNUNET_ERROR_TYPE_DEBUG,
360        "sending message of type %u, queue empty (MQ: %p)\n",
361        ntohs(ev->mh->type),
362        mq);
363
364   mq->send_impl (mq,
365                  ev->mh,
366                  mq->impl_state);
367 }
368
369
370 /**
371  * Remove the first envelope that has not yet been sent from the message
372  * queue and return it.
373  *
374  * @param mq queue to remove envelope from
375  * @return NULL if queue is empty (or has no envelope that is not under transmission)
376  */
377 struct GNUNET_MQ_Envelope *
378 GNUNET_MQ_unsent_head (struct GNUNET_MQ_Handle *mq)
379 {
380   struct GNUNET_MQ_Envelope *env;
381
382   env = mq->envelope_head;
383   GNUNET_CONTAINER_DLL_remove (mq->envelope_head,
384                                mq->envelope_tail,
385                                env);
386   mq->queue_length--;
387   env->parent_queue = NULL;
388   return env;
389 }
390
391
392 /**
393  * Function to copy an envelope.  The envelope must not yet
394  * be in any queue or have any options or callbacks set.
395  *
396  * @param env envelope to copy
397  * @return copy of @a env
398  */
399 struct GNUNET_MQ_Envelope *
400 GNUNET_MQ_env_copy (struct GNUNET_MQ_Envelope *env)
401 {
402   GNUNET_assert (NULL == env->next);
403   GNUNET_assert (NULL == env->parent_queue);
404   GNUNET_assert (NULL == env->sent_cb);
405   GNUNET_assert (GNUNET_NO == env->have_custom_options);
406   return GNUNET_MQ_msg_copy (env->mh);
407 }
408
409
410 /**
411  * Send a copy of a message with the given message queue.
412  * Can be called repeatedly on the same envelope.
413  *
414  * @param mq message queue
415  * @param ev the envelope with the message to send.
416  */
417 void
418 GNUNET_MQ_send_copy (struct GNUNET_MQ_Handle *mq,
419                      const struct GNUNET_MQ_Envelope *ev)
420 {
421   struct GNUNET_MQ_Envelope *env;
422   uint16_t msize;
423
424   msize = ntohs (ev->mh->size);
425   env = GNUNET_malloc (sizeof (struct GNUNET_MQ_Envelope) +
426                        msize);
427   env->mh = (struct GNUNET_MessageHeader *) &env[1];
428   env->sent_cb = ev->sent_cb;
429   env->sent_cls = ev->sent_cls;
430   GNUNET_memcpy (&env[1],
431           ev->mh,
432           msize);
433   GNUNET_MQ_send (mq,
434                   env);
435 }
436
437
438 /**
439  * Task run to call the send implementation for the next queued
440  * message, if any.  Only useful for implementing message queues,
441  * results in undefined behavior if not used carefully.
442  *
443  * @param cls message queue to send the next message with
444  */
445 static void
446 impl_send_continue (void *cls)
447 {
448   struct GNUNET_MQ_Handle *mq = cls;
449
450   mq->send_task = NULL;
451   /* call is only valid if we're actually currently sending
452    * a message */
453   if (NULL == mq->envelope_head)
454     return;
455   mq->current_envelope = mq->envelope_head;
456   GNUNET_CONTAINER_DLL_remove (mq->envelope_head,
457                                mq->envelope_tail,
458                                mq->current_envelope);
459
460   LOG (GNUNET_ERROR_TYPE_DEBUG,
461        "sending message of type %u from queue\n",
462        ntohs(mq->current_envelope->mh->type));
463
464   mq->send_impl (mq,
465                  mq->current_envelope->mh,
466                  mq->impl_state);
467 }
468
469
470 /**
471  * Call the send implementation for the next queued message, if any.
472  * Only useful for implementing message queues, results in undefined
473  * behavior if not used carefully.
474  *
475  * @param mq message queue to send the next message with
476  */
477 void
478 GNUNET_MQ_impl_send_continue (struct GNUNET_MQ_Handle *mq)
479 {
480   struct GNUNET_MQ_Envelope *current_envelope;
481   GNUNET_SCHEDULER_TaskCallback cb;
482
483   GNUNET_assert (0 < mq->queue_length);
484   mq->queue_length--;
485   mq->in_flight = GNUNET_NO;
486   current_envelope = mq->current_envelope;
487   current_envelope->parent_queue = NULL;
488   mq->current_envelope = NULL;
489   GNUNET_assert (NULL == mq->send_task);
490   mq->send_task = GNUNET_SCHEDULER_add_now (&impl_send_continue,
491                                             mq);
492   if (NULL != (cb = current_envelope->sent_cb))
493   {
494     current_envelope->sent_cb = NULL;
495     cb (current_envelope->sent_cls);
496   }
497   GNUNET_free (current_envelope);
498 }
499
500
501 /**
502  * Call the send notification for the current message, but do not
503  * try to send the next message until #GNUNET_MQ_impl_send_continue
504  * is called.
505  *
506  * Only useful for implementing message queues, results in undefined
507  * behavior if not used carefully.
508  *
509  * @param mq message queue to send the next message with
510  */
511 void
512 GNUNET_MQ_impl_send_in_flight (struct GNUNET_MQ_Handle *mq)
513 {
514   struct GNUNET_MQ_Envelope *current_envelope;
515   GNUNET_SCHEDULER_TaskCallback cb;
516
517   mq->in_flight = GNUNET_YES;
518   /* call is only valid if we're actually currently sending
519    * a message */
520   current_envelope = mq->current_envelope;
521   GNUNET_assert (NULL != current_envelope);
522   /* can't call cancel from now on anymore */
523   current_envelope->parent_queue = NULL;
524   if (NULL != (cb = current_envelope->sent_cb))
525   {
526     current_envelope->sent_cb = NULL;
527     cb (current_envelope->sent_cls);
528   }
529 }
530
531
532 /**
533  * Create a message queue for the specified handlers.
534  *
535  * @param send function the implements sending messages
536  * @param destroy function that implements destroying the queue
537  * @param cancel function that implements canceling a message
538  * @param impl_state for the queue, passed to 'send' and 'destroy'
539  * @param handlers array of message handlers
540  * @param error_handler handler for read and write errors
541  * @param error_handler_cls closure for @a error_handler
542  * @return a new message queue
543  */
544 struct GNUNET_MQ_Handle *
545 GNUNET_MQ_queue_for_callbacks (GNUNET_MQ_SendImpl send,
546                                GNUNET_MQ_DestroyImpl destroy,
547                                GNUNET_MQ_CancelImpl cancel,
548                                void *impl_state,
549                                const struct GNUNET_MQ_MessageHandler *handlers,
550                                GNUNET_MQ_ErrorHandler error_handler,
551                                void *error_handler_cls)
552 {
553   struct GNUNET_MQ_Handle *mq;
554
555   mq = GNUNET_new (struct GNUNET_MQ_Handle);
556   mq->send_impl = send;
557   mq->destroy_impl = destroy;
558   mq->cancel_impl = cancel;
559   mq->handlers = GNUNET_MQ_copy_handlers (handlers);
560   mq->error_handler = error_handler;
561   mq->error_handler_cls = error_handler_cls;
562   mq->impl_state = impl_state;
563
564   return mq;
565 }
566
567
568 /**
569  * Change the closure argument in all of the `handlers` of the
570  * @a mq.
571  *
572  * @param mq to modify
573  * @param handlers_cls new closure to use
574  */
575 void
576 GNUNET_MQ_set_handlers_closure (struct GNUNET_MQ_Handle *mq,
577                                 void *handlers_cls)
578 {
579   if (NULL == mq->handlers)
580     return;
581   for (unsigned int i=0;NULL != mq->handlers[i].cb; i++)
582     mq->handlers[i].cls = handlers_cls;
583 }
584
585
586 /**
587  * Get the message that should currently be sent.
588  * Fails if there is no current message.
589  * Only useful for implementing message queues,
590  * results in undefined behavior if not used carefully.
591  *
592  * @param mq message queue with the current message
593  * @return message to send, never NULL
594  */
595 const struct GNUNET_MessageHeader *
596 GNUNET_MQ_impl_current (struct GNUNET_MQ_Handle *mq)
597 {
598   GNUNET_assert (NULL != mq->current_envelope);
599   GNUNET_assert (NULL != mq->current_envelope->mh);
600   return mq->current_envelope->mh;
601 }
602
603
604 /**
605  * Get the implementation state associated with the
606  * message queue.
607  *
608  * While the GNUNET_MQ_Impl* callbacks receive the
609  * implementation state, continuations that are scheduled
610  * by the implementation function often only have one closure
611  * argument, with this function it is possible to get at the
612  * implementation state when only passing the GNUNET_MQ_Handle
613  * as closure.
614  *
615  * @param mq message queue with the current message
616  * @return message to send, never NULL
617  */
618 void *
619 GNUNET_MQ_impl_state (struct GNUNET_MQ_Handle *mq)
620 {
621   return mq->impl_state;
622 }
623
624
625 struct GNUNET_MQ_Envelope *
626 GNUNET_MQ_msg_ (struct GNUNET_MessageHeader **mhp,
627                 uint16_t size,
628                 uint16_t type)
629 {
630   struct GNUNET_MQ_Envelope *ev;
631
632   ev = GNUNET_malloc (size + sizeof (struct GNUNET_MQ_Envelope));
633   ev->mh = (struct GNUNET_MessageHeader *) &ev[1];
634   ev->mh->size = htons (size);
635   ev->mh->type = htons (type);
636   if (NULL != mhp)
637     *mhp = ev->mh;
638   return ev;
639 }
640
641
642 /**
643  * Create a new envelope by copying an existing message.
644  *
645  * @param hdr header of the message to copy
646  * @return envelope containing @a hdr
647  */
648 struct GNUNET_MQ_Envelope *
649 GNUNET_MQ_msg_copy (const struct GNUNET_MessageHeader *hdr)
650 {
651   struct GNUNET_MQ_Envelope *mqm;
652   uint16_t size = ntohs (hdr->size);
653
654   mqm = GNUNET_malloc (sizeof (*mqm) + size);
655   mqm->mh = (struct GNUNET_MessageHeader *) &mqm[1];
656   GNUNET_memcpy (mqm->mh,
657           hdr,
658           size);
659   return mqm;
660 }
661
662
663 /**
664  * Implementation of the #GNUNET_MQ_msg_nested_mh macro.
665  *
666  * @param mhp pointer to the message header pointer that will be changed to allocate at
667  *        the newly allocated space for the message.
668  * @param base_size size of the data before the nested message
669  * @param type type of the message in the envelope
670  * @param nested_mh the message to append to the message after base_size
671  */
672 struct GNUNET_MQ_Envelope *
673 GNUNET_MQ_msg_nested_mh_ (struct GNUNET_MessageHeader **mhp,
674                           uint16_t base_size,
675                           uint16_t type,
676                           const struct GNUNET_MessageHeader *nested_mh)
677 {
678   struct GNUNET_MQ_Envelope *mqm;
679   uint16_t size;
680
681   if (NULL == nested_mh)
682     return GNUNET_MQ_msg_ (mhp, base_size, type);
683
684   size = base_size + ntohs (nested_mh->size);
685
686   /* check for uint16_t overflow */
687   if (size < base_size)
688     return NULL;
689
690   mqm = GNUNET_MQ_msg_ (mhp, size, type);
691   GNUNET_memcpy ((char *) mqm->mh + base_size,
692                  nested_mh,
693                  ntohs (nested_mh->size));
694
695   return mqm;
696 }
697
698
699 /**
700  * Associate the assoc_data in mq with a unique request id.
701  *
702  * @param mq message queue, id will be unique for the queue
703  * @param assoc_data to associate
704  */
705 uint32_t
706 GNUNET_MQ_assoc_add (struct GNUNET_MQ_Handle *mq,
707                      void *assoc_data)
708 {
709   uint32_t id;
710
711   if (NULL == mq->assoc_map)
712   {
713     mq->assoc_map = GNUNET_CONTAINER_multihashmap32_create (8);
714     mq->assoc_id = 1;
715   }
716   id = mq->assoc_id++;
717   GNUNET_assert (GNUNET_OK ==
718                  GNUNET_CONTAINER_multihashmap32_put (mq->assoc_map,
719                                                       id,
720                                                       assoc_data,
721                                                       GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
722   return id;
723 }
724
725
726 /**
727  * Get the data associated with a @a request_id in a queue
728  *
729  * @param mq the message queue with the association
730  * @param request_id the request id we are interested in
731  * @return the associated data
732  */
733 void *
734 GNUNET_MQ_assoc_get (struct GNUNET_MQ_Handle *mq,
735                      uint32_t request_id)
736 {
737   if (NULL == mq->assoc_map)
738     return NULL;
739   return GNUNET_CONTAINER_multihashmap32_get (mq->assoc_map,
740                                               request_id);
741 }
742
743
744 /**
745  * Remove the association for a @a request_id
746  *
747  * @param mq the message queue with the association
748  * @param request_id the request id we want to remove
749  * @return the associated data
750  */
751 void *
752 GNUNET_MQ_assoc_remove (struct GNUNET_MQ_Handle *mq,
753                         uint32_t request_id)
754 {
755   void *val;
756
757   if (NULL == mq->assoc_map)
758     return NULL;
759   val = GNUNET_CONTAINER_multihashmap32_get (mq->assoc_map,
760                                              request_id);
761   GNUNET_CONTAINER_multihashmap32_remove_all (mq->assoc_map,
762                                               request_id);
763   return val;
764 }
765
766
767 /**
768  * Call a callback once the envelope has been sent, that is,
769  * sending it can not be canceled anymore.
770  * There can be only one notify sent callback per envelope.
771  *
772  * @param ev message to call the notify callback for
773  * @param cb the notify callback
774  * @param cb_cls closure for the callback
775  */
776 void
777 GNUNET_MQ_notify_sent (struct GNUNET_MQ_Envelope *ev,
778                        GNUNET_SCHEDULER_TaskCallback cb,
779                        void *cb_cls)
780 {
781   /* allow setting *OR* clearing callback */
782   GNUNET_assert ( (NULL == ev->sent_cb) ||
783                   (NULL == cb) );
784   ev->sent_cb = cb;
785   ev->sent_cls = cb_cls;
786 }
787
788
789 /**
790  * Handle we return for callbacks registered to be
791  * notified when #GNUNET_MQ_destroy() is called on a queue.
792  */
793 struct GNUNET_MQ_DestroyNotificationHandle
794 {
795   /**
796    * Kept in a DLL.
797    */
798   struct GNUNET_MQ_DestroyNotificationHandle *prev;
799
800   /**
801    * Kept in a DLL.
802    */
803   struct GNUNET_MQ_DestroyNotificationHandle *next;
804
805   /**
806    * Queue to notify about.
807    */
808   struct GNUNET_MQ_Handle *mq;
809
810   /**
811    * Function to call.
812    */
813   GNUNET_SCHEDULER_TaskCallback cb;
814
815   /**
816    * Closure for @e cb.
817    */
818   void *cb_cls;
819 };
820
821
822 /**
823  * Destroy the message queue.
824  *
825  * @param mq message queue to destroy
826  */
827 void
828 GNUNET_MQ_destroy (struct GNUNET_MQ_Handle *mq)
829 {
830   struct GNUNET_MQ_DestroyNotificationHandle *dnh;
831
832   if (NULL != mq->destroy_impl)
833   {
834     mq->destroy_impl (mq, mq->impl_state);
835   }
836   if (NULL != mq->send_task)
837   {
838     GNUNET_SCHEDULER_cancel (mq->send_task);
839     mq->send_task = NULL;
840   }
841   while (NULL != mq->envelope_head)
842   {
843     struct GNUNET_MQ_Envelope *ev;
844
845     ev = mq->envelope_head;
846     ev->parent_queue = NULL;
847     GNUNET_CONTAINER_DLL_remove (mq->envelope_head,
848                                  mq->envelope_tail,
849                                  ev);
850     GNUNET_assert (0 < mq->queue_length);
851     mq->queue_length--;
852     LOG (GNUNET_ERROR_TYPE_DEBUG,
853          "MQ destroy drops message of type %u\n",
854          ntohs (ev->mh->type));
855     GNUNET_MQ_discard (ev);
856   }
857   if (NULL != mq->current_envelope)
858   {
859     /* we can only discard envelopes that
860      * are not queued! */
861     mq->current_envelope->parent_queue = NULL;
862     LOG (GNUNET_ERROR_TYPE_DEBUG,
863          "MQ destroy drops current message of type %u\n",
864          ntohs (mq->current_envelope->mh->type));
865     GNUNET_MQ_discard (mq->current_envelope);
866     mq->current_envelope = NULL;
867     GNUNET_assert (0 < mq->queue_length);
868     mq->queue_length--;
869   }
870   GNUNET_assert (0 == mq->queue_length);
871   while (NULL != (dnh = mq->dnh_head))
872   {
873     dnh->cb (dnh->cb_cls);
874     GNUNET_MQ_destroy_notify_cancel (dnh);
875   }
876   if (NULL != mq->assoc_map)
877   {
878     GNUNET_CONTAINER_multihashmap32_destroy (mq->assoc_map);
879     mq->assoc_map = NULL;
880   }
881   GNUNET_free_non_null (mq->handlers);
882   GNUNET_free (mq);
883 }
884
885
886 const struct GNUNET_MessageHeader *
887 GNUNET_MQ_extract_nested_mh_ (const struct GNUNET_MessageHeader *mh,
888                               uint16_t base_size)
889 {
890   uint16_t whole_size;
891   uint16_t nested_size;
892   const struct GNUNET_MessageHeader *nested_msg;
893
894   whole_size = ntohs (mh->size);
895   GNUNET_assert (whole_size >= base_size);
896   nested_size = whole_size - base_size;
897   if (0 == nested_size)
898     return NULL;
899   if (nested_size < sizeof (struct GNUNET_MessageHeader))
900   {
901     GNUNET_break_op (0);
902     return NULL;
903   }
904   nested_msg = (const struct GNUNET_MessageHeader *) ((char *) mh + base_size);
905   if (ntohs (nested_msg->size) != nested_size)
906   {
907     GNUNET_break_op (0);
908     return NULL;
909   }
910   return nested_msg;
911 }
912
913
914 /**
915  * Cancel sending the message. Message must have been sent with
916  * #GNUNET_MQ_send before.  May not be called after the notify sent
917  * callback has been called
918  *
919  * @param ev queued envelope to cancel
920  */
921 void
922 GNUNET_MQ_send_cancel (struct GNUNET_MQ_Envelope *ev)
923 {
924   struct GNUNET_MQ_Handle *mq = ev->parent_queue;
925
926   GNUNET_assert (NULL != mq);
927   GNUNET_assert (NULL != mq->cancel_impl);
928
929   mq->evacuate_called = GNUNET_NO;
930
931   if (mq->current_envelope == ev)
932   {
933     /* complex case, we already started with transmitting
934        the message using the callbacks. */
935     GNUNET_assert (0 < mq->queue_length);
936     mq->queue_length--;
937     mq->cancel_impl (mq,
938                      mq->impl_state);
939     /* continue sending the next message, if any */
940     mq->current_envelope = mq->envelope_head;
941     if (NULL != mq->current_envelope)
942     {
943       GNUNET_CONTAINER_DLL_remove (mq->envelope_head,
944                                    mq->envelope_tail,
945                                    mq->current_envelope);
946
947       LOG (GNUNET_ERROR_TYPE_DEBUG,
948            "sending canceled message of type %u queue\n",
949            ntohs(ev->mh->type));
950
951       mq->send_impl (mq,
952                      mq->current_envelope->mh,
953                      mq->impl_state);
954     }
955   }
956   else
957   {
958     /* simple case, message is still waiting in the queue */
959     GNUNET_CONTAINER_DLL_remove (mq->envelope_head,
960                                  mq->envelope_tail,
961                                  ev);
962     GNUNET_assert (0 < mq->queue_length);
963     mq->queue_length--;
964   }
965
966   if (GNUNET_YES != mq->evacuate_called)
967   {
968     ev->parent_queue = NULL;
969     ev->mh = NULL;
970     /* also frees ev */
971     GNUNET_free (ev);
972   }
973 }
974
975
976 /**
977  * Function to obtain the current envelope
978  * from within #GNUNET_MQ_SendImpl implementations.
979  *
980  * @param mq message queue to interrogate
981  * @return the current envelope
982  */
983 struct GNUNET_MQ_Envelope *
984 GNUNET_MQ_get_current_envelope (struct GNUNET_MQ_Handle *mq)
985 {
986   return mq->current_envelope;
987 }
988
989
990 /**
991  * Function to obtain the last envelope in the queue.
992  *
993  * @param mq message queue to interrogate
994  * @return the last envelope in the queue
995  */
996 struct GNUNET_MQ_Envelope *
997 GNUNET_MQ_get_last_envelope (struct GNUNET_MQ_Handle *mq)
998 {
999   if (NULL != mq->envelope_tail)
1000     return mq->envelope_tail;
1001
1002   return mq->current_envelope;
1003 }
1004
1005
1006 /**
1007  * Set application-specific options for this envelope.
1008  * Overrides the options set for the queue with
1009  * #GNUNET_MQ_set_options() for this message only.
1010  *
1011  * @param env message to set options for
1012  * @param flags flags to use (meaning is queue-specific)
1013  * @param extra additional buffer for further data (also queue-specific)
1014  */
1015 void
1016 GNUNET_MQ_env_set_options (struct GNUNET_MQ_Envelope *env,
1017                            uint64_t flags,
1018                            const void *extra)
1019 {
1020   env->flags = flags;
1021   env->extra = extra;
1022   env->have_custom_options = GNUNET_YES;
1023 }
1024
1025
1026 /**
1027  * Get application-specific options for this envelope.
1028  *
1029  * @param env message to set options for
1030  * @param[out] flags set to flags to use (meaning is queue-specific)
1031  * @return extra additional buffer for further data (also queue-specific)
1032  */
1033 const void *
1034 GNUNET_MQ_env_get_options (struct GNUNET_MQ_Envelope *env,
1035                            uint64_t *flags)
1036 {
1037   struct GNUNET_MQ_Handle *mq = env->parent_queue;
1038
1039   if (GNUNET_YES == env->have_custom_options)
1040   {
1041     *flags = env->flags;
1042     return env->extra;
1043   }
1044   if (NULL == mq)
1045   {
1046     *flags = 0;
1047     return NULL;
1048   }
1049   *flags = mq->default_flags;
1050   return mq->default_extra;
1051 }
1052
1053
1054 /**
1055  * Set application-specific options for this queue.
1056  *
1057  * @param mq message queue to set options for
1058  * @param flags flags to use (meaning is queue-specific)
1059  * @param extra additional buffer for further data (also queue-specific)
1060  */
1061 void
1062 GNUNET_MQ_set_options (struct GNUNET_MQ_Handle *mq,
1063                        uint64_t flags,
1064                        const void *extra)
1065 {
1066   mq->default_flags = flags;
1067   mq->default_extra = extra;
1068 }
1069
1070
1071 /**
1072  * Obtain message contained in envelope.
1073  *
1074  * @param env the envelope
1075  * @return message contained in the envelope
1076  */
1077 const struct GNUNET_MessageHeader *
1078 GNUNET_MQ_env_get_msg (const struct GNUNET_MQ_Envelope *env)
1079 {
1080   return env->mh;
1081 }
1082
1083
1084 /**
1085  * Return next envelope in queue.
1086  *
1087  * @param env a queued envelope
1088  * @return next one, or NULL
1089  */
1090 const struct GNUNET_MQ_Envelope *
1091 GNUNET_MQ_env_next (const struct GNUNET_MQ_Envelope *env)
1092 {
1093   return env->next;
1094 }
1095
1096
1097 /**
1098  * Register function to be called whenever @a mq is being
1099  * destroyed.
1100  *
1101  * @param mq message queue to watch
1102  * @param cb function to call on @a mq destruction
1103  * @param cb_cls closure for @a cb
1104  * @return handle for #GNUNET_MQ_destroy_notify_cancel().
1105  */
1106 struct GNUNET_MQ_DestroyNotificationHandle *
1107 GNUNET_MQ_destroy_notify (struct GNUNET_MQ_Handle *mq,
1108                           GNUNET_SCHEDULER_TaskCallback cb,
1109                           void *cb_cls)
1110 {
1111   struct GNUNET_MQ_DestroyNotificationHandle *dnh;
1112
1113   dnh = GNUNET_new (struct GNUNET_MQ_DestroyNotificationHandle);
1114   dnh->mq = mq;
1115   dnh->cb = cb;
1116   dnh->cb_cls = cb_cls;
1117   GNUNET_CONTAINER_DLL_insert (mq->dnh_head,
1118                                mq->dnh_tail,
1119                                dnh);
1120   return dnh;
1121 }
1122
1123
1124 /**
1125  * Cancel registration from #GNUNET_MQ_destroy_notify().
1126  *
1127  * @param dnh handle for registration to cancel
1128  */
1129 void
1130 GNUNET_MQ_destroy_notify_cancel (struct GNUNET_MQ_DestroyNotificationHandle *dnh)
1131 {
1132   struct GNUNET_MQ_Handle *mq = dnh->mq;
1133
1134   GNUNET_CONTAINER_DLL_remove (mq->dnh_head,
1135                                mq->dnh_tail,
1136                                dnh);
1137   GNUNET_free (dnh);
1138 }
1139
1140
1141 /**
1142  * Insert @a env into the envelope DLL starting at @a env_head
1143  * Note that @a env must not be in any MQ while this function
1144  * is used with DLLs defined outside of the MQ module.  This
1145  * is just in case some application needs to also manage a
1146  * FIFO of envelopes independent of MQ itself and wants to
1147  * re-use the pointers internal to @a env.  Use with caution.
1148  *
1149  * @param[in|out] env_head of envelope DLL
1150  * @param[in|out] env_tail tail of envelope DLL
1151  * @param[in|out] env element to insert at the tail
1152  */
1153 void
1154 GNUNET_MQ_dll_insert_tail (struct GNUNET_MQ_Envelope **env_head,
1155                            struct GNUNET_MQ_Envelope **env_tail,
1156                            struct GNUNET_MQ_Envelope *env)
1157 {
1158   GNUNET_CONTAINER_DLL_insert_tail (*env_head,
1159                                     *env_tail,
1160                                     env);
1161 }
1162
1163
1164 /**
1165  * Remove @a env from the envelope DLL starting at @a env_head.
1166  * Note that @a env must not be in any MQ while this function
1167  * is used with DLLs defined outside of the MQ module. This
1168  * is just in case some application needs to also manage a
1169  * FIFO of envelopes independent of MQ itself and wants to
1170  * re-use the pointers internal to @a env.  Use with caution.
1171  *
1172  * @param[in|out] env_head of envelope DLL
1173  * @param[in|out] env_tail tail of envelope DLL
1174  * @param[in|out] env element to remove from the DLL
1175  */
1176 void
1177 GNUNET_MQ_dll_remove (struct GNUNET_MQ_Envelope **env_head,
1178                       struct GNUNET_MQ_Envelope **env_tail,
1179                       struct GNUNET_MQ_Envelope *env)
1180 {
1181   GNUNET_CONTAINER_DLL_remove (*env_head,
1182                                *env_tail,
1183                                env);
1184 }
1185
1186
1187 /**
1188  * Copy an array of handlers.
1189  *
1190  * Useful if the array has been delared in local memory and needs to be
1191  * persisted for future use.
1192  *
1193  * @param handlers Array of handlers to be copied. Can be NULL (nothing done).
1194  * @return A newly allocated array of handlers.
1195  *         Needs to be freed with #GNUNET_free.
1196  */
1197 struct GNUNET_MQ_MessageHandler *
1198 GNUNET_MQ_copy_handlers (const struct GNUNET_MQ_MessageHandler *handlers)
1199 {
1200   struct GNUNET_MQ_MessageHandler *copy;
1201   unsigned int count;
1202
1203   if (NULL == handlers)
1204     return NULL;
1205
1206   count = GNUNET_MQ_count_handlers (handlers);
1207   copy = GNUNET_new_array (count + 1,
1208                            struct GNUNET_MQ_MessageHandler);
1209   GNUNET_memcpy (copy,
1210                  handlers,
1211                  count * sizeof (struct GNUNET_MQ_MessageHandler));
1212   return copy;
1213 }
1214
1215
1216 /**
1217  * Copy an array of handlers, appending AGPL handler.
1218  *
1219  * Useful if the array has been delared in local memory and needs to be
1220  * persisted for future use.
1221  *
1222  * @param handlers Array of handlers to be copied. Can be NULL (nothing done).
1223  * @param agpl_handler function to call for AGPL handling
1224  * @param agpl_cls closure for @a agpl_handler
1225  * @return A newly allocated array of handlers.
1226  *         Needs to be freed with #GNUNET_free.
1227  */
1228 struct GNUNET_MQ_MessageHandler *
1229 GNUNET_MQ_copy_handlers2 (const struct GNUNET_MQ_MessageHandler *handlers,
1230                           GNUNET_MQ_MessageCallback agpl_handler,
1231                           void *agpl_cls)
1232 {
1233   struct GNUNET_MQ_MessageHandler *copy;
1234   unsigned int count;
1235
1236   if (NULL == handlers)
1237     return NULL;
1238   count = GNUNET_MQ_count_handlers (handlers);
1239   copy = GNUNET_new_array (count + 2,
1240                            struct GNUNET_MQ_MessageHandler);
1241   GNUNET_memcpy (copy,
1242                  handlers,
1243                  count * sizeof (struct GNUNET_MQ_MessageHandler));
1244   copy[count].mv = NULL;
1245   copy[count].cb = agpl_handler;
1246   copy[count].cls = agpl_cls;
1247   copy[count].type = GNUNET_MESSAGE_TYPE_REQUEST_AGPL;
1248   copy[count].expected_size = sizeof (struct GNUNET_MessageHeader);
1249   return copy;
1250 }
1251
1252
1253 /**
1254  * Count the handlers in a handler array.
1255  *
1256  * @param handlers Array of handlers to be counted.
1257  * @return The number of handlers in the array.
1258  */
1259 unsigned int
1260 GNUNET_MQ_count_handlers (const struct GNUNET_MQ_MessageHandler *handlers)
1261 {
1262   unsigned int i;
1263
1264   if (NULL == handlers)
1265     return 0;
1266
1267   for (i=0; NULL != handlers[i].cb; i++) ;
1268
1269   return i;
1270 }
1271
1272
1273
1274 /* end of mq.c */