first batch of license fixes (boring)
[oweals/gnunet.git] / src / statistics / statistics_api.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2009, 2010, 2011, 2016 GNUnet e.V.
4
5      GNUnet is free software: you can redistribute it and/or modify it
6      under the terms of the GNU General Public License as published
7      by the Free Software Foundation, either version 3 of the License,
8      or (at your option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      Affero General Public License for more details.
14 */
15
16 /**
17  * @file statistics/statistics_api.c
18  * @brief API of the statistics service
19  * @author Christian Grothoff
20  */
21 #include "platform.h"
22 #include "gnunet_util_lib.h"
23 #include "gnunet_constants.h"
24 #include "gnunet_protocols.h"
25 #include "gnunet_statistics_service.h"
26 #include "statistics.h"
27
28 /**
29  * How long do we wait until a statistics request for setting
30  * a value times out?  (The update will be lost if the
31  * service does not react within this timeframe).
32  */
33 #define SET_TRANSMIT_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 2)
34
35 #define LOG(kind,...) GNUNET_log_from (kind, "statistics-api",__VA_ARGS__)
36
37 /**
38  * Types of actions.
39  */
40 enum ActionType
41 {
42   /**
43    * Get a value.
44    */
45   ACTION_GET,
46
47   /**
48    * Set a value.
49    */
50   ACTION_SET,
51
52   /**
53    * Update a value.
54    */
55   ACTION_UPDATE,
56
57   /**
58    * Watch a value.
59    */
60   ACTION_WATCH
61 };
62
63
64 /**
65  * Entry kept for each value we are watching.
66  */
67 struct GNUNET_STATISTICS_WatchEntry
68 {
69
70   /**
71    * What subsystem is this action about? (never NULL)
72    */
73   char *subsystem;
74
75   /**
76    * What value is this action about? (never NULL)
77    */
78   char *name;
79
80   /**
81    * Function to call
82    */
83   GNUNET_STATISTICS_Iterator proc;
84
85   /**
86    * Closure for @e proc
87    */
88   void *proc_cls;
89
90 };
91
92
93 /**
94  * Linked list of things we still need to do.
95  */
96 struct GNUNET_STATISTICS_GetHandle
97 {
98
99   /**
100    * This is a doubly linked list.
101    */
102   struct GNUNET_STATISTICS_GetHandle *next;
103
104   /**
105    * This is a doubly linked list.
106    */
107   struct GNUNET_STATISTICS_GetHandle *prev;
108
109   /**
110    * Main statistics handle.
111    */
112   struct GNUNET_STATISTICS_Handle *sh;
113
114   /**
115    * What subsystem is this action about? (can be NULL)
116    */
117   char *subsystem;
118
119   /**
120    * What value is this action about? (can be NULL)
121    */
122   char *name;
123
124   /**
125    * Continuation to call once action is complete.
126    */
127   GNUNET_STATISTICS_Callback cont;
128
129   /**
130    * Function to call (for GET actions only).
131    */
132   GNUNET_STATISTICS_Iterator proc;
133
134   /**
135    * Closure for @e proc and @e cont.
136    */
137   void *cls;
138
139   /**
140    * Timeout for this action.
141    */
142   struct GNUNET_TIME_Absolute timeout;
143
144   /**
145    * Associated value.
146    */
147   uint64_t value;
148
149   /**
150    * Flag for SET/UPDATE actions.
151    */
152   int make_persistent;
153
154   /**
155    * Has the current iteration been aborted; for GET actions.
156    */
157   int aborted;
158
159   /**
160    * Is this a #ACTION_GET, #ACTION_SET, #ACTION_UPDATE or #ACTION_WATCH?
161    */
162   enum ActionType type;
163
164   /**
165    * Size of the message that we will be transmitting.
166    */
167   uint16_t msize;
168
169 };
170
171
172 /**
173  * Handle for the service.
174  */
175 struct GNUNET_STATISTICS_Handle
176 {
177   /**
178    * Name of our subsystem.
179    */
180   char *subsystem;
181
182   /**
183    * Configuration to use.
184    */
185   const struct GNUNET_CONFIGURATION_Handle *cfg;
186
187   /**
188    * Message queue to the service.
189    */
190   struct GNUNET_MQ_Handle *mq;
191
192   /**
193    * Head of the linked list of pending actions (first action
194    * to be performed).
195    */
196   struct GNUNET_STATISTICS_GetHandle *action_head;
197
198   /**
199    * Tail of the linked list of actions (for fast append).
200    */
201   struct GNUNET_STATISTICS_GetHandle *action_tail;
202
203   /**
204    * Action we are currently busy with (action request has been
205    * transmitted, we're now receiving the response from the
206    * service).
207    */
208   struct GNUNET_STATISTICS_GetHandle *current;
209
210   /**
211    * Array of watch entries.
212    */
213   struct GNUNET_STATISTICS_WatchEntry **watches;
214
215   /**
216    * Task doing exponential back-off trying to reconnect.
217    */
218   struct GNUNET_SCHEDULER_Task *backoff_task;
219
220   /**
221    * Task for running #do_destroy().
222    */
223   struct GNUNET_SCHEDULER_Task *destroy_task;
224
225   /**
226    * Time for next connect retry.
227    */
228   struct GNUNET_TIME_Relative backoff;
229
230   /**
231    * Maximum heap size observed so far (if available).
232    */
233   uint64_t peak_heap_size;
234
235   /**
236    * Maximum resident set side observed so far (if available).
237    */
238   uint64_t peak_rss;
239
240   /**
241    * Size of the @e watches array.
242    */
243   unsigned int watches_size;
244
245   /**
246    * Should this handle auto-destruct once all actions have
247    * been processed?
248    */
249   int do_destroy;
250
251   /**
252    * Are we currently receiving from the service?
253    */
254   int receiving;
255
256 };
257
258
259 /**
260  * Obtain statistics about this process's memory consumption and
261  * report those as well (if they changed).
262  */
263 static void
264 update_memory_statistics (struct GNUNET_STATISTICS_Handle *h)
265 {
266 #if ENABLE_HEAP_STATISTICS
267   uint64_t current_heap_size = 0;
268   uint64_t current_rss = 0;
269
270   if (GNUNET_NO != h->do_destroy)
271     return;
272 #if HAVE_MALLINFO
273   {
274     struct mallinfo mi;
275
276     mi = mallinfo();
277     current_heap_size = mi.uordblks + mi.fordblks;
278   }
279 #endif
280 #if HAVE_GETRUSAGE
281   {
282     struct rusage ru;
283
284     if (0 == getrusage (RUSAGE_SELF, &ru))
285     {
286       current_rss = 1024LL * ru.ru_maxrss;
287     }
288   }
289 #endif
290   if (current_heap_size > h->peak_heap_size)
291   {
292     h->peak_heap_size = current_heap_size;
293     GNUNET_STATISTICS_set (h,
294                            "# peak heap size",
295                            current_heap_size,
296                            GNUNET_NO);
297   }
298   if (current_rss > h->peak_rss)
299   {
300     h->peak_rss = current_rss;
301     GNUNET_STATISTICS_set (h,
302                            "# peak resident set size",
303                            current_rss,
304                            GNUNET_NO);
305   }
306 #endif
307 }
308
309
310 /**
311  * Reconnect at a later time, respecting back-off.
312  *
313  * @param h statistics handle
314  */
315 static void
316 reconnect_later (struct GNUNET_STATISTICS_Handle *h);
317
318
319 /**
320  * Schedule the next action to be performed.
321  *
322  * @param cls statistics handle to reconnect
323  */
324 static void
325 schedule_action (void *cls);
326
327
328 /**
329  * Transmit request to service that we want to watch
330  * the development of a particular value.
331  *
332  * @param h statistics handle
333  * @param watch watch entry of the value to watch
334  */
335 static void
336 schedule_watch_request (struct GNUNET_STATISTICS_Handle *h,
337                         struct GNUNET_STATISTICS_WatchEntry *watch)
338 {
339   struct GNUNET_STATISTICS_GetHandle *ai;
340   size_t slen;
341   size_t nlen;
342   size_t nsize;
343
344   slen = strlen (watch->subsystem) + 1;
345   nlen = strlen (watch->name) + 1;
346   nsize = sizeof (struct GNUNET_MessageHeader) + slen + nlen;
347   if (nsize >= GNUNET_MAX_MESSAGE_SIZE)
348   {
349     GNUNET_break (0);
350     return;
351   }
352   ai = GNUNET_new (struct GNUNET_STATISTICS_GetHandle);
353   ai->sh = h;
354   ai->subsystem = GNUNET_strdup (watch->subsystem);
355   ai->name = GNUNET_strdup (watch->name);
356   ai->timeout = GNUNET_TIME_UNIT_FOREVER_ABS;
357   ai->msize = nsize;
358   ai->type = ACTION_WATCH;
359   ai->proc = watch->proc;
360   ai->cls = watch->proc_cls;
361   GNUNET_CONTAINER_DLL_insert_tail (h->action_head,
362                                     h->action_tail,
363                                     ai);
364   schedule_action (h);
365 }
366
367
368 /**
369  * Free memory associated with the given action item.
370  *
371  * @param gh action item to free
372  */
373 static void
374 free_action_item (struct GNUNET_STATISTICS_GetHandle *gh)
375 {
376   GNUNET_free_non_null (gh->subsystem);
377   GNUNET_free_non_null (gh->name);
378   GNUNET_free (gh);
379 }
380
381
382 /**
383  * Disconnect from the statistics service.
384  *
385  * @param h statistics handle to disconnect from
386  */
387 static void
388 do_disconnect (struct GNUNET_STATISTICS_Handle *h)
389 {
390   struct GNUNET_STATISTICS_GetHandle *c;
391
392   h->receiving = GNUNET_NO;
393   if (NULL != (c = h->current))
394   {
395     h->current = NULL;
396     if ( (NULL != c->cont) &&
397          (GNUNET_YES != c->aborted) )
398     {
399       c->cont (c->cls,
400                GNUNET_SYSERR);
401       c->cont = NULL;
402     }
403     free_action_item (c);
404   }
405   if (NULL != h->mq)
406   {
407     GNUNET_MQ_destroy (h->mq);
408     h->mq = NULL;
409   }
410 }
411
412
413 /**
414  * Process a #GNUNET_MESSAGE_TYPE_STATISTICS_VALUE message.
415  *
416  * @param cls statistics handle
417  * @param smsg message received from the service, never NULL
418  * @return #GNUNET_OK if the message was well-formed
419  */
420 static int
421 check_statistics_value (void *cls,
422                         const struct GNUNET_STATISTICS_ReplyMessage *smsg)
423 {
424   const char *service;
425   const char *name;
426   uint16_t size;
427
428   size = ntohs (smsg->header.size);
429   size -= sizeof (struct GNUNET_STATISTICS_ReplyMessage);
430   if (size !=
431       GNUNET_STRINGS_buffer_tokenize ((const char *) &smsg[1],
432                                       size,
433                                       2,
434                                       &service,
435                                       &name))
436   {
437     GNUNET_break (0);
438     return GNUNET_SYSERR;
439   }
440   return GNUNET_OK;
441 }
442
443
444 /**
445  * Process a #GNUNET_MESSAGE_TYPE_STATISTICS_VALUE message.
446  *
447  * @param cls statistics handle
448  * @param msg message received from the service, never NULL
449  * @return #GNUNET_OK if the message was well-formed
450  */
451 static void
452 handle_statistics_value (void *cls,
453                          const struct GNUNET_STATISTICS_ReplyMessage *smsg)
454 {
455   struct GNUNET_STATISTICS_Handle *h = cls;
456   const char *service;
457   const char *name;
458   uint16_t size;
459
460   if (h->current->aborted)
461     return;           /* iteration aborted, don't bother */
462
463   size = ntohs (smsg->header.size);
464   size -= sizeof (struct GNUNET_STATISTICS_ReplyMessage);
465   GNUNET_assert (size ==
466                  GNUNET_STRINGS_buffer_tokenize ((const char *) &smsg[1],
467                                                  size,
468                                                  2,
469                                                  &service,
470                                                  &name));
471   LOG (GNUNET_ERROR_TYPE_DEBUG,
472        "Received valid statistic on `%s:%s': %llu\n",
473        service, name,
474        GNUNET_ntohll (smsg->value));
475   if (GNUNET_OK !=
476       h->current->proc (h->current->cls,
477                         service,
478                         name,
479                         GNUNET_ntohll (smsg->value),
480                         0 !=
481                         (ntohl (smsg->uid) & GNUNET_STATISTICS_PERSIST_BIT)))
482   {
483     LOG (GNUNET_ERROR_TYPE_DEBUG,
484          "Processing of remaining statistics aborted by client.\n");
485     h->current->aborted = GNUNET_YES;
486   }
487 }
488
489
490 /**
491  * We have received a watch value from the service.  Process it.
492  *
493  * @param cls statistics handle
494  * @param msg the watch value message
495  */
496 static void
497 handle_statistics_watch_value (void *cls,
498                                const struct GNUNET_STATISTICS_WatchValueMessage *wvm)
499 {
500   struct GNUNET_STATISTICS_Handle *h = cls;
501   struct GNUNET_STATISTICS_WatchEntry *w;
502   uint32_t wid;
503
504   GNUNET_break (0 == ntohl (wvm->reserved));
505   wid = ntohl (wvm->wid);
506   if (wid >= h->watches_size)
507   {
508     do_disconnect (h);
509     reconnect_later (h);
510     return;
511   }
512   w = h->watches[wid];
513   if (NULL == w)
514     return;
515   (void) w->proc (w->proc_cls,
516                   w->subsystem,
517                   w->name,
518                   GNUNET_ntohll (wvm->value),
519                   0 != (ntohl (wvm->flags) & GNUNET_STATISTICS_PERSIST_BIT));
520 }
521
522
523 /**
524  * Generic error handler, called with the appropriate error code and
525  * the same closure specified at the creation of the message queue.
526  * Not every message queue implementation supports an error handler.
527  *
528  * @param cls closure with the `struct GNUNET_STATISTICS_Handle *`
529  * @param error error code
530  */
531 static void
532 mq_error_handler (void *cls,
533                   enum GNUNET_MQ_Error error)
534 {
535   struct GNUNET_STATISTICS_Handle *h = cls;
536
537   if (GNUNET_NO != h->do_destroy)
538   {
539     h->do_destroy = GNUNET_NO;
540     if (NULL != h->destroy_task)
541     {
542       GNUNET_SCHEDULER_cancel (h->destroy_task);
543       h->destroy_task = NULL;
544     }
545     GNUNET_STATISTICS_destroy (h,
546                                GNUNET_NO);
547     return;
548   }
549   do_disconnect (h);
550   reconnect_later (h);
551 }
552
553
554 /**
555  * Task used to destroy the statistics handle.
556  *
557  * @param cls the `struct GNUNET_STATISTICS_Handle`
558  */
559 static void
560 do_destroy (void *cls)
561 {
562   struct GNUNET_STATISTICS_Handle *h = cls;
563
564   h->destroy_task = NULL;
565   h->do_destroy = GNUNET_NO;
566   LOG (GNUNET_ERROR_TYPE_DEBUG,
567        "Running final destruction\n");
568   GNUNET_STATISTICS_destroy (h,
569                              GNUNET_NO);
570 }
571
572
573 /**
574  * Handle a #GNUNET_MESSAGE_TYPE_STATISTICS_DISCONNECT_CONFIRM
575  * message. We receive this message at the end of the shutdown when
576  * the service confirms that all data has been written to disk.
577  *
578  * @param cls our `struct GNUNET_STATISTICS_Handle *`
579  * @param msg the message
580  */
581 static void
582 handle_disconnect_confirm (void *cls,
583                            const struct GNUNET_MessageHeader *msg)
584 {
585   struct GNUNET_STATISTICS_Handle *h = cls;
586
587   if (GNUNET_SYSERR != h->do_destroy)
588   {
589     /* not in shutdown, why do we get 'TEST'? */
590     GNUNET_break (0);
591     do_disconnect (h);
592     reconnect_later (h);
593     return;
594   }
595   LOG (GNUNET_ERROR_TYPE_DEBUG,
596        "Received DISCONNNECT_CONFIRM message from statistics, can complete disconnect\n");
597   if (NULL != h->destroy_task)
598     GNUNET_SCHEDULER_cancel (h->destroy_task);
599   h->destroy_task = GNUNET_SCHEDULER_add_now (&do_destroy,
600                                               h);
601 }
602
603
604 /**
605  * Handle a #GNUNET_MESSAGE_TYPE_STATISTICS_END message. We receive
606  * this message in response to a query to indicate that there are no
607  * further matching results.
608  *
609  * @param cls our `struct GNUNET_STATISTICS_Handle *`
610  * @param msg the message
611  */
612 static void
613 handle_statistics_end (void *cls,
614                        const struct GNUNET_MessageHeader *msg)
615 {
616   struct GNUNET_STATISTICS_Handle *h = cls;
617   struct GNUNET_STATISTICS_GetHandle *c;
618
619   LOG (GNUNET_ERROR_TYPE_DEBUG,
620        "Received end of statistics marker\n");
621   if (NULL == (c = h->current))
622   {
623     GNUNET_break (0);
624     do_disconnect (h);
625     reconnect_later (h);
626     return;
627   }
628   h->backoff = GNUNET_TIME_UNIT_MILLISECONDS;
629   h->current = NULL;
630   schedule_action (h);
631   if (NULL != c->cont)
632   {
633     c->cont (c->cls,
634              GNUNET_OK);
635     c->cont = NULL;
636   }
637   free_action_item (c);
638 }
639
640
641 /**
642  * Try to (re)connect to the statistics service.
643  *
644  * @param h statistics handle to reconnect
645  * @return #GNUNET_YES on success, #GNUNET_NO on failure.
646  */
647 static int
648 try_connect (struct GNUNET_STATISTICS_Handle *h)
649 {
650   struct GNUNET_MQ_MessageHandler handlers[] = {
651     GNUNET_MQ_hd_fixed_size (disconnect_confirm,
652                              GNUNET_MESSAGE_TYPE_STATISTICS_DISCONNECT_CONFIRM,
653                              struct GNUNET_MessageHeader,
654                              h),
655     GNUNET_MQ_hd_fixed_size (statistics_end,
656                              GNUNET_MESSAGE_TYPE_STATISTICS_END,
657                              struct GNUNET_MessageHeader,
658                              h),
659     GNUNET_MQ_hd_var_size (statistics_value,
660                            GNUNET_MESSAGE_TYPE_STATISTICS_VALUE,
661                            struct GNUNET_STATISTICS_ReplyMessage,
662                            h),
663     GNUNET_MQ_hd_fixed_size (statistics_watch_value,
664                              GNUNET_MESSAGE_TYPE_STATISTICS_WATCH_VALUE,
665                              struct GNUNET_STATISTICS_WatchValueMessage,
666                              h),
667     GNUNET_MQ_handler_end ()
668   };
669   struct GNUNET_STATISTICS_GetHandle *gh;
670   struct GNUNET_STATISTICS_GetHandle *gn;
671
672   if (NULL != h->backoff_task)
673     return GNUNET_NO;
674   if (NULL != h->mq)
675     return GNUNET_YES;
676   h->mq = GNUNET_CLIENT_connect (h->cfg,
677                                  "statistics",
678                                  handlers,
679                                  &mq_error_handler,
680                                  h);
681   if (NULL == h->mq)
682   {
683     LOG (GNUNET_ERROR_TYPE_DEBUG,
684          "Failed to connect to statistics service!\n");
685     return GNUNET_NO;
686   }
687   gn = h->action_head;
688   while (NULL != (gh = gn))
689   {
690     gn = gh->next;
691     if (gh->type == ACTION_WATCH)
692     {
693       GNUNET_CONTAINER_DLL_remove (h->action_head,
694                                    h->action_tail,
695                                    gh);
696       free_action_item (gh);
697     }
698   }
699   for (unsigned int i = 0; i < h->watches_size; i++)
700     if (NULL != h->watches[i])
701       schedule_watch_request (h,
702                               h->watches[i]);
703   return GNUNET_YES;
704 }
705
706
707 /**
708  * We've waited long enough, reconnect now.
709  *
710  * @param cls the `struct GNUNET_STATISTICS_Handle` to reconnect
711  */
712 static void
713 reconnect_task (void *cls)
714 {
715   struct GNUNET_STATISTICS_Handle *h = cls;
716
717   h->backoff_task = NULL;
718   schedule_action (h);
719 }
720
721
722 /**
723  * Reconnect at a later time, respecting back-off.
724  *
725  * @param h statistics handle
726  */
727 static void
728 reconnect_later (struct GNUNET_STATISTICS_Handle *h)
729 {
730   int loss;
731   struct GNUNET_STATISTICS_GetHandle *gh;
732
733   GNUNET_assert (NULL == h->backoff_task);
734   if (GNUNET_YES == h->do_destroy)
735   {
736     /* So we are shutting down and the service is not reachable.
737      * Chances are that it's down for good and we are not going to connect to
738      * it anymore.
739      * Give up and don't sync the rest of the data.
740      */
741     loss = GNUNET_NO;
742     for (gh = h->action_head; NULL != gh; gh = gh->next)
743       if ( (gh->make_persistent) &&
744            (ACTION_SET == gh->type) )
745         loss = GNUNET_YES;
746     if (GNUNET_YES == loss)
747       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
748                   _("Could not save some persistent statistics\n"));
749     if (NULL != h->destroy_task)
750       GNUNET_SCHEDULER_cancel (h->destroy_task);
751     h->destroy_task = GNUNET_SCHEDULER_add_now (&do_destroy,
752                                                 h);
753     return;
754   }
755   h->backoff_task
756     = GNUNET_SCHEDULER_add_delayed (h->backoff,
757                                     &reconnect_task,
758                                     h);
759   h->backoff = GNUNET_TIME_STD_BACKOFF (h->backoff);
760 }
761
762
763
764 /**
765  * Transmit a GET request (and if successful, start to receive
766  * the response).
767  *
768  * @param handle statistics handle
769  */
770 static void
771 transmit_get (struct GNUNET_STATISTICS_Handle *handle)
772 {
773   struct GNUNET_STATISTICS_GetHandle *c;
774   struct GNUNET_MessageHeader *hdr;
775   struct GNUNET_MQ_Envelope *env;
776   size_t slen1;
777   size_t slen2;
778
779   GNUNET_assert (NULL != (c = handle->current));
780   slen1 = strlen (c->subsystem) + 1;
781   slen2 = strlen (c->name) + 1;
782   env = GNUNET_MQ_msg_extra (hdr,
783                              slen1 + slen2,
784                              GNUNET_MESSAGE_TYPE_STATISTICS_GET);
785   GNUNET_assert (slen1 + slen2 ==
786                  GNUNET_STRINGS_buffer_fill ((char *) &hdr[1],
787                                              slen1 + slen2,
788                                              2,
789                                              c->subsystem,
790                                              c->name));
791   GNUNET_MQ_notify_sent (env,
792                          &schedule_action,
793                          handle);
794   GNUNET_MQ_send (handle->mq,
795                   env);
796 }
797
798
799 /**
800  * Transmit a WATCH request (and if successful, start to receive
801  * the response).
802  *
803  * @param handle statistics handle
804  */
805 static void
806 transmit_watch (struct GNUNET_STATISTICS_Handle *handle)
807 {
808   struct GNUNET_MessageHeader *hdr;
809   struct GNUNET_MQ_Envelope *env;
810   size_t slen1;
811   size_t slen2;
812
813   LOG (GNUNET_ERROR_TYPE_DEBUG,
814        "Transmitting watch request for `%s'\n",
815        handle->current->name);
816   slen1 = strlen (handle->current->subsystem) + 1;
817   slen2 = strlen (handle->current->name) + 1;
818   env = GNUNET_MQ_msg_extra (hdr,
819                              slen1 + slen2,
820                              GNUNET_MESSAGE_TYPE_STATISTICS_WATCH);
821   GNUNET_assert (slen1 + slen2 ==
822                  GNUNET_STRINGS_buffer_fill ((char *) &hdr[1],
823                                              slen1 + slen2,
824                                              2,
825                                              handle->current->subsystem,
826                                              handle->current->name));
827   GNUNET_MQ_notify_sent (env,
828                          &schedule_action,
829                          handle);
830   GNUNET_MQ_send (handle->mq,
831                   env);
832   GNUNET_assert (NULL == handle->current->cont);
833   free_action_item (handle->current);
834   handle->current = NULL;
835   schedule_action (handle);
836 }
837
838
839 /**
840  * Transmit a SET/UPDATE request.
841  *
842  * @param handle statistics handle
843  */
844 static void
845 transmit_set (struct GNUNET_STATISTICS_Handle *handle)
846 {
847   struct GNUNET_STATISTICS_SetMessage *r;
848   struct GNUNET_MQ_Envelope *env;
849   size_t slen;
850   size_t nlen;
851
852   slen = strlen (handle->current->subsystem) + 1;
853   nlen = strlen (handle->current->name) + 1;
854   env = GNUNET_MQ_msg_extra (r,
855                              slen + nlen,
856                              GNUNET_MESSAGE_TYPE_STATISTICS_SET);
857   r->flags = 0;
858   r->value = GNUNET_htonll (handle->current->value);
859   if (handle->current->make_persistent)
860     r->flags |= htonl (GNUNET_STATISTICS_SETFLAG_PERSISTENT);
861   if (handle->current->type == ACTION_UPDATE)
862     r->flags |= htonl (GNUNET_STATISTICS_SETFLAG_RELATIVE);
863   GNUNET_assert (slen + nlen ==
864                  GNUNET_STRINGS_buffer_fill ((char *) &r[1],
865                                              slen + nlen,
866                                              2,
867                                              handle->current->subsystem,
868                                              handle->current->name));
869   GNUNET_assert (NULL == handle->current->cont);
870   free_action_item (handle->current);
871   handle->current = NULL;
872   update_memory_statistics (handle);
873   GNUNET_MQ_notify_sent (env,
874                          &schedule_action,
875                          handle);
876   GNUNET_MQ_send (handle->mq,
877                   env);
878 }
879
880
881 /**
882  * Get handle for the statistics service.
883  *
884  * @param subsystem name of subsystem using the service
885  * @param cfg services configuration in use
886  * @return handle to use
887  */
888 struct GNUNET_STATISTICS_Handle *
889 GNUNET_STATISTICS_create (const char *subsystem,
890                           const struct GNUNET_CONFIGURATION_Handle *cfg)
891 {
892   struct GNUNET_STATISTICS_Handle *h;
893
894   if (GNUNET_YES ==
895       GNUNET_CONFIGURATION_get_value_yesno (cfg,
896                                             "statistics",
897                                             "DISABLE"))
898     return NULL;
899   h = GNUNET_new (struct GNUNET_STATISTICS_Handle);
900   h->cfg = cfg;
901   h->subsystem = GNUNET_strdup (subsystem);
902   h->backoff = GNUNET_TIME_UNIT_MILLISECONDS;
903   return h;
904 }
905
906
907 /**
908  * Destroy a handle (free all state associated with
909  * it).
910  *
911  * @param h statistics handle to destroy
912  * @param sync_first set to #GNUNET_YES if pending SET requests should
913  *        be completed
914  */
915 void
916 GNUNET_STATISTICS_destroy (struct GNUNET_STATISTICS_Handle *h,
917                            int sync_first)
918 {
919   struct GNUNET_STATISTICS_GetHandle *pos;
920   struct GNUNET_STATISTICS_GetHandle *next;
921
922   if (NULL == h)
923     return;
924   GNUNET_assert (GNUNET_NO == h->do_destroy); /* Don't call twice. */
925   if ( (sync_first) &&
926        (NULL != h->mq) &&
927        (0 != GNUNET_MQ_get_length (h->mq)) )
928   {
929     if ( (NULL != h->current) &&
930          (ACTION_GET == h->current->type) )
931       h->current->aborted = GNUNET_YES;
932     next = h->action_head;
933     while (NULL != (pos = next))
934     {
935       next = pos->next;
936       if ( (ACTION_GET == pos->type) ||
937            (ACTION_WATCH == pos->type) )
938       {
939         GNUNET_CONTAINER_DLL_remove (h->action_head,
940                                      h->action_tail,
941                                      pos);
942         free_action_item (pos);
943       }
944     }
945     h->do_destroy = GNUNET_YES;
946     schedule_action (h);
947     GNUNET_assert (NULL == h->destroy_task);
948     h->destroy_task
949       = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply (h->backoff,
950                                                                      5),
951                                       &do_destroy,
952                                       h);
953     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
954                 "Deferring destruction\n");
955     return; /* do not finish destruction just yet */
956   }
957   /* do clean up all */
958   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
959               "Cleaning all up\n");
960   while (NULL != (pos = h->action_head))
961   {
962     GNUNET_CONTAINER_DLL_remove (h->action_head,
963                                  h->action_tail,
964                                  pos);
965     free_action_item (pos);
966   }
967   do_disconnect (h);
968   if (NULL != h->backoff_task)
969   {
970     GNUNET_SCHEDULER_cancel (h->backoff_task);
971     h->backoff_task = NULL;
972   }
973   if (NULL != h->destroy_task)
974   {
975     GNUNET_break (0);
976     GNUNET_SCHEDULER_cancel (h->destroy_task);
977     h->destroy_task = NULL;
978   }
979   for (unsigned int i = 0; i < h->watches_size; i++)
980   {
981     if (NULL == h->watches[i])
982       continue;
983     GNUNET_free (h->watches[i]->subsystem);
984     GNUNET_free (h->watches[i]->name);
985     GNUNET_free (h->watches[i]);
986   }
987   GNUNET_array_grow (h->watches,
988                      h->watches_size,
989                      0);
990   GNUNET_free (h->subsystem);
991   GNUNET_free (h);
992 }
993
994
995 /**
996  * Schedule the next action to be performed.
997  *
998  * @param cls statistics handle
999  */
1000 static void
1001 schedule_action (void *cls)
1002 {
1003   struct GNUNET_STATISTICS_Handle *h = cls;
1004
1005   if (NULL != h->backoff_task)
1006     return;                     /* action already pending */
1007   if (GNUNET_YES != try_connect (h))
1008   {
1009     reconnect_later (h);
1010     return;
1011   }
1012   if (0 < GNUNET_MQ_get_length (h->mq))
1013     return; /* Wait for queue to be reduced more */    
1014   /* schedule next action */
1015   while (NULL == h->current)
1016   {
1017     h->current = h->action_head;
1018     if (NULL == h->current)
1019     {
1020       struct GNUNET_MessageHeader *hdr;
1021       struct GNUNET_MQ_Envelope *env;
1022
1023       if (GNUNET_YES != h->do_destroy)
1024         return; /* nothing to do */
1025       /* let service know that we're done */
1026       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1027                   "Notifying service that we are done\n");
1028       h->do_destroy = GNUNET_SYSERR; /* in 'TEST' mode */
1029       env = GNUNET_MQ_msg (hdr,
1030                            GNUNET_MESSAGE_TYPE_STATISTICS_DISCONNECT);
1031       GNUNET_MQ_notify_sent (env,
1032                              &schedule_action,
1033                              h);
1034       GNUNET_MQ_send (h->mq,
1035                       env);
1036       return;
1037     }
1038     GNUNET_CONTAINER_DLL_remove (h->action_head,
1039                                  h->action_tail,
1040                                  h->current);
1041     switch (h->current->type)
1042     {
1043     case ACTION_GET:
1044       transmit_get (h);
1045       break;
1046     case ACTION_SET:
1047     case ACTION_UPDATE:
1048       transmit_set (h);
1049       break;
1050     case ACTION_WATCH:
1051       transmit_watch (h);
1052       break;
1053     default:
1054       GNUNET_assert (0);
1055       break;
1056     }
1057   }
1058 }
1059
1060
1061 /**
1062  * Get statistic from the peer.
1063  *
1064  * @param handle identification of the statistics service
1065  * @param subsystem limit to the specified subsystem, NULL for our subsystem
1066  * @param name name of the statistic value, NULL for all values
1067  * @param cont continuation to call when done (can be NULL)
1068  *        This callback CANNOT destroy the statistics handle in the same call.
1069  * @param proc function to call on each value
1070  * @param cls closure for @a cont and @a proc
1071  * @return NULL on error
1072  */
1073 struct GNUNET_STATISTICS_GetHandle *
1074 GNUNET_STATISTICS_get (struct GNUNET_STATISTICS_Handle *handle,
1075                        const char *subsystem,
1076                        const char *name,
1077                        GNUNET_STATISTICS_Callback cont,
1078                        GNUNET_STATISTICS_Iterator proc,
1079                        void *cls)
1080 {
1081   size_t slen1;
1082   size_t slen2;
1083   struct GNUNET_STATISTICS_GetHandle *ai;
1084
1085   if (NULL == handle)
1086     return NULL;
1087   GNUNET_assert (NULL != proc);
1088   GNUNET_assert (GNUNET_NO == handle->do_destroy);
1089   if (NULL == subsystem)
1090     subsystem = "";
1091   if (NULL == name)
1092     name = "";
1093   slen1 = strlen (subsystem) + 1;
1094   slen2 = strlen (name) + 1;
1095   GNUNET_assert (slen1 + slen2 + sizeof (struct GNUNET_MessageHeader) <
1096                  GNUNET_MAX_MESSAGE_SIZE);
1097   ai = GNUNET_new (struct GNUNET_STATISTICS_GetHandle);
1098   ai->sh = handle;
1099   ai->subsystem = GNUNET_strdup (subsystem);
1100   ai->name = GNUNET_strdup (name);
1101   ai->cont = cont;
1102   ai->proc = proc;
1103   ai->cls = cls;
1104   ai->type = ACTION_GET;
1105   ai->msize = slen1 + slen2 + sizeof (struct GNUNET_MessageHeader);
1106   GNUNET_CONTAINER_DLL_insert_tail (handle->action_head,
1107                                     handle->action_tail,
1108                                     ai);
1109   schedule_action (handle);
1110   return ai;
1111 }
1112
1113
1114 /**
1115  * Cancel a 'get' request.  Must be called before the 'cont'
1116  * function is called.
1117  *
1118  * @param gh handle of the request to cancel
1119  */
1120 void
1121 GNUNET_STATISTICS_get_cancel (struct GNUNET_STATISTICS_GetHandle *gh)
1122 {
1123   if (NULL == gh)
1124     return;
1125   gh->cont = NULL;
1126   if (gh->sh->current == gh)
1127   {
1128     gh->aborted = GNUNET_YES;
1129     return;
1130   }
1131   GNUNET_CONTAINER_DLL_remove (gh->sh->action_head,
1132                                gh->sh->action_tail,
1133                                gh);
1134   GNUNET_free (gh->name);
1135   GNUNET_free (gh->subsystem);
1136   GNUNET_free (gh);
1137 }
1138
1139
1140 /**
1141  * Watch statistics from the peer (be notified whenever they change).
1142  *
1143  * @param handle identification of the statistics service
1144  * @param subsystem limit to the specified subsystem, never NULL
1145  * @param name name of the statistic value, never NULL
1146  * @param proc function to call on each value
1147  * @param proc_cls closure for @a proc
1148  * @return #GNUNET_OK on success, #GNUNET_SYSERR on error
1149  */
1150 int
1151 GNUNET_STATISTICS_watch (struct GNUNET_STATISTICS_Handle *handle,
1152                          const char *subsystem,
1153                          const char *name,
1154                          GNUNET_STATISTICS_Iterator proc,
1155                          void *proc_cls)
1156 {
1157   struct GNUNET_STATISTICS_WatchEntry *w;
1158
1159   if (NULL == handle)
1160     return GNUNET_SYSERR;
1161   w = GNUNET_new (struct GNUNET_STATISTICS_WatchEntry);
1162   w->subsystem = GNUNET_strdup (subsystem);
1163   w->name = GNUNET_strdup (name);
1164   w->proc = proc;
1165   w->proc_cls = proc_cls;
1166   GNUNET_array_append (handle->watches,
1167                        handle->watches_size,
1168                        w);
1169   schedule_watch_request (handle,
1170                           w);
1171   return GNUNET_OK;
1172 }
1173
1174
1175 /**
1176  * Stop watching statistics from the peer.
1177  *
1178  * @param handle identification of the statistics service
1179  * @param subsystem limit to the specified subsystem, never NULL
1180  * @param name name of the statistic value, never NULL
1181  * @param proc function to call on each value
1182  * @param proc_cls closure for @a proc
1183  * @return #GNUNET_OK on success, #GNUNET_SYSERR on error (no such watch)
1184  */
1185 int
1186 GNUNET_STATISTICS_watch_cancel (struct GNUNET_STATISTICS_Handle *handle,
1187                                 const char *subsystem,
1188                                 const char *name,
1189                                 GNUNET_STATISTICS_Iterator proc,
1190                                 void *proc_cls)
1191 {
1192   struct GNUNET_STATISTICS_WatchEntry *w;
1193
1194   if (NULL == handle)
1195     return GNUNET_SYSERR;
1196   for (unsigned int i=0;i<handle->watches_size;i++)
1197   {
1198     w = handle->watches[i];
1199     if (NULL == w)
1200       continue;
1201     if ( (w->proc == proc) &&
1202          (w->proc_cls == proc_cls) &&
1203          (0 == strcmp (w->name,
1204                        name)) &&
1205          (0 == strcmp (w->subsystem,
1206                        subsystem)) )
1207     {
1208       GNUNET_free (w->name);
1209       GNUNET_free (w->subsystem);
1210       GNUNET_free (w);
1211       handle->watches[i] = NULL;
1212       return GNUNET_OK;
1213     }
1214   }
1215   return GNUNET_SYSERR;
1216 }
1217
1218
1219 /**
1220  * Queue a request to change a statistic.
1221  *
1222  * @param h statistics handle
1223  * @param name name of the value
1224  * @param make_persistent  should the value be kept across restarts?
1225  * @param value new value or change
1226  * @param type type of the action (#ACTION_SET or #ACTION_UPDATE)
1227  */
1228 static void
1229 add_setter_action (struct GNUNET_STATISTICS_Handle *h,
1230                    const char *name,
1231                    int make_persistent,
1232                    uint64_t value,
1233                    enum ActionType type)
1234 {
1235   struct GNUNET_STATISTICS_GetHandle *ai;
1236   size_t slen;
1237   size_t nlen;
1238   size_t nsize;
1239   int64_t delta;
1240
1241   slen = strlen (h->subsystem) + 1;
1242   nlen = strlen (name) + 1;
1243   nsize = sizeof (struct GNUNET_STATISTICS_SetMessage) + slen + nlen;
1244   if (nsize >= GNUNET_MAX_MESSAGE_SIZE)
1245   {
1246     GNUNET_break (0);
1247     return;
1248   }
1249   for (ai = h->action_head; NULL != ai; ai = ai->next)
1250   {
1251     if (! ( (0 == strcmp (ai->subsystem,
1252                           h->subsystem)) &&
1253             (0 == strcmp (ai->name,
1254                           name)) &&
1255             ( (ACTION_UPDATE == ai->type) ||
1256               (ACTION_SET == ai->type) ) ) )
1257       continue;
1258     if (ACTION_SET == ai->type)
1259     {
1260       if (ACTION_UPDATE == type)
1261       {
1262         delta = (int64_t) value;
1263         if (delta > 0)
1264         {
1265           /* update old set by new delta */
1266           ai->value += delta;
1267         }
1268         else
1269         {
1270           /* update old set by new delta, but never go negative */
1271           if (ai->value < -delta)
1272             ai->value = 0;
1273           else
1274             ai->value += delta;
1275         }
1276       }
1277       else
1278       {
1279         /* new set overrides old set */
1280         ai->value = value;
1281       }
1282     }
1283     else
1284     {
1285       if (ACTION_UPDATE == type)
1286       {
1287         /* make delta cummulative */
1288         delta = (int64_t) value;
1289         ai->value += delta;
1290       }
1291       else
1292       {
1293         /* drop old 'update', use new 'set' instead */
1294         ai->value = value;
1295         ai->type = type;
1296       }
1297     }
1298     ai->timeout
1299       = GNUNET_TIME_relative_to_absolute (SET_TRANSMIT_TIMEOUT);
1300     ai->make_persistent
1301       = make_persistent;
1302     return;
1303   }
1304   /* no existing entry matches, create a fresh one */
1305   ai = GNUNET_new (struct GNUNET_STATISTICS_GetHandle);
1306   ai->sh = h;
1307   ai->subsystem = GNUNET_strdup (h->subsystem);
1308   ai->name = GNUNET_strdup (name);
1309   ai->timeout = GNUNET_TIME_relative_to_absolute (SET_TRANSMIT_TIMEOUT);
1310   ai->make_persistent = make_persistent;
1311   ai->msize = nsize;
1312   ai->value = value;
1313   ai->type = type;
1314   GNUNET_CONTAINER_DLL_insert_tail (h->action_head,
1315                                     h->action_tail,
1316                                     ai);
1317   schedule_action (h);
1318 }
1319
1320
1321 /**
1322  * Set statistic value for the peer.  Will always use our
1323  * subsystem (the argument used when "handle" was created).
1324  *
1325  * @param handle identification of the statistics service
1326  * @param name name of the statistic value
1327  * @param value new value to set
1328  * @param make_persistent should the value be kept across restarts?
1329  */
1330 void
1331 GNUNET_STATISTICS_set (struct GNUNET_STATISTICS_Handle *handle,
1332                        const char *name,
1333                        uint64_t value,
1334                        int make_persistent)
1335 {
1336   if (NULL == handle)
1337     return;
1338   GNUNET_assert (GNUNET_NO == handle->do_destroy);
1339   add_setter_action (handle,
1340                      name,
1341                      make_persistent,
1342                      value,
1343                      ACTION_SET);
1344 }
1345
1346
1347 /**
1348  * Set statistic value for the peer.  Will always use our
1349  * subsystem (the argument used when "handle" was created).
1350  *
1351  * @param handle identification of the statistics service
1352  * @param name name of the statistic value
1353  * @param delta change in value (added to existing value)
1354  * @param make_persistent should the value be kept across restarts?
1355  */
1356 void
1357 GNUNET_STATISTICS_update (struct GNUNET_STATISTICS_Handle *handle,
1358                           const char *name,
1359                           int64_t delta,
1360                           int make_persistent)
1361 {
1362   if (NULL == handle)
1363     return;
1364   if (0 == delta)
1365     return;
1366   GNUNET_assert (GNUNET_NO == handle->do_destroy);
1367   add_setter_action (handle,
1368                      name,
1369                      make_persistent,
1370                      (uint64_t) delta,
1371                      ACTION_UPDATE);
1372 }
1373
1374
1375 /* end of statistics_api.c */