use NULL value in load_path_suffix to NOT load any files
[oweals/gnunet.git] / src / datastore / gnunet-service-datastore.c
1 /*
2      This file is part of GNUnet
3      Copyright (C) 2004-2014, 2016 GNUnet e.V.
4
5      GNUnet is free software: you can redistribute it and/or modify it
6      under the terms of the GNU Affero General Public License as published
7      by the Free Software Foundation, either version 3 of the License,
8      or (at your option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      Affero General Public License for more details.
14
15      You should have received a copy of the GNU Affero General Public License
16      along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18      SPDX-License-Identifier: AGPL3.0-or-later
19  */
20
21 /**
22  * @file datastore/gnunet-service-datastore.c
23  * @brief Management for the datastore for files stored on a GNUnet node
24  * @author Christian Grothoff
25  */
26
27 #include "platform.h"
28 #include "gnunet_util_lib.h"
29 #include "gnunet_protocols.h"
30 #include "gnunet_statistics_service.h"
31 #include "gnunet_datastore_plugin.h"
32 #include "datastore.h"
33
34 /**
35  * How many messages do we queue at most per client?
36  */
37 #define MAX_PENDING 1024
38
39 /**
40  * Limit size of bloom filter to 2 GB.
41  */
42 #define MAX_BF_SIZE ((uint32_t) (1LL << 31))
43
44 /**
45  * How long are we at most keeping "expired" content
46  * past the expiration date in the database?
47  */
48 #define MAX_EXPIRE_DELAY \
49   GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 15)
50
51 /**
52  * How fast are we allowed to query the database for deleting
53  * expired content? (1 item per second).
54  */
55 #define MIN_EXPIRE_DELAY \
56   GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 1)
57
58 /**
59  * Name under which we store current space consumption.
60  */
61 static char *quota_stat_name;
62
63 /**
64  * Task to timeout stat GET.
65  */
66 static struct GNUNET_SCHEDULER_Task *stat_timeout_task;
67
68 /**
69  * After how many payload-changing operations
70  * do we sync our statistics?
71  */
72 #define MAX_STAT_SYNC_LAG 50
73
74
75 /**
76  * Our datastore plugin.
77  */
78 struct DatastorePlugin
79 {
80   /**
81    * API of the transport as returned by the plugin's
82    * initialization function.
83    */
84   struct GNUNET_DATASTORE_PluginFunctions *api;
85
86   /**
87    * Short name for the plugin (i.e. "sqlite").
88    */
89   char *short_name;
90
91   /**
92    * Name of the library (i.e. "gnunet_plugin_datastore_sqlite").
93    */
94   char *lib_name;
95
96   /**
97    * Environment this transport service is using
98    * for this plugin.
99    */
100   struct GNUNET_DATASTORE_PluginEnvironment env;
101 };
102
103
104 /**
105  * Linked list of active reservations.
106  */
107 struct ReservationList
108 {
109   /**
110    * This is a linked list.
111    */
112   struct ReservationList *next;
113
114   /**
115    * Client that made the reservation.
116    */
117   struct GNUNET_SERVICE_Client *client;
118
119   /**
120    * Number of bytes (still) reserved.
121    */
122   uint64_t amount;
123
124   /**
125    * Number of items (still) reserved.
126    */
127   uint64_t entries;
128
129   /**
130    * Reservation identifier.
131    */
132   int32_t rid;
133 };
134
135
136 /**
137  * Our datastore plugin (NULL if not available).
138  */
139 static struct DatastorePlugin *plugin;
140
141 /**
142  * Linked list of space reservations made by clients.
143  */
144 static struct ReservationList *reservations;
145
146 /**
147  * Bloomfilter to quickly tell if we don't have the content.
148  */
149 static struct GNUNET_CONTAINER_BloomFilter *filter;
150
151 /**
152  * Name of our plugin.
153  */
154 static char *plugin_name;
155
156 /**
157  * Our configuration.
158  */
159 static const struct GNUNET_CONFIGURATION_Handle *cfg;
160
161 /**
162  * Handle for reporting statistics.
163  */
164 static struct GNUNET_STATISTICS_Handle *stats;
165
166 /**
167  * How much space are we using for the cache?  (space available for
168  * insertions that will be instantly reclaimed by discarding less
169  * important content --- or possibly whatever we just inserted into
170  * the "cache").
171  */
172 static unsigned long long cache_size;
173
174 /**
175  * How much space have we currently reserved?
176  */
177 static unsigned long long reserved;
178
179 /**
180  * How much data are we currently storing
181  * in the database?
182  */
183 static unsigned long long payload;
184
185 /**
186  * Identity of the task that is used to delete
187  * expired content.
188  */
189 static struct GNUNET_SCHEDULER_Task *expired_kill_task;
190
191 /**
192  * Minimum time that content should have to not be discarded instantly
193  * (time stamp of any content that we've been discarding recently to
194  * stay below the quota).  FOREVER if we had to expire content with
195  * non-zero priority.
196  */
197 static struct GNUNET_TIME_Absolute min_expiration;
198
199 /**
200  * How much space are we allowed to use?
201  */
202 static unsigned long long quota;
203
204 /**
205  * Should the database be dropped on exit?
206  */
207 static int do_drop;
208
209 /**
210  * Should we refresh the BF when the DB is loaded?
211  */
212 static int refresh_bf;
213
214 /**
215  * Number of updates that were made to the
216  * payload value since we last synchronized
217  * it with the statistics service.
218  */
219 static unsigned int last_sync;
220
221 /**
222  * Did we get an answer from statistics?
223  */
224 static int stats_worked;
225
226
227 /**
228  * Synchronize our utilization statistics with the
229  * statistics service.
230  */
231 static void
232 sync_stats ()
233 {
234   GNUNET_STATISTICS_set (stats, quota_stat_name, payload, GNUNET_YES);
235   GNUNET_STATISTICS_set (stats,
236                          "# utilization by current datastore",
237                          payload,
238                          GNUNET_NO);
239   last_sync = 0;
240 }
241
242
243 /**
244  * Have we already cleaned up the TCCs and are hence no longer
245  * willing (or able) to transmit anything to anyone?
246  */
247 static int cleaning_done;
248
249 /**
250  * Handle for pending get request.
251  */
252 static struct GNUNET_STATISTICS_GetHandle *stat_get;
253
254 /**
255  * Handle to our server.
256  */
257 static struct GNUNET_SERVICE_Handle *service;
258
259 /**
260  * Task that is used to remove expired entries from
261  * the datastore.  This task will schedule itself
262  * again automatically to always delete all expired
263  * content quickly.
264  *
265  * @param cls not used
266  */
267 static void
268 delete_expired (void *cls);
269
270
271 /**
272  * Iterate over the expired items stored in the datastore.
273  * Delete all expired items; once we have processed all
274  * expired items, re-schedule the "delete_expired" task.
275  *
276  * @param cls not used
277  * @param key key for the content
278  * @param size number of bytes in data
279  * @param data content stored
280  * @param type type of the content
281  * @param priority priority of the content
282  * @param anonymity anonymity-level for the content
283  * @param replication replication-level for the content
284  * @param expiration expiration time for the content
285  * @param uid unique identifier for the datum;
286  *        maybe 0 if no unique identifier is available
287  *
288  * @return #GNUNET_SYSERR to abort the iteration, #GNUNET_OK to continue
289  *         (continue on call to "next", of course),
290  *         #GNUNET_NO to delete the item and continue (if supported)
291  */
292 static int
293 expired_processor (void *cls,
294                    const struct GNUNET_HashCode *key,
295                    uint32_t size,
296                    const void *data,
297                    enum GNUNET_BLOCK_Type type,
298                    uint32_t priority,
299                    uint32_t anonymity,
300                    uint32_t replication,
301                    struct GNUNET_TIME_Absolute expiration,
302                    uint64_t uid)
303 {
304   struct GNUNET_TIME_Absolute now;
305
306   if (NULL == key)
307   {
308     expired_kill_task =
309       GNUNET_SCHEDULER_add_delayed_with_priority (MAX_EXPIRE_DELAY,
310                                                   GNUNET_SCHEDULER_PRIORITY_IDLE,
311                                                   &delete_expired,
312                                                   NULL);
313     return GNUNET_SYSERR;
314   }
315   now = GNUNET_TIME_absolute_get ();
316   if (expiration.abs_value_us > now.abs_value_us)
317   {
318     /* finished processing */
319     expired_kill_task =
320       GNUNET_SCHEDULER_add_delayed_with_priority (MAX_EXPIRE_DELAY,
321                                                   GNUNET_SCHEDULER_PRIORITY_IDLE,
322                                                   &delete_expired,
323                                                   NULL);
324     return GNUNET_SYSERR;
325   }
326   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
327               "Deleting content `%s' of type %u that expired %s ago\n",
328               GNUNET_h2s (key),
329               type,
330               GNUNET_STRINGS_relative_time_to_string (
331                 GNUNET_TIME_absolute_get_difference (expiration, now),
332                 GNUNET_YES));
333   min_expiration = now;
334   GNUNET_STATISTICS_update (stats,
335                             gettext_noop ("# bytes expired"),
336                             size,
337                             GNUNET_YES);
338   GNUNET_CONTAINER_bloomfilter_remove (filter, key);
339   expired_kill_task =
340     GNUNET_SCHEDULER_add_delayed_with_priority (MIN_EXPIRE_DELAY,
341                                                 GNUNET_SCHEDULER_PRIORITY_IDLE,
342                                                 &delete_expired,
343                                                 NULL);
344   return GNUNET_NO;
345 }
346
347
348 /**
349  * Task that is used to remove expired entries from
350  * the datastore.  This task will schedule itself
351  * again automatically to always delete all expired
352  * content quickly.
353  *
354  * @param cls not used
355  */
356 static void
357 delete_expired (void *cls)
358 {
359   expired_kill_task = NULL;
360   plugin->api->get_expiration (plugin->api->cls, &expired_processor, NULL);
361 }
362
363
364 /**
365  * An iterator over a set of items stored in the datastore
366  * that deletes until we're happy with respect to our quota.
367  *
368  * @param cls closure
369  * @param key key for the content
370  * @param size number of bytes in data
371  * @param data content stored
372  * @param type type of the content
373  * @param priority priority of the content
374  * @param anonymity anonymity-level for the content
375  * @param replication replication-level for the content
376  * @param expiration expiration time for the content
377  * @param uid unique identifier for the datum;
378  *        maybe 0 if no unique identifier is available
379  * @return #GNUNET_SYSERR to abort the iteration, #GNUNET_OK to continue
380  *         (continue on call to "next", of course),
381  *         #GNUNET_NO to delete the item and continue (if supported)
382  */
383 static int
384 quota_processor (void *cls,
385                  const struct GNUNET_HashCode *key,
386                  uint32_t size,
387                  const void *data,
388                  enum GNUNET_BLOCK_Type type,
389                  uint32_t priority,
390                  uint32_t anonymity,
391                  uint32_t replication,
392                  struct GNUNET_TIME_Absolute expiration,
393                  uint64_t uid)
394 {
395   unsigned long long *need = cls;
396
397   if (NULL == key)
398     return GNUNET_SYSERR;
399   GNUNET_log (
400     GNUNET_ERROR_TYPE_DEBUG,
401     "Deleting %llu bytes of low-priority (%u) content `%s' of type %u at %s prior to expiration (still trying to free another %llu bytes)\n",
402     (unsigned long long) (size + GNUNET_DATASTORE_ENTRY_OVERHEAD),
403     (unsigned int) priority,
404     GNUNET_h2s (key),
405     type,
406     GNUNET_STRINGS_relative_time_to_string (GNUNET_TIME_absolute_get_remaining (
407                                               expiration),
408                                             GNUNET_YES),
409     *need);
410   if (size + GNUNET_DATASTORE_ENTRY_OVERHEAD > *need)
411     *need = 0;
412   else
413     *need -= size + GNUNET_DATASTORE_ENTRY_OVERHEAD;
414   if (priority > 0)
415     min_expiration = GNUNET_TIME_UNIT_FOREVER_ABS;
416   else
417     min_expiration = expiration;
418   GNUNET_STATISTICS_update (stats,
419                             gettext_noop ("# bytes purged (low-priority)"),
420                             size,
421                             GNUNET_YES);
422   GNUNET_CONTAINER_bloomfilter_remove (filter, key);
423   return GNUNET_NO;
424 }
425
426
427 /**
428  * Manage available disk space by running tasks
429  * that will discard content if necessary.  This
430  * function will be run whenever a request for
431  * "need" bytes of storage could only be satisfied
432  * by eating into the "cache" (and we want our cache
433  * space back).
434  *
435  * @param need number of bytes of content that were
436  *        placed into the "cache" (and hence the
437  *        number of bytes that should be removed).
438  */
439 static void
440 manage_space (unsigned long long need)
441 {
442   unsigned long long last;
443
444   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
445               "Asked to free up %llu bytes of cache space\n",
446               need);
447   last = 0;
448   while ((need > 0) && (last != need))
449   {
450     last = need;
451     plugin->api->get_expiration (plugin->api->cls, &quota_processor, &need);
452   }
453 }
454
455
456 /**
457  * Transmit a status code to the client.
458  *
459  * @param client receiver of the response
460  * @param code status code
461  * @param msg optional error message (can be NULL)
462  */
463 static void
464 transmit_status (struct GNUNET_SERVICE_Client *client,
465                  int code,
466                  const char *msg)
467 {
468   struct GNUNET_MQ_Envelope *env;
469   struct StatusMessage *sm;
470   size_t slen;
471
472   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
473               "Transmitting `%s' message with value %d and message `%s'\n",
474               "STATUS",
475               code,
476               msg != NULL ? msg : "(none)");
477   slen = (msg == NULL) ? 0 : strlen (msg) + 1;
478   env = GNUNET_MQ_msg_extra (sm, slen, GNUNET_MESSAGE_TYPE_DATASTORE_STATUS);
479   sm->status = htonl (code);
480   sm->min_expiration = GNUNET_TIME_absolute_hton (min_expiration);
481   GNUNET_memcpy (&sm[1], msg, slen);
482   GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (client), env);
483 }
484
485
486 /**
487  * Function that will transmit the given datastore entry
488  * to the client.
489  *
490  * @param cls closure, pointer to the client (of type `struct GNUNET_SERVICE_Client`).
491  * @param key key for the content
492  * @param size number of bytes in data
493  * @param data content stored
494  * @param type type of the content
495  * @param priority priority of the content
496  * @param anonymity anonymity-level for the content
497  * @param replication replication-level for the content
498  * @param expiration expiration time for the content
499  * @param uid unique identifier for the datum;
500  *        maybe 0 if no unique identifier is available
501  * @return #GNUNET_SYSERR to abort the iteration, #GNUNET_OK to continue,
502  *         #GNUNET_NO to delete the item and continue (if supported)
503  */
504 static int
505 transmit_item (void *cls,
506                const struct GNUNET_HashCode *key,
507                uint32_t size,
508                const void *data,
509                enum GNUNET_BLOCK_Type type,
510                uint32_t priority,
511                uint32_t anonymity,
512                uint32_t replication,
513                struct GNUNET_TIME_Absolute expiration,
514                uint64_t uid)
515 {
516   struct GNUNET_SERVICE_Client *client = cls;
517   struct GNUNET_MQ_Envelope *env;
518   struct GNUNET_MessageHeader *end;
519   struct DataMessage *dm;
520
521   if (NULL == key)
522   {
523     /* transmit 'DATA_END' */
524     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Transmitting DATA_END message\n");
525     env = GNUNET_MQ_msg (end, GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END);
526     GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (client), env);
527     return GNUNET_OK;
528   }
529   GNUNET_assert (sizeof(struct DataMessage) + size < GNUNET_MAX_MESSAGE_SIZE);
530   env = GNUNET_MQ_msg_extra (dm, size, GNUNET_MESSAGE_TYPE_DATASTORE_DATA);
531   dm->rid = htonl (0);
532   dm->size = htonl (size);
533   dm->type = htonl (type);
534   dm->priority = htonl (priority);
535   dm->anonymity = htonl (anonymity);
536   dm->replication = htonl (replication);
537   dm->expiration = GNUNET_TIME_absolute_hton (expiration);
538   dm->uid = GNUNET_htonll (uid);
539   dm->key = *key;
540   GNUNET_memcpy (&dm[1], data, size);
541   GNUNET_log (
542     GNUNET_ERROR_TYPE_DEBUG,
543     "Transmitting DATA message for `%s' of type %u with expiration %s (in: %s)\n",
544     GNUNET_h2s (key),
545     type,
546     GNUNET_STRINGS_absolute_time_to_string (expiration),
547     GNUNET_STRINGS_relative_time_to_string (GNUNET_TIME_absolute_get_remaining (
548                                               expiration),
549                                             GNUNET_YES));
550   GNUNET_STATISTICS_update (stats,
551                             gettext_noop ("# results found"),
552                             1,
553                             GNUNET_NO);
554   GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (client), env);
555   return GNUNET_OK;
556 }
557
558
559 /**
560  * Handle RESERVE-message.
561  *
562  * @param cls identification of the client
563  * @param message the actual message
564  */
565 static void
566 handle_reserve (void *cls, const struct ReserveMessage *msg)
567 {
568   /**
569    * Static counter to produce reservation identifiers.
570    */
571   static int reservation_gen;
572   struct GNUNET_SERVICE_Client *client = cls;
573   struct ReservationList *e;
574   unsigned long long used;
575   unsigned long long req;
576   uint64_t amount;
577   uint32_t entries;
578
579   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Processing RESERVE request\n");
580   amount = GNUNET_ntohll (msg->amount);
581   entries = ntohl (msg->entries);
582   used = payload + reserved;
583   req =
584     amount + ((unsigned long long) GNUNET_DATASTORE_ENTRY_OVERHEAD) * entries;
585   if (used + req > quota)
586   {
587     if (quota < used)
588       used =
589         quota;   /* cheat a bit for error message (to avoid negative numbers) */
590     GNUNET_log (
591       GNUNET_ERROR_TYPE_WARNING,
592       _ (
593         "Insufficient space (%llu bytes are available) to satisfy RESERVE request for %llu bytes\n"),
594       quota - used,
595       req);
596     if (cache_size < req)
597     {
598       /* TODO: document this in the FAQ; essentially, if this
599        * message happens, the insertion request could be blocked
600        * by less-important content from migration because it is
601        * larger than 1/8th of the overall available space, and
602        * we only reserve 1/8th for "fresh" insertions */GNUNET_log (
603         GNUNET_ERROR_TYPE_WARNING,
604         _ (
605           "The requested amount (%llu bytes) is larger than the cache size (%llu bytes)\n"),
606         req,
607         cache_size);
608       transmit_status (client,
609                        0,
610                        gettext_noop (
611                          "Insufficient space to satisfy request and "
612                          "requested amount is larger than cache size"));
613     }
614     else
615     {
616       transmit_status (client,
617                        0,
618                        gettext_noop ("Insufficient space to satisfy request"));
619     }
620     GNUNET_SERVICE_client_continue (client);
621     return;
622   }
623   reserved += req;
624   GNUNET_STATISTICS_set (stats,
625                          gettext_noop ("# reserved"),
626                          reserved,
627                          GNUNET_NO);
628   e = GNUNET_new (struct ReservationList);
629   e->next = reservations;
630   reservations = e;
631   e->client = client;
632   e->amount = amount;
633   e->entries = entries;
634   e->rid = ++reservation_gen;
635   if (reservation_gen < 0)
636     reservation_gen = 0; /* wrap around */
637   transmit_status (client, e->rid, NULL);
638   GNUNET_SERVICE_client_continue (client);
639 }
640
641
642 /**
643  * Handle RELEASE_RESERVE-message.
644  *
645  * @param cls identification of the client
646  * @param message the actual message
647  */
648 static void
649 handle_release_reserve (void *cls, const struct ReleaseReserveMessage *msg)
650 {
651   struct GNUNET_SERVICE_Client *client = cls;
652   struct ReservationList *pos;
653   struct ReservationList *prev;
654   struct ReservationList *next;
655   int rid = ntohl (msg->rid);
656   unsigned long long rem;
657
658   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Processing RELEASE_RESERVE request\n");
659   next = reservations;
660   prev = NULL;
661   while (NULL != (pos = next))
662   {
663     next = pos->next;
664     if (rid == pos->rid)
665     {
666       if (prev == NULL)
667         reservations = next;
668       else
669         prev->next = next;
670       rem =
671         pos->amount
672         + ((unsigned long long) GNUNET_DATASTORE_ENTRY_OVERHEAD) * pos->entries;
673       GNUNET_assert (reserved >= rem);
674       reserved -= rem;
675       GNUNET_STATISTICS_set (stats,
676                              gettext_noop ("# reserved"),
677                              reserved,
678                              GNUNET_NO);
679       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
680                   "Returning %llu remaining reserved bytes to storage pool\n",
681                   rem);
682       GNUNET_free (pos);
683       transmit_status (client, GNUNET_OK, NULL);
684       GNUNET_SERVICE_client_continue (client);
685       return;
686     }
687     prev = pos;
688   }
689   GNUNET_break (0);
690   transmit_status (client,
691                    GNUNET_SYSERR,
692                    gettext_noop ("Could not find matching reservation"));
693   GNUNET_SERVICE_client_continue (client);
694 }
695
696
697 /**
698  * Check that the given message is a valid data message.
699  *
700  * @param dm message to check
701  * @return #GNUNET_SYSERR is not well-formed, otherwise #GNUNET_OK
702  */
703 static int
704 check_data (const struct DataMessage *dm)
705 {
706   uint16_t size;
707   uint32_t dsize;
708
709   size = ntohs (dm->header.size);
710   dsize = ntohl (dm->size);
711   if (size != dsize + sizeof(struct DataMessage))
712   {
713     GNUNET_break (0);
714     return GNUNET_SYSERR;
715   }
716   return GNUNET_OK;
717 }
718
719
720 /**
721  * Put continuation.
722  *
723  * @param cls closure
724  * @param key key for the item stored
725  * @param size size of the item stored
726  * @param status #GNUNET_OK if inserted, #GNUNET_NO if updated,
727  *        or #GNUNET_SYSERROR if error
728  * @param msg error message on error
729  */
730 static void
731 put_continuation (void *cls,
732                   const struct GNUNET_HashCode *key,
733                   uint32_t size,
734                   int status,
735                   const char *msg)
736 {
737   struct GNUNET_SERVICE_Client *client = cls;
738
739   if (GNUNET_OK == status)
740   {
741     GNUNET_STATISTICS_update (stats,
742                               gettext_noop ("# bytes stored"),
743                               size,
744                               GNUNET_YES);
745     GNUNET_CONTAINER_bloomfilter_add (filter, key);
746     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
747                 "Successfully stored %u bytes under key `%s'\n",
748                 size,
749                 GNUNET_h2s (key));
750   }
751   transmit_status (client,
752                    GNUNET_SYSERR == status ? GNUNET_SYSERR : GNUNET_OK,
753                    msg);
754   if (quota - reserved - cache_size < payload)
755   {
756     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
757                 _ ("Need %llu bytes more space (%llu allowed, using %llu)\n"),
758                 (unsigned long long) size + GNUNET_DATASTORE_ENTRY_OVERHEAD,
759                 (unsigned long long) (quota - reserved - cache_size),
760                 (unsigned long long) payload);
761     manage_space (size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
762   }
763 }
764
765
766 /**
767  * Verify PUT-message.
768  *
769  * @param cls identification of the client
770  * @param message the actual message
771  * @return #GNUNET_OK if @a dm is well-formed
772  */
773 static int
774 check_put (void *cls, const struct DataMessage *dm)
775 {
776   if (GNUNET_OK != check_data (dm))
777   {
778     GNUNET_break (0);
779     return GNUNET_SYSERR;
780   }
781   return GNUNET_OK;
782 }
783
784
785 /**
786  * Handle PUT-message.
787  *
788  * @param cls identification of the client
789  * @param message the actual message
790  */
791 static void
792 handle_put (void *cls, const struct DataMessage *dm)
793 {
794   struct GNUNET_SERVICE_Client *client = cls;
795   int rid;
796   struct ReservationList *pos;
797   uint32_t size;
798
799   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
800               "Processing PUT request for `%s' of type %u\n",
801               GNUNET_h2s (&dm->key),
802               (uint32_t) ntohl (dm->type));
803   rid = ntohl (dm->rid);
804   size = ntohl (dm->size);
805   if (rid > 0)
806   {
807     pos = reservations;
808     while ((NULL != pos) && (rid != pos->rid))
809       pos = pos->next;
810     GNUNET_break (pos != NULL);
811     if (NULL != pos)
812     {
813       GNUNET_break (pos->entries > 0);
814       GNUNET_break (pos->amount >= size);
815       pos->entries--;
816       pos->amount -= size;
817       reserved -= (size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
818       GNUNET_STATISTICS_set (stats,
819                              gettext_noop ("# reserved"),
820                              reserved,
821                              GNUNET_NO);
822     }
823   }
824   bool absent =
825     GNUNET_NO == GNUNET_CONTAINER_bloomfilter_test (filter, &dm->key);
826   plugin->api->put (plugin->api->cls,
827                     &dm->key,
828                     absent,
829                     ntohl (dm->size),
830                     &dm[1],
831                     ntohl (dm->type),
832                     ntohl (dm->priority),
833                     ntohl (dm->anonymity),
834                     ntohl (dm->replication),
835                     GNUNET_TIME_absolute_ntoh (dm->expiration),
836                     &put_continuation,
837                     client);
838   GNUNET_SERVICE_client_continue (client);
839 }
840
841
842 /**
843  * Handle #GNUNET_MESSAGE_TYPE_DATASTORE_GET-message.
844  *
845  * @param cls identification of the client
846  * @param msg the actual message
847  */
848 static void
849 handle_get (void *cls, const struct GetMessage *msg)
850 {
851   struct GNUNET_SERVICE_Client *client = cls;
852
853   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
854               "Processing GET request of type %u\n",
855               (uint32_t) ntohl (msg->type));
856   GNUNET_STATISTICS_update (stats,
857                             gettext_noop ("# GET requests received"),
858                             1,
859                             GNUNET_NO);
860   plugin->api->get_key (plugin->api->cls,
861                         GNUNET_ntohll (msg->next_uid),
862                         msg->random,
863                         NULL,
864                         ntohl (msg->type),
865                         &transmit_item,
866                         client);
867   GNUNET_SERVICE_client_continue (client);
868 }
869
870
871 /**
872  * Handle #GNUNET_MESSAGE_TYPE_DATASTORE_GET_KEY-message.
873  *
874  * @param cls closure
875  * @param msg the actual message
876  */
877 static void
878 handle_get_key (void *cls, const struct GetKeyMessage *msg)
879 {
880   struct GNUNET_SERVICE_Client *client = cls;
881
882   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
883               "Processing GET request for `%s' of type %u\n",
884               GNUNET_h2s (&msg->key),
885               (uint32_t) ntohl (msg->type));
886   GNUNET_STATISTICS_update (stats,
887                             gettext_noop ("# GET KEY requests received"),
888                             1,
889                             GNUNET_NO);
890   if (GNUNET_YES != GNUNET_CONTAINER_bloomfilter_test (filter, &msg->key))
891   {
892     /* don't bother database... */
893     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
894                 "Empty result set for GET request for `%s' (bloomfilter).\n",
895                 GNUNET_h2s (&msg->key));
896     GNUNET_STATISTICS_update (stats,
897                               gettext_noop (
898                                 "# requests filtered by bloomfilter"),
899                               1,
900                               GNUNET_NO);
901     transmit_item (client,
902                    NULL,
903                    0,
904                    NULL,
905                    0,
906                    0,
907                    0,
908                    0,
909                    GNUNET_TIME_UNIT_ZERO_ABS,
910                    0);
911     GNUNET_SERVICE_client_continue (client);
912     return;
913   }
914   plugin->api->get_key (plugin->api->cls,
915                         GNUNET_ntohll (msg->next_uid),
916                         msg->random,
917                         &msg->key,
918                         ntohl (msg->type),
919                         &transmit_item,
920                         client);
921   GNUNET_SERVICE_client_continue (client);
922 }
923
924
925 /**
926  * Handle GET_REPLICATION-message.
927  *
928  * @param cls identification of the client
929  * @param message the actual message
930  */
931 static void
932 handle_get_replication (void *cls, const struct GNUNET_MessageHeader *message)
933 {
934   struct GNUNET_SERVICE_Client *client = cls;
935
936   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Processing GET_REPLICATION request\n");
937   GNUNET_STATISTICS_update (stats,
938                             gettext_noop (
939                               "# GET REPLICATION requests received"),
940                             1,
941                             GNUNET_NO);
942   plugin->api->get_replication (plugin->api->cls, &transmit_item, client);
943   GNUNET_SERVICE_client_continue (client);
944 }
945
946
947 /**
948  * Handle GET_ZERO_ANONYMITY-message.
949  *
950  * @param cls client identification of the client
951  * @param message the actual message
952  */
953 static void
954 handle_get_zero_anonymity (void *cls, const struct GetZeroAnonymityMessage *msg)
955 {
956   struct GNUNET_SERVICE_Client *client = cls;
957   enum GNUNET_BLOCK_Type type;
958
959   type = (enum GNUNET_BLOCK_Type) ntohl (msg->type);
960   if (type == GNUNET_BLOCK_TYPE_ANY)
961   {
962     GNUNET_break (0);
963     GNUNET_SERVICE_client_drop (client);
964     return;
965   }
966   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
967               "Processing GET_ZERO_ANONYMITY request\n");
968   GNUNET_STATISTICS_update (stats,
969                             gettext_noop (
970                               "# GET ZERO ANONYMITY requests received"),
971                             1,
972                             GNUNET_NO);
973   plugin->api->get_zero_anonymity (plugin->api->cls,
974                                    GNUNET_ntohll (msg->next_uid),
975                                    type,
976                                    &transmit_item,
977                                    client);
978   GNUNET_SERVICE_client_continue (client);
979 }
980
981
982 /**
983  * Remove continuation.
984  *
985  * @param cls closure
986  * @param key key for the content
987  * @param size number of bytes in data
988  * @param status #GNUNET_OK if removed, #GNUNET_NO if not found,
989  *        or #GNUNET_SYSERROR if error
990  * @param msg error message on error
991  */
992 static void
993 remove_continuation (void *cls,
994                      const struct GNUNET_HashCode *key,
995                      uint32_t size,
996                      int status,
997                      const char *msg)
998 {
999   struct GNUNET_SERVICE_Client *client = cls;
1000
1001   if (GNUNET_SYSERR == status)
1002   {
1003     GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "REMOVE request failed: %s.\n", msg);
1004     transmit_status (client, GNUNET_NO, msg);
1005     return;
1006   }
1007   if (GNUNET_NO == status)
1008   {
1009     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1010                 "Content not found for REMOVE request.\n");
1011     transmit_status (client, GNUNET_NO, _ ("Content not found"));
1012     return;
1013   }
1014   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1015               "Item matches REMOVE request for key `%s'.\n",
1016               GNUNET_h2s (key));
1017   GNUNET_STATISTICS_update (stats,
1018                             gettext_noop ("# bytes removed (explicit request)"),
1019                             size,
1020                             GNUNET_YES);
1021   GNUNET_CONTAINER_bloomfilter_remove (filter, key);
1022   transmit_status (client, GNUNET_OK, NULL);
1023 }
1024
1025
1026 /**
1027  * Verify REMOVE-message.
1028  *
1029  * @param cls identification of the client
1030  * @param message the actual message
1031  * @return #GNUNET_OK if @a dm is well-formed
1032  */
1033 static int
1034 check_remove (void *cls, const struct DataMessage *dm)
1035 {
1036   if (GNUNET_OK != check_data (dm))
1037   {
1038     GNUNET_break (0);
1039     return GNUNET_SYSERR;
1040   }
1041   return GNUNET_OK;
1042 }
1043
1044
1045 /**
1046  * Handle REMOVE-message.
1047  *
1048  * @param cls closure
1049  * @param client identification of the client
1050  * @param message the actual message
1051  */
1052 static void
1053 handle_remove (void *cls, const struct DataMessage *dm)
1054 {
1055   struct GNUNET_SERVICE_Client *client = cls;
1056
1057   GNUNET_STATISTICS_update (stats,
1058                             gettext_noop ("# REMOVE requests received"),
1059                             1,
1060                             GNUNET_NO);
1061   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1062               "Processing REMOVE request for `%s'\n",
1063               GNUNET_h2s (&dm->key));
1064   plugin->api->remove_key (plugin->api->cls,
1065                            &dm->key,
1066                            ntohl (dm->size),
1067                            &dm[1],
1068                            &remove_continuation,
1069                            client);
1070   GNUNET_SERVICE_client_continue (client);
1071 }
1072
1073
1074 /**
1075  * Handle DROP-message.
1076  *
1077  * @param cls identification of the client
1078  * @param message the actual message
1079  */
1080 static void
1081 handle_drop (void *cls, const struct GNUNET_MessageHeader *message)
1082 {
1083   struct GNUNET_SERVICE_Client *client = cls;
1084
1085   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Processing DROP request\n");
1086   do_drop = GNUNET_YES;
1087   GNUNET_SERVICE_client_continue (client);
1088 }
1089
1090
1091 /**
1092  * Function called by plugins to notify us about a
1093  * change in their disk utilization.
1094  *
1095  * @param cls closure (NULL)
1096  * @param delta change in disk utilization,
1097  *        0 for "reset to empty"
1098  */
1099 static void
1100 disk_utilization_change_cb (void *cls, int delta)
1101 {
1102   if ((delta < 0) && (payload < -delta))
1103   {
1104     GNUNET_log (
1105       GNUNET_ERROR_TYPE_WARNING,
1106       _ (
1107         "Datastore payload must have been inaccurate (%lld < %lld). Recomputing it.\n"),
1108       (long long) payload,
1109       (long long) -delta);
1110     plugin->api->estimate_size (plugin->api->cls, &payload);
1111     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1112                 _ ("New payload: %lld\n"),
1113                 (long long) payload);
1114     sync_stats ();
1115     return;
1116   }
1117   payload += delta;
1118   last_sync++;
1119   if (last_sync >= MAX_STAT_SYNC_LAG)
1120     sync_stats ();
1121 }
1122
1123
1124 /**
1125  * Callback function to process statistic values.
1126  *
1127  * @param cls closure (struct Plugin*)
1128  * @param subsystem name of subsystem that created the statistic
1129  * @param name the name of the datum
1130  * @param value the current value
1131  * @param is_persistent #GNUNET_YES if the value is persistent, #GNUNET_NO if not
1132  * @return #GNUNET_OK to continue, #GNUNET_SYSERR to abort iteration
1133  */
1134 static int
1135 process_stat_in (void *cls,
1136                  const char *subsystem,
1137                  const char *name,
1138                  uint64_t value,
1139                  int is_persistent)
1140 {
1141   GNUNET_assert (GNUNET_NO == stats_worked);
1142   stats_worked = GNUNET_YES;
1143   payload += value;
1144   GNUNET_log (
1145     GNUNET_ERROR_TYPE_DEBUG,
1146     "Notification from statistics about existing payload (%llu), new payload is %llu\n",
1147     (unsigned long long) value,
1148     (unsigned long long) payload);
1149   return GNUNET_OK;
1150 }
1151
1152
1153 /**
1154  * Load the datastore plugin.
1155  */
1156 static struct DatastorePlugin *
1157 load_plugin ()
1158 {
1159   struct DatastorePlugin *ret;
1160   char *libname;
1161
1162   ret = GNUNET_new (struct DatastorePlugin);
1163   ret->env.cfg = cfg;
1164   ret->env.duc = &disk_utilization_change_cb;
1165   ret->env.cls = NULL;
1166   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1167               _ ("Loading `%s' datastore plugin\n"),
1168               plugin_name);
1169   GNUNET_asprintf (&libname, "libgnunet_plugin_datastore_%s", plugin_name);
1170   ret->short_name = GNUNET_strdup (plugin_name);
1171   ret->lib_name = libname;
1172   ret->api = GNUNET_PLUGIN_load (libname, &ret->env);
1173   if (NULL == ret->api)
1174   {
1175     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1176                 _ ("Failed to load datastore plugin for `%s'\n"),
1177                 plugin_name);
1178     GNUNET_free (ret->short_name);
1179     GNUNET_free (libname);
1180     GNUNET_free (ret);
1181     return NULL;
1182   }
1183   return ret;
1184 }
1185
1186
1187 /**
1188  * Function called when the service shuts
1189  * down.  Unloads our datastore plugin.
1190  *
1191  * @param plug plugin to unload
1192  */
1193 static void
1194 unload_plugin (struct DatastorePlugin *plug)
1195 {
1196   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1197               "Datastore service is unloading plugin...\n");
1198   GNUNET_break (NULL == GNUNET_PLUGIN_unload (plug->lib_name, plug->api));
1199   GNUNET_free (plug->lib_name);
1200   GNUNET_free (plug->short_name);
1201   GNUNET_free (plug);
1202 }
1203
1204
1205 /**
1206  * Initialization complete, start operating the service.
1207  */
1208 static void
1209 begin_service ()
1210 {
1211   GNUNET_SERVICE_resume (service);
1212   expired_kill_task =
1213     GNUNET_SCHEDULER_add_with_priority (GNUNET_SCHEDULER_PRIORITY_IDLE,
1214                                         &delete_expired,
1215                                         NULL);
1216 }
1217
1218
1219 /**
1220  * Adds a given @a key to the bloomfilter in @a cls @a count times.
1221  *
1222  * @param cls the bloomfilter
1223  * @param key key to add
1224  * @param count number of times to add key
1225  */
1226 static void
1227 add_key_to_bloomfilter (void *cls,
1228                         const struct GNUNET_HashCode *key,
1229                         unsigned int count)
1230 {
1231   struct GNUNET_CONTAINER_BloomFilter *bf = cls;
1232
1233   if (NULL == key)
1234   {
1235     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1236                 _ ("Bloomfilter construction complete.\n"));
1237     begin_service ();
1238     return;
1239   }
1240
1241   while (0 < count--)
1242     GNUNET_CONTAINER_bloomfilter_add (bf, key);
1243 }
1244
1245
1246 /**
1247  * We finished receiving the statistic.  Initialize the plugin; if
1248  * loading the statistic failed, run the estimator.
1249  *
1250  * @param cls NULL
1251  * @param success #GNUNET_NO if we failed to read the stat
1252  */
1253 static void
1254 process_stat_done (void *cls, int success)
1255 {
1256   stat_get = NULL;
1257   if (NULL != stat_timeout_task)
1258   {
1259     GNUNET_SCHEDULER_cancel (stat_timeout_task);
1260     stat_timeout_task = NULL;
1261   }
1262   plugin = load_plugin ();
1263   if (NULL == plugin)
1264   {
1265     GNUNET_CONTAINER_bloomfilter_free (filter);
1266     filter = NULL;
1267     if (NULL != stats)
1268     {
1269       GNUNET_STATISTICS_destroy (stats, GNUNET_YES);
1270       stats = NULL;
1271     }
1272     return;
1273   }
1274
1275   if (GNUNET_NO == stats_worked)
1276   {
1277     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1278                 "Failed to obtain value from statistics service, recomputing it\n");
1279     plugin->api->estimate_size (plugin->api->cls, &payload);
1280     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1281                 _ ("New payload: %lld\n"),
1282                 (long long) payload);
1283   }
1284
1285   if (GNUNET_YES == refresh_bf)
1286   {
1287     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1288                 _ ("Rebuilding bloomfilter.  Please be patient.\n"));
1289     if (NULL != plugin->api->get_keys)
1290     {
1291       plugin->api->get_keys (plugin->api->cls, &add_key_to_bloomfilter, filter);
1292       return;
1293     }
1294     else
1295     {
1296       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1297                   _ (
1298                     "Plugin does not support get_keys function. Please fix!\n"));
1299     }
1300   }
1301   begin_service ();
1302 }
1303
1304
1305 /**
1306  * Fetching stats took to long, run without.
1307  *
1308  * @param cls NULL
1309  */
1310 static void
1311 stat_timeout (void *cls)
1312 {
1313   stat_timeout_task = NULL;
1314   GNUNET_STATISTICS_get_cancel (stat_get);
1315   process_stat_done (NULL, GNUNET_NO);
1316 }
1317
1318
1319 /**
1320  * Task run during shutdown.
1321  */
1322 static void
1323 cleaning_task (void *cls)
1324 {
1325   cleaning_done = GNUNET_YES;
1326   if (NULL != expired_kill_task)
1327   {
1328     GNUNET_SCHEDULER_cancel (expired_kill_task);
1329     expired_kill_task = NULL;
1330   }
1331   if (GNUNET_YES == do_drop)
1332   {
1333     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Dropping database!\n");
1334     plugin->api->drop (plugin->api->cls);
1335     payload = 0;
1336     last_sync++;
1337   }
1338   if (NULL != plugin)
1339   {
1340     unload_plugin (plugin);
1341     plugin = NULL;
1342   }
1343   if (NULL != filter)
1344   {
1345     GNUNET_CONTAINER_bloomfilter_free (filter);
1346     filter = NULL;
1347   }
1348   if (NULL != stat_get)
1349   {
1350     GNUNET_STATISTICS_get_cancel (stat_get);
1351     stat_get = NULL;
1352   }
1353   if (NULL != stat_timeout_task)
1354   {
1355     GNUNET_SCHEDULER_cancel (stat_timeout_task);
1356     stat_timeout_task = NULL;
1357   }
1358   GNUNET_free_non_null (plugin_name);
1359   plugin_name = NULL;
1360   if (last_sync > 0)
1361     sync_stats ();
1362   if (NULL != stats)
1363   {
1364     GNUNET_STATISTICS_destroy (stats, GNUNET_YES);
1365     stats = NULL;
1366   }
1367   GNUNET_free (quota_stat_name);
1368   quota_stat_name = NULL;
1369 }
1370
1371
1372 /**
1373  * Add a client to our list of active clients.
1374  *
1375  * @param cls NULL
1376  * @param client client to add
1377  * @param mq message queue for @a client
1378  * @return @a client
1379  */
1380 static void *
1381 client_connect_cb (void *cls,
1382                    struct GNUNET_SERVICE_Client *client,
1383                    struct GNUNET_MQ_Handle *mq)
1384 {
1385   return client;
1386 }
1387
1388
1389 /**
1390  * Called whenever a client is disconnected.
1391  * Frees our resources associated with that client.
1392  *
1393  * @param cls closure
1394  * @param client identification of the client
1395  * @param app_ctx must match @a client
1396  */
1397 static void
1398 client_disconnect_cb (void *cls,
1399                       struct GNUNET_SERVICE_Client *client,
1400                       void *app_ctx)
1401 {
1402   struct ReservationList *pos;
1403   struct ReservationList *prev;
1404   struct ReservationList *next;
1405
1406   GNUNET_assert (app_ctx == client);
1407   prev = NULL;
1408   pos = reservations;
1409   while (NULL != pos)
1410   {
1411     next = pos->next;
1412     if (pos->client == client)
1413     {
1414       if (NULL == prev)
1415         reservations = next;
1416       else
1417         prev->next = next;
1418       reserved -= pos->amount + pos->entries * GNUNET_DATASTORE_ENTRY_OVERHEAD;
1419       GNUNET_free (pos);
1420     }
1421     else
1422     {
1423       prev = pos;
1424     }
1425     pos = next;
1426   }
1427   GNUNET_STATISTICS_set (stats,
1428                          gettext_noop ("# reserved"),
1429                          reserved,
1430                          GNUNET_NO);
1431 }
1432
1433
1434 /**
1435  * Process datastore requests.
1436  *
1437  * @param cls closure
1438  * @param serv the initialized service
1439  * @param c configuration to use
1440  */
1441 static void
1442 run (void *cls,
1443      const struct GNUNET_CONFIGURATION_Handle *c,
1444      struct GNUNET_SERVICE_Handle *serv)
1445 {
1446   char *fn;
1447   char *pfn;
1448   unsigned int bf_size;
1449
1450   service = serv;
1451   cfg = c;
1452   if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_string (cfg,
1453                                                           "DATASTORE",
1454                                                           "DATABASE",
1455                                                           &plugin_name))
1456   {
1457     GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
1458                                "DATABASE",
1459                                "DATASTORE");
1460     return;
1461   }
1462   GNUNET_asprintf (&quota_stat_name,
1463                    _ ("# bytes used in file-sharing datastore `%s'"),
1464                    plugin_name);
1465   if (GNUNET_OK !=
1466       GNUNET_CONFIGURATION_get_value_size (cfg, "DATASTORE", "QUOTA", &quota))
1467   {
1468     GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR, "QUOTA", "DATASTORE");
1469     return;
1470   }
1471   stats = GNUNET_STATISTICS_create ("datastore", cfg);
1472   GNUNET_STATISTICS_set (stats, gettext_noop ("# quota"), quota, GNUNET_NO);
1473   cache_size = quota / 8; /* Or should we make this an option? */
1474   GNUNET_STATISTICS_set (stats,
1475                          gettext_noop ("# cache size"),
1476                          cache_size,
1477                          GNUNET_NO);
1478   if (quota / (32 * 1024LL) > MAX_BF_SIZE)
1479     bf_size = MAX_BF_SIZE;
1480   else
1481     bf_size =
1482       quota / (32 * 1024LL); /* 8 bit per entry, 1 bit per 32 kb in DB */
1483   fn = NULL;
1484   if ((GNUNET_OK != GNUNET_CONFIGURATION_get_value_filename (cfg,
1485                                                              "DATASTORE",
1486                                                              "BLOOMFILTER",
1487                                                              &fn)) ||
1488       (GNUNET_OK != GNUNET_DISK_directory_create_for_file (fn)))
1489   {
1490     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1491                 _ ("Could not use specified filename `%s' for bloomfilter.\n"),
1492                 NULL != fn ? fn : "");
1493     GNUNET_free_non_null (fn);
1494     fn = NULL;
1495   }
1496   if (NULL != fn)
1497   {
1498     GNUNET_asprintf (&pfn, "%s.%s", fn, plugin_name);
1499     if (GNUNET_YES == GNUNET_DISK_file_test (pfn))
1500     {
1501       filter =
1502         GNUNET_CONTAINER_bloomfilter_load (pfn,
1503                                            bf_size,
1504                                            5);    /* approx. 3% false positives at max use */
1505       if (NULL == filter)
1506       {
1507         /* file exists but not valid, remove and try again, but refresh */
1508         if (0 != unlink (pfn))
1509         {
1510           /* failed to remove, run without file */
1511           GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1512                       _ ("Failed to remove bogus bloomfilter file `%s'\n"),
1513                       pfn);
1514           GNUNET_free (pfn);
1515           pfn = NULL;
1516           filter = GNUNET_CONTAINER_bloomfilter_load (
1517             NULL,
1518             bf_size,
1519             5);         /* approx. 3% false positives at max use */
1520           refresh_bf = GNUNET_YES;
1521         }
1522         else
1523         {
1524           /* try again after remove */
1525           filter = GNUNET_CONTAINER_bloomfilter_load (
1526             pfn,
1527             bf_size,
1528             5);         /* approx. 3% false positives at max use */
1529           refresh_bf = GNUNET_YES;
1530           if (NULL == filter)
1531           {
1532             /* failed yet again, give up on using file */
1533             GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1534                         _ ("Failed to remove bogus bloomfilter file `%s'\n"),
1535                         pfn);
1536             GNUNET_free (pfn);
1537             pfn = NULL;
1538             filter = GNUNET_CONTAINER_bloomfilter_init (
1539               NULL,
1540               bf_size,
1541               5);           /* approx. 3% false positives at max use */
1542           }
1543         }
1544       }
1545       else
1546       {
1547         /* normal case: have an existing valid bf file, no need to refresh */
1548         refresh_bf = GNUNET_NO;
1549       }
1550     }
1551     else
1552     {
1553       filter =
1554         GNUNET_CONTAINER_bloomfilter_load (pfn,
1555                                            bf_size,
1556                                            5);    /* approx. 3% false positives at max use */
1557       refresh_bf = GNUNET_YES;
1558     }
1559     GNUNET_free (pfn);
1560   }
1561   else
1562   {
1563     filter =
1564       GNUNET_CONTAINER_bloomfilter_init (NULL,
1565                                          bf_size,
1566                                          5);  /* approx. 3% false positives at max use */
1567     refresh_bf = GNUNET_YES;
1568   }
1569   GNUNET_free_non_null (fn);
1570   if (NULL == filter)
1571   {
1572     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1573                 _ ("Failed to initialize bloomfilter.\n"));
1574     if (NULL != stats)
1575     {
1576       GNUNET_STATISTICS_destroy (stats, GNUNET_YES);
1577       stats = NULL;
1578     }
1579     return;
1580   }
1581   GNUNET_SERVICE_suspend (service);
1582   stat_get = GNUNET_STATISTICS_get (stats,
1583                                     "datastore",
1584                                     quota_stat_name,
1585                                     &process_stat_done,
1586                                     &process_stat_in,
1587                                     NULL);
1588   if (NULL == stat_get)
1589     process_stat_done (NULL, GNUNET_SYSERR);
1590   else
1591     stat_timeout_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS,
1592                                                       &stat_timeout,
1593                                                       NULL);
1594   GNUNET_SCHEDULER_add_shutdown (&cleaning_task, NULL);
1595 }
1596
1597
1598 /**
1599  * Define "main" method using service macro.
1600  */
1601 GNUNET_SERVICE_MAIN (
1602   "datastore",
1603   GNUNET_SERVICE_OPTION_NONE,
1604   &run,
1605   &client_connect_cb,
1606   &client_disconnect_cb,
1607   NULL,
1608   GNUNET_MQ_hd_fixed_size (reserve,
1609                            GNUNET_MESSAGE_TYPE_DATASTORE_RESERVE,
1610                            struct ReserveMessage,
1611                            NULL),
1612   GNUNET_MQ_hd_fixed_size (release_reserve,
1613                            GNUNET_MESSAGE_TYPE_DATASTORE_RELEASE_RESERVE,
1614                            struct ReleaseReserveMessage,
1615                            NULL),
1616   GNUNET_MQ_hd_var_size (put,
1617                          GNUNET_MESSAGE_TYPE_DATASTORE_PUT,
1618                          struct DataMessage,
1619                          NULL),
1620   GNUNET_MQ_hd_fixed_size (get,
1621                            GNUNET_MESSAGE_TYPE_DATASTORE_GET,
1622                            struct GetMessage,
1623                            NULL),
1624   GNUNET_MQ_hd_fixed_size (get_key,
1625                            GNUNET_MESSAGE_TYPE_DATASTORE_GET_KEY,
1626                            struct GetKeyMessage,
1627                            NULL),
1628   GNUNET_MQ_hd_fixed_size (get_replication,
1629                            GNUNET_MESSAGE_TYPE_DATASTORE_GET_REPLICATION,
1630                            struct GNUNET_MessageHeader,
1631                            NULL),
1632   GNUNET_MQ_hd_fixed_size (get_zero_anonymity,
1633                            GNUNET_MESSAGE_TYPE_DATASTORE_GET_ZERO_ANONYMITY,
1634                            struct GetZeroAnonymityMessage,
1635                            NULL),
1636   GNUNET_MQ_hd_var_size (remove,
1637                          GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE,
1638                          struct DataMessage,
1639                          NULL),
1640   GNUNET_MQ_hd_fixed_size (drop,
1641                            GNUNET_MESSAGE_TYPE_DATASTORE_DROP,
1642                            struct GNUNET_MessageHeader,
1643                            NULL),
1644   GNUNET_MQ_handler_end ());
1645
1646
1647 /* end of gnunet-service-datastore.c */