#include "gnunet-service-fs_pr.h"
#include "gnunet-service-fs_push.h"
+
+/**
+ * Ratio for moving average delay calculation. The previous
+ * average goes in with a factor of (n-1) into the calculation.
+ * Must be > 0.
+ */
+#define RUNAVG_DELAY_N 16
+
/**
* How often do we flush trust values to disk?
*/
* @return connection latency
*/
static struct GNUNET_TIME_Relative
-get_latency (const struct GNUNET_ATS_Information *atsi,
- unsigned int atsi_count)
+get_latency (const struct GNUNET_ATS_Information *atsi, unsigned int atsi_count)
{
unsigned int i;
- for (i=0;i<atsi_count;i++)
+ for (i = 0; i < atsi_count; i++)
if (ntohl (atsi->type) == GNUNET_ATS_QUALITY_NET_DELAY)
return GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
- ntohl (atsi->value));
+ ntohl (atsi->value));
return GNUNET_TIME_UNIT_SECONDS;
}
*/
static void
update_atsi (struct GSF_ConnectedPeer *cp,
- const struct GNUNET_ATS_Information *atsi,
- unsigned int atsi_count)
+ const struct GNUNET_ATS_Information *atsi, unsigned int atsi_count)
{
struct GNUNET_TIME_Relative latency;
*/
static void
ats_reserve_callback (void *cls, const struct GNUNET_PeerIdentity *peer,
- int32_t amount, struct GNUNET_TIME_Relative res_delay);
+ int32_t amount, struct GNUNET_TIME_Relative res_delay);
/**
if (0 != cp->inc_preference)
{
- GNUNET_ATS_change_preference (ats,
- &target,
- GNUNET_ATS_PREFERENCE_BANDWIDTH,
- (double) cp->inc_preference,
- GNUNET_ATS_PREFERENCE_END);
+ GNUNET_ATS_change_preference (ats, &target, GNUNET_ATS_PREFERENCE_BANDWIDTH,
+ (double) cp->inc_preference,
+ GNUNET_ATS_PREFERENCE_END);
cp->inc_preference = 0;
}
/* reservation already done! */
pth->was_reserved = GNUNET_YES;
cp->rc =
- GNUNET_ATS_reserve_bandwidth (ats, &target,
- DBLOCK_SIZE,
- &ats_reserve_callback, cp);
+ GNUNET_ATS_reserve_bandwidth (ats, &target, DBLOCK_SIZE,
+ &ats_reserve_callback, cp);
}
GNUNET_assert (pth->cth == NULL);
pth->cth_in_progress++;
GNUNET_PEER_resolve (cp->ppd.pid, &target);
cp->rc_delay_task = GNUNET_SCHEDULER_NO_TASK;
cp->rc =
- GNUNET_ATS_reserve_bandwidth (ats, &target,
- DBLOCK_SIZE,
- &ats_reserve_callback, cp);
+ GNUNET_ATS_reserve_bandwidth (ats, &target, DBLOCK_SIZE,
+ &ats_reserve_callback, cp);
}
*/
static void
ats_reserve_callback (void *cls, const struct GNUNET_PeerIdentity *peer,
- int32_t amount, struct GNUNET_TIME_Relative res_delay)
+ int32_t amount, struct GNUNET_TIME_Relative res_delay)
{
struct GSF_ConnectedPeer *cp = cls;
struct GSF_PeerTransmitHandle *pth;
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Reserved %d bytes / need to wait %llu ms for reservation\n",
- (int) amount,
- (unsigned long long) res_delay.rel_value);
+ "Reserved %d bytes / need to wait %llu ms for reservation\n",
+ (int) amount, (unsigned long long) res_delay.rel_value);
cp->rc = NULL;
if (0 == amount)
{
struct GSF_ConnectedPeer *
GSF_peer_connect_handler_ (const struct GNUNET_PeerIdentity *peer,
const struct GNUNET_ATS_Information *atsi,
- unsigned int atsi_count)
+ unsigned int atsi_count)
{
struct GSF_ConnectedPeer *cp;
char *fn;
uint32_t trust;
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Connected to peer %s\n",
- GNUNET_i2s (peer));
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Connected to peer %s\n",
+ GNUNET_i2s (peer));
cp = GNUNET_malloc (sizeof (struct GSF_ConnectedPeer));
cp->ppd.pid = GNUNET_PEER_intern (peer);
cp->ppd.transmission_delay = GNUNET_LOAD_value_init (GNUNET_TIME_UNIT_ZERO);
cp->rc =
- GNUNET_ATS_reserve_bandwidth (ats, peer,
- DBLOCK_SIZE,
- &ats_reserve_callback, cp);
+ GNUNET_ATS_reserve_bandwidth (ats, peer, DBLOCK_SIZE,
+ &ats_reserve_callback, cp);
fn = get_trust_filename (peer);
if ((GNUNET_DISK_file_test (fn) == GNUNET_YES) &&
(sizeof (trust) == GNUNET_DISK_fn_read (fn, &trust, sizeof (trust))))
GNUNET_CONTAINER_multihashmap_put (cp_map, &peer->hashPubKey,
cp,
GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
- GNUNET_STATISTICS_set (GSF_stats,
- gettext_noop
- ("# peers connected"),
- GNUNET_CONTAINER_multihashmap_size (cp_map),
- GNUNET_NO);
+ GNUNET_STATISTICS_set (GSF_stats, gettext_noop ("# peers connected"),
+ GNUNET_CONTAINER_multihashmap_size (cp_map),
+ GNUNET_NO);
update_atsi (cp, atsi, atsi_count);
GSF_push_start_ (cp);
return cp;
GSF_handle_p2p_migration_stop_ (void *cls,
const struct GNUNET_PeerIdentity *other,
const struct GNUNET_MessageHeader *message,
- const struct GNUNET_ATS_Information
- *atsi,
- unsigned int atsi_count)
+ const struct GNUNET_ATS_Information *atsi,
+ unsigned int atsi_count)
{
struct GSF_ConnectedPeer *cp;
const struct MigrationStopMessage *msm;
return GNUNET_OK;
}
GNUNET_STATISTICS_update (GSF_stats,
- gettext_noop
- ("# migration stop messages received"),
- 1, GNUNET_NO);
+ gettext_noop ("# migration stop messages received"),
+ 1, GNUNET_NO);
bt = GNUNET_TIME_relative_ntoh (msm->duration);
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
_("Migration of content to peer `%s' blocked for %llu ms\n"),
}
+/**
+ * Free resources associated with the given peer request.
+ *
+ * @param peerreq request to free
+ * @param query associated key for the request
+ */
+static void
+free_pending_request (struct PeerRequest *peerreq,
+ const GNUNET_HashCode *query)
+{
+ struct GSF_ConnectedPeer *cp = peerreq->cp;
+
+ if (peerreq->kill_task != GNUNET_SCHEDULER_NO_TASK)
+ {
+ GNUNET_SCHEDULER_cancel (peerreq->kill_task);
+ peerreq->kill_task = GNUNET_SCHEDULER_NO_TASK;
+ }
+ GNUNET_STATISTICS_update (GSF_stats, gettext_noop ("# P2P searches active"),
+ -1, GNUNET_NO);
+ GNUNET_break (GNUNET_YES ==
+ GNUNET_CONTAINER_multihashmap_remove (cp->request_map,
+ query, peerreq));
+ GNUNET_free (peerreq);
+}
+
+
/**
* Cancel all requests associated with the peer.
*
{
struct PeerRequest *peerreq = value;
struct GSF_PendingRequest *pr = peerreq->pr;
- struct GSF_ConnectedPeer *cp = peerreq->cp;
struct GSF_PendingRequestData *prd;
- if (peerreq->kill_task != GNUNET_SCHEDULER_NO_TASK)
- {
- GNUNET_SCHEDULER_cancel (peerreq->kill_task);
- peerreq->kill_task = GNUNET_SCHEDULER_NO_TASK;
- }
- GNUNET_STATISTICS_update (GSF_stats, gettext_noop ("# P2P searches active"),
- -1, GNUNET_NO);
prd = GSF_pending_request_get_data_ (pr);
- GNUNET_break (GNUNET_YES ==
- GNUNET_CONTAINER_multihashmap_remove (cp->request_map,
- &prd->query, peerreq));
GSF_pending_request_cancel_ (pr, GNUNET_NO);
- GNUNET_free (peerreq);
+ free_pending_request (peerreq, &prd->query);
return GNUNET_OK;
}
ret =
GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
GNUNET_CRYPTO_random_u32
- (GNUNET_CRYPTO_QUALITY_WEAK,
- 2 * GSF_avg_latency.rel_value + 1));
+ (GNUNET_CRYPTO_QUALITY_WEAK,
+ 2 * GSF_avg_latency.rel_value + 1));
GNUNET_STATISTICS_update (GSF_stats,
gettext_noop
("# artificial delays introduced (ms)"),
* @param pr handle to the original pending request
* @param reply_anonymity_level anonymity level for the reply, UINT32_MAX for "unknown"
* @param expiration when does 'data' expire?
+ * @param last_transmission when did we last transmit a request for this block
* @param type type of the block
* @param data response data, NULL on request expiration
* @param data_len number of bytes in data
handle_p2p_reply (void *cls, enum GNUNET_BLOCK_EvaluationResult eval,
struct GSF_PendingRequest *pr, uint32_t reply_anonymity_level,
struct GNUNET_TIME_Absolute expiration,
+ struct GNUNET_TIME_Absolute last_transmission,
enum GNUNET_BLOCK_Type type, const void *data,
size_t data_len)
{
prd = GSF_pending_request_get_data_ (pr);
if (NULL == data)
{
- GNUNET_STATISTICS_update (GSF_stats, gettext_noop ("# P2P searches active"),
- -1, GNUNET_NO);
- GNUNET_break (GNUNET_YES ==
- GNUNET_CONTAINER_multihashmap_remove (cp->request_map,
- &prd->query, peerreq));
- GNUNET_free (peerreq);
+ free_pending_request (peerreq, &prd->query);
return;
}
GNUNET_break (type != GNUNET_BLOCK_TYPE_ANY);
if ((prd->type != type) && (prd->type != GNUNET_BLOCK_TYPE_ANY))
{
- GNUNET_break (0);
+ GNUNET_STATISTICS_update (GSF_stats,
+ gettext_noop
+ ("# replies dropped due to type mismatch"),
+ 1, GNUNET_NO);
return;
}
-#if DEBUG_FS
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Transmitting result for query `%s' to peer\n",
GNUNET_h2s (&prd->query));
-#endif
GNUNET_STATISTICS_update (GSF_stats,
gettext_noop ("# replies received for other peers"),
1, GNUNET_NO);
return NULL;
}
if (0 != (bm & GET_MESSAGE_BIT_RETURN_TO))
- cp = GSF_peer_get_ ((const struct GNUNET_PeerIdentity*) &opt[bits++]);
+ cp = GSF_peer_get_ ((const struct GNUNET_PeerIdentity *) &opt[bits++]);
else
cp = cps;
if (cp == NULL)
{
-#if DEBUG_FS
if (0 != (bm & GET_MESSAGE_BIT_RETURN_TO))
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Failed to find RETURN-TO peer `%4s' in connection set. Dropping query.\n",
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Failed to find peer `%4s' in connection set. Dropping query.\n",
GNUNET_i2s (other));
-#endif
GNUNET_STATISTICS_update (GSF_stats,
gettext_noop
("# requests dropped due to missing reverse route"),
priority = bound_priority (ntohl (gm->priority), cps);
if (priority < 0)
{
-#if DEBUG_FS
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Dropping query from `%s', this peer is too busy.\n",
GNUNET_i2s (other));
-#endif
return NULL;
}
-#if DEBUG_FS
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Received request for `%s' of type %u from peer `%4s' with flags %u\n",
GNUNET_h2s (&gm->query), (unsigned int) type, GNUNET_i2s (other),
(unsigned int) bm);
-#endif
namespace = (0 != (bm & GET_MESSAGE_BIT_SKS_NAMESPACE)) ? &opt[bits++] : NULL;
if ((type == GNUNET_BLOCK_TYPE_FS_SBLOCK) && (namespace == NULL))
{
(0 !=
(bm & GET_MESSAGE_BIT_TRANSMIT_TO)) ? ((const struct GNUNET_PeerIdentity
*) &opt[bits++]) : NULL;
- options = 0;
+ options = GSF_PRO_DEFAULTS;
spid = 0;
if ((GNUNET_LOAD_get_load (cp->ppd.transmission_delay) > 3 * (1 + priority))
|| (GNUNET_LOAD_get_average (cp->ppd.transmission_delay) >
TTL_DECREMENT);
if ((ttl < 0) && (((int32_t) (ttl - ttl_decrement)) > 0))
{
-#if DEBUG_FS
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Dropping query from `%s' due to TTL underflow (%d - %u).\n",
GNUNET_i2s (other), ttl, ttl_decrement);
-#endif
GNUNET_STATISTICS_update (GSF_stats,
gettext_noop
("# requests dropped due TTL underflow"), 1,
{
/* existing request has higher TTL, drop new one! */
prd->priority += priority;
-#if DEBUG_FS
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Have existing request with higher TTL, dropping new request.\n",
GNUNET_i2s (other));
-#endif
GNUNET_STATISTICS_update (GSF_stats,
gettext_noop
("# requests dropped due to higher-TTL request"),
return NULL;
}
/* existing request has lower TTL, drop old one! */
- GNUNET_STATISTICS_update (GSF_stats,
- gettext_noop ("# P2P searches active"), -1,
- GNUNET_NO);
priority += prd->priority;
GSF_pending_request_cancel_ (pr, GNUNET_YES);
- GNUNET_assert (GNUNET_YES ==
- GNUNET_CONTAINER_multihashmap_remove (cp->request_map,
- &gm->query,
- peerreq));
- if (peerreq->kill_task != GNUNET_SCHEDULER_NO_TASK)
- {
- GNUNET_SCHEDULER_cancel (peerreq->kill_task);
- peerreq->kill_task = GNUNET_SCHEDULER_NO_TASK;
- }
- GNUNET_free (peerreq);
+ free_pending_request (peerreq, &gm->query);
}
}
0) ? (const char *) &opt[bits] : NULL,
bfsize, ntohl (gm->filter_mutator),
1 /* anonymity */ ,
- (uint32_t) priority, ttl, spid,
- GNUNET_PEER_intern (other),
- NULL, 0, /* replies_seen */
+ (uint32_t) priority, ttl, spid, GNUNET_PEER_intern (other), NULL, 0, /* replies_seen */
&handle_p2p_reply, peerreq);
GNUNET_assert (NULL != pr);
peerreq->pr = pr;
struct GSF_PeerTransmitHandle *pth = cls;
struct GSF_ConnectedPeer *cp;
-#if DEBUG_FS
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Timeout trying to transmit to other peer\n");
-#endif
pth->timeout_task = GNUNET_SCHEDULER_NO_TASK;
cp = pth->cp;
GNUNET_CONTAINER_DLL_remove (cp->pth_head, cp->pth_tail, pth);
GNUNET_assert (GNUNET_YES ==
GNUNET_CONTAINER_multihashmap_remove (cp_map,
&peer->hashPubKey, cp));
- GNUNET_STATISTICS_set (GSF_stats,
- gettext_noop
- ("# peers connected"),
- GNUNET_CONTAINER_multihashmap_size (cp_map),
- GNUNET_NO);
+ GNUNET_STATISTICS_set (GSF_stats, gettext_noop ("# peers connected"),
+ GNUNET_CONTAINER_multihashmap_size (cp_map),
+ GNUNET_NO);
if (NULL != cp->migration_pth)
{
GSF_peer_transmit_cancel_ (cp->migration_pth);
GNUNET_TIME_relative_hton (GNUNET_TIME_absolute_get_remaining
(cp->last_migration_block));
memcpy (buf, &msm, sizeof (struct MigrationStopMessage));
+ GNUNET_STATISTICS_update (GSF_stats,
+ gettext_noop ("# migration stop messages sent"),
+ 1, GNUNET_NO);
return sizeof (struct MigrationStopMessage);
}
*/
void
GSF_block_peer_migration_ (struct GSF_ConnectedPeer *cp,
- struct GNUNET_TIME_Relative block_time)
+ struct GNUNET_TIME_Absolute block_time)
{
- if (GNUNET_TIME_absolute_get_remaining (cp->last_migration_block).rel_value >
- block_time.rel_value)
+ if (cp->last_migration_block.abs_value > block_time.abs_value)
{
-#if DEBUG_FS && 0
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Migration already blocked for another %llu ms\n",
(unsigned long long)
GNUNET_TIME_absolute_get_remaining
(cp->last_migration_block).rel_value);
-#endif
return; /* already blocked */
}
-#if DEBUG_FS && 0
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Asking to stop migration for %llu ms\n",
- (unsigned long long) block_time.rel_value);
-#endif
- cp->last_migration_block = GNUNET_TIME_relative_to_absolute (block_time);
+ (unsigned long long) GNUNET_TIME_absolute_get_remaining (block_time).rel_value);
+ cp->last_migration_block = block_time;
if (cp->migration_pth != NULL)
GSF_peer_transmit_cancel_ (cp->migration_pth);
cp->migration_pth =
return;
if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
return;
- GNUNET_SCHEDULER_add_delayed (TRUST_FLUSH_FREQ, &cron_flush_trust, NULL);
+ GNUNET_SCHEDULER_add_delayed_with_priority (TRUST_FLUSH_FREQ,
+ GNUNET_SCHEDULER_PRIORITY_HIGH,
+ &cron_flush_trust, NULL);
}