* - check that we decrement PIDs always where necessary (can wait)
*/
#include "platform.h"
+#include <values.h>
#include "gnunet_core_service.h"
#include "gnunet_datastore_service.h"
#include "gnunet_peer_lib.h"
/**
* Pending transmission request with the core service for the target
- * peer (for processing of 'replies_pending').
+ * peer (for processing of 'replies_pending') or Handle for a
+ * pending query-request for P2P-transmission with the core service.
+ * If non-NULL, this request must be cancelled should this struct be
+ * destroyed!
*/
struct GNUNET_CORE_TransmitHandle *cth;
*/
GNUNET_HashCode query;
+ /**
+ * The task responsible for transmitting queries
+ * for this request.
+ */
+ GNUNET_SCHEDULER_TaskIdentifier task;
+
/**
* (Interned) Peer identifier (only valid if "client" is NULL)
* that identifies a peer that gave us this request.
};
+/**
+ * Information about a peer that we are connected to.
+ * We track data that is useful for determining which
+ * peers should receive our requests.
+ */
+struct ConnectedPeer
+{
+
+ /**
+ * List of the last clients for which this peer
+ * successfully answered a query.
+ */
+ struct GNUNET_SERVER_Client *last_client_replies[CS2P_SUCCESS_LIST_SIZE];
+
+ /**
+ * List of the last PIDs for which
+ * this peer successfully answered a query;
+ * We use 0 to indicate no successful reply.
+ */
+ GNUNET_PEER_Id last_p2p_replies[P2P_SUCCESS_LIST_SIZE];
+
+ /**
+ * Average delay between sending the peer a request and
+ * getting a reply (only calculated over the requests for
+ * which we actually got a reply). Calculated
+ * as a moving average: new_delay = ((n-1)*last_delay+curr_delay) / n
+ */
+ struct GNUNET_TIME_Relative avg_delay;
+
+ /**
+ * Average priority of successful replies. Calculated
+ * as a moving average: new_avg = ((n-1)*last_avg+curr_prio) / n
+ */
+ double avg_priority;
+
+ /**
+ * The peer's identity.
+ */
+ GNUNET_PEER_Id pid;
+
+ /**
+ * Number of requests we have currently pending
+ * with this peer (that is, requests that were
+ * transmitted so recently that we would not retransmit
+ * them right now).
+ */
+ unsigned int pending_requests;
+
+ /**
+ * Which offset in "last_p2p_replies" will be updated next?
+ * (we go round-robin).
+ */
+ unsigned int last_p2p_replies_woff;
+
+ /**
+ * Which offset in "last_client_replies" will be updated next?
+ * (we go round-robin).
+ */
+ unsigned int last_client_replies_woff;
+
+};
+
+
/**
* Our connection to the datastore.
*/
*/
static struct GNUNET_CONTAINER_Heap *requests_by_expiration;
+/**
+ * Map of peer identifiers to "struct ConnectedPeer" (for that peer).
+ */
+static struct GNUNET_CONTAINER_MultiHashMap *connected_peers;
+
/**
* Maximum number of requests (from other peers) that we're
* willing to have pending at any given point in time.
}
+/**
+ * Closure used for "target_peer_select_cb".
+ */
+struct PeerSelectionContext
+{
+ /**
+ * The request for which we are selecting
+ * peers.
+ */
+ struct PendingRequest *pr;
+
+ /**
+ * Current "prime" target.
+ */
+ struct GNUNET_PeerIdentity target;
+
+ /**
+ * How much do we like this target?
+ */
+ double target_score;
+
+};
+
+
+/**
+ * Function called for each connected peer to determine
+ * which one(s) would make good targets for forwarding.
+ *
+ * @param cls closure (struct PeerSelectionContext)
+ * @param key current key code (peer identity)
+ * @param value value in the hash map (struct ConnectedPeer)
+ * @return GNUNET_YES if we should continue to
+ * iterate,
+ * GNUNET_NO if not.
+ */
+static int
+target_peer_select_cb (void *cls,
+ const GNUNET_HashCode * key,
+ void *value)
+{
+ struct PeerSelectionContext *psc = cls;
+ // struct ConnectedPeer *cp = value;
+ double score;
+ // FIXME (CRITICAL: would always sent to same peer without this!)
+ // 1) check if we have already (recently) forwarded to this peer, if so, skip
+ // 2) calculate how much we'd like to forward to this peer
+ score = 0;
+
+ // 3) store in closure
+ if (score > psc->target_score)
+ {
+ psc->target_score = score;
+ psc->target.hashPubKey = *key;
+ }
+ return GNUNET_YES;
+}
+
+
+
+
+/**
+ * We use a random delay to make the timing of requests
+ * less predictable. This function returns such a random
+ * delay.
+ *
+ * @return random delay to use for some request, between 0 and TTL_DECREMENT ms
+ */
+static struct GNUNET_TIME_Relative
+get_processing_delay ()
+{
+ return GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
+ GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
+ TTL_DECREMENT));
+}
+
+
+/**
+ * Task that is run for each request with the
+ * goal of forwarding the associated query to
+ * other peers. The task should re-schedule
+ * itself to be re-run once the TTL has expired.
+ * (or at a later time if more peers should
+ * be queried earlier).
+ *
+ * @param cls the requests "struct PendingRequest*"
+ * @param tc task context (unused)
+ */
+static void
+forward_request_task (void *cls,
+ const struct GNUNET_SCHEDULER_TaskContext *tc);
+
+
+/**
+ * We've selected a peer for forwarding of a query.
+ * Construct the message and then re-schedule the
+ * task to forward again to (other) peers.
+ *
+ * @param cls closure
+ * @param size number of bytes available in buf
+ * @param buf where the callee should write the message
+ * @return number of bytes written to buf
+ */
+static size_t
+transmit_request_cb (void *cls,
+ size_t size,
+ void *buf)
+{
+ struct PendingRequest *pr = cls;
+ uint16_t msize;
+
+ pr->cth = NULL;
+ /* (1) check for timeout */
+ if (NULL == buf)
+ {
+ /* timeout, try another peer immediately again */
+ pr->task = GNUNET_SCHEDULER_add_delayed (sched,
+ GNUNET_NO,
+ GNUNET_SCHEDULER_PRIORITY_IDLE,
+ GNUNET_SCHEDULER_NO_TASK,
+ GNUNET_TIME_UNIT_ZERO,
+ &forward_request_task,
+ pr);
+ return 0;
+ }
+ /* (2) build query message */
+ msize = 0;
+ // CRITICAL-FIXME! (nothing goes without this!)
+ /* (3) schedule job to do it again (or another peer, etc.) */
+ pr->task = GNUNET_SCHEDULER_add_delayed (sched,
+ GNUNET_NO,
+ GNUNET_SCHEDULER_PRIORITY_IDLE,
+ GNUNET_SCHEDULER_NO_TASK,
+ get_processing_delay (), // FIXME!
+ &forward_request_task,
+ pr);
+
+ return msize;
+}
+
+
+/**
+ * Function called after we've tried to reserve
+ * a certain amount of bandwidth for a reply.
+ * Check if we succeeded and if so send our query.
+ *
+ * @param cls the requests "struct PendingRequest*"
+ * @param peer identifies the peer
+ * @param latency current latency estimate, "FOREVER" if we have been
+ * disconnected
+ * @param bpm_in set to the current bandwidth limit (receiving) for this peer
+ * @param bpm_out set to the current bandwidth limit (sending) for this peer
+ * @param amount set to the amount that was actually reserved or unreserved
+ * @param preference current traffic preference for the given peer
+ */
+static void
+target_reservation_cb (void *cls,
+ const struct
+ GNUNET_PeerIdentity * peer,
+ unsigned int bpm_in,
+ unsigned int bpm_out,
+ struct GNUNET_TIME_Relative
+ latency, int amount,
+ unsigned long long preference)
+{
+ struct PendingRequest *pr = cls;
+ uint32_t priority;
+ uint16_t size;
+ struct GNUNET_TIME_Relative maxdelay;
+
+ GNUNET_assert (peer != NULL);
+ if ( (amount != DBLOCK_SIZE) ||
+ (pr->cth != NULL) )
+ {
+ /* try again later; FIXME: we may need to un-reserve "amount"? */
+ pr->task = GNUNET_SCHEDULER_add_delayed (sched,
+ GNUNET_NO,
+ GNUNET_SCHEDULER_PRIORITY_IDLE,
+ GNUNET_SCHEDULER_NO_TASK,
+ get_processing_delay (), // FIXME: longer?
+ &forward_request_task,
+ pr);
+ return;
+ }
+ // (2) transmit, update ttl/priority
+ // FIXME: calculate priority, maxdelay, size properly!
+ priority = 0;
+ size = 60000;
+ maxdelay = GNUNET_CONSTANTS_SERVICE_TIMEOUT;
+ pr->cth = GNUNET_CORE_notify_transmit_ready (core,
+ priority,
+ maxdelay,
+ peer,
+ size,
+ &transmit_request_cb,
+ pr);
+ if (pr->cth == NULL)
+ {
+ /* try again later */
+ pr->task = GNUNET_SCHEDULER_add_delayed (sched,
+ GNUNET_NO,
+ GNUNET_SCHEDULER_PRIORITY_IDLE,
+ GNUNET_SCHEDULER_NO_TASK,
+ get_processing_delay (), // FIXME: longer?
+ &forward_request_task,
+ pr);
+ }
+}
+
+
+/**
+ * Task that is run for each request with the
+ * goal of forwarding the associated query to
+ * other peers. The task should re-schedule
+ * itself to be re-run once the TTL has expired.
+ * (or at a later time if more peers should
+ * be queried earlier).
+ *
+ * @param cls the requests "struct PendingRequest*"
+ * @param tc task context (unused)
+ */
+static void
+forward_request_task (void *cls,
+ const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ struct PendingRequest *pr = cls;
+ struct PeerSelectionContext psc;
+
+ pr->task = GNUNET_SCHEDULER_NO_TASK;
+ if (pr->cth != NULL)
+ {
+ /* we're busy transmitting a result, wait a bit */
+ pr->task = GNUNET_SCHEDULER_add_delayed (sched,
+ GNUNET_NO,
+ GNUNET_SCHEDULER_PRIORITY_IDLE,
+ GNUNET_SCHEDULER_NO_TASK,
+ get_processing_delay (),
+ &forward_request_task,
+ pr);
+ return;
+ }
+ /* (1) select target */
+ psc.pr = pr;
+ psc.target_score = MINDOUBLE;
+ GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
+ &target_peer_select_cb,
+ &psc);
+ if (psc.target_score == MINDOUBLE)
+ {
+ /* no possible target found, wait some time */
+ pr->task = GNUNET_SCHEDULER_add_delayed (sched,
+ GNUNET_NO,
+ GNUNET_SCHEDULER_PRIORITY_IDLE,
+ GNUNET_SCHEDULER_NO_TASK,
+ get_processing_delay (), // FIXME: exponential back-off? or at least wait longer...
+ &forward_request_task,
+ pr);
+ return;
+ }
+ /* (2) reserve reply bandwidth */
+ // FIXME: need a way to cancel; this
+ // async operation is problematic (segv-problematic)
+ // if "pr" is destroyed while it happens!
+ GNUNET_CORE_peer_configure (core,
+ &psc.target,
+ GNUNET_CONSTANTS_SERVICE_TIMEOUT,
+ -1,
+ DBLOCK_SIZE, // FIXME: make dependent on type?
+ 0,
+ &target_reservation_cb,
+ pr);
+}
+
+
/**
* We're processing (local) results for a search request
* from a (local) client. Pass applicable results to the
&pr->query,
pr,
GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
-
- // FIXME: trigger some processing NOW!
+ pr->task = GNUNET_SCHEDULER_add_delayed (sched,
+ GNUNET_NO,
+ GNUNET_SCHEDULER_PRIORITY_IDLE,
+ GNUNET_SCHEDULER_NO_TASK,
+ get_processing_delay (),
+ &forward_request_task,
+ pr);
local_get_context_free (lgc);
return;
}
cl->tail,
pr->crl_entry);
}
- if (NULL != pr->bf)
- GNUNET_CONTAINER_bloomfilter_free (pr->bf);
+ if (GNUNET_SCHEDULER_NO_TASK != pr->task)
+ GNUNET_SCHEDULER_cancel (sched, pr->task);
if (NULL != pr->cth)
GNUNET_CORE_notify_transmit_ready_cancel (pr->cth);
+ if (NULL != pr->bf)
+ GNUNET_CONTAINER_bloomfilter_free (pr->bf);
if (NULL != pr->th)
GNUNET_CONNECTION_notify_transmit_ready_cancel (pr->th);
while (NULL != (reply = pr->replies_pending))
requests_by_peer = NULL;
GNUNET_CONTAINER_heap_destroy (requests_by_expiration);
requests_by_expiration = NULL;
+ // FIXME: iterate over entries and free individually?
+ // (or do we get disconnect notifications?)
+ GNUNET_CONTAINER_multihashmap_destroy (connected_peers);
+ connected_peers = NULL;
GNUNET_CONTAINER_multihashmap_destroy (ifm);
ifm = NULL;
while (NULL != (pos = indexed_files))
}
+
+/**
+ * Method called whenever a given peer connects.
+ *
+ * @param cls closure, not used
+ * @param peer peer identity this notification is about
+ */
+static void
+peer_connect_handler (void *cls,
+ const struct
+ GNUNET_PeerIdentity * peer)
+{
+ struct ConnectedPeer *cp;
+
+ cp = GNUNET_malloc (sizeof (struct ConnectedPeer));
+ cp->pid = GNUNET_PEER_intern (peer);
+ GNUNET_CONTAINER_multihashmap_put (connected_peers,
+ &peer->hashPubKey,
+ cp,
+ GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
+}
+
+
/**
* Method called whenever a peer disconnects.
*
const struct
GNUNET_PeerIdentity * peer)
{
+ struct ConnectedPeer *cp;
+
+ cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
+ &peer->hashPubKey);
+ GNUNET_PEER_change_rc (cp->pid, -1);
+ GNUNET_PEER_decrement_rcs (cp->last_p2p_replies, P2P_SUCCESS_LIST_SIZE);
+ GNUNET_free (cp);
GNUNET_CONTAINER_multihashmap_get_multiple (requests_by_peer,
&peer->hashPubKey,
&destroy_request,
eer);
destroy_pending_request (eer);
}
- // FIXME: trigger actual forwarding NOW!
+ pr->task = GNUNET_SCHEDULER_add_delayed (sched,
+ GNUNET_NO,
+ GNUNET_SCHEDULER_PRIORITY_IDLE,
+ GNUNET_SCHEDULER_NO_TASK,
+ get_processing_delay (),
+ &forward_request_task,
+ pr);
GNUNET_free (pgc);
}
-
-
/**
* Transmit the given message by copying it to
* the target buffer "buf". "buf" will be
GNUNET_TIME_UNIT_FOREVER_REL,
NULL,
&core_start_cb,
- NULL,
+ &peer_connect_handler,
&peer_disconnect_handler,
NULL,
NULL, GNUNET_NO,
ifm = GNUNET_CONTAINER_multihashmap_create (128);
requests_by_query = GNUNET_CONTAINER_multihashmap_create (128); // FIXME: get size from config
requests_by_peer = GNUNET_CONTAINER_multihashmap_create (128); // FIXME: get size from config
+ connected_peers = GNUNET_CONTAINER_multihashmap_create (64);
requests_by_expiration = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
read_index_list ();
dsh = GNUNET_DATASTORE_connect (cfg,