2 This file is part of GNUnet
3 (C) 2004, 2005, 2006, 2007, 2009 Christian Grothoff (and other contributing authors)
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 2, or (at your
8 option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA.
22 * @file datastore/gnunet-service-datastore.c
23 * @brief Management for the datastore for files stored on a GNUnet node
24 * @author Christian Grothoff
28 #include "gnunet_util_lib.h"
29 #include "gnunet_protocols.h"
30 #include "gnunet_statistics_service.h"
31 #include "gnunet_datastore_plugin.h"
32 #include "datastore.h"
35 * How many messages do we queue at most per client?
37 #define MAX_PENDING 1024
40 * How long are we at most keeping "expired" content
41 * past the expiration date in the database?
43 #define MAX_EXPIRE_DELAY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 15)
45 #define QUOTA_STAT_NAME gettext_noop ("# bytes used in file-sharing datastore")
48 * After how many payload-changing operations
49 * do we sync our statistics?
51 #define MAX_STAT_SYNC_LAG 50
55 * Our datastore plugin.
57 struct DatastorePlugin
61 * API of the transport as returned by the plugin's
62 * initialization function.
64 struct GNUNET_DATASTORE_PluginFunctions *api;
67 * Short name for the plugin (i.e. "sqlite").
72 * Name of the library (i.e. "gnunet_plugin_datastore_sqlite").
77 * Environment this transport service is using
80 struct GNUNET_DATASTORE_PluginEnvironment env;
86 * Linked list of active reservations.
88 struct ReservationList
92 * This is a linked list.
94 struct ReservationList *next;
97 * Client that made the reservation.
99 struct GNUNET_SERVER_Client *client;
102 * Number of bytes (still) reserved.
107 * Number of items (still) reserved.
112 * Reservation identifier.
121 * Our datastore plugin (NULL if not available).
123 static struct DatastorePlugin *plugin;
126 * Linked list of space reservations made by clients.
128 static struct ReservationList *reservations;
131 * Bloomfilter to quickly tell if we don't have the content.
133 static struct GNUNET_CONTAINER_BloomFilter *filter;
136 * How much space are we allowed to use?
138 static unsigned long long quota;
141 * How much space are we using for the cache? (space available for
142 * insertions that will be instantly reclaimed by discarding less
143 * important content --- or possibly whatever we just inserted into
146 static unsigned long long cache_size;
149 * How much space have we currently reserved?
151 static unsigned long long reserved;
154 * How much data are we currently storing
157 static unsigned long long payload;
160 * Number of updates that were made to the
161 * payload value since we last synchronized
162 * it with the statistics service.
164 static unsigned int lastSync;
167 * Did we get an answer from statistics?
169 static int stats_worked;
172 * Identity of the task that is used to delete
175 static GNUNET_SCHEDULER_TaskIdentifier expired_kill_task;
180 const struct GNUNET_CONFIGURATION_Handle *cfg;
184 * Handle for reporting statistics.
186 static struct GNUNET_STATISTICS_Handle *stats;
190 * Synchronize our utilization statistics with the
191 * statistics service.
196 GNUNET_STATISTICS_set (stats,
207 * Function called once the transmit operation has
208 * either failed or succeeded.
211 * @param status GNUNET_OK on success, GNUNET_SYSERR on error
213 typedef void (*TransmitContinuation)(void *cls,
218 * Context for transmitting replies to clients.
220 struct TransmitCallbackContext
224 * We keep these in a doubly-linked list (for cleanup).
226 struct TransmitCallbackContext *next;
229 * We keep these in a doubly-linked list (for cleanup).
231 struct TransmitCallbackContext *prev;
234 * The message that we're asked to transmit.
236 struct GNUNET_MessageHeader *msg;
239 * Handle for the transmission request.
241 struct GNUNET_CONNECTION_TransmitHandle *th;
244 * Client that we are transmitting to.
246 struct GNUNET_SERVER_Client *client;
249 * Function to call once msg has been transmitted
250 * (or at least added to the buffer).
252 TransmitContinuation tc;
260 * GNUNET_YES if we are supposed to signal the server
261 * completion of the client's request.
268 * Head of the doubly-linked list (for cleanup).
270 static struct TransmitCallbackContext *tcc_head;
273 * Tail of the doubly-linked list (for cleanup).
275 static struct TransmitCallbackContext *tcc_tail;
278 * Have we already cleaned up the TCCs and are hence no longer
279 * willing (or able) to transmit anything to anyone?
281 static int cleaning_done;
284 * Handle for pending get request.
286 static struct GNUNET_STATISTICS_GetHandle *stat_get;
290 * Task that is used to remove expired entries from
291 * the datastore. This task will schedule itself
292 * again automatically to always delete all expired
295 * @param cls not used
296 * @param tc task context
299 delete_expired (void *cls,
300 const struct GNUNET_SCHEDULER_TaskContext *tc);
304 * Iterate over the expired items stored in the datastore.
305 * Delete all expired items; once we have processed all
306 * expired items, re-schedule the "delete_expired" task.
308 * @param cls not used
309 * @param next_cls closure to pass to the "next" function.
310 * @param key key for the content
311 * @param size number of bytes in data
312 * @param data content stored
313 * @param type type of the content
314 * @param priority priority of the content
315 * @param anonymity anonymity-level for the content
316 * @param expiration expiration time for the content
317 * @param uid unique identifier for the datum;
318 * maybe 0 if no unique identifier is available
320 * @return GNUNET_SYSERR to abort the iteration, GNUNET_OK to continue
321 * (continue on call to "next", of course),
322 * GNUNET_NO to delete the item and continue (if supported)
325 expired_processor (void *cls,
327 const GNUNET_HashCode * key,
330 enum GNUNET_BLOCK_Type type,
333 struct GNUNET_TIME_Absolute
337 struct GNUNET_TIME_Absolute now;
342 = GNUNET_SCHEDULER_add_delayed (MAX_EXPIRE_DELAY,
345 return GNUNET_SYSERR;
347 now = GNUNET_TIME_absolute_get ();
348 if (expiration.abs_value > now.abs_value)
350 /* finished processing */
351 plugin->api->next_request (next_cls, GNUNET_YES);
352 return GNUNET_SYSERR;
354 plugin->api->next_request (next_cls, GNUNET_NO);
356 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
357 "Deleting content `%s' of type %u that expired %llu ms ago\n",
360 (unsigned long long) (now.abs_value - expiration.abs_value));
362 GNUNET_STATISTICS_update (stats,
363 gettext_noop ("# bytes expired"),
366 GNUNET_CONTAINER_bloomfilter_remove (filter,
368 return GNUNET_NO; /* delete */
373 * Task that is used to remove expired entries from
374 * the datastore. This task will schedule itself
375 * again automatically to always delete all expired
378 * @param cls not used
379 * @param tc task context
382 delete_expired (void *cls,
383 const struct GNUNET_SCHEDULER_TaskContext *tc)
385 expired_kill_task = GNUNET_SCHEDULER_NO_TASK;
386 plugin->api->iter_ascending_expiration (plugin->api->cls,
394 * An iterator over a set of items stored in the datastore.
397 * @param next_cls closure to pass to the "next" function.
398 * @param key key for the content
399 * @param size number of bytes in data
400 * @param data content stored
401 * @param type type of the content
402 * @param priority priority of the content
403 * @param anonymity anonymity-level for the content
404 * @param expiration expiration time for the content
405 * @param uid unique identifier for the datum;
406 * maybe 0 if no unique identifier is available
408 * @return GNUNET_SYSERR to abort the iteration, GNUNET_OK to continue
409 * (continue on call to "next", of course),
410 * GNUNET_NO to delete the item and continue (if supported)
415 const GNUNET_HashCode * key,
418 enum GNUNET_BLOCK_Type type,
421 struct GNUNET_TIME_Absolute
425 unsigned long long *need = cls;
430 return GNUNET_SYSERR;
432 if (size + GNUNET_DATASTORE_ENTRY_OVERHEAD > *need)
435 *need -= size + GNUNET_DATASTORE_ENTRY_OVERHEAD;
436 plugin->api->next_request (next_cls,
437 (0 == *need) ? GNUNET_YES : GNUNET_NO);
439 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
440 "Deleting %llu bytes of low-priority content `%s' of type %u (still trying to free another %llu bytes)\n",
441 (unsigned long long) (size + GNUNET_DATASTORE_ENTRY_OVERHEAD),
446 GNUNET_STATISTICS_update (stats,
447 gettext_noop ("# bytes purged (low-priority)"),
450 GNUNET_CONTAINER_bloomfilter_remove (filter,
457 * Manage available disk space by running tasks
458 * that will discard content if necessary. This
459 * function will be run whenever a request for
460 * "need" bytes of storage could only be satisfied
461 * by eating into the "cache" (and we want our cache
464 * @param need number of bytes of content that were
465 * placed into the "cache" (and hence the
466 * number of bytes that should be removed).
469 manage_space (unsigned long long need)
471 unsigned long long *n;
474 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
475 "Asked to free up %llu bytes of cache space\n",
478 n = GNUNET_malloc (sizeof(unsigned long long));
480 plugin->api->iter_low_priority (plugin->api->cls,
488 * Function called to notify a client about the socket
489 * begin ready to queue more data. "buf" will be
490 * NULL and "size" zero if the socket was closed for
491 * writing in the meantime.
494 * @param size number of bytes available in buf
495 * @param buf where the callee should write the message
496 * @return number of bytes written to buf
499 transmit_callback (void *cls,
500 size_t size, void *buf)
502 struct TransmitCallbackContext *tcc = cls;
506 GNUNET_CONTAINER_DLL_remove (tcc_head,
509 msize = ntohs(tcc->msg->size);
512 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
513 _("Transmission to client failed!\n"));
515 tcc->tc (tcc->tc_cls, GNUNET_SYSERR);
516 if (GNUNET_YES == tcc->end)
518 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
519 _("Disconnecting client due to transmission failure!\n"));
520 GNUNET_SERVER_receive_done (tcc->client, GNUNET_SYSERR);
522 GNUNET_SERVER_client_drop (tcc->client);
523 GNUNET_free (tcc->msg);
527 GNUNET_assert (size >= msize);
528 memcpy (buf, tcc->msg, msize);
530 tcc->tc (tcc->tc_cls, GNUNET_OK);
531 if (GNUNET_YES == tcc->end)
533 GNUNET_SERVER_receive_done (tcc->client, GNUNET_OK);
538 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
539 "Response transmitted, more pending!\n");
542 GNUNET_SERVER_client_drop (tcc->client);
543 GNUNET_free (tcc->msg);
550 * Transmit the given message to the client.
552 * @param client target of the message
553 * @param msg message to transmit, will be freed!
554 * @param tc function to call afterwards
555 * @param tc_cls closure for tc
556 * @param end is this the last response (and we should
557 * signal the server completion accodingly after
558 * transmitting this message)?
561 transmit (struct GNUNET_SERVER_Client *client,
562 struct GNUNET_MessageHeader *msg,
563 TransmitContinuation tc,
567 struct TransmitCallbackContext *tcc;
569 if (GNUNET_YES == cleaning_done)
572 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
573 "Shutdown in progress, aborting transmission.\n");
577 tc (tc_cls, GNUNET_SYSERR);
580 tcc = GNUNET_malloc (sizeof(struct TransmitCallbackContext));
582 tcc->client = client;
584 tcc->tc_cls = tc_cls;
587 (tcc->th = GNUNET_SERVER_notify_transmit_ready (client,
589 GNUNET_TIME_UNIT_FOREVER_REL,
594 if (GNUNET_YES == end)
596 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
597 _("Forcefully disconnecting client.\n"));
598 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
601 tc (tc_cls, GNUNET_SYSERR);
606 GNUNET_SERVER_client_keep (client);
607 GNUNET_CONTAINER_DLL_insert (tcc_head,
614 * Transmit a status code to the client.
616 * @param client receiver of the response
617 * @param code status code
618 * @param msg optional error message (can be NULL)
621 transmit_status (struct GNUNET_SERVER_Client *client,
625 struct StatusMessage *sm;
629 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
630 "Transmitting `%s' message with value %d and message `%s'\n",
633 msg != NULL ? msg : "(none)");
635 slen = (msg == NULL) ? 0 : strlen(msg) + 1;
636 sm = GNUNET_malloc (sizeof(struct StatusMessage) + slen);
637 sm->header.size = htons(sizeof(struct StatusMessage) + slen);
638 sm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_STATUS);
639 sm->status = htonl(code);
641 memcpy (&sm[1], msg, slen);
642 transmit (client, &sm->header, NULL, NULL, GNUNET_YES);
647 * Function called once the transmit operation has
648 * either failed or succeeded.
650 * @param next_cls closure for calling "next_request" callback
651 * @param status GNUNET_OK on success, GNUNET_SYSERR on error
654 get_next(void *next_cls,
657 if (status != GNUNET_OK)
659 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
660 _("Failed to transmit an item to the client; aborting iteration.\n"));
662 plugin->api->next_request (next_cls, GNUNET_YES);
665 plugin->api->next_request (next_cls, GNUNET_NO);
670 * Function that will transmit the given datastore entry
673 * @param cls closure, pointer to the client (of type GNUNET_SERVER_Client).
674 * @param next_cls closure to use to ask for the next item
675 * @param key key for the content
676 * @param size number of bytes in data
677 * @param data content stored
678 * @param type type of the content
679 * @param priority priority of the content
680 * @param anonymity anonymity-level for the content
681 * @param expiration expiration time for the content
682 * @param uid unique identifier for the datum;
683 * maybe 0 if no unique identifier is available
685 * @return GNUNET_SYSERR to abort the iteration, GNUNET_OK to continue,
686 * GNUNET_NO to delete the item and continue (if supported)
689 transmit_item (void *cls,
691 const GNUNET_HashCode * key,
694 enum GNUNET_BLOCK_Type type,
697 struct GNUNET_TIME_Absolute
698 expiration, uint64_t uid)
700 struct GNUNET_SERVER_Client *client = cls;
701 struct GNUNET_MessageHeader *end;
702 struct DataMessage *dm;
706 /* transmit 'DATA_END' */
708 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
709 "Transmitting `%s' message\n",
712 end = GNUNET_malloc (sizeof(struct GNUNET_MessageHeader));
713 end->size = htons(sizeof(struct GNUNET_MessageHeader));
714 end->type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END);
715 transmit (client, end, NULL, NULL, GNUNET_YES);
716 GNUNET_SERVER_client_drop (client);
719 dm = GNUNET_malloc (sizeof(struct DataMessage) + size);
720 dm->header.size = htons(sizeof(struct DataMessage) + size);
721 dm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_DATA);
723 dm->size = htonl(size);
724 dm->type = htonl(type);
725 dm->priority = htonl(priority);
726 dm->anonymity = htonl(anonymity);
727 dm->expiration = GNUNET_TIME_absolute_hton(expiration);
728 dm->uid = GNUNET_htonll(uid);
730 memcpy (&dm[1], data, size);
732 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
733 "Transmitting `%s' message for `%s' of type %u\n",
738 GNUNET_STATISTICS_update (stats,
739 gettext_noop ("# results found"),
742 transmit (client, &dm->header, &get_next, next_cls, GNUNET_NO);
748 * Handle RESERVE-message.
751 * @param client identification of the client
752 * @param message the actual message
755 handle_reserve (void *cls,
756 struct GNUNET_SERVER_Client *client,
757 const struct GNUNET_MessageHeader *message)
760 * Static counter to produce reservation identifiers.
762 static int reservation_gen;
764 const struct ReserveMessage *msg = (const struct ReserveMessage*) message;
765 struct ReservationList *e;
766 unsigned long long used;
767 unsigned long long req;
772 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
773 "Processing `%s' request\n",
776 amount = GNUNET_ntohll(msg->amount);
777 entries = ntohl(msg->entries);
778 used = payload + reserved;
779 req = amount + ((unsigned long long) GNUNET_DATASTORE_ENTRY_OVERHEAD) * entries;
780 if (used + req > quota)
783 used = quota; /* cheat a bit for error message (to avoid negative numbers) */
784 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
785 _("Insufficient space (%llu bytes are available) to satisfy `%s' request for %llu bytes\n"),
789 if (cache_size < req)
791 /* TODO: document this in the FAQ; essentially, if this
792 message happens, the insertion request could be blocked
793 by less-important content from migration because it is
794 larger than 1/8th of the overall available space, and
795 we only reserve 1/8th for "fresh" insertions */
796 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
797 _("The requested amount (%llu bytes) is larger than the cache size (%llu bytes)\n"),
800 transmit_status (client, 0,
801 gettext_noop ("Insufficient space to satisfy request and "
802 "requested amount is larger than cache size"));
806 transmit_status (client, 0,
807 gettext_noop ("Insufficient space to satisfy request"));
812 GNUNET_STATISTICS_set (stats,
813 gettext_noop ("# reserved"),
816 e = GNUNET_malloc (sizeof(struct ReservationList));
817 e->next = reservations;
821 e->entries = entries;
822 e->rid = ++reservation_gen;
823 if (reservation_gen < 0)
824 reservation_gen = 0; /* wrap around */
825 transmit_status (client, e->rid, NULL);
830 * Handle RELEASE_RESERVE-message.
833 * @param client identification of the client
834 * @param message the actual message
837 handle_release_reserve (void *cls,
838 struct GNUNET_SERVER_Client *client,
839 const struct GNUNET_MessageHeader *message)
841 const struct ReleaseReserveMessage *msg = (const struct ReleaseReserveMessage*) message;
842 struct ReservationList *pos;
843 struct ReservationList *prev;
844 struct ReservationList *next;
845 int rid = ntohl(msg->rid);
846 unsigned long long rem;
849 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
850 "Processing `%s' request\n",
855 while (NULL != (pos = next))
864 rem = pos->amount + ((unsigned long long) GNUNET_DATASTORE_ENTRY_OVERHEAD) * pos->entries;
865 GNUNET_assert (reserved >= rem);
867 GNUNET_STATISTICS_set (stats,
868 gettext_noop ("# reserved"),
872 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
873 "Returning %llu remaining reserved bytes to storage pool\n",
877 transmit_status (client, GNUNET_OK, NULL);
883 transmit_status (client, GNUNET_SYSERR, gettext_noop ("Could not find matching reservation"));
888 * Check that the given message is a valid data message.
890 * @return NULL if the message is not well-formed, otherwise the message
892 static const struct DataMessage *
893 check_data (const struct GNUNET_MessageHeader *message)
897 const struct DataMessage *dm;
899 size = ntohs(message->size);
900 if (size < sizeof(struct DataMessage))
905 dm = (const struct DataMessage *) message;
906 dsize = ntohl(dm->size);
907 if (size != dsize + sizeof(struct DataMessage))
917 * Context for a put request used to see if the content is
923 * Client to notify on completion.
925 struct GNUNET_SERVER_Client *client;
928 * Did we find the data already in the database?
932 /* followed by the 'struct DataMessage' */
937 * Actually put the data message.
940 execute_put (struct GNUNET_SERVER_Client *client,
941 const struct DataMessage *dm)
947 size = ntohl(dm->size);
949 ret = plugin->api->put (plugin->api->cls,
955 ntohl(dm->anonymity),
956 GNUNET_TIME_absolute_ntoh(dm->expiration),
958 if (GNUNET_OK == ret)
960 GNUNET_STATISTICS_update (stats,
961 gettext_noop ("# bytes stored"),
964 GNUNET_CONTAINER_bloomfilter_add (filter,
967 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
968 "Successfully stored %u bytes of type %u under key `%s'\n",
971 GNUNET_h2s (&dm->key));
974 transmit_status (client,
975 (GNUNET_SYSERR == ret) ? GNUNET_SYSERR : GNUNET_OK,
977 GNUNET_free_non_null (msg);
978 if (quota - reserved - cache_size < payload)
980 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
981 _("Need %llu bytes more space (%llu allowed, using %llu)\n"),
982 (unsigned long long) size + GNUNET_DATASTORE_ENTRY_OVERHEAD,
983 (unsigned long long) (quota - reserved - cache_size),
984 (unsigned long long) payload);
985 manage_space (size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
992 * Function that will check if the given datastore entry
993 * matches the put and if none match executes the put.
995 * @param cls closure, pointer to the client (of type 'struct PutContext').
996 * @param next_cls closure to use to ask for the next item
997 * @param key key for the content
998 * @param size number of bytes in data
999 * @param data content stored
1000 * @param type type of the content
1001 * @param priority priority of the content
1002 * @param anonymity anonymity-level for the content
1003 * @param expiration expiration time for the content
1004 * @param uid unique identifier for the datum;
1005 * maybe 0 if no unique identifier is available
1007 * @return GNUNET_SYSERR to abort the iteration, GNUNET_OK to continue,
1008 * GNUNET_NO to delete the item and continue (if supported)
1011 check_present (void *cls,
1013 const GNUNET_HashCode * key,
1016 enum GNUNET_BLOCK_Type type,
1019 struct GNUNET_TIME_Absolute
1020 expiration, uint64_t uid)
1022 struct PutContext *pc = cls;
1023 const struct DataMessage *dm;
1025 dm = (const struct DataMessage*) &pc[1];
1028 if (pc->is_present == GNUNET_YES)
1029 transmit_status (pc->client, GNUNET_OK, NULL);
1031 execute_put (pc->client, dm);
1032 GNUNET_SERVER_client_drop (pc->client);
1034 return GNUNET_SYSERR;
1036 if ( (GNUNET_BLOCK_TYPE_FS_DBLOCK == type) ||
1037 (GNUNET_BLOCK_TYPE_FS_IBLOCK == type) ||
1038 ( (size == ntohl(dm->size)) &&
1039 (0 == memcmp (&dm[1],
1043 pc->is_present = GNUNET_YES;
1044 plugin->api->next_request (next_cls, GNUNET_YES);
1048 plugin->api->next_request (next_cls, GNUNET_NO);
1055 * Handle PUT-message.
1057 * @param cls closure
1058 * @param client identification of the client
1059 * @param message the actual message
1062 handle_put (void *cls,
1063 struct GNUNET_SERVER_Client *client,
1064 const struct GNUNET_MessageHeader *message)
1066 const struct DataMessage *dm = check_data (message);
1068 struct ReservationList *pos;
1069 struct PutContext *pc;
1072 if ( (dm == NULL) ||
1073 (ntohl(dm->type) == 0) )
1076 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1080 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1081 "Processing `%s' request for `%s' of type %u\n",
1083 GNUNET_h2s (&dm->key),
1086 rid = ntohl(dm->rid);
1087 size = ntohl(dm->size);
1091 while ( (NULL != pos) &&
1094 GNUNET_break (pos != NULL);
1097 GNUNET_break (pos->entries > 0);
1098 GNUNET_break (pos->amount >= size);
1100 pos->amount -= size;
1101 reserved -= (size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
1102 GNUNET_STATISTICS_set (stats,
1103 gettext_noop ("# reserved"),
1108 if (GNUNET_YES == GNUNET_CONTAINER_bloomfilter_test (filter,
1111 pc = GNUNET_malloc (sizeof (struct PutContext) + size + sizeof (struct DataMessage));
1112 pc->client = client;
1113 GNUNET_SERVER_client_keep (client);
1114 memcpy (&pc[1], dm, size + sizeof (struct DataMessage));
1115 plugin->api->get (plugin->api->cls,
1123 execute_put (client, dm);
1128 * Handle GET-message.
1130 * @param cls closure
1131 * @param client identification of the client
1132 * @param message the actual message
1135 handle_get (void *cls,
1136 struct GNUNET_SERVER_Client *client,
1137 const struct GNUNET_MessageHeader *message)
1139 const struct GetMessage *msg;
1142 size = ntohs(message->size);
1143 if ( (size != sizeof(struct GetMessage)) &&
1144 (size != sizeof(struct GetMessage) - sizeof(GNUNET_HashCode)) )
1147 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1150 msg = (const struct GetMessage*) message;
1152 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1153 "Processing `%s' request for `%s' of type %u\n",
1155 GNUNET_h2s (&msg->key),
1158 GNUNET_STATISTICS_update (stats,
1159 gettext_noop ("# GET requests received"),
1162 GNUNET_SERVER_client_keep (client);
1163 if ( (size == sizeof(struct GetMessage)) &&
1164 (GNUNET_YES != GNUNET_CONTAINER_bloomfilter_test (filter,
1167 /* don't bother database... */
1169 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1170 "Empty result set for `%s' request for `%s' (bloomfilter).\n",
1172 GNUNET_h2s (&msg->key));
1174 GNUNET_STATISTICS_update (stats,
1175 gettext_noop ("# requests filtered by bloomfilter"),
1178 transmit_item (client,
1179 NULL, NULL, 0, NULL, 0, 0, 0,
1180 GNUNET_TIME_UNIT_ZERO_ABS, 0);
1183 plugin->api->get (plugin->api->cls,
1184 ((size == sizeof(struct GetMessage)) ? &msg->key : NULL),
1193 * Handle UPDATE-message.
1195 * @param cls closure
1196 * @param client identification of the client
1197 * @param message the actual message
1200 handle_update (void *cls,
1201 struct GNUNET_SERVER_Client *client,
1202 const struct GNUNET_MessageHeader *message)
1204 const struct UpdateMessage *msg;
1208 GNUNET_STATISTICS_update (stats,
1209 gettext_noop ("# UPDATE requests received"),
1212 msg = (const struct UpdateMessage*) message;
1215 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1216 "Processing `%s' request for %llu\n",
1218 (unsigned long long) GNUNET_ntohll (msg->uid));
1220 ret = plugin->api->update (plugin->api->cls,
1221 GNUNET_ntohll(msg->uid),
1222 (int32_t) ntohl(msg->priority),
1223 GNUNET_TIME_absolute_ntoh(msg->expiration),
1225 transmit_status (client, ret, emsg);
1226 GNUNET_free_non_null (emsg);
1231 * Handle GET_RANDOM-message.
1233 * @param cls closure
1234 * @param client identification of the client
1235 * @param message the actual message
1238 handle_get_random (void *cls,
1239 struct GNUNET_SERVER_Client *client,
1240 const struct GNUNET_MessageHeader *message)
1243 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1244 "Processing `%s' request\n",
1247 GNUNET_STATISTICS_update (stats,
1248 gettext_noop ("# GET RANDOM requests received"),
1251 GNUNET_SERVER_client_keep (client);
1252 plugin->api->iter_migration_order (plugin->api->cls,
1253 GNUNET_BLOCK_TYPE_ANY,
1259 * Handle GET_ZERO_ANONYMITY-message.
1261 * @param cls closure
1262 * @param client identification of the client
1263 * @param message the actual message
1266 handle_get_zero_anonymity (void *cls,
1267 struct GNUNET_SERVER_Client *client,
1268 const struct GNUNET_MessageHeader *message)
1270 const struct GetZeroAnonymityMessage * msg = (const struct GetZeroAnonymityMessage*) message;
1271 enum GNUNET_BLOCK_Type type;
1273 type = (enum GNUNET_BLOCK_Type) ntohl (msg->type);
1275 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1276 "Processing `%s' request\n",
1277 "GET_ZERO_ANONYMITY");
1279 GNUNET_STATISTICS_update (stats,
1280 gettext_noop ("# GET ZERO ANONYMITY requests received"),
1283 GNUNET_SERVER_client_keep (client);
1284 plugin->api->iter_zero_anonymity (plugin->api->cls,
1292 * Context for the 'remove_callback'.
1294 struct RemoveContext
1297 * Client for whom we're doing the remvoing.
1299 struct GNUNET_SERVER_Client *client;
1302 * GNUNET_YES if we managed to remove something.
1309 * Callback function that will cause the item that is passed
1310 * in to be deleted (by returning GNUNET_NO).
1313 remove_callback (void *cls,
1315 const GNUNET_HashCode * key,
1318 enum GNUNET_BLOCK_Type type,
1321 struct GNUNET_TIME_Absolute
1322 expiration, uint64_t uid)
1324 struct RemoveContext *rc = cls;
1329 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1330 "No further matches for `%s' request.\n",
1333 if (GNUNET_YES == rc->found)
1334 transmit_status (rc->client, GNUNET_OK, NULL);
1336 transmit_status (rc->client, GNUNET_NO, _("Content not found"));
1337 GNUNET_SERVER_client_drop (rc->client);
1339 return GNUNET_OK; /* last item */
1341 rc->found = GNUNET_YES;
1343 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1344 "Item %llu matches `%s' request for key `%s' and type %u.\n",
1345 (unsigned long long) uid,
1350 GNUNET_STATISTICS_update (stats,
1351 gettext_noop ("# bytes removed (explicit request)"),
1354 GNUNET_CONTAINER_bloomfilter_remove (filter,
1356 plugin->api->next_request (next_cls, GNUNET_YES);
1362 * Handle REMOVE-message.
1364 * @param cls closure
1365 * @param client identification of the client
1366 * @param message the actual message
1369 handle_remove (void *cls,
1370 struct GNUNET_SERVER_Client *client,
1371 const struct GNUNET_MessageHeader *message)
1373 const struct DataMessage *dm = check_data (message);
1374 GNUNET_HashCode vhash;
1375 struct RemoveContext *rc;
1380 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1384 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1385 "Processing `%s' request for `%s' of type %u\n",
1387 GNUNET_h2s (&dm->key),
1390 GNUNET_STATISTICS_update (stats,
1391 gettext_noop ("# REMOVE requests received"),
1394 rc = GNUNET_malloc (sizeof(struct RemoveContext));
1395 GNUNET_SERVER_client_keep (client);
1396 rc->client = client;
1397 GNUNET_CRYPTO_hash (&dm[1],
1400 plugin->api->get (plugin->api->cls,
1403 (enum GNUNET_BLOCK_Type) ntohl(dm->type),
1410 * Handle DROP-message.
1412 * @param cls closure
1413 * @param client identification of the client
1414 * @param message the actual message
1417 handle_drop (void *cls,
1418 struct GNUNET_SERVER_Client *client,
1419 const struct GNUNET_MessageHeader *message)
1422 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1423 "Processing `%s' request\n",
1426 plugin->api->drop (plugin->api->cls);
1427 GNUNET_SERVER_receive_done (client, GNUNET_OK);
1432 * Function called by plugins to notify us about a
1433 * change in their disk utilization.
1435 * @param cls closure (NULL)
1436 * @param delta change in disk utilization,
1437 * 0 for "reset to empty"
1440 disk_utilization_change_cb (void *cls,
1444 (payload < -delta) )
1446 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1447 _("Datastore payload inaccurate (%lld < %lld). Trying to fix.\n"),
1448 (long long) payload,
1449 (long long) -delta);
1450 payload = plugin->api->get_size (plugin->api->cls);
1456 if (lastSync >= MAX_STAT_SYNC_LAG)
1462 * Callback function to process statistic values.
1464 * @param cls closure (struct Plugin*)
1465 * @param subsystem name of subsystem that created the statistic
1466 * @param name the name of the datum
1467 * @param value the current value
1468 * @param is_persistent GNUNET_YES if the value is persistent, GNUNET_NO if not
1469 * @return GNUNET_OK to continue, GNUNET_SYSERR to abort iteration
1472 process_stat_in (void *cls,
1473 const char *subsystem,
1478 GNUNET_assert (stats_worked == GNUNET_NO);
1479 stats_worked = GNUNET_YES;
1482 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1483 "Notification from statistics about existing payload (%llu), new payload is %llu\n",
1492 process_stat_done (void *cls,
1495 struct DatastorePlugin *plugin = cls;
1498 if (stats_worked == GNUNET_NO)
1499 payload = plugin->api->get_size (plugin->api->cls);
1504 * Load the datastore plugin.
1506 static struct DatastorePlugin *
1509 struct DatastorePlugin *ret;
1514 GNUNET_CONFIGURATION_get_value_string (cfg,
1515 "DATASTORE", "DATABASE", &name))
1517 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1518 _("No `%s' specified for `%s' in configuration!\n"),
1523 ret = GNUNET_malloc (sizeof(struct DatastorePlugin));
1525 ret->env.duc = &disk_utilization_change_cb;
1526 ret->env.cls = NULL;
1527 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1528 _("Loading `%s' datastore plugin\n"), name);
1529 GNUNET_asprintf (&libname, "libgnunet_plugin_datastore_%s", name);
1530 ret->short_name = name;
1531 ret->lib_name = libname;
1532 ret->api = GNUNET_PLUGIN_load (libname, &ret->env);
1533 if (ret->api == NULL)
1535 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1536 _("Failed to load datastore plugin for `%s'\n"), name);
1537 GNUNET_free (ret->short_name);
1538 GNUNET_free (libname);
1547 * Function called when the service shuts
1548 * down. Unloads our datastore plugin.
1550 * @param plug plugin to unload
1553 unload_plugin (struct DatastorePlugin *plug)
1556 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1557 "Datastore service is unloading plugin...\n");
1559 GNUNET_break (NULL == GNUNET_PLUGIN_unload (plug->lib_name, plug->api));
1560 GNUNET_free (plug->lib_name);
1561 GNUNET_free (plug->short_name);
1567 * Final task run after shutdown. Unloads plugins and disconnects us from
1571 unload_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1573 unload_plugin (plugin);
1577 GNUNET_CONTAINER_bloomfilter_free (filter);
1582 if (stat_get != NULL)
1584 GNUNET_STATISTICS_get_cancel (stat_get);
1589 GNUNET_STATISTICS_destroy (stats, GNUNET_YES);
1596 * Last task run during shutdown. Disconnects us from
1597 * the transport and core.
1600 cleaning_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1602 struct TransmitCallbackContext *tcc;
1604 cleaning_done = GNUNET_YES;
1605 while (NULL != (tcc = tcc_head))
1607 GNUNET_CONTAINER_DLL_remove (tcc_head,
1610 if (tcc->th != NULL)
1612 GNUNET_CONNECTION_notify_transmit_ready_cancel (tcc->th);
1613 GNUNET_SERVER_client_drop (tcc->client);
1615 if (NULL != tcc->tc)
1616 tcc->tc (tcc->tc_cls, GNUNET_SYSERR);
1617 GNUNET_free (tcc->msg);
1620 if (expired_kill_task != GNUNET_SCHEDULER_NO_TASK)
1622 GNUNET_SCHEDULER_cancel (expired_kill_task);
1623 expired_kill_task = GNUNET_SCHEDULER_NO_TASK;
1625 GNUNET_SCHEDULER_add_continuation (&unload_task,
1627 GNUNET_SCHEDULER_REASON_PREREQ_DONE);
1632 * Function that removes all active reservations made
1633 * by the given client and releases the space for other
1636 * @param cls closure
1637 * @param client identification of the client
1640 cleanup_reservations (void *cls,
1641 struct GNUNET_SERVER_Client
1644 struct ReservationList *pos;
1645 struct ReservationList *prev;
1646 struct ReservationList *next;
1655 if (pos->client == client)
1658 reservations = next;
1661 reserved -= pos->amount + pos->entries * GNUNET_DATASTORE_ENTRY_OVERHEAD;
1670 GNUNET_STATISTICS_set (stats,
1671 gettext_noop ("# reserved"),
1678 * Process datastore requests.
1680 * @param cls closure
1681 * @param server the initialized server
1682 * @param c configuration to use
1686 struct GNUNET_SERVER_Handle *server,
1687 const struct GNUNET_CONFIGURATION_Handle *c)
1689 static const struct GNUNET_SERVER_MessageHandler handlers[] = {
1690 {&handle_reserve, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_RESERVE,
1691 sizeof(struct ReserveMessage) },
1692 {&handle_release_reserve, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_RELEASE_RESERVE,
1693 sizeof(struct ReleaseReserveMessage) },
1694 {&handle_put, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_PUT, 0 },
1695 {&handle_update, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_UPDATE,
1696 sizeof (struct UpdateMessage) },
1697 {&handle_get, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_GET, 0 },
1698 {&handle_get_random, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_GET_RANDOM,
1699 sizeof(struct GNUNET_MessageHeader) },
1700 {&handle_get_zero_anonymity, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_GET_ZERO_ANONYMITY,
1701 sizeof(struct GetZeroAnonymityMessage) },
1702 {&handle_remove, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE, 0 },
1703 {&handle_drop, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_DROP,
1704 sizeof(struct GNUNET_MessageHeader) },
1708 unsigned int bf_size;
1712 GNUNET_CONFIGURATION_get_value_number (cfg,
1713 "DATASTORE", "QUOTA", "a))
1715 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1716 _("No `%s' specified for `%s' in configuration!\n"),
1721 stats = GNUNET_STATISTICS_create ("datastore", cfg);
1722 GNUNET_STATISTICS_set (stats,
1723 gettext_noop ("# quota"),
1726 cache_size = quota / 8; /* Or should we make this an option? */
1727 GNUNET_STATISTICS_set (stats,
1728 gettext_noop ("# cache size"),
1731 bf_size = quota / 32; /* 8 bit per entry, 1 bit per 32 kb in DB */
1734 GNUNET_CONFIGURATION_get_value_filename (cfg,
1739 GNUNET_DISK_directory_create_for_file (fn)) )
1741 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1742 _("Could not use specified filename `%s' for bloomfilter.\n"),
1743 fn != NULL ? fn : "");
1744 GNUNET_free_non_null (fn);
1747 filter = GNUNET_CONTAINER_bloomfilter_load (fn, bf_size, 5); /* approx. 3% false positives at max use */
1748 GNUNET_free_non_null (fn);
1751 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1752 _("Failed to initialize bloomfilter.\n"));
1755 GNUNET_STATISTICS_destroy (stats, GNUNET_YES);
1760 plugin = load_plugin ();
1763 GNUNET_CONTAINER_bloomfilter_free (filter);
1767 GNUNET_STATISTICS_destroy (stats, GNUNET_YES);
1772 stat_get = GNUNET_STATISTICS_get (stats,
1775 GNUNET_TIME_UNIT_SECONDS,
1779 GNUNET_SERVER_disconnect_notify (server, &cleanup_reservations, NULL);
1780 GNUNET_SERVER_add_handlers (server, handlers);
1782 = GNUNET_SCHEDULER_add_with_priority (GNUNET_SCHEDULER_PRIORITY_IDLE,
1783 &delete_expired, NULL);
1784 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL,
1785 &cleaning_task, NULL);
1790 * The main function for the datastore service.
1792 * @param argc number of arguments from the command line
1793 * @param argv command line arguments
1794 * @return 0 ok, 1 on error
1797 main (int argc, char *const *argv)
1802 GNUNET_SERVICE_run (argc,
1805 GNUNET_SERVICE_OPTION_NONE,
1806 &run, NULL)) ? 0 : 1;
1811 /* end of gnunet-service-datastore.c */