Merge branch 'master' of git+ssh://gnunet.org/gnunet
[oweals/gnunet.git] / src / datastore / gnunet-service-datastore.c
1 /*
2      This file is part of GNUnet
3      Copyright (C) 2004-2014, 2016 GNUnet e.V.
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18      Boston, MA 02110-1301, USA.
19 */
20
21 /**
22  * @file datastore/gnunet-service-datastore.c
23  * @brief Management for the datastore for files stored on a GNUnet node
24  * @author Christian Grothoff
25  */
26
27 #include "platform.h"
28 #include "gnunet_util_lib.h"
29 #include "gnunet_protocols.h"
30 #include "gnunet_statistics_service.h"
31 #include "gnunet_datastore_plugin.h"
32 #include "datastore.h"
33
34 /**
35  * How many messages do we queue at most per client?
36  */
37 #define MAX_PENDING 1024
38
39 /**
40  * Limit size of bloom filter to 2 GB.
41  */
42 #define MAX_BF_SIZE ((uint32_t) (1LL << 31))
43
44 /**
45  * How long are we at most keeping "expired" content
46  * past the expiration date in the database?
47  */
48 #define MAX_EXPIRE_DELAY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 15)
49
50 /**
51  * How fast are we allowed to query the database for deleting
52  * expired content? (1 item per second).
53  */
54 #define MIN_EXPIRE_DELAY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 1)
55
56 /**
57  * Name under which we store current space consumption.
58  */
59 static char *quota_stat_name;
60
61 /**
62  * Task to timeout stat GET.
63  */
64 static struct GNUNET_SCHEDULER_Task *stat_timeout_task;
65
66 /**
67  * After how many payload-changing operations
68  * do we sync our statistics?
69  */
70 #define MAX_STAT_SYNC_LAG 50
71
72
73 /**
74  * Our datastore plugin.
75  */
76 struct DatastorePlugin
77 {
78
79   /**
80    * API of the transport as returned by the plugin's
81    * initialization function.
82    */
83   struct GNUNET_DATASTORE_PluginFunctions *api;
84
85   /**
86    * Short name for the plugin (i.e. "sqlite").
87    */
88   char *short_name;
89
90   /**
91    * Name of the library (i.e. "gnunet_plugin_datastore_sqlite").
92    */
93   char *lib_name;
94
95   /**
96    * Environment this transport service is using
97    * for this plugin.
98    */
99   struct GNUNET_DATASTORE_PluginEnvironment env;
100
101 };
102
103
104 /**
105  * Linked list of active reservations.
106  */
107 struct ReservationList
108 {
109
110   /**
111    * This is a linked list.
112    */
113   struct ReservationList *next;
114
115   /**
116    * Client that made the reservation.
117    */
118   struct GNUNET_SERVICE_Client *client;
119
120   /**
121    * Number of bytes (still) reserved.
122    */
123   uint64_t amount;
124
125   /**
126    * Number of items (still) reserved.
127    */
128   uint64_t entries;
129
130   /**
131    * Reservation identifier.
132    */
133   int32_t rid;
134
135 };
136
137
138
139 /**
140  * Our datastore plugin (NULL if not available).
141  */
142 static struct DatastorePlugin *plugin;
143
144 /**
145  * Linked list of space reservations made by clients.
146  */
147 static struct ReservationList *reservations;
148
149 /**
150  * Bloomfilter to quickly tell if we don't have the content.
151  */
152 static struct GNUNET_CONTAINER_BloomFilter *filter;
153
154 /**
155  * Name of our plugin.
156  */
157 static char *plugin_name;
158
159 /**
160  * Our configuration.
161  */
162 static const struct GNUNET_CONFIGURATION_Handle *cfg;
163
164 /**
165  * Handle for reporting statistics.
166  */
167 static struct GNUNET_STATISTICS_Handle *stats;
168
169 /**
170  * How much space are we using for the cache?  (space available for
171  * insertions that will be instantly reclaimed by discarding less
172  * important content --- or possibly whatever we just inserted into
173  * the "cache").
174  */
175 static unsigned long long cache_size;
176
177 /**
178  * How much space have we currently reserved?
179  */
180 static unsigned long long reserved;
181
182 /**
183  * How much data are we currently storing
184  * in the database?
185  */
186 static unsigned long long payload;
187
188 /**
189  * Identity of the task that is used to delete
190  * expired content.
191  */
192 static struct GNUNET_SCHEDULER_Task *expired_kill_task;
193
194 /**
195  * Minimum time that content should have to not be discarded instantly
196  * (time stamp of any content that we've been discarding recently to
197  * stay below the quota).  FOREVER if we had to expire content with
198  * non-zero priority.
199  */
200 static struct GNUNET_TIME_Absolute min_expiration;
201
202 /**
203  * How much space are we allowed to use?
204  */
205 static unsigned long long quota;
206
207 /**
208  * Should the database be dropped on exit?
209  */
210 static int do_drop;
211
212 /**
213  * Should we refresh the BF when the DB is loaded?
214  */
215 static int refresh_bf;
216
217 /**
218  * Number of updates that were made to the
219  * payload value since we last synchronized
220  * it with the statistics service.
221  */
222 static unsigned int last_sync;
223
224 /**
225  * Did we get an answer from statistics?
226  */
227 static int stats_worked;
228
229
230 /**
231  * Synchronize our utilization statistics with the
232  * statistics service.
233  */
234 static void
235 sync_stats ()
236 {
237   GNUNET_STATISTICS_set (stats,
238                          quota_stat_name,
239                          payload,
240                          GNUNET_YES);
241   GNUNET_STATISTICS_set (stats,
242                          "# utilization by current datastore",
243                          payload,
244                          GNUNET_NO);
245   last_sync = 0;
246 }
247
248
249 /**
250  * Have we already cleaned up the TCCs and are hence no longer
251  * willing (or able) to transmit anything to anyone?
252  */
253 static int cleaning_done;
254
255 /**
256  * Handle for pending get request.
257  */
258 static struct GNUNET_STATISTICS_GetHandle *stat_get;
259
260 /**
261  * Handle to our server.
262  */
263 static struct GNUNET_SERVICE_Handle *service;
264
265 /**
266  * Task that is used to remove expired entries from
267  * the datastore.  This task will schedule itself
268  * again automatically to always delete all expired
269  * content quickly.
270  *
271  * @param cls not used
272  */
273 static void
274 delete_expired (void *cls);
275
276
277 /**
278  * Iterate over the expired items stored in the datastore.
279  * Delete all expired items; once we have processed all
280  * expired items, re-schedule the "delete_expired" task.
281  *
282  * @param cls not used
283  * @param key key for the content
284  * @param size number of bytes in data
285  * @param data content stored
286  * @param type type of the content
287  * @param priority priority of the content
288  * @param anonymity anonymity-level for the content
289  * @param expiration expiration time for the content
290  * @param uid unique identifier for the datum;
291  *        maybe 0 if no unique identifier is available
292  *
293  * @return #GNUNET_SYSERR to abort the iteration, #GNUNET_OK to continue
294  *         (continue on call to "next", of course),
295  *         #GNUNET_NO to delete the item and continue (if supported)
296  */
297 static int
298 expired_processor (void *cls,
299                    const struct GNUNET_HashCode *key,
300                    uint32_t size,
301                    const void *data,
302                    enum GNUNET_BLOCK_Type type,
303                    uint32_t priority,
304                    uint32_t anonymity,
305                    struct GNUNET_TIME_Absolute expiration,
306                    uint64_t uid)
307 {
308   struct GNUNET_TIME_Absolute now;
309
310   if (key == NULL)
311   {
312     expired_kill_task =
313         GNUNET_SCHEDULER_add_delayed_with_priority (MAX_EXPIRE_DELAY,
314                                                     GNUNET_SCHEDULER_PRIORITY_IDLE,
315                                                     &delete_expired, NULL);
316     return GNUNET_SYSERR;
317   }
318   now = GNUNET_TIME_absolute_get ();
319   if (expiration.abs_value_us > now.abs_value_us)
320   {
321     /* finished processing */
322     expired_kill_task =
323         GNUNET_SCHEDULER_add_delayed_with_priority (MAX_EXPIRE_DELAY,
324                                                     GNUNET_SCHEDULER_PRIORITY_IDLE,
325                                                     &delete_expired, NULL);
326     return GNUNET_SYSERR;
327   }
328   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
329               "Deleting content `%s' of type %u that expired %s ago\n",
330               GNUNET_h2s (key), type,
331               GNUNET_STRINGS_relative_time_to_string (GNUNET_TIME_absolute_get_difference (expiration,
332                                                                                            now),
333                                                       GNUNET_YES));
334   min_expiration = now;
335   GNUNET_STATISTICS_update (stats,
336                             gettext_noop ("# bytes expired"),
337                             size,
338                             GNUNET_YES);
339   GNUNET_CONTAINER_bloomfilter_remove (filter, key);
340   expired_kill_task =
341       GNUNET_SCHEDULER_add_delayed_with_priority (MIN_EXPIRE_DELAY,
342                                                   GNUNET_SCHEDULER_PRIORITY_IDLE,
343                                                   &delete_expired, NULL);
344   return GNUNET_NO;
345 }
346
347
348 /**
349  * Task that is used to remove expired entries from
350  * the datastore.  This task will schedule itself
351  * again automatically to always delete all expired
352  * content quickly.
353  *
354  * @param cls not used
355  */
356 static void
357 delete_expired (void *cls)
358 {
359   expired_kill_task = NULL;
360   plugin->api->get_expiration (plugin->api->cls,
361                                &expired_processor,
362                                NULL);
363 }
364
365
366 /**
367  * An iterator over a set of items stored in the datastore
368  * that deletes until we're happy with respect to our quota.
369  *
370  * @param cls closure
371  * @param key key for the content
372  * @param size number of bytes in data
373  * @param data content stored
374  * @param type type of the content
375  * @param priority priority of the content
376  * @param anonymity anonymity-level for the content
377  * @param expiration expiration time for the content
378  * @param uid unique identifier for the datum;
379  *        maybe 0 if no unique identifier is available
380  * @return #GNUNET_SYSERR to abort the iteration, #GNUNET_OK to continue
381  *         (continue on call to "next", of course),
382  *         #GNUNET_NO to delete the item and continue (if supported)
383  */
384 static int
385 quota_processor (void *cls,
386                  const struct GNUNET_HashCode *key,
387                  uint32_t size,
388                  const void *data,
389                  enum GNUNET_BLOCK_Type type,
390                  uint32_t priority,
391                  uint32_t anonymity,
392                  struct GNUNET_TIME_Absolute expiration,
393                  uint64_t uid)
394 {
395   unsigned long long *need = cls;
396
397   if (NULL == key)
398     return GNUNET_SYSERR;
399   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
400               "Deleting %llu bytes of low-priority (%u) content `%s' of type %u at %s prior to expiration (still trying to free another %llu bytes)\n",
401               (unsigned long long) (size + GNUNET_DATASTORE_ENTRY_OVERHEAD),
402               (unsigned int) priority,
403               GNUNET_h2s (key), type,
404               GNUNET_STRINGS_relative_time_to_string (GNUNET_TIME_absolute_get_remaining (expiration),
405                                                       GNUNET_YES),
406               *need);
407   if (size + GNUNET_DATASTORE_ENTRY_OVERHEAD > *need)
408     *need = 0;
409   else
410     *need -= size + GNUNET_DATASTORE_ENTRY_OVERHEAD;
411   if (priority > 0)
412     min_expiration = GNUNET_TIME_UNIT_FOREVER_ABS;
413   else
414     min_expiration = expiration;
415   GNUNET_STATISTICS_update (stats,
416                             gettext_noop ("# bytes purged (low-priority)"),
417                             size, GNUNET_YES);
418   GNUNET_CONTAINER_bloomfilter_remove (filter, key);
419   return GNUNET_NO;
420 }
421
422
423 /**
424  * Manage available disk space by running tasks
425  * that will discard content if necessary.  This
426  * function will be run whenever a request for
427  * "need" bytes of storage could only be satisfied
428  * by eating into the "cache" (and we want our cache
429  * space back).
430  *
431  * @param need number of bytes of content that were
432  *        placed into the "cache" (and hence the
433  *        number of bytes that should be removed).
434  */
435 static void
436 manage_space (unsigned long long need)
437 {
438   unsigned long long last;
439
440   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
441               "Asked to free up %llu bytes of cache space\n",
442               need);
443   last = 0;
444   while ((need > 0) && (last != need))
445   {
446     last = need;
447     plugin->api->get_expiration (plugin->api->cls,
448                                  &quota_processor,
449                                  &need);
450   }
451 }
452
453
454 /**
455  * Transmit a status code to the client.
456  *
457  * @param client receiver of the response
458  * @param code status code
459  * @param msg optional error message (can be NULL)
460  */
461 static void
462 transmit_status (struct GNUNET_SERVICE_Client *client,
463                  int code,
464                  const char *msg)
465 {
466   struct GNUNET_MQ_Envelope *env;
467   struct StatusMessage *sm;
468   size_t slen;
469
470   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
471               "Transmitting `%s' message with value %d and message `%s'\n",
472               "STATUS", code, msg != NULL ? msg : "(none)");
473   slen = (msg == NULL) ? 0 : strlen (msg) + 1;
474   env = GNUNET_MQ_msg_extra (sm,
475                              slen,
476                              GNUNET_MESSAGE_TYPE_DATASTORE_STATUS);
477   sm->status = htonl (code);
478   sm->min_expiration = GNUNET_TIME_absolute_hton (min_expiration);
479   GNUNET_memcpy (&sm[1],
480                  msg,
481                  slen);
482   GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (client),
483                   env);
484 }
485
486
487 /**
488  * Function that will transmit the given datastore entry
489  * to the client.
490  *
491  * @param cls closure, pointer to the client (of type `struct GNUNET_SERVICE_Client`).
492  * @param key key for the content
493  * @param size number of bytes in data
494  * @param data content stored
495  * @param type type of the content
496  * @param priority priority of the content
497  * @param anonymity anonymity-level for the content
498  * @param expiration expiration time for the content
499  * @param uid unique identifier for the datum;
500  *        maybe 0 if no unique identifier is available
501  * @return #GNUNET_SYSERR to abort the iteration, #GNUNET_OK to continue,
502  *         #GNUNET_NO to delete the item and continue (if supported)
503  */
504 static int
505 transmit_item (void *cls,
506                const struct GNUNET_HashCode *key,
507                uint32_t size,
508                const void *data,
509                enum GNUNET_BLOCK_Type type,
510                uint32_t priority,
511                uint32_t anonymity,
512                struct GNUNET_TIME_Absolute expiration,
513                uint64_t uid)
514 {
515   struct GNUNET_SERVICE_Client *client = cls;
516   struct GNUNET_MQ_Envelope *env;
517   struct GNUNET_MessageHeader *end;
518   struct DataMessage *dm;
519
520   if (NULL == key)
521   {
522     /* transmit 'DATA_END' */
523     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
524                 "Transmitting DATA_END message\n");
525     env = GNUNET_MQ_msg (end,
526                          GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END);
527     GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (client),
528                     env);
529     return GNUNET_OK;
530   }
531   GNUNET_assert (sizeof (struct DataMessage) + size <
532                  GNUNET_SERVER_MAX_MESSAGE_SIZE);
533   env = GNUNET_MQ_msg_extra (dm,
534                              size,
535                              GNUNET_MESSAGE_TYPE_DATASTORE_DATA);
536   dm->rid = htonl (0);
537   dm->size = htonl (size);
538   dm->type = htonl (type);
539   dm->priority = htonl (priority);
540   dm->anonymity = htonl (anonymity);
541   dm->replication = htonl (0);
542   dm->reserved = htonl (0);
543   dm->expiration = GNUNET_TIME_absolute_hton (expiration);
544   dm->uid = GNUNET_htonll (uid);
545   dm->key = *key;
546   GNUNET_memcpy (&dm[1],
547                  data,
548                  size);
549   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
550               "Transmitting DATA message for `%s' of type %u with expiration %s (in: %s)\n",
551               GNUNET_h2s (key),
552               type,
553               GNUNET_STRINGS_absolute_time_to_string (expiration),
554               GNUNET_STRINGS_relative_time_to_string (GNUNET_TIME_absolute_get_remaining (expiration),
555                                                       GNUNET_YES));
556   GNUNET_STATISTICS_update (stats,
557                             gettext_noop ("# results found"),
558                             1,
559                             GNUNET_NO);
560   GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (client),
561                   env);
562   return GNUNET_OK;
563 }
564
565
566 /**
567  * Handle RESERVE-message.
568  *
569  * @param cls identification of the client
570  * @param message the actual message
571  */
572 static void
573 handle_reserve (void *cls,
574                 const struct ReserveMessage *msg)
575 {
576   /**
577    * Static counter to produce reservation identifiers.
578    */
579   static int reservation_gen;
580   struct GNUNET_SERVICE_Client *client = cls;
581   struct ReservationList *e;
582   unsigned long long used;
583   unsigned long long req;
584   uint64_t amount;
585   uint32_t entries;
586
587   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
588               "Processing RESERVE request\n");
589   amount = GNUNET_ntohll (msg->amount);
590   entries = ntohl (msg->entries);
591   used = payload + reserved;
592   req = amount + ((unsigned long long) GNUNET_DATASTORE_ENTRY_OVERHEAD) * entries;
593   if (used + req > quota)
594   {
595     if (quota < used)
596       used = quota;             /* cheat a bit for error message (to avoid negative numbers) */
597     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
598                 _("Insufficient space (%llu bytes are available) to satisfy RESERVE request for %llu bytes\n"),
599                 quota - used,
600                 req);
601     if (cache_size < req)
602     {
603       /* TODO: document this in the FAQ; essentially, if this
604        * message happens, the insertion request could be blocked
605        * by less-important content from migration because it is
606        * larger than 1/8th of the overall available space, and
607        * we only reserve 1/8th for "fresh" insertions */
608       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
609                   _("The requested amount (%llu bytes) is larger than the cache size (%llu bytes)\n"),
610                   req,
611                   cache_size);
612       transmit_status (client,
613                        0,
614                        gettext_noop
615                        ("Insufficient space to satisfy request and "
616                         "requested amount is larger than cache size"));
617     }
618     else
619     {
620       transmit_status (client,
621                        0,
622                        gettext_noop ("Insufficient space to satisfy request"));
623     }
624     GNUNET_SERVICE_client_continue (client);
625     return;
626   }
627   reserved += req;
628   GNUNET_STATISTICS_set (stats,
629                          gettext_noop ("# reserved"),
630                          reserved,
631                          GNUNET_NO);
632   e = GNUNET_new (struct ReservationList);
633   e->next = reservations;
634   reservations = e;
635   e->client = client;
636   e->amount = amount;
637   e->entries = entries;
638   e->rid = ++reservation_gen;
639   if (reservation_gen < 0)
640     reservation_gen = 0;        /* wrap around */
641   transmit_status (client,
642                    e->rid,
643                    NULL);
644   GNUNET_SERVICE_client_continue (client);
645 }
646
647
648 /**
649  * Handle RELEASE_RESERVE-message.
650  *
651  * @param cls identification of the client
652  * @param message the actual message
653  */
654 static void
655 handle_release_reserve (void *cls,
656                         const struct ReleaseReserveMessage *msg)
657 {
658   struct GNUNET_SERVICE_Client *client = cls;
659   struct ReservationList *pos;
660   struct ReservationList *prev;
661   struct ReservationList *next;
662   int rid = ntohl (msg->rid);
663   unsigned long long rem;
664
665   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
666               "Processing RELEASE_RESERVE request\n");
667   next = reservations;
668   prev = NULL;
669   while (NULL != (pos = next))
670   {
671     next = pos->next;
672     if (rid == pos->rid)
673     {
674       if (prev == NULL)
675         reservations = next;
676       else
677         prev->next = next;
678       rem =
679           pos->amount +
680           ((unsigned long long) GNUNET_DATASTORE_ENTRY_OVERHEAD) * pos->entries;
681       GNUNET_assert (reserved >= rem);
682       reserved -= rem;
683       GNUNET_STATISTICS_set (stats,
684                              gettext_noop ("# reserved"),
685                              reserved,
686                              GNUNET_NO);
687       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
688                   "Returning %llu remaining reserved bytes to storage pool\n",
689                   rem);
690       GNUNET_free (pos);
691       transmit_status (client,
692                        GNUNET_OK,
693                        NULL);
694       GNUNET_SERVICE_client_continue (client);
695       return;
696     }
697     prev = pos;
698   }
699   GNUNET_break (0);
700   transmit_status (client,
701                    GNUNET_SYSERR,
702                    gettext_noop ("Could not find matching reservation"));
703   GNUNET_SERVICE_client_continue (client);
704 }
705
706
707 /**
708  * Check that the given message is a valid data message.
709  *
710  * @param dm message to check
711  * @return #GNUNET_SYSERR is not well-formed, otherwise #GNUNET_OK
712  */
713 static int
714 check_data (const struct DataMessage *dm)
715 {
716   uint16_t size;
717   uint32_t dsize;
718
719   size = ntohs (dm->header.size);
720   dsize = ntohl (dm->size);
721   if (size != dsize + sizeof (struct DataMessage))
722   {
723     GNUNET_break (0);
724     return GNUNET_SYSERR;
725   }
726   return GNUNET_OK;
727 }
728
729
730 /**
731  * Context for a PUT request used to see if the content is
732  * already present.
733  */
734 struct PutContext
735 {
736   /**
737    * Client to notify on completion.
738    */
739   struct GNUNET_SERVICE_Client *client;
740
741 #if ! HAVE_UNALIGNED_64_ACCESS
742   void *reserved;
743 #endif
744
745   /* followed by the 'struct DataMessage' */
746 };
747
748
749 /**
750  * Put continuation.
751  *
752  * @param cls closure
753  * @param key key for the item stored
754  * @param size size of the item stored
755  * @param status #GNUNET_OK or #GNUNET_SYSERROR
756  * @param msg error message on error
757  */
758 static void
759 put_continuation (void *cls,
760                   const struct GNUNET_HashCode *key,
761                   uint32_t size,
762                   int status,
763                   const char *msg)
764 {
765   struct PutContext *pc = cls;
766
767   if (GNUNET_OK == status)
768   {
769     GNUNET_STATISTICS_update (stats,
770                               gettext_noop ("# bytes stored"),
771                               size,
772                               GNUNET_YES);
773     GNUNET_CONTAINER_bloomfilter_add (filter,
774                                       key);
775     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
776                 "Successfully stored %u bytes under key `%s'\n",
777                 size,
778                 GNUNET_h2s (key));
779   }
780   transmit_status (pc->client,
781                    status,
782                    msg);
783   GNUNET_free (pc);
784   if (quota - reserved - cache_size < payload)
785   {
786     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
787                 _("Need %llu bytes more space (%llu allowed, using %llu)\n"),
788                 (unsigned long long) size + GNUNET_DATASTORE_ENTRY_OVERHEAD,
789                 (unsigned long long) (quota - reserved - cache_size),
790                 (unsigned long long) payload);
791     manage_space (size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
792   }
793 }
794
795
796 /**
797  * Actually put the data message.
798  *
799  * @param pc put context
800  */
801 static void
802 execute_put (struct PutContext *pc)
803 {
804   const struct DataMessage *dm;
805
806   dm = (const struct DataMessage *) &pc[1];
807   plugin->api->put (plugin->api->cls,
808                     &dm->key,
809                     ntohl (dm->size),
810                     &dm[1],
811                     ntohl (dm->type),
812                     ntohl (dm->priority),
813                     ntohl (dm->anonymity),
814                     ntohl (dm->replication),
815                     GNUNET_TIME_absolute_ntoh (dm->expiration),
816                     &put_continuation,
817                     pc);
818 }
819
820
821 /**
822  *
823  * @param cls closure
824  * @param status #GNUNET_OK or #GNUNET_SYSERR
825  * @param msg error message on error
826  */
827 static void
828 check_present_continuation (void *cls,
829                             int status,
830                             const char *msg)
831 {
832   struct GNUNET_SERVICE_Client *client = cls;
833
834   transmit_status (client,
835                    GNUNET_NO,
836                    NULL);
837 }
838
839
840 /**
841  * Function that will check if the given datastore entry
842  * matches the put and if none match executes the put.
843  *
844  * @param cls closure, pointer to the client (of type `struct PutContext`).
845  * @param key key for the content
846  * @param size number of bytes in data
847  * @param data content stored
848  * @param type type of the content
849  * @param priority priority of the content
850  * @param anonymity anonymity-level for the content
851  * @param expiration expiration time for the content
852  * @param uid unique identifier for the datum;
853  *        maybe 0 if no unique identifier is available
854  * @return #GNUNET_OK usually
855  *         #GNUNET_NO to delete the item
856  */
857 static int
858 check_present (void *cls,
859                const struct GNUNET_HashCode *key,
860                uint32_t size,
861                const void *data,
862                enum GNUNET_BLOCK_Type type,
863                uint32_t priority,
864                uint32_t anonymity,
865                struct GNUNET_TIME_Absolute expiration,
866                uint64_t uid)
867 {
868   struct PutContext *pc = cls;
869   const struct DataMessage *dm;
870
871   dm = (const struct DataMessage *) &pc[1];
872   if (key == NULL)
873   {
874     execute_put (pc);
875     return GNUNET_OK;
876   }
877   if ( (GNUNET_BLOCK_TYPE_FS_DBLOCK == type) ||
878        (GNUNET_BLOCK_TYPE_FS_IBLOCK == type) ||
879        ( (size == ntohl (dm->size)) &&
880          (0 == memcmp (&dm[1],
881                        data,
882                        size)) ) )
883   {
884     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
885                 "Result already present in datastore\n");
886     /* FIXME: change API to allow increasing 'replication' counter */
887     if ((ntohl (dm->priority) > 0) ||
888         (GNUNET_TIME_absolute_ntoh (dm->expiration).abs_value_us >
889          expiration.abs_value_us))
890       plugin->api->update (plugin->api->cls,
891                            uid,
892                            (int32_t) ntohl (dm->priority),
893                            GNUNET_TIME_absolute_ntoh (dm->expiration),
894                            &check_present_continuation,
895                            pc->client);
896     else
897     {
898       transmit_status (pc->client,
899                        GNUNET_NO,
900                        NULL);
901     }
902     GNUNET_free (pc);
903   }
904   else
905   {
906     execute_put (pc);
907   }
908   return GNUNET_OK;
909 }
910
911
912 /**
913  * Verify PUT-message.
914  *
915  * @param cls identification of the client
916  * @param message the actual message
917  * @return #GNUNET_OK if @a dm is well-formed
918  */
919 static int
920 check_put (void *cls,
921            const struct DataMessage *dm)
922 {
923   if (GNUNET_OK != check_data (dm))
924   {
925     GNUNET_break (0);
926     return GNUNET_SYSERR;
927   }
928   return GNUNET_OK;
929 }
930
931
932 /**
933  * Handle PUT-message.
934  *
935  * @param cls identification of the client
936  * @param message the actual message
937  */
938 static void
939 handle_put (void *cls,
940             const struct DataMessage *dm)
941 {
942   struct GNUNET_SERVICE_Client *client = cls;
943   int rid;
944   struct ReservationList *pos;
945   struct PutContext *pc;
946   struct GNUNET_HashCode vhash;
947   uint32_t size;
948
949   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
950               "Processing PUT request for `%s' of type %u\n",
951               GNUNET_h2s (&dm->key),
952               ntohl (dm->type));
953   rid = ntohl (dm->rid);
954   size = ntohl (dm->size);
955   if (rid > 0)
956   {
957     pos = reservations;
958     while ((NULL != pos) && (rid != pos->rid))
959       pos = pos->next;
960     GNUNET_break (pos != NULL);
961     if (NULL != pos)
962     {
963       GNUNET_break (pos->entries > 0);
964       GNUNET_break (pos->amount >= size);
965       pos->entries--;
966       pos->amount -= size;
967       reserved -= (size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
968       GNUNET_STATISTICS_set (stats,
969                              gettext_noop ("# reserved"),
970                              reserved,
971                              GNUNET_NO);
972     }
973   }
974   pc = GNUNET_malloc (sizeof (struct PutContext) + size +
975                       sizeof (struct DataMessage));
976   pc->client = client;
977   GNUNET_memcpy (&pc[1],
978                  dm,
979                  size + sizeof (struct DataMessage));
980   if (GNUNET_YES == GNUNET_CONTAINER_bloomfilter_test (filter,
981                                                        &dm->key))
982   {
983     GNUNET_CRYPTO_hash (&dm[1],
984                         size,
985                         &vhash);
986     plugin->api->get_key (plugin->api->cls,
987                           0,
988                           &dm->key,
989                           &vhash,
990                           ntohl (dm->type),
991                           &check_present,
992                           pc);
993     GNUNET_SERVICE_client_continue (client);
994     return;
995   }
996   execute_put (pc);
997   GNUNET_SERVICE_client_continue (client);
998 }
999
1000
1001 /**
1002  * Handle #GNUNET_MESSAGE_TYPE_DATASTORE_GET-message.
1003  *
1004  * @param cls identification of the client
1005  * @param msg the actual message
1006  */
1007 static void
1008 handle_get (void *cls,
1009             const struct GetMessage *msg)
1010 {
1011   struct GNUNET_SERVICE_Client *client = cls;
1012
1013   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1014               "Processing GET request of type %u\n",
1015               ntohl (msg->type));
1016   GNUNET_STATISTICS_update (stats,
1017                             gettext_noop ("# GET requests received"),
1018                             1,
1019                             GNUNET_NO);
1020   plugin->api->get_key (plugin->api->cls,
1021                         GNUNET_ntohll (msg->offset),
1022                         NULL,
1023                         NULL,
1024                         ntohl (msg->type),
1025                         &transmit_item,
1026                         client);
1027   GNUNET_SERVICE_client_continue (client);
1028 }
1029
1030
1031 /**
1032  * Handle #GNUNET_MESSAGE_TYPE_DATASTORE_GET_KEY-message.
1033  *
1034  * @param cls closure
1035  * @param msg the actual message
1036  */
1037 static void
1038 handle_get_key (void *cls,
1039                 const struct GetKeyMessage *msg)
1040 {
1041   struct GNUNET_SERVICE_Client *client = cls;
1042
1043   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1044               "Processing GET request for `%s' of type %u\n",
1045               GNUNET_h2s (&msg->key),
1046               ntohl (msg->type));
1047   GNUNET_STATISTICS_update (stats,
1048                             gettext_noop ("# GET KEY requests received"),
1049                             1,
1050                             GNUNET_NO);
1051   if (GNUNET_YES !=
1052       GNUNET_CONTAINER_bloomfilter_test (filter,
1053                                          &msg->key))
1054   {
1055     /* don't bother database... */
1056     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1057                 "Empty result set for GET request for `%s' (bloomfilter).\n",
1058                 GNUNET_h2s (&msg->key));
1059     GNUNET_STATISTICS_update (stats,
1060                               gettext_noop
1061                               ("# requests filtered by bloomfilter"),
1062                               1,
1063                               GNUNET_NO);
1064     transmit_item (client,
1065                    NULL, 0, NULL, 0, 0, 0,
1066                    GNUNET_TIME_UNIT_ZERO_ABS,
1067                    0);
1068     GNUNET_SERVICE_client_continue (client);
1069     return;
1070   }
1071   plugin->api->get_key (plugin->api->cls,
1072                         GNUNET_ntohll (msg->offset),
1073                         &msg->key,
1074                         NULL,
1075                         ntohl (msg->type),
1076                         &transmit_item,
1077                         client);
1078   GNUNET_SERVICE_client_continue (client);
1079 }
1080
1081
1082 /**
1083  * Function called with the result of an update operation.
1084  *
1085  * @param cls closure
1086  * @param status #GNUNET_OK or #GNUNET_SYSERR
1087  * @param msg error message on error
1088  */
1089 static void
1090 update_continuation (void *cls,
1091                      int status,
1092                      const char *msg)
1093 {
1094   struct GNUNET_SERVICE_Client *client = cls;
1095
1096   transmit_status (client,
1097                    status,
1098                    msg);
1099 }
1100
1101
1102 /**
1103  * Handle UPDATE-message.
1104  *
1105  * @param cls client identification of the client
1106  * @param message the actual message
1107  */
1108 static void
1109 handle_update (void *cls,
1110                const struct UpdateMessage *msg)
1111 {
1112   struct GNUNET_SERVICE_Client *client = cls;
1113
1114   GNUNET_STATISTICS_update (stats,
1115                             gettext_noop ("# UPDATE requests received"),
1116                             1,
1117                             GNUNET_NO);
1118   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1119               "Processing UPDATE request for %llu\n",
1120               (unsigned long long) GNUNET_ntohll (msg->uid));
1121   plugin->api->update (plugin->api->cls,
1122                        GNUNET_ntohll (msg->uid),
1123                        (int32_t) ntohl (msg->priority),
1124                        GNUNET_TIME_absolute_ntoh (msg->expiration),
1125                        &update_continuation,
1126                        client);
1127   GNUNET_SERVICE_client_continue (client);
1128 }
1129
1130
1131 /**
1132  * Handle GET_REPLICATION-message.
1133  *
1134  * @param cls identification of the client
1135  * @param message the actual message
1136  */
1137 static void
1138 handle_get_replication (void *cls,
1139                         const struct GNUNET_MessageHeader *message)
1140 {
1141   struct GNUNET_SERVICE_Client *client = cls;
1142
1143   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1144               "Processing GET_REPLICATION request\n");
1145   GNUNET_STATISTICS_update (stats,
1146                             gettext_noop ("# GET REPLICATION requests received"),
1147                             1,
1148                             GNUNET_NO);
1149   plugin->api->get_replication (plugin->api->cls,
1150                                 &transmit_item,
1151                                 client);
1152   GNUNET_SERVICE_client_continue (client);
1153 }
1154
1155
1156 /**
1157  * Handle GET_ZERO_ANONYMITY-message.
1158  *
1159  * @param cls client identification of the client
1160  * @param message the actual message
1161  */
1162 static void
1163 handle_get_zero_anonymity (void *cls,
1164                            const struct GetZeroAnonymityMessage *msg)
1165 {
1166   struct GNUNET_SERVICE_Client *client = cls;
1167   enum GNUNET_BLOCK_Type type;
1168
1169   type = (enum GNUNET_BLOCK_Type) ntohl (msg->type);
1170   if (type == GNUNET_BLOCK_TYPE_ANY)
1171   {
1172     GNUNET_break (0);
1173     GNUNET_SERVICE_client_drop (client);
1174     return;
1175   }
1176   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1177               "Processing GET_ZERO_ANONYMITY request\n");
1178   GNUNET_STATISTICS_update (stats,
1179                             gettext_noop ("# GET ZERO ANONYMITY requests received"),
1180                             1,
1181                             GNUNET_NO);
1182   plugin->api->get_zero_anonymity (plugin->api->cls,
1183                                    GNUNET_ntohll (msg->offset),
1184                                    type,
1185                                    &transmit_item,
1186                                    client);
1187   GNUNET_SERVICE_client_continue (client);
1188 }
1189
1190
1191 /**
1192  * Callback function that will cause the item that is passed
1193  * in to be deleted (by returning #GNUNET_NO).
1194  *
1195  * @param cls closure
1196  * @param key key for the content
1197  * @param size number of bytes in data
1198  * @param data content stored
1199  * @param type type of the content
1200  * @param priority priority of the content
1201  * @param anonymity anonymity-level for the content
1202  * @param expiration expiration time for the content
1203  * @param uid unique identifier for the datum
1204  * @return #GNUNET_OK to keep the item
1205  *         #GNUNET_NO to delete the item
1206  */
1207 static int
1208 remove_callback (void *cls,
1209                  const struct GNUNET_HashCode *key,
1210                  uint32_t size,
1211                  const void *data,
1212                  enum GNUNET_BLOCK_Type type,
1213                  uint32_t priority,
1214                  uint32_t anonymity,
1215                  struct GNUNET_TIME_Absolute expiration,
1216                  uint64_t uid)
1217 {
1218   struct GNUNET_SERVICE_Client *client = cls;
1219
1220   if (NULL == key)
1221   {
1222     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1223                 "No further matches for REMOVE request.\n");
1224     transmit_status (client,
1225                      GNUNET_NO,
1226                      _("Content not found"));
1227     return GNUNET_OK;           /* last item */
1228   }
1229   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1230               "Item %llu matches REMOVE request for key `%s' and type %u.\n",
1231               (unsigned long long) uid,
1232               GNUNET_h2s (key),
1233               type);
1234   GNUNET_STATISTICS_update (stats,
1235                             gettext_noop ("# bytes removed (explicit request)"),
1236                             size,
1237                             GNUNET_YES);
1238   GNUNET_CONTAINER_bloomfilter_remove (filter,
1239                                        key);
1240   transmit_status (client,
1241                    GNUNET_OK,
1242                    NULL);
1243   return GNUNET_NO;
1244 }
1245
1246
1247 /**
1248  * Verify REMOVE-message.
1249  *
1250  * @param cls identification of the client
1251  * @param message the actual message
1252  * @return #GNUNET_OK if @a dm is well-formed
1253  */
1254 static int
1255 check_remove (void *cls,
1256               const struct DataMessage *dm)
1257 {
1258   if (GNUNET_OK != check_data (dm))
1259   {
1260     GNUNET_break (0);
1261     return GNUNET_SYSERR;
1262   }
1263   return GNUNET_OK;
1264 }
1265
1266
1267 /**
1268  * Handle REMOVE-message.
1269  *
1270  * @param cls closure
1271  * @param client identification of the client
1272  * @param message the actual message
1273  */
1274 static void
1275 handle_remove (void *cls,
1276                const struct DataMessage *dm)
1277 {
1278   struct GNUNET_SERVICE_Client *client = cls;
1279   struct GNUNET_HashCode vhash;
1280
1281   GNUNET_STATISTICS_update (stats,
1282                             gettext_noop ("# REMOVE requests received"),
1283                             1, GNUNET_NO);
1284   GNUNET_CRYPTO_hash (&dm[1],
1285                       ntohl (dm->size),
1286                       &vhash);
1287   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1288               "Processing REMOVE request for `%s' of type %u\n",
1289               GNUNET_h2s (&dm->key),
1290               ntohl (dm->type));
1291   plugin->api->get_key (plugin->api->cls,
1292                         0,
1293                         &dm->key,
1294                         &vhash,
1295                         (enum GNUNET_BLOCK_Type) ntohl (dm->type),
1296                         &remove_callback,
1297                         client);
1298   GNUNET_SERVICE_client_continue (client);
1299 }
1300
1301
1302 /**
1303  * Handle DROP-message.
1304  *
1305  * @param cls identification of the client
1306  * @param message the actual message
1307  */
1308 static void
1309 handle_drop (void *cls,
1310              const struct GNUNET_MessageHeader *message)
1311 {
1312   struct GNUNET_SERVICE_Client *client = cls;
1313
1314   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1315               "Processing DROP request\n");
1316   do_drop = GNUNET_YES;
1317   GNUNET_SERVICE_client_continue (client);
1318 }
1319
1320
1321 /**
1322  * Function called by plugins to notify us about a
1323  * change in their disk utilization.
1324  *
1325  * @param cls closure (NULL)
1326  * @param delta change in disk utilization,
1327  *        0 for "reset to empty"
1328  */
1329 static void
1330 disk_utilization_change_cb (void *cls,
1331                             int delta)
1332 {
1333   if ((delta < 0) && (payload < -delta))
1334   {
1335     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1336                 _("Datastore payload must have been inaccurate (%lld < %lld). Recomputing it.\n"),
1337                 (long long) payload,
1338                 (long long) -delta);
1339     plugin->api->estimate_size (plugin->api->cls,
1340                                 &payload);
1341     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1342                 _("New payload: %lld\n"),
1343                 (long long) payload);
1344      sync_stats ();
1345     return;
1346   }
1347   payload += delta;
1348   last_sync++;
1349   if (last_sync >= MAX_STAT_SYNC_LAG)
1350     sync_stats ();
1351 }
1352
1353
1354 /**
1355  * Callback function to process statistic values.
1356  *
1357  * @param cls closure (struct Plugin*)
1358  * @param subsystem name of subsystem that created the statistic
1359  * @param name the name of the datum
1360  * @param value the current value
1361  * @param is_persistent #GNUNET_YES if the value is persistent, #GNUNET_NO if not
1362  * @return #GNUNET_OK to continue, #GNUNET_SYSERR to abort iteration
1363  */
1364 static int
1365 process_stat_in (void *cls,
1366                  const char *subsystem,
1367                  const char *name,
1368                  uint64_t value,
1369                  int is_persistent)
1370 {
1371   GNUNET_assert (GNUNET_NO == stats_worked);
1372   stats_worked = GNUNET_YES;
1373   payload += value;
1374   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1375               "Notification from statistics about existing payload (%llu), new payload is %llu\n",
1376               (unsigned long long) value,
1377               (unsigned long long) payload);
1378   return GNUNET_OK;
1379 }
1380
1381
1382 /**
1383  * Load the datastore plugin.
1384  */
1385 static struct DatastorePlugin *
1386 load_plugin ()
1387 {
1388   struct DatastorePlugin *ret;
1389   char *libname;
1390
1391   ret = GNUNET_new (struct DatastorePlugin);
1392   ret->env.cfg = cfg;
1393   ret->env.duc = &disk_utilization_change_cb;
1394   ret->env.cls = NULL;
1395   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1396               _("Loading `%s' datastore plugin\n"),
1397               plugin_name);
1398   GNUNET_asprintf (&libname,
1399                    "libgnunet_plugin_datastore_%s",
1400                    plugin_name);
1401   ret->short_name = GNUNET_strdup (plugin_name);
1402   ret->lib_name = libname;
1403   ret->api = GNUNET_PLUGIN_load (libname,
1404                                  &ret->env);
1405   if (NULL == ret->api)
1406   {
1407     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1408                 _("Failed to load datastore plugin for `%s'\n"),
1409                 plugin_name);
1410     GNUNET_free (ret->short_name);
1411     GNUNET_free (libname);
1412     GNUNET_free (ret);
1413     return NULL;
1414   }
1415   return ret;
1416 }
1417
1418
1419 /**
1420  * Function called when the service shuts
1421  * down.  Unloads our datastore plugin.
1422  *
1423  * @param plug plugin to unload
1424  */
1425 static void
1426 unload_plugin (struct DatastorePlugin *plug)
1427 {
1428   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1429               "Datastore service is unloading plugin...\n");
1430   GNUNET_break (NULL == GNUNET_PLUGIN_unload (plug->lib_name, plug->api));
1431   GNUNET_free (plug->lib_name);
1432   GNUNET_free (plug->short_name);
1433   GNUNET_free (plug);
1434 }
1435
1436
1437 /**
1438  * Initialization complete, start operating the service.
1439  */
1440 static void
1441 begin_service ()
1442 {
1443   GNUNET_SERVICE_resume (service);
1444   expired_kill_task
1445     = GNUNET_SCHEDULER_add_with_priority (GNUNET_SCHEDULER_PRIORITY_IDLE,
1446                                           &delete_expired,
1447                                           NULL);
1448 }
1449
1450
1451 /**
1452  * Adds a given @a key to the bloomfilter in @a cls @a count times.
1453  *
1454  * @param cls the bloomfilter
1455  * @param key key to add
1456  * @param count number of times to add key
1457  */
1458 static void
1459 add_key_to_bloomfilter (void *cls,
1460                         const struct GNUNET_HashCode *key,
1461                         unsigned int count)
1462 {
1463   struct GNUNET_CONTAINER_BloomFilter *bf = cls;
1464
1465   if (NULL == key)
1466   {
1467     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1468                 _("Bloomfilter construction complete.\n"));
1469     begin_service ();
1470     return;
1471   }
1472
1473   while (0 < count--)
1474     GNUNET_CONTAINER_bloomfilter_add (bf,
1475                                       key);
1476 }
1477
1478
1479 /**
1480  * We finished receiving the statistic.  Initialize the plugin; if
1481  * loading the statistic failed, run the estimator.
1482  *
1483  * @param cls NULL
1484  * @param success #GNUNET_NO if we failed to read the stat
1485  */
1486 static void
1487 process_stat_done (void *cls,
1488                    int success)
1489 {
1490   stat_get = NULL;
1491   if (NULL != stat_timeout_task)
1492   {
1493     GNUNET_SCHEDULER_cancel (stat_timeout_task);
1494     stat_timeout_task = NULL;
1495   }
1496   plugin = load_plugin ();
1497   if (NULL == plugin)
1498   {
1499     GNUNET_CONTAINER_bloomfilter_free (filter);
1500     filter = NULL;
1501     if (NULL != stats)
1502     {
1503       GNUNET_STATISTICS_destroy (stats,
1504                                  GNUNET_YES);
1505       stats = NULL;
1506     }
1507     return;
1508   }
1509
1510   if (GNUNET_NO == stats_worked)
1511   {
1512     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1513                 "Failed to obtain value from statistics service, recomputing it\n");
1514     plugin->api->estimate_size (plugin->api->cls,
1515                                 &payload);
1516     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1517                 _("New payload: %lld\n"),
1518                 (long long) payload);
1519   }
1520
1521   if (GNUNET_YES == refresh_bf)
1522   {
1523     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1524                 _("Rebuilding bloomfilter.  Please be patient.\n"));
1525     if (NULL != plugin->api->get_keys)
1526     {
1527       plugin->api->get_keys (plugin->api->cls,
1528                              &add_key_to_bloomfilter,
1529                              filter);
1530       return;
1531     }
1532     else
1533     {
1534       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1535                   _("Plugin does not support get_keys function. Please fix!\n"));
1536     }
1537   }
1538   begin_service ();
1539 }
1540
1541
1542 /**
1543  * Fetching stats took to long, run without.
1544  *
1545  * @param cls NULL
1546  */
1547 static void
1548 stat_timeout (void *cls)
1549 {
1550   stat_timeout_task = NULL;
1551   GNUNET_STATISTICS_get_cancel (stat_get);
1552   process_stat_done (NULL,
1553                      GNUNET_NO);
1554 }
1555
1556
1557 /**
1558  * Task run during shutdown.
1559  */
1560 static void
1561 cleaning_task (void *cls)
1562 {
1563   cleaning_done = GNUNET_YES;
1564   if (NULL != expired_kill_task)
1565   {
1566     GNUNET_SCHEDULER_cancel (expired_kill_task);
1567     expired_kill_task = NULL;
1568   }
1569   if (GNUNET_YES == do_drop)
1570   {
1571     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1572                 "Dropping database!\n");
1573     plugin->api->drop (plugin->api->cls);
1574     payload = 0;
1575     last_sync++;
1576   }
1577   if (NULL != plugin)
1578   {
1579     unload_plugin (plugin);
1580     plugin = NULL;
1581   }
1582   if (NULL != filter)
1583   {
1584     GNUNET_CONTAINER_bloomfilter_free (filter);
1585     filter = NULL;
1586   }
1587   if (NULL != stat_get)
1588   {
1589     GNUNET_STATISTICS_get_cancel (stat_get);
1590     stat_get = NULL;
1591   }
1592   if (NULL != stat_timeout_task)
1593   {
1594     GNUNET_SCHEDULER_cancel (stat_timeout_task);
1595     stat_timeout_task = NULL;
1596   }
1597   GNUNET_free_non_null (plugin_name);
1598   plugin_name = NULL;
1599   if (last_sync > 0)
1600     sync_stats ();
1601   if (NULL != stats)
1602   {
1603     GNUNET_STATISTICS_destroy (stats,
1604                                GNUNET_YES);
1605     stats = NULL;
1606   }
1607   GNUNET_free (quota_stat_name);
1608   quota_stat_name = NULL;
1609 }
1610
1611
1612 /**
1613  * Add a client to our list of active clients.
1614  *
1615  * @param cls NULL
1616  * @param client client to add
1617  * @param mq message queue for @a client
1618  * @return @a client
1619  */
1620 static void *
1621 client_connect_cb (void *cls,
1622                    struct GNUNET_SERVICE_Client *client,
1623                    struct GNUNET_MQ_Handle *mq)
1624 {
1625   return client;
1626 }
1627
1628
1629 /**
1630  * Called whenever a client is disconnected.
1631  * Frees our resources associated with that client.
1632  *
1633  * @param cls closure
1634  * @param client identification of the client
1635  * @param app_ctx must match @a client
1636  */
1637 static void
1638 client_disconnect_cb (void *cls,
1639                       struct GNUNET_SERVICE_Client *client,
1640                       void *app_ctx)
1641 {
1642   struct ReservationList *pos;
1643   struct ReservationList *prev;
1644   struct ReservationList *next;
1645
1646   GNUNET_assert (app_ctx == client);
1647   prev = NULL;
1648   pos = reservations;
1649   while (NULL != pos)
1650   {
1651     next = pos->next;
1652     if (pos->client == client)
1653     {
1654       if (NULL == prev)
1655         reservations = next;
1656       else
1657         prev->next = next;
1658       reserved -= pos->amount + pos->entries * GNUNET_DATASTORE_ENTRY_OVERHEAD;
1659       GNUNET_free (pos);
1660     }
1661     else
1662     {
1663       prev = pos;
1664     }
1665     pos = next;
1666   }
1667   GNUNET_STATISTICS_set (stats,
1668                          gettext_noop ("# reserved"),
1669                          reserved,
1670                          GNUNET_NO);
1671
1672 }
1673
1674
1675 /**
1676  * Process datastore requests.
1677  *
1678  * @param cls closure
1679  * @param serv the initialized service
1680  * @param c configuration to use
1681  */
1682 static void
1683 run (void *cls,
1684      const struct GNUNET_CONFIGURATION_Handle *c,
1685      struct GNUNET_SERVICE_Handle *serv)
1686 {
1687   char *fn;
1688   char *pfn;
1689   unsigned int bf_size;
1690
1691   service = serv;
1692   cfg = c;
1693   if (GNUNET_OK !=
1694       GNUNET_CONFIGURATION_get_value_string (cfg,
1695                                              "DATASTORE",
1696                                              "DATABASE",
1697                                              &plugin_name))
1698   {
1699     GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
1700                                "DATABASE",
1701                                "DATASTORE");
1702     return;
1703   }
1704   GNUNET_asprintf (&quota_stat_name,
1705                    _("# bytes used in file-sharing datastore `%s'"),
1706                    plugin_name);
1707   if (GNUNET_OK !=
1708       GNUNET_CONFIGURATION_get_value_size (cfg,
1709                                            "DATASTORE",
1710                                            "QUOTA",
1711                                            &quota))
1712   {
1713     GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
1714                                "QUOTA",
1715                                "DATASTORE");
1716     return;
1717   }
1718   stats = GNUNET_STATISTICS_create ("datastore",
1719                                     cfg);
1720   GNUNET_STATISTICS_set (stats,
1721                          gettext_noop ("# quota"),
1722                          quota,
1723                          GNUNET_NO);
1724   cache_size = quota / 8;       /* Or should we make this an option? */
1725   GNUNET_STATISTICS_set (stats,
1726                          gettext_noop ("# cache size"),
1727                          cache_size,
1728                          GNUNET_NO);
1729   if (quota / (32 * 1024LL) > MAX_BF_SIZE)
1730     bf_size = MAX_BF_SIZE;
1731   else
1732     bf_size = quota / (32 * 1024LL);         /* 8 bit per entry, 1 bit per 32 kb in DB */
1733   fn = NULL;
1734   if ((GNUNET_OK !=
1735        GNUNET_CONFIGURATION_get_value_filename (cfg,
1736                                                 "DATASTORE",
1737                                                 "BLOOMFILTER",
1738                                                 &fn)) ||
1739       (GNUNET_OK != GNUNET_DISK_directory_create_for_file (fn)))
1740   {
1741     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1742                 _("Could not use specified filename `%s' for bloomfilter.\n"),
1743                 NULL != fn ? fn : "");
1744     GNUNET_free_non_null (fn);
1745     fn = NULL;
1746   }
1747   if (NULL != fn)
1748   {
1749     GNUNET_asprintf (&pfn, "%s.%s", fn, plugin_name);
1750     if (GNUNET_YES == GNUNET_DISK_file_test (pfn))
1751     {
1752       filter = GNUNET_CONTAINER_bloomfilter_load (pfn, bf_size, 5);        /* approx. 3% false positives at max use */
1753       if (NULL == filter)
1754       {
1755         /* file exists but not valid, remove and try again, but refresh */
1756         if (0 != UNLINK (pfn))
1757         {
1758           /* failed to remove, run without file */
1759           GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1760                       _("Failed to remove bogus bloomfilter file `%s'\n"),
1761                       pfn);
1762           GNUNET_free (pfn);
1763           pfn = NULL;
1764           filter = GNUNET_CONTAINER_bloomfilter_load (NULL, bf_size, 5);        /* approx. 3% false positives at max use */
1765           refresh_bf = GNUNET_YES;
1766         }
1767         else
1768         {
1769           /* try again after remove */
1770           filter = GNUNET_CONTAINER_bloomfilter_load (pfn, bf_size, 5);        /* approx. 3% false positives at max use */
1771           refresh_bf = GNUNET_YES;
1772           if (NULL == filter)
1773           {
1774             /* failed yet again, give up on using file */
1775             GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1776                         _("Failed to remove bogus bloomfilter file `%s'\n"),
1777                         pfn);
1778             GNUNET_free (pfn);
1779             pfn = NULL;
1780             filter = GNUNET_CONTAINER_bloomfilter_init (NULL, bf_size, 5);        /* approx. 3% false positives at max use */
1781           }
1782         }
1783       }
1784       else
1785       {
1786         /* normal case: have an existing valid bf file, no need to refresh */
1787         refresh_bf = GNUNET_NO;
1788       }
1789     }
1790     else
1791     {
1792       filter = GNUNET_CONTAINER_bloomfilter_load (pfn, bf_size, 5);        /* approx. 3% false positives at max use */
1793       refresh_bf = GNUNET_YES;
1794     }
1795     GNUNET_free (pfn);
1796   }
1797   else
1798   {
1799     filter = GNUNET_CONTAINER_bloomfilter_init (NULL,
1800                                                 bf_size,
1801                                                 5);      /* approx. 3% false positives at max use */
1802     refresh_bf = GNUNET_YES;
1803   }
1804   GNUNET_free_non_null (fn);
1805   if (NULL == filter)
1806   {
1807     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1808                 _("Failed to initialize bloomfilter.\n"));
1809     if (NULL != stats)
1810     {
1811       GNUNET_STATISTICS_destroy (stats,
1812                                  GNUNET_YES);
1813       stats = NULL;
1814     }
1815     return;
1816   }
1817   GNUNET_SERVICE_suspend (service);
1818   stat_get =
1819       GNUNET_STATISTICS_get (stats,
1820                              "datastore",
1821                              quota_stat_name,
1822                              &process_stat_done,
1823                              &process_stat_in,
1824                              NULL);
1825   if (NULL == stat_get)
1826     process_stat_done (NULL,
1827                        GNUNET_SYSERR);
1828   else
1829     stat_timeout_task
1830       = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS,
1831                                       &stat_timeout,
1832                                       NULL);
1833   GNUNET_SCHEDULER_add_shutdown (&cleaning_task,
1834                                  NULL);
1835 }
1836
1837
1838 /**
1839  * Define "main" method using service macro.
1840  */
1841 GNUNET_SERVICE_MAIN
1842 ("datastore",
1843  GNUNET_SERVICE_OPTION_NONE,
1844  &run,
1845  &client_connect_cb,
1846  &client_disconnect_cb,
1847  NULL,
1848  GNUNET_MQ_hd_fixed_size (reserve,
1849                           GNUNET_MESSAGE_TYPE_DATASTORE_RESERVE,
1850                           struct ReserveMessage,
1851                           NULL),
1852  GNUNET_MQ_hd_fixed_size (release_reserve,
1853                           GNUNET_MESSAGE_TYPE_DATASTORE_RELEASE_RESERVE,
1854                           struct ReleaseReserveMessage,
1855                           NULL),
1856  GNUNET_MQ_hd_var_size (put,
1857                         GNUNET_MESSAGE_TYPE_DATASTORE_PUT,
1858                         struct DataMessage,
1859                         NULL),
1860  GNUNET_MQ_hd_fixed_size (update,
1861                           GNUNET_MESSAGE_TYPE_DATASTORE_UPDATE,
1862                           struct UpdateMessage,
1863                           NULL),
1864  GNUNET_MQ_hd_fixed_size (get,
1865                           GNUNET_MESSAGE_TYPE_DATASTORE_GET,
1866                           struct GetMessage,
1867                           NULL),
1868  GNUNET_MQ_hd_fixed_size (get_key,
1869                           GNUNET_MESSAGE_TYPE_DATASTORE_GET_KEY,
1870                           struct GetKeyMessage,
1871                           NULL),
1872  GNUNET_MQ_hd_fixed_size (get_replication,
1873                           GNUNET_MESSAGE_TYPE_DATASTORE_GET_REPLICATION,
1874                           struct GNUNET_MessageHeader,
1875                           NULL),
1876  GNUNET_MQ_hd_fixed_size (get_zero_anonymity,
1877                           GNUNET_MESSAGE_TYPE_DATASTORE_GET_ZERO_ANONYMITY,
1878                           struct GetZeroAnonymityMessage,
1879                           NULL),
1880  GNUNET_MQ_hd_var_size (remove,
1881                         GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE,
1882                         struct DataMessage,
1883                         NULL),
1884  GNUNET_MQ_hd_fixed_size (drop,
1885                           GNUNET_MESSAGE_TYPE_DATASTORE_DROP,
1886                           struct GNUNET_MessageHeader,
1887                           NULL),
1888  GNUNET_MQ_handler_end ());
1889
1890
1891 /* end of gnunet-service-datastore.c */