*/
struct GSF_ConnectedPeer;
-
/**
* An active request.
*/
struct GSF_PendingRequest;
-
/**
* A local client.
*/
struct GSF_LocalClient;
+/**
+ * Our connection to the datastore.
+ */
+extern struct GNUNET_DATASTORE_Handle *GSF_dsh;
+
+/**
+ * Our configuration.
+ */
+extern const struct GNUNET_CONFIGURATION_Handle *GSF_cfg;
+
+/**
+ * Handle for reporting statistics.
+ */
+extern struct GNUNET_STATISTICS_Handle *GSF_stats;
+
+/**
+ * Pointer to handle to the core service (points to NULL until we've
+ * connected to it).
+ */
+extern struct GNUNET_CORE_Handle *GSF_core;
+
+/**
+ * Handle for DHT operations.
+ */
+static struct GNUNET_DHT_Handle *GSF_dht;
+
+
+
#endif
/* end of gnunet-service-fs.h */
*/
uint64_t inc_preference;
- /**
- * Trust rating for this peer
- */
- uint32_t trust;
-
/**
* Trust rating for this peer on disk.
*/
}
+/**
+ * Return the performance data record for the given peer
+ *
+ * @param cp peer to query
+ * @return performance data record for the peer
+ */
+struct GSF_PeerPerformanceData *
+GSF_get_peer_performance_data_ (struct GSF_ConnectedPeer *cp)
+{
+ return &cp->ppd;
+}
+
+
/**
* Core is ready to transmit to a peer, get the message.
*
GNUNET_break (0);
return GNUNET_OK;
}
- cp->ppd.migration_blocked = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_relative_ntoh (msm->duration));
+ cp->ppd.migration_blocked_until = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_relative_ntoh (msm->duration));
update_atsi (cp, atsi);
return GNUNET_OK;
}
+/**
+ * Handle a reply to a pending request. Also called if a request
+ * expires (then with data == NULL). The handler may be called
+ * many times (depending on the request type), but will not be
+ * called during or after a call to GSF_pending_request_cancel
+ * and will also not be called anymore after a call signalling
+ * expiration.
+ *
+ * @param cls user-specified closure
+ * @param pr handle to the original pending request
+ * @param data response data, NULL on request expiration
+ * @param data_len number of bytes in data
+ */
+static void
+handle_p2p_reply (void *cls,
+ struct GSF_PendingRequest *pr,
+ const void *data,
+ size_t data_len)
+{
+#if SUPPORT_DELAYS
+ struct GNUNET_TIME_Relative art_delay;
+#endif
+
+ /* FIXME: adapt code fragments below to new API! */
+
+
+ /* reply will go over the network, check for cover traffic */
+ if ( (prq->anonymity_level > 1) &&
+ (cover_content_count < prq->anonymity_level - 1) )
+ {
+ /* insufficient cover traffic, skip */
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# replies suppressed due to lack of cover traffic"),
+ 1,
+ GNUNET_NO);
+ return GNUNET_YES;
+ }
+ if (prq->anonymity_level > 1)
+ cover_content_count -= prq->anonymity_level - 1;
+
+
+ cp = pr->cp;
+#if DEBUG_FS
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Transmitting result for query `%s' to other peer (PID=%u)\n",
+ GNUNET_h2s (key),
+ (unsigned int) cp->pid);
+#endif
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# replies received for other peers"),
+ 1,
+ GNUNET_NO);
+ msize = sizeof (struct PutMessage) + prq->size;
+ reply = GNUNET_malloc (msize + sizeof (struct PendingMessage));
+ reply->cont = &transmit_reply_continuation;
+ reply->cont_cls = pr;
+#if SUPPORT_DELAYS
+ art_delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
+ GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
+ TTL_DECREMENT));
+ reply->delay_until
+ = GNUNET_TIME_relative_to_absolute (art_delay);
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("cummulative artificial delay introduced (ms)"),
+ art_delay.abs_value,
+ GNUNET_NO);
+#endif
+ reply->msize = msize;
+ reply->priority = UINT32_MAX; /* send replies first! */
+ pm = (struct PutMessage*) &reply[1];
+ pm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
+ pm->header.size = htons (msize);
+ pm->type = htonl (prq->type);
+ pm->expiration = GNUNET_TIME_absolute_hton (prq->expiration);
+ memcpy (&pm[1], prq->data, prq->size);
+ add_to_pending_messages_for_peer (cp, reply, pr);
+
+
+}
+
+
+
/**
* Handle P2P "QUERY" message.
*
GSF_handle_p2p_query_ (const struct GNUNET_PeerIdentity *other,
const struct GNUNET_MessageHeader *message)
{
+ /* FIXME: adapt old code to new API! */
+ struct PendingRequest *pr;
+ struct ConnectedPeer *cp;
+ struct ConnectedPeer *cps;
+ struct CheckDuplicateRequestClosure cdc;
+ struct GNUNET_TIME_Relative timeout;
+ uint16_t msize;
+ const struct GetMessage *gm;
+ unsigned int bits;
+ const GNUNET_HashCode *opt;
+ uint32_t bm;
+ size_t bfsize;
+ uint32_t ttl_decrement;
+ int32_t priority;
+ enum GNUNET_BLOCK_Type type;
+ int have_ns;
+
+ msize = ntohs(message->size);
+ if (msize < sizeof (struct GetMessage))
+ {
+ GNUNET_break_op (0);
+ return GNUNET_SYSERR;
+ }
+ gm = (const struct GetMessage*) message;
+#if DEBUG_FS
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Received request for `%s'\n",
+ GNUNET_h2s (&gm->query));
+#endif
+ type = ntohl (gm->type);
+ bm = ntohl (gm->hash_bitmap);
+ bits = 0;
+ while (bm > 0)
+ {
+ if (1 == (bm & 1))
+ bits++;
+ bm >>= 1;
+ }
+ if (msize < sizeof (struct GetMessage) + bits * sizeof (GNUNET_HashCode))
+ {
+ GNUNET_break_op (0);
+ return GNUNET_SYSERR;
+ }
+ opt = (const GNUNET_HashCode*) &gm[1];
+ bfsize = msize - sizeof (struct GetMessage) - bits * sizeof (GNUNET_HashCode);
+ /* bfsize must be power of 2, check! */
+ if (0 != ( (bfsize - 1) & bfsize))
+ {
+ GNUNET_break_op (0);
+ return GNUNET_SYSERR;
+ }
+ cover_query_count++;
+ bm = ntohl (gm->hash_bitmap);
+ bits = 0;
+ cps = GNUNET_CONTAINER_multihashmap_get (connected_peers,
+ &other->hashPubKey);
+ if (NULL == cps)
+ {
+ /* peer must have just disconnected */
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# requests dropped due to initiator not being connected"),
+ 1,
+ GNUNET_NO);
+ return GNUNET_SYSERR;
+ }
+ if (0 != (bm & GET_MESSAGE_BIT_RETURN_TO))
+ cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
+ &opt[bits++]);
+ else
+ cp = cps;
+ if (cp == NULL)
+ {
+#if DEBUG_FS
+ if (0 != (bm & GET_MESSAGE_BIT_RETURN_TO))
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Failed to find RETURN-TO peer `%4s' in connection set. Dropping query.\n",
+ GNUNET_i2s ((const struct GNUNET_PeerIdentity*) &opt[bits-1]));
+
+ else
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Failed to find peer `%4s' in connection set. Dropping query.\n",
+ GNUNET_i2s (other));
+#endif
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# requests dropped due to missing reverse route"),
+ 1,
+ GNUNET_NO);
+ /* FIXME: try connect? */
+ return GNUNET_OK;
+ }
+ /* note that we can really only check load here since otherwise
+ peers could find out that we are overloaded by not being
+ disconnected after sending us a malformed query... */
+ priority = bound_priority (ntohl (gm->priority), cps);
+ if (priority < 0)
+ {
+#if DEBUG_FS
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Dropping query from `%s', this peer is too busy.\n",
+ GNUNET_i2s (other));
+#endif
+ return GNUNET_OK;
+ }
+#if DEBUG_FS
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Received request for `%s' of type %u from peer `%4s' with flags %u\n",
+ GNUNET_h2s (&gm->query),
+ (unsigned int) type,
+ GNUNET_i2s (other),
+ (unsigned int) bm);
+#endif
+ have_ns = (0 != (bm & GET_MESSAGE_BIT_SKS_NAMESPACE));
+ pr = GNUNET_malloc (sizeof (struct PendingRequest) +
+ (have_ns ? sizeof(GNUNET_HashCode) : 0));
+ if (have_ns)
+ {
+ pr->namespace = (GNUNET_HashCode*) &pr[1];
+ memcpy (&pr[1], &opt[bits++], sizeof (GNUNET_HashCode));
+ }
+ if ( (GNUNET_LOAD_get_load (cp->transmission_delay) > 3 * (1 + priority)) ||
+ (GNUNET_LOAD_get_average (cp->transmission_delay) >
+ GNUNET_CONSTANTS_MAX_CORK_DELAY.rel_value * 2 + GNUNET_LOAD_get_average (rt_entry_lifetime)) )
+ {
+ /* don't have BW to send to peer, or would likely take longer than we have for it,
+ so at best indirect the query */
+ priority = 0;
+ pr->forward_only = GNUNET_YES;
+ }
+ pr->type = type;
+ pr->mingle = ntohl (gm->filter_mutator);
+ if (0 != (bm & GET_MESSAGE_BIT_TRANSMIT_TO))
+ pr->target_pid = GNUNET_PEER_intern ((const struct GNUNET_PeerIdentity*) &opt[bits++]);
+ pr->anonymity_level = 1;
+ pr->priority = (uint32_t) priority;
+ pr->ttl = bound_ttl (ntohl (gm->ttl), pr->priority);
+ pr->query = gm->query;
+ /* decrement ttl (always) */
+ ttl_decrement = 2 * TTL_DECREMENT +
+ GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
+ TTL_DECREMENT);
+ if ( (pr->ttl < 0) &&
+ (((int32_t)(pr->ttl - ttl_decrement)) > 0) )
+ {
+#if DEBUG_FS
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Dropping query from `%s' due to TTL underflow (%d - %u).\n",
+ GNUNET_i2s (other),
+ pr->ttl,
+ ttl_decrement);
+#endif
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# requests dropped due TTL underflow"),
+ 1,
+ GNUNET_NO);
+ /* integer underflow => drop (should be very rare)! */
+ GNUNET_free (pr);
+ return GNUNET_OK;
+ }
+ pr->ttl -= ttl_decrement;
+ pr->start_time = GNUNET_TIME_absolute_get ();
+
+ /* get bloom filter */
+ if (bfsize > 0)
+ {
+ pr->bf = GNUNET_CONTAINER_bloomfilter_init ((const char*) &opt[bits],
+ bfsize,
+ BLOOMFILTER_K);
+ pr->bf_size = bfsize;
+ }
+ cdc.have = NULL;
+ cdc.pr = pr;
+ GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map,
+ &gm->query,
+ &check_duplicate_request_peer,
+ &cdc);
+ if (cdc.have != NULL)
+ {
+ if (cdc.have->start_time.abs_value + cdc.have->ttl >=
+ pr->start_time.abs_value + pr->ttl)
+ {
+ /* existing request has higher TTL, drop new one! */
+ cdc.have->priority += pr->priority;
+ destroy_pending_request (pr);
+#if DEBUG_FS
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Have existing request with higher TTL, dropping new request.\n",
+ GNUNET_i2s (other));
+#endif
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# requests dropped due to higher-TTL request"),
+ 1,
+ GNUNET_NO);
+ return GNUNET_OK;
+ }
+ else
+ {
+ /* existing request has lower TTL, drop old one! */
+ pr->priority += cdc.have->priority;
+ /* Possible optimization: if we have applicable pending
+ replies in 'cdc.have', we might want to move those over
+ (this is a really rare special-case, so it is not clear
+ that this would be worth it) */
+ destroy_pending_request (cdc.have);
+ /* keep processing 'pr'! */
+ }
+ }
+
+ pr->cp = cp;
+ GNUNET_break (GNUNET_OK ==
+ GNUNET_CONTAINER_multihashmap_put (query_request_map,
+ &gm->query,
+ pr,
+ GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
+ GNUNET_break (GNUNET_OK ==
+ GNUNET_CONTAINER_multihashmap_put (peer_request_map,
+ &other->hashPubKey,
+ pr,
+ GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
+
+ pr->hnode = GNUNET_CONTAINER_heap_insert (requests_by_expiration_heap,
+ pr,
+ pr->start_time.abs_value + pr->ttl);
+
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# P2P searches received"),
+ 1,
+ GNUNET_NO);
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# P2P searches active"),
+ 1,
+ GNUNET_NO);
+
+ /* calculate change in traffic preference */
+ cps->inc_preference += pr->priority * 1000 + QUERY_BANDWIDTH_VALUE;
+ /* process locally */
+ if (type == GNUNET_BLOCK_TYPE_FS_DBLOCK)
+ type = GNUNET_BLOCK_TYPE_ANY; /* to get on-demand as well */
+ timeout = GNUNET_TIME_relative_multiply (BASIC_DATASTORE_REQUEST_DELAY,
+ (pr->priority + 1));
+ if (GNUNET_YES != pr->forward_only)
+ {
+#if DEBUG_FS
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Handing request for `%s' to datastore\n",
+ GNUNET_h2s (&gm->query));
+#endif
+ pr->qe = GNUNET_DATASTORE_get (dsh,
+ &gm->query,
+ type,
+ pr->priority + 1,
+ MAX_DATASTORE_QUEUE,
+ timeout,
+ &process_local_reply,
+ pr);
+ if (NULL == pr->qe)
+ {
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# requests dropped by datastore (queue length limit)"),
+ 1,
+ GNUNET_NO);
+ }
+ }
+ else
+ {
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# requests forwarded due to high load"),
+ 1,
+ GNUNET_NO);
+ }
+
+ /* Are multiple results possible (and did we look locally)? If so, start processing remotely now! */
+ switch (pr->type)
+ {
+ case GNUNET_BLOCK_TYPE_FS_DBLOCK:
+ case GNUNET_BLOCK_TYPE_FS_IBLOCK:
+ /* only one result, wait for datastore */
+ if (GNUNET_YES != pr->forward_only)
+ {
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# requests not instantly forwarded (waiting for datastore)"),
+ 1,
+ GNUNET_NO);
+ break;
+ }
+ default:
+ if (pr->task == GNUNET_SCHEDULER_NO_TASK)
+ pr->task = GNUNET_SCHEDULER_add_now (&forward_request_task,
+ pr);
+ }
+
+ /* make sure we don't track too many requests */
+ if (GNUNET_CONTAINER_heap_get_size (requests_by_expiration_heap) > max_pending_requests)
+ {
+ pr = GNUNET_CONTAINER_heap_peek (requests_by_expiration_heap);
+ GNUNET_assert (pr != NULL);
+ destroy_pending_request (pr);
+ }
+ return GNUNET_OK;
+
+
+
// FIXME!
// parse request
- // setup pending request
+ // setup pending request (use 'handle_p2p_reply')
// track pending request to cancel it on peer disconnect (!)
// return it!
// (actual planning & execution up to caller!)
}
+/**
+ * Ask a peer to stop migrating data to us until the given point
+ * in time.
+ *
+ * @param cp peer to ask
+ * @param block_time until when to block
+ */
+void
+GSF_block_peer_migration_ (struct GSF_ConnectedPeer *cp,
+ struct GNUNET_TIME_Relative block_time)
+{
+ struct PendingMessage *pm;
+ struct MigrationStopMessage *msm;
+
+ if (GNUNET_TIME_absolute_get_duration (cp->last_migration_block).rel_value > block_time.rel_value)
+ return; /* already blocked */
+ cp->last_migration_block = GNUNET_TIME_relative_to_absolute (block_time);
+
+ /* FIXME: adapt old code below to new API! */
+ pm = GNUNET_malloc (sizeof (struct PendingMessage) +
+ sizeof (struct MigrationStopMessage));
+ pm->msize = sizeof (struct MigrationStopMessage);
+ pm->priority = UINT32_MAX;
+ msm = (struct MigrationStopMessage*) &pm[1];
+ msm->header.size = htons (sizeof (struct MigrationStopMessage));
+ msm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_MIGRATION_STOP);
+ msm->duration = GNUNET_TIME_relative_hton (block_time);
+ add_to_pending_messages_for_peer (cp,
+ pm,
+ NULL);
+}
+
+
+
+
/**
* Write host-trust information to a file - flush the buffer entry!
*
* @return GNUNET_YES (we should continue to iterate)
*/
static int
-clean_peer (void *cls,
- const GNUNET_HashCode * key,
- void *value)
+clean_local_client (void *cls,
+ const GNUNET_HashCode * key,
+ void *value)
{
const struct GSF_LocalClient *lc = cls;
struct GSF_ConnectedPeer *cp = value;
*/
double avg_priority;
+ /**
+ * Trust rating for this peer
+ */
+ uint32_t trust;
+
/**
* Number of pending queries (replies are not counted)
*/
const struct GNUNET_MessageHeader *message);
+/**
+ * Return the performance data record for the given peer
+ *
+ * @param cp peer to query
+ * @return performance data record for the peer
+ */
+struct GSF_PeerPerformanceData *
+GSF_get_peer_performance_data_ (struct GSF_ConnectedPeer *cp);
+
+
+/**
+ * Ask a peer to stop migrating data to us until the given point
+ * in time.
+ *
+ * @param cp peer to ask
+ * @param block_time until when to block
+ */
+void
+GSF_block_peer_migration_ (struct GSF_ConnectedPeer *cp,
+ struct GNUNET_TIME_Relative block_time);
+
+
/**
* A peer disconnected from us. Tear down the connected peer
* record.
}
+/**
+ * Handle a reply to a pending request. Also called if a request
+ * expires (then with data == NULL). The handler may be called
+ * many times (depending on the request type), but will not be
+ * called during or after a call to GSF_pending_request_cancel
+ * and will also not be called anymore after a call signalling
+ * expiration.
+ *
+ * @param cls user-specified closure
+ * @param pr handle to the original pending request
+ * @param data response data, NULL on request expiration
+ * @param data_len number of bytes in data
+ */
+static void
+client_response_handler (void *cls,
+ struct GSF_PendingRequest *pr,
+ const void *data,
+ size_t data_len)
+{
+ /* FIXME: adapt old code below to new API! */
+
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# replies received for local clients"),
+ 1,
+ GNUNET_NO);
+ cl = pr->client_request_list->client_list;
+ msize = sizeof (struct PutMessage) + prq->size;
+ creply = GNUNET_malloc (msize + sizeof (struct ClientResponseMessage));
+ creply->msize = msize;
+ creply->client_list = cl;
+ GNUNET_CONTAINER_DLL_insert_after (cl->res_head,
+ cl->res_tail,
+ cl->res_tail,
+ creply);
+ pm = (struct PutMessage*) &creply[1];
+ pm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
+ pm->header.size = htons (msize);
+ pm->type = htonl (prq->type);
+ pm->expiration = GNUNET_TIME_absolute_hton (prq->expiration);
+ memcpy (&pm[1], prq->data, prq->size);
+ if (NULL == cl->th)
+ {
+#if DEBUG_FS
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Transmitting result for query `%s' to client\n",
+ GNUNET_h2s (key));
+#endif
+ cl->th = GNUNET_SERVER_notify_transmit_ready (cl->client,
+ msize,
+ GNUNET_TIME_UNIT_FOREVER_REL,
+ &transmit_to_client,
+ cl);
+ }
+ GNUNET_break (cl->th != NULL);
+ if (pr->do_remove)
+ {
+ prq->finished = GNUNET_YES;
+ destroy_pending_request (pr);
+ }
+
+}
+
+
+
/**
* Handle START_SEARCH-message (search request from local client).
*
*/
struct GSF_PendingRequestData public_data;
+ /**
+ * Function to call if we encounter a reply.
+ */
GSF_PendingRequestReplyHandler rh;
+ /**
+ * Closure for 'rh'
+ */
void *rh_cls;
- const GNUNET_HashCode *replies_seen;
+ /**
+ * Array of hash codes of replies we've already seen.
+ */
+ GNUNET_HashCode *replies_seen;
+ /**
+ * Bloomfilter masking replies we've already seen.
+ */
struct GNUNET_CONTAINER_BloomFilter *bf;
+ /**
+ * Number of valid entries in the 'replies_seen' array.
+ */
unsigned int replies_seen_count;
+ /**
+ * Length of the 'replies_seen' array.
+ */
+ unsigned int replies_seen_size;
+
+ /**
+ * Mingle value we currently use for the bf.
+ */
int32_t mingle;
};
* All pending requests, ordered by the query. Entries
* are of type 'struct GSF_PendingRequest*'.
*/
-static struct GNUNET_CONTAINER_MultiHashMap *requests;
+static struct GNUNET_CONTAINER_MultiHashMap *pr_map;
+
+
+/**
+ * Datastore 'PUT' load tracking.
+ */
+static struct GNUNET_LOAD_Value *datastore_put_load;
+
+
+/**
+ * Are we allowed to migrate content to this peer.
+ */
+static int active_to_migration;
+
+
+/**
+ * Heap with the request that will expire next at the top. Contains
+ * pointers of type "struct PendingRequest*"; these will *also* be
+ * aliased from the "requests_by_peer" data structures and the
+ * "requests_by_query" table. Note that requests from our clients
+ * don't expire and are thus NOT in the "requests_by_expiration"
+ * (or the "requests_by_peer" tables).
+ */
+static struct GNUNET_CONTAINER_Heap *requests_by_expiration_heap;
+
+
+/**
+ * How many bytes should a bloomfilter be if we have already seen
+ * entry_count responses? Note that BLOOMFILTER_K gives us the number
+ * of bits set per entry. Furthermore, we should not re-size the
+ * filter too often (to keep it cheap).
+ *
+ * Since other peers will also add entries but not resize the filter,
+ * we should generally pick a slightly larger size than what the
+ * strict math would suggest.
+ *
+ * @return must be a power of two and smaller or equal to 2^15.
+ */
+static size_t
+compute_bloomfilter_size (unsigned int entry_count)
+{
+ size_t size;
+ unsigned int ideal = (entry_count * BLOOMFILTER_K) / 4;
+ uint16_t max = 1 << 15;
+
+ if (entry_count > max)
+ return max;
+ size = 8;
+ while ((size < max) && (size < ideal))
+ size *= 2;
+ if (size > max)
+ return max;
+ return size;
+}
+
+
+/**
+ * Recalculate our bloom filter for filtering replies. This function
+ * will create a new bloom filter from scratch, so it should only be
+ * called if we have no bloomfilter at all (and hence can create a
+ * fresh one of minimal size without problems) OR if our peer is the
+ * initiator (in which case we may resize to larger than mimimum size).
+ *
+ * @param pr request for which the BF is to be recomputed
+ * @return GNUNET_YES if a refresh actually happened
+ */
+static int
+refresh_bloomfilter (struct GSF_PendingRequest *pr)
+{
+ unsigned int i;
+ size_t nsize;
+ GNUNET_HashCode mhash;
+
+ nsize = compute_bloomfilter_size (pr->replies_seen_off);
+ if ( (bf != NULL) &&
+ (nsize == GNUNET_CONTAINER_bloomfilter_get_size (pr->bf)) )
+ return GNUNET_NO; /* size not changed */
+ if (pr->bf != NULL)
+ GNUNET_CONTAINER_bloomfilter_free (pr->bf);
+ pr->mingle = (int32_t) GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
+ UINT32_MAX);
+ pr->bf = GNUNET_CONTAINER_bloomfilter_init (NULL,
+ nsize,
+ BLOOMFILTER_K);
+ for (i=0;i<pr->replies_seen_count;i++)
+ {
+ GNUNET_BLOCK_mingle_hash (&pr->replies_seen[i],
+ pr->mingle,
+ &mhash);
+ GNUNET_CONTAINER_bloomfilter_add (pr->bf, &mhash);
+ }
+ return GNUNET_YES;
+}
/**
GSF_PendingRequestReplyHandler rh,
void *rh_cls)
{
- return NULL; // FIXME
+ struct GSF_PendingRequest *pr;
+
+
+ pr = GNUNET_malloc (sizeof (struct GSF_PendingRequest));
+ pr->public_data.query = *query;
+ if (GNUNET_BLOCK_TYPE_SBLOCK == type)
+ {
+ GNUNET_assert (NULL != namespace);
+ pr->public_data.namespace = *namespace;
+ }
+ if (NULL != target)
+ {
+ pr->public_data.target = *target;
+ pr->has_target = GNUNET_YES;
+ }
+ pr->public_data.anonymity_level = anonymity_data;
+ pr->public_data.priority = priority;
+ pr->public_data.options = options;
+ pr->public_data.type = type;
+ pr->rh = rh;
+ pr->rh_cls = rh_cls;
+ if (replies_seen_count > 0)
+ {
+ pr->replies_seen_size = replies_seen_count;
+ pr->replies_seen = GNUNET_malloc (sizeof (GNUNET_HashCode) * pr->replies_seen_size);
+ memcpy (pr->replies_seen,
+ replies_seen,
+ replies_seen_count * sizeof (struct GNUNET_HashCode));
+ pr->replies_seen_count = replies_seen_count;
+ }
+ if (NULL != bf)
+ {
+ pr->bf = GNUNET_CONTAINER_bloomfilter_copy (bf);
+ pr->mingle = mingle;
+ }
+ else if ( (replies_seen_count > 0) &&
+ (0 != (options & GSF_PRO_BLOOMFILTER_FULL_REFRESH)) )
+ {
+ GNUNET_assert (GNUNET_YES == refresh_bloomfilter (pr));
+ }
+ GNUNET_CONTAINER_multihashmap_put (pr_map,
+ query,
+ pr,
+ GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
+ // FIXME: if not a local query, we also need to track the
+ // total number of external queries we currently have and
+ // bound it => need an additional heap!
+ return pr;
}
const GNUNET_HashCode *replies_seen,
unsigned int replies_seen_count)
{
- // FIXME
-}
-
-
-
-/**
- * Get the query for a given pending request.
- *
- * @param pr the request
- * @return pointer to the query (only valid as long as pr is valid)
- */
-const GNUNET_HashCode *
-GSF_pending_request_get_query_ (const struct GSF_PendingRequest *pr)
-{
- return NULL; // FIXME
-}
-
-
-/**
- * Get the type of a given pending request.
- *
- * @param pr the request
- * @return query type
- */
-enum GNUNET_BLOCK_Type
-GSF_pending_request_get_type_ (const struct GSF_PendingRequest *pr)
-{
- return 0; // FIXME
+ unsigned int i;
+ GNUNET_HashCode mhash;
+
+ if (replies_seen_count + pr->replies_seen_count < pr->replies_seen_count)
+ return; /* integer overflow */
+ if (0 != (options & GSF_PRO_BLOOMFILTER_FULL_REFRESH))
+ {
+ /* we're responsible for the BF, full refresh */
+ if (replies_seen_count + pr->replies_seen_count > pr->replies_seen_size)
+ GNUNET_array_grow (pr->replies_seen,
+ pr->replies_seen_size,
+ replies_seen_count + pr->replies_seen_count);
+ memcpy (&pr->replies_seen[pr->replies_seen_count],
+ replies_seen,
+ sizeof (GNUNET_HashCode) * replies_seen_count);
+ pr->replies_seen_count += replies_seen;
+ if (GNUNET_NO == refresh_bloomfilter (pr))
+ {
+ /* bf not recalculated, simply extend it with new bits */
+ for (i=0;i<pr->replies_seen_count;i++)
+ {
+ GNUNET_BLOCK_mingle_hash (&replies_seen[i],
+ pr->mingle,
+ &mhash);
+ GNUNET_CONTAINER_bloomfilter_add (pr->bf, &mhash);
+ }
+ }
+ }
+ else
+ {
+ if (NULL == pr->bf)
+ {
+ /* we're not the initiator, but the initiator did not give us
+ any bloom-filter, so we need to create one on-the-fly */
+ pr->mingle = (int32_t) GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
+ UINT32_MAX);
+ pr->bf = GNUNET_CONTAINER_bloomfilter_init (compute_bloomfilter_size (replies_seen_count),
+ pr->mingle,
+ BLOOMFILTER_K);
+ }
+ for (i=0;i<pr->replies_seen_count;i++)
+ {
+ GNUNET_BLOCK_mingle_hash (&replies_seen[i],
+ pr->mingle,
+ &mhash);
+ GNUNET_CONTAINER_bloomfilter_add (pr->bf, &mhash);
+ }
+ }
}
* transmission to other peers (or at least determine its size).
*
* @param pr request to generate the message for
+ * @param do_route are we routing the reply
* @param buf_size number of bytes available in buf
* @param buf where to copy the message (can be NULL)
* @return number of bytes needed (if > buf_size) or used
*/
size_t
GSF_pending_request_get_message_ (struct GSF_PendingRequest *pr,
+ int do_route,
size_t buf_size,
void *buf)
{
- return 0; // FIXME
+ struct PendingMessage *pm;
+ char lbuf[GNUNET_SERVER_MAX_MESSAGE_SIZE];
+ struct GetMessage *gm;
+ GNUNET_HashCode *ext;
+ size_t msize;
+ unsigned int k;
+ int no_route;
+ uint32_t bm;
+ uint32_t prio;
+ size_t bf_size;
+
+ k = 0;
+ bm = 0;
+ if (GNUNET_YES != do_route)
+ {
+ bm |= GET_MESSAGE_BIT_RETURN_TO;
+ k++;
+ }
+ if (GNUNET_BLOCK_TYPE_SBLOCK == pr->type)
+ {
+ bm |= GET_MESSAGE_BIT_SKS_NAMESPACE;
+ k++;
+ }
+ if (GNUNET_YES == pr->has_target)
+ {
+ bm |= GET_MESSAGE_BIT_TRANSMIT_TO;
+ k++;
+ }
+ bf_size = GNUNET_CONTAINER_bloomfilter_get_size (pr->bf);
+ msize = sizeof (struct GetMessage) + bf_size + k * sizeof(GNUNET_HashCode);
+ GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE);
+ if (buf_size < msize)
+ return msize;
+ gm = (struct GetMessage*) lbuf;
+ gm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_GET);
+ gm->header.size = htons (msize);
+ gm->type = htonl (pr->type);
+ if (GNUNET_YES == do_route)
+ prio = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
+ pr->public_data.priority + 1);
+ else
+ prio = 0;
+ pr->public_data.priority -= prio;
+ gm->priority = htonl (prio);
+ gm->ttl = htonl (pr->ttl);
+ gm->filter_mutator = htonl(pr->mingle);
+ gm->hash_bitmap = htonl (bm);
+ gm->query = pr->query;
+ ext = (GNUNET_HashCode*) &gm[1];
+ k = 0;
+ if (GNUNET_YES != do_route)
+ GNUNET_PEER_resolve (pr->cp->pid, (struct GNUNET_PeerIdentity*) &ext[k++]);
+ if (GNUNET_BLOCK_TYPE_SBLOCK == pr->type)
+ memcpy (&ext[k++], pr->namespace, sizeof (GNUNET_HashCode));
+ if (GNUNET_YES == pr->has_target)
+ GNUNET_PEER_resolve (pr->target_pid, (struct GNUNET_PeerIdentity*) &ext[k++]);
+ if (pr->bf != NULL)
+ GNUNET_CONTAINER_bloomfilter_get_raw_data (pr->bf,
+ (char*) &ext[k],
+ bf_size);
+ memcpy (buf, gm, msize);
+ return msize;
+}
+
+
+/**
+ * Iterator to free pending requests.
+ *
+ * @param cls closure, unused
+ * @param key current key code
+ * @param value value in the hash map (pending request)
+ * @return GNUNET_YES (we should continue to iterate)
+ */
+static int
+clean_request (void *cls,
+ const GNUNET_HashCode * key,
+ void *value)
+{
+ struct GSF_PendingRequest *pr = value;
+
+ GNUNET_free_non_null (pr->replies_seen);
+ if (NULL != pr->bf)
+ GNUNET_CONTAINER_bloomfilter_free (pr->bf);
+ GNUNET_free (pr);
+ return GNUNET_YES;
}
void
GSF_pending_request_cancel_ (struct GSF_PendingRequest *pr)
{
+ GNUNET_assert (GNUNET_OK ==
+ GNUNET_CONTAINER_multihashmap_remove (pr_map,
+ &pr->public_data.query,
+ pr));
+ GNUNET_assert (GNUNET_YES ==
+ clean_request (NULL, &pr->public_data.query, pr));
}
* @param cls closure for it
*/
void
-GSF_iterate_pending_requests_ (GSF_PendingRequestIterator it,
- void *cls)
+GSF_iterate_pending_pr_map_ (GSF_PendingRequestIterator it,
+ void *cls)
+{
+ GNUNET_CONTAINER_multihashmap_iterate (pr_map,
+ (GNUNET_CONTAINER_HashMapIterator) it,
+ cls);
+}
+
+
+
+
+/**
+ * Closure for "process_reply" function.
+ */
+struct ProcessReplyClosure
+{
+ /**
+ * The data for the reply.
+ */
+ const void *data;
+
+ /**
+ * Who gave us this reply? NULL for local host (or DHT)
+ */
+ struct ConnectedPeer *sender;
+
+ /**
+ * When the reply expires.
+ */
+ struct GNUNET_TIME_Absolute expiration;
+
+ /**
+ * Size of data.
+ */
+ size_t size;
+
+ /**
+ * Type of the block.
+ */
+ enum GNUNET_BLOCK_Type type;
+
+ /**
+ * How much was this reply worth to us?
+ */
+ uint32_t priority;
+
+ /**
+ * Anonymity requirements for this reply.
+ */
+ uint32_t anonymity_level;
+
+ /**
+ * Evaluation result (returned).
+ */
+ enum GNUNET_BLOCK_EvaluationResult eval;
+
+ /**
+ * Did we finish processing the associated request?
+ */
+ int finished;
+
+ /**
+ * Did we find a matching request?
+ */
+ int request_found;
+};
+
+
+/**
+ * Update the performance data for the sender (if any) since
+ * the sender successfully answered one of our queries.
+ *
+ * @param prq information about the sender
+ * @param pr request that was satisfied
+ */
+static void
+update_request_performance_data (struct ProcessReplyClosure *prq,
+ struct GSF_PendingRequest *pr)
+{
+ unsigned int i;
+ struct GNUNET_TIME_Relative cur_delay;
+
+ if (prq->sender == NULL)
+ return;
+ /* FIXME: adapt code to new API... */
+ for (i=0;i<pr->used_targets_off;i++)
+ if (pr->used_targets[i].pid == prq->sender->pid)
+ break;
+ if (i < pr->used_targets_off)
+ {
+ cur_delay = GNUNET_TIME_absolute_get_duration (pr->used_targets[i].last_request_time);
+ prq->sender->avg_delay.rel_value
+ = (prq->sender->avg_delay.rel_value *
+ (RUNAVG_DELAY_N - 1) + cur_delay.rel_value) / RUNAVG_DELAY_N;
+ prq->sender->avg_priority
+ = (prq->sender->avg_priority *
+ (RUNAVG_DELAY_N - 1) + pr->priority) / (double) RUNAVG_DELAY_N;
+ }
+ if (pr->cp != NULL)
+ {
+ GNUNET_PEER_change_rc (prq->sender->last_p2p_replies
+ [prq->sender->last_p2p_replies_woff % P2P_SUCCESS_LIST_SIZE],
+ -1);
+ GNUNET_PEER_change_rc (pr->cp->pid, 1);
+ prq->sender->last_p2p_replies
+ [(prq->sender->last_p2p_replies_woff++) % P2P_SUCCESS_LIST_SIZE]
+ = pr->cp->pid;
+ }
+ else
+ {
+ if (NULL != prq->sender->last_client_replies
+ [(prq->sender->last_client_replies_woff) % CS2P_SUCCESS_LIST_SIZE])
+ GNUNET_SERVER_client_drop (prq->sender->last_client_replies
+ [(prq->sender->last_client_replies_woff) % CS2P_SUCCESS_LIST_SIZE]);
+ prq->sender->last_client_replies
+ [(prq->sender->last_client_replies_woff++) % CS2P_SUCCESS_LIST_SIZE]
+ = pr->client_request_list->client_list->client;
+ GNUNET_SERVER_client_keep (pr->client_request_list->client_list->client);
+ }
+}
+
+
+
+/**
+ * We have received a reply; handle it!
+ *
+ * @param cls response (struct ProcessReplyClosure)
+ * @param key our query
+ * @param value value in the hash map (info about the query)
+ * @return GNUNET_YES (we should continue to iterate)
+ */
+static int
+process_reply (void *cls,
+ const GNUNET_HashCode * key,
+ void *value)
+{
+ struct ProcessReplyClosure *prq = cls;
+ struct GSF_PendingRequest *pr = value;
+ struct PendingMessage *reply;
+ struct ClientResponseMessage *creply;
+ struct ClientList *cl;
+ struct PutMessage *pm;
+ struct ConnectedPeer *cp;
+ size_t msize;
+
+#if DEBUG_FS
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Matched result (type %u) for query `%s' with pending request\n",
+ (unsigned int) prq->type,
+ GNUNET_h2s (key));
+#endif
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# replies received and matched"),
+ 1,
+ GNUNET_NO);
+ prq->eval = GNUNET_BLOCK_evaluate (block_ctx,
+ prq->type,
+ key,
+ &pr->bf,
+ pr->mingle,
+ pr->namespace, (pr->namespace != NULL) ? sizeof (GNUNET_HashCode) : 0,
+ prq->data,
+ prq->size);
+ switch (prq->eval)
+ {
+ case GNUNET_BLOCK_EVALUATION_OK_MORE:
+ update_request_performance_data (prq, pr);
+ break;
+ case GNUNET_BLOCK_EVALUATION_OK_LAST:
+ update_request_performance_data (prq, pr);
+ /* FIXME: adapt code to new API! */
+ while (NULL != pr->pending_head)
+ destroy_pending_message_list_entry (pr->pending_head);
+ if (pr->qe != NULL)
+ {
+ if (pr->client_request_list != NULL)
+ GNUNET_SERVER_receive_done (pr->client_request_list->client_list->client,
+ GNUNET_YES);
+ GNUNET_DATASTORE_cancel (pr->qe);
+ pr->qe = NULL;
+ }
+ pr->do_remove = GNUNET_YES;
+ if (pr->task != GNUNET_SCHEDULER_NO_TASK)
+ {
+ GNUNET_SCHEDULER_cancel (pr->task);
+ pr->task = GNUNET_SCHEDULER_NO_TASK;
+ }
+ GNUNET_break (GNUNET_YES ==
+ GNUNET_CONTAINER_multihashmap_remove (query_request_map,
+ key,
+ pr));
+ GNUNET_LOAD_update (rt_entry_lifetime,
+ GNUNET_TIME_absolute_get_duration (pr->start_time).rel_value);
+ break;
+ case GNUNET_BLOCK_EVALUATION_OK_DUPLICATE:
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# duplicate replies discarded (bloomfilter)"),
+ 1,
+ GNUNET_NO);
+#if DEBUG_FS && 0
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Duplicate response `%s', discarding.\n",
+ GNUNET_h2s (&mhash));
+#endif
+ return GNUNET_YES; /* duplicate */
+ case GNUNET_BLOCK_EVALUATION_RESULT_INVALID:
+ return GNUNET_YES; /* wrong namespace */
+ case GNUNET_BLOCK_EVALUATION_REQUEST_VALID:
+ GNUNET_break (0);
+ return GNUNET_YES;
+ case GNUNET_BLOCK_EVALUATION_REQUEST_INVALID:
+ GNUNET_break (0);
+ return GNUNET_YES;
+ case GNUNET_BLOCK_EVALUATION_TYPE_NOT_SUPPORTED:
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ _("Unsupported block type %u\n"),
+ prq->type);
+ return GNUNET_NO;
+ }
+ /* FIXME: adapt code to new API! */
+ if (pr->client_request_list != NULL)
+ {
+ if (pr->replies_seen_size == pr->replies_seen_off)
+ GNUNET_array_grow (pr->replies_seen,
+ pr->replies_seen_size,
+ pr->replies_seen_size * 2 + 4);
+ GNUNET_CRYPTO_hash (prq->data,
+ prq->size,
+ &pr->replies_seen[pr->replies_seen_off++]);
+ refresh_bloomfilter (pr);
+ }
+ if (NULL == prq->sender)
+ {
+#if DEBUG_FS
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Found result for query `%s' in local datastore\n",
+ GNUNET_h2s (key));
+#endif
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# results found locally"),
+ 1,
+ GNUNET_NO);
+ }
+ prq->priority += pr->remaining_priority;
+ pr->remaining_priority = 0;
+ pr->results_found++;
+ prq->request_found = GNUNET_YES;
+ /* finally, pass on to other peers / local clients */
+ pr->rh (pr->rh_cls, pr, prq->data, prq->size);
+ return GNUNET_YES;
+}
+
+
+/**
+ * Continuation called to notify client about result of the
+ * operation.
+ *
+ * @param cls closure
+ * @param success GNUNET_SYSERR on failure
+ * @param msg NULL on success, otherwise an error message
+ */
+static void
+put_migration_continuation (void *cls,
+ int success,
+ const char *msg)
+{
+ struct GNUNET_TIME_Absolute *start = cls;
+ struct GNUNET_TIME_Relative delay;
+
+ delay = GNUNET_TIME_absolute_get_duration (*start);
+ GNUNET_free (start);
+ /* FIXME: should we really update the load value on failure? */
+ GNUNET_LOAD_update (datastore_put_load,
+ delay.rel_value);
+ if (GNUNET_OK == success)
+ return;
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# datastore 'put' failures"),
+ 1,
+ GNUNET_NO);
+}
+
+
+/**
+ * Test if the DATABASE (PUT) load on this peer is too high
+ * to even consider processing the query at
+ * all.
+ *
+ * @return GNUNET_YES if the load is too high to do anything (load high)
+ * GNUNET_NO to process normally (load normal or low)
+ */
+static int
+test_put_load_too_high (uint32_t priority)
+{
+ double ld;
+
+ if (GNUNET_LOAD_get_average (datastore_put_load) < 50)
+ return GNUNET_NO; /* very fast */
+ ld = GNUNET_LOAD_get_load (datastore_put_load);
+ if (ld < 2.0 * (1 + priority))
+ return GNUNET_NO;
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# storage requests dropped due to high load"),
+ 1,
+ GNUNET_NO);
+ return GNUNET_YES;
+}
+
+
+/**
+ * Iterator called on each result obtained for a DHT
+ * operation that expects a reply
+ *
+ * @param cls closure
+ * @param exp when will this value expire
+ * @param key key of the result
+ * @param get_path NULL-terminated array of pointers
+ * to the peers on reverse GET path (or NULL if not recorded)
+ * @param put_path NULL-terminated array of pointers
+ * to the peers on the PUT path (or NULL if not recorded)
+ * @param type type of the result
+ * @param size number of bytes in data
+ * @param data pointer to the result data
+ */
+void
+GSF_handle_dht_reply_ (void *cls,
+ struct GNUNET_TIME_Absolute exp,
+ const GNUNET_HashCode * key,
+ const struct GNUNET_PeerIdentity * const *get_path,
+ const struct GNUNET_PeerIdentity * const *put_path,
+ enum GNUNET_BLOCK_Type type,
+ size_t size,
+ const void *data)
{
- // FIXME
+ struct GSF_PendingRequest *pr = cls;
+ struct ProcessReplyClosure prq;
+
+ memset (&prq, 0, sizeof (prq));
+ prq.data = data;
+ prq.expiration = exp;
+ prq.size = size;
+ prq.type = type;
+ process_reply (&prq, key, pr);
+ if ( (GNUNET_YES == active_to_migration) &&
+ (GNUNET_NO == test_put_load_too_high (prq.priority)) )
+ {
+#if DEBUG_FS
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Replicating result for query `%s' with priority %u\n",
+ GNUNET_h2s (&query),
+ prq.priority);
+#endif
+ start = GNUNET_malloc (sizeof (struct GNUNET_TIME_Absolute));
+ *start = GNUNET_TIME_absolute_get ();
+ GNUNET_DATASTORE_put (dsh,
+ 0, &query, dsize, &put[1],
+ type, prq.priority, 1 /* anonymity */,
+ expiration,
+ 1 + prq.priority, MAX_DATASTORE_QUEUE,
+ GNUNET_CONSTANTS_SERVICE_TIMEOUT,
+ &put_migration_continuation,
+ start);
+ }
}
* this content and possibly passes it on (to local clients or other
* peers). Does NOT perform migration (content caching at this peer).
*
- * @param other the other peer involved (sender or receiver, NULL
+ * @param cp the other peer involved (sender or receiver, NULL
* for loopback messages where we are both sender and receiver)
* @param message the actual message
- * @return how valueable was the content to us (0 for not at all),
+ * @return GNUNET_OK if the message was well-formed,
* GNUNET_SYSERR if the message was malformed (close connection,
* do not cache under any circumstances)
*/
int
-GSF_handle_p2p_content_ (const struct GNUNET_PeerIdentity *other,
+GSF_handle_p2p_content_ (struct GSF_ConnectedPeer *cp,
const struct GNUNET_MessageHeader *message)
{
- return GNUNET_SYSERR; // FIXME
+ const struct PutMessage *put;
+ uint16_t msize;
+ size_t dsize;
+ enum GNUNET_BLOCK_Type type;
+ struct GNUNET_TIME_Absolute expiration;
+ GNUNET_HashCode query;
+ struct ProcessReplyClosure prq;
+ struct GNUNET_TIME_Relative block_time;
+ double putl;
+ struct GNUNET_TIME_Absolute *start;
+
+ msize = ntohs (message->size);
+ if (msize < sizeof (struct PutMessage))
+ {
+ GNUNET_break_op(0);
+ return GNUNET_SYSERR;
+ }
+ put = (const struct PutMessage*) message;
+ dsize = msize - sizeof (struct PutMessage);
+ type = ntohl (put->type);
+ expiration = GNUNET_TIME_absolute_ntoh (put->expiration);
+ if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
+ return GNUNET_SYSERR;
+ if (GNUNET_OK !=
+ GNUNET_BLOCK_get_key (block_ctx,
+ type,
+ &put[1],
+ dsize,
+ &query))
+ {
+ GNUNET_break_op (0);
+ return GNUNET_SYSERR;
+ }
+ /* now, lookup 'query' */
+ prq.data = (const void*) &put[1];
+ if (NULL != cp)
+ prq.sender = cp;
+ else
+ prq.sender = NULL;
+ prq.size = dsize;
+ prq.type = type;
+ prq.expiration = expiration;
+ prq.priority = 0;
+ prq.anonymity_level = 1;
+ prq.finished = GNUNET_NO;
+ prq.request_found = GNUNET_NO;
+ GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map,
+ &query,
+ &process_reply,
+ &prq);
+ if (NULL != cp)
+ {
+ GSF_connected_peer_change_preference (cp, CONTENT_BANDWIDTH_VALUE + 1000 * prq.priority);
+ GSF_get_peer_performance_data (cp)->trust += prq.priority;
+ }
+ if ( (GNUNET_YES == active_to_migration) &&
+ (GNUNET_NO == test_put_load_too_high (prq.priority)) )
+ {
+#if DEBUG_FS
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Replicating result for query `%s' with priority %u\n",
+ GNUNET_h2s (&query),
+ prq.priority);
+#endif
+ start = GNUNET_malloc (sizeof (struct GNUNET_TIME_Absolute));
+ *start = GNUNET_TIME_absolute_get ();
+ GNUNET_DATASTORE_put (dsh,
+ 0, &query, dsize, &put[1],
+ type, prq.priority, 1 /* anonymity */,
+ expiration,
+ 1 + prq.priority, MAX_DATASTORE_QUEUE,
+ GNUNET_CONSTANTS_SERVICE_TIMEOUT,
+ &put_migration_continuation,
+ start);
+ }
+ putl = GNUNET_LOAD_get_load (datastore_put_load);
+ if ( (NULL != (cp = prq.sender)) &&
+ (GNUNET_NO == prq.request_found) &&
+ ( (GNUNET_YES != active_to_migration) ||
+ (putl > 2.5 * (1 + prq.priority)) ) )
+ {
+ if (GNUNET_YES != active_to_migration)
+ putl = 1.0 + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 5);
+ block_time = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
+ 5000 + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
+ (unsigned int) (60000 * putl * putl)));
+ GSF_block_peer_migration (cp, block_time);
+ }
+ return GNUNET_OK;
}
void
GSF_pending_request_init_ ()
{
- // FIXME
+ pr_map = GNUNET_CONTAINER_multihashmap_create (32 * 1024);
}
void
GSF_pending_request_done_ ()
{
- // FIXME
+ GNUNET_CONTAINER_multihashmap_iterate (pr_map,
+ &clean_request,
+ NULL);
+ GNUNET_CONTAINER_multihashmap_destroy (pr_map);
+ pr_map = NULL;
}
/**
- * Public data associated with each pending request.
+ * Public data (in the sense of not encapsulated within
+ * 'gnunet-service-fs_pr', not in the sense of network-wide
+ * known) associated with each pending request.
*/
struct GSF_PendingRequestData
{
unsigned int replies_seen_count);
-/**
- * Get the query for a given pending request.
- *
- * @param pr the request
- * @return pointer to the query (only valid as long as pr is valid)
- */
-const GNUNET_HashCode *
-GSF_pending_request_get_query_ (const struct GSF_PendingRequest *pr);
-
-
-/**
- * Get the type of a given pending request.
- *
- * @param pr the request
- * @return query type
- */
-enum GNUNET_BLOCK_Type
-GSF_pending_request_get_type_ (const struct GSF_PendingRequest *pr);
-
-
/**
* Generate the message corresponding to the given pending request for
* transmission to other peers (or at least determine its size).
*
* @param pr request to generate the message for
+ * @param do_route are we routing the reply
* @param buf_size number of bytes available in buf
* @param buf where to copy the message (can be NULL)
- * @return number of bytes needed (if > buf_size) or used
+ * @return number of bytes needed (if buf_size too small) or used
*/
size_t
GSF_pending_request_get_message_ (struct GSF_PendingRequest *pr,
+ int do_route,
size_t buf_size,
void *buf);
/**
* Signature of function called on each request.
+ * (Note: 'subtype' of GNUNET_CONTAINER_HashMapIterator).
*
* @param cls closure
* @param key query for the request
* @param pr handle to the pending request
+ * @return GNUNET_YES to continue to iterate
*/
typedef int (*GSF_PendingRequestIterator)(void *cls,
const GNUNET_HashCode *key,
* this content and possibly passes it on (to local clients or other
* peers). Does NOT perform migration (content caching at this peer).
*
- * @param other the other peer involved (sender or receiver, NULL
+ * @param cp the other peer involved (sender or receiver, NULL
* for loopback messages where we are both sender and receiver)
* @param message the actual message
- * @return how valueable was the content to us (0 for not at all),
+ * @return GNUNET_OK if the message was well-formed,
* GNUNET_SYSERR if the message was malformed (close connection,
* do not cache under any circumstances)
*/
int
-GSF_handle_p2p_content_ (const struct GNUNET_PeerIdentity *other,
+GSF_handle_p2p_content_ (struct GSF_ConnectedPeer *cp,
const struct GNUNET_MessageHeader *message);
+/**
+ * Iterator called on each result obtained for a DHT
+ * operation that expects a reply
+ *
+ * @param cls closure, the 'struct GSF_PendingRequest *'.
+ * @param exp when will this value expire
+ * @param key key of the result
+ * @param get_path NULL-terminated array of pointers
+ * to the peers on reverse GET path (or NULL if not recorded)
+ * @param put_path NULL-terminated array of pointers
+ * to the peers on the PUT path (or NULL if not recorded)
+ * @param type type of the result
+ * @param size number of bytes in data
+ * @param data pointer to the result data
+ */
+void
+GSF_handle_dht_reply_ (void *cls,
+ struct GNUNET_TIME_Absolute exp,
+ const GNUNET_HashCode * key,
+ const struct GNUNET_PeerIdentity * const *get_path,
+ const struct GNUNET_PeerIdentity * const *put_path,
+ enum GNUNET_BLOCK_Type type,
+ size_t size,
+ const void *data);
+
+
/**
* Setup the subsystem.
*/
--- /dev/null
+/*
+ This file is part of GNUnet.
+ (C) 2011 Christian Grothoff (and other contributing authors)
+
+ GNUnet is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published
+ by the Free Software Foundation; either version 3, or (at your
+ option) any later version.
+
+ GNUnet is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with GNUnet; see the file COPYING. If not, write to the
+ Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ Boston, MA 02111-1307, USA.
+*/
+
+/**
+ * @file fs/gnunet-service-fs_push.c
+ * @brief API to push content from our datastore to other peers
+ * ('anonymous'-content P2P migration)
+ * @author Christian Grothoff
+ */
+#include "platform.h"
+#include "gnunet-service-fs_push.h"
+
+
+/* FIXME: below are only old code fragments to use... */
+
+/**
+ * Block that is ready for migration to other peers. Actual data is at the end of the block.
+ */
+struct MigrationReadyBlock
+{
+
+ /**
+ * This is a doubly-linked list.
+ */
+ struct MigrationReadyBlock *next;
+
+ /**
+ * This is a doubly-linked list.
+ */
+ struct MigrationReadyBlock *prev;
+
+ /**
+ * Query for the block.
+ */
+ GNUNET_HashCode query;
+
+ /**
+ * When does this block expire?
+ */
+ struct GNUNET_TIME_Absolute expiration;
+
+ /**
+ * Peers we would consider forwarding this
+ * block to. Zero for empty entries.
+ */
+ GNUNET_PEER_Id target_list[MIGRATION_LIST_SIZE];
+
+ /**
+ * Size of the block.
+ */
+ size_t size;
+
+ /**
+ * Number of targets already used.
+ */
+ unsigned int used_targets;
+
+ /**
+ * Type of the block.
+ */
+ enum GNUNET_BLOCK_Type type;
+};
+
+
+/**
+ * Head of linked list of blocks that can be migrated.
+ */
+static struct MigrationReadyBlock *mig_head;
+
+/**
+ * Tail of linked list of blocks that can be migrated.
+ */
+static struct MigrationReadyBlock *mig_tail;
+
+/**
+ * Request to datastore for migration (or NULL).
+ */
+static struct GNUNET_DATASTORE_QueueEntry *mig_qe;
+
+/**
+ * ID of task that collects blocks for migration.
+ */
+static GNUNET_SCHEDULER_TaskIdentifier mig_task;
+
+/**
+ * What is the maximum frequency at which we are allowed to
+ * poll the datastore for migration content?
+ */
+static struct GNUNET_TIME_Relative min_migration_delay;
+
+/**
+ * Are we allowed to push out content from this peer.
+ */
+static int active_from_migration;
+
+/**
+ * Size of the doubly-linked list of migration blocks.
+ */
+static unsigned int mig_size;
+
+
+/**
+ * Delete the given migration block.
+ *
+ * @param mb block to delete
+ */
+static void
+delete_migration_block (struct MigrationReadyBlock *mb)
+{
+ GNUNET_CONTAINER_DLL_remove (mig_head,
+ mig_tail,
+ mb);
+ GNUNET_PEER_decrement_rcs (mb->target_list,
+ MIGRATION_LIST_SIZE);
+ mig_size--;
+ GNUNET_free (mb);
+}
+
+
+/**
+ * Compare the distance of two peers to a key.
+ *
+ * @param key key
+ * @param p1 first peer
+ * @param p2 second peer
+ * @return GNUNET_YES if P1 is closer to key than P2
+ */
+static int
+is_closer (const GNUNET_HashCode *key,
+ const struct GNUNET_PeerIdentity *p1,
+ const struct GNUNET_PeerIdentity *p2)
+{
+ return GNUNET_CRYPTO_hash_xorcmp (&p1->hashPubKey,
+ &p2->hashPubKey,
+ key);
+}
+
+
+/**
+ * Consider migrating content to a given peer.
+ *
+ * @param cls 'struct MigrationReadyBlock*' to select
+ * targets for (or NULL for none)
+ * @param key ID of the peer
+ * @param value 'struct ConnectedPeer' of the peer
+ * @return GNUNET_YES (always continue iteration)
+ */
+static int
+consider_migration (void *cls,
+ const GNUNET_HashCode *key,
+ void *value)
+{
+ struct MigrationReadyBlock *mb = cls;
+ struct ConnectedPeer *cp = value;
+ struct MigrationReadyBlock *pos;
+ struct GNUNET_PeerIdentity cppid;
+ struct GNUNET_PeerIdentity otherpid;
+ struct GNUNET_PeerIdentity worstpid;
+ size_t msize;
+ unsigned int i;
+ unsigned int repl;
+
+ /* consider 'cp' as a migration target for mb */
+ if (GNUNET_TIME_absolute_get_remaining (cp->migration_blocked).rel_value > 0)
+ return GNUNET_YES; /* peer has requested no migration! */
+ if (mb != NULL)
+ {
+ GNUNET_PEER_resolve (cp->pid,
+ &cppid);
+ repl = MIGRATION_LIST_SIZE;
+ for (i=0;i<MIGRATION_LIST_SIZE;i++)
+ {
+ if (mb->target_list[i] == 0)
+ {
+ mb->target_list[i] = cp->pid;
+ GNUNET_PEER_change_rc (mb->target_list[i], 1);
+ repl = MIGRATION_LIST_SIZE;
+ break;
+ }
+ GNUNET_PEER_resolve (mb->target_list[i],
+ &otherpid);
+ if ( (repl == MIGRATION_LIST_SIZE) &&
+ is_closer (&mb->query,
+ &cppid,
+ &otherpid))
+ {
+ repl = i;
+ worstpid = otherpid;
+ }
+ else if ( (repl != MIGRATION_LIST_SIZE) &&
+ (is_closer (&mb->query,
+ &worstpid,
+ &otherpid) ) )
+ {
+ repl = i;
+ worstpid = otherpid;
+ }
+ }
+ if (repl != MIGRATION_LIST_SIZE)
+ {
+ GNUNET_PEER_change_rc (mb->target_list[repl], -1);
+ mb->target_list[repl] = cp->pid;
+ GNUNET_PEER_change_rc (mb->target_list[repl], 1);
+ }
+ }
+
+ /* consider scheduling transmission to cp for content migration */
+ if (cp->cth != NULL)
+ return GNUNET_YES;
+ msize = 0;
+ pos = mig_head;
+ while (pos != NULL)
+ {
+ for (i=0;i<MIGRATION_LIST_SIZE;i++)
+ {
+ if (cp->pid == pos->target_list[i])
+ {
+ if (msize == 0)
+ msize = pos->size;
+ else
+ msize = GNUNET_MIN (msize,
+ pos->size);
+ break;
+ }
+ }
+ pos = pos->next;
+ }
+ if (msize == 0)
+ return GNUNET_YES; /* no content available */
+#if DEBUG_FS
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Trying to migrate at least %u bytes to peer `%s'\n",
+ msize,
+ GNUNET_h2s (key));
+#endif
+ if (cp->delayed_transmission_request_task != GNUNET_SCHEDULER_NO_TASK)
+ {
+ GNUNET_SCHEDULER_cancel (cp->delayed_transmission_request_task);
+ cp->delayed_transmission_request_task = GNUNET_SCHEDULER_NO_TASK;
+ }
+ cp->cth
+ = GNUNET_CORE_notify_transmit_ready (core,
+ 0, GNUNET_TIME_UNIT_FOREVER_REL,
+ (const struct GNUNET_PeerIdentity*) key,
+ msize + sizeof (struct PutMessage),
+ &transmit_to_peer,
+ cp);
+ return GNUNET_YES;
+}
+
+
+/**
+ * Task that is run periodically to obtain blocks for content
+ * migration
+ *
+ * @param cls unused
+ * @param tc scheduler context (also unused)
+ */
+static void
+gather_migration_blocks (void *cls,
+ const struct GNUNET_SCHEDULER_TaskContext *tc);
+
+
+
+
+/**
+ * If the migration task is not currently running, consider
+ * (re)scheduling it with the appropriate delay.
+ */
+static void
+consider_migration_gathering ()
+{
+ struct GNUNET_TIME_Relative delay;
+
+ if (dsh == NULL)
+ return;
+ if (mig_qe != NULL)
+ return;
+ if (mig_task != GNUNET_SCHEDULER_NO_TASK)
+ return;
+ delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
+ mig_size);
+ delay = GNUNET_TIME_relative_divide (delay,
+ MAX_MIGRATION_QUEUE);
+ delay = GNUNET_TIME_relative_max (delay,
+ min_migration_delay);
+ mig_task = GNUNET_SCHEDULER_add_delayed (delay,
+ &gather_migration_blocks,
+ NULL);
+}
+
+
+
+
+/**
+ * Process content offered for migration.
+ *
+ * @param cls closure
+ * @param key key for the content
+ * @param size number of bytes in data
+ * @param data content stored
+ * @param type type of the content
+ * @param priority priority of the content
+ * @param anonymity anonymity-level for the content
+ * @param expiration expiration time for the content
+ * @param uid unique identifier for the datum;
+ * maybe 0 if no unique identifier is available
+ */
+static void
+process_migration_content (void *cls,
+ const GNUNET_HashCode * key,
+ size_t size,
+ const void *data,
+ enum GNUNET_BLOCK_Type type,
+ uint32_t priority,
+ uint32_t anonymity,
+ struct GNUNET_TIME_Absolute
+ expiration, uint64_t uid)
+{
+ struct MigrationReadyBlock *mb;
+
+ if (key == NULL)
+ {
+ mig_qe = NULL;
+ if (mig_size < MAX_MIGRATION_QUEUE)
+ consider_migration_gathering ();
+ return;
+ }
+ if (GNUNET_TIME_absolute_get_remaining (expiration).rel_value <
+ MIN_MIGRATION_CONTENT_LIFETIME.rel_value)
+ {
+ /* content will expire soon, don't bother */
+ GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
+ return;
+ }
+ if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
+ {
+ if (GNUNET_OK !=
+ GNUNET_FS_handle_on_demand_block (key, size, data,
+ type, priority, anonymity,
+ expiration, uid,
+ &process_migration_content,
+ NULL))
+ {
+ GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
+ }
+ return;
+ }
+#if DEBUG_FS
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Retrieved block `%s' of type %u for migration\n",
+ GNUNET_h2s (key),
+ type);
+#endif
+ mb = GNUNET_malloc (sizeof (struct MigrationReadyBlock) + size);
+ mb->query = *key;
+ mb->expiration = expiration;
+ mb->size = size;
+ mb->type = type;
+ memcpy (&mb[1], data, size);
+ GNUNET_CONTAINER_DLL_insert_after (mig_head,
+ mig_tail,
+ mig_tail,
+ mb);
+ mig_size++;
+ GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
+ &consider_migration,
+ mb);
+ GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
+}
+
+
+
+/**
+ * Task that is run periodically to obtain blocks for content
+ * migration
+ *
+ * @param cls unused
+ * @param tc scheduler context (also unused)
+ */
+static void
+gather_migration_blocks (void *cls,
+ const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ mig_task = GNUNET_SCHEDULER_NO_TASK;
+ if (dsh != NULL)
+ {
+ mig_qe = GNUNET_DATASTORE_get_random (dsh, 0, UINT_MAX,
+ GNUNET_TIME_UNIT_FOREVER_REL,
+ &process_migration_content, NULL);
+ GNUNET_assert (mig_qe != NULL);
+ }
+}
+
+
+
+size_t
+API_ (void *cls,
+ size_t size, void *buf)
+{
+ next = mig_head;
+ while (NULL != (mb = next))
+ {
+ next = mb->next;
+ for (i=0;i<MIGRATION_LIST_SIZE;i++)
+ {
+ if ( (cp->pid == mb->target_list[i]) &&
+ (mb->size + sizeof (migm) <= size) )
+ {
+ GNUNET_PEER_change_rc (mb->target_list[i], -1);
+ mb->target_list[i] = 0;
+ mb->used_targets++;
+ memset (&migm, 0, sizeof (migm));
+ migm.header.size = htons (sizeof (migm) + mb->size);
+ migm.header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
+ migm.type = htonl (mb->type);
+ migm.expiration = GNUNET_TIME_absolute_hton (mb->expiration);
+ memcpy (&cbuf[msize], &migm, sizeof (migm));
+ msize += sizeof (migm);
+ size -= sizeof (migm);
+ memcpy (&cbuf[msize], &mb[1], mb->size);
+ msize += mb->size;
+ size -= mb->size;
+#if DEBUG_FS
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Pushing migration block `%s' (%u bytes) to `%s'\n",
+ GNUNET_h2s (&mb->query),
+ (unsigned int) mb->size,
+ GNUNET_i2s (&pid));
+#endif
+ break;
+ }
+ else
+ {
+#if DEBUG_FS
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Migration block `%s' (%u bytes) is not on migration list for peer `%s'\n",
+ GNUNET_h2s (&mb->query),
+ (unsigned int) mb->size,
+ GNUNET_i2s (&pid));
+#endif
+ }
+ }
+ if ( (mb->used_targets >= MIGRATION_TARGET_COUNT) ||
+ (mb->used_targets >= GNUNET_CONTAINER_multihashmap_size (connected_peers)) )
+ {
+ delete_migration_block (mb);
+ consider_migration_gathering ();
+ }
+ }
+ consider_migration (NULL,
+ &pid.hashPubKey,
+ cp);
+
+}
+
+
+
--- /dev/null
+/*
+ This file is part of GNUnet.
+ (C) 2011 Christian Grothoff (and other contributing authors)
+
+ GNUnet is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published
+ by the Free Software Foundation; either version 3, or (at your
+ option) any later version.
+
+ GNUnet is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with GNUnet; see the file COPYING. If not, write to the
+ Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ Boston, MA 02111-1307, USA.
+*/
+
+/**
+ * @file fs/gnunet-service-fs_put.c
+ * @brief API to PUT zero-anonymity index data from our datastore into the DHT
+ * @author Christian Grothoff
+ */
+#include "platform.h"
+#include "gnunet-service-fs_put.h"
+
+/* FIXME: below are only old code fragments to use... */
+
+
+/**
+ * Request to datastore for DHT PUTs (or NULL).
+ */
+static struct GNUNET_DATASTORE_QueueEntry *dht_qe;
+
+
+/**
+ * Type we will request for the next DHT PUT round from the datastore.
+ */
+static enum GNUNET_BLOCK_Type dht_put_type = GNUNET_BLOCK_TYPE_FS_KBLOCK;
+
+/**
+ * ID of task that collects blocks for DHT PUTs.
+ */
+static GNUNET_SCHEDULER_TaskIdentifier dht_task;
+
+/**
+ * How many entires with zero anonymity do we currently estimate
+ * to have in the database?
+ */
+static unsigned int zero_anonymity_count_estimate;
+
+
+
+
+
+/**
+ * Task that is run periodically to obtain blocks for DHT PUTs.
+ *
+ * @param cls type of blocks to gather
+ * @param tc scheduler context (unused)
+ */
+static void
+gather_dht_put_blocks (void *cls,
+ const struct GNUNET_SCHEDULER_TaskContext *tc);
+
+
+
+/**
+ * If the DHT PUT gathering task is not currently running, consider
+ * (re)scheduling it with the appropriate delay.
+ */
+static void
+consider_dht_put_gathering (void *cls)
+{
+ struct GNUNET_TIME_Relative delay;
+
+ if (dsh == NULL)
+ return;
+ if (dht_qe != NULL)
+ return;
+ if (dht_task != GNUNET_SCHEDULER_NO_TASK)
+ return;
+ if (zero_anonymity_count_estimate > 0)
+ {
+ delay = GNUNET_TIME_relative_divide (GNUNET_DHT_DEFAULT_REPUBLISH_FREQUENCY,
+ zero_anonymity_count_estimate);
+ delay = GNUNET_TIME_relative_min (delay,
+ MAX_DHT_PUT_FREQ);
+ }
+ else
+ {
+ /* if we have NO zero-anonymity content yet, wait 5 minutes for some to
+ (hopefully) appear */
+ delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 5);
+ }
+ dht_task = GNUNET_SCHEDULER_add_delayed (delay,
+ &gather_dht_put_blocks,
+ cls);
+}
+
+
+
+/**
+ * Store content in DHT.
+ *
+ * @param cls closure
+ * @param key key for the content
+ * @param size number of bytes in data
+ * @param data content stored
+ * @param type type of the content
+ * @param priority priority of the content
+ * @param anonymity anonymity-level for the content
+ * @param expiration expiration time for the content
+ * @param uid unique identifier for the datum;
+ * maybe 0 if no unique identifier is available
+ */
+static void
+process_dht_put_content (void *cls,
+ const GNUNET_HashCode * key,
+ size_t size,
+ const void *data,
+ enum GNUNET_BLOCK_Type type,
+ uint32_t priority,
+ uint32_t anonymity,
+ struct GNUNET_TIME_Absolute
+ expiration, uint64_t uid)
+{
+ static unsigned int counter;
+ static GNUNET_HashCode last_vhash;
+ static GNUNET_HashCode vhash;
+
+ if (key == NULL)
+ {
+ dht_qe = NULL;
+ consider_dht_put_gathering (cls);
+ return;
+ }
+ /* slightly funky code to estimate the total number of values with zero
+ anonymity from the maximum observed length of a monotonically increasing
+ sequence of hashes over the contents */
+ GNUNET_CRYPTO_hash (data, size, &vhash);
+ if (GNUNET_CRYPTO_hash_cmp (&vhash, &last_vhash) <= 0)
+ {
+ if (zero_anonymity_count_estimate > 0)
+ zero_anonymity_count_estimate /= 2;
+ counter = 0;
+ }
+ last_vhash = vhash;
+ if (counter < 31)
+ counter++;
+ if (zero_anonymity_count_estimate < (1 << counter))
+ zero_anonymity_count_estimate = (1 << counter);
+#if DEBUG_FS
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Retrieved block `%s' of type %u for DHT PUT\n",
+ GNUNET_h2s (key),
+ type);
+#endif
+ GNUNET_DHT_put (dht_handle,
+ key,
+ DEFAULT_PUT_REPLICATION,
+ GNUNET_DHT_RO_NONE,
+ type,
+ size,
+ data,
+ expiration,
+ GNUNET_TIME_UNIT_FOREVER_REL,
+ &dht_put_continuation,
+ cls);
+}
+
+
+
+/**
+ * Task that is run periodically to obtain blocks for DHT PUTs.
+ *
+ * @param cls type of blocks to gather
+ * @param tc scheduler context (unused)
+ */
+static void
+gather_dht_put_blocks (void *cls,
+ const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ dht_task = GNUNET_SCHEDULER_NO_TASK;
+ if (dsh != NULL)
+ {
+ if (dht_put_type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
+ dht_put_type = GNUNET_BLOCK_TYPE_FS_KBLOCK;
+ dht_qe = GNUNET_DATASTORE_get_zero_anonymity (dsh, 0, UINT_MAX,
+ GNUNET_TIME_UNIT_FOREVER_REL,
+ dht_put_type++,
+ &process_dht_put_content, NULL);
+ GNUNET_assert (dht_qe != NULL);
+ }
+}