87ae427e4cb1704f8020d66a50c6520868640492
[oweals/gnunet.git] / src / datastore / gnunet-service-datastore.c
1 /*
2      This file is part of GNUnet
3      (C) 2004, 2005, 2006, 2007, 2009 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 2, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file datastore/gnunet-service-datastore.c
23  * @brief Management for the datastore for files stored on a GNUnet node
24  * @author Christian Grothoff
25  */
26
27 #include "platform.h"
28 #include "gnunet_util_lib.h"
29 #include "gnunet_protocols.h"
30 #include "plugin_datastore.h"
31 #include "datastore.h"
32
33 /**
34  * How many messages do we queue at most per client?
35  */
36 #define MAX_PENDING 1024
37
38 /**
39  * How long are we at most keeping "expired" content
40  * past the expiration date in the database?
41  */
42 #define MAX_EXPIRE_DELAY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 15)
43
44
45
46 /**
47  * Our datastore plugin.
48  */
49 struct DatastorePlugin
50 {
51
52   /**
53    * API of the transport as returned by the plugin's
54    * initialization function.
55    */
56   struct GNUNET_DATASTORE_PluginFunctions *api;
57
58   /**
59    * Short name for the plugin (i.e. "sqlite").
60    */
61   char *short_name;
62
63   /**
64    * Name of the library (i.e. "gnunet_plugin_datastore_sqlite").
65    */
66   char *lib_name;
67
68   /**
69    * Environment this transport service is using
70    * for this plugin.
71    */
72   struct GNUNET_DATASTORE_PluginEnvironment env;
73
74 };
75
76
77 /**
78  * Linked list of active reservations.
79  */
80 struct ReservationList 
81 {
82
83   /**
84    * This is a linked list.
85    */
86   struct ReservationList *next;
87
88   /**
89    * Client that made the reservation.
90    */
91   struct GNUNET_SERVER_Client *client;
92
93   /**
94    * Number of bytes (still) reserved.
95    */
96   uint64_t amount;
97
98   /**
99    * Number of items (still) reserved.
100    */
101   uint64_t entries;
102
103   /**
104    * Reservation identifier.
105    */
106   int32_t rid;
107
108 };
109
110
111 /**
112  * Our datastore plugin (NULL if not available).
113  */
114 static struct DatastorePlugin *plugin;
115
116 /**
117  * Linked list of space reservations made by clients.
118  */
119 static struct ReservationList *reservations;
120
121 /**
122  * Bloomfilter to quickly tell if we don't have the content.
123  */
124 static struct GNUNET_CONTAINER_BloomFilter *filter;
125
126 /**
127  * Static counter to produce reservation identifiers.
128  */
129 static int reservation_gen;
130
131 /**
132  * How much space are we allowed to use?
133  */
134 static unsigned long long quota;
135
136 /**
137  * How much space are we using for the cache?
138  * (space available for insertions that will be
139  *  instantly reclaimed by discarding less 
140  *  important content --- or possibly whatever
141  *  we just inserted into the "cache").
142  */
143 static unsigned long long cache_size;
144
145 /**
146  * How much space have we currently reserved?
147  */
148 static unsigned long long reserved;
149
150 /**
151  * Identity of the task that is used to delete
152  * expired content.
153  */
154 static GNUNET_SCHEDULER_TaskIdentifier expired_kill_task;
155
156 /**
157  * Our configuration.
158  */
159 const struct GNUNET_CONFIGURATION_Handle *cfg;
160
161 /**
162  * Our scheduler.
163  */
164 struct GNUNET_SCHEDULER_Handle *sched; 
165
166 /**
167  * Function called once the transmit operation has
168  * either failed or succeeded.
169  *
170  * @param cls closure
171  * @param status GNUNET_OK on success, GNUNET_SYSERR on error
172  */
173 typedef void (*TransmitContinuation)(void *cls,
174                                      int status);
175
176
177 struct TransmitCallbackContext 
178 {
179   /**
180    * The message that we're asked to transmit.
181    */
182   struct GNUNET_MessageHeader *msg;
183
184   /**
185    * Client that we are transmitting to.
186    */
187   struct GNUNET_SERVER_Client *client;
188
189   /**
190    * Function to call once msg has been transmitted
191    * (or at least added to the buffer).
192    */
193   TransmitContinuation tc;
194
195   /**
196    * Closure for tc.
197    */
198   void *tc_cls;
199
200   /**
201    * GNUNET_YES if we are supposed to signal the server
202    * completion of the client's request.
203    */
204   int end;
205 };
206
207
208 /**
209  * Task that is used to remove expired entries from
210  * the datastore.  This task will schedule itself
211  * again automatically to always delete all expired
212  * content quickly.
213  *
214  * @param cls not used
215  * @param tc task context
216  */ 
217 static void
218 delete_expired (void *cls,
219                 const struct GNUNET_SCHEDULER_TaskContext *tc);
220
221
222 /**
223  * Iterate over the expired items stored in the datastore.
224  * Delete all expired items; once we have processed all
225  * expired items, re-schedule the "delete_expired" task.
226  *
227  * @param cls not used
228  * @param next_cls closure to pass to the "next" function.
229  * @param key key for the content
230  * @param size number of bytes in data
231  * @param data content stored
232  * @param type type of the content
233  * @param priority priority of the content
234  * @param anonymity anonymity-level for the content
235  * @param expiration expiration time for the content
236  * @param uid unique identifier for the datum;
237  *        maybe 0 if no unique identifier is available
238  *
239  * @return GNUNET_SYSERR to abort the iteration, GNUNET_OK to continue
240  *         (continue on call to "next", of course),
241  *         GNUNET_NO to delete the item and continue (if supported)
242  */
243 static int 
244 expired_processor (void *cls,
245                    void *next_cls,
246                    const GNUNET_HashCode * key,
247                    uint32_t size,
248                    const void *data,
249                    uint32_t type,
250                    uint32_t priority,
251                    uint32_t anonymity,
252                    struct GNUNET_TIME_Absolute
253                    expiration, 
254                    uint64_t uid)
255 {
256   struct GNUNET_TIME_Absolute now;
257
258   expired_kill_task = GNUNET_SCHEDULER_NO_TASK;
259   if (key == NULL) 
260     {
261       expired_kill_task 
262         = GNUNET_SCHEDULER_add_delayed (sched,
263                                         GNUNET_NO,
264                                         GNUNET_SCHEDULER_PRIORITY_IDLE,
265                                         GNUNET_SCHEDULER_NO_TASK,
266                                         MAX_EXPIRE_DELAY,
267                                         &delete_expired,
268                                         NULL);
269       return GNUNET_SYSERR;
270     }
271   now = GNUNET_TIME_absolute_get ();
272   if (expiration.value > now.value)
273     {
274       /* finished processing */
275       plugin->api->next_request (next_cls, GNUNET_YES);
276       return GNUNET_SYSERR;
277     }
278   plugin->api->next_request (next_cls, GNUNET_NO);
279 #if DEBUG_DATASTORE
280   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
281               "Deleting content that expired %llu ms ago\n",
282               (unsigned long long) (now.value - expiration.value));
283 #endif
284   GNUNET_CONTAINER_bloomfilter_remove (filter,
285                                        key);
286   return GNUNET_NO; /* delete */
287 }
288
289
290 /**
291  * Task that is used to remove expired entries from
292  * the datastore.  This task will schedule itself
293  * again automatically to always delete all expired
294  * content quickly.
295  *
296  * @param cls not used
297  * @param tc task context
298  */ 
299 static void
300 delete_expired (void *cls,
301                 const struct GNUNET_SCHEDULER_TaskContext *tc)
302 {
303   plugin->api->iter_ascending_expiration (plugin->api->cls, 
304                                           0,
305                                           &expired_processor,
306                                           NULL);
307 }
308
309
310 /**
311  * An iterator over a set of items stored in the datastore.
312  *
313  * @param cls closure
314  * @param next_cls closure to pass to the "next" function.
315  * @param key key for the content
316  * @param size number of bytes in data
317  * @param data content stored
318  * @param type type of the content
319  * @param priority priority of the content
320  * @param anonymity anonymity-level for the content
321  * @param expiration expiration time for the content
322  * @param uid unique identifier for the datum;
323  *        maybe 0 if no unique identifier is available
324  *
325  * @return GNUNET_SYSERR to abort the iteration, GNUNET_OK to continue
326  *         (continue on call to "next", of course),
327  *         GNUNET_NO to delete the item and continue (if supported)
328  */
329 static int 
330 manage (void *cls,
331         void *next_cls,
332         const GNUNET_HashCode * key,
333         uint32_t size,
334         const void *data,
335         uint32_t type,
336         uint32_t priority,
337         uint32_t anonymity,
338         struct GNUNET_TIME_Absolute
339         expiration, 
340         uint64_t uid)
341 {
342   unsigned long long *need = cls;
343
344   if (NULL == key)
345     {
346       GNUNET_free (need);
347       return GNUNET_SYSERR;
348     }
349   if (size + GNUNET_DATASTORE_ENTRY_OVERHEAD > *need)
350     *need = 0;
351   else
352     *need -= size + GNUNET_DATASTORE_ENTRY_OVERHEAD;
353   plugin->api->next_request (next_cls, 
354                              (0 == *need) ? GNUNET_YES : GNUNET_NO);
355 #if DEBUG_DATASTORE
356   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
357               "Deleting %llu bytes of low-priority content (still trying to recover %llu bytes)\n",
358               size + GNUNET_DATASTORE_ENTRY_OVERHEAD,
359               *need);
360 #endif
361   GNUNET_CONTAINER_bloomfilter_remove (filter,
362                                        key);
363   return GNUNET_NO;
364 }
365
366
367 /**
368  * Manage available disk space by running tasks
369  * that will discard content if necessary.  This
370  * function will be run whenever a request for
371  * "need" bytes of storage could only be satisfied
372  * by eating into the "cache" (and we want our cache
373  * space back).
374  *
375  * @param need number of bytes of content that were
376  *        placed into the "cache" (and hence the
377  *        number of bytes that should be removed).
378  */
379 static void
380 manage_space (unsigned long long need)
381 {
382   unsigned long long *n;
383
384 #if DEBUG_DATASTORE
385   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
386               "Asked to recover %llu bytes of cache space\n",
387               need);
388 #endif
389   n = GNUNET_malloc (sizeof(unsigned long long));
390   *n = need;
391   plugin->api->iter_low_priority (plugin->api->cls,
392                                   0,
393                                   &manage,
394                                   n);
395 }
396
397
398 /**
399  * Function called to notify a client about the socket
400  * begin ready to queue more data.  "buf" will be
401  * NULL and "size" zero if the socket was closed for
402  * writing in the meantime.
403  *
404  * @param cls closure
405  * @param size number of bytes available in buf
406  * @param buf where the callee should write the message
407  * @return number of bytes written to buf
408  */
409 static size_t
410 transmit_callback (void *cls,
411                    size_t size, void *buf)
412 {
413   struct TransmitCallbackContext *tcc = cls;
414   size_t msize;
415   
416   msize = ntohs(tcc->msg->size);
417   if (size == 0)
418     {
419 #if DEBUG_DATASTORE
420       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
421                   "Transmission failed.\n");
422 #endif
423       if (tcc->tc != NULL)
424         tcc->tc (tcc->tc_cls, GNUNET_SYSERR);
425       if (GNUNET_YES == tcc->end)
426         {
427           GNUNET_SERVER_receive_done (tcc->client, GNUNET_SYSERR);
428         }
429       GNUNET_free (tcc->msg);
430       GNUNET_free (tcc);
431       return 0;
432     }
433   GNUNET_assert (size >= msize);
434   memcpy (buf, tcc->msg, msize);
435   if (tcc->tc != NULL)
436     tcc->tc (tcc->tc_cls, GNUNET_OK);
437   if (GNUNET_YES == tcc->end)
438     {
439       GNUNET_SERVER_receive_done (tcc->client, GNUNET_OK);
440     }
441   else
442     {
443 #if DEBUG_DATASTORE
444       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
445                   "Response transmitted, more pending!\n");
446 #endif
447     }
448   GNUNET_free (tcc->msg);
449   GNUNET_free (tcc);
450   return msize;
451 }
452
453
454 /**
455  * Transmit the given message to the client.
456  *
457  * @param client target of the message
458  * @param msg message to transmit, will be freed!
459  * @param end is this the last response (and we should
460  *        signal the server completion accodingly after
461  *        transmitting this message)?
462  */
463 static void
464 transmit (struct GNUNET_SERVER_Client *client,
465           struct GNUNET_MessageHeader *msg,
466           TransmitContinuation tc,
467           void *tc_cls,
468           int end)
469 {
470   struct TransmitCallbackContext *tcc;
471
472   tcc = GNUNET_malloc (sizeof(struct TransmitCallbackContext));
473   tcc->msg = msg;
474   tcc->client = client;
475   tcc->tc = tc;
476   tcc->tc_cls = tc_cls;
477   tcc->end = end;
478
479   if (NULL ==
480       GNUNET_SERVER_notify_transmit_ready (client,
481                                            ntohs(msg->size),
482                                            GNUNET_TIME_UNIT_FOREVER_REL,
483                                            &transmit_callback,
484                                            tcc))
485     {
486       GNUNET_break (0);
487       if (GNUNET_YES == end)
488         {
489 #if DEBUG_DATASTORE
490           GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
491                       "Disconnecting client.\n");
492 #endif    
493           GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
494         }
495       if (NULL != tc)
496         tc (tc_cls, GNUNET_SYSERR);
497       GNUNET_free (msg);
498       GNUNET_free (tcc);
499     }
500 }
501
502
503 /**
504  * Transmit a status code to the client.
505  *
506  * @param client receiver of the response
507  * @param code status code
508  * @param msg optional error message (can be NULL)
509  */
510 static void
511 transmit_status (struct GNUNET_SERVER_Client *client,
512                  int code,
513                  const char *msg)
514 {
515   struct StatusMessage *sm;
516   size_t slen;
517
518 #if DEBUG_DATASTORE
519   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
520               "Transmitting `%s' message with value %d and message `%s'\n",
521               "STATUS",
522               code,
523               msg != NULL ? msg : "(none)");
524 #endif
525   slen = (msg == NULL) ? 0 : strlen(msg) + 1;  
526   sm = GNUNET_malloc (sizeof(struct StatusMessage) + slen);
527   sm->header.size = htons(sizeof(struct StatusMessage) + slen);
528   sm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_STATUS);
529   sm->status = htonl(code);
530   if (slen > 0)
531     memcpy (&sm[1], msg, slen);  
532   transmit (client, &sm->header, NULL, NULL, GNUNET_YES);
533 }
534
535
536 /**
537  * Function called once the transmit operation has
538  * either failed or succeeded.
539  *
540  * @param cls closure
541  * @param status GNUNET_OK on success, GNUNET_SYSERR on error
542  */
543 static void 
544 get_next(void *next_cls,
545          int status)
546 {
547   if (status != GNUNET_OK)
548     {
549       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
550                   _("Failed to transmit an item to the client; aborting iteration.\n"));    
551       plugin->api->next_request (next_cls, GNUNET_YES);
552       return;
553     }
554   plugin->api->next_request (next_cls, GNUNET_NO);
555 }
556
557
558 /**
559  * Function that will transmit the given datastore entry
560  * to the client.
561  *
562  * @param cls closure, pointer to the client (of type GNUNET_SERVER_Client).
563  * @param next_cls closure to use to ask for the next item
564  * @param key key for the content
565  * @param size number of bytes in data
566  * @param data content stored
567  * @param type type of the content
568  * @param priority priority of the content
569  * @param anonymity anonymity-level for the content
570  * @param expiration expiration time for the content
571  * @param uid unique identifier for the datum;
572  *        maybe 0 if no unique identifier is available
573  *
574  * @return GNUNET_SYSERR to abort the iteration, GNUNET_OK to continue,
575  *         GNUNET_NO to delete the item and continue (if supported)
576  */
577 static int
578 transmit_item (void *cls,
579                void *next_cls,
580                const GNUNET_HashCode * key,
581                uint32_t size,
582                const void *data,
583                uint32_t type,
584                uint32_t priority,
585                uint32_t anonymity,
586                struct GNUNET_TIME_Absolute
587                expiration, uint64_t uid)
588 {
589   struct GNUNET_SERVER_Client *client = cls;
590   struct GNUNET_MessageHeader *end;
591   struct DataMessage *dm;
592
593   if (key == NULL)
594     {
595       /* transmit 'DATA_END' */
596 #if DEBUG_DATASTORE
597       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
598                   "Transmitting `%s' message\n",
599                   "DATA_END");
600 #endif
601       end = GNUNET_malloc (sizeof(struct GNUNET_MessageHeader));
602       end->size = htons(sizeof(struct GNUNET_MessageHeader));
603       end->type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END);
604       transmit (client, end, NULL, NULL, GNUNET_YES);
605       GNUNET_SERVER_client_drop (client);
606       return GNUNET_OK;
607     }
608   dm = GNUNET_malloc (sizeof(struct DataMessage) + size);
609   dm->header.size = htons(sizeof(struct DataMessage) + size);
610   dm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_DATA);
611   dm->rid = htonl(0);
612   dm->size = htonl(size);
613   dm->type = htonl(type);
614   dm->priority = htonl(priority);
615   dm->anonymity = htonl(anonymity);
616   dm->expiration = GNUNET_TIME_absolute_hton(expiration);
617   dm->uid = GNUNET_htonll(uid);
618   dm->key = *key;
619   memcpy (&dm[1], data, size);
620 #if DEBUG_DATASTORE
621   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
622               "Transmitting `%s' message\n",
623               "DATA");
624 #endif
625   transmit (client, &dm->header, &get_next, next_cls, GNUNET_NO);
626   return GNUNET_OK;
627 }
628
629
630 /**
631  * Handle RESERVE-message.
632  *
633  * @param cls closure
634  * @param client identification of the client
635  * @param message the actual message
636  */
637 static void
638 handle_reserve (void *cls,
639                 struct GNUNET_SERVER_Client *client,
640                 const struct GNUNET_MessageHeader *message)
641 {
642   const struct ReserveMessage *msg = (const struct ReserveMessage*) message;
643   struct ReservationList *e;
644   unsigned long long used;
645   unsigned long long req;
646   uint64_t amount;
647   uint32_t entries;
648
649 #if DEBUG_DATASTORE
650   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
651               "Processing `%s' request\n",
652               "RESERVE");
653 #endif
654   amount = GNUNET_ntohll(msg->amount);
655   entries = ntohl(msg->entries);
656   used = plugin->api->get_size (plugin->api->cls) + reserved;
657   req = amount + ((unsigned long long) GNUNET_DATASTORE_ENTRY_OVERHEAD) * entries;
658   if (used + req > quota)
659     {
660       if (quota < used)
661         used = quota; /* cheat a bit for error message (to avoid negative numbers) */
662       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
663                   _("Insufficient space (%llu bytes are available) to satisfy `%s' request for %llu bytes\n"),
664                   quota - used,
665                   "RESERVE",
666                   req);
667       if (cache_size < req)
668         {
669           /* TODO: document this in the FAQ; essentially, if this
670              message happens, the insertion request could be blocked
671              by less-important content from migration because it is
672              larger than 1/8th of the overall available space, and
673              we only reserve 1/8th for "fresh" insertions */
674           GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
675                       _("The requested amount (%llu bytes) is larger than the cache size (%llu bytes)\n"),
676                       req,
677                       cache_size);
678           transmit_status (client, 0, 
679                            gettext_noop ("Insufficient space to satisfy request and "
680                                          "requested amount is larger than cache size"));
681         }
682       else
683         {
684           transmit_status (client, 0, 
685                            gettext_noop ("Insufficient space to satisfy request"));
686         }
687       return;      
688     }
689   reserved += req;
690   e = GNUNET_malloc (sizeof(struct ReservationList));
691   e->next = reservations;
692   reservations = e;
693   e->client = client;
694   e->amount = amount;
695   e->entries = entries;
696   e->rid = ++reservation_gen;
697   if (reservation_gen < 0)
698     reservation_gen = 0; /* wrap around */
699   transmit_status (client, e->rid, NULL);
700 }
701
702
703 /**
704  * Handle RELEASE_RESERVE-message.
705  *
706  * @param cls closure
707  * @param client identification of the client
708  * @param message the actual message
709  */
710 static void
711 handle_release_reserve (void *cls,
712                         struct GNUNET_SERVER_Client *client,
713                         const struct GNUNET_MessageHeader *message)
714 {
715   const struct ReleaseReserveMessage *msg = (const struct ReleaseReserveMessage*) message;
716   struct ReservationList *pos;
717   struct ReservationList *prev;
718   struct ReservationList *next;
719   int rid = ntohl(msg->rid);
720   unsigned long long rem;
721
722 #if DEBUG_DATASTORE
723   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
724               "Processing `%s' request\n",
725               "RELEASE_RESERVE");
726 #endif
727   next = reservations;
728   prev = NULL;
729   while (NULL != (pos = next))
730     {
731       next = pos->next;
732       if (rid == pos->rid)
733         {
734           if (prev == NULL)
735             reservations = next;
736           else
737             prev->next = next;
738           rem = pos->amount + ((unsigned long long) GNUNET_DATASTORE_ENTRY_OVERHEAD) * pos->entries;
739           GNUNET_assert (reserved >= rem);
740           reserved -= rem;
741 #if DEBUG_DATASTORE
742           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
743                       "Returning %llu remaining reserved bytes to storage pool\n",
744                       rem);
745 #endif    
746           GNUNET_free (pos);
747           transmit_status (client, GNUNET_OK, NULL);
748           return;
749         }       
750       prev = pos;
751       pos = next;
752     }
753   GNUNET_break (0);
754   transmit_status (client, GNUNET_SYSERR, gettext_noop ("Could not find matching reservation"));
755 }
756
757
758 /**
759  * Check that the given message is a valid data message.
760  *
761  * @return NULL if the message is not well-formed, otherwise the message
762  */
763 static const struct DataMessage *
764 check_data (const struct GNUNET_MessageHeader *message)
765 {
766   uint16_t size;
767   uint32_t dsize;
768   const struct DataMessage *dm;
769
770   size = ntohs(message->size);
771   if (size < sizeof(struct DataMessage))
772     { 
773       GNUNET_break (0);
774       return NULL;
775     }
776   dm = (const struct DataMessage *) message;
777   dsize = ntohl(dm->size);
778   if (size != dsize + sizeof(struct DataMessage))
779     {
780       GNUNET_break (0);
781       return NULL;
782     }
783   return dm;
784 }
785
786
787 /**
788  * Handle PUT-message.
789  *
790  * @param cls closure
791  * @param client identification of the client
792  * @param message the actual message
793  */
794 static void
795 handle_put (void *cls,
796             struct GNUNET_SERVER_Client *client,
797             const struct GNUNET_MessageHeader *message)
798 {
799   const struct DataMessage *dm = check_data (message);
800   char *msg;
801   int ret;
802   int rid;
803   struct ReservationList *pos;
804   uint32_t size;
805
806 #if DEBUG_DATASTORE
807   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
808               "Processing `%s' request\n",
809               "PUT");
810 #endif
811   if (ntohl(dm->type) == 0) 
812     {
813       GNUNET_break (0);
814       dm = NULL;
815     }
816   if (dm == NULL)
817     {
818       GNUNET_break (0);
819       GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
820       return;
821     }
822   rid = ntohl(dm->rid);
823   size = ntohl(dm->size);
824   if (rid > 0)
825     {
826       pos = reservations;
827       while ( (NULL != pos) &&
828               (rid != pos->rid) )
829         pos = pos->next;
830       GNUNET_break (pos != NULL);
831       if (NULL != pos)
832         {
833           GNUNET_break (pos->entries > 0);
834           GNUNET_break (pos->amount > size);
835           pos->entries--;
836           pos->amount -= size;
837           reserved -= (size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
838         }
839     }
840   msg = NULL;
841   ret = plugin->api->put (plugin->api->cls,
842                           &dm->key,
843                           size,
844                           &dm[1],
845                           ntohl(dm->type),
846                           ntohl(dm->priority),
847                           ntohl(dm->anonymity),
848                           GNUNET_TIME_absolute_ntoh(dm->expiration),
849                           &msg);
850   if (GNUNET_OK == ret)
851     GNUNET_CONTAINER_bloomfilter_add (filter,
852                                       &dm->key);
853   transmit_status (client, 
854                    (GNUNET_SYSERR == ret) ? GNUNET_SYSERR : GNUNET_OK, 
855                    msg);
856   GNUNET_free_non_null (msg);
857   if (quota - reserved - cache_size < plugin->api->get_size (plugin->api->cls))
858     manage_space (size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
859 }
860
861
862 /**
863  * Handle GET-message.
864  *
865  * @param cls closure
866  * @param client identification of the client
867  * @param message the actual message
868  */
869 static void
870 handle_get (void *cls,
871              struct GNUNET_SERVER_Client *client,
872              const struct GNUNET_MessageHeader *message)
873 {
874   const struct GetMessage *msg;
875   uint16_t size;
876
877 #if DEBUG_DATASTORE
878   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
879               "Processing `%s' request\n",
880               "GET");
881 #endif
882   size = ntohs(message->size);
883   if ( (size != sizeof(struct GetMessage)) &&
884        (size != sizeof(struct GetMessage) - sizeof(GNUNET_HashCode)) )
885     {
886       GNUNET_break (0);
887       GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
888       return;
889     }
890   msg = (const struct GetMessage*) message;
891   if ( (size == sizeof(struct GetMessage)) &&
892        (GNUNET_YES != GNUNET_CONTAINER_bloomfilter_test (filter,
893                                                          &msg->key)) )
894     {
895       /* don't bother database... */
896 #if DEBUG_DATASTORE
897       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
898                   "Empty result set for `%s' request.\n",
899                   "GET");
900 #endif  
901       transmit_item (client,
902                      NULL, NULL, 0, NULL, 0, 0, 0, 
903                      GNUNET_TIME_UNIT_ZERO_ABS, 0);
904       return;
905     }
906   GNUNET_SERVER_client_keep (client);
907   plugin->api->get (plugin->api->cls,
908                     ((size == sizeof(struct GetMessage)) ? &msg->key : NULL),
909                     NULL,
910                     ntohl(msg->type),
911                     &transmit_item,
912                     client);    
913 }
914
915
916 /**
917  * Handle UPDATE-message.
918  *
919  * @param cls closure
920  * @param client identification of the client
921  * @param message the actual message
922  */
923 static void
924 handle_update (void *cls,
925                struct GNUNET_SERVER_Client *client,
926                const struct GNUNET_MessageHeader *message)
927 {
928   const struct UpdateMessage *msg;
929   int ret;
930   char *emsg;
931
932 #if DEBUG_DATASTORE
933   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
934               "Processing `%s' request\n",
935               "UPDATE");
936 #endif
937   msg = (const struct UpdateMessage*) message;
938   emsg = NULL;
939   ret = plugin->api->update (plugin->api->cls,
940                              GNUNET_ntohll(msg->uid),
941                              (int32_t) ntohl(msg->priority),
942                              GNUNET_TIME_absolute_ntoh(msg->expiration),
943                              &emsg);
944   transmit_status (client, ret, emsg);
945   GNUNET_free_non_null (emsg);
946 }
947
948
949 /**
950  * Handle GET_RANDOM-message.
951  *
952  * @param cls closure
953  * @param client identification of the client
954  * @param message the actual message
955  */
956 static void
957 handle_get_random (void *cls,
958                    struct GNUNET_SERVER_Client *client,
959                    const struct GNUNET_MessageHeader *message)
960 {
961 #if DEBUG_DATASTORE
962   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
963               "Processing `%s' request\n",
964               "GET_RANDOM");
965 #endif
966   GNUNET_SERVER_client_keep (client);
967   plugin->api->iter_migration_order (plugin->api->cls,
968                                      0,
969                                      &transmit_item,
970                                      client);  
971 }
972
973
974 /**
975  * Context for the 'remove_callback'.
976  */
977 struct RemoveContext 
978 {
979   /**
980    * Client for whom we're doing the remvoing.
981    */
982   struct GNUNET_SERVER_Client *client;
983
984   /**
985    * GNUNET_YES if we managed to remove something.
986    */
987   int found;
988 };
989
990
991 /**
992  * Callback function that will cause the item that is passed
993  * in to be deleted (by returning GNUNET_NO).
994  */
995 static int
996 remove_callback (void *cls,
997                  void *next_cls,
998                  const GNUNET_HashCode * key,
999                  uint32_t size,
1000                  const void *data,
1001                  uint32_t type,
1002                  uint32_t priority,
1003                  uint32_t anonymity,
1004                  struct GNUNET_TIME_Absolute
1005                  expiration, uint64_t uid)
1006 {
1007   struct RemoveContext *rc = cls;
1008
1009   if (key == NULL)
1010     {
1011 #if DEBUG_DATASTORE
1012       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1013                   "No further matches for `%s' request.\n",
1014                   "REMOVE");
1015 #endif  
1016       if (GNUNET_YES == rc->found)
1017         transmit_status (rc->client, GNUNET_OK, NULL);       
1018       else
1019         transmit_status (rc->client, GNUNET_NO, _("Content not found"));        
1020       GNUNET_SERVER_client_drop (rc->client);
1021       GNUNET_free (rc);
1022       return GNUNET_OK; /* last item */
1023     }
1024   rc->found = GNUNET_YES;
1025 #if DEBUG_DATASTORE
1026   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1027               "Item %llu matches `%s' request.\n",
1028               (unsigned long long) uid,
1029               "REMOVE");
1030 #endif  
1031   GNUNET_CONTAINER_bloomfilter_remove (filter,
1032                                        key);
1033   plugin->api->next_request (next_cls, GNUNET_YES);
1034   return GNUNET_NO;
1035 }
1036
1037
1038 /**
1039  * Handle REMOVE-message.
1040  *
1041  * @param cls closure
1042  * @param client identification of the client
1043  * @param message the actual message
1044  */
1045 static void
1046 handle_remove (void *cls,
1047              struct GNUNET_SERVER_Client *client,
1048              const struct GNUNET_MessageHeader *message)
1049 {
1050   const struct DataMessage *dm = check_data (message);
1051   GNUNET_HashCode vhash;
1052   struct RemoveContext *rc;
1053
1054 #if DEBUG_DATASTORE
1055   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1056               "Processing `%s' request\n",
1057               "REMOVE");
1058 #endif
1059   if (dm == NULL)
1060     {
1061       GNUNET_break (0);
1062       GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1063       return;
1064     }
1065   rc = GNUNET_malloc (sizeof(struct RemoveContext));
1066   GNUNET_SERVER_client_keep (client);
1067   rc->client = client;
1068   GNUNET_CRYPTO_hash (&dm[1],
1069                       ntohl(dm->size),
1070                       &vhash);
1071   GNUNET_SERVER_client_keep (client);
1072   plugin->api->get (plugin->api->cls,
1073                     &dm->key,
1074                     &vhash,
1075                     ntohl(dm->type),
1076                     &remove_callback,
1077                     rc);
1078 }
1079
1080
1081 /**
1082  * Handle DROP-message.
1083  *
1084  * @param cls closure
1085  * @param client identification of the client
1086  * @param message the actual message
1087  */
1088 static void
1089 handle_drop (void *cls,
1090              struct GNUNET_SERVER_Client *client,
1091              const struct GNUNET_MessageHeader *message)
1092 {
1093 #if DEBUG_DATASTORE
1094   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1095               "Processing `%s' request\n",
1096               "DROP");
1097 #endif
1098   plugin->api->drop (plugin->api->cls);
1099   GNUNET_SERVER_receive_done (client, GNUNET_OK);
1100 }
1101
1102
1103 /**
1104  * List of handlers for the messages understood by this
1105  * service.
1106  */
1107 static struct GNUNET_SERVER_MessageHandler handlers[] = {
1108   {&handle_reserve, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_RESERVE, 
1109    sizeof(struct ReserveMessage) }, 
1110   {&handle_release_reserve, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_RELEASE_RESERVE, 
1111    sizeof(struct ReleaseReserveMessage) }, 
1112   {&handle_put, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_PUT, 0 }, 
1113   {&handle_update, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_UPDATE, 
1114    sizeof (struct UpdateMessage) }, 
1115   {&handle_get, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_GET, 0 }, 
1116   {&handle_get_random, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_GET_RANDOM, 
1117    sizeof(struct GNUNET_MessageHeader) }, 
1118   {&handle_remove, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE, 0 }, 
1119   {&handle_drop, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_DROP, 
1120    sizeof(struct GNUNET_MessageHeader) }, 
1121   {NULL, NULL, 0, 0}
1122 };
1123
1124
1125
1126 /**
1127  * Load the datastore plugin.
1128  */
1129 static struct DatastorePlugin *
1130 load_plugin () 
1131 {
1132   struct DatastorePlugin *ret;
1133   char *libname;
1134   char *name;
1135
1136   if (GNUNET_OK !=
1137       GNUNET_CONFIGURATION_get_value_string (cfg,
1138                                              "DATASTORE", "DATABASE", &name))
1139     {
1140       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1141                   _("No `%s' specified for `%s' in configuration!\n"),
1142                   "DATABASE",
1143                   "DATASTORE");
1144       return NULL;
1145     }
1146   ret = GNUNET_malloc (sizeof(struct DatastorePlugin));
1147   ret->env.cfg = cfg;
1148   ret->env.sched = sched;  
1149   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1150               _("Loading `%s' datastore plugin\n"), name);
1151   GNUNET_asprintf (&libname, "libgnunet_plugin_datastore_%s", name);
1152   ret->short_name = name;
1153   ret->lib_name = libname;
1154   ret->api = GNUNET_PLUGIN_load (libname, &ret->env);
1155   if (ret->api == NULL)
1156     {
1157       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1158                   _("Failed to load datastore plugin for `%s'\n"), name);
1159       GNUNET_free (ret->short_name);
1160       GNUNET_free (libname);
1161       GNUNET_free (ret);
1162       return NULL;
1163     }
1164   return ret;
1165 }
1166
1167
1168 /**
1169  * Function called when the service shuts
1170  * down.  Unloads our datastore plugin.
1171  *
1172  * @param plug plugin to unload
1173  */
1174 static void
1175 unload_plugin (struct DatastorePlugin *plug)
1176 {
1177 #if DEBUG_DATASTORE
1178   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1179               "Datastore service is unloading plugin...\n");
1180 #endif
1181   GNUNET_break (NULL == GNUNET_PLUGIN_unload (plug->lib_name, plug->api));
1182   GNUNET_free (plug->lib_name);
1183   GNUNET_free (plug->short_name);
1184   GNUNET_free (plug);
1185 }
1186
1187
1188 /**
1189  * Last task run during shutdown.  Disconnects us from
1190  * the transport and core.
1191  */
1192 static void
1193 cleaning_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1194 {
1195   unload_plugin (plugin);
1196   plugin = NULL;
1197 }
1198
1199
1200 /**
1201  * Function that removes all active reservations made
1202  * by the given client and releases the space for other
1203  * requests.
1204  *
1205  * @param cls closure
1206  * @param client identification of the client
1207  */
1208 static void
1209 cleanup_reservations (void *cls,
1210                       struct GNUNET_SERVER_Client
1211                       * client)
1212 {
1213   struct ReservationList *pos;
1214   struct ReservationList *prev;
1215   struct ReservationList *next;
1216
1217   prev = NULL;
1218   pos = reservations;
1219   while (NULL != pos)
1220     {
1221       next = pos->next;
1222       if (pos->client == client)
1223         {
1224           if (prev == NULL)
1225             reservations = next;
1226           else
1227             prev->next = next;
1228           reserved -= pos->amount + pos->entries * GNUNET_DATASTORE_ENTRY_OVERHEAD;
1229           GNUNET_free (pos);
1230         }
1231       else
1232         {
1233           prev = pos;
1234         }
1235       pos = next;
1236     }
1237 }
1238
1239
1240 /**
1241  * Process datastore requests.
1242  *
1243  * @param cls closure
1244  * @param s scheduler to use
1245  * @param server the initialized server
1246  * @param c configuration to use
1247  */
1248 static void
1249 run (void *cls,
1250      struct GNUNET_SCHEDULER_Handle *s,
1251      struct GNUNET_SERVER_Handle *server,
1252      const struct GNUNET_CONFIGURATION_Handle *c)
1253 {
1254   char *fn;
1255   unsigned int bf_size;
1256
1257   sched = s;
1258   cfg = c;
1259   if (GNUNET_OK !=
1260       GNUNET_CONFIGURATION_get_value_number (cfg,
1261                                              "DATASTORE", "QUOTA", &quota))
1262     {
1263       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1264                   _("No `%s' specified for `%s' in configuration!\n"),
1265                   "QUOTA",
1266                   "DATASTORE");
1267       return;
1268     }
1269   cache_size = quota / 8; /* Or should we make this an option? */
1270   bf_size = quota / 32; /* 8 bit per entry, 1 bit per 32 kb in DB */
1271   fn = NULL;
1272   if ( (GNUNET_OK !=
1273         GNUNET_CONFIGURATION_get_value_filename (cfg,
1274                                                  "DATASTORE",
1275                                                  "BLOOMFILTER",
1276                                                  &fn)) ||
1277        (GNUNET_OK !=
1278         GNUNET_DISK_directory_create_for_file (fn)) )
1279     {
1280       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1281                   _("Could not use specified filename `%s' for bloomfilter.\n"),
1282                   fn != NULL ? fn : "");
1283       GNUNET_free_non_null (fn);
1284       fn = NULL;
1285     }
1286   filter = GNUNET_CONTAINER_bloomfilter_load (fn, bf_size, 5);  /* approx. 3% false positives at max use */  
1287   GNUNET_free_non_null (fn);
1288   if (filter == NULL)
1289     {
1290       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1291                   _("Failed to initialize bloomfilter.\n"));
1292       return;
1293     }
1294   plugin = load_plugin ();
1295   if (NULL == plugin)
1296     {
1297       GNUNET_CONTAINER_bloomfilter_free (filter);
1298       return;
1299     }
1300   GNUNET_SERVER_disconnect_notify (server, &cleanup_reservations, NULL);
1301   GNUNET_SERVER_add_handlers (server, handlers);
1302   expired_kill_task
1303     = GNUNET_SCHEDULER_add_delayed (sched,
1304                                     GNUNET_NO,
1305                                     GNUNET_SCHEDULER_PRIORITY_IDLE,
1306                                     GNUNET_SCHEDULER_NO_TASK,
1307                                     GNUNET_TIME_UNIT_ZERO,
1308                                     &delete_expired, NULL);
1309   GNUNET_SCHEDULER_add_delayed (sched,
1310                                 GNUNET_YES,
1311                                 GNUNET_SCHEDULER_PRIORITY_IDLE,
1312                                 GNUNET_SCHEDULER_NO_TASK,
1313                                 GNUNET_TIME_UNIT_FOREVER_REL,
1314                                 &cleaning_task, NULL);
1315   
1316 }
1317
1318
1319 /**
1320  * The main function for the datastore service.
1321  *
1322  * @param argc number of arguments from the command line
1323  * @param argv command line arguments
1324  * @return 0 ok, 1 on error
1325  */
1326 int
1327 main (int argc, char *const *argv)
1328 {
1329   int ret;
1330
1331   ret = (GNUNET_OK ==
1332          GNUNET_SERVICE_run (argc,
1333                              argv,
1334                              "datastore", &run, NULL, NULL, NULL)) ? 0 : 1;
1335   return ret;
1336 }
1337
1338
1339 /* end of gnunet-service-datastore.c */