#include "gnunet-service-fs_push.h"
+#define DEBUG_FS_MIGRATION GNUNET_NO
+
/**
* How long must content remain valid for us to consider it for migration?
* If content will expire too soon, there is clearly no point in pushing
peer->msg = NULL;
if (buf == NULL)
{
+#if DEBUG_FS_MIGRATION
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Failed to migrate content to another peer (disconnect)\n");
+#endif
GNUNET_free (msg);
return 0;
}
GNUNET_assert (msize <= buf_size);
memcpy (buf, msg, msize);
GNUNET_free (msg);
+#if DEBUG_FS_MIGRATION
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Pushing %u bytes to another peer\n",
+ msize);
+#endif
find_content (peer);
return msize;
}
GNUNET_assert (NULL == peer->th);
msize = sizeof (struct PutMessage) + block->size;
msg = GNUNET_malloc (msize);
- msg->header.type = htons (42);
+ msg->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
msg->header.size = htons (msize);
-
+ msg->type = htonl (block->type);
+ msg->expiration = GNUNET_TIME_absolute_hton (block->expiration);
memcpy (&msg[1],
&block[1],
block->size);
{
ret = GNUNET_NO;
}
+#if DEBUG_FS_MIGRATION
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Asking for transmission of %u bytes for migration\n",
+ msize);
+#endif
peer->th = GSF_peer_transmit_ (peer->peer,
GNUNET_NO,
0 /* priority */,
if (NULL == best)
{
if (mig_size < MAX_MIGRATION_QUEUE)
- return; /* will fill up eventually... */
+ {
+#if DEBUG_FS_MIGRATION
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "No content found for pushing, waiting for queue to fill\n");
+#endif
+ return; /* will fill up eventually... */
+ }
+#if DEBUG_FS_MIGRATION
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "No suitable content found, purging content from full queue\n");
+#endif
/* failed to find migration target AND
queue is full, purge most-forwarded
block from queue to make room for more */
- score = 0;
pos = mig_head;
while (NULL != pos)
{
pos = pos->next;
}
GNUNET_assert (NULL != best);
- delete_migration_block (best);
+ delete_migration_block (best);
consider_gathering ();
return;
}
+#if DEBUG_FS_MIGRATION
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Preparing to push best content to peer\n");
+#endif
transmit_content (mrp, best);
}
MAX_MIGRATION_QUEUE);
delay = GNUNET_TIME_relative_max (delay,
min_migration_delay);
+#if DEBUG_FS_MIGRATION
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Scheduling gathering task (queue size: %u)\n",
+ mig_size);
+#endif
mig_task = GNUNET_SCHEDULER_add_delayed (delay,
&gather_migration_blocks,
NULL);
*/
static void
process_migration_content (void *cls,
- const GNUNET_HashCode * key,
+ const GNUNET_HashCode *key,
size_t size,
const void *data,
enum GNUNET_BLOCK_Type type,
struct MigrationReadyBlock *mb;
struct MigrationReadyPeer *pos;
+ mig_qe = NULL;
if (key == NULL)
{
- mig_qe = NULL;
+#if DEBUG_FS_MIGRATION
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "No content found for migration...\n");
+#endif
consider_gathering ();
return;
}
if (GNUNET_TIME_absolute_get_remaining (expiration).rel_value <
MIN_MIGRATION_CONTENT_LIFETIME.rel_value)
{
- /* content will expire soon, don't bother */
- GNUNET_DATASTORE_get_next (GSF_dsh);
+ /* content will expire soon, don't bother */
+ consider_gathering ();
return;
}
if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
type, priority, anonymity,
expiration, uid,
&process_migration_content,
- NULL))
- {
- GNUNET_DATASTORE_get_next (GSF_dsh);
- }
+ NULL))
+ consider_gathering ();
return;
}
-#if DEBUG_FS
+#if DEBUG_FS_MIGRATION
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Retrieved block `%s' of type %u for migration\n",
+ "Retrieved block `%s' of type %u for migration (queue size: %u/%u)\n",
GNUNET_h2s (key),
- type);
+ type,
+ mig_size + 1,
+ MAX_MIGRATION_QUEUE);
#endif
mb = GNUNET_malloc (sizeof (struct MigrationReadyBlock) + size);
mb->query = *key;
{
if (NULL == pos->th)
{
+#if DEBUG_FS_MIGRATION
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Preparing to push best content to peer\n");
+#endif
if (GNUNET_YES == transmit_content (pos, mb))
break; /* 'mb' was freed! */
}
pos = pos->next;
}
- GNUNET_DATASTORE_get_next (GSF_dsh);
+ consider_gathering ();
}
const struct GNUNET_SCHEDULER_TaskContext *tc)
{
mig_task = GNUNET_SCHEDULER_NO_TASK;
+ if (mig_size >= MAX_MIGRATION_QUEUE)
+ return;
if (GSF_dsh != NULL)
{
- mig_qe = GNUNET_DATASTORE_get_random (GSF_dsh,
- 0, UINT_MAX,
- GNUNET_TIME_UNIT_FOREVER_REL,
- &process_migration_content, NULL);
- GNUNET_assert (mig_qe != NULL);
+#if DEBUG_FS_MIGRATION
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Asking datastore for content for replication (queue size: %u)\n",
+ mig_size);
+#endif
+ mig_qe = GNUNET_DATASTORE_get_for_replication (GSF_dsh,
+ 0, UINT_MAX,
+ GNUNET_TIME_UNIT_FOREVER_REL,
+ &process_migration_content, NULL);
+ if (NULL == mig_qe)
+ consider_gathering ();
}
}