ack florian
[oweals/gnunet.git] / src / util / mq.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2012-2014 GNUnet e.V.
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18      Boston, MA 02110-1301, USA.
19 */
20
21 /**
22  * @author Florian Dold
23  * @file util/mq.c
24  * @brief general purpose request queue
25  */
26 #include "platform.h"
27 #include "gnunet_util_lib.h"
28
29 #define LOG(kind,...) GNUNET_log_from (kind, "mq",__VA_ARGS__)
30
31
32 struct GNUNET_MQ_Envelope
33 {
34   /**
35    * Messages are stored in a linked list.
36    * Each queue has its own list of envelopes.
37    */
38   struct GNUNET_MQ_Envelope *next;
39
40   /**
41    * Messages are stored in a linked list
42    * Each queue has its own list of envelopes.
43    */
44   struct GNUNET_MQ_Envelope *prev;
45
46   /**
47    * Actual allocated message header,
48    * usually points to the end of the containing GNUNET_MQ_Envelope
49    */
50   struct GNUNET_MessageHeader *mh;
51
52   /**
53    * Queue the message is queued in, NULL if message is not queued.
54    */
55   struct GNUNET_MQ_Handle *parent_queue;
56
57   /**
58    * Called after the message was sent irrevocably.
59    */
60   GNUNET_MQ_NotifyCallback sent_cb;
61
62   /**
63    * Closure for @e send_cb
64    */
65   void *sent_cls;
66
67   /**
68    * Flags that were set for this envelope by
69    * #GNUNET_MQ_env_set_options().   Only valid if
70    * @e have_custom_options is set.
71    */
72   uint64_t flags;
73
74   /**
75    * Additional options buffer set for this envelope by
76    * #GNUNET_MQ_env_set_options().  Only valid if
77    * @e have_custom_options is set.
78    */
79   const void *extra;
80
81   /**
82    * Did the application call #GNUNET_MQ_env_set_options()?
83    */
84   int have_custom_options;
85
86 };
87
88
89 /**
90  * Handle to a message queue.
91  */
92 struct GNUNET_MQ_Handle
93 {
94   /**
95    * Handlers array, or NULL if the queue should not receive messages
96    */
97   struct GNUNET_MQ_MessageHandler *handlers;
98
99   /**
100    * Actual implementation of message sending,
101    * called when a message is added
102    */
103   GNUNET_MQ_SendImpl send_impl;
104
105   /**
106    * Implementation-dependent queue destruction function
107    */
108   GNUNET_MQ_DestroyImpl destroy_impl;
109
110   /**
111    * Implementation-dependent send cancel function
112    */
113   GNUNET_MQ_CancelImpl cancel_impl;
114
115   /**
116    * Implementation-specific state
117    */
118   void *impl_state;
119
120   /**
121    * Callback will be called when an error occurs.
122    */
123   GNUNET_MQ_ErrorHandler error_handler;
124
125   /**
126    * Closure for the error handler.
127    */
128   void *error_handler_cls;
129
130   /**
131    * Linked list of messages pending to be sent
132    */
133   struct GNUNET_MQ_Envelope *envelope_head;
134
135   /**
136    * Linked list of messages pending to be sent
137    */
138   struct GNUNET_MQ_Envelope *envelope_tail;
139
140   /**
141    * Message that is currently scheduled to be
142    * sent. Not the head of the message queue, as the implementation
143    * needs to know if sending has been already scheduled or not.
144    */
145   struct GNUNET_MQ_Envelope *current_envelope;
146
147   /**
148    * Map of associations, lazily allocated
149    */
150   struct GNUNET_CONTAINER_MultiHashMap32 *assoc_map;
151
152   /**
153    * Task scheduled during #GNUNET_MQ_impl_send_continue.
154    */
155   struct GNUNET_SCHEDULER_Task *continue_task;
156
157   /**
158    * Additional options buffer set for this queue by
159    * #GNUNET_MQ_set_options().  Default is 0.
160    */
161   const void *default_extra;
162
163   /**
164    * Flags that were set for this queue by
165    * #GNUNET_MQ_set_options().   Default is 0.
166    */
167   uint64_t default_flags;
168
169   /**
170    * Next id that should be used for the @e assoc_map,
171    * initialized lazily to a random value together with
172    * @e assoc_map
173    */
174   uint32_t assoc_id;
175
176   /**
177    * Number of entries we have in the envelope-DLL.
178    */
179   unsigned int queue_length;
180 };
181
182
183 /**
184  * Implementation-specific state for connection to
185  * client (MQ for server).
186  */
187 struct ServerClientSocketState
188 {
189   /**
190    * Handle of the client that connected to the server.
191    */
192   struct GNUNET_SERVER_Client *client;
193
194   /**
195    * Active transmission request to the client.
196    */
197   struct GNUNET_SERVER_TransmitHandle *th;
198 };
199
200
201 /**
202  * Implementation-specific state for connection to
203  * service (MQ for clients).
204  */
205 struct ClientConnectionState
206 {
207   /**
208    * Did we call receive alread alreadyy?
209    */
210   int receive_active;
211
212   /**
213    * Do we also want to receive?
214    */
215   int receive_requested;
216
217   /**
218    * Connection to the service.
219    */
220   struct GNUNET_CLIENT_Connection *connection;
221
222   /**
223    * Active transmission request (or NULL).
224    */
225   struct GNUNET_CLIENT_TransmitHandle *th;
226 };
227
228
229 /**
230  * Call the message message handler that was registered
231  * for the type of the given message in the given message queue.
232  *
233  * This function is indended to be used for the implementation
234  * of message queues.
235  *
236  * @param mq message queue with the handlers
237  * @param mh message to dispatch
238  */
239 void
240 GNUNET_MQ_inject_message (struct GNUNET_MQ_Handle *mq,
241                           const struct GNUNET_MessageHeader *mh)
242 {
243   const struct GNUNET_MQ_MessageHandler *handler;
244   int handled = GNUNET_NO;
245   uint16_t ms = ntohs (mh->size);
246
247   if (NULL == mq->handlers)
248     goto done;
249   for (handler = mq->handlers; NULL != handler->cb; handler++)
250   {
251     if (handler->type == ntohs (mh->type))
252     {
253       handled = GNUNET_YES;
254       if ( (handler->expected_size > ms) ||
255            ( (handler->expected_size != ms) &&
256              (NULL == handler->mv) ) )
257       {
258         /* Too small, or not an exact size and
259            no 'mv' handler to check rest */
260         GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
261                     "Received malformed message of type %u\n",
262                     (unsigned int) handler->type);
263         GNUNET_MQ_inject_error (mq,
264                                 GNUNET_MQ_ERROR_MALFORMED);
265         break;
266       }
267       if ( (NULL == handler->mv) ||
268            (GNUNET_OK ==
269             handler->mv (handler->cls, mh)) )
270       {
271         /* message well-formed, pass to handler */
272         handler->cb (handler->cls, mh);
273       }
274       else
275       {
276         /* Message rejected by check routine */
277         GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
278                     "Received malformed message of type %u\n",
279                     (unsigned int) handler->type);
280         GNUNET_MQ_inject_error (mq,
281                                 GNUNET_MQ_ERROR_MALFORMED);
282       }
283       break;
284     }
285   }
286  done:
287   if (GNUNET_NO == handled)
288     LOG (GNUNET_ERROR_TYPE_DEBUG,
289          "No handler for message of type %d\n",
290          ntohs (mh->type));
291 }
292
293
294 /**
295  * Call the error handler of a message queue with the given
296  * error code.  If there is no error handler, log a warning.
297  *
298  * This function is intended to be used by the implementation
299  * of message queues.
300  *
301  * @param mq message queue
302  * @param error the error type
303  */
304 void
305 GNUNET_MQ_inject_error (struct GNUNET_MQ_Handle *mq,
306                         enum GNUNET_MQ_Error error)
307 {
308   if (NULL == mq->error_handler)
309   {
310     LOG (GNUNET_ERROR_TYPE_WARNING,
311          "Got error %d, but no handler installed\n",
312          (int) error);
313     return;
314   }
315   mq->error_handler (mq->error_handler_cls,
316                      error);
317 }
318
319
320 /**
321  * Discard the message queue message, free all
322  * allocated resources. Must be called in the event
323  * that a message is created but should not actually be sent.
324  *
325  * @param mqm the message to discard
326  */
327 void
328 GNUNET_MQ_discard (struct GNUNET_MQ_Envelope *mqm)
329 {
330   GNUNET_assert (NULL == mqm->parent_queue);
331   GNUNET_free (mqm);
332 }
333
334
335 /**
336  * Obtain the current length of the message queue.
337  *
338  * @param mq queue to inspect
339  * @return number of queued, non-transmitted messages
340  */
341 unsigned int
342 GNUNET_MQ_get_length (struct GNUNET_MQ_Handle *mq)
343 {
344   return mq->queue_length;
345 }
346
347
348 /**
349  * Send a message with the given message queue.
350  * May only be called once per message.
351  *
352  * @param mq message queue
353  * @param ev the envelope with the message to send.
354  */
355 void
356 GNUNET_MQ_send (struct GNUNET_MQ_Handle *mq,
357                 struct GNUNET_MQ_Envelope *ev)
358 {
359   GNUNET_assert (NULL != mq);
360   GNUNET_assert (NULL == ev->parent_queue);
361
362   ev->parent_queue = mq;
363   /* is the implementation busy? queue it! */
364   if (NULL != mq->current_envelope)
365   {
366     GNUNET_CONTAINER_DLL_insert_tail (mq->envelope_head,
367                                       mq->envelope_tail,
368                                       ev);
369     mq->queue_length++;
370     return;
371   }
372   mq->current_envelope = ev;
373   mq->send_impl (mq, ev->mh, mq->impl_state);
374 }
375
376
377 /**
378  * Send a copy of a message with the given message queue.
379  * Can be called repeatedly on the same envelope.
380  *
381  * @param mq message queue
382  * @param ev the envelope with the message to send.
383  */
384 void
385 GNUNET_MQ_send_copy (struct GNUNET_MQ_Handle *mq,
386                      const struct GNUNET_MQ_Envelope *ev)
387 {
388   struct GNUNET_MQ_Envelope *env;
389   uint16_t msize;
390
391   msize = ntohs (ev->mh->size);
392   env = GNUNET_malloc (sizeof (struct GNUNET_MQ_Envelope) +
393                        msize);
394   env->mh = (struct GNUNET_MessageHeader *) &env[1];
395   env->sent_cb = ev->sent_cb;
396   env->sent_cls = ev->sent_cls;
397   GNUNET_memcpy (&env[1],
398           ev->mh,
399           msize);
400   GNUNET_MQ_send (mq,
401                   env);
402 }
403
404
405
406 /**
407  * Task run to call the send implementation for the next queued
408  * message, if any.  Only useful for implementing message queues,
409  * results in undefined behavior if not used carefully.
410  *
411  * @param cls message queue to send the next message with
412  */
413 static void
414 impl_send_continue (void *cls)
415 {
416   struct GNUNET_MQ_Handle *mq = cls;
417   struct GNUNET_MQ_Envelope *current_envelope;
418
419   mq->continue_task = NULL;
420   /* call is only valid if we're actually currently sending
421    * a message */
422   current_envelope = mq->current_envelope;
423   GNUNET_assert (NULL != current_envelope);
424   current_envelope->parent_queue = NULL;
425   if (NULL == mq->envelope_head)
426   {
427     mq->current_envelope = NULL;
428   }
429   else
430   {
431     mq->current_envelope = mq->envelope_head;
432     GNUNET_CONTAINER_DLL_remove (mq->envelope_head,
433                                  mq->envelope_tail,
434                                  mq->current_envelope);
435     mq->queue_length--;
436     mq->send_impl (mq,
437                    mq->current_envelope->mh,
438                    mq->impl_state);
439   }
440   if (NULL != current_envelope->sent_cb)
441     current_envelope->sent_cb (current_envelope->sent_cls);
442   GNUNET_free (current_envelope);
443 }
444
445
446 /**
447  * Call the send implementation for the next queued message, if any.
448  * Only useful for implementing message queues, results in undefined
449  * behavior if not used carefully.
450  *
451  * @param mq message queue to send the next message with
452  */
453 void
454 GNUNET_MQ_impl_send_continue (struct GNUNET_MQ_Handle *mq)
455 {
456   GNUNET_assert (NULL == mq->continue_task);
457   mq->continue_task = GNUNET_SCHEDULER_add_now (&impl_send_continue,
458                                                 mq);
459 }
460
461
462 /**
463  * Create a message queue for the specified handlers.
464  *
465  * @param send function the implements sending messages
466  * @param destroy function that implements destroying the queue
467  * @param cancel function that implements canceling a message
468  * @param impl_state for the queue, passed to 'send' and 'destroy'
469  * @param handlers array of message handlers
470  * @param error_handler handler for read and write errors
471  * @param error_handler_cls closure for @a error_handler
472  * @return a new message queue
473  */
474 struct GNUNET_MQ_Handle *
475 GNUNET_MQ_queue_for_callbacks (GNUNET_MQ_SendImpl send,
476                                GNUNET_MQ_DestroyImpl destroy,
477                                GNUNET_MQ_CancelImpl cancel,
478                                void *impl_state,
479                                const struct GNUNET_MQ_MessageHandler *handlers,
480                                GNUNET_MQ_ErrorHandler error_handler,
481                                void *error_handler_cls)
482 {
483   struct GNUNET_MQ_Handle *mq;
484   unsigned int i;
485
486   mq = GNUNET_new (struct GNUNET_MQ_Handle);
487   mq->send_impl = send;
488   mq->destroy_impl = destroy;
489   mq->cancel_impl = cancel;
490   if (NULL != handlers)
491   {
492     for (i=0;NULL != handlers[i].cb; i++) ;
493     mq->handlers = GNUNET_new_array (i + 1,
494                                      struct GNUNET_MQ_MessageHandler);
495     GNUNET_memcpy (mq->handlers,
496             handlers,
497             i * sizeof (struct GNUNET_MQ_MessageHandler));
498   }
499   mq->error_handler = error_handler;
500   mq->error_handler_cls = error_handler_cls;
501   mq->impl_state = impl_state;
502
503   return mq;
504 }
505
506
507 /**
508  * Change the closure argument in all of the `handlers` of the
509  * @a mq.
510  *
511  * @param mq to modify
512  * @param handlers_cls new closure to use
513  */
514 void
515 GNUNET_MQ_set_handlers_closure (struct GNUNET_MQ_Handle *mq,
516                                 void *handlers_cls)
517 {
518   unsigned int i;
519
520   if (NULL == mq->handlers)
521     return;
522   for (i=0;NULL != mq->handlers[i].cb; i++)
523     mq->handlers[i].cls = handlers_cls;
524 }
525
526
527 /**
528  * Get the message that should currently be sent.
529  * Fails if there is no current message.
530  * Only useful for implementing message queues,
531  * results in undefined behavior if not used carefully.
532  *
533  * @param mq message queue with the current message
534  * @return message to send, never NULL
535  */
536 const struct GNUNET_MessageHeader *
537 GNUNET_MQ_impl_current (struct GNUNET_MQ_Handle *mq)
538 {
539   if (NULL == mq->current_envelope)
540     GNUNET_assert (0);
541   if (NULL == mq->current_envelope->mh)
542     GNUNET_assert (0);
543   return mq->current_envelope->mh;
544 }
545
546
547 /**
548  * Get the implementation state associated with the
549  * message queue.
550  *
551  * While the GNUNET_MQ_Impl* callbacks receive the
552  * implementation state, continuations that are scheduled
553  * by the implementation function often only have one closure
554  * argument, with this function it is possible to get at the
555  * implementation state when only passing the GNUNET_MQ_Handle
556  * as closure.
557  *
558  * @param mq message queue with the current message
559  * @return message to send, never NULL
560  */
561 void *
562 GNUNET_MQ_impl_state (struct GNUNET_MQ_Handle *mq)
563 {
564   return mq->impl_state;
565 }
566
567
568 struct GNUNET_MQ_Envelope *
569 GNUNET_MQ_msg_ (struct GNUNET_MessageHeader **mhp,
570                 uint16_t size,
571                 uint16_t type)
572 {
573   struct GNUNET_MQ_Envelope *mqm;
574
575   mqm = GNUNET_malloc (sizeof *mqm + size);
576   mqm->mh = (struct GNUNET_MessageHeader *) &mqm[1];
577   mqm->mh->size = htons (size);
578   mqm->mh->type = htons (type);
579   if (NULL != mhp)
580     *mhp = mqm->mh;
581   return mqm;
582 }
583
584
585 /**
586  * Create a new envelope by copying an existing message.
587  *
588  * @param hdr header of the message to copy
589  * @return envelope containing @a hdr
590  */
591 struct GNUNET_MQ_Envelope *
592 GNUNET_MQ_msg_copy (const struct GNUNET_MessageHeader *hdr)
593 {
594   struct GNUNET_MQ_Envelope *mqm;
595   uint16_t size = ntohs (hdr->size);
596
597   mqm = GNUNET_malloc (sizeof (*mqm) + size);
598   mqm->mh = (struct GNUNET_MessageHeader *) &mqm[1];
599   GNUNET_memcpy (mqm->mh,
600           hdr,
601           size);
602   return mqm;
603 }
604
605
606 /**
607  * Implementation of the #GNUNET_MQ_msg_nested_mh macro.
608  *
609  * @param mhp pointer to the message header pointer that will be changed to allocate at
610  *        the newly allocated space for the message.
611  * @param base_size size of the data before the nested message
612  * @param type type of the message in the envelope
613  * @param nested_mh the message to append to the message after base_size
614  */
615 struct GNUNET_MQ_Envelope *
616 GNUNET_MQ_msg_nested_mh_ (struct GNUNET_MessageHeader **mhp,
617                           uint16_t base_size,
618                           uint16_t type,
619                           const struct GNUNET_MessageHeader *nested_mh)
620 {
621   struct GNUNET_MQ_Envelope *mqm;
622   uint16_t size;
623
624   if (NULL == nested_mh)
625     return GNUNET_MQ_msg_ (mhp, base_size, type);
626
627   size = base_size + ntohs (nested_mh->size);
628
629   /* check for uint16_t overflow */
630   if (size < base_size)
631     return NULL;
632
633   mqm = GNUNET_MQ_msg_ (mhp, size, type);
634   GNUNET_memcpy ((char *) mqm->mh + base_size,
635                  nested_mh,
636                  ntohs (nested_mh->size));
637
638   return mqm;
639 }
640
641
642 /**
643  * Transmit a queued message to the session's client.
644  *
645  * @param cls consensus session
646  * @param size number of bytes available in @a buf
647  * @param buf where the callee should write the message
648  * @return number of bytes written to @a buf
649  */
650 static size_t
651 transmit_queued (void *cls,
652                  size_t size,
653                  void *buf)
654 {
655   struct GNUNET_MQ_Handle *mq = cls;
656   struct ServerClientSocketState *state = GNUNET_MQ_impl_state (mq);
657   const struct GNUNET_MessageHeader *msg = GNUNET_MQ_impl_current (mq);
658   size_t msg_size;
659
660   GNUNET_assert (NULL != buf);
661   msg_size = ntohs (msg->size);
662   GNUNET_assert (size >= msg_size);
663   GNUNET_memcpy (buf, msg, msg_size);
664   state->th = NULL;
665
666   GNUNET_MQ_impl_send_continue (mq);
667
668   return msg_size;
669 }
670
671
672 static void
673 server_client_destroy_impl (struct GNUNET_MQ_Handle *mq,
674                             void *impl_state)
675 {
676   struct ServerClientSocketState *state = impl_state;
677
678   if (NULL != state->th)
679   {
680     GNUNET_SERVER_notify_transmit_ready_cancel (state->th);
681     state->th = NULL;
682   }
683
684   GNUNET_assert (NULL != mq);
685   GNUNET_assert (NULL != state);
686   GNUNET_SERVER_client_drop (state->client);
687   GNUNET_free (state);
688 }
689
690
691 static void
692 server_client_send_impl (struct GNUNET_MQ_Handle *mq,
693                          const struct GNUNET_MessageHeader *msg,
694                          void *impl_state)
695 {
696   struct ServerClientSocketState *state = impl_state;
697
698   GNUNET_assert (NULL != mq);
699   state->th = GNUNET_SERVER_notify_transmit_ready (state->client,
700                                                    ntohs (msg->size),
701                                                    GNUNET_TIME_UNIT_FOREVER_REL,
702                                                    &transmit_queued, mq);
703 }
704
705
706 struct GNUNET_MQ_Handle *
707 GNUNET_MQ_queue_for_server_client (struct GNUNET_SERVER_Client *client)
708 {
709   struct GNUNET_MQ_Handle *mq;
710   struct ServerClientSocketState *scss;
711
712   mq = GNUNET_new (struct GNUNET_MQ_Handle);
713   scss = GNUNET_new (struct ServerClientSocketState);
714   mq->impl_state = scss;
715   scss->client = client;
716   GNUNET_SERVER_client_keep (client);
717   mq->send_impl = &server_client_send_impl;
718   mq->destroy_impl = &server_client_destroy_impl;
719   return mq;
720 }
721
722
723 /**
724  * Type of a function to call when we receive a message
725  * from the service.
726  *
727  * @param cls closure
728  * @param msg message received, NULL on timeout or fatal error
729  */
730 static void
731 handle_client_message (void *cls,
732                        const struct GNUNET_MessageHeader *msg)
733 {
734   struct GNUNET_MQ_Handle *mq = cls;
735   struct ClientConnectionState *state;
736
737   state = mq->impl_state;
738   if (NULL == msg)
739   {
740     GNUNET_MQ_inject_error (mq, GNUNET_MQ_ERROR_READ);
741     return;
742   }
743   GNUNET_CLIENT_receive (state->connection,
744                          &handle_client_message,
745                          mq,
746                          GNUNET_TIME_UNIT_FOREVER_REL);
747   GNUNET_MQ_inject_message (mq, msg);
748 }
749
750
751 /**
752  * Transmit a queued message to the session's client.
753  *
754  * @param cls consensus session
755  * @param size number of bytes available in @a buf
756  * @param buf where the callee should write the message
757  * @return number of bytes written to buf
758  */
759 static size_t
760 connection_client_transmit_queued (void *cls,
761                                    size_t size,
762                                    void *buf)
763 {
764   struct GNUNET_MQ_Handle *mq = cls;
765   const struct GNUNET_MessageHeader *msg;
766   struct ClientConnectionState *state = mq->impl_state;
767   size_t msg_size;
768
769   GNUNET_assert (NULL != mq);
770   state->th = NULL;
771   msg = GNUNET_MQ_impl_current (mq);
772
773   if (NULL == buf)
774   {
775     GNUNET_MQ_inject_error (mq, GNUNET_MQ_ERROR_READ);
776     return 0;
777   }
778
779   if ( (GNUNET_YES == state->receive_requested) &&
780        (GNUNET_NO == state->receive_active) )
781   {
782     state->receive_active = GNUNET_YES;
783     GNUNET_CLIENT_receive (state->connection,
784                            &handle_client_message,
785                            mq,
786                            GNUNET_TIME_UNIT_FOREVER_REL);
787   }
788
789   msg_size = ntohs (msg->size);
790   GNUNET_assert (size >= msg_size);
791   GNUNET_memcpy (buf, msg, msg_size);
792   state->th = NULL;
793
794   GNUNET_MQ_impl_send_continue (mq);
795
796   return msg_size;
797 }
798
799
800 static void
801 connection_client_destroy_impl (struct GNUNET_MQ_Handle *mq,
802                                 void *impl_state)
803 {
804   struct ClientConnectionState *state = impl_state;
805
806   if (NULL != state->th)
807   {
808     GNUNET_CLIENT_notify_transmit_ready_cancel (state->th);
809     state->th = NULL;
810   }
811   GNUNET_CLIENT_disconnect (state->connection);
812   GNUNET_free (impl_state);
813 }
814
815
816 static void
817 connection_client_send_impl (struct GNUNET_MQ_Handle *mq,
818                              const struct GNUNET_MessageHeader *msg,
819                              void *impl_state)
820 {
821   struct ClientConnectionState *state = impl_state;
822
823   GNUNET_assert (NULL != state);
824   GNUNET_assert (NULL == state->th);
825   state->th =
826       GNUNET_CLIENT_notify_transmit_ready (state->connection,
827                                            ntohs (msg->size),
828                                            GNUNET_TIME_UNIT_FOREVER_REL,
829                                            GNUNET_NO,
830                                            &connection_client_transmit_queued,
831                                            mq);
832   GNUNET_assert (NULL != state->th);
833 }
834
835
836 static void
837 connection_client_cancel_impl (struct GNUNET_MQ_Handle *mq,
838                                void *impl_state)
839 {
840   struct ClientConnectionState *state = impl_state;
841
842   if (NULL != state->th)
843   {
844     GNUNET_CLIENT_notify_transmit_ready_cancel (state->th);
845     state->th = NULL;
846   }
847   else if (NULL != mq->continue_task)
848   {
849     GNUNET_SCHEDULER_cancel (mq->continue_task);
850     mq->continue_task = NULL;
851   }
852   else
853     GNUNET_assert (0);
854 }
855
856
857 struct GNUNET_MQ_Handle *
858 GNUNET_MQ_queue_for_connection_client (struct GNUNET_CLIENT_Connection *connection,
859                                        const struct GNUNET_MQ_MessageHandler *handlers,
860                                        GNUNET_MQ_ErrorHandler error_handler,
861                                        void *error_handler_cls)
862 {
863   struct GNUNET_MQ_Handle *mq;
864   struct ClientConnectionState *state;
865   unsigned int i;
866
867   mq = GNUNET_new (struct GNUNET_MQ_Handle);
868   if (NULL != handlers)
869   {
870     for (i=0;NULL != handlers[i].cb; i++) ;
871     mq->handlers = GNUNET_new_array (i + 1,
872                                      struct GNUNET_MQ_MessageHandler);
873     GNUNET_memcpy (mq->handlers,
874                    handlers,
875                    i * sizeof (struct GNUNET_MQ_MessageHandler));
876   }
877   mq->error_handler = error_handler;
878   mq->error_handler_cls = error_handler_cls;
879   state = GNUNET_new (struct ClientConnectionState);
880   state->connection = connection;
881   mq->impl_state = state;
882   mq->send_impl = &connection_client_send_impl;
883   mq->destroy_impl = &connection_client_destroy_impl;
884   mq->cancel_impl = &connection_client_cancel_impl;
885   if (NULL != handlers)
886     state->receive_requested = GNUNET_YES;
887
888   return mq;
889 }
890
891
892 /**
893  * Associate the assoc_data in mq with a unique request id.
894  *
895  * @param mq message queue, id will be unique for the queue
896  * @param assoc_data to associate
897  */
898 uint32_t
899 GNUNET_MQ_assoc_add (struct GNUNET_MQ_Handle *mq,
900                      void *assoc_data)
901 {
902   uint32_t id;
903
904   if (NULL == mq->assoc_map)
905   {
906     mq->assoc_map = GNUNET_CONTAINER_multihashmap32_create (8);
907     mq->assoc_id = 1;
908   }
909   id = mq->assoc_id++;
910   GNUNET_CONTAINER_multihashmap32_put (mq->assoc_map, id, assoc_data,
911                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
912   return id;
913 }
914
915
916 void *
917 GNUNET_MQ_assoc_get (struct GNUNET_MQ_Handle *mq,
918                      uint32_t request_id)
919 {
920   if (NULL == mq->assoc_map)
921     return NULL;
922   return GNUNET_CONTAINER_multihashmap32_get (mq->assoc_map, request_id);
923 }
924
925
926 void *
927 GNUNET_MQ_assoc_remove (struct GNUNET_MQ_Handle *mq,
928                         uint32_t request_id)
929 {
930   void *val;
931
932   if (NULL == mq->assoc_map)
933     return NULL;
934   val = GNUNET_CONTAINER_multihashmap32_get (mq->assoc_map,
935                                              request_id);
936   GNUNET_CONTAINER_multihashmap32_remove_all (mq->assoc_map,
937                                               request_id);
938   return val;
939 }
940
941
942 void
943 GNUNET_MQ_notify_sent (struct GNUNET_MQ_Envelope *mqm,
944                        GNUNET_MQ_NotifyCallback cb,
945                        void *cls)
946 {
947   mqm->sent_cb = cb;
948   mqm->sent_cls = cls;
949 }
950
951
952 void
953 GNUNET_MQ_destroy (struct GNUNET_MQ_Handle *mq)
954 {
955   if (NULL != mq->destroy_impl)
956   {
957     mq->destroy_impl (mq, mq->impl_state);
958   }
959   if (NULL != mq->continue_task)
960   {
961     GNUNET_SCHEDULER_cancel (mq->continue_task);
962     mq->continue_task = NULL;
963   }
964   while (NULL != mq->envelope_head)
965   {
966     struct GNUNET_MQ_Envelope *ev;
967
968     ev = mq->envelope_head;
969     ev->parent_queue = NULL;
970     GNUNET_CONTAINER_DLL_remove (mq->envelope_head,
971                                  mq->envelope_tail,
972                                  ev);
973     mq->queue_length--;
974     GNUNET_MQ_discard (ev);
975   }
976   GNUNET_assert (0 == mq->queue_length);
977   if (NULL != mq->current_envelope)
978   {
979     /* we can only discard envelopes that
980      * are not queued! */
981     mq->current_envelope->parent_queue = NULL;
982     GNUNET_MQ_discard (mq->current_envelope);
983     mq->current_envelope = NULL;
984   }
985   if (NULL != mq->assoc_map)
986   {
987     GNUNET_CONTAINER_multihashmap32_destroy (mq->assoc_map);
988     mq->assoc_map = NULL;
989   }
990   GNUNET_free_non_null (mq->handlers);
991   GNUNET_free (mq);
992 }
993
994
995 const struct GNUNET_MessageHeader *
996 GNUNET_MQ_extract_nested_mh_ (const struct GNUNET_MessageHeader *mh,
997                               uint16_t base_size)
998 {
999   uint16_t whole_size;
1000   uint16_t nested_size;
1001   const struct GNUNET_MessageHeader *nested_msg;
1002
1003   whole_size = ntohs (mh->size);
1004   GNUNET_assert (whole_size >= base_size);
1005   nested_size = whole_size - base_size;
1006   if (0 == nested_size)
1007     return NULL;
1008   if (nested_size < sizeof (struct GNUNET_MessageHeader))
1009   {
1010     GNUNET_break_op (0);
1011     return NULL;
1012   }
1013   nested_msg = (const struct GNUNET_MessageHeader *) ((char *) mh + base_size);
1014   if (ntohs (nested_msg->size) != nested_size)
1015   {
1016     GNUNET_break_op (0);
1017     return NULL;
1018   }
1019   return nested_msg;
1020 }
1021
1022
1023 /**
1024  * Cancel sending the message. Message must have been sent with
1025  * #GNUNET_MQ_send before.  May not be called after the notify sent
1026  * callback has been called
1027  *
1028  * @param ev queued envelope to cancel
1029  */
1030 void
1031 GNUNET_MQ_send_cancel (struct GNUNET_MQ_Envelope *ev)
1032 {
1033   struct GNUNET_MQ_Handle *mq = ev->parent_queue;
1034
1035   GNUNET_assert (NULL != mq);
1036   GNUNET_assert (NULL != mq->cancel_impl);
1037
1038   if (mq->current_envelope == ev)
1039   {
1040     // complex case, we already started with transmitting
1041     // the message
1042     mq->cancel_impl (mq,
1043                      mq->impl_state);
1044     // continue sending the next message, if any
1045     if (NULL == mq->envelope_head)
1046     {
1047       mq->current_envelope = NULL;
1048     }
1049     else
1050     {
1051       mq->current_envelope = mq->envelope_head;
1052       GNUNET_CONTAINER_DLL_remove (mq->envelope_head,
1053                                    mq->envelope_tail,
1054                                    mq->current_envelope);
1055       mq->queue_length--;
1056       mq->send_impl (mq,
1057                      mq->current_envelope->mh,
1058                      mq->impl_state);
1059     }
1060   }
1061   else
1062   {
1063     // simple case, message is still waiting in the queue
1064     GNUNET_CONTAINER_DLL_remove (mq->envelope_head,
1065                                  mq->envelope_tail,
1066                                  ev);
1067     mq->queue_length--;
1068   }
1069
1070   ev->parent_queue = NULL;
1071   ev->mh = NULL;
1072   GNUNET_free (ev);
1073 }
1074
1075
1076 /**
1077  * Function to obtain the current envelope
1078  * from within #GNUNET_MQ_SendImpl implementations.
1079  *
1080  * @param mq message queue to interrogate
1081  * @return the current envelope
1082  */
1083 struct GNUNET_MQ_Envelope *
1084 GNUNET_MQ_get_current_envelope (struct GNUNET_MQ_Handle *mq)
1085 {
1086   return mq->current_envelope;
1087 }
1088
1089
1090 /**
1091  * Function to obtain the last envelope in the queue.
1092  *
1093  * @param mq message queue to interrogate
1094  * @return the last envelope in the queue
1095  */
1096 struct GNUNET_MQ_Envelope *
1097 GNUNET_MQ_get_last_envelope (struct GNUNET_MQ_Handle *mq)
1098 {
1099   if (NULL != mq->envelope_tail)
1100     return mq->envelope_tail;
1101
1102   return mq->current_envelope;
1103 }
1104
1105
1106 /**
1107  * Set application-specific options for this envelope.
1108  * Overrides the options set for the queue with
1109  * #GNUNET_MQ_set_options() for this message only.
1110  *
1111  * @param env message to set options for
1112  * @param flags flags to use (meaning is queue-specific)
1113  * @param extra additional buffer for further data (also queue-specific)
1114  */
1115 void
1116 GNUNET_MQ_env_set_options (struct GNUNET_MQ_Envelope *env,
1117                            uint64_t flags,
1118                            const void *extra)
1119 {
1120   env->flags = flags;
1121   env->extra = extra;
1122   env->have_custom_options = GNUNET_YES;
1123 }
1124
1125
1126 /**
1127  * Get application-specific options for this envelope.
1128  *
1129  * @param env message to set options for
1130  * @param[out] flags set to flags to use (meaning is queue-specific)
1131  * @return extra additional buffer for further data (also queue-specific)
1132  */
1133 const void *
1134 GNUNET_MQ_env_get_options (struct GNUNET_MQ_Envelope *env,
1135                            uint64_t *flags)
1136 {
1137   struct GNUNET_MQ_Handle *mq = env->parent_queue;
1138
1139   if (GNUNET_YES == env->have_custom_options)
1140   {
1141     *flags = env->flags;
1142     return env->extra;
1143   }
1144   if (NULL == mq)
1145   {
1146     *flags = 0;
1147     return NULL;
1148   }
1149   *flags = mq->default_flags;
1150   return mq->default_extra;
1151 }
1152
1153
1154 /**
1155  * Set application-specific options for this queue.
1156  *
1157  * @param mq message queue to set options for
1158  * @param flags flags to use (meaning is queue-specific)
1159  * @param extra additional buffer for further data (also queue-specific)
1160  */
1161 void
1162 GNUNET_MQ_set_options (struct GNUNET_MQ_Handle *mq,
1163                        uint64_t flags,
1164                        const void *extra)
1165 {
1166   mq->default_flags = flags;
1167   mq->default_extra = extra;
1168 }
1169
1170
1171 /* end of mq.c */