2 This file is part of GNUnet
3 Copyright (C) 2004-2013, 2016 GNUnet e.V.
5 GNUnet is free software: you can redistribute it and/or modify it
6 under the terms of the GNU Affero General Public License as published
7 by the Free Software Foundation, either version 3 of the License,
8 or (at your option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details.
15 You should have received a copy of the GNU Affero General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
20 * @file datastore/datastore_api.c
21 * @brief Management for the datastore for files stored on a GNUnet node. Implements
22 * a priority queue for requests
23 * @author Christian Grothoff
26 #include "gnunet_arm_service.h"
27 #include "gnunet_constants.h"
28 #include "gnunet_datastore_service.h"
29 #include "gnunet_statistics_service.h"
30 #include "datastore.h"
32 #define LOG(kind,...) GNUNET_log_from (kind, "datastore-api",__VA_ARGS__)
34 #define DELAY_WARN_TIMEOUT GNUNET_TIME_UNIT_MINUTES
37 * Collect an instane number of statistics? May cause excessive IPC.
39 #define INSANE_STATISTICS GNUNET_NO
42 * If a client stopped asking for more results, how many more do
43 * we receive from the DB before killing the connection? Trade-off
44 * between re-doing TCP handshakes and (needlessly) receiving
47 #define MAX_EXCESS_RESULTS 8
50 * Context for processing status messages.
55 * Continuation to call with the status.
57 GNUNET_DATASTORE_ContinuationWithStatus cont;
60 * Closure for @e cont.
68 * Context for processing result messages.
73 * Function to call with the result.
75 GNUNET_DATASTORE_DatumProcessor proc;
78 * Closure for @e proc.
86 * Context for a queue operation.
91 struct StatusContext sc;
93 struct ResultContext rc;
99 * Entry in our priority queue.
101 struct GNUNET_DATASTORE_QueueEntry
105 * This is a linked list.
107 struct GNUNET_DATASTORE_QueueEntry *next;
110 * This is a linked list.
112 struct GNUNET_DATASTORE_QueueEntry *prev;
115 * Handle to the master context.
117 struct GNUNET_DATASTORE_Handle *h;
120 * Function to call after transmission of the request.
122 GNUNET_DATASTORE_ContinuationWithStatus cont;
125 * Closure for @e cont.
130 * Context for the operation.
132 union QueueContext qc;
135 * Envelope of the request to transmit, NULL after
138 struct GNUNET_MQ_Envelope *env;
141 * Task we run if this entry stalls the queue and we
142 * need to warn the user.
144 struct GNUNET_SCHEDULER_Task *delay_warn_task;
147 * Priority in the queue.
149 unsigned int priority;
152 * Maximum allowed length of queue (otherwise
153 * this request should be discarded).
155 unsigned int max_queue;
158 * Expected response type.
160 uint16_t response_type;
166 * Handle to the datastore service.
168 struct GNUNET_DATASTORE_Handle
174 const struct GNUNET_CONFIGURATION_Handle *cfg;
177 * Current connection to the datastore service.
179 struct GNUNET_MQ_Handle *mq;
182 * Handle for statistics.
184 struct GNUNET_STATISTICS_Handle *stats;
187 * Current head of priority queue.
189 struct GNUNET_DATASTORE_QueueEntry *queue_head;
192 * Current tail of priority queue.
194 struct GNUNET_DATASTORE_QueueEntry *queue_tail;
197 * Task for trying to reconnect.
199 struct GNUNET_SCHEDULER_Task *reconnect_task;
202 * How quickly should we retry? Used for exponential back-off on
205 struct GNUNET_TIME_Relative retry_time;
208 * Number of entries in the queue.
210 unsigned int queue_size;
213 * Number of results we're receiving for the current query
214 * after application stopped to care. Used to determine when
215 * to reset the connection.
217 unsigned int result_count;
220 * We should ignore the next message(s) from the service.
222 unsigned int skip_next_messages;
228 * Try reconnecting to the datastore service.
230 * @param cls the `struct GNUNET_DATASTORE_Handle`
233 try_reconnect (void *cls);
237 * Disconnect from the service and then try reconnecting to the datastore service
240 * @param h handle to datastore to disconnect and reconnect
243 do_disconnect (struct GNUNET_DATASTORE_Handle *h)
250 GNUNET_MQ_destroy (h->mq);
252 h->skip_next_messages = 0;
254 = GNUNET_SCHEDULER_add_delayed (h->retry_time,
261 * Free a queue entry. Removes the given entry from the
262 * queue and releases associated resources. Does NOT
265 * @param qe entry to free.
268 free_queue_entry (struct GNUNET_DATASTORE_QueueEntry *qe)
270 struct GNUNET_DATASTORE_Handle *h = qe->h;
272 GNUNET_CONTAINER_DLL_remove (h->queue_head,
277 GNUNET_MQ_discard (qe->env);
278 if (NULL != qe->delay_warn_task)
279 GNUNET_SCHEDULER_cancel (qe->delay_warn_task);
285 * Task that logs an error after some time.
287 * @param qe `struct GNUNET_DATASTORE_QueueEntry` about which the error is
290 delay_warning (void *cls)
292 struct GNUNET_DATASTORE_QueueEntry *qe = cls;
294 qe->delay_warn_task = NULL;
295 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
296 "Request %p of type %u at head of datastore queue for more than %s\n",
298 (unsigned int) qe->response_type,
299 GNUNET_STRINGS_relative_time_to_string (DELAY_WARN_TIMEOUT,
301 qe->delay_warn_task = GNUNET_SCHEDULER_add_delayed (DELAY_WARN_TIMEOUT,
308 * Handle error in sending drop request to datastore.
310 * @param cls closure with the datastore handle
311 * @param error error code
314 mq_error_handler (void *cls,
315 enum GNUNET_MQ_Error error)
317 struct GNUNET_DATASTORE_Handle *h = cls;
318 struct GNUNET_DATASTORE_QueueEntry *qe;
320 LOG (GNUNET_ERROR_TYPE_DEBUG,
321 "MQ error, reconnecting to DATASTORE\n");
326 if (NULL != qe->delay_warn_task)
328 GNUNET_SCHEDULER_cancel (qe->delay_warn_task);
329 qe->delay_warn_task = NULL;
333 union QueueContext qc = qe->qc;
334 uint16_t rt = qe->response_type;
336 LOG (GNUNET_ERROR_TYPE_DEBUG,
337 "Failed to receive response from database.\n");
338 free_queue_entry (qe);
341 case GNUNET_MESSAGE_TYPE_DATASTORE_STATUS:
342 if (NULL != qc.sc.cont)
343 qc.sc.cont (qc.sc.cont_cls,
345 GNUNET_TIME_UNIT_ZERO_ABS,
346 _("DATASTORE disconnected"));
348 case GNUNET_MESSAGE_TYPE_DATASTORE_DATA:
349 if (NULL != qc.rc.proc)
350 qc.rc.proc (qc.rc.proc_cls,
358 GNUNET_TIME_UNIT_ZERO_ABS,
369 * Connect to the datastore service.
371 * @param cfg configuration to use
372 * @return handle to use to access the service
374 struct GNUNET_DATASTORE_Handle *
375 GNUNET_DATASTORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg)
377 struct GNUNET_DATASTORE_Handle *h;
379 LOG (GNUNET_ERROR_TYPE_DEBUG,
380 "Establishing DATASTORE connection!\n");
381 h = GNUNET_new (struct GNUNET_DATASTORE_Handle);
389 h->stats = GNUNET_STATISTICS_create ("datastore-api",
396 * Task used by to disconnect from the datastore after
397 * we send the #GNUNET_MESSAGE_TYPE_DATASTORE_DROP message.
399 * @param cls the datastore handle
402 disconnect_after_drop (void *cls)
404 struct GNUNET_DATASTORE_Handle *h = cls;
406 LOG (GNUNET_ERROR_TYPE_DEBUG,
407 "Drop sent, disconnecting\n");
408 GNUNET_DATASTORE_disconnect (h,
414 * Handle error in sending drop request to datastore.
416 * @param cls closure with the datastore handle
417 * @param error error code
420 disconnect_on_mq_error (void *cls,
421 enum GNUNET_MQ_Error error)
423 struct GNUNET_DATASTORE_Handle *h = cls;
425 LOG (GNUNET_ERROR_TYPE_ERROR,
426 "Failed to ask datastore to drop tables\n");
427 GNUNET_DATASTORE_disconnect (h,
433 * Disconnect from the datastore service (and free
434 * associated resources).
436 * @param h handle to the datastore
437 * @param drop set to #GNUNET_YES to delete all data in datastore (!)
440 GNUNET_DATASTORE_disconnect (struct GNUNET_DATASTORE_Handle *h,
443 struct GNUNET_DATASTORE_QueueEntry *qe;
445 LOG (GNUNET_ERROR_TYPE_DEBUG,
446 "Datastore disconnect\n");
449 GNUNET_MQ_destroy (h->mq);
452 if (NULL != h->reconnect_task)
454 GNUNET_SCHEDULER_cancel (h->reconnect_task);
455 h->reconnect_task = NULL;
457 while (NULL != (qe = h->queue_head))
459 switch (qe->response_type)
461 case GNUNET_MESSAGE_TYPE_DATASTORE_STATUS:
462 if (NULL != qe->qc.sc.cont)
463 qe->qc.sc.cont (qe->qc.sc.cont_cls,
465 GNUNET_TIME_UNIT_ZERO_ABS,
466 _("Disconnected from DATASTORE"));
468 case GNUNET_MESSAGE_TYPE_DATASTORE_DATA:
469 if (NULL != qe->qc.rc.proc)
470 qe->qc.rc.proc (qe->qc.rc.proc_cls,
478 GNUNET_TIME_UNIT_ZERO_ABS,
484 free_queue_entry (qe);
486 if (GNUNET_YES == drop)
488 LOG (GNUNET_ERROR_TYPE_DEBUG,
489 "Re-connecting to issue DROP!\n");
490 GNUNET_assert (NULL == h->mq);
491 h->mq = GNUNET_CLIENT_connect (h->cfg,
494 &disconnect_on_mq_error,
498 struct GNUNET_MessageHeader *hdr;
499 struct GNUNET_MQ_Envelope *env;
501 env = GNUNET_MQ_msg (hdr,
502 GNUNET_MESSAGE_TYPE_DATASTORE_DROP);
503 GNUNET_MQ_notify_sent (env,
504 &disconnect_after_drop,
506 GNUNET_MQ_send (h->mq,
512 GNUNET_STATISTICS_destroy (h->stats,
520 * Create a new entry for our priority queue (and possibly discard other entires if
521 * the queue is getting too long).
523 * @param h handle to the datastore
524 * @param env envelope with the message to queue
525 * @param queue_priority priority of the entry
526 * @param max_queue_size at what queue size should this request be dropped
527 * (if other requests of higher priority are in the queue)
528 * @param expected_type which type of response do we expect,
529 * #GNUNET_MESSAGE_TYPE_DATASTORE_STATUS or
530 * #GNUNET_MESSAGE_TYPE_DATASTORE_DATA
531 * @param qc client context (NOT a closure for @a response_proc)
532 * @return NULL if the queue is full
534 static struct GNUNET_DATASTORE_QueueEntry *
535 make_queue_entry (struct GNUNET_DATASTORE_Handle *h,
536 struct GNUNET_MQ_Envelope *env,
537 unsigned int queue_priority,
538 unsigned int max_queue_size,
539 uint16_t expected_type,
540 const union QueueContext *qc)
542 struct GNUNET_DATASTORE_QueueEntry *qe;
543 struct GNUNET_DATASTORE_QueueEntry *pos;
546 if ( (NULL != h->queue_tail) &&
547 (h->queue_tail->priority >= queue_priority) )
557 while ( (NULL != pos) &&
558 (c < max_queue_size) &&
559 (pos->priority >= queue_priority) )
564 if (c >= max_queue_size)
566 GNUNET_STATISTICS_update (h->stats,
567 gettext_noop ("# queue overflows"),
570 GNUNET_MQ_discard (env);
573 qe = GNUNET_new (struct GNUNET_DATASTORE_QueueEntry);
576 qe->response_type = expected_type;
578 qe->priority = queue_priority;
579 qe->max_queue = max_queue_size;
582 /* append at the tail */
588 /* do not insert at HEAD if HEAD query was already
589 * transmitted and we are still receiving replies! */
590 if ( (NULL == pos) &&
591 (NULL == h->queue_head->env) )
595 #if INSANE_STATISTICS
596 GNUNET_STATISTICS_update (h->stats,
597 gettext_noop ("# queue entries created"),
601 GNUNET_CONTAINER_DLL_insert_after (h->queue_head,
611 * Process entries in the queue (or do nothing if we are already
614 * @param h handle to the datastore
617 process_queue (struct GNUNET_DATASTORE_Handle *h)
619 struct GNUNET_DATASTORE_QueueEntry *qe;
621 if (NULL == (qe = h->queue_head))
623 /* no entry in queue */
624 LOG (GNUNET_ERROR_TYPE_DEBUG,
630 /* waiting for replies */
631 LOG (GNUNET_ERROR_TYPE_DEBUG,
632 "Head request already transmitted\n");
637 /* waiting for reconnect */
638 LOG (GNUNET_ERROR_TYPE_DEBUG,
642 GNUNET_assert (NULL == qe->delay_warn_task);
643 qe->delay_warn_task = GNUNET_SCHEDULER_add_delayed (DELAY_WARN_TIMEOUT,
646 GNUNET_MQ_send (h->mq,
653 * Get the entry at the head of the message queue.
655 * @param h handle to the datastore
656 * @param response_type the expected response type
657 * @return the queue entry
659 static struct GNUNET_DATASTORE_QueueEntry *
660 get_queue_head (struct GNUNET_DATASTORE_Handle *h,
661 uint16_t response_type)
663 struct GNUNET_DATASTORE_QueueEntry *qe;
665 if (h->skip_next_messages > 0)
667 h->skip_next_messages--;
684 if (response_type != qe->response_type)
695 * Function called to check status message from the service.
698 * @param sm status message received
699 * @return #GNUNET_OK if the message is well-formed
702 check_status (void *cls,
703 const struct StatusMessage *sm)
705 uint16_t msize = ntohs (sm->header.size) - sizeof (*sm);
706 int32_t status = ntohl (sm->status);
710 const char *emsg = (const char *) &sm[1];
712 if ('\0' != emsg[msize - 1])
715 return GNUNET_SYSERR;
718 else if (GNUNET_SYSERR == status)
721 return GNUNET_SYSERR;
728 * Function called to handle status message from the service.
731 * @param sm status message received
734 handle_status (void *cls,
735 const struct StatusMessage *sm)
737 struct GNUNET_DATASTORE_Handle *h = cls;
738 struct GNUNET_DATASTORE_QueueEntry *qe;
739 struct StatusContext rc;
741 int32_t status = ntohl (sm->status);
743 qe = get_queue_head (h,
744 GNUNET_MESSAGE_TYPE_DATASTORE_STATUS);
748 free_queue_entry (qe);
749 if (ntohs (sm->header.size) > sizeof (struct StatusMessage))
750 emsg = (const char *) &sm[1];
753 LOG (GNUNET_ERROR_TYPE_DEBUG,
754 "Received status %d/%s\n",
757 GNUNET_STATISTICS_update (h->stats,
758 gettext_noop ("# status messages received"),
761 h->retry_time = GNUNET_TIME_UNIT_ZERO;
764 rc.cont (rc.cont_cls,
766 GNUNET_TIME_absolute_ntoh (sm->min_expiration),
772 * Check data message we received from the service.
774 * @param cls closure with the `struct GNUNET_DATASTORE_Handle *`
775 * @param dm message received
778 check_data (void *cls,
779 const struct DataMessage *dm)
781 uint16_t msize = ntohs (dm->header.size) - sizeof (*dm);
783 if (msize != ntohl (dm->size))
786 return GNUNET_SYSERR;
793 * Handle data message we got from the service.
795 * @param cls closure with the `struct GNUNET_DATASTORE_Handle *`
796 * @param dm message received
799 handle_data (void *cls,
800 const struct DataMessage *dm)
802 struct GNUNET_DATASTORE_Handle *h = cls;
803 struct GNUNET_DATASTORE_QueueEntry *qe;
804 struct ResultContext rc;
806 qe = get_queue_head (h,
807 GNUNET_MESSAGE_TYPE_DATASTORE_DATA);
810 #if INSANE_STATISTICS
811 GNUNET_STATISTICS_update (h->stats,
812 gettext_noop ("# Results received"),
816 LOG (GNUNET_ERROR_TYPE_DEBUG,
817 "Received result %llu with type %u and size %u with key %s\n",
818 (unsigned long long) GNUNET_ntohll (dm->uid),
821 GNUNET_h2s (&dm->key));
823 free_queue_entry (qe);
824 h->retry_time = GNUNET_TIME_UNIT_ZERO;
827 rc.proc (rc.proc_cls,
832 ntohl (dm->priority),
833 ntohl (dm->anonymity),
834 ntohl (dm->replication),
835 GNUNET_TIME_absolute_ntoh (dm->expiration),
836 GNUNET_ntohll (dm->uid));
841 * Type of a function to call when we receive a
842 * #GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END message from the service.
844 * @param cls closure with the `struct GNUNET_DATASTORE_Handle *`
845 * @param msg message received
848 handle_data_end (void *cls,
849 const struct GNUNET_MessageHeader *msg)
851 struct GNUNET_DATASTORE_Handle *h = cls;
852 struct GNUNET_DATASTORE_QueueEntry *qe;
853 struct ResultContext rc;
855 qe = get_queue_head (h,
856 GNUNET_MESSAGE_TYPE_DATASTORE_DATA);
860 free_queue_entry (qe);
861 LOG (GNUNET_ERROR_TYPE_DEBUG,
862 "Received end of result set, new queue size is %u\n",
864 h->retry_time = GNUNET_TIME_UNIT_ZERO;
867 /* signal end of iteration */
869 rc.proc (rc.proc_cls,
877 GNUNET_TIME_UNIT_ZERO_ABS,
883 * Try reconnecting to the datastore service.
885 * @param cls the `struct GNUNET_DATASTORE_Handle`
888 try_reconnect (void *cls)
890 struct GNUNET_DATASTORE_Handle *h = cls;
891 struct GNUNET_MQ_MessageHandler handlers[] = {
892 GNUNET_MQ_hd_var_size (status,
893 GNUNET_MESSAGE_TYPE_DATASTORE_STATUS,
894 struct StatusMessage,
896 GNUNET_MQ_hd_var_size (data,
897 GNUNET_MESSAGE_TYPE_DATASTORE_DATA,
900 GNUNET_MQ_hd_fixed_size (data_end,
901 GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END,
902 struct GNUNET_MessageHeader,
904 GNUNET_MQ_handler_end ()
907 h->retry_time = GNUNET_TIME_STD_BACKOFF (h->retry_time);
908 h->reconnect_task = NULL;
909 GNUNET_assert (NULL == h->mq);
910 h->mq = GNUNET_CLIENT_connect (h->cfg,
917 GNUNET_STATISTICS_update (h->stats,
918 gettext_noop ("# datastore connections (re)created"),
921 LOG (GNUNET_ERROR_TYPE_DEBUG,
922 "Reconnected to DATASTORE\n");
928 * Dummy continuation used to do nothing (but be non-zero).
931 * @param result result
932 * @param min_expiration expiration time
933 * @param emsg error message
936 drop_status_cont (void *cls,
938 struct GNUNET_TIME_Absolute min_expiration,
946 * Store an item in the datastore. If the item is already present,
947 * the priorities are summed up and the higher expiration time and
948 * lower anonymity level is used.
950 * @param h handle to the datastore
951 * @param rid reservation ID to use (from "reserve"); use 0 if no
952 * prior reservation was made
953 * @param key key for the value
954 * @param size number of bytes in data
955 * @param data content stored
956 * @param type type of the content
957 * @param priority priority of the content
958 * @param anonymity anonymity-level for the content
959 * @param replication how often should the content be replicated to other peers?
960 * @param expiration expiration time for the content
961 * @param queue_priority ranking of this request in the priority queue
962 * @param max_queue_size at what queue size should this request be dropped
963 * (if other requests of higher priority are in the queue)
964 * @param cont continuation to call when done
965 * @param cont_cls closure for @a cont
966 * @return NULL if the entry was not queued, otherwise a handle that can be used to
967 * cancel; note that even if NULL is returned, the callback will be invoked
968 * (or rather, will already have been invoked)
970 struct GNUNET_DATASTORE_QueueEntry *
971 GNUNET_DATASTORE_put (struct GNUNET_DATASTORE_Handle *h,
973 const struct GNUNET_HashCode *key,
976 enum GNUNET_BLOCK_Type type,
979 uint32_t replication,
980 struct GNUNET_TIME_Absolute expiration,
981 unsigned int queue_priority,
982 unsigned int max_queue_size,
983 GNUNET_DATASTORE_ContinuationWithStatus cont,
986 struct GNUNET_DATASTORE_QueueEntry *qe;
987 struct GNUNET_MQ_Envelope *env;
988 struct DataMessage *dm;
989 union QueueContext qc;
991 if (size + sizeof (*dm) >= GNUNET_MAX_MESSAGE_SIZE)
997 LOG (GNUNET_ERROR_TYPE_DEBUG,
998 "Asked to put %u bytes of data under key `%s' for %s\n",
1001 GNUNET_STRINGS_relative_time_to_string (GNUNET_TIME_absolute_get_remaining (expiration),
1003 env = GNUNET_MQ_msg_extra (dm,
1005 GNUNET_MESSAGE_TYPE_DATASTORE_PUT);
1006 dm->rid = htonl (rid);
1007 dm->size = htonl ((uint32_t) size);
1008 dm->type = htonl (type);
1009 dm->priority = htonl (priority);
1010 dm->anonymity = htonl (anonymity);
1011 dm->replication = htonl (replication);
1012 dm->expiration = GNUNET_TIME_absolute_hton (expiration);
1014 GNUNET_memcpy (&dm[1],
1018 qc.sc.cont_cls = cont_cls;
1019 qe = make_queue_entry (h,
1023 GNUNET_MESSAGE_TYPE_DATASTORE_STATUS,
1027 LOG (GNUNET_ERROR_TYPE_DEBUG,
1028 "Could not create queue entry for PUT\n");
1031 GNUNET_STATISTICS_update (h->stats,
1032 gettext_noop ("# PUT requests executed"),
1041 * Reserve space in the datastore. This function should be used
1042 * to avoid "out of space" failures during a longer sequence of "put"
1043 * operations (for example, when a file is being inserted).
1045 * @param h handle to the datastore
1046 * @param amount how much space (in bytes) should be reserved (for content only)
1047 * @param entries how many entries will be created (to calculate per-entry overhead)
1048 * @param cont continuation to call when done; "success" will be set to
1049 * a positive reservation value if space could be reserved.
1050 * @param cont_cls closure for @a cont
1051 * @return NULL if the entry was not queued, otherwise a handle that can be used to
1052 * cancel; note that even if NULL is returned, the callback will be invoked
1053 * (or rather, will already have been invoked)
1055 struct GNUNET_DATASTORE_QueueEntry *
1056 GNUNET_DATASTORE_reserve (struct GNUNET_DATASTORE_Handle *h,
1059 GNUNET_DATASTORE_ContinuationWithStatus cont,
1062 struct GNUNET_DATASTORE_QueueEntry *qe;
1063 struct GNUNET_MQ_Envelope *env;
1064 struct ReserveMessage *rm;
1065 union QueueContext qc;
1068 cont = &drop_status_cont;
1069 LOG (GNUNET_ERROR_TYPE_DEBUG,
1070 "Asked to reserve %llu bytes of data and %u entries\n",
1071 (unsigned long long) amount,
1072 (unsigned int) entries);
1073 env = GNUNET_MQ_msg (rm,
1074 GNUNET_MESSAGE_TYPE_DATASTORE_RESERVE);
1075 rm->entries = htonl (entries);
1076 rm->amount = GNUNET_htonll (amount);
1079 qc.sc.cont_cls = cont_cls;
1080 qe = make_queue_entry (h,
1084 GNUNET_MESSAGE_TYPE_DATASTORE_STATUS,
1088 LOG (GNUNET_ERROR_TYPE_DEBUG,
1089 "Could not create queue entry to reserve\n");
1092 GNUNET_STATISTICS_update (h->stats,
1093 gettext_noop ("# RESERVE requests executed"),
1102 * Signal that all of the data for which a reservation was made has
1103 * been stored and that whatever excess space might have been reserved
1104 * can now be released.
1106 * @param h handle to the datastore
1107 * @param rid reservation ID (value of "success" in original continuation
1108 * from the "reserve" function).
1109 * @param queue_priority ranking of this request in the priority queue
1110 * @param max_queue_size at what queue size should this request be dropped
1111 * (if other requests of higher priority are in the queue)
1112 * @param queue_priority ranking of this request in the priority queue
1113 * @param max_queue_size at what queue size should this request be dropped
1114 * (if other requests of higher priority are in the queue)
1115 * @param cont continuation to call when done
1116 * @param cont_cls closure for @a cont
1117 * @return NULL if the entry was not queued, otherwise a handle that can be used to
1118 * cancel; note that even if NULL is returned, the callback will be invoked
1119 * (or rather, will already have been invoked)
1121 struct GNUNET_DATASTORE_QueueEntry *
1122 GNUNET_DATASTORE_release_reserve (struct GNUNET_DATASTORE_Handle *h,
1124 unsigned int queue_priority,
1125 unsigned int max_queue_size,
1126 GNUNET_DATASTORE_ContinuationWithStatus cont,
1129 struct GNUNET_DATASTORE_QueueEntry *qe;
1130 struct GNUNET_MQ_Envelope *env;
1131 struct ReleaseReserveMessage *rrm;
1132 union QueueContext qc;
1135 cont = &drop_status_cont;
1136 LOG (GNUNET_ERROR_TYPE_DEBUG,
1137 "Asked to release reserve %d\n",
1139 env = GNUNET_MQ_msg (rrm,
1140 GNUNET_MESSAGE_TYPE_DATASTORE_RELEASE_RESERVE);
1141 rrm->rid = htonl (rid);
1143 qc.sc.cont_cls = cont_cls;
1144 qe = make_queue_entry (h,
1148 GNUNET_MESSAGE_TYPE_DATASTORE_STATUS,
1152 LOG (GNUNET_ERROR_TYPE_DEBUG,
1153 "Could not create queue entry to release reserve\n");
1156 GNUNET_STATISTICS_update (h->stats,
1158 ("# RELEASE RESERVE requests executed"), 1,
1166 * Explicitly remove some content from the database.
1167 * The @a cont continuation will be called with `status`
1168 * #GNUNET_OK" if content was removed, #GNUNET_NO
1169 * if no matching entry was found and #GNUNET_SYSERR
1170 * on all other types of errors.
1172 * @param h handle to the datastore
1173 * @param key key for the value
1174 * @param size number of bytes in data
1175 * @param data content stored
1176 * @param queue_priority ranking of this request in the priority queue
1177 * @param max_queue_size at what queue size should this request be dropped
1178 * (if other requests of higher priority are in the queue)
1179 * @param cont continuation to call when done
1180 * @param cont_cls closure for @a cont
1181 * @return NULL if the entry was not queued, otherwise a handle that can be used to
1182 * cancel; note that even if NULL is returned, the callback will be invoked
1183 * (or rather, will already have been invoked)
1185 struct GNUNET_DATASTORE_QueueEntry *
1186 GNUNET_DATASTORE_remove (struct GNUNET_DATASTORE_Handle *h,
1187 const struct GNUNET_HashCode *key,
1190 unsigned int queue_priority,
1191 unsigned int max_queue_size,
1192 GNUNET_DATASTORE_ContinuationWithStatus cont,
1195 struct GNUNET_DATASTORE_QueueEntry *qe;
1196 struct DataMessage *dm;
1197 struct GNUNET_MQ_Envelope *env;
1198 union QueueContext qc;
1200 if (sizeof (*dm) + size >= GNUNET_MAX_MESSAGE_SIZE)
1206 cont = &drop_status_cont;
1207 LOG (GNUNET_ERROR_TYPE_DEBUG,
1208 "Asked to remove %u bytes under key `%s'\n",
1211 env = GNUNET_MQ_msg_extra (dm,
1213 GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE);
1214 dm->size = htonl (size);
1216 GNUNET_memcpy (&dm[1],
1221 qc.sc.cont_cls = cont_cls;
1223 qe = make_queue_entry (h,
1227 GNUNET_MESSAGE_TYPE_DATASTORE_STATUS,
1231 LOG (GNUNET_ERROR_TYPE_DEBUG,
1232 "Could not create queue entry for REMOVE\n");
1235 GNUNET_STATISTICS_update (h->stats,
1236 gettext_noop ("# REMOVE requests executed"),
1246 * Get a random value from the datastore for content replication.
1247 * Returns a single, random value among those with the highest
1248 * replication score, lowering positive replication scores by one for
1249 * the chosen value (if only content with a replication score exists,
1250 * a random value is returned and replication scores are not changed).
1252 * @param h handle to the datastore
1253 * @param queue_priority ranking of this request in the priority queue
1254 * @param max_queue_size at what queue size should this request be dropped
1255 * (if other requests of higher priority are in the queue)
1256 * @param proc function to call on a random value; it
1257 * will be called once with a value (if available)
1258 * and always once with a value of NULL.
1259 * @param proc_cls closure for @a proc
1260 * @return NULL if the entry was not queued, otherwise a handle that can be used to
1263 struct GNUNET_DATASTORE_QueueEntry *
1264 GNUNET_DATASTORE_get_for_replication (struct GNUNET_DATASTORE_Handle *h,
1265 unsigned int queue_priority,
1266 unsigned int max_queue_size,
1267 GNUNET_DATASTORE_DatumProcessor proc,
1270 struct GNUNET_DATASTORE_QueueEntry *qe;
1271 struct GNUNET_MQ_Envelope *env;
1272 struct GNUNET_MessageHeader *m;
1273 union QueueContext qc;
1275 GNUNET_assert (NULL != proc);
1276 LOG (GNUNET_ERROR_TYPE_DEBUG,
1277 "Asked to get replication entry\n");
1278 env = GNUNET_MQ_msg (m,
1279 GNUNET_MESSAGE_TYPE_DATASTORE_GET_REPLICATION);
1281 qc.rc.proc_cls = proc_cls;
1282 qe = make_queue_entry (h,
1286 GNUNET_MESSAGE_TYPE_DATASTORE_DATA,
1290 LOG (GNUNET_ERROR_TYPE_DEBUG,
1291 "Could not create queue entry for GET REPLICATION\n");
1294 GNUNET_STATISTICS_update (h->stats,
1296 ("# GET REPLICATION requests executed"), 1,
1304 * Get a single zero-anonymity value from the datastore.
1306 * @param h handle to the datastore
1307 * @param next_uid return the result with lowest uid >= next_uid
1308 * @param queue_priority ranking of this request in the priority queue
1309 * @param max_queue_size at what queue size should this request be dropped
1310 * (if other requests of higher priority are in the queue)
1311 * @param type allowed type for the operation (never zero)
1312 * @param proc function to call on a random value; it
1313 * will be called once with a value (if available)
1314 * or with NULL if none value exists.
1315 * @param proc_cls closure for @a proc
1316 * @return NULL if the entry was not queued, otherwise a handle that can be used to
1319 struct GNUNET_DATASTORE_QueueEntry *
1320 GNUNET_DATASTORE_get_zero_anonymity (struct GNUNET_DATASTORE_Handle *h,
1322 unsigned int queue_priority,
1323 unsigned int max_queue_size,
1324 enum GNUNET_BLOCK_Type type,
1325 GNUNET_DATASTORE_DatumProcessor proc,
1328 struct GNUNET_DATASTORE_QueueEntry *qe;
1329 struct GNUNET_MQ_Envelope *env;
1330 struct GetZeroAnonymityMessage *m;
1331 union QueueContext qc;
1333 GNUNET_assert (NULL != proc);
1334 GNUNET_assert (type != GNUNET_BLOCK_TYPE_ANY);
1335 LOG (GNUNET_ERROR_TYPE_DEBUG,
1336 "Asked to get a zero-anonymity entry of type %d\n",
1338 env = GNUNET_MQ_msg (m,
1339 GNUNET_MESSAGE_TYPE_DATASTORE_GET_ZERO_ANONYMITY);
1340 m->type = htonl ((uint32_t) type);
1341 m->next_uid = GNUNET_htonll (next_uid);
1343 qc.rc.proc_cls = proc_cls;
1344 qe = make_queue_entry (h,
1348 GNUNET_MESSAGE_TYPE_DATASTORE_DATA,
1352 LOG (GNUNET_ERROR_TYPE_DEBUG,
1353 "Could not create queue entry for zero-anonymity procation\n");
1356 GNUNET_STATISTICS_update (h->stats,
1358 ("# GET ZERO ANONYMITY requests executed"), 1,
1366 * Get a result for a particular key from the datastore. The processor
1367 * will only be called once.
1369 * @param h handle to the datastore
1370 * @param next_uid return the result with lowest uid >= next_uid
1371 * @param random if true, return a random result instead of using next_uid
1372 * @param key maybe NULL (to match all entries)
1373 * @param type desired type, 0 for any
1374 * @param queue_priority ranking of this request in the priority queue
1375 * @param max_queue_size at what queue size should this request be dropped
1376 * (if other requests of higher priority are in the queue)
1377 * @param proc function to call on each matching value;
1378 * will be called once with a NULL value at the end
1379 * @param proc_cls closure for @a proc
1380 * @return NULL if the entry was not queued, otherwise a handle that can be used to
1383 struct GNUNET_DATASTORE_QueueEntry *
1384 GNUNET_DATASTORE_get_key (struct GNUNET_DATASTORE_Handle *h,
1387 const struct GNUNET_HashCode *key,
1388 enum GNUNET_BLOCK_Type type,
1389 unsigned int queue_priority,
1390 unsigned int max_queue_size,
1391 GNUNET_DATASTORE_DatumProcessor proc,
1394 struct GNUNET_DATASTORE_QueueEntry *qe;
1395 struct GNUNET_MQ_Envelope *env;
1396 struct GetKeyMessage *gkm;
1397 struct GetMessage *gm;
1398 union QueueContext qc;
1400 GNUNET_assert (NULL != proc);
1401 LOG (GNUNET_ERROR_TYPE_DEBUG,
1402 "Asked to look for data of type %u under key `%s'\n",
1403 (unsigned int) type,
1407 env = GNUNET_MQ_msg (gm,
1408 GNUNET_MESSAGE_TYPE_DATASTORE_GET);
1409 gm->type = htonl (type);
1410 gm->next_uid = GNUNET_htonll (next_uid);
1411 gm->random = random;
1415 env = GNUNET_MQ_msg (gkm,
1416 GNUNET_MESSAGE_TYPE_DATASTORE_GET_KEY);
1417 gkm->type = htonl (type);
1418 gkm->next_uid = GNUNET_htonll (next_uid);
1419 gkm->random = random;
1423 qc.rc.proc_cls = proc_cls;
1424 qe = make_queue_entry (h,
1428 GNUNET_MESSAGE_TYPE_DATASTORE_DATA,
1432 LOG (GNUNET_ERROR_TYPE_DEBUG,
1433 "Could not queue request for `%s'\n",
1437 #if INSANE_STATISTICS
1438 GNUNET_STATISTICS_update (h->stats,
1439 gettext_noop ("# GET requests executed"),
1449 * Cancel a datastore operation. The final callback from the
1450 * operation must not have been done yet.
1452 * @param qe operation to cancel
1455 GNUNET_DATASTORE_cancel (struct GNUNET_DATASTORE_QueueEntry *qe)
1457 struct GNUNET_DATASTORE_Handle *h = qe->h;
1459 LOG (GNUNET_ERROR_TYPE_DEBUG,
1460 "Pending DATASTORE request %p cancelled (%d, %d)\n",
1463 h->queue_head == qe);
1464 if (NULL == qe->env)
1466 free_queue_entry (qe);
1467 h->skip_next_messages++;
1470 free_queue_entry (qe);
1475 /* end of datastore_api.c */