missing check
[oweals/gnunet.git] / src / statistics / statistics_api.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009, 2010, 2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 2, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file statistics/statistics_api.c
23  * @brief API of the statistics service
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include "gnunet_client_lib.h"
28 #include "gnunet_constants.h"
29 #include "gnunet_container_lib.h"
30 #include "gnunet_protocols.h"
31 #include "gnunet_server_lib.h"
32 #include "gnunet_statistics_service.h"
33 #include "gnunet_strings_lib.h"
34 #include "statistics.h"
35
36 /**
37  * How long do we wait until a statistics request for setting
38  * a value times out?  (The update will be lost if the
39  * service does not react within this timeframe).
40  */
41 #define SET_TRANSMIT_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 2)
42
43 #define LOG(kind,...) GNUNET_log_from (kind, "statistics-api",__VA_ARGS__)
44
45 /**
46  * Types of actions.
47  */
48 enum ActionType
49 {
50   /**
51    * Get a value.
52    */
53   ACTION_GET,
54
55   /**
56    * Set a value.
57    */
58   ACTION_SET,
59
60   /**
61    * Update a value.
62    */
63   ACTION_UPDATE,
64
65   /**
66    * Watch a value.
67    */
68   ACTION_WATCH
69 };
70
71
72 /**
73  * Entry kept for each value we are watching.
74  */
75 struct GNUNET_STATISTICS_WatchEntry
76 {
77
78   /**
79    * What subsystem is this action about? (never NULL)
80    */
81   char *subsystem;
82
83   /**
84    * What value is this action about? (never NULL)
85    */
86   char *name;
87
88   /**
89    * Function to call
90    */
91   GNUNET_STATISTICS_Iterator proc;
92
93   /**
94    * Closure for proc
95    */
96   void *proc_cls;
97
98 };
99
100
101 /**
102  * Linked list of things we still need to do.
103  */
104 struct GNUNET_STATISTICS_GetHandle
105 {
106
107   /**
108    * This is a doubly linked list.
109    */
110   struct GNUNET_STATISTICS_GetHandle *next;
111
112   /**
113    * This is a doubly linked list.
114    */
115   struct GNUNET_STATISTICS_GetHandle *prev;
116
117   /**
118    * Main statistics handle.
119    */
120   struct GNUNET_STATISTICS_Handle *sh;
121
122   /**
123    * What subsystem is this action about? (can be NULL)
124    */
125   char *subsystem;
126
127   /**
128    * What value is this action about? (can be NULL)
129    */
130   char *name;
131
132   /**
133    * Continuation to call once action is complete.
134    */
135   GNUNET_STATISTICS_Callback cont;
136
137   /**
138    * Function to call (for GET actions only).
139    */
140   GNUNET_STATISTICS_Iterator proc;
141
142   /**
143    * Closure for proc and cont.
144    */
145   void *cls;
146
147   /**
148    * Timeout for this action.
149    */
150   struct GNUNET_TIME_Absolute timeout;
151
152   /**
153    * Associated value.
154    */
155   uint64_t value;
156
157   /**
158    * Flag for SET/UPDATE actions.
159    */
160   int make_persistent;
161
162   /**
163    * Has the current iteration been aborted; for GET actions.
164    */
165   int aborted;
166
167   /**
168    * Is this a GET, SET, UPDATE or WATCH?
169    */
170   enum ActionType type;
171
172   /**
173    * Size of the message that we will be transmitting.
174    */
175   uint16_t msize;
176
177 };
178
179
180 /**
181  * Handle for the service.
182  */
183 struct GNUNET_STATISTICS_Handle
184 {
185   /**
186    * Name of our subsystem.
187    */
188   char *subsystem;
189
190   /**
191    * Configuration to use.
192    */
193   const struct GNUNET_CONFIGURATION_Handle *cfg;
194
195   /**
196    * Socket (if available).
197    */
198   struct GNUNET_CLIENT_Connection *client;
199
200   /**
201    * Currently pending transmission request.
202    */
203   struct GNUNET_CLIENT_TransmitHandle *th;
204
205   /**
206    * Head of the linked list of pending actions (first action
207    * to be performed).
208    */
209   struct GNUNET_STATISTICS_GetHandle *action_head;
210
211   /**
212    * Tail of the linked list of actions (for fast append).
213    */
214   struct GNUNET_STATISTICS_GetHandle *action_tail;
215
216   /**
217    * Action we are currently busy with (action request has been
218    * transmitted, we're now receiving the response from the
219    * service).
220    */
221   struct GNUNET_STATISTICS_GetHandle *current;
222
223   /**
224    * Array of watch entries.
225    */
226   struct GNUNET_STATISTICS_WatchEntry **watches;
227
228   /**
229    * Task doing exponential back-off trying to reconnect.
230    */
231   GNUNET_SCHEDULER_TaskIdentifier backoff_task;
232
233   /**
234    * Time for next connect retry.
235    */
236   struct GNUNET_TIME_Relative backoff;
237
238   /**
239    * Maximum heap size observed so far (if available).
240    */
241   uint64_t peak_heap_size;
242
243   /**
244    * Maximum resident set side observed so far (if available).
245    */
246   uint64_t peak_rss;
247
248   /**
249    * Size of the 'watches' array.
250    */
251   unsigned int watches_size;
252
253   /**
254    * Should this handle auto-destruct once all actions have
255    * been processed?
256    */
257   int do_destroy;
258
259   /**
260    * Are we currently receiving from the service?
261    */
262   int receiving;
263
264 };
265
266
267 /**
268  * Obtain statistics about this process's memory consumption and
269  * report those as well (if they changed).
270  */
271 static void
272 update_memory_statistics (struct GNUNET_STATISTICS_Handle *h)
273 {
274 #if ENABLE_HEAP_STATISTICS
275   uint64_t current_heap_size = 0;
276   uint64_t current_rss = 0;
277
278   if (GNUNET_NO != h->do_destroy)
279     return;
280 #if HAVE_MALLINFO
281   {
282     struct mallinfo mi;
283     
284     mi = mallinfo();
285     current_heap_size = mi.uordblks + mi.fordblks;  
286   }
287 #endif  
288 #if HAVE_GETRUSAGE
289   {
290     struct rusage ru;
291
292     if (0 == getrusage (RUSAGE_SELF, &ru))
293     {
294       current_rss = 1024LL * ru.ru_maxrss;
295     }    
296   }
297 #endif
298   if (current_heap_size > h->peak_heap_size)
299   {
300     h->peak_heap_size = current_heap_size;
301     GNUNET_STATISTICS_set (h, "# peak heap size", current_heap_size, GNUNET_NO);
302   }
303   if (current_rss > h->peak_rss)
304   {
305     h->peak_rss = current_rss;
306     GNUNET_STATISTICS_set (h, "# peak resident set size", current_rss, GNUNET_NO);
307   }
308 #endif
309 }
310
311
312 /**
313  * Schedule the next action to be performed.
314  *
315  * @param h statistics handle to reconnect
316  */
317 static void
318 schedule_action (struct GNUNET_STATISTICS_Handle *h);
319
320
321 /**
322  * Transmit request to service that we want to watch
323  * the development of a particular value.
324  *
325  * @param h statistics handle
326  * @param watch watch entry of the value to watch
327  */
328 static void
329 schedule_watch_request (struct GNUNET_STATISTICS_Handle *h,
330                         struct GNUNET_STATISTICS_WatchEntry *watch)
331 {
332
333   struct GNUNET_STATISTICS_GetHandle *ai;
334   size_t slen;
335   size_t nlen;
336   size_t nsize;
337
338   GNUNET_assert (NULL != h);
339   GNUNET_assert (NULL != watch);
340
341   slen = strlen (watch->subsystem) + 1;
342   nlen = strlen (watch->name) + 1;
343   nsize = sizeof (struct GNUNET_MessageHeader) + slen + nlen;
344   if (nsize >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
345   {
346     GNUNET_break (0);
347     return;
348   }
349   ai = GNUNET_malloc (sizeof (struct GNUNET_STATISTICS_GetHandle));
350   ai->sh = h;
351   ai->subsystem = GNUNET_strdup (watch->subsystem);
352   ai->name = GNUNET_strdup (watch->name);
353   ai->timeout = GNUNET_TIME_UNIT_FOREVER_ABS;
354   ai->msize = nsize;
355   ai->type = ACTION_WATCH;
356   ai->proc = watch->proc;
357   ai->cls = watch->proc_cls;
358   GNUNET_CONTAINER_DLL_insert_tail (h->action_head, h->action_tail,
359                                     ai);
360   schedule_action (h);
361 }
362
363
364 /**
365  * Free memory associated with the given action item.
366  *
367  * @param gh action item to free
368  */
369 static void
370 free_action_item (struct GNUNET_STATISTICS_GetHandle *gh)
371 {
372   GNUNET_free_non_null (gh->subsystem);
373   GNUNET_free_non_null (gh->name);
374   GNUNET_free (gh);
375 }
376
377
378 /**
379  * Disconnect from the statistics service.
380  *
381  * @param h statistics handle to disconnect from
382  */
383 static void
384 do_disconnect (struct GNUNET_STATISTICS_Handle *h)
385 {
386   struct GNUNET_STATISTICS_GetHandle *c;
387   
388   if (NULL != h->th)
389   {
390     GNUNET_CLIENT_notify_transmit_ready_cancel (h->th);
391     h->th = NULL;
392   } 
393   if (NULL != h->client)
394   {
395     GNUNET_CLIENT_disconnect (h->client);
396     h->client = NULL;
397   }
398   h->receiving = GNUNET_NO;
399   if (NULL != (c = h->current))
400   {
401     h->current = NULL;
402     if ( (NULL != c->cont) &&
403          (GNUNET_YES != c->aborted) )
404       c->cont (c->cls, GNUNET_SYSERR);
405     free_action_item (c);
406   }
407 }
408
409
410 /**
411  * Try to (re)connect to the statistics service.
412  *
413  * @param h statistics handle to reconnect
414  * @return GNUNET_YES on success, GNUNET_NO on failure.
415  */
416 static int
417 try_connect (struct GNUNET_STATISTICS_Handle *h)
418 {
419   struct GNUNET_STATISTICS_GetHandle *gh;
420   struct GNUNET_STATISTICS_GetHandle *gn;
421   unsigned int i;
422
423   if (GNUNET_SCHEDULER_NO_TASK != h->backoff_task)
424     return GNUNET_NO;
425   if (NULL != h->client)
426     return GNUNET_YES;
427   h->client = GNUNET_CLIENT_connect ("statistics", h->cfg);  
428   if (NULL != h->client)
429   {
430     gn = h->action_head; 
431     while (NULL != (gh = gn))
432     {
433       gn = gh->next;
434       if (gh->type == ACTION_WATCH)
435       {
436         GNUNET_CONTAINER_DLL_remove (h->action_head,
437                                      h->action_tail,
438                                      gh);
439         free_action_item (gh);  
440       }
441     }
442     for (i = 0; i < h->watches_size; i++)
443     {
444       if (NULL != h->watches[i])
445         schedule_watch_request (h, h->watches[i]);
446     }
447     return GNUNET_YES;
448   }
449   LOG (GNUNET_ERROR_TYPE_DEBUG,
450        "Failed to connect to statistics service!\n");
451   return GNUNET_NO;
452 }
453
454
455 /**
456  * We've waited long enough, reconnect now.
457  *
458  * @param cls the 'struct GNUNET_STATISTICS_Handle' to reconnect
459  * @param tc scheduler context (unused)
460  */
461 static void
462 reconnect_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
463 {
464   struct GNUNET_STATISTICS_Handle *h = cls;
465
466   h->backoff_task = GNUNET_SCHEDULER_NO_TASK;
467   schedule_action (h);
468 }
469
470
471 /**
472  * Task used by 'reconnect_later' to shutdown the handle
473  *
474  * @param cls the statistics handle
475  * @param tc scheduler context
476  */
477 static void
478 do_destroy (void *cls,
479                const struct GNUNET_SCHEDULER_TaskContext *tc)
480 {
481   struct GNUNET_STATISTICS_Handle *h = cls;
482
483   GNUNET_STATISTICS_destroy (h, GNUNET_NO);
484 }
485
486
487 /**
488  * Reconnect at a later time, respecting back-off.
489  *
490  * @param h statistics handle
491  */
492 static void
493 reconnect_later (struct GNUNET_STATISTICS_Handle *h)
494 {
495   int loss;
496   struct GNUNET_STATISTICS_GetHandle *gh;
497
498   GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == h->backoff_task);
499   if (GNUNET_YES == h->do_destroy)
500   {
501     /* So we are shutting down and the service is not reachable.
502      * Chances are that it's down for good and we are not going to connect to
503      * it anymore.
504      * Give up and don't sync the rest of the data.
505      */
506     loss = GNUNET_NO;
507     for (gh = h->action_head; NULL != gh; gh = gh->next)
508       if ( (gh->make_persistent) && (ACTION_SET == gh->type) )
509         loss = GNUNET_YES;
510     if (GNUNET_YES == loss)
511       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
512                   _("Could not save some persistent statistics\n"));
513     h->do_destroy = GNUNET_NO;
514     GNUNET_SCHEDULER_add_continuation (&do_destroy, h,
515                                        GNUNET_SCHEDULER_REASON_PREREQ_DONE);
516     return;
517   }
518   h->backoff_task =
519     GNUNET_SCHEDULER_add_delayed (h->backoff, &reconnect_task, h);
520   h->backoff = GNUNET_TIME_STD_BACKOFF (h->backoff);
521 }
522
523
524 /**
525  * Process a 'GNUNET_MESSAGE_TYPE_STATISTICS_VALUE' message.
526  *
527  * @param h statistics handle
528  * @param msg message received from the service, never NULL
529  * @return GNUNET_OK if the message was well-formed
530  */
531 static int
532 process_statistics_value_message (struct GNUNET_STATISTICS_Handle *h,
533                                   const struct GNUNET_MessageHeader *msg)
534 {
535   char *service;
536   char *name;
537   const struct GNUNET_STATISTICS_ReplyMessage *smsg;
538   uint16_t size;
539
540   if (h->current->aborted)
541   {
542     LOG (GNUNET_ERROR_TYPE_DEBUG, "Iteration was aborted, ignoring VALUE\n");
543     return GNUNET_OK;           /* don't bother */
544   }
545   size = ntohs (msg->size);
546   if (size < sizeof (struct GNUNET_STATISTICS_ReplyMessage))
547   {
548     GNUNET_break (0);
549     return GNUNET_SYSERR;
550   }
551   smsg = (const struct GNUNET_STATISTICS_ReplyMessage *) msg;
552   size -= sizeof (struct GNUNET_STATISTICS_ReplyMessage);
553   if (size !=
554       GNUNET_STRINGS_buffer_tokenize ((const char *) &smsg[1], size, 2,
555                                       &service, &name))
556   {
557     GNUNET_break (0);
558     return GNUNET_SYSERR;
559   }
560   LOG (GNUNET_ERROR_TYPE_DEBUG, "Received valid statistic on `%s:%s': %llu\n",
561        service, name, GNUNET_ntohll (smsg->value));
562   if (GNUNET_OK !=
563       h->current->proc (h->current->cls, service, name,
564                         GNUNET_ntohll (smsg->value),
565                         0 !=
566                         (ntohl (smsg->uid) & GNUNET_STATISTICS_PERSIST_BIT)))
567   {
568     LOG (GNUNET_ERROR_TYPE_DEBUG,
569          "Processing of remaining statistics aborted by client.\n");
570     h->current->aborted = GNUNET_YES;
571   }
572   LOG (GNUNET_ERROR_TYPE_DEBUG, "VALUE processed successfully\n");
573   return GNUNET_OK;
574 }
575
576
577 /**
578  * We have received a watch value from the service.  Process it.
579  *
580  * @param h statistics handle
581  * @param msg the watch value message
582  * @return GNUNET_OK if the message was well-formed, GNUNET_SYSERR if not,
583  *         GNUNET_NO if this watch has been cancelled
584  */
585 static int
586 process_watch_value (struct GNUNET_STATISTICS_Handle *h,
587                      const struct GNUNET_MessageHeader *msg)
588 {
589   const struct GNUNET_STATISTICS_WatchValueMessage *wvm;
590   struct GNUNET_STATISTICS_WatchEntry *w;
591   uint32_t wid;
592
593   if (sizeof (struct GNUNET_STATISTICS_WatchValueMessage) != ntohs (msg->size))
594   {
595     GNUNET_break (0);
596     return GNUNET_SYSERR;
597   }
598   wvm = (const struct GNUNET_STATISTICS_WatchValueMessage *) msg;
599   GNUNET_break (0 == ntohl (wvm->reserved));
600   wid = ntohl (wvm->wid);
601   if (wid >= h->watches_size)
602   {
603     GNUNET_break (0);
604     return GNUNET_SYSERR;
605   }
606   w = h->watches[wid];
607   if (NULL == w)  
608     return GNUNET_NO;  
609   (void) w->proc (w->proc_cls, w->subsystem, w->name,
610                   GNUNET_ntohll (wvm->value),
611                   0 != (ntohl (wvm->flags) & GNUNET_STATISTICS_PERSIST_BIT));
612   return GNUNET_OK;
613 }
614
615
616 static void
617 destroy_task (void *cls,
618               const struct GNUNET_SCHEDULER_TaskContext *tc)
619 {
620   struct GNUNET_STATISTICS_Handle *h = cls;
621
622   GNUNET_STATISTICS_destroy (h, GNUNET_NO);
623 }
624
625
626 /**
627  * Function called with messages from stats service.
628  *
629  * @param cls closure
630  * @param msg message received, NULL on timeout or fatal error
631  */
632 static void
633 receive_stats (void *cls, const struct GNUNET_MessageHeader *msg)
634 {
635   struct GNUNET_STATISTICS_Handle *h = cls;
636   struct GNUNET_STATISTICS_GetHandle *c;
637   int ret;
638
639   if (NULL == msg)
640   {
641     LOG (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
642          "Error receiving statistics from service, is the service running?\n");
643     do_disconnect (h);
644     reconnect_later (h);
645     return;
646   }
647   switch (ntohs (msg->type))
648   {
649   case GNUNET_MESSAGE_TYPE_TEST:
650     if (GNUNET_SYSERR != h->do_destroy)
651     {
652       /* not in shutdown, why do we get 'TEST'? */
653       GNUNET_break (0);
654       do_disconnect (h);
655       reconnect_later (h);
656       return;
657     }
658     h->do_destroy = GNUNET_NO;
659     GNUNET_SCHEDULER_add_continuation (&destroy_task, h,
660                                        GNUNET_SCHEDULER_REASON_PREREQ_DONE);
661     break;
662   case GNUNET_MESSAGE_TYPE_STATISTICS_END:
663     LOG (GNUNET_ERROR_TYPE_DEBUG, "Received end of statistics marker\n");
664     if (NULL == (c = h->current))
665     {
666       GNUNET_break (0);
667       do_disconnect (h);
668       reconnect_later (h);
669       return;
670     }
671     h->backoff = GNUNET_TIME_UNIT_MILLISECONDS;
672     if (h->watches_size > 0)
673     {
674       GNUNET_CLIENT_receive (h->client, &receive_stats, h,
675                              GNUNET_TIME_UNIT_FOREVER_REL);
676     }
677     else
678     {
679       h->receiving = GNUNET_NO;
680     }    
681     h->current = NULL;
682     schedule_action (h);
683     if (NULL != c->cont)
684       c->cont (c->cls, GNUNET_OK);
685     free_action_item (c);
686     return;
687   case GNUNET_MESSAGE_TYPE_STATISTICS_VALUE:
688     if (GNUNET_OK != process_statistics_value_message (h, msg))
689     {
690       do_disconnect (h);
691       reconnect_later (h);
692       return;     
693     }
694     /* finally, look for more! */
695     LOG (GNUNET_ERROR_TYPE_DEBUG,
696          "Processing VALUE done, now reading more\n");
697     GNUNET_CLIENT_receive (h->client, &receive_stats, h,
698                            GNUNET_TIME_absolute_get_remaining (h->
699                                                                current->timeout));
700     h->backoff = GNUNET_TIME_UNIT_MILLISECONDS;
701     return;
702   case GNUNET_MESSAGE_TYPE_STATISTICS_WATCH_VALUE:
703     if (GNUNET_OK != 
704         (ret = process_watch_value (h, msg)))
705     {
706       do_disconnect (h);
707       if (GNUNET_NO == ret)
708         h->backoff = GNUNET_TIME_UNIT_MILLISECONDS; 
709       reconnect_later (h);
710       return;
711     }
712     h->backoff = GNUNET_TIME_UNIT_MILLISECONDS;
713     GNUNET_assert (h->watches_size > 0);
714     GNUNET_CLIENT_receive (h->client, &receive_stats, h,
715                            GNUNET_TIME_UNIT_FOREVER_REL);
716     return;    
717   default:
718     GNUNET_break (0);
719     do_disconnect (h);
720     reconnect_later (h);
721     return;
722   }
723 }
724
725
726 /**
727  * Transmit a GET request (and if successful, start to receive
728  * the response).
729  *
730  * @param handle statistics handle
731  * @param size how many bytes can we write to buf
732  * @param buf where to write requests to the service
733  * @return number of bytes written to buf
734  */
735 static size_t
736 transmit_get (struct GNUNET_STATISTICS_Handle *handle, size_t size, void *buf)
737 {
738   struct GNUNET_STATISTICS_GetHandle *c;
739   struct GNUNET_MessageHeader *hdr;
740   size_t slen1;
741   size_t slen2;
742   uint16_t msize;
743
744   GNUNET_assert (NULL != (c = handle->current));
745   if (NULL == buf)
746   {
747     /* timeout / error */
748     LOG (GNUNET_ERROR_TYPE_DEBUG,
749          "Transmission of request for statistics failed!\n");
750     do_disconnect (handle);
751     reconnect_later (handle);
752     return 0;
753   }
754   slen1 = strlen (c->subsystem) + 1;
755   slen2 = strlen (c->name) + 1;
756   msize = slen1 + slen2 + sizeof (struct GNUNET_MessageHeader);
757   GNUNET_assert (msize <= size);
758   hdr = (struct GNUNET_MessageHeader *) buf;
759   hdr->size = htons (msize);
760   hdr->type = htons (GNUNET_MESSAGE_TYPE_STATISTICS_GET);
761   GNUNET_assert (slen1 + slen2 ==
762                  GNUNET_STRINGS_buffer_fill ((char *) &hdr[1], slen1 + slen2, 2,
763                                              c->subsystem,
764                                              c->name));
765   if (GNUNET_YES != handle->receiving)
766   {
767     LOG (GNUNET_ERROR_TYPE_DEBUG,
768          "Transmission of GET done, now reading response\n");
769     handle->receiving = GNUNET_YES;
770     GNUNET_CLIENT_receive (handle->client, &receive_stats, handle,
771                            GNUNET_TIME_absolute_get_remaining (c->timeout));
772   }
773   return msize;
774 }
775
776
777 /**
778  * Transmit a WATCH request (and if successful, start to receive
779  * the response).
780  *
781  * @param handle statistics handle
782  * @param size how many bytes can we write to buf
783  * @param buf where to write requests to the service
784  * @return number of bytes written to buf
785  */
786 static size_t
787 transmit_watch (struct GNUNET_STATISTICS_Handle *handle, size_t size, void *buf)
788 {
789   struct GNUNET_MessageHeader *hdr;
790   size_t slen1;
791   size_t slen2;
792   uint16_t msize;
793
794   if (NULL == buf)
795   {
796     /* timeout / error */
797     LOG (GNUNET_ERROR_TYPE_DEBUG,
798          "Transmission of request for statistics failed!\n");
799     do_disconnect (handle);
800     reconnect_later (handle);
801     return 0;
802   }
803   LOG (GNUNET_ERROR_TYPE_DEBUG, "Transmitting watch request for `%s'\n",
804        handle->current->name);
805   slen1 = strlen (handle->current->subsystem) + 1;
806   slen2 = strlen (handle->current->name) + 1;
807   msize = slen1 + slen2 + sizeof (struct GNUNET_MessageHeader);
808   GNUNET_assert (msize <= size);
809   hdr = (struct GNUNET_MessageHeader *) buf;
810   hdr->size = htons (msize);
811   hdr->type = htons (GNUNET_MESSAGE_TYPE_STATISTICS_WATCH);
812   GNUNET_assert (slen1 + slen2 ==
813                  GNUNET_STRINGS_buffer_fill ((char *) &hdr[1], slen1 + slen2, 2,
814                                              handle->current->subsystem,
815                                              handle->current->name));
816   if (GNUNET_YES != handle->receiving)
817   {
818     handle->receiving = GNUNET_YES;
819     GNUNET_CLIENT_receive (handle->client, &receive_stats, handle,
820                            GNUNET_TIME_UNIT_FOREVER_REL);
821   }
822   GNUNET_assert (NULL == handle->current->cont);
823   free_action_item (handle->current);
824   handle->current = NULL;
825   return msize;
826 }
827
828
829 /**
830  * Transmit a SET/UPDATE request.
831  *
832  * @param handle statistics handle
833  * @param size how many bytes can we write to buf
834  * @param buf where to write requests to the service
835  * @return number of bytes written to buf
836  */
837 static size_t
838 transmit_set (struct GNUNET_STATISTICS_Handle *handle, size_t size, void *buf)
839 {
840   struct GNUNET_STATISTICS_SetMessage *r;
841   size_t slen;
842   size_t nlen;
843   size_t nsize;
844
845   if (NULL == buf)
846   {
847     do_disconnect (handle);
848     reconnect_later (handle);
849     return 0;
850   }
851   slen = strlen (handle->current->subsystem) + 1;
852   nlen = strlen (handle->current->name) + 1;
853   nsize = sizeof (struct GNUNET_STATISTICS_SetMessage) + slen + nlen;
854   if (size < nsize)
855   {
856     GNUNET_break (0);
857     do_disconnect (handle);
858     reconnect_later (handle);
859     return 0;
860   }
861   r = buf;
862   r->header.size = htons (nsize);
863   r->header.type = htons (GNUNET_MESSAGE_TYPE_STATISTICS_SET);
864   r->flags = 0;
865   r->value = GNUNET_htonll (handle->current->value);
866   if (handle->current->make_persistent)
867     r->flags |= htonl (GNUNET_STATISTICS_SETFLAG_PERSISTENT);
868   if (handle->current->type == ACTION_UPDATE)
869     r->flags |= htonl (GNUNET_STATISTICS_SETFLAG_RELATIVE);
870   GNUNET_assert (slen + nlen ==
871                  GNUNET_STRINGS_buffer_fill ((char *) &r[1], slen + nlen, 2,
872                                              handle->current->subsystem,
873                                              handle->current->name));
874   GNUNET_assert (NULL == handle->current->cont);
875   free_action_item (handle->current);
876   handle->current = NULL;
877   update_memory_statistics (handle);
878   return nsize;
879 }
880
881
882 /**
883  * Function called when we are ready to transmit a request to the service.
884  *
885  * @param cls the 'struct GNUNET_STATISTICS_Handle'
886  * @param size how many bytes can we write to buf
887  * @param buf where to write requests to the service
888  * @return number of bytes written to buf
889  */
890 static size_t
891 transmit_action (void *cls, size_t size, void *buf)
892 {
893   struct GNUNET_STATISTICS_Handle *h = cls;
894   size_t ret;
895
896   h->th = NULL;
897   ret = 0;
898   if (NULL != h->current)
899     switch (h->current->type)
900     {
901     case ACTION_GET:
902       ret = transmit_get (h, size, buf);
903       break;
904     case ACTION_SET:
905     case ACTION_UPDATE:
906       ret = transmit_set (h, size, buf);
907       break;
908     case ACTION_WATCH:
909       ret = transmit_watch (h, size, buf);
910       break;
911     default:
912       GNUNET_assert (0);
913       break;
914     }
915   schedule_action (h);
916   return ret;
917 }
918
919
920 /**
921  * Get handle for the statistics service.
922  *
923  * @param subsystem name of subsystem using the service
924  * @param cfg services configuration in use
925  * @return handle to use
926  */
927 struct GNUNET_STATISTICS_Handle *
928 GNUNET_STATISTICS_create (const char *subsystem,
929                           const struct GNUNET_CONFIGURATION_Handle *cfg)
930 {
931   struct GNUNET_STATISTICS_Handle *ret;
932
933   if (GNUNET_YES ==
934       GNUNET_CONFIGURATION_get_value_yesno (cfg, "statistics", "DISABLE"))
935     return NULL;
936   GNUNET_assert (NULL != subsystem);
937   GNUNET_assert (NULL != cfg);
938   ret = GNUNET_malloc (sizeof (struct GNUNET_STATISTICS_Handle));
939   ret->cfg = cfg;
940   ret->subsystem = GNUNET_strdup (subsystem);
941   ret->backoff = GNUNET_TIME_UNIT_MILLISECONDS;
942   return ret;
943 }
944
945
946 /**
947  * Destroy a handle (free all state associated with
948  * it).
949  *
950  * @param h statistics handle to destroy
951  * @param sync_first set to GNUNET_YES if pending SET requests should
952  *        be completed
953  */
954 void
955 GNUNET_STATISTICS_destroy (struct GNUNET_STATISTICS_Handle *h, int sync_first)
956 {
957   struct GNUNET_STATISTICS_GetHandle *pos;
958   struct GNUNET_STATISTICS_GetHandle *next;
959   struct GNUNET_TIME_Relative timeout;
960   int i;
961
962   if (NULL == h)
963     return;
964   GNUNET_assert (GNUNET_NO == h->do_destroy); // Don't call twice.
965   if (GNUNET_SCHEDULER_NO_TASK != h->backoff_task)
966   {
967     GNUNET_SCHEDULER_cancel (h->backoff_task);
968     h->backoff_task = GNUNET_SCHEDULER_NO_TASK;
969   }
970   if (sync_first)
971   {
972     if (NULL != h->current)
973     {
974       if (ACTION_GET == h->current->type)
975       {
976         GNUNET_CLIENT_notify_transmit_ready_cancel (h->th);
977         h->th = NULL;
978         free_action_item (h->current);
979         h->current = NULL;
980       }
981     }
982     next = h->action_head; 
983     while (NULL != (pos = next))
984     {
985       next = pos->next;
986       if (ACTION_GET == pos->type)
987       {
988         GNUNET_CONTAINER_DLL_remove (h->action_head,
989                                      h->action_tail,
990                                      pos);
991         free_action_item (pos);
992       }
993     }
994     if ( (NULL == h->current) &&
995          (NULL != (h->current = h->action_head)) )
996       GNUNET_CONTAINER_DLL_remove (h->action_head,
997                                    h->action_tail,
998                                    h->current);
999     h->do_destroy = GNUNET_YES;
1000     if ((NULL != h->current) && (NULL == h->th) &&
1001         (NULL != h->client))
1002     {
1003       timeout = GNUNET_TIME_absolute_get_remaining (h->current->timeout);
1004       h->th =
1005         GNUNET_CLIENT_notify_transmit_ready (h->client, h->current->msize,
1006                                              timeout, GNUNET_YES,
1007                                              &transmit_action, h);
1008       GNUNET_assert (NULL != h->th);
1009     }
1010     if (NULL != h->th)
1011       return; /* do not finish destruction just yet */
1012   }
1013   while (NULL != (pos = h->action_head))
1014   {
1015     GNUNET_CONTAINER_DLL_remove (h->action_head,
1016                                  h->action_tail,
1017                                  pos);
1018     free_action_item (pos);
1019   }
1020   do_disconnect (h);
1021   for (i = 0; i < h->watches_size; i++)
1022   {
1023     if (NULL == h->watches[i])
1024       continue; 
1025     GNUNET_free (h->watches[i]->subsystem);
1026     GNUNET_free (h->watches[i]->name);
1027     GNUNET_free (h->watches[i]);
1028   }
1029   GNUNET_array_grow (h->watches, h->watches_size, 0);
1030   GNUNET_free (h->subsystem);
1031   GNUNET_free (h);
1032 }
1033
1034
1035 /**
1036  * Function called to transmit TEST message to service to
1037  * confirm that the service has received all of our 'SET'
1038  * messages (during statistics disconnect/shutdown).
1039  *
1040  * @param cls the 'struct GNUNET_STATISTICS_Handle'
1041  * @param size how many bytes can we write to buf
1042  * @param buf where to write requests to the service
1043  * @return number of bytes written to buf
1044  */
1045 static size_t
1046 transmit_test_on_shutdown (void *cls,
1047                            size_t size,
1048                            void *buf)
1049 {
1050   struct GNUNET_STATISTICS_Handle *h = cls;
1051   struct GNUNET_MessageHeader hdr;
1052
1053   h->th = NULL;
1054   if (NULL == buf)
1055   {
1056     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1057                 _("Failed to receive acknowledgement from statistics service, some statistics might have been lost!\n"));
1058     h->do_destroy = GNUNET_NO;
1059     GNUNET_SCHEDULER_add_continuation (&destroy_task, h,
1060                                        GNUNET_SCHEDULER_REASON_PREREQ_DONE);
1061     return 0;
1062   }
1063   hdr.type = htons (GNUNET_MESSAGE_TYPE_TEST);
1064   hdr.size = htons (sizeof (struct GNUNET_MessageHeader));
1065   memcpy (buf, &hdr, sizeof (hdr));
1066   if (GNUNET_YES != h->receiving)
1067   {
1068     h->receiving = GNUNET_YES;
1069     GNUNET_CLIENT_receive (h->client, &receive_stats, h,
1070                            GNUNET_TIME_UNIT_FOREVER_REL);
1071   }
1072   return sizeof (struct GNUNET_MessageHeader);
1073 }
1074
1075
1076 /**
1077  * Schedule the next action to be performed.
1078  *
1079  * @param h statistics handle
1080  */
1081 static void
1082 schedule_action (struct GNUNET_STATISTICS_Handle *h)
1083 {
1084   struct GNUNET_TIME_Relative timeout;
1085
1086   if ( (NULL != h->th) ||
1087        (GNUNET_SCHEDULER_NO_TASK != h->backoff_task) )
1088     return;                     /* action already pending */
1089   if (GNUNET_YES != try_connect (h))
1090   {
1091     reconnect_later (h);
1092     return;
1093   }
1094   if (NULL != h->current)
1095     return; /* action already pending */
1096   /* schedule next action */
1097   h->current = h->action_head;
1098   if (NULL == h->current)
1099   {
1100     if (GNUNET_YES == h->do_destroy)
1101     {
1102       h->do_destroy = GNUNET_SYSERR; /* in 'TEST' mode */
1103       h->th = GNUNET_CLIENT_notify_transmit_ready (h->client,
1104                                                    sizeof (struct GNUNET_MessageHeader),
1105                                                    SET_TRANSMIT_TIMEOUT,
1106                                                    GNUNET_NO,
1107                                                    &transmit_test_on_shutdown, h);
1108     }
1109     return;
1110   }
1111   GNUNET_CONTAINER_DLL_remove (h->action_head, h->action_tail, h->current);
1112   timeout = GNUNET_TIME_absolute_get_remaining (h->current->timeout);
1113   if (NULL ==
1114       (h->th =
1115        GNUNET_CLIENT_notify_transmit_ready (h->client, h->current->msize,
1116                                             timeout, GNUNET_YES,
1117                                             &transmit_action, h)))
1118   {
1119     LOG (GNUNET_ERROR_TYPE_DEBUG,
1120          "Failed to transmit request to statistics service.\n");
1121     do_disconnect (h);
1122     reconnect_later (h);
1123   }
1124 }
1125
1126
1127 /**
1128  * Get statistic from the peer.
1129  *
1130  * @param handle identification of the statistics service
1131  * @param subsystem limit to the specified subsystem, NULL for our subsystem
1132  * @param name name of the statistic value, NULL for all values
1133  * @param timeout after how long should we give up (and call
1134  *        cont with an error code)?
1135  * @param cont continuation to call when done (can be NULL)
1136  *        This callback CANNOT destroy the statistics handle in the same call.
1137  * @param proc function to call on each value
1138  * @param cls closure for cont and proc
1139  * @return NULL on error
1140  */
1141 struct GNUNET_STATISTICS_GetHandle *
1142 GNUNET_STATISTICS_get (struct GNUNET_STATISTICS_Handle *handle,
1143                        const char *subsystem, const char *name,
1144                        struct GNUNET_TIME_Relative timeout,
1145                        GNUNET_STATISTICS_Callback cont,
1146                        GNUNET_STATISTICS_Iterator proc, void *cls)
1147 {
1148   size_t slen1;
1149   size_t slen2;
1150   struct GNUNET_STATISTICS_GetHandle *ai;
1151
1152   if (NULL == handle)
1153     return NULL;
1154   GNUNET_assert (NULL != proc);
1155   GNUNET_assert (GNUNET_NO == handle->do_destroy);
1156   if (NULL == subsystem)
1157     subsystem = "";
1158   if (NULL == name)
1159     name = "";
1160   slen1 = strlen (subsystem) + 1;
1161   slen2 = strlen (name) + 1;
1162   GNUNET_assert (slen1 + slen2 + sizeof (struct GNUNET_MessageHeader) <
1163                  GNUNET_SERVER_MAX_MESSAGE_SIZE);
1164   ai = GNUNET_malloc (sizeof (struct GNUNET_STATISTICS_GetHandle));
1165   ai->sh = handle;
1166   ai->subsystem = GNUNET_strdup (subsystem);
1167   ai->name = GNUNET_strdup (name);
1168   ai->cont = cont;
1169   ai->proc = proc;
1170   ai->cls = cls;
1171   ai->timeout = GNUNET_TIME_relative_to_absolute (timeout);
1172   ai->type = ACTION_GET;
1173   ai->msize = slen1 + slen2 + sizeof (struct GNUNET_MessageHeader);
1174   GNUNET_CONTAINER_DLL_insert_tail (handle->action_head, handle->action_tail,
1175                                     ai);
1176   schedule_action (handle);
1177   return ai;
1178 }
1179
1180
1181 /**
1182  * Cancel a 'get' request.  Must be called before the 'cont'
1183  * function is called.
1184  *
1185  * @param gh handle of the request to cancel
1186  */
1187 void
1188 GNUNET_STATISTICS_get_cancel (struct GNUNET_STATISTICS_GetHandle *gh)
1189 {
1190   if (NULL == gh)
1191     return;
1192   gh->cont = NULL;
1193   if (gh->sh->current == gh)
1194   {
1195     gh->aborted = GNUNET_YES;
1196   }
1197   else
1198   {
1199     GNUNET_CONTAINER_DLL_remove (gh->sh->action_head, gh->sh->action_tail, gh);
1200     GNUNET_free (gh->name);
1201     GNUNET_free (gh->subsystem);
1202     GNUNET_free (gh);
1203   }
1204 }
1205
1206
1207 /**
1208  * Watch statistics from the peer (be notified whenever they change).
1209  *
1210  * @param handle identification of the statistics service
1211  * @param subsystem limit to the specified subsystem, never NULL
1212  * @param name name of the statistic value, never NULL
1213  * @param proc function to call on each value
1214  * @param proc_cls closure for proc
1215  * @return GNUNET_OK on success, GNUNET_SYSERR on error
1216  */
1217 int
1218 GNUNET_STATISTICS_watch (struct GNUNET_STATISTICS_Handle *handle,
1219                          const char *subsystem, const char *name,
1220                          GNUNET_STATISTICS_Iterator proc, void *proc_cls)
1221 {
1222   struct GNUNET_STATISTICS_WatchEntry *w;
1223
1224   if (NULL == handle)
1225     return GNUNET_SYSERR;
1226   w = GNUNET_malloc (sizeof (struct GNUNET_STATISTICS_WatchEntry));
1227   w->subsystem = GNUNET_strdup (subsystem);
1228   w->name = GNUNET_strdup (name);
1229   w->proc = proc;
1230   w->proc_cls = proc_cls;
1231   GNUNET_array_append (handle->watches, handle->watches_size, w);
1232   schedule_watch_request (handle, w);
1233   return GNUNET_OK;
1234 }
1235
1236
1237 /**
1238  * Stop watching statistics from the peer.  
1239  *
1240  * @param handle identification of the statistics service
1241  * @param subsystem limit to the specified subsystem, never NULL
1242  * @param name name of the statistic value, never NULL
1243  * @param proc function to call on each value
1244  * @param proc_cls closure for proc
1245  * @return GNUNET_OK on success, GNUNET_SYSERR on error (no such watch)
1246  */
1247 int
1248 GNUNET_STATISTICS_watch_cancel (struct GNUNET_STATISTICS_Handle *handle,
1249                                 const char *subsystem, const char *name,
1250                                 GNUNET_STATISTICS_Iterator proc, void *proc_cls)
1251 {
1252   struct GNUNET_STATISTICS_WatchEntry *w;
1253   unsigned int i;
1254
1255   if (NULL == handle)
1256     return GNUNET_SYSERR;
1257   for (i=0;i<handle->watches_size;i++)
1258   {
1259     w = handle->watches[i];
1260     if (NULL == w)
1261       continue;
1262     if ( (w->proc == proc) &&
1263          (w->proc_cls == proc_cls) &&
1264          (0 == strcmp (w->name, name)) &&
1265          (0 == strcmp (w->subsystem, subsystem)) )
1266     {
1267       GNUNET_free (w->name);
1268       GNUNET_free (w->subsystem);
1269       GNUNET_free (w);
1270       handle->watches[i] = NULL;      
1271       return GNUNET_OK;
1272     }    
1273   }
1274   return GNUNET_SYSERR;
1275 }
1276
1277
1278
1279 /**
1280  * Queue a request to change a statistic.
1281  *
1282  * @param h statistics handle
1283  * @param name name of the value
1284  * @param make_persistent  should the value be kept across restarts?
1285  * @param value new value or change
1286  * @param type type of the action (ACTION_SET or ACTION_UPDATE)
1287  */
1288 static void
1289 add_setter_action (struct GNUNET_STATISTICS_Handle *h, const char *name,
1290                    int make_persistent, uint64_t value, enum ActionType type)
1291 {
1292   struct GNUNET_STATISTICS_GetHandle *ai;
1293   size_t slen;
1294   size_t nlen;
1295   size_t nsize;
1296   int64_t delta;
1297
1298   GNUNET_assert (NULL != h);
1299   GNUNET_assert (NULL != name);
1300   slen = strlen (h->subsystem) + 1;
1301   nlen = strlen (name) + 1;
1302   nsize = sizeof (struct GNUNET_STATISTICS_SetMessage) + slen + nlen;
1303   if (nsize >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
1304   {
1305     GNUNET_break (0);
1306     return;
1307   }
1308   for (ai = h->action_head; NULL != ai; ai = ai->next)
1309   {
1310     if (! ( (0 == strcmp (ai->subsystem, h->subsystem)) &&
1311             (0 == strcmp (ai->name, name)) && 
1312             ( (ACTION_UPDATE == ai->type) ||
1313               (ACTION_SET == ai->type) ) ) )
1314       continue;
1315     if (ACTION_SET == ai->type)
1316     {
1317       if (ACTION_UPDATE == type)
1318       {
1319         delta = (int64_t) value;
1320         if (delta > 0)
1321         {
1322           /* update old set by new delta */
1323           ai->value += delta;
1324         }
1325         else
1326         {
1327           /* update old set by new delta, but never go negative */
1328           if (ai->value < -delta)
1329             ai->value = 0;
1330           else
1331             ai->value += delta;
1332         }
1333       }
1334       else
1335       {
1336         /* new set overrides old set */
1337         ai->value = value;
1338       }
1339     }
1340     else
1341     {
1342       if (ACTION_UPDATE == type)
1343       {
1344         /* make delta cummulative */
1345         delta = (int64_t) value;
1346         ai->value += delta;
1347       }
1348       else
1349       {
1350         /* drop old 'update', use new 'set' instead */
1351         ai->value = value;
1352         ai->type = type;
1353       }
1354     }
1355     ai->timeout = GNUNET_TIME_relative_to_absolute (SET_TRANSMIT_TIMEOUT);
1356     ai->make_persistent = make_persistent;
1357     return;  
1358   }
1359   /* no existing entry matches, create a fresh one */
1360   ai = GNUNET_malloc (sizeof (struct GNUNET_STATISTICS_GetHandle));
1361   ai->sh = h;
1362   ai->subsystem = GNUNET_strdup (h->subsystem);
1363   ai->name = GNUNET_strdup (name);
1364   ai->timeout = GNUNET_TIME_relative_to_absolute (SET_TRANSMIT_TIMEOUT);
1365   ai->make_persistent = make_persistent;
1366   ai->msize = nsize;
1367   ai->value = value;
1368   ai->type = type;
1369   GNUNET_CONTAINER_DLL_insert_tail (h->action_head, h->action_tail,
1370                                     ai);
1371   schedule_action (h);
1372 }
1373
1374
1375 /**
1376  * Set statistic value for the peer.  Will always use our
1377  * subsystem (the argument used when "handle" was created).
1378  *
1379  * @param handle identification of the statistics service
1380  * @param name name of the statistic value
1381  * @param value new value to set
1382  * @param make_persistent should the value be kept across restarts?
1383  */
1384 void
1385 GNUNET_STATISTICS_set (struct GNUNET_STATISTICS_Handle *handle,
1386                        const char *name, uint64_t value, int make_persistent)
1387 {
1388   if (NULL == handle)
1389     return;
1390   GNUNET_assert (GNUNET_NO == handle->do_destroy);
1391   add_setter_action (handle, name, make_persistent, value, ACTION_SET);
1392 }
1393
1394
1395 /**
1396  * Set statistic value for the peer.  Will always use our
1397  * subsystem (the argument used when "handle" was created).
1398  *
1399  * @param handle identification of the statistics service
1400  * @param name name of the statistic value
1401  * @param delta change in value (added to existing value)
1402  * @param make_persistent should the value be kept across restarts?
1403  */
1404 void
1405 GNUNET_STATISTICS_update (struct GNUNET_STATISTICS_Handle *handle,
1406                           const char *name, int64_t delta, int make_persistent)
1407 {
1408   if (NULL == handle)
1409     return;
1410   if (0 == delta)
1411     return;
1412   GNUNET_assert (GNUNET_NO == handle->do_destroy);
1413   add_setter_action (handle, name, make_persistent, (uint64_t) delta,
1414                      ACTION_UPDATE);
1415 }
1416
1417
1418 /* end of statistics_api.c */