}
GNUNET_CONTAINER_multihashmap_put (pr_map, query, pr,
GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
- if (0 != (options & GSF_PRO_REQUEST_EXPIRES))
+ if (0 == (options & GSF_PRO_REQUEST_NEVER_EXPIRES))
{
pr->hnode =
GNUNET_CONTAINER_heap_insert (requests_by_expiration_heap, pr,
GNUNET_assert (dpr != NULL);
if (pr == dpr)
break; /* let the request live briefly... */
- dpr->rh (dpr->rh_cls, GNUNET_BLOCK_EVALUATION_REQUEST_VALID, dpr,
- UINT32_MAX, GNUNET_TIME_UNIT_FOREVER_ABS, GNUNET_BLOCK_TYPE_ANY,
- NULL, 0);
+ if (NULL != dpr->rh)
+ dpr->rh (dpr->rh_cls, GNUNET_BLOCK_EVALUATION_REQUEST_VALID, dpr,
+ UINT32_MAX, GNUNET_TIME_UNIT_FOREVER_ABS, GNUNET_TIME_UNIT_FOREVER_ABS,
+ GNUNET_BLOCK_TYPE_ANY, NULL, 0);
GSF_pending_request_cancel_ (dpr, GNUNET_YES);
}
}
return pr;
}
-
/**
* Obtain the public data associated with a pending request
*
struct ProcessReplyClosure *prq = cls;
struct GSF_PendingRequest *pr = value;
GNUNET_HashCode chash;
+ struct GNUNET_TIME_Absolute last_transmission;
if (NULL == pr->rh)
return GNUNET_YES;
GNUNET_LOAD_update (GSF_rt_entry_lifetime,
GNUNET_TIME_absolute_get_duration (pr->
public_data.start_time).rel_value);
+ if (!GSF_request_plan_reference_get_last_transmission_ (pr->public_data.rpr_head, prq->sender, &last_transmission))
+ last_transmission.abs_value = GNUNET_TIME_UNIT_FOREVER_ABS.abs_value;
/* pass on to other peers / local clients */
pr->rh (pr->rh_cls, prq->eval, pr, prq->anonymity_level, prq->expiration,
- prq->type, prq->data, prq->size);
+ last_transmission, prq->type, prq->data, prq->size);
return GNUNET_YES;
case GNUNET_BLOCK_EVALUATION_OK_DUPLICATE:
GNUNET_STATISTICS_update (GSF_stats,
pr->public_data.results_found++;
prq->request_found = GNUNET_YES;
/* finally, pass on to other peer / local client */
+ if (!GSF_request_plan_reference_get_last_transmission_ (pr->public_data.rpr_head, prq->sender, &last_transmission))
+ last_transmission.abs_value = GNUNET_TIME_UNIT_FOREVER_ABS.abs_value;
pr->rh (pr->rh_cls, prq->eval, pr, prq->anonymity_level, prq->expiration,
- prq->type, prq->data, prq->size);
+ last_transmission, prq->type, prq->data, prq->size);
return GNUNET_YES;
}
*
* @param cls closure
* @param success GNUNET_SYSERR on failure
+ * @param min_expiration minimum expiration time required for content to be stored
* @param msg NULL on success, otherwise an error message
*/
static void
-put_migration_continuation (void *cls, int success, const char *msg)
+put_migration_continuation (void *cls, int success,
+ struct GNUNET_TIME_Absolute min_expiration,
+ const char *msg)
{
struct PutMigrationContext *pmc = cls;
- struct GNUNET_TIME_Relative delay;
- struct GNUNET_TIME_Relative block_time;
struct GSF_ConnectedPeer *cp;
+ struct GNUNET_TIME_Relative mig_pause;
struct GSF_PeerPerformanceData *ppd;
- delay = GNUNET_TIME_absolute_get_duration (pmc->start);
+ if (NULL != datastore_put_load)
+ {
+ if (GNUNET_SYSERR != success)
+ {
+ GNUNET_LOAD_update (datastore_put_load,
+ GNUNET_TIME_absolute_get_duration (pmc->start).rel_value);
+ }
+ else
+ {
+ /* on queue failure / timeout, increase the put load dramatically */
+ GNUNET_LOAD_update (datastore_put_load,
+ GNUNET_TIME_UNIT_MINUTES.rel_value);
+ }
+ }
cp = GSF_peer_get_ (&pmc->origin);
- if ((GNUNET_OK != success) && (GNUNET_NO == pmc->requested))
+ if (GNUNET_OK == success)
{
- /* block migration for a bit... */
if (NULL != cp)
{
ppd = GSF_get_peer_performance_data_ (cp);
- ppd->migration_duplication++;
- block_time =
- GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
- 5 * ppd->migration_duplication +
- GNUNET_CRYPTO_random_u32
- (GNUNET_CRYPTO_QUALITY_WEAK, 5));
- GSF_block_peer_migration_ (cp, block_time);
+ ppd->migration_delay.rel_value /= 2;
}
+ GNUNET_free (pmc);
+ return;
}
- else
+ if ( (GNUNET_NO == success) &&
+ (GNUNET_NO == pmc->requested) &&
+ (NULL != cp) )
{
- if (NULL != cp)
+ ppd = GSF_get_peer_performance_data_ (cp);
+ if (min_expiration.abs_value > 0)
{
- ppd = GSF_get_peer_performance_data_ (cp);
- ppd->migration_duplication = 0; /* reset counter */
+#if DEBUG_FS
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Asking to stop migration for %llu ms because datastore is full\n",
+ (unsigned long long) GNUNET_TIME_absolute_get_remaining (min_expiration).rel_value);
+#endif
+ GSF_block_peer_migration_ (cp, min_expiration);
+ }
+ else
+ {
+ ppd->migration_delay = GNUNET_TIME_relative_max (GNUNET_TIME_UNIT_SECONDS,
+ ppd->migration_delay);
+ ppd->migration_delay = GNUNET_TIME_relative_min (GNUNET_TIME_UNIT_HOURS,
+ ppd->migration_delay);
+ mig_pause.rel_value = GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK,
+ ppd->migration_delay.rel_value);
+ ppd->migration_delay = GNUNET_TIME_relative_multiply (ppd->migration_delay, 2);
+#if DEBUG_FS
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Replicated content already exists locally, asking to stop migration for %llu ms\n",
+ (unsigned long long) mig_pause.rel_value);
+#endif
+ GSF_block_peer_migration_ (cp, GNUNET_TIME_relative_to_absolute (mig_pause));
}
}
GNUNET_free (pmc);
- /* FIXME: should we really update the load value on failure? */
- if (NULL != datastore_put_load)
- GNUNET_LOAD_update (datastore_put_load, delay.rel_value);
- if (GNUNET_OK == success)
- return;
GNUNET_STATISTICS_update (GSF_stats,
gettext_noop ("# Datastore `PUT' failures"), 1,
GNUNET_NO);
memset (&prq, 0, sizeof (prq));
prq.data = data;
prq.expiration = exp;
+ /* do not allow migrated content to live longer than 1 year */
+ prq.expiration = GNUNET_TIME_absolute_min (GNUNET_TIME_relative_to_absolute (GNUNET_TIME_UNIT_YEARS),
+ prq.expiration);
prq.size = size;
prq.type = type;
process_reply (&prq, key, pr);
GNUNET_CONSTANTS_SERVICE_TIMEOUT,
&put_migration_continuation, pmc))
{
- put_migration_continuation (pmc, GNUNET_NO, NULL);
+ put_migration_continuation (pmc, GNUNET_SYSERR, GNUNET_TIME_UNIT_ZERO_ABS, NULL);
}
}
}
if (NULL == key)
{
#if DEBUG_FS
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
"No further local responses available.\n");
#endif
if ((pr->public_data.type == GNUNET_BLOCK_TYPE_FS_DBLOCK) ||
dsize = msize - sizeof (struct PutMessage);
type = ntohl (put->type);
expiration = GNUNET_TIME_absolute_ntoh (put->expiration);
+ /* do not allow migrated content to live longer than 1 year */
+ expiration = GNUNET_TIME_absolute_min (GNUNET_TIME_relative_to_absolute (GNUNET_TIME_UNIT_YEARS),
+ expiration);
if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
return GNUNET_SYSERR;
if (GNUNET_OK !=
GNUNET_CONSTANTS_SERVICE_TIMEOUT,
&put_migration_continuation, pmc))
{
- put_migration_continuation (pmc, GNUNET_NO, NULL);
+ put_migration_continuation (pmc, GNUNET_SYSERR, GNUNET_TIME_UNIT_ZERO_ABS, NULL);
}
}
else
GNUNET_CRYPTO_random_u32
(GNUNET_CRYPTO_QUALITY_WEAK,
(unsigned int) (60000 * putl * putl)));
- GSF_block_peer_migration_ (cp, block_time);
+#if DEBUG_FS
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Asking to stop migration for %llu ms because of load %f and events %d/%d\n",
+ (unsigned long long) block_time.rel_value,
+ putl,
+ active_to_migration,
+ (GNUNET_NO == prq.request_found));
+#endif
+ GSF_block_peer_migration_ (cp, GNUNET_TIME_relative_to_absolute (block_time));
}
return GNUNET_OK;
}