-rps doxygen
[oweals/gnunet.git] / src / statistics / statistics_api.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2009, 2010, 2011, 2016 GNUnet e.V.
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18      Boston, MA 02110-1301, USA.
19 */
20
21 /**
22  * @file statistics/statistics_api.c
23  * @brief API of the statistics service
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include "gnunet_util_lib.h"
28 #include "gnunet_constants.h"
29 #include "gnunet_protocols.h"
30 #include "gnunet_statistics_service.h"
31 #include "statistics.h"
32
33 /**
34  * How long do we wait until a statistics request for setting
35  * a value times out?  (The update will be lost if the
36  * service does not react within this timeframe).
37  */
38 #define SET_TRANSMIT_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 2)
39
40 #define LOG(kind,...) GNUNET_log_from (kind, "statistics-api",__VA_ARGS__)
41
42 /**
43  * Types of actions.
44  */
45 enum ActionType
46 {
47   /**
48    * Get a value.
49    */
50   ACTION_GET,
51
52   /**
53    * Set a value.
54    */
55   ACTION_SET,
56
57   /**
58    * Update a value.
59    */
60   ACTION_UPDATE,
61
62   /**
63    * Watch a value.
64    */
65   ACTION_WATCH
66 };
67
68
69 /**
70  * Entry kept for each value we are watching.
71  */
72 struct GNUNET_STATISTICS_WatchEntry
73 {
74
75   /**
76    * What subsystem is this action about? (never NULL)
77    */
78   char *subsystem;
79
80   /**
81    * What value is this action about? (never NULL)
82    */
83   char *name;
84
85   /**
86    * Function to call
87    */
88   GNUNET_STATISTICS_Iterator proc;
89
90   /**
91    * Closure for @e proc
92    */
93   void *proc_cls;
94
95 };
96
97
98 /**
99  * Linked list of things we still need to do.
100  */
101 struct GNUNET_STATISTICS_GetHandle
102 {
103
104   /**
105    * This is a doubly linked list.
106    */
107   struct GNUNET_STATISTICS_GetHandle *next;
108
109   /**
110    * This is a doubly linked list.
111    */
112   struct GNUNET_STATISTICS_GetHandle *prev;
113
114   /**
115    * Main statistics handle.
116    */
117   struct GNUNET_STATISTICS_Handle *sh;
118
119   /**
120    * What subsystem is this action about? (can be NULL)
121    */
122   char *subsystem;
123
124   /**
125    * What value is this action about? (can be NULL)
126    */
127   char *name;
128
129   /**
130    * Continuation to call once action is complete.
131    */
132   GNUNET_STATISTICS_Callback cont;
133
134   /**
135    * Function to call (for GET actions only).
136    */
137   GNUNET_STATISTICS_Iterator proc;
138
139   /**
140    * Closure for @e proc and @e cont.
141    */
142   void *cls;
143
144   /**
145    * Timeout for this action.
146    */
147   struct GNUNET_TIME_Absolute timeout;
148
149   /**
150    * Associated value.
151    */
152   uint64_t value;
153
154   /**
155    * Flag for SET/UPDATE actions.
156    */
157   int make_persistent;
158
159   /**
160    * Has the current iteration been aborted; for GET actions.
161    */
162   int aborted;
163
164   /**
165    * Is this a #ACTION_GET, #ACTION_SET, #ACTION_UPDATE or #ACTION_WATCH?
166    */
167   enum ActionType type;
168
169   /**
170    * Size of the message that we will be transmitting.
171    */
172   uint16_t msize;
173
174 };
175
176
177 /**
178  * Handle for the service.
179  */
180 struct GNUNET_STATISTICS_Handle
181 {
182   /**
183    * Name of our subsystem.
184    */
185   char *subsystem;
186
187   /**
188    * Configuration to use.
189    */
190   const struct GNUNET_CONFIGURATION_Handle *cfg;
191
192   /**
193    * Message queue to the service.
194    */
195   struct GNUNET_MQ_Handle *mq;
196
197   /**
198    * Head of the linked list of pending actions (first action
199    * to be performed).
200    */
201   struct GNUNET_STATISTICS_GetHandle *action_head;
202
203   /**
204    * Tail of the linked list of actions (for fast append).
205    */
206   struct GNUNET_STATISTICS_GetHandle *action_tail;
207
208   /**
209    * Action we are currently busy with (action request has been
210    * transmitted, we're now receiving the response from the
211    * service).
212    */
213   struct GNUNET_STATISTICS_GetHandle *current;
214
215   /**
216    * Array of watch entries.
217    */
218   struct GNUNET_STATISTICS_WatchEntry **watches;
219
220   /**
221    * Task doing exponential back-off trying to reconnect.
222    */
223   struct GNUNET_SCHEDULER_Task *backoff_task;
224
225   /**
226    * Task for running #do_destroy().
227    */
228   struct GNUNET_SCHEDULER_Task *destroy_task;
229
230   /**
231    * Time for next connect retry.
232    */
233   struct GNUNET_TIME_Relative backoff;
234
235   /**
236    * Maximum heap size observed so far (if available).
237    */
238   uint64_t peak_heap_size;
239
240   /**
241    * Maximum resident set side observed so far (if available).
242    */
243   uint64_t peak_rss;
244
245   /**
246    * Size of the @e watches array.
247    */
248   unsigned int watches_size;
249
250   /**
251    * Should this handle auto-destruct once all actions have
252    * been processed?
253    */
254   int do_destroy;
255
256   /**
257    * Are we currently receiving from the service?
258    */
259   int receiving;
260
261 };
262
263
264 /**
265  * Obtain statistics about this process's memory consumption and
266  * report those as well (if they changed).
267  */
268 static void
269 update_memory_statistics (struct GNUNET_STATISTICS_Handle *h)
270 {
271 #if ENABLE_HEAP_STATISTICS
272   uint64_t current_heap_size = 0;
273   uint64_t current_rss = 0;
274
275   if (GNUNET_NO != h->do_destroy)
276     return;
277 #if HAVE_MALLINFO
278   {
279     struct mallinfo mi;
280
281     mi = mallinfo();
282     current_heap_size = mi.uordblks + mi.fordblks;
283   }
284 #endif
285 #if HAVE_GETRUSAGE
286   {
287     struct rusage ru;
288
289     if (0 == getrusage (RUSAGE_SELF, &ru))
290     {
291       current_rss = 1024LL * ru.ru_maxrss;
292     }
293   }
294 #endif
295   if (current_heap_size > h->peak_heap_size)
296   {
297     h->peak_heap_size = current_heap_size;
298     GNUNET_STATISTICS_set (h, "# peak heap size", current_heap_size, GNUNET_NO);
299   }
300   if (current_rss > h->peak_rss)
301   {
302     h->peak_rss = current_rss;
303     GNUNET_STATISTICS_set (h, "# peak resident set size", current_rss, GNUNET_NO);
304   }
305 #endif
306 }
307
308
309 /**
310  * Reconnect at a later time, respecting back-off.
311  *
312  * @param h statistics handle
313  */
314 static void
315 reconnect_later (struct GNUNET_STATISTICS_Handle *h);
316
317
318 /**
319  * Schedule the next action to be performed.
320  *
321  * @param cls statistics handle to reconnect
322  */
323 static void
324 schedule_action (void *cls);
325
326
327 /**
328  * Transmit request to service that we want to watch
329  * the development of a particular value.
330  *
331  * @param h statistics handle
332  * @param watch watch entry of the value to watch
333  */
334 static void
335 schedule_watch_request (struct GNUNET_STATISTICS_Handle *h,
336                         struct GNUNET_STATISTICS_WatchEntry *watch)
337 {
338   struct GNUNET_STATISTICS_GetHandle *ai;
339   size_t slen;
340   size_t nlen;
341   size_t nsize;
342
343   slen = strlen (watch->subsystem) + 1;
344   nlen = strlen (watch->name) + 1;
345   nsize = sizeof (struct GNUNET_MessageHeader) + slen + nlen;
346   if (nsize >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
347   {
348     GNUNET_break (0);
349     return;
350   }
351   ai = GNUNET_new (struct GNUNET_STATISTICS_GetHandle);
352   ai->sh = h;
353   ai->subsystem = GNUNET_strdup (watch->subsystem);
354   ai->name = GNUNET_strdup (watch->name);
355   ai->timeout = GNUNET_TIME_UNIT_FOREVER_ABS;
356   ai->msize = nsize;
357   ai->type = ACTION_WATCH;
358   ai->proc = watch->proc;
359   ai->cls = watch->proc_cls;
360   GNUNET_CONTAINER_DLL_insert_tail (h->action_head,
361                                     h->action_tail,
362                                     ai);
363   schedule_action (h);
364 }
365
366
367 /**
368  * Free memory associated with the given action item.
369  *
370  * @param gh action item to free
371  */
372 static void
373 free_action_item (struct GNUNET_STATISTICS_GetHandle *gh)
374 {
375   GNUNET_free_non_null (gh->subsystem);
376   GNUNET_free_non_null (gh->name);
377   GNUNET_free (gh);
378 }
379
380
381 /**
382  * Disconnect from the statistics service.
383  *
384  * @param h statistics handle to disconnect from
385  */
386 static void
387 do_disconnect (struct GNUNET_STATISTICS_Handle *h)
388 {
389   struct GNUNET_STATISTICS_GetHandle *c;
390
391   h->receiving = GNUNET_NO;
392   if (NULL != (c = h->current))
393   {
394     h->current = NULL;
395     if ( (NULL != c->cont) &&
396          (GNUNET_YES != c->aborted) )
397     {
398       c->cont (c->cls,
399                GNUNET_SYSERR);
400       c->cont = NULL;
401     }
402     free_action_item (c);
403   }
404   if (NULL != h->mq)
405   {
406     GNUNET_MQ_destroy (h->mq);
407     h->mq = NULL;
408   }
409 }
410
411
412 /**
413  * Process a #GNUNET_MESSAGE_TYPE_STATISTICS_VALUE message.
414  *
415  * @param cls statistics handle
416  * @param smsg message received from the service, never NULL
417  * @return #GNUNET_OK if the message was well-formed
418  */
419 static int
420 check_statistics_value (void *cls,
421                         const struct GNUNET_STATISTICS_ReplyMessage *smsg)
422 {
423   const char *service;
424   const char *name;
425   uint16_t size;
426
427   size = ntohs (smsg->header.size);
428   size -= sizeof (struct GNUNET_STATISTICS_ReplyMessage);
429   if (size !=
430       GNUNET_STRINGS_buffer_tokenize ((const char *) &smsg[1],
431                                       size,
432                                       2,
433                                       &service,
434                                       &name))
435   {
436     GNUNET_break (0);
437     return GNUNET_SYSERR;
438   }
439   return GNUNET_OK;
440 }
441
442
443 /**
444  * Process a #GNUNET_MESSAGE_TYPE_STATISTICS_VALUE message.
445  *
446  * @param cls statistics handle
447  * @param msg message received from the service, never NULL
448  * @return #GNUNET_OK if the message was well-formed
449  */
450 static void
451 handle_statistics_value (void *cls,
452                          const struct GNUNET_STATISTICS_ReplyMessage *smsg)
453 {
454   struct GNUNET_STATISTICS_Handle *h = cls;
455   const char *service;
456   const char *name;
457   uint16_t size;
458
459   if (h->current->aborted)
460     return;           /* iteration aborted, don't bother */
461
462   size = ntohs (smsg->header.size);
463   size -= sizeof (struct GNUNET_STATISTICS_ReplyMessage);
464   GNUNET_assert (size ==
465                  GNUNET_STRINGS_buffer_tokenize ((const char *) &smsg[1],
466                                                  size,
467                                                  2,
468                                                  &service,
469                                                  &name));
470   LOG (GNUNET_ERROR_TYPE_DEBUG,
471        "Received valid statistic on `%s:%s': %llu\n",
472        service, name,
473        GNUNET_ntohll (smsg->value));
474   if (GNUNET_OK !=
475       h->current->proc (h->current->cls,
476                         service,
477                         name,
478                         GNUNET_ntohll (smsg->value),
479                         0 !=
480                         (ntohl (smsg->uid) & GNUNET_STATISTICS_PERSIST_BIT)))
481   {
482     LOG (GNUNET_ERROR_TYPE_DEBUG,
483          "Processing of remaining statistics aborted by client.\n");
484     h->current->aborted = GNUNET_YES;
485   }
486 }
487
488
489 /**
490  * We have received a watch value from the service.  Process it.
491  *
492  * @param cls statistics handle
493  * @param msg the watch value message
494  */
495 static void
496 handle_statistics_watch_value (void *cls,
497                                const struct GNUNET_STATISTICS_WatchValueMessage *wvm)
498 {
499   struct GNUNET_STATISTICS_Handle *h = cls;
500   struct GNUNET_STATISTICS_WatchEntry *w;
501   uint32_t wid;
502
503   GNUNET_break (0 == ntohl (wvm->reserved));
504   wid = ntohl (wvm->wid);
505   if (wid >= h->watches_size)
506   {
507     do_disconnect (h);
508     reconnect_later (h);
509     return;
510   }
511   w = h->watches[wid];
512   if (NULL == w)
513     return;
514   (void) w->proc (w->proc_cls,
515                   w->subsystem,
516                   w->name,
517                   GNUNET_ntohll (wvm->value),
518                   0 != (ntohl (wvm->flags) & GNUNET_STATISTICS_PERSIST_BIT));
519 }
520
521
522 /**
523  * Generic error handler, called with the appropriate error code and
524  * the same closure specified at the creation of the message queue.
525  * Not every message queue implementation supports an error handler.
526  *
527  * @param cls closure with the `struct GNUNET_STATISTICS_Handle *`
528  * @param error error code
529  */
530 static void
531 mq_error_handler (void *cls,
532                   enum GNUNET_MQ_Error error)
533 {
534   struct GNUNET_STATISTICS_Handle *h = cls;
535
536   if (GNUNET_NO != h->do_destroy)
537   {
538     h->do_destroy = GNUNET_NO;
539     if (NULL != h->destroy_task)
540     {
541       GNUNET_SCHEDULER_cancel (h->destroy_task);
542       h->destroy_task = NULL;
543     }
544     GNUNET_STATISTICS_destroy (h,
545                                GNUNET_NO);
546     return;
547   }
548   do_disconnect (h);
549   reconnect_later (h);
550 }
551
552
553 /**
554  * Task used to destroy the statistics handle.
555  *
556  * @param cls the `struct GNUNET_STATISTICS_Handle`
557  */
558 static void
559 do_destroy (void *cls)
560 {
561   struct GNUNET_STATISTICS_Handle *h = cls;
562
563   h->destroy_task = NULL;
564   h->do_destroy = GNUNET_NO;
565   LOG (GNUNET_ERROR_TYPE_DEBUG,
566        "Running final destruction\n");
567   GNUNET_STATISTICS_destroy (h,
568                              GNUNET_NO);
569 }
570
571
572 /**
573  * Handle a #GNUNET_MESSAGE_TYPE_TEST (sic) message. We receive this
574  * message at the end of the shutdown when the service confirms that
575  * all data has been written to disk.
576  *
577  * @param cls our `struct GNUNET_STATISTICS_Handle *`
578  * @param msg the message
579  */
580 static void
581 handle_test (void *cls,
582              const struct GNUNET_MessageHeader *msg)
583 {
584   struct GNUNET_STATISTICS_Handle *h = cls;
585
586   if (GNUNET_SYSERR != h->do_destroy)
587   {
588     /* not in shutdown, why do we get 'TEST'? */
589     GNUNET_break (0);
590     do_disconnect (h);
591     reconnect_later (h);
592     return;
593   }
594   LOG (GNUNET_ERROR_TYPE_DEBUG,
595        "Received TEST message from statistics, can complete disconnect\n");
596   if (NULL != h->destroy_task)
597     GNUNET_SCHEDULER_cancel (h->destroy_task);
598   h->destroy_task = GNUNET_SCHEDULER_add_now (&do_destroy,
599                                               h);
600 }
601
602
603 /**
604  * Handle a #GNUNET_MESSAGE_TYPE_STATISTICS_END message. We receive
605  * this message in response to a query to indicate that there are no
606  * further matching results.
607  *
608  * @param cls our `struct GNUNET_STATISTICS_Handle *`
609  * @param msg the message
610  */
611 static void
612 handle_statistics_end (void *cls,
613                        const struct GNUNET_MessageHeader *msg)
614 {
615   struct GNUNET_STATISTICS_Handle *h = cls;
616   struct GNUNET_STATISTICS_GetHandle *c;
617
618   LOG (GNUNET_ERROR_TYPE_DEBUG,
619        "Received end of statistics marker\n");
620   if (NULL == (c = h->current))
621   {
622     GNUNET_break (0);
623     do_disconnect (h);
624     reconnect_later (h);
625     return;
626   }
627   h->backoff = GNUNET_TIME_UNIT_MILLISECONDS;
628   h->current = NULL;
629   schedule_action (h);
630   if (NULL != c->cont)
631   {
632     c->cont (c->cls,
633              GNUNET_OK);
634     c->cont = NULL;
635   }
636   free_action_item (c);
637 }
638
639
640 /**
641  * Try to (re)connect to the statistics service.
642  *
643  * @param h statistics handle to reconnect
644  * @return #GNUNET_YES on success, #GNUNET_NO on failure.
645  */
646 static int
647 try_connect (struct GNUNET_STATISTICS_Handle *h)
648 {
649   struct GNUNET_MQ_MessageHandler handlers[] = {
650     GNUNET_MQ_hd_fixed_size (test,
651                              GNUNET_MESSAGE_TYPE_TEST,
652                              struct GNUNET_MessageHeader,
653                              h),
654     GNUNET_MQ_hd_fixed_size (statistics_end,
655                              GNUNET_MESSAGE_TYPE_STATISTICS_END,
656                              struct GNUNET_MessageHeader,
657                              h),
658     GNUNET_MQ_hd_var_size (statistics_value,
659                            GNUNET_MESSAGE_TYPE_STATISTICS_VALUE,
660                            struct GNUNET_STATISTICS_ReplyMessage,
661                            h),
662     GNUNET_MQ_hd_fixed_size (statistics_watch_value,
663                              GNUNET_MESSAGE_TYPE_STATISTICS_WATCH_VALUE,
664                              struct GNUNET_STATISTICS_WatchValueMessage,
665                              h),
666     GNUNET_MQ_handler_end ()
667   };
668   struct GNUNET_STATISTICS_GetHandle *gh;
669   struct GNUNET_STATISTICS_GetHandle *gn;
670
671   if (NULL != h->backoff_task)
672     return GNUNET_NO;
673   if (NULL != h->mq)
674     return GNUNET_YES;
675   h->mq = GNUNET_CLIENT_connecT (h->cfg,
676                                  "statistics",
677                                  handlers,
678                                  &mq_error_handler,
679                                  h);
680   if (NULL == h->mq)
681   {
682     LOG (GNUNET_ERROR_TYPE_DEBUG,
683          "Failed to connect to statistics service!\n");
684     return GNUNET_NO;
685   }
686   gn = h->action_head;
687   while (NULL != (gh = gn))
688   {
689     gn = gh->next;
690     if (gh->type == ACTION_WATCH)
691     {
692       GNUNET_CONTAINER_DLL_remove (h->action_head,
693                                    h->action_tail,
694                                    gh);
695       free_action_item (gh);
696     }
697   }
698   for (unsigned int i = 0; i < h->watches_size; i++)
699     if (NULL != h->watches[i])
700       schedule_watch_request (h,
701                               h->watches[i]);
702   return GNUNET_YES;
703 }
704
705
706 /**
707  * We've waited long enough, reconnect now.
708  *
709  * @param cls the `struct GNUNET_STATISTICS_Handle` to reconnect
710  */
711 static void
712 reconnect_task (void *cls)
713 {
714   struct GNUNET_STATISTICS_Handle *h = cls;
715
716   h->backoff_task = NULL;
717   schedule_action (h);
718 }
719
720
721 /**
722  * Reconnect at a later time, respecting back-off.
723  *
724  * @param h statistics handle
725  */
726 static void
727 reconnect_later (struct GNUNET_STATISTICS_Handle *h)
728 {
729   int loss;
730   struct GNUNET_STATISTICS_GetHandle *gh;
731
732   GNUNET_assert (NULL == h->backoff_task);
733   if (GNUNET_YES == h->do_destroy)
734   {
735     /* So we are shutting down and the service is not reachable.
736      * Chances are that it's down for good and we are not going to connect to
737      * it anymore.
738      * Give up and don't sync the rest of the data.
739      */
740     loss = GNUNET_NO;
741     for (gh = h->action_head; NULL != gh; gh = gh->next)
742       if ( (gh->make_persistent) && (ACTION_SET == gh->type) )
743         loss = GNUNET_YES;
744     if (GNUNET_YES == loss)
745       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
746                   _("Could not save some persistent statistics\n"));
747     if (NULL != h->destroy_task)
748       GNUNET_SCHEDULER_cancel (h->destroy_task);
749     h->destroy_task = GNUNET_SCHEDULER_add_now (&do_destroy,
750                                                 h);
751     return;
752   }
753   h->backoff_task
754     = GNUNET_SCHEDULER_add_delayed (h->backoff,
755                                     &reconnect_task,
756                                     h);
757   h->backoff = GNUNET_TIME_STD_BACKOFF (h->backoff);
758 }
759
760
761
762 /**
763  * Transmit a GET request (and if successful, start to receive
764  * the response).
765  *
766  * @param handle statistics handle
767  */
768 static void
769 transmit_get (struct GNUNET_STATISTICS_Handle *handle)
770 {
771   struct GNUNET_STATISTICS_GetHandle *c;
772   struct GNUNET_MessageHeader *hdr;
773   struct GNUNET_MQ_Envelope *env;
774   size_t slen1;
775   size_t slen2;
776
777   GNUNET_assert (NULL != (c = handle->current));
778   slen1 = strlen (c->subsystem) + 1;
779   slen2 = strlen (c->name) + 1;
780   env = GNUNET_MQ_msg_extra (hdr,
781                              slen1 + slen2,
782                              GNUNET_MESSAGE_TYPE_STATISTICS_GET);
783   GNUNET_assert (slen1 + slen2 ==
784                  GNUNET_STRINGS_buffer_fill ((char *) &hdr[1],
785                                              slen1 + slen2,
786                                              2,
787                                              c->subsystem,
788                                              c->name));
789   GNUNET_MQ_notify_sent (env,
790                          &schedule_action,
791                          handle);
792   GNUNET_MQ_send (handle->mq,
793                   env);
794 }
795
796
797 /**
798  * Transmit a WATCH request (and if successful, start to receive
799  * the response).
800  *
801  * @param handle statistics handle
802  */
803 static void
804 transmit_watch (struct GNUNET_STATISTICS_Handle *handle)
805 {
806   struct GNUNET_MessageHeader *hdr;
807   struct GNUNET_MQ_Envelope *env;
808   size_t slen1;
809   size_t slen2;
810
811   LOG (GNUNET_ERROR_TYPE_DEBUG,
812        "Transmitting watch request for `%s'\n",
813        handle->current->name);
814   slen1 = strlen (handle->current->subsystem) + 1;
815   slen2 = strlen (handle->current->name) + 1;
816   env = GNUNET_MQ_msg_extra (hdr,
817                              slen1 + slen2,
818                              GNUNET_MESSAGE_TYPE_STATISTICS_WATCH);
819   GNUNET_assert (slen1 + slen2 ==
820                  GNUNET_STRINGS_buffer_fill ((char *) &hdr[1],
821                                              slen1 + slen2,
822                                              2,
823                                              handle->current->subsystem,
824                                              handle->current->name));
825   GNUNET_MQ_notify_sent (env,
826                          &schedule_action,
827                          handle);
828   GNUNET_MQ_send (handle->mq,
829                   env);
830   GNUNET_assert (NULL == handle->current->cont);
831   free_action_item (handle->current);
832   handle->current = NULL;
833   schedule_action (handle);
834 }
835
836
837 /**
838  * Transmit a SET/UPDATE request.
839  *
840  * @param handle statistics handle
841  */
842 static void
843 transmit_set (struct GNUNET_STATISTICS_Handle *handle)
844 {
845   struct GNUNET_STATISTICS_SetMessage *r;
846   struct GNUNET_MQ_Envelope *env;
847   size_t slen;
848   size_t nlen;
849
850   slen = strlen (handle->current->subsystem) + 1;
851   nlen = strlen (handle->current->name) + 1;
852   env = GNUNET_MQ_msg_extra (r,
853                              slen + nlen,
854                              GNUNET_MESSAGE_TYPE_STATISTICS_SET);
855   r->flags = 0;
856   r->value = GNUNET_htonll (handle->current->value);
857   if (handle->current->make_persistent)
858     r->flags |= htonl (GNUNET_STATISTICS_SETFLAG_PERSISTENT);
859   if (handle->current->type == ACTION_UPDATE)
860     r->flags |= htonl (GNUNET_STATISTICS_SETFLAG_RELATIVE);
861   GNUNET_assert (slen + nlen ==
862                  GNUNET_STRINGS_buffer_fill ((char *) &r[1],
863                                              slen + nlen,
864                                              2,
865                                              handle->current->subsystem,
866                                              handle->current->name));
867   GNUNET_assert (NULL == handle->current->cont);
868   free_action_item (handle->current);
869   handle->current = NULL;
870   update_memory_statistics (handle);
871   GNUNET_MQ_notify_sent (env,
872                          &schedule_action,
873                          handle);
874   GNUNET_MQ_send (handle->mq,
875                   env);
876 }
877
878
879 /**
880  * Get handle for the statistics service.
881  *
882  * @param subsystem name of subsystem using the service
883  * @param cfg services configuration in use
884  * @return handle to use
885  */
886 struct GNUNET_STATISTICS_Handle *
887 GNUNET_STATISTICS_create (const char *subsystem,
888                           const struct GNUNET_CONFIGURATION_Handle *cfg)
889 {
890   struct GNUNET_STATISTICS_Handle *h;
891
892   if (GNUNET_YES ==
893       GNUNET_CONFIGURATION_get_value_yesno (cfg,
894                                             "statistics",
895                                             "DISABLE"))
896     return NULL;
897   h = GNUNET_new (struct GNUNET_STATISTICS_Handle);
898   h->cfg = cfg;
899   h->subsystem = GNUNET_strdup (subsystem);
900   h->backoff = GNUNET_TIME_UNIT_MILLISECONDS;
901   return h;
902 }
903
904
905 /**
906  * Destroy a handle (free all state associated with
907  * it).
908  *
909  * @param h statistics handle to destroy
910  * @param sync_first set to #GNUNET_YES if pending SET requests should
911  *        be completed
912  */
913 void
914 GNUNET_STATISTICS_destroy (struct GNUNET_STATISTICS_Handle *h,
915                            int sync_first)
916 {
917   struct GNUNET_STATISTICS_GetHandle *pos;
918   struct GNUNET_STATISTICS_GetHandle *next;
919
920   if (NULL == h)
921     return;
922   GNUNET_assert (GNUNET_NO == h->do_destroy); /* Don't call twice. */
923   if ( (sync_first) &&
924        (NULL != h->mq) &&
925        (0 != GNUNET_MQ_get_length (h->mq)) &&
926        (GNUNET_YES == try_connect (h)) )
927   {
928     if ( (NULL != h->current) &&
929          (ACTION_GET == h->current->type) )
930       h->current->aborted = GNUNET_YES;
931     next = h->action_head;
932     while (NULL != (pos = next))
933     {
934       next = pos->next;
935       if ( (ACTION_GET == pos->type) ||
936            (ACTION_WATCH == pos->type) ||
937            (GNUNET_NO == pos->make_persistent) )
938       {
939         GNUNET_CONTAINER_DLL_remove (h->action_head,
940                                      h->action_tail,
941                                      pos);
942         free_action_item (pos);
943       }
944     }
945     h->do_destroy = GNUNET_YES;
946     schedule_action (h);
947     GNUNET_assert (NULL == h->destroy_task);
948     h->destroy_task
949       = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply (h->backoff,
950                                                                      5),
951                                       &do_destroy,
952                                       h);
953     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
954                 "Deferring destruction\n");
955     return; /* do not finish destruction just yet */
956   }
957   /* do clean up all */
958   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
959               "Cleaning all up\n");
960   while (NULL != (pos = h->action_head))
961   {
962     GNUNET_CONTAINER_DLL_remove (h->action_head,
963                                  h->action_tail,
964                                  pos);
965     free_action_item (pos);
966   }
967   do_disconnect (h);
968   if (NULL != h->backoff_task)
969   {
970     GNUNET_SCHEDULER_cancel (h->backoff_task);
971     h->backoff_task = NULL;
972   }
973   if (NULL != h->destroy_task)
974   {
975     GNUNET_break (0);
976     GNUNET_SCHEDULER_cancel (h->destroy_task);
977     h->destroy_task = NULL;
978   }
979   for (unsigned int i = 0; i < h->watches_size; i++)
980   {
981     if (NULL == h->watches[i])
982       continue;
983     GNUNET_free (h->watches[i]->subsystem);
984     GNUNET_free (h->watches[i]->name);
985     GNUNET_free (h->watches[i]);
986   }
987   GNUNET_array_grow (h->watches,
988                      h->watches_size,
989                      0);
990   GNUNET_free (h->subsystem);
991   GNUNET_free (h);
992 }
993
994
995 /**
996  * Schedule the next action to be performed.
997  *
998  * @param cls statistics handle
999  */
1000 static void
1001 schedule_action (void *cls)
1002 {
1003   struct GNUNET_STATISTICS_Handle *h = cls;
1004
1005   if (NULL != h->backoff_task)
1006     return;                     /* action already pending */
1007   if (GNUNET_YES != try_connect (h))
1008   {
1009     reconnect_later (h);
1010     return;
1011   }
1012   if (0 < GNUNET_MQ_get_length (h->mq) )
1013     return; /* Wait for queue to be reduced more */
1014   /* schedule next action */
1015   while (NULL == h->current)
1016   {
1017     h->current = h->action_head;
1018     if (NULL == h->current)
1019     {
1020       struct GNUNET_MessageHeader *hdr;
1021       struct GNUNET_MQ_Envelope *env;
1022
1023       if (GNUNET_YES != h->do_destroy)
1024         return; /* nothing to do */
1025       /* let service know that we're done */
1026       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1027                   "Notifying service that we are done\n");
1028       h->do_destroy = GNUNET_SYSERR; /* in 'TEST' mode */
1029       env = GNUNET_MQ_msg (hdr,
1030                            GNUNET_MESSAGE_TYPE_TEST);
1031       GNUNET_MQ_notify_sent (env,
1032                              &schedule_action,
1033                              h);
1034       GNUNET_MQ_send (h->mq,
1035                       env);
1036       return;
1037     }
1038     GNUNET_CONTAINER_DLL_remove (h->action_head,
1039                                  h->action_tail,
1040                                  h->current);
1041     switch (h->current->type)
1042     {
1043     case ACTION_GET:
1044       transmit_get (h);
1045       break;
1046     case ACTION_SET:
1047     case ACTION_UPDATE:
1048       transmit_set (h);
1049       break;
1050     case ACTION_WATCH:
1051       transmit_watch (h);
1052       break;
1053     default:
1054       GNUNET_assert (0);
1055       break;
1056     }
1057   }
1058 }
1059
1060
1061 /**
1062  * Get statistic from the peer.
1063  *
1064  * @param handle identification of the statistics service
1065  * @param subsystem limit to the specified subsystem, NULL for our subsystem
1066  * @param name name of the statistic value, NULL for all values
1067  * @param cont continuation to call when done (can be NULL)
1068  *        This callback CANNOT destroy the statistics handle in the same call.
1069  * @param proc function to call on each value
1070  * @param cls closure for @a cont and @a proc
1071  * @return NULL on error
1072  */
1073 struct GNUNET_STATISTICS_GetHandle *
1074 GNUNET_STATISTICS_get (struct GNUNET_STATISTICS_Handle *handle,
1075                        const char *subsystem,
1076                        const char *name,
1077                        GNUNET_STATISTICS_Callback cont,
1078                        GNUNET_STATISTICS_Iterator proc,
1079                        void *cls)
1080 {
1081   size_t slen1;
1082   size_t slen2;
1083   struct GNUNET_STATISTICS_GetHandle *ai;
1084
1085   if (NULL == handle)
1086     return NULL;
1087   GNUNET_assert (NULL != proc);
1088   GNUNET_assert (GNUNET_NO == handle->do_destroy);
1089   if (NULL == subsystem)
1090     subsystem = "";
1091   if (NULL == name)
1092     name = "";
1093   slen1 = strlen (subsystem) + 1;
1094   slen2 = strlen (name) + 1;
1095   GNUNET_assert (slen1 + slen2 + sizeof (struct GNUNET_MessageHeader) <
1096                  GNUNET_SERVER_MAX_MESSAGE_SIZE);
1097   ai = GNUNET_new (struct GNUNET_STATISTICS_GetHandle);
1098   ai->sh = handle;
1099   ai->subsystem = GNUNET_strdup (subsystem);
1100   ai->name = GNUNET_strdup (name);
1101   ai->cont = cont;
1102   ai->proc = proc;
1103   ai->cls = cls;
1104   ai->type = ACTION_GET;
1105   ai->msize = slen1 + slen2 + sizeof (struct GNUNET_MessageHeader);
1106   GNUNET_CONTAINER_DLL_insert_tail (handle->action_head,
1107                                     handle->action_tail,
1108                                     ai);
1109   schedule_action (handle);
1110   return ai;
1111 }
1112
1113
1114 /**
1115  * Cancel a 'get' request.  Must be called before the 'cont'
1116  * function is called.
1117  *
1118  * @param gh handle of the request to cancel
1119  */
1120 void
1121 GNUNET_STATISTICS_get_cancel (struct GNUNET_STATISTICS_GetHandle *gh)
1122 {
1123   if (NULL == gh)
1124     return;
1125   gh->cont = NULL;
1126   if (gh->sh->current == gh)
1127   {
1128     gh->aborted = GNUNET_YES;
1129     return;
1130   }
1131   GNUNET_CONTAINER_DLL_remove (gh->sh->action_head,
1132                                gh->sh->action_tail,
1133                                gh);
1134   GNUNET_free (gh->name);
1135   GNUNET_free (gh->subsystem);
1136   GNUNET_free (gh);
1137 }
1138
1139
1140 /**
1141  * Watch statistics from the peer (be notified whenever they change).
1142  *
1143  * @param handle identification of the statistics service
1144  * @param subsystem limit to the specified subsystem, never NULL
1145  * @param name name of the statistic value, never NULL
1146  * @param proc function to call on each value
1147  * @param proc_cls closure for @a proc
1148  * @return #GNUNET_OK on success, #GNUNET_SYSERR on error
1149  */
1150 int
1151 GNUNET_STATISTICS_watch (struct GNUNET_STATISTICS_Handle *handle,
1152                          const char *subsystem,
1153                          const char *name,
1154                          GNUNET_STATISTICS_Iterator proc,
1155                          void *proc_cls)
1156 {
1157   struct GNUNET_STATISTICS_WatchEntry *w;
1158
1159   if (NULL == handle)
1160     return GNUNET_SYSERR;
1161   w = GNUNET_new (struct GNUNET_STATISTICS_WatchEntry);
1162   w->subsystem = GNUNET_strdup (subsystem);
1163   w->name = GNUNET_strdup (name);
1164   w->proc = proc;
1165   w->proc_cls = proc_cls;
1166   GNUNET_array_append (handle->watches,
1167                        handle->watches_size,
1168                        w);
1169   schedule_watch_request (handle,
1170                           w);
1171   return GNUNET_OK;
1172 }
1173
1174
1175 /**
1176  * Stop watching statistics from the peer.
1177  *
1178  * @param handle identification of the statistics service
1179  * @param subsystem limit to the specified subsystem, never NULL
1180  * @param name name of the statistic value, never NULL
1181  * @param proc function to call on each value
1182  * @param proc_cls closure for @a proc
1183  * @return #GNUNET_OK on success, #GNUNET_SYSERR on error (no such watch)
1184  */
1185 int
1186 GNUNET_STATISTICS_watch_cancel (struct GNUNET_STATISTICS_Handle *handle,
1187                                 const char *subsystem,
1188                                 const char *name,
1189                                 GNUNET_STATISTICS_Iterator proc,
1190                                 void *proc_cls)
1191 {
1192   struct GNUNET_STATISTICS_WatchEntry *w;
1193
1194   if (NULL == handle)
1195     return GNUNET_SYSERR;
1196   for (unsigned int i=0;i<handle->watches_size;i++)
1197   {
1198     w = handle->watches[i];
1199     if (NULL == w)
1200       continue;
1201     if ( (w->proc == proc) &&
1202          (w->proc_cls == proc_cls) &&
1203          (0 == strcmp (w->name, name)) &&
1204          (0 == strcmp (w->subsystem, subsystem)) )
1205     {
1206       GNUNET_free (w->name);
1207       GNUNET_free (w->subsystem);
1208       GNUNET_free (w);
1209       handle->watches[i] = NULL;
1210       return GNUNET_OK;
1211     }
1212   }
1213   return GNUNET_SYSERR;
1214 }
1215
1216
1217 /**
1218  * Queue a request to change a statistic.
1219  *
1220  * @param h statistics handle
1221  * @param name name of the value
1222  * @param make_persistent  should the value be kept across restarts?
1223  * @param value new value or change
1224  * @param type type of the action (#ACTION_SET or #ACTION_UPDATE)
1225  */
1226 static void
1227 add_setter_action (struct GNUNET_STATISTICS_Handle *h,
1228                    const char *name,
1229                    int make_persistent,
1230                    uint64_t value,
1231                    enum ActionType type)
1232 {
1233   struct GNUNET_STATISTICS_GetHandle *ai;
1234   size_t slen;
1235   size_t nlen;
1236   size_t nsize;
1237   int64_t delta;
1238
1239   slen = strlen (h->subsystem) + 1;
1240   nlen = strlen (name) + 1;
1241   nsize = sizeof (struct GNUNET_STATISTICS_SetMessage) + slen + nlen;
1242   if (nsize >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
1243   {
1244     GNUNET_break (0);
1245     return;
1246   }
1247   for (ai = h->action_head; NULL != ai; ai = ai->next)
1248   {
1249     if (! ( (0 == strcmp (ai->subsystem, h->subsystem)) &&
1250             (0 == strcmp (ai->name, name)) &&
1251             ( (ACTION_UPDATE == ai->type) ||
1252               (ACTION_SET == ai->type) ) ) )
1253       continue;
1254     if (ACTION_SET == ai->type)
1255     {
1256       if (ACTION_UPDATE == type)
1257       {
1258         delta = (int64_t) value;
1259         if (delta > 0)
1260         {
1261           /* update old set by new delta */
1262           ai->value += delta;
1263         }
1264         else
1265         {
1266           /* update old set by new delta, but never go negative */
1267           if (ai->value < -delta)
1268             ai->value = 0;
1269           else
1270             ai->value += delta;
1271         }
1272       }
1273       else
1274       {
1275         /* new set overrides old set */
1276         ai->value = value;
1277       }
1278     }
1279     else
1280     {
1281       if (ACTION_UPDATE == type)
1282       {
1283         /* make delta cummulative */
1284         delta = (int64_t) value;
1285         ai->value += delta;
1286       }
1287       else
1288       {
1289         /* drop old 'update', use new 'set' instead */
1290         ai->value = value;
1291         ai->type = type;
1292       }
1293     }
1294     ai->timeout = GNUNET_TIME_relative_to_absolute (SET_TRANSMIT_TIMEOUT);
1295     ai->make_persistent = make_persistent;
1296     return;
1297   }
1298   /* no existing entry matches, create a fresh one */
1299   ai = GNUNET_new (struct GNUNET_STATISTICS_GetHandle);
1300   ai->sh = h;
1301   ai->subsystem = GNUNET_strdup (h->subsystem);
1302   ai->name = GNUNET_strdup (name);
1303   ai->timeout = GNUNET_TIME_relative_to_absolute (SET_TRANSMIT_TIMEOUT);
1304   ai->make_persistent = make_persistent;
1305   ai->msize = nsize;
1306   ai->value = value;
1307   ai->type = type;
1308   GNUNET_CONTAINER_DLL_insert_tail (h->action_head,
1309                                     h->action_tail,
1310                                     ai);
1311   schedule_action (h);
1312 }
1313
1314
1315 /**
1316  * Set statistic value for the peer.  Will always use our
1317  * subsystem (the argument used when "handle" was created).
1318  *
1319  * @param handle identification of the statistics service
1320  * @param name name of the statistic value
1321  * @param value new value to set
1322  * @param make_persistent should the value be kept across restarts?
1323  */
1324 void
1325 GNUNET_STATISTICS_set (struct GNUNET_STATISTICS_Handle *handle,
1326                        const char *name,
1327                        uint64_t value,
1328                        int make_persistent)
1329 {
1330   if (NULL == handle)
1331     return;
1332   GNUNET_assert (GNUNET_NO == handle->do_destroy);
1333   add_setter_action (handle,
1334                      name,
1335                      make_persistent,
1336                      value,
1337                      ACTION_SET);
1338 }
1339
1340
1341 /**
1342  * Set statistic value for the peer.  Will always use our
1343  * subsystem (the argument used when "handle" was created).
1344  *
1345  * @param handle identification of the statistics service
1346  * @param name name of the statistic value
1347  * @param delta change in value (added to existing value)
1348  * @param make_persistent should the value be kept across restarts?
1349  */
1350 void
1351 GNUNET_STATISTICS_update (struct GNUNET_STATISTICS_Handle *handle,
1352                           const char *name,
1353                           int64_t delta,
1354                           int make_persistent)
1355 {
1356   if (NULL == handle)
1357     return;
1358   if (0 == delta)
1359     return;
1360   GNUNET_assert (GNUNET_NO == handle->do_destroy);
1361   add_setter_action (handle,
1362                      name,
1363                      make_persistent,
1364                      (uint64_t) delta,
1365                      ACTION_UPDATE);
1366 }
1367
1368
1369 /* end of statistics_api.c */