More API function tests...
[oweals/gnunet.git] / src / datastore / gnunet-service-datastore.c
1 /*
2      This file is part of GNUnet
3      Copyright (C) 2004-2014, 2016 GNUnet e.V.
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18      Boston, MA 02110-1301, USA.
19 */
20
21 /**
22  * @file datastore/gnunet-service-datastore.c
23  * @brief Management for the datastore for files stored on a GNUnet node
24  * @author Christian Grothoff
25  */
26
27 #include "platform.h"
28 #include "gnunet_util_lib.h"
29 #include "gnunet_protocols.h"
30 #include "gnunet_statistics_service.h"
31 #include "gnunet_datastore_plugin.h"
32 #include "datastore.h"
33
34 /**
35  * How many messages do we queue at most per client?
36  */
37 #define MAX_PENDING 1024
38
39 /**
40  * Limit size of bloom filter to 2 GB.
41  */
42 #define MAX_BF_SIZE ((uint32_t) (1LL << 31))
43
44 /**
45  * How long are we at most keeping "expired" content
46  * past the expiration date in the database?
47  */
48 #define MAX_EXPIRE_DELAY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 15)
49
50 /**
51  * How fast are we allowed to query the database for deleting
52  * expired content? (1 item per second).
53  */
54 #define MIN_EXPIRE_DELAY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 1)
55
56 /**
57  * Name under which we store current space consumption.
58  */
59 static char *quota_stat_name;
60
61 /**
62  * Task to timeout stat GET.
63  */
64 static struct GNUNET_SCHEDULER_Task *stat_timeout_task;
65
66 /**
67  * After how many payload-changing operations
68  * do we sync our statistics?
69  */
70 #define MAX_STAT_SYNC_LAG 50
71
72
73 /**
74  * Our datastore plugin.
75  */
76 struct DatastorePlugin
77 {
78
79   /**
80    * API of the transport as returned by the plugin's
81    * initialization function.
82    */
83   struct GNUNET_DATASTORE_PluginFunctions *api;
84
85   /**
86    * Short name for the plugin (i.e. "sqlite").
87    */
88   char *short_name;
89
90   /**
91    * Name of the library (i.e. "gnunet_plugin_datastore_sqlite").
92    */
93   char *lib_name;
94
95   /**
96    * Environment this transport service is using
97    * for this plugin.
98    */
99   struct GNUNET_DATASTORE_PluginEnvironment env;
100
101 };
102
103
104 /**
105  * Linked list of active reservations.
106  */
107 struct ReservationList
108 {
109
110   /**
111    * This is a linked list.
112    */
113   struct ReservationList *next;
114
115   /**
116    * Client that made the reservation.
117    */
118   struct GNUNET_SERVICE_Client *client;
119
120   /**
121    * Number of bytes (still) reserved.
122    */
123   uint64_t amount;
124
125   /**
126    * Number of items (still) reserved.
127    */
128   uint64_t entries;
129
130   /**
131    * Reservation identifier.
132    */
133   int32_t rid;
134
135 };
136
137
138
139 /**
140  * Our datastore plugin (NULL if not available).
141  */
142 static struct DatastorePlugin *plugin;
143
144 /**
145  * Linked list of space reservations made by clients.
146  */
147 static struct ReservationList *reservations;
148
149 /**
150  * Bloomfilter to quickly tell if we don't have the content.
151  */
152 static struct GNUNET_CONTAINER_BloomFilter *filter;
153
154 /**
155  * Name of our plugin.
156  */
157 static char *plugin_name;
158
159 /**
160  * Our configuration.
161  */
162 static const struct GNUNET_CONFIGURATION_Handle *cfg;
163
164 /**
165  * Handle for reporting statistics.
166  */
167 static struct GNUNET_STATISTICS_Handle *stats;
168
169 /**
170  * How much space are we using for the cache?  (space available for
171  * insertions that will be instantly reclaimed by discarding less
172  * important content --- or possibly whatever we just inserted into
173  * the "cache").
174  */
175 static unsigned long long cache_size;
176
177 /**
178  * How much space have we currently reserved?
179  */
180 static unsigned long long reserved;
181
182 /**
183  * How much data are we currently storing
184  * in the database?
185  */
186 static unsigned long long payload;
187
188 /**
189  * Identity of the task that is used to delete
190  * expired content.
191  */
192 static struct GNUNET_SCHEDULER_Task *expired_kill_task;
193
194 /**
195  * Minimum time that content should have to not be discarded instantly
196  * (time stamp of any content that we've been discarding recently to
197  * stay below the quota).  FOREVER if we had to expire content with
198  * non-zero priority.
199  */
200 static struct GNUNET_TIME_Absolute min_expiration;
201
202 /**
203  * How much space are we allowed to use?
204  */
205 static unsigned long long quota;
206
207 /**
208  * Should the database be dropped on exit?
209  */
210 static int do_drop;
211
212 /**
213  * Should we refresh the BF when the DB is loaded?
214  */
215 static int refresh_bf;
216
217 /**
218  * Number of updates that were made to the
219  * payload value since we last synchronized
220  * it with the statistics service.
221  */
222 static unsigned int last_sync;
223
224 /**
225  * Did we get an answer from statistics?
226  */
227 static int stats_worked;
228
229
230 /**
231  * Synchronize our utilization statistics with the
232  * statistics service.
233  */
234 static void
235 sync_stats ()
236 {
237   GNUNET_STATISTICS_set (stats,
238                          quota_stat_name,
239                          payload,
240                          GNUNET_YES);
241   GNUNET_STATISTICS_set (stats,
242                          "# utilization by current datastore",
243                          payload,
244                          GNUNET_NO);
245   last_sync = 0;
246 }
247
248
249 /**
250  * Have we already cleaned up the TCCs and are hence no longer
251  * willing (or able) to transmit anything to anyone?
252  */
253 static int cleaning_done;
254
255 /**
256  * Handle for pending get request.
257  */
258 static struct GNUNET_STATISTICS_GetHandle *stat_get;
259
260 /**
261  * Handle to our server.
262  */
263 static struct GNUNET_SERVICE_Handle *service;
264
265 /**
266  * Task that is used to remove expired entries from
267  * the datastore.  This task will schedule itself
268  * again automatically to always delete all expired
269  * content quickly.
270  *
271  * @param cls not used
272  */
273 static void
274 delete_expired (void *cls);
275
276
277 /**
278  * Iterate over the expired items stored in the datastore.
279  * Delete all expired items; once we have processed all
280  * expired items, re-schedule the "delete_expired" task.
281  *
282  * @param cls not used
283  * @param key key for the content
284  * @param size number of bytes in data
285  * @param data content stored
286  * @param type type of the content
287  * @param priority priority of the content
288  * @param anonymity anonymity-level for the content
289  * @param expiration expiration time for the content
290  * @param uid unique identifier for the datum;
291  *        maybe 0 if no unique identifier is available
292  *
293  * @return #GNUNET_SYSERR to abort the iteration, #GNUNET_OK to continue
294  *         (continue on call to "next", of course),
295  *         #GNUNET_NO to delete the item and continue (if supported)
296  */
297 static int
298 expired_processor (void *cls,
299                    const struct GNUNET_HashCode *key,
300                    uint32_t size,
301                    const void *data,
302                    enum GNUNET_BLOCK_Type type,
303                    uint32_t priority,
304                    uint32_t anonymity,
305                    struct GNUNET_TIME_Absolute expiration,
306                    uint64_t uid)
307 {
308   struct GNUNET_TIME_Absolute now;
309
310   if (key == NULL)
311   {
312     expired_kill_task =
313         GNUNET_SCHEDULER_add_delayed_with_priority (MAX_EXPIRE_DELAY,
314                                                     GNUNET_SCHEDULER_PRIORITY_IDLE,
315                                                     &delete_expired, NULL);
316     return GNUNET_SYSERR;
317   }
318   now = GNUNET_TIME_absolute_get ();
319   if (expiration.abs_value_us > now.abs_value_us)
320   {
321     /* finished processing */
322     expired_kill_task =
323         GNUNET_SCHEDULER_add_delayed_with_priority (MAX_EXPIRE_DELAY,
324                                                     GNUNET_SCHEDULER_PRIORITY_IDLE,
325                                                     &delete_expired, NULL);
326     return GNUNET_SYSERR;
327   }
328   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
329               "Deleting content `%s' of type %u that expired %s ago\n",
330               GNUNET_h2s (key), type,
331               GNUNET_STRINGS_relative_time_to_string (GNUNET_TIME_absolute_get_difference (expiration,
332                                                                                            now),
333                                                       GNUNET_YES));
334   min_expiration = now;
335   GNUNET_STATISTICS_update (stats,
336                             gettext_noop ("# bytes expired"),
337                             size,
338                             GNUNET_YES);
339   GNUNET_CONTAINER_bloomfilter_remove (filter, key);
340   expired_kill_task =
341       GNUNET_SCHEDULER_add_delayed_with_priority (MIN_EXPIRE_DELAY,
342                                                   GNUNET_SCHEDULER_PRIORITY_IDLE,
343                                                   &delete_expired, NULL);
344   return GNUNET_NO;
345 }
346
347
348 /**
349  * Task that is used to remove expired entries from
350  * the datastore.  This task will schedule itself
351  * again automatically to always delete all expired
352  * content quickly.
353  *
354  * @param cls not used
355  */
356 static void
357 delete_expired (void *cls)
358 {
359   expired_kill_task = NULL;
360   plugin->api->get_expiration (plugin->api->cls,
361                                &expired_processor,
362                                NULL);
363 }
364
365
366 /**
367  * An iterator over a set of items stored in the datastore
368  * that deletes until we're happy with respect to our quota.
369  *
370  * @param cls closure
371  * @param key key for the content
372  * @param size number of bytes in data
373  * @param data content stored
374  * @param type type of the content
375  * @param priority priority of the content
376  * @param anonymity anonymity-level for the content
377  * @param expiration expiration time for the content
378  * @param uid unique identifier for the datum;
379  *        maybe 0 if no unique identifier is available
380  * @return #GNUNET_SYSERR to abort the iteration, #GNUNET_OK to continue
381  *         (continue on call to "next", of course),
382  *         #GNUNET_NO to delete the item and continue (if supported)
383  */
384 static int
385 quota_processor (void *cls,
386                  const struct GNUNET_HashCode *key,
387                  uint32_t size,
388                  const void *data,
389                  enum GNUNET_BLOCK_Type type,
390                  uint32_t priority,
391                  uint32_t anonymity,
392                  struct GNUNET_TIME_Absolute expiration,
393                  uint64_t uid)
394 {
395   unsigned long long *need = cls;
396
397   if (NULL == key)
398     return GNUNET_SYSERR;
399   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
400               "Deleting %llu bytes of low-priority (%u) content `%s' of type %u at %s prior to expiration (still trying to free another %llu bytes)\n",
401               (unsigned long long) (size + GNUNET_DATASTORE_ENTRY_OVERHEAD),
402               (unsigned int) priority,
403               GNUNET_h2s (key), type,
404               GNUNET_STRINGS_relative_time_to_string (GNUNET_TIME_absolute_get_remaining (expiration),
405                                                       GNUNET_YES),
406               *need);
407   if (size + GNUNET_DATASTORE_ENTRY_OVERHEAD > *need)
408     *need = 0;
409   else
410     *need -= size + GNUNET_DATASTORE_ENTRY_OVERHEAD;
411   if (priority > 0)
412     min_expiration = GNUNET_TIME_UNIT_FOREVER_ABS;
413   else
414     min_expiration = expiration;
415   GNUNET_STATISTICS_update (stats,
416                             gettext_noop ("# bytes purged (low-priority)"),
417                             size, GNUNET_YES);
418   GNUNET_CONTAINER_bloomfilter_remove (filter, key);
419   return GNUNET_NO;
420 }
421
422
423 /**
424  * Manage available disk space by running tasks
425  * that will discard content if necessary.  This
426  * function will be run whenever a request for
427  * "need" bytes of storage could only be satisfied
428  * by eating into the "cache" (and we want our cache
429  * space back).
430  *
431  * @param need number of bytes of content that were
432  *        placed into the "cache" (and hence the
433  *        number of bytes that should be removed).
434  */
435 static void
436 manage_space (unsigned long long need)
437 {
438   unsigned long long last;
439
440   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
441               "Asked to free up %llu bytes of cache space\n",
442               need);
443   last = 0;
444   while ((need > 0) && (last != need))
445   {
446     last = need;
447     plugin->api->get_expiration (plugin->api->cls,
448                                  &quota_processor,
449                                  &need);
450   }
451 }
452
453
454 /**
455  * Transmit a status code to the client.
456  *
457  * @param client receiver of the response
458  * @param code status code
459  * @param msg optional error message (can be NULL)
460  */
461 static void
462 transmit_status (struct GNUNET_SERVICE_Client *client,
463                  int code,
464                  const char *msg)
465 {
466   struct GNUNET_MQ_Envelope *env;
467   struct StatusMessage *sm;
468   size_t slen;
469
470   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
471               "Transmitting `%s' message with value %d and message `%s'\n",
472               "STATUS", code, msg != NULL ? msg : "(none)");
473   slen = (msg == NULL) ? 0 : strlen (msg) + 1;
474   env = GNUNET_MQ_msg_extra (sm,
475                              slen,
476                              GNUNET_MESSAGE_TYPE_DATASTORE_STATUS);
477   sm->status = htonl (code);
478   sm->min_expiration = GNUNET_TIME_absolute_hton (min_expiration);
479   GNUNET_memcpy (&sm[1],
480                  msg,
481                  slen);
482   GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (client),
483                   env);
484 }
485
486
487 /**
488  * Function that will transmit the given datastore entry
489  * to the client.
490  *
491  * @param cls closure, pointer to the client (of type `struct GNUNET_SERVICE_Client`).
492  * @param key key for the content
493  * @param size number of bytes in data
494  * @param data content stored
495  * @param type type of the content
496  * @param priority priority of the content
497  * @param anonymity anonymity-level for the content
498  * @param expiration expiration time for the content
499  * @param uid unique identifier for the datum;
500  *        maybe 0 if no unique identifier is available
501  * @return #GNUNET_SYSERR to abort the iteration, #GNUNET_OK to continue,
502  *         #GNUNET_NO to delete the item and continue (if supported)
503  */
504 static int
505 transmit_item (void *cls,
506                const struct GNUNET_HashCode *key,
507                uint32_t size,
508                const void *data,
509                enum GNUNET_BLOCK_Type type,
510                uint32_t priority,
511                uint32_t anonymity,
512                struct GNUNET_TIME_Absolute expiration,
513                uint64_t uid)
514 {
515   struct GNUNET_SERVICE_Client *client = cls;
516   struct GNUNET_MQ_Envelope *env;
517   struct GNUNET_MessageHeader *end;
518   struct DataMessage *dm;
519
520   if (NULL == key)
521   {
522     /* transmit 'DATA_END' */
523     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
524                 "Transmitting DATA_END message\n");
525     env = GNUNET_MQ_msg (end,
526                          GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END);
527     GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (client),
528                     env);
529     return GNUNET_OK;
530   }
531   GNUNET_assert (sizeof (struct DataMessage) + size <
532                  GNUNET_SERVER_MAX_MESSAGE_SIZE);
533   env = GNUNET_MQ_msg_extra (dm,
534                              size,
535                              GNUNET_MESSAGE_TYPE_DATASTORE_DATA);
536   dm->rid = htonl (0);
537   dm->size = htonl (size);
538   dm->type = htonl (type);
539   dm->priority = htonl (priority);
540   dm->anonymity = htonl (anonymity);
541   dm->replication = htonl (0);
542   dm->reserved = htonl (0);
543   dm->expiration = GNUNET_TIME_absolute_hton (expiration);
544   dm->uid = GNUNET_htonll (uid);
545   dm->key = *key;
546   GNUNET_memcpy (&dm[1],
547                  data,
548                  size);
549   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
550               "Transmitting DATA message for `%s' of type %u with expiration %s (in: %s)\n",
551               GNUNET_h2s (key),
552               type,
553               GNUNET_STRINGS_absolute_time_to_string (expiration),
554               GNUNET_STRINGS_relative_time_to_string (GNUNET_TIME_absolute_get_remaining (expiration),
555                                                       GNUNET_YES));
556   GNUNET_STATISTICS_update (stats,
557                             gettext_noop ("# results found"),
558                             1,
559                             GNUNET_NO);
560   GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (client),
561                   env);
562   return GNUNET_OK;
563 }
564
565
566 /**
567  * Handle RESERVE-message.
568  *
569  * @param cls identification of the client
570  * @param message the actual message
571  */
572 static void
573 handle_reserve (void *cls,
574                 const struct ReserveMessage *msg)
575 {
576   /**
577    * Static counter to produce reservation identifiers.
578    */
579   static int reservation_gen;
580   struct GNUNET_SERVICE_Client *client = cls;
581   struct ReservationList *e;
582   unsigned long long used;
583   unsigned long long req;
584   uint64_t amount;
585   uint32_t entries;
586
587   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
588               "Processing RESERVE request\n");
589   amount = GNUNET_ntohll (msg->amount);
590   entries = ntohl (msg->entries);
591   used = payload + reserved;
592   req = amount + ((unsigned long long) GNUNET_DATASTORE_ENTRY_OVERHEAD) * entries;
593   if (used + req > quota)
594   {
595     if (quota < used)
596       used = quota;             /* cheat a bit for error message (to avoid negative numbers) */
597     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
598                 _("Insufficient space (%llu bytes are available) to satisfy RESERVE request for %llu bytes\n"),
599                 quota - used,
600                 req);
601     if (cache_size < req)
602     {
603       /* TODO: document this in the FAQ; essentially, if this
604        * message happens, the insertion request could be blocked
605        * by less-important content from migration because it is
606        * larger than 1/8th of the overall available space, and
607        * we only reserve 1/8th for "fresh" insertions */
608       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
609                   _("The requested amount (%llu bytes) is larger than the cache size (%llu bytes)\n"),
610                   req,
611                   cache_size);
612       transmit_status (client,
613                        0,
614                        gettext_noop
615                        ("Insufficient space to satisfy request and "
616                         "requested amount is larger than cache size"));
617     }
618     else
619     {
620       transmit_status (client,
621                        0,
622                        gettext_noop ("Insufficient space to satisfy request"));
623     }
624     GNUNET_SERVICE_client_continue (client);
625     return;
626   }
627   reserved += req;
628   GNUNET_STATISTICS_set (stats,
629                          gettext_noop ("# reserved"),
630                          reserved,
631                          GNUNET_NO);
632   e = GNUNET_new (struct ReservationList);
633   e->next = reservations;
634   reservations = e;
635   e->client = client;
636   e->amount = amount;
637   e->entries = entries;
638   e->rid = ++reservation_gen;
639   if (reservation_gen < 0)
640     reservation_gen = 0;        /* wrap around */
641   transmit_status (client,
642                    e->rid,
643                    NULL);
644   GNUNET_SERVICE_client_continue (client);
645 }
646
647
648 /**
649  * Handle RELEASE_RESERVE-message.
650  *
651  * @param cls identification of the client
652  * @param message the actual message
653  */
654 static void
655 handle_release_reserve (void *cls,
656                         const struct ReleaseReserveMessage *msg)
657 {
658   struct GNUNET_SERVICE_Client *client = cls;
659   struct ReservationList *pos;
660   struct ReservationList *prev;
661   struct ReservationList *next;
662   int rid = ntohl (msg->rid);
663   unsigned long long rem;
664
665   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
666               "Processing RELEASE_RESERVE request\n");
667   next = reservations;
668   prev = NULL;
669   while (NULL != (pos = next))
670   {
671     next = pos->next;
672     if (rid == pos->rid)
673     {
674       if (prev == NULL)
675         reservations = next;
676       else
677         prev->next = next;
678       rem =
679           pos->amount +
680           ((unsigned long long) GNUNET_DATASTORE_ENTRY_OVERHEAD) * pos->entries;
681       GNUNET_assert (reserved >= rem);
682       reserved -= rem;
683       GNUNET_STATISTICS_set (stats,
684                              gettext_noop ("# reserved"),
685                              reserved,
686                              GNUNET_NO);
687       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
688                   "Returning %llu remaining reserved bytes to storage pool\n",
689                   rem);
690       GNUNET_free (pos);
691       transmit_status (client,
692                        GNUNET_OK,
693                        NULL);
694       GNUNET_SERVICE_client_continue (client);
695       return;
696     }
697     prev = pos;
698   }
699   GNUNET_break (0);
700   transmit_status (client,
701                    GNUNET_SYSERR,
702                    gettext_noop ("Could not find matching reservation"));
703   GNUNET_SERVICE_client_continue (client);
704 }
705
706
707 /**
708  * Check that the given message is a valid data message.
709  *
710  * @param dm message to check
711  * @return #GNUNET_SYSERR is not well-formed, otherwise #GNUNET_OK
712  */
713 static int
714 check_data (const struct DataMessage *dm)
715 {
716   uint16_t size;
717   uint32_t dsize;
718
719   size = ntohs (dm->header.size);
720   dsize = ntohl (dm->size);
721   if (size != dsize + sizeof (struct DataMessage))
722   {
723     GNUNET_break (0);
724     return GNUNET_SYSERR;
725   }
726   return GNUNET_OK;
727 }
728
729
730 /**
731  * Context for a PUT request used to see if the content is
732  * already present.
733  */
734 struct PutContext
735 {
736   /**
737    * Client to notify on completion.
738    */
739   struct GNUNET_SERVICE_Client *client;
740
741 #if ! HAVE_UNALIGNED_64_ACCESS
742   void *reserved;
743 #endif
744
745   /* followed by the 'struct DataMessage' */
746 };
747
748
749 /**
750  * Put continuation.
751  *
752  * @param cls closure
753  * @param key key for the item stored
754  * @param size size of the item stored
755  * @param status #GNUNET_OK or #GNUNET_SYSERROR
756  * @param msg error message on error
757  */
758 static void
759 put_continuation (void *cls,
760                   const struct GNUNET_HashCode *key,
761                   uint32_t size,
762                   int status,
763                   const char *msg)
764 {
765   struct PutContext *pc = cls;
766
767   if (GNUNET_OK == status)
768   {
769     GNUNET_STATISTICS_update (stats,
770                               gettext_noop ("# bytes stored"),
771                               size,
772                               GNUNET_YES);
773     GNUNET_CONTAINER_bloomfilter_add (filter,
774                                       key);
775     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
776                 "Successfully stored %u bytes under key `%s'\n",
777                 size,
778                 GNUNET_h2s (key));
779   }
780   transmit_status (pc->client,
781                    status,
782                    msg);
783   GNUNET_free (pc);
784   if (quota - reserved - cache_size < payload)
785   {
786     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
787                 _("Need %llu bytes more space (%llu allowed, using %llu)\n"),
788                 (unsigned long long) size + GNUNET_DATASTORE_ENTRY_OVERHEAD,
789                 (unsigned long long) (quota - reserved - cache_size),
790                 (unsigned long long) payload);
791     manage_space (size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
792   }
793 }
794
795
796 /**
797  * Actually put the data message.
798  *
799  * @param pc put context
800  */
801 static void
802 execute_put (struct PutContext *pc)
803 {
804   const struct DataMessage *dm;
805
806   dm = (const struct DataMessage *) &pc[1];
807   plugin->api->put (plugin->api->cls,
808                     &dm->key,
809                     ntohl (dm->size),
810                     &dm[1],
811                     ntohl (dm->type),
812                     ntohl (dm->priority),
813                     ntohl (dm->anonymity),
814                     ntohl (dm->replication),
815                     GNUNET_TIME_absolute_ntoh (dm->expiration),
816                     &put_continuation,
817                     pc);
818 }
819
820
821 /**
822  *
823  * @param cls closure
824  * @param status #GNUNET_OK or #GNUNET_SYSERR
825  * @param msg error message on error
826  */
827 static void
828 check_present_continuation (void *cls,
829                             int status,
830                             const char *msg)
831 {
832   struct GNUNET_SERVICE_Client *client = cls;
833
834   transmit_status (client,
835                    GNUNET_NO,
836                    NULL);
837 }
838
839
840 /**
841  * Function that will check if the given datastore entry
842  * matches the put and if none match executes the put.
843  *
844  * @param cls closure, pointer to the client (of type `struct PutContext`).
845  * @param key key for the content
846  * @param size number of bytes in data
847  * @param data content stored
848  * @param type type of the content
849  * @param priority priority of the content
850  * @param anonymity anonymity-level for the content
851  * @param expiration expiration time for the content
852  * @param uid unique identifier for the datum;
853  *        maybe 0 if no unique identifier is available
854  * @return #GNUNET_OK usually
855  *         #GNUNET_NO to delete the item
856  */
857 static int
858 check_present (void *cls,
859                const struct GNUNET_HashCode *key,
860                uint32_t size,
861                const void *data,
862                enum GNUNET_BLOCK_Type type,
863                uint32_t priority,
864                uint32_t anonymity,
865                struct GNUNET_TIME_Absolute expiration,
866                uint64_t uid)
867 {
868   struct PutContext *pc = cls;
869   const struct DataMessage *dm;
870
871   dm = (const struct DataMessage *) &pc[1];
872   if (key == NULL)
873   {
874     execute_put (pc);
875     return GNUNET_OK;
876   }
877   if ( (GNUNET_BLOCK_TYPE_FS_DBLOCK == type) ||
878        (GNUNET_BLOCK_TYPE_FS_IBLOCK == type) ||
879        ( (size == ntohl (dm->size)) &&
880          (0 == memcmp (&dm[1],
881                        data,
882                        size)) ) )
883   {
884     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
885                 "Result already present in datastore\n");
886     /* FIXME: change API to allow increasing 'replication' counter */
887     if ((ntohl (dm->priority) > 0) ||
888         (GNUNET_TIME_absolute_ntoh (dm->expiration).abs_value_us >
889          expiration.abs_value_us))
890       plugin->api->update (plugin->api->cls,
891                            uid,
892                            ntohl (dm->priority),
893                            GNUNET_TIME_absolute_ntoh (dm->expiration),
894                            &check_present_continuation,
895                            pc->client);
896     else
897     {
898       transmit_status (pc->client,
899                        GNUNET_NO,
900                        NULL);
901     }
902     GNUNET_free (pc);
903   }
904   else
905   {
906     execute_put (pc);
907   }
908   return GNUNET_OK;
909 }
910
911
912 /**
913  * Verify PUT-message.
914  *
915  * @param cls identification of the client
916  * @param message the actual message
917  * @return #GNUNET_OK if @a dm is well-formed
918  */
919 static int
920 check_put (void *cls,
921            const struct DataMessage *dm)
922 {
923   if (GNUNET_OK != check_data (dm))
924   {
925     GNUNET_break (0);
926     return GNUNET_SYSERR;
927   }
928   return GNUNET_OK;
929 }
930
931
932 /**
933  * Handle PUT-message.
934  *
935  * @param cls identification of the client
936  * @param message the actual message
937  */
938 static void
939 handle_put (void *cls,
940             const struct DataMessage *dm)
941 {
942   struct GNUNET_SERVICE_Client *client = cls;
943   int rid;
944   struct ReservationList *pos;
945   struct PutContext *pc;
946   struct GNUNET_HashCode vhash;
947   uint32_t size;
948
949   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
950               "Processing PUT request for `%s' of type %u\n",
951               GNUNET_h2s (&dm->key),
952               ntohl (dm->type));
953   rid = ntohl (dm->rid);
954   size = ntohl (dm->size);
955   if (rid > 0)
956   {
957     pos = reservations;
958     while ((NULL != pos) && (rid != pos->rid))
959       pos = pos->next;
960     GNUNET_break (pos != NULL);
961     if (NULL != pos)
962     {
963       GNUNET_break (pos->entries > 0);
964       GNUNET_break (pos->amount >= size);
965       pos->entries--;
966       pos->amount -= size;
967       reserved -= (size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
968       GNUNET_STATISTICS_set (stats,
969                              gettext_noop ("# reserved"),
970                              reserved,
971                              GNUNET_NO);
972     }
973   }
974   pc = GNUNET_malloc (sizeof (struct PutContext) + size +
975                       sizeof (struct DataMessage));
976   pc->client = client;
977   GNUNET_memcpy (&pc[1],
978                  dm,
979                  size + sizeof (struct DataMessage));
980   if (GNUNET_YES == GNUNET_CONTAINER_bloomfilter_test (filter,
981                                                        &dm->key))
982   {
983     GNUNET_CRYPTO_hash (&dm[1],
984                         size,
985                         &vhash);
986     plugin->api->get_key (plugin->api->cls,
987                           0,
988                           &dm->key,
989                           &vhash,
990                           ntohl (dm->type),
991                           &check_present,
992                           pc);
993     GNUNET_SERVICE_client_continue (client);
994     return;
995   }
996   execute_put (pc);
997   GNUNET_SERVICE_client_continue (client);
998 }
999
1000
1001 /**
1002  * Handle #GNUNET_MESSAGE_TYPE_DATASTORE_GET-message.
1003  *
1004  * @param cls identification of the client
1005  * @param msg the actual message
1006  */
1007 static void
1008 handle_get (void *cls,
1009             const struct GetMessage *msg)
1010 {
1011   struct GNUNET_SERVICE_Client *client = cls;
1012
1013   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1014               "Processing GET request of type %u\n",
1015               ntohl (msg->type));
1016   GNUNET_STATISTICS_update (stats,
1017                             gettext_noop ("# GET requests received"),
1018                             1,
1019                             GNUNET_NO);
1020   plugin->api->get_key (plugin->api->cls,
1021                         GNUNET_ntohll (msg->offset),
1022                         NULL,
1023                         NULL,
1024                         ntohl (msg->type),
1025                         &transmit_item,
1026                         client);
1027   GNUNET_SERVICE_client_continue (client);
1028 }
1029
1030
1031 /**
1032  * Handle #GNUNET_MESSAGE_TYPE_DATASTORE_GET_KEY-message.
1033  *
1034  * @param cls closure
1035  * @param msg the actual message
1036  */
1037 static void
1038 handle_get_key (void *cls,
1039                 const struct GetKeyMessage *msg)
1040 {
1041   struct GNUNET_SERVICE_Client *client = cls;
1042
1043   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1044               "Processing GET request for `%s' of type %u\n",
1045               GNUNET_h2s (&msg->key),
1046               ntohl (msg->type));
1047   GNUNET_STATISTICS_update (stats,
1048                             gettext_noop ("# GET KEY requests received"),
1049                             1,
1050                             GNUNET_NO);
1051   if (GNUNET_YES !=
1052       GNUNET_CONTAINER_bloomfilter_test (filter,
1053                                          &msg->key))
1054   {
1055     /* don't bother database... */
1056     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1057                 "Empty result set for GET request for `%s' (bloomfilter).\n",
1058                 GNUNET_h2s (&msg->key));
1059     GNUNET_STATISTICS_update (stats,
1060                               gettext_noop
1061                               ("# requests filtered by bloomfilter"),
1062                               1,
1063                               GNUNET_NO);
1064     transmit_item (client,
1065                    NULL, 0, NULL, 0, 0, 0,
1066                    GNUNET_TIME_UNIT_ZERO_ABS,
1067                    0);
1068     GNUNET_SERVICE_client_continue (client);
1069     return;
1070   }
1071   plugin->api->get_key (plugin->api->cls,
1072                         GNUNET_ntohll (msg->offset),
1073                         &msg->key,
1074                         NULL,
1075                         ntohl (msg->type),
1076                         &transmit_item,
1077                         client);
1078   GNUNET_SERVICE_client_continue (client);
1079 }
1080
1081
1082 /**
1083  * Handle GET_REPLICATION-message.
1084  *
1085  * @param cls identification of the client
1086  * @param message the actual message
1087  */
1088 static void
1089 handle_get_replication (void *cls,
1090                         const struct GNUNET_MessageHeader *message)
1091 {
1092   struct GNUNET_SERVICE_Client *client = cls;
1093
1094   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1095               "Processing GET_REPLICATION request\n");
1096   GNUNET_STATISTICS_update (stats,
1097                             gettext_noop ("# GET REPLICATION requests received"),
1098                             1,
1099                             GNUNET_NO);
1100   plugin->api->get_replication (plugin->api->cls,
1101                                 &transmit_item,
1102                                 client);
1103   GNUNET_SERVICE_client_continue (client);
1104 }
1105
1106
1107 /**
1108  * Handle GET_ZERO_ANONYMITY-message.
1109  *
1110  * @param cls client identification of the client
1111  * @param message the actual message
1112  */
1113 static void
1114 handle_get_zero_anonymity (void *cls,
1115                            const struct GetZeroAnonymityMessage *msg)
1116 {
1117   struct GNUNET_SERVICE_Client *client = cls;
1118   enum GNUNET_BLOCK_Type type;
1119
1120   type = (enum GNUNET_BLOCK_Type) ntohl (msg->type);
1121   if (type == GNUNET_BLOCK_TYPE_ANY)
1122   {
1123     GNUNET_break (0);
1124     GNUNET_SERVICE_client_drop (client);
1125     return;
1126   }
1127   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1128               "Processing GET_ZERO_ANONYMITY request\n");
1129   GNUNET_STATISTICS_update (stats,
1130                             gettext_noop ("# GET ZERO ANONYMITY requests received"),
1131                             1,
1132                             GNUNET_NO);
1133   plugin->api->get_zero_anonymity (plugin->api->cls,
1134                                    GNUNET_ntohll (msg->offset),
1135                                    type,
1136                                    &transmit_item,
1137                                    client);
1138   GNUNET_SERVICE_client_continue (client);
1139 }
1140
1141
1142 /**
1143  * Callback function that will cause the item that is passed
1144  * in to be deleted (by returning #GNUNET_NO).
1145  *
1146  * @param cls closure
1147  * @param key key for the content
1148  * @param size number of bytes in data
1149  * @param data content stored
1150  * @param type type of the content
1151  * @param priority priority of the content
1152  * @param anonymity anonymity-level for the content
1153  * @param expiration expiration time for the content
1154  * @param uid unique identifier for the datum
1155  * @return #GNUNET_OK to keep the item
1156  *         #GNUNET_NO to delete the item
1157  */
1158 static int
1159 remove_callback (void *cls,
1160                  const struct GNUNET_HashCode *key,
1161                  uint32_t size,
1162                  const void *data,
1163                  enum GNUNET_BLOCK_Type type,
1164                  uint32_t priority,
1165                  uint32_t anonymity,
1166                  struct GNUNET_TIME_Absolute expiration,
1167                  uint64_t uid)
1168 {
1169   struct GNUNET_SERVICE_Client *client = cls;
1170
1171   if (NULL == key)
1172   {
1173     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1174                 "No further matches for REMOVE request.\n");
1175     transmit_status (client,
1176                      GNUNET_NO,
1177                      _("Content not found"));
1178     return GNUNET_OK;           /* last item */
1179   }
1180   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1181               "Item %llu matches REMOVE request for key `%s' and type %u.\n",
1182               (unsigned long long) uid,
1183               GNUNET_h2s (key),
1184               type);
1185   GNUNET_STATISTICS_update (stats,
1186                             gettext_noop ("# bytes removed (explicit request)"),
1187                             size,
1188                             GNUNET_YES);
1189   GNUNET_CONTAINER_bloomfilter_remove (filter,
1190                                        key);
1191   transmit_status (client,
1192                    GNUNET_OK,
1193                    NULL);
1194   return GNUNET_NO;
1195 }
1196
1197
1198 /**
1199  * Verify REMOVE-message.
1200  *
1201  * @param cls identification of the client
1202  * @param message the actual message
1203  * @return #GNUNET_OK if @a dm is well-formed
1204  */
1205 static int
1206 check_remove (void *cls,
1207               const struct DataMessage *dm)
1208 {
1209   if (GNUNET_OK != check_data (dm))
1210   {
1211     GNUNET_break (0);
1212     return GNUNET_SYSERR;
1213   }
1214   return GNUNET_OK;
1215 }
1216
1217
1218 /**
1219  * Handle REMOVE-message.
1220  *
1221  * @param cls closure
1222  * @param client identification of the client
1223  * @param message the actual message
1224  */
1225 static void
1226 handle_remove (void *cls,
1227                const struct DataMessage *dm)
1228 {
1229   struct GNUNET_SERVICE_Client *client = cls;
1230   struct GNUNET_HashCode vhash;
1231
1232   GNUNET_STATISTICS_update (stats,
1233                             gettext_noop ("# REMOVE requests received"),
1234                             1, GNUNET_NO);
1235   GNUNET_CRYPTO_hash (&dm[1],
1236                       ntohl (dm->size),
1237                       &vhash);
1238   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1239               "Processing REMOVE request for `%s' of type %u\n",
1240               GNUNET_h2s (&dm->key),
1241               ntohl (dm->type));
1242   plugin->api->get_key (plugin->api->cls,
1243                         0,
1244                         &dm->key,
1245                         &vhash,
1246                         (enum GNUNET_BLOCK_Type) ntohl (dm->type),
1247                         &remove_callback,
1248                         client);
1249   GNUNET_SERVICE_client_continue (client);
1250 }
1251
1252
1253 /**
1254  * Handle DROP-message.
1255  *
1256  * @param cls identification of the client
1257  * @param message the actual message
1258  */
1259 static void
1260 handle_drop (void *cls,
1261              const struct GNUNET_MessageHeader *message)
1262 {
1263   struct GNUNET_SERVICE_Client *client = cls;
1264
1265   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1266               "Processing DROP request\n");
1267   do_drop = GNUNET_YES;
1268   GNUNET_SERVICE_client_continue (client);
1269 }
1270
1271
1272 /**
1273  * Function called by plugins to notify us about a
1274  * change in their disk utilization.
1275  *
1276  * @param cls closure (NULL)
1277  * @param delta change in disk utilization,
1278  *        0 for "reset to empty"
1279  */
1280 static void
1281 disk_utilization_change_cb (void *cls,
1282                             int delta)
1283 {
1284   if ((delta < 0) && (payload < -delta))
1285   {
1286     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1287                 _("Datastore payload must have been inaccurate (%lld < %lld). Recomputing it.\n"),
1288                 (long long) payload,
1289                 (long long) -delta);
1290     plugin->api->estimate_size (plugin->api->cls,
1291                                 &payload);
1292     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1293                 _("New payload: %lld\n"),
1294                 (long long) payload);
1295      sync_stats ();
1296     return;
1297   }
1298   payload += delta;
1299   last_sync++;
1300   if (last_sync >= MAX_STAT_SYNC_LAG)
1301     sync_stats ();
1302 }
1303
1304
1305 /**
1306  * Callback function to process statistic values.
1307  *
1308  * @param cls closure (struct Plugin*)
1309  * @param subsystem name of subsystem that created the statistic
1310  * @param name the name of the datum
1311  * @param value the current value
1312  * @param is_persistent #GNUNET_YES if the value is persistent, #GNUNET_NO if not
1313  * @return #GNUNET_OK to continue, #GNUNET_SYSERR to abort iteration
1314  */
1315 static int
1316 process_stat_in (void *cls,
1317                  const char *subsystem,
1318                  const char *name,
1319                  uint64_t value,
1320                  int is_persistent)
1321 {
1322   GNUNET_assert (GNUNET_NO == stats_worked);
1323   stats_worked = GNUNET_YES;
1324   payload += value;
1325   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1326               "Notification from statistics about existing payload (%llu), new payload is %llu\n",
1327               (unsigned long long) value,
1328               (unsigned long long) payload);
1329   return GNUNET_OK;
1330 }
1331
1332
1333 /**
1334  * Load the datastore plugin.
1335  */
1336 static struct DatastorePlugin *
1337 load_plugin ()
1338 {
1339   struct DatastorePlugin *ret;
1340   char *libname;
1341
1342   ret = GNUNET_new (struct DatastorePlugin);
1343   ret->env.cfg = cfg;
1344   ret->env.duc = &disk_utilization_change_cb;
1345   ret->env.cls = NULL;
1346   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1347               _("Loading `%s' datastore plugin\n"),
1348               plugin_name);
1349   GNUNET_asprintf (&libname,
1350                    "libgnunet_plugin_datastore_%s",
1351                    plugin_name);
1352   ret->short_name = GNUNET_strdup (plugin_name);
1353   ret->lib_name = libname;
1354   ret->api = GNUNET_PLUGIN_load (libname,
1355                                  &ret->env);
1356   if (NULL == ret->api)
1357   {
1358     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1359                 _("Failed to load datastore plugin for `%s'\n"),
1360                 plugin_name);
1361     GNUNET_free (ret->short_name);
1362     GNUNET_free (libname);
1363     GNUNET_free (ret);
1364     return NULL;
1365   }
1366   return ret;
1367 }
1368
1369
1370 /**
1371  * Function called when the service shuts
1372  * down.  Unloads our datastore plugin.
1373  *
1374  * @param plug plugin to unload
1375  */
1376 static void
1377 unload_plugin (struct DatastorePlugin *plug)
1378 {
1379   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1380               "Datastore service is unloading plugin...\n");
1381   GNUNET_break (NULL == GNUNET_PLUGIN_unload (plug->lib_name, plug->api));
1382   GNUNET_free (plug->lib_name);
1383   GNUNET_free (plug->short_name);
1384   GNUNET_free (plug);
1385 }
1386
1387
1388 /**
1389  * Initialization complete, start operating the service.
1390  */
1391 static void
1392 begin_service ()
1393 {
1394   GNUNET_SERVICE_resume (service);
1395   expired_kill_task
1396     = GNUNET_SCHEDULER_add_with_priority (GNUNET_SCHEDULER_PRIORITY_IDLE,
1397                                           &delete_expired,
1398                                           NULL);
1399 }
1400
1401
1402 /**
1403  * Adds a given @a key to the bloomfilter in @a cls @a count times.
1404  *
1405  * @param cls the bloomfilter
1406  * @param key key to add
1407  * @param count number of times to add key
1408  */
1409 static void
1410 add_key_to_bloomfilter (void *cls,
1411                         const struct GNUNET_HashCode *key,
1412                         unsigned int count)
1413 {
1414   struct GNUNET_CONTAINER_BloomFilter *bf = cls;
1415
1416   if (NULL == key)
1417   {
1418     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1419                 _("Bloomfilter construction complete.\n"));
1420     begin_service ();
1421     return;
1422   }
1423
1424   while (0 < count--)
1425     GNUNET_CONTAINER_bloomfilter_add (bf,
1426                                       key);
1427 }
1428
1429
1430 /**
1431  * We finished receiving the statistic.  Initialize the plugin; if
1432  * loading the statistic failed, run the estimator.
1433  *
1434  * @param cls NULL
1435  * @param success #GNUNET_NO if we failed to read the stat
1436  */
1437 static void
1438 process_stat_done (void *cls,
1439                    int success)
1440 {
1441   stat_get = NULL;
1442   if (NULL != stat_timeout_task)
1443   {
1444     GNUNET_SCHEDULER_cancel (stat_timeout_task);
1445     stat_timeout_task = NULL;
1446   }
1447   plugin = load_plugin ();
1448   if (NULL == plugin)
1449   {
1450     GNUNET_CONTAINER_bloomfilter_free (filter);
1451     filter = NULL;
1452     if (NULL != stats)
1453     {
1454       GNUNET_STATISTICS_destroy (stats,
1455                                  GNUNET_YES);
1456       stats = NULL;
1457     }
1458     return;
1459   }
1460
1461   if (GNUNET_NO == stats_worked)
1462   {
1463     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1464                 "Failed to obtain value from statistics service, recomputing it\n");
1465     plugin->api->estimate_size (plugin->api->cls,
1466                                 &payload);
1467     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1468                 _("New payload: %lld\n"),
1469                 (long long) payload);
1470   }
1471
1472   if (GNUNET_YES == refresh_bf)
1473   {
1474     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1475                 _("Rebuilding bloomfilter.  Please be patient.\n"));
1476     if (NULL != plugin->api->get_keys)
1477     {
1478       plugin->api->get_keys (plugin->api->cls,
1479                              &add_key_to_bloomfilter,
1480                              filter);
1481       return;
1482     }
1483     else
1484     {
1485       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1486                   _("Plugin does not support get_keys function. Please fix!\n"));
1487     }
1488   }
1489   begin_service ();
1490 }
1491
1492
1493 /**
1494  * Fetching stats took to long, run without.
1495  *
1496  * @param cls NULL
1497  */
1498 static void
1499 stat_timeout (void *cls)
1500 {
1501   stat_timeout_task = NULL;
1502   GNUNET_STATISTICS_get_cancel (stat_get);
1503   process_stat_done (NULL,
1504                      GNUNET_NO);
1505 }
1506
1507
1508 /**
1509  * Task run during shutdown.
1510  */
1511 static void
1512 cleaning_task (void *cls)
1513 {
1514   cleaning_done = GNUNET_YES;
1515   if (NULL != expired_kill_task)
1516   {
1517     GNUNET_SCHEDULER_cancel (expired_kill_task);
1518     expired_kill_task = NULL;
1519   }
1520   if (GNUNET_YES == do_drop)
1521   {
1522     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1523                 "Dropping database!\n");
1524     plugin->api->drop (plugin->api->cls);
1525     payload = 0;
1526     last_sync++;
1527   }
1528   if (NULL != plugin)
1529   {
1530     unload_plugin (plugin);
1531     plugin = NULL;
1532   }
1533   if (NULL != filter)
1534   {
1535     GNUNET_CONTAINER_bloomfilter_free (filter);
1536     filter = NULL;
1537   }
1538   if (NULL != stat_get)
1539   {
1540     GNUNET_STATISTICS_get_cancel (stat_get);
1541     stat_get = NULL;
1542   }
1543   if (NULL != stat_timeout_task)
1544   {
1545     GNUNET_SCHEDULER_cancel (stat_timeout_task);
1546     stat_timeout_task = NULL;
1547   }
1548   GNUNET_free_non_null (plugin_name);
1549   plugin_name = NULL;
1550   if (last_sync > 0)
1551     sync_stats ();
1552   if (NULL != stats)
1553   {
1554     GNUNET_STATISTICS_destroy (stats,
1555                                GNUNET_YES);
1556     stats = NULL;
1557   }
1558   GNUNET_free (quota_stat_name);
1559   quota_stat_name = NULL;
1560 }
1561
1562
1563 /**
1564  * Add a client to our list of active clients.
1565  *
1566  * @param cls NULL
1567  * @param client client to add
1568  * @param mq message queue for @a client
1569  * @return @a client
1570  */
1571 static void *
1572 client_connect_cb (void *cls,
1573                    struct GNUNET_SERVICE_Client *client,
1574                    struct GNUNET_MQ_Handle *mq)
1575 {
1576   return client;
1577 }
1578
1579
1580 /**
1581  * Called whenever a client is disconnected.
1582  * Frees our resources associated with that client.
1583  *
1584  * @param cls closure
1585  * @param client identification of the client
1586  * @param app_ctx must match @a client
1587  */
1588 static void
1589 client_disconnect_cb (void *cls,
1590                       struct GNUNET_SERVICE_Client *client,
1591                       void *app_ctx)
1592 {
1593   struct ReservationList *pos;
1594   struct ReservationList *prev;
1595   struct ReservationList *next;
1596
1597   GNUNET_assert (app_ctx == client);
1598   prev = NULL;
1599   pos = reservations;
1600   while (NULL != pos)
1601   {
1602     next = pos->next;
1603     if (pos->client == client)
1604     {
1605       if (NULL == prev)
1606         reservations = next;
1607       else
1608         prev->next = next;
1609       reserved -= pos->amount + pos->entries * GNUNET_DATASTORE_ENTRY_OVERHEAD;
1610       GNUNET_free (pos);
1611     }
1612     else
1613     {
1614       prev = pos;
1615     }
1616     pos = next;
1617   }
1618   GNUNET_STATISTICS_set (stats,
1619                          gettext_noop ("# reserved"),
1620                          reserved,
1621                          GNUNET_NO);
1622
1623 }
1624
1625
1626 /**
1627  * Process datastore requests.
1628  *
1629  * @param cls closure
1630  * @param serv the initialized service
1631  * @param c configuration to use
1632  */
1633 static void
1634 run (void *cls,
1635      const struct GNUNET_CONFIGURATION_Handle *c,
1636      struct GNUNET_SERVICE_Handle *serv)
1637 {
1638   char *fn;
1639   char *pfn;
1640   unsigned int bf_size;
1641
1642   service = serv;
1643   cfg = c;
1644   if (GNUNET_OK !=
1645       GNUNET_CONFIGURATION_get_value_string (cfg,
1646                                              "DATASTORE",
1647                                              "DATABASE",
1648                                              &plugin_name))
1649   {
1650     GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
1651                                "DATABASE",
1652                                "DATASTORE");
1653     return;
1654   }
1655   GNUNET_asprintf (&quota_stat_name,
1656                    _("# bytes used in file-sharing datastore `%s'"),
1657                    plugin_name);
1658   if (GNUNET_OK !=
1659       GNUNET_CONFIGURATION_get_value_size (cfg,
1660                                            "DATASTORE",
1661                                            "QUOTA",
1662                                            &quota))
1663   {
1664     GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
1665                                "QUOTA",
1666                                "DATASTORE");
1667     return;
1668   }
1669   stats = GNUNET_STATISTICS_create ("datastore",
1670                                     cfg);
1671   GNUNET_STATISTICS_set (stats,
1672                          gettext_noop ("# quota"),
1673                          quota,
1674                          GNUNET_NO);
1675   cache_size = quota / 8;       /* Or should we make this an option? */
1676   GNUNET_STATISTICS_set (stats,
1677                          gettext_noop ("# cache size"),
1678                          cache_size,
1679                          GNUNET_NO);
1680   if (quota / (32 * 1024LL) > MAX_BF_SIZE)
1681     bf_size = MAX_BF_SIZE;
1682   else
1683     bf_size = quota / (32 * 1024LL);         /* 8 bit per entry, 1 bit per 32 kb in DB */
1684   fn = NULL;
1685   if ((GNUNET_OK !=
1686        GNUNET_CONFIGURATION_get_value_filename (cfg,
1687                                                 "DATASTORE",
1688                                                 "BLOOMFILTER",
1689                                                 &fn)) ||
1690       (GNUNET_OK != GNUNET_DISK_directory_create_for_file (fn)))
1691   {
1692     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1693                 _("Could not use specified filename `%s' for bloomfilter.\n"),
1694                 NULL != fn ? fn : "");
1695     GNUNET_free_non_null (fn);
1696     fn = NULL;
1697   }
1698   if (NULL != fn)
1699   {
1700     GNUNET_asprintf (&pfn, "%s.%s", fn, plugin_name);
1701     if (GNUNET_YES == GNUNET_DISK_file_test (pfn))
1702     {
1703       filter = GNUNET_CONTAINER_bloomfilter_load (pfn, bf_size, 5);        /* approx. 3% false positives at max use */
1704       if (NULL == filter)
1705       {
1706         /* file exists but not valid, remove and try again, but refresh */
1707         if (0 != UNLINK (pfn))
1708         {
1709           /* failed to remove, run without file */
1710           GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1711                       _("Failed to remove bogus bloomfilter file `%s'\n"),
1712                       pfn);
1713           GNUNET_free (pfn);
1714           pfn = NULL;
1715           filter = GNUNET_CONTAINER_bloomfilter_load (NULL, bf_size, 5);        /* approx. 3% false positives at max use */
1716           refresh_bf = GNUNET_YES;
1717         }
1718         else
1719         {
1720           /* try again after remove */
1721           filter = GNUNET_CONTAINER_bloomfilter_load (pfn, bf_size, 5);        /* approx. 3% false positives at max use */
1722           refresh_bf = GNUNET_YES;
1723           if (NULL == filter)
1724           {
1725             /* failed yet again, give up on using file */
1726             GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1727                         _("Failed to remove bogus bloomfilter file `%s'\n"),
1728                         pfn);
1729             GNUNET_free (pfn);
1730             pfn = NULL;
1731             filter = GNUNET_CONTAINER_bloomfilter_init (NULL, bf_size, 5);        /* approx. 3% false positives at max use */
1732           }
1733         }
1734       }
1735       else
1736       {
1737         /* normal case: have an existing valid bf file, no need to refresh */
1738         refresh_bf = GNUNET_NO;
1739       }
1740     }
1741     else
1742     {
1743       filter = GNUNET_CONTAINER_bloomfilter_load (pfn, bf_size, 5);        /* approx. 3% false positives at max use */
1744       refresh_bf = GNUNET_YES;
1745     }
1746     GNUNET_free (pfn);
1747   }
1748   else
1749   {
1750     filter = GNUNET_CONTAINER_bloomfilter_init (NULL,
1751                                                 bf_size,
1752                                                 5);      /* approx. 3% false positives at max use */
1753     refresh_bf = GNUNET_YES;
1754   }
1755   GNUNET_free_non_null (fn);
1756   if (NULL == filter)
1757   {
1758     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1759                 _("Failed to initialize bloomfilter.\n"));
1760     if (NULL != stats)
1761     {
1762       GNUNET_STATISTICS_destroy (stats,
1763                                  GNUNET_YES);
1764       stats = NULL;
1765     }
1766     return;
1767   }
1768   GNUNET_SERVICE_suspend (service);
1769   stat_get =
1770       GNUNET_STATISTICS_get (stats,
1771                              "datastore",
1772                              quota_stat_name,
1773                              &process_stat_done,
1774                              &process_stat_in,
1775                              NULL);
1776   if (NULL == stat_get)
1777     process_stat_done (NULL,
1778                        GNUNET_SYSERR);
1779   else
1780     stat_timeout_task
1781       = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS,
1782                                       &stat_timeout,
1783                                       NULL);
1784   GNUNET_SCHEDULER_add_shutdown (&cleaning_task,
1785                                  NULL);
1786 }
1787
1788
1789 /**
1790  * Define "main" method using service macro.
1791  */
1792 GNUNET_SERVICE_MAIN
1793 ("datastore",
1794  GNUNET_SERVICE_OPTION_NONE,
1795  &run,
1796  &client_connect_cb,
1797  &client_disconnect_cb,
1798  NULL,
1799  GNUNET_MQ_hd_fixed_size (reserve,
1800                           GNUNET_MESSAGE_TYPE_DATASTORE_RESERVE,
1801                           struct ReserveMessage,
1802                           NULL),
1803  GNUNET_MQ_hd_fixed_size (release_reserve,
1804                           GNUNET_MESSAGE_TYPE_DATASTORE_RELEASE_RESERVE,
1805                           struct ReleaseReserveMessage,
1806                           NULL),
1807  GNUNET_MQ_hd_var_size (put,
1808                         GNUNET_MESSAGE_TYPE_DATASTORE_PUT,
1809                         struct DataMessage,
1810                         NULL),
1811  GNUNET_MQ_hd_fixed_size (get,
1812                           GNUNET_MESSAGE_TYPE_DATASTORE_GET,
1813                           struct GetMessage,
1814                           NULL),
1815  GNUNET_MQ_hd_fixed_size (get_key,
1816                           GNUNET_MESSAGE_TYPE_DATASTORE_GET_KEY,
1817                           struct GetKeyMessage,
1818                           NULL),
1819  GNUNET_MQ_hd_fixed_size (get_replication,
1820                           GNUNET_MESSAGE_TYPE_DATASTORE_GET_REPLICATION,
1821                           struct GNUNET_MessageHeader,
1822                           NULL),
1823  GNUNET_MQ_hd_fixed_size (get_zero_anonymity,
1824                           GNUNET_MESSAGE_TYPE_DATASTORE_GET_ZERO_ANONYMITY,
1825                           struct GetZeroAnonymityMessage,
1826                           NULL),
1827  GNUNET_MQ_hd_var_size (remove,
1828                         GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE,
1829                         struct DataMessage,
1830                         NULL),
1831  GNUNET_MQ_hd_fixed_size (drop,
1832                           GNUNET_MESSAGE_TYPE_DATASTORE_DROP,
1833                           struct GNUNET_MessageHeader,
1834                           NULL),
1835  GNUNET_MQ_handler_end ());
1836
1837
1838 /* end of gnunet-service-datastore.c */