36cacd30b0e0499fc6e5db571d72d21fd9d4b611
[oweals/gnunet.git] / src / util / mq.c
1 /*
2      This file is part of GNUnet.
3      (C) 2012 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 2, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @author Florian Dold
23  * @file util/mq.c
24  * @brief general purpose request queue
25  */
26
27 #include "platform.h"
28 #include "gnunet_common.h"
29 #include "gnunet_util_lib.h"
30
31 #define LOG(kind,...) GNUNET_log_from (kind, "mq",__VA_ARGS__)
32
33
34
35 struct ServerClientSocketState
36 {
37   struct GNUNET_SERVER_Client *client;
38   struct GNUNET_SERVER_TransmitHandle* th;
39 };
40
41
42 struct ClientConnectionState
43 {
44   /**
45    * Did we call receive?
46    */
47   int receive_active;
48   struct GNUNET_CLIENT_Connection *connection;
49   struct GNUNET_CLIENT_TransmitHandle *th;
50 };
51
52
53
54
55 /**
56  * Call the right callback for a message.
57  *
58  * @param mq message queue with the handlers
59  * @param mh message to dispatch
60  */
61 void
62 GNUNET_MQ_dispatch (struct GNUNET_MQ_MessageQueue *mq, const struct GNUNET_MessageHeader *mh)
63 {
64   const struct GNUNET_MQ_Handler *handler;
65   int handled = GNUNET_NO;
66
67   handler = mq->handlers;
68   if (NULL == handler)
69     return;
70   for (; NULL != handler->cb; handler++)
71   {
72     if (handler->type == ntohs (mh->type))
73     {
74       handler->cb (mq->handlers_cls, mh);
75       handled = GNUNET_YES;
76     }
77   }
78   
79   if (GNUNET_NO == handled)
80     LOG (GNUNET_ERROR_TYPE_WARNING, "no handler for message of type %d\n", ntohs (mh->type));
81 }
82
83
84 void
85 GNUNET_MQ_discard (struct GNUNET_MQ_Message *mqm)
86 {
87   GNUNET_assert (NULL == mqm->parent_queue);
88   GNUNET_free (mqm);
89 }
90
91
92 /**
93  * Send a message with the give message queue.
94  * May only be called once per message.
95  * 
96  * @param mq message queue
97  * @param mqm the message to send.
98  */
99 void
100 GNUNET_MQ_send (struct GNUNET_MQ_MessageQueue *mq, struct GNUNET_MQ_Message *mqm)
101 {
102   GNUNET_assert (NULL != mq);
103   mq->send_impl (mq, mqm);
104 }
105
106
107 struct GNUNET_MQ_Message *
108 GNUNET_MQ_msg_ (struct GNUNET_MessageHeader **mhp, uint16_t size, uint16_t type)
109 {
110   struct GNUNET_MQ_Message *mqm;
111
112   mqm = GNUNET_malloc (sizeof *mqm + size);
113   mqm->mh = (struct GNUNET_MessageHeader *) &mqm[1];
114   mqm->mh->size = htons (size);
115   mqm->mh->type = htons (type);
116   if (NULL != mhp)
117     *mhp = mqm->mh;
118   return mqm;
119 }
120
121
122 int
123 GNUNET_MQ_nest_ (struct GNUNET_MQ_Message **mqmp,
124                  const void *data, uint16_t len)
125 {
126   size_t new_size;
127   size_t old_size;
128
129   GNUNET_assert (NULL != mqmp);
130   /* there's no data to append => do nothing */
131   if (NULL == data)
132     return GNUNET_OK;
133   old_size = ntohs ((*mqmp)->mh->size);
134   /* message too large to concatenate? */
135   if (((uint16_t) (old_size + len)) < len)
136     return GNUNET_SYSERR;
137   new_size = old_size + len;
138   *mqmp = GNUNET_realloc (*mqmp, sizeof (struct GNUNET_MQ_Message) + new_size);
139   (*mqmp)->mh = (struct GNUNET_MessageHeader *) &(*mqmp)[1];
140   memcpy (((void *) (*mqmp)->mh) + old_size, data, new_size - old_size);
141   (*mqmp)->mh->size = htons (new_size);
142   return GNUNET_OK;
143 }
144
145
146
147
148 /*** Transmit a queued message to the session's client.
149  *
150  * @param cls consensus session
151  * @param size number of bytes available in buf
152  * @param buf where the callee should write the message
153  * @return number of bytes written to buf
154  */
155 static size_t
156 transmit_queued (void *cls, size_t size,
157                  void *buf)
158 {
159   struct GNUNET_MQ_MessageQueue *mq = cls;
160   struct GNUNET_MQ_Message *mqm = mq->current_msg;
161   struct ServerClientSocketState *state = mq->impl_state;
162   size_t msg_size;
163
164   GNUNET_assert (NULL != buf);
165
166   if (NULL != mqm->sent_cb)
167   {
168     mqm->sent_cb (mqm->sent_cls);
169   }
170
171   mq->current_msg = NULL;
172   GNUNET_assert (NULL != mqm);
173   msg_size = ntohs (mqm->mh->size);
174   GNUNET_assert (size >= msg_size);
175   memcpy (buf, mqm->mh, msg_size);
176   GNUNET_free (mqm);
177   state->th = NULL;
178
179   if (NULL != mq->msg_head)
180   {
181     mq->current_msg = mq->msg_head;
182     GNUNET_CONTAINER_DLL_remove (mq->msg_head, mq->msg_tail, mq->current_msg);
183     state->th = 
184         GNUNET_SERVER_notify_transmit_ready (state->client, msg_size, 
185                                              GNUNET_TIME_UNIT_FOREVER_REL,
186                                              &transmit_queued, mq);
187   }
188   return msg_size;
189 }
190
191
192
193 static void
194 server_client_destroy_impl (struct GNUNET_MQ_MessageQueue *mq)
195 {
196   struct ServerClientSocketState *state;
197   
198   GNUNET_assert (NULL != mq);
199   state = mq->impl_state;
200   GNUNET_assert (NULL != state);
201   GNUNET_SERVER_client_drop (state->client);
202   GNUNET_free (state);
203 }
204
205 static void
206 server_client_send_impl (struct GNUNET_MQ_MessageQueue *mq, struct GNUNET_MQ_Message *mqm)
207 {
208   struct ServerClientSocketState *state;
209   int msize;
210
211   GNUNET_assert (NULL != mq);
212   state = mq->impl_state;
213   GNUNET_assert (NULL != state);
214
215   if (NULL != state->th)
216   {
217     GNUNET_CONTAINER_DLL_insert_tail (mq->msg_head, mq->msg_tail, mqm);
218     return;
219   }
220   GNUNET_assert (NULL == mq->msg_head);
221   GNUNET_assert (NULL == mq->current_msg);
222   msize = ntohs (mqm->mh->size);
223   mq->current_msg = mqm;
224   state->th = 
225       GNUNET_SERVER_notify_transmit_ready (state->client, msize, 
226                                            GNUNET_TIME_UNIT_FOREVER_REL,
227                                            &transmit_queued, mq);
228 }
229
230
231 struct GNUNET_MQ_MessageQueue *
232 GNUNET_MQ_queue_for_server_client (struct GNUNET_SERVER_Client *client)
233 {
234   struct GNUNET_MQ_MessageQueue *mq;
235   struct ServerClientSocketState *scss;
236
237   mq = GNUNET_new (struct GNUNET_MQ_MessageQueue);
238   scss = GNUNET_new (struct ServerClientSocketState);
239   mq->impl_state = scss;
240   scss->client = client;
241   GNUNET_SERVER_client_keep (client);
242   mq->send_impl = server_client_send_impl;
243   mq->destroy_impl = server_client_destroy_impl;
244   return mq;
245 }
246
247
248 /**
249  * Type of a function to call when we receive a message
250  * from the service.
251  *
252  * @param cls closure
253  * @param msg message received, NULL on timeout or fatal error
254  */
255 static void
256 handle_client_message (void *cls,
257                        const struct GNUNET_MessageHeader *msg)
258 {
259   struct GNUNET_MQ_MessageQueue *mq = cls;
260   struct ClientConnectionState *state;
261
262   state = mq->impl_state;
263   
264   if (NULL == msg)
265   {
266     if (NULL == mq->error_handler)
267       LOG (GNUNET_ERROR_TYPE_WARNING, "ignoring read error (no handler installed)\n");
268     mq->error_handler (mq->handlers_cls, GNUNET_MQ_ERROR_READ);
269     return;
270   }
271
272   GNUNET_CLIENT_receive (state->connection, handle_client_message, mq,
273                          GNUNET_TIME_UNIT_FOREVER_REL);
274
275   GNUNET_MQ_dispatch (mq, msg);
276 }
277
278
279 /**
280  * Transmit a queued message to the session's client.
281  *
282  * @param cls consensus session
283  * @param size number of bytes available in buf
284  * @param buf where the callee should write the message
285  * @return number of bytes written to buf
286  */
287 static size_t
288 connection_client_transmit_queued (void *cls, size_t size,
289                  void *buf)
290 {
291   struct GNUNET_MQ_MessageQueue *mq = cls;
292   struct GNUNET_MQ_Message *mqm = mq->current_msg;
293   struct ClientConnectionState *state = mq->impl_state;
294   size_t msg_size;
295
296   if (NULL == buf)
297   {
298     if (NULL == mq->error_handler)
299     {
300       LOG (GNUNET_ERROR_TYPE_WARNING, "read error, but no error handler installed\n");
301       return 0;
302     }
303     mq->error_handler (mq->handlers_cls, GNUNET_MQ_ERROR_READ);
304     return 0;
305   }
306
307   if ((NULL != mq->handlers) && (GNUNET_NO == state->receive_active))
308   {
309     state->receive_active = GNUNET_YES;
310     GNUNET_CLIENT_receive (state->connection, handle_client_message, mq,
311                            GNUNET_TIME_UNIT_FOREVER_REL);
312   }
313
314
315   GNUNET_assert (NULL != mqm);
316
317   if (NULL != mqm->sent_cb)
318   {
319     mqm->sent_cb (mqm->sent_cls);
320   }
321
322   mq->current_msg = NULL;
323   GNUNET_assert (NULL != buf);
324   msg_size = ntohs (mqm->mh->size);
325   GNUNET_assert (size >= msg_size);
326   memcpy (buf, mqm->mh, msg_size);
327   GNUNET_free (mqm);
328   state->th = NULL;
329   if (NULL != mq->msg_head)
330   {
331     mq->current_msg = mq->msg_head;
332     GNUNET_CONTAINER_DLL_remove (mq->msg_head, mq->msg_tail, mq->current_msg);
333     state->th = 
334       GNUNET_CLIENT_notify_transmit_ready (state->connection, ntohs (mq->current_msg->mh->size), 
335                                              GNUNET_TIME_UNIT_FOREVER_REL, GNUNET_NO,
336                                              &connection_client_transmit_queued, mq);
337   }
338   return msg_size;
339 }
340
341
342
343 static void
344 connection_client_destroy_impl (struct GNUNET_MQ_MessageQueue *mq)
345 {
346   GNUNET_free (mq->impl_state);
347 }
348
349 static void
350 connection_client_send_impl (struct GNUNET_MQ_MessageQueue *mq,
351                              struct GNUNET_MQ_Message *mqm)
352 {
353   struct ClientConnectionState *state = mq->impl_state;
354   int msize;
355
356   GNUNET_assert (NULL != state);
357
358   if (NULL != state->th)
359   {
360     GNUNET_CONTAINER_DLL_insert_tail (mq->msg_head, mq->msg_tail, mqm);
361     return;
362   }
363   GNUNET_assert (NULL == mq->current_msg);
364   mq->current_msg = mqm;
365   msize = ntohs (mqm->mh->size);
366   state->th = 
367       GNUNET_CLIENT_notify_transmit_ready (state->connection, msize, 
368                                            GNUNET_TIME_UNIT_FOREVER_REL, GNUNET_NO,
369                                            &connection_client_transmit_queued, mq);
370 }
371
372
373
374
375
376 struct GNUNET_MQ_MessageQueue *
377 GNUNET_MQ_queue_for_connection_client (struct GNUNET_CLIENT_Connection *connection,
378                                        const struct GNUNET_MQ_Handler *handlers,
379                                        void *cls)
380 {
381   struct GNUNET_MQ_MessageQueue *mq;
382   struct ClientConnectionState *state;
383
384   GNUNET_assert (NULL != connection);
385
386   mq = GNUNET_new (struct GNUNET_MQ_MessageQueue);
387   mq->handlers = handlers;
388   mq->handlers_cls = cls;
389   state = GNUNET_new (struct ClientConnectionState);
390   state->connection = connection;
391   mq->impl_state = state;
392   mq->send_impl = connection_client_send_impl;
393   mq->destroy_impl = connection_client_destroy_impl;
394
395   return mq;
396 }
397
398
399 void
400 GNUNET_MQ_replace_handlers (struct GNUNET_MQ_MessageQueue *mq,
401                             const struct GNUNET_MQ_Handler *new_handlers,
402                             void *cls)
403 {
404   mq->handlers = new_handlers;
405   mq->handlers_cls = cls;
406 }
407
408
409 /**
410  * Associate the assoc_data in mq with a unique request id.
411  *
412  * @param mq message queue, id will be unique for the queue
413  * @param mqm message to associate
414  * @param assoc_data to associate
415  */
416 uint32_t
417 GNUNET_MQ_assoc_add (struct GNUNET_MQ_MessageQueue *mq,
418                      struct GNUNET_MQ_Message *mqm,
419                      void *assoc_data)
420 {
421   uint32_t id;
422
423   if (NULL == mq->assoc_map)
424   {
425     mq->assoc_map = GNUNET_CONTAINER_multihashmap32_create (8);
426     mq->assoc_id = 1;
427   }
428   id = mq->assoc_id++;
429   GNUNET_CONTAINER_multihashmap32_put (mq->assoc_map, id, assoc_data,
430                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
431   return id;
432 }
433
434
435
436 void *
437 GNUNET_MQ_assoc_get (struct GNUNET_MQ_MessageQueue *mq, uint32_t request_id)
438 {
439   if (NULL == mq->assoc_map)
440     return NULL;
441   return GNUNET_CONTAINER_multihashmap32_get (mq->assoc_map, request_id);
442 }
443
444
445 void *
446 GNUNET_MQ_assoc_remove (struct GNUNET_MQ_MessageQueue *mq, uint32_t request_id)
447 {
448   void *val;
449
450   if (NULL == mq->assoc_map)
451     return NULL;
452   val = GNUNET_CONTAINER_multihashmap32_get (mq->assoc_map, request_id);
453   GNUNET_assert (NULL != val);
454   GNUNET_CONTAINER_multihashmap32_remove (mq->assoc_map, request_id, val);
455   return val;
456 }
457
458
459 void
460 GNUNET_MQ_notify_sent (struct GNUNET_MQ_Message *mqm,
461                        GNUNET_MQ_NotifyCallback cb,
462                        void *cls)
463 {
464   mqm->sent_cb = cb;
465   mqm->sent_cls = cls;
466 }
467
468
469 void
470 GNUNET_MQ_destroy (struct GNUNET_MQ_MessageQueue *mq)
471 {
472   /* FIXME: destroy all pending messages in the queue */
473
474   if (NULL != mq->destroy_impl)
475   {
476     mq->destroy_impl (mq);
477   }
478
479   GNUNET_free (mq);
480 }
481