forgot to commit new files
[oweals/gnunet.git] / src / set / mq.c
1 /*
2      This file is part of GNUnet.
3      (C) 2012 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 2, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @author Florian Dold
23  * @file mq/mq.c
24  * @brief general purpose request queue
25  */
26
27 #include "mq.h"
28
29 /**
30  * Signature of functions implementing the
31  * sending part of a message queue
32  *
33  * @param q the message queue
34  * @param m the message
35  */
36 typedef void (*SendImpl) (struct GNUNET_MQ_MessageQueue *q, struct GNUNET_MQ_Message *m);
37
38
39 typedef void (*DestroyImpl) (struct GNUNET_MQ_MessageQueue *q);
40
41
42 /**
43  * Collection of the state necessary to read and write gnunet messages 
44  * to a stream socket. Should be used as closure for stream_data_processor.
45  */
46 struct MessageStreamState
47 {
48   struct GNUNET_SERVER_MessageStreamTokenizer *mst;
49   struct MessageQueue *mq;
50   struct GNUNET_STREAM_Socket *socket;
51   struct GNUNET_STREAM_ReadHandle *rh;
52   struct GNUNET_STREAM_WriteHandle *wh;
53 };
54
55
56 struct ServerClientSocketState
57 {
58   struct GNUNET_SERVER_Client *client;
59   struct GNUNET_SERVER_TransmitHandle* th;
60 };
61
62
63 struct ClientConnectionState
64 {
65   struct GNUNET_CLIENT_Connection *connection;
66   struct GNUNET_CLIENT_TransmitHandle *th;
67 };
68
69
70 struct GNUNET_MQ_MessageQueue
71 {
72   /**
73    * Handlers array, or NULL if the queue should not receive messages
74    */
75   const struct GNUNET_MQ_Handler *handlers;
76
77   /**
78    * Closure for the handler callbacks
79    */
80   void *handlers_cls;
81
82   /**
83    * Actual implementation of message sending,
84    * called when a message is added
85    */
86   SendImpl send_impl;
87
88   /**
89    * Implementation-dependent queue destruction function
90    */
91   DestroyImpl destroy_impl;
92
93   /**
94    * Implementation-specific state
95    */
96   void *impl_state;
97
98   /**
99    * Linked list of messages pending to be sent
100    */
101   struct GNUNET_MQ_Message *msg_head;
102
103   /**
104    * Linked list of messages pending to be sent
105    */
106   struct GNUNET_MQ_Message *msg_tail;
107
108   /**
109    * Message that is currently scheduled to be
110    * sent. Not the head of the message queue, as the implementation
111    * needs to know if sending has been already scheduled or not.
112    */
113   struct GNUNET_MQ_Message *current_msg;
114
115   /**
116    * Map of associations, lazily allocated
117    */
118   struct GNUNET_CONTAINER_MultiHashMap32 *assoc_map;
119
120   /**
121    * Next id that should be used for the assoc_map,
122    * initialized lazily to a random value together with
123    * assoc_map
124    */
125   uint32_t assoc_id;
126 };
127
128
129 struct GNUNET_MQ_Message
130 {
131   /**
132    * Messages are stored in a linked list
133    */
134   struct GNUNET_MQ_Message *next;
135
136   /**
137    * Messages are stored in a linked list
138    */
139   struct GNUNET_MQ_Message *prev;
140
141   /**
142    * Actual allocated message header,
143    * usually points to the end of the containing GNUNET_MQ_Message
144    */
145   struct GNUNET_MessageHeader *mh;
146
147   /**
148    * Queue the message is queued in, NULL if message is not queued.
149    */
150   struct GNUNET_MQ_MessageQueue *parent_queue;
151
152   /**
153    * Called after the message was sent irrevokably
154    */
155   GNUNET_MQ_NotifyCallback sent_cb;
156
157   /**
158    * Closure for send_cb
159    */
160   void *sent_cls;
161 };
162
163
164 /**
165  * Call the right callback for a message received
166  * by a queue
167  */
168 static void
169 dispatch_message (struct GNUNET_MQ_MessageQueue *mq, const struct GNUNET_MessageHeader *mh)
170 {
171   const struct GNUNET_MQ_Handler *handler;
172
173   handler = mq->handlers;
174   if (NULL == handler)
175     return;
176   for (; NULL != handler->cb; handler++)
177     if (handler->type == ntohs (mh->type))
178       handler->cb (mq->handlers_cls, mh);
179 }
180
181
182 void
183 GNUNET_MQ_discard (struct GNUNET_MQ_Message *mqm)
184 {
185   GNUNET_assert (NULL == mqm->parent_queue);
186   GNUNET_free (mqm);
187 }
188
189
190 /**
191  * Send a message with the give message queue.
192  * May only be called once per message.
193  * 
194  * @param mq message queue
195  * @param mqm the message to send.
196  */
197 void
198 GNUNET_MQ_send (struct GNUNET_MQ_MessageQueue *mq, struct GNUNET_MQ_Message *mqm)
199 {
200   mq->send_impl (mq, mqm);
201 }
202
203
204 struct GNUNET_MQ_Message *
205 GNUNET_MQ_msg_ (struct GNUNET_MessageHeader **mhp, uint16_t size, uint16_t type)
206 {
207   struct GNUNET_MQ_Message *mqm;
208   mqm = GNUNET_malloc (sizeof *mqm + size);
209   mqm->mh = (struct GNUNET_MessageHeader *) &mqm[1];
210   mqm->mh->size = htons (size);
211   mqm->mh->type = htons (type);
212   if (NULL != mhp)
213     *mhp = mqm->mh;
214   return mqm;
215 }
216
217
218 struct GNUNET_MQ_Message *
219 GNUNET_MQ_msg_concat_ (struct GNUNET_MessageHeader **mhp, uint16_t base_size, struct GNUNET_MessageHeader *m, uint16_t type)
220 {
221   struct GNUNET_MQ_Message *mq;
222
223   GNUNET_assert (NULL != mhp);
224   if (NULL == m)
225     return GNUNET_MQ_msg_ (mhp, base_size, type);
226   GNUNET_assert (ntohs (m->size >= sizeof (struct GNUNET_MessageHeader)));
227   /* check for overflow */
228   if (base_size + ntohs (m->size) <= base_size)
229     return NULL; 
230   mq = GNUNET_MQ_msg_ (mhp, base_size + ntohs (m->size), type);
231   memcpy (((void *) *mhp) + base_size, m, ntohs (m->size));
232   return mq;
233 }
234
235
236 /**
237  * Functions of this signature are called whenever writing operations
238  * on a stream are executed
239  *
240  * @param cls the closure from GNUNET_STREAM_write
241  * @param status the status of the stream at the time this function is called;
242  *          GNUNET_STREAM_OK if writing to stream was completed successfully;
243  *          GNUNET_STREAM_TIMEOUT if the given data is not sent successfully
244  *          (this doesn't mean that the data is never sent, the receiver may
245  *          have read the data but its ACKs may have been lost);
246  *          GNUNET_STREAM_SHUTDOWN if the stream is shutdown for writing in the
247  *          mean time; GNUNET_STREAM_SYSERR if the stream is broken and cannot
248  *          be processed.
249  * @param size the number of bytes written
250  */
251 static void 
252 stream_write_queued (void *cls, enum GNUNET_STREAM_Status status, size_t size)
253 {
254   struct GNUNET_MQ_MessageQueue *mq = cls;
255   struct MessageStreamState *mss = (struct MessageStreamState *) mq->impl_state;
256   struct GNUNET_MQ_Message *mqm;
257
258   GNUNET_assert (GNUNET_STREAM_OK == status);
259   
260   /* call cb for message we finished sending */
261   mqm = mq->current_msg;
262   if (NULL != mqm)
263   {
264     if (NULL != mqm->sent_cb)
265       mqm->sent_cb (mqm->sent_cls);
266     GNUNET_free (mqm);
267   }
268
269   mss->wh = NULL;
270
271   mqm = mq->msg_head;
272   mq->current_msg = mqm;
273   if (NULL == mqm)
274     return;
275   GNUNET_CONTAINER_DLL_remove (mq->msg_head, mq->msg_tail, mqm);
276   mss->wh = GNUNET_STREAM_write (mss->socket, mqm->mh, ntohs (mqm->mh->size),
277                                  GNUNET_TIME_UNIT_FOREVER_REL, stream_write_queued, cls);
278   GNUNET_assert (NULL != mss->wh);
279 }
280
281
282 static void
283 stream_socket_send_impl (struct GNUNET_MQ_MessageQueue *mq, struct GNUNET_MQ_Message *mqm)
284 {
285   if (NULL != mq->current_msg)
286   {
287     GNUNET_CONTAINER_DLL_insert_tail (mq->msg_head, mq->msg_tail, mqm);
288     return;
289   }
290   stream_write_queued (mq, GNUNET_STREAM_OK, 0);
291 }
292
293
294 /**
295  * Functions with this signature are called whenever a
296  * complete message is received by the tokenizer.
297  *
298  * Do not call GNUNET_SERVER_mst_destroy in callback
299  *
300  * @param cls closure
301  * @param client identification of the client
302  * @param message the actual message
303  *
304  * @return GNUNET_OK on success, GNUNET_SYSERR to stop further processing
305  */
306 static int
307 stream_mst_callback (void *cls, void *client, const struct GNUNET_MessageHeader *message)
308 {
309   struct GNUNET_MQ_MessageQueue *mq = cls;
310
311   GNUNET_assert (NULL != message);
312   dispatch_message (mq, message);
313   return GNUNET_OK;
314 }
315
316
317 /**
318  * Functions of this signature are called whenever data is available from the
319  * stream.
320  *
321  * @param cls the closure from GNUNET_STREAM_read
322  * @param status the status of the stream at the time this function is called
323  * @param data traffic from the other side
324  * @param size the number of bytes available in data read; will be 0 on timeout 
325  * @return number of bytes of processed from 'data' (any data remaining should be
326  *         given to the next time the read processor is called).
327  */
328 static size_t
329 stream_data_processor (void *cls,
330                        enum GNUNET_STREAM_Status status,
331                        const void *data,
332                        size_t size)
333 {
334   struct GNUNET_MQ_MessageQueue *mq = cls;
335   struct MessageStreamState *mss;
336   int ret;
337   mss = (struct MessageStreamState *) mq->impl_state;
338
339   GNUNET_assert (GNUNET_STREAM_OK == status);
340   ret = GNUNET_SERVER_mst_receive (mss->mst, NULL, data, size, GNUNET_NO, GNUNET_NO);
341   GNUNET_assert (GNUNET_OK == ret);
342   /* we always read all data */
343   return size;
344 }
345
346
347 struct GNUNET_MQ_MessageQueue *
348 GNUNET_MQ_queue_for_stream_socket (struct GNUNET_STREAM_Socket *socket,
349                                    const struct GNUNET_MQ_Handler *handlers,
350                                    void *cls)
351 {
352   struct GNUNET_MQ_MessageQueue *mq;
353   struct MessageStreamState *mss;
354
355   mq = GNUNET_new (struct GNUNET_MQ_MessageQueue);
356   mss = GNUNET_new (struct MessageStreamState);
357   mss->socket = socket;
358   mq->impl_state = mss;
359   mq->send_impl = stream_socket_send_impl;
360   mq->handlers = handlers;
361   mq->handlers_cls = cls;
362   if (NULL != handlers)
363   {
364     mss->mst = GNUNET_SERVER_mst_create (stream_mst_callback, mq);
365     mss->rh = GNUNET_STREAM_read (socket, GNUNET_TIME_UNIT_FOREVER_REL, 
366                                   stream_data_processor, mq);
367   }
368   return mq;
369 }
370
371
372 /**
373  * Transmit a queued message to the session's client.
374  *
375  * @param cls consensus session
376  * @param size number of bytes available in buf
377  * @param buf where the callee should write the message
378  * @return number of bytes written to buf
379  */
380 static size_t
381 transmit_queued (void *cls, size_t size,
382                  void *buf)
383 {
384   struct GNUNET_MQ_MessageQueue *mq = cls;
385   struct GNUNET_MQ_Message *mqm = mq->current_msg;
386   struct ServerClientSocketState *state = mq->impl_state;
387   size_t msg_size;
388
389   mq->current_msg = NULL;
390   GNUNET_assert (NULL != mqm);
391   GNUNET_assert (NULL != buf);
392   msg_size = ntohs (mqm->mh->size);
393   GNUNET_assert (size >= msg_size);
394   memcpy (buf, mqm->mh, msg_size);
395   GNUNET_free (mqm);
396   state->th = NULL;
397   if (NULL != mq->msg_head)
398   {
399     mq->current_msg = mq->msg_head;
400     GNUNET_CONTAINER_DLL_remove (mq->msg_head, mq->msg_tail, mq->current_msg);
401     state->th = 
402         GNUNET_SERVER_notify_transmit_ready (state->client, msg_size, 
403                                              GNUNET_TIME_UNIT_FOREVER_REL,
404                                              &transmit_queued, mq);
405   }
406   return msg_size;
407 }
408
409
410 static void
411 server_client_send_impl (struct GNUNET_MQ_MessageQueue *mq, struct GNUNET_MQ_Message *mqm)
412 {
413   struct ServerClientSocketState *state = mq->impl_state;
414   int msize;
415
416   GNUNET_assert (NULL != state);
417
418   if (NULL != state->th)
419   {
420     GNUNET_CONTAINER_DLL_insert_tail (mq->msg_head, mq->msg_tail, mqm);
421     return;
422   }
423   GNUNET_assert (NULL == mq->current_msg);
424   msize = ntohs (mq->msg_head->mh->size);
425   mq->current_msg = mqm;
426   state->th = 
427       GNUNET_SERVER_notify_transmit_ready (state->client, msize, 
428                                            GNUNET_TIME_UNIT_FOREVER_REL,
429                                            &transmit_queued, mq);
430 }
431
432
433 struct GNUNET_MQ_MessageQueue *
434 GNUNET_MQ_queue_for_server_client (struct GNUNET_SERVER_Client *client)
435 {
436   struct GNUNET_MQ_MessageQueue *mq;
437   struct ServerClientSocketState *scss;
438
439   mq = GNUNET_new (struct GNUNET_MQ_MessageQueue);
440   scss = GNUNET_new (struct ServerClientSocketState);
441   mq->impl_state = scss;
442   mq->send_impl = server_client_send_impl;
443   return mq;
444 }
445
446
447 /**
448  * Transmit a queued message to the session's client.
449  *
450  * @param cls consensus session
451  * @param size number of bytes available in buf
452  * @param buf where the callee should write the message
453  * @return number of bytes written to buf
454  */
455 static size_t
456 connection_client_transmit_queued (void *cls, size_t size,
457                  void *buf)
458 {
459   struct GNUNET_MQ_MessageQueue *mq = cls;
460   struct GNUNET_MQ_Message *mqm = mq->current_msg;
461   struct ClientConnectionState *state = mq->impl_state;
462   size_t msg_size;
463
464   mq->current_msg = NULL;
465   GNUNET_assert (NULL != mqm);
466   GNUNET_assert (NULL != buf);
467   msg_size = ntohs (mqm->mh->size);
468   GNUNET_assert (size >= msg_size);
469   memcpy (buf, mqm->mh, msg_size);
470   GNUNET_free (mqm);
471   state->th = NULL;
472   if (NULL != mq->msg_head)
473   {
474     mq->current_msg = mq->msg_head;
475     GNUNET_CONTAINER_DLL_remove (mq->msg_head, mq->msg_tail, mq->current_msg);
476     state->th = 
477         GNUNET_CLIENT_notify_transmit_ready (state->connection, msg_size, 
478                                              GNUNET_TIME_UNIT_FOREVER_REL, GNUNET_NO,
479                                              &connection_client_transmit_queued, mq);
480   }
481   return msg_size;
482 }
483
484
485 static void
486 connection_client_send_impl (struct GNUNET_MQ_MessageQueue *mq, struct GNUNET_MQ_Message *mqm)
487 {
488   struct ClientConnectionState *state = mq->impl_state;
489   int msize;
490
491   GNUNET_assert (NULL != state);
492
493   if (NULL != state->th)
494   {
495     GNUNET_CONTAINER_DLL_insert_tail (mq->msg_head, mq->msg_tail, mqm);
496     return;
497   }
498   GNUNET_assert (NULL == mq->current_msg);
499   mq->current_msg = mqm;
500   msize = ntohs (mqm->mh->size);
501   state->th = 
502       GNUNET_CLIENT_notify_transmit_ready (state->connection, msize, 
503                                            GNUNET_TIME_UNIT_FOREVER_REL, GNUNET_NO,
504                                            &connection_client_transmit_queued, mq);
505 }
506
507
508 /**
509  * Type of a function to call when we receive a message
510  * from the service.
511  *
512  * @param cls closure
513  * @param msg message received, NULL on timeout or fatal error
514  */
515 static void
516 handle_client_message (void *cls,
517                        const struct GNUNET_MessageHeader *msg)
518 {
519   struct GNUNET_MQ_MessageQueue *mq = cls;
520
521   GNUNET_assert (NULL != msg);
522
523   dispatch_message (mq, msg);
524 }
525
526
527 struct GNUNET_MQ_MessageQueue *
528 GNUNET_MQ_queue_for_connection_client (struct GNUNET_CLIENT_Connection *connection,
529                                        const struct GNUNET_MQ_Handler *handlers,
530                                        void *cls)
531 {
532   struct GNUNET_MQ_MessageQueue *mq;
533   struct ClientConnectionState *state;
534
535   GNUNET_assert (NULL != connection);
536
537   mq = GNUNET_new (struct GNUNET_MQ_MessageQueue);
538   mq->handlers = handlers;
539   mq->handlers_cls = cls;
540   state = GNUNET_new (struct ClientConnectionState);
541   state->connection = connection;
542   mq->impl_state = state;
543   mq->send_impl = connection_client_send_impl;
544
545   if (NULL != handlers)
546   {
547     GNUNET_CLIENT_receive (connection, handle_client_message, mq,
548                            GNUNET_TIME_UNIT_FOREVER_REL);
549   }
550
551   return mq;
552 }
553
554
555
556 void
557 GNUNET_MQ_replace_handlers (struct GNUNET_MQ_MessageQueue *mq,
558                             const struct GNUNET_MQ_Handler *new_handlers,
559                             void *cls)
560 {
561   mq->handlers = new_handlers;
562   mq->handlers_cls = cls;
563 }
564
565
566
567 /**
568  * Associate the assoc_data in mq with a unique request id.
569  *
570  * @param mq message queue, id will be unique for the queue
571  * @param mqm message to associate
572  * @param data to associate
573  */
574 uint32_t
575 GNUNET_MQ_assoc_add (struct GNUNET_MQ_MessageQueue *mq,
576                      struct GNUNET_MQ_Message *mqm,
577                      void *assoc_data)
578 {
579   uint32_t id;
580
581   if (NULL == mq->assoc_map)
582     mq->assoc_map = GNUNET_CONTAINER_multihashmap32_create (8);
583   id = mq->assoc_id++;
584   GNUNET_CONTAINER_multihashmap32_put (mq->assoc_map, id, assoc_data,
585                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
586   return id;
587 }
588
589
590
591 void *
592 GNUNET_MQ_assoc_get (struct GNUNET_MQ_MessageQueue *mq, uint32_t request_id)
593 {
594   if (NULL == mq->assoc_map)
595     return NULL;
596   return GNUNET_CONTAINER_multihashmap32_get (mq->assoc_map, request_id);
597 }
598
599
600 void *
601 GNUNET_MQ_assoc_remove (struct GNUNET_MQ_MessageQueue *mq, uint32_t request_id)
602 {
603   void *val;
604
605   if (NULL == mq->assoc_map)
606     return NULL;
607   val = GNUNET_CONTAINER_multihashmap32_get (mq->assoc_map, request_id);
608   GNUNET_CONTAINER_multihashmap32_remove (mq->assoc_map, request_id, val);
609   return val;
610 }
611
612
613 void
614 GNUNET_MQ_notify_sent (struct GNUNET_MQ_Message *mqm,
615                        GNUNET_MQ_NotifyCallback cb,
616                        void *cls)
617 {
618   mqm->sent_cb = cb;
619   mqm->sent_cls = cls;
620 }
621
622
623 void
624 GNUNET_MQ_destroy (struct GNUNET_MQ_MessageQueue *mq)
625 {
626   /* FIXME: destroy all pending messages in the queue */
627   GNUNET_free (mq);
628 }
629