37aa990176aab5d28381c1ac549c2f813d34e877
[oweals/gnunet.git] / src / statistics / statistics_api.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2009, 2010, 2011, 2016 GNUnet e.V.
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18      Boston, MA 02110-1301, USA.
19 */
20
21 /**
22  * @file statistics/statistics_api.c
23  * @brief API of the statistics service
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include "gnunet_util_lib.h"
28 #include "gnunet_constants.h"
29 #include "gnunet_protocols.h"
30 #include "gnunet_statistics_service.h"
31 #include "statistics.h"
32
33 /**
34  * How long do we wait until a statistics request for setting
35  * a value times out?  (The update will be lost if the
36  * service does not react within this timeframe).
37  */
38 #define SET_TRANSMIT_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 2)
39
40 #define LOG(kind,...) GNUNET_log_from (kind, "statistics-api",__VA_ARGS__)
41
42 /**
43  * Types of actions.
44  */
45 enum ActionType
46 {
47   /**
48    * Get a value.
49    */
50   ACTION_GET,
51
52   /**
53    * Set a value.
54    */
55   ACTION_SET,
56
57   /**
58    * Update a value.
59    */
60   ACTION_UPDATE,
61
62   /**
63    * Watch a value.
64    */
65   ACTION_WATCH
66 };
67
68
69 /**
70  * Entry kept for each value we are watching.
71  */
72 struct GNUNET_STATISTICS_WatchEntry
73 {
74
75   /**
76    * What subsystem is this action about? (never NULL)
77    */
78   char *subsystem;
79
80   /**
81    * What value is this action about? (never NULL)
82    */
83   char *name;
84
85   /**
86    * Function to call
87    */
88   GNUNET_STATISTICS_Iterator proc;
89
90   /**
91    * Closure for @e proc
92    */
93   void *proc_cls;
94
95 };
96
97
98 /**
99  * Linked list of things we still need to do.
100  */
101 struct GNUNET_STATISTICS_GetHandle
102 {
103
104   /**
105    * This is a doubly linked list.
106    */
107   struct GNUNET_STATISTICS_GetHandle *next;
108
109   /**
110    * This is a doubly linked list.
111    */
112   struct GNUNET_STATISTICS_GetHandle *prev;
113
114   /**
115    * Main statistics handle.
116    */
117   struct GNUNET_STATISTICS_Handle *sh;
118
119   /**
120    * What subsystem is this action about? (can be NULL)
121    */
122   char *subsystem;
123
124   /**
125    * What value is this action about? (can be NULL)
126    */
127   char *name;
128
129   /**
130    * Continuation to call once action is complete.
131    */
132   GNUNET_STATISTICS_Callback cont;
133
134   /**
135    * Function to call (for GET actions only).
136    */
137   GNUNET_STATISTICS_Iterator proc;
138
139   /**
140    * Closure for @e proc and @e cont.
141    */
142   void *cls;
143
144   /**
145    * Timeout for this action.
146    */
147   struct GNUNET_TIME_Absolute timeout;
148
149   /**
150    * Associated value.
151    */
152   uint64_t value;
153
154   /**
155    * Flag for SET/UPDATE actions.
156    */
157   int make_persistent;
158
159   /**
160    * Has the current iteration been aborted; for GET actions.
161    */
162   int aborted;
163
164   /**
165    * Is this a #ACTION_GET, #ACTION_SET, #ACTION_UPDATE or #ACTION_WATCH?
166    */
167   enum ActionType type;
168
169   /**
170    * Size of the message that we will be transmitting.
171    */
172   uint16_t msize;
173
174 };
175
176
177 /**
178  * Handle for the service.
179  */
180 struct GNUNET_STATISTICS_Handle
181 {
182   /**
183    * Name of our subsystem.
184    */
185   char *subsystem;
186
187   /**
188    * Configuration to use.
189    */
190   const struct GNUNET_CONFIGURATION_Handle *cfg;
191
192   /**
193    * Message queue to the service.
194    */
195   struct GNUNET_MQ_Handle *mq;
196
197   /**
198    * Head of the linked list of pending actions (first action
199    * to be performed).
200    */
201   struct GNUNET_STATISTICS_GetHandle *action_head;
202
203   /**
204    * Tail of the linked list of actions (for fast append).
205    */
206   struct GNUNET_STATISTICS_GetHandle *action_tail;
207
208   /**
209    * Action we are currently busy with (action request has been
210    * transmitted, we're now receiving the response from the
211    * service).
212    */
213   struct GNUNET_STATISTICS_GetHandle *current;
214
215   /**
216    * Array of watch entries.
217    */
218   struct GNUNET_STATISTICS_WatchEntry **watches;
219
220   /**
221    * Task doing exponential back-off trying to reconnect.
222    */
223   struct GNUNET_SCHEDULER_Task *backoff_task;
224
225   /**
226    * Time for next connect retry.
227    */
228   struct GNUNET_TIME_Relative backoff;
229
230   /**
231    * Maximum heap size observed so far (if available).
232    */
233   uint64_t peak_heap_size;
234
235   /**
236    * Maximum resident set side observed so far (if available).
237    */
238   uint64_t peak_rss;
239
240   /**
241    * Size of the @e watches array.
242    */
243   unsigned int watches_size;
244
245   /**
246    * Should this handle auto-destruct once all actions have
247    * been processed?
248    */
249   int do_destroy;
250
251   /**
252    * Are we currently receiving from the service?
253    */
254   int receiving;
255
256 };
257
258
259 /**
260  * Obtain statistics about this process's memory consumption and
261  * report those as well (if they changed).
262  */
263 static void
264 update_memory_statistics (struct GNUNET_STATISTICS_Handle *h)
265 {
266 #if ENABLE_HEAP_STATISTICS
267   uint64_t current_heap_size = 0;
268   uint64_t current_rss = 0;
269
270   if (GNUNET_NO != h->do_destroy)
271     return;
272 #if HAVE_MALLINFO
273   {
274     struct mallinfo mi;
275
276     mi = mallinfo();
277     current_heap_size = mi.uordblks + mi.fordblks;
278   }
279 #endif
280 #if HAVE_GETRUSAGE
281   {
282     struct rusage ru;
283
284     if (0 == getrusage (RUSAGE_SELF, &ru))
285     {
286       current_rss = 1024LL * ru.ru_maxrss;
287     }
288   }
289 #endif
290   if (current_heap_size > h->peak_heap_size)
291   {
292     h->peak_heap_size = current_heap_size;
293     GNUNET_STATISTICS_set (h, "# peak heap size", current_heap_size, GNUNET_NO);
294   }
295   if (current_rss > h->peak_rss)
296   {
297     h->peak_rss = current_rss;
298     GNUNET_STATISTICS_set (h, "# peak resident set size", current_rss, GNUNET_NO);
299   }
300 #endif
301 }
302
303
304 /**
305  * Schedule the next action to be performed.
306  *
307  * @param h statistics handle to reconnect
308  */
309 static void
310 schedule_action (struct GNUNET_STATISTICS_Handle *h);
311
312
313 /**
314  * Reconnect at a later time, respecting back-off.
315  *
316  * @param h statistics handle
317  */
318 static void
319 reconnect_later (struct GNUNET_STATISTICS_Handle *h);
320
321
322 /**
323  * Transmit request to service that we want to watch
324  * the development of a particular value.
325  *
326  * @param h statistics handle
327  * @param watch watch entry of the value to watch
328  */
329 static void
330 schedule_watch_request (struct GNUNET_STATISTICS_Handle *h,
331                         struct GNUNET_STATISTICS_WatchEntry *watch)
332 {
333   struct GNUNET_STATISTICS_GetHandle *ai;
334   size_t slen;
335   size_t nlen;
336   size_t nsize;
337
338   slen = strlen (watch->subsystem) + 1;
339   nlen = strlen (watch->name) + 1;
340   nsize = sizeof (struct GNUNET_MessageHeader) + slen + nlen;
341   if (nsize >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
342   {
343     GNUNET_break (0);
344     return;
345   }
346   ai = GNUNET_new (struct GNUNET_STATISTICS_GetHandle);
347   ai->sh = h;
348   ai->subsystem = GNUNET_strdup (watch->subsystem);
349   ai->name = GNUNET_strdup (watch->name);
350   ai->timeout = GNUNET_TIME_UNIT_FOREVER_ABS;
351   ai->msize = nsize;
352   ai->type = ACTION_WATCH;
353   ai->proc = watch->proc;
354   ai->cls = watch->proc_cls;
355   GNUNET_CONTAINER_DLL_insert_tail (h->action_head,
356                                     h->action_tail,
357                                     ai);
358   schedule_action (h);
359 }
360
361
362 /**
363  * Free memory associated with the given action item.
364  *
365  * @param gh action item to free
366  */
367 static void
368 free_action_item (struct GNUNET_STATISTICS_GetHandle *gh)
369 {
370   GNUNET_free_non_null (gh->subsystem);
371   GNUNET_free_non_null (gh->name);
372   GNUNET_free (gh);
373 }
374
375
376 /**
377  * Disconnect from the statistics service.
378  *
379  * @param h statistics handle to disconnect from
380  */
381 static void
382 do_disconnect (struct GNUNET_STATISTICS_Handle *h)
383 {
384   struct GNUNET_STATISTICS_GetHandle *c;
385
386   h->receiving = GNUNET_NO;
387   if (NULL != (c = h->current))
388   {
389     h->current = NULL;
390     if ( (NULL != c->cont) &&
391          (GNUNET_YES != c->aborted) )
392     {
393       c->cont (c->cls,
394                GNUNET_SYSERR);
395       c->cont = NULL;
396     }
397     free_action_item (c);
398   }
399   if (NULL != h->mq)
400   {
401     GNUNET_MQ_destroy (h->mq);
402     h->mq = NULL;
403   }
404 }
405
406
407 /**
408  * Process a #GNUNET_MESSAGE_TYPE_STATISTICS_VALUE message.
409  *
410  * @param cls statistics handle
411  * @param smsg message received from the service, never NULL
412  * @return #GNUNET_OK if the message was well-formed
413  */
414 static int
415 check_statistics_value (void *cls,
416                         const struct GNUNET_STATISTICS_ReplyMessage *smsg)
417 {
418   const char *service;
419   const char *name;
420   uint16_t size;
421
422   size = ntohs (smsg->header.size);
423   size -= sizeof (struct GNUNET_STATISTICS_ReplyMessage);
424   if (size !=
425       GNUNET_STRINGS_buffer_tokenize ((const char *) &smsg[1],
426                                       size,
427                                       2,
428                                       &service,
429                                       &name))
430   {
431     GNUNET_break (0);
432     return GNUNET_SYSERR;
433   }
434   return GNUNET_OK;
435 }
436
437
438 /**
439  * Process a #GNUNET_MESSAGE_TYPE_STATISTICS_VALUE message.
440  *
441  * @param cls statistics handle
442  * @param msg message received from the service, never NULL
443  * @return #GNUNET_OK if the message was well-formed
444  */
445 static void
446 handle_statistics_value (void *cls,
447                          const struct GNUNET_STATISTICS_ReplyMessage *smsg)
448 {
449   struct GNUNET_STATISTICS_Handle *h = cls;
450   const char *service;
451   const char *name;
452   uint16_t size;
453
454   if (h->current->aborted)
455     return;           /* iteration aborted, don't bother */
456
457   size = ntohs (smsg->header.size);
458   size -= sizeof (struct GNUNET_STATISTICS_ReplyMessage);
459   GNUNET_assert (size ==
460                  GNUNET_STRINGS_buffer_tokenize ((const char *) &smsg[1],
461                                                  size,
462                                                  2,
463                                                  &service,
464                                                  &name));
465   LOG (GNUNET_ERROR_TYPE_DEBUG,
466        "Received valid statistic on `%s:%s': %llu\n",
467        service, name,
468        GNUNET_ntohll (smsg->value));
469   if (GNUNET_OK !=
470       h->current->proc (h->current->cls,
471                         service,
472                         name,
473                         GNUNET_ntohll (smsg->value),
474                         0 !=
475                         (ntohl (smsg->uid) & GNUNET_STATISTICS_PERSIST_BIT)))
476   {
477     LOG (GNUNET_ERROR_TYPE_DEBUG,
478          "Processing of remaining statistics aborted by client.\n");
479     h->current->aborted = GNUNET_YES;
480   }
481 }
482
483
484 /**
485  * We have received a watch value from the service.  Process it.
486  *
487  * @param cls statistics handle
488  * @param msg the watch value message
489  */
490 static void
491 handle_statistics_watch_value (void *cls,
492                                const struct GNUNET_STATISTICS_WatchValueMessage *wvm)
493 {
494   struct GNUNET_STATISTICS_Handle *h = cls;
495   struct GNUNET_STATISTICS_WatchEntry *w;
496   uint32_t wid;
497
498   GNUNET_break (0 == ntohl (wvm->reserved));
499   wid = ntohl (wvm->wid);
500   if (wid >= h->watches_size)
501   {
502     do_disconnect (h);
503     reconnect_later (h);
504     return;
505   }
506   w = h->watches[wid];
507   if (NULL == w)
508     return;
509   (void) w->proc (w->proc_cls,
510                   w->subsystem,
511                   w->name,
512                   GNUNET_ntohll (wvm->value),
513                   0 != (ntohl (wvm->flags) & GNUNET_STATISTICS_PERSIST_BIT));
514 }
515
516
517 /**
518  * Generic error handler, called with the appropriate error code and
519  * the same closure specified at the creation of the message queue.
520  * Not every message queue implementation supports an error handler.
521  *
522  * @param cls closure with the `struct GNUNET_STATISTICS_Handle *`
523  * @param error error code
524  */
525 static void
526 mq_error_handler (void *cls,
527                   enum GNUNET_MQ_Error error)
528 {
529   struct GNUNET_STATISTICS_Handle *h = cls;
530
531   if (GNUNET_NO != h->do_destroy)
532   {
533     h->do_destroy = GNUNET_NO;
534     GNUNET_STATISTICS_destroy (h,
535                                GNUNET_NO);
536     return;
537   }
538   do_disconnect (h);
539   reconnect_later (h);
540 }
541
542
543 /**
544  * Task used to destroy the statistics handle.
545  *
546  * @param cls the `struct GNUNET_STATISTICS_Handle`
547  */
548 static void
549 destroy_task (void *cls)
550 {
551   struct GNUNET_STATISTICS_Handle *h = cls;
552
553   GNUNET_STATISTICS_destroy (h, GNUNET_NO);
554 }
555
556
557 /**
558  * Handle a #GNUNET_MESSAGE_TYPE_TEST (sic) message. We receive this
559  * message at the end of the shutdown when the service confirms that
560  * all data has been written to disk.
561  *
562  * @param cls our `struct GNUNET_STATISTICS_Handle *`
563  * @param msg the message
564  */
565 static void
566 handle_test (void *cls,
567              const struct GNUNET_MessageHeader *msg)
568 {
569   struct GNUNET_STATISTICS_Handle *h = cls;
570
571   if (GNUNET_SYSERR != h->do_destroy)
572   {
573     /* not in shutdown, why do we get 'TEST'? */
574     GNUNET_break (0);
575     do_disconnect (h);
576     reconnect_later (h);
577     return;
578   }
579   h->do_destroy = GNUNET_NO;
580   GNUNET_SCHEDULER_add_now (&destroy_task,
581                             h);
582 }
583
584
585 /**
586  * Handle a #GNUNET_MESSAGE_TYPE_STATISTICS_END message. We receive
587  * this message in response to a query to indicate that there are no
588  * further matching results.
589  *
590  * @param cls our `struct GNUNET_STATISTICS_Handle *`
591  * @param msg the message
592  */
593 static void
594 handle_statistics_end (void *cls,
595                        const struct GNUNET_MessageHeader *msg)
596 {
597   struct GNUNET_STATISTICS_Handle *h = cls;
598   struct GNUNET_STATISTICS_GetHandle *c;
599
600   LOG (GNUNET_ERROR_TYPE_DEBUG,
601        "Received end of statistics marker\n");
602   if (NULL == (c = h->current))
603   {
604     GNUNET_break (0);
605     do_disconnect (h);
606     reconnect_later (h);
607     return;
608   }
609   h->backoff = GNUNET_TIME_UNIT_MILLISECONDS;
610   h->current = NULL;
611   schedule_action (h);
612   if (NULL != c->cont)
613   {
614     c->cont (c->cls,
615              GNUNET_OK);
616     c->cont = NULL;
617   }
618   free_action_item (c);
619 }
620
621
622 /**
623  * Try to (re)connect to the statistics service.
624  *
625  * @param h statistics handle to reconnect
626  * @return #GNUNET_YES on success, #GNUNET_NO on failure.
627  */
628 static int
629 try_connect (struct GNUNET_STATISTICS_Handle *h)
630 {
631   GNUNET_MQ_hd_fixed_size (test,
632                            GNUNET_MESSAGE_TYPE_TEST,
633                            struct GNUNET_MessageHeader);
634   GNUNET_MQ_hd_fixed_size (statistics_end,
635                            GNUNET_MESSAGE_TYPE_STATISTICS_END,
636                            struct GNUNET_MessageHeader);
637   GNUNET_MQ_hd_var_size (statistics_value,
638                          GNUNET_MESSAGE_TYPE_STATISTICS_VALUE,
639                          struct GNUNET_STATISTICS_ReplyMessage);
640   GNUNET_MQ_hd_fixed_size (statistics_watch_value,
641                            GNUNET_MESSAGE_TYPE_STATISTICS_WATCH_VALUE,
642                            struct GNUNET_STATISTICS_WatchValueMessage);
643   struct GNUNET_MQ_MessageHandler handlers[] = {
644     make_test_handler (h),
645     make_statistics_end_handler (h),
646     make_statistics_value_handler (h),
647     make_statistics_watch_value_handler (h),
648     GNUNET_MQ_handler_end ()
649   };
650   struct GNUNET_STATISTICS_GetHandle *gh;
651   struct GNUNET_STATISTICS_GetHandle *gn;
652
653   if (NULL != h->backoff_task)
654     return GNUNET_NO;
655   if (NULL != h->mq)
656     return GNUNET_YES;
657   h->mq = GNUNET_CLIENT_connecT (h->cfg,
658                                  "statistics",
659                                  handlers,
660                                  &mq_error_handler,
661                                  h);
662   if (NULL == h->mq)
663   {
664     LOG (GNUNET_ERROR_TYPE_DEBUG,
665          "Failed to connect to statistics service!\n");
666     return GNUNET_NO;
667   }
668   gn = h->action_head;
669   while (NULL != (gh = gn))
670   {
671     gn = gh->next;
672     if (gh->type == ACTION_WATCH)
673     {
674       GNUNET_CONTAINER_DLL_remove (h->action_head,
675                                    h->action_tail,
676                                    gh);
677       free_action_item (gh);
678     }
679   }
680   for (unsigned int i = 0; i < h->watches_size; i++)
681     if (NULL != h->watches[i])
682       schedule_watch_request (h,
683                               h->watches[i]);
684   return GNUNET_YES;
685 }
686
687
688 /**
689  * We've waited long enough, reconnect now.
690  *
691  * @param cls the `struct GNUNET_STATISTICS_Handle` to reconnect
692  */
693 static void
694 reconnect_task (void *cls)
695 {
696   struct GNUNET_STATISTICS_Handle *h = cls;
697
698   h->backoff_task = NULL;
699   schedule_action (h);
700 }
701
702
703 /**
704  * Task used by #reconnect_later() to shutdown the handle
705  *
706  * @param cls the statistics handle
707  */
708 static void
709 do_destroy (void *cls)
710 {
711   struct GNUNET_STATISTICS_Handle *h = cls;
712
713   GNUNET_STATISTICS_destroy (h,
714                              GNUNET_NO);
715 }
716
717
718 /**
719  * Reconnect at a later time, respecting back-off.
720  *
721  * @param h statistics handle
722  */
723 static void
724 reconnect_later (struct GNUNET_STATISTICS_Handle *h)
725 {
726   int loss;
727   struct GNUNET_STATISTICS_GetHandle *gh;
728
729   GNUNET_assert (NULL == h->backoff_task);
730   if (GNUNET_YES == h->do_destroy)
731   {
732     /* So we are shutting down and the service is not reachable.
733      * Chances are that it's down for good and we are not going to connect to
734      * it anymore.
735      * Give up and don't sync the rest of the data.
736      */
737     loss = GNUNET_NO;
738     for (gh = h->action_head; NULL != gh; gh = gh->next)
739       if ( (gh->make_persistent) && (ACTION_SET == gh->type) )
740         loss = GNUNET_YES;
741     if (GNUNET_YES == loss)
742       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
743                   _("Could not save some persistent statistics\n"));
744     h->do_destroy = GNUNET_NO;
745     GNUNET_SCHEDULER_add_now (&do_destroy,
746                               h);
747     return;
748   }
749   h->backoff_task
750     = GNUNET_SCHEDULER_add_delayed (h->backoff,
751                                     &reconnect_task,
752                                     h);
753   h->backoff = GNUNET_TIME_STD_BACKOFF (h->backoff);
754 }
755
756
757
758 /**
759  * Transmit a GET request (and if successful, start to receive
760  * the response).
761  *
762  * @param handle statistics handle
763  */
764 static void
765 transmit_get (struct GNUNET_STATISTICS_Handle *handle)
766 {
767   struct GNUNET_STATISTICS_GetHandle *c;
768   struct GNUNET_MessageHeader *hdr;
769   struct GNUNET_MQ_Envelope *env;
770   size_t slen1;
771   size_t slen2;
772
773   GNUNET_assert (NULL != (c = handle->current));
774   slen1 = strlen (c->subsystem) + 1;
775   slen2 = strlen (c->name) + 1;
776   env = GNUNET_MQ_msg_extra (hdr,
777                              slen1 + slen2,
778                              GNUNET_MESSAGE_TYPE_STATISTICS_GET);
779   GNUNET_assert (slen1 + slen2 ==
780                  GNUNET_STRINGS_buffer_fill ((char *) &hdr[1],
781                                              slen1 + slen2,
782                                              2,
783                                              c->subsystem,
784                                              c->name));
785   GNUNET_MQ_send (handle->mq,
786                   env);
787 }
788
789
790 /**
791  * Transmit a WATCH request (and if successful, start to receive
792  * the response).
793  *
794  * @param handle statistics handle
795  */
796 static void
797 transmit_watch (struct GNUNET_STATISTICS_Handle *handle)
798 {
799   struct GNUNET_MessageHeader *hdr;
800   struct GNUNET_MQ_Envelope *env;
801   size_t slen1;
802   size_t slen2;
803
804   LOG (GNUNET_ERROR_TYPE_DEBUG,
805        "Transmitting watch request for `%s'\n",
806        handle->current->name);
807   slen1 = strlen (handle->current->subsystem) + 1;
808   slen2 = strlen (handle->current->name) + 1;
809   env = GNUNET_MQ_msg_extra (hdr,
810                              slen1 + slen2,
811                              GNUNET_MESSAGE_TYPE_STATISTICS_WATCH);
812   GNUNET_assert (slen1 + slen2 ==
813                  GNUNET_STRINGS_buffer_fill ((char *) &hdr[1],
814                                              slen1 + slen2,
815                                              2,
816                                              handle->current->subsystem,
817                                              handle->current->name));
818   GNUNET_MQ_send (handle->mq,
819                   env);
820   GNUNET_assert (NULL == handle->current->cont);
821   free_action_item (handle->current);
822   handle->current = NULL;
823 }
824
825
826 /**
827  * Transmit a SET/UPDATE request.
828  *
829  * @param handle statistics handle
830  */
831 static void
832 transmit_set (struct GNUNET_STATISTICS_Handle *handle)
833 {
834   struct GNUNET_STATISTICS_SetMessage *r;
835   struct GNUNET_MQ_Envelope *env;
836   size_t slen;
837   size_t nlen;
838
839   slen = strlen (handle->current->subsystem) + 1;
840   nlen = strlen (handle->current->name) + 1;
841   env = GNUNET_MQ_msg_extra (r,
842                              slen + nlen,
843                              GNUNET_MESSAGE_TYPE_STATISTICS_SET);
844   r->flags = 0;
845   r->value = GNUNET_htonll (handle->current->value);
846   if (handle->current->make_persistent)
847     r->flags |= htonl (GNUNET_STATISTICS_SETFLAG_PERSISTENT);
848   if (handle->current->type == ACTION_UPDATE)
849     r->flags |= htonl (GNUNET_STATISTICS_SETFLAG_RELATIVE);
850   GNUNET_assert (slen + nlen ==
851                  GNUNET_STRINGS_buffer_fill ((char *) &r[1],
852                                              slen + nlen,
853                                              2,
854                                              handle->current->subsystem,
855                                              handle->current->name));
856   GNUNET_assert (NULL == handle->current->cont);
857   free_action_item (handle->current);
858   handle->current = NULL;
859   update_memory_statistics (handle);
860   GNUNET_MQ_send (handle->mq,
861                   env);
862 }
863
864
865 /**
866  * Get handle for the statistics service.
867  *
868  * @param subsystem name of subsystem using the service
869  * @param cfg services configuration in use
870  * @return handle to use
871  */
872 struct GNUNET_STATISTICS_Handle *
873 GNUNET_STATISTICS_create (const char *subsystem,
874                           const struct GNUNET_CONFIGURATION_Handle *cfg)
875 {
876   struct GNUNET_STATISTICS_Handle *ret;
877
878   if (GNUNET_YES ==
879       GNUNET_CONFIGURATION_get_value_yesno (cfg,
880                                             "statistics",
881                                             "DISABLE"))
882     return NULL;
883   ret = GNUNET_new (struct GNUNET_STATISTICS_Handle);
884   ret->cfg = cfg;
885   ret->subsystem = GNUNET_strdup (subsystem);
886   ret->backoff = GNUNET_TIME_UNIT_MILLISECONDS;
887   return ret;
888 }
889
890
891 /**
892  * Destroy a handle (free all state associated with
893  * it).
894  *
895  * @param h statistics handle to destroy
896  * @param sync_first set to #GNUNET_YES if pending SET requests should
897  *        be completed
898  */
899 void
900 GNUNET_STATISTICS_destroy (struct GNUNET_STATISTICS_Handle *h,
901                            int sync_first)
902 {
903   struct GNUNET_STATISTICS_GetHandle *pos;
904   struct GNUNET_STATISTICS_GetHandle *next;
905
906   if (NULL == h)
907     return;
908   GNUNET_assert (GNUNET_NO == h->do_destroy); // Don't call twice.
909   if (NULL != h->backoff_task)
910   {
911     GNUNET_SCHEDULER_cancel (h->backoff_task);
912     h->backoff_task = NULL;
913   }
914   if ( (sync_first) &&
915        (GNUNET_YES == try_connect (h)) )
916   {
917     if ( (NULL != h->current) &&
918          (ACTION_GET == h->current->type) )
919       h->current->aborted = GNUNET_YES;
920     next = h->action_head;
921     while (NULL != (pos = next))
922     {
923       next = pos->next;
924       if ( (ACTION_GET == pos->type) ||
925            (ACTION_WATCH == pos->type) ||
926            (GNUNET_NO == pos->make_persistent) )
927       {
928         GNUNET_CONTAINER_DLL_remove (h->action_head,
929                                      h->action_tail,
930                                      pos);
931         free_action_item (pos);
932       }
933     }
934     h->do_destroy = GNUNET_YES;
935     schedule_action (h);
936     return; /* do not finish destruction just yet */
937   }
938   /* do clean up all */
939   while (NULL != (pos = h->action_head))
940   {
941     GNUNET_CONTAINER_DLL_remove (h->action_head,
942                                  h->action_tail,
943                                  pos);
944     free_action_item (pos);
945   }
946   do_disconnect (h);
947   for (unsigned int i = 0; i < h->watches_size; i++)
948   {
949     if (NULL == h->watches[i])
950       continue;
951     GNUNET_free (h->watches[i]->subsystem);
952     GNUNET_free (h->watches[i]->name);
953     GNUNET_free (h->watches[i]);
954   }
955   GNUNET_array_grow (h->watches,
956                      h->watches_size,
957                      0);
958   GNUNET_free (h->subsystem);
959   GNUNET_free (h);
960 }
961
962
963 /**
964  * Schedule the next action to be performed.
965  *
966  * @param h statistics handle
967  */
968 static void
969 schedule_action (struct GNUNET_STATISTICS_Handle *h)
970 {
971   if (NULL != h->backoff_task)
972     return;                     /* action already pending */
973   if (GNUNET_YES != try_connect (h))
974   {
975     reconnect_later (h);
976     return;
977   }
978   /* schedule next action */
979   while (NULL == h->current)
980   {
981     h->current = h->action_head;
982     if (NULL == h->current)
983     {
984       struct GNUNET_MessageHeader *hdr;
985       struct GNUNET_MQ_Envelope *env;
986
987       if (GNUNET_YES != h->do_destroy)
988         return; /* nothing to do */
989       /* let service know that we're done */
990       h->do_destroy = GNUNET_SYSERR; /* in 'TEST' mode */
991       env = GNUNET_MQ_msg (hdr,
992                            GNUNET_MESSAGE_TYPE_TEST);
993       GNUNET_MQ_send (h->mq,
994                       env);
995       return;
996     }
997     GNUNET_CONTAINER_DLL_remove (h->action_head,
998                                  h->action_tail,
999                                  h->current);
1000     switch (h->current->type)
1001     {
1002     case ACTION_GET:
1003       transmit_get (h);
1004       break;
1005     case ACTION_SET:
1006     case ACTION_UPDATE:
1007       transmit_set (h);
1008       break;
1009     case ACTION_WATCH:
1010       transmit_watch (h);
1011       break;
1012     default:
1013       GNUNET_assert (0);
1014       break;
1015     }
1016   }
1017 }
1018
1019
1020 /**
1021  * Get statistic from the peer.
1022  *
1023  * @param handle identification of the statistics service
1024  * @param subsystem limit to the specified subsystem, NULL for our subsystem
1025  * @param name name of the statistic value, NULL for all values
1026  * @param cont continuation to call when done (can be NULL)
1027  *        This callback CANNOT destroy the statistics handle in the same call.
1028  * @param proc function to call on each value
1029  * @param cls closure for @a cont and @a proc
1030  * @return NULL on error
1031  */
1032 struct GNUNET_STATISTICS_GetHandle *
1033 GNUNET_STATISTICS_get (struct GNUNET_STATISTICS_Handle *handle,
1034                        const char *subsystem,
1035                        const char *name,
1036                        GNUNET_STATISTICS_Callback cont,
1037                        GNUNET_STATISTICS_Iterator proc,
1038                        void *cls)
1039 {
1040   size_t slen1;
1041   size_t slen2;
1042   struct GNUNET_STATISTICS_GetHandle *ai;
1043
1044   if (NULL == handle)
1045     return NULL;
1046   GNUNET_assert (NULL != proc);
1047   GNUNET_assert (GNUNET_NO == handle->do_destroy);
1048   if (NULL == subsystem)
1049     subsystem = "";
1050   if (NULL == name)
1051     name = "";
1052   slen1 = strlen (subsystem) + 1;
1053   slen2 = strlen (name) + 1;
1054   GNUNET_assert (slen1 + slen2 + sizeof (struct GNUNET_MessageHeader) <
1055                  GNUNET_SERVER_MAX_MESSAGE_SIZE);
1056   ai = GNUNET_new (struct GNUNET_STATISTICS_GetHandle);
1057   ai->sh = handle;
1058   ai->subsystem = GNUNET_strdup (subsystem);
1059   ai->name = GNUNET_strdup (name);
1060   ai->cont = cont;
1061   ai->proc = proc;
1062   ai->cls = cls;
1063   ai->type = ACTION_GET;
1064   ai->msize = slen1 + slen2 + sizeof (struct GNUNET_MessageHeader);
1065   GNUNET_CONTAINER_DLL_insert_tail (handle->action_head,
1066                                     handle->action_tail,
1067                                     ai);
1068   schedule_action (handle);
1069   return ai;
1070 }
1071
1072
1073 /**
1074  * Cancel a 'get' request.  Must be called before the 'cont'
1075  * function is called.
1076  *
1077  * @param gh handle of the request to cancel
1078  */
1079 void
1080 GNUNET_STATISTICS_get_cancel (struct GNUNET_STATISTICS_GetHandle *gh)
1081 {
1082   if (NULL == gh)
1083     return;
1084   gh->cont = NULL;
1085   if (gh->sh->current == gh)
1086   {
1087     gh->aborted = GNUNET_YES;
1088     return;
1089   }
1090   GNUNET_CONTAINER_DLL_remove (gh->sh->action_head,
1091                                gh->sh->action_tail,
1092                                gh);
1093   GNUNET_free (gh->name);
1094   GNUNET_free (gh->subsystem);
1095   GNUNET_free (gh);
1096 }
1097
1098
1099 /**
1100  * Watch statistics from the peer (be notified whenever they change).
1101  *
1102  * @param handle identification of the statistics service
1103  * @param subsystem limit to the specified subsystem, never NULL
1104  * @param name name of the statistic value, never NULL
1105  * @param proc function to call on each value
1106  * @param proc_cls closure for @a proc
1107  * @return #GNUNET_OK on success, #GNUNET_SYSERR on error
1108  */
1109 int
1110 GNUNET_STATISTICS_watch (struct GNUNET_STATISTICS_Handle *handle,
1111                          const char *subsystem,
1112                          const char *name,
1113                          GNUNET_STATISTICS_Iterator proc,
1114                          void *proc_cls)
1115 {
1116   struct GNUNET_STATISTICS_WatchEntry *w;
1117
1118   if (NULL == handle)
1119     return GNUNET_SYSERR;
1120   w = GNUNET_new (struct GNUNET_STATISTICS_WatchEntry);
1121   w->subsystem = GNUNET_strdup (subsystem);
1122   w->name = GNUNET_strdup (name);
1123   w->proc = proc;
1124   w->proc_cls = proc_cls;
1125   GNUNET_array_append (handle->watches,
1126                        handle->watches_size,
1127                        w);
1128   schedule_watch_request (handle,
1129                           w);
1130   return GNUNET_OK;
1131 }
1132
1133
1134 /**
1135  * Stop watching statistics from the peer.
1136  *
1137  * @param handle identification of the statistics service
1138  * @param subsystem limit to the specified subsystem, never NULL
1139  * @param name name of the statistic value, never NULL
1140  * @param proc function to call on each value
1141  * @param proc_cls closure for @a proc
1142  * @return #GNUNET_OK on success, #GNUNET_SYSERR on error (no such watch)
1143  */
1144 int
1145 GNUNET_STATISTICS_watch_cancel (struct GNUNET_STATISTICS_Handle *handle,
1146                                 const char *subsystem,
1147                                 const char *name,
1148                                 GNUNET_STATISTICS_Iterator proc,
1149                                 void *proc_cls)
1150 {
1151   struct GNUNET_STATISTICS_WatchEntry *w;
1152
1153   if (NULL == handle)
1154     return GNUNET_SYSERR;
1155   for (unsigned int i=0;i<handle->watches_size;i++)
1156   {
1157     w = handle->watches[i];
1158     if (NULL == w)
1159       continue;
1160     if ( (w->proc == proc) &&
1161          (w->proc_cls == proc_cls) &&
1162          (0 == strcmp (w->name, name)) &&
1163          (0 == strcmp (w->subsystem, subsystem)) )
1164     {
1165       GNUNET_free (w->name);
1166       GNUNET_free (w->subsystem);
1167       GNUNET_free (w);
1168       handle->watches[i] = NULL;
1169       return GNUNET_OK;
1170     }
1171   }
1172   return GNUNET_SYSERR;
1173 }
1174
1175
1176 /**
1177  * Queue a request to change a statistic.
1178  *
1179  * @param h statistics handle
1180  * @param name name of the value
1181  * @param make_persistent  should the value be kept across restarts?
1182  * @param value new value or change
1183  * @param type type of the action (#ACTION_SET or #ACTION_UPDATE)
1184  */
1185 static void
1186 add_setter_action (struct GNUNET_STATISTICS_Handle *h,
1187                    const char *name,
1188                    int make_persistent,
1189                    uint64_t value,
1190                    enum ActionType type)
1191 {
1192   struct GNUNET_STATISTICS_GetHandle *ai;
1193   size_t slen;
1194   size_t nlen;
1195   size_t nsize;
1196   int64_t delta;
1197
1198   slen = strlen (h->subsystem) + 1;
1199   nlen = strlen (name) + 1;
1200   nsize = sizeof (struct GNUNET_STATISTICS_SetMessage) + slen + nlen;
1201   if (nsize >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
1202   {
1203     GNUNET_break (0);
1204     return;
1205   }
1206   for (ai = h->action_head; NULL != ai; ai = ai->next)
1207   {
1208     if (! ( (0 == strcmp (ai->subsystem, h->subsystem)) &&
1209             (0 == strcmp (ai->name, name)) &&
1210             ( (ACTION_UPDATE == ai->type) ||
1211               (ACTION_SET == ai->type) ) ) )
1212       continue;
1213     if (ACTION_SET == ai->type)
1214     {
1215       if (ACTION_UPDATE == type)
1216       {
1217         delta = (int64_t) value;
1218         if (delta > 0)
1219         {
1220           /* update old set by new delta */
1221           ai->value += delta;
1222         }
1223         else
1224         {
1225           /* update old set by new delta, but never go negative */
1226           if (ai->value < -delta)
1227             ai->value = 0;
1228           else
1229             ai->value += delta;
1230         }
1231       }
1232       else
1233       {
1234         /* new set overrides old set */
1235         ai->value = value;
1236       }
1237     }
1238     else
1239     {
1240       if (ACTION_UPDATE == type)
1241       {
1242         /* make delta cummulative */
1243         delta = (int64_t) value;
1244         ai->value += delta;
1245       }
1246       else
1247       {
1248         /* drop old 'update', use new 'set' instead */
1249         ai->value = value;
1250         ai->type = type;
1251       }
1252     }
1253     ai->timeout = GNUNET_TIME_relative_to_absolute (SET_TRANSMIT_TIMEOUT);
1254     ai->make_persistent = make_persistent;
1255     return;
1256   }
1257   /* no existing entry matches, create a fresh one */
1258   ai = GNUNET_new (struct GNUNET_STATISTICS_GetHandle);
1259   ai->sh = h;
1260   ai->subsystem = GNUNET_strdup (h->subsystem);
1261   ai->name = GNUNET_strdup (name);
1262   ai->timeout = GNUNET_TIME_relative_to_absolute (SET_TRANSMIT_TIMEOUT);
1263   ai->make_persistent = make_persistent;
1264   ai->msize = nsize;
1265   ai->value = value;
1266   ai->type = type;
1267   GNUNET_CONTAINER_DLL_insert_tail (h->action_head,
1268                                     h->action_tail,
1269                                     ai);
1270   schedule_action (h);
1271 }
1272
1273
1274 /**
1275  * Set statistic value for the peer.  Will always use our
1276  * subsystem (the argument used when "handle" was created).
1277  *
1278  * @param handle identification of the statistics service
1279  * @param name name of the statistic value
1280  * @param value new value to set
1281  * @param make_persistent should the value be kept across restarts?
1282  */
1283 void
1284 GNUNET_STATISTICS_set (struct GNUNET_STATISTICS_Handle *handle,
1285                        const char *name,
1286                        uint64_t value,
1287                        int make_persistent)
1288 {
1289   if (NULL == handle)
1290     return;
1291   GNUNET_assert (GNUNET_NO == handle->do_destroy);
1292   add_setter_action (handle,
1293                      name,
1294                      make_persistent,
1295                      value,
1296                      ACTION_SET);
1297 }
1298
1299
1300 /**
1301  * Set statistic value for the peer.  Will always use our
1302  * subsystem (the argument used when "handle" was created).
1303  *
1304  * @param handle identification of the statistics service
1305  * @param name name of the statistic value
1306  * @param delta change in value (added to existing value)
1307  * @param make_persistent should the value be kept across restarts?
1308  */
1309 void
1310 GNUNET_STATISTICS_update (struct GNUNET_STATISTICS_Handle *handle,
1311                           const char *name,
1312                           int64_t delta,
1313                           int make_persistent)
1314 {
1315   if (NULL == handle)
1316     return;
1317   if (0 == delta)
1318     return;
1319   GNUNET_assert (GNUNET_NO == handle->do_destroy);
1320   add_setter_action (handle,
1321                      name,
1322                      make_persistent,
1323                      (uint64_t) delta,
1324                      ACTION_UPDATE);
1325 }
1326
1327
1328 /* end of statistics_api.c */