09df51d9b3ca1a45aa82637e635d26227df0ed0a
[oweals/gnunet.git] / src / datastore / gnunet-service-datastore.c
1 /*
2      This file is part of GNUnet
3      (C) 2004, 2005, 2006, 2007, 2009 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 2, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file datastore/gnunet-service-datastore.c
23  * @brief Management for the datastore for files stored on a GNUnet node
24  * @author Christian Grothoff
25  */
26
27 #include "platform.h"
28 #include "gnunet_util_lib.h"
29 #include "gnunet_protocols.h"
30 #include "gnunet_statistics_service.h"
31 #include "plugin_datastore.h"
32 #include "datastore.h"
33
34 /**
35  * How many messages do we queue at most per client?
36  */
37 #define MAX_PENDING 1024
38
39 /**
40  * How long are we at most keeping "expired" content
41  * past the expiration date in the database?
42  */
43 #define MAX_EXPIRE_DELAY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 15)
44
45 #define QUOTA_STAT_NAME gettext_noop ("# bytes used in file-sharing datastore")
46
47 /**
48  * After how many payload-changing operations
49  * do we sync our statistics?
50  */
51 #define MAX_STAT_SYNC_LAG 50
52
53
54 /**
55  * Our datastore plugin.
56  */
57 struct DatastorePlugin
58 {
59
60   /**
61    * API of the transport as returned by the plugin's
62    * initialization function.
63    */
64   struct GNUNET_DATASTORE_PluginFunctions *api;
65
66   /**
67    * Short name for the plugin (i.e. "sqlite").
68    */
69   char *short_name;
70
71   /**
72    * Name of the library (i.e. "gnunet_plugin_datastore_sqlite").
73    */
74   char *lib_name;
75
76   /**
77    * Environment this transport service is using
78    * for this plugin.
79    */
80   struct GNUNET_DATASTORE_PluginEnvironment env;
81
82 };
83
84
85 /**
86  * Linked list of active reservations.
87  */
88 struct ReservationList 
89 {
90
91   /**
92    * This is a linked list.
93    */
94   struct ReservationList *next;
95
96   /**
97    * Client that made the reservation.
98    */
99   struct GNUNET_SERVER_Client *client;
100
101   /**
102    * Number of bytes (still) reserved.
103    */
104   uint64_t amount;
105
106   /**
107    * Number of items (still) reserved.
108    */
109   uint64_t entries;
110
111   /**
112    * Reservation identifier.
113    */
114   int32_t rid;
115
116 };
117
118
119
120 /**
121  * Our datastore plugin (NULL if not available).
122  */
123 static struct DatastorePlugin *plugin;
124
125 /**
126  * Linked list of space reservations made by clients.
127  */
128 static struct ReservationList *reservations;
129
130 /**
131  * Bloomfilter to quickly tell if we don't have the content.
132  */
133 static struct GNUNET_CONTAINER_BloomFilter *filter;
134
135 /**
136  * How much space are we allowed to use?
137  */
138 static unsigned long long quota;
139
140 /**
141  * How much space are we using for the cache?  (space available for
142  * insertions that will be instantly reclaimed by discarding less
143  * important content --- or possibly whatever we just inserted into
144  * the "cache").
145  */
146 static unsigned long long cache_size;
147
148 /**
149  * How much space have we currently reserved?
150  */
151 static unsigned long long reserved;
152   
153 /**
154  * How much data are we currently storing
155  * in the database?
156  */
157 static unsigned long long payload;
158
159 /**
160  * Number of updates that were made to the
161  * payload value since we last synchronized
162  * it with the statistics service.
163  */
164 static unsigned int lastSync;
165
166 /**
167  * Did we get an answer from statistics?
168  */
169 static int stats_worked;
170
171 /**
172  * Identity of the task that is used to delete
173  * expired content.
174  */
175 static GNUNET_SCHEDULER_TaskIdentifier expired_kill_task;
176
177 /**
178  * Our configuration.
179  */
180 const struct GNUNET_CONFIGURATION_Handle *cfg;
181
182
183 /**
184  * Handle for reporting statistics.
185  */
186 static struct GNUNET_STATISTICS_Handle *stats;
187
188
189 /**
190  * Synchronize our utilization statistics with the 
191  * statistics service.
192  */
193 static void 
194 sync_stats ()
195 {
196   GNUNET_STATISTICS_set (stats,
197                          QUOTA_STAT_NAME,
198                          payload,
199                          GNUNET_YES);
200   lastSync = 0;
201 }
202
203
204
205
206 /**
207  * Function called once the transmit operation has
208  * either failed or succeeded.
209  *
210  * @param cls closure
211  * @param status GNUNET_OK on success, GNUNET_SYSERR on error
212  */
213 typedef void (*TransmitContinuation)(void *cls,
214                                      int status);
215
216
217 /**
218  * Context for transmitting replies to clients.
219  */
220 struct TransmitCallbackContext 
221 {
222   
223   /**
224    * We keep these in a doubly-linked list (for cleanup).
225    */
226   struct TransmitCallbackContext *next;
227   
228   /**
229    * We keep these in a doubly-linked list (for cleanup).
230    */
231   struct TransmitCallbackContext *prev;
232   
233   /**
234    * The message that we're asked to transmit.
235    */
236   struct GNUNET_MessageHeader *msg;
237   
238   /**
239    * Handle for the transmission request.
240    */
241   struct GNUNET_CONNECTION_TransmitHandle *th;
242
243   /**
244    * Client that we are transmitting to.
245    */
246   struct GNUNET_SERVER_Client *client;
247
248   /**
249    * Function to call once msg has been transmitted
250    * (or at least added to the buffer).
251    */
252   TransmitContinuation tc;
253
254   /**
255    * Closure for tc.
256    */
257   void *tc_cls;
258
259   /**
260    * GNUNET_YES if we are supposed to signal the server
261    * completion of the client's request.
262    */
263   int end;
264 };
265
266   
267 /**
268  * Head of the doubly-linked list (for cleanup).
269  */
270 static struct TransmitCallbackContext *tcc_head;
271
272 /**
273  * Tail of the doubly-linked list (for cleanup).
274  */
275 static struct TransmitCallbackContext *tcc_tail;
276
277 /**
278  * Have we already cleaned up the TCCs and are hence no longer
279  * willing (or able) to transmit anything to anyone?
280  */
281 static int cleaning_done;
282
283 /**
284  * Handle for pending get request.
285  */
286 static struct GNUNET_STATISTICS_GetHandle *stat_get;
287
288
289 /**
290  * Task that is used to remove expired entries from
291  * the datastore.  This task will schedule itself
292  * again automatically to always delete all expired
293  * content quickly.
294  *
295  * @param cls not used
296  * @param tc task context
297  */ 
298 static void
299 delete_expired (void *cls,
300                 const struct GNUNET_SCHEDULER_TaskContext *tc);
301
302
303 /**
304  * Iterate over the expired items stored in the datastore.
305  * Delete all expired items; once we have processed all
306  * expired items, re-schedule the "delete_expired" task.
307  *
308  * @param cls not used
309  * @param next_cls closure to pass to the "next" function.
310  * @param key key for the content
311  * @param size number of bytes in data
312  * @param data content stored
313  * @param type type of the content
314  * @param priority priority of the content
315  * @param anonymity anonymity-level for the content
316  * @param expiration expiration time for the content
317  * @param uid unique identifier for the datum;
318  *        maybe 0 if no unique identifier is available
319  *
320  * @return GNUNET_SYSERR to abort the iteration, GNUNET_OK to continue
321  *         (continue on call to "next", of course),
322  *         GNUNET_NO to delete the item and continue (if supported)
323  */
324 static int 
325 expired_processor (void *cls,
326                    void *next_cls,
327                    const GNUNET_HashCode * key,
328                    uint32_t size,
329                    const void *data,
330                    enum GNUNET_BLOCK_Type type,
331                    uint32_t priority,
332                    uint32_t anonymity,
333                    struct GNUNET_TIME_Absolute
334                    expiration, 
335                    uint64_t uid)
336 {
337   struct GNUNET_TIME_Absolute now;
338
339   if (key == NULL) 
340     {
341       expired_kill_task 
342         = GNUNET_SCHEDULER_add_delayed (MAX_EXPIRE_DELAY,
343                                         &delete_expired,
344                                         NULL);
345       return GNUNET_SYSERR;
346     }
347   now = GNUNET_TIME_absolute_get ();
348   if (expiration.abs_value > now.abs_value)
349     {
350       /* finished processing */
351       plugin->api->next_request (next_cls, GNUNET_YES);
352       return GNUNET_SYSERR;
353     }
354   plugin->api->next_request (next_cls, GNUNET_NO);
355 #if DEBUG_DATASTORE
356   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
357               "Deleting content `%s' of type %u that expired %llu ms ago\n",
358               GNUNET_h2s (key),
359               type,
360               (unsigned long long) (now.abs_value - expiration.abs_value));
361 #endif
362   GNUNET_STATISTICS_update (stats,
363                             gettext_noop ("# bytes expired"),
364                             size,
365                             GNUNET_YES);
366   GNUNET_CONTAINER_bloomfilter_remove (filter,
367                                        key);
368   return GNUNET_NO; /* delete */
369 }
370
371
372 /**
373  * Task that is used to remove expired entries from
374  * the datastore.  This task will schedule itself
375  * again automatically to always delete all expired
376  * content quickly.
377  *
378  * @param cls not used
379  * @param tc task context
380  */ 
381 static void
382 delete_expired (void *cls,
383                 const struct GNUNET_SCHEDULER_TaskContext *tc)
384 {
385   expired_kill_task = GNUNET_SCHEDULER_NO_TASK;
386   plugin->api->iter_ascending_expiration (plugin->api->cls, 
387                                           0,
388                                           &expired_processor,
389                                           NULL);
390 }
391
392
393 /**
394  * An iterator over a set of items stored in the datastore.
395  *
396  * @param cls closure
397  * @param next_cls closure to pass to the "next" function.
398  * @param key key for the content
399  * @param size number of bytes in data
400  * @param data content stored
401  * @param type type of the content
402  * @param priority priority of the content
403  * @param anonymity anonymity-level for the content
404  * @param expiration expiration time for the content
405  * @param uid unique identifier for the datum;
406  *        maybe 0 if no unique identifier is available
407  *
408  * @return GNUNET_SYSERR to abort the iteration, GNUNET_OK to continue
409  *         (continue on call to "next", of course),
410  *         GNUNET_NO to delete the item and continue (if supported)
411  */
412 static int 
413 manage (void *cls,
414         void *next_cls,
415         const GNUNET_HashCode * key,
416         uint32_t size,
417         const void *data,
418         enum GNUNET_BLOCK_Type type,
419         uint32_t priority,
420         uint32_t anonymity,
421         struct GNUNET_TIME_Absolute
422         expiration, 
423         uint64_t uid)
424 {
425   unsigned long long *need = cls;
426
427   if (NULL == key)
428     {
429       GNUNET_free (need);
430       return GNUNET_SYSERR;
431     }
432   if (size + GNUNET_DATASTORE_ENTRY_OVERHEAD > *need)
433     *need = 0;
434   else
435     *need -= size + GNUNET_DATASTORE_ENTRY_OVERHEAD;
436   plugin->api->next_request (next_cls, 
437                              (0 == *need) ? GNUNET_YES : GNUNET_NO);
438 #if DEBUG_DATASTORE
439   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
440               "Deleting %llu bytes of low-priority content `%s' of type %u (still trying to free another %llu bytes)\n",
441               (unsigned long long) (size + GNUNET_DATASTORE_ENTRY_OVERHEAD),
442               GNUNET_h2s (key),
443               type,
444               *need);
445 #endif
446   GNUNET_STATISTICS_update (stats,
447                             gettext_noop ("# bytes purged (low-priority)"),
448                             size,
449                             GNUNET_YES);
450   GNUNET_CONTAINER_bloomfilter_remove (filter,
451                                        key);
452   return GNUNET_NO;
453 }
454
455
456 /**
457  * Manage available disk space by running tasks
458  * that will discard content if necessary.  This
459  * function will be run whenever a request for
460  * "need" bytes of storage could only be satisfied
461  * by eating into the "cache" (and we want our cache
462  * space back).
463  *
464  * @param need number of bytes of content that were
465  *        placed into the "cache" (and hence the
466  *        number of bytes that should be removed).
467  */
468 static void
469 manage_space (unsigned long long need)
470 {
471   unsigned long long *n;
472
473 #if DEBUG_DATASTORE
474   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
475               "Asked to free up %llu bytes of cache space\n",
476               need);
477 #endif
478   n = GNUNET_malloc (sizeof(unsigned long long));
479   *n = need;
480   plugin->api->iter_low_priority (plugin->api->cls,
481                                   0,
482                                   &manage,
483                                   n);
484 }
485
486
487 /**
488  * Function called to notify a client about the socket
489  * begin ready to queue more data.  "buf" will be
490  * NULL and "size" zero if the socket was closed for
491  * writing in the meantime.
492  *
493  * @param cls closure
494  * @param size number of bytes available in buf
495  * @param buf where the callee should write the message
496  * @return number of bytes written to buf
497  */
498 static size_t
499 transmit_callback (void *cls,
500                    size_t size, void *buf)
501 {
502   struct TransmitCallbackContext *tcc = cls;
503   size_t msize;
504   
505   tcc->th = NULL;
506   GNUNET_CONTAINER_DLL_remove (tcc_head,
507                                tcc_tail,
508                                tcc);
509   msize = ntohs(tcc->msg->size);
510   if (size == 0)
511     {
512       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
513                   _("Transmission to client failed!\n"));
514       if (tcc->tc != NULL)
515         tcc->tc (tcc->tc_cls, GNUNET_SYSERR);
516       if (GNUNET_YES == tcc->end)
517         {
518           GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
519                       _("Disconnecting client due to transmission failure!\n"));
520           GNUNET_SERVER_receive_done (tcc->client, GNUNET_SYSERR);       
521         }
522       GNUNET_SERVER_client_drop (tcc->client);
523       GNUNET_free (tcc->msg);
524       GNUNET_free (tcc);
525       return 0;
526     }
527   GNUNET_assert (size >= msize);
528   memcpy (buf, tcc->msg, msize);
529   if (tcc->tc != NULL)
530     tcc->tc (tcc->tc_cls, GNUNET_OK);
531   if (GNUNET_YES == tcc->end)
532     {
533       GNUNET_SERVER_receive_done (tcc->client, GNUNET_OK);
534     }
535   else
536     {
537 #if DEBUG_DATASTORE
538       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
539                   "Response transmitted, more pending!\n");
540 #endif
541     }
542   GNUNET_SERVER_client_drop (tcc->client);
543   GNUNET_free (tcc->msg);
544   GNUNET_free (tcc);
545   return msize;
546 }
547
548
549 /**
550  * Transmit the given message to the client.
551  *
552  * @param client target of the message
553  * @param msg message to transmit, will be freed!
554  * @param tc function to call afterwards
555  * @param tc_cls closure for tc
556  * @param end is this the last response (and we should
557  *        signal the server completion accodingly after
558  *        transmitting this message)?
559  */
560 static void
561 transmit (struct GNUNET_SERVER_Client *client,
562           struct GNUNET_MessageHeader *msg,
563           TransmitContinuation tc,
564           void *tc_cls,
565           int end)
566 {
567   struct TransmitCallbackContext *tcc;
568
569   if (GNUNET_YES == cleaning_done)
570     {
571 #if DEBUG_DATASTORE
572       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
573                   "Shutdown in progress, aborting transmission.\n");
574 #endif
575       GNUNET_free (msg);
576       if (NULL != tc)
577         tc (tc_cls, GNUNET_SYSERR);
578       return;
579     }
580   tcc = GNUNET_malloc (sizeof(struct TransmitCallbackContext));
581   tcc->msg = msg;
582   tcc->client = client;
583   tcc->tc = tc;
584   tcc->tc_cls = tc_cls;
585   tcc->end = end;
586   if (NULL ==
587       (tcc->th = GNUNET_SERVER_notify_transmit_ready (client,
588                                                       ntohs(msg->size),
589                                                       GNUNET_TIME_UNIT_FOREVER_REL,
590                                                       &transmit_callback,
591                                                       tcc)))
592     {
593       GNUNET_break (0);
594       if (GNUNET_YES == end)
595         {
596           GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
597                       _("Forcefully disconnecting client.\n"));
598           GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
599         }
600       if (NULL != tc)
601         tc (tc_cls, GNUNET_SYSERR);
602       GNUNET_free (msg);
603       GNUNET_free (tcc);
604       return;
605     }
606   GNUNET_SERVER_client_keep (client);
607   GNUNET_CONTAINER_DLL_insert (tcc_head,
608                                tcc_tail,
609                                tcc);
610 }
611
612
613 /**
614  * Transmit a status code to the client.
615  *
616  * @param client receiver of the response
617  * @param code status code
618  * @param msg optional error message (can be NULL)
619  */
620 static void
621 transmit_status (struct GNUNET_SERVER_Client *client,
622                  int code,
623                  const char *msg)
624 {
625   struct StatusMessage *sm;
626   size_t slen;
627
628 #if DEBUG_DATASTORE
629   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
630               "Transmitting `%s' message with value %d and message `%s'\n",
631               "STATUS",
632               code,
633               msg != NULL ? msg : "(none)");
634 #endif
635   slen = (msg == NULL) ? 0 : strlen(msg) + 1;  
636   sm = GNUNET_malloc (sizeof(struct StatusMessage) + slen);
637   sm->header.size = htons(sizeof(struct StatusMessage) + slen);
638   sm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_STATUS);
639   sm->status = htonl(code);
640   if (slen > 0)
641     memcpy (&sm[1], msg, slen);  
642   transmit (client, &sm->header, NULL, NULL, GNUNET_YES);
643 }
644
645
646 /**
647  * Function called once the transmit operation has
648  * either failed or succeeded.
649  *
650  * @param next_cls closure for calling "next_request" callback
651  * @param status GNUNET_OK on success, GNUNET_SYSERR on error
652  */
653 static void 
654 get_next(void *next_cls,
655          int status)
656 {
657   if (status != GNUNET_OK)
658     {
659       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
660                   _("Failed to transmit an item to the client; aborting iteration.\n"));
661       if (plugin != NULL)
662         plugin->api->next_request (next_cls, GNUNET_YES);
663       return;
664     }
665   plugin->api->next_request (next_cls, GNUNET_NO);
666 }
667
668
669 /**
670  * Function that will transmit the given datastore entry
671  * to the client.
672  *
673  * @param cls closure, pointer to the client (of type GNUNET_SERVER_Client).
674  * @param next_cls closure to use to ask for the next item
675  * @param key key for the content
676  * @param size number of bytes in data
677  * @param data content stored
678  * @param type type of the content
679  * @param priority priority of the content
680  * @param anonymity anonymity-level for the content
681  * @param expiration expiration time for the content
682  * @param uid unique identifier for the datum;
683  *        maybe 0 if no unique identifier is available
684  *
685  * @return GNUNET_SYSERR to abort the iteration, GNUNET_OK to continue,
686  *         GNUNET_NO to delete the item and continue (if supported)
687  */
688 static int
689 transmit_item (void *cls,
690                void *next_cls,
691                const GNUNET_HashCode * key,
692                uint32_t size,
693                const void *data,
694                enum GNUNET_BLOCK_Type type,
695                uint32_t priority,
696                uint32_t anonymity,
697                struct GNUNET_TIME_Absolute
698                expiration, uint64_t uid)
699 {
700   struct GNUNET_SERVER_Client *client = cls;
701   struct GNUNET_MessageHeader *end;
702   struct DataMessage *dm;
703
704   if (key == NULL)
705     {
706       /* transmit 'DATA_END' */
707 #if DEBUG_DATASTORE
708       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
709                   "Transmitting `%s' message\n",
710                   "DATA_END");
711 #endif
712       end = GNUNET_malloc (sizeof(struct GNUNET_MessageHeader));
713       end->size = htons(sizeof(struct GNUNET_MessageHeader));
714       end->type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END);
715       transmit (client, end, NULL, NULL, GNUNET_YES);
716       GNUNET_SERVER_client_drop (client);
717       return GNUNET_OK;
718     }
719   dm = GNUNET_malloc (sizeof(struct DataMessage) + size);
720   dm->header.size = htons(sizeof(struct DataMessage) + size);
721   dm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_DATA);
722   dm->rid = htonl(0);
723   dm->size = htonl(size);
724   dm->type = htonl(type);
725   dm->priority = htonl(priority);
726   dm->anonymity = htonl(anonymity);
727   dm->expiration = GNUNET_TIME_absolute_hton(expiration);
728   dm->uid = GNUNET_htonll(uid);
729   dm->key = *key;
730   memcpy (&dm[1], data, size);
731 #if DEBUG_DATASTORE
732   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
733               "Transmitting `%s' message for `%s' of type %u\n",
734               "DATA",
735               GNUNET_h2s (key),
736               type);
737 #endif
738   GNUNET_STATISTICS_update (stats,
739                             gettext_noop ("# results found"),
740                             1,
741                             GNUNET_NO);
742   transmit (client, &dm->header, &get_next, next_cls, GNUNET_NO);
743   return GNUNET_OK;
744 }
745
746
747 /**
748  * Handle RESERVE-message.
749  *
750  * @param cls closure
751  * @param client identification of the client
752  * @param message the actual message
753  */
754 static void
755 handle_reserve (void *cls,
756                 struct GNUNET_SERVER_Client *client,
757                 const struct GNUNET_MessageHeader *message)
758 {
759   /**
760    * Static counter to produce reservation identifiers.
761    */
762   static int reservation_gen;
763
764   const struct ReserveMessage *msg = (const struct ReserveMessage*) message;
765   struct ReservationList *e;
766   unsigned long long used;
767   unsigned long long req;
768   uint64_t amount;
769   uint32_t entries;
770
771 #if DEBUG_DATASTORE
772   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
773               "Processing `%s' request\n",
774               "RESERVE");
775 #endif
776   amount = GNUNET_ntohll(msg->amount);
777   entries = ntohl(msg->entries);
778   used = payload + reserved;
779   req = amount + ((unsigned long long) GNUNET_DATASTORE_ENTRY_OVERHEAD) * entries;
780   if (used + req > quota)
781     {
782       if (quota < used)
783         used = quota; /* cheat a bit for error message (to avoid negative numbers) */
784       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
785                   _("Insufficient space (%llu bytes are available) to satisfy `%s' request for %llu bytes\n"),
786                   quota - used,
787                   "RESERVE",
788                   req);
789       if (cache_size < req)
790         {
791           /* TODO: document this in the FAQ; essentially, if this
792              message happens, the insertion request could be blocked
793              by less-important content from migration because it is
794              larger than 1/8th of the overall available space, and
795              we only reserve 1/8th for "fresh" insertions */
796           GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
797                       _("The requested amount (%llu bytes) is larger than the cache size (%llu bytes)\n"),
798                       req,
799                       cache_size);
800           transmit_status (client, 0, 
801                            gettext_noop ("Insufficient space to satisfy request and "
802                                          "requested amount is larger than cache size"));
803         }
804       else
805         {
806           transmit_status (client, 0, 
807                            gettext_noop ("Insufficient space to satisfy request"));
808         }
809       return;      
810     }
811   reserved += req;
812   GNUNET_STATISTICS_set (stats,
813                          gettext_noop ("# reserved"),
814                          reserved,
815                          GNUNET_NO);
816   e = GNUNET_malloc (sizeof(struct ReservationList));
817   e->next = reservations;
818   reservations = e;
819   e->client = client;
820   e->amount = amount;
821   e->entries = entries;
822   e->rid = ++reservation_gen;
823   if (reservation_gen < 0)
824     reservation_gen = 0; /* wrap around */
825   transmit_status (client, e->rid, NULL);
826 }
827
828
829 /**
830  * Handle RELEASE_RESERVE-message.
831  *
832  * @param cls closure
833  * @param client identification of the client
834  * @param message the actual message
835  */
836 static void
837 handle_release_reserve (void *cls,
838                         struct GNUNET_SERVER_Client *client,
839                         const struct GNUNET_MessageHeader *message)
840 {
841   const struct ReleaseReserveMessage *msg = (const struct ReleaseReserveMessage*) message;
842   struct ReservationList *pos;
843   struct ReservationList *prev;
844   struct ReservationList *next;
845   int rid = ntohl(msg->rid);
846   unsigned long long rem;
847
848 #if DEBUG_DATASTORE
849   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
850               "Processing `%s' request\n",
851               "RELEASE_RESERVE");
852 #endif
853   next = reservations;
854   prev = NULL;
855   while (NULL != (pos = next))
856     {
857       next = pos->next;
858       if (rid == pos->rid)
859         {
860           if (prev == NULL)
861             reservations = next;
862           else
863             prev->next = next;
864           rem = pos->amount + ((unsigned long long) GNUNET_DATASTORE_ENTRY_OVERHEAD) * pos->entries;
865           GNUNET_assert (reserved >= rem);
866           reserved -= rem;
867           GNUNET_STATISTICS_set (stats,
868                          gettext_noop ("# reserved"),
869                                  reserved,
870                                  GNUNET_NO);
871 #if DEBUG_DATASTORE
872           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
873                       "Returning %llu remaining reserved bytes to storage pool\n",
874                       rem);
875 #endif    
876           GNUNET_free (pos);
877           transmit_status (client, GNUNET_OK, NULL);
878           return;
879         }       
880       prev = pos;
881     }
882   GNUNET_break (0);
883   transmit_status (client, GNUNET_SYSERR, gettext_noop ("Could not find matching reservation"));
884 }
885
886
887 /**
888  * Check that the given message is a valid data message.
889  *
890  * @return NULL if the message is not well-formed, otherwise the message
891  */
892 static const struct DataMessage *
893 check_data (const struct GNUNET_MessageHeader *message)
894 {
895   uint16_t size;
896   uint32_t dsize;
897   const struct DataMessage *dm;
898
899   size = ntohs(message->size);
900   if (size < sizeof(struct DataMessage))
901     { 
902       GNUNET_break (0);
903       return NULL;
904     }
905   dm = (const struct DataMessage *) message;
906   dsize = ntohl(dm->size);
907   if (size != dsize + sizeof(struct DataMessage))
908     {
909       GNUNET_break (0);
910       return NULL;
911     }
912   return dm;
913 }
914
915
916 /**
917  * Context for a put request used to see if the content is
918  * already present.
919  */
920 struct PutContext
921 {
922   /**
923    * Client to notify on completion.
924    */
925   struct GNUNET_SERVER_Client *client;
926
927   /**
928    * Did we find the data already in the database?
929    */
930   int is_present;
931   
932   /* followed by the 'struct DataMessage' */
933 };
934
935
936 /**
937  * Actually put the data message.
938  */
939 static void
940 execute_put (struct GNUNET_SERVER_Client *client,
941              const struct DataMessage *dm)
942 {
943   uint32_t size;
944   char *msg;
945   int ret;
946
947   size = ntohl(dm->size);
948   msg = NULL;
949   ret = plugin->api->put (plugin->api->cls,
950                           &dm->key,
951                           size,
952                           &dm[1],
953                           ntohl(dm->type),
954                           ntohl(dm->priority),
955                           ntohl(dm->anonymity),
956                           GNUNET_TIME_absolute_ntoh(dm->expiration),
957                           &msg);
958   if (GNUNET_OK == ret)
959     {
960       GNUNET_STATISTICS_update (stats,
961                                 gettext_noop ("# bytes stored"),
962                                 size,
963                                 GNUNET_YES);
964       GNUNET_CONTAINER_bloomfilter_add (filter,
965                                         &dm->key);
966 #if DEBUG_DATASTORE
967       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
968                   "Successfully stored %u bytes of type %u under key `%s'\n",
969                   size,
970                   ntohl(dm->type),
971                   GNUNET_h2s (&dm->key));
972 #endif
973     }
974   transmit_status (client, 
975                    (GNUNET_SYSERR == ret) ? GNUNET_SYSERR : GNUNET_OK, 
976                    msg);
977   GNUNET_free_non_null (msg);
978   if (quota - reserved - cache_size < payload)
979     {
980       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
981                   _("Need %llu bytes more space (%llu allowed, using %llu)\n"),
982                   (unsigned long long) size + GNUNET_DATASTORE_ENTRY_OVERHEAD,
983                   (unsigned long long) (quota - reserved - cache_size),
984                   (unsigned long long) payload);
985       manage_space (size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
986     }
987 }
988
989
990
991 /**
992  * Function that will check if the given datastore entry
993  * matches the put and if none match executes the put.
994  *
995  * @param cls closure, pointer to the client (of type 'struct PutContext').
996  * @param next_cls closure to use to ask for the next item
997  * @param key key for the content
998  * @param size number of bytes in data
999  * @param data content stored
1000  * @param type type of the content
1001  * @param priority priority of the content
1002  * @param anonymity anonymity-level for the content
1003  * @param expiration expiration time for the content
1004  * @param uid unique identifier for the datum;
1005  *        maybe 0 if no unique identifier is available
1006  *
1007  * @return GNUNET_SYSERR to abort the iteration, GNUNET_OK to continue,
1008  *         GNUNET_NO to delete the item and continue (if supported)
1009  */
1010 static int
1011 check_present (void *cls,
1012                void *next_cls,
1013                const GNUNET_HashCode * key,
1014                uint32_t size,
1015                const void *data,
1016                enum GNUNET_BLOCK_Type type,
1017                uint32_t priority,
1018                uint32_t anonymity,
1019                struct GNUNET_TIME_Absolute
1020                expiration, uint64_t uid)
1021 {
1022   struct PutContext *pc = cls;
1023   const struct DataMessage *dm;
1024
1025   dm = (const struct DataMessage*) &pc[1];
1026   if (key == NULL)
1027     {
1028       if (pc->is_present == GNUNET_YES) 
1029         transmit_status (pc->client, GNUNET_OK, NULL);
1030       else
1031         execute_put (pc->client, dm);
1032       GNUNET_SERVER_client_drop (pc->client);
1033       GNUNET_free (pc);
1034       return GNUNET_SYSERR;
1035     }
1036   if ( (GNUNET_BLOCK_TYPE_FS_DBLOCK == type) ||
1037        (GNUNET_BLOCK_TYPE_FS_IBLOCK == type) ||
1038        ( (size == ntohl(dm->size)) &&
1039          (0 == memcmp (&dm[1],
1040                        data,
1041                        size)) ) )
1042     {
1043       pc->is_present = GNUNET_YES;
1044       plugin->api->next_request (next_cls, GNUNET_YES);
1045     }
1046   else
1047     {
1048       plugin->api->next_request (next_cls, GNUNET_NO);
1049     }
1050   return GNUNET_OK;
1051 }
1052
1053
1054 /**
1055  * Handle PUT-message.
1056  *
1057  * @param cls closure
1058  * @param client identification of the client
1059  * @param message the actual message
1060  */
1061 static void
1062 handle_put (void *cls,
1063             struct GNUNET_SERVER_Client *client,
1064             const struct GNUNET_MessageHeader *message)
1065 {
1066   const struct DataMessage *dm = check_data (message);
1067   int rid;
1068   struct ReservationList *pos;
1069   struct PutContext *pc;
1070   uint32_t size;
1071
1072   if ( (dm == NULL) ||
1073        (ntohl(dm->type) == 0) ) 
1074     {
1075       GNUNET_break (0);
1076       GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1077       return;
1078     }
1079 #if DEBUG_DATASTORE
1080   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1081               "Processing `%s' request for `%s' of type %u\n",
1082               "PUT",
1083               GNUNET_h2s (&dm->key),
1084               ntohl (dm->type));
1085 #endif
1086   rid = ntohl(dm->rid);
1087   size = ntohl(dm->size);
1088   if (rid > 0)
1089     {
1090       pos = reservations;
1091       while ( (NULL != pos) &&
1092               (rid != pos->rid) )
1093         pos = pos->next;
1094       GNUNET_break (pos != NULL);
1095       if (NULL != pos)
1096         {
1097           GNUNET_break (pos->entries > 0);
1098           GNUNET_break (pos->amount >= size);
1099           pos->entries--;
1100           pos->amount -= size;
1101           reserved -= (size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
1102           GNUNET_STATISTICS_set (stats,
1103                                  gettext_noop ("# reserved"),
1104                                  reserved,
1105                                  GNUNET_NO);
1106         }
1107     }
1108   if (GNUNET_YES == GNUNET_CONTAINER_bloomfilter_test (filter,
1109                                                        &dm->key))
1110     {
1111       pc = GNUNET_malloc (sizeof (struct PutContext) + size + sizeof (struct DataMessage));
1112       pc->client = client;
1113       GNUNET_SERVER_client_keep (client);
1114       memcpy (&pc[1], dm, size + sizeof (struct DataMessage));
1115       plugin->api->get (plugin->api->cls,
1116                         &dm->key,
1117                         NULL,
1118                         ntohl (dm->type),
1119                         &check_present,
1120                         pc);      
1121       return;
1122     }
1123   execute_put (client, dm);
1124 }
1125
1126
1127 /**
1128  * Handle GET-message.
1129  *
1130  * @param cls closure
1131  * @param client identification of the client
1132  * @param message the actual message
1133  */
1134 static void
1135 handle_get (void *cls,
1136             struct GNUNET_SERVER_Client *client,
1137             const struct GNUNET_MessageHeader *message)
1138 {
1139   const struct GetMessage *msg;
1140   uint16_t size;
1141
1142   size = ntohs(message->size);
1143   if ( (size != sizeof(struct GetMessage)) &&
1144        (size != sizeof(struct GetMessage) - sizeof(GNUNET_HashCode)) )
1145     {
1146       GNUNET_break (0);
1147       GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1148       return;
1149     }
1150   msg = (const struct GetMessage*) message;
1151 #if DEBUG_DATASTORE
1152   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1153               "Processing `%s' request for `%s' of type %u\n",
1154               "GET",
1155               GNUNET_h2s (&msg->key),
1156               ntohl (msg->type));
1157 #endif
1158   GNUNET_STATISTICS_update (stats,
1159                             gettext_noop ("# GET requests received"),
1160                             1,
1161                             GNUNET_NO);
1162   GNUNET_SERVER_client_keep (client);
1163   if ( (size == sizeof(struct GetMessage)) &&
1164        (GNUNET_YES != GNUNET_CONTAINER_bloomfilter_test (filter,
1165                                                          &msg->key)) )
1166     {
1167       /* don't bother database... */
1168 #if DEBUG_DATASTORE
1169       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1170                   "Empty result set for `%s' request for `%s' (bloomfilter).\n",
1171                   "GET",
1172                   GNUNET_h2s (&msg->key));
1173 #endif  
1174       GNUNET_STATISTICS_update (stats,
1175                                 gettext_noop ("# requests filtered by bloomfilter"),
1176                                 1,
1177                                 GNUNET_NO);
1178       transmit_item (client,
1179                      NULL, NULL, 0, NULL, 0, 0, 0, 
1180                      GNUNET_TIME_UNIT_ZERO_ABS, 0);
1181       return;
1182     }
1183   plugin->api->get (plugin->api->cls,
1184                     ((size == sizeof(struct GetMessage)) ? &msg->key : NULL),
1185                     NULL,
1186                     ntohl(msg->type),
1187                     &transmit_item,
1188                     client);    
1189 }
1190
1191
1192 /**
1193  * Handle UPDATE-message.
1194  *
1195  * @param cls closure
1196  * @param client identification of the client
1197  * @param message the actual message
1198  */
1199 static void
1200 handle_update (void *cls,
1201                struct GNUNET_SERVER_Client *client,
1202                const struct GNUNET_MessageHeader *message)
1203 {
1204   const struct UpdateMessage *msg;
1205   int ret;
1206   char *emsg;
1207
1208   GNUNET_STATISTICS_update (stats,
1209                             gettext_noop ("# UPDATE requests received"),
1210                             1,
1211                             GNUNET_NO);
1212   msg = (const struct UpdateMessage*) message;
1213   emsg = NULL;
1214 #if DEBUG_DATASTORE
1215   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1216               "Processing `%s' request for %llu\n",
1217               "UPDATE",
1218               (unsigned long long) GNUNET_ntohll (msg->uid));
1219 #endif
1220   ret = plugin->api->update (plugin->api->cls,
1221                              GNUNET_ntohll(msg->uid),
1222                              (int32_t) ntohl(msg->priority),
1223                              GNUNET_TIME_absolute_ntoh(msg->expiration),
1224                              &emsg);
1225   transmit_status (client, ret, emsg);
1226   GNUNET_free_non_null (emsg);
1227 }
1228
1229
1230 /**
1231  * Handle GET_RANDOM-message.
1232  *
1233  * @param cls closure
1234  * @param client identification of the client
1235  * @param message the actual message
1236  */
1237 static void
1238 handle_get_random (void *cls,
1239                    struct GNUNET_SERVER_Client *client,
1240                    const struct GNUNET_MessageHeader *message)
1241 {
1242 #if DEBUG_DATASTORE
1243   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1244               "Processing `%s' request\n",
1245               "GET_RANDOM");
1246 #endif
1247   GNUNET_STATISTICS_update (stats,
1248                             gettext_noop ("# GET RANDOM requests received"),
1249                             1,
1250                             GNUNET_NO);
1251   GNUNET_SERVER_client_keep (client);
1252   plugin->api->iter_migration_order (plugin->api->cls,
1253                                      GNUNET_BLOCK_TYPE_ANY,
1254                                      &transmit_item,
1255                                      client);  
1256 }
1257
1258 /**
1259  * Handle GET_ZERO_ANONYMITY-message.
1260  *
1261  * @param cls closure
1262  * @param client identification of the client
1263  * @param message the actual message
1264  */
1265 static void
1266 handle_get_zero_anonymity (void *cls,
1267                            struct GNUNET_SERVER_Client *client,
1268                            const struct GNUNET_MessageHeader *message)
1269 {
1270   const struct GetZeroAnonymityMessage * msg = (const struct GetZeroAnonymityMessage*) message;
1271   enum GNUNET_BLOCK_Type type;
1272
1273   type = (enum GNUNET_BLOCK_Type) ntohl (msg->type);
1274 #if DEBUG_DATASTORE
1275   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1276               "Processing `%s' request\n",
1277               "GET_ZERO_ANONYMITY");
1278 #endif
1279   GNUNET_STATISTICS_update (stats,
1280                             gettext_noop ("# GET ZERO ANONYMITY requests received"),
1281                             1,
1282                             GNUNET_NO);
1283   GNUNET_SERVER_client_keep (client);
1284   plugin->api->iter_zero_anonymity (plugin->api->cls,
1285                                     type,
1286                                     &transmit_item,
1287                                     client);  
1288 }
1289
1290
1291 /**
1292  * Context for the 'remove_callback'.
1293  */
1294 struct RemoveContext 
1295 {
1296   /**
1297    * Client for whom we're doing the remvoing.
1298    */
1299   struct GNUNET_SERVER_Client *client;
1300
1301   /**
1302    * GNUNET_YES if we managed to remove something.
1303    */
1304   int found;
1305 };
1306
1307
1308 /**
1309  * Callback function that will cause the item that is passed
1310  * in to be deleted (by returning GNUNET_NO).
1311  */
1312 static int
1313 remove_callback (void *cls,
1314                  void *next_cls,
1315                  const GNUNET_HashCode * key,
1316                  uint32_t size,
1317                  const void *data,
1318                  enum GNUNET_BLOCK_Type type,
1319                  uint32_t priority,
1320                  uint32_t anonymity,
1321                  struct GNUNET_TIME_Absolute
1322                  expiration, uint64_t uid)
1323 {
1324   struct RemoveContext *rc = cls;
1325
1326   if (key == NULL)
1327     {
1328 #if DEBUG_DATASTORE
1329       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1330                   "No further matches for `%s' request.\n",
1331                   "REMOVE");
1332 #endif  
1333       if (GNUNET_YES == rc->found)
1334         transmit_status (rc->client, GNUNET_OK, NULL);       
1335       else
1336         transmit_status (rc->client, GNUNET_NO, _("Content not found"));        
1337       GNUNET_SERVER_client_drop (rc->client);
1338       GNUNET_free (rc);
1339       return GNUNET_OK; /* last item */
1340     }
1341   rc->found = GNUNET_YES;
1342 #if DEBUG_DATASTORE
1343   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1344               "Item %llu matches `%s' request for key `%s' and type %u.\n",
1345               (unsigned long long) uid,
1346               "REMOVE",
1347               GNUNET_h2s (key),
1348               type);
1349 #endif  
1350   GNUNET_STATISTICS_update (stats,
1351                             gettext_noop ("# bytes removed (explicit request)"),
1352                             size,
1353                             GNUNET_YES);
1354   GNUNET_CONTAINER_bloomfilter_remove (filter,
1355                                        key);
1356   plugin->api->next_request (next_cls, GNUNET_YES);
1357   return GNUNET_NO;
1358 }
1359
1360
1361 /**
1362  * Handle REMOVE-message.
1363  *
1364  * @param cls closure
1365  * @param client identification of the client
1366  * @param message the actual message
1367  */
1368 static void
1369 handle_remove (void *cls,
1370              struct GNUNET_SERVER_Client *client,
1371              const struct GNUNET_MessageHeader *message)
1372 {
1373   const struct DataMessage *dm = check_data (message);
1374   GNUNET_HashCode vhash;
1375   struct RemoveContext *rc;
1376
1377   if (dm == NULL)
1378     {
1379       GNUNET_break (0);
1380       GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1381       return;
1382     }
1383 #if DEBUG_DATASTORE
1384   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1385               "Processing `%s' request for `%s' of type %u\n",
1386               "REMOVE",
1387               GNUNET_h2s (&dm->key),
1388               ntohl (dm->type));
1389 #endif
1390   GNUNET_STATISTICS_update (stats,
1391                             gettext_noop ("# REMOVE requests received"),
1392                             1,
1393                             GNUNET_NO);
1394   rc = GNUNET_malloc (sizeof(struct RemoveContext));
1395   GNUNET_SERVER_client_keep (client);
1396   rc->client = client;
1397   GNUNET_CRYPTO_hash (&dm[1],
1398                       ntohl(dm->size),
1399                       &vhash);
1400   plugin->api->get (plugin->api->cls,
1401                     &dm->key,
1402                     &vhash,
1403                     (enum GNUNET_BLOCK_Type) ntohl(dm->type),
1404                     &remove_callback,
1405                     rc);
1406 }
1407
1408
1409 /**
1410  * Handle DROP-message.
1411  *
1412  * @param cls closure
1413  * @param client identification of the client
1414  * @param message the actual message
1415  */
1416 static void
1417 handle_drop (void *cls,
1418              struct GNUNET_SERVER_Client *client,
1419              const struct GNUNET_MessageHeader *message)
1420 {
1421 #if DEBUG_DATASTORE
1422   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1423               "Processing `%s' request\n",
1424               "DROP");
1425 #endif
1426   plugin->api->drop (plugin->api->cls);
1427   GNUNET_SERVER_receive_done (client, GNUNET_OK);
1428 }
1429
1430
1431 /**
1432  * Function called by plugins to notify us about a
1433  * change in their disk utilization.
1434  *
1435  * @param cls closure (NULL)
1436  * @param delta change in disk utilization, 
1437  *        0 for "reset to empty"
1438  */
1439 static void
1440 disk_utilization_change_cb (void *cls,
1441                             int delta)
1442 {
1443   if ( (delta < 0) &&
1444        (payload < -delta) )
1445     {
1446       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1447                   _("Datastore payload inaccurate (%lld < %lld).  Trying to fix.\n"),
1448                   (long long) payload,
1449                   (long long) -delta);
1450       payload = plugin->api->get_size (plugin->api->cls);
1451       sync_stats ();
1452       return;
1453     }
1454   payload += delta;
1455   lastSync++;
1456   if (lastSync >= MAX_STAT_SYNC_LAG)
1457     sync_stats ();
1458 }
1459
1460
1461 /**
1462  * Callback function to process statistic values.
1463  *
1464  * @param cls closure (struct Plugin*)
1465  * @param subsystem name of subsystem that created the statistic
1466  * @param name the name of the datum
1467  * @param value the current value
1468  * @param is_persistent GNUNET_YES if the value is persistent, GNUNET_NO if not
1469  * @return GNUNET_OK to continue, GNUNET_SYSERR to abort iteration
1470  */
1471 static int
1472 process_stat_in (void *cls,
1473                  const char *subsystem,
1474                  const char *name,
1475                  uint64_t value,
1476                  int is_persistent)
1477 {
1478   GNUNET_assert (stats_worked == GNUNET_NO);
1479   stats_worked = GNUNET_YES;
1480   payload += value;
1481 #if DEBUG_SQLITE
1482   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1483               "Notification from statistics about existing payload (%llu), new payload is %llu\n",
1484               abs_value,
1485               payload);
1486 #endif
1487   return GNUNET_OK;
1488 }
1489
1490
1491 static void
1492 process_stat_done (void *cls,
1493                    int success)
1494 {
1495   struct DatastorePlugin *plugin = cls;
1496
1497   stat_get = NULL;
1498   if (stats_worked == GNUNET_NO) 
1499     payload = plugin->api->get_size (plugin->api->cls);
1500 }
1501
1502
1503 /**
1504  * Load the datastore plugin.
1505  */
1506 static struct DatastorePlugin *
1507 load_plugin () 
1508 {
1509   struct DatastorePlugin *ret;
1510   char *libname;
1511   char *name;
1512
1513   if (GNUNET_OK !=
1514       GNUNET_CONFIGURATION_get_value_string (cfg,
1515                                              "DATASTORE", "DATABASE", &name))
1516     {
1517       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1518                   _("No `%s' specified for `%s' in configuration!\n"),
1519                   "DATABASE",
1520                   "DATASTORE");
1521       return NULL;
1522     }
1523   ret = GNUNET_malloc (sizeof(struct DatastorePlugin));
1524   ret->env.cfg = cfg;
1525   ret->env.duc = &disk_utilization_change_cb;
1526   ret->env.cls = NULL;
1527   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1528               _("Loading `%s' datastore plugin\n"), name);
1529   GNUNET_asprintf (&libname, "libgnunet_plugin_datastore_%s", name);
1530   ret->short_name = name;
1531   ret->lib_name = libname;
1532   ret->api = GNUNET_PLUGIN_load (libname, &ret->env);
1533   if (ret->api == NULL)
1534     {
1535       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1536                   _("Failed to load datastore plugin for `%s'\n"), name);
1537       GNUNET_free (ret->short_name);
1538       GNUNET_free (libname);
1539       GNUNET_free (ret);
1540       return NULL;
1541     }
1542   return ret;
1543 }
1544
1545
1546 /**
1547  * Function called when the service shuts
1548  * down.  Unloads our datastore plugin.
1549  *
1550  * @param plug plugin to unload
1551  */
1552 static void
1553 unload_plugin (struct DatastorePlugin *plug)
1554 {
1555 #if DEBUG_DATASTORE
1556   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1557               "Datastore service is unloading plugin...\n");
1558 #endif
1559   GNUNET_break (NULL == GNUNET_PLUGIN_unload (plug->lib_name, plug->api));
1560   GNUNET_free (plug->lib_name);
1561   GNUNET_free (plug->short_name);
1562   GNUNET_free (plug);
1563 }
1564
1565
1566 /**
1567  * Final task run after shutdown.  Unloads plugins and disconnects us from
1568  * statistics.
1569  */
1570 static void
1571 unload_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1572 {
1573   unload_plugin (plugin);
1574   plugin = NULL;
1575   if (filter != NULL)
1576     {
1577       GNUNET_CONTAINER_bloomfilter_free (filter);
1578       filter = NULL;
1579     }
1580   if (lastSync > 0)
1581     sync_stats ();
1582   if (stat_get != NULL)
1583     {
1584       GNUNET_STATISTICS_get_cancel (stat_get);
1585       stat_get = NULL;
1586     }
1587   if (stats != NULL)
1588     {
1589       GNUNET_STATISTICS_destroy (stats, GNUNET_YES);
1590       stats = NULL;
1591     }
1592 }
1593
1594
1595 /**
1596  * Last task run during shutdown.  Disconnects us from
1597  * the transport and core.
1598  */
1599 static void
1600 cleaning_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1601 {
1602   struct TransmitCallbackContext *tcc;
1603
1604   cleaning_done = GNUNET_YES;
1605   while (NULL != (tcc = tcc_head))
1606     {
1607       GNUNET_CONTAINER_DLL_remove (tcc_head,
1608                                    tcc_tail,
1609                                    tcc);
1610       if (tcc->th != NULL)
1611         {
1612           GNUNET_CONNECTION_notify_transmit_ready_cancel (tcc->th);
1613           GNUNET_SERVER_client_drop (tcc->client);
1614         }
1615       if (NULL != tcc->tc)
1616         tcc->tc (tcc->tc_cls, GNUNET_SYSERR);
1617       GNUNET_free (tcc->msg);
1618       GNUNET_free (tcc);
1619     }
1620   if (expired_kill_task != GNUNET_SCHEDULER_NO_TASK)
1621     {
1622       GNUNET_SCHEDULER_cancel (expired_kill_task);
1623       expired_kill_task = GNUNET_SCHEDULER_NO_TASK;
1624     }
1625   GNUNET_SCHEDULER_add_continuation (&unload_task,
1626                                      NULL,
1627                                      GNUNET_SCHEDULER_REASON_PREREQ_DONE);
1628 }
1629
1630
1631 /**
1632  * Function that removes all active reservations made
1633  * by the given client and releases the space for other
1634  * requests.
1635  *
1636  * @param cls closure
1637  * @param client identification of the client
1638  */
1639 static void
1640 cleanup_reservations (void *cls,
1641                       struct GNUNET_SERVER_Client
1642                       * client)
1643 {
1644   struct ReservationList *pos;
1645   struct ReservationList *prev;
1646   struct ReservationList *next;
1647
1648   if (client == NULL)
1649     return;
1650   prev = NULL;
1651   pos = reservations;
1652   while (NULL != pos)
1653     {
1654       next = pos->next;
1655       if (pos->client == client)
1656         {
1657           if (prev == NULL)
1658             reservations = next;
1659           else
1660             prev->next = next;
1661           reserved -= pos->amount + pos->entries * GNUNET_DATASTORE_ENTRY_OVERHEAD;
1662           GNUNET_free (pos);
1663         }
1664       else
1665         {
1666           prev = pos;
1667         }
1668       pos = next;
1669     }
1670   GNUNET_STATISTICS_set (stats,
1671                          gettext_noop ("# reserved"),
1672                          reserved,
1673                          GNUNET_NO);
1674 }
1675
1676
1677 /**
1678  * Process datastore requests.
1679  *
1680  * @param cls closure
1681  * @param server the initialized server
1682  * @param c configuration to use
1683  */
1684 static void
1685 run (void *cls,
1686      struct GNUNET_SERVER_Handle *server,
1687      const struct GNUNET_CONFIGURATION_Handle *c)
1688 {
1689   static const struct GNUNET_SERVER_MessageHandler handlers[] = {
1690     {&handle_reserve, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_RESERVE, 
1691      sizeof(struct ReserveMessage) }, 
1692     {&handle_release_reserve, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_RELEASE_RESERVE, 
1693      sizeof(struct ReleaseReserveMessage) }, 
1694     {&handle_put, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_PUT, 0 }, 
1695     {&handle_update, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_UPDATE, 
1696      sizeof (struct UpdateMessage) }, 
1697     {&handle_get, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_GET, 0 }, 
1698     {&handle_get_random, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_GET_RANDOM, 
1699      sizeof(struct GNUNET_MessageHeader) }, 
1700     {&handle_get_zero_anonymity, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_GET_ZERO_ANONYMITY, 
1701      sizeof(struct GetZeroAnonymityMessage) }, 
1702     {&handle_remove, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE, 0 }, 
1703     {&handle_drop, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_DROP, 
1704      sizeof(struct GNUNET_MessageHeader) }, 
1705     {NULL, NULL, 0, 0}
1706   };
1707   char *fn;
1708   unsigned int bf_size;
1709
1710   cfg = c;
1711   if (GNUNET_OK !=
1712       GNUNET_CONFIGURATION_get_value_number (cfg,
1713                                              "DATASTORE", "QUOTA", &quota))
1714     {
1715       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1716                   _("No `%s' specified for `%s' in configuration!\n"),
1717                   "QUOTA",
1718                   "DATASTORE");
1719       return;
1720     }
1721   stats = GNUNET_STATISTICS_create ("datastore", cfg);
1722   GNUNET_STATISTICS_set (stats,
1723                          gettext_noop ("# quota"),
1724                          quota,
1725                          GNUNET_NO);
1726   cache_size = quota / 8; /* Or should we make this an option? */
1727   GNUNET_STATISTICS_set (stats,
1728                          gettext_noop ("# cache size"),
1729                          cache_size,
1730                          GNUNET_NO);
1731   bf_size = quota / 32; /* 8 bit per entry, 1 bit per 32 kb in DB */
1732   fn = NULL;
1733   if ( (GNUNET_OK !=
1734         GNUNET_CONFIGURATION_get_value_filename (cfg,
1735                                                  "DATASTORE",
1736                                                  "BLOOMFILTER",
1737                                                  &fn)) ||
1738        (GNUNET_OK !=
1739         GNUNET_DISK_directory_create_for_file (fn)) )
1740     {
1741       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1742                   _("Could not use specified filename `%s' for bloomfilter.\n"),
1743                   fn != NULL ? fn : "");
1744       GNUNET_free_non_null (fn);
1745       fn = NULL;
1746     }
1747   filter = GNUNET_CONTAINER_bloomfilter_load (fn, bf_size, 5);  /* approx. 3% false positives at max use */  
1748   GNUNET_free_non_null (fn);
1749   if (filter == NULL)
1750     {
1751       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1752                   _("Failed to initialize bloomfilter.\n"));
1753       if (stats != NULL)
1754         {
1755           GNUNET_STATISTICS_destroy (stats, GNUNET_YES);
1756           stats = NULL;
1757         }
1758       return;
1759     }
1760   plugin = load_plugin ();
1761   if (NULL == plugin)
1762     {
1763       GNUNET_CONTAINER_bloomfilter_free (filter);
1764       filter = NULL;
1765       if (stats != NULL)
1766         {
1767           GNUNET_STATISTICS_destroy (stats, GNUNET_YES);
1768           stats = NULL;
1769         }
1770       return;
1771     }
1772   stat_get = GNUNET_STATISTICS_get (stats,
1773                                     "datastore",
1774                                     QUOTA_STAT_NAME,
1775                                     GNUNET_TIME_UNIT_SECONDS,
1776                                     &process_stat_done,
1777                                     &process_stat_in,
1778                                     plugin);
1779   GNUNET_SERVER_disconnect_notify (server, &cleanup_reservations, NULL);
1780   GNUNET_SERVER_add_handlers (server, handlers);
1781   expired_kill_task
1782     = GNUNET_SCHEDULER_add_with_priority (GNUNET_SCHEDULER_PRIORITY_IDLE,
1783                                           &delete_expired, NULL);
1784   GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL,
1785                                 &cleaning_task, NULL);
1786 }
1787
1788
1789 /**
1790  * The main function for the datastore service.
1791  *
1792  * @param argc number of arguments from the command line
1793  * @param argv command line arguments
1794  * @return 0 ok, 1 on error
1795  */
1796 int
1797 main (int argc, char *const *argv)
1798 {
1799   int ret;
1800
1801   ret = (GNUNET_OK ==
1802          GNUNET_SERVICE_run (argc,
1803                              argv,
1804                              "datastore",
1805                              GNUNET_SERVICE_OPTION_NONE,
1806                              &run, NULL)) ? 0 : 1;
1807   return ret;
1808 }
1809
1810
1811 /* end of gnunet-service-datastore.c */