*/
GNUNET_SCHEDULER_TaskIdentifier task;
+ /**
+ * Is this request at the head of the queue irrespective of its
+ * timeout value?
+ */
+ int forced_head;
+
};
/**
*/
static struct GNUNET_DATASTORE_Handle *dsh;
+/**
+ * Pointer to the currently actively running request,
+ * NULL if none is running.
+ */
+static struct DatastoreRequestQueue *drq_running;
+
/**
- * Run the next DS request in our
- * queue, we're done with the current one.
+ * A datastore request had to be timed out.
+ *
+ * @param cls closure (unused)
+ * @param tc task context, unused
*/
static void
-next_ds_request ()
+timeout_ds_request (void *cls,
+ const struct GNUNET_SCHEDULER_TaskContext *tc)
{
- struct DatastoreRequestQueue *e;
-
- while (NULL != (e = drq_head))
- {
- if (0 != GNUNET_TIME_absolute_get_remaining (e->timeout).value)
- break;
- if (e->task != GNUNET_SCHEDULER_NO_TASK)
- GNUNET_SCHEDULER_cancel (sched, e->task);
- GNUNET_CONTAINER_DLL_remove (drq_head, drq_tail, e);
- e->req (e->req_cls, GNUNET_NO);
- GNUNET_free (e);
- }
- if (e == NULL)
- return;
- if (e->task != GNUNET_SCHEDULER_NO_TASK)
- GNUNET_SCHEDULER_cancel (sched, e->task);
+ struct DatastoreRequestQueue *e = cls;
+
e->task = GNUNET_SCHEDULER_NO_TASK;
- e->req (e->req_cls, GNUNET_YES);
GNUNET_CONTAINER_DLL_remove (drq_head, drq_tail, e);
+ e->req (e->req_cls, GNUNET_NO);
GNUNET_free (e);
}
/**
- * A datastore request had to be timed out.
+ * A datastore request can be run right now. Run it.
*
* @param cls closure (of type "struct DatastoreRequestQueue*")
* @param tc task context, unused
*/
static void
-timeout_ds_request (void *cls,
- const struct GNUNET_SCHEDULER_TaskContext *tc)
+run_next_request (void *cls,
+ const struct GNUNET_SCHEDULER_TaskContext *tc)
{
struct DatastoreRequestQueue *e = cls;
- e->task = GNUNET_SCHEDULER_NO_TASK;
GNUNET_CONTAINER_DLL_remove (drq_head, drq_tail, e);
- e->req (e->req_cls, GNUNET_NO);
- GNUNET_free (e);
+ drq_running = e;
+ e->req (e->req_cls, GNUNET_YES);
}
+/**
+ * Run the next DS request in our queue, we're done with the current
+ * one.
+ */
+static void
+next_ds_request ()
+{
+ struct DatastoreRequestQueue *e;
+
+ GNUNET_free_non_null (drq_running);
+ drq_running = NULL;
+ e = drq_head;
+ if (e == NULL)
+ return;
+ GNUNET_SCHEDULER_cancel (sched, e->task);
+ e->task = GNUNET_SCHEDULER_add_now (sched,
+ &run_next_request,
+ e);
+}
+
+
+/**
+ * Remove a pending request from the request queue.
+ *
+ * @param req request to remove
+ */
static void
dequeue_ds_request (struct DatastoreRequestQueue *req)
{
- if (req->task != GNUNET_SCHEDULER_NO_TASK)
- GNUNET_SCHEDULER_cancel (sched, req->task);
GNUNET_CONTAINER_DLL_remove (drq_head, drq_tail, req);
+ GNUNET_SCHEDULER_cancel (sched, req->task);
GNUNET_free (req);
}
* @param deadline by when the request should run
* @param fun function to call once the request can be run
* @param fun_cls closure for fun
+ * @param immediate should this be queued immediately at
+ * the head of the queue (irrespecitive of the deadline)?
* @return handle that can be used to dequeue the request
*/
static struct DatastoreRequestQueue *
queue_ds_request (struct GNUNET_TIME_Relative deadline,
RequestFunction fun,
- void *fun_cls)
+ void *fun_cls,
+ int immediate)
{
struct DatastoreRequestQueue *e;
struct DatastoreRequestQueue *bef;
- if (drq_head == NULL)
- {
- /* no other requests pending, run immediately */
- // FIXME: should probably use scheduler nevertheless
- // and return non-null!
- fun (fun_cls, GNUNET_OK);
- return NULL;
- }
e = GNUNET_malloc (sizeof (struct DatastoreRequestQueue));
e->timeout = GNUNET_TIME_relative_to_absolute (deadline);
e->req = fun;
e->req_cls = fun_cls;
- if (deadline.value == GNUNET_TIME_UNIT_FOREVER_REL.value)
+ e->forced_head = immediate;
+ if (GNUNET_YES == immediate)
{
/* local request, highest prio, put at head of queue
regardless of deadline */
{
bef = drq_tail;
while ( (NULL != bef) &&
- (e->timeout.value < bef->timeout.value) )
+ (e->timeout.value < bef->timeout.value) &&
+ (GNUNET_YES != e->forced_head) )
bef = bef->prev;
}
GNUNET_CONTAINER_DLL_insert_after (drq_head, drq_tail, bef, e);
- if (deadline.value == GNUNET_TIME_UNIT_FOREVER_REL.value)
- return e;
e->task = GNUNET_SCHEDULER_add_delayed (sched,
deadline,
&timeout_ds_request,
e);
+ if (drq_running == NULL)
+ next_ds_request ();
return e;
}
while (NULL != (drq = drq_head))
{
drq_head = drq->next;
+ GNUNET_SCHEDULER_cancel (sched, drq->task);
drq->req (drq->req_cls, GNUNET_NO);
- dequeue_ds_request (drq);
+ GNUNET_free (drq);
}
drq_tail = NULL;
}
+/**
+ * Closure for 'do_get' and 'get_iterator'.
+ */
struct GetClosure
{
+ /**
+ * Key we are doing the 'get' for.
+ */
GNUNET_HashCode key;
+
+ /**
+ * Datastore entry type we are doing the 'get' for.
+ */
uint32_t type;
+
+ /**
+ * Function to call for each entry.
+ */
GNUNET_DATASTORE_Iterator iter;
+
+ /**
+ * Closure for iter.
+ */
void *iter_cls;
+
+ /**
+ * Timeout for this operation.
+ */
struct GNUNET_TIME_Absolute timeout;
};
+/**
+ * Wrapper for the datastore get operation. Makes sure to trigger the
+ * next datastore operation in the queue once the operation is
+ * complete.
+ *
+ * @param cls our 'struct GetClosure*'
+ */
static void
get_iterator (void *cls,
const GNUNET_HashCode * key,
{
struct GetClosure *gc = cls;
- gc->iter (gc->iter_cls,
- key, size, data, type,
- priority, anonymity, expiration, uid);
+ if (gc->iter == NULL)
+ {
+ /* stop the iteration */
+ GNUNET_DATASTORE_get_next (dsh, GNUNET_NO);
+ }
+ else
+ {
+ gc->iter (gc->iter_cls,
+ key, size, data, type,
+ priority, anonymity, expiration, uid);
+ }
if (key == NULL)
{
next_ds_request ();
}
+/**
+ * We're at the head of the reqeust queue, execute the
+ * get operation (or signal error).
+ *
+ * @param cls the 'struct GetClosure'
+ * @param ok GNUNET_OK if we can run the GET, otherwise
+ * we need to time out
+ */
static void
do_get (void *cls,
int ok)
if (ok != GNUNET_OK)
{
- gc->iter (gc->iter_cls,
- NULL, 0, NULL, 0, 0, 0,
- GNUNET_TIME_UNIT_ZERO_ABS, 0);
+ if (gc->iter != NULL)
+ gc->iter (gc->iter_cls,
+ NULL, 0, NULL, 0, 0, 0,
+ GNUNET_TIME_UNIT_ZERO_ABS, 0);
GNUNET_free (gc);
next_ds_request ();
return;
* will be called once with a NULL value at the end
* @param iter_cls closure for iter
* @param timeout how long to wait at most for a response
+ * @param immediate should this be queued immediately at
+ * the head of the queue (irrespecitive of the timeout)?
*/
struct DatastoreRequestQueue *
GNUNET_FS_drq_get (const GNUNET_HashCode * key,
uint32_t type,
GNUNET_DATASTORE_Iterator iter,
void *iter_cls,
- struct GNUNET_TIME_Relative timeout)
+ struct GNUNET_TIME_Relative timeout,
+ int immediate)
{
struct GetClosure *gc;
gc->timeout = GNUNET_TIME_relative_to_absolute (timeout);
return queue_ds_request (timeout,
&do_get,
- gc);
+ gc,
+ immediate);
}
+/**
+ * Cancel the given operation.
+ *
+ * @param drq the queued operation (must not have been
+ * triggered so far)
+ */
void
GNUNET_FS_drq_get_cancel (struct DatastoreRequestQueue *drq)
{
- dequeue_ds_request (drq);
+ struct GetClosure *gc;
+ if (drq == drq_running)
+ {
+ /* 'DATASTORE_get' has already been started (and this call might
+ actually be be legal since it is possible that the client has
+ not yet received any calls to its the iterator; so we need
+ to cancel somehow; we do this by getting to the 'GetClosure'
+ and zeroing the 'iter' field, which stops the iteration */
+ gc = drq_running->req_cls;
+ gc->iter = NULL;
+ }
+ else
+ {
+ dequeue_ds_request (drq);
+ }
}