- distribute peers equally among island nodes on SuperMUC
[oweals/gnunet.git] / src / testbed / testbed_logger_api.c
1 /*
2       This file is part of GNUnet
3       (C) 2008--2013 Christian Grothoff (and other contributing authors)
4
5       GNUnet is free software; you can redistribute it and/or modify
6       it under the terms of the GNU General Public License as published
7       by the Free Software Foundation; either version 3, or (at your
8       option) any later version.
9
10       GNUnet is distributed in the hope that it will be useful, but
11       WITHOUT ANY WARRANTY; without even the implied warranty of
12       MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13       General Public License for more details.
14
15       You should have received a copy of the GNU General Public License
16       along with GNUnet; see the file COPYING.  If not, write to the
17       Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18       Boston, MA 02111-1307, USA.
19  */
20
21 /**
22  * @file testbed/testbed_logger_api.c
23  * @brief Client-side routines for communicating with the tesbted logger service
24  * @author Sree Harsha Totakura <sreeharsha@totakura.in> 
25  */
26
27 #include "platform.h"
28 #include "gnunet_util_lib.h"
29 #include "gnunet_testbed_logger_service.h"
30
31 /**
32  * Generic logging shorthand
33  */
34 #define LOG(kind, ...)                          \
35   GNUNET_log_from (kind, "testbed-logger-api", __VA_ARGS__)
36
37 /**
38  * Debug logging
39  */
40 #define LOG_DEBUG(...)                          \
41   LOG (GNUNET_ERROR_TYPE_DEBUG, __VA_ARGS__)
42
43 #ifdef GNUNET_TIME_STD_EXPONENTIAL_BACKOFF_THRESHOLD
44 #undef GNUNET_TIME_STD_EXPONENTIAL_BACKOFF_THRESHOLD
45 #endif
46
47 /**
48  * Threshold after which exponential backoff should not increase (15 s).
49  */
50 #define GNUNET_TIME_STD_EXPONENTIAL_BACKOFF_THRESHOLD GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 3)
51
52
53 /**
54  * The message queue for sending messages to the controller service
55  */
56 struct MessageQueue
57 {
58   /**
59    * next pointer for DLL
60    */
61   struct MessageQueue *next;
62
63   /**
64    * prev pointer for DLL
65    */
66   struct MessageQueue *prev;
67
68   /**
69    * The message to be sent
70    */
71   struct GNUNET_MessageHeader *msg;
72
73   /**
74    * Completion callback
75    */
76   GNUNET_TESTBED_LOGGER_FlushCompletion cb;
77
78   /**
79    * callback closure
80    */
81   void *cb_cls;
82 };
83
84
85 /**
86  * Connection handle for the logger service
87  */
88 struct GNUNET_TESTBED_LOGGER_Handle
89 {
90   /**
91    * Client connection
92    */
93   struct GNUNET_CLIENT_Connection *client;
94
95   /**
96    * The transport handle
97    */
98   struct GNUNET_CLIENT_TransmitHandle *th;
99
100   /**
101    * DLL head for the message queue
102    */
103   struct MessageQueue *mq_head;
104
105   /**
106    * DLL tail for the message queue
107    */
108   struct MessageQueue *mq_tail;
109
110   /**
111    * Flush completion callback
112    */
113   GNUNET_TESTBED_LOGGER_FlushCompletion cb;
114
115   /**
116    * Closure for the above callback
117    */
118   void *cb_cls;
119
120   /**
121    * Local buffer for data to be transmitted
122    */
123   void *buf;
124
125   /**
126    * The size of the local buffer
127    */
128   size_t bs;
129
130   /**
131    * Number of bytes wrote since last flush
132    */
133   size_t bwrote;
134
135   /**
136    * How long after should we retry sending a message to the service?
137    */
138   struct GNUNET_TIME_Relative retry_backoff;
139
140   /**
141    * Task to call the flush completion callback
142    */
143   GNUNET_SCHEDULER_TaskIdentifier flush_completion_task;
144
145   /**
146    * Task to be executed when flushing takes too long
147    */
148   GNUNET_SCHEDULER_TaskIdentifier timeout_flush_task;
149 };
150
151
152 /**
153  * Cancels the flush timeout task
154  *
155  * @param 
156  * @return 
157  */
158 static void
159 cancel_timeout_flush (struct GNUNET_TESTBED_LOGGER_Handle *h)
160 {
161   GNUNET_SCHEDULER_cancel (h->timeout_flush_task);
162   h->timeout_flush_task = GNUNET_SCHEDULER_NO_TASK;  
163 }
164
165
166 /**
167  * Task to call the flush completion notification
168  *
169  * @param cls the logger handle
170  * @param tc the scheduler task context
171  */
172 static void
173 call_flush_completion (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
174 {
175   struct GNUNET_TESTBED_LOGGER_Handle *h = cls;
176   GNUNET_TESTBED_LOGGER_FlushCompletion cb;
177   void *cb_cls;
178   size_t bw;
179
180   h->flush_completion_task = GNUNET_SCHEDULER_NO_TASK;
181   bw = h->bwrote;
182   h->bwrote = 0;
183   cb = h->cb;
184   h->cb = NULL;
185   cb_cls = h->cb_cls;
186   h->cb_cls = NULL;
187   if (GNUNET_SCHEDULER_NO_TASK != h->timeout_flush_task)
188     cancel_timeout_flush (h);
189   if (NULL != cb)
190     cb (cb_cls, bw);
191 }
192
193
194 /**
195  * Schedule the flush completion notification task
196  *
197  * @param h logger handle
198  */
199 static void
200 trigger_flush_notification (struct GNUNET_TESTBED_LOGGER_Handle *h)
201 {
202   if (GNUNET_SCHEDULER_NO_TASK != h->flush_completion_task)
203     GNUNET_SCHEDULER_cancel (h->flush_completion_task);
204   h->flush_completion_task = GNUNET_SCHEDULER_add_now (&call_flush_completion, h);
205 }
206
207
208 /**
209  * Function called to notify a client about the connection begin ready to queue
210  * more data.  "buf" will be NULL and "size" zero if the connection was closed
211  * for writing in the meantime.
212  *
213  * @param cls closure
214  * @param size number of bytes available in buf
215  * @param buf where the callee should write the message
216  * @return number of bytes written to buf
217  */
218 static size_t
219 transmit_ready_notify (void *cls, size_t size, void *buf)
220 {
221   struct GNUNET_TESTBED_LOGGER_Handle *h = cls;
222   struct MessageQueue *mq;
223
224   h->th = NULL;
225   mq = h->mq_head;
226   GNUNET_assert (NULL != mq);
227   if ((0 == size) && (NULL == buf))     /* Timeout */
228   {
229     LOG_DEBUG ("Message sending timed out -- retrying\n");
230     h->retry_backoff = GNUNET_TIME_STD_BACKOFF (h->retry_backoff);
231     h->th =
232         GNUNET_CLIENT_notify_transmit_ready (h->client,
233                                              ntohs (mq->msg->size),
234                                              h->retry_backoff, GNUNET_YES,
235                                              &transmit_ready_notify, h);
236     return 0;
237   }
238   h->retry_backoff = GNUNET_TIME_UNIT_ZERO;
239   GNUNET_assert (ntohs (mq->msg->size) <= size);
240   size = ntohs (mq->msg->size);
241   memcpy (buf, mq->msg, size);
242   LOG_DEBUG ("Message of type: %u and size: %u sent\n",
243              ntohs (mq->msg->type), size);
244   GNUNET_free (mq->msg);
245   GNUNET_CONTAINER_DLL_remove (h->mq_head, h->mq_tail, mq);
246   GNUNET_free (mq);
247   h->bwrote += (size - sizeof (struct GNUNET_MessageHeader));
248   mq = h->mq_head;
249   if (NULL != mq)
250   {
251     h->retry_backoff = GNUNET_TIME_STD_BACKOFF (h->retry_backoff);
252     h->th =
253         GNUNET_CLIENT_notify_transmit_ready (h->client,
254                                              ntohs (mq->msg->size),
255                                              h->retry_backoff, GNUNET_YES,
256                                              &transmit_ready_notify, h);
257     return size;
258   }
259   if (NULL != h->cb)
260     trigger_flush_notification (h);       /* Call the flush completion callback */
261   return size;
262 }
263
264
265 /**
266  * Queues a message in send queue of the logger handle
267  *
268  * @param h the logger handle
269  * @param msg the message to queue
270  */
271 static void
272 queue_message (struct GNUNET_TESTBED_LOGGER_Handle *h,
273                struct GNUNET_MessageHeader *msg)
274 {
275   struct MessageQueue *mq;
276   uint16_t type;
277   uint16_t size;
278
279   type = ntohs (msg->type);
280   size = ntohs (msg->size);
281   mq = GNUNET_malloc (sizeof (struct MessageQueue));
282   mq->msg = msg;
283   LOG (GNUNET_ERROR_TYPE_DEBUG,
284        "Queueing message of type %u, size %u for sending\n", type,
285        ntohs (msg->size));
286   GNUNET_CONTAINER_DLL_insert_tail (h->mq_head, h->mq_tail, mq);
287   if (NULL == h->th)
288   {
289     h->retry_backoff = GNUNET_TIME_STD_BACKOFF (h->retry_backoff);
290     h->th =
291         GNUNET_CLIENT_notify_transmit_ready (h->client, size,
292                                              h->retry_backoff, GNUNET_YES,
293                                              &transmit_ready_notify,
294                                              h);
295   }
296 }
297
298
299 /**
300  * Send the buffered data to the service
301  *
302  * @param h the logger handle
303  */
304 static void
305 dispatch_buffer (struct GNUNET_TESTBED_LOGGER_Handle *h)
306 {
307   struct GNUNET_MessageHeader *msg;
308   size_t msize;
309
310   msize = sizeof (struct GNUNET_MessageHeader) + h->bs;
311   msg = GNUNET_realloc (h->buf, msize);
312   h->buf = NULL;
313   memmove (&msg[1], msg, h->bs);
314   h->bs = 0;    
315   msg->type = htons (GNUNET_MESSAGE_TYPE_TESTBED_LOGGER_MSG);
316   msg->size = htons (msize);
317   queue_message (h, msg);
318 }
319
320
321 /**
322  * Connect to the testbed logger service
323  *
324  * @param cfg configuration to use
325  * @return the handle which can be used for sending data to the service; NULL
326  *           upon any error
327  */
328 struct GNUNET_TESTBED_LOGGER_Handle *
329 GNUNET_TESTBED_LOGGER_connect (const struct GNUNET_CONFIGURATION_Handle *cfg)
330 {
331   struct GNUNET_TESTBED_LOGGER_Handle *h;
332   struct GNUNET_CLIENT_Connection *client;
333   
334   client = GNUNET_CLIENT_connect ("testbed-logger", cfg);
335   if (NULL == client)
336     return NULL;
337   h = GNUNET_malloc (sizeof (struct GNUNET_TESTBED_LOGGER_Handle));
338   h->client = client;
339   return h;
340 }
341
342
343 /**
344  * Disconnect from the logger service.
345  *
346  * @param h the logger handle
347  */
348 void
349 GNUNET_TESTBED_LOGGER_disconnect (struct GNUNET_TESTBED_LOGGER_Handle *h)
350 {
351   struct MessageQueue *mq;
352
353   if (GNUNET_SCHEDULER_NO_TASK != h->flush_completion_task)
354     GNUNET_SCHEDULER_cancel (h->flush_completion_task);
355   while (NULL != (mq = h->mq_head))
356   {
357     GNUNET_CONTAINER_DLL_remove (h->mq_head, h->mq_tail, mq);
358     GNUNET_free (mq->msg);
359     GNUNET_free (mq);
360   }
361   GNUNET_CLIENT_disconnect (h->client);
362   GNUNET_free (h);
363 }
364
365
366 /**
367  * Send data to be logged to the logger service.  The data will be buffered and
368  * will be sent upon an explicit call to GNUNET_TESTBED_LOGGER_flush() or upon
369  * exceeding a threshold size.
370  *
371  * @param h the logger handle
372  * @param data the data to send;
373  * @param size how many bytes of data to send
374  */
375 void
376 GNUNET_TESTBED_LOGGER_write (struct GNUNET_TESTBED_LOGGER_Handle *h,
377                              const void *data, size_t size)
378 {  
379   size_t fit_size;
380
381   GNUNET_assert (0 != size);
382   GNUNET_assert (NULL != data);
383   GNUNET_assert (size < (GNUNET_SERVER_MAX_MESSAGE_SIZE
384                          - sizeof (struct GNUNET_MessageHeader)));
385   fit_size = sizeof (struct GNUNET_MessageHeader) + h->bs + size;
386   if ( GNUNET_SERVER_MAX_MESSAGE_SIZE < fit_size )
387     dispatch_buffer (h);
388   if (NULL == h->buf)
389   {
390     h->buf = GNUNET_malloc (size);
391     h->bs = size;
392     memcpy (h->buf, data, size);
393     return;
394   }
395   h->buf = GNUNET_realloc (h->buf, h->bs + size);
396   memcpy (h->buf + h->bs, data, size);
397   h->bs += size;
398   return;
399 }
400
401
402 /**
403  * Task to be executed when flushing our local buffer takes longer than timeout
404  * given to GNUNET_TESTBED_LOGGER_flush().  The flush completion callback will
405  * be called with 0 as the amount of data sent.
406  *
407  * @param cls the logger handle
408  * @param tc scheduler task context
409  */
410 static void
411 timeout_flush (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
412 {
413   struct GNUNET_TESTBED_LOGGER_Handle *h = cls;
414   GNUNET_TESTBED_LOGGER_FlushCompletion cb;
415   void *cb_cls;
416
417   h->timeout_flush_task = GNUNET_SCHEDULER_NO_TASK;
418   cb = h->cb;
419   h->cb = NULL;
420   cb_cls = h->cb_cls;
421   h->cb_cls = NULL;
422   if (GNUNET_SCHEDULER_NO_TASK != h->flush_completion_task)
423   {
424     GNUNET_SCHEDULER_cancel (h->flush_completion_task);
425     h->flush_completion_task = GNUNET_SCHEDULER_NO_TASK;
426   }
427   if (NULL != cb)
428     cb (cb_cls, 0);
429 }
430
431
432 /**
433  * Flush the buffered data to the logger service
434  *
435  * @param h the logger handle
436  * @param timeout how long to wait before calling the flust completion callback
437  * @param cb the callback to call after the data is flushed
438  * @param cb_cls the closure for the above callback
439  */
440 void
441 GNUNET_TESTBED_LOGGER_flush (struct GNUNET_TESTBED_LOGGER_Handle *h,
442                              struct GNUNET_TIME_Relative timeout,
443                              GNUNET_TESTBED_LOGGER_FlushCompletion cb,
444                              void *cb_cls)
445 {
446   h->cb = cb;
447   h->cb_cls = cb_cls;
448   GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == h->timeout_flush_task);
449   h->timeout_flush_task = 
450       GNUNET_SCHEDULER_add_delayed (timeout, &timeout_flush, h);
451   if (NULL == h->buf)
452   {
453     trigger_flush_notification (h);
454     return;
455   }
456   dispatch_buffer (h);
457 }
458
459
460 /**
461  * Cancel notification upon flush.  Should only be used when the flush
462  * completion callback given to GNUNET_TESTBED_LOGGER_flush() is not already
463  * called.
464  *
465  * @param h the logger handle
466  */
467 void
468 GNUNET_TESTBED_LOGGER_flush_cancel (struct GNUNET_TESTBED_LOGGER_Handle *h)
469 {
470   if (GNUNET_SCHEDULER_NO_TASK != h->flush_completion_task)
471   {
472     GNUNET_SCHEDULER_cancel (h->flush_completion_task);
473     h->flush_completion_task = GNUNET_SCHEDULER_NO_TASK;
474   }
475   if (GNUNET_SCHEDULER_NO_TASK != h->timeout_flush_task)
476     cancel_timeout_flush (h);
477   h->cb = NULL;
478   h->cb_cls = NULL;
479 }
480
481 /* End of testbed_logger_api.c */