first hack at implementing new replication select code
[oweals/gnunet.git] / src / datastore / gnunet-service-datastore.c
1 /*
2      This file is part of GNUnet
3      (C) 2004, 2005, 2006, 2007, 2009 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 2, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file datastore/gnunet-service-datastore.c
23  * @brief Management for the datastore for files stored on a GNUnet node
24  * @author Christian Grothoff
25  */
26
27 #include "platform.h"
28 #include "gnunet_util_lib.h"
29 #include "gnunet_protocols.h"
30 #include "gnunet_statistics_service.h"
31 #include "gnunet_datastore_plugin.h"
32 #include "datastore.h"
33
34 /**
35  * How many messages do we queue at most per client?
36  */
37 #define MAX_PENDING 1024
38
39 /**
40  * How long are we at most keeping "expired" content
41  * past the expiration date in the database?
42  */
43 #define MAX_EXPIRE_DELAY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 15)
44
45 #define QUOTA_STAT_NAME gettext_noop ("# bytes used in file-sharing datastore")
46
47 /**
48  * After how many payload-changing operations
49  * do we sync our statistics?
50  */
51 #define MAX_STAT_SYNC_LAG 50
52
53
54 /**
55  * Our datastore plugin.
56  */
57 struct DatastorePlugin
58 {
59
60   /**
61    * API of the transport as returned by the plugin's
62    * initialization function.
63    */
64   struct GNUNET_DATASTORE_PluginFunctions *api;
65
66   /**
67    * Short name for the plugin (i.e. "sqlite").
68    */
69   char *short_name;
70
71   /**
72    * Name of the library (i.e. "gnunet_plugin_datastore_sqlite").
73    */
74   char *lib_name;
75
76   /**
77    * Environment this transport service is using
78    * for this plugin.
79    */
80   struct GNUNET_DATASTORE_PluginEnvironment env;
81
82 };
83
84
85 /**
86  * Linked list of active reservations.
87  */
88 struct ReservationList 
89 {
90
91   /**
92    * This is a linked list.
93    */
94   struct ReservationList *next;
95
96   /**
97    * Client that made the reservation.
98    */
99   struct GNUNET_SERVER_Client *client;
100
101   /**
102    * Number of bytes (still) reserved.
103    */
104   uint64_t amount;
105
106   /**
107    * Number of items (still) reserved.
108    */
109   uint64_t entries;
110
111   /**
112    * Reservation identifier.
113    */
114   int32_t rid;
115
116 };
117
118
119
120 /**
121  * Our datastore plugin (NULL if not available).
122  */
123 static struct DatastorePlugin *plugin;
124
125 /**
126  * Linked list of space reservations made by clients.
127  */
128 static struct ReservationList *reservations;
129
130 /**
131  * Bloomfilter to quickly tell if we don't have the content.
132  */
133 static struct GNUNET_CONTAINER_BloomFilter *filter;
134
135 /**
136  * How much space are we allowed to use?
137  */
138 static unsigned long long quota;
139
140 /**
141  * How much space are we using for the cache?  (space available for
142  * insertions that will be instantly reclaimed by discarding less
143  * important content --- or possibly whatever we just inserted into
144  * the "cache").
145  */
146 static unsigned long long cache_size;
147
148 /**
149  * How much space have we currently reserved?
150  */
151 static unsigned long long reserved;
152   
153 /**
154  * How much data are we currently storing
155  * in the database?
156  */
157 static unsigned long long payload;
158
159 /**
160  * Number of updates that were made to the
161  * payload value since we last synchronized
162  * it with the statistics service.
163  */
164 static unsigned int lastSync;
165
166 /**
167  * Did we get an answer from statistics?
168  */
169 static int stats_worked;
170
171 /**
172  * Identity of the task that is used to delete
173  * expired content.
174  */
175 static GNUNET_SCHEDULER_TaskIdentifier expired_kill_task;
176
177 /**
178  * Our configuration.
179  */
180 const struct GNUNET_CONFIGURATION_Handle *cfg;
181
182
183 /**
184  * Handle for reporting statistics.
185  */
186 static struct GNUNET_STATISTICS_Handle *stats;
187
188
189 /**
190  * Synchronize our utilization statistics with the 
191  * statistics service.
192  */
193 static void 
194 sync_stats ()
195 {
196   GNUNET_STATISTICS_set (stats,
197                          QUOTA_STAT_NAME,
198                          payload,
199                          GNUNET_YES);
200   lastSync = 0;
201 }
202
203
204
205
206 /**
207  * Function called once the transmit operation has
208  * either failed or succeeded.
209  *
210  * @param cls closure
211  * @param status GNUNET_OK on success, GNUNET_SYSERR on error
212  */
213 typedef void (*TransmitContinuation)(void *cls,
214                                      int status);
215
216
217 /**
218  * Context for transmitting replies to clients.
219  */
220 struct TransmitCallbackContext 
221 {
222   
223   /**
224    * We keep these in a doubly-linked list (for cleanup).
225    */
226   struct TransmitCallbackContext *next;
227   
228   /**
229    * We keep these in a doubly-linked list (for cleanup).
230    */
231   struct TransmitCallbackContext *prev;
232   
233   /**
234    * The message that we're asked to transmit.
235    */
236   struct GNUNET_MessageHeader *msg;
237   
238   /**
239    * Handle for the transmission request.
240    */
241   struct GNUNET_CONNECTION_TransmitHandle *th;
242
243   /**
244    * Client that we are transmitting to.
245    */
246   struct GNUNET_SERVER_Client *client;
247
248   /**
249    * Function to call once msg has been transmitted
250    * (or at least added to the buffer).
251    */
252   TransmitContinuation tc;
253
254   /**
255    * Closure for tc.
256    */
257   void *tc_cls;
258
259   /**
260    * GNUNET_YES if we are supposed to signal the server
261    * completion of the client's request.
262    */
263   int end;
264 };
265
266   
267 /**
268  * Head of the doubly-linked list (for cleanup).
269  */
270 static struct TransmitCallbackContext *tcc_head;
271
272 /**
273  * Tail of the doubly-linked list (for cleanup).
274  */
275 static struct TransmitCallbackContext *tcc_tail;
276
277 /**
278  * Have we already cleaned up the TCCs and are hence no longer
279  * willing (or able) to transmit anything to anyone?
280  */
281 static int cleaning_done;
282
283 /**
284  * Handle for pending get request.
285  */
286 static struct GNUNET_STATISTICS_GetHandle *stat_get;
287
288
289 /**
290  * Task that is used to remove expired entries from
291  * the datastore.  This task will schedule itself
292  * again automatically to always delete all expired
293  * content quickly.
294  *
295  * @param cls not used
296  * @param tc task context
297  */ 
298 static void
299 delete_expired (void *cls,
300                 const struct GNUNET_SCHEDULER_TaskContext *tc);
301
302
303 /**
304  * Iterate over the expired items stored in the datastore.
305  * Delete all expired items; once we have processed all
306  * expired items, re-schedule the "delete_expired" task.
307  *
308  * @param cls not used
309  * @param next_cls closure to pass to the "next" function.
310  * @param key key for the content
311  * @param size number of bytes in data
312  * @param data content stored
313  * @param type type of the content
314  * @param priority priority of the content
315  * @param anonymity anonymity-level for the content
316  * @param expiration expiration time for the content
317  * @param uid unique identifier for the datum;
318  *        maybe 0 if no unique identifier is available
319  *
320  * @return GNUNET_SYSERR to abort the iteration, GNUNET_OK to continue
321  *         (continue on call to "next", of course),
322  *         GNUNET_NO to delete the item and continue (if supported)
323  */
324 static int 
325 expired_processor (void *cls,
326                    void *next_cls,
327                    const GNUNET_HashCode * key,
328                    uint32_t size,
329                    const void *data,
330                    enum GNUNET_BLOCK_Type type,
331                    uint32_t priority,
332                    uint32_t anonymity,
333                    struct GNUNET_TIME_Absolute
334                    expiration, 
335                    uint64_t uid)
336 {
337   struct GNUNET_TIME_Absolute now;
338
339   if (key == NULL) 
340     {
341       expired_kill_task 
342         = GNUNET_SCHEDULER_add_delayed (MAX_EXPIRE_DELAY,
343                                         &delete_expired,
344                                         NULL);
345       return GNUNET_SYSERR;
346     }
347   now = GNUNET_TIME_absolute_get ();
348   if (expiration.abs_value > now.abs_value)
349     {
350       /* finished processing */
351       plugin->api->next_request (next_cls, GNUNET_YES);
352       return GNUNET_SYSERR;
353     }
354   plugin->api->next_request (next_cls, GNUNET_NO);
355 #if DEBUG_DATASTORE
356   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
357               "Deleting content `%s' of type %u that expired %llu ms ago\n",
358               GNUNET_h2s (key),
359               type,
360               (unsigned long long) (now.abs_value - expiration.abs_value));
361 #endif
362   GNUNET_STATISTICS_update (stats,
363                             gettext_noop ("# bytes expired"),
364                             size,
365                             GNUNET_YES);
366   GNUNET_CONTAINER_bloomfilter_remove (filter,
367                                        key);
368   return GNUNET_NO; /* delete */
369 }
370
371
372 /**
373  * Task that is used to remove expired entries from
374  * the datastore.  This task will schedule itself
375  * again automatically to always delete all expired
376  * content quickly.
377  *
378  * @param cls not used
379  * @param tc task context
380  */ 
381 static void
382 delete_expired (void *cls,
383                 const struct GNUNET_SCHEDULER_TaskContext *tc)
384 {
385   expired_kill_task = GNUNET_SCHEDULER_NO_TASK;
386   plugin->api->iter_ascending_expiration (plugin->api->cls, 
387                                           0,
388                                           &expired_processor,
389                                           NULL);
390 }
391
392
393 /**
394  * An iterator over a set of items stored in the datastore.
395  *
396  * @param cls closure
397  * @param next_cls closure to pass to the "next" function.
398  * @param key key for the content
399  * @param size number of bytes in data
400  * @param data content stored
401  * @param type type of the content
402  * @param priority priority of the content
403  * @param anonymity anonymity-level for the content
404  * @param expiration expiration time for the content
405  * @param uid unique identifier for the datum;
406  *        maybe 0 if no unique identifier is available
407  *
408  * @return GNUNET_SYSERR to abort the iteration, GNUNET_OK to continue
409  *         (continue on call to "next", of course),
410  *         GNUNET_NO to delete the item and continue (if supported)
411  */
412 static int 
413 manage (void *cls,
414         void *next_cls,
415         const GNUNET_HashCode * key,
416         uint32_t size,
417         const void *data,
418         enum GNUNET_BLOCK_Type type,
419         uint32_t priority,
420         uint32_t anonymity,
421         struct GNUNET_TIME_Absolute
422         expiration, 
423         uint64_t uid)
424 {
425   unsigned long long *need = cls;
426
427   if (NULL == key)
428     {
429       GNUNET_free (need);
430       return GNUNET_SYSERR;
431     }
432   if (size + GNUNET_DATASTORE_ENTRY_OVERHEAD > *need)
433     *need = 0;
434   else
435     *need -= size + GNUNET_DATASTORE_ENTRY_OVERHEAD;
436   plugin->api->next_request (next_cls, 
437                              (0 == *need) ? GNUNET_YES : GNUNET_NO);
438 #if DEBUG_DATASTORE
439   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
440               "Deleting %llu bytes of low-priority content `%s' of type %u (still trying to free another %llu bytes)\n",
441               (unsigned long long) (size + GNUNET_DATASTORE_ENTRY_OVERHEAD),
442               GNUNET_h2s (key),
443               type,
444               *need);
445 #endif
446   GNUNET_STATISTICS_update (stats,
447                             gettext_noop ("# bytes purged (low-priority)"),
448                             size,
449                             GNUNET_YES);
450   GNUNET_CONTAINER_bloomfilter_remove (filter,
451                                        key);
452   return GNUNET_NO;
453 }
454
455
456 /**
457  * Manage available disk space by running tasks
458  * that will discard content if necessary.  This
459  * function will be run whenever a request for
460  * "need" bytes of storage could only be satisfied
461  * by eating into the "cache" (and we want our cache
462  * space back).
463  *
464  * @param need number of bytes of content that were
465  *        placed into the "cache" (and hence the
466  *        number of bytes that should be removed).
467  */
468 static void
469 manage_space (unsigned long long need)
470 {
471   unsigned long long *n;
472
473 #if DEBUG_DATASTORE
474   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
475               "Asked to free up %llu bytes of cache space\n",
476               need);
477 #endif
478   n = GNUNET_malloc (sizeof(unsigned long long));
479   *n = need;
480   plugin->api->iter_low_priority (plugin->api->cls,
481                                   0,
482                                   &manage,
483                                   n);
484 }
485
486
487 /**
488  * Function called to notify a client about the socket
489  * begin ready to queue more data.  "buf" will be
490  * NULL and "size" zero if the socket was closed for
491  * writing in the meantime.
492  *
493  * @param cls closure
494  * @param size number of bytes available in buf
495  * @param buf where the callee should write the message
496  * @return number of bytes written to buf
497  */
498 static size_t
499 transmit_callback (void *cls,
500                    size_t size, void *buf)
501 {
502   struct TransmitCallbackContext *tcc = cls;
503   size_t msize;
504   
505   tcc->th = NULL;
506   GNUNET_CONTAINER_DLL_remove (tcc_head,
507                                tcc_tail,
508                                tcc);
509   msize = ntohs(tcc->msg->size);
510   if (size == 0)
511     {
512       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
513                   _("Transmission to client failed!\n"));
514       if (tcc->tc != NULL)
515         tcc->tc (tcc->tc_cls, GNUNET_SYSERR);
516       if (GNUNET_YES == tcc->end)
517         {
518           GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
519                       _("Disconnecting client due to transmission failure!\n"));
520           GNUNET_SERVER_receive_done (tcc->client, GNUNET_SYSERR);       
521         }
522       GNUNET_SERVER_client_drop (tcc->client);
523       GNUNET_free (tcc->msg);
524       GNUNET_free (tcc);
525       return 0;
526     }
527   GNUNET_assert (size >= msize);
528   memcpy (buf, tcc->msg, msize);
529   if (tcc->tc != NULL)
530     tcc->tc (tcc->tc_cls, GNUNET_OK);
531   if (GNUNET_YES == tcc->end)
532     {
533       GNUNET_SERVER_receive_done (tcc->client, GNUNET_OK);
534     }
535   else
536     {
537 #if DEBUG_DATASTORE
538       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
539                   "Response transmitted, more pending!\n");
540 #endif
541     }
542   GNUNET_SERVER_client_drop (tcc->client);
543   GNUNET_free (tcc->msg);
544   GNUNET_free (tcc);
545   return msize;
546 }
547
548
549 /**
550  * Transmit the given message to the client.
551  *
552  * @param client target of the message
553  * @param msg message to transmit, will be freed!
554  * @param tc function to call afterwards
555  * @param tc_cls closure for tc
556  * @param end is this the last response (and we should
557  *        signal the server completion accodingly after
558  *        transmitting this message)?
559  */
560 static void
561 transmit (struct GNUNET_SERVER_Client *client,
562           struct GNUNET_MessageHeader *msg,
563           TransmitContinuation tc,
564           void *tc_cls,
565           int end)
566 {
567   struct TransmitCallbackContext *tcc;
568
569   if (GNUNET_YES == cleaning_done)
570     {
571 #if DEBUG_DATASTORE
572       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
573                   "Shutdown in progress, aborting transmission.\n");
574 #endif
575       GNUNET_free (msg);
576       if (NULL != tc)
577         tc (tc_cls, GNUNET_SYSERR);
578       return;
579     }
580   tcc = GNUNET_malloc (sizeof(struct TransmitCallbackContext));
581   tcc->msg = msg;
582   tcc->client = client;
583   tcc->tc = tc;
584   tcc->tc_cls = tc_cls;
585   tcc->end = end;
586   if (NULL ==
587       (tcc->th = GNUNET_SERVER_notify_transmit_ready (client,
588                                                       ntohs(msg->size),
589                                                       GNUNET_TIME_UNIT_FOREVER_REL,
590                                                       &transmit_callback,
591                                                       tcc)))
592     {
593       GNUNET_break (0);
594       if (GNUNET_YES == end)
595         {
596           GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
597                       _("Forcefully disconnecting client.\n"));
598           GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
599         }
600       if (NULL != tc)
601         tc (tc_cls, GNUNET_SYSERR);
602       GNUNET_free (msg);
603       GNUNET_free (tcc);
604       return;
605     }
606   GNUNET_SERVER_client_keep (client);
607   GNUNET_CONTAINER_DLL_insert (tcc_head,
608                                tcc_tail,
609                                tcc);
610 }
611
612
613 /**
614  * Transmit a status code to the client.
615  *
616  * @param client receiver of the response
617  * @param code status code
618  * @param msg optional error message (can be NULL)
619  */
620 static void
621 transmit_status (struct GNUNET_SERVER_Client *client,
622                  int code,
623                  const char *msg)
624 {
625   struct StatusMessage *sm;
626   size_t slen;
627
628 #if DEBUG_DATASTORE
629   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
630               "Transmitting `%s' message with value %d and message `%s'\n",
631               "STATUS",
632               code,
633               msg != NULL ? msg : "(none)");
634 #endif
635   slen = (msg == NULL) ? 0 : strlen(msg) + 1;  
636   sm = GNUNET_malloc (sizeof(struct StatusMessage) + slen);
637   sm->header.size = htons(sizeof(struct StatusMessage) + slen);
638   sm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_STATUS);
639   sm->status = htonl(code);
640   if (slen > 0)
641     memcpy (&sm[1], msg, slen);  
642   transmit (client, &sm->header, NULL, NULL, GNUNET_YES);
643 }
644
645
646 /**
647  * Function called once the transmit operation has
648  * either failed or succeeded.
649  *
650  * @param next_cls closure for calling "next_request" callback
651  * @param status GNUNET_OK on success, GNUNET_SYSERR on error
652  */
653 static void 
654 get_next(void *next_cls,
655          int status)
656 {
657   if (status != GNUNET_OK)
658     {
659       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
660                   _("Failed to transmit an item to the client; aborting iteration.\n"));
661       if (plugin != NULL)
662         plugin->api->next_request (next_cls, GNUNET_YES);
663       return;
664     }
665   plugin->api->next_request (next_cls, GNUNET_NO);
666 }
667
668
669 /**
670  * Function that will transmit the given datastore entry
671  * to the client.
672  *
673  * @param cls closure, pointer to the client (of type GNUNET_SERVER_Client).
674  * @param next_cls closure to use to ask for the next item
675  * @param key key for the content
676  * @param size number of bytes in data
677  * @param data content stored
678  * @param type type of the content
679  * @param priority priority of the content
680  * @param anonymity anonymity-level for the content
681  * @param expiration expiration time for the content
682  * @param uid unique identifier for the datum;
683  *        maybe 0 if no unique identifier is available
684  *
685  * @return GNUNET_SYSERR to abort the iteration, GNUNET_OK to continue,
686  *         GNUNET_NO to delete the item and continue (if supported)
687  */
688 static int
689 transmit_item (void *cls,
690                void *next_cls,
691                const GNUNET_HashCode * key,
692                uint32_t size,
693                const void *data,
694                enum GNUNET_BLOCK_Type type,
695                uint32_t priority,
696                uint32_t anonymity,
697                struct GNUNET_TIME_Absolute
698                expiration, uint64_t uid)
699 {
700   struct GNUNET_SERVER_Client *client = cls;
701   struct GNUNET_MessageHeader *end;
702   struct DataMessage *dm;
703
704   if (key == NULL)
705     {
706       /* transmit 'DATA_END' */
707 #if DEBUG_DATASTORE
708       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
709                   "Transmitting `%s' message\n",
710                   "DATA_END");
711 #endif
712       end = GNUNET_malloc (sizeof(struct GNUNET_MessageHeader));
713       end->size = htons(sizeof(struct GNUNET_MessageHeader));
714       end->type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END);
715       transmit (client, end, NULL, NULL, GNUNET_YES);
716       GNUNET_SERVER_client_drop (client);
717       return GNUNET_OK;
718     }
719   dm = GNUNET_malloc (sizeof(struct DataMessage) + size);
720   dm->header.size = htons(sizeof(struct DataMessage) + size);
721   dm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_DATA);
722   dm->rid = htonl(0);
723   dm->size = htonl(size);
724   dm->type = htonl(type);
725   dm->priority = htonl(priority);
726   dm->anonymity = htonl(anonymity);
727   dm->expiration = GNUNET_TIME_absolute_hton(expiration);
728   dm->uid = GNUNET_htonll(uid);
729   dm->key = *key;
730   memcpy (&dm[1], data, size);
731 #if DEBUG_DATASTORE
732   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
733               "Transmitting `%s' message for `%s' of type %u\n",
734               "DATA",
735               GNUNET_h2s (key),
736               type);
737 #endif
738   GNUNET_STATISTICS_update (stats,
739                             gettext_noop ("# results found"),
740                             1,
741                             GNUNET_NO);
742   transmit (client, &dm->header, &get_next, next_cls, GNUNET_NO);
743   return GNUNET_OK;
744 }
745
746
747 /**
748  * Handle RESERVE-message.
749  *
750  * @param cls closure
751  * @param client identification of the client
752  * @param message the actual message
753  */
754 static void
755 handle_reserve (void *cls,
756                 struct GNUNET_SERVER_Client *client,
757                 const struct GNUNET_MessageHeader *message)
758 {
759   /**
760    * Static counter to produce reservation identifiers.
761    */
762   static int reservation_gen;
763
764   const struct ReserveMessage *msg = (const struct ReserveMessage*) message;
765   struct ReservationList *e;
766   unsigned long long used;
767   unsigned long long req;
768   uint64_t amount;
769   uint32_t entries;
770
771 #if DEBUG_DATASTORE
772   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
773               "Processing `%s' request\n",
774               "RESERVE");
775 #endif
776   amount = GNUNET_ntohll(msg->amount);
777   entries = ntohl(msg->entries);
778   used = payload + reserved;
779   req = amount + ((unsigned long long) GNUNET_DATASTORE_ENTRY_OVERHEAD) * entries;
780   if (used + req > quota)
781     {
782       if (quota < used)
783         used = quota; /* cheat a bit for error message (to avoid negative numbers) */
784       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
785                   _("Insufficient space (%llu bytes are available) to satisfy `%s' request for %llu bytes\n"),
786                   quota - used,
787                   "RESERVE",
788                   req);
789       if (cache_size < req)
790         {
791           /* TODO: document this in the FAQ; essentially, if this
792              message happens, the insertion request could be blocked
793              by less-important content from migration because it is
794              larger than 1/8th of the overall available space, and
795              we only reserve 1/8th for "fresh" insertions */
796           GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
797                       _("The requested amount (%llu bytes) is larger than the cache size (%llu bytes)\n"),
798                       req,
799                       cache_size);
800           transmit_status (client, 0, 
801                            gettext_noop ("Insufficient space to satisfy request and "
802                                          "requested amount is larger than cache size"));
803         }
804       else
805         {
806           transmit_status (client, 0, 
807                            gettext_noop ("Insufficient space to satisfy request"));
808         }
809       return;      
810     }
811   reserved += req;
812   GNUNET_STATISTICS_set (stats,
813                          gettext_noop ("# reserved"),
814                          reserved,
815                          GNUNET_NO);
816   e = GNUNET_malloc (sizeof(struct ReservationList));
817   e->next = reservations;
818   reservations = e;
819   e->client = client;
820   e->amount = amount;
821   e->entries = entries;
822   e->rid = ++reservation_gen;
823   if (reservation_gen < 0)
824     reservation_gen = 0; /* wrap around */
825   transmit_status (client, e->rid, NULL);
826 }
827
828
829 /**
830  * Handle RELEASE_RESERVE-message.
831  *
832  * @param cls closure
833  * @param client identification of the client
834  * @param message the actual message
835  */
836 static void
837 handle_release_reserve (void *cls,
838                         struct GNUNET_SERVER_Client *client,
839                         const struct GNUNET_MessageHeader *message)
840 {
841   const struct ReleaseReserveMessage *msg = (const struct ReleaseReserveMessage*) message;
842   struct ReservationList *pos;
843   struct ReservationList *prev;
844   struct ReservationList *next;
845   int rid = ntohl(msg->rid);
846   unsigned long long rem;
847
848 #if DEBUG_DATASTORE
849   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
850               "Processing `%s' request\n",
851               "RELEASE_RESERVE");
852 #endif
853   next = reservations;
854   prev = NULL;
855   while (NULL != (pos = next))
856     {
857       next = pos->next;
858       if (rid == pos->rid)
859         {
860           if (prev == NULL)
861             reservations = next;
862           else
863             prev->next = next;
864           rem = pos->amount + ((unsigned long long) GNUNET_DATASTORE_ENTRY_OVERHEAD) * pos->entries;
865           GNUNET_assert (reserved >= rem);
866           reserved -= rem;
867           GNUNET_STATISTICS_set (stats,
868                          gettext_noop ("# reserved"),
869                                  reserved,
870                                  GNUNET_NO);
871 #if DEBUG_DATASTORE
872           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
873                       "Returning %llu remaining reserved bytes to storage pool\n",
874                       rem);
875 #endif    
876           GNUNET_free (pos);
877           transmit_status (client, GNUNET_OK, NULL);
878           return;
879         }       
880       prev = pos;
881     }
882   GNUNET_break (0);
883   transmit_status (client, GNUNET_SYSERR, gettext_noop ("Could not find matching reservation"));
884 }
885
886
887 /**
888  * Check that the given message is a valid data message.
889  *
890  * @return NULL if the message is not well-formed, otherwise the message
891  */
892 static const struct DataMessage *
893 check_data (const struct GNUNET_MessageHeader *message)
894 {
895   uint16_t size;
896   uint32_t dsize;
897   const struct DataMessage *dm;
898
899   size = ntohs(message->size);
900   if (size < sizeof(struct DataMessage))
901     { 
902       GNUNET_break (0);
903       return NULL;
904     }
905   dm = (const struct DataMessage *) message;
906   dsize = ntohl(dm->size);
907   if (size != dsize + sizeof(struct DataMessage))
908     {
909       GNUNET_break (0);
910       return NULL;
911     }
912   return dm;
913 }
914
915
916 /**
917  * Context for a put request used to see if the content is
918  * already present.
919  */
920 struct PutContext
921 {
922   /**
923    * Client to notify on completion.
924    */
925   struct GNUNET_SERVER_Client *client;
926
927   /**
928    * Did we find the data already in the database?
929    */
930   int is_present;
931   
932   /* followed by the 'struct DataMessage' */
933 };
934
935
936 /**
937  * Actually put the data message.
938  */
939 static void
940 execute_put (struct GNUNET_SERVER_Client *client,
941              const struct DataMessage *dm)
942 {
943   uint32_t size;
944   char *msg;
945   int ret;
946
947   size = ntohl(dm->size);
948   msg = NULL;
949   ret = plugin->api->put (plugin->api->cls,
950                           &dm->key,
951                           size,
952                           &dm[1],
953                           ntohl(dm->type),
954                           ntohl(dm->priority),
955                           ntohl(dm->anonymity),
956                           0 /* FIXME: replication */,
957                           GNUNET_TIME_absolute_ntoh(dm->expiration),
958                           &msg);
959   if (GNUNET_OK == ret)
960     {
961       GNUNET_STATISTICS_update (stats,
962                                 gettext_noop ("# bytes stored"),
963                                 size,
964                                 GNUNET_YES);
965       GNUNET_CONTAINER_bloomfilter_add (filter,
966                                         &dm->key);
967 #if DEBUG_DATASTORE
968       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
969                   "Successfully stored %u bytes of type %u under key `%s'\n",
970                   size,
971                   ntohl(dm->type),
972                   GNUNET_h2s (&dm->key));
973 #endif
974     }
975   transmit_status (client, 
976                    ret,
977                    msg);
978   GNUNET_free_non_null (msg);
979   if (quota - reserved - cache_size < payload)
980     {
981       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
982                   _("Need %llu bytes more space (%llu allowed, using %llu)\n"),
983                   (unsigned long long) size + GNUNET_DATASTORE_ENTRY_OVERHEAD,
984                   (unsigned long long) (quota - reserved - cache_size),
985                   (unsigned long long) payload);
986       manage_space (size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
987     }
988 }
989
990
991
992 /**
993  * Function that will check if the given datastore entry
994  * matches the put and if none match executes the put.
995  *
996  * @param cls closure, pointer to the client (of type 'struct PutContext').
997  * @param next_cls closure to use to ask for the next item
998  * @param key key for the content
999  * @param size number of bytes in data
1000  * @param data content stored
1001  * @param type type of the content
1002  * @param priority priority of the content
1003  * @param anonymity anonymity-level for the content
1004  * @param expiration expiration time for the content
1005  * @param uid unique identifier for the datum;
1006  *        maybe 0 if no unique identifier is available
1007  *
1008  * @return GNUNET_SYSERR to abort the iteration, GNUNET_OK to continue,
1009  *         GNUNET_NO to delete the item and continue (if supported)
1010  */
1011 static int
1012 check_present (void *cls,
1013                void *next_cls,
1014                const GNUNET_HashCode * key,
1015                uint32_t size,
1016                const void *data,
1017                enum GNUNET_BLOCK_Type type,
1018                uint32_t priority,
1019                uint32_t anonymity,
1020                struct GNUNET_TIME_Absolute
1021                expiration, uint64_t uid)
1022 {
1023   struct PutContext *pc = cls;
1024   const struct DataMessage *dm;
1025
1026   dm = (const struct DataMessage*) &pc[1];
1027   if (key == NULL)
1028     {
1029       if (pc->is_present == GNUNET_YES) 
1030         transmit_status (pc->client, GNUNET_NO, NULL);
1031       else
1032         execute_put (pc->client, dm);
1033       GNUNET_SERVER_client_drop (pc->client);
1034       GNUNET_free (pc);
1035       return GNUNET_SYSERR;
1036     }
1037   if ( (GNUNET_BLOCK_TYPE_FS_DBLOCK == type) ||
1038        (GNUNET_BLOCK_TYPE_FS_IBLOCK == type) ||
1039        ( (size == ntohl(dm->size)) &&
1040          (0 == memcmp (&dm[1],
1041                        data,
1042                        size)) ) )
1043     {
1044       pc->is_present = GNUNET_YES;
1045       plugin->api->next_request (next_cls, GNUNET_YES);
1046     }
1047   else
1048     {
1049       plugin->api->next_request (next_cls, GNUNET_NO);
1050     }
1051   return GNUNET_OK;
1052 }
1053
1054
1055 /**
1056  * Handle PUT-message.
1057  *
1058  * @param cls closure
1059  * @param client identification of the client
1060  * @param message the actual message
1061  */
1062 static void
1063 handle_put (void *cls,
1064             struct GNUNET_SERVER_Client *client,
1065             const struct GNUNET_MessageHeader *message)
1066 {
1067   const struct DataMessage *dm = check_data (message);
1068   int rid;
1069   struct ReservationList *pos;
1070   struct PutContext *pc;
1071   uint32_t size;
1072
1073   if ( (dm == NULL) ||
1074        (ntohl(dm->type) == 0) ) 
1075     {
1076       GNUNET_break (0);
1077       GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1078       return;
1079     }
1080 #if DEBUG_DATASTORE
1081   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1082               "Processing `%s' request for `%s' of type %u\n",
1083               "PUT",
1084               GNUNET_h2s (&dm->key),
1085               ntohl (dm->type));
1086 #endif
1087   rid = ntohl(dm->rid);
1088   size = ntohl(dm->size);
1089   if (rid > 0)
1090     {
1091       pos = reservations;
1092       while ( (NULL != pos) &&
1093               (rid != pos->rid) )
1094         pos = pos->next;
1095       GNUNET_break (pos != NULL);
1096       if (NULL != pos)
1097         {
1098           GNUNET_break (pos->entries > 0);
1099           GNUNET_break (pos->amount >= size);
1100           pos->entries--;
1101           pos->amount -= size;
1102           reserved -= (size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
1103           GNUNET_STATISTICS_set (stats,
1104                                  gettext_noop ("# reserved"),
1105                                  reserved,
1106                                  GNUNET_NO);
1107         }
1108     }
1109   if (GNUNET_YES == GNUNET_CONTAINER_bloomfilter_test (filter,
1110                                                        &dm->key))
1111     {
1112       pc = GNUNET_malloc (sizeof (struct PutContext) + size + sizeof (struct DataMessage));
1113       pc->client = client;
1114       GNUNET_SERVER_client_keep (client);
1115       memcpy (&pc[1], dm, size + sizeof (struct DataMessage));
1116       plugin->api->get (plugin->api->cls,
1117                         &dm->key,
1118                         NULL,
1119                         ntohl (dm->type),
1120                         &check_present,
1121                         pc);      
1122       return;
1123     }
1124   execute_put (client, dm);
1125 }
1126
1127
1128 /**
1129  * Handle GET-message.
1130  *
1131  * @param cls closure
1132  * @param client identification of the client
1133  * @param message the actual message
1134  */
1135 static void
1136 handle_get (void *cls,
1137             struct GNUNET_SERVER_Client *client,
1138             const struct GNUNET_MessageHeader *message)
1139 {
1140   const struct GetMessage *msg;
1141   uint16_t size;
1142
1143   size = ntohs(message->size);
1144   if ( (size != sizeof(struct GetMessage)) &&
1145        (size != sizeof(struct GetMessage) - sizeof(GNUNET_HashCode)) )
1146     {
1147       GNUNET_break (0);
1148       GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1149       return;
1150     }
1151   msg = (const struct GetMessage*) message;
1152 #if DEBUG_DATASTORE
1153   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1154               "Processing `%s' request for `%s' of type %u\n",
1155               "GET",
1156               GNUNET_h2s (&msg->key),
1157               ntohl (msg->type));
1158 #endif
1159   GNUNET_STATISTICS_update (stats,
1160                             gettext_noop ("# GET requests received"),
1161                             1,
1162                             GNUNET_NO);
1163   GNUNET_SERVER_client_keep (client);
1164   if ( (size == sizeof(struct GetMessage)) &&
1165        (GNUNET_YES != GNUNET_CONTAINER_bloomfilter_test (filter,
1166                                                          &msg->key)) )
1167     {
1168       /* don't bother database... */
1169 #if DEBUG_DATASTORE
1170       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1171                   "Empty result set for `%s' request for `%s' (bloomfilter).\n",
1172                   "GET",
1173                   GNUNET_h2s (&msg->key));
1174 #endif  
1175       GNUNET_STATISTICS_update (stats,
1176                                 gettext_noop ("# requests filtered by bloomfilter"),
1177                                 1,
1178                                 GNUNET_NO);
1179       transmit_item (client,
1180                      NULL, NULL, 0, NULL, 0, 0, 0, 
1181                      GNUNET_TIME_UNIT_ZERO_ABS, 0);
1182       return;
1183     }
1184   plugin->api->get (plugin->api->cls,
1185                     ((size == sizeof(struct GetMessage)) ? &msg->key : NULL),
1186                     NULL,
1187                     ntohl(msg->type),
1188                     &transmit_item,
1189                     client);    
1190 }
1191
1192
1193 /**
1194  * Handle UPDATE-message.
1195  *
1196  * @param cls closure
1197  * @param client identification of the client
1198  * @param message the actual message
1199  */
1200 static void
1201 handle_update (void *cls,
1202                struct GNUNET_SERVER_Client *client,
1203                const struct GNUNET_MessageHeader *message)
1204 {
1205   const struct UpdateMessage *msg;
1206   int ret;
1207   char *emsg;
1208
1209   GNUNET_STATISTICS_update (stats,
1210                             gettext_noop ("# UPDATE requests received"),
1211                             1,
1212                             GNUNET_NO);
1213   msg = (const struct UpdateMessage*) message;
1214   emsg = NULL;
1215 #if DEBUG_DATASTORE
1216   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1217               "Processing `%s' request for %llu\n",
1218               "UPDATE",
1219               (unsigned long long) GNUNET_ntohll (msg->uid));
1220 #endif
1221   ret = plugin->api->update (plugin->api->cls,
1222                              GNUNET_ntohll(msg->uid),
1223                              (int32_t) ntohl(msg->priority),
1224                              GNUNET_TIME_absolute_ntoh(msg->expiration),
1225                              &emsg);
1226   transmit_status (client, ret, emsg);
1227   GNUNET_free_non_null (emsg);
1228 }
1229
1230
1231 /**
1232  * Handle GET_RANDOM-message.
1233  *
1234  * @param cls closure
1235  * @param client identification of the client
1236  * @param message the actual message
1237  */
1238 static void
1239 handle_get_random (void *cls,
1240                    struct GNUNET_SERVER_Client *client,
1241                    const struct GNUNET_MessageHeader *message)
1242 {
1243 #if DEBUG_DATASTORE
1244   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1245               "Processing `%s' request\n",
1246               "GET_RANDOM");
1247 #endif
1248   GNUNET_STATISTICS_update (stats,
1249                             gettext_noop ("# GET RANDOM requests received"),
1250                             1,
1251                             GNUNET_NO);
1252   GNUNET_SERVER_client_keep (client);
1253   plugin->api->iter_migration_order (plugin->api->cls,
1254                                      GNUNET_BLOCK_TYPE_ANY,
1255                                      &transmit_item,
1256                                      client);  
1257 }
1258
1259 /**
1260  * Handle GET_ZERO_ANONYMITY-message.
1261  *
1262  * @param cls closure
1263  * @param client identification of the client
1264  * @param message the actual message
1265  */
1266 static void
1267 handle_get_zero_anonymity (void *cls,
1268                            struct GNUNET_SERVER_Client *client,
1269                            const struct GNUNET_MessageHeader *message)
1270 {
1271   const struct GetZeroAnonymityMessage * msg = (const struct GetZeroAnonymityMessage*) message;
1272   enum GNUNET_BLOCK_Type type;
1273
1274   type = (enum GNUNET_BLOCK_Type) ntohl (msg->type);
1275 #if DEBUG_DATASTORE
1276   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1277               "Processing `%s' request\n",
1278               "GET_ZERO_ANONYMITY");
1279 #endif
1280   GNUNET_STATISTICS_update (stats,
1281                             gettext_noop ("# GET ZERO ANONYMITY requests received"),
1282                             1,
1283                             GNUNET_NO);
1284   GNUNET_SERVER_client_keep (client);
1285   plugin->api->iter_zero_anonymity (plugin->api->cls,
1286                                     type,
1287                                     &transmit_item,
1288                                     client);  
1289 }
1290
1291
1292 /**
1293  * Context for the 'remove_callback'.
1294  */
1295 struct RemoveContext 
1296 {
1297   /**
1298    * Client for whom we're doing the remvoing.
1299    */
1300   struct GNUNET_SERVER_Client *client;
1301
1302   /**
1303    * GNUNET_YES if we managed to remove something.
1304    */
1305   int found;
1306 };
1307
1308
1309 /**
1310  * Callback function that will cause the item that is passed
1311  * in to be deleted (by returning GNUNET_NO).
1312  */
1313 static int
1314 remove_callback (void *cls,
1315                  void *next_cls,
1316                  const GNUNET_HashCode * key,
1317                  uint32_t size,
1318                  const void *data,
1319                  enum GNUNET_BLOCK_Type type,
1320                  uint32_t priority,
1321                  uint32_t anonymity,
1322                  struct GNUNET_TIME_Absolute
1323                  expiration, uint64_t uid)
1324 {
1325   struct RemoveContext *rc = cls;
1326
1327   if (key == NULL)
1328     {
1329 #if DEBUG_DATASTORE
1330       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1331                   "No further matches for `%s' request.\n",
1332                   "REMOVE");
1333 #endif  
1334       if (GNUNET_YES == rc->found)
1335         transmit_status (rc->client, GNUNET_OK, NULL);       
1336       else
1337         transmit_status (rc->client, GNUNET_NO, _("Content not found"));        
1338       GNUNET_SERVER_client_drop (rc->client);
1339       GNUNET_free (rc);
1340       return GNUNET_OK; /* last item */
1341     }
1342   rc->found = GNUNET_YES;
1343 #if DEBUG_DATASTORE
1344   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1345               "Item %llu matches `%s' request for key `%s' and type %u.\n",
1346               (unsigned long long) uid,
1347               "REMOVE",
1348               GNUNET_h2s (key),
1349               type);
1350 #endif  
1351   GNUNET_STATISTICS_update (stats,
1352                             gettext_noop ("# bytes removed (explicit request)"),
1353                             size,
1354                             GNUNET_YES);
1355   GNUNET_CONTAINER_bloomfilter_remove (filter,
1356                                        key);
1357   plugin->api->next_request (next_cls, GNUNET_YES);
1358   return GNUNET_NO;
1359 }
1360
1361
1362 /**
1363  * Handle REMOVE-message.
1364  *
1365  * @param cls closure
1366  * @param client identification of the client
1367  * @param message the actual message
1368  */
1369 static void
1370 handle_remove (void *cls,
1371              struct GNUNET_SERVER_Client *client,
1372              const struct GNUNET_MessageHeader *message)
1373 {
1374   const struct DataMessage *dm = check_data (message);
1375   GNUNET_HashCode vhash;
1376   struct RemoveContext *rc;
1377
1378   if (dm == NULL)
1379     {
1380       GNUNET_break (0);
1381       GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1382       return;
1383     }
1384 #if DEBUG_DATASTORE
1385   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1386               "Processing `%s' request for `%s' of type %u\n",
1387               "REMOVE",
1388               GNUNET_h2s (&dm->key),
1389               ntohl (dm->type));
1390 #endif
1391   GNUNET_STATISTICS_update (stats,
1392                             gettext_noop ("# REMOVE requests received"),
1393                             1,
1394                             GNUNET_NO);
1395   rc = GNUNET_malloc (sizeof(struct RemoveContext));
1396   GNUNET_SERVER_client_keep (client);
1397   rc->client = client;
1398   GNUNET_CRYPTO_hash (&dm[1],
1399                       ntohl(dm->size),
1400                       &vhash);
1401   plugin->api->get (plugin->api->cls,
1402                     &dm->key,
1403                     &vhash,
1404                     (enum GNUNET_BLOCK_Type) ntohl(dm->type),
1405                     &remove_callback,
1406                     rc);
1407 }
1408
1409
1410 /**
1411  * Handle DROP-message.
1412  *
1413  * @param cls closure
1414  * @param client identification of the client
1415  * @param message the actual message
1416  */
1417 static void
1418 handle_drop (void *cls,
1419              struct GNUNET_SERVER_Client *client,
1420              const struct GNUNET_MessageHeader *message)
1421 {
1422 #if DEBUG_DATASTORE
1423   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1424               "Processing `%s' request\n",
1425               "DROP");
1426 #endif
1427   plugin->api->drop (plugin->api->cls);
1428   GNUNET_SERVER_receive_done (client, GNUNET_OK);
1429 }
1430
1431
1432 /**
1433  * Function called by plugins to notify us about a
1434  * change in their disk utilization.
1435  *
1436  * @param cls closure (NULL)
1437  * @param delta change in disk utilization, 
1438  *        0 for "reset to empty"
1439  */
1440 static void
1441 disk_utilization_change_cb (void *cls,
1442                             int delta)
1443 {
1444   if ( (delta < 0) &&
1445        (payload < -delta) )
1446     {
1447       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1448                   _("Datastore payload inaccurate (%lld < %lld).  Trying to fix.\n"),
1449                   (long long) payload,
1450                   (long long) -delta);
1451       payload = plugin->api->get_size (plugin->api->cls);
1452       sync_stats ();
1453       return;
1454     }
1455   payload += delta;
1456   lastSync++;
1457   if (lastSync >= MAX_STAT_SYNC_LAG)
1458     sync_stats ();
1459 }
1460
1461
1462 /**
1463  * Callback function to process statistic values.
1464  *
1465  * @param cls closure (struct Plugin*)
1466  * @param subsystem name of subsystem that created the statistic
1467  * @param name the name of the datum
1468  * @param value the current value
1469  * @param is_persistent GNUNET_YES if the value is persistent, GNUNET_NO if not
1470  * @return GNUNET_OK to continue, GNUNET_SYSERR to abort iteration
1471  */
1472 static int
1473 process_stat_in (void *cls,
1474                  const char *subsystem,
1475                  const char *name,
1476                  uint64_t value,
1477                  int is_persistent)
1478 {
1479   GNUNET_assert (stats_worked == GNUNET_NO);
1480   stats_worked = GNUNET_YES;
1481   payload += value;
1482 #if DEBUG_SQLITE
1483   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1484               "Notification from statistics about existing payload (%llu), new payload is %llu\n",
1485               abs_value,
1486               payload);
1487 #endif
1488   return GNUNET_OK;
1489 }
1490
1491
1492 static void
1493 process_stat_done (void *cls,
1494                    int success)
1495 {
1496   struct DatastorePlugin *plugin = cls;
1497
1498   stat_get = NULL;
1499   if (stats_worked == GNUNET_NO) 
1500     payload = plugin->api->get_size (plugin->api->cls);
1501 }
1502
1503
1504 /**
1505  * Load the datastore plugin.
1506  */
1507 static struct DatastorePlugin *
1508 load_plugin () 
1509 {
1510   struct DatastorePlugin *ret;
1511   char *libname;
1512   char *name;
1513
1514   if (GNUNET_OK !=
1515       GNUNET_CONFIGURATION_get_value_string (cfg,
1516                                              "DATASTORE", "DATABASE", &name))
1517     {
1518       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1519                   _("No `%s' specified for `%s' in configuration!\n"),
1520                   "DATABASE",
1521                   "DATASTORE");
1522       return NULL;
1523     }
1524   ret = GNUNET_malloc (sizeof(struct DatastorePlugin));
1525   ret->env.cfg = cfg;
1526   ret->env.duc = &disk_utilization_change_cb;
1527   ret->env.cls = NULL;
1528   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1529               _("Loading `%s' datastore plugin\n"), name);
1530   GNUNET_asprintf (&libname, "libgnunet_plugin_datastore_%s", name);
1531   ret->short_name = name;
1532   ret->lib_name = libname;
1533   ret->api = GNUNET_PLUGIN_load (libname, &ret->env);
1534   if (ret->api == NULL)
1535     {
1536       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1537                   _("Failed to load datastore plugin for `%s'\n"), name);
1538       GNUNET_free (ret->short_name);
1539       GNUNET_free (libname);
1540       GNUNET_free (ret);
1541       return NULL;
1542     }
1543   return ret;
1544 }
1545
1546
1547 /**
1548  * Function called when the service shuts
1549  * down.  Unloads our datastore plugin.
1550  *
1551  * @param plug plugin to unload
1552  */
1553 static void
1554 unload_plugin (struct DatastorePlugin *plug)
1555 {
1556 #if DEBUG_DATASTORE
1557   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1558               "Datastore service is unloading plugin...\n");
1559 #endif
1560   GNUNET_break (NULL == GNUNET_PLUGIN_unload (plug->lib_name, plug->api));
1561   GNUNET_free (plug->lib_name);
1562   GNUNET_free (plug->short_name);
1563   GNUNET_free (plug);
1564 }
1565
1566
1567 /**
1568  * Final task run after shutdown.  Unloads plugins and disconnects us from
1569  * statistics.
1570  */
1571 static void
1572 unload_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1573 {
1574   unload_plugin (plugin);
1575   plugin = NULL;
1576   if (filter != NULL)
1577     {
1578       GNUNET_CONTAINER_bloomfilter_free (filter);
1579       filter = NULL;
1580     }
1581   if (lastSync > 0)
1582     sync_stats ();
1583   if (stat_get != NULL)
1584     {
1585       GNUNET_STATISTICS_get_cancel (stat_get);
1586       stat_get = NULL;
1587     }
1588   if (stats != NULL)
1589     {
1590       GNUNET_STATISTICS_destroy (stats, GNUNET_YES);
1591       stats = NULL;
1592     }
1593 }
1594
1595
1596 /**
1597  * Last task run during shutdown.  Disconnects us from
1598  * the transport and core.
1599  */
1600 static void
1601 cleaning_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1602 {
1603   struct TransmitCallbackContext *tcc;
1604
1605   cleaning_done = GNUNET_YES;
1606   while (NULL != (tcc = tcc_head))
1607     {
1608       GNUNET_CONTAINER_DLL_remove (tcc_head,
1609                                    tcc_tail,
1610                                    tcc);
1611       if (tcc->th != NULL)
1612         {
1613           GNUNET_CONNECTION_notify_transmit_ready_cancel (tcc->th);
1614           GNUNET_SERVER_client_drop (tcc->client);
1615         }
1616       if (NULL != tcc->tc)
1617         tcc->tc (tcc->tc_cls, GNUNET_SYSERR);
1618       GNUNET_free (tcc->msg);
1619       GNUNET_free (tcc);
1620     }
1621   if (expired_kill_task != GNUNET_SCHEDULER_NO_TASK)
1622     {
1623       GNUNET_SCHEDULER_cancel (expired_kill_task);
1624       expired_kill_task = GNUNET_SCHEDULER_NO_TASK;
1625     }
1626   GNUNET_SCHEDULER_add_continuation (&unload_task,
1627                                      NULL,
1628                                      GNUNET_SCHEDULER_REASON_PREREQ_DONE);
1629 }
1630
1631
1632 /**
1633  * Function that removes all active reservations made
1634  * by the given client and releases the space for other
1635  * requests.
1636  *
1637  * @param cls closure
1638  * @param client identification of the client
1639  */
1640 static void
1641 cleanup_reservations (void *cls,
1642                       struct GNUNET_SERVER_Client
1643                       * client)
1644 {
1645   struct ReservationList *pos;
1646   struct ReservationList *prev;
1647   struct ReservationList *next;
1648
1649   if (client == NULL)
1650     return;
1651   prev = NULL;
1652   pos = reservations;
1653   while (NULL != pos)
1654     {
1655       next = pos->next;
1656       if (pos->client == client)
1657         {
1658           if (prev == NULL)
1659             reservations = next;
1660           else
1661             prev->next = next;
1662           reserved -= pos->amount + pos->entries * GNUNET_DATASTORE_ENTRY_OVERHEAD;
1663           GNUNET_free (pos);
1664         }
1665       else
1666         {
1667           prev = pos;
1668         }
1669       pos = next;
1670     }
1671   GNUNET_STATISTICS_set (stats,
1672                          gettext_noop ("# reserved"),
1673                          reserved,
1674                          GNUNET_NO);
1675 }
1676
1677
1678 /**
1679  * Process datastore requests.
1680  *
1681  * @param cls closure
1682  * @param server the initialized server
1683  * @param c configuration to use
1684  */
1685 static void
1686 run (void *cls,
1687      struct GNUNET_SERVER_Handle *server,
1688      const struct GNUNET_CONFIGURATION_Handle *c)
1689 {
1690   static const struct GNUNET_SERVER_MessageHandler handlers[] = {
1691     {&handle_reserve, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_RESERVE, 
1692      sizeof(struct ReserveMessage) }, 
1693     {&handle_release_reserve, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_RELEASE_RESERVE, 
1694      sizeof(struct ReleaseReserveMessage) }, 
1695     {&handle_put, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_PUT, 0 }, 
1696     {&handle_update, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_UPDATE, 
1697      sizeof (struct UpdateMessage) }, 
1698     {&handle_get, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_GET, 0 }, 
1699     {&handle_get_random, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_GET_RANDOM, 
1700      sizeof(struct GNUNET_MessageHeader) }, 
1701     {&handle_get_zero_anonymity, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_GET_ZERO_ANONYMITY, 
1702      sizeof(struct GetZeroAnonymityMessage) }, 
1703     {&handle_remove, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE, 0 }, 
1704     {&handle_drop, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_DROP, 
1705      sizeof(struct GNUNET_MessageHeader) }, 
1706     {NULL, NULL, 0, 0}
1707   };
1708   char *fn;
1709   unsigned int bf_size;
1710
1711   cfg = c;
1712   if (GNUNET_OK !=
1713       GNUNET_CONFIGURATION_get_value_number (cfg,
1714                                              "DATASTORE", "QUOTA", &quota))
1715     {
1716       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1717                   _("No `%s' specified for `%s' in configuration!\n"),
1718                   "QUOTA",
1719                   "DATASTORE");
1720       return;
1721     }
1722   stats = GNUNET_STATISTICS_create ("datastore", cfg);
1723   GNUNET_STATISTICS_set (stats,
1724                          gettext_noop ("# quota"),
1725                          quota,
1726                          GNUNET_NO);
1727   cache_size = quota / 8; /* Or should we make this an option? */
1728   GNUNET_STATISTICS_set (stats,
1729                          gettext_noop ("# cache size"),
1730                          cache_size,
1731                          GNUNET_NO);
1732   bf_size = quota / 32; /* 8 bit per entry, 1 bit per 32 kb in DB */
1733   fn = NULL;
1734   if ( (GNUNET_OK !=
1735         GNUNET_CONFIGURATION_get_value_filename (cfg,
1736                                                  "DATASTORE",
1737                                                  "BLOOMFILTER",
1738                                                  &fn)) ||
1739        (GNUNET_OK !=
1740         GNUNET_DISK_directory_create_for_file (fn)) )
1741     {
1742       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1743                   _("Could not use specified filename `%s' for bloomfilter.\n"),
1744                   fn != NULL ? fn : "");
1745       GNUNET_free_non_null (fn);
1746       fn = NULL;
1747     }
1748   filter = GNUNET_CONTAINER_bloomfilter_load (fn, bf_size, 5);  /* approx. 3% false positives at max use */  
1749   GNUNET_free_non_null (fn);
1750   if (filter == NULL)
1751     {
1752       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1753                   _("Failed to initialize bloomfilter.\n"));
1754       if (stats != NULL)
1755         {
1756           GNUNET_STATISTICS_destroy (stats, GNUNET_YES);
1757           stats = NULL;
1758         }
1759       return;
1760     }
1761   plugin = load_plugin ();
1762   if (NULL == plugin)
1763     {
1764       GNUNET_CONTAINER_bloomfilter_free (filter);
1765       filter = NULL;
1766       if (stats != NULL)
1767         {
1768           GNUNET_STATISTICS_destroy (stats, GNUNET_YES);
1769           stats = NULL;
1770         }
1771       return;
1772     }
1773   stat_get = GNUNET_STATISTICS_get (stats,
1774                                     "datastore",
1775                                     QUOTA_STAT_NAME,
1776                                     GNUNET_TIME_UNIT_SECONDS,
1777                                     &process_stat_done,
1778                                     &process_stat_in,
1779                                     plugin);
1780   GNUNET_SERVER_disconnect_notify (server, &cleanup_reservations, NULL);
1781   GNUNET_SERVER_add_handlers (server, handlers);
1782   expired_kill_task
1783     = GNUNET_SCHEDULER_add_with_priority (GNUNET_SCHEDULER_PRIORITY_IDLE,
1784                                           &delete_expired, NULL);
1785   GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL,
1786                                 &cleaning_task, NULL);
1787 }
1788
1789
1790 /**
1791  * The main function for the datastore service.
1792  *
1793  * @param argc number of arguments from the command line
1794  * @param argv command line arguments
1795  * @return 0 ok, 1 on error
1796  */
1797 int
1798 main (int argc, char *const *argv)
1799 {
1800   int ret;
1801
1802   ret = (GNUNET_OK ==
1803          GNUNET_SERVICE_run (argc,
1804                              argv,
1805                              "datastore",
1806                              GNUNET_SERVICE_OPTION_NONE,
1807                              &run, NULL)) ? 0 : 1;
1808   return ret;
1809 }
1810
1811
1812 /* end of gnunet-service-datastore.c */