bf89c1bc2ba90c0cdb5a42338060bbfea2e08f3c
[oweals/gnunet.git] / src / datastore / gnunet-service-datastore.c
1 /*
2      This file is part of GNUnet
3      (C) 2004, 2005, 2006, 2007, 2009 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 2, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file datastore/gnunet-service-datastore.c
23  * @brief Management for the datastore for files stored on a GNUnet node
24  * @author Christian Grothoff
25  */
26
27 #include "platform.h"
28 #include "gnunet_util_lib.h"
29 #include "gnunet_protocols.h"
30 #include "gnunet_statistics_service.h"
31 #include "gnunet_datastore_plugin.h"
32 #include "datastore.h"
33
34 /**
35  * How many messages do we queue at most per client?
36  */
37 #define MAX_PENDING 1024
38
39 /**
40  * How long are we at most keeping "expired" content
41  * past the expiration date in the database?
42  */
43 #define MAX_EXPIRE_DELAY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 15)
44
45 /**
46  * How fast are we allowed to query the database for deleting
47  * expired content? (1 item per second).
48  */
49 #define MIN_EXPIRE_DELAY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 1)
50
51 /**
52  * Name under which we store current space consumption.
53  */
54 static char *quota_stat_name;
55
56 /**
57  * After how many payload-changing operations
58  * do we sync our statistics?
59  */
60 #define MAX_STAT_SYNC_LAG 50
61
62
63 /**
64  * Our datastore plugin.
65  */
66 struct DatastorePlugin
67 {
68
69   /**
70    * API of the transport as returned by the plugin's
71    * initialization function.
72    */
73   struct GNUNET_DATASTORE_PluginFunctions *api;
74
75   /**
76    * Short name for the plugin (i.e. "sqlite").
77    */
78   char *short_name;
79
80   /**
81    * Name of the library (i.e. "gnunet_plugin_datastore_sqlite").
82    */
83   char *lib_name;
84
85   /**
86    * Environment this transport service is using
87    * for this plugin.
88    */
89   struct GNUNET_DATASTORE_PluginEnvironment env;
90
91 };
92
93
94 /**
95  * Linked list of active reservations.
96  */
97 struct ReservationList
98 {
99
100   /**
101    * This is a linked list.
102    */
103   struct ReservationList *next;
104
105   /**
106    * Client that made the reservation.
107    */
108   struct GNUNET_SERVER_Client *client;
109
110   /**
111    * Number of bytes (still) reserved.
112    */
113   uint64_t amount;
114
115   /**
116    * Number of items (still) reserved.
117    */
118   uint64_t entries;
119
120   /**
121    * Reservation identifier.
122    */
123   int32_t rid;
124
125 };
126
127
128
129 /**
130  * Our datastore plugin (NULL if not available).
131  */
132 static struct DatastorePlugin *plugin;
133
134 /**
135  * Linked list of space reservations made by clients.
136  */
137 static struct ReservationList *reservations;
138
139 /**
140  * Bloomfilter to quickly tell if we don't have the content.
141  */
142 static struct GNUNET_CONTAINER_BloomFilter *filter;
143
144 /**
145  * How much space are we allowed to use?
146  */
147 static unsigned long long quota;
148
149 /**
150  * Should the database be dropped on exit?
151  */
152 static int do_drop;
153
154 /**
155  * Name of our plugin.
156  */
157 static char *plugin_name;
158
159 /**
160  * How much space are we using for the cache?  (space available for
161  * insertions that will be instantly reclaimed by discarding less
162  * important content --- or possibly whatever we just inserted into
163  * the "cache").
164  */
165 static unsigned long long cache_size;
166
167 /**
168  * How much space have we currently reserved?
169  */
170 static unsigned long long reserved;
171
172 /**
173  * How much data are we currently storing
174  * in the database?
175  */
176 static unsigned long long payload;
177
178 /**
179  * Number of updates that were made to the
180  * payload value since we last synchronized
181  * it with the statistics service.
182  */
183 static unsigned int lastSync;
184
185 /**
186  * Did we get an answer from statistics?
187  */
188 static int stats_worked;
189
190 /**
191  * Identity of the task that is used to delete
192  * expired content.
193  */
194 static GNUNET_SCHEDULER_TaskIdentifier expired_kill_task;
195
196 /**
197  * Our configuration.
198  */
199 const struct GNUNET_CONFIGURATION_Handle *cfg;
200
201 /**
202  * Minimum time that content should have to not be discarded instantly
203  * (time stamp of any content that we've been discarding recently to
204  * stay below the quota).  FOREVER if we had to expire content with
205  * non-zero priority.
206  */
207 static struct GNUNET_TIME_Absolute min_expiration;
208
209 /**
210  * Handle for reporting statistics.
211  */
212 static struct GNUNET_STATISTICS_Handle *stats;
213
214
215 /**
216  * Synchronize our utilization statistics with the
217  * statistics service.
218  */
219 static void
220 sync_stats ()
221 {
222   GNUNET_STATISTICS_set (stats, quota_stat_name, payload, GNUNET_YES);
223   GNUNET_STATISTICS_set (stats, "# utilization by current datastore", payload, GNUNET_NO);
224   lastSync = 0;
225 }
226
227
228
229 /**
230  * Context for transmitting replies to clients.
231  */
232 struct TransmitCallbackContext
233 {
234
235   /**
236    * We keep these in a doubly-linked list (for cleanup).
237    */
238   struct TransmitCallbackContext *next;
239
240   /**
241    * We keep these in a doubly-linked list (for cleanup).
242    */
243   struct TransmitCallbackContext *prev;
244
245   /**
246    * The message that we're asked to transmit.
247    */
248   struct GNUNET_MessageHeader *msg;
249
250   /**
251    * Handle for the transmission request.
252    */
253   struct GNUNET_CONNECTION_TransmitHandle *th;
254
255   /**
256    * Client that we are transmitting to.
257    */
258   struct GNUNET_SERVER_Client *client;
259
260 };
261
262
263 /**
264  * Head of the doubly-linked list (for cleanup).
265  */
266 static struct TransmitCallbackContext *tcc_head;
267
268 /**
269  * Tail of the doubly-linked list (for cleanup).
270  */
271 static struct TransmitCallbackContext *tcc_tail;
272
273 /**
274  * Have we already cleaned up the TCCs and are hence no longer
275  * willing (or able) to transmit anything to anyone?
276  */
277 static int cleaning_done;
278
279 /**
280  * Handle for pending get request.
281  */
282 static struct GNUNET_STATISTICS_GetHandle *stat_get;
283
284
285 /**
286  * Task that is used to remove expired entries from
287  * the datastore.  This task will schedule itself
288  * again automatically to always delete all expired
289  * content quickly.
290  *
291  * @param cls not used
292  * @param tc task context
293  */
294 static void
295 delete_expired (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
296
297
298 /**
299  * Iterate over the expired items stored in the datastore.
300  * Delete all expired items; once we have processed all
301  * expired items, re-schedule the "delete_expired" task.
302  *
303  * @param cls not used
304  * @param key key for the content
305  * @param size number of bytes in data
306  * @param data content stored
307  * @param type type of the content
308  * @param priority priority of the content
309  * @param anonymity anonymity-level for the content
310  * @param expiration expiration time for the content
311  * @param uid unique identifier for the datum;
312  *        maybe 0 if no unique identifier is available
313  *
314  * @return GNUNET_SYSERR to abort the iteration, GNUNET_OK to continue
315  *         (continue on call to "next", of course),
316  *         GNUNET_NO to delete the item and continue (if supported)
317  */
318 static int
319 expired_processor (void *cls, const GNUNET_HashCode * key, uint32_t size,
320                    const void *data, enum GNUNET_BLOCK_Type type,
321                    uint32_t priority, uint32_t anonymity,
322                    struct GNUNET_TIME_Absolute expiration, uint64_t uid)
323 {
324   struct GNUNET_TIME_Absolute now;
325
326   if (key == NULL)
327   {
328     expired_kill_task =
329         GNUNET_SCHEDULER_add_delayed (MAX_EXPIRE_DELAY, &delete_expired, NULL);
330     return GNUNET_SYSERR;
331   }
332   now = GNUNET_TIME_absolute_get ();
333   if (expiration.abs_value > now.abs_value)
334   {
335     /* finished processing */
336     expired_kill_task =
337         GNUNET_SCHEDULER_add_delayed (MAX_EXPIRE_DELAY, &delete_expired, NULL);
338     return GNUNET_SYSERR;
339   }
340 #if DEBUG_DATASTORE
341   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
342               "Deleting content `%s' of type %u that expired %llu ms ago\n",
343               GNUNET_h2s (key), type,
344               (unsigned long long) (now.abs_value - expiration.abs_value));
345 #endif
346   min_expiration = now;
347   GNUNET_STATISTICS_update (stats, gettext_noop ("# bytes expired"), size,
348                             GNUNET_YES);
349   GNUNET_CONTAINER_bloomfilter_remove (filter, key);
350   expired_kill_task =
351       GNUNET_SCHEDULER_add_delayed (MIN_EXPIRE_DELAY, &delete_expired, NULL);
352   return GNUNET_NO;
353 }
354
355
356 /**
357  * Task that is used to remove expired entries from
358  * the datastore.  This task will schedule itself
359  * again automatically to always delete all expired
360  * content quickly.
361  *
362  * @param cls not used
363  * @param tc task context
364  */
365 static void
366 delete_expired (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
367 {
368   expired_kill_task = GNUNET_SCHEDULER_NO_TASK;
369   plugin->api->get_expiration (plugin->api->cls, &expired_processor, NULL);
370 }
371
372
373 /**
374  * An iterator over a set of items stored in the datastore
375  * that deletes until we're happy with respect to our quota.
376  *
377  * @param cls closure
378  * @param key key for the content
379  * @param size number of bytes in data
380  * @param data content stored
381  * @param type type of the content
382  * @param priority priority of the content
383  * @param anonymity anonymity-level for the content
384  * @param expiration expiration time for the content
385  * @param uid unique identifier for the datum;
386  *        maybe 0 if no unique identifier is available
387  *
388  * @return GNUNET_SYSERR to abort the iteration, GNUNET_OK to continue
389  *         (continue on call to "next", of course),
390  *         GNUNET_NO to delete the item and continue (if supported)
391  */
392 static int
393 quota_processor (void *cls, const GNUNET_HashCode * key, uint32_t size,
394                  const void *data, enum GNUNET_BLOCK_Type type,
395                  uint32_t priority, uint32_t anonymity,
396                  struct GNUNET_TIME_Absolute expiration, uint64_t uid)
397 {
398   unsigned long long *need = cls;
399
400   if (NULL == key)
401     return GNUNET_SYSERR;
402 #if DEBUG_DATASTORE
403   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
404               "Deleting %llu bytes of low-priority (%u) content `%s' of type %u at %llu ms prior to expiration (still trying to free another %llu bytes)\n",
405               (unsigned long long) (size + GNUNET_DATASTORE_ENTRY_OVERHEAD),
406               (unsigned int) priority,
407               GNUNET_h2s (key), type, 
408               (unsigned long long) GNUNET_TIME_absolute_get_remaining (expiration).rel_value,
409               *need);
410 #endif
411   if (size + GNUNET_DATASTORE_ENTRY_OVERHEAD > *need)
412     *need = 0;
413   else
414     *need -= size + GNUNET_DATASTORE_ENTRY_OVERHEAD;
415   if (priority > 0)
416     min_expiration = GNUNET_TIME_UNIT_FOREVER_ABS;
417   else
418     min_expiration = expiration;
419   GNUNET_STATISTICS_update (stats,
420                             gettext_noop ("# bytes purged (low-priority)"),
421                             size, GNUNET_YES);
422   GNUNET_CONTAINER_bloomfilter_remove (filter, key);
423   return GNUNET_NO;
424 }
425
426
427 /**
428  * Manage available disk space by running tasks
429  * that will discard content if necessary.  This
430  * function will be run whenever a request for
431  * "need" bytes of storage could only be satisfied
432  * by eating into the "cache" (and we want our cache
433  * space back).
434  *
435  * @param need number of bytes of content that were
436  *        placed into the "cache" (and hence the
437  *        number of bytes that should be removed).
438  */
439 static void
440 manage_space (unsigned long long need)
441 {
442   unsigned long long last;
443
444 #if DEBUG_DATASTORE
445   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
446               "Asked to free up %llu bytes of cache space\n", need);
447 #endif
448   last = 0;
449   while ((need > 0) && (last != need))
450   {
451     last = need;
452     plugin->api->get_expiration (plugin->api->cls, &quota_processor, &need);
453   }
454 }
455
456
457 /**
458  * Function called to notify a client about the socket
459  * begin ready to queue more data.  "buf" will be
460  * NULL and "size" zero if the socket was closed for
461  * writing in the meantime.
462  *
463  * @param cls closure
464  * @param size number of bytes available in buf
465  * @param buf where the callee should write the message
466  * @return number of bytes written to buf
467  */
468 static size_t
469 transmit_callback (void *cls, size_t size, void *buf)
470 {
471   struct TransmitCallbackContext *tcc = cls;
472   size_t msize;
473
474   tcc->th = NULL;
475   GNUNET_CONTAINER_DLL_remove (tcc_head, tcc_tail, tcc);
476   msize = ntohs (tcc->msg->size);
477   if (size == 0)
478   {
479     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
480                 _("Transmission to client failed!\n"));
481     GNUNET_SERVER_receive_done (tcc->client, GNUNET_SYSERR);
482     GNUNET_SERVER_client_drop (tcc->client);
483     GNUNET_free (tcc->msg);
484     GNUNET_free (tcc);
485     return 0;
486   }
487   GNUNET_assert (size >= msize);
488   memcpy (buf, tcc->msg, msize);
489   GNUNET_SERVER_receive_done (tcc->client, GNUNET_OK);
490   GNUNET_SERVER_client_drop (tcc->client);
491   GNUNET_free (tcc->msg);
492   GNUNET_free (tcc);
493   return msize;
494 }
495
496
497 /**
498  * Transmit the given message to the client.
499  *
500  * @param client target of the message
501  * @param msg message to transmit, will be freed!
502  */
503 static void
504 transmit (struct GNUNET_SERVER_Client *client, struct GNUNET_MessageHeader *msg)
505 {
506   struct TransmitCallbackContext *tcc;
507
508   if (GNUNET_YES == cleaning_done)
509   {
510 #if DEBUG_DATASTORE
511     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
512                 "Shutdown in progress, aborting transmission.\n");
513 #endif
514     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
515     GNUNET_free (msg);
516     return;
517   }
518   tcc = GNUNET_malloc (sizeof (struct TransmitCallbackContext));
519   tcc->msg = msg;
520   tcc->client = client;
521   if (NULL ==
522       (tcc->th =
523        GNUNET_SERVER_notify_transmit_ready (client, ntohs (msg->size),
524                                             GNUNET_TIME_UNIT_FOREVER_REL,
525                                             &transmit_callback, tcc)))
526   {
527     GNUNET_break (0);
528     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
529     GNUNET_free (msg);
530     GNUNET_free (tcc);
531     return;
532   }
533   GNUNET_SERVER_client_keep (client);
534   GNUNET_CONTAINER_DLL_insert (tcc_head, tcc_tail, tcc);
535 }
536
537
538 /**
539  * Transmit a status code to the client.
540  *
541  * @param client receiver of the response
542  * @param code status code
543  * @param msg optional error message (can be NULL)
544  */
545 static void
546 transmit_status (struct GNUNET_SERVER_Client *client, int code, const char *msg)
547 {
548   struct StatusMessage *sm;
549   size_t slen;
550
551 #if DEBUG_DATASTORE
552   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
553               "Transmitting `%s' message with value %d and message `%s'\n",
554               "STATUS", code, msg != NULL ? msg : "(none)");
555 #endif
556   slen = (msg == NULL) ? 0 : strlen (msg) + 1;
557   sm = GNUNET_malloc (sizeof (struct StatusMessage) + slen);
558   sm->header.size = htons (sizeof (struct StatusMessage) + slen);
559   sm->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_STATUS);
560   sm->status = htonl (code);
561   sm->min_expiration = GNUNET_TIME_absolute_hton (min_expiration);
562   if (slen > 0)
563     memcpy (&sm[1], msg, slen);
564   transmit (client, &sm->header);
565 }
566
567
568
569 /**
570  * Function that will transmit the given datastore entry
571  * to the client.
572  *
573  * @param cls closure, pointer to the client (of type GNUNET_SERVER_Client).
574  * @param key key for the content
575  * @param size number of bytes in data
576  * @param data content stored
577  * @param type type of the content
578  * @param priority priority of the content
579  * @param anonymity anonymity-level for the content
580  * @param expiration expiration time for the content
581  * @param uid unique identifier for the datum;
582  *        maybe 0 if no unique identifier is available
583  *
584  * @return GNUNET_SYSERR to abort the iteration, GNUNET_OK to continue,
585  *         GNUNET_NO to delete the item and continue (if supported)
586  */
587 static int
588 transmit_item (void *cls, const GNUNET_HashCode * key, uint32_t size,
589                const void *data, enum GNUNET_BLOCK_Type type, uint32_t priority,
590                uint32_t anonymity, struct GNUNET_TIME_Absolute expiration,
591                uint64_t uid)
592 {
593   struct GNUNET_SERVER_Client *client = cls;
594   struct GNUNET_MessageHeader *end;
595   struct DataMessage *dm;
596
597   if (key == NULL)
598   {
599     /* transmit 'DATA_END' */
600 #if DEBUG_DATASTORE
601     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Transmitting `%s' message\n",
602                 "DATA_END");
603 #endif
604     end = GNUNET_malloc (sizeof (struct GNUNET_MessageHeader));
605     end->size = htons (sizeof (struct GNUNET_MessageHeader));
606     end->type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END);
607     transmit (client, end);
608     GNUNET_SERVER_client_drop (client);
609     return GNUNET_OK;
610   }
611   GNUNET_assert (sizeof (struct DataMessage) + size <
612                  GNUNET_SERVER_MAX_MESSAGE_SIZE);
613   dm = GNUNET_malloc (sizeof (struct DataMessage) + size);
614   dm->header.size = htons (sizeof (struct DataMessage) + size);
615   dm->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_DATA);
616   dm->rid = htonl (0);
617   dm->size = htonl (size);
618   dm->type = htonl (type);
619   dm->priority = htonl (priority);
620   dm->anonymity = htonl (anonymity);
621   dm->replication = htonl (0);
622   dm->reserved = htonl (0);
623   dm->expiration = GNUNET_TIME_absolute_hton (expiration);
624   dm->uid = GNUNET_htonll (uid);
625   dm->key = *key;
626   memcpy (&dm[1], data, size);
627 #if DEBUG_DATASTORE
628   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
629               "Transmitting `%s' message for `%s' of type %u with expiration %llu (now: %llu)\n",
630               "DATA", GNUNET_h2s (key), type,
631               (unsigned long long) expiration.abs_value,
632               (unsigned long long) GNUNET_TIME_absolute_get ().abs_value);
633 #endif
634   GNUNET_STATISTICS_update (stats, gettext_noop ("# results found"), 1,
635                             GNUNET_NO);
636   transmit (client, &dm->header);
637   GNUNET_SERVER_client_drop (client);
638   return GNUNET_OK;
639 }
640
641
642 /**
643  * Handle RESERVE-message.
644  *
645  * @param cls closure
646  * @param client identification of the client
647  * @param message the actual message
648  */
649 static void
650 handle_reserve (void *cls, struct GNUNET_SERVER_Client *client,
651                 const struct GNUNET_MessageHeader *message)
652 {
653   /**
654    * Static counter to produce reservation identifiers.
655    */
656   static int reservation_gen;
657
658   const struct ReserveMessage *msg = (const struct ReserveMessage *) message;
659   struct ReservationList *e;
660   unsigned long long used;
661   unsigned long long req;
662   uint64_t amount;
663   uint32_t entries;
664
665 #if DEBUG_DATASTORE
666   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Processing `%s' request\n", "RESERVE");
667 #endif
668   amount = GNUNET_ntohll (msg->amount);
669   entries = ntohl (msg->entries);
670   used = payload + reserved;
671   req =
672       amount + ((unsigned long long) GNUNET_DATASTORE_ENTRY_OVERHEAD) * entries;
673   if (used + req > quota)
674   {
675     if (quota < used)
676       used = quota;             /* cheat a bit for error message (to avoid negative numbers) */
677     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
678                 _
679                 ("Insufficient space (%llu bytes are available) to satisfy `%s' request for %llu bytes\n"),
680                 quota - used, "RESERVE", req);
681     if (cache_size < req)
682     {
683       /* TODO: document this in the FAQ; essentially, if this
684        * message happens, the insertion request could be blocked
685        * by less-important content from migration because it is
686        * larger than 1/8th of the overall available space, and
687        * we only reserve 1/8th for "fresh" insertions */
688       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
689                   _
690                   ("The requested amount (%llu bytes) is larger than the cache size (%llu bytes)\n"),
691                   req, cache_size);
692       transmit_status (client, 0,
693                        gettext_noop
694                        ("Insufficient space to satisfy request and "
695                         "requested amount is larger than cache size"));
696     }
697     else
698     {
699       transmit_status (client, 0,
700                        gettext_noop ("Insufficient space to satisfy request"));
701     }
702     return;
703   }
704   reserved += req;
705   GNUNET_STATISTICS_set (stats, gettext_noop ("# reserved"), reserved,
706                          GNUNET_NO);
707   e = GNUNET_malloc (sizeof (struct ReservationList));
708   e->next = reservations;
709   reservations = e;
710   e->client = client;
711   e->amount = amount;
712   e->entries = entries;
713   e->rid = ++reservation_gen;
714   if (reservation_gen < 0)
715     reservation_gen = 0;        /* wrap around */
716   transmit_status (client, e->rid, NULL);
717 }
718
719
720 /**
721  * Handle RELEASE_RESERVE-message.
722  *
723  * @param cls closure
724  * @param client identification of the client
725  * @param message the actual message
726  */
727 static void
728 handle_release_reserve (void *cls, struct GNUNET_SERVER_Client *client,
729                         const struct GNUNET_MessageHeader *message)
730 {
731   const struct ReleaseReserveMessage *msg =
732       (const struct ReleaseReserveMessage *) message;
733   struct ReservationList *pos;
734   struct ReservationList *prev;
735   struct ReservationList *next;
736   int rid = ntohl (msg->rid);
737   unsigned long long rem;
738
739 #if DEBUG_DATASTORE
740   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Processing `%s' request\n",
741               "RELEASE_RESERVE");
742 #endif
743   next = reservations;
744   prev = NULL;
745   while (NULL != (pos = next))
746   {
747     next = pos->next;
748     if (rid == pos->rid)
749     {
750       if (prev == NULL)
751         reservations = next;
752       else
753         prev->next = next;
754       rem =
755           pos->amount +
756           ((unsigned long long) GNUNET_DATASTORE_ENTRY_OVERHEAD) * pos->entries;
757       GNUNET_assert (reserved >= rem);
758       reserved -= rem;
759       GNUNET_STATISTICS_set (stats, gettext_noop ("# reserved"), reserved,
760                              GNUNET_NO);
761 #if DEBUG_DATASTORE
762       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
763                   "Returning %llu remaining reserved bytes to storage pool\n",
764                   rem);
765 #endif
766       GNUNET_free (pos);
767       transmit_status (client, GNUNET_OK, NULL);
768       return;
769     }
770     prev = pos;
771   }
772   GNUNET_break (0);
773   transmit_status (client, GNUNET_SYSERR,
774                    gettext_noop ("Could not find matching reservation"));
775 }
776
777
778 /**
779  * Check that the given message is a valid data message.
780  *
781  * @return NULL if the message is not well-formed, otherwise the message
782  */
783 static const struct DataMessage *
784 check_data (const struct GNUNET_MessageHeader *message)
785 {
786   uint16_t size;
787   uint32_t dsize;
788   const struct DataMessage *dm;
789
790   size = ntohs (message->size);
791   if (size < sizeof (struct DataMessage))
792   {
793     GNUNET_break (0);
794     return NULL;
795   }
796   dm = (const struct DataMessage *) message;
797   dsize = ntohl (dm->size);
798   if (size != dsize + sizeof (struct DataMessage))
799   {
800     GNUNET_break (0);
801     return NULL;
802   }
803   return dm;
804 }
805
806
807 /**
808  * Context for a PUT request used to see if the content is
809  * already present.
810  */
811 struct PutContext
812 {
813   /**
814    * Client to notify on completion.
815    */
816   struct GNUNET_SERVER_Client *client;
817
818 #if ! HAVE_UNALIGNED_64_ACCESS
819   void *reserved;
820 #endif
821
822   /* followed by the 'struct DataMessage' */
823 };
824
825
826 /**
827  * Actually put the data message.
828  *
829  * @param client sender of the message
830  * @param dm message with the data to store
831  */
832 static void
833 execute_put (struct GNUNET_SERVER_Client *client, const struct DataMessage *dm)
834 {
835   uint32_t size;
836   char *msg;
837   int ret;
838
839   size = ntohl (dm->size);
840   msg = NULL;
841   ret =
842       plugin->api->put (plugin->api->cls, &dm->key, size, &dm[1],
843                         ntohl (dm->type), ntohl (dm->priority),
844                         ntohl (dm->anonymity), ntohl (dm->replication),
845                         GNUNET_TIME_absolute_ntoh (dm->expiration), &msg);
846   if (GNUNET_OK == ret)
847   {
848     GNUNET_STATISTICS_update (stats, gettext_noop ("# bytes stored"), size,
849                               GNUNET_YES);
850     GNUNET_CONTAINER_bloomfilter_add (filter, &dm->key);
851 #if DEBUG_DATASTORE
852     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
853                 "Successfully stored %u bytes of type %u under key `%s'\n",
854                 size, ntohl (dm->type), GNUNET_h2s (&dm->key));
855 #endif
856   }
857   transmit_status (client, ret, msg);
858   GNUNET_free_non_null (msg);
859   if (quota - reserved - cache_size < payload)
860   {
861     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
862                 _("Need %llu bytes more space (%llu allowed, using %llu)\n"),
863                 (unsigned long long) size + GNUNET_DATASTORE_ENTRY_OVERHEAD,
864                 (unsigned long long) (quota - reserved - cache_size),
865                 (unsigned long long) payload);
866     manage_space (size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
867   }
868 }
869
870
871 /**
872  * Function that will check if the given datastore entry
873  * matches the put and if none match executes the put.
874  *
875  * @param cls closure, pointer to the client (of type 'struct PutContext').
876  * @param key key for the content
877  * @param size number of bytes in data
878  * @param data content stored
879  * @param type type of the content
880  * @param priority priority of the content
881  * @param anonymity anonymity-level for the content
882  * @param expiration expiration time for the content
883  * @param uid unique identifier for the datum;
884  *        maybe 0 if no unique identifier is available
885  *
886  * @return GNUNET_OK usually
887  *         GNUNET_NO to delete the item
888  */
889 static int
890 check_present (void *cls, const GNUNET_HashCode * key, uint32_t size,
891                const void *data, enum GNUNET_BLOCK_Type type, uint32_t priority,
892                uint32_t anonymity, struct GNUNET_TIME_Absolute expiration,
893                uint64_t uid)
894 {
895   struct PutContext *pc = cls;
896   const struct DataMessage *dm;
897
898   dm = (const struct DataMessage *) &pc[1];
899   if (key == NULL)
900   {
901     execute_put (pc->client, dm);
902     GNUNET_SERVER_client_drop (pc->client);
903     GNUNET_free (pc);
904     return GNUNET_OK;
905   }
906   if ((GNUNET_BLOCK_TYPE_FS_DBLOCK == type) ||
907       (GNUNET_BLOCK_TYPE_FS_IBLOCK == type) || ((size == ntohl (dm->size)) &&
908                                                 (0 ==
909                                                  memcmp (&dm[1], data, size))))
910   {
911 #if DEBUG_MYSQL
912     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
913                 "Result already present in datastore\n");
914 #endif
915     /* FIXME: change API to allow increasing 'replication' counter */
916     if ((ntohl (dm->priority) > 0) ||
917         (GNUNET_TIME_absolute_ntoh (dm->expiration).abs_value >
918          expiration.abs_value))
919       plugin->api->update (plugin->api->cls, uid,
920                            (int32_t) ntohl (dm->priority),
921                            GNUNET_TIME_absolute_ntoh (dm->expiration), NULL);
922     transmit_status (pc->client, GNUNET_NO, NULL);
923     GNUNET_SERVER_client_drop (pc->client);
924     GNUNET_free (pc);
925   }
926   else
927   {
928     execute_put (pc->client, dm);
929     GNUNET_SERVER_client_drop (pc->client);
930     GNUNET_free (pc);
931   }
932   return GNUNET_OK;
933 }
934
935
936 /**
937  * Handle PUT-message.
938  *
939  * @param cls closure
940  * @param client identification of the client
941  * @param message the actual message
942  */
943 static void
944 handle_put (void *cls, struct GNUNET_SERVER_Client *client,
945             const struct GNUNET_MessageHeader *message)
946 {
947   const struct DataMessage *dm = check_data (message);
948   int rid;
949   struct ReservationList *pos;
950   struct PutContext *pc;
951   GNUNET_HashCode vhash;
952   uint32_t size;
953
954   if ((dm == NULL) || (ntohl (dm->type) == 0))
955   {
956     GNUNET_break (0);
957     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
958     return;
959   }
960 #if DEBUG_DATASTORE
961   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
962               "Processing `%s' request for `%s' of type %u\n", "PUT",
963               GNUNET_h2s (&dm->key), ntohl (dm->type));
964 #endif
965   rid = ntohl (dm->rid);
966   size = ntohl (dm->size);
967   if (rid > 0)
968   {
969     pos = reservations;
970     while ((NULL != pos) && (rid != pos->rid))
971       pos = pos->next;
972     GNUNET_break (pos != NULL);
973     if (NULL != pos)
974     {
975       GNUNET_break (pos->entries > 0);
976       GNUNET_break (pos->amount >= size);
977       pos->entries--;
978       pos->amount -= size;
979       reserved -= (size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
980       GNUNET_STATISTICS_set (stats, gettext_noop ("# reserved"), reserved,
981                              GNUNET_NO);
982     }
983   }
984   if (GNUNET_YES == GNUNET_CONTAINER_bloomfilter_test (filter, &dm->key))
985   {
986     GNUNET_CRYPTO_hash (&dm[1], size, &vhash);
987     pc = GNUNET_malloc (sizeof (struct PutContext) + size +
988                         sizeof (struct DataMessage));
989     pc->client = client;
990     GNUNET_SERVER_client_keep (client);
991     memcpy (&pc[1], dm, size + sizeof (struct DataMessage));
992     plugin->api->get_key (plugin->api->cls, 0, &dm->key, &vhash,
993                           ntohl (dm->type), &check_present, pc);
994     return;
995   }
996   execute_put (client, dm);
997 }
998
999
1000 /**
1001  * Handle GET-message.
1002  *
1003  * @param cls closure
1004  * @param client identification of the client
1005  * @param message the actual message
1006  */
1007 static void
1008 handle_get (void *cls, struct GNUNET_SERVER_Client *client,
1009             const struct GNUNET_MessageHeader *message)
1010 {
1011   const struct GetMessage *msg;
1012   uint16_t size;
1013
1014   size = ntohs (message->size);
1015   if ((size != sizeof (struct GetMessage)) &&
1016       (size != sizeof (struct GetMessage) - sizeof (GNUNET_HashCode)))
1017   {
1018     GNUNET_break (0);
1019     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1020     return;
1021   }
1022   msg = (const struct GetMessage *) message;
1023 #if DEBUG_DATASTORE
1024   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1025               "Processing `%s' request for `%s' of type %u\n", "GET",
1026               GNUNET_h2s (&msg->key), ntohl (msg->type));
1027 #endif
1028   GNUNET_STATISTICS_update (stats, gettext_noop ("# GET requests received"), 1,
1029                             GNUNET_NO);
1030   GNUNET_SERVER_client_keep (client);
1031   if ((size == sizeof (struct GetMessage)) &&
1032       (GNUNET_YES != GNUNET_CONTAINER_bloomfilter_test (filter, &msg->key)))
1033   {
1034     /* don't bother database... */
1035 #if DEBUG_DATASTORE
1036     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1037                 "Empty result set for `%s' request for `%s' (bloomfilter).\n",
1038                 "GET", GNUNET_h2s (&msg->key));
1039 #endif
1040     GNUNET_STATISTICS_update (stats,
1041                               gettext_noop
1042                               ("# requests filtered by bloomfilter"), 1,
1043                               GNUNET_NO);
1044     transmit_item (client, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS,
1045                    0);
1046     return;
1047   }
1048   plugin->api->get_key (plugin->api->cls, GNUNET_ntohll (msg->offset),
1049                         ((size ==
1050                           sizeof (struct GetMessage)) ? &msg->key : NULL), NULL,
1051                         ntohl (msg->type), &transmit_item, client);
1052 }
1053
1054
1055 /**
1056  * Handle UPDATE-message.
1057  *
1058  * @param cls closure
1059  * @param client identification of the client
1060  * @param message the actual message
1061  */
1062 static void
1063 handle_update (void *cls, struct GNUNET_SERVER_Client *client,
1064                const struct GNUNET_MessageHeader *message)
1065 {
1066   const struct UpdateMessage *msg;
1067   int ret;
1068   char *emsg;
1069
1070   GNUNET_STATISTICS_update (stats, gettext_noop ("# UPDATE requests received"),
1071                             1, GNUNET_NO);
1072   msg = (const struct UpdateMessage *) message;
1073   emsg = NULL;
1074 #if DEBUG_DATASTORE
1075   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Processing `%s' request for %llu\n",
1076               "UPDATE", (unsigned long long) GNUNET_ntohll (msg->uid));
1077 #endif
1078   ret =
1079       plugin->api->update (plugin->api->cls, GNUNET_ntohll (msg->uid),
1080                            (int32_t) ntohl (msg->priority),
1081                            GNUNET_TIME_absolute_ntoh (msg->expiration), &emsg);
1082   transmit_status (client, ret, emsg);
1083   GNUNET_free_non_null (emsg);
1084 }
1085
1086
1087 /**
1088  * Handle GET_REPLICATION-message.
1089  *
1090  * @param cls closure
1091  * @param client identification of the client
1092  * @param message the actual message
1093  */
1094 static void
1095 handle_get_replication (void *cls, struct GNUNET_SERVER_Client *client,
1096                         const struct GNUNET_MessageHeader *message)
1097 {
1098 #if DEBUG_DATASTORE
1099   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Processing `%s' request\n",
1100               "GET_REPLICATION");
1101 #endif
1102   GNUNET_STATISTICS_update (stats,
1103                             gettext_noop
1104                             ("# GET REPLICATION requests received"), 1,
1105                             GNUNET_NO);
1106   GNUNET_SERVER_client_keep (client);
1107   plugin->api->get_replication (plugin->api->cls, &transmit_item, client);
1108 }
1109
1110
1111 /**
1112  * Handle GET_ZERO_ANONYMITY-message.
1113  *
1114  * @param cls closure
1115  * @param client identification of the client
1116  * @param message the actual message
1117  */
1118 static void
1119 handle_get_zero_anonymity (void *cls, struct GNUNET_SERVER_Client *client,
1120                            const struct GNUNET_MessageHeader *message)
1121 {
1122   const struct GetZeroAnonymityMessage *msg =
1123       (const struct GetZeroAnonymityMessage *) message;
1124   enum GNUNET_BLOCK_Type type;
1125
1126   type = (enum GNUNET_BLOCK_Type) ntohl (msg->type);
1127   if (type == GNUNET_BLOCK_TYPE_ANY)
1128   {
1129     GNUNET_break (0);
1130     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1131     return;
1132   }
1133 #if DEBUG_DATASTORE
1134   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Processing `%s' request\n",
1135               "GET_ZERO_ANONYMITY");
1136 #endif
1137   GNUNET_STATISTICS_update (stats,
1138                             gettext_noop
1139                             ("# GET ZERO ANONYMITY requests received"), 1,
1140                             GNUNET_NO);
1141   GNUNET_SERVER_client_keep (client);
1142   plugin->api->get_zero_anonymity (plugin->api->cls,
1143                                    GNUNET_ntohll (msg->offset), type,
1144                                    &transmit_item, client);
1145 }
1146
1147
1148 /**
1149  * Callback function that will cause the item that is passed
1150  * in to be deleted (by returning GNUNET_NO).
1151  */
1152 static int
1153 remove_callback (void *cls, const GNUNET_HashCode * key, uint32_t size,
1154                  const void *data, enum GNUNET_BLOCK_Type type,
1155                  uint32_t priority, uint32_t anonymity,
1156                  struct GNUNET_TIME_Absolute expiration, uint64_t uid)
1157 {
1158   struct GNUNET_SERVER_Client *client = cls;
1159
1160   if (key == NULL)
1161   {
1162 #if DEBUG_DATASTORE
1163     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1164                 "No further matches for `%s' request.\n", "REMOVE");
1165 #endif
1166     transmit_status (client, GNUNET_NO, _("Content not found"));
1167     GNUNET_SERVER_client_drop (client);
1168     return GNUNET_OK;           /* last item */
1169   }
1170 #if DEBUG_DATASTORE
1171   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1172               "Item %llu matches `%s' request for key `%s' and type %u.\n",
1173               (unsigned long long) uid, "REMOVE", GNUNET_h2s (key), type);
1174 #endif
1175   GNUNET_STATISTICS_update (stats,
1176                             gettext_noop ("# bytes removed (explicit request)"),
1177                             size, GNUNET_YES);
1178   GNUNET_CONTAINER_bloomfilter_remove (filter, key);
1179   transmit_status (client, GNUNET_OK, NULL);
1180   GNUNET_SERVER_client_drop (client);
1181   return GNUNET_NO;
1182 }
1183
1184
1185 /**
1186  * Handle REMOVE-message.
1187  *
1188  * @param cls closure
1189  * @param client identification of the client
1190  * @param message the actual message
1191  */
1192 static void
1193 handle_remove (void *cls, struct GNUNET_SERVER_Client *client,
1194                const struct GNUNET_MessageHeader *message)
1195 {
1196   const struct DataMessage *dm = check_data (message);
1197   GNUNET_HashCode vhash;
1198
1199   if (dm == NULL)
1200   {
1201     GNUNET_break (0);
1202     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1203     return;
1204   }
1205 #if DEBUG_DATASTORE
1206   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1207               "Processing `%s' request for `%s' of type %u\n", "REMOVE",
1208               GNUNET_h2s (&dm->key), ntohl (dm->type));
1209 #endif
1210   GNUNET_STATISTICS_update (stats, gettext_noop ("# REMOVE requests received"),
1211                             1, GNUNET_NO);
1212   GNUNET_SERVER_client_keep (client);
1213   GNUNET_CRYPTO_hash (&dm[1], ntohl (dm->size), &vhash);
1214   plugin->api->get_key (plugin->api->cls, 0, &dm->key, &vhash,
1215                         (enum GNUNET_BLOCK_Type) ntohl (dm->type),
1216                         &remove_callback, client);
1217 }
1218
1219
1220 /**
1221  * Handle DROP-message.
1222  *
1223  * @param cls closure
1224  * @param client identification of the client
1225  * @param message the actual message
1226  */
1227 static void
1228 handle_drop (void *cls, struct GNUNET_SERVER_Client *client,
1229              const struct GNUNET_MessageHeader *message)
1230 {
1231 #if DEBUG_DATASTORE
1232   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Processing `%s' request\n", "DROP");
1233 #endif
1234   do_drop = GNUNET_YES;
1235   GNUNET_SERVER_receive_done (client, GNUNET_OK);
1236 }
1237
1238
1239 /**
1240  * Function called by plugins to notify us about a
1241  * change in their disk utilization.
1242  *
1243  * @param cls closure (NULL)
1244  * @param delta change in disk utilization,
1245  *        0 for "reset to empty"
1246  */
1247 static void
1248 disk_utilization_change_cb (void *cls, int delta)
1249 {
1250   if ((delta < 0) && (payload < -delta))
1251   {
1252     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1253                 _
1254                 ("Datastore payload inaccurate (%lld < %lld).  Trying to fix.\n"),
1255                 (long long) payload, (long long) -delta);
1256     payload = plugin->api->estimate_size (plugin->api->cls);
1257     sync_stats ();
1258     return;
1259   }
1260   payload += delta;
1261   lastSync++;
1262   if (lastSync >= MAX_STAT_SYNC_LAG)
1263     sync_stats ();
1264 }
1265
1266
1267 /**
1268  * Callback function to process statistic values.
1269  *
1270  * @param cls closure (struct Plugin*)
1271  * @param subsystem name of subsystem that created the statistic
1272  * @param name the name of the datum
1273  * @param value the current value
1274  * @param is_persistent GNUNET_YES if the value is persistent, GNUNET_NO if not
1275  * @return GNUNET_OK to continue, GNUNET_SYSERR to abort iteration
1276  */
1277 static int
1278 process_stat_in (void *cls, const char *subsystem, const char *name,
1279                  uint64_t value, int is_persistent)
1280 {
1281   GNUNET_assert (stats_worked == GNUNET_NO);
1282   stats_worked = GNUNET_YES;
1283   payload += value;
1284 #if DEBUG_SQLITE
1285   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1286               "Notification from statistics about existing payload (%llu), new payload is %llu\n",
1287               abs_value, payload);
1288 #endif
1289   return GNUNET_OK;
1290 }
1291
1292
1293 static void
1294 process_stat_done (void *cls, int success)
1295 {
1296   struct DatastorePlugin *plugin = cls;
1297
1298   stat_get = NULL;
1299   if (stats_worked == GNUNET_NO)
1300     payload = plugin->api->estimate_size (plugin->api->cls);
1301 }
1302
1303
1304 /**
1305  * Load the datastore plugin.
1306  */
1307 static struct DatastorePlugin *
1308 load_plugin ()
1309 {
1310   struct DatastorePlugin *ret;
1311   char *libname;
1312
1313   ret = GNUNET_malloc (sizeof (struct DatastorePlugin));
1314   ret->env.cfg = cfg;
1315   ret->env.duc = &disk_utilization_change_cb;
1316   ret->env.cls = NULL;
1317   GNUNET_log (GNUNET_ERROR_TYPE_INFO, _("Loading `%s' datastore plugin\n"),
1318               plugin_name);
1319   GNUNET_asprintf (&libname, "libgnunet_plugin_datastore_%s", plugin_name);
1320   ret->short_name = GNUNET_strdup (plugin_name);
1321   ret->lib_name = libname;
1322   ret->api = GNUNET_PLUGIN_load (libname, &ret->env);
1323   if (ret->api == NULL)
1324   {
1325     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1326                 _("Failed to load datastore plugin for `%s'\n"), plugin_name);
1327     GNUNET_free (ret->short_name);
1328     GNUNET_free (libname);
1329     GNUNET_free (ret);
1330     return NULL;
1331   }
1332   return ret;
1333 }
1334
1335
1336 /**
1337  * Function called when the service shuts
1338  * down.  Unloads our datastore plugin.
1339  *
1340  * @param plug plugin to unload
1341  */
1342 static void
1343 unload_plugin (struct DatastorePlugin *plug)
1344 {
1345 #if DEBUG_DATASTORE
1346   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1347               "Datastore service is unloading plugin...\n");
1348 #endif
1349   GNUNET_break (NULL == GNUNET_PLUGIN_unload (plug->lib_name, plug->api));
1350   GNUNET_free (plug->lib_name);
1351   GNUNET_free (plug->short_name);
1352   GNUNET_free (plug);
1353   GNUNET_free (quota_stat_name);
1354   quota_stat_name = NULL;
1355 }
1356
1357
1358 /**
1359  * Final task run after shutdown.  Unloads plugins and disconnects us from
1360  * statistics.
1361  */
1362 static void
1363 unload_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1364 {
1365   if (lastSync > 0)
1366     sync_stats ();
1367   if (GNUNET_YES == do_drop)
1368     plugin->api->drop (plugin->api->cls);
1369   unload_plugin (plugin);
1370   plugin = NULL;
1371   if (filter != NULL)
1372   {
1373     GNUNET_CONTAINER_bloomfilter_free (filter);
1374     filter = NULL;
1375   }
1376   if (stat_get != NULL)
1377   {
1378     GNUNET_STATISTICS_get_cancel (stat_get);
1379     stat_get = NULL;
1380   }
1381   if (stats != NULL)
1382   {
1383     GNUNET_STATISTICS_destroy (stats, GNUNET_YES);
1384     stats = NULL;
1385   }
1386   GNUNET_free_non_null (plugin_name);
1387   plugin_name = NULL;
1388 }
1389
1390
1391 /**
1392  * Last task run during shutdown.  Disconnects us from
1393  * the transport and core.
1394  */
1395 static void
1396 cleaning_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1397 {
1398   struct TransmitCallbackContext *tcc;
1399
1400   cleaning_done = GNUNET_YES;
1401   while (NULL != (tcc = tcc_head))
1402   {
1403     GNUNET_CONTAINER_DLL_remove (tcc_head, tcc_tail, tcc);
1404     if (tcc->th != NULL)
1405     {
1406       GNUNET_CONNECTION_notify_transmit_ready_cancel (tcc->th);
1407       GNUNET_SERVER_client_drop (tcc->client);
1408     }
1409     GNUNET_free (tcc->msg);
1410     GNUNET_free (tcc);
1411   }
1412   if (expired_kill_task != GNUNET_SCHEDULER_NO_TASK)
1413   {
1414     GNUNET_SCHEDULER_cancel (expired_kill_task);
1415     expired_kill_task = GNUNET_SCHEDULER_NO_TASK;
1416   }
1417   GNUNET_SCHEDULER_add_continuation (&unload_task, NULL,
1418                                      GNUNET_SCHEDULER_REASON_PREREQ_DONE);
1419 }
1420
1421
1422 /**
1423  * Function that removes all active reservations made
1424  * by the given client and releases the space for other
1425  * requests.
1426  *
1427  * @param cls closure
1428  * @param client identification of the client
1429  */
1430 static void
1431 cleanup_reservations (void *cls, struct GNUNET_SERVER_Client *client)
1432 {
1433   struct ReservationList *pos;
1434   struct ReservationList *prev;
1435   struct ReservationList *next;
1436
1437   if (client == NULL)
1438     return;
1439   prev = NULL;
1440   pos = reservations;
1441   while (NULL != pos)
1442   {
1443     next = pos->next;
1444     if (pos->client == client)
1445     {
1446       if (prev == NULL)
1447         reservations = next;
1448       else
1449         prev->next = next;
1450       reserved -= pos->amount + pos->entries * GNUNET_DATASTORE_ENTRY_OVERHEAD;
1451       GNUNET_free (pos);
1452     }
1453     else
1454     {
1455       prev = pos;
1456     }
1457     pos = next;
1458   }
1459   GNUNET_STATISTICS_set (stats, gettext_noop ("# reserved"), reserved,
1460                          GNUNET_NO);
1461 }
1462
1463
1464 /**
1465  * Adds a given key to the bloomfilter 'count' times.
1466  *
1467  * @param cls the bloomfilter
1468  * @param key key to add
1469  * @param count number of times to add key
1470  */
1471 static void
1472 add_key_to_bloomfilter (void *cls,
1473                         const GNUNET_HashCode *key,
1474                         unsigned int count)
1475 {
1476   struct GNUNET_CONTAINER_BloomFilter *bf = cls;
1477   while (0 < count--)
1478     GNUNET_CONTAINER_bloomfilter_add (bf, key);
1479 }
1480
1481
1482 /**
1483  * Process datastore requests.
1484  *
1485  * @param cls closure
1486  * @param server the initialized server
1487  * @param c configuration to use
1488  */
1489 static void
1490 run (void *cls, struct GNUNET_SERVER_Handle *server,
1491      const struct GNUNET_CONFIGURATION_Handle *c)
1492 {
1493   static const struct GNUNET_SERVER_MessageHandler handlers[] = {
1494     {&handle_reserve, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_RESERVE,
1495      sizeof (struct ReserveMessage)},
1496     {&handle_release_reserve, NULL,
1497      GNUNET_MESSAGE_TYPE_DATASTORE_RELEASE_RESERVE,
1498      sizeof (struct ReleaseReserveMessage)},
1499     {&handle_put, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_PUT, 0},
1500     {&handle_update, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_UPDATE,
1501      sizeof (struct UpdateMessage)},
1502     {&handle_get, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_GET, 0},
1503     {&handle_get_replication, NULL,
1504      GNUNET_MESSAGE_TYPE_DATASTORE_GET_REPLICATION,
1505      sizeof (struct GNUNET_MessageHeader)},
1506     {&handle_get_zero_anonymity, NULL,
1507      GNUNET_MESSAGE_TYPE_DATASTORE_GET_ZERO_ANONYMITY,
1508      sizeof (struct GetZeroAnonymityMessage)},
1509     {&handle_remove, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE, 0},
1510     {&handle_drop, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_DROP,
1511      sizeof (struct GNUNET_MessageHeader)},
1512     {NULL, NULL, 0, 0}
1513   };
1514   char *fn;  
1515   char *pfn;
1516   unsigned int bf_size;
1517   int refresh_bf;
1518
1519   cfg = c;
1520   if (GNUNET_OK !=
1521       GNUNET_CONFIGURATION_get_value_string (cfg, "DATASTORE", "DATABASE",
1522                                              &plugin_name))
1523   {
1524     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1525                 _("No `%s' specified for `%s' in configuration!\n"), "DATABASE",
1526                 "DATASTORE");
1527     return;
1528   }
1529   GNUNET_asprintf (&quota_stat_name,
1530                    _("# bytes used in file-sharing datastore `%s'"),
1531                    plugin_name);
1532   if (GNUNET_OK !=
1533       GNUNET_CONFIGURATION_get_value_size (cfg, "DATASTORE", "QUOTA", &quota))
1534   {
1535     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1536                 _("No `%s' specified for `%s' in configuration!\n"), "QUOTA",
1537                 "DATASTORE");
1538     return;
1539   }
1540   stats = GNUNET_STATISTICS_create ("datastore", cfg);
1541   GNUNET_STATISTICS_set (stats, gettext_noop ("# quota"), quota, GNUNET_NO);
1542   cache_size = quota / 8;       /* Or should we make this an option? */
1543   GNUNET_STATISTICS_set (stats, gettext_noop ("# cache size"), cache_size,
1544                          GNUNET_NO);
1545   if (quota / (32 * 1024LL) > (1 << 31)) 
1546     bf_size = (1 << 31);          /* absolute limit: ~2 GB, beyond that BF just won't help anyway */
1547   else
1548     bf_size = quota / (32 * 1024LL);         /* 8 bit per entry, 1 bit per 32 kb in DB */
1549   fn = NULL;
1550   if ((GNUNET_OK !=
1551        GNUNET_CONFIGURATION_get_value_filename (cfg, "DATASTORE", "BLOOMFILTER",
1552                                                 &fn)) ||
1553       (GNUNET_OK != GNUNET_DISK_directory_create_for_file (fn)))
1554   {
1555     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1556                 _("Could not use specified filename `%s' for bloomfilter.\n"),
1557                 fn != NULL ? fn : "");
1558     GNUNET_free_non_null (fn);
1559     fn = NULL;
1560   }
1561   if (fn != NULL)
1562   {
1563     GNUNET_asprintf (&pfn, "%s.%s", fn, plugin_name);
1564     if (GNUNET_YES == GNUNET_DISK_file_test (pfn))
1565     {
1566       filter = GNUNET_CONTAINER_bloomfilter_load (pfn, bf_size, 5);        /* approx. 3% false positives at max use */
1567       if (NULL == filter)
1568       {
1569         /* file exists but not valid, remove and try again, but refresh */
1570         if (0 != UNLINK (pfn))
1571         {
1572           /* failed to remove, run without file */
1573           GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1574                       _("Failed to remove bogus bloomfilter file `%s'\n"),
1575                       pfn);
1576           GNUNET_free (pfn);
1577           pfn = NULL;
1578           filter = GNUNET_CONTAINER_bloomfilter_load (NULL, bf_size, 5);        /* approx. 3% false positives at max use */
1579           refresh_bf = GNUNET_YES;
1580         }
1581         else
1582         {
1583           /* try again after remove */
1584           filter = GNUNET_CONTAINER_bloomfilter_load (pfn, bf_size, 5);        /* approx. 3% false positives at max use */
1585           refresh_bf = GNUNET_YES;
1586           if (NULL == filter)
1587           {
1588             /* failed yet again, give up on using file */
1589             GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1590                         _("Failed to remove bogus bloomfilter file `%s'\n"),
1591                         pfn);
1592             GNUNET_free (pfn);
1593             pfn = NULL;
1594             filter = GNUNET_CONTAINER_bloomfilter_load (NULL, bf_size, 5);        /* approx. 3% false positives at max use */
1595           }
1596         }
1597       }
1598       else
1599       {
1600         /* normal case: have an existing valid bf file, no need to refresh */
1601         refresh_bf = GNUNET_NO;
1602       }
1603     }
1604     else
1605     {
1606       filter = GNUNET_CONTAINER_bloomfilter_load (pfn, bf_size, 5);        /* approx. 3% false positives at max use */
1607       refresh_bf = GNUNET_YES;
1608     }
1609     GNUNET_free (pfn);
1610   }
1611   else
1612   {
1613     filter = GNUNET_CONTAINER_bloomfilter_init (NULL, bf_size, 5);      /* approx. 3% false positives at max use */
1614     refresh_bf = GNUNET_YES;
1615   }
1616   GNUNET_free_non_null (fn);
1617   if (filter == NULL)
1618   {
1619     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1620                 _("Failed to initialize bloomfilter.\n"));
1621     if (stats != NULL)
1622     {
1623       GNUNET_STATISTICS_destroy (stats, GNUNET_YES);
1624       stats = NULL;
1625     }
1626     return;
1627   }
1628   plugin = load_plugin ();
1629   if (NULL == plugin)
1630   {
1631     GNUNET_CONTAINER_bloomfilter_free (filter);
1632     filter = NULL;
1633     if (stats != NULL)
1634     {
1635       GNUNET_STATISTICS_destroy (stats, GNUNET_YES);
1636       stats = NULL;
1637     }
1638     return;
1639   }
1640   stat_get =
1641       GNUNET_STATISTICS_get (stats, "datastore", quota_stat_name,
1642                              GNUNET_TIME_UNIT_SECONDS, &process_stat_done,
1643                              &process_stat_in, plugin);
1644   GNUNET_SERVER_disconnect_notify (server, &cleanup_reservations, NULL);
1645   GNUNET_SERVER_add_handlers (server, handlers);
1646   if (GNUNET_YES == refresh_bf)
1647   {
1648     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1649                 _("Rebuilding bloomfilter.  Please be patient.\n"));
1650     if (NULL != plugin->api->get_keys)
1651       plugin->api->get_keys (plugin->api->cls, &add_key_to_bloomfilter, filter);  
1652     else
1653       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1654                   _("Plugin does not support get_keys function. Please fix!\n"));
1655
1656     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1657                 _("Bloomfilter construction complete.\n"));
1658   }
1659   expired_kill_task =
1660       GNUNET_SCHEDULER_add_with_priority (GNUNET_SCHEDULER_PRIORITY_IDLE,
1661                                           &delete_expired, NULL);
1662   GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL, &cleaning_task,
1663                                 NULL);
1664 }
1665
1666
1667 /**
1668  * The main function for the datastore service.
1669  *
1670  * @param argc number of arguments from the command line
1671  * @param argv command line arguments
1672  * @return 0 ok, 1 on error
1673  */
1674 int
1675 main (int argc, char *const *argv)
1676 {
1677   int ret;
1678
1679   ret =
1680       (GNUNET_OK ==
1681        GNUNET_SERVICE_run (argc, argv, "datastore", GNUNET_SERVICE_OPTION_NONE,
1682                            &run, NULL)) ? 0 : 1;
1683   return ret;
1684 }
1685
1686
1687 /* end of gnunet-service-datastore.c */