improving datastore API --- not working yet
[oweals/gnunet.git] / src / datastore / gnunet-service-datastore.c
1 /*
2      This file is part of GNUnet
3      (C) 2004, 2005, 2006, 2007, 2009 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 2, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file datastore/gnunet-service-datastore.c
23  * @brief Management for the datastore for files stored on a GNUnet node
24  * @author Christian Grothoff
25  */
26
27 #include "platform.h"
28 #include "gnunet_util_lib.h"
29 #include "gnunet_protocols.h"
30 #include "gnunet_statistics_service.h"
31 #include "gnunet_datastore_plugin.h"
32 #include "datastore.h"
33
34 /**
35  * How many messages do we queue at most per client?
36  */
37 #define MAX_PENDING 1024
38
39 /**
40  * How long are we at most keeping "expired" content
41  * past the expiration date in the database?
42  */
43 #define MAX_EXPIRE_DELAY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 15)
44
45 /**
46  * How fast are we allowed to query the database for deleting
47  * expired content? (1 item per second).
48  */
49 #define MIN_EXPIRE_DELAY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 1)
50
51
52 #define QUOTA_STAT_NAME gettext_noop ("# bytes used in file-sharing datastore")
53
54 /**
55  * After how many payload-changing operations
56  * do we sync our statistics?
57  */
58 #define MAX_STAT_SYNC_LAG 50
59
60
61 /**
62  * Our datastore plugin.
63  */
64 struct DatastorePlugin
65 {
66
67   /**
68    * API of the transport as returned by the plugin's
69    * initialization function.
70    */
71   struct GNUNET_DATASTORE_PluginFunctions *api;
72
73   /**
74    * Short name for the plugin (i.e. "sqlite").
75    */
76   char *short_name;
77
78   /**
79    * Name of the library (i.e. "gnunet_plugin_datastore_sqlite").
80    */
81   char *lib_name;
82
83   /**
84    * Environment this transport service is using
85    * for this plugin.
86    */
87   struct GNUNET_DATASTORE_PluginEnvironment env;
88
89 };
90
91
92 /**
93  * Linked list of active reservations.
94  */
95 struct ReservationList 
96 {
97
98   /**
99    * This is a linked list.
100    */
101   struct ReservationList *next;
102
103   /**
104    * Client that made the reservation.
105    */
106   struct GNUNET_SERVER_Client *client;
107
108   /**
109    * Number of bytes (still) reserved.
110    */
111   uint64_t amount;
112
113   /**
114    * Number of items (still) reserved.
115    */
116   uint64_t entries;
117
118   /**
119    * Reservation identifier.
120    */
121   int32_t rid;
122
123 };
124
125
126
127 /**
128  * Our datastore plugin (NULL if not available).
129  */
130 static struct DatastorePlugin *plugin;
131
132 /**
133  * Linked list of space reservations made by clients.
134  */
135 static struct ReservationList *reservations;
136
137 /**
138  * Bloomfilter to quickly tell if we don't have the content.
139  */
140 static struct GNUNET_CONTAINER_BloomFilter *filter;
141
142 /**
143  * How much space are we allowed to use?
144  */
145 static unsigned long long quota;
146
147 /**
148  * How much space are we using for the cache?  (space available for
149  * insertions that will be instantly reclaimed by discarding less
150  * important content --- or possibly whatever we just inserted into
151  * the "cache").
152  */
153 static unsigned long long cache_size;
154
155 /**
156  * How much space have we currently reserved?
157  */
158 static unsigned long long reserved;
159   
160 /**
161  * How much data are we currently storing
162  * in the database?
163  */
164 static unsigned long long payload;
165
166 /**
167  * Number of updates that were made to the
168  * payload value since we last synchronized
169  * it with the statistics service.
170  */
171 static unsigned int lastSync;
172
173 /**
174  * Did we get an answer from statistics?
175  */
176 static int stats_worked;
177
178 /**
179  * Identity of the task that is used to delete
180  * expired content.
181  */
182 static GNUNET_SCHEDULER_TaskIdentifier expired_kill_task;
183
184 /**
185  * Our configuration.
186  */
187 const struct GNUNET_CONFIGURATION_Handle *cfg;
188
189
190 /**
191  * Handle for reporting statistics.
192  */
193 static struct GNUNET_STATISTICS_Handle *stats;
194
195
196 /**
197  * Synchronize our utilization statistics with the 
198  * statistics service.
199  */
200 static void 
201 sync_stats ()
202 {
203   GNUNET_STATISTICS_set (stats,
204                          QUOTA_STAT_NAME,
205                          payload,
206                          GNUNET_YES);
207   lastSync = 0;
208 }
209
210
211
212
213 /**
214  * Function called once the transmit operation has
215  * either failed or succeeded.
216  *
217  * @param cls closure
218  * @param status GNUNET_OK on success, GNUNET_SYSERR on error
219  */
220 typedef void (*TransmitContinuation)(void *cls,
221                                      int status);
222
223
224 /**
225  * Context for transmitting replies to clients.
226  */
227 struct TransmitCallbackContext 
228 {
229   
230   /**
231    * We keep these in a doubly-linked list (for cleanup).
232    */
233   struct TransmitCallbackContext *next;
234   
235   /**
236    * We keep these in a doubly-linked list (for cleanup).
237    */
238   struct TransmitCallbackContext *prev;
239   
240   /**
241    * The message that we're asked to transmit.
242    */
243   struct GNUNET_MessageHeader *msg;
244   
245   /**
246    * Handle for the transmission request.
247    */
248   struct GNUNET_CONNECTION_TransmitHandle *th;
249
250   /**
251    * Client that we are transmitting to.
252    */
253   struct GNUNET_SERVER_Client *client;
254
255   /**
256    * Function to call once msg has been transmitted
257    * (or at least added to the buffer).
258    */
259   TransmitContinuation tc;
260
261   /**
262    * Closure for tc.
263    */
264   void *tc_cls;
265
266   /**
267    * GNUNET_YES if we are supposed to signal the server
268    * completion of the client's request.
269    */
270   int end;
271 };
272
273   
274 /**
275  * Head of the doubly-linked list (for cleanup).
276  */
277 static struct TransmitCallbackContext *tcc_head;
278
279 /**
280  * Tail of the doubly-linked list (for cleanup).
281  */
282 static struct TransmitCallbackContext *tcc_tail;
283
284 /**
285  * Have we already cleaned up the TCCs and are hence no longer
286  * willing (or able) to transmit anything to anyone?
287  */
288 static int cleaning_done;
289
290 /**
291  * Handle for pending get request.
292  */
293 static struct GNUNET_STATISTICS_GetHandle *stat_get;
294
295
296 /**
297  * Task that is used to remove expired entries from
298  * the datastore.  This task will schedule itself
299  * again automatically to always delete all expired
300  * content quickly.
301  *
302  * @param cls not used
303  * @param tc task context
304  */ 
305 static void
306 delete_expired (void *cls,
307                 const struct GNUNET_SCHEDULER_TaskContext *tc);
308
309
310 /**
311  * Iterate over the expired items stored in the datastore.
312  * Delete all expired items; once we have processed all
313  * expired items, re-schedule the "delete_expired" task.
314  *
315  * @param cls not used
316  * @param next_cls closure to pass to the "next" function.
317  * @param key key for the content
318  * @param size number of bytes in data
319  * @param data content stored
320  * @param type type of the content
321  * @param priority priority of the content
322  * @param anonymity anonymity-level for the content
323  * @param expiration expiration time for the content
324  * @param uid unique identifier for the datum;
325  *        maybe 0 if no unique identifier is available
326  *
327  * @return GNUNET_SYSERR to abort the iteration, GNUNET_OK to continue
328  *         (continue on call to "next", of course),
329  *         GNUNET_NO to delete the item and continue (if supported)
330  */
331 static int 
332 expired_processor (void *cls,
333                    void *next_cls,
334                    const GNUNET_HashCode * key,
335                    uint32_t size,
336                    const void *data,
337                    enum GNUNET_BLOCK_Type type,
338                    uint32_t priority,
339                    uint32_t anonymity,
340                    struct GNUNET_TIME_Absolute
341                    expiration, 
342                    uint64_t uid)
343 {
344   struct GNUNET_TIME_Absolute now;
345
346   if (key == NULL) 
347     {
348       expired_kill_task 
349         = GNUNET_SCHEDULER_add_delayed (MAX_EXPIRE_DELAY,
350                                         &delete_expired,
351                                         NULL);
352       return GNUNET_SYSERR;
353     }
354   now = GNUNET_TIME_absolute_get ();
355   if (expiration.abs_value > now.abs_value)
356     {
357       /* finished processing */
358       expired_kill_task 
359         = GNUNET_SCHEDULER_add_delayed (MAX_EXPIRE_DELAY,
360                                         &delete_expired,
361                                         NULL);
362       return GNUNET_SYSERR;
363     }
364 #if DEBUG_DATASTORE
365   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
366               "Deleting content `%s' of type %u that expired %llu ms ago\n",
367               GNUNET_h2s (key),
368               type,
369               (unsigned long long) (now.abs_value - expiration.abs_value));
370 #endif
371   GNUNET_STATISTICS_update (stats,
372                             gettext_noop ("# bytes expired"),
373                             size,
374                             GNUNET_YES);
375   GNUNET_CONTAINER_bloomfilter_remove (filter,
376                                        key);
377   expired_kill_task 
378     = GNUNET_SCHEDULER_add_delayed (MIN_EXPIRE_DELAY,
379                                     &delete_expired,
380                                     NULL);
381   return GNUNET_NO;
382 }
383
384
385 /**
386  * Task that is used to remove expired entries from
387  * the datastore.  This task will schedule itself
388  * again automatically to always delete all expired
389  * content quickly.
390  *
391  * @param cls not used
392  * @param tc task context
393  */ 
394 static void
395 delete_expired (void *cls,
396                 const struct GNUNET_SCHEDULER_TaskContext *tc)
397 {
398   expired_kill_task = GNUNET_SCHEDULER_NO_TASK;
399   plugin->api->expiration_get (plugin->api->cls, 
400                                &expired_processor,
401                                NULL);
402 }
403
404
405 /**
406  * An iterator over a set of items stored in the datastore
407  * that deletes until we're happy with respect to our quota.
408  *
409  * @param cls closure
410  * @param next_cls closure to pass to the "next" function.
411  * @param key key for the content
412  * @param size number of bytes in data
413  * @param data content stored
414  * @param type type of the content
415  * @param priority priority of the content
416  * @param anonymity anonymity-level for the content
417  * @param expiration expiration time for the content
418  * @param uid unique identifier for the datum;
419  *        maybe 0 if no unique identifier is available
420  *
421  * @return GNUNET_SYSERR to abort the iteration, GNUNET_OK to continue
422  *         (continue on call to "next", of course),
423  *         GNUNET_NO to delete the item and continue (if supported)
424  */
425 static int 
426 quota_processor (void *cls,
427                  void *next_cls,
428                  const GNUNET_HashCode * key,
429                  uint32_t size,
430                  const void *data,
431                  enum GNUNET_BLOCK_Type type,
432                  uint32_t priority,
433                  uint32_t anonymity,
434                  struct GNUNET_TIME_Absolute expiration, 
435                  uint64_t uid)
436 {
437   unsigned long long *need = cls;
438
439   if (NULL == key)
440     return GNUNET_SYSERR;    
441 #if DEBUG_DATASTORE
442   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
443               "Deleting %llu bytes of low-priority content `%s' of type %u (still trying to free another %llu bytes)\n",
444               (unsigned long long) (size + GNUNET_DATASTORE_ENTRY_OVERHEAD),
445               GNUNET_h2s (key),
446               type,
447               *need);
448 #endif
449   if (size + GNUNET_DATASTORE_ENTRY_OVERHEAD > *need)
450     *need = 0;
451   else
452     *need -= size + GNUNET_DATASTORE_ENTRY_OVERHEAD;
453   GNUNET_STATISTICS_update (stats,
454                             gettext_noop ("# bytes purged (low-priority)"),
455                             size,
456                             GNUNET_YES);
457   GNUNET_CONTAINER_bloomfilter_remove (filter,
458                                        key);
459   return GNUNET_NO;
460 }
461
462
463 /**
464  * Manage available disk space by running tasks
465  * that will discard content if necessary.  This
466  * function will be run whenever a request for
467  * "need" bytes of storage could only be satisfied
468  * by eating into the "cache" (and we want our cache
469  * space back).
470  *
471  * @param need number of bytes of content that were
472  *        placed into the "cache" (and hence the
473  *        number of bytes that should be removed).
474  */
475 static void
476 manage_space (unsigned long long need)
477 {
478   unsigned long long last;
479
480 #if DEBUG_DATASTORE
481   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
482               "Asked to free up %llu bytes of cache space\n",
483               need);
484 #endif
485   last = 0;
486   while ( (need > 0) &&
487           (last != need) )
488     {
489       last = need;
490       plugin->api->expiration_get (plugin->api->cls,
491                                    &quota_processor,
492                                    &need);    
493     }
494 }
495
496
497 /**
498  * Function called to notify a client about the socket
499  * begin ready to queue more data.  "buf" will be
500  * NULL and "size" zero if the socket was closed for
501  * writing in the meantime.
502  *
503  * @param cls closure
504  * @param size number of bytes available in buf
505  * @param buf where the callee should write the message
506  * @return number of bytes written to buf
507  */
508 static size_t
509 transmit_callback (void *cls,
510                    size_t size, void *buf)
511 {
512   struct TransmitCallbackContext *tcc = cls;
513   size_t msize;
514   
515   tcc->th = NULL;
516   GNUNET_CONTAINER_DLL_remove (tcc_head,
517                                tcc_tail,
518                                tcc);
519   msize = ntohs(tcc->msg->size);
520   if (size == 0)
521     {
522       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
523                   _("Transmission to client failed!\n"));
524       if (tcc->tc != NULL)
525         tcc->tc (tcc->tc_cls, GNUNET_SYSERR);
526       if (GNUNET_YES == tcc->end)
527         {
528           GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
529                       _("Disconnecting client due to transmission failure!\n"));
530           GNUNET_SERVER_receive_done (tcc->client, GNUNET_SYSERR);       
531         }
532       GNUNET_SERVER_client_drop (tcc->client);
533       GNUNET_free (tcc->msg);
534       GNUNET_free (tcc);
535       return 0;
536     }
537   GNUNET_assert (size >= msize);
538   memcpy (buf, tcc->msg, msize);
539   if (tcc->tc != NULL)
540     tcc->tc (tcc->tc_cls, GNUNET_OK);
541   if (GNUNET_YES == tcc->end)
542     {
543       GNUNET_SERVER_receive_done (tcc->client, GNUNET_OK);
544     }
545   else
546     {
547 #if DEBUG_DATASTORE
548       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
549                   "Response transmitted, more pending!\n");
550 #endif
551     }
552   GNUNET_SERVER_client_drop (tcc->client);
553   GNUNET_free (tcc->msg);
554   GNUNET_free (tcc);
555   return msize;
556 }
557
558
559 /**
560  * Transmit the given message to the client.
561  *
562  * @param client target of the message
563  * @param msg message to transmit, will be freed!
564  * @param tc function to call afterwards
565  * @param tc_cls closure for tc
566  * @param end is this the last response (and we should
567  *        signal the server completion accodingly after
568  *        transmitting this message)?
569  */
570 static void
571 transmit (struct GNUNET_SERVER_Client *client,
572           struct GNUNET_MessageHeader *msg,
573           TransmitContinuation tc,
574           void *tc_cls,
575           int end)
576 {
577   struct TransmitCallbackContext *tcc;
578
579   if (GNUNET_YES == cleaning_done)
580     {
581 #if DEBUG_DATASTORE
582       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
583                   "Shutdown in progress, aborting transmission.\n");
584 #endif
585       GNUNET_free (msg);
586       if (NULL != tc)
587         tc (tc_cls, GNUNET_SYSERR);
588       return;
589     }
590   tcc = GNUNET_malloc (sizeof(struct TransmitCallbackContext));
591   tcc->msg = msg;
592   tcc->client = client;
593   tcc->tc = tc;
594   tcc->tc_cls = tc_cls;
595   tcc->end = end;
596   if (NULL ==
597       (tcc->th = GNUNET_SERVER_notify_transmit_ready (client,
598                                                       ntohs(msg->size),
599                                                       GNUNET_TIME_UNIT_FOREVER_REL,
600                                                       &transmit_callback,
601                                                       tcc)))
602     {
603       GNUNET_break (0);
604       if (GNUNET_YES == end)
605         {
606           GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
607                       _("Forcefully disconnecting client.\n"));
608           GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
609         }
610       if (NULL != tc)
611         tc (tc_cls, GNUNET_SYSERR);
612       GNUNET_free (msg);
613       GNUNET_free (tcc);
614       return;
615     }
616   GNUNET_SERVER_client_keep (client);
617   GNUNET_CONTAINER_DLL_insert (tcc_head,
618                                tcc_tail,
619                                tcc);
620 }
621
622
623 /**
624  * Transmit a status code to the client.
625  *
626  * @param client receiver of the response
627  * @param code status code
628  * @param msg optional error message (can be NULL)
629  */
630 static void
631 transmit_status (struct GNUNET_SERVER_Client *client,
632                  int code,
633                  const char *msg)
634 {
635   struct StatusMessage *sm;
636   size_t slen;
637
638 #if DEBUG_DATASTORE
639   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
640               "Transmitting `%s' message with value %d and message `%s'\n",
641               "STATUS",
642               code,
643               msg != NULL ? msg : "(none)");
644 #endif
645   slen = (msg == NULL) ? 0 : strlen(msg) + 1;  
646   sm = GNUNET_malloc (sizeof(struct StatusMessage) + slen);
647   sm->header.size = htons(sizeof(struct StatusMessage) + slen);
648   sm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_STATUS);
649   sm->status = htonl(code);
650   if (slen > 0)
651     memcpy (&sm[1], msg, slen);  
652   transmit (client, &sm->header, NULL, NULL, GNUNET_YES);
653 }
654
655
656 /**
657  * Function called once the transmit operation has
658  * either failed or succeeded.
659  *
660  * @param next_cls closure for calling "next_request" callback
661  * @param status GNUNET_OK on success, GNUNET_SYSERR on error
662  */
663 static void 
664 get_next(void *next_cls,
665          int status)
666 {
667   if (status != GNUNET_OK)
668     {
669       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
670                   _("Failed to transmit an item to the client; aborting iteration.\n"));
671       if (plugin != NULL)
672         plugin->api->next_request (next_cls, GNUNET_YES);
673       return;
674     }
675   plugin->api->next_request (next_cls, GNUNET_NO);
676 }
677
678
679 /**
680  * Function that will transmit the given datastore entry
681  * to the client.
682  *
683  * @param cls closure, pointer to the client (of type GNUNET_SERVER_Client).
684  * @param next_cls closure to use to ask for the next item
685  * @param key key for the content
686  * @param size number of bytes in data
687  * @param data content stored
688  * @param type type of the content
689  * @param priority priority of the content
690  * @param anonymity anonymity-level for the content
691  * @param expiration expiration time for the content
692  * @param uid unique identifier for the datum;
693  *        maybe 0 if no unique identifier is available
694  *
695  * @return GNUNET_SYSERR to abort the iteration, GNUNET_OK to continue,
696  *         GNUNET_NO to delete the item and continue (if supported)
697  */
698 static int
699 transmit_item (void *cls,
700                void *next_cls,
701                const GNUNET_HashCode * key,
702                uint32_t size,
703                const void *data,
704                enum GNUNET_BLOCK_Type type,
705                uint32_t priority,
706                uint32_t anonymity,
707                struct GNUNET_TIME_Absolute
708                expiration, uint64_t uid)
709 {
710   struct GNUNET_SERVER_Client *client = cls;
711   struct GNUNET_MessageHeader *end;
712   struct DataMessage *dm;
713
714   if (key == NULL)
715     {
716       /* transmit 'DATA_END' */
717 #if DEBUG_DATASTORE
718       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
719                   "Transmitting `%s' message\n",
720                   "DATA_END");
721 #endif
722       end = GNUNET_malloc (sizeof(struct GNUNET_MessageHeader));
723       end->size = htons(sizeof(struct GNUNET_MessageHeader));
724       end->type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END);
725       transmit (client, end, NULL, NULL, GNUNET_YES);
726       GNUNET_SERVER_client_drop (client);
727       return GNUNET_OK;
728     }
729   dm = GNUNET_malloc (sizeof(struct DataMessage) + size);
730   dm->header.size = htons(sizeof(struct DataMessage) + size);
731   dm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_DATA);
732   dm->rid = htonl(0);
733   dm->size = htonl(size);
734   dm->type = htonl(type);
735   dm->priority = htonl(priority);
736   dm->anonymity = htonl(anonymity);
737   dm->expiration = GNUNET_TIME_absolute_hton(expiration);
738   dm->uid = GNUNET_htonll(uid);
739   dm->key = *key;
740   memcpy (&dm[1], data, size);
741 #if DEBUG_DATASTORE
742   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
743               "Transmitting `%s' message for `%s' of type %u\n",
744               "DATA",
745               GNUNET_h2s (key),
746               type);
747 #endif
748   GNUNET_STATISTICS_update (stats,
749                             gettext_noop ("# results found"),
750                             1,
751                             GNUNET_NO);
752   transmit (client, &dm->header, &get_next, next_cls, GNUNET_NO);
753   return GNUNET_OK;
754 }
755
756
757 /**
758  * Handle RESERVE-message.
759  *
760  * @param cls closure
761  * @param client identification of the client
762  * @param message the actual message
763  */
764 static void
765 handle_reserve (void *cls,
766                 struct GNUNET_SERVER_Client *client,
767                 const struct GNUNET_MessageHeader *message)
768 {
769   /**
770    * Static counter to produce reservation identifiers.
771    */
772   static int reservation_gen;
773
774   const struct ReserveMessage *msg = (const struct ReserveMessage*) message;
775   struct ReservationList *e;
776   unsigned long long used;
777   unsigned long long req;
778   uint64_t amount;
779   uint32_t entries;
780
781 #if DEBUG_DATASTORE
782   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
783               "Processing `%s' request\n",
784               "RESERVE");
785 #endif
786   amount = GNUNET_ntohll(msg->amount);
787   entries = ntohl(msg->entries);
788   used = payload + reserved;
789   req = amount + ((unsigned long long) GNUNET_DATASTORE_ENTRY_OVERHEAD) * entries;
790   if (used + req > quota)
791     {
792       if (quota < used)
793         used = quota; /* cheat a bit for error message (to avoid negative numbers) */
794       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
795                   _("Insufficient space (%llu bytes are available) to satisfy `%s' request for %llu bytes\n"),
796                   quota - used,
797                   "RESERVE",
798                   req);
799       if (cache_size < req)
800         {
801           /* TODO: document this in the FAQ; essentially, if this
802              message happens, the insertion request could be blocked
803              by less-important content from migration because it is
804              larger than 1/8th of the overall available space, and
805              we only reserve 1/8th for "fresh" insertions */
806           GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
807                       _("The requested amount (%llu bytes) is larger than the cache size (%llu bytes)\n"),
808                       req,
809                       cache_size);
810           transmit_status (client, 0, 
811                            gettext_noop ("Insufficient space to satisfy request and "
812                                          "requested amount is larger than cache size"));
813         }
814       else
815         {
816           transmit_status (client, 0, 
817                            gettext_noop ("Insufficient space to satisfy request"));
818         }
819       return;      
820     }
821   reserved += req;
822   GNUNET_STATISTICS_set (stats,
823                          gettext_noop ("# reserved"),
824                          reserved,
825                          GNUNET_NO);
826   e = GNUNET_malloc (sizeof(struct ReservationList));
827   e->next = reservations;
828   reservations = e;
829   e->client = client;
830   e->amount = amount;
831   e->entries = entries;
832   e->rid = ++reservation_gen;
833   if (reservation_gen < 0)
834     reservation_gen = 0; /* wrap around */
835   transmit_status (client, e->rid, NULL);
836 }
837
838
839 /**
840  * Handle RELEASE_RESERVE-message.
841  *
842  * @param cls closure
843  * @param client identification of the client
844  * @param message the actual message
845  */
846 static void
847 handle_release_reserve (void *cls,
848                         struct GNUNET_SERVER_Client *client,
849                         const struct GNUNET_MessageHeader *message)
850 {
851   const struct ReleaseReserveMessage *msg = (const struct ReleaseReserveMessage*) message;
852   struct ReservationList *pos;
853   struct ReservationList *prev;
854   struct ReservationList *next;
855   int rid = ntohl(msg->rid);
856   unsigned long long rem;
857
858 #if DEBUG_DATASTORE
859   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
860               "Processing `%s' request\n",
861               "RELEASE_RESERVE");
862 #endif
863   next = reservations;
864   prev = NULL;
865   while (NULL != (pos = next))
866     {
867       next = pos->next;
868       if (rid == pos->rid)
869         {
870           if (prev == NULL)
871             reservations = next;
872           else
873             prev->next = next;
874           rem = pos->amount + ((unsigned long long) GNUNET_DATASTORE_ENTRY_OVERHEAD) * pos->entries;
875           GNUNET_assert (reserved >= rem);
876           reserved -= rem;
877           GNUNET_STATISTICS_set (stats,
878                          gettext_noop ("# reserved"),
879                                  reserved,
880                                  GNUNET_NO);
881 #if DEBUG_DATASTORE
882           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
883                       "Returning %llu remaining reserved bytes to storage pool\n",
884                       rem);
885 #endif    
886           GNUNET_free (pos);
887           transmit_status (client, GNUNET_OK, NULL);
888           return;
889         }       
890       prev = pos;
891     }
892   GNUNET_break (0);
893   transmit_status (client, GNUNET_SYSERR, gettext_noop ("Could not find matching reservation"));
894 }
895
896
897 /**
898  * Check that the given message is a valid data message.
899  *
900  * @return NULL if the message is not well-formed, otherwise the message
901  */
902 static const struct DataMessage *
903 check_data (const struct GNUNET_MessageHeader *message)
904 {
905   uint16_t size;
906   uint32_t dsize;
907   const struct DataMessage *dm;
908
909   size = ntohs(message->size);
910   if (size < sizeof(struct DataMessage))
911     { 
912       GNUNET_break (0);
913       return NULL;
914     }
915   dm = (const struct DataMessage *) message;
916   dsize = ntohl(dm->size);
917   if (size != dsize + sizeof(struct DataMessage))
918     {
919       GNUNET_break (0);
920       return NULL;
921     }
922   return dm;
923 }
924
925
926 /**
927  * Context for a put request used to see if the content is
928  * already present.
929  */
930 struct PutContext
931 {
932   /**
933    * Client to notify on completion.
934    */
935   struct GNUNET_SERVER_Client *client;
936
937   /**
938    * Did we find the data already in the database?
939    */
940   int is_present;
941   
942   /* followed by the 'struct DataMessage' */
943 };
944
945
946 /**
947  * Actually put the data message.
948  */
949 static void
950 execute_put (struct GNUNET_SERVER_Client *client,
951              const struct DataMessage *dm)
952 {
953   uint32_t size;
954   char *msg;
955   int ret;
956
957   size = ntohl(dm->size);
958   msg = NULL;
959   ret = plugin->api->put (plugin->api->cls,
960                           &dm->key,
961                           size,
962                           &dm[1],
963                           ntohl(dm->type),
964                           ntohl(dm->priority),
965                           ntohl(dm->anonymity),
966                           0 /* FIXME: replication */,
967                           GNUNET_TIME_absolute_ntoh(dm->expiration),
968                           &msg);
969   if (GNUNET_OK == ret)
970     {
971       GNUNET_STATISTICS_update (stats,
972                                 gettext_noop ("# bytes stored"),
973                                 size,
974                                 GNUNET_YES);
975       GNUNET_CONTAINER_bloomfilter_add (filter,
976                                         &dm->key);
977 #if DEBUG_DATASTORE
978       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
979                   "Successfully stored %u bytes of type %u under key `%s'\n",
980                   size,
981                   ntohl(dm->type),
982                   GNUNET_h2s (&dm->key));
983 #endif
984     }
985   transmit_status (client, 
986                    ret,
987                    msg);
988   GNUNET_free_non_null (msg);
989   if (quota - reserved - cache_size < payload)
990     {
991       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
992                   _("Need %llu bytes more space (%llu allowed, using %llu)\n"),
993                   (unsigned long long) size + GNUNET_DATASTORE_ENTRY_OVERHEAD,
994                   (unsigned long long) (quota - reserved - cache_size),
995                   (unsigned long long) payload);
996       manage_space (size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
997     }
998 }
999
1000
1001
1002 /**
1003  * Function that will check if the given datastore entry
1004  * matches the put and if none match executes the put.
1005  *
1006  * @param cls closure, pointer to the client (of type 'struct PutContext').
1007  * @param next_cls closure to use to ask for the next item
1008  * @param key key for the content
1009  * @param size number of bytes in data
1010  * @param data content stored
1011  * @param type type of the content
1012  * @param priority priority of the content
1013  * @param anonymity anonymity-level for the content
1014  * @param expiration expiration time for the content
1015  * @param uid unique identifier for the datum;
1016  *        maybe 0 if no unique identifier is available
1017  *
1018  * @return GNUNET_SYSERR to abort the iteration, GNUNET_OK to continue,
1019  *         GNUNET_NO to delete the item and continue (if supported)
1020  */
1021 static int
1022 check_present (void *cls,
1023                void *next_cls,
1024                const GNUNET_HashCode * key,
1025                uint32_t size,
1026                const void *data,
1027                enum GNUNET_BLOCK_Type type,
1028                uint32_t priority,
1029                uint32_t anonymity,
1030                struct GNUNET_TIME_Absolute
1031                expiration, uint64_t uid)
1032 {
1033   struct PutContext *pc = cls;
1034   const struct DataMessage *dm;
1035
1036   dm = (const struct DataMessage*) &pc[1];
1037   if (key == NULL)
1038     {
1039       if (pc->is_present == GNUNET_YES) 
1040         transmit_status (pc->client, GNUNET_NO, NULL);
1041       else
1042         execute_put (pc->client, dm);
1043       GNUNET_SERVER_client_drop (pc->client);
1044       GNUNET_free (pc);
1045       return GNUNET_SYSERR;
1046     }
1047   if ( (GNUNET_BLOCK_TYPE_FS_DBLOCK == type) ||
1048        (GNUNET_BLOCK_TYPE_FS_IBLOCK == type) ||
1049        ( (size == ntohl(dm->size)) &&
1050          (0 == memcmp (&dm[1],
1051                        data,
1052                        size)) ) )
1053     {
1054       pc->is_present = GNUNET_YES;
1055       plugin->api->next_request (next_cls, GNUNET_YES);
1056     }
1057   else
1058     {
1059       plugin->api->next_request (next_cls, GNUNET_NO);
1060     }
1061   return GNUNET_OK;
1062 }
1063
1064
1065 /**
1066  * Handle PUT-message.
1067  *
1068  * @param cls closure
1069  * @param client identification of the client
1070  * @param message the actual message
1071  */
1072 static void
1073 handle_put (void *cls,
1074             struct GNUNET_SERVER_Client *client,
1075             const struct GNUNET_MessageHeader *message)
1076 {
1077   const struct DataMessage *dm = check_data (message);
1078   int rid;
1079   struct ReservationList *pos;
1080   struct PutContext *pc;
1081   uint32_t size;
1082
1083   if ( (dm == NULL) ||
1084        (ntohl(dm->type) == 0) ) 
1085     {
1086       GNUNET_break (0);
1087       GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1088       return;
1089     }
1090 #if DEBUG_DATASTORE
1091   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1092               "Processing `%s' request for `%s' of type %u\n",
1093               "PUT",
1094               GNUNET_h2s (&dm->key),
1095               ntohl (dm->type));
1096 #endif
1097   rid = ntohl(dm->rid);
1098   size = ntohl(dm->size);
1099   if (rid > 0)
1100     {
1101       pos = reservations;
1102       while ( (NULL != pos) &&
1103               (rid != pos->rid) )
1104         pos = pos->next;
1105       GNUNET_break (pos != NULL);
1106       if (NULL != pos)
1107         {
1108           GNUNET_break (pos->entries > 0);
1109           GNUNET_break (pos->amount >= size);
1110           pos->entries--;
1111           pos->amount -= size;
1112           reserved -= (size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
1113           GNUNET_STATISTICS_set (stats,
1114                                  gettext_noop ("# reserved"),
1115                                  reserved,
1116                                  GNUNET_NO);
1117         }
1118     }
1119   if (GNUNET_YES == GNUNET_CONTAINER_bloomfilter_test (filter,
1120                                                        &dm->key))
1121     {
1122       pc = GNUNET_malloc (sizeof (struct PutContext) + size + sizeof (struct DataMessage));
1123       pc->client = client;
1124       GNUNET_SERVER_client_keep (client);
1125       memcpy (&pc[1], dm, size + sizeof (struct DataMessage));
1126       plugin->api->get (plugin->api->cls,
1127                         &dm->key,
1128                         NULL,
1129                         ntohl (dm->type),
1130                         &check_present,
1131                         pc);      
1132       return;
1133     }
1134   execute_put (client, dm);
1135 }
1136
1137
1138 /**
1139  * Handle GET-message.
1140  *
1141  * @param cls closure
1142  * @param client identification of the client
1143  * @param message the actual message
1144  */
1145 static void
1146 handle_get (void *cls,
1147             struct GNUNET_SERVER_Client *client,
1148             const struct GNUNET_MessageHeader *message)
1149 {
1150   const struct GetMessage *msg;
1151   uint16_t size;
1152
1153   size = ntohs(message->size);
1154   if ( (size != sizeof(struct GetMessage)) &&
1155        (size != sizeof(struct GetMessage) - sizeof(GNUNET_HashCode)) )
1156     {
1157       GNUNET_break (0);
1158       GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1159       return;
1160     }
1161   msg = (const struct GetMessage*) message;
1162 #if DEBUG_DATASTORE
1163   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1164               "Processing `%s' request for `%s' of type %u\n",
1165               "GET",
1166               GNUNET_h2s (&msg->key),
1167               ntohl (msg->type));
1168 #endif
1169   GNUNET_STATISTICS_update (stats,
1170                             gettext_noop ("# GET requests received"),
1171                             1,
1172                             GNUNET_NO);
1173   GNUNET_SERVER_client_keep (client);
1174   if ( (size == sizeof(struct GetMessage)) &&
1175        (GNUNET_YES != GNUNET_CONTAINER_bloomfilter_test (filter,
1176                                                          &msg->key)) )
1177     {
1178       /* don't bother database... */
1179 #if DEBUG_DATASTORE
1180       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1181                   "Empty result set for `%s' request for `%s' (bloomfilter).\n",
1182                   "GET",
1183                   GNUNET_h2s (&msg->key));
1184 #endif  
1185       GNUNET_STATISTICS_update (stats,
1186                                 gettext_noop ("# requests filtered by bloomfilter"),
1187                                 1,
1188                                 GNUNET_NO);
1189       transmit_item (client,
1190                      NULL, NULL, 0, NULL, 0, 0, 0, 
1191                      GNUNET_TIME_UNIT_ZERO_ABS, 0);
1192       return;
1193     }
1194   plugin->api->get (plugin->api->cls,
1195                     ((size == sizeof(struct GetMessage)) ? &msg->key : NULL),
1196                     NULL,
1197                     ntohl(msg->type),
1198                     &transmit_item,
1199                     client);    
1200 }
1201
1202
1203 /**
1204  * Handle UPDATE-message.
1205  *
1206  * @param cls closure
1207  * @param client identification of the client
1208  * @param message the actual message
1209  */
1210 static void
1211 handle_update (void *cls,
1212                struct GNUNET_SERVER_Client *client,
1213                const struct GNUNET_MessageHeader *message)
1214 {
1215   const struct UpdateMessage *msg;
1216   int ret;
1217   char *emsg;
1218
1219   GNUNET_STATISTICS_update (stats,
1220                             gettext_noop ("# UPDATE requests received"),
1221                             1,
1222                             GNUNET_NO);
1223   msg = (const struct UpdateMessage*) message;
1224   emsg = NULL;
1225 #if DEBUG_DATASTORE
1226   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1227               "Processing `%s' request for %llu\n",
1228               "UPDATE",
1229               (unsigned long long) GNUNET_ntohll (msg->uid));
1230 #endif
1231   ret = plugin->api->update (plugin->api->cls,
1232                              GNUNET_ntohll(msg->uid),
1233                              (int32_t) ntohl(msg->priority),
1234                              GNUNET_TIME_absolute_ntoh(msg->expiration),
1235                              &emsg);
1236   transmit_status (client, ret, emsg);
1237   GNUNET_free_non_null (emsg);
1238 }
1239
1240
1241 /**
1242  * Handle GET_RANDOM-message.
1243  *
1244  * @param cls closure
1245  * @param client identification of the client
1246  * @param message the actual message
1247  */
1248 static void
1249 handle_get_random (void *cls,
1250                    struct GNUNET_SERVER_Client *client,
1251                    const struct GNUNET_MessageHeader *message)
1252 {
1253 #if DEBUG_DATASTORE
1254   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1255               "Processing `%s' request\n",
1256               "GET_RANDOM");
1257 #endif
1258   GNUNET_STATISTICS_update (stats,
1259                             gettext_noop ("# GET RANDOM requests received"),
1260                             1,
1261                             GNUNET_NO);
1262   GNUNET_SERVER_client_keep (client);
1263   plugin->api->replication_get (plugin->api->cls,
1264                                 &transmit_item,
1265                                 client);  
1266 }
1267
1268 /**
1269  * Handle GET_ZERO_ANONYMITY-message.
1270  *
1271  * @param cls closure
1272  * @param client identification of the client
1273  * @param message the actual message
1274  */
1275 static void
1276 handle_get_zero_anonymity (void *cls,
1277                            struct GNUNET_SERVER_Client *client,
1278                            const struct GNUNET_MessageHeader *message)
1279 {
1280   const struct GetZeroAnonymityMessage * msg = (const struct GetZeroAnonymityMessage*) message;
1281   enum GNUNET_BLOCK_Type type;
1282
1283   type = (enum GNUNET_BLOCK_Type) ntohl (msg->type);
1284 #if DEBUG_DATASTORE
1285   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1286               "Processing `%s' request\n",
1287               "GET_ZERO_ANONYMITY");
1288 #endif
1289   GNUNET_STATISTICS_update (stats,
1290                             gettext_noop ("# GET ZERO ANONYMITY requests received"),
1291                             1,
1292                             GNUNET_NO);
1293   GNUNET_SERVER_client_keep (client);
1294   plugin->api->iter_zero_anonymity (plugin->api->cls,
1295                                     type,
1296                                     &transmit_item,
1297                                     client);  
1298 }
1299
1300
1301 /**
1302  * Context for the 'remove_callback'.
1303  */
1304 struct RemoveContext 
1305 {
1306   /**
1307    * Client for whom we're doing the remvoing.
1308    */
1309   struct GNUNET_SERVER_Client *client;
1310
1311   /**
1312    * GNUNET_YES if we managed to remove something.
1313    */
1314   int found;
1315 };
1316
1317
1318 /**
1319  * Callback function that will cause the item that is passed
1320  * in to be deleted (by returning GNUNET_NO).
1321  */
1322 static int
1323 remove_callback (void *cls,
1324                  void *next_cls,
1325                  const GNUNET_HashCode * key,
1326                  uint32_t size,
1327                  const void *data,
1328                  enum GNUNET_BLOCK_Type type,
1329                  uint32_t priority,
1330                  uint32_t anonymity,
1331                  struct GNUNET_TIME_Absolute
1332                  expiration, uint64_t uid)
1333 {
1334   struct RemoveContext *rc = cls;
1335
1336   if (key == NULL)
1337     {
1338 #if DEBUG_DATASTORE
1339       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1340                   "No further matches for `%s' request.\n",
1341                   "REMOVE");
1342 #endif  
1343       if (GNUNET_YES == rc->found)
1344         transmit_status (rc->client, GNUNET_OK, NULL);       
1345       else
1346         transmit_status (rc->client, GNUNET_NO, _("Content not found"));        
1347       GNUNET_SERVER_client_drop (rc->client);
1348       GNUNET_free (rc);
1349       return GNUNET_OK; /* last item */
1350     }
1351   rc->found = GNUNET_YES;
1352 #if DEBUG_DATASTORE
1353   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1354               "Item %llu matches `%s' request for key `%s' and type %u.\n",
1355               (unsigned long long) uid,
1356               "REMOVE",
1357               GNUNET_h2s (key),
1358               type);
1359 #endif  
1360   GNUNET_STATISTICS_update (stats,
1361                             gettext_noop ("# bytes removed (explicit request)"),
1362                             size,
1363                             GNUNET_YES);
1364   GNUNET_CONTAINER_bloomfilter_remove (filter,
1365                                        key);
1366   plugin->api->next_request (next_cls, GNUNET_YES);
1367   return GNUNET_NO;
1368 }
1369
1370
1371 /**
1372  * Handle REMOVE-message.
1373  *
1374  * @param cls closure
1375  * @param client identification of the client
1376  * @param message the actual message
1377  */
1378 static void
1379 handle_remove (void *cls,
1380              struct GNUNET_SERVER_Client *client,
1381              const struct GNUNET_MessageHeader *message)
1382 {
1383   const struct DataMessage *dm = check_data (message);
1384   GNUNET_HashCode vhash;
1385   struct RemoveContext *rc;
1386
1387   if (dm == NULL)
1388     {
1389       GNUNET_break (0);
1390       GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1391       return;
1392     }
1393 #if DEBUG_DATASTORE
1394   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1395               "Processing `%s' request for `%s' of type %u\n",
1396               "REMOVE",
1397               GNUNET_h2s (&dm->key),
1398               ntohl (dm->type));
1399 #endif
1400   GNUNET_STATISTICS_update (stats,
1401                             gettext_noop ("# REMOVE requests received"),
1402                             1,
1403                             GNUNET_NO);
1404   rc = GNUNET_malloc (sizeof(struct RemoveContext));
1405   GNUNET_SERVER_client_keep (client);
1406   rc->client = client;
1407   GNUNET_CRYPTO_hash (&dm[1],
1408                       ntohl(dm->size),
1409                       &vhash);
1410   plugin->api->get (plugin->api->cls,
1411                     &dm->key,
1412                     &vhash,
1413                     (enum GNUNET_BLOCK_Type) ntohl(dm->type),
1414                     &remove_callback,
1415                     rc);
1416 }
1417
1418
1419 /**
1420  * Handle DROP-message.
1421  *
1422  * @param cls closure
1423  * @param client identification of the client
1424  * @param message the actual message
1425  */
1426 static void
1427 handle_drop (void *cls,
1428              struct GNUNET_SERVER_Client *client,
1429              const struct GNUNET_MessageHeader *message)
1430 {
1431 #if DEBUG_DATASTORE
1432   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1433               "Processing `%s' request\n",
1434               "DROP");
1435 #endif
1436   plugin->api->drop (plugin->api->cls);
1437   GNUNET_SERVER_receive_done (client, GNUNET_OK);
1438 }
1439
1440
1441 /**
1442  * Function called by plugins to notify us about a
1443  * change in their disk utilization.
1444  *
1445  * @param cls closure (NULL)
1446  * @param delta change in disk utilization, 
1447  *        0 for "reset to empty"
1448  */
1449 static void
1450 disk_utilization_change_cb (void *cls,
1451                             int delta)
1452 {
1453   if ( (delta < 0) &&
1454        (payload < -delta) )
1455     {
1456       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1457                   _("Datastore payload inaccurate (%lld < %lld).  Trying to fix.\n"),
1458                   (long long) payload,
1459                   (long long) -delta);
1460       payload = plugin->api->get_size (plugin->api->cls);
1461       sync_stats ();
1462       return;
1463     }
1464   payload += delta;
1465   lastSync++;
1466   if (lastSync >= MAX_STAT_SYNC_LAG)
1467     sync_stats ();
1468 }
1469
1470
1471 /**
1472  * Callback function to process statistic values.
1473  *
1474  * @param cls closure (struct Plugin*)
1475  * @param subsystem name of subsystem that created the statistic
1476  * @param name the name of the datum
1477  * @param value the current value
1478  * @param is_persistent GNUNET_YES if the value is persistent, GNUNET_NO if not
1479  * @return GNUNET_OK to continue, GNUNET_SYSERR to abort iteration
1480  */
1481 static int
1482 process_stat_in (void *cls,
1483                  const char *subsystem,
1484                  const char *name,
1485                  uint64_t value,
1486                  int is_persistent)
1487 {
1488   GNUNET_assert (stats_worked == GNUNET_NO);
1489   stats_worked = GNUNET_YES;
1490   payload += value;
1491 #if DEBUG_SQLITE
1492   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1493               "Notification from statistics about existing payload (%llu), new payload is %llu\n",
1494               abs_value,
1495               payload);
1496 #endif
1497   return GNUNET_OK;
1498 }
1499
1500
1501 static void
1502 process_stat_done (void *cls,
1503                    int success)
1504 {
1505   struct DatastorePlugin *plugin = cls;
1506
1507   stat_get = NULL;
1508   if (stats_worked == GNUNET_NO) 
1509     payload = plugin->api->get_size (plugin->api->cls);
1510 }
1511
1512
1513 /**
1514  * Load the datastore plugin.
1515  */
1516 static struct DatastorePlugin *
1517 load_plugin () 
1518 {
1519   struct DatastorePlugin *ret;
1520   char *libname;
1521   char *name;
1522
1523   if (GNUNET_OK !=
1524       GNUNET_CONFIGURATION_get_value_string (cfg,
1525                                              "DATASTORE", "DATABASE", &name))
1526     {
1527       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1528                   _("No `%s' specified for `%s' in configuration!\n"),
1529                   "DATABASE",
1530                   "DATASTORE");
1531       return NULL;
1532     }
1533   ret = GNUNET_malloc (sizeof(struct DatastorePlugin));
1534   ret->env.cfg = cfg;
1535   ret->env.duc = &disk_utilization_change_cb;
1536   ret->env.cls = NULL;
1537   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1538               _("Loading `%s' datastore plugin\n"), name);
1539   GNUNET_asprintf (&libname, "libgnunet_plugin_datastore_%s", name);
1540   ret->short_name = name;
1541   ret->lib_name = libname;
1542   ret->api = GNUNET_PLUGIN_load (libname, &ret->env);
1543   if (ret->api == NULL)
1544     {
1545       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1546                   _("Failed to load datastore plugin for `%s'\n"), name);
1547       GNUNET_free (ret->short_name);
1548       GNUNET_free (libname);
1549       GNUNET_free (ret);
1550       return NULL;
1551     }
1552   return ret;
1553 }
1554
1555
1556 /**
1557  * Function called when the service shuts
1558  * down.  Unloads our datastore plugin.
1559  *
1560  * @param plug plugin to unload
1561  */
1562 static void
1563 unload_plugin (struct DatastorePlugin *plug)
1564 {
1565 #if DEBUG_DATASTORE
1566   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1567               "Datastore service is unloading plugin...\n");
1568 #endif
1569   GNUNET_break (NULL == GNUNET_PLUGIN_unload (plug->lib_name, plug->api));
1570   GNUNET_free (plug->lib_name);
1571   GNUNET_free (plug->short_name);
1572   GNUNET_free (plug);
1573 }
1574
1575
1576 /**
1577  * Final task run after shutdown.  Unloads plugins and disconnects us from
1578  * statistics.
1579  */
1580 static void
1581 unload_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1582 {
1583   unload_plugin (plugin);
1584   plugin = NULL;
1585   if (filter != NULL)
1586     {
1587       GNUNET_CONTAINER_bloomfilter_free (filter);
1588       filter = NULL;
1589     }
1590   if (lastSync > 0)
1591     sync_stats ();
1592   if (stat_get != NULL)
1593     {
1594       GNUNET_STATISTICS_get_cancel (stat_get);
1595       stat_get = NULL;
1596     }
1597   if (stats != NULL)
1598     {
1599       GNUNET_STATISTICS_destroy (stats, GNUNET_YES);
1600       stats = NULL;
1601     }
1602 }
1603
1604
1605 /**
1606  * Last task run during shutdown.  Disconnects us from
1607  * the transport and core.
1608  */
1609 static void
1610 cleaning_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1611 {
1612   struct TransmitCallbackContext *tcc;
1613
1614   cleaning_done = GNUNET_YES;
1615   while (NULL != (tcc = tcc_head))
1616     {
1617       GNUNET_CONTAINER_DLL_remove (tcc_head,
1618                                    tcc_tail,
1619                                    tcc);
1620       if (tcc->th != NULL)
1621         {
1622           GNUNET_CONNECTION_notify_transmit_ready_cancel (tcc->th);
1623           GNUNET_SERVER_client_drop (tcc->client);
1624         }
1625       if (NULL != tcc->tc)
1626         tcc->tc (tcc->tc_cls, GNUNET_SYSERR);
1627       GNUNET_free (tcc->msg);
1628       GNUNET_free (tcc);
1629     }
1630   if (expired_kill_task != GNUNET_SCHEDULER_NO_TASK)
1631     {
1632       GNUNET_SCHEDULER_cancel (expired_kill_task);
1633       expired_kill_task = GNUNET_SCHEDULER_NO_TASK;
1634     }
1635   GNUNET_SCHEDULER_add_continuation (&unload_task,
1636                                      NULL,
1637                                      GNUNET_SCHEDULER_REASON_PREREQ_DONE);
1638 }
1639
1640
1641 /**
1642  * Function that removes all active reservations made
1643  * by the given client and releases the space for other
1644  * requests.
1645  *
1646  * @param cls closure
1647  * @param client identification of the client
1648  */
1649 static void
1650 cleanup_reservations (void *cls,
1651                       struct GNUNET_SERVER_Client
1652                       * client)
1653 {
1654   struct ReservationList *pos;
1655   struct ReservationList *prev;
1656   struct ReservationList *next;
1657
1658   if (client == NULL)
1659     return;
1660   prev = NULL;
1661   pos = reservations;
1662   while (NULL != pos)
1663     {
1664       next = pos->next;
1665       if (pos->client == client)
1666         {
1667           if (prev == NULL)
1668             reservations = next;
1669           else
1670             prev->next = next;
1671           reserved -= pos->amount + pos->entries * GNUNET_DATASTORE_ENTRY_OVERHEAD;
1672           GNUNET_free (pos);
1673         }
1674       else
1675         {
1676           prev = pos;
1677         }
1678       pos = next;
1679     }
1680   GNUNET_STATISTICS_set (stats,
1681                          gettext_noop ("# reserved"),
1682                          reserved,
1683                          GNUNET_NO);
1684 }
1685
1686
1687 /**
1688  * Process datastore requests.
1689  *
1690  * @param cls closure
1691  * @param server the initialized server
1692  * @param c configuration to use
1693  */
1694 static void
1695 run (void *cls,
1696      struct GNUNET_SERVER_Handle *server,
1697      const struct GNUNET_CONFIGURATION_Handle *c)
1698 {
1699   static const struct GNUNET_SERVER_MessageHandler handlers[] = {
1700     {&handle_reserve, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_RESERVE, 
1701      sizeof(struct ReserveMessage) }, 
1702     {&handle_release_reserve, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_RELEASE_RESERVE, 
1703      sizeof(struct ReleaseReserveMessage) }, 
1704     {&handle_put, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_PUT, 0 }, 
1705     {&handle_update, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_UPDATE, 
1706      sizeof (struct UpdateMessage) }, 
1707     {&handle_get, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_GET, 0 }, 
1708     {&handle_get_random, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_GET_RANDOM, 
1709      sizeof(struct GNUNET_MessageHeader) }, 
1710     {&handle_get_zero_anonymity, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_GET_ZERO_ANONYMITY, 
1711      sizeof(struct GetZeroAnonymityMessage) }, 
1712     {&handle_remove, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE, 0 }, 
1713     {&handle_drop, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_DROP, 
1714      sizeof(struct GNUNET_MessageHeader) }, 
1715     {NULL, NULL, 0, 0}
1716   };
1717   char *fn;
1718   unsigned int bf_size;
1719
1720   cfg = c;
1721   if (GNUNET_OK !=
1722       GNUNET_CONFIGURATION_get_value_number (cfg,
1723                                              "DATASTORE", "QUOTA", &quota))
1724     {
1725       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1726                   _("No `%s' specified for `%s' in configuration!\n"),
1727                   "QUOTA",
1728                   "DATASTORE");
1729       return;
1730     }
1731   stats = GNUNET_STATISTICS_create ("datastore", cfg);
1732   GNUNET_STATISTICS_set (stats,
1733                          gettext_noop ("# quota"),
1734                          quota,
1735                          GNUNET_NO);
1736   cache_size = quota / 8; /* Or should we make this an option? */
1737   GNUNET_STATISTICS_set (stats,
1738                          gettext_noop ("# cache size"),
1739                          cache_size,
1740                          GNUNET_NO);
1741   bf_size = quota / 32; /* 8 bit per entry, 1 bit per 32 kb in DB */
1742   fn = NULL;
1743   if ( (GNUNET_OK !=
1744         GNUNET_CONFIGURATION_get_value_filename (cfg,
1745                                                  "DATASTORE",
1746                                                  "BLOOMFILTER",
1747                                                  &fn)) ||
1748        (GNUNET_OK !=
1749         GNUNET_DISK_directory_create_for_file (fn)) )
1750     {
1751       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1752                   _("Could not use specified filename `%s' for bloomfilter.\n"),
1753                   fn != NULL ? fn : "");
1754       GNUNET_free_non_null (fn);
1755       fn = NULL;
1756     }
1757   filter = GNUNET_CONTAINER_bloomfilter_load (fn, bf_size, 5);  /* approx. 3% false positives at max use */  
1758   GNUNET_free_non_null (fn);
1759   if (filter == NULL)
1760     {
1761       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1762                   _("Failed to initialize bloomfilter.\n"));
1763       if (stats != NULL)
1764         {
1765           GNUNET_STATISTICS_destroy (stats, GNUNET_YES);
1766           stats = NULL;
1767         }
1768       return;
1769     }
1770   plugin = load_plugin ();
1771   if (NULL == plugin)
1772     {
1773       GNUNET_CONTAINER_bloomfilter_free (filter);
1774       filter = NULL;
1775       if (stats != NULL)
1776         {
1777           GNUNET_STATISTICS_destroy (stats, GNUNET_YES);
1778           stats = NULL;
1779         }
1780       return;
1781     }
1782   stat_get = GNUNET_STATISTICS_get (stats,
1783                                     "datastore",
1784                                     QUOTA_STAT_NAME,
1785                                     GNUNET_TIME_UNIT_SECONDS,
1786                                     &process_stat_done,
1787                                     &process_stat_in,
1788                                     plugin);
1789   GNUNET_SERVER_disconnect_notify (server, &cleanup_reservations, NULL);
1790   GNUNET_SERVER_add_handlers (server, handlers);
1791   expired_kill_task
1792     = GNUNET_SCHEDULER_add_with_priority (GNUNET_SCHEDULER_PRIORITY_IDLE,
1793                                           &delete_expired, NULL);
1794   GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL,
1795                                 &cleaning_task, NULL);
1796 }
1797
1798
1799 /**
1800  * The main function for the datastore service.
1801  *
1802  * @param argc number of arguments from the command line
1803  * @param argv command line arguments
1804  * @return 0 ok, 1 on error
1805  */
1806 int
1807 main (int argc, char *const *argv)
1808 {
1809   int ret;
1810
1811   ret = (GNUNET_OK ==
1812          GNUNET_SERVICE_run (argc,
1813                              argv,
1814                              "datastore",
1815                              GNUNET_SERVICE_OPTION_NONE,
1816                              &run, NULL)) ? 0 : 1;
1817   return ret;
1818 }
1819
1820
1821 /* end of gnunet-service-datastore.c */