include plugin in gnunet-transport output
[oweals/gnunet.git] / src / datastore / gnunet-service-datastore.c
1 /*
2      This file is part of GNUnet
3      (C) 2004, 2005, 2006, 2007, 2009 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 2, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file datastore/gnunet-service-datastore.c
23  * @brief Management for the datastore for files stored on a GNUnet node
24  * @author Christian Grothoff
25  */
26
27 #include "platform.h"
28 #include "gnunet_util_lib.h"
29 #include "gnunet_protocols.h"
30 #include "gnunet_statistics_service.h"
31 #include "gnunet_datastore_plugin.h"
32 #include "datastore.h"
33
34 /**
35  * How many messages do we queue at most per client?
36  */
37 #define MAX_PENDING 1024
38
39 /**
40  * How long are we at most keeping "expired" content
41  * past the expiration date in the database?
42  */
43 #define MAX_EXPIRE_DELAY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 15)
44
45 /**
46  * How fast are we allowed to query the database for deleting
47  * expired content? (1 item per second).
48  */
49 #define MIN_EXPIRE_DELAY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 1)
50
51 /**
52  * Name under which we store current space consumption.
53  */
54 static char *quota_stat_name;
55
56 /**
57  * After how many payload-changing operations
58  * do we sync our statistics?
59  */
60 #define MAX_STAT_SYNC_LAG 50
61
62
63 /**
64  * Our datastore plugin.
65  */
66 struct DatastorePlugin
67 {
68
69   /**
70    * API of the transport as returned by the plugin's
71    * initialization function.
72    */
73   struct GNUNET_DATASTORE_PluginFunctions *api;
74
75   /**
76    * Short name for the plugin (i.e. "sqlite").
77    */
78   char *short_name;
79
80   /**
81    * Name of the library (i.e. "gnunet_plugin_datastore_sqlite").
82    */
83   char *lib_name;
84
85   /**
86    * Environment this transport service is using
87    * for this plugin.
88    */
89   struct GNUNET_DATASTORE_PluginEnvironment env;
90
91 };
92
93
94 /**
95  * Linked list of active reservations.
96  */
97 struct ReservationList
98 {
99
100   /**
101    * This is a linked list.
102    */
103   struct ReservationList *next;
104
105   /**
106    * Client that made the reservation.
107    */
108   struct GNUNET_SERVER_Client *client;
109
110   /**
111    * Number of bytes (still) reserved.
112    */
113   uint64_t amount;
114
115   /**
116    * Number of items (still) reserved.
117    */
118   uint64_t entries;
119
120   /**
121    * Reservation identifier.
122    */
123   int32_t rid;
124
125 };
126
127
128
129 /**
130  * Our datastore plugin (NULL if not available).
131  */
132 static struct DatastorePlugin *plugin;
133
134 /**
135  * Linked list of space reservations made by clients.
136  */
137 static struct ReservationList *reservations;
138
139 /**
140  * Bloomfilter to quickly tell if we don't have the content.
141  */
142 static struct GNUNET_CONTAINER_BloomFilter *filter;
143
144 /**
145  * How much space are we allowed to use?
146  */
147 static unsigned long long quota;
148
149 /**
150  * Should the database be dropped on exit?
151  */
152 static int do_drop;
153
154 /**
155  * Name of our plugin.
156  */
157 static char *plugin_name;
158
159 /**
160  * How much space are we using for the cache?  (space available for
161  * insertions that will be instantly reclaimed by discarding less
162  * important content --- or possibly whatever we just inserted into
163  * the "cache").
164  */
165 static unsigned long long cache_size;
166
167 /**
168  * How much space have we currently reserved?
169  */
170 static unsigned long long reserved;
171
172 /**
173  * How much data are we currently storing
174  * in the database?
175  */
176 static unsigned long long payload;
177
178 /**
179  * Number of updates that were made to the
180  * payload value since we last synchronized
181  * it with the statistics service.
182  */
183 static unsigned int lastSync;
184
185 /**
186  * Did we get an answer from statistics?
187  */
188 static int stats_worked;
189
190 /**
191  * Identity of the task that is used to delete
192  * expired content.
193  */
194 static GNUNET_SCHEDULER_TaskIdentifier expired_kill_task;
195
196 /**
197  * Our configuration.
198  */
199 const struct GNUNET_CONFIGURATION_Handle *cfg;
200
201 /**
202  * Minimum time that content should have to not be discarded instantly
203  * (time stamp of any content that we've been discarding recently to
204  * stay below the quota).  FOREVER if we had to expire content with
205  * non-zero priority.
206  */
207 static struct GNUNET_TIME_Absolute min_expiration;
208
209 /**
210  * Handle for reporting statistics.
211  */
212 static struct GNUNET_STATISTICS_Handle *stats;
213
214
215 /**
216  * Synchronize our utilization statistics with the
217  * statistics service.
218  */
219 static void
220 sync_stats ()
221 {
222   GNUNET_STATISTICS_set (stats, quota_stat_name, payload, GNUNET_YES);
223   GNUNET_STATISTICS_set (stats, "# utilization by current datastore", payload, GNUNET_NO);
224   lastSync = 0;
225 }
226
227
228
229 /**
230  * Context for transmitting replies to clients.
231  */
232 struct TransmitCallbackContext
233 {
234
235   /**
236    * We keep these in a doubly-linked list (for cleanup).
237    */
238   struct TransmitCallbackContext *next;
239
240   /**
241    * We keep these in a doubly-linked list (for cleanup).
242    */
243   struct TransmitCallbackContext *prev;
244
245   /**
246    * The message that we're asked to transmit.
247    */
248   struct GNUNET_MessageHeader *msg;
249
250   /**
251    * Handle for the transmission request.
252    */
253   struct GNUNET_CONNECTION_TransmitHandle *th;
254
255   /**
256    * Client that we are transmitting to.
257    */
258   struct GNUNET_SERVER_Client *client;
259
260 };
261
262
263 /**
264  * Head of the doubly-linked list (for cleanup).
265  */
266 static struct TransmitCallbackContext *tcc_head;
267
268 /**
269  * Tail of the doubly-linked list (for cleanup).
270  */
271 static struct TransmitCallbackContext *tcc_tail;
272
273 /**
274  * Have we already cleaned up the TCCs and are hence no longer
275  * willing (or able) to transmit anything to anyone?
276  */
277 static int cleaning_done;
278
279 /**
280  * Handle for pending get request.
281  */
282 static struct GNUNET_STATISTICS_GetHandle *stat_get;
283
284
285 /**
286  * Task that is used to remove expired entries from
287  * the datastore.  This task will schedule itself
288  * again automatically to always delete all expired
289  * content quickly.
290  *
291  * @param cls not used
292  * @param tc task context
293  */
294 static void
295 delete_expired (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
296
297
298 /**
299  * Iterate over the expired items stored in the datastore.
300  * Delete all expired items; once we have processed all
301  * expired items, re-schedule the "delete_expired" task.
302  *
303  * @param cls not used
304  * @param key key for the content
305  * @param size number of bytes in data
306  * @param data content stored
307  * @param type type of the content
308  * @param priority priority of the content
309  * @param anonymity anonymity-level for the content
310  * @param expiration expiration time for the content
311  * @param uid unique identifier for the datum;
312  *        maybe 0 if no unique identifier is available
313  *
314  * @return GNUNET_SYSERR to abort the iteration, GNUNET_OK to continue
315  *         (continue on call to "next", of course),
316  *         GNUNET_NO to delete the item and continue (if supported)
317  */
318 static int
319 expired_processor (void *cls, const GNUNET_HashCode * key, uint32_t size,
320                    const void *data, enum GNUNET_BLOCK_Type type,
321                    uint32_t priority, uint32_t anonymity,
322                    struct GNUNET_TIME_Absolute expiration, uint64_t uid)
323 {
324   struct GNUNET_TIME_Absolute now;
325
326   if (key == NULL)
327   {
328     expired_kill_task =
329         GNUNET_SCHEDULER_add_delayed (MAX_EXPIRE_DELAY, &delete_expired, NULL);
330     return GNUNET_SYSERR;
331   }
332   now = GNUNET_TIME_absolute_get ();
333   if (expiration.abs_value > now.abs_value)
334   {
335     /* finished processing */
336     expired_kill_task =
337         GNUNET_SCHEDULER_add_delayed (MAX_EXPIRE_DELAY, &delete_expired, NULL);
338     return GNUNET_SYSERR;
339   }
340 #if DEBUG_DATASTORE
341   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
342               "Deleting content `%s' of type %u that expired %llu ms ago\n",
343               GNUNET_h2s (key), type,
344               (unsigned long long) (now.abs_value - expiration.abs_value));
345 #endif
346   min_expiration = now;
347   GNUNET_STATISTICS_update (stats, gettext_noop ("# bytes expired"), size,
348                             GNUNET_YES);
349   GNUNET_CONTAINER_bloomfilter_remove (filter, key);
350   expired_kill_task =
351       GNUNET_SCHEDULER_add_delayed (MIN_EXPIRE_DELAY, &delete_expired, NULL);
352   return GNUNET_NO;
353 }
354
355
356 /**
357  * Task that is used to remove expired entries from
358  * the datastore.  This task will schedule itself
359  * again automatically to always delete all expired
360  * content quickly.
361  *
362  * @param cls not used
363  * @param tc task context
364  */
365 static void
366 delete_expired (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
367 {
368   expired_kill_task = GNUNET_SCHEDULER_NO_TASK;
369   plugin->api->get_expiration (plugin->api->cls, &expired_processor, NULL);
370 }
371
372
373 /**
374  * An iterator over a set of items stored in the datastore
375  * that deletes until we're happy with respect to our quota.
376  *
377  * @param cls closure
378  * @param key key for the content
379  * @param size number of bytes in data
380  * @param data content stored
381  * @param type type of the content
382  * @param priority priority of the content
383  * @param anonymity anonymity-level for the content
384  * @param expiration expiration time for the content
385  * @param uid unique identifier for the datum;
386  *        maybe 0 if no unique identifier is available
387  *
388  * @return GNUNET_SYSERR to abort the iteration, GNUNET_OK to continue
389  *         (continue on call to "next", of course),
390  *         GNUNET_NO to delete the item and continue (if supported)
391  */
392 static int
393 quota_processor (void *cls, const GNUNET_HashCode * key, uint32_t size,
394                  const void *data, enum GNUNET_BLOCK_Type type,
395                  uint32_t priority, uint32_t anonymity,
396                  struct GNUNET_TIME_Absolute expiration, uint64_t uid)
397 {
398   unsigned long long *need = cls;
399
400   if (NULL == key)
401     return GNUNET_SYSERR;
402 #if DEBUG_DATASTORE
403   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
404               "Deleting %llu bytes of low-priority content `%s' of type %u (still trying to free another %llu bytes)\n",
405               (unsigned long long) (size + GNUNET_DATASTORE_ENTRY_OVERHEAD),
406               GNUNET_h2s (key), type, *need);
407 #endif
408   if (size + GNUNET_DATASTORE_ENTRY_OVERHEAD > *need)
409     *need = 0;
410   else
411     *need -= size + GNUNET_DATASTORE_ENTRY_OVERHEAD;
412   if (priority > 0)
413     min_expiration = GNUNET_TIME_UNIT_FOREVER_ABS;
414   else
415     min_expiration = expiration;
416   GNUNET_STATISTICS_update (stats,
417                             gettext_noop ("# bytes purged (low-priority)"),
418                             size, GNUNET_YES);
419   GNUNET_CONTAINER_bloomfilter_remove (filter, key);
420   return GNUNET_NO;
421 }
422
423
424 /**
425  * Manage available disk space by running tasks
426  * that will discard content if necessary.  This
427  * function will be run whenever a request for
428  * "need" bytes of storage could only be satisfied
429  * by eating into the "cache" (and we want our cache
430  * space back).
431  *
432  * @param need number of bytes of content that were
433  *        placed into the "cache" (and hence the
434  *        number of bytes that should be removed).
435  */
436 static void
437 manage_space (unsigned long long need)
438 {
439   unsigned long long last;
440
441 #if DEBUG_DATASTORE
442   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
443               "Asked to free up %llu bytes of cache space\n", need);
444 #endif
445   last = 0;
446   while ((need > 0) && (last != need))
447   {
448     last = need;
449     plugin->api->get_expiration (plugin->api->cls, &quota_processor, &need);
450   }
451 }
452
453
454 /**
455  * Function called to notify a client about the socket
456  * begin ready to queue more data.  "buf" will be
457  * NULL and "size" zero if the socket was closed for
458  * writing in the meantime.
459  *
460  * @param cls closure
461  * @param size number of bytes available in buf
462  * @param buf where the callee should write the message
463  * @return number of bytes written to buf
464  */
465 static size_t
466 transmit_callback (void *cls, size_t size, void *buf)
467 {
468   struct TransmitCallbackContext *tcc = cls;
469   size_t msize;
470
471   tcc->th = NULL;
472   GNUNET_CONTAINER_DLL_remove (tcc_head, tcc_tail, tcc);
473   msize = ntohs (tcc->msg->size);
474   if (size == 0)
475   {
476     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
477                 _("Transmission to client failed!\n"));
478     GNUNET_SERVER_receive_done (tcc->client, GNUNET_SYSERR);
479     GNUNET_SERVER_client_drop (tcc->client);
480     GNUNET_free (tcc->msg);
481     GNUNET_free (tcc);
482     return 0;
483   }
484   GNUNET_assert (size >= msize);
485   memcpy (buf, tcc->msg, msize);
486   GNUNET_SERVER_receive_done (tcc->client, GNUNET_OK);
487   GNUNET_SERVER_client_drop (tcc->client);
488   GNUNET_free (tcc->msg);
489   GNUNET_free (tcc);
490   return msize;
491 }
492
493
494 /**
495  * Transmit the given message to the client.
496  *
497  * @param client target of the message
498  * @param msg message to transmit, will be freed!
499  */
500 static void
501 transmit (struct GNUNET_SERVER_Client *client, struct GNUNET_MessageHeader *msg)
502 {
503   struct TransmitCallbackContext *tcc;
504
505   if (GNUNET_YES == cleaning_done)
506   {
507 #if DEBUG_DATASTORE
508     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
509                 "Shutdown in progress, aborting transmission.\n");
510 #endif
511     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
512     GNUNET_free (msg);
513     return;
514   }
515   tcc = GNUNET_malloc (sizeof (struct TransmitCallbackContext));
516   tcc->msg = msg;
517   tcc->client = client;
518   if (NULL ==
519       (tcc->th =
520        GNUNET_SERVER_notify_transmit_ready (client, ntohs (msg->size),
521                                             GNUNET_TIME_UNIT_FOREVER_REL,
522                                             &transmit_callback, tcc)))
523   {
524     GNUNET_break (0);
525     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
526     GNUNET_free (msg);
527     GNUNET_free (tcc);
528     return;
529   }
530   GNUNET_SERVER_client_keep (client);
531   GNUNET_CONTAINER_DLL_insert (tcc_head, tcc_tail, tcc);
532 }
533
534
535 /**
536  * Transmit a status code to the client.
537  *
538  * @param client receiver of the response
539  * @param code status code
540  * @param msg optional error message (can be NULL)
541  */
542 static void
543 transmit_status (struct GNUNET_SERVER_Client *client, int code, const char *msg)
544 {
545   struct StatusMessage *sm;
546   size_t slen;
547
548 #if DEBUG_DATASTORE
549   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
550               "Transmitting `%s' message with value %d and message `%s'\n",
551               "STATUS", code, msg != NULL ? msg : "(none)");
552 #endif
553   slen = (msg == NULL) ? 0 : strlen (msg) + 1;
554   sm = GNUNET_malloc (sizeof (struct StatusMessage) + slen);
555   sm->header.size = htons (sizeof (struct StatusMessage) + slen);
556   sm->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_STATUS);
557   sm->status = htonl (code);
558   sm->min_expiration = GNUNET_TIME_absolute_hton (min_expiration);
559   if (slen > 0)
560     memcpy (&sm[1], msg, slen);
561   transmit (client, &sm->header);
562 }
563
564
565
566 /**
567  * Function that will transmit the given datastore entry
568  * to the client.
569  *
570  * @param cls closure, pointer to the client (of type GNUNET_SERVER_Client).
571  * @param key key for the content
572  * @param size number of bytes in data
573  * @param data content stored
574  * @param type type of the content
575  * @param priority priority of the content
576  * @param anonymity anonymity-level for the content
577  * @param expiration expiration time for the content
578  * @param uid unique identifier for the datum;
579  *        maybe 0 if no unique identifier is available
580  *
581  * @return GNUNET_SYSERR to abort the iteration, GNUNET_OK to continue,
582  *         GNUNET_NO to delete the item and continue (if supported)
583  */
584 static int
585 transmit_item (void *cls, const GNUNET_HashCode * key, uint32_t size,
586                const void *data, enum GNUNET_BLOCK_Type type, uint32_t priority,
587                uint32_t anonymity, struct GNUNET_TIME_Absolute expiration,
588                uint64_t uid)
589 {
590   struct GNUNET_SERVER_Client *client = cls;
591   struct GNUNET_MessageHeader *end;
592   struct DataMessage *dm;
593
594   if (key == NULL)
595   {
596     /* transmit 'DATA_END' */
597 #if DEBUG_DATASTORE
598     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Transmitting `%s' message\n",
599                 "DATA_END");
600 #endif
601     end = GNUNET_malloc (sizeof (struct GNUNET_MessageHeader));
602     end->size = htons (sizeof (struct GNUNET_MessageHeader));
603     end->type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END);
604     transmit (client, end);
605     GNUNET_SERVER_client_drop (client);
606     return GNUNET_OK;
607   }
608   GNUNET_assert (sizeof (struct DataMessage) + size <
609                  GNUNET_SERVER_MAX_MESSAGE_SIZE);
610   dm = GNUNET_malloc (sizeof (struct DataMessage) + size);
611   dm->header.size = htons (sizeof (struct DataMessage) + size);
612   dm->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_DATA);
613   dm->rid = htonl (0);
614   dm->size = htonl (size);
615   dm->type = htonl (type);
616   dm->priority = htonl (priority);
617   dm->anonymity = htonl (anonymity);
618   dm->replication = htonl (0);
619   dm->reserved = htonl (0);
620   dm->expiration = GNUNET_TIME_absolute_hton (expiration);
621   dm->uid = GNUNET_htonll (uid);
622   dm->key = *key;
623   memcpy (&dm[1], data, size);
624 #if DEBUG_DATASTORE
625   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
626               "Transmitting `%s' message for `%s' of type %u with expiration %llu (now: %llu)\n",
627               "DATA", GNUNET_h2s (key), type,
628               (unsigned long long) expiration.abs_value,
629               (unsigned long long) GNUNET_TIME_absolute_get ().abs_value);
630 #endif
631   GNUNET_STATISTICS_update (stats, gettext_noop ("# results found"), 1,
632                             GNUNET_NO);
633   transmit (client, &dm->header);
634   GNUNET_SERVER_client_drop (client);
635   return GNUNET_OK;
636 }
637
638
639 /**
640  * Handle RESERVE-message.
641  *
642  * @param cls closure
643  * @param client identification of the client
644  * @param message the actual message
645  */
646 static void
647 handle_reserve (void *cls, struct GNUNET_SERVER_Client *client,
648                 const struct GNUNET_MessageHeader *message)
649 {
650   /**
651    * Static counter to produce reservation identifiers.
652    */
653   static int reservation_gen;
654
655   const struct ReserveMessage *msg = (const struct ReserveMessage *) message;
656   struct ReservationList *e;
657   unsigned long long used;
658   unsigned long long req;
659   uint64_t amount;
660   uint32_t entries;
661
662 #if DEBUG_DATASTORE
663   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Processing `%s' request\n", "RESERVE");
664 #endif
665   amount = GNUNET_ntohll (msg->amount);
666   entries = ntohl (msg->entries);
667   used = payload + reserved;
668   req =
669       amount + ((unsigned long long) GNUNET_DATASTORE_ENTRY_OVERHEAD) * entries;
670   if (used + req > quota)
671   {
672     if (quota < used)
673       used = quota;             /* cheat a bit for error message (to avoid negative numbers) */
674     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
675                 _
676                 ("Insufficient space (%llu bytes are available) to satisfy `%s' request for %llu bytes\n"),
677                 quota - used, "RESERVE", req);
678     if (cache_size < req)
679     {
680       /* TODO: document this in the FAQ; essentially, if this
681        * message happens, the insertion request could be blocked
682        * by less-important content from migration because it is
683        * larger than 1/8th of the overall available space, and
684        * we only reserve 1/8th for "fresh" insertions */
685       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
686                   _
687                   ("The requested amount (%llu bytes) is larger than the cache size (%llu bytes)\n"),
688                   req, cache_size);
689       transmit_status (client, 0,
690                        gettext_noop
691                        ("Insufficient space to satisfy request and "
692                         "requested amount is larger than cache size"));
693     }
694     else
695     {
696       transmit_status (client, 0,
697                        gettext_noop ("Insufficient space to satisfy request"));
698     }
699     return;
700   }
701   reserved += req;
702   GNUNET_STATISTICS_set (stats, gettext_noop ("# reserved"), reserved,
703                          GNUNET_NO);
704   e = GNUNET_malloc (sizeof (struct ReservationList));
705   e->next = reservations;
706   reservations = e;
707   e->client = client;
708   e->amount = amount;
709   e->entries = entries;
710   e->rid = ++reservation_gen;
711   if (reservation_gen < 0)
712     reservation_gen = 0;        /* wrap around */
713   transmit_status (client, e->rid, NULL);
714 }
715
716
717 /**
718  * Handle RELEASE_RESERVE-message.
719  *
720  * @param cls closure
721  * @param client identification of the client
722  * @param message the actual message
723  */
724 static void
725 handle_release_reserve (void *cls, struct GNUNET_SERVER_Client *client,
726                         const struct GNUNET_MessageHeader *message)
727 {
728   const struct ReleaseReserveMessage *msg =
729       (const struct ReleaseReserveMessage *) message;
730   struct ReservationList *pos;
731   struct ReservationList *prev;
732   struct ReservationList *next;
733   int rid = ntohl (msg->rid);
734   unsigned long long rem;
735
736 #if DEBUG_DATASTORE
737   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Processing `%s' request\n",
738               "RELEASE_RESERVE");
739 #endif
740   next = reservations;
741   prev = NULL;
742   while (NULL != (pos = next))
743   {
744     next = pos->next;
745     if (rid == pos->rid)
746     {
747       if (prev == NULL)
748         reservations = next;
749       else
750         prev->next = next;
751       rem =
752           pos->amount +
753           ((unsigned long long) GNUNET_DATASTORE_ENTRY_OVERHEAD) * pos->entries;
754       GNUNET_assert (reserved >= rem);
755       reserved -= rem;
756       GNUNET_STATISTICS_set (stats, gettext_noop ("# reserved"), reserved,
757                              GNUNET_NO);
758 #if DEBUG_DATASTORE
759       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
760                   "Returning %llu remaining reserved bytes to storage pool\n",
761                   rem);
762 #endif
763       GNUNET_free (pos);
764       transmit_status (client, GNUNET_OK, NULL);
765       return;
766     }
767     prev = pos;
768   }
769   GNUNET_break (0);
770   transmit_status (client, GNUNET_SYSERR,
771                    gettext_noop ("Could not find matching reservation"));
772 }
773
774
775 /**
776  * Check that the given message is a valid data message.
777  *
778  * @return NULL if the message is not well-formed, otherwise the message
779  */
780 static const struct DataMessage *
781 check_data (const struct GNUNET_MessageHeader *message)
782 {
783   uint16_t size;
784   uint32_t dsize;
785   const struct DataMessage *dm;
786
787   size = ntohs (message->size);
788   if (size < sizeof (struct DataMessage))
789   {
790     GNUNET_break (0);
791     return NULL;
792   }
793   dm = (const struct DataMessage *) message;
794   dsize = ntohl (dm->size);
795   if (size != dsize + sizeof (struct DataMessage))
796   {
797     GNUNET_break (0);
798     return NULL;
799   }
800   return dm;
801 }
802
803
804 /**
805  * Context for a PUT request used to see if the content is
806  * already present.
807  */
808 struct PutContext
809 {
810   /**
811    * Client to notify on completion.
812    */
813   struct GNUNET_SERVER_Client *client;
814
815 #if ! HAVE_UNALIGNED_64_ACCESS
816   void *reserved;
817 #endif
818
819   /* followed by the 'struct DataMessage' */
820 };
821
822
823 /**
824  * Actually put the data message.
825  *
826  * @param client sender of the message
827  * @param dm message with the data to store
828  */
829 static void
830 execute_put (struct GNUNET_SERVER_Client *client, const struct DataMessage *dm)
831 {
832   uint32_t size;
833   char *msg;
834   int ret;
835
836   size = ntohl (dm->size);
837   msg = NULL;
838   ret =
839       plugin->api->put (plugin->api->cls, &dm->key, size, &dm[1],
840                         ntohl (dm->type), ntohl (dm->priority),
841                         ntohl (dm->anonymity), ntohl (dm->replication),
842                         GNUNET_TIME_absolute_ntoh (dm->expiration), &msg);
843   if (GNUNET_OK == ret)
844   {
845     GNUNET_STATISTICS_update (stats, gettext_noop ("# bytes stored"), size,
846                               GNUNET_YES);
847     GNUNET_CONTAINER_bloomfilter_add (filter, &dm->key);
848 #if DEBUG_DATASTORE
849     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
850                 "Successfully stored %u bytes of type %u under key `%s'\n",
851                 size, ntohl (dm->type), GNUNET_h2s (&dm->key));
852 #endif
853   }
854   transmit_status (client, ret, msg);
855   GNUNET_free_non_null (msg);
856   if (quota - reserved - cache_size < payload)
857   {
858     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
859                 _("Need %llu bytes more space (%llu allowed, using %llu)\n"),
860                 (unsigned long long) size + GNUNET_DATASTORE_ENTRY_OVERHEAD,
861                 (unsigned long long) (quota - reserved - cache_size),
862                 (unsigned long long) payload);
863     manage_space (size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
864   }
865 }
866
867
868 /**
869  * Function that will check if the given datastore entry
870  * matches the put and if none match executes the put.
871  *
872  * @param cls closure, pointer to the client (of type 'struct PutContext').
873  * @param key key for the content
874  * @param size number of bytes in data
875  * @param data content stored
876  * @param type type of the content
877  * @param priority priority of the content
878  * @param anonymity anonymity-level for the content
879  * @param expiration expiration time for the content
880  * @param uid unique identifier for the datum;
881  *        maybe 0 if no unique identifier is available
882  *
883  * @return GNUNET_OK usually
884  *         GNUNET_NO to delete the item
885  */
886 static int
887 check_present (void *cls, const GNUNET_HashCode * key, uint32_t size,
888                const void *data, enum GNUNET_BLOCK_Type type, uint32_t priority,
889                uint32_t anonymity, struct GNUNET_TIME_Absolute expiration,
890                uint64_t uid)
891 {
892   struct PutContext *pc = cls;
893   const struct DataMessage *dm;
894
895   dm = (const struct DataMessage *) &pc[1];
896   if (key == NULL)
897   {
898     execute_put (pc->client, dm);
899     GNUNET_SERVER_client_drop (pc->client);
900     GNUNET_free (pc);
901     return GNUNET_OK;
902   }
903   if ((GNUNET_BLOCK_TYPE_FS_DBLOCK == type) ||
904       (GNUNET_BLOCK_TYPE_FS_IBLOCK == type) || ((size == ntohl (dm->size)) &&
905                                                 (0 ==
906                                                  memcmp (&dm[1], data, size))))
907   {
908 #if DEBUG_MYSQL
909     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
910                 "Result already present in datastore\n");
911 #endif
912     /* FIXME: change API to allow increasing 'replication' counter */
913     if ((ntohl (dm->priority) > 0) ||
914         (GNUNET_TIME_absolute_ntoh (dm->expiration).abs_value >
915          expiration.abs_value))
916       plugin->api->update (plugin->api->cls, uid,
917                            (int32_t) ntohl (dm->priority),
918                            GNUNET_TIME_absolute_ntoh (dm->expiration), NULL);
919     transmit_status (pc->client, GNUNET_NO, NULL);
920     GNUNET_SERVER_client_drop (pc->client);
921     GNUNET_free (pc);
922   }
923   else
924   {
925     execute_put (pc->client, dm);
926     GNUNET_SERVER_client_drop (pc->client);
927     GNUNET_free (pc);
928   }
929   return GNUNET_OK;
930 }
931
932
933 /**
934  * Handle PUT-message.
935  *
936  * @param cls closure
937  * @param client identification of the client
938  * @param message the actual message
939  */
940 static void
941 handle_put (void *cls, struct GNUNET_SERVER_Client *client,
942             const struct GNUNET_MessageHeader *message)
943 {
944   const struct DataMessage *dm = check_data (message);
945   int rid;
946   struct ReservationList *pos;
947   struct PutContext *pc;
948   GNUNET_HashCode vhash;
949   uint32_t size;
950
951   if ((dm == NULL) || (ntohl (dm->type) == 0))
952   {
953     GNUNET_break (0);
954     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
955     return;
956   }
957 #if DEBUG_DATASTORE
958   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
959               "Processing `%s' request for `%s' of type %u\n", "PUT",
960               GNUNET_h2s (&dm->key), ntohl (dm->type));
961 #endif
962   rid = ntohl (dm->rid);
963   size = ntohl (dm->size);
964   if (rid > 0)
965   {
966     pos = reservations;
967     while ((NULL != pos) && (rid != pos->rid))
968       pos = pos->next;
969     GNUNET_break (pos != NULL);
970     if (NULL != pos)
971     {
972       GNUNET_break (pos->entries > 0);
973       GNUNET_break (pos->amount >= size);
974       pos->entries--;
975       pos->amount -= size;
976       reserved -= (size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
977       GNUNET_STATISTICS_set (stats, gettext_noop ("# reserved"), reserved,
978                              GNUNET_NO);
979     }
980   }
981   if (GNUNET_YES == GNUNET_CONTAINER_bloomfilter_test (filter, &dm->key))
982   {
983     GNUNET_CRYPTO_hash (&dm[1], size, &vhash);
984     pc = GNUNET_malloc (sizeof (struct PutContext) + size +
985                         sizeof (struct DataMessage));
986     pc->client = client;
987     GNUNET_SERVER_client_keep (client);
988     memcpy (&pc[1], dm, size + sizeof (struct DataMessage));
989     plugin->api->get_key (plugin->api->cls, 0, &dm->key, &vhash,
990                           ntohl (dm->type), &check_present, pc);
991     return;
992   }
993   execute_put (client, dm);
994 }
995
996
997 /**
998  * Handle GET-message.
999  *
1000  * @param cls closure
1001  * @param client identification of the client
1002  * @param message the actual message
1003  */
1004 static void
1005 handle_get (void *cls, struct GNUNET_SERVER_Client *client,
1006             const struct GNUNET_MessageHeader *message)
1007 {
1008   const struct GetMessage *msg;
1009   uint16_t size;
1010
1011   size = ntohs (message->size);
1012   if ((size != sizeof (struct GetMessage)) &&
1013       (size != sizeof (struct GetMessage) - sizeof (GNUNET_HashCode)))
1014   {
1015     GNUNET_break (0);
1016     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1017     return;
1018   }
1019   msg = (const struct GetMessage *) message;
1020 #if DEBUG_DATASTORE
1021   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1022               "Processing `%s' request for `%s' of type %u\n", "GET",
1023               GNUNET_h2s (&msg->key), ntohl (msg->type));
1024 #endif
1025   GNUNET_STATISTICS_update (stats, gettext_noop ("# GET requests received"), 1,
1026                             GNUNET_NO);
1027   GNUNET_SERVER_client_keep (client);
1028   if ((size == sizeof (struct GetMessage)) &&
1029       (GNUNET_YES != GNUNET_CONTAINER_bloomfilter_test (filter, &msg->key)))
1030   {
1031     /* don't bother database... */
1032 #if DEBUG_DATASTORE
1033     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1034                 "Empty result set for `%s' request for `%s' (bloomfilter).\n",
1035                 "GET", GNUNET_h2s (&msg->key));
1036 #endif
1037     GNUNET_STATISTICS_update (stats,
1038                               gettext_noop
1039                               ("# requests filtered by bloomfilter"), 1,
1040                               GNUNET_NO);
1041     transmit_item (client, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS,
1042                    0);
1043     return;
1044   }
1045   plugin->api->get_key (plugin->api->cls, GNUNET_ntohll (msg->offset),
1046                         ((size ==
1047                           sizeof (struct GetMessage)) ? &msg->key : NULL), NULL,
1048                         ntohl (msg->type), &transmit_item, client);
1049 }
1050
1051
1052 /**
1053  * Handle UPDATE-message.
1054  *
1055  * @param cls closure
1056  * @param client identification of the client
1057  * @param message the actual message
1058  */
1059 static void
1060 handle_update (void *cls, struct GNUNET_SERVER_Client *client,
1061                const struct GNUNET_MessageHeader *message)
1062 {
1063   const struct UpdateMessage *msg;
1064   int ret;
1065   char *emsg;
1066
1067   GNUNET_STATISTICS_update (stats, gettext_noop ("# UPDATE requests received"),
1068                             1, GNUNET_NO);
1069   msg = (const struct UpdateMessage *) message;
1070   emsg = NULL;
1071 #if DEBUG_DATASTORE
1072   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Processing `%s' request for %llu\n",
1073               "UPDATE", (unsigned long long) GNUNET_ntohll (msg->uid));
1074 #endif
1075   ret =
1076       plugin->api->update (plugin->api->cls, GNUNET_ntohll (msg->uid),
1077                            (int32_t) ntohl (msg->priority),
1078                            GNUNET_TIME_absolute_ntoh (msg->expiration), &emsg);
1079   transmit_status (client, ret, emsg);
1080   GNUNET_free_non_null (emsg);
1081 }
1082
1083
1084 /**
1085  * Handle GET_REPLICATION-message.
1086  *
1087  * @param cls closure
1088  * @param client identification of the client
1089  * @param message the actual message
1090  */
1091 static void
1092 handle_get_replication (void *cls, struct GNUNET_SERVER_Client *client,
1093                         const struct GNUNET_MessageHeader *message)
1094 {
1095 #if DEBUG_DATASTORE
1096   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Processing `%s' request\n",
1097               "GET_REPLICATION");
1098 #endif
1099   GNUNET_STATISTICS_update (stats,
1100                             gettext_noop
1101                             ("# GET REPLICATION requests received"), 1,
1102                             GNUNET_NO);
1103   GNUNET_SERVER_client_keep (client);
1104   plugin->api->get_replication (plugin->api->cls, &transmit_item, client);
1105 }
1106
1107
1108 /**
1109  * Handle GET_ZERO_ANONYMITY-message.
1110  *
1111  * @param cls closure
1112  * @param client identification of the client
1113  * @param message the actual message
1114  */
1115 static void
1116 handle_get_zero_anonymity (void *cls, struct GNUNET_SERVER_Client *client,
1117                            const struct GNUNET_MessageHeader *message)
1118 {
1119   const struct GetZeroAnonymityMessage *msg =
1120       (const struct GetZeroAnonymityMessage *) message;
1121   enum GNUNET_BLOCK_Type type;
1122
1123   type = (enum GNUNET_BLOCK_Type) ntohl (msg->type);
1124   if (type == GNUNET_BLOCK_TYPE_ANY)
1125   {
1126     GNUNET_break (0);
1127     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1128     return;
1129   }
1130 #if DEBUG_DATASTORE
1131   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Processing `%s' request\n",
1132               "GET_ZERO_ANONYMITY");
1133 #endif
1134   GNUNET_STATISTICS_update (stats,
1135                             gettext_noop
1136                             ("# GET ZERO ANONYMITY requests received"), 1,
1137                             GNUNET_NO);
1138   GNUNET_SERVER_client_keep (client);
1139   plugin->api->get_zero_anonymity (plugin->api->cls,
1140                                    GNUNET_ntohll (msg->offset), type,
1141                                    &transmit_item, client);
1142 }
1143
1144
1145 /**
1146  * Callback function that will cause the item that is passed
1147  * in to be deleted (by returning GNUNET_NO).
1148  */
1149 static int
1150 remove_callback (void *cls, const GNUNET_HashCode * key, uint32_t size,
1151                  const void *data, enum GNUNET_BLOCK_Type type,
1152                  uint32_t priority, uint32_t anonymity,
1153                  struct GNUNET_TIME_Absolute expiration, uint64_t uid)
1154 {
1155   struct GNUNET_SERVER_Client *client = cls;
1156
1157   if (key == NULL)
1158   {
1159 #if DEBUG_DATASTORE
1160     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1161                 "No further matches for `%s' request.\n", "REMOVE");
1162 #endif
1163     transmit_status (client, GNUNET_NO, _("Content not found"));
1164     GNUNET_SERVER_client_drop (client);
1165     return GNUNET_OK;           /* last item */
1166   }
1167 #if DEBUG_DATASTORE
1168   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1169               "Item %llu matches `%s' request for key `%s' and type %u.\n",
1170               (unsigned long long) uid, "REMOVE", GNUNET_h2s (key), type);
1171 #endif
1172   GNUNET_STATISTICS_update (stats,
1173                             gettext_noop ("# bytes removed (explicit request)"),
1174                             size, GNUNET_YES);
1175   GNUNET_CONTAINER_bloomfilter_remove (filter, key);
1176   transmit_status (client, GNUNET_OK, NULL);
1177   GNUNET_SERVER_client_drop (client);
1178   return GNUNET_NO;
1179 }
1180
1181
1182 /**
1183  * Handle REMOVE-message.
1184  *
1185  * @param cls closure
1186  * @param client identification of the client
1187  * @param message the actual message
1188  */
1189 static void
1190 handle_remove (void *cls, struct GNUNET_SERVER_Client *client,
1191                const struct GNUNET_MessageHeader *message)
1192 {
1193   const struct DataMessage *dm = check_data (message);
1194   GNUNET_HashCode vhash;
1195
1196   if (dm == NULL)
1197   {
1198     GNUNET_break (0);
1199     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1200     return;
1201   }
1202 #if DEBUG_DATASTORE
1203   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1204               "Processing `%s' request for `%s' of type %u\n", "REMOVE",
1205               GNUNET_h2s (&dm->key), ntohl (dm->type));
1206 #endif
1207   GNUNET_STATISTICS_update (stats, gettext_noop ("# REMOVE requests received"),
1208                             1, GNUNET_NO);
1209   GNUNET_SERVER_client_keep (client);
1210   GNUNET_CRYPTO_hash (&dm[1], ntohl (dm->size), &vhash);
1211   plugin->api->get_key (plugin->api->cls, 0, &dm->key, &vhash,
1212                         (enum GNUNET_BLOCK_Type) ntohl (dm->type),
1213                         &remove_callback, client);
1214 }
1215
1216
1217 /**
1218  * Handle DROP-message.
1219  *
1220  * @param cls closure
1221  * @param client identification of the client
1222  * @param message the actual message
1223  */
1224 static void
1225 handle_drop (void *cls, struct GNUNET_SERVER_Client *client,
1226              const struct GNUNET_MessageHeader *message)
1227 {
1228 #if DEBUG_DATASTORE
1229   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Processing `%s' request\n", "DROP");
1230 #endif
1231   do_drop = GNUNET_YES;
1232   GNUNET_SERVER_receive_done (client, GNUNET_OK);
1233 }
1234
1235
1236 /**
1237  * Function called by plugins to notify us about a
1238  * change in their disk utilization.
1239  *
1240  * @param cls closure (NULL)
1241  * @param delta change in disk utilization,
1242  *        0 for "reset to empty"
1243  */
1244 static void
1245 disk_utilization_change_cb (void *cls, int delta)
1246 {
1247   if ((delta < 0) && (payload < -delta))
1248   {
1249     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1250                 _
1251                 ("Datastore payload inaccurate (%lld < %lld).  Trying to fix.\n"),
1252                 (long long) payload, (long long) -delta);
1253     payload = plugin->api->estimate_size (plugin->api->cls);
1254     sync_stats ();
1255     return;
1256   }
1257   payload += delta;
1258   lastSync++;
1259   if (lastSync >= MAX_STAT_SYNC_LAG)
1260     sync_stats ();
1261 }
1262
1263
1264 /**
1265  * Callback function to process statistic values.
1266  *
1267  * @param cls closure (struct Plugin*)
1268  * @param subsystem name of subsystem that created the statistic
1269  * @param name the name of the datum
1270  * @param value the current value
1271  * @param is_persistent GNUNET_YES if the value is persistent, GNUNET_NO if not
1272  * @return GNUNET_OK to continue, GNUNET_SYSERR to abort iteration
1273  */
1274 static int
1275 process_stat_in (void *cls, const char *subsystem, const char *name,
1276                  uint64_t value, int is_persistent)
1277 {
1278   GNUNET_assert (stats_worked == GNUNET_NO);
1279   stats_worked = GNUNET_YES;
1280   payload += value;
1281 #if DEBUG_SQLITE
1282   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1283               "Notification from statistics about existing payload (%llu), new payload is %llu\n",
1284               abs_value, payload);
1285 #endif
1286   return GNUNET_OK;
1287 }
1288
1289
1290 static void
1291 process_stat_done (void *cls, int success)
1292 {
1293   struct DatastorePlugin *plugin = cls;
1294
1295   stat_get = NULL;
1296   if (stats_worked == GNUNET_NO)
1297     payload = plugin->api->estimate_size (plugin->api->cls);
1298 }
1299
1300
1301 /**
1302  * Load the datastore plugin.
1303  */
1304 static struct DatastorePlugin *
1305 load_plugin ()
1306 {
1307   struct DatastorePlugin *ret;
1308   char *libname;
1309
1310   ret = GNUNET_malloc (sizeof (struct DatastorePlugin));
1311   ret->env.cfg = cfg;
1312   ret->env.duc = &disk_utilization_change_cb;
1313   ret->env.cls = NULL;
1314   GNUNET_log (GNUNET_ERROR_TYPE_INFO, _("Loading `%s' datastore plugin\n"),
1315               plugin_name);
1316   GNUNET_asprintf (&libname, "libgnunet_plugin_datastore_%s", plugin_name);
1317   ret->short_name = GNUNET_strdup (plugin_name);
1318   ret->lib_name = libname;
1319   ret->api = GNUNET_PLUGIN_load (libname, &ret->env);
1320   if (ret->api == NULL)
1321   {
1322     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1323                 _("Failed to load datastore plugin for `%s'\n"), plugin_name);
1324     GNUNET_free (ret->short_name);
1325     GNUNET_free (libname);
1326     GNUNET_free (ret);
1327     return NULL;
1328   }
1329   return ret;
1330 }
1331
1332
1333 /**
1334  * Function called when the service shuts
1335  * down.  Unloads our datastore plugin.
1336  *
1337  * @param plug plugin to unload
1338  */
1339 static void
1340 unload_plugin (struct DatastorePlugin *plug)
1341 {
1342 #if DEBUG_DATASTORE
1343   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1344               "Datastore service is unloading plugin...\n");
1345 #endif
1346   GNUNET_break (NULL == GNUNET_PLUGIN_unload (plug->lib_name, plug->api));
1347   GNUNET_free (plug->lib_name);
1348   GNUNET_free (plug->short_name);
1349   GNUNET_free (plug);
1350   GNUNET_free (quota_stat_name);
1351   quota_stat_name = NULL;
1352 }
1353
1354
1355 /**
1356  * Final task run after shutdown.  Unloads plugins and disconnects us from
1357  * statistics.
1358  */
1359 static void
1360 unload_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1361 {
1362   if (lastSync > 0)
1363     sync_stats ();
1364   if (GNUNET_YES == do_drop)
1365     plugin->api->drop (plugin->api->cls);
1366   unload_plugin (plugin);
1367   plugin = NULL;
1368   if (filter != NULL)
1369   {
1370     GNUNET_CONTAINER_bloomfilter_free (filter);
1371     filter = NULL;
1372   }
1373   if (stat_get != NULL)
1374   {
1375     GNUNET_STATISTICS_get_cancel (stat_get);
1376     stat_get = NULL;
1377   }
1378   if (stats != NULL)
1379   {
1380     GNUNET_STATISTICS_destroy (stats, GNUNET_YES);
1381     stats = NULL;
1382   }
1383   GNUNET_free_non_null (plugin_name);
1384   plugin_name = NULL;
1385 }
1386
1387
1388 /**
1389  * Last task run during shutdown.  Disconnects us from
1390  * the transport and core.
1391  */
1392 static void
1393 cleaning_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1394 {
1395   struct TransmitCallbackContext *tcc;
1396
1397   cleaning_done = GNUNET_YES;
1398   while (NULL != (tcc = tcc_head))
1399   {
1400     GNUNET_CONTAINER_DLL_remove (tcc_head, tcc_tail, tcc);
1401     if (tcc->th != NULL)
1402     {
1403       GNUNET_CONNECTION_notify_transmit_ready_cancel (tcc->th);
1404       GNUNET_SERVER_client_drop (tcc->client);
1405     }
1406     GNUNET_free (tcc->msg);
1407     GNUNET_free (tcc);
1408   }
1409   if (expired_kill_task != GNUNET_SCHEDULER_NO_TASK)
1410   {
1411     GNUNET_SCHEDULER_cancel (expired_kill_task);
1412     expired_kill_task = GNUNET_SCHEDULER_NO_TASK;
1413   }
1414   GNUNET_SCHEDULER_add_continuation (&unload_task, NULL,
1415                                      GNUNET_SCHEDULER_REASON_PREREQ_DONE);
1416 }
1417
1418
1419 /**
1420  * Function that removes all active reservations made
1421  * by the given client and releases the space for other
1422  * requests.
1423  *
1424  * @param cls closure
1425  * @param client identification of the client
1426  */
1427 static void
1428 cleanup_reservations (void *cls, struct GNUNET_SERVER_Client *client)
1429 {
1430   struct ReservationList *pos;
1431   struct ReservationList *prev;
1432   struct ReservationList *next;
1433
1434   if (client == NULL)
1435     return;
1436   prev = NULL;
1437   pos = reservations;
1438   while (NULL != pos)
1439   {
1440     next = pos->next;
1441     if (pos->client == client)
1442     {
1443       if (prev == NULL)
1444         reservations = next;
1445       else
1446         prev->next = next;
1447       reserved -= pos->amount + pos->entries * GNUNET_DATASTORE_ENTRY_OVERHEAD;
1448       GNUNET_free (pos);
1449     }
1450     else
1451     {
1452       prev = pos;
1453     }
1454     pos = next;
1455   }
1456   GNUNET_STATISTICS_set (stats, gettext_noop ("# reserved"), reserved,
1457                          GNUNET_NO);
1458 }
1459
1460
1461 /**
1462  * Adds a given key to the bloomfilter 'count' times.
1463  *
1464  * @param cls the bloomfilter
1465  * @param key key to add
1466  * @param count number of times to add key
1467  */
1468 static void
1469 add_key_to_bloomfilter (void *cls,
1470                         const GNUNET_HashCode *key,
1471                         unsigned int count)
1472 {
1473   struct GNUNET_CONTAINER_BloomFilter *bf = cls;
1474   while (0 < count--)
1475     GNUNET_CONTAINER_bloomfilter_add (bf, key);
1476 }
1477
1478
1479 /**
1480  * Process datastore requests.
1481  *
1482  * @param cls closure
1483  * @param server the initialized server
1484  * @param c configuration to use
1485  */
1486 static void
1487 run (void *cls, struct GNUNET_SERVER_Handle *server,
1488      const struct GNUNET_CONFIGURATION_Handle *c)
1489 {
1490   static const struct GNUNET_SERVER_MessageHandler handlers[] = {
1491     {&handle_reserve, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_RESERVE,
1492      sizeof (struct ReserveMessage)},
1493     {&handle_release_reserve, NULL,
1494      GNUNET_MESSAGE_TYPE_DATASTORE_RELEASE_RESERVE,
1495      sizeof (struct ReleaseReserveMessage)},
1496     {&handle_put, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_PUT, 0},
1497     {&handle_update, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_UPDATE,
1498      sizeof (struct UpdateMessage)},
1499     {&handle_get, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_GET, 0},
1500     {&handle_get_replication, NULL,
1501      GNUNET_MESSAGE_TYPE_DATASTORE_GET_REPLICATION,
1502      sizeof (struct GNUNET_MessageHeader)},
1503     {&handle_get_zero_anonymity, NULL,
1504      GNUNET_MESSAGE_TYPE_DATASTORE_GET_ZERO_ANONYMITY,
1505      sizeof (struct GetZeroAnonymityMessage)},
1506     {&handle_remove, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE, 0},
1507     {&handle_drop, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_DROP,
1508      sizeof (struct GNUNET_MessageHeader)},
1509     {NULL, NULL, 0, 0}
1510   };
1511   char *fn;  
1512   char *pfn;
1513   unsigned int bf_size;
1514   int refresh_bf;
1515
1516   cfg = c;
1517   if (GNUNET_OK !=
1518       GNUNET_CONFIGURATION_get_value_string (cfg, "DATASTORE", "DATABASE",
1519                                              &plugin_name))
1520   {
1521     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1522                 _("No `%s' specified for `%s' in configuration!\n"), "DATABASE",
1523                 "DATASTORE");
1524     return;
1525   }
1526   GNUNET_asprintf (&quota_stat_name,
1527                    _("# bytes used in file-sharing datastore `%s'"),
1528                    plugin_name);
1529   if (GNUNET_OK !=
1530       GNUNET_CONFIGURATION_get_value_size (cfg, "DATASTORE", "QUOTA", &quota))
1531   {
1532     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1533                 _("No `%s' specified for `%s' in configuration!\n"), "QUOTA",
1534                 "DATASTORE");
1535     return;
1536   }
1537   stats = GNUNET_STATISTICS_create ("datastore", cfg);
1538   GNUNET_STATISTICS_set (stats, gettext_noop ("# quota"), quota, GNUNET_NO);
1539   cache_size = quota / 8;       /* Or should we make this an option? */
1540   GNUNET_STATISTICS_set (stats, gettext_noop ("# cache size"), cache_size,
1541                          GNUNET_NO);
1542   if (quota / (32 * 1024LL) > (1 << 31)) 
1543     bf_size = (1 << 31);          /* absolute limit: ~2 GB, beyond that BF just won't help anyway */
1544   else
1545     bf_size = quota / (32 * 1024LL);         /* 8 bit per entry, 1 bit per 32 kb in DB */
1546   fn = NULL;
1547   if ((GNUNET_OK !=
1548        GNUNET_CONFIGURATION_get_value_filename (cfg, "DATASTORE", "BLOOMFILTER",
1549                                                 &fn)) ||
1550       (GNUNET_OK != GNUNET_DISK_directory_create_for_file (fn)))
1551   {
1552     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1553                 _("Could not use specified filename `%s' for bloomfilter.\n"),
1554                 fn != NULL ? fn : "");
1555     GNUNET_free_non_null (fn);
1556     fn = NULL;
1557   }
1558   if (fn != NULL)
1559   {
1560     GNUNET_asprintf (&pfn, "%s.%s\n", fn, plugin_name);
1561     if (GNUNET_YES == GNUNET_DISK_file_test (pfn))
1562     {
1563       filter = GNUNET_CONTAINER_bloomfilter_load (pfn, bf_size, 5);        /* approx. 3% false positives at max use */
1564       if (NULL == filter)
1565       {
1566         /* file exists but not valid, remove and try again, but refresh */
1567         if (0 != UNLINK (pfn))
1568         {
1569           /* failed to remove, run without file */
1570           GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1571                       _("Failed to remove bogus bloomfilter file `%s'\n"),
1572                       pfn);
1573           GNUNET_free (pfn);
1574           pfn = NULL;
1575           filter = GNUNET_CONTAINER_bloomfilter_load (NULL, bf_size, 5);        /* approx. 3% false positives at max use */
1576           refresh_bf = GNUNET_YES;
1577         }
1578         else
1579         {
1580           /* try again after remove */
1581           filter = GNUNET_CONTAINER_bloomfilter_load (pfn, bf_size, 5);        /* approx. 3% false positives at max use */
1582           refresh_bf = GNUNET_YES;
1583           if (NULL == filter)
1584           {
1585             /* failed yet again, give up on using file */
1586             GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1587                         _("Failed to remove bogus bloomfilter file `%s'\n"),
1588                         pfn);
1589             GNUNET_free (pfn);
1590             pfn = NULL;
1591             filter = GNUNET_CONTAINER_bloomfilter_load (NULL, bf_size, 5);        /* approx. 3% false positives at max use */
1592           }
1593         }
1594       }
1595       else
1596       {
1597         /* normal case: have an existing valid bf file, no need to refresh */
1598         refresh_bf = GNUNET_NO;
1599       }
1600     }
1601     else
1602     {
1603       filter = GNUNET_CONTAINER_bloomfilter_load (pfn, bf_size, 5);        /* approx. 3% false positives at max use */
1604       refresh_bf = GNUNET_YES;
1605     }
1606     GNUNET_free (pfn);
1607   }
1608   else
1609   {
1610     filter = GNUNET_CONTAINER_bloomfilter_init (NULL, bf_size, 5);      /* approx. 3% false positives at max use */
1611     refresh_bf = GNUNET_YES;
1612   }
1613   GNUNET_free_non_null (fn);
1614   if (filter == NULL)
1615   {
1616     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1617                 _("Failed to initialize bloomfilter.\n"));
1618     if (stats != NULL)
1619     {
1620       GNUNET_STATISTICS_destroy (stats, GNUNET_YES);
1621       stats = NULL;
1622     }
1623     return;
1624   }
1625   plugin = load_plugin ();
1626   if (NULL == plugin)
1627   {
1628     GNUNET_CONTAINER_bloomfilter_free (filter);
1629     filter = NULL;
1630     if (stats != NULL)
1631     {
1632       GNUNET_STATISTICS_destroy (stats, GNUNET_YES);
1633       stats = NULL;
1634     }
1635     return;
1636   }
1637   stat_get =
1638       GNUNET_STATISTICS_get (stats, "datastore", quota_stat_name,
1639                              GNUNET_TIME_UNIT_SECONDS, &process_stat_done,
1640                              &process_stat_in, plugin);
1641   GNUNET_SERVER_disconnect_notify (server, &cleanup_reservations, NULL);
1642   GNUNET_SERVER_add_handlers (server, handlers);
1643   if (GNUNET_YES == refresh_bf)
1644   {
1645     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1646                 _("Rebuilding bloomfilter.  Please be patient.\n"));
1647     if (NULL != plugin->api->get_keys)
1648       plugin->api->get_keys (plugin->api->cls, &add_key_to_bloomfilter, filter);  
1649     else
1650       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1651                   _("Plugin does not support get_keys function. Please fix!\n"));
1652
1653     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1654                 _("Bloomfilter construction complete.\n"));
1655   }
1656   expired_kill_task =
1657       GNUNET_SCHEDULER_add_with_priority (GNUNET_SCHEDULER_PRIORITY_IDLE,
1658                                           &delete_expired, NULL);
1659   GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL, &cleaning_task,
1660                                 NULL);
1661 }
1662
1663
1664 /**
1665  * The main function for the datastore service.
1666  *
1667  * @param argc number of arguments from the command line
1668  * @param argv command line arguments
1669  * @return 0 ok, 1 on error
1670  */
1671 int
1672 main (int argc, char *const *argv)
1673 {
1674   int ret;
1675
1676   ret =
1677       (GNUNET_OK ==
1678        GNUNET_SERVICE_run (argc, argv, "datastore", GNUNET_SERVICE_OPTION_NONE,
1679                            &run, NULL)) ? 0 : 1;
1680   return ret;
1681 }
1682
1683
1684 /* end of gnunet-service-datastore.c */