indentation
[oweals/gnunet.git] / src / datastore / gnunet-service-datastore.c
1 /*
2      This file is part of GNUnet
3      (C) 2004, 2005, 2006, 2007, 2009 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 2, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file datastore/gnunet-service-datastore.c
23  * @brief Management for the datastore for files stored on a GNUnet node
24  * @author Christian Grothoff
25  */
26
27 #include "platform.h"
28 #include "gnunet_util_lib.h"
29 #include "gnunet_protocols.h"
30 #include "gnunet_statistics_service.h"
31 #include "gnunet_datastore_plugin.h"
32 #include "datastore.h"
33
34 /**
35  * How many messages do we queue at most per client?
36  */
37 #define MAX_PENDING 1024
38
39 /**
40  * How long are we at most keeping "expired" content
41  * past the expiration date in the database?
42  */
43 #define MAX_EXPIRE_DELAY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 15)
44
45 /**
46  * How fast are we allowed to query the database for deleting
47  * expired content? (1 item per second).
48  */
49 #define MIN_EXPIRE_DELAY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 1)
50
51
52 #define QUOTA_STAT_NAME gettext_noop ("# bytes used in file-sharing datastore")
53
54 /**
55  * After how many payload-changing operations
56  * do we sync our statistics?
57  */
58 #define MAX_STAT_SYNC_LAG 50
59
60
61 /**
62  * Our datastore plugin.
63  */
64 struct DatastorePlugin
65 {
66
67   /**
68    * API of the transport as returned by the plugin's
69    * initialization function.
70    */
71   struct GNUNET_DATASTORE_PluginFunctions *api;
72
73   /**
74    * Short name for the plugin (i.e. "sqlite").
75    */
76   char *short_name;
77
78   /**
79    * Name of the library (i.e. "gnunet_plugin_datastore_sqlite").
80    */
81   char *lib_name;
82
83   /**
84    * Environment this transport service is using
85    * for this plugin.
86    */
87   struct GNUNET_DATASTORE_PluginEnvironment env;
88
89 };
90
91
92 /**
93  * Linked list of active reservations.
94  */
95 struct ReservationList
96 {
97
98   /**
99    * This is a linked list.
100    */
101   struct ReservationList *next;
102
103   /**
104    * Client that made the reservation.
105    */
106   struct GNUNET_SERVER_Client *client;
107
108   /**
109    * Number of bytes (still) reserved.
110    */
111   uint64_t amount;
112
113   /**
114    * Number of items (still) reserved.
115    */
116   uint64_t entries;
117
118   /**
119    * Reservation identifier.
120    */
121   int32_t rid;
122
123 };
124
125
126
127 /**
128  * Our datastore plugin (NULL if not available).
129  */
130 static struct DatastorePlugin *plugin;
131
132 /**
133  * Linked list of space reservations made by clients.
134  */
135 static struct ReservationList *reservations;
136
137 /**
138  * Bloomfilter to quickly tell if we don't have the content.
139  */
140 static struct GNUNET_CONTAINER_BloomFilter *filter;
141
142 /**
143  * How much space are we allowed to use?
144  */
145 static unsigned long long quota;
146
147 /**
148  * Should the database be dropped on exit?
149  */
150 static int do_drop;
151
152 /**
153  * How much space are we using for the cache?  (space available for
154  * insertions that will be instantly reclaimed by discarding less
155  * important content --- or possibly whatever we just inserted into
156  * the "cache").
157  */
158 static unsigned long long cache_size;
159
160 /**
161  * How much space have we currently reserved?
162  */
163 static unsigned long long reserved;
164
165 /**
166  * How much data are we currently storing
167  * in the database?
168  */
169 static unsigned long long payload;
170
171 /**
172  * Number of updates that were made to the
173  * payload value since we last synchronized
174  * it with the statistics service.
175  */
176 static unsigned int lastSync;
177
178 /**
179  * Did we get an answer from statistics?
180  */
181 static int stats_worked;
182
183 /**
184  * Identity of the task that is used to delete
185  * expired content.
186  */
187 static GNUNET_SCHEDULER_TaskIdentifier expired_kill_task;
188
189 /**
190  * Our configuration.
191  */
192 const struct GNUNET_CONFIGURATION_Handle *cfg;
193
194
195 /**
196  * Handle for reporting statistics.
197  */
198 static struct GNUNET_STATISTICS_Handle *stats;
199
200
201 /**
202  * Synchronize our utilization statistics with the 
203  * statistics service.
204  */
205 static void
206 sync_stats ()
207 {
208   GNUNET_STATISTICS_set (stats, QUOTA_STAT_NAME, payload, GNUNET_YES);
209   lastSync = 0;
210 }
211
212
213
214 /**
215  * Context for transmitting replies to clients.
216  */
217 struct TransmitCallbackContext
218 {
219
220   /**
221    * We keep these in a doubly-linked list (for cleanup).
222    */
223   struct TransmitCallbackContext *next;
224
225   /**
226    * We keep these in a doubly-linked list (for cleanup).
227    */
228   struct TransmitCallbackContext *prev;
229
230   /**
231    * The message that we're asked to transmit.
232    */
233   struct GNUNET_MessageHeader *msg;
234
235   /**
236    * Handle for the transmission request.
237    */
238   struct GNUNET_CONNECTION_TransmitHandle *th;
239
240   /**
241    * Client that we are transmitting to.
242    */
243   struct GNUNET_SERVER_Client *client;
244
245 };
246
247
248 /**
249  * Head of the doubly-linked list (for cleanup).
250  */
251 static struct TransmitCallbackContext *tcc_head;
252
253 /**
254  * Tail of the doubly-linked list (for cleanup).
255  */
256 static struct TransmitCallbackContext *tcc_tail;
257
258 /**
259  * Have we already cleaned up the TCCs and are hence no longer
260  * willing (or able) to transmit anything to anyone?
261  */
262 static int cleaning_done;
263
264 /**
265  * Handle for pending get request.
266  */
267 static struct GNUNET_STATISTICS_GetHandle *stat_get;
268
269
270 /**
271  * Task that is used to remove expired entries from
272  * the datastore.  This task will schedule itself
273  * again automatically to always delete all expired
274  * content quickly.
275  *
276  * @param cls not used
277  * @param tc task context
278  */
279 static void
280 delete_expired (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
281
282
283 /**
284  * Iterate over the expired items stored in the datastore.
285  * Delete all expired items; once we have processed all
286  * expired items, re-schedule the "delete_expired" task.
287  *
288  * @param cls not used
289  * @param key key for the content
290  * @param size number of bytes in data
291  * @param data content stored
292  * @param type type of the content
293  * @param priority priority of the content
294  * @param anonymity anonymity-level for the content
295  * @param expiration expiration time for the content
296  * @param uid unique identifier for the datum;
297  *        maybe 0 if no unique identifier is available
298  *
299  * @return GNUNET_SYSERR to abort the iteration, GNUNET_OK to continue
300  *         (continue on call to "next", of course),
301  *         GNUNET_NO to delete the item and continue (if supported)
302  */
303 static int
304 expired_processor (void *cls,
305                    const GNUNET_HashCode * key,
306                    uint32_t size,
307                    const void *data,
308                    enum GNUNET_BLOCK_Type type,
309                    uint32_t priority,
310                    uint32_t anonymity,
311                    struct GNUNET_TIME_Absolute expiration, uint64_t uid)
312 {
313   struct GNUNET_TIME_Absolute now;
314
315   if (key == NULL)
316   {
317     expired_kill_task
318         = GNUNET_SCHEDULER_add_delayed (MAX_EXPIRE_DELAY,
319                                         &delete_expired, NULL);
320     return GNUNET_SYSERR;
321   }
322   now = GNUNET_TIME_absolute_get ();
323   if (expiration.abs_value > now.abs_value)
324   {
325     /* finished processing */
326     expired_kill_task
327         = GNUNET_SCHEDULER_add_delayed (MAX_EXPIRE_DELAY,
328                                         &delete_expired, NULL);
329     return GNUNET_SYSERR;
330   }
331 #if DEBUG_DATASTORE
332   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
333               "Deleting content `%s' of type %u that expired %llu ms ago\n",
334               GNUNET_h2s (key),
335               type,
336               (unsigned long long) (now.abs_value - expiration.abs_value));
337 #endif
338   GNUNET_STATISTICS_update (stats,
339                             gettext_noop ("# bytes expired"), size, GNUNET_YES);
340   GNUNET_CONTAINER_bloomfilter_remove (filter, key);
341   expired_kill_task
342       = GNUNET_SCHEDULER_add_delayed (MIN_EXPIRE_DELAY, &delete_expired, NULL);
343   return GNUNET_NO;
344 }
345
346
347 /**
348  * Task that is used to remove expired entries from
349  * the datastore.  This task will schedule itself
350  * again automatically to always delete all expired
351  * content quickly.
352  *
353  * @param cls not used
354  * @param tc task context
355  */
356 static void
357 delete_expired (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
358 {
359   expired_kill_task = GNUNET_SCHEDULER_NO_TASK;
360   plugin->api->get_expiration (plugin->api->cls, &expired_processor, NULL);
361 }
362
363
364 /**
365  * An iterator over a set of items stored in the datastore
366  * that deletes until we're happy with respect to our quota.
367  *
368  * @param cls closure
369  * @param key key for the content
370  * @param size number of bytes in data
371  * @param data content stored
372  * @param type type of the content
373  * @param priority priority of the content
374  * @param anonymity anonymity-level for the content
375  * @param expiration expiration time for the content
376  * @param uid unique identifier for the datum;
377  *        maybe 0 if no unique identifier is available
378  *
379  * @return GNUNET_SYSERR to abort the iteration, GNUNET_OK to continue
380  *         (continue on call to "next", of course),
381  *         GNUNET_NO to delete the item and continue (if supported)
382  */
383 static int
384 quota_processor (void *cls,
385                  const GNUNET_HashCode * key,
386                  uint32_t size,
387                  const void *data,
388                  enum GNUNET_BLOCK_Type type,
389                  uint32_t priority,
390                  uint32_t anonymity,
391                  struct GNUNET_TIME_Absolute expiration, uint64_t uid)
392 {
393   unsigned long long *need = cls;
394
395   if (NULL == key)
396     return GNUNET_SYSERR;
397 #if DEBUG_DATASTORE
398   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
399               "Deleting %llu bytes of low-priority content `%s' of type %u (still trying to free another %llu bytes)\n",
400               (unsigned long long) (size + GNUNET_DATASTORE_ENTRY_OVERHEAD),
401               GNUNET_h2s (key), type, *need);
402 #endif
403   if (size + GNUNET_DATASTORE_ENTRY_OVERHEAD > *need)
404     *need = 0;
405   else
406     *need -= size + GNUNET_DATASTORE_ENTRY_OVERHEAD;
407   GNUNET_STATISTICS_update (stats,
408                             gettext_noop ("# bytes purged (low-priority)"),
409                             size, GNUNET_YES);
410   GNUNET_CONTAINER_bloomfilter_remove (filter, key);
411   return GNUNET_NO;
412 }
413
414
415 /**
416  * Manage available disk space by running tasks
417  * that will discard content if necessary.  This
418  * function will be run whenever a request for
419  * "need" bytes of storage could only be satisfied
420  * by eating into the "cache" (and we want our cache
421  * space back).
422  *
423  * @param need number of bytes of content that were
424  *        placed into the "cache" (and hence the
425  *        number of bytes that should be removed).
426  */
427 static void
428 manage_space (unsigned long long need)
429 {
430   unsigned long long last;
431
432 #if DEBUG_DATASTORE
433   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
434               "Asked to free up %llu bytes of cache space\n", need);
435 #endif
436   last = 0;
437   while ((need > 0) && (last != need))
438   {
439     last = need;
440     plugin->api->get_expiration (plugin->api->cls, &quota_processor, &need);
441   }
442 }
443
444
445 /**
446  * Function called to notify a client about the socket
447  * begin ready to queue more data.  "buf" will be
448  * NULL and "size" zero if the socket was closed for
449  * writing in the meantime.
450  *
451  * @param cls closure
452  * @param size number of bytes available in buf
453  * @param buf where the callee should write the message
454  * @return number of bytes written to buf
455  */
456 static size_t
457 transmit_callback (void *cls, size_t size, void *buf)
458 {
459   struct TransmitCallbackContext *tcc = cls;
460   size_t msize;
461
462   tcc->th = NULL;
463   GNUNET_CONTAINER_DLL_remove (tcc_head, tcc_tail, tcc);
464   msize = ntohs (tcc->msg->size);
465   if (size == 0)
466   {
467     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
468                 _("Transmission to client failed!\n"));
469     GNUNET_SERVER_receive_done (tcc->client, GNUNET_SYSERR);
470     GNUNET_SERVER_client_drop (tcc->client);
471     GNUNET_free (tcc->msg);
472     GNUNET_free (tcc);
473     return 0;
474   }
475   GNUNET_assert (size >= msize);
476   memcpy (buf, tcc->msg, msize);
477   GNUNET_SERVER_receive_done (tcc->client, GNUNET_OK);
478   GNUNET_SERVER_client_drop (tcc->client);
479   GNUNET_free (tcc->msg);
480   GNUNET_free (tcc);
481   return msize;
482 }
483
484
485 /**
486  * Transmit the given message to the client.
487  *
488  * @param client target of the message
489  * @param msg message to transmit, will be freed!
490  */
491 static void
492 transmit (struct GNUNET_SERVER_Client *client, struct GNUNET_MessageHeader *msg)
493 {
494   struct TransmitCallbackContext *tcc;
495
496   if (GNUNET_YES == cleaning_done)
497   {
498 #if DEBUG_DATASTORE
499     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
500                 "Shutdown in progress, aborting transmission.\n");
501 #endif
502     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
503     GNUNET_free (msg);
504     return;
505   }
506   tcc = GNUNET_malloc (sizeof (struct TransmitCallbackContext));
507   tcc->msg = msg;
508   tcc->client = client;
509   if (NULL ==
510       (tcc->th = GNUNET_SERVER_notify_transmit_ready (client,
511                                                       ntohs (msg->size),
512                                                       GNUNET_TIME_UNIT_FOREVER_REL,
513                                                       &transmit_callback, tcc)))
514   {
515     GNUNET_break (0);
516     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
517     GNUNET_free (msg);
518     GNUNET_free (tcc);
519     return;
520   }
521   GNUNET_SERVER_client_keep (client);
522   GNUNET_CONTAINER_DLL_insert (tcc_head, tcc_tail, tcc);
523 }
524
525
526 /**
527  * Transmit a status code to the client.
528  *
529  * @param client receiver of the response
530  * @param code status code
531  * @param msg optional error message (can be NULL)
532  */
533 static void
534 transmit_status (struct GNUNET_SERVER_Client *client, int code, const char *msg)
535 {
536   struct StatusMessage *sm;
537   size_t slen;
538
539 #if DEBUG_DATASTORE
540   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
541               "Transmitting `%s' message with value %d and message `%s'\n",
542               "STATUS", code, msg != NULL ? msg : "(none)");
543 #endif
544   slen = (msg == NULL) ? 0 : strlen (msg) + 1;
545   sm = GNUNET_malloc (sizeof (struct StatusMessage) + slen);
546   sm->header.size = htons (sizeof (struct StatusMessage) + slen);
547   sm->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_STATUS);
548   sm->status = htonl (code);
549   if (slen > 0)
550     memcpy (&sm[1], msg, slen);
551   transmit (client, &sm->header);
552 }
553
554
555
556 /**
557  * Function that will transmit the given datastore entry
558  * to the client.
559  *
560  * @param cls closure, pointer to the client (of type GNUNET_SERVER_Client).
561  * @param key key for the content
562  * @param size number of bytes in data
563  * @param data content stored
564  * @param type type of the content
565  * @param priority priority of the content
566  * @param anonymity anonymity-level for the content
567  * @param expiration expiration time for the content
568  * @param uid unique identifier for the datum;
569  *        maybe 0 if no unique identifier is available
570  *
571  * @return GNUNET_SYSERR to abort the iteration, GNUNET_OK to continue,
572  *         GNUNET_NO to delete the item and continue (if supported)
573  */
574 static int
575 transmit_item (void *cls,
576                const GNUNET_HashCode * key,
577                uint32_t size,
578                const void *data,
579                enum GNUNET_BLOCK_Type type,
580                uint32_t priority,
581                uint32_t anonymity,
582                struct GNUNET_TIME_Absolute expiration, uint64_t uid)
583 {
584   struct GNUNET_SERVER_Client *client = cls;
585   struct GNUNET_MessageHeader *end;
586   struct DataMessage *dm;
587
588   if (key == NULL)
589   {
590     /* transmit 'DATA_END' */
591 #if DEBUG_DATASTORE
592     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
593                 "Transmitting `%s' message\n", "DATA_END");
594 #endif
595     end = GNUNET_malloc (sizeof (struct GNUNET_MessageHeader));
596     end->size = htons (sizeof (struct GNUNET_MessageHeader));
597     end->type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END);
598     transmit (client, end);
599     GNUNET_SERVER_client_drop (client);
600     return GNUNET_OK;
601   }
602   GNUNET_assert (sizeof (struct DataMessage) + size <
603                  GNUNET_SERVER_MAX_MESSAGE_SIZE);
604   dm = GNUNET_malloc (sizeof (struct DataMessage) + size);
605   dm->header.size = htons (sizeof (struct DataMessage) + size);
606   dm->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_DATA);
607   dm->rid = htonl (0);
608   dm->size = htonl (size);
609   dm->type = htonl (type);
610   dm->priority = htonl (priority);
611   dm->anonymity = htonl (anonymity);
612   dm->replication = htonl (0);
613   dm->reserved = htonl (0);
614   dm->expiration = GNUNET_TIME_absolute_hton (expiration);
615   dm->uid = GNUNET_htonll (uid);
616   dm->key = *key;
617   memcpy (&dm[1], data, size);
618 #if DEBUG_DATASTORE
619   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
620               "Transmitting `%s' message for `%s' of type %u with expiration %llu (now: %llu)\n",
621               "DATA",
622               GNUNET_h2s (key),
623               type,
624               (unsigned long long) expiration.abs_value,
625               (unsigned long long) GNUNET_TIME_absolute_get ().abs_value);
626 #endif
627   GNUNET_STATISTICS_update (stats,
628                             gettext_noop ("# results found"), 1, GNUNET_NO);
629   transmit (client, &dm->header);
630   GNUNET_SERVER_client_drop (client);
631   return GNUNET_OK;
632 }
633
634
635 /**
636  * Handle RESERVE-message.
637  *
638  * @param cls closure
639  * @param client identification of the client
640  * @param message the actual message
641  */
642 static void
643 handle_reserve (void *cls,
644                 struct GNUNET_SERVER_Client *client,
645                 const struct GNUNET_MessageHeader *message)
646 {
647   /**
648    * Static counter to produce reservation identifiers.
649    */
650   static int reservation_gen;
651
652   const struct ReserveMessage *msg = (const struct ReserveMessage *) message;
653   struct ReservationList *e;
654   unsigned long long used;
655   unsigned long long req;
656   uint64_t amount;
657   uint32_t entries;
658
659 #if DEBUG_DATASTORE
660   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Processing `%s' request\n", "RESERVE");
661 #endif
662   amount = GNUNET_ntohll (msg->amount);
663   entries = ntohl (msg->entries);
664   used = payload + reserved;
665   req =
666       amount + ((unsigned long long) GNUNET_DATASTORE_ENTRY_OVERHEAD) * entries;
667   if (used + req > quota)
668   {
669     if (quota < used)
670       used = quota;             /* cheat a bit for error message (to avoid negative numbers) */
671     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
672                 _
673                 ("Insufficient space (%llu bytes are available) to satisfy `%s' request for %llu bytes\n"),
674                 quota - used, "RESERVE", req);
675     if (cache_size < req)
676     {
677       /* TODO: document this in the FAQ; essentially, if this
678        * message happens, the insertion request could be blocked
679        * by less-important content from migration because it is
680        * larger than 1/8th of the overall available space, and
681        * we only reserve 1/8th for "fresh" insertions */
682       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
683                   _
684                   ("The requested amount (%llu bytes) is larger than the cache size (%llu bytes)\n"),
685                   req, cache_size);
686       transmit_status (client, 0,
687                        gettext_noop
688                        ("Insufficient space to satisfy request and "
689                         "requested amount is larger than cache size"));
690     }
691     else
692     {
693       transmit_status (client, 0,
694                        gettext_noop ("Insufficient space to satisfy request"));
695     }
696     return;
697   }
698   reserved += req;
699   GNUNET_STATISTICS_set (stats,
700                          gettext_noop ("# reserved"), reserved, GNUNET_NO);
701   e = GNUNET_malloc (sizeof (struct ReservationList));
702   e->next = reservations;
703   reservations = e;
704   e->client = client;
705   e->amount = amount;
706   e->entries = entries;
707   e->rid = ++reservation_gen;
708   if (reservation_gen < 0)
709     reservation_gen = 0;        /* wrap around */
710   transmit_status (client, e->rid, NULL);
711 }
712
713
714 /**
715  * Handle RELEASE_RESERVE-message.
716  *
717  * @param cls closure
718  * @param client identification of the client
719  * @param message the actual message
720  */
721 static void
722 handle_release_reserve (void *cls,
723                         struct GNUNET_SERVER_Client *client,
724                         const struct GNUNET_MessageHeader *message)
725 {
726   const struct ReleaseReserveMessage *msg =
727       (const struct ReleaseReserveMessage *) message;
728   struct ReservationList *pos;
729   struct ReservationList *prev;
730   struct ReservationList *next;
731   int rid = ntohl (msg->rid);
732   unsigned long long rem;
733
734 #if DEBUG_DATASTORE
735   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
736               "Processing `%s' request\n", "RELEASE_RESERVE");
737 #endif
738   next = reservations;
739   prev = NULL;
740   while (NULL != (pos = next))
741   {
742     next = pos->next;
743     if (rid == pos->rid)
744     {
745       if (prev == NULL)
746         reservations = next;
747       else
748         prev->next = next;
749       rem =
750           pos->amount +
751           ((unsigned long long) GNUNET_DATASTORE_ENTRY_OVERHEAD) * pos->entries;
752       GNUNET_assert (reserved >= rem);
753       reserved -= rem;
754       GNUNET_STATISTICS_set (stats,
755                              gettext_noop ("# reserved"), reserved, GNUNET_NO);
756 #if DEBUG_DATASTORE
757       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
758                   "Returning %llu remaining reserved bytes to storage pool\n",
759                   rem);
760 #endif
761       GNUNET_free (pos);
762       transmit_status (client, GNUNET_OK, NULL);
763       return;
764     }
765     prev = pos;
766   }
767   GNUNET_break (0);
768   transmit_status (client, GNUNET_SYSERR,
769                    gettext_noop ("Could not find matching reservation"));
770 }
771
772
773 /**
774  * Check that the given message is a valid data message.
775  *
776  * @return NULL if the message is not well-formed, otherwise the message
777  */
778 static const struct DataMessage *
779 check_data (const struct GNUNET_MessageHeader *message)
780 {
781   uint16_t size;
782   uint32_t dsize;
783   const struct DataMessage *dm;
784
785   size = ntohs (message->size);
786   if (size < sizeof (struct DataMessage))
787   {
788     GNUNET_break (0);
789     return NULL;
790   }
791   dm = (const struct DataMessage *) message;
792   dsize = ntohl (dm->size);
793   if (size != dsize + sizeof (struct DataMessage))
794   {
795     GNUNET_break (0);
796     return NULL;
797   }
798   return dm;
799 }
800
801
802 /**
803  * Context for a PUT request used to see if the content is
804  * already present.
805  */
806 struct PutContext
807 {
808   /**
809    * Client to notify on completion.
810    */
811   struct GNUNET_SERVER_Client *client;
812
813 #if ! HAVE_UNALIGNED_64_ACCESS
814   void *reserved;
815 #endif
816
817   /* followed by the 'struct DataMessage' */
818 };
819
820
821 /**
822  * Actually put the data message.
823  */
824 static void
825 execute_put (struct GNUNET_SERVER_Client *client, const struct DataMessage *dm)
826 {
827   uint32_t size;
828   char *msg;
829   int ret;
830
831   size = ntohl (dm->size);
832   msg = NULL;
833   ret = plugin->api->put (plugin->api->cls,
834                           &dm->key,
835                           size,
836                           &dm[1],
837                           ntohl (dm->type),
838                           ntohl (dm->priority),
839                           ntohl (dm->anonymity),
840                           ntohl (dm->replication),
841                           GNUNET_TIME_absolute_ntoh (dm->expiration), &msg);
842   if (GNUNET_OK == ret)
843   {
844     GNUNET_STATISTICS_update (stats,
845                               gettext_noop ("# bytes stored"),
846                               size, GNUNET_YES);
847     GNUNET_CONTAINER_bloomfilter_add (filter, &dm->key);
848 #if DEBUG_DATASTORE
849     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
850                 "Successfully stored %u bytes of type %u under key `%s'\n",
851                 size, ntohl (dm->type), GNUNET_h2s (&dm->key));
852 #endif
853   }
854   transmit_status (client, ret, msg);
855   GNUNET_free_non_null (msg);
856   if (quota - reserved - cache_size < payload)
857   {
858     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
859                 _("Need %llu bytes more space (%llu allowed, using %llu)\n"),
860                 (unsigned long long) size + GNUNET_DATASTORE_ENTRY_OVERHEAD,
861                 (unsigned long long) (quota - reserved - cache_size),
862                 (unsigned long long) payload);
863     manage_space (size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
864   }
865 }
866
867
868 /**
869  * Function that will check if the given datastore entry
870  * matches the put and if none match executes the put.
871  *
872  * @param cls closure, pointer to the client (of type 'struct PutContext').
873  * @param key key for the content
874  * @param size number of bytes in data
875  * @param data content stored
876  * @param type type of the content
877  * @param priority priority of the content
878  * @param anonymity anonymity-level for the content
879  * @param expiration expiration time for the content
880  * @param uid unique identifier for the datum;
881  *        maybe 0 if no unique identifier is available
882  *
883  * @return GNUNET_OK usually
884  *         GNUNET_NO to delete the item 
885  */
886 static int
887 check_present (void *cls,
888                const GNUNET_HashCode * key,
889                uint32_t size,
890                const void *data,
891                enum GNUNET_BLOCK_Type type,
892                uint32_t priority,
893                uint32_t anonymity,
894                struct GNUNET_TIME_Absolute expiration, uint64_t uid)
895 {
896   struct PutContext *pc = cls;
897   const struct DataMessage *dm;
898
899   dm = (const struct DataMessage *) &pc[1];
900   if (key == NULL)
901   {
902     execute_put (pc->client, dm);
903     GNUNET_SERVER_client_drop (pc->client);
904     GNUNET_free (pc);
905     return GNUNET_OK;
906   }
907   if ((GNUNET_BLOCK_TYPE_FS_DBLOCK == type) ||
908       (GNUNET_BLOCK_TYPE_FS_IBLOCK == type) ||
909       ((size == ntohl (dm->size)) && (0 == memcmp (&dm[1], data, size))))
910   {
911 #if DEBUG_MYSQL
912     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
913                 "Result already present in datastore\n");
914 #endif
915     /* FIXME: change API to allow increasing 'replication' counter */
916     if ((ntohl (dm->priority) > 0) ||
917         (GNUNET_TIME_absolute_ntoh (dm->expiration).abs_value >
918          expiration.abs_value))
919       plugin->api->update (plugin->api->cls,
920                            uid,
921                            (int32_t) ntohl (dm->priority),
922                            GNUNET_TIME_absolute_ntoh (dm->expiration), NULL);
923     transmit_status (pc->client, GNUNET_NO, NULL);
924     GNUNET_SERVER_client_drop (pc->client);
925     GNUNET_free (pc);
926   }
927   else
928   {
929     execute_put (pc->client, dm);
930     GNUNET_SERVER_client_drop (pc->client);
931     GNUNET_free (pc);
932   }
933   return GNUNET_OK;
934 }
935
936
937 /**
938  * Handle PUT-message.
939  *
940  * @param cls closure
941  * @param client identification of the client
942  * @param message the actual message
943  */
944 static void
945 handle_put (void *cls,
946             struct GNUNET_SERVER_Client *client,
947             const struct GNUNET_MessageHeader *message)
948 {
949   const struct DataMessage *dm = check_data (message);
950   int rid;
951   struct ReservationList *pos;
952   struct PutContext *pc;
953   GNUNET_HashCode vhash;
954   uint32_t size;
955
956   if ((dm == NULL) || (ntohl (dm->type) == 0))
957   {
958     GNUNET_break (0);
959     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
960     return;
961   }
962 #if DEBUG_DATASTORE
963   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
964               "Processing `%s' request for `%s' of type %u\n",
965               "PUT", GNUNET_h2s (&dm->key), ntohl (dm->type));
966 #endif
967   rid = ntohl (dm->rid);
968   size = ntohl (dm->size);
969   if (rid > 0)
970   {
971     pos = reservations;
972     while ((NULL != pos) && (rid != pos->rid))
973       pos = pos->next;
974     GNUNET_break (pos != NULL);
975     if (NULL != pos)
976     {
977       GNUNET_break (pos->entries > 0);
978       GNUNET_break (pos->amount >= size);
979       pos->entries--;
980       pos->amount -= size;
981       reserved -= (size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
982       GNUNET_STATISTICS_set (stats,
983                              gettext_noop ("# reserved"), reserved, GNUNET_NO);
984     }
985   }
986   if (GNUNET_YES == GNUNET_CONTAINER_bloomfilter_test (filter, &dm->key))
987   {
988     GNUNET_CRYPTO_hash (&dm[1], size, &vhash);
989     pc = GNUNET_malloc (sizeof (struct PutContext) + size +
990                         sizeof (struct DataMessage));
991     pc->client = client;
992     GNUNET_SERVER_client_keep (client);
993     memcpy (&pc[1], dm, size + sizeof (struct DataMessage));
994     plugin->api->get_key (plugin->api->cls,
995                           0,
996                           &dm->key,
997                           &vhash, ntohl (dm->type), &check_present, pc);
998     return;
999   }
1000   execute_put (client, dm);
1001 }
1002
1003
1004 /**
1005  * Handle GET-message.
1006  *
1007  * @param cls closure
1008  * @param client identification of the client
1009  * @param message the actual message
1010  */
1011 static void
1012 handle_get (void *cls,
1013             struct GNUNET_SERVER_Client *client,
1014             const struct GNUNET_MessageHeader *message)
1015 {
1016   const struct GetMessage *msg;
1017   uint16_t size;
1018
1019   size = ntohs (message->size);
1020   if ((size != sizeof (struct GetMessage)) &&
1021       (size != sizeof (struct GetMessage) - sizeof (GNUNET_HashCode)))
1022   {
1023     GNUNET_break (0);
1024     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1025     return;
1026   }
1027   msg = (const struct GetMessage *) message;
1028 #if DEBUG_DATASTORE
1029   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1030               "Processing `%s' request for `%s' of type %u\n",
1031               "GET", GNUNET_h2s (&msg->key), ntohl (msg->type));
1032 #endif
1033   GNUNET_STATISTICS_update (stats,
1034                             gettext_noop ("# GET requests received"),
1035                             1, GNUNET_NO);
1036   GNUNET_SERVER_client_keep (client);
1037   if ((size == sizeof (struct GetMessage)) &&
1038       (GNUNET_YES != GNUNET_CONTAINER_bloomfilter_test (filter, &msg->key)))
1039   {
1040     /* don't bother database... */
1041 #if DEBUG_DATASTORE
1042     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1043                 "Empty result set for `%s' request for `%s' (bloomfilter).\n",
1044                 "GET", GNUNET_h2s (&msg->key));
1045 #endif
1046     GNUNET_STATISTICS_update (stats,
1047                               gettext_noop
1048                               ("# requests filtered by bloomfilter"), 1,
1049                               GNUNET_NO);
1050     transmit_item (client, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS,
1051                    0);
1052     return;
1053   }
1054   plugin->api->get_key (plugin->api->cls,
1055                         GNUNET_ntohll (msg->offset),
1056                         ((size ==
1057                           sizeof (struct GetMessage)) ? &msg->key : NULL), NULL,
1058                         ntohl (msg->type), &transmit_item, client);
1059 }
1060
1061
1062 /**
1063  * Handle UPDATE-message.
1064  *
1065  * @param cls closure
1066  * @param client identification of the client
1067  * @param message the actual message
1068  */
1069 static void
1070 handle_update (void *cls,
1071                struct GNUNET_SERVER_Client *client,
1072                const struct GNUNET_MessageHeader *message)
1073 {
1074   const struct UpdateMessage *msg;
1075   int ret;
1076   char *emsg;
1077
1078   GNUNET_STATISTICS_update (stats,
1079                             gettext_noop ("# UPDATE requests received"),
1080                             1, GNUNET_NO);
1081   msg = (const struct UpdateMessage *) message;
1082   emsg = NULL;
1083 #if DEBUG_DATASTORE
1084   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1085               "Processing `%s' request for %llu\n",
1086               "UPDATE", (unsigned long long) GNUNET_ntohll (msg->uid));
1087 #endif
1088   ret = plugin->api->update (plugin->api->cls,
1089                              GNUNET_ntohll (msg->uid),
1090                              (int32_t) ntohl (msg->priority),
1091                              GNUNET_TIME_absolute_ntoh (msg->expiration),
1092                              &emsg);
1093   transmit_status (client, ret, emsg);
1094   GNUNET_free_non_null (emsg);
1095 }
1096
1097
1098 /**
1099  * Handle GET_REPLICATION-message.
1100  *
1101  * @param cls closure
1102  * @param client identification of the client
1103  * @param message the actual message
1104  */
1105 static void
1106 handle_get_replication (void *cls,
1107                         struct GNUNET_SERVER_Client *client,
1108                         const struct GNUNET_MessageHeader *message)
1109 {
1110 #if DEBUG_DATASTORE
1111   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1112               "Processing `%s' request\n", "GET_REPLICATION");
1113 #endif
1114   GNUNET_STATISTICS_update (stats,
1115                             gettext_noop
1116                             ("# GET REPLICATION requests received"), 1,
1117                             GNUNET_NO);
1118   GNUNET_SERVER_client_keep (client);
1119   plugin->api->get_replication (plugin->api->cls, &transmit_item, client);
1120 }
1121
1122
1123 /**
1124  * Handle GET_ZERO_ANONYMITY-message.
1125  *
1126  * @param cls closure
1127  * @param client identification of the client
1128  * @param message the actual message
1129  */
1130 static void
1131 handle_get_zero_anonymity (void *cls,
1132                            struct GNUNET_SERVER_Client *client,
1133                            const struct GNUNET_MessageHeader *message)
1134 {
1135   const struct GetZeroAnonymityMessage *msg =
1136       (const struct GetZeroAnonymityMessage *) message;
1137   enum GNUNET_BLOCK_Type type;
1138
1139   type = (enum GNUNET_BLOCK_Type) ntohl (msg->type);
1140   if (type == GNUNET_BLOCK_TYPE_ANY)
1141   {
1142     GNUNET_break (0);
1143     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1144     return;
1145   }
1146 #if DEBUG_DATASTORE
1147   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1148               "Processing `%s' request\n", "GET_ZERO_ANONYMITY");
1149 #endif
1150   GNUNET_STATISTICS_update (stats,
1151                             gettext_noop
1152                             ("# GET ZERO ANONYMITY requests received"), 1,
1153                             GNUNET_NO);
1154   GNUNET_SERVER_client_keep (client);
1155   plugin->api->get_zero_anonymity (plugin->api->cls,
1156                                    GNUNET_ntohll (msg->offset),
1157                                    type, &transmit_item, client);
1158 }
1159
1160
1161 /**
1162  * Callback function that will cause the item that is passed
1163  * in to be deleted (by returning GNUNET_NO).
1164  */
1165 static int
1166 remove_callback (void *cls,
1167                  const GNUNET_HashCode * key,
1168                  uint32_t size,
1169                  const void *data,
1170                  enum GNUNET_BLOCK_Type type,
1171                  uint32_t priority,
1172                  uint32_t anonymity,
1173                  struct GNUNET_TIME_Absolute expiration, uint64_t uid)
1174 {
1175   struct GNUNET_SERVER_Client *client = cls;
1176
1177   if (key == NULL)
1178   {
1179 #if DEBUG_DATASTORE
1180     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1181                 "No further matches for `%s' request.\n", "REMOVE");
1182 #endif
1183     transmit_status (client, GNUNET_NO, _("Content not found"));
1184     GNUNET_SERVER_client_drop (client);
1185     return GNUNET_OK;           /* last item */
1186   }
1187 #if DEBUG_DATASTORE
1188   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1189               "Item %llu matches `%s' request for key `%s' and type %u.\n",
1190               (unsigned long long) uid, "REMOVE", GNUNET_h2s (key), type);
1191 #endif
1192   GNUNET_STATISTICS_update (stats,
1193                             gettext_noop ("# bytes removed (explicit request)"),
1194                             size, GNUNET_YES);
1195   GNUNET_CONTAINER_bloomfilter_remove (filter, key);
1196   transmit_status (client, GNUNET_OK, NULL);
1197   GNUNET_SERVER_client_drop (client);
1198   return GNUNET_NO;
1199 }
1200
1201
1202 /**
1203  * Handle REMOVE-message.
1204  *
1205  * @param cls closure
1206  * @param client identification of the client
1207  * @param message the actual message
1208  */
1209 static void
1210 handle_remove (void *cls,
1211                struct GNUNET_SERVER_Client *client,
1212                const struct GNUNET_MessageHeader *message)
1213 {
1214   const struct DataMessage *dm = check_data (message);
1215   GNUNET_HashCode vhash;
1216
1217   if (dm == NULL)
1218   {
1219     GNUNET_break (0);
1220     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1221     return;
1222   }
1223 #if DEBUG_DATASTORE
1224   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1225               "Processing `%s' request for `%s' of type %u\n",
1226               "REMOVE", GNUNET_h2s (&dm->key), ntohl (dm->type));
1227 #endif
1228   GNUNET_STATISTICS_update (stats,
1229                             gettext_noop ("# REMOVE requests received"),
1230                             1, GNUNET_NO);
1231   GNUNET_SERVER_client_keep (client);
1232   GNUNET_CRYPTO_hash (&dm[1], ntohl (dm->size), &vhash);
1233   plugin->api->get_key (plugin->api->cls,
1234                         0,
1235                         &dm->key,
1236                         &vhash,
1237                         (enum GNUNET_BLOCK_Type) ntohl (dm->type),
1238                         &remove_callback, client);
1239 }
1240
1241
1242 /**
1243  * Handle DROP-message.
1244  *
1245  * @param cls closure
1246  * @param client identification of the client
1247  * @param message the actual message
1248  */
1249 static void
1250 handle_drop (void *cls,
1251              struct GNUNET_SERVER_Client *client,
1252              const struct GNUNET_MessageHeader *message)
1253 {
1254 #if DEBUG_DATASTORE
1255   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Processing `%s' request\n", "DROP");
1256 #endif
1257   do_drop = GNUNET_YES;
1258   GNUNET_SERVER_receive_done (client, GNUNET_OK);
1259 }
1260
1261
1262 /**
1263  * Function called by plugins to notify us about a
1264  * change in their disk utilization.
1265  *
1266  * @param cls closure (NULL)
1267  * @param delta change in disk utilization, 
1268  *        0 for "reset to empty"
1269  */
1270 static void
1271 disk_utilization_change_cb (void *cls, int delta)
1272 {
1273   if ((delta < 0) && (payload < -delta))
1274   {
1275     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1276                 _
1277                 ("Datastore payload inaccurate (%lld < %lld).  Trying to fix.\n"),
1278                 (long long) payload, (long long) -delta);
1279     payload = plugin->api->estimate_size (plugin->api->cls);
1280     sync_stats ();
1281     return;
1282   }
1283   payload += delta;
1284   lastSync++;
1285   if (lastSync >= MAX_STAT_SYNC_LAG)
1286     sync_stats ();
1287 }
1288
1289
1290 /**
1291  * Callback function to process statistic values.
1292  *
1293  * @param cls closure (struct Plugin*)
1294  * @param subsystem name of subsystem that created the statistic
1295  * @param name the name of the datum
1296  * @param value the current value
1297  * @param is_persistent GNUNET_YES if the value is persistent, GNUNET_NO if not
1298  * @return GNUNET_OK to continue, GNUNET_SYSERR to abort iteration
1299  */
1300 static int
1301 process_stat_in (void *cls,
1302                  const char *subsystem,
1303                  const char *name, uint64_t value, int is_persistent)
1304 {
1305   GNUNET_assert (stats_worked == GNUNET_NO);
1306   stats_worked = GNUNET_YES;
1307   payload += value;
1308 #if DEBUG_SQLITE
1309   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1310               "Notification from statistics about existing payload (%llu), new payload is %llu\n",
1311               abs_value, payload);
1312 #endif
1313   return GNUNET_OK;
1314 }
1315
1316
1317 static void
1318 process_stat_done (void *cls, int success)
1319 {
1320   struct DatastorePlugin *plugin = cls;
1321
1322   stat_get = NULL;
1323   if (stats_worked == GNUNET_NO)
1324     payload = plugin->api->estimate_size (plugin->api->cls);
1325 }
1326
1327
1328 /**
1329  * Load the datastore plugin.
1330  */
1331 static struct DatastorePlugin *
1332 load_plugin ()
1333 {
1334   struct DatastorePlugin *ret;
1335   char *libname;
1336   char *name;
1337
1338   if (GNUNET_OK !=
1339       GNUNET_CONFIGURATION_get_value_string (cfg,
1340                                              "DATASTORE", "DATABASE", &name))
1341   {
1342     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1343                 _("No `%s' specified for `%s' in configuration!\n"),
1344                 "DATABASE", "DATASTORE");
1345     return NULL;
1346   }
1347   ret = GNUNET_malloc (sizeof (struct DatastorePlugin));
1348   ret->env.cfg = cfg;
1349   ret->env.duc = &disk_utilization_change_cb;
1350   ret->env.cls = NULL;
1351   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1352               _("Loading `%s' datastore plugin\n"), name);
1353   GNUNET_asprintf (&libname, "libgnunet_plugin_datastore_%s", name);
1354   ret->short_name = name;
1355   ret->lib_name = libname;
1356   ret->api = GNUNET_PLUGIN_load (libname, &ret->env);
1357   if (ret->api == NULL)
1358   {
1359     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1360                 _("Failed to load datastore plugin for `%s'\n"), name);
1361     GNUNET_free (ret->short_name);
1362     GNUNET_free (libname);
1363     GNUNET_free (ret);
1364     return NULL;
1365   }
1366   return ret;
1367 }
1368
1369
1370 /**
1371  * Function called when the service shuts
1372  * down.  Unloads our datastore plugin.
1373  *
1374  * @param plug plugin to unload
1375  */
1376 static void
1377 unload_plugin (struct DatastorePlugin *plug)
1378 {
1379 #if DEBUG_DATASTORE
1380   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1381               "Datastore service is unloading plugin...\n");
1382 #endif
1383   GNUNET_break (NULL == GNUNET_PLUGIN_unload (plug->lib_name, plug->api));
1384   GNUNET_free (plug->lib_name);
1385   GNUNET_free (plug->short_name);
1386   GNUNET_free (plug);
1387 }
1388
1389
1390 /**
1391  * Final task run after shutdown.  Unloads plugins and disconnects us from
1392  * statistics.
1393  */
1394 static void
1395 unload_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1396 {
1397   if (GNUNET_YES == do_drop)
1398     plugin->api->drop (plugin->api->cls);
1399   unload_plugin (plugin);
1400   plugin = NULL;
1401   if (filter != NULL)
1402   {
1403     GNUNET_CONTAINER_bloomfilter_free (filter);
1404     filter = NULL;
1405   }
1406   if (lastSync > 0)
1407     sync_stats ();
1408   if (stat_get != NULL)
1409   {
1410     GNUNET_STATISTICS_get_cancel (stat_get);
1411     stat_get = NULL;
1412   }
1413   if (stats != NULL)
1414   {
1415     GNUNET_STATISTICS_destroy (stats, GNUNET_YES);
1416     stats = NULL;
1417   }
1418 }
1419
1420
1421 /**
1422  * Last task run during shutdown.  Disconnects us from
1423  * the transport and core.
1424  */
1425 static void
1426 cleaning_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1427 {
1428   struct TransmitCallbackContext *tcc;
1429
1430   cleaning_done = GNUNET_YES;
1431   while (NULL != (tcc = tcc_head))
1432   {
1433     GNUNET_CONTAINER_DLL_remove (tcc_head, tcc_tail, tcc);
1434     if (tcc->th != NULL)
1435     {
1436       GNUNET_CONNECTION_notify_transmit_ready_cancel (tcc->th);
1437       GNUNET_SERVER_client_drop (tcc->client);
1438     }
1439     GNUNET_free (tcc->msg);
1440     GNUNET_free (tcc);
1441   }
1442   if (expired_kill_task != GNUNET_SCHEDULER_NO_TASK)
1443   {
1444     GNUNET_SCHEDULER_cancel (expired_kill_task);
1445     expired_kill_task = GNUNET_SCHEDULER_NO_TASK;
1446   }
1447   GNUNET_SCHEDULER_add_continuation (&unload_task,
1448                                      NULL, GNUNET_SCHEDULER_REASON_PREREQ_DONE);
1449 }
1450
1451
1452 /**
1453  * Function that removes all active reservations made
1454  * by the given client and releases the space for other
1455  * requests.
1456  *
1457  * @param cls closure
1458  * @param client identification of the client
1459  */
1460 static void
1461 cleanup_reservations (void *cls, struct GNUNET_SERVER_Client *client)
1462 {
1463   struct ReservationList *pos;
1464   struct ReservationList *prev;
1465   struct ReservationList *next;
1466
1467   if (client == NULL)
1468     return;
1469   prev = NULL;
1470   pos = reservations;
1471   while (NULL != pos)
1472   {
1473     next = pos->next;
1474     if (pos->client == client)
1475     {
1476       if (prev == NULL)
1477         reservations = next;
1478       else
1479         prev->next = next;
1480       reserved -= pos->amount + pos->entries * GNUNET_DATASTORE_ENTRY_OVERHEAD;
1481       GNUNET_free (pos);
1482     }
1483     else
1484     {
1485       prev = pos;
1486     }
1487     pos = next;
1488   }
1489   GNUNET_STATISTICS_set (stats,
1490                          gettext_noop ("# reserved"), reserved, GNUNET_NO);
1491 }
1492
1493
1494 /**
1495  * Process datastore requests.
1496  *
1497  * @param cls closure
1498  * @param server the initialized server
1499  * @param c configuration to use
1500  */
1501 static void
1502 run (void *cls,
1503      struct GNUNET_SERVER_Handle *server,
1504      const struct GNUNET_CONFIGURATION_Handle *c)
1505 {
1506   static const struct GNUNET_SERVER_MessageHandler handlers[] = {
1507     {&handle_reserve, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_RESERVE,
1508      sizeof (struct ReserveMessage)},
1509     {&handle_release_reserve, NULL,
1510      GNUNET_MESSAGE_TYPE_DATASTORE_RELEASE_RESERVE,
1511      sizeof (struct ReleaseReserveMessage)},
1512     {&handle_put, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_PUT, 0},
1513     {&handle_update, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_UPDATE,
1514      sizeof (struct UpdateMessage)},
1515     {&handle_get, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_GET, 0},
1516     {&handle_get_replication, NULL,
1517      GNUNET_MESSAGE_TYPE_DATASTORE_GET_REPLICATION,
1518      sizeof (struct GNUNET_MessageHeader)},
1519     {&handle_get_zero_anonymity, NULL,
1520      GNUNET_MESSAGE_TYPE_DATASTORE_GET_ZERO_ANONYMITY,
1521      sizeof (struct GetZeroAnonymityMessage)},
1522     {&handle_remove, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE, 0},
1523     {&handle_drop, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_DROP,
1524      sizeof (struct GNUNET_MessageHeader)},
1525     {NULL, NULL, 0, 0}
1526   };
1527   char *fn;
1528   unsigned int bf_size;
1529
1530   cfg = c;
1531   if (GNUNET_OK !=
1532       GNUNET_CONFIGURATION_get_value_number (cfg, "DATASTORE", "QUOTA", &quota))
1533   {
1534     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1535                 _("No `%s' specified for `%s' in configuration!\n"),
1536                 "QUOTA", "DATASTORE");
1537     return;
1538   }
1539   stats = GNUNET_STATISTICS_create ("datastore", cfg);
1540   GNUNET_STATISTICS_set (stats, gettext_noop ("# quota"), quota, GNUNET_NO);
1541   cache_size = quota / 8;       /* Or should we make this an option? */
1542   GNUNET_STATISTICS_set (stats,
1543                          gettext_noop ("# cache size"), cache_size, GNUNET_NO);
1544   bf_size = quota / 32;         /* 8 bit per entry, 1 bit per 32 kb in DB */
1545   fn = NULL;
1546   if ((GNUNET_OK !=
1547        GNUNET_CONFIGURATION_get_value_filename (cfg,
1548                                                 "DATASTORE",
1549                                                 "BLOOMFILTER",
1550                                                 &fn)) ||
1551       (GNUNET_OK != GNUNET_DISK_directory_create_for_file (fn)))
1552   {
1553     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1554                 _("Could not use specified filename `%s' for bloomfilter.\n"),
1555                 fn != NULL ? fn : "");
1556     GNUNET_free_non_null (fn);
1557     fn = NULL;
1558   }
1559   if (fn != NULL)
1560     filter = GNUNET_CONTAINER_bloomfilter_load (fn, bf_size, 5);        /* approx. 3% false positives at max use */
1561   else
1562     filter = GNUNET_CONTAINER_bloomfilter_init (NULL, bf_size, 5);      /* approx. 3% false positives at max use */
1563   GNUNET_free_non_null (fn);
1564   if (filter == NULL)
1565   {
1566     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1567                 _("Failed to initialize bloomfilter.\n"));
1568     if (stats != NULL)
1569     {
1570       GNUNET_STATISTICS_destroy (stats, GNUNET_YES);
1571       stats = NULL;
1572     }
1573     return;
1574   }
1575   plugin = load_plugin ();
1576   if (NULL == plugin)
1577   {
1578     GNUNET_CONTAINER_bloomfilter_free (filter);
1579     filter = NULL;
1580     if (stats != NULL)
1581     {
1582       GNUNET_STATISTICS_destroy (stats, GNUNET_YES);
1583       stats = NULL;
1584     }
1585     return;
1586   }
1587   stat_get = GNUNET_STATISTICS_get (stats,
1588                                     "datastore",
1589                                     QUOTA_STAT_NAME,
1590                                     GNUNET_TIME_UNIT_SECONDS,
1591                                     &process_stat_done,
1592                                     &process_stat_in, plugin);
1593   GNUNET_SERVER_disconnect_notify (server, &cleanup_reservations, NULL);
1594   GNUNET_SERVER_add_handlers (server, handlers);
1595   expired_kill_task
1596       = GNUNET_SCHEDULER_add_with_priority (GNUNET_SCHEDULER_PRIORITY_IDLE,
1597                                             &delete_expired, NULL);
1598   GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL,
1599                                 &cleaning_task, NULL);
1600 }
1601
1602
1603 /**
1604  * The main function for the datastore service.
1605  *
1606  * @param argc number of arguments from the command line
1607  * @param argv command line arguments
1608  * @return 0 ok, 1 on error
1609  */
1610 int
1611 main (int argc, char *const *argv)
1612 {
1613   int ret;
1614
1615   ret = (GNUNET_OK ==
1616          GNUNET_SERVICE_run (argc,
1617                              argv,
1618                              "datastore",
1619                              GNUNET_SERVICE_OPTION_NONE, &run, NULL)) ? 0 : 1;
1620   return ret;
1621 }
1622
1623
1624 /* end of gnunet-service-datastore.c */