#include "gnunet_datastore_service.h"
#include "datastore.h"
-
/**
- *
+ * Handle to the datastore service. Followed
+ * by 65536 bytes used for storing messages.
*/
-struct MessageQueue
+struct GNUNET_DATASTORE_Handle
{
- /**
- * This is a linked list.
- */
- struct MessageQueue *next;
/**
- * Message we will transmit (allocated at the end
- * of this struct; do not free!).
+ * Our configuration.
*/
- struct GNUNET_MessageHeader *msg;
+ struct GNUNET_CONFIGURATION_Handle *cfg;
/**
- * Function to call on the response.
+ * Our scheduler.
*/
- GNUNET_CLIENT_MessageHandler response_processor;
-
- /**
- * Closure for response_processor.
- */
- void *response_processor_cls;
-
-};
-
-
-/**
- * Handle to the datastore service.
- */
-struct GNUNET_DATASTORE_Handle
-{
+ struct GNUNET_SCHEDULER_Handle *sched;
/**
* Current connection to the datastore service.
*/
struct GNUNET_CLIENT_Connection *client;
-
- /**
- * Linked list of messages waiting to be transmitted.
- */
- struct MessageQueue *messages;
/**
- * Current response processor (NULL if we are not waiting
- * for a response). Largely used only to know if we have
- * a 'receive' request pending.
+ * Current response processor (NULL if we are not waiting for a
+ * response). The specific type depends on the kind of message we
+ * just transmitted.
*/
- GNUNET_CLIENT_MessageHandler response_proc;
+ void *response_proc;
/**
* Closure for response_proc.
*/
void *response_proc_cls;
+ /**
+ * Timeout for the current operation.
+ */
+ struct GNUNET_TIME_Absolute timeout;
+
+ /**
+ * Number of bytes in the message following
+ * this struct, 0 if we have no request pending.
+ */
+ size_t message_size;
+
};
c = GNUNET_CLIENT_connect (sched, "datastore", cfg);
if (c == NULL)
return NULL; /* oops */
- h = GNUNET_malloc (sizeof(struct GNUNET_DATASTORE_Handle));
+ h = GNUNET_malloc (sizeof(struct GNUNET_DATASTORE_Handle) +
+ GNUNET_SERVER_MAX_MESSAGE_SIZE);
h->client = c;
+ h->cfg = cfg;
+ h->sched = sched;
return h;
}
{
struct GNUNET_DATASTORE_Handle *h = cls;
struct GNUNET_MessageHeader *hdr;
-
+
if (buf == NULL)
{
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
void GNUNET_DATASTORE_disconnect (struct GNUNET_DATASTORE_Handle *h,
int drop)
{
- if (GNUNET_YES == drop)
+ GNUNET_assert (0 == h->message_size);
+ GNUNET_assert (NULL == h->response_proc);
+ if ( (GNUNET_YES == drop) &&
+ (h->client != NULL) )
{
if (NULL !=
GNUNET_CLIENT_notify_transmit_ready (h->client,
return;
GNUNET_break (0);
}
- GNUNET_CLIENT_disconnect (h->client);
+ if (h->client != NULL)
+ GNUNET_CLIENT_disconnect (h->client);
GNUNET_free (h);
}
/**
- * The closure is followed by the data message.
+ * Type of a function to call when we receive a message
+ * from the service. This specific function is used
+ * to handle messages of type "struct StatusMessage".
+ *
+ * @param cls closure
+ * @param msg message received, NULL on timeout or fatal error
*/
-struct PutClosure
+static void
+with_status_response_handler (void *cls,
+ const struct
+ GNUNET_MessageHeader * msg)
{
- struct GNUNET_DATASTORE_Handle *h;
- GNUNET_DATASTORE_ContinuationWithStatus cont;
- void *cont_cls;
-};
+ struct GNUNET_DATASTORE_Handle *h = cls;
+ GNUNET_DATASTORE_ContinuationWithStatus cont = h->response_proc;
+ const struct StatusMessage *sm;
+ const char *emsg;
+ int status;
+
+ if (msg == NULL)
+ {
+ h->response_proc = NULL;
+ GNUNET_CLIENT_disconnect (h->client);
+ h->client = GNUNET_CLIENT_connect (h->sched, "datastore", h->cfg);
+ cont (h->response_proc_cls,
+ GNUNET_SYSERR,
+ _("Timeout trying to read response from datastore service\n"));
+ return;
+ }
+ if ( (ntohs(msg->size) < sizeof(struct StatusMessage)) ||
+ (ntohs(msg->type) != GNUNET_MESSAGE_TYPE_DATASTORE_STATUS) )
+ {
+ GNUNET_break (0);
+ GNUNET_CLIENT_disconnect (h->client);
+ h->client = GNUNET_CLIENT_connect (h->sched, "datastore", h->cfg);
+ cont (h->response_proc_cls,
+ GNUNET_SYSERR,
+ _("Error reading response from datastore service\n"));
+ return;
+ }
+ sm = (const struct StatusMessage*) msg;
+ status = ntohl(sm->status);
+ emsg = NULL;
+ if (status == GNUNET_SYSERR)
+ {
+ emsg = (const char*) &sm[1];
+ if ( (ntohs(msg->size) == sizeof(struct StatusMessage)) ||
+ (emsg[ntohs(msg->size) - sizeof(struct StatusMessage) - 1] != '\0') )
+ {
+ GNUNET_break (0);
+ emsg = _("Invalid error message received from datastore service");
+ }
+ }
+ h->response_proc = NULL;
+ cont (h->response_proc_cls,
+ status,
+ emsg);
+}
/**
- * Transmit PUT message to Database service.
+ * Transmit message to datastore service and then
+ * read a status message.
+ *
+ * @param cls closure with handle to datastore
+ * @param size number of bytes we can transmit at most
+ * @param buf where to write transmission, NULL on
+ * timeout
+ * @return number of bytes copied to buf
*/
static size_t
-transmit_put (void *cls,
- size_t size, void *buf)
+transmit_get_status (void *cls,
+ size_t size,
+ void *buf)
{
- struct PutClosure *pc = cls;
- struct DataMessage *dm;
+ struct GNUNET_DATASTORE_Handle *h = cls;
+ GNUNET_DATASTORE_ContinuationWithStatus cont = h->response_proc;
uint16_t msize;
+ h->response_proc = NULL;
if (buf == NULL)
{
- pc->cont (pc->cont_cls, GNUNET_SYSERR,
- gettext_noop ("Error transmitting `PUT' message to datastore service.\n"));
- GNUNET_free (pc);
+ h->message_size = 0;
+ cont (h->response_proc_cls,
+ GNUNET_SYSERR,
+ gettext_noop ("Error transmitting message to datastore service.\n"));
return 0;
}
- dm = (struct DataMessage*) &pc[1];
- msize = ntohs(dm->size);
- GNUNET_assert (msize <= size);
- memcpy (buf, dm, msize);
- /* FIXME: wait for response from datastore, then
- call our continuation! */
+ GNUNET_assert (h->message_size <= size);
+ memcpy (buf, &h[1], h->message_size);
+ h->message_size = 0;
+ GNUNET_CLIENT_receive (h->client,
+ &with_status_response_handler,
+ h,
+ GNUNET_TIME_absolute_get_remaining (h->timeout));
return msize;
}
+/**
+ * Helper function that will initiate the
+ * transmission of a message to the datastore
+ * service. The message must already be prepared
+ * and stored in the buffer at the end of the
+ * handle. The message must be of a type that
+ * expects a "StatusMessage" in response.
+ *
+ * @param h handle to the service with prepared message
+ * @param cont function to call with result
+ * @param cont_cls closure
+ * @param timeout timeout for the operation
+ */
+static void
+transmit_for_status (struct GNUNET_DATASTORE_Handle *h,
+ GNUNET_DATASTORE_ContinuationWithStatus cont,
+ void *cont_cls,
+ struct GNUNET_TIME_Relative timeout)
+{
+ const struct GNUNET_MessageHeader *hdr;
+ uint16_t msize;
+
+ hdr = (const struct GNUNET_MessageHeader*) &h[1];
+ msize = ntohs(hdr->size);
+ GNUNET_assert (h->response_proc == NULL);
+ h->response_proc = cont;
+ h->response_proc_cls = cont_cls;
+ h->timeout = GNUNET_TIME_relative_to_absolute (timeout);
+ h->message_size = msize;
+ if (NULL == GNUNET_CLIENT_notify_transmit_ready (h->client,
+ msize,
+ timeout,
+ &transmit_get_status,
+ h))
+ {
+ GNUNET_break (0);
+ h->response_proc = NULL;
+ h->message_size = 0;
+ cont (cont_cls,
+ GNUNET_SYSERR,
+ gettext_noop ("Not ready to transmit request to datastore service"));
+ }
+}
+
+
/**
* Store an item in the datastore. If the item is already present,
* the priorities are summed up and the higher expiration time and
GNUNET_DATASTORE_ContinuationWithStatus cont,
void *cont_cls)
{
- struct PutClosure *pc;
struct DataMessage *dm;
+ size_t msize;
- pc = GNUNET_malloc (sizeof(struct PutClosure) +
- sizeof(struct DataMessage) +
- size);
- dm = (struct DataMessage*) &pc[1];
- pc->h = h;
- pc->cont = cont;
- pc->cont_cls = cont_cls;
+ msize = sizeof(struct DataMessage) + size;
+ GNUNET_assert (msize <= GNUNET_SERVER_MAX_MESSAGE_SIZE);
+ dm = (struct DataMessage*) &h[1];
dm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_PUT);
- dm->header.size = htons(sizeof(struct DataMessage) + size);
+ dm->header.size = htons(msize);
dm->rid = htonl(rid);
dm->size = htonl(size);
dm->type = htonl(type);
dm->expiration = GNUNET_TIME_absolute_hton(expiration);
dm->key = *key;
memcpy (&dm[1], data, size);
- if (NULL == GNUNET_CLIENT_notify_transmit_ready (h->client,
- sizeof(struct DataMessage) + size,
- timeout,
- &transmit_put,
- pc))
- {
- GNUNET_break (0);
- cont (cont_cls, GNUNET_SYSERR,
- gettext_noop ("Not ready to transmit request to datastore service"));
- }
+ transmit_for_status (h, cont, cont_cls, timeout);
}
* @param cont continuation to call when done; "success" will be set to
* a positive reservation value if space could be reserved.
* @param cont_cls closure for cont
+ * @param timeout how long to wait at most for a response
*/
void
GNUNET_DATASTORE_reserve (struct GNUNET_DATASTORE_Handle *h,
uint64_t amount,
uint64_t entries,
GNUNET_DATASTORE_ContinuationWithStatus cont,
- void *cont_cls)
+ void *cont_cls,
+ struct GNUNET_TIME_Relative timeout)
{
- cont (cont_cls, GNUNET_SYSERR, "not implemented");
+ struct ReserveMessage *rm;
+
+ rm = (struct ReserveMessage*) &h[1];
+ rm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_RESERVE);
+ rm->header.size = htons(sizeof (struct ReserveMessage));
+ rm->reserved = htonl(0);
+ rm->amount = htonl(amount);
+ rm->entries = htonl(entries);
+ transmit_for_status (h, cont, cont_cls, timeout);
}
* from the "reserve" function).
* @param cont continuation to call when done
* @param cont_cls closure for cont
+ * @param timeout how long to wait at most for a response
*/
void
GNUNET_DATASTORE_release_reserve (struct GNUNET_DATASTORE_Handle *h,
int rid,
GNUNET_DATASTORE_ContinuationWithStatus cont,
- void *cont_cls)
+ void *cont_cls,
+ struct GNUNET_TIME_Relative timeout)
{
- cont (cont_cls, GNUNET_OK, NULL);
+ struct ReleaseReserveMessage *rrm;
+
+ rrm = (struct ReleaseReserveMessage*) &h[1];
+ rrm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_RELEASE_RESERVE);
+ rrm->header.size = htons(sizeof (struct ReleaseReserveMessage));
+ rrm->rid = htonl(rid);
+ transmit_for_status (h, cont, cont_cls, timeout);
}
* @param expiration new expiration value should be MAX of existing and this argument
* @param cont continuation to call when done
* @param cont_cls closure for cont
+ * @param timeout how long to wait at most for a response
*/
void
GNUNET_DATASTORE_update (struct GNUNET_DATASTORE_Handle *h,
uint32_t priority,
struct GNUNET_TIME_Absolute expiration,
GNUNET_DATASTORE_ContinuationWithStatus cont,
- void *cont_cls)
+ void *cont_cls,
+ struct GNUNET_TIME_Relative timeout)
{
- cont (cont_cls, GNUNET_SYSERR, "not implemented");
+ struct UpdateMessage *um;
+
+ um = (struct UpdateMessage*) &h[1];
+ um->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_UPDATE);
+ um->header.size = htons(sizeof (struct UpdateMessage));
+ um->priority = htonl(priority);
+ um->expiration = GNUNET_TIME_absolute_hton(expiration);
+ um->uid = GNUNET_htonll(uid);
+ transmit_for_status (h, cont, cont_cls, timeout);
}
* @param iter function to call on each matching value;
* will be called once with a NULL value at the end
* @param iter_cls closure for iter
+ * @param timeout how long to wait at most for a response
*/
void
GNUNET_DATASTORE_get (struct GNUNET_DATASTORE_Handle *h,
const GNUNET_HashCode * key,
uint32_t type,
- GNUNET_DATASTORE_Iterator iter, void *iter_cls)
+ GNUNET_DATASTORE_Iterator iter, void *iter_cls,
+ struct GNUNET_TIME_Relative timeout)
{
static struct GNUNET_TIME_Absolute zero;
iter (iter_cls,
* will be called exactly once; if no values
* are available, the value will be NULL.
* @param iter_cls closure for iter
+ * @param timeout how long to wait at most for a response
*/
void
GNUNET_DATASTORE_get_random (struct GNUNET_DATASTORE_Handle *h,
- GNUNET_DATASTORE_Iterator iter, void *iter_cls)
+ GNUNET_DATASTORE_Iterator iter, void *iter_cls,
+ struct GNUNET_TIME_Relative timeout)
{
static struct GNUNET_TIME_Absolute zero;
* @param data content stored
* @param cont continuation to call when done
* @param cont_cls closure for cont
+ * @param timeout how long to wait at most for a response
*/
void
GNUNET_DATASTORE_remove (struct GNUNET_DATASTORE_Handle *h,
const GNUNET_HashCode * key,
uint32_t size, const void *data,
GNUNET_DATASTORE_ContinuationWithStatus cont,
- void *cont_cls)
+ void *cont_cls,
+ struct GNUNET_TIME_Relative timeout)
{
- cont (cont_cls, GNUNET_SYSERR, "not implemented");
+ struct DataMessage *dm;
+ size_t msize;
+
+ msize = sizeof(struct DataMessage) + size;
+ GNUNET_assert (msize <= GNUNET_SERVER_MAX_MESSAGE_SIZE);
+ dm = (struct DataMessage*) &h[1];
+ dm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE);
+ dm->header.size = htons(msize);
+ dm->rid = htonl(0);
+ dm->size = htonl(size);
+ dm->type = htonl(0);
+ dm->priority = htonl(0);
+ dm->anonymity = htonl(0);
+ dm->uid = GNUNET_htonll(0);
+ dm->expiration.value = 0;
+ dm->key = *key;
+ memcpy (&dm[1], data, size);
+ transmit_for_status (h, cont, cont_cls, timeout);
}
* @param cont continuation to call when done; "success" will be set to
* a positive reservation value if space could be reserved.
* @param cont_cls closure for cont
+ * @param timeout how long to wait at most for a response
*/
void
GNUNET_DATASTORE_reserve (struct GNUNET_DATASTORE_Handle *h,
uint64_t amount,
uint64_t entries,
GNUNET_DATASTORE_ContinuationWithStatus cont,
- void *cont_cls);
+ void *cont_cls,
+ struct GNUNET_TIME_Relative timeout);
/**
* from the "reserve" function).
* @param cont continuation to call when done
* @param cont_cls closure for cont
+ * @param timeout how long to wait at most for a response
*/
void
GNUNET_DATASTORE_release_reserve (struct GNUNET_DATASTORE_Handle *h,
int rid,
GNUNET_DATASTORE_ContinuationWithStatus cont,
- void *cont_cls);
+ void *cont_cls,
+ struct GNUNET_TIME_Relative timeout);
/**
* @param expiration new expiration value should be MAX of existing and this argument
* @param cont continuation to call when done
* @param cont_cls closure for cont
+ * @param timeout how long to wait at most for a response
*/
void
GNUNET_DATASTORE_update (struct GNUNET_DATASTORE_Handle *h,
uint32_t priority,
struct GNUNET_TIME_Absolute expiration,
GNUNET_DATASTORE_ContinuationWithStatus cont,
- void *cont_cls);
+ void *cont_cls,
+ struct GNUNET_TIME_Relative timeout);
/**
* @param iter function to call on each matching value;
* will be called once with a NULL value at the end
* @param iter_cls closure for iter
+ * @param timeout how long to wait at most for a response
*/
void
GNUNET_DATASTORE_get (struct GNUNET_DATASTORE_Handle *h,
const GNUNET_HashCode * key,
uint32_t type,
- GNUNET_DATASTORE_Iterator iter, void *iter_cls);
+ GNUNET_DATASTORE_Iterator iter, void *iter_cls,
+ struct GNUNET_TIME_Relative timeout);
/**
* will be called once with a value (if available)
* and always once with a value of NULL.
* @param iter_cls closure for iter
+ * @param timeout how long to wait at most for a response
*/
void
GNUNET_DATASTORE_get_random (struct GNUNET_DATASTORE_Handle *h,
- GNUNET_DATASTORE_Iterator iter, void *iter_cls);
+ GNUNET_DATASTORE_Iterator iter, void *iter_cls,
+ struct GNUNET_TIME_Relative timeout);
/**
* @param data content stored
* @param cont continuation to call when done
* @param cont_cls closure for cont
+ * @param timeout how long to wait at most for a response
*/
void
GNUNET_DATASTORE_remove (struct GNUNET_DATASTORE_Handle *h,
const GNUNET_HashCode *key,
uint32_t size, const void *data,
GNUNET_DATASTORE_ContinuationWithStatus cont,
- void *cont_cls);
+ void *cont_cls,
+ struct GNUNET_TIME_Relative timeout);
#if 0 /* keep Emacsens' auto-indent happy */