2 This file is part of GNUnet
3 (C) 2004, 2005, 2006, 2007, 2009, 2010, 2011 Christian Grothoff (and other contributing authors)
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your
8 option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA.
22 * @file datastore/datastore_api.c
23 * @brief Management for the datastore for files stored on a GNUnet node. Implements
24 * a priority queue for requests (with timeouts).
25 * @author Christian Grothoff
28 #include "gnunet_arm_service.h"
29 #include "gnunet_constants.h"
30 #include "gnunet_datastore_service.h"
31 #include "gnunet_statistics_service.h"
32 #include "datastore.h"
35 * If a client stopped asking for more results, how many more do
36 * we receive from the DB before killing the connection? Trade-off
37 * between re-doing TCP handshakes and (needlessly) receiving unwanted results.
/* Threshold compared against h->result_count in process_result_message:
   once more than this many results arrive without an iterator to take them,
   the queue entry is released and a forced connection reset is counted. */
40 #define MAX_EXCESS_RESULTS 8
43 * Context for processing status messages.
48 * Continuation to call with the status.
50 GNUNET_DATASTORE_ContinuationWithStatus cont;
61 * Context for processing result messages.
66 * Iterator to call with the result.
68 GNUNET_DATASTORE_Iterator iter;
79 * Context for a queue operation.
84 struct StatusContext sc;
86 struct ResultContext rc;
93 * Entry in our priority queue.
95 struct GNUNET_DATASTORE_QueueEntry
99 * This is a linked list.
101 struct GNUNET_DATASTORE_QueueEntry *next;
104 * This is a linked list.
106 struct GNUNET_DATASTORE_QueueEntry *prev;
109 * Handle to the master context.
111 struct GNUNET_DATASTORE_Handle *h;
114 * Response processor (NULL if we are not waiting for a response).
115 * This struct should be used for the closure, function-specific
116 * arguments can be passed via 'qc'.
118 GNUNET_CLIENT_MessageHandler response_proc;
121 * Function to call after transmission of the request.
123 GNUNET_DATASTORE_ContinuationWithStatus cont;
126 * Closure for 'cont'.
131 * Context for the operation.
133 union QueueContext qc;
136 * Task for timeout signalling.
138 GNUNET_SCHEDULER_TaskIdentifier task;
141 * Timeout for the current operation.
143 struct GNUNET_TIME_Absolute timeout;
146 * Priority in the queue.
148 unsigned int priority;
151 * Maximum allowed length of queue (otherwise
152 * this request should be discarded).
154 unsigned int max_queue;
157 * Number of bytes in the request message following
158 * this struct. 32-bit value for nicer memory
159 * access (and overall struct alignment).
161 uint32_t message_size;
164 * Has this message been transmitted to the service?
165 * Only ever GNUNET_YES for the head of the queue.
166 * Note that the overall struct should end at a
167 * multiple of 64 bits.
172 * Are we expecting a single message in response to this
173 * request (and, if it is data, no 'END' message)?
180 * Handle to the datastore service.
182 struct GNUNET_DATASTORE_Handle
188 const struct GNUNET_CONFIGURATION_Handle *cfg;
192 * Current connection to the datastore service.
194 struct GNUNET_CLIENT_Connection *client;
197 * Handle for statistics.
199 struct GNUNET_STATISTICS_Handle *stats;
202 * Current transmit handle.
204 struct GNUNET_CLIENT_TransmitHandle *th;
207 * Current head of priority queue.
209 struct GNUNET_DATASTORE_QueueEntry *queue_head;
212 * Current tail of priority queue.
214 struct GNUNET_DATASTORE_QueueEntry *queue_tail;
217 * Task for trying to reconnect.
219 GNUNET_SCHEDULER_TaskIdentifier reconnect_task;
222 * How quickly should we retry? Used for exponential back-off on reconnect attempts.
225 struct GNUNET_TIME_Relative retry_time;
228 * Number of entries in the queue.
230 unsigned int queue_size;
233 * Number of results we're receiving for the current query
234 * after the application stopped caring. Used to determine when
235 * to reset the connection.
237 unsigned int result_count;
240 * Are we currently trying to receive from the service?
249 * Connect to the datastore service.
251 * @param cfg configuration to use
252 * @return handle to use to access the service
/* Public entry point: opens a client connection to the "datastore" service,
   allocates the handle and creates its statistics handle. */
254 struct GNUNET_DATASTORE_Handle *
255 GNUNET_DATASTORE_connect (const struct
256 GNUNET_CONFIGURATION_Handle
259 struct GNUNET_CLIENT_Connection *c;
260 struct GNUNET_DATASTORE_Handle *h;
262 c = GNUNET_CLIENT_connect ("datastore", cfg);
/* NOTE(review): the condition guarding this return is not visible in this
   listing; presumably it is taken when the connect above failed - confirm. */
264 return NULL; /* oops */
/* The allocation requests GNUNET_SERVER_MAX_MESSAGE_SIZE - 1 bytes beyond the
   struct itself. NOTE(review): no visible line uses that trailing space -
   confirm it is still required. */
265 h = GNUNET_malloc (sizeof(struct GNUNET_DATASTORE_Handle) +
266 GNUNET_SERVER_MAX_MESSAGE_SIZE - 1);
/* Statistics for this API are created under the name "datastore-api". */
269 h->stats = GNUNET_STATISTICS_create ("datastore-api",
276 * Transmit DROP message to datastore service.
278 * @param cls the 'struct GNUNET_DATASTORE_Handle'
279 * @param size number of bytes that can be copied to buf
280 * @param buf where to copy the drop message
281 * @return number of bytes written to buf
/* Transmit callback that writes a header-only DROP message into the supplied
   buffer and then disconnects the handle. */
284 transmit_drop (void *cls,
288 struct GNUNET_DATASTORE_Handle *h = cls;
289 struct GNUNET_MessageHeader *hdr;
/* Failure path: log a warning and disconnect with drop = GNUNET_NO.
   NOTE(review): the condition selecting this path is not visible here. */
293 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
294 _("Failed to transmit request to drop database.\n"));
295 GNUNET_DATASTORE_disconnect (h, GNUNET_NO);
/* Success path: the buffer has to hold at least one message header. */
298 GNUNET_assert (size >= sizeof(struct GNUNET_MessageHeader));
/* The message is the bare header; both fields go out in network byte order. */
300 hdr->size = htons(sizeof(struct GNUNET_MessageHeader));
301 hdr->type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_DROP);
/* Disconnect once the message has been written, again with drop = GNUNET_NO. */
302 GNUNET_DATASTORE_disconnect (h, GNUNET_NO);
303 return sizeof(struct GNUNET_MessageHeader);
308 * Disconnect from the datastore service (and free
309 * associated resources).
311 * @param h handle to the datastore
312 * @param drop set to GNUNET_YES to delete all data in datastore (!)
/* Tears the handle down: closes the connection, cancels a pending reconnect
   task, fails every queued request, optionally opens a fresh connection to
   send a DROP request, and destroys the statistics handle. */
315 GNUNET_DATASTORE_disconnect (struct GNUNET_DATASTORE_Handle *h,
318 struct GNUNET_DATASTORE_QueueEntry *qe;
320 if (h->client != NULL)
322 GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
325 if (h->reconnect_task != GNUNET_SCHEDULER_NO_TASK)
327 GNUNET_SCHEDULER_cancel (h->reconnect_task);
328 h->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
/* Drain the queue by handing each entry's response handler a NULL message.
   The loop only terminates because those handlers unlink the entry from the
   queue (both handlers in this file call free_queue_entry on NULL). */
330 while (NULL != (qe = h->queue_head))
332 GNUNET_assert (NULL != qe->response_proc);
333 qe->response_proc (qe, NULL);
/* A drop request needs a new connection, since the old one was closed above. */
335 if (GNUNET_YES == drop)
337 h->client = GNUNET_CLIENT_connect ("datastore", h->cfg);
338 if (h->client != NULL)
/* Ask for a header-sized transmission slot, waiting at most one minute. */
341 GNUNET_CLIENT_notify_transmit_ready (h->client,
342 sizeof(struct GNUNET_MessageHeader),
343 GNUNET_TIME_UNIT_MINUTES,
/* NOTE(review): the condition under which this second disconnect runs is not
   visible in this listing. */
348 GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
352 GNUNET_STATISTICS_destroy (h->stats,
359 * A request has timed out (before being transmitted to the service).
361 * @param cls the 'struct GNUNET_DATASTORE_QueueEntry'
362 * @param tc scheduler context
/* Scheduler task that fires when a queued request expires before it was
   transmitted; the request is failed through its response handler. */
365 timeout_queue_entry (void *cls,
366 const struct GNUNET_SCHEDULER_TaskContext *tc)
368 struct GNUNET_DATASTORE_QueueEntry *qe = cls;
370 GNUNET_STATISTICS_update (qe->h->stats,
371 gettext_noop ("# queue entry timeouts"),
/* This task is running now, so clear its id; free_queue_entry would otherwise
   try to cancel it. */
374 qe->task = GNUNET_SCHEDULER_NO_TASK;
/* transmit_request cancels the timeout task when it sends an entry, so a
   transmitted entry must never reach this point. */
375 GNUNET_assert (qe->was_transmitted == GNUNET_NO);
/* Report the timeout via the normal response path (NULL message). */
376 qe->response_proc (qe, NULL);
381 * Create a new entry for our priority queue (and possibly discard other entries if
382 * the queue is getting too long).
384 * @param h handle to the datastore
385 * @param msize size of the message to queue
386 * @param queue_priority priority of the entry
387 * @param max_queue_size at what queue size should this request be dropped
388 * (if other requests of higher priority are in the queue)
389 * @param timeout timeout for the operation
390 * @param response_proc function to call with replies (can be NULL)
391 * @param qc client context (NOT a closure for response_proc)
392 * @return NULL if the queue is full (and this entry was dropped)
/* Allocates a queue entry with msize trailing bytes for the request message,
   links it into the handle's priority queue and schedules its timeout task.
   NOTE(review): several statements of this function (counter handling, the
   insertion arguments, return statements) are missing from this listing. */
394 static struct GNUNET_DATASTORE_QueueEntry *
395 make_queue_entry (struct GNUNET_DATASTORE_Handle *h,
397 unsigned int queue_priority,
398 unsigned int max_queue_size,
399 struct GNUNET_TIME_Relative timeout,
400 GNUNET_CLIENT_MessageHandler response_proc,
401 const union QueueContext *qc)
403 struct GNUNET_DATASTORE_QueueEntry *ret;
404 struct GNUNET_DATASTORE_QueueEntry *pos;
/* Walk past entries whose priority is at least queue_priority, stopping
   once max_queue_size entries have been counted. */
409 while ( (pos != NULL) &&
410 (c < max_queue_size) &&
411 (pos->priority >= queue_priority) )
/* The request message is stored directly behind the struct (callers write it
   through &qe[1]), hence the extra msize bytes. */
416 ret = GNUNET_malloc (sizeof (struct GNUNET_DATASTORE_QueueEntry) + msize);
418 ret->response_proc = response_proc;
/* The relative timeout is converted to an absolute deadline for later use. */
420 ret->timeout = GNUNET_TIME_relative_to_absolute (timeout);
421 ret->priority = queue_priority;
422 ret->max_queue = max_queue_size;
423 ret->message_size = msize;
424 ret->was_transmitted = GNUNET_NO;
427 /* append at the tail */
433 /* do not insert at HEAD if HEAD query was already
434 transmitted and we are still receiving replies! */
435 if ( (pos == NULL) &&
436 (h->queue_head->was_transmitted) )
440 GNUNET_STATISTICS_update (h->stats,
441 gettext_noop ("# queue entries created"),
444 GNUNET_CONTAINER_DLL_insert_after (h->queue_head,
/* Overflow check: when it fires, the new entry is failed immediately through
   response_proc with a NULL message. */
449 if (c > max_queue_size)
451 GNUNET_STATISTICS_update (h->stats,
452 gettext_noop ("# queue overflows"),
455 response_proc (ret, NULL);
/* Arm the per-entry timeout; timeout_queue_entry runs after the delay
   unless the task is cancelled first. */
458 ret->task = GNUNET_SCHEDULER_add_delayed (timeout,
459 &timeout_queue_entry,
/* Fail an existing entry whose own max_queue limit is now below the current
   queue size. */
464 if (pos->max_queue < h->queue_size)
466 GNUNET_assert (pos->response_proc != NULL);
467 pos->response_proc (pos, NULL);
477 * Process entries in the queue (or do nothing if we are already doing so).
480 * @param h handle to the datastore
/* Forward declaration; the definition appears further down in this file. */
483 process_queue (struct GNUNET_DATASTORE_Handle *h);
487 * Try reconnecting to the datastore service.
489 * @param cls the 'struct GNUNET_DATASTORE_Handle'
490 * @param tc scheduler context
/* Scheduler task that updates the retry delay and re-opens the connection to
   the "datastore" service. */
493 try_reconnect (void *cls,
494 const struct GNUNET_SCHEDULER_TaskContext *tc)
496 struct GNUNET_DATASTORE_Handle *h = cls;
/* Back-off bookkeeping: retry_time is set to GNUNET_CONSTANTS_SERVICE_RETRY
   when below it, multiplied by two, and capped at
   GNUNET_CONSTANTS_SERVICE_TIMEOUT. NOTE(review): a line is missing between
   the first assignment and the multiplication, so the doubling may sit in an
   else branch - confirm. */
498 if (h->retry_time.rel_value < GNUNET_CONSTANTS_SERVICE_RETRY.rel_value)
499 h->retry_time = GNUNET_CONSTANTS_SERVICE_RETRY;
501 h->retry_time = GNUNET_TIME_relative_multiply (h->retry_time, 2);
502 if (h->retry_time.rel_value > GNUNET_CONSTANTS_SERVICE_TIMEOUT.rel_value)
503 h->retry_time = GNUNET_CONSTANTS_SERVICE_TIMEOUT;
/* This task is executing now, so forget its id before anything else. */
504 h->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
505 h->client = GNUNET_CLIENT_connect ("datastore", h->cfg);
506 if (h->client == NULL)
508 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
509 "DATASTORE reconnect failed (fatally)\n");
512 GNUNET_STATISTICS_update (h->stats,
513 gettext_noop ("# datastore connections (re)created"),
517 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
518 "Reconnected to DATASTORE\n");
525 * Disconnect from the service and then try reconnecting to the datastore service
528 * @param h handle to datastore to disconnect and reconnect
/* Closes the current connection and schedules a delayed task (delay
   h->retry_time) whose id is kept in h->reconnect_task. The scheduled
   callback is not visible in this listing; presumably try_reconnect. */
531 do_disconnect (struct GNUNET_DATASTORE_Handle *h)
/* Without a connection there is nothing to close; only a debug message is
   logged on this path. */
533 if (h->client == NULL)
536 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
537 "client NULL in disconnect, will not try to reconnect\n");
/* NOTE(review): `stats` is neither a parameter nor a visible local of this
   function, while every other statistics call in this file passes h->stats.
   If this call is compiled at all it should read h->stats - confirm whether
   preprocessor lines missing from this listing disable it. */
542 GNUNET_STATISTICS_update (stats,
543 gettext_noop ("# reconnected to DATASTORE"),
547 GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
549 h->reconnect_task = GNUNET_SCHEDULER_add_delayed (h->retry_time,
556 * Transmit request from queue to datastore service.
558 * @param cls the 'struct GNUNET_DATASTORE_Handle'
559 * @param size number of bytes that can be copied to buf
560 * @param buf where to copy the request message
561 * @return number of bytes written to buf
/* Transmit callback: copies the request stored behind the head-of-queue entry
   into buf, marks the entry as transmitted and starts receiving the reply. */
564 transmit_request (void *cls,
568 struct GNUNET_DATASTORE_Handle *h = cls;
569 struct GNUNET_DATASTORE_QueueEntry *qe;
573 if (NULL == (qe = h->queue_head))
574 return 0; /* no entry in queue */
/* Failure path (its condition is not visible in this listing): warn and
   count the failed transmission. */
577 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
578 _("Failed to transmit request to DATASTORE.\n"));
579 GNUNET_STATISTICS_update (h->stats,
580 gettext_noop ("# transmission request failures"),
/* msize is assigned inside the condition; this branch covers a buffer that
   is too small for the queued request. */
586 if (size < (msize = qe->message_size))
592 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
593 "Transmitting %u byte request to DATASTORE\n",
/* The request bytes live directly behind the queue entry struct. */
596 memcpy (buf, &qe[1], msize);
597 qe->was_transmitted = GNUNET_YES;
/* The queue timeout task is cancelled once the request is sent; the receive
   below is given whatever remains of the entry's deadline instead. */
598 GNUNET_SCHEDULER_cancel (qe->task);
599 qe->task = GNUNET_SCHEDULER_NO_TASK;
600 h->in_receive = GNUNET_YES;
601 GNUNET_CLIENT_receive (h->client,
604 GNUNET_TIME_absolute_get_remaining (qe->timeout));
605 GNUNET_STATISTICS_update (h->stats,
606 gettext_noop ("# bytes sent to datastore"),
614 * Process entries in the queue (or do nothing if we are already doing so).
617 * @param h handle to the datastore
/* Requests a transmission slot for the head-of-queue entry. Each early return
   below is one reason not to start a transmission yet: empty queue, head
   already transmitted, a request already pending, or no connection. */
620 process_queue (struct GNUNET_DATASTORE_Handle *h)
622 struct GNUNET_DATASTORE_QueueEntry *qe;
624 if (NULL == (qe = h->queue_head))
626 #if DEBUG_DATASTORE > 1
627 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
630 return; /* no entry in queue */
632 if (qe->was_transmitted == GNUNET_YES)
634 #if DEBUG_DATASTORE > 1
635 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
636 "Head request already transmitted\n");
638 return; /* waiting for replies */
/* NOTE(review): the condition for this branch is not visible in this listing;
   presumably it tests whether h->th is already set - confirm. */
642 #if DEBUG_DATASTORE > 1
643 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
644 "Pending transmission request\n");
646 return; /* request pending */
648 if (h->client == NULL)
650 #if DEBUG_DATASTORE > 1
651 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
654 return; /* waiting for reconnect */
657 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
658 "Queueing %u byte request to DATASTORE\n",
/* The transmit handle is remembered in h->th; the timeout passed on is the
   time remaining until the entry's deadline. */
661 h->th = GNUNET_CLIENT_notify_transmit_ready (h->client,
663 GNUNET_TIME_absolute_get_remaining (qe->timeout),
671 * Dummy continuation used to do nothing (but be non-NULL).
674 * @param result result
675 * @param emsg error message
/* Placeholder status continuation. reserve, release_reserve, update and
   remove assign its address to `cont`; presumably only when the caller
   passed no continuation (the guarding condition is not visible here). */
678 drop_status_cont (void *cls, int32_t result, const char *emsg)
/* Unlinks the entry from its handle's queue and cancels its timeout task if
   one is still scheduled. NOTE(review): the lines that release the memory
   and adjust the queue size are not visible in this listing. */
685 free_queue_entry (struct GNUNET_DATASTORE_QueueEntry *qe)
687 struct GNUNET_DATASTORE_Handle *h = qe->h;
689 GNUNET_CONTAINER_DLL_remove (h->queue_head,
/* Only a still-scheduled task is cancelled; timeout_queue_entry and
   transmit_request reset the id to NO_TASK once the task ran or was
   cancelled. */
692 if (qe->task != GNUNET_SCHEDULER_NO_TASK)
694 GNUNET_SCHEDULER_cancel (qe->task);
695 qe->task = GNUNET_SCHEDULER_NO_TASK;
702 * Type of a function to call when we receive a message
706 * @param msg message received, NULL on timeout or fatal error
/* Response handler for requests that are answered with a StatusMessage.
   cls is the queue entry; msg is NULL on timeout or fatal error. */
709 process_status_message (void *cls,
711 GNUNET_MessageHeader * msg)
713 struct GNUNET_DATASTORE_QueueEntry *qe = cls;
714 struct GNUNET_DATASTORE_Handle *h = qe->h;
/* The continuation context is copied out of the entry up front because the
   entry is released below before the continuation is invoked. */
715 struct StatusContext rc = qe->qc.sc;
716 const struct StatusMessage *sm;
721 h->in_receive = GNUNET_NO;
/* NULL-message path: remember was_transmitted before the entry goes away. */
722 was_transmitted = qe->was_transmitted;
725 free_queue_entry (qe);
726 if (NULL == h->client)
727 return; /* forced disconnect */
729 rc.cont (rc.cont_cls,
731 _("Failed to receive status response from database."));
732 if (was_transmitted == GNUNET_YES)
/* Regular path: a reply can only belong to the transmitted head of the
   queue. */
736 GNUNET_assert (GNUNET_YES == qe->was_transmitted);
737 GNUNET_assert (h->queue_head == qe);
738 free_queue_entry (qe);
/* Reject replies that are shorter than a StatusMessage or of another type. */
739 if ( (ntohs(msg->size) < sizeof(struct StatusMessage)) ||
740 (ntohs(msg->type) != GNUNET_MESSAGE_TYPE_DATASTORE_STATUS) )
743 h->retry_time = GNUNET_TIME_UNIT_ZERO;
746 rc.cont (rc.cont_cls,
748 _("Error reading response from datastore service"));
751 sm = (const struct StatusMessage*) msg;
752 status = ntohl(sm->status);
/* Bytes following the fixed-size StatusMessage are the error string; its
   last byte has to be the 0-terminator. */
754 if (ntohs(msg->size) > sizeof(struct StatusMessage))
756 emsg = (const char*) &sm[1];
757 if (emsg[ntohs(msg->size) - sizeof(struct StatusMessage) - 1] != '\0')
760 emsg = _("Invalid error message received from datastore service");
763 if ( (status == GNUNET_SYSERR) &&
767 emsg = _("Invalid error message received from datastore service");
770 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
771 "Received status %d/%s\n",
775 GNUNET_STATISTICS_update (h->stats,
776 gettext_noop ("# status messages received"),
/* A well-formed reply resets the reconnect back-off. */
779 h->retry_time.rel_value = 0;
782 rc.cont (rc.cont_cls,
789 * Store an item in the datastore. If the item is already present,
790 * the priorities are summed up and the higher expiration time and
791 * lower anonymity level is used.
793 * @param h handle to the datastore
794 * @param rid reservation ID to use (from "reserve"); use 0 if no
795 * prior reservation was made
796 * @param key key for the value
797 * @param size number of bytes in data
798 * @param data content stored
799 * @param type type of the content
800 * @param priority priority of the content
801 * @param anonymity anonymity-level for the content
802 * @param replication how often should the content be replicated to other peers?
803 * @param expiration expiration time for the content
804 * @param queue_priority ranking of this request in the priority queue
805 * @param max_queue_size at what queue size should this request be dropped
806 * (if other requests of higher priority are in the queue)
807 * @param timeout timeout for the operation
808 * @param cont continuation to call when done
809 * @param cont_cls closure for cont
810 * @return NULL if the entry was not queued, otherwise a handle that can be used to
811 * cancel; note that even if NULL is returned, the callback will be invoked
812 * (or rather, will already have been invoked)
/* Queues a PUT request: a DataMessage header followed by `size` bytes of
   payload, answered through process_status_message.
   NOTE(review): `replication` is accepted here, but none of the visible lines
   copy it into the message - confirm whether a line missing from this listing
   handles it or whether it is intentionally unused.
   NOTE(review): unlike reserve/release_reserve/update/remove, no
   `cont = &drop_status_cont` fallback is visible in this function. */
814 struct GNUNET_DATASTORE_QueueEntry *
815 GNUNET_DATASTORE_put (struct GNUNET_DATASTORE_Handle *h,
817 const GNUNET_HashCode * key,
820 enum GNUNET_BLOCK_Type type,
823 uint32_t replication,
824 struct GNUNET_TIME_Absolute expiration,
825 unsigned int queue_priority,
826 unsigned int max_queue_size,
827 struct GNUNET_TIME_Relative timeout,
828 GNUNET_DATASTORE_ContinuationWithStatus cont,
831 struct GNUNET_DATASTORE_QueueEntry *qe;
832 struct DataMessage *dm;
834 union QueueContext qc;
837 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
838 "Asked to put %u bytes of data under key `%s'\n",
/* msize is later written to header.size with htons, so the total has to stay
   below the maximum message size. */
842 msize = sizeof(struct DataMessage) + size;
843 GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE);
845 qc.sc.cont_cls = cont_cls;
846 qe = make_queue_entry (h, msize,
847 queue_priority, max_queue_size, timeout,
848 &process_status_message, &qc);
852 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
853 "Could not create queue entry for PUT\n");
857 GNUNET_STATISTICS_update (h->stats,
858 gettext_noop ("# PUT requests executed"),
/* The message buffer starts directly behind the queue entry; all multi-byte
   fields are converted to network byte order. */
861 dm = (struct DataMessage* ) &qe[1];
862 dm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_PUT);
863 dm->header.size = htons(msize);
864 dm->rid = htonl(rid);
865 dm->size = htonl( (uint32_t) size);
866 dm->type = htonl(type);
867 dm->priority = htonl(priority);
868 dm->anonymity = htonl(anonymity);
869 dm->uid = GNUNET_htonll(0);
870 dm->expiration = GNUNET_TIME_absolute_hton(expiration);
/* The payload follows the fixed-size DataMessage. */
872 memcpy (&dm[1], data, size);
879 * Reserve space in the datastore. This function should be used
880 * to avoid "out of space" failures during a longer sequence of "put"
881 * operations (for example, when a file is being inserted).
883 * @param h handle to the datastore
884 * @param amount how much space (in bytes) should be reserved (for content only)
885 * @param entries how many entries will be created (to calculate per-entry overhead)
886 * @param queue_priority ranking of this request in the priority queue
887 * @param max_queue_size at what queue size should this request be dropped
888 * (if other requests of higher priority are in the queue)
889 * @param timeout how long to wait at most for a response (or before dying in queue)
890 * @param cont continuation to call when done; "success" will be set to
891 * a positive reservation value if space could be reserved.
892 * @param cont_cls closure for cont
893 * @return NULL if the entry was not queued, otherwise a handle that can be used to
894 * cancel; note that even if NULL is returned, the callback will be invoked
895 * (or rather, will already have been invoked)
/* Queues a RESERVE request carrying the byte amount and entry count; the
   reply is handled by process_status_message. */
897 struct GNUNET_DATASTORE_QueueEntry *
898 GNUNET_DATASTORE_reserve (struct GNUNET_DATASTORE_Handle *h,
901 unsigned int queue_priority,
902 unsigned int max_queue_size,
903 struct GNUNET_TIME_Relative timeout,
904 GNUNET_DATASTORE_ContinuationWithStatus cont,
907 struct GNUNET_DATASTORE_QueueEntry *qe;
908 struct ReserveMessage *rm;
909 union QueueContext qc;
/* Substitute the do-nothing continuation; presumably only when the caller
   passed NULL (the guarding condition is not visible in this listing). */
912 cont = &drop_status_cont;
/* NOTE(review): the format string below ends with a stray apostrophe. */
914 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
915 "Asked to reserve %llu bytes of data and %u entries'\n",
916 (unsigned long long) amount,
917 (unsigned int) entries);
920 qc.sc.cont_cls = cont_cls;
921 qe = make_queue_entry (h, sizeof(struct ReserveMessage),
922 queue_priority, max_queue_size, timeout,
923 &process_status_message, &qc);
927 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
928 "Could not create queue entry to reserve\n");
932 GNUNET_STATISTICS_update (h->stats,
933 gettext_noop ("# RESERVE requests executed"),
/* Fill in the fixed-size message stored behind the queue entry, converting
   every field to network byte order. */
936 rm = (struct ReserveMessage*) &qe[1];
937 rm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_RESERVE);
938 rm->header.size = htons(sizeof (struct ReserveMessage));
939 rm->entries = htonl(entries);
940 rm->amount = GNUNET_htonll(amount);
947 * Signal that all of the data for which a reservation was made has
948 * been stored and that whatever excess space might have been reserved
949 * can now be released.
951 * @param h handle to the datastore
952 * @param rid reservation ID (value of "success" in original continuation
953 * from the "reserve" function).
954 * @param queue_priority ranking of this request in the priority queue
955 * @param max_queue_size at what queue size should this request be dropped
956 * (if other requests of higher priority are in the queue)
960 * @param timeout how long to wait at most for a response
961 * @param cont continuation to call when done
962 * @param cont_cls closure for cont
963 * @return NULL if the entry was not queued, otherwise a handle that can be used to
964 * cancel; note that even if NULL is returned, the callback will be invoked
965 * (or rather, will already have been invoked)
/* Queues a RELEASE_RESERVE request for reservation `rid`; the reply is
   handled by process_status_message. */
967 struct GNUNET_DATASTORE_QueueEntry *
968 GNUNET_DATASTORE_release_reserve (struct GNUNET_DATASTORE_Handle *h,
970 unsigned int queue_priority,
971 unsigned int max_queue_size,
972 struct GNUNET_TIME_Relative timeout,
973 GNUNET_DATASTORE_ContinuationWithStatus cont,
976 struct GNUNET_DATASTORE_QueueEntry *qe;
977 struct ReleaseReserveMessage *rrm;
978 union QueueContext qc;
/* Substitute the do-nothing continuation; presumably only when the caller
   passed NULL (the guarding condition is not visible in this listing). */
981 cont = &drop_status_cont;
983 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
984 "Asked to release reserve %d\n",
988 qc.sc.cont_cls = cont_cls;
989 qe = make_queue_entry (h, sizeof(struct ReleaseReserveMessage),
990 queue_priority, max_queue_size, timeout,
991 &process_status_message, &qc);
995 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
996 "Could not create queue entry to release reserve\n");
1000 GNUNET_STATISTICS_update (h->stats,
1001 gettext_noop ("# RELEASE RESERVE requests executed"),
/* Fill in the fixed-size message stored behind the queue entry (network
   byte order). */
1004 rrm = (struct ReleaseReserveMessage*) &qe[1];
1005 rrm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_RELEASE_RESERVE);
1006 rrm->header.size = htons(sizeof (struct ReleaseReserveMessage));
1007 rrm->rid = htonl(rid);
1014 * Update a value in the datastore.
1016 * @param h handle to the datastore
1017 * @param uid identifier for the value
1018 * @param priority how much to increase the priority of the value
1019 * @param expiration new expiration value should be MAX of existing and this argument
1020 * @param queue_priority ranking of this request in the priority queue
1021 * @param max_queue_size at what queue size should this request be dropped
1022 * (if other requests of higher priority are in the queue)
1023 * @param timeout how long to wait at most for a response
1024 * @param cont continuation to call when done
1025 * @param cont_cls closure for cont
1026 * @return NULL if the entry was not queued, otherwise a handle that can be used to
1027 * cancel; note that even if NULL is returned, the callback will be invoked
1028 * (or rather, will already have been invoked)
/* Queues an UPDATE request carrying uid, priority and expiration; the reply
   is handled by process_status_message. */
1030 struct GNUNET_DATASTORE_QueueEntry *
1031 GNUNET_DATASTORE_update (struct GNUNET_DATASTORE_Handle *h,
1034 struct GNUNET_TIME_Absolute expiration,
1035 unsigned int queue_priority,
1036 unsigned int max_queue_size,
1037 struct GNUNET_TIME_Relative timeout,
1038 GNUNET_DATASTORE_ContinuationWithStatus cont,
1041 struct GNUNET_DATASTORE_QueueEntry *qe;
1042 struct UpdateMessage *um;
1043 union QueueContext qc;
/* Substitute the do-nothing continuation; presumably only when the caller
   passed NULL (the guarding condition is not visible in this listing). */
1046 cont = &drop_status_cont;
1048 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1049 "Asked to update entry %llu raising priority by %u and expiration to %llu\n",
1051 (unsigned int) priority,
1052 (unsigned long long) expiration.abs_value);
1055 qc.sc.cont_cls = cont_cls;
1056 qe = make_queue_entry (h, sizeof(struct UpdateMessage),
1057 queue_priority, max_queue_size, timeout,
1058 &process_status_message, &qc);
1062 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1063 "Could not create queue entry for UPDATE\n");
1067 GNUNET_STATISTICS_update (h->stats,
1068 gettext_noop ("# UPDATE requests executed"),
/* Fill in the fixed-size message stored behind the queue entry (network
   byte order). */
1071 um = (struct UpdateMessage*) &qe[1];
1072 um->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_UPDATE);
1073 um->header.size = htons(sizeof (struct UpdateMessage));
1074 um->priority = htonl(priority);
1075 um->expiration = GNUNET_TIME_absolute_hton(expiration);
1076 um->uid = GNUNET_htonll(uid);
1083 * Explicitly remove some content from the database.
1084 * The "cont"inuation will be called with status
1085 * "GNUNET_OK" if content was removed, "GNUNET_NO"
1086 * if no matching entry was found and "GNUNET_SYSERR"
1087 * on all other types of errors.
1089 * @param h handle to the datastore
1090 * @param key key for the value
1091 * @param size number of bytes in data
1092 * @param data content stored
1093 * @param queue_priority ranking of this request in the priority queue
1094 * @param max_queue_size at what queue size should this request be dropped
1095 * (if other requests of higher priority are in the queue)
1096 * @param timeout how long to wait at most for a response
1097 * @param cont continuation to call when done
1098 * @param cont_cls closure for cont
1099 * @return NULL if the entry was not queued, otherwise a handle that can be used to
1100 * cancel; note that even if NULL is returned, the callback will be invoked
1101 * (or rather, will already have been invoked)
/* Queues a REMOVE request: a DataMessage whose metadata fields are zeroed,
   followed by the `size` bytes of content to remove. The reply is handled by
   process_status_message. */
1103 struct GNUNET_DATASTORE_QueueEntry *
1104 GNUNET_DATASTORE_remove (struct GNUNET_DATASTORE_Handle *h,
1105 const GNUNET_HashCode *key,
1108 unsigned int queue_priority,
1109 unsigned int max_queue_size,
1110 struct GNUNET_TIME_Relative timeout,
1111 GNUNET_DATASTORE_ContinuationWithStatus cont,
1114 struct GNUNET_DATASTORE_QueueEntry *qe;
1115 struct DataMessage *dm;
1117 union QueueContext qc;
/* Substitute the do-nothing continuation; presumably only when the caller
   passed NULL (the guarding condition is not visible in this listing). */
1120 cont = &drop_status_cont;
1122 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1123 "Asked to remove %u bytes under key `%s'\n",
1128 qc.sc.cont_cls = cont_cls;
/* msize is later written to header.size with htons, so the total has to stay
   below the maximum message size. */
1129 msize = sizeof(struct DataMessage) + size;
1130 GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE);
1131 qe = make_queue_entry (h, msize,
1132 queue_priority, max_queue_size, timeout,
1133 &process_status_message, &qc);
1137 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1138 "Could not create queue entry for REMOVE\n");
1142 GNUNET_STATISTICS_update (h->stats,
1143 gettext_noop ("# REMOVE requests executed"),
/* The message is built behind the queue entry. Of the fields set in the
   visible lines, only the message size and the payload are non-zero. */
1146 dm = (struct DataMessage*) &qe[1];
1147 dm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE);
1148 dm->header.size = htons(msize);
1150 dm->size = htonl(size);
1151 dm->type = htonl(0);
1152 dm->priority = htonl(0);
1153 dm->anonymity = htonl(0);
1154 dm->uid = GNUNET_htonll(0);
1155 dm->expiration = GNUNET_TIME_absolute_hton(GNUNET_TIME_UNIT_ZERO_ABS);
1157 memcpy (&dm[1], data, size);
1164 * Type of a function to call when we receive a message
1167 * @param cls closure
1168 * @param msg message received, NULL on timeout or fatal error
/* Response handler for requests that are answered with DataMessage results.
   cls is the queue entry; msg is NULL on timeout or fatal error. Wherever the
   iterator is called with a NULL key below, that call signals the end of the
   results (or a failure) to the application. */
1171 process_result_message (void *cls,
1172 const struct GNUNET_MessageHeader * msg)
1174 struct GNUNET_DATASTORE_QueueEntry *qe = cls;
1175 struct GNUNET_DATASTORE_Handle *h = qe->h;
/* The iterator context is copied out of the entry up front because the entry
   may be released before the iterator is invoked. */
1176 struct ResultContext rc = qe->qc.rc;
1177 const struct DataMessage *dm;
1178 int was_transmitted;
1180 h->in_receive = GNUNET_NO;
/* NULL-message path: remember was_transmitted before the entry goes away; it
   selects between the two log messages below (receive failure vs. dropped
   from the queue). */
1183 was_transmitted = qe->was_transmitted;
1184 free_queue_entry (qe);
1185 if (was_transmitted == GNUNET_YES)
1187 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1188 _("Failed to receive response from database.\n"));
1194 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1195 "Request dropped due to finite datastore queue length.\n");
1198 if (rc.iter != NULL)
1199 rc.iter (rc.iter_cls,
1200 NULL, 0, NULL, 0, 0, 0,
1201 GNUNET_TIME_UNIT_ZERO_ABS, 0);
/* Regular path: a reply can only belong to the transmitted head of the
   queue. */
1204 GNUNET_assert (GNUNET_YES == qe->was_transmitted);
1205 GNUNET_assert (h->queue_head == qe);
/* DATA_END terminates the result set: release the entry, tell the iterator,
   and reset both the back-off and the excess-result counter. */
1206 if (ntohs(msg->type) == GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END)
1208 GNUNET_break (ntohs(msg->size) == sizeof(struct GNUNET_MessageHeader));
1209 free_queue_entry (qe);
1211 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1212 "Received end of result set, new queue size is %u\n",
1215 if (rc.iter != NULL)
1216 rc.iter (rc.iter_cls,
1217 NULL, 0, NULL, 0, 0, 0,
1218 GNUNET_TIME_UNIT_ZERO_ABS, 0);
1219 h->retry_time.rel_value = 0;
1220 h->result_count = 0;
/* Malformed reply: too short, wrong type, or a total size that does not match
   the payload size announced inside the message. */
1224 if ( (ntohs(msg->size) < sizeof(struct DataMessage)) ||
1225 (ntohs(msg->type) != GNUNET_MESSAGE_TYPE_DATASTORE_DATA) ||
1226 (ntohs(msg->size) != sizeof(struct DataMessage) + ntohl (((const struct DataMessage*)msg)->size)) )
1229 free_queue_entry (qe);
1230 h->retry_time = GNUNET_TIME_UNIT_ZERO;
1232 if (rc.iter != NULL)
1233 rc.iter (rc.iter_cls,
1234 NULL, 0, NULL, 0, 0, 0,
1235 GNUNET_TIME_UNIT_ZERO_ABS, 0);
1238 GNUNET_STATISTICS_update (h->stats,
1239 gettext_noop ("# Results received"),
/* No iterator is set, so this result is unwanted and counted as excess.
   Once h->result_count exceeds MAX_EXCESS_RESULTS the entry is released and
   a forced reset is recorded. */
1242 if (rc.iter == NULL)
1245 GNUNET_STATISTICS_update (h->stats,
1246 gettext_noop ("# Excess results received"),
1249 if (h->result_count > MAX_EXCESS_RESULTS)
1251 free_queue_entry (qe);
1252 GNUNET_STATISTICS_update (h->stats,
1253 gettext_noop ("# Forced database connection resets"),
1256 h->retry_time = GNUNET_TIME_UNIT_ZERO;
/* One-shot requests are finished after a single result; for other requests
   the next result is requested. */
1260 if (GNUNET_YES == qe->one_shot)
1261 free_queue_entry (qe);
1263 GNUNET_DATASTORE_iterate_get_next (h);
1266 dm = (const struct DataMessage*) msg;
1268 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1269 "Received result %llu with type %u and size %u with key %s\n",
1270 (unsigned long long) GNUNET_ntohll(dm->uid),
1273 GNUNET_h2s(&dm->key));
/* Wanted result: a one-shot entry is released first, then the iterator is
   handed the fields converted back to host byte order. */
1275 if (GNUNET_YES == qe->one_shot)
1276 free_queue_entry (qe);
1277 h->retry_time.rel_value = 0;
1278 rc.iter (rc.iter_cls,
1283 ntohl(dm->priority),
1284 ntohl(dm->anonymity),
1285 GNUNET_TIME_absolute_ntoh(dm->expiration),
1286 GNUNET_ntohll(dm->uid));
1291 * Get a random value from the datastore for content replication.
1292 * Returns a single, random value among those with the highest
1293 * replication score, lowering positive replication scores by one for
1294 * the chosen value (if only content with a replication score exists,
1295 * a random value is returned and replication scores are not changed).
1297 * @param h handle to the datastore
1298 * @param queue_priority ranking of this request in the priority queue
1299 * @param max_queue_size at what queue size should this request be dropped
1300 * (if other requests of higher priority are in the queue)
1301 * @param timeout how long to wait at most for a response
1302 * @param iter function to call on a random value; it
1303 * will be called once with a value (if available)
1304 * and always once with a value of NULL.
1305 * @param iter_cls closure for iter
1306 * @return NULL if the entry was not queued, otherwise a handle that can be used to
1307 * cancel; note that even if NULL is returned, the callback will be invoked
1308 * (or rather, will already have been invoked)
/* Queues a header-only GET_REPLICATION request; results are delivered through
   process_result_message. */
1310 struct GNUNET_DATASTORE_QueueEntry *
1311 GNUNET_DATASTORE_get_for_replication (struct GNUNET_DATASTORE_Handle *h,
1312 unsigned int queue_priority,
1313 unsigned int max_queue_size,
1314 struct GNUNET_TIME_Relative timeout,
1315 GNUNET_DATASTORE_Iterator iter,
1318 struct GNUNET_DATASTORE_QueueEntry *qe;
1319 struct GNUNET_MessageHeader *m;
1320 union QueueContext qc;
1323 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1324 "Asked to get replication entry in %llu ms\n",
1325 (unsigned long long) timeout.rel_value);
1328 qc.rc.iter_cls = iter_cls;
1329 qe = make_queue_entry (h, sizeof(struct GNUNET_MessageHeader),
1330 queue_priority, max_queue_size, timeout,
1331 &process_result_message, &qc);
1335 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1336 "Could not create queue entry for GET REPLICATION\n");
/* Mark the entry as one-shot: process_result_message releases such an entry
   after the first data message. */
1340 qe->one_shot = GNUNET_YES;
1341 GNUNET_STATISTICS_update (h->stats,
1342 gettext_noop ("# GET REPLICATION requests executed"),
/* The request is the bare message header, stored behind the queue entry. */
1345 m = (struct GNUNET_MessageHeader*) &qe[1];
1346 m->type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_GET_REPLICATION);
1347 m->size = htons(sizeof (struct GNUNET_MessageHeader));
1354 * Get a zero-anonymity value from the datastore.
1356 * @param h handle to the datastore
1357 * @param queue_priority ranking of this request in the priority queue
1358 * @param max_queue_size at what queue size should this request be dropped
1359 * (if other requests of higher priority are in the queue)
1360 * @param timeout how long to wait at most for a response
1361 * @param type allowed type for the operation
1362 * @param iter function to call on a random value; it
1363 * will be called once with a value (if available)
1364 * and always once with a value of NULL.
1365 * @param iter_cls closure for iter
1366 * @return NULL if the entry was not queued, otherwise a handle that can be used to
1367 * cancel; note that even if NULL is returned, the callback will be invoked
1368 * (or rather, will already have been invoked)
/* Queue a GET ZERO ANONYMITY request for one specific block type. */
1370 struct GNUNET_DATASTORE_QueueEntry *
1371 GNUNET_DATASTORE_iterate_zero_anonymity (struct GNUNET_DATASTORE_Handle *h,
1372 unsigned int queue_priority,
1373 unsigned int max_queue_size,
1374 struct GNUNET_TIME_Relative timeout,
1375 enum GNUNET_BLOCK_Type type,
1376 GNUNET_DATASTORE_Iterator iter,
1379 struct GNUNET_DATASTORE_QueueEntry *qe;
1380 struct GetZeroAnonymityMessage *m;
1381 union QueueContext qc;
/* A concrete block type is mandatory: GNUNET_BLOCK_TYPE_ANY fails the assertion. */
1383 GNUNET_assert (type != GNUNET_BLOCK_TYPE_ANY);
/* Debug trace of the requested timeout; the message labels rel_value as milliseconds. */
1385 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1386 "Asked to get zero-anonymity entry in %llu ms\n",
1387 (unsigned long long) timeout.rel_value);
/* The iterator closure is stored in the result-context member of the queue
   context, which is handed to make_queue_entry by address below. */
1390 qc.rc.iter_cls = iter_cls;
/* Space for one GetZeroAnonymityMessage is requested for the entry and
   process_result_message is passed along with the queue context. */
1391 qe = make_queue_entry (h, sizeof(struct GetZeroAnonymityMessage),
1392 queue_priority, max_queue_size, timeout,
1393 &process_result_message, &qc);
/* Debug message for the case where no queue entry could be created. */
1397 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1398 "Could not create queue entry for zero-anonymity iteration\n");
/* Account for the request in the statistics attached to the handle. */
1402 GNUNET_STATISTICS_update (h->stats,
1403 gettext_noop ("# GET ZERO ANONYMITY requests executed"),
/* The outgoing message is located in the memory directly behind the queue
   entry. Header fields are 16-bit and the block type is sent as a 32-bit
   value, all in network byte order. */
1406 m = (struct GetZeroAnonymityMessage*) &qe[1];
1407 m->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_GET_ZERO_ANONYMITY);
1408 m->header.size = htons(sizeof (struct GetZeroAnonymityMessage));
1409 m->type = htonl ((uint32_t) type);
1417 * Iterate over the results for a particular key
1418 * in the datastore. The iterator will only be called
1419 * once initially; if the first call did contain a
1420 * result, further results can be obtained by calling
1421 * "GNUNET_DATASTORE_iterate_get_next" with the given argument.
1423 * @param h handle to the datastore
1424 * @param key may be NULL (to match all entries)
1425 * @param type desired type, 0 for any
1426 * @param queue_priority ranking of this request in the priority queue
1427 * @param max_queue_size at what queue size should this request be dropped
1428 * (if other requests of higher priority are in the queue)
1429 * @param timeout how long to wait at most for a response
1430 * @param iter function to call on each matching value;
1431 * will be called once with a NULL value at the end
1432 * @param iter_cls closure for iter
1433 * @return NULL if the entry was not queued, otherwise a handle that can be used to
1434 * cancel; note that even if NULL is returned, the callback will be invoked
1435 * (or rather, will already have been invoked)
/* Queue a GET request for the given block type, optionally restricted to a key. */
1437 struct GNUNET_DATASTORE_QueueEntry *
1438 GNUNET_DATASTORE_iterate_key (struct GNUNET_DATASTORE_Handle *h,
1439 const GNUNET_HashCode * key,
1440 enum GNUNET_BLOCK_Type type,
1441 unsigned int queue_priority,
1442 unsigned int max_queue_size,
1443 struct GNUNET_TIME_Relative timeout,
1444 GNUNET_DATASTORE_Iterator iter,
1447 struct GNUNET_DATASTORE_QueueEntry *qe;
1448 struct GetMessage *gm;
1449 union QueueContext qc;
/* Debug trace of the requested block type and key. */
1452 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1453 "Asked to look for data of type %u under key `%s'\n",
1454 (unsigned int) type,
/* The iterator closure is stored in the result-context member of the queue
   context, which is handed to make_queue_entry by address below. */
1458 qc.rc.iter_cls = iter_cls;
/* Space for a complete GetMessage is always requested, although one of the
   two wire sizes assigned further down is shorter than that. */
1459 qe = make_queue_entry (h, sizeof(struct GetMessage),
1460 queue_priority, max_queue_size, timeout,
1461 &process_result_message, &qc);
/* Debug message for the case where the request could not be queued. */
1465 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1466 "Could not queue request for `%s'\n",
/* Account for the request in the statistics attached to the handle. */
1471 GNUNET_STATISTICS_update (h->stats,
1472 gettext_noop ("# GET requests executed"),
/* The outgoing message is located in the memory directly behind the queue
   entry; type fields are written in network byte order. */
1475 gm = (struct GetMessage*) &qe[1];
1476 gm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_GET);
1477 gm->type = htonl(type);
/* Two alternative message sizes follow: the full GetMessage, or the message
   shortened by one GNUNET_HashCode.
   NOTE(review): presumably the short form is chosen when key is NULL, so
   that no key is transmitted -- TODO confirm against the branch condition. */
1480 gm->header.size = htons(sizeof (struct GetMessage));
1485 gm->header.size = htons(sizeof (struct GetMessage) - sizeof(GNUNET_HashCode));
1493 * Function called to trigger obtaining the next result
1494 * from the datastore.
1496 * @param h handle to the datastore
/* Resume receiving on the client connection of the handle so that the next
   result for the request at the head of the queue can be obtained. */
1499 GNUNET_DATASTORE_iterate_get_next (struct GNUNET_DATASTORE_Handle *h)
/* The request being continued is whatever entry sits at the head of the queue.
   NOTE(review): qe is dereferenced below without a visible NULL check;
   callers presumably guarantee a non-empty queue -- confirm. */
1501 struct GNUNET_DATASTORE_QueueEntry *qe = h->queue_head;
/* Only entries whose response handler is process_result_message may be
   continued through this function. */
1503 GNUNET_assert (&process_result_message == qe->response_proc);
/* Mark the handle as receiving, then wait for the next reply for at most the
   time remaining until the timeout of the entry. */
1504 h->in_receive = GNUNET_YES;
1505 GNUNET_CLIENT_receive (h->client,
1508 GNUNET_TIME_absolute_get_remaining (qe->timeout));
1513 * Cancel a datastore operation. The final callback from the
1514 * operation must not have been done yet.
1516 * @param qe operation to cancel
/* Cancel a pending datastore request and release its queue entry. */
1519 GNUNET_DATASTORE_cancel (struct GNUNET_DATASTORE_QueueEntry *qe)
1521 struct GNUNET_DATASTORE_Handle *h;
/* Debug trace: the request pointer, whether the request was already
   transmitted, and whether it is the entry at the head of the queue. */
1525 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1526 "Pending DATASTORE request %p cancelled (%d, %d)\n",
1528 qe->was_transmitted,
1529 h->queue_head == qe);
/* Case of a request that was already transmitted: the entry is freed and the
   retry time of the handle is reset to zero. */
1531 if (GNUNET_YES == qe->was_transmitted)
1533 free_queue_entry (qe);
1534 h->retry_time = GNUNET_TIME_UNIT_ZERO;
/* Second release site for the entry.
   NOTE(review): presumably reached only when the request was never
   transmitted -- confirm that the transmitted path returns before this. */
1538 free_queue_entry (qe);
1543 /* end of datastore_api.c */