2 This file is part of GNUnet
3 Copyright (C) 2004-2013, 2016 GNUnet e.V.
5 GNUnet is free software: you can redistribute it and/or modify it
6 under the terms of the GNU General Public License as published
7 by the Free Software Foundation, either version 3 of the License,
8 or (at your option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details.
17 * @file datastore/datastore_api.c
18 * @brief Management for the datastore for files stored on a GNUnet node. Implements
19 * a priority queue for requests
20 * @author Christian Grothoff
23 #include "gnunet_arm_service.h"
24 #include "gnunet_constants.h"
25 #include "gnunet_datastore_service.h"
26 #include "gnunet_statistics_service.h"
27 #include "datastore.h"
29 #define LOG(kind,...) GNUNET_log_from (kind, "datastore-api",__VA_ARGS__)
31 #define DELAY_WARN_TIMEOUT GNUNET_TIME_UNIT_MINUTES
34 * Collect an insane number of statistics? May cause excessive IPC.
36 #define INSANE_STATISTICS GNUNET_NO
39 * If a client stopped asking for more results, how many more do
40 * we receive from the DB before killing the connection? Trade-off
41 * between re-doing TCP handshakes and (needlessly) receiving
44 #define MAX_EXCESS_RESULTS 8
47 * Context for processing status messages.
52 * Continuation to call with the status.
54 GNUNET_DATASTORE_ContinuationWithStatus cont;
57 * Closure for @e cont.
65 * Context for processing result messages.
70 * Function to call with the result.
72 GNUNET_DATASTORE_DatumProcessor proc;
75 * Closure for @e proc.
83 * Context for a queue operation.
88 struct StatusContext sc;
90 struct ResultContext rc;
96 * Entry in our priority queue.
98 struct GNUNET_DATASTORE_QueueEntry
102 * This is a linked list.
104 struct GNUNET_DATASTORE_QueueEntry *next;
107 * This is a linked list.
109 struct GNUNET_DATASTORE_QueueEntry *prev;
112 * Handle to the master context.
114 struct GNUNET_DATASTORE_Handle *h;
117 * Function to call after transmission of the request.
119 GNUNET_DATASTORE_ContinuationWithStatus cont;
122 * Closure for @e cont.
127 * Context for the operation.
129 union QueueContext qc;
132 * Envelope of the request to transmit, NULL after
135 struct GNUNET_MQ_Envelope *env;
138 * Task we run if this entry stalls the queue and we
139 * need to warn the user.
141 struct GNUNET_SCHEDULER_Task *delay_warn_task;
144 * Priority in the queue.
146 unsigned int priority;
149 * Maximum allowed length of queue (otherwise
150 * this request should be discarded).
152 unsigned int max_queue;
155 * Expected response type.
157 uint16_t response_type;
163 * Handle to the datastore service.
165 struct GNUNET_DATASTORE_Handle
171 const struct GNUNET_CONFIGURATION_Handle *cfg;
174 * Current connection to the datastore service.
176 struct GNUNET_MQ_Handle *mq;
179 * Handle for statistics.
181 struct GNUNET_STATISTICS_Handle *stats;
184 * Current head of priority queue.
186 struct GNUNET_DATASTORE_QueueEntry *queue_head;
189 * Current tail of priority queue.
191 struct GNUNET_DATASTORE_QueueEntry *queue_tail;
194 * Task for trying to reconnect.
196 struct GNUNET_SCHEDULER_Task *reconnect_task;
199 * How quickly should we retry? Used for exponential back-off on
202 struct GNUNET_TIME_Relative retry_time;
205 * Number of entries in the queue.
207 unsigned int queue_size;
210 * Number of results we're receiving for the current query
211 * after the application stopped caring. Used to determine when
212 * to reset the connection.
214 unsigned int result_count;
217 * We should ignore the next message(s) from the service.
219 unsigned int skip_next_messages;
225 * Try reconnecting to the datastore service.
227 * @param cls the `struct GNUNET_DATASTORE_Handle`
230 try_reconnect (void *cls);
234 * Disconnect from the service and then try reconnecting to the datastore service
237 * @param h handle to datastore to disconnect and reconnect
240 do_disconnect (struct GNUNET_DATASTORE_Handle *h)
247 GNUNET_MQ_destroy (h->mq);
249 h->skip_next_messages = 0;
251 = GNUNET_SCHEDULER_add_delayed (h->retry_time,
258 * Free a queue entry. Removes the given entry from the
259 * queue and releases associated resources. Does NOT
262 * @param qe entry to free.
265 free_queue_entry (struct GNUNET_DATASTORE_QueueEntry *qe)
267 struct GNUNET_DATASTORE_Handle *h = qe->h;
269 GNUNET_CONTAINER_DLL_remove (h->queue_head,
274 GNUNET_MQ_discard (qe->env);
275 if (NULL != qe->delay_warn_task)
276 GNUNET_SCHEDULER_cancel (qe->delay_warn_task);
282 * Task that logs an error after some time.
284 * @param cls `struct GNUNET_DATASTORE_QueueEntry` about which the error is
287 delay_warning (void *cls)
289 struct GNUNET_DATASTORE_QueueEntry *qe = cls;
291 qe->delay_warn_task = NULL;
292 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
293 "Request %p of type %u at head of datastore queue for more than %s\n",
295 (unsigned int) qe->response_type,
296 GNUNET_STRINGS_relative_time_to_string (DELAY_WARN_TIMEOUT,
298 qe->delay_warn_task = GNUNET_SCHEDULER_add_delayed (DELAY_WARN_TIMEOUT,
305 * Handle an error reported by the message queue to the datastore service.
307 * @param cls closure with the datastore handle
308 * @param error error code
311 mq_error_handler (void *cls,
312 enum GNUNET_MQ_Error error)
314 struct GNUNET_DATASTORE_Handle *h = cls;
315 struct GNUNET_DATASTORE_QueueEntry *qe;
317 LOG (GNUNET_ERROR_TYPE_DEBUG,
318 "MQ error, reconnecting to DATASTORE\n");
323 if (NULL != qe->delay_warn_task)
325 GNUNET_SCHEDULER_cancel (qe->delay_warn_task);
326 qe->delay_warn_task = NULL;
330 union QueueContext qc = qe->qc;
331 uint16_t rt = qe->response_type;
333 LOG (GNUNET_ERROR_TYPE_DEBUG,
334 "Failed to receive response from database.\n");
335 free_queue_entry (qe);
338 case GNUNET_MESSAGE_TYPE_DATASTORE_STATUS:
339 if (NULL != qc.sc.cont)
340 qc.sc.cont (qc.sc.cont_cls,
342 GNUNET_TIME_UNIT_ZERO_ABS,
343 _("DATASTORE disconnected"));
345 case GNUNET_MESSAGE_TYPE_DATASTORE_DATA:
346 if (NULL != qc.rc.proc)
347 qc.rc.proc (qc.rc.proc_cls,
355 GNUNET_TIME_UNIT_ZERO_ABS,
366 * Connect to the datastore service.
368 * @param cfg configuration to use
369 * @return handle to use to access the service
371 struct GNUNET_DATASTORE_Handle *
372 GNUNET_DATASTORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg)
374 struct GNUNET_DATASTORE_Handle *h;
376 LOG (GNUNET_ERROR_TYPE_DEBUG,
377 "Establishing DATASTORE connection!\n");
378 h = GNUNET_new (struct GNUNET_DATASTORE_Handle);
386 h->stats = GNUNET_STATISTICS_create ("datastore-api",
393 * Task used to disconnect from the datastore after
394 * we send the #GNUNET_MESSAGE_TYPE_DATASTORE_DROP message.
396 * @param cls the datastore handle
399 disconnect_after_drop (void *cls)
401 struct GNUNET_DATASTORE_Handle *h = cls;
403 LOG (GNUNET_ERROR_TYPE_DEBUG,
404 "Drop sent, disconnecting\n");
405 GNUNET_DATASTORE_disconnect (h,
411 * Handle error in sending drop request to datastore.
413 * @param cls closure with the datastore handle
414 * @param error error code
417 disconnect_on_mq_error (void *cls,
418 enum GNUNET_MQ_Error error)
420 struct GNUNET_DATASTORE_Handle *h = cls;
422 LOG (GNUNET_ERROR_TYPE_ERROR,
423 "Failed to ask datastore to drop tables\n");
424 GNUNET_DATASTORE_disconnect (h,
430 * Disconnect from the datastore service (and free
431 * associated resources).
433 * @param h handle to the datastore
434 * @param drop set to #GNUNET_YES to delete all data in datastore (!)
437 GNUNET_DATASTORE_disconnect (struct GNUNET_DATASTORE_Handle *h,
440 struct GNUNET_DATASTORE_QueueEntry *qe;
442 LOG (GNUNET_ERROR_TYPE_DEBUG,
443 "Datastore disconnect\n");
446 GNUNET_MQ_destroy (h->mq);
449 if (NULL != h->reconnect_task)
451 GNUNET_SCHEDULER_cancel (h->reconnect_task);
452 h->reconnect_task = NULL;
454 while (NULL != (qe = h->queue_head))
456 switch (qe->response_type)
458 case GNUNET_MESSAGE_TYPE_DATASTORE_STATUS:
459 if (NULL != qe->qc.sc.cont)
460 qe->qc.sc.cont (qe->qc.sc.cont_cls,
462 GNUNET_TIME_UNIT_ZERO_ABS,
463 _("Disconnected from DATASTORE"));
465 case GNUNET_MESSAGE_TYPE_DATASTORE_DATA:
466 if (NULL != qe->qc.rc.proc)
467 qe->qc.rc.proc (qe->qc.rc.proc_cls,
475 GNUNET_TIME_UNIT_ZERO_ABS,
481 free_queue_entry (qe);
483 if (GNUNET_YES == drop)
485 LOG (GNUNET_ERROR_TYPE_DEBUG,
486 "Re-connecting to issue DROP!\n");
487 GNUNET_assert (NULL == h->mq);
488 h->mq = GNUNET_CLIENT_connect (h->cfg,
491 &disconnect_on_mq_error,
495 struct GNUNET_MessageHeader *hdr;
496 struct GNUNET_MQ_Envelope *env;
498 env = GNUNET_MQ_msg (hdr,
499 GNUNET_MESSAGE_TYPE_DATASTORE_DROP);
500 GNUNET_MQ_notify_sent (env,
501 &disconnect_after_drop,
503 GNUNET_MQ_send (h->mq,
509 GNUNET_STATISTICS_destroy (h->stats,
517 * Create a new entry for our priority queue (and possibly discard other entries if
518 * the queue is getting too long).
520 * @param h handle to the datastore
521 * @param env envelope with the message to queue
522 * @param queue_priority priority of the entry
523 * @param max_queue_size at what queue size should this request be dropped
524 * (if other requests of higher priority are in the queue)
525 * @param expected_type which type of response do we expect,
526 * #GNUNET_MESSAGE_TYPE_DATASTORE_STATUS or
527 * #GNUNET_MESSAGE_TYPE_DATASTORE_DATA
528 * @param qc client context (NOT a closure for @a response_proc)
529 * @return NULL if the queue is full
531 static struct GNUNET_DATASTORE_QueueEntry *
532 make_queue_entry (struct GNUNET_DATASTORE_Handle *h,
533 struct GNUNET_MQ_Envelope *env,
534 unsigned int queue_priority,
535 unsigned int max_queue_size,
536 uint16_t expected_type,
537 const union QueueContext *qc)
539 struct GNUNET_DATASTORE_QueueEntry *qe;
540 struct GNUNET_DATASTORE_QueueEntry *pos;
543 if ( (NULL != h->queue_tail) &&
544 (h->queue_tail->priority >= queue_priority) )
554 while ( (NULL != pos) &&
555 (c < max_queue_size) &&
556 (pos->priority >= queue_priority) )
561 if (c >= max_queue_size)
563 GNUNET_STATISTICS_update (h->stats,
564 gettext_noop ("# queue overflows"),
567 GNUNET_MQ_discard (env);
570 qe = GNUNET_new (struct GNUNET_DATASTORE_QueueEntry);
573 qe->response_type = expected_type;
575 qe->priority = queue_priority;
576 qe->max_queue = max_queue_size;
579 /* append at the tail */
585 /* do not insert at HEAD if HEAD query was already
586 * transmitted and we are still receiving replies! */
587 if ( (NULL == pos) &&
588 (NULL == h->queue_head->env) )
592 #if INSANE_STATISTICS
593 GNUNET_STATISTICS_update (h->stats,
594 gettext_noop ("# queue entries created"),
598 GNUNET_CONTAINER_DLL_insert_after (h->queue_head,
608 * Process entries in the queue (or do nothing if we are already
611 * @param h handle to the datastore
614 process_queue (struct GNUNET_DATASTORE_Handle *h)
616 struct GNUNET_DATASTORE_QueueEntry *qe;
618 if (NULL == (qe = h->queue_head))
620 /* no entry in queue */
621 LOG (GNUNET_ERROR_TYPE_DEBUG,
627 /* waiting for replies */
628 LOG (GNUNET_ERROR_TYPE_DEBUG,
629 "Head request already transmitted\n");
634 /* waiting for reconnect */
635 LOG (GNUNET_ERROR_TYPE_DEBUG,
639 GNUNET_assert (NULL == qe->delay_warn_task);
640 qe->delay_warn_task = GNUNET_SCHEDULER_add_delayed (DELAY_WARN_TIMEOUT,
643 GNUNET_MQ_send (h->mq,
650 * Get the entry at the head of the message queue.
652 * @param h handle to the datastore
653 * @param response_type the expected response type
654 * @return the queue entry
656 static struct GNUNET_DATASTORE_QueueEntry *
657 get_queue_head (struct GNUNET_DATASTORE_Handle *h,
658 uint16_t response_type)
660 struct GNUNET_DATASTORE_QueueEntry *qe;
662 if (h->skip_next_messages > 0)
664 h->skip_next_messages--;
681 if (response_type != qe->response_type)
692 * Function called to check status message from the service.
695 * @param sm status message received
696 * @return #GNUNET_OK if the message is well-formed
699 check_status (void *cls,
700 const struct StatusMessage *sm)
702 uint16_t msize = ntohs (sm->header.size) - sizeof (*sm);
703 int32_t status = ntohl (sm->status);
707 const char *emsg = (const char *) &sm[1];
709 if ('\0' != emsg[msize - 1])
712 return GNUNET_SYSERR;
715 else if (GNUNET_SYSERR == status)
718 return GNUNET_SYSERR;
725 * Function called to handle status message from the service.
728 * @param sm status message received
731 handle_status (void *cls,
732 const struct StatusMessage *sm)
734 struct GNUNET_DATASTORE_Handle *h = cls;
735 struct GNUNET_DATASTORE_QueueEntry *qe;
736 struct StatusContext rc;
738 int32_t status = ntohl (sm->status);
740 qe = get_queue_head (h,
741 GNUNET_MESSAGE_TYPE_DATASTORE_STATUS);
745 free_queue_entry (qe);
746 if (ntohs (sm->header.size) > sizeof (struct StatusMessage))
747 emsg = (const char *) &sm[1];
750 LOG (GNUNET_ERROR_TYPE_DEBUG,
751 "Received status %d/%s\n",
754 GNUNET_STATISTICS_update (h->stats,
755 gettext_noop ("# status messages received"),
758 h->retry_time = GNUNET_TIME_UNIT_ZERO;
761 rc.cont (rc.cont_cls,
763 GNUNET_TIME_absolute_ntoh (sm->min_expiration),
769 * Check data message we received from the service.
771 * @param cls closure with the `struct GNUNET_DATASTORE_Handle *`
772 * @param dm message received
775 check_data (void *cls,
776 const struct DataMessage *dm)
778 uint16_t msize = ntohs (dm->header.size) - sizeof (*dm);
780 if (msize != ntohl (dm->size))
783 return GNUNET_SYSERR;
790 * Handle data message we got from the service.
792 * @param cls closure with the `struct GNUNET_DATASTORE_Handle *`
793 * @param dm message received
796 handle_data (void *cls,
797 const struct DataMessage *dm)
799 struct GNUNET_DATASTORE_Handle *h = cls;
800 struct GNUNET_DATASTORE_QueueEntry *qe;
801 struct ResultContext rc;
803 qe = get_queue_head (h,
804 GNUNET_MESSAGE_TYPE_DATASTORE_DATA);
807 #if INSANE_STATISTICS
808 GNUNET_STATISTICS_update (h->stats,
809 gettext_noop ("# Results received"),
813 LOG (GNUNET_ERROR_TYPE_DEBUG,
814 "Received result %llu with type %u and size %u with key %s\n",
815 (unsigned long long) GNUNET_ntohll (dm->uid),
818 GNUNET_h2s (&dm->key));
820 free_queue_entry (qe);
821 h->retry_time = GNUNET_TIME_UNIT_ZERO;
824 rc.proc (rc.proc_cls,
829 ntohl (dm->priority),
830 ntohl (dm->anonymity),
831 ntohl (dm->replication),
832 GNUNET_TIME_absolute_ntoh (dm->expiration),
833 GNUNET_ntohll (dm->uid));
838 * Type of a function to call when we receive a
839 * #GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END message from the service.
841 * @param cls closure with the `struct GNUNET_DATASTORE_Handle *`
842 * @param msg message received
845 handle_data_end (void *cls,
846 const struct GNUNET_MessageHeader *msg)
848 struct GNUNET_DATASTORE_Handle *h = cls;
849 struct GNUNET_DATASTORE_QueueEntry *qe;
850 struct ResultContext rc;
852 qe = get_queue_head (h,
853 GNUNET_MESSAGE_TYPE_DATASTORE_DATA);
857 free_queue_entry (qe);
858 LOG (GNUNET_ERROR_TYPE_DEBUG,
859 "Received end of result set, new queue size is %u\n",
861 h->retry_time = GNUNET_TIME_UNIT_ZERO;
864 /* signal end of iteration */
866 rc.proc (rc.proc_cls,
874 GNUNET_TIME_UNIT_ZERO_ABS,
880 * Try reconnecting to the datastore service.
882 * @param cls the `struct GNUNET_DATASTORE_Handle`
885 try_reconnect (void *cls)
887 struct GNUNET_DATASTORE_Handle *h = cls;
888 struct GNUNET_MQ_MessageHandler handlers[] = {
889 GNUNET_MQ_hd_var_size (status,
890 GNUNET_MESSAGE_TYPE_DATASTORE_STATUS,
891 struct StatusMessage,
893 GNUNET_MQ_hd_var_size (data,
894 GNUNET_MESSAGE_TYPE_DATASTORE_DATA,
897 GNUNET_MQ_hd_fixed_size (data_end,
898 GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END,
899 struct GNUNET_MessageHeader,
901 GNUNET_MQ_handler_end ()
904 h->retry_time = GNUNET_TIME_STD_BACKOFF (h->retry_time);
905 h->reconnect_task = NULL;
906 GNUNET_assert (NULL == h->mq);
907 h->mq = GNUNET_CLIENT_connect (h->cfg,
914 GNUNET_STATISTICS_update (h->stats,
915 gettext_noop ("# datastore connections (re)created"),
918 LOG (GNUNET_ERROR_TYPE_DEBUG,
919 "Reconnected to DATASTORE\n");
925 * Dummy continuation used to do nothing (but be non-zero).
928 * @param result result
929 * @param min_expiration expiration time
930 * @param emsg error message
933 drop_status_cont (void *cls,
935 struct GNUNET_TIME_Absolute min_expiration,
943 * Store an item in the datastore. If the item is already present,
944 * the priorities are summed up and the higher expiration time and
945 * lower anonymity level is used.
947 * @param h handle to the datastore
948 * @param rid reservation ID to use (from "reserve"); use 0 if no
949 * prior reservation was made
950 * @param key key for the value
951 * @param size number of bytes in data
952 * @param data content stored
953 * @param type type of the content
954 * @param priority priority of the content
955 * @param anonymity anonymity-level for the content
956 * @param replication how often should the content be replicated to other peers?
957 * @param expiration expiration time for the content
958 * @param queue_priority ranking of this request in the priority queue
959 * @param max_queue_size at what queue size should this request be dropped
960 * (if other requests of higher priority are in the queue)
961 * @param cont continuation to call when done
962 * @param cont_cls closure for @a cont
963 * @return NULL if the entry was not queued, otherwise a handle that can be used to
964 * cancel; note that even if NULL is returned, the callback will be invoked
965 * (or rather, will already have been invoked)
967 struct GNUNET_DATASTORE_QueueEntry *
968 GNUNET_DATASTORE_put (struct GNUNET_DATASTORE_Handle *h,
970 const struct GNUNET_HashCode *key,
973 enum GNUNET_BLOCK_Type type,
976 uint32_t replication,
977 struct GNUNET_TIME_Absolute expiration,
978 unsigned int queue_priority,
979 unsigned int max_queue_size,
980 GNUNET_DATASTORE_ContinuationWithStatus cont,
983 struct GNUNET_DATASTORE_QueueEntry *qe;
984 struct GNUNET_MQ_Envelope *env;
985 struct DataMessage *dm;
986 union QueueContext qc;
988 if (size + sizeof (*dm) >= GNUNET_MAX_MESSAGE_SIZE)
994 LOG (GNUNET_ERROR_TYPE_DEBUG,
995 "Asked to put %u bytes of data under key `%s' for %s\n",
998 GNUNET_STRINGS_relative_time_to_string (GNUNET_TIME_absolute_get_remaining (expiration),
1000 env = GNUNET_MQ_msg_extra (dm,
1002 GNUNET_MESSAGE_TYPE_DATASTORE_PUT);
1003 dm->rid = htonl (rid);
1004 dm->size = htonl ((uint32_t) size);
1005 dm->type = htonl (type);
1006 dm->priority = htonl (priority);
1007 dm->anonymity = htonl (anonymity);
1008 dm->replication = htonl (replication);
1009 dm->expiration = GNUNET_TIME_absolute_hton (expiration);
1011 GNUNET_memcpy (&dm[1],
1015 qc.sc.cont_cls = cont_cls;
1016 qe = make_queue_entry (h,
1020 GNUNET_MESSAGE_TYPE_DATASTORE_STATUS,
1024 LOG (GNUNET_ERROR_TYPE_DEBUG,
1025 "Could not create queue entry for PUT\n");
1028 GNUNET_STATISTICS_update (h->stats,
1029 gettext_noop ("# PUT requests executed"),
1038 * Reserve space in the datastore. This function should be used
1039 * to avoid "out of space" failures during a longer sequence of "put"
1040 * operations (for example, when a file is being inserted).
1042 * @param h handle to the datastore
1043 * @param amount how much space (in bytes) should be reserved (for content only)
1044 * @param entries how many entries will be created (to calculate per-entry overhead)
1045 * @param cont continuation to call when done; "success" will be set to
1046 * a positive reservation value if space could be reserved.
1047 * @param cont_cls closure for @a cont
1048 * @return NULL if the entry was not queued, otherwise a handle that can be used to
1049 * cancel; note that even if NULL is returned, the callback will be invoked
1050 * (or rather, will already have been invoked)
1052 struct GNUNET_DATASTORE_QueueEntry *
1053 GNUNET_DATASTORE_reserve (struct GNUNET_DATASTORE_Handle *h,
1056 GNUNET_DATASTORE_ContinuationWithStatus cont,
1059 struct GNUNET_DATASTORE_QueueEntry *qe;
1060 struct GNUNET_MQ_Envelope *env;
1061 struct ReserveMessage *rm;
1062 union QueueContext qc;
1065 cont = &drop_status_cont;
1066 LOG (GNUNET_ERROR_TYPE_DEBUG,
1067 "Asked to reserve %llu bytes of data and %u entries\n",
1068 (unsigned long long) amount,
1069 (unsigned int) entries);
1070 env = GNUNET_MQ_msg (rm,
1071 GNUNET_MESSAGE_TYPE_DATASTORE_RESERVE);
1072 rm->entries = htonl (entries);
1073 rm->amount = GNUNET_htonll (amount);
1076 qc.sc.cont_cls = cont_cls;
1077 qe = make_queue_entry (h,
1081 GNUNET_MESSAGE_TYPE_DATASTORE_STATUS,
1085 LOG (GNUNET_ERROR_TYPE_DEBUG,
1086 "Could not create queue entry to reserve\n");
1089 GNUNET_STATISTICS_update (h->stats,
1090 gettext_noop ("# RESERVE requests executed"),
1099 * Signal that all of the data for which a reservation was made has
1100 * been stored and that whatever excess space might have been reserved
1101 * can now be released.
1103 * @param h handle to the datastore
1104 * @param rid reservation ID (value of "success" in original continuation
1105 * from the "reserve" function).
1106 * @param queue_priority ranking of this request in the priority queue
1107 * @param max_queue_size at what queue size should this request be dropped
1108 * (if other requests of higher priority are in the queue)
1112 * @param cont continuation to call when done
1113 * @param cont_cls closure for @a cont
1114 * @return NULL if the entry was not queued, otherwise a handle that can be used to
1115 * cancel; note that even if NULL is returned, the callback will be invoked
1116 * (or rather, will already have been invoked)
1118 struct GNUNET_DATASTORE_QueueEntry *
1119 GNUNET_DATASTORE_release_reserve (struct GNUNET_DATASTORE_Handle *h,
1121 unsigned int queue_priority,
1122 unsigned int max_queue_size,
1123 GNUNET_DATASTORE_ContinuationWithStatus cont,
1126 struct GNUNET_DATASTORE_QueueEntry *qe;
1127 struct GNUNET_MQ_Envelope *env;
1128 struct ReleaseReserveMessage *rrm;
1129 union QueueContext qc;
1132 cont = &drop_status_cont;
1133 LOG (GNUNET_ERROR_TYPE_DEBUG,
1134 "Asked to release reserve %d\n",
1136 env = GNUNET_MQ_msg (rrm,
1137 GNUNET_MESSAGE_TYPE_DATASTORE_RELEASE_RESERVE);
1138 rrm->rid = htonl (rid);
1140 qc.sc.cont_cls = cont_cls;
1141 qe = make_queue_entry (h,
1145 GNUNET_MESSAGE_TYPE_DATASTORE_STATUS,
1149 LOG (GNUNET_ERROR_TYPE_DEBUG,
1150 "Could not create queue entry to release reserve\n");
1153 GNUNET_STATISTICS_update (h->stats,
1155 ("# RELEASE RESERVE requests executed"), 1,
1163 * Explicitly remove some content from the database.
1164 * The @a cont continuation will be called with `status`
1165 * #GNUNET_OK if content was removed, #GNUNET_NO
1166 * if no matching entry was found and #GNUNET_SYSERR
1167 * on all other types of errors.
1169 * @param h handle to the datastore
1170 * @param key key for the value
1171 * @param size number of bytes in data
1172 * @param data content stored
1173 * @param queue_priority ranking of this request in the priority queue
1174 * @param max_queue_size at what queue size should this request be dropped
1175 * (if other requests of higher priority are in the queue)
1176 * @param cont continuation to call when done
1177 * @param cont_cls closure for @a cont
1178 * @return NULL if the entry was not queued, otherwise a handle that can be used to
1179 * cancel; note that even if NULL is returned, the callback will be invoked
1180 * (or rather, will already have been invoked)
1182 struct GNUNET_DATASTORE_QueueEntry *
1183 GNUNET_DATASTORE_remove (struct GNUNET_DATASTORE_Handle *h,
1184 const struct GNUNET_HashCode *key,
1187 unsigned int queue_priority,
1188 unsigned int max_queue_size,
1189 GNUNET_DATASTORE_ContinuationWithStatus cont,
1192 struct GNUNET_DATASTORE_QueueEntry *qe;
1193 struct DataMessage *dm;
1194 struct GNUNET_MQ_Envelope *env;
1195 union QueueContext qc;
1197 if (sizeof (*dm) + size >= GNUNET_MAX_MESSAGE_SIZE)
1203 cont = &drop_status_cont;
1204 LOG (GNUNET_ERROR_TYPE_DEBUG,
1205 "Asked to remove %u bytes under key `%s'\n",
1208 env = GNUNET_MQ_msg_extra (dm,
1210 GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE);
1211 dm->size = htonl (size);
1213 GNUNET_memcpy (&dm[1],
1218 qc.sc.cont_cls = cont_cls;
1220 qe = make_queue_entry (h,
1224 GNUNET_MESSAGE_TYPE_DATASTORE_STATUS,
1228 LOG (GNUNET_ERROR_TYPE_DEBUG,
1229 "Could not create queue entry for REMOVE\n");
1232 GNUNET_STATISTICS_update (h->stats,
1233 gettext_noop ("# REMOVE requests executed"),
1243 * Get a random value from the datastore for content replication.
1244 * Returns a single, random value among those with the highest
1245 * replication score, lowering positive replication scores by one for
1246 * the chosen value (if only content with a replication score of zero exists,
1247 * a random value is returned and replication scores are not changed).
1249 * @param h handle to the datastore
1250 * @param queue_priority ranking of this request in the priority queue
1251 * @param max_queue_size at what queue size should this request be dropped
1252 * (if other requests of higher priority are in the queue)
1253 * @param proc function to call on a random value; it
1254 * will be called once with a value (if available)
1255 * and always once with a value of NULL.
1256 * @param proc_cls closure for @a proc
1257 * @return NULL if the entry was not queued, otherwise a handle that can be used to
1260 struct GNUNET_DATASTORE_QueueEntry *
1261 GNUNET_DATASTORE_get_for_replication (struct GNUNET_DATASTORE_Handle *h,
1262 unsigned int queue_priority,
1263 unsigned int max_queue_size,
1264 GNUNET_DATASTORE_DatumProcessor proc,
1267 struct GNUNET_DATASTORE_QueueEntry *qe;
1268 struct GNUNET_MQ_Envelope *env;
1269 struct GNUNET_MessageHeader *m;
1270 union QueueContext qc;
1272 GNUNET_assert (NULL != proc);
1273 LOG (GNUNET_ERROR_TYPE_DEBUG,
1274 "Asked to get replication entry\n");
1275 env = GNUNET_MQ_msg (m,
1276 GNUNET_MESSAGE_TYPE_DATASTORE_GET_REPLICATION);
1278 qc.rc.proc_cls = proc_cls;
1279 qe = make_queue_entry (h,
1283 GNUNET_MESSAGE_TYPE_DATASTORE_DATA,
1287 LOG (GNUNET_ERROR_TYPE_DEBUG,
1288 "Could not create queue entry for GET REPLICATION\n");
1291 GNUNET_STATISTICS_update (h->stats,
1293 ("# GET REPLICATION requests executed"), 1,
1301 * Get a single zero-anonymity value from the datastore.
1303 * @param h handle to the datastore
1304 * @param next_uid return the result with lowest uid >= next_uid
1305 * @param queue_priority ranking of this request in the priority queue
1306 * @param max_queue_size at what queue size should this request be dropped
1307 * (if other requests of higher priority are in the queue)
1308 * @param type allowed type for the operation (never zero)
1309 * @param proc function to call on a random value; it
1310 * will be called once with a value (if available)
1311 * or with NULL if no value exists.
1312 * @param proc_cls closure for @a proc
1313 * @return NULL if the entry was not queued, otherwise a handle that can be used to
1316 struct GNUNET_DATASTORE_QueueEntry *
1317 GNUNET_DATASTORE_get_zero_anonymity (struct GNUNET_DATASTORE_Handle *h,
1319 unsigned int queue_priority,
1320 unsigned int max_queue_size,
1321 enum GNUNET_BLOCK_Type type,
1322 GNUNET_DATASTORE_DatumProcessor proc,
1325 struct GNUNET_DATASTORE_QueueEntry *qe;
1326 struct GNUNET_MQ_Envelope *env;
1327 struct GetZeroAnonymityMessage *m;
1328 union QueueContext qc;
1330 GNUNET_assert (NULL != proc);
1331 GNUNET_assert (type != GNUNET_BLOCK_TYPE_ANY);
1332 LOG (GNUNET_ERROR_TYPE_DEBUG,
1333 "Asked to get a zero-anonymity entry of type %d\n",
1335 env = GNUNET_MQ_msg (m,
1336 GNUNET_MESSAGE_TYPE_DATASTORE_GET_ZERO_ANONYMITY);
1337 m->type = htonl ((uint32_t) type);
1338 m->next_uid = GNUNET_htonll (next_uid);
1340 qc.rc.proc_cls = proc_cls;
1341 qe = make_queue_entry (h,
1345 GNUNET_MESSAGE_TYPE_DATASTORE_DATA,
1349 LOG (GNUNET_ERROR_TYPE_DEBUG,
1350 "Could not create queue entry for zero-anonymity procation\n");
1353 GNUNET_STATISTICS_update (h->stats,
1355 ("# GET ZERO ANONYMITY requests executed"), 1,
1363 * Get a result for a particular key from the datastore. The processor
1364 * will only be called once.
1366 * @param h handle to the datastore
1367 * @param next_uid return the result with lowest uid >= next_uid
1368 * @param random if true, return a random result instead of using next_uid
1369 * @param key maybe NULL (to match all entries)
1370 * @param type desired type, 0 for any
1371 * @param queue_priority ranking of this request in the priority queue
1372 * @param max_queue_size at what queue size should this request be dropped
1373 * (if other requests of higher priority are in the queue)
1374 * @param proc function to call on each matching value;
1375 * will be called once with a NULL value at the end
1376 * @param proc_cls closure for @a proc
1377 * @return NULL if the entry was not queued, otherwise a handle that can be used to
1380 struct GNUNET_DATASTORE_QueueEntry *
1381 GNUNET_DATASTORE_get_key (struct GNUNET_DATASTORE_Handle *h,
1384 const struct GNUNET_HashCode *key,
1385 enum GNUNET_BLOCK_Type type,
1386 unsigned int queue_priority,
1387 unsigned int max_queue_size,
1388 GNUNET_DATASTORE_DatumProcessor proc,
1391 struct GNUNET_DATASTORE_QueueEntry *qe;
1392 struct GNUNET_MQ_Envelope *env;
1393 struct GetKeyMessage *gkm;
1394 struct GetMessage *gm;
1395 union QueueContext qc;
1397 GNUNET_assert (NULL != proc);
1398 LOG (GNUNET_ERROR_TYPE_DEBUG,
1399 "Asked to look for data of type %u under key `%s'\n",
1400 (unsigned int) type,
1404 env = GNUNET_MQ_msg (gm,
1405 GNUNET_MESSAGE_TYPE_DATASTORE_GET);
1406 gm->type = htonl (type);
1407 gm->next_uid = GNUNET_htonll (next_uid);
1408 gm->random = random;
1412 env = GNUNET_MQ_msg (gkm,
1413 GNUNET_MESSAGE_TYPE_DATASTORE_GET_KEY);
1414 gkm->type = htonl (type);
1415 gkm->next_uid = GNUNET_htonll (next_uid);
1416 gkm->random = random;
1420 qc.rc.proc_cls = proc_cls;
1421 qe = make_queue_entry (h,
1425 GNUNET_MESSAGE_TYPE_DATASTORE_DATA,
1429 LOG (GNUNET_ERROR_TYPE_DEBUG,
1430 "Could not queue request for `%s'\n",
1434 #if INSANE_STATISTICS
1435 GNUNET_STATISTICS_update (h->stats,
1436 gettext_noop ("# GET requests executed"),
1446 * Cancel a datastore operation. The final callback from the
1447 * operation must not have been done yet.
1449 * @param qe operation to cancel
1452 GNUNET_DATASTORE_cancel (struct GNUNET_DATASTORE_QueueEntry *qe)
1454 struct GNUNET_DATASTORE_Handle *h = qe->h;
1456 LOG (GNUNET_ERROR_TYPE_DEBUG,
1457 "Pending DATASTORE request %p cancelled (%d, %d)\n",
1460 h->queue_head == qe);
1461 if (NULL == qe->env)
1463 free_queue_entry (qe);
1464 h->skip_next_messages++;
1467 free_queue_entry (qe);
1472 /* end of datastore_api.c */