From: Bart Polot Date: Wed, 4 Jan 2012 20:00:59 +0000 (+0000) Subject: New DHT-monitor functionality X-Git-Tag: initial-import-from-subversion-38251~15429 X-Git-Url: https://git.librecmc.org/?a=commitdiff_plain;h=09372b5120f5905546bd4e73f02f8afeec7e1b1e;p=oweals%2Fgnunet.git New DHT-monitor functionality --- diff --git a/src/dht/dht.h b/src/dht/dht.h index c9fdd3479..9894be89c 100644 --- a/src/dht/dht.h +++ b/src/dht/dht.h @@ -193,6 +193,69 @@ struct GNUNET_DHT_ClientPutMessage /* DATA copied to end of this message */ }; + + +/** + * Message to monitor requests going through peer, clients <--> DHT service. + */ +struct GNUNET_DHT_MonitorMessage +{ + /** + * Type: GNUNET_MESSAGE_TYPE_DHT_MONITOR_{GET, PUT, GET_RESP, PUT_RESP*} + * (*) not yet implemented, necessary for key randomization + */ + struct GNUNET_MessageHeader header; + + /** + * The type of data in the request. + */ + uint32_t type GNUNET_PACKED; + + /** + * Message options, actually an 'enum GNUNET_DHT_RouteOption' value. + */ + uint32_t options GNUNET_PACKED; + + /** + * Replication level for this message + */ + uint32_t desired_replication_level GNUNET_PACKED; + + /** + * Number of peers recorded in the outgoing path from source to the + * storgage location of this message. + */ + uint32_t put_path_length GNUNET_PACKED; + + /** + * The number of peer identities recorded from the storage location + * to this peer. + */ + uint32_t get_path_length GNUNET_PACKED; + + /** + * Unique ID for GET / GET responses. + */ + uint64_t unique_id GNUNET_PACKED; + + /** + * How long should this data persist? + */ + struct GNUNET_TIME_AbsoluteNBO expiration; + + /** + * The key to store the value under. + */ + GNUNET_HashCode key; + + /* put path (if tracked) */ + + /* get path (if tracked) */ + + /* Payload */ + +}; + GNUNET_NETWORK_STRUCT_END #endif diff --git a/src/dht/dht_api.c b/src/dht/dht_api.c index ac69b7a4e..3f0d709e0 100644 --- a/src/dht/dht_api.c +++ b/src/dht/dht_api.c @@ -33,6 +33,7 @@ #include "gnunet_protocols.h" #include "gnunet_dht_service.h" #include "dht.h" +#include #define DEBUG_DHT_API GNUNET_EXTRA_LOGGING @@ -142,6 +143,49 @@ struct GNUNET_DHT_GetHandle }; +/** + * Handle to a monitoring request. + */ +struct GNUNET_DHT_MonitorHandle +{ + /** + * DLL. + */ + struct GNUNET_DHT_MonitorHandle *next; + + /** + * DLL. + */ + struct GNUNET_DHT_MonitorHandle *prev; + + /** + * Main handle to this DHT api. + */ + struct GNUNET_DHT_Handle *dht_handle; + + /** + * Type of block looked for. + */ + enum GNUNET_BLOCK_Type type; + + /** + * Key being looked for, NULL == all. + */ + GNUNET_HashCode *key; + + /** + * Callback for each received message of interest. + */ + GNUNET_DHT_MonitorCB cb; + + /** + * Closure for cb. + */ + void *cb_cls; + +}; + + /** * Connection to the DHT service. */ @@ -173,6 +217,16 @@ struct GNUNET_DHT_Handle */ struct PendingMessage *pending_tail; + /** + * Head of linked list of messages we would like to monitor. + */ + struct GNUNET_DHT_MonitorHandle *monitor_head; + + /** + * Tail of linked list of messages we would like to monitor. + */ + struct GNUNET_DHT_MonitorHandle *monitor_tail; + /** * Hash map containing the current outstanding unique requests * (values are of type 'struct GNUNET_DHT_RouteHandle'). @@ -500,6 +554,62 @@ process_reply (void *cls, const GNUNET_HashCode * key, void *value) } +/** + * Process a monitoring message from the service. + * + * @param handle The DHT handle. + * @param msg Message from the service. + * + * @return GNUNET_OK if everything went fine, + * GNUNET_SYSERR if the message is malformed. + */ +static int +process_monitor_message (struct GNUNET_DHT_Handle *handle, + const struct GNUNET_MessageHeader *msg) +{ + struct GNUNET_DHT_MonitorMessage *m; + struct GNUNET_DHT_MonitorHandle *h; + size_t msize; + + if (ntohs (msg->type) < GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET || + ntohs (msg->type) > GNUNET_MESSAGE_TYPE_DHT_MONITOR_PUT) + return GNUNET_SYSERR; + msize = ntohs (msg->size); + if (msize < sizeof (struct GNUNET_DHT_MonitorMessage)) + return GNUNET_SYSERR; + + m = (struct GNUNET_DHT_MonitorMessage *) msg; + h = handle->monitor_head; + while (NULL != h) + { + if (h->type == ntohl(m->type) && + (NULL == h->key || + memcmp (h->key, &m->key, sizeof (GNUNET_HashCode)) == 0)) + { + struct GNUNET_PeerIdentity *path; + uint32_t getl; + uint32_t putl; + + path = (struct GNUNET_PeerIdentity *) &m[1]; + getl = ntohl (m->get_path_length); + putl = ntohl (m->put_path_length); + h->cb (h->cb_cls, ntohs(msg->type), + GNUNET_TIME_absolute_ntoh(m->expiration), + &m->key, + &path[getl], putl, path, getl, + ntohl (m->desired_replication_level), + ntohl (m->options), ntohl (m->type), + (void *) &path[getl + putl], + ntohs (msg->size) - + sizeof (struct GNUNET_DHT_MonitorMessage) - + sizeof (struct GNUNET_PeerIdentity) * (putl + getl)); + } + h = h->next; + } + + return GNUNET_OK; +} + /** * Handler for messages received from the DHT service * a demultiplexer which handles numerous message types @@ -524,6 +634,8 @@ service_message_handler (void *cls, const struct GNUNET_MessageHeader *msg) } if (ntohs (msg->type) != GNUNET_MESSAGE_TYPE_DHT_CLIENT_RESULT) { + if (process_monitor_message (handle, msg) == GNUNET_OK) + return; GNUNET_break (0); do_disconnect (handle); return; @@ -832,4 +944,73 @@ GNUNET_DHT_get_stop (struct GNUNET_DHT_GetHandle *get_handle) } +/** + * Start monitoring the local DHT service. + * + * @param handle Handle to the DHT service. + * @param type Type of blocks that are of interest. + * @param key Key of data of interest, NULL for all. + * @param cb Callback to process all monitored data. + * @param cb_cls Closure for cb. + * + * @return Handle to stop monitoring. + */ +struct GNUNET_DHT_MonitorHandle * +GNUNET_DHT_monitor_start (struct GNUNET_DHT_Handle *handle, + enum GNUNET_BLOCK_Type type, + const GNUNET_HashCode *key, + GNUNET_DHT_MonitorCB cb, + void *cb_cls) +{ + struct GNUNET_DHT_MonitorHandle *h; + struct GNUNET_DHT_MonitorMessage *m; + struct PendingMessage *pending; + + h = GNUNET_malloc (sizeof (struct GNUNET_DHT_MonitorHandle)); + GNUNET_CONTAINER_DLL_insert(handle->monitor_head, handle->monitor_tail, h); + + h->cb = cb; + h->cb_cls = cb_cls; + h->type = type; + h->dht_handle = handle; + if (NULL != key) + { + h->key = GNUNET_malloc (sizeof(GNUNET_HashCode)); + memcpy (h->key, key, sizeof(GNUNET_HashCode)); + } + + pending = GNUNET_malloc (sizeof (struct GNUNET_DHT_MonitorMessage) + + sizeof (struct PendingMessage)); + m = (struct GNUNET_DHT_MonitorMessage *) &pending[1]; + pending->msg = &m->header; + pending->handle = handle; + pending->free_on_send = GNUNET_YES; + GNUNET_CONTAINER_DLL_insert (handle->pending_head, handle->pending_tail, + pending); + pending->in_pending_queue = GNUNET_YES; + process_pending_messages (handle); + + return h; +} + + +/** + * Stop monitoring. + * + * @param handle The handle to the monitor request returned by monitor_start. + * + * On return get_handle will no longer be valid, caller must not use again!!! + */ +void +GNUNET_DHT_monitor_stop (struct GNUNET_DHT_MonitorHandle *handle) +{ + GNUNET_free_non_null (handle->key); + GNUNET_CONTAINER_DLL_remove (handle->dht_handle->monitor_head, + handle->dht_handle->monitor_tail, + handle); + GNUNET_free (handle); +} + + + /* end of dht_api.c */ diff --git a/src/dht/gnunet-service-dht_clients.c b/src/dht/gnunet-service-dht_clients.c index dde8c6d7a..7642dc6c3 100644 --- a/src/dht/gnunet-service-dht_clients.c +++ b/src/dht/gnunet-service-dht_clients.c @@ -177,6 +177,39 @@ struct ClientQueryRecord }; +/** + * Struct containing paremeters of monitoring requests. + */ +struct ClientMonitorRecord +{ + + /** + * Next element in DLL. + */ + struct ClientMonitorRecord *next; + + /** + * Previous element in DLL. + */ + struct ClientMonitorRecord *prev; + + /** + * Type of blocks that are of interest + */ + enum GNUNET_BLOCK_Type type; + + /** + * Key of data of interest, NULL for all. + */ + GNUNET_HashCode *key; + + /** + * Client to notify of these requests. + */ + struct ClientList *client; +}; + + /** * List of active clients. */ @@ -187,6 +220,16 @@ static struct ClientList *client_head; */ static struct ClientList *client_tail; +/** + * List of active monitoring requests. + */ +static struct ClientMonitorRecord *monitor_head; + +/** + * List of active monitoring requests.. + */ +static struct ClientMonitorRecord *monitor_tail; + /** * Hashmap for fast key based lookup, maps keys to 'struct ClientQueryRecord' entries. */ @@ -275,6 +318,7 @@ handle_client_disconnect (void *cls, struct GNUNET_SERVER_Client *client) { struct ClientList *pos; struct PendingMessage *reply; + struct ClientMonitorRecord *monitor; #if DEBUG_DHT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Local client %p disconnects\n", client); @@ -288,6 +332,22 @@ handle_client_disconnect (void *cls, struct GNUNET_SERVER_Client *client) GNUNET_CONTAINER_DLL_remove (pos->pending_head, pos->pending_tail, reply); GNUNET_free (reply); } + monitor = monitor_head; + while (NULL != monitor) + { + if (monitor->client == pos) + { + struct ClientMonitorRecord *next; + + GNUNET_free_non_null (monitor->key); + next = monitor->next; + GNUNET_CONTAINER_DLL_remove (monitor_head, monitor_tail, monitor); + GNUNET_free (monitor); + monitor = next; + } + else + monitor = monitor->next; + } GNUNET_CONTAINER_multihashmap_iterate (forward_map, &remove_client_records, pos); GNUNET_free (pos); @@ -575,6 +635,41 @@ handle_dht_local_get_stop (void *cls, struct GNUNET_SERVER_Client *client, } +/** + * Handler for monitor messages + * + * @param cls closure for the service + * @param client the client we received this message from + * @param message the actual message received + * + */ +static void +handle_dht_local_monitor (void *cls, struct GNUNET_SERVER_Client *client, + const struct GNUNET_MessageHeader *message) +{ + struct ClientMonitorRecord *r; + const struct GNUNET_DHT_MonitorMessage *msg; + unsigned int i; + char *c; + + msg = (struct GNUNET_DHT_MonitorMessage *) message; + r = GNUNET_malloc (sizeof(struct ClientMonitorRecord)); + + r->client = find_active_client(client); + r->type = ntohl(msg->type); + c = (char *) &msg->key; + for (i = 0; i < sizeof (GNUNET_HashCode) && c[i] == 0; i++); + if (sizeof (GNUNET_HashCode) == i) + r->key = NULL; + else + { + r->key = GNUNET_malloc (sizeof (GNUNET_HashCode)); + memcpy (r->key, &msg->key, sizeof (GNUNET_HashCode)); + } + +} + + /** * Task run to check for messages that need to be sent to a client. * @@ -929,6 +1024,83 @@ GDS_CLIENTS_handle_reply (struct GNUNET_TIME_Absolute expiration, } +/** + * Check if some client is monitoring messages of this type and notify + * him in that case. + * + * @param mtype Type of the DHT message. + * @param exp When will this value expire. + * @param key Key of the result/request. + * @param get_path Peers on reply path (or NULL if not recorded). + * @param get_path_length number of entries in get_path. + * @param put_path peers on the PUT path (or NULL if not recorded). + * @param put_path_length number of entries in get_path. + * @param desired_replication_level Desired replication level. + * @param type Type of the result/request. + * @param data Pointer to the result data. + * @param size Number of bytes in data. + */ +void +GDS_CLIENTS_process_monitor (uint16_t mtype, + const struct GNUNET_TIME_Absolute exp, + const GNUNET_HashCode *key, + uint32_t putl, + const struct GNUNET_PeerIdentity *put_path, + uint32_t getl, + const struct GNUNET_PeerIdentity *get_path, + uint32_t replevel, + enum GNUNET_BLOCK_Type type, + const struct GNUNET_MessageHeader *data, + uint16_t size) +{ + struct ClientMonitorRecord *m; + struct ClientList **cl; + unsigned int cl_size; + + cl = NULL; + cl_size = 0; + for (m = monitor_head; NULL != m; m = m->next) + { + if (m->type == type && + (NULL == m->key || + memcmp (key, m->key, sizeof(GNUNET_HashCode)) == 0)) + { + struct PendingMessage *pm; + struct GNUNET_DHT_MonitorMessage *mmsg; + struct GNUNET_PeerIdentity *path; + size_t msize; + unsigned int i; + + /* Don't send duplicates */ + for (i = 0; i < cl_size; i++) + if (cl[i] == m->client) + break; + if (i < cl_size) + continue; + GNUNET_array_append (cl, cl_size, m->client); + + msize = size; + msize += (getl + putl) * sizeof (struct GNUNET_PeerIdentity); + msize += sizeof (struct GNUNET_DHT_MonitorMessage); + msize += sizeof (struct PendingMessage); + pm = (struct PendingMessage *) GNUNET_malloc (msize); + mmsg = (struct GNUNET_DHT_MonitorMessage *) &pm[1]; + pm->msg = (struct GNUNET_MessageHeader *) mmsg; + mmsg->header.size = htons (msize - sizeof (struct PendingMessage)); + mmsg->header.type = htons (mtype); + mmsg->expiration = GNUNET_TIME_absolute_hton(exp); + path = (struct GNUNET_PeerIdentity *) &mmsg[1]; + memcpy (path, put_path, putl * sizeof (struct GNUNET_PeerIdentity)); + path = &path[putl]; + memcpy (path, get_path, getl * sizeof (struct GNUNET_PeerIdentity)); + memcpy (&path[getl], data, size); + add_pending_message (m->client, pm); + } + } + GNUNET_free_non_null (cl); +} + + /** * Initialize client subsystem. * @@ -945,6 +1117,9 @@ GDS_CLIENTS_init (struct GNUNET_SERVER_Handle *server) {&handle_dht_local_get_stop, NULL, GNUNET_MESSAGE_TYPE_DHT_CLIENT_GET_STOP, sizeof (struct GNUNET_DHT_ClientGetStopMessage)}, + {&handle_dht_local_monitor, NULL, + GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET, + sizeof (struct GNUNET_DHT_MonitorMessage)}, {NULL, NULL, 0, 0} }; forward_map = GNUNET_CONTAINER_multihashmap_create (1024); diff --git a/src/dht/gnunet-service-dht_clients.h b/src/dht/gnunet-service-dht_clients.h index 21b2343e7..a8241d289 100644 --- a/src/dht/gnunet-service-dht_clients.h +++ b/src/dht/gnunet-service-dht_clients.h @@ -56,6 +56,35 @@ GDS_CLIENTS_handle_reply (struct GNUNET_TIME_Absolute expiration, const void *data); +/** + * Check if some client is monitoring messages of this type and notify + * him in that case. + * + * @param mtype Type of the DHT message. + * @param exp When will this value expire. + * @param key Key of the result/request. + * @param get_path Peers on reply path (or NULL if not recorded). + * @param get_path_length number of entries in get_path. + * @param put_path peers on the PUT path (or NULL if not recorded). + * @param put_path_length number of entries in get_path. + * @param desired_replication_level Desired replication level. + * @param type Type of the result/request. + * @param data Pointer to the result data. + * @param size Number of bytes in data. + */ +void +GDS_CLIENTS_process_monitor (uint16_t mtype, + const struct GNUNET_TIME_Absolute exp, + const GNUNET_HashCode *key, + uint32_t putl, + const struct GNUNET_PeerIdentity *put_path, + uint32_t getl, + const struct GNUNET_PeerIdentity *get_path, + uint32_t replevel, + enum GNUNET_BLOCK_Type type, + const struct GNUNET_MessageHeader *data, + uint16_t size); + /** * Initialize client subsystem. * diff --git a/src/dht/gnunet-service-dht_neighbours.c b/src/dht/gnunet-service-dht_neighbours.c index 907de9950..8c1f42499 100644 --- a/src/dht/gnunet-service-dht_neighbours.c +++ b/src/dht/gnunet-service-dht_neighbours.c @@ -1617,6 +1617,10 @@ handle_dht_p2p_put (void *cls, const struct GNUNET_PeerIdentity *peer, pp, payload, payload_size); } GNUNET_CONTAINER_bloomfilter_free (bf); + GDS_CLIENTS_process_monitor (GNUNET_MESSAGE_TYPE_DHT_P2P_PUT, + GNUNET_TIME_absolute_ntoh (put->expiration_time), &put->key, + putlen, put_path, 0, NULL, ntohl(put->desired_replication_level), + ntohl (put->type), payload, payload_size); return GNUNET_YES; } @@ -1822,6 +1826,10 @@ handle_dht_p2p_get (void *cls, const struct GNUNET_PeerIdentity *peer, 1, GNUNET_NO); } + GDS_CLIENTS_process_monitor (GNUNET_MESSAGE_TYPE_DHT_P2P_GET, + GNUNET_TIME_UNIT_FOREVER_ABS, &get->key, 0, NULL, 0, NULL, + ntohl (get->desired_replication_level), type, NULL, 0); + /* P2P forwarding */ if (eval != GNUNET_BLOCK_EVALUATION_OK_LAST) GDS_NEIGHBOURS_handle_get (type, options, @@ -1953,6 +1961,12 @@ handle_dht_p2p_result (void *cls, const struct GNUNET_PeerIdentity *peer, &prm->key, put_path_length, put_path, get_path_length, xget_path, data, data_size); } + + GDS_CLIENTS_process_monitor (GNUNET_MESSAGE_TYPE_DHT_P2P_RESULT, + GNUNET_TIME_absolute_ntoh (prm->expiration_time), &prm->key, + put_path_length, put_path, get_path_length, get_path, + 0, type, data, data_size); + return GNUNET_YES; } diff --git a/src/include/gnunet_dht_service.h b/src/include/gnunet_dht_service.h index f047bd8f8..fd89642c6 100644 --- a/src/include/gnunet_dht_service.h +++ b/src/include/gnunet_dht_service.h @@ -214,6 +214,70 @@ void GNUNET_DHT_get_stop (struct GNUNET_DHT_GetHandle *get_handle); +/* *************** Extended API: monitor ******************* */ + +struct GNUNET_DHT_MonitorHandle; + +/** + * Callback called on each request going through the DHT. + * + * @param cls Closure. + * @param mtype Type of the DHT message monitored. + * @param exp When will this value expire. + * @param key Key of the result/request. + * @param get_path Peers on reply path (or NULL if not recorded). + * @param get_path_length number of entries in get_path. + * @param put_path peers on the PUT path (or NULL if not recorded). + * @param put_path_length number of entries in get_path. + * @param desired_replication_level Desired replication level. + * @param type Type of the result/request. + * @param data Pointer to the result data. + * @param size Number of bytes in data. + */ +typedef void (*GNUNET_DHT_MonitorCB) (void *cls, + uint16_t mtype, + struct GNUNET_TIME_Absolute exp, + const GNUNET_HashCode * key, + const struct GNUNET_PeerIdentity * + get_path, unsigned int get_path_length, + const struct GNUNET_PeerIdentity * + put_path, unsigned int put_path_length, + uint32_t desired_replication_level, + enum GNUNET_DHT_RouteOption options, + enum GNUNET_BLOCK_Type type, + const void *data, + size_t size); + +/** + * Start monitoring the local DHT service. + * + * @param handle Handle to the DHT service. + * @param type Type of blocks that are of interest. + * @param key Key of data of interest, NULL for all. + * @param cb Callback to process all monitored data. + * @param cb_cls Closure for cb. + * + * @return Handle to stop monitoring. + */ +struct GNUNET_DHT_MonitorHandle * +GNUNET_DHT_monitor_start (struct GNUNET_DHT_Handle *handle, + enum GNUNET_BLOCK_Type type, + const GNUNET_HashCode *key, + GNUNET_DHT_MonitorCB cb, + void *cb_cls); + + +/** + * Stop monitoring. + * + * @param handle The handle to the monitor request returned by monitor_start. + * + * On return get_handle will no longer be valid, caller must not use again!!! + */ +void +GNUNET_DHT_monitor_stop (struct GNUNET_DHT_MonitorHandle *handle); + + #if 0 /* keep Emacsens' auto-indent happy */ { #endif diff --git a/src/include/gnunet_protocols.h b/src/include/gnunet_protocols.h index 89b00e20d..790457f72 100644 --- a/src/include/gnunet_protocols.h +++ b/src/include/gnunet_protocols.h @@ -517,6 +517,26 @@ extern "C" */ #define GNUNET_MESSAGE_TYPE_DHT_P2P_RESULT 148 +/** + * Request / receive information about transiting GETs + */ +#define GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET 149 + +/** + * Request / receive information about transiting GET responses + */ +#define GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET_RESP 150 + +/** + * Request / receive information about transiting PUTs + */ +#define GNUNET_MESSAGE_TYPE_DHT_MONITOR_PUT 151 + +/** + * Request / receive information about transiting PUT responses (TODO) + */ +#define GNUNET_MESSAGE_TYPE_DHT_MONITOR_PUT_RESP 152 + /******************************************************************************* * HOSTLIST message types