From: Christian Grothoff Date: Tue, 15 Feb 2011 14:01:44 +0000 (+0000) Subject: stuff X-Git-Tag: initial-import-from-subversion-38251~19132 X-Git-Url: https://git.librecmc.org/?a=commitdiff_plain;h=0ad72d359ab3ea5f232f4a8a0eb04700e8c84b49;p=oweals%2Fgnunet.git stuff --- diff --git a/src/fs/gnunet-service-fs_cp.c b/src/fs/gnunet-service-fs_cp.c index f9a642199..d88598be7 100644 --- a/src/fs/gnunet-service-fs_cp.c +++ b/src/fs/gnunet-service-fs_cp.c @@ -32,6 +32,11 @@ */ #define TRUST_FLUSH_FREQ GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 5) +/** + * After how long do we discard a reply? + */ +#define REPLY_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 2) + /** * Handle to cancel a transmission request. @@ -444,6 +449,43 @@ GSF_handle_p2p_migration_stop_ (void *cls, } +/** + * Copy reply and free put message. + * + * @param cls the 'struct PutMessage' + * @param buf_size number of bytes available in buf + * @param buf where to copy the message, NULL on error (peer disconnect) + * @return number of bytes copied to 'buf', can be 0 (without indicating an error) + */ +static size_t +copy_reply (void *cls, + size_t buf_size, + void *buf) +{ + struct PutMessage *pm = cls; + + if (buf != NULL) + { + GNUNET_assert (size >= ntohs (pm->header.size)); + size = ntohs (pm->header.size); + memcpy (buf, pm, size); + GNUNET_STATISTICS_update (stats, + gettext_noop ("# replies transmitted to other peers"), + 1, + GNUNET_NO); + } + else + { + GNUNET_STATISTICS_update (stats, + gettext_noop ("# replies dropped"), + 1, + GNUNET_NO); + } + GNUNET_free (pm); + return size; +} + + /** * Handle a reply to a pending request. Also called if a request * expires (then with data == NULL). The handler may be called @@ -455,84 +497,62 @@ GSF_handle_p2p_migration_stop_ (void *cls, * @param cls 'struct GSF_ConnectedPeer' of the peer that would * have liked an answer to the request * @param pr handle to the original pending request + * @param expiration when does 'data' expire? * @param data response data, NULL on request expiration * @param data_len number of bytes in data + * @param more GNUNET_YES if the request remains active (may call + * this function again), GNUNET_NO if the request is + * finished (client must not call GSF_pending_request_cancel_) */ static void handle_p2p_reply (void *cls, struct GSF_PendingRequest *pr, + struct GNUNET_TIME_Absolute expiration, const void *data, - size_t data_len) + size_t data_len, + int more) { struct GSF_ConnectedPeer *cp = cls; + struct GSF_PendingRequest *prd; + struct PutMessage *pm; + size_t msize; -#if SUPPORT_DELAYS - struct GNUNET_TIME_Relative art_delay; -#endif - - /* FIXME: adapt code fragments below to new API! */ + prd = GSF_pending_request_get_data_ (pr); if (NULL == data) { - /* FIXME: request expired! clean up! */ + GNUNET_assert (GNUNET_NO == more); GNUNET_STATISTICS_update (stats, gettext_noop ("# P2P searches active"), -1, GNUNET_NO); + GNUNET_break (GNUNET_OK == + GNUNET_CONTAINER_multihashmap_remove (cp->request_map, + &prd->query, + pr)); return; } - - /* reply will go over the network, check for cover traffic */ - if ( (prq->anonymity_level > 1) && - (cover_content_count < prq->anonymity_level - 1) ) - { - /* insufficient cover traffic, skip */ - GNUNET_STATISTICS_update (stats, - gettext_noop ("# replies suppressed due to lack of cover traffic"), - 1, - GNUNET_NO); - return GNUNET_YES; - } - if (prq->anonymity_level > 1) - cover_content_count -= prq->anonymity_level - 1; - - - cp = pr->cp; #if DEBUG_FS - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Transmitting result for query `%s' to other peer (PID=%u)\n", - GNUNET_h2s (key), - (unsigned int) cp->pid); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Transmitting result for query `%s'\n", + GNUNET_h2s (key)); #endif - GNUNET_STATISTICS_update (stats, - gettext_noop ("# replies received for other peers"), - 1, - GNUNET_NO); - msize = sizeof (struct PutMessage) + prq->size; - reply = GNUNET_malloc (msize + sizeof (struct PendingMessage)); - reply->cont = &transmit_reply_continuation; - reply->cont_cls = pr; -#if SUPPORT_DELAYS - art_delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS, - GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, - TTL_DECREMENT)); - reply->delay_until - = GNUNET_TIME_relative_to_absolute (art_delay); - GNUNET_STATISTICS_update (stats, - gettext_noop ("cummulative artificial delay introduced (ms)"), - art_delay.abs_value, - GNUNET_NO); -#endif - reply->msize = msize; - reply->priority = UINT32_MAX; /* send replies first! */ - pm = (struct PutMessage*) &reply[1]; - pm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT); - pm->header.size = htons (msize); - pm->type = htonl (prq->type); - pm->expiration = GNUNET_TIME_absolute_hton (prq->expiration); - memcpy (&pm[1], prq->data, prq->size); - add_to_pending_messages_for_peer (cp, reply, pr); - - + GNUNET_STATISTICS_update (stats, + gettext_noop ("# replies received for other peers"), + 1, + GNUNET_NO); + msize = sizeof (struct PutMessage) + data_len; + pm = GNUNET_malloc (sizeof (msize)); + pm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT); + pm->header.size = htons (msize); + pm->type = htonl (prd->type); + pm->expiration = GNUNET_TIME_absolute_hton (expiration); + memcpy (&pm[1], data, data_len); + (void) GSF_peer_transmit_ (cp, GNUNET_NO, + UINT32_MAX, + REPLY_TIMEOUT, + msize, + ©_reply, + pm); } diff --git a/src/fs/gnunet-service-fs_lc.c b/src/fs/gnunet-service-fs_lc.c index 469475fe0..e8d6bda47 100644 --- a/src/fs/gnunet-service-fs_lc.c +++ b/src/fs/gnunet-service-fs_lc.c @@ -190,13 +190,26 @@ GSF_local_client_lookup_ (struct GNUNET_SERVER_Client *client) * @param pr handle to the original pending request * @param data response data, NULL on request expiration * @param data_len number of bytes in data + * @param more GNUNET_YES if the request remains active (may call + * this function again), GNUNET_NO if the request is + * finished (client must not call GSF_pending_request_cancel_) */ static void client_response_handler (void *cls, struct GSF_PendingRequest *pr, const void *data, - size_t data_len) + size_t data_len, + int more) { + struct ClientRequest *cr = cls; + + if (NULL == data) + { + /* ugh, request 'timed out' -- how can this be? */ + GNUNET_break (0); + GNUNET_assert (GNUNET_NO == more); + return; + } /* FIXME: adapt old code below to new API! */ GNUNET_STATISTICS_update (stats, diff --git a/src/fs/gnunet-service-fs_pr.c b/src/fs/gnunet-service-fs_pr.c index d2248989f..4dc00f54c 100644 --- a/src/fs/gnunet-service-fs_pr.c +++ b/src/fs/gnunet-service-fs_pr.c @@ -615,7 +615,6 @@ update_request_performance_data (struct ProcessReplyClosure *prq, } - /** * We have received a reply; handle it! * @@ -663,30 +662,13 @@ process_reply (void *cls, break; case GNUNET_BLOCK_EVALUATION_OK_LAST: update_request_performance_data (prq, pr); - /* FIXME: adapt code to new API! */ - while (NULL != pr->pending_head) - destroy_pending_message_list_entry (pr->pending_head); - if (pr->qe != NULL) - { - if (pr->client_request_list != NULL) - GNUNET_SERVER_receive_done (pr->client_request_list->client_list->client, - GNUNET_YES); - GNUNET_DATASTORE_cancel (pr->qe); - pr->qe = NULL; - } - pr->do_remove = GNUNET_YES; - if (pr->task != GNUNET_SCHEDULER_NO_TASK) - { - GNUNET_SCHEDULER_cancel (pr->task); - pr->task = GNUNET_SCHEDULER_NO_TASK; - } - GNUNET_break (GNUNET_YES == - GNUNET_CONTAINER_multihashmap_remove (query_request_map, - key, - pr)); GNUNET_LOAD_update (rt_entry_lifetime, GNUNET_TIME_absolute_get_duration (pr->start_time).rel_value); - break; + /* pass on to other peers / local clients */ + pr->rh (pr->rh_cls, pr, prq->data, prq->size, GNUNET_NO); + /* destroy request, we're done */ + GSF_pending_request_cancel_ (pr); + return GNUNET_YES; case GNUNET_BLOCK_EVALUATION_OK_DUPLICATE: GNUNET_STATISTICS_update (stats, gettext_noop ("# duplicate replies discarded (bloomfilter)"), @@ -741,7 +723,7 @@ process_reply (void *cls, pr->results_found++; prq->request_found = GNUNET_YES; /* finally, pass on to other peers / local clients */ - pr->rh (pr->rh_cls, pr, prq->data, prq->size); + pr->rh (pr->rh_cls, pr, prq->data, prq->size, GNUNET_YES); return GNUNET_YES; } @@ -975,6 +957,7 @@ void GSF_pending_request_init_ () { pr_map = GNUNET_CONTAINER_multihashmap_create (32 * 1024); + requests_by_expiration_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN); } @@ -989,6 +972,8 @@ GSF_pending_request_done_ () NULL); GNUNET_CONTAINER_multihashmap_destroy (pr_map); pr_map = NULL; + GNUNET_CONTAINER_heap_destroy (requests_by_expiration_heap); + requests_by_expiration_heap = NULL; } diff --git a/src/fs/gnunet-service-fs_pr.h b/src/fs/gnunet-service-fs_pr.h index bb6920ab1..2cb7cb843 100644 --- a/src/fs/gnunet-service-fs_pr.h +++ b/src/fs/gnunet-service-fs_pr.h @@ -139,13 +139,19 @@ struct GSF_PendingRequestData * * @param cls user-specified closure * @param pr handle to the original pending request + * @param expiration when does 'data' expire? * @param data response data, NULL on request expiration * @param data_len number of bytes in data + * @param more GNUNET_YES if the request remains active (may call + * this function again), GNUNET_NO if the request is + * finished (client must not call GSF_pending_request_cancel_) */ typedef void (*GSF_PendingRequestReplyHandler)(void *cls, struct GSF_PendingRequest *pr, + struct GNUNET_TIME_Absolute expiration, const void *data, - size_t data_len); + size_t data_len, + int more); /**