uncrustify as demanded.
[oweals/gnunet.git] / src / statistics / statistics_api.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2009, 2010, 2011, 2016 GNUnet e.V.
4
5      GNUnet is free software: you can redistribute it and/or modify it
6      under the terms of the GNU Affero General Public License as published
7      by the Free Software Foundation, either version 3 of the License,
8      or (at your option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      Affero General Public License for more details.
14
15      You should have received a copy of the GNU Affero General Public License
16      along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18      SPDX-License-Identifier: AGPL3.0-or-later
19  */
20
21 /**
22  * @file statistics/statistics_api.c
23  * @brief API of the statistics service
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include "gnunet_util_lib.h"
28 #include "gnunet_constants.h"
29 #include "gnunet_protocols.h"
30 #include "gnunet_statistics_service.h"
31 #include "statistics.h"
32
33 /**
34  * How long do we wait until a statistics request for setting
35  * a value times out?  (The update will be lost if the
36  * service does not react within this timeframe).
37  */
38 #define SET_TRANSMIT_TIMEOUT GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 2)
39
40 #define LOG(kind, ...) GNUNET_log_from(kind, "statistics-api", __VA_ARGS__)
41
42 /**
43  * Types of actions.
44  */
45 enum ActionType {
46   /**
47    * Get a value.
48    */
49   ACTION_GET,
50
51   /**
52    * Set a value.
53    */
54   ACTION_SET,
55
56   /**
57    * Update a value.
58    */
59   ACTION_UPDATE,
60
61   /**
62    * Watch a value.
63    */
64   ACTION_WATCH
65 };
66
67
68 /**
69  * Entry kept for each value we are watching.
70  */
71 struct GNUNET_STATISTICS_WatchEntry {
72   /**
73    * What subsystem is this action about? (never NULL)
74    */
75   char *subsystem;
76
77   /**
78    * What value is this action about? (never NULL)
79    */
80   char *name;
81
82   /**
83    * Function to call
84    */
85   GNUNET_STATISTICS_Iterator proc;
86
87   /**
88    * Closure for @e proc
89    */
90   void *proc_cls;
91 };
92
93
94 /**
95  * Linked list of things we still need to do.
96  */
97 struct GNUNET_STATISTICS_GetHandle {
98   /**
99    * This is a doubly linked list.
100    */
101   struct GNUNET_STATISTICS_GetHandle *next;
102
103   /**
104    * This is a doubly linked list.
105    */
106   struct GNUNET_STATISTICS_GetHandle *prev;
107
108   /**
109    * Main statistics handle.
110    */
111   struct GNUNET_STATISTICS_Handle *sh;
112
113   /**
114    * What subsystem is this action about? (can be NULL)
115    */
116   char *subsystem;
117
118   /**
119    * What value is this action about? (can be NULL)
120    */
121   char *name;
122
123   /**
124    * Continuation to call once action is complete.
125    */
126   GNUNET_STATISTICS_Callback cont;
127
128   /**
129    * Function to call (for GET actions only).
130    */
131   GNUNET_STATISTICS_Iterator proc;
132
133   /**
134    * Closure for @e proc and @e cont.
135    */
136   void *cls;
137
138   /**
139    * Timeout for this action.
140    */
141   struct GNUNET_TIME_Absolute timeout;
142
143   /**
144    * Associated value.
145    */
146   uint64_t value;
147
148   /**
149    * Flag for SET/UPDATE actions.
150    */
151   int make_persistent;
152
153   /**
154    * Has the current iteration been aborted; for GET actions.
155    */
156   int aborted;
157
158   /**
159    * Is this a #ACTION_GET, #ACTION_SET, #ACTION_UPDATE or #ACTION_WATCH?
160    */
161   enum ActionType type;
162
163   /**
164    * Size of the message that we will be transmitting.
165    */
166   uint16_t msize;
167 };
168
169
170 /**
171  * Handle for the service.
172  */
173 struct GNUNET_STATISTICS_Handle {
174   /**
175    * Name of our subsystem.
176    */
177   char *subsystem;
178
179   /**
180    * Configuration to use.
181    */
182   const struct GNUNET_CONFIGURATION_Handle *cfg;
183
184   /**
185    * Message queue to the service.
186    */
187   struct GNUNET_MQ_Handle *mq;
188
189   /**
190    * Head of the linked list of pending actions (first action
191    * to be performed).
192    */
193   struct GNUNET_STATISTICS_GetHandle *action_head;
194
195   /**
196    * Tail of the linked list of actions (for fast append).
197    */
198   struct GNUNET_STATISTICS_GetHandle *action_tail;
199
200   /**
201    * Action we are currently busy with (action request has been
202    * transmitted, we're now receiving the response from the
203    * service).
204    */
205   struct GNUNET_STATISTICS_GetHandle *current;
206
207   /**
208    * Array of watch entries.
209    */
210   struct GNUNET_STATISTICS_WatchEntry **watches;
211
212   /**
213    * Task doing exponential back-off trying to reconnect.
214    */
215   struct GNUNET_SCHEDULER_Task *backoff_task;
216
217   /**
218    * Task for running #do_destroy().
219    */
220   struct GNUNET_SCHEDULER_Task *destroy_task;
221
222   /**
223    * Time for next connect retry.
224    */
225   struct GNUNET_TIME_Relative backoff;
226
227   /**
228    * Maximum heap size observed so far (if available).
229    */
230   uint64_t peak_heap_size;
231
232   /**
233    * Maximum resident set side observed so far (if available).
234    */
235   uint64_t peak_rss;
236
237   /**
238    * Size of the @e watches array.
239    */
240   unsigned int watches_size;
241
242   /**
243    * Should this handle auto-destruct once all actions have
244    * been processed?
245    */
246   int do_destroy;
247
248   /**
249    * Are we currently receiving from the service?
250    */
251   int receiving;
252 };
253
254
255 /**
256  * Obtain statistics about this process's memory consumption and
257  * report those as well (if they changed).
258  */
259 static void
260 update_memory_statistics(struct GNUNET_STATISTICS_Handle *h)
261 {
262 #if ENABLE_HEAP_STATISTICS
263   uint64_t current_heap_size = 0;
264   uint64_t current_rss = 0;
265
266   if (GNUNET_NO != h->do_destroy)
267     return;
268 #if HAVE_MALLINFO
269   {
270     struct mallinfo mi;
271
272     mi = mallinfo();
273     current_heap_size = mi.uordblks + mi.fordblks;
274   }
275 #endif
276 #if HAVE_GETRUSAGE
277   {
278     struct rusage ru;
279
280     if (0 == getrusage(RUSAGE_SELF, &ru))
281       {
282         current_rss = 1024LL * ru.ru_maxrss;
283       }
284   }
285 #endif
286   if (current_heap_size > h->peak_heap_size)
287     {
288       h->peak_heap_size = current_heap_size;
289       GNUNET_STATISTICS_set(h,
290                             "# peak heap size",
291                             current_heap_size,
292                             GNUNET_NO);
293     }
294   if (current_rss > h->peak_rss)
295     {
296       h->peak_rss = current_rss;
297       GNUNET_STATISTICS_set(h,
298                             "# peak resident set size",
299                             current_rss,
300                             GNUNET_NO);
301     }
302 #endif
303 }
304
305
306 /**
307  * Reconnect at a later time, respecting back-off.
308  *
309  * @param h statistics handle
310  */
311 static void
312 reconnect_later(struct GNUNET_STATISTICS_Handle *h);
313
314
315 /**
316  * Schedule the next action to be performed.
317  *
318  * @param cls statistics handle to reconnect
319  */
320 static void
321 schedule_action(void *cls);
322
323
324 /**
325  * Transmit request to service that we want to watch
326  * the development of a particular value.
327  *
328  * @param h statistics handle
329  * @param watch watch entry of the value to watch
330  */
331 static void
332 schedule_watch_request(struct GNUNET_STATISTICS_Handle *h,
333                        struct GNUNET_STATISTICS_WatchEntry *watch)
334 {
335   struct GNUNET_STATISTICS_GetHandle *ai;
336   size_t slen;
337   size_t nlen;
338   size_t nsize;
339
340   slen = strlen(watch->subsystem) + 1;
341   nlen = strlen(watch->name) + 1;
342   nsize = sizeof(struct GNUNET_MessageHeader) + slen + nlen;
343   if (nsize >= GNUNET_MAX_MESSAGE_SIZE)
344     {
345       GNUNET_break(0);
346       return;
347     }
348   ai = GNUNET_new(struct GNUNET_STATISTICS_GetHandle);
349   ai->sh = h;
350   ai->subsystem = GNUNET_strdup(watch->subsystem);
351   ai->name = GNUNET_strdup(watch->name);
352   ai->timeout = GNUNET_TIME_UNIT_FOREVER_ABS;
353   ai->msize = nsize;
354   ai->type = ACTION_WATCH;
355   ai->proc = watch->proc;
356   ai->cls = watch->proc_cls;
357   GNUNET_CONTAINER_DLL_insert_tail(h->action_head,
358                                    h->action_tail,
359                                    ai);
360   schedule_action(h);
361 }
362
363
364 /**
365  * Free memory associated with the given action item.
366  *
367  * @param gh action item to free
368  */
369 static void
370 free_action_item(struct GNUNET_STATISTICS_GetHandle *gh)
371 {
372   GNUNET_free_non_null(gh->subsystem);
373   GNUNET_free_non_null(gh->name);
374   GNUNET_free(gh);
375 }
376
377
378 /**
379  * Disconnect from the statistics service.
380  *
381  * @param h statistics handle to disconnect from
382  */
383 static void
384 do_disconnect(struct GNUNET_STATISTICS_Handle *h)
385 {
386   struct GNUNET_STATISTICS_GetHandle *c;
387
388   h->receiving = GNUNET_NO;
389   if (NULL != (c = h->current))
390     {
391       h->current = NULL;
392       if ((NULL != c->cont) &&
393           (GNUNET_YES != c->aborted))
394         {
395           c->cont(c->cls,
396                   GNUNET_SYSERR);
397           c->cont = NULL;
398         }
399       free_action_item(c);
400     }
401   if (NULL != h->mq)
402     {
403       GNUNET_MQ_destroy(h->mq);
404       h->mq = NULL;
405     }
406 }
407
408
409 /**
410  * Process a #GNUNET_MESSAGE_TYPE_STATISTICS_VALUE message.
411  *
412  * @param cls statistics handle
413  * @param smsg message received from the service, never NULL
414  * @return #GNUNET_OK if the message was well-formed
415  */
416 static int
417 check_statistics_value(void *cls,
418                        const struct GNUNET_STATISTICS_ReplyMessage *smsg)
419 {
420   const char *service;
421   const char *name;
422   uint16_t size;
423
424   size = ntohs(smsg->header.size);
425   size -= sizeof(struct GNUNET_STATISTICS_ReplyMessage);
426   if (size !=
427       GNUNET_STRINGS_buffer_tokenize((const char *)&smsg[1],
428                                      size,
429                                      2,
430                                      &service,
431                                      &name))
432     {
433       GNUNET_break(0);
434       return GNUNET_SYSERR;
435     }
436   return GNUNET_OK;
437 }
438
439
440 /**
441  * Process a #GNUNET_MESSAGE_TYPE_STATISTICS_VALUE message.
442  *
443  * @param cls statistics handle
444  * @param msg message received from the service, never NULL
445  * @return #GNUNET_OK if the message was well-formed
446  */
447 static void
448 handle_statistics_value(void *cls,
449                         const struct GNUNET_STATISTICS_ReplyMessage *smsg)
450 {
451   struct GNUNET_STATISTICS_Handle *h = cls;
452   const char *service;
453   const char *name;
454   uint16_t size;
455
456   if (h->current->aborted)
457     return;           /* iteration aborted, don't bother */
458
459   size = ntohs(smsg->header.size);
460   size -= sizeof(struct GNUNET_STATISTICS_ReplyMessage);
461   GNUNET_assert(size ==
462                 GNUNET_STRINGS_buffer_tokenize((const char *)&smsg[1],
463                                                size,
464                                                2,
465                                                &service,
466                                                &name));
467   LOG(GNUNET_ERROR_TYPE_DEBUG,
468       "Received valid statistic on `%s:%s': %llu\n",
469       service, name,
470       GNUNET_ntohll(smsg->value));
471   if (GNUNET_OK !=
472       h->current->proc(h->current->cls,
473                        service,
474                        name,
475                        GNUNET_ntohll(smsg->value),
476                        0 !=
477                        (ntohl(smsg->uid) & GNUNET_STATISTICS_PERSIST_BIT)))
478     {
479       LOG(GNUNET_ERROR_TYPE_DEBUG,
480           "Processing of remaining statistics aborted by client.\n");
481       h->current->aborted = GNUNET_YES;
482     }
483 }
484
485
486 /**
487  * We have received a watch value from the service.  Process it.
488  *
489  * @param cls statistics handle
490  * @param msg the watch value message
491  */
492 static void
493 handle_statistics_watch_value(void *cls,
494                               const struct GNUNET_STATISTICS_WatchValueMessage *wvm)
495 {
496   struct GNUNET_STATISTICS_Handle *h = cls;
497   struct GNUNET_STATISTICS_WatchEntry *w;
498   uint32_t wid;
499
500   GNUNET_break(0 == ntohl(wvm->reserved));
501   wid = ntohl(wvm->wid);
502   if (wid >= h->watches_size)
503     {
504       do_disconnect(h);
505       reconnect_later(h);
506       return;
507     }
508   w = h->watches[wid];
509   if (NULL == w)
510     return;
511   (void)w->proc(w->proc_cls,
512                 w->subsystem,
513                 w->name,
514                 GNUNET_ntohll(wvm->value),
515                 0 != (ntohl(wvm->flags) & GNUNET_STATISTICS_PERSIST_BIT));
516 }
517
518
519 /**
520  * Generic error handler, called with the appropriate error code and
521  * the same closure specified at the creation of the message queue.
522  * Not every message queue implementation supports an error handler.
523  *
524  * @param cls closure with the `struct GNUNET_STATISTICS_Handle *`
525  * @param error error code
526  */
527 static void
528 mq_error_handler(void *cls,
529                  enum GNUNET_MQ_Error error)
530 {
531   struct GNUNET_STATISTICS_Handle *h = cls;
532
533   if (GNUNET_NO != h->do_destroy)
534     {
535       h->do_destroy = GNUNET_NO;
536       if (NULL != h->destroy_task)
537         {
538           GNUNET_SCHEDULER_cancel(h->destroy_task);
539           h->destroy_task = NULL;
540         }
541       GNUNET_STATISTICS_destroy(h,
542                                 GNUNET_NO);
543       return;
544     }
545   do_disconnect(h);
546   reconnect_later(h);
547 }
548
549
550 /**
551  * Task used to destroy the statistics handle.
552  *
553  * @param cls the `struct GNUNET_STATISTICS_Handle`
554  */
555 static void
556 do_destroy(void *cls)
557 {
558   struct GNUNET_STATISTICS_Handle *h = cls;
559
560   h->destroy_task = NULL;
561   h->do_destroy = GNUNET_NO;
562   LOG(GNUNET_ERROR_TYPE_DEBUG,
563       "Running final destruction\n");
564   GNUNET_STATISTICS_destroy(h,
565                             GNUNET_NO);
566 }
567
568
569 /**
570  * Handle a #GNUNET_MESSAGE_TYPE_STATISTICS_DISCONNECT_CONFIRM
571  * message. We receive this message at the end of the shutdown when
572  * the service confirms that all data has been written to disk.
573  *
574  * @param cls our `struct GNUNET_STATISTICS_Handle *`
575  * @param msg the message
576  */
577 static void
578 handle_disconnect_confirm(void *cls,
579                           const struct GNUNET_MessageHeader *msg)
580 {
581   struct GNUNET_STATISTICS_Handle *h = cls;
582
583   if (GNUNET_SYSERR != h->do_destroy)
584     {
585       /* not in shutdown, why do we get 'TEST'? */
586       GNUNET_break(0);
587       do_disconnect(h);
588       reconnect_later(h);
589       return;
590     }
591   LOG(GNUNET_ERROR_TYPE_DEBUG,
592       "Received DISCONNNECT_CONFIRM message from statistics, can complete disconnect\n");
593   if (NULL != h->destroy_task)
594     GNUNET_SCHEDULER_cancel(h->destroy_task);
595   h->destroy_task = GNUNET_SCHEDULER_add_now(&do_destroy,
596                                              h);
597 }
598
599
600 /**
601  * Handle a #GNUNET_MESSAGE_TYPE_STATISTICS_END message. We receive
602  * this message in response to a query to indicate that there are no
603  * further matching results.
604  *
605  * @param cls our `struct GNUNET_STATISTICS_Handle *`
606  * @param msg the message
607  */
608 static void
609 handle_statistics_end(void *cls,
610                       const struct GNUNET_MessageHeader *msg)
611 {
612   struct GNUNET_STATISTICS_Handle *h = cls;
613   struct GNUNET_STATISTICS_GetHandle *c;
614
615   LOG(GNUNET_ERROR_TYPE_DEBUG,
616       "Received end of statistics marker\n");
617   if (NULL == (c = h->current))
618     {
619       GNUNET_break(0);
620       do_disconnect(h);
621       reconnect_later(h);
622       return;
623     }
624   h->backoff = GNUNET_TIME_UNIT_MILLISECONDS;
625   h->current = NULL;
626   schedule_action(h);
627   if (NULL != c->cont)
628     {
629       c->cont(c->cls,
630               GNUNET_OK);
631       c->cont = NULL;
632     }
633   free_action_item(c);
634 }
635
636
637 /**
638  * Try to (re)connect to the statistics service.
639  *
640  * @param h statistics handle to reconnect
641  * @return #GNUNET_YES on success, #GNUNET_NO on failure.
642  */
643 static int
644 try_connect(struct GNUNET_STATISTICS_Handle *h)
645 {
646   struct GNUNET_MQ_MessageHandler handlers[] = {
647     GNUNET_MQ_hd_fixed_size(disconnect_confirm,
648                             GNUNET_MESSAGE_TYPE_STATISTICS_DISCONNECT_CONFIRM,
649                             struct GNUNET_MessageHeader,
650                             h),
651     GNUNET_MQ_hd_fixed_size(statistics_end,
652                             GNUNET_MESSAGE_TYPE_STATISTICS_END,
653                             struct GNUNET_MessageHeader,
654                             h),
655     GNUNET_MQ_hd_var_size(statistics_value,
656                           GNUNET_MESSAGE_TYPE_STATISTICS_VALUE,
657                           struct GNUNET_STATISTICS_ReplyMessage,
658                           h),
659     GNUNET_MQ_hd_fixed_size(statistics_watch_value,
660                             GNUNET_MESSAGE_TYPE_STATISTICS_WATCH_VALUE,
661                             struct GNUNET_STATISTICS_WatchValueMessage,
662                             h),
663     GNUNET_MQ_handler_end()
664   };
665   struct GNUNET_STATISTICS_GetHandle *gh;
666   struct GNUNET_STATISTICS_GetHandle *gn;
667
668   if (NULL != h->backoff_task)
669     return GNUNET_NO;
670   if (NULL != h->mq)
671     return GNUNET_YES;
672   h->mq = GNUNET_CLIENT_connect(h->cfg,
673                                 "statistics",
674                                 handlers,
675                                 &mq_error_handler,
676                                 h);
677   if (NULL == h->mq)
678     {
679       LOG(GNUNET_ERROR_TYPE_DEBUG,
680           "Failed to connect to statistics service!\n");
681       return GNUNET_NO;
682     }
683   gn = h->action_head;
684   while (NULL != (gh = gn))
685     {
686       gn = gh->next;
687       if (gh->type == ACTION_WATCH)
688         {
689           GNUNET_CONTAINER_DLL_remove(h->action_head,
690                                       h->action_tail,
691                                       gh);
692           free_action_item(gh);
693         }
694     }
695   for (unsigned int i = 0; i < h->watches_size; i++)
696     if (NULL != h->watches[i])
697       schedule_watch_request(h,
698                              h->watches[i]);
699   return GNUNET_YES;
700 }
701
702
703 /**
704  * We've waited long enough, reconnect now.
705  *
706  * @param cls the `struct GNUNET_STATISTICS_Handle` to reconnect
707  */
708 static void
709 reconnect_task(void *cls)
710 {
711   struct GNUNET_STATISTICS_Handle *h = cls;
712
713   h->backoff_task = NULL;
714   schedule_action(h);
715 }
716
717
718 /**
719  * Reconnect at a later time, respecting back-off.
720  *
721  * @param h statistics handle
722  */
723 static void
724 reconnect_later(struct GNUNET_STATISTICS_Handle *h)
725 {
726   int loss;
727   struct GNUNET_STATISTICS_GetHandle *gh;
728
729   GNUNET_assert(NULL == h->backoff_task);
730   if (GNUNET_YES == h->do_destroy)
731     {
732       /* So we are shutting down and the service is not reachable.
733        * Chances are that it's down for good and we are not going to connect to
734        * it anymore.
735        * Give up and don't sync the rest of the data.
736        */
737       loss = GNUNET_NO;
738       for (gh = h->action_head; NULL != gh; gh = gh->next)
739         if ((gh->make_persistent) &&
740             (ACTION_SET == gh->type))
741           loss = GNUNET_YES;
742       if (GNUNET_YES == loss)
743         GNUNET_log(GNUNET_ERROR_TYPE_WARNING,
744                    _("Could not save some persistent statistics\n"));
745       if (NULL != h->destroy_task)
746         GNUNET_SCHEDULER_cancel(h->destroy_task);
747       h->destroy_task = GNUNET_SCHEDULER_add_now(&do_destroy,
748                                                  h);
749       return;
750     }
751   h->backoff_task
752     = GNUNET_SCHEDULER_add_delayed(h->backoff,
753                                    &reconnect_task,
754                                    h);
755   h->backoff = GNUNET_TIME_STD_BACKOFF(h->backoff);
756 }
757
758
759
760 /**
761  * Transmit a GET request (and if successful, start to receive
762  * the response).
763  *
764  * @param handle statistics handle
765  */
766 static void
767 transmit_get(struct GNUNET_STATISTICS_Handle *handle)
768 {
769   struct GNUNET_STATISTICS_GetHandle *c;
770   struct GNUNET_MessageHeader *hdr;
771   struct GNUNET_MQ_Envelope *env;
772   size_t slen1;
773   size_t slen2;
774
775   GNUNET_assert(NULL != (c = handle->current));
776   slen1 = strlen(c->subsystem) + 1;
777   slen2 = strlen(c->name) + 1;
778   env = GNUNET_MQ_msg_extra(hdr,
779                             slen1 + slen2,
780                             GNUNET_MESSAGE_TYPE_STATISTICS_GET);
781   GNUNET_assert(slen1 + slen2 ==
782                 GNUNET_STRINGS_buffer_fill((char *)&hdr[1],
783                                            slen1 + slen2,
784                                            2,
785                                            c->subsystem,
786                                            c->name));
787   GNUNET_MQ_notify_sent(env,
788                         &schedule_action,
789                         handle);
790   GNUNET_MQ_send(handle->mq,
791                  env);
792 }
793
794
795 /**
796  * Transmit a WATCH request (and if successful, start to receive
797  * the response).
798  *
799  * @param handle statistics handle
800  */
801 static void
802 transmit_watch(struct GNUNET_STATISTICS_Handle *handle)
803 {
804   struct GNUNET_MessageHeader *hdr;
805   struct GNUNET_MQ_Envelope *env;
806   size_t slen1;
807   size_t slen2;
808
809   LOG(GNUNET_ERROR_TYPE_DEBUG,
810       "Transmitting watch request for `%s'\n",
811       handle->current->name);
812   slen1 = strlen(handle->current->subsystem) + 1;
813   slen2 = strlen(handle->current->name) + 1;
814   env = GNUNET_MQ_msg_extra(hdr,
815                             slen1 + slen2,
816                             GNUNET_MESSAGE_TYPE_STATISTICS_WATCH);
817   GNUNET_assert(slen1 + slen2 ==
818                 GNUNET_STRINGS_buffer_fill((char *)&hdr[1],
819                                            slen1 + slen2,
820                                            2,
821                                            handle->current->subsystem,
822                                            handle->current->name));
823   GNUNET_MQ_notify_sent(env,
824                         &schedule_action,
825                         handle);
826   GNUNET_MQ_send(handle->mq,
827                  env);
828   GNUNET_assert(NULL == handle->current->cont);
829   free_action_item(handle->current);
830   handle->current = NULL;
831   schedule_action(handle);
832 }
833
834
835 /**
836  * Transmit a SET/UPDATE request.
837  *
838  * @param handle statistics handle
839  */
840 static void
841 transmit_set(struct GNUNET_STATISTICS_Handle *handle)
842 {
843   struct GNUNET_STATISTICS_SetMessage *r;
844   struct GNUNET_MQ_Envelope *env;
845   size_t slen;
846   size_t nlen;
847
848   slen = strlen(handle->current->subsystem) + 1;
849   nlen = strlen(handle->current->name) + 1;
850   env = GNUNET_MQ_msg_extra(r,
851                             slen + nlen,
852                             GNUNET_MESSAGE_TYPE_STATISTICS_SET);
853   r->flags = 0;
854   r->value = GNUNET_htonll(handle->current->value);
855   if (handle->current->make_persistent)
856     r->flags |= htonl(GNUNET_STATISTICS_SETFLAG_PERSISTENT);
857   if (handle->current->type == ACTION_UPDATE)
858     r->flags |= htonl(GNUNET_STATISTICS_SETFLAG_RELATIVE);
859   GNUNET_assert(slen + nlen ==
860                 GNUNET_STRINGS_buffer_fill((char *)&r[1],
861                                            slen + nlen,
862                                            2,
863                                            handle->current->subsystem,
864                                            handle->current->name));
865   GNUNET_assert(NULL == handle->current->cont);
866   free_action_item(handle->current);
867   handle->current = NULL;
868   update_memory_statistics(handle);
869   GNUNET_MQ_notify_sent(env,
870                         &schedule_action,
871                         handle);
872   GNUNET_MQ_send(handle->mq,
873                  env);
874 }
875
876
877 /**
878  * Get handle for the statistics service.
879  *
880  * @param subsystem name of subsystem using the service
881  * @param cfg services configuration in use
882  * @return handle to use
883  */
884 struct GNUNET_STATISTICS_Handle *
885 GNUNET_STATISTICS_create(const char *subsystem,
886                          const struct GNUNET_CONFIGURATION_Handle *cfg)
887 {
888   struct GNUNET_STATISTICS_Handle *h;
889
890   if (GNUNET_YES ==
891       GNUNET_CONFIGURATION_get_value_yesno(cfg,
892                                            "statistics",
893                                            "DISABLE"))
894     return NULL;
895   h = GNUNET_new(struct GNUNET_STATISTICS_Handle);
896   h->cfg = cfg;
897   h->subsystem = GNUNET_strdup(subsystem);
898   h->backoff = GNUNET_TIME_UNIT_MILLISECONDS;
899   return h;
900 }
901
902
903 /**
904  * Destroy a handle (free all state associated with
905  * it).
906  *
907  * @param h statistics handle to destroy
908  * @param sync_first set to #GNUNET_YES if pending SET requests should
909  *        be completed
910  */
911 void
912 GNUNET_STATISTICS_destroy(struct GNUNET_STATISTICS_Handle *h,
913                           int sync_first)
914 {
915   struct GNUNET_STATISTICS_GetHandle *pos;
916   struct GNUNET_STATISTICS_GetHandle *next;
917
918   if (NULL == h)
919     return;
920   GNUNET_assert(GNUNET_NO == h->do_destroy);  /* Don't call twice. */
921   if ((sync_first) &&
922       (NULL != h->mq) &&
923       (0 != GNUNET_MQ_get_length(h->mq)))
924     {
925       if ((NULL != h->current) &&
926           (ACTION_GET == h->current->type))
927         h->current->aborted = GNUNET_YES;
928       next = h->action_head;
929       while (NULL != (pos = next))
930         {
931           next = pos->next;
932           if ((ACTION_GET == pos->type) ||
933               (ACTION_WATCH == pos->type))
934             {
935               GNUNET_CONTAINER_DLL_remove(h->action_head,
936                                           h->action_tail,
937                                           pos);
938               free_action_item(pos);
939             }
940         }
941       h->do_destroy = GNUNET_YES;
942       schedule_action(h);
943       GNUNET_assert(NULL == h->destroy_task);
944       h->destroy_task
945         = GNUNET_SCHEDULER_add_delayed(GNUNET_TIME_relative_multiply(h->backoff,
946                                                                      5),
947                                        &do_destroy,
948                                        h);
949       GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
950                  "Deferring destruction\n");
951       return; /* do not finish destruction just yet */
952     }
953   /* do clean up all */
954   GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
955              "Cleaning all up\n");
956   while (NULL != (pos = h->action_head))
957     {
958       GNUNET_CONTAINER_DLL_remove(h->action_head,
959                                   h->action_tail,
960                                   pos);
961       free_action_item(pos);
962     }
963   do_disconnect(h);
964   if (NULL != h->backoff_task)
965     {
966       GNUNET_SCHEDULER_cancel(h->backoff_task);
967       h->backoff_task = NULL;
968     }
969   if (NULL != h->destroy_task)
970     {
971       GNUNET_break(0);
972       GNUNET_SCHEDULER_cancel(h->destroy_task);
973       h->destroy_task = NULL;
974     }
975   for (unsigned int i = 0; i < h->watches_size; i++)
976     {
977       if (NULL == h->watches[i])
978         continue;
979       GNUNET_free(h->watches[i]->subsystem);
980       GNUNET_free(h->watches[i]->name);
981       GNUNET_free(h->watches[i]);
982     }
983   GNUNET_array_grow(h->watches,
984                     h->watches_size,
985                     0);
986   GNUNET_free(h->subsystem);
987   GNUNET_free(h);
988 }
989
990
991 /**
992  * Schedule the next action to be performed.
993  *
994  * @param cls statistics handle
995  */
996 static void
997 schedule_action(void *cls)
998 {
999   struct GNUNET_STATISTICS_Handle *h = cls;
1000
1001   if (NULL != h->backoff_task)
1002     return;                     /* action already pending */
1003   if (GNUNET_YES != try_connect(h))
1004     {
1005       reconnect_later(h);
1006       return;
1007     }
1008   if (0 < GNUNET_MQ_get_length(h->mq))
1009     return; /* Wait for queue to be reduced more */
1010   /* schedule next action */
1011   while (NULL == h->current)
1012     {
1013       h->current = h->action_head;
1014       if (NULL == h->current)
1015         {
1016           struct GNUNET_MessageHeader *hdr;
1017           struct GNUNET_MQ_Envelope *env;
1018
1019           if (GNUNET_YES != h->do_destroy)
1020             return; /* nothing to do */
1021           /* let service know that we're done */
1022           GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
1023                      "Notifying service that we are done\n");
1024           h->do_destroy = GNUNET_SYSERR; /* in 'TEST' mode */
1025           env = GNUNET_MQ_msg(hdr,
1026                               GNUNET_MESSAGE_TYPE_STATISTICS_DISCONNECT);
1027           GNUNET_MQ_notify_sent(env,
1028                                 &schedule_action,
1029                                 h);
1030           GNUNET_MQ_send(h->mq,
1031                          env);
1032           return;
1033         }
1034       GNUNET_CONTAINER_DLL_remove(h->action_head,
1035                                   h->action_tail,
1036                                   h->current);
1037       switch (h->current->type)
1038         {
1039         case ACTION_GET:
1040           transmit_get(h);
1041           break;
1042
1043         case ACTION_SET:
1044         case ACTION_UPDATE:
1045           transmit_set(h);
1046           break;
1047
1048         case ACTION_WATCH:
1049           transmit_watch(h);
1050           break;
1051
1052         default:
1053           GNUNET_assert(0);
1054           break;
1055         }
1056     }
1057 }
1058
1059
1060 /**
1061  * Get statistic from the peer.
1062  *
1063  * @param handle identification of the statistics service
1064  * @param subsystem limit to the specified subsystem, NULL for our subsystem
1065  * @param name name of the statistic value, NULL for all values
1066  * @param cont continuation to call when done (can be NULL)
1067  *        This callback CANNOT destroy the statistics handle in the same call.
1068  * @param proc function to call on each value
1069  * @param cls closure for @a cont and @a proc
1070  * @return NULL on error
1071  */
1072 struct GNUNET_STATISTICS_GetHandle *
1073 GNUNET_STATISTICS_get(struct GNUNET_STATISTICS_Handle *handle,
1074                       const char *subsystem,
1075                       const char *name,
1076                       GNUNET_STATISTICS_Callback cont,
1077                       GNUNET_STATISTICS_Iterator proc,
1078                       void *cls)
1079 {
1080   size_t slen1;
1081   size_t slen2;
1082   struct GNUNET_STATISTICS_GetHandle *ai;
1083
1084   if (NULL == handle)
1085     return NULL;
1086   GNUNET_assert(NULL != proc);
1087   GNUNET_assert(GNUNET_NO == handle->do_destroy);
1088   if (NULL == subsystem)
1089     subsystem = "";
1090   if (NULL == name)
1091     name = "";
1092   slen1 = strlen(subsystem) + 1;
1093   slen2 = strlen(name) + 1;
1094   GNUNET_assert(slen1 + slen2 + sizeof(struct GNUNET_MessageHeader) <
1095                 GNUNET_MAX_MESSAGE_SIZE);
1096   ai = GNUNET_new(struct GNUNET_STATISTICS_GetHandle);
1097   ai->sh = handle;
1098   ai->subsystem = GNUNET_strdup(subsystem);
1099   ai->name = GNUNET_strdup(name);
1100   ai->cont = cont;
1101   ai->proc = proc;
1102   ai->cls = cls;
1103   ai->type = ACTION_GET;
1104   ai->msize = slen1 + slen2 + sizeof(struct GNUNET_MessageHeader);
1105   GNUNET_CONTAINER_DLL_insert_tail(handle->action_head,
1106                                    handle->action_tail,
1107                                    ai);
1108   schedule_action(handle);
1109   return ai;
1110 }
1111
1112
1113 /**
1114  * Cancel a 'get' request.  Must be called before the 'cont'
1115  * function is called.
1116  *
1117  * @param gh handle of the request to cancel
1118  */
1119 void
1120 GNUNET_STATISTICS_get_cancel(struct GNUNET_STATISTICS_GetHandle *gh)
1121 {
1122   if (NULL == gh)
1123     return;
1124   gh->cont = NULL;
1125   if (gh->sh->current == gh)
1126     {
1127       gh->aborted = GNUNET_YES;
1128       return;
1129     }
1130   GNUNET_CONTAINER_DLL_remove(gh->sh->action_head,
1131                               gh->sh->action_tail,
1132                               gh);
1133   GNUNET_free(gh->name);
1134   GNUNET_free(gh->subsystem);
1135   GNUNET_free(gh);
1136 }
1137
1138
1139 /**
1140  * Watch statistics from the peer (be notified whenever they change).
1141  *
1142  * @param handle identification of the statistics service
1143  * @param subsystem limit to the specified subsystem, never NULL
1144  * @param name name of the statistic value, never NULL
1145  * @param proc function to call on each value
1146  * @param proc_cls closure for @a proc
1147  * @return #GNUNET_OK on success, #GNUNET_SYSERR on error
1148  */
1149 int
1150 GNUNET_STATISTICS_watch(struct GNUNET_STATISTICS_Handle *handle,
1151                         const char *subsystem,
1152                         const char *name,
1153                         GNUNET_STATISTICS_Iterator proc,
1154                         void *proc_cls)
1155 {
1156   struct GNUNET_STATISTICS_WatchEntry *w;
1157
1158   if (NULL == handle)
1159     return GNUNET_SYSERR;
1160   w = GNUNET_new(struct GNUNET_STATISTICS_WatchEntry);
1161   w->subsystem = GNUNET_strdup(subsystem);
1162   w->name = GNUNET_strdup(name);
1163   w->proc = proc;
1164   w->proc_cls = proc_cls;
1165   GNUNET_array_append(handle->watches,
1166                       handle->watches_size,
1167                       w);
1168   schedule_watch_request(handle,
1169                          w);
1170   return GNUNET_OK;
1171 }
1172
1173
1174 /**
1175  * Stop watching statistics from the peer.
1176  *
1177  * @param handle identification of the statistics service
1178  * @param subsystem limit to the specified subsystem, never NULL
1179  * @param name name of the statistic value, never NULL
1180  * @param proc function to call on each value
1181  * @param proc_cls closure for @a proc
1182  * @return #GNUNET_OK on success, #GNUNET_SYSERR on error (no such watch)
1183  */
1184 int
1185 GNUNET_STATISTICS_watch_cancel(struct GNUNET_STATISTICS_Handle *handle,
1186                                const char *subsystem,
1187                                const char *name,
1188                                GNUNET_STATISTICS_Iterator proc,
1189                                void *proc_cls)
1190 {
1191   struct GNUNET_STATISTICS_WatchEntry *w;
1192
1193   if (NULL == handle)
1194     return GNUNET_SYSERR;
1195   for (unsigned int i = 0; i < handle->watches_size; i++)
1196     {
1197       w = handle->watches[i];
1198       if (NULL == w)
1199         continue;
1200       if ((w->proc == proc) &&
1201           (w->proc_cls == proc_cls) &&
1202           (0 == strcmp(w->name,
1203                        name)) &&
1204           (0 == strcmp(w->subsystem,
1205                        subsystem)))
1206         {
1207           GNUNET_free(w->name);
1208           GNUNET_free(w->subsystem);
1209           GNUNET_free(w);
1210           handle->watches[i] = NULL;
1211           return GNUNET_OK;
1212         }
1213     }
1214   return GNUNET_SYSERR;
1215 }
1216
1217
1218 /**
1219  * Queue a request to change a statistic.
1220  *
1221  * @param h statistics handle
1222  * @param name name of the value
1223  * @param make_persistent  should the value be kept across restarts?
1224  * @param value new value or change
1225  * @param type type of the action (#ACTION_SET or #ACTION_UPDATE)
1226  */
1227 static void
1228 add_setter_action(struct GNUNET_STATISTICS_Handle *h,
1229                   const char *name,
1230                   int make_persistent,
1231                   uint64_t value,
1232                   enum ActionType type)
1233 {
1234   struct GNUNET_STATISTICS_GetHandle *ai;
1235   size_t slen;
1236   size_t nlen;
1237   size_t nsize;
1238   int64_t delta;
1239
1240   slen = strlen(h->subsystem) + 1;
1241   nlen = strlen(name) + 1;
1242   nsize = sizeof(struct GNUNET_STATISTICS_SetMessage) + slen + nlen;
1243   if (nsize >= GNUNET_MAX_MESSAGE_SIZE)
1244     {
1245       GNUNET_break(0);
1246       return;
1247     }
1248   for (ai = h->action_head; NULL != ai; ai = ai->next)
1249     {
1250       if (!((0 == strcmp(ai->subsystem,
1251                          h->subsystem)) &&
1252             (0 == strcmp(ai->name,
1253                          name)) &&
1254             ((ACTION_UPDATE == ai->type) ||
1255              (ACTION_SET == ai->type))))
1256         continue;
1257       if (ACTION_SET == ai->type)
1258         {
1259           if (ACTION_UPDATE == type)
1260             {
1261               delta = (int64_t)value;
1262               if (delta > 0)
1263                 {
1264                   /* update old set by new delta */
1265                   ai->value += delta;
1266                 }
1267               else
1268                 {
1269                   /* update old set by new delta, but never go negative */
1270                   if (ai->value < -delta)
1271                     ai->value = 0;
1272                   else
1273                     ai->value += delta;
1274                 }
1275             }
1276           else
1277             {
1278               /* new set overrides old set */
1279               ai->value = value;
1280             }
1281         }
1282       else
1283         {
1284           if (ACTION_UPDATE == type)
1285             {
1286               /* make delta cummulative */
1287               delta = (int64_t)value;
1288               ai->value += delta;
1289             }
1290           else
1291             {
1292               /* drop old 'update', use new 'set' instead */
1293               ai->value = value;
1294               ai->type = type;
1295             }
1296         }
1297       ai->timeout
1298         = GNUNET_TIME_relative_to_absolute(SET_TRANSMIT_TIMEOUT);
1299       ai->make_persistent
1300         = make_persistent;
1301       return;
1302     }
1303   /* no existing entry matches, create a fresh one */
1304   ai = GNUNET_new(struct GNUNET_STATISTICS_GetHandle);
1305   ai->sh = h;
1306   ai->subsystem = GNUNET_strdup(h->subsystem);
1307   ai->name = GNUNET_strdup(name);
1308   ai->timeout = GNUNET_TIME_relative_to_absolute(SET_TRANSMIT_TIMEOUT);
1309   ai->make_persistent = make_persistent;
1310   ai->msize = nsize;
1311   ai->value = value;
1312   ai->type = type;
1313   GNUNET_CONTAINER_DLL_insert_tail(h->action_head,
1314                                    h->action_tail,
1315                                    ai);
1316   schedule_action(h);
1317 }
1318
1319
1320 /**
1321  * Set statistic value for the peer.  Will always use our
1322  * subsystem (the argument used when "handle" was created).
1323  *
1324  * @param handle identification of the statistics service
1325  * @param name name of the statistic value
1326  * @param value new value to set
1327  * @param make_persistent should the value be kept across restarts?
1328  */
1329 void
1330 GNUNET_STATISTICS_set(struct GNUNET_STATISTICS_Handle *handle,
1331                       const char *name,
1332                       uint64_t value,
1333                       int make_persistent)
1334 {
1335   if (NULL == handle)
1336     return;
1337   GNUNET_assert(GNUNET_NO == handle->do_destroy);
1338   add_setter_action(handle,
1339                     name,
1340                     make_persistent,
1341                     value,
1342                     ACTION_SET);
1343 }
1344
1345
1346 /**
1347  * Set statistic value for the peer.  Will always use our
1348  * subsystem (the argument used when "handle" was created).
1349  *
1350  * @param handle identification of the statistics service
1351  * @param name name of the statistic value
1352  * @param delta change in value (added to existing value)
1353  * @param make_persistent should the value be kept across restarts?
1354  */
1355 void
1356 GNUNET_STATISTICS_update(struct GNUNET_STATISTICS_Handle *handle,
1357                          const char *name,
1358                          int64_t delta,
1359                          int make_persistent)
1360 {
1361   if (NULL == handle)
1362     return;
1363   if (0 == delta)
1364     return;
1365   GNUNET_assert(GNUNET_NO == handle->do_destroy);
1366   add_setter_action(handle,
1367                     name,
1368                     make_persistent,
1369                     (uint64_t)delta,
1370                     ACTION_UPDATE);
1371 }
1372
1373
1374 /* end of statistics_api.c */