#include "gnunet-service-fs_indexing.h"
#include "gnunet-service-fs_pe.h"
#include "gnunet-service-fs_pr.h"
+#include "gnunet-service-fs_stream.h"
+
+
+/**
+ * Desired replication level for GETs.
+ */
+#define DHT_GET_REPLICATION 5
/**
* Maximum size of the datastore queue for P2P operations. Needs to
*/
#define INSANE_STATISTICS GNUNET_NO
+/**
+ * If obtaining a block via stream fails, how often do we retry it before
+ * giving up for good (and sticking to non-anonymous transfer)?
+ */
+#define STREAM_RETRY_MAX 3
+
+
/**
* An active request.
*/
*/
struct GNUNET_DHT_GetHandle *gh;
+ /**
+ * Stream request handle for this request (or NULL for none).
+ */
+ struct GSF_StreamRequest *stream_request;
+
/**
* Function to call upon completion of the local get
* request, or NULL for none.
*/
uint64_t first_uid;
+ /**
+ * How often have we retried this request via 'stream'?
+ * (used to bound overall retries).
+ */
+ unsigned int stream_retry_count;
+
/**
* Number of valid entries in the 'replies_seen' array.
*/
struct GSF_PendingRequest *
GSF_pending_request_create_ (enum GSF_PendingRequestOptions options,
enum GNUNET_BLOCK_Type type,
- const struct GNUNET_HashCode * query,
- const struct GNUNET_HashCode * namespace,
+ const struct GNUNET_HashCode *query,
+ const struct GNUNET_HashCode *namespace,
const struct GNUNET_PeerIdentity *target,
const char *bf_data, size_t bf_size,
uint32_t mingle, uint32_t anonymity_level,
uint32_t priority, int32_t ttl,
GNUNET_PEER_Id sender_pid,
GNUNET_PEER_Id origin_pid,
- const struct GNUNET_HashCode * replies_seen,
+ const struct GNUNET_HashCode *replies_seen,
unsigned int replies_seen_count,
GSF_PendingRequestReplyHandler rh, void *rh_cls)
{
GNUNET_DHT_get_stop (pr->gh);
pr->gh = NULL;
}
+ if (NULL != pr->stream_request)
+ {
+ GSF_stream_query_cancel (pr->stream_request);
+ pr->stream_request = NULL;
+ }
if (GNUNET_SCHEDULER_NO_TASK != pr->warn_task)
{
GNUNET_SCHEDULER_cancel (pr->warn_task);
GNUNET_DHT_get_stop (pr->gh);
pr->gh = NULL;
}
+ if (NULL != pr->stream_request)
+ {
+ GSF_stream_query_cancel (pr->stream_request);
+ pr->stream_request = NULL;
+ }
if (GNUNET_SCHEDULER_NO_TASK != pr->warn_task)
{
GNUNET_SCHEDULER_cancel (pr->warn_task);
pr->gh =
GNUNET_DHT_get_start (GSF_dht,
pr->public_data.type, &pr->public_data.query,
- 5 /* DEFAULT_GET_REPLICATION */ ,
+ DHT_GET_REPLICATION,
GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE,
xquery, xquery_size, &handle_dht_reply, pr);
if ( (NULL != pr->gh) &&
}
+/**
+ * Function called with a reply from the stream.
+ *
+ * @param cls the pending request struct
+ * @param type type of the block, ANY on error
+ * @param expiration expiration time for the block
+ * @param data_size number of bytes in 'data', 0 on error
+ * @param data reply block data, NULL on error
+ */
+static void
+stream_reply_proc (void *cls,
+ enum GNUNET_BLOCK_Type type,
+ struct GNUNET_TIME_Absolute expiration,
+ size_t data_size,
+ const void *data)
+{
+ struct GSF_PendingRequest *pr = cls;
+ struct ProcessReplyClosure prq;
+ struct GNUNET_HashCode query;
+
+ pr->stream_request = NULL;
+ if (GNUNET_BLOCK_TYPE_ANY == type)
+ {
+ GNUNET_break (NULL == data);
+ GNUNET_break (0 == data_size);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Error retrieiving block via stream\n");
+ pr->stream_retry_count++;
+ if (pr->stream_retry_count >= STREAM_RETRY_MAX)
+ return; /* give up on stream */
+ /* retry -- without delay, as this is non-anonymous
+ and mesh/stream connect will take some time anyway */
+ pr->stream_request = GSF_stream_query (pr->public_data.target,
+ &pr->public_data.query,
+ pr->public_data.type,
+ &stream_reply_proc,
+ pr);
+ return;
+ }
+ if (GNUNET_YES !=
+ GNUNET_BLOCK_get_key (GSF_block_ctx,
+ type,
+ data, data_size, &query))
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Failed to derive key for block of type %d\n",
+ (int) type);
+ GNUNET_break_op (0);
+ return;
+ }
+ GNUNET_STATISTICS_update (GSF_stats,
+ gettext_noop ("# Replies received from STREAM"), 1,
+ GNUNET_NO);
+ memset (&prq, 0, sizeof (prq));
+ prq.data = data;
+ prq.expiration = expiration;
+ /* do not allow migrated content to live longer than 1 year */
+ prq.expiration = GNUNET_TIME_absolute_min (GNUNET_TIME_relative_to_absolute (GNUNET_TIME_UNIT_YEARS),
+ prq.expiration);
+ prq.size = data_size;
+ prq.type = type;
+ process_reply (&prq, &query, pr);
+}
+
+
+/**
+ * Consider downloading via stream (if possible)
+ *
+ * @param pr the pending request to process
+ */
+void
+GSF_stream_lookup_ (struct GSF_PendingRequest *pr)
+{
+ if (0 != pr->public_data.anonymity_level)
+ return;
+ if (0 == pr->public_data.target)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Cannot do stream-based download, target peer not known\n");
+ return;
+ }
+ if (NULL != pr->stream_request)
+ return;
+ pr->stream_request = GSF_stream_query (pr->public_data.target,
+ &pr->public_data.query,
+ pr->public_data.type,
+ &stream_reply_proc,
+ pr);
+}
+
+
/**
* Task that issues a warning if the datastore lookup takes too long.
*
GSF_LocalLookupContinuation cont, void *cont_cls)
{
GNUNET_assert (NULL == pr->gh);
+ GNUNET_assert (NULL == pr->stream_request);
GNUNET_assert (NULL == pr->llc_cont);
pr->llc_cont = cont;
pr->llc_cont_cls = cont_cls;