2 This file is part of GNUnet
3 (C) 2004, 2005, 2006, 2007, 2009, 2010 Christian Grothoff (and other contributing authors)
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 2, or (at your
8 option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA.
22 * @file datastore/datastore_api.c
23 * @brief Management for the datastore for files stored on a GNUnet node. Implements
24 * a priority queue for requests (with timeouts).
25 * @author Christian Grothoff
28 #include "gnunet_arm_service.h"
29 #include "gnunet_constants.h"
30 #include "gnunet_datastore_service.h"
31 #include "datastore.h"
35 * Context for processing status messages.
40 * Continuation to call with the status.
42 GNUNET_DATASTORE_ContinuationWithStatus cont;
53 * Context for processing result messages.
58 * Iterator to call with the result.
60 GNUNET_DATASTORE_Iterator iter;
71 * Context for a queue operation.
76 struct StatusContext sc;
78 struct ResultContext rc;
85 * Entry in our priority queue.
87 struct GNUNET_DATASTORE_QueueEntry
91 * This is a linked list.
93 struct GNUNET_DATASTORE_QueueEntry *next;
96 * This is a linked list.
98 struct GNUNET_DATASTORE_QueueEntry *prev;
101 * Handle to the master context.
103 struct GNUNET_DATASTORE_Handle *h;
106 * Response processor (NULL if we are not waiting for a response).
107 * This struct should be used for the closure, function-specific
108 * arguments can be passed via 'qc'.
110 GNUNET_CLIENT_MessageHandler response_proc;
113 * Function to call after transmission of the request.
115 GNUNET_DATASTORE_ContinuationWithStatus cont;
118 * Closure for 'cont'.
123 * Context for the operation.
125 union QueueContext qc;
128 * Task for timeout signalling.
130 GNUNET_SCHEDULER_TaskIdentifier task;
133 * Timeout for the current operation.
135 struct GNUNET_TIME_Absolute timeout;
138 * Priority in the queue.
140 unsigned int priority;
143 * Maximum allowed length of queue (otherwise
144 * this request should be discarded).
146 unsigned int max_queue;
149 * Number of bytes in the request message following
150 * this struct. 32-bit value for nicer memory
151 * access (and overall struct alignment).
153 uint32_t message_size;
156 * Has this message been transmitted to the service?
157 * Only ever GNUNET_YES for the head of the queue.
158 * Note that the overall struct should end at a
159 * multiple of 64 bits.
161 int32_t was_transmitted;
166 * Handle to the datastore service.
168 struct GNUNET_DATASTORE_Handle
174 const struct GNUNET_CONFIGURATION_Handle *cfg;
179 struct GNUNET_SCHEDULER_Handle *sched;
182 * Current connection to the datastore service.
184 struct GNUNET_CLIENT_Connection *client;
187 * Current transmit handle.
189 struct GNUNET_CLIENT_TransmitHandle *th;
192 * Current head of priority queue.
194 struct GNUNET_DATASTORE_QueueEntry *queue_head;
197 * Current tail of priority queue.
199 struct GNUNET_DATASTORE_QueueEntry *queue_tail;
202 * Task for trying to reconnect.
204 GNUNET_SCHEDULER_TaskIdentifier reconnect_task;
207 * How quickly should we retry? Used for exponential back-off on
210 struct GNUNET_TIME_Relative retry_time;
213 * Number of entries in the queue.
215 unsigned int queue_size;
222 * Connect to the datastore service.
224 * @param cfg configuration to use
225 * @param sched scheduler to use
226 * @return handle to use to access the service
228 struct GNUNET_DATASTORE_Handle *
229 GNUNET_DATASTORE_connect (const struct
230 GNUNET_CONFIGURATION_Handle
233 GNUNET_SCHEDULER_Handle
236 struct GNUNET_CLIENT_Connection *c;
237 struct GNUNET_DATASTORE_Handle *h;
239 c = GNUNET_CLIENT_connect (sched, "datastore", cfg);
241 return NULL; /* oops */
242 h = GNUNET_malloc (sizeof(struct GNUNET_DATASTORE_Handle) +
243 GNUNET_SERVER_MAX_MESSAGE_SIZE);
252 * Transmit DROP message to datastore service.
254 * @param cls the 'struct GNUNET_DATASTORE_Handle'
255 * @param size number of bytes that can be copied to buf
256 * @param buf where to copy the drop message
257 * @return number of bytes written to buf
260 transmit_drop (void *cls,
264 struct GNUNET_DATASTORE_Handle *h = cls;
265 struct GNUNET_MessageHeader *hdr;
269 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
270 _("Failed to transmit request to drop database.\n"));
271 GNUNET_DATASTORE_disconnect (h, GNUNET_NO);
274 GNUNET_assert (size >= sizeof(struct GNUNET_MessageHeader));
276 hdr->size = htons(sizeof(struct GNUNET_MessageHeader));
277 hdr->type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_DROP);
278 GNUNET_DATASTORE_disconnect (h, GNUNET_NO);
279 return sizeof(struct GNUNET_MessageHeader);
284 * Disconnect from the datastore service (and free
285 * associated resources).
287 * @param h handle to the datastore
288 * @param drop set to GNUNET_YES to delete all data in datastore (!)
290 void GNUNET_DATASTORE_disconnect (struct GNUNET_DATASTORE_Handle *h,
293 struct GNUNET_DATASTORE_QueueEntry *qe;
295 if (h->client != NULL)
297 GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
300 if (h->reconnect_task != GNUNET_SCHEDULER_NO_TASK)
302 GNUNET_SCHEDULER_cancel (h->sched,
304 h->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
306 while (NULL != (qe = h->queue_head))
308 GNUNET_assert (NULL != qe->response_proc);
309 qe->response_proc (qe, NULL);
311 if (GNUNET_YES == drop)
313 h->client = GNUNET_CLIENT_connect (h->sched, "datastore", h->cfg);
314 if (h->client != NULL)
317 GNUNET_CLIENT_notify_transmit_ready (h->client,
318 sizeof(struct GNUNET_MessageHeader),
319 GNUNET_TIME_UNIT_MINUTES,
324 GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
333 * A request has timed out (before being transmitted to the service).
335 * @param cls the 'struct GNUNET_DATASTORE_QueueEntry'
336 * @param tc scheduler context
339 timeout_queue_entry (void *cls,
340 const struct GNUNET_SCHEDULER_TaskContext *tc)
342 struct GNUNET_DATASTORE_QueueEntry *qe = cls;
344 qe->task = GNUNET_SCHEDULER_NO_TASK;
345 GNUNET_assert (qe->was_transmitted == GNUNET_NO);
346 qe->response_proc (qe, NULL);
351 * Create a new entry for our priority queue (and possibly discard other entires if
352 * the queue is getting too long).
354 * @param h handle to the datastore
355 * @param msize size of the message to queue
356 * @param queue_priority priority of the entry
357 * @param max_queue_size at what queue size should this request be dropped
358 * (if other requests of higher priority are in the queue)
359 * @param timeout timeout for the operation
360 * @param response_proc function to call with replies (can be NULL)
361 * @param qc client context (NOT a closure for response_proc)
362 * @return NULL if the queue is full (and this entry was dropped)
364 static struct GNUNET_DATASTORE_QueueEntry *
365 make_queue_entry (struct GNUNET_DATASTORE_Handle *h,
367 unsigned int queue_priority,
368 unsigned int max_queue_size,
369 struct GNUNET_TIME_Relative timeout,
370 GNUNET_CLIENT_MessageHandler response_proc,
371 const union QueueContext *qc)
373 struct GNUNET_DATASTORE_QueueEntry *ret;
374 struct GNUNET_DATASTORE_QueueEntry *pos;
379 while ( (pos != NULL) &&
380 (c < max_queue_size) &&
381 (pos->priority >= queue_priority) )
386 ret = GNUNET_malloc (sizeof (struct GNUNET_DATASTORE_QueueEntry) + msize);
388 ret->response_proc = response_proc;
390 ret->timeout = GNUNET_TIME_relative_to_absolute (timeout);
391 ret->priority = queue_priority;
392 ret->max_queue = max_queue_size;
393 ret->message_size = msize;
394 ret->was_transmitted = GNUNET_NO;
397 /* append at the tail */
403 /* do not insert at HEAD if HEAD query was already
404 transmitted and we are still receiving replies! */
405 if ( (pos == NULL) &&
406 (h->queue_head->was_transmitted) )
410 GNUNET_CONTAINER_DLL_insert_after (h->queue_head,
415 if (c > max_queue_size)
417 response_proc (ret, NULL);
420 ret->task = GNUNET_SCHEDULER_add_delayed (h->sched,
422 &timeout_queue_entry,
427 if (pos->max_queue < h->queue_size)
429 GNUNET_assert (pos->response_proc != NULL);
430 pos->response_proc (pos, NULL);
440 * Process entries in the queue (or do nothing if we are already
443 * @param h handle to the datastore
446 process_queue (struct GNUNET_DATASTORE_Handle *h);
450 * Try reconnecting to the datastore service.
452 * @param cls the 'struct GNUNET_DATASTORE_Handle'
453 * @param tc scheduler context
456 try_reconnect (void *cls,
457 const struct GNUNET_SCHEDULER_TaskContext *tc)
459 struct GNUNET_DATASTORE_Handle *h = cls;
461 if (h->retry_time.value < GNUNET_CONSTANTS_SERVICE_RETRY.value)
462 h->retry_time = GNUNET_CONSTANTS_SERVICE_RETRY;
464 h->retry_time = GNUNET_TIME_relative_multiply (h->retry_time, 2);
465 if (h->retry_time.value > GNUNET_CONSTANTS_SERVICE_TIMEOUT.value)
466 h->retry_time = GNUNET_CONSTANTS_SERVICE_TIMEOUT;
467 h->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
468 h->client = GNUNET_CLIENT_connect (h->sched, "datastore", h->cfg);
469 if (h->client == NULL)
476 * Disconnect from the service and then try reconnecting to the datastore service
479 * @param h handle to datastore to disconnect and reconnect
482 do_disconnect (struct GNUNET_DATASTORE_Handle *h)
484 if (h->client == NULL)
487 GNUNET_STATISTICS_update (stats,
488 gettext_noop ("# reconnected to datastore"),
492 GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
494 h->reconnect_task = GNUNET_SCHEDULER_add_delayed (h->sched,
502 * Transmit request from queue to datastore service.
504 * @param cls the 'struct GNUNET_DATASTORE_Handle'
505 * @param size number of bytes that can be copied to buf
506 * @param buf where to copy the drop message
507 * @return number of bytes written to buf
510 transmit_request (void *cls,
514 struct GNUNET_DATASTORE_Handle *h = cls;
515 struct GNUNET_DATASTORE_QueueEntry *qe;
519 if (NULL == (qe = h->queue_head))
520 return 0; /* no entry in queue */
523 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
524 _("Failed to transmit request to database.\n"));
528 if (size < (msize = qe->message_size))
533 memcpy (buf, &qe[1], msize);
534 qe->was_transmitted = GNUNET_YES;
535 GNUNET_SCHEDULER_cancel (h->sched,
537 qe->task = GNUNET_SCHEDULER_NO_TASK;
538 GNUNET_CLIENT_receive (h->client,
541 GNUNET_TIME_absolute_get_remaining (qe->timeout));
547 * Process entries in the queue (or do nothing if we are already
550 * @param h handle to the datastore
553 process_queue (struct GNUNET_DATASTORE_Handle *h)
555 struct GNUNET_DATASTORE_QueueEntry *qe;
557 if (NULL == (qe = h->queue_head))
558 return; /* no entry in queue */
559 if (qe->was_transmitted == GNUNET_YES)
560 return; /* waiting for replies */
562 return; /* request pending */
563 if (h->client == NULL)
564 return; /* waiting for reconnect */
565 h->th = GNUNET_CLIENT_notify_transmit_ready (h->client,
567 GNUNET_TIME_absolute_get_remaining (qe->timeout),
575 * Dummy continuation used to do nothing (but be non-zero).
578 * @param result result
579 * @param emsg error message
582 drop_status_cont (void *cls, int result, const char *emsg)
589 free_queue_entry (struct GNUNET_DATASTORE_QueueEntry *qe)
591 struct GNUNET_DATASTORE_Handle *h = qe->h;
593 GNUNET_CONTAINER_DLL_remove (h->queue_head,
596 if (qe->task != GNUNET_SCHEDULER_NO_TASK)
598 GNUNET_SCHEDULER_cancel (h->sched,
600 qe->task = GNUNET_SCHEDULER_NO_TASK;
607 * Type of a function to call when we receive a message
611 * @param msg message received, NULL on timeout or fatal error
614 process_status_message (void *cls,
616 GNUNET_MessageHeader * msg)
618 struct GNUNET_DATASTORE_QueueEntry *qe = cls;
619 struct GNUNET_DATASTORE_Handle *h = qe->h;
620 struct StatusContext rc = qe->qc.sc;
621 const struct StatusMessage *sm;
625 free_queue_entry (qe);
628 if (NULL == h->client)
629 return; /* forced disconnect */
630 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
631 _("Failed to receive response from database.\n"));
636 if ( (ntohs(msg->size) < sizeof(struct StatusMessage)) ||
637 (ntohs(msg->type) != GNUNET_MESSAGE_TYPE_DATASTORE_STATUS) )
640 h->retry_time = GNUNET_TIME_UNIT_ZERO;
642 rc.cont (rc.cont_cls,
644 _("Error reading response from datastore service"));
647 sm = (const struct StatusMessage*) msg;
648 status = ntohl(sm->status);
650 if (ntohs(msg->size) > sizeof(struct StatusMessage))
652 emsg = (const char*) &sm[1];
653 if (emsg[ntohs(msg->size) - sizeof(struct StatusMessage) - 1] != '\0')
656 emsg = _("Invalid error message received from datastore service");
659 if ( (status == GNUNET_SYSERR) &&
663 emsg = _("Invalid error message received from datastore service");
666 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
667 "Received status %d/%s\n",
671 rc.cont (rc.cont_cls,
679 * Store an item in the datastore. If the item is already present,
680 * the priorities are summed up and the higher expiration time and
681 * lower anonymity level is used.
683 * @param h handle to the datastore
684 * @param rid reservation ID to use (from "reserve"); use 0 if no
685 * prior reservation was made
686 * @param key key for the value
687 * @param size number of bytes in data
688 * @param data content stored
689 * @param type type of the content
690 * @param priority priority of the content
691 * @param anonymity anonymity-level for the content
692 * @param expiration expiration time for the content
693 * @param queue_priority ranking of this request in the priority queue
694 * @param max_queue_size at what queue size should this request be dropped
695 * (if other requests of higher priority are in the queue)
696 * @param timeout timeout for the operation
697 * @param cont continuation to call when done
698 * @param cont_cls closure for cont
699 * @return NULL if the entry was not queued, otherwise a handle that can be used to
700 * cancel; note that even if NULL is returned, the callback will be invoked
701 * (or rather, will already have been invoked)
703 struct GNUNET_DATASTORE_QueueEntry *
704 GNUNET_DATASTORE_put (struct GNUNET_DATASTORE_Handle *h,
706 const GNUNET_HashCode * key,
709 enum GNUNET_BLOCK_Type type,
712 struct GNUNET_TIME_Absolute expiration,
713 unsigned int queue_priority,
714 unsigned int max_queue_size,
715 struct GNUNET_TIME_Relative timeout,
716 GNUNET_DATASTORE_ContinuationWithStatus cont,
719 struct GNUNET_DATASTORE_QueueEntry *qe;
720 struct DataMessage *dm;
722 union QueueContext qc;
725 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
726 "Asked to put %u bytes of data under key `%s'\n",
730 msize = sizeof(struct DataMessage) + size;
731 GNUNET_assert (msize <= GNUNET_SERVER_MAX_MESSAGE_SIZE);
733 qc.sc.cont_cls = cont_cls;
734 qe = make_queue_entry (h, msize,
735 queue_priority, max_queue_size, timeout,
736 &process_status_message, &qc);
739 dm = (struct DataMessage* ) &qe[1];
740 dm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_PUT);
741 dm->header.size = htons(msize);
742 dm->rid = htonl(rid);
743 dm->size = htonl(size);
744 dm->type = htonl(type);
745 dm->priority = htonl(priority);
746 dm->anonymity = htonl(anonymity);
747 dm->uid = GNUNET_htonll(0);
748 dm->expiration = GNUNET_TIME_absolute_hton(expiration);
750 memcpy (&dm[1], data, size);
757 * Reserve space in the datastore. This function should be used
758 * to avoid "out of space" failures during a longer sequence of "put"
759 * operations (for example, when a file is being inserted).
761 * @param h handle to the datastore
762 * @param amount how much space (in bytes) should be reserved (for content only)
763 * @param entries how many entries will be created (to calculate per-entry overhead)
764 * @param queue_priority ranking of this request in the priority queue
765 * @param max_queue_size at what queue size should this request be dropped
766 * (if other requests of higher priority are in the queue)
767 * @param timeout how long to wait at most for a response (or before dying in queue)
768 * @param cont continuation to call when done; "success" will be set to
769 * a positive reservation value if space could be reserved.
770 * @param cont_cls closure for cont
771 * @return NULL if the entry was not queued, otherwise a handle that can be used to
772 * cancel; note that even if NULL is returned, the callback will be invoked
773 * (or rather, will already have been invoked)
775 struct GNUNET_DATASTORE_QueueEntry *
776 GNUNET_DATASTORE_reserve (struct GNUNET_DATASTORE_Handle *h,
779 unsigned int queue_priority,
780 unsigned int max_queue_size,
781 struct GNUNET_TIME_Relative timeout,
782 GNUNET_DATASTORE_ContinuationWithStatus cont,
785 struct GNUNET_DATASTORE_QueueEntry *qe;
786 struct ReserveMessage *rm;
787 union QueueContext qc;
790 cont = &drop_status_cont;
792 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
793 "Asked to reserve %llu bytes of data and %u entries'\n",
794 (unsigned long long) amount,
795 (unsigned int) entries);
798 qc.sc.cont_cls = cont_cls;
799 qe = make_queue_entry (h, sizeof(struct ReserveMessage),
800 queue_priority, max_queue_size, timeout,
801 &process_status_message, &qc);
804 rm = (struct ReserveMessage*) &qe[1];
805 rm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_RESERVE);
806 rm->header.size = htons(sizeof (struct ReserveMessage));
807 rm->entries = htonl(entries);
808 rm->amount = GNUNET_htonll(amount);
815 * Signal that all of the data for which a reservation was made has
816 * been stored and that whatever excess space might have been reserved
817 * can now be released.
819 * @param h handle to the datastore
820 * @param rid reservation ID (value of "success" in original continuation
821 * from the "reserve" function).
822 * @param queue_priority ranking of this request in the priority queue
823 * @param max_queue_size at what queue size should this request be dropped
824 * (if other requests of higher priority are in the queue)
825 * @param queue_priority ranking of this request in the priority queue
826 * @param max_queue_size at what queue size should this request be dropped
827 * (if other requests of higher priority are in the queue)
828 * @param timeout how long to wait at most for a response
829 * @param cont continuation to call when done
830 * @param cont_cls closure for cont
831 * @return NULL if the entry was not queued, otherwise a handle that can be used to
832 * cancel; note that even if NULL is returned, the callback will be invoked
833 * (or rather, will already have been invoked)
835 struct GNUNET_DATASTORE_QueueEntry *
836 GNUNET_DATASTORE_release_reserve (struct GNUNET_DATASTORE_Handle *h,
838 unsigned int queue_priority,
839 unsigned int max_queue_size,
840 struct GNUNET_TIME_Relative timeout,
841 GNUNET_DATASTORE_ContinuationWithStatus cont,
844 struct GNUNET_DATASTORE_QueueEntry *qe;
845 struct ReleaseReserveMessage *rrm;
846 union QueueContext qc;
849 cont = &drop_status_cont;
851 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
852 "Asked to release reserve %d\n",
856 qc.sc.cont_cls = cont_cls;
857 qe = make_queue_entry (h, sizeof(struct ReleaseReserveMessage),
858 queue_priority, max_queue_size, timeout,
859 &process_status_message, &qc);
862 rrm = (struct ReleaseReserveMessage*) &qe[1];
863 rrm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_RELEASE_RESERVE);
864 rrm->header.size = htons(sizeof (struct ReleaseReserveMessage));
865 rrm->rid = htonl(rid);
872 * Update a value in the datastore.
874 * @param h handle to the datastore
875 * @param uid identifier for the value
876 * @param priority how much to increase the priority of the value
877 * @param expiration new expiration value should be MAX of existing and this argument
878 * @param queue_priority ranking of this request in the priority queue
879 * @param max_queue_size at what queue size should this request be dropped
880 * (if other requests of higher priority are in the queue)
881 * @param timeout how long to wait at most for a response
882 * @param cont continuation to call when done
883 * @param cont_cls closure for cont
884 * @return NULL if the entry was not queued, otherwise a handle that can be used to
885 * cancel; note that even if NULL is returned, the callback will be invoked
886 * (or rather, will already have been invoked)
888 struct GNUNET_DATASTORE_QueueEntry *
889 GNUNET_DATASTORE_update (struct GNUNET_DATASTORE_Handle *h,
890 unsigned long long uid,
892 struct GNUNET_TIME_Absolute expiration,
893 unsigned int queue_priority,
894 unsigned int max_queue_size,
895 struct GNUNET_TIME_Relative timeout,
896 GNUNET_DATASTORE_ContinuationWithStatus cont,
899 struct GNUNET_DATASTORE_QueueEntry *qe;
900 struct UpdateMessage *um;
901 union QueueContext qc;
904 cont = &drop_status_cont;
906 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
907 "Asked to update entry %llu raising priority by %u and expiration to %llu\n",
909 (unsigned int) priority,
910 (unsigned long long) expiration.value);
913 qc.sc.cont_cls = cont_cls;
914 qe = make_queue_entry (h, sizeof(struct UpdateMessage),
915 queue_priority, max_queue_size, timeout,
916 &process_status_message, &qc);
919 um = (struct UpdateMessage*) &qe[1];
920 um->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_UPDATE);
921 um->header.size = htons(sizeof (struct UpdateMessage));
922 um->priority = htonl(priority);
923 um->expiration = GNUNET_TIME_absolute_hton(expiration);
924 um->uid = GNUNET_htonll(uid);
931 * Explicitly remove some content from the database.
932 * The "cont"inuation will be called with status
933 * "GNUNET_OK" if content was removed, "GNUNET_NO"
934 * if no matching entry was found and "GNUNET_SYSERR"
935 * on all other types of errors.
937 * @param h handle to the datastore
938 * @param key key for the value
939 * @param size number of bytes in data
940 * @param data content stored
941 * @param queue_priority ranking of this request in the priority queue
942 * @param max_queue_size at what queue size should this request be dropped
943 * (if other requests of higher priority are in the queue)
944 * @param timeout how long to wait at most for a response
945 * @param cont continuation to call when done
946 * @param cont_cls closure for cont
947 * @return NULL if the entry was not queued, otherwise a handle that can be used to
948 * cancel; note that even if NULL is returned, the callback will be invoked
949 * (or rather, will already have been invoked)
951 struct GNUNET_DATASTORE_QueueEntry *
952 GNUNET_DATASTORE_remove (struct GNUNET_DATASTORE_Handle *h,
953 const GNUNET_HashCode *key,
956 unsigned int queue_priority,
957 unsigned int max_queue_size,
958 struct GNUNET_TIME_Relative timeout,
959 GNUNET_DATASTORE_ContinuationWithStatus cont,
962 struct GNUNET_DATASTORE_QueueEntry *qe;
963 struct DataMessage *dm;
965 union QueueContext qc;
968 cont = &drop_status_cont;
970 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
971 "Asked to remove %u bytes under key `%s'\n",
976 qc.sc.cont_cls = cont_cls;
977 msize = sizeof(struct DataMessage) + size;
978 GNUNET_assert (msize <= GNUNET_SERVER_MAX_MESSAGE_SIZE);
979 qe = make_queue_entry (h, msize,
980 queue_priority, max_queue_size, timeout,
981 &process_status_message, &qc);
984 dm = (struct DataMessage*) &qe[1];
985 dm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE);
986 dm->header.size = htons(msize);
988 dm->size = htonl(size);
990 dm->priority = htonl(0);
991 dm->anonymity = htonl(0);
992 dm->uid = GNUNET_htonll(0);
993 dm->expiration = GNUNET_TIME_absolute_hton(GNUNET_TIME_UNIT_ZERO_ABS);
995 memcpy (&dm[1], data, size);
1002 * Type of a function to call when we receive a message
1005 * @param cls closure
1006 * @param msg message received, NULL on timeout or fatal error
1009 process_result_message (void *cls,
1010 const struct GNUNET_MessageHeader * msg)
1012 struct GNUNET_DATASTORE_QueueEntry *qe = cls;
1013 struct GNUNET_DATASTORE_Handle *h = qe->h;
1014 struct ResultContext rc = qe->qc.rc;
1015 const struct DataMessage *dm;
1016 int was_transmitted;
1021 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1022 "Failed to receive response from datastore or queue full\n");
1024 was_transmitted = qe->was_transmitted;
1025 free_queue_entry (qe);
1026 if (GNUNET_YES == was_transmitted)
1028 if (rc.iter != NULL)
1029 rc.iter (rc.iter_cls,
1030 NULL, 0, NULL, 0, 0, 0,
1031 GNUNET_TIME_UNIT_ZERO_ABS, 0);
1034 GNUNET_assert (GNUNET_YES == qe->was_transmitted);
1035 GNUNET_assert (h->queue_head == qe);
1036 if (ntohs(msg->type) == GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END)
1038 GNUNET_break (ntohs(msg->size) == sizeof(struct GNUNET_MessageHeader));
1040 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1041 "Received end of result set\n");
1043 free_queue_entry (qe);
1044 if (rc.iter != NULL)
1045 rc.iter (rc.iter_cls,
1046 NULL, 0, NULL, 0, 0, 0,
1047 GNUNET_TIME_UNIT_ZERO_ABS, 0);
1051 if ( (ntohs(msg->size) < sizeof(struct DataMessage)) ||
1052 (ntohs(msg->type) != GNUNET_MESSAGE_TYPE_DATASTORE_DATA) ||
1053 (ntohs(msg->size) != sizeof(struct DataMessage) + ntohl (((const struct DataMessage*)msg)->size)) )
1056 free_queue_entry (qe);
1057 h->retry_time = GNUNET_TIME_UNIT_ZERO;
1059 if (rc.iter != NULL)
1060 rc.iter (rc.iter_cls,
1061 NULL, 0, NULL, 0, 0, 0,
1062 GNUNET_TIME_UNIT_ZERO_ABS, 0);
1065 if (rc.iter == NULL)
1067 /* abort iteration */
1071 dm = (const struct DataMessage*) msg;
1073 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1074 "Received result %llu with type %u and size %u with key %s\n",
1075 (unsigned long long) GNUNET_ntohll(dm->uid),
1078 GNUNET_h2s(&dm->key));
1080 rc.iter (rc.iter_cls,
1085 ntohl(dm->priority),
1086 ntohl(dm->anonymity),
1087 GNUNET_TIME_absolute_ntoh(dm->expiration),
1088 GNUNET_ntohll(dm->uid));
1093 * Get a random value from the datastore.
1095 * @param h handle to the datastore
1096 * @param queue_priority ranking of this request in the priority queue
1097 * @param max_queue_size at what queue size should this request be dropped
1098 * (if other requests of higher priority are in the queue)
1099 * @param timeout how long to wait at most for a response
1100 * @param iter function to call on a random value; it
1101 * will be called once with a value (if available)
1102 * and always once with a value of NULL.
1103 * @param iter_cls closure for iter
1104 * @return NULL if the entry was not queued, otherwise a handle that can be used to
1105 * cancel; note that even if NULL is returned, the callback will be invoked
1106 * (or rather, will already have been invoked)
1108 struct GNUNET_DATASTORE_QueueEntry *
1109 GNUNET_DATASTORE_get_random (struct GNUNET_DATASTORE_Handle *h,
1110 unsigned int queue_priority,
1111 unsigned int max_queue_size,
1112 struct GNUNET_TIME_Relative timeout,
1113 GNUNET_DATASTORE_Iterator iter,
1116 struct GNUNET_DATASTORE_QueueEntry *qe;
1117 struct GNUNET_MessageHeader *m;
1118 union QueueContext qc;
1121 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1122 "Asked to get random entry in %llu ms\n",
1123 (unsigned long long) timeout.value);
1126 qc.rc.iter_cls = iter_cls;
1127 qe = make_queue_entry (h, sizeof(struct GNUNET_MessageHeader),
1128 queue_priority, max_queue_size, timeout,
1129 &process_result_message, &qc);
1132 m = (struct GNUNET_MessageHeader*) &qe[1];
1133 m->type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_GET_RANDOM);
1134 m->size = htons(sizeof (struct GNUNET_MessageHeader));
1142 * Iterate over the results for a particular key
1143 * in the datastore. The iterator will only be called
1144 * once initially; if the first call did contain a
1145 * result, further results can be obtained by calling
1146 * "GNUNET_DATASTORE_get_next" with the given argument.
1148 * @param h handle to the datastore
1149 * @param key maybe NULL (to match all entries)
1150 * @param type desired type, 0 for any
1151 * @param queue_priority ranking of this request in the priority queue
1152 * @param max_queue_size at what queue size should this request be dropped
1153 * (if other requests of higher priority are in the queue)
1154 * @param timeout how long to wait at most for a response
1155 * @param iter function to call on each matching value;
1156 * will be called once with a NULL value at the end
1157 * @param iter_cls closure for iter
1158 * @return NULL if the entry was not queued, otherwise a handle that can be used to
1159 * cancel; note that even if NULL is returned, the callback will be invoked
1160 * (or rather, will already have been invoked)
1162 struct GNUNET_DATASTORE_QueueEntry *
1163 GNUNET_DATASTORE_get (struct GNUNET_DATASTORE_Handle *h,
1164 const GNUNET_HashCode * key,
1165 enum GNUNET_BLOCK_Type type,
1166 unsigned int queue_priority,
1167 unsigned int max_queue_size,
1168 struct GNUNET_TIME_Relative timeout,
1169 GNUNET_DATASTORE_Iterator iter,
1172 struct GNUNET_DATASTORE_QueueEntry *qe;
1173 struct GetMessage *gm;
1174 union QueueContext qc;
1177 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1178 "Asked to look for data of type %u under key `%s'\n",
1179 (unsigned int) type,
1183 qc.rc.iter_cls = iter_cls;
1184 qe = make_queue_entry (h, sizeof(struct GetMessage),
1185 queue_priority, max_queue_size, timeout,
1186 &process_result_message, &qc);
1189 gm = (struct GetMessage*) &qe[1];
1190 gm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_GET);
1191 gm->type = htonl(type);
1194 gm->header.size = htons(sizeof (struct GetMessage));
1199 gm->header.size = htons(sizeof (struct GetMessage) - sizeof(GNUNET_HashCode));
1207 * Function called to trigger obtaining the next result
1208 * from the datastore.
1210 * @param h handle to the datastore
1211 * @param more GNUNET_YES to get moxre results, GNUNET_NO to abort
1212 * iteration (with a final call to "iter" with key/data == NULL).
1215 GNUNET_DATASTORE_get_next (struct GNUNET_DATASTORE_Handle *h,
1218 struct GNUNET_DATASTORE_QueueEntry *qe = h->queue_head;
1219 struct ResultContext rc = qe->qc.rc;
1221 GNUNET_assert (NULL != qe);
1222 GNUNET_assert (&process_result_message == qe->response_proc);
1223 if (GNUNET_YES == more)
1225 GNUNET_CLIENT_receive (h->client,
1228 GNUNET_TIME_absolute_get_remaining (qe->timeout));
1231 free_queue_entry (qe);
1232 h->retry_time = GNUNET_TIME_UNIT_ZERO;
1234 rc.iter (rc.iter_cls,
1235 NULL, 0, NULL, 0, 0, 0,
1236 GNUNET_TIME_UNIT_ZERO_ABS, 0);
1241 * Cancel a datastore operation. The final callback from the
1242 * operation must not have been done yet.
1244 * @param qe operation to cancel
1247 GNUNET_DATASTORE_cancel (struct GNUNET_DATASTORE_QueueEntry *qe)
1249 struct GNUNET_DATASTORE_Handle *h;
1253 reconnect = GNUNET_NO;
1254 if (GNUNET_YES == qe->was_transmitted)
1256 if (qe->response_proc == &process_result_message)
1257 qe->qc.rc.iter = NULL;
1259 reconnect = GNUNET_YES;
1262 free_queue_entry (qe);
1266 h->retry_time = GNUNET_TIME_UNIT_ZERO;
1272 /* end of datastore_api.c */