#define TRUST_FLUSH_FREQ GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 5)
-
/**
* Handle to cancel a transmission request.
*/
*/
struct GSF_PeerTransmitHandle *pth_tail;
+ /**
+ * Migration stop message in our queue, or NULL if we have none pending.
+ */
+ struct GSF_PeerTransmitHandle *migration_pth;
+
/**
* Context of our GNUNET_CORE_peer_change_preference call (or NULL).
* NULL if we have successfully reserved 32k, otherwise non-NULL.
*/
struct GNUNET_CORE_InformationRequestContext *irc;
+ /**
+ * Active requests from this neighbour.
+ */
+ struct GNUNET_CONTAINER_MulitHashMap *request_map;
+
/**
* ID of delay task for scheduling transmission.
*/
- GNUNET_SCHEDULER_TaskIdentifier delayed_transmission_request_task; // FIXME: unused!
+ GNUNET_SCHEDULER_TaskIdentifier delayed_transmission_request_task; // FIXME: used in 'push' (ugh!)
/**
* Increase in traffic preference still to be submitted
GNUNET_CONTAINER_DLL_remove (cp->pth_head,
cp->pth_tail,
pth);
- if (pth->is_query)
+ if (GNUNET_YES == pth->is_query)
{
cp->ppd.last_request_times[(cp->last_request_times_off++) % MAX_QUEUE_PER_PEER] = GNUNET_TIME_absolute_get ();
GNUNET_assert (0 < cp->ppd.pending_queries--);
}
- else
+ else if (GNUNET_NO == pth->is_query)
{
GNUNET_assert (0 < cp->ppd.pending_replies--);
}
(sizeof (trust) == GNUNET_DISK_fn_read (fn, &trust, sizeof (trust))))
cp->disk_trust = cp->trust = ntohl (trust);
GNUNET_free (fn);
+ cp->request_map = GNUNET_CONTAINER_multihashmap_create (128);
GNUNET_break (GNUNET_OK ==
GNUNET_CONTAINER_multihashmap_put (cp_map,
&peer->hashPubKey,
* and will also not be called anymore after a call signalling
* expiration.
*
- * @param cls user-specified closure
+ * @param cls 'struct GSF_ConnectedPeer' of the peer that would
+ * have liked an answer to the request
* @param pr handle to the original pending request
* @param data response data, NULL on request expiration
* @param data_len number of bytes in data
const void *data,
size_t data_len)
{
+ struct GSF_ConnectedPeer *cp = cls;
+
#if SUPPORT_DELAYS
struct GNUNET_TIME_Relative art_delay;
#endif
/* FIXME: adapt code fragments below to new API! */
-
+ if (NULL == data)
+ {
+ /* FIXME: request expired! clean up! */
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# P2P searches active"),
+ -1,
+ GNUNET_NO);
+ return;
+ }
/* reply will go over the network, check for cover traffic */
if ( (prq->anonymity_level > 1) &&
}
-
/**
- * Handle P2P "QUERY" message.
+ * Handle P2P "QUERY" message. Creates the pending request entry
+ * and sets up all of the data structures to that we will
+ * process replies properly. Does not initiate forwarding or
+ * local database lookups.
*
* @param other the other peer involved (sender or receiver, NULL
* for loopback messages where we are both sender and receiver)
GSF_handle_p2p_query_ (const struct GNUNET_PeerIdentity *other,
const struct GNUNET_MessageHeader *message)
{
- /* FIXME: adapt old code to new API! */
- struct PendingRequest *pr;
- struct ConnectedPeer *cp;
- struct ConnectedPeer *cps;
- struct CheckDuplicateRequestClosure cdc;
+ struct GSF_PendingRequest *pr;
+ struct GSF_PendingRequestData *prd;
+ struct GSF_ConnectedPeer *cp;
+ struct GSF_ConnectedPeer *cps;
+ GNUNET_HashCode *namespace;
+ struct GNUNET_PeerIdentity *target;
+ enum GSF_PendingRequestOptions options;
struct GNUNET_TIME_Relative timeout;
uint16_t msize;
const struct GetMessage *gm;
size_t bfsize;
uint32_t ttl_decrement;
int32_t priority;
+ int32_t ttl;
enum GNUNET_BLOCK_Type type;
- int have_ns;
+
msize = ntohs(message->size);
if (msize < sizeof (struct GetMessage))
gettext_noop ("# requests dropped due to missing reverse route"),
1,
GNUNET_NO);
- /* FIXME: try connect? */
return GNUNET_OK;
}
/* note that we can really only check load here since otherwise
GNUNET_i2s (other),
(unsigned int) bm);
#endif
- have_ns = (0 != (bm & GET_MESSAGE_BIT_SKS_NAMESPACE));
- pr = GNUNET_malloc (sizeof (struct PendingRequest) +
- (have_ns ? sizeof(GNUNET_HashCode) : 0));
- if (have_ns)
- {
- pr->namespace = (GNUNET_HashCode*) &pr[1];
- memcpy (&pr[1], &opt[bits++], sizeof (GNUNET_HashCode));
- }
+ namespace = (0 != (bm & GET_MESSAGE_BIT_SKS_NAMESPACE)) ? &opt[bits++] : NULL;
+ target = (0 != (bm & GET_MESSAGE_BIT_TRANSMIT_TO)) ? ((const struct GNUNET_PeerIdentity*) &opt[bits++]) : NULL;
+ options = 0;
if ( (GNUNET_LOAD_get_load (cp->transmission_delay) > 3 * (1 + priority)) ||
(GNUNET_LOAD_get_average (cp->transmission_delay) >
GNUNET_CONSTANTS_MAX_CORK_DELAY.rel_value * 2 + GNUNET_LOAD_get_average (rt_entry_lifetime)) )
/* don't have BW to send to peer, or would likely take longer than we have for it,
so at best indirect the query */
priority = 0;
- pr->forward_only = GNUNET_YES;
+ options |= GSF_PRO_FORWARD_ONLY;
}
- pr->type = type;
- pr->mingle = ntohl (gm->filter_mutator);
- if (0 != (bm & GET_MESSAGE_BIT_TRANSMIT_TO))
- pr->target_pid = GNUNET_PEER_intern ((const struct GNUNET_PeerIdentity*) &opt[bits++]);
- pr->anonymity_level = 1;
- pr->priority = (uint32_t) priority;
- pr->ttl = bound_ttl (ntohl (gm->ttl), pr->priority);
- pr->query = gm->query;
+ ttl = bound_ttl (ntohl (gm->ttl), pr->priority);
/* decrement ttl (always) */
ttl_decrement = 2 * TTL_DECREMENT +
GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
TTL_DECREMENT);
- if ( (pr->ttl < 0) &&
- (((int32_t)(pr->ttl - ttl_decrement)) > 0) )
+ if ( (ttl < 0) &&
+ (((int32_t)(ttl - ttl_decrement)) > 0) )
{
#if DEBUG_FS
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Dropping query from `%s' due to TTL underflow (%d - %u).\n",
GNUNET_i2s (other),
- pr->ttl,
+ ttl,
ttl_decrement);
#endif
GNUNET_STATISTICS_update (stats,
1,
GNUNET_NO);
/* integer underflow => drop (should be very rare)! */
- GNUNET_free (pr);
return GNUNET_OK;
}
- pr->ttl -= ttl_decrement;
- pr->start_time = GNUNET_TIME_absolute_get ();
-
- /* get bloom filter */
- if (bfsize > 0)
- {
- pr->bf = GNUNET_CONTAINER_bloomfilter_init ((const char*) &opt[bits],
- bfsize,
- BLOOMFILTER_K);
- pr->bf_size = bfsize;
- }
- cdc.have = NULL;
- cdc.pr = pr;
- GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map,
- &gm->query,
- &check_duplicate_request_peer,
- &cdc);
- if (cdc.have != NULL)
- {
- if (cdc.have->start_time.abs_value + cdc.have->ttl >=
- pr->start_time.abs_value + pr->ttl)
+ ttl -= ttl_decrement;
+
+ /* test if the request already exists */
+ pr = GNUNET_CONTAINER_multihashmap_get (cp->request_map,
+ &gm->query);
+ if (pr != NULL)
+ {
+ prd = GSF_pending_request_get_data_ (pr);
+ if ( (prd->type == type) &&
+ ( (type != GNUNET_BLOCK_TYPE_SBLOCK) ||
+ (0 == memcmp (prd->namespace,
+ namespace,
+ sizeof (GNUNET_HashCode))) ) )
{
- /* existing request has higher TTL, drop new one! */
- cdc.have->priority += pr->priority;
- destroy_pending_request (pr);
+ if (prd->ttl.abs_value >= GNUNET_TIME_absolute_get().abs_value + ttl)
+ {
+ /* existing request has higher TTL, drop new one! */
+ prd->priority += priority;
#if DEBUG_FS
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Have existing request with higher TTL, dropping new request.\n",
- GNUNET_i2s (other));
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Have existing request with higher TTL, dropping new request.\n",
+ GNUNET_i2s (other));
#endif
- GNUNET_STATISTICS_update (stats,
- gettext_noop ("# requests dropped due to higher-TTL request"),
- 1,
- GNUNET_NO);
- return GNUNET_OK;
- }
- else
- {
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# requests dropped due to higher-TTL request"),
+ 1,
+ GNUNET_NO);
+ return GNUNET_OK;
+ }
/* existing request has lower TTL, drop old one! */
- pr->priority += cdc.have->priority;
- /* Possible optimization: if we have applicable pending
- replies in 'cdc.have', we might want to move those over
- (this is a really rare special-case, so it is not clear
- that this would be worth it) */
- destroy_pending_request (cdc.have);
- /* keep processing 'pr'! */
+ pr->priority += prd->priority;
+ GSF_pending_request_cancel_ (pr);
+ GNUNET_assert (GNUNET_YES ==
+ GNUNET_CONTAINER_multihashmap_remove (cp->request_map,
+ &gm->query,
+ pr));
}
}
-
- pr->cp = cp;
+
+ pr = GSF_pending_request_create (options,
+ type,
+ &gm->query,
+ namespace,
+ target,
+ (bf_size > 0) ? (const char*)&opt[bits] : NULL,
+ bf_size,
+ ntohl (gm->filter_mutator),
+ 1 /* anonymity */
+ (uint32_t) priority,
+ ttl,
+ NULL, 0, /* replies_seen */
+ &handle_p2p_reply,
+ cp);
GNUNET_break (GNUNET_OK ==
- GNUNET_CONTAINER_multihashmap_put (query_request_map,
+ GNUNET_CONTAINER_multihashmap_put (cp->request_map,
&gm->query,
pr,
GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
- GNUNET_break (GNUNET_OK ==
- GNUNET_CONTAINER_multihashmap_put (peer_request_map,
- &other->hashPubKey,
- pr,
- GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
-
- pr->hnode = GNUNET_CONTAINER_heap_insert (requests_by_expiration_heap,
- pr,
- pr->start_time.abs_value + pr->ttl);
-
GNUNET_STATISTICS_update (stats,
gettext_noop ("# P2P searches received"),
1,
gettext_noop ("# P2P searches active"),
1,
GNUNET_NO);
-
- /* calculate change in traffic preference */
- cps->inc_preference += pr->priority * 1000 + QUERY_BANDWIDTH_VALUE;
- /* process locally */
- if (type == GNUNET_BLOCK_TYPE_FS_DBLOCK)
- type = GNUNET_BLOCK_TYPE_ANY; /* to get on-demand as well */
- timeout = GNUNET_TIME_relative_multiply (BASIC_DATASTORE_REQUEST_DELAY,
- (pr->priority + 1));
- if (GNUNET_YES != pr->forward_only)
- {
-#if DEBUG_FS
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Handing request for `%s' to datastore\n",
- GNUNET_h2s (&gm->query));
-#endif
- pr->qe = GNUNET_DATASTORE_get (dsh,
- &gm->query,
- type,
- pr->priority + 1,
- MAX_DATASTORE_QUEUE,
- timeout,
- &process_local_reply,
- pr);
- if (NULL == pr->qe)
- {
- GNUNET_STATISTICS_update (stats,
- gettext_noop ("# requests dropped by datastore (queue length limit)"),
- 1,
- GNUNET_NO);
- }
- }
- else
- {
- GNUNET_STATISTICS_update (stats,
- gettext_noop ("# requests forwarded due to high load"),
- 1,
- GNUNET_NO);
- }
-
- /* Are multiple results possible (and did we look locally)? If so, start processing remotely now! */
- switch (pr->type)
- {
- case GNUNET_BLOCK_TYPE_FS_DBLOCK:
- case GNUNET_BLOCK_TYPE_FS_IBLOCK:
- /* only one result, wait for datastore */
- if (GNUNET_YES != pr->forward_only)
- {
- GNUNET_STATISTICS_update (stats,
- gettext_noop ("# requests not instantly forwarded (waiting for datastore)"),
- 1,
- GNUNET_NO);
- break;
- }
- default:
- if (pr->task == GNUNET_SCHEDULER_NO_TASK)
- pr->task = GNUNET_SCHEDULER_add_now (&forward_request_task,
- pr);
- }
-
- /* make sure we don't track too many requests */
- if (GNUNET_CONTAINER_heap_get_size (requests_by_expiration_heap) > max_pending_requests)
- {
- pr = GNUNET_CONTAINER_heap_peek (requests_by_expiration_heap);
- GNUNET_assert (pr != NULL);
- destroy_pending_request (pr);
- }
- return GNUNET_OK;
-
-
-
- // FIXME!
- // parse request
- // setup pending request (use 'handle_p2p_reply')
- // track pending request to cancel it on peer disconnect (!)
- // return it!
- // (actual planning & execution up to caller!)
- return NULL;
+ return pr;
}
GNUNET_CONTAINER_DLL_remove (cp->pth_head,
cp->pth_tail,
pth);
- if (pth->is_query)
+ if (GNUNET_YES == pth->is_query)
GNUNET_assert (0 < cp->ppd.pending_queries--);
- else
+ else if (GNUNET_NO == pth->is_query)
GNUNET_assert (0 < cp->ppd.pending_replies--);
GNUNET_LOAD_update (cp->ppd.transmission_delay,
UINT64_MAX);
* the callback is invoked with a 'NULL' buffer.
*
* @param peer target peer
- * @param is_query is this a query (GNUNET_YES) or content (GNUNET_NO)
+ * @param is_query is this a query (GNUNET_YES) or content (GNUNET_NO) or neither (GNUNET_SYSERR)
* @param priority how important is this request?
* @param timeout when does this request timeout (call gmc with error)
* @param size number of bytes we would like to send to the peer
pth);
GNUNET_PEER_resolve (cp->pid,
&target);
- if (is_query)
+ if (GNUNET_YES == is_query)
{
/* query, need reservation */
+ cp->ppd.pending_queries++;
if (NULL == cp->irc)
{
/* reservation already done! */
is_ready = GNUNET_NO;
}
}
- else
+ else if (GNUNET_NO == is_query)
{
/* no reservation needed for content */
+ cp->ppd.pending_replies++;
+ is_ready = GNUNET_YES;
+ }
+ else
+ {
+ /* not a query or content, no reservation needed */
is_ready = GNUNET_YES;
}
if (is_ready)
GNUNET_CONTAINER_DLL_remove (cp->pth_head,
cp->pth_tail,
pth);
- if (pth->is_query)
+ if (GNUNET_YES == pth->is_query)
GNUNET_assert (0 < cp->ppd.pending_queries--);
- else
+ else if (GNUNET_NO == pth->is_query)
GNUNET_assert (0 < cp->ppd.pending_replies--);
GNUNET_free (pth);
}
}
+/**
+ * Cancel all requests associated with the peer.
+ *
+ * @param cls unused
+ * @param query hash code of the request
+ * @param value the 'struct GSF_PendingRequest'
+ * @return GNUNET_YES (continue to iterate)
+ */
+static int
+cancel_pending_request (void *cls,
+ const GNUNET_HashCode *query,
+ void *value)
+{
+ struct GSF_PendingRequest *pr = value;
+
+ GSF_pending_request_cancel_ (pr);
+ return GNUNET_OK;
+}
+
+
/**
* A peer disconnected from us. Tear down the connected peer
* record.
GNUNET_CONTAINER_multihashmap_remove (cp_map,
&peer->hashPubKey,
cp);
+ if (NULL != cp->migration_pth)
+ {
+ GSF_peer_transmit_cancel_ (cp->migration_pth);
+ cp->migration_pth = NULL;
+ }
if (NULL != cp->irc)
{
GNUNET_CORE_peer_change_preference_cancel (cp->irc);
cp->irc = NULL;
}
+ GNUNET_CONTAINER_multihashmap_iterate (cp->request_map,
+ &cancel_pending_request,
+ cp);
+ GNUNET_CONTAINER_multihashmap_destroy (cp->request_map);
+ cp->request_map = NULL;
GSF_plan_notify_peer_disconnect_ (cp);
GNUNET_LOAD_value_free (cp->ppd.transmission_delay);
GNUNET_PEER_decrement_rcs (cp->ppd.last_p2p_replies, P2P_SUCCESS_LIST_SIZE);
}
+/**
+ * Assemble a migration stop message for transmission.
+ *
+ * @param cls the 'struct GSF_ConnectedPeer' to use
+ * @param size number of bytes we're allowed to write to buf
+ * @param buf where to copy the message
+ * @return number of bytes copied to buf
+ */
+static size_t
+create_migration_stop_message (void *cls,
+ size_t size,
+ void *buf)
+{
+ struct GSF_ConnectedPeer *cp = cls;
+ struct MigrationStopMessage msm;
+
+ cp->migration_pth = NULL;
+ if (NULL == buf)
+ return 0;
+ GNUNET_assert (size > sizeof (struct MigrationStopMessage));
+ msm.header.size = htons (sizeof (struct MigrationStopMessage));
+ msm.header.type = htons (GNUNET_MESSAGE_TYPE_FS_MIGRATION_STOP);
+ msm.duration = GNUNET_TIME_relative_hton (GNUNET_TIME_absolute_get_remaining (cp->last_migration_block));
+ memcpy (buf, &msm, sizeof (struct MigrationStopMessage));
+ return sizeof (struct MigrationStopMessage);
+}
+
+
/**
* Ask a peer to stop migrating data to us until the given point
* in time.
GSF_block_peer_migration_ (struct GSF_ConnectedPeer *cp,
struct GNUNET_TIME_Relative block_time)
{
- struct PendingMessage *pm;
- struct MigrationStopMessage *msm;
-
if (GNUNET_TIME_absolute_get_duration (cp->last_migration_block).rel_value > block_time.rel_value)
return; /* already blocked */
cp->last_migration_block = GNUNET_TIME_relative_to_absolute (block_time);
-
- /* FIXME: adapt old code below to new API! */
- pm = GNUNET_malloc (sizeof (struct PendingMessage) +
- sizeof (struct MigrationStopMessage));
- pm->msize = sizeof (struct MigrationStopMessage);
- pm->priority = UINT32_MAX;
- msm = (struct MigrationStopMessage*) &pm[1];
- msm->header.size = htons (sizeof (struct MigrationStopMessage));
- msm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_MIGRATION_STOP);
- msm->duration = GNUNET_TIME_relative_hton (block_time);
- add_to_pending_messages_for_peer (cp,
- pm,
- NULL);
+ if (cp->migration_pth != NULL)
+ GSF_peer_transmit_cancel_ (cp->migration_pth);
+ cp->migration_pth
+ = GSF_peer_transmit_ (cp,
+ GNUNET_SYSERR,
+ UINT32_MAX,
+ GNUNET_TIME_UNIT_FOREVER_REL,
+ sizeof (struct MigrationStopMessage),
+ &create_migration_stop_message,
+ cp);
}
-
-
/**
* Write host-trust information to a file - flush the buffer entry!
*