quota management and better name for NO_TASK'
[oweals/gnunet.git] / src / datastore / gnunet-service-datastore.c
1 /*
2      This file is part of GNUnet
3      (C) 2004, 2005, 2006, 2007, 2009 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 2, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file datastore/gnunet-service-datastore.c
23  * @brief Management for the datastore for files stored on a GNUnet node
24  * @author Christian Grothoff
25  */
26
27 #include "platform.h"
28 #include "gnunet_util_lib.h"
29 #include "gnunet_protocols.h"
30 #include "plugin_datastore.h"
31 #include "datastore.h"
32
33 /**
34  * How many messages do we queue at most per client?
35  */
36 #define MAX_PENDING 1024
37
38 /**
39  * How long are we at most keeping "expired" content
40  * past the expiration date in the database?
41  */
42 #define MAX_EXPIRE_DELAY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 15)
43
44
45
46 /**
47  * Our datastore plugin.
48  */
49 struct DatastorePlugin
50 {
51
52   /**
53    * API of the transport as returned by the plugin's
54    * initialization function.
55    */
56   struct GNUNET_DATASTORE_PluginFunctions *api;
57
58   /**
59    * Short name for the plugin (i.e. "sqlite").
60    */
61   char *short_name;
62
63   /**
64    * Name of the library (i.e. "gnunet_plugin_datastore_sqlite").
65    */
66   char *lib_name;
67
68   /**
69    * Environment this transport service is using
70    * for this plugin.
71    */
72   struct GNUNET_DATASTORE_PluginEnvironment env;
73
74 };
75
76
77 /**
78  * Linked list of active reservations.
79  */
80 struct ReservationList 
81 {
82
83   /**
84    * This is a linked list.
85    */
86   struct ReservationList *next;
87
88   /**
89    * Client that made the reservation.
90    */
91   struct GNUNET_SERVER_Client *client;
92
93   /**
94    * Number of bytes (still) reserved.
95    */
96   uint64_t amount;
97
98   /**
99    * Number of items (still) reserved.
100    */
101   uint64_t entries;
102
103   /**
104    * Reservation identifier.
105    */
106   int32_t rid;
107
108 };
109
110
111 /**
112  * Our datastore plugin (NULL if not available).
113  */
114 static struct DatastorePlugin *plugin;
115
116 /**
117  * Linked list of space reservations made by clients.
118  */
119 static struct ReservationList *reservations;
120
121 /**
122  * Bloomfilter to quickly tell if we don't have the content.
123  */
124 static struct GNUNET_CONTAINER_BloomFilter *filter;
125
126 /**
127  * Static counter to produce reservation identifiers.
128  */
129 static int reservation_gen;
130
131 /**
132  * How much space are we allowed to use?
133  */
134 static unsigned long long quota;
135
136 /**
137  * How much space are we using for the cache?
138  * (space available for insertions that will be
139  *  instantly reclaimed by discarding less 
140  *  important content --- or possibly whatever
141  *  we just inserted into the "cache").
142  */
143 static unsigned long long cache_size;
144
145 /**
146  * How much space have we currently reserved?
147  */
148 static unsigned long long reserved;
149
150 /**
151  * Identity of the task that is used to delete
152  * expired content.
153  */
154 static GNUNET_SCHEDULER_TaskIdentifier expired_kill_task;
155
156 /**
157  * Our configuration.
158  */
159 struct GNUNET_CONFIGURATION_Handle *cfg;
160
161 /**
162  * Our scheduler.
163  */
164 struct GNUNET_SCHEDULER_Handle *sched; 
165
166 /**
167  * Function called once the transmit operation has
168  * either failed or succeeded.
169  *
170  * @param cls closure
171  * @param status GNUNET_OK on success, GNUNET_SYSERR on error
172  */
173 typedef void (*TransmitContinuation)(void *cls,
174                                      int status);
175
176
177 struct TransmitCallbackContext 
178 {
179   /**
180    * The message that we're asked to transmit.
181    */
182   struct GNUNET_MessageHeader *msg;
183
184   /**
185    * Client that we are transmitting to.
186    */
187   struct GNUNET_SERVER_Client *client;
188
189   /**
190    * Function to call once msg has been transmitted
191    * (or at least added to the buffer).
192    */
193   TransmitContinuation tc;
194
195   /**
196    * Closure for tc.
197    */
198   void *tc_cls;
199
200   /**
201    * GNUNET_YES if we are supposed to signal the server
202    * completion of the client's request.
203    */
204   int end;
205 };
206
207
208 /**
209  * Task that is used to remove expired entries from
210  * the datastore.  This task will schedule itself
211  * again automatically to always delete all expired
212  * content quickly.
213  *
214  * @param cls not used
215  * @param tc task context
216  */ 
217 static void
218 delete_expired (void *cls,
219                 const struct GNUNET_SCHEDULER_TaskContext *tc);
220
221
222 /**
223  * Iterate over the expired items stored in the datastore.
224  * Delete all expired items; once we have processed all
225  * expired items, re-schedule the "delete_expired" task.
226  *
227  * @param cls not used
228  * @param next_cls closure to pass to the "next" function.
229  * @param key key for the content
230  * @param size number of bytes in data
231  * @param data content stored
232  * @param type type of the content
233  * @param priority priority of the content
234  * @param anonymity anonymity-level for the content
235  * @param expiration expiration time for the content
236  * @param uid unique identifier for the datum;
237  *        maybe 0 if no unique identifier is available
238  *
239  * @return GNUNET_SYSERR to abort the iteration, GNUNET_OK to continue
240  *         (continue on call to "next", of course),
241  *         GNUNET_NO to delete the item and continue (if supported)
242  */
243 static int 
244 expired_processor (void *cls,
245                    void *next_cls,
246                    const GNUNET_HashCode * key,
247                    uint32_t size,
248                    const void *data,
249                    uint32_t type,
250                    uint32_t priority,
251                    uint32_t anonymity,
252                    struct GNUNET_TIME_Absolute
253                    expiration, 
254                    uint64_t uid)
255 {
256   struct GNUNET_TIME_Absolute now;
257
258   expired_kill_task = GNUNET_SCHEDULER_NO_TASK;
259   if (key == NULL) 
260     {
261       expired_kill_task 
262         = GNUNET_SCHEDULER_add_delayed (sched,
263                                         GNUNET_NO,
264                                         GNUNET_SCHEDULER_PRIORITY_IDLE,
265                                         GNUNET_SCHEDULER_NO_TASK,
266                                         MAX_EXPIRE_DELAY,
267                                         &delete_expired,
268                                         NULL);
269       return GNUNET_SYSERR;
270     }
271   now = GNUNET_TIME_absolute_get ();
272   if (expiration.value > now.value)
273     {
274       /* finished processing */
275       plugin->api->next_request (next_cls, GNUNET_YES);
276       return GNUNET_SYSERR;
277     }
278   plugin->api->next_request (next_cls, GNUNET_NO);
279   return GNUNET_NO; /* delete */
280 }
281
282
283 /**
284  * Task that is used to remove expired entries from
285  * the datastore.  This task will schedule itself
286  * again automatically to always delete all expired
287  * content quickly.
288  *
289  * @param cls not used
290  * @param tc task context
291  */ 
292 static void
293 delete_expired (void *cls,
294                 const struct GNUNET_SCHEDULER_TaskContext *tc)
295 {
296   plugin->api->iter_ascending_expiration (plugin->api->cls, 
297                                           0,
298                                           &expired_processor,
299                                           NULL);
300 }
301
302
303 /**
304  * An iterator over a set of items stored in the datastore.
305  *
306  * @param cls closure
307  * @param next_cls closure to pass to the "next" function.
308  * @param key key for the content
309  * @param size number of bytes in data
310  * @param data content stored
311  * @param type type of the content
312  * @param priority priority of the content
313  * @param anonymity anonymity-level for the content
314  * @param expiration expiration time for the content
315  * @param uid unique identifier for the datum;
316  *        maybe 0 if no unique identifier is available
317  *
318  * @return GNUNET_SYSERR to abort the iteration, GNUNET_OK to continue
319  *         (continue on call to "next", of course),
320  *         GNUNET_NO to delete the item and continue (if supported)
321  */
322 static int 
323 manage (void *cls,
324         void *next_cls,
325         const GNUNET_HashCode * key,
326         uint32_t size,
327         const void *data,
328         uint32_t type,
329         uint32_t priority,
330         uint32_t anonymity,
331         struct GNUNET_TIME_Absolute
332         expiration, 
333         uint64_t uid)
334 {
335   unsigned long long *need = cls;
336
337   if (NULL == key)
338     {
339       GNUNET_free (need);
340       return GNUNET_SYSERR;
341     }
342   if (size + GNUNET_DATASTORE_ENTRY_OVERHEAD > *need)
343     *need = 0;
344   else
345     *need -= size + GNUNET_DATASTORE_ENTRY_OVERHEAD;
346   plugin->api->next_request (next_cls, 
347                              (0 == *need) ? GNUNET_YES : GNUNET_NO);
348   return GNUNET_NO;
349 }
350
351
352 /**
353  * Manage available disk space by running tasks
354  * that will discard content if necessary.  This
355  * function will be run whenever a request for
356  * "need" bytes of storage could only be satisfied
357  * by eating into the "cache" (and we want our cache
358  * space back).
359  *
360  * @param need number of bytes of content that were
361  *        placed into the "cache" (and hence the
362  *        number of bytes that should be removed).
363  */
364 static void
365 manage_space (unsigned long long need)
366 {
367   unsigned long long *n;
368
369   n = GNUNET_malloc (sizeof(unsigned long long));
370   *n = need;
371   plugin->api->iter_low_priority (plugin->api->cls,
372                                   0,
373                                   &manage,
374                                   n);
375 }
376
377
378 /**
379  * Function called to notify a client about the socket
380  * begin ready to queue more data.  "buf" will be
381  * NULL and "size" zero if the socket was closed for
382  * writing in the meantime.
383  *
384  * @param cls closure
385  * @param size number of bytes available in buf
386  * @param buf where the callee should write the message
387  * @return number of bytes written to buf
388  */
389 static size_t
390 transmit_callback (void *cls,
391                    size_t size, void *buf)
392 {
393   struct TransmitCallbackContext *tcc = cls;
394   size_t msize;
395   
396   msize = ntohs(tcc->msg->size);
397   if (size == 0)
398     {
399 #if DEBUG_DATASTORE
400       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
401                   "Transmission failed.\n");
402 #endif
403       if (tcc->tc != NULL)
404         tcc->tc (tcc->tc_cls, GNUNET_SYSERR);
405       if (GNUNET_YES == tcc->end)
406         {
407           GNUNET_SERVER_receive_done (tcc->client, GNUNET_SYSERR);
408         }
409       GNUNET_free (tcc->msg);
410       GNUNET_free (tcc);
411       return 0;
412     }
413   GNUNET_assert (size >= msize);
414   memcpy (buf, tcc->msg, msize);
415   if (tcc->tc != NULL)
416     tcc->tc (tcc->tc_cls, GNUNET_OK);
417   if (GNUNET_YES == tcc->end)
418     {
419 #if DEBUG_DATASTORE
420       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
421                   "Request completed, ready for the next request!\n");
422 #endif
423       GNUNET_SERVER_receive_done (tcc->client, GNUNET_OK);
424     }
425   else
426     {
427 #if DEBUG_DATASTORE
428       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
429                   "Response transmitted, more pending!\n");
430 #endif
431     }
432   GNUNET_free (tcc->msg);
433   GNUNET_free (tcc);
434   return msize;
435 }
436
437
438 /**
439  * Transmit the given message to the client.
440  *
441  * @param client target of the message
442  * @param msg message to transmit, will be freed!
443  * @param end is this the last response (and we should
444  *        signal the server completion accodingly after
445  *        transmitting this message)?
446  */
447 static void
448 transmit (struct GNUNET_SERVER_Client *client,
449           struct GNUNET_MessageHeader *msg,
450           TransmitContinuation tc,
451           void *tc_cls,
452           int end)
453 {
454   struct TransmitCallbackContext *tcc;
455
456   tcc = GNUNET_malloc (sizeof(struct TransmitCallbackContext));
457   tcc->msg = msg;
458   tcc->client = client;
459   tcc->tc = tc;
460   tcc->tc_cls = tc_cls;
461   tcc->end = end;
462
463   if (NULL ==
464       GNUNET_SERVER_notify_transmit_ready (client,
465                                            ntohs(msg->size),
466                                            GNUNET_TIME_UNIT_FOREVER_REL,
467                                            &transmit_callback,
468                                            tcc))
469     {
470       GNUNET_break (0);
471       if (GNUNET_YES == end)
472         {
473 #if DEBUG_DATASTORE
474           GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
475                       "Disconnecting client.\n");
476 #endif    
477           GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
478         }
479       if (NULL != tc)
480         tc (tc_cls, GNUNET_SYSERR);
481       GNUNET_free (msg);
482       GNUNET_free (tcc);
483     }
484 }
485
486
487 /**
488  * Transmit a status code to the client.
489  *
490  * @param client receiver of the response
491  * @param code status code
492  * @param msg optional error message (can be NULL)
493  */
494 static void
495 transmit_status (struct GNUNET_SERVER_Client *client,
496                  int code,
497                  const char *msg)
498 {
499   struct StatusMessage *sm;
500   size_t slen;
501
502 #if DEBUG_DATASTORE
503   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
504               "Transmitting `%s' message with value %d and message %s\n",
505               "STATUS",
506               code,
507               msg != NULL ? msg : "(none)");
508 #endif
509   slen = (msg == NULL) ? 0 : strlen(msg) + 1;  
510   sm = GNUNET_malloc (sizeof(struct StatusMessage) + slen);
511   sm->header.size = htons(sizeof(struct StatusMessage) + slen);
512   sm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_STATUS);
513   sm->status = htonl(code);
514   memcpy (&sm[1], msg, slen);  
515   transmit (client, &sm->header, NULL, NULL, GNUNET_YES);
516 }
517
518
519 /**
520  * Function called once the transmit operation has
521  * either failed or succeeded.
522  *
523  * @param cls closure
524  * @param status GNUNET_OK on success, GNUNET_SYSERR on error
525  */
526 static void 
527 get_next(void *next_cls,
528          int status)
529 {
530   if (status != GNUNET_OK)
531     {
532       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
533                   _("Failed to transmit an item to the client; aborting iteration.\n"));    
534       plugin->api->next_request (next_cls, GNUNET_YES);
535       return;
536     }
537   plugin->api->next_request (next_cls, GNUNET_NO);
538 }
539
540
541 /**
542  * Function that will transmit the given datastore entry
543  * to the client.
544  *
545  * @param cls closure, pointer to the client (of type GNUNET_SERVER_Client).
546  * @param next_cls closure to use to ask for the next item
547  * @param key key for the content
548  * @param size number of bytes in data
549  * @param data content stored
550  * @param type type of the content
551  * @param priority priority of the content
552  * @param anonymity anonymity-level for the content
553  * @param expiration expiration time for the content
554  * @param uid unique identifier for the datum;
555  *        maybe 0 if no unique identifier is available
556  *
557  * @return GNUNET_SYSERR to abort the iteration, GNUNET_OK to continue,
558  *         GNUNET_NO to delete the item and continue (if supported)
559  */
560 static int
561 transmit_item (void *cls,
562                void *next_cls,
563                const GNUNET_HashCode * key,
564                uint32_t size,
565                const void *data,
566                uint32_t type,
567                uint32_t priority,
568                uint32_t anonymity,
569                struct GNUNET_TIME_Absolute
570                expiration, uint64_t uid)
571 {
572   struct GNUNET_SERVER_Client *client = cls;
573   struct GNUNET_MessageHeader *end;
574   struct DataMessage *dm;
575
576   if (key == NULL)
577     {
578       /* transmit 'DATA_END' */
579 #if DEBUG_DATASTORE
580       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
581                   "Transmitting `%s' message\n",
582                   "DATA_END");
583 #endif
584       end = GNUNET_malloc (sizeof(struct GNUNET_MessageHeader));
585       end->size = htons(sizeof(struct GNUNET_MessageHeader));
586       end->type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END);
587       transmit (client, end, NULL, NULL, GNUNET_YES);
588       GNUNET_SERVER_client_drop (client);
589       return GNUNET_OK;
590     }
591   dm = GNUNET_malloc (sizeof(struct DataMessage) + size);
592   dm->header.size = htons(sizeof(struct DataMessage) + size);
593   dm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_DATA);
594   dm->rid = htonl(0);
595   dm->size = htonl(size);
596   dm->type = htonl(type);
597   dm->priority = htonl(priority);
598   dm->anonymity = htonl(anonymity);
599   dm->expiration = GNUNET_TIME_absolute_hton(expiration);
600   dm->uid = GNUNET_htonll(uid);
601   dm->key = *key;
602   memcpy (&dm[1], data, size);
603 #if DEBUG_DATASTORE
604   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
605               "Transmitting `%s' message\n",
606               "DATA");
607 #endif
608   transmit (client, &dm->header, &get_next, next_cls, GNUNET_NO);
609   return GNUNET_OK;
610 }
611
612
613 /**
614  * Handle RESERVE-message.
615  *
616  * @param cls closure
617  * @param client identification of the client
618  * @param message the actual message
619  */
620 static void
621 handle_reserve (void *cls,
622                 struct GNUNET_SERVER_Client *client,
623                 const struct GNUNET_MessageHeader *message)
624 {
625   const struct ReserveMessage *msg = (const struct ReserveMessage*) message;
626   struct ReservationList *e;
627   unsigned long long used;
628   unsigned long long req;
629   uint64_t amount;
630   uint64_t entries;
631
632 #if DEBUG_DATASTORE
633   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
634               "Processing `%s' request\n",
635               "RESERVE");
636 #endif
637   amount = ntohl(msg->amount);
638   entries = GNUNET_ntohll(msg->entries);
639   used = plugin->api->get_size (plugin->api->cls) + reserved;
640   req = amount + ((unsigned long long) GNUNET_DATASTORE_ENTRY_OVERHEAD) * entries;
641   if (used + req > quota)
642     {
643       if (quota < used)
644         used = quota; /* cheat a bit for error message (to avoid negative numbers) */
645       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
646                   _("Insufficient space (%llu bytes are available) to satisfy `%s' request for %llu bytes\n"),
647                   quota - used,
648                   "RESERVE",
649                   req);
650       if (cache_size < req)
651         {
652           /* TODO: document this in the FAQ; essentially, if this
653              message happens, the insertion request could be blocked
654              by less-important content from migration because it is
655              larger than 1/8th of the overall available space, and
656              we only reserve 1/8th for "fresh" insertions */
657           GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
658                       _("The requested amount (%llu bytes) is larger than the cache size (%llu bytes)\n"),
659                       req,
660                       cache_size);
661           transmit_status (client, 0, 
662                            gettext_noop ("Insufficient space to satisfy request and "
663                                          "requested amount is larger than cache size"));
664         }
665       else
666         {
667           transmit_status (client, 0, 
668                            gettext_noop ("Insufficient space to satisfy request"));
669         }
670       return;      
671     }
672   reserved += req;
673   e = GNUNET_malloc (sizeof(struct ReservationList));
674   e->next = reservations;
675   reservations = e;
676   e->client = client;
677   e->amount = amount;
678   e->entries = entries;
679   e->rid = ++reservation_gen;
680   if (reservation_gen < 0)
681     reservation_gen = 0; /* wrap around */
682   transmit_status (client, e->rid, NULL);
683 }
684
685
686 /**
687  * Handle RELEASE_RESERVE-message.
688  *
689  * @param cls closure
690  * @param client identification of the client
691  * @param message the actual message
692  */
693 static void
694 handle_release_reserve (void *cls,
695                         struct GNUNET_SERVER_Client *client,
696                         const struct GNUNET_MessageHeader *message)
697 {
698   const struct ReleaseReserveMessage *msg = (const struct ReleaseReserveMessage*) message;
699   struct ReservationList *pos;
700   struct ReservationList *prev;
701   struct ReservationList *next;
702   int rid = ntohl(msg->rid);
703   unsigned long long rem;
704
705 #if DEBUG_DATASTORE
706   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
707               "Processing `%s' request\n",
708               "RELEASE_RESERVE");
709 #endif
710   next = reservations;
711   prev = NULL;
712   while (NULL != (pos = next))
713     {
714       next = pos->next;
715       if (rid == pos->rid)
716         {
717           if (prev == NULL)
718             reservations = next;
719           else
720             prev->next = next;
721           rem = pos->amount + ((unsigned long long) GNUNET_DATASTORE_ENTRY_OVERHEAD) * pos->entries;
722           GNUNET_assert (reserved >= rem);
723           reserved -= rem;
724 #if DEBUG_DATASTORE
725           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
726                       "Returning %llu remaining reserved bytes to storage pool\n",
727                       rem);
728 #endif    
729           GNUNET_free (pos);
730           transmit_status (client, GNUNET_OK, NULL);
731           return;
732         }       
733       prev = pos;
734       pos = next;
735     }
736   GNUNET_break (0);
737   transmit_status (client, GNUNET_SYSERR, gettext_noop ("Could not find matching reservation"));
738 }
739
740
741 /**
742  * Check that the given message is a valid data message.
743  *
744  * @return NULL if the message is not well-formed, otherwise the message
745  */
746 static const struct DataMessage *
747 check_data (const struct GNUNET_MessageHeader *message)
748 {
749   uint16_t size;
750   uint32_t dsize;
751   const struct DataMessage *dm;
752
753   size = ntohs(message->size);
754   if (size < sizeof(struct DataMessage))
755     { 
756       GNUNET_break (0);
757       return NULL;
758     }
759   dm = (const struct DataMessage *) message;
760   dsize = ntohl(dm->size);
761   if (size != dsize + sizeof(struct DataMessage))
762     {
763       GNUNET_break (0);
764       return NULL;
765     }
766   return dm;
767 }
768
769
770 /**
771  * Handle PUT-message.
772  *
773  * @param cls closure
774  * @param client identification of the client
775  * @param message the actual message
776  */
777 static void
778 handle_put (void *cls,
779             struct GNUNET_SERVER_Client *client,
780             const struct GNUNET_MessageHeader *message)
781 {
782   const struct DataMessage *dm = check_data (message);
783   char *msg;
784   int ret;
785   int rid;
786   struct ReservationList *pos;
787   uint32_t size;
788
789 #if DEBUG_DATASTORE
790   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
791               "Processing `%s' request\n",
792               "PUT");
793 #endif
794   if (ntohl(dm->type) == 0) 
795     {
796       GNUNET_break (0);
797       dm = NULL;
798     }
799   if (dm == NULL)
800     {
801       GNUNET_break (0);
802       GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
803       return;
804     }
805   rid = ntohl(dm->rid);
806   size = ntohl(dm->size);
807   if (rid > 0)
808     {
809       pos = reservations;
810       while ( (NULL != pos) &&
811               (rid != pos->rid) )
812         pos = pos->next;
813       GNUNET_break (pos != NULL);
814       if (NULL != pos)
815         {
816           GNUNET_break (pos->entries > 0);
817           GNUNET_break (pos->amount > size);
818           pos->entries--;
819           pos->amount -= size;
820           reserved -= (size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
821         }
822     }
823   msg = NULL;
824   ret = plugin->api->put (plugin->api->cls,
825                           &dm->key,
826                           size,
827                           &dm[1],
828                           ntohl(dm->type),
829                           ntohl(dm->priority),
830                           ntohl(dm->anonymity),
831                           GNUNET_TIME_absolute_ntoh(dm->expiration),
832                           &msg);
833   if (GNUNET_OK == ret)
834     GNUNET_CONTAINER_bloomfilter_add (filter,
835                                       &dm->key);
836   transmit_status (client, 
837                    (GNUNET_SYSERR == ret) ? GNUNET_SYSERR : GNUNET_OK, 
838                    msg);
839   GNUNET_free_non_null (msg);
840   if (quota - reserved - cache_size < plugin->api->get_size (plugin->api->cls))
841     manage_space (size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
842 }
843
844
845 /**
846  * Handle GET-message.
847  *
848  * @param cls closure
849  * @param client identification of the client
850  * @param message the actual message
851  */
852 static void
853 handle_get (void *cls,
854              struct GNUNET_SERVER_Client *client,
855              const struct GNUNET_MessageHeader *message)
856 {
857   static struct GNUNET_TIME_Absolute zero;
858   const struct GetMessage *msg;
859   uint16_t size;
860
861 #if DEBUG_DATASTORE
862   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
863               "Processing `%s' request\n",
864               "GET");
865 #endif
866   size = ntohs(message->size);
867   if ( (size != sizeof(struct GetMessage)) &&
868        (size != sizeof(struct GetMessage) - sizeof(GNUNET_HashCode)) )
869     {
870       GNUNET_break (0);
871       GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
872       return;
873     }
874   msg = (const struct GetMessage*) message;
875   if ( (size == sizeof(struct GetMessage)) &&
876        (GNUNET_YES != GNUNET_CONTAINER_bloomfilter_test (filter,
877                                                          &msg->key)) )
878     {
879       /* don't bother database... */
880 #if DEBUG_DATASTORE
881       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
882                   "Empty result set for `%s' request.\n",
883                   "GET");
884 #endif  
885       transmit_item (client,
886                      NULL, NULL, 0, NULL, 0, 0, 0, zero, 0);
887       return;
888     }
889   GNUNET_SERVER_client_keep (client);
890   plugin->api->get (plugin->api->cls,
891                     ((size == sizeof(struct GetMessage)) ? &msg->key : NULL),
892                     NULL,
893                     ntohl(msg->type),
894                     &transmit_item,
895                     client);    
896 }
897
898
899 /**
900  * Handle UPDATE-message.
901  *
902  * @param cls closure
903  * @param client identification of the client
904  * @param message the actual message
905  */
906 static void
907 handle_update (void *cls,
908                struct GNUNET_SERVER_Client *client,
909                const struct GNUNET_MessageHeader *message)
910 {
911   const struct UpdateMessage *msg;
912   int ret;
913   char *emsg;
914
915 #if DEBUG_DATASTORE
916   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
917               "Processing `%s' request\n",
918               "UPDATE");
919 #endif
920   msg = (const struct UpdateMessage*) message;
921   emsg = NULL;
922   ret = plugin->api->update (plugin->api->cls,
923                              GNUNET_ntohll(msg->uid),
924                              (int32_t) ntohl(msg->priority),
925                              GNUNET_TIME_absolute_ntoh(msg->expiration),
926                              &emsg);
927   transmit_status (client, ret, emsg);
928   GNUNET_free_non_null (emsg);
929 }
930
931
932 /**
933  * Handle GET_RANDOM-message.
934  *
935  * @param cls closure
936  * @param client identification of the client
937  * @param message the actual message
938  */
939 static void
940 handle_get_random (void *cls,
941                    struct GNUNET_SERVER_Client *client,
942                    const struct GNUNET_MessageHeader *message)
943 {
944 #if DEBUG_DATASTORE
945   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
946               "Processing `%s' request\n",
947               "GET_RANDOM");
948 #endif
949   GNUNET_SERVER_client_keep (client);
950   plugin->api->iter_migration_order (plugin->api->cls,
951                                      0,
952                                      &transmit_item,
953                                      client);  
954 }
955
956
957 /**
958  * Context for the 'remove_callback'.
959  */
960 struct RemoveContext 
961 {
962   /**
963    * Client for whom we're doing the remvoing.
964    */
965   struct GNUNET_SERVER_Client *client;
966
967   /**
968    * GNUNET_YES if we managed to remove something.
969    */
970   int found;
971 };
972
973
974 /**
975  * Callback function that will cause the item that is passed
976  * in to be deleted (by returning GNUNET_NO).
977  */
978 static int
979 remove_callback (void *cls,
980                  void *next_cls,
981                  const GNUNET_HashCode * key,
982                  uint32_t size,
983                  const void *data,
984                  uint32_t type,
985                  uint32_t priority,
986                  uint32_t anonymity,
987                  struct GNUNET_TIME_Absolute
988                  expiration, uint64_t uid)
989 {
990   struct RemoveContext *rc = cls;
991
992   if (key == NULL)
993     {
994 #if DEBUG_DATASTORE
995       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
996                   "No further matches for `%s' request.\n",
997                   "REMOVE");
998 #endif  
999       if (GNUNET_YES == rc->found)
1000         transmit_status (rc->client, GNUNET_OK, NULL);       
1001       else
1002         transmit_status (rc->client, GNUNET_SYSERR, _("Content not found"));            
1003       GNUNET_SERVER_client_drop (rc->client);
1004       GNUNET_free (rc);
1005       return GNUNET_OK; /* last item */
1006     }
1007   rc->found = GNUNET_YES;
1008 #if DEBUG_DATASTORE
1009   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1010               "Item %llu matches `%s' request.\n",
1011               (unsigned long long) uid,
1012               "REMOVE");
1013 #endif  
1014   GNUNET_CONTAINER_bloomfilter_remove (filter,
1015                                        key);
1016   plugin->api->next_request (next_cls, GNUNET_YES);
1017   return GNUNET_NO;
1018 }
1019
1020
1021 /**
1022  * Handle REMOVE-message.
1023  *
1024  * @param cls closure
1025  * @param client identification of the client
1026  * @param message the actual message
1027  */
1028 static void
1029 handle_remove (void *cls,
1030              struct GNUNET_SERVER_Client *client,
1031              const struct GNUNET_MessageHeader *message)
1032 {
1033   const struct DataMessage *dm = check_data (message);
1034   GNUNET_HashCode vhash;
1035   struct RemoveContext *rc;
1036
1037 #if DEBUG_DATASTORE
1038   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1039               "Processing `%s' request\n",
1040               "REMOVE");
1041 #endif
1042   if (dm == NULL)
1043     {
1044       GNUNET_break (0);
1045       GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1046       return;
1047     }
1048   rc = GNUNET_malloc (sizeof(struct RemoveContext));
1049   GNUNET_SERVER_client_keep (client);
1050   rc->client = client;
1051   GNUNET_CRYPTO_hash (&dm[1],
1052                       ntohl(dm->size),
1053                       &vhash);
1054   GNUNET_SERVER_client_keep (client);
1055   plugin->api->get (plugin->api->cls,
1056                     &dm->key,
1057                     &vhash,
1058                     ntohl(dm->type),
1059                     &remove_callback,
1060                     rc);
1061 }
1062
1063
1064 /**
1065  * Handle DROP-message.
1066  *
1067  * @param cls closure
1068  * @param client identification of the client
1069  * @param message the actual message
1070  */
1071 static void
1072 handle_drop (void *cls,
1073              struct GNUNET_SERVER_Client *client,
1074              const struct GNUNET_MessageHeader *message)
1075 {
1076 #if DEBUG_DATASTORE
1077   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1078               "Processing `%s' request\n",
1079               "DROP");
1080 #endif
1081   plugin->api->drop (plugin->api->cls);
1082   GNUNET_SERVER_receive_done (client, GNUNET_OK);
1083 }
1084
1085
1086 /**
1087  * List of handlers for the messages understood by this
1088  * service.
1089  */
1090 static struct GNUNET_SERVER_MessageHandler handlers[] = {
1091   {&handle_reserve, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_RESERVE, 
1092    sizeof(struct ReserveMessage) }, 
1093   {&handle_release_reserve, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_RELEASE_RESERVE, 
1094    sizeof(struct ReleaseReserveMessage) }, 
1095   {&handle_put, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_PUT, 0 }, 
1096   {&handle_update, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_UPDATE, 
1097    sizeof (struct UpdateMessage) }, 
1098   {&handle_get, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_GET, 0 }, 
1099   {&handle_get_random, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_GET_RANDOM, 
1100    sizeof(struct GNUNET_MessageHeader) }, 
1101   {&handle_remove, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE, 0 }, 
1102   {&handle_drop, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_DROP, 
1103    sizeof(struct GNUNET_MessageHeader) }, 
1104   {NULL, NULL, 0, 0}
1105 };
1106
1107
1108
1109 /**
1110  * Load the datastore plugin.
1111  */
1112 static struct DatastorePlugin *
1113 load_plugin () 
1114 {
1115   struct DatastorePlugin *ret;
1116   char *libname;
1117   char *name;
1118
1119   if (GNUNET_OK !=
1120       GNUNET_CONFIGURATION_get_value_string (cfg,
1121                                              "DATASTORE", "DATABASE", &name))
1122     {
1123       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1124                   _("No `%s' specified for `%s' in configuration!\n"),
1125                   "DATABASE",
1126                   "DATASTORE");
1127       return NULL;
1128     }
1129   ret = GNUNET_malloc (sizeof(struct DatastorePlugin));
1130   ret->env.cfg = cfg;
1131   ret->env.sched = sched;  
1132   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1133               _("Loading `%s' datastore plugin\n"), name);
1134   GNUNET_asprintf (&libname, "libgnunet_plugin_datastore_%s", name);
1135   ret->short_name = name;
1136   ret->lib_name = libname;
1137   ret->api = GNUNET_PLUGIN_load (libname, &ret->env);
1138   if (ret->api == NULL)
1139     {
1140       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1141                   _("Failed to load datastore plugin for `%s'\n"), name);
1142       GNUNET_free (ret->short_name);
1143       GNUNET_free (libname);
1144       GNUNET_free (ret);
1145       return NULL;
1146     }
1147   return ret;
1148 }
1149
1150
1151 /**
1152  * Function called when the service shuts
1153  * down.  Unloads our datastore plugin.
1154  *
1155  * @param plug plugin to unload
1156  */
1157 static void
1158 unload_plugin (struct DatastorePlugin *plug)
1159 {
1160 #if DEBUG_DATASTORE
1161   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1162               "Datastore service is unloading plugin...\n");
1163 #endif
1164   GNUNET_break (NULL == GNUNET_PLUGIN_unload (plug->lib_name, plug->api));
1165   GNUNET_free (plug->lib_name);
1166   GNUNET_free (plug->short_name);
1167   GNUNET_free (plug);
1168 }
1169
1170
1171 /**
1172  * Last task run during shutdown.  Disconnects us from
1173  * the transport and core.
1174  */
1175 static void
1176 cleaning_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1177 {
1178   unload_plugin (plugin);
1179   plugin = NULL;
1180 }
1181
1182
1183 /**
1184  * Function that removes all active reservations made
1185  * by the given client and releases the space for other
1186  * requests.
1187  *
1188  * @param cls closure
1189  * @param client identification of the client
1190  */
1191 static void
1192 cleanup_reservations (void *cls,
1193                       struct GNUNET_SERVER_Client
1194                       * client)
1195 {
1196   struct ReservationList *pos;
1197   struct ReservationList *prev;
1198   struct ReservationList *next;
1199
1200   prev = NULL;
1201   pos = reservations;
1202   while (NULL != pos)
1203     {
1204       next = pos->next;
1205       if (pos->client == client)
1206         {
1207           if (prev == NULL)
1208             reservations = next;
1209           else
1210             prev->next = next;
1211           reserved -= pos->amount + pos->entries * GNUNET_DATASTORE_ENTRY_OVERHEAD;
1212           GNUNET_free (pos);
1213         }
1214       else
1215         {
1216           prev = pos;
1217         }
1218       pos = next;
1219     }
1220 }
1221
1222
1223 /**
1224  * Process datastore requests.
1225  *
1226  * @param cls closure
1227  * @param s scheduler to use
1228  * @param server the initialized server
1229  * @param c configuration to use
1230  */
1231 static void
1232 run (void *cls,
1233      struct GNUNET_SCHEDULER_Handle *s,
1234      struct GNUNET_SERVER_Handle *server,
1235      struct GNUNET_CONFIGURATION_Handle *c)
1236 {
1237   char *fn;
1238   unsigned int bf_size;
1239
1240   sched = s;
1241   cfg = c;
1242   if (GNUNET_OK !=
1243       GNUNET_CONFIGURATION_get_value_number (cfg,
1244                                              "DATASTORE", "QUOTA", &quota))
1245     {
1246       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1247                   _("No `%s' specified for `%s' in configuration!\n"),
1248                   "QUOTA",
1249                   "DATASTORE");
1250       return;
1251     }
1252   cache_size = quota / 8; /* Or should we make this an option? */
1253   bf_size = quota / 32; /* 8 bit per entry, 1 bit per 32 kb in DB */
1254   fn = NULL;
1255   if ( (GNUNET_OK !=
1256         GNUNET_CONFIGURATION_get_value_filename (cfg,
1257                                                  "DATASTORE",
1258                                                  "BLOOMFILTER",
1259                                                  &fn)) ||
1260        (GNUNET_OK !=
1261         GNUNET_DISK_directory_create_for_file (fn)) )
1262     {
1263       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1264                   _("Could not use specified filename `%s' for bloomfilter.\n"),
1265                   fn != NULL ? fn : "");
1266       GNUNET_free_non_null (fn);
1267       fn = NULL;
1268     }
1269   filter = GNUNET_CONTAINER_bloomfilter_load (fn, bf_size, 5);  /* approx. 3% false positives at max use */  
1270   GNUNET_free_non_null (fn);
1271   if (filter == NULL)
1272     {
1273       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1274                   _("Failed to initialize bloomfilter.\n"));
1275       return;
1276     }
1277   plugin = load_plugin ();
1278   if (NULL == plugin)
1279     {
1280       GNUNET_CONTAINER_bloomfilter_free (filter);
1281       return;
1282     }
1283   GNUNET_SERVER_disconnect_notify (server, &cleanup_reservations, NULL);
1284   GNUNET_SERVER_add_handlers (server, handlers);
1285   expired_kill_task
1286     = GNUNET_SCHEDULER_add_delayed (sched,
1287                                     GNUNET_NO,
1288                                     GNUNET_SCHEDULER_PRIORITY_IDLE,
1289                                     GNUNET_SCHEDULER_NO_TASK,
1290                                     GNUNET_TIME_UNIT_ZERO,
1291                                     &delete_expired, NULL);
1292   GNUNET_SCHEDULER_add_delayed (sched,
1293                                 GNUNET_YES,
1294                                 GNUNET_SCHEDULER_PRIORITY_IDLE,
1295                                 GNUNET_SCHEDULER_NO_TASK,
1296                                 GNUNET_TIME_UNIT_FOREVER_REL,
1297                                 &cleaning_task, NULL);
1298   
1299 }
1300
1301
1302 /**
1303  * The main function for the datastore service.
1304  *
1305  * @param argc number of arguments from the command line
1306  * @param argv command line arguments
1307  * @return 0 ok, 1 on error
1308  */
1309 int
1310 main (int argc, char *const *argv)
1311 {
1312   int ret;
1313
1314   ret = (GNUNET_OK ==
1315          GNUNET_SERVICE_run (argc,
1316                              argv,
1317                              "datastore", &run, NULL, NULL, NULL)) ? 0 : 1;
1318   return ret;
1319 }
1320
1321
1322 /* end of gnunet-service-datastore.c */