fixes
[oweals/gnunet.git] / src / datastore / gnunet-service-datastore.c
1 /*
2      This file is part of GNUnet
3      (C) 2004, 2005, 2006, 2007, 2009 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 2, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file datastore/gnunet-service-datastore.c
23  * @brief Management for the datastore for files stored on a GNUnet node
24  * @author Christian Grothoff
25  */
26
27 #include "platform.h"
28 #include "gnunet_util_lib.h"
29 #include "gnunet_protocols.h"
30 #include "gnunet_statistics_service.h"
31 #include "plugin_datastore.h"
32 #include "datastore.h"
33
34 /**
35  * How many messages do we queue at most per client?
36  */
37 #define MAX_PENDING 1024
38
39 /**
40  * How long are we at most keeping "expired" content
41  * past the expiration date in the database?
42  */
43 #define MAX_EXPIRE_DELAY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 15)
44
45
46
47 /**
48  * Our datastore plugin.
49  */
50 struct DatastorePlugin
51 {
52
53   /**
54    * API of the transport as returned by the plugin's
55    * initialization function.
56    */
57   struct GNUNET_DATASTORE_PluginFunctions *api;
58
59   /**
60    * Short name for the plugin (i.e. "sqlite").
61    */
62   char *short_name;
63
64   /**
65    * Name of the library (i.e. "gnunet_plugin_datastore_sqlite").
66    */
67   char *lib_name;
68
69   /**
70    * Environment this transport service is using
71    * for this plugin.
72    */
73   struct GNUNET_DATASTORE_PluginEnvironment env;
74
75 };
76
77
78 /**
79  * Linked list of active reservations.
80  */
81 struct ReservationList 
82 {
83
84   /**
85    * This is a linked list.
86    */
87   struct ReservationList *next;
88
89   /**
90    * Client that made the reservation.
91    */
92   struct GNUNET_SERVER_Client *client;
93
94   /**
95    * Number of bytes (still) reserved.
96    */
97   uint64_t amount;
98
99   /**
100    * Number of items (still) reserved.
101    */
102   uint64_t entries;
103
104   /**
105    * Reservation identifier.
106    */
107   int32_t rid;
108
109 };
110
111
112 /**
113  * Our datastore plugin (NULL if not available).
114  */
115 static struct DatastorePlugin *plugin;
116
117 /**
118  * Linked list of space reservations made by clients.
119  */
120 static struct ReservationList *reservations;
121
122 /**
123  * Bloomfilter to quickly tell if we don't have the content.
124  */
125 static struct GNUNET_CONTAINER_BloomFilter *filter;
126
127 /**
128  * Static counter to produce reservation identifiers.
129  */
130 static int reservation_gen;
131
132 /**
133  * How much space are we allowed to use?
134  */
135 static unsigned long long quota;
136
137 /**
138  * How much space are we using for the cache?  (space available for
139  * insertions that will be instantly reclaimed by discarding less
140  * important content --- or possibly whatever we just inserted into
141  * the "cache").
142  */
143 static unsigned long long cache_size;
144
145 /**
146  * How much space have we currently reserved?
147  */
148 static unsigned long long reserved;
149
150 /**
151  * Identity of the task that is used to delete
152  * expired content.
153  */
154 static GNUNET_SCHEDULER_TaskIdentifier expired_kill_task;
155
156 /**
157  * Our configuration.
158  */
159 const struct GNUNET_CONFIGURATION_Handle *cfg;
160
161 /**
162  * Our scheduler.
163  */
164 struct GNUNET_SCHEDULER_Handle *sched; 
165
166 /**
167  * Handle for reporting statistics.
168  */
169 static struct GNUNET_STATISTICS_Handle *stats;
170
171
172 /**
173  * Function called once the transmit operation has
174  * either failed or succeeded.
175  *
176  * @param cls closure
177  * @param status GNUNET_OK on success, GNUNET_SYSERR on error
178  */
179 typedef void (*TransmitContinuation)(void *cls,
180                                      int status);
181
182
183 /**
184  * Context for transmitting replies to clients.
185  */
186 struct TransmitCallbackContext 
187 {
188   
189   /**
190    * We keep these in a doubly-linked list (for cleanup).
191    */
192   struct TransmitCallbackContext *next;
193   
194   /**
195    * We keep these in a doubly-linked list (for cleanup).
196    */
197   struct TransmitCallbackContext *prev;
198   
199   /**
200    * The message that we're asked to transmit.
201    */
202   struct GNUNET_MessageHeader *msg;
203   
204   /**
205    * Handle for the transmission request.
206    */
207   struct GNUNET_CONNECTION_TransmitHandle *th;
208
209   /**
210    * Client that we are transmitting to.
211    */
212   struct GNUNET_SERVER_Client *client;
213
214   /**
215    * Function to call once msg has been transmitted
216    * (or at least added to the buffer).
217    */
218   TransmitContinuation tc;
219
220   /**
221    * Closure for tc.
222    */
223   void *tc_cls;
224
225   /**
226    * GNUNET_YES if we are supposed to signal the server
227    * completion of the client's request.
228    */
229   int end;
230 };
231
232   
233 /**
234  * Head of the doubly-linked list (for cleanup).
235  */
236 static struct TransmitCallbackContext *tcc_head;
237
238 /**
239  * Tail of the doubly-linked list (for cleanup).
240  */
241 static struct TransmitCallbackContext *tcc_tail;
242
243 /**
244  * Have we already cleaned up the TCCs and are hence no longer
245  * willing (or able) to transmit anything to anyone?
246  */
247 static int cleaning_done;
248
249 /**
250  * Task that is used to remove expired entries from
251  * the datastore.  This task will schedule itself
252  * again automatically to always delete all expired
253  * content quickly.
254  *
255  * @param cls not used
256  * @param tc task context
257  */ 
258 static void
259 delete_expired (void *cls,
260                 const struct GNUNET_SCHEDULER_TaskContext *tc);
261
262
263 /**
264  * Iterate over the expired items stored in the datastore.
265  * Delete all expired items; once we have processed all
266  * expired items, re-schedule the "delete_expired" task.
267  *
268  * @param cls not used
269  * @param next_cls closure to pass to the "next" function.
270  * @param key key for the content
271  * @param size number of bytes in data
272  * @param data content stored
273  * @param type type of the content
274  * @param priority priority of the content
275  * @param anonymity anonymity-level for the content
276  * @param expiration expiration time for the content
277  * @param uid unique identifier for the datum;
278  *        maybe 0 if no unique identifier is available
279  *
280  * @return GNUNET_SYSERR to abort the iteration, GNUNET_OK to continue
281  *         (continue on call to "next", of course),
282  *         GNUNET_NO to delete the item and continue (if supported)
283  */
284 static int 
285 expired_processor (void *cls,
286                    void *next_cls,
287                    const GNUNET_HashCode * key,
288                    uint32_t size,
289                    const void *data,
290                    enum GNUNET_BLOCK_Type type,
291                    uint32_t priority,
292                    uint32_t anonymity,
293                    struct GNUNET_TIME_Absolute
294                    expiration, 
295                    uint64_t uid)
296 {
297   struct GNUNET_TIME_Absolute now;
298
299   if (key == NULL) 
300     {
301       expired_kill_task 
302         = GNUNET_SCHEDULER_add_delayed (sched,
303                                         MAX_EXPIRE_DELAY,
304                                         &delete_expired,
305                                         NULL);
306       return GNUNET_SYSERR;
307     }
308   now = GNUNET_TIME_absolute_get ();
309   if (expiration.value > now.value)
310     {
311       /* finished processing */
312       plugin->api->next_request (next_cls, GNUNET_YES);
313       return GNUNET_SYSERR;
314     }
315   plugin->api->next_request (next_cls, GNUNET_NO);
316 #if DEBUG_DATASTORE
317   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
318               "Deleting content that expired %llu ms ago\n",
319               (unsigned long long) (now.value - expiration.value));
320 #endif
321   GNUNET_STATISTICS_update (stats,
322                             gettext_noop ("# bytes expired"),
323                             size,
324                             GNUNET_YES);
325   GNUNET_CONTAINER_bloomfilter_remove (filter,
326                                        key);
327   return GNUNET_NO; /* delete */
328 }
329
330
331 /**
332  * Task that is used to remove expired entries from
333  * the datastore.  This task will schedule itself
334  * again automatically to always delete all expired
335  * content quickly.
336  *
337  * @param cls not used
338  * @param tc task context
339  */ 
340 static void
341 delete_expired (void *cls,
342                 const struct GNUNET_SCHEDULER_TaskContext *tc)
343 {
344   expired_kill_task = GNUNET_SCHEDULER_NO_TASK;
345   plugin->api->iter_ascending_expiration (plugin->api->cls, 
346                                           0,
347                                           &expired_processor,
348                                           NULL);
349 }
350
351
352 /**
353  * An iterator over a set of items stored in the datastore.
354  *
355  * @param cls closure
356  * @param next_cls closure to pass to the "next" function.
357  * @param key key for the content
358  * @param size number of bytes in data
359  * @param data content stored
360  * @param type type of the content
361  * @param priority priority of the content
362  * @param anonymity anonymity-level for the content
363  * @param expiration expiration time for the content
364  * @param uid unique identifier for the datum;
365  *        maybe 0 if no unique identifier is available
366  *
367  * @return GNUNET_SYSERR to abort the iteration, GNUNET_OK to continue
368  *         (continue on call to "next", of course),
369  *         GNUNET_NO to delete the item and continue (if supported)
370  */
371 static int 
372 manage (void *cls,
373         void *next_cls,
374         const GNUNET_HashCode * key,
375         uint32_t size,
376         const void *data,
377         enum GNUNET_BLOCK_Type type,
378         uint32_t priority,
379         uint32_t anonymity,
380         struct GNUNET_TIME_Absolute
381         expiration, 
382         uint64_t uid)
383 {
384   unsigned long long *need = cls;
385
386   if (NULL == key)
387     {
388       GNUNET_free (need);
389       return GNUNET_SYSERR;
390     }
391   if (size + GNUNET_DATASTORE_ENTRY_OVERHEAD > *need)
392     *need = 0;
393   else
394     *need -= size + GNUNET_DATASTORE_ENTRY_OVERHEAD;
395   plugin->api->next_request (next_cls, 
396                              (0 == *need) ? GNUNET_YES : GNUNET_NO);
397 #if DEBUG_DATASTORE
398   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
399               "Deleting %llu bytes of low-priority content (still trying to free another %llu bytes)\n",
400               size + GNUNET_DATASTORE_ENTRY_OVERHEAD,
401               *need);
402 #endif
403   GNUNET_STATISTICS_update (stats,
404                             gettext_noop ("# bytes purged (low-priority)"),
405                             size,
406                             GNUNET_YES);
407   GNUNET_CONTAINER_bloomfilter_remove (filter,
408                                        key);
409   return GNUNET_NO;
410 }
411
412
413 /**
414  * Manage available disk space by running tasks
415  * that will discard content if necessary.  This
416  * function will be run whenever a request for
417  * "need" bytes of storage could only be satisfied
418  * by eating into the "cache" (and we want our cache
419  * space back).
420  *
421  * @param need number of bytes of content that were
422  *        placed into the "cache" (and hence the
423  *        number of bytes that should be removed).
424  */
425 static void
426 manage_space (unsigned long long need)
427 {
428   unsigned long long *n;
429
430 #if DEBUG_DATASTORE
431   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
432               "Asked to free up %llu bytes of cache space\n",
433               need);
434 #endif
435   n = GNUNET_malloc (sizeof(unsigned long long));
436   *n = need;
437   plugin->api->iter_low_priority (plugin->api->cls,
438                                   0,
439                                   &manage,
440                                   n);
441 }
442
443
444 /**
445  * Function called to notify a client about the socket
446  * begin ready to queue more data.  "buf" will be
447  * NULL and "size" zero if the socket was closed for
448  * writing in the meantime.
449  *
450  * @param cls closure
451  * @param size number of bytes available in buf
452  * @param buf where the callee should write the message
453  * @return number of bytes written to buf
454  */
455 static size_t
456 transmit_callback (void *cls,
457                    size_t size, void *buf)
458 {
459   struct TransmitCallbackContext *tcc = cls;
460   size_t msize;
461   
462   tcc->th = NULL;
463   GNUNET_CONTAINER_DLL_remove (tcc_head,
464                                tcc_tail,
465                                tcc);
466   msize = ntohs(tcc->msg->size);
467   if (size == 0)
468     {
469 #if DEBUG_DATASTORE
470       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
471                   "Transmission failed.\n");
472 #endif
473       if (tcc->tc != NULL)
474         tcc->tc (tcc->tc_cls, GNUNET_SYSERR);
475       if (GNUNET_YES == tcc->end)
476         GNUNET_SERVER_receive_done (tcc->client, GNUNET_SYSERR);       
477       GNUNET_SERVER_client_drop (tcc->client);
478       GNUNET_free (tcc->msg);
479       GNUNET_free (tcc);
480       return 0;
481     }
482   GNUNET_assert (size >= msize);
483   memcpy (buf, tcc->msg, msize);
484   if (tcc->tc != NULL)
485     tcc->tc (tcc->tc_cls, GNUNET_OK);
486   if (GNUNET_YES == tcc->end)
487     {
488       GNUNET_SERVER_receive_done (tcc->client, GNUNET_OK);
489     }
490   else
491     {
492 #if DEBUG_DATASTORE
493       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
494                   "Response transmitted, more pending!\n");
495 #endif
496     }
497   GNUNET_SERVER_client_drop (tcc->client);
498   GNUNET_free (tcc->msg);
499   GNUNET_free (tcc);
500   return msize;
501 }
502
503
504 /**
505  * Transmit the given message to the client.
506  *
507  * @param client target of the message
508  * @param msg message to transmit, will be freed!
509  * @param tc function to call afterwards
510  * @param tc_cls closure for tc
511  * @param end is this the last response (and we should
512  *        signal the server completion accodingly after
513  *        transmitting this message)?
514  */
515 static void
516 transmit (struct GNUNET_SERVER_Client *client,
517           struct GNUNET_MessageHeader *msg,
518           TransmitContinuation tc,
519           void *tc_cls,
520           int end)
521 {
522   struct TransmitCallbackContext *tcc;
523
524   if (GNUNET_YES == cleaning_done)
525     {
526 #if DEBUG_DATASTORE
527       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
528                   "Shutdown in progress, aborting transmission.\n");
529 #endif
530       GNUNET_free (msg);
531       if (NULL != tc)
532         tc (tc_cls, GNUNET_SYSERR);
533       return;
534     }
535   tcc = GNUNET_malloc (sizeof(struct TransmitCallbackContext));
536   tcc->msg = msg;
537   tcc->client = client;
538   tcc->tc = tc;
539   tcc->tc_cls = tc_cls;
540   tcc->end = end;
541   if (NULL ==
542       (tcc->th = GNUNET_SERVER_notify_transmit_ready (client,
543                                                       ntohs(msg->size),
544                                                       GNUNET_TIME_UNIT_FOREVER_REL,
545                                                       &transmit_callback,
546                                                       tcc)))
547     {
548       GNUNET_break (0);
549       if (GNUNET_YES == end)
550         {
551 #if DEBUG_DATASTORE
552           GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
553                       "Disconnecting client.\n");
554 #endif    
555           GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
556         }
557       if (NULL != tc)
558         tc (tc_cls, GNUNET_SYSERR);
559       GNUNET_free (msg);
560       GNUNET_free (tcc);
561       return;
562     }
563   GNUNET_SERVER_client_keep (client);
564   GNUNET_CONTAINER_DLL_insert (tcc_head,
565                                tcc_tail,
566                                tcc);
567 }
568
569
570 /**
571  * Transmit a status code to the client.
572  *
573  * @param client receiver of the response
574  * @param code status code
575  * @param msg optional error message (can be NULL)
576  */
577 static void
578 transmit_status (struct GNUNET_SERVER_Client *client,
579                  int code,
580                  const char *msg)
581 {
582   struct StatusMessage *sm;
583   size_t slen;
584
585 #if DEBUG_DATASTORE
586   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
587               "Transmitting `%s' message with value %d and message `%s'\n",
588               "STATUS",
589               code,
590               msg != NULL ? msg : "(none)");
591 #endif
592   slen = (msg == NULL) ? 0 : strlen(msg) + 1;  
593   sm = GNUNET_malloc (sizeof(struct StatusMessage) + slen);
594   sm->header.size = htons(sizeof(struct StatusMessage) + slen);
595   sm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_STATUS);
596   sm->status = htonl(code);
597   if (slen > 0)
598     memcpy (&sm[1], msg, slen);  
599   transmit (client, &sm->header, NULL, NULL, GNUNET_YES);
600 }
601
602
603 /**
604  * Function called once the transmit operation has
605  * either failed or succeeded.
606  *
607  * @param next_cls closure for calling "next_request" callback
608  * @param status GNUNET_OK on success, GNUNET_SYSERR on error
609  */
610 static void 
611 get_next(void *next_cls,
612          int status)
613 {
614   if (status != GNUNET_OK)
615     {
616       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
617                   _("Failed to transmit an item to the client; aborting iteration.\n"));
618       if (plugin != NULL)
619         plugin->api->next_request (next_cls, GNUNET_YES);
620       return;
621     }
622   plugin->api->next_request (next_cls, GNUNET_NO);
623 }
624
625
626 /**
627  * Function that will transmit the given datastore entry
628  * to the client.
629  *
630  * @param cls closure, pointer to the client (of type GNUNET_SERVER_Client).
631  * @param next_cls closure to use to ask for the next item
632  * @param key key for the content
633  * @param size number of bytes in data
634  * @param data content stored
635  * @param type type of the content
636  * @param priority priority of the content
637  * @param anonymity anonymity-level for the content
638  * @param expiration expiration time for the content
639  * @param uid unique identifier for the datum;
640  *        maybe 0 if no unique identifier is available
641  *
642  * @return GNUNET_SYSERR to abort the iteration, GNUNET_OK to continue,
643  *         GNUNET_NO to delete the item and continue (if supported)
644  */
645 static int
646 transmit_item (void *cls,
647                void *next_cls,
648                const GNUNET_HashCode * key,
649                uint32_t size,
650                const void *data,
651                enum GNUNET_BLOCK_Type type,
652                uint32_t priority,
653                uint32_t anonymity,
654                struct GNUNET_TIME_Absolute
655                expiration, uint64_t uid)
656 {
657   struct GNUNET_SERVER_Client *client = cls;
658   struct GNUNET_MessageHeader *end;
659   struct DataMessage *dm;
660
661   if (key == NULL)
662     {
663       /* transmit 'DATA_END' */
664 #if DEBUG_DATASTORE
665       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
666                   "Transmitting `%s' message\n",
667                   "DATA_END");
668 #endif
669       end = GNUNET_malloc (sizeof(struct GNUNET_MessageHeader));
670       end->size = htons(sizeof(struct GNUNET_MessageHeader));
671       end->type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END);
672       transmit (client, end, NULL, NULL, GNUNET_YES);
673       GNUNET_SERVER_client_drop (client);
674       return GNUNET_OK;
675     }
676   dm = GNUNET_malloc (sizeof(struct DataMessage) + size);
677   dm->header.size = htons(sizeof(struct DataMessage) + size);
678   dm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_DATA);
679   dm->rid = htonl(0);
680   dm->size = htonl(size);
681   dm->type = htonl(type);
682   dm->priority = htonl(priority);
683   dm->anonymity = htonl(anonymity);
684   dm->expiration = GNUNET_TIME_absolute_hton(expiration);
685   dm->uid = GNUNET_htonll(uid);
686   dm->key = *key;
687   memcpy (&dm[1], data, size);
688 #if DEBUG_DATASTORE
689   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
690               "Transmitting `%s' message\n",
691               "DATA");
692 #endif
693   GNUNET_STATISTICS_update (stats,
694                             gettext_noop ("# results found"),
695                             1,
696                             GNUNET_NO);
697   transmit (client, &dm->header, &get_next, next_cls, GNUNET_NO);
698   return GNUNET_OK;
699 }
700
701
702 /**
703  * Handle RESERVE-message.
704  *
705  * @param cls closure
706  * @param client identification of the client
707  * @param message the actual message
708  */
709 static void
710 handle_reserve (void *cls,
711                 struct GNUNET_SERVER_Client *client,
712                 const struct GNUNET_MessageHeader *message)
713 {
714   const struct ReserveMessage *msg = (const struct ReserveMessage*) message;
715   struct ReservationList *e;
716   unsigned long long used;
717   unsigned long long req;
718   uint64_t amount;
719   uint32_t entries;
720
721 #if DEBUG_DATASTORE
722   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
723               "Processing `%s' request\n",
724               "RESERVE");
725 #endif
726   amount = GNUNET_ntohll(msg->amount);
727   entries = ntohl(msg->entries);
728   used = plugin->api->get_size (plugin->api->cls) + reserved;
729   req = amount + ((unsigned long long) GNUNET_DATASTORE_ENTRY_OVERHEAD) * entries;
730   if (used + req > quota)
731     {
732       if (quota < used)
733         used = quota; /* cheat a bit for error message (to avoid negative numbers) */
734       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
735                   _("Insufficient space (%llu bytes are available) to satisfy `%s' request for %llu bytes\n"),
736                   quota - used,
737                   "RESERVE",
738                   req);
739       if (cache_size < req)
740         {
741           /* TODO: document this in the FAQ; essentially, if this
742              message happens, the insertion request could be blocked
743              by less-important content from migration because it is
744              larger than 1/8th of the overall available space, and
745              we only reserve 1/8th for "fresh" insertions */
746           GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
747                       _("The requested amount (%llu bytes) is larger than the cache size (%llu bytes)\n"),
748                       req,
749                       cache_size);
750           transmit_status (client, 0, 
751                            gettext_noop ("Insufficient space to satisfy request and "
752                                          "requested amount is larger than cache size"));
753         }
754       else
755         {
756           transmit_status (client, 0, 
757                            gettext_noop ("Insufficient space to satisfy request"));
758         }
759       return;      
760     }
761   reserved += req;
762   e = GNUNET_malloc (sizeof(struct ReservationList));
763   e->next = reservations;
764   reservations = e;
765   e->client = client;
766   e->amount = amount;
767   e->entries = entries;
768   e->rid = ++reservation_gen;
769   if (reservation_gen < 0)
770     reservation_gen = 0; /* wrap around */
771   transmit_status (client, e->rid, NULL);
772 }
773
774
775 /**
776  * Handle RELEASE_RESERVE-message.
777  *
778  * @param cls closure
779  * @param client identification of the client
780  * @param message the actual message
781  */
782 static void
783 handle_release_reserve (void *cls,
784                         struct GNUNET_SERVER_Client *client,
785                         const struct GNUNET_MessageHeader *message)
786 {
787   const struct ReleaseReserveMessage *msg = (const struct ReleaseReserveMessage*) message;
788   struct ReservationList *pos;
789   struct ReservationList *prev;
790   struct ReservationList *next;
791   int rid = ntohl(msg->rid);
792   unsigned long long rem;
793
794 #if DEBUG_DATASTORE
795   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
796               "Processing `%s' request\n",
797               "RELEASE_RESERVE");
798 #endif
799   next = reservations;
800   prev = NULL;
801   while (NULL != (pos = next))
802     {
803       next = pos->next;
804       if (rid == pos->rid)
805         {
806           if (prev == NULL)
807             reservations = next;
808           else
809             prev->next = next;
810           rem = pos->amount + ((unsigned long long) GNUNET_DATASTORE_ENTRY_OVERHEAD) * pos->entries;
811           GNUNET_assert (reserved >= rem);
812           reserved -= rem;
813 #if DEBUG_DATASTORE
814           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
815                       "Returning %llu remaining reserved bytes to storage pool\n",
816                       rem);
817 #endif    
818           GNUNET_free (pos);
819           transmit_status (client, GNUNET_OK, NULL);
820           return;
821         }       
822       prev = pos;
823     }
824   GNUNET_break (0);
825   transmit_status (client, GNUNET_SYSERR, gettext_noop ("Could not find matching reservation"));
826 }
827
828
829 /**
830  * Check that the given message is a valid data message.
831  *
832  * @return NULL if the message is not well-formed, otherwise the message
833  */
834 static const struct DataMessage *
835 check_data (const struct GNUNET_MessageHeader *message)
836 {
837   uint16_t size;
838   uint32_t dsize;
839   const struct DataMessage *dm;
840
841   size = ntohs(message->size);
842   if (size < sizeof(struct DataMessage))
843     { 
844       GNUNET_break (0);
845       return NULL;
846     }
847   dm = (const struct DataMessage *) message;
848   dsize = ntohl(dm->size);
849   if (size != dsize + sizeof(struct DataMessage))
850     {
851       GNUNET_break (0);
852       return NULL;
853     }
854   return dm;
855 }
856
857
858 /**
859  * Handle PUT-message.
860  *
861  * @param cls closure
862  * @param client identification of the client
863  * @param message the actual message
864  */
865 static void
866 handle_put (void *cls,
867             struct GNUNET_SERVER_Client *client,
868             const struct GNUNET_MessageHeader *message)
869 {
870   const struct DataMessage *dm = check_data (message);
871   char *msg;
872   int ret;
873   int rid;
874   struct ReservationList *pos;
875   uint32_t size;
876
877   if ( (dm == NULL) ||
878        (ntohl(dm->type) == 0) ) 
879     {
880       GNUNET_break (0);
881       GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
882       return;
883     }
884 #if DEBUG_DATASTORE
885   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
886               "Processing `%s' request for `%s'\n",
887               "PUT",
888               GNUNET_h2s (&dm->key));
889 #endif
890   rid = ntohl(dm->rid);
891   size = ntohl(dm->size);
892   if (rid > 0)
893     {
894       pos = reservations;
895       while ( (NULL != pos) &&
896               (rid != pos->rid) )
897         pos = pos->next;
898       GNUNET_break (pos != NULL);
899       if (NULL != pos)
900         {
901           GNUNET_break (pos->entries > 0);
902           GNUNET_break (pos->amount > size);
903           pos->entries--;
904           pos->amount -= size;
905           reserved -= (size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
906         }
907     }
908   msg = NULL;
909   ret = plugin->api->put (plugin->api->cls,
910                           &dm->key,
911                           size,
912                           &dm[1],
913                           ntohl(dm->type),
914                           ntohl(dm->priority),
915                           ntohl(dm->anonymity),
916                           GNUNET_TIME_absolute_ntoh(dm->expiration),
917                           &msg);
918   if (GNUNET_OK == ret)
919     {
920       GNUNET_STATISTICS_update (stats,
921                                 gettext_noop ("# bytes stored"),
922                                 size,
923                                 GNUNET_YES);
924       GNUNET_CONTAINER_bloomfilter_add (filter,
925                                         &dm->key);
926 #if DEBUG_DATASTORE
927       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
928                   "Successfully stored %u bytes under key `%s'\n",
929                   size,
930                   GNUNET_h2s (&dm->key));
931 #endif
932     }
933   transmit_status (client, 
934                    (GNUNET_SYSERR == ret) ? GNUNET_SYSERR : GNUNET_OK, 
935                    msg);
936   GNUNET_free_non_null (msg);
937   if (quota - reserved - cache_size < plugin->api->get_size (plugin->api->cls))
938     manage_space (size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
939 }
940
941
942 /**
943  * Handle GET-message.
944  *
945  * @param cls closure
946  * @param client identification of the client
947  * @param message the actual message
948  */
949 static void
950 handle_get (void *cls,
951             struct GNUNET_SERVER_Client *client,
952             const struct GNUNET_MessageHeader *message)
953 {
954   const struct GetMessage *msg;
955   uint16_t size;
956
957   size = ntohs(message->size);
958   if ( (size != sizeof(struct GetMessage)) &&
959        (size != sizeof(struct GetMessage) - sizeof(GNUNET_HashCode)) )
960     {
961       GNUNET_break (0);
962       GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
963       return;
964     }
965   msg = (const struct GetMessage*) message;
966 #if DEBUG_DATASTORE
967   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
968               "Processing `%s' request for `%s' of type %u\n",
969               "GET",
970               GNUNET_h2s (&msg->key),
971               ntohl (msg->type));
972 #endif
973   GNUNET_STATISTICS_update (stats,
974                             gettext_noop ("# GET requests received"),
975                             1,
976                             GNUNET_NO);
977   GNUNET_SERVER_client_keep (client);
978   if ( (size == sizeof(struct GetMessage)) &&
979        (GNUNET_YES != GNUNET_CONTAINER_bloomfilter_test (filter,
980                                                          &msg->key)) )
981     {
982       /* don't bother database... */
983 #if DEBUG_DATASTORE
984       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
985                   "Empty result set for `%s' request for `%s'.\n",
986                   "GET",
987                   GNUNET_h2s (&msg->key));
988 #endif  
989       GNUNET_STATISTICS_update (stats,
990                                 gettext_noop ("# requests filtered by bloomfilter"),
991                                 1,
992                                 GNUNET_NO);
993       transmit_item (client,
994                      NULL, NULL, 0, NULL, 0, 0, 0, 
995                      GNUNET_TIME_UNIT_ZERO_ABS, 0);
996       return;
997     }
998   plugin->api->get (plugin->api->cls,
999                     ((size == sizeof(struct GetMessage)) ? &msg->key : NULL),
1000                     NULL,
1001                     ntohl(msg->type),
1002                     &transmit_item,
1003                     client);    
1004 }
1005
1006
1007 /**
1008  * Handle UPDATE-message.
1009  *
1010  * @param cls closure
1011  * @param client identification of the client
1012  * @param message the actual message
1013  */
1014 static void
1015 handle_update (void *cls,
1016                struct GNUNET_SERVER_Client *client,
1017                const struct GNUNET_MessageHeader *message)
1018 {
1019   const struct UpdateMessage *msg;
1020   int ret;
1021   char *emsg;
1022
1023   GNUNET_STATISTICS_update (stats,
1024                             gettext_noop ("# UPDATE requests received"),
1025                             1,
1026                             GNUNET_NO);
1027   msg = (const struct UpdateMessage*) message;
1028   emsg = NULL;
1029 #if DEBUG_DATASTORE
1030   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1031               "Processing `%s' request for %llu\n",
1032               "UPDATE",
1033               (unsigned long long) GNUNET_ntohll (msg->uid));
1034 #endif
1035   ret = plugin->api->update (plugin->api->cls,
1036                              GNUNET_ntohll(msg->uid),
1037                              (int32_t) ntohl(msg->priority),
1038                              GNUNET_TIME_absolute_ntoh(msg->expiration),
1039                              &emsg);
1040   transmit_status (client, ret, emsg);
1041   GNUNET_free_non_null (emsg);
1042 }
1043
1044
1045 /**
1046  * Handle GET_RANDOM-message.
1047  *
1048  * @param cls closure
1049  * @param client identification of the client
1050  * @param message the actual message
1051  */
1052 static void
1053 handle_get_random (void *cls,
1054                    struct GNUNET_SERVER_Client *client,
1055                    const struct GNUNET_MessageHeader *message)
1056 {
1057 #if DEBUG_DATASTORE
1058   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1059               "Processing `%s' request\n",
1060               "GET_RANDOM");
1061 #endif
1062   GNUNET_STATISTICS_update (stats,
1063                             gettext_noop ("# GET RANDOM requests received"),
1064                             1,
1065                             GNUNET_NO);
1066   GNUNET_SERVER_client_keep (client);
1067   plugin->api->iter_migration_order (plugin->api->cls,
1068                                      0,
1069                                      &transmit_item,
1070                                      client);  
1071 }
1072
1073
1074 /**
1075  * Context for the 'remove_callback'.
1076  */
1077 struct RemoveContext 
1078 {
1079   /**
1080    * Client for whom we're doing the remvoing.
1081    */
1082   struct GNUNET_SERVER_Client *client;
1083
1084   /**
1085    * GNUNET_YES if we managed to remove something.
1086    */
1087   int found;
1088 };
1089
1090
1091 /**
1092  * Callback function that will cause the item that is passed
1093  * in to be deleted (by returning GNUNET_NO).
1094  */
1095 static int
1096 remove_callback (void *cls,
1097                  void *next_cls,
1098                  const GNUNET_HashCode * key,
1099                  uint32_t size,
1100                  const void *data,
1101                  enum GNUNET_BLOCK_Type type,
1102                  uint32_t priority,
1103                  uint32_t anonymity,
1104                  struct GNUNET_TIME_Absolute
1105                  expiration, uint64_t uid)
1106 {
1107   struct RemoveContext *rc = cls;
1108
1109   if (key == NULL)
1110     {
1111 #if DEBUG_DATASTORE
1112       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1113                   "No further matches for `%s' request.\n",
1114                   "REMOVE");
1115 #endif  
1116       if (GNUNET_YES == rc->found)
1117         transmit_status (rc->client, GNUNET_OK, NULL);       
1118       else
1119         transmit_status (rc->client, GNUNET_NO, _("Content not found"));        
1120       GNUNET_SERVER_client_drop (rc->client);
1121       GNUNET_free (rc);
1122       return GNUNET_OK; /* last item */
1123     }
1124   rc->found = GNUNET_YES;
1125 #if DEBUG_DATASTORE
1126   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1127               "Item %llu matches `%s' request for key `%s'.\n",
1128               (unsigned long long) uid,
1129               "REMOVE",
1130               GNUNET_h2s (key));
1131 #endif  
1132   GNUNET_STATISTICS_update (stats,
1133                             gettext_noop ("# bytes removed (explicit request)"),
1134                             size,
1135                             GNUNET_YES);
1136   GNUNET_CONTAINER_bloomfilter_remove (filter,
1137                                        key);
1138   plugin->api->next_request (next_cls, GNUNET_YES);
1139   return GNUNET_NO;
1140 }
1141
1142
1143 /**
1144  * Handle REMOVE-message.
1145  *
1146  * @param cls closure
1147  * @param client identification of the client
1148  * @param message the actual message
1149  */
1150 static void
1151 handle_remove (void *cls,
1152              struct GNUNET_SERVER_Client *client,
1153              const struct GNUNET_MessageHeader *message)
1154 {
1155   const struct DataMessage *dm = check_data (message);
1156   GNUNET_HashCode vhash;
1157   struct RemoveContext *rc;
1158
1159   if (dm == NULL)
1160     {
1161       GNUNET_break (0);
1162       GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1163       return;
1164     }
1165 #if DEBUG_DATASTORE
1166   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1167               "Processing `%s' request for `%s'\n",
1168               "REMOVE",
1169               GNUNET_h2s (&dm->key));
1170 #endif
1171   GNUNET_STATISTICS_update (stats,
1172                             gettext_noop ("# REMOVE requests received"),
1173                             1,
1174                             GNUNET_NO);
1175   rc = GNUNET_malloc (sizeof(struct RemoveContext));
1176   GNUNET_SERVER_client_keep (client);
1177   rc->client = client;
1178   GNUNET_CRYPTO_hash (&dm[1],
1179                       ntohl(dm->size),
1180                       &vhash);
1181   plugin->api->get (plugin->api->cls,
1182                     &dm->key,
1183                     &vhash,
1184                     ntohl(dm->type),
1185                     &remove_callback,
1186                     rc);
1187 }
1188
1189
1190 /**
1191  * Handle DROP-message.
1192  *
1193  * @param cls closure
1194  * @param client identification of the client
1195  * @param message the actual message
1196  */
1197 static void
1198 handle_drop (void *cls,
1199              struct GNUNET_SERVER_Client *client,
1200              const struct GNUNET_MessageHeader *message)
1201 {
1202 #if DEBUG_DATASTORE
1203   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1204               "Processing `%s' request\n",
1205               "DROP");
1206 #endif
1207   plugin->api->drop (plugin->api->cls);
1208   GNUNET_SERVER_receive_done (client, GNUNET_OK);
1209 }
1210
1211
1212 /**
1213  * List of handlers for the messages understood by this
1214  * service.
1215  */
1216 static struct GNUNET_SERVER_MessageHandler handlers[] = {
1217   {&handle_reserve, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_RESERVE, 
1218    sizeof(struct ReserveMessage) }, 
1219   {&handle_release_reserve, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_RELEASE_RESERVE, 
1220    sizeof(struct ReleaseReserveMessage) }, 
1221   {&handle_put, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_PUT, 0 }, 
1222   {&handle_update, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_UPDATE, 
1223    sizeof (struct UpdateMessage) }, 
1224   {&handle_get, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_GET, 0 }, 
1225   {&handle_get_random, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_GET_RANDOM, 
1226    sizeof(struct GNUNET_MessageHeader) }, 
1227   {&handle_remove, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE, 0 }, 
1228   {&handle_drop, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_DROP, 
1229    sizeof(struct GNUNET_MessageHeader) }, 
1230   {NULL, NULL, 0, 0}
1231 };
1232
1233
1234
1235 /**
1236  * Load the datastore plugin.
1237  */
1238 static struct DatastorePlugin *
1239 load_plugin () 
1240 {
1241   struct DatastorePlugin *ret;
1242   char *libname;
1243   char *name;
1244
1245   if (GNUNET_OK !=
1246       GNUNET_CONFIGURATION_get_value_string (cfg,
1247                                              "DATASTORE", "DATABASE", &name))
1248     {
1249       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1250                   _("No `%s' specified for `%s' in configuration!\n"),
1251                   "DATABASE",
1252                   "DATASTORE");
1253       return NULL;
1254     }
1255   ret = GNUNET_malloc (sizeof(struct DatastorePlugin));
1256   ret->env.cfg = cfg;
1257   ret->env.sched = sched;  
1258   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1259               _("Loading `%s' datastore plugin\n"), name);
1260   GNUNET_asprintf (&libname, "libgnunet_plugin_datastore_%s", name);
1261   ret->short_name = name;
1262   ret->lib_name = libname;
1263   ret->api = GNUNET_PLUGIN_load (libname, &ret->env);
1264   if (ret->api == NULL)
1265     {
1266       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1267                   _("Failed to load datastore plugin for `%s'\n"), name);
1268       GNUNET_free (ret->short_name);
1269       GNUNET_free (libname);
1270       GNUNET_free (ret);
1271       return NULL;
1272     }
1273   return ret;
1274 }
1275
1276
1277 /**
1278  * Function called when the service shuts
1279  * down.  Unloads our datastore plugin.
1280  *
1281  * @param plug plugin to unload
1282  */
1283 static void
1284 unload_plugin (struct DatastorePlugin *plug)
1285 {
1286 #if DEBUG_DATASTORE
1287   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1288               "Datastore service is unloading plugin...\n");
1289 #endif
1290   GNUNET_break (NULL == GNUNET_PLUGIN_unload (plug->lib_name, plug->api));
1291   GNUNET_free (plug->lib_name);
1292   GNUNET_free (plug->short_name);
1293   GNUNET_free (plug);
1294 }
1295
1296
1297 /**
1298  * Final task run after shutdown.  Unloads plugins and disconnects us from
1299  * statistics.
1300  */
1301 static void
1302 unload_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1303 {
1304   unload_plugin (plugin);
1305   plugin = NULL;
1306   if (filter != NULL)
1307     {
1308       GNUNET_CONTAINER_bloomfilter_free (filter);
1309       filter = NULL;
1310     }
1311   if (stats != NULL)
1312     {
1313       GNUNET_STATISTICS_destroy (stats, GNUNET_YES);
1314       stats = NULL;
1315     }
1316 }
1317
1318
1319 /**
1320  * Last task run during shutdown.  Disconnects us from
1321  * the transport and core.
1322  */
1323 static void
1324 cleaning_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1325 {
1326   struct TransmitCallbackContext *tcc;
1327
1328   cleaning_done = GNUNET_YES;
1329   while (NULL != (tcc = tcc_head))
1330     {
1331       GNUNET_CONTAINER_DLL_remove (tcc_head,
1332                                    tcc_tail,
1333                                    tcc);
1334       if (tcc->th != NULL)
1335         {
1336           GNUNET_CONNECTION_notify_transmit_ready_cancel (tcc->th);
1337           GNUNET_SERVER_client_drop (tcc->client);
1338         }
1339       if (NULL != tcc->tc)
1340         tcc->tc (tcc->tc_cls, GNUNET_SYSERR);
1341       GNUNET_free (tcc->msg);
1342       GNUNET_free (tcc);
1343     }
1344   if (expired_kill_task != GNUNET_SCHEDULER_NO_TASK)
1345     {
1346       GNUNET_SCHEDULER_cancel (sched,
1347                                expired_kill_task);
1348       expired_kill_task = GNUNET_SCHEDULER_NO_TASK;
1349     }
1350   GNUNET_SCHEDULER_add_continuation (sched,
1351                                      &unload_task,
1352                                      NULL,
1353                                      GNUNET_SCHEDULER_REASON_PREREQ_DONE);
1354 }
1355
1356
1357 /**
1358  * Function that removes all active reservations made
1359  * by the given client and releases the space for other
1360  * requests.
1361  *
1362  * @param cls closure
1363  * @param client identification of the client
1364  */
1365 static void
1366 cleanup_reservations (void *cls,
1367                       struct GNUNET_SERVER_Client
1368                       * client)
1369 {
1370   struct ReservationList *pos;
1371   struct ReservationList *prev;
1372   struct ReservationList *next;
1373
1374   if (client == NULL)
1375     return;
1376   prev = NULL;
1377   pos = reservations;
1378   while (NULL != pos)
1379     {
1380       next = pos->next;
1381       if (pos->client == client)
1382         {
1383           if (prev == NULL)
1384             reservations = next;
1385           else
1386             prev->next = next;
1387           reserved -= pos->amount + pos->entries * GNUNET_DATASTORE_ENTRY_OVERHEAD;
1388           GNUNET_free (pos);
1389         }
1390       else
1391         {
1392           prev = pos;
1393         }
1394       pos = next;
1395     }
1396 }
1397
1398
1399 /**
1400  * Process datastore requests.
1401  *
1402  * @param cls closure
1403  * @param s scheduler to use
1404  * @param server the initialized server
1405  * @param c configuration to use
1406  */
1407 static void
1408 run (void *cls,
1409      struct GNUNET_SCHEDULER_Handle *s,
1410      struct GNUNET_SERVER_Handle *server,
1411      const struct GNUNET_CONFIGURATION_Handle *c)
1412 {
1413   char *fn;
1414   unsigned int bf_size;
1415
1416   sched = s;
1417   cfg = c;
1418   if (GNUNET_OK !=
1419       GNUNET_CONFIGURATION_get_value_number (cfg,
1420                                              "DATASTORE", "QUOTA", &quota))
1421     {
1422       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1423                   _("No `%s' specified for `%s' in configuration!\n"),
1424                   "QUOTA",
1425                   "DATASTORE");
1426       return;
1427     }
1428   stats = GNUNET_STATISTICS_create (sched, "datastore", cfg);
1429   cache_size = quota / 8; /* Or should we make this an option? */
1430   bf_size = quota / 32; /* 8 bit per entry, 1 bit per 32 kb in DB */
1431   fn = NULL;
1432   if ( (GNUNET_OK !=
1433         GNUNET_CONFIGURATION_get_value_filename (cfg,
1434                                                  "DATASTORE",
1435                                                  "BLOOMFILTER",
1436                                                  &fn)) ||
1437        (GNUNET_OK !=
1438         GNUNET_DISK_directory_create_for_file (fn)) )
1439     {
1440       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1441                   _("Could not use specified filename `%s' for bloomfilter.\n"),
1442                   fn != NULL ? fn : "");
1443       GNUNET_free_non_null (fn);
1444       fn = NULL;
1445     }
1446   filter = GNUNET_CONTAINER_bloomfilter_load (fn, bf_size, 5);  /* approx. 3% false positives at max use */  
1447   GNUNET_free_non_null (fn);
1448   if (filter == NULL)
1449     {
1450       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1451                   _("Failed to initialize bloomfilter.\n"));
1452       if (stats != NULL)
1453         {
1454           GNUNET_STATISTICS_destroy (stats, GNUNET_YES);
1455           stats = NULL;
1456         }
1457       return;
1458     }
1459   plugin = load_plugin ();
1460   if (NULL == plugin)
1461     {
1462       GNUNET_CONTAINER_bloomfilter_free (filter);
1463       filter = NULL;
1464       if (stats != NULL)
1465         {
1466           GNUNET_STATISTICS_destroy (stats, GNUNET_YES);
1467           stats = NULL;
1468         }
1469       return;
1470     }
1471   GNUNET_SERVER_disconnect_notify (server, &cleanup_reservations, NULL);
1472   GNUNET_SERVER_add_handlers (server, handlers);
1473   expired_kill_task
1474     = GNUNET_SCHEDULER_add_with_priority (sched,
1475                                           GNUNET_SCHEDULER_PRIORITY_IDLE,
1476                                           &delete_expired, NULL);
1477   GNUNET_SCHEDULER_add_delayed (sched,
1478                                 GNUNET_TIME_UNIT_FOREVER_REL,
1479                                 &cleaning_task, NULL);
1480   
1481 }
1482
1483
1484 /**
1485  * The main function for the datastore service.
1486  *
1487  * @param argc number of arguments from the command line
1488  * @param argv command line arguments
1489  * @return 0 ok, 1 on error
1490  */
1491 int
1492 main (int argc, char *const *argv)
1493 {
1494   int ret;
1495
1496   ret = (GNUNET_OK ==
1497          GNUNET_SERVICE_run (argc,
1498                              argv,
1499                              "datastore",
1500                              GNUNET_SERVICE_OPTION_NONE,
1501                              &run, NULL)) ? 0 : 1;
1502   return ret;
1503 }
1504
1505
1506 /* end of gnunet-service-datastore.c */