work on gnunet-set, isolated bug in stream
[oweals/gnunet.git] / src / set / mq.c
1 /*
2      This file is part of GNUnet.
3      (C) 2012 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 2, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @author Florian Dold
23  * @file mq/mq.c
24  * @brief general purpose request queue
25  */
26
27 #include "mq.h"
28
29 /**
30  * Signature of functions implementing the
31  * sending part of a message queue
32  *
33  * @param q the message queue
34  * @param m the message
35  */
36 typedef void (*SendImpl) (struct GNUNET_MQ_MessageQueue *q, struct GNUNET_MQ_Message *m);
37
38
39 typedef void (*DestroyImpl) (struct GNUNET_MQ_MessageQueue *q);
40
41
42 /**
43  * Collection of the state necessary to read and write gnunet messages 
44  * to a stream socket. Should be used as closure for stream_data_processor.
45  */
46 struct MessageStreamState
47 {
48   struct GNUNET_SERVER_MessageStreamTokenizer *mst;
49   struct MessageQueue *mq;
50   struct GNUNET_STREAM_Socket *socket;
51   struct GNUNET_STREAM_ReadHandle *rh;
52   struct GNUNET_STREAM_WriteHandle *wh;
53 };
54
55
56 struct ServerClientSocketState
57 {
58   struct GNUNET_SERVER_Client *client;
59   struct GNUNET_SERVER_TransmitHandle* th;
60 };
61
62
63 struct ClientConnectionState
64 {
65   struct GNUNET_CLIENT_Connection *connection;
66   struct GNUNET_CLIENT_TransmitHandle *th;
67 };
68
69
70 struct GNUNET_MQ_MessageQueue
71 {
72   /**
73    * Handlers array, or NULL if the queue should not receive messages
74    */
75   const struct GNUNET_MQ_Handler *handlers;
76
77   /**
78    * Closure for the handler callbacks
79    */
80   void *handlers_cls;
81
82   /**
83    * Actual implementation of message sending,
84    * called when a message is added
85    */
86   SendImpl send_impl;
87
88   /**
89    * Implementation-dependent queue destruction function
90    */
91   DestroyImpl destroy_impl;
92
93   /**
94    * Implementation-specific state
95    */
96   void *impl_state;
97
98   /**
99    * Linked list of messages pending to be sent
100    */
101   struct GNUNET_MQ_Message *msg_head;
102
103   /**
104    * Linked list of messages pending to be sent
105    */
106   struct GNUNET_MQ_Message *msg_tail;
107
108   /**
109    * Message that is currently scheduled to be
110    * sent. Not the head of the message queue, as the implementation
111    * needs to know if sending has been already scheduled or not.
112    */
113   struct GNUNET_MQ_Message *current_msg;
114
115   /**
116    * Map of associations, lazily allocated
117    */
118   struct GNUNET_CONTAINER_MultiHashMap32 *assoc_map;
119
120   /**
121    * Next id that should be used for the assoc_map,
122    * initialized lazily to a random value together with
123    * assoc_map
124    */
125   uint32_t assoc_id;
126 };
127
128
129 struct GNUNET_MQ_Message
130 {
131   /**
132    * Messages are stored in a linked list
133    */
134   struct GNUNET_MQ_Message *next;
135
136   /**
137    * Messages are stored in a linked list
138    */
139   struct GNUNET_MQ_Message *prev;
140
141   /**
142    * Actual allocated message header,
143    * usually points to the end of the containing GNUNET_MQ_Message
144    */
145   struct GNUNET_MessageHeader *mh;
146
147   /**
148    * Queue the message is queued in, NULL if message is not queued.
149    */
150   struct GNUNET_MQ_MessageQueue *parent_queue;
151
152   /**
153    * Called after the message was sent irrevokably
154    */
155   GNUNET_MQ_NotifyCallback sent_cb;
156
157   /**
158    * Closure for send_cb
159    */
160   void *sent_cls;
161 };
162
163
164 /**
165  * Call the right callback for a message received
166  * by a queue
167  */
168 static void
169 dispatch_message (struct GNUNET_MQ_MessageQueue *mq, const struct GNUNET_MessageHeader *mh)
170 {
171   const struct GNUNET_MQ_Handler *handler;
172
173   handler = mq->handlers;
174   if (NULL == handler)
175     return;
176   for (; NULL != handler->cb; handler++)
177     if (handler->type == ntohs (mh->type))
178       handler->cb (mq->handlers_cls, mh);
179 }
180
181
182 void
183 GNUNET_MQ_discard (struct GNUNET_MQ_Message *mqm)
184 {
185   GNUNET_assert (NULL == mqm->parent_queue);
186   GNUNET_free (mqm);
187 }
188
189
190 /**
191  * Send a message with the give message queue.
192  * May only be called once per message.
193  * 
194  * @param mq message queue
195  * @param mqm the message to send.
196  */
197 void
198 GNUNET_MQ_send (struct GNUNET_MQ_MessageQueue *mq, struct GNUNET_MQ_Message *mqm)
199 {
200   mq->send_impl (mq, mqm);
201 }
202
203
204 struct GNUNET_MQ_Message *
205 GNUNET_MQ_msg_ (struct GNUNET_MessageHeader **mhp, uint16_t size, uint16_t type)
206 {
207   struct GNUNET_MQ_Message *mqm;
208   mqm = GNUNET_malloc (sizeof *mqm + size);
209   mqm->mh = (struct GNUNET_MessageHeader *) &mqm[1];
210   mqm->mh->size = htons (size);
211   mqm->mh->type = htons (type);
212   if (NULL != mhp)
213     *mhp = mqm->mh;
214   return mqm;
215 }
216
217
218 int
219 GNUNET_MQ_nest_ (struct GNUNET_MQ_Message **mqmp,
220                  const struct GNUNET_MessageHeader *m)
221 {
222   size_t new_size;
223   size_t old_size;
224
225   if (NULL == m)
226     return GNUNET_OK;
227   GNUNET_assert (NULL != mqmp);
228   old_size = ntohs ((*mqmp)->mh->size);
229   /* message too large to concatenate? */
230   if (ntohs ((*mqmp)->mh->size) + ntohs (m->size) < ntohs (m->size))
231     return GNUNET_SYSERR;
232   new_size = old_size + ntohs (m->size);
233   *mqmp = GNUNET_realloc (mqmp, sizeof (struct GNUNET_MQ_Message) + new_size);
234   memcpy ((*mqmp)->mh + old_size, m, new_size - old_size);
235   (*mqmp)->mh->size = htons (new_size);
236   return GNUNET_OK;
237 }
238
239
240 /**
241  * Functions of this signature are called whenever writing operations
242  * on a stream are executed
243  *
244  * @param cls the closure from GNUNET_STREAM_write
245  * @param status the status of the stream at the time this function is called;
246  *          GNUNET_STREAM_OK if writing to stream was completed successfully;
247  *          GNUNET_STREAM_TIMEOUT if the given data is not sent successfully
248  *          (this doesn't mean that the data is never sent, the receiver may
249  *          have read the data but its ACKs may have been lost);
250  *          GNUNET_STREAM_SHUTDOWN if the stream is shutdown for writing in the
251  *          mean time; GNUNET_STREAM_SYSERR if the stream is broken and cannot
252  *          be processed.
253  * @param size the number of bytes written
254  */
255 static void 
256 stream_write_queued (void *cls, enum GNUNET_STREAM_Status status, size_t size)
257 {
258   struct GNUNET_MQ_MessageQueue *mq = cls;
259   struct MessageStreamState *mss = (struct MessageStreamState *) mq->impl_state;
260   struct GNUNET_MQ_Message *mqm;
261
262   GNUNET_assert (GNUNET_STREAM_OK == status);
263   
264   /* call cb for message we finished sending */
265   mqm = mq->current_msg;
266   if (NULL != mqm)
267   {
268     if (NULL != mqm->sent_cb)
269       mqm->sent_cb (mqm->sent_cls);
270     GNUNET_free (mqm);
271   }
272
273   mss->wh = NULL;
274
275   mqm = mq->msg_head;
276   mq->current_msg = mqm;
277   if (NULL == mqm)
278     return;
279   GNUNET_CONTAINER_DLL_remove (mq->msg_head, mq->msg_tail, mqm);
280   mss->wh = GNUNET_STREAM_write (mss->socket, mqm->mh, ntohs (mqm->mh->size),
281                                  GNUNET_TIME_UNIT_FOREVER_REL,
282                                  stream_write_queued, mq);
283   GNUNET_assert (NULL != mss->wh);
284 }
285
286
287 static void
288 stream_socket_send_impl (struct GNUNET_MQ_MessageQueue *mq,
289                          struct GNUNET_MQ_Message *mqm)
290 {
291   struct MessageStreamState *mss = (struct MessageStreamState *) mq->impl_state;
292   if (NULL != mq->current_msg)
293   {
294     GNUNET_CONTAINER_DLL_insert_tail (mq->msg_head, mq->msg_tail, mqm);
295     return;
296   }
297   mq->current_msg = mqm;
298   mss->wh = GNUNET_STREAM_write (mss->socket, mqm->mh, ntohs (mqm->mh->size),
299                                  GNUNET_TIME_UNIT_FOREVER_REL,
300                                  stream_write_queued, mq);
301 }
302
303
304 /**
305  * Functions with this signature are called whenever a
306  * complete message is received by the tokenizer.
307  *
308  * Do not call GNUNET_SERVER_mst_destroy in callback
309  *
310  * @param cls closure
311  * @param client identification of the client
312  * @param message the actual message
313  *
314  * @return GNUNET_OK on success, GNUNET_SYSERR to stop further processing
315  */
316 static int
317 stream_mst_callback (void *cls, void *client,
318                      const struct GNUNET_MessageHeader *message)
319 {
320   struct GNUNET_MQ_MessageQueue *mq = cls;
321
322   GNUNET_assert (NULL != message);
323   dispatch_message (mq, message);
324   return GNUNET_OK;
325 }
326
327
328 /**
329  * Functions of this signature are called whenever data is available from the
330  * stream.
331  *
332  * @param cls the closure from GNUNET_STREAM_read
333  * @param status the status of the stream at the time this function is called
334  * @param data traffic from the other side
335  * @param size the number of bytes available in data read; will be 0 on timeout 
336  * @return number of bytes of processed from 'data' (any data remaining should be
337  *         given to the next time the read processor is called).
338  */
339 static size_t
340 stream_data_processor (void *cls,
341                        enum GNUNET_STREAM_Status status,
342                        const void *data,
343                        size_t size)
344 {
345   struct GNUNET_MQ_MessageQueue *mq = cls;
346   struct MessageStreamState *mss;
347   int ret;
348   
349   mss = (struct MessageStreamState *) mq->impl_state;
350   GNUNET_assert (GNUNET_STREAM_OK == status);
351   ret = GNUNET_SERVER_mst_receive (mss->mst, NULL, data, size, GNUNET_NO, GNUNET_NO);
352   GNUNET_assert (GNUNET_OK == ret);
353   /* we always read all data */
354     mss->rh = GNUNET_STREAM_read (mss->socket, GNUNET_TIME_UNIT_FOREVER_REL, 
355                                   stream_data_processor, mq);
356   return size;
357 }
358
359
360 struct GNUNET_MQ_MessageQueue *
361 GNUNET_MQ_queue_for_stream_socket (struct GNUNET_STREAM_Socket *socket,
362                                    const struct GNUNET_MQ_Handler *handlers,
363                                    void *cls)
364 {
365   struct GNUNET_MQ_MessageQueue *mq;
366   struct MessageStreamState *mss;
367
368   mq = GNUNET_new (struct GNUNET_MQ_MessageQueue);
369   mss = GNUNET_new (struct MessageStreamState);
370   mss->socket = socket;
371   mq->impl_state = mss;
372   mq->send_impl = stream_socket_send_impl;
373   mq->handlers = handlers;
374   mq->handlers_cls = cls;
375   if (NULL != handlers)
376   {
377     mss->mst = GNUNET_SERVER_mst_create (stream_mst_callback, mq);
378     mss->rh = GNUNET_STREAM_read (socket, GNUNET_TIME_UNIT_FOREVER_REL, 
379                                   stream_data_processor, mq);
380   }
381   return mq;
382 }
383
384
385 /*** Transmit a queued message to the session's client.
386  *
387  * @param cls consensus session
388  * @param size number of bytes available in buf
389  * @param buf where the callee should write the message
390  * @return number of bytes written to buf
391  */
392 static size_t
393 transmit_queued (void *cls, size_t size,
394                  void *buf)
395 {
396   struct GNUNET_MQ_MessageQueue *mq = cls;
397   struct GNUNET_MQ_Message *mqm = mq->current_msg;
398   struct ServerClientSocketState *state = mq->impl_state;
399   size_t msg_size;
400
401   mq->current_msg = NULL;
402   GNUNET_assert (NULL != mqm);
403   GNUNET_assert (NULL != buf);
404   msg_size = ntohs (mqm->mh->size);
405   GNUNET_assert (size >= msg_size);
406   memcpy (buf, mqm->mh, msg_size);
407   GNUNET_free (mqm);
408   state->th = NULL;
409   if (NULL != mq->msg_head)
410   {
411     mq->current_msg = mq->msg_head;
412     GNUNET_CONTAINER_DLL_remove (mq->msg_head, mq->msg_tail, mq->current_msg);
413     state->th = 
414         GNUNET_SERVER_notify_transmit_ready (state->client, msg_size, 
415                                              GNUNET_TIME_UNIT_FOREVER_REL,
416                                              &transmit_queued, mq);
417   }
418   return msg_size;
419 }
420
421
422 static void
423 server_client_send_impl (struct GNUNET_MQ_MessageQueue *mq, struct GNUNET_MQ_Message *mqm)
424 {
425   struct ServerClientSocketState *state = mq->impl_state;
426   int msize;
427
428   GNUNET_assert (NULL != state);
429
430   if (NULL != state->th)
431   {
432     GNUNET_CONTAINER_DLL_insert_tail (mq->msg_head, mq->msg_tail, mqm);
433     return;
434   }
435   GNUNET_assert (NULL == mq->current_msg);
436   msize = ntohs (mq->msg_head->mh->size);
437   mq->current_msg = mqm;
438   state->th = 
439       GNUNET_SERVER_notify_transmit_ready (state->client, msize, 
440                                            GNUNET_TIME_UNIT_FOREVER_REL,
441                                            &transmit_queued, mq);
442 }
443
444
445 struct GNUNET_MQ_MessageQueue *
446 GNUNET_MQ_queue_for_server_client (struct GNUNET_SERVER_Client *client)
447 {
448   struct GNUNET_MQ_MessageQueue *mq;
449   struct ServerClientSocketState *scss;
450
451   mq = GNUNET_new (struct GNUNET_MQ_MessageQueue);
452   scss = GNUNET_new (struct ServerClientSocketState);
453   mq->impl_state = scss;
454   mq->send_impl = server_client_send_impl;
455   return mq;
456 }
457
458
459 /**
460  * Transmit a queued message to the session's client.
461  *
462  * @param cls consensus session
463  * @param size number of bytes available in buf
464  * @param buf where the callee should write the message
465  * @return number of bytes written to buf
466  */
467 static size_t
468 connection_client_transmit_queued (void *cls, size_t size,
469                  void *buf)
470 {
471   struct GNUNET_MQ_MessageQueue *mq = cls;
472   struct GNUNET_MQ_Message *mqm = mq->current_msg;
473   struct ClientConnectionState *state = mq->impl_state;
474   size_t msg_size;
475
476   mq->current_msg = NULL;
477   GNUNET_assert (NULL != mqm);
478   GNUNET_assert (NULL != buf);
479   msg_size = ntohs (mqm->mh->size);
480   GNUNET_assert (size >= msg_size);
481   memcpy (buf, mqm->mh, msg_size);
482   GNUNET_free (mqm);
483   state->th = NULL;
484   if (NULL != mq->msg_head)
485   {
486     mq->current_msg = mq->msg_head;
487     GNUNET_CONTAINER_DLL_remove (mq->msg_head, mq->msg_tail, mq->current_msg);
488     state->th = 
489       GNUNET_CLIENT_notify_transmit_ready (state->connection, htons (mq->current_msg->mh->size), 
490                                              GNUNET_TIME_UNIT_FOREVER_REL, GNUNET_NO,
491                                              &connection_client_transmit_queued, mq);
492   }
493   return msg_size;
494 }
495
496
497 static void
498 connection_client_send_impl (struct GNUNET_MQ_MessageQueue *mq,
499                              struct GNUNET_MQ_Message *mqm)
500 {
501   struct ClientConnectionState *state = mq->impl_state;
502   int msize;
503
504   GNUNET_assert (NULL != state);
505
506   if (NULL != state->th)
507   {
508     GNUNET_CONTAINER_DLL_insert_tail (mq->msg_head, mq->msg_tail, mqm);
509     return;
510   }
511   GNUNET_assert (NULL == mq->current_msg);
512   mq->current_msg = mqm;
513   msize = ntohs (mqm->mh->size);
514   state->th = 
515       GNUNET_CLIENT_notify_transmit_ready (state->connection, msize, 
516                                            GNUNET_TIME_UNIT_FOREVER_REL, GNUNET_NO,
517                                            &connection_client_transmit_queued, mq);
518 }
519
520
521 /**
522  * Type of a function to call when we receive a message
523  * from the service.
524  *
525  * @param cls closure
526  * @param msg message received, NULL on timeout or fatal error
527  */
528 static void
529 handle_client_message (void *cls,
530                        const struct GNUNET_MessageHeader *msg)
531 {
532   struct GNUNET_MQ_MessageQueue *mq = cls;
533
534   GNUNET_assert (NULL != msg);
535   dispatch_message (mq, msg);
536 }
537
538
539 struct GNUNET_MQ_MessageQueue *
540 GNUNET_MQ_queue_for_connection_client (struct GNUNET_CLIENT_Connection *connection,
541                                        const struct GNUNET_MQ_Handler *handlers,
542                                        void *cls)
543 {
544   struct GNUNET_MQ_MessageQueue *mq;
545   struct ClientConnectionState *state;
546
547   GNUNET_assert (NULL != connection);
548
549   mq = GNUNET_new (struct GNUNET_MQ_MessageQueue);
550   mq->handlers = handlers;
551   mq->handlers_cls = cls;
552   state = GNUNET_new (struct ClientConnectionState);
553   state->connection = connection;
554   mq->impl_state = state;
555   mq->send_impl = connection_client_send_impl;
556
557   if (NULL != handlers)
558   {
559     GNUNET_CLIENT_receive (connection, handle_client_message, mq,
560                            GNUNET_TIME_UNIT_FOREVER_REL);
561   }
562
563   return mq;
564 }
565
566
567
568 void
569 GNUNET_MQ_replace_handlers (struct GNUNET_MQ_MessageQueue *mq,
570                             const struct GNUNET_MQ_Handler *new_handlers,
571                             void *cls)
572 {
573   mq->handlers = new_handlers;
574   mq->handlers_cls = cls;
575 }
576
577
578
579 /**
580  * Associate the assoc_data in mq with a unique request id.
581  *
582  * @param mq message queue, id will be unique for the queue
583  * @param mqm message to associate
584  * @param data to associate
585  */
586 uint32_t
587 GNUNET_MQ_assoc_add (struct GNUNET_MQ_MessageQueue *mq,
588                      struct GNUNET_MQ_Message *mqm,
589                      void *assoc_data)
590 {
591   uint32_t id;
592
593   if (NULL == mq->assoc_map)
594     mq->assoc_map = GNUNET_CONTAINER_multihashmap32_create (8);
595   id = mq->assoc_id++;
596   GNUNET_CONTAINER_multihashmap32_put (mq->assoc_map, id, assoc_data,
597                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
598   return id;
599 }
600
601
602
603 void *
604 GNUNET_MQ_assoc_get (struct GNUNET_MQ_MessageQueue *mq, uint32_t request_id)
605 {
606   if (NULL == mq->assoc_map)
607     return NULL;
608   return GNUNET_CONTAINER_multihashmap32_get (mq->assoc_map, request_id);
609 }
610
611
612 void *
613 GNUNET_MQ_assoc_remove (struct GNUNET_MQ_MessageQueue *mq, uint32_t request_id)
614 {
615   void *val;
616
617   if (NULL == mq->assoc_map)
618     return NULL;
619   val = GNUNET_CONTAINER_multihashmap32_get (mq->assoc_map, request_id);
620   GNUNET_CONTAINER_multihashmap32_remove (mq->assoc_map, request_id, val);
621   return val;
622 }
623
624
625 void
626 GNUNET_MQ_notify_sent (struct GNUNET_MQ_Message *mqm,
627                        GNUNET_MQ_NotifyCallback cb,
628                        void *cls)
629 {
630   mqm->sent_cb = cb;
631   mqm->sent_cls = cls;
632 }
633
634
635 void
636 GNUNET_MQ_destroy (struct GNUNET_MQ_MessageQueue *mq)
637 {
638   /* FIXME: destroy all pending messages in the queue */
639   GNUNET_free (mq);
640 }
641