a76f83db1dc3f8c9bd4ef27a9b7ffe02e6b51909
[oweals/gnunet.git] / src / statistics / statistics_api.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009, 2010 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 2, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file statistics/statistics_api.c
23  * @brief API of the statistics service
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include "gnunet_client_lib.h"
28 #include "gnunet_constants.h"
29 #include "gnunet_container_lib.h"
30 #include "gnunet_protocols.h"
31 #include "gnunet_server_lib.h"
32 #include "gnunet_statistics_service.h"
33 #include "gnunet_strings_lib.h"
34 #include "statistics.h"
35
36 /**
37  * How long do we wait until a statistics request for setting
38  * a value times out?  (The update will be lost if the
39  * service does not react within this timeframe).  
40  */
41 #define SET_TRANSMIT_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 2)
42
43
44 /**
45  * Types of actions.
46  */
47 enum ActionType
48 {
49   ACTION_GET,
50   ACTION_SET,
51   ACTION_UPDATE,
52   ACTION_WATCH
53 };
54
55
56 /**
57  * Entry kept for each value we are watching.
58  */
59 struct GNUNET_STATISTICS_WatchEntry
60 {
61
62   /**
63    * What subsystem is this action about? (never NULL)
64    */
65   char *subsystem;
66
67   /**
68    * What value is this action about? (never NULL)
69    */
70   char *name;
71
72   /**
73    * Function to call
74    */
75   GNUNET_STATISTICS_Iterator proc;
76
77   /**
78    * Closure for proc
79    */
80   void *proc_cls;
81
82 };
83
84
85 /**
86  * Linked list of things we still need to do.
87  */
88 struct GNUNET_STATISTICS_GetHandle
89 {
90
91   /**
92    * This is a doubly linked list.
93    */
94   struct GNUNET_STATISTICS_GetHandle *next;
95
96   /**
97    * This is a doubly linked list.
98    */
99   struct GNUNET_STATISTICS_GetHandle *prev;
100
101   /**
102    * Main statistics handle.
103    */
104   struct GNUNET_STATISTICS_Handle *sh;
105
106   /**
107    * What subsystem is this action about? (can be NULL)
108    */
109   char *subsystem;
110
111   /**
112    * What value is this action about? (can be NULL)
113    */
114   char *name;
115
116   /**
117    * Continuation to call once action is complete.
118    */
119   GNUNET_STATISTICS_Callback cont;
120
121   /**
122    * Function to call (for GET actions only).
123    */
124   GNUNET_STATISTICS_Iterator proc;
125
126   /**
127    * Closure for proc and cont.
128    */
129   void *cls;
130
131   /**
132    * Timeout for this action.
133    */
134   struct GNUNET_TIME_Absolute timeout;
135
136   /**
137    * Associated value.
138    */
139   uint64_t value;
140
141   /**
142    * Flag for SET/UPDATE actions.
143    */
144   int make_persistent;
145
146   /**
147    * Has the current iteration been aborted; for GET actions.
148    */
149   int aborted;
150
151   /**
152    * Is this a GET, SET, UPDATE or WATCH?
153    */
154   enum ActionType type;
155
156   /**
157    * Size of the message that we will be transmitting.
158    */
159   uint16_t msize;
160
161 };
162
163
164 /**
165  * Handle for the service.
166  */
167 struct GNUNET_STATISTICS_Handle
168 {
169   /**
170    * Name of our subsystem.
171    */
172   char *subsystem;
173
174   /**
175    * Configuration to use.
176    */
177   const struct GNUNET_CONFIGURATION_Handle *cfg;
178
179   /**
180    * Socket (if available).
181    */
182   struct GNUNET_CLIENT_Connection *client;
183
184   /**
185    * Currently pending transmission request.
186    */
187   struct GNUNET_CLIENT_TransmitHandle *th;
188
189   /**
190    * Head of the linked list of pending actions (first action
191    * to be performed).
192    */
193   struct GNUNET_STATISTICS_GetHandle *action_head;
194
195   /**
196    * Tail of the linked list of actions (for fast append).
197    */
198   struct GNUNET_STATISTICS_GetHandle *action_tail;
199
200   /**
201    * Action we are currently busy with (action request has been
202    * transmitted, we're now receiving the response from the
203    * service).
204    */
205   struct GNUNET_STATISTICS_GetHandle *current;
206
207   /**
208    * Array of watch entries.
209    */
210   struct GNUNET_STATISTICS_WatchEntry **watches;
211
212   /**
213    * Task doing exponential back-off trying to reconnect.
214    */
215   GNUNET_SCHEDULER_TaskIdentifier backoff_task;
216
217   /**
218    * Time for next connect retry.
219    */
220   struct GNUNET_TIME_Relative backoff;
221
222   /**
223    * Size of the 'watches' array.
224    */
225   unsigned int watches_size;
226
227   /**
228    * Should this handle auto-destruct once all actions have
229    * been processed?
230    */
231   int do_destroy;
232
233   /**
234    * Are we currently receiving from the service?
235    */
236   int receiving;
237
238 };
239
240
241
242 /**
243  * Schedule the next action to be performed.
244  */
245 static void schedule_action (struct GNUNET_STATISTICS_Handle *h);
246
247 /**
248  * Try to (re)connect to the statistics service.
249  *
250  * @return GNUNET_YES on success, GNUNET_NO on failure.
251  */
252 static int try_connect (struct GNUNET_STATISTICS_Handle *ret);
253
254
255 static void
256 insert_ai (struct GNUNET_STATISTICS_Handle *h,
257            struct GNUNET_STATISTICS_GetHandle *ai)
258 {
259   GNUNET_CONTAINER_DLL_insert_after (h->action_head,
260                                      h->action_tail, h->action_tail, ai);
261   if (h->action_head == ai)
262     schedule_action (h);
263 }
264
265
266 static void
267 schedule_watch_request (struct GNUNET_STATISTICS_Handle *h,
268                         struct GNUNET_STATISTICS_WatchEntry *watch)
269 {
270
271   struct GNUNET_STATISTICS_GetHandle *ai;
272   size_t slen;
273   size_t nlen;
274   size_t nsize;
275
276   GNUNET_assert (h != NULL);
277   if (GNUNET_YES != try_connect (h))
278   {
279     schedule_action (h);
280     return;
281   }
282   slen = strlen (watch->subsystem) + 1;
283   nlen = strlen (watch->name) + 1;
284   nsize = sizeof (struct GNUNET_MessageHeader) + slen + nlen;
285   if (nsize >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
286   {
287     GNUNET_break (0);
288     return;
289   }
290   ai = GNUNET_malloc (sizeof (struct GNUNET_STATISTICS_GetHandle));
291   ai->sh = h;
292   ai->subsystem = GNUNET_strdup (watch->subsystem);
293   ai->name = GNUNET_strdup (watch->name);
294   ai->timeout = GNUNET_TIME_UNIT_FOREVER_ABS;
295   ai->msize = nsize;
296   ai->type = ACTION_WATCH;
297   ai->proc = watch->proc;
298   ai->cls = watch->proc_cls;
299   insert_ai (h, ai);
300 }
301
302
303 /**
304  * Try to (re)connect to the statistics service.
305  *
306  * @return GNUNET_YES on success, GNUNET_NO on failure.
307  */
308 static int
309 try_connect (struct GNUNET_STATISTICS_Handle *ret)
310 {
311   unsigned int i;
312
313   if (ret->client != NULL)
314     return GNUNET_YES;
315   ret->client = GNUNET_CLIENT_connect ("statistics", ret->cfg);
316   if (ret->client != NULL)
317   {
318     for (i = 0; i < ret->watches_size; i++)
319       schedule_watch_request (ret, ret->watches[i]);
320     return GNUNET_YES;
321   }
322 #if DEBUG_STATISTICS
323   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
324               _("Failed to connect to statistics service!\n"));
325 #endif
326   return GNUNET_NO;
327 }
328
329
330 /**
331  * Free memory associated with the given action item.
332  */
333 static void
334 free_action_item (struct GNUNET_STATISTICS_GetHandle *ai)
335 {
336   GNUNET_free_non_null (ai->subsystem);
337   GNUNET_free_non_null (ai->name);
338   GNUNET_free (ai);
339 }
340
341
342 /**
343  * GET processing is complete, tell client about it.
344  */
345 static void
346 finish (struct GNUNET_STATISTICS_Handle *h, int code)
347 {
348   struct GNUNET_STATISTICS_GetHandle *pos = h->current;
349
350   h->current = NULL;
351   schedule_action (h);
352   if (pos != NULL)
353   {
354     if (pos->cont != NULL)
355       pos->cont (pos->cls, code);
356     free_action_item (pos);
357   }
358 }
359
360
361 /**
362  * Process the message.
363  *
364  * @return GNUNET_OK if the message was well-formed
365  */
366 static int
367 process_message (struct GNUNET_STATISTICS_Handle *h,
368                  const struct GNUNET_MessageHeader *msg)
369 {
370   char *service;
371   char *name;
372   const struct GNUNET_STATISTICS_ReplyMessage *smsg;
373   uint16_t size;
374
375   if (h->current->aborted)
376   {
377 #if DEBUG_STATISTICS
378     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
379                 "Iteration was aborted, ignoring VALUE\n");
380 #endif
381     return GNUNET_OK;           /* don't bother */
382   }
383   size = ntohs (msg->size);
384   if (size < sizeof (struct GNUNET_STATISTICS_ReplyMessage))
385   {
386     GNUNET_break (0);
387     return GNUNET_SYSERR;
388   }
389   smsg = (const struct GNUNET_STATISTICS_ReplyMessage *) msg;
390   size -= sizeof (struct GNUNET_STATISTICS_ReplyMessage);
391   if (size != GNUNET_STRINGS_buffer_tokenize ((const char *) &smsg[1],
392                                               size, 2, &service, &name))
393   {
394     GNUNET_break (0);
395     return GNUNET_SYSERR;
396   }
397 #if DEBUG_STATISTICS
398   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
399               "Received valid statistic on `%s:%s': %llu\n",
400               service, name, GNUNET_ntohll (smsg->value));
401 #endif
402   if (GNUNET_OK !=
403       h->current->proc (h->current->cls,
404                         service,
405                         name,
406                         GNUNET_ntohll (smsg->value),
407                         0 !=
408                         (ntohl (smsg->uid) & GNUNET_STATISTICS_PERSIST_BIT)))
409   {
410 #if DEBUG_STATISTICS
411     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
412                 "Processing of remaining statistics aborted by client.\n");
413 #endif
414     h->current->aborted = GNUNET_YES;
415   }
416 #if DEBUG_STATISTICS
417   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "VALUE processed successfully\n");
418 #endif
419   return GNUNET_OK;
420 }
421
422
423 static int
424 process_watch_value (struct GNUNET_STATISTICS_Handle *h,
425                      const struct GNUNET_MessageHeader *msg)
426 {
427   const struct GNUNET_STATISTICS_WatchValueMessage *wvm;
428   struct GNUNET_STATISTICS_WatchEntry *w;
429   uint32_t wid;
430
431   if (sizeof (struct GNUNET_STATISTICS_WatchValueMessage) != ntohs (msg->size))
432   {
433     GNUNET_break (0);
434     return GNUNET_SYSERR;
435   }
436   wvm = (const struct GNUNET_STATISTICS_WatchValueMessage *) msg;
437   GNUNET_break (0 == ntohl (wvm->reserved));
438   wid = ntohl (wvm->wid);
439   if (wid >= h->watches_size)
440   {
441     GNUNET_break (0);
442     return GNUNET_SYSERR;
443   }
444   w = h->watches[wid];
445   (void) w->proc (w->proc_cls,
446                   w->subsystem,
447                   w->name,
448                   GNUNET_ntohll (wvm->value),
449                   0 != (ntohl (wvm->flags) & GNUNET_STATISTICS_PERSIST_BIT));
450   return GNUNET_OK;
451 }
452
453
454 /**
455  * Function called with messages from stats service.
456  *
457  * @param cls closure
458  * @param msg message received, NULL on timeout or fatal error
459  */
460 static void
461 receive_stats (void *cls, const struct GNUNET_MessageHeader *msg)
462 {
463   struct GNUNET_STATISTICS_Handle *h = cls;
464
465   if (msg == NULL)
466   {
467     if (NULL != h->client)
468     {
469       GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
470       h->client = NULL;
471     }
472 #if DEBUG_STATISTICS
473     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
474                 "Error receiving statistics from service, is the service running?\n");
475 #endif
476     finish (h, GNUNET_SYSERR);
477     return;
478   }
479   switch (ntohs (msg->type))
480   {
481   case GNUNET_MESSAGE_TYPE_STATISTICS_END:
482 #if DEBUG_STATISTICS
483     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Received end of statistics marker\n");
484 #endif
485     h->backoff = GNUNET_TIME_UNIT_MILLISECONDS;
486     if (h->watches_size > 0)
487     {
488       GNUNET_CLIENT_receive (h->client,
489                              &receive_stats, h, GNUNET_TIME_UNIT_FOREVER_REL);
490     }
491     else
492     {
493       h->receiving = GNUNET_NO;
494     }
495     finish (h, GNUNET_OK);
496     return;
497   case GNUNET_MESSAGE_TYPE_STATISTICS_VALUE:
498     if (GNUNET_OK == process_message (h, msg))
499     {
500       /* finally, look for more! */
501 #if DEBUG_STATISTICS
502       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
503                   "Processing VALUE done, now reading more\n");
504 #endif
505       GNUNET_CLIENT_receive (h->client,
506                              &receive_stats,
507                              h,
508                              GNUNET_TIME_absolute_get_remaining
509                              (h->current->timeout));
510       h->backoff = GNUNET_TIME_UNIT_MILLISECONDS;
511       return;
512     }
513     GNUNET_break (0);
514     break;
515   case GNUNET_MESSAGE_TYPE_STATISTICS_WATCH_VALUE:
516     if (GNUNET_OK == process_watch_value (h, msg))
517     {
518       h->backoff = GNUNET_TIME_UNIT_MILLISECONDS;
519       GNUNET_assert (h->watches_size > 0);
520       GNUNET_CLIENT_receive (h->client,
521                              &receive_stats, h, GNUNET_TIME_UNIT_FOREVER_REL);
522       return;
523     }
524     GNUNET_break (0);
525     break;
526   default:
527     GNUNET_break (0);
528     break;
529   }
530   if (NULL != h->client)
531   {
532     GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
533     h->client = NULL;
534   }
535   finish (h, GNUNET_SYSERR);
536 }
537
538
539 /**
540  * Transmit a GET request (and if successful, start to receive
541  * the response).
542  */
543 static size_t
544 transmit_get (struct GNUNET_STATISTICS_Handle *handle, size_t size, void *buf)
545 {
546   struct GNUNET_MessageHeader *hdr;
547   size_t slen1;
548   size_t slen2;
549   uint16_t msize;
550
551   if (buf == NULL)
552   {
553     /* timeout / error */
554 #if DEBUG_STATISTICS
555     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
556                 "Transmission of request for statistics failed!\n");
557 #endif
558     finish (handle, GNUNET_SYSERR);
559     return 0;
560   }
561   slen1 = strlen (handle->current->subsystem) + 1;
562   slen2 = strlen (handle->current->name) + 1;
563   msize = slen1 + slen2 + sizeof (struct GNUNET_MessageHeader);
564   GNUNET_assert (msize <= size);
565   hdr = (struct GNUNET_MessageHeader *) buf;
566   hdr->size = htons (msize);
567   hdr->type = htons (GNUNET_MESSAGE_TYPE_STATISTICS_GET);
568   GNUNET_assert (slen1 + slen2 ==
569                  GNUNET_STRINGS_buffer_fill ((char *) &hdr[1],
570                                              slen1 + slen2,
571                                              2,
572                                              handle->current->subsystem,
573                                              handle->current->name));
574   if (!handle->receiving)
575   {
576 #if DEBUG_STATISTICS
577     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
578                 "Transmission of GET done, now reading response\n");
579 #endif
580     handle->receiving = GNUNET_YES;
581     GNUNET_CLIENT_receive (handle->client,
582                            &receive_stats,
583                            handle,
584                            GNUNET_TIME_absolute_get_remaining (handle->
585                                                                current->timeout));
586   }
587   return msize;
588 }
589
590
591 /**
592  * Transmit a WATCH request (and if successful, start to receive
593  * the response).
594  */
595 static size_t
596 transmit_watch (struct GNUNET_STATISTICS_Handle *handle, size_t size, void *buf)
597 {
598   struct GNUNET_MessageHeader *hdr;
599   size_t slen1;
600   size_t slen2;
601   uint16_t msize;
602
603   if (buf == NULL)
604   {
605     /* timeout / error */
606 #if DEBUG_STATISTICS
607     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
608                 "Transmission of request for statistics failed!\n");
609 #endif
610     finish (handle, GNUNET_SYSERR);
611     return 0;
612   }
613 #if DEBUG_STATISTICS
614   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
615               "Transmitting watch request for `%s'\n", handle->current->name);
616 #endif
617   slen1 = strlen (handle->current->subsystem) + 1;
618   slen2 = strlen (handle->current->name) + 1;
619   msize = slen1 + slen2 + sizeof (struct GNUNET_MessageHeader);
620   GNUNET_assert (msize <= size);
621   hdr = (struct GNUNET_MessageHeader *) buf;
622   hdr->size = htons (msize);
623   hdr->type = htons (GNUNET_MESSAGE_TYPE_STATISTICS_WATCH);
624   GNUNET_assert (slen1 + slen2 ==
625                  GNUNET_STRINGS_buffer_fill ((char *) &hdr[1],
626                                              slen1 + slen2,
627                                              2,
628                                              handle->current->subsystem,
629                                              handle->current->name));
630   if (GNUNET_YES != handle->receiving)
631   {
632     handle->receiving = GNUNET_YES;
633     GNUNET_CLIENT_receive (handle->client,
634                            &receive_stats,
635                            handle, GNUNET_TIME_UNIT_FOREVER_REL);
636   }
637   finish (handle, GNUNET_OK);
638   return msize;
639 }
640
641
642 /**
643  * Transmit a SET/UPDATE request.
644  */
645 static size_t
646 transmit_set (struct GNUNET_STATISTICS_Handle *handle, size_t size, void *buf)
647 {
648   struct GNUNET_STATISTICS_SetMessage *r;
649   size_t slen;
650   size_t nlen;
651   size_t nsize;
652
653   if (NULL == buf)
654   {
655     finish (handle, GNUNET_SYSERR);
656     return 0;
657   }
658
659   slen = strlen (handle->current->subsystem) + 1;
660   nlen = strlen (handle->current->name) + 1;
661   nsize = sizeof (struct GNUNET_STATISTICS_SetMessage) + slen + nlen;
662   if (size < nsize)
663   {
664     GNUNET_break (0);
665     finish (handle, GNUNET_SYSERR);
666     return 0;
667   }
668   r = buf;
669   r->header.size = htons (nsize);
670   r->header.type = htons (GNUNET_MESSAGE_TYPE_STATISTICS_SET);
671   r->flags = 0;
672   r->value = GNUNET_htonll (handle->current->value);
673   if (handle->current->make_persistent)
674     r->flags |= htonl (GNUNET_STATISTICS_SETFLAG_PERSISTENT);
675   if (handle->current->type == ACTION_UPDATE)
676     r->flags |= htonl (GNUNET_STATISTICS_SETFLAG_RELATIVE);
677   GNUNET_assert (slen + nlen ==
678                  GNUNET_STRINGS_buffer_fill ((char *) &r[1],
679                                              slen + nlen,
680                                              2,
681                                              handle->current->subsystem,
682                                              handle->current->name));
683   finish (handle, GNUNET_OK);
684   return nsize;
685 }
686
687
688 static size_t
689 transmit_action (void *cls, size_t size, void *buf)
690 {
691   struct GNUNET_STATISTICS_Handle *handle = cls;
692   size_t ret;
693
694   handle->th = NULL;
695   switch (handle->current->type)
696   {
697   case ACTION_GET:
698     ret = transmit_get (handle, size, buf);
699     break;
700   case ACTION_SET:
701   case ACTION_UPDATE:
702     ret = transmit_set (handle, size, buf);
703     break;
704   case ACTION_WATCH:
705     ret = transmit_watch (handle, size, buf);
706     break;
707   default:
708     ret = 0;
709     GNUNET_break (0);
710     break;
711   }
712   return ret;
713 }
714
715
716 /**
717  * Get handle for the statistics service.
718  *
719  * @param subsystem name of subsystem using the service
720  * @param cfg services configuration in use
721  * @return handle to use
722  */
723 struct GNUNET_STATISTICS_Handle *
724 GNUNET_STATISTICS_create (const char *subsystem,
725                           const struct GNUNET_CONFIGURATION_Handle *cfg)
726 {
727   struct GNUNET_STATISTICS_Handle *ret;
728
729   GNUNET_assert (subsystem != NULL);
730   GNUNET_assert (cfg != NULL);
731   ret = GNUNET_malloc (sizeof (struct GNUNET_STATISTICS_Handle));
732   ret->cfg = cfg;
733   ret->subsystem = GNUNET_strdup (subsystem);
734   ret->backoff = GNUNET_TIME_UNIT_MILLISECONDS;
735   if (GNUNET_YES != try_connect (ret))
736   {
737     GNUNET_free (ret->subsystem);
738     GNUNET_free (ret);
739     return NULL;
740   }
741   return ret;
742 }
743
744
745 /**
746  * Destroy a handle (free all state associated with
747  * it).
748  *
749  * @param h statistics handle to destroy
750  * @param sync_first set to GNUNET_YES if pending SET requests should
751  *        be completed
752  */
753 void
754 GNUNET_STATISTICS_destroy (struct GNUNET_STATISTICS_Handle *h, int sync_first)
755 {
756   struct GNUNET_STATISTICS_GetHandle *pos;
757   struct GNUNET_STATISTICS_GetHandle *next;
758   struct GNUNET_STATISTICS_GetHandle *prev;
759   struct GNUNET_TIME_Relative timeout;
760   int i;
761
762   if (h == NULL)
763     return;
764   if (GNUNET_SCHEDULER_NO_TASK != h->backoff_task)
765     GNUNET_SCHEDULER_cancel (h->backoff_task);
766   if (sync_first)
767   {
768     if (h->current != NULL)
769     {
770       if (h->current->type == ACTION_GET)
771       {
772         GNUNET_CLIENT_notify_transmit_ready_cancel (h->th);
773         h->th = NULL;
774         free_action_item (h->current);
775         h->current = NULL;
776       }
777     }
778     pos = h->action_head;
779     prev = NULL;
780     while (pos != NULL)
781     {
782       next = pos->next;
783       if (pos->type == ACTION_GET)
784       {
785         if (prev == NULL)
786           h->action_head = next;
787         else
788           prev->next = next;
789         free_action_item (pos);
790       }
791       else
792       {
793         prev = pos;
794       }
795       pos = next;
796     }
797     h->action_tail = prev;
798     if (h->current == NULL)
799     {
800       h->current = h->action_head;
801       if (h->action_head != NULL)
802       {
803         h->action_head = h->action_head->next;
804         if (h->action_head == NULL)
805           h->action_tail = NULL;
806       }
807     }
808     h->do_destroy = GNUNET_YES;
809     if ((h->current != NULL) && (h->th == NULL))
810     {
811       timeout = GNUNET_TIME_absolute_get_remaining (h->current->timeout);
812       h->th = GNUNET_CLIENT_notify_transmit_ready (h->client,
813                                                    h->current->msize,
814                                                    timeout,
815                                                    GNUNET_YES,
816                                                    &transmit_action, h);
817       GNUNET_assert (NULL != h->th);
818     }
819     if (h->th != NULL)
820       return;
821   }
822   if (NULL != h->th)
823   {
824     GNUNET_CLIENT_notify_transmit_ready_cancel (h->th);
825     h->th = NULL;
826   }
827   if (h->current != NULL)
828     free_action_item (h->current);
829   while (NULL != (pos = h->action_head))
830   {
831     h->action_head = pos->next;
832     free_action_item (pos);
833   }
834   if (h->client != NULL)
835   {
836     GNUNET_CLIENT_disconnect (h->client, GNUNET_YES);
837     h->client = NULL;
838   }
839   for (i = 0; i < h->watches_size; i++)
840   {
841     GNUNET_free (h->watches[i]->subsystem);
842     GNUNET_free (h->watches[i]->name);
843     GNUNET_free (h->watches[i]);
844   }
845   GNUNET_array_grow (h->watches, h->watches_size, 0);
846   GNUNET_free (h->subsystem);
847   GNUNET_free (h);
848 }
849
850
851 static void
852 finish_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
853 {
854   struct GNUNET_STATISTICS_Handle *h = cls;
855
856   h->backoff_task = GNUNET_SCHEDULER_NO_TASK;
857   finish (h, GNUNET_SYSERR);
858 }
859
860
861 /**
862  * Schedule the next action to be performed.
863  */
864 static void
865 schedule_action (struct GNUNET_STATISTICS_Handle *h)
866 {
867   struct GNUNET_TIME_Relative timeout;
868
869   if (h->current != NULL)
870     return;                     /* action already pending */
871   if (GNUNET_YES != try_connect (h))
872   {
873     h->backoff_task = GNUNET_SCHEDULER_add_delayed (h->backoff,
874                                                     &finish_task, h);
875     h->backoff = GNUNET_TIME_relative_multiply (h->backoff, 2);
876     h->backoff = GNUNET_TIME_relative_min (h->backoff,
877                                            GNUNET_CONSTANTS_SERVICE_TIMEOUT);
878     return;
879   }
880
881   /* schedule next action */
882   h->current = h->action_head;
883   if (NULL == h->current)
884   {
885     if (h->do_destroy)
886     {
887       h->do_destroy = GNUNET_NO;
888       GNUNET_STATISTICS_destroy (h, GNUNET_YES);
889     }
890     return;
891   }
892   GNUNET_CONTAINER_DLL_remove (h->action_head, h->action_tail, h->current);
893   timeout = GNUNET_TIME_absolute_get_remaining (h->current->timeout);
894   if (NULL ==
895       (h->th = GNUNET_CLIENT_notify_transmit_ready (h->client,
896                                                     h->current->msize,
897                                                     timeout,
898                                                     GNUNET_YES,
899                                                     &transmit_action, h)))
900   {
901 #if DEBUG_STATISTICS
902     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
903                 "Failed to transmit request to statistics service.\n");
904 #endif
905     finish (h, GNUNET_SYSERR);
906   }
907 }
908
909
910 /**
911  * Get statistic from the peer.
912  *
913  * @param handle identification of the statistics service
914  * @param subsystem limit to the specified subsystem, NULL for our subsystem
915  * @param name name of the statistic value, NULL for all values
916  * @param timeout after how long should we give up (and call
917  *        cont with an error code)?
918  * @param cont continuation to call when done (can be NULL)
919  * @param proc function to call on each value
920  * @param cls closure for cont and proc
921  * @return NULL on error
922  */
923 struct GNUNET_STATISTICS_GetHandle *
924 GNUNET_STATISTICS_get (struct GNUNET_STATISTICS_Handle *handle,
925                        const char *subsystem,
926                        const char *name,
927                        struct GNUNET_TIME_Relative timeout,
928                        GNUNET_STATISTICS_Callback cont,
929                        GNUNET_STATISTICS_Iterator proc, void *cls)
930 {
931   size_t slen1;
932   size_t slen2;
933   struct GNUNET_STATISTICS_GetHandle *ai;
934
935   GNUNET_assert (handle != NULL);
936   GNUNET_assert (proc != NULL);
937   GNUNET_assert (GNUNET_NO == handle->do_destroy);
938   if (GNUNET_YES != try_connect (handle))
939   {
940 #if DEBUG_STATISTICS
941     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
942                 "Failed to connect to statistics service, can not get value `%s:%s'.\n",
943                 strlen (subsystem) ? subsystem : "*",
944                 strlen (name) ? name : "*");
945 #endif
946     return NULL;
947   }
948   if (subsystem == NULL)
949     subsystem = "";
950   if (name == NULL)
951     name = "";
952   slen1 = strlen (subsystem) + 1;
953   slen2 = strlen (name) + 1;
954   GNUNET_assert (slen1 + slen2 + sizeof (struct GNUNET_MessageHeader) <
955                  GNUNET_SERVER_MAX_MESSAGE_SIZE);
956   ai = GNUNET_malloc (sizeof (struct GNUNET_STATISTICS_GetHandle));
957   ai->sh = handle;
958   ai->subsystem = GNUNET_strdup (subsystem);
959   ai->name = GNUNET_strdup (name);
960   ai->cont = cont;
961   ai->proc = proc;
962   ai->cls = cls;
963   ai->timeout = GNUNET_TIME_relative_to_absolute (timeout);
964   ai->type = ACTION_GET;
965   ai->msize = slen1 + slen2 + sizeof (struct GNUNET_MessageHeader);
966   insert_ai (handle, ai);
967   return ai;
968 }
969
970
971 /**
972  * Cancel a 'get' request.  Must be called before the 'cont' 
973  * function is called.
974  *
975  * @param gh handle of the request to cancel
976  */
977 void
978 GNUNET_STATISTICS_get_cancel (struct GNUNET_STATISTICS_GetHandle *gh)
979 {
980   if (gh->sh->current == gh)
981   {
982     gh->aborted = GNUNET_YES;
983   }
984   else
985   {
986     GNUNET_CONTAINER_DLL_remove (gh->sh->action_head, gh->sh->action_tail, gh);
987     GNUNET_free (gh->name);
988     GNUNET_free (gh->subsystem);
989     GNUNET_free (gh);
990   }
991 }
992
993
994 /**
995  * Watch statistics from the peer (be notified whenever they change).
996  * Note that the only way to cancel a "watch" request is to destroy
997  * the statistics handle given as the first argument to this call.
998  *
999  * @param handle identification of the statistics service
1000  * @param subsystem limit to the specified subsystem, never NULL
1001  * @param name name of the statistic value, never NULL
1002  * @param proc function to call on each value
1003  * @param proc_cls closure for proc
1004  * @return GNUNET_OK on success, GNUNET_SYSERR on error
1005  */
1006 int
1007 GNUNET_STATISTICS_watch (struct GNUNET_STATISTICS_Handle *handle,
1008                          const char *subsystem,
1009                          const char *name,
1010                          GNUNET_STATISTICS_Iterator proc, void *proc_cls)
1011 {
1012   struct GNUNET_STATISTICS_WatchEntry *w;
1013
1014   if (handle == NULL)
1015     return GNUNET_SYSERR;
1016   w = GNUNET_malloc (sizeof (struct GNUNET_STATISTICS_WatchEntry));
1017   w->subsystem = GNUNET_strdup (subsystem);
1018   w->name = GNUNET_strdup (name);
1019   w->proc = proc;
1020   w->proc_cls = proc_cls;
1021   GNUNET_array_append (handle->watches, handle->watches_size, w);
1022   schedule_watch_request (handle, w);
1023   return GNUNET_OK;
1024 }
1025
1026
1027 static void
1028 add_setter_action (struct GNUNET_STATISTICS_Handle *h,
1029                    const char *name,
1030                    int make_persistent, uint64_t value, enum ActionType type)
1031 {
1032   struct GNUNET_STATISTICS_GetHandle *ai;
1033   size_t slen;
1034   size_t nlen;
1035   size_t nsize;
1036   int64_t delta;
1037
1038   GNUNET_assert (h != NULL);
1039   GNUNET_assert (name != NULL);
1040   if (GNUNET_YES != try_connect (h))
1041     return;
1042   slen = strlen (h->subsystem) + 1;
1043   nlen = strlen (name) + 1;
1044   nsize = sizeof (struct GNUNET_STATISTICS_SetMessage) + slen + nlen;
1045   if (nsize >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
1046   {
1047     GNUNET_break (0);
1048     return;
1049   }
1050   ai = h->action_head;
1051   while (ai != NULL)
1052   {
1053     if ((0 == strcmp (ai->subsystem, h->subsystem)) &&
1054         (0 == strcmp (ai->name, name)) &&
1055         ((ai->type == ACTION_UPDATE) || (ai->type == ACTION_SET)))
1056     {
1057       if (ai->type == ACTION_SET)
1058       {
1059         if (type == ACTION_UPDATE)
1060         {
1061           delta = (int64_t) value;
1062           if (delta > 0)
1063           {
1064             ai->value += delta;
1065           }
1066           else
1067           {
1068             if (ai->value < -delta)
1069               ai->value = 0;
1070             else
1071               ai->value += delta;
1072           }
1073         }
1074         else
1075         {
1076           ai->value = value;
1077         }
1078       }
1079       else
1080       {
1081         if (type == ACTION_UPDATE)
1082         {
1083           delta = (int64_t) value;
1084           ai->value += delta;
1085         }
1086         else
1087         {
1088           ai->value = value;
1089           ai->type = type;
1090         }
1091       }
1092       ai->timeout = GNUNET_TIME_relative_to_absolute (SET_TRANSMIT_TIMEOUT);
1093       ai->make_persistent = make_persistent;
1094       return;
1095     }
1096     ai = ai->next;
1097   }
1098   ai = GNUNET_malloc (sizeof (struct GNUNET_STATISTICS_GetHandle));
1099   ai->sh = h;
1100   ai->subsystem = GNUNET_strdup (h->subsystem);
1101   ai->name = GNUNET_strdup (name);
1102   ai->timeout = GNUNET_TIME_relative_to_absolute (SET_TRANSMIT_TIMEOUT);
1103   ai->make_persistent = make_persistent;
1104   ai->msize = nsize;
1105   ai->value = value;
1106   ai->type = type;
1107   insert_ai (h, ai);
1108 }
1109
1110
1111 /**
1112  * Set statistic value for the peer.  Will always use our
1113  * subsystem (the argument used when "handle" was created).
1114  *
1115  * @param handle identification of the statistics service
1116  * @param name name of the statistic value
1117  * @param value new value to set
1118  * @param make_persistent should the value be kept across restarts?
1119  */
1120 void
1121 GNUNET_STATISTICS_set (struct GNUNET_STATISTICS_Handle *handle,
1122                        const char *name, uint64_t value, int make_persistent)
1123 {
1124   if (handle == NULL)
1125     return;
1126   GNUNET_assert (GNUNET_NO == handle->do_destroy);
1127   add_setter_action (handle, name, make_persistent, value, ACTION_SET);
1128 }
1129
1130
1131 /**
1132  * Set statistic value for the peer.  Will always use our
1133  * subsystem (the argument used when "handle" was created).
1134  *
1135  * @param handle identification of the statistics service
1136  * @param name name of the statistic value
1137  * @param delta change in value (added to existing value)
1138  * @param make_persistent should the value be kept across restarts?
1139  */
1140 void
1141 GNUNET_STATISTICS_update (struct GNUNET_STATISTICS_Handle *handle,
1142                           const char *name, int64_t delta, int make_persistent)
1143 {
1144   if (handle == NULL)
1145     return;
1146   if (delta == 0)
1147     return;
1148   GNUNET_assert (GNUNET_NO == handle->do_destroy);
1149   add_setter_action (handle, name, make_persistent,
1150                      (uint64_t) delta, ACTION_UPDATE);
1151 }
1152
1153
1154 /* end of statistics_api.c */