fix quota calculations
[oweals/gnunet.git] / src / datastore / gnunet-service-datastore.c
1 /*
2      This file is part of GNUnet
3      (C) 2004, 2005, 2006, 2007, 2009 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 2, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file datastore/gnunet-service-datastore.c
23  * @brief Management for the datastore for files stored on a GNUnet node
24  * @author Christian Grothoff
25  */
26
27 #include "platform.h"
28 #include "gnunet_util_lib.h"
29 #include "gnunet_protocols.h"
30 #include "gnunet_statistics_service.h"
31 #include "plugin_datastore.h"
32 #include "datastore.h"
33
34 /**
35  * How many messages do we queue at most per client?
36  */
37 #define MAX_PENDING 1024
38
39 /**
40  * How long are we at most keeping "expired" content
41  * past the expiration date in the database?
42  */
43 #define MAX_EXPIRE_DELAY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 15)
44
45
46
47 /**
48  * Our datastore plugin.
49  */
50 struct DatastorePlugin
51 {
52
53   /**
54    * API of the transport as returned by the plugin's
55    * initialization function.
56    */
57   struct GNUNET_DATASTORE_PluginFunctions *api;
58
59   /**
60    * Short name for the plugin (i.e. "sqlite").
61    */
62   char *short_name;
63
64   /**
65    * Name of the library (i.e. "gnunet_plugin_datastore_sqlite").
66    */
67   char *lib_name;
68
69   /**
70    * Environment this transport service is using
71    * for this plugin.
72    */
73   struct GNUNET_DATASTORE_PluginEnvironment env;
74
75 };
76
77
78 /**
79  * Linked list of active reservations.
80  */
81 struct ReservationList 
82 {
83
84   /**
85    * This is a linked list.
86    */
87   struct ReservationList *next;
88
89   /**
90    * Client that made the reservation.
91    */
92   struct GNUNET_SERVER_Client *client;
93
94   /**
95    * Number of bytes (still) reserved.
96    */
97   uint64_t amount;
98
99   /**
100    * Number of items (still) reserved.
101    */
102   uint64_t entries;
103
104   /**
105    * Reservation identifier.
106    */
107   int32_t rid;
108
109 };
110
111
112 /**
113  * Our datastore plugin (NULL if not available).
114  */
115 static struct DatastorePlugin *plugin;
116
117 /**
118  * Linked list of space reservations made by clients.
119  */
120 static struct ReservationList *reservations;
121
122 /**
123  * Bloomfilter to quickly tell if we don't have the content.
124  */
125 static struct GNUNET_CONTAINER_BloomFilter *filter;
126
127 /**
128  * Static counter to produce reservation identifiers.
129  */
130 static int reservation_gen;
131
132 /**
133  * How much space are we allowed to use?
134  */
135 static unsigned long long quota;
136
137 /**
138  * How much space are we using for the cache?  (space available for
139  * insertions that will be instantly reclaimed by discarding less
140  * important content --- or possibly whatever we just inserted into
141  * the "cache").
142  */
143 static unsigned long long cache_size;
144
145 /**
146  * How much space have we currently reserved?
147  */
148 static unsigned long long reserved;
149
150 /**
151  * Identity of the task that is used to delete
152  * expired content.
153  */
154 static GNUNET_SCHEDULER_TaskIdentifier expired_kill_task;
155
156 /**
157  * Our configuration.
158  */
159 const struct GNUNET_CONFIGURATION_Handle *cfg;
160
161 /**
162  * Our scheduler.
163  */
164 struct GNUNET_SCHEDULER_Handle *sched; 
165
166 /**
167  * Handle for reporting statistics.
168  */
169 static struct GNUNET_STATISTICS_Handle *stats;
170
171
172 /**
173  * Function called once the transmit operation has
174  * either failed or succeeded.
175  *
176  * @param cls closure
177  * @param status GNUNET_OK on success, GNUNET_SYSERR on error
178  */
179 typedef void (*TransmitContinuation)(void *cls,
180                                      int status);
181
182
183 /**
184  * Context for transmitting replies to clients.
185  */
186 struct TransmitCallbackContext 
187 {
188   
189   /**
190    * We keep these in a doubly-linked list (for cleanup).
191    */
192   struct TransmitCallbackContext *next;
193   
194   /**
195    * We keep these in a doubly-linked list (for cleanup).
196    */
197   struct TransmitCallbackContext *prev;
198   
199   /**
200    * The message that we're asked to transmit.
201    */
202   struct GNUNET_MessageHeader *msg;
203   
204   /**
205    * Handle for the transmission request.
206    */
207   struct GNUNET_CONNECTION_TransmitHandle *th;
208
209   /**
210    * Client that we are transmitting to.
211    */
212   struct GNUNET_SERVER_Client *client;
213
214   /**
215    * Function to call once msg has been transmitted
216    * (or at least added to the buffer).
217    */
218   TransmitContinuation tc;
219
220   /**
221    * Closure for tc.
222    */
223   void *tc_cls;
224
225   /**
226    * GNUNET_YES if we are supposed to signal the server
227    * completion of the client's request.
228    */
229   int end;
230 };
231
232   
233 /**
234  * Head of the doubly-linked list (for cleanup).
235  */
236 static struct TransmitCallbackContext *tcc_head;
237
238 /**
239  * Tail of the doubly-linked list (for cleanup).
240  */
241 static struct TransmitCallbackContext *tcc_tail;
242
243 /**
244  * Have we already clean ed up the TCCs and are hence no longer
245  * willing (or able) to transmit anything to anyone?
246  */
247 static int cleaning_done;
248
249 /**
250  * Task that is used to remove expired entries from
251  * the datastore.  This task will schedule itself
252  * again automatically to always delete all expired
253  * content quickly.
254  *
255  * @param cls not used
256  * @param tc task context
257  */ 
258 static void
259 delete_expired (void *cls,
260                 const struct GNUNET_SCHEDULER_TaskContext *tc);
261
262
263 /**
264  * Iterate over the expired items stored in the datastore.
265  * Delete all expired items; once we have processed all
266  * expired items, re-schedule the "delete_expired" task.
267  *
268  * @param cls not used
269  * @param next_cls closure to pass to the "next" function.
270  * @param key key for the content
271  * @param size number of bytes in data
272  * @param data content stored
273  * @param type type of the content
274  * @param priority priority of the content
275  * @param anonymity anonymity-level for the content
276  * @param expiration expiration time for the content
277  * @param uid unique identifier for the datum;
278  *        maybe 0 if no unique identifier is available
279  *
280  * @return GNUNET_SYSERR to abort the iteration, GNUNET_OK to continue
281  *         (continue on call to "next", of course),
282  *         GNUNET_NO to delete the item and continue (if supported)
283  */
284 static int 
285 expired_processor (void *cls,
286                    void *next_cls,
287                    const GNUNET_HashCode * key,
288                    uint32_t size,
289                    const void *data,
290                    enum GNUNET_BLOCK_Type type,
291                    uint32_t priority,
292                    uint32_t anonymity,
293                    struct GNUNET_TIME_Absolute
294                    expiration, 
295                    uint64_t uid)
296 {
297   struct GNUNET_TIME_Absolute now;
298
299   if (key == NULL) 
300     {
301       expired_kill_task 
302         = GNUNET_SCHEDULER_add_delayed (sched,
303                                         MAX_EXPIRE_DELAY,
304                                         &delete_expired,
305                                         NULL);
306       return GNUNET_SYSERR;
307     }
308   now = GNUNET_TIME_absolute_get ();
309   if (expiration.value > now.value)
310     {
311       /* finished processing */
312       plugin->api->next_request (next_cls, GNUNET_YES);
313       return GNUNET_SYSERR;
314     }
315   plugin->api->next_request (next_cls, GNUNET_NO);
316 #if DEBUG_DATASTORE
317   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
318               "Deleting content that expired %llu ms ago\n",
319               (unsigned long long) (now.value - expiration.value));
320 #endif
321   GNUNET_STATISTICS_update (stats,
322                             gettext_noop ("# bytes expired"),
323                             size,
324                             GNUNET_YES);
325   GNUNET_CONTAINER_bloomfilter_remove (filter,
326                                        key);
327   return GNUNET_NO; /* delete */
328 }
329
330
331 /**
332  * Task that is used to remove expired entries from
333  * the datastore.  This task will schedule itself
334  * again automatically to always delete all expired
335  * content quickly.
336  *
337  * @param cls not used
338  * @param tc task context
339  */ 
340 static void
341 delete_expired (void *cls,
342                 const struct GNUNET_SCHEDULER_TaskContext *tc)
343 {
344   expired_kill_task = GNUNET_SCHEDULER_NO_TASK;
345   plugin->api->iter_ascending_expiration (plugin->api->cls, 
346                                           0,
347                                           &expired_processor,
348                                           NULL);
349 }
350
351
352 /**
353  * An iterator over a set of items stored in the datastore.
354  *
355  * @param cls closure
356  * @param next_cls closure to pass to the "next" function.
357  * @param key key for the content
358  * @param size number of bytes in data
359  * @param data content stored
360  * @param type type of the content
361  * @param priority priority of the content
362  * @param anonymity anonymity-level for the content
363  * @param expiration expiration time for the content
364  * @param uid unique identifier for the datum;
365  *        maybe 0 if no unique identifier is available
366  *
367  * @return GNUNET_SYSERR to abort the iteration, GNUNET_OK to continue
368  *         (continue on call to "next", of course),
369  *         GNUNET_NO to delete the item and continue (if supported)
370  */
371 static int 
372 manage (void *cls,
373         void *next_cls,
374         const GNUNET_HashCode * key,
375         uint32_t size,
376         const void *data,
377         enum GNUNET_BLOCK_Type type,
378         uint32_t priority,
379         uint32_t anonymity,
380         struct GNUNET_TIME_Absolute
381         expiration, 
382         uint64_t uid)
383 {
384   unsigned long long *need = cls;
385
386   if (NULL == key)
387     {
388       GNUNET_free (need);
389       return GNUNET_SYSERR;
390     }
391   if (size + GNUNET_DATASTORE_ENTRY_OVERHEAD > *need)
392     *need = 0;
393   else
394     *need -= size + GNUNET_DATASTORE_ENTRY_OVERHEAD;
395   plugin->api->next_request (next_cls, 
396                              (0 == *need) ? GNUNET_YES : GNUNET_NO);
397 #if DEBUG_DATASTORE
398   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
399               "Deleting %llu bytes of low-priority content (still trying to free another %llu bytes)\n",
400               size + GNUNET_DATASTORE_ENTRY_OVERHEAD,
401               *need);
402 #endif
403   GNUNET_STATISTICS_update (stats,
404                             gettext_noop ("# bytes purged (low-priority)"),
405                             size,
406                             GNUNET_YES);
407   GNUNET_CONTAINER_bloomfilter_remove (filter,
408                                        key);
409   return GNUNET_NO;
410 }
411
412
413 /**
414  * Manage available disk space by running tasks
415  * that will discard content if necessary.  This
416  * function will be run whenever a request for
417  * "need" bytes of storage could only be satisfied
418  * by eating into the "cache" (and we want our cache
419  * space back).
420  *
421  * @param need number of bytes of content that were
422  *        placed into the "cache" (and hence the
423  *        number of bytes that should be removed).
424  */
425 static void
426 manage_space (unsigned long long need)
427 {
428   unsigned long long *n;
429
430 #if DEBUG_DATASTORE
431   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
432               "Asked to free up %llu bytes of cache space\n",
433               need);
434 #endif
435   n = GNUNET_malloc (sizeof(unsigned long long));
436   *n = need;
437   plugin->api->iter_low_priority (plugin->api->cls,
438                                   0,
439                                   &manage,
440                                   n);
441 }
442
443
444 /**
445  * Function called to notify a client about the socket
446  * begin ready to queue more data.  "buf" will be
447  * NULL and "size" zero if the socket was closed for
448  * writing in the meantime.
449  *
450  * @param cls closure
451  * @param size number of bytes available in buf
452  * @param buf where the callee should write the message
453  * @return number of bytes written to buf
454  */
455 static size_t
456 transmit_callback (void *cls,
457                    size_t size, void *buf)
458 {
459   struct TransmitCallbackContext *tcc = cls;
460   size_t msize;
461   
462   tcc->th = NULL;
463   GNUNET_CONTAINER_DLL_remove (tcc_head,
464                                tcc_tail,
465                                tcc);
466   msize = ntohs(tcc->msg->size);
467   if (size == 0)
468     {
469 #if DEBUG_DATASTORE
470       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
471                   "Transmission failed.\n");
472 #endif
473       if (tcc->tc != NULL)
474         tcc->tc (tcc->tc_cls, GNUNET_SYSERR);
475       if (GNUNET_YES == tcc->end)
476         GNUNET_SERVER_receive_done (tcc->client, GNUNET_SYSERR);       
477       GNUNET_SERVER_client_drop (tcc->client);
478       GNUNET_free (tcc->msg);
479       GNUNET_free (tcc);
480       return 0;
481     }
482   GNUNET_assert (size >= msize);
483   memcpy (buf, tcc->msg, msize);
484   if (tcc->tc != NULL)
485     tcc->tc (tcc->tc_cls, GNUNET_OK);
486   if (GNUNET_YES == tcc->end)
487     {
488       GNUNET_SERVER_receive_done (tcc->client, GNUNET_OK);
489     }
490   else
491     {
492 #if DEBUG_DATASTORE
493       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
494                   "Response transmitted, more pending!\n");
495 #endif
496     }
497   GNUNET_SERVER_client_drop (tcc->client);
498   GNUNET_free (tcc->msg);
499   GNUNET_free (tcc);
500   return msize;
501 }
502
503
504 /**
505  * Transmit the given message to the client.
506  *
507  * @param client target of the message
508  * @param msg message to transmit, will be freed!
509  * @param tc function to call afterwards
510  * @param tc_cls closure for tc
511  * @param end is this the last response (and we should
512  *        signal the server completion accodingly after
513  *        transmitting this message)?
514  */
515 static void
516 transmit (struct GNUNET_SERVER_Client *client,
517           struct GNUNET_MessageHeader *msg,
518           TransmitContinuation tc,
519           void *tc_cls,
520           int end)
521 {
522   struct TransmitCallbackContext *tcc;
523
524   if (GNUNET_YES == cleaning_done)
525     {
526       if (NULL != tc)
527         tc (tc_cls, GNUNET_SYSERR);
528       return;
529     }
530   tcc = GNUNET_malloc (sizeof(struct TransmitCallbackContext));
531   tcc->msg = msg;
532   tcc->client = client;
533   tcc->tc = tc;
534   tcc->tc_cls = tc_cls;
535   tcc->end = end;
536   if (NULL ==
537       (tcc->th = GNUNET_SERVER_notify_transmit_ready (client,
538                                                       ntohs(msg->size),
539                                                       GNUNET_TIME_UNIT_FOREVER_REL,
540                                                       &transmit_callback,
541                                                       tcc)))
542     {
543       GNUNET_break (0);
544       if (GNUNET_YES == end)
545         {
546 #if DEBUG_DATASTORE
547           GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
548                       "Disconnecting client.\n");
549 #endif    
550           GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
551         }
552       if (NULL != tc)
553         tc (tc_cls, GNUNET_SYSERR);
554       GNUNET_free (msg);
555       GNUNET_free (tcc);
556       return;
557     }
558   GNUNET_SERVER_client_keep (client);
559   GNUNET_CONTAINER_DLL_insert (tcc_head,
560                                tcc_tail,
561                                tcc);
562 }
563
564
565 /**
566  * Transmit a status code to the client.
567  *
568  * @param client receiver of the response
569  * @param code status code
570  * @param msg optional error message (can be NULL)
571  */
572 static void
573 transmit_status (struct GNUNET_SERVER_Client *client,
574                  int code,
575                  const char *msg)
576 {
577   struct StatusMessage *sm;
578   size_t slen;
579
580 #if DEBUG_DATASTORE
581   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
582               "Transmitting `%s' message with value %d and message `%s'\n",
583               "STATUS",
584               code,
585               msg != NULL ? msg : "(none)");
586 #endif
587   slen = (msg == NULL) ? 0 : strlen(msg) + 1;  
588   sm = GNUNET_malloc (sizeof(struct StatusMessage) + slen);
589   sm->header.size = htons(sizeof(struct StatusMessage) + slen);
590   sm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_STATUS);
591   sm->status = htonl(code);
592   if (slen > 0)
593     memcpy (&sm[1], msg, slen);  
594   transmit (client, &sm->header, NULL, NULL, GNUNET_YES);
595 }
596
597
598 /**
599  * Function called once the transmit operation has
600  * either failed or succeeded.
601  *
602  * @param next_cls closure for calling "next_request" callback
603  * @param status GNUNET_OK on success, GNUNET_SYSERR on error
604  */
605 static void 
606 get_next(void *next_cls,
607          int status)
608 {
609   if (status != GNUNET_OK)
610     {
611       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
612                   _("Failed to transmit an item to the client; aborting iteration.\n"));
613       if (plugin != NULL)
614         plugin->api->next_request (next_cls, GNUNET_YES);
615       return;
616     }
617   plugin->api->next_request (next_cls, GNUNET_NO);
618 }
619
620
621 /**
622  * Function that will transmit the given datastore entry
623  * to the client.
624  *
625  * @param cls closure, pointer to the client (of type GNUNET_SERVER_Client).
626  * @param next_cls closure to use to ask for the next item
627  * @param key key for the content
628  * @param size number of bytes in data
629  * @param data content stored
630  * @param type type of the content
631  * @param priority priority of the content
632  * @param anonymity anonymity-level for the content
633  * @param expiration expiration time for the content
634  * @param uid unique identifier for the datum;
635  *        maybe 0 if no unique identifier is available
636  *
637  * @return GNUNET_SYSERR to abort the iteration, GNUNET_OK to continue,
638  *         GNUNET_NO to delete the item and continue (if supported)
639  */
640 static int
641 transmit_item (void *cls,
642                void *next_cls,
643                const GNUNET_HashCode * key,
644                uint32_t size,
645                const void *data,
646                enum GNUNET_BLOCK_Type type,
647                uint32_t priority,
648                uint32_t anonymity,
649                struct GNUNET_TIME_Absolute
650                expiration, uint64_t uid)
651 {
652   struct GNUNET_SERVER_Client *client = cls;
653   struct GNUNET_MessageHeader *end;
654   struct DataMessage *dm;
655
656   if (key == NULL)
657     {
658       /* transmit 'DATA_END' */
659 #if DEBUG_DATASTORE
660       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
661                   "Transmitting `%s' message\n",
662                   "DATA_END");
663 #endif
664       end = GNUNET_malloc (sizeof(struct GNUNET_MessageHeader));
665       end->size = htons(sizeof(struct GNUNET_MessageHeader));
666       end->type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END);
667       transmit (client, end, NULL, NULL, GNUNET_YES);
668       GNUNET_SERVER_client_drop (client);
669       return GNUNET_OK;
670     }
671   dm = GNUNET_malloc (sizeof(struct DataMessage) + size);
672   dm->header.size = htons(sizeof(struct DataMessage) + size);
673   dm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_DATA);
674   dm->rid = htonl(0);
675   dm->size = htonl(size);
676   dm->type = htonl(type);
677   dm->priority = htonl(priority);
678   dm->anonymity = htonl(anonymity);
679   dm->expiration = GNUNET_TIME_absolute_hton(expiration);
680   dm->uid = GNUNET_htonll(uid);
681   dm->key = *key;
682   memcpy (&dm[1], data, size);
683 #if DEBUG_DATASTORE
684   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
685               "Transmitting `%s' message\n",
686               "DATA");
687 #endif
688   GNUNET_STATISTICS_update (stats,
689                             gettext_noop ("# results found"),
690                             1,
691                             GNUNET_NO);
692   transmit (client, &dm->header, &get_next, next_cls, GNUNET_NO);
693   return GNUNET_OK;
694 }
695
696
697 /**
698  * Handle RESERVE-message.
699  *
700  * @param cls closure
701  * @param client identification of the client
702  * @param message the actual message
703  */
704 static void
705 handle_reserve (void *cls,
706                 struct GNUNET_SERVER_Client *client,
707                 const struct GNUNET_MessageHeader *message)
708 {
709   const struct ReserveMessage *msg = (const struct ReserveMessage*) message;
710   struct ReservationList *e;
711   unsigned long long used;
712   unsigned long long req;
713   uint64_t amount;
714   uint32_t entries;
715
716 #if DEBUG_DATASTORE
717   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
718               "Processing `%s' request\n",
719               "RESERVE");
720 #endif
721   amount = GNUNET_ntohll(msg->amount);
722   entries = ntohl(msg->entries);
723   used = plugin->api->get_size (plugin->api->cls) + reserved;
724   req = amount + ((unsigned long long) GNUNET_DATASTORE_ENTRY_OVERHEAD) * entries;
725   if (used + req > quota)
726     {
727       if (quota < used)
728         used = quota; /* cheat a bit for error message (to avoid negative numbers) */
729       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
730                   _("Insufficient space (%llu bytes are available) to satisfy `%s' request for %llu bytes\n"),
731                   quota - used,
732                   "RESERVE",
733                   req);
734       if (cache_size < req)
735         {
736           /* TODO: document this in the FAQ; essentially, if this
737              message happens, the insertion request could be blocked
738              by less-important content from migration because it is
739              larger than 1/8th of the overall available space, and
740              we only reserve 1/8th for "fresh" insertions */
741           GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
742                       _("The requested amount (%llu bytes) is larger than the cache size (%llu bytes)\n"),
743                       req,
744                       cache_size);
745           transmit_status (client, 0, 
746                            gettext_noop ("Insufficient space to satisfy request and "
747                                          "requested amount is larger than cache size"));
748         }
749       else
750         {
751           transmit_status (client, 0, 
752                            gettext_noop ("Insufficient space to satisfy request"));
753         }
754       return;      
755     }
756   reserved += req;
757   e = GNUNET_malloc (sizeof(struct ReservationList));
758   e->next = reservations;
759   reservations = e;
760   e->client = client;
761   e->amount = amount;
762   e->entries = entries;
763   e->rid = ++reservation_gen;
764   if (reservation_gen < 0)
765     reservation_gen = 0; /* wrap around */
766   transmit_status (client, e->rid, NULL);
767 }
768
769
770 /**
771  * Handle RELEASE_RESERVE-message.
772  *
773  * @param cls closure
774  * @param client identification of the client
775  * @param message the actual message
776  */
777 static void
778 handle_release_reserve (void *cls,
779                         struct GNUNET_SERVER_Client *client,
780                         const struct GNUNET_MessageHeader *message)
781 {
782   const struct ReleaseReserveMessage *msg = (const struct ReleaseReserveMessage*) message;
783   struct ReservationList *pos;
784   struct ReservationList *prev;
785   struct ReservationList *next;
786   int rid = ntohl(msg->rid);
787   unsigned long long rem;
788
789 #if DEBUG_DATASTORE
790   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
791               "Processing `%s' request\n",
792               "RELEASE_RESERVE");
793 #endif
794   next = reservations;
795   prev = NULL;
796   while (NULL != (pos = next))
797     {
798       next = pos->next;
799       if (rid == pos->rid)
800         {
801           if (prev == NULL)
802             reservations = next;
803           else
804             prev->next = next;
805           rem = pos->amount + ((unsigned long long) GNUNET_DATASTORE_ENTRY_OVERHEAD) * pos->entries;
806           GNUNET_assert (reserved >= rem);
807           reserved -= rem;
808 #if DEBUG_DATASTORE
809           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
810                       "Returning %llu remaining reserved bytes to storage pool\n",
811                       rem);
812 #endif    
813           GNUNET_free (pos);
814           transmit_status (client, GNUNET_OK, NULL);
815           return;
816         }       
817       prev = pos;
818     }
819   GNUNET_break (0);
820   transmit_status (client, GNUNET_SYSERR, gettext_noop ("Could not find matching reservation"));
821 }
822
823
824 /**
825  * Check that the given message is a valid data message.
826  *
827  * @return NULL if the message is not well-formed, otherwise the message
828  */
829 static const struct DataMessage *
830 check_data (const struct GNUNET_MessageHeader *message)
831 {
832   uint16_t size;
833   uint32_t dsize;
834   const struct DataMessage *dm;
835
836   size = ntohs(message->size);
837   if (size < sizeof(struct DataMessage))
838     { 
839       GNUNET_break (0);
840       return NULL;
841     }
842   dm = (const struct DataMessage *) message;
843   dsize = ntohl(dm->size);
844   if (size != dsize + sizeof(struct DataMessage))
845     {
846       GNUNET_break (0);
847       return NULL;
848     }
849   return dm;
850 }
851
852
853 /**
854  * Handle PUT-message.
855  *
856  * @param cls closure
857  * @param client identification of the client
858  * @param message the actual message
859  */
860 static void
861 handle_put (void *cls,
862             struct GNUNET_SERVER_Client *client,
863             const struct GNUNET_MessageHeader *message)
864 {
865   const struct DataMessage *dm = check_data (message);
866   char *msg;
867   int ret;
868   int rid;
869   struct ReservationList *pos;
870   uint32_t size;
871
872   if ( (dm == NULL) ||
873        (ntohl(dm->type) == 0) ) 
874     {
875       GNUNET_break (0);
876       GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
877       return;
878     }
879 #if DEBUG_DATASTORE
880   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
881               "Processing `%s' request for `%s'\n",
882               "PUT",
883               GNUNET_h2s (&dm->key));
884 #endif
885   rid = ntohl(dm->rid);
886   size = ntohl(dm->size);
887   if (rid > 0)
888     {
889       pos = reservations;
890       while ( (NULL != pos) &&
891               (rid != pos->rid) )
892         pos = pos->next;
893       GNUNET_break (pos != NULL);
894       if (NULL != pos)
895         {
896           GNUNET_break (pos->entries > 0);
897           GNUNET_break (pos->amount > size);
898           pos->entries--;
899           pos->amount -= size;
900           reserved -= (size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
901         }
902     }
903   msg = NULL;
904   ret = plugin->api->put (plugin->api->cls,
905                           &dm->key,
906                           size,
907                           &dm[1],
908                           ntohl(dm->type),
909                           ntohl(dm->priority),
910                           ntohl(dm->anonymity),
911                           GNUNET_TIME_absolute_ntoh(dm->expiration),
912                           &msg);
913   if (GNUNET_OK == ret)
914     {
915       GNUNET_STATISTICS_update (stats,
916                                 gettext_noop ("# bytes stored"),
917                                 size,
918                                 GNUNET_YES);
919       GNUNET_CONTAINER_bloomfilter_add (filter,
920                                         &dm->key);
921 #if DEBUG_DATASTORE
922       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
923                   "Successfully stored %u bytes under key `%s'\n",
924                   size,
925                   GNUNET_h2s (&dm->key));
926 #endif
927     }
928   transmit_status (client, 
929                    (GNUNET_SYSERR == ret) ? GNUNET_SYSERR : GNUNET_OK, 
930                    msg);
931   GNUNET_free_non_null (msg);
932   if (quota - reserved - cache_size < plugin->api->get_size (plugin->api->cls))
933     manage_space (size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
934 }
935
936
937 /**
938  * Handle GET-message.
939  *
940  * @param cls closure
941  * @param client identification of the client
942  * @param message the actual message
943  */
944 static void
945 handle_get (void *cls,
946             struct GNUNET_SERVER_Client *client,
947             const struct GNUNET_MessageHeader *message)
948 {
949   const struct GetMessage *msg;
950   uint16_t size;
951
952   size = ntohs(message->size);
953   if ( (size != sizeof(struct GetMessage)) &&
954        (size != sizeof(struct GetMessage) - sizeof(GNUNET_HashCode)) )
955     {
956       GNUNET_break (0);
957       GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
958       return;
959     }
960   msg = (const struct GetMessage*) message;
961 #if DEBUG_DATASTORE
962   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
963               "Processing `%s' request for `%s' of type %u\n",
964               "GET",
965               GNUNET_h2s (&msg->key),
966               ntohl (msg->type));
967 #endif
968   GNUNET_STATISTICS_update (stats,
969                             gettext_noop ("# GET requests received"),
970                             1,
971                             GNUNET_NO);
972   GNUNET_SERVER_client_keep (client);
973   if ( (size == sizeof(struct GetMessage)) &&
974        (GNUNET_YES != GNUNET_CONTAINER_bloomfilter_test (filter,
975                                                          &msg->key)) )
976     {
977       /* don't bother database... */
978 #if DEBUG_DATASTORE
979       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
980                   "Empty result set for `%s' request for `%s'.\n",
981                   "GET",
982                   GNUNET_h2s (&msg->key));
983 #endif  
984       GNUNET_STATISTICS_update (stats,
985                                 gettext_noop ("# requests filtered by bloomfilter"),
986                                 1,
987                                 GNUNET_NO);
988       transmit_item (client,
989                      NULL, NULL, 0, NULL, 0, 0, 0, 
990                      GNUNET_TIME_UNIT_ZERO_ABS, 0);
991       return;
992     }
993   plugin->api->get (plugin->api->cls,
994                     ((size == sizeof(struct GetMessage)) ? &msg->key : NULL),
995                     NULL,
996                     ntohl(msg->type),
997                     &transmit_item,
998                     client);    
999 }
1000
1001
1002 /**
1003  * Handle UPDATE-message.
1004  *
1005  * @param cls closure
1006  * @param client identification of the client
1007  * @param message the actual message
1008  */
1009 static void
1010 handle_update (void *cls,
1011                struct GNUNET_SERVER_Client *client,
1012                const struct GNUNET_MessageHeader *message)
1013 {
1014   const struct UpdateMessage *msg;
1015   int ret;
1016   char *emsg;
1017
1018   GNUNET_STATISTICS_update (stats,
1019                             gettext_noop ("# UPDATE requests received"),
1020                             1,
1021                             GNUNET_NO);
1022   msg = (const struct UpdateMessage*) message;
1023   emsg = NULL;
1024 #if DEBUG_DATASTORE
1025   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1026               "Processing `%s' request for %llu\n",
1027               "UPDATE",
1028               (unsigned long long) GNUNET_ntohll (msg->uid));
1029 #endif
1030   ret = plugin->api->update (plugin->api->cls,
1031                              GNUNET_ntohll(msg->uid),
1032                              (int32_t) ntohl(msg->priority),
1033                              GNUNET_TIME_absolute_ntoh(msg->expiration),
1034                              &emsg);
1035   transmit_status (client, ret, emsg);
1036   GNUNET_free_non_null (emsg);
1037 }
1038
1039
1040 /**
1041  * Handle GET_RANDOM-message.
1042  *
1043  * @param cls closure
1044  * @param client identification of the client
1045  * @param message the actual message
1046  */
1047 static void
1048 handle_get_random (void *cls,
1049                    struct GNUNET_SERVER_Client *client,
1050                    const struct GNUNET_MessageHeader *message)
1051 {
1052 #if DEBUG_DATASTORE
1053   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1054               "Processing `%s' request\n",
1055               "GET_RANDOM");
1056 #endif
1057   GNUNET_STATISTICS_update (stats,
1058                             gettext_noop ("# GET RANDOM requests received"),
1059                             1,
1060                             GNUNET_NO);
1061   GNUNET_SERVER_client_keep (client);
1062   plugin->api->iter_migration_order (plugin->api->cls,
1063                                      0,
1064                                      &transmit_item,
1065                                      client);  
1066 }
1067
1068
1069 /**
1070  * Context for the 'remove_callback'.
1071  */
1072 struct RemoveContext 
1073 {
1074   /**
1075    * Client for whom we're doing the remvoing.
1076    */
1077   struct GNUNET_SERVER_Client *client;
1078
1079   /**
1080    * GNUNET_YES if we managed to remove something.
1081    */
1082   int found;
1083 };
1084
1085
1086 /**
1087  * Callback function that will cause the item that is passed
1088  * in to be deleted (by returning GNUNET_NO).
1089  */
1090 static int
1091 remove_callback (void *cls,
1092                  void *next_cls,
1093                  const GNUNET_HashCode * key,
1094                  uint32_t size,
1095                  const void *data,
1096                  enum GNUNET_BLOCK_Type type,
1097                  uint32_t priority,
1098                  uint32_t anonymity,
1099                  struct GNUNET_TIME_Absolute
1100                  expiration, uint64_t uid)
1101 {
1102   struct RemoveContext *rc = cls;
1103
1104   if (key == NULL)
1105     {
1106 #if DEBUG_DATASTORE
1107       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1108                   "No further matches for `%s' request.\n",
1109                   "REMOVE");
1110 #endif  
1111       if (GNUNET_YES == rc->found)
1112         transmit_status (rc->client, GNUNET_OK, NULL);       
1113       else
1114         transmit_status (rc->client, GNUNET_NO, _("Content not found"));        
1115       GNUNET_SERVER_client_drop (rc->client);
1116       GNUNET_free (rc);
1117       return GNUNET_OK; /* last item */
1118     }
1119   rc->found = GNUNET_YES;
1120 #if DEBUG_DATASTORE
1121   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1122               "Item %llu matches `%s' request for key `%s'.\n",
1123               (unsigned long long) uid,
1124               "REMOVE",
1125               GNUNET_h2s (key));
1126 #endif  
1127   GNUNET_STATISTICS_update (stats,
1128                             gettext_noop ("# bytes removed (explicit request)"),
1129                             size,
1130                             GNUNET_YES);
1131   GNUNET_CONTAINER_bloomfilter_remove (filter,
1132                                        key);
1133   plugin->api->next_request (next_cls, GNUNET_YES);
1134   return GNUNET_NO;
1135 }
1136
1137
1138 /**
1139  * Handle REMOVE-message.
1140  *
1141  * @param cls closure
1142  * @param client identification of the client
1143  * @param message the actual message
1144  */
1145 static void
1146 handle_remove (void *cls,
1147              struct GNUNET_SERVER_Client *client,
1148              const struct GNUNET_MessageHeader *message)
1149 {
1150   const struct DataMessage *dm = check_data (message);
1151   GNUNET_HashCode vhash;
1152   struct RemoveContext *rc;
1153
1154   if (dm == NULL)
1155     {
1156       GNUNET_break (0);
1157       GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1158       return;
1159     }
1160 #if DEBUG_DATASTORE
1161   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1162               "Processing `%s' request for `%s'\n",
1163               "REMOVE",
1164               GNUNET_h2s (&dm->key));
1165 #endif
1166   GNUNET_STATISTICS_update (stats,
1167                             gettext_noop ("# REMOVE requests received"),
1168                             1,
1169                             GNUNET_NO);
1170   rc = GNUNET_malloc (sizeof(struct RemoveContext));
1171   GNUNET_SERVER_client_keep (client);
1172   rc->client = client;
1173   GNUNET_CRYPTO_hash (&dm[1],
1174                       ntohl(dm->size),
1175                       &vhash);
1176   plugin->api->get (plugin->api->cls,
1177                     &dm->key,
1178                     &vhash,
1179                     ntohl(dm->type),
1180                     &remove_callback,
1181                     rc);
1182 }
1183
1184
1185 /**
1186  * Handle DROP-message.
1187  *
1188  * @param cls closure
1189  * @param client identification of the client
1190  * @param message the actual message
1191  */
1192 static void
1193 handle_drop (void *cls,
1194              struct GNUNET_SERVER_Client *client,
1195              const struct GNUNET_MessageHeader *message)
1196 {
1197 #if DEBUG_DATASTORE
1198   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1199               "Processing `%s' request\n",
1200               "DROP");
1201 #endif
1202   plugin->api->drop (plugin->api->cls);
1203   GNUNET_SERVER_receive_done (client, GNUNET_OK);
1204 }
1205
1206
1207 /**
1208  * List of handlers for the messages understood by this
1209  * service.
1210  */
1211 static struct GNUNET_SERVER_MessageHandler handlers[] = {
1212   {&handle_reserve, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_RESERVE, 
1213    sizeof(struct ReserveMessage) }, 
1214   {&handle_release_reserve, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_RELEASE_RESERVE, 
1215    sizeof(struct ReleaseReserveMessage) }, 
1216   {&handle_put, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_PUT, 0 }, 
1217   {&handle_update, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_UPDATE, 
1218    sizeof (struct UpdateMessage) }, 
1219   {&handle_get, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_GET, 0 }, 
1220   {&handle_get_random, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_GET_RANDOM, 
1221    sizeof(struct GNUNET_MessageHeader) }, 
1222   {&handle_remove, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE, 0 }, 
1223   {&handle_drop, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_DROP, 
1224    sizeof(struct GNUNET_MessageHeader) }, 
1225   {NULL, NULL, 0, 0}
1226 };
1227
1228
1229
1230 /**
1231  * Load the datastore plugin.
1232  */
1233 static struct DatastorePlugin *
1234 load_plugin () 
1235 {
1236   struct DatastorePlugin *ret;
1237   char *libname;
1238   char *name;
1239
1240   if (GNUNET_OK !=
1241       GNUNET_CONFIGURATION_get_value_string (cfg,
1242                                              "DATASTORE", "DATABASE", &name))
1243     {
1244       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1245                   _("No `%s' specified for `%s' in configuration!\n"),
1246                   "DATABASE",
1247                   "DATASTORE");
1248       return NULL;
1249     }
1250   ret = GNUNET_malloc (sizeof(struct DatastorePlugin));
1251   ret->env.cfg = cfg;
1252   ret->env.sched = sched;  
1253   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1254               _("Loading `%s' datastore plugin\n"), name);
1255   GNUNET_asprintf (&libname, "libgnunet_plugin_datastore_%s", name);
1256   ret->short_name = name;
1257   ret->lib_name = libname;
1258   ret->api = GNUNET_PLUGIN_load (libname, &ret->env);
1259   if (ret->api == NULL)
1260     {
1261       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1262                   _("Failed to load datastore plugin for `%s'\n"), name);
1263       GNUNET_free (ret->short_name);
1264       GNUNET_free (libname);
1265       GNUNET_free (ret);
1266       return NULL;
1267     }
1268   return ret;
1269 }
1270
1271
1272 /**
1273  * Function called when the service shuts
1274  * down.  Unloads our datastore plugin.
1275  *
1276  * @param plug plugin to unload
1277  */
1278 static void
1279 unload_plugin (struct DatastorePlugin *plug)
1280 {
1281 #if DEBUG_DATASTORE
1282   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1283               "Datastore service is unloading plugin...\n");
1284 #endif
1285   GNUNET_break (NULL == GNUNET_PLUGIN_unload (plug->lib_name, plug->api));
1286   GNUNET_free (plug->lib_name);
1287   GNUNET_free (plug->short_name);
1288   GNUNET_free (plug);
1289 }
1290
1291
1292 /**
1293  * Final task run after shutdown.  Unloads plugins and disconnects us from
1294  * statistics.
1295  */
1296 static void
1297 unload_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1298 {
1299   unload_plugin (plugin);
1300   plugin = NULL;
1301   if (filter != NULL)
1302     {
1303       GNUNET_CONTAINER_bloomfilter_free (filter);
1304       filter = NULL;
1305     }
1306   if (stats != NULL)
1307     {
1308       GNUNET_STATISTICS_destroy (stats, GNUNET_YES);
1309       stats = NULL;
1310     }
1311 }
1312
1313
1314 /**
1315  * Last task run during shutdown.  Disconnects us from
1316  * the transport and core.
1317  */
1318 static void
1319 cleaning_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1320 {
1321   struct TransmitCallbackContext *tcc;
1322
1323   cleaning_done = GNUNET_YES;
1324   while (NULL != (tcc = tcc_head))
1325     {
1326       GNUNET_CONTAINER_DLL_remove (tcc_head,
1327                                    tcc_tail,
1328                                    tcc);
1329       if (tcc->th != NULL)
1330         {
1331           GNUNET_CONNECTION_notify_transmit_ready_cancel (tcc->th);
1332           GNUNET_SERVER_client_drop (tcc->client);
1333         }
1334    if (NULL != tcc->tc)
1335         tcc->tc (tcc->tc_cls, GNUNET_SYSERR);
1336       GNUNET_free (tcc->msg);
1337       GNUNET_free (tcc);
1338     }
1339   if (expired_kill_task != GNUNET_SCHEDULER_NO_TASK)
1340     {
1341       GNUNET_SCHEDULER_cancel (sched,
1342                                expired_kill_task);
1343       expired_kill_task = GNUNET_SCHEDULER_NO_TASK;
1344     }
1345   GNUNET_SCHEDULER_add_continuation (sched,
1346                                      &unload_task,
1347                                      NULL,
1348                                      GNUNET_SCHEDULER_REASON_PREREQ_DONE);
1349 }
1350
1351
1352 /**
1353  * Function that removes all active reservations made
1354  * by the given client and releases the space for other
1355  * requests.
1356  *
1357  * @param cls closure
1358  * @param client identification of the client
1359  */
1360 static void
1361 cleanup_reservations (void *cls,
1362                       struct GNUNET_SERVER_Client
1363                       * client)
1364 {
1365   struct ReservationList *pos;
1366   struct ReservationList *prev;
1367   struct ReservationList *next;
1368
1369   if (client == NULL)
1370     return;
1371   prev = NULL;
1372   pos = reservations;
1373   while (NULL != pos)
1374     {
1375       next = pos->next;
1376       if (pos->client == client)
1377         {
1378           if (prev == NULL)
1379             reservations = next;
1380           else
1381             prev->next = next;
1382           reserved -= pos->amount + pos->entries * GNUNET_DATASTORE_ENTRY_OVERHEAD;
1383           GNUNET_free (pos);
1384         }
1385       else
1386         {
1387           prev = pos;
1388         }
1389       pos = next;
1390     }
1391 }
1392
1393
1394 /**
1395  * Process datastore requests.
1396  *
1397  * @param cls closure
1398  * @param s scheduler to use
1399  * @param server the initialized server
1400  * @param c configuration to use
1401  */
1402 static void
1403 run (void *cls,
1404      struct GNUNET_SCHEDULER_Handle *s,
1405      struct GNUNET_SERVER_Handle *server,
1406      const struct GNUNET_CONFIGURATION_Handle *c)
1407 {
1408   char *fn;
1409   unsigned int bf_size;
1410
1411   sched = s;
1412   cfg = c;
1413   if (GNUNET_OK !=
1414       GNUNET_CONFIGURATION_get_value_number (cfg,
1415                                              "DATASTORE", "QUOTA", &quota))
1416     {
1417       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1418                   _("No `%s' specified for `%s' in configuration!\n"),
1419                   "QUOTA",
1420                   "DATASTORE");
1421       return;
1422     }
1423   stats = GNUNET_STATISTICS_create (sched, "datastore", cfg);
1424   cache_size = quota / 8; /* Or should we make this an option? */
1425   bf_size = quota / 32; /* 8 bit per entry, 1 bit per 32 kb in DB */
1426   fn = NULL;
1427   if ( (GNUNET_OK !=
1428         GNUNET_CONFIGURATION_get_value_filename (cfg,
1429                                                  "DATASTORE",
1430                                                  "BLOOMFILTER",
1431                                                  &fn)) ||
1432        (GNUNET_OK !=
1433         GNUNET_DISK_directory_create_for_file (fn)) )
1434     {
1435       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1436                   _("Could not use specified filename `%s' for bloomfilter.\n"),
1437                   fn != NULL ? fn : "");
1438       GNUNET_free_non_null (fn);
1439       fn = NULL;
1440     }
1441   filter = GNUNET_CONTAINER_bloomfilter_load (fn, bf_size, 5);  /* approx. 3% false positives at max use */  
1442   GNUNET_free_non_null (fn);
1443   if (filter == NULL)
1444     {
1445       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1446                   _("Failed to initialize bloomfilter.\n"));
1447       if (stats != NULL)
1448         {
1449           GNUNET_STATISTICS_destroy (stats, GNUNET_YES);
1450           stats = NULL;
1451         }
1452       return;
1453     }
1454   plugin = load_plugin ();
1455   if (NULL == plugin)
1456     {
1457       GNUNET_CONTAINER_bloomfilter_free (filter);
1458       filter = NULL;
1459       if (stats != NULL)
1460         {
1461           GNUNET_STATISTICS_destroy (stats, GNUNET_YES);
1462           stats = NULL;
1463         }
1464       return;
1465     }
1466   GNUNET_SERVER_disconnect_notify (server, &cleanup_reservations, NULL);
1467   GNUNET_SERVER_add_handlers (server, handlers);
1468   expired_kill_task
1469     = GNUNET_SCHEDULER_add_with_priority (sched,
1470                                           GNUNET_SCHEDULER_PRIORITY_IDLE,
1471                                           &delete_expired, NULL);
1472   GNUNET_SCHEDULER_add_delayed (sched,
1473                                 GNUNET_TIME_UNIT_FOREVER_REL,
1474                                 &cleaning_task, NULL);
1475   
1476 }
1477
1478
1479 /**
1480  * The main function for the datastore service.
1481  *
1482  * @param argc number of arguments from the command line
1483  * @param argv command line arguments
1484  * @return 0 ok, 1 on error
1485  */
1486 int
1487 main (int argc, char *const *argv)
1488 {
1489   int ret;
1490
1491   ret = (GNUNET_OK ==
1492          GNUNET_SERVICE_run (argc,
1493                              argv,
1494                              "datastore",
1495                              GNUNET_SERVICE_OPTION_NONE,
1496                              &run, NULL)) ? 0 : 1;
1497   return ret;
1498 }
1499
1500
1501 /* end of gnunet-service-datastore.c */