0f4149ead29d580d98ee35ff01e5132b7132bcfb
[oweals/gnunet.git] / src / statistics / statistics_api.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009, 2010, 2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 2, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file statistics/statistics_api.c
23  * @brief API of the statistics service
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include "gnunet_client_lib.h"
28 #include "gnunet_constants.h"
29 #include "gnunet_container_lib.h"
30 #include "gnunet_protocols.h"
31 #include "gnunet_server_lib.h"
32 #include "gnunet_statistics_service.h"
33 #include "gnunet_strings_lib.h"
34 #include "statistics.h"
35
36 /**
37  * How long do we wait until a statistics request for setting
38  * a value times out?  (The update will be lost if the
39  * service does not react within this timeframe).
40  */
41 #define SET_TRANSMIT_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 2)
42
43 #define LOG(kind,...) GNUNET_log_from (kind, "statistics-api",__VA_ARGS__)
44
45 /**
46  * Types of actions.
47  */
48 enum ActionType
49 {
50   /**
51    * Get a value.
52    */
53   ACTION_GET,
54
55   /**
56    * Set a value.
57    */
58   ACTION_SET,
59
60   /**
61    * Update a value.
62    */
63   ACTION_UPDATE,
64
65   /**
66    * Watch a value.
67    */
68   ACTION_WATCH
69 };
70
71
72 /**
73  * Entry kept for each value we are watching.
74  */
75 struct GNUNET_STATISTICS_WatchEntry
76 {
77
78   /**
79    * What subsystem is this action about? (never NULL)
80    */
81   char *subsystem;
82
83   /**
84    * What value is this action about? (never NULL)
85    */
86   char *name;
87
88   /**
89    * Function to call
90    */
91   GNUNET_STATISTICS_Iterator proc;
92
93   /**
94    * Closure for proc
95    */
96   void *proc_cls;
97
98 };
99
100
101 /**
102  * Linked list of things we still need to do.
103  */
104 struct GNUNET_STATISTICS_GetHandle
105 {
106
107   /**
108    * This is a doubly linked list.
109    */
110   struct GNUNET_STATISTICS_GetHandle *next;
111
112   /**
113    * This is a doubly linked list.
114    */
115   struct GNUNET_STATISTICS_GetHandle *prev;
116
117   /**
118    * Main statistics handle.
119    */
120   struct GNUNET_STATISTICS_Handle *sh;
121
122   /**
123    * What subsystem is this action about? (can be NULL)
124    */
125   char *subsystem;
126
127   /**
128    * What value is this action about? (can be NULL)
129    */
130   char *name;
131
132   /**
133    * Continuation to call once action is complete.
134    */
135   GNUNET_STATISTICS_Callback cont;
136
137   /**
138    * Function to call (for GET actions only).
139    */
140   GNUNET_STATISTICS_Iterator proc;
141
142   /**
143    * Closure for proc and cont.
144    */
145   void *cls;
146
147   /**
148    * Timeout for this action.
149    */
150   struct GNUNET_TIME_Absolute timeout;
151
152   /**
153    * Associated value.
154    */
155   uint64_t value;
156
157   /**
158    * Flag for SET/UPDATE actions.
159    */
160   int make_persistent;
161
162   /**
163    * Has the current iteration been aborted; for GET actions.
164    */
165   int aborted;
166
167   /**
168    * Is this a GET, SET, UPDATE or WATCH?
169    */
170   enum ActionType type;
171
172   /**
173    * Size of the message that we will be transmitting.
174    */
175   uint16_t msize;
176
177 };
178
179
180 /**
181  * Handle for the service.
182  */
183 struct GNUNET_STATISTICS_Handle
184 {
185   /**
186    * Name of our subsystem.
187    */
188   char *subsystem;
189
190   /**
191    * Configuration to use.
192    */
193   const struct GNUNET_CONFIGURATION_Handle *cfg;
194
195   /**
196    * Socket (if available).
197    */
198   struct GNUNET_CLIENT_Connection *client;
199
200   /**
201    * Currently pending transmission request.
202    */
203   struct GNUNET_CLIENT_TransmitHandle *th;
204
205   /**
206    * Head of the linked list of pending actions (first action
207    * to be performed).
208    */
209   struct GNUNET_STATISTICS_GetHandle *action_head;
210
211   /**
212    * Tail of the linked list of actions (for fast append).
213    */
214   struct GNUNET_STATISTICS_GetHandle *action_tail;
215
216   /**
217    * Action we are currently busy with (action request has been
218    * transmitted, we're now receiving the response from the
219    * service).
220    */
221   struct GNUNET_STATISTICS_GetHandle *current;
222
223   /**
224    * Array of watch entries.
225    */
226   struct GNUNET_STATISTICS_WatchEntry **watches;
227
228   /**
229    * Task doing exponential back-off trying to reconnect.
230    */
231   GNUNET_SCHEDULER_TaskIdentifier backoff_task;
232
233   /**
234    * Time for next connect retry.
235    */
236   struct GNUNET_TIME_Relative backoff;
237
238   /**
239    * Maximum heap size observed so far (if available).
240    */
241   uint64_t peak_heap_size;
242
243   /**
244    * Maximum resident set side observed so far (if available).
245    */
246   uint64_t peak_rss;
247
248   /**
249    * Size of the 'watches' array.
250    */
251   unsigned int watches_size;
252
253   /**
254    * Should this handle auto-destruct once all actions have
255    * been processed?
256    */
257   int do_destroy;
258
259   /**
260    * Are we currently receiving from the service?
261    */
262   int receiving;
263
264 };
265
266
267 /**
268  * Obtain statistics about this process's memory consumption and
269  * report those as well (if they changed).
270  */
271 static void
272 update_memory_statistics (struct GNUNET_STATISTICS_Handle *h)
273 {
274 #if ENABLE_HEAP_STATISTICS
275   uint64_t current_heap_size = 0;
276   uint64_t current_rss = 0;
277
278   if (GNUNET_NO != h->do_destroy)
279     return;
280 #if HAVE_MALLINFO
281   {
282     struct mallinfo mi;
283     
284     mi = mallinfo();
285     current_heap_size = mi.uordblks + mi.fordblks;  
286   }
287 #endif  
288 #if HAVE_GETRUSAGE
289   {
290     struct rusage ru;
291
292     if (0 == getrusage (RUSAGE_SELF, &ru))
293     {
294       current_rss = 1024LL * ru.ru_maxrss;
295     }    
296   }
297 #endif
298   if (current_heap_size > h->peak_heap_size)
299   {
300     h->peak_heap_size = current_heap_size;
301     GNUNET_STATISTICS_set (h, "# peak heap size", current_heap_size, GNUNET_NO);
302   }
303   if (current_rss > h->peak_rss)
304   {
305     h->peak_rss = current_rss;
306     GNUNET_STATISTICS_set (h, "# peak resident set size", current_rss, GNUNET_NO);
307   }
308 #endif
309 }
310
311
312 /**
313  * Schedule the next action to be performed.
314  *
315  * @param h statistics handle to reconnect
316  */
317 static void
318 schedule_action (struct GNUNET_STATISTICS_Handle *h);
319
320
321 /**
322  * Transmit request to service that we want to watch
323  * the development of a particular value.
324  *
325  * @param h statistics handle
326  * @param watch watch entry of the value to watch
327  */
328 static void
329 schedule_watch_request (struct GNUNET_STATISTICS_Handle *h,
330                         struct GNUNET_STATISTICS_WatchEntry *watch)
331 {
332
333   struct GNUNET_STATISTICS_GetHandle *ai;
334   size_t slen;
335   size_t nlen;
336   size_t nsize;
337
338   GNUNET_assert (NULL != h);
339   GNUNET_assert (NULL != watch);
340
341   slen = strlen (watch->subsystem) + 1;
342   nlen = strlen (watch->name) + 1;
343   nsize = sizeof (struct GNUNET_MessageHeader) + slen + nlen;
344   if (nsize >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
345   {
346     GNUNET_break (0);
347     return;
348   }
349   ai = GNUNET_malloc (sizeof (struct GNUNET_STATISTICS_GetHandle));
350   ai->sh = h;
351   ai->subsystem = GNUNET_strdup (watch->subsystem);
352   ai->name = GNUNET_strdup (watch->name);
353   ai->timeout = GNUNET_TIME_UNIT_FOREVER_ABS;
354   ai->msize = nsize;
355   ai->type = ACTION_WATCH;
356   ai->proc = watch->proc;
357   ai->cls = watch->proc_cls;
358   GNUNET_CONTAINER_DLL_insert_tail (h->action_head, h->action_tail,
359                                     ai);
360   schedule_action (h);
361 }
362
363
364 /**
365  * Free memory associated with the given action item.
366  *
367  * @param gh action item to free
368  */
369 static void
370 free_action_item (struct GNUNET_STATISTICS_GetHandle *gh)
371 {
372   GNUNET_free_non_null (gh->subsystem);
373   GNUNET_free_non_null (gh->name);
374   GNUNET_free (gh);
375 }
376
377
378 /**
379  * Disconnect from the statistics service.
380  *
381  * @param h statistics handle to disconnect from
382  */
383 static void
384 do_disconnect (struct GNUNET_STATISTICS_Handle *h)
385 {
386   struct GNUNET_STATISTICS_GetHandle *c;
387   
388   if (NULL != h->th)
389   {
390     GNUNET_CLIENT_notify_transmit_ready_cancel (h->th);
391     h->th = NULL;
392   } 
393   if (NULL != h->client)
394   {
395     GNUNET_CLIENT_disconnect (h->client);
396     h->client = NULL;
397   }
398   h->receiving = GNUNET_NO;
399   if (NULL != (c = h->current))
400   {
401     h->current = NULL;
402     if (NULL != c->cont)
403       c->cont (c->cls, GNUNET_SYSERR);
404     free_action_item (c);
405   }
406 }
407
408
409 /**
410  * Try to (re)connect to the statistics service.
411  *
412  * @param h statistics handle to reconnect
413  * @return GNUNET_YES on success, GNUNET_NO on failure.
414  */
415 static int
416 try_connect (struct GNUNET_STATISTICS_Handle *h)
417 {
418   struct GNUNET_STATISTICS_GetHandle *gh;
419   struct GNUNET_STATISTICS_GetHandle *gn;
420   unsigned int i;
421
422   if (GNUNET_SCHEDULER_NO_TASK != h->backoff_task)
423     return GNUNET_NO;
424   if (NULL != h->client)
425     return GNUNET_YES;
426   h->client = GNUNET_CLIENT_connect ("statistics", h->cfg);  
427   if (NULL != h->client)
428   {
429     gn = h->action_head; 
430     while (NULL != (gh = gn))
431     {
432       gn = gh->next;
433       if (gh->type == ACTION_WATCH)
434       {
435         GNUNET_CONTAINER_DLL_remove (h->action_head,
436                                      h->action_tail,
437                                      gh);
438         free_action_item (gh);  
439       }
440     }
441     for (i = 0; i < h->watches_size; i++)
442     {
443       if (NULL != h->watches[i])
444         schedule_watch_request (h, h->watches[i]);
445     }
446     return GNUNET_YES;
447   }
448   LOG (GNUNET_ERROR_TYPE_DEBUG,
449        "Failed to connect to statistics service!\n");
450   return GNUNET_NO;
451 }
452
453
454 /**
455  * We've waited long enough, reconnect now.
456  *
457  * @param cls the 'struct GNUNET_STATISTICS_Handle' to reconnect
458  * @param tc scheduler context (unused)
459  */
460 static void
461 reconnect_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
462 {
463   struct GNUNET_STATISTICS_Handle *h = cls;
464
465   h->backoff_task = GNUNET_SCHEDULER_NO_TASK;
466   schedule_action (h);
467 }
468
469
470 /**
471  * Task used by 'reconnect_later' to shutdown the handle
472  *
473  * @param cls the statistics handle
474  * @param tc scheduler context
475  */
476 static void
477 do_destroy (void *cls,
478                const struct GNUNET_SCHEDULER_TaskContext *tc)
479 {
480   struct GNUNET_STATISTICS_Handle *h = cls;
481
482   GNUNET_STATISTICS_destroy (h, GNUNET_NO);
483 }
484
485
486 /**
487  * Reconnect at a later time, respecting back-off.
488  *
489  * @param h statistics handle
490  */
491 static void
492 reconnect_later (struct GNUNET_STATISTICS_Handle *h)
493 {
494   int loss;
495   struct GNUNET_STATISTICS_GetHandle *gh;
496
497   GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == h->backoff_task);
498   if (GNUNET_YES == h->do_destroy)
499   {
500     /* So we are shutting down and the service is not reachable.
501      * Chances are that it's down for good and we are not going to connect to
502      * it anymore.
503      * Give up and don't sync the rest of the data.
504      */
505     loss = GNUNET_NO;
506     for (gh = h->action_head; NULL != gh; gh = gh->next)
507       if ( (gh->make_persistent) && (ACTION_SET == gh->type) )
508         loss = GNUNET_YES;
509     if (GNUNET_YES == loss)
510       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
511                   _("Could not save some persistent statistics\n"));
512     h->do_destroy = GNUNET_NO;
513     GNUNET_SCHEDULER_add_continuation (&do_destroy, h,
514                                        GNUNET_SCHEDULER_REASON_PREREQ_DONE);
515     return;
516   }
517   h->backoff_task =
518     GNUNET_SCHEDULER_add_delayed (h->backoff, &reconnect_task, h);
519   h->backoff = GNUNET_TIME_relative_multiply (h->backoff, 2);
520   h->backoff =
521     GNUNET_TIME_relative_min (h->backoff, GNUNET_CONSTANTS_SERVICE_TIMEOUT);
522 }
523
524
525 /**
526  * Process a 'GNUNET_MESSAGE_TYPE_STATISTICS_VALUE' message.
527  *
528  * @param h statistics handle
529  * @param msg message received from the service, never NULL
530  * @return GNUNET_OK if the message was well-formed
531  */
532 static int
533 process_statistics_value_message (struct GNUNET_STATISTICS_Handle *h,
534                                   const struct GNUNET_MessageHeader *msg)
535 {
536   char *service;
537   char *name;
538   const struct GNUNET_STATISTICS_ReplyMessage *smsg;
539   uint16_t size;
540
541   if (h->current->aborted)
542   {
543     LOG (GNUNET_ERROR_TYPE_DEBUG, "Iteration was aborted, ignoring VALUE\n");
544     return GNUNET_OK;           /* don't bother */
545   }
546   size = ntohs (msg->size);
547   if (size < sizeof (struct GNUNET_STATISTICS_ReplyMessage))
548   {
549     GNUNET_break (0);
550     return GNUNET_SYSERR;
551   }
552   smsg = (const struct GNUNET_STATISTICS_ReplyMessage *) msg;
553   size -= sizeof (struct GNUNET_STATISTICS_ReplyMessage);
554   if (size !=
555       GNUNET_STRINGS_buffer_tokenize ((const char *) &smsg[1], size, 2,
556                                       &service, &name))
557   {
558     GNUNET_break (0);
559     return GNUNET_SYSERR;
560   }
561   LOG (GNUNET_ERROR_TYPE_DEBUG, "Received valid statistic on `%s:%s': %llu\n",
562        service, name, GNUNET_ntohll (smsg->value));
563   if (GNUNET_OK !=
564       h->current->proc (h->current->cls, service, name,
565                         GNUNET_ntohll (smsg->value),
566                         0 !=
567                         (ntohl (smsg->uid) & GNUNET_STATISTICS_PERSIST_BIT)))
568   {
569     LOG (GNUNET_ERROR_TYPE_DEBUG,
570          "Processing of remaining statistics aborted by client.\n");
571     h->current->aborted = GNUNET_YES;
572   }
573   LOG (GNUNET_ERROR_TYPE_DEBUG, "VALUE processed successfully\n");
574   return GNUNET_OK;
575 }
576
577
578 /**
579  * We have received a watch value from the service.  Process it.
580  *
581  * @param h statistics handle
582  * @param msg the watch value message
583  * @return GNUNET_OK if the message was well-formed, GNUNET_SYSERR if not,
584  *         GNUNET_NO if this watch has been cancelled
585  */
586 static int
587 process_watch_value (struct GNUNET_STATISTICS_Handle *h,
588                      const struct GNUNET_MessageHeader *msg)
589 {
590   const struct GNUNET_STATISTICS_WatchValueMessage *wvm;
591   struct GNUNET_STATISTICS_WatchEntry *w;
592   uint32_t wid;
593
594   if (sizeof (struct GNUNET_STATISTICS_WatchValueMessage) != ntohs (msg->size))
595   {
596     GNUNET_break (0);
597     return GNUNET_SYSERR;
598   }
599   wvm = (const struct GNUNET_STATISTICS_WatchValueMessage *) msg;
600   GNUNET_break (0 == ntohl (wvm->reserved));
601   wid = ntohl (wvm->wid);
602   if (wid >= h->watches_size)
603   {
604     GNUNET_break (0);
605     return GNUNET_SYSERR;
606   }
607   w = h->watches[wid];
608   if (NULL == w)  
609     return GNUNET_NO;  
610   (void) w->proc (w->proc_cls, w->subsystem, w->name,
611                   GNUNET_ntohll (wvm->value),
612                   0 != (ntohl (wvm->flags) & GNUNET_STATISTICS_PERSIST_BIT));
613   return GNUNET_OK;
614 }
615
616
617 static void
618 destroy_task (void *cls,
619               const struct GNUNET_SCHEDULER_TaskContext *tc)
620 {
621   struct GNUNET_STATISTICS_Handle *h = cls;
622
623   GNUNET_STATISTICS_destroy (h, GNUNET_NO);
624 }
625
626
627 /**
628  * Function called with messages from stats service.
629  *
630  * @param cls closure
631  * @param msg message received, NULL on timeout or fatal error
632  */
633 static void
634 receive_stats (void *cls, const struct GNUNET_MessageHeader *msg)
635 {
636   struct GNUNET_STATISTICS_Handle *h = cls;
637   struct GNUNET_STATISTICS_GetHandle *c;
638   int ret;
639
640   if (NULL == msg)
641   {
642     LOG (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
643          "Error receiving statistics from service, is the service running?\n");
644     do_disconnect (h);
645     reconnect_later (h);
646     return;
647   }
648   switch (ntohs (msg->type))
649   {
650   case GNUNET_MESSAGE_TYPE_TEST:
651     if (GNUNET_SYSERR != h->do_destroy)
652     {
653       /* not in shutdown, why do we get 'TEST'? */
654       GNUNET_break (0);
655       do_disconnect (h);
656       reconnect_later (h);
657       return;
658     }
659     h->do_destroy = GNUNET_NO;
660     GNUNET_SCHEDULER_add_continuation (&destroy_task, h,
661                                        GNUNET_SCHEDULER_REASON_PREREQ_DONE);
662     break;
663   case GNUNET_MESSAGE_TYPE_STATISTICS_END:
664     LOG (GNUNET_ERROR_TYPE_DEBUG, "Received end of statistics marker\n");
665     if (NULL == (c = h->current))
666     {
667       GNUNET_break (0);
668       do_disconnect (h);
669       reconnect_later (h);
670       return;
671     }
672     h->backoff = GNUNET_TIME_UNIT_MILLISECONDS;
673     if (h->watches_size > 0)
674     {
675       GNUNET_CLIENT_receive (h->client, &receive_stats, h,
676                              GNUNET_TIME_UNIT_FOREVER_REL);
677     }
678     else
679     {
680       h->receiving = GNUNET_NO;
681     }    
682     h->current = NULL;
683     schedule_action (h);
684     if (NULL != c->cont)
685       c->cont (c->cls, GNUNET_OK);
686     free_action_item (c);
687     return;
688   case GNUNET_MESSAGE_TYPE_STATISTICS_VALUE:
689     if (GNUNET_OK != process_statistics_value_message (h, msg))
690     {
691       do_disconnect (h);
692       reconnect_later (h);
693       return;     
694     }
695     /* finally, look for more! */
696     LOG (GNUNET_ERROR_TYPE_DEBUG,
697          "Processing VALUE done, now reading more\n");
698     GNUNET_CLIENT_receive (h->client, &receive_stats, h,
699                            GNUNET_TIME_absolute_get_remaining (h->
700                                                                current->timeout));
701     h->backoff = GNUNET_TIME_UNIT_MILLISECONDS;
702     return;
703   case GNUNET_MESSAGE_TYPE_STATISTICS_WATCH_VALUE:
704     if (GNUNET_OK != 
705         (ret = process_watch_value (h, msg)))
706     {
707       do_disconnect (h);
708       if (GNUNET_NO == ret)
709         h->backoff = GNUNET_TIME_UNIT_MILLISECONDS; 
710       reconnect_later (h);
711       return;
712     }
713     h->backoff = GNUNET_TIME_UNIT_MILLISECONDS;
714     GNUNET_assert (h->watches_size > 0);
715     GNUNET_CLIENT_receive (h->client, &receive_stats, h,
716                            GNUNET_TIME_UNIT_FOREVER_REL);
717     return;    
718   default:
719     GNUNET_break (0);
720     do_disconnect (h);
721     reconnect_later (h);
722     return;
723   }
724 }
725
726
727 /**
728  * Transmit a GET request (and if successful, start to receive
729  * the response).
730  *
731  * @param handle statistics handle
732  * @param size how many bytes can we write to buf
733  * @param buf where to write requests to the service
734  * @return number of bytes written to buf
735  */
736 static size_t
737 transmit_get (struct GNUNET_STATISTICS_Handle *handle, size_t size, void *buf)
738 {
739   struct GNUNET_STATISTICS_GetHandle *c;
740   struct GNUNET_MessageHeader *hdr;
741   size_t slen1;
742   size_t slen2;
743   uint16_t msize;
744
745   GNUNET_assert (NULL != (c = handle->current));
746   if (NULL == buf)
747   {
748     /* timeout / error */
749     LOG (GNUNET_ERROR_TYPE_DEBUG,
750          "Transmission of request for statistics failed!\n");
751     do_disconnect (handle);
752     reconnect_later (handle);
753     return 0;
754   }
755   slen1 = strlen (c->subsystem) + 1;
756   slen2 = strlen (c->name) + 1;
757   msize = slen1 + slen2 + sizeof (struct GNUNET_MessageHeader);
758   GNUNET_assert (msize <= size);
759   hdr = (struct GNUNET_MessageHeader *) buf;
760   hdr->size = htons (msize);
761   hdr->type = htons (GNUNET_MESSAGE_TYPE_STATISTICS_GET);
762   GNUNET_assert (slen1 + slen2 ==
763                  GNUNET_STRINGS_buffer_fill ((char *) &hdr[1], slen1 + slen2, 2,
764                                              c->subsystem,
765                                              c->name));
766   if (GNUNET_YES != handle->receiving)
767   {
768     LOG (GNUNET_ERROR_TYPE_DEBUG,
769          "Transmission of GET done, now reading response\n");
770     handle->receiving = GNUNET_YES;
771     GNUNET_CLIENT_receive (handle->client, &receive_stats, handle,
772                            GNUNET_TIME_absolute_get_remaining (c->timeout));
773   }
774   return msize;
775 }
776
777
778 /**
779  * Transmit a WATCH request (and if successful, start to receive
780  * the response).
781  *
782  * @param handle statistics handle
783  * @param size how many bytes can we write to buf
784  * @param buf where to write requests to the service
785  * @return number of bytes written to buf
786  */
787 static size_t
788 transmit_watch (struct GNUNET_STATISTICS_Handle *handle, size_t size, void *buf)
789 {
790   struct GNUNET_MessageHeader *hdr;
791   size_t slen1;
792   size_t slen2;
793   uint16_t msize;
794
795   if (NULL == buf)
796   {
797     /* timeout / error */
798     LOG (GNUNET_ERROR_TYPE_DEBUG,
799          "Transmission of request for statistics failed!\n");
800     do_disconnect (handle);
801     reconnect_later (handle);
802     return 0;
803   }
804   LOG (GNUNET_ERROR_TYPE_DEBUG, "Transmitting watch request for `%s'\n",
805        handle->current->name);
806   slen1 = strlen (handle->current->subsystem) + 1;
807   slen2 = strlen (handle->current->name) + 1;
808   msize = slen1 + slen2 + sizeof (struct GNUNET_MessageHeader);
809   GNUNET_assert (msize <= size);
810   hdr = (struct GNUNET_MessageHeader *) buf;
811   hdr->size = htons (msize);
812   hdr->type = htons (GNUNET_MESSAGE_TYPE_STATISTICS_WATCH);
813   GNUNET_assert (slen1 + slen2 ==
814                  GNUNET_STRINGS_buffer_fill ((char *) &hdr[1], slen1 + slen2, 2,
815                                              handle->current->subsystem,
816                                              handle->current->name));
817   if (GNUNET_YES != handle->receiving)
818   {
819     handle->receiving = GNUNET_YES;
820     GNUNET_CLIENT_receive (handle->client, &receive_stats, handle,
821                            GNUNET_TIME_UNIT_FOREVER_REL);
822   }
823   GNUNET_assert (NULL == handle->current->cont);
824   free_action_item (handle->current);
825   handle->current = NULL;
826   return msize;
827 }
828
829
830 /**
831  * Transmit a SET/UPDATE request.
832  *
833  * @param handle statistics handle
834  * @param size how many bytes can we write to buf
835  * @param buf where to write requests to the service
836  * @return number of bytes written to buf
837  */
838 static size_t
839 transmit_set (struct GNUNET_STATISTICS_Handle *handle, size_t size, void *buf)
840 {
841   struct GNUNET_STATISTICS_SetMessage *r;
842   size_t slen;
843   size_t nlen;
844   size_t nsize;
845
846   if (NULL == buf)
847   {
848     do_disconnect (handle);
849     reconnect_later (handle);
850     return 0;
851   }
852   slen = strlen (handle->current->subsystem) + 1;
853   nlen = strlen (handle->current->name) + 1;
854   nsize = sizeof (struct GNUNET_STATISTICS_SetMessage) + slen + nlen;
855   if (size < nsize)
856   {
857     GNUNET_break (0);
858     do_disconnect (handle);
859     reconnect_later (handle);
860     return 0;
861   }
862   r = buf;
863   r->header.size = htons (nsize);
864   r->header.type = htons (GNUNET_MESSAGE_TYPE_STATISTICS_SET);
865   r->flags = 0;
866   r->value = GNUNET_htonll (handle->current->value);
867   if (handle->current->make_persistent)
868     r->flags |= htonl (GNUNET_STATISTICS_SETFLAG_PERSISTENT);
869   if (handle->current->type == ACTION_UPDATE)
870     r->flags |= htonl (GNUNET_STATISTICS_SETFLAG_RELATIVE);
871   GNUNET_assert (slen + nlen ==
872                  GNUNET_STRINGS_buffer_fill ((char *) &r[1], slen + nlen, 2,
873                                              handle->current->subsystem,
874                                              handle->current->name));
875   GNUNET_assert (NULL == handle->current->cont);
876   free_action_item (handle->current);
877   handle->current = NULL;
878   update_memory_statistics (handle);
879   return nsize;
880 }
881
882
883 /**
884  * Function called when we are ready to transmit a request to the service.
885  *
886  * @param cls the 'struct GNUNET_STATISTICS_Handle'
887  * @param size how many bytes can we write to buf
888  * @param buf where to write requests to the service
889  * @return number of bytes written to buf
890  */
891 static size_t
892 transmit_action (void *cls, size_t size, void *buf)
893 {
894   struct GNUNET_STATISTICS_Handle *h = cls;
895   size_t ret;
896
897   h->th = NULL;
898   ret = 0;
899   if (NULL != h->current)
900     switch (h->current->type)
901     {
902     case ACTION_GET:
903       ret = transmit_get (h, size, buf);
904       break;
905     case ACTION_SET:
906     case ACTION_UPDATE:
907       ret = transmit_set (h, size, buf);
908       break;
909     case ACTION_WATCH:
910       ret = transmit_watch (h, size, buf);
911       break;
912     default:
913       GNUNET_assert (0);
914       break;
915     }
916   schedule_action (h);
917   return ret;
918 }
919
920
921 /**
922  * Get handle for the statistics service.
923  *
924  * @param subsystem name of subsystem using the service
925  * @param cfg services configuration in use
926  * @return handle to use
927  */
928 struct GNUNET_STATISTICS_Handle *
929 GNUNET_STATISTICS_create (const char *subsystem,
930                           const struct GNUNET_CONFIGURATION_Handle *cfg)
931 {
932   struct GNUNET_STATISTICS_Handle *ret;
933
934   GNUNET_assert (NULL != subsystem);
935   GNUNET_assert (NULL != cfg);
936   ret = GNUNET_malloc (sizeof (struct GNUNET_STATISTICS_Handle));
937   ret->cfg = cfg;
938   ret->subsystem = GNUNET_strdup (subsystem);
939   ret->backoff = GNUNET_TIME_UNIT_MILLISECONDS;
940   return ret;
941 }
942
943
944 /**
945  * Destroy a handle (free all state associated with
946  * it).
947  *
948  * @param h statistics handle to destroy
949  * @param sync_first set to GNUNET_YES if pending SET requests should
950  *        be completed
951  */
952 void
953 GNUNET_STATISTICS_destroy (struct GNUNET_STATISTICS_Handle *h, int sync_first)
954 {
955   struct GNUNET_STATISTICS_GetHandle *pos;
956   struct GNUNET_STATISTICS_GetHandle *next;
957   struct GNUNET_TIME_Relative timeout;
958   int i;
959
960   if (NULL == h)
961     return;
962   GNUNET_assert (GNUNET_NO == h->do_destroy); // Don't call twice.
963   if (GNUNET_SCHEDULER_NO_TASK != h->backoff_task)
964   {
965     GNUNET_SCHEDULER_cancel (h->backoff_task);
966     h->backoff_task = GNUNET_SCHEDULER_NO_TASK;
967   }
968   if (sync_first)
969   {
970     if (NULL != h->current)
971     {
972       if (ACTION_GET == h->current->type)
973       {
974         GNUNET_CLIENT_notify_transmit_ready_cancel (h->th);
975         h->th = NULL;
976         free_action_item (h->current);
977         h->current = NULL;
978       }
979     }
980     next = h->action_head; 
981     while (NULL != (pos = next))
982     {
983       next = pos->next;
984       if (ACTION_GET == pos->type)
985       {
986         GNUNET_CONTAINER_DLL_remove (h->action_head,
987                                      h->action_tail,
988                                      pos);
989         free_action_item (pos);
990       }
991     }
992     if ( (NULL == h->current) &&
993          (NULL != (h->current = h->action_head)) )
994       GNUNET_CONTAINER_DLL_remove (h->action_head,
995                                    h->action_tail,
996                                    h->current);
997     h->do_destroy = GNUNET_YES;
998     if ((NULL != h->current) && (NULL == h->th) &&
999         (NULL != h->client))
1000     {
1001       timeout = GNUNET_TIME_absolute_get_remaining (h->current->timeout);
1002       h->th =
1003         GNUNET_CLIENT_notify_transmit_ready (h->client, h->current->msize,
1004                                              timeout, GNUNET_YES,
1005                                              &transmit_action, h);
1006       GNUNET_assert (NULL != h->th);
1007     }
1008     if (NULL != h->th)
1009       return; /* do not finish destruction just yet */
1010   }
1011   while (NULL != (pos = h->action_head))
1012   {
1013     GNUNET_CONTAINER_DLL_remove (h->action_head,
1014                                  h->action_tail,
1015                                  pos);
1016     free_action_item (pos);
1017   }
1018   do_disconnect (h);
1019   for (i = 0; i < h->watches_size; i++)
1020   {
1021     if (NULL == h->watches[i])
1022       continue; 
1023     GNUNET_free (h->watches[i]->subsystem);
1024     GNUNET_free (h->watches[i]->name);
1025     GNUNET_free (h->watches[i]);
1026   }
1027   GNUNET_array_grow (h->watches, h->watches_size, 0);
1028   GNUNET_free (h->subsystem);
1029   GNUNET_free (h);
1030 }
1031
1032
1033 /**
1034  * Function called to transmit TEST message to service to
1035  * confirm that the service has received all of our 'SET'
1036  * messages (during statistics disconnect/shutdown).
1037  *
1038  * @param cls the 'struct GNUNET_STATISTICS_Handle'
1039  * @param size how many bytes can we write to buf
1040  * @param buf where to write requests to the service
1041  * @return number of bytes written to buf
1042  */
1043 static size_t
1044 transmit_test_on_shutdown (void *cls,
1045                            size_t size,
1046                            void *buf)
1047 {
1048   struct GNUNET_STATISTICS_Handle *h = cls;
1049   struct GNUNET_MessageHeader hdr;
1050
1051   h->th = NULL;
1052   if (NULL == buf)
1053   {
1054     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1055                 _("Failed to receive acknowledgement from statistics service, some statistics might have been lost!\n"));
1056     h->do_destroy = GNUNET_NO;
1057     GNUNET_SCHEDULER_add_continuation (&destroy_task, h,
1058                                        GNUNET_SCHEDULER_REASON_PREREQ_DONE);
1059     return 0;
1060   }
1061   hdr.type = htons (GNUNET_MESSAGE_TYPE_TEST);
1062   hdr.size = htons (sizeof (struct GNUNET_MessageHeader));
1063   memcpy (buf, &hdr, sizeof (hdr));
1064   if (GNUNET_YES != h->receiving)
1065   {
1066     h->receiving = GNUNET_YES;
1067     GNUNET_CLIENT_receive (h->client, &receive_stats, h,
1068                            GNUNET_TIME_UNIT_FOREVER_REL);
1069   }
1070   return sizeof (struct GNUNET_MessageHeader);
1071 }
1072
1073
1074 /**
1075  * Schedule the next action to be performed.
1076  *
1077  * @param h statistics handle
1078  */
1079 static void
1080 schedule_action (struct GNUNET_STATISTICS_Handle *h)
1081 {
1082   struct GNUNET_TIME_Relative timeout;
1083
1084   if ( (NULL != h->th) ||
1085        (GNUNET_SCHEDULER_NO_TASK != h->backoff_task) )
1086     return;                     /* action already pending */
1087   if (GNUNET_YES != try_connect (h))
1088   {
1089     reconnect_later (h);
1090     return;
1091   }
1092   if (NULL != h->current)
1093     return; /* action already pending */
1094   /* schedule next action */
1095   h->current = h->action_head;
1096   if (NULL == h->current)
1097   {
1098     if (GNUNET_YES == h->do_destroy)
1099     {
1100       h->do_destroy = GNUNET_SYSERR; /* in 'TEST' mode */
1101       h->th = GNUNET_CLIENT_notify_transmit_ready (h->client,
1102                                                    sizeof (struct GNUNET_MessageHeader),
1103                                                    SET_TRANSMIT_TIMEOUT,
1104                                                    GNUNET_NO,
1105                                                    &transmit_test_on_shutdown, h);
1106     }
1107     return;
1108   }
1109   GNUNET_CONTAINER_DLL_remove (h->action_head, h->action_tail, h->current);
1110   timeout = GNUNET_TIME_absolute_get_remaining (h->current->timeout);
1111   if (NULL ==
1112       (h->th =
1113        GNUNET_CLIENT_notify_transmit_ready (h->client, h->current->msize,
1114                                             timeout, GNUNET_YES,
1115                                             &transmit_action, h)))
1116   {
1117     LOG (GNUNET_ERROR_TYPE_DEBUG,
1118          "Failed to transmit request to statistics service.\n");
1119     do_disconnect (h);
1120     reconnect_later (h);
1121   }
1122 }
1123
1124
1125 /**
1126  * Get statistic from the peer.
1127  *
1128  * @param handle identification of the statistics service
1129  * @param subsystem limit to the specified subsystem, NULL for our subsystem
1130  * @param name name of the statistic value, NULL for all values
1131  * @param timeout after how long should we give up (and call
1132  *        cont with an error code)?
1133  * @param cont continuation to call when done (can be NULL)
1134  *        This callback CANNOT destroy the statistics handle in the same call.
1135  * @param proc function to call on each value
1136  * @param cls closure for cont and proc
1137  * @return NULL on error
1138  */
1139 struct GNUNET_STATISTICS_GetHandle *
1140 GNUNET_STATISTICS_get (struct GNUNET_STATISTICS_Handle *handle,
1141                        const char *subsystem, const char *name,
1142                        struct GNUNET_TIME_Relative timeout,
1143                        GNUNET_STATISTICS_Callback cont,
1144                        GNUNET_STATISTICS_Iterator proc, void *cls)
1145 {
1146   size_t slen1;
1147   size_t slen2;
1148   struct GNUNET_STATISTICS_GetHandle *ai;
1149
1150   if (NULL == handle)
1151     return NULL;
1152   GNUNET_assert (NULL != proc);
1153   GNUNET_assert (GNUNET_NO == handle->do_destroy);
1154   if (NULL == subsystem)
1155     subsystem = "";
1156   if (NULL == name)
1157     name = "";
1158   slen1 = strlen (subsystem) + 1;
1159   slen2 = strlen (name) + 1;
1160   GNUNET_assert (slen1 + slen2 + sizeof (struct GNUNET_MessageHeader) <
1161                  GNUNET_SERVER_MAX_MESSAGE_SIZE);
1162   ai = GNUNET_malloc (sizeof (struct GNUNET_STATISTICS_GetHandle));
1163   ai->sh = handle;
1164   ai->subsystem = GNUNET_strdup (subsystem);
1165   ai->name = GNUNET_strdup (name);
1166   ai->cont = cont;
1167   ai->proc = proc;
1168   ai->cls = cls;
1169   ai->timeout = GNUNET_TIME_relative_to_absolute (timeout);
1170   ai->type = ACTION_GET;
1171   ai->msize = slen1 + slen2 + sizeof (struct GNUNET_MessageHeader);
1172   GNUNET_CONTAINER_DLL_insert_tail (handle->action_head, handle->action_tail,
1173                                     ai);
1174   schedule_action (handle);
1175   return ai;
1176 }
1177
1178
1179 /**
1180  * Cancel a 'get' request.  Must be called before the 'cont'
1181  * function is called.
1182  *
1183  * @param gh handle of the request to cancel
1184  */
1185 void
1186 GNUNET_STATISTICS_get_cancel (struct GNUNET_STATISTICS_GetHandle *gh)
1187 {
1188   if (NULL == gh)
1189     return;
1190   if (gh->sh->current == gh)
1191   {
1192     gh->aborted = GNUNET_YES;
1193   }
1194   else
1195   {
1196     GNUNET_CONTAINER_DLL_remove (gh->sh->action_head, gh->sh->action_tail, gh);
1197     GNUNET_free (gh->name);
1198     GNUNET_free (gh->subsystem);
1199     GNUNET_free (gh);
1200   }
1201 }
1202
1203
1204 /**
1205  * Watch statistics from the peer (be notified whenever they change).
1206  *
1207  * @param handle identification of the statistics service
1208  * @param subsystem limit to the specified subsystem, never NULL
1209  * @param name name of the statistic value, never NULL
1210  * @param proc function to call on each value
1211  * @param proc_cls closure for proc
1212  * @return GNUNET_OK on success, GNUNET_SYSERR on error
1213  */
1214 int
1215 GNUNET_STATISTICS_watch (struct GNUNET_STATISTICS_Handle *handle,
1216                          const char *subsystem, const char *name,
1217                          GNUNET_STATISTICS_Iterator proc, void *proc_cls)
1218 {
1219   struct GNUNET_STATISTICS_WatchEntry *w;
1220
1221   if (NULL == handle)
1222     return GNUNET_SYSERR;
1223   w = GNUNET_malloc (sizeof (struct GNUNET_STATISTICS_WatchEntry));
1224   w->subsystem = GNUNET_strdup (subsystem);
1225   w->name = GNUNET_strdup (name);
1226   w->proc = proc;
1227   w->proc_cls = proc_cls;
1228   GNUNET_array_append (handle->watches, handle->watches_size, w);
1229   schedule_watch_request (handle, w);
1230   return GNUNET_OK;
1231 }
1232
1233
1234 /**
1235  * Stop watching statistics from the peer.  
1236  *
1237  * @param handle identification of the statistics service
1238  * @param subsystem limit to the specified subsystem, never NULL
1239  * @param name name of the statistic value, never NULL
1240  * @param proc function to call on each value
1241  * @param proc_cls closure for proc
1242  * @return GNUNET_OK on success, GNUNET_SYSERR on error (no such watch)
1243  */
1244 int
1245 GNUNET_STATISTICS_watch_cancel (struct GNUNET_STATISTICS_Handle *handle,
1246                                 const char *subsystem, const char *name,
1247                                 GNUNET_STATISTICS_Iterator proc, void *proc_cls)
1248 {
1249   struct GNUNET_STATISTICS_WatchEntry *w;
1250   unsigned int i;
1251
1252   if (NULL == handle)
1253     return GNUNET_SYSERR;
1254   for (i=0;i<handle->watches_size;i++)
1255   {
1256     w = handle->watches[i];
1257     if (NULL == w)
1258       continue;
1259     if ( (w->proc == proc) &&
1260          (w->proc_cls == proc_cls) &&
1261          (0 == strcmp (w->name, name)) &&
1262          (0 == strcmp (w->subsystem, subsystem)) )
1263     {
1264       GNUNET_free (w->name);
1265       GNUNET_free (w->subsystem);
1266       GNUNET_free (w);
1267       handle->watches[i] = NULL;      
1268       return GNUNET_OK;
1269     }    
1270   }
1271   return GNUNET_SYSERR;
1272 }
1273
1274
1275
1276 /**
1277  * Queue a request to change a statistic.
1278  *
1279  * @param h statistics handle
1280  * @param name name of the value
1281  * @param make_persistent  should the value be kept across restarts?
1282  * @param value new value or change
1283  * @param type type of the action (ACTION_SET or ACTION_UPDATE)
1284  */
1285 static void
1286 add_setter_action (struct GNUNET_STATISTICS_Handle *h, const char *name,
1287                    int make_persistent, uint64_t value, enum ActionType type)
1288 {
1289   struct GNUNET_STATISTICS_GetHandle *ai;
1290   size_t slen;
1291   size_t nlen;
1292   size_t nsize;
1293   int64_t delta;
1294
1295   GNUNET_assert (NULL != h);
1296   GNUNET_assert (NULL != name);
1297   slen = strlen (h->subsystem) + 1;
1298   nlen = strlen (name) + 1;
1299   nsize = sizeof (struct GNUNET_STATISTICS_SetMessage) + slen + nlen;
1300   if (nsize >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
1301   {
1302     GNUNET_break (0);
1303     return;
1304   }
1305   for (ai = h->action_head; NULL != ai; ai = ai->next)
1306   {
1307     if (! ( (0 == strcmp (ai->subsystem, h->subsystem)) &&
1308             (0 == strcmp (ai->name, name)) && 
1309             ( (ACTION_UPDATE == ai->type) ||
1310               (ACTION_SET == ai->type) ) ) )
1311       continue;
1312     if (ACTION_SET == ai->type)
1313     {
1314       if (ACTION_UPDATE == type)
1315       {
1316         delta = (int64_t) value;
1317         if (delta > 0)
1318         {
1319           /* update old set by new delta */
1320           ai->value += delta;
1321         }
1322         else
1323         {
1324           /* update old set by new delta, but never go negative */
1325           if (ai->value < -delta)
1326             ai->value = 0;
1327           else
1328             ai->value += delta;
1329         }
1330       }
1331       else
1332       {
1333         /* new set overrides old set */
1334         ai->value = value;
1335       }
1336     }
1337     else
1338     {
1339       if (ACTION_UPDATE == type)
1340       {
1341         /* make delta cummulative */
1342         delta = (int64_t) value;
1343         ai->value += delta;
1344       }
1345       else
1346       {
1347         /* drop old 'update', use new 'set' instead */
1348         ai->value = value;
1349         ai->type = type;
1350       }
1351     }
1352     ai->timeout = GNUNET_TIME_relative_to_absolute (SET_TRANSMIT_TIMEOUT);
1353     ai->make_persistent = make_persistent;
1354     return;  
1355   }
1356   /* no existing entry matches, create a fresh one */
1357   ai = GNUNET_malloc (sizeof (struct GNUNET_STATISTICS_GetHandle));
1358   ai->sh = h;
1359   ai->subsystem = GNUNET_strdup (h->subsystem);
1360   ai->name = GNUNET_strdup (name);
1361   ai->timeout = GNUNET_TIME_relative_to_absolute (SET_TRANSMIT_TIMEOUT);
1362   ai->make_persistent = make_persistent;
1363   ai->msize = nsize;
1364   ai->value = value;
1365   ai->type = type;
1366   GNUNET_CONTAINER_DLL_insert_tail (h->action_head, h->action_tail,
1367                                     ai);
1368   schedule_action (h);
1369 }
1370
1371
1372 /**
1373  * Set statistic value for the peer.  Will always use our
1374  * subsystem (the argument used when "handle" was created).
1375  *
1376  * @param handle identification of the statistics service
1377  * @param name name of the statistic value
1378  * @param value new value to set
1379  * @param make_persistent should the value be kept across restarts?
1380  */
1381 void
1382 GNUNET_STATISTICS_set (struct GNUNET_STATISTICS_Handle *handle,
1383                        const char *name, uint64_t value, int make_persistent)
1384 {
1385   if (NULL == handle)
1386     return;
1387   GNUNET_assert (GNUNET_NO == handle->do_destroy);
1388   add_setter_action (handle, name, make_persistent, value, ACTION_SET);
1389 }
1390
1391
1392 /**
1393  * Set statistic value for the peer.  Will always use our
1394  * subsystem (the argument used when "handle" was created).
1395  *
1396  * @param handle identification of the statistics service
1397  * @param name name of the statistic value
1398  * @param delta change in value (added to existing value)
1399  * @param make_persistent should the value be kept across restarts?
1400  */
1401 void
1402 GNUNET_STATISTICS_update (struct GNUNET_STATISTICS_Handle *handle,
1403                           const char *name, int64_t delta, int make_persistent)
1404 {
1405   if (NULL == handle)
1406     return;
1407   if (0 == delta)
1408     return;
1409   GNUNET_assert (GNUNET_NO == handle->do_destroy);
1410   add_setter_action (handle, name, make_persistent, (uint64_t) delta,
1411                      ACTION_UPDATE);
1412 }
1413
1414
1415 /* end of statistics_api.c */