- distribute peers equally among island nodes on SuperMUC
[oweals/gnunet.git] / src / set / mq.c
1 /*
2      This file is part of GNUnet.
3      (C) 2012 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 2, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @author Florian Dold
23  * @file set/mq.c
24  * @brief general purpose request queue
25  */
26
27 #include "mq.h"
28
29
30 #define LOG(kind,...) GNUNET_log_from (kind, "mq",__VA_ARGS__)
31
32 /**
33  * Signature of functions implementing the
34  * sending part of a message queue
35  *
36  * @param q the message queue
37  * @param m the message
38  */
39 typedef void (*SendImpl) (struct GNUNET_MQ_MessageQueue *q, struct GNUNET_MQ_Message *m);
40
41
42 typedef void (*DestroyImpl) (struct GNUNET_MQ_MessageQueue *q);
43
44
45 /**
46  * Collection of the state necessary to read and write gnunet messages 
47  * to a stream socket. Should be used as closure for stream_data_processor.
48  */
49 struct MessageStreamState
50 {
51   struct GNUNET_SERVER_MessageStreamTokenizer *mst;
52   struct MessageQueue *mq;
53   struct GNUNET_STREAM_Socket *socket;
54   struct GNUNET_STREAM_ReadHandle *rh;
55   struct GNUNET_STREAM_WriteHandle *wh;
56 };
57
58
59 struct ServerClientSocketState
60 {
61   struct GNUNET_SERVER_Client *client;
62   struct GNUNET_SERVER_TransmitHandle* th;
63 };
64
65
66 struct ClientConnectionState
67 {
68   struct GNUNET_CLIENT_Connection *connection;
69   struct GNUNET_CLIENT_TransmitHandle *th;
70 };
71
72
73 struct GNUNET_MQ_MessageQueue
74 {
75   /**
76    * Handlers array, or NULL if the queue should not receive messages
77    */
78   const struct GNUNET_MQ_Handler *handlers;
79
80   /**
81    * Closure for the handler callbacks
82    */
83   void *handlers_cls;
84
85   /**
86    * Actual implementation of message sending,
87    * called when a message is added
88    */
89   SendImpl send_impl;
90
91   /**
92    * Implementation-dependent queue destruction function
93    */
94   DestroyImpl destroy_impl;
95
96   /**
97    * Implementation-specific state
98    */
99   void *impl_state;
100
101   /**
102    * Callback will be called when the message queue is empty
103    */
104   GNUNET_MQ_NotifyCallback empty_cb;
105
106   /**
107    * Closure for empty_cb
108    */
109   void *empty_cls;
110
111   /**
112    * Callback will be called when a read error occurs.
113    */
114   GNUNET_MQ_NotifyCallback read_error_cb;
115
116   /**
117    * Closure for read_error_cb
118    */
119   void *read_error_cls;
120
121   /**
122    * Linked list of messages pending to be sent
123    */
124   struct GNUNET_MQ_Message *msg_head;
125
126   /**
127    * Linked list of messages pending to be sent
128    */
129   struct GNUNET_MQ_Message *msg_tail;
130
131   /**
132    * Message that is currently scheduled to be
133    * sent. Not the head of the message queue, as the implementation
134    * needs to know if sending has been already scheduled or not.
135    */
136   struct GNUNET_MQ_Message *current_msg;
137
138   /**
139    * Map of associations, lazily allocated
140    */
141   struct GNUNET_CONTAINER_MultiHashMap32 *assoc_map;
142
143   /**
144    * Next id that should be used for the assoc_map,
145    * initialized lazily to a random value together with
146    * assoc_map
147    */
148   uint32_t assoc_id;
149 };
150
151
152 struct GNUNET_MQ_Message
153 {
154   /**
155    * Messages are stored in a linked list
156    */
157   struct GNUNET_MQ_Message *next;
158
159   /**
160    * Messages are stored in a linked list
161    */
162   struct GNUNET_MQ_Message *prev;
163
164   /**
165    * Actual allocated message header,
166    * usually points to the end of the containing GNUNET_MQ_Message
167    */
168   struct GNUNET_MessageHeader *mh;
169
170   /**
171    * Queue the message is queued in, NULL if message is not queued.
172    */
173   struct GNUNET_MQ_MessageQueue *parent_queue;
174
175   /**
176    * Called after the message was sent irrevokably
177    */
178   GNUNET_MQ_NotifyCallback sent_cb;
179
180   /**
181    * Closure for send_cb
182    */
183   void *sent_cls;
184 };
185
186
187 /**
188  * Call the right callback for a message received
189  * by a queue
190  */
191 static void
192 dispatch_message (struct GNUNET_MQ_MessageQueue *mq, const struct GNUNET_MessageHeader *mh)
193 {
194   const struct GNUNET_MQ_Handler *handler;
195
196   handler = mq->handlers;
197   if (NULL == handler)
198     return;
199   for (; NULL != handler->cb; handler++)
200     if (handler->type == ntohs (mh->type))
201       handler->cb (mq->handlers_cls, mh);
202 }
203
204
205 void
206 GNUNET_MQ_discard (struct GNUNET_MQ_Message *mqm)
207 {
208   GNUNET_assert (NULL == mqm->parent_queue);
209   GNUNET_free (mqm);
210 }
211
212
213 /**
214  * Send a message with the give message queue.
215  * May only be called once per message.
216  * 
217  * @param mq message queue
218  * @param mqm the message to send.
219  */
220 void
221 GNUNET_MQ_send (struct GNUNET_MQ_MessageQueue *mq, struct GNUNET_MQ_Message *mqm)
222 {
223   mq->send_impl (mq, mqm);
224 }
225
226
227 struct GNUNET_MQ_Message *
228 GNUNET_MQ_msg_ (struct GNUNET_MessageHeader **mhp, uint16_t size, uint16_t type)
229 {
230   struct GNUNET_MQ_Message *mqm;
231   mqm = GNUNET_malloc (sizeof *mqm + size);
232   mqm->mh = (struct GNUNET_MessageHeader *) &mqm[1];
233   mqm->mh->size = htons (size);
234   mqm->mh->type = htons (type);
235   if (NULL != mhp)
236     *mhp = mqm->mh;
237   return mqm;
238 }
239
240
241 int
242 GNUNET_MQ_nest_ (struct GNUNET_MQ_Message **mqmp,
243                  const void *data, uint16_t len)
244 {
245   size_t new_size;
246   size_t old_size;
247
248   if (NULL == data)
249     return GNUNET_OK;
250   GNUNET_assert (NULL != mqmp);
251   old_size = ntohs ((*mqmp)->mh->size);
252   /* message too large to concatenate? */
253   if (ntohs ((*mqmp)->mh->size) + len < len)
254     return GNUNET_SYSERR;
255   new_size = old_size + len;
256   *mqmp = GNUNET_realloc (mqmp, sizeof (struct GNUNET_MQ_Message) + new_size);
257   memcpy ((*mqmp)->mh + old_size, data, new_size - old_size);
258   (*mqmp)->mh->size = htons (new_size);
259   return GNUNET_OK;
260 }
261
262
263 /**
264  * Functions of this signature are called whenever writing operations
265  * on a stream are executed
266  *
267  * @param cls the closure from GNUNET_STREAM_write
268  * @param status the status of the stream at the time this function is called;
269  *          GNUNET_STREAM_OK if writing to stream was completed successfully;
270  *          GNUNET_STREAM_TIMEOUT if the given data is not sent successfully
271  *          (this doesn't mean that the data is never sent, the receiver may
272  *          have read the data but its ACKs may have been lost);
273  *          GNUNET_STREAM_SHUTDOWN if the stream is shutdown for writing in the
274  *          mean time; GNUNET_STREAM_SYSERR if the stream is broken and cannot
275  *          be processed.
276  * @param size the number of bytes written
277  */
278 static void 
279 stream_write_queued (void *cls, enum GNUNET_STREAM_Status status, size_t size)
280 {
281   struct GNUNET_MQ_MessageQueue *mq = cls;
282   struct MessageStreamState *mss = (struct MessageStreamState *) mq->impl_state;
283   struct GNUNET_MQ_Message *mqm;
284
285   GNUNET_assert (GNUNET_STREAM_OK == status);
286   
287   /* call cb for message we finished sending */
288   mqm = mq->current_msg;
289   if (NULL != mqm)
290   {
291     if (NULL != mqm->sent_cb)
292       mqm->sent_cb (mqm->sent_cls);
293     GNUNET_free (mqm);
294   }
295
296   mss->wh = NULL;
297
298   mqm = mq->msg_head;
299   mq->current_msg = mqm;
300   if (NULL == mqm)
301   {
302     if (NULL != mq->empty_cb)
303       mq->empty_cb (mq->empty_cls);
304     return;
305   }
306   GNUNET_CONTAINER_DLL_remove (mq->msg_head, mq->msg_tail, mqm);
307   mss->wh = GNUNET_STREAM_write (mss->socket, mqm->mh, ntohs (mqm->mh->size),
308                                  GNUNET_TIME_UNIT_FOREVER_REL,
309                                  stream_write_queued, mq);
310   GNUNET_assert (NULL != mss->wh);
311 }
312
313
314 static void
315 stream_socket_send_impl (struct GNUNET_MQ_MessageQueue *mq,
316                          struct GNUNET_MQ_Message *mqm)
317 {
318   struct MessageStreamState *mss = (struct MessageStreamState *) mq->impl_state;
319   if (NULL != mq->current_msg)
320   {
321     GNUNET_CONTAINER_DLL_insert_tail (mq->msg_head, mq->msg_tail, mqm);
322     return;
323   }
324   mq->current_msg = mqm;
325   mss->wh = GNUNET_STREAM_write (mss->socket, mqm->mh, ntohs (mqm->mh->size),
326                                  GNUNET_TIME_UNIT_FOREVER_REL,
327                                  stream_write_queued, mq);
328 }
329
330
331 /**
332  * Functions with this signature are called whenever a
333  * complete message is received by the tokenizer.
334  *
335  * Do not call GNUNET_SERVER_mst_destroy in callback
336  *
337  * @param cls closure
338  * @param client identification of the client
339  * @param message the actual message
340  *
341  * @return GNUNET_OK on success, GNUNET_SYSERR to stop further processing
342  */
343 static int
344 stream_mst_callback (void *cls, void *client,
345                      const struct GNUNET_MessageHeader *message)
346 {
347   struct GNUNET_MQ_MessageQueue *mq = cls;
348
349   GNUNET_assert (NULL != message);
350   dispatch_message (mq, message);
351   return GNUNET_OK;
352 }
353
354
355 /**
356  * Functions of this signature are called whenever data is available from the
357  * stream.
358  *
359  * @param cls the closure from GNUNET_STREAM_read
360  * @param status the status of the stream at the time this function is called
361  * @param data traffic from the other side
362  * @param size the number of bytes available in data read; will be 0 on timeout 
363  * @return number of bytes of processed from 'data' (any data remaining should be
364  *         given to the next time the read processor is called).
365  */
366 static size_t
367 stream_data_processor (void *cls,
368                        enum GNUNET_STREAM_Status status,
369                        const void *data,
370                        size_t size)
371 {
372   struct GNUNET_MQ_MessageQueue *mq = cls;
373   struct MessageStreamState *mss;
374   int ret;
375   
376   mss = (struct MessageStreamState *) mq->impl_state;
377   GNUNET_assert (GNUNET_STREAM_OK == status);
378   ret = GNUNET_SERVER_mst_receive (mss->mst, NULL, data, size, GNUNET_NO, GNUNET_NO);
379   GNUNET_assert (GNUNET_OK == ret);
380   /* we always read all data */
381     mss->rh = GNUNET_STREAM_read (mss->socket, GNUNET_TIME_UNIT_FOREVER_REL, 
382                                   stream_data_processor, mq);
383   return size;
384 }
385
386
387 struct GNUNET_MQ_MessageQueue *
388 GNUNET_MQ_queue_for_stream_socket (struct GNUNET_STREAM_Socket *socket,
389                                    const struct GNUNET_MQ_Handler *handlers,
390                                    void *cls)
391 {
392   struct GNUNET_MQ_MessageQueue *mq;
393   struct MessageStreamState *mss;
394
395   mq = GNUNET_new (struct GNUNET_MQ_MessageQueue);
396   mss = GNUNET_new (struct MessageStreamState);
397   mss->socket = socket;
398   mq->impl_state = mss;
399   mq->send_impl = stream_socket_send_impl;
400   mq->handlers = handlers;
401   mq->handlers_cls = cls;
402   if (NULL != handlers)
403   {
404     mss->mst = GNUNET_SERVER_mst_create (stream_mst_callback, mq);
405     mss->rh = GNUNET_STREAM_read (socket, GNUNET_TIME_UNIT_FOREVER_REL, 
406                                   stream_data_processor, mq);
407   }
408   return mq;
409 }
410
411
412 /*** Transmit a queued message to the session's client.
413  *
414  * @param cls consensus session
415  * @param size number of bytes available in buf
416  * @param buf where the callee should write the message
417  * @return number of bytes written to buf
418  */
419 static size_t
420 transmit_queued (void *cls, size_t size,
421                  void *buf)
422 {
423   struct GNUNET_MQ_MessageQueue *mq = cls;
424   struct GNUNET_MQ_Message *mqm = mq->current_msg;
425   struct ServerClientSocketState *state = mq->impl_state;
426   size_t msg_size;
427
428   mq->current_msg = NULL;
429   GNUNET_assert (NULL != mqm);
430   GNUNET_assert (NULL != buf);
431   msg_size = ntohs (mqm->mh->size);
432   GNUNET_assert (size >= msg_size);
433   memcpy (buf, mqm->mh, msg_size);
434   GNUNET_free (mqm);
435   state->th = NULL;
436   if (NULL != mq->msg_head)
437   {
438     mq->current_msg = mq->msg_head;
439     GNUNET_CONTAINER_DLL_remove (mq->msg_head, mq->msg_tail, mq->current_msg);
440     state->th = 
441         GNUNET_SERVER_notify_transmit_ready (state->client, msg_size, 
442                                              GNUNET_TIME_UNIT_FOREVER_REL,
443                                              &transmit_queued, mq);
444   }
445   else if (NULL != mq->empty_cb)
446     mq->empty_cb (mq->empty_cls);
447   return msg_size;
448 }
449
450
451 static void
452 server_client_send_impl (struct GNUNET_MQ_MessageQueue *mq, struct GNUNET_MQ_Message *mqm)
453 {
454   struct ServerClientSocketState *state = mq->impl_state;
455   int msize;
456
457   GNUNET_assert (NULL != state);
458
459   if (NULL != state->th)
460   {
461     GNUNET_CONTAINER_DLL_insert_tail (mq->msg_head, mq->msg_tail, mqm);
462     return;
463   }
464   GNUNET_assert (NULL == mq->current_msg);
465   msize = ntohs (mq->msg_head->mh->size);
466   mq->current_msg = mqm;
467   state->th = 
468       GNUNET_SERVER_notify_transmit_ready (state->client, msize, 
469                                            GNUNET_TIME_UNIT_FOREVER_REL,
470                                            &transmit_queued, mq);
471 }
472
473
474 struct GNUNET_MQ_MessageQueue *
475 GNUNET_MQ_queue_for_server_client (struct GNUNET_SERVER_Client *client)
476 {
477   struct GNUNET_MQ_MessageQueue *mq;
478   struct ServerClientSocketState *scss;
479
480   mq = GNUNET_new (struct GNUNET_MQ_MessageQueue);
481   scss = GNUNET_new (struct ServerClientSocketState);
482   mq->impl_state = scss;
483   mq->send_impl = server_client_send_impl;
484   return mq;
485 }
486
487
488 /**
489  * Transmit a queued message to the session's client.
490  *
491  * @param cls consensus session
492  * @param size number of bytes available in buf
493  * @param buf where the callee should write the message
494  * @return number of bytes written to buf
495  */
496 static size_t
497 connection_client_transmit_queued (void *cls, size_t size,
498                  void *buf)
499 {
500   struct GNUNET_MQ_MessageQueue *mq = cls;
501   struct GNUNET_MQ_Message *mqm = mq->current_msg;
502   struct ClientConnectionState *state = mq->impl_state;
503   size_t msg_size;
504
505   mq->current_msg = NULL;
506   GNUNET_assert (NULL != mqm);
507   GNUNET_assert (NULL != buf);
508   msg_size = ntohs (mqm->mh->size);
509   GNUNET_assert (size >= msg_size);
510   memcpy (buf, mqm->mh, msg_size);
511   GNUNET_free (mqm);
512   state->th = NULL;
513   if (NULL != mq->msg_head)
514   {
515     mq->current_msg = mq->msg_head;
516     GNUNET_CONTAINER_DLL_remove (mq->msg_head, mq->msg_tail, mq->current_msg);
517     state->th = 
518       GNUNET_CLIENT_notify_transmit_ready (state->connection, htons (mq->current_msg->mh->size), 
519                                              GNUNET_TIME_UNIT_FOREVER_REL, GNUNET_NO,
520                                              &connection_client_transmit_queued, mq);
521   }
522   else if (NULL != mq->empty_cb)
523     mq->empty_cb (mq->empty_cls);
524   return msg_size;
525 }
526
527
528 static void
529 connection_client_send_impl (struct GNUNET_MQ_MessageQueue *mq,
530                              struct GNUNET_MQ_Message *mqm)
531 {
532   struct ClientConnectionState *state = mq->impl_state;
533   int msize;
534
535   GNUNET_assert (NULL != state);
536
537   if (NULL != state->th)
538   {
539     GNUNET_CONTAINER_DLL_insert_tail (mq->msg_head, mq->msg_tail, mqm);
540     return;
541   }
542   GNUNET_assert (NULL == mq->current_msg);
543   mq->current_msg = mqm;
544   msize = ntohs (mqm->mh->size);
545   state->th = 
546       GNUNET_CLIENT_notify_transmit_ready (state->connection, msize, 
547                                            GNUNET_TIME_UNIT_FOREVER_REL, GNUNET_NO,
548                                            &connection_client_transmit_queued, mq);
549 }
550
551
552 /**
553  * Type of a function to call when we receive a message
554  * from the service.
555  *
556  * @param cls closure
557  * @param msg message received, NULL on timeout or fatal error
558  */
559 static void
560 handle_client_message (void *cls,
561                        const struct GNUNET_MessageHeader *msg)
562 {
563   struct GNUNET_MQ_MessageQueue *mq = cls;
564   
565   if (NULL == msg)
566   {
567     if (NULL == mq->read_error_cb)
568       LOG (GNUNET_ERROR_TYPE_WARNING, "ignoring read error (no handler installed)\n");
569     mq->read_error_cb (mq->read_error_cls);
570     return;
571   }
572   dispatch_message (mq, msg);
573 }
574
575
576 struct GNUNET_MQ_MessageQueue *
577 GNUNET_MQ_queue_for_connection_client (struct GNUNET_CLIENT_Connection *connection,
578                                        const struct GNUNET_MQ_Handler *handlers,
579                                        void *cls)
580 {
581   struct GNUNET_MQ_MessageQueue *mq;
582   struct ClientConnectionState *state;
583
584   GNUNET_assert (NULL != connection);
585
586   mq = GNUNET_new (struct GNUNET_MQ_MessageQueue);
587   mq->handlers = handlers;
588   mq->handlers_cls = cls;
589   state = GNUNET_new (struct ClientConnectionState);
590   state->connection = connection;
591   mq->impl_state = state;
592   mq->send_impl = connection_client_send_impl;
593
594   if (NULL != handlers)
595   {
596     GNUNET_CLIENT_receive (connection, handle_client_message, mq,
597                            GNUNET_TIME_UNIT_FOREVER_REL);
598   }
599
600   return mq;
601 }
602
603
604 void
605 GNUNET_MQ_replace_handlers (struct GNUNET_MQ_MessageQueue *mq,
606                             const struct GNUNET_MQ_Handler *new_handlers,
607                             void *cls)
608 {
609   mq->handlers = new_handlers;
610   mq->handlers_cls = cls;
611 }
612
613
614 /**
615  * Associate the assoc_data in mq with a unique request id.
616  *
617  * @param mq message queue, id will be unique for the queue
618  * @param mqm message to associate
619  * @param assoc_data to associate
620  */
621 uint32_t
622 GNUNET_MQ_assoc_add (struct GNUNET_MQ_MessageQueue *mq,
623                      struct GNUNET_MQ_Message *mqm,
624                      void *assoc_data)
625 {
626   uint32_t id;
627
628   if (NULL == mq->assoc_map)
629     mq->assoc_map = GNUNET_CONTAINER_multihashmap32_create (8);
630   id = mq->assoc_id++;
631   GNUNET_CONTAINER_multihashmap32_put (mq->assoc_map, id, assoc_data,
632                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
633   return id;
634 }
635
636
637
638 void *
639 GNUNET_MQ_assoc_get (struct GNUNET_MQ_MessageQueue *mq, uint32_t request_id)
640 {
641   if (NULL == mq->assoc_map)
642     return NULL;
643   return GNUNET_CONTAINER_multihashmap32_get (mq->assoc_map, request_id);
644 }
645
646
647 void *
648 GNUNET_MQ_assoc_remove (struct GNUNET_MQ_MessageQueue *mq, uint32_t request_id)
649 {
650   void *val;
651
652   if (NULL == mq->assoc_map)
653     return NULL;
654   val = GNUNET_CONTAINER_multihashmap32_get (mq->assoc_map, request_id);
655   GNUNET_CONTAINER_multihashmap32_remove (mq->assoc_map, request_id, val);
656   return val;
657 }
658
659
660 void
661 GNUNET_MQ_notify_sent (struct GNUNET_MQ_Message *mqm,
662                        GNUNET_MQ_NotifyCallback cb,
663                        void *cls)
664 {
665   mqm->sent_cb = cb;
666   mqm->sent_cls = cls;
667 }
668
669
670 void
671 GNUNET_MQ_destroy (struct GNUNET_MQ_MessageQueue *mq)
672 {
673   /* FIXME: destroy all pending messages in the queue */
674   GNUNET_free (mq);
675 }
676
677
678 /**
679  * Call a callback once all messages queued have been sent,
680  * i.e. the message queue is empty.
681  *
682  * @param mqm the message queue to send the notification for
683  * @param cb the callback to call on an empty queue
684  * @param cls closure for cb
685  */
686 void
687 GNUNET_MQ_notify_empty (struct GNUNET_MQ_MessageQueue *mqm,
688                         GNUNET_MQ_NotifyCallback cb,
689                         void *cls)
690 {
691   mqm->empty_cb = cb;
692   mqm->empty_cls = cls;
693 }