commented out wrong message type
[oweals/gnunet.git] / src / statistics / statistics_api.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2009, 2010, 2011, 2016 GNUnet e.V.
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18      Boston, MA 02110-1301, USA.
19 */
20
21 /**
22  * @file statistics/statistics_api.c
23  * @brief API of the statistics service
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include "gnunet_util_lib.h"
28 #include "gnunet_constants.h"
29 #include "gnunet_protocols.h"
30 #include "gnunet_statistics_service.h"
31 #include "statistics.h"
32
33 /**
34  * How long do we wait until a statistics request for setting
35  * a value times out?  (The update will be lost if the
36  * service does not react within this timeframe).
37  */
38 #define SET_TRANSMIT_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 2)
39
40 #define LOG(kind,...) GNUNET_log_from (kind, "statistics-api",__VA_ARGS__)
41
42 /**
43  * Types of actions.
44  */
45 enum ActionType
46 {
47   /**
48    * Get a value.
49    */
50   ACTION_GET,
51
52   /**
53    * Set a value.
54    */
55   ACTION_SET,
56
57   /**
58    * Update a value.
59    */
60   ACTION_UPDATE,
61
62   /**
63    * Watch a value.
64    */
65   ACTION_WATCH
66 };
67
68
69 /**
70  * Entry kept for each value we are watching.
71  */
72 struct GNUNET_STATISTICS_WatchEntry
73 {
74
75   /**
76    * What subsystem is this action about? (never NULL)
77    */
78   char *subsystem;
79
80   /**
81    * What value is this action about? (never NULL)
82    */
83   char *name;
84
85   /**
86    * Function to call
87    */
88   GNUNET_STATISTICS_Iterator proc;
89
90   /**
91    * Closure for @e proc
92    */
93   void *proc_cls;
94
95 };
96
97
98 /**
99  * Linked list of things we still need to do.
100  */
101 struct GNUNET_STATISTICS_GetHandle
102 {
103
104   /**
105    * This is a doubly linked list.
106    */
107   struct GNUNET_STATISTICS_GetHandle *next;
108
109   /**
110    * This is a doubly linked list.
111    */
112   struct GNUNET_STATISTICS_GetHandle *prev;
113
114   /**
115    * Main statistics handle.
116    */
117   struct GNUNET_STATISTICS_Handle *sh;
118
119   /**
120    * What subsystem is this action about? (can be NULL)
121    */
122   char *subsystem;
123
124   /**
125    * What value is this action about? (can be NULL)
126    */
127   char *name;
128
129   /**
130    * Continuation to call once action is complete.
131    */
132   GNUNET_STATISTICS_Callback cont;
133
134   /**
135    * Function to call (for GET actions only).
136    */
137   GNUNET_STATISTICS_Iterator proc;
138
139   /**
140    * Closure for @e proc and @e cont.
141    */
142   void *cls;
143
144   /**
145    * Timeout for this action.
146    */
147   struct GNUNET_TIME_Absolute timeout;
148
149   /**
150    * Associated value.
151    */
152   uint64_t value;
153
154   /**
155    * Flag for SET/UPDATE actions.
156    */
157   int make_persistent;
158
159   /**
160    * Has the current iteration been aborted; for GET actions.
161    */
162   int aborted;
163
164   /**
165    * Is this a #ACTION_GET, #ACTION_SET, #ACTION_UPDATE or #ACTION_WATCH?
166    */
167   enum ActionType type;
168
169   /**
170    * Size of the message that we will be transmitting.
171    */
172   uint16_t msize;
173
174 };
175
176
177 /**
178  * Handle for the service.
179  */
180 struct GNUNET_STATISTICS_Handle
181 {
182   /**
183    * Name of our subsystem.
184    */
185   char *subsystem;
186
187   /**
188    * Configuration to use.
189    */
190   const struct GNUNET_CONFIGURATION_Handle *cfg;
191
192   /**
193    * Message queue to the service.
194    */
195   struct GNUNET_MQ_Handle *mq;
196
197   /**
198    * Head of the linked list of pending actions (first action
199    * to be performed).
200    */
201   struct GNUNET_STATISTICS_GetHandle *action_head;
202
203   /**
204    * Tail of the linked list of actions (for fast append).
205    */
206   struct GNUNET_STATISTICS_GetHandle *action_tail;
207
208   /**
209    * Action we are currently busy with (action request has been
210    * transmitted, we're now receiving the response from the
211    * service).
212    */
213   struct GNUNET_STATISTICS_GetHandle *current;
214
215   /**
216    * Array of watch entries.
217    */
218   struct GNUNET_STATISTICS_WatchEntry **watches;
219
220   /**
221    * Task doing exponential back-off trying to reconnect.
222    */
223   struct GNUNET_SCHEDULER_Task *backoff_task;
224
225   /**
226    * Task for running #do_destroy().
227    */
228   struct GNUNET_SCHEDULER_Task *destroy_task;
229
230   /**
231    * Time for next connect retry.
232    */
233   struct GNUNET_TIME_Relative backoff;
234
235   /**
236    * Maximum heap size observed so far (if available).
237    */
238   uint64_t peak_heap_size;
239
240   /**
241    * Maximum resident set side observed so far (if available).
242    */
243   uint64_t peak_rss;
244
245   /**
246    * Size of the @e watches array.
247    */
248   unsigned int watches_size;
249
250   /**
251    * Should this handle auto-destruct once all actions have
252    * been processed?
253    */
254   int do_destroy;
255
256   /**
257    * Are we currently receiving from the service?
258    */
259   int receiving;
260
261 };
262
263
264 /**
265  * Obtain statistics about this process's memory consumption and
266  * report those as well (if they changed).
267  */
268 static void
269 update_memory_statistics (struct GNUNET_STATISTICS_Handle *h)
270 {
271 #if ENABLE_HEAP_STATISTICS
272   uint64_t current_heap_size = 0;
273   uint64_t current_rss = 0;
274
275   if (GNUNET_NO != h->do_destroy)
276     return;
277 #if HAVE_MALLINFO
278   {
279     struct mallinfo mi;
280
281     mi = mallinfo();
282     current_heap_size = mi.uordblks + mi.fordblks;
283   }
284 #endif
285 #if HAVE_GETRUSAGE
286   {
287     struct rusage ru;
288
289     if (0 == getrusage (RUSAGE_SELF, &ru))
290     {
291       current_rss = 1024LL * ru.ru_maxrss;
292     }
293   }
294 #endif
295   if (current_heap_size > h->peak_heap_size)
296   {
297     h->peak_heap_size = current_heap_size;
298     GNUNET_STATISTICS_set (h,
299                            "# peak heap size",
300                            current_heap_size,
301                            GNUNET_NO);
302   }
303   if (current_rss > h->peak_rss)
304   {
305     h->peak_rss = current_rss;
306     GNUNET_STATISTICS_set (h,
307                            "# peak resident set size",
308                            current_rss,
309                            GNUNET_NO);
310   }
311 #endif
312 }
313
314
315 /**
316  * Reconnect at a later time, respecting back-off.
317  *
318  * @param h statistics handle
319  */
320 static void
321 reconnect_later (struct GNUNET_STATISTICS_Handle *h);
322
323
324 /**
325  * Schedule the next action to be performed.
326  *
327  * @param cls statistics handle to reconnect
328  */
329 static void
330 schedule_action (void *cls);
331
332
333 /**
334  * Transmit request to service that we want to watch
335  * the development of a particular value.
336  *
337  * @param h statistics handle
338  * @param watch watch entry of the value to watch
339  */
340 static void
341 schedule_watch_request (struct GNUNET_STATISTICS_Handle *h,
342                         struct GNUNET_STATISTICS_WatchEntry *watch)
343 {
344   struct GNUNET_STATISTICS_GetHandle *ai;
345   size_t slen;
346   size_t nlen;
347   size_t nsize;
348
349   slen = strlen (watch->subsystem) + 1;
350   nlen = strlen (watch->name) + 1;
351   nsize = sizeof (struct GNUNET_MessageHeader) + slen + nlen;
352   if (nsize >= GNUNET_MAX_MESSAGE_SIZE)
353   {
354     GNUNET_break (0);
355     return;
356   }
357   ai = GNUNET_new (struct GNUNET_STATISTICS_GetHandle);
358   ai->sh = h;
359   ai->subsystem = GNUNET_strdup (watch->subsystem);
360   ai->name = GNUNET_strdup (watch->name);
361   ai->timeout = GNUNET_TIME_UNIT_FOREVER_ABS;
362   ai->msize = nsize;
363   ai->type = ACTION_WATCH;
364   ai->proc = watch->proc;
365   ai->cls = watch->proc_cls;
366   GNUNET_CONTAINER_DLL_insert_tail (h->action_head,
367                                     h->action_tail,
368                                     ai);
369   schedule_action (h);
370 }
371
372
373 /**
374  * Free memory associated with the given action item.
375  *
376  * @param gh action item to free
377  */
378 static void
379 free_action_item (struct GNUNET_STATISTICS_GetHandle *gh)
380 {
381   GNUNET_free_non_null (gh->subsystem);
382   GNUNET_free_non_null (gh->name);
383   GNUNET_free (gh);
384 }
385
386
387 /**
388  * Disconnect from the statistics service.
389  *
390  * @param h statistics handle to disconnect from
391  */
392 static void
393 do_disconnect (struct GNUNET_STATISTICS_Handle *h)
394 {
395   struct GNUNET_STATISTICS_GetHandle *c;
396
397   h->receiving = GNUNET_NO;
398   if (NULL != (c = h->current))
399   {
400     h->current = NULL;
401     if ( (NULL != c->cont) &&
402          (GNUNET_YES != c->aborted) )
403     {
404       c->cont (c->cls,
405                GNUNET_SYSERR);
406       c->cont = NULL;
407     }
408     free_action_item (c);
409   }
410   if (NULL != h->mq)
411   {
412     GNUNET_MQ_destroy (h->mq);
413     h->mq = NULL;
414   }
415 }
416
417
418 /**
419  * Process a #GNUNET_MESSAGE_TYPE_STATISTICS_VALUE message.
420  *
421  * @param cls statistics handle
422  * @param smsg message received from the service, never NULL
423  * @return #GNUNET_OK if the message was well-formed
424  */
425 static int
426 check_statistics_value (void *cls,
427                         const struct GNUNET_STATISTICS_ReplyMessage *smsg)
428 {
429   const char *service;
430   const char *name;
431   uint16_t size;
432
433   size = ntohs (smsg->header.size);
434   size -= sizeof (struct GNUNET_STATISTICS_ReplyMessage);
435   if (size !=
436       GNUNET_STRINGS_buffer_tokenize ((const char *) &smsg[1],
437                                       size,
438                                       2,
439                                       &service,
440                                       &name))
441   {
442     GNUNET_break (0);
443     return GNUNET_SYSERR;
444   }
445   return GNUNET_OK;
446 }
447
448
449 /**
450  * Process a #GNUNET_MESSAGE_TYPE_STATISTICS_VALUE message.
451  *
452  * @param cls statistics handle
453  * @param msg message received from the service, never NULL
454  * @return #GNUNET_OK if the message was well-formed
455  */
456 static void
457 handle_statistics_value (void *cls,
458                          const struct GNUNET_STATISTICS_ReplyMessage *smsg)
459 {
460   struct GNUNET_STATISTICS_Handle *h = cls;
461   const char *service;
462   const char *name;
463   uint16_t size;
464
465   if (h->current->aborted)
466     return;           /* iteration aborted, don't bother */
467
468   size = ntohs (smsg->header.size);
469   size -= sizeof (struct GNUNET_STATISTICS_ReplyMessage);
470   GNUNET_assert (size ==
471                  GNUNET_STRINGS_buffer_tokenize ((const char *) &smsg[1],
472                                                  size,
473                                                  2,
474                                                  &service,
475                                                  &name));
476   LOG (GNUNET_ERROR_TYPE_DEBUG,
477        "Received valid statistic on `%s:%s': %llu\n",
478        service, name,
479        GNUNET_ntohll (smsg->value));
480   if (GNUNET_OK !=
481       h->current->proc (h->current->cls,
482                         service,
483                         name,
484                         GNUNET_ntohll (smsg->value),
485                         0 !=
486                         (ntohl (smsg->uid) & GNUNET_STATISTICS_PERSIST_BIT)))
487   {
488     LOG (GNUNET_ERROR_TYPE_DEBUG,
489          "Processing of remaining statistics aborted by client.\n");
490     h->current->aborted = GNUNET_YES;
491   }
492 }
493
494
495 /**
496  * We have received a watch value from the service.  Process it.
497  *
498  * @param cls statistics handle
499  * @param msg the watch value message
500  */
501 static void
502 handle_statistics_watch_value (void *cls,
503                                const struct GNUNET_STATISTICS_WatchValueMessage *wvm)
504 {
505   struct GNUNET_STATISTICS_Handle *h = cls;
506   struct GNUNET_STATISTICS_WatchEntry *w;
507   uint32_t wid;
508
509   GNUNET_break (0 == ntohl (wvm->reserved));
510   wid = ntohl (wvm->wid);
511   if (wid >= h->watches_size)
512   {
513     do_disconnect (h);
514     reconnect_later (h);
515     return;
516   }
517   w = h->watches[wid];
518   if (NULL == w)
519     return;
520   (void) w->proc (w->proc_cls,
521                   w->subsystem,
522                   w->name,
523                   GNUNET_ntohll (wvm->value),
524                   0 != (ntohl (wvm->flags) & GNUNET_STATISTICS_PERSIST_BIT));
525 }
526
527
528 /**
529  * Generic error handler, called with the appropriate error code and
530  * the same closure specified at the creation of the message queue.
531  * Not every message queue implementation supports an error handler.
532  *
533  * @param cls closure with the `struct GNUNET_STATISTICS_Handle *`
534  * @param error error code
535  */
536 static void
537 mq_error_handler (void *cls,
538                   enum GNUNET_MQ_Error error)
539 {
540   struct GNUNET_STATISTICS_Handle *h = cls;
541
542   if (GNUNET_NO != h->do_destroy)
543   {
544     h->do_destroy = GNUNET_NO;
545     if (NULL != h->destroy_task)
546     {
547       GNUNET_SCHEDULER_cancel (h->destroy_task);
548       h->destroy_task = NULL;
549     }
550     GNUNET_STATISTICS_destroy (h,
551                                GNUNET_NO);
552     return;
553   }
554   do_disconnect (h);
555   reconnect_later (h);
556 }
557
558
559 /**
560  * Task used to destroy the statistics handle.
561  *
562  * @param cls the `struct GNUNET_STATISTICS_Handle`
563  */
564 static void
565 do_destroy (void *cls)
566 {
567   struct GNUNET_STATISTICS_Handle *h = cls;
568
569   h->destroy_task = NULL;
570   h->do_destroy = GNUNET_NO;
571   LOG (GNUNET_ERROR_TYPE_DEBUG,
572        "Running final destruction\n");
573   GNUNET_STATISTICS_destroy (h,
574                              GNUNET_NO);
575 }
576
577
578 /**
579  * Handle a #GNUNET_MESSAGE_TYPE_STATISTICS_DISCONNECT_CONFIRM
580  * message. We receive this message at the end of the shutdown when
581  * the service confirms that all data has been written to disk.
582  *
583  * @param cls our `struct GNUNET_STATISTICS_Handle *`
584  * @param msg the message
585  */
586 static void
587 handle_disconnect_confirm (void *cls,
588                            const struct GNUNET_MessageHeader *msg)
589 {
590   struct GNUNET_STATISTICS_Handle *h = cls;
591
592   if (GNUNET_SYSERR != h->do_destroy)
593   {
594     /* not in shutdown, why do we get 'TEST'? */
595     GNUNET_break (0);
596     do_disconnect (h);
597     reconnect_later (h);
598     return;
599   }
600   LOG (GNUNET_ERROR_TYPE_DEBUG,
601        "Received DISCONNNECT_CONFIRM message from statistics, can complete disconnect\n");
602   if (NULL != h->destroy_task)
603     GNUNET_SCHEDULER_cancel (h->destroy_task);
604   h->destroy_task = GNUNET_SCHEDULER_add_now (&do_destroy,
605                                               h);
606 }
607
608
609 /**
610  * Handle a #GNUNET_MESSAGE_TYPE_STATISTICS_END message. We receive
611  * this message in response to a query to indicate that there are no
612  * further matching results.
613  *
614  * @param cls our `struct GNUNET_STATISTICS_Handle *`
615  * @param msg the message
616  */
617 static void
618 handle_statistics_end (void *cls,
619                        const struct GNUNET_MessageHeader *msg)
620 {
621   struct GNUNET_STATISTICS_Handle *h = cls;
622   struct GNUNET_STATISTICS_GetHandle *c;
623
624   LOG (GNUNET_ERROR_TYPE_DEBUG,
625        "Received end of statistics marker\n");
626   if (NULL == (c = h->current))
627   {
628     GNUNET_break (0);
629     do_disconnect (h);
630     reconnect_later (h);
631     return;
632   }
633   h->backoff = GNUNET_TIME_UNIT_MILLISECONDS;
634   h->current = NULL;
635   schedule_action (h);
636   if (NULL != c->cont)
637   {
638     c->cont (c->cls,
639              GNUNET_OK);
640     c->cont = NULL;
641   }
642   free_action_item (c);
643 }
644
645
646 /**
647  * Try to (re)connect to the statistics service.
648  *
649  * @param h statistics handle to reconnect
650  * @return #GNUNET_YES on success, #GNUNET_NO on failure.
651  */
652 static int
653 try_connect (struct GNUNET_STATISTICS_Handle *h)
654 {
655   struct GNUNET_MQ_MessageHandler handlers[] = {
656     GNUNET_MQ_hd_fixed_size (disconnect_confirm,
657                              GNUNET_MESSAGE_TYPE_STATISTICS_DISCONNECT_CONFIRM,
658                              struct GNUNET_MessageHeader,
659                              h),
660     GNUNET_MQ_hd_fixed_size (statistics_end,
661                              GNUNET_MESSAGE_TYPE_STATISTICS_END,
662                              struct GNUNET_MessageHeader,
663                              h),
664     GNUNET_MQ_hd_var_size (statistics_value,
665                            GNUNET_MESSAGE_TYPE_STATISTICS_VALUE,
666                            struct GNUNET_STATISTICS_ReplyMessage,
667                            h),
668     GNUNET_MQ_hd_fixed_size (statistics_watch_value,
669                              GNUNET_MESSAGE_TYPE_STATISTICS_WATCH_VALUE,
670                              struct GNUNET_STATISTICS_WatchValueMessage,
671                              h),
672     GNUNET_MQ_handler_end ()
673   };
674   struct GNUNET_STATISTICS_GetHandle *gh;
675   struct GNUNET_STATISTICS_GetHandle *gn;
676
677   if (NULL != h->backoff_task)
678     return GNUNET_NO;
679   if (NULL != h->mq)
680     return GNUNET_YES;
681   h->mq = GNUNET_CLIENT_connect (h->cfg,
682                                  "statistics",
683                                  handlers,
684                                  &mq_error_handler,
685                                  h);
686   if (NULL == h->mq)
687   {
688     LOG (GNUNET_ERROR_TYPE_DEBUG,
689          "Failed to connect to statistics service!\n");
690     return GNUNET_NO;
691   }
692   gn = h->action_head;
693   while (NULL != (gh = gn))
694   {
695     gn = gh->next;
696     if (gh->type == ACTION_WATCH)
697     {
698       GNUNET_CONTAINER_DLL_remove (h->action_head,
699                                    h->action_tail,
700                                    gh);
701       free_action_item (gh);
702     }
703   }
704   for (unsigned int i = 0; i < h->watches_size; i++)
705     if (NULL != h->watches[i])
706       schedule_watch_request (h,
707                               h->watches[i]);
708   return GNUNET_YES;
709 }
710
711
712 /**
713  * We've waited long enough, reconnect now.
714  *
715  * @param cls the `struct GNUNET_STATISTICS_Handle` to reconnect
716  */
717 static void
718 reconnect_task (void *cls)
719 {
720   struct GNUNET_STATISTICS_Handle *h = cls;
721
722   h->backoff_task = NULL;
723   schedule_action (h);
724 }
725
726
727 /**
728  * Reconnect at a later time, respecting back-off.
729  *
730  * @param h statistics handle
731  */
732 static void
733 reconnect_later (struct GNUNET_STATISTICS_Handle *h)
734 {
735   int loss;
736   struct GNUNET_STATISTICS_GetHandle *gh;
737
738   GNUNET_assert (NULL == h->backoff_task);
739   if (GNUNET_YES == h->do_destroy)
740   {
741     /* So we are shutting down and the service is not reachable.
742      * Chances are that it's down for good and we are not going to connect to
743      * it anymore.
744      * Give up and don't sync the rest of the data.
745      */
746     loss = GNUNET_NO;
747     for (gh = h->action_head; NULL != gh; gh = gh->next)
748       if ( (gh->make_persistent) &&
749            (ACTION_SET == gh->type) )
750         loss = GNUNET_YES;
751     if (GNUNET_YES == loss)
752       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
753                   _("Could not save some persistent statistics\n"));
754     if (NULL != h->destroy_task)
755       GNUNET_SCHEDULER_cancel (h->destroy_task);
756     h->destroy_task = GNUNET_SCHEDULER_add_now (&do_destroy,
757                                                 h);
758     return;
759   }
760   h->backoff_task
761     = GNUNET_SCHEDULER_add_delayed (h->backoff,
762                                     &reconnect_task,
763                                     h);
764   h->backoff = GNUNET_TIME_STD_BACKOFF (h->backoff);
765 }
766
767
768
769 /**
770  * Transmit a GET request (and if successful, start to receive
771  * the response).
772  *
773  * @param handle statistics handle
774  */
775 static void
776 transmit_get (struct GNUNET_STATISTICS_Handle *handle)
777 {
778   struct GNUNET_STATISTICS_GetHandle *c;
779   struct GNUNET_MessageHeader *hdr;
780   struct GNUNET_MQ_Envelope *env;
781   size_t slen1;
782   size_t slen2;
783
784   GNUNET_assert (NULL != (c = handle->current));
785   slen1 = strlen (c->subsystem) + 1;
786   slen2 = strlen (c->name) + 1;
787   env = GNUNET_MQ_msg_extra (hdr,
788                              slen1 + slen2,
789                              GNUNET_MESSAGE_TYPE_STATISTICS_GET);
790   GNUNET_assert (slen1 + slen2 ==
791                  GNUNET_STRINGS_buffer_fill ((char *) &hdr[1],
792                                              slen1 + slen2,
793                                              2,
794                                              c->subsystem,
795                                              c->name));
796   GNUNET_MQ_notify_sent (env,
797                          &schedule_action,
798                          handle);
799   GNUNET_MQ_send (handle->mq,
800                   env);
801 }
802
803
804 /**
805  * Transmit a WATCH request (and if successful, start to receive
806  * the response).
807  *
808  * @param handle statistics handle
809  */
810 static void
811 transmit_watch (struct GNUNET_STATISTICS_Handle *handle)
812 {
813   struct GNUNET_MessageHeader *hdr;
814   struct GNUNET_MQ_Envelope *env;
815   size_t slen1;
816   size_t slen2;
817
818   LOG (GNUNET_ERROR_TYPE_DEBUG,
819        "Transmitting watch request for `%s'\n",
820        handle->current->name);
821   slen1 = strlen (handle->current->subsystem) + 1;
822   slen2 = strlen (handle->current->name) + 1;
823   env = GNUNET_MQ_msg_extra (hdr,
824                              slen1 + slen2,
825                              GNUNET_MESSAGE_TYPE_STATISTICS_WATCH);
826   GNUNET_assert (slen1 + slen2 ==
827                  GNUNET_STRINGS_buffer_fill ((char *) &hdr[1],
828                                              slen1 + slen2,
829                                              2,
830                                              handle->current->subsystem,
831                                              handle->current->name));
832   GNUNET_MQ_notify_sent (env,
833                          &schedule_action,
834                          handle);
835   GNUNET_MQ_send (handle->mq,
836                   env);
837   GNUNET_assert (NULL == handle->current->cont);
838   free_action_item (handle->current);
839   handle->current = NULL;
840   schedule_action (handle);
841 }
842
843
844 /**
845  * Transmit a SET/UPDATE request.
846  *
847  * @param handle statistics handle
848  */
849 static void
850 transmit_set (struct GNUNET_STATISTICS_Handle *handle)
851 {
852   struct GNUNET_STATISTICS_SetMessage *r;
853   struct GNUNET_MQ_Envelope *env;
854   size_t slen;
855   size_t nlen;
856
857   slen = strlen (handle->current->subsystem) + 1;
858   nlen = strlen (handle->current->name) + 1;
859   env = GNUNET_MQ_msg_extra (r,
860                              slen + nlen,
861                              GNUNET_MESSAGE_TYPE_STATISTICS_SET);
862   r->flags = 0;
863   r->value = GNUNET_htonll (handle->current->value);
864   if (handle->current->make_persistent)
865     r->flags |= htonl (GNUNET_STATISTICS_SETFLAG_PERSISTENT);
866   if (handle->current->type == ACTION_UPDATE)
867     r->flags |= htonl (GNUNET_STATISTICS_SETFLAG_RELATIVE);
868   GNUNET_assert (slen + nlen ==
869                  GNUNET_STRINGS_buffer_fill ((char *) &r[1],
870                                              slen + nlen,
871                                              2,
872                                              handle->current->subsystem,
873                                              handle->current->name));
874   GNUNET_assert (NULL == handle->current->cont);
875   free_action_item (handle->current);
876   handle->current = NULL;
877   update_memory_statistics (handle);
878   GNUNET_MQ_notify_sent (env,
879                          &schedule_action,
880                          handle);
881   GNUNET_MQ_send (handle->mq,
882                   env);
883 }
884
885
886 /**
887  * Get handle for the statistics service.
888  *
889  * @param subsystem name of subsystem using the service
890  * @param cfg services configuration in use
891  * @return handle to use
892  */
893 struct GNUNET_STATISTICS_Handle *
894 GNUNET_STATISTICS_create (const char *subsystem,
895                           const struct GNUNET_CONFIGURATION_Handle *cfg)
896 {
897   struct GNUNET_STATISTICS_Handle *h;
898
899   if (GNUNET_YES ==
900       GNUNET_CONFIGURATION_get_value_yesno (cfg,
901                                             "statistics",
902                                             "DISABLE"))
903     return NULL;
904   h = GNUNET_new (struct GNUNET_STATISTICS_Handle);
905   h->cfg = cfg;
906   h->subsystem = GNUNET_strdup (subsystem);
907   h->backoff = GNUNET_TIME_UNIT_MILLISECONDS;
908   return h;
909 }
910
911
912 /**
913  * Destroy a handle (free all state associated with
914  * it).
915  *
916  * @param h statistics handle to destroy
917  * @param sync_first set to #GNUNET_YES if pending SET requests should
918  *        be completed
919  */
920 void
921 GNUNET_STATISTICS_destroy (struct GNUNET_STATISTICS_Handle *h,
922                            int sync_first)
923 {
924   struct GNUNET_STATISTICS_GetHandle *pos;
925   struct GNUNET_STATISTICS_GetHandle *next;
926
927   if (NULL == h)
928     return;
929   GNUNET_assert (GNUNET_NO == h->do_destroy); /* Don't call twice. */
930   if ( (sync_first) &&
931        (NULL != h->mq) &&
932        (0 != GNUNET_MQ_get_length (h->mq)) )
933   {
934     if ( (NULL != h->current) &&
935          (ACTION_GET == h->current->type) )
936       h->current->aborted = GNUNET_YES;
937     next = h->action_head;
938     while (NULL != (pos = next))
939     {
940       next = pos->next;
941       if ( (ACTION_GET == pos->type) ||
942            (ACTION_WATCH == pos->type) )
943       {
944         GNUNET_CONTAINER_DLL_remove (h->action_head,
945                                      h->action_tail,
946                                      pos);
947         free_action_item (pos);
948       }
949     }
950     h->do_destroy = GNUNET_YES;
951     schedule_action (h);
952     GNUNET_assert (NULL == h->destroy_task);
953     h->destroy_task
954       = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply (h->backoff,
955                                                                      5),
956                                       &do_destroy,
957                                       h);
958     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
959                 "Deferring destruction\n");
960     return; /* do not finish destruction just yet */
961   }
962   /* do clean up all */
963   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
964               "Cleaning all up\n");
965   while (NULL != (pos = h->action_head))
966   {
967     GNUNET_CONTAINER_DLL_remove (h->action_head,
968                                  h->action_tail,
969                                  pos);
970     free_action_item (pos);
971   }
972   do_disconnect (h);
973   if (NULL != h->backoff_task)
974   {
975     GNUNET_SCHEDULER_cancel (h->backoff_task);
976     h->backoff_task = NULL;
977   }
978   if (NULL != h->destroy_task)
979   {
980     GNUNET_break (0);
981     GNUNET_SCHEDULER_cancel (h->destroy_task);
982     h->destroy_task = NULL;
983   }
984   for (unsigned int i = 0; i < h->watches_size; i++)
985   {
986     if (NULL == h->watches[i])
987       continue;
988     GNUNET_free (h->watches[i]->subsystem);
989     GNUNET_free (h->watches[i]->name);
990     GNUNET_free (h->watches[i]);
991   }
992   GNUNET_array_grow (h->watches,
993                      h->watches_size,
994                      0);
995   GNUNET_free (h->subsystem);
996   GNUNET_free (h);
997 }
998
999
1000 /**
1001  * Schedule the next action to be performed.
1002  *
1003  * @param cls statistics handle
1004  */
1005 static void
1006 schedule_action (void *cls)
1007 {
1008   struct GNUNET_STATISTICS_Handle *h = cls;
1009
1010   if (NULL != h->backoff_task)
1011     return;                     /* action already pending */
1012   if (GNUNET_YES != try_connect (h))
1013   {
1014     reconnect_later (h);
1015     return;
1016   }
1017   if (0 < GNUNET_MQ_get_length (h->mq))
1018     return; /* Wait for queue to be reduced more */    
1019   /* schedule next action */
1020   while (NULL == h->current)
1021   {
1022     h->current = h->action_head;
1023     if (NULL == h->current)
1024     {
1025       struct GNUNET_MessageHeader *hdr;
1026       struct GNUNET_MQ_Envelope *env;
1027
1028       if (GNUNET_YES != h->do_destroy)
1029         return; /* nothing to do */
1030       /* let service know that we're done */
1031       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1032                   "Notifying service that we are done\n");
1033       h->do_destroy = GNUNET_SYSERR; /* in 'TEST' mode */
1034       env = GNUNET_MQ_msg (hdr,
1035                            GNUNET_MESSAGE_TYPE_STATISTICS_DISCONNECT);
1036       GNUNET_MQ_notify_sent (env,
1037                              &schedule_action,
1038                              h);
1039       GNUNET_MQ_send (h->mq,
1040                       env);
1041       return;
1042     }
1043     GNUNET_CONTAINER_DLL_remove (h->action_head,
1044                                  h->action_tail,
1045                                  h->current);
1046     switch (h->current->type)
1047     {
1048     case ACTION_GET:
1049       transmit_get (h);
1050       break;
1051     case ACTION_SET:
1052     case ACTION_UPDATE:
1053       transmit_set (h);
1054       break;
1055     case ACTION_WATCH:
1056       transmit_watch (h);
1057       break;
1058     default:
1059       GNUNET_assert (0);
1060       break;
1061     }
1062   }
1063 }
1064
1065
1066 /**
1067  * Get statistic from the peer.
1068  *
1069  * @param handle identification of the statistics service
1070  * @param subsystem limit to the specified subsystem, NULL for our subsystem
1071  * @param name name of the statistic value, NULL for all values
1072  * @param cont continuation to call when done (can be NULL)
1073  *        This callback CANNOT destroy the statistics handle in the same call.
1074  * @param proc function to call on each value
1075  * @param cls closure for @a cont and @a proc
1076  * @return NULL on error
1077  */
1078 struct GNUNET_STATISTICS_GetHandle *
1079 GNUNET_STATISTICS_get (struct GNUNET_STATISTICS_Handle *handle,
1080                        const char *subsystem,
1081                        const char *name,
1082                        GNUNET_STATISTICS_Callback cont,
1083                        GNUNET_STATISTICS_Iterator proc,
1084                        void *cls)
1085 {
1086   size_t slen1;
1087   size_t slen2;
1088   struct GNUNET_STATISTICS_GetHandle *ai;
1089
1090   if (NULL == handle)
1091     return NULL;
1092   GNUNET_assert (NULL != proc);
1093   GNUNET_assert (GNUNET_NO == handle->do_destroy);
1094   if (NULL == subsystem)
1095     subsystem = "";
1096   if (NULL == name)
1097     name = "";
1098   slen1 = strlen (subsystem) + 1;
1099   slen2 = strlen (name) + 1;
1100   GNUNET_assert (slen1 + slen2 + sizeof (struct GNUNET_MessageHeader) <
1101                  GNUNET_MAX_MESSAGE_SIZE);
1102   ai = GNUNET_new (struct GNUNET_STATISTICS_GetHandle);
1103   ai->sh = handle;
1104   ai->subsystem = GNUNET_strdup (subsystem);
1105   ai->name = GNUNET_strdup (name);
1106   ai->cont = cont;
1107   ai->proc = proc;
1108   ai->cls = cls;
1109   ai->type = ACTION_GET;
1110   ai->msize = slen1 + slen2 + sizeof (struct GNUNET_MessageHeader);
1111   GNUNET_CONTAINER_DLL_insert_tail (handle->action_head,
1112                                     handle->action_tail,
1113                                     ai);
1114   schedule_action (handle);
1115   return ai;
1116 }
1117
1118
1119 /**
1120  * Cancel a 'get' request.  Must be called before the 'cont'
1121  * function is called.
1122  *
1123  * @param gh handle of the request to cancel
1124  */
1125 void
1126 GNUNET_STATISTICS_get_cancel (struct GNUNET_STATISTICS_GetHandle *gh)
1127 {
1128   if (NULL == gh)
1129     return;
1130   gh->cont = NULL;
1131   if (gh->sh->current == gh)
1132   {
1133     gh->aborted = GNUNET_YES;
1134     return;
1135   }
1136   GNUNET_CONTAINER_DLL_remove (gh->sh->action_head,
1137                                gh->sh->action_tail,
1138                                gh);
1139   GNUNET_free (gh->name);
1140   GNUNET_free (gh->subsystem);
1141   GNUNET_free (gh);
1142 }
1143
1144
1145 /**
1146  * Watch statistics from the peer (be notified whenever they change).
1147  *
1148  * @param handle identification of the statistics service
1149  * @param subsystem limit to the specified subsystem, never NULL
1150  * @param name name of the statistic value, never NULL
1151  * @param proc function to call on each value
1152  * @param proc_cls closure for @a proc
1153  * @return #GNUNET_OK on success, #GNUNET_SYSERR on error
1154  */
1155 int
1156 GNUNET_STATISTICS_watch (struct GNUNET_STATISTICS_Handle *handle,
1157                          const char *subsystem,
1158                          const char *name,
1159                          GNUNET_STATISTICS_Iterator proc,
1160                          void *proc_cls)
1161 {
1162   struct GNUNET_STATISTICS_WatchEntry *w;
1163
1164   if (NULL == handle)
1165     return GNUNET_SYSERR;
1166   w = GNUNET_new (struct GNUNET_STATISTICS_WatchEntry);
1167   w->subsystem = GNUNET_strdup (subsystem);
1168   w->name = GNUNET_strdup (name);
1169   w->proc = proc;
1170   w->proc_cls = proc_cls;
1171   GNUNET_array_append (handle->watches,
1172                        handle->watches_size,
1173                        w);
1174   schedule_watch_request (handle,
1175                           w);
1176   return GNUNET_OK;
1177 }
1178
1179
1180 /**
1181  * Stop watching statistics from the peer.
1182  *
1183  * @param handle identification of the statistics service
1184  * @param subsystem limit to the specified subsystem, never NULL
1185  * @param name name of the statistic value, never NULL
1186  * @param proc function to call on each value
1187  * @param proc_cls closure for @a proc
1188  * @return #GNUNET_OK on success, #GNUNET_SYSERR on error (no such watch)
1189  */
1190 int
1191 GNUNET_STATISTICS_watch_cancel (struct GNUNET_STATISTICS_Handle *handle,
1192                                 const char *subsystem,
1193                                 const char *name,
1194                                 GNUNET_STATISTICS_Iterator proc,
1195                                 void *proc_cls)
1196 {
1197   struct GNUNET_STATISTICS_WatchEntry *w;
1198
1199   if (NULL == handle)
1200     return GNUNET_SYSERR;
1201   for (unsigned int i=0;i<handle->watches_size;i++)
1202   {
1203     w = handle->watches[i];
1204     if (NULL == w)
1205       continue;
1206     if ( (w->proc == proc) &&
1207          (w->proc_cls == proc_cls) &&
1208          (0 == strcmp (w->name,
1209                        name)) &&
1210          (0 == strcmp (w->subsystem,
1211                        subsystem)) )
1212     {
1213       GNUNET_free (w->name);
1214       GNUNET_free (w->subsystem);
1215       GNUNET_free (w);
1216       handle->watches[i] = NULL;
1217       return GNUNET_OK;
1218     }
1219   }
1220   return GNUNET_SYSERR;
1221 }
1222
1223
1224 /**
1225  * Queue a request to change a statistic.
1226  *
1227  * @param h statistics handle
1228  * @param name name of the value
1229  * @param make_persistent  should the value be kept across restarts?
1230  * @param value new value or change
1231  * @param type type of the action (#ACTION_SET or #ACTION_UPDATE)
1232  */
1233 static void
1234 add_setter_action (struct GNUNET_STATISTICS_Handle *h,
1235                    const char *name,
1236                    int make_persistent,
1237                    uint64_t value,
1238                    enum ActionType type)
1239 {
1240   struct GNUNET_STATISTICS_GetHandle *ai;
1241   size_t slen;
1242   size_t nlen;
1243   size_t nsize;
1244   int64_t delta;
1245
1246   slen = strlen (h->subsystem) + 1;
1247   nlen = strlen (name) + 1;
1248   nsize = sizeof (struct GNUNET_STATISTICS_SetMessage) + slen + nlen;
1249   if (nsize >= GNUNET_MAX_MESSAGE_SIZE)
1250   {
1251     GNUNET_break (0);
1252     return;
1253   }
1254   for (ai = h->action_head; NULL != ai; ai = ai->next)
1255   {
1256     if (! ( (0 == strcmp (ai->subsystem,
1257                           h->subsystem)) &&
1258             (0 == strcmp (ai->name,
1259                           name)) &&
1260             ( (ACTION_UPDATE == ai->type) ||
1261               (ACTION_SET == ai->type) ) ) )
1262       continue;
1263     if (ACTION_SET == ai->type)
1264     {
1265       if (ACTION_UPDATE == type)
1266       {
1267         delta = (int64_t) value;
1268         if (delta > 0)
1269         {
1270           /* update old set by new delta */
1271           ai->value += delta;
1272         }
1273         else
1274         {
1275           /* update old set by new delta, but never go negative */
1276           if (ai->value < -delta)
1277             ai->value = 0;
1278           else
1279             ai->value += delta;
1280         }
1281       }
1282       else
1283       {
1284         /* new set overrides old set */
1285         ai->value = value;
1286       }
1287     }
1288     else
1289     {
1290       if (ACTION_UPDATE == type)
1291       {
1292         /* make delta cummulative */
1293         delta = (int64_t) value;
1294         ai->value += delta;
1295       }
1296       else
1297       {
1298         /* drop old 'update', use new 'set' instead */
1299         ai->value = value;
1300         ai->type = type;
1301       }
1302     }
1303     ai->timeout
1304       = GNUNET_TIME_relative_to_absolute (SET_TRANSMIT_TIMEOUT);
1305     ai->make_persistent
1306       = make_persistent;
1307     return;
1308   }
1309   /* no existing entry matches, create a fresh one */
1310   ai = GNUNET_new (struct GNUNET_STATISTICS_GetHandle);
1311   ai->sh = h;
1312   ai->subsystem = GNUNET_strdup (h->subsystem);
1313   ai->name = GNUNET_strdup (name);
1314   ai->timeout = GNUNET_TIME_relative_to_absolute (SET_TRANSMIT_TIMEOUT);
1315   ai->make_persistent = make_persistent;
1316   ai->msize = nsize;
1317   ai->value = value;
1318   ai->type = type;
1319   GNUNET_CONTAINER_DLL_insert_tail (h->action_head,
1320                                     h->action_tail,
1321                                     ai);
1322   schedule_action (h);
1323 }
1324
1325
1326 /**
1327  * Set statistic value for the peer.  Will always use our
1328  * subsystem (the argument used when "handle" was created).
1329  *
1330  * @param handle identification of the statistics service
1331  * @param name name of the statistic value
1332  * @param value new value to set
1333  * @param make_persistent should the value be kept across restarts?
1334  */
1335 void
1336 GNUNET_STATISTICS_set (struct GNUNET_STATISTICS_Handle *handle,
1337                        const char *name,
1338                        uint64_t value,
1339                        int make_persistent)
1340 {
1341   if (NULL == handle)
1342     return;
1343   GNUNET_assert (GNUNET_NO == handle->do_destroy);
1344   add_setter_action (handle,
1345                      name,
1346                      make_persistent,
1347                      value,
1348                      ACTION_SET);
1349 }
1350
1351
1352 /**
1353  * Set statistic value for the peer.  Will always use our
1354  * subsystem (the argument used when "handle" was created).
1355  *
1356  * @param handle identification of the statistics service
1357  * @param name name of the statistic value
1358  * @param delta change in value (added to existing value)
1359  * @param make_persistent should the value be kept across restarts?
1360  */
1361 void
1362 GNUNET_STATISTICS_update (struct GNUNET_STATISTICS_Handle *handle,
1363                           const char *name,
1364                           int64_t delta,
1365                           int make_persistent)
1366 {
1367   if (NULL == handle)
1368     return;
1369   if (0 == delta)
1370     return;
1371   GNUNET_assert (GNUNET_NO == handle->do_destroy);
1372   add_setter_action (handle,
1373                      name,
1374                      make_persistent,
1375                      (uint64_t) delta,
1376                      ACTION_UPDATE);
1377 }
1378
1379
1380 /* end of statistics_api.c */