2 This file is part of GNUnet
3 Copyright (C) 2004-2013, 2016 GNUnet e.V.
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your
8 option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18 Boston, MA 02110-1301, USA.
22 * @file datastore/datastore_api.c
23 * @brief Management for the datastore for files stored on a GNUnet node. Implements
24 * a priority queue for requests
25 * @author Christian Grothoff
28 #include "gnunet_arm_service.h"
29 #include "gnunet_constants.h"
30 #include "gnunet_datastore_service.h"
31 #include "gnunet_statistics_service.h"
32 #include "datastore.h"
/* Logging shorthand: forwards to GNUNET_log_from with the component name "datastore-api". */
34 #define LOG(kind,...) GNUNET_log_from (kind, "datastore-api",__VA_ARGS__)
37 * Collect an insane number of statistics? May cause excessive IPC.
39 #define INSANE_STATISTICS GNUNET_NO
42 * If a client stopped asking for more results, how many more do
43 * we receive from the DB before killing the connection? Trade-off
44 * between re-doing TCP handshakes and (needlessly) receiving
47 #define MAX_EXCESS_RESULTS 8
50 * Context for processing status messages.
55 * Continuation to call with the status.
57 GNUNET_DATASTORE_ContinuationWithStatus cont;
60 * Closure for @e cont.
68 * Context for processing result messages.
73 * Function to call with the result.
75 GNUNET_DATASTORE_DatumProcessor proc;
78 * Closure for @e proc.
86 * Context for a queue operation.
91 struct StatusContext sc;
93 struct ResultContext rc;
99 * Entry in our priority queue.
/* One pending request. The entry is a doubly-linked list node (next/prev)
   that records the owning handle, the caller's callback context, the request
   envelope still to be transmitted, the queue priority and length limit, and
   the message type expected as the reply. */
101 struct GNUNET_DATASTORE_QueueEntry
105 * This is a linked list.
107 struct GNUNET_DATASTORE_QueueEntry *next;
110 * This is a linked list.
112 struct GNUNET_DATASTORE_QueueEntry *prev;
115 * Handle to the master context.
117 struct GNUNET_DATASTORE_Handle *h;
120 * Function to call after transmission of the request.
122 GNUNET_DATASTORE_ContinuationWithStatus cont;
125 * Closure for @e cont.
130 * Context for the operation.
132 union QueueContext qc;
135 * Envelope of the request to transmit, NULL after
138 struct GNUNET_MQ_Envelope *env;
141 * Priority in the queue.
143 unsigned int priority;
146 * Maximum allowed length of queue (otherwise
147 * this request should be discarded).
149 unsigned int max_queue;
152 * Expected response type.
154 uint16_t response_type;
160 * Handle to the datastore service.
/* Client-side state for one datastore connection: configuration, message
   queue, statistics handle, the request queue (head, tail and size), the
   reconnect task with its back-off delay, and two counters -- results
   received after the application lost interest, and upcoming service
   messages that must be ignored. */
162 struct GNUNET_DATASTORE_Handle
168 const struct GNUNET_CONFIGURATION_Handle *cfg;
171 * Current connection to the datastore service.
173 struct GNUNET_MQ_Handle *mq;
176 * Handle for statistics.
178 struct GNUNET_STATISTICS_Handle *stats;
181 * Current head of priority queue.
183 struct GNUNET_DATASTORE_QueueEntry *queue_head;
186 * Current tail of priority queue.
188 struct GNUNET_DATASTORE_QueueEntry *queue_tail;
191 * Task for trying to reconnect.
193 struct GNUNET_SCHEDULER_Task *reconnect_task;
196 * How quickly should we retry? Used for exponential back-off on
199 struct GNUNET_TIME_Relative retry_time;
202 * Number of entries in the queue.
204 unsigned int queue_size;
207 * Number of results we're receiving for the current query
208 * after application stopped to care. Used to determine when
209 * to reset the connection.
211 unsigned int result_count;
214 * We should ignore the next message(s) from the service.
216 unsigned int skip_next_messages;
222 * Try reconnecting to the datastore service.
224 * @param cls the `struct GNUNET_DATASTORE_Handle`
/* Forward declaration; the definition of try_reconnect appears further down in this file. */
227 try_reconnect (void *cls);
231 * Disconnect from the service and then try reconnecting to the datastore service
234 * @param h handle to datastore to disconnect and reconnect
/* Destroys h->mq, resets h->skip_next_messages to 0 and schedules a task
   delayed by h->retry_time.
   NOTE(review): the assignment target and the scheduled callback are on
   lines not visible in this excerpt -- presumably h->reconnect_task and
   try_reconnect; confirm. */
237 do_disconnect (struct GNUNET_DATASTORE_Handle *h)
244 GNUNET_MQ_destroy (h->mq);
246 h->skip_next_messages = 0;
248 = GNUNET_SCHEDULER_add_delayed (h->retry_time,
255 * Free a queue entry. Removes the given entry from the
256 * queue and releases associated resources. Does NOT
259 * @param qe entry to free.
/* Unlinks qe from its handle's queue (doubly-linked list starting at
   h->queue_head) and discards the request envelope with GNUNET_MQ_discard.
   NOTE(review): any guard around the discard and the release of qe itself
   are on lines not visible in this excerpt -- confirm. */
262 free_queue_entry (struct GNUNET_DATASTORE_QueueEntry *qe)
264 struct GNUNET_DATASTORE_Handle *h = qe->h;
266 GNUNET_CONTAINER_DLL_remove (h->queue_head,
271 GNUNET_MQ_discard (qe->env);
277 * Handle an error on the message queue to the datastore service.
279 * @param cls closure with the datastore handle
280 * @param error error code
/* MQ error callback; cls is the datastore handle. After logging, the
   affected queue entry is failed towards the caller: a STATUS request gets
   its continuation called with the message "DATASTORE disconnected", a DATA
   request gets its processor called (GNUNET_TIME_UNIT_ZERO_ABS is among the
   arguments). How qe is selected is on lines not visible in this excerpt. */
283 mq_error_handler (void *cls,
284 enum GNUNET_MQ_Error error)
286 struct GNUNET_DATASTORE_Handle *h = cls;
287 struct GNUNET_DATASTORE_QueueEntry *qe;
289 LOG (GNUNET_ERROR_TYPE_DEBUG,
290 "MQ error, reconnecting to DATASTORE\n");
/* The context and response type are copied out first because the entry is
   freed before the caller's callback is invoked. */
296 union QueueContext qc = qe->qc;
297 uint16_t rt = qe->response_type;
299 LOG (GNUNET_ERROR_TYPE_DEBUG,
300 "Failed to receive response from database.\n");
301 free_queue_entry (qe);
304 case GNUNET_MESSAGE_TYPE_DATASTORE_STATUS:
305 if (NULL != qc.sc.cont)
306 qc.sc.cont (qc.sc.cont_cls,
308 GNUNET_TIME_UNIT_ZERO_ABS,
309 _("DATASTORE disconnected"));
311 case GNUNET_MESSAGE_TYPE_DATASTORE_DATA:
312 if (NULL != qc.rc.proc)
313 qc.rc.proc (qc.rc.proc_cls,
317 GNUNET_TIME_UNIT_ZERO_ABS,
328 * Connect to the datastore service.
330 * @param cfg configuration to use
331 * @return handle to use to access the service
/* Allocates a new handle with GNUNET_new and creates a statistics handle
   named "datastore-api" for it. The remaining initialisation (and the
   return) is on lines not visible in this excerpt. */
333 struct GNUNET_DATASTORE_Handle *
334 GNUNET_DATASTORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg)
336 struct GNUNET_DATASTORE_Handle *h;
338 LOG (GNUNET_ERROR_TYPE_DEBUG,
339 "Establishing DATASTORE connection!\n");
340 h = GNUNET_new (struct GNUNET_DATASTORE_Handle);
348 h->stats = GNUNET_STATISTICS_create ("datastore-api",
355 * Task used to disconnect from the datastore after
356 * we send the #GNUNET_MESSAGE_TYPE_DATASTORE_DROP message.
358 * @param cls the datastore handle
/* Callback registered with GNUNET_MQ_notify_sent on the DROP envelope (see
   GNUNET_DATASTORE_disconnect): logs that the drop was sent and calls
   GNUNET_DATASTORE_disconnect on the handle passed as cls. */
361 disconnect_after_drop (void *cls)
363 struct GNUNET_DATASTORE_Handle *h = cls;
365 LOG (GNUNET_ERROR_TYPE_DEBUG,
366 "Drop sent, disconnecting\n");
367 GNUNET_DATASTORE_disconnect (h,
373 * Handle error in sending drop request to datastore.
375 * @param cls closure with the datastore handle
376 * @param error error code
/* MQ error callback passed to GNUNET_CLIENT_connect when
   GNUNET_DATASTORE_disconnect reconnects to send DROP: logs the failure at
   ERROR level and calls GNUNET_DATASTORE_disconnect on the handle (cls). */
379 disconnect_on_mq_error (void *cls,
380 enum GNUNET_MQ_Error error)
382 struct GNUNET_DATASTORE_Handle *h = cls;
384 LOG (GNUNET_ERROR_TYPE_ERROR,
385 "Failed to ask datastore to drop tables\n");
386 GNUNET_DATASTORE_disconnect (h,
392 * Disconnect from the datastore service (and free
393 * associated resources).
395 * @param h handle to the datastore
396 * @param drop set to #GNUNET_YES to delete all data in datastore (!)
/* Tears down the client handle. Visible steps, in order: destroy the MQ,
   cancel a pending reconnect task, then drain the queue -- each entry's
   callback is invoked first (STATUS entries with the message
   "Disconnected from DATASTORE", DATA entries via their processor) and the
   entry is freed afterwards. If drop is GNUNET_YES a fresh connection is
   opened solely to send GNUNET_MESSAGE_TYPE_DATASTORE_DROP;
   disconnect_after_drop and disconnect_on_mq_error are registered on that
   path and both call back into this function. GNUNET_STATISTICS_destroy is
   called on h->stats near the end; whether the DROP path returns before
   reaching it is on lines not visible in this excerpt. */
399 GNUNET_DATASTORE_disconnect (struct GNUNET_DATASTORE_Handle *h,
402 struct GNUNET_DATASTORE_QueueEntry *qe;
404 LOG (GNUNET_ERROR_TYPE_DEBUG,
405 "Datastore disconnect\n");
408 GNUNET_MQ_destroy (h->mq);
411 if (NULL != h->reconnect_task)
413 GNUNET_SCHEDULER_cancel (h->reconnect_task);
414 h->reconnect_task = NULL;
416 while (NULL != (qe = h->queue_head))
418 switch (qe->response_type)
420 case GNUNET_MESSAGE_TYPE_DATASTORE_STATUS:
421 if (NULL != qe->qc.sc.cont)
422 qe->qc.sc.cont (qe->qc.sc.cont_cls,
424 GNUNET_TIME_UNIT_ZERO_ABS,
425 _("Disconnected from DATASTORE"));
427 case GNUNET_MESSAGE_TYPE_DATASTORE_DATA:
428 if (NULL != qe->qc.rc.proc)
429 qe->qc.rc.proc (qe->qc.rc.proc_cls,
433 GNUNET_TIME_UNIT_ZERO_ABS,
439 free_queue_entry (qe);
441 if (GNUNET_YES == drop)
443 LOG (GNUNET_ERROR_TYPE_DEBUG,
444 "Re-connecting to issue DROP!\n");
445 GNUNET_assert (NULL == h->mq);
446 h->mq = GNUNET_CLIENT_connect (h->cfg,
449 &disconnect_on_mq_error,
453 struct GNUNET_MessageHeader *hdr;
454 struct GNUNET_MQ_Envelope *env;
456 env = GNUNET_MQ_msg (hdr,
457 GNUNET_MESSAGE_TYPE_DATASTORE_DROP);
458 GNUNET_MQ_notify_sent (env,
459 &disconnect_after_drop,
461 GNUNET_MQ_send (h->mq,
467 GNUNET_STATISTICS_destroy (h->stats,
475 * Create a new entry for our priority queue (and possibly discard other entries if
476 * the queue is getting too long).
478 * @param h handle to the datastore
479 * @param env envelope with the message to queue
480 * @param queue_priority priority of the entry
481 * @param max_queue_size at what queue size should this request be dropped
482 * (if other requests of higher priority are in the queue)
483 * @param expected_type which type of response do we expect,
484 * #GNUNET_MESSAGE_TYPE_DATASTORE_STATUS or
485 * #GNUNET_MESSAGE_TYPE_DATASTORE_DATA
486 * @param qc client context (NOT a closure for @a response_proc)
487 * @return NULL if the queue is full
/* Builds a queue entry for env and links it into h's queue by priority.
   The visible scan advances while pos is non-NULL, fewer than
   max_queue_size entries have been counted and pos->priority is at least
   queue_priority. When the count reaches max_queue_size the
   "# queue overflows" statistic is updated and env is discarded (the
   documented result for a full queue is NULL). Otherwise the entry is
   allocated, filled in and inserted after the chosen position; insertion at
   the head is avoided when the head request was already transmitted
   (h->queue_head->env is NULL). Several statements of this function are on
   lines not visible in this excerpt. */
489 static struct GNUNET_DATASTORE_QueueEntry *
490 make_queue_entry (struct GNUNET_DATASTORE_Handle *h,
491 struct GNUNET_MQ_Envelope *env,
492 unsigned int queue_priority,
493 unsigned int max_queue_size,
494 uint16_t expected_type,
495 const union QueueContext *qc)
497 struct GNUNET_DATASTORE_QueueEntry *qe;
498 struct GNUNET_DATASTORE_QueueEntry *pos;
501 if ( (NULL != h->queue_tail) &&
502 (h->queue_tail->priority >= queue_priority) )
512 while ( (NULL != pos) &&
513 (c < max_queue_size) &&
514 (pos->priority >= queue_priority) )
519 if (c >= max_queue_size)
521 GNUNET_STATISTICS_update (h->stats,
522 gettext_noop ("# queue overflows"),
525 GNUNET_MQ_discard (env);
528 qe = GNUNET_new (struct GNUNET_DATASTORE_QueueEntry);
531 qe->response_type = expected_type;
533 qe->priority = queue_priority;
534 qe->max_queue = max_queue_size;
537 /* append at the tail */
543 /* do not insert at HEAD if HEAD query was already
544 * transmitted and we are still receiving replies! */
545 if ( (NULL == pos) &&
546 (NULL == h->queue_head->env) )
550 #if INSANE_STATISTICS
551 GNUNET_STATISTICS_update (h->stats,
552 gettext_noop ("# queue entries created"),
556 GNUNET_CONTAINER_DLL_insert_after (h->queue_head,
566 * Process entries in the queue (or do nothing if we are already
569 * @param h handle to the datastore
/* Transmits the request at the head of the queue via GNUNET_MQ_send on
   h->mq. The in-code comments mark three cases in which nothing is sent:
   the queue is empty, the head request was already transmitted (replies are
   pending), or the handle is waiting for a reconnect. The conditions for
   the last two cases are on lines not visible in this excerpt. */
572 process_queue (struct GNUNET_DATASTORE_Handle *h)
574 struct GNUNET_DATASTORE_QueueEntry *qe;
576 if (NULL == (qe = h->queue_head))
578 /* no entry in queue */
579 LOG (GNUNET_ERROR_TYPE_DEBUG,
585 /* waiting for replies */
586 LOG (GNUNET_ERROR_TYPE_DEBUG,
587 "Head request already transmitted\n");
592 /* waiting for reconnect */
593 LOG (GNUNET_ERROR_TYPE_DEBUG,
597 GNUNET_MQ_send (h->mq,
606 * Function called to check status message from the service.
609 * @param sm status message received
610 * @return #GNUNET_OK if the message is well-formed
/* Validates a status message. msize is the number of bytes that follow the
   fixed struct StatusMessage; those bytes are treated as an error string
   whose last byte must be NUL, otherwise GNUNET_SYSERR is returned. A second
   GNUNET_SYSERR return sits in the else-branch that tests
   status == GNUNET_SYSERR.
   NOTE(review): the condition guarding the first branch is on a line not
   visible here -- presumably msize > 0, which is also what keeps
   emsg[msize - 1] in bounds; confirm. */
613 check_status (void *cls,
614 const struct StatusMessage *sm)
616 uint16_t msize = ntohs (sm->header.size) - sizeof (*sm);
617 int32_t status = ntohl (sm->status);
621 const char *emsg = (const char *) &sm[1];
623 if ('\0' != emsg[msize - 1])
626 return GNUNET_SYSERR;
629 else if (GNUNET_SYSERR == status)
632 return GNUNET_SYSERR;
639 * Function called to handle status message from the service.
642 * @param sm status message received
/* Handles a status reply from the service. If skip_next_messages is
   positive it is decremented (the message belongs to a cancelled request).
   Otherwise the head entry must exist and expect a STATUS response; the
   entry is freed, the optional error string is taken from the bytes after
   the fixed header when the message is larger than struct StatusMessage,
   the "# status messages received" statistic is updated, retry_time is
   reset to zero and the continuation held in the local rc is called with
   the min_expiration converted from the message.
   NOTE(review): rc is presumably copied from the entry before it is freed;
   that line is not visible here -- confirm. */
645 handle_status (void *cls,
646 const struct StatusMessage *sm)
648 struct GNUNET_DATASTORE_Handle *h = cls;
649 struct GNUNET_DATASTORE_QueueEntry *qe;
650 struct StatusContext rc;
652 int32_t status = ntohl (sm->status);
654 if (h->skip_next_messages > 0)
656 h->skip_next_messages--;
660 if (NULL == (qe = h->queue_head))
672 if (GNUNET_MESSAGE_TYPE_DATASTORE_STATUS != qe->response_type)
679 free_queue_entry (qe);
680 if (ntohs (sm->header.size) > sizeof (struct StatusMessage))
681 emsg = (const char *) &sm[1];
684 LOG (GNUNET_ERROR_TYPE_DEBUG,
685 "Received status %d/%s\n",
688 GNUNET_STATISTICS_update (h->stats,
689 gettext_noop ("# status messages received"),
692 h->retry_time = GNUNET_TIME_UNIT_ZERO;
695 rc.cont (rc.cont_cls,
697 GNUNET_TIME_absolute_ntoh (sm->min_expiration),
703 * Check data message we received from the service.
705 * @param cls closure with the `struct GNUNET_DATASTORE_Handle *`
706 * @param dm message received
/* Validates a data message: the number of bytes following the fixed
   struct DataMessage must equal the size field carried in the message;
   on mismatch GNUNET_SYSERR is returned. */
709 check_data (void *cls,
710 const struct DataMessage *dm)
712 uint16_t msize = ntohs (dm->header.size) - sizeof (*dm);
714 if (msize != ntohl (dm->size))
717 return GNUNET_SYSERR;
724 * Handle data message we got from the service.
726 * @param cls closure with the `struct GNUNET_DATASTORE_Handle *`
727 * @param dm message received
/* Handles a data (result) message from the service. Messages are checked
   against skip_next_messages first; the head entry must expect a DATA
   response. The entry is then freed, retry_time is reset to zero and the
   processor held in the local rc is called with fields converted from
   network byte order (priority, anonymity, expiration, uid are visible).
   NOTE(review): rc is presumably copied from the entry before it is freed;
   that line is not visible here -- confirm. */
730 handle_data (void *cls,
731 const struct DataMessage *dm)
733 struct GNUNET_DATASTORE_Handle *h = cls;
734 struct GNUNET_DATASTORE_QueueEntry *qe;
735 struct ResultContext rc;
737 if (h->skip_next_messages > 0)
755 if (GNUNET_MESSAGE_TYPE_DATASTORE_DATA != qe->response_type)
761 #if INSANE_STATISTICS
762 GNUNET_STATISTICS_update (h->stats,
763 gettext_noop ("# Results received"),
767 LOG (GNUNET_ERROR_TYPE_DEBUG,
768 "Received result %llu with type %u and size %u with key %s\n",
769 (unsigned long long) GNUNET_ntohll (dm->uid),
772 GNUNET_h2s (&dm->key));
774 free_queue_entry (qe);
775 h->retry_time = GNUNET_TIME_UNIT_ZERO;
778 rc.proc (rc.proc_cls,
783 ntohl (dm->priority),
784 ntohl (dm->anonymity),
785 GNUNET_TIME_absolute_ntoh (dm->expiration),
786 GNUNET_ntohll (dm->uid));
791 * Function called when we receive a
792 * #GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END message from the service.
794 * @param cls closure with the `struct GNUNET_DATASTORE_Handle *`
795 * @param msg message received
/* Handles the end-of-results message. If skip_next_messages is positive it
   is decremented. Otherwise the head entry must expect a DATA response; it
   is freed, retry_time is reset to zero and the processor held in the local
   rc is called once more to signal the end of the iteration
   (GNUNET_TIME_UNIT_ZERO_ABS is among the arguments; the others are on
   lines not visible in this excerpt). */
798 handle_data_end (void *cls,
799 const struct GNUNET_MessageHeader *msg)
801 struct GNUNET_DATASTORE_Handle *h = cls;
802 struct GNUNET_DATASTORE_QueueEntry *qe;
803 struct ResultContext rc;
805 if (h->skip_next_messages > 0)
807 h->skip_next_messages--;
824 if (GNUNET_MESSAGE_TYPE_DATASTORE_DATA != qe->response_type)
831 free_queue_entry (qe);
832 LOG (GNUNET_ERROR_TYPE_DEBUG,
833 "Received end of result set, new queue size is %u\n",
835 h->retry_time = GNUNET_TIME_UNIT_ZERO;
838 /* signal end of iteration */
840 rc.proc (rc.proc_cls,
847 GNUNET_TIME_UNIT_ZERO_ABS,
853 * Try reconnecting to the datastore service.
855 * @param cls the `struct GNUNET_DATASTORE_Handle`
/* Scheduler task that (re)establishes the connection. It declares the
   handler table -- STATUS and DATA as variable-size messages, DATA_END as a
   fixed-size message -- advances retry_time with GNUNET_TIME_STD_BACKOFF,
   clears reconnect_task, asserts that no MQ exists yet and connects with
   GNUNET_CLIENT_connect. Afterwards the
   "# datastore connections (re)created" statistic is updated and the
   reconnect is logged. Handling of a failed connect is on lines not visible
   in this excerpt. */
858 try_reconnect (void *cls)
860 struct GNUNET_DATASTORE_Handle *h = cls;
861 struct GNUNET_MQ_MessageHandler handlers[] = {
862 GNUNET_MQ_hd_var_size (status,
863 GNUNET_MESSAGE_TYPE_DATASTORE_STATUS,
864 struct StatusMessage,
866 GNUNET_MQ_hd_var_size (data,
867 GNUNET_MESSAGE_TYPE_DATASTORE_DATA,
870 GNUNET_MQ_hd_fixed_size (data_end,
871 GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END,
872 struct GNUNET_MessageHeader,
874 GNUNET_MQ_handler_end ()
877 h->retry_time = GNUNET_TIME_STD_BACKOFF (h->retry_time);
878 h->reconnect_task = NULL;
879 GNUNET_assert (NULL == h->mq);
880 h->mq = GNUNET_CLIENT_connect (h->cfg,
887 GNUNET_STATISTICS_update (h->stats,
888 gettext_noop ("# datastore connections (re)created"),
891 LOG (GNUNET_ERROR_TYPE_DEBUG,
892 "Reconnected to DATASTORE\n");
898 * Dummy continuation used to do nothing (but be non-zero).
901 * @param result result
902 * @param min_expiration expiration time
903 * @param emsg error message
/* Placeholder status continuation. It is assigned to cont inside the
   reserve, release_reserve and remove entry points of this file --
   presumably when the caller passed none; the guarding condition is not
   visible in this excerpt. The body is not visible here either. */
906 drop_status_cont (void *cls,
908 struct GNUNET_TIME_Absolute min_expiration,
916 * Store an item in the datastore. If the item is already present,
917 * the priorities are summed up and the higher expiration time and
918 * lower anonymity level is used.
920 * @param h handle to the datastore
921 * @param rid reservation ID to use (from "reserve"); use 0 if no
922 * prior reservation was made
923 * @param key key for the value
924 * @param size number of bytes in data
925 * @param data content stored
926 * @param type type of the content
927 * @param priority priority of the content
928 * @param anonymity anonymity-level for the content
929 * @param replication how often should the content be replicated to other peers?
930 * @param expiration expiration time for the content
931 * @param queue_priority ranking of this request in the priority queue
932 * @param max_queue_size at what queue size should this request be dropped
933 * (if other requests of higher priority are in the queue)
934 * @param cont continuation to call when done
935 * @param cont_cls closure for @a cont
936 * @return NULL if the entry was not queued, otherwise a handle that can be used to
937 * cancel; note that even if NULL is returned, the callback will be invoked
938 * (or rather, will already have been invoked)
/* Queues a PUT request. A size check against
   GNUNET_SERVER_MAX_MESSAGE_SIZE comes first (its consequence is on lines
   not visible in this excerpt). The DataMessage header fields are written
   in network byte order, with reserved and uid set to 0, and the payload is
   copied directly behind the header. The entry is queued expecting a STATUS
   reply with cont_cls as the callback closure; a debug message is logged
   when no queue entry could be created, and the "# PUT requests executed"
   statistic is updated. */
940 struct GNUNET_DATASTORE_QueueEntry *
941 GNUNET_DATASTORE_put (struct GNUNET_DATASTORE_Handle *h,
943 const struct GNUNET_HashCode *key,
946 enum GNUNET_BLOCK_Type type,
949 uint32_t replication,
950 struct GNUNET_TIME_Absolute expiration,
951 unsigned int queue_priority,
952 unsigned int max_queue_size,
953 GNUNET_DATASTORE_ContinuationWithStatus cont,
956 struct GNUNET_DATASTORE_QueueEntry *qe;
957 struct GNUNET_MQ_Envelope *env;
958 struct DataMessage *dm;
959 union QueueContext qc;
961 if (size + sizeof (*dm) >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
967 LOG (GNUNET_ERROR_TYPE_DEBUG,
968 "Asked to put %u bytes of data under key `%s' for %s\n",
971 GNUNET_STRINGS_relative_time_to_string (GNUNET_TIME_absolute_get_remaining (expiration),
973 env = GNUNET_MQ_msg_extra (dm,
975 GNUNET_MESSAGE_TYPE_DATASTORE_PUT);
976 dm->rid = htonl (rid);
977 dm->size = htonl ((uint32_t) size);
978 dm->type = htonl (type);
979 dm->priority = htonl (priority);
980 dm->anonymity = htonl (anonymity);
981 dm->replication = htonl (replication);
982 dm->reserved = htonl (0);
983 dm->uid = GNUNET_htonll (0);
984 dm->expiration = GNUNET_TIME_absolute_hton (expiration);
986 GNUNET_memcpy (&dm[1],
990 qc.sc.cont_cls = cont_cls;
991 qe = make_queue_entry (h,
995 GNUNET_MESSAGE_TYPE_DATASTORE_STATUS,
999 LOG (GNUNET_ERROR_TYPE_DEBUG,
1000 "Could not create queue entry for PUT\n");
1003 GNUNET_STATISTICS_update (h->stats,
1004 gettext_noop ("# PUT requests executed"),
1013 * Reserve space in the datastore. This function should be used
1014 * to avoid "out of space" failures during a longer sequence of "put"
1015 * operations (for example, when a file is being inserted).
1017 * @param h handle to the datastore
1018 * @param amount how much space (in bytes) should be reserved (for content only)
1019 * @param entries how many entries will be created (to calculate per-entry overhead)
1020 * @param cont continuation to call when done; "success" will be set to
1021 * a positive reservation value if space could be reserved.
1022 * @param cont_cls closure for @a cont
1023 * @return NULL if the entry was not queued, otherwise a handle that can be used to
1024 * cancel; note that even if NULL is returned, the callback will be invoked
1025 * (or rather, will already have been invoked)
/* Queues a RESERVE request carrying the entry count and byte amount in
   network byte order, expecting a STATUS reply. cont may be replaced by
   drop_status_cont (the guarding condition is on a line not visible in this
   excerpt -- presumably cont being NULL). A debug message is logged when no
   queue entry could be created, and the "# RESERVE requests executed"
   statistic is updated. */
1027 struct GNUNET_DATASTORE_QueueEntry *
1028 GNUNET_DATASTORE_reserve (struct GNUNET_DATASTORE_Handle *h,
1031 GNUNET_DATASTORE_ContinuationWithStatus cont,
1034 struct GNUNET_DATASTORE_QueueEntry *qe;
1035 struct GNUNET_MQ_Envelope *env;
1036 struct ReserveMessage *rm;
1037 union QueueContext qc;
1040 cont = &drop_status_cont;
1041 LOG (GNUNET_ERROR_TYPE_DEBUG,
1042 "Asked to reserve %llu bytes of data and %u entries\n",
1043 (unsigned long long) amount,
1044 (unsigned int) entries);
1045 env = GNUNET_MQ_msg (rm,
1046 GNUNET_MESSAGE_TYPE_DATASTORE_RESERVE);
1047 rm->entries = htonl (entries);
1048 rm->amount = GNUNET_htonll (amount);
1051 qc.sc.cont_cls = cont_cls;
1052 qe = make_queue_entry (h,
1056 GNUNET_MESSAGE_TYPE_DATASTORE_STATUS,
1060 LOG (GNUNET_ERROR_TYPE_DEBUG,
1061 "Could not create queue entry to reserve\n");
1064 GNUNET_STATISTICS_update (h->stats,
1065 gettext_noop ("# RESERVE requests executed"),
1074 * Signal that all of the data for which a reservation was made has
1075 * been stored and that whatever excess space might have been reserved
1076 * can now be released.
1078 * @param h handle to the datastore
1079 * @param rid reservation ID (value of "success" in original continuation
1080 * from the "reserve" function).
1081 * @param queue_priority ranking of this request in the priority queue
1082 * @param max_queue_size at what queue size should this request be dropped
1083 * (if other requests of higher priority are in the queue)
1084 * @param queue_priority ranking of this request in the priority queue
1085 * @param max_queue_size at what queue size should this request be dropped
1086 * (if other requests of higher priority are in the queue)
1087 * @param cont continuation to call when done
1088 * @param cont_cls closure for @a cont
1089 * @return NULL if the entry was not queued, otherwise a handle that can be used to
1090 * cancel; note that even if NULL is returned, the callback will be invoked
1091 * (or rather, will already have been invoked)
/* Queues a RELEASE_RESERVE request for reservation rid (sent with htonl),
   expecting a STATUS reply. cont may be replaced by drop_status_cont (the
   guarding condition is on a line not visible in this excerpt -- presumably
   cont being NULL). A debug message is logged when no queue entry could be
   created, and the "# RELEASE RESERVE requests executed" statistic is
   updated.
   NOTE(review): the log format uses %d for the reservation id; rid is
   passed to htonl below, so it is presumably an unsigned 32-bit value --
   confirm the parameter type and consider %u. */
1093 struct GNUNET_DATASTORE_QueueEntry *
1094 GNUNET_DATASTORE_release_reserve (struct GNUNET_DATASTORE_Handle *h,
1096 unsigned int queue_priority,
1097 unsigned int max_queue_size,
1098 GNUNET_DATASTORE_ContinuationWithStatus cont,
1101 struct GNUNET_DATASTORE_QueueEntry *qe;
1102 struct GNUNET_MQ_Envelope *env;
1103 struct ReleaseReserveMessage *rrm;
1104 union QueueContext qc;
1107 cont = &drop_status_cont;
1108 LOG (GNUNET_ERROR_TYPE_DEBUG,
1109 "Asked to release reserve %d\n",
1111 env = GNUNET_MQ_msg (rrm,
1112 GNUNET_MESSAGE_TYPE_DATASTORE_RELEASE_RESERVE);
1113 rrm->rid = htonl (rid);
1115 qc.sc.cont_cls = cont_cls;
1116 qe = make_queue_entry (h,
1120 GNUNET_MESSAGE_TYPE_DATASTORE_STATUS,
1124 LOG (GNUNET_ERROR_TYPE_DEBUG,
1125 "Could not create queue entry to release reserve\n");
1128 GNUNET_STATISTICS_update (h->stats,
1130 ("# RELEASE RESERVE requests executed"), 1,
1138 * Explicitly remove some content from the database.
1139 * The @a cont continuation will be called with `status`
1140 * #GNUNET_OK if content was removed, #GNUNET_NO
1141 * if no matching entry was found and #GNUNET_SYSERR
1142 * on all other types of errors.
1144 * @param h handle to the datastore
1145 * @param key key for the value
1146 * @param size number of bytes in data
1147 * @param data content stored
1148 * @param queue_priority ranking of this request in the priority queue
1149 * @param max_queue_size at what queue size should this request be dropped
1150 * (if other requests of higher priority are in the queue)
1151 * @param cont continuation to call when done
1152 * @param cont_cls closure for @a cont
1153 * @return NULL if the entry was not queued, otherwise a handle that can be used to
1154 * cancel; note that even if NULL is returned, the callback will be invoked
1155 * (or rather, will already have been invoked)
/* Queues a REMOVE request. A size check against
   GNUNET_SERVER_MAX_MESSAGE_SIZE comes first (its consequence is on lines
   not visible in this excerpt). The DataMessage is filled with the payload
   size and with zero for rid, type, priority, anonymity, uid and
   expiration; the payload is copied directly behind the header. The entry
   is queued expecting a STATUS reply. cont may be replaced by
   drop_status_cont (guarding condition not visible -- presumably cont being
   NULL). A debug message is logged when no queue entry could be created,
   and the "# REMOVE requests executed" statistic is updated. */
1157 struct GNUNET_DATASTORE_QueueEntry *
1158 GNUNET_DATASTORE_remove (struct GNUNET_DATASTORE_Handle *h,
1159 const struct GNUNET_HashCode *key,
1162 unsigned int queue_priority,
1163 unsigned int max_queue_size,
1164 GNUNET_DATASTORE_ContinuationWithStatus cont,
1167 struct GNUNET_DATASTORE_QueueEntry *qe;
1168 struct DataMessage *dm;
1169 struct GNUNET_MQ_Envelope *env;
1170 union QueueContext qc;
1172 if (sizeof (*dm) + size >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
1178 cont = &drop_status_cont;
1179 LOG (GNUNET_ERROR_TYPE_DEBUG,
1180 "Asked to remove %u bytes under key `%s'\n",
1183 env = GNUNET_MQ_msg_extra (dm,
1185 GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE);
1186 dm->rid = htonl (0);
1187 dm->size = htonl (size);
1188 dm->type = htonl (0);
1189 dm->priority = htonl (0);
1190 dm->anonymity = htonl (0);
1191 dm->uid = GNUNET_htonll (0);
1192 dm->expiration = GNUNET_TIME_absolute_hton (GNUNET_TIME_UNIT_ZERO_ABS);
1194 GNUNET_memcpy (&dm[1],
1199 qc.sc.cont_cls = cont_cls;
1201 qe = make_queue_entry (h,
1205 GNUNET_MESSAGE_TYPE_DATASTORE_STATUS,
1209 LOG (GNUNET_ERROR_TYPE_DEBUG,
1210 "Could not create queue entry for REMOVE\n");
1213 GNUNET_STATISTICS_update (h->stats,
1214 gettext_noop ("# REMOVE requests executed"),
1224 * Get a random value from the datastore for content replication.
1225 * Returns a single, random value among those with the highest
1226 * replication score, lowering positive replication scores by one for
1227 * the chosen value (if only content with a replication score exists,
1228 * a random value is returned and replication scores are not changed).
1230 * @param h handle to the datastore
1231 * @param queue_priority ranking of this request in the priority queue
1232 * @param max_queue_size at what queue size should this request be dropped
1233 * (if other requests of higher priority are in the queue)
1234 * @param proc function to call on a random value; it
1235 * will be called once with a value (if available)
1236 * and always once with a value of NULL.
1237 * @param proc_cls closure for @a proc
1238 * @return NULL if the entry was not queued, otherwise a handle that can be used to
/* Queues a GET_REPLICATION request (a bare message header, no payload
   fields are set) expecting DATA replies. proc must not be NULL (asserted).
   A debug message is logged when no queue entry could be created, and the
   "# GET REPLICATION requests executed" statistic is updated. */
1241 struct GNUNET_DATASTORE_QueueEntry *
1242 GNUNET_DATASTORE_get_for_replication (struct GNUNET_DATASTORE_Handle *h,
1243 unsigned int queue_priority,
1244 unsigned int max_queue_size,
1245 GNUNET_DATASTORE_DatumProcessor proc,
1248 struct GNUNET_DATASTORE_QueueEntry *qe;
1249 struct GNUNET_MQ_Envelope *env;
1250 struct GNUNET_MessageHeader *m;
1251 union QueueContext qc;
1253 GNUNET_assert (NULL != proc);
1254 LOG (GNUNET_ERROR_TYPE_DEBUG,
1255 "Asked to get replication entry\n");
1256 env = GNUNET_MQ_msg (m,
1257 GNUNET_MESSAGE_TYPE_DATASTORE_GET_REPLICATION);
1259 qc.rc.proc_cls = proc_cls;
1260 qe = make_queue_entry (h,
1264 GNUNET_MESSAGE_TYPE_DATASTORE_DATA,
1268 LOG (GNUNET_ERROR_TYPE_DEBUG,
1269 "Could not create queue entry for GET REPLICATION\n");
1272 GNUNET_STATISTICS_update (h->stats,
1274 ("# GET REPLICATION requests executed"), 1,
1282 * Get a single zero-anonymity value from the datastore.
1284 * @param h handle to the datastore
1285 * @param offset offset of the result (modulo num-results); set to
1286 * a random 64-bit value initially; then increment by
1287 * one each time; detect that all results have been found by uid
1288 * being again the first uid ever returned.
1289 * @param queue_priority ranking of this request in the priority queue
1290 * @param max_queue_size at what queue size should this request be dropped
1291 * (if other requests of higher priority are in the queue)
1292 * @param type allowed type for the operation (never zero)
1293 * @param proc function to call on a random value; it
1294 * will be called once with a value (if available)
1295 * or with NULL if no value exists.
1296 * @param proc_cls closure for @a proc
1297 * @return NULL if the entry was not queued, otherwise a handle that can be used to
/* Queues a GET_ZERO_ANONYMITY request carrying the block type and offset
   in network byte order, expecting DATA replies. Two preconditions are
   asserted: proc is not NULL and type is not GNUNET_BLOCK_TYPE_ANY. A debug
   message is logged when no queue entry could be created, and the
   "# GET ZERO ANONYMITY requests executed" statistic is updated. */
1300 struct GNUNET_DATASTORE_QueueEntry *
1301 GNUNET_DATASTORE_get_zero_anonymity (struct GNUNET_DATASTORE_Handle *h,
1303 unsigned int queue_priority,
1304 unsigned int max_queue_size,
1305 enum GNUNET_BLOCK_Type type,
1306 GNUNET_DATASTORE_DatumProcessor proc,
1309 struct GNUNET_DATASTORE_QueueEntry *qe;
1310 struct GNUNET_MQ_Envelope *env;
1311 struct GetZeroAnonymityMessage *m;
1312 union QueueContext qc;
1314 GNUNET_assert (NULL != proc);
1315 GNUNET_assert (type != GNUNET_BLOCK_TYPE_ANY);
1316 LOG (GNUNET_ERROR_TYPE_DEBUG,
1317 "Asked to get %llu-th zero-anonymity entry of type %d\n",
1318 (unsigned long long) offset,
1320 env = GNUNET_MQ_msg (m,
1321 GNUNET_MESSAGE_TYPE_DATASTORE_GET_ZERO_ANONYMITY);
1322 m->type = htonl ((uint32_t) type);
1323 m->offset = GNUNET_htonll (offset);
1325 qc.rc.proc_cls = proc_cls;
1326 qe = make_queue_entry (h,
1330 GNUNET_MESSAGE_TYPE_DATASTORE_DATA,
1334 LOG (GNUNET_ERROR_TYPE_DEBUG,
1335 "Could not create queue entry for zero-anonymity procation\n");
1338 GNUNET_STATISTICS_update (h->stats,
1340 ("# GET ZERO ANONYMITY requests executed"), 1,
1348 * Get a result for a particular key from the datastore. The processor
1349 * will only be called once.
1351 * @param h handle to the datastore
1352 * @param offset offset of the result (modulo num-results); set to
1353 * a random 64-bit value initially; then increment by
1354 * one each time; detect that all results have been found by uid
1355 * being again the first uid ever returned.
1356 * @param key maybe NULL (to match all entries)
1357 * @param type desired type, 0 for any
1358 * @param queue_priority ranking of this request in the priority queue
1359 * @param max_queue_size at what queue size should this request be dropped
1360 * (if other requests of higher priority are in the queue)
1361 * @param proc function to call on each matching value;
1362 * will be called once with a NULL value at the end
1363 * @param proc_cls closure for @a proc
1364 * @return NULL if the entry was not queued, otherwise a handle that can be used to
/* Queues a lookup request expecting DATA replies; proc must not be NULL
   (asserted). Two message variants are built, each carrying type and offset
   in network byte order: GNUNET_MESSAGE_TYPE_DATASTORE_GET (struct
   GetMessage) and GNUNET_MESSAGE_TYPE_DATASTORE_GET_KEY (struct
   GetKeyMessage). The condition selecting between them is on a line not
   visible in this excerpt -- presumably whether key is NULL; confirm. A
   debug message is logged when the request could not be queued. The
   "# GET requests executed" statistic is only compiled in when
   INSANE_STATISTICS is enabled. */
1367 struct GNUNET_DATASTORE_QueueEntry *
1368 GNUNET_DATASTORE_get_key (struct GNUNET_DATASTORE_Handle *h,
1370 const struct GNUNET_HashCode *key,
1371 enum GNUNET_BLOCK_Type type,
1372 unsigned int queue_priority,
1373 unsigned int max_queue_size,
1374 GNUNET_DATASTORE_DatumProcessor proc,
1377 struct GNUNET_DATASTORE_QueueEntry *qe;
1378 struct GNUNET_MQ_Envelope *env;
1379 struct GetKeyMessage *gkm;
1380 struct GetMessage *gm;
1381 union QueueContext qc;
1383 GNUNET_assert (NULL != proc);
1384 LOG (GNUNET_ERROR_TYPE_DEBUG,
1385 "Asked to look for data of type %u under key `%s'\n",
1386 (unsigned int) type,
1390 env = GNUNET_MQ_msg (gm,
1391 GNUNET_MESSAGE_TYPE_DATASTORE_GET);
1392 gm->type = htonl (type);
1393 gm->offset = GNUNET_htonll (offset);
1397 env = GNUNET_MQ_msg (gkm,
1398 GNUNET_MESSAGE_TYPE_DATASTORE_GET_KEY);
1399 gkm->type = htonl (type);
1400 gkm->offset = GNUNET_htonll (offset);
1404 qc.rc.proc_cls = proc_cls;
1405 qe = make_queue_entry (h,
1409 GNUNET_MESSAGE_TYPE_DATASTORE_DATA,
1413 LOG (GNUNET_ERROR_TYPE_DEBUG,
1414 "Could not queue request for `%s'\n",
1418 #if INSANE_STATISTICS
1419 GNUNET_STATISTICS_update (h->stats,
1420 gettext_noop ("# GET requests executed"),
1430 * Cancel a datastore operation. The final callback from the
1431 * operation must not have been done yet.
1433 * @param qe operation to cancel
/* Cancels a pending operation. When qe->env is NULL -- elsewhere in this
   file a NULL env on the head entry is treated as "request already
   transmitted" -- the entry is freed and skip_next_messages is incremented
   so that the service's upcoming reply is ignored. Otherwise the entry is
   simply freed. Statements following each free_queue_entry call are on
   lines not visible in this excerpt. */
1436 GNUNET_DATASTORE_cancel (struct GNUNET_DATASTORE_QueueEntry *qe)
1438 struct GNUNET_DATASTORE_Handle *h = qe->h;
1440 LOG (GNUNET_ERROR_TYPE_DEBUG,
1441 "Pending DATASTORE request %p cancelled (%d, %d)\n",
1444 h->queue_head == qe);
1445 if (NULL == qe->env)
1447 free_queue_entry (qe);
1448 h->skip_next_messages++;
1451 free_queue_entry (qe);
1456 /* end of datastore_api.c */