b1d9b85aa8568223c739dd98b2bc65332194bc1e
[oweals/gnunet.git] / src / statistics / statistics_api.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009, 2010 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 2, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file statistics/statistics_api.c
23  * @brief API of the statistics service
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include "gnunet_client_lib.h"
28 #include "gnunet_constants.h"
29 #include "gnunet_container_lib.h"
30 #include "gnunet_protocols.h"
31 #include "gnunet_server_lib.h"
32 #include "gnunet_statistics_service.h"
33 #include "gnunet_strings_lib.h"
34 #include "statistics.h"
35
36 /**
37  * How long do we wait until a statistics request for setting
38  * a value times out?  (The update will be lost if the
39  * service does not react within this timeframe).  
40  */
41 #define SET_TRANSMIT_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 2)
42
43
44 /**
45  * Types of actions.
46  */
47 enum ActionType
48 {
49   ACTION_GET,
50   ACTION_SET,
51   ACTION_UPDATE,
52   ACTION_WATCH
53 };
54
55
56 /**
57  * Entry kept for each value we are watching.
58  */
59 struct GNUNET_STATISTICS_WatchEntry
60 {
61  
62   /**
63    * What subsystem is this action about? (never NULL)
64    */
65   char *subsystem;
66
67   /**
68    * What value is this action about? (never NULL)
69    */
70   char *name;
71
72   /**
73    * Function to call
74    */
75   GNUNET_STATISTICS_Iterator proc;
76
77   /**
78    * Closure for proc
79    */
80   void *proc_cls;
81
82 };
83
84
85 /**
86  * Linked list of things we still need to do.
87  */
88 struct GNUNET_STATISTICS_GetHandle
89 {
90
91   /**
92    * This is a doubly linked list.
93    */
94   struct GNUNET_STATISTICS_GetHandle *next;
95
96   /**
97    * This is a doubly linked list.
98    */
99   struct GNUNET_STATISTICS_GetHandle *prev;
100
101   /**
102    * Main statistics handle.
103    */
104   struct GNUNET_STATISTICS_Handle *sh;
105  
106   /**
107    * What subsystem is this action about? (can be NULL)
108    */
109   char *subsystem;
110
111   /**
112    * What value is this action about? (can be NULL)
113    */
114   char *name;
115
116   /**
117    * Continuation to call once action is complete.
118    */
119   GNUNET_STATISTICS_Callback cont;
120
121   /**
122    * Function to call (for GET actions only).
123    */
124   GNUNET_STATISTICS_Iterator proc;
125
126   /**
127    * Closure for proc and cont.
128    */
129   void *cls;
130
131   /**
132    * Timeout for this action.
133    */
134   struct GNUNET_TIME_Absolute timeout;
135
136   /**
137    * Associated value.
138    */
139   uint64_t value;
140
141   /**
142    * Flag for SET/UPDATE actions.
143    */
144   int make_persistent;
145
146   /**
147    * Has the current iteration been aborted; for GET actions.
148    */
149   int aborted;
150
151   /**
152    * Is this a GET, SET, UPDATE or WATCH?
153    */
154   enum ActionType type;
155
156   /**
157    * Size of the message that we will be transmitting.
158    */
159   uint16_t msize;
160
161 };
162
163
164 /**
165  * Handle for the service.
166  */
167 struct GNUNET_STATISTICS_Handle
168 {
169   /**
170    * Name of our subsystem.
171    */
172   char *subsystem;
173
174   /**
175    * Configuration to use.
176    */
177   const struct GNUNET_CONFIGURATION_Handle *cfg;
178
179   /**
180    * Socket (if available).
181    */
182   struct GNUNET_CLIENT_Connection *client;
183
184   /**
185    * Currently pending transmission request.
186    */
187   struct GNUNET_CLIENT_TransmitHandle *th;
188
189   /**
190    * Head of the linked list of pending actions (first action
191    * to be performed).
192    */
193   struct GNUNET_STATISTICS_GetHandle *action_head;
194
195   /**
196    * Tail of the linked list of actions (for fast append).
197    */
198   struct GNUNET_STATISTICS_GetHandle *action_tail;
199
200   /**
201    * Action we are currently busy with (action request has been
202    * transmitted, we're now receiving the response from the
203    * service).
204    */
205   struct GNUNET_STATISTICS_GetHandle *current;
206
207   /**
208    * Array of watch entries.
209    */
210   struct GNUNET_STATISTICS_WatchEntry **watches;
211
212   /**
213    * Task doing exponential back-off trying to reconnect.
214    */
215   GNUNET_SCHEDULER_TaskIdentifier backoff_task;
216
217   /**
218    * Time for next connect retry.
219    */
220   struct GNUNET_TIME_Relative backoff;
221
222   /**
223    * Size of the 'watches' array.
224    */
225   unsigned int watches_size;
226
227   /**
228    * Should this handle auto-destruct once all actions have
229    * been processed?
230    */
231   int do_destroy;
232
233   /**
234    * Are we currently receiving from the service?
235    */
236   int receiving;
237
238 };
239
240
241
242 /**
243  * Schedule the next action to be performed.
244  */
245 static void schedule_action (struct GNUNET_STATISTICS_Handle *h);
246
247 /**
248  * Try to (re)connect to the statistics service.
249  *
250  * @return GNUNET_YES on success, GNUNET_NO on failure.
251  */
252 static int
253 try_connect (struct GNUNET_STATISTICS_Handle *ret);
254
255
256 static void
257 insert_ai (struct GNUNET_STATISTICS_Handle *h, struct GNUNET_STATISTICS_GetHandle *ai)
258 {
259   GNUNET_CONTAINER_DLL_insert_after (h->action_head,
260                                      h->action_tail,
261                                      h->action_tail,
262                                      ai);                                    
263   if (h->action_head == ai)
264     schedule_action (h);
265 }
266
267
268 static void
269 schedule_watch_request (struct GNUNET_STATISTICS_Handle *h,
270                         struct GNUNET_STATISTICS_WatchEntry *watch)
271 {
272
273   struct GNUNET_STATISTICS_GetHandle *ai;
274   size_t slen;
275   size_t nlen;
276   size_t nsize;
277   
278   GNUNET_assert (h != NULL);
279   if (GNUNET_YES != try_connect (h))
280     {
281       schedule_action (h);
282       return;
283     }
284   slen = strlen (watch->subsystem) + 1;
285   nlen = strlen (watch->name) + 1;
286   nsize = sizeof (struct GNUNET_MessageHeader) + slen + nlen;
287   if (nsize >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
288     {
289       GNUNET_break (0);
290       return;
291     }
292   ai = GNUNET_malloc (sizeof (struct GNUNET_STATISTICS_GetHandle));
293   ai->sh = h;
294   ai->subsystem = GNUNET_strdup (watch->subsystem);
295   ai->name = GNUNET_strdup (watch->name);
296   ai->timeout = GNUNET_TIME_UNIT_FOREVER_ABS;
297   ai->msize = nsize;
298   ai->type = ACTION_WATCH;
299   ai->proc = watch->proc;
300   ai->cls = watch->proc_cls;
301   insert_ai (h, ai);
302 }
303
304
305 /**
306  * Try to (re)connect to the statistics service.
307  *
308  * @return GNUNET_YES on success, GNUNET_NO on failure.
309  */
310 static int
311 try_connect (struct GNUNET_STATISTICS_Handle *ret)
312 {
313   unsigned int i;
314   if (ret->client != NULL)
315     return GNUNET_YES;
316   ret->client = GNUNET_CLIENT_connect ("statistics", ret->cfg);
317   if (ret->client != NULL)
318     {
319       for (i=0;i<ret->watches_size;i++)
320         schedule_watch_request (ret, ret->watches[i]);
321       return GNUNET_YES;
322     }
323 #if DEBUG_STATISTICS
324   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
325               _("Failed to connect to statistics service!\n"));
326 #endif
327   return GNUNET_NO;
328 }
329
330
331 /**
332  * Free memory associated with the given action item.
333  */
334 static void
335 free_action_item (struct GNUNET_STATISTICS_GetHandle *ai)
336 {
337   GNUNET_free_non_null (ai->subsystem);
338   GNUNET_free_non_null (ai->name);
339   GNUNET_free (ai);
340 }
341
342
343 /**
344  * GET processing is complete, tell client about it.
345  */
346 static void
347 finish (struct GNUNET_STATISTICS_Handle *h, int code)
348 {
349   struct GNUNET_STATISTICS_GetHandle *pos = h->current;
350   h->current = NULL;
351   schedule_action (h);
352   if (pos != NULL)
353     {
354       if (pos->cont != NULL)
355         pos->cont (pos->cls, code);
356       free_action_item (pos);
357     }
358 }
359
360
361 /**
362  * Process the message.
363  *
364  * @return GNUNET_OK if the message was well-formed
365  */
366 static int
367 process_message (struct GNUNET_STATISTICS_Handle *h,
368                  const struct GNUNET_MessageHeader *msg)
369 {
370   char *service;
371   char *name;
372   const struct GNUNET_STATISTICS_ReplyMessage *smsg;
373   uint16_t size;
374
375   if (h->current->aborted)
376     {
377 #if DEBUG_STATISTICS
378       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
379                   "Iteration was aborted, ignoring VALUE\n");
380 #endif      
381       return GNUNET_OK;           /* don't bother */
382     }
383   size = ntohs (msg->size);
384   if (size < sizeof (struct GNUNET_STATISTICS_ReplyMessage))
385     {
386       GNUNET_break (0);
387       return GNUNET_SYSERR;
388     }
389   smsg = (const struct GNUNET_STATISTICS_ReplyMessage *) msg;
390   size -= sizeof (struct GNUNET_STATISTICS_ReplyMessage);
391   if (size != GNUNET_STRINGS_buffer_tokenize ((const char *) &smsg[1],
392                                               size, 2, &service, &name))
393     {
394       GNUNET_break (0);
395       return GNUNET_SYSERR;
396     }
397 #if DEBUG_STATISTICS
398   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
399               "Received valid statistic on `%s:%s': %llu\n",
400               service, name, GNUNET_ntohll (smsg->value));
401 #endif
402   if (GNUNET_OK !=
403       h->current->proc (h->current->cls,
404                         service,
405                         name,
406                         GNUNET_ntohll (smsg->value),
407                         0 !=
408                         (ntohl (smsg->uid) & GNUNET_STATISTICS_PERSIST_BIT)))
409     {
410 #if DEBUG_STATISTICS
411       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
412                   "Processing of remaining statistics aborted by client.\n");
413 #endif
414       h->current->aborted = GNUNET_YES;    
415     }
416 #if DEBUG_STATISTICS
417   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
418               "VALUE processed successfully\n");
419 #endif      
420   return GNUNET_OK;
421 }
422
423
424 static int
425 process_watch_value (struct GNUNET_STATISTICS_Handle *h,
426                      const struct GNUNET_MessageHeader *msg)
427 {
428   const struct GNUNET_STATISTICS_WatchValueMessage *wvm;
429   struct GNUNET_STATISTICS_WatchEntry *w;
430   uint32_t wid;
431
432   if (sizeof(struct GNUNET_STATISTICS_WatchValueMessage) !=
433       ntohs (msg->size))
434     {
435       GNUNET_break (0);
436       return GNUNET_SYSERR;
437     }
438   wvm = (const struct GNUNET_STATISTICS_WatchValueMessage *)msg;
439   GNUNET_break (0 == ntohl (wvm->reserved));
440   wid = ntohl (wvm->wid);
441   if (wid >= h->watches_size)
442     {
443       GNUNET_break (0);
444       return GNUNET_SYSERR;
445     }
446   w = h->watches[wid];
447   (void) w->proc (w->proc_cls,
448                   w->subsystem,
449                   w->name,
450                   GNUNET_ntohll (wvm->value),
451                   0 !=
452                   (ntohl (wvm->flags) & GNUNET_STATISTICS_PERSIST_BIT));
453   return GNUNET_OK;
454 }
455
456
457 /**
458  * Function called with messages from stats service.
459  *
460  * @param cls closure
461  * @param msg message received, NULL on timeout or fatal error
462  */
463 static void
464 receive_stats (void *cls, const struct GNUNET_MessageHeader *msg)
465 {
466   struct GNUNET_STATISTICS_Handle *h = cls;
467
468   if (msg == NULL)
469     {
470       if (NULL != h->client)
471         {
472           GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
473           h->client = NULL;
474         }
475 #if DEBUG_STATISTICS
476       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
477                   "Error receiving statistics from service, is the service running?\n" );
478 #endif
479       finish (h, GNUNET_SYSERR);
480       return;
481     }
482   switch (ntohs (msg->type))
483     {
484     case GNUNET_MESSAGE_TYPE_STATISTICS_END:
485 #if DEBUG_STATISTICS
486       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
487                   "Received end of statistics marker\n");
488 #endif
489       h->backoff = GNUNET_TIME_UNIT_MILLISECONDS;
490       if (h->watches_size > 0)
491         {
492           GNUNET_CLIENT_receive (h->client,
493                                  &receive_stats,
494                                  h,
495                                  GNUNET_TIME_UNIT_FOREVER_REL);
496         }
497       else
498         {
499           h->receiving = GNUNET_NO;
500         }
501       finish (h, GNUNET_OK);
502       return;
503     case GNUNET_MESSAGE_TYPE_STATISTICS_VALUE:
504       if (GNUNET_OK == process_message (h, msg))
505         {
506           /* finally, look for more! */
507 #if DEBUG_STATISTICS
508           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
509                       "Processing VALUE done, now reading more\n");
510 #endif      
511           GNUNET_CLIENT_receive (h->client,
512                                  &receive_stats,
513                                  h,
514                                  GNUNET_TIME_absolute_get_remaining
515                                  (h->current->timeout));
516           h->backoff = GNUNET_TIME_UNIT_MILLISECONDS;
517           return;
518         }
519       GNUNET_break (0);
520       break;
521     case GNUNET_MESSAGE_TYPE_STATISTICS_WATCH_VALUE:
522       if (GNUNET_OK ==
523           process_watch_value (h, 
524                                msg))
525         {
526           h->backoff = GNUNET_TIME_UNIT_MILLISECONDS;
527           GNUNET_assert (h->watches_size > 0);
528           GNUNET_CLIENT_receive (h->client,
529                                  &receive_stats,
530                                  h,
531                                  GNUNET_TIME_UNIT_FOREVER_REL);
532           return;
533         }
534       GNUNET_break (0);
535       break;
536     default:
537       GNUNET_break (0);
538       break;
539     }
540   if (NULL != h->client)
541     {
542       GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
543       h->client = NULL;
544     }
545   finish (h, GNUNET_SYSERR);
546 }
547
548
549 /**
550  * Transmit a GET request (and if successful, start to receive
551  * the response).
552  */
553 static size_t
554 transmit_get (struct GNUNET_STATISTICS_Handle *handle, size_t size, void *buf)
555 {
556   struct GNUNET_MessageHeader *hdr;
557   size_t slen1;
558   size_t slen2;
559   uint16_t msize;
560
561   if (buf == NULL)
562     {
563       /* timeout / error */
564 #if DEBUG_STATISTICS
565       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
566                   "Transmission of request for statistics failed!\n");
567 #endif
568       finish (handle, GNUNET_SYSERR);
569       return 0;
570     }
571   slen1 = strlen (handle->current->subsystem) + 1;
572   slen2 = strlen (handle->current->name) + 1;
573   msize = slen1 + slen2 + sizeof (struct GNUNET_MessageHeader);
574   GNUNET_assert (msize <= size);
575   hdr = (struct GNUNET_MessageHeader *) buf;
576   hdr->size = htons (msize);
577   hdr->type = htons (GNUNET_MESSAGE_TYPE_STATISTICS_GET);
578   GNUNET_assert (slen1 + slen2 ==
579                  GNUNET_STRINGS_buffer_fill ((char *) &hdr[1],
580                                              slen1 + slen2,
581                                              2,
582                                              handle->current->subsystem,
583                                              handle->current->name));
584   if (! handle->receiving)
585     {
586 #if DEBUG_STATISTICS
587       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
588                   "Transmission of GET done, now reading response\n");
589 #endif      
590       handle->receiving = GNUNET_YES;
591       GNUNET_CLIENT_receive (handle->client,
592                              &receive_stats,
593                              handle,
594                              GNUNET_TIME_absolute_get_remaining (handle->
595                                                                  current->timeout));
596     }
597   return msize;
598 }
599
600
601 /**
602  * Transmit a WATCH request (and if successful, start to receive
603  * the response).
604  */
605 static size_t
606 transmit_watch (struct GNUNET_STATISTICS_Handle *handle, size_t size, void *buf)
607 {
608   struct GNUNET_MessageHeader *hdr;
609   size_t slen1;
610   size_t slen2;
611   uint16_t msize;
612
613   if (buf == NULL)
614     {
615       /* timeout / error */
616 #if DEBUG_STATISTICS
617       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
618                   "Transmission of request for statistics failed!\n");
619 #endif
620       finish (handle, GNUNET_SYSERR);
621       return 0;
622     }
623 #if DEBUG_STATISTICS
624   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
625               "Transmitting watch request for `%s'\n",
626               handle->current->name);
627 #endif
628   slen1 = strlen (handle->current->subsystem) + 1;
629   slen2 = strlen (handle->current->name) + 1;
630   msize = slen1 + slen2 + sizeof (struct GNUNET_MessageHeader);
631   GNUNET_assert (msize <= size);
632   hdr = (struct GNUNET_MessageHeader *) buf;
633   hdr->size = htons (msize);
634   hdr->type = htons (GNUNET_MESSAGE_TYPE_STATISTICS_WATCH);
635   GNUNET_assert (slen1 + slen2 ==
636                  GNUNET_STRINGS_buffer_fill ((char *) &hdr[1],
637                                              slen1 + slen2,
638                                              2,
639                                              handle->current->subsystem,
640                                              handle->current->name));
641   if (GNUNET_YES != handle->receiving)
642     {
643       handle->receiving = GNUNET_YES;
644       GNUNET_CLIENT_receive (handle->client,
645                              &receive_stats,
646                              handle,
647                              GNUNET_TIME_UNIT_FOREVER_REL);
648     }
649   finish (handle, GNUNET_OK);
650   return msize;
651 }
652
653
654 /**
655  * Transmit a SET/UPDATE request.
656  */
657 static size_t
658 transmit_set (struct GNUNET_STATISTICS_Handle *handle, size_t size, void *buf)
659 {
660   struct GNUNET_STATISTICS_SetMessage *r;
661   size_t slen;
662   size_t nlen;
663   size_t nsize;
664
665   if (NULL == buf)
666     {
667       finish (handle, GNUNET_SYSERR);
668       return 0;
669     }
670
671   slen = strlen (handle->current->subsystem) + 1;
672   nlen = strlen (handle->current->name) + 1;
673   nsize = sizeof (struct GNUNET_STATISTICS_SetMessage) + slen + nlen;
674   if (size < nsize)
675     {
676       GNUNET_break (0);
677       finish (handle, GNUNET_SYSERR);
678       return 0;
679     }
680   r = buf;
681   r->header.size = htons (nsize);
682   r->header.type = htons (GNUNET_MESSAGE_TYPE_STATISTICS_SET);
683   r->flags = 0;
684   r->value = GNUNET_htonll (handle->current->value);
685   if (handle->current->make_persistent)
686     r->flags |= htonl (GNUNET_STATISTICS_SETFLAG_PERSISTENT);
687   if (handle->current->type == ACTION_UPDATE)
688     r->flags |= htonl (GNUNET_STATISTICS_SETFLAG_RELATIVE);
689   GNUNET_assert (slen + nlen ==
690                  GNUNET_STRINGS_buffer_fill ((char *) &r[1],
691                                              slen + nlen,
692                                              2,
693                                              handle->current->subsystem,
694                                              handle->current->name));
695   finish (handle, GNUNET_OK);
696   return nsize;
697 }
698
699
700 static size_t
701 transmit_action (void *cls, size_t size, void *buf)
702 {
703   struct GNUNET_STATISTICS_Handle *handle = cls;
704   size_t ret;
705
706   handle->th = NULL;
707   switch (handle->current->type)
708     {
709     case ACTION_GET:
710       ret = transmit_get (handle, size, buf);
711       break;
712     case ACTION_SET:
713     case ACTION_UPDATE:
714       ret = transmit_set (handle, size, buf);
715       break;
716     case ACTION_WATCH:
717       ret = transmit_watch (handle, size, buf);
718       break;
719     default:
720       ret = 0;
721       GNUNET_break (0);
722       break; 
723     }
724   return ret;
725 }
726
727
728 /**
729  * Get handle for the statistics service.
730  *
731  * @param subsystem name of subsystem using the service
732  * @param cfg services configuration in use
733  * @return handle to use
734  */
735 struct GNUNET_STATISTICS_Handle *
736 GNUNET_STATISTICS_create (const char *subsystem,
737                           const struct GNUNET_CONFIGURATION_Handle *cfg)
738 {
739   struct GNUNET_STATISTICS_Handle *ret;
740
741   GNUNET_assert (subsystem != NULL);
742   GNUNET_assert (cfg != NULL);
743   ret = GNUNET_malloc (sizeof (struct GNUNET_STATISTICS_Handle));
744   ret->cfg = cfg;
745   ret->subsystem = GNUNET_strdup (subsystem);
746   ret->backoff = GNUNET_TIME_UNIT_MILLISECONDS;
747   if (GNUNET_YES != try_connect (ret))
748     {
749       GNUNET_free (ret->subsystem);
750       GNUNET_free (ret);
751       return NULL;
752     }
753   return ret;
754 }
755
756
757 /**
758  * Destroy a handle (free all state associated with
759  * it).
760  *
761  * @param h statistics handle to destroy
762  * @param sync_first set to GNUNET_YES if pending SET requests should
763  *        be completed
764  */
765 void
766 GNUNET_STATISTICS_destroy (struct GNUNET_STATISTICS_Handle *h,
767                            int sync_first)
768 {
769   struct GNUNET_STATISTICS_GetHandle *pos;
770   struct GNUNET_STATISTICS_GetHandle *next;
771   struct GNUNET_STATISTICS_GetHandle *prev;
772   struct GNUNET_TIME_Relative timeout;
773   int i;
774
775   if (h == NULL) return;
776   if (GNUNET_SCHEDULER_NO_TASK != h->backoff_task)
777     GNUNET_SCHEDULER_cancel (h->backoff_task);
778   if (sync_first)
779     {
780       if (h->current != NULL)
781         {
782           if (h->current->type == ACTION_GET)
783             {
784               GNUNET_CLIENT_notify_transmit_ready_cancel (h->th);
785               h->th = NULL;
786               free_action_item (h->current);
787               h->current = NULL;
788             }
789         }
790       pos = h->action_head;
791       prev = NULL;
792       while (pos != NULL)
793         {
794           next = pos->next;
795           if (pos->type == ACTION_GET)
796             {
797               if (prev == NULL)
798                 h->action_head = next;
799               else
800                 prev->next = next;
801               free_action_item (pos);
802             }
803           else
804             {
805               prev = pos;
806             }
807           pos = next;
808         }
809       h->action_tail = prev;
810       if (h->current == NULL)
811         {
812           h->current = h->action_head;
813           if (h->action_head != NULL)
814             {
815               h->action_head = h->action_head->next;
816               if (h->action_head == NULL)
817                 h->action_tail = NULL;
818             }
819         }
820       h->do_destroy = GNUNET_YES;
821       if ( (h->current != NULL) &&
822            (h->th == NULL) )
823         {                                       
824           timeout = GNUNET_TIME_absolute_get_remaining (h->current->timeout);
825           h->th = GNUNET_CLIENT_notify_transmit_ready (h->client,
826                                                        h->current->msize,
827                                                        timeout,
828                                                        GNUNET_YES,
829                                                        &transmit_action, h);
830           GNUNET_assert (NULL != h->th);
831         }
832       if (h->th != NULL)
833         return;
834     }
835   if (NULL != h->th)
836     {
837       GNUNET_CLIENT_notify_transmit_ready_cancel (h->th);
838       h->th = NULL;
839     }
840   if (h->current != NULL)
841     free_action_item (h->current);
842   while (NULL != (pos = h->action_head))
843     {
844       h->action_head = pos->next;
845       free_action_item (pos);
846     }
847   if (h->client != NULL)
848     {
849       GNUNET_CLIENT_disconnect (h->client, GNUNET_YES);
850       h->client = NULL;
851     }
852   for (i=0;i<h->watches_size;i++)
853     {
854       GNUNET_free (h->watches[i]->subsystem);
855       GNUNET_free (h->watches[i]->name);
856       GNUNET_free (h->watches[i]);
857     }
858   GNUNET_array_grow (h->watches,
859                      h->watches_size,
860                      0);
861   GNUNET_free (h->subsystem);
862   GNUNET_free (h);
863 }
864
865
866 static void
867 finish_task (void *cls,
868              const struct GNUNET_SCHEDULER_TaskContext *tc)
869 {
870   struct GNUNET_STATISTICS_Handle *h = cls;
871
872   h->backoff_task = GNUNET_SCHEDULER_NO_TASK;
873   finish (h, GNUNET_SYSERR);
874 }
875
876
877 /**
878  * Schedule the next action to be performed.
879  */
880 static void
881 schedule_action (struct GNUNET_STATISTICS_Handle *h)
882 {
883   struct GNUNET_TIME_Relative timeout;
884
885   if (h->current != NULL)
886     return;                     /* action already pending */
887   if (GNUNET_YES != try_connect (h))
888     {
889       h->backoff_task = GNUNET_SCHEDULER_add_delayed (h->backoff,
890                                                       &finish_task,
891                                                       h);
892       h->backoff = GNUNET_TIME_relative_multiply (h->backoff, 2);
893       h->backoff = GNUNET_TIME_relative_min (h->backoff,
894                                              GNUNET_CONSTANTS_SERVICE_TIMEOUT);
895       return;
896     }
897
898   /* schedule next action */
899   h->current = h->action_head;
900   if (NULL == h->current)
901     {
902       if (h->do_destroy)
903         {
904           h->do_destroy = GNUNET_NO;
905           GNUNET_STATISTICS_destroy (h, GNUNET_YES);
906         }
907       return;
908     }
909   GNUNET_CONTAINER_DLL_remove (h->action_head,
910                                h->action_tail,
911                                h->current);
912   timeout = GNUNET_TIME_absolute_get_remaining (h->current->timeout);
913   if (NULL ==
914       (h->th = GNUNET_CLIENT_notify_transmit_ready (h->client,
915                                                     h->current->msize,
916                                                     timeout,
917                                                     GNUNET_YES,
918                                                     &transmit_action, h)))
919     {
920 #if DEBUG_STATISTICS
921       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
922                   "Failed to transmit request to statistics service.\n");
923 #endif
924       finish (h, GNUNET_SYSERR);
925     }
926 }
927
928
929 /**
930  * Get statistic from the peer.
931  *
932  * @param handle identification of the statistics service
933  * @param subsystem limit to the specified subsystem, NULL for our subsystem
934  * @param name name of the statistic value, NULL for all values
935  * @param timeout after how long should we give up (and call
936  *        cont with an error code)?
937  * @param cont continuation to call when done (can be NULL)
938  * @param proc function to call on each value
939  * @param cls closure for cont and proc
940  * @return NULL on error
941  */
942 struct GNUNET_STATISTICS_GetHandle *
943 GNUNET_STATISTICS_get (struct GNUNET_STATISTICS_Handle *handle,
944                        const char *subsystem,
945                        const char *name,
946                        struct GNUNET_TIME_Relative timeout,
947                        GNUNET_STATISTICS_Callback cont,
948                        GNUNET_STATISTICS_Iterator proc, void *cls)
949 {
950   size_t slen1;
951   size_t slen2;
952   struct GNUNET_STATISTICS_GetHandle *ai;
953
954   GNUNET_assert (handle != NULL);
955   GNUNET_assert (proc != NULL);
956   GNUNET_assert (GNUNET_NO == handle->do_destroy);
957   if (GNUNET_YES != try_connect (handle))
958     {
959 #if DEBUG_STATISTICS
960       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
961                   "Failed to connect to statistics service, can not get value `%s:%s'.\n",
962                   strlen (subsystem) ? subsystem : "*",
963                   strlen (name) ? name : "*");
964 #endif
965       return NULL;
966     }
967   if (subsystem == NULL)
968     subsystem = "";
969   if (name == NULL)
970     name = "";
971   slen1 = strlen (subsystem) + 1;
972   slen2 = strlen (name) + 1;
973   GNUNET_assert (slen1 + slen2 + sizeof (struct GNUNET_MessageHeader) <
974                  GNUNET_SERVER_MAX_MESSAGE_SIZE);
975   ai = GNUNET_malloc (sizeof (struct GNUNET_STATISTICS_GetHandle));
976   ai->sh = handle;
977   ai->subsystem = GNUNET_strdup (subsystem);
978   ai->name = GNUNET_strdup (name);
979   ai->cont = cont;
980   ai->proc = proc;
981   ai->cls = cls;
982   ai->timeout = GNUNET_TIME_relative_to_absolute (timeout);
983   ai->type = ACTION_GET;
984   ai->msize = slen1 + slen2 + sizeof (struct GNUNET_MessageHeader);
985   insert_ai (handle, ai);
986   return ai;
987 }
988
989
990 /**
991  * Cancel a 'get' request.  Must be called before the 'cont' 
992  * function is called.
993  *
994  * @param gh handle of the request to cancel
995  */
996 void
997 GNUNET_STATISTICS_get_cancel (struct GNUNET_STATISTICS_GetHandle *gh)
998 {
999   if (gh->sh->current == gh)
1000     {
1001       gh->aborted = GNUNET_YES;
1002     }
1003   else
1004     {
1005       GNUNET_CONTAINER_DLL_remove (gh->sh->action_head,
1006                                    gh->sh->action_tail,
1007                                    gh);
1008       GNUNET_free (gh->name);
1009       GNUNET_free (gh->subsystem);
1010       GNUNET_free (gh);
1011     }
1012 }
1013
1014
1015 /**
1016  * Watch statistics from the peer (be notified whenever they change).
1017  * Note that the only way to cancel a "watch" request is to destroy
1018  * the statistics handle given as the first argument to this call.
1019  *
1020  * @param handle identification of the statistics service
1021  * @param subsystem limit to the specified subsystem, never NULL
1022  * @param name name of the statistic value, never NULL
1023  * @param proc function to call on each value
1024  * @param proc_cls closure for proc
1025  * @return GNUNET_OK on success, GNUNET_SYSERR on error
1026  */
1027 int
1028 GNUNET_STATISTICS_watch (struct GNUNET_STATISTICS_Handle *handle,
1029                          const char *subsystem,
1030                          const char *name,
1031                          GNUNET_STATISTICS_Iterator proc, 
1032                          void *proc_cls)
1033 {
1034   struct GNUNET_STATISTICS_WatchEntry *w;
1035
1036   if (handle == NULL) 
1037     return GNUNET_SYSERR;
1038   w = GNUNET_malloc (sizeof (struct GNUNET_STATISTICS_WatchEntry));
1039   w->subsystem = GNUNET_strdup (subsystem);
1040   w->name = GNUNET_strdup (name);
1041   w->proc = proc;
1042   w->proc_cls = proc_cls;
1043   GNUNET_array_append (handle->watches,
1044                        handle->watches_size,
1045                        w);
1046   schedule_watch_request (handle, w);
1047   return GNUNET_OK;
1048 }
1049
1050
1051 static void
1052 add_setter_action (struct GNUNET_STATISTICS_Handle *h,
1053                    const char *name,
1054                    int make_persistent,
1055                    uint64_t value, enum ActionType type)
1056 {
1057   struct GNUNET_STATISTICS_GetHandle *ai;
1058   size_t slen;
1059   size_t nlen;
1060   size_t nsize;
1061   int64_t delta;
1062   
1063   GNUNET_assert (h != NULL);
1064   GNUNET_assert (name != NULL);
1065   if (GNUNET_YES != try_connect (h))
1066     return;
1067   slen = strlen (h->subsystem) + 1;
1068   nlen = strlen (name) + 1;
1069   nsize = sizeof (struct GNUNET_STATISTICS_SetMessage) + slen + nlen;
1070   if (nsize >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
1071     {
1072       GNUNET_break (0);
1073       return;
1074     }
1075   ai = h->action_head;
1076   while (ai != NULL)
1077     {
1078       if ( (0 == strcmp (ai->subsystem, h->subsystem)) &&
1079            (0 == strcmp (ai->name, name)) &&
1080            ( (ai->type == ACTION_UPDATE) ||
1081              (ai->type == ACTION_SET) ) )
1082         {
1083           if (ai->type == ACTION_SET)
1084             {
1085               if (type == ACTION_UPDATE)
1086                 {
1087                   delta = (int64_t) value;
1088                   if (delta > 0) 
1089                     {
1090                       ai->value += delta;
1091                     }
1092                   else
1093                     {
1094                       if (ai->value < -delta)
1095                         ai->value = 0;
1096                       else
1097                         ai->value += delta;
1098                     }
1099                 }
1100               else
1101                 {
1102                   ai->value = value;
1103                 }
1104             }
1105           else
1106             {
1107               if (type == ACTION_UPDATE)
1108                 {
1109                   delta = (int64_t) value;
1110                   ai->value += delta;
1111                 }
1112               else
1113                 {
1114                   ai->value = value;
1115                   ai->type = type;
1116                 }
1117             }
1118           ai->timeout = GNUNET_TIME_relative_to_absolute (SET_TRANSMIT_TIMEOUT);
1119           ai->make_persistent = make_persistent;
1120           return;
1121         }
1122       ai = ai->next;
1123     }
1124   ai = GNUNET_malloc (sizeof (struct GNUNET_STATISTICS_GetHandle));
1125   ai->sh = h;
1126   ai->subsystem = GNUNET_strdup (h->subsystem);
1127   ai->name = GNUNET_strdup (name);
1128   ai->timeout = GNUNET_TIME_relative_to_absolute (SET_TRANSMIT_TIMEOUT);
1129   ai->make_persistent = make_persistent;
1130   ai->msize = nsize;
1131   ai->value = value;
1132   ai->type = type;
1133   insert_ai (h, ai);
1134 }
1135
1136
1137 /**
1138  * Set statistic value for the peer.  Will always use our
1139  * subsystem (the argument used when "handle" was created).
1140  *
1141  * @param handle identification of the statistics service
1142  * @param name name of the statistic value
1143  * @param value new value to set
1144  * @param make_persistent should the value be kept across restarts?
1145  */
1146 void
1147 GNUNET_STATISTICS_set (struct GNUNET_STATISTICS_Handle *handle,
1148                        const char *name,
1149                        uint64_t value, int make_persistent)
1150 {
1151   if (handle == NULL)
1152     return;
1153   GNUNET_assert (GNUNET_NO == handle->do_destroy);
1154   add_setter_action (handle, name, make_persistent, value, ACTION_SET);
1155 }
1156
1157
1158 /**
1159  * Set statistic value for the peer.  Will always use our
1160  * subsystem (the argument used when "handle" was created).
1161  *
1162  * @param handle identification of the statistics service
1163  * @param name name of the statistic value
1164  * @param delta change in value (added to existing value)
1165  * @param make_persistent should the value be kept across restarts?
1166  */
1167 void
1168 GNUNET_STATISTICS_update (struct GNUNET_STATISTICS_Handle *handle,
1169                           const char *name,
1170                           int64_t delta, int make_persistent)
1171 {
1172   if (handle == NULL)
1173     return;
1174   if (delta == 0)
1175     return;
1176   GNUNET_assert (GNUNET_NO == handle->do_destroy);
1177   add_setter_action (handle, name, make_persistent,
1178                      (uint64_t) delta, ACTION_UPDATE);
1179 }
1180
1181
1182 /* end of statistics_api.c */