* targets for (or NULL for none)
* @param key ID of the peer
* @param value 'struct ConnectedPeer' of the peer
- * @return GNUNET_YES (always continue iteration)2
+ * @return GNUNET_YES (always continue iteration)
*/
static int
consider_migration (void *cls,
}
if (msize == 0)
return GNUNET_YES; /* no content available */
+#if DEBUG_FS
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Trying to migrate at least %u bytes to peer `%s'\n",
+ msize,
+ GNUNET_h2s (key));
+#endif
cp->cth
= GNUNET_CORE_notify_transmit_ready (core,
0, GNUNET_TIME_UNIT_FOREVER_REL,
return;
delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
mig_size);
- delay = GNUNET_TIME_relative_divide (GNUNET_TIME_UNIT_SECONDS,
+ delay = GNUNET_TIME_relative_divide (delay,
MAX_MIGRATION_QUEUE);
delay = GNUNET_TIME_relative_max (delay,
min_migration_delay);
GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
return;
}
+#if DEBUG_FS
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Retrieved block `%s' of type %u for migration\n",
+ GNUNET_h2s (key),
+ type);
+#endif
mb = GNUNET_malloc (sizeof (struct MigrationReadyBlock) + size);
mb->query = *key;
mb->expiration = expiration;
{
GNUNET_PEER_change_rc (pos->target_list[i], -1);
pos->target_list[i] = 0;
- if (pos->used_targets >= GNUNET_CONTAINER_multihashmap_size (connected_peers))
- {
- delete_migration_block (pos);
- consider_migration_gathering ();
- continue;
- }
- GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
- &consider_migration,
- pos);
- break;
- }
+ }
+ }
+ if (pos->used_targets >= GNUNET_CONTAINER_multihashmap_size (connected_peers))
+ {
+ delete_migration_block (pos);
+ consider_migration_gathering ();
+ continue;
}
+ GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
+ &consider_migration,
+ pos);
}
GNUNET_PEER_change_rc (cp->pid, -1);
size -= sizeof (migm);
memcpy (&cbuf[msize], &mb[1], mb->size);
msize += mb->size;
- size -= mb->size;
+ size -= mb->size;
+#if DEBUG_FS
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Pushing migration block `%s' (%u bytes) to `%s'\n",
+ GNUNET_h2s (&mb->query),
+ mb->size,
+ GNUNET_i2s (&pid));
+#endif
break;
}
+ else
+ {
+#if DEBUG_FS
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Migration block `%s' (%u bytes) is not on migration list for peer `%s'\n",
+ GNUNET_h2s (&mb->query),
+ mb->size,
+ GNUNET_i2s (&pid));
+#endif
+ }
}
if ( (mb->used_targets >= MIGRATION_TARGET_COUNT) ||
(mb->used_targets >= GNUNET_CONTAINER_multihashmap_size (connected_peers)) )
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Failed to schedule transmission with core!\n");
#endif
- /* FIXME: call stats (rare, bad case) */
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# CORE transmission failures"),
+ 1,
+ GNUNET_NO);
}
}
pr->irc = GNUNET_CORE_peer_change_preference (sched, cfg,
&psc.target,
GNUNET_CONSTANTS_SERVICE_TIMEOUT,
- GNUNET_BANDWIDTH_value_init ((uint32_t) -1 /* no limit */),
+ GNUNET_BANDWIDTH_value_init (UINT32_MAX),
DBLOCK_SIZE * 2,
(uint64_t) cp->inc_preference,
&target_reservation_cb,
* How much was this reply worth to us?
*/
uint32_t priority;
+
+ /**
+ * Did we finish processing the associated request?
+ */
+ int finished;
};
if (pr->cp != NULL)
{
GNUNET_PEER_change_rc (prq->sender->last_p2p_replies
- [prq->sender->last_p2p_replies_woff % P2P_SUCCESS_LIST_SIZE], -1);
+ [prq->sender->last_p2p_replies_woff % P2P_SUCCESS_LIST_SIZE],
+ -1);
GNUNET_PEER_change_rc (pr->cp->pid, 1);
prq->sender->last_p2p_replies
[(prq->sender->last_p2p_replies_woff++) % P2P_SUCCESS_LIST_SIZE]
}
prq->priority += pr->remaining_priority;
pr->remaining_priority = 0;
- if (pr->client_request_list != NULL)
+ if (NULL != pr->client_request_list)
{
GNUNET_STATISTICS_update (stats,
gettext_noop ("# replies received for local clients"),
}
GNUNET_break (cl->th != NULL);
if (pr->do_remove)
- destroy_pending_request (pr);
+ {
+ prq->finished = GNUNET_YES;
+ destroy_pending_request (pr);
+ }
}
else
{
reply->cont = &transmit_reply_continuation;
reply->cont_cls = pr;
reply->msize = msize;
- reply->priority = (uint32_t) -1; /* send replies first! */
+ reply->priority = UINT32_MAX; /* send replies first! */
pm = (struct PutMessage*) &reply[1];
pm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
pm->header.size = htons (msize);
prq.type = type;
prq.expiration = expiration;
prq.priority = 0;
+ prq.finished = GNUNET_NO;
GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map,
&query,
&process_reply,
&prq);
if (GNUNET_YES == active_migration)
{
+#if DEBUG_FS
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Replicating result for query `%s' with priority %u\n",
+ GNUNET_h2s (&query),
+ prq.priority);
+#endif
GNUNET_DATASTORE_put (dsh,
0, &query, dsize, &put[1],
type, prq.priority, 1 /* anonymity */,
GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
return;
}
+ prq.type = type;
+ prq.priority = priority;
+ prq.finished = GNUNET_NO;
+ process_reply (&prq, key, pr);
+ if (prq.finished == GNUNET_YES)
+ return;
+ if (pr->qe == NULL)
+ return; /* done here */
if ( (type == GNUNET_BLOCK_TYPE_DBLOCK) ||
(type == GNUNET_BLOCK_TYPE_IBLOCK) )
{
- if (pr->qe != NULL)
- GNUNET_DATASTORE_get_next (dsh, GNUNET_NO);
+ GNUNET_DATASTORE_get_next (dsh, GNUNET_NO);
+ return;
}
- else if ( (pr->client_request_list == NULL) &&
+ if ( (pr->client_request_list == NULL) &&
( (GNUNET_YES == test_load_too_high()) ||
(pr->results_found > 5 + 2 * pr->priority) ) )
{
gettext_noop ("# processing result set cut short due to load"),
1,
GNUNET_NO);
- if (pr->qe != NULL)
- GNUNET_DATASTORE_get_next (dsh, GNUNET_NO);
+ GNUNET_DATASTORE_get_next (dsh, GNUNET_NO);
+ return;
}
- else if (pr->qe != NULL)
- GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
- prq.type = type;
- prq.priority = priority;
- process_reply (&prq, key, pr);
+ GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
}
pr = GNUNET_malloc (sizeof (struct PendingRequest) +
(have_ns ? sizeof(GNUNET_HashCode) : 0));
if (have_ns)
- pr->namespace = (GNUNET_HashCode*) &pr[1];
+ {
+ pr->namespace = (GNUNET_HashCode*) &pr[1];
+ memcpy (&pr[1], &opt[bits++], sizeof (GNUNET_HashCode));
+ }
pr->type = type;
pr->mingle = ntohl (gm->filter_mutator);
- if (0 != (bm & GET_MESSAGE_BIT_SKS_NAMESPACE))
- memcpy (&pr[1], &opt[bits++], sizeof (GNUNET_HashCode));
if (0 != (bm & GET_MESSAGE_BIT_TRANSMIT_TO))
pr->target_pid = GNUNET_PEER_intern ((const struct GNUNET_PeerIdentity*) &opt[bits++]);
gettext_noop ("# requests dropped due TTL underflow"),
1,
GNUNET_NO);
- /* integer underflow => drop (should be very rare)! */
+ /* integer underflow => drop (should be very rare)! */
GNUNET_free (pr);
return GNUNET_OK;
}
/* **************************** Startup ************************ */
-
-/**
- * List of handlers for P2P messages
- * that we care about.
- */
-static struct GNUNET_CORE_MessageHandler p2p_handlers[] =
- {
- { &handle_p2p_get,
- GNUNET_MESSAGE_TYPE_FS_GET, 0 },
- { &handle_p2p_put,
- GNUNET_MESSAGE_TYPE_FS_PUT, 0 },
- { NULL, 0, 0 }
- };
-
-
-/**
- * List of handlers for the messages understood by this
- * service.
- */
-static struct GNUNET_SERVER_MessageHandler handlers[] = {
- {&GNUNET_FS_handle_index_start, NULL,
- GNUNET_MESSAGE_TYPE_FS_INDEX_START, 0},
- {&GNUNET_FS_handle_index_list_get, NULL,
- GNUNET_MESSAGE_TYPE_FS_INDEX_LIST_GET, sizeof(struct GNUNET_MessageHeader) },
- {&GNUNET_FS_handle_unindex, NULL, GNUNET_MESSAGE_TYPE_FS_UNINDEX,
- sizeof (struct UnindexMessage) },
- {&handle_start_search, NULL, GNUNET_MESSAGE_TYPE_FS_START_SEARCH,
- 0 },
- {NULL, NULL, 0, 0}
-};
-
-
/**
* Process fs requests.
*
struct GNUNET_SERVER_Handle *server,
const struct GNUNET_CONFIGURATION_Handle *c)
{
+ static const struct GNUNET_CORE_MessageHandler p2p_handlers[] =
+ {
+ { &handle_p2p_get,
+ GNUNET_MESSAGE_TYPE_FS_GET, 0 },
+ { &handle_p2p_put,
+ GNUNET_MESSAGE_TYPE_FS_PUT, 0 },
+ { NULL, 0, 0 }
+ };
+ static const struct GNUNET_SERVER_MessageHandler handlers[] = {
+ {&GNUNET_FS_handle_index_start, NULL,
+ GNUNET_MESSAGE_TYPE_FS_INDEX_START, 0},
+ {&GNUNET_FS_handle_index_list_get, NULL,
+ GNUNET_MESSAGE_TYPE_FS_INDEX_LIST_GET, sizeof(struct GNUNET_MessageHeader) },
+ {&GNUNET_FS_handle_unindex, NULL, GNUNET_MESSAGE_TYPE_FS_UNINDEX,
+ sizeof (struct UnindexMessage) },
+ {&handle_start_search, NULL, GNUNET_MESSAGE_TYPE_FS_START_SEARCH,
+ 0 },
+ {NULL, NULL, 0, 0}
+ };
+
sched = s;
cfg = c;
stats = GNUNET_STATISTICS_create (sched, "fs", cfg);
}
/* FIXME: distinguish between sending and storing in options? */
if (active_migration)
- consider_migration_gathering ();
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ _("Content migration is enabled, will start to gather data\n"));
+ consider_migration_gathering ();
+ }
GNUNET_SERVER_disconnect_notify (server,
&handle_client_disconnect,
NULL);