* @author Christian Grothoff
*
* TODO:
+ * - track per-peer request latency (using new load API)
+ * - consider more precise latency estimation (per-peer & request) -- again load API?
* - implement test_load_too_high, make decision priority-based, implement forwarding, etc.
- * - consider more precise latency estimation (per-peer & request)
* - introduce random latency in processing
- * - tell other peers to stop migration if our PUTs fail (or if
- * we don't support migration per configuration?)
* - more statistics
*/
#include "platform.h"
#include <float.h>
#include "gnunet_constants.h"
#include "gnunet_core_service.h"
+#include "gnunet_dht_service.h"
#include "gnunet_datastore_service.h"
+#include "gnunet_load_lib.h"
#include "gnunet_peer_lib.h"
#include "gnunet_protocols.h"
#include "gnunet_signatures.h"
*/
#define MAX_QUEUE_PER_PEER 16
+/**
+ * Size for the hash map for DHT requests from the FS
+ * service. Should be about the number of concurrent
+ * DHT requests we plan to make.
+ */
+#define FS_DHT_HT_SIZE 1024
+
/**
* How often do we flush trust values to disk?
*/
*/
struct GNUNET_TIME_Relative avg_delay;
+ /**
+ * Point in time until which this peer does not want us to migrate content
+ * to it.
+ */
+ struct GNUNET_TIME_Absolute migration_blocked;
+
+ /**
+ * Time until when we blocked this peer from migrating
+ * data to us.
+ */
+ struct GNUNET_TIME_Absolute last_migration_block;
+
/**
* Handle for an active request for transmission to this
* peer, or NULL.
*/
static struct GNUNET_DATASTORE_Handle *dsh;
+/**
+ * Our block context.
+ */
+static struct GNUNET_BLOCK_Context *block_ctx;
+
+/**
+ * Our block configuration.
+ */
+static struct GNUNET_CONFIGURATION_Handle *block_cfg;
/**
* Our scheduler.
*/
static struct GNUNET_TIME_Relative min_migration_delay;
+/**
+ * Handle for DHT operations.
+ */
+static struct GNUNET_DHT_Handle *dht_handle;
+
/**
* Size of the doubly-linked list of migration blocks.
*/
*/
static double current_priorities;
+/**
+ * Datastore 'GET' load tracking.
+ */
+static struct GNUNET_LOAD_Value *datastore_get_load;
+
+/**
+ * Datastore 'PUT' load tracking.
+ */
+static struct GNUNET_LOAD_Value *datastore_put_load;
+
+
+/**
+ * We've just now completed a datastore request. Update our
+ * datastore load calculations.
+ *
+ * @param start time when the datastore request was issued
+ */
+static void
+update_datastore_delays (struct GNUNET_TIME_Absolute start)
+{
+ struct GNUNET_TIME_Relative delay;
+
+ delay = GNUNET_TIME_absolute_get_duration (start);
+ GNUNET_LOAD_update (datastore_get_load,
+ delay.value);
+}
+
+
/**
* Get the filename under which we would store the GNUNET_HELLO_Message
* for the given host and protocol.
/* ******************* clean up functions ************************ */
-
/**
* Delete the given migration block.
*
unsigned int repl;
/* consider 'cp' as a migration target for mb */
+ if (GNUNET_TIME_absolute_get_remaining (cp->migration_blocked).value > 0)
+ return GNUNET_YES; /* peer has requested no migration! */
if (mb != NULL)
{
GNUNET_PEER_resolve (cp->pid,
{
struct GNUNET_TIME_Relative delay;
+ if (dsh == NULL)
+ return;
if (mig_qe != NULL)
return;
if (mig_task != GNUNET_SCHEDULER_NO_TASK)
consider_migration_gathering ();
return;
}
- if (type == GNUNET_BLOCK_TYPE_ONDEMAND)
+ if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
{
if (GNUNET_OK !=
GNUNET_FS_handle_on_demand_block (key, size, data,
expiration, uid,
&process_migration_content,
NULL))
- GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
+ {
+ GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
+ }
return;
}
#if DEBUG_FS
const struct GNUNET_SCHEDULER_TaskContext *tc)
{
mig_task = GNUNET_SCHEDULER_NO_TASK;
- mig_qe = GNUNET_DATASTORE_get_random (dsh, 0, -1,
- GNUNET_TIME_UNIT_FOREVER_REL,
- &process_migration_content, NULL);
- GNUNET_assert (mig_qe != NULL);
+ if (dsh != NULL)
+ {
+ mig_qe = GNUNET_DATASTORE_get_random (dsh, 0, -1,
+ GNUNET_TIME_UNIT_FOREVER_REL,
+ &process_migration_content, NULL);
+ GNUNET_assert (mig_qe != NULL);
+ }
}
TransmissionContinuation cont;
void *cont_cls;
- GNUNET_assert (pml->pm == pm);
- GNUNET_assert ( (tpid == 0) || (tpid == pml->target->pid) );
- cont = pm->cont;
- cont_cls = pm->cont_cls;
- destroy_pending_message_list_entry (pml);
- cont (cont_cls, tpid);
+ if (pml != NULL)
+ {
+ GNUNET_assert (pml->pm == pm);
+ GNUNET_assert ( (tpid == 0) || (tpid == pml->target->pid) );
+ cont = pm->cont;
+ cont_cls = pm->cont_cls;
+ destroy_pending_message_list_entry (pml);
+ }
+ else
+ {
+ GNUNET_free (pm);
+ }
+ if (cont != NULL)
+ cont (cont_cls, tpid);
}
GNUNET_STATISTICS_destroy (stats, GNUNET_NO);
stats = NULL;
}
- GNUNET_DATASTORE_disconnect (dsh,
- GNUNET_NO);
+ if (dsh != NULL)
+ {
+ GNUNET_DATASTORE_disconnect (dsh,
+ GNUNET_NO);
+ dsh = NULL;
+ }
while (mig_head != NULL)
delete_migration_block (mig_head);
GNUNET_assert (0 == mig_size);
- dsh = NULL;
+ GNUNET_DHT_disconnect (dht_handle);
+ dht_handle = NULL;
+ GNUNET_LOAD_value_free (datastore_get_load);
+ datastore_get_load = NULL;
+ GNUNET_LOAD_value_free (datastore_put_load);
+ datastore_put_load = NULL;
+ GNUNET_BLOCK_context_destroy (block_ctx);
+ block_ctx = NULL;
+ GNUNET_CONFIGURATION_destroy (block_cfg);
+ block_cfg = NULL;
sched = NULL;
cfg = NULL;
GNUNET_free_non_null (trustDirectory);
GNUNET_PEER_change_rc (mb->target_list[i], -1);
mb->target_list[i] = 0;
mb->used_targets++;
+ memset (&migm, 0, sizeof (migm));
migm.header.size = htons (sizeof (migm) + mb->size);
migm.header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
migm.type = htonl (mb->type);
GNUNET_assert (pm->next == NULL);
GNUNET_assert (pm->pml == NULL);
- pml = GNUNET_malloc (sizeof (struct PendingMessageList));
- pml->req = pr;
- pml->target = cp;
- pml->pm = pm;
- pm->pml = pml;
- GNUNET_CONTAINER_DLL_insert (pr->pending_head,
- pr->pending_tail,
- pml);
+ if (pr != NULL)
+ {
+ pml = GNUNET_malloc (sizeof (struct PendingMessageList));
+ pml->req = pr;
+ pml->target = cp;
+ pml->pm = pm;
+ pm->pml = pml;
+ GNUNET_CONTAINER_DLL_insert (pr->pending_head,
+ pr->pending_tail,
+ pml);
+ }
pos = cp->pending_messages_head;
while ( (pos != NULL) &&
(pm->priority < pos->priority) )
}
-/**
- * Mingle hash with the mingle_number to produce different bits.
- */
-static void
-mingle_hash (const GNUNET_HashCode * in,
- int32_t mingle_number,
- GNUNET_HashCode * hc)
-{
- GNUNET_HashCode m;
-
- GNUNET_CRYPTO_hash (&mingle_number,
- sizeof (int32_t),
- &m);
- GNUNET_CRYPTO_hash_xor (&m, in, hc);
-}
-
-
/**
* Test if the load on this peer is too high
* to even consider processing the query at
BLOOMFILTER_K);
for (i=0;i<pr->replies_seen_off;i++)
{
- mingle_hash (&pr->replies_seen[i], pr->mingle, &mhash);
+ GNUNET_BLOCK_mingle_hash (&pr->replies_seen[i],
+ pr->mingle,
+ &mhash);
GNUNET_CONTAINER_bloomfilter_add (pr->bf, &mhash);
}
}
}
if (GNUNET_YES == pr->local_only)
return; /* configured to not do P2P search */
+ /* (0) try DHT */
+ if (0 == pr->anonymity_level)
+ {
+#if 0
+ /* DHT API needs fixing... */
+ pr->dht_get = GNUNET_DHT_get_start (dht_handle,
+ GNUNET_TIME_UNIT_FOREVER_REL,
+ pr->type,
+ &pr->query,
+ &process_dht_reply,
+ pr,
+ FIXME,
+ FIXME);
+#endif
+ }
/* (1) select target */
psc.pr = pr;
psc.target_score = -DBL_MAX;
switch (pr->type)
{
- case GNUNET_BLOCK_TYPE_DBLOCK:
- case GNUNET_BLOCK_TYPE_IBLOCK:
+ case GNUNET_BLOCK_TYPE_FS_DBLOCK:
+ case GNUNET_BLOCK_TYPE_FS_IBLOCK:
/* only one reply expected, done with the request! */
destroy_pending_request (pr);
break;
case GNUNET_BLOCK_TYPE_ANY:
- case GNUNET_BLOCK_TYPE_KBLOCK:
- case GNUNET_BLOCK_TYPE_SBLOCK:
+ case GNUNET_BLOCK_TYPE_FS_KBLOCK:
+ case GNUNET_BLOCK_TYPE_FS_SBLOCK:
break;
default:
GNUNET_break (0);
*/
size_t size;
- /**
- * Namespace that this reply belongs to
- * (if it is of type SBLOCK).
- */
- GNUNET_HashCode namespace;
-
/**
* Type of the block.
*/
*/
uint32_t priority;
+ /**
+ * Evaluation result (returned).
+ */
+ enum GNUNET_BLOCK_EvaluationResult eval;
+
/**
* Did we finish processing the associated request?
*/
int finished;
+
+ /**
+ * Did we find a matching request?
+ */
+ int request_found;
};
struct PutMessage *pm;
struct ConnectedPeer *cp;
struct GNUNET_TIME_Relative cur_delay;
- GNUNET_HashCode chash;
- GNUNET_HashCode mhash;
size_t msize;
#if DEBUG_FS
GNUNET_SERVER_client_keep (pr->client_request_list->client_list->client);
}
}
- GNUNET_CRYPTO_hash (prq->data,
- prq->size,
- &chash);
- switch (prq->type)
- {
- case GNUNET_BLOCK_TYPE_DBLOCK:
- case GNUNET_BLOCK_TYPE_IBLOCK:
- /* only possible reply, stop requesting! */
+ prq->eval = GNUNET_BLOCK_evaluate (block_ctx,
+ prq->type,
+ key,
+ &pr->bf,
+ pr->mingle,
+ pr->namespace, (pr->namespace != NULL) ? sizeof (GNUNET_HashCode) : 0,
+ prq->data,
+ prq->size);
+ switch (prq->eval)
+ {
+ case GNUNET_BLOCK_EVALUATION_OK_MORE:
+ break;
+ case GNUNET_BLOCK_EVALUATION_OK_LAST:
while (NULL != pr->pending_head)
destroy_pending_message_list_entry (pr->pending_head);
if (pr->qe != NULL)
if (pr->client_request_list != NULL)
GNUNET_SERVER_receive_done (pr->client_request_list->client_list->client,
GNUNET_YES);
- GNUNET_DATASTORE_cancel (pr->qe);
+ GNUNET_DATASTORE_cancel (pr->qe);
pr->qe = NULL;
}
pr->do_remove = GNUNET_YES;
key,
pr));
break;
- case GNUNET_BLOCK_TYPE_SBLOCK:
- if (pr->namespace == NULL)
- {
- GNUNET_break (0);
- return GNUNET_YES;
- }
- if (0 != memcmp (pr->namespace,
- &prq->namespace,
- sizeof (GNUNET_HashCode)))
- {
- GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
- _("Reply mismatched in terms of namespace. Discarded.\n"));
- return GNUNET_YES; /* wrong namespace */
- }
- /* then: fall-through! */
- case GNUNET_BLOCK_TYPE_KBLOCK:
- case GNUNET_BLOCK_TYPE_NBLOCK:
- if (pr->bf != NULL)
- {
- mingle_hash (&chash, pr->mingle, &mhash);
- if (GNUNET_YES == GNUNET_CONTAINER_bloomfilter_test (pr->bf,
- &mhash))
- {
- GNUNET_STATISTICS_update (stats,
- gettext_noop ("# duplicate replies discarded (bloomfilter)"),
- 1,
- GNUNET_NO);
-#if DEBUG_FS
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Duplicate response `%s', discarding.\n",
- GNUNET_h2s (&mhash));
-#endif
- return GNUNET_YES; /* duplicate */
- }
+ case GNUNET_BLOCK_EVALUATION_OK_DUPLICATE:
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# duplicate replies discarded (bloomfilter)"),
+ 1,
+ GNUNET_NO);
#if DEBUG_FS
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "New response `%s', adding to filter.\n",
- GNUNET_h2s (&mhash));
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Duplicate response `%s', discarding.\n",
+ GNUNET_h2s (&mhash));
#endif
- }
- if (pr->client_request_list != NULL)
- {
- if (pr->replies_seen_size == pr->replies_seen_off)
- GNUNET_array_grow (pr->replies_seen,
- pr->replies_seen_size,
- pr->replies_seen_size * 2 + 4);
- pr->replies_seen[pr->replies_seen_off++] = chash;
- }
- if ( (pr->bf == NULL) ||
- (pr->client_request_list != NULL) )
- refresh_bloomfilter (pr);
- GNUNET_CONTAINER_bloomfilter_add (pr->bf,
- &mhash);
- break;
- default:
+ return GNUNET_YES; /* duplicate */
+ case GNUNET_BLOCK_EVALUATION_RESULT_INVALID:
+ return GNUNET_YES; /* wrong namespace */
+ case GNUNET_BLOCK_EVALUATION_REQUEST_VALID:
+ GNUNET_break (0);
+ return GNUNET_YES;
+ case GNUNET_BLOCK_EVALUATION_REQUEST_INVALID:
GNUNET_break (0);
return GNUNET_YES;
+ case GNUNET_BLOCK_EVALUATION_TYPE_NOT_SUPPORTED:
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ _("Unsupported block type %u\n"),
+ prq->type);
+ return GNUNET_NO;
+ }
+ if (pr->client_request_list != NULL)
+ {
+ if (pr->replies_seen_size == pr->replies_seen_off)
+ GNUNET_array_grow (pr->replies_seen,
+ pr->replies_seen_size,
+ pr->replies_seen_size * 2 + 4);
+ GNUNET_CRYPTO_hash (prq->data,
+ prq->size,
+ &pr->replies_seen[pr->replies_seen_off++]);
+ refresh_bloomfilter (pr);
+ }
+ if (NULL == prq->sender)
+ {
+#if DEBUG_FS
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Found result for query `%s' in local datastore\n",
+ GNUNET_h2s (key));
+#endif
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# results found locally"),
+ 1,
+ GNUNET_NO);
}
prq->priority += pr->remaining_priority;
pr->remaining_priority = 0;
+ pr->results_found++;
+ prq->request_found = GNUNET_YES;
if (NULL != pr->client_request_list)
{
GNUNET_STATISTICS_update (stats,
int success,
const char *msg)
{
- /* FIXME */
+ struct GNUNET_TIME_Absolute *start = cls;
+ struct GNUNET_TIME_Relative delay;
+
+ delay = GNUNET_TIME_absolute_get_duration (*start);
+ GNUNET_free (start);
+ GNUNET_LOAD_update (datastore_put_load,
+ delay.value);
+ if (GNUNET_OK == success)
+ return;
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# datastore 'put' failures"),
+ 1,
+ GNUNET_NO);
}
struct GNUNET_TIME_Absolute expiration;
GNUNET_HashCode query;
struct ProcessReplyClosure prq;
- const struct SBlock *sb;
+ struct GNUNET_TIME_Absolute *start;
+ struct GNUNET_TIME_Relative block_time;
+ double putl;
+ struct ConnectedPeer *cp;
+ struct PendingMessage *pm;
+ struct MigrationStopMessage *msm;
msize = ntohs (message->size);
if (msize < sizeof (struct PutMessage))
type = ntohl (put->type);
expiration = GNUNET_TIME_absolute_ntoh (put->expiration);
+ if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
+ return GNUNET_SYSERR;
if (GNUNET_OK !=
- GNUNET_BLOCK_check_block (type,
- &put[1],
- dsize,
- &query))
+ GNUNET_BLOCK_get_key (block_ctx,
+ type,
+ &put[1],
+ dsize,
+ &query))
{
GNUNET_break_op (0);
return GNUNET_SYSERR;
}
- if (type == GNUNET_BLOCK_TYPE_ONDEMAND)
- return GNUNET_SYSERR;
- if (GNUNET_BLOCK_TYPE_SBLOCK == type)
- {
- sb = (const struct SBlock*) &put[1];
- GNUNET_CRYPTO_hash (&sb->subspace,
- sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded),
- &prq.namespace);
- }
-
#if DEBUG_FS
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Received result for query `%s' from peer `%4s'\n",
prq.expiration = expiration;
prq.priority = 0;
prq.finished = GNUNET_NO;
+ prq.request_found = GNUNET_NO;
GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map,
&query,
&process_reply,
GNUNET_h2s (&query),
prq.priority);
#endif
+ start = GNUNET_malloc (sizeof (struct GNUNET_TIME_Absolute));
+ *start = GNUNET_TIME_absolute_get ();
GNUNET_DATASTORE_put (dsh,
0, &query, dsize, &put[1],
type, prq.priority, 1 /* anonymity */,
1 + prq.priority, MAX_DATASTORE_QUEUE,
GNUNET_CONSTANTS_SERVICE_TIMEOUT,
&put_migration_continuation,
- NULL);
+ start);
+ }
+ putl = GNUNET_LOAD_get_load (datastore_put_load);
+ if ( (GNUNET_NO == prq.request_found) &&
+ ( (GNUNET_YES != active_migration) ||
+ (putl > 2.0) ) )
+ {
+ cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
+ &other->hashPubKey);
+ if (GNUNET_TIME_absolute_get_duration (cp->last_migration_block).value < 5000)
+ return GNUNET_OK; /* already blocked */
+ /* We're too busy; send MigrationStop message! */
+ if (GNUNET_YES != active_migration)
+ putl = 1.0 + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 5);
+ block_time = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
+ 5000 + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
+ (unsigned int) (60000 * putl * putl)));
+
+ cp->last_migration_block = GNUNET_TIME_relative_to_absolute (block_time);
+ pm = GNUNET_malloc (sizeof (struct PendingMessage) +
+ sizeof (struct MigrationStopMessage));
+ pm->msize = sizeof (struct MigrationStopMessage);
+ pm->priority = UINT32_MAX;
+ msm = (struct MigrationStopMessage*) &pm[1];
+ msm->header.size = htons (sizeof (struct MigrationStopMessage));
+ msm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_MIGRATION_STOP);
+ msm->duration = GNUNET_TIME_relative_hton (block_time);
+ add_to_pending_messages_for_peer (cp,
+ pm,
+ NULL);
}
return GNUNET_OK;
}
+/**
+ * Handle P2P "MIGRATION_STOP" message.
+ *
+ * @param cls closure, always NULL
+ * @param other the other peer involved (sender or receiver, NULL
+ * for loopback messages where we are both sender and receiver)
+ * @param message the actual message
+ * @param latency reported latency of the connection with 'other'
+ * @param distance reported distance (DV) to 'other'
+ * @return GNUNET_OK to keep the connection open,
+ * GNUNET_SYSERR to close it (signal serious error)
+ */
+static int
+handle_p2p_migration_stop (void *cls,
+ const struct GNUNET_PeerIdentity *other,
+ const struct GNUNET_MessageHeader *message,
+ struct GNUNET_TIME_Relative latency,
+ uint32_t distance)
+{
+ struct ConnectedPeer *cp;
+ const struct MigrationStopMessage *msm;
+
+ msm = (const struct MigrationStopMessage*) message;
+ cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
+ &other->hashPubKey);
+ if (cp == NULL)
+ {
+ GNUNET_break (0);
+ return GNUNET_OK;
+ }
+ cp->migration_blocked = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_relative_ntoh (msm->duration));
+ return GNUNET_OK;
+}
+
+
+
/* **************************** P2P GET Handling ************************ */
struct PendingRequest *pr = cls;
struct ProcessReplyClosure prq;
struct CheckDuplicateRequestClosure cdrc;
- const struct SBlock *sb;
- GNUNET_HashCode dhash;
- GNUNET_HashCode mhash;
GNUNET_HashCode query;
+ unsigned int old_rf;
if (NULL == key)
{
GNUNET_h2s (key),
type);
#endif
- if (type == GNUNET_BLOCK_TYPE_ONDEMAND)
+ if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
{
#if DEBUG_FS
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
&process_local_reply,
pr))
if (pr->qe != NULL)
- GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
- return;
- }
- /* check for duplicates */
- GNUNET_CRYPTO_hash (data, size, &dhash);
- mingle_hash (&dhash,
- pr->mingle,
- &mhash);
- if ( (pr->bf != NULL) &&
- (GNUNET_YES ==
- GNUNET_CONTAINER_bloomfilter_test (pr->bf,
- &mhash)) )
- {
-#if DEBUG_FS
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Result from datastore filtered by bloomfilter (duplicate).\n");
-#endif
- GNUNET_STATISTICS_update (stats,
- gettext_noop ("# results filtered by query bloomfilter"),
- 1,
- GNUNET_NO);
- if (pr->qe != NULL)
- GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
+ {
+ GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
+ }
return;
}
-#if DEBUG_FS
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Found result for query `%s' in local datastore\n",
- GNUNET_h2s (key));
-#endif
- GNUNET_STATISTICS_update (stats,
- gettext_noop ("# results found locally"),
- 1,
- GNUNET_NO);
- pr->results_found++;
+ old_rf = pr->results_found;
memset (&prq, 0, sizeof (prq));
prq.data = data;
prq.expiration = expiration;
prq.size = size;
- if (GNUNET_BLOCK_TYPE_SBLOCK == type)
- {
- sb = (const struct SBlock*) data;
- GNUNET_CRYPTO_hash (&sb->subspace,
- sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded),
- &prq.namespace);
- }
- if (GNUNET_OK != GNUNET_BLOCK_check_block (type,
- data,
- size,
- &query))
+ if (GNUNET_OK !=
+ GNUNET_BLOCK_get_key (block_ctx,
+ type,
+ data,
+ size,
+ &query))
{
GNUNET_break (0);
GNUNET_DATASTORE_remove (dsh,
prq.type = type;
prq.priority = priority;
prq.finished = GNUNET_NO;
+ prq.request_found = GNUNET_NO;
process_reply (&prq, key, pr);
+ if ( (old_rf == 0) &&
+ (pr->results_found == 1) )
+ update_datastore_delays (pr->start_time);
if (prq.finished == GNUNET_YES)
return;
if (pr->qe == NULL)
return; /* done here */
- if ( (type == GNUNET_BLOCK_TYPE_DBLOCK) ||
- (type == GNUNET_BLOCK_TYPE_IBLOCK) )
+ if (prq.eval == GNUNET_BLOCK_EVALUATION_OK_LAST)
{
GNUNET_DATASTORE_get_next (dsh, GNUNET_NO);
return;
}
gm = (const struct GetMessage*) message;
type = ntohl (gm->type);
- switch (type)
- {
- case GNUNET_BLOCK_TYPE_ANY:
- case GNUNET_BLOCK_TYPE_DBLOCK:
- case GNUNET_BLOCK_TYPE_IBLOCK:
- case GNUNET_BLOCK_TYPE_KBLOCK:
- case GNUNET_BLOCK_TYPE_SBLOCK:
- break;
- default:
- GNUNET_break_op (0);
- return GNUNET_SYSERR;
- }
bm = ntohl (gm->hash_bitmap);
bits = 0;
while (bm > 0)
opt = (const GNUNET_HashCode*) &gm[1];
bfsize = msize - sizeof (struct GetMessage) + bits * sizeof (GNUNET_HashCode);
bm = ntohl (gm->hash_bitmap);
- if ( (0 != (bm & GET_MESSAGE_BIT_SKS_NAMESPACE)) &&
- (type != GNUNET_BLOCK_TYPE_SBLOCK) )
- {
- GNUNET_break_op (0);
- return GNUNET_SYSERR;
- }
bits = 0;
cps = GNUNET_CONTAINER_multihashmap_get (connected_peers,
&other->hashPubKey);
pr->mingle = ntohl (gm->filter_mutator);
if (0 != (bm & GET_MESSAGE_BIT_TRANSMIT_TO))
pr->target_pid = GNUNET_PEER_intern ((const struct GNUNET_PeerIdentity*) &opt[bits++]);
-
pr->anonymity_level = 1;
pr->priority = bound_priority (ntohl (gm->priority), cps);
pr->ttl = bound_ttl (ntohl (gm->ttl), pr->priority);
/* calculate change in traffic preference */
cps->inc_preference += pr->priority * 1000 + QUERY_BANDWIDTH_VALUE;
/* process locally */
- if (type == GNUNET_BLOCK_TYPE_DBLOCK)
+ if (type == GNUNET_BLOCK_TYPE_FS_DBLOCK)
type = GNUNET_BLOCK_TYPE_ANY; /* to get on-demand as well */
timeout = GNUNET_TIME_relative_multiply (BASIC_DATASTORE_REQUEST_DELAY,
(pr->priority + 1));
/* Are multiple results possible? If so, start processing remotely now! */
switch (pr->type)
{
- case GNUNET_BLOCK_TYPE_DBLOCK:
- case GNUNET_BLOCK_TYPE_IBLOCK:
+ case GNUNET_BLOCK_TYPE_FS_DBLOCK:
+ case GNUNET_BLOCK_TYPE_FS_IBLOCK:
/* only one result, wait for datastore */
break;
default:
if (GNUNET_CONTAINER_heap_get_size (requests_by_expiration_heap) > max_pending_requests)
{
pr = GNUNET_CONTAINER_heap_peek (requests_by_expiration_heap);
+ GNUNET_assert (pr != NULL);
destroy_pending_request (pr);
}
return GNUNET_OK;
GNUNET_h2s (&sm->query),
(unsigned int) type);
#endif
- switch (type)
- {
- case GNUNET_BLOCK_TYPE_ANY:
- case GNUNET_BLOCK_TYPE_DBLOCK:
- case GNUNET_BLOCK_TYPE_IBLOCK:
- case GNUNET_BLOCK_TYPE_KBLOCK:
- case GNUNET_BLOCK_TYPE_SBLOCK:
- case GNUNET_BLOCK_TYPE_NBLOCK:
- break;
- default:
- GNUNET_break (0);
- GNUNET_SERVER_receive_done (client,
- GNUNET_SYSERR);
- return;
- }
-
cl = client_list;
while ( (cl != NULL) &&
(cl->client != client) )
client_list = cl;
}
/* detect duplicate KBLOCK requests */
- if ( (type == GNUNET_BLOCK_TYPE_KBLOCK) ||
- (type == GNUNET_BLOCK_TYPE_NBLOCK) ||
+ if ( (type == GNUNET_BLOCK_TYPE_FS_KBLOCK) ||
+ (type == GNUNET_BLOCK_TYPE_FS_NBLOCK) ||
(type == GNUNET_BLOCK_TYPE_ANY) )
{
crl = cl->rl_head;
1,
GNUNET_NO);
pr = GNUNET_malloc (sizeof (struct PendingRequest) +
- ((type == GNUNET_BLOCK_TYPE_SBLOCK) ? sizeof(GNUNET_HashCode) : 0));
+ ((type == GNUNET_BLOCK_TYPE_FS_SBLOCK) ? sizeof(GNUNET_HashCode) : 0));
crl = GNUNET_malloc (sizeof (struct ClientRequestList));
memset (crl, 0, sizeof (struct ClientRequestList));
crl->client_list = cl;
sc * sizeof (GNUNET_HashCode));
pr->replies_seen_off = sc;
pr->anonymity_level = ntohl (sm->anonymity_level);
+ pr->start_time = GNUNET_TIME_absolute_get ();
refresh_bloomfilter (pr);
pr->query = sm->query;
if (0 == (1 & ntohl (sm->options)))
pr->local_only = GNUNET_YES;
switch (type)
{
- case GNUNET_BLOCK_TYPE_DBLOCK:
- case GNUNET_BLOCK_TYPE_IBLOCK:
+ case GNUNET_BLOCK_TYPE_FS_DBLOCK:
+ case GNUNET_BLOCK_TYPE_FS_IBLOCK:
if (0 != memcmp (&sm->target,
&all_zeros,
sizeof (GNUNET_HashCode)))
pr->target_pid = GNUNET_PEER_intern ((const struct GNUNET_PeerIdentity*) &sm->target);
break;
- case GNUNET_BLOCK_TYPE_SBLOCK:
+ case GNUNET_BLOCK_TYPE_FS_SBLOCK:
pr->namespace = (GNUNET_HashCode*) &pr[1];
memcpy (&pr[1], &sm->target, sizeof (GNUNET_HashCode));
break;
&sm->query,
pr,
GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
- if (type == GNUNET_BLOCK_TYPE_DBLOCK)
+ if (type == GNUNET_BLOCK_TYPE_FS_DBLOCK)
type = GNUNET_BLOCK_TYPE_ANY; /* get on-demand blocks too! */
pr->qe = GNUNET_DATASTORE_get (dsh,
&sm->query,
GNUNET_MESSAGE_TYPE_FS_GET, 0 },
{ &handle_p2p_put,
GNUNET_MESSAGE_TYPE_FS_PUT, 0 },
+ { &handle_p2p_migration_stop,
+ GNUNET_MESSAGE_TYPE_FS_MIGRATION_STOP,
+ sizeof (struct MigrationStopMessage) },
{ NULL, 0, 0 }
};
static const struct GNUNET_SERVER_MessageHandler handlers[] = {
GNUNET_SCHEDULER_shutdown (sched);
return;
}
+ datastore_get_load = GNUNET_LOAD_value_init ();
+ datastore_put_load = GNUNET_LOAD_value_init ();
+ block_cfg = GNUNET_CONFIGURATION_create ();
+ GNUNET_CONFIGURATION_set_value_string (block_cfg,
+ "block",
+ "PLUGINS",
+ "fs");
+ block_ctx = GNUNET_BLOCK_context_create (block_cfg);
+ GNUNET_assert (NULL != block_ctx);
+ dht_handle = GNUNET_DHT_connect (sched,
+ cfg,
+ FS_DHT_HT_SIZE);
if ( (GNUNET_OK != GNUNET_FS_indexing_init (sched, cfg, dsh)) ||
(GNUNET_OK != main_init (sched, server, cfg)) )
{
GNUNET_SCHEDULER_shutdown (sched);
GNUNET_DATASTORE_disconnect (dsh, GNUNET_NO);
dsh = NULL;
+ GNUNET_DHT_disconnect (dht_handle);
+ dht_handle = NULL;
+ GNUNET_BLOCK_context_destroy (block_ctx);
+ block_ctx = NULL;
+ GNUNET_CONFIGURATION_destroy (block_cfg);
+ block_cfg = NULL;
+ GNUNET_LOAD_value_free (datastore_get_load);
+ datastore_get_load = NULL;
+ GNUNET_LOAD_value_free (datastore_put_load);
+ datastore_put_load = NULL;
return;
}
}