insanity
[oweals/gnunet.git] / src / datastore / gnunet-service-datastore.c
1 /*
2      This file is part of GNUnet
3      (C) 2004, 2005, 2006, 2007, 2009 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 2, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file datastore/gnunet-service-datastore.c
23  * @brief Management for the datastore for files stored on a GNUnet node
24  * @author Christian Grothoff
25  */
26
27 #include "platform.h"
28 #include "gnunet_util_lib.h"
29 #include "gnunet_protocols.h"
30 #include "gnunet_statistics_service.h"
31 #include "gnunet_datastore_plugin.h"
32 #include "datastore.h"
33
34 /**
35  * How many messages do we queue at most per client?
36  */
37 #define MAX_PENDING 1024
38
39 /**
40  * How long are we at most keeping "expired" content
41  * past the expiration date in the database?
42  */
43 #define MAX_EXPIRE_DELAY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 15)
44
45 /**
46  * How fast are we allowed to query the database for deleting
47  * expired content? (1 item per second).
48  */
49 #define MIN_EXPIRE_DELAY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 1)
50
51
52 #define QUOTA_STAT_NAME gettext_noop ("# bytes used in file-sharing datastore")
53
54 /**
55  * After how many payload-changing operations
56  * do we sync our statistics?
57  */
58 #define MAX_STAT_SYNC_LAG 50
59
60
61 /**
62  * Our datastore plugin.
63  */
64 struct DatastorePlugin
65 {
66
67   /**
68    * API of the transport as returned by the plugin's
69    * initialization function.
70    */
71   struct GNUNET_DATASTORE_PluginFunctions *api;
72
73   /**
74    * Short name for the plugin (i.e. "sqlite").
75    */
76   char *short_name;
77
78   /**
79    * Name of the library (i.e. "gnunet_plugin_datastore_sqlite").
80    */
81   char *lib_name;
82
83   /**
84    * Environment this transport service is using
85    * for this plugin.
86    */
87   struct GNUNET_DATASTORE_PluginEnvironment env;
88
89 };
90
91
92 /**
93  * Linked list of active reservations.
94  */
95 struct ReservationList 
96 {
97
98   /**
99    * This is a linked list.
100    */
101   struct ReservationList *next;
102
103   /**
104    * Client that made the reservation.
105    */
106   struct GNUNET_SERVER_Client *client;
107
108   /**
109    * Number of bytes (still) reserved.
110    */
111   uint64_t amount;
112
113   /**
114    * Number of items (still) reserved.
115    */
116   uint64_t entries;
117
118   /**
119    * Reservation identifier.
120    */
121   int32_t rid;
122
123 };
124
125
126
127 /**
128  * Our datastore plugin (NULL if not available).
129  */
130 static struct DatastorePlugin *plugin;
131
132 /**
133  * Linked list of space reservations made by clients.
134  */
135 static struct ReservationList *reservations;
136
137 /**
138  * Bloomfilter to quickly tell if we don't have the content.
139  */
140 static struct GNUNET_CONTAINER_BloomFilter *filter;
141
142 /**
143  * How much space are we allowed to use?
144  */
145 static unsigned long long quota;
146
147 /**
148  * How much space are we using for the cache?  (space available for
149  * insertions that will be instantly reclaimed by discarding less
150  * important content --- or possibly whatever we just inserted into
151  * the "cache").
152  */
153 static unsigned long long cache_size;
154
155 /**
156  * How much space have we currently reserved?
157  */
158 static unsigned long long reserved;
159   
160 /**
161  * How much data are we currently storing
162  * in the database?
163  */
164 static unsigned long long payload;
165
166 /**
167  * Number of updates that were made to the
168  * payload value since we last synchronized
169  * it with the statistics service.
170  */
171 static unsigned int lastSync;
172
173 /**
174  * Did we get an answer from statistics?
175  */
176 static int stats_worked;
177
178 /**
179  * Identity of the task that is used to delete
180  * expired content.
181  */
182 static GNUNET_SCHEDULER_TaskIdentifier expired_kill_task;
183
184 /**
185  * Our configuration.
186  */
187 const struct GNUNET_CONFIGURATION_Handle *cfg;
188
189
190 /**
191  * Handle for reporting statistics.
192  */
193 static struct GNUNET_STATISTICS_Handle *stats;
194
195
196 /**
197  * Synchronize our utilization statistics with the 
198  * statistics service.
199  */
200 static void 
201 sync_stats ()
202 {
203   GNUNET_STATISTICS_set (stats,
204                          QUOTA_STAT_NAME,
205                          payload,
206                          GNUNET_YES);
207   lastSync = 0;
208 }
209
210
211
212
213 /**
214  * Function called once the transmit operation has
215  * either failed or succeeded.
216  *
217  * @param cls closure
218  * @param status GNUNET_OK on success, GNUNET_SYSERR on error
219  */
220 typedef void (*TransmitContinuation)(void *cls,
221                                      int status);
222
223
224 /**
225  * Context for transmitting replies to clients.
226  */
227 struct TransmitCallbackContext 
228 {
229   
230   /**
231    * We keep these in a doubly-linked list (for cleanup).
232    */
233   struct TransmitCallbackContext *next;
234   
235   /**
236    * We keep these in a doubly-linked list (for cleanup).
237    */
238   struct TransmitCallbackContext *prev;
239   
240   /**
241    * The message that we're asked to transmit.
242    */
243   struct GNUNET_MessageHeader *msg;
244   
245   /**
246    * Handle for the transmission request.
247    */
248   struct GNUNET_CONNECTION_TransmitHandle *th;
249
250   /**
251    * Client that we are transmitting to.
252    */
253   struct GNUNET_SERVER_Client *client;
254
255   /**
256    * Function to call once msg has been transmitted
257    * (or at least added to the buffer).
258    */
259   TransmitContinuation tc;
260
261   /**
262    * Closure for tc.
263    */
264   void *tc_cls;
265
266   /**
267    * GNUNET_YES if we are supposed to signal the server
268    * completion of the client's request.
269    */
270   int end;
271 };
272
273   
274 /**
275  * Head of the doubly-linked list (for cleanup).
276  */
277 static struct TransmitCallbackContext *tcc_head;
278
279 /**
280  * Tail of the doubly-linked list (for cleanup).
281  */
282 static struct TransmitCallbackContext *tcc_tail;
283
284 /**
285  * Have we already cleaned up the TCCs and are hence no longer
286  * willing (or able) to transmit anything to anyone?
287  */
288 static int cleaning_done;
289
290 /**
291  * Handle for pending get request.
292  */
293 static struct GNUNET_STATISTICS_GetHandle *stat_get;
294
295
296 /**
297  * Task that is used to remove expired entries from
298  * the datastore.  This task will schedule itself
299  * again automatically to always delete all expired
300  * content quickly.
301  *
302  * @param cls not used
303  * @param tc task context
304  */ 
305 static void
306 delete_expired (void *cls,
307                 const struct GNUNET_SCHEDULER_TaskContext *tc);
308
309
310 /**
311  * Iterate over the expired items stored in the datastore.
312  * Delete all expired items; once we have processed all
313  * expired items, re-schedule the "delete_expired" task.
314  *
315  * @param cls not used
316  * @param next_cls closure to pass to the "next" function.
317  * @param key key for the content
318  * @param size number of bytes in data
319  * @param data content stored
320  * @param type type of the content
321  * @param priority priority of the content
322  * @param anonymity anonymity-level for the content
323  * @param expiration expiration time for the content
324  * @param uid unique identifier for the datum;
325  *        maybe 0 if no unique identifier is available
326  *
327  * @return GNUNET_SYSERR to abort the iteration, GNUNET_OK to continue
328  *         (continue on call to "next", of course),
329  *         GNUNET_NO to delete the item and continue (if supported)
330  */
331 static int 
332 expired_processor (void *cls,
333                    void *next_cls,
334                    const GNUNET_HashCode * key,
335                    uint32_t size,
336                    const void *data,
337                    enum GNUNET_BLOCK_Type type,
338                    uint32_t priority,
339                    uint32_t anonymity,
340                    struct GNUNET_TIME_Absolute
341                    expiration, 
342                    uint64_t uid)
343 {
344   struct GNUNET_TIME_Absolute now;
345
346   if (key == NULL) 
347     {
348       expired_kill_task 
349         = GNUNET_SCHEDULER_add_delayed (MAX_EXPIRE_DELAY,
350                                         &delete_expired,
351                                         NULL);
352       return GNUNET_SYSERR;
353     }
354   now = GNUNET_TIME_absolute_get ();
355   if (expiration.abs_value > now.abs_value)
356     {
357       /* finished processing */
358       expired_kill_task 
359         = GNUNET_SCHEDULER_add_delayed (MAX_EXPIRE_DELAY,
360                                         &delete_expired,
361                                         NULL);
362       return GNUNET_SYSERR;
363     }
364 #if DEBUG_DATASTORE
365   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
366               "Deleting content `%s' of type %u that expired %llu ms ago\n",
367               GNUNET_h2s (key),
368               type,
369               (unsigned long long) (now.abs_value - expiration.abs_value));
370 #endif
371   GNUNET_STATISTICS_update (stats,
372                             gettext_noop ("# bytes expired"),
373                             size,
374                             GNUNET_YES);
375   GNUNET_CONTAINER_bloomfilter_remove (filter,
376                                        key);
377   expired_kill_task 
378     = GNUNET_SCHEDULER_add_delayed (MIN_EXPIRE_DELAY,
379                                     &delete_expired,
380                                     NULL);
381   return GNUNET_NO;
382 }
383
384
385 /**
386  * Task that is used to remove expired entries from
387  * the datastore.  This task will schedule itself
388  * again automatically to always delete all expired
389  * content quickly.
390  *
391  * @param cls not used
392  * @param tc task context
393  */ 
394 static void
395 delete_expired (void *cls,
396                 const struct GNUNET_SCHEDULER_TaskContext *tc)
397 {
398   expired_kill_task = GNUNET_SCHEDULER_NO_TASK;
399   plugin->api->expiration_get (plugin->api->cls, 
400                                &expired_processor,
401                                NULL);
402 }
403
404
405 /**
406  * An iterator over a set of items stored in the datastore
407  * that deletes until we're happy with respect to our quota.
408  *
409  * @param cls closure
410  * @param next_cls closure to pass to the "next" function.
411  * @param key key for the content
412  * @param size number of bytes in data
413  * @param data content stored
414  * @param type type of the content
415  * @param priority priority of the content
416  * @param anonymity anonymity-level for the content
417  * @param expiration expiration time for the content
418  * @param uid unique identifier for the datum;
419  *        maybe 0 if no unique identifier is available
420  *
421  * @return GNUNET_SYSERR to abort the iteration, GNUNET_OK to continue
422  *         (continue on call to "next", of course),
423  *         GNUNET_NO to delete the item and continue (if supported)
424  */
425 static int 
426 quota_processor (void *cls,
427                  void *next_cls,
428                  const GNUNET_HashCode * key,
429                  uint32_t size,
430                  const void *data,
431                  enum GNUNET_BLOCK_Type type,
432                  uint32_t priority,
433                  uint32_t anonymity,
434                  struct GNUNET_TIME_Absolute expiration, 
435                  uint64_t uid)
436 {
437   unsigned long long *need = cls;
438
439   if (NULL == key)
440     return GNUNET_SYSERR;    
441 #if DEBUG_DATASTORE
442   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
443               "Deleting %llu bytes of low-priority content `%s' of type %u (still trying to free another %llu bytes)\n",
444               (unsigned long long) (size + GNUNET_DATASTORE_ENTRY_OVERHEAD),
445               GNUNET_h2s (key),
446               type,
447               *need);
448 #endif
449   if (size + GNUNET_DATASTORE_ENTRY_OVERHEAD > *need)
450     *need = 0;
451   else
452     *need -= size + GNUNET_DATASTORE_ENTRY_OVERHEAD;
453   GNUNET_STATISTICS_update (stats,
454                             gettext_noop ("# bytes purged (low-priority)"),
455                             size,
456                             GNUNET_YES);
457   GNUNET_CONTAINER_bloomfilter_remove (filter,
458                                        key);
459   return GNUNET_NO;
460 }
461
462
463 /**
464  * Manage available disk space by running tasks
465  * that will discard content if necessary.  This
466  * function will be run whenever a request for
467  * "need" bytes of storage could only be satisfied
468  * by eating into the "cache" (and we want our cache
469  * space back).
470  *
471  * @param need number of bytes of content that were
472  *        placed into the "cache" (and hence the
473  *        number of bytes that should be removed).
474  */
475 static void
476 manage_space (unsigned long long need)
477 {
478   unsigned long long last;
479
480 #if DEBUG_DATASTORE
481   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
482               "Asked to free up %llu bytes of cache space\n",
483               need);
484 #endif
485   last = 0;
486   while ( (need > 0) &&
487           (last != need) )
488     {
489       last = need;
490       plugin->api->expiration_get (plugin->api->cls,
491                                    &quota_processor,
492                                    &need);    
493     }
494 }
495
496
497 /**
498  * Function called to notify a client about the socket
499  * begin ready to queue more data.  "buf" will be
500  * NULL and "size" zero if the socket was closed for
501  * writing in the meantime.
502  *
503  * @param cls closure
504  * @param size number of bytes available in buf
505  * @param buf where the callee should write the message
506  * @return number of bytes written to buf
507  */
508 static size_t
509 transmit_callback (void *cls,
510                    size_t size, void *buf)
511 {
512   struct TransmitCallbackContext *tcc = cls;
513   size_t msize;
514   
515   tcc->th = NULL;
516   GNUNET_CONTAINER_DLL_remove (tcc_head,
517                                tcc_tail,
518                                tcc);
519   msize = ntohs(tcc->msg->size);
520   if (size == 0)
521     {
522       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
523                   _("Transmission to client failed!\n"));
524       if (tcc->tc != NULL)
525         tcc->tc (tcc->tc_cls, GNUNET_SYSERR);
526       if (GNUNET_YES == tcc->end)
527         {
528           GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
529                       _("Disconnecting client due to transmission failure!\n"));
530           GNUNET_SERVER_receive_done (tcc->client, GNUNET_SYSERR);       
531         }
532       GNUNET_SERVER_client_drop (tcc->client);
533       GNUNET_free (tcc->msg);
534       GNUNET_free (tcc);
535       return 0;
536     }
537   GNUNET_assert (size >= msize);
538   memcpy (buf, tcc->msg, msize);
539   if (tcc->tc != NULL)
540     tcc->tc (tcc->tc_cls, GNUNET_OK);
541   if (GNUNET_YES == tcc->end)
542     {
543       GNUNET_SERVER_receive_done (tcc->client, GNUNET_OK);
544     }
545   else
546     {
547 #if DEBUG_DATASTORE
548       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
549                   "Response transmitted, more pending!\n");
550 #endif
551     }
552   GNUNET_SERVER_client_drop (tcc->client);
553   GNUNET_free (tcc->msg);
554   GNUNET_free (tcc);
555   return msize;
556 }
557
558
559 /**
560  * Transmit the given message to the client.
561  *
562  * @param client target of the message
563  * @param msg message to transmit, will be freed!
564  * @param tc function to call afterwards
565  * @param tc_cls closure for tc
566  * @param end is this the last response (and we should
567  *        signal the server completion accodingly after
568  *        transmitting this message)?
569  */
570 static void
571 transmit (struct GNUNET_SERVER_Client *client,
572           struct GNUNET_MessageHeader *msg,
573           TransmitContinuation tc,
574           void *tc_cls,
575           int end)
576 {
577   struct TransmitCallbackContext *tcc;
578
579   if (GNUNET_YES == cleaning_done)
580     {
581 #if DEBUG_DATASTORE
582       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
583                   "Shutdown in progress, aborting transmission.\n");
584 #endif
585       GNUNET_free (msg);
586       if (NULL != tc)
587         tc (tc_cls, GNUNET_SYSERR);
588       return;
589     }
590   tcc = GNUNET_malloc (sizeof(struct TransmitCallbackContext));
591   tcc->msg = msg;
592   tcc->client = client;
593   tcc->tc = tc;
594   tcc->tc_cls = tc_cls;
595   tcc->end = end;
596   if (NULL ==
597       (tcc->th = GNUNET_SERVER_notify_transmit_ready (client,
598                                                       ntohs(msg->size),
599                                                       GNUNET_TIME_UNIT_FOREVER_REL,
600                                                       &transmit_callback,
601                                                       tcc)))
602     {
603       GNUNET_break (0);
604       if (GNUNET_YES == end)
605         {
606           GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
607                       _("Forcefully disconnecting client.\n"));
608           GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
609         }
610       if (NULL != tc)
611         tc (tc_cls, GNUNET_SYSERR);
612       GNUNET_free (msg);
613       GNUNET_free (tcc);
614       return;
615     }
616   GNUNET_SERVER_client_keep (client);
617   GNUNET_CONTAINER_DLL_insert (tcc_head,
618                                tcc_tail,
619                                tcc);
620 }
621
622
623 /**
624  * Transmit a status code to the client.
625  *
626  * @param client receiver of the response
627  * @param code status code
628  * @param msg optional error message (can be NULL)
629  */
630 static void
631 transmit_status (struct GNUNET_SERVER_Client *client,
632                  int code,
633                  const char *msg)
634 {
635   struct StatusMessage *sm;
636   size_t slen;
637
638 #if DEBUG_DATASTORE
639   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
640               "Transmitting `%s' message with value %d and message `%s'\n",
641               "STATUS",
642               code,
643               msg != NULL ? msg : "(none)");
644 #endif
645   slen = (msg == NULL) ? 0 : strlen(msg) + 1;  
646   sm = GNUNET_malloc (sizeof(struct StatusMessage) + slen);
647   sm->header.size = htons(sizeof(struct StatusMessage) + slen);
648   sm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_STATUS);
649   sm->status = htonl(code);
650   if (slen > 0)
651     memcpy (&sm[1], msg, slen);  
652   transmit (client, &sm->header, NULL, NULL, GNUNET_YES);
653 }
654
655
656 /**
657  * Function called once the transmit operation has
658  * either failed or succeeded.
659  *
660  * @param next_cls closure for calling "next_request" callback
661  * @param status GNUNET_OK on success, GNUNET_SYSERR on error
662  */
663 static void 
664 get_next(void *next_cls,
665          int status)
666 {
667   if (status != GNUNET_OK)
668     {
669       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
670                   _("Failed to transmit an item to the client; aborting iteration.\n"));
671       if (plugin != NULL)
672         plugin->api->next_request (next_cls, GNUNET_YES);
673       return;
674     }
675   if (next_cls != NULL)
676    plugin->api->next_request (next_cls, GNUNET_NO);
677 }
678
679
680 /**
681  * Function that will transmit the given datastore entry
682  * to the client.
683  *
684  * @param cls closure, pointer to the client (of type GNUNET_SERVER_Client).
685  * @param next_cls closure to use to ask for the next item
686  * @param key key for the content
687  * @param size number of bytes in data
688  * @param data content stored
689  * @param type type of the content
690  * @param priority priority of the content
691  * @param anonymity anonymity-level for the content
692  * @param expiration expiration time for the content
693  * @param uid unique identifier for the datum;
694  *        maybe 0 if no unique identifier is available
695  *
696  * @return GNUNET_SYSERR to abort the iteration, GNUNET_OK to continue,
697  *         GNUNET_NO to delete the item and continue (if supported)
698  */
699 static int
700 transmit_item (void *cls,
701                void *next_cls,
702                const GNUNET_HashCode * key,
703                uint32_t size,
704                const void *data,
705                enum GNUNET_BLOCK_Type type,
706                uint32_t priority,
707                uint32_t anonymity,
708                struct GNUNET_TIME_Absolute
709                expiration, uint64_t uid)
710 {
711   struct GNUNET_SERVER_Client *client = cls;
712   struct GNUNET_MessageHeader *end;
713   struct DataMessage *dm;
714
715   if (key == NULL)
716     {
717       /* transmit 'DATA_END' */
718 #if DEBUG_DATASTORE
719       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
720                   "Transmitting `%s' message\n",
721                   "DATA_END");
722 #endif
723       end = GNUNET_malloc (sizeof(struct GNUNET_MessageHeader));
724       end->size = htons(sizeof(struct GNUNET_MessageHeader));
725       end->type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END);
726       transmit (client, end, NULL, NULL, GNUNET_YES);
727       GNUNET_SERVER_client_drop (client);
728       return GNUNET_OK;
729     }
730   dm = GNUNET_malloc (sizeof(struct DataMessage) + size);
731   dm->header.size = htons(sizeof(struct DataMessage) + size);
732   dm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_DATA);
733   dm->rid = htonl(0);
734   dm->size = htonl(size);
735   dm->type = htonl(type);
736   dm->priority = htonl(priority);
737   dm->anonymity = htonl(anonymity);
738   dm->expiration = GNUNET_TIME_absolute_hton(expiration);
739   dm->uid = GNUNET_htonll(uid);
740   dm->key = *key;
741   memcpy (&dm[1], data, size);
742 #if DEBUG_DATASTORE
743   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
744               "Transmitting `%s' message for `%s' of type %u\n",
745               "DATA",
746               GNUNET_h2s (key),
747               type);
748 #endif
749   GNUNET_STATISTICS_update (stats,
750                             gettext_noop ("# results found"),
751                             1,
752                             GNUNET_NO);
753   transmit (client, &dm->header, &get_next, next_cls, GNUNET_NO);
754   return GNUNET_OK;
755 }
756
757
758 /**
759  * Handle RESERVE-message.
760  *
761  * @param cls closure
762  * @param client identification of the client
763  * @param message the actual message
764  */
765 static void
766 handle_reserve (void *cls,
767                 struct GNUNET_SERVER_Client *client,
768                 const struct GNUNET_MessageHeader *message)
769 {
770   /**
771    * Static counter to produce reservation identifiers.
772    */
773   static int reservation_gen;
774
775   const struct ReserveMessage *msg = (const struct ReserveMessage*) message;
776   struct ReservationList *e;
777   unsigned long long used;
778   unsigned long long req;
779   uint64_t amount;
780   uint32_t entries;
781
782 #if DEBUG_DATASTORE
783   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
784               "Processing `%s' request\n",
785               "RESERVE");
786 #endif
787   amount = GNUNET_ntohll(msg->amount);
788   entries = ntohl(msg->entries);
789   used = payload + reserved;
790   req = amount + ((unsigned long long) GNUNET_DATASTORE_ENTRY_OVERHEAD) * entries;
791   if (used + req > quota)
792     {
793       if (quota < used)
794         used = quota; /* cheat a bit for error message (to avoid negative numbers) */
795       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
796                   _("Insufficient space (%llu bytes are available) to satisfy `%s' request for %llu bytes\n"),
797                   quota - used,
798                   "RESERVE",
799                   req);
800       if (cache_size < req)
801         {
802           /* TODO: document this in the FAQ; essentially, if this
803              message happens, the insertion request could be blocked
804              by less-important content from migration because it is
805              larger than 1/8th of the overall available space, and
806              we only reserve 1/8th for "fresh" insertions */
807           GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
808                       _("The requested amount (%llu bytes) is larger than the cache size (%llu bytes)\n"),
809                       req,
810                       cache_size);
811           transmit_status (client, 0, 
812                            gettext_noop ("Insufficient space to satisfy request and "
813                                          "requested amount is larger than cache size"));
814         }
815       else
816         {
817           transmit_status (client, 0, 
818                            gettext_noop ("Insufficient space to satisfy request"));
819         }
820       return;      
821     }
822   reserved += req;
823   GNUNET_STATISTICS_set (stats,
824                          gettext_noop ("# reserved"),
825                          reserved,
826                          GNUNET_NO);
827   e = GNUNET_malloc (sizeof(struct ReservationList));
828   e->next = reservations;
829   reservations = e;
830   e->client = client;
831   e->amount = amount;
832   e->entries = entries;
833   e->rid = ++reservation_gen;
834   if (reservation_gen < 0)
835     reservation_gen = 0; /* wrap around */
836   transmit_status (client, e->rid, NULL);
837 }
838
839
840 /**
841  * Handle RELEASE_RESERVE-message.
842  *
843  * @param cls closure
844  * @param client identification of the client
845  * @param message the actual message
846  */
847 static void
848 handle_release_reserve (void *cls,
849                         struct GNUNET_SERVER_Client *client,
850                         const struct GNUNET_MessageHeader *message)
851 {
852   const struct ReleaseReserveMessage *msg = (const struct ReleaseReserveMessage*) message;
853   struct ReservationList *pos;
854   struct ReservationList *prev;
855   struct ReservationList *next;
856   int rid = ntohl(msg->rid);
857   unsigned long long rem;
858
859 #if DEBUG_DATASTORE
860   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
861               "Processing `%s' request\n",
862               "RELEASE_RESERVE");
863 #endif
864   next = reservations;
865   prev = NULL;
866   while (NULL != (pos = next))
867     {
868       next = pos->next;
869       if (rid == pos->rid)
870         {
871           if (prev == NULL)
872             reservations = next;
873           else
874             prev->next = next;
875           rem = pos->amount + ((unsigned long long) GNUNET_DATASTORE_ENTRY_OVERHEAD) * pos->entries;
876           GNUNET_assert (reserved >= rem);
877           reserved -= rem;
878           GNUNET_STATISTICS_set (stats,
879                          gettext_noop ("# reserved"),
880                                  reserved,
881                                  GNUNET_NO);
882 #if DEBUG_DATASTORE
883           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
884                       "Returning %llu remaining reserved bytes to storage pool\n",
885                       rem);
886 #endif    
887           GNUNET_free (pos);
888           transmit_status (client, GNUNET_OK, NULL);
889           return;
890         }       
891       prev = pos;
892     }
893   GNUNET_break (0);
894   transmit_status (client, GNUNET_SYSERR, gettext_noop ("Could not find matching reservation"));
895 }
896
897
898 /**
899  * Check that the given message is a valid data message.
900  *
901  * @return NULL if the message is not well-formed, otherwise the message
902  */
903 static const struct DataMessage *
904 check_data (const struct GNUNET_MessageHeader *message)
905 {
906   uint16_t size;
907   uint32_t dsize;
908   const struct DataMessage *dm;
909
910   size = ntohs(message->size);
911   if (size < sizeof(struct DataMessage))
912     { 
913       GNUNET_break (0);
914       return NULL;
915     }
916   dm = (const struct DataMessage *) message;
917   dsize = ntohl(dm->size);
918   if (size != dsize + sizeof(struct DataMessage))
919     {
920       GNUNET_break (0);
921       return NULL;
922     }
923   return dm;
924 }
925
926
927 /**
928  * Context for a put request used to see if the content is
929  * already present.
930  */
931 struct PutContext
932 {
933   /**
934    * Client to notify on completion.
935    */
936   struct GNUNET_SERVER_Client *client;
937
938   /**
939    * Did we find the data already in the database?
940    */
941   int is_present;
942   
943   /* followed by the 'struct DataMessage' */
944 };
945
946
947 /**
948  * Actually put the data message.
949  */
950 static void
951 execute_put (struct GNUNET_SERVER_Client *client,
952              const struct DataMessage *dm)
953 {
954   uint32_t size;
955   char *msg;
956   int ret;
957
958   size = ntohl(dm->size);
959   msg = NULL;
960   ret = plugin->api->put (plugin->api->cls,
961                           &dm->key,
962                           size,
963                           &dm[1],
964                           ntohl(dm->type),
965                           ntohl(dm->priority),
966                           ntohl(dm->anonymity),
967                           0 /* FIXME: replication */,
968                           GNUNET_TIME_absolute_ntoh(dm->expiration),
969                           &msg);
970   if (GNUNET_OK == ret)
971     {
972       GNUNET_STATISTICS_update (stats,
973                                 gettext_noop ("# bytes stored"),
974                                 size,
975                                 GNUNET_YES);
976       GNUNET_CONTAINER_bloomfilter_add (filter,
977                                         &dm->key);
978 #if DEBUG_DATASTORE
979       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
980                   "Successfully stored %u bytes of type %u under key `%s'\n",
981                   size,
982                   ntohl(dm->type),
983                   GNUNET_h2s (&dm->key));
984 #endif
985     }
986   transmit_status (client, 
987                    ret,
988                    msg);
989   GNUNET_free_non_null (msg);
990   if (quota - reserved - cache_size < payload)
991     {
992       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
993                   _("Need %llu bytes more space (%llu allowed, using %llu)\n"),
994                   (unsigned long long) size + GNUNET_DATASTORE_ENTRY_OVERHEAD,
995                   (unsigned long long) (quota - reserved - cache_size),
996                   (unsigned long long) payload);
997       manage_space (size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
998     }
999 }
1000
1001
1002 /**
1003  * Function that will check if the given datastore entry
1004  * matches the put and if none match executes the put.
1005  *
1006  * @param cls closure, pointer to the client (of type 'struct PutContext').
1007  * @param next_cls closure to use to ask for the next item
1008  * @param key key for the content
1009  * @param size number of bytes in data
1010  * @param data content stored
1011  * @param type type of the content
1012  * @param priority priority of the content
1013  * @param anonymity anonymity-level for the content
1014  * @param expiration expiration time for the content
1015  * @param uid unique identifier for the datum;
1016  *        maybe 0 if no unique identifier is available
1017  *
1018  * @return GNUNET_SYSERR to abort the iteration, GNUNET_OK to continue,
1019  *         GNUNET_NO to delete the item and continue (if supported)
1020  */
1021 static int
1022 check_present (void *cls,
1023                void *next_cls,
1024                const GNUNET_HashCode * key,
1025                uint32_t size,
1026                const void *data,
1027                enum GNUNET_BLOCK_Type type,
1028                uint32_t priority,
1029                uint32_t anonymity,
1030                struct GNUNET_TIME_Absolute
1031                expiration, uint64_t uid)
1032 {
1033   struct PutContext *pc = cls;
1034   const struct DataMessage *dm;
1035
1036   dm = (const struct DataMessage*) &pc[1];
1037   if (key == NULL)
1038     {
1039       if (pc->is_present == GNUNET_YES) 
1040         transmit_status (pc->client, GNUNET_NO, NULL);
1041       else
1042         execute_put (pc->client, dm);
1043       GNUNET_SERVER_client_drop (pc->client);
1044       GNUNET_free (pc);
1045       return GNUNET_SYSERR;
1046     }
1047   if ( (GNUNET_BLOCK_TYPE_FS_DBLOCK == type) ||
1048        (GNUNET_BLOCK_TYPE_FS_IBLOCK == type) ||
1049        ( (size == ntohl(dm->size)) &&
1050          (0 == memcmp (&dm[1],
1051                        data,
1052                        size)) ) )
1053     {
1054       pc->is_present = GNUNET_YES;
1055       plugin->api->next_request (next_cls, GNUNET_YES);
1056     }
1057   else
1058     {
1059       plugin->api->next_request (next_cls, GNUNET_NO);
1060     }
1061   return GNUNET_OK;
1062 }
1063
1064
1065 /**
1066  * Handle PUT-message.
1067  *
1068  * @param cls closure
1069  * @param client identification of the client
1070  * @param message the actual message
1071  */
1072 static void
1073 handle_put (void *cls,
1074             struct GNUNET_SERVER_Client *client,
1075             const struct GNUNET_MessageHeader *message)
1076 {
1077   const struct DataMessage *dm = check_data (message);
1078   int rid;
1079   struct ReservationList *pos;
1080   struct PutContext *pc;
1081   uint32_t size;
1082
1083   if ( (dm == NULL) ||
1084        (ntohl(dm->type) == 0) ) 
1085     {
1086       GNUNET_break (0);
1087       GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1088       return;
1089     }
1090 #if DEBUG_DATASTORE
1091   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1092               "Processing `%s' request for `%s' of type %u\n",
1093               "PUT",
1094               GNUNET_h2s (&dm->key),
1095               ntohl (dm->type));
1096 #endif
1097   rid = ntohl(dm->rid);
1098   size = ntohl(dm->size);
1099   if (rid > 0)
1100     {
1101       pos = reservations;
1102       while ( (NULL != pos) &&
1103               (rid != pos->rid) )
1104         pos = pos->next;
1105       GNUNET_break (pos != NULL);
1106       if (NULL != pos)
1107         {
1108           GNUNET_break (pos->entries > 0);
1109           GNUNET_break (pos->amount >= size);
1110           pos->entries--;
1111           pos->amount -= size;
1112           reserved -= (size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
1113           GNUNET_STATISTICS_set (stats,
1114                                  gettext_noop ("# reserved"),
1115                                  reserved,
1116                                  GNUNET_NO);
1117         }
1118     }
1119   if (GNUNET_YES == GNUNET_CONTAINER_bloomfilter_test (filter,
1120                                                        &dm->key))
1121     {
1122       pc = GNUNET_malloc (sizeof (struct PutContext) + size + sizeof (struct DataMessage));
1123       pc->client = client;
1124       GNUNET_SERVER_client_keep (client);
1125       memcpy (&pc[1], dm, size + sizeof (struct DataMessage));
1126       plugin->api->get (plugin->api->cls,
1127                         &dm->key,
1128                         NULL,
1129                         ntohl (dm->type),
1130                         &check_present,
1131                         pc);      
1132       return;
1133     }
1134   execute_put (client, dm);
1135 }
1136
1137
1138 /**
1139  * Handle GET-message.
1140  *
1141  * @param cls closure
1142  * @param client identification of the client
1143  * @param message the actual message
1144  */
1145 static void
1146 handle_get (void *cls,
1147             struct GNUNET_SERVER_Client *client,
1148             const struct GNUNET_MessageHeader *message)
1149 {
1150   const struct GetMessage *msg;
1151   uint16_t size;
1152
1153   size = ntohs(message->size);
1154   if ( (size != sizeof(struct GetMessage)) &&
1155        (size != sizeof(struct GetMessage) - sizeof(GNUNET_HashCode)) )
1156     {
1157       GNUNET_break (0);
1158       GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1159       return;
1160     }
1161   msg = (const struct GetMessage*) message;
1162 #if DEBUG_DATASTORE
1163   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1164               "Processing `%s' request for `%s' of type %u\n",
1165               "GET",
1166               GNUNET_h2s (&msg->key),
1167               ntohl (msg->type));
1168 #endif
1169   GNUNET_STATISTICS_update (stats,
1170                             gettext_noop ("# GET requests received"),
1171                             1,
1172                             GNUNET_NO);
1173   GNUNET_SERVER_client_keep (client);
1174   if ( (size == sizeof(struct GetMessage)) &&
1175        (GNUNET_YES != GNUNET_CONTAINER_bloomfilter_test (filter,
1176                                                          &msg->key)) )
1177     {
1178       /* don't bother database... */
1179 #if DEBUG_DATASTORE
1180       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1181                   "Empty result set for `%s' request for `%s' (bloomfilter).\n",
1182                   "GET",
1183                   GNUNET_h2s (&msg->key));
1184 #endif  
1185       GNUNET_STATISTICS_update (stats,
1186                                 gettext_noop ("# requests filtered by bloomfilter"),
1187                                 1,
1188                                 GNUNET_NO);
1189       transmit_item (client,
1190                      NULL, NULL, 0, NULL, 0, 0, 0, 
1191                      GNUNET_TIME_UNIT_ZERO_ABS, 0);
1192       return;
1193     }
1194   plugin->api->get (plugin->api->cls,
1195                     ((size == sizeof(struct GetMessage)) ? &msg->key : NULL),
1196                     NULL,
1197                     ntohl(msg->type),
1198                     &transmit_item,
1199                     client);    
1200 }
1201
1202
1203 /**
1204  * Handle UPDATE-message.
1205  *
1206  * @param cls closure
1207  * @param client identification of the client
1208  * @param message the actual message
1209  */
1210 static void
1211 handle_update (void *cls,
1212                struct GNUNET_SERVER_Client *client,
1213                const struct GNUNET_MessageHeader *message)
1214 {
1215   const struct UpdateMessage *msg;
1216   int ret;
1217   char *emsg;
1218
1219   GNUNET_STATISTICS_update (stats,
1220                             gettext_noop ("# UPDATE requests received"),
1221                             1,
1222                             GNUNET_NO);
1223   msg = (const struct UpdateMessage*) message;
1224   emsg = NULL;
1225 #if DEBUG_DATASTORE
1226   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1227               "Processing `%s' request for %llu\n",
1228               "UPDATE",
1229               (unsigned long long) GNUNET_ntohll (msg->uid));
1230 #endif
1231   ret = plugin->api->update (plugin->api->cls,
1232                              GNUNET_ntohll(msg->uid),
1233                              (int32_t) ntohl(msg->priority),
1234                              GNUNET_TIME_absolute_ntoh(msg->expiration),
1235                              &emsg);
1236   transmit_status (client, ret, emsg);
1237   GNUNET_free_non_null (emsg);
1238 }
1239
1240
1241 /**
1242  * Handle GET_REPLICATION-message.
1243  *
1244  * @param cls closure
1245  * @param client identification of the client
1246  * @param message the actual message
1247  */
1248 static void
1249 handle_get_replication (void *cls,
1250                         struct GNUNET_SERVER_Client *client,
1251                         const struct GNUNET_MessageHeader *message)
1252 {
1253 #if DEBUG_DATASTORE
1254   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1255               "Processing `%s' request\n",
1256               "GET_REPLICATION");
1257 #endif
1258   GNUNET_STATISTICS_update (stats,
1259                             gettext_noop ("# GET REPLICATION requests received"),
1260                             1,
1261                             GNUNET_NO);
1262   GNUNET_SERVER_client_keep (client);
1263   plugin->api->replication_get (plugin->api->cls,
1264                                 &transmit_item,
1265                                 client);  
1266 }
1267
1268
1269 /**
1270  * Handle GET_ZERO_ANONYMITY-message.
1271  *
1272  * @param cls closure
1273  * @param client identification of the client
1274  * @param message the actual message
1275  */
1276 static void
1277 handle_get_zero_anonymity (void *cls,
1278                            struct GNUNET_SERVER_Client *client,
1279                            const struct GNUNET_MessageHeader *message)
1280 {
1281   const struct GetZeroAnonymityMessage * msg = (const struct GetZeroAnonymityMessage*) message;
1282   enum GNUNET_BLOCK_Type type;
1283
1284   type = (enum GNUNET_BLOCK_Type) ntohl (msg->type);
1285   if (type == GNUNET_BLOCK_TYPE_ANY)
1286     {
1287       GNUNET_break (0);
1288       GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1289       return;
1290     }
1291 #if DEBUG_DATASTORE
1292   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1293               "Processing `%s' request\n",
1294               "GET_ZERO_ANONYMITY");
1295 #endif
1296   GNUNET_STATISTICS_update (stats,
1297                             gettext_noop ("# GET ZERO ANONYMITY requests received"),
1298                             1,
1299                             GNUNET_NO);
1300   GNUNET_SERVER_client_keep (client);
1301   plugin->api->iter_zero_anonymity (plugin->api->cls,
1302                                     type,
1303                                     &transmit_item,
1304                                     client);  
1305 }
1306
1307
1308 /**
1309  * Context for the 'remove_callback'.
1310  */
1311 struct RemoveContext 
1312 {
1313   /**
1314    * Client for whom we're doing the remvoing.
1315    */
1316   struct GNUNET_SERVER_Client *client;
1317
1318   /**
1319    * GNUNET_YES if we managed to remove something.
1320    */
1321   int found;
1322 };
1323
1324
1325 /**
1326  * Callback function that will cause the item that is passed
1327  * in to be deleted (by returning GNUNET_NO).
1328  */
1329 static int
1330 remove_callback (void *cls,
1331                  void *next_cls,
1332                  const GNUNET_HashCode * key,
1333                  uint32_t size,
1334                  const void *data,
1335                  enum GNUNET_BLOCK_Type type,
1336                  uint32_t priority,
1337                  uint32_t anonymity,
1338                  struct GNUNET_TIME_Absolute
1339                  expiration, uint64_t uid)
1340 {
1341   struct RemoveContext *rc = cls;
1342
1343   if (key == NULL)
1344     {
1345 #if DEBUG_DATASTORE
1346       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1347                   "No further matches for `%s' request.\n",
1348                   "REMOVE");
1349 #endif  
1350       if (GNUNET_YES == rc->found)
1351         transmit_status (rc->client, GNUNET_OK, NULL);       
1352       else
1353         transmit_status (rc->client, GNUNET_NO, _("Content not found"));        
1354       GNUNET_SERVER_client_drop (rc->client);
1355       GNUNET_free (rc);
1356       return GNUNET_OK; /* last item */
1357     }
1358   rc->found = GNUNET_YES;
1359 #if DEBUG_DATASTORE
1360   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1361               "Item %llu matches `%s' request for key `%s' and type %u.\n",
1362               (unsigned long long) uid,
1363               "REMOVE",
1364               GNUNET_h2s (key),
1365               type);
1366 #endif  
1367   GNUNET_STATISTICS_update (stats,
1368                             gettext_noop ("# bytes removed (explicit request)"),
1369                             size,
1370                             GNUNET_YES);
1371   GNUNET_CONTAINER_bloomfilter_remove (filter,
1372                                        key);
1373   plugin->api->next_request (next_cls, GNUNET_YES);
1374   return GNUNET_NO;
1375 }
1376
1377
1378 /**
1379  * Handle REMOVE-message.
1380  *
1381  * @param cls closure
1382  * @param client identification of the client
1383  * @param message the actual message
1384  */
1385 static void
1386 handle_remove (void *cls,
1387              struct GNUNET_SERVER_Client *client,
1388              const struct GNUNET_MessageHeader *message)
1389 {
1390   const struct DataMessage *dm = check_data (message);
1391   GNUNET_HashCode vhash;
1392   struct RemoveContext *rc;
1393
1394   if (dm == NULL)
1395     {
1396       GNUNET_break (0);
1397       GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1398       return;
1399     }
1400 #if DEBUG_DATASTORE
1401   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1402               "Processing `%s' request for `%s' of type %u\n",
1403               "REMOVE",
1404               GNUNET_h2s (&dm->key),
1405               ntohl (dm->type));
1406 #endif
1407   GNUNET_STATISTICS_update (stats,
1408                             gettext_noop ("# REMOVE requests received"),
1409                             1,
1410                             GNUNET_NO);
1411   rc = GNUNET_malloc (sizeof(struct RemoveContext));
1412   GNUNET_SERVER_client_keep (client);
1413   rc->client = client;
1414   GNUNET_CRYPTO_hash (&dm[1],
1415                       ntohl(dm->size),
1416                       &vhash);
1417   plugin->api->get (plugin->api->cls,
1418                     &dm->key,
1419                     &vhash,
1420                     (enum GNUNET_BLOCK_Type) ntohl(dm->type),
1421                     &remove_callback,
1422                     rc);
1423 }
1424
1425
1426 /**
1427  * Handle DROP-message.
1428  *
1429  * @param cls closure
1430  * @param client identification of the client
1431  * @param message the actual message
1432  */
1433 static void
1434 handle_drop (void *cls,
1435              struct GNUNET_SERVER_Client *client,
1436              const struct GNUNET_MessageHeader *message)
1437 {
1438 #if DEBUG_DATASTORE
1439   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1440               "Processing `%s' request\n",
1441               "DROP");
1442 #endif
1443   plugin->api->drop (plugin->api->cls);
1444   GNUNET_SERVER_receive_done (client, GNUNET_OK);
1445 }
1446
1447
1448 /**
1449  * Function called by plugins to notify us about a
1450  * change in their disk utilization.
1451  *
1452  * @param cls closure (NULL)
1453  * @param delta change in disk utilization, 
1454  *        0 for "reset to empty"
1455  */
1456 static void
1457 disk_utilization_change_cb (void *cls,
1458                             int delta)
1459 {
1460   if ( (delta < 0) &&
1461        (payload < -delta) )
1462     {
1463       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1464                   _("Datastore payload inaccurate (%lld < %lld).  Trying to fix.\n"),
1465                   (long long) payload,
1466                   (long long) -delta);
1467       payload = plugin->api->get_size (plugin->api->cls);
1468       sync_stats ();
1469       return;
1470     }
1471   payload += delta;
1472   lastSync++;
1473   if (lastSync >= MAX_STAT_SYNC_LAG)
1474     sync_stats ();
1475 }
1476
1477
1478 /**
1479  * Callback function to process statistic values.
1480  *
1481  * @param cls closure (struct Plugin*)
1482  * @param subsystem name of subsystem that created the statistic
1483  * @param name the name of the datum
1484  * @param value the current value
1485  * @param is_persistent GNUNET_YES if the value is persistent, GNUNET_NO if not
1486  * @return GNUNET_OK to continue, GNUNET_SYSERR to abort iteration
1487  */
1488 static int
1489 process_stat_in (void *cls,
1490                  const char *subsystem,
1491                  const char *name,
1492                  uint64_t value,
1493                  int is_persistent)
1494 {
1495   GNUNET_assert (stats_worked == GNUNET_NO);
1496   stats_worked = GNUNET_YES;
1497   payload += value;
1498 #if DEBUG_SQLITE
1499   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1500               "Notification from statistics about existing payload (%llu), new payload is %llu\n",
1501               abs_value,
1502               payload);
1503 #endif
1504   return GNUNET_OK;
1505 }
1506
1507
1508 static void
1509 process_stat_done (void *cls,
1510                    int success)
1511 {
1512   struct DatastorePlugin *plugin = cls;
1513
1514   stat_get = NULL;
1515   if (stats_worked == GNUNET_NO) 
1516     payload = plugin->api->get_size (plugin->api->cls);
1517 }
1518
1519
1520 /**
1521  * Load the datastore plugin.
1522  */
1523 static struct DatastorePlugin *
1524 load_plugin () 
1525 {
1526   struct DatastorePlugin *ret;
1527   char *libname;
1528   char *name;
1529
1530   if (GNUNET_OK !=
1531       GNUNET_CONFIGURATION_get_value_string (cfg,
1532                                              "DATASTORE", "DATABASE", &name))
1533     {
1534       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1535                   _("No `%s' specified for `%s' in configuration!\n"),
1536                   "DATABASE",
1537                   "DATASTORE");
1538       return NULL;
1539     }
1540   ret = GNUNET_malloc (sizeof(struct DatastorePlugin));
1541   ret->env.cfg = cfg;
1542   ret->env.duc = &disk_utilization_change_cb;
1543   ret->env.cls = NULL;
1544   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1545               _("Loading `%s' datastore plugin\n"), name);
1546   GNUNET_asprintf (&libname, "libgnunet_plugin_datastore_%s", name);
1547   ret->short_name = name;
1548   ret->lib_name = libname;
1549   ret->api = GNUNET_PLUGIN_load (libname, &ret->env);
1550   if (ret->api == NULL)
1551     {
1552       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1553                   _("Failed to load datastore plugin for `%s'\n"), name);
1554       GNUNET_free (ret->short_name);
1555       GNUNET_free (libname);
1556       GNUNET_free (ret);
1557       return NULL;
1558     }
1559   return ret;
1560 }
1561
1562
1563 /**
1564  * Function called when the service shuts
1565  * down.  Unloads our datastore plugin.
1566  *
1567  * @param plug plugin to unload
1568  */
1569 static void
1570 unload_plugin (struct DatastorePlugin *plug)
1571 {
1572 #if DEBUG_DATASTORE
1573   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1574               "Datastore service is unloading plugin...\n");
1575 #endif
1576   GNUNET_break (NULL == GNUNET_PLUGIN_unload (plug->lib_name, plug->api));
1577   GNUNET_free (plug->lib_name);
1578   GNUNET_free (plug->short_name);
1579   GNUNET_free (plug);
1580 }
1581
1582
1583 /**
1584  * Final task run after shutdown.  Unloads plugins and disconnects us from
1585  * statistics.
1586  */
1587 static void
1588 unload_task (void *cls,
1589              const struct GNUNET_SCHEDULER_TaskContext *tc)
1590 {
1591   unload_plugin (plugin);
1592   plugin = NULL;
1593   if (filter != NULL)
1594     {
1595       GNUNET_CONTAINER_bloomfilter_free (filter);
1596       filter = NULL;
1597     }
1598   if (lastSync > 0)
1599     sync_stats ();
1600   if (stat_get != NULL)
1601     {
1602       GNUNET_STATISTICS_get_cancel (stat_get);
1603       stat_get = NULL;
1604     }
1605   if (stats != NULL)
1606     {
1607       GNUNET_STATISTICS_destroy (stats, GNUNET_YES);
1608       stats = NULL;
1609     }
1610 }
1611
1612
1613 /**
1614  * Last task run during shutdown.  Disconnects us from
1615  * the transport and core.
1616  */
1617 static void
1618 cleaning_task (void *cls,
1619                const struct GNUNET_SCHEDULER_TaskContext *tc)
1620 {
1621   struct TransmitCallbackContext *tcc;
1622
1623   cleaning_done = GNUNET_YES;
1624   while (NULL != (tcc = tcc_head))
1625     {
1626       GNUNET_CONTAINER_DLL_remove (tcc_head,
1627                                    tcc_tail,
1628                                    tcc);
1629       if (tcc->th != NULL)
1630         {
1631           GNUNET_CONNECTION_notify_transmit_ready_cancel (tcc->th);
1632           GNUNET_SERVER_client_drop (tcc->client);
1633         }
1634       if (NULL != tcc->tc)
1635         tcc->tc (tcc->tc_cls, GNUNET_SYSERR);
1636       GNUNET_free (tcc->msg);
1637       GNUNET_free (tcc);
1638     }
1639   if (expired_kill_task != GNUNET_SCHEDULER_NO_TASK)
1640     {
1641       GNUNET_SCHEDULER_cancel (expired_kill_task);
1642       expired_kill_task = GNUNET_SCHEDULER_NO_TASK;
1643     }
1644   GNUNET_SCHEDULER_add_continuation (&unload_task,
1645                                      NULL,
1646                                      GNUNET_SCHEDULER_REASON_PREREQ_DONE);
1647 }
1648
1649
1650 /**
1651  * Function that removes all active reservations made
1652  * by the given client and releases the space for other
1653  * requests.
1654  *
1655  * @param cls closure
1656  * @param client identification of the client
1657  */
1658 static void
1659 cleanup_reservations (void *cls,
1660                       struct GNUNET_SERVER_Client *client)
1661 {
1662   struct ReservationList *pos;
1663   struct ReservationList *prev;
1664   struct ReservationList *next;
1665
1666   if (client == NULL)
1667     return;
1668   prev = NULL;
1669   pos = reservations;
1670   while (NULL != pos)
1671     {
1672       next = pos->next;
1673       if (pos->client == client)
1674         {
1675           if (prev == NULL)
1676             reservations = next;
1677           else
1678             prev->next = next;
1679           reserved -= pos->amount + pos->entries * GNUNET_DATASTORE_ENTRY_OVERHEAD;
1680           GNUNET_free (pos);
1681         }
1682       else
1683         {
1684           prev = pos;
1685         }
1686       pos = next;
1687     }
1688   GNUNET_STATISTICS_set (stats,
1689                          gettext_noop ("# reserved"),
1690                          reserved,
1691                          GNUNET_NO);
1692 }
1693
1694
1695 /**
1696  * Process datastore requests.
1697  *
1698  * @param cls closure
1699  * @param server the initialized server
1700  * @param c configuration to use
1701  */
1702 static void
1703 run (void *cls,
1704      struct GNUNET_SERVER_Handle *server,
1705      const struct GNUNET_CONFIGURATION_Handle *c)
1706 {
1707   static const struct GNUNET_SERVER_MessageHandler handlers[] = {
1708     {&handle_reserve, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_RESERVE, 
1709      sizeof(struct ReserveMessage) }, 
1710     {&handle_release_reserve, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_RELEASE_RESERVE, 
1711      sizeof(struct ReleaseReserveMessage) }, 
1712     {&handle_put, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_PUT, 0 }, 
1713     {&handle_update, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_UPDATE, 
1714      sizeof (struct UpdateMessage) }, 
1715     {&handle_get, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_GET, 0 }, 
1716     {&handle_get_replication, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_GET_REPLICATION, 
1717      sizeof(struct GNUNET_MessageHeader) }, 
1718     {&handle_get_zero_anonymity, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_GET_ZERO_ANONYMITY, 
1719      sizeof(struct GetZeroAnonymityMessage) }, 
1720     {&handle_remove, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE, 0 }, 
1721     {&handle_drop, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_DROP, 
1722      sizeof(struct GNUNET_MessageHeader) }, 
1723     {NULL, NULL, 0, 0}
1724   };
1725   char *fn;
1726   unsigned int bf_size;
1727
1728   cfg = c;
1729   if (GNUNET_OK !=
1730       GNUNET_CONFIGURATION_get_value_number (cfg,
1731                                              "DATASTORE", "QUOTA", &quota))
1732     {
1733       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1734                   _("No `%s' specified for `%s' in configuration!\n"),
1735                   "QUOTA",
1736                   "DATASTORE");
1737       return;
1738     }
1739   stats = GNUNET_STATISTICS_create ("datastore", cfg);
1740   GNUNET_STATISTICS_set (stats,
1741                          gettext_noop ("# quota"),
1742                          quota,
1743                          GNUNET_NO);
1744   cache_size = quota / 8; /* Or should we make this an option? */
1745   GNUNET_STATISTICS_set (stats,
1746                          gettext_noop ("# cache size"),
1747                          cache_size,
1748                          GNUNET_NO);
1749   bf_size = quota / 32; /* 8 bit per entry, 1 bit per 32 kb in DB */
1750   fn = NULL;
1751   if ( (GNUNET_OK !=
1752         GNUNET_CONFIGURATION_get_value_filename (cfg,
1753                                                  "DATASTORE",
1754                                                  "BLOOMFILTER",
1755                                                  &fn)) ||
1756        (GNUNET_OK !=
1757         GNUNET_DISK_directory_create_for_file (fn)) )
1758     {
1759       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1760                   _("Could not use specified filename `%s' for bloomfilter.\n"),
1761                   fn != NULL ? fn : "");
1762       GNUNET_free_non_null (fn);
1763       fn = NULL;
1764     }
1765   filter = GNUNET_CONTAINER_bloomfilter_load (fn, bf_size, 5);  /* approx. 3% false positives at max use */  
1766   GNUNET_free_non_null (fn);
1767   if (filter == NULL)
1768     {
1769       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1770                   _("Failed to initialize bloomfilter.\n"));
1771       if (stats != NULL)
1772         {
1773           GNUNET_STATISTICS_destroy (stats, GNUNET_YES);
1774           stats = NULL;
1775         }
1776       return;
1777     }
1778   plugin = load_plugin ();
1779   if (NULL == plugin)
1780     {
1781       GNUNET_CONTAINER_bloomfilter_free (filter);
1782       filter = NULL;
1783       if (stats != NULL)
1784         {
1785           GNUNET_STATISTICS_destroy (stats, GNUNET_YES);
1786           stats = NULL;
1787         }
1788       return;
1789     }
1790   stat_get = GNUNET_STATISTICS_get (stats,
1791                                     "datastore",
1792                                     QUOTA_STAT_NAME,
1793                                     GNUNET_TIME_UNIT_SECONDS,
1794                                     &process_stat_done,
1795                                     &process_stat_in,
1796                                     plugin);
1797   GNUNET_SERVER_disconnect_notify (server, &cleanup_reservations, NULL);
1798   GNUNET_SERVER_add_handlers (server, handlers);
1799   expired_kill_task
1800     = GNUNET_SCHEDULER_add_with_priority (GNUNET_SCHEDULER_PRIORITY_IDLE,
1801                                           &delete_expired, NULL);
1802   GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL,
1803                                 &cleaning_task, NULL);
1804 }
1805
1806
1807 /**
1808  * The main function for the datastore service.
1809  *
1810  * @param argc number of arguments from the command line
1811  * @param argv command line arguments
1812  * @return 0 ok, 1 on error
1813  */
1814 int
1815 main (int argc, char *const *argv)
1816 {
1817   int ret;
1818
1819   ret = (GNUNET_OK ==
1820          GNUNET_SERVICE_run (argc,
1821                              argv,
1822                              "datastore",
1823                              GNUNET_SERVICE_OPTION_NONE,
1824                              &run, NULL)) ? 0 : 1;
1825   return ret;
1826 }
1827
1828
1829 /* end of gnunet-service-datastore.c */