* @author Christian Grothoff
*/
#include "platform.h"
+#include "gnunet_load_lib.h"
#include "gnunet-service-fs.h"
#include "gnunet-service-fs_cp.h"
+#include "gnunet-service-fs_pe.h"
+#include "gnunet-service-fs_pr.h"
#include "gnunet-service-fs_push.h"
/**
struct GSF_PeerTransmitHandle
{
+ /**
+ * Kept in a doubly-linked list.
+ */
+ struct GSF_PeerTransmitHandle *next;
+
+ /**
+ * Kept in a doubly-linked list.
+ */
+ struct GSF_PeerTransmitHandle *prev;
+
/**
* Handle for an active request for transmission to this
* peer, or NULL (if core queue was full).
/**
* Task scheduled to revive migration to this peer.
*/
- struct GNUNET_SCHEDULER_TaskIdentifier mig_revive_task;
+ GNUNET_SCHEDULER_TaskIdentifier mig_revive_task;
/**
* Messages (replies, queries, content migration) we would like to
/**
* Active requests from this neighbour.
*/
- struct GNUNET_CONTAINER_MulitHashMap *request_map;
+ struct GNUNET_CONTAINER_MultiHashMap *request_map;
/**
* Increase in traffic preference still to be submitted
*/
static struct GNUNET_CONTAINER_MultiHashMap *cp_map;
-
/**
* Where do we store trust information?
*/
struct GNUNET_TIME_Relative latency;
latency = get_latency (atsi);
- GNUNET_LOAD_value_set_decline (cp->transmission_delay,
+ GNUNET_LOAD_value_set_decline (cp->ppd.transmission_delay,
latency);
/* LATER: merge atsi into cp's performance data (if we ever care...) */
}
GNUNET_assert (0 < cp->ppd.pending_replies--);
}
GNUNET_LOAD_update (cp->ppd.transmission_delay,
- GNUNET_TIME_absolute_get_duration (pth->request_start_time).rel_value);
+ GNUNET_TIME_absolute_get_duration (pth->transmission_request_start_time).rel_value);
ret = pth->gmc (pth->gmc_cls,
0, NULL);
GNUNET_free (pth);
*/
static void
core_reserve_callback (void *cls,
- const struct GNUNET_PeerIdentity * peer,
+ const struct GNUNET_PeerIdentity *peer,
struct GNUNET_BANDWIDTH_Value32NBO bandwidth_out,
int amount,
uint64_t preference)
{
struct GSF_ConnectedPeer *cp = cls;
+ struct GSF_PeerTransmitHandle *pth;
uint64_t ip;
cp->irc = NULL;
GNUNET_i2s (peer));
ip = cp->inc_preference;
cp->inc_preference = 0;
- cp->irc = GNUNET_CORE_peer_change_preference (core,
+ cp->irc = GNUNET_CORE_peer_change_preference (GSF_core,
peer,
GNUNET_TIME_UNIT_FOREVER_REL,
GNUNET_BANDWIDTH_VALUE_MAX,
- GNUNET_FS_DBLOCK_SIZE,
+ DBLOCK_SIZE,
ip,
&core_reserve_callback,
cp);
(NULL == pth->cth) )
{
/* reservation success, try transmission now! */
- pth->cth = GNUNET_CORE_notify_transmit_ready (core,
- priority,
+ pth->cth = GNUNET_CORE_notify_transmit_ready (GSF_core,
+ pth->priority,
GNUNET_TIME_absolute_get_remaining (pth->timeout),
- &target,
- size,
+ peer,
+ pth->size,
&peer_transmit_ready_cb,
pth);
}
struct GSF_ConnectedPeer *cp;
char *fn;
uint32_t trust;
- struct GNUNET_TIME_Relative latency;
cp = GNUNET_malloc (sizeof (struct GSF_ConnectedPeer));
- cp->transmission_delay = GNUNET_LOAD_value_init (latency);
cp->ppd.pid = GNUNET_PEER_intern (peer);
- cp->transmission_delay = GNUNET_LOAD_value_init (0);
- cp->irc = GNUNET_CORE_peer_change_preference (core,
+ cp->ppd.transmission_delay = GNUNET_LOAD_value_init (GNUNET_TIME_UNIT_ZERO);
+ cp->irc = GNUNET_CORE_peer_change_preference (GSF_core,
peer,
GNUNET_TIME_UNIT_FOREVER_REL,
GNUNET_BANDWIDTH_VALUE_MAX,
- GNUNET_FS_DBLOCK_SIZE,
+ DBLOCK_SIZE,
0,
&core_reserve_callback,
cp);
fn = get_trust_filename (peer);
if ((GNUNET_DISK_file_test (fn) == GNUNET_YES) &&
(sizeof (trust) == GNUNET_DISK_fn_read (fn, &trust, sizeof (trust))))
- cp->disk_trust = cp->trust = ntohl (trust);
+ cp->disk_trust = cp->ppd.trust = ntohl (trust);
GNUNET_free (fn);
cp->request_map = GNUNET_CONTAINER_multihashmap_create (128);
GNUNET_break (GNUNET_OK ==
cp,
GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
update_atsi (cp, atsi);
- GSF_plan_notify_new_peer_ (cp);
GSF_push_start_ (cp);
return cp;
}
void *buf)
{
struct PutMessage *pm = cls;
+ size_t size;
if (buf != NULL)
{
- GNUNET_assert (size >= ntohs (pm->header.size));
+ GNUNET_assert (buf_size >= ntohs (pm->header.size));
size = ntohs (pm->header.size);
memcpy (buf, pm, size);
- GNUNET_STATISTICS_update (stats,
+ GNUNET_STATISTICS_update (GSF_stats,
gettext_noop ("# replies transmitted to other peers"),
1,
GNUNET_NO);
}
else
{
- GNUNET_STATISTICS_update (stats,
+ size = 0;
+ GNUNET_STATISTICS_update (GSF_stats,
gettext_noop ("# replies dropped"),
1,
GNUNET_NO);
int more)
{
struct GSF_ConnectedPeer *cp = cls;
- struct GSF_PendingRequest *prd;
+ struct GSF_PendingRequestData *prd;
struct PutMessage *pm;
size_t msize;
if (NULL == data)
{
GNUNET_assert (GNUNET_NO == more);
- GNUNET_STATISTICS_update (stats,
+ GNUNET_STATISTICS_update (GSF_stats,
gettext_noop ("# P2P searches active"),
-1,
GNUNET_NO);
"Transmitting result for query `%s'\n",
GNUNET_h2s (key));
#endif
- GNUNET_STATISTICS_update (stats,
+ GNUNET_STATISTICS_update (GSF_stats,
gettext_noop ("# replies received for other peers"),
1,
GNUNET_NO);
}
+/**
+ * Test if the DATABASE (GET) load on this peer is too high
+ * to even consider processing the query at
+ * all.
+ *
+ * @return GNUNET_YES if the load is too high to do anything (load high)
+ * GNUNET_NO to process normally (load normal)
+ * GNUNET_SYSERR to process for free (load low)
+ */
+static int
+test_get_load_too_high (uint32_t priority)
+{
+#if FIXME_later
+ double ld;
+
+ ld = GNUNET_LOAD_get_load (datastore_get_load);
+ if (ld < 1)
+ return GNUNET_SYSERR;
+ if (ld <= priority)
+ return GNUNET_NO;
+ return GNUNET_YES;
+#else
+ return GNUNET_SYSERR;
+#endif
+}
+
+
+/**
+ * Increase the host credit by a value.
+ *
+ * @param cp which peer to change the trust value on
+ * @param value is the int value by which the
+ * host credit is to be increased or decreased
+ * @returns the actual change in trust (positive or negative)
+ */
+static int
+change_host_trust (struct GSF_ConnectedPeer *cp, int value)
+{
+ if (value == 0)
+ return 0;
+ GNUNET_assert (cp != NULL);
+ if (value > 0)
+ {
+ if (cp->ppd.trust + value < cp->ppd.trust)
+ {
+ value = UINT32_MAX - cp->ppd.trust;
+ cp->ppd.trust = UINT32_MAX;
+ }
+ else
+ cp->ppd.trust += value;
+ }
+ else
+ {
+ if (cp->ppd.trust < -value)
+ {
+ value = -cp->ppd.trust;
+ cp->ppd.trust = 0;
+ }
+ else
+ cp->ppd.trust += value;
+ }
+ return value;
+}
+
+
+/**
+ * We've received a request with the specified priority. Bound it
+ * according to how much we trust the given peer.
+ *
+ * @param prio_in requested priority
+ * @param cp the peer making the request
+ * @return effective priority
+ */
+static int32_t
+bound_priority (uint32_t prio_in,
+ struct GSF_ConnectedPeer *cp)
+{
+#define N ((double)128.0)
+ uint32_t ret;
+ double rret;
+ int ld;
+
+ ld = test_get_load_too_high (0);
+ if (ld == GNUNET_SYSERR)
+ {
+ GNUNET_STATISTICS_update (GSF_stats,
+ gettext_noop ("# requests done for free (low load)"),
+ 1,
+ GNUNET_NO);
+ return 0; /* excess resources */
+ }
+ if (prio_in > INT32_MAX)
+ prio_in = INT32_MAX;
+ ret = - change_host_trust (cp, - (int) prio_in);
+ if (ret > 0)
+ {
+ if (ret > GSF_current_priorities + N)
+ rret = GSF_current_priorities + N;
+ else
+ rret = ret;
+ GSF_current_priorities
+ = (GSF_current_priorities * (N-1) + rret)/N;
+ }
+ if ( (ld == GNUNET_YES) && (ret > 0) )
+ {
+ /* try with charging */
+ ld = test_get_load_too_high (ret);
+ }
+ if (ld == GNUNET_YES)
+ {
+ GNUNET_STATISTICS_update (GSF_stats,
+ gettext_noop ("# request dropped, priority insufficient"),
+ 1,
+ GNUNET_NO);
+ /* undo charge */
+ change_host_trust (cp, (int) ret);
+ return -1; /* not enough resources */
+ }
+ else
+ {
+ GNUNET_STATISTICS_update (GSF_stats,
+ gettext_noop ("# requests done for a price (normal load)"),
+ 1,
+ GNUNET_NO);
+ }
+#undef N
+ return ret;
+}
+
+
+/**
+ * The priority level imposes a bound on the maximum
+ * value for the ttl that can be requested.
+ *
+ * @param ttl_in requested ttl
+ * @param prio given priority
+ * @return ttl_in if ttl_in is below the limit,
+ * otherwise the ttl-limit for the given priority
+ */
+static int32_t
+bound_ttl (int32_t ttl_in, uint32_t prio)
+{
+ unsigned long long allowed;
+
+ if (ttl_in <= 0)
+ return ttl_in;
+ allowed = ((unsigned long long) prio) * TTL_DECREMENT / 1000;
+ if (ttl_in > allowed)
+ {
+ if (allowed >= (1 << 30))
+ return 1 << 30;
+ return allowed;
+ }
+ return ttl_in;
+}
+
+
/**
* Handle P2P "QUERY" message. Creates the pending request entry
* and sets up all of the data structures to that we will
struct GSF_PendingRequestData *prd;
struct GSF_ConnectedPeer *cp;
struct GSF_ConnectedPeer *cps;
- GNUNET_HashCode *namespace;
- struct GNUNET_PeerIdentity *target;
+ const GNUNET_HashCode *namespace;
+ const struct GNUNET_PeerIdentity *target;
enum GSF_PendingRequestOptions options;
- struct GNUNET_TIME_Relative timeout;
uint16_t msize;
const struct GetMessage *gm;
unsigned int bits;
if (msize < sizeof (struct GetMessage))
{
GNUNET_break_op (0);
- return GNUNET_SYSERR;
+ return NULL;
}
gm = (const struct GetMessage*) message;
#if DEBUG_FS
if (msize < sizeof (struct GetMessage) + bits * sizeof (GNUNET_HashCode))
{
GNUNET_break_op (0);
- return GNUNET_SYSERR;
+ return NULL;
}
opt = (const GNUNET_HashCode*) &gm[1];
bfsize = msize - sizeof (struct GetMessage) - bits * sizeof (GNUNET_HashCode);
if (0 != ( (bfsize - 1) & bfsize))
{
GNUNET_break_op (0);
- return GNUNET_SYSERR;
+ return NULL;
}
- cover_query_count++;
+ GSF_cover_query_count++;
bm = ntohl (gm->hash_bitmap);
bits = 0;
- cps = GNUNET_CONTAINER_multihashmap_get (connected_peers,
+ cps = GNUNET_CONTAINER_multihashmap_get (cp_map,
&other->hashPubKey);
if (NULL == cps)
{
/* peer must have just disconnected */
- GNUNET_STATISTICS_update (stats,
+ GNUNET_STATISTICS_update (GSF_stats,
gettext_noop ("# requests dropped due to initiator not being connected"),
1,
GNUNET_NO);
- return GNUNET_SYSERR;
+ return NULL;
}
if (0 != (bm & GET_MESSAGE_BIT_RETURN_TO))
- cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
+ cp = GNUNET_CONTAINER_multihashmap_get (cp_map,
&opt[bits++]);
else
cp = cps;
"Failed to find peer `%4s' in connection set. Dropping query.\n",
GNUNET_i2s (other));
#endif
- GNUNET_STATISTICS_update (stats,
+ GNUNET_STATISTICS_update (GSF_stats,
gettext_noop ("# requests dropped due to missing reverse route"),
1,
GNUNET_NO);
- return GNUNET_OK;
+ return NULL;
}
/* note that we can really only check load here since otherwise
peers could find out that we are overloaded by not being
"Dropping query from `%s', this peer is too busy.\n",
GNUNET_i2s (other));
#endif
- return GNUNET_OK;
+ return NULL;
}
#if DEBUG_FS
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
namespace = (0 != (bm & GET_MESSAGE_BIT_SKS_NAMESPACE)) ? &opt[bits++] : NULL;
target = (0 != (bm & GET_MESSAGE_BIT_TRANSMIT_TO)) ? ((const struct GNUNET_PeerIdentity*) &opt[bits++]) : NULL;
options = 0;
- if ( (GNUNET_LOAD_get_load (cp->transmission_delay) > 3 * (1 + priority)) ||
- (GNUNET_LOAD_get_average (cp->transmission_delay) >
- GNUNET_CONSTANTS_MAX_CORK_DELAY.rel_value * 2 + GNUNET_LOAD_get_average (rt_entry_lifetime)) )
+ if ( (GNUNET_LOAD_get_load (cp->ppd.transmission_delay) > 3 * (1 + priority)) ||
+ (GNUNET_LOAD_get_average (cp->ppd.transmission_delay) >
+ GNUNET_CONSTANTS_MAX_CORK_DELAY.rel_value * 2 + GNUNET_LOAD_get_average (GSF_rt_entry_lifetime)) )
{
/* don't have BW to send to peer, or would likely take longer than we have for it,
so at best indirect the query */
priority = 0;
options |= GSF_PRO_FORWARD_ONLY;
}
- ttl = bound_ttl (ntohl (gm->ttl), pr->priority);
+ ttl = bound_ttl (ntohl (gm->ttl), priority);
/* decrement ttl (always) */
ttl_decrement = 2 * TTL_DECREMENT +
GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
ttl,
ttl_decrement);
#endif
- GNUNET_STATISTICS_update (stats,
+ GNUNET_STATISTICS_update (GSF_stats,
gettext_noop ("# requests dropped due TTL underflow"),
1,
GNUNET_NO);
/* integer underflow => drop (should be very rare)! */
- return GNUNET_OK;
+ return NULL;
}
ttl -= ttl_decrement;
{
prd = GSF_pending_request_get_data_ (pr);
if ( (prd->type == type) &&
- ( (type != GNUNET_BLOCK_TYPE_SBLOCK) ||
- (0 == memcmp (prd->namespace,
+ ( (type != GNUNET_BLOCK_TYPE_FS_SBLOCK) ||
+ (0 == memcmp (&prd->namespace,
namespace,
sizeof (GNUNET_HashCode))) ) )
{
"Have existing request with higher TTL, dropping new request.\n",
GNUNET_i2s (other));
#endif
- GNUNET_STATISTICS_update (stats,
+ GNUNET_STATISTICS_update (GSF_stats,
gettext_noop ("# requests dropped due to higher-TTL request"),
1,
GNUNET_NO);
- return GNUNET_OK;
+ return NULL;
}
/* existing request has lower TTL, drop old one! */
- pr->priority += prd->priority;
+ priority += prd->priority;
GSF_pending_request_cancel_ (pr);
GNUNET_assert (GNUNET_YES ==
GNUNET_CONTAINER_multihashmap_remove (cp->request_map,
}
}
- pr = GSF_pending_request_create (options,
- type,
- &gm->query,
- namespace,
- target,
- (bf_size > 0) ? (const char*)&opt[bits] : NULL,
- bf_size,
- ntohl (gm->filter_mutator),
- 1 /* anonymity */
- (uint32_t) priority,
- ttl,
- NULL, 0, /* replies_seen */
- &handle_p2p_reply,
- cp);
+ pr = GSF_pending_request_create_ (options,
+ type,
+ &gm->query,
+ namespace,
+ target,
+ (bfsize > 0) ? (const char*)&opt[bits] : NULL,
+ bfsize,
+ ntohl (gm->filter_mutator),
+ 1 /* anonymity */,
+ (uint32_t) priority,
+ ttl,
+ NULL, 0, /* replies_seen */
+ &handle_p2p_reply,
+ cp);
GNUNET_break (GNUNET_OK ==
GNUNET_CONTAINER_multihashmap_put (cp->request_map,
&gm->query,
pr,
GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
- GNUNET_STATISTICS_update (stats,
+ GNUNET_STATISTICS_update (GSF_stats,
gettext_noop ("# P2P searches received"),
1,
GNUNET_NO);
- GNUNET_STATISTICS_update (stats,
+ GNUNET_STATISTICS_update (GSF_stats,
gettext_noop ("# P2P searches active"),
1,
GNUNET_NO);
* If the peer disconnects before the transmission can happen,
* the callback is invoked with a 'NULL' buffer.
*
- * @param peer target peer
+ * @param cp target peer
* @param is_query is this a query (GNUNET_YES) or content (GNUNET_NO) or neither (GNUNET_SYSERR)
* @param priority how important is this request?
* @param timeout when does this request timeout (call gmc with error)
* @return handle to cancel request
*/
struct GSF_PeerTransmitHandle *
-GSF_peer_transmit_ (struct GSF_ConnectedPeer *peer,
+GSF_peer_transmit_ (struct GSF_ConnectedPeer *cp,
int is_query,
uint32_t priority,
struct GNUNET_TIME_Relative timeout,
GSF_GetMessageCallback gmc,
void *gmc_cls)
{
- struct GSF_ConnectedPeer *cp;
struct GSF_PeerTransmitHandle *pth;
struct GSF_PeerTransmitHandle *pos;
struct GSF_PeerTransmitHandle *prev;
uint64_t ip;
int is_ready;
- cp = GNUNET_CONTAINER_multihashmap_get (cp_map,
- &peer->hashPubKey);
- GNUNET_assert (NULL != cp);
pth = GNUNET_malloc (sizeof (struct GSF_PeerTransmitHandle));
- pth->transmission_request_start_time = GNUNET_TIME_absolute_now ();
+ pth->transmission_request_start_time = GNUNET_TIME_absolute_get ();
pth->timeout = GNUNET_TIME_relative_to_absolute (timeout);
pth->gmc = gmc;
pth->gmc_cls = gmc_cls;
pos = pos->next;
}
if (prev == NULL)
- GNUNET_CONTAINER_DLL_insert_head (cp->pth_head,
- cp->pth_tail,
- pth);
+ GNUNET_CONTAINER_DLL_insert (cp->pth_head,
+ cp->pth_tail,
+ pth);
else
GNUNET_CONTAINER_DLL_insert_after (cp->pth_head,
cp->pth_tail,
is_ready = GNUNET_YES;
ip = cp->inc_preference;
cp->inc_preference = 0;
- cp->irc = GNUNET_CORE_peer_change_preference (core,
- peer,
+ cp->irc = GNUNET_CORE_peer_change_preference (GSF_core,
+ &target,
GNUNET_TIME_UNIT_FOREVER_REL,
GNUNET_BANDWIDTH_VALUE_MAX,
- GNUNET_FS_DBLOCK_SIZE,
+ DBLOCK_SIZE,
ip,
&core_reserve_callback,
cp);
}
if (is_ready)
{
- pth->cth = GNUNET_CORE_notify_transmit_ready (core,
+ pth->cth = GNUNET_CORE_notify_transmit_ready (GSF_core,
priority,
timeout,
&target,
void
GSF_peer_transmit_cancel_ (struct GSF_PeerTransmitHandle *pth)
{
- struct GSF_PeerTransmitHandle *pth = cls;
struct GSF_ConnectedPeer *cp;
if (pth->timeout_task != GNUNET_SCHEDULER_NO_TASK)
struct GNUNET_TIME_Relative delay;
delay = GNUNET_TIME_absolute_get_duration (request_time);
- cp->ppd.avg_reply_delay = (cp->ppd.avg_reply_delay * (RUNAVG_DELAY_N-1) + delay.rel_value) / RUNAVG_DELAY_N;
- cp->ppd.avg_priority = (cp->avg_priority * (RUNAVG_DELAY_N-1) + request_priority) / RUNAVG_DELAY_N;
+ cp->ppd.avg_reply_delay.rel_value = (cp->ppd.avg_reply_delay.rel_value * (RUNAVG_DELAY_N-1) + delay.rel_value) / RUNAVG_DELAY_N;
+ cp->ppd.avg_priority = (cp->ppd.avg_priority * (RUNAVG_DELAY_N-1) + request_priority) / RUNAVG_DELAY_N;
}
*/
void
GSF_peer_update_responder_client_ (struct GSF_ConnectedPeer *cp,
- const struct GSF_LocalClient *initiator_client)
+ struct GSF_LocalClient *initiator_client)
{
cp->ppd.last_client_replies[cp->last_client_replies_woff++ % CS2P_SUCCESS_LIST_SIZE] = initiator_client;
}
GNUNET_PEER_decrement_rcs (cp->ppd.last_p2p_replies, P2P_SUCCESS_LIST_SIZE);
while (NULL != (pth = cp->pth_head))
{
- if (NULL != pth->th)
+ if (NULL != pth->cth)
{
- GNUNET_CORE_notify_transmit_ready_cancel (pth->th);
- pth->th = NULL;
+ GNUNET_CORE_notify_transmit_ready_cancel (pth->cth);
+ pth->cth = NULL;
}
GNUNET_CONTAINER_DLL_remove (cp->pth_head,
cp->pth_tail,
struct GNUNET_PeerIdentity *id)
{
GNUNET_PEER_resolve (cp->ppd.pid,
- &id);
+ id);
}
uint32_t trust;
struct GNUNET_PeerIdentity pid;
- if (cp->trust == cp->disk_trust)
+ if (cp->ppd.trust == cp->disk_trust)
return GNUNET_OK; /* unchanged */
GNUNET_PEER_resolve (cp->ppd.pid,
&pid);
fn = get_trust_filename (&pid);
- if (cp->trust == 0)
+ if (cp->ppd.trust == 0)
{
if ((0 != UNLINK (fn)) && (errno != ENOENT))
GNUNET_log_strerror_file (GNUNET_ERROR_TYPE_WARNING |
}
else
{
- trust = htonl (cp->trust);
+ trust = htonl (cp->ppd.trust);
if (sizeof(uint32_t) == GNUNET_DISK_fn_write (fn, &trust,
sizeof(uint32_t),
GNUNET_DISK_PERM_USER_READ | GNUNET_DISK_PERM_USER_WRITE
| GNUNET_DISK_PERM_GROUP_READ | GNUNET_DISK_PERM_OTHER_READ))
- cp->disk_trust = cp->trust;
+ cp->disk_trust = cp->ppd.trust;
}
GNUNET_free (fn);
return GNUNET_OK;