changing time measurement from milliseconds to microseconds
[oweals/gnunet.git] / src / datastore / gnunet-service-datastore.c
1 /*
2      This file is part of GNUnet
3      (C) 2004, 2005, 2006, 2007, 2009 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 2, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file datastore/gnunet-service-datastore.c
23  * @brief Management for the datastore for files stored on a GNUnet node
24  * @author Christian Grothoff
25  */
26
27 #include "platform.h"
28 #include "gnunet_util_lib.h"
29 #include "gnunet_protocols.h"
30 #include "gnunet_statistics_service.h"
31 #include "gnunet_datastore_plugin.h"
32 #include "datastore.h"
33
34 /**
35  * How many messages do we queue at most per client?
36  */
37 #define MAX_PENDING 1024
38
39 /**
40  * How long are we at most keeping "expired" content
41  * past the expiration date in the database?
42  */
43 #define MAX_EXPIRE_DELAY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 15)
44
45 /**
46  * How fast are we allowed to query the database for deleting
47  * expired content? (1 item per second).
48  */
49 #define MIN_EXPIRE_DELAY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 1)
50
51 /**
52  * Name under which we store current space consumption.
53  */
54 static char *quota_stat_name;
55
56 /**
57  * After how many payload-changing operations
58  * do we sync our statistics?
59  */
60 #define MAX_STAT_SYNC_LAG 50
61
62
63 /**
64  * Our datastore plugin.
65  */
66 struct DatastorePlugin
67 {
68
69   /**
70    * API of the transport as returned by the plugin's
71    * initialization function.
72    */
73   struct GNUNET_DATASTORE_PluginFunctions *api;
74
75   /**
76    * Short name for the plugin (i.e. "sqlite").
77    */
78   char *short_name;
79
80   /**
81    * Name of the library (i.e. "gnunet_plugin_datastore_sqlite").
82    */
83   char *lib_name;
84
85   /**
86    * Environment this transport service is using
87    * for this plugin.
88    */
89   struct GNUNET_DATASTORE_PluginEnvironment env;
90
91 };
92
93
94 /**
95  * Linked list of active reservations.
96  */
97 struct ReservationList
98 {
99
100   /**
101    * This is a linked list.
102    */
103   struct ReservationList *next;
104
105   /**
106    * Client that made the reservation.
107    */
108   struct GNUNET_SERVER_Client *client;
109
110   /**
111    * Number of bytes (still) reserved.
112    */
113   uint64_t amount;
114
115   /**
116    * Number of items (still) reserved.
117    */
118   uint64_t entries;
119
120   /**
121    * Reservation identifier.
122    */
123   int32_t rid;
124
125 };
126
127
128
129 /**
130  * Our datastore plugin (NULL if not available).
131  */
132 static struct DatastorePlugin *plugin;
133
134 /**
135  * Linked list of space reservations made by clients.
136  */
137 static struct ReservationList *reservations;
138
139 /**
140  * Bloomfilter to quickly tell if we don't have the content.
141  */
142 static struct GNUNET_CONTAINER_BloomFilter *filter;
143
144 /**
145  * How much space are we allowed to use?
146  */
147 static unsigned long long quota;
148
149 /**
150  * Should the database be dropped on exit?
151  */
152 static int do_drop;
153
154 /**
155  * Name of our plugin.
156  */
157 static char *plugin_name;
158
159 /**
160  * How much space are we using for the cache?  (space available for
161  * insertions that will be instantly reclaimed by discarding less
162  * important content --- or possibly whatever we just inserted into
163  * the "cache").
164  */
165 static unsigned long long cache_size;
166
167 /**
168  * How much space have we currently reserved?
169  */
170 static unsigned long long reserved;
171
172 /**
173  * How much data are we currently storing
174  * in the database?
175  */
176 static unsigned long long payload;
177
178 /**
179  * Number of updates that were made to the
180  * payload value since we last synchronized
181  * it with the statistics service.
182  */
183 static unsigned int lastSync;
184
185 /**
186  * Did we get an answer from statistics?
187  */
188 static int stats_worked;
189
190 /**
191  * Identity of the task that is used to delete
192  * expired content.
193  */
194 static GNUNET_SCHEDULER_TaskIdentifier expired_kill_task;
195
196 /**
197  * Our configuration.
198  */
199 const struct GNUNET_CONFIGURATION_Handle *cfg;
200
201 /**
202  * Minimum time that content should have to not be discarded instantly
203  * (time stamp of any content that we've been discarding recently to
204  * stay below the quota).  FOREVER if we had to expire content with
205  * non-zero priority.
206  */
207 static struct GNUNET_TIME_Absolute min_expiration;
208
209 /**
210  * Handle for reporting statistics.
211  */
212 static struct GNUNET_STATISTICS_Handle *stats;
213
214
215 /**
216  * Synchronize our utilization statistics with the
217  * statistics service.
218  */
219 static void
220 sync_stats ()
221 {
222   GNUNET_STATISTICS_set (stats, quota_stat_name, payload, GNUNET_YES);
223   GNUNET_STATISTICS_set (stats, "# utilization by current datastore", payload, GNUNET_NO);
224   lastSync = 0;
225 }
226
227
228
229 /**
230  * Context for transmitting replies to clients.
231  */
232 struct TransmitCallbackContext
233 {
234
235   /**
236    * We keep these in a doubly-linked list (for cleanup).
237    */
238   struct TransmitCallbackContext *next;
239
240   /**
241    * We keep these in a doubly-linked list (for cleanup).
242    */
243   struct TransmitCallbackContext *prev;
244
245   /**
246    * The message that we're asked to transmit.
247    */
248   struct GNUNET_MessageHeader *msg;
249
250   /**
251    * Handle for the transmission request.
252    */
253   struct GNUNET_SERVER_TransmitHandle *th;
254
255   /**
256    * Client that we are transmitting to.
257    */
258   struct GNUNET_SERVER_Client *client;
259
260 };
261
262
263 /**
264  * Head of the doubly-linked list (for cleanup).
265  */
266 static struct TransmitCallbackContext *tcc_head;
267
268 /**
269  * Tail of the doubly-linked list (for cleanup).
270  */
271 static struct TransmitCallbackContext *tcc_tail;
272
273 /**
274  * Have we already cleaned up the TCCs and are hence no longer
275  * willing (or able) to transmit anything to anyone?
276  */
277 static int cleaning_done;
278
279 /**
280  * Handle for pending get request.
281  */
282 static struct GNUNET_STATISTICS_GetHandle *stat_get;
283
284
285 /**
286  * Task that is used to remove expired entries from
287  * the datastore.  This task will schedule itself
288  * again automatically to always delete all expired
289  * content quickly.
290  *
291  * @param cls not used
292  * @param tc task context
293  */
294 static void
295 delete_expired (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
296
297
298 /**
299  * Iterate over the expired items stored in the datastore.
300  * Delete all expired items; once we have processed all
301  * expired items, re-schedule the "delete_expired" task.
302  *
303  * @param cls not used
304  * @param key key for the content
305  * @param size number of bytes in data
306  * @param data content stored
307  * @param type type of the content
308  * @param priority priority of the content
309  * @param anonymity anonymity-level for the content
310  * @param expiration expiration time for the content
311  * @param uid unique identifier for the datum;
312  *        maybe 0 if no unique identifier is available
313  *
314  * @return GNUNET_SYSERR to abort the iteration, GNUNET_OK to continue
315  *         (continue on call to "next", of course),
316  *         GNUNET_NO to delete the item and continue (if supported)
317  */
318 static int
319 expired_processor (void *cls, const struct GNUNET_HashCode * key, uint32_t size,
320                    const void *data, enum GNUNET_BLOCK_Type type,
321                    uint32_t priority, uint32_t anonymity,
322                    struct GNUNET_TIME_Absolute expiration, uint64_t uid)
323 {
324   struct GNUNET_TIME_Absolute now;
325
326   if (key == NULL)
327   {
328     expired_kill_task =
329         GNUNET_SCHEDULER_add_delayed_with_priority (MAX_EXPIRE_DELAY,
330                                                     GNUNET_SCHEDULER_PRIORITY_IDLE,
331                                                     &delete_expired, NULL);
332     return GNUNET_SYSERR;
333   }
334   now = GNUNET_TIME_absolute_get ();
335   if (expiration.abs_value_us > now.abs_value_us)
336   {
337     /* finished processing */
338     expired_kill_task =
339         GNUNET_SCHEDULER_add_delayed_with_priority (MAX_EXPIRE_DELAY,
340                                                     GNUNET_SCHEDULER_PRIORITY_IDLE,
341                                                     &delete_expired, NULL);
342     return GNUNET_SYSERR;
343   }
344   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
345               "Deleting content `%s' of type %u that expired %s ago\n",
346               GNUNET_h2s (key), type,
347               GNUNET_STRINGS_relative_time_to_string (GNUNET_TIME_absolute_get_difference (expiration,
348                                                                                            now),
349                                                       GNUNET_YES));
350   min_expiration = now;
351   GNUNET_STATISTICS_update (stats, gettext_noop ("# bytes expired"), size,
352                             GNUNET_YES);
353   GNUNET_CONTAINER_bloomfilter_remove (filter, key);
354   expired_kill_task =
355       GNUNET_SCHEDULER_add_delayed_with_priority (MIN_EXPIRE_DELAY,
356                                                   GNUNET_SCHEDULER_PRIORITY_IDLE,
357                                                   &delete_expired, NULL);
358   return GNUNET_NO;
359 }
360
361
362 /**
363  * Task that is used to remove expired entries from
364  * the datastore.  This task will schedule itself
365  * again automatically to always delete all expired
366  * content quickly.
367  *
368  * @param cls not used
369  * @param tc task context
370  */
371 static void
372 delete_expired (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
373 {
374   expired_kill_task = GNUNET_SCHEDULER_NO_TASK;
375   plugin->api->get_expiration (plugin->api->cls, &expired_processor, NULL);
376 }
377
378
379 /**
380  * An iterator over a set of items stored in the datastore
381  * that deletes until we're happy with respect to our quota.
382  *
383  * @param cls closure
384  * @param key key for the content
385  * @param size number of bytes in data
386  * @param data content stored
387  * @param type type of the content
388  * @param priority priority of the content
389  * @param anonymity anonymity-level for the content
390  * @param expiration expiration time for the content
391  * @param uid unique identifier for the datum;
392  *        maybe 0 if no unique identifier is available
393  *
394  * @return GNUNET_SYSERR to abort the iteration, GNUNET_OK to continue
395  *         (continue on call to "next", of course),
396  *         GNUNET_NO to delete the item and continue (if supported)
397  */
398 static int
399 quota_processor (void *cls, const struct GNUNET_HashCode * key, uint32_t size,
400                  const void *data, enum GNUNET_BLOCK_Type type,
401                  uint32_t priority, uint32_t anonymity,
402                  struct GNUNET_TIME_Absolute expiration, uint64_t uid)
403 {
404   unsigned long long *need = cls;
405
406   if (NULL == key)
407     return GNUNET_SYSERR;
408   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
409               "Deleting %llu bytes of low-priority (%u) content `%s' of type %u at %s prior to expiration (still trying to free another %llu bytes)\n",
410               (unsigned long long) (size + GNUNET_DATASTORE_ENTRY_OVERHEAD),
411               (unsigned int) priority,
412               GNUNET_h2s (key), type, 
413               GNUNET_STRINGS_relative_time_to_string (GNUNET_TIME_absolute_get_remaining (expiration),
414                                                       GNUNET_YES),
415               *need);
416   if (size + GNUNET_DATASTORE_ENTRY_OVERHEAD > *need)
417     *need = 0;
418   else
419     *need -= size + GNUNET_DATASTORE_ENTRY_OVERHEAD;
420   if (priority > 0)
421     min_expiration = GNUNET_TIME_UNIT_FOREVER_ABS;
422   else
423     min_expiration = expiration;
424   GNUNET_STATISTICS_update (stats,
425                             gettext_noop ("# bytes purged (low-priority)"),
426                             size, GNUNET_YES);
427   GNUNET_CONTAINER_bloomfilter_remove (filter, key);
428   return GNUNET_NO;
429 }
430
431
432 /**
433  * Manage available disk space by running tasks
434  * that will discard content if necessary.  This
435  * function will be run whenever a request for
436  * "need" bytes of storage could only be satisfied
437  * by eating into the "cache" (and we want our cache
438  * space back).
439  *
440  * @param need number of bytes of content that were
441  *        placed into the "cache" (and hence the
442  *        number of bytes that should be removed).
443  */
444 static void
445 manage_space (unsigned long long need)
446 {
447   unsigned long long last;
448
449   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
450               "Asked to free up %llu bytes of cache space\n", need);
451   last = 0;
452   while ((need > 0) && (last != need))
453   {
454     last = need;
455     plugin->api->get_expiration (plugin->api->cls, &quota_processor, &need);
456   }
457 }
458
459
460 /**
461  * Function called to notify a client about the socket
462  * begin ready to queue more data.  "buf" will be
463  * NULL and "size" zero if the socket was closed for
464  * writing in the meantime.
465  *
466  * @param cls closure
467  * @param size number of bytes available in buf
468  * @param buf where the callee should write the message
469  * @return number of bytes written to buf
470  */
471 static size_t
472 transmit_callback (void *cls, size_t size, void *buf)
473 {
474   struct TransmitCallbackContext *tcc = cls;
475   size_t msize;
476
477   tcc->th = NULL;
478   GNUNET_CONTAINER_DLL_remove (tcc_head, tcc_tail, tcc);
479   msize = ntohs (tcc->msg->size);
480   if (size == 0)
481   {
482     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
483                 _("Transmission to client failed!\n"));
484     GNUNET_SERVER_receive_done (tcc->client, GNUNET_SYSERR);
485     GNUNET_SERVER_client_drop (tcc->client);
486     GNUNET_free (tcc->msg);
487     GNUNET_free (tcc);
488     return 0;
489   }
490   GNUNET_assert (size >= msize);
491   memcpy (buf, tcc->msg, msize);
492   GNUNET_SERVER_receive_done (tcc->client, GNUNET_OK);
493   GNUNET_SERVER_client_drop (tcc->client);
494   GNUNET_free (tcc->msg);
495   GNUNET_free (tcc);
496   return msize;
497 }
498
499
500 /**
501  * Transmit the given message to the client.
502  *
503  * @param client target of the message
504  * @param msg message to transmit, will be freed!
505  */
506 static void
507 transmit (struct GNUNET_SERVER_Client *client, struct GNUNET_MessageHeader *msg)
508 {
509   struct TransmitCallbackContext *tcc;
510
511   if (GNUNET_YES == cleaning_done)
512   {
513     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
514                 _("Shutdown in progress, aborting transmission.\n"));
515     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
516     GNUNET_free (msg);
517     return;
518   }
519   tcc = GNUNET_malloc (sizeof (struct TransmitCallbackContext));
520   tcc->msg = msg;
521   tcc->client = client;
522   if (NULL ==
523       (tcc->th =
524        GNUNET_SERVER_notify_transmit_ready (client, ntohs (msg->size),
525                                             GNUNET_TIME_UNIT_FOREVER_REL,
526                                             &transmit_callback, tcc)))
527   {
528     GNUNET_break (0);
529     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
530     GNUNET_free (msg);
531     GNUNET_free (tcc);
532     return;
533   }
534   GNUNET_SERVER_client_keep (client);
535   GNUNET_CONTAINER_DLL_insert (tcc_head, tcc_tail, tcc);
536 }
537
538
539 /**
540  * Transmit a status code to the client.
541  *
542  * @param client receiver of the response
543  * @param code status code
544  * @param msg optional error message (can be NULL)
545  */
546 static void
547 transmit_status (struct GNUNET_SERVER_Client *client, int code, const char *msg)
548 {
549   struct StatusMessage *sm;
550   size_t slen;
551
552   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
553               "Transmitting `%s' message with value %d and message `%s'\n",
554               "STATUS", code, msg != NULL ? msg : "(none)");
555   slen = (msg == NULL) ? 0 : strlen (msg) + 1;
556   sm = GNUNET_malloc (sizeof (struct StatusMessage) + slen);
557   sm->header.size = htons (sizeof (struct StatusMessage) + slen);
558   sm->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_STATUS);
559   sm->status = htonl (code);
560   sm->min_expiration = GNUNET_TIME_absolute_hton (min_expiration);
561   if (slen > 0)
562     memcpy (&sm[1], msg, slen);
563   transmit (client, &sm->header);
564 }
565
566
567
568 /**
569  * Function that will transmit the given datastore entry
570  * to the client.
571  *
572  * @param cls closure, pointer to the client (of type GNUNET_SERVER_Client).
573  * @param key key for the content
574  * @param size number of bytes in data
575  * @param data content stored
576  * @param type type of the content
577  * @param priority priority of the content
578  * @param anonymity anonymity-level for the content
579  * @param expiration expiration time for the content
580  * @param uid unique identifier for the datum;
581  *        maybe 0 if no unique identifier is available
582  *
583  * @return GNUNET_SYSERR to abort the iteration, GNUNET_OK to continue,
584  *         GNUNET_NO to delete the item and continue (if supported)
585  */
586 static int
587 transmit_item (void *cls, const struct GNUNET_HashCode * key, uint32_t size,
588                const void *data, enum GNUNET_BLOCK_Type type, uint32_t priority,
589                uint32_t anonymity, struct GNUNET_TIME_Absolute expiration,
590                uint64_t uid)
591 {
592   struct GNUNET_SERVER_Client *client = cls;
593   struct GNUNET_MessageHeader *end;
594   struct DataMessage *dm;
595
596   if (key == NULL)
597   {
598     /* transmit 'DATA_END' */
599     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Transmitting `%s' message\n",
600                 "DATA_END");
601     end = GNUNET_malloc (sizeof (struct GNUNET_MessageHeader));
602     end->size = htons (sizeof (struct GNUNET_MessageHeader));
603     end->type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END);
604     transmit (client, end);
605     GNUNET_SERVER_client_drop (client);
606     return GNUNET_OK;
607   }
608   GNUNET_assert (sizeof (struct DataMessage) + size <
609                  GNUNET_SERVER_MAX_MESSAGE_SIZE);
610   dm = GNUNET_malloc (sizeof (struct DataMessage) + size);
611   dm->header.size = htons (sizeof (struct DataMessage) + size);
612   dm->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_DATA);
613   dm->rid = htonl (0);
614   dm->size = htonl (size);
615   dm->type = htonl (type);
616   dm->priority = htonl (priority);
617   dm->anonymity = htonl (anonymity);
618   dm->replication = htonl (0);
619   dm->reserved = htonl (0);
620   dm->expiration = GNUNET_TIME_absolute_hton (expiration);
621   dm->uid = GNUNET_htonll (uid);
622   dm->key = *key;
623   memcpy (&dm[1], data, size);
624   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
625               "Transmitting `%s' message for `%s' of type %u with expiration %s (in: %s)\n",
626               "DATA", GNUNET_h2s (key), type,
627               GNUNET_STRINGS_absolute_time_to_string (expiration),
628               GNUNET_STRINGS_relative_time_to_string (GNUNET_TIME_absolute_get_remaining (expiration),
629                                                       GNUNET_YES));
630   GNUNET_STATISTICS_update (stats, gettext_noop ("# results found"), 1,
631                             GNUNET_NO);
632   transmit (client, &dm->header);
633   GNUNET_SERVER_client_drop (client);
634   return GNUNET_OK;
635 }
636
637
638 /**
639  * Handle RESERVE-message.
640  *
641  * @param cls closure
642  * @param client identification of the client
643  * @param message the actual message
644  */
645 static void
646 handle_reserve (void *cls, struct GNUNET_SERVER_Client *client,
647                 const struct GNUNET_MessageHeader *message)
648 {
649   /**
650    * Static counter to produce reservation identifiers.
651    */
652   static int reservation_gen;
653
654   const struct ReserveMessage *msg = (const struct ReserveMessage *) message;
655   struct ReservationList *e;
656   unsigned long long used;
657   unsigned long long req;
658   uint64_t amount;
659   uint32_t entries;
660
661   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Processing `%s' request\n", "RESERVE");
662   amount = GNUNET_ntohll (msg->amount);
663   entries = ntohl (msg->entries);
664   used = payload + reserved;
665   req =
666       amount + ((unsigned long long) GNUNET_DATASTORE_ENTRY_OVERHEAD) * entries;
667   if (used + req > quota)
668   {
669     if (quota < used)
670       used = quota;             /* cheat a bit for error message (to avoid negative numbers) */
671     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
672                 _
673                 ("Insufficient space (%llu bytes are available) to satisfy `%s' request for %llu bytes\n"),
674                 quota - used, "RESERVE", req);
675     if (cache_size < req)
676     {
677       /* TODO: document this in the FAQ; essentially, if this
678        * message happens, the insertion request could be blocked
679        * by less-important content from migration because it is
680        * larger than 1/8th of the overall available space, and
681        * we only reserve 1/8th for "fresh" insertions */
682       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
683                   _
684                   ("The requested amount (%llu bytes) is larger than the cache size (%llu bytes)\n"),
685                   req, cache_size);
686       transmit_status (client, 0,
687                        gettext_noop
688                        ("Insufficient space to satisfy request and "
689                         "requested amount is larger than cache size"));
690     }
691     else
692     {
693       transmit_status (client, 0,
694                        gettext_noop ("Insufficient space to satisfy request"));
695     }
696     return;
697   }
698   reserved += req;
699   GNUNET_STATISTICS_set (stats, gettext_noop ("# reserved"), reserved,
700                          GNUNET_NO);
701   e = GNUNET_malloc (sizeof (struct ReservationList));
702   e->next = reservations;
703   reservations = e;
704   e->client = client;
705   e->amount = amount;
706   e->entries = entries;
707   e->rid = ++reservation_gen;
708   if (reservation_gen < 0)
709     reservation_gen = 0;        /* wrap around */
710   transmit_status (client, e->rid, NULL);
711 }
712
713
714 /**
715  * Handle RELEASE_RESERVE-message.
716  *
717  * @param cls closure
718  * @param client identification of the client
719  * @param message the actual message
720  */
721 static void
722 handle_release_reserve (void *cls, struct GNUNET_SERVER_Client *client,
723                         const struct GNUNET_MessageHeader *message)
724 {
725   const struct ReleaseReserveMessage *msg =
726       (const struct ReleaseReserveMessage *) message;
727   struct ReservationList *pos;
728   struct ReservationList *prev;
729   struct ReservationList *next;
730   int rid = ntohl (msg->rid);
731   unsigned long long rem;
732
733   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Processing `%s' request\n",
734               "RELEASE_RESERVE");
735   next = reservations;
736   prev = NULL;
737   while (NULL != (pos = next))
738   {
739     next = pos->next;
740     if (rid == pos->rid)
741     {
742       if (prev == NULL)
743         reservations = next;
744       else
745         prev->next = next;
746       rem =
747           pos->amount +
748           ((unsigned long long) GNUNET_DATASTORE_ENTRY_OVERHEAD) * pos->entries;
749       GNUNET_assert (reserved >= rem);
750       reserved -= rem;
751       GNUNET_STATISTICS_set (stats, gettext_noop ("# reserved"), reserved,
752                              GNUNET_NO);
753       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
754                   "Returning %llu remaining reserved bytes to storage pool\n",
755                   rem);
756       GNUNET_free (pos);
757       transmit_status (client, GNUNET_OK, NULL);
758       return;
759     }
760     prev = pos;
761   }
762   GNUNET_break (0);
763   transmit_status (client, GNUNET_SYSERR,
764                    gettext_noop ("Could not find matching reservation"));
765 }
766
767
768 /**
769  * Check that the given message is a valid data message.
770  *
771  * @return NULL if the message is not well-formed, otherwise the message
772  */
773 static const struct DataMessage *
774 check_data (const struct GNUNET_MessageHeader *message)
775 {
776   uint16_t size;
777   uint32_t dsize;
778   const struct DataMessage *dm;
779
780   size = ntohs (message->size);
781   if (size < sizeof (struct DataMessage))
782   {
783     GNUNET_break (0);
784     return NULL;
785   }
786   dm = (const struct DataMessage *) message;
787   dsize = ntohl (dm->size);
788   if (size != dsize + sizeof (struct DataMessage))
789   {
790     GNUNET_break (0);
791     return NULL;
792   }
793   return dm;
794 }
795
796
797 /**
798  * Context for a PUT request used to see if the content is
799  * already present.
800  */
801 struct PutContext
802 {
803   /**
804    * Client to notify on completion.
805    */
806   struct GNUNET_SERVER_Client *client;
807
808 #if ! HAVE_UNALIGNED_64_ACCESS
809   void *reserved;
810 #endif
811
812   /* followed by the 'struct DataMessage' */
813 };
814
815
816 /**
817  * Actually put the data message.
818  *
819  * @param client sender of the message
820  * @param dm message with the data to store
821  */
822 static void
823 execute_put (struct GNUNET_SERVER_Client *client, const struct DataMessage *dm)
824 {
825   uint32_t size;
826   char *msg;
827   int ret;
828
829   size = ntohl (dm->size);
830   msg = NULL;
831   ret =
832       plugin->api->put (plugin->api->cls, &dm->key, size, &dm[1],
833                         ntohl (dm->type), ntohl (dm->priority),
834                         ntohl (dm->anonymity), ntohl (dm->replication),
835                         GNUNET_TIME_absolute_ntoh (dm->expiration), &msg);
836   if (GNUNET_OK == ret)
837   {
838     GNUNET_STATISTICS_update (stats, gettext_noop ("# bytes stored"), size,
839                               GNUNET_YES);
840     GNUNET_CONTAINER_bloomfilter_add (filter, &dm->key);
841     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
842                 "Successfully stored %u bytes of type %u under key `%s'\n",
843                 size, ntohl (dm->type), GNUNET_h2s (&dm->key));
844   }
845   transmit_status (client, ret, msg);
846   GNUNET_free_non_null (msg);
847   if (quota - reserved - cache_size < payload)
848   {
849     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
850                 _("Need %llu bytes more space (%llu allowed, using %llu)\n"),
851                 (unsigned long long) size + GNUNET_DATASTORE_ENTRY_OVERHEAD,
852                 (unsigned long long) (quota - reserved - cache_size),
853                 (unsigned long long) payload);
854     manage_space (size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
855   }
856 }
857
858
859 /**
860  * Function that will check if the given datastore entry
861  * matches the put and if none match executes the put.
862  *
863  * @param cls closure, pointer to the client (of type 'struct PutContext').
864  * @param key key for the content
865  * @param size number of bytes in data
866  * @param data content stored
867  * @param type type of the content
868  * @param priority priority of the content
869  * @param anonymity anonymity-level for the content
870  * @param expiration expiration time for the content
871  * @param uid unique identifier for the datum;
872  *        maybe 0 if no unique identifier is available
873  *
874  * @return GNUNET_OK usually
875  *         GNUNET_NO to delete the item
876  */
877 static int
878 check_present (void *cls, const struct GNUNET_HashCode * key, uint32_t size,
879                const void *data, enum GNUNET_BLOCK_Type type, uint32_t priority,
880                uint32_t anonymity, struct GNUNET_TIME_Absolute expiration,
881                uint64_t uid)
882 {
883   struct PutContext *pc = cls;
884   const struct DataMessage *dm;
885
886   dm = (const struct DataMessage *) &pc[1];
887   if (key == NULL)
888   {
889     execute_put (pc->client, dm);
890     GNUNET_SERVER_client_drop (pc->client);
891     GNUNET_free (pc);
892     return GNUNET_OK;
893   }
894   if ((GNUNET_BLOCK_TYPE_FS_DBLOCK == type) ||
895       (GNUNET_BLOCK_TYPE_FS_IBLOCK == type) || ((size == ntohl (dm->size)) &&
896                                                 (0 ==
897                                                  memcmp (&dm[1], data, size))))
898   {
899     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
900                 "Result already present in datastore\n");
901     /* FIXME: change API to allow increasing 'replication' counter */
902     if ((ntohl (dm->priority) > 0) ||
903         (GNUNET_TIME_absolute_ntoh (dm->expiration).abs_value_us >
904          expiration.abs_value_us))
905       plugin->api->update (plugin->api->cls, uid,
906                            (int32_t) ntohl (dm->priority),
907                            GNUNET_TIME_absolute_ntoh (dm->expiration), NULL);
908     transmit_status (pc->client, GNUNET_NO, NULL);
909     GNUNET_SERVER_client_drop (pc->client);
910     GNUNET_free (pc);
911   }
912   else
913   {
914     execute_put (pc->client, dm);
915     GNUNET_SERVER_client_drop (pc->client);
916     GNUNET_free (pc);
917   }
918   return GNUNET_OK;
919 }
920
921
922 /**
923  * Handle PUT-message.
924  *
925  * @param cls closure
926  * @param client identification of the client
927  * @param message the actual message
928  */
929 static void
930 handle_put (void *cls, struct GNUNET_SERVER_Client *client,
931             const struct GNUNET_MessageHeader *message)
932 {
933   const struct DataMessage *dm = check_data (message);
934   int rid;
935   struct ReservationList *pos;
936   struct PutContext *pc;
937   struct GNUNET_HashCode vhash;
938   uint32_t size;
939
940   if ((dm == NULL) || (ntohl (dm->type) == 0))
941   {
942     GNUNET_break (0);
943     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
944     return;
945   }
946   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
947               "Processing `%s' request for `%s' of type %u\n", "PUT",
948               GNUNET_h2s (&dm->key), ntohl (dm->type));
949   rid = ntohl (dm->rid);
950   size = ntohl (dm->size);
951   if (rid > 0)
952   {
953     pos = reservations;
954     while ((NULL != pos) && (rid != pos->rid))
955       pos = pos->next;
956     GNUNET_break (pos != NULL);
957     if (NULL != pos)
958     {
959       GNUNET_break (pos->entries > 0);
960       GNUNET_break (pos->amount >= size);
961       pos->entries--;
962       pos->amount -= size;
963       reserved -= (size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
964       GNUNET_STATISTICS_set (stats, gettext_noop ("# reserved"), reserved,
965                              GNUNET_NO);
966     }
967   }
968   if (GNUNET_YES == GNUNET_CONTAINER_bloomfilter_test (filter, &dm->key))
969   {
970     GNUNET_CRYPTO_hash (&dm[1], size, &vhash);
971     pc = GNUNET_malloc (sizeof (struct PutContext) + size +
972                         sizeof (struct DataMessage));
973     pc->client = client;
974     GNUNET_SERVER_client_keep (client);
975     memcpy (&pc[1], dm, size + sizeof (struct DataMessage));
976     plugin->api->get_key (plugin->api->cls, 0, &dm->key, &vhash,
977                           ntohl (dm->type), &check_present, pc);
978     return;
979   }
980   execute_put (client, dm);
981 }
982
983
984 /**
985  * Handle GET-message.
986  *
987  * @param cls closure
988  * @param client identification of the client
989  * @param message the actual message
990  */
991 static void
992 handle_get (void *cls, struct GNUNET_SERVER_Client *client,
993             const struct GNUNET_MessageHeader *message)
994 {
995   const struct GetMessage *msg;
996   uint16_t size;
997
998   size = ntohs (message->size);
999   if ((size != sizeof (struct GetMessage)) &&
1000       (size != sizeof (struct GetMessage) - sizeof (struct GNUNET_HashCode)))
1001   {
1002     GNUNET_break (0);
1003     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1004     return;
1005   }
1006   msg = (const struct GetMessage *) message;
1007   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1008               "Processing `%s' request for `%s' of type %u\n", "GET",
1009               GNUNET_h2s (&msg->key), ntohl (msg->type));
1010   GNUNET_STATISTICS_update (stats, gettext_noop ("# GET requests received"), 1,
1011                             GNUNET_NO);
1012   GNUNET_SERVER_client_keep (client);
1013   if ((size == sizeof (struct GetMessage)) &&
1014       (GNUNET_YES != GNUNET_CONTAINER_bloomfilter_test (filter, &msg->key)))
1015   {
1016     /* don't bother database... */
1017     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1018                 "Empty result set for `%s' request for `%s' (bloomfilter).\n",
1019                 "GET", GNUNET_h2s (&msg->key));
1020     GNUNET_STATISTICS_update (stats,
1021                               gettext_noop
1022                               ("# requests filtered by bloomfilter"), 1,
1023                               GNUNET_NO);
1024     transmit_item (client, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS,
1025                    0);
1026     return;
1027   }
1028   plugin->api->get_key (plugin->api->cls, GNUNET_ntohll (msg->offset),
1029                         ((size ==
1030                           sizeof (struct GetMessage)) ? &msg->key : NULL), NULL,
1031                         ntohl (msg->type), &transmit_item, client);
1032 }
1033
1034
1035 /**
1036  * Handle UPDATE-message.
1037  *
1038  * @param cls closure
1039  * @param client identification of the client
1040  * @param message the actual message
1041  */
1042 static void
1043 handle_update (void *cls, struct GNUNET_SERVER_Client *client,
1044                const struct GNUNET_MessageHeader *message)
1045 {
1046   const struct UpdateMessage *msg;
1047   int ret;
1048   char *emsg;
1049
1050   GNUNET_STATISTICS_update (stats, gettext_noop ("# UPDATE requests received"),
1051                             1, GNUNET_NO);
1052   msg = (const struct UpdateMessage *) message;
1053   emsg = NULL;
1054   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Processing `%s' request for %llu\n",
1055               "UPDATE", (unsigned long long) GNUNET_ntohll (msg->uid));
1056   ret =
1057       plugin->api->update (plugin->api->cls, GNUNET_ntohll (msg->uid),
1058                            (int32_t) ntohl (msg->priority),
1059                            GNUNET_TIME_absolute_ntoh (msg->expiration), &emsg);
1060   transmit_status (client, ret, emsg);
1061   GNUNET_free_non_null (emsg);
1062 }
1063
1064
1065 /**
1066  * Handle GET_REPLICATION-message.
1067  *
1068  * @param cls closure
1069  * @param client identification of the client
1070  * @param message the actual message
1071  */
1072 static void
1073 handle_get_replication (void *cls, struct GNUNET_SERVER_Client *client,
1074                         const struct GNUNET_MessageHeader *message)
1075 {
1076   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Processing `%s' request\n",
1077               "GET_REPLICATION");
1078   GNUNET_STATISTICS_update (stats,
1079                             gettext_noop
1080                             ("# GET REPLICATION requests received"), 1,
1081                             GNUNET_NO);
1082   GNUNET_SERVER_client_keep (client);
1083   plugin->api->get_replication (plugin->api->cls, &transmit_item, client);
1084 }
1085
1086
1087 /**
1088  * Handle GET_ZERO_ANONYMITY-message.
1089  *
1090  * @param cls closure
1091  * @param client identification of the client
1092  * @param message the actual message
1093  */
1094 static void
1095 handle_get_zero_anonymity (void *cls, struct GNUNET_SERVER_Client *client,
1096                            const struct GNUNET_MessageHeader *message)
1097 {
1098   const struct GetZeroAnonymityMessage *msg =
1099       (const struct GetZeroAnonymityMessage *) message;
1100   enum GNUNET_BLOCK_Type type;
1101
1102   type = (enum GNUNET_BLOCK_Type) ntohl (msg->type);
1103   if (type == GNUNET_BLOCK_TYPE_ANY)
1104   {
1105     GNUNET_break (0);
1106     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1107     return;
1108   }
1109   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Processing `%s' request\n",
1110               "GET_ZERO_ANONYMITY");
1111   GNUNET_STATISTICS_update (stats,
1112                             gettext_noop
1113                             ("# GET ZERO ANONYMITY requests received"), 1,
1114                             GNUNET_NO);
1115   GNUNET_SERVER_client_keep (client);
1116   plugin->api->get_zero_anonymity (plugin->api->cls,
1117                                    GNUNET_ntohll (msg->offset), type,
1118                                    &transmit_item, client);
1119 }
1120
1121
1122 /**
1123  * Callback function that will cause the item that is passed
1124  * in to be deleted (by returning GNUNET_NO).
1125  */
1126 static int
1127 remove_callback (void *cls, const struct GNUNET_HashCode * key, uint32_t size,
1128                  const void *data, enum GNUNET_BLOCK_Type type,
1129                  uint32_t priority, uint32_t anonymity,
1130                  struct GNUNET_TIME_Absolute expiration, uint64_t uid)
1131 {
1132   struct GNUNET_SERVER_Client *client = cls;
1133
1134   if (key == NULL)
1135   {
1136     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1137                 "No further matches for `%s' request.\n", "REMOVE");
1138     transmit_status (client, GNUNET_NO, _("Content not found"));
1139     GNUNET_SERVER_client_drop (client);
1140     return GNUNET_OK;           /* last item */
1141   }
1142   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1143               "Item %llu matches `%s' request for key `%s' and type %u.\n",
1144               (unsigned long long) uid, "REMOVE", GNUNET_h2s (key), type);
1145   GNUNET_STATISTICS_update (stats,
1146                             gettext_noop ("# bytes removed (explicit request)"),
1147                             size, GNUNET_YES);
1148   GNUNET_CONTAINER_bloomfilter_remove (filter, key);
1149   transmit_status (client, GNUNET_OK, NULL);
1150   GNUNET_SERVER_client_drop (client);
1151   return GNUNET_NO;
1152 }
1153
1154
1155 /**
1156  * Handle REMOVE-message.
1157  *
1158  * @param cls closure
1159  * @param client identification of the client
1160  * @param message the actual message
1161  */
1162 static void
1163 handle_remove (void *cls, struct GNUNET_SERVER_Client *client,
1164                const struct GNUNET_MessageHeader *message)
1165 {
1166   const struct DataMessage *dm = check_data (message);
1167   struct GNUNET_HashCode vhash;
1168
1169   if (dm == NULL)
1170   {
1171     GNUNET_break (0);
1172     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1173     return;
1174   }
1175   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1176               "Processing `%s' request for `%s' of type %u\n", "REMOVE",
1177               GNUNET_h2s (&dm->key), ntohl (dm->type));
1178   GNUNET_STATISTICS_update (stats, gettext_noop ("# REMOVE requests received"),
1179                             1, GNUNET_NO);
1180   GNUNET_SERVER_client_keep (client);
1181   GNUNET_CRYPTO_hash (&dm[1], ntohl (dm->size), &vhash);
1182   plugin->api->get_key (plugin->api->cls, 0, &dm->key, &vhash,
1183                         (enum GNUNET_BLOCK_Type) ntohl (dm->type),
1184                         &remove_callback, client);
1185 }
1186
1187
1188 /**
1189  * Handle DROP-message.
1190  *
1191  * @param cls closure
1192  * @param client identification of the client
1193  * @param message the actual message
1194  */
1195 static void
1196 handle_drop (void *cls, struct GNUNET_SERVER_Client *client,
1197              const struct GNUNET_MessageHeader *message)
1198 {
1199   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Processing `%s' request\n", "DROP");
1200   do_drop = GNUNET_YES;
1201   GNUNET_SERVER_receive_done (client, GNUNET_OK);
1202 }
1203
1204
1205 /**
1206  * Function called by plugins to notify us about a
1207  * change in their disk utilization.
1208  *
1209  * @param cls closure (NULL)
1210  * @param delta change in disk utilization,
1211  *        0 for "reset to empty"
1212  */
1213 static void
1214 disk_utilization_change_cb (void *cls, int delta)
1215 {
1216   if ((delta < 0) && (payload < -delta))
1217   {
1218     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1219                 _
1220                 ("Datastore payload inaccurate (%lld < %lld).  Trying to fix.\n"),
1221                 (long long) payload, (long long) -delta);
1222     payload = plugin->api->estimate_size (plugin->api->cls);
1223     sync_stats ();
1224     return;
1225   }
1226   payload += delta;
1227   lastSync++;
1228   if (lastSync >= MAX_STAT_SYNC_LAG)
1229     sync_stats ();
1230 }
1231
1232
1233 /**
1234  * Callback function to process statistic values.
1235  *
1236  * @param cls closure (struct Plugin*)
1237  * @param subsystem name of subsystem that created the statistic
1238  * @param name the name of the datum
1239  * @param value the current value
1240  * @param is_persistent GNUNET_YES if the value is persistent, GNUNET_NO if not
1241  * @return GNUNET_OK to continue, GNUNET_SYSERR to abort iteration
1242  */
1243 static int
1244 process_stat_in (void *cls, const char *subsystem, const char *name,
1245                  uint64_t value, int is_persistent)
1246 {
1247   GNUNET_assert (stats_worked == GNUNET_NO);
1248   stats_worked = GNUNET_YES;
1249   payload += value;
1250   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1251               "Notification from statistics about existing payload (%llu), new payload is %llu\n",
1252               value, payload);
1253   return GNUNET_OK;
1254 }
1255
1256
1257 static void
1258 process_stat_done (void *cls, int success)
1259 {
1260   struct DatastorePlugin *plugin = cls;
1261
1262   stat_get = NULL;
1263   if (stats_worked == GNUNET_NO)
1264     payload = plugin->api->estimate_size (plugin->api->cls);
1265 }
1266
1267
1268 /**
1269  * Load the datastore plugin.
1270  */
1271 static struct DatastorePlugin *
1272 load_plugin ()
1273 {
1274   struct DatastorePlugin *ret;
1275   char *libname;
1276
1277   ret = GNUNET_malloc (sizeof (struct DatastorePlugin));
1278   ret->env.cfg = cfg;
1279   ret->env.duc = &disk_utilization_change_cb;
1280   ret->env.cls = NULL;
1281   GNUNET_log (GNUNET_ERROR_TYPE_INFO, _("Loading `%s' datastore plugin\n"),
1282               plugin_name);
1283   GNUNET_asprintf (&libname, "libgnunet_plugin_datastore_%s", plugin_name);
1284   ret->short_name = GNUNET_strdup (plugin_name);
1285   ret->lib_name = libname;
1286   ret->api = GNUNET_PLUGIN_load (libname, &ret->env);
1287   if (ret->api == NULL)
1288   {
1289     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1290                 _("Failed to load datastore plugin for `%s'\n"), plugin_name);
1291     GNUNET_free (ret->short_name);
1292     GNUNET_free (libname);
1293     GNUNET_free (ret);
1294     return NULL;
1295   }
1296   return ret;
1297 }
1298
1299
1300 /**
1301  * Function called when the service shuts
1302  * down.  Unloads our datastore plugin.
1303  *
1304  * @param plug plugin to unload
1305  */
1306 static void
1307 unload_plugin (struct DatastorePlugin *plug)
1308 {
1309   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1310               "Datastore service is unloading plugin...\n");
1311   GNUNET_break (NULL == GNUNET_PLUGIN_unload (plug->lib_name, plug->api));
1312   GNUNET_free (plug->lib_name);
1313   GNUNET_free (plug->short_name);
1314   GNUNET_free (plug);
1315   GNUNET_free (quota_stat_name);
1316   quota_stat_name = NULL;
1317 }
1318
1319
1320 /**
1321  * Final task run after shutdown.  Unloads plugins and disconnects us from
1322  * statistics.
1323  */
1324 static void
1325 unload_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1326 {
1327   if (lastSync > 0)
1328     sync_stats ();
1329   if (GNUNET_YES == do_drop)
1330     plugin->api->drop (plugin->api->cls);
1331   unload_plugin (plugin);
1332   plugin = NULL;
1333   if (filter != NULL)
1334   {
1335     GNUNET_CONTAINER_bloomfilter_free (filter);
1336     filter = NULL;
1337   }
1338   if (stat_get != NULL)
1339   {
1340     GNUNET_STATISTICS_get_cancel (stat_get);
1341     stat_get = NULL;
1342   }
1343   if (stats != NULL)
1344   {
1345     GNUNET_STATISTICS_destroy (stats, GNUNET_YES);
1346     stats = NULL;
1347   }
1348   GNUNET_free_non_null (plugin_name);
1349   plugin_name = NULL;
1350 }
1351
1352
1353 /**
1354  * Last task run during shutdown.  Disconnects us from
1355  * the transport and core.
1356  */
1357 static void
1358 cleaning_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1359 {
1360   struct TransmitCallbackContext *tcc;
1361
1362   cleaning_done = GNUNET_YES;
1363   while (NULL != (tcc = tcc_head))
1364   {
1365     GNUNET_CONTAINER_DLL_remove (tcc_head, tcc_tail, tcc);
1366     if (tcc->th != NULL)
1367     {
1368       GNUNET_SERVER_notify_transmit_ready_cancel (tcc->th);
1369       GNUNET_SERVER_client_drop (tcc->client);
1370     }
1371     GNUNET_free (tcc->msg);
1372     GNUNET_free (tcc);
1373   }
1374   if (expired_kill_task != GNUNET_SCHEDULER_NO_TASK)
1375   {
1376     GNUNET_SCHEDULER_cancel (expired_kill_task);
1377     expired_kill_task = GNUNET_SCHEDULER_NO_TASK;
1378   }
1379   GNUNET_SCHEDULER_add_continuation (&unload_task, NULL,
1380                                      GNUNET_SCHEDULER_REASON_PREREQ_DONE);
1381 }
1382
1383
1384 /**
1385  * Function that removes all active reservations made
1386  * by the given client and releases the space for other
1387  * requests.
1388  *
1389  * @param cls closure
1390  * @param client identification of the client
1391  */
1392 static void
1393 cleanup_reservations (void *cls, struct GNUNET_SERVER_Client *client)
1394 {
1395   struct ReservationList *pos;
1396   struct ReservationList *prev;
1397   struct ReservationList *next;
1398
1399   if (client == NULL)
1400     return;
1401   prev = NULL;
1402   pos = reservations;
1403   while (NULL != pos)
1404   {
1405     next = pos->next;
1406     if (pos->client == client)
1407     {
1408       if (prev == NULL)
1409         reservations = next;
1410       else
1411         prev->next = next;
1412       reserved -= pos->amount + pos->entries * GNUNET_DATASTORE_ENTRY_OVERHEAD;
1413       GNUNET_free (pos);
1414     }
1415     else
1416     {
1417       prev = pos;
1418     }
1419     pos = next;
1420   }
1421   GNUNET_STATISTICS_set (stats, gettext_noop ("# reserved"), reserved,
1422                          GNUNET_NO);
1423 }
1424
1425
1426 /**
1427  * Adds a given key to the bloomfilter 'count' times.
1428  *
1429  * @param cls the bloomfilter
1430  * @param key key to add
1431  * @param count number of times to add key
1432  */
1433 static void
1434 add_key_to_bloomfilter (void *cls,
1435                         const struct GNUNET_HashCode *key,
1436                         unsigned int count)
1437 {
1438   struct GNUNET_CONTAINER_BloomFilter *bf = cls;
1439   while (0 < count--)
1440     GNUNET_CONTAINER_bloomfilter_add (bf, key);
1441 }
1442
1443
1444 /**
1445  * Process datastore requests.
1446  *
1447  * @param cls closure
1448  * @param server the initialized server
1449  * @param c configuration to use
1450  */
1451 static void
1452 run (void *cls, struct GNUNET_SERVER_Handle *server,
1453      const struct GNUNET_CONFIGURATION_Handle *c)
1454 {
1455   static const struct GNUNET_SERVER_MessageHandler handlers[] = {
1456     {&handle_reserve, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_RESERVE,
1457      sizeof (struct ReserveMessage)},
1458     {&handle_release_reserve, NULL,
1459      GNUNET_MESSAGE_TYPE_DATASTORE_RELEASE_RESERVE,
1460      sizeof (struct ReleaseReserveMessage)},
1461     {&handle_put, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_PUT, 0},
1462     {&handle_update, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_UPDATE,
1463      sizeof (struct UpdateMessage)},
1464     {&handle_get, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_GET, 0},
1465     {&handle_get_replication, NULL,
1466      GNUNET_MESSAGE_TYPE_DATASTORE_GET_REPLICATION,
1467      sizeof (struct GNUNET_MessageHeader)},
1468     {&handle_get_zero_anonymity, NULL,
1469      GNUNET_MESSAGE_TYPE_DATASTORE_GET_ZERO_ANONYMITY,
1470      sizeof (struct GetZeroAnonymityMessage)},
1471     {&handle_remove, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE, 0},
1472     {&handle_drop, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_DROP,
1473      sizeof (struct GNUNET_MessageHeader)},
1474     {NULL, NULL, 0, 0}
1475   };
1476   char *fn;  
1477   char *pfn;
1478   unsigned int bf_size;
1479   int refresh_bf;
1480
1481   cfg = c;
1482   if (GNUNET_OK !=
1483       GNUNET_CONFIGURATION_get_value_string (cfg, "DATASTORE", "DATABASE",
1484                                              &plugin_name))
1485   {
1486     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1487                 _("No `%s' specified for `%s' in configuration!\n"), "DATABASE",
1488                 "DATASTORE");
1489     return;
1490   }
1491   GNUNET_asprintf (&quota_stat_name,
1492                    _("# bytes used in file-sharing datastore `%s'"),
1493                    plugin_name);
1494   if (GNUNET_OK !=
1495       GNUNET_CONFIGURATION_get_value_size (cfg, "DATASTORE", "QUOTA", &quota))
1496   {
1497     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1498                 _("No `%s' specified for `%s' in configuration!\n"), "QUOTA",
1499                 "DATASTORE");
1500     return;
1501   }
1502   stats = GNUNET_STATISTICS_create ("datastore", cfg);
1503   GNUNET_STATISTICS_set (stats, gettext_noop ("# quota"), quota, GNUNET_NO);
1504   cache_size = quota / 8;       /* Or should we make this an option? */
1505   GNUNET_STATISTICS_set (stats, gettext_noop ("# cache size"), cache_size,
1506                          GNUNET_NO);
1507   if (quota / (32 * 1024LL) > (1 << 31)) 
1508     bf_size = (1 << 31);          /* absolute limit: ~2 GB, beyond that BF just won't help anyway */
1509   else
1510     bf_size = quota / (32 * 1024LL);         /* 8 bit per entry, 1 bit per 32 kb in DB */
1511   fn = NULL;
1512   if ((GNUNET_OK !=
1513        GNUNET_CONFIGURATION_get_value_filename (cfg, "DATASTORE", "BLOOMFILTER",
1514                                                 &fn)) ||
1515       (GNUNET_OK != GNUNET_DISK_directory_create_for_file (fn)))
1516   {
1517     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1518                 _("Could not use specified filename `%s' for bloomfilter.\n"),
1519                 fn != NULL ? fn : "");
1520     GNUNET_free_non_null (fn);
1521     fn = NULL;
1522   }
1523   if (fn != NULL)
1524   {
1525     GNUNET_asprintf (&pfn, "%s.%s", fn, plugin_name);
1526     if (GNUNET_YES == GNUNET_DISK_file_test (pfn))
1527     {
1528       filter = GNUNET_CONTAINER_bloomfilter_load (pfn, bf_size, 5);        /* approx. 3% false positives at max use */
1529       if (NULL == filter)
1530       {
1531         /* file exists but not valid, remove and try again, but refresh */
1532         if (0 != UNLINK (pfn))
1533         {
1534           /* failed to remove, run without file */
1535           GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1536                       _("Failed to remove bogus bloomfilter file `%s'\n"),
1537                       pfn);
1538           GNUNET_free (pfn);
1539           pfn = NULL;
1540           filter = GNUNET_CONTAINER_bloomfilter_load (NULL, bf_size, 5);        /* approx. 3% false positives at max use */
1541           refresh_bf = GNUNET_YES;
1542         }
1543         else
1544         {
1545           /* try again after remove */
1546           filter = GNUNET_CONTAINER_bloomfilter_load (pfn, bf_size, 5);        /* approx. 3% false positives at max use */
1547           refresh_bf = GNUNET_YES;
1548           if (NULL == filter)
1549           {
1550             /* failed yet again, give up on using file */
1551             GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1552                         _("Failed to remove bogus bloomfilter file `%s'\n"),
1553                         pfn);
1554             GNUNET_free (pfn);
1555             pfn = NULL;
1556             filter = GNUNET_CONTAINER_bloomfilter_init (NULL, bf_size, 5);        /* approx. 3% false positives at max use */
1557           }
1558         }
1559       }
1560       else
1561       {
1562         /* normal case: have an existing valid bf file, no need to refresh */
1563         refresh_bf = GNUNET_NO;
1564       }
1565     }
1566     else
1567     {
1568       filter = GNUNET_CONTAINER_bloomfilter_load (pfn, bf_size, 5);        /* approx. 3% false positives at max use */
1569       refresh_bf = GNUNET_YES;
1570     }
1571     GNUNET_free (pfn);
1572   }
1573   else
1574   {
1575     filter = GNUNET_CONTAINER_bloomfilter_init (NULL, bf_size, 5);      /* approx. 3% false positives at max use */
1576     refresh_bf = GNUNET_YES;
1577   }
1578   GNUNET_free_non_null (fn);
1579   if (filter == NULL)
1580   {
1581     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1582                 _("Failed to initialize bloomfilter.\n"));
1583     if (stats != NULL)
1584     {
1585       GNUNET_STATISTICS_destroy (stats, GNUNET_YES);
1586       stats = NULL;
1587     }
1588     return;
1589   }
1590   plugin = load_plugin ();
1591   if (NULL == plugin)
1592   {
1593     GNUNET_CONTAINER_bloomfilter_free (filter);
1594     filter = NULL;
1595     if (stats != NULL)
1596     {
1597       GNUNET_STATISTICS_destroy (stats, GNUNET_YES);
1598       stats = NULL;
1599     }
1600     return;
1601   }
1602   stat_get =
1603       GNUNET_STATISTICS_get (stats, "datastore", quota_stat_name,
1604                              GNUNET_TIME_UNIT_SECONDS, &process_stat_done,
1605                              &process_stat_in, plugin);
1606   GNUNET_SERVER_disconnect_notify (server, &cleanup_reservations, NULL);
1607   GNUNET_SERVER_add_handlers (server, handlers);
1608   if (GNUNET_YES == refresh_bf)
1609   {
1610     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1611                 _("Rebuilding bloomfilter.  Please be patient.\n"));
1612     if (NULL != plugin->api->get_keys)
1613       plugin->api->get_keys (plugin->api->cls, &add_key_to_bloomfilter, filter);  
1614     else
1615       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1616                   _("Plugin does not support get_keys function. Please fix!\n"));
1617
1618     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1619                 _("Bloomfilter construction complete.\n"));
1620   }
1621   expired_kill_task =
1622       GNUNET_SCHEDULER_add_with_priority (GNUNET_SCHEDULER_PRIORITY_IDLE,
1623                                           &delete_expired, NULL);
1624   GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL, &cleaning_task,
1625                                 NULL);
1626 }
1627
1628
1629 /**
1630  * The main function for the datastore service.
1631  *
1632  * @param argc number of arguments from the command line
1633  * @param argv command line arguments
1634  * @return 0 ok, 1 on error
1635  */
1636 int
1637 main (int argc, char *const *argv)
1638 {
1639   int ret;
1640
1641   ret =
1642       (GNUNET_OK ==
1643        GNUNET_SERVICE_run (argc, argv, "datastore", GNUNET_SERVICE_OPTION_NONE,
1644                            &run, NULL)) ? 0 : 1;
1645   return ret;
1646 }
1647
1648
1649 /* end of gnunet-service-datastore.c */