From 0c996c9a9e418c21c7e063d443f4fa1846b9448e Mon Sep 17 00:00:00 2001 From: "Nathan S. Evans" Date: Fri, 19 Mar 2010 12:43:34 +0000 Subject: [PATCH] added better continuation behavior to get start, put, and route start. test case now properly handles message confirmation receipts from service. find_peer api call still needs implemented, but we are generally much closer to a working point. i'm sure there are coverity issues as well as doxygen crap to be addressed --- src/dht/dht_api.c | 81 +++++++++++++++++++++++------------- src/dht/gnunet-service-dht.c | 10 +++++ src/dht/test_dht_api.c | 33 +++++++++++---- 3 files changed, 87 insertions(+), 37 deletions(-) diff --git a/src/dht/dht_api.c b/src/dht/dht_api.c index 2b105e621..a2a8a7bf3 100644 --- a/src/dht/dht_api.c +++ b/src/dht/dht_api.c @@ -64,7 +64,7 @@ struct PendingMessage * Continuation to call on message send * or message receipt confirmation */ - GNUNET_DHT_MessageCallback cont; + GNUNET_SCHEDULER_Task cont; /** * Continuation closure @@ -269,7 +269,6 @@ void service_message_handler (void *cls, return; } - if (ntohs(msg->type) == GNUNET_MESSAGE_TYPE_DHT) { dht_msg = (struct GNUNET_DHT_Message *)msg; @@ -305,7 +304,7 @@ void service_message_handler (void *cls, uid = GNUNET_ntohll(stop_msg->unique_id); #if DEBUG_DHT_API GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "`%s': Received response to message (uid %llu)\n", "DHT API", uid); + "`%s': Received response to message (uid %llu), current uid %llu\n", "DHT API", uid, handle->current->unique_id); #endif if (handle->current->unique_id == uid) { @@ -314,12 +313,24 @@ void service_message_handler (void *cls, "`%s': Have pending confirmation for this message!\n", "DHT API", uid); #endif if (handle->current->cont != NULL) - handle->current->cont(handle->current->cont_cls, GNUNET_OK); + GNUNET_SCHEDULER_add_continuation(handle->sched, handle->current->cont, handle->current->cont_cls, GNUNET_SCHEDULER_REASON_PREREQ_DONE); + GNUNET_free(handle->current->msg); - handle->current = NULL; GNUNET_free(handle->current); + handle->current = NULL; } } + else + { +#if DEBUG_DHT_API + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "`%s': Received unknown message type %d\n", "DHT API", ntohs(msg->type)); +#endif + } + + GNUNET_CLIENT_receive (handle->client, + &service_message_handler, + handle, GNUNET_TIME_UNIT_FOREVER_REL); } @@ -416,7 +427,12 @@ finish (struct GNUNET_DHT_Handle *handle, int code) if (pos->is_unique) { if (pos->cont != NULL) - pos->cont(pos->cont_cls, code); + { + if (code == GNUNET_SYSERR) + GNUNET_SCHEDULER_add_continuation(handle->sched, pos->cont, pos->cont_cls, GNUNET_SCHEDULER_REASON_TIMEOUT); + else + GNUNET_SCHEDULER_add_continuation(handle->sched, pos->cont, pos->cont_cls, GNUNET_SCHEDULER_REASON_PREREQ_DONE); + } GNUNET_free(pos->msg); handle->current = NULL; @@ -480,12 +496,12 @@ transmit_pending (void *cls, size_t size, void *buf) * @return GNUNET_YES on success, GNUNET_NO on failure. */ static int -try_connect (struct GNUNET_DHT_Handle *ret) +try_connect (struct GNUNET_DHT_Handle *handle) { - if (ret->client != NULL) + if (handle->client != NULL) return GNUNET_OK; - ret->client = GNUNET_CLIENT_connect (ret->sched, "dht", ret->cfg); - if (ret->client != NULL) + handle->client = GNUNET_CLIENT_connect (handle->sched, "dht", handle->cfg); + if (handle->client != NULL) return GNUNET_YES; #if DEBUG_STATISTICS GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, @@ -574,7 +590,7 @@ GNUNET_DHT_route_start (struct GNUNET_DHT_Handle *handle, struct GNUNET_TIME_Relative timeout, GNUNET_DHT_ReplyProcessor iter, void *iter_cls, - GNUNET_DHT_MessageCallback cont, + GNUNET_SCHEDULER_Task cont, void *cont_cls) { struct GNUNET_DHT_RouteHandle *route_handle; @@ -583,15 +599,21 @@ GNUNET_DHT_route_start (struct GNUNET_DHT_Handle *handle, size_t is_unique; size_t msize; GNUNET_HashCode *uid_key; - int count; uint64_t uid; - uid = 0; is_unique = GNUNET_YES; if (iter == NULL) is_unique = GNUNET_NO; route_handle = NULL; + uid_key = NULL; + + do + { + GNUNET_free_non_null(uid_key); + uid = GNUNET_CRYPTO_random_u64(GNUNET_CRYPTO_QUALITY_WEAK, -1); + uid_key = hash_from_uid(uid); + } while (GNUNET_CONTAINER_multihashmap_contains(handle->outstanding_requests, uid_key) == GNUNET_YES); if (is_unique) { @@ -600,32 +622,24 @@ GNUNET_DHT_route_start (struct GNUNET_DHT_Handle *handle, route_handle->iter = iter; route_handle->iter_cls = iter_cls; route_handle->dht_handle = handle; - route_handle->uid = GNUNET_CRYPTO_random_u64(GNUNET_CRYPTO_QUALITY_WEAK, -1); - uid = route_handle->uid; + route_handle->uid = uid; #if DEBUG_DHT_API GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "`%s': Unique ID is %llu\n", "DHT API", uid); #endif - count = 0; - uid_key = hash_from_uid(route_handle->uid); - /* While we have an outstanding request with the same identifier! */ - while (GNUNET_CONTAINER_multihashmap_contains(handle->outstanding_requests, uid_key) == GNUNET_YES) - { - GNUNET_free(uid_key); - uid_key = hash_from_uid(route_handle->uid); - } /** * Store based on random identifier! */ GNUNET_CONTAINER_multihashmap_put(handle->outstanding_requests, uid_key, route_handle, GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); msize = sizeof(struct GNUNET_DHT_Message) + ntohs(enc->size); - GNUNET_free(uid_key); + } else { msize = sizeof(struct GNUNET_DHT_Message) + ntohs(enc->size); } + GNUNET_free(uid_key); message = GNUNET_malloc(msize); message->header.size = htons(msize); message->header.type = htons(GNUNET_MESSAGE_TYPE_DHT); @@ -642,6 +656,7 @@ GNUNET_DHT_route_start (struct GNUNET_DHT_Handle *handle, pending->cont = cont; pending->cont_cls = cont_cls; pending->is_unique = is_unique; + pending->unique_id = uid; GNUNET_assert(handle->current == NULL); @@ -670,6 +685,9 @@ void dht_get_processor (void *cls, * @param key the key to look up * @param iter function to call on each result * @param iter_cls closure for iter + * @param cont continuation to call once message sent + * @param cont_cls closure for continuation + * * @return handle to stop the async get */ struct GNUNET_DHT_RouteHandle * @@ -678,7 +696,9 @@ GNUNET_DHT_get_start (struct GNUNET_DHT_Handle *handle, uint32_t type, const GNUNET_HashCode * key, GNUNET_DHT_GetIterator iter, - void *iter_cls) + void *iter_cls, + GNUNET_SCHEDULER_Task cont, + void *cont_cls) { struct GNUNET_DHT_GetContext *get_context; struct GNUNET_DHT_GetMessage *get_msg; @@ -700,7 +720,7 @@ GNUNET_DHT_get_start (struct GNUNET_DHT_Handle *handle, get_msg->header.size = htons(sizeof(struct GNUNET_DHT_GetMessage)); get_msg->type = htonl(type); - return GNUNET_DHT_route_start(handle, key, 0, 0, &get_msg->header, timeout, &get_reply_iterator, get_context, NULL, NULL); + return GNUNET_DHT_route_start(handle, key, 0, 0, &get_msg->header, timeout, &get_reply_iterator, get_context, cont, cont_cls); } @@ -724,12 +744,15 @@ GNUNET_DHT_route_stop (struct GNUNET_DHT_RouteHandle *route_handle) #endif message->unique_id = GNUNET_htonll(route_handle->uid); + GNUNET_assert(route_handle->dht_handle->current == NULL); + pending = GNUNET_malloc(sizeof(struct PendingMessage)); pending->msg = (struct GNUNET_MessageHeader *)message; pending->timeout = DEFAULT_DHT_TIMEOUT; pending->cont = NULL; pending->cont_cls = NULL; pending->is_unique = GNUNET_NO; + pending->unique_id = route_handle->uid; GNUNET_assert(route_handle->dht_handle->current == NULL); @@ -811,7 +834,7 @@ GNUNET_DHT_put (struct GNUNET_DHT_Handle *handle, const char *data, struct GNUNET_TIME_Absolute exp, struct GNUNET_TIME_Relative timeout, - GNUNET_DHT_MessageCallback cont, + GNUNET_SCHEDULER_Task cont, void *cont_cls) { struct GNUNET_DHT_PutMessage *put_msg; @@ -819,7 +842,7 @@ GNUNET_DHT_put (struct GNUNET_DHT_Handle *handle, if (handle->current != NULL) { - cont(cont_cls, GNUNET_SYSERR); + GNUNET_SCHEDULER_add_continuation(handle->sched, cont, cont_cls, GNUNET_SCHEDULER_REASON_TIMEOUT); return; } @@ -833,6 +856,8 @@ GNUNET_DHT_put (struct GNUNET_DHT_Handle *handle, put_msg->header.type = htons(GNUNET_MESSAGE_TYPE_DHT_PUT); put_msg->header.size = htons(msize); put_msg->type = htonl(type); + put_msg->data_size = htons(size); + put_msg->expiration = exp; memcpy(&put_msg[1], data, size); GNUNET_DHT_route_start(handle, key, 0, 0, &put_msg->header, timeout, NULL, NULL, cont, cont_cls); diff --git a/src/dht/gnunet-service-dht.c b/src/dht/gnunet-service-dht.c index 2ff1384d7..426d78097 100644 --- a/src/dht/gnunet-service-dht.c +++ b/src/dht/gnunet-service-dht.c @@ -193,6 +193,10 @@ static void handle_dht_put (void *cls, struct GNUNET_DHT_PutMessage *put_msg, GN put_type = ntohs(put_msg->type); data_size = ntohs(put_msg->data_size); +#if DEBUG_DHT + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "`%s': %s msg total size is %d, data size %d, struct size %d\n", "DHT", "PUT", ntohs(put_msg->header.size), data_size, sizeof(struct GNUNET_DHT_PutMessage)); +#endif GNUNET_assert(ntohs(put_msg->header.size) == sizeof(struct GNUNET_DHT_PutMessage) + data_size); data = GNUNET_malloc(data_size); memcpy(data, &put_msg[1], data_size); @@ -202,6 +206,7 @@ static void handle_dht_put (void *cls, struct GNUNET_DHT_PutMessage *put_msg, GN "`%s': Received `%s' request from client, message type %d, key %s\n", "DHT", "PUT", put_type, GNUNET_h2s(key)); #endif + /** * FIXME: Implement dht put request functionality here! */ @@ -250,6 +255,10 @@ send_client_receipt_confirmation(struct GNUNET_SERVER_Client *client, uint64_t u { struct GNUNET_DHT_StopMessage *confirm_message; +#if DEBUG_DHT + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "`%s': Sending receipt confirmation for uid %llu\n", "DHT", uid); +#endif confirm_message = GNUNET_malloc(sizeof(struct GNUNET_DHT_StopMessage)); confirm_message->header.type = htons(GNUNET_MESSAGE_TYPE_DHT_STOP); confirm_message->header.size = htons(sizeof(struct GNUNET_DHT_StopMessage)); @@ -287,6 +296,7 @@ handle_dht_start_message(void *cls, struct GNUNET_SERVER_Client * client, break; case GNUNET_MESSAGE_TYPE_DHT_PUT: handle_dht_put(cls, (struct GNUNET_DHT_PutMessage *)enc_msg, &dht_msg->key); + send_client_receipt_confirmation(client, GNUNET_ntohll(dht_msg->unique_id)); break; case GNUNET_MESSAGE_TYPE_DHT_FIND_PEER: handle_dht_find_peer(cls, (struct GNUNET_DHT_FindPeerMessage *)enc_msg, &dht_msg->key); diff --git a/src/dht/test_dht_api.c b/src/dht/test_dht_api.c index 578af5aef..78e7dcd87 100644 --- a/src/dht/test_dht_api.c +++ b/src/dht/test_dht_api.c @@ -75,7 +75,8 @@ GNUNET_SCHEDULER_TaskIdentifier die_task; static void -end () +end (void *cls, + const struct GNUNET_SCHEDULER_TaskContext * tc) { /* do work here */ GNUNET_SCHEDULER_cancel (sched, die_task); @@ -83,8 +84,17 @@ end () GNUNET_DHT_disconnect (p1.dht_handle); die_task = GNUNET_SCHEDULER_NO_TASK; - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "DHT disconnected, returning success!\n"); - ok = 0; + + if (tc->reason == GNUNET_SCHEDULER_REASON_TIMEOUT) + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "DHT disconnected, returning FAIL!\n"); + ok = 365; + } + else + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "DHT disconnected, returning success!\n"); + ok = 0; + } } static void @@ -130,14 +140,14 @@ void test_put (void *cls, memset(&hash, 42, sizeof(GNUNET_HashCode)); data = GNUNET_malloc(data_size); memset(data, 43, data_size); - + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Called test_put!\n"); GNUNET_assert (peer->dht_handle != NULL); - GNUNET_DHT_put(peer->dht_handle, &hash, 0, data_size, data, GNUNET_TIME_relative_to_absolute(GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 360)) ,GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 360), NULL, NULL); + GNUNET_DHT_put(peer->dht_handle, &hash, 0, data_size, data, GNUNET_TIME_relative_to_absolute(GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 360)) ,GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 360), &end, NULL); //GNUNET_SCHEDULER_add_delayed(sched, GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 1), &test_put, &p1); - GNUNET_SCHEDULER_add_now(sched, &end, NULL); + //GNUNET_SCHEDULER_add_now(sched, &end, NULL); } /** @@ -153,12 +163,16 @@ void test_get_stop (void *cls, GNUNET_HashCode hash; memset(&hash, 42, sizeof(GNUNET_HashCode)); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Called test_get_stop!\n"); + if (tc->reason == GNUNET_SCHEDULER_REASON_TIMEOUT) + GNUNET_SCHEDULER_add_now(sched, &end_badly, NULL); + GNUNET_assert (peer->dht_handle != NULL); GNUNET_DHT_get_stop(peer->get_handle); //GNUNET_SCHEDULER_add_delayed(sched, GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 1), &test_put, &p1); - GNUNET_SCHEDULER_add_delayed(sched, GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 1), &end, &p1); + GNUNET_SCHEDULER_add_delayed(sched, GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 1), &test_put, &p1); } @@ -175,15 +189,16 @@ void test_get (void *cls, GNUNET_HashCode hash; memset(&hash, 42, sizeof(GNUNET_HashCode)); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Called test_get!\n"); peer->dht_handle = GNUNET_DHT_connect (sched, peer->cfg, 100); GNUNET_assert (peer->dht_handle != NULL); - peer->get_handle = GNUNET_DHT_get_start(peer->dht_handle, GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 100), 42, &hash, NULL, NULL); + peer->get_handle = GNUNET_DHT_get_start(peer->dht_handle, GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 100), 42, &hash, NULL, NULL, &test_get_stop, &p1); if (peer->get_handle == NULL) GNUNET_SCHEDULER_add_now(sched, &end_badly, &p1); - GNUNET_SCHEDULER_add_delayed(sched, GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 1), &test_get_stop, &p1); + //GNUNET_SCHEDULER_add_delayed(sched, GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 1), &test_get_stop, &p1); } static void -- 2.25.1