e117e971c00f3f8cc688dba8940553bfef58193a
[oweals/gnunet.git] / src / statistics / statistics_api.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009, 2010 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 2, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file statistics/statistics_api.c
23  * @brief API of the statistics service
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include "gnunet_client_lib.h"
28 #include "gnunet_constants.h"
29 #include "gnunet_container_lib.h"
30 #include "gnunet_protocols.h"
31 #include "gnunet_server_lib.h"
32 #include "gnunet_statistics_service.h"
33 #include "gnunet_strings_lib.h"
34 #include "statistics.h"
35
36 /**
37  * How long do we wait until a statistics request for setting
38  * a value times out?  (The update will be lost if the
39  * service does not react within this timeframe).  
40  */
41 #define SET_TRANSMIT_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 2)
42
43
44 /**
45  * Types of actions.
46  */
47 enum ActionType
48 {
49   ACTION_GET,
50   ACTION_SET,
51   ACTION_UPDATE,
52   ACTION_WATCH
53 };
54
55
56 /**
57  * Entry kept for each value we are watching.
58  */
59 struct GNUNET_STATISTICS_WatchEntry
60 {
61
62   /**
63    * What subsystem is this action about? (never NULL)
64    */
65   char *subsystem;
66
67   /**
68    * What value is this action about? (never NULL)
69    */
70   char *name;
71
72   /**
73    * Function to call
74    */
75   GNUNET_STATISTICS_Iterator proc;
76
77   /**
78    * Closure for proc
79    */
80   void *proc_cls;
81
82 };
83
84
85 /**
86  * Linked list of things we still need to do.
87  */
88 struct GNUNET_STATISTICS_GetHandle
89 {
90
91   /**
92    * This is a doubly linked list.
93    */
94   struct GNUNET_STATISTICS_GetHandle *next;
95
96   /**
97    * This is a doubly linked list.
98    */
99   struct GNUNET_STATISTICS_GetHandle *prev;
100
101   /**
102    * Main statistics handle.
103    */
104   struct GNUNET_STATISTICS_Handle *sh;
105
106   /**
107    * What subsystem is this action about? (can be NULL)
108    */
109   char *subsystem;
110
111   /**
112    * What value is this action about? (can be NULL)
113    */
114   char *name;
115
116   /**
117    * Continuation to call once action is complete.
118    */
119   GNUNET_STATISTICS_Callback cont;
120
121   /**
122    * Function to call (for GET actions only).
123    */
124   GNUNET_STATISTICS_Iterator proc;
125
126   /**
127    * Closure for proc and cont.
128    */
129   void *cls;
130
131   /**
132    * Timeout for this action.
133    */
134   struct GNUNET_TIME_Absolute timeout;
135
136   /**
137    * Associated value.
138    */
139   uint64_t value;
140
141   /**
142    * Flag for SET/UPDATE actions.
143    */
144   int make_persistent;
145
146   /**
147    * Has the current iteration been aborted; for GET actions.
148    */
149   int aborted;
150
151   /**
152    * Is this a GET, SET, UPDATE or WATCH?
153    */
154   enum ActionType type;
155
156   /**
157    * Size of the message that we will be transmitting.
158    */
159   uint16_t msize;
160
161 };
162
163
164 /**
165  * Handle for the service.
166  */
167 struct GNUNET_STATISTICS_Handle
168 {
169   /**
170    * Name of our subsystem.
171    */
172   char *subsystem;
173
174   /**
175    * Configuration to use.
176    */
177   const struct GNUNET_CONFIGURATION_Handle *cfg;
178
179   /**
180    * Socket (if available).
181    */
182   struct GNUNET_CLIENT_Connection *client;
183
184   /**
185    * Currently pending transmission request.
186    */
187   struct GNUNET_CLIENT_TransmitHandle *th;
188
189   /**
190    * Head of the linked list of pending actions (first action
191    * to be performed).
192    */
193   struct GNUNET_STATISTICS_GetHandle *action_head;
194
195   /**
196    * Tail of the linked list of actions (for fast append).
197    */
198   struct GNUNET_STATISTICS_GetHandle *action_tail;
199
200   /**
201    * Action we are currently busy with (action request has been
202    * transmitted, we're now receiving the response from the
203    * service).
204    */
205   struct GNUNET_STATISTICS_GetHandle *current;
206
207   /**
208    * Array of watch entries.
209    */
210   struct GNUNET_STATISTICS_WatchEntry **watches;
211
212   /**
213    * Task doing exponential back-off trying to reconnect.
214    */
215   GNUNET_SCHEDULER_TaskIdentifier backoff_task;
216
217   /**
218    * Time for next connect retry.
219    */
220   struct GNUNET_TIME_Relative backoff;
221
222   /**
223    * Size of the 'watches' array.
224    */
225   unsigned int watches_size;
226
227   /**
228    * Should this handle auto-destruct once all actions have
229    * been processed?
230    */
231   int do_destroy;
232
233   /**
234    * Are we currently receiving from the service?
235    */
236   int receiving;
237
238 };
239
240
241
242 /**
243  * Schedule the next action to be performed.
244  */
245 static void
246 schedule_action (struct GNUNET_STATISTICS_Handle *h);
247
248 /**
249  * Try to (re)connect to the statistics service.
250  *
251  * @return GNUNET_YES on success, GNUNET_NO on failure.
252  */
253 static int
254 try_connect (struct GNUNET_STATISTICS_Handle *ret);
255
256
257 static void
258 insert_ai (struct GNUNET_STATISTICS_Handle *h,
259            struct GNUNET_STATISTICS_GetHandle *ai)
260 {
261   GNUNET_CONTAINER_DLL_insert_after (h->action_head, h->action_tail,
262                                      h->action_tail, ai);
263   if (h->action_head == ai)
264     schedule_action (h);
265 }
266
267
268 static void
269 schedule_watch_request (struct GNUNET_STATISTICS_Handle *h,
270                         struct GNUNET_STATISTICS_WatchEntry *watch)
271 {
272
273   struct GNUNET_STATISTICS_GetHandle *ai;
274   size_t slen;
275   size_t nlen;
276   size_t nsize;
277
278   GNUNET_assert (h != NULL);
279   if (GNUNET_YES != try_connect (h))
280   {
281     schedule_action (h);
282     return;
283   }
284   slen = strlen (watch->subsystem) + 1;
285   nlen = strlen (watch->name) + 1;
286   nsize = sizeof (struct GNUNET_MessageHeader) + slen + nlen;
287   if (nsize >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
288   {
289     GNUNET_break (0);
290     return;
291   }
292   ai = GNUNET_malloc (sizeof (struct GNUNET_STATISTICS_GetHandle));
293   ai->sh = h;
294   ai->subsystem = GNUNET_strdup (watch->subsystem);
295   ai->name = GNUNET_strdup (watch->name);
296   ai->timeout = GNUNET_TIME_UNIT_FOREVER_ABS;
297   ai->msize = nsize;
298   ai->type = ACTION_WATCH;
299   ai->proc = watch->proc;
300   ai->cls = watch->proc_cls;
301   insert_ai (h, ai);
302 }
303
304
305 /**
306  * Try to (re)connect to the statistics service.
307  *
308  * @return GNUNET_YES on success, GNUNET_NO on failure.
309  */
310 static int
311 try_connect (struct GNUNET_STATISTICS_Handle *ret)
312 {
313   unsigned int i;
314
315   if (ret->client != NULL)
316     return GNUNET_YES;
317   ret->client = GNUNET_CLIENT_connect ("statistics", ret->cfg);
318   if (ret->client != NULL)
319   {
320     for (i = 0; i < ret->watches_size; i++)
321       schedule_watch_request (ret, ret->watches[i]);
322     return GNUNET_YES;
323   }
324 #if DEBUG_STATISTICS
325   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
326               _("Failed to connect to statistics service!\n"));
327 #endif
328   return GNUNET_NO;
329 }
330
331
332 /**
333  * Free memory associated with the given action item.
334  */
335 static void
336 free_action_item (struct GNUNET_STATISTICS_GetHandle *ai)
337 {
338   GNUNET_free_non_null (ai->subsystem);
339   GNUNET_free_non_null (ai->name);
340   GNUNET_free (ai);
341 }
342
343
344 /**
345  * GET processing is complete, tell client about it.
346  */
347 static void
348 finish (struct GNUNET_STATISTICS_Handle *h, int code)
349 {
350   struct GNUNET_STATISTICS_GetHandle *pos = h->current;
351
352   h->current = NULL;
353   schedule_action (h);
354   if (pos != NULL)
355   {
356     if (pos->cont != NULL)
357       pos->cont (pos->cls, code);
358     free_action_item (pos);
359   }
360 }
361
362
363 /**
364  * Process the message.
365  *
366  * @return GNUNET_OK if the message was well-formed
367  */
368 static int
369 process_message (struct GNUNET_STATISTICS_Handle *h,
370                  const struct GNUNET_MessageHeader *msg)
371 {
372   char *service;
373   char *name;
374   const struct GNUNET_STATISTICS_ReplyMessage *smsg;
375   uint16_t size;
376
377   if (h->current->aborted)
378   {
379 #if DEBUG_STATISTICS
380     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
381                 "Iteration was aborted, ignoring VALUE\n");
382 #endif
383     return GNUNET_OK;           /* don't bother */
384   }
385   size = ntohs (msg->size);
386   if (size < sizeof (struct GNUNET_STATISTICS_ReplyMessage))
387   {
388     GNUNET_break (0);
389     return GNUNET_SYSERR;
390   }
391   smsg = (const struct GNUNET_STATISTICS_ReplyMessage *) msg;
392   size -= sizeof (struct GNUNET_STATISTICS_ReplyMessage);
393   if (size !=
394       GNUNET_STRINGS_buffer_tokenize ((const char *) &smsg[1], size, 2,
395                                       &service, &name))
396   {
397     GNUNET_break (0);
398     return GNUNET_SYSERR;
399   }
400 #if DEBUG_STATISTICS
401   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
402               "Received valid statistic on `%s:%s': %llu\n", service, name,
403               GNUNET_ntohll (smsg->value));
404 #endif
405   if (GNUNET_OK !=
406       h->current->proc (h->current->cls, service, name,
407                         GNUNET_ntohll (smsg->value),
408                         0 !=
409                         (ntohl (smsg->uid) & GNUNET_STATISTICS_PERSIST_BIT)))
410   {
411 #if DEBUG_STATISTICS
412     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
413                 "Processing of remaining statistics aborted by client.\n");
414 #endif
415     h->current->aborted = GNUNET_YES;
416   }
417 #if DEBUG_STATISTICS
418   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "VALUE processed successfully\n");
419 #endif
420   return GNUNET_OK;
421 }
422
423
424 static int
425 process_watch_value (struct GNUNET_STATISTICS_Handle *h,
426                      const struct GNUNET_MessageHeader *msg)
427 {
428   const struct GNUNET_STATISTICS_WatchValueMessage *wvm;
429   struct GNUNET_STATISTICS_WatchEntry *w;
430   uint32_t wid;
431
432   if (sizeof (struct GNUNET_STATISTICS_WatchValueMessage) != ntohs (msg->size))
433   {
434     GNUNET_break (0);
435     return GNUNET_SYSERR;
436   }
437   wvm = (const struct GNUNET_STATISTICS_WatchValueMessage *) msg;
438   GNUNET_break (0 == ntohl (wvm->reserved));
439   wid = ntohl (wvm->wid);
440   if (wid >= h->watches_size)
441   {
442     GNUNET_break (0);
443     return GNUNET_SYSERR;
444   }
445   w = h->watches[wid];
446   (void) w->proc (w->proc_cls, w->subsystem, w->name,
447                   GNUNET_ntohll (wvm->value),
448                   0 != (ntohl (wvm->flags) & GNUNET_STATISTICS_PERSIST_BIT));
449   return GNUNET_OK;
450 }
451
452
453 /**
454  * Function called with messages from stats service.
455  *
456  * @param cls closure
457  * @param msg message received, NULL on timeout or fatal error
458  */
459 static void
460 receive_stats (void *cls, const struct GNUNET_MessageHeader *msg)
461 {
462   struct GNUNET_STATISTICS_Handle *h = cls;
463
464   if (msg == NULL)
465   {
466     if (NULL != h->client)
467     {
468       GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
469       h->client = NULL;
470     }
471 #if DEBUG_STATISTICS
472     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
473                 "Error receiving statistics from service, is the service running?\n");
474 #endif
475     finish (h, GNUNET_SYSERR);
476     return;
477   }
478   switch (ntohs (msg->type))
479   {
480   case GNUNET_MESSAGE_TYPE_STATISTICS_END:
481 #if DEBUG_STATISTICS
482     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Received end of statistics marker\n");
483 #endif
484     h->backoff = GNUNET_TIME_UNIT_MILLISECONDS;
485     if (h->watches_size > 0)
486     {
487       GNUNET_CLIENT_receive (h->client, &receive_stats, h,
488                              GNUNET_TIME_UNIT_FOREVER_REL);
489     }
490     else
491     {
492       h->receiving = GNUNET_NO;
493     }
494     finish (h, GNUNET_OK);
495     return;
496   case GNUNET_MESSAGE_TYPE_STATISTICS_VALUE:
497     if (GNUNET_OK == process_message (h, msg))
498     {
499       /* finally, look for more! */
500 #if DEBUG_STATISTICS
501       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
502                   "Processing VALUE done, now reading more\n");
503 #endif
504       GNUNET_CLIENT_receive (h->client, &receive_stats, h,
505                              GNUNET_TIME_absolute_get_remaining (h->
506                                                                  current->timeout));
507       h->backoff = GNUNET_TIME_UNIT_MILLISECONDS;
508       return;
509     }
510     GNUNET_break (0);
511     break;
512   case GNUNET_MESSAGE_TYPE_STATISTICS_WATCH_VALUE:
513     if (GNUNET_OK == process_watch_value (h, msg))
514     {
515       h->backoff = GNUNET_TIME_UNIT_MILLISECONDS;
516       GNUNET_assert (h->watches_size > 0);
517       GNUNET_CLIENT_receive (h->client, &receive_stats, h,
518                              GNUNET_TIME_UNIT_FOREVER_REL);
519       return;
520     }
521     GNUNET_break (0);
522     break;
523   default:
524     GNUNET_break (0);
525     break;
526   }
527   if (NULL != h->client)
528   {
529     GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
530     h->client = NULL;
531   }
532   finish (h, GNUNET_SYSERR);
533 }
534
535
536 /**
537  * Transmit a GET request (and if successful, start to receive
538  * the response).
539  */
540 static size_t
541 transmit_get (struct GNUNET_STATISTICS_Handle *handle, size_t size, void *buf)
542 {
543   struct GNUNET_MessageHeader *hdr;
544   size_t slen1;
545   size_t slen2;
546   uint16_t msize;
547
548   if (buf == NULL)
549   {
550     /* timeout / error */
551 #if DEBUG_STATISTICS
552     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
553                 "Transmission of request for statistics failed!\n");
554 #endif
555     finish (handle, GNUNET_SYSERR);
556     return 0;
557   }
558   slen1 = strlen (handle->current->subsystem) + 1;
559   slen2 = strlen (handle->current->name) + 1;
560   msize = slen1 + slen2 + sizeof (struct GNUNET_MessageHeader);
561   GNUNET_assert (msize <= size);
562   hdr = (struct GNUNET_MessageHeader *) buf;
563   hdr->size = htons (msize);
564   hdr->type = htons (GNUNET_MESSAGE_TYPE_STATISTICS_GET);
565   GNUNET_assert (slen1 + slen2 ==
566                  GNUNET_STRINGS_buffer_fill ((char *) &hdr[1], slen1 + slen2, 2,
567                                              handle->current->subsystem,
568                                              handle->current->name));
569   if (!handle->receiving)
570   {
571 #if DEBUG_STATISTICS
572     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
573                 "Transmission of GET done, now reading response\n");
574 #endif
575     handle->receiving = GNUNET_YES;
576     GNUNET_CLIENT_receive (handle->client, &receive_stats, handle,
577                            GNUNET_TIME_absolute_get_remaining (handle->
578                                                                current->timeout));
579   }
580   return msize;
581 }
582
583
584 /**
585  * Transmit a WATCH request (and if successful, start to receive
586  * the response).
587  */
588 static size_t
589 transmit_watch (struct GNUNET_STATISTICS_Handle *handle, size_t size, void *buf)
590 {
591   struct GNUNET_MessageHeader *hdr;
592   size_t slen1;
593   size_t slen2;
594   uint16_t msize;
595
596   if (buf == NULL)
597   {
598     /* timeout / error */
599 #if DEBUG_STATISTICS
600     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
601                 "Transmission of request for statistics failed!\n");
602 #endif
603     finish (handle, GNUNET_SYSERR);
604     return 0;
605   }
606 #if DEBUG_STATISTICS
607   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Transmitting watch request for `%s'\n",
608               handle->current->name);
609 #endif
610   slen1 = strlen (handle->current->subsystem) + 1;
611   slen2 = strlen (handle->current->name) + 1;
612   msize = slen1 + slen2 + sizeof (struct GNUNET_MessageHeader);
613   GNUNET_assert (msize <= size);
614   hdr = (struct GNUNET_MessageHeader *) buf;
615   hdr->size = htons (msize);
616   hdr->type = htons (GNUNET_MESSAGE_TYPE_STATISTICS_WATCH);
617   GNUNET_assert (slen1 + slen2 ==
618                  GNUNET_STRINGS_buffer_fill ((char *) &hdr[1], slen1 + slen2, 2,
619                                              handle->current->subsystem,
620                                              handle->current->name));
621   if (GNUNET_YES != handle->receiving)
622   {
623     handle->receiving = GNUNET_YES;
624     GNUNET_CLIENT_receive (handle->client, &receive_stats, handle,
625                            GNUNET_TIME_UNIT_FOREVER_REL);
626   }
627   finish (handle, GNUNET_OK);
628   return msize;
629 }
630
631
632 /**
633  * Transmit a SET/UPDATE request.
634  */
635 static size_t
636 transmit_set (struct GNUNET_STATISTICS_Handle *handle, size_t size, void *buf)
637 {
638   struct GNUNET_STATISTICS_SetMessage *r;
639   size_t slen;
640   size_t nlen;
641   size_t nsize;
642
643   if (NULL == buf)
644   {
645     finish (handle, GNUNET_SYSERR);
646     return 0;
647   }
648
649   slen = strlen (handle->current->subsystem) + 1;
650   nlen = strlen (handle->current->name) + 1;
651   nsize = sizeof (struct GNUNET_STATISTICS_SetMessage) + slen + nlen;
652   if (size < nsize)
653   {
654     GNUNET_break (0);
655     finish (handle, GNUNET_SYSERR);
656     return 0;
657   }
658   r = buf;
659   r->header.size = htons (nsize);
660   r->header.type = htons (GNUNET_MESSAGE_TYPE_STATISTICS_SET);
661   r->flags = 0;
662   r->value = GNUNET_htonll (handle->current->value);
663   if (handle->current->make_persistent)
664     r->flags |= htonl (GNUNET_STATISTICS_SETFLAG_PERSISTENT);
665   if (handle->current->type == ACTION_UPDATE)
666     r->flags |= htonl (GNUNET_STATISTICS_SETFLAG_RELATIVE);
667   GNUNET_assert (slen + nlen ==
668                  GNUNET_STRINGS_buffer_fill ((char *) &r[1], slen + nlen, 2,
669                                              handle->current->subsystem,
670                                              handle->current->name));
671   finish (handle, GNUNET_OK);
672   return nsize;
673 }
674
675
676 static size_t
677 transmit_action (void *cls, size_t size, void *buf)
678 {
679   struct GNUNET_STATISTICS_Handle *handle = cls;
680   size_t ret;
681
682   handle->th = NULL;
683   switch (handle->current->type)
684   {
685   case ACTION_GET:
686     ret = transmit_get (handle, size, buf);
687     break;
688   case ACTION_SET:
689   case ACTION_UPDATE:
690     ret = transmit_set (handle, size, buf);
691     break;
692   case ACTION_WATCH:
693     ret = transmit_watch (handle, size, buf);
694     break;
695   default:
696     ret = 0;
697     GNUNET_break (0);
698     break;
699   }
700   return ret;
701 }
702
703
704 /**
705  * Get handle for the statistics service.
706  *
707  * @param subsystem name of subsystem using the service
708  * @param cfg services configuration in use
709  * @return handle to use
710  */
711 struct GNUNET_STATISTICS_Handle *
712 GNUNET_STATISTICS_create (const char *subsystem,
713                           const struct GNUNET_CONFIGURATION_Handle *cfg)
714 {
715   struct GNUNET_STATISTICS_Handle *ret;
716
717   GNUNET_assert (subsystem != NULL);
718   GNUNET_assert (cfg != NULL);
719   ret = GNUNET_malloc (sizeof (struct GNUNET_STATISTICS_Handle));
720   ret->cfg = cfg;
721   ret->subsystem = GNUNET_strdup (subsystem);
722   ret->backoff = GNUNET_TIME_UNIT_MILLISECONDS;
723   if (GNUNET_YES != try_connect (ret))
724   {
725     GNUNET_free (ret->subsystem);
726     GNUNET_free (ret);
727     return NULL;
728   }
729   return ret;
730 }
731
732
733 /**
734  * Destroy a handle (free all state associated with
735  * it).
736  *
737  * @param h statistics handle to destroy
738  * @param sync_first set to GNUNET_YES if pending SET requests should
739  *        be completed
740  */
741 void
742 GNUNET_STATISTICS_destroy (struct GNUNET_STATISTICS_Handle *h, int sync_first)
743 {
744   struct GNUNET_STATISTICS_GetHandle *pos;
745   struct GNUNET_STATISTICS_GetHandle *next;
746   struct GNUNET_STATISTICS_GetHandle *prev;
747   struct GNUNET_TIME_Relative timeout;
748   int i;
749
750   if (h == NULL)
751     return;
752   if (GNUNET_SCHEDULER_NO_TASK != h->backoff_task)
753     GNUNET_SCHEDULER_cancel (h->backoff_task);
754   if (sync_first)
755   {
756     if (h->current != NULL)
757     {
758       if (h->current->type == ACTION_GET)
759       {
760         GNUNET_CLIENT_notify_transmit_ready_cancel (h->th);
761         h->th = NULL;
762         free_action_item (h->current);
763         h->current = NULL;
764       }
765     }
766     pos = h->action_head;
767     prev = NULL;
768     while (pos != NULL)
769     {
770       next = pos->next;
771       if (pos->type == ACTION_GET)
772       {
773         if (prev == NULL)
774           h->action_head = next;
775         else
776           prev->next = next;
777         free_action_item (pos);
778       }
779       else
780       {
781         prev = pos;
782       }
783       pos = next;
784     }
785     h->action_tail = prev;
786     if (h->current == NULL)
787     {
788       h->current = h->action_head;
789       if (h->action_head != NULL)
790       {
791         h->action_head = h->action_head->next;
792         if (h->action_head == NULL)
793           h->action_tail = NULL;
794       }
795     }
796     h->do_destroy = GNUNET_YES;
797     if ((h->current != NULL) && (h->th == NULL))
798     {
799       timeout = GNUNET_TIME_absolute_get_remaining (h->current->timeout);
800       h->th =
801           GNUNET_CLIENT_notify_transmit_ready (h->client, h->current->msize,
802                                                timeout, GNUNET_YES,
803                                                &transmit_action, h);
804       GNUNET_assert (NULL != h->th);
805     }
806     if (h->th != NULL)
807       return;
808   }
809   if (NULL != h->th)
810   {
811     GNUNET_CLIENT_notify_transmit_ready_cancel (h->th);
812     h->th = NULL;
813   }
814   if (h->current != NULL)
815     free_action_item (h->current);
816   while (NULL != (pos = h->action_head))
817   {
818     h->action_head = pos->next;
819     free_action_item (pos);
820   }
821   if (h->client != NULL)
822   {
823     GNUNET_CLIENT_disconnect (h->client, GNUNET_YES);
824     h->client = NULL;
825   }
826   for (i = 0; i < h->watches_size; i++)
827   {
828     GNUNET_free (h->watches[i]->subsystem);
829     GNUNET_free (h->watches[i]->name);
830     GNUNET_free (h->watches[i]);
831   }
832   GNUNET_array_grow (h->watches, h->watches_size, 0);
833   GNUNET_free (h->subsystem);
834   GNUNET_free (h);
835 }
836
837
838 static void
839 finish_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
840 {
841   struct GNUNET_STATISTICS_Handle *h = cls;
842
843   h->backoff_task = GNUNET_SCHEDULER_NO_TASK;
844   finish (h, GNUNET_SYSERR);
845 }
846
847
848 /**
849  * Schedule the next action to be performed.
850  */
851 static void
852 schedule_action (struct GNUNET_STATISTICS_Handle *h)
853 {
854   struct GNUNET_TIME_Relative timeout;
855
856   if (h->current != NULL)
857     return;                     /* action already pending */
858   if (GNUNET_YES != try_connect (h))
859   {
860     h->backoff_task =
861         GNUNET_SCHEDULER_add_delayed (h->backoff, &finish_task, h);
862     h->backoff = GNUNET_TIME_relative_multiply (h->backoff, 2);
863     h->backoff =
864         GNUNET_TIME_relative_min (h->backoff, GNUNET_CONSTANTS_SERVICE_TIMEOUT);
865     return;
866   }
867
868   /* schedule next action */
869   h->current = h->action_head;
870   if (NULL == h->current)
871   {
872     if (h->do_destroy)
873     {
874       h->do_destroy = GNUNET_NO;
875       GNUNET_STATISTICS_destroy (h, GNUNET_YES);
876     }
877     return;
878   }
879   GNUNET_CONTAINER_DLL_remove (h->action_head, h->action_tail, h->current);
880   timeout = GNUNET_TIME_absolute_get_remaining (h->current->timeout);
881   if (NULL ==
882       (h->th =
883        GNUNET_CLIENT_notify_transmit_ready (h->client, h->current->msize,
884                                             timeout, GNUNET_YES,
885                                             &transmit_action, h)))
886   {
887 #if DEBUG_STATISTICS
888     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
889                 "Failed to transmit request to statistics service.\n");
890 #endif
891     finish (h, GNUNET_SYSERR);
892   }
893 }
894
895
896 /**
897  * Get statistic from the peer.
898  *
899  * @param handle identification of the statistics service
900  * @param subsystem limit to the specified subsystem, NULL for our subsystem
901  * @param name name of the statistic value, NULL for all values
902  * @param timeout after how long should we give up (and call
903  *        cont with an error code)?
904  * @param cont continuation to call when done (can be NULL)
905  * @param proc function to call on each value
906  * @param cls closure for cont and proc
907  * @return NULL on error
908  */
909 struct GNUNET_STATISTICS_GetHandle *
910 GNUNET_STATISTICS_get (struct GNUNET_STATISTICS_Handle *handle,
911                        const char *subsystem, const char *name,
912                        struct GNUNET_TIME_Relative timeout,
913                        GNUNET_STATISTICS_Callback cont,
914                        GNUNET_STATISTICS_Iterator proc, void *cls)
915 {
916   size_t slen1;
917   size_t slen2;
918   struct GNUNET_STATISTICS_GetHandle *ai;
919
920   GNUNET_assert (handle != NULL);
921   GNUNET_assert (proc != NULL);
922   GNUNET_assert (GNUNET_NO == handle->do_destroy);
923   if (GNUNET_YES != try_connect (handle))
924   {
925 #if DEBUG_STATISTICS
926     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
927                 "Failed to connect to statistics service, can not get value `%s:%s'.\n",
928                 strlen (subsystem) ? subsystem : "*",
929                 strlen (name) ? name : "*");
930 #endif
931     return NULL;
932   }
933   if (subsystem == NULL)
934     subsystem = "";
935   if (name == NULL)
936     name = "";
937   slen1 = strlen (subsystem) + 1;
938   slen2 = strlen (name) + 1;
939   GNUNET_assert (slen1 + slen2 + sizeof (struct GNUNET_MessageHeader) <
940                  GNUNET_SERVER_MAX_MESSAGE_SIZE);
941   ai = GNUNET_malloc (sizeof (struct GNUNET_STATISTICS_GetHandle));
942   ai->sh = handle;
943   ai->subsystem = GNUNET_strdup (subsystem);
944   ai->name = GNUNET_strdup (name);
945   ai->cont = cont;
946   ai->proc = proc;
947   ai->cls = cls;
948   ai->timeout = GNUNET_TIME_relative_to_absolute (timeout);
949   ai->type = ACTION_GET;
950   ai->msize = slen1 + slen2 + sizeof (struct GNUNET_MessageHeader);
951   insert_ai (handle, ai);
952   return ai;
953 }
954
955
956 /**
957  * Cancel a 'get' request.  Must be called before the 'cont' 
958  * function is called.
959  *
960  * @param gh handle of the request to cancel
961  */
962 void
963 GNUNET_STATISTICS_get_cancel (struct GNUNET_STATISTICS_GetHandle *gh)
964 {
965   if (gh->sh->current == gh)
966   {
967     gh->aborted = GNUNET_YES;
968   }
969   else
970   {
971     GNUNET_CONTAINER_DLL_remove (gh->sh->action_head, gh->sh->action_tail, gh);
972     GNUNET_free (gh->name);
973     GNUNET_free (gh->subsystem);
974     GNUNET_free (gh);
975   }
976 }
977
978
979 /**
980  * Watch statistics from the peer (be notified whenever they change).
981  * Note that the only way to cancel a "watch" request is to destroy
982  * the statistics handle given as the first argument to this call.
983  *
984  * @param handle identification of the statistics service
985  * @param subsystem limit to the specified subsystem, never NULL
986  * @param name name of the statistic value, never NULL
987  * @param proc function to call on each value
988  * @param proc_cls closure for proc
989  * @return GNUNET_OK on success, GNUNET_SYSERR on error
990  */
991 int
992 GNUNET_STATISTICS_watch (struct GNUNET_STATISTICS_Handle *handle,
993                          const char *subsystem, const char *name,
994                          GNUNET_STATISTICS_Iterator proc, void *proc_cls)
995 {
996   struct GNUNET_STATISTICS_WatchEntry *w;
997
998   if (handle == NULL)
999     return GNUNET_SYSERR;
1000   w = GNUNET_malloc (sizeof (struct GNUNET_STATISTICS_WatchEntry));
1001   w->subsystem = GNUNET_strdup (subsystem);
1002   w->name = GNUNET_strdup (name);
1003   w->proc = proc;
1004   w->proc_cls = proc_cls;
1005   GNUNET_array_append (handle->watches, handle->watches_size, w);
1006   schedule_watch_request (handle, w);
1007   return GNUNET_OK;
1008 }
1009
1010
1011 static void
1012 add_setter_action (struct GNUNET_STATISTICS_Handle *h, const char *name,
1013                    int make_persistent, uint64_t value, enum ActionType type)
1014 {
1015   struct GNUNET_STATISTICS_GetHandle *ai;
1016   size_t slen;
1017   size_t nlen;
1018   size_t nsize;
1019   int64_t delta;
1020
1021   GNUNET_assert (h != NULL);
1022   GNUNET_assert (name != NULL);
1023   if (GNUNET_YES != try_connect (h))
1024     return;
1025   slen = strlen (h->subsystem) + 1;
1026   nlen = strlen (name) + 1;
1027   nsize = sizeof (struct GNUNET_STATISTICS_SetMessage) + slen + nlen;
1028   if (nsize >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
1029   {
1030     GNUNET_break (0);
1031     return;
1032   }
1033   ai = h->action_head;
1034   while (ai != NULL)
1035   {
1036     if ((0 == strcmp (ai->subsystem, h->subsystem)) &&
1037         (0 == strcmp (ai->name, name)) && ((ai->type == ACTION_UPDATE) ||
1038                                            (ai->type == ACTION_SET)))
1039     {
1040       if (ai->type == ACTION_SET)
1041       {
1042         if (type == ACTION_UPDATE)
1043         {
1044           delta = (int64_t) value;
1045           if (delta > 0)
1046           {
1047             ai->value += delta;
1048           }
1049           else
1050           {
1051             if (ai->value < -delta)
1052               ai->value = 0;
1053             else
1054               ai->value += delta;
1055           }
1056         }
1057         else
1058         {
1059           ai->value = value;
1060         }
1061       }
1062       else
1063       {
1064         if (type == ACTION_UPDATE)
1065         {
1066           delta = (int64_t) value;
1067           ai->value += delta;
1068         }
1069         else
1070         {
1071           ai->value = value;
1072           ai->type = type;
1073         }
1074       }
1075       ai->timeout = GNUNET_TIME_relative_to_absolute (SET_TRANSMIT_TIMEOUT);
1076       ai->make_persistent = make_persistent;
1077       return;
1078     }
1079     ai = ai->next;
1080   }
1081   ai = GNUNET_malloc (sizeof (struct GNUNET_STATISTICS_GetHandle));
1082   ai->sh = h;
1083   ai->subsystem = GNUNET_strdup (h->subsystem);
1084   ai->name = GNUNET_strdup (name);
1085   ai->timeout = GNUNET_TIME_relative_to_absolute (SET_TRANSMIT_TIMEOUT);
1086   ai->make_persistent = make_persistent;
1087   ai->msize = nsize;
1088   ai->value = value;
1089   ai->type = type;
1090   insert_ai (h, ai);
1091 }
1092
1093
1094 /**
1095  * Set statistic value for the peer.  Will always use our
1096  * subsystem (the argument used when "handle" was created).
1097  *
1098  * @param handle identification of the statistics service
1099  * @param name name of the statistic value
1100  * @param value new value to set
1101  * @param make_persistent should the value be kept across restarts?
1102  */
1103 void
1104 GNUNET_STATISTICS_set (struct GNUNET_STATISTICS_Handle *handle,
1105                        const char *name, uint64_t value, int make_persistent)
1106 {
1107   if (handle == NULL)
1108     return;
1109   GNUNET_assert (GNUNET_NO == handle->do_destroy);
1110   add_setter_action (handle, name, make_persistent, value, ACTION_SET);
1111 }
1112
1113
1114 /**
1115  * Set statistic value for the peer.  Will always use our
1116  * subsystem (the argument used when "handle" was created).
1117  *
1118  * @param handle identification of the statistics service
1119  * @param name name of the statistic value
1120  * @param delta change in value (added to existing value)
1121  * @param make_persistent should the value be kept across restarts?
1122  */
1123 void
1124 GNUNET_STATISTICS_update (struct GNUNET_STATISTICS_Handle *handle,
1125                           const char *name, int64_t delta, int make_persistent)
1126 {
1127   if (handle == NULL)
1128     return;
1129   if (delta == 0)
1130     return;
1131   GNUNET_assert (GNUNET_NO == handle->do_destroy);
1132   add_setter_action (handle, name, make_persistent, (uint64_t) delta,
1133                      ACTION_UPDATE);
1134 }
1135
1136
1137 /* end of statistics_api.c */