If I hashed the whole block I could not retrieve it without knowing it...
[oweals/gnunet.git] / src / statistics / statistics_api.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009, 2010 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 2, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file statistics/statistics_api.c
23  * @brief API of the statistics service
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include "gnunet_client_lib.h"
28 #include "gnunet_constants.h"
29 #include "gnunet_container_lib.h"
30 #include "gnunet_protocols.h"
31 #include "gnunet_server_lib.h"
32 #include "gnunet_statistics_service.h"
33 #include "gnunet_strings_lib.h"
34 #include "statistics.h"
35
36 /**
37  * How long do we wait until a statistics request for setting
38  * a value times out?  (The update will be lost if the
39  * service does not react within this timeframe).  
40  */
41 #define SET_TRANSMIT_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS, 250)
42
43
44 /**
45  * Types of actions.
46  */
47 enum ActionType
48 {
49   ACTION_GET,
50   ACTION_SET,
51   ACTION_UPDATE,
52   ACTION_WATCH
53 };
54
55
56 /**
57  * Entry kept for each value we are watching.
58  */
59 struct GNUNET_STATISTICS_WatchEntry
60 {
61  
62   /**
63    * What subsystem is this action about? (never NULL)
64    */
65   char *subsystem;
66
67   /**
68    * What value is this action about? (never NULL)
69    */
70   char *name;
71
72   /**
73    * Function to call
74    */
75   GNUNET_STATISTICS_Iterator proc;
76
77   /**
78    * Closure for proc
79    */
80   void *proc_cls;
81
82 };
83
84
85 /**
86  * Linked list of things we still need to do.
87  */
88 struct GNUNET_STATISTICS_GetHandle
89 {
90
91   /**
92    * This is a doubly linked list.
93    */
94   struct GNUNET_STATISTICS_GetHandle *next;
95
96   /**
97    * This is a doubly linked list.
98    */
99   struct GNUNET_STATISTICS_GetHandle *prev;
100
101   /**
102    * Main statistics handle.
103    */
104   struct GNUNET_STATISTICS_Handle *sh;
105  
106   /**
107    * What subsystem is this action about? (can be NULL)
108    */
109   char *subsystem;
110
111   /**
112    * What value is this action about? (can be NULL)
113    */
114   char *name;
115
116   /**
117    * Continuation to call once action is complete.
118    */
119   GNUNET_STATISTICS_Callback cont;
120
121   /**
122    * Function to call (for GET actions only).
123    */
124   GNUNET_STATISTICS_Iterator proc;
125
126   /**
127    * Closure for proc and cont.
128    */
129   void *cls;
130
131   /**
132    * Timeout for this action.
133    */
134   struct GNUNET_TIME_Absolute timeout;
135
136   /**
137    * Associated value.
138    */
139   uint64_t value;
140
141   /**
142    * Flag for SET/UPDATE actions.
143    */
144   int make_persistent;
145
146   /**
147    * Has the current iteration been aborted; for GET actions.
148    */
149   int aborted;
150
151   /**
152    * Is this a GET, SET, UPDATE or WATCH?
153    */
154   enum ActionType type;
155
156   /**
157    * Size of the message that we will be transmitting.
158    */
159   uint16_t msize;
160
161 };
162
163
164 /**
165  * Handle for the service.
166  */
167 struct GNUNET_STATISTICS_Handle
168 {
169   /**
170    * Our scheduler.
171    */
172   struct GNUNET_SCHEDULER_Handle *sched;
173
174   /**
175    * Name of our subsystem.
176    */
177   char *subsystem;
178
179   /**
180    * Configuration to use.
181    */
182   const struct GNUNET_CONFIGURATION_Handle *cfg;
183
184   /**
185    * Socket (if available).
186    */
187   struct GNUNET_CLIENT_Connection *client;
188
189   /**
190    * Currently pending transmission request.
191    */
192   struct GNUNET_CLIENT_TransmitHandle *th;
193
194   /**
195    * Head of the linked list of pending actions (first action
196    * to be performed).
197    */
198   struct GNUNET_STATISTICS_GetHandle *action_head;
199
200   /**
201    * Tail of the linked list of actions (for fast append).
202    */
203   struct GNUNET_STATISTICS_GetHandle *action_tail;
204
205   /**
206    * Action we are currently busy with (action request has been
207    * transmitted, we're now receiving the response from the
208    * service).
209    */
210   struct GNUNET_STATISTICS_GetHandle *current;
211
212   /**
213    * Array of watch entries.
214    */
215   struct GNUNET_STATISTICS_WatchEntry **watches;
216
217   /**
218    * Task doing exponential back-off trying to reconnect.
219    */
220   GNUNET_SCHEDULER_TaskIdentifier backoff_task;
221
222   /**
223    * Time for next connect retry.
224    */
225   struct GNUNET_TIME_Relative backoff;
226
227   /**
228    * Size of the 'watches' array.
229    */
230   unsigned int watches_size;
231
232   /**
233    * Should this handle auto-destruct once all actions have
234    * been processed?
235    */
236   int do_destroy;
237
238   /**
239    * Are we currently receiving from the service?
240    */
241   int receiving;
242
243 };
244
245
246
247 /**
248  * Schedule the next action to be performed.
249  */
250 static void schedule_action (struct GNUNET_STATISTICS_Handle *h);
251
252 /**
253  * Try to (re)connect to the statistics service.
254  *
255  * @return GNUNET_YES on success, GNUNET_NO on failure.
256  */
257 static int
258 try_connect (struct GNUNET_STATISTICS_Handle *ret);
259
260
261 static void
262 insert_ai (struct GNUNET_STATISTICS_Handle *h, struct GNUNET_STATISTICS_GetHandle *ai)
263 {
264   GNUNET_CONTAINER_DLL_insert_after (h->action_head,
265                                      h->action_tail,
266                                      h->action_tail,
267                                      ai);                                    
268   if (h->action_head == ai)
269     schedule_action (h);
270 }
271
272
273 static void
274 schedule_watch_request (struct GNUNET_STATISTICS_Handle *h,
275                         struct GNUNET_STATISTICS_WatchEntry *watch)
276 {
277
278   struct GNUNET_STATISTICS_GetHandle *ai;
279   size_t slen;
280   size_t nlen;
281   size_t nsize;
282   
283   GNUNET_assert (h != NULL);
284   if (GNUNET_YES != try_connect (h))
285     {
286       schedule_action (h);
287       return;
288     }
289   slen = strlen (watch->subsystem) + 1;
290   nlen = strlen (watch->name) + 1;
291   nsize = sizeof (struct GNUNET_MessageHeader) + slen + nlen;
292   if (nsize >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
293     {
294       GNUNET_break (0);
295       return;
296     }
297   ai = GNUNET_malloc (sizeof (struct GNUNET_STATISTICS_GetHandle));
298   ai->sh = h;
299   ai->subsystem = GNUNET_strdup (watch->subsystem);
300   ai->name = GNUNET_strdup (watch->name);
301   ai->timeout = GNUNET_TIME_UNIT_FOREVER_ABS;
302   ai->msize = nsize;
303   ai->type = ACTION_WATCH;
304   ai->proc = watch->proc;
305   ai->cls = watch->proc_cls;
306   insert_ai (h, ai);
307 }
308
309
310 /**
311  * Try to (re)connect to the statistics service.
312  *
313  * @return GNUNET_YES on success, GNUNET_NO on failure.
314  */
315 static int
316 try_connect (struct GNUNET_STATISTICS_Handle *ret)
317 {
318   unsigned int i;
319   if (ret->client != NULL)
320     return GNUNET_YES;
321   ret->client = GNUNET_CLIENT_connect (ret->sched, "statistics", ret->cfg);
322   if (ret->client != NULL)
323     {
324       for (i=0;i<ret->watches_size;i++)
325         schedule_watch_request (ret, ret->watches[i]);
326       return GNUNET_YES;
327     }
328 #if DEBUG_STATISTICS
329   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
330               _("Failed to connect to statistics service!\n"));
331 #endif
332   return GNUNET_NO;
333 }
334
335
336 /**
337  * Free memory associated with the given action item.
338  */
339 static void
340 free_action_item (struct GNUNET_STATISTICS_GetHandle *ai)
341 {
342   GNUNET_free_non_null (ai->subsystem);
343   GNUNET_free_non_null (ai->name);
344   GNUNET_free (ai);
345 }
346
347
348 /**
349  * GET processing is complete, tell client about it.
350  */
351 static void
352 finish (struct GNUNET_STATISTICS_Handle *h, int code)
353 {
354   struct GNUNET_STATISTICS_GetHandle *pos = h->current;
355   h->current = NULL;
356   schedule_action (h);
357   if (pos != NULL)
358     {
359       if (pos->cont != NULL)
360         pos->cont (pos->cls, code);
361       free_action_item (pos);
362     }
363 }
364
365
366 /**
367  * Process the message.
368  *
369  * @return GNUNET_OK if the message was well-formed
370  */
371 static int
372 process_message (struct GNUNET_STATISTICS_Handle *h,
373                  const struct GNUNET_MessageHeader *msg)
374 {
375   char *service;
376   char *name;
377   const struct GNUNET_STATISTICS_ReplyMessage *smsg;
378   uint16_t size;
379
380   if (h->current->aborted)
381     {
382 #if DEBUG_STATISTICS
383       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
384                   "Iteration was aborted, ignoring VALUE\n");
385 #endif      
386       return GNUNET_OK;           /* don't bother */
387     }
388   size = ntohs (msg->size);
389   if (size < sizeof (struct GNUNET_STATISTICS_ReplyMessage))
390     {
391       GNUNET_break (0);
392       return GNUNET_SYSERR;
393     }
394   smsg = (const struct GNUNET_STATISTICS_ReplyMessage *) msg;
395   size -= sizeof (struct GNUNET_STATISTICS_ReplyMessage);
396   if (size != GNUNET_STRINGS_buffer_tokenize ((const char *) &smsg[1],
397                                               size, 2, &service, &name))
398     {
399       GNUNET_break (0);
400       return GNUNET_SYSERR;
401     }
402 #if DEBUG_STATISTICS
403   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
404               "Received valid statistic on `%s:%s': %llu\n",
405               service, name, GNUNET_ntohll (smsg->value));
406 #endif
407   if (GNUNET_OK !=
408       h->current->proc (h->current->cls,
409                         service,
410                         name,
411                         GNUNET_ntohll (smsg->value),
412                         0 !=
413                         (ntohl (smsg->uid) & GNUNET_STATISTICS_PERSIST_BIT)))
414     {
415 #if DEBUG_STATISTICS
416       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
417                   "Processing of remaining statistics aborted by client.\n");
418 #endif
419       h->current->aborted = GNUNET_YES;    
420     }
421 #if DEBUG_STATISTICS
422   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
423               "VALUE processed successfully\n");
424 #endif      
425   return GNUNET_OK;
426 }
427
428
429 static int
430 process_watch_value (struct GNUNET_STATISTICS_Handle *h,
431                      const struct GNUNET_MessageHeader *msg)
432 {
433   const struct GNUNET_STATISTICS_WatchValueMessage *wvm;
434   struct GNUNET_STATISTICS_WatchEntry *w;
435   uint32_t wid;
436
437   if (sizeof(struct GNUNET_STATISTICS_WatchValueMessage) !=
438       ntohs (msg->size))
439     {
440       GNUNET_break (0);
441       return GNUNET_SYSERR;
442     }
443   wvm = (const struct GNUNET_STATISTICS_WatchValueMessage *)msg;
444   wid = ntohl (wvm->wid);
445   if (wid >= h->watches_size)
446     {
447       GNUNET_break (0);
448       return GNUNET_SYSERR;
449     }
450   w = h->watches[wid];
451   (void) w->proc (w->proc_cls,
452                   w->subsystem,
453                   w->name,
454                   GNUNET_ntohll (wvm->value),
455                   0 !=
456                   (ntohl (wvm->flags) & GNUNET_STATISTICS_PERSIST_BIT));
457   return GNUNET_OK;
458 }
459
460
461 /**
462  * Function called with messages from stats service.
463  *
464  * @param cls closure
465  * @param msg message received, NULL on timeout or fatal error
466  */
467 static void
468 receive_stats (void *cls, const struct GNUNET_MessageHeader *msg)
469 {
470   struct GNUNET_STATISTICS_Handle *h = cls;
471
472   if (msg == NULL)
473     {
474       if (NULL != h->client)
475         {
476           GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
477           h->client = NULL;
478         }
479 #if DEBUG_STATISTICS
480       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
481                   "Error receiving statistics from service, is the service running?\n" );
482 #endif
483       finish (h, GNUNET_SYSERR);
484       return;
485     }
486   switch (ntohs (msg->type))
487     {
488     case GNUNET_MESSAGE_TYPE_STATISTICS_END:
489 #if DEBUG_STATISTICS
490       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
491                   "Received end of statistics marker\n");
492 #endif
493       h->backoff = GNUNET_TIME_UNIT_MILLISECONDS;
494       if (h->watches_size > 0)
495         {
496           GNUNET_CLIENT_receive (h->client,
497                                  &receive_stats,
498                                  h,
499                                  GNUNET_TIME_UNIT_FOREVER_REL);
500         }
501       else
502         {
503           h->receiving = GNUNET_NO;
504         }
505       finish (h, GNUNET_OK);
506       return;
507     case GNUNET_MESSAGE_TYPE_STATISTICS_VALUE:
508       if (GNUNET_OK == process_message (h, msg))
509         {
510           /* finally, look for more! */
511 #if DEBUG_STATISTICS
512           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
513                       "Processing VALUE done, now reading more\n");
514 #endif      
515           GNUNET_CLIENT_receive (h->client,
516                                  &receive_stats,
517                                  h,
518                                  GNUNET_TIME_absolute_get_remaining
519                                  (h->current->timeout));
520           h->backoff = GNUNET_TIME_UNIT_MILLISECONDS;
521           return;
522         }
523       GNUNET_break (0);
524       break;
525     case GNUNET_MESSAGE_TYPE_STATISTICS_WATCH_VALUE:
526       if (GNUNET_OK ==
527           process_watch_value (h, 
528                                msg))
529         {
530           h->backoff = GNUNET_TIME_UNIT_MILLISECONDS;
531           GNUNET_assert (h->watches_size > 0);
532           GNUNET_CLIENT_receive (h->client,
533                                  &receive_stats,
534                                  h,
535                                  GNUNET_TIME_UNIT_FOREVER_REL);
536           return;
537         }
538       GNUNET_break (0);
539       break;
540     default:
541       GNUNET_break (0);
542       break;
543     }
544   if (NULL != h->client)
545     {
546       GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
547       h->client = NULL;
548     }
549   finish (h, GNUNET_SYSERR);
550 }
551
552
553 /**
554  * Transmit a GET request (and if successful, start to receive
555  * the response).
556  */
557 static size_t
558 transmit_get (struct GNUNET_STATISTICS_Handle *handle, size_t size, void *buf)
559 {
560   struct GNUNET_MessageHeader *hdr;
561   size_t slen1;
562   size_t slen2;
563   uint16_t msize;
564
565   if (buf == NULL)
566     {
567       /* timeout / error */
568 #if DEBUG_STATISTICS
569       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
570                   "Transmission of request for statistics failed!\n");
571 #endif
572       finish (handle, GNUNET_SYSERR);
573       return 0;
574     }
575   slen1 = strlen (handle->current->subsystem) + 1;
576   slen2 = strlen (handle->current->name) + 1;
577   msize = slen1 + slen2 + sizeof (struct GNUNET_MessageHeader);
578   GNUNET_assert (msize <= size);
579   hdr = (struct GNUNET_MessageHeader *) buf;
580   hdr->size = htons (msize);
581   hdr->type = htons (GNUNET_MESSAGE_TYPE_STATISTICS_GET);
582   GNUNET_assert (slen1 + slen2 ==
583                  GNUNET_STRINGS_buffer_fill ((char *) &hdr[1],
584                                              slen1 + slen2,
585                                              2,
586                                              handle->current->subsystem,
587                                              handle->current->name));
588   if (! handle->receiving)
589     {
590 #if DEBUG_STATISTICS
591       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
592                   "Transmission of GET done, now reading response\n");
593 #endif      
594       handle->receiving = GNUNET_YES;
595       GNUNET_CLIENT_receive (handle->client,
596                              &receive_stats,
597                              handle,
598                              GNUNET_TIME_absolute_get_remaining (handle->
599                                                                  current->timeout));
600     }
601   return msize;
602 }
603
604
605 /**
606  * Transmit a WATCH request (and if successful, start to receive
607  * the response).
608  */
609 static size_t
610 transmit_watch (struct GNUNET_STATISTICS_Handle *handle, size_t size, void *buf)
611 {
612   struct GNUNET_MessageHeader *hdr;
613   size_t slen1;
614   size_t slen2;
615   uint16_t msize;
616
617   if (buf == NULL)
618     {
619       /* timeout / error */
620 #if DEBUG_STATISTICS
621       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
622                   "Transmission of request for statistics failed!\n");
623 #endif
624       finish (handle, GNUNET_SYSERR);
625       return 0;
626     }
627   slen1 = strlen (handle->current->subsystem) + 1;
628   slen2 = strlen (handle->current->name) + 1;
629   msize = slen1 + slen2 + sizeof (struct GNUNET_MessageHeader);
630   GNUNET_assert (msize <= size);
631   hdr = (struct GNUNET_MessageHeader *) buf;
632   hdr->size = htons (msize);
633   hdr->type = htons (GNUNET_MESSAGE_TYPE_STATISTICS_WATCH);
634   GNUNET_assert (slen1 + slen2 ==
635                  GNUNET_STRINGS_buffer_fill ((char *) &hdr[1],
636                                              slen1 + slen2,
637                                              2,
638                                              handle->current->subsystem,
639                                              handle->current->name));
640   if (! handle->receiving)
641     {
642       handle->receiving = GNUNET_YES;
643       GNUNET_CLIENT_receive (handle->client,
644                              &receive_stats,
645                              handle,
646                              GNUNET_TIME_UNIT_FOREVER_REL);
647     }
648   return msize;
649 }
650
651
652 /**
653  * Transmit a SET/UPDATE request.
654  */
655 static size_t
656 transmit_set (struct GNUNET_STATISTICS_Handle *handle, size_t size, void *buf)
657 {
658   struct GNUNET_STATISTICS_SetMessage *r;
659   size_t slen;
660   size_t nlen;
661   size_t nsize;
662
663   if (NULL == buf)
664     {
665       finish (handle, GNUNET_SYSERR);
666       return 0;
667     }
668
669   slen = strlen (handle->current->subsystem) + 1;
670   nlen = strlen (handle->current->name) + 1;
671   nsize = sizeof (struct GNUNET_STATISTICS_SetMessage) + slen + nlen;
672   if (size < nsize)
673     {
674       GNUNET_break (0);
675       finish (handle, GNUNET_SYSERR);
676       return 0;
677     }
678   r = buf;
679   r->header.size = htons (nsize);
680   r->header.type = htons (GNUNET_MESSAGE_TYPE_STATISTICS_SET);
681   r->flags = 0;
682   r->value = GNUNET_htonll (handle->current->value);
683   if (handle->current->make_persistent)
684     r->flags |= htonl (GNUNET_STATISTICS_SETFLAG_PERSISTENT);
685   if (handle->current->type == ACTION_UPDATE)
686     r->flags |= htonl (GNUNET_STATISTICS_SETFLAG_RELATIVE);
687   GNUNET_assert (slen + nlen ==
688                  GNUNET_STRINGS_buffer_fill ((char *) &r[1],
689                                              slen + nlen,
690                                              2,
691                                              handle->current->subsystem,
692                                              handle->current->name));
693   finish (handle, GNUNET_OK);
694   return nsize;
695 }
696
697
698 static size_t
699 transmit_action (void *cls, size_t size, void *buf)
700 {
701   struct GNUNET_STATISTICS_Handle *handle = cls;
702   size_t ret;
703
704   handle->th = NULL;
705   switch (handle->current->type)
706     {
707     case ACTION_GET:
708       ret = transmit_get (handle, size, buf);
709       break;
710     case ACTION_SET:
711     case ACTION_UPDATE:
712       ret = transmit_set (handle, size, buf);
713       break;
714     case ACTION_WATCH:
715       ret = transmit_watch (handle, size, buf);
716       break;
717     default:
718       ret = 0;
719       GNUNET_break (0);
720       break; 
721     }
722   return ret;
723 }
724
725
726 /**
727  * Get handle for the statistics service.
728  *
729  * @param sched scheduler to use
730  * @param subsystem name of subsystem using the service
731  * @param cfg services configuration in use
732  * @return handle to use
733  */
734 struct GNUNET_STATISTICS_Handle *
735 GNUNET_STATISTICS_create (struct GNUNET_SCHEDULER_Handle *sched,
736                           const char *subsystem,
737                           const struct GNUNET_CONFIGURATION_Handle *cfg)
738 {
739   struct GNUNET_STATISTICS_Handle *ret;
740
741   GNUNET_assert (subsystem != NULL);
742   GNUNET_assert (sched != NULL);
743   GNUNET_assert (cfg != NULL);
744   ret = GNUNET_malloc (sizeof (struct GNUNET_STATISTICS_Handle));
745   ret->sched = sched;
746   ret->cfg = cfg;
747   ret->subsystem = GNUNET_strdup (subsystem);
748   ret->backoff = GNUNET_TIME_UNIT_MILLISECONDS;
749   try_connect (ret);
750   return ret;
751 }
752
753
754 /**
755  * Destroy a handle (free all state associated with
756  * it).
757  *
758  * @param h statistics handle to destroy
759  * @param sync_first set to GNUNET_YES if pending SET requests should
760  *        be completed
761  */
762 void
763 GNUNET_STATISTICS_destroy (struct GNUNET_STATISTICS_Handle *h,
764                            int sync_first)
765 {
766   struct GNUNET_STATISTICS_GetHandle *pos;
767   struct GNUNET_STATISTICS_GetHandle *next;
768   struct GNUNET_STATISTICS_GetHandle *prev;
769   struct GNUNET_TIME_Relative timeout;
770   int i;
771
772   if (GNUNET_SCHEDULER_NO_TASK != h->backoff_task)
773     GNUNET_SCHEDULER_cancel (h->sched,
774                              h->backoff_task);
775   if (sync_first)
776     {
777       if (h->current != NULL)
778         {
779           if (h->current->type == ACTION_GET)
780             {
781               GNUNET_CLIENT_notify_transmit_ready_cancel (h->th);
782               h->th = NULL;
783               free_action_item (h->current);
784               h->current = NULL;
785             }
786         }
787       pos = h->action_head;
788       prev = NULL;
789       while (pos != NULL)
790         {
791           next = pos->next;
792           if (pos->type == ACTION_GET)
793             {
794               if (prev == NULL)
795                 h->action_head = next;
796               else
797                 prev->next = next;
798               free_action_item (pos);
799             }
800           else
801             {
802               prev = pos;
803             }
804           pos = next;
805         }
806       h->action_tail = prev;
807       if (h->current == NULL)
808         {
809           h->current = h->action_head;
810           if (h->action_head != NULL)
811             {
812               h->action_head = h->action_head->next;
813               if (h->action_head == NULL)
814                 h->action_tail = NULL;
815             }
816         }
817       h->do_destroy = GNUNET_YES;
818       if ( (h->current != NULL) &&
819            (h->th == NULL) )
820         {                                       
821           timeout = GNUNET_TIME_absolute_get_remaining (h->current->timeout);
822           h->th = GNUNET_CLIENT_notify_transmit_ready (h->client,
823                                                        h->current->msize,
824                                                        timeout,
825                                                        GNUNET_YES,
826                                                        &transmit_action, h);
827           GNUNET_assert (NULL != h->th);
828         }
829       if (h->th != NULL)
830         return;
831     }
832   if (NULL != h->th)
833     {
834       GNUNET_CLIENT_notify_transmit_ready_cancel (h->th);
835       h->th = NULL;
836     }
837   if (h->current != NULL)
838     free_action_item (h->current);
839   while (NULL != (pos = h->action_head))
840     {
841       h->action_head = pos->next;
842       free_action_item (pos);
843     }
844   if (h->client != NULL)
845     {
846       GNUNET_CLIENT_disconnect (h->client, GNUNET_YES);
847       h->client = NULL;
848     }
849   for (i=0;i<h->watches_size;i++)
850     {
851       GNUNET_free (h->watches[i]->subsystem);
852       GNUNET_free (h->watches[i]->name);
853       GNUNET_free (h->watches[i]);
854     }
855   GNUNET_array_grow (h->watches,
856                      h->watches_size,
857                      0);
858   GNUNET_free (h->subsystem);
859   GNUNET_free (h);
860 }
861
862
863 static void
864 finish_task (void *cls,
865              const struct GNUNET_SCHEDULER_TaskContext *tc)
866 {
867   struct GNUNET_STATISTICS_Handle *h = cls;
868
869   h->backoff_task = GNUNET_SCHEDULER_NO_TASK;
870   finish (h, GNUNET_SYSERR);
871 }
872
873
874 /**
875  * Schedule the next action to be performed.
876  */
877 static void
878 schedule_action (struct GNUNET_STATISTICS_Handle *h)
879 {
880   struct GNUNET_TIME_Relative timeout;
881
882   if (h->current != NULL)
883     return;                     /* action already pending */
884   if (GNUNET_YES != try_connect (h))
885     {
886       h->backoff_task = GNUNET_SCHEDULER_add_delayed (h->sched,
887                                                       h->backoff,
888                                                       &finish_task,
889                                                       h);
890       h->backoff = GNUNET_TIME_relative_multiply (h->backoff, 2);
891       h->backoff = GNUNET_TIME_relative_min (h->backoff,
892                                              GNUNET_CONSTANTS_SERVICE_TIMEOUT);
893       return;
894     }
895
896   /* schedule next action */
897   h->current = h->action_head;
898   if (NULL == h->current)
899     {
900       if (h->do_destroy)
901         {
902           h->do_destroy = GNUNET_NO;
903           GNUNET_STATISTICS_destroy (h, GNUNET_YES);
904         }
905       return;
906     }
907   GNUNET_CONTAINER_DLL_remove (h->action_head,
908                                h->action_tail,
909                                h->current);
910   timeout = GNUNET_TIME_absolute_get_remaining (h->current->timeout);
911   if (NULL ==
912       (h->th = GNUNET_CLIENT_notify_transmit_ready (h->client,
913                                                     h->current->msize,
914                                                     timeout,
915                                                     GNUNET_YES,
916                                                     &transmit_action, h)))
917     {
918 #if DEBUG_STATISTICS
919       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
920                   "Failed to transmit request to statistics service.\n");
921 #endif
922       finish (h, GNUNET_SYSERR);
923     }
924 }
925
926 /**
927  * Get statistic from the peer.
928  *
929  * @param handle identification of the statistics service
930  * @param subsystem limit to the specified subsystem, NULL for our subsystem
931  * @param name name of the statistic value, NULL for all values
932  * @param timeout after how long should we give up (and call
933  *        cont with an error code)?
934  * @param cont continuation to call when done (can be NULL)
935  * @param proc function to call on each value
936  * @param cls closure for cont and proc
937  * @return NULL on error
938  */
939 struct GNUNET_STATISTICS_GetHandle *
940 GNUNET_STATISTICS_get (struct GNUNET_STATISTICS_Handle *handle,
941                        const char *subsystem,
942                        const char *name,
943                        struct GNUNET_TIME_Relative timeout,
944                        GNUNET_STATISTICS_Callback cont,
945                        GNUNET_STATISTICS_Iterator proc, void *cls)
946 {
947   size_t slen1;
948   size_t slen2;
949   struct GNUNET_STATISTICS_GetHandle *ai;
950
951   GNUNET_assert (handle != NULL);
952   GNUNET_assert (proc != NULL);
953   GNUNET_assert (GNUNET_NO == handle->do_destroy);
954   if (GNUNET_YES != try_connect (handle))
955     {
956 #if DEBUG_STATISTICS
957       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
958                   "Failed to connect to statistics service, can not get value `%s:%s'.\n",
959                   strlen (subsystem) ? subsystem : "*",
960                   strlen (name) ? name : "*");
961 #endif
962       return NULL;
963     }
964   if (subsystem == NULL)
965     subsystem = "";
966   if (name == NULL)
967     name = "";
968   slen1 = strlen (subsystem) + 1;
969   slen2 = strlen (name) + 1;
970   GNUNET_assert (slen1 + slen2 + sizeof (struct GNUNET_MessageHeader) <
971                  GNUNET_SERVER_MAX_MESSAGE_SIZE);
972   ai = GNUNET_malloc (sizeof (struct GNUNET_STATISTICS_GetHandle));
973   ai->sh = handle;
974   ai->subsystem = GNUNET_strdup (subsystem);
975   ai->name = GNUNET_strdup (name);
976   ai->cont = cont;
977   ai->proc = proc;
978   ai->cls = cls;
979   ai->timeout = GNUNET_TIME_relative_to_absolute (timeout);
980   ai->type = ACTION_GET;
981   ai->msize = slen1 + slen2 + sizeof (struct GNUNET_MessageHeader);
982   insert_ai (handle, ai);
983   return ai;
984 }
985
986
987 /**
988  * Cancel a 'get' request.  Must be called before the 'cont' 
989  * function is called.
990  *
991  * @param gh handle of the request to cancel
992  */
993 void
994 GNUNET_STATISTICS_get_cancel (struct GNUNET_STATISTICS_GetHandle *gh)
995 {
996   if (gh->sh->current == gh)
997     {
998       gh->aborted = GNUNET_YES;
999     }
1000   else
1001     {
1002       GNUNET_CONTAINER_DLL_remove (gh->sh->action_head,
1003                                    gh->sh->action_tail,
1004                                    gh);
1005       GNUNET_free (gh->name);
1006       GNUNET_free (gh->subsystem);
1007       GNUNET_free (gh);
1008     }
1009 }
1010
1011
1012 /**
1013  * Watch statistics from the peer (be notified whenever they change).
1014  * Note that the only way to cancel a "watch" request is to destroy
1015  * the statistics handle given as the first argument to this call.
1016  *
1017  * @param handle identification of the statistics service
1018  * @param subsystem limit to the specified subsystem, never NULL
1019  * @param name name of the statistic value, never NULL
1020  * @param proc function to call on each value
1021  * @param proc_cls closure for proc
1022  * @return GNUNET_OK on success, GNUNET_SYSERR on error
1023  */
1024 int
1025 GNUNET_STATISTICS_watch (struct GNUNET_STATISTICS_Handle *handle,
1026                          const char *subsystem,
1027                          const char *name,
1028                          GNUNET_STATISTICS_Iterator proc, 
1029                          void *proc_cls)
1030 {
1031   struct GNUNET_STATISTICS_WatchEntry *w;
1032
1033   w = GNUNET_malloc (sizeof (struct GNUNET_STATISTICS_WatchEntry));
1034   w->subsystem = GNUNET_strdup (subsystem);
1035   w->name = GNUNET_strdup (name);
1036   w->proc = proc;
1037   w->proc_cls = proc_cls;
1038   GNUNET_array_append (handle->watches,
1039                        handle->watches_size,
1040                        w);
1041   schedule_watch_request (handle, w);
1042   return GNUNET_OK;
1043 }
1044
1045
1046 static void
1047 add_setter_action (struct GNUNET_STATISTICS_Handle *h,
1048                    const char *name,
1049                    int make_persistent,
1050                    uint64_t value, enum ActionType type)
1051 {
1052   struct GNUNET_STATISTICS_GetHandle *ai;
1053   size_t slen;
1054   size_t nlen;
1055   size_t nsize;
1056   int64_t delta;
1057   
1058   GNUNET_assert (h != NULL);
1059   GNUNET_assert (name != NULL);
1060   if (GNUNET_YES != try_connect (h))
1061     return;
1062   slen = strlen (h->subsystem) + 1;
1063   nlen = strlen (name) + 1;
1064   nsize = sizeof (struct GNUNET_STATISTICS_SetMessage) + slen + nlen;
1065   if (nsize >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
1066     {
1067       GNUNET_break (0);
1068       return;
1069     }
1070   ai = h->action_head;
1071   while (ai != NULL)
1072     {
1073       if ( (0 == strcmp (ai->subsystem, h->subsystem)) &&
1074            (0 == strcmp (ai->name, name)) &&
1075            ( (ai->type == ACTION_UPDATE) ||
1076              (ai->type == ACTION_SET) ) )
1077         {
1078           if (ai->type == ACTION_SET)
1079             {
1080               if (type == ACTION_UPDATE)
1081                 {
1082                   delta = (int64_t) value;
1083                   if (delta > 0) 
1084                     {
1085                       ai->value += delta;
1086                     }
1087                   else
1088                     {
1089                       if (ai->value < -delta)
1090                         ai->value = 0;
1091                       else
1092                         ai->value += delta;
1093                     }
1094                 }
1095               else
1096                 {
1097                   ai->value = value;
1098                 }
1099             }
1100           else
1101             {
1102               if (type == ACTION_UPDATE)
1103                 {
1104                   delta = (int64_t) value;
1105                   ai->value += delta;
1106                 }
1107               else
1108                 {
1109                   ai->value = value;
1110                   ai->type = type;
1111                 }
1112             }
1113           ai->timeout = GNUNET_TIME_relative_to_absolute (SET_TRANSMIT_TIMEOUT);
1114           ai->make_persistent = make_persistent;
1115           return;
1116         }
1117       ai = ai->next;
1118     }
1119   ai = GNUNET_malloc (sizeof (struct GNUNET_STATISTICS_GetHandle));
1120   ai->sh = h;
1121   ai->subsystem = GNUNET_strdup (h->subsystem);
1122   ai->name = GNUNET_strdup (name);
1123   ai->timeout = GNUNET_TIME_relative_to_absolute (SET_TRANSMIT_TIMEOUT);
1124   ai->make_persistent = make_persistent;
1125   ai->msize = nsize;
1126   ai->value = value;
1127   ai->type = type;
1128   insert_ai (h, ai);
1129 }
1130
1131
1132 /**
1133  * Set statistic value for the peer.  Will always use our
1134  * subsystem (the argument used when "handle" was created).
1135  *
1136  * @param handle identification of the statistics service
1137  * @param name name of the statistic value
1138  * @param value new value to set
1139  * @param make_persistent should the value be kept across restarts?
1140  */
1141 void
1142 GNUNET_STATISTICS_set (struct GNUNET_STATISTICS_Handle *handle,
1143                        const char *name,
1144                        uint64_t value, int make_persistent)
1145 {
1146   if (handle == NULL)
1147     return;
1148   GNUNET_assert (GNUNET_NO == handle->do_destroy);
1149   add_setter_action (handle, name, make_persistent, value, ACTION_SET);
1150 }
1151
1152
1153 /**
1154  * Set statistic value for the peer.  Will always use our
1155  * subsystem (the argument used when "handle" was created).
1156  *
1157  * @param handle identification of the statistics service
1158  * @param name name of the statistic value
1159  * @param delta change in value (added to existing value)
1160  * @param make_persistent should the value be kept across restarts?
1161  */
1162 void
1163 GNUNET_STATISTICS_update (struct GNUNET_STATISTICS_Handle *handle,
1164                           const char *name,
1165                           int64_t delta, int make_persistent)
1166 {
1167   if (handle == NULL)
1168     return;
1169   if (delta == 0)
1170     return;
1171   GNUNET_assert (GNUNET_NO == handle->do_destroy);
1172   add_setter_action (handle, name, make_persistent,
1173                      (uint64_t) delta, ACTION_UPDATE);
1174 }
1175
1176
1177 /* end of statistics_api.c */