6e4c768ba622e8f1f69b427b4dad69e77b62aa52
[oweals/gnunet.git] / src / statistics / statistics_api.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2009, 2010, 2011, 2016 GNUnet e.V.
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18      Boston, MA 02110-1301, USA.
19 */
20
21 /**
22  * @file statistics/statistics_api.c
23  * @brief API of the statistics service
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include "gnunet_util_lib.h"
28 #include "gnunet_constants.h"
29 #include "gnunet_protocols.h"
30 #include "gnunet_statistics_service.h"
31 #include "statistics.h"
32
33 /**
34  * How long do we wait until a statistics request for setting
35  * a value times out?  (The update will be lost if the
36  * service does not react within this timeframe).
37  */
38 #define SET_TRANSMIT_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 2)
39
40 #define LOG(kind,...) GNUNET_log_from (kind, "statistics-api",__VA_ARGS__)
41
42 /**
43  * Types of actions.
44  */
45 enum ActionType
46 {
47   /**
48    * Get a value.
49    */
50   ACTION_GET,
51
52   /**
53    * Set a value.
54    */
55   ACTION_SET,
56
57   /**
58    * Update a value.
59    */
60   ACTION_UPDATE,
61
62   /**
63    * Watch a value.
64    */
65   ACTION_WATCH
66 };
67
68
69 /**
70  * Entry kept for each value we are watching.
71  */
72 struct GNUNET_STATISTICS_WatchEntry
73 {
74
75   /**
76    * What subsystem is this action about? (never NULL)
77    */
78   char *subsystem;
79
80   /**
81    * What value is this action about? (never NULL)
82    */
83   char *name;
84
85   /**
86    * Function to call
87    */
88   GNUNET_STATISTICS_Iterator proc;
89
90   /**
91    * Closure for @e proc
92    */
93   void *proc_cls;
94
95 };
96
97
98 /**
99  * Linked list of things we still need to do.
100  */
101 struct GNUNET_STATISTICS_GetHandle
102 {
103
104   /**
105    * This is a doubly linked list.
106    */
107   struct GNUNET_STATISTICS_GetHandle *next;
108
109   /**
110    * This is a doubly linked list.
111    */
112   struct GNUNET_STATISTICS_GetHandle *prev;
113
114   /**
115    * Main statistics handle.
116    */
117   struct GNUNET_STATISTICS_Handle *sh;
118
119   /**
120    * What subsystem is this action about? (can be NULL)
121    */
122   char *subsystem;
123
124   /**
125    * What value is this action about? (can be NULL)
126    */
127   char *name;
128
129   /**
130    * Continuation to call once action is complete.
131    */
132   GNUNET_STATISTICS_Callback cont;
133
134   /**
135    * Function to call (for GET actions only).
136    */
137   GNUNET_STATISTICS_Iterator proc;
138
139   /**
140    * Closure for @e proc and @e cont.
141    */
142   void *cls;
143
144   /**
145    * Timeout for this action.
146    */
147   struct GNUNET_TIME_Absolute timeout;
148
149   /**
150    * Associated value.
151    */
152   uint64_t value;
153
154   /**
155    * Flag for SET/UPDATE actions.
156    */
157   int make_persistent;
158
159   /**
160    * Has the current iteration been aborted; for GET actions.
161    */
162   int aborted;
163
164   /**
165    * Is this a #ACTION_GET, #ACTION_SET, #ACTION_UPDATE or #ACTION_WATCH?
166    */
167   enum ActionType type;
168
169   /**
170    * Size of the message that we will be transmitting.
171    */
172   uint16_t msize;
173
174 };
175
176
177 /**
178  * Handle for the service.
179  */
180 struct GNUNET_STATISTICS_Handle
181 {
182   /**
183    * Name of our subsystem.
184    */
185   char *subsystem;
186
187   /**
188    * Configuration to use.
189    */
190   const struct GNUNET_CONFIGURATION_Handle *cfg;
191
192   /**
193    * Message queue to the service.
194    */
195   struct GNUNET_MQ_Handle *mq;
196
197   /**
198    * Head of the linked list of pending actions (first action
199    * to be performed).
200    */
201   struct GNUNET_STATISTICS_GetHandle *action_head;
202
203   /**
204    * Tail of the linked list of actions (for fast append).
205    */
206   struct GNUNET_STATISTICS_GetHandle *action_tail;
207
208   /**
209    * Action we are currently busy with (action request has been
210    * transmitted, we're now receiving the response from the
211    * service).
212    */
213   struct GNUNET_STATISTICS_GetHandle *current;
214
215   /**
216    * Array of watch entries.
217    */
218   struct GNUNET_STATISTICS_WatchEntry **watches;
219
220   /**
221    * Task doing exponential back-off trying to reconnect.
222    */
223   struct GNUNET_SCHEDULER_Task *backoff_task;
224
225   /**
226    * Task for running #do_destroy().
227    */
228   struct GNUNET_SCHEDULER_Task *destroy_task;
229
230   /**
231    * Time for next connect retry.
232    */
233   struct GNUNET_TIME_Relative backoff;
234
235   /**
236    * Maximum heap size observed so far (if available).
237    */
238   uint64_t peak_heap_size;
239
240   /**
241    * Maximum resident set side observed so far (if available).
242    */
243   uint64_t peak_rss;
244
245   /**
246    * Size of the @e watches array.
247    */
248   unsigned int watches_size;
249
250   /**
251    * Should this handle auto-destruct once all actions have
252    * been processed?
253    */
254   int do_destroy;
255
256   /**
257    * Are we currently receiving from the service?
258    */
259   int receiving;
260
261 };
262
263
264 /**
265  * Obtain statistics about this process's memory consumption and
266  * report those as well (if they changed).
267  */
268 static void
269 update_memory_statistics (struct GNUNET_STATISTICS_Handle *h)
270 {
271 #if ENABLE_HEAP_STATISTICS
272   uint64_t current_heap_size = 0;
273   uint64_t current_rss = 0;
274
275   if (GNUNET_NO != h->do_destroy)
276     return;
277 #if HAVE_MALLINFO
278   {
279     struct mallinfo mi;
280
281     mi = mallinfo();
282     current_heap_size = mi.uordblks + mi.fordblks;
283   }
284 #endif
285 #if HAVE_GETRUSAGE
286   {
287     struct rusage ru;
288
289     if (0 == getrusage (RUSAGE_SELF, &ru))
290     {
291       current_rss = 1024LL * ru.ru_maxrss;
292     }
293   }
294 #endif
295   if (current_heap_size > h->peak_heap_size)
296   {
297     h->peak_heap_size = current_heap_size;
298     GNUNET_STATISTICS_set (h, "# peak heap size", current_heap_size, GNUNET_NO);
299   }
300   if (current_rss > h->peak_rss)
301   {
302     h->peak_rss = current_rss;
303     GNUNET_STATISTICS_set (h, "# peak resident set size", current_rss, GNUNET_NO);
304   }
305 #endif
306 }
307
308
309 /**
310  * Schedule the next action to be performed.
311  *
312  * @param cls statistics handle to reconnect
313  */
314 static void
315 schedule_action (void *cls);
316
317
318 /**
319  * Reconnect at a later time, respecting back-off.
320  *
321  * @param h statistics handle
322  */
323 static void
324 reconnect_later (struct GNUNET_STATISTICS_Handle *h);
325
326
327 /**
328  * Transmit request to service that we want to watch
329  * the development of a particular value.
330  *
331  * @param h statistics handle
332  * @param watch watch entry of the value to watch
333  */
334 static void
335 schedule_watch_request (struct GNUNET_STATISTICS_Handle *h,
336                         struct GNUNET_STATISTICS_WatchEntry *watch)
337 {
338   struct GNUNET_STATISTICS_GetHandle *ai;
339   size_t slen;
340   size_t nlen;
341   size_t nsize;
342
343   slen = strlen (watch->subsystem) + 1;
344   nlen = strlen (watch->name) + 1;
345   nsize = sizeof (struct GNUNET_MessageHeader) + slen + nlen;
346   if (nsize >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
347   {
348     GNUNET_break (0);
349     return;
350   }
351   ai = GNUNET_new (struct GNUNET_STATISTICS_GetHandle);
352   ai->sh = h;
353   ai->subsystem = GNUNET_strdup (watch->subsystem);
354   ai->name = GNUNET_strdup (watch->name);
355   ai->timeout = GNUNET_TIME_UNIT_FOREVER_ABS;
356   ai->msize = nsize;
357   ai->type = ACTION_WATCH;
358   ai->proc = watch->proc;
359   ai->cls = watch->proc_cls;
360   GNUNET_CONTAINER_DLL_insert_tail (h->action_head,
361                                     h->action_tail,
362                                     ai);
363   schedule_action (h);
364 }
365
366
367 /**
368  * Free memory associated with the given action item.
369  *
370  * @param gh action item to free
371  */
372 static void
373 free_action_item (struct GNUNET_STATISTICS_GetHandle *gh)
374 {
375   GNUNET_free_non_null (gh->subsystem);
376   GNUNET_free_non_null (gh->name);
377   GNUNET_free (gh);
378 }
379
380
381 /**
382  * Disconnect from the statistics service.
383  *
384  * @param h statistics handle to disconnect from
385  */
386 static void
387 do_disconnect (struct GNUNET_STATISTICS_Handle *h)
388 {
389   struct GNUNET_STATISTICS_GetHandle *c;
390
391   h->receiving = GNUNET_NO;
392   if (NULL != (c = h->current))
393   {
394     h->current = NULL;
395     if ( (NULL != c->cont) &&
396          (GNUNET_YES != c->aborted) )
397     {
398       c->cont (c->cls,
399                GNUNET_SYSERR);
400       c->cont = NULL;
401     }
402     free_action_item (c);
403   }
404   if (NULL != h->mq)
405   {
406     GNUNET_MQ_destroy (h->mq);
407     h->mq = NULL;
408   }
409 }
410
411
412 /**
413  * Process a #GNUNET_MESSAGE_TYPE_STATISTICS_VALUE message.
414  *
415  * @param cls statistics handle
416  * @param smsg message received from the service, never NULL
417  * @return #GNUNET_OK if the message was well-formed
418  */
419 static int
420 check_statistics_value (void *cls,
421                         const struct GNUNET_STATISTICS_ReplyMessage *smsg)
422 {
423   const char *service;
424   const char *name;
425   uint16_t size;
426
427   size = ntohs (smsg->header.size);
428   size -= sizeof (struct GNUNET_STATISTICS_ReplyMessage);
429   if (size !=
430       GNUNET_STRINGS_buffer_tokenize ((const char *) &smsg[1],
431                                       size,
432                                       2,
433                                       &service,
434                                       &name))
435   {
436     GNUNET_break (0);
437     return GNUNET_SYSERR;
438   }
439   return GNUNET_OK;
440 }
441
442
443 /**
444  * Process a #GNUNET_MESSAGE_TYPE_STATISTICS_VALUE message.
445  *
446  * @param cls statistics handle
447  * @param msg message received from the service, never NULL
448  * @return #GNUNET_OK if the message was well-formed
449  */
450 static void
451 handle_statistics_value (void *cls,
452                          const struct GNUNET_STATISTICS_ReplyMessage *smsg)
453 {
454   struct GNUNET_STATISTICS_Handle *h = cls;
455   const char *service;
456   const char *name;
457   uint16_t size;
458
459   if (h->current->aborted)
460     return;           /* iteration aborted, don't bother */
461
462   size = ntohs (smsg->header.size);
463   size -= sizeof (struct GNUNET_STATISTICS_ReplyMessage);
464   GNUNET_assert (size ==
465                  GNUNET_STRINGS_buffer_tokenize ((const char *) &smsg[1],
466                                                  size,
467                                                  2,
468                                                  &service,
469                                                  &name));
470   LOG (GNUNET_ERROR_TYPE_DEBUG,
471        "Received valid statistic on `%s:%s': %llu\n",
472        service, name,
473        GNUNET_ntohll (smsg->value));
474   if (GNUNET_OK !=
475       h->current->proc (h->current->cls,
476                         service,
477                         name,
478                         GNUNET_ntohll (smsg->value),
479                         0 !=
480                         (ntohl (smsg->uid) & GNUNET_STATISTICS_PERSIST_BIT)))
481   {
482     LOG (GNUNET_ERROR_TYPE_DEBUG,
483          "Processing of remaining statistics aborted by client.\n");
484     h->current->aborted = GNUNET_YES;
485   }
486 }
487
488
489 /**
490  * We have received a watch value from the service.  Process it.
491  *
492  * @param cls statistics handle
493  * @param msg the watch value message
494  */
495 static void
496 handle_statistics_watch_value (void *cls,
497                                const struct GNUNET_STATISTICS_WatchValueMessage *wvm)
498 {
499   struct GNUNET_STATISTICS_Handle *h = cls;
500   struct GNUNET_STATISTICS_WatchEntry *w;
501   uint32_t wid;
502
503   GNUNET_break (0 == ntohl (wvm->reserved));
504   wid = ntohl (wvm->wid);
505   if (wid >= h->watches_size)
506   {
507     do_disconnect (h);
508     reconnect_later (h);
509     return;
510   }
511   w = h->watches[wid];
512   if (NULL == w)
513     return;
514   (void) w->proc (w->proc_cls,
515                   w->subsystem,
516                   w->name,
517                   GNUNET_ntohll (wvm->value),
518                   0 != (ntohl (wvm->flags) & GNUNET_STATISTICS_PERSIST_BIT));
519 }
520
521
522 /**
523  * Generic error handler, called with the appropriate error code and
524  * the same closure specified at the creation of the message queue.
525  * Not every message queue implementation supports an error handler.
526  *
527  * @param cls closure with the `struct GNUNET_STATISTICS_Handle *`
528  * @param error error code
529  */
530 static void
531 mq_error_handler (void *cls,
532                   enum GNUNET_MQ_Error error)
533 {
534   struct GNUNET_STATISTICS_Handle *h = cls;
535
536   if (GNUNET_NO != h->do_destroy)
537   {
538     h->do_destroy = GNUNET_NO;
539     GNUNET_STATISTICS_destroy (h,
540                                GNUNET_NO);
541     return;
542   }
543   do_disconnect (h);
544   reconnect_later (h);
545 }
546
547
548 /**
549  * Task used to destroy the statistics handle.
550  *
551  * @param cls the `struct GNUNET_STATISTICS_Handle`
552  */
553 static void
554 destroy_task (void *cls)
555 {
556   struct GNUNET_STATISTICS_Handle *h = cls;
557
558   LOG (GNUNET_ERROR_TYPE_DEBUG,
559        "Running final destruction\n");
560   GNUNET_STATISTICS_destroy (h,
561                              GNUNET_NO);
562 }
563
564
565 /**
566  * Handle a #GNUNET_MESSAGE_TYPE_TEST (sic) message. We receive this
567  * message at the end of the shutdown when the service confirms that
568  * all data has been written to disk.
569  *
570  * @param cls our `struct GNUNET_STATISTICS_Handle *`
571  * @param msg the message
572  */
573 static void
574 handle_test (void *cls,
575              const struct GNUNET_MessageHeader *msg)
576 {
577   struct GNUNET_STATISTICS_Handle *h = cls;
578
579   if (GNUNET_SYSERR != h->do_destroy)
580   {
581     /* not in shutdown, why do we get 'TEST'? */
582     GNUNET_break (0);
583     do_disconnect (h);
584     reconnect_later (h);
585     return;
586   }
587   h->do_destroy = GNUNET_NO;
588   LOG (GNUNET_ERROR_TYPE_DEBUG,
589        "Received TEST message from statistics, can complete disconnect\n");
590   GNUNET_SCHEDULER_add_now (&destroy_task,
591                             h);
592 }
593
594
595 /**
596  * Handle a #GNUNET_MESSAGE_TYPE_STATISTICS_END message. We receive
597  * this message in response to a query to indicate that there are no
598  * further matching results.
599  *
600  * @param cls our `struct GNUNET_STATISTICS_Handle *`
601  * @param msg the message
602  */
603 static void
604 handle_statistics_end (void *cls,
605                        const struct GNUNET_MessageHeader *msg)
606 {
607   struct GNUNET_STATISTICS_Handle *h = cls;
608   struct GNUNET_STATISTICS_GetHandle *c;
609
610   LOG (GNUNET_ERROR_TYPE_DEBUG,
611        "Received end of statistics marker\n");
612   if (NULL == (c = h->current))
613   {
614     GNUNET_break (0);
615     do_disconnect (h);
616     reconnect_later (h);
617     return;
618   }
619   h->backoff = GNUNET_TIME_UNIT_MILLISECONDS;
620   h->current = NULL;
621   schedule_action (h);
622   if (NULL != c->cont)
623   {
624     c->cont (c->cls,
625              GNUNET_OK);
626     c->cont = NULL;
627   }
628   free_action_item (c);
629 }
630
631
632 /**
633  * Try to (re)connect to the statistics service.
634  *
635  * @param h statistics handle to reconnect
636  * @return #GNUNET_YES on success, #GNUNET_NO on failure.
637  */
638 static int
639 try_connect (struct GNUNET_STATISTICS_Handle *h)
640 {
641   GNUNET_MQ_hd_fixed_size (test,
642                            GNUNET_MESSAGE_TYPE_TEST,
643                            struct GNUNET_MessageHeader);
644   GNUNET_MQ_hd_fixed_size (statistics_end,
645                            GNUNET_MESSAGE_TYPE_STATISTICS_END,
646                            struct GNUNET_MessageHeader);
647   GNUNET_MQ_hd_var_size (statistics_value,
648                          GNUNET_MESSAGE_TYPE_STATISTICS_VALUE,
649                          struct GNUNET_STATISTICS_ReplyMessage);
650   GNUNET_MQ_hd_fixed_size (statistics_watch_value,
651                            GNUNET_MESSAGE_TYPE_STATISTICS_WATCH_VALUE,
652                            struct GNUNET_STATISTICS_WatchValueMessage);
653   struct GNUNET_MQ_MessageHandler handlers[] = {
654     make_test_handler (h),
655     make_statistics_end_handler (h),
656     make_statistics_value_handler (h),
657     make_statistics_watch_value_handler (h),
658     GNUNET_MQ_handler_end ()
659   };
660   struct GNUNET_STATISTICS_GetHandle *gh;
661   struct GNUNET_STATISTICS_GetHandle *gn;
662
663   if (NULL != h->backoff_task)
664     return GNUNET_NO;
665   if (NULL != h->mq)
666     return GNUNET_YES;
667   h->mq = GNUNET_CLIENT_connecT (h->cfg,
668                                  "statistics",
669                                  handlers,
670                                  &mq_error_handler,
671                                  h);
672   if (NULL == h->mq)
673   {
674     LOG (GNUNET_ERROR_TYPE_DEBUG,
675          "Failed to connect to statistics service!\n");
676     return GNUNET_NO;
677   }
678   gn = h->action_head;
679   while (NULL != (gh = gn))
680   {
681     gn = gh->next;
682     if (gh->type == ACTION_WATCH)
683     {
684       GNUNET_CONTAINER_DLL_remove (h->action_head,
685                                    h->action_tail,
686                                    gh);
687       free_action_item (gh);
688     }
689   }
690   for (unsigned int i = 0; i < h->watches_size; i++)
691     if (NULL != h->watches[i])
692       schedule_watch_request (h,
693                               h->watches[i]);
694   return GNUNET_YES;
695 }
696
697
698 /**
699  * We've waited long enough, reconnect now.
700  *
701  * @param cls the `struct GNUNET_STATISTICS_Handle` to reconnect
702  */
703 static void
704 reconnect_task (void *cls)
705 {
706   struct GNUNET_STATISTICS_Handle *h = cls;
707
708   h->backoff_task = NULL;
709   schedule_action (h);
710 }
711
712
713 /**
714  * Task used by #reconnect_later() to shutdown the handle
715  *
716  * @param cls the statistics handle
717  */
718 static void
719 do_destroy (void *cls)
720 {
721   struct GNUNET_STATISTICS_Handle *h = cls;
722
723   h->destroy_task = NULL;
724   h->do_destroy = GNUNET_NO;
725   GNUNET_STATISTICS_destroy (h,
726                              GNUNET_NO);
727 }
728
729
730 /**
731  * Reconnect at a later time, respecting back-off.
732  *
733  * @param h statistics handle
734  */
735 static void
736 reconnect_later (struct GNUNET_STATISTICS_Handle *h)
737 {
738   int loss;
739   struct GNUNET_STATISTICS_GetHandle *gh;
740
741   GNUNET_assert (NULL == h->backoff_task);
742   if (GNUNET_YES == h->do_destroy)
743   {
744     /* So we are shutting down and the service is not reachable.
745      * Chances are that it's down for good and we are not going to connect to
746      * it anymore.
747      * Give up and don't sync the rest of the data.
748      */
749     loss = GNUNET_NO;
750     for (gh = h->action_head; NULL != gh; gh = gh->next)
751       if ( (gh->make_persistent) && (ACTION_SET == gh->type) )
752         loss = GNUNET_YES;
753     if (GNUNET_YES == loss)
754       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
755                   _("Could not save some persistent statistics\n"));
756     h->do_destroy = GNUNET_NO;
757     if (NULL != h->destroy_task)
758       GNUNET_SCHEDULER_cancel (h->destroy_task);
759     h->destroy_task = GNUNET_SCHEDULER_add_now (&do_destroy,
760                                                 h);
761     return;
762   }
763   h->backoff_task
764     = GNUNET_SCHEDULER_add_delayed (h->backoff,
765                                     &reconnect_task,
766                                     h);
767   h->backoff = GNUNET_TIME_STD_BACKOFF (h->backoff);
768 }
769
770
771
772 /**
773  * Transmit a GET request (and if successful, start to receive
774  * the response).
775  *
776  * @param handle statistics handle
777  */
778 static void
779 transmit_get (struct GNUNET_STATISTICS_Handle *handle)
780 {
781   struct GNUNET_STATISTICS_GetHandle *c;
782   struct GNUNET_MessageHeader *hdr;
783   struct GNUNET_MQ_Envelope *env;
784   size_t slen1;
785   size_t slen2;
786
787   GNUNET_assert (NULL != (c = handle->current));
788   slen1 = strlen (c->subsystem) + 1;
789   slen2 = strlen (c->name) + 1;
790   env = GNUNET_MQ_msg_extra (hdr,
791                              slen1 + slen2,
792                              GNUNET_MESSAGE_TYPE_STATISTICS_GET);
793   GNUNET_assert (slen1 + slen2 ==
794                  GNUNET_STRINGS_buffer_fill ((char *) &hdr[1],
795                                              slen1 + slen2,
796                                              2,
797                                              c->subsystem,
798                                              c->name));
799   GNUNET_MQ_send (handle->mq,
800                   env);
801 }
802
803
804 /**
805  * Transmit a WATCH request (and if successful, start to receive
806  * the response).
807  *
808  * @param handle statistics handle
809  */
810 static void
811 transmit_watch (struct GNUNET_STATISTICS_Handle *handle)
812 {
813   struct GNUNET_MessageHeader *hdr;
814   struct GNUNET_MQ_Envelope *env;
815   size_t slen1;
816   size_t slen2;
817
818   LOG (GNUNET_ERROR_TYPE_DEBUG,
819        "Transmitting watch request for `%s'\n",
820        handle->current->name);
821   slen1 = strlen (handle->current->subsystem) + 1;
822   slen2 = strlen (handle->current->name) + 1;
823   env = GNUNET_MQ_msg_extra (hdr,
824                              slen1 + slen2,
825                              GNUNET_MESSAGE_TYPE_STATISTICS_WATCH);
826   GNUNET_assert (slen1 + slen2 ==
827                  GNUNET_STRINGS_buffer_fill ((char *) &hdr[1],
828                                              slen1 + slen2,
829                                              2,
830                                              handle->current->subsystem,
831                                              handle->current->name));
832   GNUNET_MQ_send (handle->mq,
833                   env);
834   GNUNET_assert (NULL == handle->current->cont);
835   free_action_item (handle->current);
836   handle->current = NULL;
837   schedule_action (handle);
838 }
839
840
841 /**
842  * Transmit a SET/UPDATE request.
843  *
844  * @param handle statistics handle
845  */
846 static void
847 transmit_set (struct GNUNET_STATISTICS_Handle *handle)
848 {
849   struct GNUNET_STATISTICS_SetMessage *r;
850   struct GNUNET_MQ_Envelope *env;
851   size_t slen;
852   size_t nlen;
853
854   slen = strlen (handle->current->subsystem) + 1;
855   nlen = strlen (handle->current->name) + 1;
856   env = GNUNET_MQ_msg_extra (r,
857                              slen + nlen,
858                              GNUNET_MESSAGE_TYPE_STATISTICS_SET);
859   r->flags = 0;
860   r->value = GNUNET_htonll (handle->current->value);
861   if (handle->current->make_persistent)
862     r->flags |= htonl (GNUNET_STATISTICS_SETFLAG_PERSISTENT);
863   if (handle->current->type == ACTION_UPDATE)
864     r->flags |= htonl (GNUNET_STATISTICS_SETFLAG_RELATIVE);
865   GNUNET_assert (slen + nlen ==
866                  GNUNET_STRINGS_buffer_fill ((char *) &r[1],
867                                              slen + nlen,
868                                              2,
869                                              handle->current->subsystem,
870                                              handle->current->name));
871   GNUNET_assert (NULL == handle->current->cont);
872   free_action_item (handle->current);
873   handle->current = NULL;
874   update_memory_statistics (handle);
875   GNUNET_MQ_notify_sent (env,
876                          &schedule_action,
877                          handle);
878   GNUNET_MQ_send (handle->mq,
879                   env);
880 }
881
882
883 /**
884  * Get handle for the statistics service.
885  *
886  * @param subsystem name of subsystem using the service
887  * @param cfg services configuration in use
888  * @return handle to use
889  */
890 struct GNUNET_STATISTICS_Handle *
891 GNUNET_STATISTICS_create (const char *subsystem,
892                           const struct GNUNET_CONFIGURATION_Handle *cfg)
893 {
894   struct GNUNET_STATISTICS_Handle *h;
895
896   if (GNUNET_YES ==
897       GNUNET_CONFIGURATION_get_value_yesno (cfg,
898                                             "statistics",
899                                             "DISABLE"))
900     return NULL;
901   h = GNUNET_new (struct GNUNET_STATISTICS_Handle);
902   h->cfg = cfg;
903   h->subsystem = GNUNET_strdup (subsystem);
904   h->backoff = GNUNET_TIME_UNIT_MILLISECONDS;
905   return h;
906 }
907
908
909 /**
910  * Destroy a handle (free all state associated with
911  * it).
912  *
913  * @param h statistics handle to destroy
914  * @param sync_first set to #GNUNET_YES if pending SET requests should
915  *        be completed
916  */
917 void
918 GNUNET_STATISTICS_destroy (struct GNUNET_STATISTICS_Handle *h,
919                            int sync_first)
920 {
921   struct GNUNET_STATISTICS_GetHandle *pos;
922   struct GNUNET_STATISTICS_GetHandle *next;
923
924   if (NULL == h)
925     return;
926   GNUNET_assert (GNUNET_NO == h->do_destroy); // Don't call twice.
927   if ( (sync_first) &&
928        (GNUNET_YES == try_connect (h)) )
929   {
930     if ( (NULL != h->current) &&
931          (ACTION_GET == h->current->type) )
932       h->current->aborted = GNUNET_YES;
933     next = h->action_head;
934     while (NULL != (pos = next))
935     {
936       next = pos->next;
937       if ( (ACTION_GET == pos->type) ||
938            (ACTION_WATCH == pos->type) ||
939            (GNUNET_NO == pos->make_persistent) )
940       {
941         GNUNET_CONTAINER_DLL_remove (h->action_head,
942                                      h->action_tail,
943                                      pos);
944         free_action_item (pos);
945       }
946     }
947     h->do_destroy = GNUNET_YES;
948     schedule_action (h);
949     h->destroy_task
950       = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply (h->backoff,
951                                                                      5),
952                                       &do_destroy,
953                                       h);
954     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
955                 "Deferring destruction\n");
956     return; /* do not finish destruction just yet */
957   }
958   /* do clean up all */
959   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
960               "Cleaning all up\n");
961   while (NULL != (pos = h->action_head))
962   {
963     GNUNET_CONTAINER_DLL_remove (h->action_head,
964                                  h->action_tail,
965                                  pos);
966     free_action_item (pos);
967   }
968   do_disconnect (h);
969   if (NULL != h->backoff_task)
970   {
971     GNUNET_SCHEDULER_cancel (h->backoff_task);
972     h->backoff_task = NULL;
973   }
974   if (NULL != h->destroy_task)
975   {
976     GNUNET_break (0);
977     GNUNET_SCHEDULER_cancel (h->destroy_task);
978     h->destroy_task = NULL;
979   }
980   for (unsigned int i = 0; i < h->watches_size; i++)
981   {
982     if (NULL == h->watches[i])
983       continue;
984     GNUNET_free (h->watches[i]->subsystem);
985     GNUNET_free (h->watches[i]->name);
986     GNUNET_free (h->watches[i]);
987   }
988   GNUNET_array_grow (h->watches,
989                      h->watches_size,
990                      0);
991   GNUNET_free (h->subsystem);
992   GNUNET_free (h);
993 }
994
995
996 /**
997  * Schedule the next action to be performed.
998  *
999  * @param cls statistics handle
1000  */
1001 static void
1002 schedule_action (void *cls)
1003 {
1004   struct GNUNET_STATISTICS_Handle *h = cls;
1005
1006   if (NULL != h->backoff_task)
1007     return;                     /* action already pending */
1008   if (GNUNET_YES != try_connect (h))
1009   {
1010     reconnect_later (h);
1011     return;
1012   }
1013   /* schedule next action */
1014   while (NULL == h->current)
1015   {
1016     h->current = h->action_head;
1017     if (NULL == h->current)
1018     {
1019       struct GNUNET_MessageHeader *hdr;
1020       struct GNUNET_MQ_Envelope *env;
1021
1022       if (GNUNET_YES != h->do_destroy)
1023         return; /* nothing to do */
1024       /* let service know that we're done */
1025       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1026                   "Notifying service that we are done\n");
1027       h->do_destroy = GNUNET_SYSERR; /* in 'TEST' mode */
1028       env = GNUNET_MQ_msg (hdr,
1029                            GNUNET_MESSAGE_TYPE_TEST);
1030       GNUNET_MQ_send (h->mq,
1031                       env);
1032       return;
1033     }
1034     GNUNET_CONTAINER_DLL_remove (h->action_head,
1035                                  h->action_tail,
1036                                  h->current);
1037     switch (h->current->type)
1038     {
1039     case ACTION_GET:
1040       transmit_get (h);
1041       break;
1042     case ACTION_SET:
1043     case ACTION_UPDATE:
1044       transmit_set (h);
1045       break;
1046     case ACTION_WATCH:
1047       transmit_watch (h);
1048       break;
1049     default:
1050       GNUNET_assert (0);
1051       break;
1052     }
1053   }
1054 }
1055
1056
1057 /**
1058  * Get statistic from the peer.
1059  *
1060  * @param handle identification of the statistics service
1061  * @param subsystem limit to the specified subsystem, NULL for our subsystem
1062  * @param name name of the statistic value, NULL for all values
1063  * @param cont continuation to call when done (can be NULL)
1064  *        This callback CANNOT destroy the statistics handle in the same call.
1065  * @param proc function to call on each value
1066  * @param cls closure for @a cont and @a proc
1067  * @return NULL on error
1068  */
1069 struct GNUNET_STATISTICS_GetHandle *
1070 GNUNET_STATISTICS_get (struct GNUNET_STATISTICS_Handle *handle,
1071                        const char *subsystem,
1072                        const char *name,
1073                        GNUNET_STATISTICS_Callback cont,
1074                        GNUNET_STATISTICS_Iterator proc,
1075                        void *cls)
1076 {
1077   size_t slen1;
1078   size_t slen2;
1079   struct GNUNET_STATISTICS_GetHandle *ai;
1080
1081   if (NULL == handle)
1082     return NULL;
1083   GNUNET_assert (NULL != proc);
1084   GNUNET_assert (GNUNET_NO == handle->do_destroy);
1085   if (NULL == subsystem)
1086     subsystem = "";
1087   if (NULL == name)
1088     name = "";
1089   slen1 = strlen (subsystem) + 1;
1090   slen2 = strlen (name) + 1;
1091   GNUNET_assert (slen1 + slen2 + sizeof (struct GNUNET_MessageHeader) <
1092                  GNUNET_SERVER_MAX_MESSAGE_SIZE);
1093   ai = GNUNET_new (struct GNUNET_STATISTICS_GetHandle);
1094   ai->sh = handle;
1095   ai->subsystem = GNUNET_strdup (subsystem);
1096   ai->name = GNUNET_strdup (name);
1097   ai->cont = cont;
1098   ai->proc = proc;
1099   ai->cls = cls;
1100   ai->type = ACTION_GET;
1101   ai->msize = slen1 + slen2 + sizeof (struct GNUNET_MessageHeader);
1102   GNUNET_CONTAINER_DLL_insert_tail (handle->action_head,
1103                                     handle->action_tail,
1104                                     ai);
1105   schedule_action (handle);
1106   return ai;
1107 }
1108
1109
1110 /**
1111  * Cancel a 'get' request.  Must be called before the 'cont'
1112  * function is called.
1113  *
1114  * @param gh handle of the request to cancel
1115  */
1116 void
1117 GNUNET_STATISTICS_get_cancel (struct GNUNET_STATISTICS_GetHandle *gh)
1118 {
1119   if (NULL == gh)
1120     return;
1121   gh->cont = NULL;
1122   if (gh->sh->current == gh)
1123   {
1124     gh->aborted = GNUNET_YES;
1125     return;
1126   }
1127   GNUNET_CONTAINER_DLL_remove (gh->sh->action_head,
1128                                gh->sh->action_tail,
1129                                gh);
1130   GNUNET_free (gh->name);
1131   GNUNET_free (gh->subsystem);
1132   GNUNET_free (gh);
1133 }
1134
1135
1136 /**
1137  * Watch statistics from the peer (be notified whenever they change).
1138  *
1139  * @param handle identification of the statistics service
1140  * @param subsystem limit to the specified subsystem, never NULL
1141  * @param name name of the statistic value, never NULL
1142  * @param proc function to call on each value
1143  * @param proc_cls closure for @a proc
1144  * @return #GNUNET_OK on success, #GNUNET_SYSERR on error
1145  */
1146 int
1147 GNUNET_STATISTICS_watch (struct GNUNET_STATISTICS_Handle *handle,
1148                          const char *subsystem,
1149                          const char *name,
1150                          GNUNET_STATISTICS_Iterator proc,
1151                          void *proc_cls)
1152 {
1153   struct GNUNET_STATISTICS_WatchEntry *w;
1154
1155   if (NULL == handle)
1156     return GNUNET_SYSERR;
1157   w = GNUNET_new (struct GNUNET_STATISTICS_WatchEntry);
1158   w->subsystem = GNUNET_strdup (subsystem);
1159   w->name = GNUNET_strdup (name);
1160   w->proc = proc;
1161   w->proc_cls = proc_cls;
1162   GNUNET_array_append (handle->watches,
1163                        handle->watches_size,
1164                        w);
1165   schedule_watch_request (handle,
1166                           w);
1167   return GNUNET_OK;
1168 }
1169
1170
1171 /**
1172  * Stop watching statistics from the peer.
1173  *
1174  * @param handle identification of the statistics service
1175  * @param subsystem limit to the specified subsystem, never NULL
1176  * @param name name of the statistic value, never NULL
1177  * @param proc function to call on each value
1178  * @param proc_cls closure for @a proc
1179  * @return #GNUNET_OK on success, #GNUNET_SYSERR on error (no such watch)
1180  */
1181 int
1182 GNUNET_STATISTICS_watch_cancel (struct GNUNET_STATISTICS_Handle *handle,
1183                                 const char *subsystem,
1184                                 const char *name,
1185                                 GNUNET_STATISTICS_Iterator proc,
1186                                 void *proc_cls)
1187 {
1188   struct GNUNET_STATISTICS_WatchEntry *w;
1189
1190   if (NULL == handle)
1191     return GNUNET_SYSERR;
1192   for (unsigned int i=0;i<handle->watches_size;i++)
1193   {
1194     w = handle->watches[i];
1195     if (NULL == w)
1196       continue;
1197     if ( (w->proc == proc) &&
1198          (w->proc_cls == proc_cls) &&
1199          (0 == strcmp (w->name, name)) &&
1200          (0 == strcmp (w->subsystem, subsystem)) )
1201     {
1202       GNUNET_free (w->name);
1203       GNUNET_free (w->subsystem);
1204       GNUNET_free (w);
1205       handle->watches[i] = NULL;
1206       return GNUNET_OK;
1207     }
1208   }
1209   return GNUNET_SYSERR;
1210 }
1211
1212
1213 /**
1214  * Queue a request to change a statistic.
1215  *
1216  * @param h statistics handle
1217  * @param name name of the value
1218  * @param make_persistent  should the value be kept across restarts?
1219  * @param value new value or change
1220  * @param type type of the action (#ACTION_SET or #ACTION_UPDATE)
1221  */
1222 static void
1223 add_setter_action (struct GNUNET_STATISTICS_Handle *h,
1224                    const char *name,
1225                    int make_persistent,
1226                    uint64_t value,
1227                    enum ActionType type)
1228 {
1229   struct GNUNET_STATISTICS_GetHandle *ai;
1230   size_t slen;
1231   size_t nlen;
1232   size_t nsize;
1233   int64_t delta;
1234
1235   slen = strlen (h->subsystem) + 1;
1236   nlen = strlen (name) + 1;
1237   nsize = sizeof (struct GNUNET_STATISTICS_SetMessage) + slen + nlen;
1238   if (nsize >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
1239   {
1240     GNUNET_break (0);
1241     return;
1242   }
1243   for (ai = h->action_head; NULL != ai; ai = ai->next)
1244   {
1245     if (! ( (0 == strcmp (ai->subsystem, h->subsystem)) &&
1246             (0 == strcmp (ai->name, name)) &&
1247             ( (ACTION_UPDATE == ai->type) ||
1248               (ACTION_SET == ai->type) ) ) )
1249       continue;
1250     if (ACTION_SET == ai->type)
1251     {
1252       if (ACTION_UPDATE == type)
1253       {
1254         delta = (int64_t) value;
1255         if (delta > 0)
1256         {
1257           /* update old set by new delta */
1258           ai->value += delta;
1259         }
1260         else
1261         {
1262           /* update old set by new delta, but never go negative */
1263           if (ai->value < -delta)
1264             ai->value = 0;
1265           else
1266             ai->value += delta;
1267         }
1268       }
1269       else
1270       {
1271         /* new set overrides old set */
1272         ai->value = value;
1273       }
1274     }
1275     else
1276     {
1277       if (ACTION_UPDATE == type)
1278       {
1279         /* make delta cummulative */
1280         delta = (int64_t) value;
1281         ai->value += delta;
1282       }
1283       else
1284       {
1285         /* drop old 'update', use new 'set' instead */
1286         ai->value = value;
1287         ai->type = type;
1288       }
1289     }
1290     ai->timeout = GNUNET_TIME_relative_to_absolute (SET_TRANSMIT_TIMEOUT);
1291     ai->make_persistent = make_persistent;
1292     return;
1293   }
1294   /* no existing entry matches, create a fresh one */
1295   ai = GNUNET_new (struct GNUNET_STATISTICS_GetHandle);
1296   ai->sh = h;
1297   ai->subsystem = GNUNET_strdup (h->subsystem);
1298   ai->name = GNUNET_strdup (name);
1299   ai->timeout = GNUNET_TIME_relative_to_absolute (SET_TRANSMIT_TIMEOUT);
1300   ai->make_persistent = make_persistent;
1301   ai->msize = nsize;
1302   ai->value = value;
1303   ai->type = type;
1304   GNUNET_CONTAINER_DLL_insert_tail (h->action_head,
1305                                     h->action_tail,
1306                                     ai);
1307   schedule_action (h);
1308 }
1309
1310
1311 /**
1312  * Set statistic value for the peer.  Will always use our
1313  * subsystem (the argument used when "handle" was created).
1314  *
1315  * @param handle identification of the statistics service
1316  * @param name name of the statistic value
1317  * @param value new value to set
1318  * @param make_persistent should the value be kept across restarts?
1319  */
1320 void
1321 GNUNET_STATISTICS_set (struct GNUNET_STATISTICS_Handle *handle,
1322                        const char *name,
1323                        uint64_t value,
1324                        int make_persistent)
1325 {
1326   if (NULL == handle)
1327     return;
1328   GNUNET_assert (GNUNET_NO == handle->do_destroy);
1329   add_setter_action (handle,
1330                      name,
1331                      make_persistent,
1332                      value,
1333                      ACTION_SET);
1334 }
1335
1336
1337 /**
1338  * Set statistic value for the peer.  Will always use our
1339  * subsystem (the argument used when "handle" was created).
1340  *
1341  * @param handle identification of the statistics service
1342  * @param name name of the statistic value
1343  * @param delta change in value (added to existing value)
1344  * @param make_persistent should the value be kept across restarts?
1345  */
1346 void
1347 GNUNET_STATISTICS_update (struct GNUNET_STATISTICS_Handle *handle,
1348                           const char *name,
1349                           int64_t delta,
1350                           int make_persistent)
1351 {
1352   if (NULL == handle)
1353     return;
1354   if (0 == delta)
1355     return;
1356   GNUNET_assert (GNUNET_NO == handle->do_destroy);
1357   add_setter_action (handle,
1358                      name,
1359                      make_persistent,
1360                      (uint64_t) delta,
1361                      ACTION_UPDATE);
1362 }
1363
1364
1365 /* end of statistics_api.c */