* @author Christian Grothoff
*
* TODO:
- * - forward_request_task (P2P forwarding!)
* - track stats for hot-path routing
* - implement hot-path routing decision procedure
- * - detect duplicate requests (P2P and CS)
* - implement: bound_priority, test_load_too_high, validate_skblock
* - add content migration support (store locally)
* - statistics
- *
*/
#include "platform.h"
#include <float.h>
uint32_t remaining_priority;
/**
- * Number to mingle hashes for bloom-filter
- * tests with.
+ * Number to mingle hashes for bloom-filter tests with.
*/
int32_t mingle;
pr->hnode);
pr->hnode = NULL;
}
- /* might have already been removed from map
- in 'process_reply' if there was a unique
- reply; hence ignore the return value here */
+ /* might have already been removed from map in 'process_reply' (if
+ there was a unique reply) or never inserted if it was a
+ duplicate; hence ignore the return value here */
(void) GNUNET_CONTAINER_multihashmap_remove (query_request_map,
&pr->query,
pr);
}
-#if 0
/**
* How many bytes should a bloomfilter be if we have already seen
* entry_count responses? Note that BLOOMFILTER_K gives us the number
}
return bf;
}
-#endif
/**
size_t msize;
unsigned int k;
int no_route;
+ uint32_t bm;
pr->irc = NULL;
GNUNET_assert (peer != NULL);
/* build message and insert message into priority queue */
k = 0;
+ bm = 0;
+ if (GNUNET_YES == no_route)
+ {
+ bm |= GET_MESSAGE_BIT_RETURN_TO;
+ k++;
+ }
if (pr->namespace != NULL)
- k++;
+ {
+ bm |= GET_MESSAGE_BIT_SKS_NAMESPACE;
+ k++;
+ }
if (pr->target_pid != 0)
- k++;
- if (GNUNET_YES == no_route)
- k++;
+ {
+ bm |= GET_MESSAGE_BIT_TRANSMIT_TO;
+ k++;
+ }
msize = sizeof (struct GetMessage) + pr->bf_size + k * sizeof(GNUNET_HashCode);
GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE);
pm = GNUNET_malloc (sizeof (struct PendingMessage) + msize);
pr->remaining_priority /= 2;
gm->priority = htonl (pr->remaining_priority);
gm->ttl = htonl (pr->ttl);
- gm->filter_mutator = htonl(pr->mingle); // FIXME: bad endianess conversion?
- gm->hash_bitmap = htonl (42); // FIXME!
+ gm->filter_mutator = htonl(pr->mingle);
+ gm->hash_bitmap = htonl (bm);
gm->query = pr->query;
ext = (GNUNET_HashCode*) &gm[1];
k = 0;
+ if (GNUNET_YES == no_route)
+ GNUNET_PEER_resolve (pr->pht_entry->cp->pid, (struct GNUNET_PeerIdentity*) &ext[k++]);
if (pr->namespace != NULL)
memcpy (&ext[k++], pr->namespace, sizeof (GNUNET_HashCode));
if (pr->target_pid != 0)
GNUNET_PEER_resolve (pr->target_pid, (struct GNUNET_PeerIdentity*) &ext[k++]);
- if (GNUNET_YES == no_route)
- GNUNET_PEER_resolve (pr->pht_entry->cp->pid, (struct GNUNET_PeerIdentity*) &ext[k++]);
bfdata = (char *) &ext[k];
if (pr->bf != NULL)
GNUNET_CONTAINER_bloomfilter_get_raw_data (pr->bf,
GNUNET_array_grow (pr->replies_seen,
pr->replies_seen_size,
pr->replies_seen_size * 2 + 4);
- // FIXME: recalculate BF!
+ if (pr->bf != NULL)
+ GNUNET_CONTAINER_bloomfilter_free (pr->bf);
+ pr->bf = refresh_bloomfilter (pr->replies_seen_off,
+ &pr->mingle,
+ &pr->bf_size,
+ pr->replies_seen);
}
- pr->replies_seen[pr->replies_seen_off++] = chash;
+ pr->replies_seen[pr->replies_seen_off++] = chash;
+
}
break;
case GNUNET_DATASTORE_BLOCKTYPE_SKBLOCK:
}
+/**
+ * Closure for 'check_duplicate_request'.
+ */
+struct CheckDuplicateRequestClosure
+{
+ /**
+ * The new request we should check if it already exists.
+ */
+ const struct PendingRequest *pr;
+
+ /**
+ * Existing request found by the checker, NULL if none.
+ */
+ struct PendingRequest *have;
+};
+
+
+/**
+ * Iterator over entries in the 'query_request_map' that
+ * tries to see if we have the same request pending from
+ * the same peer already.
+ *
+ * @param cls closure (our 'struct CheckDuplicateRequestClosure')
+ * @param key current key code (query, ignored, must match)
+ * @param value value in the hash map (a 'struct PendingRequest'
+ * that already exists)
+ * @return GNUNET_YES if we should continue to
+ * iterate (no match yet)
+ * GNUNET_NO if not (match found).
+ */
+static int
+check_duplicate_request (void *cls,
+ const GNUNET_HashCode * key,
+ void *value)
+{
+ struct CheckDuplicateRequestClosure *cdc = cls;
+ struct PendingRequest *have = value;
+
+ if (cdc->pr->target_pid == have->target_pid)
+ {
+ cdc->have = have;
+ return GNUNET_NO;
+ }
+ return GNUNET_YES;
+}
+
+
/**
* Handle P2P "GET" request.
*
struct PeerRequestEntry *pre;
struct ConnectedPeer *cp;
struct ConnectedPeer *cps;
+ struct CheckDuplicateRequestClosure cdc;
struct GNUNET_TIME_Relative timeout;
uint16_t msize;
const struct GetMessage *gm;
}
opt = (const GNUNET_HashCode*) &gm[1];
bfsize = msize - sizeof (struct GetMessage) + bits * sizeof (GNUNET_HashCode);
-
bm = ntohl (gm->hash_bitmap);
if ( (0 != (bm & GET_MESSAGE_BIT_SKS_NAMESPACE)) &&
(ntohl (gm->type) == GNUNET_DATASTORE_BLOCKTYPE_SBLOCK) )
if ((bm & GET_MESSAGE_BIT_SKS_NAMESPACE))
pr->namespace = (GNUNET_HashCode*) &pr[1];
pr->type = ntohl (gm->type);
- pr->mingle = gm->filter_mutator;
+ pr->mingle = ntohl (gm->filter_mutator);
if (0 != (bm & GET_MESSAGE_BIT_SKS_NAMESPACE))
memcpy (&pr[1], &opt[bits++], sizeof (GNUNET_HashCode));
else if (pr->type == GNUNET_DATASTORE_BLOCKTYPE_SBLOCK)
pr->bf_size = bfsize;
}
- /* FIXME: check somewhere if request already exists, and if so,
- recycle old state... */
+ cdc.have = NULL;
+ cdc.pr = pr;
+ GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map,
+ &gm->query,
+ &check_duplicate_request,
+ &cdc);
+ if (cdc.have != NULL)
+ {
+ if (cdc.have->start_time.value + cdc.have->ttl >=
+ pr->start_time.value + pr->ttl)
+ {
+ /* existing request has higher TTL, drop new one! */
+ cdc.have->priority += pr->priority;
+ destroy_pending_request (pr);
+ return GNUNET_OK;
+ }
+ else
+ {
+ /* existing request has lower TTL, drop old one! */
+ pr->priority += cdc.have->priority;
+ /* Possible optimization: if we have applicable pending
+ replies in 'cdc.have', we might want to move those over
+ (this is a really rare special-case, so it is not clear
+ that this would be worth it) */
+ destroy_pending_request (cdc.have);
+ /* keep processing 'pr'! */
+ }
+ }
+
pre = GNUNET_malloc (sizeof (struct PeerRequestEntry));
pre->cp = cp;
pre->req = pr;
pr->hnode = GNUNET_CONTAINER_heap_insert (requests_by_expiration_heap,
pr,
- GNUNET_TIME_absolute_get().value + pr->ttl);
+ pr->start_time.value + pr->ttl);
/* calculate change in traffic preference */
GNUNET_h2s (&sm->query),
(unsigned int) type);
#endif
- /* FIXME: detect duplicate request; if duplicate, simply update (merge)
- 'pr->replies_seen'! */
+
+ /* detect duplicate KBLOCK requests */
+ if (type == GNUNET_DATASTORE_BLOCKTYPE_KBLOCK)
+ {
+ crl = cl->rl_head;
+ while ( (crl != NULL) &&
+ ( (0 != memcmp (&crl->req->query,
+ &sm->query,
+ sizeof (GNUNET_HashCode))) ||
+ (crl->req->type != type) ) )
+ crl = crl->next;
+ if (crl != NULL)
+ {
+ pr = crl->req;
+ /* Duplicate request (used to send long list of
+ known/blocked results); merge 'pr->replies_seen'
+ and update bloom filter */
+ GNUNET_array_grow (pr->replies_seen,
+ pr->replies_seen_size,
+ pr->replies_seen_off + sc);
+ memcpy (&pr->replies_seen[pr->replies_seen_off],
+ &sm[1],
+ sc * sizeof (GNUNET_HashCode));
+ pr->replies_seen_off += sc;
+ if (pr->bf != NULL)
+ GNUNET_CONTAINER_bloomfilter_free (pr->bf);
+ pr->bf = refresh_bloomfilter (pr->replies_seen_off,
+ &pr->mingle,
+ &pr->bf_size,
+ pr->replies_seen);
+ GNUNET_SERVER_receive_done (client,
+ GNUNET_OK);
+ return;
+ }
+ }
pr = GNUNET_malloc (sizeof (struct PendingRequest) +
((type == GNUNET_DATASTORE_BLOCKTYPE_SBLOCK)?sizeof(GNUNET_HashCode):0));
crl = GNUNET_malloc (sizeof (struct ClientRequestList));
&sm[1],
sc * sizeof (GNUNET_HashCode));
pr->replies_seen_off = sc;
- pr->anonymity_level = ntohl (sm->anonymity_level);
- pr->mingle = GNUNET_CRYPTO_random_u32(GNUNET_CRYPTO_QUALITY_WEAK,
- (uint32_t) -1);
- pr->query = sm->query;
+ pr->anonymity_level = ntohl (sm->anonymity_level);
+ pr->bf = refresh_bloomfilter (pr->replies_seen_off,
+ &pr->mingle,
+ &pr->bf_size,
+ pr->replies_seen);
+ pr->query = sm->query;
switch (type)
{
case GNUNET_DATASTORE_BLOCKTYPE_DBLOCK: