[datastore] Create remove plugin API call
[oweals/gnunet.git] / src / datastore / gnunet-service-datastore.c
1 /*
2      This file is part of GNUnet
3      Copyright (C) 2004-2014, 2016 GNUnet e.V.
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18      Boston, MA 02110-1301, USA.
19 */
20
21 /**
22  * @file datastore/gnunet-service-datastore.c
23  * @brief Management for the datastore for files stored on a GNUnet node
24  * @author Christian Grothoff
25  */
26
27 #include "platform.h"
28 #include "gnunet_util_lib.h"
29 #include "gnunet_protocols.h"
30 #include "gnunet_statistics_service.h"
31 #include "gnunet_datastore_plugin.h"
32 #include "datastore.h"
33
34 /**
35  * How many messages do we queue at most per client?
36  */
37 #define MAX_PENDING 1024
38
39 /**
40  * Limit size of bloom filter to 2 GB.
41  */
42 #define MAX_BF_SIZE ((uint32_t) (1LL << 31))
43
44 /**
45  * How long are we at most keeping "expired" content
46  * past the expiration date in the database?
47  */
48 #define MAX_EXPIRE_DELAY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 15)
49
50 /**
51  * How fast are we allowed to query the database for deleting
52  * expired content? (1 item per second).
53  */
54 #define MIN_EXPIRE_DELAY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 1)
55
56 /**
57  * Name under which we store current space consumption.
58  */
59 static char *quota_stat_name;
60
61 /**
62  * Task to timeout stat GET.
63  */
64 static struct GNUNET_SCHEDULER_Task *stat_timeout_task;
65
66 /**
67  * After how many payload-changing operations
68  * do we sync our statistics?
69  */
70 #define MAX_STAT_SYNC_LAG 50
71
72
73 /**
74  * Our datastore plugin.
75  */
76 struct DatastorePlugin
77 {
78
79   /**
80    * API of the transport as returned by the plugin's
81    * initialization function.
82    */
83   struct GNUNET_DATASTORE_PluginFunctions *api;
84
85   /**
86    * Short name for the plugin (i.e. "sqlite").
87    */
88   char *short_name;
89
90   /**
91    * Name of the library (i.e. "gnunet_plugin_datastore_sqlite").
92    */
93   char *lib_name;
94
95   /**
96    * Environment this transport service is using
97    * for this plugin.
98    */
99   struct GNUNET_DATASTORE_PluginEnvironment env;
100
101 };
102
103
104 /**
105  * Linked list of active reservations.
106  */
107 struct ReservationList
108 {
109
110   /**
111    * This is a linked list.
112    */
113   struct ReservationList *next;
114
115   /**
116    * Client that made the reservation.
117    */
118   struct GNUNET_SERVICE_Client *client;
119
120   /**
121    * Number of bytes (still) reserved.
122    */
123   uint64_t amount;
124
125   /**
126    * Number of items (still) reserved.
127    */
128   uint64_t entries;
129
130   /**
131    * Reservation identifier.
132    */
133   int32_t rid;
134
135 };
136
137
138
139 /**
140  * Our datastore plugin (NULL if not available).
141  */
142 static struct DatastorePlugin *plugin;
143
144 /**
145  * Linked list of space reservations made by clients.
146  */
147 static struct ReservationList *reservations;
148
149 /**
150  * Bloomfilter to quickly tell if we don't have the content.
151  */
152 static struct GNUNET_CONTAINER_BloomFilter *filter;
153
154 /**
155  * Name of our plugin.
156  */
157 static char *plugin_name;
158
159 /**
160  * Our configuration.
161  */
162 static const struct GNUNET_CONFIGURATION_Handle *cfg;
163
164 /**
165  * Handle for reporting statistics.
166  */
167 static struct GNUNET_STATISTICS_Handle *stats;
168
169 /**
170  * How much space are we using for the cache?  (space available for
171  * insertions that will be instantly reclaimed by discarding less
172  * important content --- or possibly whatever we just inserted into
173  * the "cache").
174  */
175 static unsigned long long cache_size;
176
177 /**
178  * How much space have we currently reserved?
179  */
180 static unsigned long long reserved;
181
182 /**
183  * How much data are we currently storing
184  * in the database?
185  */
186 static unsigned long long payload;
187
188 /**
189  * Identity of the task that is used to delete
190  * expired content.
191  */
192 static struct GNUNET_SCHEDULER_Task *expired_kill_task;
193
194 /**
195  * Minimum time that content should have to not be discarded instantly
196  * (time stamp of any content that we've been discarding recently to
197  * stay below the quota).  FOREVER if we had to expire content with
198  * non-zero priority.
199  */
200 static struct GNUNET_TIME_Absolute min_expiration;
201
202 /**
203  * How much space are we allowed to use?
204  */
205 static unsigned long long quota;
206
207 /**
208  * Should the database be dropped on exit?
209  */
210 static int do_drop;
211
212 /**
213  * Should we refresh the BF when the DB is loaded?
214  */
215 static int refresh_bf;
216
217 /**
218  * Number of updates that were made to the
219  * payload value since we last synchronized
220  * it with the statistics service.
221  */
222 static unsigned int last_sync;
223
224 /**
225  * Did we get an answer from statistics?
226  */
227 static int stats_worked;
228
229
230 /**
231  * Synchronize our utilization statistics with the
232  * statistics service.
233  */
234 static void
235 sync_stats ()
236 {
237   GNUNET_STATISTICS_set (stats,
238                          quota_stat_name,
239                          payload,
240                          GNUNET_YES);
241   GNUNET_STATISTICS_set (stats,
242                          "# utilization by current datastore",
243                          payload,
244                          GNUNET_NO);
245   last_sync = 0;
246 }
247
248
249 /**
250  * Have we already cleaned up the TCCs and are hence no longer
251  * willing (or able) to transmit anything to anyone?
252  */
253 static int cleaning_done;
254
255 /**
256  * Handle for pending get request.
257  */
258 static struct GNUNET_STATISTICS_GetHandle *stat_get;
259
260 /**
261  * Handle to our server.
262  */
263 static struct GNUNET_SERVICE_Handle *service;
264
265 /**
266  * Task that is used to remove expired entries from
267  * the datastore.  This task will schedule itself
268  * again automatically to always delete all expired
269  * content quickly.
270  *
271  * @param cls not used
272  */
273 static void
274 delete_expired (void *cls);
275
276
277 /**
278  * Iterate over the expired items stored in the datastore.
279  * Delete all expired items; once we have processed all
280  * expired items, re-schedule the "delete_expired" task.
281  *
282  * @param cls not used
283  * @param key key for the content
284  * @param size number of bytes in data
285  * @param data content stored
286  * @param type type of the content
287  * @param priority priority of the content
288  * @param anonymity anonymity-level for the content
289  * @param replication replication-level for the content
290  * @param expiration expiration time for the content
291  * @param uid unique identifier for the datum;
292  *        maybe 0 if no unique identifier is available
293  *
294  * @return #GNUNET_SYSERR to abort the iteration, #GNUNET_OK to continue
295  *         (continue on call to "next", of course),
296  *         #GNUNET_NO to delete the item and continue (if supported)
297  */
298 static int
299 expired_processor (void *cls,
300                    const struct GNUNET_HashCode *key,
301                    uint32_t size,
302                    const void *data,
303                    enum GNUNET_BLOCK_Type type,
304                    uint32_t priority,
305                    uint32_t anonymity,
306                    uint32_t replication,
307                    struct GNUNET_TIME_Absolute expiration,
308                    uint64_t uid)
309 {
310   struct GNUNET_TIME_Absolute now;
311
312   if (NULL == key)
313   {
314     expired_kill_task =
315         GNUNET_SCHEDULER_add_delayed_with_priority (MAX_EXPIRE_DELAY,
316                                                     GNUNET_SCHEDULER_PRIORITY_IDLE,
317                                                     &delete_expired, NULL);
318     return GNUNET_SYSERR;
319   }
320   now = GNUNET_TIME_absolute_get ();
321   if (expiration.abs_value_us > now.abs_value_us)
322   {
323     /* finished processing */
324     expired_kill_task =
325         GNUNET_SCHEDULER_add_delayed_with_priority (MAX_EXPIRE_DELAY,
326                                                     GNUNET_SCHEDULER_PRIORITY_IDLE,
327                                                     &delete_expired, NULL);
328     return GNUNET_SYSERR;
329   }
330   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
331               "Deleting content `%s' of type %u that expired %s ago\n",
332               GNUNET_h2s (key), type,
333               GNUNET_STRINGS_relative_time_to_string (GNUNET_TIME_absolute_get_difference (expiration,
334                                                                                            now),
335                                                       GNUNET_YES));
336   min_expiration = now;
337   GNUNET_STATISTICS_update (stats,
338                             gettext_noop ("# bytes expired"),
339                             size,
340                             GNUNET_YES);
341   GNUNET_CONTAINER_bloomfilter_remove (filter, key);
342   expired_kill_task =
343       GNUNET_SCHEDULER_add_delayed_with_priority (MIN_EXPIRE_DELAY,
344                                                   GNUNET_SCHEDULER_PRIORITY_IDLE,
345                                                   &delete_expired, NULL);
346   return GNUNET_NO;
347 }
348
349
350 /**
351  * Task that is used to remove expired entries from
352  * the datastore.  This task will schedule itself
353  * again automatically to always delete all expired
354  * content quickly.
355  *
356  * @param cls not used
357  */
358 static void
359 delete_expired (void *cls)
360 {
361   expired_kill_task = NULL;
362   plugin->api->get_expiration (plugin->api->cls,
363                                &expired_processor,
364                                NULL);
365 }
366
367
368 /**
369  * An iterator over a set of items stored in the datastore
370  * that deletes until we're happy with respect to our quota.
371  *
372  * @param cls closure
373  * @param key key for the content
374  * @param size number of bytes in data
375  * @param data content stored
376  * @param type type of the content
377  * @param priority priority of the content
378  * @param anonymity anonymity-level for the content
379  * @param replication replication-level for the content
380  * @param expiration expiration time for the content
381  * @param uid unique identifier for the datum;
382  *        maybe 0 if no unique identifier is available
383  * @return #GNUNET_SYSERR to abort the iteration, #GNUNET_OK to continue
384  *         (continue on call to "next", of course),
385  *         #GNUNET_NO to delete the item and continue (if supported)
386  */
387 static int
388 quota_processor (void *cls,
389                  const struct GNUNET_HashCode *key,
390                  uint32_t size,
391                  const void *data,
392                  enum GNUNET_BLOCK_Type type,
393                  uint32_t priority,
394                  uint32_t anonymity,
395                  uint32_t replication,
396                  struct GNUNET_TIME_Absolute expiration,
397                  uint64_t uid)
398 {
399   unsigned long long *need = cls;
400
401   if (NULL == key)
402     return GNUNET_SYSERR;
403   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
404               "Deleting %llu bytes of low-priority (%u) content `%s' of type %u at %s prior to expiration (still trying to free another %llu bytes)\n",
405               (unsigned long long) (size + GNUNET_DATASTORE_ENTRY_OVERHEAD),
406               (unsigned int) priority,
407               GNUNET_h2s (key), type,
408               GNUNET_STRINGS_relative_time_to_string (GNUNET_TIME_absolute_get_remaining (expiration),
409                                                       GNUNET_YES),
410               *need);
411   if (size + GNUNET_DATASTORE_ENTRY_OVERHEAD > *need)
412     *need = 0;
413   else
414     *need -= size + GNUNET_DATASTORE_ENTRY_OVERHEAD;
415   if (priority > 0)
416     min_expiration = GNUNET_TIME_UNIT_FOREVER_ABS;
417   else
418     min_expiration = expiration;
419   GNUNET_STATISTICS_update (stats,
420                             gettext_noop ("# bytes purged (low-priority)"),
421                             size, GNUNET_YES);
422   GNUNET_CONTAINER_bloomfilter_remove (filter, key);
423   return GNUNET_NO;
424 }
425
426
427 /**
428  * Manage available disk space by running tasks
429  * that will discard content if necessary.  This
430  * function will be run whenever a request for
431  * "need" bytes of storage could only be satisfied
432  * by eating into the "cache" (and we want our cache
433  * space back).
434  *
435  * @param need number of bytes of content that were
436  *        placed into the "cache" (and hence the
437  *        number of bytes that should be removed).
438  */
439 static void
440 manage_space (unsigned long long need)
441 {
442   unsigned long long last;
443
444   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
445               "Asked to free up %llu bytes of cache space\n",
446               need);
447   last = 0;
448   while ((need > 0) && (last != need))
449   {
450     last = need;
451     plugin->api->get_expiration (plugin->api->cls,
452                                  &quota_processor,
453                                  &need);
454   }
455 }
456
457
458 /**
459  * Transmit a status code to the client.
460  *
461  * @param client receiver of the response
462  * @param code status code
463  * @param msg optional error message (can be NULL)
464  */
465 static void
466 transmit_status (struct GNUNET_SERVICE_Client *client,
467                  int code,
468                  const char *msg)
469 {
470   struct GNUNET_MQ_Envelope *env;
471   struct StatusMessage *sm;
472   size_t slen;
473
474   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
475               "Transmitting `%s' message with value %d and message `%s'\n",
476               "STATUS", code, msg != NULL ? msg : "(none)");
477   slen = (msg == NULL) ? 0 : strlen (msg) + 1;
478   env = GNUNET_MQ_msg_extra (sm,
479                              slen,
480                              GNUNET_MESSAGE_TYPE_DATASTORE_STATUS);
481   sm->status = htonl (code);
482   sm->min_expiration = GNUNET_TIME_absolute_hton (min_expiration);
483   GNUNET_memcpy (&sm[1],
484                  msg,
485                  slen);
486   GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (client),
487                   env);
488 }
489
490
491 /**
492  * Function that will transmit the given datastore entry
493  * to the client.
494  *
495  * @param cls closure, pointer to the client (of type `struct GNUNET_SERVICE_Client`).
496  * @param key key for the content
497  * @param size number of bytes in data
498  * @param data content stored
499  * @param type type of the content
500  * @param priority priority of the content
501  * @param anonymity anonymity-level for the content
502  * @param replication replication-level for the content
503  * @param expiration expiration time for the content
504  * @param uid unique identifier for the datum;
505  *        maybe 0 if no unique identifier is available
506  * @return #GNUNET_SYSERR to abort the iteration, #GNUNET_OK to continue,
507  *         #GNUNET_NO to delete the item and continue (if supported)
508  */
509 static int
510 transmit_item (void *cls,
511                const struct GNUNET_HashCode *key,
512                uint32_t size,
513                const void *data,
514                enum GNUNET_BLOCK_Type type,
515                uint32_t priority,
516                uint32_t anonymity,
517                uint32_t replication,
518                struct GNUNET_TIME_Absolute expiration,
519                uint64_t uid)
520 {
521   struct GNUNET_SERVICE_Client *client = cls;
522   struct GNUNET_MQ_Envelope *env;
523   struct GNUNET_MessageHeader *end;
524   struct DataMessage *dm;
525
526   if (NULL == key)
527   {
528     /* transmit 'DATA_END' */
529     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
530                 "Transmitting DATA_END message\n");
531     env = GNUNET_MQ_msg (end,
532                          GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END);
533     GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (client),
534                     env);
535     return GNUNET_OK;
536   }
537   GNUNET_assert (sizeof (struct DataMessage) + size <
538                  GNUNET_MAX_MESSAGE_SIZE);
539   env = GNUNET_MQ_msg_extra (dm,
540                              size,
541                              GNUNET_MESSAGE_TYPE_DATASTORE_DATA);
542   dm->rid = htonl (0);
543   dm->size = htonl (size);
544   dm->type = htonl (type);
545   dm->priority = htonl (priority);
546   dm->anonymity = htonl (anonymity);
547   dm->replication = htonl (replication);
548   dm->expiration = GNUNET_TIME_absolute_hton (expiration);
549   dm->uid = GNUNET_htonll (uid);
550   dm->key = *key;
551   GNUNET_memcpy (&dm[1],
552                  data,
553                  size);
554   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
555               "Transmitting DATA message for `%s' of type %u with expiration %s (in: %s)\n",
556               GNUNET_h2s (key),
557               type,
558               GNUNET_STRINGS_absolute_time_to_string (expiration),
559               GNUNET_STRINGS_relative_time_to_string (GNUNET_TIME_absolute_get_remaining (expiration),
560                                                       GNUNET_YES));
561   GNUNET_STATISTICS_update (stats,
562                             gettext_noop ("# results found"),
563                             1,
564                             GNUNET_NO);
565   GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (client),
566                   env);
567   return GNUNET_OK;
568 }
569
570
571 /**
572  * Handle RESERVE-message.
573  *
574  * @param cls identification of the client
575  * @param message the actual message
576  */
577 static void
578 handle_reserve (void *cls,
579                 const struct ReserveMessage *msg)
580 {
581   /**
582    * Static counter to produce reservation identifiers.
583    */
584   static int reservation_gen;
585   struct GNUNET_SERVICE_Client *client = cls;
586   struct ReservationList *e;
587   unsigned long long used;
588   unsigned long long req;
589   uint64_t amount;
590   uint32_t entries;
591
592   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
593               "Processing RESERVE request\n");
594   amount = GNUNET_ntohll (msg->amount);
595   entries = ntohl (msg->entries);
596   used = payload + reserved;
597   req = amount + ((unsigned long long) GNUNET_DATASTORE_ENTRY_OVERHEAD) * entries;
598   if (used + req > quota)
599   {
600     if (quota < used)
601       used = quota;             /* cheat a bit for error message (to avoid negative numbers) */
602     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
603                 _("Insufficient space (%llu bytes are available) to satisfy RESERVE request for %llu bytes\n"),
604                 quota - used,
605                 req);
606     if (cache_size < req)
607     {
608       /* TODO: document this in the FAQ; essentially, if this
609        * message happens, the insertion request could be blocked
610        * by less-important content from migration because it is
611        * larger than 1/8th of the overall available space, and
612        * we only reserve 1/8th for "fresh" insertions */
613       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
614                   _("The requested amount (%llu bytes) is larger than the cache size (%llu bytes)\n"),
615                   req,
616                   cache_size);
617       transmit_status (client,
618                        0,
619                        gettext_noop
620                        ("Insufficient space to satisfy request and "
621                         "requested amount is larger than cache size"));
622     }
623     else
624     {
625       transmit_status (client,
626                        0,
627                        gettext_noop ("Insufficient space to satisfy request"));
628     }
629     GNUNET_SERVICE_client_continue (client);
630     return;
631   }
632   reserved += req;
633   GNUNET_STATISTICS_set (stats,
634                          gettext_noop ("# reserved"),
635                          reserved,
636                          GNUNET_NO);
637   e = GNUNET_new (struct ReservationList);
638   e->next = reservations;
639   reservations = e;
640   e->client = client;
641   e->amount = amount;
642   e->entries = entries;
643   e->rid = ++reservation_gen;
644   if (reservation_gen < 0)
645     reservation_gen = 0;        /* wrap around */
646   transmit_status (client,
647                    e->rid,
648                    NULL);
649   GNUNET_SERVICE_client_continue (client);
650 }
651
652
653 /**
654  * Handle RELEASE_RESERVE-message.
655  *
656  * @param cls identification of the client
657  * @param message the actual message
658  */
659 static void
660 handle_release_reserve (void *cls,
661                         const struct ReleaseReserveMessage *msg)
662 {
663   struct GNUNET_SERVICE_Client *client = cls;
664   struct ReservationList *pos;
665   struct ReservationList *prev;
666   struct ReservationList *next;
667   int rid = ntohl (msg->rid);
668   unsigned long long rem;
669
670   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
671               "Processing RELEASE_RESERVE request\n");
672   next = reservations;
673   prev = NULL;
674   while (NULL != (pos = next))
675   {
676     next = pos->next;
677     if (rid == pos->rid)
678     {
679       if (prev == NULL)
680         reservations = next;
681       else
682         prev->next = next;
683       rem =
684           pos->amount +
685           ((unsigned long long) GNUNET_DATASTORE_ENTRY_OVERHEAD) * pos->entries;
686       GNUNET_assert (reserved >= rem);
687       reserved -= rem;
688       GNUNET_STATISTICS_set (stats,
689                              gettext_noop ("# reserved"),
690                              reserved,
691                              GNUNET_NO);
692       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
693                   "Returning %llu remaining reserved bytes to storage pool\n",
694                   rem);
695       GNUNET_free (pos);
696       transmit_status (client,
697                        GNUNET_OK,
698                        NULL);
699       GNUNET_SERVICE_client_continue (client);
700       return;
701     }
702     prev = pos;
703   }
704   GNUNET_break (0);
705   transmit_status (client,
706                    GNUNET_SYSERR,
707                    gettext_noop ("Could not find matching reservation"));
708   GNUNET_SERVICE_client_continue (client);
709 }
710
711
712 /**
713  * Check that the given message is a valid data message.
714  *
715  * @param dm message to check
716  * @return #GNUNET_SYSERR is not well-formed, otherwise #GNUNET_OK
717  */
718 static int
719 check_data (const struct DataMessage *dm)
720 {
721   uint16_t size;
722   uint32_t dsize;
723
724   size = ntohs (dm->header.size);
725   dsize = ntohl (dm->size);
726   if (size != dsize + sizeof (struct DataMessage))
727   {
728     GNUNET_break (0);
729     return GNUNET_SYSERR;
730   }
731   return GNUNET_OK;
732 }
733
734
735 /**
736  * Put continuation.
737  *
738  * @param cls closure
739  * @param key key for the item stored
740  * @param size size of the item stored
741  * @param status #GNUNET_OK if inserted, #GNUNET_NO if updated,
742  *        or #GNUNET_SYSERROR if error
743  * @param msg error message on error
744  */
745 static void
746 put_continuation (void *cls,
747                   const struct GNUNET_HashCode *key,
748                   uint32_t size,
749                   int status,
750                   const char *msg)
751 {
752   struct GNUNET_SERVICE_Client *client = cls;
753
754   if (GNUNET_OK == status)
755   {
756     GNUNET_STATISTICS_update (stats,
757                               gettext_noop ("# bytes stored"),
758                               size,
759                               GNUNET_YES);
760     GNUNET_CONTAINER_bloomfilter_add (filter,
761                                       key);
762     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
763                 "Successfully stored %u bytes under key `%s'\n",
764                 size,
765                 GNUNET_h2s (key));
766   }
767   transmit_status (client,
768                    GNUNET_SYSERR == status ? GNUNET_SYSERR : GNUNET_OK,
769                    msg);
770   if (quota - reserved - cache_size < payload)
771   {
772     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
773                 _("Need %llu bytes more space (%llu allowed, using %llu)\n"),
774                 (unsigned long long) size + GNUNET_DATASTORE_ENTRY_OVERHEAD,
775                 (unsigned long long) (quota - reserved - cache_size),
776                 (unsigned long long) payload);
777     manage_space (size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
778   }
779 }
780
781
782 /**
783  * Verify PUT-message.
784  *
785  * @param cls identification of the client
786  * @param message the actual message
787  * @return #GNUNET_OK if @a dm is well-formed
788  */
789 static int
790 check_put (void *cls,
791            const struct DataMessage *dm)
792 {
793   if (GNUNET_OK != check_data (dm))
794   {
795     GNUNET_break (0);
796     return GNUNET_SYSERR;
797   }
798   return GNUNET_OK;
799 }
800
801
802 /**
803  * Handle PUT-message.
804  *
805  * @param cls identification of the client
806  * @param message the actual message
807  */
808 static void
809 handle_put (void *cls,
810             const struct DataMessage *dm)
811 {
812   struct GNUNET_SERVICE_Client *client = cls;
813   int rid;
814   struct ReservationList *pos;
815   uint32_t size;
816
817   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
818               "Processing PUT request for `%s' of type %u\n",
819               GNUNET_h2s (&dm->key),
820               (uint32_t) ntohl (dm->type));
821   rid = ntohl (dm->rid);
822   size = ntohl (dm->size);
823   if (rid > 0)
824   {
825     pos = reservations;
826     while ((NULL != pos) && (rid != pos->rid))
827       pos = pos->next;
828     GNUNET_break (pos != NULL);
829     if (NULL != pos)
830     {
831       GNUNET_break (pos->entries > 0);
832       GNUNET_break (pos->amount >= size);
833       pos->entries--;
834       pos->amount -= size;
835       reserved -= (size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
836       GNUNET_STATISTICS_set (stats,
837                              gettext_noop ("# reserved"),
838                              reserved,
839                              GNUNET_NO);
840     }
841   }
842   bool absent = GNUNET_NO == GNUNET_CONTAINER_bloomfilter_test (filter,
843                                                                 &dm->key);
844   plugin->api->put (plugin->api->cls,
845                     &dm->key,
846                     absent,
847                     ntohl (dm->size),
848                     &dm[1],
849                     ntohl (dm->type),
850                     ntohl (dm->priority),
851                     ntohl (dm->anonymity),
852                     ntohl (dm->replication),
853                     GNUNET_TIME_absolute_ntoh (dm->expiration),
854                     &put_continuation,
855                     client);
856   GNUNET_SERVICE_client_continue (client);
857 }
858
859
860 /**
861  * Handle #GNUNET_MESSAGE_TYPE_DATASTORE_GET-message.
862  *
863  * @param cls identification of the client
864  * @param msg the actual message
865  */
866 static void
867 handle_get (void *cls,
868             const struct GetMessage *msg)
869 {
870   struct GNUNET_SERVICE_Client *client = cls;
871
872   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
873               "Processing GET request of type %u\n",
874               (uint32_t) ntohl (msg->type));
875   GNUNET_STATISTICS_update (stats,
876                             gettext_noop ("# GET requests received"),
877                             1,
878                             GNUNET_NO);
879   plugin->api->get_key (plugin->api->cls,
880                         GNUNET_ntohll (msg->next_uid),
881                         msg->random,
882                         NULL,
883                         ntohl (msg->type),
884                         &transmit_item,
885                         client);
886   GNUNET_SERVICE_client_continue (client);
887 }
888
889
890 /**
891  * Handle #GNUNET_MESSAGE_TYPE_DATASTORE_GET_KEY-message.
892  *
893  * @param cls closure
894  * @param msg the actual message
895  */
896 static void
897 handle_get_key (void *cls,
898                 const struct GetKeyMessage *msg)
899 {
900   struct GNUNET_SERVICE_Client *client = cls;
901
902   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
903               "Processing GET request for `%s' of type %u\n",
904               GNUNET_h2s (&msg->key),
905               (uint32_t) ntohl (msg->type));
906   GNUNET_STATISTICS_update (stats,
907                             gettext_noop ("# GET KEY requests received"),
908                             1,
909                             GNUNET_NO);
910   if (GNUNET_YES !=
911       GNUNET_CONTAINER_bloomfilter_test (filter,
912                                          &msg->key))
913   {
914     /* don't bother database... */
915     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
916                 "Empty result set for GET request for `%s' (bloomfilter).\n",
917                 GNUNET_h2s (&msg->key));
918     GNUNET_STATISTICS_update (stats,
919                               gettext_noop
920                               ("# requests filtered by bloomfilter"),
921                               1,
922                               GNUNET_NO);
923     transmit_item (client,
924                    NULL, 0, NULL, 0, 0, 0, 0,
925                    GNUNET_TIME_UNIT_ZERO_ABS,
926                    0);
927     GNUNET_SERVICE_client_continue (client);
928     return;
929   }
930   plugin->api->get_key (plugin->api->cls,
931                         GNUNET_ntohll (msg->next_uid),
932                         msg->random,
933                         &msg->key,
934                         ntohl (msg->type),
935                         &transmit_item,
936                         client);
937   GNUNET_SERVICE_client_continue (client);
938 }
939
940
941 /**
942  * Handle GET_REPLICATION-message.
943  *
944  * @param cls identification of the client
945  * @param message the actual message
946  */
947 static void
948 handle_get_replication (void *cls,
949                         const struct GNUNET_MessageHeader *message)
950 {
951   struct GNUNET_SERVICE_Client *client = cls;
952
953   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
954               "Processing GET_REPLICATION request\n");
955   GNUNET_STATISTICS_update (stats,
956                             gettext_noop ("# GET REPLICATION requests received"),
957                             1,
958                             GNUNET_NO);
959   plugin->api->get_replication (plugin->api->cls,
960                                 &transmit_item,
961                                 client);
962   GNUNET_SERVICE_client_continue (client);
963 }
964
965
966 /**
967  * Handle GET_ZERO_ANONYMITY-message.
968  *
969  * @param cls client identification of the client
970  * @param message the actual message
971  */
972 static void
973 handle_get_zero_anonymity (void *cls,
974                            const struct GetZeroAnonymityMessage *msg)
975 {
976   struct GNUNET_SERVICE_Client *client = cls;
977   enum GNUNET_BLOCK_Type type;
978
979   type = (enum GNUNET_BLOCK_Type) ntohl (msg->type);
980   if (type == GNUNET_BLOCK_TYPE_ANY)
981   {
982     GNUNET_break (0);
983     GNUNET_SERVICE_client_drop (client);
984     return;
985   }
986   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
987               "Processing GET_ZERO_ANONYMITY request\n");
988   GNUNET_STATISTICS_update (stats,
989                             gettext_noop ("# GET ZERO ANONYMITY requests received"),
990                             1,
991                             GNUNET_NO);
992   plugin->api->get_zero_anonymity (plugin->api->cls,
993                                    GNUNET_ntohll (msg->next_uid),
994                                    type,
995                                    &transmit_item,
996                                    client);
997   GNUNET_SERVICE_client_continue (client);
998 }
999
1000
1001 /**
1002  * Remove continuation.
1003  *
1004  * @param cls closure
1005  * @param key key for the content
1006  * @param size number of bytes in data
1007  * @param status #GNUNET_OK if removed, #GNUNET_NO if not found,
1008  *        or #GNUNET_SYSERROR if error
1009  * @param msg error message on error
1010  */
1011 static void
1012 remove_continuation (void *cls,
1013                      const struct GNUNET_HashCode *key,
1014                      uint32_t size,
1015                      int status,
1016                      const char *msg)
1017 {
1018   struct GNUNET_SERVICE_Client *client = cls;
1019
1020   if (GNUNET_SYSERR == status)
1021   {
1022     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1023                 "REMOVE request failed: %s.\n",
1024                 msg);
1025     transmit_status (client,
1026                      GNUNET_NO,
1027                      msg);
1028     return;
1029   }
1030   if (GNUNET_NO == status)
1031   {
1032     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1033                 "Content not found for REMOVE request.\n");
1034     transmit_status (client,
1035                      GNUNET_NO,
1036                      _("Content not found"));
1037     return;
1038   }
1039   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1040               "Item matches REMOVE request for key `%s'.\n",
1041               GNUNET_h2s (key));
1042   GNUNET_STATISTICS_update (stats,
1043                             gettext_noop ("# bytes removed (explicit request)"),
1044                             size,
1045                             GNUNET_YES);
1046   GNUNET_CONTAINER_bloomfilter_remove (filter,
1047                                        key);
1048   transmit_status (client,
1049                    GNUNET_OK,
1050                    NULL);
1051 }
1052
1053
1054 /**
1055  * Verify REMOVE-message.
1056  *
1057  * @param cls identification of the client
1058  * @param message the actual message
1059  * @return #GNUNET_OK if @a dm is well-formed
1060  */
1061 static int
1062 check_remove (void *cls,
1063               const struct DataMessage *dm)
1064 {
1065   if (GNUNET_OK != check_data (dm))
1066   {
1067     GNUNET_break (0);
1068     return GNUNET_SYSERR;
1069   }
1070   return GNUNET_OK;
1071 }
1072
1073
1074 /**
1075  * Handle REMOVE-message.
1076  *
1077  * @param cls closure
1078  * @param client identification of the client
1079  * @param message the actual message
1080  */
1081 static void
1082 handle_remove (void *cls,
1083                const struct DataMessage *dm)
1084 {
1085   struct GNUNET_SERVICE_Client *client = cls;
1086
1087   GNUNET_STATISTICS_update (stats,
1088                             gettext_noop ("# REMOVE requests received"),
1089                             1, GNUNET_NO);
1090   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1091               "Processing REMOVE request for `%s'\n",
1092               GNUNET_h2s (&dm->key));
1093   plugin->api->remove_key (plugin->api->cls,
1094                            &dm->key,
1095                            ntohl (dm->size),
1096                            &dm[1],
1097                            &remove_continuation,
1098                            client);
1099   GNUNET_SERVICE_client_continue (client);
1100 }
1101
1102
1103 /**
1104  * Handle DROP-message.
1105  *
1106  * @param cls identification of the client
1107  * @param message the actual message
1108  */
1109 static void
1110 handle_drop (void *cls,
1111              const struct GNUNET_MessageHeader *message)
1112 {
1113   struct GNUNET_SERVICE_Client *client = cls;
1114
1115   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1116               "Processing DROP request\n");
1117   do_drop = GNUNET_YES;
1118   GNUNET_SERVICE_client_continue (client);
1119 }
1120
1121
1122 /**
1123  * Function called by plugins to notify us about a
1124  * change in their disk utilization.
1125  *
1126  * @param cls closure (NULL)
1127  * @param delta change in disk utilization,
1128  *        0 for "reset to empty"
1129  */
1130 static void
1131 disk_utilization_change_cb (void *cls,
1132                             int delta)
1133 {
1134   if ((delta < 0) && (payload < -delta))
1135   {
1136     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1137                 _("Datastore payload must have been inaccurate (%lld < %lld). Recomputing it.\n"),
1138                 (long long) payload,
1139                 (long long) -delta);
1140     plugin->api->estimate_size (plugin->api->cls,
1141                                 &payload);
1142     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1143                 _("New payload: %lld\n"),
1144                 (long long) payload);
1145      sync_stats ();
1146     return;
1147   }
1148   payload += delta;
1149   last_sync++;
1150   if (last_sync >= MAX_STAT_SYNC_LAG)
1151     sync_stats ();
1152 }
1153
1154
1155 /**
1156  * Callback function to process statistic values.
1157  *
1158  * @param cls closure (struct Plugin*)
1159  * @param subsystem name of subsystem that created the statistic
1160  * @param name the name of the datum
1161  * @param value the current value
1162  * @param is_persistent #GNUNET_YES if the value is persistent, #GNUNET_NO if not
1163  * @return #GNUNET_OK to continue, #GNUNET_SYSERR to abort iteration
1164  */
1165 static int
1166 process_stat_in (void *cls,
1167                  const char *subsystem,
1168                  const char *name,
1169                  uint64_t value,
1170                  int is_persistent)
1171 {
1172   GNUNET_assert (GNUNET_NO == stats_worked);
1173   stats_worked = GNUNET_YES;
1174   payload += value;
1175   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1176               "Notification from statistics about existing payload (%llu), new payload is %llu\n",
1177               (unsigned long long) value,
1178               (unsigned long long) payload);
1179   return GNUNET_OK;
1180 }
1181
1182
1183 /**
1184  * Load the datastore plugin.
1185  */
1186 static struct DatastorePlugin *
1187 load_plugin ()
1188 {
1189   struct DatastorePlugin *ret;
1190   char *libname;
1191
1192   ret = GNUNET_new (struct DatastorePlugin);
1193   ret->env.cfg = cfg;
1194   ret->env.duc = &disk_utilization_change_cb;
1195   ret->env.cls = NULL;
1196   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1197               _("Loading `%s' datastore plugin\n"),
1198               plugin_name);
1199   GNUNET_asprintf (&libname,
1200                    "libgnunet_plugin_datastore_%s",
1201                    plugin_name);
1202   ret->short_name = GNUNET_strdup (plugin_name);
1203   ret->lib_name = libname;
1204   ret->api = GNUNET_PLUGIN_load (libname,
1205                                  &ret->env);
1206   if (NULL == ret->api)
1207   {
1208     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1209                 _("Failed to load datastore plugin for `%s'\n"),
1210                 plugin_name);
1211     GNUNET_free (ret->short_name);
1212     GNUNET_free (libname);
1213     GNUNET_free (ret);
1214     return NULL;
1215   }
1216   return ret;
1217 }
1218
1219
1220 /**
1221  * Function called when the service shuts
1222  * down.  Unloads our datastore plugin.
1223  *
1224  * @param plug plugin to unload
1225  */
1226 static void
1227 unload_plugin (struct DatastorePlugin *plug)
1228 {
1229   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1230               "Datastore service is unloading plugin...\n");
1231   GNUNET_break (NULL == GNUNET_PLUGIN_unload (plug->lib_name, plug->api));
1232   GNUNET_free (plug->lib_name);
1233   GNUNET_free (plug->short_name);
1234   GNUNET_free (plug);
1235 }
1236
1237
1238 /**
1239  * Initialization complete, start operating the service.
1240  */
1241 static void
1242 begin_service ()
1243 {
1244   GNUNET_SERVICE_resume (service);
1245   expired_kill_task
1246     = GNUNET_SCHEDULER_add_with_priority (GNUNET_SCHEDULER_PRIORITY_IDLE,
1247                                           &delete_expired,
1248                                           NULL);
1249 }
1250
1251
1252 /**
1253  * Adds a given @a key to the bloomfilter in @a cls @a count times.
1254  *
1255  * @param cls the bloomfilter
1256  * @param key key to add
1257  * @param count number of times to add key
1258  */
1259 static void
1260 add_key_to_bloomfilter (void *cls,
1261                         const struct GNUNET_HashCode *key,
1262                         unsigned int count)
1263 {
1264   struct GNUNET_CONTAINER_BloomFilter *bf = cls;
1265
1266   if (NULL == key)
1267   {
1268     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1269                 _("Bloomfilter construction complete.\n"));
1270     begin_service ();
1271     return;
1272   }
1273
1274   while (0 < count--)
1275     GNUNET_CONTAINER_bloomfilter_add (bf,
1276                                       key);
1277 }
1278
1279
1280 /**
1281  * We finished receiving the statistic.  Initialize the plugin; if
1282  * loading the statistic failed, run the estimator.
1283  *
1284  * @param cls NULL
1285  * @param success #GNUNET_NO if we failed to read the stat
1286  */
1287 static void
1288 process_stat_done (void *cls,
1289                    int success)
1290 {
1291   stat_get = NULL;
1292   if (NULL != stat_timeout_task)
1293   {
1294     GNUNET_SCHEDULER_cancel (stat_timeout_task);
1295     stat_timeout_task = NULL;
1296   }
1297   plugin = load_plugin ();
1298   if (NULL == plugin)
1299   {
1300     GNUNET_CONTAINER_bloomfilter_free (filter);
1301     filter = NULL;
1302     if (NULL != stats)
1303     {
1304       GNUNET_STATISTICS_destroy (stats,
1305                                  GNUNET_YES);
1306       stats = NULL;
1307     }
1308     return;
1309   }
1310
1311   if (GNUNET_NO == stats_worked)
1312   {
1313     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1314                 "Failed to obtain value from statistics service, recomputing it\n");
1315     plugin->api->estimate_size (plugin->api->cls,
1316                                 &payload);
1317     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1318                 _("New payload: %lld\n"),
1319                 (long long) payload);
1320   }
1321
1322   if (GNUNET_YES == refresh_bf)
1323   {
1324     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1325                 _("Rebuilding bloomfilter.  Please be patient.\n"));
1326     if (NULL != plugin->api->get_keys)
1327     {
1328       plugin->api->get_keys (plugin->api->cls,
1329                              &add_key_to_bloomfilter,
1330                              filter);
1331       return;
1332     }
1333     else
1334     {
1335       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1336                   _("Plugin does not support get_keys function. Please fix!\n"));
1337     }
1338   }
1339   begin_service ();
1340 }
1341
1342
1343 /**
1344  * Fetching stats took to long, run without.
1345  *
1346  * @param cls NULL
1347  */
1348 static void
1349 stat_timeout (void *cls)
1350 {
1351   stat_timeout_task = NULL;
1352   GNUNET_STATISTICS_get_cancel (stat_get);
1353   process_stat_done (NULL,
1354                      GNUNET_NO);
1355 }
1356
1357
1358 /**
1359  * Task run during shutdown.
1360  */
1361 static void
1362 cleaning_task (void *cls)
1363 {
1364   cleaning_done = GNUNET_YES;
1365   if (NULL != expired_kill_task)
1366   {
1367     GNUNET_SCHEDULER_cancel (expired_kill_task);
1368     expired_kill_task = NULL;
1369   }
1370   if (GNUNET_YES == do_drop)
1371   {
1372     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1373                 "Dropping database!\n");
1374     plugin->api->drop (plugin->api->cls);
1375     payload = 0;
1376     last_sync++;
1377   }
1378   if (NULL != plugin)
1379   {
1380     unload_plugin (plugin);
1381     plugin = NULL;
1382   }
1383   if (NULL != filter)
1384   {
1385     GNUNET_CONTAINER_bloomfilter_free (filter);
1386     filter = NULL;
1387   }
1388   if (NULL != stat_get)
1389   {
1390     GNUNET_STATISTICS_get_cancel (stat_get);
1391     stat_get = NULL;
1392   }
1393   if (NULL != stat_timeout_task)
1394   {
1395     GNUNET_SCHEDULER_cancel (stat_timeout_task);
1396     stat_timeout_task = NULL;
1397   }
1398   GNUNET_free_non_null (plugin_name);
1399   plugin_name = NULL;
1400   if (last_sync > 0)
1401     sync_stats ();
1402   if (NULL != stats)
1403   {
1404     GNUNET_STATISTICS_destroy (stats,
1405                                GNUNET_YES);
1406     stats = NULL;
1407   }
1408   GNUNET_free (quota_stat_name);
1409   quota_stat_name = NULL;
1410 }
1411
1412
1413 /**
1414  * Add a client to our list of active clients.
1415  *
1416  * @param cls NULL
1417  * @param client client to add
1418  * @param mq message queue for @a client
1419  * @return @a client
1420  */
1421 static void *
1422 client_connect_cb (void *cls,
1423                    struct GNUNET_SERVICE_Client *client,
1424                    struct GNUNET_MQ_Handle *mq)
1425 {
1426   return client;
1427 }
1428
1429
1430 /**
1431  * Called whenever a client is disconnected.
1432  * Frees our resources associated with that client.
1433  *
1434  * @param cls closure
1435  * @param client identification of the client
1436  * @param app_ctx must match @a client
1437  */
1438 static void
1439 client_disconnect_cb (void *cls,
1440                       struct GNUNET_SERVICE_Client *client,
1441                       void *app_ctx)
1442 {
1443   struct ReservationList *pos;
1444   struct ReservationList *prev;
1445   struct ReservationList *next;
1446
1447   GNUNET_assert (app_ctx == client);
1448   prev = NULL;
1449   pos = reservations;
1450   while (NULL != pos)
1451   {
1452     next = pos->next;
1453     if (pos->client == client)
1454     {
1455       if (NULL == prev)
1456         reservations = next;
1457       else
1458         prev->next = next;
1459       reserved -= pos->amount + pos->entries * GNUNET_DATASTORE_ENTRY_OVERHEAD;
1460       GNUNET_free (pos);
1461     }
1462     else
1463     {
1464       prev = pos;
1465     }
1466     pos = next;
1467   }
1468   GNUNET_STATISTICS_set (stats,
1469                          gettext_noop ("# reserved"),
1470                          reserved,
1471                          GNUNET_NO);
1472
1473 }
1474
1475
1476 /**
1477  * Process datastore requests.
1478  *
1479  * @param cls closure
1480  * @param serv the initialized service
1481  * @param c configuration to use
1482  */
1483 static void
1484 run (void *cls,
1485      const struct GNUNET_CONFIGURATION_Handle *c,
1486      struct GNUNET_SERVICE_Handle *serv)
1487 {
1488   char *fn;
1489   char *pfn;
1490   unsigned int bf_size;
1491
1492   service = serv;
1493   cfg = c;
1494   if (GNUNET_OK !=
1495       GNUNET_CONFIGURATION_get_value_string (cfg,
1496                                              "DATASTORE",
1497                                              "DATABASE",
1498                                              &plugin_name))
1499   {
1500     GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
1501                                "DATABASE",
1502                                "DATASTORE");
1503     return;
1504   }
1505   GNUNET_asprintf (&quota_stat_name,
1506                    _("# bytes used in file-sharing datastore `%s'"),
1507                    plugin_name);
1508   if (GNUNET_OK !=
1509       GNUNET_CONFIGURATION_get_value_size (cfg,
1510                                            "DATASTORE",
1511                                            "QUOTA",
1512                                            &quota))
1513   {
1514     GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
1515                                "QUOTA",
1516                                "DATASTORE");
1517     return;
1518   }
1519   stats = GNUNET_STATISTICS_create ("datastore",
1520                                     cfg);
1521   GNUNET_STATISTICS_set (stats,
1522                          gettext_noop ("# quota"),
1523                          quota,
1524                          GNUNET_NO);
1525   cache_size = quota / 8;       /* Or should we make this an option? */
1526   GNUNET_STATISTICS_set (stats,
1527                          gettext_noop ("# cache size"),
1528                          cache_size,
1529                          GNUNET_NO);
1530   if (quota / (32 * 1024LL) > MAX_BF_SIZE)
1531     bf_size = MAX_BF_SIZE;
1532   else
1533     bf_size = quota / (32 * 1024LL);         /* 8 bit per entry, 1 bit per 32 kb in DB */
1534   fn = NULL;
1535   if ((GNUNET_OK !=
1536        GNUNET_CONFIGURATION_get_value_filename (cfg,
1537                                                 "DATASTORE",
1538                                                 "BLOOMFILTER",
1539                                                 &fn)) ||
1540       (GNUNET_OK != GNUNET_DISK_directory_create_for_file (fn)))
1541   {
1542     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1543                 _("Could not use specified filename `%s' for bloomfilter.\n"),
1544                 NULL != fn ? fn : "");
1545     GNUNET_free_non_null (fn);
1546     fn = NULL;
1547   }
1548   if (NULL != fn)
1549   {
1550     GNUNET_asprintf (&pfn, "%s.%s", fn, plugin_name);
1551     if (GNUNET_YES == GNUNET_DISK_file_test (pfn))
1552     {
1553       filter = GNUNET_CONTAINER_bloomfilter_load (pfn, bf_size, 5);        /* approx. 3% false positives at max use */
1554       if (NULL == filter)
1555       {
1556         /* file exists but not valid, remove and try again, but refresh */
1557         if (0 != UNLINK (pfn))
1558         {
1559           /* failed to remove, run without file */
1560           GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1561                       _("Failed to remove bogus bloomfilter file `%s'\n"),
1562                       pfn);
1563           GNUNET_free (pfn);
1564           pfn = NULL;
1565           filter = GNUNET_CONTAINER_bloomfilter_load (NULL, bf_size, 5);        /* approx. 3% false positives at max use */
1566           refresh_bf = GNUNET_YES;
1567         }
1568         else
1569         {
1570           /* try again after remove */
1571           filter = GNUNET_CONTAINER_bloomfilter_load (pfn, bf_size, 5);        /* approx. 3% false positives at max use */
1572           refresh_bf = GNUNET_YES;
1573           if (NULL == filter)
1574           {
1575             /* failed yet again, give up on using file */
1576             GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1577                         _("Failed to remove bogus bloomfilter file `%s'\n"),
1578                         pfn);
1579             GNUNET_free (pfn);
1580             pfn = NULL;
1581             filter = GNUNET_CONTAINER_bloomfilter_init (NULL, bf_size, 5);        /* approx. 3% false positives at max use */
1582           }
1583         }
1584       }
1585       else
1586       {
1587         /* normal case: have an existing valid bf file, no need to refresh */
1588         refresh_bf = GNUNET_NO;
1589       }
1590     }
1591     else
1592     {
1593       filter = GNUNET_CONTAINER_bloomfilter_load (pfn, bf_size, 5);        /* approx. 3% false positives at max use */
1594       refresh_bf = GNUNET_YES;
1595     }
1596     GNUNET_free (pfn);
1597   }
1598   else
1599   {
1600     filter = GNUNET_CONTAINER_bloomfilter_init (NULL,
1601                                                 bf_size,
1602                                                 5);      /* approx. 3% false positives at max use */
1603     refresh_bf = GNUNET_YES;
1604   }
1605   GNUNET_free_non_null (fn);
1606   if (NULL == filter)
1607   {
1608     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1609                 _("Failed to initialize bloomfilter.\n"));
1610     if (NULL != stats)
1611     {
1612       GNUNET_STATISTICS_destroy (stats,
1613                                  GNUNET_YES);
1614       stats = NULL;
1615     }
1616     return;
1617   }
1618   GNUNET_SERVICE_suspend (service);
1619   stat_get =
1620       GNUNET_STATISTICS_get (stats,
1621                              "datastore",
1622                              quota_stat_name,
1623                              &process_stat_done,
1624                              &process_stat_in,
1625                              NULL);
1626   if (NULL == stat_get)
1627     process_stat_done (NULL,
1628                        GNUNET_SYSERR);
1629   else
1630     stat_timeout_task
1631       = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS,
1632                                       &stat_timeout,
1633                                       NULL);
1634   GNUNET_SCHEDULER_add_shutdown (&cleaning_task,
1635                                  NULL);
1636 }
1637
1638
1639 /**
1640  * Define "main" method using service macro.
1641  */
1642 GNUNET_SERVICE_MAIN
1643 ("datastore",
1644  GNUNET_SERVICE_OPTION_NONE,
1645  &run,
1646  &client_connect_cb,
1647  &client_disconnect_cb,
1648  NULL,
1649  GNUNET_MQ_hd_fixed_size (reserve,
1650                           GNUNET_MESSAGE_TYPE_DATASTORE_RESERVE,
1651                           struct ReserveMessage,
1652                           NULL),
1653  GNUNET_MQ_hd_fixed_size (release_reserve,
1654                           GNUNET_MESSAGE_TYPE_DATASTORE_RELEASE_RESERVE,
1655                           struct ReleaseReserveMessage,
1656                           NULL),
1657  GNUNET_MQ_hd_var_size (put,
1658                         GNUNET_MESSAGE_TYPE_DATASTORE_PUT,
1659                         struct DataMessage,
1660                         NULL),
1661  GNUNET_MQ_hd_fixed_size (get,
1662                           GNUNET_MESSAGE_TYPE_DATASTORE_GET,
1663                           struct GetMessage,
1664                           NULL),
1665  GNUNET_MQ_hd_fixed_size (get_key,
1666                           GNUNET_MESSAGE_TYPE_DATASTORE_GET_KEY,
1667                           struct GetKeyMessage,
1668                           NULL),
1669  GNUNET_MQ_hd_fixed_size (get_replication,
1670                           GNUNET_MESSAGE_TYPE_DATASTORE_GET_REPLICATION,
1671                           struct GNUNET_MessageHeader,
1672                           NULL),
1673  GNUNET_MQ_hd_fixed_size (get_zero_anonymity,
1674                           GNUNET_MESSAGE_TYPE_DATASTORE_GET_ZERO_ANONYMITY,
1675                           struct GetZeroAnonymityMessage,
1676                           NULL),
1677  GNUNET_MQ_hd_var_size (remove,
1678                         GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE,
1679                         struct DataMessage,
1680                         NULL),
1681  GNUNET_MQ_hd_fixed_size (drop,
1682                           GNUNET_MESSAGE_TYPE_DATASTORE_DROP,
1683                           struct GNUNET_MessageHeader,
1684                           NULL),
1685  GNUNET_MQ_handler_end ());
1686
1687
1688 /* end of gnunet-service-datastore.c */