* @brief gnunet anonymity protocol implementation
* @author Christian Grothoff
*
- * FIXME:
- * - TTL/priority calculations are absent!
* TODO:
- * - have non-zero preference / priority for requests we initiate!
- * - track stats for hot-path routing
- * - implement hot-path routing decision procedure
- * - implement: bound_priority, test_load_too_high, validate_nblock
- * - add content migration support (store locally) [or create new service]
- * - statistics
+ * - trust not properly received and pushed back to peerinfo!
+ * - bound_priority by priorities used by other peers
+ * - have a way to drop queries based on load
+ * - introduce random latency in processing
+ * - consider more precise latency estimation (per-peer & request)
+ * - better algorithm for priority selection for requests we initiate?
+ * - tell other peers to stop migration if our PUTs fail (or if
+ * we don't support migration per configuration?)
+ * - more statistics
*/
#include "platform.h"
#include <float.h>
#include "gnunet-service-fs_indexing.h"
#include "fs.h"
-#define DEBUG_FS GNUNET_YES
+#define DEBUG_FS GNUNET_NO
/**
* Maximum number of outgoing messages we queue per peer.
- * FIXME: make configurable?
*/
#define MAX_QUEUE_PER_PEER 16
/**
* Increase in traffic preference still to be submitted
- * to the core service for this peer. FIXME: double or 'uint64_t'?
+ * to the core service for this peer.
*/
- double inc_preference;
+ uint64_t inc_preference;
+
+ /**
+ * Trust delta to still commit to the system.
+ */
+ uint32_t trust_delta;
/**
* The peer's identity.
};
+/**
+ * Block that is ready for migration to other peers. Actual data is at the end of the block.
+ */
+struct MigrationReadyBlock
+{
+
+ /**
+ * This is a doubly-linked list.
+ */
+ struct MigrationReadyBlock *next;
+
+ /**
+ * This is a doubly-linked list.
+ */
+ struct MigrationReadyBlock *prev;
+
+ /**
+ * Query for the block.
+ */
+ GNUNET_HashCode query;
+
+ /**
+ * When does this block expire?
+ */
+ struct GNUNET_TIME_Absolute expiration;
+
+ /**
+ * Peers we would consider forwarding this
+ * block to. Zero for empty entries.
+ */
+ GNUNET_PEER_Id target_list[MIGRATION_LIST_SIZE];
+
+ /**
+ * Size of the block.
+ */
+ size_t size;
+
+ /**
+ * Number of targets already used.
+ */
+ unsigned int used_targets;
+
+ /**
+ * Type of the block.
+ */
+ enum GNUNET_BLOCK_Type type;
+};
+
+
/**
* Our scheduler.
*/
*/
static struct GNUNET_CORE_Handle *core;
+/**
+ * Head of linked list of blocks that can be migrated.
+ */
+static struct MigrationReadyBlock *mig_head;
+
+/**
+ * Tail of linked list of blocks that can be migrated.
+ */
+static struct MigrationReadyBlock *mig_tail;
+
+/**
+ * Request to datastore for migration (or NULL).
+ */
+static struct GNUNET_DATASTORE_QueueEntry *mig_qe;
+
+/**
+ * ID of task that collects blocks for migration.
+ */
+static GNUNET_SCHEDULER_TaskIdentifier mig_task;
+
+/**
+ * What is the maximum frequency at which we are allowed to
+ * poll the datastore for migration content?
+ */
+static struct GNUNET_TIME_Relative min_migration_delay;
+
+/**
+ * Size of the doubly-linked list of migration blocks.
+ */
+static unsigned int mig_size;
+
/**
* Are we allowed to migrate content to this peer.
*/
static int active_migration;
+
+/**
+ * Transmit messages by copying it to the target buffer
+ * "buf". "buf" will be NULL and "size" zero if the socket was closed
+ * for writing in the meantime. In that case, do nothing
+ * (the disconnect or shutdown handler will take care of the rest).
+ * If we were able to transmit messages and there are still more
+ * pending, ask core again for further calls to this function.
+ *
+ * @param cls closure, pointer to the 'struct ConnectedPeer*'
+ * @param size number of bytes available in buf
+ * @param buf where the callee should write the message
+ * @return number of bytes written to buf
+ */
+static size_t
+transmit_to_peer (void *cls,
+ size_t size, void *buf);
+
+
/* ******************* clean up functions ************************ */
+/**
+ * Delete the given migration block.
+ *
+ * @param mb block to delete
+ */
+static void
+delete_migration_block (struct MigrationReadyBlock *mb)
+{
+ GNUNET_CONTAINER_DLL_remove (mig_head,
+ mig_tail,
+ mb);
+ GNUNET_PEER_decrement_rcs (mb->target_list,
+ MIGRATION_LIST_SIZE);
+ mig_size--;
+ GNUNET_free (mb);
+}
+
+
+/**
+ * Compare the distance of two peers to a key.
+ *
+ * @param key key
+ * @param p1 first peer
+ * @param p2 second peer
+ * @return GNUNET_YES if P1 is closer to key than P2
+ */
+static int
+is_closer (const GNUNET_HashCode *key,
+ const struct GNUNET_PeerIdentity *p1,
+ const struct GNUNET_PeerIdentity *p2)
+{
+ return GNUNET_CRYPTO_hash_xorcmp (&p1->hashPubKey,
+ &p2->hashPubKey,
+ key);
+}
+
+
+/**
+ * Consider migrating content to a given peer.
+ *
+ * @param cls 'struct MigrationReadyBlock*' to select
+ * targets for (or NULL for none)
+ * @param key ID of the peer
+ * @param value 'struct ConnectedPeer' of the peer
+ * @return GNUNET_YES (always continue iteration)
+ */
+static int
+consider_migration (void *cls,
+ const GNUNET_HashCode *key,
+ void *value)
+{
+ struct MigrationReadyBlock *mb = cls;
+ struct ConnectedPeer *cp = value;
+ struct MigrationReadyBlock *pos;
+ struct GNUNET_PeerIdentity cppid;
+ struct GNUNET_PeerIdentity otherpid;
+ struct GNUNET_PeerIdentity worstpid;
+ size_t msize;
+ unsigned int i;
+ unsigned int repl;
+
+ /* consider 'cp' as a migration target for mb */
+ if (mb != NULL)
+ {
+ GNUNET_PEER_resolve (cp->pid,
+ &cppid);
+ repl = MIGRATION_LIST_SIZE;
+ for (i=0;i<MIGRATION_LIST_SIZE;i++)
+ {
+ if (mb->target_list[i] == 0)
+ {
+ mb->target_list[i] = cp->pid;
+ GNUNET_PEER_change_rc (mb->target_list[i], 1);
+ repl = MIGRATION_LIST_SIZE;
+ break;
+ }
+ GNUNET_PEER_resolve (mb->target_list[i],
+ &otherpid);
+ if ( (repl == MIGRATION_LIST_SIZE) &&
+ is_closer (&mb->query,
+ &cppid,
+ &otherpid))
+ {
+ repl = i;
+ worstpid = otherpid;
+ }
+ else if ( (repl != MIGRATION_LIST_SIZE) &&
+ (is_closer (&mb->query,
+ &worstpid,
+ &otherpid) ) )
+ {
+ repl = i;
+ worstpid = otherpid;
+ }
+ }
+ if (repl != MIGRATION_LIST_SIZE)
+ {
+ GNUNET_PEER_change_rc (mb->target_list[repl], -1);
+ mb->target_list[repl] = cp->pid;
+ GNUNET_PEER_change_rc (mb->target_list[repl], 1);
+ }
+ }
+
+ /* consider scheduling transmission to cp for content migration */
+ if (cp->cth != NULL)
+ return GNUNET_YES;
+ msize = 0;
+ pos = mig_head;
+ while (pos != NULL)
+ {
+ for (i=0;i<MIGRATION_LIST_SIZE;i++)
+ {
+ if (cp->pid == pos->target_list[i])
+ {
+ if (msize == 0)
+ msize = pos->size;
+ else
+ msize = GNUNET_MIN (msize,
+ pos->size);
+ break;
+ }
+ }
+ pos = pos->next;
+ }
+ if (msize == 0)
+ return GNUNET_YES; /* no content available */
+#if DEBUG_FS
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Trying to migrate at least %u bytes to peer `%s'\n",
+ msize,
+ GNUNET_h2s (key));
+#endif
+ cp->cth
+ = GNUNET_CORE_notify_transmit_ready (core,
+ 0, GNUNET_TIME_UNIT_FOREVER_REL,
+ (const struct GNUNET_PeerIdentity*) key,
+ msize + sizeof (struct PutMessage),
+ &transmit_to_peer,
+ cp);
+ return GNUNET_YES;
+}
+
+
+/**
+ * Task that is run periodically to obtain blocks for content
+ * migration
+ *
+ * @param cls unused
+ * @param tc scheduler context (also unused)
+ */
+static void
+gather_migration_blocks (void *cls,
+ const struct GNUNET_SCHEDULER_TaskContext *tc);
+
+
+/**
+ * If the migration task is not currently running, consider
+ * (re)scheduling it with the appropriate delay.
+ */
+static void
+consider_migration_gathering ()
+{
+ struct GNUNET_TIME_Relative delay;
+
+ if (mig_qe != NULL)
+ return;
+ if (mig_task != GNUNET_SCHEDULER_NO_TASK)
+ return;
+ delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
+ mig_size);
+ delay = GNUNET_TIME_relative_divide (delay,
+ MAX_MIGRATION_QUEUE);
+ delay = GNUNET_TIME_relative_max (delay,
+ min_migration_delay);
+ mig_task = GNUNET_SCHEDULER_add_delayed (sched,
+ delay,
+ &gather_migration_blocks,
+ NULL);
+}
+
+
+/**
+ * Process content offered for migration.
+ *
+ * @param cls closure
+ * @param key key for the content
+ * @param size number of bytes in data
+ * @param data content stored
+ * @param type type of the content
+ * @param priority priority of the content
+ * @param anonymity anonymity-level for the content
+ * @param expiration expiration time for the content
+ * @param uid unique identifier for the datum;
+ * maybe 0 if no unique identifier is available
+ */
+static void
+process_migration_content (void *cls,
+ const GNUNET_HashCode * key,
+ uint32_t size,
+ const void *data,
+ enum GNUNET_BLOCK_Type type,
+ uint32_t priority,
+ uint32_t anonymity,
+ struct GNUNET_TIME_Absolute
+ expiration, uint64_t uid)
+{
+ struct MigrationReadyBlock *mb;
+
+ if (key == NULL)
+ {
+ mig_qe = NULL;
+ if (mig_size < MAX_MIGRATION_QUEUE)
+ consider_migration_gathering ();
+ return;
+ }
+ if (type == GNUNET_BLOCK_TYPE_ONDEMAND)
+ {
+ if (GNUNET_OK !=
+ GNUNET_FS_handle_on_demand_block (key, size, data,
+ type, priority, anonymity,
+ expiration, uid,
+ &process_migration_content,
+ NULL))
+ GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
+ return;
+ }
+#if DEBUG_FS
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Retrieved block `%s' of type %u for migration\n",
+ GNUNET_h2s (key),
+ type);
+#endif
+ mb = GNUNET_malloc (sizeof (struct MigrationReadyBlock) + size);
+ mb->query = *key;
+ mb->expiration = expiration;
+ mb->size = size;
+ mb->type = type;
+ memcpy (&mb[1], data, size);
+ GNUNET_CONTAINER_DLL_insert_after (mig_head,
+ mig_tail,
+ mig_tail,
+ mb);
+ mig_size++;
+ GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
+ &consider_migration,
+ mb);
+ GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
+}
+
+
+/**
+ * Task that is run periodically to obtain blocks for content
+ * migration
+ *
+ * @param cls unused
+ * @param tc scheduler context (also unused)
+ */
+static void
+gather_migration_blocks (void *cls,
+ const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ mig_task = GNUNET_SCHEDULER_NO_TASK;
+ mig_qe = GNUNET_DATASTORE_get_random (dsh, 0, -1,
+ GNUNET_TIME_UNIT_FOREVER_REL,
+ &process_migration_content, NULL);
+ GNUNET_assert (mig_qe != NULL);
+}
+
+
/**
* We're done with a particular message list entry.
* Free all associated resources.
uint32_t distance)
{
struct ConnectedPeer *cp;
-
+ struct MigrationReadyBlock *pos;
+
cp = GNUNET_malloc (sizeof (struct ConnectedPeer));
cp->pid = GNUNET_PEER_intern (peer);
GNUNET_break (GNUNET_OK ==
&peer->hashPubKey,
cp,
GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
+
+ pos = mig_head;
+ while (NULL != pos)
+ {
+ (void) consider_migration (pos, &peer->hashPubKey, cp);
+ pos = pos->next;
+ }
}
+
/**
* Free (each) request made by the peer.
*
struct ConnectedPeer *cp;
struct PendingMessage *pm;
unsigned int i;
+ struct MigrationReadyBlock *pos;
+ struct MigrationReadyBlock *next;
GNUNET_CONTAINER_multihashmap_get_multiple (peer_request_map,
&peer->hashPubKey,
GNUNET_CONTAINER_multihashmap_remove (connected_peers,
&peer->hashPubKey,
cp));
+ /* remove this peer from migration considerations; schedule
+ alternatives */
+ next = mig_head;
+ while (NULL != (pos = next))
+ {
+ next = pos->next;
+ for (i=0;i<MIGRATION_LIST_SIZE;i++)
+ {
+ if (pos->target_list[i] == cp->pid)
+ {
+ GNUNET_PEER_change_rc (pos->target_list[i], -1);
+ pos->target_list[i] = 0;
+ }
+ }
+ if (pos->used_targets >= GNUNET_CONTAINER_multihashmap_size (connected_peers))
+ {
+ delete_migration_block (pos);
+ consider_migration_gathering ();
+ continue;
+ }
+ GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
+ &consider_migration,
+ pos);
+ }
+ if (cp->trust_delta > 0)
+ {
+ /* FIXME: push trust back to peerinfo!
+ (need better peerinfo API!) */
+ }
GNUNET_PEER_change_rc (cp->pid, -1);
GNUNET_PEER_decrement_rcs (cp->last_p2p_replies, P2P_SUCCESS_LIST_SIZE);
if (NULL != cp->cth)
shutdown_task (void *cls,
const struct GNUNET_SCHEDULER_TaskContext *tc)
{
+ if (mig_qe != NULL)
+ {
+ GNUNET_DATASTORE_cancel (mig_qe);
+ mig_qe = NULL;
+ }
+ if (GNUNET_SCHEDULER_NO_TASK != mig_task)
+ {
+ GNUNET_SCHEDULER_cancel (sched, mig_task);
+ mig_task = GNUNET_SCHEDULER_NO_TASK;
+ }
while (client_list != NULL)
handle_client_disconnect (NULL,
client_list->client);
}
GNUNET_DATASTORE_disconnect (dsh,
GNUNET_NO);
+ while (mig_head != NULL)
+ delete_migration_block (mig_head);
+ GNUNET_assert (0 == mig_size);
dsh = NULL;
sched = NULL;
cfg = NULL;
/**
- * Transmit the given message by copying it to the target buffer
+ * Transmit messages by copying it to the target buffer
* "buf". "buf" will be NULL and "size" zero if the socket was closed
* for writing in the meantime. In that case, do nothing
* (the disconnect or shutdown handler will take care of the rest).
char *cbuf = buf;
struct GNUNET_PeerIdentity pid;
struct PendingMessage *pm;
+ struct MigrationReadyBlock *mb;
+ struct MigrationReadyBlock *next;
+ struct PutMessage migm;
size_t msize;
+ unsigned int i;
cp->cth = NULL;
if (NULL == buf)
&transmit_to_peer,
cp);
}
+ else
+ {
+ next = mig_head;
+ while (NULL != (mb = next))
+ {
+ next = mb->next;
+ for (i=0;i<MIGRATION_LIST_SIZE;i++)
+ {
+ if ( (cp->pid == mb->target_list[i]) &&
+ (mb->size + sizeof (migm) <= size) )
+ {
+ GNUNET_PEER_change_rc (mb->target_list[i], -1);
+ mb->target_list[i] = 0;
+ mb->used_targets++;
+ migm.header.size = htons (sizeof (migm) + mb->size);
+ migm.header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
+ migm.type = htonl (mb->type);
+ migm.expiration = GNUNET_TIME_absolute_hton (mb->expiration);
+ memcpy (&cbuf[msize], &migm, sizeof (migm));
+ msize += sizeof (migm);
+ size -= sizeof (migm);
+ memcpy (&cbuf[msize], &mb[1], mb->size);
+ msize += mb->size;
+ size -= mb->size;
+#if DEBUG_FS
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Pushing migration block `%s' (%u bytes) to `%s'\n",
+ GNUNET_h2s (&mb->query),
+ mb->size,
+ GNUNET_i2s (&pid));
+#endif
+ break;
+ }
+ else
+ {
+#if DEBUG_FS
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Migration block `%s' (%u bytes) is not on migration list for peer `%s'\n",
+ GNUNET_h2s (&mb->query),
+ mb->size,
+ GNUNET_i2s (&pid));
+#endif
+ }
+ }
+ if ( (mb->used_targets >= MIGRATION_TARGET_COUNT) ||
+ (mb->used_targets >= GNUNET_CONTAINER_multihashmap_size (connected_peers)) )
+ {
+ delete_migration_block (mb);
+ consider_migration_gathering ();
+ }
+ }
+ consider_migration (NULL,
+ &pid.hashPubKey,
+ cp);
+ }
#if DEBUG_FS
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Transmitting %u bytes to peer %u.\n",
+ "Transmitting %u bytes to peer %u\n",
msize,
cp->pid);
#endif
cp->pending_requests++;
if (cp->pending_requests > MAX_QUEUE_PER_PEER)
destroy_pending_message (cp->pending_messages_tail, 0);
- if (cp->cth == NULL)
- {
- /* need to schedule transmission */
- GNUNET_PEER_resolve (cp->pid, &pid);
- cp->cth = GNUNET_CORE_notify_transmit_ready (core,
- cp->pending_messages_head->priority,
- MAX_TRANSMIT_DELAY,
- &pid,
- cp->pending_messages_head->msize,
- &transmit_to_peer,
- cp);
- }
+ GNUNET_PEER_resolve (cp->pid, &pid);
+ if (NULL != cp->cth)
+ GNUNET_CORE_notify_transmit_ready_cancel (cp->cth);
+ /* need to schedule transmission */
+ cp->cth = GNUNET_CORE_notify_transmit_ready (core,
+ cp->pending_messages_head->priority,
+ MAX_TRANSMIT_DELAY,
+ &pid,
+ cp->pending_messages_head->msize,
+ &transmit_to_peer,
+ cp);
if (cp->cth == NULL)
{
#if DEBUG_FS
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Failed to schedule transmission with core!\n");
#endif
- /* FIXME: call stats (rare, bad case) */
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# CORE transmission failures"),
+ 1,
+ GNUNET_NO);
}
}
return;
}
no_route = GNUNET_NO;
- /* FIXME: check against DBLOCK_SIZE and possibly return
- amount to reserve; however, this also needs to work
- with testcases which currently start out with a far
- too low per-peer bw limit, so they would never send
- anything. Big issue. */
if (amount == 0)
{
if (pr->cp == NULL)
/* 1) check that this peer is not the initiator */
if (cp == pr->cp)
- return GNUNET_YES; /* skip */
+ {
+#if DEBUG_FS
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Skipping initiator in forwarding selection\n");
+#endif
+ return GNUNET_YES; /* skip */
+ }
/* 2) check if we have already (recently) forwarded to this peer */
pc = 0;
"Re-trying query that was previously transmitted %u times to this peer\n",
(unsigned int) pc);
#endif
- // 3) calculate how much we'd like to forward to this peer
- score = 42; // FIXME!
- // FIXME: also need API to gather data on responsiveness
- // of this peer (we have fields for that in 'cp', but
- // they are never set!)
-
+ /* 3) calculate how much we'd like to forward to this peer,
+ starting with a random value that is strong enough
+ to at least give any peer a chance sometimes
+ (compared to the other factors that come later) */
+ /* 3a) count successful (recent) routes from cp for same source */
+ if (pr->cp != NULL)
+ {
+ score = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
+ P2P_SUCCESS_LIST_SIZE);
+ for (i=0;i<P2P_SUCCESS_LIST_SIZE;i++)
+ if (cp->last_p2p_replies[i] == pr->cp->pid)
+ score += 1; /* likely successful based on hot path */
+ }
+ else
+ {
+ score = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
+ CS2P_SUCCESS_LIST_SIZE);
+ for (i=0;i<CS2P_SUCCESS_LIST_SIZE;i++)
+ if (cp->last_client_replies[i] == pr->client_request_list->client_list->client)
+ score += 1; /* likely successful based on hot path */
+ }
+ /* 3b) include latency */
+ if (cp->avg_delay.value < 4 * TTL_DECREMENT)
+ score += 1; /* likely fast based on latency */
+ /* 3c) include priorities */
+ if (cp->avg_priority <= pr->remaining_priority / 2.0)
+ score += 1; /* likely successful based on priorities */
+ /* 3d) penalize for queue size */
+ score -= (2.0 * cp->pending_requests / (double) MAX_QUEUE_PER_PEER);
+ /* 3e) include peer proximity */
+ score -= (2.0 * (GNUNET_CRYPTO_hash_distance_u32 (key,
+ &pr->query)) / (double) UINT32_MAX);
/* store best-fit in closure */
+#if DEBUG_FS
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Peer `%s' gets score %f for forwarding query, max is %f\n",
+ GNUNET_h2s (key),
+ score,
+ psc->target_score);
+#endif
+ score++; /* avoid zero */
if (score > psc->target_score)
{
psc->target_score = score;
/**
- * We're processing a GET request from another peer and have decided
+ * We're processing a GET request and have decided
* to forward it to other peers. This function is called periodically
* and should forward the request to other peers until we have all
* possible replies. If we have transmitted the *only* reply to
return; /* configured to not do P2P search */
/* (1) select target */
psc.pr = pr;
- psc.target_score = DBL_MIN;
+ psc.target_score = -DBL_MAX;
GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
&target_peer_select_cb,
&psc);
- if (psc.target_score == DBL_MIN)
+ if (psc.target_score == -DBL_MAX)
{
delay = get_processing_delay ();
#if DEBUG_FS
return; /* nobody selected */
}
/* (3) update TTL/priority */
-
if (pr->client_request_list != NULL)
{
/* FIXME: use better algorithm!? */
pr->ttl);
#endif
}
- else
- {
- /* FIXME: should we do something here as well!? */
- }
/* (3) reserve reply bandwidth */
cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
pr->irc = GNUNET_CORE_peer_change_preference (sched, cfg,
&psc.target,
GNUNET_CONSTANTS_SERVICE_TIMEOUT,
- GNUNET_BANDWIDTH_value_init ((uint32_t) -1 /* no limit */),
+ GNUNET_BANDWIDTH_value_init (UINT32_MAX),
DBLOCK_SIZE * 2,
- (uint64_t) cp->inc_preference,
+ cp->inc_preference,
&target_reservation_cb,
pr);
- cp->inc_preference = 0.0;
+ cp->inc_preference = 0;
}
*/
const void *data;
- // FIXME: add 'struct ConnectedPeer' to track 'last_xxx_replies' here!
+ /**
+ * Who gave us this reply? NULL for local host.
+ */
+ struct ConnectedPeer *sender;
/**
* When the reply expires.
* How much was this reply worth to us?
*/
uint32_t priority;
+
+ /**
+ * Did we finish processing the associated request?
+ */
+ int finished;
};
struct ClientList *cl;
struct PutMessage *pm;
struct ConnectedPeer *cp;
+ struct GNUNET_TIME_Relative cur_delay;
GNUNET_HashCode chash;
GNUNET_HashCode mhash;
size_t msize;
gettext_noop ("# replies received and matched"),
1,
GNUNET_NO);
+ if (prq->sender != NULL)
+ {
+ /* FIXME: should we be more precise here and not use
+ "start_time" but a peer-specific time stamp? */
+ cur_delay = GNUNET_TIME_absolute_get_duration (pr->start_time);
+ prq->sender->avg_delay.value
+ = (prq->sender->avg_delay.value *
+ (RUNAVG_DELAY_N - 1) + cur_delay.value) / RUNAVG_DELAY_N;
+ prq->sender->avg_priority
+ = (prq->sender->avg_priority *
+ (RUNAVG_DELAY_N - 1) + pr->priority) / (double) RUNAVG_DELAY_N;
+ if (pr->cp != NULL)
+ {
+ GNUNET_PEER_change_rc (prq->sender->last_p2p_replies
+ [prq->sender->last_p2p_replies_woff % P2P_SUCCESS_LIST_SIZE],
+ -1);
+ GNUNET_PEER_change_rc (pr->cp->pid, 1);
+ prq->sender->last_p2p_replies
+ [(prq->sender->last_p2p_replies_woff++) % P2P_SUCCESS_LIST_SIZE]
+ = pr->cp->pid;
+ }
+ else
+ {
+ if (NULL != prq->sender->last_client_replies
+ [(prq->sender->last_client_replies_woff) % CS2P_SUCCESS_LIST_SIZE])
+ GNUNET_SERVER_client_drop (prq->sender->last_client_replies
+ [(prq->sender->last_client_replies_woff) % CS2P_SUCCESS_LIST_SIZE]);
+ prq->sender->last_client_replies
+ [(prq->sender->last_client_replies_woff++) % CS2P_SUCCESS_LIST_SIZE]
+ = pr->client_request_list->client_list->client;
+ GNUNET_SERVER_client_keep (pr->client_request_list->client_list->client);
+ }
+ }
GNUNET_CRYPTO_hash (prq->data,
prq->size,
&chash);
}
prq->priority += pr->remaining_priority;
pr->remaining_priority = 0;
- if (pr->client_request_list != NULL)
+ if (NULL != pr->client_request_list)
{
GNUNET_STATISTICS_update (stats,
gettext_noop ("# replies received for local clients"),
}
GNUNET_break (cl->th != NULL);
if (pr->do_remove)
- destroy_pending_request (pr);
+ {
+ prq->finished = GNUNET_YES;
+ destroy_pending_request (pr);
+ }
}
else
{
reply->cont = &transmit_reply_continuation;
reply->cont_cls = pr;
reply->msize = msize;
- reply->priority = (uint32_t) -1; /* send replies first! */
+ reply->priority = UINT32_MAX; /* send replies first! */
pm = (struct PutMessage*) &reply[1];
pm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
pm->header.size = htons (msize);
memcpy (&pm[1], prq->data, prq->size);
add_to_pending_messages_for_peer (cp, reply, pr);
}
- // FIXME: implement hot-path routing statistics keeping!
return GNUNET_YES;
}
-
/**
* Continuation called to notify client about result of the
* operation.
GNUNET_NO);
/* now, lookup 'query' */
prq.data = (const void*) &put[1];
+ if (other != NULL)
+ prq.sender = GNUNET_CONTAINER_multihashmap_get (connected_peers,
+ &other->hashPubKey);
+ else
+ prq.sender = NULL;
prq.size = dsize;
prq.type = type;
prq.expiration = expiration;
prq.priority = 0;
+ prq.finished = GNUNET_NO;
GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map,
&query,
&process_reply,
&prq);
+ if (prq.sender != NULL)
+ {
+ prq.sender->inc_preference += CONTENT_BANDWIDTH_VALUE + 1000 * prq.priority;
+ prq.sender->trust_delta += prq.priority;
+ }
if (GNUNET_YES == active_migration)
{
+#if DEBUG_FS
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Replicating result for query `%s' with priority %u\n",
+ GNUNET_h2s (&query),
+ prq.priority);
+#endif
GNUNET_DATASTORE_put (dsh,
0, &query, dsize, &put[1],
type, prq.priority, 1 /* anonymity */,
expiration,
- 0, 64 /* FIXME: use define */,
+ 1 + prq.priority, MAX_DATASTORE_QUEUE,
GNUNET_CONSTANTS_SERVICE_TIMEOUT,
&put_migration_continuation,
NULL);
&query))
{
GNUNET_break (0);
- /* FIXME: consider removing the block? */
+ GNUNET_DATASTORE_remove (dsh,
+ key,
+ size, data,
+ -1, -1,
+ GNUNET_TIME_UNIT_FOREVER_REL,
+ NULL, NULL);
GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
return;
}
prq.type = type;
prq.priority = priority;
+ prq.finished = GNUNET_NO;
process_reply (&prq, key, pr);
-
+ if (prq.finished == GNUNET_YES)
+ return;
+ if (pr->qe == NULL)
+ return; /* done here */
if ( (type == GNUNET_BLOCK_TYPE_DBLOCK) ||
(type == GNUNET_BLOCK_TYPE_IBLOCK) )
{
- if (pr->qe != NULL)
- GNUNET_DATASTORE_get_next (dsh, GNUNET_NO);
+ GNUNET_DATASTORE_get_next (dsh, GNUNET_NO);
return;
}
if ( (pr->client_request_list == NULL) &&
gettext_noop ("# processing result set cut short due to load"),
1,
GNUNET_NO);
- if (pr->qe != NULL)
- GNUNET_DATASTORE_get_next (dsh, GNUNET_NO);
+ GNUNET_DATASTORE_get_next (dsh, GNUNET_NO);
return;
}
- if (pr->qe != NULL)
- GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
+ GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
}
bound_priority (uint32_t prio_in,
struct ConnectedPeer *cp)
{
- return 0; // FIXME!
+ if (cp->trust_delta > prio_in)
+ {
+ cp->trust_delta -= prio_in;
+ return prio_in;
+ }
+ // FIXME: get out trust in the target peer from peerinfo!
+ return 0;
}
size_t bfsize;
uint32_t ttl_decrement;
enum GNUNET_BLOCK_Type type;
- double preference;
int have_ns;
msize = ntohs(message->size);
pr = GNUNET_malloc (sizeof (struct PendingRequest) +
(have_ns ? sizeof(GNUNET_HashCode) : 0));
if (have_ns)
- pr->namespace = (GNUNET_HashCode*) &pr[1];
+ {
+ pr->namespace = (GNUNET_HashCode*) &pr[1];
+ memcpy (&pr[1], &opt[bits++], sizeof (GNUNET_HashCode));
+ }
pr->type = type;
pr->mingle = ntohl (gm->filter_mutator);
- if (0 != (bm & GET_MESSAGE_BIT_SKS_NAMESPACE))
- memcpy (&pr[1], &opt[bits++], sizeof (GNUNET_HashCode));
if (0 != (bm & GET_MESSAGE_BIT_TRANSMIT_TO))
pr->target_pid = GNUNET_PEER_intern ((const struct GNUNET_PeerIdentity*) &opt[bits++]);
gettext_noop ("# requests dropped due TTL underflow"),
1,
GNUNET_NO);
- /* integer underflow => drop (should be very rare)! */
+ /* integer underflow => drop (should be very rare)! */
GNUNET_free (pr);
return GNUNET_OK;
}
GNUNET_NO);
/* calculate change in traffic preference */
- preference = (double) pr->priority;
- if (preference < QUERY_BANDWIDTH_VALUE)
- preference = QUERY_BANDWIDTH_VALUE;
- cps->inc_preference += preference;
-
+ cps->inc_preference += pr->priority * 1000 + QUERY_BANDWIDTH_VALUE;
/* process locally */
if (type == GNUNET_BLOCK_TYPE_DBLOCK)
type = GNUNET_BLOCK_TYPE_ANY; /* to get on-demand as well */
pr->qe = GNUNET_DATASTORE_get (dsh,
&gm->query,
type,
- (unsigned int) preference, 64 /* FIXME */,
-
+ pr->priority + 1,
+ MAX_DATASTORE_QUEUE,
timeout,
&process_local_reply,
pr);
/* **************************** Startup ************************ */
-
-/**
- * List of handlers for P2P messages
- * that we care about.
- */
-static struct GNUNET_CORE_MessageHandler p2p_handlers[] =
- {
- { &handle_p2p_get,
- GNUNET_MESSAGE_TYPE_FS_GET, 0 },
- { &handle_p2p_put,
- GNUNET_MESSAGE_TYPE_FS_PUT, 0 },
- { NULL, 0, 0 }
- };
-
-
-/**
- * List of handlers for the messages understood by this
- * service.
- */
-static struct GNUNET_SERVER_MessageHandler handlers[] = {
- {&GNUNET_FS_handle_index_start, NULL,
- GNUNET_MESSAGE_TYPE_FS_INDEX_START, 0},
- {&GNUNET_FS_handle_index_list_get, NULL,
- GNUNET_MESSAGE_TYPE_FS_INDEX_LIST_GET, sizeof(struct GNUNET_MessageHeader) },
- {&GNUNET_FS_handle_unindex, NULL, GNUNET_MESSAGE_TYPE_FS_UNINDEX,
- sizeof (struct UnindexMessage) },
- {&handle_start_search, NULL, GNUNET_MESSAGE_TYPE_FS_START_SEARCH,
- 0 },
- {NULL, NULL, 0, 0}
-};
-
-
/**
* Process fs requests.
*
struct GNUNET_SERVER_Handle *server,
const struct GNUNET_CONFIGURATION_Handle *c)
{
+ static const struct GNUNET_CORE_MessageHandler p2p_handlers[] =
+ {
+ { &handle_p2p_get,
+ GNUNET_MESSAGE_TYPE_FS_GET, 0 },
+ { &handle_p2p_put,
+ GNUNET_MESSAGE_TYPE_FS_PUT, 0 },
+ { NULL, 0, 0 }
+ };
+ static const struct GNUNET_SERVER_MessageHandler handlers[] = {
+ {&GNUNET_FS_handle_index_start, NULL,
+ GNUNET_MESSAGE_TYPE_FS_INDEX_START, 0},
+ {&GNUNET_FS_handle_index_list_get, NULL,
+ GNUNET_MESSAGE_TYPE_FS_INDEX_LIST_GET, sizeof(struct GNUNET_MessageHeader) },
+ {&GNUNET_FS_handle_unindex, NULL, GNUNET_MESSAGE_TYPE_FS_UNINDEX,
+ sizeof (struct UnindexMessage) },
+ {&handle_start_search, NULL, GNUNET_MESSAGE_TYPE_FS_START_SEARCH,
+ 0 },
+ {NULL, NULL, 0, 0}
+ };
+
sched = s;
cfg = c;
stats = GNUNET_STATISTICS_create (sched, "fs", cfg);
+ min_migration_delay = GNUNET_TIME_UNIT_SECONDS; // FIXME: get from config
connected_peers = GNUNET_CONTAINER_multihashmap_create (128); // FIXME: get size from config
query_request_map = GNUNET_CONTAINER_multihashmap_create (128); // FIXME: get size from config
peer_request_map = GNUNET_CONTAINER_multihashmap_create (128); // FIXME: get size from config
}
return GNUNET_SYSERR;
}
+ /* FIXME: distinguish between sending and storing in options? */
+ if (active_migration)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ _("Content migration is enabled, will start to gather data\n"));
+ consider_migration_gathering ();
+ }
GNUNET_SERVER_disconnect_notify (server,
&handle_client_disconnect,
NULL);