X-Git-Url: https://git.librecmc.org/?a=blobdiff_plain;f=src%2Fdht%2Fgnunet-service-dht_clients.c;h=173a1c3efdefd2ac79cd06b35fd6d40e9bff1d03;hb=6c471eeb15e27f8226492b4860a3c2acb94c5f25;hp=a5ac0c32b1ac00ac6be087666949c3352404173c;hpb=3c5c48af5d473594a46903d9cba99c5a20fa5d95;p=oweals%2Fgnunet.git diff --git a/src/dht/gnunet-service-dht_clients.c b/src/dht/gnunet-service-dht_clients.c index a5ac0c32b..173a1c3ef 100644 --- a/src/dht/gnunet-service-dht_clients.c +++ b/src/dht/gnunet-service-dht_clients.c @@ -87,7 +87,7 @@ struct ClientList * Handle to the current transmission request, NULL * if none pending. */ - struct GNUNET_CONNECTION_TransmitHandle *transmit_handle; + struct GNUNET_SERVER_TransmitHandle *transmit_handle; /** * Linked list of pending messages for this client @@ -111,7 +111,7 @@ struct ClientQueryRecord /** * The key this request was about */ - GNUNET_HashCode key; + struct GNUNET_HashCode key; /** * Client responsible for the request. @@ -126,7 +126,7 @@ struct ClientQueryRecord /** * Replies we have already seen for this request. */ - GNUNET_HashCode *seen_replies; + struct GNUNET_HashCode *seen_replies; /** * Pointer to this nodes heap location in the retry-heap (for fast removal) @@ -201,7 +201,22 @@ struct ClientMonitorRecord /** * Key of data of interest, NULL for all. */ - GNUNET_HashCode *key; + struct GNUNET_HashCode *key; + + /** + * Flag whether to notify about GET messages. + */ + int16_t get; + + /** + * Flag whether to notify about GET_REPONSE messages. + */ + int16_t get_resp; + + /** + * Flag whether to notify about PUT messages. + */ + uint16_t put; /** * Client to notify of these requests. @@ -226,7 +241,7 @@ static struct ClientList *client_tail; static struct ClientMonitorRecord *monitor_head; /** - * List of active monitoring requests.. + * List of active monitoring requests. */ static struct ClientMonitorRecord *monitor_tail; @@ -246,6 +261,31 @@ static struct GNUNET_CONTAINER_Heap *retry_heap; static GNUNET_SCHEDULER_TaskIdentifier retry_task; +/** + * Task run to check for messages that need to be sent to a client. + * + * @param client a ClientList, containing the client and any messages to be sent to it + */ +static void +process_pending_messages (struct ClientList *client); + + +/** + * Add a PendingMessage to the clients list of messages to be sent + * + * @param client the active client to send the message to + * @param pending_message the actual message to send + */ +static void +add_pending_message (struct ClientList *client, + struct PendingMessage *pending_message) +{ + GNUNET_CONTAINER_DLL_insert_tail (client->pending_head, client->pending_tail, + pending_message); + process_pending_messages (client); +} + + /** * Find a client if it exists, add it otherwise. * @@ -282,18 +322,16 @@ find_active_client (struct GNUNET_SERVER_Client *client) * @return GNUNET_YES (we should continue to iterate) */ static int -remove_client_records (void *cls, const GNUNET_HashCode * key, void *value) +remove_client_records (void *cls, const struct GNUNET_HashCode * key, void *value) { struct ClientList *client = cls; struct ClientQueryRecord *record = value; if (record->client != client) return GNUNET_YES; -#if DEBUG_DHT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Removing client %p's record for key %s\n", client, GNUNET_h2s (key)); -#endif GNUNET_assert (GNUNET_YES == GNUNET_CONTAINER_multihashmap_remove (forward_map, key, record)); @@ -320,13 +358,11 @@ handle_client_disconnect (void *cls, struct GNUNET_SERVER_Client *client) struct PendingMessage *reply; struct ClientMonitorRecord *monitor; -#if DEBUG_DHT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Local client %p disconnects\n", client); -#endif pos = find_active_client (client); GNUNET_CONTAINER_DLL_remove (client_head, client_tail, pos); if (pos->transmit_handle != NULL) - GNUNET_CONNECTION_notify_transmit_ready_cancel (pos->transmit_handle); + GNUNET_SERVER_notify_transmit_ready_cancel (pos->transmit_handle); while (NULL != (reply = pos->pending_head)) { GNUNET_CONTAINER_DLL_remove (pos->pending_head, pos->pending_tail, reply); @@ -449,6 +485,8 @@ handle_dht_local_put (void *cls, struct GNUNET_SERVER_Client *client, const struct GNUNET_DHT_ClientPutMessage *dht_msg; struct GNUNET_CONTAINER_BloomFilter *peer_bf; uint16_t size; + struct PendingMessage *pm; + struct GNUNET_DHT_ClientPutConfirmationMessage *conf; size = ntohs (message->size); if (size < sizeof (struct GNUNET_DHT_ClientPutMessage)) @@ -463,12 +501,10 @@ handle_dht_local_put (void *cls, struct GNUNET_SERVER_Client *client, GNUNET_NO); dht_msg = (const struct GNUNET_DHT_ClientPutMessage *) message; /* give to local clients */ -#if DEBUG_DHT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Handling local PUT of %u-bytes for query %s\n", size - sizeof (struct GNUNET_DHT_ClientPutMessage), GNUNET_h2s (&dht_msg->key)); -#endif GDS_CLIENTS_handle_reply (GNUNET_TIME_absolute_ntoh (dht_msg->expiration), &dht_msg->key, 0, NULL, 0, NULL, ntohl (dht_msg->type), @@ -490,7 +526,26 @@ handle_dht_local_put (void *cls, struct GNUNET_SERVER_Client *client, peer_bf, &dht_msg->key, 0, NULL, &dht_msg[1], size - sizeof (struct GNUNET_DHT_ClientPutMessage)); + GDS_CLIENTS_process_put (ntohl (dht_msg->options), + ntohl (dht_msg->type), + 0, + ntohl (dht_msg->desired_replication_level), + 1, + GDS_NEIGHBOURS_get_id(), + GNUNET_TIME_absolute_ntoh (dht_msg->expiration), + &dht_msg->key, + &dht_msg[1], + size - sizeof (struct GNUNET_DHT_ClientPutMessage)); GNUNET_CONTAINER_bloomfilter_free (peer_bf); + pm = GNUNET_malloc (sizeof (struct PendingMessage) + + sizeof (struct GNUNET_DHT_ClientPutConfirmationMessage)); + conf = (struct GNUNET_DHT_ClientPutConfirmationMessage *) &pm[1]; + conf->header.size = htons (sizeof (struct GNUNET_DHT_ClientPutConfirmationMessage)); + conf->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_CLIENT_PUT_OK); + conf->reserved = htonl (0); + conf->unique_id = dht_msg->unique_id; + pm->msg = &conf->header; + add_pending_message (find_active_client (client), pm); GNUNET_SERVER_receive_done (client, GNUNET_OK); } @@ -528,11 +583,9 @@ handle_dht_local_get (void *cls, struct GNUNET_SERVER_Client *client, gettext_noop ("# GET requests received from clients"), 1, GNUNET_NO); -#if DEBUG_DHT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Received request for %s from local client %p\n", GNUNET_h2s (&get->key), client); -#endif cqr = GNUNET_malloc (sizeof (struct ClientQueryRecord) + xquery_size); cqr->key = get->key; cqr->client = find_active_client (client); @@ -548,6 +601,13 @@ handle_dht_local_get (void *cls, struct GNUNET_SERVER_Client *client, cqr->type = ntohl (get->type); GNUNET_CONTAINER_multihashmap_put (forward_map, &get->key, cqr, GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); + GDS_CLIENTS_process_get (ntohl (get->options), + ntohl (get->type), + 0, + ntohl (get->desired_replication_level), + 1, + GDS_NEIGHBOURS_get_id(), + &get->key); /* start remote requests */ if (GNUNET_SCHEDULER_NO_TASK != retry_task) GNUNET_SCHEDULER_cancel (retry_task); @@ -586,18 +646,16 @@ struct RemoveByUniqueIdContext * @return GNUNET_YES (we should continue to iterate) */ static int -remove_by_unique_id (void *cls, const GNUNET_HashCode * key, void *value) +remove_by_unique_id (void *cls, const struct GNUNET_HashCode * key, void *value) { const struct RemoveByUniqueIdContext *ctx = cls; struct ClientQueryRecord *record = value; if (record->unique_id != ctx->unique_id) return GNUNET_YES; -#if DEBUG_DHT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Removing client %p's record for key %s (by unique id)\n", ctx->client->client_handle, GNUNET_h2s (key)); -#endif return remove_client_records (ctx->client, key, record); } @@ -623,10 +681,8 @@ handle_dht_local_get_stop (void *cls, struct GNUNET_SERVER_Client *client, gettext_noop ("# GET STOP requests received from clients"), 1, GNUNET_NO); -#if DEBUG_DHT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Client %p stopped request for key %s\n", client, GNUNET_h2s (&dht_stop_msg->key)); -#endif ctx.client = find_active_client (client); ctx.unique_id = dht_stop_msg->unique_id; GNUNET_CONTAINER_multihashmap_get_multiple (forward_map, &dht_stop_msg->key, @@ -636,7 +692,7 @@ handle_dht_local_get_stop (void *cls, struct GNUNET_SERVER_Client *client, /** - * Handler for monitor messages + * Handler for monitor start messages * * @param cls closure for the service * @param client the client we received this message from @@ -648,37 +704,74 @@ handle_dht_local_monitor (void *cls, struct GNUNET_SERVER_Client *client, const struct GNUNET_MessageHeader *message) { struct ClientMonitorRecord *r; - const struct GNUNET_DHT_MonitorMessage *msg; - unsigned int i; - char *c; + const struct GNUNET_DHT_MonitorStartStopMessage *msg; - msg = (struct GNUNET_DHT_MonitorMessage *) message; + msg = (struct GNUNET_DHT_MonitorStartStopMessage *) message; r = GNUNET_malloc (sizeof(struct ClientMonitorRecord)); r->client = find_active_client(client); r->type = ntohl(msg->type); - c = (char *) &msg->key; - for (i = 0; i < sizeof (GNUNET_HashCode) && c[i] == 0; i++); - if (sizeof (GNUNET_HashCode) == i) - r->key = NULL; + r->get = ntohs(msg->get); + r->get_resp = ntohs(msg->get_resp); + r->put = ntohs(msg->put); + if (0 == ntohs(msg->filter_key)) + r->key = NULL; else { - r->key = GNUNET_malloc (sizeof (GNUNET_HashCode)); - memcpy (r->key, &msg->key, sizeof (GNUNET_HashCode)); + r->key = GNUNET_malloc (sizeof (struct GNUNET_HashCode)); + memcpy (r->key, &msg->key, sizeof (struct GNUNET_HashCode)); } GNUNET_CONTAINER_DLL_insert (monitor_head, monitor_tail, r); - // FIXME add remove somewhere GNUNET_SERVER_receive_done (client, GNUNET_OK); } - /** - * Task run to check for messages that need to be sent to a client. + * Handler for monitor stop messages + * + * @param cls closure for the service + * @param client the client we received this message from + * @param message the actual message received * - * @param client a ClientList, containing the client and any messages to be sent to it */ static void -process_pending_messages (struct ClientList *client); +handle_dht_local_monitor_stop (void *cls, struct GNUNET_SERVER_Client *client, + const struct GNUNET_MessageHeader *message) +{ + struct ClientMonitorRecord *r; + const struct GNUNET_DHT_MonitorStartStopMessage *msg; + int keys_match; + + msg = (struct GNUNET_DHT_MonitorStartStopMessage *) message; + r = monitor_head; + + while (NULL != r) + { + if (NULL == r->key) + keys_match = (0 == ntohs(msg->filter_key)); + else + { + keys_match = (0 != ntohs(msg->filter_key) + && !memcmp(r->key, &msg->key, sizeof(struct GNUNET_HashCode))); + } + if (find_active_client(client) == r->client + && ntohl(msg->type) == r->type + && r->get == msg->get + && r->get_resp == msg->get_resp + && r->put == msg->put + && keys_match + ) + { + GNUNET_CONTAINER_DLL_remove (monitor_head, monitor_tail, r); + GNUNET_free_non_null (r->key); + GNUNET_free (r); + GNUNET_SERVER_receive_done (client, GNUNET_OK); + return; /* Delete only ONE entry */ + } + r = r->next; + } + + GNUNET_SERVER_receive_done (client, GNUNET_OK); +} /** @@ -706,11 +799,9 @@ send_reply_to_client (void *cls, size_t size, void *buf) if (buf == NULL) { /* client disconnected */ -#if DEBUG_DHT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Client %p disconnected, pending messages will be discarded\n", client->client_handle); -#endif return 0; } off = 0; @@ -721,17 +812,13 @@ send_reply_to_client (void *cls, size_t size, void *buf) reply); memcpy (&cbuf[off], reply->msg, msize); GNUNET_free (reply); -#if DEBUG_DHT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Transmitting %u bytes to client %p\n", msize, client->client_handle); -#endif off += msize; } process_pending_messages (client); -#if DEBUG_DHT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Transmitted %u/%u bytes to client %p\n", (unsigned int) off, (unsigned int) size, client->client_handle); -#endif return off; } @@ -746,20 +833,16 @@ process_pending_messages (struct ClientList *client) { if ((client->pending_head == NULL) || (client->transmit_handle != NULL)) { -#if DEBUG_DHT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Not asking for transmission to %p now: %s\n", client->client_handle, client->pending_head == NULL ? "no more messages" : "request already pending"); -#endif return; } -#if DEBUG_DHT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Asking for transmission of %u bytes to client %p\n", ntohs (client->pending_head->msg->size), client->client_handle); -#endif client->transmit_handle = GNUNET_SERVER_notify_transmit_ready (client->client_handle, ntohs (client->pending_head-> @@ -769,22 +852,6 @@ process_pending_messages (struct ClientList *client) } -/** - * Add a PendingMessage to the clients list of messages to be sent - * - * @param client the active client to send the message to - * @param pending_message the actual message to send - */ -static void -add_pending_message (struct ClientList *client, - struct PendingMessage *pending_message) -{ - GNUNET_CONTAINER_DLL_insert_tail (client->pending_head, client->pending_tail, - pending_message); - process_pending_messages (client); -} - - /** * Closure for 'forward_reply' */ @@ -831,7 +898,7 @@ struct ForwardReplyContext * if the result is mal-formed, GNUNET_NO */ static int -forward_reply (void *cls, const GNUNET_HashCode * key, void *value) +forward_reply (void *cls, const struct GNUNET_HashCode * key, void *value) { struct ForwardReplyContext *frc = cls; struct ClientQueryRecord *record = value; @@ -839,16 +906,14 @@ forward_reply (void *cls, const GNUNET_HashCode * key, void *value) struct GNUNET_DHT_ClientResultMessage *reply; enum GNUNET_BLOCK_EvaluationResult eval; int do_free; - GNUNET_HashCode ch; + struct GNUNET_HashCode ch; unsigned int i; if ((record->type != GNUNET_BLOCK_TYPE_ANY) && (record->type != frc->type)) { -#if DEBUG_DHT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Record type missmatch, not passing request for key %s to local client\n", GNUNET_h2s (key)); -#endif GNUNET_STATISTICS_update (GDS_stats, gettext_noop ("# Key match, type mismatches in REPLY to CLIENT"), @@ -857,13 +922,11 @@ forward_reply (void *cls, const GNUNET_HashCode * key, void *value) } GNUNET_CRYPTO_hash (frc->data, frc->data_size, &ch); for (i = 0; i < record->seen_replies_count; i++) - if (0 == memcmp (&record->seen_replies[i], &ch, sizeof (GNUNET_HashCode))) + if (0 == memcmp (&record->seen_replies[i], &ch, sizeof (struct GNUNET_HashCode))) { -#if DEBUG_DHT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Duplicate reply, not passing request for key %s to local client\n", GNUNET_h2s (key)); -#endif GNUNET_STATISTICS_update (GDS_stats, gettext_noop ("# Duplicate REPLIES to CLIENT request dropped"), @@ -874,11 +937,9 @@ forward_reply (void *cls, const GNUNET_HashCode * key, void *value) GNUNET_BLOCK_evaluate (GDS_block_context, record->type, key, NULL, 0, record->xquery, record->xquery_size, frc->data, frc->data_size); -#if DEBUG_DHT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Evaluation result is %d for key %s for local client's query\n", (int) eval, GNUNET_h2s (key)); -#endif switch (eval) { case GNUNET_BLOCK_EVALUATION_OK_LAST: @@ -929,11 +990,9 @@ forward_reply (void *cls, const GNUNET_HashCode * key, void *value) GNUNET_NO); reply = (struct GNUNET_DHT_ClientResultMessage *) &pm[1]; reply->unique_id = record->unique_id; -#if DEBUG_DHT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Queueing reply to query %s for client %p\n", GNUNET_h2s (key), record->client->client_handle); -#endif add_pending_message (record->client, pm); if (GNUNET_YES == do_free) remove_client_records (record->client, key, record); @@ -958,7 +1017,7 @@ forward_reply (void *cls, const GNUNET_HashCode * key, void *value) */ void GDS_CLIENTS_handle_reply (struct GNUNET_TIME_Absolute expiration, - const GNUNET_HashCode * key, + const struct GNUNET_HashCode * key, unsigned int get_path_length, const struct GNUNET_PeerIdentity *get_path, unsigned int put_path_length, @@ -1027,33 +1086,101 @@ GDS_CLIENTS_handle_reply (struct GNUNET_TIME_Absolute expiration, /** - * Check if some client is monitoring messages of this type and notify - * him in that case. + * Check if some client is monitoring GET messages and notify + * them in that case. * - * @param mtype Type of the DHT message. - * @param exp When will this value expire. - * @param key Key of the result/request. - * @param get_path Peers on reply path (or NULL if not recorded). + * @param options Options, for instance RecordRoute, DemultiplexEverywhere. + * @param type The type of data in the request. + * @param hop_count Hop count so far. + * @param path_length number of entries in path (or 0 if not recorded). + * @param path peers on the GET path (or NULL if not recorded). + * @param desired_replication_level Desired replication level. + * @param key Key of the requested data. + */ +void +GDS_CLIENTS_process_get (uint32_t options, + enum GNUNET_BLOCK_Type type, + uint32_t hop_count, + uint32_t desired_replication_level, + unsigned int path_length, + const struct GNUNET_PeerIdentity *path, + const struct GNUNET_HashCode * key) +{ + struct ClientMonitorRecord *m; + struct ClientList **cl; + unsigned int cl_size; + + cl = NULL; + cl_size = 0; + for (m = monitor_head; NULL != m; m = m->next) + { + if ((GNUNET_BLOCK_TYPE_ANY == m->type || m->type == type) && + (NULL == m->key || + memcmp (key, m->key, sizeof(struct GNUNET_HashCode)) == 0)) + { + struct PendingMessage *pm; + struct GNUNET_DHT_MonitorGetMessage *mmsg; + struct GNUNET_PeerIdentity *msg_path; + size_t msize; + unsigned int i; + + /* Don't send duplicates */ + for (i = 0; i < cl_size; i++) + if (cl[i] == m->client) + break; + if (i < cl_size) + continue; + GNUNET_array_append (cl, cl_size, m->client); + + msize = path_length * sizeof (struct GNUNET_PeerIdentity); + msize += sizeof (struct GNUNET_DHT_MonitorGetMessage); + msize += sizeof (struct PendingMessage); + pm = (struct PendingMessage *) GNUNET_malloc (msize); + mmsg = (struct GNUNET_DHT_MonitorGetMessage *) &pm[1]; + pm->msg = &mmsg->header; + mmsg->header.size = htons (msize - sizeof (struct PendingMessage)); + mmsg->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET); + mmsg->options = htonl(options); + mmsg->type = htonl(type); + mmsg->hop_count = htonl(hop_count); + mmsg->desired_replication_level = htonl(desired_replication_level); + mmsg->get_path_length = htonl(path_length); + memcpy (&mmsg->key, key, sizeof (struct GNUNET_HashCode)); + msg_path = (struct GNUNET_PeerIdentity *) &mmsg[1]; + if (path_length > 0) + memcpy (msg_path, path, + path_length * sizeof (struct GNUNET_PeerIdentity)); + add_pending_message (m->client, pm); + } + } + GNUNET_free_non_null (cl); +} + + +/** + * Check if some client is monitoring GET RESP messages and notify + * them in that case. + * + * @param type The type of data in the result. + * @param get_path Peers on GET path (or NULL if not recorded). * @param get_path_length number of entries in get_path. * @param put_path peers on the PUT path (or NULL if not recorded). * @param put_path_length number of entries in get_path. - * @param desired_replication_level Desired replication level. - * @param type Type of the result/request. + * @param exp Expiration time of the data. + * @param key Key of the data. * @param data Pointer to the result data. * @param size Number of bytes in data. */ void -GDS_CLIENTS_process_monitor (uint16_t mtype, - const struct GNUNET_TIME_Absolute exp, - const GNUNET_HashCode *key, - uint32_t putl, - const struct GNUNET_PeerIdentity *put_path, - uint32_t getl, - const struct GNUNET_PeerIdentity *get_path, - uint32_t replevel, - enum GNUNET_BLOCK_Type type, - const struct GNUNET_MessageHeader *data, - uint16_t size) +GDS_CLIENTS_process_get_resp (enum GNUNET_BLOCK_Type type, + const struct GNUNET_PeerIdentity *get_path, + unsigned int get_path_length, + const struct GNUNET_PeerIdentity *put_path, + unsigned int put_path_length, + struct GNUNET_TIME_Absolute exp, + const struct GNUNET_HashCode * key, + const void *data, + size_t size) { struct ClientMonitorRecord *m; struct ClientList **cl; @@ -1065,10 +1192,10 @@ GDS_CLIENTS_process_monitor (uint16_t mtype, { if ((GNUNET_BLOCK_TYPE_ANY == m->type || m->type == type) && (NULL == m->key || - memcmp (key, m->key, sizeof(GNUNET_HashCode)) == 0)) + memcmp (key, m->key, sizeof(struct GNUNET_HashCode)) == 0)) { struct PendingMessage *pm; - struct GNUNET_DHT_MonitorMessage *mmsg; + struct GNUNET_DHT_MonitorGetRespMessage *mmsg; struct GNUNET_PeerIdentity *path; size_t msize; unsigned int i; @@ -1082,28 +1209,116 @@ GDS_CLIENTS_process_monitor (uint16_t mtype, GNUNET_array_append (cl, cl_size, m->client); msize = size; - msize += (getl + putl) * sizeof (struct GNUNET_PeerIdentity); - msize += sizeof (struct GNUNET_DHT_MonitorMessage); + msize += (get_path_length + put_path_length) + * sizeof (struct GNUNET_PeerIdentity); + msize += sizeof (struct GNUNET_DHT_MonitorGetRespMessage); msize += sizeof (struct PendingMessage); pm = (struct PendingMessage *) GNUNET_malloc (msize); - mmsg = (struct GNUNET_DHT_MonitorMessage *) &pm[1]; + mmsg = (struct GNUNET_DHT_MonitorGetRespMessage *) &pm[1]; pm->msg = (struct GNUNET_MessageHeader *) mmsg; mmsg->header.size = htons (msize - sizeof (struct PendingMessage)); - mmsg->header.type = htons (mtype); - mmsg->expiration = GNUNET_TIME_absolute_hton(exp); - memcpy (&mmsg->key, key, sizeof (GNUNET_HashCode)); - mmsg->put_path_length = htonl(putl); - mmsg->get_path_length = htonl(getl); + mmsg->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET_RESP); + mmsg->type = htonl(type); + mmsg->put_path_length = htonl(put_path_length); + mmsg->get_path_length = htonl(get_path_length); path = (struct GNUNET_PeerIdentity *) &mmsg[1]; - if (putl > 0) + if (put_path_length > 0) { - memcpy (path, put_path, putl * sizeof (struct GNUNET_PeerIdentity)); - path = &path[putl]; + memcpy (path, put_path, + put_path_length * sizeof (struct GNUNET_PeerIdentity)); + path = &path[put_path_length]; } - if (getl > 0) - memcpy (path, get_path, getl * sizeof (struct GNUNET_PeerIdentity)); + if (get_path_length > 0) + memcpy (path, get_path, + get_path_length * sizeof (struct GNUNET_PeerIdentity)); + mmsg->expiration_time = GNUNET_TIME_absolute_hton(exp); + memcpy (&mmsg->key, key, sizeof (struct GNUNET_HashCode)); if (size > 0) - memcpy (&path[getl], data, size); + memcpy (&path[get_path_length], data, size); + add_pending_message (m->client, pm); + } + } + GNUNET_free_non_null (cl); +} + + +/** + * Check if some client is monitoring PUT messages and notify + * them in that case. + * + * @param options Options, for instance RecordRoute, DemultiplexEverywhere. + * @param type The type of data in the request. + * @param hop_count Hop count so far. + * @param path_length number of entries in path (or 0 if not recorded). + * @param path peers on the PUT path (or NULL if not recorded). + * @param desired_replication_level Desired replication level. + * @param exp Expiration time of the data. + * @param key Key under which data is to be stored. + * @param data Pointer to the data carried. + * @param size Number of bytes in data. + */ +void +GDS_CLIENTS_process_put (uint32_t options, + enum GNUNET_BLOCK_Type type, + uint32_t hop_count, + uint32_t desired_replication_level, + unsigned int path_length, + const struct GNUNET_PeerIdentity *path, + struct GNUNET_TIME_Absolute exp, + const struct GNUNET_HashCode * key, + const void *data, + size_t size) +{ + struct ClientMonitorRecord *m; + struct ClientList **cl; + unsigned int cl_size; + + cl = NULL; + cl_size = 0; + for (m = monitor_head; NULL != m; m = m->next) + { + if ((GNUNET_BLOCK_TYPE_ANY == m->type || m->type == type) && + (NULL == m->key || + memcmp (key, m->key, sizeof(struct GNUNET_HashCode)) == 0)) + { + struct PendingMessage *pm; + struct GNUNET_DHT_MonitorPutMessage *mmsg; + struct GNUNET_PeerIdentity *msg_path; + size_t msize; + unsigned int i; + + /* Don't send duplicates */ + for (i = 0; i < cl_size; i++) + if (cl[i] == m->client) + break; + if (i < cl_size) + continue; + GNUNET_array_append (cl, cl_size, m->client); + + msize = size; + msize += path_length * sizeof (struct GNUNET_PeerIdentity); + msize += sizeof (struct GNUNET_DHT_MonitorPutMessage); + msize += sizeof (struct PendingMessage); + pm = (struct PendingMessage *) GNUNET_malloc (msize); + mmsg = (struct GNUNET_DHT_MonitorPutMessage *) &pm[1]; + pm->msg = (struct GNUNET_MessageHeader *) mmsg; + mmsg->header.size = htons (msize - sizeof (struct PendingMessage)); + mmsg->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_MONITOR_PUT); + mmsg->options = htonl(options); + mmsg->type = htonl(type); + mmsg->hop_count = htonl(hop_count); + mmsg->desired_replication_level = htonl(desired_replication_level); + mmsg->put_path_length = htonl(path_length); + msg_path = (struct GNUNET_PeerIdentity *) &mmsg[1]; + if (path_length > 0) + { + memcpy (msg_path, path, + path_length * sizeof (struct GNUNET_PeerIdentity)); + } + mmsg->expiration_time = GNUNET_TIME_absolute_hton(exp); + memcpy (&mmsg->key, key, sizeof (struct GNUNET_HashCode)); + if (size > 0) + memcpy (&msg_path[path_length], data, size); add_pending_message (m->client, pm); } } @@ -1128,8 +1343,11 @@ GDS_CLIENTS_init (struct GNUNET_SERVER_Handle *server) GNUNET_MESSAGE_TYPE_DHT_CLIENT_GET_STOP, sizeof (struct GNUNET_DHT_ClientGetStopMessage)}, {&handle_dht_local_monitor, NULL, - GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET, - sizeof (struct GNUNET_DHT_MonitorMessage)}, + GNUNET_MESSAGE_TYPE_DHT_MONITOR_START, + sizeof (struct GNUNET_DHT_MonitorStartStopMessage)}, + {&handle_dht_local_monitor_stop, NULL, + GNUNET_MESSAGE_TYPE_DHT_MONITOR_STOP, + sizeof (struct GNUNET_DHT_MonitorStartStopMessage)}, {NULL, NULL, 0, 0} }; forward_map = GNUNET_CONTAINER_multihashmap_create (1024); @@ -1152,12 +1370,18 @@ GDS_CLIENTS_done () GNUNET_SCHEDULER_cancel (retry_task); retry_task = GNUNET_SCHEDULER_NO_TASK; } - GNUNET_assert (0 == GNUNET_CONTAINER_heap_get_size (retry_heap)); - GNUNET_CONTAINER_heap_destroy (retry_heap); - retry_heap = NULL; - GNUNET_assert (0 == GNUNET_CONTAINER_multihashmap_size (forward_map)); - GNUNET_CONTAINER_multihashmap_destroy (forward_map); - forward_map = NULL; + if (NULL != retry_heap) + { + GNUNET_assert (0 == GNUNET_CONTAINER_heap_get_size (retry_heap)); + GNUNET_CONTAINER_heap_destroy (retry_heap); + retry_heap = NULL; + } + if (NULL != forward_map) + { + GNUNET_assert (0 == GNUNET_CONTAINER_multihashmap_size (forward_map)); + GNUNET_CONTAINER_multihashmap_destroy (forward_map); + forward_map = NULL; + } } /* end of gnunet-service-dht_clients.c */