From f54389f6724ecbd39389d53fba7b3bfdb2e0a8eb Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Thu, 10 Feb 2011 12:59:38 +0000 Subject: [PATCH] stuff --- src/fs/gnunet-service-fs.h | 30 +- src/fs/gnunet-service-fs_cp.c | 446 +++++++++++++++++- src/fs/gnunet-service-fs_cp.h | 27 ++ src/fs/gnunet-service-fs_lc.c | 64 +++ src/fs/gnunet-service-fs_pr.c | 807 ++++++++++++++++++++++++++++++-- src/fs/gnunet-service-fs_pr.h | 62 ++- src/fs/gnunet-service-fs_push.c | 475 +++++++++++++++++++ src/fs/gnunet-service-fs_put.c | 197 ++++++++ 8 files changed, 2030 insertions(+), 78 deletions(-) create mode 100644 src/fs/gnunet-service-fs_push.c create mode 100644 src/fs/gnunet-service-fs_put.c diff --git a/src/fs/gnunet-service-fs.h b/src/fs/gnunet-service-fs.h index ca8d4cdc5..52579188d 100644 --- a/src/fs/gnunet-service-fs.h +++ b/src/fs/gnunet-service-fs.h @@ -31,18 +31,44 @@ */ struct GSF_ConnectedPeer; - /** * An active request. */ struct GSF_PendingRequest; - /** * A local client. */ struct GSF_LocalClient; +/** + * Our connection to the datastore. + */ +extern struct GNUNET_DATASTORE_Handle *GSF_dsh; + +/** + * Our configuration. + */ +extern const struct GNUNET_CONFIGURATION_Handle *GSF_cfg; + +/** + * Handle for reporting statistics. + */ +extern struct GNUNET_STATISTICS_Handle *GSF_stats; + +/** + * Pointer to handle to the core service (points to NULL until we've + * connected to it). + */ +extern struct GNUNET_CORE_Handle *GSF_core; + +/** + * Handle for DHT operations. + */ +static struct GNUNET_DHT_Handle *GSF_dht; + + + #endif /* end of gnunet-service-fs.h */ diff --git a/src/fs/gnunet-service-fs_cp.c b/src/fs/gnunet-service-fs_cp.c index 2361cd4fc..903549cb7 100644 --- a/src/fs/gnunet-service-fs_cp.c +++ b/src/fs/gnunet-service-fs_cp.c @@ -140,11 +140,6 @@ struct GSF_ConnectedPeer */ uint64_t inc_preference; - /** - * Trust rating for this peer - */ - uint32_t trust; - /** * Trust rating for this peer on disk. */ @@ -248,6 +243,19 @@ update_atsi (struct GSF_ConnectedPeer *cp, } +/** + * Return the performance data record for the given peer + * + * @param cp peer to query + * @return performance data record for the peer + */ +struct GSF_PeerPerformanceData * +GSF_get_peer_performance_data_ (struct GSF_ConnectedPeer *cp) +{ + return &cp->ppd; +} + + /** * Core is ready to transmit to a peer, get the message. * @@ -420,12 +428,94 @@ GSF_handle_p2p_migration_stop_ (void *cls, GNUNET_break (0); return GNUNET_OK; } - cp->ppd.migration_blocked = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_relative_ntoh (msm->duration)); + cp->ppd.migration_blocked_until = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_relative_ntoh (msm->duration)); update_atsi (cp, atsi); return GNUNET_OK; } +/** + * Handle a reply to a pending request. Also called if a request + * expires (then with data == NULL). The handler may be called + * many times (depending on the request type), but will not be + * called during or after a call to GSF_pending_request_cancel + * and will also not be called anymore after a call signalling + * expiration. + * + * @param cls user-specified closure + * @param pr handle to the original pending request + * @param data response data, NULL on request expiration + * @param data_len number of bytes in data + */ +static void +handle_p2p_reply (void *cls, + struct GSF_PendingRequest *pr, + const void *data, + size_t data_len) +{ +#if SUPPORT_DELAYS + struct GNUNET_TIME_Relative art_delay; +#endif + + /* FIXME: adapt code fragments below to new API! */ + + + /* reply will go over the network, check for cover traffic */ + if ( (prq->anonymity_level > 1) && + (cover_content_count < prq->anonymity_level - 1) ) + { + /* insufficient cover traffic, skip */ + GNUNET_STATISTICS_update (stats, + gettext_noop ("# replies suppressed due to lack of cover traffic"), + 1, + GNUNET_NO); + return GNUNET_YES; + } + if (prq->anonymity_level > 1) + cover_content_count -= prq->anonymity_level - 1; + + + cp = pr->cp; +#if DEBUG_FS + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Transmitting result for query `%s' to other peer (PID=%u)\n", + GNUNET_h2s (key), + (unsigned int) cp->pid); +#endif + GNUNET_STATISTICS_update (stats, + gettext_noop ("# replies received for other peers"), + 1, + GNUNET_NO); + msize = sizeof (struct PutMessage) + prq->size; + reply = GNUNET_malloc (msize + sizeof (struct PendingMessage)); + reply->cont = &transmit_reply_continuation; + reply->cont_cls = pr; +#if SUPPORT_DELAYS + art_delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS, + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, + TTL_DECREMENT)); + reply->delay_until + = GNUNET_TIME_relative_to_absolute (art_delay); + GNUNET_STATISTICS_update (stats, + gettext_noop ("cummulative artificial delay introduced (ms)"), + art_delay.abs_value, + GNUNET_NO); +#endif + reply->msize = msize; + reply->priority = UINT32_MAX; /* send replies first! */ + pm = (struct PutMessage*) &reply[1]; + pm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT); + pm->header.size = htons (msize); + pm->type = htonl (prq->type); + pm->expiration = GNUNET_TIME_absolute_hton (prq->expiration); + memcpy (&pm[1], prq->data, prq->size); + add_to_pending_messages_for_peer (cp, reply, pr); + + +} + + + /** * Handle P2P "QUERY" message. * @@ -438,9 +528,310 @@ struct GSF_PendingRequest * GSF_handle_p2p_query_ (const struct GNUNET_PeerIdentity *other, const struct GNUNET_MessageHeader *message) { + /* FIXME: adapt old code to new API! */ + struct PendingRequest *pr; + struct ConnectedPeer *cp; + struct ConnectedPeer *cps; + struct CheckDuplicateRequestClosure cdc; + struct GNUNET_TIME_Relative timeout; + uint16_t msize; + const struct GetMessage *gm; + unsigned int bits; + const GNUNET_HashCode *opt; + uint32_t bm; + size_t bfsize; + uint32_t ttl_decrement; + int32_t priority; + enum GNUNET_BLOCK_Type type; + int have_ns; + + msize = ntohs(message->size); + if (msize < sizeof (struct GetMessage)) + { + GNUNET_break_op (0); + return GNUNET_SYSERR; + } + gm = (const struct GetMessage*) message; +#if DEBUG_FS + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Received request for `%s'\n", + GNUNET_h2s (&gm->query)); +#endif + type = ntohl (gm->type); + bm = ntohl (gm->hash_bitmap); + bits = 0; + while (bm > 0) + { + if (1 == (bm & 1)) + bits++; + bm >>= 1; + } + if (msize < sizeof (struct GetMessage) + bits * sizeof (GNUNET_HashCode)) + { + GNUNET_break_op (0); + return GNUNET_SYSERR; + } + opt = (const GNUNET_HashCode*) &gm[1]; + bfsize = msize - sizeof (struct GetMessage) - bits * sizeof (GNUNET_HashCode); + /* bfsize must be power of 2, check! */ + if (0 != ( (bfsize - 1) & bfsize)) + { + GNUNET_break_op (0); + return GNUNET_SYSERR; + } + cover_query_count++; + bm = ntohl (gm->hash_bitmap); + bits = 0; + cps = GNUNET_CONTAINER_multihashmap_get (connected_peers, + &other->hashPubKey); + if (NULL == cps) + { + /* peer must have just disconnected */ + GNUNET_STATISTICS_update (stats, + gettext_noop ("# requests dropped due to initiator not being connected"), + 1, + GNUNET_NO); + return GNUNET_SYSERR; + } + if (0 != (bm & GET_MESSAGE_BIT_RETURN_TO)) + cp = GNUNET_CONTAINER_multihashmap_get (connected_peers, + &opt[bits++]); + else + cp = cps; + if (cp == NULL) + { +#if DEBUG_FS + if (0 != (bm & GET_MESSAGE_BIT_RETURN_TO)) + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Failed to find RETURN-TO peer `%4s' in connection set. Dropping query.\n", + GNUNET_i2s ((const struct GNUNET_PeerIdentity*) &opt[bits-1])); + + else + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Failed to find peer `%4s' in connection set. Dropping query.\n", + GNUNET_i2s (other)); +#endif + GNUNET_STATISTICS_update (stats, + gettext_noop ("# requests dropped due to missing reverse route"), + 1, + GNUNET_NO); + /* FIXME: try connect? */ + return GNUNET_OK; + } + /* note that we can really only check load here since otherwise + peers could find out that we are overloaded by not being + disconnected after sending us a malformed query... */ + priority = bound_priority (ntohl (gm->priority), cps); + if (priority < 0) + { +#if DEBUG_FS + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Dropping query from `%s', this peer is too busy.\n", + GNUNET_i2s (other)); +#endif + return GNUNET_OK; + } +#if DEBUG_FS + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Received request for `%s' of type %u from peer `%4s' with flags %u\n", + GNUNET_h2s (&gm->query), + (unsigned int) type, + GNUNET_i2s (other), + (unsigned int) bm); +#endif + have_ns = (0 != (bm & GET_MESSAGE_BIT_SKS_NAMESPACE)); + pr = GNUNET_malloc (sizeof (struct PendingRequest) + + (have_ns ? sizeof(GNUNET_HashCode) : 0)); + if (have_ns) + { + pr->namespace = (GNUNET_HashCode*) &pr[1]; + memcpy (&pr[1], &opt[bits++], sizeof (GNUNET_HashCode)); + } + if ( (GNUNET_LOAD_get_load (cp->transmission_delay) > 3 * (1 + priority)) || + (GNUNET_LOAD_get_average (cp->transmission_delay) > + GNUNET_CONSTANTS_MAX_CORK_DELAY.rel_value * 2 + GNUNET_LOAD_get_average (rt_entry_lifetime)) ) + { + /* don't have BW to send to peer, or would likely take longer than we have for it, + so at best indirect the query */ + priority = 0; + pr->forward_only = GNUNET_YES; + } + pr->type = type; + pr->mingle = ntohl (gm->filter_mutator); + if (0 != (bm & GET_MESSAGE_BIT_TRANSMIT_TO)) + pr->target_pid = GNUNET_PEER_intern ((const struct GNUNET_PeerIdentity*) &opt[bits++]); + pr->anonymity_level = 1; + pr->priority = (uint32_t) priority; + pr->ttl = bound_ttl (ntohl (gm->ttl), pr->priority); + pr->query = gm->query; + /* decrement ttl (always) */ + ttl_decrement = 2 * TTL_DECREMENT + + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, + TTL_DECREMENT); + if ( (pr->ttl < 0) && + (((int32_t)(pr->ttl - ttl_decrement)) > 0) ) + { +#if DEBUG_FS + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Dropping query from `%s' due to TTL underflow (%d - %u).\n", + GNUNET_i2s (other), + pr->ttl, + ttl_decrement); +#endif + GNUNET_STATISTICS_update (stats, + gettext_noop ("# requests dropped due TTL underflow"), + 1, + GNUNET_NO); + /* integer underflow => drop (should be very rare)! */ + GNUNET_free (pr); + return GNUNET_OK; + } + pr->ttl -= ttl_decrement; + pr->start_time = GNUNET_TIME_absolute_get (); + + /* get bloom filter */ + if (bfsize > 0) + { + pr->bf = GNUNET_CONTAINER_bloomfilter_init ((const char*) &opt[bits], + bfsize, + BLOOMFILTER_K); + pr->bf_size = bfsize; + } + cdc.have = NULL; + cdc.pr = pr; + GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map, + &gm->query, + &check_duplicate_request_peer, + &cdc); + if (cdc.have != NULL) + { + if (cdc.have->start_time.abs_value + cdc.have->ttl >= + pr->start_time.abs_value + pr->ttl) + { + /* existing request has higher TTL, drop new one! */ + cdc.have->priority += pr->priority; + destroy_pending_request (pr); +#if DEBUG_FS + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Have existing request with higher TTL, dropping new request.\n", + GNUNET_i2s (other)); +#endif + GNUNET_STATISTICS_update (stats, + gettext_noop ("# requests dropped due to higher-TTL request"), + 1, + GNUNET_NO); + return GNUNET_OK; + } + else + { + /* existing request has lower TTL, drop old one! */ + pr->priority += cdc.have->priority; + /* Possible optimization: if we have applicable pending + replies in 'cdc.have', we might want to move those over + (this is a really rare special-case, so it is not clear + that this would be worth it) */ + destroy_pending_request (cdc.have); + /* keep processing 'pr'! */ + } + } + + pr->cp = cp; + GNUNET_break (GNUNET_OK == + GNUNET_CONTAINER_multihashmap_put (query_request_map, + &gm->query, + pr, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE)); + GNUNET_break (GNUNET_OK == + GNUNET_CONTAINER_multihashmap_put (peer_request_map, + &other->hashPubKey, + pr, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE)); + + pr->hnode = GNUNET_CONTAINER_heap_insert (requests_by_expiration_heap, + pr, + pr->start_time.abs_value + pr->ttl); + + GNUNET_STATISTICS_update (stats, + gettext_noop ("# P2P searches received"), + 1, + GNUNET_NO); + GNUNET_STATISTICS_update (stats, + gettext_noop ("# P2P searches active"), + 1, + GNUNET_NO); + + /* calculate change in traffic preference */ + cps->inc_preference += pr->priority * 1000 + QUERY_BANDWIDTH_VALUE; + /* process locally */ + if (type == GNUNET_BLOCK_TYPE_FS_DBLOCK) + type = GNUNET_BLOCK_TYPE_ANY; /* to get on-demand as well */ + timeout = GNUNET_TIME_relative_multiply (BASIC_DATASTORE_REQUEST_DELAY, + (pr->priority + 1)); + if (GNUNET_YES != pr->forward_only) + { +#if DEBUG_FS + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Handing request for `%s' to datastore\n", + GNUNET_h2s (&gm->query)); +#endif + pr->qe = GNUNET_DATASTORE_get (dsh, + &gm->query, + type, + pr->priority + 1, + MAX_DATASTORE_QUEUE, + timeout, + &process_local_reply, + pr); + if (NULL == pr->qe) + { + GNUNET_STATISTICS_update (stats, + gettext_noop ("# requests dropped by datastore (queue length limit)"), + 1, + GNUNET_NO); + } + } + else + { + GNUNET_STATISTICS_update (stats, + gettext_noop ("# requests forwarded due to high load"), + 1, + GNUNET_NO); + } + + /* Are multiple results possible (and did we look locally)? If so, start processing remotely now! */ + switch (pr->type) + { + case GNUNET_BLOCK_TYPE_FS_DBLOCK: + case GNUNET_BLOCK_TYPE_FS_IBLOCK: + /* only one result, wait for datastore */ + if (GNUNET_YES != pr->forward_only) + { + GNUNET_STATISTICS_update (stats, + gettext_noop ("# requests not instantly forwarded (waiting for datastore)"), + 1, + GNUNET_NO); + break; + } + default: + if (pr->task == GNUNET_SCHEDULER_NO_TASK) + pr->task = GNUNET_SCHEDULER_add_now (&forward_request_task, + pr); + } + + /* make sure we don't track too many requests */ + if (GNUNET_CONTAINER_heap_get_size (requests_by_expiration_heap) > max_pending_requests) + { + pr = GNUNET_CONTAINER_heap_peek (requests_by_expiration_heap); + GNUNET_assert (pr != NULL); + destroy_pending_request (pr); + } + return GNUNET_OK; + + + // FIXME! // parse request - // setup pending request + // setup pending request (use 'handle_p2p_reply') // track pending request to cancel it on peer disconnect (!) // return it! // (actual planning & execution up to caller!) @@ -814,6 +1205,41 @@ GSF_connected_peer_get_identity_ (const struct GSF_ConnectedPeer *cp, } +/** + * Ask a peer to stop migrating data to us until the given point + * in time. + * + * @param cp peer to ask + * @param block_time until when to block + */ +void +GSF_block_peer_migration_ (struct GSF_ConnectedPeer *cp, + struct GNUNET_TIME_Relative block_time) +{ + struct PendingMessage *pm; + struct MigrationStopMessage *msm; + + if (GNUNET_TIME_absolute_get_duration (cp->last_migration_block).rel_value > block_time.rel_value) + return; /* already blocked */ + cp->last_migration_block = GNUNET_TIME_relative_to_absolute (block_time); + + /* FIXME: adapt old code below to new API! */ + pm = GNUNET_malloc (sizeof (struct PendingMessage) + + sizeof (struct MigrationStopMessage)); + pm->msize = sizeof (struct MigrationStopMessage); + pm->priority = UINT32_MAX; + msm = (struct MigrationStopMessage*) &pm[1]; + msm->header.size = htons (sizeof (struct MigrationStopMessage)); + msm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_MIGRATION_STOP); + msm->duration = GNUNET_TIME_relative_hton (block_time); + add_to_pending_messages_for_peer (cp, + pm, + NULL); +} + + + + /** * Write host-trust information to a file - flush the buffer entry! * @@ -965,9 +1391,9 @@ GSF_connected_peer_done_ () * @return GNUNET_YES (we should continue to iterate) */ static int -clean_peer (void *cls, - const GNUNET_HashCode * key, - void *value) +clean_local_client (void *cls, + const GNUNET_HashCode * key, + void *value) { const struct GSF_LocalClient *lc = cls; struct GSF_ConnectedPeer *cp = value; diff --git a/src/fs/gnunet-service-fs_cp.h b/src/fs/gnunet-service-fs_cp.h index 9bf36186c..f08e31a72 100644 --- a/src/fs/gnunet-service-fs_cp.h +++ b/src/fs/gnunet-service-fs_cp.h @@ -89,6 +89,11 @@ struct GSF_PeerPerformanceData */ double avg_priority; + /** + * Trust rating for this peer + */ + uint32_t trust; + /** * Number of pending queries (replies are not counted) */ @@ -264,6 +269,28 @@ GSF_handle_p2p_query_ (const struct GNUNET_PeerIdentity *other, const struct GNUNET_MessageHeader *message); +/** + * Return the performance data record for the given peer + * + * @param cp peer to query + * @return performance data record for the peer + */ +struct GSF_PeerPerformanceData * +GSF_get_peer_performance_data_ (struct GSF_ConnectedPeer *cp); + + +/** + * Ask a peer to stop migrating data to us until the given point + * in time. + * + * @param cp peer to ask + * @param block_time until when to block + */ +void +GSF_block_peer_migration_ (struct GSF_ConnectedPeer *cp, + struct GNUNET_TIME_Relative block_time); + + /** * A peer disconnected from us. Tear down the connected peer * record. diff --git a/src/fs/gnunet-service-fs_lc.c b/src/fs/gnunet-service-fs_lc.c index 9c9c0d568..ea33580f9 100644 --- a/src/fs/gnunet-service-fs_lc.c +++ b/src/fs/gnunet-service-fs_lc.c @@ -178,6 +178,70 @@ GSF_local_client_lookup_ (struct GNUNET_SERVER_Client *client) } +/** + * Handle a reply to a pending request. Also called if a request + * expires (then with data == NULL). The handler may be called + * many times (depending on the request type), but will not be + * called during or after a call to GSF_pending_request_cancel + * and will also not be called anymore after a call signalling + * expiration. + * + * @param cls user-specified closure + * @param pr handle to the original pending request + * @param data response data, NULL on request expiration + * @param data_len number of bytes in data + */ +static void +client_response_handler (void *cls, + struct GSF_PendingRequest *pr, + const void *data, + size_t data_len) +{ + /* FIXME: adapt old code below to new API! */ + + GNUNET_STATISTICS_update (stats, + gettext_noop ("# replies received for local clients"), + 1, + GNUNET_NO); + cl = pr->client_request_list->client_list; + msize = sizeof (struct PutMessage) + prq->size; + creply = GNUNET_malloc (msize + sizeof (struct ClientResponseMessage)); + creply->msize = msize; + creply->client_list = cl; + GNUNET_CONTAINER_DLL_insert_after (cl->res_head, + cl->res_tail, + cl->res_tail, + creply); + pm = (struct PutMessage*) &creply[1]; + pm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT); + pm->header.size = htons (msize); + pm->type = htonl (prq->type); + pm->expiration = GNUNET_TIME_absolute_hton (prq->expiration); + memcpy (&pm[1], prq->data, prq->size); + if (NULL == cl->th) + { +#if DEBUG_FS + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Transmitting result for query `%s' to client\n", + GNUNET_h2s (key)); +#endif + cl->th = GNUNET_SERVER_notify_transmit_ready (cl->client, + msize, + GNUNET_TIME_UNIT_FOREVER_REL, + &transmit_to_client, + cl); + } + GNUNET_break (cl->th != NULL); + if (pr->do_remove) + { + prq->finished = GNUNET_YES; + destroy_pending_request (pr); + } + +} + + + /** * Handle START_SEARCH-message (search request from local client). * diff --git a/src/fs/gnunet-service-fs_pr.c b/src/fs/gnunet-service-fs_pr.c index ad19a807e..047c07587 100644 --- a/src/fs/gnunet-service-fs_pr.c +++ b/src/fs/gnunet-service-fs_pr.c @@ -37,16 +37,39 @@ struct GSF_PendingRequest */ struct GSF_PendingRequestData public_data; + /** + * Function to call if we encounter a reply. + */ GSF_PendingRequestReplyHandler rh; + /** + * Closure for 'rh' + */ void *rh_cls; - const GNUNET_HashCode *replies_seen; + /** + * Array of hash codes of replies we've already seen. + */ + GNUNET_HashCode *replies_seen; + /** + * Bloomfilter masking replies we've already seen. + */ struct GNUNET_CONTAINER_BloomFilter *bf; + /** + * Number of valid entries in the 'replies_seen' array. + */ unsigned int replies_seen_count; + /** + * Length of the 'replies_seen' array. + */ + unsigned int replies_seen_size; + + /** + * Mingle value we currently use for the bf. + */ int32_t mingle; }; @@ -56,7 +79,99 @@ struct GSF_PendingRequest * All pending requests, ordered by the query. Entries * are of type 'struct GSF_PendingRequest*'. */ -static struct GNUNET_CONTAINER_MultiHashMap *requests; +static struct GNUNET_CONTAINER_MultiHashMap *pr_map; + + +/** + * Datastore 'PUT' load tracking. + */ +static struct GNUNET_LOAD_Value *datastore_put_load; + + +/** + * Are we allowed to migrate content to this peer. + */ +static int active_to_migration; + + +/** + * Heap with the request that will expire next at the top. Contains + * pointers of type "struct PendingRequest*"; these will *also* be + * aliased from the "requests_by_peer" data structures and the + * "requests_by_query" table. Note that requests from our clients + * don't expire and are thus NOT in the "requests_by_expiration" + * (or the "requests_by_peer" tables). + */ +static struct GNUNET_CONTAINER_Heap *requests_by_expiration_heap; + + +/** + * How many bytes should a bloomfilter be if we have already seen + * entry_count responses? Note that BLOOMFILTER_K gives us the number + * of bits set per entry. Furthermore, we should not re-size the + * filter too often (to keep it cheap). + * + * Since other peers will also add entries but not resize the filter, + * we should generally pick a slightly larger size than what the + * strict math would suggest. + * + * @return must be a power of two and smaller or equal to 2^15. + */ +static size_t +compute_bloomfilter_size (unsigned int entry_count) +{ + size_t size; + unsigned int ideal = (entry_count * BLOOMFILTER_K) / 4; + uint16_t max = 1 << 15; + + if (entry_count > max) + return max; + size = 8; + while ((size < max) && (size < ideal)) + size *= 2; + if (size > max) + return max; + return size; +} + + +/** + * Recalculate our bloom filter for filtering replies. This function + * will create a new bloom filter from scratch, so it should only be + * called if we have no bloomfilter at all (and hence can create a + * fresh one of minimal size without problems) OR if our peer is the + * initiator (in which case we may resize to larger than mimimum size). + * + * @param pr request for which the BF is to be recomputed + * @return GNUNET_YES if a refresh actually happened + */ +static int +refresh_bloomfilter (struct GSF_PendingRequest *pr) +{ + unsigned int i; + size_t nsize; + GNUNET_HashCode mhash; + + nsize = compute_bloomfilter_size (pr->replies_seen_off); + if ( (bf != NULL) && + (nsize == GNUNET_CONTAINER_bloomfilter_get_size (pr->bf)) ) + return GNUNET_NO; /* size not changed */ + if (pr->bf != NULL) + GNUNET_CONTAINER_bloomfilter_free (pr->bf); + pr->mingle = (int32_t) GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, + UINT32_MAX); + pr->bf = GNUNET_CONTAINER_bloomfilter_init (NULL, + nsize, + BLOOMFILTER_K); + for (i=0;ireplies_seen_count;i++) + { + GNUNET_BLOCK_mingle_hash (&pr->replies_seen[i], + pr->mingle, + &mhash); + GNUNET_CONTAINER_bloomfilter_add (pr->bf, &mhash); + } + return GNUNET_YES; +} /** @@ -92,7 +207,54 @@ GSF_pending_request_create_ (enum GSF_PendingRequestOptions options, GSF_PendingRequestReplyHandler rh, void *rh_cls) { - return NULL; // FIXME + struct GSF_PendingRequest *pr; + + + pr = GNUNET_malloc (sizeof (struct GSF_PendingRequest)); + pr->public_data.query = *query; + if (GNUNET_BLOCK_TYPE_SBLOCK == type) + { + GNUNET_assert (NULL != namespace); + pr->public_data.namespace = *namespace; + } + if (NULL != target) + { + pr->public_data.target = *target; + pr->has_target = GNUNET_YES; + } + pr->public_data.anonymity_level = anonymity_data; + pr->public_data.priority = priority; + pr->public_data.options = options; + pr->public_data.type = type; + pr->rh = rh; + pr->rh_cls = rh_cls; + if (replies_seen_count > 0) + { + pr->replies_seen_size = replies_seen_count; + pr->replies_seen = GNUNET_malloc (sizeof (GNUNET_HashCode) * pr->replies_seen_size); + memcpy (pr->replies_seen, + replies_seen, + replies_seen_count * sizeof (struct GNUNET_HashCode)); + pr->replies_seen_count = replies_seen_count; + } + if (NULL != bf) + { + pr->bf = GNUNET_CONTAINER_bloomfilter_copy (bf); + pr->mingle = mingle; + } + else if ( (replies_seen_count > 0) && + (0 != (options & GSF_PRO_BLOOMFILTER_FULL_REFRESH)) ) + { + GNUNET_assert (GNUNET_YES == refresh_bloomfilter (pr)); + } + GNUNET_CONTAINER_multihashmap_put (pr_map, + query, + pr, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); + // FIXME: if not a local query, we also need to track the + // total number of external queries we currently have and + // bound it => need an additional heap! + return pr; } @@ -109,34 +271,54 @@ GSF_pending_request_update_ (struct GSF_PendingRequest *pr, const GNUNET_HashCode *replies_seen, unsigned int replies_seen_count) { - // FIXME -} - - - -/** - * Get the query for a given pending request. - * - * @param pr the request - * @return pointer to the query (only valid as long as pr is valid) - */ -const GNUNET_HashCode * -GSF_pending_request_get_query_ (const struct GSF_PendingRequest *pr) -{ - return NULL; // FIXME -} - - -/** - * Get the type of a given pending request. - * - * @param pr the request - * @return query type - */ -enum GNUNET_BLOCK_Type -GSF_pending_request_get_type_ (const struct GSF_PendingRequest *pr) -{ - return 0; // FIXME + unsigned int i; + GNUNET_HashCode mhash; + + if (replies_seen_count + pr->replies_seen_count < pr->replies_seen_count) + return; /* integer overflow */ + if (0 != (options & GSF_PRO_BLOOMFILTER_FULL_REFRESH)) + { + /* we're responsible for the BF, full refresh */ + if (replies_seen_count + pr->replies_seen_count > pr->replies_seen_size) + GNUNET_array_grow (pr->replies_seen, + pr->replies_seen_size, + replies_seen_count + pr->replies_seen_count); + memcpy (&pr->replies_seen[pr->replies_seen_count], + replies_seen, + sizeof (GNUNET_HashCode) * replies_seen_count); + pr->replies_seen_count += replies_seen; + if (GNUNET_NO == refresh_bloomfilter (pr)) + { + /* bf not recalculated, simply extend it with new bits */ + for (i=0;ireplies_seen_count;i++) + { + GNUNET_BLOCK_mingle_hash (&replies_seen[i], + pr->mingle, + &mhash); + GNUNET_CONTAINER_bloomfilter_add (pr->bf, &mhash); + } + } + } + else + { + if (NULL == pr->bf) + { + /* we're not the initiator, but the initiator did not give us + any bloom-filter, so we need to create one on-the-fly */ + pr->mingle = (int32_t) GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, + UINT32_MAX); + pr->bf = GNUNET_CONTAINER_bloomfilter_init (compute_bloomfilter_size (replies_seen_count), + pr->mingle, + BLOOMFILTER_K); + } + for (i=0;ireplies_seen_count;i++) + { + GNUNET_BLOCK_mingle_hash (&replies_seen[i], + pr->mingle, + &mhash); + GNUNET_CONTAINER_bloomfilter_add (pr->bf, &mhash); + } + } } @@ -145,16 +327,102 @@ GSF_pending_request_get_type_ (const struct GSF_PendingRequest *pr) * transmission to other peers (or at least determine its size). * * @param pr request to generate the message for + * @param do_route are we routing the reply * @param buf_size number of bytes available in buf * @param buf where to copy the message (can be NULL) * @return number of bytes needed (if > buf_size) or used */ size_t GSF_pending_request_get_message_ (struct GSF_PendingRequest *pr, + int do_route, size_t buf_size, void *buf) { - return 0; // FIXME + struct PendingMessage *pm; + char lbuf[GNUNET_SERVER_MAX_MESSAGE_SIZE]; + struct GetMessage *gm; + GNUNET_HashCode *ext; + size_t msize; + unsigned int k; + int no_route; + uint32_t bm; + uint32_t prio; + size_t bf_size; + + k = 0; + bm = 0; + if (GNUNET_YES != do_route) + { + bm |= GET_MESSAGE_BIT_RETURN_TO; + k++; + } + if (GNUNET_BLOCK_TYPE_SBLOCK == pr->type) + { + bm |= GET_MESSAGE_BIT_SKS_NAMESPACE; + k++; + } + if (GNUNET_YES == pr->has_target) + { + bm |= GET_MESSAGE_BIT_TRANSMIT_TO; + k++; + } + bf_size = GNUNET_CONTAINER_bloomfilter_get_size (pr->bf); + msize = sizeof (struct GetMessage) + bf_size + k * sizeof(GNUNET_HashCode); + GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE); + if (buf_size < msize) + return msize; + gm = (struct GetMessage*) lbuf; + gm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_GET); + gm->header.size = htons (msize); + gm->type = htonl (pr->type); + if (GNUNET_YES == do_route) + prio = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, + pr->public_data.priority + 1); + else + prio = 0; + pr->public_data.priority -= prio; + gm->priority = htonl (prio); + gm->ttl = htonl (pr->ttl); + gm->filter_mutator = htonl(pr->mingle); + gm->hash_bitmap = htonl (bm); + gm->query = pr->query; + ext = (GNUNET_HashCode*) &gm[1]; + k = 0; + if (GNUNET_YES != do_route) + GNUNET_PEER_resolve (pr->cp->pid, (struct GNUNET_PeerIdentity*) &ext[k++]); + if (GNUNET_BLOCK_TYPE_SBLOCK == pr->type) + memcpy (&ext[k++], pr->namespace, sizeof (GNUNET_HashCode)); + if (GNUNET_YES == pr->has_target) + GNUNET_PEER_resolve (pr->target_pid, (struct GNUNET_PeerIdentity*) &ext[k++]); + if (pr->bf != NULL) + GNUNET_CONTAINER_bloomfilter_get_raw_data (pr->bf, + (char*) &ext[k], + bf_size); + memcpy (buf, gm, msize); + return msize; +} + + +/** + * Iterator to free pending requests. + * + * @param cls closure, unused + * @param key current key code + * @param value value in the hash map (pending request) + * @return GNUNET_YES (we should continue to iterate) + */ +static int +clean_request (void *cls, + const GNUNET_HashCode * key, + void *value) +{ + struct GSF_PendingRequest *pr = value; + + GNUNET_free_non_null (pr->replies_seen); + if (NULL != pr->bf) + GNUNET_CONTAINER_bloomfilter_free (pr->bf); + GNUNET_free (pr); + return GNUNET_YES; } @@ -166,6 +434,12 @@ GSF_pending_request_get_message_ (struct GSF_PendingRequest *pr, void GSF_pending_request_cancel_ (struct GSF_PendingRequest *pr) { + GNUNET_assert (GNUNET_OK == + GNUNET_CONTAINER_multihashmap_remove (pr_map, + &pr->public_data.query, + pr)); + GNUNET_assert (GNUNET_YES == + clean_request (NULL, &pr->public_data.query, pr)); } @@ -176,10 +450,369 @@ GSF_pending_request_cancel_ (struct GSF_PendingRequest *pr) * @param cls closure for it */ void -GSF_iterate_pending_requests_ (GSF_PendingRequestIterator it, - void *cls) +GSF_iterate_pending_pr_map_ (GSF_PendingRequestIterator it, + void *cls) +{ + GNUNET_CONTAINER_multihashmap_iterate (pr_map, + (GNUNET_CONTAINER_HashMapIterator) it, + cls); +} + + + + +/** + * Closure for "process_reply" function. + */ +struct ProcessReplyClosure +{ + /** + * The data for the reply. + */ + const void *data; + + /** + * Who gave us this reply? NULL for local host (or DHT) + */ + struct ConnectedPeer *sender; + + /** + * When the reply expires. + */ + struct GNUNET_TIME_Absolute expiration; + + /** + * Size of data. + */ + size_t size; + + /** + * Type of the block. + */ + enum GNUNET_BLOCK_Type type; + + /** + * How much was this reply worth to us? + */ + uint32_t priority; + + /** + * Anonymity requirements for this reply. + */ + uint32_t anonymity_level; + + /** + * Evaluation result (returned). + */ + enum GNUNET_BLOCK_EvaluationResult eval; + + /** + * Did we finish processing the associated request? + */ + int finished; + + /** + * Did we find a matching request? + */ + int request_found; +}; + + +/** + * Update the performance data for the sender (if any) since + * the sender successfully answered one of our queries. + * + * @param prq information about the sender + * @param pr request that was satisfied + */ +static void +update_request_performance_data (struct ProcessReplyClosure *prq, + struct GSF_PendingRequest *pr) +{ + unsigned int i; + struct GNUNET_TIME_Relative cur_delay; + + if (prq->sender == NULL) + return; + /* FIXME: adapt code to new API... */ + for (i=0;iused_targets_off;i++) + if (pr->used_targets[i].pid == prq->sender->pid) + break; + if (i < pr->used_targets_off) + { + cur_delay = GNUNET_TIME_absolute_get_duration (pr->used_targets[i].last_request_time); + prq->sender->avg_delay.rel_value + = (prq->sender->avg_delay.rel_value * + (RUNAVG_DELAY_N - 1) + cur_delay.rel_value) / RUNAVG_DELAY_N; + prq->sender->avg_priority + = (prq->sender->avg_priority * + (RUNAVG_DELAY_N - 1) + pr->priority) / (double) RUNAVG_DELAY_N; + } + if (pr->cp != NULL) + { + GNUNET_PEER_change_rc (prq->sender->last_p2p_replies + [prq->sender->last_p2p_replies_woff % P2P_SUCCESS_LIST_SIZE], + -1); + GNUNET_PEER_change_rc (pr->cp->pid, 1); + prq->sender->last_p2p_replies + [(prq->sender->last_p2p_replies_woff++) % P2P_SUCCESS_LIST_SIZE] + = pr->cp->pid; + } + else + { + if (NULL != prq->sender->last_client_replies + [(prq->sender->last_client_replies_woff) % CS2P_SUCCESS_LIST_SIZE]) + GNUNET_SERVER_client_drop (prq->sender->last_client_replies + [(prq->sender->last_client_replies_woff) % CS2P_SUCCESS_LIST_SIZE]); + prq->sender->last_client_replies + [(prq->sender->last_client_replies_woff++) % CS2P_SUCCESS_LIST_SIZE] + = pr->client_request_list->client_list->client; + GNUNET_SERVER_client_keep (pr->client_request_list->client_list->client); + } +} + + + +/** + * We have received a reply; handle it! + * + * @param cls response (struct ProcessReplyClosure) + * @param key our query + * @param value value in the hash map (info about the query) + * @return GNUNET_YES (we should continue to iterate) + */ +static int +process_reply (void *cls, + const GNUNET_HashCode * key, + void *value) +{ + struct ProcessReplyClosure *prq = cls; + struct GSF_PendingRequest *pr = value; + struct PendingMessage *reply; + struct ClientResponseMessage *creply; + struct ClientList *cl; + struct PutMessage *pm; + struct ConnectedPeer *cp; + size_t msize; + +#if DEBUG_FS + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Matched result (type %u) for query `%s' with pending request\n", + (unsigned int) prq->type, + GNUNET_h2s (key)); +#endif + GNUNET_STATISTICS_update (stats, + gettext_noop ("# replies received and matched"), + 1, + GNUNET_NO); + prq->eval = GNUNET_BLOCK_evaluate (block_ctx, + prq->type, + key, + &pr->bf, + pr->mingle, + pr->namespace, (pr->namespace != NULL) ? sizeof (GNUNET_HashCode) : 0, + prq->data, + prq->size); + switch (prq->eval) + { + case GNUNET_BLOCK_EVALUATION_OK_MORE: + update_request_performance_data (prq, pr); + break; + case GNUNET_BLOCK_EVALUATION_OK_LAST: + update_request_performance_data (prq, pr); + /* FIXME: adapt code to new API! */ + while (NULL != pr->pending_head) + destroy_pending_message_list_entry (pr->pending_head); + if (pr->qe != NULL) + { + if (pr->client_request_list != NULL) + GNUNET_SERVER_receive_done (pr->client_request_list->client_list->client, + GNUNET_YES); + GNUNET_DATASTORE_cancel (pr->qe); + pr->qe = NULL; + } + pr->do_remove = GNUNET_YES; + if (pr->task != GNUNET_SCHEDULER_NO_TASK) + { + GNUNET_SCHEDULER_cancel (pr->task); + pr->task = GNUNET_SCHEDULER_NO_TASK; + } + GNUNET_break (GNUNET_YES == + GNUNET_CONTAINER_multihashmap_remove (query_request_map, + key, + pr)); + GNUNET_LOAD_update (rt_entry_lifetime, + GNUNET_TIME_absolute_get_duration (pr->start_time).rel_value); + break; + case GNUNET_BLOCK_EVALUATION_OK_DUPLICATE: + GNUNET_STATISTICS_update (stats, + gettext_noop ("# duplicate replies discarded (bloomfilter)"), + 1, + GNUNET_NO); +#if DEBUG_FS && 0 + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Duplicate response `%s', discarding.\n", + GNUNET_h2s (&mhash)); +#endif + return GNUNET_YES; /* duplicate */ + case GNUNET_BLOCK_EVALUATION_RESULT_INVALID: + return GNUNET_YES; /* wrong namespace */ + case GNUNET_BLOCK_EVALUATION_REQUEST_VALID: + GNUNET_break (0); + return GNUNET_YES; + case GNUNET_BLOCK_EVALUATION_REQUEST_INVALID: + GNUNET_break (0); + return GNUNET_YES; + case GNUNET_BLOCK_EVALUATION_TYPE_NOT_SUPPORTED: + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + _("Unsupported block type %u\n"), + prq->type); + return GNUNET_NO; + } + /* FIXME: adapt code to new API! */ + if (pr->client_request_list != NULL) + { + if (pr->replies_seen_size == pr->replies_seen_off) + GNUNET_array_grow (pr->replies_seen, + pr->replies_seen_size, + pr->replies_seen_size * 2 + 4); + GNUNET_CRYPTO_hash (prq->data, + prq->size, + &pr->replies_seen[pr->replies_seen_off++]); + refresh_bloomfilter (pr); + } + if (NULL == prq->sender) + { +#if DEBUG_FS + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Found result for query `%s' in local datastore\n", + GNUNET_h2s (key)); +#endif + GNUNET_STATISTICS_update (stats, + gettext_noop ("# results found locally"), + 1, + GNUNET_NO); + } + prq->priority += pr->remaining_priority; + pr->remaining_priority = 0; + pr->results_found++; + prq->request_found = GNUNET_YES; + /* finally, pass on to other peers / local clients */ + pr->rh (pr->rh_cls, pr, prq->data, prq->size); + return GNUNET_YES; +} + + +/** + * Continuation called to notify client about result of the + * operation. + * + * @param cls closure + * @param success GNUNET_SYSERR on failure + * @param msg NULL on success, otherwise an error message + */ +static void +put_migration_continuation (void *cls, + int success, + const char *msg) +{ + struct GNUNET_TIME_Absolute *start = cls; + struct GNUNET_TIME_Relative delay; + + delay = GNUNET_TIME_absolute_get_duration (*start); + GNUNET_free (start); + /* FIXME: should we really update the load value on failure? */ + GNUNET_LOAD_update (datastore_put_load, + delay.rel_value); + if (GNUNET_OK == success) + return; + GNUNET_STATISTICS_update (stats, + gettext_noop ("# datastore 'put' failures"), + 1, + GNUNET_NO); +} + + +/** + * Test if the DATABASE (PUT) load on this peer is too high + * to even consider processing the query at + * all. + * + * @return GNUNET_YES if the load is too high to do anything (load high) + * GNUNET_NO to process normally (load normal or low) + */ +static int +test_put_load_too_high (uint32_t priority) +{ + double ld; + + if (GNUNET_LOAD_get_average (datastore_put_load) < 50) + return GNUNET_NO; /* very fast */ + ld = GNUNET_LOAD_get_load (datastore_put_load); + if (ld < 2.0 * (1 + priority)) + return GNUNET_NO; + GNUNET_STATISTICS_update (stats, + gettext_noop ("# storage requests dropped due to high load"), + 1, + GNUNET_NO); + return GNUNET_YES; +} + + +/** + * Iterator called on each result obtained for a DHT + * operation that expects a reply + * + * @param cls closure + * @param exp when will this value expire + * @param key key of the result + * @param get_path NULL-terminated array of pointers + * to the peers on reverse GET path (or NULL if not recorded) + * @param put_path NULL-terminated array of pointers + * to the peers on the PUT path (or NULL if not recorded) + * @param type type of the result + * @param size number of bytes in data + * @param data pointer to the result data + */ +void +GSF_handle_dht_reply_ (void *cls, + struct GNUNET_TIME_Absolute exp, + const GNUNET_HashCode * key, + const struct GNUNET_PeerIdentity * const *get_path, + const struct GNUNET_PeerIdentity * const *put_path, + enum GNUNET_BLOCK_Type type, + size_t size, + const void *data) { - // FIXME + struct GSF_PendingRequest *pr = cls; + struct ProcessReplyClosure prq; + + memset (&prq, 0, sizeof (prq)); + prq.data = data; + prq.expiration = exp; + prq.size = size; + prq.type = type; + process_reply (&prq, key, pr); + if ( (GNUNET_YES == active_to_migration) && + (GNUNET_NO == test_put_load_too_high (prq.priority)) ) + { +#if DEBUG_FS + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Replicating result for query `%s' with priority %u\n", + GNUNET_h2s (&query), + prq.priority); +#endif + start = GNUNET_malloc (sizeof (struct GNUNET_TIME_Absolute)); + *start = GNUNET_TIME_absolute_get (); + GNUNET_DATASTORE_put (dsh, + 0, &query, dsize, &put[1], + type, prq.priority, 1 /* anonymity */, + expiration, + 1 + prq.priority, MAX_DATASTORE_QUEUE, + GNUNET_CONSTANTS_SERVICE_TIMEOUT, + &put_migration_continuation, + start); + } } @@ -189,18 +822,106 @@ GSF_iterate_pending_requests_ (GSF_PendingRequestIterator it, * this content and possibly passes it on (to local clients or other * peers). Does NOT perform migration (content caching at this peer). * - * @param other the other peer involved (sender or receiver, NULL + * @param cp the other peer involved (sender or receiver, NULL * for loopback messages where we are both sender and receiver) * @param message the actual message - * @return how valueable was the content to us (0 for not at all), + * @return GNUNET_OK if the message was well-formed, * GNUNET_SYSERR if the message was malformed (close connection, * do not cache under any circumstances) */ int -GSF_handle_p2p_content_ (const struct GNUNET_PeerIdentity *other, +GSF_handle_p2p_content_ (struct GSF_ConnectedPeer *cp, const struct GNUNET_MessageHeader *message) { - return GNUNET_SYSERR; // FIXME + const struct PutMessage *put; + uint16_t msize; + size_t dsize; + enum GNUNET_BLOCK_Type type; + struct GNUNET_TIME_Absolute expiration; + GNUNET_HashCode query; + struct ProcessReplyClosure prq; + struct GNUNET_TIME_Relative block_time; + double putl; + struct GNUNET_TIME_Absolute *start; + + msize = ntohs (message->size); + if (msize < sizeof (struct PutMessage)) + { + GNUNET_break_op(0); + return GNUNET_SYSERR; + } + put = (const struct PutMessage*) message; + dsize = msize - sizeof (struct PutMessage); + type = ntohl (put->type); + expiration = GNUNET_TIME_absolute_ntoh (put->expiration); + if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND) + return GNUNET_SYSERR; + if (GNUNET_OK != + GNUNET_BLOCK_get_key (block_ctx, + type, + &put[1], + dsize, + &query)) + { + GNUNET_break_op (0); + return GNUNET_SYSERR; + } + /* now, lookup 'query' */ + prq.data = (const void*) &put[1]; + if (NULL != cp) + prq.sender = cp; + else + prq.sender = NULL; + prq.size = dsize; + prq.type = type; + prq.expiration = expiration; + prq.priority = 0; + prq.anonymity_level = 1; + prq.finished = GNUNET_NO; + prq.request_found = GNUNET_NO; + GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map, + &query, + &process_reply, + &prq); + if (NULL != cp) + { + GSF_connected_peer_change_preference (cp, CONTENT_BANDWIDTH_VALUE + 1000 * prq.priority); + GSF_get_peer_performance_data (cp)->trust += prq.priority; + } + if ( (GNUNET_YES == active_to_migration) && + (GNUNET_NO == test_put_load_too_high (prq.priority)) ) + { +#if DEBUG_FS + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Replicating result for query `%s' with priority %u\n", + GNUNET_h2s (&query), + prq.priority); +#endif + start = GNUNET_malloc (sizeof (struct GNUNET_TIME_Absolute)); + *start = GNUNET_TIME_absolute_get (); + GNUNET_DATASTORE_put (dsh, + 0, &query, dsize, &put[1], + type, prq.priority, 1 /* anonymity */, + expiration, + 1 + prq.priority, MAX_DATASTORE_QUEUE, + GNUNET_CONSTANTS_SERVICE_TIMEOUT, + &put_migration_continuation, + start); + } + putl = GNUNET_LOAD_get_load (datastore_put_load); + if ( (NULL != (cp = prq.sender)) && + (GNUNET_NO == prq.request_found) && + ( (GNUNET_YES != active_to_migration) || + (putl > 2.5 * (1 + prq.priority)) ) ) + { + if (GNUNET_YES != active_to_migration) + putl = 1.0 + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 5); + block_time = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS, + 5000 + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, + (unsigned int) (60000 * putl * putl))); + GSF_block_peer_migration (cp, block_time); + } + return GNUNET_OK; } @@ -210,7 +931,7 @@ GSF_handle_p2p_content_ (const struct GNUNET_PeerIdentity *other, void GSF_pending_request_init_ () { - // FIXME + pr_map = GNUNET_CONTAINER_multihashmap_create (32 * 1024); } @@ -220,7 +941,11 @@ GSF_pending_request_init_ () void GSF_pending_request_done_ () { - // FIXME + GNUNET_CONTAINER_multihashmap_iterate (pr_map, + &clean_request, + NULL); + GNUNET_CONTAINER_multihashmap_destroy (pr_map); + pr_map = NULL; } diff --git a/src/fs/gnunet-service-fs_pr.h b/src/fs/gnunet-service-fs_pr.h index 5fb9d2a5a..88c650042 100644 --- a/src/fs/gnunet-service-fs_pr.h +++ b/src/fs/gnunet-service-fs_pr.h @@ -68,7 +68,9 @@ enum GSF_PendingRequestOptions /** - * Public data associated with each pending request. + * Public data (in the sense of not encapsulated within + * 'gnunet-service-fs_pr', not in the sense of network-wide + * known) associated with each pending request. */ struct GSF_PendingRequestData { @@ -184,37 +186,19 @@ GSF_pending_request_update_ (struct GSF_PendingRequest *pr, unsigned int replies_seen_count); -/** - * Get the query for a given pending request. - * - * @param pr the request - * @return pointer to the query (only valid as long as pr is valid) - */ -const GNUNET_HashCode * -GSF_pending_request_get_query_ (const struct GSF_PendingRequest *pr); - - -/** - * Get the type of a given pending request. - * - * @param pr the request - * @return query type - */ -enum GNUNET_BLOCK_Type -GSF_pending_request_get_type_ (const struct GSF_PendingRequest *pr); - - /** * Generate the message corresponding to the given pending request for * transmission to other peers (or at least determine its size). * * @param pr request to generate the message for + * @param do_route are we routing the reply * @param buf_size number of bytes available in buf * @param buf where to copy the message (can be NULL) - * @return number of bytes needed (if > buf_size) or used + * @return number of bytes needed (if buf_size too small) or used */ size_t GSF_pending_request_get_message_ (struct GSF_PendingRequest *pr, + int do_route, size_t buf_size, void *buf); @@ -230,10 +214,12 @@ GSF_pending_request_cancel_ (struct GSF_PendingRequest *pr); /** * Signature of function called on each request. + * (Note: 'subtype' of GNUNET_CONTAINER_HashMapIterator). * * @param cls closure * @param key query for the request * @param pr handle to the pending request + * @return GNUNET_YES to continue to iterate */ typedef int (*GSF_PendingRequestIterator)(void *cls, const GNUNET_HashCode *key, @@ -257,18 +243,44 @@ GSF_iterate_pending_requests_ (GSF_PendingRequestIterator it, * this content and possibly passes it on (to local clients or other * peers). Does NOT perform migration (content caching at this peer). * - * @param other the other peer involved (sender or receiver, NULL + * @param cp the other peer involved (sender or receiver, NULL * for loopback messages where we are both sender and receiver) * @param message the actual message - * @return how valueable was the content to us (0 for not at all), + * @return GNUNET_OK if the message was well-formed, * GNUNET_SYSERR if the message was malformed (close connection, * do not cache under any circumstances) */ int -GSF_handle_p2p_content_ (const struct GNUNET_PeerIdentity *other, +GSF_handle_p2p_content_ (struct GSF_ConnectedPeer *cp, const struct GNUNET_MessageHeader *message); +/** + * Iterator called on each result obtained for a DHT + * operation that expects a reply + * + * @param cls closure, the 'struct GSF_PendingRequest *'. + * @param exp when will this value expire + * @param key key of the result + * @param get_path NULL-terminated array of pointers + * to the peers on reverse GET path (or NULL if not recorded) + * @param put_path NULL-terminated array of pointers + * to the peers on the PUT path (or NULL if not recorded) + * @param type type of the result + * @param size number of bytes in data + * @param data pointer to the result data + */ +void +GSF_handle_dht_reply_ (void *cls, + struct GNUNET_TIME_Absolute exp, + const GNUNET_HashCode * key, + const struct GNUNET_PeerIdentity * const *get_path, + const struct GNUNET_PeerIdentity * const *put_path, + enum GNUNET_BLOCK_Type type, + size_t size, + const void *data); + + /** * Setup the subsystem. */ diff --git a/src/fs/gnunet-service-fs_push.c b/src/fs/gnunet-service-fs_push.c new file mode 100644 index 000000000..9f515e2ee --- /dev/null +++ b/src/fs/gnunet-service-fs_push.c @@ -0,0 +1,475 @@ +/* + This file is part of GNUnet. + (C) 2011 Christian Grothoff (and other contributing authors) + + GNUnet is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published + by the Free Software Foundation; either version 3, or (at your + option) any later version. + + GNUnet is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU General Public License + along with GNUnet; see the file COPYING. If not, write to the + Free Software Foundation, Inc., 59 Temple Place - Suite 330, + Boston, MA 02111-1307, USA. +*/ + +/** + * @file fs/gnunet-service-fs_push.c + * @brief API to push content from our datastore to other peers + * ('anonymous'-content P2P migration) + * @author Christian Grothoff + */ +#include "platform.h" +#include "gnunet-service-fs_push.h" + + +/* FIXME: below are only old code fragments to use... */ + +/** + * Block that is ready for migration to other peers. Actual data is at the end of the block. + */ +struct MigrationReadyBlock +{ + + /** + * This is a doubly-linked list. + */ + struct MigrationReadyBlock *next; + + /** + * This is a doubly-linked list. + */ + struct MigrationReadyBlock *prev; + + /** + * Query for the block. + */ + GNUNET_HashCode query; + + /** + * When does this block expire? + */ + struct GNUNET_TIME_Absolute expiration; + + /** + * Peers we would consider forwarding this + * block to. Zero for empty entries. + */ + GNUNET_PEER_Id target_list[MIGRATION_LIST_SIZE]; + + /** + * Size of the block. + */ + size_t size; + + /** + * Number of targets already used. + */ + unsigned int used_targets; + + /** + * Type of the block. + */ + enum GNUNET_BLOCK_Type type; +}; + + +/** + * Head of linked list of blocks that can be migrated. + */ +static struct MigrationReadyBlock *mig_head; + +/** + * Tail of linked list of blocks that can be migrated. + */ +static struct MigrationReadyBlock *mig_tail; + +/** + * Request to datastore for migration (or NULL). + */ +static struct GNUNET_DATASTORE_QueueEntry *mig_qe; + +/** + * ID of task that collects blocks for migration. + */ +static GNUNET_SCHEDULER_TaskIdentifier mig_task; + +/** + * What is the maximum frequency at which we are allowed to + * poll the datastore for migration content? + */ +static struct GNUNET_TIME_Relative min_migration_delay; + +/** + * Are we allowed to push out content from this peer. + */ +static int active_from_migration; + +/** + * Size of the doubly-linked list of migration blocks. + */ +static unsigned int mig_size; + + +/** + * Delete the given migration block. + * + * @param mb block to delete + */ +static void +delete_migration_block (struct MigrationReadyBlock *mb) +{ + GNUNET_CONTAINER_DLL_remove (mig_head, + mig_tail, + mb); + GNUNET_PEER_decrement_rcs (mb->target_list, + MIGRATION_LIST_SIZE); + mig_size--; + GNUNET_free (mb); +} + + +/** + * Compare the distance of two peers to a key. + * + * @param key key + * @param p1 first peer + * @param p2 second peer + * @return GNUNET_YES if P1 is closer to key than P2 + */ +static int +is_closer (const GNUNET_HashCode *key, + const struct GNUNET_PeerIdentity *p1, + const struct GNUNET_PeerIdentity *p2) +{ + return GNUNET_CRYPTO_hash_xorcmp (&p1->hashPubKey, + &p2->hashPubKey, + key); +} + + +/** + * Consider migrating content to a given peer. + * + * @param cls 'struct MigrationReadyBlock*' to select + * targets for (or NULL for none) + * @param key ID of the peer + * @param value 'struct ConnectedPeer' of the peer + * @return GNUNET_YES (always continue iteration) + */ +static int +consider_migration (void *cls, + const GNUNET_HashCode *key, + void *value) +{ + struct MigrationReadyBlock *mb = cls; + struct ConnectedPeer *cp = value; + struct MigrationReadyBlock *pos; + struct GNUNET_PeerIdentity cppid; + struct GNUNET_PeerIdentity otherpid; + struct GNUNET_PeerIdentity worstpid; + size_t msize; + unsigned int i; + unsigned int repl; + + /* consider 'cp' as a migration target for mb */ + if (GNUNET_TIME_absolute_get_remaining (cp->migration_blocked).rel_value > 0) + return GNUNET_YES; /* peer has requested no migration! */ + if (mb != NULL) + { + GNUNET_PEER_resolve (cp->pid, + &cppid); + repl = MIGRATION_LIST_SIZE; + for (i=0;itarget_list[i] == 0) + { + mb->target_list[i] = cp->pid; + GNUNET_PEER_change_rc (mb->target_list[i], 1); + repl = MIGRATION_LIST_SIZE; + break; + } + GNUNET_PEER_resolve (mb->target_list[i], + &otherpid); + if ( (repl == MIGRATION_LIST_SIZE) && + is_closer (&mb->query, + &cppid, + &otherpid)) + { + repl = i; + worstpid = otherpid; + } + else if ( (repl != MIGRATION_LIST_SIZE) && + (is_closer (&mb->query, + &worstpid, + &otherpid) ) ) + { + repl = i; + worstpid = otherpid; + } + } + if (repl != MIGRATION_LIST_SIZE) + { + GNUNET_PEER_change_rc (mb->target_list[repl], -1); + mb->target_list[repl] = cp->pid; + GNUNET_PEER_change_rc (mb->target_list[repl], 1); + } + } + + /* consider scheduling transmission to cp for content migration */ + if (cp->cth != NULL) + return GNUNET_YES; + msize = 0; + pos = mig_head; + while (pos != NULL) + { + for (i=0;ipid == pos->target_list[i]) + { + if (msize == 0) + msize = pos->size; + else + msize = GNUNET_MIN (msize, + pos->size); + break; + } + } + pos = pos->next; + } + if (msize == 0) + return GNUNET_YES; /* no content available */ +#if DEBUG_FS + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Trying to migrate at least %u bytes to peer `%s'\n", + msize, + GNUNET_h2s (key)); +#endif + if (cp->delayed_transmission_request_task != GNUNET_SCHEDULER_NO_TASK) + { + GNUNET_SCHEDULER_cancel (cp->delayed_transmission_request_task); + cp->delayed_transmission_request_task = GNUNET_SCHEDULER_NO_TASK; + } + cp->cth + = GNUNET_CORE_notify_transmit_ready (core, + 0, GNUNET_TIME_UNIT_FOREVER_REL, + (const struct GNUNET_PeerIdentity*) key, + msize + sizeof (struct PutMessage), + &transmit_to_peer, + cp); + return GNUNET_YES; +} + + +/** + * Task that is run periodically to obtain blocks for content + * migration + * + * @param cls unused + * @param tc scheduler context (also unused) + */ +static void +gather_migration_blocks (void *cls, + const struct GNUNET_SCHEDULER_TaskContext *tc); + + + + +/** + * If the migration task is not currently running, consider + * (re)scheduling it with the appropriate delay. + */ +static void +consider_migration_gathering () +{ + struct GNUNET_TIME_Relative delay; + + if (dsh == NULL) + return; + if (mig_qe != NULL) + return; + if (mig_task != GNUNET_SCHEDULER_NO_TASK) + return; + delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, + mig_size); + delay = GNUNET_TIME_relative_divide (delay, + MAX_MIGRATION_QUEUE); + delay = GNUNET_TIME_relative_max (delay, + min_migration_delay); + mig_task = GNUNET_SCHEDULER_add_delayed (delay, + &gather_migration_blocks, + NULL); +} + + + + +/** + * Process content offered for migration. + * + * @param cls closure + * @param key key for the content + * @param size number of bytes in data + * @param data content stored + * @param type type of the content + * @param priority priority of the content + * @param anonymity anonymity-level for the content + * @param expiration expiration time for the content + * @param uid unique identifier for the datum; + * maybe 0 if no unique identifier is available + */ +static void +process_migration_content (void *cls, + const GNUNET_HashCode * key, + size_t size, + const void *data, + enum GNUNET_BLOCK_Type type, + uint32_t priority, + uint32_t anonymity, + struct GNUNET_TIME_Absolute + expiration, uint64_t uid) +{ + struct MigrationReadyBlock *mb; + + if (key == NULL) + { + mig_qe = NULL; + if (mig_size < MAX_MIGRATION_QUEUE) + consider_migration_gathering (); + return; + } + if (GNUNET_TIME_absolute_get_remaining (expiration).rel_value < + MIN_MIGRATION_CONTENT_LIFETIME.rel_value) + { + /* content will expire soon, don't bother */ + GNUNET_DATASTORE_get_next (dsh, GNUNET_YES); + return; + } + if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND) + { + if (GNUNET_OK != + GNUNET_FS_handle_on_demand_block (key, size, data, + type, priority, anonymity, + expiration, uid, + &process_migration_content, + NULL)) + { + GNUNET_DATASTORE_get_next (dsh, GNUNET_YES); + } + return; + } +#if DEBUG_FS + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Retrieved block `%s' of type %u for migration\n", + GNUNET_h2s (key), + type); +#endif + mb = GNUNET_malloc (sizeof (struct MigrationReadyBlock) + size); + mb->query = *key; + mb->expiration = expiration; + mb->size = size; + mb->type = type; + memcpy (&mb[1], data, size); + GNUNET_CONTAINER_DLL_insert_after (mig_head, + mig_tail, + mig_tail, + mb); + mig_size++; + GNUNET_CONTAINER_multihashmap_iterate (connected_peers, + &consider_migration, + mb); + GNUNET_DATASTORE_get_next (dsh, GNUNET_YES); +} + + + +/** + * Task that is run periodically to obtain blocks for content + * migration + * + * @param cls unused + * @param tc scheduler context (also unused) + */ +static void +gather_migration_blocks (void *cls, + const struct GNUNET_SCHEDULER_TaskContext *tc) +{ + mig_task = GNUNET_SCHEDULER_NO_TASK; + if (dsh != NULL) + { + mig_qe = GNUNET_DATASTORE_get_random (dsh, 0, UINT_MAX, + GNUNET_TIME_UNIT_FOREVER_REL, + &process_migration_content, NULL); + GNUNET_assert (mig_qe != NULL); + } +} + + + +size_t +API_ (void *cls, + size_t size, void *buf) +{ + next = mig_head; + while (NULL != (mb = next)) + { + next = mb->next; + for (i=0;ipid == mb->target_list[i]) && + (mb->size + sizeof (migm) <= size) ) + { + GNUNET_PEER_change_rc (mb->target_list[i], -1); + mb->target_list[i] = 0; + mb->used_targets++; + memset (&migm, 0, sizeof (migm)); + migm.header.size = htons (sizeof (migm) + mb->size); + migm.header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT); + migm.type = htonl (mb->type); + migm.expiration = GNUNET_TIME_absolute_hton (mb->expiration); + memcpy (&cbuf[msize], &migm, sizeof (migm)); + msize += sizeof (migm); + size -= sizeof (migm); + memcpy (&cbuf[msize], &mb[1], mb->size); + msize += mb->size; + size -= mb->size; +#if DEBUG_FS + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Pushing migration block `%s' (%u bytes) to `%s'\n", + GNUNET_h2s (&mb->query), + (unsigned int) mb->size, + GNUNET_i2s (&pid)); +#endif + break; + } + else + { +#if DEBUG_FS + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Migration block `%s' (%u bytes) is not on migration list for peer `%s'\n", + GNUNET_h2s (&mb->query), + (unsigned int) mb->size, + GNUNET_i2s (&pid)); +#endif + } + } + if ( (mb->used_targets >= MIGRATION_TARGET_COUNT) || + (mb->used_targets >= GNUNET_CONTAINER_multihashmap_size (connected_peers)) ) + { + delete_migration_block (mb); + consider_migration_gathering (); + } + } + consider_migration (NULL, + &pid.hashPubKey, + cp); + +} + + + diff --git a/src/fs/gnunet-service-fs_put.c b/src/fs/gnunet-service-fs_put.c new file mode 100644 index 000000000..eb7289f1e --- /dev/null +++ b/src/fs/gnunet-service-fs_put.c @@ -0,0 +1,197 @@ +/* + This file is part of GNUnet. + (C) 2011 Christian Grothoff (and other contributing authors) + + GNUnet is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published + by the Free Software Foundation; either version 3, or (at your + option) any later version. + + GNUnet is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU General Public License + along with GNUnet; see the file COPYING. If not, write to the + Free Software Foundation, Inc., 59 Temple Place - Suite 330, + Boston, MA 02111-1307, USA. +*/ + +/** + * @file fs/gnunet-service-fs_put.c + * @brief API to PUT zero-anonymity index data from our datastore into the DHT + * @author Christian Grothoff + */ +#include "platform.h" +#include "gnunet-service-fs_put.h" + +/* FIXME: below are only old code fragments to use... */ + + +/** + * Request to datastore for DHT PUTs (or NULL). + */ +static struct GNUNET_DATASTORE_QueueEntry *dht_qe; + + +/** + * Type we will request for the next DHT PUT round from the datastore. + */ +static enum GNUNET_BLOCK_Type dht_put_type = GNUNET_BLOCK_TYPE_FS_KBLOCK; + +/** + * ID of task that collects blocks for DHT PUTs. + */ +static GNUNET_SCHEDULER_TaskIdentifier dht_task; + +/** + * How many entires with zero anonymity do we currently estimate + * to have in the database? + */ +static unsigned int zero_anonymity_count_estimate; + + + + + +/** + * Task that is run periodically to obtain blocks for DHT PUTs. + * + * @param cls type of blocks to gather + * @param tc scheduler context (unused) + */ +static void +gather_dht_put_blocks (void *cls, + const struct GNUNET_SCHEDULER_TaskContext *tc); + + + +/** + * If the DHT PUT gathering task is not currently running, consider + * (re)scheduling it with the appropriate delay. + */ +static void +consider_dht_put_gathering (void *cls) +{ + struct GNUNET_TIME_Relative delay; + + if (dsh == NULL) + return; + if (dht_qe != NULL) + return; + if (dht_task != GNUNET_SCHEDULER_NO_TASK) + return; + if (zero_anonymity_count_estimate > 0) + { + delay = GNUNET_TIME_relative_divide (GNUNET_DHT_DEFAULT_REPUBLISH_FREQUENCY, + zero_anonymity_count_estimate); + delay = GNUNET_TIME_relative_min (delay, + MAX_DHT_PUT_FREQ); + } + else + { + /* if we have NO zero-anonymity content yet, wait 5 minutes for some to + (hopefully) appear */ + delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 5); + } + dht_task = GNUNET_SCHEDULER_add_delayed (delay, + &gather_dht_put_blocks, + cls); +} + + + +/** + * Store content in DHT. + * + * @param cls closure + * @param key key for the content + * @param size number of bytes in data + * @param data content stored + * @param type type of the content + * @param priority priority of the content + * @param anonymity anonymity-level for the content + * @param expiration expiration time for the content + * @param uid unique identifier for the datum; + * maybe 0 if no unique identifier is available + */ +static void +process_dht_put_content (void *cls, + const GNUNET_HashCode * key, + size_t size, + const void *data, + enum GNUNET_BLOCK_Type type, + uint32_t priority, + uint32_t anonymity, + struct GNUNET_TIME_Absolute + expiration, uint64_t uid) +{ + static unsigned int counter; + static GNUNET_HashCode last_vhash; + static GNUNET_HashCode vhash; + + if (key == NULL) + { + dht_qe = NULL; + consider_dht_put_gathering (cls); + return; + } + /* slightly funky code to estimate the total number of values with zero + anonymity from the maximum observed length of a monotonically increasing + sequence of hashes over the contents */ + GNUNET_CRYPTO_hash (data, size, &vhash); + if (GNUNET_CRYPTO_hash_cmp (&vhash, &last_vhash) <= 0) + { + if (zero_anonymity_count_estimate > 0) + zero_anonymity_count_estimate /= 2; + counter = 0; + } + last_vhash = vhash; + if (counter < 31) + counter++; + if (zero_anonymity_count_estimate < (1 << counter)) + zero_anonymity_count_estimate = (1 << counter); +#if DEBUG_FS + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Retrieved block `%s' of type %u for DHT PUT\n", + GNUNET_h2s (key), + type); +#endif + GNUNET_DHT_put (dht_handle, + key, + DEFAULT_PUT_REPLICATION, + GNUNET_DHT_RO_NONE, + type, + size, + data, + expiration, + GNUNET_TIME_UNIT_FOREVER_REL, + &dht_put_continuation, + cls); +} + + + +/** + * Task that is run periodically to obtain blocks for DHT PUTs. + * + * @param cls type of blocks to gather + * @param tc scheduler context (unused) + */ +static void +gather_dht_put_blocks (void *cls, + const struct GNUNET_SCHEDULER_TaskContext *tc) +{ + dht_task = GNUNET_SCHEDULER_NO_TASK; + if (dsh != NULL) + { + if (dht_put_type == GNUNET_BLOCK_TYPE_FS_ONDEMAND) + dht_put_type = GNUNET_BLOCK_TYPE_FS_KBLOCK; + dht_qe = GNUNET_DATASTORE_get_zero_anonymity (dsh, 0, UINT_MAX, + GNUNET_TIME_UNIT_FOREVER_REL, + dht_put_type++, + &process_dht_put_content, NULL); + GNUNET_assert (dht_qe != NULL); + } +} -- 2.25.1