fix #4546
[oweals/gnunet.git] / src / datastore / gnunet-service-datastore.c
1 /*
2      This file is part of GNUnet
3      Copyright (C) 2004-2014 GNUnet e.V.
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18      Boston, MA 02110-1301, USA.
19 */
20
21 /**
22  * @file datastore/gnunet-service-datastore.c
23  * @brief Management for the datastore for files stored on a GNUnet node
24  * @author Christian Grothoff
25  */
26
27 #include "platform.h"
28 #include "gnunet_util_lib.h"
29 #include "gnunet_protocols.h"
30 #include "gnunet_statistics_service.h"
31 #include "gnunet_datastore_plugin.h"
32 #include "datastore.h"
33
34 /**
35  * How many messages do we queue at most per client?
36  */
37 #define MAX_PENDING 1024
38
39 /**
40  * How long are we at most keeping "expired" content
41  * past the expiration date in the database?
42  */
43 #define MAX_EXPIRE_DELAY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 15)
44
45 /**
46  * How fast are we allowed to query the database for deleting
47  * expired content? (1 item per second).
48  */
49 #define MIN_EXPIRE_DELAY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 1)
50
51 /**
52  * Name under which we store current space consumption.
53  */
54 static char *quota_stat_name;
55
56 /**
57  * After how many payload-changing operations
58  * do we sync our statistics?
59  */
60 #define MAX_STAT_SYNC_LAG 50
61
62
63 /**
64  * Our datastore plugin.
65  */
66 struct DatastorePlugin
67 {
68
69   /**
70    * API of the transport as returned by the plugin's
71    * initialization function.
72    */
73   struct GNUNET_DATASTORE_PluginFunctions *api;
74
75   /**
76    * Short name for the plugin (i.e. "sqlite").
77    */
78   char *short_name;
79
80   /**
81    * Name of the library (i.e. "gnunet_plugin_datastore_sqlite").
82    */
83   char *lib_name;
84
85   /**
86    * Environment this transport service is using
87    * for this plugin.
88    */
89   struct GNUNET_DATASTORE_PluginEnvironment env;
90
91 };
92
93
94 /**
95  * Linked list of active reservations.
96  */
97 struct ReservationList
98 {
99
100   /**
101    * This is a linked list.
102    */
103   struct ReservationList *next;
104
105   /**
106    * Client that made the reservation.
107    */
108   struct GNUNET_SERVER_Client *client;
109
110   /**
111    * Number of bytes (still) reserved.
112    */
113   uint64_t amount;
114
115   /**
116    * Number of items (still) reserved.
117    */
118   uint64_t entries;
119
120   /**
121    * Reservation identifier.
122    */
123   int32_t rid;
124
125 };
126
127
128
129 /**
130  * Our datastore plugin (NULL if not available).
131  */
132 static struct DatastorePlugin *plugin;
133
134 /**
135  * Linked list of space reservations made by clients.
136  */
137 static struct ReservationList *reservations;
138
139 /**
140  * Bloomfilter to quickly tell if we don't have the content.
141  */
142 static struct GNUNET_CONTAINER_BloomFilter *filter;
143
144 /**
145  * Name of our plugin.
146  */
147 static char *plugin_name;
148
149 /**
150  * Our configuration.
151  */
152 static const struct GNUNET_CONFIGURATION_Handle *cfg;
153
154 /**
155  * Handle for reporting statistics.
156  */
157 static struct GNUNET_STATISTICS_Handle *stats;
158
159 /**
160  * How much space are we using for the cache?  (space available for
161  * insertions that will be instantly reclaimed by discarding less
162  * important content --- or possibly whatever we just inserted into
163  * the "cache").
164  */
165 static unsigned long long cache_size;
166
167 /**
168  * How much space have we currently reserved?
169  */
170 static unsigned long long reserved;
171
172 /**
173  * How much data are we currently storing
174  * in the database?
175  */
176 static unsigned long long payload;
177
178 /**
179  * Identity of the task that is used to delete
180  * expired content.
181  */
182 static struct GNUNET_SCHEDULER_Task * expired_kill_task;
183
184 /**
185  * Minimum time that content should have to not be discarded instantly
186  * (time stamp of any content that we've been discarding recently to
187  * stay below the quota).  FOREVER if we had to expire content with
188  * non-zero priority.
189  */
190 static struct GNUNET_TIME_Absolute min_expiration;
191
192 /**
193  * How much space are we allowed to use?
194  */
195 static unsigned long long quota;
196
197 /**
198  * Should the database be dropped on exit?
199  */
200 static int do_drop;
201
202 /**
203  * Should we refresh the BF when the DB is loaded?
204  */
205 static int refresh_bf;
206
207 /**
208  * Number of updates that were made to the
209  * payload value since we last synchronized
210  * it with the statistics service.
211  */
212 static unsigned int last_sync;
213
214 /**
215  * Did we get an answer from statistics?
216  */
217 static int stats_worked;
218
219
220 /**
221  * Synchronize our utilization statistics with the
222  * statistics service.
223  */
224 static void
225 sync_stats ()
226 {
227   GNUNET_STATISTICS_set (stats,
228                          quota_stat_name,
229                          payload,
230                          GNUNET_YES);
231   GNUNET_STATISTICS_set (stats,
232                          "# utilization by current datastore",
233                          payload,
234                          GNUNET_NO);
235   last_sync = 0;
236 }
237
238
239 /**
240  * Context for transmitting replies to clients.
241  */
242 struct TransmitCallbackContext
243 {
244
245   /**
246    * We keep these in a doubly-linked list (for cleanup).
247    */
248   struct TransmitCallbackContext *next;
249
250   /**
251    * We keep these in a doubly-linked list (for cleanup).
252    */
253   struct TransmitCallbackContext *prev;
254
255   /**
256    * The message that we're asked to transmit.
257    */
258   struct GNUNET_MessageHeader *msg;
259
260   /**
261    * Handle for the transmission request.
262    */
263   struct GNUNET_SERVER_TransmitHandle *th;
264
265   /**
266    * Client that we are transmitting to.
267    */
268   struct GNUNET_SERVER_Client *client;
269
270 };
271
272
273 /**
274  * Head of the doubly-linked list (for cleanup).
275  */
276 static struct TransmitCallbackContext *tcc_head;
277
278 /**
279  * Tail of the doubly-linked list (for cleanup).
280  */
281 static struct TransmitCallbackContext *tcc_tail;
282
283 /**
284  * Have we already cleaned up the TCCs and are hence no longer
285  * willing (or able) to transmit anything to anyone?
286  */
287 static int cleaning_done;
288
289 /**
290  * Handle for pending get request.
291  */
292 static struct GNUNET_STATISTICS_GetHandle *stat_get;
293
294 /**
295  * Handle to our server.
296  */
297 static struct GNUNET_SERVER_Handle *server;
298
299 /**
300  * Task that is used to remove expired entries from
301  * the datastore.  This task will schedule itself
302  * again automatically to always delete all expired
303  * content quickly.
304  *
305  * @param cls not used
306  */
307 static void
308 delete_expired (void *cls);
309
310
311 /**
312  * Iterate over the expired items stored in the datastore.
313  * Delete all expired items; once we have processed all
314  * expired items, re-schedule the "delete_expired" task.
315  *
316  * @param cls not used
317  * @param key key for the content
318  * @param size number of bytes in data
319  * @param data content stored
320  * @param type type of the content
321  * @param priority priority of the content
322  * @param anonymity anonymity-level for the content
323  * @param expiration expiration time for the content
324  * @param uid unique identifier for the datum;
325  *        maybe 0 if no unique identifier is available
326  *
327  * @return #GNUNET_SYSERR to abort the iteration, #GNUNET_OK to continue
328  *         (continue on call to "next", of course),
329  *         #GNUNET_NO to delete the item and continue (if supported)
330  */
331 static int
332 expired_processor (void *cls,
333                    const struct GNUNET_HashCode *key,
334                    uint32_t size,
335                    const void *data,
336                    enum GNUNET_BLOCK_Type type,
337                    uint32_t priority,
338                    uint32_t anonymity,
339                    struct GNUNET_TIME_Absolute expiration,
340                    uint64_t uid)
341 {
342   struct GNUNET_TIME_Absolute now;
343
344   if (key == NULL)
345   {
346     expired_kill_task =
347         GNUNET_SCHEDULER_add_delayed_with_priority (MAX_EXPIRE_DELAY,
348                                                     GNUNET_SCHEDULER_PRIORITY_IDLE,
349                                                     &delete_expired, NULL);
350     return GNUNET_SYSERR;
351   }
352   now = GNUNET_TIME_absolute_get ();
353   if (expiration.abs_value_us > now.abs_value_us)
354   {
355     /* finished processing */
356     expired_kill_task =
357         GNUNET_SCHEDULER_add_delayed_with_priority (MAX_EXPIRE_DELAY,
358                                                     GNUNET_SCHEDULER_PRIORITY_IDLE,
359                                                     &delete_expired, NULL);
360     return GNUNET_SYSERR;
361   }
362   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
363               "Deleting content `%s' of type %u that expired %s ago\n",
364               GNUNET_h2s (key), type,
365               GNUNET_STRINGS_relative_time_to_string (GNUNET_TIME_absolute_get_difference (expiration,
366                                                                                            now),
367                                                       GNUNET_YES));
368   min_expiration = now;
369   GNUNET_STATISTICS_update (stats,
370                             gettext_noop ("# bytes expired"),
371                             size,
372                             GNUNET_YES);
373   GNUNET_CONTAINER_bloomfilter_remove (filter, key);
374   expired_kill_task =
375       GNUNET_SCHEDULER_add_delayed_with_priority (MIN_EXPIRE_DELAY,
376                                                   GNUNET_SCHEDULER_PRIORITY_IDLE,
377                                                   &delete_expired, NULL);
378   return GNUNET_NO;
379 }
380
381
382 /**
383  * Task that is used to remove expired entries from
384  * the datastore.  This task will schedule itself
385  * again automatically to always delete all expired
386  * content quickly.
387  *
388  * @param cls not used
389  */
390 static void
391 delete_expired (void *cls)
392 {
393   expired_kill_task = NULL;
394   plugin->api->get_expiration (plugin->api->cls,
395                                &expired_processor,
396                                NULL);
397 }
398
399
400 /**
401  * An iterator over a set of items stored in the datastore
402  * that deletes until we're happy with respect to our quota.
403  *
404  * @param cls closure
405  * @param key key for the content
406  * @param size number of bytes in data
407  * @param data content stored
408  * @param type type of the content
409  * @param priority priority of the content
410  * @param anonymity anonymity-level for the content
411  * @param expiration expiration time for the content
412  * @param uid unique identifier for the datum;
413  *        maybe 0 if no unique identifier is available
414  *
415  * @return GNUNET_SYSERR to abort the iteration, GNUNET_OK to continue
416  *         (continue on call to "next", of course),
417  *         GNUNET_NO to delete the item and continue (if supported)
418  */
419 static int
420 quota_processor (void *cls, const struct GNUNET_HashCode * key, uint32_t size,
421                  const void *data, enum GNUNET_BLOCK_Type type,
422                  uint32_t priority, uint32_t anonymity,
423                  struct GNUNET_TIME_Absolute expiration, uint64_t uid)
424 {
425   unsigned long long *need = cls;
426
427   if (NULL == key)
428     return GNUNET_SYSERR;
429   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
430               "Deleting %llu bytes of low-priority (%u) content `%s' of type %u at %s prior to expiration (still trying to free another %llu bytes)\n",
431               (unsigned long long) (size + GNUNET_DATASTORE_ENTRY_OVERHEAD),
432               (unsigned int) priority,
433               GNUNET_h2s (key), type,
434               GNUNET_STRINGS_relative_time_to_string (GNUNET_TIME_absolute_get_remaining (expiration),
435                                                       GNUNET_YES),
436               *need);
437   if (size + GNUNET_DATASTORE_ENTRY_OVERHEAD > *need)
438     *need = 0;
439   else
440     *need -= size + GNUNET_DATASTORE_ENTRY_OVERHEAD;
441   if (priority > 0)
442     min_expiration = GNUNET_TIME_UNIT_FOREVER_ABS;
443   else
444     min_expiration = expiration;
445   GNUNET_STATISTICS_update (stats,
446                             gettext_noop ("# bytes purged (low-priority)"),
447                             size, GNUNET_YES);
448   GNUNET_CONTAINER_bloomfilter_remove (filter, key);
449   return GNUNET_NO;
450 }
451
452
453 /**
454  * Manage available disk space by running tasks
455  * that will discard content if necessary.  This
456  * function will be run whenever a request for
457  * "need" bytes of storage could only be satisfied
458  * by eating into the "cache" (and we want our cache
459  * space back).
460  *
461  * @param need number of bytes of content that were
462  *        placed into the "cache" (and hence the
463  *        number of bytes that should be removed).
464  */
465 static void
466 manage_space (unsigned long long need)
467 {
468   unsigned long long last;
469
470   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
471               "Asked to free up %llu bytes of cache space\n", need);
472   last = 0;
473   while ((need > 0) && (last != need))
474   {
475     last = need;
476     plugin->api->get_expiration (plugin->api->cls, &quota_processor, &need);
477   }
478 }
479
480
481 /**
482  * Function called to notify a client about the socket
483  * begin ready to queue more data.  "buf" will be
484  * NULL and "size" zero if the socket was closed for
485  * writing in the meantime.
486  *
487  * @param cls closure
488  * @param size number of bytes available in buf
489  * @param buf where the callee should write the message
490  * @return number of bytes written to buf
491  */
492 static size_t
493 transmit_callback (void *cls, size_t size, void *buf)
494 {
495   struct TransmitCallbackContext *tcc = cls;
496   size_t msize;
497
498   tcc->th = NULL;
499   GNUNET_CONTAINER_DLL_remove (tcc_head, tcc_tail, tcc);
500   msize = ntohs (tcc->msg->size);
501   if (size == 0)
502   {
503     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
504                 _("Transmission to client failed!\n"));
505     GNUNET_SERVER_receive_done (tcc->client, GNUNET_SYSERR);
506     GNUNET_SERVER_client_drop (tcc->client);
507     GNUNET_free (tcc->msg);
508     GNUNET_free (tcc);
509     return 0;
510   }
511   GNUNET_assert (size >= msize);
512   memcpy (buf, tcc->msg, msize);
513   GNUNET_SERVER_receive_done (tcc->client, GNUNET_OK);
514   GNUNET_SERVER_client_drop (tcc->client);
515   GNUNET_free (tcc->msg);
516   GNUNET_free (tcc);
517   return msize;
518 }
519
520
521 /**
522  * Transmit the given message to the client.
523  *
524  * @param client target of the message
525  * @param msg message to transmit, will be freed!
526  */
527 static void
528 transmit (struct GNUNET_SERVER_Client *client, struct GNUNET_MessageHeader *msg)
529 {
530   struct TransmitCallbackContext *tcc;
531
532   if (GNUNET_YES == cleaning_done)
533   {
534     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
535                 _("Shutdown in progress, aborting transmission.\n"));
536     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
537     GNUNET_free (msg);
538     return;
539   }
540   tcc = GNUNET_new (struct TransmitCallbackContext);
541   tcc->msg = msg;
542   tcc->client = client;
543   if (NULL ==
544       (tcc->th =
545        GNUNET_SERVER_notify_transmit_ready (client, ntohs (msg->size),
546                                             GNUNET_TIME_UNIT_FOREVER_REL,
547                                             &transmit_callback, tcc)))
548   {
549     GNUNET_break (0);
550     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
551     GNUNET_free (msg);
552     GNUNET_free (tcc);
553     return;
554   }
555   GNUNET_SERVER_client_keep (client);
556   GNUNET_CONTAINER_DLL_insert (tcc_head, tcc_tail, tcc);
557 }
558
559
560 /**
561  * Transmit a status code to the client.
562  *
563  * @param client receiver of the response
564  * @param code status code
565  * @param msg optional error message (can be NULL)
566  */
567 static void
568 transmit_status (struct GNUNET_SERVER_Client *client, int code, const char *msg)
569 {
570   struct StatusMessage *sm;
571   size_t slen;
572
573   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
574               "Transmitting `%s' message with value %d and message `%s'\n",
575               "STATUS", code, msg != NULL ? msg : "(none)");
576   slen = (msg == NULL) ? 0 : strlen (msg) + 1;
577   sm = GNUNET_malloc (sizeof (struct StatusMessage) + slen);
578   sm->header.size = htons (sizeof (struct StatusMessage) + slen);
579   sm->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_STATUS);
580   sm->status = htonl (code);
581   sm->min_expiration = GNUNET_TIME_absolute_hton (min_expiration);
582   if (slen > 0)
583     memcpy (&sm[1], msg, slen);
584   transmit (client, &sm->header);
585 }
586
587
588 /**
589  * Function that will transmit the given datastore entry
590  * to the client.
591  *
592  * @param cls closure, pointer to the client (of type GNUNET_SERVER_Client).
593  * @param key key for the content
594  * @param size number of bytes in data
595  * @param data content stored
596  * @param type type of the content
597  * @param priority priority of the content
598  * @param anonymity anonymity-level for the content
599  * @param expiration expiration time for the content
600  * @param uid unique identifier for the datum;
601  *        maybe 0 if no unique identifier is available
602  *
603  * @return GNUNET_SYSERR to abort the iteration, GNUNET_OK to continue,
604  *         GNUNET_NO to delete the item and continue (if supported)
605  */
606 static int
607 transmit_item (void *cls, const struct GNUNET_HashCode * key, uint32_t size,
608                const void *data, enum GNUNET_BLOCK_Type type, uint32_t priority,
609                uint32_t anonymity, struct GNUNET_TIME_Absolute expiration,
610                uint64_t uid)
611 {
612   struct GNUNET_SERVER_Client *client = cls;
613   struct GNUNET_MessageHeader *end;
614   struct DataMessage *dm;
615
616   if (key == NULL)
617   {
618     /* transmit 'DATA_END' */
619     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Transmitting `%s' message\n",
620                 "DATA_END");
621     end = GNUNET_new (struct GNUNET_MessageHeader);
622     end->size = htons (sizeof (struct GNUNET_MessageHeader));
623     end->type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END);
624     transmit (client, end);
625     GNUNET_SERVER_client_drop (client);
626     return GNUNET_OK;
627   }
628   GNUNET_assert (sizeof (struct DataMessage) + size <
629                  GNUNET_SERVER_MAX_MESSAGE_SIZE);
630   dm = GNUNET_malloc (sizeof (struct DataMessage) + size);
631   dm->header.size = htons (sizeof (struct DataMessage) + size);
632   dm->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_DATA);
633   dm->rid = htonl (0);
634   dm->size = htonl (size);
635   dm->type = htonl (type);
636   dm->priority = htonl (priority);
637   dm->anonymity = htonl (anonymity);
638   dm->replication = htonl (0);
639   dm->reserved = htonl (0);
640   dm->expiration = GNUNET_TIME_absolute_hton (expiration);
641   dm->uid = GNUNET_htonll (uid);
642   dm->key = *key;
643   memcpy (&dm[1], data, size);
644   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
645               "Transmitting `%s' message for `%s' of type %u with expiration %s (in: %s)\n",
646               "DATA", GNUNET_h2s (key), type,
647               GNUNET_STRINGS_absolute_time_to_string (expiration),
648               GNUNET_STRINGS_relative_time_to_string (GNUNET_TIME_absolute_get_remaining (expiration),
649                                                       GNUNET_YES));
650   GNUNET_STATISTICS_update (stats,
651                             gettext_noop ("# results found"),
652                             1,
653                             GNUNET_NO);
654   transmit (client, &dm->header);
655   GNUNET_SERVER_client_drop (client);
656   return GNUNET_OK;
657 }
658
659
660 /**
661  * Handle RESERVE-message.
662  *
663  * @param cls closure
664  * @param client identification of the client
665  * @param message the actual message
666  */
667 static void
668 handle_reserve (void *cls, struct GNUNET_SERVER_Client *client,
669                 const struct GNUNET_MessageHeader *message)
670 {
671   /**
672    * Static counter to produce reservation identifiers.
673    */
674   static int reservation_gen;
675
676   const struct ReserveMessage *msg = (const struct ReserveMessage *) message;
677   struct ReservationList *e;
678   unsigned long long used;
679   unsigned long long req;
680   uint64_t amount;
681   uint32_t entries;
682
683   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
684               "Processing RESERVE request\n");
685   amount = GNUNET_ntohll (msg->amount);
686   entries = ntohl (msg->entries);
687   used = payload + reserved;
688   req =
689       amount + ((unsigned long long) GNUNET_DATASTORE_ENTRY_OVERHEAD) * entries;
690   if (used + req > quota)
691   {
692     if (quota < used)
693       used = quota;             /* cheat a bit for error message (to avoid negative numbers) */
694     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
695                 _
696                 ("Insufficient space (%llu bytes are available) to satisfy `%s' request for %llu bytes\n"),
697                 quota - used, "RESERVE", req);
698     if (cache_size < req)
699     {
700       /* TODO: document this in the FAQ; essentially, if this
701        * message happens, the insertion request could be blocked
702        * by less-important content from migration because it is
703        * larger than 1/8th of the overall available space, and
704        * we only reserve 1/8th for "fresh" insertions */
705       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
706                   _
707                   ("The requested amount (%llu bytes) is larger than the cache size (%llu bytes)\n"),
708                   req, cache_size);
709       transmit_status (client, 0,
710                        gettext_noop
711                        ("Insufficient space to satisfy request and "
712                         "requested amount is larger than cache size"));
713     }
714     else
715     {
716       transmit_status (client, 0,
717                        gettext_noop ("Insufficient space to satisfy request"));
718     }
719     return;
720   }
721   reserved += req;
722   GNUNET_STATISTICS_set (stats,
723                          gettext_noop ("# reserved"),
724                          reserved,
725                          GNUNET_NO);
726   e = GNUNET_new (struct ReservationList);
727   e->next = reservations;
728   reservations = e;
729   e->client = client;
730   e->amount = amount;
731   e->entries = entries;
732   e->rid = ++reservation_gen;
733   if (reservation_gen < 0)
734     reservation_gen = 0;        /* wrap around */
735   transmit_status (client, e->rid, NULL);
736 }
737
738
739 /**
740  * Handle RELEASE_RESERVE-message.
741  *
742  * @param cls closure
743  * @param client identification of the client
744  * @param message the actual message
745  */
746 static void
747 handle_release_reserve (void *cls,
748                         struct GNUNET_SERVER_Client *client,
749                         const struct GNUNET_MessageHeader *message)
750 {
751   const struct ReleaseReserveMessage *msg =
752       (const struct ReleaseReserveMessage *) message;
753   struct ReservationList *pos;
754   struct ReservationList *prev;
755   struct ReservationList *next;
756   int rid = ntohl (msg->rid);
757   unsigned long long rem;
758
759   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
760               "Processing RELEASE_RESERVE request\n");
761   next = reservations;
762   prev = NULL;
763   while (NULL != (pos = next))
764   {
765     next = pos->next;
766     if (rid == pos->rid)
767     {
768       if (prev == NULL)
769         reservations = next;
770       else
771         prev->next = next;
772       rem =
773           pos->amount +
774           ((unsigned long long) GNUNET_DATASTORE_ENTRY_OVERHEAD) * pos->entries;
775       GNUNET_assert (reserved >= rem);
776       reserved -= rem;
777       GNUNET_STATISTICS_set (stats,
778                              gettext_noop ("# reserved"),
779                              reserved,
780                              GNUNET_NO);
781       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
782                   "Returning %llu remaining reserved bytes to storage pool\n",
783                   rem);
784       GNUNET_free (pos);
785       transmit_status (client, GNUNET_OK, NULL);
786       return;
787     }
788     prev = pos;
789   }
790   GNUNET_break (0);
791   transmit_status (client, GNUNET_SYSERR,
792                    gettext_noop ("Could not find matching reservation"));
793 }
794
795
796 /**
797  * Check that the given message is a valid data message.
798  *
799  * @return NULL if the message is not well-formed, otherwise the message
800  */
801 static const struct DataMessage *
802 check_data (const struct GNUNET_MessageHeader *message)
803 {
804   uint16_t size;
805   uint32_t dsize;
806   const struct DataMessage *dm;
807
808   size = ntohs (message->size);
809   if (size < sizeof (struct DataMessage))
810   {
811     GNUNET_break (0);
812     return NULL;
813   }
814   dm = (const struct DataMessage *) message;
815   dsize = ntohl (dm->size);
816   if (size != dsize + sizeof (struct DataMessage))
817   {
818     GNUNET_break (0);
819     return NULL;
820   }
821   return dm;
822 }
823
824
825 /**
826  * Context for a PUT request used to see if the content is
827  * already present.
828  */
829 struct PutContext
830 {
831   /**
832    * Client to notify on completion.
833    */
834   struct GNUNET_SERVER_Client *client;
835
836 #if ! HAVE_UNALIGNED_64_ACCESS
837   void *reserved;
838 #endif
839
840   /* followed by the 'struct DataMessage' */
841 };
842
843
844 /**
845  * Put continuation.
846  *
847  * @param cls closure
848  * @param key key for the item stored
849  * @param size size of the item stored
850  * @param status #GNUNET_OK or #GNUNET_SYSERROR
851  * @param msg error message on error
852  */
853 static void
854 put_continuation (void *cls,
855                   const struct GNUNET_HashCode *key,
856                   uint32_t size,
857                   int status,
858                   const char *msg)
859 {
860   struct PutContext *pc = cls;
861
862   if (GNUNET_OK == status)
863   {
864     GNUNET_STATISTICS_update (stats,
865                               gettext_noop ("# bytes stored"),
866                               size,
867                               GNUNET_YES);
868     GNUNET_CONTAINER_bloomfilter_add (filter, key);
869     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
870                 "Successfully stored %u bytes under key `%s'\n",
871                 size, GNUNET_h2s (key));
872   }
873   transmit_status (pc->client, status, msg);
874   GNUNET_SERVER_client_drop (pc->client);
875   GNUNET_free (pc);
876   if (quota - reserved - cache_size < payload)
877   {
878     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
879                 _("Need %llu bytes more space (%llu allowed, using %llu)\n"),
880                 (unsigned long long) size + GNUNET_DATASTORE_ENTRY_OVERHEAD,
881                 (unsigned long long) (quota - reserved - cache_size),
882                 (unsigned long long) payload);
883     manage_space (size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
884   }
885 }
886
887
888 /**
889  * Actually put the data message.
890  *
891  * @param pc put context
892  */
893 static void
894 execute_put (struct PutContext *pc)
895 {
896   const struct DataMessage *dm;
897
898   dm = (const struct DataMessage *) &pc[1];
899   plugin->api->put (plugin->api->cls, &dm->key, ntohl (dm->size), &dm[1],
900                     ntohl (dm->type), ntohl (dm->priority),
901                     ntohl (dm->anonymity), ntohl (dm->replication),
902                     GNUNET_TIME_absolute_ntoh (dm->expiration),
903                     &put_continuation, pc);
904 }
905
906
907 /**
908  *
909  * @param cls closure
910  * @param status #GNUNET_OK or #GNUNET_SYSERR
911  * @param msg error message on error
912  */
913 static void
914 check_present_continuation (void *cls,
915                             int status,
916                             const char *msg)
917 {
918   struct GNUNET_SERVER_Client *client = cls;
919
920   transmit_status (client, GNUNET_NO, NULL);
921   GNUNET_SERVER_client_drop (client);
922 }
923
924
925 /**
926  * Function that will check if the given datastore entry
927  * matches the put and if none match executes the put.
928  *
929  * @param cls closure, pointer to the client (of type `struct PutContext`).
930  * @param key key for the content
931  * @param size number of bytes in data
932  * @param data content stored
933  * @param type type of the content
934  * @param priority priority of the content
935  * @param anonymity anonymity-level for the content
936  * @param expiration expiration time for the content
937  * @param uid unique identifier for the datum;
938  *        maybe 0 if no unique identifier is available
939  * @return #GNUNET_OK usually
940  *         #GNUNET_NO to delete the item
941  */
942 static int
943 check_present (void *cls,
944                const struct GNUNET_HashCode *key,
945                uint32_t size,
946                const void *data,
947                enum GNUNET_BLOCK_Type type,
948                uint32_t priority,
949                uint32_t anonymity,
950                struct GNUNET_TIME_Absolute expiration,
951                uint64_t uid)
952 {
953   struct PutContext *pc = cls;
954   const struct DataMessage *dm;
955
956   dm = (const struct DataMessage *) &pc[1];
957   if (key == NULL)
958   {
959     execute_put (pc);
960     return GNUNET_OK;
961   }
962   if ( (GNUNET_BLOCK_TYPE_FS_DBLOCK == type) ||
963        (GNUNET_BLOCK_TYPE_FS_IBLOCK == type) ||
964        ( (size == ntohl (dm->size)) &&
965          (0 == memcmp (&dm[1], data, size)) ) )
966   {
967     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
968                 "Result already present in datastore\n");
969     /* FIXME: change API to allow increasing 'replication' counter */
970     if ((ntohl (dm->priority) > 0) ||
971         (GNUNET_TIME_absolute_ntoh (dm->expiration).abs_value_us >
972          expiration.abs_value_us))
973       plugin->api->update (plugin->api->cls,
974                            uid,
975                            (int32_t) ntohl (dm->priority),
976                            GNUNET_TIME_absolute_ntoh (dm->expiration),
977                            &check_present_continuation,
978                            pc->client);
979     else
980     {
981       transmit_status (pc->client, GNUNET_NO, NULL);
982       GNUNET_SERVER_client_drop (pc->client);
983     }
984     GNUNET_free (pc);
985   }
986   else
987   {
988     execute_put (pc);
989   }
990   return GNUNET_OK;
991 }
992
993
994 /**
995  * Handle PUT-message.
996  *
997  * @param cls closure
998  * @param client identification of the client
999  * @param message the actual message
1000  */
1001 static void
1002 handle_put (void *cls,
1003             struct GNUNET_SERVER_Client *client,
1004             const struct GNUNET_MessageHeader *message)
1005 {
1006   const struct DataMessage *dm = check_data (message);
1007   int rid;
1008   struct ReservationList *pos;
1009   struct PutContext *pc;
1010   struct GNUNET_HashCode vhash;
1011   uint32_t size;
1012
1013   if ((dm == NULL) || (ntohl (dm->type) == 0))
1014   {
1015     GNUNET_break (0);
1016     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1017     return;
1018   }
1019   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1020               "Processing PUT request for `%s' of type %u\n",
1021               GNUNET_h2s (&dm->key),
1022               ntohl (dm->type));
1023   rid = ntohl (dm->rid);
1024   size = ntohl (dm->size);
1025   if (rid > 0)
1026   {
1027     pos = reservations;
1028     while ((NULL != pos) && (rid != pos->rid))
1029       pos = pos->next;
1030     GNUNET_break (pos != NULL);
1031     if (NULL != pos)
1032     {
1033       GNUNET_break (pos->entries > 0);
1034       GNUNET_break (pos->amount >= size);
1035       pos->entries--;
1036       pos->amount -= size;
1037       reserved -= (size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
1038       GNUNET_STATISTICS_set (stats,
1039                              gettext_noop ("# reserved"),
1040                              reserved,
1041                              GNUNET_NO);
1042     }
1043   }
1044   pc = GNUNET_malloc (sizeof (struct PutContext) + size +
1045                       sizeof (struct DataMessage));
1046   pc->client = client;
1047   GNUNET_SERVER_client_keep (client);
1048   memcpy (&pc[1], dm, size + sizeof (struct DataMessage));
1049   if (GNUNET_YES == GNUNET_CONTAINER_bloomfilter_test (filter, &dm->key))
1050   {
1051     GNUNET_CRYPTO_hash (&dm[1], size, &vhash);
1052     plugin->api->get_key (plugin->api->cls,
1053                           0,
1054                           &dm->key,
1055                           &vhash,
1056                           ntohl (dm->type),
1057                           &check_present,
1058                           pc);
1059     return;
1060   }
1061   execute_put (pc);
1062 }
1063
1064
1065 /**
1066  * Handle GET-message.
1067  *
1068  * @param cls closure
1069  * @param client identification of the client
1070  * @param message the actual message
1071  */
1072 static void
1073 handle_get (void *cls,
1074             struct GNUNET_SERVER_Client *client,
1075             const struct GNUNET_MessageHeader *message)
1076 {
1077   const struct GetMessage *msg;
1078   uint16_t size;
1079
1080   size = ntohs (message->size);
1081   if ((size != sizeof (struct GetMessage)) &&
1082       (size != sizeof (struct GetMessage) - sizeof (struct GNUNET_HashCode)))
1083   {
1084     GNUNET_break (0);
1085     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1086     return;
1087   }
1088   msg = (const struct GetMessage *) message;
1089   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1090               "Processing GET request for `%s' of type %u\n",
1091               GNUNET_h2s (&msg->key),
1092               ntohl (msg->type));
1093   GNUNET_STATISTICS_update (stats,
1094                             gettext_noop ("# GET requests received"),
1095                             1,
1096                             GNUNET_NO);
1097   GNUNET_SERVER_client_keep (client);
1098   if ( (size == sizeof (struct GetMessage)) &&
1099        (GNUNET_YES != GNUNET_CONTAINER_bloomfilter_test (filter, &msg->key)) )
1100   {
1101     /* don't bother database... */
1102     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1103                 "Empty result set for GET request for `%s' (bloomfilter).\n",
1104                 GNUNET_h2s (&msg->key));
1105     GNUNET_STATISTICS_update (stats,
1106                               gettext_noop
1107                               ("# requests filtered by bloomfilter"),
1108                               1,
1109                               GNUNET_NO);
1110     transmit_item (client, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS,
1111                    0);
1112     return;
1113   }
1114   plugin->api->get_key (plugin->api->cls, GNUNET_ntohll (msg->offset),
1115                         ((size ==
1116                           sizeof (struct GetMessage)) ? &msg->key : NULL), NULL,
1117                         ntohl (msg->type), &transmit_item, client);
1118 }
1119
1120
1121 /**
1122  * Function called with the result of an update operation.
1123  *
1124  * @param cls closure
1125  * @param status #GNUNET_OK or #GNUNET_SYSERR
1126  * @param msg error message on error
1127  */
1128 static void
1129 update_continuation (void *cls,
1130                      int status,
1131                      const char *msg)
1132 {
1133   struct GNUNET_SERVER_Client *client = cls;
1134
1135   transmit_status (client, status, msg);
1136   GNUNET_SERVER_client_drop (client);
1137 }
1138
1139
1140 /**
1141  * Handle UPDATE-message.
1142  *
1143  * @param cls closure
1144  * @param client identification of the client
1145  * @param message the actual message
1146  */
1147 static void
1148 handle_update (void *cls,
1149                struct GNUNET_SERVER_Client *client,
1150                const struct GNUNET_MessageHeader *message)
1151 {
1152   const struct UpdateMessage *msg;
1153
1154   GNUNET_STATISTICS_update (stats,
1155                             gettext_noop ("# UPDATE requests received"),
1156                             1,
1157                             GNUNET_NO);
1158   msg = (const struct UpdateMessage *) message;
1159   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1160               "Processing UPDATE request for %llu\n",
1161               (unsigned long long) GNUNET_ntohll (msg->uid));
1162   GNUNET_SERVER_client_keep (client);
1163   plugin->api->update (plugin->api->cls,
1164                        GNUNET_ntohll (msg->uid),
1165                        (int32_t) ntohl (msg->priority),
1166                        GNUNET_TIME_absolute_ntoh (msg->expiration),
1167                        &update_continuation, client);
1168 }
1169
1170
1171 /**
1172  * Handle GET_REPLICATION-message.
1173  *
1174  * @param cls closure
1175  * @param client identification of the client
1176  * @param message the actual message
1177  */
1178 static void
1179 handle_get_replication (void *cls,
1180                         struct GNUNET_SERVER_Client *client,
1181                         const struct GNUNET_MessageHeader *message)
1182 {
1183   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1184               "Processing `%s' request\n",
1185               "GET_REPLICATION");
1186   GNUNET_STATISTICS_update (stats,
1187                             gettext_noop ("# GET REPLICATION requests received"),
1188                             1,
1189                             GNUNET_NO);
1190   GNUNET_SERVER_client_keep (client);
1191   plugin->api->get_replication (plugin->api->cls,
1192                                 &transmit_item, client);
1193 }
1194
1195
1196 /**
1197  * Handle GET_ZERO_ANONYMITY-message.
1198  *
1199  * @param cls closure
1200  * @param client identification of the client
1201  * @param message the actual message
1202  */
1203 static void
1204 handle_get_zero_anonymity (void *cls,
1205                            struct GNUNET_SERVER_Client *client,
1206                            const struct GNUNET_MessageHeader *message)
1207 {
1208   const struct GetZeroAnonymityMessage *msg =
1209       (const struct GetZeroAnonymityMessage *) message;
1210   enum GNUNET_BLOCK_Type type;
1211
1212   type = (enum GNUNET_BLOCK_Type) ntohl (msg->type);
1213   if (type == GNUNET_BLOCK_TYPE_ANY)
1214   {
1215     GNUNET_break (0);
1216     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1217     return;
1218   }
1219   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1220               "Processing `%s' request\n",
1221               "GET_ZERO_ANONYMITY");
1222   GNUNET_STATISTICS_update (stats,
1223                             gettext_noop ("# GET ZERO ANONYMITY requests received"),
1224                             1,
1225                             GNUNET_NO);
1226   GNUNET_SERVER_client_keep (client);
1227   plugin->api->get_zero_anonymity (plugin->api->cls,
1228                                    GNUNET_ntohll (msg->offset),
1229                                    type,
1230                                    &transmit_item, client);
1231 }
1232
1233
1234 /**
1235  * Callback function that will cause the item that is passed
1236  * in to be deleted (by returning #GNUNET_NO).
1237  *
1238  * @param cls closure
1239  * @param key key for the content
1240  * @param size number of bytes in data
1241  * @param data content stored
1242  * @param type type of the content
1243  * @param priority priority of the content
1244  * @param anonymity anonymity-level for the content
1245  * @param expiration expiration time for the content
1246  * @param uid unique identifier for the datum
1247  * @return #GNUNET_OK to keep the item
1248  *         #GNUNET_NO to delete the item
1249  */
1250 static int
1251 remove_callback (void *cls,
1252                  const struct GNUNET_HashCode *key,
1253                  uint32_t size,
1254                  const void *data,
1255                  enum GNUNET_BLOCK_Type type,
1256                  uint32_t priority,
1257                  uint32_t anonymity,
1258                  struct GNUNET_TIME_Absolute expiration,
1259                  uint64_t uid)
1260 {
1261   struct GNUNET_SERVER_Client *client = cls;
1262
1263   if (NULL == key)
1264   {
1265     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1266                 "No further matches for REMOVE request.\n");
1267     transmit_status (client,
1268                      GNUNET_NO,
1269                      _("Content not found"));
1270     GNUNET_SERVER_client_drop (client);
1271     return GNUNET_OK;           /* last item */
1272   }
1273   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1274               "Item %llu matches REMOVE request for key `%s' and type %u.\n",
1275               (unsigned long long) uid,
1276               GNUNET_h2s (key),
1277               type);
1278   GNUNET_STATISTICS_update (stats,
1279                             gettext_noop ("# bytes removed (explicit request)"),
1280                             size,
1281                             GNUNET_YES);
1282   GNUNET_CONTAINER_bloomfilter_remove (filter, key);
1283   transmit_status (client, GNUNET_OK, NULL);
1284   GNUNET_SERVER_client_drop (client);
1285   return GNUNET_NO;
1286 }
1287
1288
1289 /**
1290  * Handle REMOVE-message.
1291  *
1292  * @param cls closure
1293  * @param client identification of the client
1294  * @param message the actual message
1295  */
1296 static void
1297 handle_remove (void *cls,
1298                struct GNUNET_SERVER_Client *client,
1299                const struct GNUNET_MessageHeader *message)
1300 {
1301   const struct DataMessage *dm = check_data (message);
1302   struct GNUNET_HashCode vhash;
1303
1304   if (NULL == dm)
1305   {
1306     GNUNET_break (0);
1307     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1308     return;
1309   }
1310   GNUNET_STATISTICS_update (stats,
1311                             gettext_noop ("# REMOVE requests received"),
1312                             1, GNUNET_NO);
1313   GNUNET_SERVER_client_keep (client);
1314   GNUNET_CRYPTO_hash (&dm[1],
1315                       ntohl (dm->size),
1316                       &vhash);
1317   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1318               "Processing REMOVE request for `%s' of type %u\n",
1319               GNUNET_h2s (&dm->key),
1320               ntohl (dm->type));
1321   plugin->api->get_key (plugin->api->cls,
1322                         0,
1323                         &dm->key,
1324                         &vhash,
1325                         (enum GNUNET_BLOCK_Type) ntohl (dm->type),
1326                         &remove_callback, client);
1327 }
1328
1329
1330 /**
1331  * Handle DROP-message.
1332  *
1333  * @param cls closure
1334  * @param client identification of the client
1335  * @param message the actual message
1336  */
1337 static void
1338 handle_drop (void *cls,
1339              struct GNUNET_SERVER_Client *client,
1340              const struct GNUNET_MessageHeader *message)
1341 {
1342   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1343               "Processing DROP request\n");
1344   do_drop = GNUNET_YES;
1345   GNUNET_SERVER_receive_done (client, GNUNET_OK);
1346 }
1347
1348
1349 /**
1350  * Function called by plugins to notify us about a
1351  * change in their disk utilization.
1352  *
1353  * @param cls closure (NULL)
1354  * @param delta change in disk utilization,
1355  *        0 for "reset to empty"
1356  */
1357 static void
1358 disk_utilization_change_cb (void *cls,
1359                             int delta)
1360 {
1361   if ((delta < 0) && (payload < -delta))
1362   {
1363     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1364                 _("Datastore payload must have been inaccurate (%lld < %lld). Recomputing it.\n"),
1365                 (long long) payload,
1366                 (long long) -delta);
1367     plugin->api->estimate_size (plugin->api->cls, &payload);
1368     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1369                 _("New payload: %lld\n"),
1370                 (long long) payload);
1371      sync_stats ();
1372     return;
1373   }
1374   payload += delta;
1375   last_sync++;
1376   if (last_sync >= MAX_STAT_SYNC_LAG)
1377     sync_stats ();
1378 }
1379
1380
1381 /**
1382  * Callback function to process statistic values.
1383  *
1384  * @param cls closure (struct Plugin*)
1385  * @param subsystem name of subsystem that created the statistic
1386  * @param name the name of the datum
1387  * @param value the current value
1388  * @param is_persistent #GNUNET_YES if the value is persistent, #GNUNET_NO if not
1389  * @return #GNUNET_OK to continue, #GNUNET_SYSERR to abort iteration
1390  */
1391 static int
1392 process_stat_in (void *cls,
1393                  const char *subsystem,
1394                  const char *name,
1395                  uint64_t value,
1396                  int is_persistent)
1397 {
1398   GNUNET_assert (GNUNET_NO == stats_worked);
1399   stats_worked = GNUNET_YES;
1400   payload += value;
1401   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1402               "Notification from statistics about existing payload (%llu), new payload is %llu\n",
1403               (unsigned long long) value,
1404               (unsigned long long) payload);
1405   return GNUNET_OK;
1406 }
1407
1408
1409 /**
1410  * Load the datastore plugin.
1411  */
1412 static struct DatastorePlugin *
1413 load_plugin ()
1414 {
1415   struct DatastorePlugin *ret;
1416   char *libname;
1417
1418   ret = GNUNET_new (struct DatastorePlugin);
1419   ret->env.cfg = cfg;
1420   ret->env.duc = &disk_utilization_change_cb;
1421   ret->env.cls = NULL;
1422   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1423               _("Loading `%s' datastore plugin\n"),
1424               plugin_name);
1425   GNUNET_asprintf (&libname,
1426                    "libgnunet_plugin_datastore_%s",
1427                    plugin_name);
1428   ret->short_name = GNUNET_strdup (plugin_name);
1429   ret->lib_name = libname;
1430   ret->api = GNUNET_PLUGIN_load (libname, &ret->env);
1431   if (NULL == ret->api)
1432   {
1433     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1434                 _("Failed to load datastore plugin for `%s'\n"),
1435                 plugin_name);
1436     GNUNET_free (ret->short_name);
1437     GNUNET_free (libname);
1438     GNUNET_free (ret);
1439     return NULL;
1440   }
1441   return ret;
1442 }
1443
1444
1445 /**
1446  * Function called when the service shuts
1447  * down.  Unloads our datastore plugin.
1448  *
1449  * @param plug plugin to unload
1450  */
1451 static void
1452 unload_plugin (struct DatastorePlugin *plug)
1453 {
1454   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1455               "Datastore service is unloading plugin...\n");
1456   GNUNET_break (NULL == GNUNET_PLUGIN_unload (plug->lib_name, plug->api));
1457   GNUNET_free (plug->lib_name);
1458   GNUNET_free (plug->short_name);
1459   GNUNET_free (plug);
1460 }
1461
1462
1463 static const struct GNUNET_SERVER_MessageHandler handlers[] = {
1464   {&handle_reserve, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_RESERVE,
1465    sizeof (struct ReserveMessage)},
1466   {&handle_release_reserve, NULL,
1467    GNUNET_MESSAGE_TYPE_DATASTORE_RELEASE_RESERVE,
1468    sizeof (struct ReleaseReserveMessage)},
1469   {&handle_put, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_PUT, 0},
1470   {&handle_update, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_UPDATE,
1471    sizeof (struct UpdateMessage)},
1472   {&handle_get, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_GET, 0},
1473   {&handle_get_replication, NULL,
1474    GNUNET_MESSAGE_TYPE_DATASTORE_GET_REPLICATION,
1475    sizeof (struct GNUNET_MessageHeader)},
1476   {&handle_get_zero_anonymity, NULL,
1477    GNUNET_MESSAGE_TYPE_DATASTORE_GET_ZERO_ANONYMITY,
1478    sizeof (struct GetZeroAnonymityMessage)},
1479   {&handle_remove, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE, 0},
1480   {&handle_drop, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_DROP,
1481    sizeof (struct GNUNET_MessageHeader)},
1482   {NULL, NULL, 0, 0}
1483 };
1484
1485
1486 /**
1487  * Adds a given @a key to the bloomfilter in @a cls @a count times.
1488  *
1489  * @param cls the bloomfilter
1490  * @param key key to add
1491  * @param count number of times to add key
1492  */
1493 static void
1494 add_key_to_bloomfilter (void *cls,
1495                         const struct GNUNET_HashCode *key,
1496                         unsigned int count)
1497 {
1498   struct GNUNET_CONTAINER_BloomFilter *bf = cls;
1499
1500   if (NULL == key)
1501   {
1502     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1503                 _("Bloomfilter construction complete.\n"));
1504     GNUNET_SERVER_add_handlers (server, handlers);
1505     GNUNET_SERVER_resume (server);
1506     expired_kill_task
1507       = GNUNET_SCHEDULER_add_with_priority (GNUNET_SCHEDULER_PRIORITY_IDLE,
1508                                             &delete_expired,
1509                                             NULL);
1510     return;
1511   }
1512
1513   while (0 < count--)
1514     GNUNET_CONTAINER_bloomfilter_add (bf, key);
1515 }
1516
1517
1518 /**
1519  * We finished receiving the statistic.  Initialize the plugin; if
1520  * loading the statistic failed, run the estimator.
1521  *
1522  * @param cls NULL
1523  * @param success #GNUNET_NO if we failed to read the stat
1524  */
1525 static void
1526 process_stat_done (void *cls,
1527                    int success)
1528 {
1529
1530   stat_get = NULL;
1531   plugin = load_plugin ();
1532   if (NULL == plugin)
1533   {
1534     GNUNET_CONTAINER_bloomfilter_free (filter);
1535     filter = NULL;
1536     if (NULL != stats)
1537     {
1538       GNUNET_STATISTICS_destroy (stats, GNUNET_YES);
1539       stats = NULL;
1540     }
1541     return;
1542   }
1543   if (GNUNET_NO == stats_worked)
1544   {
1545     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1546                 "Failed to obtain value from statistics service, recomputing it\n");
1547     plugin->api->estimate_size (plugin->api->cls,
1548                                 &payload);
1549   }
1550   if (GNUNET_YES == refresh_bf)
1551   {
1552     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1553                 _("Rebuilding bloomfilter.  Please be patient.\n"));
1554     if (NULL != plugin->api->get_keys)
1555     {
1556       plugin->api->get_keys (plugin->api->cls,
1557                              &add_key_to_bloomfilter,
1558                              filter);
1559       return;
1560     }
1561     else
1562       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1563                   _("Plugin does not support get_keys function. Please fix!\n"));
1564
1565     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1566                 _("Bloomfilter construction complete.\n"));
1567   }
1568
1569   GNUNET_SERVER_add_handlers (server, handlers);
1570   GNUNET_SERVER_resume (server);
1571   expired_kill_task
1572     = GNUNET_SCHEDULER_add_with_priority (GNUNET_SCHEDULER_PRIORITY_IDLE,
1573                                           &delete_expired,
1574                                           NULL);
1575 }
1576
1577
1578 /**
1579  * Task run during shutdown.
1580  */
1581 static void
1582 cleaning_task (void *cls)
1583 {
1584   struct TransmitCallbackContext *tcc;
1585
1586   cleaning_done = GNUNET_YES;
1587   while (NULL != (tcc = tcc_head))
1588   {
1589     GNUNET_CONTAINER_DLL_remove (tcc_head, tcc_tail, tcc);
1590     if (tcc->th != NULL)
1591     {
1592       GNUNET_SERVER_notify_transmit_ready_cancel (tcc->th);
1593       GNUNET_SERVER_client_drop (tcc->client);
1594     }
1595     GNUNET_free (tcc->msg);
1596     GNUNET_free (tcc);
1597   }
1598   if (NULL != expired_kill_task)
1599   {
1600     GNUNET_SCHEDULER_cancel (expired_kill_task);
1601     expired_kill_task = NULL;
1602   }
1603   if (GNUNET_YES == do_drop)
1604     plugin->api->drop (plugin->api->cls);
1605   if (NULL != plugin)
1606   {
1607     unload_plugin (plugin);
1608     plugin = NULL;
1609   }
1610   if (NULL != filter)
1611   {
1612     GNUNET_CONTAINER_bloomfilter_free (filter);
1613     filter = NULL;
1614   }
1615   if (NULL != stat_get)
1616   {
1617     GNUNET_STATISTICS_get_cancel (stat_get);
1618     stat_get = NULL;
1619   }
1620   GNUNET_free_non_null (plugin_name);
1621   plugin_name = NULL;
1622   if (last_sync > 0)
1623     sync_stats ();
1624   if (NULL != stats)
1625   {
1626     GNUNET_STATISTICS_destroy (stats, GNUNET_YES);
1627     stats = NULL;
1628   }
1629   GNUNET_free (quota_stat_name);
1630   quota_stat_name = NULL;
1631 }
1632
1633
1634 /**
1635  * Function that removes all active reservations made
1636  * by the given client and releases the space for other
1637  * requests.
1638  *
1639  * @param cls closure
1640  * @param client identification of the client
1641  */
1642 static void
1643 cleanup_reservations (void *cls,
1644                       struct GNUNET_SERVER_Client *client)
1645 {
1646   struct ReservationList *pos;
1647   struct ReservationList *prev;
1648   struct ReservationList *next;
1649
1650   if (NULL == client)
1651     return;
1652   prev = NULL;
1653   pos = reservations;
1654   while (NULL != pos)
1655   {
1656     next = pos->next;
1657     if (pos->client == client)
1658     {
1659       if (NULL == prev)
1660         reservations = next;
1661       else
1662         prev->next = next;
1663       reserved -= pos->amount + pos->entries * GNUNET_DATASTORE_ENTRY_OVERHEAD;
1664       GNUNET_free (pos);
1665     }
1666     else
1667     {
1668       prev = pos;
1669     }
1670     pos = next;
1671   }
1672   GNUNET_STATISTICS_set (stats,
1673                          gettext_noop ("# reserved"),
1674                          reserved,
1675                          GNUNET_NO);
1676 }
1677
1678
1679 /**
1680  * Process datastore requests.
1681  *
1682  * @param cls closure
1683  * @param serv the initialized server
1684  * @param c configuration to use
1685  */
1686 static void
1687 run (void *cls,
1688      struct GNUNET_SERVER_Handle *serv,
1689      const struct GNUNET_CONFIGURATION_Handle *c)
1690 {
1691   char *fn;
1692   char *pfn;
1693   unsigned int bf_size;
1694
1695   server = serv;
1696   cfg = c;
1697   if (GNUNET_OK !=
1698       GNUNET_CONFIGURATION_get_value_string (cfg,
1699                                              "DATASTORE",
1700                                              "DATABASE",
1701                                              &plugin_name))
1702   {
1703     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1704                 _("No `%s' specified for `%s' in configuration!\n"),
1705                 "DATABASE",
1706                 "DATASTORE");
1707     return;
1708   }
1709   GNUNET_asprintf (&quota_stat_name,
1710                    _("# bytes used in file-sharing datastore `%s'"),
1711                    plugin_name);
1712   if (GNUNET_OK !=
1713       GNUNET_CONFIGURATION_get_value_size (cfg, "DATASTORE", "QUOTA", &quota))
1714   {
1715     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1716                 _("No `%s' specified for `%s' in configuration!\n"),
1717                 "QUOTA",
1718                 "DATASTORE");
1719     return;
1720   }
1721   stats = GNUNET_STATISTICS_create ("datastore", cfg);
1722   GNUNET_STATISTICS_set (stats, gettext_noop ("# quota"), quota, GNUNET_NO);
1723   cache_size = quota / 8;       /* Or should we make this an option? */
1724   GNUNET_STATISTICS_set (stats, gettext_noop ("# cache size"), cache_size,
1725                          GNUNET_NO);
1726   if (quota / (32 * 1024LL) > (1 << 31))
1727     bf_size = (1 << 31);          /* absolute limit: ~2 GB, beyond that BF just won't help anyway */
1728   else
1729     bf_size = quota / (32 * 1024LL);         /* 8 bit per entry, 1 bit per 32 kb in DB */
1730   fn = NULL;
1731   if ((GNUNET_OK !=
1732        GNUNET_CONFIGURATION_get_value_filename (cfg,
1733                                                 "DATASTORE",
1734                                                 "BLOOMFILTER",
1735                                                 &fn)) ||
1736       (GNUNET_OK != GNUNET_DISK_directory_create_for_file (fn)))
1737   {
1738     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1739                 _("Could not use specified filename `%s' for bloomfilter.\n"),
1740                 NULL != fn ? fn : "");
1741     GNUNET_free_non_null (fn);
1742     fn = NULL;
1743   }
1744   if (NULL != fn)
1745   {
1746     GNUNET_asprintf (&pfn, "%s.%s", fn, plugin_name);
1747     if (GNUNET_YES == GNUNET_DISK_file_test (pfn))
1748     {
1749       filter = GNUNET_CONTAINER_bloomfilter_load (pfn, bf_size, 5);        /* approx. 3% false positives at max use */
1750       if (NULL == filter)
1751       {
1752         /* file exists but not valid, remove and try again, but refresh */
1753         if (0 != UNLINK (pfn))
1754         {
1755           /* failed to remove, run without file */
1756           GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1757                       _("Failed to remove bogus bloomfilter file `%s'\n"),
1758                       pfn);
1759           GNUNET_free (pfn);
1760           pfn = NULL;
1761           filter = GNUNET_CONTAINER_bloomfilter_load (NULL, bf_size, 5);        /* approx. 3% false positives at max use */
1762           refresh_bf = GNUNET_YES;
1763         }
1764         else
1765         {
1766           /* try again after remove */
1767           filter = GNUNET_CONTAINER_bloomfilter_load (pfn, bf_size, 5);        /* approx. 3% false positives at max use */
1768           refresh_bf = GNUNET_YES;
1769           if (NULL == filter)
1770           {
1771             /* failed yet again, give up on using file */
1772             GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1773                         _("Failed to remove bogus bloomfilter file `%s'\n"),
1774                         pfn);
1775             GNUNET_free (pfn);
1776             pfn = NULL;
1777             filter = GNUNET_CONTAINER_bloomfilter_init (NULL, bf_size, 5);        /* approx. 3% false positives at max use */
1778           }
1779         }
1780       }
1781       else
1782       {
1783         /* normal case: have an existing valid bf file, no need to refresh */
1784         refresh_bf = GNUNET_NO;
1785       }
1786     }
1787     else
1788     {
1789       filter = GNUNET_CONTAINER_bloomfilter_load (pfn, bf_size, 5);        /* approx. 3% false positives at max use */
1790       refresh_bf = GNUNET_YES;
1791     }
1792     GNUNET_free (pfn);
1793   }
1794   else
1795   {
1796     filter = GNUNET_CONTAINER_bloomfilter_init (NULL, bf_size, 5);      /* approx. 3% false positives at max use */
1797     refresh_bf = GNUNET_YES;
1798   }
1799   GNUNET_free_non_null (fn);
1800   if (NULL == filter)
1801   {
1802     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1803                 _("Failed to initialize bloomfilter.\n"));
1804     if (NULL != stats)
1805     {
1806       GNUNET_STATISTICS_destroy (stats, GNUNET_YES);
1807       stats = NULL;
1808     }
1809     return;
1810   }
1811   GNUNET_SERVER_suspend (server);
1812   stat_get =
1813       GNUNET_STATISTICS_get (stats,
1814                              "datastore",
1815                              quota_stat_name,
1816                              GNUNET_TIME_UNIT_SECONDS,
1817                              &process_stat_done,
1818                              &process_stat_in,
1819                              NULL);
1820   if (NULL == stat_get)
1821     process_stat_done (NULL, GNUNET_SYSERR);
1822   GNUNET_SERVER_disconnect_notify (server,
1823                                    &cleanup_reservations,
1824                                    NULL);
1825   GNUNET_SCHEDULER_add_shutdown (&cleaning_task,
1826                                  NULL);
1827 }
1828
1829
1830 /**
1831  * The main function for the datastore service.
1832  *
1833  * @param argc number of arguments from the command line
1834  * @param argv command line arguments
1835  * @return 0 ok, 1 on error
1836  */
1837 int
1838 main (int argc,
1839       char *const *argv)
1840 {
1841   int ret;
1842
1843   ret =
1844       (GNUNET_OK ==
1845        GNUNET_SERVICE_run (argc, argv, "datastore",
1846                            GNUNET_SERVICE_OPTION_NONE,
1847                            &run, NULL)) ? 0 : 1;
1848   return ret;
1849 }
1850
1851
1852 /* end of gnunet-service-datastore.c */