update statistics API to use new MQ API style, also get rid of timeout argument
[oweals/gnunet.git] / src / datastore / gnunet-service-datastore.c
1 /*
2      This file is part of GNUnet
3      Copyright (C) 2004-2014 GNUnet e.V.
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18      Boston, MA 02110-1301, USA.
19 */
20
21 /**
22  * @file datastore/gnunet-service-datastore.c
23  * @brief Management for the datastore for files stored on a GNUnet node
24  * @author Christian Grothoff
25  */
26
27 #include "platform.h"
28 #include "gnunet_util_lib.h"
29 #include "gnunet_protocols.h"
30 #include "gnunet_statistics_service.h"
31 #include "gnunet_datastore_plugin.h"
32 #include "datastore.h"
33
34 /**
35  * How many messages do we queue at most per client?
36  */
37 #define MAX_PENDING 1024
38
39 /**
40  * How long are we at most keeping "expired" content
41  * past the expiration date in the database?
42  */
43 #define MAX_EXPIRE_DELAY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 15)
44
45 /**
46  * How fast are we allowed to query the database for deleting
47  * expired content? (1 item per second).
48  */
49 #define MIN_EXPIRE_DELAY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 1)
50
51 /**
52  * Name under which we store current space consumption.
53  */
54 static char *quota_stat_name;
55
56 /**
57  * Task to timeout stat GET.
58  */
59 static struct GNUNET_SCHEDULER_Task *stat_timeout_task;
60
61 /**
62  * After how many payload-changing operations
63  * do we sync our statistics?
64  */
65 #define MAX_STAT_SYNC_LAG 50
66
67
68 /**
69  * Our datastore plugin.
70  */
71 struct DatastorePlugin
72 {
73
74   /**
75    * API of the transport as returned by the plugin's
76    * initialization function.
77    */
78   struct GNUNET_DATASTORE_PluginFunctions *api;
79
80   /**
81    * Short name for the plugin (i.e. "sqlite").
82    */
83   char *short_name;
84
85   /**
86    * Name of the library (i.e. "gnunet_plugin_datastore_sqlite").
87    */
88   char *lib_name;
89
90   /**
91    * Environment this transport service is using
92    * for this plugin.
93    */
94   struct GNUNET_DATASTORE_PluginEnvironment env;
95
96 };
97
98
99 /**
100  * Linked list of active reservations.
101  */
102 struct ReservationList
103 {
104
105   /**
106    * This is a linked list.
107    */
108   struct ReservationList *next;
109
110   /**
111    * Client that made the reservation.
112    */
113   struct GNUNET_SERVER_Client *client;
114
115   /**
116    * Number of bytes (still) reserved.
117    */
118   uint64_t amount;
119
120   /**
121    * Number of items (still) reserved.
122    */
123   uint64_t entries;
124
125   /**
126    * Reservation identifier.
127    */
128   int32_t rid;
129
130 };
131
132
133
134 /**
135  * Our datastore plugin (NULL if not available).
136  */
137 static struct DatastorePlugin *plugin;
138
139 /**
140  * Linked list of space reservations made by clients.
141  */
142 static struct ReservationList *reservations;
143
144 /**
145  * Bloomfilter to quickly tell if we don't have the content.
146  */
147 static struct GNUNET_CONTAINER_BloomFilter *filter;
148
149 /**
150  * Name of our plugin.
151  */
152 static char *plugin_name;
153
154 /**
155  * Our configuration.
156  */
157 static const struct GNUNET_CONFIGURATION_Handle *cfg;
158
159 /**
160  * Handle for reporting statistics.
161  */
162 static struct GNUNET_STATISTICS_Handle *stats;
163
164 /**
165  * How much space are we using for the cache?  (space available for
166  * insertions that will be instantly reclaimed by discarding less
167  * important content --- or possibly whatever we just inserted into
168  * the "cache").
169  */
170 static unsigned long long cache_size;
171
172 /**
173  * How much space have we currently reserved?
174  */
175 static unsigned long long reserved;
176
177 /**
178  * How much data are we currently storing
179  * in the database?
180  */
181 static unsigned long long payload;
182
183 /**
184  * Identity of the task that is used to delete
185  * expired content.
186  */
187 static struct GNUNET_SCHEDULER_Task * expired_kill_task;
188
189 /**
190  * Minimum time that content should have to not be discarded instantly
191  * (time stamp of any content that we've been discarding recently to
192  * stay below the quota).  FOREVER if we had to expire content with
193  * non-zero priority.
194  */
195 static struct GNUNET_TIME_Absolute min_expiration;
196
197 /**
198  * How much space are we allowed to use?
199  */
200 static unsigned long long quota;
201
202 /**
203  * Should the database be dropped on exit?
204  */
205 static int do_drop;
206
207 /**
208  * Should we refresh the BF when the DB is loaded?
209  */
210 static int refresh_bf;
211
212 /**
213  * Number of updates that were made to the
214  * payload value since we last synchronized
215  * it with the statistics service.
216  */
217 static unsigned int last_sync;
218
219 /**
220  * Did we get an answer from statistics?
221  */
222 static int stats_worked;
223
224
225 /**
226  * Synchronize our utilization statistics with the
227  * statistics service.
228  */
229 static void
230 sync_stats ()
231 {
232   GNUNET_STATISTICS_set (stats,
233                          quota_stat_name,
234                          payload,
235                          GNUNET_YES);
236   GNUNET_STATISTICS_set (stats,
237                          "# utilization by current datastore",
238                          payload,
239                          GNUNET_NO);
240   last_sync = 0;
241 }
242
243
244 /**
245  * Context for transmitting replies to clients.
246  */
247 struct TransmitCallbackContext
248 {
249
250   /**
251    * We keep these in a doubly-linked list (for cleanup).
252    */
253   struct TransmitCallbackContext *next;
254
255   /**
256    * We keep these in a doubly-linked list (for cleanup).
257    */
258   struct TransmitCallbackContext *prev;
259
260   /**
261    * The message that we're asked to transmit.
262    */
263   struct GNUNET_MessageHeader *msg;
264
265   /**
266    * Handle for the transmission request.
267    */
268   struct GNUNET_SERVER_TransmitHandle *th;
269
270   /**
271    * Client that we are transmitting to.
272    */
273   struct GNUNET_SERVER_Client *client;
274
275 };
276
277
278 /**
279  * Head of the doubly-linked list (for cleanup).
280  */
281 static struct TransmitCallbackContext *tcc_head;
282
283 /**
284  * Tail of the doubly-linked list (for cleanup).
285  */
286 static struct TransmitCallbackContext *tcc_tail;
287
288 /**
289  * Have we already cleaned up the TCCs and are hence no longer
290  * willing (or able) to transmit anything to anyone?
291  */
292 static int cleaning_done;
293
294 /**
295  * Handle for pending get request.
296  */
297 static struct GNUNET_STATISTICS_GetHandle *stat_get;
298
299 /**
300  * Handle to our server.
301  */
302 static struct GNUNET_SERVER_Handle *server;
303
304 /**
305  * Task that is used to remove expired entries from
306  * the datastore.  This task will schedule itself
307  * again automatically to always delete all expired
308  * content quickly.
309  *
310  * @param cls not used
311  */
312 static void
313 delete_expired (void *cls);
314
315
316 /**
317  * Iterate over the expired items stored in the datastore.
318  * Delete all expired items; once we have processed all
319  * expired items, re-schedule the "delete_expired" task.
320  *
321  * @param cls not used
322  * @param key key for the content
323  * @param size number of bytes in data
324  * @param data content stored
325  * @param type type of the content
326  * @param priority priority of the content
327  * @param anonymity anonymity-level for the content
328  * @param expiration expiration time for the content
329  * @param uid unique identifier for the datum;
330  *        maybe 0 if no unique identifier is available
331  *
332  * @return #GNUNET_SYSERR to abort the iteration, #GNUNET_OK to continue
333  *         (continue on call to "next", of course),
334  *         #GNUNET_NO to delete the item and continue (if supported)
335  */
336 static int
337 expired_processor (void *cls,
338                    const struct GNUNET_HashCode *key,
339                    uint32_t size,
340                    const void *data,
341                    enum GNUNET_BLOCK_Type type,
342                    uint32_t priority,
343                    uint32_t anonymity,
344                    struct GNUNET_TIME_Absolute expiration,
345                    uint64_t uid)
346 {
347   struct GNUNET_TIME_Absolute now;
348
349   if (key == NULL)
350   {
351     expired_kill_task =
352         GNUNET_SCHEDULER_add_delayed_with_priority (MAX_EXPIRE_DELAY,
353                                                     GNUNET_SCHEDULER_PRIORITY_IDLE,
354                                                     &delete_expired, NULL);
355     return GNUNET_SYSERR;
356   }
357   now = GNUNET_TIME_absolute_get ();
358   if (expiration.abs_value_us > now.abs_value_us)
359   {
360     /* finished processing */
361     expired_kill_task =
362         GNUNET_SCHEDULER_add_delayed_with_priority (MAX_EXPIRE_DELAY,
363                                                     GNUNET_SCHEDULER_PRIORITY_IDLE,
364                                                     &delete_expired, NULL);
365     return GNUNET_SYSERR;
366   }
367   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
368               "Deleting content `%s' of type %u that expired %s ago\n",
369               GNUNET_h2s (key), type,
370               GNUNET_STRINGS_relative_time_to_string (GNUNET_TIME_absolute_get_difference (expiration,
371                                                                                            now),
372                                                       GNUNET_YES));
373   min_expiration = now;
374   GNUNET_STATISTICS_update (stats,
375                             gettext_noop ("# bytes expired"),
376                             size,
377                             GNUNET_YES);
378   GNUNET_CONTAINER_bloomfilter_remove (filter, key);
379   expired_kill_task =
380       GNUNET_SCHEDULER_add_delayed_with_priority (MIN_EXPIRE_DELAY,
381                                                   GNUNET_SCHEDULER_PRIORITY_IDLE,
382                                                   &delete_expired, NULL);
383   return GNUNET_NO;
384 }
385
386
387 /**
388  * Task that is used to remove expired entries from
389  * the datastore.  This task will schedule itself
390  * again automatically to always delete all expired
391  * content quickly.
392  *
393  * @param cls not used
394  */
395 static void
396 delete_expired (void *cls)
397 {
398   expired_kill_task = NULL;
399   plugin->api->get_expiration (plugin->api->cls,
400                                &expired_processor,
401                                NULL);
402 }
403
404
405 /**
406  * An iterator over a set of items stored in the datastore
407  * that deletes until we're happy with respect to our quota.
408  *
409  * @param cls closure
410  * @param key key for the content
411  * @param size number of bytes in data
412  * @param data content stored
413  * @param type type of the content
414  * @param priority priority of the content
415  * @param anonymity anonymity-level for the content
416  * @param expiration expiration time for the content
417  * @param uid unique identifier for the datum;
418  *        maybe 0 if no unique identifier is available
419  *
420  * @return GNUNET_SYSERR to abort the iteration, GNUNET_OK to continue
421  *         (continue on call to "next", of course),
422  *         GNUNET_NO to delete the item and continue (if supported)
423  */
424 static int
425 quota_processor (void *cls, const struct GNUNET_HashCode * key, uint32_t size,
426                  const void *data, enum GNUNET_BLOCK_Type type,
427                  uint32_t priority, uint32_t anonymity,
428                  struct GNUNET_TIME_Absolute expiration, uint64_t uid)
429 {
430   unsigned long long *need = cls;
431
432   if (NULL == key)
433     return GNUNET_SYSERR;
434   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
435               "Deleting %llu bytes of low-priority (%u) content `%s' of type %u at %s prior to expiration (still trying to free another %llu bytes)\n",
436               (unsigned long long) (size + GNUNET_DATASTORE_ENTRY_OVERHEAD),
437               (unsigned int) priority,
438               GNUNET_h2s (key), type,
439               GNUNET_STRINGS_relative_time_to_string (GNUNET_TIME_absolute_get_remaining (expiration),
440                                                       GNUNET_YES),
441               *need);
442   if (size + GNUNET_DATASTORE_ENTRY_OVERHEAD > *need)
443     *need = 0;
444   else
445     *need -= size + GNUNET_DATASTORE_ENTRY_OVERHEAD;
446   if (priority > 0)
447     min_expiration = GNUNET_TIME_UNIT_FOREVER_ABS;
448   else
449     min_expiration = expiration;
450   GNUNET_STATISTICS_update (stats,
451                             gettext_noop ("# bytes purged (low-priority)"),
452                             size, GNUNET_YES);
453   GNUNET_CONTAINER_bloomfilter_remove (filter, key);
454   return GNUNET_NO;
455 }
456
457
458 /**
459  * Manage available disk space by running tasks
460  * that will discard content if necessary.  This
461  * function will be run whenever a request for
462  * "need" bytes of storage could only be satisfied
463  * by eating into the "cache" (and we want our cache
464  * space back).
465  *
466  * @param need number of bytes of content that were
467  *        placed into the "cache" (and hence the
468  *        number of bytes that should be removed).
469  */
470 static void
471 manage_space (unsigned long long need)
472 {
473   unsigned long long last;
474
475   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
476               "Asked to free up %llu bytes of cache space\n", need);
477   last = 0;
478   while ((need > 0) && (last != need))
479   {
480     last = need;
481     plugin->api->get_expiration (plugin->api->cls, &quota_processor, &need);
482   }
483 }
484
485
486 /**
487  * Function called to notify a client about the socket
488  * begin ready to queue more data.  "buf" will be
489  * NULL and "size" zero if the socket was closed for
490  * writing in the meantime.
491  *
492  * @param cls closure
493  * @param size number of bytes available in buf
494  * @param buf where the callee should write the message
495  * @return number of bytes written to buf
496  */
497 static size_t
498 transmit_callback (void *cls, size_t size, void *buf)
499 {
500   struct TransmitCallbackContext *tcc = cls;
501   size_t msize;
502
503   tcc->th = NULL;
504   GNUNET_CONTAINER_DLL_remove (tcc_head, tcc_tail, tcc);
505   msize = ntohs (tcc->msg->size);
506   if (size == 0)
507   {
508     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
509                 _("Transmission to client failed!\n"));
510     GNUNET_SERVER_receive_done (tcc->client, GNUNET_SYSERR);
511     GNUNET_SERVER_client_drop (tcc->client);
512     GNUNET_free (tcc->msg);
513     GNUNET_free (tcc);
514     return 0;
515   }
516   GNUNET_assert (size >= msize);
517   memcpy (buf, tcc->msg, msize);
518   GNUNET_SERVER_receive_done (tcc->client, GNUNET_OK);
519   GNUNET_SERVER_client_drop (tcc->client);
520   GNUNET_free (tcc->msg);
521   GNUNET_free (tcc);
522   return msize;
523 }
524
525
526 /**
527  * Transmit the given message to the client.
528  *
529  * @param client target of the message
530  * @param msg message to transmit, will be freed!
531  */
532 static void
533 transmit (struct GNUNET_SERVER_Client *client, struct GNUNET_MessageHeader *msg)
534 {
535   struct TransmitCallbackContext *tcc;
536
537   if (GNUNET_YES == cleaning_done)
538   {
539     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
540                 _("Shutdown in progress, aborting transmission.\n"));
541     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
542     GNUNET_free (msg);
543     return;
544   }
545   tcc = GNUNET_new (struct TransmitCallbackContext);
546   tcc->msg = msg;
547   tcc->client = client;
548   if (NULL ==
549       (tcc->th =
550        GNUNET_SERVER_notify_transmit_ready (client, ntohs (msg->size),
551                                             GNUNET_TIME_UNIT_FOREVER_REL,
552                                             &transmit_callback, tcc)))
553   {
554     GNUNET_break (0);
555     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
556     GNUNET_free (msg);
557     GNUNET_free (tcc);
558     return;
559   }
560   GNUNET_SERVER_client_keep (client);
561   GNUNET_CONTAINER_DLL_insert (tcc_head, tcc_tail, tcc);
562 }
563
564
565 /**
566  * Transmit a status code to the client.
567  *
568  * @param client receiver of the response
569  * @param code status code
570  * @param msg optional error message (can be NULL)
571  */
572 static void
573 transmit_status (struct GNUNET_SERVER_Client *client, int code, const char *msg)
574 {
575   struct StatusMessage *sm;
576   size_t slen;
577
578   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
579               "Transmitting `%s' message with value %d and message `%s'\n",
580               "STATUS", code, msg != NULL ? msg : "(none)");
581   slen = (msg == NULL) ? 0 : strlen (msg) + 1;
582   sm = GNUNET_malloc (sizeof (struct StatusMessage) + slen);
583   sm->header.size = htons (sizeof (struct StatusMessage) + slen);
584   sm->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_STATUS);
585   sm->status = htonl (code);
586   sm->min_expiration = GNUNET_TIME_absolute_hton (min_expiration);
587   if (slen > 0)
588     memcpy (&sm[1], msg, slen);
589   transmit (client, &sm->header);
590 }
591
592
593 /**
594  * Function that will transmit the given datastore entry
595  * to the client.
596  *
597  * @param cls closure, pointer to the client (of type GNUNET_SERVER_Client).
598  * @param key key for the content
599  * @param size number of bytes in data
600  * @param data content stored
601  * @param type type of the content
602  * @param priority priority of the content
603  * @param anonymity anonymity-level for the content
604  * @param expiration expiration time for the content
605  * @param uid unique identifier for the datum;
606  *        maybe 0 if no unique identifier is available
607  *
608  * @return GNUNET_SYSERR to abort the iteration, GNUNET_OK to continue,
609  *         GNUNET_NO to delete the item and continue (if supported)
610  */
611 static int
612 transmit_item (void *cls, const struct GNUNET_HashCode * key, uint32_t size,
613                const void *data, enum GNUNET_BLOCK_Type type, uint32_t priority,
614                uint32_t anonymity, struct GNUNET_TIME_Absolute expiration,
615                uint64_t uid)
616 {
617   struct GNUNET_SERVER_Client *client = cls;
618   struct GNUNET_MessageHeader *end;
619   struct DataMessage *dm;
620
621   if (key == NULL)
622   {
623     /* transmit 'DATA_END' */
624     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Transmitting `%s' message\n",
625                 "DATA_END");
626     end = GNUNET_new (struct GNUNET_MessageHeader);
627     end->size = htons (sizeof (struct GNUNET_MessageHeader));
628     end->type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END);
629     transmit (client, end);
630     GNUNET_SERVER_client_drop (client);
631     return GNUNET_OK;
632   }
633   GNUNET_assert (sizeof (struct DataMessage) + size <
634                  GNUNET_SERVER_MAX_MESSAGE_SIZE);
635   dm = GNUNET_malloc (sizeof (struct DataMessage) + size);
636   dm->header.size = htons (sizeof (struct DataMessage) + size);
637   dm->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_DATA);
638   dm->rid = htonl (0);
639   dm->size = htonl (size);
640   dm->type = htonl (type);
641   dm->priority = htonl (priority);
642   dm->anonymity = htonl (anonymity);
643   dm->replication = htonl (0);
644   dm->reserved = htonl (0);
645   dm->expiration = GNUNET_TIME_absolute_hton (expiration);
646   dm->uid = GNUNET_htonll (uid);
647   dm->key = *key;
648   memcpy (&dm[1], data, size);
649   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
650               "Transmitting `%s' message for `%s' of type %u with expiration %s (in: %s)\n",
651               "DATA", GNUNET_h2s (key), type,
652               GNUNET_STRINGS_absolute_time_to_string (expiration),
653               GNUNET_STRINGS_relative_time_to_string (GNUNET_TIME_absolute_get_remaining (expiration),
654                                                       GNUNET_YES));
655   GNUNET_STATISTICS_update (stats,
656                             gettext_noop ("# results found"),
657                             1,
658                             GNUNET_NO);
659   transmit (client, &dm->header);
660   GNUNET_SERVER_client_drop (client);
661   return GNUNET_OK;
662 }
663
664
665 /**
666  * Handle RESERVE-message.
667  *
668  * @param cls closure
669  * @param client identification of the client
670  * @param message the actual message
671  */
672 static void
673 handle_reserve (void *cls, struct GNUNET_SERVER_Client *client,
674                 const struct GNUNET_MessageHeader *message)
675 {
676   /**
677    * Static counter to produce reservation identifiers.
678    */
679   static int reservation_gen;
680
681   const struct ReserveMessage *msg = (const struct ReserveMessage *) message;
682   struct ReservationList *e;
683   unsigned long long used;
684   unsigned long long req;
685   uint64_t amount;
686   uint32_t entries;
687
688   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
689               "Processing RESERVE request\n");
690   amount = GNUNET_ntohll (msg->amount);
691   entries = ntohl (msg->entries);
692   used = payload + reserved;
693   req =
694       amount + ((unsigned long long) GNUNET_DATASTORE_ENTRY_OVERHEAD) * entries;
695   if (used + req > quota)
696   {
697     if (quota < used)
698       used = quota;             /* cheat a bit for error message (to avoid negative numbers) */
699     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
700                 _
701                 ("Insufficient space (%llu bytes are available) to satisfy `%s' request for %llu bytes\n"),
702                 quota - used, "RESERVE", req);
703     if (cache_size < req)
704     {
705       /* TODO: document this in the FAQ; essentially, if this
706        * message happens, the insertion request could be blocked
707        * by less-important content from migration because it is
708        * larger than 1/8th of the overall available space, and
709        * we only reserve 1/8th for "fresh" insertions */
710       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
711                   _
712                   ("The requested amount (%llu bytes) is larger than the cache size (%llu bytes)\n"),
713                   req, cache_size);
714       transmit_status (client, 0,
715                        gettext_noop
716                        ("Insufficient space to satisfy request and "
717                         "requested amount is larger than cache size"));
718     }
719     else
720     {
721       transmit_status (client, 0,
722                        gettext_noop ("Insufficient space to satisfy request"));
723     }
724     return;
725   }
726   reserved += req;
727   GNUNET_STATISTICS_set (stats,
728                          gettext_noop ("# reserved"),
729                          reserved,
730                          GNUNET_NO);
731   e = GNUNET_new (struct ReservationList);
732   e->next = reservations;
733   reservations = e;
734   e->client = client;
735   e->amount = amount;
736   e->entries = entries;
737   e->rid = ++reservation_gen;
738   if (reservation_gen < 0)
739     reservation_gen = 0;        /* wrap around */
740   transmit_status (client, e->rid, NULL);
741 }
742
743
744 /**
745  * Handle RELEASE_RESERVE-message.
746  *
747  * @param cls closure
748  * @param client identification of the client
749  * @param message the actual message
750  */
751 static void
752 handle_release_reserve (void *cls,
753                         struct GNUNET_SERVER_Client *client,
754                         const struct GNUNET_MessageHeader *message)
755 {
756   const struct ReleaseReserveMessage *msg =
757       (const struct ReleaseReserveMessage *) message;
758   struct ReservationList *pos;
759   struct ReservationList *prev;
760   struct ReservationList *next;
761   int rid = ntohl (msg->rid);
762   unsigned long long rem;
763
764   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
765               "Processing RELEASE_RESERVE request\n");
766   next = reservations;
767   prev = NULL;
768   while (NULL != (pos = next))
769   {
770     next = pos->next;
771     if (rid == pos->rid)
772     {
773       if (prev == NULL)
774         reservations = next;
775       else
776         prev->next = next;
777       rem =
778           pos->amount +
779           ((unsigned long long) GNUNET_DATASTORE_ENTRY_OVERHEAD) * pos->entries;
780       GNUNET_assert (reserved >= rem);
781       reserved -= rem;
782       GNUNET_STATISTICS_set (stats,
783                              gettext_noop ("# reserved"),
784                              reserved,
785                              GNUNET_NO);
786       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
787                   "Returning %llu remaining reserved bytes to storage pool\n",
788                   rem);
789       GNUNET_free (pos);
790       transmit_status (client, GNUNET_OK, NULL);
791       return;
792     }
793     prev = pos;
794   }
795   GNUNET_break (0);
796   transmit_status (client, GNUNET_SYSERR,
797                    gettext_noop ("Could not find matching reservation"));
798 }
799
800
801 /**
802  * Check that the given message is a valid data message.
803  *
804  * @return NULL if the message is not well-formed, otherwise the message
805  */
806 static const struct DataMessage *
807 check_data (const struct GNUNET_MessageHeader *message)
808 {
809   uint16_t size;
810   uint32_t dsize;
811   const struct DataMessage *dm;
812
813   size = ntohs (message->size);
814   if (size < sizeof (struct DataMessage))
815   {
816     GNUNET_break (0);
817     return NULL;
818   }
819   dm = (const struct DataMessage *) message;
820   dsize = ntohl (dm->size);
821   if (size != dsize + sizeof (struct DataMessage))
822   {
823     GNUNET_break (0);
824     return NULL;
825   }
826   return dm;
827 }
828
829
830 /**
831  * Context for a PUT request used to see if the content is
832  * already present.
833  */
834 struct PutContext
835 {
836   /**
837    * Client to notify on completion.
838    */
839   struct GNUNET_SERVER_Client *client;
840
841 #if ! HAVE_UNALIGNED_64_ACCESS
842   void *reserved;
843 #endif
844
845   /* followed by the 'struct DataMessage' */
846 };
847
848
849 /**
850  * Put continuation.
851  *
852  * @param cls closure
853  * @param key key for the item stored
854  * @param size size of the item stored
855  * @param status #GNUNET_OK or #GNUNET_SYSERROR
856  * @param msg error message on error
857  */
858 static void
859 put_continuation (void *cls,
860                   const struct GNUNET_HashCode *key,
861                   uint32_t size,
862                   int status,
863                   const char *msg)
864 {
865   struct PutContext *pc = cls;
866
867   if (GNUNET_OK == status)
868   {
869     GNUNET_STATISTICS_update (stats,
870                               gettext_noop ("# bytes stored"),
871                               size,
872                               GNUNET_YES);
873     GNUNET_CONTAINER_bloomfilter_add (filter, key);
874     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
875                 "Successfully stored %u bytes under key `%s'\n",
876                 size, GNUNET_h2s (key));
877   }
878   transmit_status (pc->client, status, msg);
879   GNUNET_SERVER_client_drop (pc->client);
880   GNUNET_free (pc);
881   if (quota - reserved - cache_size < payload)
882   {
883     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
884                 _("Need %llu bytes more space (%llu allowed, using %llu)\n"),
885                 (unsigned long long) size + GNUNET_DATASTORE_ENTRY_OVERHEAD,
886                 (unsigned long long) (quota - reserved - cache_size),
887                 (unsigned long long) payload);
888     manage_space (size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
889   }
890 }
891
892
893 /**
894  * Actually put the data message.
895  *
896  * @param pc put context
897  */
898 static void
899 execute_put (struct PutContext *pc)
900 {
901   const struct DataMessage *dm;
902
903   dm = (const struct DataMessage *) &pc[1];
904   plugin->api->put (plugin->api->cls, &dm->key, ntohl (dm->size), &dm[1],
905                     ntohl (dm->type), ntohl (dm->priority),
906                     ntohl (dm->anonymity), ntohl (dm->replication),
907                     GNUNET_TIME_absolute_ntoh (dm->expiration),
908                     &put_continuation, pc);
909 }
910
911
912 /**
913  *
914  * @param cls closure
915  * @param status #GNUNET_OK or #GNUNET_SYSERR
916  * @param msg error message on error
917  */
918 static void
919 check_present_continuation (void *cls,
920                             int status,
921                             const char *msg)
922 {
923   struct GNUNET_SERVER_Client *client = cls;
924
925   transmit_status (client, GNUNET_NO, NULL);
926   GNUNET_SERVER_client_drop (client);
927 }
928
929
930 /**
931  * Function that will check if the given datastore entry
932  * matches the put and if none match executes the put.
933  *
934  * @param cls closure, pointer to the client (of type `struct PutContext`).
935  * @param key key for the content
936  * @param size number of bytes in data
937  * @param data content stored
938  * @param type type of the content
939  * @param priority priority of the content
940  * @param anonymity anonymity-level for the content
941  * @param expiration expiration time for the content
942  * @param uid unique identifier for the datum;
943  *        maybe 0 if no unique identifier is available
944  * @return #GNUNET_OK usually
945  *         #GNUNET_NO to delete the item
946  */
947 static int
948 check_present (void *cls,
949                const struct GNUNET_HashCode *key,
950                uint32_t size,
951                const void *data,
952                enum GNUNET_BLOCK_Type type,
953                uint32_t priority,
954                uint32_t anonymity,
955                struct GNUNET_TIME_Absolute expiration,
956                uint64_t uid)
957 {
958   struct PutContext *pc = cls;
959   const struct DataMessage *dm;
960
961   dm = (const struct DataMessage *) &pc[1];
962   if (key == NULL)
963   {
964     execute_put (pc);
965     return GNUNET_OK;
966   }
967   if ( (GNUNET_BLOCK_TYPE_FS_DBLOCK == type) ||
968        (GNUNET_BLOCK_TYPE_FS_IBLOCK == type) ||
969        ( (size == ntohl (dm->size)) &&
970          (0 == memcmp (&dm[1], data, size)) ) )
971   {
972     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
973                 "Result already present in datastore\n");
974     /* FIXME: change API to allow increasing 'replication' counter */
975     if ((ntohl (dm->priority) > 0) ||
976         (GNUNET_TIME_absolute_ntoh (dm->expiration).abs_value_us >
977          expiration.abs_value_us))
978       plugin->api->update (plugin->api->cls,
979                            uid,
980                            (int32_t) ntohl (dm->priority),
981                            GNUNET_TIME_absolute_ntoh (dm->expiration),
982                            &check_present_continuation,
983                            pc->client);
984     else
985     {
986       transmit_status (pc->client, GNUNET_NO, NULL);
987       GNUNET_SERVER_client_drop (pc->client);
988     }
989     GNUNET_free (pc);
990   }
991   else
992   {
993     execute_put (pc);
994   }
995   return GNUNET_OK;
996 }
997
998
999 /**
1000  * Handle PUT-message.
1001  *
1002  * @param cls closure
1003  * @param client identification of the client
1004  * @param message the actual message
1005  */
1006 static void
1007 handle_put (void *cls,
1008             struct GNUNET_SERVER_Client *client,
1009             const struct GNUNET_MessageHeader *message)
1010 {
1011   const struct DataMessage *dm = check_data (message);
1012   int rid;
1013   struct ReservationList *pos;
1014   struct PutContext *pc;
1015   struct GNUNET_HashCode vhash;
1016   uint32_t size;
1017
1018   if ((dm == NULL) || (ntohl (dm->type) == 0))
1019   {
1020     GNUNET_break (0);
1021     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1022     return;
1023   }
1024   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1025               "Processing PUT request for `%s' of type %u\n",
1026               GNUNET_h2s (&dm->key),
1027               ntohl (dm->type));
1028   rid = ntohl (dm->rid);
1029   size = ntohl (dm->size);
1030   if (rid > 0)
1031   {
1032     pos = reservations;
1033     while ((NULL != pos) && (rid != pos->rid))
1034       pos = pos->next;
1035     GNUNET_break (pos != NULL);
1036     if (NULL != pos)
1037     {
1038       GNUNET_break (pos->entries > 0);
1039       GNUNET_break (pos->amount >= size);
1040       pos->entries--;
1041       pos->amount -= size;
1042       reserved -= (size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
1043       GNUNET_STATISTICS_set (stats,
1044                              gettext_noop ("# reserved"),
1045                              reserved,
1046                              GNUNET_NO);
1047     }
1048   }
1049   pc = GNUNET_malloc (sizeof (struct PutContext) + size +
1050                       sizeof (struct DataMessage));
1051   pc->client = client;
1052   GNUNET_SERVER_client_keep (client);
1053   memcpy (&pc[1], dm, size + sizeof (struct DataMessage));
1054   if (GNUNET_YES == GNUNET_CONTAINER_bloomfilter_test (filter, &dm->key))
1055   {
1056     GNUNET_CRYPTO_hash (&dm[1], size, &vhash);
1057     plugin->api->get_key (plugin->api->cls,
1058                           0,
1059                           &dm->key,
1060                           &vhash,
1061                           ntohl (dm->type),
1062                           &check_present,
1063                           pc);
1064     return;
1065   }
1066   execute_put (pc);
1067 }
1068
1069
1070 /**
1071  * Handle GET-message.
1072  *
1073  * @param cls closure
1074  * @param client identification of the client
1075  * @param message the actual message
1076  */
1077 static void
1078 handle_get (void *cls,
1079             struct GNUNET_SERVER_Client *client,
1080             const struct GNUNET_MessageHeader *message)
1081 {
1082   const struct GetMessage *msg;
1083   uint16_t size;
1084
1085   size = ntohs (message->size);
1086   if ((size != sizeof (struct GetMessage)) &&
1087       (size != sizeof (struct GetMessage) - sizeof (struct GNUNET_HashCode)))
1088   {
1089     GNUNET_break (0);
1090     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1091     return;
1092   }
1093   msg = (const struct GetMessage *) message;
1094   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1095               "Processing GET request for `%s' of type %u\n",
1096               GNUNET_h2s (&msg->key),
1097               ntohl (msg->type));
1098   GNUNET_STATISTICS_update (stats,
1099                             gettext_noop ("# GET requests received"),
1100                             1,
1101                             GNUNET_NO);
1102   GNUNET_SERVER_client_keep (client);
1103   if ( (size == sizeof (struct GetMessage)) &&
1104        (GNUNET_YES != GNUNET_CONTAINER_bloomfilter_test (filter, &msg->key)) )
1105   {
1106     /* don't bother database... */
1107     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1108                 "Empty result set for GET request for `%s' (bloomfilter).\n",
1109                 GNUNET_h2s (&msg->key));
1110     GNUNET_STATISTICS_update (stats,
1111                               gettext_noop
1112                               ("# requests filtered by bloomfilter"),
1113                               1,
1114                               GNUNET_NO);
1115     transmit_item (client, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS,
1116                    0);
1117     return;
1118   }
1119   plugin->api->get_key (plugin->api->cls, GNUNET_ntohll (msg->offset),
1120                         ((size ==
1121                           sizeof (struct GetMessage)) ? &msg->key : NULL), NULL,
1122                         ntohl (msg->type), &transmit_item, client);
1123 }
1124
1125
1126 /**
1127  * Function called with the result of an update operation.
1128  *
1129  * @param cls closure
1130  * @param status #GNUNET_OK or #GNUNET_SYSERR
1131  * @param msg error message on error
1132  */
1133 static void
1134 update_continuation (void *cls,
1135                      int status,
1136                      const char *msg)
1137 {
1138   struct GNUNET_SERVER_Client *client = cls;
1139
1140   transmit_status (client, status, msg);
1141   GNUNET_SERVER_client_drop (client);
1142 }
1143
1144
1145 /**
1146  * Handle UPDATE-message.
1147  *
1148  * @param cls closure
1149  * @param client identification of the client
1150  * @param message the actual message
1151  */
1152 static void
1153 handle_update (void *cls,
1154                struct GNUNET_SERVER_Client *client,
1155                const struct GNUNET_MessageHeader *message)
1156 {
1157   const struct UpdateMessage *msg;
1158
1159   GNUNET_STATISTICS_update (stats,
1160                             gettext_noop ("# UPDATE requests received"),
1161                             1,
1162                             GNUNET_NO);
1163   msg = (const struct UpdateMessage *) message;
1164   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1165               "Processing UPDATE request for %llu\n",
1166               (unsigned long long) GNUNET_ntohll (msg->uid));
1167   GNUNET_SERVER_client_keep (client);
1168   plugin->api->update (plugin->api->cls,
1169                        GNUNET_ntohll (msg->uid),
1170                        (int32_t) ntohl (msg->priority),
1171                        GNUNET_TIME_absolute_ntoh (msg->expiration),
1172                        &update_continuation, client);
1173 }
1174
1175
1176 /**
1177  * Handle GET_REPLICATION-message.
1178  *
1179  * @param cls closure
1180  * @param client identification of the client
1181  * @param message the actual message
1182  */
1183 static void
1184 handle_get_replication (void *cls,
1185                         struct GNUNET_SERVER_Client *client,
1186                         const struct GNUNET_MessageHeader *message)
1187 {
1188   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1189               "Processing `%s' request\n",
1190               "GET_REPLICATION");
1191   GNUNET_STATISTICS_update (stats,
1192                             gettext_noop ("# GET REPLICATION requests received"),
1193                             1,
1194                             GNUNET_NO);
1195   GNUNET_SERVER_client_keep (client);
1196   plugin->api->get_replication (plugin->api->cls,
1197                                 &transmit_item, client);
1198 }
1199
1200
1201 /**
1202  * Handle GET_ZERO_ANONYMITY-message.
1203  *
1204  * @param cls closure
1205  * @param client identification of the client
1206  * @param message the actual message
1207  */
1208 static void
1209 handle_get_zero_anonymity (void *cls,
1210                            struct GNUNET_SERVER_Client *client,
1211                            const struct GNUNET_MessageHeader *message)
1212 {
1213   const struct GetZeroAnonymityMessage *msg =
1214       (const struct GetZeroAnonymityMessage *) message;
1215   enum GNUNET_BLOCK_Type type;
1216
1217   type = (enum GNUNET_BLOCK_Type) ntohl (msg->type);
1218   if (type == GNUNET_BLOCK_TYPE_ANY)
1219   {
1220     GNUNET_break (0);
1221     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1222     return;
1223   }
1224   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1225               "Processing `%s' request\n",
1226               "GET_ZERO_ANONYMITY");
1227   GNUNET_STATISTICS_update (stats,
1228                             gettext_noop ("# GET ZERO ANONYMITY requests received"),
1229                             1,
1230                             GNUNET_NO);
1231   GNUNET_SERVER_client_keep (client);
1232   plugin->api->get_zero_anonymity (plugin->api->cls,
1233                                    GNUNET_ntohll (msg->offset),
1234                                    type,
1235                                    &transmit_item, client);
1236 }
1237
1238
1239 /**
1240  * Callback function that will cause the item that is passed
1241  * in to be deleted (by returning #GNUNET_NO).
1242  *
1243  * @param cls closure
1244  * @param key key for the content
1245  * @param size number of bytes in data
1246  * @param data content stored
1247  * @param type type of the content
1248  * @param priority priority of the content
1249  * @param anonymity anonymity-level for the content
1250  * @param expiration expiration time for the content
1251  * @param uid unique identifier for the datum
1252  * @return #GNUNET_OK to keep the item
1253  *         #GNUNET_NO to delete the item
1254  */
1255 static int
1256 remove_callback (void *cls,
1257                  const struct GNUNET_HashCode *key,
1258                  uint32_t size,
1259                  const void *data,
1260                  enum GNUNET_BLOCK_Type type,
1261                  uint32_t priority,
1262                  uint32_t anonymity,
1263                  struct GNUNET_TIME_Absolute expiration,
1264                  uint64_t uid)
1265 {
1266   struct GNUNET_SERVER_Client *client = cls;
1267
1268   if (NULL == key)
1269   {
1270     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1271                 "No further matches for REMOVE request.\n");
1272     transmit_status (client,
1273                      GNUNET_NO,
1274                      _("Content not found"));
1275     GNUNET_SERVER_client_drop (client);
1276     return GNUNET_OK;           /* last item */
1277   }
1278   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1279               "Item %llu matches REMOVE request for key `%s' and type %u.\n",
1280               (unsigned long long) uid,
1281               GNUNET_h2s (key),
1282               type);
1283   GNUNET_STATISTICS_update (stats,
1284                             gettext_noop ("# bytes removed (explicit request)"),
1285                             size,
1286                             GNUNET_YES);
1287   GNUNET_CONTAINER_bloomfilter_remove (filter, key);
1288   transmit_status (client, GNUNET_OK, NULL);
1289   GNUNET_SERVER_client_drop (client);
1290   return GNUNET_NO;
1291 }
1292
1293
1294 /**
1295  * Handle REMOVE-message.
1296  *
1297  * @param cls closure
1298  * @param client identification of the client
1299  * @param message the actual message
1300  */
1301 static void
1302 handle_remove (void *cls,
1303                struct GNUNET_SERVER_Client *client,
1304                const struct GNUNET_MessageHeader *message)
1305 {
1306   const struct DataMessage *dm = check_data (message);
1307   struct GNUNET_HashCode vhash;
1308
1309   if (NULL == dm)
1310   {
1311     GNUNET_break (0);
1312     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1313     return;
1314   }
1315   GNUNET_STATISTICS_update (stats,
1316                             gettext_noop ("# REMOVE requests received"),
1317                             1, GNUNET_NO);
1318   GNUNET_SERVER_client_keep (client);
1319   GNUNET_CRYPTO_hash (&dm[1],
1320                       ntohl (dm->size),
1321                       &vhash);
1322   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1323               "Processing REMOVE request for `%s' of type %u\n",
1324               GNUNET_h2s (&dm->key),
1325               ntohl (dm->type));
1326   plugin->api->get_key (plugin->api->cls,
1327                         0,
1328                         &dm->key,
1329                         &vhash,
1330                         (enum GNUNET_BLOCK_Type) ntohl (dm->type),
1331                         &remove_callback, client);
1332 }
1333
1334
1335 /**
1336  * Handle DROP-message.
1337  *
1338  * @param cls closure
1339  * @param client identification of the client
1340  * @param message the actual message
1341  */
1342 static void
1343 handle_drop (void *cls,
1344              struct GNUNET_SERVER_Client *client,
1345              const struct GNUNET_MessageHeader *message)
1346 {
1347   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1348               "Processing DROP request\n");
1349   do_drop = GNUNET_YES;
1350   GNUNET_SERVER_receive_done (client, GNUNET_OK);
1351 }
1352
1353
1354 /**
1355  * Function called by plugins to notify us about a
1356  * change in their disk utilization.
1357  *
1358  * @param cls closure (NULL)
1359  * @param delta change in disk utilization,
1360  *        0 for "reset to empty"
1361  */
1362 static void
1363 disk_utilization_change_cb (void *cls,
1364                             int delta)
1365 {
1366   if ((delta < 0) && (payload < -delta))
1367   {
1368     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1369                 _("Datastore payload must have been inaccurate (%lld < %lld). Recomputing it.\n"),
1370                 (long long) payload,
1371                 (long long) -delta);
1372     plugin->api->estimate_size (plugin->api->cls, &payload);
1373     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1374                 _("New payload: %lld\n"),
1375                 (long long) payload);
1376      sync_stats ();
1377     return;
1378   }
1379   payload += delta;
1380   last_sync++;
1381   if (last_sync >= MAX_STAT_SYNC_LAG)
1382     sync_stats ();
1383 }
1384
1385
1386 /**
1387  * Callback function to process statistic values.
1388  *
1389  * @param cls closure (struct Plugin*)
1390  * @param subsystem name of subsystem that created the statistic
1391  * @param name the name of the datum
1392  * @param value the current value
1393  * @param is_persistent #GNUNET_YES if the value is persistent, #GNUNET_NO if not
1394  * @return #GNUNET_OK to continue, #GNUNET_SYSERR to abort iteration
1395  */
1396 static int
1397 process_stat_in (void *cls,
1398                  const char *subsystem,
1399                  const char *name,
1400                  uint64_t value,
1401                  int is_persistent)
1402 {
1403   GNUNET_assert (GNUNET_NO == stats_worked);
1404   stats_worked = GNUNET_YES;
1405   payload += value;
1406   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1407               "Notification from statistics about existing payload (%llu), new payload is %llu\n",
1408               (unsigned long long) value,
1409               (unsigned long long) payload);
1410   return GNUNET_OK;
1411 }
1412
1413
1414 /**
1415  * Load the datastore plugin.
1416  */
1417 static struct DatastorePlugin *
1418 load_plugin ()
1419 {
1420   struct DatastorePlugin *ret;
1421   char *libname;
1422
1423   ret = GNUNET_new (struct DatastorePlugin);
1424   ret->env.cfg = cfg;
1425   ret->env.duc = &disk_utilization_change_cb;
1426   ret->env.cls = NULL;
1427   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1428               _("Loading `%s' datastore plugin\n"),
1429               plugin_name);
1430   GNUNET_asprintf (&libname,
1431                    "libgnunet_plugin_datastore_%s",
1432                    plugin_name);
1433   ret->short_name = GNUNET_strdup (plugin_name);
1434   ret->lib_name = libname;
1435   ret->api = GNUNET_PLUGIN_load (libname, &ret->env);
1436   if (NULL == ret->api)
1437   {
1438     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1439                 _("Failed to load datastore plugin for `%s'\n"),
1440                 plugin_name);
1441     GNUNET_free (ret->short_name);
1442     GNUNET_free (libname);
1443     GNUNET_free (ret);
1444     return NULL;
1445   }
1446   return ret;
1447 }
1448
1449
1450 /**
1451  * Function called when the service shuts
1452  * down.  Unloads our datastore plugin.
1453  *
1454  * @param plug plugin to unload
1455  */
1456 static void
1457 unload_plugin (struct DatastorePlugin *plug)
1458 {
1459   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1460               "Datastore service is unloading plugin...\n");
1461   GNUNET_break (NULL == GNUNET_PLUGIN_unload (plug->lib_name, plug->api));
1462   GNUNET_free (plug->lib_name);
1463   GNUNET_free (plug->short_name);
1464   GNUNET_free (plug);
1465 }
1466
1467
1468 static const struct GNUNET_SERVER_MessageHandler handlers[] = {
1469   {&handle_reserve, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_RESERVE,
1470    sizeof (struct ReserveMessage)},
1471   {&handle_release_reserve, NULL,
1472    GNUNET_MESSAGE_TYPE_DATASTORE_RELEASE_RESERVE,
1473    sizeof (struct ReleaseReserveMessage)},
1474   {&handle_put, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_PUT, 0},
1475   {&handle_update, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_UPDATE,
1476    sizeof (struct UpdateMessage)},
1477   {&handle_get, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_GET, 0},
1478   {&handle_get_replication, NULL,
1479    GNUNET_MESSAGE_TYPE_DATASTORE_GET_REPLICATION,
1480    sizeof (struct GNUNET_MessageHeader)},
1481   {&handle_get_zero_anonymity, NULL,
1482    GNUNET_MESSAGE_TYPE_DATASTORE_GET_ZERO_ANONYMITY,
1483    sizeof (struct GetZeroAnonymityMessage)},
1484   {&handle_remove, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE, 0},
1485   {&handle_drop, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_DROP,
1486    sizeof (struct GNUNET_MessageHeader)},
1487   {NULL, NULL, 0, 0}
1488 };
1489
1490
1491 /**
1492  * Adds a given @a key to the bloomfilter in @a cls @a count times.
1493  *
1494  * @param cls the bloomfilter
1495  * @param key key to add
1496  * @param count number of times to add key
1497  */
1498 static void
1499 add_key_to_bloomfilter (void *cls,
1500                         const struct GNUNET_HashCode *key,
1501                         unsigned int count)
1502 {
1503   struct GNUNET_CONTAINER_BloomFilter *bf = cls;
1504
1505   if (NULL == key)
1506   {
1507     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1508                 _("Bloomfilter construction complete.\n"));
1509     GNUNET_SERVER_add_handlers (server, handlers);
1510     GNUNET_SERVER_resume (server);
1511     expired_kill_task
1512       = GNUNET_SCHEDULER_add_with_priority (GNUNET_SCHEDULER_PRIORITY_IDLE,
1513                                             &delete_expired,
1514                                             NULL);
1515     return;
1516   }
1517
1518   while (0 < count--)
1519     GNUNET_CONTAINER_bloomfilter_add (bf, key);
1520 }
1521
1522
1523 /**
1524  * We finished receiving the statistic.  Initialize the plugin; if
1525  * loading the statistic failed, run the estimator.
1526  *
1527  * @param cls NULL
1528  * @param success #GNUNET_NO if we failed to read the stat
1529  */
1530 static void
1531 process_stat_done (void *cls,
1532                    int success)
1533 {
1534   stat_get = NULL;
1535   if (NULL != stat_timeout_task)
1536   {
1537     GNUNET_SCHEDULER_cancel (stat_timeout_task);
1538     stat_timeout_task = NULL;
1539   }
1540   plugin = load_plugin ();
1541   if (NULL == plugin)
1542   {
1543     GNUNET_CONTAINER_bloomfilter_free (filter);
1544     filter = NULL;
1545     if (NULL != stats)
1546     {
1547       GNUNET_STATISTICS_destroy (stats, GNUNET_YES);
1548       stats = NULL;
1549     }
1550     return;
1551   }
1552   if (GNUNET_NO == stats_worked)
1553   {
1554     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1555                 "Failed to obtain value from statistics service, recomputing it\n");
1556     plugin->api->estimate_size (plugin->api->cls,
1557                                 &payload);
1558   }
1559   if (GNUNET_YES == refresh_bf)
1560   {
1561     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1562                 _("Rebuilding bloomfilter.  Please be patient.\n"));
1563     if (NULL != plugin->api->get_keys)
1564     {
1565       plugin->api->get_keys (plugin->api->cls,
1566                              &add_key_to_bloomfilter,
1567                              filter);
1568       return;
1569     }
1570     else
1571       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1572                   _("Plugin does not support get_keys function. Please fix!\n"));
1573
1574     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1575                 _("Bloomfilter construction complete.\n"));
1576   }
1577
1578   GNUNET_SERVER_add_handlers (server, handlers);
1579   GNUNET_SERVER_resume (server);
1580   expired_kill_task
1581     = GNUNET_SCHEDULER_add_with_priority (GNUNET_SCHEDULER_PRIORITY_IDLE,
1582                                           &delete_expired,
1583                                           NULL);
1584 }
1585
1586
1587 /**
1588  * Fetching stats took to long, run without.
1589  *
1590  * @param cls NULL
1591  */
1592 static void
1593 stat_timeout (void *cls)
1594 {
1595   stat_timeout_task = NULL;
1596   GNUNET_STATISTICS_get_cancel (stat_get);
1597   process_stat_done (NULL, GNUNET_NO);
1598 }
1599
1600
1601 /**
1602  * Task run during shutdown.
1603  */
1604 static void
1605 cleaning_task (void *cls)
1606 {
1607   struct TransmitCallbackContext *tcc;
1608
1609   cleaning_done = GNUNET_YES;
1610   while (NULL != (tcc = tcc_head))
1611   {
1612     GNUNET_CONTAINER_DLL_remove (tcc_head, tcc_tail, tcc);
1613     if (tcc->th != NULL)
1614     {
1615       GNUNET_SERVER_notify_transmit_ready_cancel (tcc->th);
1616       GNUNET_SERVER_client_drop (tcc->client);
1617     }
1618     GNUNET_free (tcc->msg);
1619     GNUNET_free (tcc);
1620   }
1621   if (NULL != expired_kill_task)
1622   {
1623     GNUNET_SCHEDULER_cancel (expired_kill_task);
1624     expired_kill_task = NULL;
1625   }
1626   if (GNUNET_YES == do_drop)
1627     plugin->api->drop (plugin->api->cls);
1628   if (NULL != plugin)
1629   {
1630     unload_plugin (plugin);
1631     plugin = NULL;
1632   }
1633   if (NULL != filter)
1634   {
1635     GNUNET_CONTAINER_bloomfilter_free (filter);
1636     filter = NULL;
1637   }
1638   if (NULL != stat_get)
1639   {
1640     GNUNET_STATISTICS_get_cancel (stat_get);
1641     stat_get = NULL;
1642   }
1643   if (NULL != stat_timeout_task)
1644   {
1645     GNUNET_SCHEDULER_cancel (stat_timeout_task);
1646     stat_timeout_task = NULL;
1647   }
1648   GNUNET_free_non_null (plugin_name);
1649   plugin_name = NULL;
1650   if (last_sync > 0)
1651     sync_stats ();
1652   if (NULL != stats)
1653   {
1654     GNUNET_STATISTICS_destroy (stats, GNUNET_YES);
1655     stats = NULL;
1656   }
1657   GNUNET_free (quota_stat_name);
1658   quota_stat_name = NULL;
1659 }
1660
1661
1662 /**
1663  * Function that removes all active reservations made
1664  * by the given client and releases the space for other
1665  * requests.
1666  *
1667  * @param cls closure
1668  * @param client identification of the client
1669  */
1670 static void
1671 cleanup_reservations (void *cls,
1672                       struct GNUNET_SERVER_Client *client)
1673 {
1674   struct ReservationList *pos;
1675   struct ReservationList *prev;
1676   struct ReservationList *next;
1677
1678   if (NULL == client)
1679     return;
1680   prev = NULL;
1681   pos = reservations;
1682   while (NULL != pos)
1683   {
1684     next = pos->next;
1685     if (pos->client == client)
1686     {
1687       if (NULL == prev)
1688         reservations = next;
1689       else
1690         prev->next = next;
1691       reserved -= pos->amount + pos->entries * GNUNET_DATASTORE_ENTRY_OVERHEAD;
1692       GNUNET_free (pos);
1693     }
1694     else
1695     {
1696       prev = pos;
1697     }
1698     pos = next;
1699   }
1700   GNUNET_STATISTICS_set (stats,
1701                          gettext_noop ("# reserved"),
1702                          reserved,
1703                          GNUNET_NO);
1704 }
1705
1706
1707 /**
1708  * Process datastore requests.
1709  *
1710  * @param cls closure
1711  * @param serv the initialized server
1712  * @param c configuration to use
1713  */
1714 static void
1715 run (void *cls,
1716      struct GNUNET_SERVER_Handle *serv,
1717      const struct GNUNET_CONFIGURATION_Handle *c)
1718 {
1719   char *fn;
1720   char *pfn;
1721   unsigned int bf_size;
1722
1723   server = serv;
1724   cfg = c;
1725   if (GNUNET_OK !=
1726       GNUNET_CONFIGURATION_get_value_string (cfg,
1727                                              "DATASTORE",
1728                                              "DATABASE",
1729                                              &plugin_name))
1730   {
1731     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1732                 _("No `%s' specified for `%s' in configuration!\n"),
1733                 "DATABASE",
1734                 "DATASTORE");
1735     return;
1736   }
1737   GNUNET_asprintf (&quota_stat_name,
1738                    _("# bytes used in file-sharing datastore `%s'"),
1739                    plugin_name);
1740   if (GNUNET_OK !=
1741       GNUNET_CONFIGURATION_get_value_size (cfg, "DATASTORE", "QUOTA", &quota))
1742   {
1743     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1744                 _("No `%s' specified for `%s' in configuration!\n"),
1745                 "QUOTA",
1746                 "DATASTORE");
1747     return;
1748   }
1749   stats = GNUNET_STATISTICS_create ("datastore", cfg);
1750   GNUNET_STATISTICS_set (stats, gettext_noop ("# quota"), quota, GNUNET_NO);
1751   cache_size = quota / 8;       /* Or should we make this an option? */
1752   GNUNET_STATISTICS_set (stats, gettext_noop ("# cache size"), cache_size,
1753                          GNUNET_NO);
1754   if (quota / (32 * 1024LL) > (1 << 31))
1755     bf_size = (1 << 31);          /* absolute limit: ~2 GB, beyond that BF just won't help anyway */
1756   else
1757     bf_size = quota / (32 * 1024LL);         /* 8 bit per entry, 1 bit per 32 kb in DB */
1758   fn = NULL;
1759   if ((GNUNET_OK !=
1760        GNUNET_CONFIGURATION_get_value_filename (cfg,
1761                                                 "DATASTORE",
1762                                                 "BLOOMFILTER",
1763                                                 &fn)) ||
1764       (GNUNET_OK != GNUNET_DISK_directory_create_for_file (fn)))
1765   {
1766     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1767                 _("Could not use specified filename `%s' for bloomfilter.\n"),
1768                 NULL != fn ? fn : "");
1769     GNUNET_free_non_null (fn);
1770     fn = NULL;
1771   }
1772   if (NULL != fn)
1773   {
1774     GNUNET_asprintf (&pfn, "%s.%s", fn, plugin_name);
1775     if (GNUNET_YES == GNUNET_DISK_file_test (pfn))
1776     {
1777       filter = GNUNET_CONTAINER_bloomfilter_load (pfn, bf_size, 5);        /* approx. 3% false positives at max use */
1778       if (NULL == filter)
1779       {
1780         /* file exists but not valid, remove and try again, but refresh */
1781         if (0 != UNLINK (pfn))
1782         {
1783           /* failed to remove, run without file */
1784           GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1785                       _("Failed to remove bogus bloomfilter file `%s'\n"),
1786                       pfn);
1787           GNUNET_free (pfn);
1788           pfn = NULL;
1789           filter = GNUNET_CONTAINER_bloomfilter_load (NULL, bf_size, 5);        /* approx. 3% false positives at max use */
1790           refresh_bf = GNUNET_YES;
1791         }
1792         else
1793         {
1794           /* try again after remove */
1795           filter = GNUNET_CONTAINER_bloomfilter_load (pfn, bf_size, 5);        /* approx. 3% false positives at max use */
1796           refresh_bf = GNUNET_YES;
1797           if (NULL == filter)
1798           {
1799             /* failed yet again, give up on using file */
1800             GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1801                         _("Failed to remove bogus bloomfilter file `%s'\n"),
1802                         pfn);
1803             GNUNET_free (pfn);
1804             pfn = NULL;
1805             filter = GNUNET_CONTAINER_bloomfilter_init (NULL, bf_size, 5);        /* approx. 3% false positives at max use */
1806           }
1807         }
1808       }
1809       else
1810       {
1811         /* normal case: have an existing valid bf file, no need to refresh */
1812         refresh_bf = GNUNET_NO;
1813       }
1814     }
1815     else
1816     {
1817       filter = GNUNET_CONTAINER_bloomfilter_load (pfn, bf_size, 5);        /* approx. 3% false positives at max use */
1818       refresh_bf = GNUNET_YES;
1819     }
1820     GNUNET_free (pfn);
1821   }
1822   else
1823   {
1824     filter = GNUNET_CONTAINER_bloomfilter_init (NULL, bf_size, 5);      /* approx. 3% false positives at max use */
1825     refresh_bf = GNUNET_YES;
1826   }
1827   GNUNET_free_non_null (fn);
1828   if (NULL == filter)
1829   {
1830     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1831                 _("Failed to initialize bloomfilter.\n"));
1832     if (NULL != stats)
1833     {
1834       GNUNET_STATISTICS_destroy (stats, GNUNET_YES);
1835       stats = NULL;
1836     }
1837     return;
1838   }
1839   GNUNET_SERVER_suspend (server);
1840   stat_get =
1841       GNUNET_STATISTICS_get (stats,
1842                              "datastore",
1843                              quota_stat_name,
1844                              &process_stat_done,
1845                              &process_stat_in,
1846                              NULL);
1847   if (NULL == stat_get)
1848     process_stat_done (NULL, GNUNET_SYSERR);
1849   else
1850     stat_timeout_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS,
1851                                                       &stat_timeout,
1852                                                       NULL);
1853   GNUNET_SERVER_disconnect_notify (server,
1854                                    &cleanup_reservations,
1855                                    NULL);
1856   GNUNET_SCHEDULER_add_shutdown (&cleaning_task,
1857                                  NULL);
1858 }
1859
1860
1861 /**
1862  * The main function for the datastore service.
1863  *
1864  * @param argc number of arguments from the command line
1865  * @param argv command line arguments
1866  * @return 0 ok, 1 on error
1867  */
1868 int
1869 main (int argc,
1870       char *const *argv)
1871 {
1872   int ret;
1873
1874   ret =
1875       (GNUNET_OK ==
1876        GNUNET_SERVICE_run (argc, argv, "datastore",
1877                            GNUNET_SERVICE_OPTION_NONE,
1878                            &run, NULL)) ? 0 : 1;
1879   return ret;
1880 }
1881
1882
1883 /* end of gnunet-service-datastore.c */