2 This file is part of GNUnet
3 Copyright (C) 2004-2013, 2016 GNUnet e.V.
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your
8 option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18 Boston, MA 02110-1301, USA.
22 * @file datastore/datastore_api.c
23 * @brief Management for the datastore for files stored on a GNUnet node. Implements
24 * a priority queue for requests
25 * @author Christian Grothoff
28 #include "gnunet_arm_service.h"
29 #include "gnunet_constants.h"
30 #include "gnunet_datastore_service.h"
31 #include "gnunet_statistics_service.h"
32 #include "datastore.h"
34 #define LOG(kind,...) GNUNET_log_from (kind, "datastore-api",__VA_ARGS__)
36 #define DELAY_WARN_TIMEOUT GNUNET_TIME_UNIT_MINUTES
39 * Collect an instane number of statistics? May cause excessive IPC.
41 #define INSANE_STATISTICS GNUNET_NO
44 * If a client stopped asking for more results, how many more do
45 * we receive from the DB before killing the connection? Trade-off
46 * between re-doing TCP handshakes and (needlessly) receiving
49 #define MAX_EXCESS_RESULTS 8
52 * Context for processing status messages.
57 * Continuation to call with the status.
59 GNUNET_DATASTORE_ContinuationWithStatus cont;
62 * Closure for @e cont.
70 * Context for processing result messages.
75 * Function to call with the result.
77 GNUNET_DATASTORE_DatumProcessor proc;
80 * Closure for @e proc.
88 * Context for a queue operation.
93 struct StatusContext sc;
95 struct ResultContext rc;
101 * Entry in our priority queue.
103 struct GNUNET_DATASTORE_QueueEntry
107 * This is a linked list.
109 struct GNUNET_DATASTORE_QueueEntry *next;
112 * This is a linked list.
114 struct GNUNET_DATASTORE_QueueEntry *prev;
117 * Handle to the master context.
119 struct GNUNET_DATASTORE_Handle *h;
122 * Function to call after transmission of the request.
124 GNUNET_DATASTORE_ContinuationWithStatus cont;
127 * Closure for @e cont.
132 * Context for the operation.
134 union QueueContext qc;
137 * Envelope of the request to transmit, NULL after
140 struct GNUNET_MQ_Envelope *env;
143 * Task we run if this entry stalls the queue and we
144 * need to warn the user.
146 struct GNUNET_SCHEDULER_Task *delay_warn_task;
149 * Priority in the queue.
151 unsigned int priority;
154 * Maximum allowed length of queue (otherwise
155 * this request should be discarded).
157 unsigned int max_queue;
160 * Expected response type.
162 uint16_t response_type;
168 * Handle to the datastore service.
170 struct GNUNET_DATASTORE_Handle
176 const struct GNUNET_CONFIGURATION_Handle *cfg;
179 * Current connection to the datastore service.
181 struct GNUNET_MQ_Handle *mq;
184 * Handle for statistics.
186 struct GNUNET_STATISTICS_Handle *stats;
189 * Current head of priority queue.
191 struct GNUNET_DATASTORE_QueueEntry *queue_head;
194 * Current tail of priority queue.
196 struct GNUNET_DATASTORE_QueueEntry *queue_tail;
199 * Task for trying to reconnect.
201 struct GNUNET_SCHEDULER_Task *reconnect_task;
204 * How quickly should we retry? Used for exponential back-off on
207 struct GNUNET_TIME_Relative retry_time;
210 * Number of entries in the queue.
212 unsigned int queue_size;
215 * Number of results we're receiving for the current query
216 * after application stopped to care. Used to determine when
217 * to reset the connection.
219 unsigned int result_count;
222 * We should ignore the next message(s) from the service.
224 unsigned int skip_next_messages;
230 * Try reconnecting to the datastore service.
232 * @param cls the `struct GNUNET_DATASTORE_Handle`
235 try_reconnect (void *cls);
239 * Disconnect from the service and then try reconnecting to the datastore service
242 * @param h handle to datastore to disconnect and reconnect
245 do_disconnect (struct GNUNET_DATASTORE_Handle *h)
252 GNUNET_MQ_destroy (h->mq);
254 h->skip_next_messages = 0;
256 = GNUNET_SCHEDULER_add_delayed (h->retry_time,
263 * Free a queue entry. Removes the given entry from the
264 * queue and releases associated resources. Does NOT
267 * @param qe entry to free.
270 free_queue_entry (struct GNUNET_DATASTORE_QueueEntry *qe)
272 struct GNUNET_DATASTORE_Handle *h = qe->h;
274 GNUNET_CONTAINER_DLL_remove (h->queue_head,
279 GNUNET_MQ_discard (qe->env);
280 if (NULL != qe->delay_warn_task)
281 GNUNET_SCHEDULER_cancel (qe->delay_warn_task);
287 * Task that logs an error after some time.
289 * @param qe `struct GNUNET_DATASTORE_QueueEntry` about which the error is
292 delay_warning (void *cls)
294 struct GNUNET_DATASTORE_QueueEntry *qe = cls;
296 qe->delay_warn_task = NULL;
297 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
298 "Request %p of type %u at head of datastore queue for more than %s\n",
300 (unsigned int) qe->response_type,
301 GNUNET_STRINGS_relative_time_to_string (DELAY_WARN_TIMEOUT,
303 qe->delay_warn_task = GNUNET_SCHEDULER_add_delayed (DELAY_WARN_TIMEOUT,
310 * Handle error in sending drop request to datastore.
312 * @param cls closure with the datastore handle
313 * @param error error code
316 mq_error_handler (void *cls,
317 enum GNUNET_MQ_Error error)
319 struct GNUNET_DATASTORE_Handle *h = cls;
320 struct GNUNET_DATASTORE_QueueEntry *qe;
322 LOG (GNUNET_ERROR_TYPE_DEBUG,
323 "MQ error, reconnecting to DATASTORE\n");
328 if (NULL != qe->delay_warn_task)
330 GNUNET_SCHEDULER_cancel (qe->delay_warn_task);
331 qe->delay_warn_task = NULL;
335 union QueueContext qc = qe->qc;
336 uint16_t rt = qe->response_type;
338 LOG (GNUNET_ERROR_TYPE_DEBUG,
339 "Failed to receive response from database.\n");
340 free_queue_entry (qe);
343 case GNUNET_MESSAGE_TYPE_DATASTORE_STATUS:
344 if (NULL != qc.sc.cont)
345 qc.sc.cont (qc.sc.cont_cls,
347 GNUNET_TIME_UNIT_ZERO_ABS,
348 _("DATASTORE disconnected"));
350 case GNUNET_MESSAGE_TYPE_DATASTORE_DATA:
351 if (NULL != qc.rc.proc)
352 qc.rc.proc (qc.rc.proc_cls,
360 GNUNET_TIME_UNIT_ZERO_ABS,
371 * Connect to the datastore service.
373 * @param cfg configuration to use
374 * @return handle to use to access the service
376 struct GNUNET_DATASTORE_Handle *
377 GNUNET_DATASTORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg)
379 struct GNUNET_DATASTORE_Handle *h;
381 LOG (GNUNET_ERROR_TYPE_DEBUG,
382 "Establishing DATASTORE connection!\n");
383 h = GNUNET_new (struct GNUNET_DATASTORE_Handle);
391 h->stats = GNUNET_STATISTICS_create ("datastore-api",
398 * Task used by to disconnect from the datastore after
399 * we send the #GNUNET_MESSAGE_TYPE_DATASTORE_DROP message.
401 * @param cls the datastore handle
404 disconnect_after_drop (void *cls)
406 struct GNUNET_DATASTORE_Handle *h = cls;
408 LOG (GNUNET_ERROR_TYPE_DEBUG,
409 "Drop sent, disconnecting\n");
410 GNUNET_DATASTORE_disconnect (h,
416 * Handle error in sending drop request to datastore.
418 * @param cls closure with the datastore handle
419 * @param error error code
422 disconnect_on_mq_error (void *cls,
423 enum GNUNET_MQ_Error error)
425 struct GNUNET_DATASTORE_Handle *h = cls;
427 LOG (GNUNET_ERROR_TYPE_ERROR,
428 "Failed to ask datastore to drop tables\n");
429 GNUNET_DATASTORE_disconnect (h,
435 * Disconnect from the datastore service (and free
436 * associated resources).
438 * @param h handle to the datastore
439 * @param drop set to #GNUNET_YES to delete all data in datastore (!)
442 GNUNET_DATASTORE_disconnect (struct GNUNET_DATASTORE_Handle *h,
445 struct GNUNET_DATASTORE_QueueEntry *qe;
447 LOG (GNUNET_ERROR_TYPE_DEBUG,
448 "Datastore disconnect\n");
451 GNUNET_MQ_destroy (h->mq);
454 if (NULL != h->reconnect_task)
456 GNUNET_SCHEDULER_cancel (h->reconnect_task);
457 h->reconnect_task = NULL;
459 while (NULL != (qe = h->queue_head))
461 switch (qe->response_type)
463 case GNUNET_MESSAGE_TYPE_DATASTORE_STATUS:
464 if (NULL != qe->qc.sc.cont)
465 qe->qc.sc.cont (qe->qc.sc.cont_cls,
467 GNUNET_TIME_UNIT_ZERO_ABS,
468 _("Disconnected from DATASTORE"));
470 case GNUNET_MESSAGE_TYPE_DATASTORE_DATA:
471 if (NULL != qe->qc.rc.proc)
472 qe->qc.rc.proc (qe->qc.rc.proc_cls,
480 GNUNET_TIME_UNIT_ZERO_ABS,
486 free_queue_entry (qe);
488 if (GNUNET_YES == drop)
490 LOG (GNUNET_ERROR_TYPE_DEBUG,
491 "Re-connecting to issue DROP!\n");
492 GNUNET_assert (NULL == h->mq);
493 h->mq = GNUNET_CLIENT_connect (h->cfg,
496 &disconnect_on_mq_error,
500 struct GNUNET_MessageHeader *hdr;
501 struct GNUNET_MQ_Envelope *env;
503 env = GNUNET_MQ_msg (hdr,
504 GNUNET_MESSAGE_TYPE_DATASTORE_DROP);
505 GNUNET_MQ_notify_sent (env,
506 &disconnect_after_drop,
508 GNUNET_MQ_send (h->mq,
514 GNUNET_STATISTICS_destroy (h->stats,
522 * Create a new entry for our priority queue (and possibly discard other entires if
523 * the queue is getting too long).
525 * @param h handle to the datastore
526 * @param env envelope with the message to queue
527 * @param queue_priority priority of the entry
528 * @param max_queue_size at what queue size should this request be dropped
529 * (if other requests of higher priority are in the queue)
530 * @param expected_type which type of response do we expect,
531 * #GNUNET_MESSAGE_TYPE_DATASTORE_STATUS or
532 * #GNUNET_MESSAGE_TYPE_DATASTORE_DATA
533 * @param qc client context (NOT a closure for @a response_proc)
534 * @return NULL if the queue is full
536 static struct GNUNET_DATASTORE_QueueEntry *
537 make_queue_entry (struct GNUNET_DATASTORE_Handle *h,
538 struct GNUNET_MQ_Envelope *env,
539 unsigned int queue_priority,
540 unsigned int max_queue_size,
541 uint16_t expected_type,
542 const union QueueContext *qc)
544 struct GNUNET_DATASTORE_QueueEntry *qe;
545 struct GNUNET_DATASTORE_QueueEntry *pos;
548 if ( (NULL != h->queue_tail) &&
549 (h->queue_tail->priority >= queue_priority) )
559 while ( (NULL != pos) &&
560 (c < max_queue_size) &&
561 (pos->priority >= queue_priority) )
566 if (c >= max_queue_size)
568 GNUNET_STATISTICS_update (h->stats,
569 gettext_noop ("# queue overflows"),
572 GNUNET_MQ_discard (env);
575 qe = GNUNET_new (struct GNUNET_DATASTORE_QueueEntry);
578 qe->response_type = expected_type;
580 qe->priority = queue_priority;
581 qe->max_queue = max_queue_size;
584 /* append at the tail */
590 /* do not insert at HEAD if HEAD query was already
591 * transmitted and we are still receiving replies! */
592 if ( (NULL == pos) &&
593 (NULL == h->queue_head->env) )
597 #if INSANE_STATISTICS
598 GNUNET_STATISTICS_update (h->stats,
599 gettext_noop ("# queue entries created"),
603 GNUNET_CONTAINER_DLL_insert_after (h->queue_head,
613 * Process entries in the queue (or do nothing if we are already
616 * @param h handle to the datastore
619 process_queue (struct GNUNET_DATASTORE_Handle *h)
621 struct GNUNET_DATASTORE_QueueEntry *qe;
623 if (NULL == (qe = h->queue_head))
625 /* no entry in queue */
626 LOG (GNUNET_ERROR_TYPE_DEBUG,
632 /* waiting for replies */
633 LOG (GNUNET_ERROR_TYPE_DEBUG,
634 "Head request already transmitted\n");
639 /* waiting for reconnect */
640 LOG (GNUNET_ERROR_TYPE_DEBUG,
644 GNUNET_assert (NULL == qe->delay_warn_task);
645 qe->delay_warn_task = GNUNET_SCHEDULER_add_delayed (DELAY_WARN_TIMEOUT,
648 GNUNET_MQ_send (h->mq,
657 * Function called to check status message from the service.
660 * @param sm status message received
661 * @return #GNUNET_OK if the message is well-formed
664 check_status (void *cls,
665 const struct StatusMessage *sm)
667 uint16_t msize = ntohs (sm->header.size) - sizeof (*sm);
668 int32_t status = ntohl (sm->status);
672 const char *emsg = (const char *) &sm[1];
674 if ('\0' != emsg[msize - 1])
677 return GNUNET_SYSERR;
680 else if (GNUNET_SYSERR == status)
683 return GNUNET_SYSERR;
690 * Function called to handle status message from the service.
693 * @param sm status message received
696 handle_status (void *cls,
697 const struct StatusMessage *sm)
699 struct GNUNET_DATASTORE_Handle *h = cls;
700 struct GNUNET_DATASTORE_QueueEntry *qe;
701 struct StatusContext rc;
703 int32_t status = ntohl (sm->status);
705 if (h->skip_next_messages > 0)
707 h->skip_next_messages--;
711 if (NULL == (qe = h->queue_head))
723 if (GNUNET_MESSAGE_TYPE_DATASTORE_STATUS != qe->response_type)
730 free_queue_entry (qe);
731 if (ntohs (sm->header.size) > sizeof (struct StatusMessage))
732 emsg = (const char *) &sm[1];
735 LOG (GNUNET_ERROR_TYPE_DEBUG,
736 "Received status %d/%s\n",
739 GNUNET_STATISTICS_update (h->stats,
740 gettext_noop ("# status messages received"),
743 h->retry_time = GNUNET_TIME_UNIT_ZERO;
746 rc.cont (rc.cont_cls,
748 GNUNET_TIME_absolute_ntoh (sm->min_expiration),
754 * Check data message we received from the service.
756 * @param cls closure with the `struct GNUNET_DATASTORE_Handle *`
757 * @param dm message received
760 check_data (void *cls,
761 const struct DataMessage *dm)
763 uint16_t msize = ntohs (dm->header.size) - sizeof (*dm);
765 if (msize != ntohl (dm->size))
768 return GNUNET_SYSERR;
775 * Handle data message we got from the service.
777 * @param cls closure with the `struct GNUNET_DATASTORE_Handle *`
778 * @param dm message received
781 handle_data (void *cls,
782 const struct DataMessage *dm)
784 struct GNUNET_DATASTORE_Handle *h = cls;
785 struct GNUNET_DATASTORE_QueueEntry *qe;
786 struct ResultContext rc;
788 if (h->skip_next_messages > 0)
806 if (GNUNET_MESSAGE_TYPE_DATASTORE_DATA != qe->response_type)
812 #if INSANE_STATISTICS
813 GNUNET_STATISTICS_update (h->stats,
814 gettext_noop ("# Results received"),
818 LOG (GNUNET_ERROR_TYPE_DEBUG,
819 "Received result %llu with type %u and size %u with key %s\n",
820 (unsigned long long) GNUNET_ntohll (dm->uid),
823 GNUNET_h2s (&dm->key));
825 free_queue_entry (qe);
826 h->retry_time = GNUNET_TIME_UNIT_ZERO;
829 rc.proc (rc.proc_cls,
834 ntohl (dm->priority),
835 ntohl (dm->anonymity),
836 ntohl (dm->replication),
837 GNUNET_TIME_absolute_ntoh (dm->expiration),
838 GNUNET_ntohll (dm->uid));
843 * Type of a function to call when we receive a
844 * #GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END message from the service.
846 * @param cls closure with the `struct GNUNET_DATASTORE_Handle *`
847 * @param msg message received
850 handle_data_end (void *cls,
851 const struct GNUNET_MessageHeader *msg)
853 struct GNUNET_DATASTORE_Handle *h = cls;
854 struct GNUNET_DATASTORE_QueueEntry *qe;
855 struct ResultContext rc;
857 if (h->skip_next_messages > 0)
859 h->skip_next_messages--;
876 if (GNUNET_MESSAGE_TYPE_DATASTORE_DATA != qe->response_type)
883 free_queue_entry (qe);
884 LOG (GNUNET_ERROR_TYPE_DEBUG,
885 "Received end of result set, new queue size is %u\n",
887 h->retry_time = GNUNET_TIME_UNIT_ZERO;
890 /* signal end of iteration */
892 rc.proc (rc.proc_cls,
900 GNUNET_TIME_UNIT_ZERO_ABS,
906 * Try reconnecting to the datastore service.
908 * @param cls the `struct GNUNET_DATASTORE_Handle`
911 try_reconnect (void *cls)
913 struct GNUNET_DATASTORE_Handle *h = cls;
914 struct GNUNET_MQ_MessageHandler handlers[] = {
915 GNUNET_MQ_hd_var_size (status,
916 GNUNET_MESSAGE_TYPE_DATASTORE_STATUS,
917 struct StatusMessage,
919 GNUNET_MQ_hd_var_size (data,
920 GNUNET_MESSAGE_TYPE_DATASTORE_DATA,
923 GNUNET_MQ_hd_fixed_size (data_end,
924 GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END,
925 struct GNUNET_MessageHeader,
927 GNUNET_MQ_handler_end ()
930 h->retry_time = GNUNET_TIME_STD_BACKOFF (h->retry_time);
931 h->reconnect_task = NULL;
932 GNUNET_assert (NULL == h->mq);
933 h->mq = GNUNET_CLIENT_connect (h->cfg,
940 GNUNET_STATISTICS_update (h->stats,
941 gettext_noop ("# datastore connections (re)created"),
944 LOG (GNUNET_ERROR_TYPE_DEBUG,
945 "Reconnected to DATASTORE\n");
951 * Dummy continuation used to do nothing (but be non-zero).
954 * @param result result
955 * @param min_expiration expiration time
956 * @param emsg error message
959 drop_status_cont (void *cls,
961 struct GNUNET_TIME_Absolute min_expiration,
969 * Store an item in the datastore. If the item is already present,
970 * the priorities are summed up and the higher expiration time and
971 * lower anonymity level is used.
973 * @param h handle to the datastore
974 * @param rid reservation ID to use (from "reserve"); use 0 if no
975 * prior reservation was made
976 * @param key key for the value
977 * @param size number of bytes in data
978 * @param data content stored
979 * @param type type of the content
980 * @param priority priority of the content
981 * @param anonymity anonymity-level for the content
982 * @param replication how often should the content be replicated to other peers?
983 * @param expiration expiration time for the content
984 * @param queue_priority ranking of this request in the priority queue
985 * @param max_queue_size at what queue size should this request be dropped
986 * (if other requests of higher priority are in the queue)
987 * @param cont continuation to call when done
988 * @param cont_cls closure for @a cont
989 * @return NULL if the entry was not queued, otherwise a handle that can be used to
990 * cancel; note that even if NULL is returned, the callback will be invoked
991 * (or rather, will already have been invoked)
993 struct GNUNET_DATASTORE_QueueEntry *
994 GNUNET_DATASTORE_put (struct GNUNET_DATASTORE_Handle *h,
996 const struct GNUNET_HashCode *key,
999 enum GNUNET_BLOCK_Type type,
1002 uint32_t replication,
1003 struct GNUNET_TIME_Absolute expiration,
1004 unsigned int queue_priority,
1005 unsigned int max_queue_size,
1006 GNUNET_DATASTORE_ContinuationWithStatus cont,
1009 struct GNUNET_DATASTORE_QueueEntry *qe;
1010 struct GNUNET_MQ_Envelope *env;
1011 struct DataMessage *dm;
1012 union QueueContext qc;
1014 if (size + sizeof (*dm) >= GNUNET_MAX_MESSAGE_SIZE)
1020 LOG (GNUNET_ERROR_TYPE_DEBUG,
1021 "Asked to put %u bytes of data under key `%s' for %s\n",
1024 GNUNET_STRINGS_relative_time_to_string (GNUNET_TIME_absolute_get_remaining (expiration),
1026 env = GNUNET_MQ_msg_extra (dm,
1028 GNUNET_MESSAGE_TYPE_DATASTORE_PUT);
1029 dm->rid = htonl (rid);
1030 dm->size = htonl ((uint32_t) size);
1031 dm->type = htonl (type);
1032 dm->priority = htonl (priority);
1033 dm->anonymity = htonl (anonymity);
1034 dm->replication = htonl (replication);
1035 dm->expiration = GNUNET_TIME_absolute_hton (expiration);
1037 GNUNET_memcpy (&dm[1],
1041 qc.sc.cont_cls = cont_cls;
1042 qe = make_queue_entry (h,
1046 GNUNET_MESSAGE_TYPE_DATASTORE_STATUS,
1050 LOG (GNUNET_ERROR_TYPE_DEBUG,
1051 "Could not create queue entry for PUT\n");
1054 GNUNET_STATISTICS_update (h->stats,
1055 gettext_noop ("# PUT requests executed"),
1064 * Reserve space in the datastore. This function should be used
1065 * to avoid "out of space" failures during a longer sequence of "put"
1066 * operations (for example, when a file is being inserted).
1068 * @param h handle to the datastore
1069 * @param amount how much space (in bytes) should be reserved (for content only)
1070 * @param entries how many entries will be created (to calculate per-entry overhead)
1071 * @param cont continuation to call when done; "success" will be set to
1072 * a positive reservation value if space could be reserved.
1073 * @param cont_cls closure for @a cont
1074 * @return NULL if the entry was not queued, otherwise a handle that can be used to
1075 * cancel; note that even if NULL is returned, the callback will be invoked
1076 * (or rather, will already have been invoked)
1078 struct GNUNET_DATASTORE_QueueEntry *
1079 GNUNET_DATASTORE_reserve (struct GNUNET_DATASTORE_Handle *h,
1082 GNUNET_DATASTORE_ContinuationWithStatus cont,
1085 struct GNUNET_DATASTORE_QueueEntry *qe;
1086 struct GNUNET_MQ_Envelope *env;
1087 struct ReserveMessage *rm;
1088 union QueueContext qc;
1091 cont = &drop_status_cont;
1092 LOG (GNUNET_ERROR_TYPE_DEBUG,
1093 "Asked to reserve %llu bytes of data and %u entries\n",
1094 (unsigned long long) amount,
1095 (unsigned int) entries);
1096 env = GNUNET_MQ_msg (rm,
1097 GNUNET_MESSAGE_TYPE_DATASTORE_RESERVE);
1098 rm->entries = htonl (entries);
1099 rm->amount = GNUNET_htonll (amount);
1102 qc.sc.cont_cls = cont_cls;
1103 qe = make_queue_entry (h,
1107 GNUNET_MESSAGE_TYPE_DATASTORE_STATUS,
1111 LOG (GNUNET_ERROR_TYPE_DEBUG,
1112 "Could not create queue entry to reserve\n");
1115 GNUNET_STATISTICS_update (h->stats,
1116 gettext_noop ("# RESERVE requests executed"),
1125 * Signal that all of the data for which a reservation was made has
1126 * been stored and that whatever excess space might have been reserved
1127 * can now be released.
1129 * @param h handle to the datastore
1130 * @param rid reservation ID (value of "success" in original continuation
1131 * from the "reserve" function).
1132 * @param queue_priority ranking of this request in the priority queue
1133 * @param max_queue_size at what queue size should this request be dropped
1134 * (if other requests of higher priority are in the queue)
1135 * @param queue_priority ranking of this request in the priority queue
1136 * @param max_queue_size at what queue size should this request be dropped
1137 * (if other requests of higher priority are in the queue)
1138 * @param cont continuation to call when done
1139 * @param cont_cls closure for @a cont
1140 * @return NULL if the entry was not queued, otherwise a handle that can be used to
1141 * cancel; note that even if NULL is returned, the callback will be invoked
1142 * (or rather, will already have been invoked)
1144 struct GNUNET_DATASTORE_QueueEntry *
1145 GNUNET_DATASTORE_release_reserve (struct GNUNET_DATASTORE_Handle *h,
1147 unsigned int queue_priority,
1148 unsigned int max_queue_size,
1149 GNUNET_DATASTORE_ContinuationWithStatus cont,
1152 struct GNUNET_DATASTORE_QueueEntry *qe;
1153 struct GNUNET_MQ_Envelope *env;
1154 struct ReleaseReserveMessage *rrm;
1155 union QueueContext qc;
1158 cont = &drop_status_cont;
1159 LOG (GNUNET_ERROR_TYPE_DEBUG,
1160 "Asked to release reserve %d\n",
1162 env = GNUNET_MQ_msg (rrm,
1163 GNUNET_MESSAGE_TYPE_DATASTORE_RELEASE_RESERVE);
1164 rrm->rid = htonl (rid);
1166 qc.sc.cont_cls = cont_cls;
1167 qe = make_queue_entry (h,
1171 GNUNET_MESSAGE_TYPE_DATASTORE_STATUS,
1175 LOG (GNUNET_ERROR_TYPE_DEBUG,
1176 "Could not create queue entry to release reserve\n");
1179 GNUNET_STATISTICS_update (h->stats,
1181 ("# RELEASE RESERVE requests executed"), 1,
1189 * Explicitly remove some content from the database.
1190 * The @a cont continuation will be called with `status`
1191 * #GNUNET_OK" if content was removed, #GNUNET_NO
1192 * if no matching entry was found and #GNUNET_SYSERR
1193 * on all other types of errors.
1195 * @param h handle to the datastore
1196 * @param key key for the value
1197 * @param size number of bytes in data
1198 * @param data content stored
1199 * @param queue_priority ranking of this request in the priority queue
1200 * @param max_queue_size at what queue size should this request be dropped
1201 * (if other requests of higher priority are in the queue)
1202 * @param cont continuation to call when done
1203 * @param cont_cls closure for @a cont
1204 * @return NULL if the entry was not queued, otherwise a handle that can be used to
1205 * cancel; note that even if NULL is returned, the callback will be invoked
1206 * (or rather, will already have been invoked)
1208 struct GNUNET_DATASTORE_QueueEntry *
1209 GNUNET_DATASTORE_remove (struct GNUNET_DATASTORE_Handle *h,
1210 const struct GNUNET_HashCode *key,
1213 unsigned int queue_priority,
1214 unsigned int max_queue_size,
1215 GNUNET_DATASTORE_ContinuationWithStatus cont,
1218 struct GNUNET_DATASTORE_QueueEntry *qe;
1219 struct DataMessage *dm;
1220 struct GNUNET_MQ_Envelope *env;
1221 union QueueContext qc;
1223 if (sizeof (*dm) + size >= GNUNET_MAX_MESSAGE_SIZE)
1229 cont = &drop_status_cont;
1230 LOG (GNUNET_ERROR_TYPE_DEBUG,
1231 "Asked to remove %u bytes under key `%s'\n",
1234 env = GNUNET_MQ_msg_extra (dm,
1236 GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE);
1237 dm->size = htonl (size);
1239 GNUNET_memcpy (&dm[1],
1244 qc.sc.cont_cls = cont_cls;
1246 qe = make_queue_entry (h,
1250 GNUNET_MESSAGE_TYPE_DATASTORE_STATUS,
1254 LOG (GNUNET_ERROR_TYPE_DEBUG,
1255 "Could not create queue entry for REMOVE\n");
1258 GNUNET_STATISTICS_update (h->stats,
1259 gettext_noop ("# REMOVE requests executed"),
1269 * Get a random value from the datastore for content replication.
1270 * Returns a single, random value among those with the highest
1271 * replication score, lowering positive replication scores by one for
1272 * the chosen value (if only content with a replication score exists,
1273 * a random value is returned and replication scores are not changed).
1275 * @param h handle to the datastore
1276 * @param queue_priority ranking of this request in the priority queue
1277 * @param max_queue_size at what queue size should this request be dropped
1278 * (if other requests of higher priority are in the queue)
1279 * @param proc function to call on a random value; it
1280 * will be called once with a value (if available)
1281 * and always once with a value of NULL.
1282 * @param proc_cls closure for @a proc
1283 * @return NULL if the entry was not queued, otherwise a handle that can be used to
1286 struct GNUNET_DATASTORE_QueueEntry *
1287 GNUNET_DATASTORE_get_for_replication (struct GNUNET_DATASTORE_Handle *h,
1288 unsigned int queue_priority,
1289 unsigned int max_queue_size,
1290 GNUNET_DATASTORE_DatumProcessor proc,
1293 struct GNUNET_DATASTORE_QueueEntry *qe;
1294 struct GNUNET_MQ_Envelope *env;
1295 struct GNUNET_MessageHeader *m;
1296 union QueueContext qc;
1298 GNUNET_assert (NULL != proc);
1299 LOG (GNUNET_ERROR_TYPE_DEBUG,
1300 "Asked to get replication entry\n");
1301 env = GNUNET_MQ_msg (m,
1302 GNUNET_MESSAGE_TYPE_DATASTORE_GET_REPLICATION);
1304 qc.rc.proc_cls = proc_cls;
1305 qe = make_queue_entry (h,
1309 GNUNET_MESSAGE_TYPE_DATASTORE_DATA,
1313 LOG (GNUNET_ERROR_TYPE_DEBUG,
1314 "Could not create queue entry for GET REPLICATION\n");
1317 GNUNET_STATISTICS_update (h->stats,
1319 ("# GET REPLICATION requests executed"), 1,
1327 * Get a single zero-anonymity value from the datastore.
1329 * @param h handle to the datastore
1330 * @param next_uid return the result with lowest uid >= next_uid
1331 * @param queue_priority ranking of this request in the priority queue
1332 * @param max_queue_size at what queue size should this request be dropped
1333 * (if other requests of higher priority are in the queue)
1334 * @param type allowed type for the operation (never zero)
1335 * @param proc function to call on a random value; it
1336 * will be called once with a value (if available)
1337 * or with NULL if none value exists.
1338 * @param proc_cls closure for @a proc
1339 * @return NULL if the entry was not queued, otherwise a handle that can be used to
1342 struct GNUNET_DATASTORE_QueueEntry *
1343 GNUNET_DATASTORE_get_zero_anonymity (struct GNUNET_DATASTORE_Handle *h,
1345 unsigned int queue_priority,
1346 unsigned int max_queue_size,
1347 enum GNUNET_BLOCK_Type type,
1348 GNUNET_DATASTORE_DatumProcessor proc,
1351 struct GNUNET_DATASTORE_QueueEntry *qe;
1352 struct GNUNET_MQ_Envelope *env;
1353 struct GetZeroAnonymityMessage *m;
1354 union QueueContext qc;
1356 GNUNET_assert (NULL != proc);
1357 GNUNET_assert (type != GNUNET_BLOCK_TYPE_ANY);
1358 LOG (GNUNET_ERROR_TYPE_DEBUG,
1359 "Asked to get a zero-anonymity entry of type %d\n",
1361 env = GNUNET_MQ_msg (m,
1362 GNUNET_MESSAGE_TYPE_DATASTORE_GET_ZERO_ANONYMITY);
1363 m->type = htonl ((uint32_t) type);
1364 m->next_uid = GNUNET_htonll (next_uid);
1366 qc.rc.proc_cls = proc_cls;
1367 qe = make_queue_entry (h,
1371 GNUNET_MESSAGE_TYPE_DATASTORE_DATA,
1375 LOG (GNUNET_ERROR_TYPE_DEBUG,
1376 "Could not create queue entry for zero-anonymity procation\n");
1379 GNUNET_STATISTICS_update (h->stats,
1381 ("# GET ZERO ANONYMITY requests executed"), 1,
1389 * Get a result for a particular key from the datastore. The processor
1390 * will only be called once.
1392 * @param h handle to the datastore
1393 * @param next_uid return the result with lowest uid >= next_uid
1394 * @param random if true, return a random result instead of using next_uid
1395 * @param key maybe NULL (to match all entries)
1396 * @param type desired type, 0 for any
1397 * @param queue_priority ranking of this request in the priority queue
1398 * @param max_queue_size at what queue size should this request be dropped
1399 * (if other requests of higher priority are in the queue)
1400 * @param proc function to call on each matching value;
1401 * will be called once with a NULL value at the end
1402 * @param proc_cls closure for @a proc
1403 * @return NULL if the entry was not queued, otherwise a handle that can be used to
1406 struct GNUNET_DATASTORE_QueueEntry *
1407 GNUNET_DATASTORE_get_key (struct GNUNET_DATASTORE_Handle *h,
1410 const struct GNUNET_HashCode *key,
1411 enum GNUNET_BLOCK_Type type,
1412 unsigned int queue_priority,
1413 unsigned int max_queue_size,
1414 GNUNET_DATASTORE_DatumProcessor proc,
1417 struct GNUNET_DATASTORE_QueueEntry *qe;
1418 struct GNUNET_MQ_Envelope *env;
1419 struct GetKeyMessage *gkm;
1420 struct GetMessage *gm;
1421 union QueueContext qc;
1423 GNUNET_assert (NULL != proc);
1424 LOG (GNUNET_ERROR_TYPE_DEBUG,
1425 "Asked to look for data of type %u under key `%s'\n",
1426 (unsigned int) type,
1430 env = GNUNET_MQ_msg (gm,
1431 GNUNET_MESSAGE_TYPE_DATASTORE_GET);
1432 gm->type = htonl (type);
1433 gm->next_uid = GNUNET_htonll (next_uid);
1434 gm->random = random;
1438 env = GNUNET_MQ_msg (gkm,
1439 GNUNET_MESSAGE_TYPE_DATASTORE_GET_KEY);
1440 gkm->type = htonl (type);
1441 gkm->next_uid = GNUNET_htonll (next_uid);
1442 gkm->random = random;
1446 qc.rc.proc_cls = proc_cls;
1447 qe = make_queue_entry (h,
1451 GNUNET_MESSAGE_TYPE_DATASTORE_DATA,
1455 LOG (GNUNET_ERROR_TYPE_DEBUG,
1456 "Could not queue request for `%s'\n",
1460 #if INSANE_STATISTICS
1461 GNUNET_STATISTICS_update (h->stats,
1462 gettext_noop ("# GET requests executed"),
1472 * Cancel a datastore operation. The final callback from the
1473 * operation must not have been done yet.
1475 * @param qe operation to cancel
1478 GNUNET_DATASTORE_cancel (struct GNUNET_DATASTORE_QueueEntry *qe)
1480 struct GNUNET_DATASTORE_Handle *h = qe->h;
1482 LOG (GNUNET_ERROR_TYPE_DEBUG,
1483 "Pending DATASTORE request %p cancelled (%d, %d)\n",
1486 h->queue_head == qe);
1487 if (NULL == qe->env)
1489 free_queue_entry (qe);
1490 h->skip_next_messages++;
1493 free_queue_entry (qe);
1498 /* end of datastore_api.c */