uncrustify as demanded.
[oweals/gnunet.git] / src / datastore / gnunet-service-datastore.c
1 /*
2      This file is part of GNUnet
3      Copyright (C) 2004-2014, 2016 GNUnet e.V.
4
5      GNUnet is free software: you can redistribute it and/or modify it
6      under the terms of the GNU Affero General Public License as published
7      by the Free Software Foundation, either version 3 of the License,
8      or (at your option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      Affero General Public License for more details.
14
15      You should have received a copy of the GNU Affero General Public License
16      along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18      SPDX-License-Identifier: AGPL3.0-or-later
19  */
20
21 /**
22  * @file datastore/gnunet-service-datastore.c
23  * @brief Management for the datastore for files stored on a GNUnet node
24  * @author Christian Grothoff
25  */
26
27 #include "platform.h"
28 #include "gnunet_util_lib.h"
29 #include "gnunet_protocols.h"
30 #include "gnunet_statistics_service.h"
31 #include "gnunet_datastore_plugin.h"
32 #include "datastore.h"
33
34 /**
35  * How many messages do we queue at most per client?
36  */
37 #define MAX_PENDING 1024
38
39 /**
40  * Limit size of bloom filter to 2 GB.
41  */
42 #define MAX_BF_SIZE ((uint32_t)(1LL << 31))
43
44 /**
45  * How long are we at most keeping "expired" content
46  * past the expiration date in the database?
47  */
48 #define MAX_EXPIRE_DELAY \
49   GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MINUTES, 15)
50
51 /**
52  * How fast are we allowed to query the database for deleting
53  * expired content? (1 item per second).
54  */
55 #define MIN_EXPIRE_DELAY \
56   GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 1)
57
58 /**
59  * Name under which we store current space consumption.
60  */
61 static char *quota_stat_name;
62
63 /**
64  * Task to timeout stat GET.
65  */
66 static struct GNUNET_SCHEDULER_Task *stat_timeout_task;
67
68 /**
69  * After how many payload-changing operations
70  * do we sync our statistics?
71  */
72 #define MAX_STAT_SYNC_LAG 50
73
74
75 /**
76  * Our datastore plugin.
77  */
78 struct DatastorePlugin {
79   /**
80    * API of the transport as returned by the plugin's
81    * initialization function.
82    */
83   struct GNUNET_DATASTORE_PluginFunctions *api;
84
85   /**
86    * Short name for the plugin (i.e. "sqlite").
87    */
88   char *short_name;
89
90   /**
91    * Name of the library (i.e. "gnunet_plugin_datastore_sqlite").
92    */
93   char *lib_name;
94
95   /**
96    * Environment this transport service is using
97    * for this plugin.
98    */
99   struct GNUNET_DATASTORE_PluginEnvironment env;
100 };
101
102
103 /**
104  * Linked list of active reservations.
105  */
106 struct ReservationList {
107   /**
108    * This is a linked list.
109    */
110   struct ReservationList *next;
111
112   /**
113    * Client that made the reservation.
114    */
115   struct GNUNET_SERVICE_Client *client;
116
117   /**
118    * Number of bytes (still) reserved.
119    */
120   uint64_t amount;
121
122   /**
123    * Number of items (still) reserved.
124    */
125   uint64_t entries;
126
127   /**
128    * Reservation identifier.
129    */
130   int32_t rid;
131 };
132
133
134 /**
135  * Our datastore plugin (NULL if not available).
136  */
137 static struct DatastorePlugin *plugin;
138
139 /**
140  * Linked list of space reservations made by clients.
141  */
142 static struct ReservationList *reservations;
143
144 /**
145  * Bloomfilter to quickly tell if we don't have the content.
146  */
147 static struct GNUNET_CONTAINER_BloomFilter *filter;
148
149 /**
150  * Name of our plugin.
151  */
152 static char *plugin_name;
153
154 /**
155  * Our configuration.
156  */
157 static const struct GNUNET_CONFIGURATION_Handle *cfg;
158
159 /**
160  * Handle for reporting statistics.
161  */
162 static struct GNUNET_STATISTICS_Handle *stats;
163
164 /**
165  * How much space are we using for the cache?  (space available for
166  * insertions that will be instantly reclaimed by discarding less
167  * important content --- or possibly whatever we just inserted into
168  * the "cache").
169  */
170 static unsigned long long cache_size;
171
172 /**
173  * How much space have we currently reserved?
174  */
175 static unsigned long long reserved;
176
177 /**
178  * How much data are we currently storing
179  * in the database?
180  */
181 static unsigned long long payload;
182
183 /**
184  * Identity of the task that is used to delete
185  * expired content.
186  */
187 static struct GNUNET_SCHEDULER_Task *expired_kill_task;
188
189 /**
190  * Minimum time that content should have to not be discarded instantly
191  * (time stamp of any content that we've been discarding recently to
192  * stay below the quota).  FOREVER if we had to expire content with
193  * non-zero priority.
194  */
195 static struct GNUNET_TIME_Absolute min_expiration;
196
197 /**
198  * How much space are we allowed to use?
199  */
200 static unsigned long long quota;
201
202 /**
203  * Should the database be dropped on exit?
204  */
205 static int do_drop;
206
207 /**
208  * Should we refresh the BF when the DB is loaded?
209  */
210 static int refresh_bf;
211
212 /**
213  * Number of updates that were made to the
214  * payload value since we last synchronized
215  * it with the statistics service.
216  */
217 static unsigned int last_sync;
218
219 /**
220  * Did we get an answer from statistics?
221  */
222 static int stats_worked;
223
224
225 /**
226  * Synchronize our utilization statistics with the
227  * statistics service.
228  */
229 static void
230 sync_stats()
231 {
232   GNUNET_STATISTICS_set(stats, quota_stat_name, payload, GNUNET_YES);
233   GNUNET_STATISTICS_set(stats,
234                         "# utilization by current datastore",
235                         payload,
236                         GNUNET_NO);
237   last_sync = 0;
238 }
239
240
241 /**
242  * Have we already cleaned up the TCCs and are hence no longer
243  * willing (or able) to transmit anything to anyone?
244  */
245 static int cleaning_done;
246
247 /**
248  * Handle for pending get request.
249  */
250 static struct GNUNET_STATISTICS_GetHandle *stat_get;
251
252 /**
253  * Handle to our server.
254  */
255 static struct GNUNET_SERVICE_Handle *service;
256
257 /**
258  * Task that is used to remove expired entries from
259  * the datastore.  This task will schedule itself
260  * again automatically to always delete all expired
261  * content quickly.
262  *
263  * @param cls not used
264  */
265 static void
266 delete_expired(void *cls);
267
268
269 /**
270  * Iterate over the expired items stored in the datastore.
271  * Delete all expired items; once we have processed all
272  * expired items, re-schedule the "delete_expired" task.
273  *
274  * @param cls not used
275  * @param key key for the content
276  * @param size number of bytes in data
277  * @param data content stored
278  * @param type type of the content
279  * @param priority priority of the content
280  * @param anonymity anonymity-level for the content
281  * @param replication replication-level for the content
282  * @param expiration expiration time for the content
283  * @param uid unique identifier for the datum;
284  *        maybe 0 if no unique identifier is available
285  *
286  * @return #GNUNET_SYSERR to abort the iteration, #GNUNET_OK to continue
287  *         (continue on call to "next", of course),
288  *         #GNUNET_NO to delete the item and continue (if supported)
289  */
290 static int
291 expired_processor(void *cls,
292                   const struct GNUNET_HashCode *key,
293                   uint32_t size,
294                   const void *data,
295                   enum GNUNET_BLOCK_Type type,
296                   uint32_t priority,
297                   uint32_t anonymity,
298                   uint32_t replication,
299                   struct GNUNET_TIME_Absolute expiration,
300                   uint64_t uid)
301 {
302   struct GNUNET_TIME_Absolute now;
303
304   if (NULL == key)
305     {
306       expired_kill_task =
307         GNUNET_SCHEDULER_add_delayed_with_priority(MAX_EXPIRE_DELAY,
308                                                    GNUNET_SCHEDULER_PRIORITY_IDLE,
309                                                    &delete_expired,
310                                                    NULL);
311       return GNUNET_SYSERR;
312     }
313   now = GNUNET_TIME_absolute_get();
314   if (expiration.abs_value_us > now.abs_value_us)
315     {
316       /* finished processing */
317       expired_kill_task =
318         GNUNET_SCHEDULER_add_delayed_with_priority(MAX_EXPIRE_DELAY,
319                                                    GNUNET_SCHEDULER_PRIORITY_IDLE,
320                                                    &delete_expired,
321                                                    NULL);
322       return GNUNET_SYSERR;
323     }
324   GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
325              "Deleting content `%s' of type %u that expired %s ago\n",
326              GNUNET_h2s(key),
327              type,
328              GNUNET_STRINGS_relative_time_to_string(
329                GNUNET_TIME_absolute_get_difference(expiration, now),
330                GNUNET_YES));
331   min_expiration = now;
332   GNUNET_STATISTICS_update(stats,
333                            gettext_noop("# bytes expired"),
334                            size,
335                            GNUNET_YES);
336   GNUNET_CONTAINER_bloomfilter_remove(filter, key);
337   expired_kill_task =
338     GNUNET_SCHEDULER_add_delayed_with_priority(MIN_EXPIRE_DELAY,
339                                                GNUNET_SCHEDULER_PRIORITY_IDLE,
340                                                &delete_expired,
341                                                NULL);
342   return GNUNET_NO;
343 }
344
345
346 /**
347  * Task that is used to remove expired entries from
348  * the datastore.  This task will schedule itself
349  * again automatically to always delete all expired
350  * content quickly.
351  *
352  * @param cls not used
353  */
354 static void
355 delete_expired(void *cls)
356 {
357   expired_kill_task = NULL;
358   plugin->api->get_expiration(plugin->api->cls, &expired_processor, NULL);
359 }
360
361
362 /**
363  * An iterator over a set of items stored in the datastore
364  * that deletes until we're happy with respect to our quota.
365  *
366  * @param cls closure
367  * @param key key for the content
368  * @param size number of bytes in data
369  * @param data content stored
370  * @param type type of the content
371  * @param priority priority of the content
372  * @param anonymity anonymity-level for the content
373  * @param replication replication-level for the content
374  * @param expiration expiration time for the content
375  * @param uid unique identifier for the datum;
376  *        maybe 0 if no unique identifier is available
377  * @return #GNUNET_SYSERR to abort the iteration, #GNUNET_OK to continue
378  *         (continue on call to "next", of course),
379  *         #GNUNET_NO to delete the item and continue (if supported)
380  */
381 static int
382 quota_processor(void *cls,
383                 const struct GNUNET_HashCode *key,
384                 uint32_t size,
385                 const void *data,
386                 enum GNUNET_BLOCK_Type type,
387                 uint32_t priority,
388                 uint32_t anonymity,
389                 uint32_t replication,
390                 struct GNUNET_TIME_Absolute expiration,
391                 uint64_t uid)
392 {
393   unsigned long long *need = cls;
394
395   if (NULL == key)
396     return GNUNET_SYSERR;
397   GNUNET_log(
398     GNUNET_ERROR_TYPE_DEBUG,
399     "Deleting %llu bytes of low-priority (%u) content `%s' of type %u at %s prior to expiration (still trying to free another %llu bytes)\n",
400     (unsigned long long)(size + GNUNET_DATASTORE_ENTRY_OVERHEAD),
401     (unsigned int)priority,
402     GNUNET_h2s(key),
403     type,
404     GNUNET_STRINGS_relative_time_to_string(GNUNET_TIME_absolute_get_remaining(
405                                              expiration),
406                                            GNUNET_YES),
407     *need);
408   if (size + GNUNET_DATASTORE_ENTRY_OVERHEAD > *need)
409     *need = 0;
410   else
411     *need -= size + GNUNET_DATASTORE_ENTRY_OVERHEAD;
412   if (priority > 0)
413     min_expiration = GNUNET_TIME_UNIT_FOREVER_ABS;
414   else
415     min_expiration = expiration;
416   GNUNET_STATISTICS_update(stats,
417                            gettext_noop("# bytes purged (low-priority)"),
418                            size,
419                            GNUNET_YES);
420   GNUNET_CONTAINER_bloomfilter_remove(filter, key);
421   return GNUNET_NO;
422 }
423
424
425 /**
426  * Manage available disk space by running tasks
427  * that will discard content if necessary.  This
428  * function will be run whenever a request for
429  * "need" bytes of storage could only be satisfied
430  * by eating into the "cache" (and we want our cache
431  * space back).
432  *
433  * @param need number of bytes of content that were
434  *        placed into the "cache" (and hence the
435  *        number of bytes that should be removed).
436  */
437 static void
438 manage_space(unsigned long long need)
439 {
440   unsigned long long last;
441
442   GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
443              "Asked to free up %llu bytes of cache space\n",
444              need);
445   last = 0;
446   while ((need > 0) && (last != need))
447     {
448       last = need;
449       plugin->api->get_expiration(plugin->api->cls, &quota_processor, &need);
450     }
451 }
452
453
454 /**
455  * Transmit a status code to the client.
456  *
457  * @param client receiver of the response
458  * @param code status code
459  * @param msg optional error message (can be NULL)
460  */
461 static void
462 transmit_status(struct GNUNET_SERVICE_Client *client,
463                 int code,
464                 const char *msg)
465 {
466   struct GNUNET_MQ_Envelope *env;
467   struct StatusMessage *sm;
468   size_t slen;
469
470   GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
471              "Transmitting `%s' message with value %d and message `%s'\n",
472              "STATUS",
473              code,
474              msg != NULL ? msg : "(none)");
475   slen = (msg == NULL) ? 0 : strlen(msg) + 1;
476   env = GNUNET_MQ_msg_extra(sm, slen, GNUNET_MESSAGE_TYPE_DATASTORE_STATUS);
477   sm->status = htonl(code);
478   sm->min_expiration = GNUNET_TIME_absolute_hton(min_expiration);
479   GNUNET_memcpy(&sm[1], msg, slen);
480   GNUNET_MQ_send(GNUNET_SERVICE_client_get_mq(client), env);
481 }
482
483
484 /**
485  * Function that will transmit the given datastore entry
486  * to the client.
487  *
488  * @param cls closure, pointer to the client (of type `struct GNUNET_SERVICE_Client`).
489  * @param key key for the content
490  * @param size number of bytes in data
491  * @param data content stored
492  * @param type type of the content
493  * @param priority priority of the content
494  * @param anonymity anonymity-level for the content
495  * @param replication replication-level for the content
496  * @param expiration expiration time for the content
497  * @param uid unique identifier for the datum;
498  *        maybe 0 if no unique identifier is available
499  * @return #GNUNET_SYSERR to abort the iteration, #GNUNET_OK to continue,
500  *         #GNUNET_NO to delete the item and continue (if supported)
501  */
502 static int
503 transmit_item(void *cls,
504               const struct GNUNET_HashCode *key,
505               uint32_t size,
506               const void *data,
507               enum GNUNET_BLOCK_Type type,
508               uint32_t priority,
509               uint32_t anonymity,
510               uint32_t replication,
511               struct GNUNET_TIME_Absolute expiration,
512               uint64_t uid)
513 {
514   struct GNUNET_SERVICE_Client *client = cls;
515   struct GNUNET_MQ_Envelope *env;
516   struct GNUNET_MessageHeader *end;
517   struct DataMessage *dm;
518
519   if (NULL == key)
520     {
521       /* transmit 'DATA_END' */
522       GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "Transmitting DATA_END message\n");
523       env = GNUNET_MQ_msg(end, GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END);
524       GNUNET_MQ_send(GNUNET_SERVICE_client_get_mq(client), env);
525       return GNUNET_OK;
526     }
527   GNUNET_assert(sizeof(struct DataMessage) + size < GNUNET_MAX_MESSAGE_SIZE);
528   env = GNUNET_MQ_msg_extra(dm, size, GNUNET_MESSAGE_TYPE_DATASTORE_DATA);
529   dm->rid = htonl(0);
530   dm->size = htonl(size);
531   dm->type = htonl(type);
532   dm->priority = htonl(priority);
533   dm->anonymity = htonl(anonymity);
534   dm->replication = htonl(replication);
535   dm->expiration = GNUNET_TIME_absolute_hton(expiration);
536   dm->uid = GNUNET_htonll(uid);
537   dm->key = *key;
538   GNUNET_memcpy(&dm[1], data, size);
539   GNUNET_log(
540     GNUNET_ERROR_TYPE_DEBUG,
541     "Transmitting DATA message for `%s' of type %u with expiration %s (in: %s)\n",
542     GNUNET_h2s(key),
543     type,
544     GNUNET_STRINGS_absolute_time_to_string(expiration),
545     GNUNET_STRINGS_relative_time_to_string(GNUNET_TIME_absolute_get_remaining(
546                                              expiration),
547                                            GNUNET_YES));
548   GNUNET_STATISTICS_update(stats,
549                            gettext_noop("# results found"),
550                            1,
551                            GNUNET_NO);
552   GNUNET_MQ_send(GNUNET_SERVICE_client_get_mq(client), env);
553   return GNUNET_OK;
554 }
555
556
557 /**
558  * Handle RESERVE-message.
559  *
560  * @param cls identification of the client
561  * @param message the actual message
562  */
563 static void
564 handle_reserve(void *cls, const struct ReserveMessage *msg)
565 {
566   /**
567    * Static counter to produce reservation identifiers.
568    */
569   static int reservation_gen;
570   struct GNUNET_SERVICE_Client *client = cls;
571   struct ReservationList *e;
572   unsigned long long used;
573   unsigned long long req;
574   uint64_t amount;
575   uint32_t entries;
576
577   GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "Processing RESERVE request\n");
578   amount = GNUNET_ntohll(msg->amount);
579   entries = ntohl(msg->entries);
580   used = payload + reserved;
581   req =
582     amount + ((unsigned long long)GNUNET_DATASTORE_ENTRY_OVERHEAD) * entries;
583   if (used + req > quota)
584     {
585       if (quota < used)
586         used =
587           quota; /* cheat a bit for error message (to avoid negative numbers) */
588       GNUNET_log(
589         GNUNET_ERROR_TYPE_WARNING,
590         _(
591           "Insufficient space (%llu bytes are available) to satisfy RESERVE request for %llu bytes\n"),
592         quota - used,
593         req);
594       if (cache_size < req)
595         {
596           /* TODO: document this in the FAQ; essentially, if this
597            * message happens, the insertion request could be blocked
598            * by less-important content from migration because it is
599            * larger than 1/8th of the overall available space, and
600            * we only reserve 1/8th for "fresh" insertions */
601           GNUNET_log(
602             GNUNET_ERROR_TYPE_WARNING,
603             _(
604               "The requested amount (%llu bytes) is larger than the cache size (%llu bytes)\n"),
605             req,
606             cache_size);
607           transmit_status(client,
608                           0,
609                           gettext_noop(
610                             "Insufficient space to satisfy request and "
611                             "requested amount is larger than cache size"));
612         }
613       else
614         {
615           transmit_status(client,
616                           0,
617                           gettext_noop("Insufficient space to satisfy request"));
618         }
619       GNUNET_SERVICE_client_continue(client);
620       return;
621     }
622   reserved += req;
623   GNUNET_STATISTICS_set(stats,
624                         gettext_noop("# reserved"),
625                         reserved,
626                         GNUNET_NO);
627   e = GNUNET_new(struct ReservationList);
628   e->next = reservations;
629   reservations = e;
630   e->client = client;
631   e->amount = amount;
632   e->entries = entries;
633   e->rid = ++reservation_gen;
634   if (reservation_gen < 0)
635     reservation_gen = 0; /* wrap around */
636   transmit_status(client, e->rid, NULL);
637   GNUNET_SERVICE_client_continue(client);
638 }
639
640
641 /**
642  * Handle RELEASE_RESERVE-message.
643  *
644  * @param cls identification of the client
645  * @param message the actual message
646  */
647 static void
648 handle_release_reserve(void *cls, const struct ReleaseReserveMessage *msg)
649 {
650   struct GNUNET_SERVICE_Client *client = cls;
651   struct ReservationList *pos;
652   struct ReservationList *prev;
653   struct ReservationList *next;
654   int rid = ntohl(msg->rid);
655   unsigned long long rem;
656
657   GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "Processing RELEASE_RESERVE request\n");
658   next = reservations;
659   prev = NULL;
660   while (NULL != (pos = next))
661     {
662       next = pos->next;
663       if (rid == pos->rid)
664         {
665           if (prev == NULL)
666             reservations = next;
667           else
668             prev->next = next;
669           rem =
670             pos->amount +
671             ((unsigned long long)GNUNET_DATASTORE_ENTRY_OVERHEAD) * pos->entries;
672           GNUNET_assert(reserved >= rem);
673           reserved -= rem;
674           GNUNET_STATISTICS_set(stats,
675                                 gettext_noop("# reserved"),
676                                 reserved,
677                                 GNUNET_NO);
678           GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
679                      "Returning %llu remaining reserved bytes to storage pool\n",
680                      rem);
681           GNUNET_free(pos);
682           transmit_status(client, GNUNET_OK, NULL);
683           GNUNET_SERVICE_client_continue(client);
684           return;
685         }
686       prev = pos;
687     }
688   GNUNET_break(0);
689   transmit_status(client,
690                   GNUNET_SYSERR,
691                   gettext_noop("Could not find matching reservation"));
692   GNUNET_SERVICE_client_continue(client);
693 }
694
695
696 /**
697  * Check that the given message is a valid data message.
698  *
699  * @param dm message to check
700  * @return #GNUNET_SYSERR is not well-formed, otherwise #GNUNET_OK
701  */
702 static int
703 check_data(const struct DataMessage *dm)
704 {
705   uint16_t size;
706   uint32_t dsize;
707
708   size = ntohs(dm->header.size);
709   dsize = ntohl(dm->size);
710   if (size != dsize + sizeof(struct DataMessage))
711     {
712       GNUNET_break(0);
713       return GNUNET_SYSERR;
714     }
715   return GNUNET_OK;
716 }
717
718
719 /**
720  * Put continuation.
721  *
722  * @param cls closure
723  * @param key key for the item stored
724  * @param size size of the item stored
725  * @param status #GNUNET_OK if inserted, #GNUNET_NO if updated,
726  *        or #GNUNET_SYSERROR if error
727  * @param msg error message on error
728  */
729 static void
730 put_continuation(void *cls,
731                  const struct GNUNET_HashCode *key,
732                  uint32_t size,
733                  int status,
734                  const char *msg)
735 {
736   struct GNUNET_SERVICE_Client *client = cls;
737
738   if (GNUNET_OK == status)
739     {
740       GNUNET_STATISTICS_update(stats,
741                                gettext_noop("# bytes stored"),
742                                size,
743                                GNUNET_YES);
744       GNUNET_CONTAINER_bloomfilter_add(filter, key);
745       GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
746                  "Successfully stored %u bytes under key `%s'\n",
747                  size,
748                  GNUNET_h2s(key));
749     }
750   transmit_status(client,
751                   GNUNET_SYSERR == status ? GNUNET_SYSERR : GNUNET_OK,
752                   msg);
753   if (quota - reserved - cache_size < payload)
754     {
755       GNUNET_log(GNUNET_ERROR_TYPE_INFO,
756                  _("Need %llu bytes more space (%llu allowed, using %llu)\n"),
757                  (unsigned long long)size + GNUNET_DATASTORE_ENTRY_OVERHEAD,
758                  (unsigned long long)(quota - reserved - cache_size),
759                  (unsigned long long)payload);
760       manage_space(size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
761     }
762 }
763
764
765 /**
766  * Verify PUT-message.
767  *
768  * @param cls identification of the client
769  * @param message the actual message
770  * @return #GNUNET_OK if @a dm is well-formed
771  */
772 static int
773 check_put(void *cls, const struct DataMessage *dm)
774 {
775   if (GNUNET_OK != check_data(dm))
776     {
777       GNUNET_break(0);
778       return GNUNET_SYSERR;
779     }
780   return GNUNET_OK;
781 }
782
783
784 /**
785  * Handle PUT-message.
786  *
787  * @param cls identification of the client
788  * @param message the actual message
789  */
790 static void
791 handle_put(void *cls, const struct DataMessage *dm)
792 {
793   struct GNUNET_SERVICE_Client *client = cls;
794   int rid;
795   struct ReservationList *pos;
796   uint32_t size;
797
798   GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
799              "Processing PUT request for `%s' of type %u\n",
800              GNUNET_h2s(&dm->key),
801              (uint32_t)ntohl(dm->type));
802   rid = ntohl(dm->rid);
803   size = ntohl(dm->size);
804   if (rid > 0)
805     {
806       pos = reservations;
807       while ((NULL != pos) && (rid != pos->rid))
808         pos = pos->next;
809       GNUNET_break(pos != NULL);
810       if (NULL != pos)
811         {
812           GNUNET_break(pos->entries > 0);
813           GNUNET_break(pos->amount >= size);
814           pos->entries--;
815           pos->amount -= size;
816           reserved -= (size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
817           GNUNET_STATISTICS_set(stats,
818                                 gettext_noop("# reserved"),
819                                 reserved,
820                                 GNUNET_NO);
821         }
822     }
823   bool absent =
824     GNUNET_NO == GNUNET_CONTAINER_bloomfilter_test(filter, &dm->key);
825   plugin->api->put(plugin->api->cls,
826                    &dm->key,
827                    absent,
828                    ntohl(dm->size),
829                    &dm[1],
830                    ntohl(dm->type),
831                    ntohl(dm->priority),
832                    ntohl(dm->anonymity),
833                    ntohl(dm->replication),
834                    GNUNET_TIME_absolute_ntoh(dm->expiration),
835                    &put_continuation,
836                    client);
837   GNUNET_SERVICE_client_continue(client);
838 }
839
840
841 /**
842  * Handle #GNUNET_MESSAGE_TYPE_DATASTORE_GET-message.
843  *
844  * @param cls identification of the client
845  * @param msg the actual message
846  */
847 static void
848 handle_get(void *cls, const struct GetMessage *msg)
849 {
850   struct GNUNET_SERVICE_Client *client = cls;
851
852   GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
853              "Processing GET request of type %u\n",
854              (uint32_t)ntohl(msg->type));
855   GNUNET_STATISTICS_update(stats,
856                            gettext_noop("# GET requests received"),
857                            1,
858                            GNUNET_NO);
859   plugin->api->get_key(plugin->api->cls,
860                        GNUNET_ntohll(msg->next_uid),
861                        msg->random,
862                        NULL,
863                        ntohl(msg->type),
864                        &transmit_item,
865                        client);
866   GNUNET_SERVICE_client_continue(client);
867 }
868
869
870 /**
871  * Handle #GNUNET_MESSAGE_TYPE_DATASTORE_GET_KEY-message.
872  *
873  * @param cls closure
874  * @param msg the actual message
875  */
876 static void
877 handle_get_key(void *cls, const struct GetKeyMessage *msg)
878 {
879   struct GNUNET_SERVICE_Client *client = cls;
880
881   GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
882              "Processing GET request for `%s' of type %u\n",
883              GNUNET_h2s(&msg->key),
884              (uint32_t)ntohl(msg->type));
885   GNUNET_STATISTICS_update(stats,
886                            gettext_noop("# GET KEY requests received"),
887                            1,
888                            GNUNET_NO);
889   if (GNUNET_YES != GNUNET_CONTAINER_bloomfilter_test(filter, &msg->key))
890     {
891       /* don't bother database... */
892       GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
893                  "Empty result set for GET request for `%s' (bloomfilter).\n",
894                  GNUNET_h2s(&msg->key));
895       GNUNET_STATISTICS_update(stats,
896                                gettext_noop(
897                                  "# requests filtered by bloomfilter"),
898                                1,
899                                GNUNET_NO);
900       transmit_item(client,
901                     NULL,
902                     0,
903                     NULL,
904                     0,
905                     0,
906                     0,
907                     0,
908                     GNUNET_TIME_UNIT_ZERO_ABS,
909                     0);
910       GNUNET_SERVICE_client_continue(client);
911       return;
912     }
913   plugin->api->get_key(plugin->api->cls,
914                        GNUNET_ntohll(msg->next_uid),
915                        msg->random,
916                        &msg->key,
917                        ntohl(msg->type),
918                        &transmit_item,
919                        client);
920   GNUNET_SERVICE_client_continue(client);
921 }
922
923
924 /**
925  * Handle GET_REPLICATION-message.
926  *
927  * @param cls identification of the client
928  * @param message the actual message
929  */
930 static void
931 handle_get_replication(void *cls, const struct GNUNET_MessageHeader *message)
932 {
933   struct GNUNET_SERVICE_Client *client = cls;
934
935   GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "Processing GET_REPLICATION request\n");
936   GNUNET_STATISTICS_update(stats,
937                            gettext_noop(
938                              "# GET REPLICATION requests received"),
939                            1,
940                            GNUNET_NO);
941   plugin->api->get_replication(plugin->api->cls, &transmit_item, client);
942   GNUNET_SERVICE_client_continue(client);
943 }
944
945
946 /**
947  * Handle GET_ZERO_ANONYMITY-message.
948  *
949  * @param cls client identification of the client
950  * @param message the actual message
951  */
952 static void
953 handle_get_zero_anonymity(void *cls, const struct GetZeroAnonymityMessage *msg)
954 {
955   struct GNUNET_SERVICE_Client *client = cls;
956   enum GNUNET_BLOCK_Type type;
957
958   type = (enum GNUNET_BLOCK_Type)ntohl(msg->type);
959   if (type == GNUNET_BLOCK_TYPE_ANY)
960     {
961       GNUNET_break(0);
962       GNUNET_SERVICE_client_drop(client);
963       return;
964     }
965   GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
966              "Processing GET_ZERO_ANONYMITY request\n");
967   GNUNET_STATISTICS_update(stats,
968                            gettext_noop(
969                              "# GET ZERO ANONYMITY requests received"),
970                            1,
971                            GNUNET_NO);
972   plugin->api->get_zero_anonymity(plugin->api->cls,
973                                   GNUNET_ntohll(msg->next_uid),
974                                   type,
975                                   &transmit_item,
976                                   client);
977   GNUNET_SERVICE_client_continue(client);
978 }
979
980
981 /**
982  * Remove continuation.
983  *
984  * @param cls closure
985  * @param key key for the content
986  * @param size number of bytes in data
987  * @param status #GNUNET_OK if removed, #GNUNET_NO if not found,
988  *        or #GNUNET_SYSERROR if error
989  * @param msg error message on error
990  */
991 static void
992 remove_continuation(void *cls,
993                     const struct GNUNET_HashCode *key,
994                     uint32_t size,
995                     int status,
996                     const char *msg)
997 {
998   struct GNUNET_SERVICE_Client *client = cls;
999
1000   if (GNUNET_SYSERR == status)
1001     {
1002       GNUNET_log(GNUNET_ERROR_TYPE_ERROR, "REMOVE request failed: %s.\n", msg);
1003       transmit_status(client, GNUNET_NO, msg);
1004       return;
1005     }
1006   if (GNUNET_NO == status)
1007     {
1008       GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
1009                  "Content not found for REMOVE request.\n");
1010       transmit_status(client, GNUNET_NO, _("Content not found"));
1011       return;
1012     }
1013   GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
1014              "Item matches REMOVE request for key `%s'.\n",
1015              GNUNET_h2s(key));
1016   GNUNET_STATISTICS_update(stats,
1017                            gettext_noop("# bytes removed (explicit request)"),
1018                            size,
1019                            GNUNET_YES);
1020   GNUNET_CONTAINER_bloomfilter_remove(filter, key);
1021   transmit_status(client, GNUNET_OK, NULL);
1022 }
1023
1024
1025 /**
1026  * Verify REMOVE-message.
1027  *
1028  * @param cls identification of the client
1029  * @param message the actual message
1030  * @return #GNUNET_OK if @a dm is well-formed
1031  */
1032 static int
1033 check_remove(void *cls, const struct DataMessage *dm)
1034 {
1035   if (GNUNET_OK != check_data(dm))
1036     {
1037       GNUNET_break(0);
1038       return GNUNET_SYSERR;
1039     }
1040   return GNUNET_OK;
1041 }
1042
1043
1044 /**
1045  * Handle REMOVE-message.
1046  *
1047  * @param cls closure
1048  * @param client identification of the client
1049  * @param message the actual message
1050  */
1051 static void
1052 handle_remove(void *cls, const struct DataMessage *dm)
1053 {
1054   struct GNUNET_SERVICE_Client *client = cls;
1055
1056   GNUNET_STATISTICS_update(stats,
1057                            gettext_noop("# REMOVE requests received"),
1058                            1,
1059                            GNUNET_NO);
1060   GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
1061              "Processing REMOVE request for `%s'\n",
1062              GNUNET_h2s(&dm->key));
1063   plugin->api->remove_key(plugin->api->cls,
1064                           &dm->key,
1065                           ntohl(dm->size),
1066                           &dm[1],
1067                           &remove_continuation,
1068                           client);
1069   GNUNET_SERVICE_client_continue(client);
1070 }
1071
1072
1073 /**
1074  * Handle DROP-message.
1075  *
1076  * @param cls identification of the client
1077  * @param message the actual message
1078  */
1079 static void
1080 handle_drop(void *cls, const struct GNUNET_MessageHeader *message)
1081 {
1082   struct GNUNET_SERVICE_Client *client = cls;
1083
1084   GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "Processing DROP request\n");
1085   do_drop = GNUNET_YES;
1086   GNUNET_SERVICE_client_continue(client);
1087 }
1088
1089
1090 /**
1091  * Function called by plugins to notify us about a
1092  * change in their disk utilization.
1093  *
1094  * @param cls closure (NULL)
1095  * @param delta change in disk utilization,
1096  *        0 for "reset to empty"
1097  */
1098 static void
1099 disk_utilization_change_cb(void *cls, int delta)
1100 {
1101   if ((delta < 0) && (payload < -delta))
1102     {
1103       GNUNET_log(
1104         GNUNET_ERROR_TYPE_WARNING,
1105         _(
1106           "Datastore payload must have been inaccurate (%lld < %lld). Recomputing it.\n"),
1107         (long long)payload,
1108         (long long)-delta);
1109       plugin->api->estimate_size(plugin->api->cls, &payload);
1110       GNUNET_log(GNUNET_ERROR_TYPE_INFO,
1111                  _("New payload: %lld\n"),
1112                  (long long)payload);
1113       sync_stats();
1114       return;
1115     }
1116   payload += delta;
1117   last_sync++;
1118   if (last_sync >= MAX_STAT_SYNC_LAG)
1119     sync_stats();
1120 }
1121
1122
1123 /**
1124  * Callback function to process statistic values.
1125  *
1126  * @param cls closure (struct Plugin*)
1127  * @param subsystem name of subsystem that created the statistic
1128  * @param name the name of the datum
1129  * @param value the current value
1130  * @param is_persistent #GNUNET_YES if the value is persistent, #GNUNET_NO if not
1131  * @return #GNUNET_OK to continue, #GNUNET_SYSERR to abort iteration
1132  */
1133 static int
1134 process_stat_in(void *cls,
1135                 const char *subsystem,
1136                 const char *name,
1137                 uint64_t value,
1138                 int is_persistent)
1139 {
1140   GNUNET_assert(GNUNET_NO == stats_worked);
1141   stats_worked = GNUNET_YES;
1142   payload += value;
1143   GNUNET_log(
1144     GNUNET_ERROR_TYPE_DEBUG,
1145     "Notification from statistics about existing payload (%llu), new payload is %llu\n",
1146     (unsigned long long)value,
1147     (unsigned long long)payload);
1148   return GNUNET_OK;
1149 }
1150
1151
1152 /**
1153  * Load the datastore plugin.
1154  */
1155 static struct DatastorePlugin *
1156 load_plugin()
1157 {
1158   struct DatastorePlugin *ret;
1159   char *libname;
1160
1161   ret = GNUNET_new(struct DatastorePlugin);
1162   ret->env.cfg = cfg;
1163   ret->env.duc = &disk_utilization_change_cb;
1164   ret->env.cls = NULL;
1165   GNUNET_log(GNUNET_ERROR_TYPE_INFO,
1166              _("Loading `%s' datastore plugin\n"),
1167              plugin_name);
1168   GNUNET_asprintf(&libname, "libgnunet_plugin_datastore_%s", plugin_name);
1169   ret->short_name = GNUNET_strdup(plugin_name);
1170   ret->lib_name = libname;
1171   ret->api = GNUNET_PLUGIN_load(libname, &ret->env);
1172   if (NULL == ret->api)
1173     {
1174       GNUNET_log(GNUNET_ERROR_TYPE_ERROR,
1175                  _("Failed to load datastore plugin for `%s'\n"),
1176                  plugin_name);
1177       GNUNET_free(ret->short_name);
1178       GNUNET_free(libname);
1179       GNUNET_free(ret);
1180       return NULL;
1181     }
1182   return ret;
1183 }
1184
1185
1186 /**
1187  * Function called when the service shuts
1188  * down.  Unloads our datastore plugin.
1189  *
1190  * @param plug plugin to unload
1191  */
1192 static void
1193 unload_plugin(struct DatastorePlugin *plug)
1194 {
1195   GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
1196              "Datastore service is unloading plugin...\n");
1197   GNUNET_break(NULL == GNUNET_PLUGIN_unload(plug->lib_name, plug->api));
1198   GNUNET_free(plug->lib_name);
1199   GNUNET_free(plug->short_name);
1200   GNUNET_free(plug);
1201 }
1202
1203
1204 /**
1205  * Initialization complete, start operating the service.
1206  */
1207 static void
1208 begin_service()
1209 {
1210   GNUNET_SERVICE_resume(service);
1211   expired_kill_task =
1212     GNUNET_SCHEDULER_add_with_priority(GNUNET_SCHEDULER_PRIORITY_IDLE,
1213                                        &delete_expired,
1214                                        NULL);
1215 }
1216
1217
1218 /**
1219  * Adds a given @a key to the bloomfilter in @a cls @a count times.
1220  *
1221  * @param cls the bloomfilter
1222  * @param key key to add
1223  * @param count number of times to add key
1224  */
1225 static void
1226 add_key_to_bloomfilter(void *cls,
1227                        const struct GNUNET_HashCode *key,
1228                        unsigned int count)
1229 {
1230   struct GNUNET_CONTAINER_BloomFilter *bf = cls;
1231
1232   if (NULL == key)
1233     {
1234       GNUNET_log(GNUNET_ERROR_TYPE_INFO,
1235                  _("Bloomfilter construction complete.\n"));
1236       begin_service();
1237       return;
1238     }
1239
1240   while (0 < count--)
1241     GNUNET_CONTAINER_bloomfilter_add(bf, key);
1242 }
1243
1244
1245 /**
1246  * We finished receiving the statistic.  Initialize the plugin; if
1247  * loading the statistic failed, run the estimator.
1248  *
1249  * @param cls NULL
1250  * @param success #GNUNET_NO if we failed to read the stat
1251  */
1252 static void
1253 process_stat_done(void *cls, int success)
1254 {
1255   stat_get = NULL;
1256   if (NULL != stat_timeout_task)
1257     {
1258       GNUNET_SCHEDULER_cancel(stat_timeout_task);
1259       stat_timeout_task = NULL;
1260     }
1261   plugin = load_plugin();
1262   if (NULL == plugin)
1263     {
1264       GNUNET_CONTAINER_bloomfilter_free(filter);
1265       filter = NULL;
1266       if (NULL != stats)
1267         {
1268           GNUNET_STATISTICS_destroy(stats, GNUNET_YES);
1269           stats = NULL;
1270         }
1271       return;
1272     }
1273
1274   if (GNUNET_NO == stats_worked)
1275     {
1276       GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
1277                  "Failed to obtain value from statistics service, recomputing it\n");
1278       plugin->api->estimate_size(plugin->api->cls, &payload);
1279       GNUNET_log(GNUNET_ERROR_TYPE_INFO,
1280                  _("New payload: %lld\n"),
1281                  (long long)payload);
1282     }
1283
1284   if (GNUNET_YES == refresh_bf)
1285     {
1286       GNUNET_log(GNUNET_ERROR_TYPE_INFO,
1287                  _("Rebuilding bloomfilter.  Please be patient.\n"));
1288       if (NULL != plugin->api->get_keys)
1289         {
1290           plugin->api->get_keys(plugin->api->cls, &add_key_to_bloomfilter, filter);
1291           return;
1292         }
1293       else
1294         {
1295           GNUNET_log(GNUNET_ERROR_TYPE_ERROR,
1296                      _(
1297                        "Plugin does not support get_keys function. Please fix!\n"));
1298         }
1299     }
1300   begin_service();
1301 }
1302
1303
1304 /**
1305  * Fetching stats took to long, run without.
1306  *
1307  * @param cls NULL
1308  */
1309 static void
1310 stat_timeout(void *cls)
1311 {
1312   stat_timeout_task = NULL;
1313   GNUNET_STATISTICS_get_cancel(stat_get);
1314   process_stat_done(NULL, GNUNET_NO);
1315 }
1316
1317
1318 /**
1319  * Task run during shutdown.
1320  */
1321 static void
1322 cleaning_task(void *cls)
1323 {
1324   cleaning_done = GNUNET_YES;
1325   if (NULL != expired_kill_task)
1326     {
1327       GNUNET_SCHEDULER_cancel(expired_kill_task);
1328       expired_kill_task = NULL;
1329     }
1330   if (GNUNET_YES == do_drop)
1331     {
1332       GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "Dropping database!\n");
1333       plugin->api->drop(plugin->api->cls);
1334       payload = 0;
1335       last_sync++;
1336     }
1337   if (NULL != plugin)
1338     {
1339       unload_plugin(plugin);
1340       plugin = NULL;
1341     }
1342   if (NULL != filter)
1343     {
1344       GNUNET_CONTAINER_bloomfilter_free(filter);
1345       filter = NULL;
1346     }
1347   if (NULL != stat_get)
1348     {
1349       GNUNET_STATISTICS_get_cancel(stat_get);
1350       stat_get = NULL;
1351     }
1352   if (NULL != stat_timeout_task)
1353     {
1354       GNUNET_SCHEDULER_cancel(stat_timeout_task);
1355       stat_timeout_task = NULL;
1356     }
1357   GNUNET_free_non_null(plugin_name);
1358   plugin_name = NULL;
1359   if (last_sync > 0)
1360     sync_stats();
1361   if (NULL != stats)
1362     {
1363       GNUNET_STATISTICS_destroy(stats, GNUNET_YES);
1364       stats = NULL;
1365     }
1366   GNUNET_free(quota_stat_name);
1367   quota_stat_name = NULL;
1368 }
1369
1370
1371 /**
1372  * Add a client to our list of active clients.
1373  *
1374  * @param cls NULL
1375  * @param client client to add
1376  * @param mq message queue for @a client
1377  * @return @a client
1378  */
1379 static void *
1380 client_connect_cb(void *cls,
1381                   struct GNUNET_SERVICE_Client *client,
1382                   struct GNUNET_MQ_Handle *mq)
1383 {
1384   return client;
1385 }
1386
1387
1388 /**
1389  * Called whenever a client is disconnected.
1390  * Frees our resources associated with that client.
1391  *
1392  * @param cls closure
1393  * @param client identification of the client
1394  * @param app_ctx must match @a client
1395  */
1396 static void
1397 client_disconnect_cb(void *cls,
1398                      struct GNUNET_SERVICE_Client *client,
1399                      void *app_ctx)
1400 {
1401   struct ReservationList *pos;
1402   struct ReservationList *prev;
1403   struct ReservationList *next;
1404
1405   GNUNET_assert(app_ctx == client);
1406   prev = NULL;
1407   pos = reservations;
1408   while (NULL != pos)
1409     {
1410       next = pos->next;
1411       if (pos->client == client)
1412         {
1413           if (NULL == prev)
1414             reservations = next;
1415           else
1416             prev->next = next;
1417           reserved -= pos->amount + pos->entries * GNUNET_DATASTORE_ENTRY_OVERHEAD;
1418           GNUNET_free(pos);
1419         }
1420       else
1421         {
1422           prev = pos;
1423         }
1424       pos = next;
1425     }
1426   GNUNET_STATISTICS_set(stats,
1427                         gettext_noop("# reserved"),
1428                         reserved,
1429                         GNUNET_NO);
1430 }
1431
1432
1433 /**
1434  * Process datastore requests.
1435  *
1436  * @param cls closure
1437  * @param serv the initialized service
1438  * @param c configuration to use
1439  */
1440 static void
1441 run(void *cls,
1442     const struct GNUNET_CONFIGURATION_Handle *c,
1443     struct GNUNET_SERVICE_Handle *serv)
1444 {
1445   char *fn;
1446   char *pfn;
1447   unsigned int bf_size;
1448
1449   service = serv;
1450   cfg = c;
1451   if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_string(cfg,
1452                                                          "DATASTORE",
1453                                                          "DATABASE",
1454                                                          &plugin_name))
1455     {
1456       GNUNET_log_config_missing(GNUNET_ERROR_TYPE_ERROR,
1457                                 "DATABASE",
1458                                 "DATASTORE");
1459       return;
1460     }
1461   GNUNET_asprintf(&quota_stat_name,
1462                   _("# bytes used in file-sharing datastore `%s'"),
1463                   plugin_name);
1464   if (GNUNET_OK !=
1465       GNUNET_CONFIGURATION_get_value_size(cfg, "DATASTORE", "QUOTA", &quota))
1466     {
1467       GNUNET_log_config_missing(GNUNET_ERROR_TYPE_ERROR, "QUOTA", "DATASTORE");
1468       return;
1469     }
1470   stats = GNUNET_STATISTICS_create("datastore", cfg);
1471   GNUNET_STATISTICS_set(stats, gettext_noop("# quota"), quota, GNUNET_NO);
1472   cache_size = quota / 8; /* Or should we make this an option? */
1473   GNUNET_STATISTICS_set(stats,
1474                         gettext_noop("# cache size"),
1475                         cache_size,
1476                         GNUNET_NO);
1477   if (quota / (32 * 1024LL) > MAX_BF_SIZE)
1478     bf_size = MAX_BF_SIZE;
1479   else
1480     bf_size =
1481       quota / (32 * 1024LL); /* 8 bit per entry, 1 bit per 32 kb in DB */
1482   fn = NULL;
1483   if ((GNUNET_OK != GNUNET_CONFIGURATION_get_value_filename(cfg,
1484                                                             "DATASTORE",
1485                                                             "BLOOMFILTER",
1486                                                             &fn)) ||
1487       (GNUNET_OK != GNUNET_DISK_directory_create_for_file(fn)))
1488     {
1489       GNUNET_log(GNUNET_ERROR_TYPE_WARNING,
1490                  _("Could not use specified filename `%s' for bloomfilter.\n"),
1491                  NULL != fn ? fn : "");
1492       GNUNET_free_non_null(fn);
1493       fn = NULL;
1494     }
1495   if (NULL != fn)
1496     {
1497       GNUNET_asprintf(&pfn, "%s.%s", fn, plugin_name);
1498       if (GNUNET_YES == GNUNET_DISK_file_test(pfn))
1499         {
1500           filter =
1501             GNUNET_CONTAINER_bloomfilter_load(pfn,
1502                                               bf_size,
1503                                               5); /* approx. 3% false positives at max use */
1504           if (NULL == filter)
1505             {
1506               /* file exists but not valid, remove and try again, but refresh */
1507               if (0 != unlink(pfn))
1508                 {
1509                   /* failed to remove, run without file */
1510                   GNUNET_log(GNUNET_ERROR_TYPE_ERROR,
1511                              _("Failed to remove bogus bloomfilter file `%s'\n"),
1512                              pfn);
1513                   GNUNET_free(pfn);
1514                   pfn = NULL;
1515                   filter = GNUNET_CONTAINER_bloomfilter_load(
1516                     NULL,
1517                     bf_size,
1518                     5); /* approx. 3% false positives at max use */
1519                   refresh_bf = GNUNET_YES;
1520                 }
1521               else
1522                 {
1523                   /* try again after remove */
1524                   filter = GNUNET_CONTAINER_bloomfilter_load(
1525                     pfn,
1526                     bf_size,
1527                     5); /* approx. 3% false positives at max use */
1528                   refresh_bf = GNUNET_YES;
1529                   if (NULL == filter)
1530                     {
1531                       /* failed yet again, give up on using file */
1532                       GNUNET_log(GNUNET_ERROR_TYPE_ERROR,
1533                                  _("Failed to remove bogus bloomfilter file `%s'\n"),
1534                                  pfn);
1535                       GNUNET_free(pfn);
1536                       pfn = NULL;
1537                       filter = GNUNET_CONTAINER_bloomfilter_init(
1538                         NULL,
1539                         bf_size,
1540                         5); /* approx. 3% false positives at max use */
1541                     }
1542                 }
1543             }
1544           else
1545             {
1546               /* normal case: have an existing valid bf file, no need to refresh */
1547               refresh_bf = GNUNET_NO;
1548             }
1549         }
1550       else
1551         {
1552           filter =
1553             GNUNET_CONTAINER_bloomfilter_load(pfn,
1554                                               bf_size,
1555                                               5); /* approx. 3% false positives at max use */
1556           refresh_bf = GNUNET_YES;
1557         }
1558       GNUNET_free(pfn);
1559     }
1560   else
1561     {
1562       filter =
1563         GNUNET_CONTAINER_bloomfilter_init(NULL,
1564                                           bf_size,
1565                                           5); /* approx. 3% false positives at max use */
1566       refresh_bf = GNUNET_YES;
1567     }
1568   GNUNET_free_non_null(fn);
1569   if (NULL == filter)
1570     {
1571       GNUNET_log(GNUNET_ERROR_TYPE_ERROR,
1572                  _("Failed to initialize bloomfilter.\n"));
1573       if (NULL != stats)
1574         {
1575           GNUNET_STATISTICS_destroy(stats, GNUNET_YES);
1576           stats = NULL;
1577         }
1578       return;
1579     }
1580   GNUNET_SERVICE_suspend(service);
1581   stat_get = GNUNET_STATISTICS_get(stats,
1582                                    "datastore",
1583                                    quota_stat_name,
1584                                    &process_stat_done,
1585                                    &process_stat_in,
1586                                    NULL);
1587   if (NULL == stat_get)
1588     process_stat_done(NULL, GNUNET_SYSERR);
1589   else
1590     stat_timeout_task = GNUNET_SCHEDULER_add_delayed(GNUNET_TIME_UNIT_SECONDS,
1591                                                      &stat_timeout,
1592                                                      NULL);
1593   GNUNET_SCHEDULER_add_shutdown(&cleaning_task, NULL);
1594 }
1595
1596
1597 /**
1598  * Define "main" method using service macro.
1599  */
1600 GNUNET_SERVICE_MAIN(
1601   "datastore",
1602   GNUNET_SERVICE_OPTION_NONE,
1603   &run,
1604   &client_connect_cb,
1605   &client_disconnect_cb,
1606   NULL,
1607   GNUNET_MQ_hd_fixed_size(reserve,
1608                           GNUNET_MESSAGE_TYPE_DATASTORE_RESERVE,
1609                           struct ReserveMessage,
1610                           NULL),
1611   GNUNET_MQ_hd_fixed_size(release_reserve,
1612                           GNUNET_MESSAGE_TYPE_DATASTORE_RELEASE_RESERVE,
1613                           struct ReleaseReserveMessage,
1614                           NULL),
1615   GNUNET_MQ_hd_var_size(put,
1616                         GNUNET_MESSAGE_TYPE_DATASTORE_PUT,
1617                         struct DataMessage,
1618                         NULL),
1619   GNUNET_MQ_hd_fixed_size(get,
1620                           GNUNET_MESSAGE_TYPE_DATASTORE_GET,
1621                           struct GetMessage,
1622                           NULL),
1623   GNUNET_MQ_hd_fixed_size(get_key,
1624                           GNUNET_MESSAGE_TYPE_DATASTORE_GET_KEY,
1625                           struct GetKeyMessage,
1626                           NULL),
1627   GNUNET_MQ_hd_fixed_size(get_replication,
1628                           GNUNET_MESSAGE_TYPE_DATASTORE_GET_REPLICATION,
1629                           struct GNUNET_MessageHeader,
1630                           NULL),
1631   GNUNET_MQ_hd_fixed_size(get_zero_anonymity,
1632                           GNUNET_MESSAGE_TYPE_DATASTORE_GET_ZERO_ANONYMITY,
1633                           struct GetZeroAnonymityMessage,
1634                           NULL),
1635   GNUNET_MQ_hd_var_size(remove,
1636                         GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE,
1637                         struct DataMessage,
1638                         NULL),
1639   GNUNET_MQ_hd_fixed_size(drop,
1640                           GNUNET_MESSAGE_TYPE_DATASTORE_DROP,
1641                           struct GNUNET_MessageHeader,
1642                           NULL),
1643   GNUNET_MQ_handler_end());
1644
1645
1646 /* end of gnunet-service-datastore.c */