From 72a844be131e93002257213893e4e377993f3cd8 Mon Sep 17 00:00:00 2001 From: "Nathan S. Evans" Date: Thu, 18 Mar 2010 21:19:10 +0000 Subject: [PATCH] basic api to service communication has been hammered out, as well as service confirmations going back. still needs work, many things not finished. --- src/dht/dht_api.c | 114 +++++++++++++++++++++++++---------- src/dht/gnunet-service-dht.c | 106 ++++++++++++++++++++++---------- src/dht/test_dht_api.c | 15 ++--- 3 files changed, 164 insertions(+), 71 deletions(-) diff --git a/src/dht/dht_api.c b/src/dht/dht_api.c index 7ded088d8..2b105e621 100644 --- a/src/dht/dht_api.c +++ b/src/dht/dht_api.c @@ -250,50 +250,76 @@ void service_message_handler (void *cls, const struct GNUNET_MessageHeader *msg) { struct GNUNET_DHT_Handle *handle = cls; - struct GNUNET_DHT_Message *dht_msg = (struct GNUNET_DHT_Message *)msg; + struct GNUNET_DHT_Message *dht_msg; + struct GNUNET_DHT_StopMessage *stop_msg; struct GNUNET_MessageHeader *enc_msg; struct GNUNET_DHT_RouteHandle *route_handle; + uint64_t uid; GNUNET_HashCode *uid_hash; size_t enc_size; /* TODO: find out message type, handle callbacks for different types of messages. * Should be a non unique acknowledgment, or unique result. */ + if (msg == NULL) + { #if DEBUG_DHT_API GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "`%s': Received response to message (uid %llu)\n", "DHT API", ntohl(dht_msg->unique_id)); + "`%s': Received NULL from server, connection down?\n", "DHT API"); #endif + return; + } - if (ntohs(dht_msg->unique)) - { - uid_hash = hash_from_uid(ntohl(dht_msg->unique_id)); - route_handle = GNUNET_CONTAINER_multihashmap_get(handle->outstanding_requests, uid_hash); - if (route_handle == NULL) /* We have no recollection of this request */ - { + if (ntohs(msg->type) == GNUNET_MESSAGE_TYPE_DHT) + { + dht_msg = (struct GNUNET_DHT_Message *)msg; + uid = GNUNET_ntohll(dht_msg->unique_id); +#if DEBUG_DHT_API + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "`%s': Received response to message (uid %llu)\n", "DHT API", uid); +#endif + if (ntohs(dht_msg->unique)) + { + uid_hash = hash_from_uid(ntohl(dht_msg->unique_id)); + + route_handle = GNUNET_CONTAINER_multihashmap_get(handle->outstanding_requests, uid_hash); + if (route_handle == NULL) /* We have no recollection of this request */ + { #if DEBUG_DHT_API GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "`%s': Received response to message (uid %llu), but have no recollection of it!\n", "DHT API", ntohl(dht_msg->unique_id)); #endif - } - else - { - enc_size = ntohs(dht_msg->header.size) - sizeof(struct GNUNET_DHT_Message); - GNUNET_assert(enc_size > 0); - enc_msg = (struct GNUNET_MessageHeader *)&dht_msg[1]; - route_handle->iter(route_handle->iter_cls, enc_msg); - } - } - else - { - if (handle->current->unique_id == ntohl(dht_msg->unique_id)) - { + } + else + { + enc_size = ntohs(dht_msg->header.size) - sizeof(struct GNUNET_DHT_Message); + GNUNET_assert(enc_size > 0); + enc_msg = (struct GNUNET_MessageHeader *)&dht_msg[1]; + route_handle->iter(route_handle->iter_cls, enc_msg); + } + } + } + else if (ntohs(msg->type) == GNUNET_MESSAGE_TYPE_DHT_STOP) + { + stop_msg = (struct GNUNET_DHT_StopMessage *)msg; + uid = GNUNET_ntohll(stop_msg->unique_id); +#if DEBUG_DHT_API + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "`%s': Received response to message (uid %llu)\n", "DHT API", uid); +#endif + if (handle->current->unique_id == uid) + { +#if DEBUG_DHT_API + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "`%s': Have pending confirmation for this message!\n", "DHT API", uid); +#endif + if (handle->current->cont != NULL) handle->current->cont(handle->current->cont_cls, GNUNET_OK); - GNUNET_free(handle->current->msg); - handle->current = NULL; - GNUNET_free(handle->current); - } - } - + GNUNET_free(handle->current->msg); + handle->current = NULL; + GNUNET_free(handle->current); + } + } } @@ -381,7 +407,10 @@ finish (struct GNUNET_DHT_Handle *handle, int code) { /* TODO: if code is not GNUNET_OK, do something! */ struct PendingMessage *pos = handle->current; - +#if DEBUG_DHT_API + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "`%s': Finish called!\n", "DHT API"); +#endif GNUNET_assert(pos != NULL); if (pos->is_unique) @@ -405,6 +434,10 @@ transmit_pending (void *cls, size_t size, void *buf) struct GNUNET_DHT_Handle *handle = cls; size_t tsize; +#if DEBUG_DHT_API + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "`%s': In transmit_pending\n", "DHT API"); +#endif if (buf == NULL) { #if DEBUG_DHT_API @@ -428,6 +461,7 @@ transmit_pending (void *cls, size_t size, void *buf) "`%s': Sending message size %d\n", "DHT API", tsize); #endif memcpy(buf, handle->current->msg, tsize); + finish(handle, GNUNET_OK); return tsize; } else @@ -467,7 +501,7 @@ try_connect (struct GNUNET_DHT_Handle *ret) static void process_pending_message(struct GNUNET_DHT_Handle *handle) { - if (handle->current != NULL) + if (handle->current == NULL) return; /* action already pending */ if (GNUNET_YES != try_connect (handle)) { @@ -550,7 +584,9 @@ GNUNET_DHT_route_start (struct GNUNET_DHT_Handle *handle, size_t msize; GNUNET_HashCode *uid_key; int count; + uint64_t uid; + uid = 0; is_unique = GNUNET_YES; if (iter == NULL) is_unique = GNUNET_NO; @@ -565,7 +601,11 @@ GNUNET_DHT_route_start (struct GNUNET_DHT_Handle *handle, route_handle->iter_cls = iter_cls; route_handle->dht_handle = handle; route_handle->uid = GNUNET_CRYPTO_random_u64(GNUNET_CRYPTO_QUALITY_WEAK, -1); - + uid = route_handle->uid; +#if DEBUG_DHT_API + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "`%s': Unique ID is %llu\n", "DHT API", uid); +#endif count = 0; uid_key = hash_from_uid(route_handle->uid); /* While we have an outstanding request with the same identifier! */ @@ -578,7 +618,7 @@ GNUNET_DHT_route_start (struct GNUNET_DHT_Handle *handle, * Store based on random identifier! */ GNUNET_CONTAINER_multihashmap_put(handle->outstanding_requests, uid_key, route_handle, GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); - msize = sizeof(struct GNUNET_DHT_Message) + ntohs(enc->size) + sizeof(route_handle->uid); + msize = sizeof(struct GNUNET_DHT_Message) + ntohs(enc->size); GNUNET_free(uid_key); } else @@ -593,6 +633,8 @@ GNUNET_DHT_route_start (struct GNUNET_DHT_Handle *handle, message->options = htons(options); message->desired_replication_level = htons(options); message->unique = htons(is_unique); + message->unique_id = GNUNET_htonll(uid); + memcpy(&message[1], enc, ntohs(enc->size)); pending = GNUNET_malloc(sizeof(struct PendingMessage)); pending->msg = &message->header; @@ -603,6 +645,8 @@ GNUNET_DHT_route_start (struct GNUNET_DHT_Handle *handle, GNUNET_assert(handle->current == NULL); + handle->current = pending; + process_pending_message(handle); return route_handle; @@ -674,7 +718,11 @@ GNUNET_DHT_route_stop (struct GNUNET_DHT_RouteHandle *route_handle) message = GNUNET_malloc(msize); message->header.size = htons(msize); message->header.type = htons(GNUNET_MESSAGE_TYPE_DHT_STOP); - message->unique_id = htonl(route_handle->uid); +#if DEBUG_DHT_API + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "`%s': Remove outstanding request for uid %llu\n", "DHT API", route_handle->uid); +#endif + message->unique_id = GNUNET_htonll(route_handle->uid); pending = GNUNET_malloc(sizeof(struct PendingMessage)); pending->msg = (struct GNUNET_MessageHeader *)message; @@ -685,6 +733,8 @@ GNUNET_DHT_route_stop (struct GNUNET_DHT_RouteHandle *route_handle) GNUNET_assert(route_handle->dht_handle->current == NULL); + route_handle->dht_handle->current = pending; + process_pending_message(route_handle->dht_handle); uid_key = hash_from_uid(route_handle->uid); diff --git a/src/dht/gnunet-service-dht.c b/src/dht/gnunet-service-dht.c index 215d39c44..2ff1384d7 100644 --- a/src/dht/gnunet-service-dht.c +++ b/src/dht/gnunet-service-dht.c @@ -86,13 +86,19 @@ struct ClientList }; /** - * Server handler for initiating local dht get requests + * Server handler for handling locally received dht requests */ -static void handle_dht_plugin_message (void *cls, struct GNUNET_SERVER_Client * client, - const struct GNUNET_MessageHeader *message); +static void +handle_dht_start_message(void *cls, struct GNUNET_SERVER_Client * client, + const struct GNUNET_MessageHeader *message); + +static void +handle_dht_stop_message(void *cls, struct GNUNET_SERVER_Client * client, + const struct GNUNET_MessageHeader *message); static struct GNUNET_SERVER_MessageHandler plugin_handlers[] = { - {&handle_dht_plugin_message, NULL, GNUNET_MESSAGE_TYPE_DHT, 0}, + {&handle_dht_start_message, NULL, GNUNET_MESSAGE_TYPE_DHT, 0}, + {&handle_dht_stop_message, NULL, GNUNET_MESSAGE_TYPE_DHT_STOP, 0}, /* {&handle_dht_get_stop, NULL, GNUNET_MESSAGE_TYPE_DHT_GET_STOP, 0}, {&handle_dht_put, NULL, GNUNET_MESSAGE_TYPE_DHT_PUT, 0}, {&handle_dht_find_peer, NULL, GNUNET_MESSAGE_TYPE_DHT_FIND_PEER, 0}, @@ -203,10 +209,64 @@ static void handle_dht_put (void *cls, struct GNUNET_DHT_PutMessage *put_msg, GN GNUNET_free(data); } +/** + * Context for sending receipt confirmations. Not used yet. + */ +struct SendConfirmationContext +{ + /** + * The message to send. + */ + struct GNUNET_DHT_StopMessage *message; + + /** + * Transmit handle. + */ + struct GNUNET_CONNECTION_TransmitHandle * transmit_handle; +}; + +size_t send_confirmation (void *cls, + size_t size, void *buf) +{ + struct GNUNET_DHT_StopMessage *confirmation_message = cls; + + if (buf == NULL) /* Message timed out, that's crappy... */ + { + GNUNET_free(confirmation_message); + return 0; + } + + if (size >= ntohs(confirmation_message->header.size)) + { + memcpy(buf, confirmation_message, ntohs(confirmation_message->header.size)); + return ntohs(confirmation_message->header.size); + } + else + return 0; +} static void -handle_dht_start_message(void *cls, struct GNUNET_DHT_Message *dht_msg) +send_client_receipt_confirmation(struct GNUNET_SERVER_Client *client, uint64_t uid) { + struct GNUNET_DHT_StopMessage *confirm_message; + + confirm_message = GNUNET_malloc(sizeof(struct GNUNET_DHT_StopMessage)); + confirm_message->header.type = htons(GNUNET_MESSAGE_TYPE_DHT_STOP); + confirm_message->header.size = htons(sizeof(struct GNUNET_DHT_StopMessage)); + confirm_message->unique_id = GNUNET_htonll(uid); + + GNUNET_SERVER_notify_transmit_ready (client, + sizeof(struct GNUNET_DHT_StopMessage), + GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 5), + &send_confirmation, confirm_message); + +} + +static void +handle_dht_start_message(void *cls, struct GNUNET_SERVER_Client * client, + const struct GNUNET_MessageHeader *message) +{ + struct GNUNET_DHT_Message *dht_msg = (struct GNUNET_DHT_Message *)message; struct GNUNET_MessageHeader *enc_msg; size_t enc_type; @@ -216,7 +276,7 @@ handle_dht_start_message(void *cls, struct GNUNET_DHT_Message *dht_msg) #if DEBUG_DHT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "`%s': Received `%s' request from client, message type %d, key %s, uid %llu\n", "DHT", "GENERIC", enc_type, GNUNET_h2s(&dht_msg->key), ntohl(dht_msg->unique_id)); + "`%s': Received `%s' request from client, message type %d, key %s, uid %llu\n", "DHT", "GENERIC", enc_type, GNUNET_h2s(&dht_msg->key), GNUNET_ntohll(dht_msg->unique_id)); #endif /* FIXME: Implement demultiplexing functionality here */ @@ -238,44 +298,26 @@ handle_dht_start_message(void *cls, struct GNUNET_DHT_Message *dht_msg) #endif } + GNUNET_SERVER_receive_done(client, GNUNET_OK); + } static void -handle_dht_stop_message(void *cls, struct GNUNET_DHT_StopMessage *dht_stop_msg) +handle_dht_stop_message(void *cls, struct GNUNET_SERVER_Client * client, + const struct GNUNET_MessageHeader *message) { + struct GNUNET_DHT_StopMessage * dht_stop_msg = (struct GNUNET_DHT_StopMessage *)message; #if DEBUG_DHT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "`%s': Received `%s' request from client, uid %llu\n", "DHT", "GENERIC STOP", ntohl(dht_stop_msg->unique_id)); -#endif -} - - - -/** - * Server handler for initiating local dht get requests - */ -static void handle_dht_plugin_message (void *cls, struct GNUNET_SERVER_Client * client, - const struct GNUNET_MessageHeader *message) -{ - - #if DEBUG_DHT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "`%s': Received `%s' request from client, message type %d, size %d\n", "DHT", "GENERIC", ntohs(message->type), ntohs(message->size)); + "`%s': Received `%s' request from client, uid %llu\n", "DHT", "GENERIC STOP", GNUNET_ntohll(dht_stop_msg->unique_id)); #endif - - switch(ntohs(message->type)) - { - case GNUNET_MESSAGE_TYPE_DHT: - handle_dht_start_message(cls, (struct GNUNET_DHT_Message *)message); - case GNUNET_MESSAGE_TYPE_DHT_STOP: - handle_dht_stop_message(cls, (struct GNUNET_DHT_StopMessage *)message); - } - + send_client_receipt_confirmation(client, GNUNET_ntohll(dht_stop_msg->unique_id)); GNUNET_SERVER_receive_done(client, GNUNET_OK); } + /** * Core handler for p2p dht get requests. */ diff --git a/src/dht/test_dht_api.c b/src/dht/test_dht_api.c index 8092d7b2e..578af5aef 100644 --- a/src/dht/test_dht_api.c +++ b/src/dht/test_dht_api.c @@ -51,8 +51,8 @@ struct PeerContext struct GNUNET_CONFIGURATION_Handle *cfg; struct GNUNET_DHT_Handle *dht_handle; struct GNUNET_PeerIdentity id; - struct GNUNET_DHT_GetHandle *get_handle; - struct GNUNET_DHT_GetHandle *find_peer_handle; + struct GNUNET_DHT_RouteHandle *get_handle; + struct GNUNET_DHT_RouteHandle *find_peer_handle; #if START_ARM pid_t arm_pid; @@ -133,7 +133,7 @@ void test_put (void *cls, GNUNET_assert (peer->dht_handle != NULL); - GNUNET_DHT_put(peer->dht_handle, &hash, 0, data_size, data, GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 360), NULL, NULL); + GNUNET_DHT_put(peer->dht_handle, &hash, 0, data_size, data, GNUNET_TIME_relative_to_absolute(GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 360)) ,GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 360), NULL, NULL); //GNUNET_SCHEDULER_add_delayed(sched, GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 1), &test_put, &p1); @@ -155,9 +155,10 @@ void test_get_stop (void *cls, GNUNET_assert (peer->dht_handle != NULL); - GNUNET_DHT_get_stop(peer->dht_handle, peer->get_handle); + GNUNET_DHT_get_stop(peer->get_handle); - GNUNET_SCHEDULER_add_delayed(sched, GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 1), &test_put, &p1); + //GNUNET_SCHEDULER_add_delayed(sched, GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 1), &test_put, &p1); + GNUNET_SCHEDULER_add_delayed(sched, GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 1), &end, &p1); } @@ -174,10 +175,10 @@ void test_get (void *cls, GNUNET_HashCode hash; memset(&hash, 42, sizeof(GNUNET_HashCode)); - peer->dht_handle = GNUNET_DHT_connect (sched, peer->cfg); + peer->dht_handle = GNUNET_DHT_connect (sched, peer->cfg, 100); GNUNET_assert (peer->dht_handle != NULL); - peer->get_handle = GNUNET_DHT_get_start(peer->dht_handle, 42, &hash, NULL, NULL); + peer->get_handle = GNUNET_DHT_get_start(peer->dht_handle, GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 100), 42, &hash, NULL, NULL); if (peer->get_handle == NULL) GNUNET_SCHEDULER_add_now(sched, &end_badly, &p1); -- 2.25.1