gpl3
[oweals/gnunet.git] / src / datastore / gnunet-service-datastore.c
1 /*
2      This file is part of GNUnet
3      (C) 2004, 2005, 2006, 2007, 2009 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 2, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file datastore/gnunet-service-datastore.c
23  * @brief Management for the datastore for files stored on a GNUnet node
24  * @author Christian Grothoff
25  */
26
27 #include "platform.h"
28 #include "gnunet_util_lib.h"
29 #include "gnunet_protocols.h"
30 #include "gnunet_statistics_service.h"
31 #include "plugin_datastore.h"
32 #include "datastore.h"
33
34 /**
35  * How many messages do we queue at most per client?
36  */
37 #define MAX_PENDING 1024
38
39 /**
40  * How long are we at most keeping "expired" content
41  * past the expiration date in the database?
42  */
43 #define MAX_EXPIRE_DELAY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 15)
44
45
46
47 /**
48  * Our datastore plugin.
49  */
50 struct DatastorePlugin
51 {
52
53   /**
54    * API of the transport as returned by the plugin's
55    * initialization function.
56    */
57   struct GNUNET_DATASTORE_PluginFunctions *api;
58
59   /**
60    * Short name for the plugin (i.e. "sqlite").
61    */
62   char *short_name;
63
64   /**
65    * Name of the library (i.e. "gnunet_plugin_datastore_sqlite").
66    */
67   char *lib_name;
68
69   /**
70    * Environment this transport service is using
71    * for this plugin.
72    */
73   struct GNUNET_DATASTORE_PluginEnvironment env;
74
75 };
76
77
78 /**
79  * Linked list of active reservations.
80  */
81 struct ReservationList 
82 {
83
84   /**
85    * This is a linked list.
86    */
87   struct ReservationList *next;
88
89   /**
90    * Client that made the reservation.
91    */
92   struct GNUNET_SERVER_Client *client;
93
94   /**
95    * Number of bytes (still) reserved.
96    */
97   uint64_t amount;
98
99   /**
100    * Number of items (still) reserved.
101    */
102   uint64_t entries;
103
104   /**
105    * Reservation identifier.
106    */
107   int32_t rid;
108
109 };
110
111
112 /**
113  * Our datastore plugin (NULL if not available).
114  */
115 static struct DatastorePlugin *plugin;
116
117 /**
118  * Linked list of space reservations made by clients.
119  */
120 static struct ReservationList *reservations;
121
122 /**
123  * Bloomfilter to quickly tell if we don't have the content.
124  */
125 static struct GNUNET_CONTAINER_BloomFilter *filter;
126
127 /**
128  * How much space are we allowed to use?
129  */
130 static unsigned long long quota;
131
132 /**
133  * How much space are we using for the cache?  (space available for
134  * insertions that will be instantly reclaimed by discarding less
135  * important content --- or possibly whatever we just inserted into
136  * the "cache").
137  */
138 static unsigned long long cache_size;
139
140 /**
141  * How much space have we currently reserved?
142  */
143 static unsigned long long reserved;
144
145 /**
146  * Identity of the task that is used to delete
147  * expired content.
148  */
149 static GNUNET_SCHEDULER_TaskIdentifier expired_kill_task;
150
151 /**
152  * Our configuration.
153  */
154 const struct GNUNET_CONFIGURATION_Handle *cfg;
155
156 /**
157  * Our scheduler.
158  */
159 struct GNUNET_SCHEDULER_Handle *sched; 
160
161 /**
162  * Handle for reporting statistics.
163  */
164 static struct GNUNET_STATISTICS_Handle *stats;
165
166
167 /**
168  * Function called once the transmit operation has
169  * either failed or succeeded.
170  *
171  * @param cls closure
172  * @param status GNUNET_OK on success, GNUNET_SYSERR on error
173  */
174 typedef void (*TransmitContinuation)(void *cls,
175                                      int status);
176
177
178 /**
179  * Context for transmitting replies to clients.
180  */
181 struct TransmitCallbackContext 
182 {
183   
184   /**
185    * We keep these in a doubly-linked list (for cleanup).
186    */
187   struct TransmitCallbackContext *next;
188   
189   /**
190    * We keep these in a doubly-linked list (for cleanup).
191    */
192   struct TransmitCallbackContext *prev;
193   
194   /**
195    * The message that we're asked to transmit.
196    */
197   struct GNUNET_MessageHeader *msg;
198   
199   /**
200    * Handle for the transmission request.
201    */
202   struct GNUNET_CONNECTION_TransmitHandle *th;
203
204   /**
205    * Client that we are transmitting to.
206    */
207   struct GNUNET_SERVER_Client *client;
208
209   /**
210    * Function to call once msg has been transmitted
211    * (or at least added to the buffer).
212    */
213   TransmitContinuation tc;
214
215   /**
216    * Closure for tc.
217    */
218   void *tc_cls;
219
220   /**
221    * GNUNET_YES if we are supposed to signal the server
222    * completion of the client's request.
223    */
224   int end;
225 };
226
227   
228 /**
229  * Head of the doubly-linked list (for cleanup).
230  */
231 static struct TransmitCallbackContext *tcc_head;
232
233 /**
234  * Tail of the doubly-linked list (for cleanup).
235  */
236 static struct TransmitCallbackContext *tcc_tail;
237
238 /**
239  * Have we already cleaned up the TCCs and are hence no longer
240  * willing (or able) to transmit anything to anyone?
241  */
242 static int cleaning_done;
243
244 /**
245  * Task that is used to remove expired entries from
246  * the datastore.  This task will schedule itself
247  * again automatically to always delete all expired
248  * content quickly.
249  *
250  * @param cls not used
251  * @param tc task context
252  */ 
253 static void
254 delete_expired (void *cls,
255                 const struct GNUNET_SCHEDULER_TaskContext *tc);
256
257
258 /**
259  * Iterate over the expired items stored in the datastore.
260  * Delete all expired items; once we have processed all
261  * expired items, re-schedule the "delete_expired" task.
262  *
263  * @param cls not used
264  * @param next_cls closure to pass to the "next" function.
265  * @param key key for the content
266  * @param size number of bytes in data
267  * @param data content stored
268  * @param type type of the content
269  * @param priority priority of the content
270  * @param anonymity anonymity-level for the content
271  * @param expiration expiration time for the content
272  * @param uid unique identifier for the datum;
273  *        maybe 0 if no unique identifier is available
274  *
275  * @return GNUNET_SYSERR to abort the iteration, GNUNET_OK to continue
276  *         (continue on call to "next", of course),
277  *         GNUNET_NO to delete the item and continue (if supported)
278  */
279 static int 
280 expired_processor (void *cls,
281                    void *next_cls,
282                    const GNUNET_HashCode * key,
283                    uint32_t size,
284                    const void *data,
285                    enum GNUNET_BLOCK_Type type,
286                    uint32_t priority,
287                    uint32_t anonymity,
288                    struct GNUNET_TIME_Absolute
289                    expiration, 
290                    uint64_t uid)
291 {
292   struct GNUNET_TIME_Absolute now;
293
294   if (key == NULL) 
295     {
296       expired_kill_task 
297         = GNUNET_SCHEDULER_add_delayed (sched,
298                                         MAX_EXPIRE_DELAY,
299                                         &delete_expired,
300                                         NULL);
301       return GNUNET_SYSERR;
302     }
303   now = GNUNET_TIME_absolute_get ();
304   if (expiration.value > now.value)
305     {
306       /* finished processing */
307       plugin->api->next_request (next_cls, GNUNET_YES);
308       return GNUNET_SYSERR;
309     }
310   plugin->api->next_request (next_cls, GNUNET_NO);
311 #if DEBUG_DATASTORE
312   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
313               "Deleting content `%s' of type %u that expired %llu ms ago\n",
314               GNUNET_h2s (key),
315               type,
316               (unsigned long long) (now.value - expiration.value));
317 #endif
318   GNUNET_STATISTICS_update (stats,
319                             gettext_noop ("# bytes expired"),
320                             size,
321                             GNUNET_YES);
322   GNUNET_CONTAINER_bloomfilter_remove (filter,
323                                        key);
324   return GNUNET_NO; /* delete */
325 }
326
327
328 /**
329  * Task that is used to remove expired entries from
330  * the datastore.  This task will schedule itself
331  * again automatically to always delete all expired
332  * content quickly.
333  *
334  * @param cls not used
335  * @param tc task context
336  */ 
337 static void
338 delete_expired (void *cls,
339                 const struct GNUNET_SCHEDULER_TaskContext *tc)
340 {
341   expired_kill_task = GNUNET_SCHEDULER_NO_TASK;
342   plugin->api->iter_ascending_expiration (plugin->api->cls, 
343                                           0,
344                                           &expired_processor,
345                                           NULL);
346 }
347
348
349 /**
350  * An iterator over a set of items stored in the datastore.
351  *
352  * @param cls closure
353  * @param next_cls closure to pass to the "next" function.
354  * @param key key for the content
355  * @param size number of bytes in data
356  * @param data content stored
357  * @param type type of the content
358  * @param priority priority of the content
359  * @param anonymity anonymity-level for the content
360  * @param expiration expiration time for the content
361  * @param uid unique identifier for the datum;
362  *        maybe 0 if no unique identifier is available
363  *
364  * @return GNUNET_SYSERR to abort the iteration, GNUNET_OK to continue
365  *         (continue on call to "next", of course),
366  *         GNUNET_NO to delete the item and continue (if supported)
367  */
368 static int 
369 manage (void *cls,
370         void *next_cls,
371         const GNUNET_HashCode * key,
372         uint32_t size,
373         const void *data,
374         enum GNUNET_BLOCK_Type type,
375         uint32_t priority,
376         uint32_t anonymity,
377         struct GNUNET_TIME_Absolute
378         expiration, 
379         uint64_t uid)
380 {
381   unsigned long long *need = cls;
382
383   if (NULL == key)
384     {
385       GNUNET_free (need);
386       return GNUNET_SYSERR;
387     }
388   if (size + GNUNET_DATASTORE_ENTRY_OVERHEAD > *need)
389     *need = 0;
390   else
391     *need -= size + GNUNET_DATASTORE_ENTRY_OVERHEAD;
392   plugin->api->next_request (next_cls, 
393                              (0 == *need) ? GNUNET_YES : GNUNET_NO);
394 #if DEBUG_DATASTORE
395   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
396               "Deleting %llu bytes of low-priority content `%s' of type %u (still trying to free another %llu bytes)\n",
397               (unsigned long long) (size + GNUNET_DATASTORE_ENTRY_OVERHEAD),
398               GNUNET_h2s (key),
399               type,
400               *need);
401 #endif
402   GNUNET_STATISTICS_update (stats,
403                             gettext_noop ("# bytes purged (low-priority)"),
404                             size,
405                             GNUNET_YES);
406   GNUNET_CONTAINER_bloomfilter_remove (filter,
407                                        key);
408   return GNUNET_NO;
409 }
410
411
412 /**
413  * Manage available disk space by running tasks
414  * that will discard content if necessary.  This
415  * function will be run whenever a request for
416  * "need" bytes of storage could only be satisfied
417  * by eating into the "cache" (and we want our cache
418  * space back).
419  *
420  * @param need number of bytes of content that were
421  *        placed into the "cache" (and hence the
422  *        number of bytes that should be removed).
423  */
424 static void
425 manage_space (unsigned long long need)
426 {
427   unsigned long long *n;
428
429 #if DEBUG_DATASTORE
430   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
431               "Asked to free up %llu bytes of cache space\n",
432               need);
433 #endif
434   n = GNUNET_malloc (sizeof(unsigned long long));
435   *n = need;
436   plugin->api->iter_low_priority (plugin->api->cls,
437                                   0,
438                                   &manage,
439                                   n);
440 }
441
442
443 /**
444  * Function called to notify a client about the socket
445  * begin ready to queue more data.  "buf" will be
446  * NULL and "size" zero if the socket was closed for
447  * writing in the meantime.
448  *
449  * @param cls closure
450  * @param size number of bytes available in buf
451  * @param buf where the callee should write the message
452  * @return number of bytes written to buf
453  */
454 static size_t
455 transmit_callback (void *cls,
456                    size_t size, void *buf)
457 {
458   struct TransmitCallbackContext *tcc = cls;
459   size_t msize;
460   
461   tcc->th = NULL;
462   GNUNET_CONTAINER_DLL_remove (tcc_head,
463                                tcc_tail,
464                                tcc);
465   msize = ntohs(tcc->msg->size);
466   if (size == 0)
467     {
468 #if DEBUG_DATASTORE
469       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
470                   "Transmission failed.\n");
471 #endif
472       if (tcc->tc != NULL)
473         tcc->tc (tcc->tc_cls, GNUNET_SYSERR);
474       if (GNUNET_YES == tcc->end)
475         GNUNET_SERVER_receive_done (tcc->client, GNUNET_SYSERR);       
476       GNUNET_SERVER_client_drop (tcc->client);
477       GNUNET_free (tcc->msg);
478       GNUNET_free (tcc);
479       return 0;
480     }
481   GNUNET_assert (size >= msize);
482   memcpy (buf, tcc->msg, msize);
483   if (tcc->tc != NULL)
484     tcc->tc (tcc->tc_cls, GNUNET_OK);
485   if (GNUNET_YES == tcc->end)
486     {
487       GNUNET_SERVER_receive_done (tcc->client, GNUNET_OK);
488     }
489   else
490     {
491 #if DEBUG_DATASTORE
492       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
493                   "Response transmitted, more pending!\n");
494 #endif
495     }
496   GNUNET_SERVER_client_drop (tcc->client);
497   GNUNET_free (tcc->msg);
498   GNUNET_free (tcc);
499   return msize;
500 }
501
502
503 /**
504  * Transmit the given message to the client.
505  *
506  * @param client target of the message
507  * @param msg message to transmit, will be freed!
508  * @param tc function to call afterwards
509  * @param tc_cls closure for tc
510  * @param end is this the last response (and we should
511  *        signal the server completion accodingly after
512  *        transmitting this message)?
513  */
514 static void
515 transmit (struct GNUNET_SERVER_Client *client,
516           struct GNUNET_MessageHeader *msg,
517           TransmitContinuation tc,
518           void *tc_cls,
519           int end)
520 {
521   struct TransmitCallbackContext *tcc;
522
523   if (GNUNET_YES == cleaning_done)
524     {
525 #if DEBUG_DATASTORE
526       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
527                   "Shutdown in progress, aborting transmission.\n");
528 #endif
529       GNUNET_free (msg);
530       if (NULL != tc)
531         tc (tc_cls, GNUNET_SYSERR);
532       return;
533     }
534   tcc = GNUNET_malloc (sizeof(struct TransmitCallbackContext));
535   tcc->msg = msg;
536   tcc->client = client;
537   tcc->tc = tc;
538   tcc->tc_cls = tc_cls;
539   tcc->end = end;
540   if (NULL ==
541       (tcc->th = GNUNET_SERVER_notify_transmit_ready (client,
542                                                       ntohs(msg->size),
543                                                       GNUNET_TIME_UNIT_FOREVER_REL,
544                                                       &transmit_callback,
545                                                       tcc)))
546     {
547       GNUNET_break (0);
548       if (GNUNET_YES == end)
549         {
550 #if DEBUG_DATASTORE
551           GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
552                       "Disconnecting client.\n");
553 #endif    
554           GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
555         }
556       if (NULL != tc)
557         tc (tc_cls, GNUNET_SYSERR);
558       GNUNET_free (msg);
559       GNUNET_free (tcc);
560       return;
561     }
562   GNUNET_SERVER_client_keep (client);
563   GNUNET_CONTAINER_DLL_insert (tcc_head,
564                                tcc_tail,
565                                tcc);
566 }
567
568
569 /**
570  * Transmit a status code to the client.
571  *
572  * @param client receiver of the response
573  * @param code status code
574  * @param msg optional error message (can be NULL)
575  */
576 static void
577 transmit_status (struct GNUNET_SERVER_Client *client,
578                  int code,
579                  const char *msg)
580 {
581   struct StatusMessage *sm;
582   size_t slen;
583
584 #if DEBUG_DATASTORE
585   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
586               "Transmitting `%s' message with value %d and message `%s'\n",
587               "STATUS",
588               code,
589               msg != NULL ? msg : "(none)");
590 #endif
591   slen = (msg == NULL) ? 0 : strlen(msg) + 1;  
592   sm = GNUNET_malloc (sizeof(struct StatusMessage) + slen);
593   sm->header.size = htons(sizeof(struct StatusMessage) + slen);
594   sm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_STATUS);
595   sm->status = htonl(code);
596   if (slen > 0)
597     memcpy (&sm[1], msg, slen);  
598   transmit (client, &sm->header, NULL, NULL, GNUNET_YES);
599 }
600
601
602 /**
603  * Function called once the transmit operation has
604  * either failed or succeeded.
605  *
606  * @param next_cls closure for calling "next_request" callback
607  * @param status GNUNET_OK on success, GNUNET_SYSERR on error
608  */
609 static void 
610 get_next(void *next_cls,
611          int status)
612 {
613   if (status != GNUNET_OK)
614     {
615       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
616                   _("Failed to transmit an item to the client; aborting iteration.\n"));
617       if (plugin != NULL)
618         plugin->api->next_request (next_cls, GNUNET_YES);
619       return;
620     }
621   plugin->api->next_request (next_cls, GNUNET_NO);
622 }
623
624
625 /**
626  * Function that will transmit the given datastore entry
627  * to the client.
628  *
629  * @param cls closure, pointer to the client (of type GNUNET_SERVER_Client).
630  * @param next_cls closure to use to ask for the next item
631  * @param key key for the content
632  * @param size number of bytes in data
633  * @param data content stored
634  * @param type type of the content
635  * @param priority priority of the content
636  * @param anonymity anonymity-level for the content
637  * @param expiration expiration time for the content
638  * @param uid unique identifier for the datum;
639  *        maybe 0 if no unique identifier is available
640  *
641  * @return GNUNET_SYSERR to abort the iteration, GNUNET_OK to continue,
642  *         GNUNET_NO to delete the item and continue (if supported)
643  */
644 static int
645 transmit_item (void *cls,
646                void *next_cls,
647                const GNUNET_HashCode * key,
648                uint32_t size,
649                const void *data,
650                enum GNUNET_BLOCK_Type type,
651                uint32_t priority,
652                uint32_t anonymity,
653                struct GNUNET_TIME_Absolute
654                expiration, uint64_t uid)
655 {
656   struct GNUNET_SERVER_Client *client = cls;
657   struct GNUNET_MessageHeader *end;
658   struct DataMessage *dm;
659
660   if (key == NULL)
661     {
662       /* transmit 'DATA_END' */
663 #if DEBUG_DATASTORE
664       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
665                   "Transmitting `%s' message\n",
666                   "DATA_END");
667 #endif
668       end = GNUNET_malloc (sizeof(struct GNUNET_MessageHeader));
669       end->size = htons(sizeof(struct GNUNET_MessageHeader));
670       end->type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END);
671       transmit (client, end, NULL, NULL, GNUNET_YES);
672       GNUNET_SERVER_client_drop (client);
673       return GNUNET_OK;
674     }
675   dm = GNUNET_malloc (sizeof(struct DataMessage) + size);
676   dm->header.size = htons(sizeof(struct DataMessage) + size);
677   dm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_DATA);
678   dm->rid = htonl(0);
679   dm->size = htonl(size);
680   dm->type = htonl(type);
681   dm->priority = htonl(priority);
682   dm->anonymity = htonl(anonymity);
683   dm->expiration = GNUNET_TIME_absolute_hton(expiration);
684   dm->uid = GNUNET_htonll(uid);
685   dm->key = *key;
686   memcpy (&dm[1], data, size);
687 #if DEBUG_DATASTORE
688   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
689               "Transmitting `%s' message for `%s' of type %u\n",
690               "DATA",
691               GNUNET_h2s (key),
692               type);
693 #endif
694   GNUNET_STATISTICS_update (stats,
695                             gettext_noop ("# results found"),
696                             1,
697                             GNUNET_NO);
698   transmit (client, &dm->header, &get_next, next_cls, GNUNET_NO);
699   return GNUNET_OK;
700 }
701
702
703 /**
704  * Handle RESERVE-message.
705  *
706  * @param cls closure
707  * @param client identification of the client
708  * @param message the actual message
709  */
710 static void
711 handle_reserve (void *cls,
712                 struct GNUNET_SERVER_Client *client,
713                 const struct GNUNET_MessageHeader *message)
714 {
715   /**
716    * Static counter to produce reservation identifiers.
717    */
718   static int reservation_gen;
719
720   const struct ReserveMessage *msg = (const struct ReserveMessage*) message;
721   struct ReservationList *e;
722   unsigned long long used;
723   unsigned long long req;
724   uint64_t amount;
725   uint32_t entries;
726
727 #if DEBUG_DATASTORE
728   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
729               "Processing `%s' request\n",
730               "RESERVE");
731 #endif
732   amount = GNUNET_ntohll(msg->amount);
733   entries = ntohl(msg->entries);
734   used = plugin->api->get_size (plugin->api->cls) + reserved;
735   req = amount + ((unsigned long long) GNUNET_DATASTORE_ENTRY_OVERHEAD) * entries;
736   if (used + req > quota)
737     {
738       if (quota < used)
739         used = quota; /* cheat a bit for error message (to avoid negative numbers) */
740       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
741                   _("Insufficient space (%llu bytes are available) to satisfy `%s' request for %llu bytes\n"),
742                   quota - used,
743                   "RESERVE",
744                   req);
745       if (cache_size < req)
746         {
747           /* TODO: document this in the FAQ; essentially, if this
748              message happens, the insertion request could be blocked
749              by less-important content from migration because it is
750              larger than 1/8th of the overall available space, and
751              we only reserve 1/8th for "fresh" insertions */
752           GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
753                       _("The requested amount (%llu bytes) is larger than the cache size (%llu bytes)\n"),
754                       req,
755                       cache_size);
756           transmit_status (client, 0, 
757                            gettext_noop ("Insufficient space to satisfy request and "
758                                          "requested amount is larger than cache size"));
759         }
760       else
761         {
762           transmit_status (client, 0, 
763                            gettext_noop ("Insufficient space to satisfy request"));
764         }
765       return;      
766     }
767   reserved += req;
768   GNUNET_STATISTICS_set (stats,
769                          gettext_noop ("# reserved"),
770                          reserved,
771                          GNUNET_NO);
772   e = GNUNET_malloc (sizeof(struct ReservationList));
773   e->next = reservations;
774   reservations = e;
775   e->client = client;
776   e->amount = amount;
777   e->entries = entries;
778   e->rid = ++reservation_gen;
779   if (reservation_gen < 0)
780     reservation_gen = 0; /* wrap around */
781   transmit_status (client, e->rid, NULL);
782 }
783
784
785 /**
786  * Handle RELEASE_RESERVE-message.
787  *
788  * @param cls closure
789  * @param client identification of the client
790  * @param message the actual message
791  */
792 static void
793 handle_release_reserve (void *cls,
794                         struct GNUNET_SERVER_Client *client,
795                         const struct GNUNET_MessageHeader *message)
796 {
797   const struct ReleaseReserveMessage *msg = (const struct ReleaseReserveMessage*) message;
798   struct ReservationList *pos;
799   struct ReservationList *prev;
800   struct ReservationList *next;
801   int rid = ntohl(msg->rid);
802   unsigned long long rem;
803
804 #if DEBUG_DATASTORE
805   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
806               "Processing `%s' request\n",
807               "RELEASE_RESERVE");
808 #endif
809   next = reservations;
810   prev = NULL;
811   while (NULL != (pos = next))
812     {
813       next = pos->next;
814       if (rid == pos->rid)
815         {
816           if (prev == NULL)
817             reservations = next;
818           else
819             prev->next = next;
820           rem = pos->amount + ((unsigned long long) GNUNET_DATASTORE_ENTRY_OVERHEAD) * pos->entries;
821           GNUNET_assert (reserved >= rem);
822           reserved -= rem;
823           GNUNET_STATISTICS_set (stats,
824                          gettext_noop ("# reserved"),
825                                  reserved,
826                                  GNUNET_NO);
827 #if DEBUG_DATASTORE
828           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
829                       "Returning %llu remaining reserved bytes to storage pool\n",
830                       rem);
831 #endif    
832           GNUNET_free (pos);
833           transmit_status (client, GNUNET_OK, NULL);
834           return;
835         }       
836       prev = pos;
837     }
838   GNUNET_break (0);
839   transmit_status (client, GNUNET_SYSERR, gettext_noop ("Could not find matching reservation"));
840 }
841
842
843 /**
844  * Check that the given message is a valid data message.
845  *
846  * @return NULL if the message is not well-formed, otherwise the message
847  */
848 static const struct DataMessage *
849 check_data (const struct GNUNET_MessageHeader *message)
850 {
851   uint16_t size;
852   uint32_t dsize;
853   const struct DataMessage *dm;
854
855   size = ntohs(message->size);
856   if (size < sizeof(struct DataMessage))
857     { 
858       GNUNET_break (0);
859       return NULL;
860     }
861   dm = (const struct DataMessage *) message;
862   dsize = ntohl(dm->size);
863   if (size != dsize + sizeof(struct DataMessage))
864     {
865       GNUNET_break (0);
866       return NULL;
867     }
868   return dm;
869 }
870
871
872 /**
873  * Context for a put request used to see if the content is
874  * already present.
875  */
876 struct PutContext
877 {
878   /**
879    * Client to notify on completion.
880    */
881   struct GNUNET_SERVER_Client *client;
882
883   /**
884    * Did we find the data already in the database?
885    */
886   int is_present;
887   
888   /* followed by the 'struct DataMessage' */
889 };
890
891
892 /**
893  * Actually put the data message.
894  */
895 static void
896 execute_put (struct GNUNET_SERVER_Client *client,
897              const struct DataMessage *dm)
898 {
899   uint32_t size;
900   char *msg;
901   int ret;
902
903   size = ntohl(dm->size);
904   msg = NULL;
905   ret = plugin->api->put (plugin->api->cls,
906                           &dm->key,
907                           size,
908                           &dm[1],
909                           ntohl(dm->type),
910                           ntohl(dm->priority),
911                           ntohl(dm->anonymity),
912                           GNUNET_TIME_absolute_ntoh(dm->expiration),
913                           &msg);
914   if (GNUNET_OK == ret)
915     {
916       GNUNET_STATISTICS_update (stats,
917                                 gettext_noop ("# bytes stored"),
918                                 size,
919                                 GNUNET_YES);
920       GNUNET_CONTAINER_bloomfilter_add (filter,
921                                         &dm->key);
922 #if DEBUG_DATASTORE
923       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
924                   "Successfully stored %u bytes of type %u under key `%s'\n",
925                   size,
926                   ntohl(dm->type),
927                   GNUNET_h2s (&dm->key));
928 #endif
929     }
930   transmit_status (client, 
931                    (GNUNET_SYSERR == ret) ? GNUNET_SYSERR : GNUNET_OK, 
932                    msg);
933   GNUNET_free_non_null (msg);
934   if (quota - reserved - cache_size < plugin->api->get_size (plugin->api->cls))
935     {
936       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
937                   _("Need %llu bytes more space (%llu allowed, using %llu)\n"),
938                   (unsigned long long) size + GNUNET_DATASTORE_ENTRY_OVERHEAD,
939                   (unsigned long long) (quota - reserved - cache_size),
940                   (unsigned long long) plugin->api->get_size (plugin->api->cls));
941       manage_space (size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
942     }
943 }
944
945
946
947 /**
948  * Function that will check if the given datastore entry
949  * matches the put and if none match executes the put.
950  *
951  * @param cls closure, pointer to the client (of type 'struct PutContext').
952  * @param next_cls closure to use to ask for the next item
953  * @param key key for the content
954  * @param size number of bytes in data
955  * @param data content stored
956  * @param type type of the content
957  * @param priority priority of the content
958  * @param anonymity anonymity-level for the content
959  * @param expiration expiration time for the content
960  * @param uid unique identifier for the datum;
961  *        maybe 0 if no unique identifier is available
962  *
963  * @return GNUNET_SYSERR to abort the iteration, GNUNET_OK to continue,
964  *         GNUNET_NO to delete the item and continue (if supported)
965  */
966 static int
967 check_present (void *cls,
968                void *next_cls,
969                const GNUNET_HashCode * key,
970                uint32_t size,
971                const void *data,
972                enum GNUNET_BLOCK_Type type,
973                uint32_t priority,
974                uint32_t anonymity,
975                struct GNUNET_TIME_Absolute
976                expiration, uint64_t uid)
977 {
978   struct PutContext *pc = cls;
979   const struct DataMessage *dm;
980
981   dm = (const struct DataMessage*) &pc[1];
982   if (key == NULL)
983     {
984       if (pc->is_present == GNUNET_YES) 
985         transmit_status (pc->client, GNUNET_OK, NULL);
986       else
987         execute_put (pc->client, dm);
988       GNUNET_SERVER_client_drop (pc->client);
989       GNUNET_free (pc);
990       return GNUNET_SYSERR;
991     }
992   if ( (size == ntohl(dm->size)) &&
993        (0 == memcmp (&dm[1],
994                      data,
995                      size)) )
996     {
997       pc->is_present = GNUNET_YES;
998       plugin->api->next_request (next_cls, GNUNET_YES);
999     }
1000   else
1001     {
1002       plugin->api->next_request (next_cls, GNUNET_NO);
1003     }
1004   return GNUNET_OK;
1005 }
1006
1007
1008 /**
1009  * Handle PUT-message.
1010  *
1011  * @param cls closure
1012  * @param client identification of the client
1013  * @param message the actual message
1014  */
1015 static void
1016 handle_put (void *cls,
1017             struct GNUNET_SERVER_Client *client,
1018             const struct GNUNET_MessageHeader *message)
1019 {
1020   const struct DataMessage *dm = check_data (message);
1021   int rid;
1022   struct ReservationList *pos;
1023   struct PutContext *pc;
1024   uint32_t size;
1025
1026   if ( (dm == NULL) ||
1027        (ntohl(dm->type) == 0) ) 
1028     {
1029       GNUNET_break (0);
1030       GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1031       return;
1032     }
1033 #if DEBUG_DATASTORE
1034   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1035               "Processing `%s' request for `%s' of type %u\n",
1036               "PUT",
1037               GNUNET_h2s (&dm->key),
1038               ntohl (dm->type));
1039 #endif
1040   rid = ntohl(dm->rid);
1041   size = ntohl(dm->size);
1042   if (rid > 0)
1043     {
1044       pos = reservations;
1045       while ( (NULL != pos) &&
1046               (rid != pos->rid) )
1047         pos = pos->next;
1048       GNUNET_break (pos != NULL);
1049       if (NULL != pos)
1050         {
1051           GNUNET_break (pos->entries > 0);
1052           GNUNET_break (pos->amount > size);
1053           pos->entries--;
1054           pos->amount -= size;
1055           reserved -= (size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
1056           GNUNET_STATISTICS_set (stats,
1057                                  gettext_noop ("# reserved"),
1058                                  reserved,
1059                                  GNUNET_NO);
1060         }
1061     }
1062   if (GNUNET_YES == GNUNET_CONTAINER_bloomfilter_test (filter,
1063                                                        &dm->key))
1064     {
1065       pc = GNUNET_malloc (sizeof (struct PutContext) + size + sizeof (struct DataMessage));
1066       pc->client = client;
1067       GNUNET_SERVER_client_keep (client);
1068       memcpy (&pc[1], dm, size + sizeof (struct DataMessage));
1069       plugin->api->get (plugin->api->cls,
1070                         &dm->key,
1071                         NULL,
1072                         ntohl (dm->type),
1073                         &check_present,
1074                         pc);      
1075       return;
1076     }
1077   execute_put (client, dm);
1078 }
1079
1080
1081 /**
1082  * Handle GET-message.
1083  *
1084  * @param cls closure
1085  * @param client identification of the client
1086  * @param message the actual message
1087  */
1088 static void
1089 handle_get (void *cls,
1090             struct GNUNET_SERVER_Client *client,
1091             const struct GNUNET_MessageHeader *message)
1092 {
1093   const struct GetMessage *msg;
1094   uint16_t size;
1095
1096   size = ntohs(message->size);
1097   if ( (size != sizeof(struct GetMessage)) &&
1098        (size != sizeof(struct GetMessage) - sizeof(GNUNET_HashCode)) )
1099     {
1100       GNUNET_break (0);
1101       GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1102       return;
1103     }
1104   msg = (const struct GetMessage*) message;
1105 #if DEBUG_DATASTORE
1106   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1107               "Processing `%s' request for `%s' of type %u\n",
1108               "GET",
1109               GNUNET_h2s (&msg->key),
1110               ntohl (msg->type));
1111 #endif
1112   GNUNET_STATISTICS_update (stats,
1113                             gettext_noop ("# GET requests received"),
1114                             1,
1115                             GNUNET_NO);
1116   GNUNET_SERVER_client_keep (client);
1117   if ( (size == sizeof(struct GetMessage)) &&
1118        (GNUNET_YES != GNUNET_CONTAINER_bloomfilter_test (filter,
1119                                                          &msg->key)) )
1120     {
1121       /* don't bother database... */
1122 #if DEBUG_DATASTORE
1123       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1124                   "Empty result set for `%s' request for `%s' (bloomfilter).\n",
1125                   "GET",
1126                   GNUNET_h2s (&msg->key));
1127 #endif  
1128       GNUNET_STATISTICS_update (stats,
1129                                 gettext_noop ("# requests filtered by bloomfilter"),
1130                                 1,
1131                                 GNUNET_NO);
1132       transmit_item (client,
1133                      NULL, NULL, 0, NULL, 0, 0, 0, 
1134                      GNUNET_TIME_UNIT_ZERO_ABS, 0);
1135       return;
1136     }
1137   plugin->api->get (plugin->api->cls,
1138                     ((size == sizeof(struct GetMessage)) ? &msg->key : NULL),
1139                     NULL,
1140                     ntohl(msg->type),
1141                     &transmit_item,
1142                     client);    
1143 }
1144
1145
1146 /**
1147  * Handle UPDATE-message.
1148  *
1149  * @param cls closure
1150  * @param client identification of the client
1151  * @param message the actual message
1152  */
1153 static void
1154 handle_update (void *cls,
1155                struct GNUNET_SERVER_Client *client,
1156                const struct GNUNET_MessageHeader *message)
1157 {
1158   const struct UpdateMessage *msg;
1159   int ret;
1160   char *emsg;
1161
1162   GNUNET_STATISTICS_update (stats,
1163                             gettext_noop ("# UPDATE requests received"),
1164                             1,
1165                             GNUNET_NO);
1166   msg = (const struct UpdateMessage*) message;
1167   emsg = NULL;
1168 #if DEBUG_DATASTORE
1169   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1170               "Processing `%s' request for %llu\n",
1171               "UPDATE",
1172               (unsigned long long) GNUNET_ntohll (msg->uid));
1173 #endif
1174   ret = plugin->api->update (plugin->api->cls,
1175                              GNUNET_ntohll(msg->uid),
1176                              (int32_t) ntohl(msg->priority),
1177                              GNUNET_TIME_absolute_ntoh(msg->expiration),
1178                              &emsg);
1179   transmit_status (client, ret, emsg);
1180   GNUNET_free_non_null (emsg);
1181 }
1182
1183
1184 /**
1185  * Handle GET_RANDOM-message.
1186  *
1187  * @param cls closure
1188  * @param client identification of the client
1189  * @param message the actual message
1190  */
1191 static void
1192 handle_get_random (void *cls,
1193                    struct GNUNET_SERVER_Client *client,
1194                    const struct GNUNET_MessageHeader *message)
1195 {
1196 #if DEBUG_DATASTORE
1197   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1198               "Processing `%s' request\n",
1199               "GET_RANDOM");
1200 #endif
1201   GNUNET_STATISTICS_update (stats,
1202                             gettext_noop ("# GET RANDOM requests received"),
1203                             1,
1204                             GNUNET_NO);
1205   GNUNET_SERVER_client_keep (client);
1206   plugin->api->iter_migration_order (plugin->api->cls,
1207                                      0,
1208                                      &transmit_item,
1209                                      client);  
1210 }
1211
1212
1213 /**
1214  * Context for the 'remove_callback'.
1215  */
1216 struct RemoveContext 
1217 {
1218   /**
1219    * Client for whom we're doing the remvoing.
1220    */
1221   struct GNUNET_SERVER_Client *client;
1222
1223   /**
1224    * GNUNET_YES if we managed to remove something.
1225    */
1226   int found;
1227 };
1228
1229
1230 /**
1231  * Callback function that will cause the item that is passed
1232  * in to be deleted (by returning GNUNET_NO).
1233  */
1234 static int
1235 remove_callback (void *cls,
1236                  void *next_cls,
1237                  const GNUNET_HashCode * key,
1238                  uint32_t size,
1239                  const void *data,
1240                  enum GNUNET_BLOCK_Type type,
1241                  uint32_t priority,
1242                  uint32_t anonymity,
1243                  struct GNUNET_TIME_Absolute
1244                  expiration, uint64_t uid)
1245 {
1246   struct RemoveContext *rc = cls;
1247
1248   if (key == NULL)
1249     {
1250 #if DEBUG_DATASTORE
1251       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1252                   "No further matches for `%s' request.\n",
1253                   "REMOVE");
1254 #endif  
1255       if (GNUNET_YES == rc->found)
1256         transmit_status (rc->client, GNUNET_OK, NULL);       
1257       else
1258         transmit_status (rc->client, GNUNET_NO, _("Content not found"));        
1259       GNUNET_SERVER_client_drop (rc->client);
1260       GNUNET_free (rc);
1261       return GNUNET_OK; /* last item */
1262     }
1263   rc->found = GNUNET_YES;
1264 #if DEBUG_DATASTORE
1265   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1266               "Item %llu matches `%s' request for key `%s' and type %u.\n",
1267               (unsigned long long) uid,
1268               "REMOVE",
1269               GNUNET_h2s (key),
1270               type);
1271 #endif  
1272   GNUNET_STATISTICS_update (stats,
1273                             gettext_noop ("# bytes removed (explicit request)"),
1274                             size,
1275                             GNUNET_YES);
1276   GNUNET_CONTAINER_bloomfilter_remove (filter,
1277                                        key);
1278   plugin->api->next_request (next_cls, GNUNET_YES);
1279   return GNUNET_NO;
1280 }
1281
1282
1283 /**
1284  * Handle REMOVE-message.
1285  *
1286  * @param cls closure
1287  * @param client identification of the client
1288  * @param message the actual message
1289  */
1290 static void
1291 handle_remove (void *cls,
1292              struct GNUNET_SERVER_Client *client,
1293              const struct GNUNET_MessageHeader *message)
1294 {
1295   const struct DataMessage *dm = check_data (message);
1296   GNUNET_HashCode vhash;
1297   struct RemoveContext *rc;
1298
1299   if (dm == NULL)
1300     {
1301       GNUNET_break (0);
1302       GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1303       return;
1304     }
1305 #if DEBUG_DATASTORE
1306   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1307               "Processing `%s' request for `%s' of type %u\n",
1308               "REMOVE",
1309               GNUNET_h2s (&dm->key),
1310               ntohl (dm->type));
1311 #endif
1312   GNUNET_STATISTICS_update (stats,
1313                             gettext_noop ("# REMOVE requests received"),
1314                             1,
1315                             GNUNET_NO);
1316   rc = GNUNET_malloc (sizeof(struct RemoveContext));
1317   GNUNET_SERVER_client_keep (client);
1318   rc->client = client;
1319   GNUNET_CRYPTO_hash (&dm[1],
1320                       ntohl(dm->size),
1321                       &vhash);
1322   plugin->api->get (plugin->api->cls,
1323                     &dm->key,
1324                     &vhash,
1325                     ntohl(dm->type),
1326                     &remove_callback,
1327                     rc);
1328 }
1329
1330
1331 /**
1332  * Handle DROP-message.
1333  *
1334  * @param cls closure
1335  * @param client identification of the client
1336  * @param message the actual message
1337  */
1338 static void
1339 handle_drop (void *cls,
1340              struct GNUNET_SERVER_Client *client,
1341              const struct GNUNET_MessageHeader *message)
1342 {
1343 #if DEBUG_DATASTORE
1344   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1345               "Processing `%s' request\n",
1346               "DROP");
1347 #endif
1348   plugin->api->drop (plugin->api->cls);
1349   GNUNET_SERVER_receive_done (client, GNUNET_OK);
1350 }
1351
1352
1353 /**
1354  * Load the datastore plugin.
1355  */
1356 static struct DatastorePlugin *
1357 load_plugin () 
1358 {
1359   struct DatastorePlugin *ret;
1360   char *libname;
1361   char *name;
1362
1363   if (GNUNET_OK !=
1364       GNUNET_CONFIGURATION_get_value_string (cfg,
1365                                              "DATASTORE", "DATABASE", &name))
1366     {
1367       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1368                   _("No `%s' specified for `%s' in configuration!\n"),
1369                   "DATABASE",
1370                   "DATASTORE");
1371       return NULL;
1372     }
1373   ret = GNUNET_malloc (sizeof(struct DatastorePlugin));
1374   ret->env.cfg = cfg;
1375   ret->env.sched = sched;  
1376   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1377               _("Loading `%s' datastore plugin\n"), name);
1378   GNUNET_asprintf (&libname, "libgnunet_plugin_datastore_%s", name);
1379   ret->short_name = name;
1380   ret->lib_name = libname;
1381   ret->api = GNUNET_PLUGIN_load (libname, &ret->env);
1382   if (ret->api == NULL)
1383     {
1384       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1385                   _("Failed to load datastore plugin for `%s'\n"), name);
1386       GNUNET_free (ret->short_name);
1387       GNUNET_free (libname);
1388       GNUNET_free (ret);
1389       return NULL;
1390     }
1391   return ret;
1392 }
1393
1394
1395 /**
1396  * Function called when the service shuts
1397  * down.  Unloads our datastore plugin.
1398  *
1399  * @param plug plugin to unload
1400  */
1401 static void
1402 unload_plugin (struct DatastorePlugin *plug)
1403 {
1404 #if DEBUG_DATASTORE
1405   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1406               "Datastore service is unloading plugin...\n");
1407 #endif
1408   GNUNET_break (NULL == GNUNET_PLUGIN_unload (plug->lib_name, plug->api));
1409   GNUNET_free (plug->lib_name);
1410   GNUNET_free (plug->short_name);
1411   GNUNET_free (plug);
1412 }
1413
1414
1415 /**
1416  * Final task run after shutdown.  Unloads plugins and disconnects us from
1417  * statistics.
1418  */
1419 static void
1420 unload_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1421 {
1422   unload_plugin (plugin);
1423   plugin = NULL;
1424   if (filter != NULL)
1425     {
1426       GNUNET_CONTAINER_bloomfilter_free (filter);
1427       filter = NULL;
1428     }
1429   if (stats != NULL)
1430     {
1431       GNUNET_STATISTICS_destroy (stats, GNUNET_YES);
1432       stats = NULL;
1433     }
1434 }
1435
1436
1437 /**
1438  * Last task run during shutdown.  Disconnects us from
1439  * the transport and core.
1440  */
1441 static void
1442 cleaning_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1443 {
1444   struct TransmitCallbackContext *tcc;
1445
1446   cleaning_done = GNUNET_YES;
1447   while (NULL != (tcc = tcc_head))
1448     {
1449       GNUNET_CONTAINER_DLL_remove (tcc_head,
1450                                    tcc_tail,
1451                                    tcc);
1452       if (tcc->th != NULL)
1453         {
1454           GNUNET_CONNECTION_notify_transmit_ready_cancel (tcc->th);
1455           GNUNET_SERVER_client_drop (tcc->client);
1456         }
1457       if (NULL != tcc->tc)
1458         tcc->tc (tcc->tc_cls, GNUNET_SYSERR);
1459       GNUNET_free (tcc->msg);
1460       GNUNET_free (tcc);
1461     }
1462   if (expired_kill_task != GNUNET_SCHEDULER_NO_TASK)
1463     {
1464       GNUNET_SCHEDULER_cancel (sched,
1465                                expired_kill_task);
1466       expired_kill_task = GNUNET_SCHEDULER_NO_TASK;
1467     }
1468   GNUNET_SCHEDULER_add_continuation (sched,
1469                                      &unload_task,
1470                                      NULL,
1471                                      GNUNET_SCHEDULER_REASON_PREREQ_DONE);
1472 }
1473
1474
1475 /**
1476  * Function that removes all active reservations made
1477  * by the given client and releases the space for other
1478  * requests.
1479  *
1480  * @param cls closure
1481  * @param client identification of the client
1482  */
1483 static void
1484 cleanup_reservations (void *cls,
1485                       struct GNUNET_SERVER_Client
1486                       * client)
1487 {
1488   struct ReservationList *pos;
1489   struct ReservationList *prev;
1490   struct ReservationList *next;
1491
1492   if (client == NULL)
1493     return;
1494   prev = NULL;
1495   pos = reservations;
1496   while (NULL != pos)
1497     {
1498       next = pos->next;
1499       if (pos->client == client)
1500         {
1501           if (prev == NULL)
1502             reservations = next;
1503           else
1504             prev->next = next;
1505           reserved -= pos->amount + pos->entries * GNUNET_DATASTORE_ENTRY_OVERHEAD;
1506           GNUNET_free (pos);
1507         }
1508       else
1509         {
1510           prev = pos;
1511         }
1512       pos = next;
1513     }
1514   GNUNET_STATISTICS_set (stats,
1515                          gettext_noop ("# reserved"),
1516                          reserved,
1517                          GNUNET_NO);
1518 }
1519
1520
1521 /**
1522  * Process datastore requests.
1523  *
1524  * @param cls closure
1525  * @param s scheduler to use
1526  * @param server the initialized server
1527  * @param c configuration to use
1528  */
1529 static void
1530 run (void *cls,
1531      struct GNUNET_SCHEDULER_Handle *s,
1532      struct GNUNET_SERVER_Handle *server,
1533      const struct GNUNET_CONFIGURATION_Handle *c)
1534 {
1535   static const struct GNUNET_SERVER_MessageHandler handlers[] = {
1536     {&handle_reserve, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_RESERVE, 
1537      sizeof(struct ReserveMessage) }, 
1538     {&handle_release_reserve, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_RELEASE_RESERVE, 
1539      sizeof(struct ReleaseReserveMessage) }, 
1540     {&handle_put, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_PUT, 0 }, 
1541     {&handle_update, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_UPDATE, 
1542      sizeof (struct UpdateMessage) }, 
1543     {&handle_get, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_GET, 0 }, 
1544     {&handle_get_random, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_GET_RANDOM, 
1545      sizeof(struct GNUNET_MessageHeader) }, 
1546     {&handle_remove, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE, 0 }, 
1547     {&handle_drop, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_DROP, 
1548      sizeof(struct GNUNET_MessageHeader) }, 
1549     {NULL, NULL, 0, 0}
1550   };
1551   char *fn;
1552   unsigned int bf_size;
1553
1554   sched = s;
1555   cfg = c;
1556   if (GNUNET_OK !=
1557       GNUNET_CONFIGURATION_get_value_number (cfg,
1558                                              "DATASTORE", "QUOTA", &quota))
1559     {
1560       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1561                   _("No `%s' specified for `%s' in configuration!\n"),
1562                   "QUOTA",
1563                   "DATASTORE");
1564       return;
1565     }
1566   stats = GNUNET_STATISTICS_create (sched, "datastore", cfg);
1567   GNUNET_STATISTICS_set (stats,
1568                          gettext_noop ("# quota"),
1569                          quota,
1570                          GNUNET_NO);
1571   cache_size = quota / 8; /* Or should we make this an option? */
1572   GNUNET_STATISTICS_set (stats,
1573                          gettext_noop ("# cache size"),
1574                          cache_size,
1575                          GNUNET_NO);
1576   bf_size = quota / 32; /* 8 bit per entry, 1 bit per 32 kb in DB */
1577   fn = NULL;
1578   if ( (GNUNET_OK !=
1579         GNUNET_CONFIGURATION_get_value_filename (cfg,
1580                                                  "DATASTORE",
1581                                                  "BLOOMFILTER",
1582                                                  &fn)) ||
1583        (GNUNET_OK !=
1584         GNUNET_DISK_directory_create_for_file (fn)) )
1585     {
1586       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1587                   _("Could not use specified filename `%s' for bloomfilter.\n"),
1588                   fn != NULL ? fn : "");
1589       GNUNET_free_non_null (fn);
1590       fn = NULL;
1591     }
1592   filter = GNUNET_CONTAINER_bloomfilter_load (fn, bf_size, 5);  /* approx. 3% false positives at max use */  
1593   GNUNET_free_non_null (fn);
1594   if (filter == NULL)
1595     {
1596       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1597                   _("Failed to initialize bloomfilter.\n"));
1598       if (stats != NULL)
1599         {
1600           GNUNET_STATISTICS_destroy (stats, GNUNET_YES);
1601           stats = NULL;
1602         }
1603       return;
1604     }
1605   plugin = load_plugin ();
1606   if (NULL == plugin)
1607     {
1608       GNUNET_CONTAINER_bloomfilter_free (filter);
1609       filter = NULL;
1610       if (stats != NULL)
1611         {
1612           GNUNET_STATISTICS_destroy (stats, GNUNET_YES);
1613           stats = NULL;
1614         }
1615       return;
1616     }
1617   GNUNET_SERVER_disconnect_notify (server, &cleanup_reservations, NULL);
1618   GNUNET_SERVER_add_handlers (server, handlers);
1619   expired_kill_task
1620     = GNUNET_SCHEDULER_add_with_priority (sched,
1621                                           GNUNET_SCHEDULER_PRIORITY_IDLE,
1622                                           &delete_expired, NULL);
1623   GNUNET_SCHEDULER_add_delayed (sched,
1624                                 GNUNET_TIME_UNIT_FOREVER_REL,
1625                                 &cleaning_task, NULL);
1626   
1627 }
1628
1629
1630 /**
1631  * The main function for the datastore service.
1632  *
1633  * @param argc number of arguments from the command line
1634  * @param argv command line arguments
1635  * @return 0 ok, 1 on error
1636  */
1637 int
1638 main (int argc, char *const *argv)
1639 {
1640   int ret;
1641
1642   ret = (GNUNET_OK ==
1643          GNUNET_SERVICE_run (argc,
1644                              argv,
1645                              "datastore",
1646                              GNUNET_SERVICE_OPTION_NONE,
1647                              &run, NULL)) ? 0 : 1;
1648   return ret;
1649 }
1650
1651
1652 /* end of gnunet-service-datastore.c */