5944640c40d29922cdef1e4bad6ca9741ab29d93
[oweals/gnunet.git] / src / datastore / gnunet-service-datastore.c
1 /*
2      This file is part of GNUnet
3      (C) 2004, 2005, 2006, 2007, 2009 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 2, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file datastore/gnunet-service-datastore.c
23  * @brief Management for the datastore for files stored on a GNUnet node
24  * @author Christian Grothoff
25  */
26
27 #include "platform.h"
28 #include "gnunet_util_lib.h"
29 #include "gnunet_arm_service.h"
30 #include "gnunet_protocols.h"
31 #include "plugin_datastore.h"
32 #include "datastore.h"
33
34 /**
35  * How many messages do we queue at most per client?
36  */
37 #define MAX_PENDING 1024
38
39 /**
40  * How long are we at most keeping "expired" content
41  * past the expiration date in the database?
42  */
43 #define MAX_EXPIRE_DELAY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 15)
44
45
46
47 /**
48  * Our datastore plugin.
49  */
50 struct DatastorePlugin
51 {
52
53   /**
54    * API of the transport as returned by the plugin's
55    * initialization function.
56    */
57   struct GNUNET_DATASTORE_PluginFunctions *api;
58
59   /**
60    * Short name for the plugin (i.e. "sqlite").
61    */
62   char *short_name;
63
64   /**
65    * Name of the library (i.e. "gnunet_plugin_datastore_sqlite").
66    */
67   char *lib_name;
68
69   /**
70    * Environment this transport service is using
71    * for this plugin.
72    */
73   struct GNUNET_DATASTORE_PluginEnvironment env;
74
75 };
76
77
78 /**
79  * Linked list of active reservations.
80  */
81 struct ReservationList 
82 {
83
84   /**
85    * This is a linked list.
86    */
87   struct ReservationList *next;
88
89   /**
90    * Client that made the reservation.
91    */
92   struct GNUNET_SERVER_Client *client;
93
94   /**
95    * Number of bytes (still) reserved.
96    */
97   uint64_t amount;
98
99   /**
100    * Number of items (still) reserved.
101    */
102   uint64_t entries;
103
104   /**
105    * Reservation identifier.
106    */
107   int32_t rid;
108
109 };
110
111
112 /**
113  * Our datastore plugin (NULL if not available).
114  */
115 static struct DatastorePlugin *plugin;
116
117 /**
118  * Linked list of space reservations made by clients.
119  */
120 static struct ReservationList *reservations;
121
122 /**
123  * Bloomfilter to quickly tell if we don't have the content.
124  */
125 static struct GNUNET_CONTAINER_BloomFilter *filter;
126
127 /**
128  * Static counter to produce reservation identifiers.
129  */
130 static int reservation_gen;
131
132 /**
133  * How much space are we allowed to use?
134  */
135 static unsigned long long quota;
136
137 /**
138  * How much space are we using for the cache?  (space available for
139  * insertions that will be instantly reclaimed by discarding less
140  * important content --- or possibly whatever we just inserted into
141  * the "cache").
142  */
143 static unsigned long long cache_size;
144
145 /**
146  * How much space have we currently reserved?
147  */
148 static unsigned long long reserved;
149
150 /**
151  * Identity of the task that is used to delete
152  * expired content.
153  */
154 static GNUNET_SCHEDULER_TaskIdentifier expired_kill_task;
155
156 /**
157  * Our configuration.
158  */
159 const struct GNUNET_CONFIGURATION_Handle *cfg;
160
161 /**
162  * Our scheduler.
163  */
164 struct GNUNET_SCHEDULER_Handle *sched; 
165
166 /**
167  * Function called once the transmit operation has
168  * either failed or succeeded.
169  *
170  * @param cls closure
171  * @param status GNUNET_OK on success, GNUNET_SYSERR on error
172  */
173 typedef void (*TransmitContinuation)(void *cls,
174                                      int status);
175
176
177 struct TransmitCallbackContext 
178 {
179   /**
180    * The message that we're asked to transmit.
181    */
182   struct GNUNET_MessageHeader *msg;
183
184   /**
185    * Client that we are transmitting to.
186    */
187   struct GNUNET_SERVER_Client *client;
188
189   /**
190    * Function to call once msg has been transmitted
191    * (or at least added to the buffer).
192    */
193   TransmitContinuation tc;
194
195   /**
196    * Closure for tc.
197    */
198   void *tc_cls;
199
200   /**
201    * GNUNET_YES if we are supposed to signal the server
202    * completion of the client's request.
203    */
204   int end;
205 };
206
207
208 /**
209  * Task that is used to remove expired entries from
210  * the datastore.  This task will schedule itself
211  * again automatically to always delete all expired
212  * content quickly.
213  *
214  * @param cls not used
215  * @param tc task context
216  */ 
217 static void
218 delete_expired (void *cls,
219                 const struct GNUNET_SCHEDULER_TaskContext *tc);
220
221
222 /**
223  * Iterate over the expired items stored in the datastore.
224  * Delete all expired items; once we have processed all
225  * expired items, re-schedule the "delete_expired" task.
226  *
227  * @param cls not used
228  * @param next_cls closure to pass to the "next" function.
229  * @param key key for the content
230  * @param size number of bytes in data
231  * @param data content stored
232  * @param type type of the content
233  * @param priority priority of the content
234  * @param anonymity anonymity-level for the content
235  * @param expiration expiration time for the content
236  * @param uid unique identifier for the datum;
237  *        maybe 0 if no unique identifier is available
238  *
239  * @return GNUNET_SYSERR to abort the iteration, GNUNET_OK to continue
240  *         (continue on call to "next", of course),
241  *         GNUNET_NO to delete the item and continue (if supported)
242  */
243 static int 
244 expired_processor (void *cls,
245                    void *next_cls,
246                    const GNUNET_HashCode * key,
247                    uint32_t size,
248                    const void *data,
249                    uint32_t type,
250                    uint32_t priority,
251                    uint32_t anonymity,
252                    struct GNUNET_TIME_Absolute
253                    expiration, 
254                    uint64_t uid)
255 {
256   struct GNUNET_TIME_Absolute now;
257
258   expired_kill_task = GNUNET_SCHEDULER_NO_TASK;
259   if (key == NULL) 
260     {
261       expired_kill_task 
262         = GNUNET_SCHEDULER_add_delayed (sched,
263                                         GNUNET_NO,
264                                         GNUNET_SCHEDULER_PRIORITY_IDLE,
265                                         GNUNET_SCHEDULER_NO_TASK,
266                                         MAX_EXPIRE_DELAY,
267                                         &delete_expired,
268                                         NULL);
269       return GNUNET_SYSERR;
270     }
271   now = GNUNET_TIME_absolute_get ();
272   if (expiration.value > now.value)
273     {
274       /* finished processing */
275       plugin->api->next_request (next_cls, GNUNET_YES);
276       return GNUNET_SYSERR;
277     }
278   plugin->api->next_request (next_cls, GNUNET_NO);
279 #if DEBUG_DATASTORE
280   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
281               "Deleting content that expired %llu ms ago\n",
282               (unsigned long long) (now.value - expiration.value));
283 #endif
284   GNUNET_CONTAINER_bloomfilter_remove (filter,
285                                        key);
286   return GNUNET_NO; /* delete */
287 }
288
289
290 /**
291  * Task that is used to remove expired entries from
292  * the datastore.  This task will schedule itself
293  * again automatically to always delete all expired
294  * content quickly.
295  *
296  * @param cls not used
297  * @param tc task context
298  */ 
299 static void
300 delete_expired (void *cls,
301                 const struct GNUNET_SCHEDULER_TaskContext *tc)
302 {
303   plugin->api->iter_ascending_expiration (plugin->api->cls, 
304                                           0,
305                                           &expired_processor,
306                                           NULL);
307 }
308
309
310 /**
311  * An iterator over a set of items stored in the datastore.
312  *
313  * @param cls closure
314  * @param next_cls closure to pass to the "next" function.
315  * @param key key for the content
316  * @param size number of bytes in data
317  * @param data content stored
318  * @param type type of the content
319  * @param priority priority of the content
320  * @param anonymity anonymity-level for the content
321  * @param expiration expiration time for the content
322  * @param uid unique identifier for the datum;
323  *        maybe 0 if no unique identifier is available
324  *
325  * @return GNUNET_SYSERR to abort the iteration, GNUNET_OK to continue
326  *         (continue on call to "next", of course),
327  *         GNUNET_NO to delete the item and continue (if supported)
328  */
329 static int 
330 manage (void *cls,
331         void *next_cls,
332         const GNUNET_HashCode * key,
333         uint32_t size,
334         const void *data,
335         uint32_t type,
336         uint32_t priority,
337         uint32_t anonymity,
338         struct GNUNET_TIME_Absolute
339         expiration, 
340         uint64_t uid)
341 {
342   unsigned long long *need = cls;
343
344   if (NULL == key)
345     {
346       GNUNET_free (need);
347       return GNUNET_SYSERR;
348     }
349   if (size + GNUNET_DATASTORE_ENTRY_OVERHEAD > *need)
350     *need = 0;
351   else
352     *need -= size + GNUNET_DATASTORE_ENTRY_OVERHEAD;
353   plugin->api->next_request (next_cls, 
354                              (0 == *need) ? GNUNET_YES : GNUNET_NO);
355 #if DEBUG_DATASTORE
356   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
357               "Deleting %llu bytes of low-priority content (still trying to free another %llu bytes)\n",
358               size + GNUNET_DATASTORE_ENTRY_OVERHEAD,
359               *need);
360 #endif
361   GNUNET_CONTAINER_bloomfilter_remove (filter,
362                                        key);
363   return GNUNET_NO;
364 }
365
366
367 /**
368  * Manage available disk space by running tasks
369  * that will discard content if necessary.  This
370  * function will be run whenever a request for
371  * "need" bytes of storage could only be satisfied
372  * by eating into the "cache" (and we want our cache
373  * space back).
374  *
375  * @param need number of bytes of content that were
376  *        placed into the "cache" (and hence the
377  *        number of bytes that should be removed).
378  */
379 static void
380 manage_space (unsigned long long need)
381 {
382   unsigned long long *n;
383
384 #if DEBUG_DATASTORE
385   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
386               "Asked to free up %llu bytes of cache space\n",
387               need);
388 #endif
389   n = GNUNET_malloc (sizeof(unsigned long long));
390   *n = need;
391   plugin->api->iter_low_priority (plugin->api->cls,
392                                   0,
393                                   &manage,
394                                   n);
395 }
396
397
398 /**
399  * Function called to notify a client about the socket
400  * begin ready to queue more data.  "buf" will be
401  * NULL and "size" zero if the socket was closed for
402  * writing in the meantime.
403  *
404  * @param cls closure
405  * @param size number of bytes available in buf
406  * @param buf where the callee should write the message
407  * @return number of bytes written to buf
408  */
409 static size_t
410 transmit_callback (void *cls,
411                    size_t size, void *buf)
412 {
413   struct TransmitCallbackContext *tcc = cls;
414   size_t msize;
415   
416   msize = ntohs(tcc->msg->size);
417   if (size == 0)
418     {
419 #if DEBUG_DATASTORE
420       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
421                   "Transmission failed.\n");
422 #endif
423       if (tcc->tc != NULL)
424         tcc->tc (tcc->tc_cls, GNUNET_SYSERR);
425       if (GNUNET_YES == tcc->end)
426         {
427           GNUNET_SERVER_receive_done (tcc->client, GNUNET_SYSERR);
428         }
429       GNUNET_free (tcc->msg);
430       GNUNET_free (tcc);
431       return 0;
432     }
433   GNUNET_assert (size >= msize);
434   memcpy (buf, tcc->msg, msize);
435   if (tcc->tc != NULL)
436     tcc->tc (tcc->tc_cls, GNUNET_OK);
437   if (GNUNET_YES == tcc->end)
438     {
439       GNUNET_SERVER_receive_done (tcc->client, GNUNET_OK);
440     }
441   else
442     {
443 #if DEBUG_DATASTORE
444       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
445                   "Response transmitted, more pending!\n");
446 #endif
447     }
448   GNUNET_free (tcc->msg);
449   GNUNET_free (tcc);
450   return msize;
451 }
452
453
454 /**
455  * Transmit the given message to the client.
456  *
457  * @param client target of the message
458  * @param msg message to transmit, will be freed!
459  * @param tc function to call afterwards
460  * @param tc_cls closure for tc
461  * @param end is this the last response (and we should
462  *        signal the server completion accodingly after
463  *        transmitting this message)?
464  */
465 static void
466 transmit (struct GNUNET_SERVER_Client *client,
467           struct GNUNET_MessageHeader *msg,
468           TransmitContinuation tc,
469           void *tc_cls,
470           int end)
471 {
472   struct TransmitCallbackContext *tcc;
473
474   tcc = GNUNET_malloc (sizeof(struct TransmitCallbackContext));
475   tcc->msg = msg;
476   tcc->client = client;
477   tcc->tc = tc;
478   tcc->tc_cls = tc_cls;
479   tcc->end = end;
480
481   if (NULL ==
482       GNUNET_SERVER_notify_transmit_ready (client,
483                                            ntohs(msg->size),
484                                            GNUNET_TIME_UNIT_FOREVER_REL,
485                                            &transmit_callback,
486                                            tcc))
487     {
488       GNUNET_break (0);
489       if (GNUNET_YES == end)
490         {
491 #if DEBUG_DATASTORE
492           GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
493                       "Disconnecting client.\n");
494 #endif    
495           GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
496         }
497       if (NULL != tc)
498         tc (tc_cls, GNUNET_SYSERR);
499       GNUNET_free (msg);
500       GNUNET_free (tcc);
501     }
502 }
503
504
505 /**
506  * Transmit a status code to the client.
507  *
508  * @param client receiver of the response
509  * @param code status code
510  * @param msg optional error message (can be NULL)
511  */
512 static void
513 transmit_status (struct GNUNET_SERVER_Client *client,
514                  int code,
515                  const char *msg)
516 {
517   struct StatusMessage *sm;
518   size_t slen;
519
520 #if DEBUG_DATASTORE
521   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
522               "Transmitting `%s' message with value %d and message `%s'\n",
523               "STATUS",
524               code,
525               msg != NULL ? msg : "(none)");
526 #endif
527   slen = (msg == NULL) ? 0 : strlen(msg) + 1;  
528   sm = GNUNET_malloc (sizeof(struct StatusMessage) + slen);
529   sm->header.size = htons(sizeof(struct StatusMessage) + slen);
530   sm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_STATUS);
531   sm->status = htonl(code);
532   if (slen > 0)
533     memcpy (&sm[1], msg, slen);  
534   transmit (client, &sm->header, NULL, NULL, GNUNET_YES);
535 }
536
537
538 /**
539  * Function called once the transmit operation has
540  * either failed or succeeded.
541  *
542  * @param next_cls closure for calling "next_request" callback
543  * @param status GNUNET_OK on success, GNUNET_SYSERR on error
544  */
545 static void 
546 get_next(void *next_cls,
547          int status)
548 {
549   if (status != GNUNET_OK)
550     {
551       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
552                   _("Failed to transmit an item to the client; aborting iteration.\n"));    
553       plugin->api->next_request (next_cls, GNUNET_YES);
554       return;
555     }
556   plugin->api->next_request (next_cls, GNUNET_NO);
557 }
558
559
560 /**
561  * Function that will transmit the given datastore entry
562  * to the client.
563  *
564  * @param cls closure, pointer to the client (of type GNUNET_SERVER_Client).
565  * @param next_cls closure to use to ask for the next item
566  * @param key key for the content
567  * @param size number of bytes in data
568  * @param data content stored
569  * @param type type of the content
570  * @param priority priority of the content
571  * @param anonymity anonymity-level for the content
572  * @param expiration expiration time for the content
573  * @param uid unique identifier for the datum;
574  *        maybe 0 if no unique identifier is available
575  *
576  * @return GNUNET_SYSERR to abort the iteration, GNUNET_OK to continue,
577  *         GNUNET_NO to delete the item and continue (if supported)
578  */
579 static int
580 transmit_item (void *cls,
581                void *next_cls,
582                const GNUNET_HashCode * key,
583                uint32_t size,
584                const void *data,
585                uint32_t type,
586                uint32_t priority,
587                uint32_t anonymity,
588                struct GNUNET_TIME_Absolute
589                expiration, uint64_t uid)
590 {
591   struct GNUNET_SERVER_Client *client = cls;
592   struct GNUNET_MessageHeader *end;
593   struct DataMessage *dm;
594
595   if (key == NULL)
596     {
597       /* transmit 'DATA_END' */
598 #if DEBUG_DATASTORE
599       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
600                   "Transmitting `%s' message\n",
601                   "DATA_END");
602 #endif
603       end = GNUNET_malloc (sizeof(struct GNUNET_MessageHeader));
604       end->size = htons(sizeof(struct GNUNET_MessageHeader));
605       end->type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END);
606       transmit (client, end, NULL, NULL, GNUNET_YES);
607       GNUNET_SERVER_client_drop (client);
608       return GNUNET_OK;
609     }
610   dm = GNUNET_malloc (sizeof(struct DataMessage) + size);
611   dm->header.size = htons(sizeof(struct DataMessage) + size);
612   dm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_DATA);
613   dm->rid = htonl(0);
614   dm->size = htonl(size);
615   dm->type = htonl(type);
616   dm->priority = htonl(priority);
617   dm->anonymity = htonl(anonymity);
618   dm->expiration = GNUNET_TIME_absolute_hton(expiration);
619   dm->uid = GNUNET_htonll(uid);
620   dm->key = *key;
621   memcpy (&dm[1], data, size);
622 #if DEBUG_DATASTORE
623   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
624               "Transmitting `%s' message\n",
625               "DATA");
626 #endif
627   transmit (client, &dm->header, &get_next, next_cls, GNUNET_NO);
628   return GNUNET_OK;
629 }
630
631
632 /**
633  * Handle RESERVE-message.
634  *
635  * @param cls closure
636  * @param client identification of the client
637  * @param message the actual message
638  */
639 static void
640 handle_reserve (void *cls,
641                 struct GNUNET_SERVER_Client *client,
642                 const struct GNUNET_MessageHeader *message)
643 {
644   const struct ReserveMessage *msg = (const struct ReserveMessage*) message;
645   struct ReservationList *e;
646   unsigned long long used;
647   unsigned long long req;
648   uint64_t amount;
649   uint32_t entries;
650
651 #if DEBUG_DATASTORE
652   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
653               "Processing `%s' request\n",
654               "RESERVE");
655 #endif
656   amount = GNUNET_ntohll(msg->amount);
657   entries = ntohl(msg->entries);
658   used = plugin->api->get_size (plugin->api->cls) + reserved;
659   req = amount + ((unsigned long long) GNUNET_DATASTORE_ENTRY_OVERHEAD) * entries;
660   if (used + req > quota)
661     {
662       if (quota < used)
663         used = quota; /* cheat a bit for error message (to avoid negative numbers) */
664       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
665                   _("Insufficient space (%llu bytes are available) to satisfy `%s' request for %llu bytes\n"),
666                   quota - used,
667                   "RESERVE",
668                   req);
669       if (cache_size < req)
670         {
671           /* TODO: document this in the FAQ; essentially, if this
672              message happens, the insertion request could be blocked
673              by less-important content from migration because it is
674              larger than 1/8th of the overall available space, and
675              we only reserve 1/8th for "fresh" insertions */
676           GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
677                       _("The requested amount (%llu bytes) is larger than the cache size (%llu bytes)\n"),
678                       req,
679                       cache_size);
680           transmit_status (client, 0, 
681                            gettext_noop ("Insufficient space to satisfy request and "
682                                          "requested amount is larger than cache size"));
683         }
684       else
685         {
686           transmit_status (client, 0, 
687                            gettext_noop ("Insufficient space to satisfy request"));
688         }
689       return;      
690     }
691   reserved += req;
692   e = GNUNET_malloc (sizeof(struct ReservationList));
693   e->next = reservations;
694   reservations = e;
695   e->client = client;
696   e->amount = amount;
697   e->entries = entries;
698   e->rid = ++reservation_gen;
699   if (reservation_gen < 0)
700     reservation_gen = 0; /* wrap around */
701   transmit_status (client, e->rid, NULL);
702 }
703
704
705 /**
706  * Handle RELEASE_RESERVE-message.
707  *
708  * @param cls closure
709  * @param client identification of the client
710  * @param message the actual message
711  */
712 static void
713 handle_release_reserve (void *cls,
714                         struct GNUNET_SERVER_Client *client,
715                         const struct GNUNET_MessageHeader *message)
716 {
717   const struct ReleaseReserveMessage *msg = (const struct ReleaseReserveMessage*) message;
718   struct ReservationList *pos;
719   struct ReservationList *prev;
720   struct ReservationList *next;
721   int rid = ntohl(msg->rid);
722   unsigned long long rem;
723
724 #if DEBUG_DATASTORE
725   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
726               "Processing `%s' request\n",
727               "RELEASE_RESERVE");
728 #endif
729   next = reservations;
730   prev = NULL;
731   while (NULL != (pos = next))
732     {
733       next = pos->next;
734       if (rid == pos->rid)
735         {
736           if (prev == NULL)
737             reservations = next;
738           else
739             prev->next = next;
740           rem = pos->amount + ((unsigned long long) GNUNET_DATASTORE_ENTRY_OVERHEAD) * pos->entries;
741           GNUNET_assert (reserved >= rem);
742           reserved -= rem;
743 #if DEBUG_DATASTORE
744           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
745                       "Returning %llu remaining reserved bytes to storage pool\n",
746                       rem);
747 #endif    
748           GNUNET_free (pos);
749           transmit_status (client, GNUNET_OK, NULL);
750           return;
751         }       
752       prev = pos;
753     }
754   GNUNET_break (0);
755   transmit_status (client, GNUNET_SYSERR, gettext_noop ("Could not find matching reservation"));
756 }
757
758
759 /**
760  * Check that the given message is a valid data message.
761  *
762  * @return NULL if the message is not well-formed, otherwise the message
763  */
764 static const struct DataMessage *
765 check_data (const struct GNUNET_MessageHeader *message)
766 {
767   uint16_t size;
768   uint32_t dsize;
769   const struct DataMessage *dm;
770
771   size = ntohs(message->size);
772   if (size < sizeof(struct DataMessage))
773     { 
774       GNUNET_break (0);
775       return NULL;
776     }
777   dm = (const struct DataMessage *) message;
778   dsize = ntohl(dm->size);
779   if (size != dsize + sizeof(struct DataMessage))
780     {
781       GNUNET_break (0);
782       return NULL;
783     }
784   return dm;
785 }
786
787
788 /**
789  * Handle PUT-message.
790  *
791  * @param cls closure
792  * @param client identification of the client
793  * @param message the actual message
794  */
795 static void
796 handle_put (void *cls,
797             struct GNUNET_SERVER_Client *client,
798             const struct GNUNET_MessageHeader *message)
799 {
800   const struct DataMessage *dm = check_data (message);
801   char *msg;
802   int ret;
803   int rid;
804   struct ReservationList *pos;
805   uint32_t size;
806
807 #if DEBUG_DATASTORE
808   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
809               "Processing `%s' request\n",
810               "PUT");
811 #endif
812   if (ntohl(dm->type) == 0) 
813     {
814       GNUNET_break (0);
815       dm = NULL;
816     }
817   if (dm == NULL)
818     {
819       GNUNET_break (0);
820       GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
821       return;
822     }
823   rid = ntohl(dm->rid);
824   size = ntohl(dm->size);
825   if (rid > 0)
826     {
827       pos = reservations;
828       while ( (NULL != pos) &&
829               (rid != pos->rid) )
830         pos = pos->next;
831       GNUNET_break (pos != NULL);
832       if (NULL != pos)
833         {
834           GNUNET_break (pos->entries > 0);
835           GNUNET_break (pos->amount > size);
836           pos->entries--;
837           pos->amount -= size;
838           reserved -= (size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
839         }
840     }
841   msg = NULL;
842   ret = plugin->api->put (plugin->api->cls,
843                           &dm->key,
844                           size,
845                           &dm[1],
846                           ntohl(dm->type),
847                           ntohl(dm->priority),
848                           ntohl(dm->anonymity),
849                           GNUNET_TIME_absolute_ntoh(dm->expiration),
850                           &msg);
851   if (GNUNET_OK == ret)
852     {
853       GNUNET_CONTAINER_bloomfilter_add (filter,
854                                         &dm->key);
855 #if DEBUG_DATASTORE
856       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
857                   "Successfully stored %u bytes under key `%s'\n",
858                   size,
859                   GNUNET_h2s (&dm->key));
860 #endif
861     }
862   transmit_status (client, 
863                    (GNUNET_SYSERR == ret) ? GNUNET_SYSERR : GNUNET_OK, 
864                    msg);
865   GNUNET_free_non_null (msg);
866   if (quota - reserved - cache_size < plugin->api->get_size (plugin->api->cls))
867     manage_space (size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
868 }
869
870
871 /**
872  * Handle GET-message.
873  *
874  * @param cls closure
875  * @param client identification of the client
876  * @param message the actual message
877  */
878 static void
879 handle_get (void *cls,
880              struct GNUNET_SERVER_Client *client,
881              const struct GNUNET_MessageHeader *message)
882 {
883   const struct GetMessage *msg;
884   uint16_t size;
885
886 #if DEBUG_DATASTORE
887   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
888               "Processing `%s' request\n",
889               "GET");
890 #endif
891   size = ntohs(message->size);
892   if ( (size != sizeof(struct GetMessage)) &&
893        (size != sizeof(struct GetMessage) - sizeof(GNUNET_HashCode)) )
894     {
895       GNUNET_break (0);
896       GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
897       return;
898     }
899   msg = (const struct GetMessage*) message;
900   if ( (size == sizeof(struct GetMessage)) &&
901        (GNUNET_YES != GNUNET_CONTAINER_bloomfilter_test (filter,
902                                                          &msg->key)) )
903     {
904       /* don't bother database... */
905 #if DEBUG_DATASTORE
906       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
907                   "Empty result set for `%s' request for `%s'.\n",
908                   "GET",
909                   GNUNET_h2s (&msg->key));
910 #endif  
911       GNUNET_SERVER_client_keep (client);
912       transmit_item (client,
913                      NULL, NULL, 0, NULL, 0, 0, 0, 
914                      GNUNET_TIME_UNIT_ZERO_ABS, 0);
915       return;
916     }
917   GNUNET_SERVER_client_keep (client);
918   plugin->api->get (plugin->api->cls,
919                     ((size == sizeof(struct GetMessage)) ? &msg->key : NULL),
920                     NULL,
921                     ntohl(msg->type),
922                     &transmit_item,
923                     client);    
924 }
925
926
927 /**
928  * Handle UPDATE-message.
929  *
930  * @param cls closure
931  * @param client identification of the client
932  * @param message the actual message
933  */
934 static void
935 handle_update (void *cls,
936                struct GNUNET_SERVER_Client *client,
937                const struct GNUNET_MessageHeader *message)
938 {
939   const struct UpdateMessage *msg;
940   int ret;
941   char *emsg;
942
943 #if DEBUG_DATASTORE
944   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
945               "Processing `%s' request\n",
946               "UPDATE");
947 #endif
948   msg = (const struct UpdateMessage*) message;
949   emsg = NULL;
950   ret = plugin->api->update (plugin->api->cls,
951                              GNUNET_ntohll(msg->uid),
952                              (int32_t) ntohl(msg->priority),
953                              GNUNET_TIME_absolute_ntoh(msg->expiration),
954                              &emsg);
955   transmit_status (client, ret, emsg);
956   GNUNET_free_non_null (emsg);
957 }
958
959
960 /**
961  * Handle GET_RANDOM-message.
962  *
963  * @param cls closure
964  * @param client identification of the client
965  * @param message the actual message
966  */
967 static void
968 handle_get_random (void *cls,
969                    struct GNUNET_SERVER_Client *client,
970                    const struct GNUNET_MessageHeader *message)
971 {
972 #if DEBUG_DATASTORE
973   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
974               "Processing `%s' request\n",
975               "GET_RANDOM");
976 #endif
977   GNUNET_SERVER_client_keep (client);
978   plugin->api->iter_migration_order (plugin->api->cls,
979                                      0,
980                                      &transmit_item,
981                                      client);  
982 }
983
984
985 /**
986  * Context for the 'remove_callback'.
987  */
988 struct RemoveContext 
989 {
990   /**
991    * Client for whom we're doing the remvoing.
992    */
993   struct GNUNET_SERVER_Client *client;
994
995   /**
996    * GNUNET_YES if we managed to remove something.
997    */
998   int found;
999 };
1000
1001
1002 /**
1003  * Callback function that will cause the item that is passed
1004  * in to be deleted (by returning GNUNET_NO).
1005  */
1006 static int
1007 remove_callback (void *cls,
1008                  void *next_cls,
1009                  const GNUNET_HashCode * key,
1010                  uint32_t size,
1011                  const void *data,
1012                  uint32_t type,
1013                  uint32_t priority,
1014                  uint32_t anonymity,
1015                  struct GNUNET_TIME_Absolute
1016                  expiration, uint64_t uid)
1017 {
1018   struct RemoveContext *rc = cls;
1019
1020   if (key == NULL)
1021     {
1022 #if DEBUG_DATASTORE
1023       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1024                   "No further matches for `%s' request.\n",
1025                   "REMOVE");
1026 #endif  
1027       if (GNUNET_YES == rc->found)
1028         transmit_status (rc->client, GNUNET_OK, NULL);       
1029       else
1030         transmit_status (rc->client, GNUNET_NO, _("Content not found"));        
1031       GNUNET_SERVER_client_drop (rc->client);
1032       GNUNET_free (rc);
1033       return GNUNET_OK; /* last item */
1034     }
1035   rc->found = GNUNET_YES;
1036 #if DEBUG_DATASTORE
1037   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1038               "Item %llu matches `%s' request.\n",
1039               (unsigned long long) uid,
1040               "REMOVE");
1041 #endif  
1042   GNUNET_CONTAINER_bloomfilter_remove (filter,
1043                                        key);
1044   plugin->api->next_request (next_cls, GNUNET_YES);
1045   return GNUNET_NO;
1046 }
1047
1048
1049 /**
1050  * Handle REMOVE-message.
1051  *
1052  * @param cls closure
1053  * @param client identification of the client
1054  * @param message the actual message
1055  */
1056 static void
1057 handle_remove (void *cls,
1058              struct GNUNET_SERVER_Client *client,
1059              const struct GNUNET_MessageHeader *message)
1060 {
1061   const struct DataMessage *dm = check_data (message);
1062   GNUNET_HashCode vhash;
1063   struct RemoveContext *rc;
1064
1065 #if DEBUG_DATASTORE
1066   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1067               "Processing `%s' request\n",
1068               "REMOVE");
1069 #endif
1070   if (dm == NULL)
1071     {
1072       GNUNET_break (0);
1073       GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1074       return;
1075     }
1076   rc = GNUNET_malloc (sizeof(struct RemoveContext));
1077   GNUNET_SERVER_client_keep (client);
1078   rc->client = client;
1079   GNUNET_CRYPTO_hash (&dm[1],
1080                       ntohl(dm->size),
1081                       &vhash);
1082   plugin->api->get (plugin->api->cls,
1083                     &dm->key,
1084                     &vhash,
1085                     ntohl(dm->type),
1086                     &remove_callback,
1087                     rc);
1088 }
1089
1090
1091 /**
1092  * Handle DROP-message.
1093  *
1094  * @param cls closure
1095  * @param client identification of the client
1096  * @param message the actual message
1097  */
1098 static void
1099 handle_drop (void *cls,
1100              struct GNUNET_SERVER_Client *client,
1101              const struct GNUNET_MessageHeader *message)
1102 {
1103 #if DEBUG_DATASTORE
1104   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1105               "Processing `%s' request\n",
1106               "DROP");
1107 #endif
1108   plugin->api->drop (plugin->api->cls);
1109   GNUNET_SERVER_receive_done (client, GNUNET_OK);
1110 }
1111
1112
1113 /**
1114  * List of handlers for the messages understood by this
1115  * service.
1116  */
1117 static struct GNUNET_SERVER_MessageHandler handlers[] = {
1118   {&handle_reserve, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_RESERVE, 
1119    sizeof(struct ReserveMessage) }, 
1120   {&handle_release_reserve, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_RELEASE_RESERVE, 
1121    sizeof(struct ReleaseReserveMessage) }, 
1122   {&handle_put, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_PUT, 0 }, 
1123   {&handle_update, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_UPDATE, 
1124    sizeof (struct UpdateMessage) }, 
1125   {&handle_get, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_GET, 0 }, 
1126   {&handle_get_random, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_GET_RANDOM, 
1127    sizeof(struct GNUNET_MessageHeader) }, 
1128   {&handle_remove, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE, 0 }, 
1129   {&handle_drop, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_DROP, 
1130    sizeof(struct GNUNET_MessageHeader) }, 
1131   {NULL, NULL, 0, 0}
1132 };
1133
1134
1135
1136 /**
1137  * Load the datastore plugin.
1138  */
1139 static struct DatastorePlugin *
1140 load_plugin () 
1141 {
1142   struct DatastorePlugin *ret;
1143   char *libname;
1144   char *name;
1145
1146   if (GNUNET_OK !=
1147       GNUNET_CONFIGURATION_get_value_string (cfg,
1148                                              "DATASTORE", "DATABASE", &name))
1149     {
1150       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1151                   _("No `%s' specified for `%s' in configuration!\n"),
1152                   "DATABASE",
1153                   "DATASTORE");
1154       return NULL;
1155     }
1156   ret = GNUNET_malloc (sizeof(struct DatastorePlugin));
1157   ret->env.cfg = cfg;
1158   ret->env.sched = sched;  
1159   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1160               _("Loading `%s' datastore plugin\n"), name);
1161   GNUNET_asprintf (&libname, "libgnunet_plugin_datastore_%s", name);
1162   ret->short_name = name;
1163   ret->lib_name = libname;
1164   ret->api = GNUNET_PLUGIN_load (libname, &ret->env);
1165   if (ret->api == NULL)
1166     {
1167       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1168                   _("Failed to load datastore plugin for `%s'\n"), name);
1169       GNUNET_free (ret->short_name);
1170       GNUNET_free (libname);
1171       GNUNET_free (ret);
1172       return NULL;
1173     }
1174   return ret;
1175 }
1176
1177
1178 /**
1179  * Function called when the service shuts
1180  * down.  Unloads our datastore plugin.
1181  *
1182  * @param plug plugin to unload
1183  */
1184 static void
1185 unload_plugin (struct DatastorePlugin *plug)
1186 {
1187 #if DEBUG_DATASTORE
1188   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1189               "Datastore service is unloading plugin...\n");
1190 #endif
1191   GNUNET_break (NULL == GNUNET_PLUGIN_unload (plug->lib_name, plug->api));
1192   GNUNET_free (plug->lib_name);
1193   GNUNET_free (plug->short_name);
1194   GNUNET_free (plug);
1195 }
1196
1197
1198 /**
1199  * Last task run during shutdown.  Disconnects us from
1200  * the transport and core.
1201  */
1202 static void
1203 cleaning_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1204 {
1205   unload_plugin (plugin);
1206   plugin = NULL;
1207   if (filter != NULL)
1208     {
1209       GNUNET_CONTAINER_bloomfilter_free (filter);
1210       filter = NULL;
1211     }
1212   GNUNET_ARM_stop_services (cfg, tc->sched, "statistics", NULL);
1213 }
1214
1215
1216 /**
1217  * Function that removes all active reservations made
1218  * by the given client and releases the space for other
1219  * requests.
1220  *
1221  * @param cls closure
1222  * @param client identification of the client
1223  */
1224 static void
1225 cleanup_reservations (void *cls,
1226                       struct GNUNET_SERVER_Client
1227                       * client)
1228 {
1229   struct ReservationList *pos;
1230   struct ReservationList *prev;
1231   struct ReservationList *next;
1232
1233   prev = NULL;
1234   pos = reservations;
1235   while (NULL != pos)
1236     {
1237       next = pos->next;
1238       if (pos->client == client)
1239         {
1240           if (prev == NULL)
1241             reservations = next;
1242           else
1243             prev->next = next;
1244           reserved -= pos->amount + pos->entries * GNUNET_DATASTORE_ENTRY_OVERHEAD;
1245           GNUNET_free (pos);
1246         }
1247       else
1248         {
1249           prev = pos;
1250         }
1251       pos = next;
1252     }
1253 }
1254
1255
1256 /**
1257  * Process datastore requests.
1258  *
1259  * @param cls closure
1260  * @param s scheduler to use
1261  * @param server the initialized server
1262  * @param c configuration to use
1263  */
1264 static void
1265 run (void *cls,
1266      struct GNUNET_SCHEDULER_Handle *s,
1267      struct GNUNET_SERVER_Handle *server,
1268      const struct GNUNET_CONFIGURATION_Handle *c)
1269 {
1270   char *fn;
1271   unsigned int bf_size;
1272
1273   sched = s;
1274   cfg = c;
1275   if (GNUNET_OK !=
1276       GNUNET_CONFIGURATION_get_value_number (cfg,
1277                                              "DATASTORE", "QUOTA", &quota))
1278     {
1279       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1280                   _("No `%s' specified for `%s' in configuration!\n"),
1281                   "QUOTA",
1282                   "DATASTORE");
1283       return;
1284     }
1285   cache_size = quota / 8; /* Or should we make this an option? */
1286   bf_size = quota / 32; /* 8 bit per entry, 1 bit per 32 kb in DB */
1287   fn = NULL;
1288   if ( (GNUNET_OK !=
1289         GNUNET_CONFIGURATION_get_value_filename (cfg,
1290                                                  "DATASTORE",
1291                                                  "BLOOMFILTER",
1292                                                  &fn)) ||
1293        (GNUNET_OK !=
1294         GNUNET_DISK_directory_create_for_file (fn)) )
1295     {
1296       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1297                   _("Could not use specified filename `%s' for bloomfilter.\n"),
1298                   fn != NULL ? fn : "");
1299       GNUNET_free_non_null (fn);
1300       fn = NULL;
1301     }
1302   filter = GNUNET_CONTAINER_bloomfilter_load (fn, bf_size, 5);  /* approx. 3% false positives at max use */  
1303   GNUNET_free_non_null (fn);
1304   if (filter == NULL)
1305     {
1306       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1307                   _("Failed to initialize bloomfilter.\n"));
1308       return;
1309     }
1310   GNUNET_ARM_start_services (cfg, sched, "statistics", NULL);
1311   plugin = load_plugin ();
1312   if (NULL == plugin)
1313     {
1314       GNUNET_CONTAINER_bloomfilter_free (filter);
1315       filter = NULL;
1316       GNUNET_ARM_stop_services (cfg, sched, "statistics", NULL);
1317       return;
1318     }
1319   GNUNET_SERVER_disconnect_notify (server, &cleanup_reservations, NULL);
1320   GNUNET_SERVER_add_handlers (server, handlers);
1321   expired_kill_task
1322     = GNUNET_SCHEDULER_add_delayed (sched,
1323                                     GNUNET_NO,
1324                                     GNUNET_SCHEDULER_PRIORITY_IDLE,
1325                                     GNUNET_SCHEDULER_NO_TASK,
1326                                     GNUNET_TIME_UNIT_ZERO,
1327                                     &delete_expired, NULL);
1328   GNUNET_SCHEDULER_add_delayed (sched,
1329                                 GNUNET_YES,
1330                                 GNUNET_SCHEDULER_PRIORITY_IDLE,
1331                                 GNUNET_SCHEDULER_NO_TASK,
1332                                 GNUNET_TIME_UNIT_FOREVER_REL,
1333                                 &cleaning_task, NULL);
1334   
1335 }
1336
1337
1338 /**
1339  * The main function for the datastore service.
1340  *
1341  * @param argc number of arguments from the command line
1342  * @param argv command line arguments
1343  * @return 0 ok, 1 on error
1344  */
1345 int
1346 main (int argc, char *const *argv)
1347 {
1348   int ret;
1349
1350   ret = (GNUNET_OK ==
1351          GNUNET_SERVICE_run (argc,
1352                              argv,
1353                              "datastore", &run, NULL, NULL, NULL)) ? 0 : 1;
1354   return ret;
1355 }
1356
1357
1358 /* end of gnunet-service-datastore.c */