From 5e6ff76c94890e47b51f8bd555437c0ee181e851 Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Thu, 13 May 2010 13:43:35 +0000 Subject: [PATCH] add cancellation --- src/datastore/datastore_api.c | 137 +++++++++++++++++-------- src/include/gnunet_datastore_service.h | 43 ++++++-- 2 files changed, 133 insertions(+), 47 deletions(-) diff --git a/src/datastore/datastore_api.c b/src/datastore/datastore_api.c index ef3dccbf6..f3db94469 100644 --- a/src/datastore/datastore_api.c +++ b/src/datastore/datastore_api.c @@ -33,18 +33,18 @@ /** * Entry in our priority queue. */ -struct QueueEntry +struct GNUNET_DATASTORE_QueueEntry { /** * This is a linked list. */ - struct QueueEntry *next; + struct GNUNET_DATASTORE_QueueEntry *next; /** * This is a linked list. */ - struct QueueEntry *prev; + struct GNUNET_DATASTORE_QueueEntry *prev; /** * Handle to the master context. @@ -67,12 +67,12 @@ struct QueueEntry /** * Function to call after transmission of the request. */ - GNUNET_DATASTORE_ContinuationWithStatus contX; + GNUNET_DATASTORE_ContinuationWithStatus cont; /** * Closure for 'cont'. */ - void *cont_clsX; + void *cont_cls; /** * Task for timeout signalling. @@ -141,12 +141,12 @@ struct GNUNET_DATASTORE_Handle /** * Current head of priority queue. */ - struct QueueEntry *queue_head; + struct GNUNET_DATASTORE_QueueEntry *queue_head; /** * Current tail of priority queue. */ - struct QueueEntry *queue_tail; + struct GNUNET_DATASTORE_QueueEntry *queue_tail; /** * Task for trying to reconnect. @@ -240,7 +240,7 @@ transmit_drop (void *cls, void GNUNET_DATASTORE_disconnect (struct GNUNET_DATASTORE_Handle *h, int drop) { - struct QueueEntry *qe; + struct GNUNET_DATASTORE_QueueEntry *qe; if (h->client != NULL) GNUNET_CLIENT_disconnect (h->client, GNUNET_NO); @@ -281,14 +281,14 @@ void GNUNET_DATASTORE_disconnect (struct GNUNET_DATASTORE_Handle *h, /** * A request has timed out (before being transmitted to the service). * - * @param cls the 'struct QueueEntry' + * @param cls the 'struct GNUNET_DATASTORE_QueueEntry' * @param tc scheduler context */ static void timeout_queue_entry (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) { - struct QueueEntry *qe = cls; + struct GNUNET_DATASTORE_QueueEntry *qe = cls; qe->task = GNUNET_SCHEDULER_NO_TASK; GNUNET_assert (qe->was_transmitted == GNUNET_NO); @@ -310,7 +310,7 @@ timeout_queue_entry (void *cls, * @param client_ctx client context (NOT a closure for response_proc) * @return NULL if the queue is full (and this entry was dropped) */ -static struct QueueEntry * +static struct GNUNET_DATASTORE_QueueEntry * make_queue_entry (struct GNUNET_DATASTORE_Handle *h, size_t msize, unsigned int queue_priority, @@ -319,8 +319,8 @@ make_queue_entry (struct GNUNET_DATASTORE_Handle *h, GNUNET_CLIENT_MessageHandler response_proc, void *client_ctx) { - struct QueueEntry *ret; - struct QueueEntry *pos; + struct GNUNET_DATASTORE_QueueEntry *ret; + struct GNUNET_DATASTORE_QueueEntry *pos; unsigned int c; c = 0; @@ -348,7 +348,7 @@ make_queue_entry (struct GNUNET_DATASTORE_Handle *h, (h->queue_head->was_transmitted) ) pos = h->queue_head; } - ret = GNUNET_malloc (sizeof (struct QueueEntry) + msize); + ret = GNUNET_malloc (sizeof (struct GNUNET_DATASTORE_QueueEntry) + msize); GNUNET_CONTAINER_DLL_insert_after (h->queue_head, h->queue_tail, pos, @@ -458,7 +458,7 @@ transmit_request (void *cls, void *buf) { struct GNUNET_DATASTORE_Handle *h = cls; - struct QueueEntry *qe; + struct GNUNET_DATASTORE_QueueEntry *qe; size_t msize; h->th = NULL; @@ -498,7 +498,7 @@ transmit_request (void *cls, static void process_queue (struct GNUNET_DATASTORE_Handle *h) { - struct QueueEntry *qe; + struct GNUNET_DATASTORE_QueueEntry *qe; if (NULL == (qe = h->queue_head)) return; /* no entry in queue */ @@ -563,7 +563,7 @@ process_status_message (void *cls, const struct GNUNET_MessageHeader * msg) { - struct QueueEntry *qe = cls; + struct GNUNET_DATASTORE_QueueEntry *qe = cls; struct GNUNET_DATASTORE_Handle *h = qe->h; struct StatusContext *rc = qe->client_ctx; const struct StatusMessage *sm; @@ -647,8 +647,11 @@ process_status_message (void *cls, * @param timeout timeout for the operation * @param cont continuation to call when done * @param cont_cls closure for cont + * @return NULL if the entry was not queued, otherwise a handle that can be used to + * cancel; note that even if NULL is returned, the callback will be invoked + * (or rather, will already have been invoked) */ -void +struct GNUNET_DATASTORE_QueueEntry * GNUNET_DATASTORE_put (struct GNUNET_DATASTORE_Handle *h, int rid, const GNUNET_HashCode * key, @@ -665,7 +668,7 @@ GNUNET_DATASTORE_put (struct GNUNET_DATASTORE_Handle *h, void *cont_cls) { struct StatusContext *scont; - struct QueueEntry *qe; + struct GNUNET_DATASTORE_QueueEntry *qe; struct DataMessage *dm; size_t msize; @@ -684,7 +687,7 @@ GNUNET_DATASTORE_put (struct GNUNET_DATASTORE_Handle *h, queue_priority, max_queue_size, timeout, &process_status_message, scont); if (qe == NULL) - return; + return NULL; dm = (struct DataMessage* ) &qe[1]; dm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_PUT); dm->header.size = htons(msize); @@ -698,6 +701,7 @@ GNUNET_DATASTORE_put (struct GNUNET_DATASTORE_Handle *h, dm->key = *key; memcpy (&dm[1], data, size); process_queue (h); + return qe; } @@ -716,8 +720,11 @@ GNUNET_DATASTORE_put (struct GNUNET_DATASTORE_Handle *h, * @param cont continuation to call when done; "success" will be set to * a positive reservation value if space could be reserved. * @param cont_cls closure for cont + * @return NULL if the entry was not queued, otherwise a handle that can be used to + * cancel; note that even if NULL is returned, the callback will be invoked + * (or rather, will already have been invoked) */ -void +struct GNUNET_DATASTORE_QueueEntry * GNUNET_DATASTORE_reserve (struct GNUNET_DATASTORE_Handle *h, uint64_t amount, uint32_t entries, @@ -727,7 +734,7 @@ GNUNET_DATASTORE_reserve (struct GNUNET_DATASTORE_Handle *h, GNUNET_DATASTORE_ContinuationWithStatus cont, void *cont_cls) { - struct QueueEntry *qe; + struct GNUNET_DATASTORE_QueueEntry *qe; struct ReserveMessage *rm; struct StatusContext *scont; @@ -746,13 +753,14 @@ GNUNET_DATASTORE_reserve (struct GNUNET_DATASTORE_Handle *h, queue_priority, max_queue_size, timeout, &process_status_message, scont); if (qe == NULL) - return; + return NULL; rm = (struct ReserveMessage*) &qe[1]; rm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_RESERVE); rm->header.size = htons(sizeof (struct ReserveMessage)); rm->entries = htonl(entries); rm->amount = GNUNET_htonll(amount); process_queue (h); + return qe; } @@ -773,8 +781,11 @@ GNUNET_DATASTORE_reserve (struct GNUNET_DATASTORE_Handle *h, * @param timeout how long to wait at most for a response * @param cont continuation to call when done * @param cont_cls closure for cont + * @return NULL if the entry was not queued, otherwise a handle that can be used to + * cancel; note that even if NULL is returned, the callback will be invoked + * (or rather, will already have been invoked) */ -void +struct GNUNET_DATASTORE_QueueEntry * GNUNET_DATASTORE_release_reserve (struct GNUNET_DATASTORE_Handle *h, int rid, unsigned int queue_priority, @@ -783,7 +794,7 @@ GNUNET_DATASTORE_release_reserve (struct GNUNET_DATASTORE_Handle *h, GNUNET_DATASTORE_ContinuationWithStatus cont, void *cont_cls) { - struct QueueEntry *qe; + struct GNUNET_DATASTORE_QueueEntry *qe; struct ReleaseReserveMessage *rrm; struct StatusContext *scont; @@ -801,12 +812,13 @@ GNUNET_DATASTORE_release_reserve (struct GNUNET_DATASTORE_Handle *h, queue_priority, max_queue_size, timeout, &process_status_message, scont); if (qe == NULL) - return; + return NULL; rrm = (struct ReleaseReserveMessage*) &qe[1]; rrm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_RELEASE_RESERVE); rrm->header.size = htons(sizeof (struct ReleaseReserveMessage)); rrm->rid = htonl(rid); process_queue (h); + return qe; } @@ -823,8 +835,11 @@ GNUNET_DATASTORE_release_reserve (struct GNUNET_DATASTORE_Handle *h, * @param timeout how long to wait at most for a response * @param cont continuation to call when done * @param cont_cls closure for cont + * @return NULL if the entry was not queued, otherwise a handle that can be used to + * cancel; note that even if NULL is returned, the callback will be invoked + * (or rather, will already have been invoked) */ -void +struct GNUNET_DATASTORE_QueueEntry * GNUNET_DATASTORE_update (struct GNUNET_DATASTORE_Handle *h, unsigned long long uid, uint32_t priority, @@ -835,7 +850,7 @@ GNUNET_DATASTORE_update (struct GNUNET_DATASTORE_Handle *h, GNUNET_DATASTORE_ContinuationWithStatus cont, void *cont_cls) { - struct QueueEntry *qe; + struct GNUNET_DATASTORE_QueueEntry *qe; struct UpdateMessage *um; struct StatusContext *scont; @@ -855,7 +870,7 @@ GNUNET_DATASTORE_update (struct GNUNET_DATASTORE_Handle *h, queue_priority, max_queue_size, timeout, &process_status_message, scont); if (qe == NULL) - return; + return NULL; um = (struct UpdateMessage*) &qe[1]; um->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_UPDATE); um->header.size = htons(sizeof (struct UpdateMessage)); @@ -863,6 +878,7 @@ GNUNET_DATASTORE_update (struct GNUNET_DATASTORE_Handle *h, um->expiration = GNUNET_TIME_absolute_hton(expiration); um->uid = GNUNET_htonll(uid); process_queue (h); + return qe; } @@ -883,8 +899,11 @@ GNUNET_DATASTORE_update (struct GNUNET_DATASTORE_Handle *h, * @param timeout how long to wait at most for a response * @param cont continuation to call when done * @param cont_cls closure for cont + * @return NULL if the entry was not queued, otherwise a handle that can be used to + * cancel; note that even if NULL is returned, the callback will be invoked + * (or rather, will already have been invoked) */ -void +struct GNUNET_DATASTORE_QueueEntry * GNUNET_DATASTORE_remove (struct GNUNET_DATASTORE_Handle *h, const GNUNET_HashCode *key, uint32_t size, @@ -895,7 +914,7 @@ GNUNET_DATASTORE_remove (struct GNUNET_DATASTORE_Handle *h, GNUNET_DATASTORE_ContinuationWithStatus cont, void *cont_cls) { - struct QueueEntry *qe; + struct GNUNET_DATASTORE_QueueEntry *qe; struct DataMessage *dm; size_t msize; struct StatusContext *scont; @@ -917,7 +936,7 @@ GNUNET_DATASTORE_remove (struct GNUNET_DATASTORE_Handle *h, queue_priority, max_queue_size, timeout, &process_status_message, scont); if (qe == NULL) - return; + return NULL; dm = (struct DataMessage*) &qe[1]; dm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE); dm->header.size = htons(msize); @@ -931,6 +950,7 @@ GNUNET_DATASTORE_remove (struct GNUNET_DATASTORE_Handle *h, dm->key = *key; memcpy (&dm[1], data, size); process_queue (h); + return qe; } @@ -964,7 +984,7 @@ static void process_result_message (void *cls, const struct GNUNET_MessageHeader * msg) { - struct QueueEntry *qe = cls; + struct GNUNET_DATASTORE_QueueEntry *qe = cls; struct GNUNET_DATASTORE_Handle *h = qe->h; struct ResultContext *rc = qe->client_ctx; const struct DataMessage *dm; @@ -1054,8 +1074,11 @@ process_result_message (void *cls, * will be called once with a value (if available) * and always once with a value of NULL. * @param iter_cls closure for iter + * @return NULL if the entry was not queued, otherwise a handle that can be used to + * cancel; note that even if NULL is returned, the callback will be invoked + * (or rather, will already have been invoked) */ -void +struct GNUNET_DATASTORE_QueueEntry * GNUNET_DATASTORE_get_random (struct GNUNET_DATASTORE_Handle *h, unsigned int queue_priority, unsigned int max_queue_size, @@ -1063,7 +1086,7 @@ GNUNET_DATASTORE_get_random (struct GNUNET_DATASTORE_Handle *h, GNUNET_DATASTORE_Iterator iter, void *iter_cls) { - struct QueueEntry *qe; + struct GNUNET_DATASTORE_QueueEntry *qe; struct GNUNET_MessageHeader *m; struct ResultContext *rcont; @@ -1079,11 +1102,12 @@ GNUNET_DATASTORE_get_random (struct GNUNET_DATASTORE_Handle *h, queue_priority, max_queue_size, timeout, &process_result_message, rcont); if (qe == NULL) - return; + return NULL; m = (struct GNUNET_MessageHeader*) &qe[1]; m->type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_GET_RANDOM); m->size = htons(sizeof (struct GNUNET_MessageHeader)); process_queue (h); + return qe; } @@ -1105,8 +1129,11 @@ GNUNET_DATASTORE_get_random (struct GNUNET_DATASTORE_Handle *h, * @param iter function to call on each matching value; * will be called once with a NULL value at the end * @param iter_cls closure for iter + * @return NULL if the entry was not queued, otherwise a handle that can be used to + * cancel; note that even if NULL is returned, the callback will be invoked + * (or rather, will already have been invoked) */ -void +struct GNUNET_DATASTORE_QueueEntry * GNUNET_DATASTORE_get (struct GNUNET_DATASTORE_Handle *h, const GNUNET_HashCode * key, enum GNUNET_BLOCK_Type type, @@ -1116,7 +1143,7 @@ GNUNET_DATASTORE_get (struct GNUNET_DATASTORE_Handle *h, GNUNET_DATASTORE_Iterator iter, void *iter_cls) { - struct QueueEntry *qe; + struct GNUNET_DATASTORE_QueueEntry *qe; struct GetMessage *gm; struct ResultContext *rcont; @@ -1133,7 +1160,7 @@ GNUNET_DATASTORE_get (struct GNUNET_DATASTORE_Handle *h, queue_priority, max_queue_size, timeout, &process_result_message, rcont); if (qe == NULL) - return; + return NULL; gm = (struct GetMessage*) &qe[1]; gm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_GET); gm->type = htonl(type); @@ -1147,6 +1174,7 @@ GNUNET_DATASTORE_get (struct GNUNET_DATASTORE_Handle *h, gm->header.size = htons(sizeof (struct GetMessage) - sizeof(GNUNET_HashCode)); } process_queue (h); + return qe; } @@ -1162,7 +1190,7 @@ void GNUNET_DATASTORE_get_next (struct GNUNET_DATASTORE_Handle *h, int more) { - struct QueueEntry *qe = h->queue_head; + struct GNUNET_DATASTORE_QueueEntry *qe = h->queue_head; struct ResultContext *rc = qe->client_ctx; GNUNET_assert (NULL != qe); @@ -1188,4 +1216,33 @@ GNUNET_DATASTORE_get_next (struct GNUNET_DATASTORE_Handle *h, } +/** + * Cancel a datastore operation. The final callback from the + * operation must not have been done yet. + * + * @param qe operation to cancel + */ +void +GNUNET_DATASTORE_cancel (struct GNUNET_DATASTORE_QueueEntry *qe) +{ + struct GNUNET_DATASTORE_Handle *h; + int reconnect; + + h = qe->h; + reconnect = qe->was_transmitted; + GNUNET_CONTAINER_DLL_remove (h->queue_head, + h->queue_tail, + qe); + if (qe->task != GNUNET_SCHEDULER_NO_TASK) + GNUNET_SCHEDULER_cancel (h->sched, + qe->task); + GNUNET_free (qe); + if (reconnect) + { + h->retry_time = GNUNET_TIME_UNIT_ZERO; + do_disconnect (h); + } +} + + /* end of datastore_api.c */ diff --git a/src/include/gnunet_datastore_service.h b/src/include/gnunet_datastore_service.h index e2de185bd..9d3a4ceb0 100644 --- a/src/include/gnunet_datastore_service.h +++ b/src/include/gnunet_datastore_service.h @@ -105,8 +105,11 @@ typedef void (*GNUNET_DATASTORE_ContinuationWithStatus)(void *cls, * @param cont continuation to call when done; "success" will be set to * a positive reservation value if space could be reserved. * @param cont_cls closure for cont + * @return NULL if the entry was not queued, otherwise a handle that can be used to + * cancel; note that even if NULL is returned, the callback will be invoked + * (or rather, will already have been invoked) */ -void +struct GNUNET_DATASTORE_QueueEntry * GNUNET_DATASTORE_reserve (struct GNUNET_DATASTORE_Handle *h, uint64_t amount, uint32_t entries, @@ -138,8 +141,11 @@ GNUNET_DATASTORE_reserve (struct GNUNET_DATASTORE_Handle *h, * @param timeout timeout for the operation * @param cont continuation to call when done * @param cont_cls closure for cont + * @return NULL if the entry was not queued, otherwise a handle that can be used to + * cancel; note that even if NULL is returned, the callback will be invoked + * (or rather, will already have been invoked) */ -void +struct GNUNET_DATASTORE_QueueEntry * GNUNET_DATASTORE_put (struct GNUNET_DATASTORE_Handle *h, int rid, const GNUNET_HashCode * key, @@ -173,8 +179,11 @@ GNUNET_DATASTORE_put (struct GNUNET_DATASTORE_Handle *h, * @param timeout how long to wait at most for a response * @param cont continuation to call when done * @param cont_cls closure for cont + * @return NULL if the entry was not queued, otherwise a handle that can be used to + * cancel; note that even if NULL is returned, the callback will be invoked + * (or rather, will already have been invoked) */ -void +struct GNUNET_DATASTORE_QueueEntry * GNUNET_DATASTORE_release_reserve (struct GNUNET_DATASTORE_Handle *h, int rid, unsigned int queue_priority, @@ -197,8 +206,11 @@ GNUNET_DATASTORE_release_reserve (struct GNUNET_DATASTORE_Handle *h, * @param timeout how long to wait at most for a response * @param cont continuation to call when done * @param cont_cls closure for cont + * @return NULL if the entry was not queued, otherwise a handle that can be used to + * cancel; note that even if NULL is returned, the callback will be invoked + * (or rather, will already have been invoked) */ -void +struct GNUNET_DATASTORE_QueueEntry * GNUNET_DATASTORE_update (struct GNUNET_DATASTORE_Handle *h, unsigned long long uid, uint32_t priority, @@ -227,8 +239,11 @@ GNUNET_DATASTORE_update (struct GNUNET_DATASTORE_Handle *h, * @param timeout how long to wait at most for a response * @param cont continuation to call when done * @param cont_cls closure for cont + * @return NULL if the entry was not queued, otherwise a handle that can be used to + * cancel; note that even if NULL is returned, the callback will be invoked + * (or rather, will already have been invoked) */ -void +struct GNUNET_DATASTORE_QueueEntry * GNUNET_DATASTORE_remove (struct GNUNET_DATASTORE_Handle *h, const GNUNET_HashCode *key, uint32_t size, @@ -282,8 +297,11 @@ typedef void (*GNUNET_DATASTORE_Iterator) (void *cls, * @param iter function to call on each matching value; * will be called once with a NULL value at the end * @param iter_cls closure for iter + * @return NULL if the entry was not queued, otherwise a handle that can be used to + * cancel; note that even if NULL is returned, the callback will be invoked + * (or rather, will already have been invoked) */ -void +struct GNUNET_DATASTORE_QueueEntry * GNUNET_DATASTORE_get (struct GNUNET_DATASTORE_Handle *h, const GNUNET_HashCode * key, enum GNUNET_BLOCK_Type type, @@ -319,8 +337,11 @@ GNUNET_DATASTORE_get_next (struct GNUNET_DATASTORE_Handle *h, * will be called once with a value (if available) * and always once with a value of NULL. * @param iter_cls closure for iter + * @return NULL if the entry was not queued, otherwise a handle that can be used to + * cancel; note that even if NULL is returned, the callback will be invoked + * (or rather, will already have been invoked) */ -void +struct GNUNET_DATASTORE_QueueEntry * GNUNET_DATASTORE_get_random (struct GNUNET_DATASTORE_Handle *h, unsigned int queue_priority, unsigned int max_queue_size, @@ -328,6 +349,14 @@ GNUNET_DATASTORE_get_random (struct GNUNET_DATASTORE_Handle *h, GNUNET_DATASTORE_Iterator iter, void *iter_cls); +/** + * Cancel a datastore operation. The final callback from the + * operation must not have been done yet. + * + * @param qe operation to cancel + */ +void +GNUNET_DATASTORE_cancel (struct GNUNET_DATASTORE_QueueEntry *qe); #if 0 /* keep Emacsens' auto-indent happy */ -- 2.25.1