fixing mess with search update serialization and parenting
[oweals/gnunet.git] / src / datastore / gnunet-service-datastore.c
1 /*
2      This file is part of GNUnet
3      (C) 2004, 2005, 2006, 2007, 2009 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 2, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file datastore/gnunet-service-datastore.c
23  * @brief Management for the datastore for files stored on a GNUnet node
24  * @author Christian Grothoff
25  */
26
27 #include "platform.h"
28 #include "gnunet_util_lib.h"
29 #include "gnunet_arm_service.h"
30 #include "gnunet_protocols.h"
31 #include "gnunet_statistics_service.h"
32 #include "plugin_datastore.h"
33 #include "datastore.h"
34
35 /**
36  * How many messages do we queue at most per client?
37  */
38 #define MAX_PENDING 1024
39
40 /**
41  * How long are we at most keeping "expired" content
42  * past the expiration date in the database?
43  */
44 #define MAX_EXPIRE_DELAY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 15)
45
46
47
48 /**
49  * Our datastore plugin.
50  */
51 struct DatastorePlugin
52 {
53
54   /**
55    * API of the transport as returned by the plugin's
56    * initialization function.
57    */
58   struct GNUNET_DATASTORE_PluginFunctions *api;
59
60   /**
61    * Short name for the plugin (i.e. "sqlite").
62    */
63   char *short_name;
64
65   /**
66    * Name of the library (i.e. "gnunet_plugin_datastore_sqlite").
67    */
68   char *lib_name;
69
70   /**
71    * Environment this transport service is using
72    * for this plugin.
73    */
74   struct GNUNET_DATASTORE_PluginEnvironment env;
75
76 };
77
78
79 /**
80  * Linked list of active reservations.
81  */
82 struct ReservationList 
83 {
84
85   /**
86    * This is a linked list.
87    */
88   struct ReservationList *next;
89
90   /**
91    * Client that made the reservation.
92    */
93   struct GNUNET_SERVER_Client *client;
94
95   /**
96    * Number of bytes (still) reserved.
97    */
98   uint64_t amount;
99
100   /**
101    * Number of items (still) reserved.
102    */
103   uint64_t entries;
104
105   /**
106    * Reservation identifier.
107    */
108   int32_t rid;
109
110 };
111
112
113 /**
114  * Our datastore plugin (NULL if not available).
115  */
116 static struct DatastorePlugin *plugin;
117
118 /**
119  * Linked list of space reservations made by clients.
120  */
121 static struct ReservationList *reservations;
122
123 /**
124  * Bloomfilter to quickly tell if we don't have the content.
125  */
126 static struct GNUNET_CONTAINER_BloomFilter *filter;
127
128 /**
129  * Static counter to produce reservation identifiers.
130  */
131 static int reservation_gen;
132
133 /**
134  * How much space are we allowed to use?
135  */
136 static unsigned long long quota;
137
138 /**
139  * How much space are we using for the cache?  (space available for
140  * insertions that will be instantly reclaimed by discarding less
141  * important content --- or possibly whatever we just inserted into
142  * the "cache").
143  */
144 static unsigned long long cache_size;
145
146 /**
147  * How much space have we currently reserved?
148  */
149 static unsigned long long reserved;
150
151 /**
152  * Identity of the task that is used to delete
153  * expired content.
154  */
155 static GNUNET_SCHEDULER_TaskIdentifier expired_kill_task;
156
157 /**
158  * Our configuration.
159  */
160 const struct GNUNET_CONFIGURATION_Handle *cfg;
161
162 /**
163  * Our scheduler.
164  */
165 struct GNUNET_SCHEDULER_Handle *sched; 
166
167 /**
168  * Handle for reporting statistics.
169  */
170 static struct GNUNET_STATISTICS_Handle *stats;
171
172
173 /**
174  * Function called once the transmit operation has
175  * either failed or succeeded.
176  *
177  * @param cls closure
178  * @param status GNUNET_OK on success, GNUNET_SYSERR on error
179  */
180 typedef void (*TransmitContinuation)(void *cls,
181                                      int status);
182
183
184 /**
185  * Context for transmitting replies to clients.
186  */
187 struct TransmitCallbackContext 
188 {
189   
190   /**
191    * We keep these in a doubly-linked list (for cleanup).
192    */
193   struct TransmitCallbackContext *next;
194   
195   /**
196    * We keep these in a doubly-linked list (for cleanup).
197    */
198   struct TransmitCallbackContext *prev;
199   
200   /**
201    * The message that we're asked to transmit.
202    */
203   struct GNUNET_MessageHeader *msg;
204   
205   /**
206    * Handle for the transmission request.
207    */
208   struct GNUNET_CONNECTION_TransmitHandle *th;
209
210   /**
211    * Client that we are transmitting to.
212    */
213   struct GNUNET_SERVER_Client *client;
214
215   /**
216    * Function to call once msg has been transmitted
217    * (or at least added to the buffer).
218    */
219   TransmitContinuation tc;
220
221   /**
222    * Closure for tc.
223    */
224   void *tc_cls;
225
226   /**
227    * GNUNET_YES if we are supposed to signal the server
228    * completion of the client's request.
229    */
230   int end;
231 };
232
233   
234 /**
235  * Head of the doubly-linked list (for cleanup).
236  */
237 static struct TransmitCallbackContext *tcc_head;
238
239 /**
240  * Tail of the doubly-linked list (for cleanup).
241  */
242 static struct TransmitCallbackContext *tcc_tail;
243
244 /**
245  * Have we already clean ed up the TCCs and are hence no longer
246  * willing (or able) to transmit anything to anyone?
247  */
248 static int cleaning_done;
249
250 /**
251  * Task that is used to remove expired entries from
252  * the datastore.  This task will schedule itself
253  * again automatically to always delete all expired
254  * content quickly.
255  *
256  * @param cls not used
257  * @param tc task context
258  */ 
259 static void
260 delete_expired (void *cls,
261                 const struct GNUNET_SCHEDULER_TaskContext *tc);
262
263
264 /**
265  * Iterate over the expired items stored in the datastore.
266  * Delete all expired items; once we have processed all
267  * expired items, re-schedule the "delete_expired" task.
268  *
269  * @param cls not used
270  * @param next_cls closure to pass to the "next" function.
271  * @param key key for the content
272  * @param size number of bytes in data
273  * @param data content stored
274  * @param type type of the content
275  * @param priority priority of the content
276  * @param anonymity anonymity-level for the content
277  * @param expiration expiration time for the content
278  * @param uid unique identifier for the datum;
279  *        maybe 0 if no unique identifier is available
280  *
281  * @return GNUNET_SYSERR to abort the iteration, GNUNET_OK to continue
282  *         (continue on call to "next", of course),
283  *         GNUNET_NO to delete the item and continue (if supported)
284  */
285 static int 
286 expired_processor (void *cls,
287                    void *next_cls,
288                    const GNUNET_HashCode * key,
289                    uint32_t size,
290                    const void *data,
291                    enum GNUNET_BLOCK_Type type,
292                    uint32_t priority,
293                    uint32_t anonymity,
294                    struct GNUNET_TIME_Absolute
295                    expiration, 
296                    uint64_t uid)
297 {
298   struct GNUNET_TIME_Absolute now;
299
300   if (key == NULL) 
301     {
302       expired_kill_task 
303         = GNUNET_SCHEDULER_add_delayed (sched,
304                                         MAX_EXPIRE_DELAY,
305                                         &delete_expired,
306                                         NULL);
307       return GNUNET_SYSERR;
308     }
309   now = GNUNET_TIME_absolute_get ();
310   if (expiration.value > now.value)
311     {
312       /* finished processing */
313       plugin->api->next_request (next_cls, GNUNET_YES);
314       return GNUNET_SYSERR;
315     }
316   plugin->api->next_request (next_cls, GNUNET_NO);
317 #if DEBUG_DATASTORE
318   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
319               "Deleting content that expired %llu ms ago\n",
320               (unsigned long long) (now.value - expiration.value));
321 #endif
322   GNUNET_STATISTICS_update (stats,
323                             gettext_noop ("# bytes expired"),
324                             size,
325                             GNUNET_YES);
326   GNUNET_CONTAINER_bloomfilter_remove (filter,
327                                        key);
328   return GNUNET_NO; /* delete */
329 }
330
331
332 /**
333  * Task that is used to remove expired entries from
334  * the datastore.  This task will schedule itself
335  * again automatically to always delete all expired
336  * content quickly.
337  *
338  * @param cls not used
339  * @param tc task context
340  */ 
341 static void
342 delete_expired (void *cls,
343                 const struct GNUNET_SCHEDULER_TaskContext *tc)
344 {
345   expired_kill_task = GNUNET_SCHEDULER_NO_TASK;
346   plugin->api->iter_ascending_expiration (plugin->api->cls, 
347                                           0,
348                                           &expired_processor,
349                                           NULL);
350 }
351
352
353 /**
354  * An iterator over a set of items stored in the datastore.
355  *
356  * @param cls closure
357  * @param next_cls closure to pass to the "next" function.
358  * @param key key for the content
359  * @param size number of bytes in data
360  * @param data content stored
361  * @param type type of the content
362  * @param priority priority of the content
363  * @param anonymity anonymity-level for the content
364  * @param expiration expiration time for the content
365  * @param uid unique identifier for the datum;
366  *        maybe 0 if no unique identifier is available
367  *
368  * @return GNUNET_SYSERR to abort the iteration, GNUNET_OK to continue
369  *         (continue on call to "next", of course),
370  *         GNUNET_NO to delete the item and continue (if supported)
371  */
372 static int 
373 manage (void *cls,
374         void *next_cls,
375         const GNUNET_HashCode * key,
376         uint32_t size,
377         const void *data,
378         enum GNUNET_BLOCK_Type type,
379         uint32_t priority,
380         uint32_t anonymity,
381         struct GNUNET_TIME_Absolute
382         expiration, 
383         uint64_t uid)
384 {
385   unsigned long long *need = cls;
386
387   if (NULL == key)
388     {
389       GNUNET_free (need);
390       return GNUNET_SYSERR;
391     }
392   if (size + GNUNET_DATASTORE_ENTRY_OVERHEAD > *need)
393     *need = 0;
394   else
395     *need -= size + GNUNET_DATASTORE_ENTRY_OVERHEAD;
396   plugin->api->next_request (next_cls, 
397                              (0 == *need) ? GNUNET_YES : GNUNET_NO);
398 #if DEBUG_DATASTORE
399   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
400               "Deleting %llu bytes of low-priority content (still trying to free another %llu bytes)\n",
401               size + GNUNET_DATASTORE_ENTRY_OVERHEAD,
402               *need);
403 #endif
404   GNUNET_STATISTICS_update (stats,
405                             gettext_noop ("# bytes purged (low-priority)"),
406                             size,
407                             GNUNET_YES);
408   GNUNET_CONTAINER_bloomfilter_remove (filter,
409                                        key);
410   return GNUNET_NO;
411 }
412
413
414 /**
415  * Manage available disk space by running tasks
416  * that will discard content if necessary.  This
417  * function will be run whenever a request for
418  * "need" bytes of storage could only be satisfied
419  * by eating into the "cache" (and we want our cache
420  * space back).
421  *
422  * @param need number of bytes of content that were
423  *        placed into the "cache" (and hence the
424  *        number of bytes that should be removed).
425  */
426 static void
427 manage_space (unsigned long long need)
428 {
429   unsigned long long *n;
430
431 #if DEBUG_DATASTORE
432   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
433               "Asked to free up %llu bytes of cache space\n",
434               need);
435 #endif
436   n = GNUNET_malloc (sizeof(unsigned long long));
437   *n = need;
438   plugin->api->iter_low_priority (plugin->api->cls,
439                                   0,
440                                   &manage,
441                                   n);
442 }
443
444
445 /**
446  * Function called to notify a client about the socket
447  * begin ready to queue more data.  "buf" will be
448  * NULL and "size" zero if the socket was closed for
449  * writing in the meantime.
450  *
451  * @param cls closure
452  * @param size number of bytes available in buf
453  * @param buf where the callee should write the message
454  * @return number of bytes written to buf
455  */
456 static size_t
457 transmit_callback (void *cls,
458                    size_t size, void *buf)
459 {
460   struct TransmitCallbackContext *tcc = cls;
461   size_t msize;
462   
463   tcc->th = NULL;
464   GNUNET_CONTAINER_DLL_remove (tcc_head,
465                                tcc_tail,
466                                tcc);
467   msize = ntohs(tcc->msg->size);
468   if (size == 0)
469     {
470 #if DEBUG_DATASTORE
471       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
472                   "Transmission failed.\n");
473 #endif
474       if (tcc->tc != NULL)
475         tcc->tc (tcc->tc_cls, GNUNET_SYSERR);
476       if (GNUNET_YES == tcc->end)
477         GNUNET_SERVER_receive_done (tcc->client, GNUNET_SYSERR);       
478       GNUNET_SERVER_client_drop (tcc->client);
479       GNUNET_free (tcc->msg);
480       GNUNET_free (tcc);
481       return 0;
482     }
483   GNUNET_assert (size >= msize);
484   memcpy (buf, tcc->msg, msize);
485   if (tcc->tc != NULL)
486     tcc->tc (tcc->tc_cls, GNUNET_OK);
487   if (GNUNET_YES == tcc->end)
488     {
489       GNUNET_SERVER_receive_done (tcc->client, GNUNET_OK);
490     }
491   else
492     {
493 #if DEBUG_DATASTORE
494       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
495                   "Response transmitted, more pending!\n");
496 #endif
497     }
498   GNUNET_SERVER_client_drop (tcc->client);
499   GNUNET_free (tcc->msg);
500   GNUNET_free (tcc);
501   return msize;
502 }
503
504
505 /**
506  * Transmit the given message to the client.
507  *
508  * @param client target of the message
509  * @param msg message to transmit, will be freed!
510  * @param tc function to call afterwards
511  * @param tc_cls closure for tc
512  * @param end is this the last response (and we should
513  *        signal the server completion accodingly after
514  *        transmitting this message)?
515  */
516 static void
517 transmit (struct GNUNET_SERVER_Client *client,
518           struct GNUNET_MessageHeader *msg,
519           TransmitContinuation tc,
520           void *tc_cls,
521           int end)
522 {
523   struct TransmitCallbackContext *tcc;
524
525   if (GNUNET_YES == cleaning_done)
526     {
527       if (NULL != tc)
528         tc (tc_cls, GNUNET_SYSERR);
529       return;
530     }
531   tcc = GNUNET_malloc (sizeof(struct TransmitCallbackContext));
532   tcc->msg = msg;
533   tcc->client = client;
534   tcc->tc = tc;
535   tcc->tc_cls = tc_cls;
536   tcc->end = end;
537   if (NULL ==
538       (tcc->th = GNUNET_SERVER_notify_transmit_ready (client,
539                                                       ntohs(msg->size),
540                                                       GNUNET_TIME_UNIT_FOREVER_REL,
541                                                       &transmit_callback,
542                                                       tcc)))
543     {
544       GNUNET_break (0);
545       if (GNUNET_YES == end)
546         {
547 #if DEBUG_DATASTORE
548           GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
549                       "Disconnecting client.\n");
550 #endif    
551           GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
552         }
553       if (NULL != tc)
554         tc (tc_cls, GNUNET_SYSERR);
555       GNUNET_free (msg);
556       GNUNET_free (tcc);
557       return;
558     }
559   GNUNET_SERVER_client_keep (client);
560   GNUNET_CONTAINER_DLL_insert (tcc_head,
561                                tcc_tail,
562                                tcc);
563 }
564
565
566 /**
567  * Transmit a status code to the client.
568  *
569  * @param client receiver of the response
570  * @param code status code
571  * @param msg optional error message (can be NULL)
572  */
573 static void
574 transmit_status (struct GNUNET_SERVER_Client *client,
575                  int code,
576                  const char *msg)
577 {
578   struct StatusMessage *sm;
579   size_t slen;
580
581 #if DEBUG_DATASTORE
582   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
583               "Transmitting `%s' message with value %d and message `%s'\n",
584               "STATUS",
585               code,
586               msg != NULL ? msg : "(none)");
587 #endif
588   slen = (msg == NULL) ? 0 : strlen(msg) + 1;  
589   sm = GNUNET_malloc (sizeof(struct StatusMessage) + slen);
590   sm->header.size = htons(sizeof(struct StatusMessage) + slen);
591   sm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_STATUS);
592   sm->status = htonl(code);
593   if (slen > 0)
594     memcpy (&sm[1], msg, slen);  
595   transmit (client, &sm->header, NULL, NULL, GNUNET_YES);
596 }
597
598
599 /**
600  * Function called once the transmit operation has
601  * either failed or succeeded.
602  *
603  * @param next_cls closure for calling "next_request" callback
604  * @param status GNUNET_OK on success, GNUNET_SYSERR on error
605  */
606 static void 
607 get_next(void *next_cls,
608          int status)
609 {
610   if (status != GNUNET_OK)
611     {
612       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
613                   _("Failed to transmit an item to the client; aborting iteration.\n"));
614       if (plugin != NULL)
615         plugin->api->next_request (next_cls, GNUNET_YES);
616       return;
617     }
618   plugin->api->next_request (next_cls, GNUNET_NO);
619 }
620
621
622 /**
623  * Function that will transmit the given datastore entry
624  * to the client.
625  *
626  * @param cls closure, pointer to the client (of type GNUNET_SERVER_Client).
627  * @param next_cls closure to use to ask for the next item
628  * @param key key for the content
629  * @param size number of bytes in data
630  * @param data content stored
631  * @param type type of the content
632  * @param priority priority of the content
633  * @param anonymity anonymity-level for the content
634  * @param expiration expiration time for the content
635  * @param uid unique identifier for the datum;
636  *        maybe 0 if no unique identifier is available
637  *
638  * @return GNUNET_SYSERR to abort the iteration, GNUNET_OK to continue,
639  *         GNUNET_NO to delete the item and continue (if supported)
640  */
641 static int
642 transmit_item (void *cls,
643                void *next_cls,
644                const GNUNET_HashCode * key,
645                uint32_t size,
646                const void *data,
647                enum GNUNET_BLOCK_Type type,
648                uint32_t priority,
649                uint32_t anonymity,
650                struct GNUNET_TIME_Absolute
651                expiration, uint64_t uid)
652 {
653   struct GNUNET_SERVER_Client *client = cls;
654   struct GNUNET_MessageHeader *end;
655   struct DataMessage *dm;
656
657   if (key == NULL)
658     {
659       /* transmit 'DATA_END' */
660 #if DEBUG_DATASTORE
661       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
662                   "Transmitting `%s' message\n",
663                   "DATA_END");
664 #endif
665       end = GNUNET_malloc (sizeof(struct GNUNET_MessageHeader));
666       end->size = htons(sizeof(struct GNUNET_MessageHeader));
667       end->type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END);
668       transmit (client, end, NULL, NULL, GNUNET_YES);
669       GNUNET_SERVER_client_drop (client);
670       return GNUNET_OK;
671     }
672   dm = GNUNET_malloc (sizeof(struct DataMessage) + size);
673   dm->header.size = htons(sizeof(struct DataMessage) + size);
674   dm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_DATA);
675   dm->rid = htonl(0);
676   dm->size = htonl(size);
677   dm->type = htonl(type);
678   dm->priority = htonl(priority);
679   dm->anonymity = htonl(anonymity);
680   dm->expiration = GNUNET_TIME_absolute_hton(expiration);
681   dm->uid = GNUNET_htonll(uid);
682   dm->key = *key;
683   memcpy (&dm[1], data, size);
684 #if DEBUG_DATASTORE
685   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
686               "Transmitting `%s' message\n",
687               "DATA");
688 #endif
689   GNUNET_STATISTICS_update (stats,
690                             gettext_noop ("# results found"),
691                             1,
692                             GNUNET_NO);
693   transmit (client, &dm->header, &get_next, next_cls, GNUNET_NO);
694   return GNUNET_OK;
695 }
696
697
698 /**
699  * Handle RESERVE-message.
700  *
701  * @param cls closure
702  * @param client identification of the client
703  * @param message the actual message
704  */
705 static void
706 handle_reserve (void *cls,
707                 struct GNUNET_SERVER_Client *client,
708                 const struct GNUNET_MessageHeader *message)
709 {
710   const struct ReserveMessage *msg = (const struct ReserveMessage*) message;
711   struct ReservationList *e;
712   unsigned long long used;
713   unsigned long long req;
714   uint64_t amount;
715   uint32_t entries;
716
717 #if DEBUG_DATASTORE
718   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
719               "Processing `%s' request\n",
720               "RESERVE");
721 #endif
722   amount = GNUNET_ntohll(msg->amount);
723   entries = ntohl(msg->entries);
724   used = plugin->api->get_size (plugin->api->cls) + reserved;
725   req = amount + ((unsigned long long) GNUNET_DATASTORE_ENTRY_OVERHEAD) * entries;
726   if (used + req > quota)
727     {
728       if (quota < used)
729         used = quota; /* cheat a bit for error message (to avoid negative numbers) */
730       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
731                   _("Insufficient space (%llu bytes are available) to satisfy `%s' request for %llu bytes\n"),
732                   quota - used,
733                   "RESERVE",
734                   req);
735       if (cache_size < req)
736         {
737           /* TODO: document this in the FAQ; essentially, if this
738              message happens, the insertion request could be blocked
739              by less-important content from migration because it is
740              larger than 1/8th of the overall available space, and
741              we only reserve 1/8th for "fresh" insertions */
742           GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
743                       _("The requested amount (%llu bytes) is larger than the cache size (%llu bytes)\n"),
744                       req,
745                       cache_size);
746           transmit_status (client, 0, 
747                            gettext_noop ("Insufficient space to satisfy request and "
748                                          "requested amount is larger than cache size"));
749         }
750       else
751         {
752           transmit_status (client, 0, 
753                            gettext_noop ("Insufficient space to satisfy request"));
754         }
755       return;      
756     }
757   reserved += req;
758   e = GNUNET_malloc (sizeof(struct ReservationList));
759   e->next = reservations;
760   reservations = e;
761   e->client = client;
762   e->amount = amount;
763   e->entries = entries;
764   e->rid = ++reservation_gen;
765   if (reservation_gen < 0)
766     reservation_gen = 0; /* wrap around */
767   transmit_status (client, e->rid, NULL);
768 }
769
770
771 /**
772  * Handle RELEASE_RESERVE-message.
773  *
774  * @param cls closure
775  * @param client identification of the client
776  * @param message the actual message
777  */
778 static void
779 handle_release_reserve (void *cls,
780                         struct GNUNET_SERVER_Client *client,
781                         const struct GNUNET_MessageHeader *message)
782 {
783   const struct ReleaseReserveMessage *msg = (const struct ReleaseReserveMessage*) message;
784   struct ReservationList *pos;
785   struct ReservationList *prev;
786   struct ReservationList *next;
787   int rid = ntohl(msg->rid);
788   unsigned long long rem;
789
790 #if DEBUG_DATASTORE
791   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
792               "Processing `%s' request\n",
793               "RELEASE_RESERVE");
794 #endif
795   next = reservations;
796   prev = NULL;
797   while (NULL != (pos = next))
798     {
799       next = pos->next;
800       if (rid == pos->rid)
801         {
802           if (prev == NULL)
803             reservations = next;
804           else
805             prev->next = next;
806           rem = pos->amount + ((unsigned long long) GNUNET_DATASTORE_ENTRY_OVERHEAD) * pos->entries;
807           GNUNET_assert (reserved >= rem);
808           reserved -= rem;
809 #if DEBUG_DATASTORE
810           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
811                       "Returning %llu remaining reserved bytes to storage pool\n",
812                       rem);
813 #endif    
814           GNUNET_free (pos);
815           transmit_status (client, GNUNET_OK, NULL);
816           return;
817         }       
818       prev = pos;
819     }
820   GNUNET_break (0);
821   transmit_status (client, GNUNET_SYSERR, gettext_noop ("Could not find matching reservation"));
822 }
823
824
825 /**
826  * Check that the given message is a valid data message.
827  *
828  * @return NULL if the message is not well-formed, otherwise the message
829  */
830 static const struct DataMessage *
831 check_data (const struct GNUNET_MessageHeader *message)
832 {
833   uint16_t size;
834   uint32_t dsize;
835   const struct DataMessage *dm;
836
837   size = ntohs(message->size);
838   if (size < sizeof(struct DataMessage))
839     { 
840       GNUNET_break (0);
841       return NULL;
842     }
843   dm = (const struct DataMessage *) message;
844   dsize = ntohl(dm->size);
845   if (size != dsize + sizeof(struct DataMessage))
846     {
847       GNUNET_break (0);
848       return NULL;
849     }
850   return dm;
851 }
852
853
854 /**
855  * Handle PUT-message.
856  *
857  * @param cls closure
858  * @param client identification of the client
859  * @param message the actual message
860  */
861 static void
862 handle_put (void *cls,
863             struct GNUNET_SERVER_Client *client,
864             const struct GNUNET_MessageHeader *message)
865 {
866   const struct DataMessage *dm = check_data (message);
867   char *msg;
868   int ret;
869   int rid;
870   struct ReservationList *pos;
871   uint32_t size;
872
873   if ( (dm == NULL) ||
874        (ntohl(dm->type) == 0) ) 
875     {
876       GNUNET_break (0);
877       GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
878       return;
879     }
880 #if DEBUG_DATASTORE
881   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
882               "Processing `%s' request for `%s'\n",
883               "PUT",
884               GNUNET_h2s (&dm->key));
885 #endif
886   rid = ntohl(dm->rid);
887   size = ntohl(dm->size);
888   if (rid > 0)
889     {
890       pos = reservations;
891       while ( (NULL != pos) &&
892               (rid != pos->rid) )
893         pos = pos->next;
894       GNUNET_break (pos != NULL);
895       if (NULL != pos)
896         {
897           GNUNET_break (pos->entries > 0);
898           GNUNET_break (pos->amount > size);
899           pos->entries--;
900           pos->amount -= size;
901           reserved -= (size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
902         }
903     }
904   msg = NULL;
905   ret = plugin->api->put (plugin->api->cls,
906                           &dm->key,
907                           size,
908                           &dm[1],
909                           ntohl(dm->type),
910                           ntohl(dm->priority),
911                           ntohl(dm->anonymity),
912                           GNUNET_TIME_absolute_ntoh(dm->expiration),
913                           &msg);
914   if (GNUNET_OK == ret)
915     {
916       GNUNET_STATISTICS_update (stats,
917                                 gettext_noop ("# bytes stored"),
918                                 size,
919                                 GNUNET_YES);
920       GNUNET_CONTAINER_bloomfilter_add (filter,
921                                         &dm->key);
922 #if DEBUG_DATASTORE
923       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
924                   "Successfully stored %u bytes under key `%s'\n",
925                   size,
926                   GNUNET_h2s (&dm->key));
927 #endif
928     }
929   transmit_status (client, 
930                    (GNUNET_SYSERR == ret) ? GNUNET_SYSERR : GNUNET_OK, 
931                    msg);
932   GNUNET_free_non_null (msg);
933   if (quota - reserved - cache_size < plugin->api->get_size (plugin->api->cls))
934     manage_space (size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
935 }
936
937
938 /**
939  * Handle GET-message.
940  *
941  * @param cls closure
942  * @param client identification of the client
943  * @param message the actual message
944  */
945 static void
946 handle_get (void *cls,
947             struct GNUNET_SERVER_Client *client,
948             const struct GNUNET_MessageHeader *message)
949 {
950   const struct GetMessage *msg;
951   uint16_t size;
952
953   size = ntohs(message->size);
954   if ( (size != sizeof(struct GetMessage)) &&
955        (size != sizeof(struct GetMessage) - sizeof(GNUNET_HashCode)) )
956     {
957       GNUNET_break (0);
958       GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
959       return;
960     }
961   msg = (const struct GetMessage*) message;
962 #if DEBUG_DATASTORE
963   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
964               "Processing `%s' request for `%s' of type %u\n",
965               "GET",
966               GNUNET_h2s (&msg->key),
967               ntohl (msg->type));
968 #endif
969   GNUNET_STATISTICS_update (stats,
970                             gettext_noop ("# GET requests received"),
971                             1,
972                             GNUNET_NO);
973   GNUNET_SERVER_client_keep (client);
974   if ( (size == sizeof(struct GetMessage)) &&
975        (GNUNET_YES != GNUNET_CONTAINER_bloomfilter_test (filter,
976                                                          &msg->key)) )
977     {
978       /* don't bother database... */
979 #if DEBUG_DATASTORE
980       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
981                   "Empty result set for `%s' request for `%s'.\n",
982                   "GET",
983                   GNUNET_h2s (&msg->key));
984 #endif  
985       GNUNET_STATISTICS_update (stats,
986                                 gettext_noop ("# requests filtered by bloomfilter"),
987                                 1,
988                                 GNUNET_NO);
989       transmit_item (client,
990                      NULL, NULL, 0, NULL, 0, 0, 0, 
991                      GNUNET_TIME_UNIT_ZERO_ABS, 0);
992       return;
993     }
994   plugin->api->get (plugin->api->cls,
995                     ((size == sizeof(struct GetMessage)) ? &msg->key : NULL),
996                     NULL,
997                     ntohl(msg->type),
998                     &transmit_item,
999                     client);    
1000 }
1001
1002
1003 /**
1004  * Handle UPDATE-message.
1005  *
1006  * @param cls closure
1007  * @param client identification of the client
1008  * @param message the actual message
1009  */
1010 static void
1011 handle_update (void *cls,
1012                struct GNUNET_SERVER_Client *client,
1013                const struct GNUNET_MessageHeader *message)
1014 {
1015   const struct UpdateMessage *msg;
1016   int ret;
1017   char *emsg;
1018
1019   GNUNET_STATISTICS_update (stats,
1020                             gettext_noop ("# UPDATE requests received"),
1021                             1,
1022                             GNUNET_NO);
1023   msg = (const struct UpdateMessage*) message;
1024   emsg = NULL;
1025 #if DEBUG_DATASTORE
1026   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1027               "Processing `%s' request for %llu\n",
1028               "UPDATE",
1029               (unsigned long long) GNUNET_ntohll (msg->uid));
1030 #endif
1031   ret = plugin->api->update (plugin->api->cls,
1032                              GNUNET_ntohll(msg->uid),
1033                              (int32_t) ntohl(msg->priority),
1034                              GNUNET_TIME_absolute_ntoh(msg->expiration),
1035                              &emsg);
1036   transmit_status (client, ret, emsg);
1037   GNUNET_free_non_null (emsg);
1038 }
1039
1040
1041 /**
1042  * Handle GET_RANDOM-message.
1043  *
1044  * @param cls closure
1045  * @param client identification of the client
1046  * @param message the actual message
1047  */
1048 static void
1049 handle_get_random (void *cls,
1050                    struct GNUNET_SERVER_Client *client,
1051                    const struct GNUNET_MessageHeader *message)
1052 {
1053 #if DEBUG_DATASTORE
1054   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1055               "Processing `%s' request\n",
1056               "GET_RANDOM");
1057 #endif
1058   GNUNET_STATISTICS_update (stats,
1059                             gettext_noop ("# GET RANDOM requests received"),
1060                             1,
1061                             GNUNET_NO);
1062   GNUNET_SERVER_client_keep (client);
1063   plugin->api->iter_migration_order (plugin->api->cls,
1064                                      0,
1065                                      &transmit_item,
1066                                      client);  
1067 }
1068
1069
1070 /**
1071  * Context for the 'remove_callback'.
1072  */
1073 struct RemoveContext 
1074 {
1075   /**
1076    * Client for whom we're doing the remvoing.
1077    */
1078   struct GNUNET_SERVER_Client *client;
1079
1080   /**
1081    * GNUNET_YES if we managed to remove something.
1082    */
1083   int found;
1084 };
1085
1086
1087 /**
1088  * Callback function that will cause the item that is passed
1089  * in to be deleted (by returning GNUNET_NO).
1090  */
1091 static int
1092 remove_callback (void *cls,
1093                  void *next_cls,
1094                  const GNUNET_HashCode * key,
1095                  uint32_t size,
1096                  const void *data,
1097                  enum GNUNET_BLOCK_Type type,
1098                  uint32_t priority,
1099                  uint32_t anonymity,
1100                  struct GNUNET_TIME_Absolute
1101                  expiration, uint64_t uid)
1102 {
1103   struct RemoveContext *rc = cls;
1104
1105   if (key == NULL)
1106     {
1107 #if DEBUG_DATASTORE
1108       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1109                   "No further matches for `%s' request.\n",
1110                   "REMOVE");
1111 #endif  
1112       if (GNUNET_YES == rc->found)
1113         transmit_status (rc->client, GNUNET_OK, NULL);       
1114       else
1115         transmit_status (rc->client, GNUNET_NO, _("Content not found"));        
1116       GNUNET_SERVER_client_drop (rc->client);
1117       GNUNET_free (rc);
1118       return GNUNET_OK; /* last item */
1119     }
1120   rc->found = GNUNET_YES;
1121 #if DEBUG_DATASTORE
1122   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1123               "Item %llu matches `%s' request for key `%s'.\n",
1124               (unsigned long long) uid,
1125               "REMOVE",
1126               GNUNET_h2s (key));
1127 #endif  
1128   GNUNET_STATISTICS_update (stats,
1129                             gettext_noop ("# bytes removed (explicit request)"),
1130                             size,
1131                             GNUNET_YES);
1132   GNUNET_CONTAINER_bloomfilter_remove (filter,
1133                                        key);
1134   plugin->api->next_request (next_cls, GNUNET_YES);
1135   return GNUNET_NO;
1136 }
1137
1138
1139 /**
1140  * Handle REMOVE-message.
1141  *
1142  * @param cls closure
1143  * @param client identification of the client
1144  * @param message the actual message
1145  */
1146 static void
1147 handle_remove (void *cls,
1148              struct GNUNET_SERVER_Client *client,
1149              const struct GNUNET_MessageHeader *message)
1150 {
1151   const struct DataMessage *dm = check_data (message);
1152   GNUNET_HashCode vhash;
1153   struct RemoveContext *rc;
1154
1155   if (dm == NULL)
1156     {
1157       GNUNET_break (0);
1158       GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1159       return;
1160     }
1161 #if DEBUG_DATASTORE
1162   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1163               "Processing `%s' request for `%s'\n",
1164               "REMOVE",
1165               GNUNET_h2s (&dm->key));
1166 #endif
1167   GNUNET_STATISTICS_update (stats,
1168                             gettext_noop ("# REMOVE requests received"),
1169                             1,
1170                             GNUNET_NO);
1171   rc = GNUNET_malloc (sizeof(struct RemoveContext));
1172   GNUNET_SERVER_client_keep (client);
1173   rc->client = client;
1174   GNUNET_CRYPTO_hash (&dm[1],
1175                       ntohl(dm->size),
1176                       &vhash);
1177   plugin->api->get (plugin->api->cls,
1178                     &dm->key,
1179                     &vhash,
1180                     ntohl(dm->type),
1181                     &remove_callback,
1182                     rc);
1183 }
1184
1185
1186 /**
1187  * Handle DROP-message.
1188  *
1189  * @param cls closure
1190  * @param client identification of the client
1191  * @param message the actual message
1192  */
1193 static void
1194 handle_drop (void *cls,
1195              struct GNUNET_SERVER_Client *client,
1196              const struct GNUNET_MessageHeader *message)
1197 {
1198 #if DEBUG_DATASTORE
1199   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1200               "Processing `%s' request\n",
1201               "DROP");
1202 #endif
1203   plugin->api->drop (plugin->api->cls);
1204   GNUNET_SERVER_receive_done (client, GNUNET_OK);
1205 }
1206
1207
1208 /**
1209  * List of handlers for the messages understood by this
1210  * service.
1211  */
1212 static struct GNUNET_SERVER_MessageHandler handlers[] = {
1213   {&handle_reserve, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_RESERVE, 
1214    sizeof(struct ReserveMessage) }, 
1215   {&handle_release_reserve, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_RELEASE_RESERVE, 
1216    sizeof(struct ReleaseReserveMessage) }, 
1217   {&handle_put, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_PUT, 0 }, 
1218   {&handle_update, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_UPDATE, 
1219    sizeof (struct UpdateMessage) }, 
1220   {&handle_get, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_GET, 0 }, 
1221   {&handle_get_random, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_GET_RANDOM, 
1222    sizeof(struct GNUNET_MessageHeader) }, 
1223   {&handle_remove, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE, 0 }, 
1224   {&handle_drop, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_DROP, 
1225    sizeof(struct GNUNET_MessageHeader) }, 
1226   {NULL, NULL, 0, 0}
1227 };
1228
1229
1230
1231 /**
1232  * Load the datastore plugin.
1233  */
1234 static struct DatastorePlugin *
1235 load_plugin () 
1236 {
1237   struct DatastorePlugin *ret;
1238   char *libname;
1239   char *name;
1240
1241   if (GNUNET_OK !=
1242       GNUNET_CONFIGURATION_get_value_string (cfg,
1243                                              "DATASTORE", "DATABASE", &name))
1244     {
1245       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1246                   _("No `%s' specified for `%s' in configuration!\n"),
1247                   "DATABASE",
1248                   "DATASTORE");
1249       return NULL;
1250     }
1251   ret = GNUNET_malloc (sizeof(struct DatastorePlugin));
1252   ret->env.cfg = cfg;
1253   ret->env.sched = sched;  
1254   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1255               _("Loading `%s' datastore plugin\n"), name);
1256   GNUNET_asprintf (&libname, "libgnunet_plugin_datastore_%s", name);
1257   ret->short_name = name;
1258   ret->lib_name = libname;
1259   ret->api = GNUNET_PLUGIN_load (libname, &ret->env);
1260   if (ret->api == NULL)
1261     {
1262       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1263                   _("Failed to load datastore plugin for `%s'\n"), name);
1264       GNUNET_free (ret->short_name);
1265       GNUNET_free (libname);
1266       GNUNET_free (ret);
1267       return NULL;
1268     }
1269   return ret;
1270 }
1271
1272
1273 /**
1274  * Function called when the service shuts
1275  * down.  Unloads our datastore plugin.
1276  *
1277  * @param plug plugin to unload
1278  */
1279 static void
1280 unload_plugin (struct DatastorePlugin *plug)
1281 {
1282 #if DEBUG_DATASTORE
1283   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1284               "Datastore service is unloading plugin...\n");
1285 #endif
1286   GNUNET_break (NULL == GNUNET_PLUGIN_unload (plug->lib_name, plug->api));
1287   GNUNET_free (plug->lib_name);
1288   GNUNET_free (plug->short_name);
1289   GNUNET_free (plug);
1290 }
1291
1292
1293 /**
1294  * Final task run after shutdown.  Unloads plugins and disconnects us from
1295  * statistics.
1296  */
1297 static void
1298 unload_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1299 {
1300   unload_plugin (plugin);
1301   plugin = NULL;
1302   if (filter != NULL)
1303     {
1304       GNUNET_CONTAINER_bloomfilter_free (filter);
1305       filter = NULL;
1306     }
1307   GNUNET_ARM_stop_services (cfg, tc->sched, "statistics", NULL);
1308   if (stats != NULL)
1309     {
1310       GNUNET_STATISTICS_destroy (stats, GNUNET_YES);
1311       stats = NULL;
1312     }
1313 }
1314
1315
1316 /**
1317  * Last task run during shutdown.  Disconnects us from
1318  * the transport and core.
1319  */
1320 static void
1321 cleaning_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1322 {
1323   struct TransmitCallbackContext *tcc;
1324
1325   cleaning_done = GNUNET_YES;
1326   while (NULL != (tcc = tcc_head))
1327     {
1328       GNUNET_CONTAINER_DLL_remove (tcc_head,
1329                                    tcc_tail,
1330                                    tcc);
1331       if (tcc->th != NULL)
1332         {
1333           GNUNET_CONNECTION_notify_transmit_ready_cancel (tcc->th);
1334           GNUNET_SERVER_client_drop (tcc->client);
1335         }
1336    if (NULL != tcc->tc)
1337         tcc->tc (tcc->tc_cls, GNUNET_SYSERR);
1338       GNUNET_free (tcc->msg);
1339       GNUNET_free (tcc);
1340     }
1341   if (expired_kill_task != GNUNET_SCHEDULER_NO_TASK)
1342     {
1343       GNUNET_SCHEDULER_cancel (sched,
1344                                expired_kill_task);
1345       expired_kill_task = GNUNET_SCHEDULER_NO_TASK;
1346     }
1347   GNUNET_SCHEDULER_add_continuation (sched,
1348                                      &unload_task,
1349                                      NULL,
1350                                      GNUNET_SCHEDULER_REASON_PREREQ_DONE);
1351 }
1352
1353
1354 /**
1355  * Function that removes all active reservations made
1356  * by the given client and releases the space for other
1357  * requests.
1358  *
1359  * @param cls closure
1360  * @param client identification of the client
1361  */
1362 static void
1363 cleanup_reservations (void *cls,
1364                       struct GNUNET_SERVER_Client
1365                       * client)
1366 {
1367   struct ReservationList *pos;
1368   struct ReservationList *prev;
1369   struct ReservationList *next;
1370
1371   if (client == NULL)
1372     return;
1373   prev = NULL;
1374   pos = reservations;
1375   while (NULL != pos)
1376     {
1377       next = pos->next;
1378       if (pos->client == client)
1379         {
1380           if (prev == NULL)
1381             reservations = next;
1382           else
1383             prev->next = next;
1384           reserved -= pos->amount + pos->entries * GNUNET_DATASTORE_ENTRY_OVERHEAD;
1385           GNUNET_free (pos);
1386         }
1387       else
1388         {
1389           prev = pos;
1390         }
1391       pos = next;
1392     }
1393 }
1394
1395
1396 /**
1397  * Process datastore requests.
1398  *
1399  * @param cls closure
1400  * @param s scheduler to use
1401  * @param server the initialized server
1402  * @param c configuration to use
1403  */
1404 static void
1405 run (void *cls,
1406      struct GNUNET_SCHEDULER_Handle *s,
1407      struct GNUNET_SERVER_Handle *server,
1408      const struct GNUNET_CONFIGURATION_Handle *c)
1409 {
1410   char *fn;
1411   unsigned int bf_size;
1412
1413   sched = s;
1414   cfg = c;
1415   if (GNUNET_OK !=
1416       GNUNET_CONFIGURATION_get_value_number (cfg,
1417                                              "DATASTORE", "QUOTA", &quota))
1418     {
1419       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1420                   _("No `%s' specified for `%s' in configuration!\n"),
1421                   "QUOTA",
1422                   "DATASTORE");
1423       return;
1424     }
1425   stats = GNUNET_STATISTICS_create (sched, "datastore", cfg);
1426   cache_size = quota / 8; /* Or should we make this an option? */
1427   bf_size = quota / 32; /* 8 bit per entry, 1 bit per 32 kb in DB */
1428   fn = NULL;
1429   if ( (GNUNET_OK !=
1430         GNUNET_CONFIGURATION_get_value_filename (cfg,
1431                                                  "DATASTORE",
1432                                                  "BLOOMFILTER",
1433                                                  &fn)) ||
1434        (GNUNET_OK !=
1435         GNUNET_DISK_directory_create_for_file (fn)) )
1436     {
1437       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1438                   _("Could not use specified filename `%s' for bloomfilter.\n"),
1439                   fn != NULL ? fn : "");
1440       GNUNET_free_non_null (fn);
1441       fn = NULL;
1442     }
1443   filter = GNUNET_CONTAINER_bloomfilter_load (fn, bf_size, 5);  /* approx. 3% false positives at max use */  
1444   GNUNET_free_non_null (fn);
1445   if (filter == NULL)
1446     {
1447       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1448                   _("Failed to initialize bloomfilter.\n"));
1449       if (stats != NULL)
1450         {
1451           GNUNET_STATISTICS_destroy (stats, GNUNET_YES);
1452           stats = NULL;
1453         }
1454       return;
1455     }
1456   GNUNET_ARM_start_services (cfg, sched, "statistics", NULL);
1457   plugin = load_plugin ();
1458   if (NULL == plugin)
1459     {
1460       GNUNET_CONTAINER_bloomfilter_free (filter);
1461       filter = NULL;
1462       GNUNET_ARM_stop_services (cfg, sched, "statistics", NULL);
1463       if (stats != NULL)
1464         {
1465           GNUNET_STATISTICS_destroy (stats, GNUNET_YES);
1466           stats = NULL;
1467         }
1468       return;
1469     }
1470   GNUNET_SERVER_disconnect_notify (server, &cleanup_reservations, NULL);
1471   GNUNET_SERVER_add_handlers (server, handlers);
1472   expired_kill_task
1473     = GNUNET_SCHEDULER_add_with_priority (sched,
1474                                           GNUNET_SCHEDULER_PRIORITY_IDLE,
1475                                           &delete_expired, NULL);
1476   GNUNET_SCHEDULER_add_delayed (sched,
1477                                 GNUNET_TIME_UNIT_FOREVER_REL,
1478                                 &cleaning_task, NULL);
1479   
1480 }
1481
1482
1483 /**
1484  * The main function for the datastore service.
1485  *
1486  * @param argc number of arguments from the command line
1487  * @param argv command line arguments
1488  * @return 0 ok, 1 on error
1489  */
1490 int
1491 main (int argc, char *const *argv)
1492 {
1493   int ret;
1494
1495   ret = (GNUNET_OK ==
1496          GNUNET_SERVICE_run (argc,
1497                              argv,
1498                              "datastore",
1499                              GNUNET_SERVICE_OPTION_NONE,
1500                              &run, NULL)) ? 0 : 1;
1501   return ret;
1502 }
1503
1504
1505 /* end of gnunet-service-datastore.c */