From be23aeefdd4f3732ede252f21445c30331788405 Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Mon, 17 May 2010 07:17:48 +0000 Subject: [PATCH] lc stuff --- TODO | 57 +++++- src/datastore/datastore_api.c | 85 +++------ src/fs/fs.h | 15 ++ src/fs/fs_test_lib_data.conf | 2 +- src/fs/gnunet-service-fs.c | 332 ++++++++++++++++++++++++++++------ 5 files changed, 371 insertions(+), 120 deletions(-) diff --git a/TODO b/TODO index 4bc73c0b4..9cf5a1645 100644 --- a/TODO +++ b/TODO @@ -1,12 +1,55 @@ 0.9.0pre1: * FS: [CG] - - migration: - + on-demand encoding - + peer selection => how to consider latency/bw/etc? - + content transmission => how often the same block? - + testing - - gnunet-service-fs (hot-path routing, load-based routing, nitpicks) - + active reply route caching design & implementation of service; gap extension! + - test migration + - TTL/priority calculations + - hot-path routing, load considerations + - statistics + - active reply route caching design & implementation of service; gap extension! + - Indexing: +May 16 12:49:50 fs-13737 WARNING `open' failed on file `/home/grothoff/svn/gnunet/src/fs/H/' at disk.c:1253 with error: No such file or directory +May 16 12:49:50 fs-13737 WARNING Could not access indexed file `ENUTBMBR' at offset 2064384: No such file or directory + NOTE: corrupted filename in open message + NOTE: odd directory name in open message + +==14995== 8 bytes in 1 blocks are definitely lost in loss record 1 of 12 +==14995== at 0x4024C4C: malloc (vg_replace_malloc.c:195) +==14995== by 0x4068F05: GNUNET_xmalloc_unchecked_ (common_allocation.c:92) +==14995== by 0x4068E33: GNUNET_xmalloc_ (common_allocation.c:61) +==14995== by 0x40519F5: GNUNET_DATASTORE_get_random (datastore_api.c:1102) +==14995== by 0x804ADCF: gather_migration_blocks (gnunet-service-fs.c:969) +==14995== by 0x40864C8: run_ready (scheduler.c:514) +==14995== by 0x4086970: GNUNET_SCHEDULER_run (scheduler.c:642) +==14995== by 0x408CF1B: GNUNET_SERVICE_run (service.c:1404) +==14995== by 0x804F725: main (gnunet-service-fs.c:3506) +==14995== +==14995== 8 bytes in 1 blocks are definitely lost in loss record 2 of 12 +==14995== at 0x4024C4C: malloc (vg_replace_malloc.c:195) +==14995== by 0x4068F05: GNUNET_xmalloc_unchecked_ (common_allocation.c:92) +==14995== by 0x4068E33: GNUNET_xmalloc_ (common_allocation.c:61) +==14995== by 0x4051ACB: GNUNET_DATASTORE_get (datastore_api.c:1160) +==14995== by 0x804F39A: handle_start_search (gnunet-service-fs.c:3352) +==14995== by 0x4087F9A: GNUNET_SERVER_inject (server.c:653) +==14995== by 0x40880A8: process_client_buffer (server.c:714) +==14995== by 0x4088529: restart_processing (server.c:848) +==14995== by 0x40864C8: run_ready (scheduler.c:514) +==14995== by 0x4086970: GNUNET_SCHEDULER_run (scheduler.c:642) +==14995== by 0x408CF1B: GNUNET_SERVICE_run (service.c:1404) +==14995== by 0x804F725: main (gnunet-service-fs.c:3506) +==14995== +==14995== 120 bytes in 15 blocks are definitely lost in loss record 5 of 12 +==14995== at 0x4024C4C: malloc (vg_replace_malloc.c:195) +==14995== by 0x4068F05: GNUNET_xmalloc_unchecked_ (common_allocation.c:92) +==14995== by 0x4068E33: GNUNET_xmalloc_ (common_allocation.c:61) +==14995== by 0x4050DA1: GNUNET_DATASTORE_put (datastore_api.c:695) +==14995== by 0x804DD79: handle_p2p_put (gnunet-service-fs.c:2591) +==14995== by 0x40588B8: main_notify_handler (core_api.c:468) +==14995== by 0x4067DAE: receive_task (client.c:499) +==14995== by 0x40864C8: run_ready (scheduler.c:514) +==14995== by 0x4086970: GNUNET_SCHEDULER_run (scheduler.c:642) +==14995== by 0x408CF1B: GNUNET_SERVICE_run (service.c:1404) +==14995== by 0x804F725: main (gnunet-service-fs.c:3506) +==14995== + * TBENCH: [MW] - good to have for transport/DV evaluation! * DV: [Nate] diff --git a/src/datastore/datastore_api.c b/src/datastore/datastore_api.c index 89c7edba3..16db19f94 100644 --- a/src/datastore/datastore_api.c +++ b/src/datastore/datastore_api.c @@ -255,20 +255,8 @@ void GNUNET_DATASTORE_disconnect (struct GNUNET_DATASTORE_Handle *h, } while (NULL != (qe = h->queue_head)) { - if (NULL != qe->response_proc) - { - qe->response_proc (qe, NULL); - } - else - { - GNUNET_CONTAINER_DLL_remove (h->queue_head, - h->queue_tail, - qe); - if (qe->task != GNUNET_SCHEDULER_NO_TASK) - GNUNET_SCHEDULER_cancel (h->sched, - qe->task); - GNUNET_free (qe); - } + GNUNET_assert (NULL != qe->response_proc); + qe->response_proc (qe, NULL); } if (GNUNET_YES == drop) { @@ -385,15 +373,8 @@ make_queue_entry (struct GNUNET_DATASTORE_Handle *h, { if (pos->max_queue < h->queue_size) { - GNUNET_CONTAINER_DLL_remove (h->queue_head, - h->queue_tail, - pos); - GNUNET_SCHEDULER_cancel (h->sched, - pos->task); - if (pos->response_proc != NULL) - pos->response_proc (pos, NULL); - GNUNET_free (pos); - h->queue_size--; + GNUNET_assert (pos->response_proc != NULL); + pos->response_proc (pos, NULL); break; } pos = pos->next; @@ -565,6 +546,24 @@ drop_status_cont (void *cls, int result, const char *emsg) } +static void +free_queue_entry (struct GNUNET_DATASTORE_QueueEntry *qe) +{ + struct GNUNET_DATASTORE_Handle *h = qe->h; + + GNUNET_CONTAINER_DLL_remove (h->queue_head, + h->queue_tail, + qe); + if (qe->task != GNUNET_SCHEDULER_NO_TASK) + { + GNUNET_SCHEDULER_cancel (h->sched, + qe->task); + qe->task = GNUNET_SCHEDULER_NO_TASK; + } + h->queue_size--; + GNUNET_free (qe); +} + /** * Type of a function to call when we receive a message * from the service. @@ -584,16 +583,7 @@ process_status_message (void *cls, const char *emsg; int32_t status; - GNUNET_CONTAINER_DLL_remove (h->queue_head, - h->queue_tail, - qe); - if (qe->task != GNUNET_SCHEDULER_NO_TASK) - { - GNUNET_SCHEDULER_cancel (h->sched, - qe->task); - qe->task = GNUNET_SCHEDULER_NO_TASK; - } - GNUNET_free (qe); + free_queue_entry (qe); if (msg == NULL) { if (NULL == h->client) @@ -1018,10 +1008,7 @@ process_result_message (void *cls, GNUNET_log (GNUNET_ERROR_TYPE_WARNING, _("Failed to receive response from datastore\n")); #endif - GNUNET_CONTAINER_DLL_remove (h->queue_head, - h->queue_tail, - qe); - GNUNET_free (qe); + free_queue_entry (qe); do_disconnect (h); rc->iter (rc->iter_cls, NULL, 0, NULL, 0, 0, 0, @@ -1036,10 +1023,7 @@ process_result_message (void *cls, GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Received end of result set\n"); #endif - GNUNET_CONTAINER_DLL_remove (h->queue_head, - h->queue_tail, - qe); - GNUNET_free (qe); + free_queue_entry (qe); rc->iter (rc->iter_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0); @@ -1052,10 +1036,7 @@ process_result_message (void *cls, (ntohs(msg->size) != sizeof(struct DataMessage) + ntohl (((const struct DataMessage*)msg)->size)) ) { GNUNET_break (0); - GNUNET_CONTAINER_DLL_remove (h->queue_head, - h->queue_tail, - qe); - GNUNET_free (qe); + free_queue_entry (qe); h->retry_time = GNUNET_TIME_UNIT_ZERO; do_disconnect (h); rc->iter (rc->iter_cls, @@ -1226,10 +1207,7 @@ GNUNET_DATASTORE_get_next (struct GNUNET_DATASTORE_Handle *h, GNUNET_TIME_absolute_get_remaining (qe->timeout)); return; } - GNUNET_CONTAINER_DLL_remove (h->queue_head, - h->queue_tail, - qe); - GNUNET_free (qe); + free_queue_entry (qe); h->retry_time = GNUNET_TIME_UNIT_ZERO; do_disconnect (h); rc->iter (rc->iter_cls, @@ -1253,13 +1231,8 @@ GNUNET_DATASTORE_cancel (struct GNUNET_DATASTORE_QueueEntry *qe) h = qe->h; reconnect = qe->was_transmitted; - GNUNET_CONTAINER_DLL_remove (h->queue_head, - h->queue_tail, - qe); - if (qe->task != GNUNET_SCHEDULER_NO_TASK) - GNUNET_SCHEDULER_cancel (h->sched, - qe->task); - GNUNET_free (qe); + free_queue_entry (qe); + h->queue_size--; if (reconnect) { h->retry_time = GNUNET_TIME_UNIT_ZERO; diff --git a/src/fs/fs.h b/src/fs/fs.h index aff90d4fe..32b4181dd 100644 --- a/src/fs/fs.h +++ b/src/fs/fs.h @@ -1,3 +1,4 @@ + /* This file is part of GNUnet. (C) 2003, 2004, 2005, 2006, 2007, 2008, 2009 Christian Grothoff (and other contributing authors) @@ -41,6 +42,20 @@ */ #define MAX_MIGRATION_QUEUE 32 +/** + * How many peers do we select as possible + * targets per block obtained for migration? + */ +#define MIGRATION_LIST_SIZE 4 + +/** + * To how many peers do we forward each migration block ultimately? + * This number must be smaller or equal to MIGRATION_LIST_SIZE. Using + * a smaller value allows for variation in available bandwidth (for + * migration) between the peers. + */ +#define MIGRATION_TARGET_COUNT 2 + /** * Ratio for moving average delay calculation. The previous * average goes in with a factor of (n-1) into the calculation. diff --git a/src/fs/fs_test_lib_data.conf b/src/fs/fs_test_lib_data.conf index 164c884c0..3f00352a7 100644 --- a/src/fs/fs_test_lib_data.conf +++ b/src/fs/fs_test_lib_data.conf @@ -53,7 +53,7 @@ PORT = 43471 HOSTNAME = localhost #OPTIONS = -L DEBUG #DEBUG = YES -#PREFIX = valgrind --tool=memcheck --leak-check=yes +PREFIX = valgrind --tool=memcheck --leak-check=yes #BINARY = /home/grothoff/bin/gnunet-service-fs #PREFIX = xterm -e gdb -x cmd --args diff --git a/src/fs/gnunet-service-fs.c b/src/fs/gnunet-service-fs.c index 418ed459a..b8184262f 100644 --- a/src/fs/gnunet-service-fs.c +++ b/src/fs/gnunet-service-fs.c @@ -28,8 +28,7 @@ * TODO: * - have non-zero preference / priority for requests we initiate! * - implement hot-path routing decision procedure - * - implement: bound_priority, test_load_too_high, validate_nblock - * - add content migration support (forward from migration list) + * - implement: bound_priority, test_load_too_high * - statistics */ #include "platform.h" @@ -585,11 +584,22 @@ struct MigrationReadyBlock */ struct GNUNET_TIME_Absolute expiration; + /** + * Peers we would consider forwarding this + * block to. Zero for empty entries. + */ + GNUNET_PEER_Id target_list[MIGRATION_LIST_SIZE]; + /** * Size of the block. */ size_t size; + /** + * Number of targets already used. + */ + unsigned int used_targets; + /** * Type of the block. */ @@ -684,6 +694,25 @@ static unsigned int mig_size; */ static int active_migration; + +/** + * Transmit messages by copying it to the target buffer + * "buf". "buf" will be NULL and "size" zero if the socket was closed + * for writing in the meantime. In that case, do nothing + * (the disconnect or shutdown handler will take care of the rest). + * If we were able to transmit messages and there are still more + * pending, ask core again for further calls to this function. + * + * @param cls closure, pointer to the 'struct ConnectedPeer*' + * @param size number of bytes available in buf + * @param buf where the callee should write the message + * @return number of bytes written to buf + */ +static size_t +transmit_to_peer (void *cls, + size_t size, void *buf); + + /* ******************* clean up functions ************************ */ @@ -698,16 +727,38 @@ delete_migration_block (struct MigrationReadyBlock *mb) GNUNET_CONTAINER_DLL_remove (mig_head, mig_tail, mb); + GNUNET_PEER_decrement_rcs (mb->target_list, + MIGRATION_LIST_SIZE); mig_size--; GNUNET_free (mb); } +/** + * Compare the distance of two peers to a key. + * + * @param key key + * @param p1 first peer + * @param p2 second peer + * @return GNUNET_YES if P1 is closer to key than P2 + */ +static int +is_closer (const GNUNET_HashCode *key, + const struct GNUNET_PeerIdentity *p1, + const struct GNUNET_PeerIdentity *p2) +{ + return GNUNET_CRYPTO_hash_xorcmp (&p1->hashPubKey, + &p2->hashPubKey, + key); +} + + /** * Consider migrating content to a given peer. * - * @param cls not used - * @param key ID of the peer (not used) + * @param cls 'struct MigrationReadyBlock*' to select + * targets for (or NULL for none) + * @param key ID of the peer * @param value 'struct ConnectedPeer' of the peer * @return GNUNET_YES (always continue iteration)2 */ @@ -716,17 +767,92 @@ consider_migration (void *cls, const GNUNET_HashCode *key, void *value) { + struct MigrationReadyBlock *mb = cls; struct ConnectedPeer *cp = value; + struct MigrationReadyBlock *pos; + struct GNUNET_PeerIdentity cppid; + struct GNUNET_PeerIdentity otherpid; + struct GNUNET_PeerIdentity worstpid; + size_t msize; + unsigned int i; + unsigned int repl; + /* consider 'cp' as a migration target for mb */ + if (mb != NULL) + { + GNUNET_PEER_resolve (cp->pid, + &cppid); + repl = MIGRATION_LIST_SIZE; + for (i=0;itarget_list[i] == 0) + { + mb->target_list[i] = cp->pid; + GNUNET_PEER_change_rc (mb->target_list[i], 1); + repl = MIGRATION_LIST_SIZE; + break; + } + GNUNET_PEER_resolve (mb->target_list[i], + &otherpid); + if ( (repl == MIGRATION_LIST_SIZE) && + is_closer (&mb->query, + &cppid, + &otherpid)) + { + repl = i; + worstpid = otherpid; + } + else if ( (repl != MIGRATION_LIST_SIZE) && + (is_closer (&mb->query, + &worstpid, + &otherpid) ) ) + { + repl = i; + worstpid = otherpid; + } + } + if (repl != MIGRATION_LIST_SIZE) + { + GNUNET_PEER_change_rc (mb->target_list[repl], -1); + mb->target_list[repl] = cp->pid; + GNUNET_PEER_change_rc (mb->target_list[repl], 1); + } + } + + /* consider scheduling transmission to cp for content migration */ if (cp->cth != NULL) - return GNUNET_YES; /* or what? */ - /* FIXME: not implemented! */ + return GNUNET_YES; + msize = 0; + pos = mig_head; + while (pos != NULL) + { + for (i=0;ipid == pos->target_list[i]) + { + if (msize == 0) + msize = pos->size; + else + msize = GNUNET_MIN (msize, + pos->size); + break; + } + } + pos = pos->next; + } + if (msize == 0) + return GNUNET_YES; /* no content available */ + cp->cth + = GNUNET_CORE_notify_transmit_ready (core, + 0, GNUNET_TIME_UNIT_FOREVER_REL, + (const struct GNUNET_PeerIdentity*) key, + msize + sizeof (struct PutMessage), + &transmit_to_peer, + cp); return GNUNET_YES; } - - /** * Task that is run periodically to obtain blocks for content * migration @@ -739,6 +865,32 @@ gather_migration_blocks (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc); +/** + * If the migration task is not currently running, consider + * (re)scheduling it with the appropriate delay. + */ +static void +consider_migration_gathering () +{ + struct GNUNET_TIME_Relative delay; + + if (mig_qe != NULL) + return; + if (mig_task != GNUNET_SCHEDULER_NO_TASK) + return; + delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, + mig_size); + delay = GNUNET_TIME_relative_divide (GNUNET_TIME_UNIT_SECONDS, + MAX_MIGRATION_QUEUE); + delay = GNUNET_TIME_relative_max (delay, + min_migration_delay); + mig_task = GNUNET_SCHEDULER_add_delayed (sched, + delay, + &gather_migration_blocks, + NULL); +} + + /** * Process content offered for migration. * @@ -765,24 +917,23 @@ process_migration_content (void *cls, expiration, uint64_t uid) { struct MigrationReadyBlock *mb; - struct GNUNET_TIME_Relative delay; if (key == NULL) { mig_qe = NULL; if (mig_size < MAX_MIGRATION_QUEUE) - { - delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, - mig_size); - delay = GNUNET_TIME_relative_divide (GNUNET_TIME_UNIT_SECONDS, - MAX_MIGRATION_QUEUE); - delay = GNUNET_TIME_relative_max (delay, - min_migration_delay); - mig_task = GNUNET_SCHEDULER_add_delayed (sched, - delay, - &gather_migration_blocks, - NULL); - } + consider_migration_gathering (); + return; + } + if (type == GNUNET_BLOCK_TYPE_ONDEMAND) + { + if (GNUNET_OK != + GNUNET_FS_handle_on_demand_block (key, size, data, + type, priority, anonymity, + expiration, uid, + &process_migration_content, + NULL)) + GNUNET_DATASTORE_get_next (dsh, GNUNET_YES); return; } mb = GNUNET_malloc (sizeof (struct MigrationReadyBlock) + size); @@ -796,10 +947,9 @@ process_migration_content (void *cls, mig_tail, mb); mig_size++; - if (mig_size == 1) - GNUNET_CONTAINER_multihashmap_iterate (connected_peers, - &consider_migration, - NULL); + GNUNET_CONTAINER_multihashmap_iterate (connected_peers, + &consider_migration, + mb); GNUNET_DATASTORE_get_next (dsh, GNUNET_YES); } @@ -978,7 +1128,8 @@ peer_connect_handler (void *cls, uint32_t distance) { struct ConnectedPeer *cp; - + struct MigrationReadyBlock *pos; + cp = GNUNET_malloc (sizeof (struct ConnectedPeer)); cp->pid = GNUNET_PEER_intern (peer); GNUNET_break (GNUNET_OK == @@ -986,8 +1137,13 @@ peer_connect_handler (void *cls, &peer->hashPubKey, cp, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); - if (mig_size > 0) - (void) consider_migration (NULL, &peer->hashPubKey, cp); + + pos = mig_head; + while (NULL != pos) + { + (void) consider_migration (pos, &peer->hashPubKey, cp); + pos = pos->next; + } } @@ -1031,6 +1187,8 @@ peer_disconnect_handler (void *cls, struct ConnectedPeer *cp; struct PendingMessage *pm; unsigned int i; + struct MigrationReadyBlock *pos; + struct MigrationReadyBlock *next; GNUNET_CONTAINER_multihashmap_get_multiple (peer_request_map, &peer->hashPubKey, @@ -1052,6 +1210,31 @@ peer_disconnect_handler (void *cls, GNUNET_CONTAINER_multihashmap_remove (connected_peers, &peer->hashPubKey, cp)); + /* remove this peer from migration considerations; schedule + alternatives */ + next = mig_head; + while (NULL != (pos = next)) + { + next = pos->next; + for (i=0;itarget_list[i] == cp->pid) + { + GNUNET_PEER_change_rc (pos->target_list[i], -1); + pos->target_list[i] = 0; + if (pos->used_targets >= GNUNET_CONTAINER_multihashmap_size (connected_peers)) + { + delete_migration_block (pos); + consider_migration_gathering (); + } + GNUNET_CONTAINER_multihashmap_iterate (connected_peers, + &consider_migration, + pos); + break; + } + } + } + GNUNET_PEER_change_rc (cp->pid, -1); GNUNET_PEER_decrement_rcs (cp->last_p2p_replies, P2P_SUCCESS_LIST_SIZE); if (NULL != cp->cth) @@ -1231,7 +1414,7 @@ shutdown_task (void *cls, /** - * Transmit the given message by copying it to the target buffer + * Transmit messages by copying it to the target buffer * "buf". "buf" will be NULL and "size" zero if the socket was closed * for writing in the meantime. In that case, do nothing * (the disconnect or shutdown handler will take care of the rest). @@ -1251,7 +1434,11 @@ transmit_to_peer (void *cls, char *cbuf = buf; struct GNUNET_PeerIdentity pid; struct PendingMessage *pm; + struct MigrationReadyBlock *mb; + struct MigrationReadyBlock *next; + struct PutMessage migm; size_t msize; + unsigned int i; cp->cth = NULL; if (NULL == buf) @@ -1283,6 +1470,44 @@ transmit_to_peer (void *cls, &transmit_to_peer, cp); } + else + { + next = mig_head; + while (NULL != (mb = next)) + { + next = mb->next; + for (i=0;ipid == mb->target_list[i]) && + (mb->size + sizeof (migm) <= size) ) + { + GNUNET_PEER_change_rc (mb->target_list[i], -1); + mb->target_list[i] = 0; + mb->used_targets++; + migm.header.size = htons (sizeof (migm) + mb->size); + migm.header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT); + migm.type = htonl (mb->type); + migm.expiration = GNUNET_TIME_absolute_hton (mb->expiration); + memcpy (&cbuf[msize], &migm, sizeof (migm)); + msize += sizeof (migm); + size -= sizeof (migm); + memcpy (&cbuf[msize], &mb[1], mb->size); + msize += mb->size; + size -= mb->size; + break; + } + } + if ( (mb->used_targets >= MIGRATION_TARGET_COUNT) || + (mb->used_targets >= GNUNET_CONTAINER_multihashmap_size (connected_peers)) ) + { + delete_migration_block (mb); + consider_migration_gathering (); + } + } + consider_migration (NULL, + &pid.hashPubKey, + cp); + } #if DEBUG_FS GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Transmitting %u bytes to peer %u\n", @@ -1330,18 +1555,17 @@ add_to_pending_messages_for_peer (struct ConnectedPeer *cp, cp->pending_requests++; if (cp->pending_requests > MAX_QUEUE_PER_PEER) destroy_pending_message (cp->pending_messages_tail, 0); - if (cp->cth == NULL) - { - /* need to schedule transmission */ - GNUNET_PEER_resolve (cp->pid, &pid); - cp->cth = GNUNET_CORE_notify_transmit_ready (core, - cp->pending_messages_head->priority, - MAX_TRANSMIT_DELAY, - &pid, - cp->pending_messages_head->msize, - &transmit_to_peer, - cp); - } + GNUNET_PEER_resolve (cp->pid, &pid); + if (NULL != cp->cth) + GNUNET_CORE_notify_transmit_ready_cancel (cp->cth); + /* need to schedule transmission */ + cp->cth = GNUNET_CORE_notify_transmit_ready (core, + cp->pending_messages_head->priority, + MAX_TRANSMIT_DELAY, + &pid, + cp->pending_messages_head->msize, + &transmit_to_peer, + cp); if (cp->cth == NULL) { #if DEBUG_FS @@ -2095,9 +2319,14 @@ process_reply (void *cls, } else { + if (NULL != prq->sender->last_client_replies + [(prq->sender->last_client_replies_woff) % CS2P_SUCCESS_LIST_SIZE]) + GNUNET_SERVER_client_drop (prq->sender->last_client_replies + [(prq->sender->last_client_replies_woff) % CS2P_SUCCESS_LIST_SIZE]); prq->sender->last_client_replies [(prq->sender->last_client_replies_woff++) % CS2P_SUCCESS_LIST_SIZE] = pr->client_request_list->client_list->client; + GNUNET_SERVER_client_keep (pr->client_request_list->client_list->client); } } GNUNET_CRYPTO_hash (prq->data, @@ -2255,12 +2484,10 @@ process_reply (void *cls, memcpy (&pm[1], prq->data, prq->size); add_to_pending_messages_for_peer (cp, reply, pr); } - // FIXME: implement hot-path routing statistics keeping! return GNUNET_YES; } - /** * Continuation called to notify client about result of the * operation. @@ -2586,18 +2813,13 @@ process_local_reply (void *cls, GNUNET_DATASTORE_get_next (dsh, GNUNET_YES); return; } - prq.type = type; - prq.priority = priority; - process_reply (&prq, key, pr); - if ( (type == GNUNET_BLOCK_TYPE_DBLOCK) || (type == GNUNET_BLOCK_TYPE_IBLOCK) ) { if (pr->qe != NULL) GNUNET_DATASTORE_get_next (dsh, GNUNET_NO); - return; } - if ( (pr->client_request_list == NULL) && + else if ( (pr->client_request_list == NULL) && ( (GNUNET_YES == test_load_too_high()) || (pr->results_found > 5 + 2 * pr->priority) ) ) { @@ -2611,10 +2833,12 @@ process_local_reply (void *cls, GNUNET_NO); if (pr->qe != NULL) GNUNET_DATASTORE_get_next (dsh, GNUNET_NO); - return; } - if (pr->qe != NULL) + else if (pr->qe != NULL) GNUNET_DATASTORE_get_next (dsh, GNUNET_YES); + prq.type = type; + prq.priority = priority; + process_reply (&prq, key, pr); } @@ -3221,11 +3445,7 @@ main_init (struct GNUNET_SCHEDULER_Handle *s, } /* FIXME: distinguish between sending and storing in options? */ if (active_migration) - { - mig_task = GNUNET_SCHEDULER_add_now (sched, - &gather_migration_blocks, - NULL); - } + consider_migration_gathering (); GNUNET_SERVER_disconnect_notify (server, &handle_client_disconnect, NULL); -- 2.25.1