fixing reconnect issues
[oweals/gnunet.git] / src / datastore / gnunet-service-datastore.c
1 /*
2      This file is part of GNUnet
3      (C) 2004, 2005, 2006, 2007, 2009 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 2, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file datastore/gnunet-service-datastore.c
23  * @brief Management for the datastore for files stored on a GNUnet node
24  * @author Christian Grothoff
25  */
26
27 #include "platform.h"
28 #include "gnunet_util_lib.h"
29 #include "gnunet_protocols.h"
30 #include "gnunet_statistics_service.h"
31 #include "plugin_datastore.h"
32 #include "datastore.h"
33
34 /**
35  * How many messages do we queue at most per client?
36  */
37 #define MAX_PENDING 1024
38
39 /**
40  * How long are we at most keeping "expired" content
41  * past the expiration date in the database?
42  */
43 #define MAX_EXPIRE_DELAY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 15)
44
45
46
47 /**
48  * Our datastore plugin.
49  */
50 struct DatastorePlugin
51 {
52
53   /**
54    * API of the transport as returned by the plugin's
55    * initialization function.
56    */
57   struct GNUNET_DATASTORE_PluginFunctions *api;
58
59   /**
60    * Short name for the plugin (i.e. "sqlite").
61    */
62   char *short_name;
63
64   /**
65    * Name of the library (i.e. "gnunet_plugin_datastore_sqlite").
66    */
67   char *lib_name;
68
69   /**
70    * Environment this transport service is using
71    * for this plugin.
72    */
73   struct GNUNET_DATASTORE_PluginEnvironment env;
74
75 };
76
77
78 /**
79  * Linked list of active reservations.
80  */
81 struct ReservationList 
82 {
83
84   /**
85    * This is a linked list.
86    */
87   struct ReservationList *next;
88
89   /**
90    * Client that made the reservation.
91    */
92   struct GNUNET_SERVER_Client *client;
93
94   /**
95    * Number of bytes (still) reserved.
96    */
97   uint64_t amount;
98
99   /**
100    * Number of items (still) reserved.
101    */
102   uint64_t entries;
103
104   /**
105    * Reservation identifier.
106    */
107   int32_t rid;
108
109 };
110
111
112 /**
113  * Our datastore plugin (NULL if not available).
114  */
115 static struct DatastorePlugin *plugin;
116
117 /**
118  * Linked list of space reservations made by clients.
119  */
120 static struct ReservationList *reservations;
121
122 /**
123  * Bloomfilter to quickly tell if we don't have the content.
124  */
125 static struct GNUNET_CONTAINER_BloomFilter *filter;
126
127 /**
128  * Static counter to produce reservation identifiers.
129  */
130 static int reservation_gen;
131
132 /**
133  * How much space are we allowed to use?
134  */
135 static unsigned long long quota;
136
137 /**
138  * How much space are we using for the cache?  (space available for
139  * insertions that will be instantly reclaimed by discarding less
140  * important content --- or possibly whatever we just inserted into
141  * the "cache").
142  */
143 static unsigned long long cache_size;
144
145 /**
146  * How much space have we currently reserved?
147  */
148 static unsigned long long reserved;
149
150 /**
151  * Identity of the task that is used to delete
152  * expired content.
153  */
154 static GNUNET_SCHEDULER_TaskIdentifier expired_kill_task;
155
156 /**
157  * Our configuration.
158  */
159 const struct GNUNET_CONFIGURATION_Handle *cfg;
160
161 /**
162  * Our scheduler.
163  */
164 struct GNUNET_SCHEDULER_Handle *sched; 
165
166 /**
167  * Handle for reporting statistics.
168  */
169 static struct GNUNET_STATISTICS_Handle *stats;
170
171
172 /**
173  * Function called once the transmit operation has
174  * either failed or succeeded.
175  *
176  * @param cls closure
177  * @param status GNUNET_OK on success, GNUNET_SYSERR on error
178  */
179 typedef void (*TransmitContinuation)(void *cls,
180                                      int status);
181
182
183 /**
184  * Context for transmitting replies to clients.
185  */
186 struct TransmitCallbackContext 
187 {
188   
189   /**
190    * We keep these in a doubly-linked list (for cleanup).
191    */
192   struct TransmitCallbackContext *next;
193   
194   /**
195    * We keep these in a doubly-linked list (for cleanup).
196    */
197   struct TransmitCallbackContext *prev;
198   
199   /**
200    * The message that we're asked to transmit.
201    */
202   struct GNUNET_MessageHeader *msg;
203   
204   /**
205    * Handle for the transmission request.
206    */
207   struct GNUNET_CONNECTION_TransmitHandle *th;
208
209   /**
210    * Client that we are transmitting to.
211    */
212   struct GNUNET_SERVER_Client *client;
213
214   /**
215    * Function to call once msg has been transmitted
216    * (or at least added to the buffer).
217    */
218   TransmitContinuation tc;
219
220   /**
221    * Closure for tc.
222    */
223   void *tc_cls;
224
225   /**
226    * GNUNET_YES if we are supposed to signal the server
227    * completion of the client's request.
228    */
229   int end;
230 };
231
232   
233 /**
234  * Head of the doubly-linked list (for cleanup).
235  */
236 static struct TransmitCallbackContext *tcc_head;
237
238 /**
239  * Tail of the doubly-linked list (for cleanup).
240  */
241 static struct TransmitCallbackContext *tcc_tail;
242
243 /**
244  * Have we already cleaned up the TCCs and are hence no longer
245  * willing (or able) to transmit anything to anyone?
246  */
247 static int cleaning_done;
248
249 /**
250  * Task that is used to remove expired entries from
251  * the datastore.  This task will schedule itself
252  * again automatically to always delete all expired
253  * content quickly.
254  *
255  * @param cls not used
256  * @param tc task context
257  */ 
258 static void
259 delete_expired (void *cls,
260                 const struct GNUNET_SCHEDULER_TaskContext *tc);
261
262
263 /**
264  * Iterate over the expired items stored in the datastore.
265  * Delete all expired items; once we have processed all
266  * expired items, re-schedule the "delete_expired" task.
267  *
268  * @param cls not used
269  * @param next_cls closure to pass to the "next" function.
270  * @param key key for the content
271  * @param size number of bytes in data
272  * @param data content stored
273  * @param type type of the content
274  * @param priority priority of the content
275  * @param anonymity anonymity-level for the content
276  * @param expiration expiration time for the content
277  * @param uid unique identifier for the datum;
278  *        maybe 0 if no unique identifier is available
279  *
280  * @return GNUNET_SYSERR to abort the iteration, GNUNET_OK to continue
281  *         (continue on call to "next", of course),
282  *         GNUNET_NO to delete the item and continue (if supported)
283  */
284 static int 
285 expired_processor (void *cls,
286                    void *next_cls,
287                    const GNUNET_HashCode * key,
288                    uint32_t size,
289                    const void *data,
290                    enum GNUNET_BLOCK_Type type,
291                    uint32_t priority,
292                    uint32_t anonymity,
293                    struct GNUNET_TIME_Absolute
294                    expiration, 
295                    uint64_t uid)
296 {
297   struct GNUNET_TIME_Absolute now;
298
299   if (key == NULL) 
300     {
301       expired_kill_task 
302         = GNUNET_SCHEDULER_add_delayed (sched,
303                                         MAX_EXPIRE_DELAY,
304                                         &delete_expired,
305                                         NULL);
306       return GNUNET_SYSERR;
307     }
308   now = GNUNET_TIME_absolute_get ();
309   if (expiration.value > now.value)
310     {
311       /* finished processing */
312       plugin->api->next_request (next_cls, GNUNET_YES);
313       return GNUNET_SYSERR;
314     }
315   plugin->api->next_request (next_cls, GNUNET_NO);
316 #if DEBUG_DATASTORE
317   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
318               "Deleting content that expired %llu ms ago\n",
319               (unsigned long long) (now.value - expiration.value));
320 #endif
321   GNUNET_STATISTICS_update (stats,
322                             gettext_noop ("# bytes expired"),
323                             size,
324                             GNUNET_YES);
325   GNUNET_CONTAINER_bloomfilter_remove (filter,
326                                        key);
327   return GNUNET_NO; /* delete */
328 }
329
330
331 /**
332  * Task that is used to remove expired entries from
333  * the datastore.  This task will schedule itself
334  * again automatically to always delete all expired
335  * content quickly.
336  *
337  * @param cls not used
338  * @param tc task context
339  */ 
340 static void
341 delete_expired (void *cls,
342                 const struct GNUNET_SCHEDULER_TaskContext *tc)
343 {
344   expired_kill_task = GNUNET_SCHEDULER_NO_TASK;
345   plugin->api->iter_ascending_expiration (plugin->api->cls, 
346                                           0,
347                                           &expired_processor,
348                                           NULL);
349 }
350
351
352 /**
353  * An iterator over a set of items stored in the datastore.
354  *
355  * @param cls closure
356  * @param next_cls closure to pass to the "next" function.
357  * @param key key for the content
358  * @param size number of bytes in data
359  * @param data content stored
360  * @param type type of the content
361  * @param priority priority of the content
362  * @param anonymity anonymity-level for the content
363  * @param expiration expiration time for the content
364  * @param uid unique identifier for the datum;
365  *        maybe 0 if no unique identifier is available
366  *
367  * @return GNUNET_SYSERR to abort the iteration, GNUNET_OK to continue
368  *         (continue on call to "next", of course),
369  *         GNUNET_NO to delete the item and continue (if supported)
370  */
371 static int 
372 manage (void *cls,
373         void *next_cls,
374         const GNUNET_HashCode * key,
375         uint32_t size,
376         const void *data,
377         enum GNUNET_BLOCK_Type type,
378         uint32_t priority,
379         uint32_t anonymity,
380         struct GNUNET_TIME_Absolute
381         expiration, 
382         uint64_t uid)
383 {
384   unsigned long long *need = cls;
385
386   if (NULL == key)
387     {
388       GNUNET_free (need);
389       return GNUNET_SYSERR;
390     }
391   if (size + GNUNET_DATASTORE_ENTRY_OVERHEAD > *need)
392     *need = 0;
393   else
394     *need -= size + GNUNET_DATASTORE_ENTRY_OVERHEAD;
395   plugin->api->next_request (next_cls, 
396                              (0 == *need) ? GNUNET_YES : GNUNET_NO);
397 #if DEBUG_DATASTORE
398   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
399               "Deleting %llu bytes of low-priority content (still trying to free another %llu bytes)\n",
400               size + GNUNET_DATASTORE_ENTRY_OVERHEAD,
401               *need);
402 #endif
403   GNUNET_STATISTICS_update (stats,
404                             gettext_noop ("# bytes purged (low-priority)"),
405                             size,
406                             GNUNET_YES);
407   GNUNET_CONTAINER_bloomfilter_remove (filter,
408                                        key);
409   return GNUNET_NO;
410 }
411
412
413 /**
414  * Manage available disk space by running tasks
415  * that will discard content if necessary.  This
416  * function will be run whenever a request for
417  * "need" bytes of storage could only be satisfied
418  * by eating into the "cache" (and we want our cache
419  * space back).
420  *
421  * @param need number of bytes of content that were
422  *        placed into the "cache" (and hence the
423  *        number of bytes that should be removed).
424  */
425 static void
426 manage_space (unsigned long long need)
427 {
428   unsigned long long *n;
429
430 #if DEBUG_DATASTORE
431   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
432               "Asked to free up %llu bytes of cache space\n",
433               need);
434 #endif
435   n = GNUNET_malloc (sizeof(unsigned long long));
436   *n = need;
437   plugin->api->iter_low_priority (plugin->api->cls,
438                                   0,
439                                   &manage,
440                                   n);
441 }
442
443
444 /**
445  * Function called to notify a client about the socket
446  * begin ready to queue more data.  "buf" will be
447  * NULL and "size" zero if the socket was closed for
448  * writing in the meantime.
449  *
450  * @param cls closure
451  * @param size number of bytes available in buf
452  * @param buf where the callee should write the message
453  * @return number of bytes written to buf
454  */
455 static size_t
456 transmit_callback (void *cls,
457                    size_t size, void *buf)
458 {
459   struct TransmitCallbackContext *tcc = cls;
460   size_t msize;
461   
462   tcc->th = NULL;
463   GNUNET_CONTAINER_DLL_remove (tcc_head,
464                                tcc_tail,
465                                tcc);
466   msize = ntohs(tcc->msg->size);
467   if (size == 0)
468     {
469 #if DEBUG_DATASTORE
470       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
471                   "Transmission failed.\n");
472 #endif
473       if (tcc->tc != NULL)
474         tcc->tc (tcc->tc_cls, GNUNET_SYSERR);
475       if (GNUNET_YES == tcc->end)
476         GNUNET_SERVER_receive_done (tcc->client, GNUNET_SYSERR);       
477       GNUNET_SERVER_client_drop (tcc->client);
478       GNUNET_free (tcc->msg);
479       GNUNET_free (tcc);
480       return 0;
481     }
482   GNUNET_assert (size >= msize);
483   memcpy (buf, tcc->msg, msize);
484   if (tcc->tc != NULL)
485     tcc->tc (tcc->tc_cls, GNUNET_OK);
486   if (GNUNET_YES == tcc->end)
487     {
488       GNUNET_SERVER_receive_done (tcc->client, GNUNET_OK);
489     }
490   else
491     {
492 #if DEBUG_DATASTORE
493       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
494                   "Response transmitted, more pending!\n");
495 #endif
496     }
497   GNUNET_SERVER_client_drop (tcc->client);
498   GNUNET_free (tcc->msg);
499   GNUNET_free (tcc);
500   return msize;
501 }
502
503
504 /**
505  * Transmit the given message to the client.
506  *
507  * @param client target of the message
508  * @param msg message to transmit, will be freed!
509  * @param tc function to call afterwards
510  * @param tc_cls closure for tc
511  * @param end is this the last response (and we should
512  *        signal the server completion accodingly after
513  *        transmitting this message)?
514  */
515 static void
516 transmit (struct GNUNET_SERVER_Client *client,
517           struct GNUNET_MessageHeader *msg,
518           TransmitContinuation tc,
519           void *tc_cls,
520           int end)
521 {
522   struct TransmitCallbackContext *tcc;
523
524   if (GNUNET_YES == cleaning_done)
525     {
526 #if DEBUG_DATASTORE
527       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
528                   "Shutdown in progress, aborting transmission.\n");
529 #endif
530      if (NULL != tc)
531         tc (tc_cls, GNUNET_SYSERR);
532       return;
533     }
534   tcc = GNUNET_malloc (sizeof(struct TransmitCallbackContext));
535   tcc->msg = msg;
536   tcc->client = client;
537   tcc->tc = tc;
538   tcc->tc_cls = tc_cls;
539   tcc->end = end;
540   if (NULL ==
541       (tcc->th = GNUNET_SERVER_notify_transmit_ready (client,
542                                                       ntohs(msg->size),
543                                                       GNUNET_TIME_UNIT_FOREVER_REL,
544                                                       &transmit_callback,
545                                                       tcc)))
546     {
547       GNUNET_break (0);
548       if (GNUNET_YES == end)
549         {
550 #if DEBUG_DATASTORE
551           GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
552                       "Disconnecting client.\n");
553 #endif    
554           GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
555         }
556       if (NULL != tc)
557         tc (tc_cls, GNUNET_SYSERR);
558       GNUNET_free (msg);
559       GNUNET_free (tcc);
560       return;
561     }
562   GNUNET_SERVER_client_keep (client);
563   GNUNET_CONTAINER_DLL_insert (tcc_head,
564                                tcc_tail,
565                                tcc);
566 }
567
568
569 /**
570  * Transmit a status code to the client.
571  *
572  * @param client receiver of the response
573  * @param code status code
574  * @param msg optional error message (can be NULL)
575  */
576 static void
577 transmit_status (struct GNUNET_SERVER_Client *client,
578                  int code,
579                  const char *msg)
580 {
581   struct StatusMessage *sm;
582   size_t slen;
583
584 #if DEBUG_DATASTORE
585   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
586               "Transmitting `%s' message with value %d and message `%s'\n",
587               "STATUS",
588               code,
589               msg != NULL ? msg : "(none)");
590 #endif
591   slen = (msg == NULL) ? 0 : strlen(msg) + 1;  
592   sm = GNUNET_malloc (sizeof(struct StatusMessage) + slen);
593   sm->header.size = htons(sizeof(struct StatusMessage) + slen);
594   sm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_STATUS);
595   sm->status = htonl(code);
596   if (slen > 0)
597     memcpy (&sm[1], msg, slen);  
598   transmit (client, &sm->header, NULL, NULL, GNUNET_YES);
599 }
600
601
602 /**
603  * Function called once the transmit operation has
604  * either failed or succeeded.
605  *
606  * @param next_cls closure for calling "next_request" callback
607  * @param status GNUNET_OK on success, GNUNET_SYSERR on error
608  */
609 static void 
610 get_next(void *next_cls,
611          int status)
612 {
613   if (status != GNUNET_OK)
614     {
615       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
616                   _("Failed to transmit an item to the client; aborting iteration.\n"));
617       if (plugin != NULL)
618         plugin->api->next_request (next_cls, GNUNET_YES);
619       return;
620     }
621   plugin->api->next_request (next_cls, GNUNET_NO);
622 }
623
624
625 /**
626  * Function that will transmit the given datastore entry
627  * to the client.
628  *
629  * @param cls closure, pointer to the client (of type GNUNET_SERVER_Client).
630  * @param next_cls closure to use to ask for the next item
631  * @param key key for the content
632  * @param size number of bytes in data
633  * @param data content stored
634  * @param type type of the content
635  * @param priority priority of the content
636  * @param anonymity anonymity-level for the content
637  * @param expiration expiration time for the content
638  * @param uid unique identifier for the datum;
639  *        maybe 0 if no unique identifier is available
640  *
641  * @return GNUNET_SYSERR to abort the iteration, GNUNET_OK to continue,
642  *         GNUNET_NO to delete the item and continue (if supported)
643  */
644 static int
645 transmit_item (void *cls,
646                void *next_cls,
647                const GNUNET_HashCode * key,
648                uint32_t size,
649                const void *data,
650                enum GNUNET_BLOCK_Type type,
651                uint32_t priority,
652                uint32_t anonymity,
653                struct GNUNET_TIME_Absolute
654                expiration, uint64_t uid)
655 {
656   struct GNUNET_SERVER_Client *client = cls;
657   struct GNUNET_MessageHeader *end;
658   struct DataMessage *dm;
659
660   if (key == NULL)
661     {
662       /* transmit 'DATA_END' */
663 #if DEBUG_DATASTORE
664       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
665                   "Transmitting `%s' message\n",
666                   "DATA_END");
667 #endif
668       end = GNUNET_malloc (sizeof(struct GNUNET_MessageHeader));
669       end->size = htons(sizeof(struct GNUNET_MessageHeader));
670       end->type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END);
671       transmit (client, end, NULL, NULL, GNUNET_YES);
672       GNUNET_SERVER_client_drop (client);
673       return GNUNET_OK;
674     }
675   dm = GNUNET_malloc (sizeof(struct DataMessage) + size);
676   dm->header.size = htons(sizeof(struct DataMessage) + size);
677   dm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_DATA);
678   dm->rid = htonl(0);
679   dm->size = htonl(size);
680   dm->type = htonl(type);
681   dm->priority = htonl(priority);
682   dm->anonymity = htonl(anonymity);
683   dm->expiration = GNUNET_TIME_absolute_hton(expiration);
684   dm->uid = GNUNET_htonll(uid);
685   dm->key = *key;
686   memcpy (&dm[1], data, size);
687 #if DEBUG_DATASTORE
688   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
689               "Transmitting `%s' message\n",
690               "DATA");
691 #endif
692   GNUNET_STATISTICS_update (stats,
693                             gettext_noop ("# results found"),
694                             1,
695                             GNUNET_NO);
696   transmit (client, &dm->header, &get_next, next_cls, GNUNET_NO);
697   return GNUNET_OK;
698 }
699
700
701 /**
702  * Handle RESERVE-message.
703  *
704  * @param cls closure
705  * @param client identification of the client
706  * @param message the actual message
707  */
708 static void
709 handle_reserve (void *cls,
710                 struct GNUNET_SERVER_Client *client,
711                 const struct GNUNET_MessageHeader *message)
712 {
713   const struct ReserveMessage *msg = (const struct ReserveMessage*) message;
714   struct ReservationList *e;
715   unsigned long long used;
716   unsigned long long req;
717   uint64_t amount;
718   uint32_t entries;
719
720 #if DEBUG_DATASTORE
721   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
722               "Processing `%s' request\n",
723               "RESERVE");
724 #endif
725   amount = GNUNET_ntohll(msg->amount);
726   entries = ntohl(msg->entries);
727   used = plugin->api->get_size (plugin->api->cls) + reserved;
728   req = amount + ((unsigned long long) GNUNET_DATASTORE_ENTRY_OVERHEAD) * entries;
729   if (used + req > quota)
730     {
731       if (quota < used)
732         used = quota; /* cheat a bit for error message (to avoid negative numbers) */
733       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
734                   _("Insufficient space (%llu bytes are available) to satisfy `%s' request for %llu bytes\n"),
735                   quota - used,
736                   "RESERVE",
737                   req);
738       if (cache_size < req)
739         {
740           /* TODO: document this in the FAQ; essentially, if this
741              message happens, the insertion request could be blocked
742              by less-important content from migration because it is
743              larger than 1/8th of the overall available space, and
744              we only reserve 1/8th for "fresh" insertions */
745           GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
746                       _("The requested amount (%llu bytes) is larger than the cache size (%llu bytes)\n"),
747                       req,
748                       cache_size);
749           transmit_status (client, 0, 
750                            gettext_noop ("Insufficient space to satisfy request and "
751                                          "requested amount is larger than cache size"));
752         }
753       else
754         {
755           transmit_status (client, 0, 
756                            gettext_noop ("Insufficient space to satisfy request"));
757         }
758       return;      
759     }
760   reserved += req;
761   e = GNUNET_malloc (sizeof(struct ReservationList));
762   e->next = reservations;
763   reservations = e;
764   e->client = client;
765   e->amount = amount;
766   e->entries = entries;
767   e->rid = ++reservation_gen;
768   if (reservation_gen < 0)
769     reservation_gen = 0; /* wrap around */
770   transmit_status (client, e->rid, NULL);
771 }
772
773
774 /**
775  * Handle RELEASE_RESERVE-message.
776  *
777  * @param cls closure
778  * @param client identification of the client
779  * @param message the actual message
780  */
781 static void
782 handle_release_reserve (void *cls,
783                         struct GNUNET_SERVER_Client *client,
784                         const struct GNUNET_MessageHeader *message)
785 {
786   const struct ReleaseReserveMessage *msg = (const struct ReleaseReserveMessage*) message;
787   struct ReservationList *pos;
788   struct ReservationList *prev;
789   struct ReservationList *next;
790   int rid = ntohl(msg->rid);
791   unsigned long long rem;
792
793 #if DEBUG_DATASTORE
794   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
795               "Processing `%s' request\n",
796               "RELEASE_RESERVE");
797 #endif
798   next = reservations;
799   prev = NULL;
800   while (NULL != (pos = next))
801     {
802       next = pos->next;
803       if (rid == pos->rid)
804         {
805           if (prev == NULL)
806             reservations = next;
807           else
808             prev->next = next;
809           rem = pos->amount + ((unsigned long long) GNUNET_DATASTORE_ENTRY_OVERHEAD) * pos->entries;
810           GNUNET_assert (reserved >= rem);
811           reserved -= rem;
812 #if DEBUG_DATASTORE
813           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
814                       "Returning %llu remaining reserved bytes to storage pool\n",
815                       rem);
816 #endif    
817           GNUNET_free (pos);
818           transmit_status (client, GNUNET_OK, NULL);
819           return;
820         }       
821       prev = pos;
822     }
823   GNUNET_break (0);
824   transmit_status (client, GNUNET_SYSERR, gettext_noop ("Could not find matching reservation"));
825 }
826
827
828 /**
829  * Check that the given message is a valid data message.
830  *
831  * @return NULL if the message is not well-formed, otherwise the message
832  */
833 static const struct DataMessage *
834 check_data (const struct GNUNET_MessageHeader *message)
835 {
836   uint16_t size;
837   uint32_t dsize;
838   const struct DataMessage *dm;
839
840   size = ntohs(message->size);
841   if (size < sizeof(struct DataMessage))
842     { 
843       GNUNET_break (0);
844       return NULL;
845     }
846   dm = (const struct DataMessage *) message;
847   dsize = ntohl(dm->size);
848   if (size != dsize + sizeof(struct DataMessage))
849     {
850       GNUNET_break (0);
851       return NULL;
852     }
853   return dm;
854 }
855
856
857 /**
858  * Handle PUT-message.
859  *
860  * @param cls closure
861  * @param client identification of the client
862  * @param message the actual message
863  */
864 static void
865 handle_put (void *cls,
866             struct GNUNET_SERVER_Client *client,
867             const struct GNUNET_MessageHeader *message)
868 {
869   const struct DataMessage *dm = check_data (message);
870   char *msg;
871   int ret;
872   int rid;
873   struct ReservationList *pos;
874   uint32_t size;
875
876   if ( (dm == NULL) ||
877        (ntohl(dm->type) == 0) ) 
878     {
879       GNUNET_break (0);
880       GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
881       return;
882     }
883 #if DEBUG_DATASTORE
884   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
885               "Processing `%s' request for `%s'\n",
886               "PUT",
887               GNUNET_h2s (&dm->key));
888 #endif
889   rid = ntohl(dm->rid);
890   size = ntohl(dm->size);
891   if (rid > 0)
892     {
893       pos = reservations;
894       while ( (NULL != pos) &&
895               (rid != pos->rid) )
896         pos = pos->next;
897       GNUNET_break (pos != NULL);
898       if (NULL != pos)
899         {
900           GNUNET_break (pos->entries > 0);
901           GNUNET_break (pos->amount > size);
902           pos->entries--;
903           pos->amount -= size;
904           reserved -= (size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
905         }
906     }
907   msg = NULL;
908   ret = plugin->api->put (plugin->api->cls,
909                           &dm->key,
910                           size,
911                           &dm[1],
912                           ntohl(dm->type),
913                           ntohl(dm->priority),
914                           ntohl(dm->anonymity),
915                           GNUNET_TIME_absolute_ntoh(dm->expiration),
916                           &msg);
917   if (GNUNET_OK == ret)
918     {
919       GNUNET_STATISTICS_update (stats,
920                                 gettext_noop ("# bytes stored"),
921                                 size,
922                                 GNUNET_YES);
923       GNUNET_CONTAINER_bloomfilter_add (filter,
924                                         &dm->key);
925 #if DEBUG_DATASTORE
926       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
927                   "Successfully stored %u bytes under key `%s'\n",
928                   size,
929                   GNUNET_h2s (&dm->key));
930 #endif
931     }
932   transmit_status (client, 
933                    (GNUNET_SYSERR == ret) ? GNUNET_SYSERR : GNUNET_OK, 
934                    msg);
935   GNUNET_free_non_null (msg);
936   if (quota - reserved - cache_size < plugin->api->get_size (plugin->api->cls))
937     manage_space (size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
938 }
939
940
941 /**
942  * Handle GET-message.
943  *
944  * @param cls closure
945  * @param client identification of the client
946  * @param message the actual message
947  */
948 static void
949 handle_get (void *cls,
950             struct GNUNET_SERVER_Client *client,
951             const struct GNUNET_MessageHeader *message)
952 {
953   const struct GetMessage *msg;
954   uint16_t size;
955
956   size = ntohs(message->size);
957   if ( (size != sizeof(struct GetMessage)) &&
958        (size != sizeof(struct GetMessage) - sizeof(GNUNET_HashCode)) )
959     {
960       GNUNET_break (0);
961       GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
962       return;
963     }
964   msg = (const struct GetMessage*) message;
965 #if DEBUG_DATASTORE
966   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
967               "Processing `%s' request for `%s' of type %u\n",
968               "GET",
969               GNUNET_h2s (&msg->key),
970               ntohl (msg->type));
971 #endif
972   GNUNET_STATISTICS_update (stats,
973                             gettext_noop ("# GET requests received"),
974                             1,
975                             GNUNET_NO);
976   GNUNET_SERVER_client_keep (client);
977   if ( (size == sizeof(struct GetMessage)) &&
978        (GNUNET_YES != GNUNET_CONTAINER_bloomfilter_test (filter,
979                                                          &msg->key)) )
980     {
981       /* don't bother database... */
982 #if DEBUG_DATASTORE
983       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
984                   "Empty result set for `%s' request for `%s'.\n",
985                   "GET",
986                   GNUNET_h2s (&msg->key));
987 #endif  
988       GNUNET_STATISTICS_update (stats,
989                                 gettext_noop ("# requests filtered by bloomfilter"),
990                                 1,
991                                 GNUNET_NO);
992       transmit_item (client,
993                      NULL, NULL, 0, NULL, 0, 0, 0, 
994                      GNUNET_TIME_UNIT_ZERO_ABS, 0);
995       return;
996     }
997   plugin->api->get (plugin->api->cls,
998                     ((size == sizeof(struct GetMessage)) ? &msg->key : NULL),
999                     NULL,
1000                     ntohl(msg->type),
1001                     &transmit_item,
1002                     client);    
1003 }
1004
1005
1006 /**
1007  * Handle UPDATE-message.
1008  *
1009  * @param cls closure
1010  * @param client identification of the client
1011  * @param message the actual message
1012  */
1013 static void
1014 handle_update (void *cls,
1015                struct GNUNET_SERVER_Client *client,
1016                const struct GNUNET_MessageHeader *message)
1017 {
1018   const struct UpdateMessage *msg;
1019   int ret;
1020   char *emsg;
1021
1022   GNUNET_STATISTICS_update (stats,
1023                             gettext_noop ("# UPDATE requests received"),
1024                             1,
1025                             GNUNET_NO);
1026   msg = (const struct UpdateMessage*) message;
1027   emsg = NULL;
1028 #if DEBUG_DATASTORE
1029   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1030               "Processing `%s' request for %llu\n",
1031               "UPDATE",
1032               (unsigned long long) GNUNET_ntohll (msg->uid));
1033 #endif
1034   ret = plugin->api->update (plugin->api->cls,
1035                              GNUNET_ntohll(msg->uid),
1036                              (int32_t) ntohl(msg->priority),
1037                              GNUNET_TIME_absolute_ntoh(msg->expiration),
1038                              &emsg);
1039   transmit_status (client, ret, emsg);
1040   GNUNET_free_non_null (emsg);
1041 }
1042
1043
1044 /**
1045  * Handle GET_RANDOM-message.
1046  *
1047  * @param cls closure
1048  * @param client identification of the client
1049  * @param message the actual message
1050  */
1051 static void
1052 handle_get_random (void *cls,
1053                    struct GNUNET_SERVER_Client *client,
1054                    const struct GNUNET_MessageHeader *message)
1055 {
1056 #if DEBUG_DATASTORE
1057   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1058               "Processing `%s' request\n",
1059               "GET_RANDOM");
1060 #endif
1061   GNUNET_STATISTICS_update (stats,
1062                             gettext_noop ("# GET RANDOM requests received"),
1063                             1,
1064                             GNUNET_NO);
1065   GNUNET_SERVER_client_keep (client);
1066   plugin->api->iter_migration_order (plugin->api->cls,
1067                                      0,
1068                                      &transmit_item,
1069                                      client);  
1070 }
1071
1072
1073 /**
1074  * Context for the 'remove_callback'.
1075  */
1076 struct RemoveContext 
1077 {
1078   /**
1079    * Client for whom we're doing the remvoing.
1080    */
1081   struct GNUNET_SERVER_Client *client;
1082
1083   /**
1084    * GNUNET_YES if we managed to remove something.
1085    */
1086   int found;
1087 };
1088
1089
1090 /**
1091  * Callback function that will cause the item that is passed
1092  * in to be deleted (by returning GNUNET_NO).
1093  */
1094 static int
1095 remove_callback (void *cls,
1096                  void *next_cls,
1097                  const GNUNET_HashCode * key,
1098                  uint32_t size,
1099                  const void *data,
1100                  enum GNUNET_BLOCK_Type type,
1101                  uint32_t priority,
1102                  uint32_t anonymity,
1103                  struct GNUNET_TIME_Absolute
1104                  expiration, uint64_t uid)
1105 {
1106   struct RemoveContext *rc = cls;
1107
1108   if (key == NULL)
1109     {
1110 #if DEBUG_DATASTORE
1111       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1112                   "No further matches for `%s' request.\n",
1113                   "REMOVE");
1114 #endif  
1115       if (GNUNET_YES == rc->found)
1116         transmit_status (rc->client, GNUNET_OK, NULL);       
1117       else
1118         transmit_status (rc->client, GNUNET_NO, _("Content not found"));        
1119       GNUNET_SERVER_client_drop (rc->client);
1120       GNUNET_free (rc);
1121       return GNUNET_OK; /* last item */
1122     }
1123   rc->found = GNUNET_YES;
1124 #if DEBUG_DATASTORE
1125   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1126               "Item %llu matches `%s' request for key `%s'.\n",
1127               (unsigned long long) uid,
1128               "REMOVE",
1129               GNUNET_h2s (key));
1130 #endif  
1131   GNUNET_STATISTICS_update (stats,
1132                             gettext_noop ("# bytes removed (explicit request)"),
1133                             size,
1134                             GNUNET_YES);
1135   GNUNET_CONTAINER_bloomfilter_remove (filter,
1136                                        key);
1137   plugin->api->next_request (next_cls, GNUNET_YES);
1138   return GNUNET_NO;
1139 }
1140
1141
1142 /**
1143  * Handle REMOVE-message.
1144  *
1145  * @param cls closure
1146  * @param client identification of the client
1147  * @param message the actual message
1148  */
1149 static void
1150 handle_remove (void *cls,
1151              struct GNUNET_SERVER_Client *client,
1152              const struct GNUNET_MessageHeader *message)
1153 {
1154   const struct DataMessage *dm = check_data (message);
1155   GNUNET_HashCode vhash;
1156   struct RemoveContext *rc;
1157
1158   if (dm == NULL)
1159     {
1160       GNUNET_break (0);
1161       GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1162       return;
1163     }
1164 #if DEBUG_DATASTORE
1165   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1166               "Processing `%s' request for `%s'\n",
1167               "REMOVE",
1168               GNUNET_h2s (&dm->key));
1169 #endif
1170   GNUNET_STATISTICS_update (stats,
1171                             gettext_noop ("# REMOVE requests received"),
1172                             1,
1173                             GNUNET_NO);
1174   rc = GNUNET_malloc (sizeof(struct RemoveContext));
1175   GNUNET_SERVER_client_keep (client);
1176   rc->client = client;
1177   GNUNET_CRYPTO_hash (&dm[1],
1178                       ntohl(dm->size),
1179                       &vhash);
1180   plugin->api->get (plugin->api->cls,
1181                     &dm->key,
1182                     &vhash,
1183                     ntohl(dm->type),
1184                     &remove_callback,
1185                     rc);
1186 }
1187
1188
1189 /**
1190  * Handle DROP-message.
1191  *
1192  * @param cls closure
1193  * @param client identification of the client
1194  * @param message the actual message
1195  */
1196 static void
1197 handle_drop (void *cls,
1198              struct GNUNET_SERVER_Client *client,
1199              const struct GNUNET_MessageHeader *message)
1200 {
1201 #if DEBUG_DATASTORE
1202   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1203               "Processing `%s' request\n",
1204               "DROP");
1205 #endif
1206   plugin->api->drop (plugin->api->cls);
1207   GNUNET_SERVER_receive_done (client, GNUNET_OK);
1208 }
1209
1210
1211 /**
1212  * List of handlers for the messages understood by this
1213  * service.
1214  */
1215 static struct GNUNET_SERVER_MessageHandler handlers[] = {
1216   {&handle_reserve, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_RESERVE, 
1217    sizeof(struct ReserveMessage) }, 
1218   {&handle_release_reserve, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_RELEASE_RESERVE, 
1219    sizeof(struct ReleaseReserveMessage) }, 
1220   {&handle_put, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_PUT, 0 }, 
1221   {&handle_update, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_UPDATE, 
1222    sizeof (struct UpdateMessage) }, 
1223   {&handle_get, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_GET, 0 }, 
1224   {&handle_get_random, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_GET_RANDOM, 
1225    sizeof(struct GNUNET_MessageHeader) }, 
1226   {&handle_remove, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE, 0 }, 
1227   {&handle_drop, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_DROP, 
1228    sizeof(struct GNUNET_MessageHeader) }, 
1229   {NULL, NULL, 0, 0}
1230 };
1231
1232
1233
1234 /**
1235  * Load the datastore plugin.
1236  */
1237 static struct DatastorePlugin *
1238 load_plugin () 
1239 {
1240   struct DatastorePlugin *ret;
1241   char *libname;
1242   char *name;
1243
1244   if (GNUNET_OK !=
1245       GNUNET_CONFIGURATION_get_value_string (cfg,
1246                                              "DATASTORE", "DATABASE", &name))
1247     {
1248       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1249                   _("No `%s' specified for `%s' in configuration!\n"),
1250                   "DATABASE",
1251                   "DATASTORE");
1252       return NULL;
1253     }
1254   ret = GNUNET_malloc (sizeof(struct DatastorePlugin));
1255   ret->env.cfg = cfg;
1256   ret->env.sched = sched;  
1257   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1258               _("Loading `%s' datastore plugin\n"), name);
1259   GNUNET_asprintf (&libname, "libgnunet_plugin_datastore_%s", name);
1260   ret->short_name = name;
1261   ret->lib_name = libname;
1262   ret->api = GNUNET_PLUGIN_load (libname, &ret->env);
1263   if (ret->api == NULL)
1264     {
1265       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1266                   _("Failed to load datastore plugin for `%s'\n"), name);
1267       GNUNET_free (ret->short_name);
1268       GNUNET_free (libname);
1269       GNUNET_free (ret);
1270       return NULL;
1271     }
1272   return ret;
1273 }
1274
1275
1276 /**
1277  * Function called when the service shuts
1278  * down.  Unloads our datastore plugin.
1279  *
1280  * @param plug plugin to unload
1281  */
1282 static void
1283 unload_plugin (struct DatastorePlugin *plug)
1284 {
1285 #if DEBUG_DATASTORE
1286   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1287               "Datastore service is unloading plugin...\n");
1288 #endif
1289   GNUNET_break (NULL == GNUNET_PLUGIN_unload (plug->lib_name, plug->api));
1290   GNUNET_free (plug->lib_name);
1291   GNUNET_free (plug->short_name);
1292   GNUNET_free (plug);
1293 }
1294
1295
1296 /**
1297  * Final task run after shutdown.  Unloads plugins and disconnects us from
1298  * statistics.
1299  */
1300 static void
1301 unload_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1302 {
1303   unload_plugin (plugin);
1304   plugin = NULL;
1305   if (filter != NULL)
1306     {
1307       GNUNET_CONTAINER_bloomfilter_free (filter);
1308       filter = NULL;
1309     }
1310   if (stats != NULL)
1311     {
1312       GNUNET_STATISTICS_destroy (stats, GNUNET_YES);
1313       stats = NULL;
1314     }
1315 }
1316
1317
1318 /**
1319  * Last task run during shutdown.  Disconnects us from
1320  * the transport and core.
1321  */
1322 static void
1323 cleaning_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1324 {
1325   struct TransmitCallbackContext *tcc;
1326
1327   cleaning_done = GNUNET_YES;
1328   while (NULL != (tcc = tcc_head))
1329     {
1330       GNUNET_CONTAINER_DLL_remove (tcc_head,
1331                                    tcc_tail,
1332                                    tcc);
1333       if (tcc->th != NULL)
1334         {
1335           GNUNET_CONNECTION_notify_transmit_ready_cancel (tcc->th);
1336           GNUNET_SERVER_client_drop (tcc->client);
1337         }
1338    if (NULL != tcc->tc)
1339         tcc->tc (tcc->tc_cls, GNUNET_SYSERR);
1340       GNUNET_free (tcc->msg);
1341       GNUNET_free (tcc);
1342     }
1343   if (expired_kill_task != GNUNET_SCHEDULER_NO_TASK)
1344     {
1345       GNUNET_SCHEDULER_cancel (sched,
1346                                expired_kill_task);
1347       expired_kill_task = GNUNET_SCHEDULER_NO_TASK;
1348     }
1349   GNUNET_SCHEDULER_add_continuation (sched,
1350                                      &unload_task,
1351                                      NULL,
1352                                      GNUNET_SCHEDULER_REASON_PREREQ_DONE);
1353 }
1354
1355
1356 /**
1357  * Function that removes all active reservations made
1358  * by the given client and releases the space for other
1359  * requests.
1360  *
1361  * @param cls closure
1362  * @param client identification of the client
1363  */
1364 static void
1365 cleanup_reservations (void *cls,
1366                       struct GNUNET_SERVER_Client
1367                       * client)
1368 {
1369   struct ReservationList *pos;
1370   struct ReservationList *prev;
1371   struct ReservationList *next;
1372
1373   if (client == NULL)
1374     return;
1375   prev = NULL;
1376   pos = reservations;
1377   while (NULL != pos)
1378     {
1379       next = pos->next;
1380       if (pos->client == client)
1381         {
1382           if (prev == NULL)
1383             reservations = next;
1384           else
1385             prev->next = next;
1386           reserved -= pos->amount + pos->entries * GNUNET_DATASTORE_ENTRY_OVERHEAD;
1387           GNUNET_free (pos);
1388         }
1389       else
1390         {
1391           prev = pos;
1392         }
1393       pos = next;
1394     }
1395 }
1396
1397
1398 /**
1399  * Process datastore requests.
1400  *
1401  * @param cls closure
1402  * @param s scheduler to use
1403  * @param server the initialized server
1404  * @param c configuration to use
1405  */
1406 static void
1407 run (void *cls,
1408      struct GNUNET_SCHEDULER_Handle *s,
1409      struct GNUNET_SERVER_Handle *server,
1410      const struct GNUNET_CONFIGURATION_Handle *c)
1411 {
1412   char *fn;
1413   unsigned int bf_size;
1414
1415   sched = s;
1416   cfg = c;
1417   if (GNUNET_OK !=
1418       GNUNET_CONFIGURATION_get_value_number (cfg,
1419                                              "DATASTORE", "QUOTA", &quota))
1420     {
1421       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1422                   _("No `%s' specified for `%s' in configuration!\n"),
1423                   "QUOTA",
1424                   "DATASTORE");
1425       return;
1426     }
1427   stats = GNUNET_STATISTICS_create (sched, "datastore", cfg);
1428   cache_size = quota / 8; /* Or should we make this an option? */
1429   bf_size = quota / 32; /* 8 bit per entry, 1 bit per 32 kb in DB */
1430   fn = NULL;
1431   if ( (GNUNET_OK !=
1432         GNUNET_CONFIGURATION_get_value_filename (cfg,
1433                                                  "DATASTORE",
1434                                                  "BLOOMFILTER",
1435                                                  &fn)) ||
1436        (GNUNET_OK !=
1437         GNUNET_DISK_directory_create_for_file (fn)) )
1438     {
1439       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1440                   _("Could not use specified filename `%s' for bloomfilter.\n"),
1441                   fn != NULL ? fn : "");
1442       GNUNET_free_non_null (fn);
1443       fn = NULL;
1444     }
1445   filter = GNUNET_CONTAINER_bloomfilter_load (fn, bf_size, 5);  /* approx. 3% false positives at max use */  
1446   GNUNET_free_non_null (fn);
1447   if (filter == NULL)
1448     {
1449       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1450                   _("Failed to initialize bloomfilter.\n"));
1451       if (stats != NULL)
1452         {
1453           GNUNET_STATISTICS_destroy (stats, GNUNET_YES);
1454           stats = NULL;
1455         }
1456       return;
1457     }
1458   plugin = load_plugin ();
1459   if (NULL == plugin)
1460     {
1461       GNUNET_CONTAINER_bloomfilter_free (filter);
1462       filter = NULL;
1463       if (stats != NULL)
1464         {
1465           GNUNET_STATISTICS_destroy (stats, GNUNET_YES);
1466           stats = NULL;
1467         }
1468       return;
1469     }
1470   GNUNET_SERVER_disconnect_notify (server, &cleanup_reservations, NULL);
1471   GNUNET_SERVER_add_handlers (server, handlers);
1472   expired_kill_task
1473     = GNUNET_SCHEDULER_add_with_priority (sched,
1474                                           GNUNET_SCHEDULER_PRIORITY_IDLE,
1475                                           &delete_expired, NULL);
1476   GNUNET_SCHEDULER_add_delayed (sched,
1477                                 GNUNET_TIME_UNIT_FOREVER_REL,
1478                                 &cleaning_task, NULL);
1479   
1480 }
1481
1482
1483 /**
1484  * The main function for the datastore service.
1485  *
1486  * @param argc number of arguments from the command line
1487  * @param argv command line arguments
1488  * @return 0 ok, 1 on error
1489  */
1490 int
1491 main (int argc, char *const *argv)
1492 {
1493   int ret;
1494
1495   ret = (GNUNET_OK ==
1496          GNUNET_SERVICE_run (argc,
1497                              argv,
1498                              "datastore",
1499                              GNUNET_SERVICE_OPTION_NONE,
1500                              &run, NULL)) ? 0 : 1;
1501   return ret;
1502 }
1503
1504
1505 /* end of gnunet-service-datastore.c */