f72c8525bbc234d5e9acb1dc115fcbc5eb3ed738
[oweals/gnunet.git] / src / statistics / statistics_api.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2009, 2010, 2011, 2016 GNUnet e.V.
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18      Boston, MA 02110-1301, USA.
19 */
20
21 /**
22  * @file statistics/statistics_api.c
23  * @brief API of the statistics service
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include "gnunet_util_lib.h"
28 #include "gnunet_constants.h"
29 #include "gnunet_protocols.h"
30 #include "gnunet_statistics_service.h"
31 #include "statistics.h"
32
33 /**
34  * How long do we wait until a statistics request for setting
35  * a value times out?  (The update will be lost if the
36  * service does not react within this timeframe).
37  */
38 #define SET_TRANSMIT_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 2)
39
40 #define LOG(kind,...) GNUNET_log_from (kind, "statistics-api",__VA_ARGS__)
41
42 /**
43  * Types of actions.
44  */
45 enum ActionType
46 {
47   /**
48    * Get a value.
49    */
50   ACTION_GET,
51
52   /**
53    * Set a value.
54    */
55   ACTION_SET,
56
57   /**
58    * Update a value.
59    */
60   ACTION_UPDATE,
61
62   /**
63    * Watch a value.
64    */
65   ACTION_WATCH
66 };
67
68
69 /**
70  * Entry kept for each value we are watching.
71  */
72 struct GNUNET_STATISTICS_WatchEntry
73 {
74
75   /**
76    * What subsystem is this action about? (never NULL)
77    */
78   char *subsystem;
79
80   /**
81    * What value is this action about? (never NULL)
82    */
83   char *name;
84
85   /**
86    * Function to call
87    */
88   GNUNET_STATISTICS_Iterator proc;
89
90   /**
91    * Closure for @e proc
92    */
93   void *proc_cls;
94
95 };
96
97
98 /**
99  * Linked list of things we still need to do.
100  */
101 struct GNUNET_STATISTICS_GetHandle
102 {
103
104   /**
105    * This is a doubly linked list.
106    */
107   struct GNUNET_STATISTICS_GetHandle *next;
108
109   /**
110    * This is a doubly linked list.
111    */
112   struct GNUNET_STATISTICS_GetHandle *prev;
113
114   /**
115    * Main statistics handle.
116    */
117   struct GNUNET_STATISTICS_Handle *sh;
118
119   /**
120    * What subsystem is this action about? (can be NULL)
121    */
122   char *subsystem;
123
124   /**
125    * What value is this action about? (can be NULL)
126    */
127   char *name;
128
129   /**
130    * Continuation to call once action is complete.
131    */
132   GNUNET_STATISTICS_Callback cont;
133
134   /**
135    * Function to call (for GET actions only).
136    */
137   GNUNET_STATISTICS_Iterator proc;
138
139   /**
140    * Closure for @e proc and @e cont.
141    */
142   void *cls;
143
144   /**
145    * Timeout for this action.
146    */
147   struct GNUNET_TIME_Absolute timeout;
148
149   /**
150    * Associated value.
151    */
152   uint64_t value;
153
154   /**
155    * Flag for SET/UPDATE actions.
156    */
157   int make_persistent;
158
159   /**
160    * Has the current iteration been aborted; for GET actions.
161    */
162   int aborted;
163
164   /**
165    * Is this a #ACTION_GET, #ACTION_SET, #ACTION_UPDATE or #ACTION_WATCH?
166    */
167   enum ActionType type;
168
169   /**
170    * Size of the message that we will be transmitting.
171    */
172   uint16_t msize;
173
174 };
175
176
177 /**
178  * Handle for the service.
179  */
180 struct GNUNET_STATISTICS_Handle
181 {
182   /**
183    * Name of our subsystem.
184    */
185   char *subsystem;
186
187   /**
188    * Configuration to use.
189    */
190   const struct GNUNET_CONFIGURATION_Handle *cfg;
191
192   /**
193    * Message queue to the service.
194    */
195   struct GNUNET_MQ_Handle *mq;
196
197   /**
198    * Head of the linked list of pending actions (first action
199    * to be performed).
200    */
201   struct GNUNET_STATISTICS_GetHandle *action_head;
202
203   /**
204    * Tail of the linked list of actions (for fast append).
205    */
206   struct GNUNET_STATISTICS_GetHandle *action_tail;
207
208   /**
209    * Action we are currently busy with (action request has been
210    * transmitted, we're now receiving the response from the
211    * service).
212    */
213   struct GNUNET_STATISTICS_GetHandle *current;
214
215   /**
216    * Array of watch entries.
217    */
218   struct GNUNET_STATISTICS_WatchEntry **watches;
219
220   /**
221    * Task doing exponential back-off trying to reconnect.
222    */
223   struct GNUNET_SCHEDULER_Task *backoff_task;
224
225   /**
226    * Task for running #do_destroy().
227    */
228   struct GNUNET_SCHEDULER_Task *destroy_task;
229
230   /**
231    * Time for next connect retry.
232    */
233   struct GNUNET_TIME_Relative backoff;
234
235   /**
236    * Maximum heap size observed so far (if available).
237    */
238   uint64_t peak_heap_size;
239
240   /**
241    * Maximum resident set side observed so far (if available).
242    */
243   uint64_t peak_rss;
244
245   /**
246    * Size of the @e watches array.
247    */
248   unsigned int watches_size;
249
250   /**
251    * Should this handle auto-destruct once all actions have
252    * been processed?
253    */
254   int do_destroy;
255
256   /**
257    * Are we currently receiving from the service?
258    */
259   int receiving;
260
261 };
262
263
264 /**
265  * Obtain statistics about this process's memory consumption and
266  * report those as well (if they changed).
267  */
268 static void
269 update_memory_statistics (struct GNUNET_STATISTICS_Handle *h)
270 {
271 #if ENABLE_HEAP_STATISTICS
272   uint64_t current_heap_size = 0;
273   uint64_t current_rss = 0;
274
275   if (GNUNET_NO != h->do_destroy)
276     return;
277 #if HAVE_MALLINFO
278   {
279     struct mallinfo mi;
280
281     mi = mallinfo();
282     current_heap_size = mi.uordblks + mi.fordblks;
283   }
284 #endif
285 #if HAVE_GETRUSAGE
286   {
287     struct rusage ru;
288
289     if (0 == getrusage (RUSAGE_SELF, &ru))
290     {
291       current_rss = 1024LL * ru.ru_maxrss;
292     }
293   }
294 #endif
295   if (current_heap_size > h->peak_heap_size)
296   {
297     h->peak_heap_size = current_heap_size;
298     GNUNET_STATISTICS_set (h, "# peak heap size", current_heap_size, GNUNET_NO);
299   }
300   if (current_rss > h->peak_rss)
301   {
302     h->peak_rss = current_rss;
303     GNUNET_STATISTICS_set (h, "# peak resident set size", current_rss, GNUNET_NO);
304   }
305 #endif
306 }
307
308
309 /**
310  * Reconnect at a later time, respecting back-off.
311  *
312  * @param h statistics handle
313  */
314 static void
315 reconnect_later (struct GNUNET_STATISTICS_Handle *h);
316
317
318 /**
319  * Schedule the next action to be performed.
320  *
321  * @param cls statistics handle to reconnect
322  */
323 static void
324 schedule_action (void *cls);
325
326
327 /**
328  * Transmit request to service that we want to watch
329  * the development of a particular value.
330  *
331  * @param h statistics handle
332  * @param watch watch entry of the value to watch
333  */
334 static void
335 schedule_watch_request (struct GNUNET_STATISTICS_Handle *h,
336                         struct GNUNET_STATISTICS_WatchEntry *watch)
337 {
338   struct GNUNET_STATISTICS_GetHandle *ai;
339   size_t slen;
340   size_t nlen;
341   size_t nsize;
342
343   slen = strlen (watch->subsystem) + 1;
344   nlen = strlen (watch->name) + 1;
345   nsize = sizeof (struct GNUNET_MessageHeader) + slen + nlen;
346   if (nsize >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
347   {
348     GNUNET_break (0);
349     return;
350   }
351   ai = GNUNET_new (struct GNUNET_STATISTICS_GetHandle);
352   ai->sh = h;
353   ai->subsystem = GNUNET_strdup (watch->subsystem);
354   ai->name = GNUNET_strdup (watch->name);
355   ai->timeout = GNUNET_TIME_UNIT_FOREVER_ABS;
356   ai->msize = nsize;
357   ai->type = ACTION_WATCH;
358   ai->proc = watch->proc;
359   ai->cls = watch->proc_cls;
360   GNUNET_CONTAINER_DLL_insert_tail (h->action_head,
361                                     h->action_tail,
362                                     ai);
363   schedule_action (h);
364 }
365
366
367 /**
368  * Free memory associated with the given action item.
369  *
370  * @param gh action item to free
371  */
372 static void
373 free_action_item (struct GNUNET_STATISTICS_GetHandle *gh)
374 {
375   GNUNET_free_non_null (gh->subsystem);
376   GNUNET_free_non_null (gh->name);
377   GNUNET_free (gh);
378 }
379
380
381 /**
382  * Disconnect from the statistics service.
383  *
384  * @param h statistics handle to disconnect from
385  */
386 static void
387 do_disconnect (struct GNUNET_STATISTICS_Handle *h)
388 {
389   struct GNUNET_STATISTICS_GetHandle *c;
390
391   h->receiving = GNUNET_NO;
392   if (NULL != (c = h->current))
393   {
394     h->current = NULL;
395     if ( (NULL != c->cont) &&
396          (GNUNET_YES != c->aborted) )
397     {
398       c->cont (c->cls,
399                GNUNET_SYSERR);
400       c->cont = NULL;
401     }
402     free_action_item (c);
403   }
404   if (NULL != h->mq)
405   {
406     GNUNET_MQ_destroy (h->mq);
407     h->mq = NULL;
408   }
409 }
410
411
412 /**
413  * Process a #GNUNET_MESSAGE_TYPE_STATISTICS_VALUE message.
414  *
415  * @param cls statistics handle
416  * @param smsg message received from the service, never NULL
417  * @return #GNUNET_OK if the message was well-formed
418  */
419 static int
420 check_statistics_value (void *cls,
421                         const struct GNUNET_STATISTICS_ReplyMessage *smsg)
422 {
423   const char *service;
424   const char *name;
425   uint16_t size;
426
427   size = ntohs (smsg->header.size);
428   size -= sizeof (struct GNUNET_STATISTICS_ReplyMessage);
429   if (size !=
430       GNUNET_STRINGS_buffer_tokenize ((const char *) &smsg[1],
431                                       size,
432                                       2,
433                                       &service,
434                                       &name))
435   {
436     GNUNET_break (0);
437     return GNUNET_SYSERR;
438   }
439   return GNUNET_OK;
440 }
441
442
443 /**
444  * Process a #GNUNET_MESSAGE_TYPE_STATISTICS_VALUE message.
445  *
446  * @param cls statistics handle
447  * @param msg message received from the service, never NULL
448  * @return #GNUNET_OK if the message was well-formed
449  */
450 static void
451 handle_statistics_value (void *cls,
452                          const struct GNUNET_STATISTICS_ReplyMessage *smsg)
453 {
454   struct GNUNET_STATISTICS_Handle *h = cls;
455   const char *service;
456   const char *name;
457   uint16_t size;
458
459   if (h->current->aborted)
460     return;           /* iteration aborted, don't bother */
461
462   size = ntohs (smsg->header.size);
463   size -= sizeof (struct GNUNET_STATISTICS_ReplyMessage);
464   GNUNET_assert (size ==
465                  GNUNET_STRINGS_buffer_tokenize ((const char *) &smsg[1],
466                                                  size,
467                                                  2,
468                                                  &service,
469                                                  &name));
470   LOG (GNUNET_ERROR_TYPE_DEBUG,
471        "Received valid statistic on `%s:%s': %llu\n",
472        service, name,
473        GNUNET_ntohll (smsg->value));
474   if (GNUNET_OK !=
475       h->current->proc (h->current->cls,
476                         service,
477                         name,
478                         GNUNET_ntohll (smsg->value),
479                         0 !=
480                         (ntohl (smsg->uid) & GNUNET_STATISTICS_PERSIST_BIT)))
481   {
482     LOG (GNUNET_ERROR_TYPE_DEBUG,
483          "Processing of remaining statistics aborted by client.\n");
484     h->current->aborted = GNUNET_YES;
485   }
486 }
487
488
489 /**
490  * We have received a watch value from the service.  Process it.
491  *
492  * @param cls statistics handle
493  * @param msg the watch value message
494  */
495 static void
496 handle_statistics_watch_value (void *cls,
497                                const struct GNUNET_STATISTICS_WatchValueMessage *wvm)
498 {
499   struct GNUNET_STATISTICS_Handle *h = cls;
500   struct GNUNET_STATISTICS_WatchEntry *w;
501   uint32_t wid;
502
503   GNUNET_break (0 == ntohl (wvm->reserved));
504   wid = ntohl (wvm->wid);
505   if (wid >= h->watches_size)
506   {
507     do_disconnect (h);
508     reconnect_later (h);
509     return;
510   }
511   w = h->watches[wid];
512   if (NULL == w)
513     return;
514   (void) w->proc (w->proc_cls,
515                   w->subsystem,
516                   w->name,
517                   GNUNET_ntohll (wvm->value),
518                   0 != (ntohl (wvm->flags) & GNUNET_STATISTICS_PERSIST_BIT));
519 }
520
521
522 /**
523  * Generic error handler, called with the appropriate error code and
524  * the same closure specified at the creation of the message queue.
525  * Not every message queue implementation supports an error handler.
526  *
527  * @param cls closure with the `struct GNUNET_STATISTICS_Handle *`
528  * @param error error code
529  */
530 static void
531 mq_error_handler (void *cls,
532                   enum GNUNET_MQ_Error error)
533 {
534   struct GNUNET_STATISTICS_Handle *h = cls;
535
536   if (GNUNET_NO != h->do_destroy)
537   {
538     h->do_destroy = GNUNET_NO;
539     if (NULL != h->destroy_task)
540     {
541       GNUNET_SCHEDULER_cancel (h->destroy_task);
542       h->destroy_task = NULL;
543     }
544     GNUNET_STATISTICS_destroy (h,
545                                GNUNET_NO);
546     return;
547   }
548   do_disconnect (h);
549   reconnect_later (h);
550 }
551
552
553 /**
554  * Task used to destroy the statistics handle.
555  *
556  * @param cls the `struct GNUNET_STATISTICS_Handle`
557  */
558 static void
559 do_destroy (void *cls)
560 {
561   struct GNUNET_STATISTICS_Handle *h = cls;
562
563   h->destroy_task = NULL;
564   h->do_destroy = GNUNET_NO;
565   LOG (GNUNET_ERROR_TYPE_DEBUG,
566        "Running final destruction\n");
567   GNUNET_STATISTICS_destroy (h,
568                              GNUNET_NO);
569 }
570
571
572 /**
573  * Handle a #GNUNET_MESSAGE_TYPE_TEST (sic) message. We receive this
574  * message at the end of the shutdown when the service confirms that
575  * all data has been written to disk.
576  *
577  * @param cls our `struct GNUNET_STATISTICS_Handle *`
578  * @param msg the message
579  */
580 static void
581 handle_test (void *cls,
582              const struct GNUNET_MessageHeader *msg)
583 {
584   struct GNUNET_STATISTICS_Handle *h = cls;
585
586   if (GNUNET_SYSERR != h->do_destroy)
587   {
588     /* not in shutdown, why do we get 'TEST'? */
589     GNUNET_break (0);
590     do_disconnect (h);
591     reconnect_later (h);
592     return;
593   }
594   LOG (GNUNET_ERROR_TYPE_DEBUG,
595        "Received TEST message from statistics, can complete disconnect\n");
596   if (NULL != h->destroy_task)
597     GNUNET_SCHEDULER_cancel (h->destroy_task);
598   h->destroy_task = GNUNET_SCHEDULER_add_now (&do_destroy,
599                                               h);
600 }
601
602
603 /**
604  * Handle a #GNUNET_MESSAGE_TYPE_STATISTICS_END message. We receive
605  * this message in response to a query to indicate that there are no
606  * further matching results.
607  *
608  * @param cls our `struct GNUNET_STATISTICS_Handle *`
609  * @param msg the message
610  */
611 static void
612 handle_statistics_end (void *cls,
613                        const struct GNUNET_MessageHeader *msg)
614 {
615   struct GNUNET_STATISTICS_Handle *h = cls;
616   struct GNUNET_STATISTICS_GetHandle *c;
617
618   LOG (GNUNET_ERROR_TYPE_DEBUG,
619        "Received end of statistics marker\n");
620   if (NULL == (c = h->current))
621   {
622     GNUNET_break (0);
623     do_disconnect (h);
624     reconnect_later (h);
625     return;
626   }
627   h->backoff = GNUNET_TIME_UNIT_MILLISECONDS;
628   h->current = NULL;
629   schedule_action (h);
630   if (NULL != c->cont)
631   {
632     c->cont (c->cls,
633              GNUNET_OK);
634     c->cont = NULL;
635   }
636   free_action_item (c);
637 }
638
639
640 /**
641  * Try to (re)connect to the statistics service.
642  *
643  * @param h statistics handle to reconnect
644  * @return #GNUNET_YES on success, #GNUNET_NO on failure.
645  */
646 static int
647 try_connect (struct GNUNET_STATISTICS_Handle *h)
648 {
649   struct GNUNET_MQ_MessageHandler handlers[] = {
650     GNUNET_MQ_hd_fixed_size (test,
651                              GNUNET_MESSAGE_TYPE_TEST,
652                              struct GNUNET_MessageHeader,
653                              h),
654     GNUNET_MQ_hd_fixed_size (statistics_end,
655                              GNUNET_MESSAGE_TYPE_STATISTICS_END,
656                              struct GNUNET_MessageHeader,
657                              h),
658     GNUNET_MQ_hd_var_size (statistics_value,
659                            GNUNET_MESSAGE_TYPE_STATISTICS_VALUE,
660                            struct GNUNET_STATISTICS_ReplyMessage,
661                            h),
662     GNUNET_MQ_hd_fixed_size (statistics_watch_value,
663                              GNUNET_MESSAGE_TYPE_STATISTICS_WATCH_VALUE,
664                              struct GNUNET_STATISTICS_WatchValueMessage,
665                              h),
666     GNUNET_MQ_handler_end ()
667   };
668   struct GNUNET_STATISTICS_GetHandle *gh;
669   struct GNUNET_STATISTICS_GetHandle *gn;
670
671   if (NULL != h->backoff_task)
672     return GNUNET_NO;
673   if (NULL != h->mq)
674     return GNUNET_YES;
675   h->mq = GNUNET_CLIENT_connecT (h->cfg,
676                                  "statistics",
677                                  handlers,
678                                  &mq_error_handler,
679                                  h);
680   if (NULL == h->mq)
681   {
682     LOG (GNUNET_ERROR_TYPE_DEBUG,
683          "Failed to connect to statistics service!\n");
684     return GNUNET_NO;
685   }
686   gn = h->action_head;
687   while (NULL != (gh = gn))
688   {
689     gn = gh->next;
690     if (gh->type == ACTION_WATCH)
691     {
692       GNUNET_CONTAINER_DLL_remove (h->action_head,
693                                    h->action_tail,
694                                    gh);
695       free_action_item (gh);
696     }
697   }
698   for (unsigned int i = 0; i < h->watches_size; i++)
699     if (NULL != h->watches[i])
700       schedule_watch_request (h,
701                               h->watches[i]);
702   return GNUNET_YES;
703 }
704
705
706 /**
707  * We've waited long enough, reconnect now.
708  *
709  * @param cls the `struct GNUNET_STATISTICS_Handle` to reconnect
710  */
711 static void
712 reconnect_task (void *cls)
713 {
714   struct GNUNET_STATISTICS_Handle *h = cls;
715
716   h->backoff_task = NULL;
717   schedule_action (h);
718 }
719
720
721 /**
722  * Reconnect at a later time, respecting back-off.
723  *
724  * @param h statistics handle
725  */
726 static void
727 reconnect_later (struct GNUNET_STATISTICS_Handle *h)
728 {
729   int loss;
730   struct GNUNET_STATISTICS_GetHandle *gh;
731
732   GNUNET_assert (NULL == h->backoff_task);
733   if (GNUNET_YES == h->do_destroy)
734   {
735     /* So we are shutting down and the service is not reachable.
736      * Chances are that it's down for good and we are not going to connect to
737      * it anymore.
738      * Give up and don't sync the rest of the data.
739      */
740     loss = GNUNET_NO;
741     for (gh = h->action_head; NULL != gh; gh = gh->next)
742       if ( (gh->make_persistent) && (ACTION_SET == gh->type) )
743         loss = GNUNET_YES;
744     if (GNUNET_YES == loss)
745       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
746                   _("Could not save some persistent statistics\n"));
747     if (NULL != h->destroy_task)
748       GNUNET_SCHEDULER_cancel (h->destroy_task);
749     h->destroy_task = GNUNET_SCHEDULER_add_now (&do_destroy,
750                                                 h);
751     return;
752   }
753   h->backoff_task
754     = GNUNET_SCHEDULER_add_delayed (h->backoff,
755                                     &reconnect_task,
756                                     h);
757   h->backoff = GNUNET_TIME_STD_BACKOFF (h->backoff);
758 }
759
760
761
762 /**
763  * Transmit a GET request (and if successful, start to receive
764  * the response).
765  *
766  * @param handle statistics handle
767  */
768 static void
769 transmit_get (struct GNUNET_STATISTICS_Handle *handle)
770 {
771   struct GNUNET_STATISTICS_GetHandle *c;
772   struct GNUNET_MessageHeader *hdr;
773   struct GNUNET_MQ_Envelope *env;
774   size_t slen1;
775   size_t slen2;
776
777   GNUNET_assert (NULL != (c = handle->current));
778   slen1 = strlen (c->subsystem) + 1;
779   slen2 = strlen (c->name) + 1;
780   env = GNUNET_MQ_msg_extra (hdr,
781                              slen1 + slen2,
782                              GNUNET_MESSAGE_TYPE_STATISTICS_GET);
783   GNUNET_assert (slen1 + slen2 ==
784                  GNUNET_STRINGS_buffer_fill ((char *) &hdr[1],
785                                              slen1 + slen2,
786                                              2,
787                                              c->subsystem,
788                                              c->name));
789   GNUNET_MQ_notify_sent (env,
790                          &schedule_action,
791                          handle);
792   GNUNET_MQ_send (handle->mq,
793                   env);
794 }
795
796
797 /**
798  * Transmit a WATCH request (and if successful, start to receive
799  * the response).
800  *
801  * @param handle statistics handle
802  */
803 static void
804 transmit_watch (struct GNUNET_STATISTICS_Handle *handle)
805 {
806   struct GNUNET_MessageHeader *hdr;
807   struct GNUNET_MQ_Envelope *env;
808   size_t slen1;
809   size_t slen2;
810
811   LOG (GNUNET_ERROR_TYPE_DEBUG,
812        "Transmitting watch request for `%s'\n",
813        handle->current->name);
814   slen1 = strlen (handle->current->subsystem) + 1;
815   slen2 = strlen (handle->current->name) + 1;
816   env = GNUNET_MQ_msg_extra (hdr,
817                              slen1 + slen2,
818                              GNUNET_MESSAGE_TYPE_STATISTICS_WATCH);
819   GNUNET_assert (slen1 + slen2 ==
820                  GNUNET_STRINGS_buffer_fill ((char *) &hdr[1],
821                                              slen1 + slen2,
822                                              2,
823                                              handle->current->subsystem,
824                                              handle->current->name));
825   GNUNET_MQ_notify_sent (env,
826                          &schedule_action,
827                          handle);
828   GNUNET_MQ_send (handle->mq,
829                   env);
830   GNUNET_assert (NULL == handle->current->cont);
831   free_action_item (handle->current);
832   handle->current = NULL;
833   schedule_action (handle);
834 }
835
836
837 /**
838  * Transmit a SET/UPDATE request.
839  *
840  * @param handle statistics handle
841  */
842 static void
843 transmit_set (struct GNUNET_STATISTICS_Handle *handle)
844 {
845   struct GNUNET_STATISTICS_SetMessage *r;
846   struct GNUNET_MQ_Envelope *env;
847   size_t slen;
848   size_t nlen;
849
850   slen = strlen (handle->current->subsystem) + 1;
851   nlen = strlen (handle->current->name) + 1;
852   env = GNUNET_MQ_msg_extra (r,
853                              slen + nlen,
854                              GNUNET_MESSAGE_TYPE_STATISTICS_SET);
855   r->flags = 0;
856   r->value = GNUNET_htonll (handle->current->value);
857   if (handle->current->make_persistent)
858     r->flags |= htonl (GNUNET_STATISTICS_SETFLAG_PERSISTENT);
859   if (handle->current->type == ACTION_UPDATE)
860     r->flags |= htonl (GNUNET_STATISTICS_SETFLAG_RELATIVE);
861   GNUNET_assert (slen + nlen ==
862                  GNUNET_STRINGS_buffer_fill ((char *) &r[1],
863                                              slen + nlen,
864                                              2,
865                                              handle->current->subsystem,
866                                              handle->current->name));
867   GNUNET_assert (NULL == handle->current->cont);
868   free_action_item (handle->current);
869   handle->current = NULL;
870   update_memory_statistics (handle);
871   GNUNET_MQ_notify_sent (env,
872                          &schedule_action,
873                          handle);
874   GNUNET_MQ_send (handle->mq,
875                   env);
876 }
877
878
879 /**
880  * Get handle for the statistics service.
881  *
882  * @param subsystem name of subsystem using the service
883  * @param cfg services configuration in use
884  * @return handle to use
885  */
886 struct GNUNET_STATISTICS_Handle *
887 GNUNET_STATISTICS_create (const char *subsystem,
888                           const struct GNUNET_CONFIGURATION_Handle *cfg)
889 {
890   struct GNUNET_STATISTICS_Handle *h;
891
892   if (GNUNET_YES ==
893       GNUNET_CONFIGURATION_get_value_yesno (cfg,
894                                             "statistics",
895                                             "DISABLE"))
896     return NULL;
897   h = GNUNET_new (struct GNUNET_STATISTICS_Handle);
898   h->cfg = cfg;
899   h->subsystem = GNUNET_strdup (subsystem);
900   h->backoff = GNUNET_TIME_UNIT_MILLISECONDS;
901   return h;
902 }
903
904
905 /**
906  * Destroy a handle (free all state associated with
907  * it).
908  *
909  * @param h statistics handle to destroy
910  * @param sync_first set to #GNUNET_YES if pending SET requests should
911  *        be completed
912  */
913 void
914 GNUNET_STATISTICS_destroy (struct GNUNET_STATISTICS_Handle *h,
915                            int sync_first)
916 {
917   struct GNUNET_STATISTICS_GetHandle *pos;
918   struct GNUNET_STATISTICS_GetHandle *next;
919
920   if (NULL == h)
921     return;
922   GNUNET_assert (GNUNET_NO == h->do_destroy); /* Don't call twice. */
923   if ( (sync_first) &&
924        (0 != GNUNET_MQ_get_length (h->mq)) &&
925        (GNUNET_YES == try_connect (h)) )
926   {
927     if ( (NULL != h->current) &&
928          (ACTION_GET == h->current->type) )
929       h->current->aborted = GNUNET_YES;
930     next = h->action_head;
931     while (NULL != (pos = next))
932     {
933       next = pos->next;
934       if ( (ACTION_GET == pos->type) ||
935            (ACTION_WATCH == pos->type) ||
936            (GNUNET_NO == pos->make_persistent) )
937       {
938         GNUNET_CONTAINER_DLL_remove (h->action_head,
939                                      h->action_tail,
940                                      pos);
941         free_action_item (pos);
942       }
943     }
944     h->do_destroy = GNUNET_YES;
945     schedule_action (h);
946     GNUNET_assert (NULL == h->destroy_task);
947     h->destroy_task
948       = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply (h->backoff,
949                                                                      5),
950                                       &do_destroy,
951                                       h);
952     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
953                 "Deferring destruction\n");
954     return; /* do not finish destruction just yet */
955   }
956   /* do clean up all */
957   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
958               "Cleaning all up\n");
959   while (NULL != (pos = h->action_head))
960   {
961     GNUNET_CONTAINER_DLL_remove (h->action_head,
962                                  h->action_tail,
963                                  pos);
964     free_action_item (pos);
965   }
966   do_disconnect (h);
967   if (NULL != h->backoff_task)
968   {
969     GNUNET_SCHEDULER_cancel (h->backoff_task);
970     h->backoff_task = NULL;
971   }
972   if (NULL != h->destroy_task)
973   {
974     GNUNET_break (0);
975     GNUNET_SCHEDULER_cancel (h->destroy_task);
976     h->destroy_task = NULL;
977   }
978   for (unsigned int i = 0; i < h->watches_size; i++)
979   {
980     if (NULL == h->watches[i])
981       continue;
982     GNUNET_free (h->watches[i]->subsystem);
983     GNUNET_free (h->watches[i]->name);
984     GNUNET_free (h->watches[i]);
985   }
986   GNUNET_array_grow (h->watches,
987                      h->watches_size,
988                      0);
989   GNUNET_free (h->subsystem);
990   GNUNET_free (h);
991 }
992
993
994 /**
995  * Schedule the next action to be performed.
996  *
997  * @param cls statistics handle
998  */
999 static void
1000 schedule_action (void *cls)
1001 {
1002   struct GNUNET_STATISTICS_Handle *h = cls;
1003
1004   if (NULL != h->backoff_task)
1005     return;                     /* action already pending */
1006   if (GNUNET_YES != try_connect (h))
1007   {
1008     reconnect_later (h);
1009     return;
1010   }
1011   if (0 < GNUNET_MQ_get_length (h->mq) )
1012     return; /* Wait for queue to be reduced more */
1013   /* schedule next action */
1014   while (NULL == h->current)
1015   {
1016     h->current = h->action_head;
1017     if (NULL == h->current)
1018     {
1019       struct GNUNET_MessageHeader *hdr;
1020       struct GNUNET_MQ_Envelope *env;
1021
1022       if (GNUNET_YES != h->do_destroy)
1023         return; /* nothing to do */
1024       /* let service know that we're done */
1025       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1026                   "Notifying service that we are done\n");
1027       h->do_destroy = GNUNET_SYSERR; /* in 'TEST' mode */
1028       env = GNUNET_MQ_msg (hdr,
1029                            GNUNET_MESSAGE_TYPE_TEST);
1030       GNUNET_MQ_notify_sent (env,
1031                              &schedule_action,
1032                              h);
1033       GNUNET_MQ_send (h->mq,
1034                       env);
1035       return;
1036     }
1037     GNUNET_CONTAINER_DLL_remove (h->action_head,
1038                                  h->action_tail,
1039                                  h->current);
1040     switch (h->current->type)
1041     {
1042     case ACTION_GET:
1043       transmit_get (h);
1044       break;
1045     case ACTION_SET:
1046     case ACTION_UPDATE:
1047       transmit_set (h);
1048       break;
1049     case ACTION_WATCH:
1050       transmit_watch (h);
1051       break;
1052     default:
1053       GNUNET_assert (0);
1054       break;
1055     }
1056   }
1057 }
1058
1059
1060 /**
1061  * Get statistic from the peer.
1062  *
1063  * @param handle identification of the statistics service
1064  * @param subsystem limit to the specified subsystem, NULL for our subsystem
1065  * @param name name of the statistic value, NULL for all values
1066  * @param cont continuation to call when done (can be NULL)
1067  *        This callback CANNOT destroy the statistics handle in the same call.
1068  * @param proc function to call on each value
1069  * @param cls closure for @a cont and @a proc
1070  * @return NULL on error
1071  */
1072 struct GNUNET_STATISTICS_GetHandle *
1073 GNUNET_STATISTICS_get (struct GNUNET_STATISTICS_Handle *handle,
1074                        const char *subsystem,
1075                        const char *name,
1076                        GNUNET_STATISTICS_Callback cont,
1077                        GNUNET_STATISTICS_Iterator proc,
1078                        void *cls)
1079 {
1080   size_t slen1;
1081   size_t slen2;
1082   struct GNUNET_STATISTICS_GetHandle *ai;
1083
1084   if (NULL == handle)
1085     return NULL;
1086   GNUNET_assert (NULL != proc);
1087   GNUNET_assert (GNUNET_NO == handle->do_destroy);
1088   if (NULL == subsystem)
1089     subsystem = "";
1090   if (NULL == name)
1091     name = "";
1092   slen1 = strlen (subsystem) + 1;
1093   slen2 = strlen (name) + 1;
1094   GNUNET_assert (slen1 + slen2 + sizeof (struct GNUNET_MessageHeader) <
1095                  GNUNET_SERVER_MAX_MESSAGE_SIZE);
1096   ai = GNUNET_new (struct GNUNET_STATISTICS_GetHandle);
1097   ai->sh = handle;
1098   ai->subsystem = GNUNET_strdup (subsystem);
1099   ai->name = GNUNET_strdup (name);
1100   ai->cont = cont;
1101   ai->proc = proc;
1102   ai->cls = cls;
1103   ai->type = ACTION_GET;
1104   ai->msize = slen1 + slen2 + sizeof (struct GNUNET_MessageHeader);
1105   GNUNET_CONTAINER_DLL_insert_tail (handle->action_head,
1106                                     handle->action_tail,
1107                                     ai);
1108   schedule_action (handle);
1109   return ai;
1110 }
1111
1112
1113 /**
1114  * Cancel a 'get' request.  Must be called before the 'cont'
1115  * function is called.
1116  *
1117  * @param gh handle of the request to cancel
1118  */
1119 void
1120 GNUNET_STATISTICS_get_cancel (struct GNUNET_STATISTICS_GetHandle *gh)
1121 {
1122   if (NULL == gh)
1123     return;
1124   gh->cont = NULL;
1125   if (gh->sh->current == gh)
1126   {
1127     gh->aborted = GNUNET_YES;
1128     return;
1129   }
1130   GNUNET_CONTAINER_DLL_remove (gh->sh->action_head,
1131                                gh->sh->action_tail,
1132                                gh);
1133   GNUNET_free (gh->name);
1134   GNUNET_free (gh->subsystem);
1135   GNUNET_free (gh);
1136 }
1137
1138
1139 /**
1140  * Watch statistics from the peer (be notified whenever they change).
1141  *
1142  * @param handle identification of the statistics service
1143  * @param subsystem limit to the specified subsystem, never NULL
1144  * @param name name of the statistic value, never NULL
1145  * @param proc function to call on each value
1146  * @param proc_cls closure for @a proc
1147  * @return #GNUNET_OK on success, #GNUNET_SYSERR on error
1148  */
1149 int
1150 GNUNET_STATISTICS_watch (struct GNUNET_STATISTICS_Handle *handle,
1151                          const char *subsystem,
1152                          const char *name,
1153                          GNUNET_STATISTICS_Iterator proc,
1154                          void *proc_cls)
1155 {
1156   struct GNUNET_STATISTICS_WatchEntry *w;
1157
1158   if (NULL == handle)
1159     return GNUNET_SYSERR;
1160   w = GNUNET_new (struct GNUNET_STATISTICS_WatchEntry);
1161   w->subsystem = GNUNET_strdup (subsystem);
1162   w->name = GNUNET_strdup (name);
1163   w->proc = proc;
1164   w->proc_cls = proc_cls;
1165   GNUNET_array_append (handle->watches,
1166                        handle->watches_size,
1167                        w);
1168   schedule_watch_request (handle,
1169                           w);
1170   return GNUNET_OK;
1171 }
1172
1173
1174 /**
1175  * Stop watching statistics from the peer.
1176  *
1177  * @param handle identification of the statistics service
1178  * @param subsystem limit to the specified subsystem, never NULL
1179  * @param name name of the statistic value, never NULL
1180  * @param proc function to call on each value
1181  * @param proc_cls closure for @a proc
1182  * @return #GNUNET_OK on success, #GNUNET_SYSERR on error (no such watch)
1183  */
1184 int
1185 GNUNET_STATISTICS_watch_cancel (struct GNUNET_STATISTICS_Handle *handle,
1186                                 const char *subsystem,
1187                                 const char *name,
1188                                 GNUNET_STATISTICS_Iterator proc,
1189                                 void *proc_cls)
1190 {
1191   struct GNUNET_STATISTICS_WatchEntry *w;
1192
1193   if (NULL == handle)
1194     return GNUNET_SYSERR;
1195   for (unsigned int i=0;i<handle->watches_size;i++)
1196   {
1197     w = handle->watches[i];
1198     if (NULL == w)
1199       continue;
1200     if ( (w->proc == proc) &&
1201          (w->proc_cls == proc_cls) &&
1202          (0 == strcmp (w->name, name)) &&
1203          (0 == strcmp (w->subsystem, subsystem)) )
1204     {
1205       GNUNET_free (w->name);
1206       GNUNET_free (w->subsystem);
1207       GNUNET_free (w);
1208       handle->watches[i] = NULL;
1209       return GNUNET_OK;
1210     }
1211   }
1212   return GNUNET_SYSERR;
1213 }
1214
1215
1216 /**
1217  * Queue a request to change a statistic.
1218  *
1219  * @param h statistics handle
1220  * @param name name of the value
1221  * @param make_persistent  should the value be kept across restarts?
1222  * @param value new value or change
1223  * @param type type of the action (#ACTION_SET or #ACTION_UPDATE)
1224  */
1225 static void
1226 add_setter_action (struct GNUNET_STATISTICS_Handle *h,
1227                    const char *name,
1228                    int make_persistent,
1229                    uint64_t value,
1230                    enum ActionType type)
1231 {
1232   struct GNUNET_STATISTICS_GetHandle *ai;
1233   size_t slen;
1234   size_t nlen;
1235   size_t nsize;
1236   int64_t delta;
1237
1238   slen = strlen (h->subsystem) + 1;
1239   nlen = strlen (name) + 1;
1240   nsize = sizeof (struct GNUNET_STATISTICS_SetMessage) + slen + nlen;
1241   if (nsize >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
1242   {
1243     GNUNET_break (0);
1244     return;
1245   }
1246   for (ai = h->action_head; NULL != ai; ai = ai->next)
1247   {
1248     if (! ( (0 == strcmp (ai->subsystem, h->subsystem)) &&
1249             (0 == strcmp (ai->name, name)) &&
1250             ( (ACTION_UPDATE == ai->type) ||
1251               (ACTION_SET == ai->type) ) ) )
1252       continue;
1253     if (ACTION_SET == ai->type)
1254     {
1255       if (ACTION_UPDATE == type)
1256       {
1257         delta = (int64_t) value;
1258         if (delta > 0)
1259         {
1260           /* update old set by new delta */
1261           ai->value += delta;
1262         }
1263         else
1264         {
1265           /* update old set by new delta, but never go negative */
1266           if (ai->value < -delta)
1267             ai->value = 0;
1268           else
1269             ai->value += delta;
1270         }
1271       }
1272       else
1273       {
1274         /* new set overrides old set */
1275         ai->value = value;
1276       }
1277     }
1278     else
1279     {
1280       if (ACTION_UPDATE == type)
1281       {
1282         /* make delta cummulative */
1283         delta = (int64_t) value;
1284         ai->value += delta;
1285       }
1286       else
1287       {
1288         /* drop old 'update', use new 'set' instead */
1289         ai->value = value;
1290         ai->type = type;
1291       }
1292     }
1293     ai->timeout = GNUNET_TIME_relative_to_absolute (SET_TRANSMIT_TIMEOUT);
1294     ai->make_persistent = make_persistent;
1295     return;
1296   }
1297   /* no existing entry matches, create a fresh one */
1298   ai = GNUNET_new (struct GNUNET_STATISTICS_GetHandle);
1299   ai->sh = h;
1300   ai->subsystem = GNUNET_strdup (h->subsystem);
1301   ai->name = GNUNET_strdup (name);
1302   ai->timeout = GNUNET_TIME_relative_to_absolute (SET_TRANSMIT_TIMEOUT);
1303   ai->make_persistent = make_persistent;
1304   ai->msize = nsize;
1305   ai->value = value;
1306   ai->type = type;
1307   GNUNET_CONTAINER_DLL_insert_tail (h->action_head,
1308                                     h->action_tail,
1309                                     ai);
1310   schedule_action (h);
1311 }
1312
1313
1314 /**
1315  * Set statistic value for the peer.  Will always use our
1316  * subsystem (the argument used when "handle" was created).
1317  *
1318  * @param handle identification of the statistics service
1319  * @param name name of the statistic value
1320  * @param value new value to set
1321  * @param make_persistent should the value be kept across restarts?
1322  */
1323 void
1324 GNUNET_STATISTICS_set (struct GNUNET_STATISTICS_Handle *handle,
1325                        const char *name,
1326                        uint64_t value,
1327                        int make_persistent)
1328 {
1329   if (NULL == handle)
1330     return;
1331   GNUNET_assert (GNUNET_NO == handle->do_destroy);
1332   add_setter_action (handle,
1333                      name,
1334                      make_persistent,
1335                      value,
1336                      ACTION_SET);
1337 }
1338
1339
1340 /**
1341  * Set statistic value for the peer.  Will always use our
1342  * subsystem (the argument used when "handle" was created).
1343  *
1344  * @param handle identification of the statistics service
1345  * @param name name of the statistic value
1346  * @param delta change in value (added to existing value)
1347  * @param make_persistent should the value be kept across restarts?
1348  */
1349 void
1350 GNUNET_STATISTICS_update (struct GNUNET_STATISTICS_Handle *handle,
1351                           const char *name,
1352                           int64_t delta,
1353                           int make_persistent)
1354 {
1355   if (NULL == handle)
1356     return;
1357   if (0 == delta)
1358     return;
1359   GNUNET_assert (GNUNET_NO == handle->do_destroy);
1360   add_setter_action (handle,
1361                      name,
1362                      make_persistent,
1363                      (uint64_t) delta,
1364                      ACTION_UPDATE);
1365 }
1366
1367
1368 /* end of statistics_api.c */