From: Christian Grothoff Date: Tue, 15 Feb 2011 13:07:14 +0000 (+0000) Subject: stuff X-Git-Tag: initial-import-from-subversion-38251~19133 X-Git-Url: https://git.librecmc.org/?a=commitdiff_plain;h=e3d12cb6fa5ddfb181dcade2e06888619f384457;p=oweals%2Fgnunet.git stuff --- diff --git a/src/fs/gnunet-service-fs_cp.c b/src/fs/gnunet-service-fs_cp.c index 903549cb7..f9a642199 100644 --- a/src/fs/gnunet-service-fs_cp.c +++ b/src/fs/gnunet-service-fs_cp.c @@ -33,7 +33,6 @@ #define TRUST_FLUSH_FREQ GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 5) - /** * Handle to cancel a transmission request. */ @@ -123,16 +122,26 @@ struct GSF_ConnectedPeer */ struct GSF_PeerTransmitHandle *pth_tail; + /** + * Migration stop message in our queue, or NULL if we have none pending. + */ + struct GSF_PeerTransmitHandle *migration_pth; + /** * Context of our GNUNET_CORE_peer_change_preference call (or NULL). * NULL if we have successfully reserved 32k, otherwise non-NULL. */ struct GNUNET_CORE_InformationRequestContext *irc; + /** + * Active requests from this neighbour. + */ + struct GNUNET_CONTAINER_MulitHashMap *request_map; + /** * ID of delay task for scheduling transmission. */ - GNUNET_SCHEDULER_TaskIdentifier delayed_transmission_request_task; // FIXME: unused! + GNUNET_SCHEDULER_TaskIdentifier delayed_transmission_request_task; // FIXME: used in 'push' (ugh!) /** * Increase in traffic preference still to be submitted @@ -282,12 +291,12 @@ peer_transmit_ready_cb (void *cls, GNUNET_CONTAINER_DLL_remove (cp->pth_head, cp->pth_tail, pth); - if (pth->is_query) + if (GNUNET_YES == pth->is_query) { cp->ppd.last_request_times[(cp->last_request_times_off++) % MAX_QUEUE_PER_PEER] = GNUNET_TIME_absolute_get (); GNUNET_assert (0 < cp->ppd.pending_queries--); } - else + else if (GNUNET_NO == pth->is_query) { GNUNET_assert (0 < cp->ppd.pending_replies--); } @@ -389,6 +398,7 @@ GSF_peer_connect_handler_ (const struct GNUNET_PeerIdentity *peer, (sizeof (trust) == GNUNET_DISK_fn_read (fn, &trust, sizeof (trust)))) cp->disk_trust = cp->trust = ntohl (trust); GNUNET_free (fn); + cp->request_map = GNUNET_CONTAINER_multihashmap_create (128); GNUNET_break (GNUNET_OK == GNUNET_CONTAINER_multihashmap_put (cp_map, &peer->hashPubKey, @@ -442,7 +452,8 @@ GSF_handle_p2p_migration_stop_ (void *cls, * and will also not be called anymore after a call signalling * expiration. * - * @param cls user-specified closure + * @param cls 'struct GSF_ConnectedPeer' of the peer that would + * have liked an answer to the request * @param pr handle to the original pending request * @param data response data, NULL on request expiration * @param data_len number of bytes in data @@ -453,12 +464,22 @@ handle_p2p_reply (void *cls, const void *data, size_t data_len) { + struct GSF_ConnectedPeer *cp = cls; + #if SUPPORT_DELAYS struct GNUNET_TIME_Relative art_delay; #endif /* FIXME: adapt code fragments below to new API! */ - + if (NULL == data) + { + /* FIXME: request expired! clean up! */ + GNUNET_STATISTICS_update (stats, + gettext_noop ("# P2P searches active"), + -1, + GNUNET_NO); + return; + } /* reply will go over the network, check for cover traffic */ if ( (prq->anonymity_level > 1) && @@ -515,9 +536,11 @@ handle_p2p_reply (void *cls, } - /** - * Handle P2P "QUERY" message. + * Handle P2P "QUERY" message. Creates the pending request entry + * and sets up all of the data structures to that we will + * process replies properly. Does not initiate forwarding or + * local database lookups. * * @param other the other peer involved (sender or receiver, NULL * for loopback messages where we are both sender and receiver) @@ -528,11 +551,13 @@ struct GSF_PendingRequest * GSF_handle_p2p_query_ (const struct GNUNET_PeerIdentity *other, const struct GNUNET_MessageHeader *message) { - /* FIXME: adapt old code to new API! */ - struct PendingRequest *pr; - struct ConnectedPeer *cp; - struct ConnectedPeer *cps; - struct CheckDuplicateRequestClosure cdc; + struct GSF_PendingRequest *pr; + struct GSF_PendingRequestData *prd; + struct GSF_ConnectedPeer *cp; + struct GSF_ConnectedPeer *cps; + GNUNET_HashCode *namespace; + struct GNUNET_PeerIdentity *target; + enum GSF_PendingRequestOptions options; struct GNUNET_TIME_Relative timeout; uint16_t msize; const struct GetMessage *gm; @@ -542,8 +567,9 @@ GSF_handle_p2p_query_ (const struct GNUNET_PeerIdentity *other, size_t bfsize; uint32_t ttl_decrement; int32_t priority; + int32_t ttl; enum GNUNET_BLOCK_Type type; - int have_ns; + msize = ntohs(message->size); if (msize < sizeof (struct GetMessage)) @@ -615,7 +641,6 @@ GSF_handle_p2p_query_ (const struct GNUNET_PeerIdentity *other, gettext_noop ("# requests dropped due to missing reverse route"), 1, GNUNET_NO); - /* FIXME: try connect? */ return GNUNET_OK; } /* note that we can really only check load here since otherwise @@ -639,14 +664,9 @@ GSF_handle_p2p_query_ (const struct GNUNET_PeerIdentity *other, GNUNET_i2s (other), (unsigned int) bm); #endif - have_ns = (0 != (bm & GET_MESSAGE_BIT_SKS_NAMESPACE)); - pr = GNUNET_malloc (sizeof (struct PendingRequest) + - (have_ns ? sizeof(GNUNET_HashCode) : 0)); - if (have_ns) - { - pr->namespace = (GNUNET_HashCode*) &pr[1]; - memcpy (&pr[1], &opt[bits++], sizeof (GNUNET_HashCode)); - } + namespace = (0 != (bm & GET_MESSAGE_BIT_SKS_NAMESPACE)) ? &opt[bits++] : NULL; + target = (0 != (bm & GET_MESSAGE_BIT_TRANSMIT_TO)) ? ((const struct GNUNET_PeerIdentity*) &opt[bits++]) : NULL; + options = 0; if ( (GNUNET_LOAD_get_load (cp->transmission_delay) > 3 * (1 + priority)) || (GNUNET_LOAD_get_average (cp->transmission_delay) > GNUNET_CONSTANTS_MAX_CORK_DELAY.rel_value * 2 + GNUNET_LOAD_get_average (rt_entry_lifetime)) ) @@ -654,28 +674,21 @@ GSF_handle_p2p_query_ (const struct GNUNET_PeerIdentity *other, /* don't have BW to send to peer, or would likely take longer than we have for it, so at best indirect the query */ priority = 0; - pr->forward_only = GNUNET_YES; + options |= GSF_PRO_FORWARD_ONLY; } - pr->type = type; - pr->mingle = ntohl (gm->filter_mutator); - if (0 != (bm & GET_MESSAGE_BIT_TRANSMIT_TO)) - pr->target_pid = GNUNET_PEER_intern ((const struct GNUNET_PeerIdentity*) &opt[bits++]); - pr->anonymity_level = 1; - pr->priority = (uint32_t) priority; - pr->ttl = bound_ttl (ntohl (gm->ttl), pr->priority); - pr->query = gm->query; + ttl = bound_ttl (ntohl (gm->ttl), pr->priority); /* decrement ttl (always) */ ttl_decrement = 2 * TTL_DECREMENT + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, TTL_DECREMENT); - if ( (pr->ttl < 0) && - (((int32_t)(pr->ttl - ttl_decrement)) > 0) ) + if ( (ttl < 0) && + (((int32_t)(ttl - ttl_decrement)) > 0) ) { #if DEBUG_FS GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Dropping query from `%s' due to TTL underflow (%d - %u).\n", GNUNET_i2s (other), - pr->ttl, + ttl, ttl_decrement); #endif GNUNET_STATISTICS_update (stats, @@ -683,74 +696,66 @@ GSF_handle_p2p_query_ (const struct GNUNET_PeerIdentity *other, 1, GNUNET_NO); /* integer underflow => drop (should be very rare)! */ - GNUNET_free (pr); return GNUNET_OK; } - pr->ttl -= ttl_decrement; - pr->start_time = GNUNET_TIME_absolute_get (); - - /* get bloom filter */ - if (bfsize > 0) - { - pr->bf = GNUNET_CONTAINER_bloomfilter_init ((const char*) &opt[bits], - bfsize, - BLOOMFILTER_K); - pr->bf_size = bfsize; - } - cdc.have = NULL; - cdc.pr = pr; - GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map, - &gm->query, - &check_duplicate_request_peer, - &cdc); - if (cdc.have != NULL) - { - if (cdc.have->start_time.abs_value + cdc.have->ttl >= - pr->start_time.abs_value + pr->ttl) + ttl -= ttl_decrement; + + /* test if the request already exists */ + pr = GNUNET_CONTAINER_multihashmap_get (cp->request_map, + &gm->query); + if (pr != NULL) + { + prd = GSF_pending_request_get_data_ (pr); + if ( (prd->type == type) && + ( (type != GNUNET_BLOCK_TYPE_SBLOCK) || + (0 == memcmp (prd->namespace, + namespace, + sizeof (GNUNET_HashCode))) ) ) { - /* existing request has higher TTL, drop new one! */ - cdc.have->priority += pr->priority; - destroy_pending_request (pr); + if (prd->ttl.abs_value >= GNUNET_TIME_absolute_get().abs_value + ttl) + { + /* existing request has higher TTL, drop new one! */ + prd->priority += priority; #if DEBUG_FS - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Have existing request with higher TTL, dropping new request.\n", - GNUNET_i2s (other)); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Have existing request with higher TTL, dropping new request.\n", + GNUNET_i2s (other)); #endif - GNUNET_STATISTICS_update (stats, - gettext_noop ("# requests dropped due to higher-TTL request"), - 1, - GNUNET_NO); - return GNUNET_OK; - } - else - { + GNUNET_STATISTICS_update (stats, + gettext_noop ("# requests dropped due to higher-TTL request"), + 1, + GNUNET_NO); + return GNUNET_OK; + } /* existing request has lower TTL, drop old one! */ - pr->priority += cdc.have->priority; - /* Possible optimization: if we have applicable pending - replies in 'cdc.have', we might want to move those over - (this is a really rare special-case, so it is not clear - that this would be worth it) */ - destroy_pending_request (cdc.have); - /* keep processing 'pr'! */ + pr->priority += prd->priority; + GSF_pending_request_cancel_ (pr); + GNUNET_assert (GNUNET_YES == + GNUNET_CONTAINER_multihashmap_remove (cp->request_map, + &gm->query, + pr)); } } - - pr->cp = cp; + + pr = GSF_pending_request_create (options, + type, + &gm->query, + namespace, + target, + (bf_size > 0) ? (const char*)&opt[bits] : NULL, + bf_size, + ntohl (gm->filter_mutator), + 1 /* anonymity */ + (uint32_t) priority, + ttl, + NULL, 0, /* replies_seen */ + &handle_p2p_reply, + cp); GNUNET_break (GNUNET_OK == - GNUNET_CONTAINER_multihashmap_put (query_request_map, + GNUNET_CONTAINER_multihashmap_put (cp->request_map, &gm->query, pr, GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE)); - GNUNET_break (GNUNET_OK == - GNUNET_CONTAINER_multihashmap_put (peer_request_map, - &other->hashPubKey, - pr, - GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE)); - - pr->hnode = GNUNET_CONTAINER_heap_insert (requests_by_expiration_heap, - pr, - pr->start_time.abs_value + pr->ttl); - GNUNET_STATISTICS_update (stats, gettext_noop ("# P2P searches received"), 1, @@ -759,83 +764,7 @@ GSF_handle_p2p_query_ (const struct GNUNET_PeerIdentity *other, gettext_noop ("# P2P searches active"), 1, GNUNET_NO); - - /* calculate change in traffic preference */ - cps->inc_preference += pr->priority * 1000 + QUERY_BANDWIDTH_VALUE; - /* process locally */ - if (type == GNUNET_BLOCK_TYPE_FS_DBLOCK) - type = GNUNET_BLOCK_TYPE_ANY; /* to get on-demand as well */ - timeout = GNUNET_TIME_relative_multiply (BASIC_DATASTORE_REQUEST_DELAY, - (pr->priority + 1)); - if (GNUNET_YES != pr->forward_only) - { -#if DEBUG_FS - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Handing request for `%s' to datastore\n", - GNUNET_h2s (&gm->query)); -#endif - pr->qe = GNUNET_DATASTORE_get (dsh, - &gm->query, - type, - pr->priority + 1, - MAX_DATASTORE_QUEUE, - timeout, - &process_local_reply, - pr); - if (NULL == pr->qe) - { - GNUNET_STATISTICS_update (stats, - gettext_noop ("# requests dropped by datastore (queue length limit)"), - 1, - GNUNET_NO); - } - } - else - { - GNUNET_STATISTICS_update (stats, - gettext_noop ("# requests forwarded due to high load"), - 1, - GNUNET_NO); - } - - /* Are multiple results possible (and did we look locally)? If so, start processing remotely now! */ - switch (pr->type) - { - case GNUNET_BLOCK_TYPE_FS_DBLOCK: - case GNUNET_BLOCK_TYPE_FS_IBLOCK: - /* only one result, wait for datastore */ - if (GNUNET_YES != pr->forward_only) - { - GNUNET_STATISTICS_update (stats, - gettext_noop ("# requests not instantly forwarded (waiting for datastore)"), - 1, - GNUNET_NO); - break; - } - default: - if (pr->task == GNUNET_SCHEDULER_NO_TASK) - pr->task = GNUNET_SCHEDULER_add_now (&forward_request_task, - pr); - } - - /* make sure we don't track too many requests */ - if (GNUNET_CONTAINER_heap_get_size (requests_by_expiration_heap) > max_pending_requests) - { - pr = GNUNET_CONTAINER_heap_peek (requests_by_expiration_heap); - GNUNET_assert (pr != NULL); - destroy_pending_request (pr); - } - return GNUNET_OK; - - - - // FIXME! - // parse request - // setup pending request (use 'handle_p2p_reply') - // track pending request to cancel it on peer disconnect (!) - // return it! - // (actual planning & execution up to caller!) - return NULL; + return pr; } @@ -858,9 +787,9 @@ peer_transmit_timeout (void *cls, GNUNET_CONTAINER_DLL_remove (cp->pth_head, cp->pth_tail, pth); - if (pth->is_query) + if (GNUNET_YES == pth->is_query) GNUNET_assert (0 < cp->ppd.pending_queries--); - else + else if (GNUNET_NO == pth->is_query) GNUNET_assert (0 < cp->ppd.pending_replies--); GNUNET_LOAD_update (cp->ppd.transmission_delay, UINT64_MAX); @@ -876,7 +805,7 @@ peer_transmit_timeout (void *cls, * the callback is invoked with a 'NULL' buffer. * * @param peer target peer - * @param is_query is this a query (GNUNET_YES) or content (GNUNET_NO) + * @param is_query is this a query (GNUNET_YES) or content (GNUNET_NO) or neither (GNUNET_SYSERR) * @param priority how important is this request? * @param timeout when does this request timeout (call gmc with error) * @param size number of bytes we would like to send to the peer @@ -933,9 +862,10 @@ GSF_peer_transmit_ (struct GSF_ConnectedPeer *peer, pth); GNUNET_PEER_resolve (cp->pid, &target); - if (is_query) + if (GNUNET_YES == is_query) { /* query, need reservation */ + cp->ppd.pending_queries++; if (NULL == cp->irc) { /* reservation already done! */ @@ -957,9 +887,15 @@ GSF_peer_transmit_ (struct GSF_ConnectedPeer *peer, is_ready = GNUNET_NO; } } - else + else if (GNUNET_NO == is_query) { /* no reservation needed for content */ + cp->ppd.pending_replies++; + is_ready = GNUNET_YES; + } + else + { + /* not a query or content, no reservation needed */ is_ready = GNUNET_YES; } if (is_ready) @@ -1011,9 +947,9 @@ GSF_peer_transmit_cancel_ (struct GSF_PeerTransmitHandle *pth) GNUNET_CONTAINER_DLL_remove (cp->pth_head, cp->pth_tail, pth); - if (pth->is_query) + if (GNUNET_YES == pth->is_query) GNUNET_assert (0 < cp->ppd.pending_queries--); - else + else if (GNUNET_NO == pth->is_query) GNUNET_assert (0 < cp->ppd.pending_replies--); GNUNET_free (pth); } @@ -1084,6 +1020,26 @@ GSF_peer_status_handler_ (void *cls, } +/** + * Cancel all requests associated with the peer. + * + * @param cls unused + * @param query hash code of the request + * @param value the 'struct GSF_PendingRequest' + * @return GNUNET_YES (continue to iterate) + */ +static int +cancel_pending_request (void *cls, + const GNUNET_HashCode *query, + void *value) +{ + struct GSF_PendingRequest *pr = value; + + GSF_pending_request_cancel_ (pr); + return GNUNET_OK; +} + + /** * A peer disconnected from us. Tear down the connected peer * record. @@ -1104,11 +1060,21 @@ GSF_peer_disconnect_handler_ (void *cls, GNUNET_CONTAINER_multihashmap_remove (cp_map, &peer->hashPubKey, cp); + if (NULL != cp->migration_pth) + { + GSF_peer_transmit_cancel_ (cp->migration_pth); + cp->migration_pth = NULL; + } if (NULL != cp->irc) { GNUNET_CORE_peer_change_preference_cancel (cp->irc); cp->irc = NULL; } + GNUNET_CONTAINER_multihashmap_iterate (cp->request_map, + &cancel_pending_request, + cp); + GNUNET_CONTAINER_multihashmap_destroy (cp->request_map); + cp->request_map = NULL; GSF_plan_notify_peer_disconnect_ (cp); GNUNET_LOAD_value_free (cp->ppd.transmission_delay); GNUNET_PEER_decrement_rcs (cp->ppd.last_p2p_replies, P2P_SUCCESS_LIST_SIZE); @@ -1205,6 +1171,34 @@ GSF_connected_peer_get_identity_ (const struct GSF_ConnectedPeer *cp, } +/** + * Assemble a migration stop message for transmission. + * + * @param cls the 'struct GSF_ConnectedPeer' to use + * @param size number of bytes we're allowed to write to buf + * @param buf where to copy the message + * @return number of bytes copied to buf + */ +static size_t +create_migration_stop_message (void *cls, + size_t size, + void *buf) +{ + struct GSF_ConnectedPeer *cp = cls; + struct MigrationStopMessage msm; + + cp->migration_pth = NULL; + if (NULL == buf) + return 0; + GNUNET_assert (size > sizeof (struct MigrationStopMessage)); + msm.header.size = htons (sizeof (struct MigrationStopMessage)); + msm.header.type = htons (GNUNET_MESSAGE_TYPE_FS_MIGRATION_STOP); + msm.duration = GNUNET_TIME_relative_hton (GNUNET_TIME_absolute_get_remaining (cp->last_migration_block)); + memcpy (buf, &msm, sizeof (struct MigrationStopMessage)); + return sizeof (struct MigrationStopMessage); +} + + /** * Ask a peer to stop migrating data to us until the given point * in time. @@ -1216,30 +1210,22 @@ void GSF_block_peer_migration_ (struct GSF_ConnectedPeer *cp, struct GNUNET_TIME_Relative block_time) { - struct PendingMessage *pm; - struct MigrationStopMessage *msm; - if (GNUNET_TIME_absolute_get_duration (cp->last_migration_block).rel_value > block_time.rel_value) return; /* already blocked */ cp->last_migration_block = GNUNET_TIME_relative_to_absolute (block_time); - - /* FIXME: adapt old code below to new API! */ - pm = GNUNET_malloc (sizeof (struct PendingMessage) + - sizeof (struct MigrationStopMessage)); - pm->msize = sizeof (struct MigrationStopMessage); - pm->priority = UINT32_MAX; - msm = (struct MigrationStopMessage*) &pm[1]; - msm->header.size = htons (sizeof (struct MigrationStopMessage)); - msm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_MIGRATION_STOP); - msm->duration = GNUNET_TIME_relative_hton (block_time); - add_to_pending_messages_for_peer (cp, - pm, - NULL); + if (cp->migration_pth != NULL) + GSF_peer_transmit_cancel_ (cp->migration_pth); + cp->migration_pth + = GSF_peer_transmit_ (cp, + GNUNET_SYSERR, + UINT32_MAX, + GNUNET_TIME_UNIT_FOREVER_REL, + sizeof (struct MigrationStopMessage), + &create_migration_stop_message, + cp); } - - /** * Write host-trust information to a file - flush the buffer entry! * diff --git a/src/fs/gnunet-service-fs_cp.h b/src/fs/gnunet-service-fs_cp.h index f08e31a72..bc561f792 100644 --- a/src/fs/gnunet-service-fs_cp.h +++ b/src/fs/gnunet-service-fs_cp.h @@ -257,7 +257,7 @@ GSF_handle_p2p_migration_stop_ (void *cls, * Handle P2P "QUERY" message. Only responsible for creating the * request entry itself and setting up reply callback and cancellation * on peer disconnect. Does NOT execute the actual request strategy - * (planning). + * (planning) or local database operations. * * @param other the other peer involved (sender or receiver, NULL * for loopback messages where we are both sender and receiver) diff --git a/src/fs/gnunet-service-fs_lc.c b/src/fs/gnunet-service-fs_lc.c index ea33580f9..469475fe0 100644 --- a/src/fs/gnunet-service-fs_lc.c +++ b/src/fs/gnunet-service-fs_lc.c @@ -341,7 +341,7 @@ GSF_local_client_start_search_handler_ (struct GNUNET_SERVER_Client *client, sizeof (GNUNET_HashCode))) ? &sm->target, : NULL, - NULL /* bf */, 0 /* mingle */, + NULL, 0, 0 /* bf */, ntohl (sm->anonymity_level), 0 /* priority */, &sm[1], sc, diff --git a/src/fs/gnunet-service-fs_pr.c b/src/fs/gnunet-service-fs_pr.c index 047c07587..d2248989f 100644 --- a/src/fs/gnunet-service-fs_pr.c +++ b/src/fs/gnunet-service-fs_pr.c @@ -182,10 +182,12 @@ refresh_bloomfilter (struct GSF_PendingRequest *pr) * @param query key for the lookup * @param namespace namespace to lookup, NULL for no namespace * @param target preferred target for the request, NULL for none - * @param bf bloom filter for known replies, can be NULL + * @param bf_data raw data for bloom filter for known replies, can be NULL + * @param bf_size number of bytes in bf_data * @param mingle mingle value for bf * @param anonymity_level desired anonymity level * @param priority maximum outgoing cummulative request priority to use + * @param ttl current time-to-live for the request * @param replies_seen hash codes of known local replies * @param replies_seen_count size of the 'replies_seen' array * @param rh handle to call when we get a reply @@ -198,10 +200,12 @@ GSF_pending_request_create_ (enum GSF_PendingRequestOptions options, const GNUNET_HashCode *query, const GNUNET_HashCode *namespace, const struct GNUNET_PeerIdentity *target, - const struct GNUNET_CONTAINER_BloomFilter *bf, + const char *bf_data, + size_t bf_size, int32_t mingle, uint32_t anonymity_level, uint32_t priority, + int32_t ttl, const GNUNET_HashCode *replies_seen, unsigned int replies_seen_count, GSF_PendingRequestReplyHandler rh, @@ -226,8 +230,16 @@ GSF_pending_request_create_ (enum GSF_PendingRequestOptions options, pr->public_data.priority = priority; pr->public_data.options = options; pr->public_data.type = type; + pr->public_data.start_time = GNUNET_TIME_absolute_get (); pr->rh = rh; pr->rh_cls = rh_cls; + if (ttl >= 0) + pr->ttl = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, + (uint32_t) ttl)); + else + pr->ttl = GNUNET_TIME_absolute_subtract (pr->public_data.start_time, + GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, + (uint32_t) (- ttl))); if (replies_seen_count > 0) { pr->replies_seen_size = replies_seen_count; @@ -237,9 +249,11 @@ GSF_pending_request_create_ (enum GSF_PendingRequestOptions options, replies_seen_count * sizeof (struct GNUNET_HashCode)); pr->replies_seen_count = replies_seen_count; } - if (NULL != bf) + if (NULL != bf_data) { - pr->bf = GNUNET_CONTAINER_bloomfilter_copy (bf); + pr->bf = GNUNET_CONTAINER_bloomfilter_init (bf_data, + bf_size, + BLOOMFILTER_K); pr->mingle = mingle; } else if ( (replies_seen_count > 0) && @@ -254,10 +268,39 @@ GSF_pending_request_create_ (enum GSF_PendingRequestOptions options, // FIXME: if not a local query, we also need to track the // total number of external queries we currently have and // bound it => need an additional heap! + + pr->hnode = GNUNET_CONTAINER_heap_insert (requests_by_expiration_heap, + pr, + pr->start_time.abs_value + pr->ttl); + + + + /* make sure we don't track too many requests */ + if (GNUNET_CONTAINER_heap_get_size (requests_by_expiration_heap) > max_pending_requests) + { + pr = GNUNET_CONTAINER_heap_peek (requests_by_expiration_heap); + GNUNET_assert (pr != NULL); + destroy_pending_request (pr); + } + + return pr; } +/** + * Obtain the public data associated with a pending request + * + * @param pr pending request + * @return associated public data + */ +struct GSF_PendingRequestData * +GSF_pending_request_get_data_ (struct GSF_PendingRequest *pr) +{ + return &pr->public_data; +} + + /** * Update a given pending request with additional replies * that have been seen. diff --git a/src/fs/gnunet-service-fs_pr.h b/src/fs/gnunet-service-fs_pr.h index 88c650042..bb6920ab1 100644 --- a/src/fs/gnunet-service-fs_pr.h +++ b/src/fs/gnunet-service-fs_pr.h @@ -91,6 +91,16 @@ struct GSF_PendingRequestData */ struct GNUNET_PeerIdentity target; + /** + * Current TTL for the request. + */ + struct GNUNET_TIME_Absolute ttl; + + /** + * When did we start with the request. + */ + struct GNUNET_TIME_Absolute start_time; + /** * Desired anonymity level. */ @@ -146,10 +156,12 @@ typedef void (*GSF_PendingRequestReplyHandler)(void *cls, * @param query key for the lookup * @param namespace namespace to lookup, NULL for no namespace * @param target preferred target for the request, NULL for none - * @param bf bloom filter for known replies, can be NULL + * @param bf_data raw data for bloom filter for known replies, can be NULL + * @param bf_size number of bytes in bf_data * @param mingle mingle value for bf * @param anonymity_level desired anonymity level * @param priority maximum outgoing cummulative request priority to use + * @param ttl current time-to-live for the request * @param replies_seen hash codes of known local replies * @param replies_seen_count size of the 'replies_seen' array * @param rh handle to call when we get a reply @@ -162,10 +174,12 @@ GSF_pending_request_create_ (enum GSF_PendingRequestOptions options, const GNUNET_HashCode *query, const GNUNET_HashCode *namespace, const struct GNUNET_PeerIdentity *target, - const struct GNUNET_CONTAINER_BloomFilter *bf, + const char *bf_data, + size_t bf_size, int32_t mingle, uint32_t anonymity_level, uint32_t priority, + int32_t ttl, const GNUNET_HashCode *replies_seen, unsigned int replies_seen_count, GSF_PendingRequestReplyHandler rh, @@ -186,6 +200,16 @@ GSF_pending_request_update_ (struct GSF_PendingRequest *pr, unsigned int replies_seen_count); +/** + * Obtain the public data associated with a pending request + * + * @param pr pending request + * @return associated public data + */ +struct GSF_PendingRequestData * +GSF_pending_request_get_data_ (struct GSF_PendingRequest *pr); + + /** * Generate the message corresponding to the given pending request for * transmission to other peers (or at least determine its size).