extra debug for datastore, stats api fix
[oweals/gnunet.git] / src / datastore / gnunet-service-datastore.c
1 /*
2      This file is part of GNUnet
3      (C) 2004, 2005, 2006, 2007, 2009 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 2, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file datastore/gnunet-service-datastore.c
23  * @brief Management for the datastore for files stored on a GNUnet node
24  * @author Christian Grothoff
25  */
26
27 #include "platform.h"
28 #include "gnunet_util_lib.h"
29 #include "gnunet_protocols.h"
30 #include "plugin_datastore.h"
31 #include "datastore.h"
32
33 /**
34  * How many messages do we queue at most per client?
35  */
36 #define MAX_PENDING 1024
37
38 /**
39  * How long are we at most keeping "expired" content
40  * past the expiration date in the database?
41  */
42 #define MAX_EXPIRE_DELAY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 15)
43
44
45
46 /**
47  * Our datastore plugin.
48  */
49 struct DatastorePlugin
50 {
51
52   /**
53    * API of the transport as returned by the plugin's
54    * initialization function.
55    */
56   struct GNUNET_DATASTORE_PluginFunctions *api;
57
58   /**
59    * Short name for the plugin (i.e. "sqlite").
60    */
61   char *short_name;
62
63   /**
64    * Name of the library (i.e. "gnunet_plugin_datastore_sqlite").
65    */
66   char *lib_name;
67
68   /**
69    * Environment this transport service is using
70    * for this plugin.
71    */
72   struct GNUNET_DATASTORE_PluginEnvironment env;
73
74 };
75
76
77 /**
78  * Linked list of active reservations.
79  */
80 struct ReservationList 
81 {
82
83   /**
84    * This is a linked list.
85    */
86   struct ReservationList *next;
87
88   /**
89    * Client that made the reservation.
90    */
91   struct GNUNET_SERVER_Client *client;
92
93   /**
94    * Number of bytes (still) reserved.
95    */
96   uint64_t amount;
97
98   /**
99    * Number of items (still) reserved.
100    */
101   uint64_t entries;
102
103   /**
104    * Reservation identifier.
105    */
106   int32_t rid;
107
108 };
109
110
111 /**
112  * Our datastore plugin (NULL if not available).
113  */
114 static struct DatastorePlugin *plugin;
115
116 /**
117  * Linked list of space reservations made by clients.
118  */
119 static struct ReservationList *reservations;
120
121 /**
122  * Bloomfilter to quickly tell if we don't have the content.
123  */
124 static struct GNUNET_CONTAINER_BloomFilter *filter;
125
126 /**
127  * Static counter to produce reservation identifiers.
128  */
129 static int reservation_gen;
130
131 /**
132  * How much space are we allowed to use?
133  */
134 static unsigned long long quota;
135
136 /**
137  * How much space are we using for the cache?  (space available for
138  * insertions that will be instantly reclaimed by discarding less
139  * important content --- or possibly whatever we just inserted into
140  * the "cache").
141  */
142 static unsigned long long cache_size;
143
144 /**
145  * How much space have we currently reserved?
146  */
147 static unsigned long long reserved;
148
149 /**
150  * Identity of the task that is used to delete
151  * expired content.
152  */
153 static GNUNET_SCHEDULER_TaskIdentifier expired_kill_task;
154
155 /**
156  * Our configuration.
157  */
158 const struct GNUNET_CONFIGURATION_Handle *cfg;
159
160 /**
161  * Our scheduler.
162  */
163 struct GNUNET_SCHEDULER_Handle *sched; 
164
165 /**
166  * Function called once the transmit operation has
167  * either failed or succeeded.
168  *
169  * @param cls closure
170  * @param status GNUNET_OK on success, GNUNET_SYSERR on error
171  */
172 typedef void (*TransmitContinuation)(void *cls,
173                                      int status);
174
175
176 struct TransmitCallbackContext 
177 {
178   /**
179    * The message that we're asked to transmit.
180    */
181   struct GNUNET_MessageHeader *msg;
182
183   /**
184    * Client that we are transmitting to.
185    */
186   struct GNUNET_SERVER_Client *client;
187
188   /**
189    * Function to call once msg has been transmitted
190    * (or at least added to the buffer).
191    */
192   TransmitContinuation tc;
193
194   /**
195    * Closure for tc.
196    */
197   void *tc_cls;
198
199   /**
200    * GNUNET_YES if we are supposed to signal the server
201    * completion of the client's request.
202    */
203   int end;
204 };
205
206
207 /**
208  * Task that is used to remove expired entries from
209  * the datastore.  This task will schedule itself
210  * again automatically to always delete all expired
211  * content quickly.
212  *
213  * @param cls not used
214  * @param tc task context
215  */ 
216 static void
217 delete_expired (void *cls,
218                 const struct GNUNET_SCHEDULER_TaskContext *tc);
219
220
221 /**
222  * Iterate over the expired items stored in the datastore.
223  * Delete all expired items; once we have processed all
224  * expired items, re-schedule the "delete_expired" task.
225  *
226  * @param cls not used
227  * @param next_cls closure to pass to the "next" function.
228  * @param key key for the content
229  * @param size number of bytes in data
230  * @param data content stored
231  * @param type type of the content
232  * @param priority priority of the content
233  * @param anonymity anonymity-level for the content
234  * @param expiration expiration time for the content
235  * @param uid unique identifier for the datum;
236  *        maybe 0 if no unique identifier is available
237  *
238  * @return GNUNET_SYSERR to abort the iteration, GNUNET_OK to continue
239  *         (continue on call to "next", of course),
240  *         GNUNET_NO to delete the item and continue (if supported)
241  */
242 static int 
243 expired_processor (void *cls,
244                    void *next_cls,
245                    const GNUNET_HashCode * key,
246                    uint32_t size,
247                    const void *data,
248                    uint32_t type,
249                    uint32_t priority,
250                    uint32_t anonymity,
251                    struct GNUNET_TIME_Absolute
252                    expiration, 
253                    uint64_t uid)
254 {
255   struct GNUNET_TIME_Absolute now;
256
257   expired_kill_task = GNUNET_SCHEDULER_NO_TASK;
258   if (key == NULL) 
259     {
260       expired_kill_task 
261         = GNUNET_SCHEDULER_add_delayed (sched,
262                                         GNUNET_NO,
263                                         GNUNET_SCHEDULER_PRIORITY_IDLE,
264                                         GNUNET_SCHEDULER_NO_TASK,
265                                         MAX_EXPIRE_DELAY,
266                                         &delete_expired,
267                                         NULL);
268       return GNUNET_SYSERR;
269     }
270   now = GNUNET_TIME_absolute_get ();
271   if (expiration.value > now.value)
272     {
273       /* finished processing */
274       plugin->api->next_request (next_cls, GNUNET_YES);
275       return GNUNET_SYSERR;
276     }
277   plugin->api->next_request (next_cls, GNUNET_NO);
278 #if DEBUG_DATASTORE
279   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
280               "Deleting content that expired %llu ms ago\n",
281               (unsigned long long) (now.value - expiration.value));
282 #endif
283   GNUNET_CONTAINER_bloomfilter_remove (filter,
284                                        key);
285   return GNUNET_NO; /* delete */
286 }
287
288
289 /**
290  * Task that is used to remove expired entries from
291  * the datastore.  This task will schedule itself
292  * again automatically to always delete all expired
293  * content quickly.
294  *
295  * @param cls not used
296  * @param tc task context
297  */ 
298 static void
299 delete_expired (void *cls,
300                 const struct GNUNET_SCHEDULER_TaskContext *tc)
301 {
302   plugin->api->iter_ascending_expiration (plugin->api->cls, 
303                                           0,
304                                           &expired_processor,
305                                           NULL);
306 }
307
308
309 /**
310  * An iterator over a set of items stored in the datastore.
311  *
312  * @param cls closure
313  * @param next_cls closure to pass to the "next" function.
314  * @param key key for the content
315  * @param size number of bytes in data
316  * @param data content stored
317  * @param type type of the content
318  * @param priority priority of the content
319  * @param anonymity anonymity-level for the content
320  * @param expiration expiration time for the content
321  * @param uid unique identifier for the datum;
322  *        maybe 0 if no unique identifier is available
323  *
324  * @return GNUNET_SYSERR to abort the iteration, GNUNET_OK to continue
325  *         (continue on call to "next", of course),
326  *         GNUNET_NO to delete the item and continue (if supported)
327  */
328 static int 
329 manage (void *cls,
330         void *next_cls,
331         const GNUNET_HashCode * key,
332         uint32_t size,
333         const void *data,
334         uint32_t type,
335         uint32_t priority,
336         uint32_t anonymity,
337         struct GNUNET_TIME_Absolute
338         expiration, 
339         uint64_t uid)
340 {
341   unsigned long long *need = cls;
342
343   if (NULL == key)
344     {
345       GNUNET_free (need);
346       return GNUNET_SYSERR;
347     }
348   if (size + GNUNET_DATASTORE_ENTRY_OVERHEAD > *need)
349     *need = 0;
350   else
351     *need -= size + GNUNET_DATASTORE_ENTRY_OVERHEAD;
352   plugin->api->next_request (next_cls, 
353                              (0 == *need) ? GNUNET_YES : GNUNET_NO);
354 #if DEBUG_DATASTORE
355   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
356               "Deleting %llu bytes of low-priority content (still trying to free another %llu bytes)\n",
357               size + GNUNET_DATASTORE_ENTRY_OVERHEAD,
358               *need);
359 #endif
360   GNUNET_CONTAINER_bloomfilter_remove (filter,
361                                        key);
362   return GNUNET_NO;
363 }
364
365
366 /**
367  * Manage available disk space by running tasks
368  * that will discard content if necessary.  This
369  * function will be run whenever a request for
370  * "need" bytes of storage could only be satisfied
371  * by eating into the "cache" (and we want our cache
372  * space back).
373  *
374  * @param need number of bytes of content that were
375  *        placed into the "cache" (and hence the
376  *        number of bytes that should be removed).
377  */
378 static void
379 manage_space (unsigned long long need)
380 {
381   unsigned long long *n;
382
383 #if DEBUG_DATASTORE
384   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
385               "Asked to free up %llu bytes of cache space\n",
386               need);
387 #endif
388   n = GNUNET_malloc (sizeof(unsigned long long));
389   *n = need;
390   plugin->api->iter_low_priority (plugin->api->cls,
391                                   0,
392                                   &manage,
393                                   n);
394 }
395
396
397 /**
398  * Function called to notify a client about the socket
399  * begin ready to queue more data.  "buf" will be
400  * NULL and "size" zero if the socket was closed for
401  * writing in the meantime.
402  *
403  * @param cls closure
404  * @param size number of bytes available in buf
405  * @param buf where the callee should write the message
406  * @return number of bytes written to buf
407  */
408 static size_t
409 transmit_callback (void *cls,
410                    size_t size, void *buf)
411 {
412   struct TransmitCallbackContext *tcc = cls;
413   size_t msize;
414   
415   msize = ntohs(tcc->msg->size);
416   if (size == 0)
417     {
418 #if DEBUG_DATASTORE
419       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
420                   "Transmission failed.\n");
421 #endif
422       if (tcc->tc != NULL)
423         tcc->tc (tcc->tc_cls, GNUNET_SYSERR);
424       if (GNUNET_YES == tcc->end)
425         {
426           GNUNET_SERVER_receive_done (tcc->client, GNUNET_SYSERR);
427         }
428       GNUNET_free (tcc->msg);
429       GNUNET_free (tcc);
430       return 0;
431     }
432   GNUNET_assert (size >= msize);
433   memcpy (buf, tcc->msg, msize);
434   if (tcc->tc != NULL)
435     tcc->tc (tcc->tc_cls, GNUNET_OK);
436   if (GNUNET_YES == tcc->end)
437     {
438       GNUNET_SERVER_receive_done (tcc->client, GNUNET_OK);
439     }
440   else
441     {
442 #if DEBUG_DATASTORE
443       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
444                   "Response transmitted, more pending!\n");
445 #endif
446     }
447   GNUNET_free (tcc->msg);
448   GNUNET_free (tcc);
449   return msize;
450 }
451
452
453 /**
454  * Transmit the given message to the client.
455  *
456  * @param client target of the message
457  * @param msg message to transmit, will be freed!
458  * @param tc function to call afterwards
459  * @param tc_cls closure for tc
460  * @param end is this the last response (and we should
461  *        signal the server completion accodingly after
462  *        transmitting this message)?
463  */
464 static void
465 transmit (struct GNUNET_SERVER_Client *client,
466           struct GNUNET_MessageHeader *msg,
467           TransmitContinuation tc,
468           void *tc_cls,
469           int end)
470 {
471   struct TransmitCallbackContext *tcc;
472
473   tcc = GNUNET_malloc (sizeof(struct TransmitCallbackContext));
474   tcc->msg = msg;
475   tcc->client = client;
476   tcc->tc = tc;
477   tcc->tc_cls = tc_cls;
478   tcc->end = end;
479
480   if (NULL ==
481       GNUNET_SERVER_notify_transmit_ready (client,
482                                            ntohs(msg->size),
483                                            GNUNET_TIME_UNIT_FOREVER_REL,
484                                            &transmit_callback,
485                                            tcc))
486     {
487       GNUNET_break (0);
488       if (GNUNET_YES == end)
489         {
490 #if DEBUG_DATASTORE
491           GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
492                       "Disconnecting client.\n");
493 #endif    
494           GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
495         }
496       if (NULL != tc)
497         tc (tc_cls, GNUNET_SYSERR);
498       GNUNET_free (msg);
499       GNUNET_free (tcc);
500     }
501 }
502
503
504 /**
505  * Transmit a status code to the client.
506  *
507  * @param client receiver of the response
508  * @param code status code
509  * @param msg optional error message (can be NULL)
510  */
511 static void
512 transmit_status (struct GNUNET_SERVER_Client *client,
513                  int code,
514                  const char *msg)
515 {
516   struct StatusMessage *sm;
517   size_t slen;
518
519 #if DEBUG_DATASTORE
520   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
521               "Transmitting `%s' message with value %d and message `%s'\n",
522               "STATUS",
523               code,
524               msg != NULL ? msg : "(none)");
525 #endif
526   slen = (msg == NULL) ? 0 : strlen(msg) + 1;  
527   sm = GNUNET_malloc (sizeof(struct StatusMessage) + slen);
528   sm->header.size = htons(sizeof(struct StatusMessage) + slen);
529   sm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_STATUS);
530   sm->status = htonl(code);
531   if (slen > 0)
532     memcpy (&sm[1], msg, slen);  
533   transmit (client, &sm->header, NULL, NULL, GNUNET_YES);
534 }
535
536
537 /**
538  * Function called once the transmit operation has
539  * either failed or succeeded.
540  *
541  * @param next_cls closure for calling "next_request" callback
542  * @param status GNUNET_OK on success, GNUNET_SYSERR on error
543  */
544 static void 
545 get_next(void *next_cls,
546          int status)
547 {
548   if (status != GNUNET_OK)
549     {
550       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
551                   _("Failed to transmit an item to the client; aborting iteration.\n"));    
552       plugin->api->next_request (next_cls, GNUNET_YES);
553       return;
554     }
555   plugin->api->next_request (next_cls, GNUNET_NO);
556 }
557
558
559 /**
560  * Function that will transmit the given datastore entry
561  * to the client.
562  *
563  * @param cls closure, pointer to the client (of type GNUNET_SERVER_Client).
564  * @param next_cls closure to use to ask for the next item
565  * @param key key for the content
566  * @param size number of bytes in data
567  * @param data content stored
568  * @param type type of the content
569  * @param priority priority of the content
570  * @param anonymity anonymity-level for the content
571  * @param expiration expiration time for the content
572  * @param uid unique identifier for the datum;
573  *        maybe 0 if no unique identifier is available
574  *
575  * @return GNUNET_SYSERR to abort the iteration, GNUNET_OK to continue,
576  *         GNUNET_NO to delete the item and continue (if supported)
577  */
578 static int
579 transmit_item (void *cls,
580                void *next_cls,
581                const GNUNET_HashCode * key,
582                uint32_t size,
583                const void *data,
584                uint32_t type,
585                uint32_t priority,
586                uint32_t anonymity,
587                struct GNUNET_TIME_Absolute
588                expiration, uint64_t uid)
589 {
590   struct GNUNET_SERVER_Client *client = cls;
591   struct GNUNET_MessageHeader *end;
592   struct DataMessage *dm;
593
594   if (key == NULL)
595     {
596       /* transmit 'DATA_END' */
597 #if DEBUG_DATASTORE
598       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
599                   "Transmitting `%s' message\n",
600                   "DATA_END");
601 #endif
602       end = GNUNET_malloc (sizeof(struct GNUNET_MessageHeader));
603       end->size = htons(sizeof(struct GNUNET_MessageHeader));
604       end->type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END);
605       transmit (client, end, NULL, NULL, GNUNET_YES);
606       GNUNET_SERVER_client_drop (client);
607       return GNUNET_OK;
608     }
609   dm = GNUNET_malloc (sizeof(struct DataMessage) + size);
610   dm->header.size = htons(sizeof(struct DataMessage) + size);
611   dm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_DATA);
612   dm->rid = htonl(0);
613   dm->size = htonl(size);
614   dm->type = htonl(type);
615   dm->priority = htonl(priority);
616   dm->anonymity = htonl(anonymity);
617   dm->expiration = GNUNET_TIME_absolute_hton(expiration);
618   dm->uid = GNUNET_htonll(uid);
619   dm->key = *key;
620   memcpy (&dm[1], data, size);
621 #if DEBUG_DATASTORE
622   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
623               "Transmitting `%s' message\n",
624               "DATA");
625 #endif
626   transmit (client, &dm->header, &get_next, next_cls, GNUNET_NO);
627   return GNUNET_OK;
628 }
629
630
631 /**
632  * Handle RESERVE-message.
633  *
634  * @param cls closure
635  * @param client identification of the client
636  * @param message the actual message
637  */
638 static void
639 handle_reserve (void *cls,
640                 struct GNUNET_SERVER_Client *client,
641                 const struct GNUNET_MessageHeader *message)
642 {
643   const struct ReserveMessage *msg = (const struct ReserveMessage*) message;
644   struct ReservationList *e;
645   unsigned long long used;
646   unsigned long long req;
647   uint64_t amount;
648   uint32_t entries;
649
650 #if DEBUG_DATASTORE
651   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
652               "Processing `%s' request\n",
653               "RESERVE");
654 #endif
655   amount = GNUNET_ntohll(msg->amount);
656   entries = ntohl(msg->entries);
657   used = plugin->api->get_size (plugin->api->cls) + reserved;
658   req = amount + ((unsigned long long) GNUNET_DATASTORE_ENTRY_OVERHEAD) * entries;
659   if (used + req > quota)
660     {
661       if (quota < used)
662         used = quota; /* cheat a bit for error message (to avoid negative numbers) */
663       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
664                   _("Insufficient space (%llu bytes are available) to satisfy `%s' request for %llu bytes\n"),
665                   quota - used,
666                   "RESERVE",
667                   req);
668       if (cache_size < req)
669         {
670           /* TODO: document this in the FAQ; essentially, if this
671              message happens, the insertion request could be blocked
672              by less-important content from migration because it is
673              larger than 1/8th of the overall available space, and
674              we only reserve 1/8th for "fresh" insertions */
675           GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
676                       _("The requested amount (%llu bytes) is larger than the cache size (%llu bytes)\n"),
677                       req,
678                       cache_size);
679           transmit_status (client, 0, 
680                            gettext_noop ("Insufficient space to satisfy request and "
681                                          "requested amount is larger than cache size"));
682         }
683       else
684         {
685           transmit_status (client, 0, 
686                            gettext_noop ("Insufficient space to satisfy request"));
687         }
688       return;      
689     }
690   reserved += req;
691   e = GNUNET_malloc (sizeof(struct ReservationList));
692   e->next = reservations;
693   reservations = e;
694   e->client = client;
695   e->amount = amount;
696   e->entries = entries;
697   e->rid = ++reservation_gen;
698   if (reservation_gen < 0)
699     reservation_gen = 0; /* wrap around */
700   transmit_status (client, e->rid, NULL);
701 }
702
703
704 /**
705  * Handle RELEASE_RESERVE-message.
706  *
707  * @param cls closure
708  * @param client identification of the client
709  * @param message the actual message
710  */
711 static void
712 handle_release_reserve (void *cls,
713                         struct GNUNET_SERVER_Client *client,
714                         const struct GNUNET_MessageHeader *message)
715 {
716   const struct ReleaseReserveMessage *msg = (const struct ReleaseReserveMessage*) message;
717   struct ReservationList *pos;
718   struct ReservationList *prev;
719   struct ReservationList *next;
720   int rid = ntohl(msg->rid);
721   unsigned long long rem;
722
723 #if DEBUG_DATASTORE
724   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
725               "Processing `%s' request\n",
726               "RELEASE_RESERVE");
727 #endif
728   next = reservations;
729   prev = NULL;
730   while (NULL != (pos = next))
731     {
732       next = pos->next;
733       if (rid == pos->rid)
734         {
735           if (prev == NULL)
736             reservations = next;
737           else
738             prev->next = next;
739           rem = pos->amount + ((unsigned long long) GNUNET_DATASTORE_ENTRY_OVERHEAD) * pos->entries;
740           GNUNET_assert (reserved >= rem);
741           reserved -= rem;
742 #if DEBUG_DATASTORE
743           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
744                       "Returning %llu remaining reserved bytes to storage pool\n",
745                       rem);
746 #endif    
747           GNUNET_free (pos);
748           transmit_status (client, GNUNET_OK, NULL);
749           return;
750         }       
751       prev = pos;
752     }
753   GNUNET_break (0);
754   transmit_status (client, GNUNET_SYSERR, gettext_noop ("Could not find matching reservation"));
755 }
756
757
758 /**
759  * Check that the given message is a valid data message.
760  *
761  * @return NULL if the message is not well-formed, otherwise the message
762  */
763 static const struct DataMessage *
764 check_data (const struct GNUNET_MessageHeader *message)
765 {
766   uint16_t size;
767   uint32_t dsize;
768   const struct DataMessage *dm;
769
770   size = ntohs(message->size);
771   if (size < sizeof(struct DataMessage))
772     { 
773       GNUNET_break (0);
774       return NULL;
775     }
776   dm = (const struct DataMessage *) message;
777   dsize = ntohl(dm->size);
778   if (size != dsize + sizeof(struct DataMessage))
779     {
780       GNUNET_break (0);
781       return NULL;
782     }
783   return dm;
784 }
785
786
787 /**
788  * Handle PUT-message.
789  *
790  * @param cls closure
791  * @param client identification of the client
792  * @param message the actual message
793  */
794 static void
795 handle_put (void *cls,
796             struct GNUNET_SERVER_Client *client,
797             const struct GNUNET_MessageHeader *message)
798 {
799   const struct DataMessage *dm = check_data (message);
800   char *msg;
801   int ret;
802   int rid;
803   struct ReservationList *pos;
804   uint32_t size;
805
806 #if DEBUG_DATASTORE
807   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
808               "Processing `%s' request\n",
809               "PUT");
810 #endif
811   if (ntohl(dm->type) == 0) 
812     {
813       GNUNET_break (0);
814       dm = NULL;
815     }
816   if (dm == NULL)
817     {
818       GNUNET_break (0);
819       GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
820       return;
821     }
822   rid = ntohl(dm->rid);
823   size = ntohl(dm->size);
824   if (rid > 0)
825     {
826       pos = reservations;
827       while ( (NULL != pos) &&
828               (rid != pos->rid) )
829         pos = pos->next;
830       GNUNET_break (pos != NULL);
831       if (NULL != pos)
832         {
833           GNUNET_break (pos->entries > 0);
834           GNUNET_break (pos->amount > size);
835           pos->entries--;
836           pos->amount -= size;
837           reserved -= (size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
838         }
839     }
840   msg = NULL;
841   ret = plugin->api->put (plugin->api->cls,
842                           &dm->key,
843                           size,
844                           &dm[1],
845                           ntohl(dm->type),
846                           ntohl(dm->priority),
847                           ntohl(dm->anonymity),
848                           GNUNET_TIME_absolute_ntoh(dm->expiration),
849                           &msg);
850   if (GNUNET_OK == ret)
851     {
852       GNUNET_CONTAINER_bloomfilter_add (filter,
853                                         &dm->key);
854 #if DEBUG_DATASTORE
855       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
856                   "Successfully stored %u bytes under key `%s'\n",
857                   size,
858                   GNUNET_h2s (&dm->key));
859 #endif
860     }
861   transmit_status (client, 
862                    (GNUNET_SYSERR == ret) ? GNUNET_SYSERR : GNUNET_OK, 
863                    msg);
864   GNUNET_free_non_null (msg);
865   if (quota - reserved - cache_size < plugin->api->get_size (plugin->api->cls))
866     manage_space (size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
867 }
868
869
870 /**
871  * Handle GET-message.
872  *
873  * @param cls closure
874  * @param client identification of the client
875  * @param message the actual message
876  */
877 static void
878 handle_get (void *cls,
879              struct GNUNET_SERVER_Client *client,
880              const struct GNUNET_MessageHeader *message)
881 {
882   const struct GetMessage *msg;
883   uint16_t size;
884
885 #if DEBUG_DATASTORE
886   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
887               "Processing `%s' request\n",
888               "GET");
889 #endif
890   size = ntohs(message->size);
891   if ( (size != sizeof(struct GetMessage)) &&
892        (size != sizeof(struct GetMessage) - sizeof(GNUNET_HashCode)) )
893     {
894       GNUNET_break (0);
895       GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
896       return;
897     }
898   msg = (const struct GetMessage*) message;
899   if ( (size == sizeof(struct GetMessage)) &&
900        (GNUNET_YES != GNUNET_CONTAINER_bloomfilter_test (filter,
901                                                          &msg->key)) )
902     {
903       /* don't bother database... */
904 #if DEBUG_DATASTORE
905       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
906                   "Empty result set for `%s' request for `%s'.\n",
907                   "GET",
908                   GNUNET_h2s (&msg->key));
909 #endif  
910       GNUNET_SERVER_client_keep (client);
911       transmit_item (client,
912                      NULL, NULL, 0, NULL, 0, 0, 0, 
913                      GNUNET_TIME_UNIT_ZERO_ABS, 0);
914       return;
915     }
916   GNUNET_SERVER_client_keep (client);
917   plugin->api->get (plugin->api->cls,
918                     ((size == sizeof(struct GetMessage)) ? &msg->key : NULL),
919                     NULL,
920                     ntohl(msg->type),
921                     &transmit_item,
922                     client);    
923 }
924
925
926 /**
927  * Handle UPDATE-message.
928  *
929  * @param cls closure
930  * @param client identification of the client
931  * @param message the actual message
932  */
933 static void
934 handle_update (void *cls,
935                struct GNUNET_SERVER_Client *client,
936                const struct GNUNET_MessageHeader *message)
937 {
938   const struct UpdateMessage *msg;
939   int ret;
940   char *emsg;
941
942 #if DEBUG_DATASTORE
943   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
944               "Processing `%s' request\n",
945               "UPDATE");
946 #endif
947   msg = (const struct UpdateMessage*) message;
948   emsg = NULL;
949   ret = plugin->api->update (plugin->api->cls,
950                              GNUNET_ntohll(msg->uid),
951                              (int32_t) ntohl(msg->priority),
952                              GNUNET_TIME_absolute_ntoh(msg->expiration),
953                              &emsg);
954   transmit_status (client, ret, emsg);
955   GNUNET_free_non_null (emsg);
956 }
957
958
959 /**
960  * Handle GET_RANDOM-message.
961  *
962  * @param cls closure
963  * @param client identification of the client
964  * @param message the actual message
965  */
966 static void
967 handle_get_random (void *cls,
968                    struct GNUNET_SERVER_Client *client,
969                    const struct GNUNET_MessageHeader *message)
970 {
971 #if DEBUG_DATASTORE
972   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
973               "Processing `%s' request\n",
974               "GET_RANDOM");
975 #endif
976   GNUNET_SERVER_client_keep (client);
977   plugin->api->iter_migration_order (plugin->api->cls,
978                                      0,
979                                      &transmit_item,
980                                      client);  
981 }
982
983
984 /**
985  * Context for the 'remove_callback'.
986  */
987 struct RemoveContext 
988 {
989   /**
990    * Client for whom we're doing the remvoing.
991    */
992   struct GNUNET_SERVER_Client *client;
993
994   /**
995    * GNUNET_YES if we managed to remove something.
996    */
997   int found;
998 };
999
1000
1001 /**
1002  * Callback function that will cause the item that is passed
1003  * in to be deleted (by returning GNUNET_NO).
1004  */
1005 static int
1006 remove_callback (void *cls,
1007                  void *next_cls,
1008                  const GNUNET_HashCode * key,
1009                  uint32_t size,
1010                  const void *data,
1011                  uint32_t type,
1012                  uint32_t priority,
1013                  uint32_t anonymity,
1014                  struct GNUNET_TIME_Absolute
1015                  expiration, uint64_t uid)
1016 {
1017   struct RemoveContext *rc = cls;
1018
1019   if (key == NULL)
1020     {
1021 #if DEBUG_DATASTORE
1022       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1023                   "No further matches for `%s' request.\n",
1024                   "REMOVE");
1025 #endif  
1026       if (GNUNET_YES == rc->found)
1027         transmit_status (rc->client, GNUNET_OK, NULL);       
1028       else
1029         transmit_status (rc->client, GNUNET_NO, _("Content not found"));        
1030       GNUNET_SERVER_client_drop (rc->client);
1031       GNUNET_free (rc);
1032       return GNUNET_OK; /* last item */
1033     }
1034   rc->found = GNUNET_YES;
1035 #if DEBUG_DATASTORE
1036   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1037               "Item %llu matches `%s' request.\n",
1038               (unsigned long long) uid,
1039               "REMOVE");
1040 #endif  
1041   GNUNET_CONTAINER_bloomfilter_remove (filter,
1042                                        key);
1043   plugin->api->next_request (next_cls, GNUNET_YES);
1044   return GNUNET_NO;
1045 }
1046
1047
1048 /**
1049  * Handle REMOVE-message.
1050  *
1051  * @param cls closure
1052  * @param client identification of the client
1053  * @param message the actual message
1054  */
1055 static void
1056 handle_remove (void *cls,
1057              struct GNUNET_SERVER_Client *client,
1058              const struct GNUNET_MessageHeader *message)
1059 {
1060   const struct DataMessage *dm = check_data (message);
1061   GNUNET_HashCode vhash;
1062   struct RemoveContext *rc;
1063
1064 #if DEBUG_DATASTORE
1065   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1066               "Processing `%s' request\n",
1067               "REMOVE");
1068 #endif
1069   if (dm == NULL)
1070     {
1071       GNUNET_break (0);
1072       GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1073       return;
1074     }
1075   rc = GNUNET_malloc (sizeof(struct RemoveContext));
1076   GNUNET_SERVER_client_keep (client);
1077   rc->client = client;
1078   GNUNET_CRYPTO_hash (&dm[1],
1079                       ntohl(dm->size),
1080                       &vhash);
1081   plugin->api->get (plugin->api->cls,
1082                     &dm->key,
1083                     &vhash,
1084                     ntohl(dm->type),
1085                     &remove_callback,
1086                     rc);
1087 }
1088
1089
1090 /**
1091  * Handle DROP-message.
1092  *
1093  * @param cls closure
1094  * @param client identification of the client
1095  * @param message the actual message
1096  */
1097 static void
1098 handle_drop (void *cls,
1099              struct GNUNET_SERVER_Client *client,
1100              const struct GNUNET_MessageHeader *message)
1101 {
1102 #if DEBUG_DATASTORE
1103   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1104               "Processing `%s' request\n",
1105               "DROP");
1106 #endif
1107   plugin->api->drop (plugin->api->cls);
1108   GNUNET_SERVER_receive_done (client, GNUNET_OK);
1109 }
1110
1111
1112 /**
1113  * List of handlers for the messages understood by this
1114  * service.
1115  */
1116 static struct GNUNET_SERVER_MessageHandler handlers[] = {
1117   {&handle_reserve, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_RESERVE, 
1118    sizeof(struct ReserveMessage) }, 
1119   {&handle_release_reserve, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_RELEASE_RESERVE, 
1120    sizeof(struct ReleaseReserveMessage) }, 
1121   {&handle_put, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_PUT, 0 }, 
1122   {&handle_update, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_UPDATE, 
1123    sizeof (struct UpdateMessage) }, 
1124   {&handle_get, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_GET, 0 }, 
1125   {&handle_get_random, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_GET_RANDOM, 
1126    sizeof(struct GNUNET_MessageHeader) }, 
1127   {&handle_remove, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE, 0 }, 
1128   {&handle_drop, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_DROP, 
1129    sizeof(struct GNUNET_MessageHeader) }, 
1130   {NULL, NULL, 0, 0}
1131 };
1132
1133
1134
1135 /**
1136  * Load the datastore plugin.
1137  */
1138 static struct DatastorePlugin *
1139 load_plugin () 
1140 {
1141   struct DatastorePlugin *ret;
1142   char *libname;
1143   char *name;
1144
1145   if (GNUNET_OK !=
1146       GNUNET_CONFIGURATION_get_value_string (cfg,
1147                                              "DATASTORE", "DATABASE", &name))
1148     {
1149       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1150                   _("No `%s' specified for `%s' in configuration!\n"),
1151                   "DATABASE",
1152                   "DATASTORE");
1153       return NULL;
1154     }
1155   ret = GNUNET_malloc (sizeof(struct DatastorePlugin));
1156   ret->env.cfg = cfg;
1157   ret->env.sched = sched;  
1158   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1159               _("Loading `%s' datastore plugin\n"), name);
1160   GNUNET_asprintf (&libname, "libgnunet_plugin_datastore_%s", name);
1161   ret->short_name = name;
1162   ret->lib_name = libname;
1163   ret->api = GNUNET_PLUGIN_load (libname, &ret->env);
1164   if (ret->api == NULL)
1165     {
1166       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1167                   _("Failed to load datastore plugin for `%s'\n"), name);
1168       GNUNET_free (ret->short_name);
1169       GNUNET_free (libname);
1170       GNUNET_free (ret);
1171       return NULL;
1172     }
1173   return ret;
1174 }
1175
1176
1177 /**
1178  * Function called when the service shuts
1179  * down.  Unloads our datastore plugin.
1180  *
1181  * @param plug plugin to unload
1182  */
1183 static void
1184 unload_plugin (struct DatastorePlugin *plug)
1185 {
1186 #if DEBUG_DATASTORE
1187   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1188               "Datastore service is unloading plugin...\n");
1189 #endif
1190   GNUNET_break (NULL == GNUNET_PLUGIN_unload (plug->lib_name, plug->api));
1191   GNUNET_free (plug->lib_name);
1192   GNUNET_free (plug->short_name);
1193   GNUNET_free (plug);
1194 }
1195
1196
1197 /**
1198  * Last task run during shutdown.  Disconnects us from
1199  * the transport and core.
1200  */
1201 static void
1202 cleaning_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1203 {
1204   unload_plugin (plugin);
1205   plugin = NULL;
1206   if (filter != NULL)
1207     {
1208       GNUNET_CONTAINER_bloomfilter_free (filter);
1209       filter = NULL;
1210     }
1211 }
1212
1213
1214 /**
1215  * Function that removes all active reservations made
1216  * by the given client and releases the space for other
1217  * requests.
1218  *
1219  * @param cls closure
1220  * @param client identification of the client
1221  */
1222 static void
1223 cleanup_reservations (void *cls,
1224                       struct GNUNET_SERVER_Client
1225                       * client)
1226 {
1227   struct ReservationList *pos;
1228   struct ReservationList *prev;
1229   struct ReservationList *next;
1230
1231   prev = NULL;
1232   pos = reservations;
1233   while (NULL != pos)
1234     {
1235       next = pos->next;
1236       if (pos->client == client)
1237         {
1238           if (prev == NULL)
1239             reservations = next;
1240           else
1241             prev->next = next;
1242           reserved -= pos->amount + pos->entries * GNUNET_DATASTORE_ENTRY_OVERHEAD;
1243           GNUNET_free (pos);
1244         }
1245       else
1246         {
1247           prev = pos;
1248         }
1249       pos = next;
1250     }
1251 }
1252
1253
1254 /**
1255  * Process datastore requests.
1256  *
1257  * @param cls closure
1258  * @param s scheduler to use
1259  * @param server the initialized server
1260  * @param c configuration to use
1261  */
1262 static void
1263 run (void *cls,
1264      struct GNUNET_SCHEDULER_Handle *s,
1265      struct GNUNET_SERVER_Handle *server,
1266      const struct GNUNET_CONFIGURATION_Handle *c)
1267 {
1268   char *fn;
1269   unsigned int bf_size;
1270
1271   sched = s;
1272   cfg = c;
1273   if (GNUNET_OK !=
1274       GNUNET_CONFIGURATION_get_value_number (cfg,
1275                                              "DATASTORE", "QUOTA", &quota))
1276     {
1277       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1278                   _("No `%s' specified for `%s' in configuration!\n"),
1279                   "QUOTA",
1280                   "DATASTORE");
1281       return;
1282     }
1283   cache_size = quota / 8; /* Or should we make this an option? */
1284   bf_size = quota / 32; /* 8 bit per entry, 1 bit per 32 kb in DB */
1285   fn = NULL;
1286   if ( (GNUNET_OK !=
1287         GNUNET_CONFIGURATION_get_value_filename (cfg,
1288                                                  "DATASTORE",
1289                                                  "BLOOMFILTER",
1290                                                  &fn)) ||
1291        (GNUNET_OK !=
1292         GNUNET_DISK_directory_create_for_file (fn)) )
1293     {
1294       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1295                   _("Could not use specified filename `%s' for bloomfilter.\n"),
1296                   fn != NULL ? fn : "");
1297       GNUNET_free_non_null (fn);
1298       fn = NULL;
1299     }
1300   filter = GNUNET_CONTAINER_bloomfilter_load (fn, bf_size, 5);  /* approx. 3% false positives at max use */  
1301   GNUNET_free_non_null (fn);
1302   if (filter == NULL)
1303     {
1304       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1305                   _("Failed to initialize bloomfilter.\n"));
1306       return;
1307     }
1308   plugin = load_plugin ();
1309   if (NULL == plugin)
1310     {
1311       GNUNET_CONTAINER_bloomfilter_free (filter);
1312       filter = NULL;
1313       return;
1314     }
1315   GNUNET_SERVER_disconnect_notify (server, &cleanup_reservations, NULL);
1316   GNUNET_SERVER_add_handlers (server, handlers);
1317   expired_kill_task
1318     = GNUNET_SCHEDULER_add_delayed (sched,
1319                                     GNUNET_NO,
1320                                     GNUNET_SCHEDULER_PRIORITY_IDLE,
1321                                     GNUNET_SCHEDULER_NO_TASK,
1322                                     GNUNET_TIME_UNIT_ZERO,
1323                                     &delete_expired, NULL);
1324   GNUNET_SCHEDULER_add_delayed (sched,
1325                                 GNUNET_YES,
1326                                 GNUNET_SCHEDULER_PRIORITY_IDLE,
1327                                 GNUNET_SCHEDULER_NO_TASK,
1328                                 GNUNET_TIME_UNIT_FOREVER_REL,
1329                                 &cleaning_task, NULL);
1330   
1331 }
1332
1333
1334 /**
1335  * The main function for the datastore service.
1336  *
1337  * @param argc number of arguments from the command line
1338  * @param argv command line arguments
1339  * @return 0 ok, 1 on error
1340  */
1341 int
1342 main (int argc, char *const *argv)
1343 {
1344   int ret;
1345
1346   ret = (GNUNET_OK ==
1347          GNUNET_SERVICE_run (argc,
1348                              argv,
1349                              "datastore", &run, NULL, NULL, NULL)) ? 0 : 1;
1350   return ret;
1351 }
1352
1353
1354 /* end of gnunet-service-datastore.c */