-remove debug message
[oweals/gnunet.git] / statistics / statistics_api.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009, 2010, 2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 2, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file statistics/statistics_api.c
23  * @brief API of the statistics service
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include "gnunet_client_lib.h"
28 #include "gnunet_constants.h"
29 #include "gnunet_container_lib.h"
30 #include "gnunet_protocols.h"
31 #include "gnunet_server_lib.h"
32 #include "gnunet_statistics_service.h"
33 #include "gnunet_strings_lib.h"
34 #include "statistics.h"
35
36 /**
37  * How long do we wait until a statistics request for setting
38  * a value times out?  (The update will be lost if the
39  * service does not react within this timeframe).
40  */
41 #define SET_TRANSMIT_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 2)
42
43 #define LOG(kind,...) GNUNET_log_from (kind, "statistics-api",__VA_ARGS__)
44
45 /**
46  * Types of actions.
47  */
48 enum ActionType
49 {
50   /**
51    * Get a value.
52    */
53   ACTION_GET,
54
55   /**
56    * Set a value.
57    */
58   ACTION_SET,
59
60   /**
61    * Update a value.
62    */
63   ACTION_UPDATE,
64
65   /**
66    * Watch a value.
67    */
68   ACTION_WATCH
69 };
70
71
72 /**
73  * Entry kept for each value we are watching.
74  */
75 struct GNUNET_STATISTICS_WatchEntry
76 {
77
78   /**
79    * What subsystem is this action about? (never NULL)
80    */
81   char *subsystem;
82
83   /**
84    * What value is this action about? (never NULL)
85    */
86   char *name;
87
88   /**
89    * Function to call
90    */
91   GNUNET_STATISTICS_Iterator proc;
92
93   /**
94    * Closure for proc
95    */
96   void *proc_cls;
97
98 };
99
100
101 /**
102  * Linked list of things we still need to do.
103  */
104 struct GNUNET_STATISTICS_GetHandle
105 {
106
107   /**
108    * This is a doubly linked list.
109    */
110   struct GNUNET_STATISTICS_GetHandle *next;
111
112   /**
113    * This is a doubly linked list.
114    */
115   struct GNUNET_STATISTICS_GetHandle *prev;
116
117   /**
118    * Main statistics handle.
119    */
120   struct GNUNET_STATISTICS_Handle *sh;
121
122   /**
123    * What subsystem is this action about? (can be NULL)
124    */
125   char *subsystem;
126
127   /**
128    * What value is this action about? (can be NULL)
129    */
130   char *name;
131
132   /**
133    * Continuation to call once action is complete.
134    */
135   GNUNET_STATISTICS_Callback cont;
136
137   /**
138    * Function to call (for GET actions only).
139    */
140   GNUNET_STATISTICS_Iterator proc;
141
142   /**
143    * Closure for proc and cont.
144    */
145   void *cls;
146
147   /**
148    * Timeout for this action.
149    */
150   struct GNUNET_TIME_Absolute timeout;
151
152   /**
153    * Associated value.
154    */
155   uint64_t value;
156
157   /**
158    * Flag for SET/UPDATE actions.
159    */
160   int make_persistent;
161
162   /**
163    * Has the current iteration been aborted; for GET actions.
164    */
165   int aborted;
166
167   /**
168    * Is this a GET, SET, UPDATE or WATCH?
169    */
170   enum ActionType type;
171
172   /**
173    * Size of the message that we will be transmitting.
174    */
175   uint16_t msize;
176
177 };
178
179
180 /**
181  * Handle for the service.
182  */
183 struct GNUNET_STATISTICS_Handle
184 {
185   /**
186    * Name of our subsystem.
187    */
188   char *subsystem;
189
190   /**
191    * Configuration to use.
192    */
193   const struct GNUNET_CONFIGURATION_Handle *cfg;
194
195   /**
196    * Socket (if available).
197    */
198   struct GNUNET_CLIENT_Connection *client;
199
200   /**
201    * Currently pending transmission request.
202    */
203   struct GNUNET_CLIENT_TransmitHandle *th;
204
205   /**
206    * Head of the linked list of pending actions (first action
207    * to be performed).
208    */
209   struct GNUNET_STATISTICS_GetHandle *action_head;
210
211   /**
212    * Tail of the linked list of actions (for fast append).
213    */
214   struct GNUNET_STATISTICS_GetHandle *action_tail;
215
216   /**
217    * Action we are currently busy with (action request has been
218    * transmitted, we're now receiving the response from the
219    * service).
220    */
221   struct GNUNET_STATISTICS_GetHandle *current;
222
223   /**
224    * Array of watch entries.
225    */
226   struct GNUNET_STATISTICS_WatchEntry **watches;
227
228   /**
229    * Task doing exponential back-off trying to reconnect.
230    */
231   GNUNET_SCHEDULER_TaskIdentifier backoff_task;
232
233   /**
234    * Time for next connect retry.
235    */
236   struct GNUNET_TIME_Relative backoff;
237
238   /**
239    * Maximum heap size observed so far (if available).
240    */
241   uint64_t peak_heap_size;
242
243   /**
244    * Maximum resident set side observed so far (if available).
245    */
246   uint64_t peak_rss;
247
248   /**
249    * Size of the 'watches' array.
250    */
251   unsigned int watches_size;
252
253   /**
254    * Should this handle auto-destruct once all actions have
255    * been processed?
256    */
257   int do_destroy;
258
259   /**
260    * Are we currently receiving from the service?
261    */
262   int receiving;
263
264 };
265
266
267 /**
268  * Obtain statistics about this process's memory consumption and
269  * report those as well (if they changed).
270  */
271 static void
272 update_memory_statistics (struct GNUNET_STATISTICS_Handle *h)
273 {
274 #if ENABLE_HEAP_STATISTICS
275   uint64_t current_heap_size = 0;
276   uint64_t current_rss = 0;
277
278   if (GNUNET_NO != h->do_destroy)
279     return;
280 #if HAVE_MALLINFO
281   {
282     struct mallinfo mi;
283     
284     mi = mallinfo();
285     current_heap_size = mi.uordblks + mi.fordblks;  
286   }
287 #endif  
288 #if HAVE_GETRUSAGE
289   {
290     struct rusage ru;
291
292     if (0 == getrusage (RUSAGE_SELF, &ru))
293     {
294       current_rss = 1024LL * ru.ru_maxrss;
295     }    
296   }
297 #endif
298   if (current_heap_size > h->peak_heap_size)
299   {
300     h->peak_heap_size = current_heap_size;
301     GNUNET_STATISTICS_set (h, "# peak heap size", current_heap_size, GNUNET_NO);
302   }
303   if (current_rss > h->peak_rss)
304   {
305     h->peak_rss = current_rss;
306     GNUNET_STATISTICS_set (h, "# peak resident set size", current_rss, GNUNET_NO);
307   }
308 #endif
309 }
310
311
312 /**
313  * Schedule the next action to be performed.
314  *
315  * @param h statistics handle to reconnect
316  */
317 static void
318 schedule_action (struct GNUNET_STATISTICS_Handle *h);
319
320
321 /**
322  * Transmit request to service that we want to watch
323  * the development of a particular value.
324  *
325  * @param h statistics handle
326  * @param watch watch entry of the value to watch
327  */
328 static void
329 schedule_watch_request (struct GNUNET_STATISTICS_Handle *h,
330                         struct GNUNET_STATISTICS_WatchEntry *watch)
331 {
332
333   struct GNUNET_STATISTICS_GetHandle *ai;
334   size_t slen;
335   size_t nlen;
336   size_t nsize;
337
338   GNUNET_assert (NULL != h);
339   GNUNET_assert (NULL != watch);
340
341   slen = strlen (watch->subsystem) + 1;
342   nlen = strlen (watch->name) + 1;
343   nsize = sizeof (struct GNUNET_MessageHeader) + slen + nlen;
344   if (nsize >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
345   {
346     GNUNET_break (0);
347     return;
348   }
349   ai = GNUNET_malloc (sizeof (struct GNUNET_STATISTICS_GetHandle));
350   ai->sh = h;
351   ai->subsystem = GNUNET_strdup (watch->subsystem);
352   ai->name = GNUNET_strdup (watch->name);
353   ai->timeout = GNUNET_TIME_UNIT_FOREVER_ABS;
354   ai->msize = nsize;
355   ai->type = ACTION_WATCH;
356   ai->proc = watch->proc;
357   ai->cls = watch->proc_cls;
358   GNUNET_CONTAINER_DLL_insert_tail (h->action_head, h->action_tail,
359                                     ai);
360   schedule_action (h);
361 }
362
363
364 /**
365  * Free memory associated with the given action item.
366  *
367  * @param gh action item to free
368  */
369 static void
370 free_action_item (struct GNUNET_STATISTICS_GetHandle *gh)
371 {
372   GNUNET_free_non_null (gh->subsystem);
373   GNUNET_free_non_null (gh->name);
374   GNUNET_free (gh);
375 }
376
377
378 /**
379  * Disconnect from the statistics service.
380  *
381  * @param h statistics handle to disconnect from
382  */
383 static void
384 do_disconnect (struct GNUNET_STATISTICS_Handle *h)
385 {
386   struct GNUNET_STATISTICS_GetHandle *c;
387   
388   if (NULL != h->th)
389   {
390     GNUNET_CLIENT_notify_transmit_ready_cancel (h->th);
391     h->th = NULL;
392   } 
393   if (NULL != h->client)
394   {
395     GNUNET_CLIENT_disconnect (h->client);
396     h->client = NULL;
397   }
398   h->receiving = GNUNET_NO;
399   if (NULL != (c = h->current))
400   {
401     h->current = NULL;
402     if (NULL != c->cont)
403       c->cont (c->cls, GNUNET_SYSERR);
404     free_action_item (c);
405   }
406 }
407
408
409 /**
410  * Try to (re)connect to the statistics service.
411  *
412  * @param h statistics handle to reconnect
413  * @return GNUNET_YES on success, GNUNET_NO on failure.
414  */
415 static int
416 try_connect (struct GNUNET_STATISTICS_Handle *h)
417 {
418   struct GNUNET_STATISTICS_GetHandle *gh;
419   struct GNUNET_STATISTICS_GetHandle *gn;
420   unsigned int i;
421
422   if (GNUNET_SCHEDULER_NO_TASK != h->backoff_task)
423     return GNUNET_NO;
424   if (NULL != h->client)
425     return GNUNET_YES;
426   h->client = GNUNET_CLIENT_connect ("statistics", h->cfg);  
427   if (NULL != h->client)
428   {
429     gn = h->action_head; 
430     while (NULL != (gh = gn))
431     {
432       gn = gh->next;
433       if (gh->type == ACTION_WATCH)
434       {
435         GNUNET_CONTAINER_DLL_remove (h->action_head,
436                                      h->action_tail,
437                                      gh);
438         free_action_item (gh);  
439       }
440     }
441     for (i = 0; i < h->watches_size; i++)
442     {
443       if (NULL != h->watches[i])
444         schedule_watch_request (h, h->watches[i]);
445     }
446     return GNUNET_YES;
447   }
448   LOG (GNUNET_ERROR_TYPE_DEBUG,
449        "Failed to connect to statistics service!\n");
450   return GNUNET_NO;
451 }
452
453
454 /**
455  * We've waited long enough, reconnect now.
456  *
457  * @param cls the 'struct GNUNET_STATISTICS_Handle' to reconnect
458  * @param tc scheduler context (unused)
459  */
460 static void
461 reconnect_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
462 {
463   struct GNUNET_STATISTICS_Handle *h = cls;
464
465   h->backoff_task = GNUNET_SCHEDULER_NO_TASK;
466   schedule_action (h);
467 }
468
469
470 /**
471  * Task used by 'reconnect_later' to shutdown the handle
472  *
473  * @param cls the statistics handle
474  * @param tc scheduler context
475  */
476 static void
477 do_destroy (void *cls,
478                const struct GNUNET_SCHEDULER_TaskContext *tc)
479 {
480   struct GNUNET_STATISTICS_Handle *h = cls;
481
482   GNUNET_STATISTICS_destroy (h, GNUNET_NO);
483 }
484
485
486 /**
487  * Reconnect at a later time, respecting back-off.
488  *
489  * @param h statistics handle
490  */
491 static void
492 reconnect_later (struct GNUNET_STATISTICS_Handle *h)
493 {
494   int loss;
495   struct GNUNET_STATISTICS_GetHandle *gh;
496
497   GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == h->backoff_task);
498   if (GNUNET_YES == h->do_destroy)
499   {
500     /* So we are shutting down and the service is not reachable.
501      * Chances are that it's down for good and we are not going to connect to
502      * it anymore.
503      * Give up and don't sync the rest of the data.
504      */
505     loss = GNUNET_NO;
506     for (gh = h->action_head; NULL != gh; gh = gh->next)
507       if ( (gh->make_persistent) && (ACTION_SET == gh->type) )
508         loss = GNUNET_YES;
509     if (GNUNET_YES == loss)
510       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
511                   _("Could not save some persistent statistics\n"));
512     h->do_destroy = GNUNET_NO;
513     GNUNET_SCHEDULER_add_continuation (&do_destroy, h,
514                                        GNUNET_SCHEDULER_REASON_PREREQ_DONE);
515     return;
516   }
517   h->backoff_task =
518     GNUNET_SCHEDULER_add_delayed (h->backoff, &reconnect_task, h);
519   h->backoff = GNUNET_TIME_STD_BACKOFF (h->backoff);
520 }
521
522
523 /**
524  * Process a 'GNUNET_MESSAGE_TYPE_STATISTICS_VALUE' message.
525  *
526  * @param h statistics handle
527  * @param msg message received from the service, never NULL
528  * @return GNUNET_OK if the message was well-formed
529  */
530 static int
531 process_statistics_value_message (struct GNUNET_STATISTICS_Handle *h,
532                                   const struct GNUNET_MessageHeader *msg)
533 {
534   char *service;
535   char *name;
536   const struct GNUNET_STATISTICS_ReplyMessage *smsg;
537   uint16_t size;
538
539   if (h->current->aborted)
540   {
541     LOG (GNUNET_ERROR_TYPE_DEBUG, "Iteration was aborted, ignoring VALUE\n");
542     return GNUNET_OK;           /* don't bother */
543   }
544   size = ntohs (msg->size);
545   if (size < sizeof (struct GNUNET_STATISTICS_ReplyMessage))
546   {
547     GNUNET_break (0);
548     return GNUNET_SYSERR;
549   }
550   smsg = (const struct GNUNET_STATISTICS_ReplyMessage *) msg;
551   size -= sizeof (struct GNUNET_STATISTICS_ReplyMessage);
552   if (size !=
553       GNUNET_STRINGS_buffer_tokenize ((const char *) &smsg[1], size, 2,
554                                       &service, &name))
555   {
556     GNUNET_break (0);
557     return GNUNET_SYSERR;
558   }
559   LOG (GNUNET_ERROR_TYPE_DEBUG, "Received valid statistic on `%s:%s': %llu\n",
560        service, name, GNUNET_ntohll (smsg->value));
561   if (GNUNET_OK !=
562       h->current->proc (h->current->cls, service, name,
563                         GNUNET_ntohll (smsg->value),
564                         0 !=
565                         (ntohl (smsg->uid) & GNUNET_STATISTICS_PERSIST_BIT)))
566   {
567     LOG (GNUNET_ERROR_TYPE_DEBUG,
568          "Processing of remaining statistics aborted by client.\n");
569     h->current->aborted = GNUNET_YES;
570   }
571   LOG (GNUNET_ERROR_TYPE_DEBUG, "VALUE processed successfully\n");
572   return GNUNET_OK;
573 }
574
575
576 /**
577  * We have received a watch value from the service.  Process it.
578  *
579  * @param h statistics handle
580  * @param msg the watch value message
581  * @return GNUNET_OK if the message was well-formed, GNUNET_SYSERR if not,
582  *         GNUNET_NO if this watch has been cancelled
583  */
584 static int
585 process_watch_value (struct GNUNET_STATISTICS_Handle *h,
586                      const struct GNUNET_MessageHeader *msg)
587 {
588   const struct GNUNET_STATISTICS_WatchValueMessage *wvm;
589   struct GNUNET_STATISTICS_WatchEntry *w;
590   uint32_t wid;
591
592   if (sizeof (struct GNUNET_STATISTICS_WatchValueMessage) != ntohs (msg->size))
593   {
594     GNUNET_break (0);
595     return GNUNET_SYSERR;
596   }
597   wvm = (const struct GNUNET_STATISTICS_WatchValueMessage *) msg;
598   GNUNET_break (0 == ntohl (wvm->reserved));
599   wid = ntohl (wvm->wid);
600   if (wid >= h->watches_size)
601   {
602     GNUNET_break (0);
603     return GNUNET_SYSERR;
604   }
605   w = h->watches[wid];
606   if (NULL == w)  
607     return GNUNET_NO;  
608   (void) w->proc (w->proc_cls, w->subsystem, w->name,
609                   GNUNET_ntohll (wvm->value),
610                   0 != (ntohl (wvm->flags) & GNUNET_STATISTICS_PERSIST_BIT));
611   return GNUNET_OK;
612 }
613
614
615 static void
616 destroy_task (void *cls,
617               const struct GNUNET_SCHEDULER_TaskContext *tc)
618 {
619   struct GNUNET_STATISTICS_Handle *h = cls;
620
621   GNUNET_STATISTICS_destroy (h, GNUNET_NO);
622 }
623
624
625 /**
626  * Function called with messages from stats service.
627  *
628  * @param cls closure
629  * @param msg message received, NULL on timeout or fatal error
630  */
631 static void
632 receive_stats (void *cls, const struct GNUNET_MessageHeader *msg)
633 {
634   struct GNUNET_STATISTICS_Handle *h = cls;
635   struct GNUNET_STATISTICS_GetHandle *c;
636   int ret;
637
638   if (NULL == msg)
639   {
640     LOG (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
641          "Error receiving statistics from service, is the service running?\n");
642     do_disconnect (h);
643     reconnect_later (h);
644     return;
645   }
646   switch (ntohs (msg->type))
647   {
648   case GNUNET_MESSAGE_TYPE_TEST:
649     if (GNUNET_SYSERR != h->do_destroy)
650     {
651       /* not in shutdown, why do we get 'TEST'? */
652       GNUNET_break (0);
653       do_disconnect (h);
654       reconnect_later (h);
655       return;
656     }
657     h->do_destroy = GNUNET_NO;
658     GNUNET_SCHEDULER_add_continuation (&destroy_task, h,
659                                        GNUNET_SCHEDULER_REASON_PREREQ_DONE);
660     break;
661   case GNUNET_MESSAGE_TYPE_STATISTICS_END:
662     LOG (GNUNET_ERROR_TYPE_DEBUG, "Received end of statistics marker\n");
663     if (NULL == (c = h->current))
664     {
665       GNUNET_break (0);
666       do_disconnect (h);
667       reconnect_later (h);
668       return;
669     }
670     h->backoff = GNUNET_TIME_UNIT_MILLISECONDS;
671     if (h->watches_size > 0)
672     {
673       GNUNET_CLIENT_receive (h->client, &receive_stats, h,
674                              GNUNET_TIME_UNIT_FOREVER_REL);
675     }
676     else
677     {
678       h->receiving = GNUNET_NO;
679     }    
680     h->current = NULL;
681     schedule_action (h);
682     if (NULL != c->cont)
683       c->cont (c->cls, GNUNET_OK);
684     free_action_item (c);
685     return;
686   case GNUNET_MESSAGE_TYPE_STATISTICS_VALUE:
687     if (GNUNET_OK != process_statistics_value_message (h, msg))
688     {
689       do_disconnect (h);
690       reconnect_later (h);
691       return;     
692     }
693     /* finally, look for more! */
694     LOG (GNUNET_ERROR_TYPE_DEBUG,
695          "Processing VALUE done, now reading more\n");
696     GNUNET_CLIENT_receive (h->client, &receive_stats, h,
697                            GNUNET_TIME_absolute_get_remaining (h->
698                                                                current->timeout));
699     h->backoff = GNUNET_TIME_UNIT_MILLISECONDS;
700     return;
701   case GNUNET_MESSAGE_TYPE_STATISTICS_WATCH_VALUE:
702     if (GNUNET_OK != 
703         (ret = process_watch_value (h, msg)))
704     {
705       do_disconnect (h);
706       if (GNUNET_NO == ret)
707         h->backoff = GNUNET_TIME_UNIT_MILLISECONDS; 
708       reconnect_later (h);
709       return;
710     }
711     h->backoff = GNUNET_TIME_UNIT_MILLISECONDS;
712     GNUNET_assert (h->watches_size > 0);
713     GNUNET_CLIENT_receive (h->client, &receive_stats, h,
714                            GNUNET_TIME_UNIT_FOREVER_REL);
715     return;    
716   default:
717     GNUNET_break (0);
718     do_disconnect (h);
719     reconnect_later (h);
720     return;
721   }
722 }
723
724
725 /**
726  * Transmit a GET request (and if successful, start to receive
727  * the response).
728  *
729  * @param handle statistics handle
730  * @param size how many bytes can we write to buf
731  * @param buf where to write requests to the service
732  * @return number of bytes written to buf
733  */
734 static size_t
735 transmit_get (struct GNUNET_STATISTICS_Handle *handle, size_t size, void *buf)
736 {
737   struct GNUNET_STATISTICS_GetHandle *c;
738   struct GNUNET_MessageHeader *hdr;
739   size_t slen1;
740   size_t slen2;
741   uint16_t msize;
742
743   GNUNET_assert (NULL != (c = handle->current));
744   if (NULL == buf)
745   {
746     /* timeout / error */
747     LOG (GNUNET_ERROR_TYPE_DEBUG,
748          "Transmission of request for statistics failed!\n");
749     do_disconnect (handle);
750     reconnect_later (handle);
751     return 0;
752   }
753   slen1 = strlen (c->subsystem) + 1;
754   slen2 = strlen (c->name) + 1;
755   msize = slen1 + slen2 + sizeof (struct GNUNET_MessageHeader);
756   GNUNET_assert (msize <= size);
757   hdr = (struct GNUNET_MessageHeader *) buf;
758   hdr->size = htons (msize);
759   hdr->type = htons (GNUNET_MESSAGE_TYPE_STATISTICS_GET);
760   GNUNET_assert (slen1 + slen2 ==
761                  GNUNET_STRINGS_buffer_fill ((char *) &hdr[1], slen1 + slen2, 2,
762                                              c->subsystem,
763                                              c->name));
764   if (GNUNET_YES != handle->receiving)
765   {
766     LOG (GNUNET_ERROR_TYPE_DEBUG,
767          "Transmission of GET done, now reading response\n");
768     handle->receiving = GNUNET_YES;
769     GNUNET_CLIENT_receive (handle->client, &receive_stats, handle,
770                            GNUNET_TIME_absolute_get_remaining (c->timeout));
771   }
772   return msize;
773 }
774
775
776 /**
777  * Transmit a WATCH request (and if successful, start to receive
778  * the response).
779  *
780  * @param handle statistics handle
781  * @param size how many bytes can we write to buf
782  * @param buf where to write requests to the service
783  * @return number of bytes written to buf
784  */
785 static size_t
786 transmit_watch (struct GNUNET_STATISTICS_Handle *handle, size_t size, void *buf)
787 {
788   struct GNUNET_MessageHeader *hdr;
789   size_t slen1;
790   size_t slen2;
791   uint16_t msize;
792
793   if (NULL == buf)
794   {
795     /* timeout / error */
796     LOG (GNUNET_ERROR_TYPE_DEBUG,
797          "Transmission of request for statistics failed!\n");
798     do_disconnect (handle);
799     reconnect_later (handle);
800     return 0;
801   }
802   LOG (GNUNET_ERROR_TYPE_DEBUG, "Transmitting watch request for `%s'\n",
803        handle->current->name);
804   slen1 = strlen (handle->current->subsystem) + 1;
805   slen2 = strlen (handle->current->name) + 1;
806   msize = slen1 + slen2 + sizeof (struct GNUNET_MessageHeader);
807   GNUNET_assert (msize <= size);
808   hdr = (struct GNUNET_MessageHeader *) buf;
809   hdr->size = htons (msize);
810   hdr->type = htons (GNUNET_MESSAGE_TYPE_STATISTICS_WATCH);
811   GNUNET_assert (slen1 + slen2 ==
812                  GNUNET_STRINGS_buffer_fill ((char *) &hdr[1], slen1 + slen2, 2,
813                                              handle->current->subsystem,
814                                              handle->current->name));
815   if (GNUNET_YES != handle->receiving)
816   {
817     handle->receiving = GNUNET_YES;
818     GNUNET_CLIENT_receive (handle->client, &receive_stats, handle,
819                            GNUNET_TIME_UNIT_FOREVER_REL);
820   }
821   GNUNET_assert (NULL == handle->current->cont);
822   free_action_item (handle->current);
823   handle->current = NULL;
824   return msize;
825 }
826
827
828 /**
829  * Transmit a SET/UPDATE request.
830  *
831  * @param handle statistics handle
832  * @param size how many bytes can we write to buf
833  * @param buf where to write requests to the service
834  * @return number of bytes written to buf
835  */
836 static size_t
837 transmit_set (struct GNUNET_STATISTICS_Handle *handle, size_t size, void *buf)
838 {
839   struct GNUNET_STATISTICS_SetMessage *r;
840   size_t slen;
841   size_t nlen;
842   size_t nsize;
843
844   if (NULL == buf)
845   {
846     do_disconnect (handle);
847     reconnect_later (handle);
848     return 0;
849   }
850   slen = strlen (handle->current->subsystem) + 1;
851   nlen = strlen (handle->current->name) + 1;
852   nsize = sizeof (struct GNUNET_STATISTICS_SetMessage) + slen + nlen;
853   if (size < nsize)
854   {
855     GNUNET_break (0);
856     do_disconnect (handle);
857     reconnect_later (handle);
858     return 0;
859   }
860   r = buf;
861   r->header.size = htons (nsize);
862   r->header.type = htons (GNUNET_MESSAGE_TYPE_STATISTICS_SET);
863   r->flags = 0;
864   r->value = GNUNET_htonll (handle->current->value);
865   if (handle->current->make_persistent)
866     r->flags |= htonl (GNUNET_STATISTICS_SETFLAG_PERSISTENT);
867   if (handle->current->type == ACTION_UPDATE)
868     r->flags |= htonl (GNUNET_STATISTICS_SETFLAG_RELATIVE);
869   GNUNET_assert (slen + nlen ==
870                  GNUNET_STRINGS_buffer_fill ((char *) &r[1], slen + nlen, 2,
871                                              handle->current->subsystem,
872                                              handle->current->name));
873   GNUNET_assert (NULL == handle->current->cont);
874   free_action_item (handle->current);
875   handle->current = NULL;
876   update_memory_statistics (handle);
877   return nsize;
878 }
879
880
881 /**
882  * Function called when we are ready to transmit a request to the service.
883  *
884  * @param cls the 'struct GNUNET_STATISTICS_Handle'
885  * @param size how many bytes can we write to buf
886  * @param buf where to write requests to the service
887  * @return number of bytes written to buf
888  */
889 static size_t
890 transmit_action (void *cls, size_t size, void *buf)
891 {
892   struct GNUNET_STATISTICS_Handle *h = cls;
893   size_t ret;
894
895   h->th = NULL;
896   ret = 0;
897   if (NULL != h->current)
898     switch (h->current->type)
899     {
900     case ACTION_GET:
901       ret = transmit_get (h, size, buf);
902       break;
903     case ACTION_SET:
904     case ACTION_UPDATE:
905       ret = transmit_set (h, size, buf);
906       break;
907     case ACTION_WATCH:
908       ret = transmit_watch (h, size, buf);
909       break;
910     default:
911       GNUNET_assert (0);
912       break;
913     }
914   schedule_action (h);
915   return ret;
916 }
917
918
919 /**
920  * Get handle for the statistics service.
921  *
922  * @param subsystem name of subsystem using the service
923  * @param cfg services configuration in use
924  * @return handle to use
925  */
926 struct GNUNET_STATISTICS_Handle *
927 GNUNET_STATISTICS_create (const char *subsystem,
928                           const struct GNUNET_CONFIGURATION_Handle *cfg)
929 {
930   struct GNUNET_STATISTICS_Handle *ret;
931
932   if (GNUNET_YES ==
933       GNUNET_CONFIGURATION_get_value_yesno (cfg, "statistics", "DISABLE"))
934     return NULL;
935   GNUNET_assert (NULL != subsystem);
936   GNUNET_assert (NULL != cfg);
937   ret = GNUNET_malloc (sizeof (struct GNUNET_STATISTICS_Handle));
938   ret->cfg = cfg;
939   ret->subsystem = GNUNET_strdup (subsystem);
940   ret->backoff = GNUNET_TIME_UNIT_MILLISECONDS;
941   return ret;
942 }
943
944
945 /**
946  * Destroy a handle (free all state associated with
947  * it).
948  *
949  * @param h statistics handle to destroy
950  * @param sync_first set to GNUNET_YES if pending SET requests should
951  *        be completed
952  */
953 void
954 GNUNET_STATISTICS_destroy (struct GNUNET_STATISTICS_Handle *h, int sync_first)
955 {
956   struct GNUNET_STATISTICS_GetHandle *pos;
957   struct GNUNET_STATISTICS_GetHandle *next;
958   struct GNUNET_TIME_Relative timeout;
959   int i;
960
961   if (NULL == h)
962     return;
963   GNUNET_assert (GNUNET_NO == h->do_destroy); // Don't call twice.
964   if (GNUNET_SCHEDULER_NO_TASK != h->backoff_task)
965   {
966     GNUNET_SCHEDULER_cancel (h->backoff_task);
967     h->backoff_task = GNUNET_SCHEDULER_NO_TASK;
968   }
969   if (sync_first)
970   {
971     if (NULL != h->current)
972     {
973       if (ACTION_GET == h->current->type)
974       {
975         GNUNET_CLIENT_notify_transmit_ready_cancel (h->th);
976         h->th = NULL;
977         free_action_item (h->current);
978         h->current = NULL;
979       }
980     }
981     next = h->action_head; 
982     while (NULL != (pos = next))
983     {
984       next = pos->next;
985       if (ACTION_GET == pos->type)
986       {
987         GNUNET_CONTAINER_DLL_remove (h->action_head,
988                                      h->action_tail,
989                                      pos);
990         free_action_item (pos);
991       }
992     }
993     if ( (NULL == h->current) &&
994          (NULL != (h->current = h->action_head)) )
995       GNUNET_CONTAINER_DLL_remove (h->action_head,
996                                    h->action_tail,
997                                    h->current);
998     h->do_destroy = GNUNET_YES;
999     if ((NULL != h->current) && (NULL == h->th) &&
1000         (NULL != h->client))
1001     {
1002       timeout = GNUNET_TIME_absolute_get_remaining (h->current->timeout);
1003       h->th =
1004         GNUNET_CLIENT_notify_transmit_ready (h->client, h->current->msize,
1005                                              timeout, GNUNET_YES,
1006                                              &transmit_action, h);
1007       GNUNET_assert (NULL != h->th);
1008     }
1009     if (NULL != h->th)
1010       return; /* do not finish destruction just yet */
1011   }
1012   while (NULL != (pos = h->action_head))
1013   {
1014     GNUNET_CONTAINER_DLL_remove (h->action_head,
1015                                  h->action_tail,
1016                                  pos);
1017     free_action_item (pos);
1018   }
1019   do_disconnect (h);
1020   for (i = 0; i < h->watches_size; i++)
1021   {
1022     if (NULL == h->watches[i])
1023       continue; 
1024     GNUNET_free (h->watches[i]->subsystem);
1025     GNUNET_free (h->watches[i]->name);
1026     GNUNET_free (h->watches[i]);
1027   }
1028   GNUNET_array_grow (h->watches, h->watches_size, 0);
1029   GNUNET_free (h->subsystem);
1030   GNUNET_free (h);
1031 }
1032
1033
1034 /**
1035  * Function called to transmit TEST message to service to
1036  * confirm that the service has received all of our 'SET'
1037  * messages (during statistics disconnect/shutdown).
1038  *
1039  * @param cls the 'struct GNUNET_STATISTICS_Handle'
1040  * @param size how many bytes can we write to buf
1041  * @param buf where to write requests to the service
1042  * @return number of bytes written to buf
1043  */
1044 static size_t
1045 transmit_test_on_shutdown (void *cls,
1046                            size_t size,
1047                            void *buf)
1048 {
1049   struct GNUNET_STATISTICS_Handle *h = cls;
1050   struct GNUNET_MessageHeader hdr;
1051
1052   h->th = NULL;
1053   if (NULL == buf)
1054   {
1055     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1056                 _("Failed to receive acknowledgement from statistics service, some statistics might have been lost!\n"));
1057     h->do_destroy = GNUNET_NO;
1058     GNUNET_SCHEDULER_add_continuation (&destroy_task, h,
1059                                        GNUNET_SCHEDULER_REASON_PREREQ_DONE);
1060     return 0;
1061   }
1062   hdr.type = htons (GNUNET_MESSAGE_TYPE_TEST);
1063   hdr.size = htons (sizeof (struct GNUNET_MessageHeader));
1064   memcpy (buf, &hdr, sizeof (hdr));
1065   if (GNUNET_YES != h->receiving)
1066   {
1067     h->receiving = GNUNET_YES;
1068     GNUNET_CLIENT_receive (h->client, &receive_stats, h,
1069                            GNUNET_TIME_UNIT_FOREVER_REL);
1070   }
1071   return sizeof (struct GNUNET_MessageHeader);
1072 }
1073
1074
1075 /**
1076  * Schedule the next action to be performed.
1077  *
1078  * @param h statistics handle
1079  */
1080 static void
1081 schedule_action (struct GNUNET_STATISTICS_Handle *h)
1082 {
1083   struct GNUNET_TIME_Relative timeout;
1084
1085   if ( (NULL != h->th) ||
1086        (GNUNET_SCHEDULER_NO_TASK != h->backoff_task) )
1087     return;                     /* action already pending */
1088   if (GNUNET_YES != try_connect (h))
1089   {
1090     reconnect_later (h);
1091     return;
1092   }
1093   if (NULL != h->current)
1094     return; /* action already pending */
1095   /* schedule next action */
1096   h->current = h->action_head;
1097   if (NULL == h->current)
1098   {
1099     if (GNUNET_YES == h->do_destroy)
1100     {
1101       h->do_destroy = GNUNET_SYSERR; /* in 'TEST' mode */
1102       h->th = GNUNET_CLIENT_notify_transmit_ready (h->client,
1103                                                    sizeof (struct GNUNET_MessageHeader),
1104                                                    SET_TRANSMIT_TIMEOUT,
1105                                                    GNUNET_NO,
1106                                                    &transmit_test_on_shutdown, h);
1107     }
1108     return;
1109   }
1110   GNUNET_CONTAINER_DLL_remove (h->action_head, h->action_tail, h->current);
1111   timeout = GNUNET_TIME_absolute_get_remaining (h->current->timeout);
1112   if (NULL ==
1113       (h->th =
1114        GNUNET_CLIENT_notify_transmit_ready (h->client, h->current->msize,
1115                                             timeout, GNUNET_YES,
1116                                             &transmit_action, h)))
1117   {
1118     LOG (GNUNET_ERROR_TYPE_DEBUG,
1119          "Failed to transmit request to statistics service.\n");
1120     do_disconnect (h);
1121     reconnect_later (h);
1122   }
1123 }
1124
1125
1126 /**
1127  * Get statistic from the peer.
1128  *
1129  * @param handle identification of the statistics service
1130  * @param subsystem limit to the specified subsystem, NULL for our subsystem
1131  * @param name name of the statistic value, NULL for all values
1132  * @param timeout after how long should we give up (and call
1133  *        cont with an error code)?
1134  * @param cont continuation to call when done (can be NULL)
1135  *        This callback CANNOT destroy the statistics handle in the same call.
1136  * @param proc function to call on each value
1137  * @param cls closure for cont and proc
1138  * @return NULL on error
1139  */
1140 struct GNUNET_STATISTICS_GetHandle *
1141 GNUNET_STATISTICS_get (struct GNUNET_STATISTICS_Handle *handle,
1142                        const char *subsystem, const char *name,
1143                        struct GNUNET_TIME_Relative timeout,
1144                        GNUNET_STATISTICS_Callback cont,
1145                        GNUNET_STATISTICS_Iterator proc, void *cls)
1146 {
1147   size_t slen1;
1148   size_t slen2;
1149   struct GNUNET_STATISTICS_GetHandle *ai;
1150
1151   if (NULL == handle)
1152     return NULL;
1153   GNUNET_assert (NULL != proc);
1154   GNUNET_assert (GNUNET_NO == handle->do_destroy);
1155   if (NULL == subsystem)
1156     subsystem = "";
1157   if (NULL == name)
1158     name = "";
1159   slen1 = strlen (subsystem) + 1;
1160   slen2 = strlen (name) + 1;
1161   GNUNET_assert (slen1 + slen2 + sizeof (struct GNUNET_MessageHeader) <
1162                  GNUNET_SERVER_MAX_MESSAGE_SIZE);
1163   ai = GNUNET_malloc (sizeof (struct GNUNET_STATISTICS_GetHandle));
1164   ai->sh = handle;
1165   ai->subsystem = GNUNET_strdup (subsystem);
1166   ai->name = GNUNET_strdup (name);
1167   ai->cont = cont;
1168   ai->proc = proc;
1169   ai->cls = cls;
1170   ai->timeout = GNUNET_TIME_relative_to_absolute (timeout);
1171   ai->type = ACTION_GET;
1172   ai->msize = slen1 + slen2 + sizeof (struct GNUNET_MessageHeader);
1173   GNUNET_CONTAINER_DLL_insert_tail (handle->action_head, handle->action_tail,
1174                                     ai);
1175   schedule_action (handle);
1176   return ai;
1177 }
1178
1179
1180 /**
1181  * Cancel a 'get' request.  Must be called before the 'cont'
1182  * function is called.
1183  *
1184  * @param gh handle of the request to cancel
1185  */
1186 void
1187 GNUNET_STATISTICS_get_cancel (struct GNUNET_STATISTICS_GetHandle *gh)
1188 {
1189   if (NULL == gh)
1190     return;
1191   if (gh->sh->current == gh)
1192   {
1193     gh->aborted = GNUNET_YES;
1194   }
1195   else
1196   {
1197     GNUNET_CONTAINER_DLL_remove (gh->sh->action_head, gh->sh->action_tail, gh);
1198     GNUNET_free (gh->name);
1199     GNUNET_free (gh->subsystem);
1200     GNUNET_free (gh);
1201   }
1202 }
1203
1204
1205 /**
1206  * Watch statistics from the peer (be notified whenever they change).
1207  *
1208  * @param handle identification of the statistics service
1209  * @param subsystem limit to the specified subsystem, never NULL
1210  * @param name name of the statistic value, never NULL
1211  * @param proc function to call on each value
1212  * @param proc_cls closure for proc
1213  * @return GNUNET_OK on success, GNUNET_SYSERR on error
1214  */
1215 int
1216 GNUNET_STATISTICS_watch (struct GNUNET_STATISTICS_Handle *handle,
1217                          const char *subsystem, const char *name,
1218                          GNUNET_STATISTICS_Iterator proc, void *proc_cls)
1219 {
1220   struct GNUNET_STATISTICS_WatchEntry *w;
1221
1222   if (NULL == handle)
1223     return GNUNET_SYSERR;
1224   w = GNUNET_malloc (sizeof (struct GNUNET_STATISTICS_WatchEntry));
1225   w->subsystem = GNUNET_strdup (subsystem);
1226   w->name = GNUNET_strdup (name);
1227   w->proc = proc;
1228   w->proc_cls = proc_cls;
1229   GNUNET_array_append (handle->watches, handle->watches_size, w);
1230   schedule_watch_request (handle, w);
1231   return GNUNET_OK;
1232 }
1233
1234
1235 /**
1236  * Stop watching statistics from the peer.  
1237  *
1238  * @param handle identification of the statistics service
1239  * @param subsystem limit to the specified subsystem, never NULL
1240  * @param name name of the statistic value, never NULL
1241  * @param proc function to call on each value
1242  * @param proc_cls closure for proc
1243  * @return GNUNET_OK on success, GNUNET_SYSERR on error (no such watch)
1244  */
1245 int
1246 GNUNET_STATISTICS_watch_cancel (struct GNUNET_STATISTICS_Handle *handle,
1247                                 const char *subsystem, const char *name,
1248                                 GNUNET_STATISTICS_Iterator proc, void *proc_cls)
1249 {
1250   struct GNUNET_STATISTICS_WatchEntry *w;
1251   unsigned int i;
1252
1253   if (NULL == handle)
1254     return GNUNET_SYSERR;
1255   for (i=0;i<handle->watches_size;i++)
1256   {
1257     w = handle->watches[i];
1258     if (NULL == w)
1259       continue;
1260     if ( (w->proc == proc) &&
1261          (w->proc_cls == proc_cls) &&
1262          (0 == strcmp (w->name, name)) &&
1263          (0 == strcmp (w->subsystem, subsystem)) )
1264     {
1265       GNUNET_free (w->name);
1266       GNUNET_free (w->subsystem);
1267       GNUNET_free (w);
1268       handle->watches[i] = NULL;      
1269       return GNUNET_OK;
1270     }    
1271   }
1272   return GNUNET_SYSERR;
1273 }
1274
1275
1276
1277 /**
1278  * Queue a request to change a statistic.
1279  *
1280  * @param h statistics handle
1281  * @param name name of the value
1282  * @param make_persistent  should the value be kept across restarts?
1283  * @param value new value or change
1284  * @param type type of the action (ACTION_SET or ACTION_UPDATE)
1285  */
1286 static void
1287 add_setter_action (struct GNUNET_STATISTICS_Handle *h, const char *name,
1288                    int make_persistent, uint64_t value, enum ActionType type)
1289 {
1290   struct GNUNET_STATISTICS_GetHandle *ai;
1291   size_t slen;
1292   size_t nlen;
1293   size_t nsize;
1294   int64_t delta;
1295
1296   GNUNET_assert (NULL != h);
1297   GNUNET_assert (NULL != name);
1298   slen = strlen (h->subsystem) + 1;
1299   nlen = strlen (name) + 1;
1300   nsize = sizeof (struct GNUNET_STATISTICS_SetMessage) + slen + nlen;
1301   if (nsize >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
1302   {
1303     GNUNET_break (0);
1304     return;
1305   }
1306   for (ai = h->action_head; NULL != ai; ai = ai->next)
1307   {
1308     if (! ( (0 == strcmp (ai->subsystem, h->subsystem)) &&
1309             (0 == strcmp (ai->name, name)) && 
1310             ( (ACTION_UPDATE == ai->type) ||
1311               (ACTION_SET == ai->type) ) ) )
1312       continue;
1313     if (ACTION_SET == ai->type)
1314     {
1315       if (ACTION_UPDATE == type)
1316       {
1317         delta = (int64_t) value;
1318         if (delta > 0)
1319         {
1320           /* update old set by new delta */
1321           ai->value += delta;
1322         }
1323         else
1324         {
1325           /* update old set by new delta, but never go negative */
1326           if (ai->value < -delta)
1327             ai->value = 0;
1328           else
1329             ai->value += delta;
1330         }
1331       }
1332       else
1333       {
1334         /* new set overrides old set */
1335         ai->value = value;
1336       }
1337     }
1338     else
1339     {
1340       if (ACTION_UPDATE == type)
1341       {
1342         /* make delta cummulative */
1343         delta = (int64_t) value;
1344         ai->value += delta;
1345       }
1346       else
1347       {
1348         /* drop old 'update', use new 'set' instead */
1349         ai->value = value;
1350         ai->type = type;
1351       }
1352     }
1353     ai->timeout = GNUNET_TIME_relative_to_absolute (SET_TRANSMIT_TIMEOUT);
1354     ai->make_persistent = make_persistent;
1355     return;  
1356   }
1357   /* no existing entry matches, create a fresh one */
1358   ai = GNUNET_malloc (sizeof (struct GNUNET_STATISTICS_GetHandle));
1359   ai->sh = h;
1360   ai->subsystem = GNUNET_strdup (h->subsystem);
1361   ai->name = GNUNET_strdup (name);
1362   ai->timeout = GNUNET_TIME_relative_to_absolute (SET_TRANSMIT_TIMEOUT);
1363   ai->make_persistent = make_persistent;
1364   ai->msize = nsize;
1365   ai->value = value;
1366   ai->type = type;
1367   GNUNET_CONTAINER_DLL_insert_tail (h->action_head, h->action_tail,
1368                                     ai);
1369   schedule_action (h);
1370 }
1371
1372
1373 /**
1374  * Set statistic value for the peer.  Will always use our
1375  * subsystem (the argument used when "handle" was created).
1376  *
1377  * @param handle identification of the statistics service
1378  * @param name name of the statistic value
1379  * @param value new value to set
1380  * @param make_persistent should the value be kept across restarts?
1381  */
1382 void
1383 GNUNET_STATISTICS_set (struct GNUNET_STATISTICS_Handle *handle,
1384                        const char *name, uint64_t value, int make_persistent)
1385 {
1386   if (NULL == handle)
1387     return;
1388   GNUNET_assert (GNUNET_NO == handle->do_destroy);
1389   add_setter_action (handle, name, make_persistent, value, ACTION_SET);
1390 }
1391
1392
1393 /**
1394  * Set statistic value for the peer.  Will always use our
1395  * subsystem (the argument used when "handle" was created).
1396  *
1397  * @param handle identification of the statistics service
1398  * @param name name of the statistic value
1399  * @param delta change in value (added to existing value)
1400  * @param make_persistent should the value be kept across restarts?
1401  */
1402 void
1403 GNUNET_STATISTICS_update (struct GNUNET_STATISTICS_Handle *handle,
1404                           const char *name, int64_t delta, int make_persistent)
1405 {
1406   if (NULL == handle)
1407     return;
1408   if (0 == delta)
1409     return;
1410   GNUNET_assert (GNUNET_NO == handle->do_destroy);
1411   add_setter_action (handle, name, make_persistent, (uint64_t) delta,
1412                      ACTION_UPDATE);
1413 }
1414
1415
1416 /* end of statistics_api.c */