* - consider more precise latency estimation (per-peer & request) -- again load API?
* - implement test_load_too_high, make decision priority-based, implement forwarding, etc.
* - introduce random latency in processing
- * - tell other peers to stop migration if our PUTs fail (or if
- * we don't support migration per configuration?)
* - more statistics
*/
#include "platform.h"
*/
struct GNUNET_TIME_Absolute migration_blocked;
+ /**
+ * Time until when we blocked this peer from migrating
+ * data to us.
+ */
+ struct GNUNET_TIME_Absolute last_migration_block;
+
/**
* Handle for an active request for transmission to this
* peer, or NULL.
static double current_priorities;
/**
- * Datastore load tracking.
+ * Datastore 'GET' load tracking.
*/
-static struct GNUNET_LOAD_Value *datastore_load;
+static struct GNUNET_LOAD_Value *datastore_get_load;
+
+/**
+ * Datastore 'PUT' load tracking.
+ */
+static struct GNUNET_LOAD_Value *datastore_put_load;
/**
struct GNUNET_TIME_Relative delay;
delay = GNUNET_TIME_absolute_get_duration (start);
- GNUNET_LOAD_update (datastore_load,
+ GNUNET_LOAD_update (datastore_get_load,
delay.value);
}
TransmissionContinuation cont;
void *cont_cls;
- GNUNET_assert (pml->pm == pm);
- GNUNET_assert ( (tpid == 0) || (tpid == pml->target->pid) );
- cont = pm->cont;
- cont_cls = pm->cont_cls;
- destroy_pending_message_list_entry (pml);
- cont (cont_cls, tpid);
+ if (pml != NULL)
+ {
+ GNUNET_assert (pml->pm == pm);
+ GNUNET_assert ( (tpid == 0) || (tpid == pml->target->pid) );
+ cont = pm->cont;
+ cont_cls = pm->cont_cls;
+ destroy_pending_message_list_entry (pml);
+ }
+ else
+ {
+ GNUNET_free (pm);
+ }
+ if (cont != NULL)
+ cont (cont_cls, tpid);
}
GNUNET_assert (0 == mig_size);
GNUNET_DHT_disconnect (dht_handle);
dht_handle = NULL;
- GNUNET_LOAD_value_free (datastore_load);
- datastore_load = NULL;
+ GNUNET_LOAD_value_free (datastore_get_load);
+ datastore_get_load = NULL;
+ GNUNET_LOAD_value_free (datastore_put_load);
+ datastore_put_load = NULL;
GNUNET_BLOCK_context_destroy (block_ctx);
block_ctx = NULL;
GNUNET_CONFIGURATION_destroy (block_cfg);
GNUNET_assert (pm->next == NULL);
GNUNET_assert (pm->pml == NULL);
- pml = GNUNET_malloc (sizeof (struct PendingMessageList));
- pml->req = pr;
- pml->target = cp;
- pml->pm = pm;
- pm->pml = pml;
- GNUNET_CONTAINER_DLL_insert (pr->pending_head,
- pr->pending_tail,
- pml);
+ if (pr != NULL)
+ {
+ pml = GNUNET_malloc (sizeof (struct PendingMessageList));
+ pml->req = pr;
+ pml->target = cp;
+ pml->pm = pm;
+ pm->pml = pml;
+ GNUNET_CONTAINER_DLL_insert (pr->pending_head,
+ pr->pending_tail,
+ pml);
+ }
pos = cp->pending_messages_head;
while ( (pos != NULL) &&
(pm->priority < pos->priority) )
* Did we finish processing the associated request?
*/
int finished;
+
+ /**
+ * Did we find a matching request?
+ */
+ int request_found;
};
prq->priority += pr->remaining_priority;
pr->remaining_priority = 0;
pr->results_found++;
+ prq->request_found = GNUNET_YES;
if (NULL != pr->client_request_list)
{
GNUNET_STATISTICS_update (stats,
int success,
const char *msg)
{
- /* FIXME */
+ struct GNUNET_TIME_Absolute *start = cls;
+ struct GNUNET_TIME_Relative delay;
+
+ delay = GNUNET_TIME_absolute_get_duration (*start);
+ GNUNET_free (start);
+ GNUNET_LOAD_update (datastore_put_load,
+ delay.value);
+ if (GNUNET_OK == success)
+ return;
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# datastore 'put' failures"),
+ 1,
+ GNUNET_NO);
}
struct GNUNET_TIME_Absolute expiration;
GNUNET_HashCode query;
struct ProcessReplyClosure prq;
+ struct GNUNET_TIME_Absolute *start;
+ struct GNUNET_TIME_Relative block_time;
+ double putl;
+ struct ConnectedPeer *cp;
+ struct PendingMessage *pm;
+ struct MigrationStopMessage *msm;
msize = ntohs (message->size);
if (msize < sizeof (struct PutMessage))
prq.expiration = expiration;
prq.priority = 0;
prq.finished = GNUNET_NO;
+ prq.request_found = GNUNET_NO;
GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map,
&query,
&process_reply,
GNUNET_h2s (&query),
prq.priority);
#endif
+ start = GNUNET_malloc (sizeof (struct GNUNET_TIME_Absolute));
+ *start = GNUNET_TIME_absolute_get ();
GNUNET_DATASTORE_put (dsh,
0, &query, dsize, &put[1],
type, prq.priority, 1 /* anonymity */,
1 + prq.priority, MAX_DATASTORE_QUEUE,
GNUNET_CONSTANTS_SERVICE_TIMEOUT,
&put_migration_continuation,
- NULL);
+ start);
+ }
+ putl = GNUNET_LOAD_get_load (datastore_put_load);
+ if ( (GNUNET_NO == prq.request_found) &&
+ ( (GNUNET_YES != active_migration) ||
+ (putl > 2.0) ) )
+ {
+ cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
+ &other->hashPubKey);
+ if (GNUNET_TIME_absolute_get_duration (cp->last_migration_block).value < 5000)
+ return GNUNET_OK; /* already blocked */
+ /* We're too busy; send MigrationStop message! */
+ if (GNUNET_YES != active_migration)
+ putl = 1.0 + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 5);
+ block_time = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
+ 5000 + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
+ (unsigned int) (60000 * putl * putl)));
+
+ cp->last_migration_block = GNUNET_TIME_relative_to_absolute (block_time);
+ pm = GNUNET_malloc (sizeof (struct PendingMessage) +
+ sizeof (struct MigrationStopMessage));
+ pm->msize = sizeof (struct MigrationStopMessage);
+ pm->priority = UINT32_MAX;
+ msm = (struct MigrationStopMessage*) &pm[1];
+ msm->header.size = htons (sizeof (struct MigrationStopMessage));
+ msm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_MIGRATION_STOP);
+ msm->duration = GNUNET_TIME_relative_hton (block_time);
+ add_to_pending_messages_for_peer (cp,
+ pm,
+ NULL);
}
return GNUNET_OK;
}
struct GNUNET_TIME_Relative latency,
uint32_t distance)
{
- // FIXME!
+ struct ConnectedPeer *cp;
+ const struct MigrationStopMessage *msm;
+
+ msm = (const struct MigrationStopMessage*) message;
+ cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
+ &other->hashPubKey);
+ if (cp == NULL)
+ {
+ GNUNET_break (0);
+ return GNUNET_OK;
+ }
+ cp->migration_blocked = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_relative_ntoh (msm->duration));
return GNUNET_OK;
}
prq.type = type;
prq.priority = priority;
prq.finished = GNUNET_NO;
+ prq.request_found = GNUNET_NO;
process_reply (&prq, key, pr);
if ( (old_rf == 0) &&
(pr->results_found == 1) )
GNUNET_SCHEDULER_shutdown (sched);
return;
}
- datastore_load = GNUNET_LOAD_value_init ();
+ datastore_get_load = GNUNET_LOAD_value_init ();
+ datastore_put_load = GNUNET_LOAD_value_init ();
block_cfg = GNUNET_CONFIGURATION_create ();
GNUNET_CONFIGURATION_set_value_string (block_cfg,
"block",
block_ctx = NULL;
GNUNET_CONFIGURATION_destroy (block_cfg);
block_cfg = NULL;
- GNUNET_LOAD_value_free (datastore_load);
- datastore_load = NULL;
+ GNUNET_LOAD_value_free (datastore_get_load);
+ datastore_get_load = NULL;
+ GNUNET_LOAD_value_free (datastore_put_load);
+ datastore_put_load = NULL;
return;
}
}