fix
[oweals/gnunet.git] / src / datastore / gnunet-service-datastore.c
1 /*
2      This file is part of GNUnet
3      (C) 2004, 2005, 2006, 2007, 2009 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 2, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file datastore/gnunet-service-datastore.c
23  * @brief Management for the datastore for files stored on a GNUnet node
24  * @author Christian Grothoff
25  */
26
27 #include "platform.h"
28 #include "gnunet_util_lib.h"
29 #include "gnunet_arm_service.h"
30 #include "gnunet_protocols.h"
31 #include "plugin_datastore.h"
32 #include "datastore.h"
33
34 /**
35  * How many messages do we queue at most per client?
36  */
37 #define MAX_PENDING 1024
38
39 /**
40  * How long are we at most keeping "expired" content
41  * past the expiration date in the database?
42  */
43 #define MAX_EXPIRE_DELAY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 15)
44
45
46
47 /**
48  * Our datastore plugin.
49  */
50 struct DatastorePlugin
51 {
52
53   /**
54    * API of the transport as returned by the plugin's
55    * initialization function.
56    */
57   struct GNUNET_DATASTORE_PluginFunctions *api;
58
59   /**
60    * Short name for the plugin (i.e. "sqlite").
61    */
62   char *short_name;
63
64   /**
65    * Name of the library (i.e. "gnunet_plugin_datastore_sqlite").
66    */
67   char *lib_name;
68
69   /**
70    * Environment this transport service is using
71    * for this plugin.
72    */
73   struct GNUNET_DATASTORE_PluginEnvironment env;
74
75 };
76
77
78 /**
79  * Linked list of active reservations.
80  */
81 struct ReservationList 
82 {
83
84   /**
85    * This is a linked list.
86    */
87   struct ReservationList *next;
88
89   /**
90    * Client that made the reservation.
91    */
92   struct GNUNET_SERVER_Client *client;
93
94   /**
95    * Number of bytes (still) reserved.
96    */
97   uint64_t amount;
98
99   /**
100    * Number of items (still) reserved.
101    */
102   uint64_t entries;
103
104   /**
105    * Reservation identifier.
106    */
107   int32_t rid;
108
109 };
110
111
112 /**
113  * Our datastore plugin (NULL if not available).
114  */
115 static struct DatastorePlugin *plugin;
116
117 /**
118  * Linked list of space reservations made by clients.
119  */
120 static struct ReservationList *reservations;
121
122 /**
123  * Bloomfilter to quickly tell if we don't have the content.
124  */
125 static struct GNUNET_CONTAINER_BloomFilter *filter;
126
127 /**
128  * Static counter to produce reservation identifiers.
129  */
130 static int reservation_gen;
131
132 /**
133  * How much space are we allowed to use?
134  */
135 static unsigned long long quota;
136
137 /**
138  * How much space are we using for the cache?  (space available for
139  * insertions that will be instantly reclaimed by discarding less
140  * important content --- or possibly whatever we just inserted into
141  * the "cache").
142  */
143 static unsigned long long cache_size;
144
145 /**
146  * How much space have we currently reserved?
147  */
148 static unsigned long long reserved;
149
150 /**
151  * Identity of the task that is used to delete
152  * expired content.
153  */
154 static GNUNET_SCHEDULER_TaskIdentifier expired_kill_task;
155
156 /**
157  * Our configuration.
158  */
159 const struct GNUNET_CONFIGURATION_Handle *cfg;
160
161 /**
162  * Our scheduler.
163  */
164 struct GNUNET_SCHEDULER_Handle *sched; 
165
166 /**
167  * Function called once the transmit operation has
168  * either failed or succeeded.
169  *
170  * @param cls closure
171  * @param status GNUNET_OK on success, GNUNET_SYSERR on error
172  */
173 typedef void (*TransmitContinuation)(void *cls,
174                                      int status);
175
176
177 struct TransmitCallbackContext 
178 {
179   /**
180    * The message that we're asked to transmit.
181    */
182   struct GNUNET_MessageHeader *msg;
183
184   /**
185    * Client that we are transmitting to.
186    */
187   struct GNUNET_SERVER_Client *client;
188
189   /**
190    * Function to call once msg has been transmitted
191    * (or at least added to the buffer).
192    */
193   TransmitContinuation tc;
194
195   /**
196    * Closure for tc.
197    */
198   void *tc_cls;
199
200   /**
201    * GNUNET_YES if we are supposed to signal the server
202    * completion of the client's request.
203    */
204   int end;
205 };
206
207
208 /**
209  * Task that is used to remove expired entries from
210  * the datastore.  This task will schedule itself
211  * again automatically to always delete all expired
212  * content quickly.
213  *
214  * @param cls not used
215  * @param tc task context
216  */ 
217 static void
218 delete_expired (void *cls,
219                 const struct GNUNET_SCHEDULER_TaskContext *tc);
220
221
222 /**
223  * Iterate over the expired items stored in the datastore.
224  * Delete all expired items; once we have processed all
225  * expired items, re-schedule the "delete_expired" task.
226  *
227  * @param cls not used
228  * @param next_cls closure to pass to the "next" function.
229  * @param key key for the content
230  * @param size number of bytes in data
231  * @param data content stored
232  * @param type type of the content
233  * @param priority priority of the content
234  * @param anonymity anonymity-level for the content
235  * @param expiration expiration time for the content
236  * @param uid unique identifier for the datum;
237  *        maybe 0 if no unique identifier is available
238  *
239  * @return GNUNET_SYSERR to abort the iteration, GNUNET_OK to continue
240  *         (continue on call to "next", of course),
241  *         GNUNET_NO to delete the item and continue (if supported)
242  */
243 static int 
244 expired_processor (void *cls,
245                    void *next_cls,
246                    const GNUNET_HashCode * key,
247                    uint32_t size,
248                    const void *data,
249                    uint32_t type,
250                    uint32_t priority,
251                    uint32_t anonymity,
252                    struct GNUNET_TIME_Absolute
253                    expiration, 
254                    uint64_t uid)
255 {
256   struct GNUNET_TIME_Absolute now;
257
258   expired_kill_task = GNUNET_SCHEDULER_NO_TASK;
259   if (key == NULL) 
260     {
261       expired_kill_task 
262         = GNUNET_SCHEDULER_add_delayed (sched,
263                                         MAX_EXPIRE_DELAY,
264                                         &delete_expired,
265                                         NULL);
266       return GNUNET_SYSERR;
267     }
268   now = GNUNET_TIME_absolute_get ();
269   if (expiration.value > now.value)
270     {
271       /* finished processing */
272       plugin->api->next_request (next_cls, GNUNET_YES);
273       return GNUNET_SYSERR;
274     }
275   plugin->api->next_request (next_cls, GNUNET_NO);
276 #if DEBUG_DATASTORE
277   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
278               "Deleting content that expired %llu ms ago\n",
279               (unsigned long long) (now.value - expiration.value));
280 #endif
281   GNUNET_CONTAINER_bloomfilter_remove (filter,
282                                        key);
283   return GNUNET_NO; /* delete */
284 }
285
286
287 /**
288  * Task that is used to remove expired entries from
289  * the datastore.  This task will schedule itself
290  * again automatically to always delete all expired
291  * content quickly.
292  *
293  * @param cls not used
294  * @param tc task context
295  */ 
296 static void
297 delete_expired (void *cls,
298                 const struct GNUNET_SCHEDULER_TaskContext *tc)
299 {
300   plugin->api->iter_ascending_expiration (plugin->api->cls, 
301                                           0,
302                                           &expired_processor,
303                                           NULL);
304 }
305
306
307 /**
308  * An iterator over a set of items stored in the datastore.
309  *
310  * @param cls closure
311  * @param next_cls closure to pass to the "next" function.
312  * @param key key for the content
313  * @param size number of bytes in data
314  * @param data content stored
315  * @param type type of the content
316  * @param priority priority of the content
317  * @param anonymity anonymity-level for the content
318  * @param expiration expiration time for the content
319  * @param uid unique identifier for the datum;
320  *        maybe 0 if no unique identifier is available
321  *
322  * @return GNUNET_SYSERR to abort the iteration, GNUNET_OK to continue
323  *         (continue on call to "next", of course),
324  *         GNUNET_NO to delete the item and continue (if supported)
325  */
326 static int 
327 manage (void *cls,
328         void *next_cls,
329         const GNUNET_HashCode * key,
330         uint32_t size,
331         const void *data,
332         uint32_t type,
333         uint32_t priority,
334         uint32_t anonymity,
335         struct GNUNET_TIME_Absolute
336         expiration, 
337         uint64_t uid)
338 {
339   unsigned long long *need = cls;
340
341   if (NULL == key)
342     {
343       GNUNET_free (need);
344       return GNUNET_SYSERR;
345     }
346   if (size + GNUNET_DATASTORE_ENTRY_OVERHEAD > *need)
347     *need = 0;
348   else
349     *need -= size + GNUNET_DATASTORE_ENTRY_OVERHEAD;
350   plugin->api->next_request (next_cls, 
351                              (0 == *need) ? GNUNET_YES : GNUNET_NO);
352 #if DEBUG_DATASTORE
353   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
354               "Deleting %llu bytes of low-priority content (still trying to free another %llu bytes)\n",
355               size + GNUNET_DATASTORE_ENTRY_OVERHEAD,
356               *need);
357 #endif
358   GNUNET_CONTAINER_bloomfilter_remove (filter,
359                                        key);
360   return GNUNET_NO;
361 }
362
363
364 /**
365  * Manage available disk space by running tasks
366  * that will discard content if necessary.  This
367  * function will be run whenever a request for
368  * "need" bytes of storage could only be satisfied
369  * by eating into the "cache" (and we want our cache
370  * space back).
371  *
372  * @param need number of bytes of content that were
373  *        placed into the "cache" (and hence the
374  *        number of bytes that should be removed).
375  */
376 static void
377 manage_space (unsigned long long need)
378 {
379   unsigned long long *n;
380
381 #if DEBUG_DATASTORE
382   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
383               "Asked to free up %llu bytes of cache space\n",
384               need);
385 #endif
386   n = GNUNET_malloc (sizeof(unsigned long long));
387   *n = need;
388   plugin->api->iter_low_priority (plugin->api->cls,
389                                   0,
390                                   &manage,
391                                   n);
392 }
393
394
395 /**
396  * Function called to notify a client about the socket
397  * begin ready to queue more data.  "buf" will be
398  * NULL and "size" zero if the socket was closed for
399  * writing in the meantime.
400  *
401  * @param cls closure
402  * @param size number of bytes available in buf
403  * @param buf where the callee should write the message
404  * @return number of bytes written to buf
405  */
406 static size_t
407 transmit_callback (void *cls,
408                    size_t size, void *buf)
409 {
410   struct TransmitCallbackContext *tcc = cls;
411   size_t msize;
412   
413   msize = ntohs(tcc->msg->size);
414   if (size == 0)
415     {
416 #if DEBUG_DATASTORE
417       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
418                   "Transmission failed.\n");
419 #endif
420       if (tcc->tc != NULL)
421         tcc->tc (tcc->tc_cls, GNUNET_SYSERR);
422       if (GNUNET_YES == tcc->end)
423         {
424           GNUNET_SERVER_receive_done (tcc->client, GNUNET_SYSERR);
425         }
426       GNUNET_free (tcc->msg);
427       GNUNET_free (tcc);
428       return 0;
429     }
430   GNUNET_assert (size >= msize);
431   memcpy (buf, tcc->msg, msize);
432   if (tcc->tc != NULL)
433     tcc->tc (tcc->tc_cls, GNUNET_OK);
434   if (GNUNET_YES == tcc->end)
435     {
436       GNUNET_SERVER_receive_done (tcc->client, GNUNET_OK);
437     }
438   else
439     {
440 #if DEBUG_DATASTORE
441       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
442                   "Response transmitted, more pending!\n");
443 #endif
444     }
445   GNUNET_free (tcc->msg);
446   GNUNET_free (tcc);
447   return msize;
448 }
449
450
451 /**
452  * Transmit the given message to the client.
453  *
454  * @param client target of the message
455  * @param msg message to transmit, will be freed!
456  * @param tc function to call afterwards
457  * @param tc_cls closure for tc
458  * @param end is this the last response (and we should
459  *        signal the server completion accodingly after
460  *        transmitting this message)?
461  */
462 static void
463 transmit (struct GNUNET_SERVER_Client *client,
464           struct GNUNET_MessageHeader *msg,
465           TransmitContinuation tc,
466           void *tc_cls,
467           int end)
468 {
469   struct TransmitCallbackContext *tcc;
470
471   tcc = GNUNET_malloc (sizeof(struct TransmitCallbackContext));
472   tcc->msg = msg;
473   tcc->client = client;
474   tcc->tc = tc;
475   tcc->tc_cls = tc_cls;
476   tcc->end = end;
477
478   if (NULL ==
479       GNUNET_SERVER_notify_transmit_ready (client,
480                                            ntohs(msg->size),
481                                            GNUNET_TIME_UNIT_FOREVER_REL,
482                                            &transmit_callback,
483                                            tcc))
484     {
485       GNUNET_break (0);
486       if (GNUNET_YES == end)
487         {
488 #if DEBUG_DATASTORE
489           GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
490                       "Disconnecting client.\n");
491 #endif    
492           GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
493         }
494       if (NULL != tc)
495         tc (tc_cls, GNUNET_SYSERR);
496       GNUNET_free (msg);
497       GNUNET_free (tcc);
498     }
499 }
500
501
502 /**
503  * Transmit a status code to the client.
504  *
505  * @param client receiver of the response
506  * @param code status code
507  * @param msg optional error message (can be NULL)
508  */
509 static void
510 transmit_status (struct GNUNET_SERVER_Client *client,
511                  int code,
512                  const char *msg)
513 {
514   struct StatusMessage *sm;
515   size_t slen;
516
517 #if DEBUG_DATASTORE
518   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
519               "Transmitting `%s' message with value %d and message `%s'\n",
520               "STATUS",
521               code,
522               msg != NULL ? msg : "(none)");
523 #endif
524   slen = (msg == NULL) ? 0 : strlen(msg) + 1;  
525   sm = GNUNET_malloc (sizeof(struct StatusMessage) + slen);
526   sm->header.size = htons(sizeof(struct StatusMessage) + slen);
527   sm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_STATUS);
528   sm->status = htonl(code);
529   if (slen > 0)
530     memcpy (&sm[1], msg, slen);  
531   transmit (client, &sm->header, NULL, NULL, GNUNET_YES);
532 }
533
534
535 /**
536  * Function called once the transmit operation has
537  * either failed or succeeded.
538  *
539  * @param next_cls closure for calling "next_request" callback
540  * @param status GNUNET_OK on success, GNUNET_SYSERR on error
541  */
542 static void 
543 get_next(void *next_cls,
544          int status)
545 {
546   if (status != GNUNET_OK)
547     {
548       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
549                   _("Failed to transmit an item to the client; aborting iteration.\n"));
550       if (plugin != NULL)
551         plugin->api->next_request (next_cls, GNUNET_YES);
552       return;
553     }
554   plugin->api->next_request (next_cls, GNUNET_NO);
555 }
556
557
558 /**
559  * Function that will transmit the given datastore entry
560  * to the client.
561  *
562  * @param cls closure, pointer to the client (of type GNUNET_SERVER_Client).
563  * @param next_cls closure to use to ask for the next item
564  * @param key key for the content
565  * @param size number of bytes in data
566  * @param data content stored
567  * @param type type of the content
568  * @param priority priority of the content
569  * @param anonymity anonymity-level for the content
570  * @param expiration expiration time for the content
571  * @param uid unique identifier for the datum;
572  *        maybe 0 if no unique identifier is available
573  *
574  * @return GNUNET_SYSERR to abort the iteration, GNUNET_OK to continue,
575  *         GNUNET_NO to delete the item and continue (if supported)
576  */
577 static int
578 transmit_item (void *cls,
579                void *next_cls,
580                const GNUNET_HashCode * key,
581                uint32_t size,
582                const void *data,
583                uint32_t type,
584                uint32_t priority,
585                uint32_t anonymity,
586                struct GNUNET_TIME_Absolute
587                expiration, uint64_t uid)
588 {
589   struct GNUNET_SERVER_Client *client = cls;
590   struct GNUNET_MessageHeader *end;
591   struct DataMessage *dm;
592
593   if (key == NULL)
594     {
595       /* transmit 'DATA_END' */
596 #if DEBUG_DATASTORE
597       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
598                   "Transmitting `%s' message\n",
599                   "DATA_END");
600 #endif
601       end = GNUNET_malloc (sizeof(struct GNUNET_MessageHeader));
602       end->size = htons(sizeof(struct GNUNET_MessageHeader));
603       end->type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END);
604       transmit (client, end, NULL, NULL, GNUNET_YES);
605       GNUNET_SERVER_client_drop (client);
606       return GNUNET_OK;
607     }
608   dm = GNUNET_malloc (sizeof(struct DataMessage) + size);
609   dm->header.size = htons(sizeof(struct DataMessage) + size);
610   dm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_DATA);
611   dm->rid = htonl(0);
612   dm->size = htonl(size);
613   dm->type = htonl(type);
614   dm->priority = htonl(priority);
615   dm->anonymity = htonl(anonymity);
616   dm->expiration = GNUNET_TIME_absolute_hton(expiration);
617   dm->uid = GNUNET_htonll(uid);
618   dm->key = *key;
619   memcpy (&dm[1], data, size);
620 #if DEBUG_DATASTORE
621   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
622               "Transmitting `%s' message\n",
623               "DATA");
624 #endif
625   transmit (client, &dm->header, &get_next, next_cls, GNUNET_NO);
626   return GNUNET_OK;
627 }
628
629
630 /**
631  * Handle RESERVE-message.
632  *
633  * @param cls closure
634  * @param client identification of the client
635  * @param message the actual message
636  */
637 static void
638 handle_reserve (void *cls,
639                 struct GNUNET_SERVER_Client *client,
640                 const struct GNUNET_MessageHeader *message)
641 {
642   const struct ReserveMessage *msg = (const struct ReserveMessage*) message;
643   struct ReservationList *e;
644   unsigned long long used;
645   unsigned long long req;
646   uint64_t amount;
647   uint32_t entries;
648
649 #if DEBUG_DATASTORE
650   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
651               "Processing `%s' request\n",
652               "RESERVE");
653 #endif
654   amount = GNUNET_ntohll(msg->amount);
655   entries = ntohl(msg->entries);
656   used = plugin->api->get_size (plugin->api->cls) + reserved;
657   req = amount + ((unsigned long long) GNUNET_DATASTORE_ENTRY_OVERHEAD) * entries;
658   if (used + req > quota)
659     {
660       if (quota < used)
661         used = quota; /* cheat a bit for error message (to avoid negative numbers) */
662       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
663                   _("Insufficient space (%llu bytes are available) to satisfy `%s' request for %llu bytes\n"),
664                   quota - used,
665                   "RESERVE",
666                   req);
667       if (cache_size < req)
668         {
669           /* TODO: document this in the FAQ; essentially, if this
670              message happens, the insertion request could be blocked
671              by less-important content from migration because it is
672              larger than 1/8th of the overall available space, and
673              we only reserve 1/8th for "fresh" insertions */
674           GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
675                       _("The requested amount (%llu bytes) is larger than the cache size (%llu bytes)\n"),
676                       req,
677                       cache_size);
678           transmit_status (client, 0, 
679                            gettext_noop ("Insufficient space to satisfy request and "
680                                          "requested amount is larger than cache size"));
681         }
682       else
683         {
684           transmit_status (client, 0, 
685                            gettext_noop ("Insufficient space to satisfy request"));
686         }
687       return;      
688     }
689   reserved += req;
690   e = GNUNET_malloc (sizeof(struct ReservationList));
691   e->next = reservations;
692   reservations = e;
693   e->client = client;
694   e->amount = amount;
695   e->entries = entries;
696   e->rid = ++reservation_gen;
697   if (reservation_gen < 0)
698     reservation_gen = 0; /* wrap around */
699   transmit_status (client, e->rid, NULL);
700 }
701
702
703 /**
704  * Handle RELEASE_RESERVE-message.
705  *
706  * @param cls closure
707  * @param client identification of the client
708  * @param message the actual message
709  */
710 static void
711 handle_release_reserve (void *cls,
712                         struct GNUNET_SERVER_Client *client,
713                         const struct GNUNET_MessageHeader *message)
714 {
715   const struct ReleaseReserveMessage *msg = (const struct ReleaseReserveMessage*) message;
716   struct ReservationList *pos;
717   struct ReservationList *prev;
718   struct ReservationList *next;
719   int rid = ntohl(msg->rid);
720   unsigned long long rem;
721
722 #if DEBUG_DATASTORE
723   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
724               "Processing `%s' request\n",
725               "RELEASE_RESERVE");
726 #endif
727   next = reservations;
728   prev = NULL;
729   while (NULL != (pos = next))
730     {
731       next = pos->next;
732       if (rid == pos->rid)
733         {
734           if (prev == NULL)
735             reservations = next;
736           else
737             prev->next = next;
738           rem = pos->amount + ((unsigned long long) GNUNET_DATASTORE_ENTRY_OVERHEAD) * pos->entries;
739           GNUNET_assert (reserved >= rem);
740           reserved -= rem;
741 #if DEBUG_DATASTORE
742           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
743                       "Returning %llu remaining reserved bytes to storage pool\n",
744                       rem);
745 #endif    
746           GNUNET_free (pos);
747           transmit_status (client, GNUNET_OK, NULL);
748           return;
749         }       
750       prev = pos;
751     }
752   GNUNET_break (0);
753   transmit_status (client, GNUNET_SYSERR, gettext_noop ("Could not find matching reservation"));
754 }
755
756
757 /**
758  * Check that the given message is a valid data message.
759  *
760  * @return NULL if the message is not well-formed, otherwise the message
761  */
762 static const struct DataMessage *
763 check_data (const struct GNUNET_MessageHeader *message)
764 {
765   uint16_t size;
766   uint32_t dsize;
767   const struct DataMessage *dm;
768
769   size = ntohs(message->size);
770   if (size < sizeof(struct DataMessage))
771     { 
772       GNUNET_break (0);
773       return NULL;
774     }
775   dm = (const struct DataMessage *) message;
776   dsize = ntohl(dm->size);
777   if (size != dsize + sizeof(struct DataMessage))
778     {
779       GNUNET_break (0);
780       return NULL;
781     }
782   return dm;
783 }
784
785
786 /**
787  * Handle PUT-message.
788  *
789  * @param cls closure
790  * @param client identification of the client
791  * @param message the actual message
792  */
793 static void
794 handle_put (void *cls,
795             struct GNUNET_SERVER_Client *client,
796             const struct GNUNET_MessageHeader *message)
797 {
798   const struct DataMessage *dm = check_data (message);
799   char *msg;
800   int ret;
801   int rid;
802   struct ReservationList *pos;
803   uint32_t size;
804
805 #if DEBUG_DATASTORE
806   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
807               "Processing `%s' request\n",
808               "PUT");
809 #endif
810   if (ntohl(dm->type) == 0) 
811     {
812       GNUNET_break (0);
813       dm = NULL;
814     }
815   if (dm == NULL)
816     {
817       GNUNET_break (0);
818       GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
819       return;
820     }
821   rid = ntohl(dm->rid);
822   size = ntohl(dm->size);
823   if (rid > 0)
824     {
825       pos = reservations;
826       while ( (NULL != pos) &&
827               (rid != pos->rid) )
828         pos = pos->next;
829       GNUNET_break (pos != NULL);
830       if (NULL != pos)
831         {
832           GNUNET_break (pos->entries > 0);
833           GNUNET_break (pos->amount > size);
834           pos->entries--;
835           pos->amount -= size;
836           reserved -= (size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
837         }
838     }
839   msg = NULL;
840   ret = plugin->api->put (plugin->api->cls,
841                           &dm->key,
842                           size,
843                           &dm[1],
844                           ntohl(dm->type),
845                           ntohl(dm->priority),
846                           ntohl(dm->anonymity),
847                           GNUNET_TIME_absolute_ntoh(dm->expiration),
848                           &msg);
849   if (GNUNET_OK == ret)
850     {
851       GNUNET_CONTAINER_bloomfilter_add (filter,
852                                         &dm->key);
853 #if DEBUG_DATASTORE
854       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
855                   "Successfully stored %u bytes under key `%s'\n",
856                   size,
857                   GNUNET_h2s (&dm->key));
858 #endif
859     }
860   transmit_status (client, 
861                    (GNUNET_SYSERR == ret) ? GNUNET_SYSERR : GNUNET_OK, 
862                    msg);
863   GNUNET_free_non_null (msg);
864   if (quota - reserved - cache_size < plugin->api->get_size (plugin->api->cls))
865     manage_space (size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
866 }
867
868
869 /**
870  * Handle GET-message.
871  *
872  * @param cls closure
873  * @param client identification of the client
874  * @param message the actual message
875  */
876 static void
877 handle_get (void *cls,
878              struct GNUNET_SERVER_Client *client,
879              const struct GNUNET_MessageHeader *message)
880 {
881   const struct GetMessage *msg;
882   uint16_t size;
883
884 #if DEBUG_DATASTORE
885   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
886               "Processing `%s' request\n",
887               "GET");
888 #endif
889   size = ntohs(message->size);
890   if ( (size != sizeof(struct GetMessage)) &&
891        (size != sizeof(struct GetMessage) - sizeof(GNUNET_HashCode)) )
892     {
893       GNUNET_break (0);
894       GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
895       return;
896     }
897   msg = (const struct GetMessage*) message;
898   if ( (size == sizeof(struct GetMessage)) &&
899        (GNUNET_YES != GNUNET_CONTAINER_bloomfilter_test (filter,
900                                                          &msg->key)) )
901     {
902       /* don't bother database... */
903 #if DEBUG_DATASTORE
904       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
905                   "Empty result set for `%s' request for `%s'.\n",
906                   "GET",
907                   GNUNET_h2s (&msg->key));
908 #endif  
909       GNUNET_SERVER_client_keep (client);
910       transmit_item (client,
911                      NULL, NULL, 0, NULL, 0, 0, 0, 
912                      GNUNET_TIME_UNIT_ZERO_ABS, 0);
913       return;
914     }
915   GNUNET_SERVER_client_keep (client);
916   plugin->api->get (plugin->api->cls,
917                     ((size == sizeof(struct GetMessage)) ? &msg->key : NULL),
918                     NULL,
919                     ntohl(msg->type),
920                     &transmit_item,
921                     client);    
922 }
923
924
925 /**
926  * Handle UPDATE-message.
927  *
928  * @param cls closure
929  * @param client identification of the client
930  * @param message the actual message
931  */
932 static void
933 handle_update (void *cls,
934                struct GNUNET_SERVER_Client *client,
935                const struct GNUNET_MessageHeader *message)
936 {
937   const struct UpdateMessage *msg;
938   int ret;
939   char *emsg;
940
941 #if DEBUG_DATASTORE
942   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
943               "Processing `%s' request\n",
944               "UPDATE");
945 #endif
946   msg = (const struct UpdateMessage*) message;
947   emsg = NULL;
948   ret = plugin->api->update (plugin->api->cls,
949                              GNUNET_ntohll(msg->uid),
950                              (int32_t) ntohl(msg->priority),
951                              GNUNET_TIME_absolute_ntoh(msg->expiration),
952                              &emsg);
953   transmit_status (client, ret, emsg);
954   GNUNET_free_non_null (emsg);
955 }
956
957
958 /**
959  * Handle GET_RANDOM-message.
960  *
961  * @param cls closure
962  * @param client identification of the client
963  * @param message the actual message
964  */
965 static void
966 handle_get_random (void *cls,
967                    struct GNUNET_SERVER_Client *client,
968                    const struct GNUNET_MessageHeader *message)
969 {
970 #if DEBUG_DATASTORE
971   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
972               "Processing `%s' request\n",
973               "GET_RANDOM");
974 #endif
975   GNUNET_SERVER_client_keep (client);
976   plugin->api->iter_migration_order (plugin->api->cls,
977                                      0,
978                                      &transmit_item,
979                                      client);  
980 }
981
982
983 /**
984  * Context for the 'remove_callback'.
985  */
986 struct RemoveContext 
987 {
988   /**
989    * Client for whom we're doing the remvoing.
990    */
991   struct GNUNET_SERVER_Client *client;
992
993   /**
994    * GNUNET_YES if we managed to remove something.
995    */
996   int found;
997 };
998
999
1000 /**
1001  * Callback function that will cause the item that is passed
1002  * in to be deleted (by returning GNUNET_NO).
1003  */
1004 static int
1005 remove_callback (void *cls,
1006                  void *next_cls,
1007                  const GNUNET_HashCode * key,
1008                  uint32_t size,
1009                  const void *data,
1010                  uint32_t type,
1011                  uint32_t priority,
1012                  uint32_t anonymity,
1013                  struct GNUNET_TIME_Absolute
1014                  expiration, uint64_t uid)
1015 {
1016   struct RemoveContext *rc = cls;
1017
1018   if (key == NULL)
1019     {
1020 #if DEBUG_DATASTORE
1021       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1022                   "No further matches for `%s' request.\n",
1023                   "REMOVE");
1024 #endif  
1025       if (GNUNET_YES == rc->found)
1026         transmit_status (rc->client, GNUNET_OK, NULL);       
1027       else
1028         transmit_status (rc->client, GNUNET_NO, _("Content not found"));        
1029       GNUNET_SERVER_client_drop (rc->client);
1030       GNUNET_free (rc);
1031       return GNUNET_OK; /* last item */
1032     }
1033   rc->found = GNUNET_YES;
1034 #if DEBUG_DATASTORE
1035   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1036               "Item %llu matches `%s' request.\n",
1037               (unsigned long long) uid,
1038               "REMOVE");
1039 #endif  
1040   GNUNET_CONTAINER_bloomfilter_remove (filter,
1041                                        key);
1042   plugin->api->next_request (next_cls, GNUNET_YES);
1043   return GNUNET_NO;
1044 }
1045
1046
1047 /**
1048  * Handle REMOVE-message.
1049  *
1050  * @param cls closure
1051  * @param client identification of the client
1052  * @param message the actual message
1053  */
1054 static void
1055 handle_remove (void *cls,
1056              struct GNUNET_SERVER_Client *client,
1057              const struct GNUNET_MessageHeader *message)
1058 {
1059   const struct DataMessage *dm = check_data (message);
1060   GNUNET_HashCode vhash;
1061   struct RemoveContext *rc;
1062
1063 #if DEBUG_DATASTORE
1064   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1065               "Processing `%s' request\n",
1066               "REMOVE");
1067 #endif
1068   if (dm == NULL)
1069     {
1070       GNUNET_break (0);
1071       GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1072       return;
1073     }
1074   rc = GNUNET_malloc (sizeof(struct RemoveContext));
1075   GNUNET_SERVER_client_keep (client);
1076   rc->client = client;
1077   GNUNET_CRYPTO_hash (&dm[1],
1078                       ntohl(dm->size),
1079                       &vhash);
1080   plugin->api->get (plugin->api->cls,
1081                     &dm->key,
1082                     &vhash,
1083                     ntohl(dm->type),
1084                     &remove_callback,
1085                     rc);
1086 }
1087
1088
1089 /**
1090  * Handle DROP-message.
1091  *
1092  * @param cls closure
1093  * @param client identification of the client
1094  * @param message the actual message
1095  */
1096 static void
1097 handle_drop (void *cls,
1098              struct GNUNET_SERVER_Client *client,
1099              const struct GNUNET_MessageHeader *message)
1100 {
1101 #if DEBUG_DATASTORE
1102   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1103               "Processing `%s' request\n",
1104               "DROP");
1105 #endif
1106   plugin->api->drop (plugin->api->cls);
1107   GNUNET_SERVER_receive_done (client, GNUNET_OK);
1108 }
1109
1110
1111 /**
1112  * List of handlers for the messages understood by this
1113  * service.
1114  */
1115 static struct GNUNET_SERVER_MessageHandler handlers[] = {
1116   {&handle_reserve, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_RESERVE, 
1117    sizeof(struct ReserveMessage) }, 
1118   {&handle_release_reserve, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_RELEASE_RESERVE, 
1119    sizeof(struct ReleaseReserveMessage) }, 
1120   {&handle_put, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_PUT, 0 }, 
1121   {&handle_update, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_UPDATE, 
1122    sizeof (struct UpdateMessage) }, 
1123   {&handle_get, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_GET, 0 }, 
1124   {&handle_get_random, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_GET_RANDOM, 
1125    sizeof(struct GNUNET_MessageHeader) }, 
1126   {&handle_remove, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE, 0 }, 
1127   {&handle_drop, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_DROP, 
1128    sizeof(struct GNUNET_MessageHeader) }, 
1129   {NULL, NULL, 0, 0}
1130 };
1131
1132
1133
1134 /**
1135  * Load the datastore plugin.
1136  */
1137 static struct DatastorePlugin *
1138 load_plugin () 
1139 {
1140   struct DatastorePlugin *ret;
1141   char *libname;
1142   char *name;
1143
1144   if (GNUNET_OK !=
1145       GNUNET_CONFIGURATION_get_value_string (cfg,
1146                                              "DATASTORE", "DATABASE", &name))
1147     {
1148       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1149                   _("No `%s' specified for `%s' in configuration!\n"),
1150                   "DATABASE",
1151                   "DATASTORE");
1152       return NULL;
1153     }
1154   ret = GNUNET_malloc (sizeof(struct DatastorePlugin));
1155   ret->env.cfg = cfg;
1156   ret->env.sched = sched;  
1157   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1158               _("Loading `%s' datastore plugin\n"), name);
1159   GNUNET_asprintf (&libname, "libgnunet_plugin_datastore_%s", name);
1160   ret->short_name = name;
1161   ret->lib_name = libname;
1162   ret->api = GNUNET_PLUGIN_load (libname, &ret->env);
1163   if (ret->api == NULL)
1164     {
1165       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1166                   _("Failed to load datastore plugin for `%s'\n"), name);
1167       GNUNET_free (ret->short_name);
1168       GNUNET_free (libname);
1169       GNUNET_free (ret);
1170       return NULL;
1171     }
1172   return ret;
1173 }
1174
1175
1176 /**
1177  * Function called when the service shuts
1178  * down.  Unloads our datastore plugin.
1179  *
1180  * @param plug plugin to unload
1181  */
1182 static void
1183 unload_plugin (struct DatastorePlugin *plug)
1184 {
1185 #if DEBUG_DATASTORE
1186   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1187               "Datastore service is unloading plugin...\n");
1188 #endif
1189   GNUNET_break (NULL == GNUNET_PLUGIN_unload (plug->lib_name, plug->api));
1190   GNUNET_free (plug->lib_name);
1191   GNUNET_free (plug->short_name);
1192   GNUNET_free (plug);
1193 }
1194
1195
1196 /**
1197  * Last task run during shutdown.  Disconnects us from
1198  * the transport and core.
1199  */
1200 static void
1201 cleaning_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1202 {
1203   if (expired_kill_task != GNUNET_SCHEDULER_NO_TASK)
1204     {
1205       GNUNET_SCHEDULER_cancel (sched,
1206                                expired_kill_task);
1207       expired_kill_task = GNUNET_SCHEDULER_NO_TASK;
1208     }
1209   unload_plugin (plugin);
1210   plugin = NULL;
1211   if (filter != NULL)
1212     {
1213       GNUNET_CONTAINER_bloomfilter_free (filter);
1214       filter = NULL;
1215     }
1216   GNUNET_ARM_stop_services (cfg, tc->sched, "statistics", NULL);
1217 }
1218
1219
1220 /**
1221  * Function that removes all active reservations made
1222  * by the given client and releases the space for other
1223  * requests.
1224  *
1225  * @param cls closure
1226  * @param client identification of the client
1227  */
1228 static void
1229 cleanup_reservations (void *cls,
1230                       struct GNUNET_SERVER_Client
1231                       * client)
1232 {
1233   struct ReservationList *pos;
1234   struct ReservationList *prev;
1235   struct ReservationList *next;
1236
1237   if (client == NULL)
1238     return;
1239   prev = NULL;
1240   pos = reservations;
1241   while (NULL != pos)
1242     {
1243       next = pos->next;
1244       if (pos->client == client)
1245         {
1246           if (prev == NULL)
1247             reservations = next;
1248           else
1249             prev->next = next;
1250           reserved -= pos->amount + pos->entries * GNUNET_DATASTORE_ENTRY_OVERHEAD;
1251           GNUNET_free (pos);
1252         }
1253       else
1254         {
1255           prev = pos;
1256         }
1257       pos = next;
1258     }
1259 }
1260
1261
1262 /**
1263  * Process datastore requests.
1264  *
1265  * @param cls closure
1266  * @param s scheduler to use
1267  * @param server the initialized server
1268  * @param c configuration to use
1269  */
1270 static void
1271 run (void *cls,
1272      struct GNUNET_SCHEDULER_Handle *s,
1273      struct GNUNET_SERVER_Handle *server,
1274      const struct GNUNET_CONFIGURATION_Handle *c)
1275 {
1276   char *fn;
1277   unsigned int bf_size;
1278
1279   sched = s;
1280   cfg = c;
1281   if (GNUNET_OK !=
1282       GNUNET_CONFIGURATION_get_value_number (cfg,
1283                                              "DATASTORE", "QUOTA", &quota))
1284     {
1285       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1286                   _("No `%s' specified for `%s' in configuration!\n"),
1287                   "QUOTA",
1288                   "DATASTORE");
1289       return;
1290     }
1291   cache_size = quota / 8; /* Or should we make this an option? */
1292   bf_size = quota / 32; /* 8 bit per entry, 1 bit per 32 kb in DB */
1293   fn = NULL;
1294   if ( (GNUNET_OK !=
1295         GNUNET_CONFIGURATION_get_value_filename (cfg,
1296                                                  "DATASTORE",
1297                                                  "BLOOMFILTER",
1298                                                  &fn)) ||
1299        (GNUNET_OK !=
1300         GNUNET_DISK_directory_create_for_file (fn)) )
1301     {
1302       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1303                   _("Could not use specified filename `%s' for bloomfilter.\n"),
1304                   fn != NULL ? fn : "");
1305       GNUNET_free_non_null (fn);
1306       fn = NULL;
1307     }
1308   filter = GNUNET_CONTAINER_bloomfilter_load (fn, bf_size, 5);  /* approx. 3% false positives at max use */  
1309   GNUNET_free_non_null (fn);
1310   if (filter == NULL)
1311     {
1312       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1313                   _("Failed to initialize bloomfilter.\n"));
1314       return;
1315     }
1316   GNUNET_ARM_start_services (cfg, sched, "statistics", NULL);
1317   plugin = load_plugin ();
1318   if (NULL == plugin)
1319     {
1320       GNUNET_CONTAINER_bloomfilter_free (filter);
1321       filter = NULL;
1322       GNUNET_ARM_stop_services (cfg, sched, "statistics", NULL);
1323       return;
1324     }
1325   GNUNET_SERVER_disconnect_notify (server, &cleanup_reservations, NULL);
1326   GNUNET_SERVER_add_handlers (server, handlers);
1327   expired_kill_task
1328     = GNUNET_SCHEDULER_add_with_priority (sched,
1329                                           GNUNET_SCHEDULER_PRIORITY_IDLE,
1330                                           &delete_expired, NULL);
1331   GNUNET_SCHEDULER_add_delayed (sched,
1332                                 GNUNET_TIME_UNIT_FOREVER_REL,
1333                                 &cleaning_task, NULL);
1334   
1335 }
1336
1337
1338 /**
1339  * The main function for the datastore service.
1340  *
1341  * @param argc number of arguments from the command line
1342  * @param argv command line arguments
1343  * @return 0 ok, 1 on error
1344  */
1345 int
1346 main (int argc, char *const *argv)
1347 {
1348   int ret;
1349
1350   ret = (GNUNET_OK ==
1351          GNUNET_SERVICE_run (argc,
1352                              argv,
1353                              "datastore",
1354                              GNUNET_SERVICE_OPTION_NONE,
1355                              &run, NULL)) ? 0 : 1;
1356   return ret;
1357 }
1358
1359
1360 /* end of gnunet-service-datastore.c */