From: Christian Grothoff Date: Mon, 26 Sep 2016 18:53:28 +0000 (+0000) Subject: refactor DHT for new service API X-Git-Tag: initial-import-from-subversion-38251~196 X-Git-Url: https://git.librecmc.org/?a=commitdiff_plain;h=60ff113fe4e7bb71d5696063b9a9b81eba60a108;p=oweals%2Fgnunet.git refactor DHT for new service API --- diff --git a/src/dht/Makefile.am b/src/dht/Makefile.am index f44721094..f7dc5df6d 100644 --- a/src/dht/Makefile.am +++ b/src/dht/Makefile.am @@ -63,7 +63,6 @@ noinst_PROGRAMS = \ gnunet_service_dht_SOURCES = \ gnunet-service-dht.c gnunet-service-dht.h \ - gnunet-service-dht_clients.c gnunet-service-dht_clients.h \ gnunet-service-dht_datacache.c gnunet-service-dht_datacache.h \ gnunet-service-dht_hello.c gnunet-service-dht_hello.h \ gnunet-service-dht_nse.c gnunet-service-dht_nse.h \ diff --git a/src/dht/gnunet-service-dht.c b/src/dht/gnunet-service-dht.c index 1e3dd339d..a2ba2e8b0 100644 --- a/src/dht/gnunet-service-dht.c +++ b/src/dht/gnunet-service-dht.c @@ -33,7 +33,6 @@ #include "gnunet_dht_service.h" #include "gnunet_statistics_service.h" #include "gnunet-service-dht.h" -#include "gnunet-service-dht_clients.h" #include "gnunet-service-dht_datacache.h" #include "gnunet-service-dht_hello.h" #include "gnunet-service-dht_neighbours.h" @@ -46,6 +45,11 @@ */ struct GNUNET_STATISTICS_Handle *GDS_stats; +/** + * Handle for the service. + */ +struct GNUNET_SERVICE_Handle *GDS_service; + /** * Our handle to the BLOCK library. */ @@ -56,11 +60,6 @@ struct GNUNET_BLOCK_Context *GDS_block_context; */ const struct GNUNET_CONFIGURATION_Handle *GDS_cfg; -/** - * Handle to our server. - */ -struct GNUNET_SERVER_Handle *GDS_server; - /** * Our HELLO */ @@ -77,6 +76,10 @@ static struct GNUNET_TRANSPORT_HelloGetHandle *ghh; struct GNUNET_TIME_Relative hello_expiration; +/* Code shared between different DHT implementations */ +#include "gnunet-service-dht_clients.c" + + /** * Receive the HELLO from transport service, free current and replace * if necessary. @@ -90,7 +93,9 @@ process_hello (void *cls, { GNUNET_free_non_null (GDS_my_hello); GDS_my_hello = GNUNET_malloc (ntohs (message->size)); - GNUNET_memcpy (GDS_my_hello, message, ntohs (message->size)); + GNUNET_memcpy (GDS_my_hello, + message, + ntohs (message->size)); } @@ -133,17 +138,16 @@ shutdown_task (void *cls) * Process dht requests. * * @param cls closure - * @param server the initialized server * @param c configuration to use + * @param service the initialized service */ static void run (void *cls, - struct GNUNET_SERVER_Handle *server, - const struct GNUNET_CONFIGURATION_Handle *c) + const struct GNUNET_CONFIGURATION_Handle *c, + struct GNUNET_SERVICE_Handle *service) { GDS_cfg = c; - GDS_server = server; - GNUNET_SERVER_suspend (server); + GDS_service = service; if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_time (c, "transport", @@ -153,7 +157,10 @@ run (void *cls, hello_expiration = GNUNET_CONSTANTS_HELLO_ADDRESS_EXPIRATION; } GDS_block_context = GNUNET_BLOCK_context_create (GDS_cfg); - GDS_stats = GNUNET_STATISTICS_create ("dht", GDS_cfg); + GDS_stats = GNUNET_STATISTICS_create ("dht", + GDS_cfg); + GNUNET_SERVICE_suspend (GDS_service); + GDS_CLIENTS_init (); GDS_ROUTING_init (); GDS_NSE_init (); GDS_DATACACHE_init (); @@ -172,28 +179,10 @@ run (void *cls, } -/** - * The main function for the dht service. - * - * @param argc number of arguments from the command line - * @param argv command line arguments - * @return 0 ok, 1 on error - */ -int -main (int argc, - char *const *argv) -{ - int ret; - - ret = (GNUNET_OK == - GNUNET_SERVICE_run (argc, - argv, - "dht", - GNUNET_SERVICE_OPTION_NONE, - &run, - NULL)) ? 0 : 1; - GDS_CLIENTS_done (); - return ret; -} +/* Finally, define the main method */ +GDS_DHT_SERVICE_INIT(&run); + + + /* end of gnunet-service-dht.c */ diff --git a/src/dht/gnunet-service-dht.h b/src/dht/gnunet-service-dht.h index 4684c2324..bc7a48b5a 100644 --- a/src/dht/gnunet-service-dht.h +++ b/src/dht/gnunet-service-dht.h @@ -1,6 +1,6 @@ /* This file is part of GNUnet. - Copyright (C) 2011 GNUnet e.V. + Copyright (C) 2009-2016 GNUnet e.V. GNUnet is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published @@ -29,6 +29,7 @@ #include "gnunet_util_lib.h" #include "gnunet_statistics_service.h" #include "gnunet_transport_service.h" +#include "gnunet_block_lib.h" #define DEBUG_DHT GNUNET_EXTRA_LOGGING @@ -37,6 +38,11 @@ */ extern const struct GNUNET_CONFIGURATION_Handle *GDS_cfg; +/** + * Handle for the service. + */ +extern struct GNUNET_SERVICE_Handle *GDS_service; + /** * Our handle to the BLOCK library. */ @@ -48,14 +54,111 @@ extern struct GNUNET_BLOCK_Context *GDS_block_context; extern struct GNUNET_STATISTICS_Handle *GDS_stats; /** - * Handle to our server. + * Our HELLO */ -extern struct GNUNET_SERVER_Handle *GDS_server; +extern struct GNUNET_MessageHeader *GDS_my_hello; + + /** - * Our HELLO + * Handle a reply we've received from another peer. If the reply + * matches any of our pending queries, forward it to the respective + * client(s). + * + * @param expiration when will the reply expire + * @param key the query this reply is for + * @param get_path_length number of peers in @a get_path + * @param get_path path the reply took on get + * @param put_path_length number of peers in @a put_path + * @param put_path path the reply took on put + * @param type type of the reply + * @param data_size number of bytes in @a data + * @param data application payload data */ -extern struct GNUNET_MessageHeader *GDS_my_hello; +void +GDS_CLIENTS_handle_reply (struct GNUNET_TIME_Absolute expiration, + const struct GNUNET_HashCode *key, + unsigned int get_path_length, + const struct GNUNET_PeerIdentity *get_path, + unsigned int put_path_length, + const struct GNUNET_PeerIdentity *put_path, + enum GNUNET_BLOCK_Type type, size_t data_size, + const void *data); + +/** + * Check if some client is monitoring GET messages and notify + * them in that case. + * + * @param options Options, for instance RecordRoute, DemultiplexEverywhere. + * @param type The type of data in the request. + * @param hop_count Hop count so far. + * @param path_length number of entries in path (or 0 if not recorded). + * @param path peers on the GET path (or NULL if not recorded). + * @param desired_replication_level Desired replication level. + * @param key Key of the requested data. + */ +void +GDS_CLIENTS_process_get (uint32_t options, + enum GNUNET_BLOCK_Type type, + uint32_t hop_count, + uint32_t desired_replication_level, + unsigned int path_length, + const struct GNUNET_PeerIdentity *path, + const struct GNUNET_HashCode *key); + + +/** + * Check if some client is monitoring GET RESP messages and notify + * them in that case. + * + * @param type The type of data in the result. + * @param get_path Peers on GET path (or NULL if not recorded). + * @param get_path_length number of entries in @a get_path. + * @param put_path peers on the PUT path (or NULL if not recorded). + * @param put_path_length number of entries in @a get_path. + * @param exp Expiration time of the data. + * @param key Key of the @a data. + * @param data Pointer to the result data. + * @param size Number of bytes in @a data. + */ +void +GDS_CLIENTS_process_get_resp (enum GNUNET_BLOCK_Type type, + const struct GNUNET_PeerIdentity *get_path, + unsigned int get_path_length, + const struct GNUNET_PeerIdentity *put_path, + unsigned int put_path_length, + struct GNUNET_TIME_Absolute exp, + const struct GNUNET_HashCode * key, + const void *data, + size_t size); + + +/** + * Check if some client is monitoring PUT messages and notify + * them in that case. + * + * @param options Options, for instance RecordRoute, DemultiplexEverywhere. + * @param type The type of data in the request. + * @param hop_count Hop count so far. + * @param path_length number of entries in path (or 0 if not recorded). + * @param path peers on the PUT path (or NULL if not recorded). + * @param desired_replication_level Desired replication level. + * @param exp Expiration time of the data. + * @param key Key under which data is to be stored. + * @param data Pointer to the data carried. + * @param size Number of bytes in data. + */ +void +GDS_CLIENTS_process_put (uint32_t options, + enum GNUNET_BLOCK_Type type, + uint32_t hop_count, + uint32_t desired_replication_level, + unsigned int path_length, + const struct GNUNET_PeerIdentity *path, + struct GNUNET_TIME_Absolute exp, + const struct GNUNET_HashCode * key, + const void *data, + size_t size); #endif diff --git a/src/dht/gnunet-service-dht_clients.c b/src/dht/gnunet-service-dht_clients.c index 9dbeef6bd..0e344b566 100644 --- a/src/dht/gnunet-service-dht_clients.c +++ b/src/dht/gnunet-service-dht_clients.c @@ -1,6 +1,6 @@ /* This file is part of GNUnet. - Copyright (C) 2009, 2010, 2011 GNUnet e.V. + Copyright (C) 2009, 2010, 2011, 2016 GNUnet e.V. GNUnet is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published @@ -30,7 +30,6 @@ #include "gnunet_protocols.h" #include "gnunet_statistics_service.h" #include "gnunet-service-dht.h" -#include "gnunet-service-dht_clients.h" #include "gnunet-service-dht_datacache.h" #include "gnunet-service-dht_neighbours.h" #include "dht.h" @@ -43,70 +42,13 @@ #define LOG(kind,...) GNUNET_log_from (kind, "dht-clients",__VA_ARGS__) -/** - * Linked list of messages to send to clients. - */ -struct PendingMessage -{ - /** - * Pointer to next item in the list - */ - struct PendingMessage *next; - - /** - * Pointer to previous item in the list - */ - struct PendingMessage *prev; - - /** - * Actual message to be sent, allocated at the end of the struct: - * // msg = (cast) &pm[1]; - * // GNUNET_memcpy (&pm[1], data, len); - */ - const struct GNUNET_MessageHeader *msg; - -}; - /** * Struct containing information about a client, * handle to connect to it, and any pending messages * that need to be sent to it. */ -struct ClientList -{ - /** - * Linked list of active clients - */ - struct ClientList *next; - - /** - * Linked list of active clients - */ - struct ClientList *prev; - - /** - * The handle to this client - */ - struct GNUNET_SERVER_Client *client_handle; - - /** - * Handle to the current transmission request, NULL - * if none pending. - */ - struct GNUNET_SERVER_TransmitHandle *transmit_handle; - - /** - * Linked list of pending messages for this client - */ - struct PendingMessage *pending_head; - - /** - * Tail of linked list of pending messages for this client - */ - struct PendingMessage *pending_tail; - -}; +struct ClientHandle; /** @@ -120,10 +62,20 @@ struct ClientQueryRecord */ struct GNUNET_HashCode key; + /** + * Kept in a DLL with @e client. + */ + struct ClientQueryRecord *next; + + /** + * Kept in a DLL with @e client. + */ + struct ClientQueryRecord *prev; + /** * Client responsible for the request. */ - struct ClientList *client; + struct ClientHandle *ch; /** * Extended query (see gnunet_block_lib.h), allocated at the end of this struct. @@ -193,22 +145,22 @@ struct ClientMonitorRecord /** * Next element in DLL. */ - struct ClientMonitorRecord *next; + struct ClientMonitorRecord *next; /** * Previous element in DLL. */ - struct ClientMonitorRecord *prev; + struct ClientMonitorRecord *prev; /** * Type of blocks that are of interest */ - enum GNUNET_BLOCK_Type type; + enum GNUNET_BLOCK_Type type; /** * Key of data of interest, NULL for all. */ - struct GNUNET_HashCode *key; + struct GNUNET_HashCode *key; /** * Flag whether to notify about GET messages. @@ -228,19 +180,39 @@ struct ClientMonitorRecord /** * Client to notify of these requests. */ - struct ClientList *client; + struct ClientHandle *ch; }; /** - * List of active clients. + * Struct containing information about a client, + * handle to connect to it, and any pending messages + * that need to be sent to it. */ -static struct ClientList *client_head; +struct ClientHandle +{ + /** + * Linked list of active queries of this client. + */ + struct ClientQueryRecord *cqr_head; + + /** + * Linked list of active queries of this client. + */ + struct ClientQueryRecord *cqr_tail; + + /** + * The handle to this client + */ + struct GNUNET_SERVICE_Client *client; + + /** + * The message queue to this client + */ + struct GNUNET_MQ_Handle *mq; + +}; -/** - * List of active clients. - */ -static struct ClientList *client_tail; /** * List of active monitoring requests. @@ -265,88 +237,55 @@ static struct GNUNET_CONTAINER_Heap *retry_heap; /** * Task that re-transmits requests (using retry_heap). */ -static struct GNUNET_SCHEDULER_Task * retry_task; +static struct GNUNET_SCHEDULER_Task *retry_task; /** - * Task run to check for messages that need to be sent to a client. + * Free data structures associated with the given query. * - * @param client a ClientList, containing the client and any messages to be sent to it + * @param record record to remove */ static void -process_pending_messages (struct ClientList *client); - - -/** - * Add a PendingMessage to the clients list of messages to be sent - * - * @param client the active client to send the message to - * @param pending_message the actual message to send - */ -static void -add_pending_message (struct ClientList *client, - struct PendingMessage *pending_message) +remove_client_record (struct ClientQueryRecord *record) { - GNUNET_CONTAINER_DLL_insert_tail (client->pending_head, client->pending_tail, - pending_message); - process_pending_messages (client); -} + struct ClientHandle *ch = record->ch; - -/** - * Find a client if it exists, add it otherwise. - * - * @param client the server handle to the client - * - * @return the client if found, a new client otherwise - */ -static struct ClientList * -find_active_client (struct GNUNET_SERVER_Client *client) -{ - struct ClientList *pos = client_head; - struct ClientList *ret; - - while (pos != NULL) - { - if (pos->client_handle == client) - return pos; - pos = pos->next; - } - ret = GNUNET_new (struct ClientList); - ret->client_handle = client; - GNUNET_CONTAINER_DLL_insert (client_head, client_tail, ret); - return ret; + GNUNET_CONTAINER_DLL_remove (ch->cqr_head, + ch->cqr_tail, + record); + GNUNET_assert (GNUNET_YES == + GNUNET_CONTAINER_multihashmap_remove (forward_map, + &record->key, + record)); + if (NULL != record->hnode) + GNUNET_CONTAINER_heap_remove_node (record->hnode); + GNUNET_array_grow (record->seen_replies, + record->seen_replies_count, + 0); + GNUNET_free (record); } /** - * Iterator over hash map entries that frees all entries - * associated with the given client. + * Functions with this signature are called whenever a local client is + * connects to us. * - * @param cls client to search for in source routes - * @param key current key code (ignored) - * @param value value in the hash map, a ClientQueryRecord - * @return #GNUNET_YES (we should continue to iterate) + * @param cls closure (NULL for dht) + * @param client identification of the client + * @param mq message queue for talking to @a client + * @return our `struct ClientHandle` for @a client */ -static int -remove_client_records (void *cls, const struct GNUNET_HashCode * key, void *value) +static void * +client_connect_cb (void *cls, + struct GNUNET_SERVICE_Client *client, + struct GNUNET_MQ_Handle *mq) { - struct ClientList *client = cls; - struct ClientQueryRecord *record = value; + struct ClientHandle *ch; - if (record->client != client) - return GNUNET_YES; - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Removing client %p's record for key %s\n", client, - GNUNET_h2s (key)); - GNUNET_assert (GNUNET_YES == - GNUNET_CONTAINER_multihashmap_remove (forward_map, key, - record)); - if (NULL != record->hnode) - GNUNET_CONTAINER_heap_remove_node (record->hnode); - GNUNET_array_grow (record->seen_replies, record->seen_replies_count, 0); - GNUNET_free (record); - return GNUNET_YES; + ch = GNUNET_new (struct ClientHandle); + ch->client = client; + ch->mq = mq; + return ch; } @@ -355,48 +294,44 @@ remove_client_records (void *cls, const struct GNUNET_HashCode * key, void *valu * is disconnected on the network level. * * @param cls closure (NULL for dht) - * @param client identification of the client; NULL - * for the last call when the server is destroyed + * @param client identification of the client + * @param app_ctx our `struct ClientHandle` for @a client */ static void -handle_client_disconnect (void *cls, - struct GNUNET_SERVER_Client *client) +client_disconnect_cb (void *cls, + struct GNUNET_SERVICE_Client *client, + void *app_ctx) { - struct ClientList *pos; - struct PendingMessage *reply; + struct ClientHandle *ch = app_ctx; + struct ClientQueryRecord *cqr; struct ClientMonitorRecord *monitor; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Local client %p disconnects\n", - client); - pos = find_active_client (client); - GNUNET_CONTAINER_DLL_remove (client_head, client_tail, pos); - if (pos->transmit_handle != NULL) - GNUNET_SERVER_notify_transmit_ready_cancel (pos->transmit_handle); - while (NULL != (reply = pos->pending_head)) - { - GNUNET_CONTAINER_DLL_remove (pos->pending_head, pos->pending_tail, reply); - GNUNET_free (reply); - } + ch); monitor = monitor_head; while (NULL != monitor) { - if (monitor->client == pos) + if (monitor->ch == ch) { struct ClientMonitorRecord *next; - GNUNET_free_non_null (monitor->key); next = monitor->next; - GNUNET_CONTAINER_DLL_remove (monitor_head, monitor_tail, monitor); + GNUNET_free_non_null (monitor->key); + GNUNET_CONTAINER_DLL_remove (monitor_head, + monitor_tail, + monitor); GNUNET_free (monitor); monitor = next; } else + { monitor = monitor->next; + } } - GNUNET_CONTAINER_multihashmap_iterate (forward_map, &remove_client_records, - pos); - GNUNET_free (pos); + while (NULL != (cqr = ch->cqr_head)) + remove_client_record (cqr); + GNUNET_free (ch); } @@ -413,27 +348,35 @@ transmit_request (struct ClientQueryRecord *cqr) struct GNUNET_CONTAINER_BloomFilter *peer_bf; GNUNET_STATISTICS_update (GDS_stats, - gettext_noop - ("# GET requests from clients injected"), 1, + gettext_noop ("# GET requests from clients injected"), + 1, GNUNET_NO); reply_bf_mutator = (int32_t) GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, UINT32_MAX); - reply_bf = - GNUNET_BLOCK_construct_bloomfilter (reply_bf_mutator, cqr->seen_replies, + reply_bf + = GNUNET_BLOCK_construct_bloomfilter (reply_bf_mutator, + cqr->seen_replies, cqr->seen_replies_count); - peer_bf = - GNUNET_CONTAINER_bloomfilter_init (NULL, DHT_BLOOM_SIZE, + peer_bf + = GNUNET_CONTAINER_bloomfilter_init (NULL, + DHT_BLOOM_SIZE, GNUNET_CONSTANTS_BLOOMFILTER_K); LOG (GNUNET_ERROR_TYPE_DEBUG, "Initiating GET for %s, replication %u, already have %u replies\n", GNUNET_h2s (&cqr->key), cqr->replication, cqr->seen_replies_count); - GDS_NEIGHBOURS_handle_get (cqr->type, cqr->msg_options, cqr->replication, + GDS_NEIGHBOURS_handle_get (cqr->type, + cqr->msg_options, + cqr->replication, 0 /* hop count */ , - &cqr->key, cqr->xquery, cqr->xquery_size, reply_bf, - reply_bf_mutator, peer_bf); + &cqr->key, + cqr->xquery, + cqr->xquery_size, + reply_bf, + reply_bf_mutator, + peer_bf); GNUNET_CONTAINER_bloomfilter_free (reply_bf); GNUNET_CONTAINER_bloomfilter_free (peer_bf); @@ -445,7 +388,7 @@ transmit_request (struct ClientQueryRecord *cqr) /** - * Task that looks at the 'retry_heap' and transmits all of the requests + * Task that looks at the #retry_heap and transmits all of the requests * on the heap that are ready for transmission. Then re-schedules * itself (unless the heap is empty). * @@ -465,51 +408,62 @@ transmit_next_request_task (void *cls) if (delay.rel_value_us > 0) { cqr->hnode = - GNUNET_CONTAINER_heap_insert (retry_heap, cqr, + GNUNET_CONTAINER_heap_insert (retry_heap, + cqr, cqr->retry_time.abs_value_us); retry_task = - GNUNET_SCHEDULER_add_delayed (delay, &transmit_next_request_task, + GNUNET_SCHEDULER_add_delayed (delay, + &transmit_next_request_task, NULL); return; } transmit_request (cqr); - cqr->hnode = - GNUNET_CONTAINER_heap_insert (retry_heap, cqr, + cqr->hnode + = GNUNET_CONTAINER_heap_insert (retry_heap, cqr, cqr->retry_time.abs_value_us); } } +/** + * Check DHT PUT messages from the client. + * + * @param cls the client we received this message from + * @param dht_msg the actual message received + * @return #GNUNET_OK (always) + */ +static int +check_dht_local_put (void *cls, + const struct GNUNET_DHT_ClientPutMessage *dht_msg) +{ + /* always well-formed */ + return GNUNET_OK; +} + + /** * Handler for PUT messages. * - * @param cls closure for the service - * @param client the client we received this message from - * @param message the actual message received + * @param cls the client we received this message from + * @param dht_msg the actual message received */ static void -handle_dht_local_put (void *cls, struct GNUNET_SERVER_Client *client, - const struct GNUNET_MessageHeader *message) +handle_dht_local_put (void *cls, + const struct GNUNET_DHT_ClientPutMessage *dht_msg) { - const struct GNUNET_DHT_ClientPutMessage *dht_msg; + struct ClientHandle *ch = cls; struct GNUNET_CONTAINER_BloomFilter *peer_bf; uint16_t size; - struct PendingMessage *pm; + struct GNUNET_MQ_Envelope *env; struct GNUNET_DHT_ClientPutConfirmationMessage *conf; - size = ntohs (message->size); - if (size < sizeof (struct GNUNET_DHT_ClientPutMessage)) - { - GNUNET_break (0); - GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); - return; - } + size = ntohs (dht_msg->header.size); GNUNET_STATISTICS_update (GDS_stats, - gettext_noop - ("# PUT requests received from clients"), 1, + gettext_noop ("# PUT requests received from clients"), + 1, GNUNET_NO); - dht_msg = (const struct GNUNET_DHT_ClientPutMessage *) message; - LOG_TRAFFIC (GNUNET_ERROR_TYPE_DEBUG, "R5N CLIENT-PUT %s\n", + LOG_TRAFFIC (GNUNET_ERROR_TYPE_DEBUG, + "CLIENT-PUT %s\n", GNUNET_h2s_full (&dht_msg->key)); /* give to local clients */ LOG (GNUNET_ERROR_TYPE_DEBUG, @@ -517,26 +471,38 @@ handle_dht_local_put (void *cls, struct GNUNET_SERVER_Client *client, size - sizeof (struct GNUNET_DHT_ClientPutMessage), GNUNET_h2s (&dht_msg->key)); GDS_CLIENTS_handle_reply (GNUNET_TIME_absolute_ntoh (dht_msg->expiration), - &dht_msg->key, 0, NULL, 0, NULL, + &dht_msg->key, + 0, + NULL, + 0, + NULL, ntohl (dht_msg->type), size - sizeof (struct GNUNET_DHT_ClientPutMessage), &dht_msg[1]); /* store locally */ GDS_DATACACHE_handle_put (GNUNET_TIME_absolute_ntoh (dht_msg->expiration), - &dht_msg->key, 0, NULL, ntohl (dht_msg->type), + &dht_msg->key, + 0, + NULL, + ntohl (dht_msg->type), size - sizeof (struct GNUNET_DHT_ClientPutMessage), &dht_msg[1]); /* route to other peers */ - peer_bf = - GNUNET_CONTAINER_bloomfilter_init (NULL, DHT_BLOOM_SIZE, + peer_bf + = GNUNET_CONTAINER_bloomfilter_init (NULL, + DHT_BLOOM_SIZE, GNUNET_CONSTANTS_BLOOMFILTER_K); - GDS_NEIGHBOURS_handle_put (ntohl (dht_msg->type), ntohl (dht_msg->options), + GDS_NEIGHBOURS_handle_put (ntohl (dht_msg->type), + ntohl (dht_msg->options), ntohl (dht_msg->desired_replication_level), GNUNET_TIME_absolute_ntoh (dht_msg->expiration), 0 /* hop count */ , - peer_bf, &dht_msg->key, 0, NULL, &dht_msg[1], - size - - sizeof (struct GNUNET_DHT_ClientPutMessage)); + peer_bf, + &dht_msg->key, + 0, + NULL, + &dht_msg[1], + size - sizeof (struct GNUNET_DHT_ClientPutMessage)); GDS_CLIENTS_process_put (ntohl (dht_msg->options), ntohl (dht_msg->type), 0, @@ -548,45 +514,50 @@ handle_dht_local_put (void *cls, struct GNUNET_SERVER_Client *client, &dht_msg[1], size - sizeof (struct GNUNET_DHT_ClientPutMessage)); GNUNET_CONTAINER_bloomfilter_free (peer_bf); - pm = GNUNET_malloc (sizeof (struct PendingMessage) + - sizeof (struct GNUNET_DHT_ClientPutConfirmationMessage)); - conf = (struct GNUNET_DHT_ClientPutConfirmationMessage *) &pm[1]; - conf->header.size = htons (sizeof (struct GNUNET_DHT_ClientPutConfirmationMessage)); - conf->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_CLIENT_PUT_OK); + env = GNUNET_MQ_msg (conf, + GNUNET_MESSAGE_TYPE_DHT_CLIENT_PUT_OK); conf->reserved = htonl (0); conf->unique_id = dht_msg->unique_id; - pm->msg = &conf->header; - add_pending_message (find_active_client (client), pm); - GNUNET_SERVER_receive_done (client, GNUNET_OK); + GNUNET_MQ_send (ch->mq, + env); + GNUNET_SERVICE_client_continue (ch->client); +} + + +/** + * Check DHT GET messages from the client. + * + * @param cls the client we received this message from + * @param message the actual message received + * @return #GNUNET_OK (always) + */ +static int +check_dht_local_get (void *cls, + const struct GNUNET_DHT_ClientGetMessage *get) +{ + /* always well-formed */ + return GNUNET_OK; } /** * Handler for DHT GET messages from the client. * - * @param cls closure for the service - * @param client the client we received this message from + * @param cls the client we received this message from * @param message the actual message received */ static void -handle_dht_local_get (void *cls, struct GNUNET_SERVER_Client *client, - const struct GNUNET_MessageHeader *message) +handle_dht_local_get (void *cls, + const struct GNUNET_DHT_ClientGetMessage *get) { - const struct GNUNET_DHT_ClientGetMessage *get; + struct ClientHandle *ch = cls; struct ClientQueryRecord *cqr; size_t xquery_size; const char *xquery; uint16_t size; - size = ntohs (message->size); - if (size < sizeof (struct GNUNET_DHT_ClientGetMessage)) - { - GNUNET_break (0); - GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); - return; - } + size = ntohs (get->header.size); xquery_size = size - sizeof (struct GNUNET_DHT_ClientGetMessage); - get = (const struct GNUNET_DHT_ClientGetMessage *) message; xquery = (const char *) &get[1]; GNUNET_STATISTICS_update (GDS_stats, gettext_noop @@ -594,15 +565,17 @@ handle_dht_local_get (void *cls, struct GNUNET_SERVER_Client *client, GNUNET_NO); LOG (GNUNET_ERROR_TYPE_DEBUG, "Received GET request for %s from local client %p, xq: %.*s\n", - GNUNET_h2s (&get->key), client, xquery_size, xquery); - - LOG_TRAFFIC (GNUNET_ERROR_TYPE_DEBUG, "R5N CLIENT-GET %s\n", + GNUNET_h2s (&get->key), + ch->client, + xquery_size, + xquery); + LOG_TRAFFIC (GNUNET_ERROR_TYPE_DEBUG, + "CLIENT-GET %s\n", GNUNET_h2s_full (&get->key)); - cqr = GNUNET_malloc (sizeof (struct ClientQueryRecord) + xquery_size); cqr->key = get->key; - cqr->client = find_active_client (client); + cqr->ch = ch; cqr->xquery = (void *) &cqr[1]; GNUNET_memcpy (&cqr[1], xquery, xquery_size); cqr->hnode = GNUNET_CONTAINER_heap_insert (retry_heap, cqr, 0); @@ -613,8 +586,12 @@ handle_dht_local_get (void *cls, struct GNUNET_SERVER_Client *client, cqr->replication = ntohl (get->desired_replication_level); cqr->msg_options = ntohl (get->options); cqr->type = ntohl (get->type); - // FIXME use cqr->key, set multihashmap create to GNUNET_YES - GNUNET_CONTAINER_multihashmap_put (forward_map, &get->key, cqr, + GNUNET_CONTAINER_DLL_insert (ch->cqr_head, + ch->cqr_tail, + cqr); + GNUNET_CONTAINER_multihashmap_put (forward_map, + &cqr->key, + cqr, GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); GDS_CLIENTS_process_get (ntohl (get->options), ntohl (get->type), @@ -635,13 +612,12 @@ handle_dht_local_get (void *cls, struct GNUNET_SERVER_Client *client, xquery_size, NULL, 0); - GNUNET_SERVER_receive_done (client, - GNUNET_OK); + GNUNET_SERVICE_client_continue (ch->client); } /** - * Closure for 'find_by_unique_id'. + * Closure for #find_by_unique_id(). */ struct FindByUniqueIdContext { @@ -661,8 +637,8 @@ struct FindByUniqueIdContext * * @param cls the search context * @param key query for the lookup (not used) - * @param value the 'struct ClientQueryRecord' - * @return GNUNET_YES to continue iteration (result not yet found) + * @param value the `struct ClientQueryRecord` + * @return #GNUNET_YES to continue iteration (result not yet found) */ static int find_by_unique_id (void *cls, @@ -679,18 +655,42 @@ find_by_unique_id (void *cls, } +/** + * Check "GET result seen" messages from the client. + * + * @param cls the client we received this message from + * @param message the actual message received + * @return #GNUNET_OK if @a seen is well-formed + */ +static int +check_dht_local_get_result_seen (void *cls, + const struct GNUNET_DHT_ClientGetResultSeenMessage *seen) +{ + uint16_t size; + unsigned int hash_count; + + size = ntohs (seen->header.size); + hash_count = (size - sizeof (struct GNUNET_DHT_ClientGetResultSeenMessage)) / sizeof (struct GNUNET_HashCode); + if (size != sizeof (struct GNUNET_DHT_ClientGetResultSeenMessage) + hash_count * sizeof (struct GNUNET_HashCode)) + { + GNUNET_break (0); + return GNUNET_SYSERR; + } + return GNUNET_OK; +} + + /** * Handler for "GET result seen" messages from the client. * - * @param cls closure for the service - * @param client the client we received this message from + * @param cls the client we received this message from * @param message the actual message received */ static void -handle_dht_local_get_result_seen (void *cls, struct GNUNET_SERVER_Client *client, - const struct GNUNET_MessageHeader *message) +handle_dht_local_get_result_seen (void *cls, + const struct GNUNET_DHT_ClientGetResultSeenMessage *seen) { - const struct GNUNET_DHT_ClientGetResultSeenMessage *seen; + struct ClientHandle *ch = cls; uint16_t size; unsigned int hash_count; unsigned int old_count; @@ -698,21 +698,8 @@ handle_dht_local_get_result_seen (void *cls, struct GNUNET_SERVER_Client *client struct FindByUniqueIdContext fui_ctx; struct ClientQueryRecord *cqr; - size = ntohs (message->size); - if (size < sizeof (struct GNUNET_DHT_ClientGetResultSeenMessage)) - { - GNUNET_break (0); - GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); - return; - } - seen = (const struct GNUNET_DHT_ClientGetResultSeenMessage *) message; + size = ntohs (seen->header.size); hash_count = (size - sizeof (struct GNUNET_DHT_ClientGetResultSeenMessage)) / sizeof (struct GNUNET_HashCode); - if (size != sizeof (struct GNUNET_DHT_ClientGetResultSeenMessage) + hash_count * sizeof (struct GNUNET_HashCode)) - { - GNUNET_break (0); - GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); - return; - } hc = (const struct GNUNET_HashCode*) &seen[1]; fui_ctx.unique_id = seen->unique_id; fui_ctx.cqr = NULL; @@ -723,7 +710,7 @@ handle_dht_local_get_result_seen (void *cls, struct GNUNET_SERVER_Client *client if (NULL == (cqr = fui_ctx.cqr)) { GNUNET_break (0); - GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); + GNUNET_SERVICE_client_drop (ch->client); return; } /* finally, update 'seen' list */ @@ -732,20 +719,20 @@ handle_dht_local_get_result_seen (void *cls, struct GNUNET_SERVER_Client *client cqr->seen_replies_count, cqr->seen_replies_count + hash_count); GNUNET_memcpy (&cqr->seen_replies[old_count], - hc, - sizeof (struct GNUNET_HashCode) * hash_count); + hc, + sizeof (struct GNUNET_HashCode) * hash_count); } /** - * Closure for 'remove_by_unique_id'. + * Closure for #remove_by_unique_id(). */ struct RemoveByUniqueIdContext { /** * Client that issued the removal request. */ - struct ClientList *client; + struct ClientHandle *ch; /** * Unique ID of the request. @@ -761,20 +748,24 @@ struct RemoveByUniqueIdContext * @param cls unique ID and client to search for in source routes * @param key current key code * @param value value in the hash map, a ClientQueryRecord - * @return GNUNET_YES (we should continue to iterate) + * @return #GNUNET_YES (we should continue to iterate) */ static int -remove_by_unique_id (void *cls, const struct GNUNET_HashCode * key, void *value) +remove_by_unique_id (void *cls, + const struct GNUNET_HashCode *key, + void *value) { const struct RemoveByUniqueIdContext *ctx = cls; - struct ClientQueryRecord *record = value; + struct ClientQueryRecord *cqr = value; - if (record->unique_id != ctx->unique_id) + if (cqr->unique_id != ctx->unique_id) return GNUNET_YES; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Removing client %p's record for key %s (by unique id)\n", - ctx->client->client_handle, GNUNET_h2s (key)); - return remove_client_records (ctx->client, key, record); + ctx->ch->client, + GNUNET_h2s (key)); + remove_client_record (cqr); + return GNUNET_YES; } @@ -782,18 +773,15 @@ remove_by_unique_id (void *cls, const struct GNUNET_HashCode * key, void *value) * Handler for any generic DHT stop messages, calls the appropriate handler * depending on message type (if processed locally) * - * @param cls closure for the service - * @param client the client we received this message from + * @param cls client we received this message from * @param message the actual message received * */ static void handle_dht_local_get_stop (void *cls, - struct GNUNET_SERVER_Client *client, - const struct GNUNET_MessageHeader *message) + const struct GNUNET_DHT_ClientGetStopMessage *dht_stop_msg) { - const struct GNUNET_DHT_ClientGetStopMessage *dht_stop_msg = - (const struct GNUNET_DHT_ClientGetStopMessage *) message; + struct ClientHandle *ch = cls; struct RemoveByUniqueIdContext ctx; GNUNET_STATISTICS_update (GDS_stats, @@ -803,179 +791,98 @@ handle_dht_local_get_stop (void *cls, LOG (GNUNET_ERROR_TYPE_DEBUG, "Received GET STOP request for %s from local client %p\n", GNUNET_h2s (&dht_stop_msg->key), - client); - ctx.client = find_active_client (client); + ch->client); + ctx.ch = ch; ctx.unique_id = dht_stop_msg->unique_id; - GNUNET_CONTAINER_multihashmap_get_multiple (forward_map, &dht_stop_msg->key, - &remove_by_unique_id, &ctx); - GNUNET_SERVER_receive_done (client, GNUNET_OK); + GNUNET_CONTAINER_multihashmap_get_multiple (forward_map, + &dht_stop_msg->key, + &remove_by_unique_id, + &ctx); + GNUNET_SERVICE_client_continue (ch->client); } /** * Handler for monitor start messages * - * @param cls closure for the service - * @param client the client we received this message from - * @param message the actual message received + * @param cls the client we received this message from + * @param msg the actual message received * */ static void -handle_dht_local_monitor (void *cls, struct GNUNET_SERVER_Client *client, - const struct GNUNET_MessageHeader *message) +handle_dht_local_monitor (void *cls, + const struct GNUNET_DHT_MonitorStartStopMessage *msg) { + struct ClientHandle *ch = cls; struct ClientMonitorRecord *r; - const struct GNUNET_DHT_MonitorStartStopMessage *msg; - msg = (struct GNUNET_DHT_MonitorStartStopMessage *) message; r = GNUNET_new (struct ClientMonitorRecord); - - r->client = find_active_client(client); - r->type = ntohl(msg->type); - r->get = ntohs(msg->get); - r->get_resp = ntohs(msg->get_resp); - r->put = ntohs(msg->put); - if (0 == ntohs(msg->filter_key)) - r->key = NULL; + r->ch = ch; + r->type = ntohl (msg->type); + r->get = ntohs (msg->get); + r->get_resp = ntohs (msg->get_resp); + r->put = ntohs (msg->put); + if (0 == ntohs (msg->filter_key)) + { + r->key = NULL; + } else { r->key = GNUNET_new (struct GNUNET_HashCode); - GNUNET_memcpy (r->key, &msg->key, sizeof (struct GNUNET_HashCode)); + GNUNET_memcpy (r->key, + &msg->key, + sizeof (struct GNUNET_HashCode)); } - GNUNET_CONTAINER_DLL_insert (monitor_head, monitor_tail, r); - GNUNET_SERVER_receive_done (client, GNUNET_OK); + GNUNET_CONTAINER_DLL_insert (monitor_head, + monitor_tail, + r); + GNUNET_SERVICE_client_continue (ch->client); } /** * Handler for monitor stop messages * - * @param cls closure for the service - * @param client the client we received this message from - * @param message the actual message received - * + * @param cls the client we received this message from + * @param msg the actual message received */ static void -handle_dht_local_monitor_stop (void *cls, struct GNUNET_SERVER_Client *client, - const struct GNUNET_MessageHeader *message) +handle_dht_local_monitor_stop (void *cls, + const struct GNUNET_DHT_MonitorStartStopMessage *msg) { + struct ClientHandle *ch = cls; struct ClientMonitorRecord *r; - const struct GNUNET_DHT_MonitorStartStopMessage *msg; int keys_match; - msg = (struct GNUNET_DHT_MonitorStartStopMessage *) message; - r = monitor_head; - - while (NULL != r) + GNUNET_SERVICE_client_continue (ch->client); + for (r = monitor_head; NULL != r; r = r->next) { if (NULL == r->key) - keys_match = (0 == ntohs(msg->filter_key)); + { + keys_match = (0 == ntohs(msg->filter_key)); + } else { - keys_match = (0 != ntohs(msg->filter_key) - && !memcmp(r->key, &msg->key, sizeof(struct GNUNET_HashCode))); + keys_match = ( (0 != ntohs(msg->filter_key)) && + (! memcmp (r->key, + &msg->key, + sizeof(struct GNUNET_HashCode))) ); } - if (find_active_client(client) == r->client - && ntohl(msg->type) == r->type - && r->get == msg->get - && r->get_resp == msg->get_resp - && r->put == msg->put - && keys_match - ) + if ( (ch == r->ch) && + (ntohl(msg->type) == r->type) && + (r->get == msg->get) && + (r->get_resp == msg->get_resp) && + (r->put == msg->put) && + keys_match ) { - GNUNET_CONTAINER_DLL_remove (monitor_head, monitor_tail, r); - GNUNET_free_non_null (r->key); - GNUNET_free (r); - GNUNET_SERVER_receive_done (client, GNUNET_OK); - return; /* Delete only ONE entry */ + GNUNET_CONTAINER_DLL_remove (monitor_head, + monitor_tail, + r); + GNUNET_free_non_null (r->key); + GNUNET_free (r); + return; /* Delete only ONE entry */ } - r = r->next; } - - GNUNET_SERVER_receive_done (client, GNUNET_OK); -} - - -/** - * Callback called as a result of issuing a GNUNET_SERVER_notify_transmit_ready - * request. A ClientList is passed as closure, take the head of the list - * and copy it into buf, which has the result of sending the message to the - * client. - * - * @param cls closure to this call - * @param size maximum number of bytes available to send - * @param buf where to copy the actual message to - * - * @return the number of bytes actually copied, 0 indicates failure - */ -static size_t -send_reply_to_client (void *cls, size_t size, void *buf) -{ - struct ClientList *client = cls; - char *cbuf = buf; - struct PendingMessage *reply; - size_t off; - size_t msize; - - client->transmit_handle = NULL; - if (buf == NULL) - { - /* client disconnected */ - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Client %p disconnected, pending messages will be discarded\n", - client->client_handle); - return 0; - } - off = 0; - while ((NULL != (reply = client->pending_head)) && - (size >= off + (msize = ntohs (reply->msg->size)))) - { - GNUNET_CONTAINER_DLL_remove (client->pending_head, client->pending_tail, - reply); - GNUNET_memcpy (&cbuf[off], reply->msg, msize); - GNUNET_free (reply); - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Transmitting %u bytes to client %p\n", - (unsigned int) msize, - client->client_handle); - off += msize; - } - process_pending_messages (client); - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Transmitted %u/%u bytes to client %p\n", - (unsigned int) off, - (unsigned int) size, - client->client_handle); - return off; -} - - -/** - * Task run to check for messages that need to be sent to a client. - * - * @param client a ClientList, containing the client and any messages to be sent to it - */ -static void -process_pending_messages (struct ClientList *client) -{ - if ((client->pending_head == NULL) || (client->transmit_handle != NULL)) - { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Not asking for transmission to %p now: %s\n", - client->client_handle, - client->pending_head == - NULL ? "no more messages" : "request already pending"); - return; - } - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Asking for transmission of %u bytes to client %p\n", - ntohs (client->pending_head->msg->size), client->client_handle); - client->transmit_handle = - GNUNET_SERVER_notify_transmit_ready (client->client_handle, - ntohs (client->pending_head-> - msg->size), - GNUNET_TIME_UNIT_FOREVER_REL, - &send_reply_to_client, client); } @@ -986,19 +893,24 @@ struct ForwardReplyContext { /** - * Actual message to send to matching clients. + * Expiration time of the reply. */ - struct PendingMessage *pm; + struct GNUNET_TIME_Absolute expiration; /** - * Embedded payload. + * GET path taken. */ - const void *data; + const struct GNUNET_PeerIdentity *get_path; /** - * Type of the data. + * PUT path taken. */ - enum GNUNET_BLOCK_Type type; + const struct GNUNET_PeerIdentity *put_path; + + /** + * Embedded payload. + */ + const void *data; /** * Number of bytes in data. @@ -1006,9 +918,19 @@ struct ForwardReplyContext size_t data_size; /** - * Do we need to copy @a pm because it was already used? + * Number of entries in @e get_path. */ - int do_copy; + unsigned int get_path_length; + + /** + * Number of entries in @e put_path. + */ + unsigned int put_path_length; + + /** + * Type of the data. + */ + enum GNUNET_BLOCK_Type type; }; @@ -1031,17 +953,18 @@ forward_reply (void *cls, { struct ForwardReplyContext *frc = cls; struct ClientQueryRecord *record = value; - struct PendingMessage *pm; + struct GNUNET_MQ_Envelope *env; struct GNUNET_DHT_ClientResultMessage *reply; enum GNUNET_BLOCK_EvaluationResult eval; int do_free; struct GNUNET_HashCode ch; - unsigned int i; + struct GNUNET_PeerIdentity *paths; LOG_TRAFFIC (GNUNET_ERROR_TYPE_DEBUG, - "R5N CLIENT-RESULT %s\n", + "CLIENT-RESULT %s\n", GNUNET_h2s_full (key)); - if ((record->type != GNUNET_BLOCK_TYPE_ANY) && (record->type != frc->type)) + if ( (record->type != GNUNET_BLOCK_TYPE_ANY) && + (record->type != frc->type)) { LOG (GNUNET_ERROR_TYPE_DEBUG, "Record type missmatch, not passing request for key %s to local client\n", @@ -1053,8 +976,10 @@ forward_reply (void *cls, return GNUNET_YES; /* type mismatch */ } GNUNET_CRYPTO_hash (frc->data, frc->data_size, &ch); - for (i = 0; i < record->seen_replies_count; i++) - if (0 == memcmp (&record->seen_replies[i], &ch, sizeof (struct GNUNET_HashCode))) + for (unsigned int i = 0; i < record->seen_replies_count; i++) + if (0 == memcmp (&record->seen_replies[i], + &ch, + sizeof (struct GNUNET_HashCode))) { LOG (GNUNET_ERROR_TYPE_DEBUG, "Duplicate reply, not passing request for key %s to local client\n", @@ -1065,8 +990,8 @@ forward_reply (void *cls, 1, GNUNET_NO); return GNUNET_YES; /* duplicate */ } - eval = - GNUNET_BLOCK_evaluate (GDS_block_context, + eval + = GNUNET_BLOCK_evaluate (GDS_block_context, record->type, GNUNET_BLOCK_EO_NONE, key, @@ -1086,7 +1011,9 @@ forward_reply (void *cls, do_free = GNUNET_YES; break; case GNUNET_BLOCK_EVALUATION_OK_MORE: - GNUNET_array_append (record->seen_replies, record->seen_replies_count, ch); + GNUNET_array_append (record->seen_replies, + record->seen_replies_count, + ch); do_free = GNUNET_NO; break; case GNUNET_BLOCK_EVALUATION_OK_DUPLICATE: @@ -1112,34 +1039,38 @@ forward_reply (void *cls, GNUNET_break (0); return GNUNET_NO; } - if (GNUNET_NO == frc->do_copy) - { - /* first time, we can use the original data */ - pm = frc->pm; - frc->do_copy = GNUNET_YES; - } - else - { - /* two clients waiting for same reply, must copy for queueing */ - pm = GNUNET_malloc (sizeof (struct PendingMessage) + - ntohs (frc->pm->msg->size)); - GNUNET_memcpy (pm, frc->pm, - sizeof (struct PendingMessage) + ntohs (frc->pm->msg->size)); - pm->next = pm->prev = NULL; - pm->msg = (struct GNUNET_MessageHeader *) &pm[1]; - } GNUNET_STATISTICS_update (GDS_stats, - gettext_noop ("# RESULTS queued for clients"), 1, + gettext_noop ("# RESULTS queued for clients"), + 1, GNUNET_NO); - reply = (struct GNUNET_DHT_ClientResultMessage *) &pm[1]; + env = GNUNET_MQ_msg_extra (reply, + frc->data_size + + (frc->get_path_length + frc->put_path_length) * sizeof (struct GNUNET_PeerIdentity), + GNUNET_MESSAGE_TYPE_DHT_CLIENT_RESULT); + reply->type = htonl (frc->type); + reply->get_path_length = htonl (frc->get_path_length); + reply->put_path_length = htonl (frc->put_path_length); reply->unique_id = record->unique_id; + reply->expiration = GNUNET_TIME_absolute_hton (frc->expiration); + reply->key = *key; + paths = (struct GNUNET_PeerIdentity *) &reply[1]; + GNUNET_memcpy (paths, + frc->put_path, + sizeof (struct GNUNET_PeerIdentity) * frc->put_path_length); + GNUNET_memcpy (&paths[frc->put_path_length], + frc->get_path, + sizeof (struct GNUNET_PeerIdentity) * frc->get_path_length); + GNUNET_memcpy (&paths[frc->get_path_length + frc->put_path_length], + frc->data, + frc->data_size); LOG (GNUNET_ERROR_TYPE_DEBUG, - "Queueing reply to query %s for client %p\n", + "Sending reply to query %s for client %p\n", GNUNET_h2s (key), - record->client->client_handle); - add_pending_message (record->client, pm); + record->ch->client); + GNUNET_MQ_send (record->ch->mq, + env); if (GNUNET_YES == do_free) - remove_client_records (record->client, key, record); + remove_client_record (record); return GNUNET_YES; } @@ -1166,74 +1097,48 @@ GDS_CLIENTS_handle_reply (struct GNUNET_TIME_Absolute expiration, const struct GNUNET_PeerIdentity *get_path, unsigned int put_path_length, const struct GNUNET_PeerIdentity *put_path, - enum GNUNET_BLOCK_Type type, size_t data_size, + enum GNUNET_BLOCK_Type type, + size_t data_size, const void *data) { struct ForwardReplyContext frc; - struct PendingMessage *pm; - struct GNUNET_DHT_ClientResultMessage *reply; - struct GNUNET_PeerIdentity *paths; size_t msize; - if (NULL == GNUNET_CONTAINER_multihashmap_get (forward_map, key)) + msize = sizeof (struct GNUNET_DHT_ClientResultMessage) + data_size + + (get_path_length + put_path_length) * sizeof (struct GNUNET_PeerIdentity); + if (msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE) + { + GNUNET_break (0); + return; + } + if (NULL == GNUNET_CONTAINER_multihashmap_get (forward_map, + key)) { LOG (GNUNET_ERROR_TYPE_DEBUG, "No matching client for reply for key %s\n", GNUNET_h2s (key)); GNUNET_STATISTICS_update (GDS_stats, - gettext_noop - ("# REPLIES ignored for CLIENTS (no match)"), 1, + gettext_noop ("# REPLIES ignored for CLIENTS (no match)"), + 1, GNUNET_NO); return; /* no matching request, fast exit! */ } - msize = - sizeof (struct GNUNET_DHT_ClientResultMessage) + data_size + - (get_path_length + put_path_length) * sizeof (struct GNUNET_PeerIdentity); - if (msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE) - { - GNUNET_break (0); - return; - } - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Forwarding reply for key %s to client\n", - GNUNET_h2s (key)); - - pm = GNUNET_malloc (msize + sizeof (struct PendingMessage)); - reply = (struct GNUNET_DHT_ClientResultMessage *) &pm[1]; - pm->msg = &reply->header; - reply->header.size = htons ((uint16_t) msize); - reply->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_CLIENT_RESULT); - reply->type = htonl (type); - reply->get_path_length = htonl (get_path_length); - reply->put_path_length = htonl (put_path_length); - reply->unique_id = 0; /* filled in later */ - reply->expiration = GNUNET_TIME_absolute_hton (expiration); - reply->key = *key; - paths = (struct GNUNET_PeerIdentity *) &reply[1]; - GNUNET_memcpy (paths, put_path, - sizeof (struct GNUNET_PeerIdentity) * put_path_length); - GNUNET_memcpy (&paths[put_path_length], get_path, - sizeof (struct GNUNET_PeerIdentity) * get_path_length); - GNUNET_memcpy (&paths[get_path_length + put_path_length], data, data_size); - frc.do_copy = GNUNET_NO; - frc.pm = pm; + frc.expiration = expiration; + frc.get_path = get_path; + frc.put_path = put_path; frc.data = data; frc.data_size = data_size; + frc.get_path_length = get_path_length; + frc.put_path_length = put_path_length; frc.type = type; + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Forwarding reply for key %s to client\n", + GNUNET_h2s (key)); GNUNET_CONTAINER_multihashmap_get_multiple (forward_map, key, &forward_reply, &frc); - if (GNUNET_NO == frc.do_copy) - { - /* did not match any of the requests, free! */ - GNUNET_STATISTICS_update (GDS_stats, - gettext_noop - ("# REPLIES ignored for CLIENTS (no match)"), 1, - GNUNET_NO); - GNUNET_free (pm); - } } @@ -1259,18 +1164,21 @@ GDS_CLIENTS_process_get (uint32_t options, const struct GNUNET_HashCode * key) { struct ClientMonitorRecord *m; - struct ClientList **cl; + struct ClientHandle **cl; unsigned int cl_size; cl = NULL; cl_size = 0; for (m = monitor_head; NULL != m; m = m->next) { - if ((GNUNET_BLOCK_TYPE_ANY == m->type || m->type == type) && - (NULL == m->key || - memcmp (key, m->key, sizeof(struct GNUNET_HashCode)) == 0)) + if ( ( (GNUNET_BLOCK_TYPE_ANY == m->type) || + (m->type == type) ) && + ( (NULL == m->key) || + (0 == memcmp (key, + m->key, + sizeof(struct GNUNET_HashCode))) ) ) { - struct PendingMessage *pm; + struct GNUNET_MQ_Envelope *env; struct GNUNET_DHT_MonitorGetMessage *mmsg; struct GNUNET_PeerIdentity *msg_path; size_t msize; @@ -1278,31 +1186,30 @@ GDS_CLIENTS_process_get (uint32_t options, /* Don't send duplicates */ for (i = 0; i < cl_size; i++) - if (cl[i] == m->client) + if (cl[i] == m->ch) break; if (i < cl_size) continue; - GNUNET_array_append (cl, cl_size, m->client); + GNUNET_array_append (cl, + cl_size, + m->ch); msize = path_length * sizeof (struct GNUNET_PeerIdentity); - msize += sizeof (struct GNUNET_DHT_MonitorGetMessage); - msize += sizeof (struct PendingMessage); - pm = GNUNET_malloc (msize); - mmsg = (struct GNUNET_DHT_MonitorGetMessage *) &pm[1]; - pm->msg = &mmsg->header; - mmsg->header.size = htons (msize - sizeof (struct PendingMessage)); - mmsg->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET); + env = GNUNET_MQ_msg_extra (mmsg, + msize, + GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET); mmsg->options = htonl(options); mmsg->type = htonl(type); mmsg->hop_count = htonl(hop_count); mmsg->desired_replication_level = htonl(desired_replication_level); mmsg->get_path_length = htonl(path_length); - GNUNET_memcpy (&mmsg->key, key, sizeof (struct GNUNET_HashCode)); + mmsg->key = *key; msg_path = (struct GNUNET_PeerIdentity *) &mmsg[1]; - if (path_length > 0) - GNUNET_memcpy (msg_path, path, - path_length * sizeof (struct GNUNET_PeerIdentity)); - add_pending_message (m->client, pm); + GNUNET_memcpy (msg_path, + path, + path_length * sizeof (struct GNUNET_PeerIdentity)); + GNUNET_MQ_send (m->ch->mq, + env); } } GNUNET_free_non_null (cl); @@ -1335,7 +1242,7 @@ GDS_CLIENTS_process_get_resp (enum GNUNET_BLOCK_Type type, size_t size) { struct ClientMonitorRecord *m; - struct ClientList **cl; + struct ClientHandle **cl; unsigned int cl_size; cl = NULL; @@ -1346,7 +1253,7 @@ GDS_CLIENTS_process_get_resp (enum GNUNET_BLOCK_Type type, (NULL == m->key || memcmp (key, m->key, sizeof(struct GNUNET_HashCode)) == 0)) { - struct PendingMessage *pm; + struct GNUNET_MQ_Envelope *env; struct GNUNET_DHT_MonitorGetRespMessage *mmsg; struct GNUNET_PeerIdentity *path; size_t msize; @@ -1354,40 +1261,37 @@ GDS_CLIENTS_process_get_resp (enum GNUNET_BLOCK_Type type, /* Don't send duplicates */ for (i = 0; i < cl_size; i++) - if (cl[i] == m->client) + if (cl[i] == m->ch) break; if (i < cl_size) continue; - GNUNET_array_append (cl, cl_size, m->client); + GNUNET_array_append (cl, + cl_size, + m->ch); msize = size; msize += (get_path_length + put_path_length) * sizeof (struct GNUNET_PeerIdentity); - msize += sizeof (struct GNUNET_DHT_MonitorGetRespMessage); - msize += sizeof (struct PendingMessage); - pm = GNUNET_malloc (msize); - mmsg = (struct GNUNET_DHT_MonitorGetRespMessage *) &pm[1]; - pm->msg = (struct GNUNET_MessageHeader *) mmsg; - mmsg->header.size = htons (msize - sizeof (struct PendingMessage)); - mmsg->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET_RESP); + env = GNUNET_MQ_msg_extra (mmsg, + msize, + GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET_RESP); mmsg->type = htonl(type); mmsg->put_path_length = htonl(put_path_length); mmsg->get_path_length = htonl(get_path_length); - path = (struct GNUNET_PeerIdentity *) &mmsg[1]; - if (put_path_length > 0) - { - GNUNET_memcpy (path, put_path, - put_path_length * sizeof (struct GNUNET_PeerIdentity)); - path = &path[put_path_length]; - } - if (get_path_length > 0) - GNUNET_memcpy (path, get_path, - get_path_length * sizeof (struct GNUNET_PeerIdentity)); mmsg->expiration_time = GNUNET_TIME_absolute_hton(exp); - GNUNET_memcpy (&mmsg->key, key, sizeof (struct GNUNET_HashCode)); - if (size > 0) - GNUNET_memcpy (&path[get_path_length], data, size); - add_pending_message (m->client, pm); + mmsg->key = *key; + path = (struct GNUNET_PeerIdentity *) &mmsg[1]; + GNUNET_memcpy (path, + put_path, + put_path_length * sizeof (struct GNUNET_PeerIdentity)); + GNUNET_memcpy (path, + get_path, + get_path_length * sizeof (struct GNUNET_PeerIdentity)); + GNUNET_memcpy (&path[get_path_length], + data, + size); + GNUNET_MQ_send (m->ch->mq, + env); } } GNUNET_free_non_null (cl); @@ -1422,7 +1326,7 @@ GDS_CLIENTS_process_put (uint32_t options, size_t size) { struct ClientMonitorRecord *m; - struct ClientList **cl; + struct ClientHandle **cl; unsigned int cl_size; cl = NULL; @@ -1433,7 +1337,7 @@ GDS_CLIENTS_process_put (uint32_t options, (NULL == m->key || memcmp (key, m->key, sizeof(struct GNUNET_HashCode)) == 0)) { - struct PendingMessage *pm; + struct GNUNET_MQ_Envelope *env; struct GNUNET_DHT_MonitorPutMessage *mmsg; struct GNUNET_PeerIdentity *msg_path; size_t msize; @@ -1441,38 +1345,35 @@ GDS_CLIENTS_process_put (uint32_t options, /* Don't send duplicates */ for (i = 0; i < cl_size; i++) - if (cl[i] == m->client) + if (cl[i] == m->ch) break; if (i < cl_size) continue; - GNUNET_array_append (cl, cl_size, m->client); + GNUNET_array_append (cl, + cl_size, + m->ch); msize = size; msize += path_length * sizeof (struct GNUNET_PeerIdentity); - msize += sizeof (struct GNUNET_DHT_MonitorPutMessage); - msize += sizeof (struct PendingMessage); - pm = GNUNET_malloc (msize); - mmsg = (struct GNUNET_DHT_MonitorPutMessage *) &pm[1]; - pm->msg = (struct GNUNET_MessageHeader *) mmsg; - mmsg->header.size = htons (msize - sizeof (struct PendingMessage)); - mmsg->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_MONITOR_PUT); + env = GNUNET_MQ_msg_extra (mmsg, + msize, + GNUNET_MESSAGE_TYPE_DHT_MONITOR_PUT); mmsg->options = htonl(options); mmsg->type = htonl(type); mmsg->hop_count = htonl(hop_count); - mmsg->desired_replication_level = htonl(desired_replication_level); - mmsg->put_path_length = htonl(path_length); + mmsg->desired_replication_level = htonl (desired_replication_level); + mmsg->put_path_length = htonl (path_length); + mmsg->key = *key; + mmsg->expiration_time = GNUNET_TIME_absolute_hton (exp); msg_path = (struct GNUNET_PeerIdentity *) &mmsg[1]; - if (path_length > 0) - { - GNUNET_memcpy (msg_path, - path, - path_length * sizeof (struct GNUNET_PeerIdentity)); - } - mmsg->expiration_time = GNUNET_TIME_absolute_hton(exp); - GNUNET_memcpy (&mmsg->key, key, sizeof (struct GNUNET_HashCode)); - if (size > 0) - GNUNET_memcpy (&msg_path[path_length], data, size); - add_pending_message (m->client, pm); + GNUNET_memcpy (msg_path, + path, + path_length * sizeof (struct GNUNET_PeerIdentity)); + GNUNET_memcpy (&msg_path[path_length], + data, + size); + GNUNET_MQ_send (m->ch->mq, + env); } } GNUNET_free_non_null (cl); @@ -1484,43 +1385,21 @@ GDS_CLIENTS_process_put (uint32_t options, * * @param server the initialized server */ -void +static void GDS_CLIENTS_init () { - static struct GNUNET_SERVER_MessageHandler plugin_handlers[] = { - {&handle_dht_local_put, NULL, - GNUNET_MESSAGE_TYPE_DHT_CLIENT_PUT, 0}, - {&handle_dht_local_get, NULL, - GNUNET_MESSAGE_TYPE_DHT_CLIENT_GET, 0}, - {&handle_dht_local_get_stop, NULL, - GNUNET_MESSAGE_TYPE_DHT_CLIENT_GET_STOP, - sizeof (struct GNUNET_DHT_ClientGetStopMessage)}, - {&handle_dht_local_monitor, NULL, - GNUNET_MESSAGE_TYPE_DHT_MONITOR_START, - sizeof (struct GNUNET_DHT_MonitorStartStopMessage)}, - {&handle_dht_local_monitor_stop, NULL, - GNUNET_MESSAGE_TYPE_DHT_MONITOR_STOP, - sizeof (struct GNUNET_DHT_MonitorStartStopMessage)}, - {&handle_dht_local_get_result_seen, NULL, - GNUNET_MESSAGE_TYPE_DHT_CLIENT_GET_RESULTS_KNOWN, 0}, - {NULL, NULL, 0, 0} - }; - - forward_map = GNUNET_CONTAINER_multihashmap_create (1024, GNUNET_NO); - retry_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN); - GNUNET_SERVER_resume (GDS_server); - GNUNET_SERVER_add_handlers (GDS_server, - plugin_handlers); - GNUNET_SERVER_disconnect_notify (GDS_server, - &handle_client_disconnect, - NULL); + forward_map + = GNUNET_CONTAINER_multihashmap_create (1024, + GNUNET_YES); + retry_heap + = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN); } /** * Shutdown client subsystem. */ -void +static void GDS_CLIENTS_stop () { if (NULL != retry_task) @@ -1532,13 +1411,51 @@ GDS_CLIENTS_stop () /** - * Shutdown client subsystem. + * Define "main" method using service macro. + * + * @param run name of the initializaton method for the service */ -void +#define GDS_DHT_SERVICE_INIT(run) \ + GNUNET_SERVICE_MAIN \ + ("dht", \ + GNUNET_SERVICE_OPTION_NONE, \ + run, \ + &client_connect_cb, \ + &client_disconnect_cb, \ + NULL, \ + GNUNET_MQ_hd_var_size (dht_local_put, \ + GNUNET_MESSAGE_TYPE_DHT_CLIENT_PUT, \ + struct GNUNET_DHT_ClientPutMessage, \ + NULL), \ + GNUNET_MQ_hd_var_size (dht_local_get, \ + GNUNET_MESSAGE_TYPE_DHT_CLIENT_GET, \ + struct GNUNET_DHT_ClientGetMessage, \ + NULL), \ + GNUNET_MQ_hd_fixed_size (dht_local_get_stop, \ + GNUNET_MESSAGE_TYPE_DHT_CLIENT_GET_STOP, \ + struct GNUNET_DHT_ClientGetStopMessage, \ + NULL), \ + GNUNET_MQ_hd_fixed_size (dht_local_monitor, \ + GNUNET_MESSAGE_TYPE_DHT_MONITOR_START, \ + struct GNUNET_DHT_MonitorStartStopMessage, \ + NULL), \ + GNUNET_MQ_hd_fixed_size (dht_local_monitor_stop, \ + GNUNET_MESSAGE_TYPE_DHT_MONITOR_STOP, \ + struct GNUNET_DHT_MonitorStartStopMessage, \ + NULL), \ + GNUNET_MQ_hd_var_size (dht_local_get_result_seen, \ + GNUNET_MESSAGE_TYPE_DHT_CLIENT_GET_RESULTS_KNOWN, \ + struct GNUNET_DHT_ClientGetResultSeenMessage , \ + NULL), \ + GNUNET_MQ_handler_end ()) + + +/** + * MINIMIZE heap size (way below 128k) since this process doesn't need much. + */ +void __attribute__ ((destructor)) GDS_CLIENTS_done () { - GNUNET_assert (NULL == client_head); - GNUNET_assert (NULL == client_tail); if (NULL != retry_heap) { GNUNET_assert (0 == GNUNET_CONTAINER_heap_get_size (retry_heap)); diff --git a/src/dht/gnunet-service-dht_clients.h b/src/dht/gnunet-service-dht_clients.h deleted file mode 100644 index 8a0931f6a..000000000 --- a/src/dht/gnunet-service-dht_clients.h +++ /dev/null @@ -1,153 +0,0 @@ -/* - This file is part of GNUnet. - Copyright (C) 2009, 2010, 2011 GNUnet e.V. - - GNUnet is free software; you can redistribute it and/or modify - it under the terms of the GNU General Public License as published - by the Free Software Foundation; either version 3, or (at your - option) any later version. - - GNUnet is distributed in the hope that it will be useful, but - WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - General Public License for more details. - - You should have received a copy of the GNU General Public License - along with GNUnet; see the file COPYING. If not, write to the - Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, - Boston, MA 02110-1301, USA. -*/ - -/** - * @file dht/gnunet-service-dht_clients.h - * @brief GNUnet DHT service's client management code - * @author Christian Grothoff - * @author Nathan Evans - */ -#ifndef GNUNET_SERVICE_DHT_CLIENT_H -#define GNUNET_SERVICE_DHT_CLIENT_H - -#include "gnunet_util_lib.h" -#include "gnunet_block_lib.h" - -/** - * Handle a reply we've received from another peer. If the reply - * matches any of our pending queries, forward it to the respective - * client(s). - * - * @param expiration when will the reply expire - * @param key the query this reply is for - * @param get_path_length number of peers in @a get_path - * @param get_path path the reply took on get - * @param put_path_length number of peers in @a put_path - * @param put_path path the reply took on put - * @param type type of the reply - * @param data_size number of bytes in @a data - * @param data application payload data - */ -void -GDS_CLIENTS_handle_reply (struct GNUNET_TIME_Absolute expiration, - const struct GNUNET_HashCode *key, - unsigned int get_path_length, - const struct GNUNET_PeerIdentity *get_path, - unsigned int put_path_length, - const struct GNUNET_PeerIdentity *put_path, - enum GNUNET_BLOCK_Type type, size_t data_size, - const void *data); - - -/** - * Check if some client is monitoring GET messages and notify - * them in that case. - * - * @param options Options, for instance RecordRoute, DemultiplexEverywhere. - * @param type The type of data in the request. - * @param hop_count Hop count so far. - * @param path_length number of entries in path (or 0 if not recorded). - * @param path peers on the GET path (or NULL if not recorded). - * @param desired_replication_level Desired replication level. - * @param key Key of the requested data. - */ -void -GDS_CLIENTS_process_get (uint32_t options, - enum GNUNET_BLOCK_Type type, - uint32_t hop_count, - uint32_t desired_replication_level, - unsigned int path_length, - const struct GNUNET_PeerIdentity *path, - const struct GNUNET_HashCode *key); - - -/** - * Check if some client is monitoring GET RESP messages and notify - * them in that case. - * - * @param type The type of data in the result. - * @param get_path Peers on GET path (or NULL if not recorded). - * @param get_path_length number of entries in @a get_path. - * @param put_path peers on the PUT path (or NULL if not recorded). - * @param put_path_length number of entries in @a get_path. - * @param exp Expiration time of the data. - * @param key Key of the @a data. - * @param data Pointer to the result data. - * @param size Number of bytes in @a data. - */ -void -GDS_CLIENTS_process_get_resp (enum GNUNET_BLOCK_Type type, - const struct GNUNET_PeerIdentity *get_path, - unsigned int get_path_length, - const struct GNUNET_PeerIdentity *put_path, - unsigned int put_path_length, - struct GNUNET_TIME_Absolute exp, - const struct GNUNET_HashCode * key, - const void *data, - size_t size); - - -/** - * Check if some client is monitoring PUT messages and notify - * them in that case. - * - * @param options Options, for instance RecordRoute, DemultiplexEverywhere. - * @param type The type of data in the request. - * @param hop_count Hop count so far. - * @param path_length number of entries in path (or 0 if not recorded). - * @param path peers on the PUT path (or NULL if not recorded). - * @param desired_replication_level Desired replication level. - * @param exp Expiration time of the data. - * @param key Key under which data is to be stored. - * @param data Pointer to the data carried. - * @param size Number of bytes in data. - */ -void -GDS_CLIENTS_process_put (uint32_t options, - enum GNUNET_BLOCK_Type type, - uint32_t hop_count, - uint32_t desired_replication_level, - unsigned int path_length, - const struct GNUNET_PeerIdentity *path, - struct GNUNET_TIME_Absolute exp, - const struct GNUNET_HashCode * key, - const void *data, - size_t size); - -/** - * Initialize client subsystem. - */ -void -GDS_CLIENTS_init (void); - -/** - * Shutdown client subsystem. - */ -void -GDS_CLIENTS_stop (void); - - -/** - * Shutdown client subsystem. - */ -void -GDS_CLIENTS_done (void); - -#endif diff --git a/src/dht/gnunet-service-dht_datacache.c b/src/dht/gnunet-service-dht_datacache.c index 322b5af17..12c79764d 100644 --- a/src/dht/gnunet-service-dht_datacache.c +++ b/src/dht/gnunet-service-dht_datacache.c @@ -25,7 +25,6 @@ */ #include "platform.h" #include "gnunet_datacache_lib.h" -#include "gnunet-service-dht_clients.h" #include "gnunet-service-dht_datacache.h" #include "gnunet-service-dht_routing.h" #include "gnunet-service-dht.h" diff --git a/src/dht/gnunet-service-dht_neighbours.c b/src/dht/gnunet-service-dht_neighbours.c index 683e0991a..39fb43495 100644 --- a/src/dht/gnunet-service-dht_neighbours.c +++ b/src/dht/gnunet-service-dht_neighbours.c @@ -39,7 +39,6 @@ #include "gnunet_dht_service.h" #include "gnunet_statistics_service.h" #include "gnunet-service-dht.h" -#include "gnunet-service-dht_clients.h" #include "gnunet-service-dht_datacache.h" #include "gnunet-service-dht_hello.h" #include "gnunet-service-dht_neighbours.h" @@ -1587,7 +1586,7 @@ core_init (void *cls, GNUNET_CRYPTO_hash (identity, sizeof (struct GNUNET_PeerIdentity), &my_identity_hash); - GDS_CLIENTS_init (); + GNUNET_SERVICE_resume (GDS_service); }