ef9865e0243d18a2cbbdb279fb4ee7840cf6a313
[oweals/gnunet.git] / src / statistics / statistics_api.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009, 2010 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 2, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file statistics/statistics_api.c
23  * @brief API of the statistics service
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include "gnunet_client_lib.h"
28 #include "gnunet_constants.h"
29 #include "gnunet_container_lib.h"
30 #include "gnunet_protocols.h"
31 #include "gnunet_server_lib.h"
32 #include "gnunet_statistics_service.h"
33 #include "gnunet_strings_lib.h"
34 #include "statistics.h"
35
36 /**
37  * How long do we wait until a statistics request for setting
38  * a value times out?  (The update will be lost if the
39  * service does not react within this timeframe).  
40  */
41 #define SET_TRANSMIT_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS, 250)
42
43
44 /**
45  * Types of actions.
46  */
47 enum ActionType
48 {
49   ACTION_GET,
50   ACTION_SET,
51   ACTION_UPDATE,
52   ACTION_WATCH
53 };
54
55
56 /**
57  * Entry kept for each value we are watching.
58  */
59 struct GNUNET_STATISTICS_WatchEntry
60 {
61  
62   /**
63    * What subsystem is this action about? (never NULL)
64    */
65   char *subsystem;
66
67   /**
68    * What value is this action about? (never NULL)
69    */
70   char *name;
71
72   /**
73    * Function to call
74    */
75   GNUNET_STATISTICS_Iterator proc;
76
77   /**
78    * Closure for proc
79    */
80   void *proc_cls;
81
82 };
83
84
85 /**
86  * Linked list of things we still need to do.
87  */
88 struct GNUNET_STATISTICS_GetHandle
89 {
90
91   /**
92    * This is a doubly linked list.
93    */
94   struct GNUNET_STATISTICS_GetHandle *next;
95
96   /**
97    * This is a doubly linked list.
98    */
99   struct GNUNET_STATISTICS_GetHandle *prev;
100
101   /**
102    * Main statistics handle.
103    */
104   struct GNUNET_STATISTICS_Handle *sh;
105  
106   /**
107    * What subsystem is this action about? (can be NULL)
108    */
109   char *subsystem;
110
111   /**
112    * What value is this action about? (can be NULL)
113    */
114   char *name;
115
116   /**
117    * Continuation to call once action is complete.
118    */
119   GNUNET_STATISTICS_Callback cont;
120
121   /**
122    * Function to call (for GET actions only).
123    */
124   GNUNET_STATISTICS_Iterator proc;
125
126   /**
127    * Closure for proc and cont.
128    */
129   void *cls;
130
131   /**
132    * Timeout for this action.
133    */
134   struct GNUNET_TIME_Absolute timeout;
135
136   /**
137    * Associated value.
138    */
139   uint64_t value;
140
141   /**
142    * Flag for SET/UPDATE actions.
143    */
144   int make_persistent;
145
146   /**
147    * Has the current iteration been aborted; for GET actions.
148    */
149   int aborted;
150
151   /**
152    * Is this a GET, SET, UPDATE or WATCH?
153    */
154   enum ActionType type;
155
156   /**
157    * Size of the message that we will be transmitting.
158    */
159   uint16_t msize;
160
161 };
162
163
164 /**
165  * Handle for the service.
166  */
167 struct GNUNET_STATISTICS_Handle
168 {
169   /**
170    * Our scheduler.
171    */
172   struct GNUNET_SCHEDULER_Handle *sched;
173
174   /**
175    * Name of our subsystem.
176    */
177   char *subsystem;
178
179   /**
180    * Configuration to use.
181    */
182   const struct GNUNET_CONFIGURATION_Handle *cfg;
183
184   /**
185    * Socket (if available).
186    */
187   struct GNUNET_CLIENT_Connection *client;
188
189   /**
190    * Currently pending transmission request.
191    */
192   struct GNUNET_CLIENT_TransmitHandle *th;
193
194   /**
195    * Head of the linked list of pending actions (first action
196    * to be performed).
197    */
198   struct GNUNET_STATISTICS_GetHandle *action_head;
199
200   /**
201    * Tail of the linked list of actions (for fast append).
202    */
203   struct GNUNET_STATISTICS_GetHandle *action_tail;
204
205   /**
206    * Action we are currently busy with (action request has been
207    * transmitted, we're now receiving the response from the
208    * service).
209    */
210   struct GNUNET_STATISTICS_GetHandle *current;
211
212   /**
213    * Array of watch entries.
214    */
215   struct GNUNET_STATISTICS_WatchEntry **watches;
216
217   /**
218    * Task doing exponential back-off trying to reconnect.
219    */
220   GNUNET_SCHEDULER_TaskIdentifier backoff_task;
221
222   /**
223    * Time for next connect retry.
224    */
225   struct GNUNET_TIME_Relative backoff;
226
227   /**
228    * Size of the 'watches' array.
229    */
230   unsigned int watches_size;
231
232   /**
233    * Should this handle auto-destruct once all actions have
234    * been processed?
235    */
236   int do_destroy;
237
238   /**
239    * Are we currently receiving from the service?
240    */
241   int receiving;
242
243 };
244
245
246
247 /**
248  * Schedule the next action to be performed.
249  */
250 static void schedule_action (struct GNUNET_STATISTICS_Handle *h);
251
252 /**
253  * Try to (re)connect to the statistics service.
254  *
255  * @return GNUNET_YES on success, GNUNET_NO on failure.
256  */
257 static int
258 try_connect (struct GNUNET_STATISTICS_Handle *ret);
259
260
261 static void
262 insert_ai (struct GNUNET_STATISTICS_Handle *h, struct GNUNET_STATISTICS_GetHandle *ai)
263 {
264   GNUNET_CONTAINER_DLL_insert_after (h->action_head,
265                                      h->action_tail,
266                                      h->action_tail,
267                                      ai);                                    
268   if (h->action_head == ai)
269     schedule_action (h);
270 }
271
272
273 static void
274 schedule_watch_request (struct GNUNET_STATISTICS_Handle *h,
275                         struct GNUNET_STATISTICS_WatchEntry *watch)
276 {
277
278   struct GNUNET_STATISTICS_GetHandle *ai;
279   size_t slen;
280   size_t nlen;
281   size_t nsize;
282   
283   GNUNET_assert (h != NULL);
284   if (GNUNET_YES != try_connect (h))
285     {
286       schedule_action (h);
287       return;
288     }
289   slen = strlen (watch->subsystem) + 1;
290   nlen = strlen (watch->name) + 1;
291   nsize = sizeof (struct GNUNET_MessageHeader) + slen + nlen;
292   if (nsize >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
293     {
294       GNUNET_break (0);
295       return;
296     }
297   ai = GNUNET_malloc (sizeof (struct GNUNET_STATISTICS_GetHandle));
298   ai->sh = h;
299   ai->subsystem = GNUNET_strdup (watch->subsystem);
300   ai->name = GNUNET_strdup (watch->name);
301   ai->timeout = GNUNET_TIME_UNIT_FOREVER_ABS;
302   ai->msize = nsize;
303   ai->type = ACTION_WATCH;
304   ai->proc = watch->proc;
305   ai->cls = watch->proc_cls;
306   insert_ai (h, ai);
307 }
308
309
310 /**
311  * Try to (re)connect to the statistics service.
312  *
313  * @return GNUNET_YES on success, GNUNET_NO on failure.
314  */
315 static int
316 try_connect (struct GNUNET_STATISTICS_Handle *ret)
317 {
318   unsigned int i;
319   if (ret->client != NULL)
320     return GNUNET_YES;
321   ret->client = GNUNET_CLIENT_connect (ret->sched, "statistics", ret->cfg);
322   if (ret->client != NULL)
323     {
324       for (i=0;i<ret->watches_size;i++)
325         schedule_watch_request (ret, ret->watches[i]);
326       return GNUNET_YES;
327     }
328 #if DEBUG_STATISTICS
329   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
330               _("Failed to connect to statistics service!\n"));
331 #endif
332   return GNUNET_NO;
333 }
334
335
336 /**
337  * Free memory associated with the given action item.
338  */
339 static void
340 free_action_item (struct GNUNET_STATISTICS_GetHandle *ai)
341 {
342   GNUNET_free_non_null (ai->subsystem);
343   GNUNET_free_non_null (ai->name);
344   GNUNET_free (ai);
345 }
346
347
348 /**
349  * GET processing is complete, tell client about it.
350  */
351 static void
352 finish (struct GNUNET_STATISTICS_Handle *h, int code)
353 {
354   struct GNUNET_STATISTICS_GetHandle *pos = h->current;
355   h->current = NULL;
356   schedule_action (h);
357   if (pos != NULL)
358     {
359       if (pos->cont != NULL)
360         pos->cont (pos->cls, code);
361       free_action_item (pos);
362     }
363 }
364
365
366 /**
367  * Process the message.
368  *
369  * @return GNUNET_OK if the message was well-formed
370  */
371 static int
372 process_message (struct GNUNET_STATISTICS_Handle *h,
373                  const struct GNUNET_MessageHeader *msg)
374 {
375   char *service;
376   char *name;
377   const struct GNUNET_STATISTICS_ReplyMessage *smsg;
378   uint16_t size;
379
380   if (h->current->aborted)
381     {
382 #if DEBUG_STATISTICS
383       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
384                   "Iteration was aborted, ignoring VALUE\n");
385 #endif      
386       return GNUNET_OK;           /* don't bother */
387     }
388   size = ntohs (msg->size);
389   if (size < sizeof (struct GNUNET_STATISTICS_ReplyMessage))
390     {
391       GNUNET_break (0);
392       return GNUNET_SYSERR;
393     }
394   smsg = (const struct GNUNET_STATISTICS_ReplyMessage *) msg;
395   size -= sizeof (struct GNUNET_STATISTICS_ReplyMessage);
396   if (size != GNUNET_STRINGS_buffer_tokenize ((const char *) &smsg[1],
397                                               size, 2, &service, &name))
398     {
399       GNUNET_break (0);
400       return GNUNET_SYSERR;
401     }
402 #if DEBUG_STATISTICS
403   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
404               "Received valid statistic on `%s:%s': %llu\n",
405               service, name, GNUNET_ntohll (smsg->value));
406 #endif
407   if (GNUNET_OK !=
408       h->current->proc (h->current->cls,
409                         service,
410                         name,
411                         GNUNET_ntohll (smsg->value),
412                         0 !=
413                         (ntohl (smsg->uid) & GNUNET_STATISTICS_PERSIST_BIT)))
414     {
415 #if DEBUG_STATISTICS
416       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
417                   "Processing of remaining statistics aborted by client.\n");
418 #endif
419       h->current->aborted = GNUNET_YES;    
420     }
421 #if DEBUG_STATISTICS
422   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
423               "VALUE processed successfully\n");
424 #endif      
425   return GNUNET_OK;
426 }
427
428
429 static int
430 process_watch_value (struct GNUNET_STATISTICS_Handle *h,
431                      const struct GNUNET_MessageHeader *msg)
432 {
433   const struct GNUNET_STATISTICS_WatchValueMessage *wvm;
434   struct GNUNET_STATISTICS_WatchEntry *w;
435   uint32_t wid;
436
437   if (sizeof(struct GNUNET_STATISTICS_WatchValueMessage) !=
438       ntohs (msg->size))
439     {
440       GNUNET_break (0);
441       return GNUNET_SYSERR;
442     }
443   wvm = (const struct GNUNET_STATISTICS_WatchValueMessage *)msg;
444   GNUNET_break (0 == ntohl (wvm->reserved));
445   wid = ntohl (wvm->wid);
446   if (wid >= h->watches_size)
447     {
448       GNUNET_break (0);
449       return GNUNET_SYSERR;
450     }
451   w = h->watches[wid];
452   (void) w->proc (w->proc_cls,
453                   w->subsystem,
454                   w->name,
455                   GNUNET_ntohll (wvm->value),
456                   0 !=
457                   (ntohl (wvm->flags) & GNUNET_STATISTICS_PERSIST_BIT));
458   return GNUNET_OK;
459 }
460
461
462 /**
463  * Function called with messages from stats service.
464  *
465  * @param cls closure
466  * @param msg message received, NULL on timeout or fatal error
467  */
468 static void
469 receive_stats (void *cls, const struct GNUNET_MessageHeader *msg)
470 {
471   struct GNUNET_STATISTICS_Handle *h = cls;
472
473   if (msg == NULL)
474     {
475       if (NULL != h->client)
476         {
477           GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
478           h->client = NULL;
479         }
480 #if DEBUG_STATISTICS
481       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
482                   "Error receiving statistics from service, is the service running?\n" );
483 #endif
484       finish (h, GNUNET_SYSERR);
485       return;
486     }
487   switch (ntohs (msg->type))
488     {
489     case GNUNET_MESSAGE_TYPE_STATISTICS_END:
490 #if DEBUG_STATISTICS
491       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
492                   "Received end of statistics marker\n");
493 #endif
494       h->backoff = GNUNET_TIME_UNIT_MILLISECONDS;
495       if (h->watches_size > 0)
496         {
497           GNUNET_CLIENT_receive (h->client,
498                                  &receive_stats,
499                                  h,
500                                  GNUNET_TIME_UNIT_FOREVER_REL);
501         }
502       else
503         {
504           h->receiving = GNUNET_NO;
505         }
506       finish (h, GNUNET_OK);
507       return;
508     case GNUNET_MESSAGE_TYPE_STATISTICS_VALUE:
509       if (GNUNET_OK == process_message (h, msg))
510         {
511           /* finally, look for more! */
512 #if DEBUG_STATISTICS
513           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
514                       "Processing VALUE done, now reading more\n");
515 #endif      
516           GNUNET_CLIENT_receive (h->client,
517                                  &receive_stats,
518                                  h,
519                                  GNUNET_TIME_absolute_get_remaining
520                                  (h->current->timeout));
521           h->backoff = GNUNET_TIME_UNIT_MILLISECONDS;
522           return;
523         }
524       GNUNET_break (0);
525       break;
526     case GNUNET_MESSAGE_TYPE_STATISTICS_WATCH_VALUE:
527       if (GNUNET_OK ==
528           process_watch_value (h, 
529                                msg))
530         {
531           h->backoff = GNUNET_TIME_UNIT_MILLISECONDS;
532           GNUNET_assert (h->watches_size > 0);
533           GNUNET_CLIENT_receive (h->client,
534                                  &receive_stats,
535                                  h,
536                                  GNUNET_TIME_UNIT_FOREVER_REL);
537           return;
538         }
539       GNUNET_break (0);
540       break;
541     default:
542       GNUNET_break (0);
543       break;
544     }
545   if (NULL != h->client)
546     {
547       GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
548       h->client = NULL;
549     }
550   finish (h, GNUNET_SYSERR);
551 }
552
553
554 /**
555  * Transmit a GET request (and if successful, start to receive
556  * the response).
557  */
558 static size_t
559 transmit_get (struct GNUNET_STATISTICS_Handle *handle, size_t size, void *buf)
560 {
561   struct GNUNET_MessageHeader *hdr;
562   size_t slen1;
563   size_t slen2;
564   uint16_t msize;
565
566   if (buf == NULL)
567     {
568       /* timeout / error */
569 #if DEBUG_STATISTICS
570       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
571                   "Transmission of request for statistics failed!\n");
572 #endif
573       finish (handle, GNUNET_SYSERR);
574       return 0;
575     }
576   slen1 = strlen (handle->current->subsystem) + 1;
577   slen2 = strlen (handle->current->name) + 1;
578   msize = slen1 + slen2 + sizeof (struct GNUNET_MessageHeader);
579   GNUNET_assert (msize <= size);
580   hdr = (struct GNUNET_MessageHeader *) buf;
581   hdr->size = htons (msize);
582   hdr->type = htons (GNUNET_MESSAGE_TYPE_STATISTICS_GET);
583   GNUNET_assert (slen1 + slen2 ==
584                  GNUNET_STRINGS_buffer_fill ((char *) &hdr[1],
585                                              slen1 + slen2,
586                                              2,
587                                              handle->current->subsystem,
588                                              handle->current->name));
589   if (! handle->receiving)
590     {
591 #if DEBUG_STATISTICS
592       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
593                   "Transmission of GET done, now reading response\n");
594 #endif      
595       handle->receiving = GNUNET_YES;
596       GNUNET_CLIENT_receive (handle->client,
597                              &receive_stats,
598                              handle,
599                              GNUNET_TIME_absolute_get_remaining (handle->
600                                                                  current->timeout));
601     }
602   return msize;
603 }
604
605
606 /**
607  * Transmit a WATCH request (and if successful, start to receive
608  * the response).
609  */
610 static size_t
611 transmit_watch (struct GNUNET_STATISTICS_Handle *handle, size_t size, void *buf)
612 {
613   struct GNUNET_MessageHeader *hdr;
614   size_t slen1;
615   size_t slen2;
616   uint16_t msize;
617
618   if (buf == NULL)
619     {
620       /* timeout / error */
621 #if DEBUG_STATISTICS
622       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
623                   "Transmission of request for statistics failed!\n");
624 #endif
625       finish (handle, GNUNET_SYSERR);
626       return 0;
627     }
628   slen1 = strlen (handle->current->subsystem) + 1;
629   slen2 = strlen (handle->current->name) + 1;
630   msize = slen1 + slen2 + sizeof (struct GNUNET_MessageHeader);
631   GNUNET_assert (msize <= size);
632   hdr = (struct GNUNET_MessageHeader *) buf;
633   hdr->size = htons (msize);
634   hdr->type = htons (GNUNET_MESSAGE_TYPE_STATISTICS_WATCH);
635   GNUNET_assert (slen1 + slen2 ==
636                  GNUNET_STRINGS_buffer_fill ((char *) &hdr[1],
637                                              slen1 + slen2,
638                                              2,
639                                              handle->current->subsystem,
640                                              handle->current->name));
641   if (! handle->receiving)
642     {
643       handle->receiving = GNUNET_YES;
644       GNUNET_CLIENT_receive (handle->client,
645                              &receive_stats,
646                              handle,
647                              GNUNET_TIME_UNIT_FOREVER_REL);
648     }
649   return msize;
650 }
651
652
653 /**
654  * Transmit a SET/UPDATE request.
655  */
656 static size_t
657 transmit_set (struct GNUNET_STATISTICS_Handle *handle, size_t size, void *buf)
658 {
659   struct GNUNET_STATISTICS_SetMessage *r;
660   size_t slen;
661   size_t nlen;
662   size_t nsize;
663
664   if (NULL == buf)
665     {
666       finish (handle, GNUNET_SYSERR);
667       return 0;
668     }
669
670   slen = strlen (handle->current->subsystem) + 1;
671   nlen = strlen (handle->current->name) + 1;
672   nsize = sizeof (struct GNUNET_STATISTICS_SetMessage) + slen + nlen;
673   if (size < nsize)
674     {
675       GNUNET_break (0);
676       finish (handle, GNUNET_SYSERR);
677       return 0;
678     }
679   r = buf;
680   r->header.size = htons (nsize);
681   r->header.type = htons (GNUNET_MESSAGE_TYPE_STATISTICS_SET);
682   r->flags = 0;
683   r->value = GNUNET_htonll (handle->current->value);
684   if (handle->current->make_persistent)
685     r->flags |= htonl (GNUNET_STATISTICS_SETFLAG_PERSISTENT);
686   if (handle->current->type == ACTION_UPDATE)
687     r->flags |= htonl (GNUNET_STATISTICS_SETFLAG_RELATIVE);
688   GNUNET_assert (slen + nlen ==
689                  GNUNET_STRINGS_buffer_fill ((char *) &r[1],
690                                              slen + nlen,
691                                              2,
692                                              handle->current->subsystem,
693                                              handle->current->name));
694   finish (handle, GNUNET_OK);
695   return nsize;
696 }
697
698
699 static size_t
700 transmit_action (void *cls, size_t size, void *buf)
701 {
702   struct GNUNET_STATISTICS_Handle *handle = cls;
703   size_t ret;
704
705   handle->th = NULL;
706   switch (handle->current->type)
707     {
708     case ACTION_GET:
709       ret = transmit_get (handle, size, buf);
710       break;
711     case ACTION_SET:
712     case ACTION_UPDATE:
713       ret = transmit_set (handle, size, buf);
714       break;
715     case ACTION_WATCH:
716       ret = transmit_watch (handle, size, buf);
717       break;
718     default:
719       ret = 0;
720       GNUNET_break (0);
721       break; 
722     }
723   return ret;
724 }
725
726
727 /**
728  * Get handle for the statistics service.
729  *
730  * @param sched scheduler to use
731  * @param subsystem name of subsystem using the service
732  * @param cfg services configuration in use
733  * @return handle to use
734  */
735 struct GNUNET_STATISTICS_Handle *
736 GNUNET_STATISTICS_create (struct GNUNET_SCHEDULER_Handle *sched,
737                           const char *subsystem,
738                           const struct GNUNET_CONFIGURATION_Handle *cfg)
739 {
740   struct GNUNET_STATISTICS_Handle *ret;
741
742   GNUNET_assert (subsystem != NULL);
743   GNUNET_assert (sched != NULL);
744   GNUNET_assert (cfg != NULL);
745   ret = GNUNET_malloc (sizeof (struct GNUNET_STATISTICS_Handle));
746   ret->sched = sched;
747   ret->cfg = cfg;
748   ret->subsystem = GNUNET_strdup (subsystem);
749   ret->backoff = GNUNET_TIME_UNIT_MILLISECONDS;
750   try_connect (ret);
751   return ret;
752 }
753
754
755 /**
756  * Destroy a handle (free all state associated with
757  * it).
758  *
759  * @param h statistics handle to destroy
760  * @param sync_first set to GNUNET_YES if pending SET requests should
761  *        be completed
762  */
763 void
764 GNUNET_STATISTICS_destroy (struct GNUNET_STATISTICS_Handle *h,
765                            int sync_first)
766 {
767   struct GNUNET_STATISTICS_GetHandle *pos;
768   struct GNUNET_STATISTICS_GetHandle *next;
769   struct GNUNET_STATISTICS_GetHandle *prev;
770   struct GNUNET_TIME_Relative timeout;
771   int i;
772
773   if (GNUNET_SCHEDULER_NO_TASK != h->backoff_task)
774     GNUNET_SCHEDULER_cancel (h->sched,
775                              h->backoff_task);
776   if (sync_first)
777     {
778       if (h->current != NULL)
779         {
780           if (h->current->type == ACTION_GET)
781             {
782               GNUNET_CLIENT_notify_transmit_ready_cancel (h->th);
783               h->th = NULL;
784               free_action_item (h->current);
785               h->current = NULL;
786             }
787         }
788       pos = h->action_head;
789       prev = NULL;
790       while (pos != NULL)
791         {
792           next = pos->next;
793           if (pos->type == ACTION_GET)
794             {
795               if (prev == NULL)
796                 h->action_head = next;
797               else
798                 prev->next = next;
799               free_action_item (pos);
800             }
801           else
802             {
803               prev = pos;
804             }
805           pos = next;
806         }
807       h->action_tail = prev;
808       if (h->current == NULL)
809         {
810           h->current = h->action_head;
811           if (h->action_head != NULL)
812             {
813               h->action_head = h->action_head->next;
814               if (h->action_head == NULL)
815                 h->action_tail = NULL;
816             }
817         }
818       h->do_destroy = GNUNET_YES;
819       if ( (h->current != NULL) &&
820            (h->th == NULL) )
821         {                                       
822           timeout = GNUNET_TIME_absolute_get_remaining (h->current->timeout);
823           h->th = GNUNET_CLIENT_notify_transmit_ready (h->client,
824                                                        h->current->msize,
825                                                        timeout,
826                                                        GNUNET_YES,
827                                                        &transmit_action, h);
828           GNUNET_assert (NULL != h->th);
829         }
830       if (h->th != NULL)
831         return;
832     }
833   if (NULL != h->th)
834     {
835       GNUNET_CLIENT_notify_transmit_ready_cancel (h->th);
836       h->th = NULL;
837     }
838   if (h->current != NULL)
839     free_action_item (h->current);
840   while (NULL != (pos = h->action_head))
841     {
842       h->action_head = pos->next;
843       free_action_item (pos);
844     }
845   if (h->client != NULL)
846     {
847       GNUNET_CLIENT_disconnect (h->client, GNUNET_YES);
848       h->client = NULL;
849     }
850   for (i=0;i<h->watches_size;i++)
851     {
852       GNUNET_free (h->watches[i]->subsystem);
853       GNUNET_free (h->watches[i]->name);
854       GNUNET_free (h->watches[i]);
855     }
856   GNUNET_array_grow (h->watches,
857                      h->watches_size,
858                      0);
859   GNUNET_free (h->subsystem);
860   GNUNET_free (h);
861 }
862
863
864 static void
865 finish_task (void *cls,
866              const struct GNUNET_SCHEDULER_TaskContext *tc)
867 {
868   struct GNUNET_STATISTICS_Handle *h = cls;
869
870   h->backoff_task = GNUNET_SCHEDULER_NO_TASK;
871   finish (h, GNUNET_SYSERR);
872 }
873
874
875 /**
876  * Schedule the next action to be performed.
877  */
878 static void
879 schedule_action (struct GNUNET_STATISTICS_Handle *h)
880 {
881   struct GNUNET_TIME_Relative timeout;
882
883   if (h->current != NULL)
884     return;                     /* action already pending */
885   if (GNUNET_YES != try_connect (h))
886     {
887       h->backoff_task = GNUNET_SCHEDULER_add_delayed (h->sched,
888                                                       h->backoff,
889                                                       &finish_task,
890                                                       h);
891       h->backoff = GNUNET_TIME_relative_multiply (h->backoff, 2);
892       h->backoff = GNUNET_TIME_relative_min (h->backoff,
893                                              GNUNET_CONSTANTS_SERVICE_TIMEOUT);
894       return;
895     }
896
897   /* schedule next action */
898   h->current = h->action_head;
899   if (NULL == h->current)
900     {
901       if (h->do_destroy)
902         {
903           h->do_destroy = GNUNET_NO;
904           GNUNET_STATISTICS_destroy (h, GNUNET_YES);
905         }
906       return;
907     }
908   GNUNET_CONTAINER_DLL_remove (h->action_head,
909                                h->action_tail,
910                                h->current);
911   timeout = GNUNET_TIME_absolute_get_remaining (h->current->timeout);
912   if (NULL ==
913       (h->th = GNUNET_CLIENT_notify_transmit_ready (h->client,
914                                                     h->current->msize,
915                                                     timeout,
916                                                     GNUNET_YES,
917                                                     &transmit_action, h)))
918     {
919 #if DEBUG_STATISTICS
920       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
921                   "Failed to transmit request to statistics service.\n");
922 #endif
923       finish (h, GNUNET_SYSERR);
924     }
925 }
926
927 /**
928  * Get statistic from the peer.
929  *
930  * @param handle identification of the statistics service
931  * @param subsystem limit to the specified subsystem, NULL for our subsystem
932  * @param name name of the statistic value, NULL for all values
933  * @param timeout after how long should we give up (and call
934  *        cont with an error code)?
935  * @param cont continuation to call when done (can be NULL)
936  * @param proc function to call on each value
937  * @param cls closure for cont and proc
938  * @return NULL on error
939  */
940 struct GNUNET_STATISTICS_GetHandle *
941 GNUNET_STATISTICS_get (struct GNUNET_STATISTICS_Handle *handle,
942                        const char *subsystem,
943                        const char *name,
944                        struct GNUNET_TIME_Relative timeout,
945                        GNUNET_STATISTICS_Callback cont,
946                        GNUNET_STATISTICS_Iterator proc, void *cls)
947 {
948   size_t slen1;
949   size_t slen2;
950   struct GNUNET_STATISTICS_GetHandle *ai;
951
952   GNUNET_assert (handle != NULL);
953   GNUNET_assert (proc != NULL);
954   GNUNET_assert (GNUNET_NO == handle->do_destroy);
955   if (GNUNET_YES != try_connect (handle))
956     {
957 #if DEBUG_STATISTICS
958       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
959                   "Failed to connect to statistics service, can not get value `%s:%s'.\n",
960                   strlen (subsystem) ? subsystem : "*",
961                   strlen (name) ? name : "*");
962 #endif
963       return NULL;
964     }
965   if (subsystem == NULL)
966     subsystem = "";
967   if (name == NULL)
968     name = "";
969   slen1 = strlen (subsystem) + 1;
970   slen2 = strlen (name) + 1;
971   GNUNET_assert (slen1 + slen2 + sizeof (struct GNUNET_MessageHeader) <
972                  GNUNET_SERVER_MAX_MESSAGE_SIZE);
973   ai = GNUNET_malloc (sizeof (struct GNUNET_STATISTICS_GetHandle));
974   ai->sh = handle;
975   ai->subsystem = GNUNET_strdup (subsystem);
976   ai->name = GNUNET_strdup (name);
977   ai->cont = cont;
978   ai->proc = proc;
979   ai->cls = cls;
980   ai->timeout = GNUNET_TIME_relative_to_absolute (timeout);
981   ai->type = ACTION_GET;
982   ai->msize = slen1 + slen2 + sizeof (struct GNUNET_MessageHeader);
983   insert_ai (handle, ai);
984   return ai;
985 }
986
987
988 /**
989  * Cancel a 'get' request.  Must be called before the 'cont' 
990  * function is called.
991  *
992  * @param gh handle of the request to cancel
993  */
994 void
995 GNUNET_STATISTICS_get_cancel (struct GNUNET_STATISTICS_GetHandle *gh)
996 {
997   if (gh->sh->current == gh)
998     {
999       gh->aborted = GNUNET_YES;
1000     }
1001   else
1002     {
1003       GNUNET_CONTAINER_DLL_remove (gh->sh->action_head,
1004                                    gh->sh->action_tail,
1005                                    gh);
1006       GNUNET_free (gh->name);
1007       GNUNET_free (gh->subsystem);
1008       GNUNET_free (gh);
1009     }
1010 }
1011
1012
1013 /**
1014  * Watch statistics from the peer (be notified whenever they change).
1015  * Note that the only way to cancel a "watch" request is to destroy
1016  * the statistics handle given as the first argument to this call.
1017  *
1018  * @param handle identification of the statistics service
1019  * @param subsystem limit to the specified subsystem, never NULL
1020  * @param name name of the statistic value, never NULL
1021  * @param proc function to call on each value
1022  * @param proc_cls closure for proc
1023  * @return GNUNET_OK on success, GNUNET_SYSERR on error
1024  */
1025 int
1026 GNUNET_STATISTICS_watch (struct GNUNET_STATISTICS_Handle *handle,
1027                          const char *subsystem,
1028                          const char *name,
1029                          GNUNET_STATISTICS_Iterator proc, 
1030                          void *proc_cls)
1031 {
1032   struct GNUNET_STATISTICS_WatchEntry *w;
1033
1034   w = GNUNET_malloc (sizeof (struct GNUNET_STATISTICS_WatchEntry));
1035   w->subsystem = GNUNET_strdup (subsystem);
1036   w->name = GNUNET_strdup (name);
1037   w->proc = proc;
1038   w->proc_cls = proc_cls;
1039   GNUNET_array_append (handle->watches,
1040                        handle->watches_size,
1041                        w);
1042   schedule_watch_request (handle, w);
1043   return GNUNET_OK;
1044 }
1045
1046
1047 static void
1048 add_setter_action (struct GNUNET_STATISTICS_Handle *h,
1049                    const char *name,
1050                    int make_persistent,
1051                    uint64_t value, enum ActionType type)
1052 {
1053   struct GNUNET_STATISTICS_GetHandle *ai;
1054   size_t slen;
1055   size_t nlen;
1056   size_t nsize;
1057   int64_t delta;
1058   
1059   GNUNET_assert (h != NULL);
1060   GNUNET_assert (name != NULL);
1061   if (GNUNET_YES != try_connect (h))
1062     return;
1063   slen = strlen (h->subsystem) + 1;
1064   nlen = strlen (name) + 1;
1065   nsize = sizeof (struct GNUNET_STATISTICS_SetMessage) + slen + nlen;
1066   if (nsize >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
1067     {
1068       GNUNET_break (0);
1069       return;
1070     }
1071   ai = h->action_head;
1072   while (ai != NULL)
1073     {
1074       if ( (0 == strcmp (ai->subsystem, h->subsystem)) &&
1075            (0 == strcmp (ai->name, name)) &&
1076            ( (ai->type == ACTION_UPDATE) ||
1077              (ai->type == ACTION_SET) ) )
1078         {
1079           if (ai->type == ACTION_SET)
1080             {
1081               if (type == ACTION_UPDATE)
1082                 {
1083                   delta = (int64_t) value;
1084                   if (delta > 0) 
1085                     {
1086                       ai->value += delta;
1087                     }
1088                   else
1089                     {
1090                       if (ai->value < -delta)
1091                         ai->value = 0;
1092                       else
1093                         ai->value += delta;
1094                     }
1095                 }
1096               else
1097                 {
1098                   ai->value = value;
1099                 }
1100             }
1101           else
1102             {
1103               if (type == ACTION_UPDATE)
1104                 {
1105                   delta = (int64_t) value;
1106                   ai->value += delta;
1107                 }
1108               else
1109                 {
1110                   ai->value = value;
1111                   ai->type = type;
1112                 }
1113             }
1114           ai->timeout = GNUNET_TIME_relative_to_absolute (SET_TRANSMIT_TIMEOUT);
1115           ai->make_persistent = make_persistent;
1116           return;
1117         }
1118       ai = ai->next;
1119     }
1120   ai = GNUNET_malloc (sizeof (struct GNUNET_STATISTICS_GetHandle));
1121   ai->sh = h;
1122   ai->subsystem = GNUNET_strdup (h->subsystem);
1123   ai->name = GNUNET_strdup (name);
1124   ai->timeout = GNUNET_TIME_relative_to_absolute (SET_TRANSMIT_TIMEOUT);
1125   ai->make_persistent = make_persistent;
1126   ai->msize = nsize;
1127   ai->value = value;
1128   ai->type = type;
1129   insert_ai (h, ai);
1130 }
1131
1132
1133 /**
1134  * Set statistic value for the peer.  Will always use our
1135  * subsystem (the argument used when "handle" was created).
1136  *
1137  * @param handle identification of the statistics service
1138  * @param name name of the statistic value
1139  * @param value new value to set
1140  * @param make_persistent should the value be kept across restarts?
1141  */
1142 void
1143 GNUNET_STATISTICS_set (struct GNUNET_STATISTICS_Handle *handle,
1144                        const char *name,
1145                        uint64_t value, int make_persistent)
1146 {
1147   if (handle == NULL)
1148     return;
1149   GNUNET_assert (GNUNET_NO == handle->do_destroy);
1150   add_setter_action (handle, name, make_persistent, value, ACTION_SET);
1151 }
1152
1153
1154 /**
1155  * Set statistic value for the peer.  Will always use our
1156  * subsystem (the argument used when "handle" was created).
1157  *
1158  * @param handle identification of the statistics service
1159  * @param name name of the statistic value
1160  * @param delta change in value (added to existing value)
1161  * @param make_persistent should the value be kept across restarts?
1162  */
1163 void
1164 GNUNET_STATISTICS_update (struct GNUNET_STATISTICS_Handle *handle,
1165                           const char *name,
1166                           int64_t delta, int make_persistent)
1167 {
1168   if (handle == NULL)
1169     return;
1170   if (delta == 0)
1171     return;
1172   GNUNET_assert (GNUNET_NO == handle->do_destroy);
1173   add_setter_action (handle, name, make_persistent,
1174                      (uint64_t) delta, ACTION_UPDATE);
1175 }
1176
1177
1178 /* end of statistics_api.c */