*/
#include "platform.h"
#include "gnunet_load_lib.h"
+#include "gnunet_ats_service.h"
#include "gnunet-service-fs.h"
#include "gnunet-service-fs_cp.h"
#include "gnunet-service-fs_pe.h"
#include "gnunet-service-fs_pr.h"
#include "gnunet-service-fs_push.h"
+
+/**
+ * Ratio for moving average delay calculation. The previous
+ * average goes in with a factor of (n-1) into the calculation.
+ * Must be > 0.
+ */
+#define RUNAVG_DELAY_N 16
+
/**
* How often do we flush trust values to disk?
*/
struct GSF_PeerTransmitHandle *migration_pth;
/**
- * Context of our GNUNET_CORE_peer_change_preference call (or NULL).
+ * Context of our GNUNET_ATS_reserve_bandwidth call (or NULL).
*/
- struct GNUNET_CORE_InformationRequestContext *irc;
+ struct GNUNET_ATS_ReservationContext *rc;
/**
* Task scheduled if we need to retry bandwidth reservation later.
*/
- GNUNET_SCHEDULER_TaskIdentifier irc_delay_task;
+ GNUNET_SCHEDULER_TaskIdentifier rc_delay_task;
/**
* Active requests from this neighbour, map of query to 'struct PeerRequest'.
*/
static char *trustDirectory;
+/**
+ * Handle to ATS service.
+ */
+static struct GNUNET_ATS_PerformanceHandle *ats;
/**
* Get the filename under which we would store the GNUNET_HELLO_Message
* Find latency information in 'atsi'.
*
* @param atsi performance data
+ * @param atsi_count number of records in 'atsi'
* @return connection latency
*/
static struct GNUNET_TIME_Relative
-get_latency (const struct GNUNET_TRANSPORT_ATS_Information *atsi)
+get_latency (const struct GNUNET_ATS_Information *atsi, unsigned int atsi_count)
{
- if (atsi == NULL)
- return GNUNET_TIME_UNIT_SECONDS;
- while ((ntohl (atsi->type) != GNUNET_TRANSPORT_ATS_ARRAY_TERMINATOR) &&
- (ntohl (atsi->type) != GNUNET_TRANSPORT_ATS_QUALITY_NET_DELAY))
- atsi++;
- if (ntohl (atsi->type) == GNUNET_TRANSPORT_ATS_ARRAY_TERMINATOR)
- {
- GNUNET_break (0);
- /* how can we not have latency data? */
- return GNUNET_TIME_UNIT_SECONDS;
- }
- return GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
- ntohl (atsi->value));
+ unsigned int i;
+
+ for (i = 0; i < atsi_count; i++)
+ if (ntohl (atsi->type) == GNUNET_ATS_QUALITY_NET_DELAY)
+ return GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
+ ntohl (atsi->value));
+ return GNUNET_TIME_UNIT_SECONDS;
}
*
* @param cp peer record to update
* @param atsi transport performance data
+ * @param atsi_count number of records in 'atsi'
*/
static void
update_atsi (struct GSF_ConnectedPeer *cp,
- const struct GNUNET_TRANSPORT_ATS_Information *atsi)
+ const struct GNUNET_ATS_Information *atsi, unsigned int atsi_count)
{
struct GNUNET_TIME_Relative latency;
- latency = get_latency (atsi);
+ latency = get_latency (atsi, atsi_count);
GNUNET_LOAD_value_set_decline (cp->ppd.transmission_delay, latency);
/* LATER: merge atsi into cp's performance data (if we ever care...) */
}
/**
* Return the performance data record for the given peer
- *
+ *
* @param cp peer to query
* @return performance data record for the peer
*/
* @param buf where to copy the message
* @return number of bytes copied to buf
*/
-static size_t peer_transmit_ready_cb (void *cls, size_t size, void *buf);
-
-
+static size_t
+peer_transmit_ready_cb (void *cls, size_t size, void *buf);
/**
*
* @param cls the 'struct GSF_ConnectedPeer' of the peer for which we made the request
* @param peer identifies the peer
- * @param bandwidth_out available amount of outbound bandwidth
* @param amount set to the amount that was actually reserved or unreserved;
* either the full requested amount or zero (no partial reservations)
* @param res_delay if the reservation could not be satisfied (amount was 0), how
* long should the client wait until re-trying?
- * @param preference current traffic preference for the given peer
*/
static void
-core_reserve_callback (void *cls,
- const struct GNUNET_PeerIdentity *peer,
- struct GNUNET_BANDWIDTH_Value32NBO bandwidth_out,
- int32_t amount,
- struct GNUNET_TIME_Relative res_delay,
- uint64_t preference);
+ats_reserve_callback (void *cls, const struct GNUNET_PeerIdentity *peer,
+ int32_t amount, struct GNUNET_TIME_Relative res_delay);
/**
{
struct GSF_ConnectedPeer *cp;
struct GNUNET_PeerIdentity target;
- uint64_t ip;
if ((NULL != pth->cth) || (0 != pth->cth_in_progress))
return; /* already done */
cp = pth->cp;
GNUNET_assert (0 != cp->ppd.pid);
GNUNET_PEER_resolve (cp->ppd.pid, &target);
+
+ if (0 != cp->inc_preference)
+ {
+ GNUNET_ATS_change_preference (ats, &target, GNUNET_ATS_PREFERENCE_BANDWIDTH,
+ (double) cp->inc_preference,
+ GNUNET_ATS_PREFERENCE_END);
+ cp->inc_preference = 0;
+ }
+
if ((GNUNET_YES == pth->is_query) && (GNUNET_YES != pth->was_reserved))
{
/* query, need reservation */
cp->did_reserve = GNUNET_NO;
/* reservation already done! */
pth->was_reserved = GNUNET_YES;
- ip = cp->inc_preference;
- cp->inc_preference = 0;
- cp->irc = GNUNET_CORE_peer_change_preference (GSF_core,
- &target,
- GNUNET_TIME_UNIT_FOREVER_REL,
- GNUNET_BANDWIDTH_VALUE_MAX,
- DBLOCK_SIZE,
- ip,
- &core_reserve_callback, cp);
+ cp->rc =
+ GNUNET_ATS_reserve_bandwidth (ats, &target, DBLOCK_SIZE,
+ &ats_reserve_callback, cp);
}
GNUNET_assert (pth->cth == NULL);
pth->cth_in_progress++;
- pth->cth = GNUNET_CORE_notify_transmit_ready (GSF_core,
- GNUNET_YES,
- pth->priority,
- GNUNET_TIME_absolute_get_remaining
- (pth->timeout), &target,
- pth->size,
- &peer_transmit_ready_cb, pth);
+ pth->cth =
+ GNUNET_CORE_notify_transmit_ready (GSF_core, GNUNET_YES, pth->priority,
+ GNUNET_TIME_absolute_get_remaining
+ (pth->timeout), &target, pth->size,
+ &peer_transmit_ready_cb, pth);
GNUNET_assert (0 < pth->cth_in_progress--);
}
retry_reservation (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
{
struct GSF_ConnectedPeer *cp = cls;
- uint64_t ip;
struct GNUNET_PeerIdentity target;
GNUNET_PEER_resolve (cp->ppd.pid, &target);
- cp->irc_delay_task = GNUNET_SCHEDULER_NO_TASK;
- ip = cp->inc_preference;
- cp->inc_preference = 0;
- cp->irc = GNUNET_CORE_peer_change_preference (GSF_core,
- &target,
- GNUNET_TIME_UNIT_FOREVER_REL,
- GNUNET_BANDWIDTH_VALUE_MAX,
- DBLOCK_SIZE,
- ip, &core_reserve_callback, cp);
+ cp->rc_delay_task = GNUNET_SCHEDULER_NO_TASK;
+ cp->rc =
+ GNUNET_ATS_reserve_bandwidth (ats, &target, DBLOCK_SIZE,
+ &ats_reserve_callback, cp);
}
*
* @param cls the 'struct GSF_ConnectedPeer' of the peer for which we made the request
* @param peer identifies the peer
- * @param bandwidth_out available amount of outbound bandwidth
* @param amount set to the amount that was actually reserved or unreserved;
* either the full requested amount or zero (no partial reservations)
* @param res_delay if the reservation could not be satisfied (amount was 0), how
* long should the client wait until re-trying?
- * @param preference current traffic preference for the given peer
*/
static void
-core_reserve_callback (void *cls,
- const struct GNUNET_PeerIdentity *peer,
- struct GNUNET_BANDWIDTH_Value32NBO bandwidth_out,
- int32_t amount,
- struct GNUNET_TIME_Relative res_delay,
- uint64_t preference)
+ats_reserve_callback (void *cls, const struct GNUNET_PeerIdentity *peer,
+ int32_t amount, struct GNUNET_TIME_Relative res_delay)
{
struct GSF_ConnectedPeer *cp = cls;
struct GSF_PeerTransmitHandle *pth;
- cp->irc = NULL;
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Reserved %d bytes / need to wait %llu ms for reservation\n",
+ (int) amount, (unsigned long long) res_delay.rel_value);
+ cp->rc = NULL;
if (0 == amount)
{
- cp->irc_delay_task = GNUNET_SCHEDULER_add_delayed (res_delay,
- &retry_reservation, cp);
+ cp->rc_delay_task =
+ GNUNET_SCHEDULER_add_delayed (res_delay, &retry_reservation, cp);
return;
}
cp->did_reserve = GNUNET_YES;
{
/* reservation success, try transmission now! */
pth->cth_in_progress++;
- pth->cth = GNUNET_CORE_notify_transmit_ready (GSF_core,
- GNUNET_YES,
- pth->priority,
- GNUNET_TIME_absolute_get_remaining
- (pth->timeout), peer,
- pth->size,
- &peer_transmit_ready_cb, pth);
+ pth->cth =
+ GNUNET_CORE_notify_transmit_ready (GSF_core, GNUNET_YES, pth->priority,
+ GNUNET_TIME_absolute_get_remaining
+ (pth->timeout), peer, pth->size,
+ &peer_transmit_ready_cb, pth);
GNUNET_assert (0 < pth->cth_in_progress--);
}
}
*
* @param peer identity of peer that connected
* @param atsi performance data for the connection
+ * @param atsi_count number of records in 'atsi'
* @return handle to connected peer entry
*/
struct GSF_ConnectedPeer *
GSF_peer_connect_handler_ (const struct GNUNET_PeerIdentity *peer,
- const struct GNUNET_TRANSPORT_ATS_Information *atsi)
+ const struct GNUNET_ATS_Information *atsi,
+ unsigned int atsi_count)
{
struct GSF_ConnectedPeer *cp;
char *fn;
uint32_t trust;
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Connected to peer %s\n",
+ GNUNET_i2s (peer));
cp = GNUNET_malloc (sizeof (struct GSF_ConnectedPeer));
cp->ppd.pid = GNUNET_PEER_intern (peer);
cp->ppd.transmission_delay = GNUNET_LOAD_value_init (GNUNET_TIME_UNIT_ZERO);
- cp->irc = GNUNET_CORE_peer_change_preference (GSF_core,
- peer,
- GNUNET_TIME_UNIT_FOREVER_REL,
- GNUNET_BANDWIDTH_VALUE_MAX,
- DBLOCK_SIZE,
- 0, &core_reserve_callback, cp);
+ cp->rc =
+ GNUNET_ATS_reserve_bandwidth (ats, peer, DBLOCK_SIZE,
+ &ats_reserve_callback, cp);
fn = get_trust_filename (peer);
if ((GNUNET_DISK_file_test (fn) == GNUNET_YES) &&
(sizeof (trust) == GNUNET_DISK_fn_read (fn, &trust, sizeof (trust))))
GNUNET_free (fn);
cp->request_map = GNUNET_CONTAINER_multihashmap_create (128);
GNUNET_break (GNUNET_OK ==
- GNUNET_CONTAINER_multihashmap_put (cp_map,
- &peer->hashPubKey,
+ GNUNET_CONTAINER_multihashmap_put (cp_map, &peer->hashPubKey,
cp,
GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
- update_atsi (cp, atsi);
+ GNUNET_STATISTICS_set (GSF_stats, gettext_noop ("# peers connected"),
+ GNUNET_CONTAINER_multihashmap_size (cp_map),
+ GNUNET_NO);
+ update_atsi (cp, atsi, atsi_count);
GSF_push_start_ (cp);
return cp;
}
if (0 != bt.rel_value)
{
/* still time left... */
- cp->mig_revive_task
- = GNUNET_SCHEDULER_add_delayed (bt, &revive_migration, cp);
+ cp->mig_revive_task =
+ GNUNET_SCHEDULER_add_delayed (bt, &revive_migration, cp);
return;
}
GSF_push_start_ (cp);
* for loopback messages where we are both sender and receiver)
* @param message the actual message
* @param atsi performance information
+ * @param atsi_count number of records in 'atsi'
* @return GNUNET_OK to keep the connection open,
* GNUNET_SYSERR to close it (signal serious error)
*/
GSF_handle_p2p_migration_stop_ (void *cls,
const struct GNUNET_PeerIdentity *other,
const struct GNUNET_MessageHeader *message,
- const struct GNUNET_TRANSPORT_ATS_Information
- *atsi)
+ const struct GNUNET_ATS_Information *atsi,
+ unsigned int atsi_count)
{
struct GSF_ConnectedPeer *cp;
const struct MigrationStopMessage *msm;
struct GNUNET_TIME_Relative bt;
msm = (const struct MigrationStopMessage *) message;
- cp = GNUNET_CONTAINER_multihashmap_get (cp_map, &other->hashPubKey);
+ cp = GSF_peer_get_ (other);
if (cp == NULL)
{
GNUNET_break (0);
return GNUNET_OK;
}
+ GNUNET_STATISTICS_update (GSF_stats,
+ gettext_noop ("# migration stop messages received"),
+ 1, GNUNET_NO);
bt = GNUNET_TIME_relative_ntoh (msm->duration);
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
_("Migration of content to peer `%s' blocked for %llu ms\n"),
if (cp->mig_revive_task == GNUNET_SCHEDULER_NO_TASK)
{
GSF_push_stop_ (cp);
- cp->mig_revive_task
- = GNUNET_SCHEDULER_add_delayed (bt, &revive_migration, cp);
+ cp->mig_revive_task =
+ GNUNET_SCHEDULER_add_delayed (bt, &revive_migration, cp);
}
- update_atsi (cp, atsi);
+ update_atsi (cp, atsi, atsi_count);
return GNUNET_OK;
}
else
{
size = 0;
- GNUNET_STATISTICS_update (GSF_stats,
- gettext_noop ("# replies dropped"), 1, GNUNET_NO);
+ GNUNET_STATISTICS_update (GSF_stats, gettext_noop ("# replies dropped"), 1,
+ GNUNET_NO);
}
GNUNET_free (pm);
return size;
}
+/**
+ * Free resources associated with the given peer request.
+ *
+ * @param peerreq request to free
+ * @param query associated key for the request
+ */
+static void
+free_pending_request (struct PeerRequest *peerreq,
+ const GNUNET_HashCode *query)
+{
+ struct GSF_ConnectedPeer *cp = peerreq->cp;
+
+ if (peerreq->kill_task != GNUNET_SCHEDULER_NO_TASK)
+ {
+ GNUNET_SCHEDULER_cancel (peerreq->kill_task);
+ peerreq->kill_task = GNUNET_SCHEDULER_NO_TASK;
+ }
+ GNUNET_STATISTICS_update (GSF_stats, gettext_noop ("# P2P searches active"),
+ -1, GNUNET_NO);
+ GNUNET_break (GNUNET_YES ==
+ GNUNET_CONTAINER_multihashmap_remove (cp->request_map,
+ query, peerreq));
+ GNUNET_free (peerreq);
+}
+
+
/**
* Cancel all requests associated with the peer.
*
{
struct PeerRequest *peerreq = value;
struct GSF_PendingRequest *pr = peerreq->pr;
- struct GSF_ConnectedPeer *cp = peerreq->cp;
struct GSF_PendingRequestData *prd;
- if (peerreq->kill_task != GNUNET_SCHEDULER_NO_TASK)
- {
- GNUNET_SCHEDULER_cancel (peerreq->kill_task);
- peerreq->kill_task = GNUNET_SCHEDULER_NO_TASK;
- }
- GNUNET_STATISTICS_update (GSF_stats,
- gettext_noop ("# P2P searches active"),
- -1, GNUNET_NO);
prd = GSF_pending_request_get_data_ (pr);
- GNUNET_break (GNUNET_YES ==
- GNUNET_CONTAINER_multihashmap_remove (cp->request_map,
- &prd->query, peerreq));
GSF_pending_request_cancel_ (pr, GNUNET_NO);
- GNUNET_free (peerreq);
+ free_pending_request (peerreq, &prd->query);
return GNUNET_OK;
}
GNUNET_free (dh);
return;
}
- (void) GSF_peer_transmit_ (cp, GNUNET_NO,
- UINT32_MAX,
- REPLY_TIMEOUT, dh->msize, ©_reply, dh->pm);
+ (void) GSF_peer_transmit_ (cp, GNUNET_NO, UINT32_MAX, REPLY_TIMEOUT,
+ dh->msize, ©_reply, dh->pm);
GNUNET_free (dh);
}
/**
* Get the randomized delay a response should be subjected to.
- *
+ *
* @return desired delay
*/
static struct GNUNET_TIME_Relative
{
struct GNUNET_TIME_Relative ret;
- /* FIXME: replace 5000 with something relating to current observed P2P message latency */
- ret = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
- GNUNET_CRYPTO_random_u32
- (GNUNET_CRYPTO_QUALITY_WEAK, 5000));
+ ret =
+ GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
+ GNUNET_CRYPTO_random_u32
+ (GNUNET_CRYPTO_QUALITY_WEAK,
+ 2 * GSF_avg_latency.rel_value + 1));
GNUNET_STATISTICS_update (GSF_stats,
gettext_noop
("# artificial delays introduced (ms)"),
* Handle a reply to a pending request. Also called if a request
* expires (then with data == NULL). The handler may be called
* many times (depending on the request type), but will not be
- * called during or after a call to GSF_pending_request_cancel
+ * called during or after a call to GSF_pending_request_cancel
* and will also not be called anymore after a call signalling
* expiration.
*
* @param pr handle to the original pending request
* @param reply_anonymity_level anonymity level for the reply, UINT32_MAX for "unknown"
* @param expiration when does 'data' expire?
+ * @param last_transmission when did we last transmit a request for this block
* @param type type of the block
* @param data response data, NULL on request expiration
* @param data_len number of bytes in data
*/
static void
-handle_p2p_reply (void *cls,
- enum GNUNET_BLOCK_EvaluationResult eval,
- struct GSF_PendingRequest *pr,
- uint32_t reply_anonymity_level,
+handle_p2p_reply (void *cls, enum GNUNET_BLOCK_EvaluationResult eval,
+ struct GSF_PendingRequest *pr, uint32_t reply_anonymity_level,
struct GNUNET_TIME_Absolute expiration,
- enum GNUNET_BLOCK_Type type,
- const void *data, size_t data_len)
+ struct GNUNET_TIME_Absolute last_transmission,
+ enum GNUNET_BLOCK_Type type, const void *data,
+ size_t data_len)
{
struct PeerRequest *peerreq = cls;
struct GSF_ConnectedPeer *cp = peerreq->cp;
prd = GSF_pending_request_get_data_ (pr);
if (NULL == data)
{
- GNUNET_STATISTICS_update (GSF_stats,
- gettext_noop ("# P2P searches active"),
- -1, GNUNET_NO);
- GNUNET_break (GNUNET_YES ==
- GNUNET_CONTAINER_multihashmap_remove (cp->request_map,
- &prd->query, peerreq));
- GNUNET_free (peerreq);
+ free_pending_request (peerreq, &prd->query);
return;
}
GNUNET_break (type != GNUNET_BLOCK_TYPE_ANY);
if ((prd->type != type) && (prd->type != GNUNET_BLOCK_TYPE_ANY))
{
- GNUNET_break (0);
+ GNUNET_STATISTICS_update (GSF_stats,
+ gettext_noop
+ ("# replies dropped due to type mismatch"),
+ 1, GNUNET_NO);
return;
}
-#if DEBUG_FS
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Transmitting result for query `%s' to peer\n",
GNUNET_h2s (&prd->query));
-#endif
GNUNET_STATISTICS_update (GSF_stats,
gettext_noop ("# replies received for other peers"),
1, GNUNET_NO);
pm->type = htonl (type);
pm->expiration = GNUNET_TIME_absolute_hton (expiration);
memcpy (&pm[1], data, data_len);
- if ((reply_anonymity_level != UINT32_MAX) &&
- (reply_anonymity_level != 0) &&
+ if ((reply_anonymity_level != UINT32_MAX) && (reply_anonymity_level != 0) &&
(GSF_enable_randomized_delays == GNUNET_YES))
{
struct GSF_DelayedHandle *dh;
dh->pm = pm;
dh->msize = msize;
GNUNET_CONTAINER_DLL_insert (cp->delayed_head, cp->delayed_tail, dh);
- dh->delay_task = GNUNET_SCHEDULER_add_delayed (get_randomized_delay (),
- &transmit_delayed_now, dh);
+ dh->delay_task =
+ GNUNET_SCHEDULER_add_delayed (get_randomized_delay (),
+ &transmit_delayed_now, dh);
}
else
{
- (void) GSF_peer_transmit_ (cp, GNUNET_NO,
- UINT32_MAX,
- REPLY_TIMEOUT, msize, ©_reply, pm);
+ (void) GSF_peer_transmit_ (cp, GNUNET_NO, UINT32_MAX, REPLY_TIMEOUT, msize,
+ ©_reply, pm);
}
if (eval != GNUNET_BLOCK_EVALUATION_OK_LAST)
return;
/**
* We've received a request with the specified priority. Bound it
* according to how much we trust the given peer.
- *
+ *
* @param prio_in requested priority
* @param cp the peer making the request
* @return effective priority
GSF_cover_query_count++;
bm = ntohl (gm->hash_bitmap);
bits = 0;
- cps = GNUNET_CONTAINER_multihashmap_get (cp_map, &other->hashPubKey);
+ cps = GSF_peer_get_ (other);
if (NULL == cps)
{
/* peer must have just disconnected */
return NULL;
}
if (0 != (bm & GET_MESSAGE_BIT_RETURN_TO))
- cp = GNUNET_CONTAINER_multihashmap_get (cp_map, &opt[bits++]);
+ cp = GSF_peer_get_ ((const struct GNUNET_PeerIdentity *) &opt[bits++]);
else
cp = cps;
if (cp == NULL)
{
-#if DEBUG_FS
if (0 != (bm & GET_MESSAGE_BIT_RETURN_TO))
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Failed to find RETURN-TO peer `%4s' in connection set. Dropping query.\n",
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Failed to find peer `%4s' in connection set. Dropping query.\n",
GNUNET_i2s (other));
-#endif
GNUNET_STATISTICS_update (GSF_stats,
gettext_noop
("# requests dropped due to missing reverse route"),
priority = bound_priority (ntohl (gm->priority), cps);
if (priority < 0)
{
-#if DEBUG_FS
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Dropping query from `%s', this peer is too busy.\n",
GNUNET_i2s (other));
-#endif
return NULL;
}
-#if DEBUG_FS
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Received request for `%s' of type %u from peer `%4s' with flags %u\n",
- GNUNET_h2s (&gm->query),
- (unsigned int) type, GNUNET_i2s (other), (unsigned int) bm);
-#endif
+ GNUNET_h2s (&gm->query), (unsigned int) type, GNUNET_i2s (other),
+ (unsigned int) bm);
namespace = (0 != (bm & GET_MESSAGE_BIT_SKS_NAMESPACE)) ? &opt[bits++] : NULL;
if ((type == GNUNET_BLOCK_TYPE_FS_SBLOCK) && (namespace == NULL))
{
(0 !=
(bm & GET_MESSAGE_BIT_TRANSMIT_TO)) ? ((const struct GNUNET_PeerIdentity
*) &opt[bits++]) : NULL;
- options = 0;
+ options = GSF_PRO_DEFAULTS;
spid = 0;
if ((GNUNET_LOAD_get_load (cp->ppd.transmission_delay) > 3 * (1 + priority))
|| (GNUNET_LOAD_get_average (cp->ppd.transmission_delay) >
}
ttl = bound_ttl (ntohl (gm->ttl), priority);
/* decrement ttl (always) */
- ttl_decrement = 2 * TTL_DECREMENT +
- GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, TTL_DECREMENT);
+ ttl_decrement =
+ 2 * TTL_DECREMENT + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
+ TTL_DECREMENT);
if ((ttl < 0) && (((int32_t) (ttl - ttl_decrement)) > 0))
{
-#if DEBUG_FS
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Dropping query from `%s' due to TTL underflow (%d - %u).\n",
GNUNET_i2s (other), ttl, ttl_decrement);
-#endif
GNUNET_STATISTICS_update (GSF_stats,
gettext_noop
("# requests dropped due TTL underflow"), 1,
{
/* existing request has higher TTL, drop new one! */
prd->priority += priority;
-#if DEBUG_FS
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Have existing request with higher TTL, dropping new request.\n",
GNUNET_i2s (other));
-#endif
GNUNET_STATISTICS_update (GSF_stats,
gettext_noop
("# requests dropped due to higher-TTL request"),
return NULL;
}
/* existing request has lower TTL, drop old one! */
- GNUNET_STATISTICS_update (GSF_stats,
- gettext_noop ("# P2P searches active"),
- -1, GNUNET_NO);
priority += prd->priority;
GSF_pending_request_cancel_ (pr, GNUNET_YES);
- GNUNET_assert (GNUNET_YES ==
- GNUNET_CONTAINER_multihashmap_remove (cp->request_map,
- &gm->query,
- peerreq));
- if (peerreq->kill_task != GNUNET_SCHEDULER_NO_TASK)
- {
- GNUNET_SCHEDULER_cancel (peerreq->kill_task);
- peerreq->kill_task = GNUNET_SCHEDULER_NO_TASK;
- }
- GNUNET_free (peerreq);
+ free_pending_request (peerreq, &gm->query);
}
}
peerreq = GNUNET_malloc (sizeof (struct PeerRequest));
peerreq->cp = cp;
- pr = GSF_pending_request_create_ (options,
- type,
- &gm->query,
- namespace,
+ pr = GSF_pending_request_create_ (options, type, &gm->query, namespace,
target,
(bfsize >
0) ? (const char *) &opt[bits] : NULL,
bfsize, ntohl (gm->filter_mutator),
1 /* anonymity */ ,
- (uint32_t) priority, ttl, spid, NULL, 0, /* replies_seen */
+ (uint32_t) priority, ttl, spid, GNUNET_PEER_intern (other), NULL, 0, /* replies_seen */
&handle_p2p_reply, peerreq);
GNUNET_assert (NULL != pr);
peerreq->pr = pr;
GNUNET_break (GNUNET_OK ==
- GNUNET_CONTAINER_multihashmap_put (cp->request_map,
- &gm->query,
+ GNUNET_CONTAINER_multihashmap_put (cp->request_map, &gm->query,
peerreq,
GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
GNUNET_STATISTICS_update (GSF_stats,
* Function called if there has been a timeout trying to satisfy
* a transmission request.
*
- * @param cls the 'struct GSF_PeerTransmitHandle' of the request
+ * @param cls the 'struct GSF_PeerTransmitHandle' of the request
* @param tc scheduler context
*/
static void
struct GSF_PeerTransmitHandle *pth = cls;
struct GSF_ConnectedPeer *cp;
-#if DEBUG_FS
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Timeout trying to transmit to other peer\n");
-#endif
pth->timeout_task = GNUNET_SCHEDULER_NO_TASK;
cp = pth->cp;
GNUNET_CONTAINER_DLL_remove (cp->pth_head, cp->pth_tail, pth);
* @return handle to cancel request
*/
struct GSF_PeerTransmitHandle *
-GSF_peer_transmit_ (struct GSF_ConnectedPeer *cp,
- int is_query,
- uint32_t priority,
- struct GNUNET_TIME_Relative timeout,
+GSF_peer_transmit_ (struct GSF_ConnectedPeer *cp, int is_query,
+ uint32_t priority, struct GNUNET_TIME_Relative timeout,
size_t size, GSF_GetMessageCallback gmc, void *gmc_cls)
{
struct GSF_PeerTransmitHandle *pth;
cp->ppd.pending_queries++;
else if (GNUNET_NO == is_query)
cp->ppd.pending_replies++;
- pth->timeout_task = GNUNET_SCHEDULER_add_delayed (timeout,
- &peer_transmit_timeout,
- pth);
+ pth->timeout_task =
+ GNUNET_SCHEDULER_add_delayed (timeout, &peer_transmit_timeout, pth);
schedule_transmission (pth);
return pth;
}
}
-/**
- * Method called whenever a given peer has a status change.
- *
- * @param cls closure
- * @param peer peer identity this notification is about
- * @param bandwidth_in available amount of inbound bandwidth
- * @param bandwidth_out available amount of outbound bandwidth
- * @param timeout absolute time when this peer will time out
- * unless we see some further activity from it
- * @param atsi status information
- */
-void
-GSF_peer_status_handler_ (void *cls,
- const struct GNUNET_PeerIdentity *peer,
- struct GNUNET_BANDWIDTH_Value32NBO bandwidth_in,
- struct GNUNET_BANDWIDTH_Value32NBO bandwidth_out,
- struct GNUNET_TIME_Absolute timeout,
- const struct GNUNET_TRANSPORT_ATS_Information *atsi)
-{
- struct GSF_ConnectedPeer *cp;
-
- cp = GNUNET_CONTAINER_multihashmap_get (cp_map, &peer->hashPubKey);
- GNUNET_assert (NULL != cp);
- update_atsi (cp, atsi);
-}
-
-
/**
* A peer disconnected from us. Tear down the connected peer
* record.
struct GSF_PeerTransmitHandle *pth;
struct GSF_DelayedHandle *dh;
- cp = GNUNET_CONTAINER_multihashmap_get (cp_map, &peer->hashPubKey);
+ cp = GSF_peer_get_ (peer);
if (NULL == cp)
return; /* must have been disconnect from core with
* 'peer' == my_id, ignore */
GNUNET_assert (GNUNET_YES ==
GNUNET_CONTAINER_multihashmap_remove (cp_map,
&peer->hashPubKey, cp));
+ GNUNET_STATISTICS_set (GSF_stats, gettext_noop ("# peers connected"),
+ GNUNET_CONTAINER_multihashmap_size (cp_map),
+ GNUNET_NO);
if (NULL != cp->migration_pth)
{
GSF_peer_transmit_cancel_ (cp->migration_pth);
cp->migration_pth = NULL;
}
- if (NULL != cp->irc)
+ if (NULL != cp->rc)
{
- GNUNET_CORE_peer_change_preference_cancel (cp->irc);
- cp->irc = NULL;
+ GNUNET_ATS_reserve_bandwidth_cancel (cp->rc);
+ cp->rc = NULL;
}
- if (GNUNET_SCHEDULER_NO_TASK != cp->irc_delay_task)
+ if (GNUNET_SCHEDULER_NO_TASK != cp->rc_delay_task)
{
- GNUNET_SCHEDULER_cancel (cp->irc_delay_task);
- cp->irc_delay_task = GNUNET_SCHEDULER_NO_TASK;
+ GNUNET_SCHEDULER_cancel (cp->rc_delay_task);
+ cp->rc_delay_task = GNUNET_SCHEDULER_NO_TASK;
}
GNUNET_CONTAINER_multihashmap_iterate (cp->request_map,
&cancel_pending_request, cp);
GNUNET_TIME_relative_hton (GNUNET_TIME_absolute_get_remaining
(cp->last_migration_block));
memcpy (buf, &msm, sizeof (struct MigrationStopMessage));
+ GNUNET_STATISTICS_update (GSF_stats,
+ gettext_noop ("# migration stop messages sent"),
+ 1, GNUNET_NO);
return sizeof (struct MigrationStopMessage);
}
/**
* Ask a peer to stop migrating data to us until the given point
* in time.
- *
+ *
* @param cp peer to ask
* @param block_time until when to block
*/
void
GSF_block_peer_migration_ (struct GSF_ConnectedPeer *cp,
- struct GNUNET_TIME_Relative block_time)
+ struct GNUNET_TIME_Absolute block_time)
{
- if (GNUNET_TIME_absolute_get_remaining (cp->last_migration_block).rel_value >
- block_time.rel_value)
+ if (cp->last_migration_block.abs_value > block_time.abs_value)
{
-#if DEBUG_FS && 0
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Migration already blocked for another %llu ms\n",
(unsigned long long)
GNUNET_TIME_absolute_get_remaining
(cp->last_migration_block).rel_value);
-#endif
return; /* already blocked */
}
-#if DEBUG_FS && 0
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Asking to stop migration for %llu ms\n",
- (unsigned long long) block_time.rel_value);
-#endif
- cp->last_migration_block = GNUNET_TIME_relative_to_absolute (block_time);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Asking to stop migration for %llu ms\n",
+ (unsigned long long) GNUNET_TIME_absolute_get_remaining (block_time).rel_value);
+ cp->last_migration_block = block_time;
if (cp->migration_pth != NULL)
GSF_peer_transmit_cancel_ (cp->migration_pth);
- cp->migration_pth
- = GSF_peer_transmit_ (cp,
- GNUNET_SYSERR,
- UINT32_MAX,
- GNUNET_TIME_UNIT_FOREVER_REL,
- sizeof (struct MigrationStopMessage),
- &create_migration_stop_message, cp);
+ cp->migration_pth =
+ GSF_peer_transmit_ (cp, GNUNET_SYSERR, UINT32_MAX,
+ GNUNET_TIME_UNIT_FOREVER_REL,
+ sizeof (struct MigrationStopMessage),
+ &create_migration_stop_message, cp);
}
else
{
trust = htonl (cp->ppd.trust);
- if (sizeof (uint32_t) == GNUNET_DISK_fn_write (fn, &trust,
- sizeof (uint32_t),
- GNUNET_DISK_PERM_USER_READ |
- GNUNET_DISK_PERM_USER_WRITE |
- GNUNET_DISK_PERM_GROUP_READ |
- GNUNET_DISK_PERM_OTHER_READ))
+ if (sizeof (uint32_t) ==
+ GNUNET_DISK_fn_write (fn, &trust, sizeof (uint32_t),
+ GNUNET_DISK_PERM_USER_READ |
+ GNUNET_DISK_PERM_USER_WRITE |
+ GNUNET_DISK_PERM_GROUP_READ |
+ GNUNET_DISK_PERM_OTHER_READ))
cp->disk_trust = cp->ppd.trust;
}
GNUNET_free (fn);
return;
if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
return;
- GNUNET_SCHEDULER_add_delayed (TRUST_FLUSH_FREQ, &cron_flush_trust, NULL);
+ GNUNET_SCHEDULER_add_delayed_with_priority (TRUST_FLUSH_FREQ,
+ GNUNET_SCHEDULER_PRIORITY_HIGH,
+ &cron_flush_trust, NULL);
}
GSF_connected_peer_init_ ()
{
cp_map = GNUNET_CONTAINER_multihashmap_create (128);
+ ats = GNUNET_ATS_performance_init (GSF_cfg, NULL, NULL);
GNUNET_assert (GNUNET_OK ==
- GNUNET_CONFIGURATION_get_value_filename (GSF_cfg,
- "fs",
+ GNUNET_CONFIGURATION_get_value_filename (GSF_cfg, "fs",
"TRUST",
&trustDirectory));
GNUNET_DISK_directory_create (trustDirectory);
cp_map = NULL;
GNUNET_free (trustDirectory);
trustDirectory = NULL;
+ GNUNET_ATS_performance_done (ats);
+ ats = NULL;
}
{
if (NULL == cp_map)
return; /* already cleaned up */
- GNUNET_CONTAINER_multihashmap_iterate (cp_map,
- &clean_local_client, (void *) lc);
+ GNUNET_CONTAINER_multihashmap_iterate (cp_map, &clean_local_client,
+ (void *) lc);
}