/**
* Entry in our priority queue.
*/
-struct QueueEntry
+struct GNUNET_DATASTORE_QueueEntry
{
/**
* This is a linked list.
*/
- struct QueueEntry *next;
+ struct GNUNET_DATASTORE_QueueEntry *next;
/**
* This is a linked list.
*/
- struct QueueEntry *prev;
+ struct GNUNET_DATASTORE_QueueEntry *prev;
/**
* Handle to the master context.
/**
* Function to call after transmission of the request.
*/
- GNUNET_DATASTORE_ContinuationWithStatus contX;
+ GNUNET_DATASTORE_ContinuationWithStatus cont;
/**
* Closure for 'cont'.
*/
- void *cont_clsX;
+ void *cont_cls;
/**
* Task for timeout signalling.
/**
* Current head of priority queue.
*/
- struct QueueEntry *queue_head;
+ struct GNUNET_DATASTORE_QueueEntry *queue_head;
/**
* Current tail of priority queue.
*/
- struct QueueEntry *queue_tail;
+ struct GNUNET_DATASTORE_QueueEntry *queue_tail;
/**
* Task for trying to reconnect.
void GNUNET_DATASTORE_disconnect (struct GNUNET_DATASTORE_Handle *h,
int drop)
{
- struct QueueEntry *qe;
+ struct GNUNET_DATASTORE_QueueEntry *qe;
if (h->client != NULL)
GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
/**
* A request has timed out (before being transmitted to the service).
*
- * @param cls the 'struct QueueEntry'
+ * @param cls the 'struct GNUNET_DATASTORE_QueueEntry'
* @param tc scheduler context
*/
static void
timeout_queue_entry (void *cls,
const struct GNUNET_SCHEDULER_TaskContext *tc)
{
- struct QueueEntry *qe = cls;
+ struct GNUNET_DATASTORE_QueueEntry *qe = cls;
qe->task = GNUNET_SCHEDULER_NO_TASK;
GNUNET_assert (qe->was_transmitted == GNUNET_NO);
* @param client_ctx client context (NOT a closure for response_proc)
* @return NULL if the queue is full (and this entry was dropped)
*/
-static struct QueueEntry *
+static struct GNUNET_DATASTORE_QueueEntry *
make_queue_entry (struct GNUNET_DATASTORE_Handle *h,
size_t msize,
unsigned int queue_priority,
GNUNET_CLIENT_MessageHandler response_proc,
void *client_ctx)
{
- struct QueueEntry *ret;
- struct QueueEntry *pos;
+ struct GNUNET_DATASTORE_QueueEntry *ret;
+ struct GNUNET_DATASTORE_QueueEntry *pos;
unsigned int c;
c = 0;
(h->queue_head->was_transmitted) )
pos = h->queue_head;
}
- ret = GNUNET_malloc (sizeof (struct QueueEntry) + msize);
+ ret = GNUNET_malloc (sizeof (struct GNUNET_DATASTORE_QueueEntry) + msize);
GNUNET_CONTAINER_DLL_insert_after (h->queue_head,
h->queue_tail,
pos,
void *buf)
{
struct GNUNET_DATASTORE_Handle *h = cls;
- struct QueueEntry *qe;
+ struct GNUNET_DATASTORE_QueueEntry *qe;
size_t msize;
h->th = NULL;
static void
process_queue (struct GNUNET_DATASTORE_Handle *h)
{
- struct QueueEntry *qe;
+ struct GNUNET_DATASTORE_QueueEntry *qe;
if (NULL == (qe = h->queue_head))
return; /* no entry in queue */
const struct
GNUNET_MessageHeader * msg)
{
- struct QueueEntry *qe = cls;
+ struct GNUNET_DATASTORE_QueueEntry *qe = cls;
struct GNUNET_DATASTORE_Handle *h = qe->h;
struct StatusContext *rc = qe->client_ctx;
const struct StatusMessage *sm;
* @param timeout timeout for the operation
* @param cont continuation to call when done
* @param cont_cls closure for cont
+ * @return NULL if the entry was not queued, otherwise a handle that can be used to
+ * cancel; note that even if NULL is returned, the callback will be invoked
+ * (or rather, will already have been invoked)
*/
-void
+struct GNUNET_DATASTORE_QueueEntry *
GNUNET_DATASTORE_put (struct GNUNET_DATASTORE_Handle *h,
int rid,
const GNUNET_HashCode * key,
void *cont_cls)
{
struct StatusContext *scont;
- struct QueueEntry *qe;
+ struct GNUNET_DATASTORE_QueueEntry *qe;
struct DataMessage *dm;
size_t msize;
queue_priority, max_queue_size, timeout,
&process_status_message, scont);
if (qe == NULL)
- return;
+ return NULL;
dm = (struct DataMessage* ) &qe[1];
dm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_PUT);
dm->header.size = htons(msize);
dm->key = *key;
memcpy (&dm[1], data, size);
process_queue (h);
+ return qe;
}
* @param cont continuation to call when done; "success" will be set to
* a positive reservation value if space could be reserved.
* @param cont_cls closure for cont
+ * @return NULL if the entry was not queued, otherwise a handle that can be used to
+ * cancel; note that even if NULL is returned, the callback will be invoked
+ * (or rather, will already have been invoked)
*/
-void
+struct GNUNET_DATASTORE_QueueEntry *
GNUNET_DATASTORE_reserve (struct GNUNET_DATASTORE_Handle *h,
uint64_t amount,
uint32_t entries,
GNUNET_DATASTORE_ContinuationWithStatus cont,
void *cont_cls)
{
- struct QueueEntry *qe;
+ struct GNUNET_DATASTORE_QueueEntry *qe;
struct ReserveMessage *rm;
struct StatusContext *scont;
queue_priority, max_queue_size, timeout,
&process_status_message, scont);
if (qe == NULL)
- return;
+ return NULL;
rm = (struct ReserveMessage*) &qe[1];
rm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_RESERVE);
rm->header.size = htons(sizeof (struct ReserveMessage));
rm->entries = htonl(entries);
rm->amount = GNUNET_htonll(amount);
process_queue (h);
+ return qe;
}
* @param timeout how long to wait at most for a response
* @param cont continuation to call when done
* @param cont_cls closure for cont
+ * @return NULL if the entry was not queued, otherwise a handle that can be used to
+ * cancel; note that even if NULL is returned, the callback will be invoked
+ * (or rather, will already have been invoked)
*/
-void
+struct GNUNET_DATASTORE_QueueEntry *
GNUNET_DATASTORE_release_reserve (struct GNUNET_DATASTORE_Handle *h,
int rid,
unsigned int queue_priority,
GNUNET_DATASTORE_ContinuationWithStatus cont,
void *cont_cls)
{
- struct QueueEntry *qe;
+ struct GNUNET_DATASTORE_QueueEntry *qe;
struct ReleaseReserveMessage *rrm;
struct StatusContext *scont;
queue_priority, max_queue_size, timeout,
&process_status_message, scont);
if (qe == NULL)
- return;
+ return NULL;
rrm = (struct ReleaseReserveMessage*) &qe[1];
rrm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_RELEASE_RESERVE);
rrm->header.size = htons(sizeof (struct ReleaseReserveMessage));
rrm->rid = htonl(rid);
process_queue (h);
+ return qe;
}
* @param timeout how long to wait at most for a response
* @param cont continuation to call when done
* @param cont_cls closure for cont
+ * @return NULL if the entry was not queued, otherwise a handle that can be used to
+ * cancel; note that even if NULL is returned, the callback will be invoked
+ * (or rather, will already have been invoked)
*/
-void
+struct GNUNET_DATASTORE_QueueEntry *
GNUNET_DATASTORE_update (struct GNUNET_DATASTORE_Handle *h,
unsigned long long uid,
uint32_t priority,
GNUNET_DATASTORE_ContinuationWithStatus cont,
void *cont_cls)
{
- struct QueueEntry *qe;
+ struct GNUNET_DATASTORE_QueueEntry *qe;
struct UpdateMessage *um;
struct StatusContext *scont;
queue_priority, max_queue_size, timeout,
&process_status_message, scont);
if (qe == NULL)
- return;
+ return NULL;
um = (struct UpdateMessage*) &qe[1];
um->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_UPDATE);
um->header.size = htons(sizeof (struct UpdateMessage));
um->expiration = GNUNET_TIME_absolute_hton(expiration);
um->uid = GNUNET_htonll(uid);
process_queue (h);
+ return qe;
}
* @param timeout how long to wait at most for a response
* @param cont continuation to call when done
* @param cont_cls closure for cont
+ * @return NULL if the entry was not queued, otherwise a handle that can be used to
+ * cancel; note that even if NULL is returned, the callback will be invoked
+ * (or rather, will already have been invoked)
*/
-void
+struct GNUNET_DATASTORE_QueueEntry *
GNUNET_DATASTORE_remove (struct GNUNET_DATASTORE_Handle *h,
const GNUNET_HashCode *key,
uint32_t size,
GNUNET_DATASTORE_ContinuationWithStatus cont,
void *cont_cls)
{
- struct QueueEntry *qe;
+ struct GNUNET_DATASTORE_QueueEntry *qe;
struct DataMessage *dm;
size_t msize;
struct StatusContext *scont;
queue_priority, max_queue_size, timeout,
&process_status_message, scont);
if (qe == NULL)
- return;
+ return NULL;
dm = (struct DataMessage*) &qe[1];
dm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE);
dm->header.size = htons(msize);
dm->key = *key;
memcpy (&dm[1], data, size);
process_queue (h);
+ return qe;
}
process_result_message (void *cls,
const struct GNUNET_MessageHeader * msg)
{
- struct QueueEntry *qe = cls;
+ struct GNUNET_DATASTORE_QueueEntry *qe = cls;
struct GNUNET_DATASTORE_Handle *h = qe->h;
struct ResultContext *rc = qe->client_ctx;
const struct DataMessage *dm;
* will be called once with a value (if available)
* and always once with a value of NULL.
* @param iter_cls closure for iter
+ * @return NULL if the entry was not queued, otherwise a handle that can be used to
+ * cancel; note that even if NULL is returned, the callback will be invoked
+ * (or rather, will already have been invoked)
*/
-void
+struct GNUNET_DATASTORE_QueueEntry *
GNUNET_DATASTORE_get_random (struct GNUNET_DATASTORE_Handle *h,
unsigned int queue_priority,
unsigned int max_queue_size,
GNUNET_DATASTORE_Iterator iter,
void *iter_cls)
{
- struct QueueEntry *qe;
+ struct GNUNET_DATASTORE_QueueEntry *qe;
struct GNUNET_MessageHeader *m;
struct ResultContext *rcont;
queue_priority, max_queue_size, timeout,
&process_result_message, rcont);
if (qe == NULL)
- return;
+ return NULL;
m = (struct GNUNET_MessageHeader*) &qe[1];
m->type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_GET_RANDOM);
m->size = htons(sizeof (struct GNUNET_MessageHeader));
process_queue (h);
+ return qe;
}
* @param iter function to call on each matching value;
* will be called once with a NULL value at the end
* @param iter_cls closure for iter
+ * @return NULL if the entry was not queued, otherwise a handle that can be used to
+ * cancel; note that even if NULL is returned, the callback will be invoked
+ * (or rather, will already have been invoked)
*/
-void
+struct GNUNET_DATASTORE_QueueEntry *
GNUNET_DATASTORE_get (struct GNUNET_DATASTORE_Handle *h,
const GNUNET_HashCode * key,
enum GNUNET_BLOCK_Type type,
GNUNET_DATASTORE_Iterator iter,
void *iter_cls)
{
- struct QueueEntry *qe;
+ struct GNUNET_DATASTORE_QueueEntry *qe;
struct GetMessage *gm;
struct ResultContext *rcont;
queue_priority, max_queue_size, timeout,
&process_result_message, rcont);
if (qe == NULL)
- return;
+ return NULL;
gm = (struct GetMessage*) &qe[1];
gm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_GET);
gm->type = htonl(type);
gm->header.size = htons(sizeof (struct GetMessage) - sizeof(GNUNET_HashCode));
}
process_queue (h);
+ return qe;
}
GNUNET_DATASTORE_get_next (struct GNUNET_DATASTORE_Handle *h,
int more)
{
- struct QueueEntry *qe = h->queue_head;
+ struct GNUNET_DATASTORE_QueueEntry *qe = h->queue_head;
struct ResultContext *rc = qe->client_ctx;
GNUNET_assert (NULL != qe);
}
+/**
+ * Cancel a datastore operation. The final callback from the
+ * operation must not have been done yet.
+ *
+ * @param qe operation to cancel
+ */
+void
+GNUNET_DATASTORE_cancel (struct GNUNET_DATASTORE_QueueEntry *qe)
+{
+ struct GNUNET_DATASTORE_Handle *h;
+ int reconnect;
+
+ h = qe->h;
+ reconnect = qe->was_transmitted;
+ GNUNET_CONTAINER_DLL_remove (h->queue_head,
+ h->queue_tail,
+ qe);
+ if (qe->task != GNUNET_SCHEDULER_NO_TASK)
+ GNUNET_SCHEDULER_cancel (h->sched,
+ qe->task);
+ GNUNET_free (qe);
+ if (reconnect)
+ {
+ h->retry_time = GNUNET_TIME_UNIT_ZERO;
+ do_disconnect (h);
+ }
+}
+
+
/* end of datastore_api.c */
* @param cont continuation to call when done; "success" will be set to
* a positive reservation value if space could be reserved.
* @param cont_cls closure for cont
+ * @return NULL if the entry was not queued, otherwise a handle that can be used to
+ * cancel; note that even if NULL is returned, the callback will be invoked
+ * (or rather, will already have been invoked)
*/
-void
+struct GNUNET_DATASTORE_QueueEntry *
GNUNET_DATASTORE_reserve (struct GNUNET_DATASTORE_Handle *h,
uint64_t amount,
uint32_t entries,
* @param timeout timeout for the operation
* @param cont continuation to call when done
* @param cont_cls closure for cont
+ * @return NULL if the entry was not queued, otherwise a handle that can be used to
+ * cancel; note that even if NULL is returned, the callback will be invoked
+ * (or rather, will already have been invoked)
*/
-void
+struct GNUNET_DATASTORE_QueueEntry *
GNUNET_DATASTORE_put (struct GNUNET_DATASTORE_Handle *h,
int rid,
const GNUNET_HashCode * key,
* @param timeout how long to wait at most for a response
* @param cont continuation to call when done
* @param cont_cls closure for cont
+ * @return NULL if the entry was not queued, otherwise a handle that can be used to
+ * cancel; note that even if NULL is returned, the callback will be invoked
+ * (or rather, will already have been invoked)
*/
-void
+struct GNUNET_DATASTORE_QueueEntry *
GNUNET_DATASTORE_release_reserve (struct GNUNET_DATASTORE_Handle *h,
int rid,
unsigned int queue_priority,
* @param timeout how long to wait at most for a response
* @param cont continuation to call when done
* @param cont_cls closure for cont
+ * @return NULL if the entry was not queued, otherwise a handle that can be used to
+ * cancel; note that even if NULL is returned, the callback will be invoked
+ * (or rather, will already have been invoked)
*/
-void
+struct GNUNET_DATASTORE_QueueEntry *
GNUNET_DATASTORE_update (struct GNUNET_DATASTORE_Handle *h,
unsigned long long uid,
uint32_t priority,
* @param timeout how long to wait at most for a response
* @param cont continuation to call when done
* @param cont_cls closure for cont
+ * @return NULL if the entry was not queued, otherwise a handle that can be used to
+ * cancel; note that even if NULL is returned, the callback will be invoked
+ * (or rather, will already have been invoked)
*/
-void
+struct GNUNET_DATASTORE_QueueEntry *
GNUNET_DATASTORE_remove (struct GNUNET_DATASTORE_Handle *h,
const GNUNET_HashCode *key,
uint32_t size,
* @param iter function to call on each matching value;
* will be called once with a NULL value at the end
* @param iter_cls closure for iter
+ * @return NULL if the entry was not queued, otherwise a handle that can be used to
+ * cancel; note that even if NULL is returned, the callback will be invoked
+ * (or rather, will already have been invoked)
*/
-void
+struct GNUNET_DATASTORE_QueueEntry *
GNUNET_DATASTORE_get (struct GNUNET_DATASTORE_Handle *h,
const GNUNET_HashCode * key,
enum GNUNET_BLOCK_Type type,
* will be called once with a value (if available)
* and always once with a value of NULL.
* @param iter_cls closure for iter
+ * @return NULL if the entry was not queued, otherwise a handle that can be used to
+ * cancel; note that even if NULL is returned, the callback will be invoked
+ * (or rather, will already have been invoked)
*/
-void
+struct GNUNET_DATASTORE_QueueEntry *
GNUNET_DATASTORE_get_random (struct GNUNET_DATASTORE_Handle *h,
unsigned int queue_priority,
unsigned int max_queue_size,
GNUNET_DATASTORE_Iterator iter,
void *iter_cls);
+/**
+ * Cancel a datastore operation. The final callback from the
+ * operation must not have been done yet.
+ *
+ * @param qe operation to cancel
+ */
+void
+GNUNET_DATASTORE_cancel (struct GNUNET_DATASTORE_QueueEntry *qe);
#if 0 /* keep Emacsens' auto-indent happy */