305c6f3a5c038ebcf48971a703ce1ef916b8b78a
[oweals/gnunet.git] / src / statistics / statistics_api.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009, 2010, 2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 2, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file statistics/statistics_api.c
23  * @brief API of the statistics service
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include "gnunet_client_lib.h"
28 #include "gnunet_constants.h"
29 #include "gnunet_container_lib.h"
30 #include "gnunet_protocols.h"
31 #include "gnunet_server_lib.h"
32 #include "gnunet_statistics_service.h"
33 #include "gnunet_strings_lib.h"
34 #include "statistics.h"
35
36 /**
37  * How long do we wait until a statistics request for setting
38  * a value times out?  (The update will be lost if the
39  * service does not react within this timeframe).
40  */
41 #define SET_TRANSMIT_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 2)
42
43 #define LOG(kind,...) GNUNET_log_from (kind, "statistics-api",__VA_ARGS__)
44
45 /**
46  * Types of actions.
47  */
48 enum ActionType
49 {
50   /**
51    * Get a value.
52    */
53   ACTION_GET,
54
55   /**
56    * Set a value.
57    */
58   ACTION_SET,
59
60   /**
61    * Update a value.
62    */
63   ACTION_UPDATE,
64
65   /**
66    * Watch a value.
67    */
68   ACTION_WATCH
69 };
70
71
72 /**
73  * Entry kept for each value we are watching.
74  */
75 struct GNUNET_STATISTICS_WatchEntry
76 {
77
78   /**
79    * What subsystem is this action about? (never NULL)
80    */
81   char *subsystem;
82
83   /**
84    * What value is this action about? (never NULL)
85    */
86   char *name;
87
88   /**
89    * Function to call
90    */
91   GNUNET_STATISTICS_Iterator proc;
92
93   /**
94    * Closure for proc
95    */
96   void *proc_cls;
97
98 };
99
100
101 /**
102  * Linked list of things we still need to do.
103  */
104 struct GNUNET_STATISTICS_GetHandle
105 {
106
107   /**
108    * This is a doubly linked list.
109    */
110   struct GNUNET_STATISTICS_GetHandle *next;
111
112   /**
113    * This is a doubly linked list.
114    */
115   struct GNUNET_STATISTICS_GetHandle *prev;
116
117   /**
118    * Main statistics handle.
119    */
120   struct GNUNET_STATISTICS_Handle *sh;
121
122   /**
123    * What subsystem is this action about? (can be NULL)
124    */
125   char *subsystem;
126
127   /**
128    * What value is this action about? (can be NULL)
129    */
130   char *name;
131
132   /**
133    * Continuation to call once action is complete.
134    */
135   GNUNET_STATISTICS_Callback cont;
136
137   /**
138    * Function to call (for GET actions only).
139    */
140   GNUNET_STATISTICS_Iterator proc;
141
142   /**
143    * Closure for proc and cont.
144    */
145   void *cls;
146
147   /**
148    * Timeout for this action.
149    */
150   struct GNUNET_TIME_Absolute timeout;
151
152   /**
153    * Associated value.
154    */
155   uint64_t value;
156
157   /**
158    * Flag for SET/UPDATE actions.
159    */
160   int make_persistent;
161
162   /**
163    * Has the current iteration been aborted; for GET actions.
164    */
165   int aborted;
166
167   /**
168    * Is this a GET, SET, UPDATE or WATCH?
169    */
170   enum ActionType type;
171
172   /**
173    * Size of the message that we will be transmitting.
174    */
175   uint16_t msize;
176
177 };
178
179
180 /**
181  * Handle for the service.
182  */
183 struct GNUNET_STATISTICS_Handle
184 {
185   /**
186    * Name of our subsystem.
187    */
188   char *subsystem;
189
190   /**
191    * Configuration to use.
192    */
193   const struct GNUNET_CONFIGURATION_Handle *cfg;
194
195   /**
196    * Socket (if available).
197    */
198   struct GNUNET_CLIENT_Connection *client;
199
200   /**
201    * Currently pending transmission request.
202    */
203   struct GNUNET_CLIENT_TransmitHandle *th;
204
205   /**
206    * Head of the linked list of pending actions (first action
207    * to be performed).
208    */
209   struct GNUNET_STATISTICS_GetHandle *action_head;
210
211   /**
212    * Tail of the linked list of actions (for fast append).
213    */
214   struct GNUNET_STATISTICS_GetHandle *action_tail;
215
216   /**
217    * Action we are currently busy with (action request has been
218    * transmitted, we're now receiving the response from the
219    * service).
220    */
221   struct GNUNET_STATISTICS_GetHandle *current;
222
223   /**
224    * Array of watch entries.
225    */
226   struct GNUNET_STATISTICS_WatchEntry **watches;
227
228   /**
229    * Task doing exponential back-off trying to reconnect.
230    */
231   GNUNET_SCHEDULER_TaskIdentifier backoff_task;
232
233   /**
234    * Time for next connect retry.
235    */
236   struct GNUNET_TIME_Relative backoff;
237
238   /**
239    * Maximum heap size observed so far (if available).
240    */
241   uint64_t peak_heap_size;
242
243   /**
244    * Maximum resident set side observed so far (if available).
245    */
246   uint64_t peak_rss;
247
248   /**
249    * Size of the 'watches' array.
250    */
251   unsigned int watches_size;
252
253   /**
254    * Should this handle auto-destruct once all actions have
255    * been processed?
256    */
257   int do_destroy;
258
259   /**
260    * Are we currently receiving from the service?
261    */
262   int receiving;
263
264 };
265
266
267 /**
268  * Obtain statistics about this process's memory consumption and
269  * report those as well (if they changed).
270  */
271 static void
272 update_memory_statistics (struct GNUNET_STATISTICS_Handle *h)
273 {
274 #if ENABLE_HEAP_STATISTICS
275   uint64_t current_heap_size = 0;
276   uint64_t current_rss = 0;
277
278   if (GNUNET_NO != h->do_destroy)
279     return;
280 #if HAVE_MALLINFO
281   {
282     struct mallinfo mi;
283     
284     mi = mallinfo();
285     current_heap_size = mi.uordblks + mi.fordblks;  
286   }
287 #endif  
288 #if HAVE_GETRUSAGE
289   {
290     struct rusage ru;
291
292     if (0 == getrusage (RUSAGE_SELF, &ru))
293     {
294       current_rss = 1024LL * ru.ru_maxrss;
295     }    
296   }
297 #endif
298   if (current_heap_size > h->peak_heap_size)
299   {
300     h->peak_heap_size = current_heap_size;
301     GNUNET_STATISTICS_set (h, "# peak heap size", current_heap_size, GNUNET_NO);
302   }
303   if (current_rss > h->peak_rss)
304   {
305     h->peak_rss = current_rss;
306     GNUNET_STATISTICS_set (h, "# peak resident set size", current_rss, GNUNET_NO);
307   }
308 #endif
309 }
310
311
312 /**
313  * Schedule the next action to be performed.
314  *
315  * @param h statistics handle to reconnect
316  */
317 static void
318 schedule_action (struct GNUNET_STATISTICS_Handle *h);
319
320
321 /**
322  * Transmit request to service that we want to watch
323  * the development of a particular value.
324  *
325  * @param h statistics handle
326  * @param watch watch entry of the value to watch
327  */
328 static void
329 schedule_watch_request (struct GNUNET_STATISTICS_Handle *h,
330                         struct GNUNET_STATISTICS_WatchEntry *watch)
331 {
332
333   struct GNUNET_STATISTICS_GetHandle *ai;
334   size_t slen;
335   size_t nlen;
336   size_t nsize;
337
338   GNUNET_assert (NULL != h);
339   GNUNET_assert (NULL != watch);
340
341   slen = strlen (watch->subsystem) + 1;
342   nlen = strlen (watch->name) + 1;
343   nsize = sizeof (struct GNUNET_MessageHeader) + slen + nlen;
344   if (nsize >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
345   {
346     GNUNET_break (0);
347     return;
348   }
349   ai = GNUNET_malloc (sizeof (struct GNUNET_STATISTICS_GetHandle));
350   ai->sh = h;
351   ai->subsystem = GNUNET_strdup (watch->subsystem);
352   ai->name = GNUNET_strdup (watch->name);
353   ai->timeout = GNUNET_TIME_UNIT_FOREVER_ABS;
354   ai->msize = nsize;
355   ai->type = ACTION_WATCH;
356   ai->proc = watch->proc;
357   ai->cls = watch->proc_cls;
358   GNUNET_CONTAINER_DLL_insert_tail (h->action_head, h->action_tail,
359                                     ai);
360   schedule_action (h);
361 }
362
363
364 /**
365  * Free memory associated with the given action item.
366  *
367  * @param gh action item to free
368  */
369 static void
370 free_action_item (struct GNUNET_STATISTICS_GetHandle *gh)
371 {
372   GNUNET_free_non_null (gh->subsystem);
373   GNUNET_free_non_null (gh->name);
374   GNUNET_free (gh);
375 }
376
377
378 /**
379  * Disconnect from the statistics service.
380  *
381  * @param h statistics handle to disconnect from
382  */
383 static void
384 do_disconnect (struct GNUNET_STATISTICS_Handle *h)
385 {
386   struct GNUNET_STATISTICS_GetHandle *c;
387   
388   if (NULL != h->th)
389   {
390     GNUNET_CLIENT_notify_transmit_ready_cancel (h->th);
391     h->th = NULL;
392   } 
393   if (NULL != h->client)
394   {
395     GNUNET_CLIENT_disconnect (h->client);
396     h->client = NULL;
397   }
398   h->receiving = GNUNET_NO;
399   if (NULL != (c = h->current))
400   {
401     h->current = NULL;
402     if (NULL != c->cont)
403       c->cont (c->cls, GNUNET_SYSERR);
404     free_action_item (c);
405   }
406 }
407
408
409 /**
410  * Try to (re)connect to the statistics service.
411  *
412  * @param h statistics handle to reconnect
413  * @return GNUNET_YES on success, GNUNET_NO on failure.
414  */
415 static int
416 try_connect (struct GNUNET_STATISTICS_Handle *h)
417 {
418   struct GNUNET_STATISTICS_GetHandle *gh;
419   struct GNUNET_STATISTICS_GetHandle *gn;
420   unsigned int i;
421
422   if (GNUNET_SCHEDULER_NO_TASK != h->backoff_task)
423     return GNUNET_NO;
424   if (NULL != h->client)
425     return GNUNET_YES;
426   h->client = GNUNET_CLIENT_connect ("statistics", h->cfg);  
427   if (NULL != h->client)
428   {
429     gn = h->action_head; 
430     while (NULL != (gh = gn))
431     {
432       gn = gh->next;
433       if (gh->type == ACTION_WATCH)
434       {
435         GNUNET_CONTAINER_DLL_remove (h->action_head,
436                                      h->action_tail,
437                                      gh);
438         free_action_item (gh);  
439       }
440     }
441     for (i = 0; i < h->watches_size; i++)
442     {
443       if (NULL != h->watches[i])
444         schedule_watch_request (h, h->watches[i]);
445     }
446     return GNUNET_YES;
447   }
448   LOG (GNUNET_ERROR_TYPE_DEBUG,
449        "Failed to connect to statistics service!\n");
450   return GNUNET_NO;
451 }
452
453
454 /**
455  * We've waited long enough, reconnect now.
456  *
457  * @param cls the 'struct GNUNET_STATISTICS_Handle' to reconnect
458  * @param tc scheduler context (unused)
459  */
460 static void
461 reconnect_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
462 {
463   struct GNUNET_STATISTICS_Handle *h = cls;
464
465   h->backoff_task = GNUNET_SCHEDULER_NO_TASK;
466   schedule_action (h);
467 }
468
469
470 /**
471  * Task used by 'reconnect_later' to shutdown the handle
472  *
473  * @param cls the statistics handle
474  * @param tc scheduler context
475  */
476 static void
477 do_destroy (void *cls,
478                const struct GNUNET_SCHEDULER_TaskContext *tc)
479 {
480   struct GNUNET_STATISTICS_Handle *h = cls;
481
482   GNUNET_STATISTICS_destroy (h, GNUNET_NO);
483 }
484
485
486 /**
487  * Reconnect at a later time, respecting back-off.
488  *
489  * @param h statistics handle
490  */
491 static void
492 reconnect_later (struct GNUNET_STATISTICS_Handle *h)
493 {
494   int loss;
495   struct GNUNET_STATISTICS_GetHandle *gh;
496
497   GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == h->backoff_task);
498   if (GNUNET_YES == h->do_destroy)
499   {
500     /* So we are shutting down and the service is not reachable.
501      * Chances are that it's down for good and we are not going to connect to
502      * it anymore.
503      * Give up and don't sync the rest of the data.
504      */
505     loss = GNUNET_NO;
506     for (gh = h->action_head; NULL != gh; gh = gh->next)
507       if ( (gh->make_persistent) && (ACTION_SET == gh->type) )
508         loss = GNUNET_YES;
509     if (GNUNET_YES == loss)
510       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
511                   _("Could not save some persistent statistics\n"));
512     h->do_destroy = GNUNET_NO;
513     GNUNET_SCHEDULER_add_continuation (&do_destroy, h,
514                                        GNUNET_SCHEDULER_REASON_PREREQ_DONE);
515     return;
516   }
517   h->backoff_task =
518     GNUNET_SCHEDULER_add_delayed (h->backoff, &reconnect_task, h);
519   h->backoff = GNUNET_TIME_STD_BACKOFF (h->backoff);
520 }
521
522
523 /**
524  * Process a 'GNUNET_MESSAGE_TYPE_STATISTICS_VALUE' message.
525  *
526  * @param h statistics handle
527  * @param msg message received from the service, never NULL
528  * @return GNUNET_OK if the message was well-formed
529  */
530 static int
531 process_statistics_value_message (struct GNUNET_STATISTICS_Handle *h,
532                                   const struct GNUNET_MessageHeader *msg)
533 {
534   char *service;
535   char *name;
536   const struct GNUNET_STATISTICS_ReplyMessage *smsg;
537   uint16_t size;
538
539   if (h->current->aborted)
540   {
541     LOG (GNUNET_ERROR_TYPE_DEBUG, "Iteration was aborted, ignoring VALUE\n");
542     return GNUNET_OK;           /* don't bother */
543   }
544   size = ntohs (msg->size);
545   if (size < sizeof (struct GNUNET_STATISTICS_ReplyMessage))
546   {
547     GNUNET_break (0);
548     return GNUNET_SYSERR;
549   }
550   smsg = (const struct GNUNET_STATISTICS_ReplyMessage *) msg;
551   size -= sizeof (struct GNUNET_STATISTICS_ReplyMessage);
552   if (size !=
553       GNUNET_STRINGS_buffer_tokenize ((const char *) &smsg[1], size, 2,
554                                       &service, &name))
555   {
556     GNUNET_break (0);
557     return GNUNET_SYSERR;
558   }
559   LOG (GNUNET_ERROR_TYPE_DEBUG, "Received valid statistic on `%s:%s': %llu\n",
560        service, name, GNUNET_ntohll (smsg->value));
561   if (GNUNET_OK !=
562       h->current->proc (h->current->cls, service, name,
563                         GNUNET_ntohll (smsg->value),
564                         0 !=
565                         (ntohl (smsg->uid) & GNUNET_STATISTICS_PERSIST_BIT)))
566   {
567     LOG (GNUNET_ERROR_TYPE_DEBUG,
568          "Processing of remaining statistics aborted by client.\n");
569     h->current->aborted = GNUNET_YES;
570   }
571   LOG (GNUNET_ERROR_TYPE_DEBUG, "VALUE processed successfully\n");
572   return GNUNET_OK;
573 }
574
575
576 /**
577  * We have received a watch value from the service.  Process it.
578  *
579  * @param h statistics handle
580  * @param msg the watch value message
581  * @return GNUNET_OK if the message was well-formed, GNUNET_SYSERR if not,
582  *         GNUNET_NO if this watch has been cancelled
583  */
584 static int
585 process_watch_value (struct GNUNET_STATISTICS_Handle *h,
586                      const struct GNUNET_MessageHeader *msg)
587 {
588   const struct GNUNET_STATISTICS_WatchValueMessage *wvm;
589   struct GNUNET_STATISTICS_WatchEntry *w;
590   uint32_t wid;
591
592   if (sizeof (struct GNUNET_STATISTICS_WatchValueMessage) != ntohs (msg->size))
593   {
594     GNUNET_break (0);
595     return GNUNET_SYSERR;
596   }
597   wvm = (const struct GNUNET_STATISTICS_WatchValueMessage *) msg;
598   GNUNET_break (0 == ntohl (wvm->reserved));
599   wid = ntohl (wvm->wid);
600   if (wid >= h->watches_size)
601   {
602     GNUNET_break (0);
603     return GNUNET_SYSERR;
604   }
605   w = h->watches[wid];
606   if (NULL == w)  
607     return GNUNET_NO;  
608   (void) w->proc (w->proc_cls, w->subsystem, w->name,
609                   GNUNET_ntohll (wvm->value),
610                   0 != (ntohl (wvm->flags) & GNUNET_STATISTICS_PERSIST_BIT));
611   return GNUNET_OK;
612 }
613
614
615 static void
616 destroy_task (void *cls,
617               const struct GNUNET_SCHEDULER_TaskContext *tc)
618 {
619   struct GNUNET_STATISTICS_Handle *h = cls;
620
621   GNUNET_STATISTICS_destroy (h, GNUNET_NO);
622 }
623
624
625 /**
626  * Function called with messages from stats service.
627  *
628  * @param cls closure
629  * @param msg message received, NULL on timeout or fatal error
630  */
631 static void
632 receive_stats (void *cls, const struct GNUNET_MessageHeader *msg)
633 {
634   struct GNUNET_STATISTICS_Handle *h = cls;
635   struct GNUNET_STATISTICS_GetHandle *c;
636   int ret;
637
638   if (NULL == msg)
639   {
640     LOG (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
641          "Error receiving statistics from service, is the service running?\n");
642     do_disconnect (h);
643     reconnect_later (h);
644     return;
645   }
646   switch (ntohs (msg->type))
647   {
648   case GNUNET_MESSAGE_TYPE_TEST:
649     if (GNUNET_SYSERR != h->do_destroy)
650     {
651       /* not in shutdown, why do we get 'TEST'? */
652       GNUNET_break (0);
653       do_disconnect (h);
654       reconnect_later (h);
655       return;
656     }
657     h->do_destroy = GNUNET_NO;
658     GNUNET_SCHEDULER_add_continuation (&destroy_task, h,
659                                        GNUNET_SCHEDULER_REASON_PREREQ_DONE);
660     break;
661   case GNUNET_MESSAGE_TYPE_STATISTICS_END:
662     LOG (GNUNET_ERROR_TYPE_DEBUG, "Received end of statistics marker\n");
663     if (NULL == (c = h->current))
664     {
665       GNUNET_break (0);
666       do_disconnect (h);
667       reconnect_later (h);
668       return;
669     }
670     h->backoff = GNUNET_TIME_UNIT_MILLISECONDS;
671     if (h->watches_size > 0)
672     {
673       GNUNET_CLIENT_receive (h->client, &receive_stats, h,
674                              GNUNET_TIME_UNIT_FOREVER_REL);
675     }
676     else
677     {
678       h->receiving = GNUNET_NO;
679     }    
680     h->current = NULL;
681     schedule_action (h);
682     if (NULL != c->cont)
683       c->cont (c->cls, GNUNET_OK);
684     free_action_item (c);
685     return;
686   case GNUNET_MESSAGE_TYPE_STATISTICS_VALUE:
687     if (GNUNET_OK != process_statistics_value_message (h, msg))
688     {
689       do_disconnect (h);
690       reconnect_later (h);
691       return;     
692     }
693     /* finally, look for more! */
694     LOG (GNUNET_ERROR_TYPE_DEBUG,
695          "Processing VALUE done, now reading more\n");
696     GNUNET_CLIENT_receive (h->client, &receive_stats, h,
697                            GNUNET_TIME_absolute_get_remaining (h->
698                                                                current->timeout));
699     h->backoff = GNUNET_TIME_UNIT_MILLISECONDS;
700     return;
701   case GNUNET_MESSAGE_TYPE_STATISTICS_WATCH_VALUE:
702     if (GNUNET_OK != 
703         (ret = process_watch_value (h, msg)))
704     {
705       do_disconnect (h);
706       if (GNUNET_NO == ret)
707         h->backoff = GNUNET_TIME_UNIT_MILLISECONDS; 
708       reconnect_later (h);
709       return;
710     }
711     h->backoff = GNUNET_TIME_UNIT_MILLISECONDS;
712     GNUNET_assert (h->watches_size > 0);
713     GNUNET_CLIENT_receive (h->client, &receive_stats, h,
714                            GNUNET_TIME_UNIT_FOREVER_REL);
715     return;    
716   default:
717     GNUNET_break (0);
718     do_disconnect (h);
719     reconnect_later (h);
720     return;
721   }
722 }
723
724
725 /**
726  * Transmit a GET request (and if successful, start to receive
727  * the response).
728  *
729  * @param handle statistics handle
730  * @param size how many bytes can we write to buf
731  * @param buf where to write requests to the service
732  * @return number of bytes written to buf
733  */
734 static size_t
735 transmit_get (struct GNUNET_STATISTICS_Handle *handle, size_t size, void *buf)
736 {
737   struct GNUNET_STATISTICS_GetHandle *c;
738   struct GNUNET_MessageHeader *hdr;
739   size_t slen1;
740   size_t slen2;
741   uint16_t msize;
742
743   GNUNET_assert (NULL != (c = handle->current));
744   if (NULL == buf)
745   {
746     /* timeout / error */
747     LOG (GNUNET_ERROR_TYPE_DEBUG,
748          "Transmission of request for statistics failed!\n");
749     do_disconnect (handle);
750     reconnect_later (handle);
751     return 0;
752   }
753   slen1 = strlen (c->subsystem) + 1;
754   slen2 = strlen (c->name) + 1;
755   msize = slen1 + slen2 + sizeof (struct GNUNET_MessageHeader);
756   GNUNET_assert (msize <= size);
757   hdr = (struct GNUNET_MessageHeader *) buf;
758   hdr->size = htons (msize);
759   hdr->type = htons (GNUNET_MESSAGE_TYPE_STATISTICS_GET);
760   GNUNET_assert (slen1 + slen2 ==
761                  GNUNET_STRINGS_buffer_fill ((char *) &hdr[1], slen1 + slen2, 2,
762                                              c->subsystem,
763                                              c->name));
764   if (GNUNET_YES != handle->receiving)
765   {
766     LOG (GNUNET_ERROR_TYPE_DEBUG,
767          "Transmission of GET done, now reading response\n");
768     handle->receiving = GNUNET_YES;
769     GNUNET_CLIENT_receive (handle->client, &receive_stats, handle,
770                            GNUNET_TIME_absolute_get_remaining (c->timeout));
771   }
772   return msize;
773 }
774
775
776 /**
777  * Transmit a WATCH request (and if successful, start to receive
778  * the response).
779  *
780  * @param handle statistics handle
781  * @param size how many bytes can we write to buf
782  * @param buf where to write requests to the service
783  * @return number of bytes written to buf
784  */
785 static size_t
786 transmit_watch (struct GNUNET_STATISTICS_Handle *handle, size_t size, void *buf)
787 {
788   struct GNUNET_MessageHeader *hdr;
789   size_t slen1;
790   size_t slen2;
791   uint16_t msize;
792
793   if (NULL == buf)
794   {
795     /* timeout / error */
796     LOG (GNUNET_ERROR_TYPE_DEBUG,
797          "Transmission of request for statistics failed!\n");
798     do_disconnect (handle);
799     reconnect_later (handle);
800     return 0;
801   }
802   LOG (GNUNET_ERROR_TYPE_DEBUG, "Transmitting watch request for `%s'\n",
803        handle->current->name);
804   slen1 = strlen (handle->current->subsystem) + 1;
805   slen2 = strlen (handle->current->name) + 1;
806   msize = slen1 + slen2 + sizeof (struct GNUNET_MessageHeader);
807   GNUNET_assert (msize <= size);
808   hdr = (struct GNUNET_MessageHeader *) buf;
809   hdr->size = htons (msize);
810   hdr->type = htons (GNUNET_MESSAGE_TYPE_STATISTICS_WATCH);
811   GNUNET_assert (slen1 + slen2 ==
812                  GNUNET_STRINGS_buffer_fill ((char *) &hdr[1], slen1 + slen2, 2,
813                                              handle->current->subsystem,
814                                              handle->current->name));
815   if (GNUNET_YES != handle->receiving)
816   {
817     handle->receiving = GNUNET_YES;
818     GNUNET_CLIENT_receive (handle->client, &receive_stats, handle,
819                            GNUNET_TIME_UNIT_FOREVER_REL);
820   }
821   GNUNET_assert (NULL == handle->current->cont);
822   free_action_item (handle->current);
823   handle->current = NULL;
824   return msize;
825 }
826
827
828 /**
829  * Transmit a SET/UPDATE request.
830  *
831  * @param handle statistics handle
832  * @param size how many bytes can we write to buf
833  * @param buf where to write requests to the service
834  * @return number of bytes written to buf
835  */
836 static size_t
837 transmit_set (struct GNUNET_STATISTICS_Handle *handle, size_t size, void *buf)
838 {
839   struct GNUNET_STATISTICS_SetMessage *r;
840   size_t slen;
841   size_t nlen;
842   size_t nsize;
843
844   if (NULL == buf)
845   {
846     do_disconnect (handle);
847     reconnect_later (handle);
848     return 0;
849   }
850   slen = strlen (handle->current->subsystem) + 1;
851   nlen = strlen (handle->current->name) + 1;
852   nsize = sizeof (struct GNUNET_STATISTICS_SetMessage) + slen + nlen;
853   if (size < nsize)
854   {
855     GNUNET_break (0);
856     do_disconnect (handle);
857     reconnect_later (handle);
858     return 0;
859   }
860   r = buf;
861   r->header.size = htons (nsize);
862   r->header.type = htons (GNUNET_MESSAGE_TYPE_STATISTICS_SET);
863   r->flags = 0;
864   r->value = GNUNET_htonll (handle->current->value);
865   if (handle->current->make_persistent)
866     r->flags |= htonl (GNUNET_STATISTICS_SETFLAG_PERSISTENT);
867   if (handle->current->type == ACTION_UPDATE)
868     r->flags |= htonl (GNUNET_STATISTICS_SETFLAG_RELATIVE);
869   GNUNET_assert (slen + nlen ==
870                  GNUNET_STRINGS_buffer_fill ((char *) &r[1], slen + nlen, 2,
871                                              handle->current->subsystem,
872                                              handle->current->name));
873   GNUNET_assert (NULL == handle->current->cont);
874   free_action_item (handle->current);
875   handle->current = NULL;
876   update_memory_statistics (handle);
877   return nsize;
878 }
879
880
881 /**
882  * Function called when we are ready to transmit a request to the service.
883  *
884  * @param cls the 'struct GNUNET_STATISTICS_Handle'
885  * @param size how many bytes can we write to buf
886  * @param buf where to write requests to the service
887  * @return number of bytes written to buf
888  */
889 static size_t
890 transmit_action (void *cls, size_t size, void *buf)
891 {
892   struct GNUNET_STATISTICS_Handle *h = cls;
893   size_t ret;
894
895   h->th = NULL;
896   ret = 0;
897   if (NULL != h->current)
898     switch (h->current->type)
899     {
900     case ACTION_GET:
901       ret = transmit_get (h, size, buf);
902       break;
903     case ACTION_SET:
904     case ACTION_UPDATE:
905       ret = transmit_set (h, size, buf);
906       break;
907     case ACTION_WATCH:
908       ret = transmit_watch (h, size, buf);
909       break;
910     default:
911       GNUNET_assert (0);
912       break;
913     }
914   schedule_action (h);
915   return ret;
916 }
917
918
919 /**
920  * Get handle for the statistics service.
921  *
922  * @param subsystem name of subsystem using the service
923  * @param cfg services configuration in use
924  * @return handle to use
925  */
926 struct GNUNET_STATISTICS_Handle *
927 GNUNET_STATISTICS_create (const char *subsystem,
928                           const struct GNUNET_CONFIGURATION_Handle *cfg)
929 {
930   struct GNUNET_STATISTICS_Handle *ret;
931
932   GNUNET_assert (NULL != subsystem);
933   GNUNET_assert (NULL != cfg);
934   ret = GNUNET_malloc (sizeof (struct GNUNET_STATISTICS_Handle));
935   ret->cfg = cfg;
936   ret->subsystem = GNUNET_strdup (subsystem);
937   ret->backoff = GNUNET_TIME_UNIT_MILLISECONDS;
938   return ret;
939 }
940
941
942 /**
943  * Destroy a handle (free all state associated with
944  * it).
945  *
946  * @param h statistics handle to destroy
947  * @param sync_first set to GNUNET_YES if pending SET requests should
948  *        be completed
949  */
950 void
951 GNUNET_STATISTICS_destroy (struct GNUNET_STATISTICS_Handle *h, int sync_first)
952 {
953   struct GNUNET_STATISTICS_GetHandle *pos;
954   struct GNUNET_STATISTICS_GetHandle *next;
955   struct GNUNET_TIME_Relative timeout;
956   int i;
957
958   if (NULL == h)
959     return;
960   GNUNET_assert (GNUNET_NO == h->do_destroy); // Don't call twice.
961   if (GNUNET_SCHEDULER_NO_TASK != h->backoff_task)
962   {
963     GNUNET_SCHEDULER_cancel (h->backoff_task);
964     h->backoff_task = GNUNET_SCHEDULER_NO_TASK;
965   }
966   if (sync_first)
967   {
968     if (NULL != h->current)
969     {
970       if (ACTION_GET == h->current->type)
971       {
972         GNUNET_CLIENT_notify_transmit_ready_cancel (h->th);
973         h->th = NULL;
974         free_action_item (h->current);
975         h->current = NULL;
976       }
977     }
978     next = h->action_head; 
979     while (NULL != (pos = next))
980     {
981       next = pos->next;
982       if (ACTION_GET == pos->type)
983       {
984         GNUNET_CONTAINER_DLL_remove (h->action_head,
985                                      h->action_tail,
986                                      pos);
987         free_action_item (pos);
988       }
989     }
990     if ( (NULL == h->current) &&
991          (NULL != (h->current = h->action_head)) )
992       GNUNET_CONTAINER_DLL_remove (h->action_head,
993                                    h->action_tail,
994                                    h->current);
995     h->do_destroy = GNUNET_YES;
996     if ((NULL != h->current) && (NULL == h->th) &&
997         (NULL != h->client))
998     {
999       timeout = GNUNET_TIME_absolute_get_remaining (h->current->timeout);
1000       h->th =
1001         GNUNET_CLIENT_notify_transmit_ready (h->client, h->current->msize,
1002                                              timeout, GNUNET_YES,
1003                                              &transmit_action, h);
1004       GNUNET_assert (NULL != h->th);
1005     }
1006     if (NULL != h->th)
1007       return; /* do not finish destruction just yet */
1008   }
1009   while (NULL != (pos = h->action_head))
1010   {
1011     GNUNET_CONTAINER_DLL_remove (h->action_head,
1012                                  h->action_tail,
1013                                  pos);
1014     free_action_item (pos);
1015   }
1016   do_disconnect (h);
1017   for (i = 0; i < h->watches_size; i++)
1018   {
1019     if (NULL == h->watches[i])
1020       continue; 
1021     GNUNET_free (h->watches[i]->subsystem);
1022     GNUNET_free (h->watches[i]->name);
1023     GNUNET_free (h->watches[i]);
1024   }
1025   GNUNET_array_grow (h->watches, h->watches_size, 0);
1026   GNUNET_free (h->subsystem);
1027   GNUNET_free (h);
1028 }
1029
1030
1031 /**
1032  * Function called to transmit TEST message to service to
1033  * confirm that the service has received all of our 'SET'
1034  * messages (during statistics disconnect/shutdown).
1035  *
1036  * @param cls the 'struct GNUNET_STATISTICS_Handle'
1037  * @param size how many bytes can we write to buf
1038  * @param buf where to write requests to the service
1039  * @return number of bytes written to buf
1040  */
1041 static size_t
1042 transmit_test_on_shutdown (void *cls,
1043                            size_t size,
1044                            void *buf)
1045 {
1046   struct GNUNET_STATISTICS_Handle *h = cls;
1047   struct GNUNET_MessageHeader hdr;
1048
1049   h->th = NULL;
1050   if (NULL == buf)
1051   {
1052     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1053                 _("Failed to receive acknowledgement from statistics service, some statistics might have been lost!\n"));
1054     h->do_destroy = GNUNET_NO;
1055     GNUNET_SCHEDULER_add_continuation (&destroy_task, h,
1056                                        GNUNET_SCHEDULER_REASON_PREREQ_DONE);
1057     return 0;
1058   }
1059   hdr.type = htons (GNUNET_MESSAGE_TYPE_TEST);
1060   hdr.size = htons (sizeof (struct GNUNET_MessageHeader));
1061   memcpy (buf, &hdr, sizeof (hdr));
1062   if (GNUNET_YES != h->receiving)
1063   {
1064     h->receiving = GNUNET_YES;
1065     GNUNET_CLIENT_receive (h->client, &receive_stats, h,
1066                            GNUNET_TIME_UNIT_FOREVER_REL);
1067   }
1068   return sizeof (struct GNUNET_MessageHeader);
1069 }
1070
1071
1072 /**
1073  * Schedule the next action to be performed.
1074  *
1075  * @param h statistics handle
1076  */
1077 static void
1078 schedule_action (struct GNUNET_STATISTICS_Handle *h)
1079 {
1080   struct GNUNET_TIME_Relative timeout;
1081
1082   if ( (NULL != h->th) ||
1083        (GNUNET_SCHEDULER_NO_TASK != h->backoff_task) )
1084     return;                     /* action already pending */
1085   if (GNUNET_YES != try_connect (h))
1086   {
1087     reconnect_later (h);
1088     return;
1089   }
1090   if (NULL != h->current)
1091     return; /* action already pending */
1092   /* schedule next action */
1093   h->current = h->action_head;
1094   if (NULL == h->current)
1095   {
1096     if (GNUNET_YES == h->do_destroy)
1097     {
1098       h->do_destroy = GNUNET_SYSERR; /* in 'TEST' mode */
1099       h->th = GNUNET_CLIENT_notify_transmit_ready (h->client,
1100                                                    sizeof (struct GNUNET_MessageHeader),
1101                                                    SET_TRANSMIT_TIMEOUT,
1102                                                    GNUNET_NO,
1103                                                    &transmit_test_on_shutdown, h);
1104     }
1105     return;
1106   }
1107   GNUNET_CONTAINER_DLL_remove (h->action_head, h->action_tail, h->current);
1108   timeout = GNUNET_TIME_absolute_get_remaining (h->current->timeout);
1109   if (NULL ==
1110       (h->th =
1111        GNUNET_CLIENT_notify_transmit_ready (h->client, h->current->msize,
1112                                             timeout, GNUNET_YES,
1113                                             &transmit_action, h)))
1114   {
1115     LOG (GNUNET_ERROR_TYPE_DEBUG,
1116          "Failed to transmit request to statistics service.\n");
1117     do_disconnect (h);
1118     reconnect_later (h);
1119   }
1120 }
1121
1122
1123 /**
1124  * Get statistic from the peer.
1125  *
1126  * @param handle identification of the statistics service
1127  * @param subsystem limit to the specified subsystem, NULL for our subsystem
1128  * @param name name of the statistic value, NULL for all values
1129  * @param timeout after how long should we give up (and call
1130  *        cont with an error code)?
1131  * @param cont continuation to call when done (can be NULL)
1132  *        This callback CANNOT destroy the statistics handle in the same call.
1133  * @param proc function to call on each value
1134  * @param cls closure for cont and proc
1135  * @return NULL on error
1136  */
1137 struct GNUNET_STATISTICS_GetHandle *
1138 GNUNET_STATISTICS_get (struct GNUNET_STATISTICS_Handle *handle,
1139                        const char *subsystem, const char *name,
1140                        struct GNUNET_TIME_Relative timeout,
1141                        GNUNET_STATISTICS_Callback cont,
1142                        GNUNET_STATISTICS_Iterator proc, void *cls)
1143 {
1144   size_t slen1;
1145   size_t slen2;
1146   struct GNUNET_STATISTICS_GetHandle *ai;
1147
1148   if (NULL == handle)
1149     return NULL;
1150   GNUNET_assert (NULL != proc);
1151   GNUNET_assert (GNUNET_NO == handle->do_destroy);
1152   if (NULL == subsystem)
1153     subsystem = "";
1154   if (NULL == name)
1155     name = "";
1156   slen1 = strlen (subsystem) + 1;
1157   slen2 = strlen (name) + 1;
1158   GNUNET_assert (slen1 + slen2 + sizeof (struct GNUNET_MessageHeader) <
1159                  GNUNET_SERVER_MAX_MESSAGE_SIZE);
1160   ai = GNUNET_malloc (sizeof (struct GNUNET_STATISTICS_GetHandle));
1161   ai->sh = handle;
1162   ai->subsystem = GNUNET_strdup (subsystem);
1163   ai->name = GNUNET_strdup (name);
1164   ai->cont = cont;
1165   ai->proc = proc;
1166   ai->cls = cls;
1167   ai->timeout = GNUNET_TIME_relative_to_absolute (timeout);
1168   ai->type = ACTION_GET;
1169   ai->msize = slen1 + slen2 + sizeof (struct GNUNET_MessageHeader);
1170   GNUNET_CONTAINER_DLL_insert_tail (handle->action_head, handle->action_tail,
1171                                     ai);
1172   schedule_action (handle);
1173   return ai;
1174 }
1175
1176
1177 /**
1178  * Cancel a 'get' request.  Must be called before the 'cont'
1179  * function is called.
1180  *
1181  * @param gh handle of the request to cancel
1182  */
1183 void
1184 GNUNET_STATISTICS_get_cancel (struct GNUNET_STATISTICS_GetHandle *gh)
1185 {
1186   if (NULL == gh)
1187     return;
1188   if (gh->sh->current == gh)
1189   {
1190     gh->aborted = GNUNET_YES;
1191   }
1192   else
1193   {
1194     GNUNET_CONTAINER_DLL_remove (gh->sh->action_head, gh->sh->action_tail, gh);
1195     GNUNET_free (gh->name);
1196     GNUNET_free (gh->subsystem);
1197     GNUNET_free (gh);
1198   }
1199 }
1200
1201
1202 /**
1203  * Watch statistics from the peer (be notified whenever they change).
1204  *
1205  * @param handle identification of the statistics service
1206  * @param subsystem limit to the specified subsystem, never NULL
1207  * @param name name of the statistic value, never NULL
1208  * @param proc function to call on each value
1209  * @param proc_cls closure for proc
1210  * @return GNUNET_OK on success, GNUNET_SYSERR on error
1211  */
1212 int
1213 GNUNET_STATISTICS_watch (struct GNUNET_STATISTICS_Handle *handle,
1214                          const char *subsystem, const char *name,
1215                          GNUNET_STATISTICS_Iterator proc, void *proc_cls)
1216 {
1217   struct GNUNET_STATISTICS_WatchEntry *w;
1218
1219   if (NULL == handle)
1220     return GNUNET_SYSERR;
1221   w = GNUNET_malloc (sizeof (struct GNUNET_STATISTICS_WatchEntry));
1222   w->subsystem = GNUNET_strdup (subsystem);
1223   w->name = GNUNET_strdup (name);
1224   w->proc = proc;
1225   w->proc_cls = proc_cls;
1226   GNUNET_array_append (handle->watches, handle->watches_size, w);
1227   schedule_watch_request (handle, w);
1228   return GNUNET_OK;
1229 }
1230
1231
1232 /**
1233  * Stop watching statistics from the peer.  
1234  *
1235  * @param handle identification of the statistics service
1236  * @param subsystem limit to the specified subsystem, never NULL
1237  * @param name name of the statistic value, never NULL
1238  * @param proc function to call on each value
1239  * @param proc_cls closure for proc
1240  * @return GNUNET_OK on success, GNUNET_SYSERR on error (no such watch)
1241  */
1242 int
1243 GNUNET_STATISTICS_watch_cancel (struct GNUNET_STATISTICS_Handle *handle,
1244                                 const char *subsystem, const char *name,
1245                                 GNUNET_STATISTICS_Iterator proc, void *proc_cls)
1246 {
1247   struct GNUNET_STATISTICS_WatchEntry *w;
1248   unsigned int i;
1249
1250   if (NULL == handle)
1251     return GNUNET_SYSERR;
1252   for (i=0;i<handle->watches_size;i++)
1253   {
1254     w = handle->watches[i];
1255     if (NULL == w)
1256       continue;
1257     if ( (w->proc == proc) &&
1258          (w->proc_cls == proc_cls) &&
1259          (0 == strcmp (w->name, name)) &&
1260          (0 == strcmp (w->subsystem, subsystem)) )
1261     {
1262       GNUNET_free (w->name);
1263       GNUNET_free (w->subsystem);
1264       GNUNET_free (w);
1265       handle->watches[i] = NULL;      
1266       return GNUNET_OK;
1267     }    
1268   }
1269   return GNUNET_SYSERR;
1270 }
1271
1272
1273
1274 /**
1275  * Queue a request to change a statistic.
1276  *
1277  * @param h statistics handle
1278  * @param name name of the value
1279  * @param make_persistent  should the value be kept across restarts?
1280  * @param value new value or change
1281  * @param type type of the action (ACTION_SET or ACTION_UPDATE)
1282  */
1283 static void
1284 add_setter_action (struct GNUNET_STATISTICS_Handle *h, const char *name,
1285                    int make_persistent, uint64_t value, enum ActionType type)
1286 {
1287   struct GNUNET_STATISTICS_GetHandle *ai;
1288   size_t slen;
1289   size_t nlen;
1290   size_t nsize;
1291   int64_t delta;
1292
1293   GNUNET_assert (NULL != h);
1294   GNUNET_assert (NULL != name);
1295   slen = strlen (h->subsystem) + 1;
1296   nlen = strlen (name) + 1;
1297   nsize = sizeof (struct GNUNET_STATISTICS_SetMessage) + slen + nlen;
1298   if (nsize >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
1299   {
1300     GNUNET_break (0);
1301     return;
1302   }
1303   for (ai = h->action_head; NULL != ai; ai = ai->next)
1304   {
1305     if (! ( (0 == strcmp (ai->subsystem, h->subsystem)) &&
1306             (0 == strcmp (ai->name, name)) && 
1307             ( (ACTION_UPDATE == ai->type) ||
1308               (ACTION_SET == ai->type) ) ) )
1309       continue;
1310     if (ACTION_SET == ai->type)
1311     {
1312       if (ACTION_UPDATE == type)
1313       {
1314         delta = (int64_t) value;
1315         if (delta > 0)
1316         {
1317           /* update old set by new delta */
1318           ai->value += delta;
1319         }
1320         else
1321         {
1322           /* update old set by new delta, but never go negative */
1323           if (ai->value < -delta)
1324             ai->value = 0;
1325           else
1326             ai->value += delta;
1327         }
1328       }
1329       else
1330       {
1331         /* new set overrides old set */
1332         ai->value = value;
1333       }
1334     }
1335     else
1336     {
1337       if (ACTION_UPDATE == type)
1338       {
1339         /* make delta cummulative */
1340         delta = (int64_t) value;
1341         ai->value += delta;
1342       }
1343       else
1344       {
1345         /* drop old 'update', use new 'set' instead */
1346         ai->value = value;
1347         ai->type = type;
1348       }
1349     }
1350     ai->timeout = GNUNET_TIME_relative_to_absolute (SET_TRANSMIT_TIMEOUT);
1351     ai->make_persistent = make_persistent;
1352     return;  
1353   }
1354   /* no existing entry matches, create a fresh one */
1355   ai = GNUNET_malloc (sizeof (struct GNUNET_STATISTICS_GetHandle));
1356   ai->sh = h;
1357   ai->subsystem = GNUNET_strdup (h->subsystem);
1358   ai->name = GNUNET_strdup (name);
1359   ai->timeout = GNUNET_TIME_relative_to_absolute (SET_TRANSMIT_TIMEOUT);
1360   ai->make_persistent = make_persistent;
1361   ai->msize = nsize;
1362   ai->value = value;
1363   ai->type = type;
1364   GNUNET_CONTAINER_DLL_insert_tail (h->action_head, h->action_tail,
1365                                     ai);
1366   schedule_action (h);
1367 }
1368
1369
1370 /**
1371  * Set statistic value for the peer.  Will always use our
1372  * subsystem (the argument used when "handle" was created).
1373  *
1374  * @param handle identification of the statistics service
1375  * @param name name of the statistic value
1376  * @param value new value to set
1377  * @param make_persistent should the value be kept across restarts?
1378  */
1379 void
1380 GNUNET_STATISTICS_set (struct GNUNET_STATISTICS_Handle *handle,
1381                        const char *name, uint64_t value, int make_persistent)
1382 {
1383   if (NULL == handle)
1384     return;
1385   GNUNET_assert (GNUNET_NO == handle->do_destroy);
1386   add_setter_action (handle, name, make_persistent, value, ACTION_SET);
1387 }
1388
1389
1390 /**
1391  * Set statistic value for the peer.  Will always use our
1392  * subsystem (the argument used when "handle" was created).
1393  *
1394  * @param handle identification of the statistics service
1395  * @param name name of the statistic value
1396  * @param delta change in value (added to existing value)
1397  * @param make_persistent should the value be kept across restarts?
1398  */
1399 void
1400 GNUNET_STATISTICS_update (struct GNUNET_STATISTICS_Handle *handle,
1401                           const char *name, int64_t delta, int make_persistent)
1402 {
1403   if (NULL == handle)
1404     return;
1405   if (0 == delta)
1406     return;
1407   GNUNET_assert (GNUNET_NO == handle->do_destroy);
1408   add_setter_action (handle, name, make_persistent, (uint64_t) delta,
1409                      ACTION_UPDATE);
1410 }
1411
1412
1413 /* end of statistics_api.c */