* @author Christian Grothoff
*
* TODO:
- * - tracking of PendingRequests
- * - setup P2P search on CS request
- * - setup P2P search on P2P GET
- * - forward replies based on tracked requests
* - validation of KBLOCKS (almost done)
* - validation of SBLOCKS
* - validation of KSBLOCKS
- * - content migration (put in local DS)
- * - possible major issue: we may
- * queue "gazillions" of (K|S)Blocks for the
- * core to transmit to another peer; need
- * to make sure this is bounded overall...
+ * - actually DO P2P search upon P2P/CS requests (pass appropriate handler to core
+ * and work through request list there; also update ttl/priority for our client's requests)
+ * - randomly delay processing for improved anonymity (can wait)
+ * - content migration (put in local DS) (can wait)
+ * - check that we decrement PIDs always where necessary (can wait)
+ * - handle some special cases when forwarding replies based on tracked requests (can wait)
+ * - tracking of success correlations for hot-path routing (can wait)
+ * - possible major issue: we may queue "gazillions" of (K|S)Blocks for the
+ * core to transmit to another peer; need to make sure this is bounded overall...
* - various load-based actions (can wait)
* - remove on-demand blocks if they keep failing (can wait)
*/
* level is >0 we may happen to know the responder's identity;
* nevertheless, we should probably not use it for a DHT-lookup
* or similar blunt actions in order to avoid exposing ourselves).
- * <p>
+ */
+ struct GNUNET_PeerIdentity target;
+
+ /**
* If the request is for an SBLOCK, this is the identity of the
* pseudonym to which the SBLOCK belongs.
- * <p>
- * If the request is for a KBLOCK, "target" must be all zeros.
*/
- GNUNET_HashCode target;
+ GNUNET_HashCode namespace;
/**
* Hash of the keyword (aka query) for KBLOCKs; Hash of
};
+/**
+ * Information we keep for each pending reply.
+ */
+struct PendingReply
+{
+ /**
+ * This is a linked list.
+ */
+ struct PendingReply *next;
+
+ /**
+ * Size of the reply; actual reply message follows
+ * at the end of this struct.
+ */
+ size_t msize;
+
+};
+
+
/**
* All requests from a client are
* kept in a doubly-linked list.
*/
struct GNUNET_CONTAINER_BloomFilter *bf;
+ /**
+ * Replies that we have received but were unable to forward yet
+ * (typically non-null only if we have a pending transmission
+ * request with the client or the respective peer).
+ */
+ struct PendingReply *replies_pending;
+
+ /**
+ * Pending transmission request with the core service for the target
+ * peer (for processing of 'replies_pending').
+ */
+ struct GNUNET_CORE_TransmitHandle *cth;
+
+ /**
+ * Pending transmission request for the target client (for processing of
+ * 'replies_pending').
+ */
+ struct GNUNET_CONNECTION_TransmitHandle *th;
+
/**
* Hash code of all replies that we have seen so far (only valid
* if client is not NULL since we only track replies like this for
*/
GNUNET_PEER_Id *used_pids;
+ /**
+ * Desired anonymity level; only valid for requests from a local client.
+ */
+ uint32_t anonymity_level;
+
/**
* How many entries in "used_pids" are actually valid?
*/
*/
uint32_t remaining_priority;
+ /**
+ * Number to mingle hashes for bloom-filter
+ * tests with.
+ */
+ int32_t mingle;
+
/**
* TTL with which we saw this request (or, if we initiated, TTL that
* we used for the request).
};
-/**
- * Map from queries to pending requests ("struct PendingRequest") for
- * this query.
- */
-static struct GNUNET_CONTAINER_MultiHashMap *request_map;
-
/**
* Our connection to the datastore.
*/
static struct IndexInfo *indexed_files;
/**
- * Maps hash over content of indexed files
- * to the respective filename. The filenames
- * are pointers into the indexed_files linked
- * list and do not need to be freed.
+ * Maps hash over content of indexed files to the respective filename.
+ * The filenames are pointers into the indexed_files linked list and
+ * do not need to be freed.
*/
static struct GNUNET_CONTAINER_MultiHashMap *ifm;
static struct ClientList *clients;
/**
- * Heap with the request that will expire next at the top.
+ * Heap with the request that will expire next at the top. Contains
+ * pointers of type "struct PendingRequest*"; these will *also* be
+ * aliased from the "requests_by_peer" data structures and the
+ * "requests_by_query" table. Note that requests from our clients
+ * don't expire and are thus NOT in the "requests_by_expiration"
+ * (or the "requests_by_peer" tables).
*/
static struct GNUNET_CONTAINER_Heap *requests_by_expiration;
+/**
+ * FIXME: set from configuration.
+ */
+static uint64_t max_pending_requests = 32;
/**
* Write the current index information list to disk.
}
+/**
+ * How many bytes should a bloomfilter be if we have already seen
+ * entry_count responses? Note that BLOOMFILTER_K gives us the number
+ * of bits set per entry. Furthermore, we should not re-size the
+ * filter too often (to keep it cheap).
+ *
+ * Since other peers will also add entries but not resize the filter,
+ * we should generally pick a slightly larger size than what the
+ * strict math would suggest.
+ *
+ * @return must be a power of two and smaller or equal to 2^15.
+ */
+static unsigned int
+compute_bloomfilter_size (unsigned int entry_count)
+{
+ unsigned int size;
+ unsigned int ideal = (entry_count * BLOOMFILTER_K) / 4;
+ uint16_t max = 1 << 15;
+
+ if (entry_count > max)
+ return max;
+ size = 8;
+ while ((size < max) && (size < ideal))
+ size *= 2;
+ if (size > max)
+ return max;
+ return size;
+}
+
+
+/**
+ * Recalculate our bloom filter for filtering replies.
+ *
+ * @param count number of entries we are filtering right now
+ * @param mingle set to our new mingling value
+ * @param entries the entries to filter
+ * @return updated bloomfilter, NULL for none
+ */
+static struct GNUNET_CONTAINER_BloomFilter *
+refresh_bloomfilter (unsigned int count,
+ int32_t * mingle,
+ const GNUNET_HashCode *entries)
+{
+ struct GNUNET_CONTAINER_BloomFilter *bf;
+ unsigned int nsize;
+ unsigned int i;
+ GNUNET_HashCode mhash;
+
+ if (0 == count)
+ return NULL;
+ nsize = compute_bloomfilter_size (count);
+ *mingle = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, -1);
+ bf = GNUNET_CONTAINER_bloomfilter_init (NULL,
+ nsize,
+ BLOOMFILTER_K);
+ for (i=0;i<count;i++)
+ {
+ mingle_hash (&entries[i], *mingle, &mhash);
+ GNUNET_CONTAINER_bloomfilter_add (bf, &mhash);
+ }
+ return bf;
+}
+
+
/**
* We're processing (local) results for a search request
* from a (local) client. Pass applicable results to the
uint64_t uid)
{
struct LocalGetContext *lgc = cls;
+ struct PendingRequest *pr;
+ struct ClientRequestList *crl;
+ struct ClientList *cl;
size_t msize;
unsigned int i;
(lgc->type == GNUNET_DATASTORE_BLOCKTYPE_SBLOCK) ||
(lgc->type == GNUNET_DATASTORE_BLOCKTYPE_SKBLOCK) )
{
- // FIXME: initiate P2P search
- return;
+ cl = clients;
+ while ( (NULL != cl) &&
+ (cl->client != lgc->client) )
+ cl = cl->next;
+ if (cl == NULL)
+ {
+ cl = GNUNET_malloc (sizeof (struct ClientList));
+ cl->client = lgc->client;
+ cl->next = clients;
+ clients = cl;
+ }
+ crl = GNUNET_malloc (sizeof (struct ClientRequestList));
+ GNUNET_CONTAINER_DLL_insert (cl->head, cl->tail, crl);
+ pr = GNUNET_malloc (sizeof (struct PendingRequest));
+ pr->client = lgc->client;
+ GNUNET_SERVER_client_keep (pr->client);
+ pr->crl_entry = crl;
+ crl->req = pr;
+ if (lgc->type == GNUNET_DATASTORE_BLOCKTYPE_SBLOCK)
+ {
+ pr->namespace = GNUNET_malloc (sizeof (GNUNET_HashCode));
+ *pr->namespace = lgc->namespace;
+ }
+ pr->replies_seen = lgc->results;
+ lgc->results = NULL;
+ pr->start_time = GNUNET_TIME_absolute_get ();
+ pr->query = lgc->query;
+ pr->target_pid = GNUNET_PEER_intern (&lgc->target);
+ pr->replies_seen_off = lgc->results_used;
+ pr->replies_seen_size = lgc->results_size;
+ lgc->results_size = 0;
+ pr->type = lgc->type;
+ pr->anonymity_level = lgc->anonymity_level;
+ pr->bf = refresh_bloomfilter (pr->replies_seen_off,
+ &pr->mingle,
+ pr->replies_seen);
+ GNUNET_CONTAINER_multihashmap_put (requests_by_query,
+ &pr->query,
+ pr,
+ GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
+
+ // FIXME: trigger some processing NOW!
+ local_get_context_free (lgc);
+ return;
}
/* got all possible results, clean up! */
local_get_context_free (lgc);
lgc->client = client;
lgc->type = ntohl (sm->type);
lgc->anonymity_level = ntohl (sm->anonymity_level);
- lgc->target = sm->target;
+ switch (lgc->type)
+ {
+ case GNUNET_DATASTORE_BLOCKTYPE_DBLOCK:
+ case GNUNET_DATASTORE_BLOCKTYPE_IBLOCK:
+ lgc->target.hashPubKey = sm->target;
+ break;
+ case GNUNET_DATASTORE_BLOCKTYPE_SBLOCK:
+ lgc->namespace = sm->target;
+ break;
+ default:
+ break;
+ }
lgc->query = sm->query;
GNUNET_CONTAINER_DLL_insert (lgc_head, lgc_tail, lgc);
lgc->req = queue_ds_request (GNUNET_TIME_UNIT_FOREVER_REL,
static void
destroy_pending_request (struct PendingRequest *pr)
{
+ struct PendingReply *reply;
+
GNUNET_CONTAINER_multihashmap_remove (requests_by_query,
&pr->query,
pr);
// FIXME: not sure how this can work (efficiently)
// also, what does the return value mean?
- GNUNET_CONTAINER_heap_remove_node (requests_by_expiration,
- pr);
+ if (pr->client != NULL)
+ GNUNET_CONTAINER_heap_remove_node (requests_by_expiration,
+ pr);
if (NULL != pr->bf)
GNUNET_CONTAINER_bloomfilter_free (pr->bf);
+ if (NULL != pr->cth)
+ GNUNET_CORE_notify_transmit_ready_cancel (pr->cth);
+ if (NULL != pr->th)
+ GNUNET_CONNECTION_notify_transmit_ready_cancel (pr->th);
+ while (NULL != (reply = pr->replies_pending))
+ {
+ pr->replies_pending = reply->next;
+ GNUNET_free (reply);
+ }
GNUNET_PEER_change_rc (pr->source_pid, -1);
GNUNET_PEER_change_rc (pr->target_pid, -1);
GNUNET_PEER_decrement_rcs (pr->used_pids, pr->used_pids_off);
GNUNET_DATASTORE_disconnect (dsh,
GNUNET_NO);
dsh = NULL;
- // FIXME: iterate over 'request_map' to free entries!
- GNUNET_CONTAINER_multihashmap_destroy (request_map);
- request_map = NULL;
+ // FIXME: iterate over maps to free entries!
+ GNUNET_CONTAINER_multihashmap_destroy (requests_by_query);
+ requests_by_query = NULL;
+ GNUNET_CONTAINER_multihashmap_destroy (requests_by_peer);
+ requests_by_peer = NULL;
+ GNUNET_CONTAINER_heap_destroy (requests_by_expiration);
+ requests_by_expiration = NULL;
GNUNET_CONTAINER_multihashmap_destroy (ifm);
ifm = NULL;
while (NULL != (pos = indexed_files))
const struct GNUNET_SCHEDULER_TaskContext *tc)
{
struct ProcessGetContext *pgc = cls;
+ struct PendingRequest *pr;
+ struct PendingRequest *eer;
+ struct GNUNET_PeerIdentity target;
- // FIXME: install entry in
- // 'request_map' and do actual
- // forwarding...
- if (pgc->bf != NULL)
- GNUNET_CONTAINER_bloomfilter_free (pgc->bf);
+ pr = GNUNET_malloc (sizeof (struct PendingRequest));
+ if (GET_MESSAGE_BIT_SKS_NAMESPACE == (GET_MESSAGE_BIT_SKS_NAMESPACE & pgc->bm))
+ {
+ pr->namespace = GNUNET_malloc (sizeof(GNUNET_HashCode));
+ *pr->namespace = pgc->namespace;
+ }
+ pr->bf = pgc->bf;
+ pgc->bf = NULL;
+ pr->start_time = pgc->start_time;
+ pr->query = pgc->query;
+ pr->source_pid = GNUNET_PEER_intern (&pgc->reply_to);
+ if (GET_MESSAGE_BIT_TRANSMIT_TO == (GET_MESSAGE_BIT_TRANSMIT_TO & pgc->bm))
+ pr->target_pid = GNUNET_PEER_intern (&pgc->prime_target);
+ pr->anonymity_level = 1; /* default */
+ pr->priority = pgc->priority;
+ pr->remaining_priority = pr->priority;
+ pr->mingle = pgc->mingle;
+ pr->ttl = pgc->ttl;
+ pr->type = pgc->type;
+ GNUNET_CONTAINER_multihashmap_put (requests_by_query,
+ &pr->query,
+ pr,
+ GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
+ GNUNET_CONTAINER_multihashmap_put (requests_by_peer,
+ &pgc->reply_to.hashPubKey,
+ pr,
+ GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
+ GNUNET_CONTAINER_heap_insert (requests_by_expiration,
+ pr,
+ pr->start_time.value + pr->ttl);
+ if (GNUNET_CONTAINER_heap_get_size (requests_by_expiration) > max_pending_requests)
+ {
+ /* expire oldest request! */
+ eer = GNUNET_CONTAINER_heap_peek (requests_by_expiration);
+ GNUNET_PEER_resolve (eer->source_pid,
+ &target);
+ GNUNET_CONTAINER_multihashmap_remove (requests_by_peer,
+ &target.hashPubKey,
+ eer);
+ destroy_pending_request (eer);
+ }
+ // FIXME: trigger actual forwarding NOW!
GNUNET_free (pgc);
}
/**
- * We're processing a GET request from
- * another peer. Give it to our local
- * datastore.
+ * We're processing a GET request from another peer. Give it to our
+ * local datastore.
*
* @param cls our "struct ProcessGetContext"
* @param ok did we get a datastore slice or not?
}
+/**
+ * Function called to notify us that we can now transmit a reply to a
+ * client or peer. "buf" will be NULL and "size" zero if the socket was
+ * closed for writing in the meantime.
+ *
+ * @param cls closure, points to a "struct PendingRequest*" with
+ * one or more pending replies
+ * @param size number of bytes available in buf
+ * @param buf where the callee should write the message
+ * @return number of bytes written to buf
+ */
+static size_t
+transmit_result (void *cls,
+ size_t size,
+ void *buf)
+{
+ struct PendingRequest *pr = cls;
+ char *cbuf = buf;
+ struct PendingReply *reply;
+ size_t ret;
+
+ ret = 0;
+ while (NULL != (reply = pr->replies_pending))
+ {
+ if ( (reply->msize + ret < ret) ||
+ (reply->msize + ret > size) )
+ break;
+ pr->replies_pending = reply->next;
+ memcpy (&cbuf[ret], &reply[1], reply->msize);
+ ret += reply->msize;
+ GNUNET_free (pr);
+ }
+ return 0;
+}
+
+
/**
* Iterator over pending requests.
*
{
struct ProcessReplyClosure *prq = cls;
struct PendingRequest *pr = value;
-
- fprintf (stderr, "FIXME %p %p\n", prq, pr);
- // FIXME: forward reply to client
- // or other peers (depending on pr...)
+ struct PendingRequest *eer;
+ struct PendingReply *reply;
+ struct PutMessage *pm;
+ struct ContentMessage *cm;
+ GNUNET_HashCode chash;
+ GNUNET_HashCode mhash;
+ struct GNUNET_PeerIdentity target;
+ size_t msize;
+ uint32_t prio;
+ struct GNUNET_TIME_Relative max_delay;
+
+ GNUNET_CRYPTO_hash (prq->data,
+ prq->size,
+ &chash);
+ switch (prq->type)
+ {
+ case GNUNET_DATASTORE_BLOCKTYPE_DBLOCK:
+ case GNUNET_DATASTORE_BLOCKTYPE_IBLOCK:
+ if (0 != memcmp (&chash,
+ key,
+ sizeof (GNUNET_HashCode)))
+ {
+ GNUNET_break_op (0);
+ return GNUNET_YES;
+ }
+ break;
+ case GNUNET_DATASTORE_BLOCKTYPE_KBLOCK:
+ // FIXME: validate KBlock!
+ if (pr->bf != NULL)
+ {
+ mingle_hash (&chash, pr->mingle, &mhash);
+ if (GNUNET_YES == GNUNET_CONTAINER_bloomfilter_test (pr->bf,
+ &mhash))
+ return GNUNET_YES; /* duplicate */
+ GNUNET_CONTAINER_bloomfilter_add (pr->bf,
+ &mhash);
+ }
+ break;
+ case GNUNET_DATASTORE_BLOCKTYPE_SBLOCK:
+ // FIXME: validate SBlock!
+ if (pr->bf != NULL)
+ {
+ mingle_hash (&chash, pr->mingle, &mhash);
+ if (GNUNET_YES == GNUNET_CONTAINER_bloomfilter_test (pr->bf,
+ &mhash))
+ return GNUNET_YES; /* duplicate */
+ GNUNET_CONTAINER_bloomfilter_add (pr->bf,
+ &mhash);
+ }
+ break;
+ case GNUNET_DATASTORE_BLOCKTYPE_SKBLOCK:
+ // FIXME: validate SKBlock!
+ break;
+ }
+ prio = pr->priority;
+ prq->priority += pr->remaining_priority;
+ pr->remaining_priority = 0;
+ if (pr->client != NULL)
+ {
+ if (pr->replies_seen_size == pr->replies_seen_off)
+ GNUNET_array_grow (pr->replies_seen,
+ pr->replies_seen_size,
+ pr->replies_seen_size * 2 + 4);
+ pr->replies_seen[pr->replies_seen_off++] = chash;
+ // FIXME: possibly recalculate BF!
+ }
+ if (pr->client == NULL)
+ {
+ msize = sizeof (struct ContentMessage) + prq->size;
+ reply = GNUNET_malloc (msize + sizeof (struct PendingReply));
+ reply->msize = msize;
+ cm = (struct ContentMessage*) &reply[1];
+ cm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_CONTENT);
+ cm->header.size = htons (msize);
+ cm->type = htonl (prq->type);
+ cm->expiration = GNUNET_TIME_absolute_hton (prq->expiration);
+ reply->next = pr->replies_pending;
+ pr->replies_pending = reply;
+ memcpy (&reply[1], prq->data, prq->size);
+ if (pr->cth != NULL)
+ return GNUNET_YES;
+ max_delay = GNUNET_TIME_UNIT_FOREVER_REL;
+ if (GNUNET_CONTAINER_heap_get_size (requests_by_expiration) >= max_pending_requests)
+ {
+ /* estimate expiration time from time difference between
+ first request that will be discarded and this request */
+ eer = GNUNET_CONTAINER_heap_peek (requests_by_expiration);
+ max_delay = GNUNET_TIME_absolute_get_difference (pr->start_time,
+ eer->start_time);
+ }
+ GNUNET_PEER_resolve (pr->source_pid,
+ &target);
+ pr->cth = GNUNET_CORE_notify_transmit_ready (core,
+ prio,
+ max_delay,
+ &target,
+ msize,
+ &transmit_result,
+ pr);
+ if (NULL == pr->cth)
+ {
+ // FIXME: now what? discard?
+ }
+ }
+ else
+ {
+ msize = sizeof (struct PutMessage) + prq->size;
+ reply = GNUNET_malloc (msize + sizeof (struct PendingReply));
+ reply->msize = msize;
+ reply->next = pr->replies_pending;
+ pm = (struct PutMessage*) &reply[1];
+ pm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
+ pm->header.size = htons (msize);
+ pm->type = htonl (prq->type);
+ pm->expiration = GNUNET_TIME_relative_hton (GNUNET_TIME_absolute_get_remaining (prq->expiration));
+ pr->replies_pending = reply;
+ memcpy (&reply[1], prq->data, prq->size);
+ if (pr->th != NULL)
+ return GNUNET_YES;
+ pr->th = GNUNET_SERVER_notify_transmit_ready (pr->client,
+ msize,
+ GNUNET_TIME_UNIT_FOREVER_REL,
+ &transmit_result,
+ pr);
+ if (pr->th == NULL)
+ {
+ // FIXME: need to try again later (not much
+ // to do here specifically, but we need to
+ // check somewhere else to handle this case!)
+ }
+ }
+ // FIXME: implement hot-path routing statistics keeping!
return GNUNET_YES;
}
prq.type = type;
prq.expiration = expiration;
prq.priority = 0;
- GNUNET_CONTAINER_multihashmap_get_multiple (request_map,
+ GNUNET_CONTAINER_multihashmap_get_multiple (requests_by_query,
&query,
&process_reply,
&prq);
cfg = c;
ifm = GNUNET_CONTAINER_multihashmap_create (128);
- request_map = GNUNET_CONTAINER_multihashmap_create (128); // FIXME: get size from config
+ requests_by_query = GNUNET_CONTAINER_multihashmap_create (128); // FIXME: get size from config
+ requests_by_peer = GNUNET_CONTAINER_multihashmap_create (128); // FIXME: get size from config
+ requests_by_expiration = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
read_index_list ();
dsh = GNUNET_DATASTORE_connect (cfg,
sched);