fix
[oweals/gnunet.git] / src / datastore / gnunet-service-datastore.c
1 /*
2      This file is part of GNUnet
3      (C) 2004, 2005, 2006, 2007, 2009 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 2, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file datastore/gnunet-service-datastore.c
23  * @brief Management for the datastore for files stored on a GNUnet node
24  * @author Christian Grothoff
25  */
26
27 #include "platform.h"
28 #include "gnunet_util_lib.h"
29 #include "gnunet_protocols.h"
30 #include "gnunet_statistics_service.h"
31 #include "gnunet_datastore_plugin.h"
32 #include "datastore.h"
33
34 /**
35  * How many messages do we queue at most per client?
36  */
37 #define MAX_PENDING 1024
38
39 /**
40  * How long are we at most keeping "expired" content
41  * past the expiration date in the database?
42  */
43 #define MAX_EXPIRE_DELAY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 15)
44
45 /**
46  * How fast are we allowed to query the database for deleting
47  * expired content? (1 item per second).
48  */
49 #define MIN_EXPIRE_DELAY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 1)
50
51
52 #define QUOTA_STAT_NAME gettext_noop ("# bytes used in file-sharing datastore")
53
54 /**
55  * After how many payload-changing operations
56  * do we sync our statistics?
57  */
58 #define MAX_STAT_SYNC_LAG 50
59
60
61 /**
62  * Our datastore plugin.
63  */
64 struct DatastorePlugin
65 {
66
67   /**
68    * API of the transport as returned by the plugin's
69    * initialization function.
70    */
71   struct GNUNET_DATASTORE_PluginFunctions *api;
72
73   /**
74    * Short name for the plugin (i.e. "sqlite").
75    */
76   char *short_name;
77
78   /**
79    * Name of the library (i.e. "gnunet_plugin_datastore_sqlite").
80    */
81   char *lib_name;
82
83   /**
84    * Environment this transport service is using
85    * for this plugin.
86    */
87   struct GNUNET_DATASTORE_PluginEnvironment env;
88
89 };
90
91
92 /**
93  * Linked list of active reservations.
94  */
95 struct ReservationList 
96 {
97
98   /**
99    * This is a linked list.
100    */
101   struct ReservationList *next;
102
103   /**
104    * Client that made the reservation.
105    */
106   struct GNUNET_SERVER_Client *client;
107
108   /**
109    * Number of bytes (still) reserved.
110    */
111   uint64_t amount;
112
113   /**
114    * Number of items (still) reserved.
115    */
116   uint64_t entries;
117
118   /**
119    * Reservation identifier.
120    */
121   int32_t rid;
122
123 };
124
125
126
127 /**
128  * Our datastore plugin (NULL if not available).
129  */
130 static struct DatastorePlugin *plugin;
131
132 /**
133  * Linked list of space reservations made by clients.
134  */
135 static struct ReservationList *reservations;
136
137 /**
138  * Bloomfilter to quickly tell if we don't have the content.
139  */
140 static struct GNUNET_CONTAINER_BloomFilter *filter;
141
142 /**
143  * How much space are we allowed to use?
144  */
145 static unsigned long long quota;
146
147 /**
148  * Should the database be dropped on exit?
149  */
150 static int do_drop;
151
152 /**
153  * How much space are we using for the cache?  (space available for
154  * insertions that will be instantly reclaimed by discarding less
155  * important content --- or possibly whatever we just inserted into
156  * the "cache").
157  */
158 static unsigned long long cache_size;
159
160 /**
161  * How much space have we currently reserved?
162  */
163 static unsigned long long reserved;
164   
165 /**
166  * How much data are we currently storing
167  * in the database?
168  */
169 static unsigned long long payload;
170
171 /**
172  * Number of updates that were made to the
173  * payload value since we last synchronized
174  * it with the statistics service.
175  */
176 static unsigned int lastSync;
177
178 /**
179  * Did we get an answer from statistics?
180  */
181 static int stats_worked;
182
183 /**
184  * Identity of the task that is used to delete
185  * expired content.
186  */
187 static GNUNET_SCHEDULER_TaskIdentifier expired_kill_task;
188
189 /**
190  * Our configuration.
191  */
192 const struct GNUNET_CONFIGURATION_Handle *cfg;
193
194
195 /**
196  * Handle for reporting statistics.
197  */
198 static struct GNUNET_STATISTICS_Handle *stats;
199
200
201 /**
202  * Synchronize our utilization statistics with the 
203  * statistics service.
204  */
205 static void 
206 sync_stats ()
207 {
208   GNUNET_STATISTICS_set (stats,
209                          QUOTA_STAT_NAME,
210                          payload,
211                          GNUNET_YES);
212   lastSync = 0;
213 }
214
215
216
217 /**
218  * Context for transmitting replies to clients.
219  */
220 struct TransmitCallbackContext 
221 {
222   
223   /**
224    * We keep these in a doubly-linked list (for cleanup).
225    */
226   struct TransmitCallbackContext *next;
227   
228   /**
229    * We keep these in a doubly-linked list (for cleanup).
230    */
231   struct TransmitCallbackContext *prev;
232   
233   /**
234    * The message that we're asked to transmit.
235    */
236   struct GNUNET_MessageHeader *msg;
237   
238   /**
239    * Handle for the transmission request.
240    */
241   struct GNUNET_CONNECTION_TransmitHandle *th;
242
243   /**
244    * Client that we are transmitting to.
245    */
246   struct GNUNET_SERVER_Client *client;
247
248 };
249
250   
251 /**
252  * Head of the doubly-linked list (for cleanup).
253  */
254 static struct TransmitCallbackContext *tcc_head;
255
256 /**
257  * Tail of the doubly-linked list (for cleanup).
258  */
259 static struct TransmitCallbackContext *tcc_tail;
260
261 /**
262  * Have we already cleaned up the TCCs and are hence no longer
263  * willing (or able) to transmit anything to anyone?
264  */
265 static int cleaning_done;
266
267 /**
268  * Handle for pending get request.
269  */
270 static struct GNUNET_STATISTICS_GetHandle *stat_get;
271
272
273 /**
274  * Task that is used to remove expired entries from
275  * the datastore.  This task will schedule itself
276  * again automatically to always delete all expired
277  * content quickly.
278  *
279  * @param cls not used
280  * @param tc task context
281  */ 
282 static void
283 delete_expired (void *cls,
284                 const struct GNUNET_SCHEDULER_TaskContext *tc);
285
286
287 /**
288  * Iterate over the expired items stored in the datastore.
289  * Delete all expired items; once we have processed all
290  * expired items, re-schedule the "delete_expired" task.
291  *
292  * @param cls not used
293  * @param next_cls closure to pass to the "next" function.
294  * @param key key for the content
295  * @param size number of bytes in data
296  * @param data content stored
297  * @param type type of the content
298  * @param priority priority of the content
299  * @param anonymity anonymity-level for the content
300  * @param expiration expiration time for the content
301  * @param uid unique identifier for the datum;
302  *        maybe 0 if no unique identifier is available
303  *
304  * @return GNUNET_SYSERR to abort the iteration, GNUNET_OK to continue
305  *         (continue on call to "next", of course),
306  *         GNUNET_NO to delete the item and continue (if supported)
307  */
308 static int 
309 expired_processor (void *cls,
310                    const GNUNET_HashCode * key,
311                    uint32_t size,
312                    const void *data,
313                    enum GNUNET_BLOCK_Type type,
314                    uint32_t priority,
315                    uint32_t anonymity,
316                    struct GNUNET_TIME_Absolute
317                    expiration, 
318                    uint64_t uid)
319 {
320   struct GNUNET_TIME_Absolute now;
321
322   if (key == NULL) 
323     {
324       expired_kill_task 
325         = GNUNET_SCHEDULER_add_delayed (MAX_EXPIRE_DELAY,
326                                         &delete_expired,
327                                         NULL);
328       return GNUNET_SYSERR;
329     }
330   now = GNUNET_TIME_absolute_get ();
331   if (expiration.abs_value > now.abs_value)
332     {
333       /* finished processing */
334       expired_kill_task 
335         = GNUNET_SCHEDULER_add_delayed (MAX_EXPIRE_DELAY,
336                                         &delete_expired,
337                                         NULL);
338       return GNUNET_SYSERR;
339     }
340 #if DEBUG_DATASTORE
341   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
342               "Deleting content `%s' of type %u that expired %llu ms ago\n",
343               GNUNET_h2s (key),
344               type,
345               (unsigned long long) (now.abs_value - expiration.abs_value));
346 #endif
347   GNUNET_STATISTICS_update (stats,
348                             gettext_noop ("# bytes expired"),
349                             size,
350                             GNUNET_YES);
351   GNUNET_CONTAINER_bloomfilter_remove (filter,
352                                        key);
353   expired_kill_task 
354     = GNUNET_SCHEDULER_add_delayed (MIN_EXPIRE_DELAY,
355                                     &delete_expired,
356                                     NULL);
357   return GNUNET_NO;
358 }
359
360
361 /**
362  * Task that is used to remove expired entries from
363  * the datastore.  This task will schedule itself
364  * again automatically to always delete all expired
365  * content quickly.
366  *
367  * @param cls not used
368  * @param tc task context
369  */ 
370 static void
371 delete_expired (void *cls,
372                 const struct GNUNET_SCHEDULER_TaskContext *tc)
373 {
374   expired_kill_task = GNUNET_SCHEDULER_NO_TASK;
375   plugin->api->get_expiration (plugin->api->cls, 
376                                &expired_processor,
377                                NULL);
378 }
379
380
381 /**
382  * An iterator over a set of items stored in the datastore
383  * that deletes until we're happy with respect to our quota.
384  *
385  * @param cls closure
386  * @param next_cls closure to pass to the "next" function.
387  * @param key key for the content
388  * @param size number of bytes in data
389  * @param data content stored
390  * @param type type of the content
391  * @param priority priority of the content
392  * @param anonymity anonymity-level for the content
393  * @param expiration expiration time for the content
394  * @param uid unique identifier for the datum;
395  *        maybe 0 if no unique identifier is available
396  *
397  * @return GNUNET_SYSERR to abort the iteration, GNUNET_OK to continue
398  *         (continue on call to "next", of course),
399  *         GNUNET_NO to delete the item and continue (if supported)
400  */
401 static int 
402 quota_processor (void *cls,
403                  const GNUNET_HashCode * key,
404                  uint32_t size,
405                  const void *data,
406                  enum GNUNET_BLOCK_Type type,
407                  uint32_t priority,
408                  uint32_t anonymity,
409                  struct GNUNET_TIME_Absolute expiration, 
410                  uint64_t uid)
411 {
412   unsigned long long *need = cls;
413
414   if (NULL == key)
415     return GNUNET_SYSERR;    
416 #if DEBUG_DATASTORE
417   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
418               "Deleting %llu bytes of low-priority content `%s' of type %u (still trying to free another %llu bytes)\n",
419               (unsigned long long) (size + GNUNET_DATASTORE_ENTRY_OVERHEAD),
420               GNUNET_h2s (key),
421               type,
422               *need);
423 #endif
424   if (size + GNUNET_DATASTORE_ENTRY_OVERHEAD > *need)
425     *need = 0;
426   else
427     *need -= size + GNUNET_DATASTORE_ENTRY_OVERHEAD;
428   GNUNET_STATISTICS_update (stats,
429                             gettext_noop ("# bytes purged (low-priority)"),
430                             size,
431                             GNUNET_YES);
432   GNUNET_CONTAINER_bloomfilter_remove (filter,
433                                        key);
434   return GNUNET_NO;
435 }
436
437
438 /**
439  * Manage available disk space by running tasks
440  * that will discard content if necessary.  This
441  * function will be run whenever a request for
442  * "need" bytes of storage could only be satisfied
443  * by eating into the "cache" (and we want our cache
444  * space back).
445  *
446  * @param need number of bytes of content that were
447  *        placed into the "cache" (and hence the
448  *        number of bytes that should be removed).
449  */
450 static void
451 manage_space (unsigned long long need)
452 {
453   unsigned long long last;
454
455 #if DEBUG_DATASTORE
456   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
457               "Asked to free up %llu bytes of cache space\n",
458               need);
459 #endif
460   last = 0;
461   while ( (need > 0) &&
462           (last != need) )
463     {
464       last = need;
465       plugin->api->get_expiration (plugin->api->cls,
466                                    &quota_processor,
467                                    &need);    
468     }
469 }
470
471
472 /**
473  * Function called to notify a client about the socket
474  * begin ready to queue more data.  "buf" will be
475  * NULL and "size" zero if the socket was closed for
476  * writing in the meantime.
477  *
478  * @param cls closure
479  * @param size number of bytes available in buf
480  * @param buf where the callee should write the message
481  * @return number of bytes written to buf
482  */
483 static size_t
484 transmit_callback (void *cls,
485                    size_t size, void *buf)
486 {
487   struct TransmitCallbackContext *tcc = cls;
488   size_t msize;
489   
490   tcc->th = NULL;
491   GNUNET_CONTAINER_DLL_remove (tcc_head,
492                                tcc_tail,
493                                tcc);
494   msize = ntohs(tcc->msg->size);
495   if (size == 0)
496     {
497       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
498                   _("Transmission to client failed!\n"));
499       GNUNET_SERVER_receive_done (tcc->client, GNUNET_SYSERR);       
500       GNUNET_SERVER_client_drop (tcc->client);
501       GNUNET_free (tcc->msg);
502       GNUNET_free (tcc);
503       return 0;
504     }
505   GNUNET_assert (size >= msize);
506   memcpy (buf, tcc->msg, msize);
507   GNUNET_SERVER_receive_done (tcc->client, GNUNET_OK);
508   GNUNET_SERVER_client_drop (tcc->client);
509   GNUNET_free (tcc->msg);
510   GNUNET_free (tcc);
511   return msize;
512 }
513
514
515 /**
516  * Transmit the given message to the client.
517  *
518  * @param client target of the message
519  * @param msg message to transmit, will be freed!
520  * @param tc function to call afterwards
521  * @param tc_cls closure for tc
522  */
523 static void
524 transmit (struct GNUNET_SERVER_Client *client,
525           struct GNUNET_MessageHeader *msg)
526 {
527   struct TransmitCallbackContext *tcc;
528
529   if (GNUNET_YES == cleaning_done)
530     {
531 #if DEBUG_DATASTORE
532       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
533                   "Shutdown in progress, aborting transmission.\n");
534 #endif
535       GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
536       GNUNET_free (msg);
537       return;
538     }
539   tcc = GNUNET_malloc (sizeof(struct TransmitCallbackContext));
540   tcc->msg = msg;
541   tcc->client = client;
542   if (NULL ==
543       (tcc->th = GNUNET_SERVER_notify_transmit_ready (client,
544                                                       ntohs(msg->size),
545                                                       GNUNET_TIME_UNIT_FOREVER_REL,
546                                                       &transmit_callback,
547                                                       tcc)))
548     {
549       GNUNET_break (0);
550       GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
551       GNUNET_free (msg);
552       GNUNET_free (tcc);
553       return;
554     }
555   GNUNET_SERVER_client_keep (client);
556   GNUNET_CONTAINER_DLL_insert (tcc_head,
557                                tcc_tail,
558                                tcc);
559 }
560
561
562 /**
563  * Transmit a status code to the client.
564  *
565  * @param client receiver of the response
566  * @param code status code
567  * @param msg optional error message (can be NULL)
568  */
569 static void
570 transmit_status (struct GNUNET_SERVER_Client *client,
571                  int code,
572                  const char *msg)
573 {
574   struct StatusMessage *sm;
575   size_t slen;
576
577 #if DEBUG_DATASTORE
578   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
579               "Transmitting `%s' message with value %d and message `%s'\n",
580               "STATUS",
581               code,
582               msg != NULL ? msg : "(none)");
583 #endif
584   slen = (msg == NULL) ? 0 : strlen(msg) + 1;  
585   sm = GNUNET_malloc (sizeof(struct StatusMessage) + slen);
586   sm->header.size = htons(sizeof(struct StatusMessage) + slen);
587   sm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_STATUS);
588   sm->status = htonl(code);
589   if (slen > 0)
590     memcpy (&sm[1], msg, slen);  
591   transmit (client, &sm->header);
592 }
593
594
595
596 /**
597  * Function that will transmit the given datastore entry
598  * to the client.
599  *
600  * @param cls closure, pointer to the client (of type GNUNET_SERVER_Client).
601  * @param next_cls closure to use to ask for the next item
602  * @param key key for the content
603  * @param size number of bytes in data
604  * @param data content stored
605  * @param type type of the content
606  * @param priority priority of the content
607  * @param anonymity anonymity-level for the content
608  * @param expiration expiration time for the content
609  * @param uid unique identifier for the datum;
610  *        maybe 0 if no unique identifier is available
611  *
612  * @return GNUNET_SYSERR to abort the iteration, GNUNET_OK to continue,
613  *         GNUNET_NO to delete the item and continue (if supported)
614  */
615 static int
616 transmit_item (void *cls,
617                const GNUNET_HashCode * key,
618                uint32_t size,
619                const void *data,
620                enum GNUNET_BLOCK_Type type,
621                uint32_t priority,
622                uint32_t anonymity,
623                struct GNUNET_TIME_Absolute
624                expiration, uint64_t uid)
625 {
626   struct GNUNET_SERVER_Client *client = cls;
627   struct GNUNET_MessageHeader *end;
628   struct DataMessage *dm;
629
630   if (key == NULL)
631     {
632       /* transmit 'DATA_END' */
633 #if DEBUG_DATASTORE
634       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
635                   "Transmitting `%s' message\n",
636                   "DATA_END");
637 #endif
638       end = GNUNET_malloc (sizeof(struct GNUNET_MessageHeader));
639       end->size = htons(sizeof(struct GNUNET_MessageHeader));
640       end->type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END);
641       transmit (client, end);
642       GNUNET_SERVER_client_drop (client);
643       return GNUNET_OK;
644     }
645   GNUNET_assert (sizeof (struct DataMessage) + size < GNUNET_SERVER_MAX_MESSAGE_SIZE);
646   dm = GNUNET_malloc (sizeof(struct DataMessage) + size);
647   dm->header.size = htons(sizeof(struct DataMessage) + size);
648   dm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_DATA);
649   dm->rid = htonl(0);
650   dm->size = htonl(size);
651   dm->type = htonl(type);
652   dm->priority = htonl(priority);
653   dm->anonymity = htonl(anonymity);
654   dm->replication = htonl (0);
655   dm->reserved = htonl (0);
656   dm->expiration = GNUNET_TIME_absolute_hton(expiration);
657   dm->uid = GNUNET_htonll(uid);
658   dm->key = *key;
659   memcpy (&dm[1], data, size);
660 #if DEBUG_DATASTORE
661   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
662               "Transmitting `%s' message for `%s' of type %u with expiration %llu (now: %llu)\n",
663               "DATA",
664               GNUNET_h2s (key),
665               type,
666               (unsigned long long) expiration.abs_value,
667               (unsigned long long) GNUNET_TIME_absolute_get ().abs_value);
668 #endif
669   GNUNET_STATISTICS_update (stats,
670                             gettext_noop ("# results found"),
671                             1,
672                             GNUNET_NO);
673   transmit (client, &dm->header);
674   return GNUNET_OK;
675 }
676
677
678 /**
679  * Handle RESERVE-message.
680  *
681  * @param cls closure
682  * @param client identification of the client
683  * @param message the actual message
684  */
685 static void
686 handle_reserve (void *cls,
687                 struct GNUNET_SERVER_Client *client,
688                 const struct GNUNET_MessageHeader *message)
689 {
690   /**
691    * Static counter to produce reservation identifiers.
692    */
693   static int reservation_gen;
694
695   const struct ReserveMessage *msg = (const struct ReserveMessage*) message;
696   struct ReservationList *e;
697   unsigned long long used;
698   unsigned long long req;
699   uint64_t amount;
700   uint32_t entries;
701
702 #if DEBUG_DATASTORE
703   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
704               "Processing `%s' request\n",
705               "RESERVE");
706 #endif
707   amount = GNUNET_ntohll(msg->amount);
708   entries = ntohl(msg->entries);
709   used = payload + reserved;
710   req = amount + ((unsigned long long) GNUNET_DATASTORE_ENTRY_OVERHEAD) * entries;
711   if (used + req > quota)
712     {
713       if (quota < used)
714         used = quota; /* cheat a bit for error message (to avoid negative numbers) */
715       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
716                   _("Insufficient space (%llu bytes are available) to satisfy `%s' request for %llu bytes\n"),
717                   quota - used,
718                   "RESERVE",
719                   req);
720       if (cache_size < req)
721         {
722           /* TODO: document this in the FAQ; essentially, if this
723              message happens, the insertion request could be blocked
724              by less-important content from migration because it is
725              larger than 1/8th of the overall available space, and
726              we only reserve 1/8th for "fresh" insertions */
727           GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
728                       _("The requested amount (%llu bytes) is larger than the cache size (%llu bytes)\n"),
729                       req,
730                       cache_size);
731           transmit_status (client, 0, 
732                            gettext_noop ("Insufficient space to satisfy request and "
733                                          "requested amount is larger than cache size"));
734         }
735       else
736         {
737           transmit_status (client, 0, 
738                            gettext_noop ("Insufficient space to satisfy request"));
739         }
740       return;      
741     }
742   reserved += req;
743   GNUNET_STATISTICS_set (stats,
744                          gettext_noop ("# reserved"),
745                          reserved,
746                          GNUNET_NO);
747   e = GNUNET_malloc (sizeof(struct ReservationList));
748   e->next = reservations;
749   reservations = e;
750   e->client = client;
751   e->amount = amount;
752   e->entries = entries;
753   e->rid = ++reservation_gen;
754   if (reservation_gen < 0)
755     reservation_gen = 0; /* wrap around */
756   transmit_status (client, e->rid, NULL);
757 }
758
759
760 /**
761  * Handle RELEASE_RESERVE-message.
762  *
763  * @param cls closure
764  * @param client identification of the client
765  * @param message the actual message
766  */
767 static void
768 handle_release_reserve (void *cls,
769                         struct GNUNET_SERVER_Client *client,
770                         const struct GNUNET_MessageHeader *message)
771 {
772   const struct ReleaseReserveMessage *msg = (const struct ReleaseReserveMessage*) message;
773   struct ReservationList *pos;
774   struct ReservationList *prev;
775   struct ReservationList *next;
776   int rid = ntohl(msg->rid);
777   unsigned long long rem;
778
779 #if DEBUG_DATASTORE
780   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
781               "Processing `%s' request\n",
782               "RELEASE_RESERVE");
783 #endif
784   next = reservations;
785   prev = NULL;
786   while (NULL != (pos = next))
787     {
788       next = pos->next;
789       if (rid == pos->rid)
790         {
791           if (prev == NULL)
792             reservations = next;
793           else
794             prev->next = next;
795           rem = pos->amount + ((unsigned long long) GNUNET_DATASTORE_ENTRY_OVERHEAD) * pos->entries;
796           GNUNET_assert (reserved >= rem);
797           reserved -= rem;
798           GNUNET_STATISTICS_set (stats,
799                          gettext_noop ("# reserved"),
800                                  reserved,
801                                  GNUNET_NO);
802 #if DEBUG_DATASTORE
803           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
804                       "Returning %llu remaining reserved bytes to storage pool\n",
805                       rem);
806 #endif    
807           GNUNET_free (pos);
808           transmit_status (client, GNUNET_OK, NULL);
809           return;
810         }       
811       prev = pos;
812     }
813   GNUNET_break (0);
814   transmit_status (client, GNUNET_SYSERR, gettext_noop ("Could not find matching reservation"));
815 }
816
817
818 /**
819  * Check that the given message is a valid data message.
820  *
821  * @return NULL if the message is not well-formed, otherwise the message
822  */
823 static const struct DataMessage *
824 check_data (const struct GNUNET_MessageHeader *message)
825 {
826   uint16_t size;
827   uint32_t dsize;
828   const struct DataMessage *dm;
829
830   size = ntohs(message->size);
831   if (size < sizeof(struct DataMessage))
832     { 
833       GNUNET_break (0);
834       return NULL;
835     }
836   dm = (const struct DataMessage *) message;
837   dsize = ntohl(dm->size);
838   if (size != dsize + sizeof(struct DataMessage))
839     {
840       GNUNET_break (0);
841       return NULL;
842     }
843   return dm;
844 }
845
846
847 /**
848  * Context for a put request used to see if the content is
849  * already present.
850  */
851 struct PutContext
852 {
853   /**
854    * Client to notify on completion.
855    */
856   struct GNUNET_SERVER_Client *client;
857   
858   /* followed by the 'struct DataMessage' */
859 };
860
861
862 /**
863  * Actually put the data message.
864  */
865 static void
866 execute_put (struct GNUNET_SERVER_Client *client,
867              const struct DataMessage *dm)
868 {
869   uint32_t size;
870   char *msg;
871   int ret;
872
873   size = ntohl(dm->size);
874   msg = NULL;
875   ret = plugin->api->put (plugin->api->cls,
876                           &dm->key,
877                           size,
878                           &dm[1],
879                           ntohl(dm->type),
880                           ntohl(dm->priority),
881                           ntohl(dm->anonymity),
882                           ntohl(dm->replication),
883                           GNUNET_TIME_absolute_ntoh(dm->expiration),
884                           &msg);
885   if (GNUNET_OK == ret)
886     {
887       GNUNET_STATISTICS_update (stats,
888                                 gettext_noop ("# bytes stored"),
889                                 size,
890                                 GNUNET_YES);
891       GNUNET_CONTAINER_bloomfilter_add (filter,
892                                         &dm->key);
893 #if DEBUG_DATASTORE
894       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
895                   "Successfully stored %u bytes of type %u under key `%s'\n",
896                   size,
897                   ntohl(dm->type),
898                   GNUNET_h2s (&dm->key));
899 #endif
900     }
901   transmit_status (client, 
902                    ret,
903                    msg);
904   GNUNET_free_non_null (msg);
905   if (quota - reserved - cache_size < payload)
906     {
907       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
908                   _("Need %llu bytes more space (%llu allowed, using %llu)\n"),
909                   (unsigned long long) size + GNUNET_DATASTORE_ENTRY_OVERHEAD,
910                   (unsigned long long) (quota - reserved - cache_size),
911                   (unsigned long long) payload);
912       manage_space (size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
913     }
914 }
915
916
917 /**
918  * Function that will check if the given datastore entry
919  * matches the put and if none match executes the put.
920  *
921  * @param cls closure, pointer to the client (of type 'struct PutContext').
922  * @param key key for the content
923  * @param size number of bytes in data
924  * @param data content stored
925  * @param type type of the content
926  * @param priority priority of the content
927  * @param anonymity anonymity-level for the content
928  * @param expiration expiration time for the content
929  * @param uid unique identifier for the datum;
930  *        maybe 0 if no unique identifier is available
931  *
932  * @return GNUNET_OK usually
933  *         GNUNET_NO to delete the item 
934  */
935 static int
936 check_present (void *cls,
937                const GNUNET_HashCode * key,
938                uint32_t size,
939                const void *data,
940                enum GNUNET_BLOCK_Type type,
941                uint32_t priority,
942                uint32_t anonymity,
943                struct GNUNET_TIME_Absolute
944                expiration, uint64_t uid)
945 {
946   struct PutContext *pc = cls;
947   const struct DataMessage *dm;
948
949   dm = (const struct DataMessage*) &pc[1];
950   if (key == NULL)
951     {
952       execute_put (pc->client, dm);
953       GNUNET_SERVER_client_drop (pc->client);
954       GNUNET_free (pc);
955       return GNUNET_OK;
956     }
957   if ( (GNUNET_BLOCK_TYPE_FS_DBLOCK == type) ||
958        (GNUNET_BLOCK_TYPE_FS_IBLOCK == type) ||
959        ( (size == ntohl(dm->size)) &&
960          (0 == memcmp (&dm[1],
961                        data,
962                        size)) ) )
963     {
964 #if DEBUG_MYSQL
965       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
966                   "Result already present in datastore\n");
967 #endif
968       /* FIXME: change API to allow increasing 'replication' counter */
969       if ( (ntohl (dm->priority) > 0) ||
970            (GNUNET_TIME_absolute_ntoh(dm->expiration).abs_value >
971             expiration.abs_value) )
972         plugin->api->update (plugin->api->cls,
973                              uid,
974                              (int32_t) ntohl(dm->priority),
975                              GNUNET_TIME_absolute_ntoh(dm->expiration),
976                              NULL);
977       transmit_status (pc->client, GNUNET_NO, NULL);
978       GNUNET_SERVER_client_drop (pc->client);
979       GNUNET_free (pc);
980     }
981   else
982     {
983       execute_put (pc->client, dm);
984       GNUNET_SERVER_client_drop (pc->client);
985       GNUNET_free (pc);
986     }
987   return GNUNET_OK;
988 }
989
990
991 /**
992  * Handle PUT-message.
993  *
994  * @param cls closure
995  * @param client identification of the client
996  * @param message the actual message
997  */
998 static void
999 handle_put (void *cls,
1000             struct GNUNET_SERVER_Client *client,
1001             const struct GNUNET_MessageHeader *message)
1002 {
1003   const struct DataMessage *dm = check_data (message);
1004   int rid;
1005   struct ReservationList *pos;
1006   struct PutContext *pc;
1007   GNUNET_HashCode vhash;
1008   uint32_t size;
1009
1010   if ( (dm == NULL) ||
1011        (ntohl(dm->type) == 0) ) 
1012     {
1013       GNUNET_break (0);
1014       GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1015       return;
1016     }
1017 #if DEBUG_DATASTORE
1018   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1019               "Processing `%s' request for `%s' of type %u\n",
1020               "PUT",
1021               GNUNET_h2s (&dm->key),
1022               ntohl (dm->type));
1023 #endif
1024   rid = ntohl(dm->rid);
1025   size = ntohl(dm->size);
1026   if (rid > 0)
1027     {
1028       pos = reservations;
1029       while ( (NULL != pos) &&
1030               (rid != pos->rid) )
1031         pos = pos->next;
1032       GNUNET_break (pos != NULL);
1033       if (NULL != pos)
1034         {
1035           GNUNET_break (pos->entries > 0);
1036           GNUNET_break (pos->amount >= size);
1037           pos->entries--;
1038           pos->amount -= size;
1039           reserved -= (size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
1040           GNUNET_STATISTICS_set (stats,
1041                                  gettext_noop ("# reserved"),
1042                                  reserved,
1043                                  GNUNET_NO);
1044         }
1045     }
1046   if (GNUNET_YES == GNUNET_CONTAINER_bloomfilter_test (filter,
1047                                                        &dm->key))
1048     {
1049       GNUNET_CRYPTO_hash (&dm[1], size, &vhash);
1050       pc = GNUNET_malloc (sizeof (struct PutContext) + size + sizeof (struct DataMessage));
1051       pc->client = client;
1052       GNUNET_SERVER_client_keep (client);
1053       memcpy (&pc[1], dm, size + sizeof (struct DataMessage));
1054       plugin->api->get_key (plugin->api->cls,
1055                             0,
1056                             &dm->key,
1057                             &vhash,
1058                             ntohl (dm->type),
1059                             &check_present,
1060                             pc);      
1061       return;
1062     }
1063   execute_put (client, dm);
1064 }
1065
1066
1067 /**
1068  * Handle GET-message.
1069  *
1070  * @param cls closure
1071  * @param client identification of the client
1072  * @param message the actual message
1073  */
1074 static void
1075 handle_get (void *cls,
1076             struct GNUNET_SERVER_Client *client,
1077             const struct GNUNET_MessageHeader *message)
1078 {
1079   const struct GetMessage *msg;
1080   uint16_t size;
1081
1082   size = ntohs(message->size);
1083   if ( (size != sizeof(struct GetMessage)) &&
1084        (size != sizeof(struct GetMessage) - sizeof(GNUNET_HashCode)) )
1085     {
1086       GNUNET_break (0);
1087       GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1088       return;
1089     }
1090   msg = (const struct GetMessage*) message;
1091 #if DEBUG_DATASTORE
1092   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1093               "Processing `%s' request for `%s' of type %u\n",
1094               "GET",
1095               GNUNET_h2s (&msg->key),
1096               ntohl (msg->type));
1097 #endif
1098   GNUNET_STATISTICS_update (stats,
1099                             gettext_noop ("# GET requests received"),
1100                             1,
1101                             GNUNET_NO);
1102   GNUNET_SERVER_client_keep (client);
1103   if ( (size == sizeof(struct GetMessage)) &&
1104        (GNUNET_YES != GNUNET_CONTAINER_bloomfilter_test (filter,
1105                                                          &msg->key)) )
1106     {
1107       /* don't bother database... */
1108 #if DEBUG_DATASTORE
1109       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1110                   "Empty result set for `%s' request for `%s' (bloomfilter).\n",
1111                   "GET",
1112                   GNUNET_h2s (&msg->key));
1113 #endif  
1114       GNUNET_STATISTICS_update (stats,
1115                                 gettext_noop ("# requests filtered by bloomfilter"),
1116                                 1,
1117                                 GNUNET_NO);
1118       transmit_item (client,
1119                      NULL, 0, NULL, 0, 0, 0, 
1120                      GNUNET_TIME_UNIT_ZERO_ABS, 0);
1121       return;
1122     }
1123   plugin->api->get_key (plugin->api->cls,
1124                         GNUNET_ntohll (msg->offset),
1125                         ((size == sizeof(struct GetMessage)) ? &msg->key : NULL),
1126                         NULL,
1127                         ntohl(msg->type),
1128                         &transmit_item,
1129                         client);    
1130 }
1131
1132
1133 /**
1134  * Handle UPDATE-message.
1135  *
1136  * @param cls closure
1137  * @param client identification of the client
1138  * @param message the actual message
1139  */
1140 static void
1141 handle_update (void *cls,
1142                struct GNUNET_SERVER_Client *client,
1143                const struct GNUNET_MessageHeader *message)
1144 {
1145   const struct UpdateMessage *msg;
1146   int ret;
1147   char *emsg;
1148
1149   GNUNET_STATISTICS_update (stats,
1150                             gettext_noop ("# UPDATE requests received"),
1151                             1,
1152                             GNUNET_NO);
1153   msg = (const struct UpdateMessage*) message;
1154   emsg = NULL;
1155 #if DEBUG_DATASTORE
1156   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1157               "Processing `%s' request for %llu\n",
1158               "UPDATE",
1159               (unsigned long long) GNUNET_ntohll (msg->uid));
1160 #endif
1161   ret = plugin->api->update (plugin->api->cls,
1162                              GNUNET_ntohll(msg->uid),
1163                              (int32_t) ntohl(msg->priority),
1164                              GNUNET_TIME_absolute_ntoh(msg->expiration),
1165                              &emsg);
1166   transmit_status (client, ret, emsg);
1167   GNUNET_free_non_null (emsg);
1168 }
1169
1170
1171 /**
1172  * Handle GET_REPLICATION-message.
1173  *
1174  * @param cls closure
1175  * @param client identification of the client
1176  * @param message the actual message
1177  */
1178 static void
1179 handle_get_replication (void *cls,
1180                         struct GNUNET_SERVER_Client *client,
1181                         const struct GNUNET_MessageHeader *message)
1182 {
1183 #if DEBUG_DATASTORE
1184   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1185               "Processing `%s' request\n",
1186               "GET_REPLICATION");
1187 #endif
1188   GNUNET_STATISTICS_update (stats,
1189                             gettext_noop ("# GET REPLICATION requests received"),
1190                             1,
1191                             GNUNET_NO);
1192   GNUNET_SERVER_client_keep (client);
1193   plugin->api->get_replication (plugin->api->cls,
1194                                 &transmit_item,
1195                                 client);  
1196 }
1197
1198
1199 /**
1200  * Handle GET_ZERO_ANONYMITY-message.
1201  *
1202  * @param cls closure
1203  * @param client identification of the client
1204  * @param message the actual message
1205  */
1206 static void
1207 handle_get_zero_anonymity (void *cls,
1208                            struct GNUNET_SERVER_Client *client,
1209                            const struct GNUNET_MessageHeader *message)
1210 {
1211   const struct GetZeroAnonymityMessage * msg = (const struct GetZeroAnonymityMessage*) message;
1212   enum GNUNET_BLOCK_Type type;
1213
1214   type = (enum GNUNET_BLOCK_Type) ntohl (msg->type);
1215   if (type == GNUNET_BLOCK_TYPE_ANY)
1216     {
1217       GNUNET_break (0);
1218       GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1219       return;
1220     }
1221 #if DEBUG_DATASTORE
1222   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1223               "Processing `%s' request\n",
1224               "GET_ZERO_ANONYMITY");
1225 #endif
1226   GNUNET_STATISTICS_update (stats,
1227                             gettext_noop ("# GET ZERO ANONYMITY requests received"),
1228                             1,
1229                             GNUNET_NO);
1230   GNUNET_SERVER_client_keep (client);
1231   plugin->api->get_zero_anonymity (plugin->api->cls,
1232                                    GNUNET_ntohll (msg->offset),
1233                                    type,
1234                                    &transmit_item,
1235                                    client);  
1236 }
1237
1238
1239 /**
1240  * Callback function that will cause the item that is passed
1241  * in to be deleted (by returning GNUNET_NO).
1242  */
1243 static int
1244 remove_callback (void *cls,
1245                  const GNUNET_HashCode * key,
1246                  uint32_t size,
1247                  const void *data,
1248                  enum GNUNET_BLOCK_Type type,
1249                  uint32_t priority,
1250                  uint32_t anonymity,
1251                  struct GNUNET_TIME_Absolute
1252                  expiration, uint64_t uid)
1253 {
1254   struct GNUNET_SERVER_Client *client = cls;
1255
1256   if (key == NULL)
1257     {
1258 #if DEBUG_DATASTORE
1259       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1260                   "No further matches for `%s' request.\n",
1261                   "REMOVE");
1262 #endif  
1263       transmit_status (client, GNUNET_NO, _("Content not found"));              
1264       GNUNET_SERVER_client_drop (client);
1265       return GNUNET_OK; /* last item */
1266     }
1267 #if DEBUG_DATASTORE
1268   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1269               "Item %llu matches `%s' request for key `%s' and type %u.\n",
1270               (unsigned long long) uid,
1271               "REMOVE",
1272               GNUNET_h2s (key),
1273               type);
1274 #endif  
1275   GNUNET_STATISTICS_update (stats,
1276                             gettext_noop ("# bytes removed (explicit request)"),
1277                             size,
1278                             GNUNET_YES);
1279   GNUNET_CONTAINER_bloomfilter_remove (filter,
1280                                        key);
1281   transmit_status (client, GNUNET_OK, NULL);       
1282   GNUNET_SERVER_client_drop (client);
1283   return GNUNET_NO;
1284 }
1285
1286
1287 /**
1288  * Handle REMOVE-message.
1289  *
1290  * @param cls closure
1291  * @param client identification of the client
1292  * @param message the actual message
1293  */
1294 static void
1295 handle_remove (void *cls,
1296                struct GNUNET_SERVER_Client *client,
1297                const struct GNUNET_MessageHeader *message)
1298 {
1299   const struct DataMessage *dm = check_data (message);
1300   GNUNET_HashCode vhash;
1301
1302   if (dm == NULL)
1303     {
1304       GNUNET_break (0);
1305       GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1306       return;
1307     }
1308 #if DEBUG_DATASTORE
1309   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1310               "Processing `%s' request for `%s' of type %u\n",
1311               "REMOVE",
1312               GNUNET_h2s (&dm->key),
1313               ntohl (dm->type));
1314 #endif
1315   GNUNET_STATISTICS_update (stats,
1316                             gettext_noop ("# REMOVE requests received"),
1317                             1,
1318                             GNUNET_NO);
1319   GNUNET_SERVER_client_keep (client);
1320   GNUNET_CRYPTO_hash (&dm[1],
1321                       ntohl(dm->size),
1322                       &vhash);
1323   plugin->api->get_key (plugin->api->cls,
1324                         0,
1325                         &dm->key,
1326                         &vhash,
1327                         (enum GNUNET_BLOCK_Type) ntohl(dm->type),
1328                         &remove_callback,
1329                         client);
1330 }
1331
1332
1333 /**
1334  * Handle DROP-message.
1335  *
1336  * @param cls closure
1337  * @param client identification of the client
1338  * @param message the actual message
1339  */
1340 static void
1341 handle_drop (void *cls,
1342              struct GNUNET_SERVER_Client *client,
1343              const struct GNUNET_MessageHeader *message)
1344 {
1345 #if DEBUG_DATASTORE
1346   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1347               "Processing `%s' request\n",
1348               "DROP");
1349 #endif
1350   do_drop = GNUNET_YES;
1351   GNUNET_SERVER_receive_done (client, GNUNET_OK);
1352 }
1353
1354
1355 /**
1356  * Function called by plugins to notify us about a
1357  * change in their disk utilization.
1358  *
1359  * @param cls closure (NULL)
1360  * @param delta change in disk utilization, 
1361  *        0 for "reset to empty"
1362  */
1363 static void
1364 disk_utilization_change_cb (void *cls,
1365                             int delta)
1366 {
1367   if ( (delta < 0) &&
1368        (payload < -delta) )
1369     {
1370       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1371                   _("Datastore payload inaccurate (%lld < %lld).  Trying to fix.\n"),
1372                   (long long) payload,
1373                   (long long) -delta);
1374       payload = plugin->api->estimate_size (plugin->api->cls);
1375       sync_stats ();
1376       return;
1377     }
1378   payload += delta;
1379   lastSync++;
1380   if (lastSync >= MAX_STAT_SYNC_LAG)
1381     sync_stats ();
1382 }
1383
1384
1385 /**
1386  * Callback function to process statistic values.
1387  *
1388  * @param cls closure (struct Plugin*)
1389  * @param subsystem name of subsystem that created the statistic
1390  * @param name the name of the datum
1391  * @param value the current value
1392  * @param is_persistent GNUNET_YES if the value is persistent, GNUNET_NO if not
1393  * @return GNUNET_OK to continue, GNUNET_SYSERR to abort iteration
1394  */
1395 static int
1396 process_stat_in (void *cls,
1397                  const char *subsystem,
1398                  const char *name,
1399                  uint64_t value,
1400                  int is_persistent)
1401 {
1402   GNUNET_assert (stats_worked == GNUNET_NO);
1403   stats_worked = GNUNET_YES;
1404   payload += value;
1405 #if DEBUG_SQLITE
1406   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1407               "Notification from statistics about existing payload (%llu), new payload is %llu\n",
1408               abs_value,
1409               payload);
1410 #endif
1411   return GNUNET_OK;
1412 }
1413
1414
1415 static void
1416 process_stat_done (void *cls,
1417                    int success)
1418 {
1419   struct DatastorePlugin *plugin = cls;
1420
1421   stat_get = NULL;
1422   if (stats_worked == GNUNET_NO) 
1423     payload = plugin->api->estimate_size (plugin->api->cls);
1424 }
1425
1426
1427 /**
1428  * Load the datastore plugin.
1429  */
1430 static struct DatastorePlugin *
1431 load_plugin () 
1432 {
1433   struct DatastorePlugin *ret;
1434   char *libname;
1435   char *name;
1436
1437   if (GNUNET_OK !=
1438       GNUNET_CONFIGURATION_get_value_string (cfg,
1439                                              "DATASTORE", "DATABASE", &name))
1440     {
1441       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1442                   _("No `%s' specified for `%s' in configuration!\n"),
1443                   "DATABASE",
1444                   "DATASTORE");
1445       return NULL;
1446     }
1447   ret = GNUNET_malloc (sizeof(struct DatastorePlugin));
1448   ret->env.cfg = cfg;
1449   ret->env.duc = &disk_utilization_change_cb;
1450   ret->env.cls = NULL;
1451   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1452               _("Loading `%s' datastore plugin\n"), name);
1453   GNUNET_asprintf (&libname, "libgnunet_plugin_datastore_%s", name);
1454   ret->short_name = name;
1455   ret->lib_name = libname;
1456   ret->api = GNUNET_PLUGIN_load (libname, &ret->env);
1457   if (ret->api == NULL)
1458     {
1459       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1460                   _("Failed to load datastore plugin for `%s'\n"), name);
1461       GNUNET_free (ret->short_name);
1462       GNUNET_free (libname);
1463       GNUNET_free (ret);
1464       return NULL;
1465     }
1466   return ret;
1467 }
1468
1469
1470 /**
1471  * Function called when the service shuts
1472  * down.  Unloads our datastore plugin.
1473  *
1474  * @param plug plugin to unload
1475  */
1476 static void
1477 unload_plugin (struct DatastorePlugin *plug)
1478 {
1479 #if DEBUG_DATASTORE
1480   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1481               "Datastore service is unloading plugin...\n");
1482 #endif
1483   GNUNET_break (NULL == GNUNET_PLUGIN_unload (plug->lib_name, plug->api));
1484   GNUNET_free (plug->lib_name);
1485   GNUNET_free (plug->short_name);
1486   GNUNET_free (plug);
1487 }
1488
1489
1490 /**
1491  * Final task run after shutdown.  Unloads plugins and disconnects us from
1492  * statistics.
1493  */
1494 static void
1495 unload_task (void *cls,
1496              const struct GNUNET_SCHEDULER_TaskContext *tc)
1497 {
1498   if (GNUNET_YES == do_drop)
1499     plugin->api->drop (plugin->api->cls);
1500   unload_plugin (plugin);
1501   plugin = NULL;
1502   if (filter != NULL)
1503     {
1504       GNUNET_CONTAINER_bloomfilter_free (filter);
1505       filter = NULL;
1506     }
1507   if (lastSync > 0)
1508     sync_stats ();
1509   if (stat_get != NULL)
1510     {
1511       GNUNET_STATISTICS_get_cancel (stat_get);
1512       stat_get = NULL;
1513     }
1514   if (stats != NULL)
1515     {
1516       GNUNET_STATISTICS_destroy (stats, GNUNET_YES);
1517       stats = NULL;
1518     }
1519 }
1520
1521
1522 /**
1523  * Last task run during shutdown.  Disconnects us from
1524  * the transport and core.
1525  */
1526 static void
1527 cleaning_task (void *cls,
1528                const struct GNUNET_SCHEDULER_TaskContext *tc)
1529 {
1530   struct TransmitCallbackContext *tcc;
1531
1532   cleaning_done = GNUNET_YES;
1533   while (NULL != (tcc = tcc_head))
1534     {
1535       GNUNET_CONTAINER_DLL_remove (tcc_head,
1536                                    tcc_tail,
1537                                    tcc);
1538       if (tcc->th != NULL)
1539         {
1540           GNUNET_CONNECTION_notify_transmit_ready_cancel (tcc->th);
1541           GNUNET_SERVER_client_drop (tcc->client);
1542         }
1543       GNUNET_free (tcc->msg);
1544       GNUNET_free (tcc);
1545     }
1546   if (expired_kill_task != GNUNET_SCHEDULER_NO_TASK)
1547     {
1548       GNUNET_SCHEDULER_cancel (expired_kill_task);
1549       expired_kill_task = GNUNET_SCHEDULER_NO_TASK;
1550     }
1551   GNUNET_SCHEDULER_add_continuation (&unload_task,
1552                                      NULL,
1553                                      GNUNET_SCHEDULER_REASON_PREREQ_DONE);
1554 }
1555
1556
1557 /**
1558  * Function that removes all active reservations made
1559  * by the given client and releases the space for other
1560  * requests.
1561  *
1562  * @param cls closure
1563  * @param client identification of the client
1564  */
1565 static void
1566 cleanup_reservations (void *cls,
1567                       struct GNUNET_SERVER_Client *client)
1568 {
1569   struct ReservationList *pos;
1570   struct ReservationList *prev;
1571   struct ReservationList *next;
1572
1573   if (client == NULL)
1574     return;
1575   prev = NULL;
1576   pos = reservations;
1577   while (NULL != pos)
1578     {
1579       next = pos->next;
1580       if (pos->client == client)
1581         {
1582           if (prev == NULL)
1583             reservations = next;
1584           else
1585             prev->next = next;
1586           reserved -= pos->amount + pos->entries * GNUNET_DATASTORE_ENTRY_OVERHEAD;
1587           GNUNET_free (pos);
1588         }
1589       else
1590         {
1591           prev = pos;
1592         }
1593       pos = next;
1594     }
1595   GNUNET_STATISTICS_set (stats,
1596                          gettext_noop ("# reserved"),
1597                          reserved,
1598                          GNUNET_NO);
1599 }
1600
1601
1602 /**
1603  * Process datastore requests.
1604  *
1605  * @param cls closure
1606  * @param server the initialized server
1607  * @param c configuration to use
1608  */
1609 static void
1610 run (void *cls,
1611      struct GNUNET_SERVER_Handle *server,
1612      const struct GNUNET_CONFIGURATION_Handle *c)
1613 {
1614   static const struct GNUNET_SERVER_MessageHandler handlers[] = {
1615     {&handle_reserve, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_RESERVE, 
1616      sizeof(struct ReserveMessage) }, 
1617     {&handle_release_reserve, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_RELEASE_RESERVE, 
1618      sizeof(struct ReleaseReserveMessage) }, 
1619     {&handle_put, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_PUT, 0 }, 
1620     {&handle_update, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_UPDATE, 
1621      sizeof (struct UpdateMessage) }, 
1622     {&handle_get, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_GET, 0 }, 
1623     {&handle_get_replication, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_GET_REPLICATION, 
1624      sizeof(struct GNUNET_MessageHeader) }, 
1625     {&handle_get_zero_anonymity, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_GET_ZERO_ANONYMITY, 
1626      sizeof(struct GetZeroAnonymityMessage) }, 
1627     {&handle_remove, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE, 0 }, 
1628     {&handle_drop, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_DROP, 
1629      sizeof(struct GNUNET_MessageHeader) }, 
1630     {NULL, NULL, 0, 0}
1631   };
1632   char *fn;
1633   unsigned int bf_size;
1634
1635   cfg = c;
1636   if (GNUNET_OK !=
1637       GNUNET_CONFIGURATION_get_value_number (cfg,
1638                                              "DATASTORE", "QUOTA", &quota))
1639     {
1640       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1641                   _("No `%s' specified for `%s' in configuration!\n"),
1642                   "QUOTA",
1643                   "DATASTORE");
1644       return;
1645     }
1646   stats = GNUNET_STATISTICS_create ("datastore", cfg);
1647   GNUNET_STATISTICS_set (stats,
1648                          gettext_noop ("# quota"),
1649                          quota,
1650                          GNUNET_NO);
1651   cache_size = quota / 8; /* Or should we make this an option? */
1652   GNUNET_STATISTICS_set (stats,
1653                          gettext_noop ("# cache size"),
1654                          cache_size,
1655                          GNUNET_NO);
1656   bf_size = quota / 32; /* 8 bit per entry, 1 bit per 32 kb in DB */
1657   fn = NULL;
1658   if ( (GNUNET_OK !=
1659         GNUNET_CONFIGURATION_get_value_filename (cfg,
1660                                                  "DATASTORE",
1661                                                  "BLOOMFILTER",
1662                                                  &fn)) ||
1663        (GNUNET_OK !=
1664         GNUNET_DISK_directory_create_for_file (fn)) )
1665     {
1666       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1667                   _("Could not use specified filename `%s' for bloomfilter.\n"),
1668                   fn != NULL ? fn : "");
1669       GNUNET_free_non_null (fn);
1670       fn = NULL;
1671     }
1672   filter = GNUNET_CONTAINER_bloomfilter_load (fn, bf_size, 5);  /* approx. 3% false positives at max use */  
1673   GNUNET_free_non_null (fn);
1674   if (filter == NULL)
1675     {
1676       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1677                   _("Failed to initialize bloomfilter.\n"));
1678       if (stats != NULL)
1679         {
1680           GNUNET_STATISTICS_destroy (stats, GNUNET_YES);
1681           stats = NULL;
1682         }
1683       return;
1684     }
1685   plugin = load_plugin ();
1686   if (NULL == plugin)
1687     {
1688       GNUNET_CONTAINER_bloomfilter_free (filter);
1689       filter = NULL;
1690       if (stats != NULL)
1691         {
1692           GNUNET_STATISTICS_destroy (stats, GNUNET_YES);
1693           stats = NULL;
1694         }
1695       return;
1696     }
1697   stat_get = GNUNET_STATISTICS_get (stats,
1698                                     "datastore",
1699                                     QUOTA_STAT_NAME,
1700                                     GNUNET_TIME_UNIT_SECONDS,
1701                                     &process_stat_done,
1702                                     &process_stat_in,
1703                                     plugin);
1704   GNUNET_SERVER_disconnect_notify (server, &cleanup_reservations, NULL);
1705   GNUNET_SERVER_add_handlers (server, handlers);
1706   expired_kill_task
1707     = GNUNET_SCHEDULER_add_with_priority (GNUNET_SCHEDULER_PRIORITY_IDLE,
1708                                           &delete_expired, NULL);
1709   GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL,
1710                                 &cleaning_task, NULL);
1711 }
1712
1713
1714 /**
1715  * The main function for the datastore service.
1716  *
1717  * @param argc number of arguments from the command line
1718  * @param argv command line arguments
1719  * @return 0 ok, 1 on error
1720  */
1721 int
1722 main (int argc, char *const *argv)
1723 {
1724   int ret;
1725
1726   ret = (GNUNET_OK ==
1727          GNUNET_SERVICE_run (argc,
1728                              argv,
1729                              "datastore",
1730                              GNUNET_SERVICE_OPTION_NONE,
1731                              &run, NULL)) ? 0 : 1;
1732   return ret;
1733 }
1734
1735
1736 /* end of gnunet-service-datastore.c */