1d7e8cd2beb68e185c8820c0d929f8df4fe54f72
[oweals/gnunet.git] / src / datastore / gnunet-service-datastore.c
1 /*
2      This file is part of GNUnet
3      (C) 2004, 2005, 2006, 2007, 2009 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 2, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file datastore/gnunet-service-datastore.c
23  * @brief Management for the datastore for files stored on a GNUnet node
24  * @author Christian Grothoff
25  */
26
27 #include "platform.h"
28 #include "gnunet_util_lib.h"
29 #include "gnunet_protocols.h"
30 #include "gnunet_statistics_service.h"
31 #include "gnunet_datastore_plugin.h"
32 #include "datastore.h"
33
34 /**
35  * How many messages do we queue at most per client?
36  */
37 #define MAX_PENDING 1024
38
39 /**
40  * How long are we at most keeping "expired" content
41  * past the expiration date in the database?
42  */
43 #define MAX_EXPIRE_DELAY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 15)
44
45 /**
46  * How fast are we allowed to query the database for deleting
47  * expired content? (1 item per second).
48  */
49 #define MIN_EXPIRE_DELAY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 1)
50
51 /**
52  * Name under which we store current space consumption.
53  */
54 static char *quota_stat_name;
55
56 /**
57  * After how many payload-changing operations
58  * do we sync our statistics?
59  */
60 #define MAX_STAT_SYNC_LAG 50
61
62
63 /**
64  * Our datastore plugin.
65  */
66 struct DatastorePlugin
67 {
68
69   /**
70    * API of the transport as returned by the plugin's
71    * initialization function.
72    */
73   struct GNUNET_DATASTORE_PluginFunctions *api;
74
75   /**
76    * Short name for the plugin (i.e. "sqlite").
77    */
78   char *short_name;
79
80   /**
81    * Name of the library (i.e. "gnunet_plugin_datastore_sqlite").
82    */
83   char *lib_name;
84
85   /**
86    * Environment this transport service is using
87    * for this plugin.
88    */
89   struct GNUNET_DATASTORE_PluginEnvironment env;
90
91 };
92
93
94 /**
95  * Linked list of active reservations.
96  */
97 struct ReservationList
98 {
99
100   /**
101    * This is a linked list.
102    */
103   struct ReservationList *next;
104
105   /**
106    * Client that made the reservation.
107    */
108   struct GNUNET_SERVER_Client *client;
109
110   /**
111    * Number of bytes (still) reserved.
112    */
113   uint64_t amount;
114
115   /**
116    * Number of items (still) reserved.
117    */
118   uint64_t entries;
119
120   /**
121    * Reservation identifier.
122    */
123   int32_t rid;
124
125 };
126
127
128
129 /**
130  * Our datastore plugin (NULL if not available).
131  */
132 static struct DatastorePlugin *plugin;
133
134 /**
135  * Linked list of space reservations made by clients.
136  */
137 static struct ReservationList *reservations;
138
139 /**
140  * Bloomfilter to quickly tell if we don't have the content.
141  */
142 static struct GNUNET_CONTAINER_BloomFilter *filter;
143
144 /**
145  * How much space are we allowed to use?
146  */
147 static unsigned long long quota;
148
149 /**
150  * Should the database be dropped on exit?
151  */
152 static int do_drop;
153
154 /**
155  * Name of our plugin.
156  */
157 static char *plugin_name;
158
159 /**
160  * How much space are we using for the cache?  (space available for
161  * insertions that will be instantly reclaimed by discarding less
162  * important content --- or possibly whatever we just inserted into
163  * the "cache").
164  */
165 static unsigned long long cache_size;
166
167 /**
168  * How much space have we currently reserved?
169  */
170 static unsigned long long reserved;
171
172 /**
173  * How much data are we currently storing
174  * in the database?
175  */
176 static unsigned long long payload;
177
178 /**
179  * Number of updates that were made to the
180  * payload value since we last synchronized
181  * it with the statistics service.
182  */
183 static unsigned int lastSync;
184
185 /**
186  * Did we get an answer from statistics?
187  */
188 static int stats_worked;
189
190 /**
191  * Identity of the task that is used to delete
192  * expired content.
193  */
194 static GNUNET_SCHEDULER_TaskIdentifier expired_kill_task;
195
196 /**
197  * Our configuration.
198  */
199 const struct GNUNET_CONFIGURATION_Handle *cfg;
200
201 /**
202  * Minimum time that content should have to not be discarded instantly
203  * (time stamp of any content that we've been discarding recently to
204  * stay below the quota).  FOREVER if we had to expire content with
205  * non-zero priority.
206  */
207 static struct GNUNET_TIME_Absolute min_expiration;
208
209 /**
210  * Handle for reporting statistics.
211  */
212 static struct GNUNET_STATISTICS_Handle *stats;
213
214
215 /**
216  * Synchronize our utilization statistics with the
217  * statistics service.
218  */
219 static void
220 sync_stats ()
221 {
222   GNUNET_STATISTICS_set (stats, quota_stat_name, payload, GNUNET_YES);
223   GNUNET_STATISTICS_set (stats, "# utilization by current datastore", payload, GNUNET_NO);
224   lastSync = 0;
225 }
226
227
228
229 /**
230  * Context for transmitting replies to clients.
231  */
232 struct TransmitCallbackContext
233 {
234
235   /**
236    * We keep these in a doubly-linked list (for cleanup).
237    */
238   struct TransmitCallbackContext *next;
239
240   /**
241    * We keep these in a doubly-linked list (for cleanup).
242    */
243   struct TransmitCallbackContext *prev;
244
245   /**
246    * The message that we're asked to transmit.
247    */
248   struct GNUNET_MessageHeader *msg;
249
250   /**
251    * Handle for the transmission request.
252    */
253   struct GNUNET_CONNECTION_TransmitHandle *th;
254
255   /**
256    * Client that we are transmitting to.
257    */
258   struct GNUNET_SERVER_Client *client;
259
260 };
261
262
263 /**
264  * Head of the doubly-linked list (for cleanup).
265  */
266 static struct TransmitCallbackContext *tcc_head;
267
268 /**
269  * Tail of the doubly-linked list (for cleanup).
270  */
271 static struct TransmitCallbackContext *tcc_tail;
272
273 /**
274  * Have we already cleaned up the TCCs and are hence no longer
275  * willing (or able) to transmit anything to anyone?
276  */
277 static int cleaning_done;
278
279 /**
280  * Handle for pending get request.
281  */
282 static struct GNUNET_STATISTICS_GetHandle *stat_get;
283
284
285 /**
286  * Task that is used to remove expired entries from
287  * the datastore.  This task will schedule itself
288  * again automatically to always delete all expired
289  * content quickly.
290  *
291  * @param cls not used
292  * @param tc task context
293  */
294 static void
295 delete_expired (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
296
297
298 /**
299  * Iterate over the expired items stored in the datastore.
300  * Delete all expired items; once we have processed all
301  * expired items, re-schedule the "delete_expired" task.
302  *
303  * @param cls not used
304  * @param key key for the content
305  * @param size number of bytes in data
306  * @param data content stored
307  * @param type type of the content
308  * @param priority priority of the content
309  * @param anonymity anonymity-level for the content
310  * @param expiration expiration time for the content
311  * @param uid unique identifier for the datum;
312  *        maybe 0 if no unique identifier is available
313  *
314  * @return GNUNET_SYSERR to abort the iteration, GNUNET_OK to continue
315  *         (continue on call to "next", of course),
316  *         GNUNET_NO to delete the item and continue (if supported)
317  */
318 static int
319 expired_processor (void *cls, const GNUNET_HashCode * key, uint32_t size,
320                    const void *data, enum GNUNET_BLOCK_Type type,
321                    uint32_t priority, uint32_t anonymity,
322                    struct GNUNET_TIME_Absolute expiration, uint64_t uid)
323 {
324   struct GNUNET_TIME_Absolute now;
325
326   if (key == NULL)
327   {
328     expired_kill_task =
329         GNUNET_SCHEDULER_add_delayed_with_priority (MAX_EXPIRE_DELAY,
330                                                     GNUNET_SCHEDULER_PRIORITY_IDLE,
331                                                     &delete_expired, NULL);
332     return GNUNET_SYSERR;
333   }
334   now = GNUNET_TIME_absolute_get ();
335   if (expiration.abs_value > now.abs_value)
336   {
337     /* finished processing */
338     expired_kill_task =
339         GNUNET_SCHEDULER_add_delayed_with_priority (MAX_EXPIRE_DELAY,
340                                                     GNUNET_SCHEDULER_PRIORITY_IDLE,
341                                                     &delete_expired, NULL);
342     return GNUNET_SYSERR;
343   }
344 #if DEBUG_DATASTORE
345   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
346               "Deleting content `%s' of type %u that expired %llu ms ago\n",
347               GNUNET_h2s (key), type,
348               (unsigned long long) (now.abs_value - expiration.abs_value));
349 #endif
350   min_expiration = now;
351   GNUNET_STATISTICS_update (stats, gettext_noop ("# bytes expired"), size,
352                             GNUNET_YES);
353   GNUNET_CONTAINER_bloomfilter_remove (filter, key);
354   expired_kill_task =
355       GNUNET_SCHEDULER_add_delayed_with_priority (MIN_EXPIRE_DELAY,
356                                                   GNUNET_SCHEDULER_PRIORITY_IDLE,
357                                                   &delete_expired, NULL);
358   return GNUNET_NO;
359 }
360
361
362 /**
363  * Task that is used to remove expired entries from
364  * the datastore.  This task will schedule itself
365  * again automatically to always delete all expired
366  * content quickly.
367  *
368  * @param cls not used
369  * @param tc task context
370  */
371 static void
372 delete_expired (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
373 {
374   expired_kill_task = GNUNET_SCHEDULER_NO_TASK;
375   plugin->api->get_expiration (plugin->api->cls, &expired_processor, NULL);
376 }
377
378
379 /**
380  * An iterator over a set of items stored in the datastore
381  * that deletes until we're happy with respect to our quota.
382  *
383  * @param cls closure
384  * @param key key for the content
385  * @param size number of bytes in data
386  * @param data content stored
387  * @param type type of the content
388  * @param priority priority of the content
389  * @param anonymity anonymity-level for the content
390  * @param expiration expiration time for the content
391  * @param uid unique identifier for the datum;
392  *        maybe 0 if no unique identifier is available
393  *
394  * @return GNUNET_SYSERR to abort the iteration, GNUNET_OK to continue
395  *         (continue on call to "next", of course),
396  *         GNUNET_NO to delete the item and continue (if supported)
397  */
398 static int
399 quota_processor (void *cls, const GNUNET_HashCode * key, uint32_t size,
400                  const void *data, enum GNUNET_BLOCK_Type type,
401                  uint32_t priority, uint32_t anonymity,
402                  struct GNUNET_TIME_Absolute expiration, uint64_t uid)
403 {
404   unsigned long long *need = cls;
405
406   if (NULL == key)
407     return GNUNET_SYSERR;
408 #if DEBUG_DATASTORE
409   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
410               "Deleting %llu bytes of low-priority (%u) content `%s' of type %u at %llu ms prior to expiration (still trying to free another %llu bytes)\n",
411               (unsigned long long) (size + GNUNET_DATASTORE_ENTRY_OVERHEAD),
412               (unsigned int) priority,
413               GNUNET_h2s (key), type, 
414               (unsigned long long) GNUNET_TIME_absolute_get_remaining (expiration).rel_value,
415               *need);
416 #endif
417   if (size + GNUNET_DATASTORE_ENTRY_OVERHEAD > *need)
418     *need = 0;
419   else
420     *need -= size + GNUNET_DATASTORE_ENTRY_OVERHEAD;
421   if (priority > 0)
422     min_expiration = GNUNET_TIME_UNIT_FOREVER_ABS;
423   else
424     min_expiration = expiration;
425   GNUNET_STATISTICS_update (stats,
426                             gettext_noop ("# bytes purged (low-priority)"),
427                             size, GNUNET_YES);
428   GNUNET_CONTAINER_bloomfilter_remove (filter, key);
429   return GNUNET_NO;
430 }
431
432
433 /**
434  * Manage available disk space by running tasks
435  * that will discard content if necessary.  This
436  * function will be run whenever a request for
437  * "need" bytes of storage could only be satisfied
438  * by eating into the "cache" (and we want our cache
439  * space back).
440  *
441  * @param need number of bytes of content that were
442  *        placed into the "cache" (and hence the
443  *        number of bytes that should be removed).
444  */
445 static void
446 manage_space (unsigned long long need)
447 {
448   unsigned long long last;
449
450 #if DEBUG_DATASTORE
451   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
452               "Asked to free up %llu bytes of cache space\n", need);
453 #endif
454   last = 0;
455   while ((need > 0) && (last != need))
456   {
457     last = need;
458     plugin->api->get_expiration (plugin->api->cls, &quota_processor, &need);
459   }
460 }
461
462
463 /**
464  * Function called to notify a client about the socket
465  * begin ready to queue more data.  "buf" will be
466  * NULL and "size" zero if the socket was closed for
467  * writing in the meantime.
468  *
469  * @param cls closure
470  * @param size number of bytes available in buf
471  * @param buf where the callee should write the message
472  * @return number of bytes written to buf
473  */
474 static size_t
475 transmit_callback (void *cls, size_t size, void *buf)
476 {
477   struct TransmitCallbackContext *tcc = cls;
478   size_t msize;
479
480   tcc->th = NULL;
481   GNUNET_CONTAINER_DLL_remove (tcc_head, tcc_tail, tcc);
482   msize = ntohs (tcc->msg->size);
483   if (size == 0)
484   {
485     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
486                 _("Transmission to client failed!\n"));
487     GNUNET_SERVER_receive_done (tcc->client, GNUNET_SYSERR);
488     GNUNET_SERVER_client_drop (tcc->client);
489     GNUNET_free (tcc->msg);
490     GNUNET_free (tcc);
491     return 0;
492   }
493   GNUNET_assert (size >= msize);
494   memcpy (buf, tcc->msg, msize);
495   GNUNET_SERVER_receive_done (tcc->client, GNUNET_OK);
496   GNUNET_SERVER_client_drop (tcc->client);
497   GNUNET_free (tcc->msg);
498   GNUNET_free (tcc);
499   return msize;
500 }
501
502
503 /**
504  * Transmit the given message to the client.
505  *
506  * @param client target of the message
507  * @param msg message to transmit, will be freed!
508  */
509 static void
510 transmit (struct GNUNET_SERVER_Client *client, struct GNUNET_MessageHeader *msg)
511 {
512   struct TransmitCallbackContext *tcc;
513
514   if (GNUNET_YES == cleaning_done)
515   {
516 #if DEBUG_DATASTORE
517     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
518                 "Shutdown in progress, aborting transmission.\n");
519 #endif
520     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
521     GNUNET_free (msg);
522     return;
523   }
524   tcc = GNUNET_malloc (sizeof (struct TransmitCallbackContext));
525   tcc->msg = msg;
526   tcc->client = client;
527   if (NULL ==
528       (tcc->th =
529        GNUNET_SERVER_notify_transmit_ready (client, ntohs (msg->size),
530                                             GNUNET_TIME_UNIT_FOREVER_REL,
531                                             &transmit_callback, tcc)))
532   {
533     GNUNET_break (0);
534     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
535     GNUNET_free (msg);
536     GNUNET_free (tcc);
537     return;
538   }
539   GNUNET_SERVER_client_keep (client);
540   GNUNET_CONTAINER_DLL_insert (tcc_head, tcc_tail, tcc);
541 }
542
543
544 /**
545  * Transmit a status code to the client.
546  *
547  * @param client receiver of the response
548  * @param code status code
549  * @param msg optional error message (can be NULL)
550  */
551 static void
552 transmit_status (struct GNUNET_SERVER_Client *client, int code, const char *msg)
553 {
554   struct StatusMessage *sm;
555   size_t slen;
556
557 #if DEBUG_DATASTORE
558   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
559               "Transmitting `%s' message with value %d and message `%s'\n",
560               "STATUS", code, msg != NULL ? msg : "(none)");
561 #endif
562   slen = (msg == NULL) ? 0 : strlen (msg) + 1;
563   sm = GNUNET_malloc (sizeof (struct StatusMessage) + slen);
564   sm->header.size = htons (sizeof (struct StatusMessage) + slen);
565   sm->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_STATUS);
566   sm->status = htonl (code);
567   sm->min_expiration = GNUNET_TIME_absolute_hton (min_expiration);
568   if (slen > 0)
569     memcpy (&sm[1], msg, slen);
570   transmit (client, &sm->header);
571 }
572
573
574
575 /**
576  * Function that will transmit the given datastore entry
577  * to the client.
578  *
579  * @param cls closure, pointer to the client (of type GNUNET_SERVER_Client).
580  * @param key key for the content
581  * @param size number of bytes in data
582  * @param data content stored
583  * @param type type of the content
584  * @param priority priority of the content
585  * @param anonymity anonymity-level for the content
586  * @param expiration expiration time for the content
587  * @param uid unique identifier for the datum;
588  *        maybe 0 if no unique identifier is available
589  *
590  * @return GNUNET_SYSERR to abort the iteration, GNUNET_OK to continue,
591  *         GNUNET_NO to delete the item and continue (if supported)
592  */
593 static int
594 transmit_item (void *cls, const GNUNET_HashCode * key, uint32_t size,
595                const void *data, enum GNUNET_BLOCK_Type type, uint32_t priority,
596                uint32_t anonymity, struct GNUNET_TIME_Absolute expiration,
597                uint64_t uid)
598 {
599   struct GNUNET_SERVER_Client *client = cls;
600   struct GNUNET_MessageHeader *end;
601   struct DataMessage *dm;
602
603   if (key == NULL)
604   {
605     /* transmit 'DATA_END' */
606 #if DEBUG_DATASTORE
607     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Transmitting `%s' message\n",
608                 "DATA_END");
609 #endif
610     end = GNUNET_malloc (sizeof (struct GNUNET_MessageHeader));
611     end->size = htons (sizeof (struct GNUNET_MessageHeader));
612     end->type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END);
613     transmit (client, end);
614     GNUNET_SERVER_client_drop (client);
615     return GNUNET_OK;
616   }
617   GNUNET_assert (sizeof (struct DataMessage) + size <
618                  GNUNET_SERVER_MAX_MESSAGE_SIZE);
619   dm = GNUNET_malloc (sizeof (struct DataMessage) + size);
620   dm->header.size = htons (sizeof (struct DataMessage) + size);
621   dm->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_DATA);
622   dm->rid = htonl (0);
623   dm->size = htonl (size);
624   dm->type = htonl (type);
625   dm->priority = htonl (priority);
626   dm->anonymity = htonl (anonymity);
627   dm->replication = htonl (0);
628   dm->reserved = htonl (0);
629   dm->expiration = GNUNET_TIME_absolute_hton (expiration);
630   dm->uid = GNUNET_htonll (uid);
631   dm->key = *key;
632   memcpy (&dm[1], data, size);
633 #if DEBUG_DATASTORE
634   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
635               "Transmitting `%s' message for `%s' of type %u with expiration %llu (now: %llu)\n",
636               "DATA", GNUNET_h2s (key), type,
637               (unsigned long long) expiration.abs_value,
638               (unsigned long long) GNUNET_TIME_absolute_get ().abs_value);
639 #endif
640   GNUNET_STATISTICS_update (stats, gettext_noop ("# results found"), 1,
641                             GNUNET_NO);
642   transmit (client, &dm->header);
643   GNUNET_SERVER_client_drop (client);
644   return GNUNET_OK;
645 }
646
647
648 /**
649  * Handle RESERVE-message.
650  *
651  * @param cls closure
652  * @param client identification of the client
653  * @param message the actual message
654  */
655 static void
656 handle_reserve (void *cls, struct GNUNET_SERVER_Client *client,
657                 const struct GNUNET_MessageHeader *message)
658 {
659   /**
660    * Static counter to produce reservation identifiers.
661    */
662   static int reservation_gen;
663
664   const struct ReserveMessage *msg = (const struct ReserveMessage *) message;
665   struct ReservationList *e;
666   unsigned long long used;
667   unsigned long long req;
668   uint64_t amount;
669   uint32_t entries;
670
671 #if DEBUG_DATASTORE
672   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Processing `%s' request\n", "RESERVE");
673 #endif
674   amount = GNUNET_ntohll (msg->amount);
675   entries = ntohl (msg->entries);
676   used = payload + reserved;
677   req =
678       amount + ((unsigned long long) GNUNET_DATASTORE_ENTRY_OVERHEAD) * entries;
679   if (used + req > quota)
680   {
681     if (quota < used)
682       used = quota;             /* cheat a bit for error message (to avoid negative numbers) */
683     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
684                 _
685                 ("Insufficient space (%llu bytes are available) to satisfy `%s' request for %llu bytes\n"),
686                 quota - used, "RESERVE", req);
687     if (cache_size < req)
688     {
689       /* TODO: document this in the FAQ; essentially, if this
690        * message happens, the insertion request could be blocked
691        * by less-important content from migration because it is
692        * larger than 1/8th of the overall available space, and
693        * we only reserve 1/8th for "fresh" insertions */
694       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
695                   _
696                   ("The requested amount (%llu bytes) is larger than the cache size (%llu bytes)\n"),
697                   req, cache_size);
698       transmit_status (client, 0,
699                        gettext_noop
700                        ("Insufficient space to satisfy request and "
701                         "requested amount is larger than cache size"));
702     }
703     else
704     {
705       transmit_status (client, 0,
706                        gettext_noop ("Insufficient space to satisfy request"));
707     }
708     return;
709   }
710   reserved += req;
711   GNUNET_STATISTICS_set (stats, gettext_noop ("# reserved"), reserved,
712                          GNUNET_NO);
713   e = GNUNET_malloc (sizeof (struct ReservationList));
714   e->next = reservations;
715   reservations = e;
716   e->client = client;
717   e->amount = amount;
718   e->entries = entries;
719   e->rid = ++reservation_gen;
720   if (reservation_gen < 0)
721     reservation_gen = 0;        /* wrap around */
722   transmit_status (client, e->rid, NULL);
723 }
724
725
726 /**
727  * Handle RELEASE_RESERVE-message.
728  *
729  * @param cls closure
730  * @param client identification of the client
731  * @param message the actual message
732  */
733 static void
734 handle_release_reserve (void *cls, struct GNUNET_SERVER_Client *client,
735                         const struct GNUNET_MessageHeader *message)
736 {
737   const struct ReleaseReserveMessage *msg =
738       (const struct ReleaseReserveMessage *) message;
739   struct ReservationList *pos;
740   struct ReservationList *prev;
741   struct ReservationList *next;
742   int rid = ntohl (msg->rid);
743   unsigned long long rem;
744
745 #if DEBUG_DATASTORE
746   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Processing `%s' request\n",
747               "RELEASE_RESERVE");
748 #endif
749   next = reservations;
750   prev = NULL;
751   while (NULL != (pos = next))
752   {
753     next = pos->next;
754     if (rid == pos->rid)
755     {
756       if (prev == NULL)
757         reservations = next;
758       else
759         prev->next = next;
760       rem =
761           pos->amount +
762           ((unsigned long long) GNUNET_DATASTORE_ENTRY_OVERHEAD) * pos->entries;
763       GNUNET_assert (reserved >= rem);
764       reserved -= rem;
765       GNUNET_STATISTICS_set (stats, gettext_noop ("# reserved"), reserved,
766                              GNUNET_NO);
767 #if DEBUG_DATASTORE
768       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
769                   "Returning %llu remaining reserved bytes to storage pool\n",
770                   rem);
771 #endif
772       GNUNET_free (pos);
773       transmit_status (client, GNUNET_OK, NULL);
774       return;
775     }
776     prev = pos;
777   }
778   GNUNET_break (0);
779   transmit_status (client, GNUNET_SYSERR,
780                    gettext_noop ("Could not find matching reservation"));
781 }
782
783
784 /**
785  * Check that the given message is a valid data message.
786  *
787  * @return NULL if the message is not well-formed, otherwise the message
788  */
789 static const struct DataMessage *
790 check_data (const struct GNUNET_MessageHeader *message)
791 {
792   uint16_t size;
793   uint32_t dsize;
794   const struct DataMessage *dm;
795
796   size = ntohs (message->size);
797   if (size < sizeof (struct DataMessage))
798   {
799     GNUNET_break (0);
800     return NULL;
801   }
802   dm = (const struct DataMessage *) message;
803   dsize = ntohl (dm->size);
804   if (size != dsize + sizeof (struct DataMessage))
805   {
806     GNUNET_break (0);
807     return NULL;
808   }
809   return dm;
810 }
811
812
813 /**
814  * Context for a PUT request used to see if the content is
815  * already present.
816  */
817 struct PutContext
818 {
819   /**
820    * Client to notify on completion.
821    */
822   struct GNUNET_SERVER_Client *client;
823
824 #if ! HAVE_UNALIGNED_64_ACCESS
825   void *reserved;
826 #endif
827
828   /* followed by the 'struct DataMessage' */
829 };
830
831
832 /**
833  * Actually put the data message.
834  *
835  * @param client sender of the message
836  * @param dm message with the data to store
837  */
838 static void
839 execute_put (struct GNUNET_SERVER_Client *client, const struct DataMessage *dm)
840 {
841   uint32_t size;
842   char *msg;
843   int ret;
844
845   size = ntohl (dm->size);
846   msg = NULL;
847   ret =
848       plugin->api->put (plugin->api->cls, &dm->key, size, &dm[1],
849                         ntohl (dm->type), ntohl (dm->priority),
850                         ntohl (dm->anonymity), ntohl (dm->replication),
851                         GNUNET_TIME_absolute_ntoh (dm->expiration), &msg);
852   if (GNUNET_OK == ret)
853   {
854     GNUNET_STATISTICS_update (stats, gettext_noop ("# bytes stored"), size,
855                               GNUNET_YES);
856     GNUNET_CONTAINER_bloomfilter_add (filter, &dm->key);
857 #if DEBUG_DATASTORE
858     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
859                 "Successfully stored %u bytes of type %u under key `%s'\n",
860                 size, ntohl (dm->type), GNUNET_h2s (&dm->key));
861 #endif
862   }
863   transmit_status (client, ret, msg);
864   GNUNET_free_non_null (msg);
865   if (quota - reserved - cache_size < payload)
866   {
867     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
868                 _("Need %llu bytes more space (%llu allowed, using %llu)\n"),
869                 (unsigned long long) size + GNUNET_DATASTORE_ENTRY_OVERHEAD,
870                 (unsigned long long) (quota - reserved - cache_size),
871                 (unsigned long long) payload);
872     manage_space (size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
873   }
874 }
875
876
877 /**
878  * Function that will check if the given datastore entry
879  * matches the put and if none match executes the put.
880  *
881  * @param cls closure, pointer to the client (of type 'struct PutContext').
882  * @param key key for the content
883  * @param size number of bytes in data
884  * @param data content stored
885  * @param type type of the content
886  * @param priority priority of the content
887  * @param anonymity anonymity-level for the content
888  * @param expiration expiration time for the content
889  * @param uid unique identifier for the datum;
890  *        maybe 0 if no unique identifier is available
891  *
892  * @return GNUNET_OK usually
893  *         GNUNET_NO to delete the item
894  */
895 static int
896 check_present (void *cls, const GNUNET_HashCode * key, uint32_t size,
897                const void *data, enum GNUNET_BLOCK_Type type, uint32_t priority,
898                uint32_t anonymity, struct GNUNET_TIME_Absolute expiration,
899                uint64_t uid)
900 {
901   struct PutContext *pc = cls;
902   const struct DataMessage *dm;
903
904   dm = (const struct DataMessage *) &pc[1];
905   if (key == NULL)
906   {
907     execute_put (pc->client, dm);
908     GNUNET_SERVER_client_drop (pc->client);
909     GNUNET_free (pc);
910     return GNUNET_OK;
911   }
912   if ((GNUNET_BLOCK_TYPE_FS_DBLOCK == type) ||
913       (GNUNET_BLOCK_TYPE_FS_IBLOCK == type) || ((size == ntohl (dm->size)) &&
914                                                 (0 ==
915                                                  memcmp (&dm[1], data, size))))
916   {
917 #if DEBUG_MYSQL
918     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
919                 "Result already present in datastore\n");
920 #endif
921     /* FIXME: change API to allow increasing 'replication' counter */
922     if ((ntohl (dm->priority) > 0) ||
923         (GNUNET_TIME_absolute_ntoh (dm->expiration).abs_value >
924          expiration.abs_value))
925       plugin->api->update (plugin->api->cls, uid,
926                            (int32_t) ntohl (dm->priority),
927                            GNUNET_TIME_absolute_ntoh (dm->expiration), NULL);
928     transmit_status (pc->client, GNUNET_NO, NULL);
929     GNUNET_SERVER_client_drop (pc->client);
930     GNUNET_free (pc);
931   }
932   else
933   {
934     execute_put (pc->client, dm);
935     GNUNET_SERVER_client_drop (pc->client);
936     GNUNET_free (pc);
937   }
938   return GNUNET_OK;
939 }
940
941
942 /**
943  * Handle PUT-message.
944  *
945  * @param cls closure
946  * @param client identification of the client
947  * @param message the actual message
948  */
949 static void
950 handle_put (void *cls, struct GNUNET_SERVER_Client *client,
951             const struct GNUNET_MessageHeader *message)
952 {
953   const struct DataMessage *dm = check_data (message);
954   int rid;
955   struct ReservationList *pos;
956   struct PutContext *pc;
957   GNUNET_HashCode vhash;
958   uint32_t size;
959
960   if ((dm == NULL) || (ntohl (dm->type) == 0))
961   {
962     GNUNET_break (0);
963     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
964     return;
965   }
966 #if DEBUG_DATASTORE
967   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
968               "Processing `%s' request for `%s' of type %u\n", "PUT",
969               GNUNET_h2s (&dm->key), ntohl (dm->type));
970 #endif
971   rid = ntohl (dm->rid);
972   size = ntohl (dm->size);
973   if (rid > 0)
974   {
975     pos = reservations;
976     while ((NULL != pos) && (rid != pos->rid))
977       pos = pos->next;
978     GNUNET_break (pos != NULL);
979     if (NULL != pos)
980     {
981       GNUNET_break (pos->entries > 0);
982       GNUNET_break (pos->amount >= size);
983       pos->entries--;
984       pos->amount -= size;
985       reserved -= (size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
986       GNUNET_STATISTICS_set (stats, gettext_noop ("# reserved"), reserved,
987                              GNUNET_NO);
988     }
989   }
990   if (GNUNET_YES == GNUNET_CONTAINER_bloomfilter_test (filter, &dm->key))
991   {
992     GNUNET_CRYPTO_hash (&dm[1], size, &vhash);
993     pc = GNUNET_malloc (sizeof (struct PutContext) + size +
994                         sizeof (struct DataMessage));
995     pc->client = client;
996     GNUNET_SERVER_client_keep (client);
997     memcpy (&pc[1], dm, size + sizeof (struct DataMessage));
998     plugin->api->get_key (plugin->api->cls, 0, &dm->key, &vhash,
999                           ntohl (dm->type), &check_present, pc);
1000     return;
1001   }
1002   execute_put (client, dm);
1003 }
1004
1005
1006 /**
1007  * Handle GET-message.
1008  *
1009  * @param cls closure
1010  * @param client identification of the client
1011  * @param message the actual message
1012  */
1013 static void
1014 handle_get (void *cls, struct GNUNET_SERVER_Client *client,
1015             const struct GNUNET_MessageHeader *message)
1016 {
1017   const struct GetMessage *msg;
1018   uint16_t size;
1019
1020   size = ntohs (message->size);
1021   if ((size != sizeof (struct GetMessage)) &&
1022       (size != sizeof (struct GetMessage) - sizeof (GNUNET_HashCode)))
1023   {
1024     GNUNET_break (0);
1025     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1026     return;
1027   }
1028   msg = (const struct GetMessage *) message;
1029 #if DEBUG_DATASTORE
1030   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1031               "Processing `%s' request for `%s' of type %u\n", "GET",
1032               GNUNET_h2s (&msg->key), ntohl (msg->type));
1033 #endif
1034   GNUNET_STATISTICS_update (stats, gettext_noop ("# GET requests received"), 1,
1035                             GNUNET_NO);
1036   GNUNET_SERVER_client_keep (client);
1037   if ((size == sizeof (struct GetMessage)) &&
1038       (GNUNET_YES != GNUNET_CONTAINER_bloomfilter_test (filter, &msg->key)))
1039   {
1040     /* don't bother database... */
1041 #if DEBUG_DATASTORE
1042     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1043                 "Empty result set for `%s' request for `%s' (bloomfilter).\n",
1044                 "GET", GNUNET_h2s (&msg->key));
1045 #endif
1046     GNUNET_STATISTICS_update (stats,
1047                               gettext_noop
1048                               ("# requests filtered by bloomfilter"), 1,
1049                               GNUNET_NO);
1050     transmit_item (client, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS,
1051                    0);
1052     return;
1053   }
1054   plugin->api->get_key (plugin->api->cls, GNUNET_ntohll (msg->offset),
1055                         ((size ==
1056                           sizeof (struct GetMessage)) ? &msg->key : NULL), NULL,
1057                         ntohl (msg->type), &transmit_item, client);
1058 }
1059
1060
1061 /**
1062  * Handle UPDATE-message.
1063  *
1064  * @param cls closure
1065  * @param client identification of the client
1066  * @param message the actual message
1067  */
1068 static void
1069 handle_update (void *cls, struct GNUNET_SERVER_Client *client,
1070                const struct GNUNET_MessageHeader *message)
1071 {
1072   const struct UpdateMessage *msg;
1073   int ret;
1074   char *emsg;
1075
1076   GNUNET_STATISTICS_update (stats, gettext_noop ("# UPDATE requests received"),
1077                             1, GNUNET_NO);
1078   msg = (const struct UpdateMessage *) message;
1079   emsg = NULL;
1080 #if DEBUG_DATASTORE
1081   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Processing `%s' request for %llu\n",
1082               "UPDATE", (unsigned long long) GNUNET_ntohll (msg->uid));
1083 #endif
1084   ret =
1085       plugin->api->update (plugin->api->cls, GNUNET_ntohll (msg->uid),
1086                            (int32_t) ntohl (msg->priority),
1087                            GNUNET_TIME_absolute_ntoh (msg->expiration), &emsg);
1088   transmit_status (client, ret, emsg);
1089   GNUNET_free_non_null (emsg);
1090 }
1091
1092
1093 /**
1094  * Handle GET_REPLICATION-message.
1095  *
1096  * @param cls closure
1097  * @param client identification of the client
1098  * @param message the actual message
1099  */
1100 static void
1101 handle_get_replication (void *cls, struct GNUNET_SERVER_Client *client,
1102                         const struct GNUNET_MessageHeader *message)
1103 {
1104 #if DEBUG_DATASTORE
1105   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Processing `%s' request\n",
1106               "GET_REPLICATION");
1107 #endif
1108   GNUNET_STATISTICS_update (stats,
1109                             gettext_noop
1110                             ("# GET REPLICATION requests received"), 1,
1111                             GNUNET_NO);
1112   GNUNET_SERVER_client_keep (client);
1113   plugin->api->get_replication (plugin->api->cls, &transmit_item, client);
1114 }
1115
1116
1117 /**
1118  * Handle GET_ZERO_ANONYMITY-message.
1119  *
1120  * @param cls closure
1121  * @param client identification of the client
1122  * @param message the actual message
1123  */
1124 static void
1125 handle_get_zero_anonymity (void *cls, struct GNUNET_SERVER_Client *client,
1126                            const struct GNUNET_MessageHeader *message)
1127 {
1128   const struct GetZeroAnonymityMessage *msg =
1129       (const struct GetZeroAnonymityMessage *) message;
1130   enum GNUNET_BLOCK_Type type;
1131
1132   type = (enum GNUNET_BLOCK_Type) ntohl (msg->type);
1133   if (type == GNUNET_BLOCK_TYPE_ANY)
1134   {
1135     GNUNET_break (0);
1136     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1137     return;
1138   }
1139 #if DEBUG_DATASTORE
1140   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Processing `%s' request\n",
1141               "GET_ZERO_ANONYMITY");
1142 #endif
1143   GNUNET_STATISTICS_update (stats,
1144                             gettext_noop
1145                             ("# GET ZERO ANONYMITY requests received"), 1,
1146                             GNUNET_NO);
1147   GNUNET_SERVER_client_keep (client);
1148   plugin->api->get_zero_anonymity (plugin->api->cls,
1149                                    GNUNET_ntohll (msg->offset), type,
1150                                    &transmit_item, client);
1151 }
1152
1153
1154 /**
1155  * Callback function that will cause the item that is passed
1156  * in to be deleted (by returning GNUNET_NO).
1157  */
1158 static int
1159 remove_callback (void *cls, const GNUNET_HashCode * key, uint32_t size,
1160                  const void *data, enum GNUNET_BLOCK_Type type,
1161                  uint32_t priority, uint32_t anonymity,
1162                  struct GNUNET_TIME_Absolute expiration, uint64_t uid)
1163 {
1164   struct GNUNET_SERVER_Client *client = cls;
1165
1166   if (key == NULL)
1167   {
1168 #if DEBUG_DATASTORE
1169     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1170                 "No further matches for `%s' request.\n", "REMOVE");
1171 #endif
1172     transmit_status (client, GNUNET_NO, _("Content not found"));
1173     GNUNET_SERVER_client_drop (client);
1174     return GNUNET_OK;           /* last item */
1175   }
1176 #if DEBUG_DATASTORE
1177   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1178               "Item %llu matches `%s' request for key `%s' and type %u.\n",
1179               (unsigned long long) uid, "REMOVE", GNUNET_h2s (key), type);
1180 #endif
1181   GNUNET_STATISTICS_update (stats,
1182                             gettext_noop ("# bytes removed (explicit request)"),
1183                             size, GNUNET_YES);
1184   GNUNET_CONTAINER_bloomfilter_remove (filter, key);
1185   transmit_status (client, GNUNET_OK, NULL);
1186   GNUNET_SERVER_client_drop (client);
1187   return GNUNET_NO;
1188 }
1189
1190
1191 /**
1192  * Handle REMOVE-message.
1193  *
1194  * @param cls closure
1195  * @param client identification of the client
1196  * @param message the actual message
1197  */
1198 static void
1199 handle_remove (void *cls, struct GNUNET_SERVER_Client *client,
1200                const struct GNUNET_MessageHeader *message)
1201 {
1202   const struct DataMessage *dm = check_data (message);
1203   GNUNET_HashCode vhash;
1204
1205   if (dm == NULL)
1206   {
1207     GNUNET_break (0);
1208     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1209     return;
1210   }
1211 #if DEBUG_DATASTORE
1212   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1213               "Processing `%s' request for `%s' of type %u\n", "REMOVE",
1214               GNUNET_h2s (&dm->key), ntohl (dm->type));
1215 #endif
1216   GNUNET_STATISTICS_update (stats, gettext_noop ("# REMOVE requests received"),
1217                             1, GNUNET_NO);
1218   GNUNET_SERVER_client_keep (client);
1219   GNUNET_CRYPTO_hash (&dm[1], ntohl (dm->size), &vhash);
1220   plugin->api->get_key (plugin->api->cls, 0, &dm->key, &vhash,
1221                         (enum GNUNET_BLOCK_Type) ntohl (dm->type),
1222                         &remove_callback, client);
1223 }
1224
1225
1226 /**
1227  * Handle DROP-message.
1228  *
1229  * @param cls closure
1230  * @param client identification of the client
1231  * @param message the actual message
1232  */
1233 static void
1234 handle_drop (void *cls, struct GNUNET_SERVER_Client *client,
1235              const struct GNUNET_MessageHeader *message)
1236 {
1237 #if DEBUG_DATASTORE
1238   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Processing `%s' request\n", "DROP");
1239 #endif
1240   do_drop = GNUNET_YES;
1241   GNUNET_SERVER_receive_done (client, GNUNET_OK);
1242 }
1243
1244
1245 /**
1246  * Function called by plugins to notify us about a
1247  * change in their disk utilization.
1248  *
1249  * @param cls closure (NULL)
1250  * @param delta change in disk utilization,
1251  *        0 for "reset to empty"
1252  */
1253 static void
1254 disk_utilization_change_cb (void *cls, int delta)
1255 {
1256   if ((delta < 0) && (payload < -delta))
1257   {
1258     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1259                 _
1260                 ("Datastore payload inaccurate (%lld < %lld).  Trying to fix.\n"),
1261                 (long long) payload, (long long) -delta);
1262     payload = plugin->api->estimate_size (plugin->api->cls);
1263     sync_stats ();
1264     return;
1265   }
1266   payload += delta;
1267   lastSync++;
1268   if (lastSync >= MAX_STAT_SYNC_LAG)
1269     sync_stats ();
1270 }
1271
1272
1273 /**
1274  * Callback function to process statistic values.
1275  *
1276  * @param cls closure (struct Plugin*)
1277  * @param subsystem name of subsystem that created the statistic
1278  * @param name the name of the datum
1279  * @param value the current value
1280  * @param is_persistent GNUNET_YES if the value is persistent, GNUNET_NO if not
1281  * @return GNUNET_OK to continue, GNUNET_SYSERR to abort iteration
1282  */
1283 static int
1284 process_stat_in (void *cls, const char *subsystem, const char *name,
1285                  uint64_t value, int is_persistent)
1286 {
1287   GNUNET_assert (stats_worked == GNUNET_NO);
1288   stats_worked = GNUNET_YES;
1289   payload += value;
1290 #if DEBUG_SQLITE
1291   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1292               "Notification from statistics about existing payload (%llu), new payload is %llu\n",
1293               abs_value, payload);
1294 #endif
1295   return GNUNET_OK;
1296 }
1297
1298
1299 static void
1300 process_stat_done (void *cls, int success)
1301 {
1302   struct DatastorePlugin *plugin = cls;
1303
1304   stat_get = NULL;
1305   if (stats_worked == GNUNET_NO)
1306     payload = plugin->api->estimate_size (plugin->api->cls);
1307 }
1308
1309
1310 /**
1311  * Load the datastore plugin.
1312  */
1313 static struct DatastorePlugin *
1314 load_plugin ()
1315 {
1316   struct DatastorePlugin *ret;
1317   char *libname;
1318
1319   ret = GNUNET_malloc (sizeof (struct DatastorePlugin));
1320   ret->env.cfg = cfg;
1321   ret->env.duc = &disk_utilization_change_cb;
1322   ret->env.cls = NULL;
1323   GNUNET_log (GNUNET_ERROR_TYPE_INFO, _("Loading `%s' datastore plugin\n"),
1324               plugin_name);
1325   GNUNET_asprintf (&libname, "libgnunet_plugin_datastore_%s", plugin_name);
1326   ret->short_name = GNUNET_strdup (plugin_name);
1327   ret->lib_name = libname;
1328   ret->api = GNUNET_PLUGIN_load (libname, &ret->env);
1329   if (ret->api == NULL)
1330   {
1331     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1332                 _("Failed to load datastore plugin for `%s'\n"), plugin_name);
1333     GNUNET_free (ret->short_name);
1334     GNUNET_free (libname);
1335     GNUNET_free (ret);
1336     return NULL;
1337   }
1338   return ret;
1339 }
1340
1341
1342 /**
1343  * Function called when the service shuts
1344  * down.  Unloads our datastore plugin.
1345  *
1346  * @param plug plugin to unload
1347  */
1348 static void
1349 unload_plugin (struct DatastorePlugin *plug)
1350 {
1351 #if DEBUG_DATASTORE
1352   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1353               "Datastore service is unloading plugin...\n");
1354 #endif
1355   GNUNET_break (NULL == GNUNET_PLUGIN_unload (plug->lib_name, plug->api));
1356   GNUNET_free (plug->lib_name);
1357   GNUNET_free (plug->short_name);
1358   GNUNET_free (plug);
1359   GNUNET_free (quota_stat_name);
1360   quota_stat_name = NULL;
1361 }
1362
1363
1364 /**
1365  * Final task run after shutdown.  Unloads plugins and disconnects us from
1366  * statistics.
1367  */
1368 static void
1369 unload_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1370 {
1371   if (lastSync > 0)
1372     sync_stats ();
1373   if (GNUNET_YES == do_drop)
1374     plugin->api->drop (plugin->api->cls);
1375   unload_plugin (plugin);
1376   plugin = NULL;
1377   if (filter != NULL)
1378   {
1379     GNUNET_CONTAINER_bloomfilter_free (filter);
1380     filter = NULL;
1381   }
1382   if (stat_get != NULL)
1383   {
1384     GNUNET_STATISTICS_get_cancel (stat_get);
1385     stat_get = NULL;
1386   }
1387   if (stats != NULL)
1388   {
1389     GNUNET_STATISTICS_destroy (stats, GNUNET_YES);
1390     stats = NULL;
1391   }
1392   GNUNET_free_non_null (plugin_name);
1393   plugin_name = NULL;
1394 }
1395
1396
1397 /**
1398  * Last task run during shutdown.  Disconnects us from
1399  * the transport and core.
1400  */
1401 static void
1402 cleaning_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1403 {
1404   struct TransmitCallbackContext *tcc;
1405
1406   cleaning_done = GNUNET_YES;
1407   while (NULL != (tcc = tcc_head))
1408   {
1409     GNUNET_CONTAINER_DLL_remove (tcc_head, tcc_tail, tcc);
1410     if (tcc->th != NULL)
1411     {
1412       GNUNET_CONNECTION_notify_transmit_ready_cancel (tcc->th);
1413       GNUNET_SERVER_client_drop (tcc->client);
1414     }
1415     GNUNET_free (tcc->msg);
1416     GNUNET_free (tcc);
1417   }
1418   if (expired_kill_task != GNUNET_SCHEDULER_NO_TASK)
1419   {
1420     GNUNET_SCHEDULER_cancel (expired_kill_task);
1421     expired_kill_task = GNUNET_SCHEDULER_NO_TASK;
1422   }
1423   GNUNET_SCHEDULER_add_continuation (&unload_task, NULL,
1424                                      GNUNET_SCHEDULER_REASON_PREREQ_DONE);
1425 }
1426
1427
1428 /**
1429  * Function that removes all active reservations made
1430  * by the given client and releases the space for other
1431  * requests.
1432  *
1433  * @param cls closure
1434  * @param client identification of the client
1435  */
1436 static void
1437 cleanup_reservations (void *cls, struct GNUNET_SERVER_Client *client)
1438 {
1439   struct ReservationList *pos;
1440   struct ReservationList *prev;
1441   struct ReservationList *next;
1442
1443   if (client == NULL)
1444     return;
1445   prev = NULL;
1446   pos = reservations;
1447   while (NULL != pos)
1448   {
1449     next = pos->next;
1450     if (pos->client == client)
1451     {
1452       if (prev == NULL)
1453         reservations = next;
1454       else
1455         prev->next = next;
1456       reserved -= pos->amount + pos->entries * GNUNET_DATASTORE_ENTRY_OVERHEAD;
1457       GNUNET_free (pos);
1458     }
1459     else
1460     {
1461       prev = pos;
1462     }
1463     pos = next;
1464   }
1465   GNUNET_STATISTICS_set (stats, gettext_noop ("# reserved"), reserved,
1466                          GNUNET_NO);
1467 }
1468
1469
1470 /**
1471  * Adds a given key to the bloomfilter 'count' times.
1472  *
1473  * @param cls the bloomfilter
1474  * @param key key to add
1475  * @param count number of times to add key
1476  */
1477 static void
1478 add_key_to_bloomfilter (void *cls,
1479                         const GNUNET_HashCode *key,
1480                         unsigned int count)
1481 {
1482   struct GNUNET_CONTAINER_BloomFilter *bf = cls;
1483   while (0 < count--)
1484     GNUNET_CONTAINER_bloomfilter_add (bf, key);
1485 }
1486
1487
1488 /**
1489  * Process datastore requests.
1490  *
1491  * @param cls closure
1492  * @param server the initialized server
1493  * @param c configuration to use
1494  */
1495 static void
1496 run (void *cls, struct GNUNET_SERVER_Handle *server,
1497      const struct GNUNET_CONFIGURATION_Handle *c)
1498 {
1499   static const struct GNUNET_SERVER_MessageHandler handlers[] = {
1500     {&handle_reserve, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_RESERVE,
1501      sizeof (struct ReserveMessage)},
1502     {&handle_release_reserve, NULL,
1503      GNUNET_MESSAGE_TYPE_DATASTORE_RELEASE_RESERVE,
1504      sizeof (struct ReleaseReserveMessage)},
1505     {&handle_put, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_PUT, 0},
1506     {&handle_update, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_UPDATE,
1507      sizeof (struct UpdateMessage)},
1508     {&handle_get, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_GET, 0},
1509     {&handle_get_replication, NULL,
1510      GNUNET_MESSAGE_TYPE_DATASTORE_GET_REPLICATION,
1511      sizeof (struct GNUNET_MessageHeader)},
1512     {&handle_get_zero_anonymity, NULL,
1513      GNUNET_MESSAGE_TYPE_DATASTORE_GET_ZERO_ANONYMITY,
1514      sizeof (struct GetZeroAnonymityMessage)},
1515     {&handle_remove, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE, 0},
1516     {&handle_drop, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_DROP,
1517      sizeof (struct GNUNET_MessageHeader)},
1518     {NULL, NULL, 0, 0}
1519   };
1520   char *fn;  
1521   char *pfn;
1522   unsigned int bf_size;
1523   int refresh_bf;
1524
1525   cfg = c;
1526   if (GNUNET_OK !=
1527       GNUNET_CONFIGURATION_get_value_string (cfg, "DATASTORE", "DATABASE",
1528                                              &plugin_name))
1529   {
1530     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1531                 _("No `%s' specified for `%s' in configuration!\n"), "DATABASE",
1532                 "DATASTORE");
1533     return;
1534   }
1535   GNUNET_asprintf (&quota_stat_name,
1536                    _("# bytes used in file-sharing datastore `%s'"),
1537                    plugin_name);
1538   if (GNUNET_OK !=
1539       GNUNET_CONFIGURATION_get_value_size (cfg, "DATASTORE", "QUOTA", &quota))
1540   {
1541     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1542                 _("No `%s' specified for `%s' in configuration!\n"), "QUOTA",
1543                 "DATASTORE");
1544     return;
1545   }
1546   stats = GNUNET_STATISTICS_create ("datastore", cfg);
1547   GNUNET_STATISTICS_set (stats, gettext_noop ("# quota"), quota, GNUNET_NO);
1548   cache_size = quota / 8;       /* Or should we make this an option? */
1549   GNUNET_STATISTICS_set (stats, gettext_noop ("# cache size"), cache_size,
1550                          GNUNET_NO);
1551   if (quota / (32 * 1024LL) > (1 << 31)) 
1552     bf_size = (1 << 31);          /* absolute limit: ~2 GB, beyond that BF just won't help anyway */
1553   else
1554     bf_size = quota / (32 * 1024LL);         /* 8 bit per entry, 1 bit per 32 kb in DB */
1555   fn = NULL;
1556   if ((GNUNET_OK !=
1557        GNUNET_CONFIGURATION_get_value_filename (cfg, "DATASTORE", "BLOOMFILTER",
1558                                                 &fn)) ||
1559       (GNUNET_OK != GNUNET_DISK_directory_create_for_file (fn)))
1560   {
1561     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1562                 _("Could not use specified filename `%s' for bloomfilter.\n"),
1563                 fn != NULL ? fn : "");
1564     GNUNET_free_non_null (fn);
1565     fn = NULL;
1566   }
1567   if (fn != NULL)
1568   {
1569     GNUNET_asprintf (&pfn, "%s.%s", fn, plugin_name);
1570     if (GNUNET_YES == GNUNET_DISK_file_test (pfn))
1571     {
1572       filter = GNUNET_CONTAINER_bloomfilter_load (pfn, bf_size, 5);        /* approx. 3% false positives at max use */
1573       if (NULL == filter)
1574       {
1575         /* file exists but not valid, remove and try again, but refresh */
1576         if (0 != UNLINK (pfn))
1577         {
1578           /* failed to remove, run without file */
1579           GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1580                       _("Failed to remove bogus bloomfilter file `%s'\n"),
1581                       pfn);
1582           GNUNET_free (pfn);
1583           pfn = NULL;
1584           filter = GNUNET_CONTAINER_bloomfilter_load (NULL, bf_size, 5);        /* approx. 3% false positives at max use */
1585           refresh_bf = GNUNET_YES;
1586         }
1587         else
1588         {
1589           /* try again after remove */
1590           filter = GNUNET_CONTAINER_bloomfilter_load (pfn, bf_size, 5);        /* approx. 3% false positives at max use */
1591           refresh_bf = GNUNET_YES;
1592           if (NULL == filter)
1593           {
1594             /* failed yet again, give up on using file */
1595             GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1596                         _("Failed to remove bogus bloomfilter file `%s'\n"),
1597                         pfn);
1598             GNUNET_free (pfn);
1599             pfn = NULL;
1600             filter = GNUNET_CONTAINER_bloomfilter_load (NULL, bf_size, 5);        /* approx. 3% false positives at max use */
1601           }
1602         }
1603       }
1604       else
1605       {
1606         /* normal case: have an existing valid bf file, no need to refresh */
1607         refresh_bf = GNUNET_NO;
1608       }
1609     }
1610     else
1611     {
1612       filter = GNUNET_CONTAINER_bloomfilter_load (pfn, bf_size, 5);        /* approx. 3% false positives at max use */
1613       refresh_bf = GNUNET_YES;
1614     }
1615     GNUNET_free (pfn);
1616   }
1617   else
1618   {
1619     filter = GNUNET_CONTAINER_bloomfilter_init (NULL, bf_size, 5);      /* approx. 3% false positives at max use */
1620     refresh_bf = GNUNET_YES;
1621   }
1622   GNUNET_free_non_null (fn);
1623   if (filter == NULL)
1624   {
1625     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1626                 _("Failed to initialize bloomfilter.\n"));
1627     if (stats != NULL)
1628     {
1629       GNUNET_STATISTICS_destroy (stats, GNUNET_YES);
1630       stats = NULL;
1631     }
1632     return;
1633   }
1634   plugin = load_plugin ();
1635   if (NULL == plugin)
1636   {
1637     GNUNET_CONTAINER_bloomfilter_free (filter);
1638     filter = NULL;
1639     if (stats != NULL)
1640     {
1641       GNUNET_STATISTICS_destroy (stats, GNUNET_YES);
1642       stats = NULL;
1643     }
1644     return;
1645   }
1646   stat_get =
1647       GNUNET_STATISTICS_get (stats, "datastore", quota_stat_name,
1648                              GNUNET_TIME_UNIT_SECONDS, &process_stat_done,
1649                              &process_stat_in, plugin);
1650   GNUNET_SERVER_disconnect_notify (server, &cleanup_reservations, NULL);
1651   GNUNET_SERVER_add_handlers (server, handlers);
1652   if (GNUNET_YES == refresh_bf)
1653   {
1654     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1655                 _("Rebuilding bloomfilter.  Please be patient.\n"));
1656     if (NULL != plugin->api->get_keys)
1657       plugin->api->get_keys (plugin->api->cls, &add_key_to_bloomfilter, filter);  
1658     else
1659       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1660                   _("Plugin does not support get_keys function. Please fix!\n"));
1661
1662     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1663                 _("Bloomfilter construction complete.\n"));
1664   }
1665   expired_kill_task =
1666       GNUNET_SCHEDULER_add_with_priority (GNUNET_SCHEDULER_PRIORITY_IDLE,
1667                                           &delete_expired, NULL);
1668   GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL, &cleaning_task,
1669                                 NULL);
1670 }
1671
1672
1673 /**
1674  * The main function for the datastore service.
1675  *
1676  * @param argc number of arguments from the command line
1677  * @param argv command line arguments
1678  * @return 0 ok, 1 on error
1679  */
1680 int
1681 main (int argc, char *const *argv)
1682 {
1683   int ret;
1684
1685   ret =
1686       (GNUNET_OK ==
1687        GNUNET_SERVICE_run (argc, argv, "datastore", GNUNET_SERVICE_OPTION_NONE,
1688                            &run, NULL)) ? 0 : 1;
1689   return ret;
1690 }
1691
1692
1693 /* end of gnunet-service-datastore.c */