-fix order
[oweals/gnunet.git] / src / statistics / statistics_api.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009, 2010, 2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 2, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file statistics/statistics_api.c
23  * @brief API of the statistics service
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include "gnunet_client_lib.h"
28 #include "gnunet_constants.h"
29 #include "gnunet_container_lib.h"
30 #include "gnunet_protocols.h"
31 #include "gnunet_server_lib.h"
32 #include "gnunet_statistics_service.h"
33 #include "gnunet_strings_lib.h"
34 #include "statistics.h"
35
36 /**
37  * How long do we wait until a statistics request for setting
38  * a value times out?  (The update will be lost if the
39  * service does not react within this timeframe).
40  */
41 #define SET_TRANSMIT_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 2)
42
43 #define LOG(kind,...) GNUNET_log_from (kind, "statistics-api",__VA_ARGS__)
44
45 /**
46  * Types of actions.
47  */
48 enum ActionType
49 {
50   /**
51    * Get a value.
52    */
53   ACTION_GET,
54
55   /**
56    * Set a value.
57    */
58   ACTION_SET,
59
60   /**
61    * Update a value.
62    */
63   ACTION_UPDATE,
64
65   /**
66    * Watch a value.
67    */
68   ACTION_WATCH
69 };
70
71
72 /**
73  * Entry kept for each value we are watching.
74  */
75 struct GNUNET_STATISTICS_WatchEntry
76 {
77
78   /**
79    * What subsystem is this action about? (never NULL)
80    */
81   char *subsystem;
82
83   /**
84    * What value is this action about? (never NULL)
85    */
86   char *name;
87
88   /**
89    * Function to call
90    */
91   GNUNET_STATISTICS_Iterator proc;
92
93   /**
94    * Closure for proc
95    */
96   void *proc_cls;
97
98 };
99
100
101 /**
102  * Linked list of things we still need to do.
103  */
104 struct GNUNET_STATISTICS_GetHandle
105 {
106
107   /**
108    * This is a doubly linked list.
109    */
110   struct GNUNET_STATISTICS_GetHandle *next;
111
112   /**
113    * This is a doubly linked list.
114    */
115   struct GNUNET_STATISTICS_GetHandle *prev;
116
117   /**
118    * Main statistics handle.
119    */
120   struct GNUNET_STATISTICS_Handle *sh;
121
122   /**
123    * What subsystem is this action about? (can be NULL)
124    */
125   char *subsystem;
126
127   /**
128    * What value is this action about? (can be NULL)
129    */
130   char *name;
131
132   /**
133    * Continuation to call once action is complete.
134    */
135   GNUNET_STATISTICS_Callback cont;
136
137   /**
138    * Function to call (for GET actions only).
139    */
140   GNUNET_STATISTICS_Iterator proc;
141
142   /**
143    * Closure for proc and cont.
144    */
145   void *cls;
146
147   /**
148    * Timeout for this action.
149    */
150   struct GNUNET_TIME_Absolute timeout;
151
152   /**
153    * Associated value.
154    */
155   uint64_t value;
156
157   /**
158    * Flag for SET/UPDATE actions.
159    */
160   int make_persistent;
161
162   /**
163    * Has the current iteration been aborted; for GET actions.
164    */
165   int aborted;
166
167   /**
168    * Is this a GET, SET, UPDATE or WATCH?
169    */
170   enum ActionType type;
171
172   /**
173    * Size of the message that we will be transmitting.
174    */
175   uint16_t msize;
176
177 };
178
179
180 /**
181  * Handle for the service.
182  */
183 struct GNUNET_STATISTICS_Handle
184 {
185   /**
186    * Name of our subsystem.
187    */
188   char *subsystem;
189
190   /**
191    * Configuration to use.
192    */
193   const struct GNUNET_CONFIGURATION_Handle *cfg;
194
195   /**
196    * Socket (if available).
197    */
198   struct GNUNET_CLIENT_Connection *client;
199
200   /**
201    * Currently pending transmission request.
202    */
203   struct GNUNET_CLIENT_TransmitHandle *th;
204
205   /**
206    * Head of the linked list of pending actions (first action
207    * to be performed).
208    */
209   struct GNUNET_STATISTICS_GetHandle *action_head;
210
211   /**
212    * Tail of the linked list of actions (for fast append).
213    */
214   struct GNUNET_STATISTICS_GetHandle *action_tail;
215
216   /**
217    * Action we are currently busy with (action request has been
218    * transmitted, we're now receiving the response from the
219    * service).
220    */
221   struct GNUNET_STATISTICS_GetHandle *current;
222
223   /**
224    * Array of watch entries.
225    */
226   struct GNUNET_STATISTICS_WatchEntry **watches;
227
228   /**
229    * Task doing exponential back-off trying to reconnect.
230    */
231   GNUNET_SCHEDULER_TaskIdentifier backoff_task;
232
233   /**
234    * Time for next connect retry.
235    */
236   struct GNUNET_TIME_Relative backoff;
237
238   /**
239    * Size of the 'watches' array.
240    */
241   unsigned int watches_size;
242
243   /**
244    * Should this handle auto-destruct once all actions have
245    * been processed?
246    */
247   int do_destroy;
248
249   /**
250    * Are we currently receiving from the service?
251    */
252   int receiving;
253
254 };
255
256
257 /**
258  * Schedule the next action to be performed.
259  *
260  * @param h statistics handle to reconnect
261  */
262 static void
263 schedule_action (struct GNUNET_STATISTICS_Handle *h);
264
265
266 /**
267  * Transmit request to service that we want to watch
268  * the development of a particular value.
269  *
270  * @param h statistics handle
271  * @param watch watch entry of the value to watch
272  */
273 static void
274 schedule_watch_request (struct GNUNET_STATISTICS_Handle *h,
275                         struct GNUNET_STATISTICS_WatchEntry *watch)
276 {
277
278   struct GNUNET_STATISTICS_GetHandle *ai;
279   size_t slen;
280   size_t nlen;
281   size_t nsize;
282
283   GNUNET_assert (h != NULL);
284   slen = strlen (watch->subsystem) + 1;
285   nlen = strlen (watch->name) + 1;
286   nsize = sizeof (struct GNUNET_MessageHeader) + slen + nlen;
287   if (nsize >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
288   {
289     GNUNET_break (0);
290     return;
291   }
292   ai = GNUNET_malloc (sizeof (struct GNUNET_STATISTICS_GetHandle));
293   ai->sh = h;
294   ai->subsystem = GNUNET_strdup (watch->subsystem);
295   ai->name = GNUNET_strdup (watch->name);
296   ai->timeout = GNUNET_TIME_UNIT_FOREVER_ABS;
297   ai->msize = nsize;
298   ai->type = ACTION_WATCH;
299   ai->proc = watch->proc;
300   ai->cls = watch->proc_cls;
301   GNUNET_CONTAINER_DLL_insert_tail (h->action_head, h->action_tail,
302                                     ai);
303   schedule_action (h);
304 }
305
306
307 /**
308  * Free memory associated with the given action item.
309  *
310  * @param gh action item to free
311  */
312 static void
313 free_action_item (struct GNUNET_STATISTICS_GetHandle *gh)
314 {
315   GNUNET_free_non_null (gh->subsystem);
316   GNUNET_free_non_null (gh->name);
317   GNUNET_free (gh);
318 }
319
320
321 /**
322  * Disconnect from the statistics service.
323  *
324  * @param h statistics handle to disconnect from
325  */
326 static void
327 do_disconnect (struct GNUNET_STATISTICS_Handle *h)
328 {
329   struct GNUNET_STATISTICS_GetHandle *c;
330   
331   if (NULL != h->th)
332   {
333     GNUNET_CLIENT_notify_transmit_ready_cancel (h->th);
334     h->th = NULL;
335   } 
336   GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
337   h->client = NULL;
338   h->receiving = GNUNET_NO;
339   if (NULL != (c = h->current))
340   {
341     h->current = NULL;
342     if (c->cont != NULL)
343       c->cont (c->cls, GNUNET_SYSERR);
344     free_action_item (c);
345   }
346 }
347
348
349 /**
350  * Try to (re)connect to the statistics service.
351  *
352  * @param h statistics handle to reconnect
353  * @return GNUNET_YES on success, GNUNET_NO on failure.
354  */
355 static int
356 try_connect (struct GNUNET_STATISTICS_Handle *h)
357 {
358   struct GNUNET_STATISTICS_GetHandle *gh;
359   struct GNUNET_STATISTICS_GetHandle *gn;
360   unsigned int i;
361
362   if (h->backoff_task != GNUNET_SCHEDULER_NO_TASK)
363     return GNUNET_NO;
364   if (h->client != NULL)
365     return GNUNET_YES;
366   h->client = GNUNET_CLIENT_connect ("statistics", h->cfg);  
367   if (h->client != NULL)
368   {
369     gn = h->action_head; 
370     while (NULL != (gh = gn))
371     {
372       gn = gh->next;
373       if (gh->type == ACTION_WATCH)
374       {
375         GNUNET_CONTAINER_DLL_remove (h->action_head,
376                                      h->action_tail,
377                                      gh);
378         free_action_item (gh);  
379       }
380     }
381     for (i = 0; i < h->watches_size; i++)
382       schedule_watch_request (h, h->watches[i]);
383     return GNUNET_YES;
384   }
385 #if DEBUG_STATISTICS
386   LOG (GNUNET_ERROR_TYPE_DEBUG,
387        _("Failed to connect to statistics service!\n"));
388 #endif
389   return GNUNET_NO;
390 }
391
392
393 /**
394  * We've waited long enough, reconnect now.
395  *
396  * @param cls the 'struct GNUNET_STATISTICS_Handle' to reconnect
397  * @param tc scheduler context (unused)
398  */
399 static void
400 reconnect_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
401 {
402   struct GNUNET_STATISTICS_Handle *h = cls;
403
404   h->backoff_task = GNUNET_SCHEDULER_NO_TASK;
405   schedule_action (h);
406 }
407
408
409 /**
410  * Reconnect at a later time, respecting back-off.
411  *
412  * @param h statistics handle
413  */
414 static void
415 reconnect_later (struct GNUNET_STATISTICS_Handle *h)
416 {
417   GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == h->backoff_task);
418   h->backoff_task =
419     GNUNET_SCHEDULER_add_delayed (h->backoff, &reconnect_task, h);
420   h->backoff = GNUNET_TIME_relative_multiply (h->backoff, 2);
421   h->backoff =
422     GNUNET_TIME_relative_min (h->backoff, GNUNET_CONSTANTS_SERVICE_TIMEOUT);
423 }
424
425
426 /**
427  * Process a 'GNUNET_MESSAGE_TYPE_STATISTICS_VALUE' message.
428  *
429  * @param h statistics handle
430  * @param msg message received from the service, never NULL
431  * @return GNUNET_OK if the message was well-formed
432  */
433 static int
434 process_statistics_value_message (struct GNUNET_STATISTICS_Handle *h,
435                                   const struct GNUNET_MessageHeader *msg)
436 {
437   char *service;
438   char *name;
439   const struct GNUNET_STATISTICS_ReplyMessage *smsg;
440   uint16_t size;
441
442   if (h->current->aborted)
443   {
444 #if DEBUG_STATISTICS
445     LOG (GNUNET_ERROR_TYPE_DEBUG, "Iteration was aborted, ignoring VALUE\n");
446 #endif
447     return GNUNET_OK;           /* don't bother */
448   }
449   size = ntohs (msg->size);
450   if (size < sizeof (struct GNUNET_STATISTICS_ReplyMessage))
451   {
452     GNUNET_break (0);
453     return GNUNET_SYSERR;
454   }
455   smsg = (const struct GNUNET_STATISTICS_ReplyMessage *) msg;
456   size -= sizeof (struct GNUNET_STATISTICS_ReplyMessage);
457   if (size !=
458       GNUNET_STRINGS_buffer_tokenize ((const char *) &smsg[1], size, 2,
459                                       &service, &name))
460   {
461     GNUNET_break (0);
462     return GNUNET_SYSERR;
463   }
464 #if DEBUG_STATISTICS
465   LOG (GNUNET_ERROR_TYPE_DEBUG, "Received valid statistic on `%s:%s': %llu\n",
466        service, name, GNUNET_ntohll (smsg->value));
467 #endif
468   if (GNUNET_OK !=
469       h->current->proc (h->current->cls, service, name,
470                         GNUNET_ntohll (smsg->value),
471                         0 !=
472                         (ntohl (smsg->uid) & GNUNET_STATISTICS_PERSIST_BIT)))
473   {
474 #if DEBUG_STATISTICS
475     LOG (GNUNET_ERROR_TYPE_DEBUG,
476          "Processing of remaining statistics aborted by client.\n");
477 #endif
478     h->current->aborted = GNUNET_YES;
479   }
480 #if DEBUG_STATISTICS
481   LOG (GNUNET_ERROR_TYPE_DEBUG, "VALUE processed successfully\n");
482 #endif
483   return GNUNET_OK;
484 }
485
486
487 /**
488  * We have received a watch value from the service.  Process it.
489  *
490  * @param h statistics handle
491  * @param msg the watch value message
492  * @return GNUNET_OK if the message was well-formed, GNUNET_SYSERR if not
493  */
494 static int
495 process_watch_value (struct GNUNET_STATISTICS_Handle *h,
496                      const struct GNUNET_MessageHeader *msg)
497 {
498   const struct GNUNET_STATISTICS_WatchValueMessage *wvm;
499   struct GNUNET_STATISTICS_WatchEntry *w;
500   uint32_t wid;
501
502   if (sizeof (struct GNUNET_STATISTICS_WatchValueMessage) != ntohs (msg->size))
503   {
504     GNUNET_break (0);
505     return GNUNET_SYSERR;
506   }
507   wvm = (const struct GNUNET_STATISTICS_WatchValueMessage *) msg;
508   GNUNET_break (0 == ntohl (wvm->reserved));
509   wid = ntohl (wvm->wid);
510   if (wid >= h->watches_size)
511   {
512     GNUNET_break (0);
513     return GNUNET_SYSERR;
514   }
515   w = h->watches[wid];
516   (void) w->proc (w->proc_cls, w->subsystem, w->name,
517                   GNUNET_ntohll (wvm->value),
518                   0 != (ntohl (wvm->flags) & GNUNET_STATISTICS_PERSIST_BIT));
519   return GNUNET_OK;
520 }
521
522
523 /**
524  * Function called with messages from stats service.
525  *
526  * @param cls closure
527  * @param msg message received, NULL on timeout or fatal error
528  */
529 static void
530 receive_stats (void *cls, const struct GNUNET_MessageHeader *msg)
531 {
532   struct GNUNET_STATISTICS_Handle *h = cls;
533   struct GNUNET_STATISTICS_GetHandle *c;
534  
535
536   if (msg == NULL)
537   {
538 #if DEBUG_STATISTICS
539     LOG (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
540          "Error receiving statistics from service, is the service running?\n");
541 #endif
542     do_disconnect (h);
543     reconnect_later (h);
544     return;
545   }
546   switch (ntohs (msg->type))
547   {
548   case GNUNET_MESSAGE_TYPE_STATISTICS_END:
549 #if DEBUG_STATISTICS
550     LOG (GNUNET_ERROR_TYPE_DEBUG, "Received end of statistics marker\n");
551 #endif
552     if (NULL == (c = h->current))
553     {
554       GNUNET_break (0);
555       do_disconnect (h);
556       reconnect_later (h);
557       return;
558     }
559     h->backoff = GNUNET_TIME_UNIT_MILLISECONDS;
560     if (h->watches_size > 0)
561     {
562       GNUNET_CLIENT_receive (h->client, &receive_stats, h,
563                              GNUNET_TIME_UNIT_FOREVER_REL);
564     }
565     else
566     {
567       h->receiving = GNUNET_NO;
568     }    
569     h->current = NULL;
570     schedule_action (h);
571     if (c->cont != NULL)
572       c->cont (c->cls, GNUNET_OK);
573     free_action_item (c);
574     return;
575   case GNUNET_MESSAGE_TYPE_STATISTICS_VALUE:
576     if (GNUNET_OK != process_statistics_value_message (h, msg))
577     {
578       do_disconnect (h);
579       reconnect_later (h);
580       return;     
581     }
582     /* finally, look for more! */
583 #if DEBUG_STATISTICS
584     LOG (GNUNET_ERROR_TYPE_DEBUG,
585          "Processing VALUE done, now reading more\n");
586 #endif
587     GNUNET_CLIENT_receive (h->client, &receive_stats, h,
588                            GNUNET_TIME_absolute_get_remaining (h->
589                                                                current->timeout));
590     h->backoff = GNUNET_TIME_UNIT_MILLISECONDS;
591     return;
592   case GNUNET_MESSAGE_TYPE_STATISTICS_WATCH_VALUE:
593     if (GNUNET_OK != 
594         process_watch_value (h, msg))
595     {
596       do_disconnect (h);
597       reconnect_later (h);
598       return;
599     }
600     h->backoff = GNUNET_TIME_UNIT_MILLISECONDS;
601     GNUNET_assert (h->watches_size > 0);
602     GNUNET_CLIENT_receive (h->client, &receive_stats, h,
603                            GNUNET_TIME_UNIT_FOREVER_REL);
604     return;    
605   default:
606     GNUNET_break (0);
607     do_disconnect (h);
608     reconnect_later (h);
609     return;
610   }
611 }
612
613
614 /**
615  * Transmit a GET request (and if successful, start to receive
616  * the response).
617  *
618  * @param handle statistics handle
619  * @param size how many bytes can we write to buf
620  * @param buf where to write requests to the service
621  * @return number of bytes written to buf
622  */
623 static size_t
624 transmit_get (struct GNUNET_STATISTICS_Handle *handle, size_t size, void *buf)
625 {
626   struct GNUNET_STATISTICS_GetHandle *c;
627   struct GNUNET_MessageHeader *hdr;
628   size_t slen1;
629   size_t slen2;
630   uint16_t msize;
631
632   GNUNET_assert (NULL != (c = handle->current));
633   if (buf == NULL)
634   {
635     /* timeout / error */
636 #if DEBUG_STATISTICS
637     LOG (GNUNET_ERROR_TYPE_DEBUG,
638          "Transmission of request for statistics failed!\n");
639 #endif  
640     do_disconnect (handle);
641     reconnect_later (handle);
642     return 0;
643   }
644   slen1 = strlen (c->subsystem) + 1;
645   slen2 = strlen (c->name) + 1;
646   msize = slen1 + slen2 + sizeof (struct GNUNET_MessageHeader);
647   GNUNET_assert (msize <= size);
648   hdr = (struct GNUNET_MessageHeader *) buf;
649   hdr->size = htons (msize);
650   hdr->type = htons (GNUNET_MESSAGE_TYPE_STATISTICS_GET);
651   GNUNET_assert (slen1 + slen2 ==
652                  GNUNET_STRINGS_buffer_fill ((char *) &hdr[1], slen1 + slen2, 2,
653                                              c->subsystem,
654                                              c->name));
655   if (GNUNET_YES != handle->receiving)
656   {
657 #if DEBUG_STATISTICS
658     LOG (GNUNET_ERROR_TYPE_DEBUG,
659          "Transmission of GET done, now reading response\n");
660 #endif
661     handle->receiving = GNUNET_YES;
662     GNUNET_CLIENT_receive (handle->client, &receive_stats, handle,
663                            GNUNET_TIME_absolute_get_remaining (c->timeout));
664   }
665   return msize;
666 }
667
668
669 /**
670  * Transmit a WATCH request (and if successful, start to receive
671  * the response).
672  *
673  * @param handle statistics handle
674  * @param size how many bytes can we write to buf
675  * @param buf where to write requests to the service
676  * @return number of bytes written to buf
677  */
678 static size_t
679 transmit_watch (struct GNUNET_STATISTICS_Handle *handle, size_t size, void *buf)
680 {
681   struct GNUNET_MessageHeader *hdr;
682   size_t slen1;
683   size_t slen2;
684   uint16_t msize;
685
686   if (buf == NULL)
687   {
688     /* timeout / error */
689 #if DEBUG_STATISTICS
690     LOG (GNUNET_ERROR_TYPE_DEBUG,
691          "Transmission of request for statistics failed!\n");
692 #endif
693     do_disconnect (handle);
694     reconnect_later (handle);
695     return 0;
696   }
697 #if DEBUG_STATISTICS
698   LOG (GNUNET_ERROR_TYPE_DEBUG, "Transmitting watch request for `%s'\n",
699        handle->current->name);
700 #endif
701   slen1 = strlen (handle->current->subsystem) + 1;
702   slen2 = strlen (handle->current->name) + 1;
703   msize = slen1 + slen2 + sizeof (struct GNUNET_MessageHeader);
704   GNUNET_assert (msize <= size);
705   hdr = (struct GNUNET_MessageHeader *) buf;
706   hdr->size = htons (msize);
707   hdr->type = htons (GNUNET_MESSAGE_TYPE_STATISTICS_WATCH);
708   GNUNET_assert (slen1 + slen2 ==
709                  GNUNET_STRINGS_buffer_fill ((char *) &hdr[1], slen1 + slen2, 2,
710                                              handle->current->subsystem,
711                                              handle->current->name));
712   if (GNUNET_YES != handle->receiving)
713   {
714     handle->receiving = GNUNET_YES;
715     GNUNET_CLIENT_receive (handle->client, &receive_stats, handle,
716                            GNUNET_TIME_UNIT_FOREVER_REL);
717   }
718   GNUNET_assert (NULL == handle->current->cont);
719   free_action_item (handle->current);
720   handle->current = NULL;
721   return msize;
722 }
723
724
725 /**
726  * Transmit a SET/UPDATE request.
727  *
728  * @param handle statistics handle
729  * @param size how many bytes can we write to buf
730  * @param buf where to write requests to the service
731  * @return number of bytes written to buf
732  */
733 static size_t
734 transmit_set (struct GNUNET_STATISTICS_Handle *handle, size_t size, void *buf)
735 {
736   struct GNUNET_STATISTICS_SetMessage *r;
737   size_t slen;
738   size_t nlen;
739   size_t nsize;
740
741   if (NULL == buf)
742   {
743     do_disconnect (handle);
744     reconnect_later (handle);
745     return 0;
746   }
747   slen = strlen (handle->current->subsystem) + 1;
748   nlen = strlen (handle->current->name) + 1;
749   nsize = sizeof (struct GNUNET_STATISTICS_SetMessage) + slen + nlen;
750   if (size < nsize)
751   {
752     GNUNET_break (0);
753     do_disconnect (handle);
754     reconnect_later (handle);
755     return 0;
756   }
757   r = buf;
758   r->header.size = htons (nsize);
759   r->header.type = htons (GNUNET_MESSAGE_TYPE_STATISTICS_SET);
760   r->flags = 0;
761   r->value = GNUNET_htonll (handle->current->value);
762   if (handle->current->make_persistent)
763     r->flags |= htonl (GNUNET_STATISTICS_SETFLAG_PERSISTENT);
764   if (handle->current->type == ACTION_UPDATE)
765     r->flags |= htonl (GNUNET_STATISTICS_SETFLAG_RELATIVE);
766   GNUNET_assert (slen + nlen ==
767                  GNUNET_STRINGS_buffer_fill ((char *) &r[1], slen + nlen, 2,
768                                              handle->current->subsystem,
769                                              handle->current->name));
770   GNUNET_assert (NULL == handle->current->cont);
771   free_action_item (handle->current);
772   handle->current = NULL;
773   return nsize;
774 }
775
776
777 /**
778  * Function called when we are ready to transmit a request to the service.
779  *
780  * @param cls the 'struct GNUNET_STATISTICS_Handle'
781  * @param size how many bytes can we write to buf
782  * @param buf where to write requests to the service
783  * @return number of bytes written to buf
784  */
785 static size_t
786 transmit_action (void *cls, size_t size, void *buf)
787 {
788   struct GNUNET_STATISTICS_Handle *h = cls;
789   size_t ret;
790
791   h->th = NULL;
792   ret = 0;
793   if (NULL != h->current)
794     switch (h->current->type)
795     {
796     case ACTION_GET:
797       ret = transmit_get (h, size, buf);
798       break;
799     case ACTION_SET:
800     case ACTION_UPDATE:
801       ret = transmit_set (h, size, buf);
802       break;
803     case ACTION_WATCH:
804       ret = transmit_watch (h, size, buf);
805       break;
806     default:
807       GNUNET_assert (0);
808       break;
809     }
810   schedule_action (h);
811   return ret;
812 }
813
814
815 /**
816  * Get handle for the statistics service.
817  *
818  * @param subsystem name of subsystem using the service
819  * @param cfg services configuration in use
820  * @return handle to use
821  */
822 struct GNUNET_STATISTICS_Handle *
823 GNUNET_STATISTICS_create (const char *subsystem,
824                           const struct GNUNET_CONFIGURATION_Handle *cfg)
825 {
826   struct GNUNET_STATISTICS_Handle *ret;
827
828   GNUNET_assert (subsystem != NULL);
829   GNUNET_assert (cfg != NULL);
830   ret = GNUNET_malloc (sizeof (struct GNUNET_STATISTICS_Handle));
831   ret->cfg = cfg;
832   ret->subsystem = GNUNET_strdup (subsystem);
833   ret->backoff = GNUNET_TIME_UNIT_MILLISECONDS;
834   return ret;
835 }
836
837
838 /**
839  * Destroy a handle (free all state associated with
840  * it).
841  *
842  * @param h statistics handle to destroy
843  * @param sync_first set to GNUNET_YES if pending SET requests should
844  *        be completed
845  */
846 void
847 GNUNET_STATISTICS_destroy (struct GNUNET_STATISTICS_Handle *h, int sync_first)
848 {
849   struct GNUNET_STATISTICS_GetHandle *pos;
850   struct GNUNET_STATISTICS_GetHandle *next;
851   struct GNUNET_STATISTICS_GetHandle *prev;
852   struct GNUNET_TIME_Relative timeout;
853   int i;
854
855   if (h == NULL)
856     return;
857   if (GNUNET_SCHEDULER_NO_TASK != h->backoff_task)
858     GNUNET_SCHEDULER_cancel (h->backoff_task);
859   if (sync_first)
860   {
861     if (h->current != NULL)
862     {
863       if (h->current->type == ACTION_GET)
864       {
865         GNUNET_CLIENT_notify_transmit_ready_cancel (h->th);
866         h->th = NULL;
867         free_action_item (h->current);
868         h->current = NULL;
869       }
870     }
871     pos = h->action_head;
872     prev = NULL;
873     while (pos != NULL)
874     {
875       next = pos->next;
876       if (pos->type == ACTION_GET)
877       {
878         if (prev == NULL)
879           h->action_head = next;
880         else
881           prev->next = next;
882         free_action_item (pos);
883       }
884       else
885       {
886         prev = pos;
887       }
888       pos = next;
889     }
890     h->action_tail = prev;
891     if (h->current == NULL)
892     {
893       h->current = h->action_head;
894       if (h->action_head != NULL)
895       {
896         h->action_head = h->action_head->next;
897         if (h->action_head == NULL)
898           h->action_tail = NULL;
899       }
900     }
901     h->do_destroy = GNUNET_YES;
902     if ((h->current != NULL) && (h->th == NULL))
903     {
904       timeout = GNUNET_TIME_absolute_get_remaining (h->current->timeout);
905       h->th =
906           GNUNET_CLIENT_notify_transmit_ready (h->client, h->current->msize,
907                                                timeout, GNUNET_YES,
908                                                &transmit_action, h);
909       GNUNET_assert (NULL != h->th);
910     }
911     if (h->th != NULL)
912       return;
913   }
914   while (NULL != (pos = h->action_head))
915   {
916     GNUNET_CONTAINER_DLL_remove (h->action_head,
917                                  h->action_tail,
918                                  pos);
919     free_action_item (pos);
920   }
921   do_disconnect (h);
922   for (i = 0; i < h->watches_size; i++)
923   {
924     if (NULL == h->watches[i])
925       continue; 
926     GNUNET_free (h->watches[i]->subsystem);
927     GNUNET_free (h->watches[i]->name);
928     GNUNET_free (h->watches[i]);
929   }
930   GNUNET_array_grow (h->watches, h->watches_size, 0);
931   GNUNET_free (h->subsystem);
932   GNUNET_free (h);
933 }
934
935
936 /**
937  * Schedule the next action to be performed.
938  *
939  * @param h statistics handle
940  */
941 static void
942 schedule_action (struct GNUNET_STATISTICS_Handle *h)
943 {
944   struct GNUNET_TIME_Relative timeout;
945
946   if ( (h->th != NULL) ||
947        (h->backoff_task != GNUNET_SCHEDULER_NO_TASK) )
948     return;                     /* action already pending */
949   if (GNUNET_YES != try_connect (h))
950   {
951     reconnect_later (h);
952     return;
953   }
954   if (NULL != h->current)
955     return; /* action already pending */
956   /* schedule next action */
957   h->current = h->action_head;
958   if (NULL == h->current)
959   {
960     if (h->do_destroy)
961     {
962       h->do_destroy = GNUNET_NO;
963       GNUNET_STATISTICS_destroy (h, GNUNET_YES);
964     }
965     return;
966   }
967   GNUNET_CONTAINER_DLL_remove (h->action_head, h->action_tail, h->current);
968   timeout = GNUNET_TIME_absolute_get_remaining (h->current->timeout);
969   if (NULL ==
970       (h->th =
971        GNUNET_CLIENT_notify_transmit_ready (h->client, h->current->msize,
972                                             timeout, GNUNET_YES,
973                                             &transmit_action, h)))
974   {
975 #if DEBUG_STATISTICS
976     LOG (GNUNET_ERROR_TYPE_DEBUG,
977          "Failed to transmit request to statistics service.\n");
978 #endif
979     do_disconnect (h);
980     reconnect_later (h);
981   }
982 }
983
984
985 /**
986  * Get statistic from the peer.
987  *
988  * @param handle identification of the statistics service
989  * @param subsystem limit to the specified subsystem, NULL for our subsystem
990  * @param name name of the statistic value, NULL for all values
991  * @param timeout after how long should we give up (and call
992  *        cont with an error code)?
993  * @param cont continuation to call when done (can be NULL)
994  * @param proc function to call on each value
995  * @param cls closure for cont and proc
996  * @return NULL on error
997  */
998 struct GNUNET_STATISTICS_GetHandle *
999 GNUNET_STATISTICS_get (struct GNUNET_STATISTICS_Handle *handle,
1000                        const char *subsystem, const char *name,
1001                        struct GNUNET_TIME_Relative timeout,
1002                        GNUNET_STATISTICS_Callback cont,
1003                        GNUNET_STATISTICS_Iterator proc, void *cls)
1004 {
1005   size_t slen1;
1006   size_t slen2;
1007   struct GNUNET_STATISTICS_GetHandle *ai;
1008
1009   if (NULL == handle)
1010     return NULL;
1011   GNUNET_assert (proc != NULL);
1012   GNUNET_assert (GNUNET_NO == handle->do_destroy);
1013   if (subsystem == NULL)
1014     subsystem = "";
1015   if (name == NULL)
1016     name = "";
1017   slen1 = strlen (subsystem) + 1;
1018   slen2 = strlen (name) + 1;
1019   GNUNET_assert (slen1 + slen2 + sizeof (struct GNUNET_MessageHeader) <
1020                  GNUNET_SERVER_MAX_MESSAGE_SIZE);
1021   ai = GNUNET_malloc (sizeof (struct GNUNET_STATISTICS_GetHandle));
1022   ai->sh = handle;
1023   ai->subsystem = GNUNET_strdup (subsystem);
1024   ai->name = GNUNET_strdup (name);
1025   ai->cont = cont;
1026   ai->proc = proc;
1027   ai->cls = cls;
1028   ai->timeout = GNUNET_TIME_relative_to_absolute (timeout);
1029   ai->type = ACTION_GET;
1030   ai->msize = slen1 + slen2 + sizeof (struct GNUNET_MessageHeader);
1031   GNUNET_CONTAINER_DLL_insert_tail (handle->action_head, handle->action_tail,
1032                                     ai);
1033   schedule_action (handle);
1034   return ai;
1035 }
1036
1037
1038 /**
1039  * Cancel a 'get' request.  Must be called before the 'cont'
1040  * function is called.
1041  *
1042  * @param gh handle of the request to cancel
1043  */
1044 void
1045 GNUNET_STATISTICS_get_cancel (struct GNUNET_STATISTICS_GetHandle *gh)
1046 {
1047   if (NULL == gh)
1048     return;
1049   if (gh->sh->current == gh)
1050   {
1051     gh->aborted = GNUNET_YES;
1052   }
1053   else
1054   {
1055     GNUNET_CONTAINER_DLL_remove (gh->sh->action_head, gh->sh->action_tail, gh);
1056     GNUNET_free (gh->name);
1057     GNUNET_free (gh->subsystem);
1058     GNUNET_free (gh);
1059   }
1060 }
1061
1062
1063 /**
1064  * Watch statistics from the peer (be notified whenever they change).
1065  * Note that the only way to cancel a "watch" request is to destroy
1066  * the statistics handle given as the first argument to this call.
1067  *
1068  * @param handle identification of the statistics service
1069  * @param subsystem limit to the specified subsystem, never NULL
1070  * @param name name of the statistic value, never NULL
1071  * @param proc function to call on each value
1072  * @param proc_cls closure for proc
1073  * @return GNUNET_OK on success, GNUNET_SYSERR on error
1074  */
1075 int
1076 GNUNET_STATISTICS_watch (struct GNUNET_STATISTICS_Handle *handle,
1077                          const char *subsystem, const char *name,
1078                          GNUNET_STATISTICS_Iterator proc, void *proc_cls)
1079 {
1080   struct GNUNET_STATISTICS_WatchEntry *w;
1081
1082   if (handle == NULL)
1083     return GNUNET_SYSERR;
1084   w = GNUNET_malloc (sizeof (struct GNUNET_STATISTICS_WatchEntry));
1085   w->subsystem = GNUNET_strdup (subsystem);
1086   w->name = GNUNET_strdup (name);
1087   w->proc = proc;
1088   w->proc_cls = proc_cls;
1089   GNUNET_array_append (handle->watches, handle->watches_size, w);
1090   schedule_watch_request (handle, w);
1091   return GNUNET_OK;
1092 }
1093
1094
1095 /**
1096  * Queue a request to change a statistic.
1097  *
1098  * @param h statistics handle
1099  * @param name name of the value
1100  * @param make_persistent  should the value be kept across restarts?
1101  * @param value new value or change
1102  * @param type type of the action (ACTION_SET or ACTION_UPDATE)
1103  */
1104 static void
1105 add_setter_action (struct GNUNET_STATISTICS_Handle *h, const char *name,
1106                    int make_persistent, uint64_t value, enum ActionType type)
1107 {
1108   struct GNUNET_STATISTICS_GetHandle *ai;
1109   size_t slen;
1110   size_t nlen;
1111   size_t nsize;
1112   int64_t delta;
1113
1114   GNUNET_assert (h != NULL);
1115   GNUNET_assert (name != NULL);
1116   slen = strlen (h->subsystem) + 1;
1117   nlen = strlen (name) + 1;
1118   nsize = sizeof (struct GNUNET_STATISTICS_SetMessage) + slen + nlen;
1119   if (nsize >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
1120   {
1121     GNUNET_break (0);
1122     return;
1123   }
1124   for (ai = h->action_head; ai != NULL; ai = ai->next)
1125   {
1126     if (! ( (0 == strcmp (ai->subsystem, h->subsystem)) &&
1127             (0 == strcmp (ai->name, name)) && 
1128             ( (ai->type == ACTION_UPDATE) ||
1129               (ai->type == ACTION_SET) ) ) )
1130       continue;
1131     if (ai->type == ACTION_SET)
1132     {
1133       if (type == ACTION_UPDATE)
1134       {
1135         delta = (int64_t) value;
1136         if (delta > 0)
1137         {
1138           /* update old set by new delta */
1139           ai->value += delta;
1140         }
1141         else
1142         {
1143           /* update old set by new delta, but never go negative */
1144           if (ai->value < -delta)
1145             ai->value = 0;
1146           else
1147             ai->value += delta;
1148         }
1149       }
1150       else
1151       {
1152         /* new set overrides old set */
1153         ai->value = value;
1154       }
1155     }
1156     else
1157     {
1158       if (type == ACTION_UPDATE)
1159       {
1160         /* make delta cummulative */
1161         delta = (int64_t) value;
1162         ai->value += delta;
1163       }
1164       else
1165       {
1166         /* drop old 'update', use new 'set' instead */
1167         ai->value = value;
1168         ai->type = type;
1169       }
1170     }
1171     ai->timeout = GNUNET_TIME_relative_to_absolute (SET_TRANSMIT_TIMEOUT);
1172     ai->make_persistent = make_persistent;
1173     return;  
1174   }
1175   /* no existing entry matches, create a fresh one */
1176   ai = GNUNET_malloc (sizeof (struct GNUNET_STATISTICS_GetHandle));
1177   ai->sh = h;
1178   ai->subsystem = GNUNET_strdup (h->subsystem);
1179   ai->name = GNUNET_strdup (name);
1180   ai->timeout = GNUNET_TIME_relative_to_absolute (SET_TRANSMIT_TIMEOUT);
1181   ai->make_persistent = make_persistent;
1182   ai->msize = nsize;
1183   ai->value = value;
1184   ai->type = type;
1185   GNUNET_CONTAINER_DLL_insert_tail (h->action_head, h->action_tail,
1186                                     ai);
1187   schedule_action (h);
1188 }
1189
1190
1191 /**
1192  * Set statistic value for the peer.  Will always use our
1193  * subsystem (the argument used when "handle" was created).
1194  *
1195  * @param handle identification of the statistics service
1196  * @param name name of the statistic value
1197  * @param value new value to set
1198  * @param make_persistent should the value be kept across restarts?
1199  */
1200 void
1201 GNUNET_STATISTICS_set (struct GNUNET_STATISTICS_Handle *handle,
1202                        const char *name, uint64_t value, int make_persistent)
1203 {
1204   if (handle == NULL)
1205     return;
1206   GNUNET_assert (GNUNET_NO == handle->do_destroy);
1207   add_setter_action (handle, name, make_persistent, value, ACTION_SET);
1208 }
1209
1210
1211 /**
1212  * Set statistic value for the peer.  Will always use our
1213  * subsystem (the argument used when "handle" was created).
1214  *
1215  * @param handle identification of the statistics service
1216  * @param name name of the statistic value
1217  * @param delta change in value (added to existing value)
1218  * @param make_persistent should the value be kept across restarts?
1219  */
1220 void
1221 GNUNET_STATISTICS_update (struct GNUNET_STATISTICS_Handle *handle,
1222                           const char *name, int64_t delta, int make_persistent)
1223 {
1224   if (handle == NULL)
1225     return;
1226   if (delta == 0)
1227     return;
1228   GNUNET_assert (GNUNET_NO == handle->do_destroy);
1229   add_setter_action (handle, name, make_persistent, (uint64_t) delta,
1230                      ACTION_UPDATE);
1231 }
1232
1233
1234 /* end of statistics_api.c */