2 This file is part of GNUnet
3 Copyright (C) 2004-2013, 2016 GNUnet e.V.
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your
8 option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18 Boston, MA 02110-1301, USA.
22 * @file datastore/datastore_api.c
23 * @brief Management for the datastore for files stored on a GNUnet node. Implements
24 * a priority queue for requests
25 * @author Christian Grothoff
28 #include "gnunet_arm_service.h"
29 #include "gnunet_constants.h"
30 #include "gnunet_datastore_service.h"
31 #include "gnunet_statistics_service.h"
32 #include "datastore.h"
34 #define LOG(kind,...) GNUNET_log_from (kind, "datastore-api",__VA_ARGS__)
37 * Collect an insane number of statistics? May cause excessive IPC.
39 #define INSANE_STATISTICS GNUNET_NO
42 * If a client stopped asking for more results, how many more do
43 * we receive from the DB before killing the connection? Trade-off
44 * between re-doing TCP handshakes and (needlessly) receiving
47 #define MAX_EXCESS_RESULTS 8
50 * Context for processing status messages.
55 * Continuation to call with the status.
57 GNUNET_DATASTORE_ContinuationWithStatus cont;
60 * Closure for @e cont.
68 * Context for processing result messages.
73 * Function to call with the result.
75 GNUNET_DATASTORE_DatumProcessor proc;
78 * Closure for @e proc.
86 * Context for a queue operation.
91 struct StatusContext sc;
93 struct ResultContext rc;
99 * Entry in our priority queue.
101 struct GNUNET_DATASTORE_QueueEntry
105 * This is a linked list.
107 struct GNUNET_DATASTORE_QueueEntry *next;
110 * This is a linked list.
112 struct GNUNET_DATASTORE_QueueEntry *prev;
115 * Handle to the master context.
117 struct GNUNET_DATASTORE_Handle *h;
120 * Function to call after transmission of the request.
122 GNUNET_DATASTORE_ContinuationWithStatus cont;
125 * Closure for @e cont.
130 * Context for the operation.
132 union QueueContext qc;
135 * Envelope of the request to transmit, NULL after
138 struct GNUNET_MQ_Envelope *env;
141 * Priority in the queue.
143 unsigned int priority;
146 * Maximum allowed length of queue (otherwise
147 * this request should be discarded).
149 unsigned int max_queue;
152 * Expected response type.
154 uint16_t response_type;
160 * Handle to the datastore service.
162 struct GNUNET_DATASTORE_Handle
168 const struct GNUNET_CONFIGURATION_Handle *cfg;
171 * Current connection to the datastore service.
173 struct GNUNET_MQ_Handle *mq;
176 * Handle for statistics.
178 struct GNUNET_STATISTICS_Handle *stats;
181 * Current head of priority queue.
183 struct GNUNET_DATASTORE_QueueEntry *queue_head;
186 * Current tail of priority queue.
188 struct GNUNET_DATASTORE_QueueEntry *queue_tail;
191 * Task for trying to reconnect.
193 struct GNUNET_SCHEDULER_Task *reconnect_task;
196 * How quickly should we retry? Used for exponential back-off on
199 struct GNUNET_TIME_Relative retry_time;
202 * Number of entries in the queue.
204 unsigned int queue_size;
207 * Number of results we're receiving for the current query
208 * after the application stopped caring. Used to determine when
209 * to reset the connection.
211 unsigned int result_count;
214 * We should ignore the next message(s) from the service.
216 unsigned int skip_next_messages;
222 * Try reconnecting to the datastore service.
224 * @param cls the `struct GNUNET_DATASTORE_Handle`
227 try_reconnect (void *cls);
231 * Disconnect from the service and then try reconnecting to the datastore service
234 * @param h handle to datastore to disconnect and reconnect
237 do_disconnect (struct GNUNET_DATASTORE_Handle *h)
244 GNUNET_MQ_destroy (h->mq);
246 h->skip_next_messages = 0;
248 = GNUNET_SCHEDULER_add_delayed (h->retry_time,
255 * Free a queue entry. Removes the given entry from the
256 * queue and releases associated resources. Does NOT
259 * @param qe entry to free.
262 free_queue_entry (struct GNUNET_DATASTORE_QueueEntry *qe)
264 struct GNUNET_DATASTORE_Handle *h = qe->h;
266 GNUNET_CONTAINER_DLL_remove (h->queue_head,
271 GNUNET_MQ_discard (qe->env);
277 * Handle an error on the message queue to the datastore service.
279 * @param cls closure with the datastore handle
280 * @param error error code
283 mq_error_handler (void *cls,
284 enum GNUNET_MQ_Error error)
286 struct GNUNET_DATASTORE_Handle *h = cls;
287 struct GNUNET_DATASTORE_QueueEntry *qe;
289 LOG (GNUNET_ERROR_TYPE_DEBUG,
290 "MQ error, reconnecting to DATASTORE\n");
296 union QueueContext qc = qe->qc;
297 uint16_t rt = qe->response_type;
299 LOG (GNUNET_ERROR_TYPE_DEBUG,
300 "Failed to receive response from database.\n");
301 free_queue_entry (qe);
304 case GNUNET_MESSAGE_TYPE_DATASTORE_STATUS:
305 if (NULL != qc.sc.cont)
306 qc.sc.cont (qc.sc.cont_cls,
308 GNUNET_TIME_UNIT_ZERO_ABS,
309 _("DATASTORE disconnected"));
311 case GNUNET_MESSAGE_TYPE_DATASTORE_DATA:
312 if (NULL != qc.rc.proc)
313 qc.rc.proc (qc.rc.proc_cls,
317 GNUNET_TIME_UNIT_ZERO_ABS,
328 * Connect to the datastore service.
330 * @param cfg configuration to use
331 * @return handle to use to access the service
333 struct GNUNET_DATASTORE_Handle *
334 GNUNET_DATASTORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg)
336 struct GNUNET_DATASTORE_Handle *h;
338 LOG (GNUNET_ERROR_TYPE_DEBUG,
339 "Establishing DATASTORE connection!\n");
340 h = GNUNET_new (struct GNUNET_DATASTORE_Handle);
348 h->stats = GNUNET_STATISTICS_create ("datastore-api",
355 * Task used to disconnect from the datastore after
356 * we send the #GNUNET_MESSAGE_TYPE_DATASTORE_DROP message.
358 * @param cls the datastore handle
361 disconnect_after_drop (void *cls)
363 struct GNUNET_DATASTORE_Handle *h = cls;
365 LOG (GNUNET_ERROR_TYPE_DEBUG,
366 "Drop sent, disconnecting\n");
367 GNUNET_DATASTORE_disconnect (h,
373 * Handle error in sending drop request to datastore.
375 * @param cls closure with the datastore handle
376 * @param error error code
379 disconnect_on_mq_error (void *cls,
380 enum GNUNET_MQ_Error error)
382 struct GNUNET_DATASTORE_Handle *h = cls;
384 LOG (GNUNET_ERROR_TYPE_ERROR,
385 "Failed to ask datastore to drop tables\n");
386 GNUNET_DATASTORE_disconnect (h,
392 * Disconnect from the datastore service (and free
393 * associated resources).
395 * @param h handle to the datastore
396 * @param drop set to #GNUNET_YES to delete all data in datastore (!)
399 GNUNET_DATASTORE_disconnect (struct GNUNET_DATASTORE_Handle *h,
402 struct GNUNET_DATASTORE_QueueEntry *qe;
404 LOG (GNUNET_ERROR_TYPE_DEBUG,
405 "Datastore disconnect\n");
408 GNUNET_MQ_destroy (h->mq);
411 if (NULL != h->reconnect_task)
413 GNUNET_SCHEDULER_cancel (h->reconnect_task);
414 h->reconnect_task = NULL;
416 while (NULL != (qe = h->queue_head))
418 switch (qe->response_type)
420 case GNUNET_MESSAGE_TYPE_DATASTORE_STATUS:
421 if (NULL != qe->qc.sc.cont)
422 qe->qc.sc.cont (qe->qc.sc.cont_cls,
424 GNUNET_TIME_UNIT_ZERO_ABS,
425 _("Disconnected from DATASTORE"));
427 case GNUNET_MESSAGE_TYPE_DATASTORE_DATA:
428 if (NULL != qe->qc.rc.proc)
429 qe->qc.rc.proc (qe->qc.rc.proc_cls,
433 GNUNET_TIME_UNIT_ZERO_ABS,
439 free_queue_entry (qe);
441 if (GNUNET_YES == drop)
443 LOG (GNUNET_ERROR_TYPE_DEBUG,
444 "Re-connecting to issue DROP!\n");
445 GNUNET_assert (NULL == h->mq);
446 h->mq = GNUNET_CLIENT_connect (h->cfg,
449 &disconnect_on_mq_error,
453 struct GNUNET_MessageHeader *hdr;
454 struct GNUNET_MQ_Envelope *env;
456 env = GNUNET_MQ_msg (hdr,
457 GNUNET_MESSAGE_TYPE_DATASTORE_DROP);
458 GNUNET_MQ_notify_sent (env,
459 &disconnect_after_drop,
461 GNUNET_MQ_send (h->mq,
467 GNUNET_STATISTICS_destroy (h->stats,
475 * Create a new entry for our priority queue (and possibly discard other entries if
476 * the queue is getting too long).
478 * @param h handle to the datastore
479 * @param env envelope with the message to queue
480 * @param queue_priority priority of the entry
481 * @param max_queue_size at what queue size should this request be dropped
482 * (if other requests of higher priority are in the queue)
483 * @param expected_type which type of response do we expect,
484 * #GNUNET_MESSAGE_TYPE_DATASTORE_STATUS or
485 * #GNUNET_MESSAGE_TYPE_DATASTORE_DATA
486 * @param qc client context (NOT a closure for @a response_proc)
487 * @return NULL if the queue is full
489 static struct GNUNET_DATASTORE_QueueEntry *
490 make_queue_entry (struct GNUNET_DATASTORE_Handle *h,
491 struct GNUNET_MQ_Envelope *env,
492 unsigned int queue_priority,
493 unsigned int max_queue_size,
494 uint16_t expected_type,
495 const union QueueContext *qc)
497 struct GNUNET_DATASTORE_QueueEntry *qe;
498 struct GNUNET_DATASTORE_QueueEntry *pos;
501 if ( (h->queue_size == max_queue_size) &&
502 (h->queue_tail->priority >= queue_priority) )
504 GNUNET_STATISTICS_update (h->stats,
505 gettext_noop ("# queue overflows"),
508 GNUNET_MQ_discard (env);
514 while ( (NULL != pos) &&
515 (c < max_queue_size) &&
516 (pos->priority >= queue_priority) )
521 if (c >= max_queue_size)
523 GNUNET_STATISTICS_update (h->stats,
524 gettext_noop ("# queue overflows"),
527 GNUNET_MQ_discard (env);
530 qe = GNUNET_new (struct GNUNET_DATASTORE_QueueEntry);
533 qe->response_type = expected_type;
535 qe->priority = queue_priority;
536 qe->max_queue = max_queue_size;
539 /* append at the tail */
545 /* do not insert at HEAD if HEAD query was already
546 * transmitted and we are still receiving replies! */
547 if ( (NULL == pos) &&
548 (NULL == h->queue_head->env) )
552 #if INSANE_STATISTICS
553 GNUNET_STATISTICS_update (h->stats,
554 gettext_noop ("# queue entries created"),
558 GNUNET_CONTAINER_DLL_insert_after (h->queue_head,
568 * Process entries in the queue (or do nothing if we are already
571 * @param h handle to the datastore
574 process_queue (struct GNUNET_DATASTORE_Handle *h)
576 struct GNUNET_DATASTORE_QueueEntry *qe;
578 if (NULL == (qe = h->queue_head))
580 /* no entry in queue */
581 LOG (GNUNET_ERROR_TYPE_DEBUG,
587 /* waiting for replies */
588 LOG (GNUNET_ERROR_TYPE_DEBUG,
589 "Head request already transmitted\n");
594 /* waiting for reconnect */
595 LOG (GNUNET_ERROR_TYPE_DEBUG,
599 GNUNET_MQ_send (h->mq,
608 * Function called to check status message from the service.
611 * @param sm status message received
612 * @return #GNUNET_OK if the message is well-formed
615 check_status (void *cls,
616 const struct StatusMessage *sm)
618 uint16_t msize = ntohs (sm->header.size) - sizeof (*sm);
619 int32_t status = ntohl (sm->status);
623 const char *emsg = (const char *) &sm[1];
625 if ('\0' != emsg[msize - 1])
628 return GNUNET_SYSERR;
631 else if (GNUNET_SYSERR == status)
634 return GNUNET_SYSERR;
641 * Function called to handle status message from the service.
644 * @param sm status message received
647 handle_status (void *cls,
648 const struct StatusMessage *sm)
650 struct GNUNET_DATASTORE_Handle *h = cls;
651 struct GNUNET_DATASTORE_QueueEntry *qe;
652 struct StatusContext rc;
654 int32_t status = ntohl (sm->status);
656 if (h->skip_next_messages > 0)
658 h->skip_next_messages--;
662 if (NULL == (qe = h->queue_head))
674 if (GNUNET_MESSAGE_TYPE_DATASTORE_STATUS != qe->response_type)
681 free_queue_entry (qe);
682 if (ntohs (sm->header.size) > sizeof (struct StatusMessage))
683 emsg = (const char *) &sm[1];
686 LOG (GNUNET_ERROR_TYPE_DEBUG,
687 "Received status %d/%s\n",
690 GNUNET_STATISTICS_update (h->stats,
691 gettext_noop ("# status messages received"),
694 h->retry_time = GNUNET_TIME_UNIT_ZERO;
697 rc.cont (rc.cont_cls,
699 GNUNET_TIME_absolute_ntoh (sm->min_expiration),
705 * Check data message we received from the service.
707 * @param cls closure with the `struct GNUNET_DATASTORE_Handle *`
708 * @param dm message received
711 check_data (void *cls,
712 const struct DataMessage *dm)
714 uint16_t msize = ntohs (dm->header.size) - sizeof (*dm);
716 if (msize != ntohl (dm->size))
719 return GNUNET_SYSERR;
726 * Handle data message we got from the service.
728 * @param cls closure with the `struct GNUNET_DATASTORE_Handle *`
729 * @param dm message received
732 handle_data (void *cls,
733 const struct DataMessage *dm)
735 struct GNUNET_DATASTORE_Handle *h = cls;
736 struct GNUNET_DATASTORE_QueueEntry *qe;
737 struct ResultContext rc;
739 if (h->skip_next_messages > 0)
757 if (GNUNET_MESSAGE_TYPE_DATASTORE_DATA != qe->response_type)
763 #if INSANE_STATISTICS
764 GNUNET_STATISTICS_update (h->stats,
765 gettext_noop ("# Results received"),
769 LOG (GNUNET_ERROR_TYPE_DEBUG,
770 "Received result %llu with type %u and size %u with key %s\n",
771 (unsigned long long) GNUNET_ntohll (dm->uid),
774 GNUNET_h2s (&dm->key));
776 free_queue_entry (qe);
777 h->retry_time = GNUNET_TIME_UNIT_ZERO;
780 rc.proc (rc.proc_cls,
785 ntohl (dm->priority),
786 ntohl (dm->anonymity),
787 GNUNET_TIME_absolute_ntoh (dm->expiration),
788 GNUNET_ntohll (dm->uid));
793 * Function called when we receive a
794 * #GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END message from the service.
796 * @param cls closure with the `struct GNUNET_DATASTORE_Handle *`
797 * @param msg message received
800 handle_data_end (void *cls,
801 const struct GNUNET_MessageHeader *msg)
803 struct GNUNET_DATASTORE_Handle *h = cls;
804 struct GNUNET_DATASTORE_QueueEntry *qe;
805 struct ResultContext rc;
807 if (h->skip_next_messages > 0)
809 h->skip_next_messages--;
826 if (GNUNET_MESSAGE_TYPE_DATASTORE_DATA != qe->response_type)
833 free_queue_entry (qe);
834 LOG (GNUNET_ERROR_TYPE_DEBUG,
835 "Received end of result set, new queue size is %u\n",
837 h->retry_time = GNUNET_TIME_UNIT_ZERO;
840 /* signal end of iteration */
842 rc.proc (rc.proc_cls,
849 GNUNET_TIME_UNIT_ZERO_ABS,
855 * Try reconnecting to the datastore service.
857 * @param cls the `struct GNUNET_DATASTORE_Handle`
860 try_reconnect (void *cls)
862 struct GNUNET_DATASTORE_Handle *h = cls;
863 struct GNUNET_MQ_MessageHandler handlers[] = {
864 GNUNET_MQ_hd_var_size (status,
865 GNUNET_MESSAGE_TYPE_DATASTORE_STATUS,
866 struct StatusMessage,
868 GNUNET_MQ_hd_var_size (data,
869 GNUNET_MESSAGE_TYPE_DATASTORE_DATA,
872 GNUNET_MQ_hd_fixed_size (data_end,
873 GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END,
874 struct GNUNET_MessageHeader,
876 GNUNET_MQ_handler_end ()
879 h->retry_time = GNUNET_TIME_STD_BACKOFF (h->retry_time);
880 h->reconnect_task = NULL;
881 GNUNET_assert (NULL == h->mq);
882 h->mq = GNUNET_CLIENT_connect (h->cfg,
889 GNUNET_STATISTICS_update (h->stats,
890 gettext_noop ("# datastore connections (re)created"),
893 LOG (GNUNET_ERROR_TYPE_DEBUG,
894 "Reconnected to DATASTORE\n");
900 * Dummy continuation used to do nothing (but be non-zero).
903 * @param result result
904 * @param min_expiration expiration time
905 * @param emsg error message
908 drop_status_cont (void *cls,
910 struct GNUNET_TIME_Absolute min_expiration,
918 * Store an item in the datastore. If the item is already present,
919 * the priorities are summed up and the higher expiration time and
920 * lower anonymity level is used.
922 * @param h handle to the datastore
923 * @param rid reservation ID to use (from "reserve"); use 0 if no
924 * prior reservation was made
925 * @param key key for the value
926 * @param size number of bytes in data
927 * @param data content stored
928 * @param type type of the content
929 * @param priority priority of the content
930 * @param anonymity anonymity-level for the content
931 * @param replication how often should the content be replicated to other peers?
932 * @param expiration expiration time for the content
933 * @param queue_priority ranking of this request in the priority queue
934 * @param max_queue_size at what queue size should this request be dropped
935 * (if other requests of higher priority are in the queue)
936 * @param cont continuation to call when done
937 * @param cont_cls closure for @a cont
938 * @return NULL if the entry was not queued, otherwise a handle that can be used to
939 * cancel; note that even if NULL is returned, the callback will be invoked
940 * (or rather, will already have been invoked)
942 struct GNUNET_DATASTORE_QueueEntry *
943 GNUNET_DATASTORE_put (struct GNUNET_DATASTORE_Handle *h,
945 const struct GNUNET_HashCode *key,
948 enum GNUNET_BLOCK_Type type,
951 uint32_t replication,
952 struct GNUNET_TIME_Absolute expiration,
953 unsigned int queue_priority,
954 unsigned int max_queue_size,
955 GNUNET_DATASTORE_ContinuationWithStatus cont,
958 struct GNUNET_DATASTORE_QueueEntry *qe;
959 struct GNUNET_MQ_Envelope *env;
960 struct DataMessage *dm;
961 union QueueContext qc;
963 if (size + sizeof (*dm) >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
969 LOG (GNUNET_ERROR_TYPE_DEBUG,
970 "Asked to put %u bytes of data under key `%s' for %s\n",
973 GNUNET_STRINGS_relative_time_to_string (GNUNET_TIME_absolute_get_remaining (expiration),
975 env = GNUNET_MQ_msg_extra (dm,
977 GNUNET_MESSAGE_TYPE_DATASTORE_PUT);
978 dm->rid = htonl (rid);
979 dm->size = htonl ((uint32_t) size);
980 dm->type = htonl (type);
981 dm->priority = htonl (priority);
982 dm->anonymity = htonl (anonymity);
983 dm->replication = htonl (replication);
984 dm->reserved = htonl (0);
985 dm->uid = GNUNET_htonll (0);
986 dm->expiration = GNUNET_TIME_absolute_hton (expiration);
988 GNUNET_memcpy (&dm[1],
992 qc.sc.cont_cls = cont_cls;
993 qe = make_queue_entry (h,
997 GNUNET_MESSAGE_TYPE_DATASTORE_STATUS,
1001 LOG (GNUNET_ERROR_TYPE_DEBUG,
1002 "Could not create queue entry for PUT\n");
1005 GNUNET_STATISTICS_update (h->stats,
1006 gettext_noop ("# PUT requests executed"),
1015 * Reserve space in the datastore. This function should be used
1016 * to avoid "out of space" failures during a longer sequence of "put"
1017 * operations (for example, when a file is being inserted).
1019 * @param h handle to the datastore
1020 * @param amount how much space (in bytes) should be reserved (for content only)
1021 * @param entries how many entries will be created (to calculate per-entry overhead)
1022 * @param cont continuation to call when done; "success" will be set to
1023 * a positive reservation value if space could be reserved.
1024 * @param cont_cls closure for @a cont
1025 * @return NULL if the entry was not queued, otherwise a handle that can be used to
1026 * cancel; note that even if NULL is returned, the callback will be invoked
1027 * (or rather, will already have been invoked)
1029 struct GNUNET_DATASTORE_QueueEntry *
1030 GNUNET_DATASTORE_reserve (struct GNUNET_DATASTORE_Handle *h,
1033 GNUNET_DATASTORE_ContinuationWithStatus cont,
1036 struct GNUNET_DATASTORE_QueueEntry *qe;
1037 struct GNUNET_MQ_Envelope *env;
1038 struct ReserveMessage *rm;
1039 union QueueContext qc;
1042 cont = &drop_status_cont;
1043 LOG (GNUNET_ERROR_TYPE_DEBUG,
1044 "Asked to reserve %llu bytes of data and %u entries\n",
1045 (unsigned long long) amount,
1046 (unsigned int) entries);
1047 env = GNUNET_MQ_msg (rm,
1048 GNUNET_MESSAGE_TYPE_DATASTORE_RESERVE);
1049 rm->entries = htonl (entries);
1050 rm->amount = GNUNET_htonll (amount);
1053 qc.sc.cont_cls = cont_cls;
1054 qe = make_queue_entry (h,
1058 GNUNET_MESSAGE_TYPE_DATASTORE_STATUS,
1062 LOG (GNUNET_ERROR_TYPE_DEBUG,
1063 "Could not create queue entry to reserve\n");
1066 GNUNET_STATISTICS_update (h->stats,
1067 gettext_noop ("# RESERVE requests executed"),
1076 * Signal that all of the data for which a reservation was made has
1077 * been stored and that whatever excess space might have been reserved
1078 * can now be released.
1080 * @param h handle to the datastore
1081 * @param rid reservation ID (value of "success" in original continuation
1082 * from the "reserve" function).
1083 * @param queue_priority ranking of this request in the priority queue
1084 * @param max_queue_size at what queue size should this request be dropped
1085 * (if other requests of higher priority are in the queue)
1089 * @param cont continuation to call when done
1090 * @param cont_cls closure for @a cont
1091 * @return NULL if the entry was not queued, otherwise a handle that can be used to
1092 * cancel; note that even if NULL is returned, the callback will be invoked
1093 * (or rather, will already have been invoked)
1095 struct GNUNET_DATASTORE_QueueEntry *
1096 GNUNET_DATASTORE_release_reserve (struct GNUNET_DATASTORE_Handle *h,
1098 unsigned int queue_priority,
1099 unsigned int max_queue_size,
1100 GNUNET_DATASTORE_ContinuationWithStatus cont,
1103 struct GNUNET_DATASTORE_QueueEntry *qe;
1104 struct GNUNET_MQ_Envelope *env;
1105 struct ReleaseReserveMessage *rrm;
1106 union QueueContext qc;
1109 cont = &drop_status_cont;
1110 LOG (GNUNET_ERROR_TYPE_DEBUG,
1111 "Asked to release reserve %d\n",
1113 env = GNUNET_MQ_msg (rrm,
1114 GNUNET_MESSAGE_TYPE_DATASTORE_RELEASE_RESERVE);
1115 rrm->rid = htonl (rid);
1117 qc.sc.cont_cls = cont_cls;
1118 qe = make_queue_entry (h,
1122 GNUNET_MESSAGE_TYPE_DATASTORE_STATUS,
1126 LOG (GNUNET_ERROR_TYPE_DEBUG,
1127 "Could not create queue entry to release reserve\n");
1130 GNUNET_STATISTICS_update (h->stats,
1132 ("# RELEASE RESERVE requests executed"), 1,
1140 * Explicitly remove some content from the database.
1141 * The @a cont continuation will be called with `status`
1142 * #GNUNET_OK if content was removed, #GNUNET_NO
1143 * if no matching entry was found and #GNUNET_SYSERR
1144 * on all other types of errors.
1146 * @param h handle to the datastore
1147 * @param key key for the value
1148 * @param size number of bytes in data
1149 * @param data content stored
1150 * @param queue_priority ranking of this request in the priority queue
1151 * @param max_queue_size at what queue size should this request be dropped
1152 * (if other requests of higher priority are in the queue)
1153 * @param cont continuation to call when done
1154 * @param cont_cls closure for @a cont
1155 * @return NULL if the entry was not queued, otherwise a handle that can be used to
1156 * cancel; note that even if NULL is returned, the callback will be invoked
1157 * (or rather, will already have been invoked)
1159 struct GNUNET_DATASTORE_QueueEntry *
1160 GNUNET_DATASTORE_remove (struct GNUNET_DATASTORE_Handle *h,
1161 const struct GNUNET_HashCode *key,
1164 unsigned int queue_priority,
1165 unsigned int max_queue_size,
1166 GNUNET_DATASTORE_ContinuationWithStatus cont,
1169 struct GNUNET_DATASTORE_QueueEntry *qe;
1170 struct DataMessage *dm;
1171 struct GNUNET_MQ_Envelope *env;
1172 union QueueContext qc;
1174 if (sizeof (*dm) + size >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
1180 cont = &drop_status_cont;
1181 LOG (GNUNET_ERROR_TYPE_DEBUG,
1182 "Asked to remove %u bytes under key `%s'\n",
1185 env = GNUNET_MQ_msg_extra (dm,
1187 GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE);
1188 dm->rid = htonl (0);
1189 dm->size = htonl (size);
1190 dm->type = htonl (0);
1191 dm->priority = htonl (0);
1192 dm->anonymity = htonl (0);
1193 dm->uid = GNUNET_htonll (0);
1194 dm->expiration = GNUNET_TIME_absolute_hton (GNUNET_TIME_UNIT_ZERO_ABS);
1196 GNUNET_memcpy (&dm[1],
1201 qc.sc.cont_cls = cont_cls;
1203 qe = make_queue_entry (h,
1207 GNUNET_MESSAGE_TYPE_DATASTORE_STATUS,
1211 LOG (GNUNET_ERROR_TYPE_DEBUG,
1212 "Could not create queue entry for REMOVE\n");
1215 GNUNET_STATISTICS_update (h->stats,
1216 gettext_noop ("# REMOVE requests executed"),
1226 * Get a random value from the datastore for content replication.
1227 * Returns a single, random value among those with the highest
1228 * replication score, lowering positive replication scores by one for
1229 * the chosen value (if only content with a replication score exists,
1230 * a random value is returned and replication scores are not changed).
1232 * @param h handle to the datastore
1233 * @param queue_priority ranking of this request in the priority queue
1234 * @param max_queue_size at what queue size should this request be dropped
1235 * (if other requests of higher priority are in the queue)
1236 * @param proc function to call on a random value; it
1237 * will be called once with a value (if available)
1238 * and always once with a value of NULL.
1239 * @param proc_cls closure for @a proc
1240 * @return NULL if the entry was not queued, otherwise a handle that can be used to
1243 struct GNUNET_DATASTORE_QueueEntry *
1244 GNUNET_DATASTORE_get_for_replication (struct GNUNET_DATASTORE_Handle *h,
1245 unsigned int queue_priority,
1246 unsigned int max_queue_size,
1247 GNUNET_DATASTORE_DatumProcessor proc,
1250 struct GNUNET_DATASTORE_QueueEntry *qe;
1251 struct GNUNET_MQ_Envelope *env;
1252 struct GNUNET_MessageHeader *m;
1253 union QueueContext qc;
1255 GNUNET_assert (NULL != proc);
1256 LOG (GNUNET_ERROR_TYPE_DEBUG,
1257 "Asked to get replication entry\n");
1258 env = GNUNET_MQ_msg (m,
1259 GNUNET_MESSAGE_TYPE_DATASTORE_GET_REPLICATION);
1261 qc.rc.proc_cls = proc_cls;
1262 qe = make_queue_entry (h,
1266 GNUNET_MESSAGE_TYPE_DATASTORE_DATA,
1270 LOG (GNUNET_ERROR_TYPE_DEBUG,
1271 "Could not create queue entry for GET REPLICATION\n");
1274 GNUNET_STATISTICS_update (h->stats,
1276 ("# GET REPLICATION requests executed"), 1,
1284 * Get a single zero-anonymity value from the datastore.
1286 * @param h handle to the datastore
1287 * @param offset offset of the result (modulo num-results); set to
1288 * a random 64-bit value initially; then increment by
1289 * one each time; detect that all results have been found by uid
1290 * being again the first uid ever returned.
1291 * @param queue_priority ranking of this request in the priority queue
1292 * @param max_queue_size at what queue size should this request be dropped
1293 * (if other requests of higher priority are in the queue)
1294 * @param type allowed type for the operation (never zero)
1295 * @param proc function to call on a random value; it
1296 * will be called once with a value (if available)
1297 * or with NULL if no value exists.
1298 * @param proc_cls closure for @a proc
1299 * @return NULL if the entry was not queued, otherwise a handle that can be used to
1302 struct GNUNET_DATASTORE_QueueEntry *
1303 GNUNET_DATASTORE_get_zero_anonymity (struct GNUNET_DATASTORE_Handle *h,
1305 unsigned int queue_priority,
1306 unsigned int max_queue_size,
1307 enum GNUNET_BLOCK_Type type,
1308 GNUNET_DATASTORE_DatumProcessor proc,
1311 struct GNUNET_DATASTORE_QueueEntry *qe;
1312 struct GNUNET_MQ_Envelope *env;
1313 struct GetZeroAnonymityMessage *m;
1314 union QueueContext qc;
1316 GNUNET_assert (NULL != proc);
1317 GNUNET_assert (type != GNUNET_BLOCK_TYPE_ANY);
1318 LOG (GNUNET_ERROR_TYPE_DEBUG,
1319 "Asked to get %llu-th zero-anonymity entry of type %d\n",
1320 (unsigned long long) offset,
1322 env = GNUNET_MQ_msg (m,
1323 GNUNET_MESSAGE_TYPE_DATASTORE_GET_ZERO_ANONYMITY);
1324 m->type = htonl ((uint32_t) type);
1325 m->offset = GNUNET_htonll (offset);
1327 qc.rc.proc_cls = proc_cls;
1328 qe = make_queue_entry (h,
1332 GNUNET_MESSAGE_TYPE_DATASTORE_DATA,
1336 LOG (GNUNET_ERROR_TYPE_DEBUG,
1337 "Could not create queue entry for zero-anonymity procation\n");
1340 GNUNET_STATISTICS_update (h->stats,
1342 ("# GET ZERO ANONYMITY requests executed"), 1,
1350 * Get a result for a particular key from the datastore. The processor
1351 * will only be called once.
1353 * @param h handle to the datastore
1354 * @param offset offset of the result (modulo num-results); set to
1355 * a random 64-bit value initially; then increment by
1356 * one each time; detect that all results have been found by uid
1357 * being again the first uid ever returned.
1358 * @param key may be NULL (to match all entries)
1359 * @param type desired type, 0 for any
1360 * @param queue_priority ranking of this request in the priority queue
1361 * @param max_queue_size at what queue size should this request be dropped
1362 * (if other requests of higher priority are in the queue)
1363 * @param proc function to call on each matching value;
1364 * will be called once with a NULL value at the end
1365 * @param proc_cls closure for @a proc
1366 * @return NULL if the entry was not queued, otherwise a handle that can be used to
1369 struct GNUNET_DATASTORE_QueueEntry *
1370 GNUNET_DATASTORE_get_key (struct GNUNET_DATASTORE_Handle *h,
1372 const struct GNUNET_HashCode *key,
1373 enum GNUNET_BLOCK_Type type,
1374 unsigned int queue_priority,
1375 unsigned int max_queue_size,
1376 GNUNET_DATASTORE_DatumProcessor proc,
1379 struct GNUNET_DATASTORE_QueueEntry *qe;
1380 struct GNUNET_MQ_Envelope *env;
1381 struct GetKeyMessage *gkm;
1382 struct GetMessage *gm;
1383 union QueueContext qc;
1385 GNUNET_assert (NULL != proc);
1386 LOG (GNUNET_ERROR_TYPE_DEBUG,
1387 "Asked to look for data of type %u under key `%s'\n",
1388 (unsigned int) type,
1392 env = GNUNET_MQ_msg (gm,
1393 GNUNET_MESSAGE_TYPE_DATASTORE_GET);
1394 gm->type = htonl (type);
1395 gm->offset = GNUNET_htonll (offset);
1399 env = GNUNET_MQ_msg (gkm,
1400 GNUNET_MESSAGE_TYPE_DATASTORE_GET_KEY);
1401 gkm->type = htonl (type);
1402 gkm->offset = GNUNET_htonll (offset);
1406 qc.rc.proc_cls = proc_cls;
1407 qe = make_queue_entry (h,
1411 GNUNET_MESSAGE_TYPE_DATASTORE_DATA,
1415 LOG (GNUNET_ERROR_TYPE_DEBUG,
1416 "Could not queue request for `%s'\n",
1420 #if INSANE_STATISTICS
1421 GNUNET_STATISTICS_update (h->stats,
1422 gettext_noop ("# GET requests executed"),
1432 * Cancel a datastore operation. The final callback from the
1433 * operation must not have been done yet.
1435 * @param qe operation to cancel
1438 GNUNET_DATASTORE_cancel (struct GNUNET_DATASTORE_QueueEntry *qe)
1440 struct GNUNET_DATASTORE_Handle *h = qe->h;
1442 LOG (GNUNET_ERROR_TYPE_DEBUG,
1443 "Pending DATASTORE request %p cancelled (%d, %d)\n",
1446 h->queue_head == qe);
1447 if (NULL == qe->env)
1449 free_queue_entry (qe);
1450 h->skip_next_messages++;
1453 free_queue_entry (qe);
1458 /* end of datastore_api.c */