From: Christian Grothoff Date: Fri, 9 Apr 2010 11:10:25 +0000 (+0000) Subject: breaking DHT code X-Git-Tag: initial-import-from-subversion-38251~22219 X-Git-Url: https://git.librecmc.org/?a=commitdiff_plain;h=54eccc36f12430505b3b946f88ac61087cc82793;p=oweals%2Fgnunet.git breaking DHT code --- diff --git a/src/Makefile.am b/src/Makefile.am index fadfab928..a01288078 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -24,7 +24,6 @@ SUBDIRS = \ transport \ peerinfo-tool \ core \ - dht \ testing \ $(HOSTLIST_DIR) \ topology \ diff --git a/src/dht/dht.h b/src/dht/dht.h index 6c2f4cbd8..38503cfe2 100644 --- a/src/dht/dht.h +++ b/src/dht/dht.h @@ -30,11 +30,11 @@ #define DEBUG_DHT GNUNET_NO typedef void (*GNUNET_DHT_MessageReceivedHandler) (void *cls, - struct GNUNET_MessageHeader + const struct GNUNET_MessageHeader * msg); /** - * Generic DHT message, wrapper for other message types + * FIXME. */ struct GNUNET_DHT_StopMessage { @@ -43,10 +43,15 @@ struct GNUNET_DHT_StopMessage */ struct GNUNET_MessageHeader header; + /** + * Always zero. + */ + uint32_t reserved GNUNET_PACKED; + /** * Unique ID identifying this request */ - uint64_t unique_id; + uint64_t unique_id GNUNET_PACKED; }; @@ -61,35 +66,33 @@ struct GNUNET_DHT_Message */ struct GNUNET_MessageHeader header; + /** + * Message options + */ + uint32_t options GNUNET_PACKED; + /** * The key to search for */ GNUNET_HashCode key; /** - * Replication level for this message + * Unique ID identifying this request */ - uint16_t desired_replication_level; + uint64_t unique_id GNUNET_PACKED; /** - * Message options + * Replication level for this message */ - uint16_t options; + uint32_t desired_replication_level GNUNET_PACKED; /** * Is this message uniquely identified? If so it will * be fire and forget, if not we will wait for a receipt * from the service. */ - uint16_t unique; - + uint32_t unique GNUNET_PACKED; - /** - * Unique ID identifying this request - */ - uint64_t unique_id; - - /* */ /* GNUNET_MessageHeader *enc actual DHT message, copied to end of this dealy do */ }; @@ -107,17 +110,17 @@ struct GNUNET_DHT_PutMessage /** * The type of data to insert. */ - size_t type; + size_t type GNUNET_PACKED; /** - * The size of the data, appended to the end of this message. + * How long should this data persist? */ - size_t data_size; + struct GNUNET_TIME_AbsoluteNBO expiration; /** - * How long should this data persist? + * The size of the data, appended to the end of this message. */ - struct GNUNET_TIME_Absolute expiration; + size_t data_size GNUNET_PACKED; }; @@ -135,7 +138,7 @@ struct GNUNET_DHT_GetMessage /** * The type for the data for the GET request */ - size_t type; + uint32_t type; }; @@ -152,7 +155,7 @@ struct GNUNET_DHT_GetResultMessage /** * The type for the data for the GET request */ - size_t type; + uint32_t type; /** * The key to search for @@ -164,55 +167,7 @@ struct GNUNET_DHT_GetResultMessage */ struct GNUNET_TIME_Absolute expiration; - /** - * The size of the data, appended to the end of this message. - */ - size_t data_size; - -}; - -/** - * Message to issue find peer request to the DHT - */ -struct GNUNET_DHT_FindPeerMessage -{ - /** - * Type: GNUNET_MESSAGE_TYPE_DHT_FIND_PEER - */ - struct GNUNET_MessageHeader header; - - /** - * Size of inject message (may be zero) - */ - size_t msg_len; - - /* Followed by message to inject at found peers */ - }; -/** - * Message to return data from the DHT - */ -struct GNUNET_DHT_FindPeerResultMessage -{ - /** - * Type: GNUNET_MESSAGE_TYPE_DHT_FIND_PEER_RESULT - */ - struct GNUNET_MessageHeader header; - - /** - * The peer that was found - */ - struct GNUNET_PeerIdentity peer; - - /** - * The size of the return message from the peer - * (defaults to HELLO for the peer), - * appended to the end of this message, 0 if - * no message. - */ - size_t data_size; - -}; #endif /* DHT_H_ */ diff --git a/src/dht/dht_api.c b/src/dht/dht_api.c index 8722e7b75..7dd3305eb 100644 --- a/src/dht/dht_api.c +++ b/src/dht/dht_api.c @@ -248,46 +248,36 @@ struct GNUNET_DHT_Handle struct GNUNET_DHT_NonUniqueHandle *non_unique_request; /** - * Kill off the connection and any pending messages. + * Generator for unique ids. */ - int do_destroy; + uint64_t uid_gen; }; -static struct GNUNET_TIME_Relative default_request_timeout; -/* Forward declaration */ -static void process_pending_message (struct GNUNET_DHT_Handle *handle); - -static GNUNET_HashCode * -hash_from_uid (uint64_t uid) +/** + * Convert unique ID to hash code. + * + * @param uid unique ID to convert + * @param hash set to uid (extended with zeros) + */ +static void +hash_from_uid (uint64_t uid, + GNUNET_HashCode *hash) { - int count; - int remaining; - GNUNET_HashCode *hash; - hash = GNUNET_malloc (sizeof (GNUNET_HashCode)); - count = 0; - - while (count < sizeof (GNUNET_HashCode)) - { - remaining = sizeof (GNUNET_HashCode) - count; - if (remaining > sizeof (uid)) - remaining = sizeof (uid); - - memcpy (hash, &uid, remaining); - count += remaining; - } - - return hash; + memset (hash, 0, sizeof(GNUNET_HashCode)); + *((uint64_t*)hash) = uid; } + /** * Handler for messages received from the DHT service * a demultiplexer which handles numerous message types * */ void -service_message_handler (void *cls, const struct GNUNET_MessageHeader *msg) +service_message_handler (void *cls, + const struct GNUNET_MessageHeader *msg) { struct GNUNET_DHT_Handle *handle = cls; struct GNUNET_DHT_Message *dht_msg; @@ -295,7 +285,7 @@ service_message_handler (void *cls, const struct GNUNET_MessageHeader *msg) struct GNUNET_MessageHeader *enc_msg; struct GNUNET_DHT_RouteHandle *route_handle; uint64_t uid; - GNUNET_HashCode *uid_hash; + GNUNET_HashCode uid_hash; size_t enc_size; /* TODO: find out message type, handle callbacks for different types of messages. * Should be a non unique acknowledgment, or unique result. */ @@ -304,9 +294,15 @@ service_message_handler (void *cls, const struct GNUNET_MessageHeader *msg) { #if DEBUG_DHT_API GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "`%s': Received NULL from server, connection down?\n", + "`%s': Received NULL from server, connection down!\n", "DHT API"); #endif + GNUNET_CLIENT_disconnect (handle->client); + handle->client = GNUNET_CLIENT_connect (handle->sched, + "dht", + handle->cfg); + /* FIXME: re-transmit *all* of our GET requests AND re-start + receiving responses! */ return; } @@ -321,13 +317,12 @@ service_message_handler (void *cls, const struct GNUNET_MessageHeader *msg) "`%s': Received response to message (uid %llu)\n", "DHT API", uid); #endif - if (ntohs (dht_msg->unique)) + if (ntohl (dht_msg->unique)) { - uid_hash = hash_from_uid (uid); + hash_from_uid (uid, &uid_hash); route_handle = GNUNET_CONTAINER_multihashmap_get (handle->outstanding_requests, - uid_hash); - GNUNET_free (uid_hash); + &uid_hash); if (route_handle == NULL) /* We have no recollection of this request */ { #if DEBUG_DHT_API @@ -344,7 +339,6 @@ service_message_handler (void *cls, const struct GNUNET_MessageHeader *msg) GNUNET_assert (enc_size > 0); enc_msg = (struct GNUNET_MessageHeader *) &dht_msg[1]; route_handle->iter (route_handle->iter_cls, enc_msg); - } } break; @@ -409,25 +403,16 @@ GNUNET_DHT_connect (struct GNUNET_SCHEDULER_Handle *sched, struct GNUNET_DHT_Handle *handle; handle = GNUNET_malloc (sizeof (struct GNUNET_DHT_Handle)); - - default_request_timeout = - GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5); handle->cfg = cfg; handle->sched = sched; - - handle->current = NULL; - handle->do_destroy = GNUNET_NO; - handle->th = NULL; - handle->client = GNUNET_CLIENT_connect (sched, "dht", cfg); - handle->outstanding_requests = - GNUNET_CONTAINER_multihashmap_create (ht_len); - if (handle->client == NULL) { GNUNET_free (handle); return NULL; } + handle->outstanding_requests = + GNUNET_CONTAINER_multihashmap_create (ht_len); #if DEBUG_DHT_API GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "`%s': Connection to service in progress\n", "DHT API"); @@ -435,7 +420,6 @@ GNUNET_DHT_connect (struct GNUNET_SCHEDULER_Handle *sched, GNUNET_CLIENT_receive (handle->client, &service_message_handler, handle, GNUNET_TIME_UNIT_FOREVER_REL); - return handle; } @@ -453,7 +437,6 @@ GNUNET_DHT_disconnect (struct GNUNET_DHT_Handle *handle) "`%s': Called GNUNET_DHT_disconnect\n", "DHT API"); #endif GNUNET_assert (handle != NULL); - if (handle->th != NULL) /* We have a live transmit request in the Aether */ { GNUNET_CLIENT_notify_transmit_ready_cancel (handle->th); @@ -467,7 +450,8 @@ GNUNET_DHT_disconnect (struct GNUNET_DHT_Handle *handle) GNUNET_CLIENT_disconnect (handle->client, GNUNET_NO); handle->client = NULL; } - + /* Either assert that outstanding_requests is empty */ + /* FIXME: handle->outstanding_requests not freed! */ GNUNET_free (handle); } @@ -506,6 +490,7 @@ finish (struct GNUNET_DHT_Handle *handle, int code) /* Otherwise we need to wait for a response to this message! */ } + /** * Transmit the next pending message, called by notify_transmit_ready */ @@ -591,13 +576,6 @@ process_pending_message (struct GNUNET_DHT_Handle *handle) return; } - /* TODO: set do_destroy somewhere's, see what needs to happen in that case! */ - if (handle->do_destroy) - { - //GNUNET_DHT_disconnect (handle); /* FIXME: replace with proper disconnect stuffs */ - } - - if (NULL == (handle->th = GNUNET_CLIENT_notify_transmit_ready (handle->client, ntohs (handle-> @@ -613,6 +591,7 @@ process_pending_message (struct GNUNET_DHT_Handle *handle) "Failed to transmit request to dht service.\n"); #endif finish (handle, GNUNET_SYSERR); + return; } #if DEBUG_DHT_API GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, @@ -722,28 +701,21 @@ GNUNET_DHT_route_start (struct GNUNET_DHT_Handle *handle, struct GNUNET_DHT_RouteHandle *route_handle; struct PendingMessage *pending; struct GNUNET_DHT_Message *message; - size_t is_unique; - size_t msize; - GNUNET_HashCode *uid_key; + size_t expects_response; + uint16_t msize; + GNUNET_HashCode uid_key; uint64_t uid; - is_unique = GNUNET_YES; - if (iter == NULL) - is_unique = GNUNET_NO; - - route_handle = NULL; - uid_key = NULL; - - do + if (sizeof (struct GNUNET_DHT_Message) + ntohs (enc->size) >= GNUNET_SERVER_MAX_MESSAGE_SIZE) { - GNUNET_free_non_null (uid_key); - uid = GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK, -1); - uid_key = hash_from_uid (uid); + GNUNET_break (0); + return NULL; } - while (GNUNET_CONTAINER_multihashmap_contains - (handle->outstanding_requests, uid_key) == GNUNET_YES); - - if (is_unique) + expects_response = GNUNET_YES; + if (iter == NULL) + expects_response = GNUNET_NO; + uid = handle->uid_gen++; + if (expects_response) { route_handle = GNUNET_malloc (sizeof (struct GNUNET_DHT_RouteHandle)); memcpy (&route_handle->key, key, sizeof (GNUNET_HashCode)); @@ -755,51 +727,33 @@ GNUNET_DHT_route_start (struct GNUNET_DHT_Handle *handle, GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "`%s': Unique ID is %llu\n", "DHT API", uid); #endif - /** - * Store based on random identifier! - */ GNUNET_CONTAINER_multihashmap_put (handle->outstanding_requests, - uid_key, route_handle, + &uid_key, route_handle, GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); - msize = sizeof (struct GNUNET_DHT_Message) + ntohs (enc->size); - - } - else - { - msize = sizeof (struct GNUNET_DHT_Message) + ntohs (enc->size); } - - GNUNET_free (uid_key); + msize = sizeof (struct GNUNET_DHT_Message) + ntohs (enc->size); message = GNUNET_malloc (msize); message->header.size = htons (msize); message->header.type = htons (GNUNET_MESSAGE_TYPE_DHT); memcpy (&message->key, key, sizeof (GNUNET_HashCode)); - message->options = htons (options); - message->desired_replication_level = htons (options); - message->unique = htons (is_unique); + message->options = htonl (options); + message->desired_replication_level = htonl (options); + message->unique = htonl (expects_response); message->unique_id = GNUNET_htonll (uid); memcpy (&message[1], enc, ntohs (enc->size)); - pending = GNUNET_malloc (sizeof (struct PendingMessage)); pending->msg = &message->header; pending->timeout = timeout; pending->cont = cont; pending->cont_cls = cont_cls; - pending->is_unique = is_unique; + pending->expects_response = expects_response; pending->unique_id = uid; - GNUNET_assert (handle->current == NULL); - handle->current = pending; - process_pending_message (handle); - return route_handle; } -void -GNUNET_DHT_route_stop (struct GNUNET_DHT_RouteHandle *route_handle, - GNUNET_SCHEDULER_Task cont, void *cont_cls); /** * Perform an asynchronous GET operation on the DHT identified. @@ -851,13 +805,13 @@ GNUNET_DHT_get_start (struct GNUNET_DHT_Handle *handle, return get_handle; } + /** * Stop a previously issued routing request * * @param route_handle handle to the request to stop * @param cont continuation to call once this message is sent to the service or times out * @param cont_cls closure for the continuation - * */ void GNUNET_DHT_route_stop (struct GNUNET_DHT_RouteHandle *route_handle, @@ -866,10 +820,9 @@ GNUNET_DHT_route_stop (struct GNUNET_DHT_RouteHandle *route_handle, struct PendingMessage *pending; struct GNUNET_DHT_StopMessage *message; size_t msize; - GNUNET_HashCode *uid_key; + GNUNET_HashCode uid_key; msize = sizeof (struct GNUNET_DHT_StopMessage); - message = GNUNET_malloc (msize); message->header.size = htons (msize); message->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_STOP); @@ -879,37 +832,20 @@ GNUNET_DHT_route_stop (struct GNUNET_DHT_RouteHandle *route_handle, route_handle->uid); #endif message->unique_id = GNUNET_htonll (route_handle->uid); - GNUNET_assert (route_handle->dht_handle->current == NULL); - pending = GNUNET_malloc (sizeof (struct PendingMessage)); pending->msg = (struct GNUNET_MessageHeader *) message; pending->timeout = DEFAULT_DHT_TIMEOUT; pending->cont = cont; pending->cont_cls = cont_cls; - pending->is_unique = GNUNET_NO; pending->unique_id = route_handle->uid; - GNUNET_assert (route_handle->dht_handle->current == NULL); - route_handle->dht_handle->current = pending; - process_pending_message (route_handle->dht_handle); - - uid_key = hash_from_uid (route_handle->uid); - - if (GNUNET_CONTAINER_multihashmap_remove - (route_handle->dht_handle->outstanding_requests, uid_key, - route_handle) != GNUNET_YES) - { -#if DEBUG_DHT_API - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "`%s': Remove outstanding request from hashmap failed for key %s, uid %llu\n", - "DHT API", GNUNET_h2s (uid_key), route_handle->uid); -#endif - } - GNUNET_free (uid_key); - return; + hash_from_uid (route_handle->uid, &uid_key); + GNUNET_assert (GNUNET_CONTAINER_multihashmap_remove + (route_handle->dht_handle->outstanding_requests, &uid_key, + route_handle) == GNUNET_YES); } @@ -932,7 +868,6 @@ GNUNET_DHT_get_stop (struct GNUNET_DHT_GetHandle *get_handle, #endif GNUNET_DHT_route_stop (get_handle->route_handle, cont, cont_cls); GNUNET_free (get_handle); - } diff --git a/src/dht/gnunet-service-dht.c b/src/dht/gnunet-service-dht.c index c1950673e..1214a8bab 100644 --- a/src/dht/gnunet-service-dht.c +++ b/src/dht/gnunet-service-dht.c @@ -54,11 +54,6 @@ static struct GNUNET_SCHEDULER_Handle *sched; */ static const struct GNUNET_CONFIGURATION_Handle *cfg; -/** - * Timeout for transmissions to clients - */ -static struct GNUNET_TIME_Relative client_transmit_timeout; - /** * Handle to the core service */ @@ -96,9 +91,14 @@ struct PendingMessage struct PendingMessage *next; /** - * Actual message to be sent + * Pointer to previous item in the list */ - struct GNUNET_MessageHeader *msg; + struct PendingMessage *prev; + + /** + * Actual message to be sent; // avoid allocation + */ + const struct GNUNET_MessageHeader *msg; // msg = (cast) &pm[1]; // memcpy (&pm[1], data, len); }; @@ -130,6 +130,11 @@ struct ClientList */ struct PendingMessage *pending_head; + /** + * Tail of linked list of pending messages for this client + */ + struct PendingMessage *pending_tail; + }; /** @@ -184,101 +189,30 @@ struct DHT_MessageContext */ static struct ClientList *client_list; - -/** - * Server handlers for handling locally received dht requests - */ -static void -handle_dht_start_message (void *cls, struct GNUNET_SERVER_Client *client, - const struct GNUNET_MessageHeader *message); - -static void -handle_dht_stop_message (void *cls, struct GNUNET_SERVER_Client *client, - const struct GNUNET_MessageHeader *message); - -static struct GNUNET_SERVER_MessageHandler plugin_handlers[] = { - {&handle_dht_start_message, NULL, GNUNET_MESSAGE_TYPE_DHT, 0}, - {&handle_dht_stop_message, NULL, GNUNET_MESSAGE_TYPE_DHT_STOP, 0}, - {NULL, NULL, 0, 0} -}; - - -/** - * Core handler for p2p dht get requests. - */ -static int handle_dht_p2p_get (void *cls, - const struct GNUNET_PeerIdentity *peer, - const struct GNUNET_MessageHeader *message, - struct GNUNET_TIME_Relative latency, - uint32_t distance); - -/** - * Core handler for p2p dht put requests. - */ -static int handle_dht_p2p_put (void *cls, - const struct GNUNET_PeerIdentity *peer, - const struct GNUNET_MessageHeader *message, - struct GNUNET_TIME_Relative latency, - uint32_t distance); - -/** - * Core handler for p2p dht find peer requests. - */ -static int handle_dht_p2p_find_peer (void *cls, - const struct GNUNET_PeerIdentity *peer, - const struct GNUNET_MessageHeader - *message, - struct GNUNET_TIME_Relative latency, - uint32_t distance); - -static struct GNUNET_CORE_MessageHandler core_handlers[] = { - {&handle_dht_p2p_get, GNUNET_MESSAGE_TYPE_DHT_GET, 0}, - {&handle_dht_p2p_put, GNUNET_MESSAGE_TYPE_DHT_PUT, 0}, - {&handle_dht_p2p_find_peer, GNUNET_MESSAGE_TYPE_DHT_FIND_PEER, 0}, - {NULL, 0, 0} -}; - /** * Forward declaration. */ static size_t send_generic_reply (void *cls, size_t size, void *buf); + /** * Task run to check for messages that need to be sent to a client. * - * @param cls a ClientList, containing the client and any messages to be sent to it - * @param tc reason this was called + * @param client a ClientList, containing the client and any messages to be sent to it */ static void -process_pending_messages (void *cls, - const struct GNUNET_SCHEDULER_TaskContext *tc) -{ - struct ClientList *client = cls; - - if (client->pending_head == NULL) /* No messages queued */ - { -#if DEBUG_DHT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "`%s': Have no pending messages for client.\n", "DHT"); -#endif - return; - } - - if (client->transmit_handle == NULL) /* No current pending messages, we can try to send! */ - client->transmit_handle = - GNUNET_SERVER_notify_transmit_ready (client->client_handle, - ntohs (client->pending_head->msg-> - size), - GNUNET_TIME_relative_multiply - (GNUNET_TIME_UNIT_SECONDS, 5), - &send_generic_reply, client); - else - { -#if DEBUG_DHT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "`%s': Transmit handle is non-null.\n", "DHT"); -#endif - } +process_pending_messages (struct ClientList *client) +{ + if (client->pending_head == NULL) + return; + if (client->transmit_handle != NULL) + return; + client->transmit_handle = + GNUNET_SERVER_notify_transmit_ready (client->client_handle, + ntohs (client->pending_head->msg-> + size), + GNUNET_TIME_UNIT_FOREVER_REL, + &send_generic_reply, client); } /** @@ -297,42 +231,41 @@ static size_t send_generic_reply (void *cls, size_t size, void *buf) { struct ClientList *client = cls; - struct PendingMessage *reply = client->pending_head; - int ret; + char *cbuf = buf; + struct PendingMessage *reply; + size_t off; + size_t msize; client->transmit_handle = NULL; - if (buf == NULL) /* Message timed out, that's crappy... */ + if (buf == NULL) { + /* client disconnected */ #if DEBUG_DHT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "`%s': buffer was NULL\n", "DHT"); #endif - client->pending_head = reply->next; - GNUNET_free (reply->msg); - GNUNET_free (reply); return 0; } - - if (size >= ntohs (reply->msg->size)) + off = 0; + while ( (NULL != (reply = client->pending_head)) && + (size >= off + (msize = ntohs (reply->msg->size)))) { + GNUNET_CONTAINER_DLL_remove (client->pending_head, + client->pending_tail, + reply); + memcpy (&cbuf[off], reply->msg, msize); + GNUNET_free (reply->msg); + GNUNET_free (reply); + off += msize; + } #if DEBUG_DHT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "`%s': Copying reply to buffer, REALLY SENT\n", "DHT"); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "`%s': Copying reply to buffer, REALLY SENT\n", "DHT"); #endif - memcpy (buf, reply->msg, ntohs (reply->msg->size)); - - ret = ntohs (reply->msg->size); - } - else - ret = 0; - - client->pending_head = reply->next; - GNUNET_free (reply->msg); - GNUNET_free (reply); - - GNUNET_SCHEDULER_add_now (sched, &process_pending_messages, client); - return ret; + process_pending_messages (client); + return off; } + /** * Add a PendingMessage to the clients list of messages to be sent * @@ -343,36 +276,14 @@ static void add_pending_message (struct ClientList *client, struct PendingMessage *pending_message) { - struct PendingMessage *pos; - struct PendingMessage *prev; - - pos = client->pending_head; - -#if DEBUG_DHT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "`%s': Adding pending message for client.\n", "DHT"); -#endif - - if (pos == NULL) - { - client->pending_head = pending_message; - } - else /* This means another request is already queued, rely on send_reply to process all pending messages */ - { - while (pos != NULL) /* Find end of list */ - { - prev = pos; - pos = pos->next; - } - - GNUNET_assert (prev != NULL); - prev->next = pending_message; - } - - GNUNET_SCHEDULER_add_now (sched, &process_pending_messages, client); - + GNUNET_CONTAINER_DLL_insert_after (client->pending_head, + client->pending_tail, + client->pending_tail, + pending_message); + process_pending_messages (client); } + /** * Called when a reply needs to be sent to a client, either as * a result it found to a GET or FIND PEER request. @@ -383,13 +294,12 @@ add_pending_message (struct ClientList *client, */ static void send_reply_to_client (struct ClientList *client, - struct GNUNET_MessageHeader *message, + const struct GNUNET_MessageHeader *message, unsigned long long uid) { struct GNUNET_DHT_Message *reply; struct PendingMessage *pending_message; - - size_t msize; + uint16_t msize; size_t tsize; #if DEBUG_DHT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, @@ -397,17 +307,20 @@ send_reply_to_client (struct ClientList *client, #endif msize = ntohs (message->size); tsize = sizeof (struct GNUNET_DHT_Message) + msize; + if (tsize >= GNUNET_SERVER_MAX_MESSAGE_SIZE) + { + GNUNET_BREAK_op (0); + return; + } reply = GNUNET_malloc (tsize); reply->header.type = htons (GNUNET_MESSAGE_TYPE_DHT); reply->header.size = htons (tsize); if (uid != 0) - reply->unique = htons (GNUNET_YES); + reply->unique = htonl (GNUNET_YES); // ???? reply->unique_id = GNUNET_htonll (uid); memcpy (&reply[1], message, msize); - - pending_message = GNUNET_malloc (sizeof (struct PendingMessage)); + pending_message = GNUNET_malloc (sizeof (struct PendingMessage)); // inline pending_message->msg = &reply->header; - add_pending_message (client, pending_message); } @@ -447,57 +360,53 @@ datacache_get_iterator (void *cls, memcpy (&get_result->key, key, sizeof (GNUNET_HashCode)); get_result->type = htons (type); memcpy (&get_result[1], data, size); - send_reply_to_client (datacache_get_ctx->client, &get_result->header, datacache_get_ctx->unique_id); - GNUNET_free (get_result); return GNUNET_OK; } + /** * Server handler for initiating local dht get requests * * @param cls closure for service - * @param get_msg the actual get message + * @param msg the actual get message * @param message_context struct containing pertinent information about the get request - * */ static void -handle_dht_get (void *cls, struct GNUNET_DHT_GetMessage *get_msg, +handle_dht_get (void *cls, + const struct GNUNET_MessageHeader *msg, struct DHT_MessageContext *message_context) { - size_t get_type; + const struct GNUNET_DHT_GetMessage *get_msg; + uint16_t get_type; unsigned int results; - struct DatacacheGetContext *datacache_get_context; + struct DatacacheGetContext datacache_get_context; - GNUNET_assert (ntohs (get_msg->header.size) >= - sizeof (struct GNUNET_DHT_GetMessage)); + if (ntohs (msg->header.size) != sizeof (struct GNUNET_DHT_GetMessage)) + { + GNUNET_break (0); + return; + } + get_msg = (const struct GNUNET_DHT_GetMessage *) msg; get_type = ntohs (get_msg->type); - #if DEBUG_DHT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "`%s': Received `%s' request from client, message type %d, key %s, uid %llu\n", + "`%s': Received `%s' request from client, message type %u, key %s, uid %llu\n", "DHT", "GET", get_type, GNUNET_h2s (message_context->key), message_context->unique_id); #endif - - datacache_get_context = GNUNET_malloc (sizeof (struct DatacacheGetContext)); - datacache_get_context->client = message_context->client; - datacache_get_context->unique_id = message_context->unique_id; - + datacache_get_context.client = message_context->client; + datacache_get_context.unique_id = message_context->unique_id; results = 0; if (datacache != NULL) results = GNUNET_DATACACHE_get (datacache, message_context->key, get_type, - &datacache_get_iterator, datacache_get_context); - + &datacache_get_iterator, &datacache_get_context); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "`%s': Found %d results for local `%s' request\n", "DHT", results, "GET"); - - GNUNET_free (datacache_get_context); - /* FIXME: Implement get functionality here */ } @@ -510,12 +419,14 @@ handle_dht_get (void *cls, struct GNUNET_DHT_GetMessage *get_msg, * */ static void -handle_dht_find_peer (void *cls, struct GNUNET_DHT_FindPeerMessage *find_msg, +handle_dht_find_peer (void *cls, + const struct GNUNET_MessageHeader *find_msg, struct DHT_MessageContext *message_context) { struct GNUNET_DHT_FindPeerResultMessage *find_peer_result; size_t hello_size; size_t tsize; + #if DEBUG_DHT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "`%s': Received `%s' request from client, key %s (msg size %d, we expected %d)\n", @@ -523,10 +434,6 @@ handle_dht_find_peer (void *cls, struct GNUNET_DHT_FindPeerMessage *find_msg, ntohs (find_msg->header.size), sizeof (struct GNUNET_DHT_FindPeerMessage)); #endif - - GNUNET_assert (ntohs (find_msg->header.size) >= - sizeof (struct GNUNET_DHT_FindPeerMessage)); - if (my_hello == NULL) { #if DEBUG_DHT @@ -534,23 +441,18 @@ handle_dht_find_peer (void *cls, struct GNUNET_DHT_FindPeerMessage *find_msg, "`%s': Our HELLO is null, can't return.\n", "DHT"); #endif - return; } - /* Simplistic find_peer functionality, always return our hello */ hello_size = ntohs(my_hello->size); tsize = hello_size + sizeof (struct GNUNET_DHT_FindPeerResultMessage); + // check tsize < MAX find_peer_result = GNUNET_malloc (tsize); find_peer_result->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_FIND_PEER_RESULT); find_peer_result->header.size = htons (tsize); - find_peer_result->data_size = htons (hello_size); - memcpy(&find_peer_result->peer, &my_identity, sizeof(struct GNUNET_PeerIdentity)); memcpy (&find_peer_result[1], &my_hello, hello_size); - send_reply_to_client(message_context->client, &find_peer_result->header, message_context->unique_id); GNUNET_free(find_peer_result); - /* FIXME: Implement find peer functionality here */ } @@ -562,43 +464,32 @@ handle_dht_find_peer (void *cls, struct GNUNET_DHT_FindPeerMessage *find_msg, * @param message_context struct containing pertinent information about the request */ static void -handle_dht_put (void *cls, struct GNUNET_DHT_PutMessage *put_msg, +handle_dht_put (void *cls, + const struct GNUNET_MessageHeader *msg, struct DHT_MessageContext *message_context) { + struct GNUNET_DHT_PutMessage *put_msg; size_t put_type; size_t data_size; - GNUNET_assert (ntohs (put_msg->header.size) >= + GNUNET_assert (ntohs (msg->header.size) >= sizeof (struct GNUNET_DHT_PutMessage)); - - put_type = ntohs (put_msg->type); - data_size = ntohs (put_msg->data_size); + put_msg = (struct GNUNET_DHT_PutMessage *)msg; + put_type = ntohl (put_msg->type); + data_size = ntohs (put_msg->header.size) - sizeof (struct GNUNET_DHT_PutMessage); #if DEBUG_DHT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "`%s': %s msg total size is %d, data size %d, struct size %d\n", "DHT", "PUT", ntohs (put_msg->header.size), data_size, sizeof (struct GNUNET_DHT_PutMessage)); -#endif - GNUNET_assert (ntohs (put_msg->header.size) == - sizeof (struct GNUNET_DHT_PutMessage) + data_size); - -#if DEBUG_DHT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "`%s': Received `%s' request from client, message type %d, key %s\n", "DHT", "PUT", put_type, GNUNET_h2s (message_context->key)); #endif - - /** - * Simplest DHT functionality, store any message we receive a put request for. - */ if (datacache != NULL) GNUNET_DATACACHE_put (datacache, message_context->key, data_size, (char *) &put_msg[1], put_type, put_msg->expiration); - /** - * FIXME: Implement dht put request functionality here! - */ - } @@ -626,8 +517,6 @@ find_active_client (struct GNUNET_SERVER_Client *client) ret->client_handle = client; ret->next = client_list; client_list = ret; - ret->pending_head = NULL; - return ret; } @@ -679,13 +568,12 @@ static void handle_dht_start_message (void *cls, struct GNUNET_SERVER_Client *client, const struct GNUNET_MessageHeader *message) { - struct GNUNET_DHT_Message *dht_msg = (struct GNUNET_DHT_Message *) message; - struct GNUNET_MessageHeader *enc_msg; + const struct GNUNET_DHT_Message *dht_msg = (const struct GNUNET_DHT_Message *) message; + const struct GNUNET_MessageHeader *enc_msg; struct DHT_MessageContext *message_context; - size_t enc_type; - enc_msg = (struct GNUNET_MessageHeader *) &dht_msg[1]; + enc_msg = (const struct GNUNET_MessageHeader *) &dht_msg[1]; enc_type = ntohs (enc_msg->type); @@ -700,31 +588,32 @@ handle_dht_start_message (void *cls, struct GNUNET_SERVER_Client *client, message_context->client = find_active_client (client); message_context->key = &dht_msg->key; message_context->unique_id = GNUNET_ntohll (dht_msg->unique_id); - message_context->replication = ntohs (dht_msg->desired_replication_level); - message_context->msg_options = ntohs (dht_msg->options); + message_context->replication = ntohl (dht_msg->desired_replication_level); + message_context->msg_options = ntohl (dht_msg->options); + /* FIXME: Implement *remote* DHT operations here (forward request) */ + /* FIXME: *IF* handling should be local, then do this: */ switch (enc_type) { case GNUNET_MESSAGE_TYPE_DHT_GET: - handle_dht_get (cls, (struct GNUNET_DHT_GetMessage *) enc_msg, + handle_dht_get (cls, enc_msg, message_context); break; case GNUNET_MESSAGE_TYPE_DHT_PUT: - handle_dht_put (cls, (struct GNUNET_DHT_PutMessage *) enc_msg, + handle_dht_put (cls, enc_msg, message_context); send_client_receipt_confirmation (client, GNUNET_ntohll (dht_msg->unique_id)); break; case GNUNET_MESSAGE_TYPE_DHT_FIND_PEER: handle_dht_find_peer (cls, - (struct GNUNET_DHT_FindPeerMessage *) enc_msg, + enc_msg, message_context); break; default: GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "`%s': Message type (%d) not handled\n", "DHT", enc_type); } - GNUNET_free (message_context); GNUNET_SERVER_receive_done (client, GNUNET_OK); @@ -745,75 +634,55 @@ static void handle_dht_stop_message (void *cls, struct GNUNET_SERVER_Client *client, const struct GNUNET_MessageHeader *message) { - struct GNUNET_DHT_StopMessage *dht_stop_msg = - (struct GNUNET_DHT_StopMessage *) message; + const struct GNUNET_DHT_StopMessage *dht_stop_msg = + (const struct GNUNET_DHT_StopMessage *) message; #if DEBUG_DHT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "`%s': Received `%s' request from client, uid %llu\n", "DHT", "GENERIC STOP", GNUNET_ntohll (dht_stop_msg->unique_id)); #endif - - /* TODO: Put in demultiplexing here */ - - send_client_receipt_confirmation (client, - GNUNET_ntohll (dht_stop_msg->unique_id)); + /* TODO: actually stop... */ GNUNET_SERVER_receive_done (client, GNUNET_OK); } /** - * Core handler for p2p dht get requests. + * Core handler for p2p route requests. */ static int -handle_dht_p2p_get (void *cls, - const struct GNUNET_PeerIdentity *peer, - const struct GNUNET_MessageHeader *message, - struct GNUNET_TIME_Relative latency, uint32_t distance) +handle_dht_p2p_route_request (void *cls, + const struct GNUNET_PeerIdentity *peer, + const struct GNUNET_MessageHeader *message, + struct GNUNET_TIME_Relative latency, uint32_t distance) { #if DEBUG_DHT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "`%s': Received `%s' request from another peer\n", "DHT", "GET"); #endif - + // FIXME: setup tracking for sending replies to peer (with timeout) + // FIXME: call code from handle_dht_start_message (refactor...) return GNUNET_YES; } -/** - * Core handler for p2p dht put requests. - */ -static int -handle_dht_p2p_put (void *cls, - const struct GNUNET_PeerIdentity *peer, - const struct GNUNET_MessageHeader *message, - struct GNUNET_TIME_Relative latency, uint32_t distance) -{ -#if DEBUG_DHT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "`%s': Received `%s' request from another peer\n", "DHT", - "PUT"); -#endif - - return GNUNET_YES; -} /** - * Core handler for p2p dht find peer requests. + * Core handler for p2p route results. */ static int -handle_dht_p2p_find_peer (void *cls, - const struct GNUNET_PeerIdentity *peer, - const struct GNUNET_MessageHeader *message, - struct GNUNET_TIME_Relative latency, - uint32_t distance) +handle_dht_p2p_route_result (void *cls, + const struct GNUNET_PeerIdentity *peer, + const struct GNUNET_MessageHeader *message, + struct GNUNET_TIME_Relative latency, uint32_t distance) { #if DEBUG_DHT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "`%s': Received `%s' request from another peer\n", "DHT", - "FIND PEER"); + "GET"); #endif - + // FIXME: setup tracking for sending replies to peer + // FIXME: possibly call code from handle_dht_stop_message? (unique result?) (refactor...) return GNUNET_YES; } @@ -898,6 +767,20 @@ core_init (void *cls, } +static struct GNUNET_SERVER_MessageHandler plugin_handlers[] = { + {&handle_dht_start_message, NULL, GNUNET_MESSAGE_TYPE_DHT, 0}, + {&handle_dht_stop_message, NULL, GNUNET_MESSAGE_TYPE_DHT_STOP, 0}, + {NULL, NULL, 0, 0} +}; + + +static struct GNUNET_CORE_MessageHandler core_handlers[] = { + {&handle_dht_p2p_route_request, GNUNET_MESSAGE_TYPE_DHT_ROUTE_REQUEST, 0}, + {&handle_dht_p2p_route_result, GNUNET_MESSAGE_TYPE_DHT_ROUTE_RESULT, 0}, + {NULL, 0, 0} +}; + + /** * Process dht requests. * @@ -914,16 +797,11 @@ run (void *cls, { sched = scheduler; cfg = c; - datacache = GNUNET_DATACACHE_create (sched, cfg, "dhtcache"); - - client_transmit_timeout = - GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5); GNUNET_SERVER_add_handlers (server, plugin_handlers); - coreAPI = GNUNET_CORE_connect (sched, /* Main scheduler */ cfg, /* Main configuration */ - client_transmit_timeout, /* Delay for connecting */ + GNUNET_TIME_UNIT_FOREVER_REL, NULL, /* FIXME: anything we want to pass around? */ &core_init, /* Call core_init once connected */ NULL, /* Don't care about pre-connects */ @@ -934,18 +812,13 @@ run (void *cls, NULL, /* Don't want notified about all outbound messages */ GNUNET_NO, /* For header only outbound notification */ core_handlers); /* Register these handlers */ - + if (coreAPI == NULL) + return; transport_handle = GNUNET_TRANSPORT_connect(sched, cfg, NULL, NULL, NULL, NULL); - if (transport_handle != NULL) GNUNET_TRANSPORT_get_hello (transport_handle, &process_hello, NULL); else GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "Failed to connect to transport service!\n"); - - - if (coreAPI == NULL) - return; - /* Scheduled the task to clean up when shutdown is called */ cleanup_task = GNUNET_SCHEDULER_add_delayed (sched, GNUNET_TIME_UNIT_FOREVER_REL, diff --git a/src/include/gnunet_dht_service.h b/src/include/gnunet_dht_service.h index f4c68bab6..d14606d70 100644 --- a/src/include/gnunet_dht_service.h +++ b/src/include/gnunet_dht_service.h @@ -28,6 +28,7 @@ #define GNUNET_DHT_SERVICE_H #include "gnunet_util_lib.h" +#include "gnunet_hello_lib.h" #ifdef __cplusplus extern "C" @@ -145,15 +146,16 @@ typedef void (*GNUNET_DHT_GetIterator)(void *cls, * * @param handle handle to the DHT service * @param timeout timeout for this request to be sent to the - * service - * @param type expected type of the response object + * service (this is NOT a timeout for receiving responses) + * @param type expected type of the response object (GNUNET_DATASTORE_BLOCKTYPE_*) * @param key the key to look up * @param iter function to call on each result * @param iter_cls closure for iter - * @param cont continuation to call once message sent + * @param cont continuation to call once message sent (and it is now + * safe to do another operation on the DHT) * @param cont_cls closure for continuation - * - * @return handle to stop the async get, NULL on error + * @return handle to stop the async get, NULL on error (two + * concurrent operations scheduled) */ struct GNUNET_DHT_GetHandle * GNUNET_DHT_get_start (struct GNUNET_DHT_Handle *handle, @@ -165,13 +167,18 @@ GNUNET_DHT_get_start (struct GNUNET_DHT_Handle *handle, GNUNET_SCHEDULER_Task cont, void *cont_cls); + /** * Stop async DHT-get. Frees associated resources. * * @param get_handle GET operation to stop. + * @param cont continuation to call once this message is sent to the service + * @param cont_cls closure for the continuation */ void -GNUNET_DHT_get_stop (struct GNUNET_DHT_GetHandle *get_handle, GNUNET_SCHEDULER_Task cont, void *cont_cls); +GNUNET_DHT_get_stop (struct GNUNET_DHT_GetHandle *get_handle, + GNUNET_SCHEDULER_Task cont, + void *cont_cls); /** @@ -197,11 +204,10 @@ enum GNUNET_DHT_RouteOption * operation * * @param cls closure - * @param reply response + * @param peer hello of a target (peer near key) */ typedef void (*GNUNET_DHT_FindPeerProcessor)(void *cls, - const struct GNUNET_PeerIdentity *peer, - const struct GNUNET_MessageHeader *reply); + const struct GNUNET_HELLO_Message *peer); /** @@ -211,43 +217,50 @@ typedef void (*GNUNET_DHT_FindPeerProcessor)(void *cls, * @param timeout timeout for this request to be sent to the * service * @param options routing options for this message - * @param message a message to inject at found peers (may be null) * @param key the key to look up * @param proc function to call on each result * @param proc_cls closure for proc * @param cont continuation to call once message sent * @param cont_cls closure for continuation - * * @return handle to stop the async get, NULL on error */ struct GNUNET_DHT_FindPeerHandle * GNUNET_DHT_find_peer_start (struct GNUNET_DHT_Handle *handle, - struct GNUNET_TIME_Relative timeout, - enum GNUNET_DHT_RouteOption options, - struct GNUNET_MessageHeader *message, - const GNUNET_HashCode * key, - GNUNET_DHT_FindPeerProcessor proc, - void *proc_cls, - GNUNET_SCHEDULER_Task cont, - void *cont_cls); + struct GNUNET_TIME_Relative timeout, + enum GNUNET_DHT_RouteOption options, + const GNUNET_HashCode * key, + GNUNET_DHT_FindPeerProcessor proc, + void *proc_cls, + GNUNET_SCHEDULER_Task cont, + void *cont_cls); + /** * Stop async find peer. Frees associated resources. * * @param find_peer_handle GET operation to stop. + * @param cont continuation to call once this message is sent to the service + * @param cont_cls closure for the continuation */ void -GNUNET_DHT_find_peer_stop (struct GNUNET_DHT_FindPeerHandle *find_peer_handle, GNUNET_SCHEDULER_Task cont, void *cont_cls); +GNUNET_DHT_find_peer_stop (struct GNUNET_DHT_FindPeerHandle *find_peer_handle, + GNUNET_SCHEDULER_Task cont, + void *cont_cls); + /** * Iterator called on each result obtained from a generic route * operation + * + * @param cls closure + * @param reply response */ typedef void (*GNUNET_DHT_ReplyProcessor)(void *cls, const struct GNUNET_MessageHeader *reply); + /** - * Perform an asynchronous FIND_PEER operation on the DHT. + * Perform an asynchronous ROUTE_START operation on the DHT. * * @param handle handle to the DHT service * @param key the key to look up @@ -261,7 +274,6 @@ typedef void (*GNUNET_DHT_ReplyProcessor)(void *cls, * to wait for transmission to the service * @param iter function to call on each result, NULL if no replies are expected * @param iter_cls closure for iter - * @param cont continuation to call when done, GNUNET_SYSERR if failed * GNUNET_OK otherwise * @param cont_cls closure for cont @@ -279,8 +291,18 @@ GNUNET_DHT_route_start (struct GNUNET_DHT_Handle *handle, GNUNET_SCHEDULER_Task cont, void *cont_cls); + +/** + * Stop async route stop. Frees associated resources. + * + * @param route_handle operation to stop. + * @param cont continuation to call once this message is sent to the service + * @param cont_cls closure for the continuation + */ void -GNUNET_DHT_route_stop (struct GNUNET_DHT_RouteHandle *route_handle, GNUNET_SCHEDULER_Task cont, void *cont_cls); +GNUNET_DHT_route_stop (struct GNUNET_DHT_RouteHandle *route_handle, + GNUNET_SCHEDULER_Task cont, + void *cont_cls); #if 0 /* keep Emacsens' auto-indent happy */