fix datastore quota management -- especially if stats is not working
[oweals/gnunet.git] / src / datastore / gnunet-service-datastore.c
1 /*
2      This file is part of GNUnet
3      (C) 2004, 2005, 2006, 2007, 2009 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 2, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file datastore/gnunet-service-datastore.c
23  * @brief Management for the datastore for files stored on a GNUnet node
24  * @author Christian Grothoff
25  */
26
27 #include "platform.h"
28 #include "gnunet_util_lib.h"
29 #include "gnunet_arm_service.h"
30 #include "gnunet_protocols.h"
31 #include "gnunet_statistics_service.h"
32 #include "plugin_datastore.h"
33 #include "datastore.h"
34
35 /**
36  * How many messages do we queue at most per client?
37  */
38 #define MAX_PENDING 1024
39
40 /**
41  * How long are we at most keeping "expired" content
42  * past the expiration date in the database?
43  */
44 #define MAX_EXPIRE_DELAY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 15)
45
46
47
48 /**
49  * Our datastore plugin.
50  */
51 struct DatastorePlugin
52 {
53
54   /**
55    * API of the transport as returned by the plugin's
56    * initialization function.
57    */
58   struct GNUNET_DATASTORE_PluginFunctions *api;
59
60   /**
61    * Short name for the plugin (i.e. "sqlite").
62    */
63   char *short_name;
64
65   /**
66    * Name of the library (i.e. "gnunet_plugin_datastore_sqlite").
67    */
68   char *lib_name;
69
70   /**
71    * Environment this transport service is using
72    * for this plugin.
73    */
74   struct GNUNET_DATASTORE_PluginEnvironment env;
75
76 };
77
78
79 /**
80  * Linked list of active reservations.
81  */
82 struct ReservationList 
83 {
84
85   /**
86    * This is a linked list.
87    */
88   struct ReservationList *next;
89
90   /**
91    * Client that made the reservation.
92    */
93   struct GNUNET_SERVER_Client *client;
94
95   /**
96    * Number of bytes (still) reserved.
97    */
98   uint64_t amount;
99
100   /**
101    * Number of items (still) reserved.
102    */
103   uint64_t entries;
104
105   /**
106    * Reservation identifier.
107    */
108   int32_t rid;
109
110 };
111
112
113 /**
114  * Our datastore plugin (NULL if not available).
115  */
116 static struct DatastorePlugin *plugin;
117
118 /**
119  * Linked list of space reservations made by clients.
120  */
121 static struct ReservationList *reservations;
122
123 /**
124  * Bloomfilter to quickly tell if we don't have the content.
125  */
126 static struct GNUNET_CONTAINER_BloomFilter *filter;
127
128 /**
129  * Static counter to produce reservation identifiers.
130  */
131 static int reservation_gen;
132
133 /**
134  * How much space are we allowed to use?
135  */
136 static unsigned long long quota;
137
138 /**
139  * How much space are we using for the cache?  (space available for
140  * insertions that will be instantly reclaimed by discarding less
141  * important content --- or possibly whatever we just inserted into
142  * the "cache").
143  */
144 static unsigned long long cache_size;
145
146 /**
147  * How much space have we currently reserved?
148  */
149 static unsigned long long reserved;
150
151 /**
152  * Identity of the task that is used to delete
153  * expired content.
154  */
155 static GNUNET_SCHEDULER_TaskIdentifier expired_kill_task;
156
157 /**
158  * Our configuration.
159  */
160 const struct GNUNET_CONFIGURATION_Handle *cfg;
161
162 /**
163  * Our scheduler.
164  */
165 struct GNUNET_SCHEDULER_Handle *sched; 
166
167 /**
168  * Handle for reporting statistics.
169  */
170 static struct GNUNET_STATISTICS_Handle *stats;
171
172
173 /**
174  * Function called once the transmit operation has
175  * either failed or succeeded.
176  *
177  * @param cls closure
178  * @param status GNUNET_OK on success, GNUNET_SYSERR on error
179  */
180 typedef void (*TransmitContinuation)(void *cls,
181                                      int status);
182
183
184 /**
185  * Context for transmitting replies to clients.
186  */
187 struct TransmitCallbackContext 
188 {
189   
190   /**
191    * We keep these in a doubly-linked list (for cleanup).
192    */
193   struct TransmitCallbackContext *next;
194   
195   /**
196    * We keep these in a doubly-linked list (for cleanup).
197    */
198   struct TransmitCallbackContext *prev;
199   
200   /**
201    * The message that we're asked to transmit.
202    */
203   struct GNUNET_MessageHeader *msg;
204   
205   /**
206    * Handle for the transmission request.
207    */
208   struct GNUNET_CONNECTION_TransmitHandle *th;
209
210   /**
211    * Client that we are transmitting to.
212    */
213   struct GNUNET_SERVER_Client *client;
214
215   /**
216    * Function to call once msg has been transmitted
217    * (or at least added to the buffer).
218    */
219   TransmitContinuation tc;
220
221   /**
222    * Closure for tc.
223    */
224   void *tc_cls;
225
226   /**
227    * GNUNET_YES if we are supposed to signal the server
228    * completion of the client's request.
229    */
230   int end;
231 };
232
233   
234 /**
235  * Head of the doubly-linked list (for cleanup).
236  */
237 static struct TransmitCallbackContext *tcc_head;
238
239 /**
240  * Tail of the doubly-linked list (for cleanup).
241  */
242 static struct TransmitCallbackContext *tcc_tail;
243
244 /**
245  * Have we already clean ed up the TCCs and are hence no longer
246  * willing (or able) to transmit anything to anyone?
247  */
248 static int cleaning_done;
249
250 /**
251  * Task that is used to remove expired entries from
252  * the datastore.  This task will schedule itself
253  * again automatically to always delete all expired
254  * content quickly.
255  *
256  * @param cls not used
257  * @param tc task context
258  */ 
259 static void
260 delete_expired (void *cls,
261                 const struct GNUNET_SCHEDULER_TaskContext *tc);
262
263
264 /**
265  * Iterate over the expired items stored in the datastore.
266  * Delete all expired items; once we have processed all
267  * expired items, re-schedule the "delete_expired" task.
268  *
269  * @param cls not used
270  * @param next_cls closure to pass to the "next" function.
271  * @param key key for the content
272  * @param size number of bytes in data
273  * @param data content stored
274  * @param type type of the content
275  * @param priority priority of the content
276  * @param anonymity anonymity-level for the content
277  * @param expiration expiration time for the content
278  * @param uid unique identifier for the datum;
279  *        maybe 0 if no unique identifier is available
280  *
281  * @return GNUNET_SYSERR to abort the iteration, GNUNET_OK to continue
282  *         (continue on call to "next", of course),
283  *         GNUNET_NO to delete the item and continue (if supported)
284  */
285 static int 
286 expired_processor (void *cls,
287                    void *next_cls,
288                    const GNUNET_HashCode * key,
289                    uint32_t size,
290                    const void *data,
291                    uint32_t type,
292                    uint32_t priority,
293                    uint32_t anonymity,
294                    struct GNUNET_TIME_Absolute
295                    expiration, 
296                    uint64_t uid)
297 {
298   struct GNUNET_TIME_Absolute now;
299
300   if (key == NULL) 
301     {
302       expired_kill_task 
303         = GNUNET_SCHEDULER_add_delayed (sched,
304                                         MAX_EXPIRE_DELAY,
305                                         &delete_expired,
306                                         NULL);
307       return GNUNET_SYSERR;
308     }
309   now = GNUNET_TIME_absolute_get ();
310   if (expiration.value > now.value)
311     {
312       /* finished processing */
313       plugin->api->next_request (next_cls, GNUNET_YES);
314       return GNUNET_SYSERR;
315     }
316   plugin->api->next_request (next_cls, GNUNET_NO);
317 #if DEBUG_DATASTORE
318   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
319               "Deleting content that expired %llu ms ago\n",
320               (unsigned long long) (now.value - expiration.value));
321 #endif
322   GNUNET_STATISTICS_update (stats,
323                             gettext_noop ("# bytes expired"),
324                             size,
325                             GNUNET_NO);
326   GNUNET_CONTAINER_bloomfilter_remove (filter,
327                                        key);
328   return GNUNET_NO; /* delete */
329 }
330
331
332 /**
333  * Task that is used to remove expired entries from
334  * the datastore.  This task will schedule itself
335  * again automatically to always delete all expired
336  * content quickly.
337  *
338  * @param cls not used
339  * @param tc task context
340  */ 
341 static void
342 delete_expired (void *cls,
343                 const struct GNUNET_SCHEDULER_TaskContext *tc)
344 {
345   expired_kill_task = GNUNET_SCHEDULER_NO_TASK;
346   plugin->api->iter_ascending_expiration (plugin->api->cls, 
347                                           0,
348                                           &expired_processor,
349                                           NULL);
350 }
351
352
353 /**
354  * An iterator over a set of items stored in the datastore.
355  *
356  * @param cls closure
357  * @param next_cls closure to pass to the "next" function.
358  * @param key key for the content
359  * @param size number of bytes in data
360  * @param data content stored
361  * @param type type of the content
362  * @param priority priority of the content
363  * @param anonymity anonymity-level for the content
364  * @param expiration expiration time for the content
365  * @param uid unique identifier for the datum;
366  *        maybe 0 if no unique identifier is available
367  *
368  * @return GNUNET_SYSERR to abort the iteration, GNUNET_OK to continue
369  *         (continue on call to "next", of course),
370  *         GNUNET_NO to delete the item and continue (if supported)
371  */
372 static int 
373 manage (void *cls,
374         void *next_cls,
375         const GNUNET_HashCode * key,
376         uint32_t size,
377         const void *data,
378         uint32_t type,
379         uint32_t priority,
380         uint32_t anonymity,
381         struct GNUNET_TIME_Absolute
382         expiration, 
383         uint64_t uid)
384 {
385   unsigned long long *need = cls;
386
387   if (NULL == key)
388     {
389       GNUNET_free (need);
390       return GNUNET_SYSERR;
391     }
392   if (size + GNUNET_DATASTORE_ENTRY_OVERHEAD > *need)
393     *need = 0;
394   else
395     *need -= size + GNUNET_DATASTORE_ENTRY_OVERHEAD;
396   plugin->api->next_request (next_cls, 
397                              (0 == *need) ? GNUNET_YES : GNUNET_NO);
398 #if DEBUG_DATASTORE
399   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
400               "Deleting %llu bytes of low-priority content (still trying to free another %llu bytes)\n",
401               size + GNUNET_DATASTORE_ENTRY_OVERHEAD,
402               *need);
403 #endif
404   GNUNET_STATISTICS_update (stats,
405                             gettext_noop ("# bytes purged (low-priority)"),
406                             size,
407                             GNUNET_NO);
408   GNUNET_CONTAINER_bloomfilter_remove (filter,
409                                        key);
410   return GNUNET_NO;
411 }
412
413
414 /**
415  * Manage available disk space by running tasks
416  * that will discard content if necessary.  This
417  * function will be run whenever a request for
418  * "need" bytes of storage could only be satisfied
419  * by eating into the "cache" (and we want our cache
420  * space back).
421  *
422  * @param need number of bytes of content that were
423  *        placed into the "cache" (and hence the
424  *        number of bytes that should be removed).
425  */
426 static void
427 manage_space (unsigned long long need)
428 {
429   unsigned long long *n;
430
431 #if DEBUG_DATASTORE
432   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
433               "Asked to free up %llu bytes of cache space\n",
434               need);
435 #endif
436   n = GNUNET_malloc (sizeof(unsigned long long));
437   *n = need;
438   plugin->api->iter_low_priority (plugin->api->cls,
439                                   0,
440                                   &manage,
441                                   n);
442 }
443
444
445 /**
446  * Function called to notify a client about the socket
447  * begin ready to queue more data.  "buf" will be
448  * NULL and "size" zero if the socket was closed for
449  * writing in the meantime.
450  *
451  * @param cls closure
452  * @param size number of bytes available in buf
453  * @param buf where the callee should write the message
454  * @return number of bytes written to buf
455  */
456 static size_t
457 transmit_callback (void *cls,
458                    size_t size, void *buf)
459 {
460   struct TransmitCallbackContext *tcc = cls;
461   size_t msize;
462   
463   tcc->th = NULL;
464   GNUNET_CONTAINER_DLL_remove (tcc_head,
465                                tcc_tail,
466                                tcc);
467   msize = ntohs(tcc->msg->size);
468   if (size == 0)
469     {
470 #if DEBUG_DATASTORE
471       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
472                   "Transmission failed.\n");
473 #endif
474       if (tcc->tc != NULL)
475         tcc->tc (tcc->tc_cls, GNUNET_SYSERR);
476       if (GNUNET_YES == tcc->end)
477         GNUNET_SERVER_receive_done (tcc->client, GNUNET_SYSERR);       
478       GNUNET_SERVER_client_drop (tcc->client);
479       GNUNET_free (tcc->msg);
480       GNUNET_free (tcc);
481       return 0;
482     }
483   GNUNET_assert (size >= msize);
484   memcpy (buf, tcc->msg, msize);
485   if (tcc->tc != NULL)
486     tcc->tc (tcc->tc_cls, GNUNET_OK);
487   if (GNUNET_YES == tcc->end)
488     {
489       GNUNET_SERVER_receive_done (tcc->client, GNUNET_OK);
490     }
491   else
492     {
493 #if DEBUG_DATASTORE
494       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
495                   "Response transmitted, more pending!\n");
496 #endif
497     }
498   GNUNET_SERVER_client_drop (tcc->client);
499   GNUNET_free (tcc->msg);
500   GNUNET_free (tcc);
501   return msize;
502 }
503
504
505 /**
506  * Transmit the given message to the client.
507  *
508  * @param client target of the message
509  * @param msg message to transmit, will be freed!
510  * @param tc function to call afterwards
511  * @param tc_cls closure for tc
512  * @param end is this the last response (and we should
513  *        signal the server completion accodingly after
514  *        transmitting this message)?
515  */
516 static void
517 transmit (struct GNUNET_SERVER_Client *client,
518           struct GNUNET_MessageHeader *msg,
519           TransmitContinuation tc,
520           void *tc_cls,
521           int end)
522 {
523   struct TransmitCallbackContext *tcc;
524
525   if (GNUNET_YES == cleaning_done)
526     {
527       if (NULL != tc)
528         tc (tc_cls, GNUNET_SYSERR);
529       return;
530     }
531   tcc = GNUNET_malloc (sizeof(struct TransmitCallbackContext));
532   tcc->msg = msg;
533   tcc->client = client;
534   tcc->tc = tc;
535   tcc->tc_cls = tc_cls;
536   tcc->end = end;
537   if (NULL ==
538       (tcc->th = GNUNET_SERVER_notify_transmit_ready (client,
539                                                       ntohs(msg->size),
540                                                       GNUNET_TIME_UNIT_FOREVER_REL,
541                                                       &transmit_callback,
542                                                       tcc)))
543     {
544       GNUNET_break (0);
545       if (GNUNET_YES == end)
546         {
547 #if DEBUG_DATASTORE
548           GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
549                       "Disconnecting client.\n");
550 #endif    
551           GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
552         }
553       if (NULL != tc)
554         tc (tc_cls, GNUNET_SYSERR);
555       GNUNET_free (msg);
556       GNUNET_free (tcc);
557       return;
558     }
559   GNUNET_SERVER_client_keep (client);
560   GNUNET_CONTAINER_DLL_insert (tcc_head,
561                                tcc_tail,
562                                tcc);
563 }
564
565
566 /**
567  * Transmit a status code to the client.
568  *
569  * @param client receiver of the response
570  * @param code status code
571  * @param msg optional error message (can be NULL)
572  */
573 static void
574 transmit_status (struct GNUNET_SERVER_Client *client,
575                  int code,
576                  const char *msg)
577 {
578   struct StatusMessage *sm;
579   size_t slen;
580
581 #if DEBUG_DATASTORE
582   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
583               "Transmitting `%s' message with value %d and message `%s'\n",
584               "STATUS",
585               code,
586               msg != NULL ? msg : "(none)");
587 #endif
588   slen = (msg == NULL) ? 0 : strlen(msg) + 1;  
589   sm = GNUNET_malloc (sizeof(struct StatusMessage) + slen);
590   sm->header.size = htons(sizeof(struct StatusMessage) + slen);
591   sm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_STATUS);
592   sm->status = htonl(code);
593   if (slen > 0)
594     memcpy (&sm[1], msg, slen);  
595   transmit (client, &sm->header, NULL, NULL, GNUNET_YES);
596 }
597
598
599 /**
600  * Function called once the transmit operation has
601  * either failed or succeeded.
602  *
603  * @param next_cls closure for calling "next_request" callback
604  * @param status GNUNET_OK on success, GNUNET_SYSERR on error
605  */
606 static void 
607 get_next(void *next_cls,
608          int status)
609 {
610   if (status != GNUNET_OK)
611     {
612       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
613                   _("Failed to transmit an item to the client; aborting iteration.\n"));
614       if (plugin != NULL)
615         plugin->api->next_request (next_cls, GNUNET_YES);
616       return;
617     }
618   plugin->api->next_request (next_cls, GNUNET_NO);
619 }
620
621
622 /**
623  * Function that will transmit the given datastore entry
624  * to the client.
625  *
626  * @param cls closure, pointer to the client (of type GNUNET_SERVER_Client).
627  * @param next_cls closure to use to ask for the next item
628  * @param key key for the content
629  * @param size number of bytes in data
630  * @param data content stored
631  * @param type type of the content
632  * @param priority priority of the content
633  * @param anonymity anonymity-level for the content
634  * @param expiration expiration time for the content
635  * @param uid unique identifier for the datum;
636  *        maybe 0 if no unique identifier is available
637  *
638  * @return GNUNET_SYSERR to abort the iteration, GNUNET_OK to continue,
639  *         GNUNET_NO to delete the item and continue (if supported)
640  */
641 static int
642 transmit_item (void *cls,
643                void *next_cls,
644                const GNUNET_HashCode * key,
645                uint32_t size,
646                const void *data,
647                uint32_t type,
648                uint32_t priority,
649                uint32_t anonymity,
650                struct GNUNET_TIME_Absolute
651                expiration, uint64_t uid)
652 {
653   struct GNUNET_SERVER_Client *client = cls;
654   struct GNUNET_MessageHeader *end;
655   struct DataMessage *dm;
656
657   if (key == NULL)
658     {
659       /* transmit 'DATA_END' */
660 #if DEBUG_DATASTORE
661       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
662                   "Transmitting `%s' message\n",
663                   "DATA_END");
664 #endif
665       end = GNUNET_malloc (sizeof(struct GNUNET_MessageHeader));
666       end->size = htons(sizeof(struct GNUNET_MessageHeader));
667       end->type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END);
668       transmit (client, end, NULL, NULL, GNUNET_YES);
669       GNUNET_SERVER_client_drop (client);
670       return GNUNET_OK;
671     }
672   dm = GNUNET_malloc (sizeof(struct DataMessage) + size);
673   dm->header.size = htons(sizeof(struct DataMessage) + size);
674   dm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_DATA);
675   dm->rid = htonl(0);
676   dm->size = htonl(size);
677   dm->type = htonl(type);
678   dm->priority = htonl(priority);
679   dm->anonymity = htonl(anonymity);
680   dm->expiration = GNUNET_TIME_absolute_hton(expiration);
681   dm->uid = GNUNET_htonll(uid);
682   dm->key = *key;
683   memcpy (&dm[1], data, size);
684 #if DEBUG_DATASTORE
685   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
686               "Transmitting `%s' message\n",
687               "DATA");
688 #endif
689   GNUNET_STATISTICS_update (stats,
690                             gettext_noop ("# results found"),
691                             1,
692                             GNUNET_NO);
693   transmit (client, &dm->header, &get_next, next_cls, GNUNET_NO);
694   return GNUNET_OK;
695 }
696
697
698 /**
699  * Handle RESERVE-message.
700  *
701  * @param cls closure
702  * @param client identification of the client
703  * @param message the actual message
704  */
705 static void
706 handle_reserve (void *cls,
707                 struct GNUNET_SERVER_Client *client,
708                 const struct GNUNET_MessageHeader *message)
709 {
710   const struct ReserveMessage *msg = (const struct ReserveMessage*) message;
711   struct ReservationList *e;
712   unsigned long long used;
713   unsigned long long req;
714   uint64_t amount;
715   uint32_t entries;
716
717 #if DEBUG_DATASTORE
718   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
719               "Processing `%s' request\n",
720               "RESERVE");
721 #endif
722   amount = GNUNET_ntohll(msg->amount);
723   entries = ntohl(msg->entries);
724   used = plugin->api->get_size (plugin->api->cls) + reserved;
725   req = amount + ((unsigned long long) GNUNET_DATASTORE_ENTRY_OVERHEAD) * entries;
726   if (used + req > quota)
727     {
728       if (quota < used)
729         used = quota; /* cheat a bit for error message (to avoid negative numbers) */
730       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
731                   _("Insufficient space (%llu bytes are available) to satisfy `%s' request for %llu bytes\n"),
732                   quota - used,
733                   "RESERVE",
734                   req);
735       if (cache_size < req)
736         {
737           /* TODO: document this in the FAQ; essentially, if this
738              message happens, the insertion request could be blocked
739              by less-important content from migration because it is
740              larger than 1/8th of the overall available space, and
741              we only reserve 1/8th for "fresh" insertions */
742           GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
743                       _("The requested amount (%llu bytes) is larger than the cache size (%llu bytes)\n"),
744                       req,
745                       cache_size);
746           transmit_status (client, 0, 
747                            gettext_noop ("Insufficient space to satisfy request and "
748                                          "requested amount is larger than cache size"));
749         }
750       else
751         {
752           transmit_status (client, 0, 
753                            gettext_noop ("Insufficient space to satisfy request"));
754         }
755       return;      
756     }
757   reserved += req;
758   e = GNUNET_malloc (sizeof(struct ReservationList));
759   e->next = reservations;
760   reservations = e;
761   e->client = client;
762   e->amount = amount;
763   e->entries = entries;
764   e->rid = ++reservation_gen;
765   if (reservation_gen < 0)
766     reservation_gen = 0; /* wrap around */
767   transmit_status (client, e->rid, NULL);
768 }
769
770
771 /**
772  * Handle RELEASE_RESERVE-message.
773  *
774  * @param cls closure
775  * @param client identification of the client
776  * @param message the actual message
777  */
778 static void
779 handle_release_reserve (void *cls,
780                         struct GNUNET_SERVER_Client *client,
781                         const struct GNUNET_MessageHeader *message)
782 {
783   const struct ReleaseReserveMessage *msg = (const struct ReleaseReserveMessage*) message;
784   struct ReservationList *pos;
785   struct ReservationList *prev;
786   struct ReservationList *next;
787   int rid = ntohl(msg->rid);
788   unsigned long long rem;
789
790 #if DEBUG_DATASTORE
791   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
792               "Processing `%s' request\n",
793               "RELEASE_RESERVE");
794 #endif
795   next = reservations;
796   prev = NULL;
797   while (NULL != (pos = next))
798     {
799       next = pos->next;
800       if (rid == pos->rid)
801         {
802           if (prev == NULL)
803             reservations = next;
804           else
805             prev->next = next;
806           rem = pos->amount + ((unsigned long long) GNUNET_DATASTORE_ENTRY_OVERHEAD) * pos->entries;
807           GNUNET_assert (reserved >= rem);
808           reserved -= rem;
809 #if DEBUG_DATASTORE
810           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
811                       "Returning %llu remaining reserved bytes to storage pool\n",
812                       rem);
813 #endif    
814           GNUNET_free (pos);
815           transmit_status (client, GNUNET_OK, NULL);
816           return;
817         }       
818       prev = pos;
819     }
820   GNUNET_break (0);
821   transmit_status (client, GNUNET_SYSERR, gettext_noop ("Could not find matching reservation"));
822 }
823
824
825 /**
826  * Check that the given message is a valid data message.
827  *
828  * @return NULL if the message is not well-formed, otherwise the message
829  */
830 static const struct DataMessage *
831 check_data (const struct GNUNET_MessageHeader *message)
832 {
833   uint16_t size;
834   uint32_t dsize;
835   const struct DataMessage *dm;
836
837   size = ntohs(message->size);
838   if (size < sizeof(struct DataMessage))
839     { 
840       GNUNET_break (0);
841       return NULL;
842     }
843   dm = (const struct DataMessage *) message;
844   dsize = ntohl(dm->size);
845   if (size != dsize + sizeof(struct DataMessage))
846     {
847       GNUNET_break (0);
848       return NULL;
849     }
850   return dm;
851 }
852
853
854 /**
855  * Handle PUT-message.
856  *
857  * @param cls closure
858  * @param client identification of the client
859  * @param message the actual message
860  */
861 static void
862 handle_put (void *cls,
863             struct GNUNET_SERVER_Client *client,
864             const struct GNUNET_MessageHeader *message)
865 {
866   const struct DataMessage *dm = check_data (message);
867   char *msg;
868   int ret;
869   int rid;
870   struct ReservationList *pos;
871   uint32_t size;
872
873 #if DEBUG_DATASTORE
874   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
875               "Processing `%s' request\n",
876               "PUT");
877 #endif
878   if (ntohl(dm->type) == 0) 
879     {
880       GNUNET_break (0);
881       dm = NULL;
882     }
883   if (dm == NULL)
884     {
885       GNUNET_break (0);
886       GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
887       return;
888     }
889   rid = ntohl(dm->rid);
890   size = ntohl(dm->size);
891   if (rid > 0)
892     {
893       pos = reservations;
894       while ( (NULL != pos) &&
895               (rid != pos->rid) )
896         pos = pos->next;
897       GNUNET_break (pos != NULL);
898       if (NULL != pos)
899         {
900           GNUNET_break (pos->entries > 0);
901           GNUNET_break (pos->amount > size);
902           pos->entries--;
903           pos->amount -= size;
904           reserved -= (size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
905         }
906     }
907   msg = NULL;
908   ret = plugin->api->put (plugin->api->cls,
909                           &dm->key,
910                           size,
911                           &dm[1],
912                           ntohl(dm->type),
913                           ntohl(dm->priority),
914                           ntohl(dm->anonymity),
915                           GNUNET_TIME_absolute_ntoh(dm->expiration),
916                           &msg);
917   if (GNUNET_OK == ret)
918     {
919       GNUNET_STATISTICS_update (stats,
920                                 gettext_noop ("# bytes stored"),
921                                 size,
922                                 GNUNET_NO);
923       GNUNET_CONTAINER_bloomfilter_add (filter,
924                                         &dm->key);
925 #if DEBUG_DATASTORE
926       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
927                   "Successfully stored %u bytes under key `%s'\n",
928                   size,
929                   GNUNET_h2s (&dm->key));
930 #endif
931     }
932   transmit_status (client, 
933                    (GNUNET_SYSERR == ret) ? GNUNET_SYSERR : GNUNET_OK, 
934                    msg);
935   GNUNET_free_non_null (msg);
936   if (quota - reserved - cache_size < plugin->api->get_size (plugin->api->cls))
937     manage_space (size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
938 }
939
940
941 /**
942  * Handle GET-message.
943  *
944  * @param cls closure
945  * @param client identification of the client
946  * @param message the actual message
947  */
948 static void
949 handle_get (void *cls,
950              struct GNUNET_SERVER_Client *client,
951              const struct GNUNET_MessageHeader *message)
952 {
953   const struct GetMessage *msg;
954   uint16_t size;
955
956 #if DEBUG_DATASTORE
957   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
958               "Processing `%s' request\n",
959               "GET");
960 #endif
961   size = ntohs(message->size);
962   if ( (size != sizeof(struct GetMessage)) &&
963        (size != sizeof(struct GetMessage) - sizeof(GNUNET_HashCode)) )
964     {
965       GNUNET_break (0);
966       GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
967       return;
968     }
969   GNUNET_STATISTICS_update (stats,
970                             gettext_noop ("# GET requests received"),
971                             1,
972                             GNUNET_NO);
973   GNUNET_SERVER_client_keep (client);
974   msg = (const struct GetMessage*) message;
975   if ( (size == sizeof(struct GetMessage)) &&
976        (GNUNET_YES != GNUNET_CONTAINER_bloomfilter_test (filter,
977                                                          &msg->key)) )
978     {
979       /* don't bother database... */
980 #if DEBUG_DATASTORE
981       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
982                   "Empty result set for `%s' request for `%s'.\n",
983                   "GET",
984                   GNUNET_h2s (&msg->key));
985 #endif  
986       GNUNET_STATISTICS_update (stats,
987                                 gettext_noop ("# requests filtered by bloomfilter"),
988                                 1,
989                                 GNUNET_NO);
990       transmit_item (client,
991                      NULL, NULL, 0, NULL, 0, 0, 0, 
992                      GNUNET_TIME_UNIT_ZERO_ABS, 0);
993       return;
994     }
995   plugin->api->get (plugin->api->cls,
996                     ((size == sizeof(struct GetMessage)) ? &msg->key : NULL),
997                     NULL,
998                     ntohl(msg->type),
999                     &transmit_item,
1000                     client);    
1001 }
1002
1003
1004 /**
1005  * Handle UPDATE-message.
1006  *
1007  * @param cls closure
1008  * @param client identification of the client
1009  * @param message the actual message
1010  */
1011 static void
1012 handle_update (void *cls,
1013                struct GNUNET_SERVER_Client *client,
1014                const struct GNUNET_MessageHeader *message)
1015 {
1016   const struct UpdateMessage *msg;
1017   int ret;
1018   char *emsg;
1019
1020 #if DEBUG_DATASTORE
1021   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1022               "Processing `%s' request\n",
1023               "UPDATE");
1024 #endif
1025   GNUNET_STATISTICS_update (stats,
1026                             gettext_noop ("# UPDATE requests received"),
1027                             1,
1028                             GNUNET_NO);
1029   msg = (const struct UpdateMessage*) message;
1030   emsg = NULL;
1031   ret = plugin->api->update (plugin->api->cls,
1032                              GNUNET_ntohll(msg->uid),
1033                              (int32_t) ntohl(msg->priority),
1034                              GNUNET_TIME_absolute_ntoh(msg->expiration),
1035                              &emsg);
1036   transmit_status (client, ret, emsg);
1037   GNUNET_free_non_null (emsg);
1038 }
1039
1040
1041 /**
1042  * Handle GET_RANDOM-message.
1043  *
1044  * @param cls closure
1045  * @param client identification of the client
1046  * @param message the actual message
1047  */
1048 static void
1049 handle_get_random (void *cls,
1050                    struct GNUNET_SERVER_Client *client,
1051                    const struct GNUNET_MessageHeader *message)
1052 {
1053 #if DEBUG_DATASTORE
1054   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1055               "Processing `%s' request\n",
1056               "GET_RANDOM");
1057 #endif
1058   GNUNET_STATISTICS_update (stats,
1059                             gettext_noop ("# GET RANDOM requests received"),
1060                             1,
1061                             GNUNET_NO);
1062   GNUNET_SERVER_client_keep (client);
1063   plugin->api->iter_migration_order (plugin->api->cls,
1064                                      0,
1065                                      &transmit_item,
1066                                      client);  
1067 }
1068
1069
1070 /**
1071  * Context for the 'remove_callback'.
1072  */
1073 struct RemoveContext 
1074 {
1075   /**
1076    * Client for whom we're doing the remvoing.
1077    */
1078   struct GNUNET_SERVER_Client *client;
1079
1080   /**
1081    * GNUNET_YES if we managed to remove something.
1082    */
1083   int found;
1084 };
1085
1086
1087 /**
1088  * Callback function that will cause the item that is passed
1089  * in to be deleted (by returning GNUNET_NO).
1090  */
1091 static int
1092 remove_callback (void *cls,
1093                  void *next_cls,
1094                  const GNUNET_HashCode * key,
1095                  uint32_t size,
1096                  const void *data,
1097                  uint32_t type,
1098                  uint32_t priority,
1099                  uint32_t anonymity,
1100                  struct GNUNET_TIME_Absolute
1101                  expiration, uint64_t uid)
1102 {
1103   struct RemoveContext *rc = cls;
1104
1105   if (key == NULL)
1106     {
1107 #if DEBUG_DATASTORE
1108       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1109                   "No further matches for `%s' request.\n",
1110                   "REMOVE");
1111 #endif  
1112       if (GNUNET_YES == rc->found)
1113         transmit_status (rc->client, GNUNET_OK, NULL);       
1114       else
1115         transmit_status (rc->client, GNUNET_NO, _("Content not found"));        
1116       GNUNET_SERVER_client_drop (rc->client);
1117       GNUNET_free (rc);
1118       return GNUNET_OK; /* last item */
1119     }
1120   rc->found = GNUNET_YES;
1121 #if DEBUG_DATASTORE
1122   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1123               "Item %llu matches `%s' request.\n",
1124               (unsigned long long) uid,
1125               "REMOVE");
1126 #endif  
1127   GNUNET_STATISTICS_update (stats,
1128                             gettext_noop ("# bytes removed (explicit request)"),
1129                             size,
1130                             GNUNET_NO);
1131   GNUNET_CONTAINER_bloomfilter_remove (filter,
1132                                        key);
1133   plugin->api->next_request (next_cls, GNUNET_YES);
1134   return GNUNET_NO;
1135 }
1136
1137
1138 /**
1139  * Handle REMOVE-message.
1140  *
1141  * @param cls closure
1142  * @param client identification of the client
1143  * @param message the actual message
1144  */
1145 static void
1146 handle_remove (void *cls,
1147              struct GNUNET_SERVER_Client *client,
1148              const struct GNUNET_MessageHeader *message)
1149 {
1150   const struct DataMessage *dm = check_data (message);
1151   GNUNET_HashCode vhash;
1152   struct RemoveContext *rc;
1153
1154 #if DEBUG_DATASTORE
1155   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1156               "Processing `%s' request\n",
1157               "REMOVE");
1158 #endif
1159   if (dm == NULL)
1160     {
1161       GNUNET_break (0);
1162       GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1163       return;
1164     }
1165   GNUNET_STATISTICS_update (stats,
1166                             gettext_noop ("# REMOVE requests received"),
1167                             1,
1168                             GNUNET_NO);
1169   rc = GNUNET_malloc (sizeof(struct RemoveContext));
1170   GNUNET_SERVER_client_keep (client);
1171   rc->client = client;
1172   GNUNET_CRYPTO_hash (&dm[1],
1173                       ntohl(dm->size),
1174                       &vhash);
1175   plugin->api->get (plugin->api->cls,
1176                     &dm->key,
1177                     &vhash,
1178                     ntohl(dm->type),
1179                     &remove_callback,
1180                     rc);
1181 }
1182
1183
1184 /**
1185  * Handle DROP-message.
1186  *
1187  * @param cls closure
1188  * @param client identification of the client
1189  * @param message the actual message
1190  */
1191 static void
1192 handle_drop (void *cls,
1193              struct GNUNET_SERVER_Client *client,
1194              const struct GNUNET_MessageHeader *message)
1195 {
1196 #if DEBUG_DATASTORE
1197   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1198               "Processing `%s' request\n",
1199               "DROP");
1200 #endif
1201   plugin->api->drop (plugin->api->cls);
1202   GNUNET_SERVER_receive_done (client, GNUNET_OK);
1203 }
1204
1205
1206 /**
1207  * List of handlers for the messages understood by this
1208  * service.
1209  */
1210 static struct GNUNET_SERVER_MessageHandler handlers[] = {
1211   {&handle_reserve, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_RESERVE, 
1212    sizeof(struct ReserveMessage) }, 
1213   {&handle_release_reserve, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_RELEASE_RESERVE, 
1214    sizeof(struct ReleaseReserveMessage) }, 
1215   {&handle_put, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_PUT, 0 }, 
1216   {&handle_update, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_UPDATE, 
1217    sizeof (struct UpdateMessage) }, 
1218   {&handle_get, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_GET, 0 }, 
1219   {&handle_get_random, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_GET_RANDOM, 
1220    sizeof(struct GNUNET_MessageHeader) }, 
1221   {&handle_remove, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE, 0 }, 
1222   {&handle_drop, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_DROP, 
1223    sizeof(struct GNUNET_MessageHeader) }, 
1224   {NULL, NULL, 0, 0}
1225 };
1226
1227
1228
1229 /**
1230  * Load the datastore plugin.
1231  */
1232 static struct DatastorePlugin *
1233 load_plugin () 
1234 {
1235   struct DatastorePlugin *ret;
1236   char *libname;
1237   char *name;
1238
1239   if (GNUNET_OK !=
1240       GNUNET_CONFIGURATION_get_value_string (cfg,
1241                                              "DATASTORE", "DATABASE", &name))
1242     {
1243       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1244                   _("No `%s' specified for `%s' in configuration!\n"),
1245                   "DATABASE",
1246                   "DATASTORE");
1247       return NULL;
1248     }
1249   ret = GNUNET_malloc (sizeof(struct DatastorePlugin));
1250   ret->env.cfg = cfg;
1251   ret->env.sched = sched;  
1252   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1253               _("Loading `%s' datastore plugin\n"), name);
1254   GNUNET_asprintf (&libname, "libgnunet_plugin_datastore_%s", name);
1255   ret->short_name = name;
1256   ret->lib_name = libname;
1257   ret->api = GNUNET_PLUGIN_load (libname, &ret->env);
1258   if (ret->api == NULL)
1259     {
1260       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1261                   _("Failed to load datastore plugin for `%s'\n"), name);
1262       GNUNET_free (ret->short_name);
1263       GNUNET_free (libname);
1264       GNUNET_free (ret);
1265       return NULL;
1266     }
1267   return ret;
1268 }
1269
1270
1271 /**
1272  * Function called when the service shuts
1273  * down.  Unloads our datastore plugin.
1274  *
1275  * @param plug plugin to unload
1276  */
1277 static void
1278 unload_plugin (struct DatastorePlugin *plug)
1279 {
1280 #if DEBUG_DATASTORE
1281   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1282               "Datastore service is unloading plugin...\n");
1283 #endif
1284   GNUNET_break (NULL == GNUNET_PLUGIN_unload (plug->lib_name, plug->api));
1285   GNUNET_free (plug->lib_name);
1286   GNUNET_free (plug->short_name);
1287   GNUNET_free (plug);
1288 }
1289
1290
1291 /**
1292  * Final task run after shutdown.  Unloads plugins and disconnects us from
1293  * statistics.
1294  */
1295 static void
1296 unload_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1297 {
1298   unload_plugin (plugin);
1299   plugin = NULL;
1300   if (filter != NULL)
1301     {
1302       GNUNET_CONTAINER_bloomfilter_free (filter);
1303       filter = NULL;
1304     }
1305   GNUNET_ARM_stop_services (cfg, tc->sched, "statistics", NULL);
1306   if (stats != NULL)
1307     {
1308       GNUNET_STATISTICS_destroy (stats, GNUNET_YES);
1309       stats = NULL;
1310     }
1311 }
1312
1313
1314 /**
1315  * Last task run during shutdown.  Disconnects us from
1316  * the transport and core.
1317  */
1318 static void
1319 cleaning_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1320 {
1321   struct TransmitCallbackContext *tcc;
1322
1323   cleaning_done = GNUNET_YES;
1324   while (NULL != (tcc = tcc_head))
1325     {
1326       GNUNET_CONTAINER_DLL_remove (tcc_head,
1327                                    tcc_tail,
1328                                    tcc);
1329       if (tcc->th != NULL)
1330         {
1331           GNUNET_CONNECTION_notify_transmit_ready_cancel (tcc->th);
1332           GNUNET_SERVER_client_drop (tcc->client);
1333         }
1334    if (NULL != tcc->tc)
1335         tcc->tc (tcc->tc_cls, GNUNET_SYSERR);
1336       GNUNET_free (tcc->msg);
1337       GNUNET_free (tcc);
1338     }
1339   if (expired_kill_task != GNUNET_SCHEDULER_NO_TASK)
1340     {
1341       GNUNET_SCHEDULER_cancel (sched,
1342                                expired_kill_task);
1343       expired_kill_task = GNUNET_SCHEDULER_NO_TASK;
1344     }
1345   GNUNET_SCHEDULER_add_continuation (sched,
1346                                      &unload_task,
1347                                      NULL,
1348                                      GNUNET_SCHEDULER_REASON_PREREQ_DONE);
1349 }
1350
1351
1352 /**
1353  * Function that removes all active reservations made
1354  * by the given client and releases the space for other
1355  * requests.
1356  *
1357  * @param cls closure
1358  * @param client identification of the client
1359  */
1360 static void
1361 cleanup_reservations (void *cls,
1362                       struct GNUNET_SERVER_Client
1363                       * client)
1364 {
1365   struct ReservationList *pos;
1366   struct ReservationList *prev;
1367   struct ReservationList *next;
1368
1369   if (client == NULL)
1370     return;
1371   prev = NULL;
1372   pos = reservations;
1373   while (NULL != pos)
1374     {
1375       next = pos->next;
1376       if (pos->client == client)
1377         {
1378           if (prev == NULL)
1379             reservations = next;
1380           else
1381             prev->next = next;
1382           reserved -= pos->amount + pos->entries * GNUNET_DATASTORE_ENTRY_OVERHEAD;
1383           GNUNET_free (pos);
1384         }
1385       else
1386         {
1387           prev = pos;
1388         }
1389       pos = next;
1390     }
1391 }
1392
1393
1394 /**
1395  * Process datastore requests.
1396  *
1397  * @param cls closure
1398  * @param s scheduler to use
1399  * @param server the initialized server
1400  * @param c configuration to use
1401  */
1402 static void
1403 run (void *cls,
1404      struct GNUNET_SCHEDULER_Handle *s,
1405      struct GNUNET_SERVER_Handle *server,
1406      const struct GNUNET_CONFIGURATION_Handle *c)
1407 {
1408   char *fn;
1409   unsigned int bf_size;
1410
1411   sched = s;
1412   cfg = c;
1413   if (GNUNET_OK !=
1414       GNUNET_CONFIGURATION_get_value_number (cfg,
1415                                              "DATASTORE", "QUOTA", &quota))
1416     {
1417       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1418                   _("No `%s' specified for `%s' in configuration!\n"),
1419                   "QUOTA",
1420                   "DATASTORE");
1421       return;
1422     }
1423   stats = GNUNET_STATISTICS_create (sched, "datastore", cfg);
1424   cache_size = quota / 8; /* Or should we make this an option? */
1425   bf_size = quota / 32; /* 8 bit per entry, 1 bit per 32 kb in DB */
1426   fn = NULL;
1427   if ( (GNUNET_OK !=
1428         GNUNET_CONFIGURATION_get_value_filename (cfg,
1429                                                  "DATASTORE",
1430                                                  "BLOOMFILTER",
1431                                                  &fn)) ||
1432        (GNUNET_OK !=
1433         GNUNET_DISK_directory_create_for_file (fn)) )
1434     {
1435       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1436                   _("Could not use specified filename `%s' for bloomfilter.\n"),
1437                   fn != NULL ? fn : "");
1438       GNUNET_free_non_null (fn);
1439       fn = NULL;
1440     }
1441   filter = GNUNET_CONTAINER_bloomfilter_load (fn, bf_size, 5);  /* approx. 3% false positives at max use */  
1442   GNUNET_free_non_null (fn);
1443   if (filter == NULL)
1444     {
1445       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1446                   _("Failed to initialize bloomfilter.\n"));
1447       if (stats != NULL)
1448         {
1449           GNUNET_STATISTICS_destroy (stats, GNUNET_YES);
1450           stats = NULL;
1451         }
1452       return;
1453     }
1454   GNUNET_ARM_start_services (cfg, sched, "statistics", NULL);
1455   plugin = load_plugin ();
1456   if (NULL == plugin)
1457     {
1458       GNUNET_CONTAINER_bloomfilter_free (filter);
1459       filter = NULL;
1460       GNUNET_ARM_stop_services (cfg, sched, "statistics", NULL);
1461       if (stats != NULL)
1462         {
1463           GNUNET_STATISTICS_destroy (stats, GNUNET_YES);
1464           stats = NULL;
1465         }
1466       return;
1467     }
1468   GNUNET_SERVER_disconnect_notify (server, &cleanup_reservations, NULL);
1469   GNUNET_SERVER_add_handlers (server, handlers);
1470   expired_kill_task
1471     = GNUNET_SCHEDULER_add_with_priority (sched,
1472                                           GNUNET_SCHEDULER_PRIORITY_IDLE,
1473                                           &delete_expired, NULL);
1474   GNUNET_SCHEDULER_add_delayed (sched,
1475                                 GNUNET_TIME_UNIT_FOREVER_REL,
1476                                 &cleaning_task, NULL);
1477   
1478 }
1479
1480
1481 /**
1482  * The main function for the datastore service.
1483  *
1484  * @param argc number of arguments from the command line
1485  * @param argv command line arguments
1486  * @return 0 ok, 1 on error
1487  */
1488 int
1489 main (int argc, char *const *argv)
1490 {
1491   int ret;
1492
1493   ret = (GNUNET_OK ==
1494          GNUNET_SERVICE_run (argc,
1495                              argv,
1496                              "datastore",
1497                              GNUNET_SERVICE_OPTION_NONE,
1498                              &run, NULL)) ? 0 : 1;
1499   return ret;
1500 }
1501
1502
1503 /* end of gnunet-service-datastore.c */