Merge branch 'master' of gnunet.org:gnunet
[oweals/gnunet.git] / src / datastore / gnunet-service-datastore.c
1 /*
2      This file is part of GNUnet
3      Copyright (C) 2004-2014, 2016 GNUnet e.V.
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18      Boston, MA 02110-1301, USA.
19 */
20
21 /**
22  * @file datastore/gnunet-service-datastore.c
23  * @brief Management for the datastore for files stored on a GNUnet node
24  * @author Christian Grothoff
25  */
26
27 #include "platform.h"
28 #include "gnunet_util_lib.h"
29 #include "gnunet_protocols.h"
30 #include "gnunet_statistics_service.h"
31 #include "gnunet_datastore_plugin.h"
32 #include "datastore.h"
33
34 /**
35  * How many messages do we queue at most per client?
36  */
37 #define MAX_PENDING 1024
38
39 /**
40  * Limit size of bloom filter to 2 GB.
41  */
42 #define MAX_BF_SIZE ((uint32_t) (1LL << 31))
43
44 /**
45  * How long are we at most keeping "expired" content
46  * past the expiration date in the database?
47  */
48 #define MAX_EXPIRE_DELAY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 15)
49
50 /**
51  * How fast are we allowed to query the database for deleting
52  * expired content? (1 item per second).
53  */
54 #define MIN_EXPIRE_DELAY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 1)
55
56 /**
57  * Name under which we store current space consumption.
58  */
59 static char *quota_stat_name;
60
61 /**
62  * Task to timeout stat GET.
63  */
64 static struct GNUNET_SCHEDULER_Task *stat_timeout_task;
65
66 /**
67  * After how many payload-changing operations
68  * do we sync our statistics?
69  */
70 #define MAX_STAT_SYNC_LAG 50
71
72
73 /**
74  * Our datastore plugin.
75  */
76 struct DatastorePlugin
77 {
78
79   /**
80    * API of the transport as returned by the plugin's
81    * initialization function.
82    */
83   struct GNUNET_DATASTORE_PluginFunctions *api;
84
85   /**
86    * Short name for the plugin (i.e. "sqlite").
87    */
88   char *short_name;
89
90   /**
91    * Name of the library (i.e. "gnunet_plugin_datastore_sqlite").
92    */
93   char *lib_name;
94
95   /**
96    * Environment this transport service is using
97    * for this plugin.
98    */
99   struct GNUNET_DATASTORE_PluginEnvironment env;
100
101 };
102
103
104 /**
105  * Linked list of active reservations.
106  */
107 struct ReservationList
108 {
109
110   /**
111    * This is a linked list.
112    */
113   struct ReservationList *next;
114
115   /**
116    * Client that made the reservation.
117    */
118   struct GNUNET_SERVICE_Client *client;
119
120   /**
121    * Number of bytes (still) reserved.
122    */
123   uint64_t amount;
124
125   /**
126    * Number of items (still) reserved.
127    */
128   uint64_t entries;
129
130   /**
131    * Reservation identifier.
132    */
133   int32_t rid;
134
135 };
136
137
138
139 /**
140  * Our datastore plugin (NULL if not available).
141  */
142 static struct DatastorePlugin *plugin;
143
144 /**
145  * Linked list of space reservations made by clients.
146  */
147 static struct ReservationList *reservations;
148
149 /**
150  * Bloomfilter to quickly tell if we don't have the content.
151  */
152 static struct GNUNET_CONTAINER_BloomFilter *filter;
153
154 /**
155  * Name of our plugin.
156  */
157 static char *plugin_name;
158
159 /**
160  * Our configuration.
161  */
162 static const struct GNUNET_CONFIGURATION_Handle *cfg;
163
164 /**
165  * Handle for reporting statistics.
166  */
167 static struct GNUNET_STATISTICS_Handle *stats;
168
169 /**
170  * How much space are we using for the cache?  (space available for
171  * insertions that will be instantly reclaimed by discarding less
172  * important content --- or possibly whatever we just inserted into
173  * the "cache").
174  */
175 static unsigned long long cache_size;
176
177 /**
178  * How much space have we currently reserved?
179  */
180 static unsigned long long reserved;
181
182 /**
183  * How much data are we currently storing
184  * in the database?
185  */
186 static unsigned long long payload;
187
188 /**
189  * Identity of the task that is used to delete
190  * expired content.
191  */
192 static struct GNUNET_SCHEDULER_Task *expired_kill_task;
193
194 /**
195  * Minimum time that content should have to not be discarded instantly
196  * (time stamp of any content that we've been discarding recently to
197  * stay below the quota).  FOREVER if we had to expire content with
198  * non-zero priority.
199  */
200 static struct GNUNET_TIME_Absolute min_expiration;
201
202 /**
203  * How much space are we allowed to use?
204  */
205 static unsigned long long quota;
206
207 /**
208  * Should the database be dropped on exit?
209  */
210 static int do_drop;
211
212 /**
213  * Should we refresh the BF when the DB is loaded?
214  */
215 static int refresh_bf;
216
217 /**
218  * Number of updates that were made to the
219  * payload value since we last synchronized
220  * it with the statistics service.
221  */
222 static unsigned int last_sync;
223
224 /**
225  * Did we get an answer from statistics?
226  */
227 static int stats_worked;
228
229
230 /**
231  * Synchronize our utilization statistics with the
232  * statistics service.
233  */
234 static void
235 sync_stats ()
236 {
237   GNUNET_STATISTICS_set (stats,
238                          quota_stat_name,
239                          payload,
240                          GNUNET_YES);
241   GNUNET_STATISTICS_set (stats,
242                          "# utilization by current datastore",
243                          payload,
244                          GNUNET_NO);
245   last_sync = 0;
246 }
247
248
249 /**
250  * Have we already cleaned up the TCCs and are hence no longer
251  * willing (or able) to transmit anything to anyone?
252  */
253 static int cleaning_done;
254
255 /**
256  * Handle for pending get request.
257  */
258 static struct GNUNET_STATISTICS_GetHandle *stat_get;
259
260 /**
261  * Handle to our server.
262  */
263 static struct GNUNET_SERVICE_Handle *service;
264
265 /**
266  * Task that is used to remove expired entries from
267  * the datastore.  This task will schedule itself
268  * again automatically to always delete all expired
269  * content quickly.
270  *
271  * @param cls not used
272  */
273 static void
274 delete_expired (void *cls);
275
276
277 /**
278  * Iterate over the expired items stored in the datastore.
279  * Delete all expired items; once we have processed all
280  * expired items, re-schedule the "delete_expired" task.
281  *
282  * @param cls not used
283  * @param key key for the content
284  * @param size number of bytes in data
285  * @param data content stored
286  * @param type type of the content
287  * @param priority priority of the content
288  * @param anonymity anonymity-level for the content
289  * @param expiration expiration time for the content
290  * @param uid unique identifier for the datum;
291  *        maybe 0 if no unique identifier is available
292  *
293  * @return #GNUNET_SYSERR to abort the iteration, #GNUNET_OK to continue
294  *         (continue on call to "next", of course),
295  *         #GNUNET_NO to delete the item and continue (if supported)
296  */
297 static int
298 expired_processor (void *cls,
299                    const struct GNUNET_HashCode *key,
300                    uint32_t size,
301                    const void *data,
302                    enum GNUNET_BLOCK_Type type,
303                    uint32_t priority,
304                    uint32_t anonymity,
305                    struct GNUNET_TIME_Absolute expiration,
306                    uint64_t uid)
307 {
308   struct GNUNET_TIME_Absolute now;
309
310   if (NULL == key)
311   {
312     expired_kill_task =
313         GNUNET_SCHEDULER_add_delayed_with_priority (MAX_EXPIRE_DELAY,
314                                                     GNUNET_SCHEDULER_PRIORITY_IDLE,
315                                                     &delete_expired, NULL);
316     return GNUNET_SYSERR;
317   }
318   now = GNUNET_TIME_absolute_get ();
319   if (expiration.abs_value_us > now.abs_value_us)
320   {
321     /* finished processing */
322     expired_kill_task =
323         GNUNET_SCHEDULER_add_delayed_with_priority (MAX_EXPIRE_DELAY,
324                                                     GNUNET_SCHEDULER_PRIORITY_IDLE,
325                                                     &delete_expired, NULL);
326     return GNUNET_SYSERR;
327   }
328   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
329               "Deleting content `%s' of type %u that expired %s ago\n",
330               GNUNET_h2s (key), type,
331               GNUNET_STRINGS_relative_time_to_string (GNUNET_TIME_absolute_get_difference (expiration,
332                                                                                            now),
333                                                       GNUNET_YES));
334   min_expiration = now;
335   GNUNET_STATISTICS_update (stats,
336                             gettext_noop ("# bytes expired"),
337                             size,
338                             GNUNET_YES);
339   GNUNET_CONTAINER_bloomfilter_remove (filter, key);
340   expired_kill_task =
341       GNUNET_SCHEDULER_add_delayed_with_priority (MIN_EXPIRE_DELAY,
342                                                   GNUNET_SCHEDULER_PRIORITY_IDLE,
343                                                   &delete_expired, NULL);
344   return GNUNET_NO;
345 }
346
347
348 /**
349  * Task that is used to remove expired entries from
350  * the datastore.  This task will schedule itself
351  * again automatically to always delete all expired
352  * content quickly.
353  *
354  * @param cls not used
355  */
356 static void
357 delete_expired (void *cls)
358 {
359   expired_kill_task = NULL;
360   plugin->api->get_expiration (plugin->api->cls,
361                                &expired_processor,
362                                NULL);
363 }
364
365
366 /**
367  * An iterator over a set of items stored in the datastore
368  * that deletes until we're happy with respect to our quota.
369  *
370  * @param cls closure
371  * @param key key for the content
372  * @param size number of bytes in data
373  * @param data content stored
374  * @param type type of the content
375  * @param priority priority of the content
376  * @param anonymity anonymity-level for the content
377  * @param expiration expiration time for the content
378  * @param uid unique identifier for the datum;
379  *        maybe 0 if no unique identifier is available
380  * @return #GNUNET_SYSERR to abort the iteration, #GNUNET_OK to continue
381  *         (continue on call to "next", of course),
382  *         #GNUNET_NO to delete the item and continue (if supported)
383  */
384 static int
385 quota_processor (void *cls,
386                  const struct GNUNET_HashCode *key,
387                  uint32_t size,
388                  const void *data,
389                  enum GNUNET_BLOCK_Type type,
390                  uint32_t priority,
391                  uint32_t anonymity,
392                  struct GNUNET_TIME_Absolute expiration,
393                  uint64_t uid)
394 {
395   unsigned long long *need = cls;
396
397   if (NULL == key)
398     return GNUNET_SYSERR;
399   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
400               "Deleting %llu bytes of low-priority (%u) content `%s' of type %u at %s prior to expiration (still trying to free another %llu bytes)\n",
401               (unsigned long long) (size + GNUNET_DATASTORE_ENTRY_OVERHEAD),
402               (unsigned int) priority,
403               GNUNET_h2s (key), type,
404               GNUNET_STRINGS_relative_time_to_string (GNUNET_TIME_absolute_get_remaining (expiration),
405                                                       GNUNET_YES),
406               *need);
407   if (size + GNUNET_DATASTORE_ENTRY_OVERHEAD > *need)
408     *need = 0;
409   else
410     *need -= size + GNUNET_DATASTORE_ENTRY_OVERHEAD;
411   if (priority > 0)
412     min_expiration = GNUNET_TIME_UNIT_FOREVER_ABS;
413   else
414     min_expiration = expiration;
415   GNUNET_STATISTICS_update (stats,
416                             gettext_noop ("# bytes purged (low-priority)"),
417                             size, GNUNET_YES);
418   GNUNET_CONTAINER_bloomfilter_remove (filter, key);
419   return GNUNET_NO;
420 }
421
422
423 /**
424  * Manage available disk space by running tasks
425  * that will discard content if necessary.  This
426  * function will be run whenever a request for
427  * "need" bytes of storage could only be satisfied
428  * by eating into the "cache" (and we want our cache
429  * space back).
430  *
431  * @param need number of bytes of content that were
432  *        placed into the "cache" (and hence the
433  *        number of bytes that should be removed).
434  */
435 static void
436 manage_space (unsigned long long need)
437 {
438   unsigned long long last;
439
440   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
441               "Asked to free up %llu bytes of cache space\n",
442               need);
443   last = 0;
444   while ((need > 0) && (last != need))
445   {
446     last = need;
447     plugin->api->get_expiration (plugin->api->cls,
448                                  &quota_processor,
449                                  &need);
450   }
451 }
452
453
454 /**
455  * Transmit a status code to the client.
456  *
457  * @param client receiver of the response
458  * @param code status code
459  * @param msg optional error message (can be NULL)
460  */
461 static void
462 transmit_status (struct GNUNET_SERVICE_Client *client,
463                  int code,
464                  const char *msg)
465 {
466   struct GNUNET_MQ_Envelope *env;
467   struct StatusMessage *sm;
468   size_t slen;
469
470   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
471               "Transmitting `%s' message with value %d and message `%s'\n",
472               "STATUS", code, msg != NULL ? msg : "(none)");
473   slen = (msg == NULL) ? 0 : strlen (msg) + 1;
474   env = GNUNET_MQ_msg_extra (sm,
475                              slen,
476                              GNUNET_MESSAGE_TYPE_DATASTORE_STATUS);
477   sm->status = htonl (code);
478   sm->min_expiration = GNUNET_TIME_absolute_hton (min_expiration);
479   GNUNET_memcpy (&sm[1],
480                  msg,
481                  slen);
482   GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (client),
483                   env);
484 }
485
486
487 /**
488  * Function that will transmit the given datastore entry
489  * to the client.
490  *
491  * @param cls closure, pointer to the client (of type `struct GNUNET_SERVICE_Client`).
492  * @param key key for the content
493  * @param size number of bytes in data
494  * @param data content stored
495  * @param type type of the content
496  * @param priority priority of the content
497  * @param anonymity anonymity-level for the content
498  * @param expiration expiration time for the content
499  * @param uid unique identifier for the datum;
500  *        maybe 0 if no unique identifier is available
501  * @return #GNUNET_SYSERR to abort the iteration, #GNUNET_OK to continue,
502  *         #GNUNET_NO to delete the item and continue (if supported)
503  */
504 static int
505 transmit_item (void *cls,
506                const struct GNUNET_HashCode *key,
507                uint32_t size,
508                const void *data,
509                enum GNUNET_BLOCK_Type type,
510                uint32_t priority,
511                uint32_t anonymity,
512                struct GNUNET_TIME_Absolute expiration,
513                uint64_t uid)
514 {
515   struct GNUNET_SERVICE_Client *client = cls;
516   struct GNUNET_MQ_Envelope *env;
517   struct GNUNET_MessageHeader *end;
518   struct DataMessage *dm;
519
520   if (NULL == key)
521   {
522     /* transmit 'DATA_END' */
523     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
524                 "Transmitting DATA_END message\n");
525     env = GNUNET_MQ_msg (end,
526                          GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END);
527     GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (client),
528                     env);
529     return GNUNET_OK;
530   }
531   GNUNET_assert (sizeof (struct DataMessage) + size <
532                  GNUNET_MAX_MESSAGE_SIZE);
533   env = GNUNET_MQ_msg_extra (dm,
534                              size,
535                              GNUNET_MESSAGE_TYPE_DATASTORE_DATA);
536   dm->rid = htonl (0);
537   dm->size = htonl (size);
538   dm->type = htonl (type);
539   dm->priority = htonl (priority);
540   dm->anonymity = htonl (anonymity);
541   dm->replication = htonl (0);
542   dm->reserved = htonl (0);
543   dm->expiration = GNUNET_TIME_absolute_hton (expiration);
544   dm->uid = GNUNET_htonll (uid);
545   dm->key = *key;
546   GNUNET_memcpy (&dm[1],
547                  data,
548                  size);
549   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
550               "Transmitting DATA message for `%s' of type %u with expiration %s (in: %s)\n",
551               GNUNET_h2s (key),
552               type,
553               GNUNET_STRINGS_absolute_time_to_string (expiration),
554               GNUNET_STRINGS_relative_time_to_string (GNUNET_TIME_absolute_get_remaining (expiration),
555                                                       GNUNET_YES));
556   GNUNET_STATISTICS_update (stats,
557                             gettext_noop ("# results found"),
558                             1,
559                             GNUNET_NO);
560   GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (client),
561                   env);
562   return GNUNET_OK;
563 }
564
565
566 /**
567  * Handle RESERVE-message.
568  *
569  * @param cls identification of the client
570  * @param message the actual message
571  */
572 static void
573 handle_reserve (void *cls,
574                 const struct ReserveMessage *msg)
575 {
576   /**
577    * Static counter to produce reservation identifiers.
578    */
579   static int reservation_gen;
580   struct GNUNET_SERVICE_Client *client = cls;
581   struct ReservationList *e;
582   unsigned long long used;
583   unsigned long long req;
584   uint64_t amount;
585   uint32_t entries;
586
587   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
588               "Processing RESERVE request\n");
589   amount = GNUNET_ntohll (msg->amount);
590   entries = ntohl (msg->entries);
591   used = payload + reserved;
592   req = amount + ((unsigned long long) GNUNET_DATASTORE_ENTRY_OVERHEAD) * entries;
593   if (used + req > quota)
594   {
595     if (quota < used)
596       used = quota;             /* cheat a bit for error message (to avoid negative numbers) */
597     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
598                 _("Insufficient space (%llu bytes are available) to satisfy RESERVE request for %llu bytes\n"),
599                 quota - used,
600                 req);
601     if (cache_size < req)
602     {
603       /* TODO: document this in the FAQ; essentially, if this
604        * message happens, the insertion request could be blocked
605        * by less-important content from migration because it is
606        * larger than 1/8th of the overall available space, and
607        * we only reserve 1/8th for "fresh" insertions */
608       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
609                   _("The requested amount (%llu bytes) is larger than the cache size (%llu bytes)\n"),
610                   req,
611                   cache_size);
612       transmit_status (client,
613                        0,
614                        gettext_noop
615                        ("Insufficient space to satisfy request and "
616                         "requested amount is larger than cache size"));
617     }
618     else
619     {
620       transmit_status (client,
621                        0,
622                        gettext_noop ("Insufficient space to satisfy request"));
623     }
624     GNUNET_SERVICE_client_continue (client);
625     return;
626   }
627   reserved += req;
628   GNUNET_STATISTICS_set (stats,
629                          gettext_noop ("# reserved"),
630                          reserved,
631                          GNUNET_NO);
632   e = GNUNET_new (struct ReservationList);
633   e->next = reservations;
634   reservations = e;
635   e->client = client;
636   e->amount = amount;
637   e->entries = entries;
638   e->rid = ++reservation_gen;
639   if (reservation_gen < 0)
640     reservation_gen = 0;        /* wrap around */
641   transmit_status (client,
642                    e->rid,
643                    NULL);
644   GNUNET_SERVICE_client_continue (client);
645 }
646
647
648 /**
649  * Handle RELEASE_RESERVE-message.
650  *
651  * @param cls identification of the client
652  * @param message the actual message
653  */
654 static void
655 handle_release_reserve (void *cls,
656                         const struct ReleaseReserveMessage *msg)
657 {
658   struct GNUNET_SERVICE_Client *client = cls;
659   struct ReservationList *pos;
660   struct ReservationList *prev;
661   struct ReservationList *next;
662   int rid = ntohl (msg->rid);
663   unsigned long long rem;
664
665   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
666               "Processing RELEASE_RESERVE request\n");
667   next = reservations;
668   prev = NULL;
669   while (NULL != (pos = next))
670   {
671     next = pos->next;
672     if (rid == pos->rid)
673     {
674       if (prev == NULL)
675         reservations = next;
676       else
677         prev->next = next;
678       rem =
679           pos->amount +
680           ((unsigned long long) GNUNET_DATASTORE_ENTRY_OVERHEAD) * pos->entries;
681       GNUNET_assert (reserved >= rem);
682       reserved -= rem;
683       GNUNET_STATISTICS_set (stats,
684                              gettext_noop ("# reserved"),
685                              reserved,
686                              GNUNET_NO);
687       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
688                   "Returning %llu remaining reserved bytes to storage pool\n",
689                   rem);
690       GNUNET_free (pos);
691       transmit_status (client,
692                        GNUNET_OK,
693                        NULL);
694       GNUNET_SERVICE_client_continue (client);
695       return;
696     }
697     prev = pos;
698   }
699   GNUNET_break (0);
700   transmit_status (client,
701                    GNUNET_SYSERR,
702                    gettext_noop ("Could not find matching reservation"));
703   GNUNET_SERVICE_client_continue (client);
704 }
705
706
707 /**
708  * Check that the given message is a valid data message.
709  *
710  * @param dm message to check
711  * @return #GNUNET_SYSERR is not well-formed, otherwise #GNUNET_OK
712  */
713 static int
714 check_data (const struct DataMessage *dm)
715 {
716   uint16_t size;
717   uint32_t dsize;
718
719   size = ntohs (dm->header.size);
720   dsize = ntohl (dm->size);
721   if (size != dsize + sizeof (struct DataMessage))
722   {
723     GNUNET_break (0);
724     return GNUNET_SYSERR;
725   }
726   return GNUNET_OK;
727 }
728
729
730 /**
731  * Context for a PUT request used to see if the content is
732  * already present.
733  */
734 struct PutContext
735 {
736   /**
737    * Client to notify on completion.
738    */
739   struct GNUNET_SERVICE_Client *client;
740
741 #if ! HAVE_UNALIGNED_64_ACCESS
742   void *reserved;
743 #endif
744
745   /* followed by the 'struct DataMessage' */
746 };
747
748
749 /**
750  * Put continuation.
751  *
752  * @param cls closure
753  * @param key key for the item stored
754  * @param size size of the item stored
755  * @param status #GNUNET_OK or #GNUNET_SYSERROR
756  * @param msg error message on error
757  */
758 static void
759 put_continuation (void *cls,
760                   const struct GNUNET_HashCode *key,
761                   uint32_t size,
762                   int status,
763                   const char *msg)
764 {
765   struct PutContext *pc = cls;
766
767   if (GNUNET_OK == status)
768   {
769     GNUNET_STATISTICS_update (stats,
770                               gettext_noop ("# bytes stored"),
771                               size,
772                               GNUNET_YES);
773     GNUNET_CONTAINER_bloomfilter_add (filter,
774                                       key);
775     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
776                 "Successfully stored %u bytes under key `%s'\n",
777                 size,
778                 GNUNET_h2s (key));
779   }
780   transmit_status (pc->client,
781                    status,
782                    msg);
783   GNUNET_free (pc);
784   if (quota - reserved - cache_size < payload)
785   {
786     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
787                 _("Need %llu bytes more space (%llu allowed, using %llu)\n"),
788                 (unsigned long long) size + GNUNET_DATASTORE_ENTRY_OVERHEAD,
789                 (unsigned long long) (quota - reserved - cache_size),
790                 (unsigned long long) payload);
791     manage_space (size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
792   }
793 }
794
795
796 /**
797  * Actually put the data message.
798  *
799  * @param pc put context
800  */
801 static void
802 execute_put (struct PutContext *pc)
803 {
804   const struct DataMessage *dm;
805
806   dm = (const struct DataMessage *) &pc[1];
807   plugin->api->put (plugin->api->cls,
808                     &dm->key,
809                     ntohl (dm->size),
810                     &dm[1],
811                     ntohl (dm->type),
812                     ntohl (dm->priority),
813                     ntohl (dm->anonymity),
814                     ntohl (dm->replication),
815                     GNUNET_TIME_absolute_ntoh (dm->expiration),
816                     &put_continuation,
817                     pc);
818 }
819
820
821 /**
822  *
823  * @param cls closure
824  * @param status #GNUNET_OK or #GNUNET_SYSERR
825  * @param msg error message on error
826  */
827 static void
828 check_present_continuation (void *cls,
829                             int status,
830                             const char *msg)
831 {
832   struct GNUNET_SERVICE_Client *client = cls;
833
834   transmit_status (client,
835                    GNUNET_NO,
836                    NULL);
837 }
838
839
840 /**
841  * Function that will check if the given datastore entry
842  * matches the put and if none match executes the put.
843  *
844  * @param cls closure, pointer to the client (of type `struct PutContext`).
845  * @param key key for the content
846  * @param size number of bytes in data
847  * @param data content stored
848  * @param type type of the content
849  * @param priority priority of the content
850  * @param anonymity anonymity-level for the content
851  * @param expiration expiration time for the content
852  * @param uid unique identifier for the datum;
853  *        maybe 0 if no unique identifier is available
854  * @return #GNUNET_OK usually
855  *         #GNUNET_NO to delete the item
856  */
857 static int
858 check_present (void *cls,
859                const struct GNUNET_HashCode *key,
860                uint32_t size,
861                const void *data,
862                enum GNUNET_BLOCK_Type type,
863                uint32_t priority,
864                uint32_t anonymity,
865                struct GNUNET_TIME_Absolute expiration,
866                uint64_t uid)
867 {
868   struct PutContext *pc = cls;
869   const struct DataMessage *dm;
870
871   dm = (const struct DataMessage *) &pc[1];
872   if (key == NULL)
873   {
874     execute_put (pc);
875     return GNUNET_OK;
876   }
877   if ( (GNUNET_BLOCK_TYPE_FS_DBLOCK == type) ||
878        (GNUNET_BLOCK_TYPE_FS_IBLOCK == type) ||
879        ( (size == ntohl (dm->size)) &&
880          (0 == memcmp (&dm[1],
881                        data,
882                        size)) ) )
883   {
884     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
885                 "Result already present in datastore\n");
886     /* FIXME: change API to allow increasing 'replication' counter */
887     if ((ntohl (dm->priority) > 0) ||
888         (GNUNET_TIME_absolute_ntoh (dm->expiration).abs_value_us >
889          expiration.abs_value_us))
890       plugin->api->update (plugin->api->cls,
891                            uid,
892                            ntohl (dm->priority),
893                            GNUNET_TIME_absolute_ntoh (dm->expiration),
894                            &check_present_continuation,
895                            pc->client);
896     else
897     {
898       transmit_status (pc->client,
899                        GNUNET_NO,
900                        NULL);
901     }
902     GNUNET_free (pc);
903   }
904   else
905   {
906     execute_put (pc);
907   }
908   return GNUNET_OK;
909 }
910
911
912 /**
913  * Verify PUT-message.
914  *
915  * @param cls identification of the client
916  * @param message the actual message
917  * @return #GNUNET_OK if @a dm is well-formed
918  */
919 static int
920 check_put (void *cls,
921            const struct DataMessage *dm)
922 {
923   if (GNUNET_OK != check_data (dm))
924   {
925     GNUNET_break (0);
926     return GNUNET_SYSERR;
927   }
928   return GNUNET_OK;
929 }
930
931
932 /**
933  * Handle PUT-message.
934  *
935  * @param cls identification of the client
936  * @param message the actual message
937  */
938 static void
939 handle_put (void *cls,
940             const struct DataMessage *dm)
941 {
942   struct GNUNET_SERVICE_Client *client = cls;
943   int rid;
944   struct ReservationList *pos;
945   struct PutContext *pc;
946   struct GNUNET_HashCode vhash;
947   uint32_t size;
948
949   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
950               "Processing PUT request for `%s' of type %u\n",
951               GNUNET_h2s (&dm->key),
952               (uint32_t) ntohl (dm->type));
953   rid = ntohl (dm->rid);
954   size = ntohl (dm->size);
955   if (rid > 0)
956   {
957     pos = reservations;
958     while ((NULL != pos) && (rid != pos->rid))
959       pos = pos->next;
960     GNUNET_break (pos != NULL);
961     if (NULL != pos)
962     {
963       GNUNET_break (pos->entries > 0);
964       GNUNET_break (pos->amount >= size);
965       pos->entries--;
966       pos->amount -= size;
967       reserved -= (size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
968       GNUNET_STATISTICS_set (stats,
969                              gettext_noop ("# reserved"),
970                              reserved,
971                              GNUNET_NO);
972     }
973   }
974   pc = GNUNET_malloc (sizeof (struct PutContext) + size +
975                       sizeof (struct DataMessage));
976   pc->client = client;
977   GNUNET_memcpy (&pc[1],
978                  dm,
979                  size + sizeof (struct DataMessage));
980   if (GNUNET_YES == GNUNET_CONTAINER_bloomfilter_test (filter,
981                                                        &dm->key))
982   {
983     GNUNET_CRYPTO_hash (&dm[1],
984                         size,
985                         &vhash);
986     plugin->api->get_key (plugin->api->cls,
987                           0,
988                           false,
989                           &dm->key,
990                           &vhash,
991                           ntohl (dm->type),
992                           &check_present,
993                           pc);
994     GNUNET_SERVICE_client_continue (client);
995     return;
996   }
997   execute_put (pc);
998   GNUNET_SERVICE_client_continue (client);
999 }
1000
1001
1002 /**
1003  * Handle #GNUNET_MESSAGE_TYPE_DATASTORE_GET-message.
1004  *
1005  * @param cls identification of the client
1006  * @param msg the actual message
1007  */
1008 static void
1009 handle_get (void *cls,
1010             const struct GetMessage *msg)
1011 {
1012   struct GNUNET_SERVICE_Client *client = cls;
1013
1014   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1015               "Processing GET request of type %u\n",
1016               (uint32_t) ntohl (msg->type));
1017   GNUNET_STATISTICS_update (stats,
1018                             gettext_noop ("# GET requests received"),
1019                             1,
1020                             GNUNET_NO);
1021   plugin->api->get_key (plugin->api->cls,
1022                         GNUNET_ntohll (msg->next_uid),
1023                         msg->random,
1024                         NULL,
1025                         NULL,
1026                         ntohl (msg->type),
1027                         &transmit_item,
1028                         client);
1029   GNUNET_SERVICE_client_continue (client);
1030 }
1031
1032
1033 /**
1034  * Handle #GNUNET_MESSAGE_TYPE_DATASTORE_GET_KEY-message.
1035  *
1036  * @param cls closure
1037  * @param msg the actual message
1038  */
1039 static void
1040 handle_get_key (void *cls,
1041                 const struct GetKeyMessage *msg)
1042 {
1043   struct GNUNET_SERVICE_Client *client = cls;
1044
1045   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1046               "Processing GET request for `%s' of type %u\n",
1047               GNUNET_h2s (&msg->key),
1048               (uint32_t) ntohl (msg->type));
1049   GNUNET_STATISTICS_update (stats,
1050                             gettext_noop ("# GET KEY requests received"),
1051                             1,
1052                             GNUNET_NO);
1053   if (GNUNET_YES !=
1054       GNUNET_CONTAINER_bloomfilter_test (filter,
1055                                          &msg->key))
1056   {
1057     /* don't bother database... */
1058     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1059                 "Empty result set for GET request for `%s' (bloomfilter).\n",
1060                 GNUNET_h2s (&msg->key));
1061     GNUNET_STATISTICS_update (stats,
1062                               gettext_noop
1063                               ("# requests filtered by bloomfilter"),
1064                               1,
1065                               GNUNET_NO);
1066     transmit_item (client,
1067                    NULL, 0, NULL, 0, 0, 0,
1068                    GNUNET_TIME_UNIT_ZERO_ABS,
1069                    0);
1070     GNUNET_SERVICE_client_continue (client);
1071     return;
1072   }
1073   plugin->api->get_key (plugin->api->cls,
1074                         GNUNET_ntohll (msg->next_uid),
1075                         msg->random,
1076                         &msg->key,
1077                         NULL,
1078                         ntohl (msg->type),
1079                         &transmit_item,
1080                         client);
1081   GNUNET_SERVICE_client_continue (client);
1082 }
1083
1084
1085 /**
1086  * Handle GET_REPLICATION-message.
1087  *
1088  * @param cls identification of the client
1089  * @param message the actual message
1090  */
1091 static void
1092 handle_get_replication (void *cls,
1093                         const struct GNUNET_MessageHeader *message)
1094 {
1095   struct GNUNET_SERVICE_Client *client = cls;
1096
1097   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1098               "Processing GET_REPLICATION request\n");
1099   GNUNET_STATISTICS_update (stats,
1100                             gettext_noop ("# GET REPLICATION requests received"),
1101                             1,
1102                             GNUNET_NO);
1103   plugin->api->get_replication (plugin->api->cls,
1104                                 &transmit_item,
1105                                 client);
1106   GNUNET_SERVICE_client_continue (client);
1107 }
1108
1109
1110 /**
1111  * Handle GET_ZERO_ANONYMITY-message.
1112  *
1113  * @param cls client identification of the client
1114  * @param message the actual message
1115  */
1116 static void
1117 handle_get_zero_anonymity (void *cls,
1118                            const struct GetZeroAnonymityMessage *msg)
1119 {
1120   struct GNUNET_SERVICE_Client *client = cls;
1121   enum GNUNET_BLOCK_Type type;
1122
1123   type = (enum GNUNET_BLOCK_Type) ntohl (msg->type);
1124   if (type == GNUNET_BLOCK_TYPE_ANY)
1125   {
1126     GNUNET_break (0);
1127     GNUNET_SERVICE_client_drop (client);
1128     return;
1129   }
1130   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1131               "Processing GET_ZERO_ANONYMITY request\n");
1132   GNUNET_STATISTICS_update (stats,
1133                             gettext_noop ("# GET ZERO ANONYMITY requests received"),
1134                             1,
1135                             GNUNET_NO);
1136   plugin->api->get_zero_anonymity (plugin->api->cls,
1137                                    GNUNET_ntohll (msg->next_uid),
1138                                    type,
1139                                    &transmit_item,
1140                                    client);
1141   GNUNET_SERVICE_client_continue (client);
1142 }
1143
1144
1145 /**
1146  * Callback function that will cause the item that is passed
1147  * in to be deleted (by returning #GNUNET_NO).
1148  *
1149  * @param cls closure
1150  * @param key key for the content
1151  * @param size number of bytes in data
1152  * @param data content stored
1153  * @param type type of the content
1154  * @param priority priority of the content
1155  * @param anonymity anonymity-level for the content
1156  * @param expiration expiration time for the content
1157  * @param uid unique identifier for the datum
1158  * @return #GNUNET_OK to keep the item
1159  *         #GNUNET_NO to delete the item
1160  */
1161 static int
1162 remove_callback (void *cls,
1163                  const struct GNUNET_HashCode *key,
1164                  uint32_t size,
1165                  const void *data,
1166                  enum GNUNET_BLOCK_Type type,
1167                  uint32_t priority,
1168                  uint32_t anonymity,
1169                  struct GNUNET_TIME_Absolute expiration,
1170                  uint64_t uid)
1171 {
1172   struct GNUNET_SERVICE_Client *client = cls;
1173
1174   if (NULL == key)
1175   {
1176     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1177                 "No further matches for REMOVE request.\n");
1178     transmit_status (client,
1179                      GNUNET_NO,
1180                      _("Content not found"));
1181     return GNUNET_OK;           /* last item */
1182   }
1183   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1184               "Item %llu matches REMOVE request for key `%s' and type %u.\n",
1185               (unsigned long long) uid,
1186               GNUNET_h2s (key),
1187               type);
1188   GNUNET_STATISTICS_update (stats,
1189                             gettext_noop ("# bytes removed (explicit request)"),
1190                             size,
1191                             GNUNET_YES);
1192   GNUNET_CONTAINER_bloomfilter_remove (filter,
1193                                        key);
1194   transmit_status (client,
1195                    GNUNET_OK,
1196                    NULL);
1197   return GNUNET_NO;
1198 }
1199
1200
1201 /**
1202  * Verify REMOVE-message.
1203  *
1204  * @param cls identification of the client
1205  * @param message the actual message
1206  * @return #GNUNET_OK if @a dm is well-formed
1207  */
1208 static int
1209 check_remove (void *cls,
1210               const struct DataMessage *dm)
1211 {
1212   if (GNUNET_OK != check_data (dm))
1213   {
1214     GNUNET_break (0);
1215     return GNUNET_SYSERR;
1216   }
1217   return GNUNET_OK;
1218 }
1219
1220
1221 /**
1222  * Handle REMOVE-message.
1223  *
1224  * @param cls closure
1225  * @param client identification of the client
1226  * @param message the actual message
1227  */
1228 static void
1229 handle_remove (void *cls,
1230                const struct DataMessage *dm)
1231 {
1232   struct GNUNET_SERVICE_Client *client = cls;
1233   struct GNUNET_HashCode vhash;
1234
1235   GNUNET_STATISTICS_update (stats,
1236                             gettext_noop ("# REMOVE requests received"),
1237                             1, GNUNET_NO);
1238   GNUNET_CRYPTO_hash (&dm[1],
1239                       ntohl (dm->size),
1240                       &vhash);
1241   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1242               "Processing REMOVE request for `%s' of type %u\n",
1243               GNUNET_h2s (&dm->key),
1244               (uint32_t) ntohl (dm->type));
1245   plugin->api->get_key (plugin->api->cls,
1246                         0,
1247                         false,
1248                         &dm->key,
1249                         &vhash,
1250                         (enum GNUNET_BLOCK_Type) ntohl (dm->type),
1251                         &remove_callback,
1252                         client);
1253   GNUNET_SERVICE_client_continue (client);
1254 }
1255
1256
1257 /**
1258  * Handle DROP-message.
1259  *
1260  * @param cls identification of the client
1261  * @param message the actual message
1262  */
1263 static void
1264 handle_drop (void *cls,
1265              const struct GNUNET_MessageHeader *message)
1266 {
1267   struct GNUNET_SERVICE_Client *client = cls;
1268
1269   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1270               "Processing DROP request\n");
1271   do_drop = GNUNET_YES;
1272   GNUNET_SERVICE_client_continue (client);
1273 }
1274
1275
1276 /**
1277  * Function called by plugins to notify us about a
1278  * change in their disk utilization.
1279  *
1280  * @param cls closure (NULL)
1281  * @param delta change in disk utilization,
1282  *        0 for "reset to empty"
1283  */
1284 static void
1285 disk_utilization_change_cb (void *cls,
1286                             int delta)
1287 {
1288   if ((delta < 0) && (payload < -delta))
1289   {
1290     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1291                 _("Datastore payload must have been inaccurate (%lld < %lld). Recomputing it.\n"),
1292                 (long long) payload,
1293                 (long long) -delta);
1294     plugin->api->estimate_size (plugin->api->cls,
1295                                 &payload);
1296     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1297                 _("New payload: %lld\n"),
1298                 (long long) payload);
1299      sync_stats ();
1300     return;
1301   }
1302   payload += delta;
1303   last_sync++;
1304   if (last_sync >= MAX_STAT_SYNC_LAG)
1305     sync_stats ();
1306 }
1307
1308
1309 /**
1310  * Callback function to process statistic values.
1311  *
1312  * @param cls closure (struct Plugin*)
1313  * @param subsystem name of subsystem that created the statistic
1314  * @param name the name of the datum
1315  * @param value the current value
1316  * @param is_persistent #GNUNET_YES if the value is persistent, #GNUNET_NO if not
1317  * @return #GNUNET_OK to continue, #GNUNET_SYSERR to abort iteration
1318  */
1319 static int
1320 process_stat_in (void *cls,
1321                  const char *subsystem,
1322                  const char *name,
1323                  uint64_t value,
1324                  int is_persistent)
1325 {
1326   GNUNET_assert (GNUNET_NO == stats_worked);
1327   stats_worked = GNUNET_YES;
1328   payload += value;
1329   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1330               "Notification from statistics about existing payload (%llu), new payload is %llu\n",
1331               (unsigned long long) value,
1332               (unsigned long long) payload);
1333   return GNUNET_OK;
1334 }
1335
1336
1337 /**
1338  * Load the datastore plugin.
1339  */
1340 static struct DatastorePlugin *
1341 load_plugin ()
1342 {
1343   struct DatastorePlugin *ret;
1344   char *libname;
1345
1346   ret = GNUNET_new (struct DatastorePlugin);
1347   ret->env.cfg = cfg;
1348   ret->env.duc = &disk_utilization_change_cb;
1349   ret->env.cls = NULL;
1350   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1351               _("Loading `%s' datastore plugin\n"),
1352               plugin_name);
1353   GNUNET_asprintf (&libname,
1354                    "libgnunet_plugin_datastore_%s",
1355                    plugin_name);
1356   ret->short_name = GNUNET_strdup (plugin_name);
1357   ret->lib_name = libname;
1358   ret->api = GNUNET_PLUGIN_load (libname,
1359                                  &ret->env);
1360   if (NULL == ret->api)
1361   {
1362     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1363                 _("Failed to load datastore plugin for `%s'\n"),
1364                 plugin_name);
1365     GNUNET_free (ret->short_name);
1366     GNUNET_free (libname);
1367     GNUNET_free (ret);
1368     return NULL;
1369   }
1370   return ret;
1371 }
1372
1373
1374 /**
1375  * Function called when the service shuts
1376  * down.  Unloads our datastore plugin.
1377  *
1378  * @param plug plugin to unload
1379  */
1380 static void
1381 unload_plugin (struct DatastorePlugin *plug)
1382 {
1383   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1384               "Datastore service is unloading plugin...\n");
1385   GNUNET_break (NULL == GNUNET_PLUGIN_unload (plug->lib_name, plug->api));
1386   GNUNET_free (plug->lib_name);
1387   GNUNET_free (plug->short_name);
1388   GNUNET_free (plug);
1389 }
1390
1391
1392 /**
1393  * Initialization complete, start operating the service.
1394  */
1395 static void
1396 begin_service ()
1397 {
1398   GNUNET_SERVICE_resume (service);
1399   expired_kill_task
1400     = GNUNET_SCHEDULER_add_with_priority (GNUNET_SCHEDULER_PRIORITY_IDLE,
1401                                           &delete_expired,
1402                                           NULL);
1403 }
1404
1405
1406 /**
1407  * Adds a given @a key to the bloomfilter in @a cls @a count times.
1408  *
1409  * @param cls the bloomfilter
1410  * @param key key to add
1411  * @param count number of times to add key
1412  */
1413 static void
1414 add_key_to_bloomfilter (void *cls,
1415                         const struct GNUNET_HashCode *key,
1416                         unsigned int count)
1417 {
1418   struct GNUNET_CONTAINER_BloomFilter *bf = cls;
1419
1420   if (NULL == key)
1421   {
1422     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1423                 _("Bloomfilter construction complete.\n"));
1424     begin_service ();
1425     return;
1426   }
1427
1428   while (0 < count--)
1429     GNUNET_CONTAINER_bloomfilter_add (bf,
1430                                       key);
1431 }
1432
1433
1434 /**
1435  * We finished receiving the statistic.  Initialize the plugin; if
1436  * loading the statistic failed, run the estimator.
1437  *
1438  * @param cls NULL
1439  * @param success #GNUNET_NO if we failed to read the stat
1440  */
1441 static void
1442 process_stat_done (void *cls,
1443                    int success)
1444 {
1445   stat_get = NULL;
1446   if (NULL != stat_timeout_task)
1447   {
1448     GNUNET_SCHEDULER_cancel (stat_timeout_task);
1449     stat_timeout_task = NULL;
1450   }
1451   plugin = load_plugin ();
1452   if (NULL == plugin)
1453   {
1454     GNUNET_CONTAINER_bloomfilter_free (filter);
1455     filter = NULL;
1456     if (NULL != stats)
1457     {
1458       GNUNET_STATISTICS_destroy (stats,
1459                                  GNUNET_YES);
1460       stats = NULL;
1461     }
1462     return;
1463   }
1464
1465   if (GNUNET_NO == stats_worked)
1466   {
1467     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1468                 "Failed to obtain value from statistics service, recomputing it\n");
1469     plugin->api->estimate_size (plugin->api->cls,
1470                                 &payload);
1471     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1472                 _("New payload: %lld\n"),
1473                 (long long) payload);
1474   }
1475
1476   if (GNUNET_YES == refresh_bf)
1477   {
1478     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1479                 _("Rebuilding bloomfilter.  Please be patient.\n"));
1480     if (NULL != plugin->api->get_keys)
1481     {
1482       plugin->api->get_keys (plugin->api->cls,
1483                              &add_key_to_bloomfilter,
1484                              filter);
1485       return;
1486     }
1487     else
1488     {
1489       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1490                   _("Plugin does not support get_keys function. Please fix!\n"));
1491     }
1492   }
1493   begin_service ();
1494 }
1495
1496
1497 /**
1498  * Fetching stats took to long, run without.
1499  *
1500  * @param cls NULL
1501  */
1502 static void
1503 stat_timeout (void *cls)
1504 {
1505   stat_timeout_task = NULL;
1506   GNUNET_STATISTICS_get_cancel (stat_get);
1507   process_stat_done (NULL,
1508                      GNUNET_NO);
1509 }
1510
1511
1512 /**
1513  * Task run during shutdown.
1514  */
1515 static void
1516 cleaning_task (void *cls)
1517 {
1518   cleaning_done = GNUNET_YES;
1519   if (NULL != expired_kill_task)
1520   {
1521     GNUNET_SCHEDULER_cancel (expired_kill_task);
1522     expired_kill_task = NULL;
1523   }
1524   if (GNUNET_YES == do_drop)
1525   {
1526     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1527                 "Dropping database!\n");
1528     plugin->api->drop (plugin->api->cls);
1529     payload = 0;
1530     last_sync++;
1531   }
1532   if (NULL != plugin)
1533   {
1534     unload_plugin (plugin);
1535     plugin = NULL;
1536   }
1537   if (NULL != filter)
1538   {
1539     GNUNET_CONTAINER_bloomfilter_free (filter);
1540     filter = NULL;
1541   }
1542   if (NULL != stat_get)
1543   {
1544     GNUNET_STATISTICS_get_cancel (stat_get);
1545     stat_get = NULL;
1546   }
1547   if (NULL != stat_timeout_task)
1548   {
1549     GNUNET_SCHEDULER_cancel (stat_timeout_task);
1550     stat_timeout_task = NULL;
1551   }
1552   GNUNET_free_non_null (plugin_name);
1553   plugin_name = NULL;
1554   if (last_sync > 0)
1555     sync_stats ();
1556   if (NULL != stats)
1557   {
1558     GNUNET_STATISTICS_destroy (stats,
1559                                GNUNET_YES);
1560     stats = NULL;
1561   }
1562   GNUNET_free (quota_stat_name);
1563   quota_stat_name = NULL;
1564 }
1565
1566
1567 /**
1568  * Add a client to our list of active clients.
1569  *
1570  * @param cls NULL
1571  * @param client client to add
1572  * @param mq message queue for @a client
1573  * @return @a client
1574  */
1575 static void *
1576 client_connect_cb (void *cls,
1577                    struct GNUNET_SERVICE_Client *client,
1578                    struct GNUNET_MQ_Handle *mq)
1579 {
1580   return client;
1581 }
1582
1583
1584 /**
1585  * Called whenever a client is disconnected.
1586  * Frees our resources associated with that client.
1587  *
1588  * @param cls closure
1589  * @param client identification of the client
1590  * @param app_ctx must match @a client
1591  */
1592 static void
1593 client_disconnect_cb (void *cls,
1594                       struct GNUNET_SERVICE_Client *client,
1595                       void *app_ctx)
1596 {
1597   struct ReservationList *pos;
1598   struct ReservationList *prev;
1599   struct ReservationList *next;
1600
1601   GNUNET_assert (app_ctx == client);
1602   prev = NULL;
1603   pos = reservations;
1604   while (NULL != pos)
1605   {
1606     next = pos->next;
1607     if (pos->client == client)
1608     {
1609       if (NULL == prev)
1610         reservations = next;
1611       else
1612         prev->next = next;
1613       reserved -= pos->amount + pos->entries * GNUNET_DATASTORE_ENTRY_OVERHEAD;
1614       GNUNET_free (pos);
1615     }
1616     else
1617     {
1618       prev = pos;
1619     }
1620     pos = next;
1621   }
1622   GNUNET_STATISTICS_set (stats,
1623                          gettext_noop ("# reserved"),
1624                          reserved,
1625                          GNUNET_NO);
1626
1627 }
1628
1629
1630 /**
1631  * Process datastore requests.
1632  *
1633  * @param cls closure
1634  * @param serv the initialized service
1635  * @param c configuration to use
1636  */
1637 static void
1638 run (void *cls,
1639      const struct GNUNET_CONFIGURATION_Handle *c,
1640      struct GNUNET_SERVICE_Handle *serv)
1641 {
1642   char *fn;
1643   char *pfn;
1644   unsigned int bf_size;
1645
1646   service = serv;
1647   cfg = c;
1648   if (GNUNET_OK !=
1649       GNUNET_CONFIGURATION_get_value_string (cfg,
1650                                              "DATASTORE",
1651                                              "DATABASE",
1652                                              &plugin_name))
1653   {
1654     GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
1655                                "DATABASE",
1656                                "DATASTORE");
1657     return;
1658   }
1659   GNUNET_asprintf (&quota_stat_name,
1660                    _("# bytes used in file-sharing datastore `%s'"),
1661                    plugin_name);
1662   if (GNUNET_OK !=
1663       GNUNET_CONFIGURATION_get_value_size (cfg,
1664                                            "DATASTORE",
1665                                            "QUOTA",
1666                                            &quota))
1667   {
1668     GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
1669                                "QUOTA",
1670                                "DATASTORE");
1671     return;
1672   }
1673   stats = GNUNET_STATISTICS_create ("datastore",
1674                                     cfg);
1675   GNUNET_STATISTICS_set (stats,
1676                          gettext_noop ("# quota"),
1677                          quota,
1678                          GNUNET_NO);
1679   cache_size = quota / 8;       /* Or should we make this an option? */
1680   GNUNET_STATISTICS_set (stats,
1681                          gettext_noop ("# cache size"),
1682                          cache_size,
1683                          GNUNET_NO);
1684   if (quota / (32 * 1024LL) > MAX_BF_SIZE)
1685     bf_size = MAX_BF_SIZE;
1686   else
1687     bf_size = quota / (32 * 1024LL);         /* 8 bit per entry, 1 bit per 32 kb in DB */
1688   fn = NULL;
1689   if ((GNUNET_OK !=
1690        GNUNET_CONFIGURATION_get_value_filename (cfg,
1691                                                 "DATASTORE",
1692                                                 "BLOOMFILTER",
1693                                                 &fn)) ||
1694       (GNUNET_OK != GNUNET_DISK_directory_create_for_file (fn)))
1695   {
1696     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1697                 _("Could not use specified filename `%s' for bloomfilter.\n"),
1698                 NULL != fn ? fn : "");
1699     GNUNET_free_non_null (fn);
1700     fn = NULL;
1701   }
1702   if (NULL != fn)
1703   {
1704     GNUNET_asprintf (&pfn, "%s.%s", fn, plugin_name);
1705     if (GNUNET_YES == GNUNET_DISK_file_test (pfn))
1706     {
1707       filter = GNUNET_CONTAINER_bloomfilter_load (pfn, bf_size, 5);        /* approx. 3% false positives at max use */
1708       if (NULL == filter)
1709       {
1710         /* file exists but not valid, remove and try again, but refresh */
1711         if (0 != UNLINK (pfn))
1712         {
1713           /* failed to remove, run without file */
1714           GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1715                       _("Failed to remove bogus bloomfilter file `%s'\n"),
1716                       pfn);
1717           GNUNET_free (pfn);
1718           pfn = NULL;
1719           filter = GNUNET_CONTAINER_bloomfilter_load (NULL, bf_size, 5);        /* approx. 3% false positives at max use */
1720           refresh_bf = GNUNET_YES;
1721         }
1722         else
1723         {
1724           /* try again after remove */
1725           filter = GNUNET_CONTAINER_bloomfilter_load (pfn, bf_size, 5);        /* approx. 3% false positives at max use */
1726           refresh_bf = GNUNET_YES;
1727           if (NULL == filter)
1728           {
1729             /* failed yet again, give up on using file */
1730             GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1731                         _("Failed to remove bogus bloomfilter file `%s'\n"),
1732                         pfn);
1733             GNUNET_free (pfn);
1734             pfn = NULL;
1735             filter = GNUNET_CONTAINER_bloomfilter_init (NULL, bf_size, 5);        /* approx. 3% false positives at max use */
1736           }
1737         }
1738       }
1739       else
1740       {
1741         /* normal case: have an existing valid bf file, no need to refresh */
1742         refresh_bf = GNUNET_NO;
1743       }
1744     }
1745     else
1746     {
1747       filter = GNUNET_CONTAINER_bloomfilter_load (pfn, bf_size, 5);        /* approx. 3% false positives at max use */
1748       refresh_bf = GNUNET_YES;
1749     }
1750     GNUNET_free (pfn);
1751   }
1752   else
1753   {
1754     filter = GNUNET_CONTAINER_bloomfilter_init (NULL,
1755                                                 bf_size,
1756                                                 5);      /* approx. 3% false positives at max use */
1757     refresh_bf = GNUNET_YES;
1758   }
1759   GNUNET_free_non_null (fn);
1760   if (NULL == filter)
1761   {
1762     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1763                 _("Failed to initialize bloomfilter.\n"));
1764     if (NULL != stats)
1765     {
1766       GNUNET_STATISTICS_destroy (stats,
1767                                  GNUNET_YES);
1768       stats = NULL;
1769     }
1770     return;
1771   }
1772   GNUNET_SERVICE_suspend (service);
1773   stat_get =
1774       GNUNET_STATISTICS_get (stats,
1775                              "datastore",
1776                              quota_stat_name,
1777                              &process_stat_done,
1778                              &process_stat_in,
1779                              NULL);
1780   if (NULL == stat_get)
1781     process_stat_done (NULL,
1782                        GNUNET_SYSERR);
1783   else
1784     stat_timeout_task
1785       = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS,
1786                                       &stat_timeout,
1787                                       NULL);
1788   GNUNET_SCHEDULER_add_shutdown (&cleaning_task,
1789                                  NULL);
1790 }
1791
1792
1793 /**
1794  * Define "main" method using service macro.
1795  */
1796 GNUNET_SERVICE_MAIN
1797 ("datastore",
1798  GNUNET_SERVICE_OPTION_NONE,
1799  &run,
1800  &client_connect_cb,
1801  &client_disconnect_cb,
1802  NULL,
1803  GNUNET_MQ_hd_fixed_size (reserve,
1804                           GNUNET_MESSAGE_TYPE_DATASTORE_RESERVE,
1805                           struct ReserveMessage,
1806                           NULL),
1807  GNUNET_MQ_hd_fixed_size (release_reserve,
1808                           GNUNET_MESSAGE_TYPE_DATASTORE_RELEASE_RESERVE,
1809                           struct ReleaseReserveMessage,
1810                           NULL),
1811  GNUNET_MQ_hd_var_size (put,
1812                         GNUNET_MESSAGE_TYPE_DATASTORE_PUT,
1813                         struct DataMessage,
1814                         NULL),
1815  GNUNET_MQ_hd_fixed_size (get,
1816                           GNUNET_MESSAGE_TYPE_DATASTORE_GET,
1817                           struct GetMessage,
1818                           NULL),
1819  GNUNET_MQ_hd_fixed_size (get_key,
1820                           GNUNET_MESSAGE_TYPE_DATASTORE_GET_KEY,
1821                           struct GetKeyMessage,
1822                           NULL),
1823  GNUNET_MQ_hd_fixed_size (get_replication,
1824                           GNUNET_MESSAGE_TYPE_DATASTORE_GET_REPLICATION,
1825                           struct GNUNET_MessageHeader,
1826                           NULL),
1827  GNUNET_MQ_hd_fixed_size (get_zero_anonymity,
1828                           GNUNET_MESSAGE_TYPE_DATASTORE_GET_ZERO_ANONYMITY,
1829                           struct GetZeroAnonymityMessage,
1830                           NULL),
1831  GNUNET_MQ_hd_var_size (remove,
1832                         GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE,
1833                         struct DataMessage,
1834                         NULL),
1835  GNUNET_MQ_hd_fixed_size (drop,
1836                           GNUNET_MESSAGE_TYPE_DATASTORE_DROP,
1837                           struct GNUNET_MessageHeader,
1838                           NULL),
1839  GNUNET_MQ_handler_end ());
1840
1841
1842 /* end of gnunet-service-datastore.c */