error handling
[oweals/gnunet.git] / src / statistics / statistics_api.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2009, 2010, 2011, 2016 GNUnet e.V.
4
5      GNUnet is free software: you can redistribute it and/or modify it
6      under the terms of the GNU Affero General Public License as published
7      by the Free Software Foundation, either version 3 of the License,
8      or (at your option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      Affero General Public License for more details.
14
15      You should have received a copy of the GNU Affero General Public License
16      along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18      SPDX-License-Identifier: AGPL3.0-or-later
19  */
20
21 /**
22  * @file statistics/statistics_api.c
23  * @brief API of the statistics service
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include "gnunet_util_lib.h"
28 #include "gnunet_constants.h"
29 #include "gnunet_protocols.h"
30 #include "gnunet_statistics_service.h"
31 #include "statistics.h"
32
33 /**
34  * How long do we wait until a statistics request for setting
35  * a value times out?  (The update will be lost if the
36  * service does not react within this timeframe).
37  */
38 #define SET_TRANSMIT_TIMEOUT GNUNET_TIME_relative_multiply ( \
39     GNUNET_TIME_UNIT_SECONDS, 2)
40
41 #define LOG(kind, ...) GNUNET_log_from (kind, "statistics-api", __VA_ARGS__)
42
43 /**
44  * Types of actions.
45  */
46 enum ActionType
47 {
48   /**
49    * Get a value.
50    */
51   ACTION_GET,
52
53   /**
54    * Set a value.
55    */
56   ACTION_SET,
57
58   /**
59    * Update a value.
60    */
61   ACTION_UPDATE,
62
63   /**
64    * Watch a value.
65    */
66   ACTION_WATCH
67 };
68
69
70 /**
71  * Entry kept for each value we are watching.
72  */
73 struct GNUNET_STATISTICS_WatchEntry
74 {
75   /**
76    * What subsystem is this action about? (never NULL)
77    */
78   char *subsystem;
79
80   /**
81    * What value is this action about? (never NULL)
82    */
83   char *name;
84
85   /**
86    * Function to call
87    */
88   GNUNET_STATISTICS_Iterator proc;
89
90   /**
91    * Closure for @e proc
92    */
93   void *proc_cls;
94 };
95
96
97 /**
98  * Linked list of things we still need to do.
99  */
100 struct GNUNET_STATISTICS_GetHandle
101 {
102   /**
103    * This is a doubly linked list.
104    */
105   struct GNUNET_STATISTICS_GetHandle *next;
106
107   /**
108    * This is a doubly linked list.
109    */
110   struct GNUNET_STATISTICS_GetHandle *prev;
111
112   /**
113    * Main statistics handle.
114    */
115   struct GNUNET_STATISTICS_Handle *sh;
116
117   /**
118    * What subsystem is this action about? (can be NULL)
119    */
120   char *subsystem;
121
122   /**
123    * What value is this action about? (can be NULL)
124    */
125   char *name;
126
127   /**
128    * Continuation to call once action is complete.
129    */
130   GNUNET_STATISTICS_Callback cont;
131
132   /**
133    * Function to call (for GET actions only).
134    */
135   GNUNET_STATISTICS_Iterator proc;
136
137   /**
138    * Closure for @e proc and @e cont.
139    */
140   void *cls;
141
142   /**
143    * Timeout for this action.
144    */
145   struct GNUNET_TIME_Absolute timeout;
146
147   /**
148    * Associated value.
149    */
150   uint64_t value;
151
152   /**
153    * Flag for SET/UPDATE actions.
154    */
155   int make_persistent;
156
157   /**
158    * Has the current iteration been aborted; for GET actions.
159    */
160   int aborted;
161
162   /**
163    * Is this a #ACTION_GET, #ACTION_SET, #ACTION_UPDATE or #ACTION_WATCH?
164    */
165   enum ActionType type;
166
167   /**
168    * Size of the message that we will be transmitting.
169    */
170   uint16_t msize;
171 };
172
173
174 /**
175  * Handle for the service.
176  */
177 struct GNUNET_STATISTICS_Handle
178 {
179   /**
180    * Name of our subsystem.
181    */
182   char *subsystem;
183
184   /**
185    * Configuration to use.
186    */
187   const struct GNUNET_CONFIGURATION_Handle *cfg;
188
189   /**
190    * Message queue to the service.
191    */
192   struct GNUNET_MQ_Handle *mq;
193
194   /**
195    * Head of the linked list of pending actions (first action
196    * to be performed).
197    */
198   struct GNUNET_STATISTICS_GetHandle *action_head;
199
200   /**
201    * Tail of the linked list of actions (for fast append).
202    */
203   struct GNUNET_STATISTICS_GetHandle *action_tail;
204
205   /**
206    * Action we are currently busy with (action request has been
207    * transmitted, we're now receiving the response from the
208    * service).
209    */
210   struct GNUNET_STATISTICS_GetHandle *current;
211
212   /**
213    * Array of watch entries.
214    */
215   struct GNUNET_STATISTICS_WatchEntry **watches;
216
217   /**
218    * Task doing exponential back-off trying to reconnect.
219    */
220   struct GNUNET_SCHEDULER_Task *backoff_task;
221
222   /**
223    * Task for running #do_destroy().
224    */
225   struct GNUNET_SCHEDULER_Task *destroy_task;
226
227   /**
228    * Time for next connect retry.
229    */
230   struct GNUNET_TIME_Relative backoff;
231
232   /**
233    * Maximum heap size observed so far (if available).
234    */
235   uint64_t peak_heap_size;
236
237   /**
238    * Maximum resident set side observed so far (if available).
239    */
240   uint64_t peak_rss;
241
242   /**
243    * Size of the @e watches array.
244    */
245   unsigned int watches_size;
246
247   /**
248    * Should this handle auto-destruct once all actions have
249    * been processed?
250    */
251   int do_destroy;
252
253   /**
254    * Are we currently receiving from the service?
255    */
256   int receiving;
257 };
258
259
260 /**
261  * Obtain statistics about this process's memory consumption and
262  * report those as well (if they changed).
263  */
264 static void
265 update_memory_statistics (struct GNUNET_STATISTICS_Handle *h)
266 {
267 #if ENABLE_HEAP_STATISTICS
268   uint64_t current_heap_size = 0;
269   uint64_t current_rss = 0;
270
271   if (GNUNET_NO != h->do_destroy)
272     return;
273 #if HAVE_MALLINFO
274   {
275     struct mallinfo mi;
276
277     mi = mallinfo ();
278     current_heap_size = mi.uordblks + mi.fordblks;
279   }
280 #endif
281 #if HAVE_GETRUSAGE
282   {
283     struct rusage ru;
284
285     if (0 == getrusage (RUSAGE_SELF, &ru))
286     {
287       current_rss = 1024LL * ru.ru_maxrss;
288     }
289   }
290 #endif
291   if (current_heap_size > h->peak_heap_size)
292   {
293     h->peak_heap_size = current_heap_size;
294     GNUNET_STATISTICS_set (h,
295                            "# peak heap size",
296                            current_heap_size,
297                            GNUNET_NO);
298   }
299   if (current_rss > h->peak_rss)
300   {
301     h->peak_rss = current_rss;
302     GNUNET_STATISTICS_set (h,
303                            "# peak resident set size",
304                            current_rss,
305                            GNUNET_NO);
306   }
307 #endif
308 }
309
310
311 /**
312  * Reconnect at a later time, respecting back-off.
313  *
314  * @param h statistics handle
315  */
316 static void
317 reconnect_later (struct GNUNET_STATISTICS_Handle *h);
318
319
320 /**
321  * Schedule the next action to be performed.
322  *
323  * @param cls statistics handle to reconnect
324  */
325 static void
326 schedule_action (void *cls);
327
328
329 /**
330  * Transmit request to service that we want to watch
331  * the development of a particular value.
332  *
333  * @param h statistics handle
334  * @param watch watch entry of the value to watch
335  */
336 static void
337 schedule_watch_request (struct GNUNET_STATISTICS_Handle *h,
338                         struct GNUNET_STATISTICS_WatchEntry *watch)
339 {
340   struct GNUNET_STATISTICS_GetHandle *ai;
341   size_t slen;
342   size_t nlen;
343   size_t nsize;
344
345   slen = strlen (watch->subsystem) + 1;
346   nlen = strlen (watch->name) + 1;
347   nsize = sizeof(struct GNUNET_MessageHeader) + slen + nlen;
348   if (nsize >= GNUNET_MAX_MESSAGE_SIZE)
349   {
350     GNUNET_break (0);
351     return;
352   }
353   ai = GNUNET_new (struct GNUNET_STATISTICS_GetHandle);
354   ai->sh = h;
355   ai->subsystem = GNUNET_strdup (watch->subsystem);
356   ai->name = GNUNET_strdup (watch->name);
357   ai->timeout = GNUNET_TIME_UNIT_FOREVER_ABS;
358   ai->msize = nsize;
359   ai->type = ACTION_WATCH;
360   ai->proc = watch->proc;
361   ai->cls = watch->proc_cls;
362   GNUNET_CONTAINER_DLL_insert_tail (h->action_head,
363                                     h->action_tail,
364                                     ai);
365   schedule_action (h);
366 }
367
368
369 /**
370  * Free memory associated with the given action item.
371  *
372  * @param gh action item to free
373  */
374 static void
375 free_action_item (struct GNUNET_STATISTICS_GetHandle *gh)
376 {
377   GNUNET_free_non_null (gh->subsystem);
378   GNUNET_free_non_null (gh->name);
379   GNUNET_free (gh);
380 }
381
382
383 /**
384  * Disconnect from the statistics service.
385  *
386  * @param h statistics handle to disconnect from
387  */
388 static void
389 do_disconnect (struct GNUNET_STATISTICS_Handle *h)
390 {
391   struct GNUNET_STATISTICS_GetHandle *c;
392
393   h->receiving = GNUNET_NO;
394   if (NULL != (c = h->current))
395   {
396     h->current = NULL;
397     if ((NULL != c->cont) &&
398         (GNUNET_YES != c->aborted))
399     {
400       c->cont (c->cls,
401                GNUNET_SYSERR);
402       c->cont = NULL;
403     }
404     free_action_item (c);
405   }
406   if (NULL != h->mq)
407   {
408     GNUNET_MQ_destroy (h->mq);
409     h->mq = NULL;
410   }
411 }
412
413
414 /**
415  * Process a #GNUNET_MESSAGE_TYPE_STATISTICS_VALUE message.
416  *
417  * @param cls statistics handle
418  * @param smsg message received from the service, never NULL
419  * @return #GNUNET_OK if the message was well-formed
420  */
421 static int
422 check_statistics_value (void *cls,
423                         const struct GNUNET_STATISTICS_ReplyMessage *smsg)
424 {
425   const char *service;
426   const char *name;
427   uint16_t size;
428
429   size = ntohs (smsg->header.size);
430   size -= sizeof(struct GNUNET_STATISTICS_ReplyMessage);
431   if (size !=
432       GNUNET_STRINGS_buffer_tokenize ((const char *) &smsg[1],
433                                       size,
434                                       2,
435                                       &service,
436                                       &name))
437   {
438     GNUNET_break (0);
439     return GNUNET_SYSERR;
440   }
441   return GNUNET_OK;
442 }
443
444
445 /**
446  * Process a #GNUNET_MESSAGE_TYPE_STATISTICS_VALUE message.
447  *
448  * @param cls statistics handle
449  * @param msg message received from the service, never NULL
450  * @return #GNUNET_OK if the message was well-formed
451  */
452 static void
453 handle_statistics_value (void *cls,
454                          const struct GNUNET_STATISTICS_ReplyMessage *smsg)
455 {
456   struct GNUNET_STATISTICS_Handle *h = cls;
457   const char *service;
458   const char *name;
459   uint16_t size;
460
461   if (h->current->aborted)
462     return;           /* iteration aborted, don't bother */
463
464   size = ntohs (smsg->header.size);
465   size -= sizeof(struct GNUNET_STATISTICS_ReplyMessage);
466   GNUNET_assert (size ==
467                  GNUNET_STRINGS_buffer_tokenize ((const char *) &smsg[1],
468                                                  size,
469                                                  2,
470                                                  &service,
471                                                  &name));
472   LOG (GNUNET_ERROR_TYPE_DEBUG,
473        "Received valid statistic on `%s:%s': %llu\n",
474        service, name,
475        GNUNET_ntohll (smsg->value));
476   if (GNUNET_OK !=
477       h->current->proc (h->current->cls,
478                         service,
479                         name,
480                         GNUNET_ntohll (smsg->value),
481                         (0 !=
482                          (ntohl (smsg->uid) & GNUNET_STATISTICS_PERSIST_BIT)) ))
483   {
484     LOG (GNUNET_ERROR_TYPE_DEBUG,
485          "Processing of remaining statistics aborted by client.\n");
486     h->current->aborted = GNUNET_YES;
487   }
488 }
489
490
491 /**
492  * We have received a watch value from the service.  Process it.
493  *
494  * @param cls statistics handle
495  * @param msg the watch value message
496  */
497 static void
498 handle_statistics_watch_value (void *cls,
499                                const struct
500                                GNUNET_STATISTICS_WatchValueMessage *wvm)
501 {
502   struct GNUNET_STATISTICS_Handle *h = cls;
503   struct GNUNET_STATISTICS_WatchEntry *w;
504   uint32_t wid;
505
506   GNUNET_break (0 == ntohl (wvm->reserved));
507   wid = ntohl (wvm->wid);
508   if (wid >= h->watches_size)
509   {
510     do_disconnect (h);
511     reconnect_later (h);
512     return;
513   }
514   w = h->watches[wid];
515   if (NULL == w)
516     return;
517   (void) w->proc (w->proc_cls,
518                   w->subsystem,
519                   w->name,
520                   GNUNET_ntohll (wvm->value),
521                   0 != (ntohl (wvm->flags) & GNUNET_STATISTICS_PERSIST_BIT));
522 }
523
524
525 /**
526  * Generic error handler, called with the appropriate error code and
527  * the same closure specified at the creation of the message queue.
528  * Not every message queue implementation supports an error handler.
529  *
530  * @param cls closure with the `struct GNUNET_STATISTICS_Handle *`
531  * @param error error code
532  */
533 static void
534 mq_error_handler (void *cls,
535                   enum GNUNET_MQ_Error error)
536 {
537   struct GNUNET_STATISTICS_Handle *h = cls;
538
539   if (GNUNET_NO != h->do_destroy)
540   {
541     h->do_destroy = GNUNET_NO;
542     if (NULL != h->destroy_task)
543     {
544       GNUNET_SCHEDULER_cancel (h->destroy_task);
545       h->destroy_task = NULL;
546     }
547     GNUNET_STATISTICS_destroy (h,
548                                GNUNET_NO);
549     return;
550   }
551   do_disconnect (h);
552   reconnect_later (h);
553 }
554
555
556 /**
557  * Task used to destroy the statistics handle.
558  *
559  * @param cls the `struct GNUNET_STATISTICS_Handle`
560  */
561 static void
562 do_destroy (void *cls)
563 {
564   struct GNUNET_STATISTICS_Handle *h = cls;
565
566   h->destroy_task = NULL;
567   h->do_destroy = GNUNET_NO;
568   LOG (GNUNET_ERROR_TYPE_DEBUG,
569        "Running final destruction\n");
570   GNUNET_STATISTICS_destroy (h,
571                              GNUNET_NO);
572 }
573
574
575 /**
576  * Handle a #GNUNET_MESSAGE_TYPE_STATISTICS_DISCONNECT_CONFIRM
577  * message. We receive this message at the end of the shutdown when
578  * the service confirms that all data has been written to disk.
579  *
580  * @param cls our `struct GNUNET_STATISTICS_Handle *`
581  * @param msg the message
582  */
583 static void
584 handle_disconnect_confirm (void *cls,
585                            const struct GNUNET_MessageHeader *msg)
586 {
587   struct GNUNET_STATISTICS_Handle *h = cls;
588
589   if (GNUNET_SYSERR != h->do_destroy)
590   {
591     /* not in shutdown, why do we get 'TEST'? */
592     GNUNET_break (0);
593     do_disconnect (h);
594     reconnect_later (h);
595     return;
596   }
597   LOG (GNUNET_ERROR_TYPE_DEBUG,
598        "Received DISCONNNECT_CONFIRM message from statistics, can complete disconnect\n");
599   if (NULL != h->destroy_task)
600     GNUNET_SCHEDULER_cancel (h->destroy_task);
601   h->destroy_task = GNUNET_SCHEDULER_add_now (&do_destroy,
602                                               h);
603 }
604
605
606 /**
607  * Handle a #GNUNET_MESSAGE_TYPE_STATISTICS_END message. We receive
608  * this message in response to a query to indicate that there are no
609  * further matching results.
610  *
611  * @param cls our `struct GNUNET_STATISTICS_Handle *`
612  * @param msg the message
613  */
614 static void
615 handle_statistics_end (void *cls,
616                        const struct GNUNET_MessageHeader *msg)
617 {
618   struct GNUNET_STATISTICS_Handle *h = cls;
619   struct GNUNET_STATISTICS_GetHandle *c;
620
621   LOG (GNUNET_ERROR_TYPE_DEBUG,
622        "Received end of statistics marker\n");
623   if (NULL == (c = h->current))
624   {
625     GNUNET_break (0);
626     do_disconnect (h);
627     reconnect_later (h);
628     return;
629   }
630   h->backoff = GNUNET_TIME_UNIT_MILLISECONDS;
631   h->current = NULL;
632   schedule_action (h);
633   if (NULL != c->cont)
634   {
635     c->cont (c->cls,
636              GNUNET_OK);
637     c->cont = NULL;
638   }
639   free_action_item (c);
640 }
641
642
643 /**
644  * Try to (re)connect to the statistics service.
645  *
646  * @param h statistics handle to reconnect
647  * @return #GNUNET_YES on success, #GNUNET_NO on failure.
648  */
649 static int
650 try_connect (struct GNUNET_STATISTICS_Handle *h)
651 {
652   struct GNUNET_MQ_MessageHandler handlers[] = {
653     GNUNET_MQ_hd_fixed_size (disconnect_confirm,
654                              GNUNET_MESSAGE_TYPE_STATISTICS_DISCONNECT_CONFIRM,
655                              struct GNUNET_MessageHeader,
656                              h),
657     GNUNET_MQ_hd_fixed_size (statistics_end,
658                              GNUNET_MESSAGE_TYPE_STATISTICS_END,
659                              struct GNUNET_MessageHeader,
660                              h),
661     GNUNET_MQ_hd_var_size (statistics_value,
662                            GNUNET_MESSAGE_TYPE_STATISTICS_VALUE,
663                            struct GNUNET_STATISTICS_ReplyMessage,
664                            h),
665     GNUNET_MQ_hd_fixed_size (statistics_watch_value,
666                              GNUNET_MESSAGE_TYPE_STATISTICS_WATCH_VALUE,
667                              struct GNUNET_STATISTICS_WatchValueMessage,
668                              h),
669     GNUNET_MQ_handler_end ()
670   };
671   struct GNUNET_STATISTICS_GetHandle *gh;
672   struct GNUNET_STATISTICS_GetHandle *gn;
673
674   if (NULL != h->backoff_task)
675     return GNUNET_NO;
676   if (NULL != h->mq)
677     return GNUNET_YES;
678   h->mq = GNUNET_CLIENT_connect (h->cfg,
679                                  "statistics",
680                                  handlers,
681                                  &mq_error_handler,
682                                  h);
683   if (NULL == h->mq)
684   {
685     LOG (GNUNET_ERROR_TYPE_DEBUG,
686          "Failed to connect to statistics service!\n");
687     return GNUNET_NO;
688   }
689   gn = h->action_head;
690   while (NULL != (gh = gn))
691   {
692     gn = gh->next;
693     if (gh->type == ACTION_WATCH)
694     {
695       GNUNET_CONTAINER_DLL_remove (h->action_head,
696                                    h->action_tail,
697                                    gh);
698       free_action_item (gh);
699     }
700   }
701   for (unsigned int i = 0; i < h->watches_size; i++)
702     if (NULL != h->watches[i])
703       schedule_watch_request (h,
704                               h->watches[i]);
705   return GNUNET_YES;
706 }
707
708
709 /**
710  * We've waited long enough, reconnect now.
711  *
712  * @param cls the `struct GNUNET_STATISTICS_Handle` to reconnect
713  */
714 static void
715 reconnect_task (void *cls)
716 {
717   struct GNUNET_STATISTICS_Handle *h = cls;
718
719   h->backoff_task = NULL;
720   schedule_action (h);
721 }
722
723
724 /**
725  * Reconnect at a later time, respecting back-off.
726  *
727  * @param h statistics handle
728  */
729 static void
730 reconnect_later (struct GNUNET_STATISTICS_Handle *h)
731 {
732   int loss;
733   struct GNUNET_STATISTICS_GetHandle *gh;
734
735   GNUNET_assert (NULL == h->backoff_task);
736   if (GNUNET_YES == h->do_destroy)
737   {
738     /* So we are shutting down and the service is not reachable.
739      * Chances are that it's down for good and we are not going to connect to
740      * it anymore.
741      * Give up and don't sync the rest of the data.
742      */loss = GNUNET_NO;
743     for (gh = h->action_head; NULL != gh; gh = gh->next)
744       if ((gh->make_persistent) &&
745           (ACTION_SET == gh->type))
746         loss = GNUNET_YES;
747     if (GNUNET_YES == loss)
748       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
749                   _ ("Could not save some persistent statistics\n"));
750     if (NULL != h->destroy_task)
751       GNUNET_SCHEDULER_cancel (h->destroy_task);
752     h->destroy_task = GNUNET_SCHEDULER_add_now (&do_destroy,
753                                                 h);
754     return;
755   }
756   h->backoff_task
757     = GNUNET_SCHEDULER_add_delayed (h->backoff,
758                                     &reconnect_task,
759                                     h);
760   h->backoff = GNUNET_TIME_STD_BACKOFF (h->backoff);
761 }
762
763
764 /**
765  * Transmit a GET request (and if successful, start to receive
766  * the response).
767  *
768  * @param handle statistics handle
769  */
770 static void
771 transmit_get (struct GNUNET_STATISTICS_Handle *handle)
772 {
773   struct GNUNET_STATISTICS_GetHandle *c;
774   struct GNUNET_MessageHeader *hdr;
775   struct GNUNET_MQ_Envelope *env;
776   size_t slen1;
777   size_t slen2;
778
779   GNUNET_assert (NULL != (c = handle->current));
780   slen1 = strlen (c->subsystem) + 1;
781   slen2 = strlen (c->name) + 1;
782   env = GNUNET_MQ_msg_extra (hdr,
783                              slen1 + slen2,
784                              GNUNET_MESSAGE_TYPE_STATISTICS_GET);
785   GNUNET_assert (slen1 + slen2 ==
786                  GNUNET_STRINGS_buffer_fill ((char *) &hdr[1],
787                                              slen1 + slen2,
788                                              2,
789                                              c->subsystem,
790                                              c->name));
791   GNUNET_MQ_notify_sent (env,
792                          &schedule_action,
793                          handle);
794   GNUNET_MQ_send (handle->mq,
795                   env);
796 }
797
798
799 /**
800  * Transmit a WATCH request (and if successful, start to receive
801  * the response).
802  *
803  * @param handle statistics handle
804  */
805 static void
806 transmit_watch (struct GNUNET_STATISTICS_Handle *handle)
807 {
808   struct GNUNET_MessageHeader *hdr;
809   struct GNUNET_MQ_Envelope *env;
810   size_t slen1;
811   size_t slen2;
812
813   LOG (GNUNET_ERROR_TYPE_DEBUG,
814        "Transmitting watch request for `%s'\n",
815        handle->current->name);
816   slen1 = strlen (handle->current->subsystem) + 1;
817   slen2 = strlen (handle->current->name) + 1;
818   env = GNUNET_MQ_msg_extra (hdr,
819                              slen1 + slen2,
820                              GNUNET_MESSAGE_TYPE_STATISTICS_WATCH);
821   GNUNET_assert (slen1 + slen2 ==
822                  GNUNET_STRINGS_buffer_fill ((char *) &hdr[1],
823                                              slen1 + slen2,
824                                              2,
825                                              handle->current->subsystem,
826                                              handle->current->name));
827   GNUNET_MQ_notify_sent (env,
828                          &schedule_action,
829                          handle);
830   GNUNET_MQ_send (handle->mq,
831                   env);
832   GNUNET_assert (NULL == handle->current->cont);
833   free_action_item (handle->current);
834   handle->current = NULL;
835   schedule_action (handle);
836 }
837
838
839 /**
840  * Transmit a SET/UPDATE request.
841  *
842  * @param handle statistics handle
843  */
844 static void
845 transmit_set (struct GNUNET_STATISTICS_Handle *handle)
846 {
847   struct GNUNET_STATISTICS_SetMessage *r;
848   struct GNUNET_MQ_Envelope *env;
849   size_t slen;
850   size_t nlen;
851
852   slen = strlen (handle->current->subsystem) + 1;
853   nlen = strlen (handle->current->name) + 1;
854   env = GNUNET_MQ_msg_extra (r,
855                              slen + nlen,
856                              GNUNET_MESSAGE_TYPE_STATISTICS_SET);
857   r->flags = 0;
858   r->value = GNUNET_htonll (handle->current->value);
859   if (handle->current->make_persistent)
860     r->flags |= htonl (GNUNET_STATISTICS_SETFLAG_PERSISTENT);
861   if (handle->current->type == ACTION_UPDATE)
862     r->flags |= htonl (GNUNET_STATISTICS_SETFLAG_RELATIVE);
863   GNUNET_assert (slen + nlen ==
864                  GNUNET_STRINGS_buffer_fill ((char *) &r[1],
865                                              slen + nlen,
866                                              2,
867                                              handle->current->subsystem,
868                                              handle->current->name));
869   GNUNET_assert (NULL == handle->current->cont);
870   free_action_item (handle->current);
871   handle->current = NULL;
872   update_memory_statistics (handle);
873   GNUNET_MQ_notify_sent (env,
874                          &schedule_action,
875                          handle);
876   GNUNET_MQ_send (handle->mq,
877                   env);
878 }
879
880
881 /**
882  * Get handle for the statistics service.
883  *
884  * @param subsystem name of subsystem using the service
885  * @param cfg services configuration in use
886  * @return handle to use
887  */
888 struct GNUNET_STATISTICS_Handle *
889 GNUNET_STATISTICS_create (const char *subsystem,
890                           const struct GNUNET_CONFIGURATION_Handle *cfg)
891 {
892   struct GNUNET_STATISTICS_Handle *h;
893
894   if (GNUNET_YES ==
895       GNUNET_CONFIGURATION_get_value_yesno (cfg,
896                                             "statistics",
897                                             "DISABLE"))
898     return NULL;
899   h = GNUNET_new (struct GNUNET_STATISTICS_Handle);
900   h->cfg = cfg;
901   h->subsystem = GNUNET_strdup (subsystem);
902   h->backoff = GNUNET_TIME_UNIT_MILLISECONDS;
903   return h;
904 }
905
906
907 /**
908  * Destroy a handle (free all state associated with
909  * it).
910  *
911  * @param h statistics handle to destroy
912  * @param sync_first set to #GNUNET_YES if pending SET requests should
913  *        be completed
914  */
915 void
916 GNUNET_STATISTICS_destroy (struct GNUNET_STATISTICS_Handle *h,
917                            int sync_first)
918 {
919   struct GNUNET_STATISTICS_GetHandle *pos;
920   struct GNUNET_STATISTICS_GetHandle *next;
921
922   if (NULL == h)
923     return;
924   GNUNET_assert (GNUNET_NO == h->do_destroy);  /* Don't call twice. */
925   if ((sync_first) &&
926       (NULL != h->mq) &&
927       (0 != GNUNET_MQ_get_length (h->mq)))
928   {
929     if ((NULL != h->current) &&
930         (ACTION_GET == h->current->type))
931       h->current->aborted = GNUNET_YES;
932     next = h->action_head;
933     while (NULL != (pos = next))
934     {
935       next = pos->next;
936       if ((ACTION_GET == pos->type) ||
937           (ACTION_WATCH == pos->type))
938       {
939         GNUNET_CONTAINER_DLL_remove (h->action_head,
940                                      h->action_tail,
941                                      pos);
942         free_action_item (pos);
943       }
944     }
945     h->do_destroy = GNUNET_YES;
946     schedule_action (h);
947     GNUNET_assert (NULL == h->destroy_task);
948     h->destroy_task
949       = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply (h->backoff,
950                                                                      5),
951                                       &do_destroy,
952                                       h);
953     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
954                 "Deferring destruction\n");
955     return;   /* do not finish destruction just yet */
956   }
957   /* do clean up all */
958   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
959               "Cleaning all up\n");
960   while (NULL != (pos = h->action_head))
961   {
962     GNUNET_CONTAINER_DLL_remove (h->action_head,
963                                  h->action_tail,
964                                  pos);
965     free_action_item (pos);
966   }
967   do_disconnect (h);
968   if (NULL != h->backoff_task)
969   {
970     GNUNET_SCHEDULER_cancel (h->backoff_task);
971     h->backoff_task = NULL;
972   }
973   if (NULL != h->destroy_task)
974   {
975     GNUNET_break (0);
976     GNUNET_SCHEDULER_cancel (h->destroy_task);
977     h->destroy_task = NULL;
978   }
979   for (unsigned int i = 0; i < h->watches_size; i++)
980   {
981     if (NULL == h->watches[i])
982       continue;
983     GNUNET_free (h->watches[i]->subsystem);
984     GNUNET_free (h->watches[i]->name);
985     GNUNET_free (h->watches[i]);
986   }
987   GNUNET_array_grow (h->watches,
988                      h->watches_size,
989                      0);
990   GNUNET_free (h->subsystem);
991   GNUNET_free (h);
992 }
993
994
995 /**
996  * Schedule the next action to be performed.
997  *
998  * @param cls statistics handle
999  */
1000 static void
1001 schedule_action (void *cls)
1002 {
1003   struct GNUNET_STATISTICS_Handle *h = cls;
1004
1005   if (NULL != h->backoff_task)
1006     return;                     /* action already pending */
1007   if (GNUNET_YES != try_connect (h))
1008   {
1009     reconnect_later (h);
1010     return;
1011   }
1012   if (0 < GNUNET_MQ_get_length (h->mq))
1013     return; /* Wait for queue to be reduced more */
1014   /* schedule next action */
1015   while (NULL == h->current)
1016   {
1017     h->current = h->action_head;
1018     if (NULL == h->current)
1019     {
1020       struct GNUNET_MessageHeader *hdr;
1021       struct GNUNET_MQ_Envelope *env;
1022
1023       if (GNUNET_YES != h->do_destroy)
1024         return;     /* nothing to do */
1025       /* let service know that we're done */
1026       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1027                   "Notifying service that we are done\n");
1028       h->do_destroy = GNUNET_SYSERR;     /* in 'TEST' mode */
1029       env = GNUNET_MQ_msg (hdr,
1030                            GNUNET_MESSAGE_TYPE_STATISTICS_DISCONNECT);
1031       GNUNET_MQ_notify_sent (env,
1032                              &schedule_action,
1033                              h);
1034       GNUNET_MQ_send (h->mq,
1035                       env);
1036       return;
1037     }
1038     GNUNET_CONTAINER_DLL_remove (h->action_head,
1039                                  h->action_tail,
1040                                  h->current);
1041     switch (h->current->type)
1042     {
1043     case ACTION_GET:
1044       transmit_get (h);
1045       break;
1046
1047     case ACTION_SET:
1048     case ACTION_UPDATE:
1049       transmit_set (h);
1050       break;
1051
1052     case ACTION_WATCH:
1053       transmit_watch (h);
1054       break;
1055
1056     default:
1057       GNUNET_assert (0);
1058       break;
1059     }
1060   }
1061 }
1062
1063
1064 /**
1065  * Get statistic from the peer.
1066  *
1067  * @param handle identification of the statistics service
1068  * @param subsystem limit to the specified subsystem, NULL for our subsystem
1069  * @param name name of the statistic value, NULL for all values
1070  * @param cont continuation to call when done (can be NULL)
1071  *        This callback CANNOT destroy the statistics handle in the same call.
1072  * @param proc function to call on each value
1073  * @param cls closure for @a cont and @a proc
1074  * @return NULL on error
1075  */
1076 struct GNUNET_STATISTICS_GetHandle *
1077 GNUNET_STATISTICS_get (struct GNUNET_STATISTICS_Handle *handle,
1078                        const char *subsystem,
1079                        const char *name,
1080                        GNUNET_STATISTICS_Callback cont,
1081                        GNUNET_STATISTICS_Iterator proc,
1082                        void *cls)
1083 {
1084   size_t slen1;
1085   size_t slen2;
1086   struct GNUNET_STATISTICS_GetHandle *ai;
1087
1088   if (NULL == handle)
1089     return NULL;
1090   GNUNET_assert (NULL != proc);
1091   GNUNET_assert (GNUNET_NO == handle->do_destroy);
1092   if (NULL == subsystem)
1093     subsystem = "";
1094   if (NULL == name)
1095     name = "";
1096   slen1 = strlen (subsystem) + 1;
1097   slen2 = strlen (name) + 1;
1098   GNUNET_assert (slen1 + slen2 + sizeof(struct GNUNET_MessageHeader) <
1099                  GNUNET_MAX_MESSAGE_SIZE);
1100   ai = GNUNET_new (struct GNUNET_STATISTICS_GetHandle);
1101   ai->sh = handle;
1102   ai->subsystem = GNUNET_strdup (subsystem);
1103   ai->name = GNUNET_strdup (name);
1104   ai->cont = cont;
1105   ai->proc = proc;
1106   ai->cls = cls;
1107   ai->type = ACTION_GET;
1108   ai->msize = slen1 + slen2 + sizeof(struct GNUNET_MessageHeader);
1109   GNUNET_CONTAINER_DLL_insert_tail (handle->action_head,
1110                                     handle->action_tail,
1111                                     ai);
1112   schedule_action (handle);
1113   return ai;
1114 }
1115
1116
1117 /**
1118  * Cancel a 'get' request.  Must be called before the 'cont'
1119  * function is called.
1120  *
1121  * @param gh handle of the request to cancel
1122  */
1123 void
1124 GNUNET_STATISTICS_get_cancel (struct GNUNET_STATISTICS_GetHandle *gh)
1125 {
1126   if (NULL == gh)
1127     return;
1128   gh->cont = NULL;
1129   if (gh->sh->current == gh)
1130   {
1131     gh->aborted = GNUNET_YES;
1132     return;
1133   }
1134   GNUNET_CONTAINER_DLL_remove (gh->sh->action_head,
1135                                gh->sh->action_tail,
1136                                gh);
1137   GNUNET_free (gh->name);
1138   GNUNET_free (gh->subsystem);
1139   GNUNET_free (gh);
1140 }
1141
1142
1143 /**
1144  * Watch statistics from the peer (be notified whenever they change).
1145  *
1146  * @param handle identification of the statistics service
1147  * @param subsystem limit to the specified subsystem, never NULL
1148  * @param name name of the statistic value, never NULL
1149  * @param proc function to call on each value
1150  * @param proc_cls closure for @a proc
1151  * @return #GNUNET_OK on success, #GNUNET_SYSERR on error
1152  */
1153 int
1154 GNUNET_STATISTICS_watch (struct GNUNET_STATISTICS_Handle *handle,
1155                          const char *subsystem,
1156                          const char *name,
1157                          GNUNET_STATISTICS_Iterator proc,
1158                          void *proc_cls)
1159 {
1160   struct GNUNET_STATISTICS_WatchEntry *w;
1161
1162   if (NULL == handle)
1163     return GNUNET_SYSERR;
1164   w = GNUNET_new (struct GNUNET_STATISTICS_WatchEntry);
1165   w->subsystem = GNUNET_strdup (subsystem);
1166   w->name = GNUNET_strdup (name);
1167   w->proc = proc;
1168   w->proc_cls = proc_cls;
1169   GNUNET_array_append (handle->watches,
1170                        handle->watches_size,
1171                        w);
1172   schedule_watch_request (handle,
1173                           w);
1174   return GNUNET_OK;
1175 }
1176
1177
1178 /**
1179  * Stop watching statistics from the peer.
1180  *
1181  * @param handle identification of the statistics service
1182  * @param subsystem limit to the specified subsystem, never NULL
1183  * @param name name of the statistic value, never NULL
1184  * @param proc function to call on each value
1185  * @param proc_cls closure for @a proc
1186  * @return #GNUNET_OK on success, #GNUNET_SYSERR on error (no such watch)
1187  */
1188 int
1189 GNUNET_STATISTICS_watch_cancel (struct GNUNET_STATISTICS_Handle *handle,
1190                                 const char *subsystem,
1191                                 const char *name,
1192                                 GNUNET_STATISTICS_Iterator proc,
1193                                 void *proc_cls)
1194 {
1195   struct GNUNET_STATISTICS_WatchEntry *w;
1196
1197   if (NULL == handle)
1198     return GNUNET_SYSERR;
1199   for (unsigned int i = 0; i < handle->watches_size; i++)
1200   {
1201     w = handle->watches[i];
1202     if (NULL == w)
1203       continue;
1204     if ((w->proc == proc) &&
1205         (w->proc_cls == proc_cls) &&
1206         (0 == strcmp (w->name,
1207                       name)) &&
1208         (0 == strcmp (w->subsystem,
1209                       subsystem)))
1210     {
1211       GNUNET_free (w->name);
1212       GNUNET_free (w->subsystem);
1213       GNUNET_free (w);
1214       handle->watches[i] = NULL;
1215       return GNUNET_OK;
1216     }
1217   }
1218   return GNUNET_SYSERR;
1219 }
1220
1221
1222 /**
1223  * Queue a request to change a statistic.
1224  *
1225  * @param h statistics handle
1226  * @param name name of the value
1227  * @param make_persistent  should the value be kept across restarts?
1228  * @param value new value or change
1229  * @param type type of the action (#ACTION_SET or #ACTION_UPDATE)
1230  */
1231 static void
1232 add_setter_action (struct GNUNET_STATISTICS_Handle *h,
1233                    const char *name,
1234                    int make_persistent,
1235                    uint64_t value,
1236                    enum ActionType type)
1237 {
1238   struct GNUNET_STATISTICS_GetHandle *ai;
1239   size_t slen;
1240   size_t nlen;
1241   size_t nsize;
1242   int64_t delta;
1243
1244   slen = strlen (h->subsystem) + 1;
1245   nlen = strlen (name) + 1;
1246   nsize = sizeof(struct GNUNET_STATISTICS_SetMessage) + slen + nlen;
1247   if (nsize >= GNUNET_MAX_MESSAGE_SIZE)
1248   {
1249     GNUNET_break (0);
1250     return;
1251   }
1252   for (ai = h->action_head; NULL != ai; ai = ai->next)
1253   {
1254     if (! ((0 == strcmp (ai->subsystem,
1255                          h->subsystem)) &&
1256            (0 == strcmp (ai->name,
1257                          name)) &&
1258            ((ACTION_UPDATE == ai->type) ||
1259             (ACTION_SET == ai->type))))
1260       continue;
1261     if (ACTION_SET == ai->type)
1262     {
1263       if (ACTION_UPDATE == type)
1264       {
1265         delta = (int64_t) value;
1266         if (delta > 0)
1267         {
1268           /* update old set by new delta */
1269           ai->value += delta;
1270         }
1271         else
1272         {
1273           /* update old set by new delta, but never go negative */
1274           if (ai->value < -delta)
1275             ai->value = 0;
1276           else
1277             ai->value += delta;
1278         }
1279       }
1280       else
1281       {
1282         /* new set overrides old set */
1283         ai->value = value;
1284       }
1285     }
1286     else
1287     {
1288       if (ACTION_UPDATE == type)
1289       {
1290         /* make delta cummulative */
1291         delta = (int64_t) value;
1292         ai->value += delta;
1293       }
1294       else
1295       {
1296         /* drop old 'update', use new 'set' instead */
1297         ai->value = value;
1298         ai->type = type;
1299       }
1300     }
1301     ai->timeout
1302       = GNUNET_TIME_relative_to_absolute (SET_TRANSMIT_TIMEOUT);
1303     ai->make_persistent
1304       = make_persistent;
1305     return;
1306   }
1307   /* no existing entry matches, create a fresh one */
1308   ai = GNUNET_new (struct GNUNET_STATISTICS_GetHandle);
1309   ai->sh = h;
1310   ai->subsystem = GNUNET_strdup (h->subsystem);
1311   ai->name = GNUNET_strdup (name);
1312   ai->timeout = GNUNET_TIME_relative_to_absolute (SET_TRANSMIT_TIMEOUT);
1313   ai->make_persistent = make_persistent;
1314   ai->msize = nsize;
1315   ai->value = value;
1316   ai->type = type;
1317   GNUNET_CONTAINER_DLL_insert_tail (h->action_head,
1318                                     h->action_tail,
1319                                     ai);
1320   schedule_action (h);
1321 }
1322
1323
1324 /**
1325  * Set statistic value for the peer.  Will always use our
1326  * subsystem (the argument used when "handle" was created).
1327  *
1328  * @param handle identification of the statistics service
1329  * @param name name of the statistic value
1330  * @param value new value to set
1331  * @param make_persistent should the value be kept across restarts?
1332  */
1333 void
1334 GNUNET_STATISTICS_set (struct GNUNET_STATISTICS_Handle *handle,
1335                        const char *name,
1336                        uint64_t value,
1337                        int make_persistent)
1338 {
1339   if (NULL == handle)
1340     return;
1341   GNUNET_assert (GNUNET_NO == handle->do_destroy);
1342   add_setter_action (handle,
1343                      name,
1344                      make_persistent,
1345                      value,
1346                      ACTION_SET);
1347 }
1348
1349
1350 /**
1351  * Set statistic value for the peer.  Will always use our
1352  * subsystem (the argument used when "handle" was created).
1353  *
1354  * @param handle identification of the statistics service
1355  * @param name name of the statistic value
1356  * @param delta change in value (added to existing value)
1357  * @param make_persistent should the value be kept across restarts?
1358  */
1359 void
1360 GNUNET_STATISTICS_update (struct GNUNET_STATISTICS_Handle *handle,
1361                           const char *name,
1362                           int64_t delta,
1363                           int make_persistent)
1364 {
1365   if (NULL == handle)
1366     return;
1367   if (0 == delta)
1368     return;
1369   GNUNET_assert (GNUNET_NO == handle->do_destroy);
1370   add_setter_action (handle,
1371                      name,
1372                      make_persistent,
1373                      (uint64_t) delta,
1374                      ACTION_UPDATE);
1375 }
1376
1377
1378 /* end of statistics_api.c */