2 This file is part of GNUnet
3 (C) 2004, 2005, 2006, 2007, 2009, 2010 Christian Grothoff (and other contributing authors)
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 2, or (at your
8 option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA.
22 * @file datastore/datastore_api.c
23 * @brief Management for the datastore for files stored on a GNUnet node
24 * @author Christian Grothoff
27 #include "gnunet_arm_service.h"
28 #include "gnunet_datastore_service.h"
29 #include "datastore.h"
/*
 * Members of one queued request: list links (next/prev), the owning
 * datastore handle, a scheduler task id used for timeout signalling, the
 * absolute deadline, queue priority, a cap on queue length, the request
 * size, a "was transmitted" flag and the response callback.
 * NOTE(review): presumably this is the body of struct QueueEntry; the
 * struct header, the braces and the member documented by the last two
 * lines ("Specific context") are not visible in this extract -- confirm
 * against the full file.
 */
32 * Entry in our priority queue.
38 * This is a linked list.
40 struct QueueEntry *next;
43 * This is a linked list.
45 struct QueueEntry *prev;
48 * Handle to the master context.
50 struct GNUNET_DATASTORE_Handle *h;
53 * Task for timeout signalling.
55 GNUNET_SCHEDULER_TaskIdentifier task;
58 * Timeout for the current operation.
60 struct GNUNET_TIME_Absolute timeout;
63 * Priority in the queue.
65 unsigned int priority;
68 * Maximum allowed length of queue (otherwise
69 * this request should be discarded).
71 unsigned int max_queue;
74 * Number of bytes in the request message following
77 uint16_t message_size;
80 * Has this message been transmitted to the service?
81 * Only ever GNUNET_YES for the head of the queue.
83 int16_t was_transmitted;
86 * Response processor (NULL if we are not waiting for a response).
87 * This struct should be used for the closure, function-specific
88 * arguments can be passed via 'client_ctx'.
90 GNUNET_CLIENT_MessageHandler response_proc;
93 * Specific context (variable argument that
94 * can be used by the response processor).
/*
 * Per-connection state of the datastore client: configuration, scheduler,
 * the current client connection and a doubly linked request queue
 * (head, tail, element count).
 * NOTE(review): the request helpers later in this file read and write
 * h->response_proc, h->response_proc_cls, h->timeout and h->message_size.
 * None of these is among the members visible here, while response_proc,
 * timeout and message_size do appear among the queue-entry members.
 * Confirm whether lines missing from this extract declare them on the
 * handle, or whether those helpers still need to be ported to the queue.
 */
101 * Handle to the datastore service. Followed
102 * by 65536 bytes used for storing messages.
104 struct GNUNET_DATASTORE_Handle
110 const struct GNUNET_CONFIGURATION_Handle *cfg;
115 struct GNUNET_SCHEDULER_Handle *sched;
118 * Current connection to the datastore service.
120 struct GNUNET_CLIENT_Connection *client;
123 * Current head of priority queue.
125 struct QueueEntry *queue_head;
128 * Current tail of priority queue.
130 struct QueueEntry *queue_tail;
133 * Number of entries in the queue.
135 unsigned int queue_size;
142 * Connect to the datastore service.
144 * @param cfg configuration to use
145 * @param sched scheduler to use
146 * @return handle to use to access the service
/*
 * Opens a client connection to the "datastore" service and allocates the
 * handle with GNUNET_SERVER_MAX_MESSAGE_SIZE extra bytes directly behind
 * it. The request builders in this file assemble their messages in that
 * trailing area via &h[1].
 * NOTE(review): the parameter names, the check guarding the early
 * "return NULL", the initialisation of the handle and the final return
 * are on lines missing from this extract -- confirm against the full file.
 */
148 struct GNUNET_DATASTORE_Handle *
149 GNUNET_DATASTORE_connect (const struct
150 GNUNET_CONFIGURATION_Handle
153 GNUNET_SCHEDULER_Handle
156 struct GNUNET_CLIENT_Connection *c;
157 struct GNUNET_DATASTORE_Handle *h;
159 c = GNUNET_CLIENT_connect (sched, "datastore", cfg);
161 return NULL; /* oops */
162 h = GNUNET_malloc (sizeof(struct GNUNET_DATASTORE_Handle) +
163 GNUNET_SERVER_MAX_MESSAGE_SIZE);
172 * Transmit DROP message to datastore service.
174 * @param cls the 'struct GNUNET_DATASTORE_Handle'
175 * @param size number of bytes that can be copied to buf
176 * @param buf where to copy the drop message
177 * @return number of bytes written to buf
/*
 * Transmit callback that emits a header-only DROP request.
 * Visible behaviour: one path logs a warning and tears the handle down
 * with GNUNET_DATASTORE_disconnect (h, GNUNET_NO). The other path asserts
 * that the buffer can hold a GNUNET_MessageHeader, fills in size and type
 * in network byte order, tears the handle down the same way and reports
 * the header size as the number of bytes written.
 * NOTE(review): the condition selecting the warning path, the assignment
 * of hdr and the value returned on the warning path are not visible in
 * this extract; presumably the warning path is taken when buf is NULL --
 * confirm against the full file.
 */
180 transmit_drop (void *cls,
184 struct GNUNET_DATASTORE_Handle *h = cls;
185 struct GNUNET_MessageHeader *hdr;
189 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
190 _("Failed to transmit request to drop database.\n"));
191 GNUNET_DATASTORE_disconnect (h, GNUNET_NO);
194 GNUNET_assert (size >= sizeof(struct GNUNET_MessageHeader));
196 hdr->size = htons(sizeof(struct GNUNET_MessageHeader));
197 hdr->type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_DROP);
198 GNUNET_DATASTORE_disconnect (h, GNUNET_NO);
199 return sizeof(struct GNUNET_MessageHeader);
204 * Disconnect from the datastore service (and free
205 * associated resources).
207 * @param h handle to the datastore
208 * @param drop set to GNUNET_YES to delete all data in datastore (!)
/*
 * Tear down the connection to the datastore service.
 *  - An existing client connection is closed first.
 *  - Every entry still in the queue is unlinked; an entry that has a
 *    response_proc gets it invoked as response_proc (qe, NULL), i.e. with
 *    the entry itself as closure and no message.
 *  - With drop == GNUNET_YES a fresh connection is opened and a
 *    header-sized transmission is requested, passing
 *    GNUNET_TIME_UNIT_MINUTES (presumably the timeout -- confirm).
 * NOTE(review): the remaining arguments of the DLL-remove and
 * notify-transmit calls, the condition guarding the final
 * GNUNET_CLIENT_disconnect, and the release of qe and h are on lines
 * missing from this extract. The transmit callback is presumably
 * transmit_drop -- confirm against the full file.
 */
210 void GNUNET_DATASTORE_disconnect (struct GNUNET_DATASTORE_Handle *h,
213 struct QueueEntry *qe;
215 if (h->client != NULL)
216 GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
218 while (NULL != (qe = h->queue_head))
220 GNUNET_CONTAINER_DLL_remove (h->queue_head,
223 if (NULL != qe->response_proc)
224 qe->response_proc (qe, NULL);
227 if (GNUNET_YES == drop)
229 h->client = GNUNET_CLIENT_connect (h->sched, "datastore", h->cfg);
230 if (h->client != NULL)
233 GNUNET_CLIENT_notify_transmit_ready (h->client,
234 sizeof(struct GNUNET_MessageHeader),
235 GNUNET_TIME_UNIT_MINUTES,
240 GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
250 * Type of a function to call when we receive a message
251 * from the service. This specific function is used
252 * to handle messages of type "struct StatusMessage".
255 * @param msg message received, NULL on timeout or fatal error
/*
 * Response handler for requests answered with a "struct StatusMessage".
 * The continuation is read from h->response_proc on entry. Three outcomes
 * are visible:
 *  - a path reporting "Timeout trying to read response ...": the pending
 *    continuation is cleared, the client connection is closed and
 *    re-opened, and the continuation is called with that message;
 *  - a message that is shorter than a StatusMessage or not of type
 *    GNUNET_MESSAGE_TYPE_DATASTORE_STATUS: same reconnect, continuation
 *    called with "Error reading response ...";
 *  - otherwise the status is decoded from network byte order and the
 *    continuation is called with it.
 * NOTE(review): the condition for the first path (presumably msg == NULL),
 * the declarations of status and emsg, and the remaining continuation
 * arguments are on lines missing from this extract.
 * NOTE(review): the value returned by GNUNET_CLIENT_connect is stored in
 * h->client without a visible NULL check, although
 * GNUNET_DATASTORE_connect has an early "return NULL" right after the
 * same call -- confirm that later users of h->client tolerate NULL.
 */
258 with_status_response_handler (void *cls,
260 GNUNET_MessageHeader * msg)
262 struct GNUNET_DATASTORE_Handle *h = cls;
263 GNUNET_DATASTORE_ContinuationWithStatus cont = h->response_proc;
264 const struct StatusMessage *sm;
271 h->response_proc = NULL;
272 GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
273 h->client = GNUNET_CLIENT_connect (h->sched, "datastore", h->cfg);
274 cont (h->response_proc_cls,
276 _("Timeout trying to read response from datastore service"));
279 if ( (ntohs(msg->size) < sizeof(struct StatusMessage)) ||
280 (ntohs(msg->type) != GNUNET_MESSAGE_TYPE_DATASTORE_STATUS) )
283 h->response_proc = NULL;
284 GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
285 h->client = GNUNET_CLIENT_connect (h->sched, "datastore", h->cfg);
286 cont (h->response_proc_cls,
288 _("Error reading response from datastore service"));
291 sm = (const struct StatusMessage*) msg;
292 status = ntohl(sm->status);
/* Bytes behind the fixed StatusMessage are treated as an error string;
   its last byte must be NUL, otherwise a generic message is substituted. */
294 if (ntohs(msg->size) > sizeof(struct StatusMessage))
296 emsg = (const char*) &sm[1];
297 if (emsg[ntohs(msg->size) - sizeof(struct StatusMessage) - 1] != '\0')
300 emsg = _("Invalid error message received from datastore service");
303 if ( (status == GNUNET_SYSERR) &&
307 emsg = _("Invalid error message received from datastore service");
/* The pending continuation is cleared before the continuation is called. */
309 h->response_proc = NULL;
311 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
312 "Received status %d/%s\n",
316 cont (h->response_proc_cls,
323 * Helper function that will initiate the
324 * transmission of a message to the datastore
325 * service. The message must already be prepared
326 * and stored in the buffer at the end of the
327 * handle. The message must be of a type that
328 * expects a "StatusMessage" in response.
330 * @param h handle to the service with prepared message
331 * @param cont function to call with result
332 * @param cont_cls closure
333 * @param timeout timeout for the operation
/*
 * Sends the request already assembled behind the handle (&h[1]) and
 * registers with_status_response_handler for the reply.
 * Steps visible here: cont must be non-NULL; the message size is read
 * from the prepared header; no other response may be pending
 * (h->response_proc must be NULL); continuation, closure, absolute
 * deadline and message size are recorded on the handle; then
 * GNUNET_CLIENT_transmit_and_get_response is called. A later path clears
 * h->response_proc again and reports
 * "Not ready to transmit request to datastore service".
 * NOTE(review): the remaining arguments of the transmit call, the
 * condition leading to the "Not ready" path (presumably a failure result
 * of the transmit call) and the declaration of msize are on lines missing
 * from this extract -- confirm against the full file.
 */
336 transmit_for_status (struct GNUNET_DATASTORE_Handle *h,
337 GNUNET_DATASTORE_ContinuationWithStatus cont,
339 struct GNUNET_TIME_Relative timeout)
341 const struct GNUNET_MessageHeader *hdr;
344 GNUNET_assert (cont != NULL);
345 hdr = (const struct GNUNET_MessageHeader*) &h[1];
346 msize = ntohs(hdr->size);
348 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
349 "Transmitting %u byte message of type %u to datastore service\n",
353 GNUNET_assert (h->response_proc == NULL);
354 h->response_proc = cont;
355 h->response_proc_cls = cont_cls;
356 h->timeout = GNUNET_TIME_relative_to_absolute (timeout);
357 h->message_size = msize;
359 GNUNET_CLIENT_transmit_and_get_response (h->client,
363 &with_status_response_handler,
367 h->response_proc = NULL;
371 _("Not ready to transmit request to datastore service"));
377 * Store an item in the datastore. If the item is already present,
378 * the priorities are summed up and the higher expiration time and
379 * lower anonymity level is used.
381 * @param h handle to the datastore
382 * @param rid reservation ID to use (from "reserve"); use 0 if no
383 * prior reservation was made
384 * @param key key for the value
385 * @param size number of bytes in data
386 * @param data content stored
387 * @param type type of the content
388 * @param priority priority of the content
389 * @param anonymity anonymity-level for the content
390 * @param expiration expiration time for the content
391 * @param timeout timeout for the operation
392 * @param cont continuation to call when done
393 * @param cont_cls closure for cont
/*
 * Builds a PUT request (DataMessage header followed by the payload) in
 * the buffer behind the handle and sends it via transmit_for_status.
 * All multi-byte header fields are written in network byte order; uid is
 * sent as 0.
 * NOTE(review): the assert admits msize == GNUNET_SERVER_MAX_MESSAGE_SIZE,
 * yet the size is then written with htons(), i.e. into a 16-bit field. If
 * that constant is 65536 (as the comment on the handle suggests), a
 * message of exactly that size would pass the assert but not fit the
 * field -- confirm whether the comparison should be strict.
 * NOTE(review): several parameters, the declaration of msize and the
 * statement on the line between the expiration assignment and memcpy are
 * missing from this extract.
 */
396 GNUNET_DATASTORE_put (struct GNUNET_DATASTORE_Handle *h,
398 const GNUNET_HashCode * key,
401 enum GNUNET_BLOCK_Type type,
404 struct GNUNET_TIME_Absolute expiration,
405 struct GNUNET_TIME_Relative timeout,
406 GNUNET_DATASTORE_ContinuationWithStatus cont,
409 struct DataMessage *dm;
413 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
414 "Asked to put %u bytes of data under key `%s'\n",
418 msize = sizeof(struct DataMessage) + size;
419 GNUNET_assert (msize <= GNUNET_SERVER_MAX_MESSAGE_SIZE);
420 dm = (struct DataMessage*) &h[1];
421 dm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_PUT);
422 dm->header.size = htons(msize);
423 dm->rid = htonl(rid);
424 dm->size = htonl(size);
425 dm->type = htonl(type);
426 dm->priority = htonl(priority);
427 dm->anonymity = htonl(anonymity);
428 dm->uid = GNUNET_htonll(0);
429 dm->expiration = GNUNET_TIME_absolute_hton(expiration);
/* The payload is copied directly behind the fixed DataMessage header. */
431 memcpy (&dm[1], data, size);
432 transmit_for_status (h, cont, cont_cls, timeout);
437 * Reserve space in the datastore. This function should be used
438 * to avoid "out of space" failures during a longer sequence of "put"
439 * operations (for example, when a file is being inserted).
441 * @param h handle to the datastore
442 * @param amount how much space (in bytes) should be reserved (for content only)
443 * @param entries how many entries will be created (to calculate per-entry overhead)
444 * @param cont continuation to call when done; "success" will be set to
445 * a positive reservation value if space could be reserved.
446 * @param cont_cls closure for cont
447 * @param timeout how long to wait at most for a response
/*
 * Builds a fixed-size RESERVE request in the buffer behind the handle
 * (entries as a 32-bit and amount as a 64-bit network-order value) and
 * sends it via transmit_for_status.
 * NOTE(review): the declarations of the amount, entries and cont_cls
 * parameters are on lines missing from this extract.
 */
450 GNUNET_DATASTORE_reserve (struct GNUNET_DATASTORE_Handle *h,
453 GNUNET_DATASTORE_ContinuationWithStatus cont,
455 struct GNUNET_TIME_Relative timeout)
457 struct ReserveMessage *rm;
459 rm = (struct ReserveMessage*) &h[1];
460 rm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_RESERVE);
461 rm->header.size = htons(sizeof (struct ReserveMessage));
462 rm->entries = htonl(entries);
463 rm->amount = GNUNET_htonll(amount);
464 transmit_for_status (h, cont, cont_cls, timeout);
469 * Signal that all of the data for which a reservation was made has
470 * been stored and that whatever excess space might have been reserved
471 * can now be released.
473 * @param h handle to the datastore
474 * @param rid reservation ID (value of "success" in original continuation
475 * from the "reserve" function).
476 * @param cont continuation to call when done
477 * @param cont_cls closure for cont
478 * @param timeout how long to wait at most for a response
/*
 * Builds a fixed-size RELEASE_RESERVE request carrying the reservation id
 * (32-bit, network byte order) in the buffer behind the handle and sends
 * it via transmit_for_status.
 * NOTE(review): the declarations of the rid and cont_cls parameters are
 * on lines missing from this extract.
 */
481 GNUNET_DATASTORE_release_reserve (struct GNUNET_DATASTORE_Handle *h,
483 GNUNET_DATASTORE_ContinuationWithStatus cont,
485 struct GNUNET_TIME_Relative timeout)
487 struct ReleaseReserveMessage *rrm;
489 rrm = (struct ReleaseReserveMessage*) &h[1];
490 rrm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_RELEASE_RESERVE);
491 rrm->header.size = htons(sizeof (struct ReleaseReserveMessage));
492 rrm->rid = htonl(rid);
493 transmit_for_status (h, cont, cont_cls, timeout);
498 * Update a value in the datastore.
500 * @param h handle to the datastore
501 * @param uid identifier for the value
502 * @param priority how much to increase the priority of the value
503 * @param expiration new expiration value should be MAX of existing and this argument
504 * @param cont continuation to call when done
505 * @param cont_cls closure for cont
506 * @param timeout how long to wait at most for a response
/*
 * Builds a fixed-size UPDATE request (priority, expiration and uid, all
 * converted to network byte order) in the buffer behind the handle and
 * sends it via transmit_for_status.
 * NOTE(review): the declarations of the priority and cont_cls parameters
 * are on lines missing from this extract.
 */
509 GNUNET_DATASTORE_update (struct GNUNET_DATASTORE_Handle *h,
510 unsigned long long uid,
512 struct GNUNET_TIME_Absolute expiration,
513 GNUNET_DATASTORE_ContinuationWithStatus cont,
515 struct GNUNET_TIME_Relative timeout)
517 struct UpdateMessage *um;
519 um = (struct UpdateMessage*) &h[1];
520 um->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_UPDATE);
521 um->header.size = htons(sizeof (struct UpdateMessage));
522 um->priority = htonl(priority);
523 um->expiration = GNUNET_TIME_absolute_hton(expiration);
524 um->uid = GNUNET_htonll(uid);
525 transmit_for_status (h, cont, cont_cls, timeout);
530 * Helper function that will initiate the transmission of a message to
531 * the datastore service. The message must already be prepared and
532 * stored in the buffer at the end of the handle. The message must be
533 * of a type that expects a "DataMessage" in response.
535 * @param h handle to the service with prepared message
536 * @param cont function to call with result
537 * @param cont_cls closure
538 * @param timeout timeout for the operation
/*
 * Forward declaration: with_result_response_handler, defined before the
 * body of this function, calls it to re-issue a request.
 */
541 transmit_for_result (struct GNUNET_DATASTORE_Handle *h,
542 GNUNET_DATASTORE_Iterator cont,
544 struct GNUNET_TIME_Relative timeout);
548 * Type of a function to call when we receive a message
549 * from the service. This specific function is used
550 * to handle messages of type "struct DataMessage".
553 * @param msg message received, NULL on timeout or fatal error
/*
 * Response handler for requests answered with "struct DataMessage"
 * results. The iterator is read from h->response_proc on entry. Paths
 * visible here:
 *  - "Got disconnected from datastore": the pending iterator is cleared
 *    and the connection is closed and re-opened; if time remains before
 *    h->timeout the request is re-issued through transmit_for_result,
 *    otherwise the iterator gets a final call with NULL/zero arguments;
 *  - a DATA_END message: the pending iterator is cleared and it gets the
 *    final NULL/zero call;
 *  - a message that is too short, of the wrong type, or whose payload
 *    length field disagrees with the header size: reconnect, clear the
 *    pending iterator, final NULL/zero call;
 *  - otherwise the decoded fields are handed to the iterator.
 * NOTE(review): the condition for the first path (presumably
 * msg == NULL), the declaration of msize and several call arguments are
 * on lines missing from this extract.
 * NOTE(review): on the last path h->response_proc is not cleared in the
 * visible lines; GNUNET_DATASTORE_get_next reads it again, so this looks
 * intentional -- confirm against the full file.
 */
556 with_result_response_handler (void *cls,
558 GNUNET_MessageHeader * msg)
560 struct GNUNET_DATASTORE_Handle *h = cls;
561 GNUNET_DATASTORE_Iterator cont = h->response_proc;
562 const struct DataMessage *dm;
564 struct GNUNET_TIME_Relative remaining;
569 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
570 "Got disconnected from datastore\n");
572 h->response_proc = NULL;
573 GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
574 h->client = GNUNET_CLIENT_connect (h->sched, "datastore", h->cfg);
575 remaining = GNUNET_TIME_absolute_get_remaining (h->timeout);
576 if (remaining.value > 0)
578 transmit_for_result (h,
580 h->response_proc_cls,
586 cont (h->response_proc_cls,
587 NULL, 0, NULL, 0, 0, 0,
588 GNUNET_TIME_UNIT_ZERO_ABS, 0);
593 if (ntohs(msg->type) == GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END)
595 GNUNET_break (ntohs(msg->size) == sizeof(struct GNUNET_MessageHeader));
596 h->response_proc = NULL;
598 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
599 "Received end of result set\n");
601 cont (h->response_proc_cls,
602 NULL, 0, NULL, 0, 0, 0,
603 GNUNET_TIME_UNIT_ZERO_ABS, 0);
606 if ( (ntohs(msg->size) < sizeof(struct DataMessage)) ||
607 (ntohs(msg->type) != GNUNET_MESSAGE_TYPE_DATASTORE_DATA) )
610 GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
611 h->client = GNUNET_CLIENT_connect (h->sched, "datastore", h->cfg);
612 h->response_proc = NULL;
613 cont (h->response_proc_cls,
614 NULL, 0, NULL, 0, 0, 0,
615 GNUNET_TIME_UNIT_ZERO_ABS, 0);
618 dm = (const struct DataMessage*) msg;
/* The payload length announced inside the message must match the
   header size minus the fixed DataMessage part. */
619 msize = ntohl(dm->size);
620 if (ntohs(msg->size) != msize + sizeof(struct DataMessage))
623 GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
624 h->client = GNUNET_CLIENT_connect (h->sched, "datastore", h->cfg);
625 h->response_proc = NULL;
626 cont (h->response_proc_cls,
627 NULL, 0, NULL, 0, 0, 0,
628 GNUNET_TIME_UNIT_ZERO_ABS, 0);
632 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
633 "Received result %llu with type %u and size %u with key %s\n",
634 (unsigned long long) GNUNET_ntohll(dm->uid),
637 GNUNET_h2s(&dm->key));
639 cont (h->response_proc_cls,
645 ntohl(dm->anonymity),
646 GNUNET_TIME_absolute_ntoh(dm->expiration),
647 GNUNET_ntohll(dm->uid));
652 * Function called to trigger obtaining the next result
653 * from the datastore.
655 * @param h handle to the datastore
656 * @param more GNUNET_YES to get more results, GNUNET_NO to abort
657 * iteration (with a final call to "iter" with key/data == NULL).
/*
 * Continue or abort an ongoing iteration.
 *  - more == GNUNET_YES: wait for the next message on the current
 *    connection, with with_result_response_handler as receiver and the
 *    time remaining until h->timeout as limit.
 *  - The other visible path takes the pending iterator from the handle,
 *    clears it, closes and re-opens the client connection and gives the
 *    iterator its final call with NULL/zero arguments.
 * NOTE(review): the declaration of the "more" parameter, the closure
 * passed to GNUNET_CLIENT_receive and the statement ending the
 * GNUNET_YES branch (presumably a return) are on lines missing from this
 * extract.
 */
660 GNUNET_DATASTORE_get_next (struct GNUNET_DATASTORE_Handle *h,
663 GNUNET_DATASTORE_Iterator cont;
665 if (GNUNET_YES == more)
667 GNUNET_CLIENT_receive (h->client,
668 &with_result_response_handler,
670 GNUNET_TIME_absolute_get_remaining (h->timeout));
673 cont = h->response_proc;
674 h->response_proc = NULL;
675 GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
676 h->client = GNUNET_CLIENT_connect (h->sched, "datastore", h->cfg);
677 cont (h->response_proc_cls,
678 NULL, 0, NULL, 0, 0, 0,
679 GNUNET_TIME_UNIT_ZERO_ABS, 0);
684 * Helper function that will initiate the transmission of a message to
685 * the datastore service. The message must already be prepared and
686 * stored in the buffer at the end of the handle. The message must be
687 * of a type that expects a "DataMessage" in response.
689 * @param h handle to the service with prepared message
690 * @param cont function to call with result
691 * @param cont_cls closure
692 * @param timeout timeout for the operation
/*
 * Sends the request already assembled behind the handle (&h[1]) and
 * registers with_result_response_handler for the replies.
 * Steps visible here: cont must be non-NULL; the message size is read
 * from the prepared header; no other response may be pending; iterator,
 * closure, absolute deadline and message size are recorded on the handle;
 * then GNUNET_CLIENT_transmit_and_get_response is called. A later path
 * clears h->response_proc again and gives the iterator its final call
 * with NULL/zero arguments.
 * NOTE(review): the remaining arguments of the transmit call, the
 * condition leading to that later path (presumably a failure result of
 * the transmit call) and the declaration of msize are on lines missing
 * from this extract -- confirm against the full file.
 */
695 transmit_for_result (struct GNUNET_DATASTORE_Handle *h,
696 GNUNET_DATASTORE_Iterator cont,
698 struct GNUNET_TIME_Relative timeout)
700 const struct GNUNET_MessageHeader *hdr;
703 GNUNET_assert (cont != NULL);
704 hdr = (const struct GNUNET_MessageHeader*) &h[1];
705 msize = ntohs(hdr->size);
707 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
708 "Transmitting %u byte message of type %u to datastore service\n",
712 GNUNET_assert (h->response_proc == NULL);
713 h->response_proc = cont;
714 h->response_proc_cls = cont_cls;
715 h->timeout = GNUNET_TIME_relative_to_absolute (timeout);
716 h->message_size = msize;
718 GNUNET_CLIENT_transmit_and_get_response (h->client,
722 &with_result_response_handler,
726 h->response_proc = NULL;
728 cont (h->response_proc_cls,
729 NULL, 0, NULL, 0, 0, 0,
730 GNUNET_TIME_UNIT_ZERO_ABS, 0);
736 * Iterate over the results for a particular key
739 * @param h handle to the datastore
740 * @param key maybe NULL (to match all entries)
741 * @param type desired type, 0 for any
742 * @param iter function to call on each matching value;
743 * will be called once with a NULL value at the end
744 * @param iter_cls closure for iter
745 * @param timeout how long to wait at most for a response
/*
 * Builds a GET request in the buffer behind the handle and starts the
 * iteration via transmit_for_result.
 * Two header sizes are visible: the full GetMessage, and the GetMessage
 * shortened by one GNUNET_HashCode.
 * NOTE(review): the condition choosing between the two sizes and the
 * statement storing the key are on lines missing from this extract;
 * presumably the short form is used when key is NULL -- confirm.
 * The assert on h->response_proc repeats the one inside
 * transmit_for_result.
 */
748 GNUNET_DATASTORE_get (struct GNUNET_DATASTORE_Handle *h,
749 const GNUNET_HashCode * key,
750 enum GNUNET_BLOCK_Type type,
751 GNUNET_DATASTORE_Iterator iter, void *iter_cls,
752 struct GNUNET_TIME_Relative timeout)
754 struct GetMessage *gm;
757 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
758 "Asked to look for data under key `%s'\n",
761 gm = (struct GetMessage*) &h[1];
762 gm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_GET);
763 gm->type = htonl(type);
766 gm->header.size = htons(sizeof (struct GetMessage));
771 gm->header.size = htons(sizeof (struct GetMessage) - sizeof(GNUNET_HashCode));
773 GNUNET_assert (h->response_proc == NULL);
774 transmit_for_result (h, iter, iter_cls, timeout);
779 * Get a random value from the datastore.
781 * @param h handle to the datastore
782 * @param iter function to call on a random value; it
783 * will be called exactly once; if no values
784 * are available, the value will be NULL.
785 * @param iter_cls closure for iter
786 * @param timeout how long to wait at most for a response
/*
 * Builds a header-only GET_RANDOM request in the buffer behind the handle
 * and sends it via transmit_for_result.
 * The assert on h->response_proc repeats the one inside
 * transmit_for_result.
 */
789 GNUNET_DATASTORE_get_random (struct GNUNET_DATASTORE_Handle *h,
790 GNUNET_DATASTORE_Iterator iter, void *iter_cls,
791 struct GNUNET_TIME_Relative timeout)
793 struct GNUNET_MessageHeader *m;
795 m = (struct GNUNET_MessageHeader*) &h[1];
796 m->type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_GET_RANDOM);
797 m->size = htons(sizeof (struct GNUNET_MessageHeader));
798 GNUNET_assert (h->response_proc == NULL);
799 transmit_for_result (h, iter, iter_cls, timeout);
804 * Explicitly remove some content from the database.
806 * @param h handle to the datastore
807 * @param key key for the value
808 * @param size number of bytes in data
809 * @param data content stored
810 * @param cont continuation to call when done
811 * @param cont_cls closure for cont
812 * @param timeout how long to wait at most for a response
/*
 * Builds a REMOVE request (DataMessage header followed by the payload to
 * match) in the buffer behind the handle and sends it via
 * transmit_for_status. Priority, anonymity, uid and expiration are sent
 * as zero values.
 * NOTE(review): the assert admits msize == GNUNET_SERVER_MAX_MESSAGE_SIZE,
 * yet the size is then written with htons(), i.e. into a 16-bit field. If
 * that constant is 65536 (as the comment on the handle suggests), a
 * message of exactly that size would pass the assert but not fit the
 * field -- confirm whether the comparison should be strict.
 * NOTE(review): the cont_cls parameter, the declaration of msize and the
 * statements on the lines skipped between the visible assignments are
 * missing from this extract.
 */
815 GNUNET_DATASTORE_remove (struct GNUNET_DATASTORE_Handle *h,
816 const GNUNET_HashCode * key,
817 uint32_t size, const void *data,
818 GNUNET_DATASTORE_ContinuationWithStatus cont,
820 struct GNUNET_TIME_Relative timeout)
822 struct DataMessage *dm;
826 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
827 "Asked to remove %u bytes of data under key `%s'\n",
831 msize = sizeof(struct DataMessage) + size;
832 GNUNET_assert (msize <= GNUNET_SERVER_MAX_MESSAGE_SIZE);
833 dm = (struct DataMessage*) &h[1];
834 dm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE);
835 dm->header.size = htons(msize);
837 dm->size = htonl(size);
839 dm->priority = htonl(0);
840 dm->anonymity = htonl(0);
841 dm->uid = GNUNET_htonll(0);
842 dm->expiration = GNUNET_TIME_absolute_hton(GNUNET_TIME_UNIT_ZERO_ABS);
/* The payload is copied directly behind the fixed DataMessage header. */
844 memcpy (&dm[1], data, size);
845 transmit_for_status (h, cont, cont_cls, timeout);
849 /* end of datastore_api.c */