X-Git-Url: https://git.librecmc.org/?a=blobdiff_plain;f=src%2Ffs%2Fgnunet-service-fs.c;h=06ac91c73d5f1a21f9b79d59f6454801106b0df7;hb=1f09f4f7716db5939ec1c9a278b5661616dd72d6;hp=610c6e6dd4ef5dff877afd5e23f55543a0f10bff;hpb=7d2cbe5bd19ec95ada5a28a0134185ef29007b69;p=oweals%2Fgnunet.git diff --git a/src/fs/gnunet-service-fs.c b/src/fs/gnunet-service-fs.c index 610c6e6dd..06ac91c73 100644 --- a/src/fs/gnunet-service-fs.c +++ b/src/fs/gnunet-service-fs.c @@ -1,6 +1,6 @@ /* This file is part of GNUnet. - (C) 2009, 2010 Christian Grothoff (and other contributing authors) + (C) 2009, 2010, 2011 Christian Grothoff (and other contributing authors) GNUnet is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published @@ -23,12 +23,8 @@ * @brief gnunet anonymity protocol implementation * @author Christian Grothoff * - * TODO: - * - track per-peer request latency (using new load API) - * - consider more precise latency estimation (per-peer & request) -- again load API? - * - implement test_load_too_high, make decision priority-based, implement forwarding, etc. - * - introduce random latency in processing - * - more statistics + * To use: + * - consider re-issue GSF_dht_lookup_ after non-DHT reply received */ #include "platform.h" #include @@ -41,17 +37,17 @@ #include "gnunet_protocols.h" #include "gnunet_signatures.h" #include "gnunet_statistics_service.h" +#include "gnunet_transport_service.h" #include "gnunet_util_lib.h" +#include "gnunet-service-fs_cp.h" #include "gnunet-service-fs_indexing.h" +#include "gnunet-service-fs_lc.h" +#include "gnunet-service-fs_pe.h" +#include "gnunet-service-fs_pr.h" +#include "gnunet-service-fs_push.h" +#include "gnunet-service-fs_put.h" #include "fs.h" -#define DEBUG_FS GNUNET_NO - -/** - * Maximum number of outgoing messages we queue per peer. - */ -#define MAX_QUEUE_PER_PEER 16 - /** * Size for the hash map for DHT requests from the FS * service. Should be about the number of concurrent @@ -59,716 +55,125 @@ */ #define FS_DHT_HT_SIZE 1024 -/** - * How often do we flush trust values to disk? - */ -#define TRUST_FLUSH_FREQ GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 5) - -/** - * Inverse of the probability that we will submit the same query - * to the same peer again. If the same peer already got the query - * repeatedly recently, the probability is multiplied by the inverse - * of this number each time. Note that we only try about every TTL_DECREMENT/2 - * plus MAX_CORK_DELAY (so roughly every 3.5s). - */ -#define RETRY_PROBABILITY_INV 3 - -/** - * What is the maximum delay for a P2P FS message (in our interaction - * with core)? FS-internal delays are another story. The value is - * chosen based on the 32k block size. Given that peers typcially - * have at least 1 kb/s bandwidth, 45s waits give us a chance to - * transmit one message even to the lowest-bandwidth peers. - */ -#define MAX_TRANSMIT_DELAY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 45) - -/** - * Maximum number of requests (from other peers) that we're - * willing to have pending at any given point in time. - */ -static unsigned long long max_pending_requests = (32 * 1024); - - -/** - * Information we keep for each pending reply. The - * actual message follows at the end of this struct. - */ -struct PendingMessage; - -/** - * Function called upon completion of a transmission. - * - * @param cls closure - * @param pid ID of receiving peer, 0 on transmission error - */ -typedef void (*TransmissionContinuation)(void * cls, - GNUNET_PEER_Id tpid); - - -/** - * Information we keep for each pending message (GET/PUT). The - * actual message follows at the end of this struct. - */ -struct PendingMessage -{ - /** - * This is a doubly-linked list of messages to the same peer. - */ - struct PendingMessage *next; - - /** - * This is a doubly-linked list of messages to the same peer. - */ - struct PendingMessage *prev; - - /** - * Entry in pending message list for this pending message. - */ - struct PendingMessageList *pml; - - /** - * Function to call immediately once we have transmitted this - * message. - */ - TransmissionContinuation cont; - - /** - * Closure for cont. - */ - void *cont_cls; - - /** - * Size of the reply; actual reply message follows - * at the end of this struct. - */ - size_t msize; - - /** - * How important is this message for us? - */ - uint32_t priority; - -}; - - -/** - * Information about a peer that we are connected to. - * We track data that is useful for determining which - * peers should receive our requests. We also keep - * a list of messages to transmit to this peer. - */ -struct ConnectedPeer -{ - - /** - * List of the last clients for which this peer successfully - * answered a query. - */ - struct GNUNET_SERVER_Client *last_client_replies[CS2P_SUCCESS_LIST_SIZE]; - - /** - * List of the last PIDs for which - * this peer successfully answered a query; - * We use 0 to indicate no successful reply. - */ - GNUNET_PEER_Id last_p2p_replies[P2P_SUCCESS_LIST_SIZE]; - - /** - * Average delay between sending the peer a request and - * getting a reply (only calculated over the requests for - * which we actually got a reply). Calculated - * as a moving average: new_delay = ((n-1)*last_delay+curr_delay) / n - */ - struct GNUNET_TIME_Relative avg_delay; - - /** - * Point in time until which this peer does not want us to migrate content - * to it. - */ - struct GNUNET_TIME_Absolute migration_blocked; - - /** - * Time until when we blocked this peer from migrating - * data to us. - */ - struct GNUNET_TIME_Absolute last_migration_block; - - /** - * Handle for an active request for transmission to this - * peer, or NULL. - */ - struct GNUNET_CORE_TransmitHandle *cth; - - /** - * Messages (replies, queries, content migration) we would like to - * send to this peer in the near future. Sorted by priority, head. - */ - struct PendingMessage *pending_messages_head; - - /** - * Messages (replies, queries, content migration) we would like to - * send to this peer in the near future. Sorted by priority, tail. - */ - struct PendingMessage *pending_messages_tail; - - /** - * Average priority of successful replies. Calculated - * as a moving average: new_avg = ((n-1)*last_avg+curr_prio) / n - */ - double avg_priority; - - /** - * Increase in traffic preference still to be submitted - * to the core service for this peer. - */ - uint64_t inc_preference; - - /** - * Trust rating for this peer - */ - uint32_t trust; - - /** - * Trust rating for this peer on disk. - */ - uint32_t disk_trust; - - /** - * The peer's identity. - */ - GNUNET_PEER_Id pid; - - /** - * Size of the linked list of 'pending_messages'. - */ - unsigned int pending_requests; - - /** - * Which offset in "last_p2p_replies" will be updated next? - * (we go round-robin). - */ - unsigned int last_p2p_replies_woff; - - /** - * Which offset in "last_client_replies" will be updated next? - * (we go round-robin). - */ - unsigned int last_client_replies_woff; - -}; - - -/** - * Information we keep for each pending request. We should try to - * keep this struct as small as possible since its memory consumption - * is key to how many requests we can have pending at once. - */ -struct PendingRequest; - - -/** - * Doubly-linked list of requests we are performing - * on behalf of the same client. - */ -struct ClientRequestList -{ - - /** - * This is a doubly-linked list. - */ - struct ClientRequestList *next; - - /** - * This is a doubly-linked list. - */ - struct ClientRequestList *prev; - - /** - * Request this entry represents. - */ - struct PendingRequest *req; - - /** - * Client list this request belongs to. - */ - struct ClientList *client_list; - -}; - - -/** - * Replies to be transmitted to the client. The actual - * response message is allocated after this struct. - */ -struct ClientResponseMessage -{ - /** - * This is a doubly-linked list. - */ - struct ClientResponseMessage *next; - - /** - * This is a doubly-linked list. - */ - struct ClientResponseMessage *prev; - - /** - * Client list entry this response belongs to. - */ - struct ClientList *client_list; - - /** - * Number of bytes in the response. - */ - size_t msize; -}; - - -/** - * Linked list of clients we are performing requests - * for right now. - */ -struct ClientList -{ - /** - * This is a linked list. - */ - struct ClientList *next; - - /** - * ID of a client making a request, NULL if this entry is for a - * peer. - */ - struct GNUNET_SERVER_Client *client; - - /** - * Head of list of requests performed on behalf - * of this client right now. - */ - struct ClientRequestList *rl_head; - - /** - * Tail of list of requests performed on behalf - * of this client right now. - */ - struct ClientRequestList *rl_tail; - - /** - * Head of linked list of responses. - */ - struct ClientResponseMessage *res_head; - - /** - * Tail of linked list of responses. - */ - struct ClientResponseMessage *res_tail; - - /** - * Context for sending replies. - */ - struct GNUNET_CONNECTION_TransmitHandle *th; - -}; - - -/** - * Doubly-linked list of messages we are performing - * due to a pending request. - */ -struct PendingMessageList -{ - - /** - * This is a doubly-linked list of messages on behalf of the same request. - */ - struct PendingMessageList *next; - - /** - * This is a doubly-linked list of messages on behalf of the same request. - */ - struct PendingMessageList *prev; - - /** - * Message this entry represents. - */ - struct PendingMessage *pm; - - /** - * Request this entry belongs to. - */ - struct PendingRequest *req; - - /** - * Peer this message is targeted for. - */ - struct ConnectedPeer *target; - -}; - - -/** - * Information we keep for each pending request. We should try to - * keep this struct as small as possible since its memory consumption - * is key to how many requests we can have pending at once. - */ -struct PendingRequest -{ - - /** - * If this request was made by a client, this is our entry in the - * client request list; otherwise NULL. - */ - struct ClientRequestList *client_request_list; - - /** - * Entry of peer responsible for this entry (if this request - * was made by a peer). - */ - struct ConnectedPeer *cp; - - /** - * If this is a namespace query, pointer to the hash of the public - * key of the namespace; otherwise NULL. Pointer will be to the - * end of this struct (so no need to free it). - */ - const GNUNET_HashCode *namespace; - - /** - * Bloomfilter we use to filter out replies that we don't care about - * (anymore). NULL as long as we are interested in all replies. - */ - struct GNUNET_CONTAINER_BloomFilter *bf; - - /** - * Context of our GNUNET_CORE_peer_change_preference call. - */ - struct GNUNET_CORE_InformationRequestContext *irc; - - /** - * Reference to DHT get operation for this request (or NULL). - */ - struct GNUNET_DHT_GetHandle *dht_get; - - /** - * Hash code of all replies that we have seen so far (only valid - * if client is not NULL since we only track replies like this for - * our own clients). - */ - GNUNET_HashCode *replies_seen; - - /** - * Node in the heap representing this entry; NULL - * if we have no heap node. - */ - struct GNUNET_CONTAINER_HeapNode *hnode; - - /** - * Head of list of messages being performed on behalf of this - * request. - */ - struct PendingMessageList *pending_head; - - /** - * Tail of list of messages being performed on behalf of this - * request. - */ - struct PendingMessageList *pending_tail; - - /** - * When did we first see this request (form this peer), or, if our - * client is initiating, when did we last initiate a search? - */ - struct GNUNET_TIME_Absolute start_time; - - /** - * The query that this request is for. - */ - GNUNET_HashCode query; - - /** - * The task responsible for transmitting queries - * for this request. - */ - GNUNET_SCHEDULER_TaskIdentifier task; - - /** - * (Interned) Peer identifier that identifies a preferred target - * for requests. - */ - GNUNET_PEER_Id target_pid; - - /** - * (Interned) Peer identifiers of peers that have already - * received our query for this content. - */ - GNUNET_PEER_Id *used_pids; - - /** - * Our entry in the queue (non-NULL while we wait for our - * turn to interact with the local database). - */ - struct GNUNET_DATASTORE_QueueEntry *qe; - - /** - * Size of the 'bf' (in bytes). - */ - size_t bf_size; - - /** - * Desired anonymity level; only valid for requests from a local client. - */ - uint32_t anonymity_level; - - /** - * How many entries in "used_pids" are actually valid? - */ - unsigned int used_pids_off; - - /** - * How long is the "used_pids" array? - */ - unsigned int used_pids_size; - - /** - * Number of results found for this request. - */ - unsigned int results_found; - - /** - * How many entries in "replies_seen" are actually valid? - */ - unsigned int replies_seen_off; - - /** - * How long is the "replies_seen" array? - */ - unsigned int replies_seen_size; - - /** - * Priority with which this request was made. If one of our clients - * made the request, then this is the current priority that we are - * using when initiating the request. This value is used when - * we decide to reward other peers with trust for providing a reply. - */ - uint32_t priority; - - /** - * Priority points left for us to spend when forwarding this request - * to other peers. - */ - uint32_t remaining_priority; - - /** - * Number to mingle hashes for bloom-filter tests with. - */ - int32_t mingle; - - /** - * TTL with which we saw this request (or, if we initiated, TTL that - * we used for the request). - */ - int32_t ttl; - - /** - * Type of the content that this request is for. - */ - enum GNUNET_BLOCK_Type type; - - /** - * Remove this request after transmission of the current response. - */ - int16_t do_remove; - - /** - * GNUNET_YES if we should not forward this request to other peers. - */ - int16_t local_only; - -}; - /** - * Block that is ready for migration to other peers. Actual data is at the end of the block. + * How quickly do we age cover traffic? At the given + * time interval, remaining cover traffic counters are + * decremented by 1/16th. */ -struct MigrationReadyBlock -{ +#define COVER_AGE_FREQUENCY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5) - /** - * This is a doubly-linked list. - */ - struct MigrationReadyBlock *next; - - /** - * This is a doubly-linked list. - */ - struct MigrationReadyBlock *prev; - - /** - * Query for the block. - */ - GNUNET_HashCode query; - - /** - * When does this block expire? - */ - struct GNUNET_TIME_Absolute expiration; - - /** - * Peers we would consider forwarding this - * block to. Zero for empty entries. - */ - GNUNET_PEER_Id target_list[MIGRATION_LIST_SIZE]; - - /** - * Size of the block. - */ - size_t size; - - /** - * Number of targets already used. - */ - unsigned int used_targets; - - /** - * Type of the block. - */ - enum GNUNET_BLOCK_Type type; -}; +/* ****************************** globals ****************************** */ /** * Our connection to the datastore. */ -static struct GNUNET_DATASTORE_Handle *dsh; - -/** - * Our block context. - */ -static struct GNUNET_BLOCK_Context *block_ctx; +struct GNUNET_DATASTORE_Handle *GSF_dsh; /** - * Our block configuration. + * Our configuration. */ -static struct GNUNET_CONFIGURATION_Handle *block_cfg; +const struct GNUNET_CONFIGURATION_Handle *GSF_cfg; /** - * Our scheduler. + * Handle for reporting statistics. */ -static struct GNUNET_SCHEDULER_Handle *sched; +struct GNUNET_STATISTICS_Handle *GSF_stats; /** - * Our configuration. + * Handle for DHT operations. */ -static const struct GNUNET_CONFIGURATION_Handle *cfg; +struct GNUNET_DHT_Handle *GSF_dht; /** - * Map of peer identifiers to "struct ConnectedPeer" (for that peer). + * How long do requests typically stay in the routing table? */ -static struct GNUNET_CONTAINER_MultiHashMap *connected_peers; +struct GNUNET_LOAD_Value *GSF_rt_entry_lifetime; /** - * Map of peer identifiers to "struct PendingRequest" (for that peer). + * Running average of the observed latency to other peers (round trip). + * Initialized to 5s as the initial default. */ -static struct GNUNET_CONTAINER_MultiHashMap *peer_request_map; +struct GNUNET_TIME_Relative GSF_avg_latency = { 500 }; /** - * Map of query identifiers to "struct PendingRequest" (for that query). + * Typical priorities we're seeing from other peers right now. Since + * most priorities will be zero, this value is the weighted average of + * non-zero priorities seen "recently". In order to ensure that new + * values do not dramatically change the ratio, values are first + * "capped" to a reasonable range (+N of the current value) and then + * averaged into the existing value by a ratio of 1:N. Hence + * receiving the largest possible priority can still only raise our + * "current_priorities" by at most 1. */ -static struct GNUNET_CONTAINER_MultiHashMap *query_request_map; +double GSF_current_priorities; /** - * Heap with the request that will expire next at the top. Contains - * pointers of type "struct PendingRequest*"; these will *also* be - * aliased from the "requests_by_peer" data structures and the - * "requests_by_query" table. Note that requests from our clients - * don't expire and are thus NOT in the "requests_by_expiration" - * (or the "requests_by_peer" tables). + * How many query messages have we received 'recently' that + * have not yet been claimed as cover traffic? */ -static struct GNUNET_CONTAINER_Heap *requests_by_expiration_heap; +unsigned int GSF_cover_query_count; /** - * Handle for reporting statistics. + * How many content messages have we received 'recently' that + * have not yet been claimed as cover traffic? */ -static struct GNUNET_STATISTICS_Handle *stats; +unsigned int GSF_cover_content_count; /** - * Linked list of clients we are currently processing requests for. + * Our block context. */ -static struct ClientList *client_list; +struct GNUNET_BLOCK_Context *GSF_block_ctx; /** * Pointer to handle to the core service (points to NULL until we've * connected to it). */ -static struct GNUNET_CORE_Handle *core; - -/** - * Head of linked list of blocks that can be migrated. - */ -static struct MigrationReadyBlock *mig_head; - -/** - * Tail of linked list of blocks that can be migrated. - */ -static struct MigrationReadyBlock *mig_tail; - -/** - * Request to datastore for migration (or NULL). - */ -static struct GNUNET_DATASTORE_QueueEntry *mig_qe; - -/** - * Where do we store trust information? - */ -static char *trustDirectory; +struct GNUNET_CORE_Handle *GSF_core; /** - * ID of task that collects blocks for migration. + * Are we introducing randomized delays for better anonymity? */ -static GNUNET_SCHEDULER_TaskIdentifier mig_task; +int GSF_enable_randomized_delays; -/** - * What is the maximum frequency at which we are allowed to - * poll the datastore for migration content? - */ -static struct GNUNET_TIME_Relative min_migration_delay; - -/** - * Handle for DHT operations. - */ -static struct GNUNET_DHT_Handle *dht_handle; +/* ***************************** locals ******************************* */ /** - * Size of the doubly-linked list of migration blocks. + * Configuration for block library. */ -static unsigned int mig_size; +static struct GNUNET_CONFIGURATION_Handle *block_cfg; /** - * Are we allowed to migrate content to this peer. + * ID of our task that we use to age the cover counters. */ -static int active_migration; +static GNUNET_SCHEDULER_TaskIdentifier cover_age_task; /** - * Typical priorities we're seeing from other peers right now. Since - * most priorities will be zero, this value is the weighted average of - * non-zero priorities seen "recently". In order to ensure that new - * values do not dramatically change the ratio, values are first - * "capped" to a reasonable range (+N of the current value) and then - * averaged into the existing value by a ratio of 1:N. Hence - * receiving the largest possible priority can still only raise our - * "current_priorities" by at most 1. + * Datastore 'GET' load tracking. */ -static double current_priorities; +static struct GNUNET_LOAD_Value *datastore_get_load; /** - * Datastore 'GET' load tracking. + * Identity of this peer. */ -static struct GNUNET_LOAD_Value *datastore_get_load; +static struct GNUNET_PeerIdentity my_id; /** - * Datastore 'PUT' load tracking. + * Task that periodically ages our cover traffic statistics. + * + * @param cls unused closure + * @param tc task context */ -static struct GNUNET_LOAD_Value *datastore_put_load; +static void +age_cover_counters (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) +{ + GSF_cover_content_count = (GSF_cover_content_count * 15) / 16; + GSF_cover_query_count = (GSF_cover_query_count * 15) / 16; + cover_age_task = + GNUNET_SCHEDULER_add_delayed (COVER_AGE_FREQUENCY, &age_cover_counters, + NULL); +} /** @@ -777,3165 +182,414 @@ static struct GNUNET_LOAD_Value *datastore_put_load; * * @param start time when the datastore request was issued */ -static void -update_datastore_delays (struct GNUNET_TIME_Absolute start) +void +GSF_update_datastore_delay_ (struct GNUNET_TIME_Absolute start) { struct GNUNET_TIME_Relative delay; delay = GNUNET_TIME_absolute_get_duration (start); - GNUNET_LOAD_update (datastore_get_load, - delay.value); + GNUNET_LOAD_update (datastore_get_load, delay.rel_value); } /** - * Get the filename under which we would store the GNUNET_HELLO_Message - * for the given host and protocol. - * @return filename of the form DIRECTORY/HOSTID + * Test if the DATABASE (GET) load on this peer is too high + * to even consider processing the query at + * all. + * + * @return GNUNET_YES if the load is too high to do anything (load high) + * GNUNET_NO to process normally (load normal) + * GNUNET_SYSERR to process for free (load low) */ -static char * -get_trust_filename (const struct GNUNET_PeerIdentity *id) +int +GSF_test_get_load_too_high_ (uint32_t priority) { - struct GNUNET_CRYPTO_HashAsciiEncoded fil; - char *fn; + double ld; - GNUNET_CRYPTO_hash_to_enc (&id->hashPubKey, &fil); - GNUNET_asprintf (&fn, "%s%s%s", trustDirectory, DIR_SEPARATOR_STR, &fil); - return fn; + ld = GNUNET_LOAD_get_load (datastore_get_load); + if (ld < 1) + return GNUNET_SYSERR; + if (ld <= priority) + return GNUNET_NO; + return GNUNET_YES; } - /** - * Transmit messages by copying it to the target buffer - * "buf". "buf" will be NULL and "size" zero if the socket was closed - * for writing in the meantime. In that case, do nothing - * (the disconnect or shutdown handler will take care of the rest). - * If we were able to transmit messages and there are still more - * pending, ask core again for further calls to this function. + * We've received peer performance information. Update + * our running average for the P2P latency. * - * @param cls closure, pointer to the 'struct ConnectedPeer*' - * @param size number of bytes available in buf - * @param buf where the callee should write the message - * @return number of bytes written to buf + * @param atsi performance information + * @param atsi_count number of 'atsi' records */ -static size_t -transmit_to_peer (void *cls, - size_t size, void *buf); +static void +update_latencies (const struct GNUNET_ATS_Information *atsi, + unsigned int atsi_count) +{ + unsigned int i; + for (i = 0; i < atsi_count; i++) + { + if (ntohl (atsi[i].type) == GNUNET_ATS_QUALITY_NET_DELAY) + { + GSF_avg_latency.rel_value = + (GSF_avg_latency.rel_value * 31 + + GNUNET_MIN (5000, ntohl (atsi[i].value))) / 32; + GNUNET_STATISTICS_set (GSF_stats, + gettext_noop + ("# running average P2P latency (ms)"), + GSF_avg_latency.rel_value, GNUNET_NO); + break; + } + } +} -/* ******************* clean up functions ************************ */ /** - * Delete the given migration block. + * Handle P2P "PUT" message. * - * @param mb block to delete + * @param cls closure, always NULL + * @param other the other peer involved (sender or receiver, NULL + * for loopback messages where we are both sender and receiver) + * @param message the actual message + * @param atsi performance information + * @param atsi_count number of records in 'atsi' + * @return GNUNET_OK to keep the connection open, + * GNUNET_SYSERR to close it (signal serious error) */ -static void -delete_migration_block (struct MigrationReadyBlock *mb) +static int +handle_p2p_put (void *cls, const struct GNUNET_PeerIdentity *other, + const struct GNUNET_MessageHeader *message, + const struct GNUNET_ATS_Information *atsi, + unsigned int atsi_count) { - GNUNET_CONTAINER_DLL_remove (mig_head, - mig_tail, - mb); - GNUNET_PEER_decrement_rcs (mb->target_list, - MIGRATION_LIST_SIZE); - mig_size--; - GNUNET_free (mb); + struct GSF_ConnectedPeer *cp; + + cp = GSF_peer_get_ (other); + if (NULL == cp) + { + GNUNET_break (0); + return GNUNET_OK; + } + GSF_cover_content_count++; + update_latencies (atsi, atsi_count); + return GSF_handle_p2p_content_ (cp, message); } /** - * Compare the distance of two peers to a key. + * We have a new request, consider forwarding it to the given + * peer. * - * @param key key - * @param p1 first peer - * @param p2 second peer - * @return GNUNET_YES if P1 is closer to key than P2 + * @param cls the 'struct GSF_PendingRequest' + * @param peer identity of the peer + * @param cp handle to the connected peer record + * @param ppd peer performance data */ -static int -is_closer (const GNUNET_HashCode *key, - const struct GNUNET_PeerIdentity *p1, - const struct GNUNET_PeerIdentity *p2) -{ - return GNUNET_CRYPTO_hash_xorcmp (&p1->hashPubKey, - &p2->hashPubKey, - key); +static void +consider_request_for_forwarding (void *cls, + const struct GNUNET_PeerIdentity *peer, + struct GSF_ConnectedPeer *cp, + const struct GSF_PeerPerformanceData *ppd) +{ + struct GSF_PendingRequest *pr = cls; + + if (GNUNET_YES != GSF_pending_request_test_target_ (pr, peer)) + { + GNUNET_STATISTICS_update (GSF_stats, + gettext_noop ("# Loopback routes suppressed"), 1, + GNUNET_NO); + return; + } + GSF_plan_add_ (cp, pr); } /** - * Consider migrating content to a given peer. + * Function to be called after we're done processing + * replies from the local lookup. If the result status + * code indicates that there may be more replies, plan + * forwarding the request. * - * @param cls 'struct MigrationReadyBlock*' to select - * targets for (or NULL for none) - * @param key ID of the peer - * @param value 'struct ConnectedPeer' of the peer - * @return GNUNET_YES (always continue iteration) + * @param cls closure (NULL) + * @param pr the pending request we were processing + * @param result final datastore lookup result */ -static int -consider_migration (void *cls, - const GNUNET_HashCode *key, - void *value) +static void +consider_forwarding (void *cls, struct GSF_PendingRequest *pr, + enum GNUNET_BLOCK_EvaluationResult result) { - struct MigrationReadyBlock *mb = cls; - struct ConnectedPeer *cp = value; - struct MigrationReadyBlock *pos; - struct GNUNET_PeerIdentity cppid; - struct GNUNET_PeerIdentity otherpid; - struct GNUNET_PeerIdentity worstpid; - size_t msize; - unsigned int i; - unsigned int repl; - - /* consider 'cp' as a migration target for mb */ - if (GNUNET_TIME_absolute_get_remaining (cp->migration_blocked).value > 0) - return GNUNET_YES; /* peer has requested no migration! */ - if (mb != NULL) - { - GNUNET_PEER_resolve (cp->pid, - &cppid); - repl = MIGRATION_LIST_SIZE; - for (i=0;itarget_list[i] == 0) - { - mb->target_list[i] = cp->pid; - GNUNET_PEER_change_rc (mb->target_list[i], 1); - repl = MIGRATION_LIST_SIZE; - break; - } - GNUNET_PEER_resolve (mb->target_list[i], - &otherpid); - if ( (repl == MIGRATION_LIST_SIZE) && - is_closer (&mb->query, - &cppid, - &otherpid)) - { - repl = i; - worstpid = otherpid; - } - else if ( (repl != MIGRATION_LIST_SIZE) && - (is_closer (&mb->query, - &worstpid, - &otherpid) ) ) - { - repl = i; - worstpid = otherpid; - } - } - if (repl != MIGRATION_LIST_SIZE) - { - GNUNET_PEER_change_rc (mb->target_list[repl], -1); - mb->target_list[repl] = cp->pid; - GNUNET_PEER_change_rc (mb->target_list[repl], 1); - } - } - - /* consider scheduling transmission to cp for content migration */ - if (cp->cth != NULL) - return GNUNET_YES; - msize = 0; - pos = mig_head; - while (pos != NULL) - { - for (i=0;ipid == pos->target_list[i]) - { - if (msize == 0) - msize = pos->size; - else - msize = GNUNET_MIN (msize, - pos->size); - break; - } - } - pos = pos->next; - } - if (msize == 0) - return GNUNET_YES; /* no content available */ -#if DEBUG_FS - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Trying to migrate at least %u bytes to peer `%s'\n", - msize, - GNUNET_h2s (key)); -#endif - cp->cth - = GNUNET_CORE_notify_transmit_ready (core, - 0, GNUNET_TIME_UNIT_FOREVER_REL, - (const struct GNUNET_PeerIdentity*) key, - msize + sizeof (struct PutMessage), - &transmit_to_peer, - cp); - return GNUNET_YES; + if (GNUNET_BLOCK_EVALUATION_OK_LAST == result) + return; /* we're done... */ + GSF_iterate_connected_peers_ (&consider_request_for_forwarding, pr); } /** - * Task that is run periodically to obtain blocks for content - * migration - * - * @param cls unused - * @param tc scheduler context (also unused) + * Handle P2P "GET" request. + * + * @param cls closure, always NULL + * @param other the other peer involved (sender or receiver, NULL + * for loopback messages where we are both sender and receiver) + * @param message the actual message + * @param atsi performance information + * @param atsi_count number of records in 'atsi' + * @return GNUNET_OK to keep the connection open, + * GNUNET_SYSERR to close it (signal serious error) */ -static void -gather_migration_blocks (void *cls, - const struct GNUNET_SCHEDULER_TaskContext *tc); +static int +handle_p2p_get (void *cls, const struct GNUNET_PeerIdentity *other, + const struct GNUNET_MessageHeader *message, + const struct GNUNET_ATS_Information *atsi, + unsigned int atsi_count) +{ + struct GSF_PendingRequest *pr; + + pr = GSF_handle_p2p_query_ (other, message); + if (NULL == pr) + return GNUNET_SYSERR; + GSF_pending_request_get_data_ (pr)->has_started = GNUNET_YES; + GSF_local_lookup_ (pr, &consider_forwarding, NULL); + update_latencies (atsi, atsi_count); + return GNUNET_OK; +} /** - * If the migration task is not currently running, consider - * (re)scheduling it with the appropriate delay. + * We're done with the local lookup, now consider + * P2P processing (depending on request options and + * result status). Also signal that we can now + * receive more request information from the client. + * + * @param cls the client doing the request ('struct GNUNET_SERVER_Client') + * @param pr the pending request we were processing + * @param result final datastore lookup result */ static void -consider_migration_gathering () +start_p2p_processing (void *cls, struct GSF_PendingRequest *pr, + enum GNUNET_BLOCK_EvaluationResult result) { - struct GNUNET_TIME_Relative delay; + struct GNUNET_SERVER_Client *client = cls; + struct GSF_PendingRequestData *prd; - if (dsh == NULL) - return; - if (mig_qe != NULL) - return; - if (mig_task != GNUNET_SCHEDULER_NO_TASK) + prd = GSF_pending_request_get_data_ (pr); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Finished database lookup for local request `%s' with result %d\n", + GNUNET_h2s (&prd->query), result); + GNUNET_SERVER_receive_done (client, GNUNET_OK); + if (GNUNET_BLOCK_EVALUATION_OK_LAST == result) + return; /* we're done, 'pr' was already destroyed... */ + if (0 != (GSF_PRO_LOCAL_ONLY & prd->options)) + { + GSF_pending_request_cancel_ (pr, GNUNET_YES); return; - delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, - mig_size); - delay = GNUNET_TIME_relative_divide (delay, - MAX_MIGRATION_QUEUE); - delay = GNUNET_TIME_relative_max (delay, - min_migration_delay); - mig_task = GNUNET_SCHEDULER_add_delayed (sched, - delay, - &gather_migration_blocks, - NULL); + } + GSF_dht_lookup_ (pr); + consider_forwarding (NULL, pr, result); } /** - * Process content offered for migration. + * Handle START_SEARCH-message (search request from client). * * @param cls closure - * @param key key for the content - * @param size number of bytes in data - * @param data content stored - * @param type type of the content - * @param priority priority of the content - * @param anonymity anonymity-level for the content - * @param expiration expiration time for the content - * @param uid unique identifier for the datum; - * maybe 0 if no unique identifier is available + * @param client identification of the client + * @param message the actual message */ static void -process_migration_content (void *cls, - const GNUNET_HashCode * key, - size_t size, - const void *data, - enum GNUNET_BLOCK_Type type, - uint32_t priority, - uint32_t anonymity, - struct GNUNET_TIME_Absolute - expiration, uint64_t uid) +handle_start_search (void *cls, struct GNUNET_SERVER_Client *client, + const struct GNUNET_MessageHeader *message) { - struct MigrationReadyBlock *mb; - - if (key == NULL) - { - mig_qe = NULL; - if (mig_size < MAX_MIGRATION_QUEUE) - consider_migration_gathering (); - return; - } - if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND) - { - if (GNUNET_OK != - GNUNET_FS_handle_on_demand_block (key, size, data, - type, priority, anonymity, - expiration, uid, - &process_migration_content, - NULL)) - { - GNUNET_DATASTORE_get_next (dsh, GNUNET_YES); - } - return; - } -#if DEBUG_FS - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Retrieved block `%s' of type %u for migration\n", - GNUNET_h2s (key), - type); -#endif - mb = GNUNET_malloc (sizeof (struct MigrationReadyBlock) + size); - mb->query = *key; - mb->expiration = expiration; - mb->size = size; - mb->type = type; - memcpy (&mb[1], data, size); - GNUNET_CONTAINER_DLL_insert_after (mig_head, - mig_tail, - mig_tail, - mb); - mig_size++; - GNUNET_CONTAINER_multihashmap_iterate (connected_peers, - &consider_migration, - mb); - GNUNET_DATASTORE_get_next (dsh, GNUNET_YES); + struct GSF_PendingRequest *pr; + int ret; + + pr = NULL; + ret = GSF_local_client_start_search_handler_ (client, message, &pr); + switch (ret) + { + case GNUNET_SYSERR: + GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); + break; + case GNUNET_NO: + GNUNET_SERVER_receive_done (client, GNUNET_OK); + break; + case GNUNET_YES: + GSF_pending_request_get_data_ (pr)->has_started = GNUNET_YES; + GSF_local_lookup_ (pr, &start_p2p_processing, client); + break; + default: + GNUNET_assert (0); + } } /** - * Task that is run periodically to obtain blocks for content - * migration - * + * Task run during shutdown. + * * @param cls unused - * @param tc scheduler context (also unused) + * @param tc unused */ static void -gather_migration_blocks (void *cls, - const struct GNUNET_SCHEDULER_TaskContext *tc) -{ - mig_task = GNUNET_SCHEDULER_NO_TASK; - if (dsh != NULL) - { - mig_qe = GNUNET_DATASTORE_get_random (dsh, 0, -1, - GNUNET_TIME_UNIT_FOREVER_REL, - &process_migration_content, NULL); - GNUNET_assert (mig_qe != NULL); - } +shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) +{ + if (NULL != GSF_core) + { + GNUNET_CORE_disconnect (GSF_core); + GSF_core = NULL; + } + GSF_put_done_ (); + GSF_push_done_ (); + GSF_pending_request_done_ (); + GSF_plan_done (); + GSF_connected_peer_done_ (); + GNUNET_DATASTORE_disconnect (GSF_dsh, GNUNET_NO); + GSF_dsh = NULL; + GNUNET_DHT_disconnect (GSF_dht); + GSF_dht = NULL; + GNUNET_BLOCK_context_destroy (GSF_block_ctx); + GSF_block_ctx = NULL; + GNUNET_CONFIGURATION_destroy (block_cfg); + block_cfg = NULL; + GNUNET_STATISTICS_destroy (GSF_stats, GNUNET_NO); + GSF_stats = NULL; + if (GNUNET_SCHEDULER_NO_TASK != cover_age_task) + { + GNUNET_SCHEDULER_cancel (cover_age_task); + cover_age_task = GNUNET_SCHEDULER_NO_TASK; + } + GNUNET_FS_indexing_done (); + GNUNET_LOAD_value_free (datastore_get_load); + datastore_get_load = NULL; + GNUNET_LOAD_value_free (GSF_rt_entry_lifetime); + GSF_rt_entry_lifetime = NULL; } /** - * We're done with a particular message list entry. - * Free all associated resources. - * - * @param pml entry to destroy + * Function called for each pending request whenever a new + * peer connects, giving us a chance to decide about submitting + * the existing request to the new peer. + * + * @param cls the 'struct GSF_ConnectedPeer' of the new peer + * @param key query for the request + * @param pr handle to the pending request + * @return GNUNET_YES to continue to iterate */ -static void -destroy_pending_message_list_entry (struct PendingMessageList *pml) +static int +consider_peer_for_forwarding (void *cls, const GNUNET_HashCode * key, + struct GSF_PendingRequest *pr) { - GNUNET_CONTAINER_DLL_remove (pml->req->pending_head, - pml->req->pending_tail, - pml); - GNUNET_CONTAINER_DLL_remove (pml->target->pending_messages_head, - pml->target->pending_messages_tail, - pml->pm); - pml->target->pending_requests--; - GNUNET_free (pml->pm); - GNUNET_free (pml); + struct GSF_ConnectedPeer *cp = cls; + struct GNUNET_PeerIdentity pid; + + GSF_connected_peer_get_identity_ (cp, &pid); + if (GNUNET_YES != GSF_pending_request_test_target_ (pr, &pid)) + { + GNUNET_STATISTICS_update (GSF_stats, + gettext_noop ("# Loopback routes suppressed"), 1, + GNUNET_NO); + return GNUNET_YES; + } + GSF_plan_add_ (cp, pr); + return GNUNET_YES; } /** - * Destroy the given pending message (and call the respective - * continuation). + * Method called whenever a given peer connects. * - * @param pm message to destroy - * @param tpid id of peer that the message was delivered to, or 0 for none + * @param cls closure, not used + * @param peer peer identity this notification is about + * @param atsi performance information + * @param atsi_count number of records in 'atsi' */ static void -destroy_pending_message (struct PendingMessage *pm, - GNUNET_PEER_Id tpid) +peer_connect_handler (void *cls, const struct GNUNET_PeerIdentity *peer, + const struct GNUNET_ATS_Information *atsi, + unsigned int atsi_count) { - struct PendingMessageList *pml = pm->pml; - TransmissionContinuation cont; - void *cont_cls; + struct GSF_ConnectedPeer *cp; - if (pml != NULL) - { - GNUNET_assert (pml->pm == pm); - GNUNET_assert ( (tpid == 0) || (tpid == pml->target->pid) ); - cont = pm->cont; - cont_cls = pm->cont_cls; - destroy_pending_message_list_entry (pml); - } - else - { - GNUNET_free (pm); - } - if (cont != NULL) - cont (cont_cls, tpid); + if (0 == memcmp (&my_id, peer, sizeof (struct GNUNET_PeerIdentity))) + return; + cp = GSF_peer_connect_handler_ (peer, atsi, atsi_count); + if (NULL == cp) + return; + GSF_iterate_pending_requests_ (&consider_peer_for_forwarding, cp); } /** - * We're done processing a particular request. - * Free all associated resources. + * Function called after GNUNET_CORE_connect has succeeded + * (or failed for good). Note that the private key of the + * peer is intentionally not exposed here; if you need it, + * your process should try to read the private key file + * directly (which should work if you are authorized...). * - * @param pr request to destroy + * @param cls closure + * @param server handle to the server, NULL if we failed + * @param my_identity ID of this peer, NULL if we failed */ static void -destroy_pending_request (struct PendingRequest *pr) +peer_init_handler (void *cls, struct GNUNET_CORE_Handle *server, + const struct GNUNET_PeerIdentity *my_identity) { - struct GNUNET_PeerIdentity pid; - - if (pr->hnode != NULL) - { - GNUNET_CONTAINER_heap_remove_node (requests_by_expiration_heap, - pr->hnode); - pr->hnode = NULL; - } - if (NULL == pr->client_request_list) - { - GNUNET_STATISTICS_update (stats, - gettext_noop ("# P2P searches active"), - -1, - GNUNET_NO); - } - else - { - GNUNET_STATISTICS_update (stats, - gettext_noop ("# client searches active"), - -1, - GNUNET_NO); - } - /* might have already been removed from map in 'process_reply' (if - there was a unique reply) or never inserted if it was a - duplicate; hence ignore the return value here */ - (void) GNUNET_CONTAINER_multihashmap_remove (query_request_map, - &pr->query, - pr); - if (pr->qe != NULL) - { - GNUNET_DATASTORE_cancel (pr->qe); - pr->qe = NULL; - } - if (pr->dht_get != NULL) - { - GNUNET_DHT_get_stop (pr->dht_get); - pr->dht_get = NULL; - } - if (pr->client_request_list != NULL) - { - GNUNET_CONTAINER_DLL_remove (pr->client_request_list->client_list->rl_head, - pr->client_request_list->client_list->rl_tail, - pr->client_request_list); - GNUNET_free (pr->client_request_list); - pr->client_request_list = NULL; - } - if (pr->cp != NULL) - { - GNUNET_PEER_resolve (pr->cp->pid, - &pid); - (void) GNUNET_CONTAINER_multihashmap_remove (peer_request_map, - &pid.hashPubKey, - pr); - pr->cp = NULL; - } - if (pr->bf != NULL) - { - GNUNET_CONTAINER_bloomfilter_free (pr->bf); - pr->bf = NULL; - } - if (pr->irc != NULL) - { - GNUNET_CORE_peer_change_preference_cancel (pr->irc); - pr->irc = NULL; - } - if (pr->replies_seen != NULL) - { - GNUNET_free (pr->replies_seen); - pr->replies_seen = NULL; - } - if (pr->task != GNUNET_SCHEDULER_NO_TASK) - { - GNUNET_SCHEDULER_cancel (sched, - pr->task); - pr->task = GNUNET_SCHEDULER_NO_TASK; - } - while (NULL != pr->pending_head) - destroy_pending_message_list_entry (pr->pending_head); - GNUNET_PEER_change_rc (pr->target_pid, -1); - if (pr->used_pids != NULL) - { - GNUNET_PEER_decrement_rcs (pr->used_pids, pr->used_pids_off); - GNUNET_free (pr->used_pids); - pr->used_pids_off = 0; - pr->used_pids_size = 0; - pr->used_pids = NULL; - } - GNUNET_free (pr); + my_id = *my_identity; } -/** - * Method called whenever a given peer connects. - * - * @param cls closure, not used - * @param peer peer identity this notification is about - * @param latency reported latency of the connection with 'other' - * @param distance reported distance (DV) to 'other' - */ -static void -peer_connect_handler (void *cls, - const struct - GNUNET_PeerIdentity * peer, - struct GNUNET_TIME_Relative latency, - uint32_t distance) -{ - struct ConnectedPeer *cp; - struct MigrationReadyBlock *pos; - char *fn; - uint32_t trust; - - cp = GNUNET_malloc (sizeof (struct ConnectedPeer)); - cp->pid = GNUNET_PEER_intern (peer); - - fn = get_trust_filename (peer); - if ((GNUNET_DISK_file_test (fn) == GNUNET_YES) && - (sizeof (trust) == GNUNET_DISK_fn_read (fn, &trust, sizeof (trust)))) - cp->disk_trust = cp->trust = ntohl (trust); - GNUNET_free (fn); - - GNUNET_break (GNUNET_OK == - GNUNET_CONTAINER_multihashmap_put (connected_peers, - &peer->hashPubKey, - cp, - GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); - - pos = mig_head; - while (NULL != pos) - { - (void) consider_migration (pos, &peer->hashPubKey, cp); - pos = pos->next; - } -} - - -/** - * Increase the host credit by a value. - * - * @param host which peer to change the trust value on - * @param value is the int value by which the - * host credit is to be increased or decreased - * @returns the actual change in trust (positive or negative) - */ -static int -change_host_trust (struct ConnectedPeer *host, int value) -{ - unsigned int old_trust; - - if (value == 0) - return 0; - GNUNET_assert (host != NULL); - old_trust = host->trust; - if (value > 0) - { - if (host->trust + value < host->trust) - { - value = UINT32_MAX - host->trust; - host->trust = UINT32_MAX; - } - else - host->trust += value; - } - else - { - if (host->trust < -value) - { - value = -host->trust; - host->trust = 0; - } - else - host->trust += value; - } - return value; -} - - -/** - * Write host-trust information to a file - flush the buffer entry! - */ -static int -flush_trust (void *cls, - const GNUNET_HashCode *key, - void *value) -{ - struct ConnectedPeer *host = value; - char *fn; - uint32_t trust; - struct GNUNET_PeerIdentity pid; - - if (host->trust == host->disk_trust) - return GNUNET_OK; /* unchanged */ - GNUNET_PEER_resolve (host->pid, - &pid); - fn = get_trust_filename (&pid); - if (host->trust == 0) - { - if ((0 != UNLINK (fn)) && (errno != ENOENT)) - GNUNET_log_strerror_file (GNUNET_ERROR_TYPE_WARNING | - GNUNET_ERROR_TYPE_BULK, "unlink", fn); - } - else - { - trust = htonl (host->trust); - if (sizeof(uint32_t) == GNUNET_DISK_fn_write (fn, &trust, - sizeof(uint32_t), - GNUNET_DISK_PERM_USER_READ | GNUNET_DISK_PERM_USER_WRITE - | GNUNET_DISK_PERM_GROUP_READ | GNUNET_DISK_PERM_OTHER_READ)) - host->disk_trust = host->trust; - } - GNUNET_free (fn); - return GNUNET_OK; -} - -/** - * Call this method periodically to scan data/hosts for new hosts. - */ -static void -cron_flush_trust (void *cls, - const struct GNUNET_SCHEDULER_TaskContext *tc) -{ - - if (NULL == connected_peers) - return; - GNUNET_CONTAINER_multihashmap_iterate (connected_peers, - &flush_trust, - NULL); - if (NULL == tc) - return; - if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN)) - return; - GNUNET_SCHEDULER_add_delayed (tc->sched, - TRUST_FLUSH_FREQ, &cron_flush_trust, NULL); -} - - -/** - * Free (each) request made by the peer. - * - * @param cls closure, points to peer that the request belongs to - * @param key current key code - * @param value value in the hash map - * @return GNUNET_YES (we should continue to iterate) - */ -static int -destroy_request (void *cls, - const GNUNET_HashCode * key, - void *value) -{ - const struct GNUNET_PeerIdentity * peer = cls; - struct PendingRequest *pr = value; - - GNUNET_break (GNUNET_YES == - GNUNET_CONTAINER_multihashmap_remove (peer_request_map, - &peer->hashPubKey, - pr)); - destroy_pending_request (pr); - return GNUNET_YES; -} - - -/** - * Method called whenever a peer disconnects. - * - * @param cls closure, not used - * @param peer peer identity this notification is about - */ -static void -peer_disconnect_handler (void *cls, - const struct - GNUNET_PeerIdentity * peer) -{ - struct ConnectedPeer *cp; - struct PendingMessage *pm; - unsigned int i; - struct MigrationReadyBlock *pos; - struct MigrationReadyBlock *next; - - GNUNET_CONTAINER_multihashmap_get_multiple (peer_request_map, - &peer->hashPubKey, - &destroy_request, - (void*) peer); - cp = GNUNET_CONTAINER_multihashmap_get (connected_peers, - &peer->hashPubKey); - if (cp == NULL) - return; - for (i=0;ilast_client_replies[i]) - { - GNUNET_SERVER_client_drop (cp->last_client_replies[i]); - cp->last_client_replies[i] = NULL; - } - } - GNUNET_break (GNUNET_YES == - GNUNET_CONTAINER_multihashmap_remove (connected_peers, - &peer->hashPubKey, - cp)); - /* remove this peer from migration considerations; schedule - alternatives */ - next = mig_head; - while (NULL != (pos = next)) - { - next = pos->next; - for (i=0;itarget_list[i] == cp->pid) - { - GNUNET_PEER_change_rc (pos->target_list[i], -1); - pos->target_list[i] = 0; - } - } - if (pos->used_targets >= GNUNET_CONTAINER_multihashmap_size (connected_peers)) - { - delete_migration_block (pos); - consider_migration_gathering (); - continue; - } - GNUNET_CONTAINER_multihashmap_iterate (connected_peers, - &consider_migration, - pos); - } - GNUNET_PEER_change_rc (cp->pid, -1); - GNUNET_PEER_decrement_rcs (cp->last_p2p_replies, P2P_SUCCESS_LIST_SIZE); - if (NULL != cp->cth) - GNUNET_CORE_notify_transmit_ready_cancel (cp->cth); - while (NULL != (pm = cp->pending_messages_head)) - destroy_pending_message (pm, 0 /* delivery failed */); - GNUNET_break (0 == cp->pending_requests); - GNUNET_free (cp); -} - - -/** - * Iterator over hash map entries that removes all occurences - * of the given 'client' from the 'last_client_replies' of the - * given connected peer. - * - * @param cls closure, the 'struct GNUNET_SERVER_Client*' to remove - * @param key current key code (unused) - * @param value value in the hash map (the 'struct ConnectedPeer*' to change) - * @return GNUNET_YES (we should continue to iterate) - */ -static int -remove_client_from_last_client_replies (void *cls, - const GNUNET_HashCode * key, - void *value) -{ - struct GNUNET_SERVER_Client *client = cls; - struct ConnectedPeer *cp = value; - unsigned int i; - - for (i=0;ilast_client_replies[i] == client) - { - GNUNET_SERVER_client_drop (cp->last_client_replies[i]); - cp->last_client_replies[i] = NULL; - } - } - return GNUNET_YES; -} - - -/** - * A client disconnected. Remove all of its pending queries. - * - * @param cls closure, NULL - * @param client identification of the client - */ -static void -handle_client_disconnect (void *cls, - struct GNUNET_SERVER_Client - * client) -{ - struct ClientList *pos; - struct ClientList *prev; - struct ClientRequestList *rcl; - struct ClientResponseMessage *creply; - - if (client == NULL) - return; - prev = NULL; - pos = client_list; - while ( (NULL != pos) && - (pos->client != client) ) - { - prev = pos; - pos = pos->next; - } - if (pos == NULL) - return; /* no requests pending for this client */ - while (NULL != (rcl = pos->rl_head)) - { - GNUNET_log (GNUNET_ERROR_TYPE_INFO, - "Destroying pending request `%s' on disconnect\n", - GNUNET_h2s (&rcl->req->query)); - destroy_pending_request (rcl->req); - } - if (prev == NULL) - client_list = pos->next; - else - prev->next = pos->next; - if (pos->th != NULL) - { - GNUNET_CONNECTION_notify_transmit_ready_cancel (pos->th); - pos->th = NULL; - } - while (NULL != (creply = pos->res_head)) - { - GNUNET_CONTAINER_DLL_remove (pos->res_head, - pos->res_tail, - creply); - GNUNET_free (creply); - } - GNUNET_SERVER_client_drop (pos->client); - GNUNET_free (pos); - GNUNET_CONTAINER_multihashmap_iterate (connected_peers, - &remove_client_from_last_client_replies, - client); -} - - -/** - * Iterator to free peer entries. - * - * @param cls closure, unused - * @param key current key code - * @param value value in the hash map (peer entry) - * @return GNUNET_YES (we should continue to iterate) - */ -static int -clean_peer (void *cls, - const GNUNET_HashCode * key, - void *value) -{ - peer_disconnect_handler (NULL, (const struct GNUNET_PeerIdentity*) key); - return GNUNET_YES; -} - - -/** - * Task run during shutdown. - * - * @param cls unused - * @param tc unused - */ -static void -shutdown_task (void *cls, - const struct GNUNET_SCHEDULER_TaskContext *tc) -{ - if (mig_qe != NULL) - { - GNUNET_DATASTORE_cancel (mig_qe); - mig_qe = NULL; - } - if (GNUNET_SCHEDULER_NO_TASK != mig_task) - { - GNUNET_SCHEDULER_cancel (sched, mig_task); - mig_task = GNUNET_SCHEDULER_NO_TASK; - } - while (client_list != NULL) - handle_client_disconnect (NULL, - client_list->client); - cron_flush_trust (NULL, NULL); - GNUNET_CONTAINER_multihashmap_iterate (connected_peers, - &clean_peer, - NULL); - GNUNET_break (0 == GNUNET_CONTAINER_heap_get_size (requests_by_expiration_heap)); - GNUNET_CONTAINER_heap_destroy (requests_by_expiration_heap); - requests_by_expiration_heap = 0; - GNUNET_CONTAINER_multihashmap_destroy (connected_peers); - connected_peers = NULL; - GNUNET_break (0 == GNUNET_CONTAINER_multihashmap_size (query_request_map)); - GNUNET_CONTAINER_multihashmap_destroy (query_request_map); - query_request_map = NULL; - GNUNET_break (0 == GNUNET_CONTAINER_multihashmap_size (peer_request_map)); - GNUNET_CONTAINER_multihashmap_destroy (peer_request_map); - peer_request_map = NULL; - GNUNET_assert (NULL != core); - GNUNET_CORE_disconnect (core); - core = NULL; - if (stats != NULL) - { - GNUNET_STATISTICS_destroy (stats, GNUNET_NO); - stats = NULL; - } - if (dsh != NULL) - { - GNUNET_DATASTORE_disconnect (dsh, - GNUNET_NO); - dsh = NULL; - } - while (mig_head != NULL) - delete_migration_block (mig_head); - GNUNET_assert (0 == mig_size); - GNUNET_DHT_disconnect (dht_handle); - dht_handle = NULL; - GNUNET_LOAD_value_free (datastore_get_load); - datastore_get_load = NULL; - GNUNET_LOAD_value_free (datastore_put_load); - datastore_put_load = NULL; - GNUNET_BLOCK_context_destroy (block_ctx); - block_ctx = NULL; - GNUNET_CONFIGURATION_destroy (block_cfg); - block_cfg = NULL; - sched = NULL; - cfg = NULL; - GNUNET_free_non_null (trustDirectory); - trustDirectory = NULL; -} - - -/* ******************* Utility functions ******************** */ - - -/** - * Transmit messages by copying it to the target buffer - * "buf". "buf" will be NULL and "size" zero if the socket was closed - * for writing in the meantime. In that case, do nothing - * (the disconnect or shutdown handler will take care of the rest). - * If we were able to transmit messages and there are still more - * pending, ask core again for further calls to this function. - * - * @param cls closure, pointer to the 'struct ConnectedPeer*' - * @param size number of bytes available in buf - * @param buf where the callee should write the message - * @return number of bytes written to buf - */ -static size_t -transmit_to_peer (void *cls, - size_t size, void *buf) -{ - struct ConnectedPeer *cp = cls; - char *cbuf = buf; - struct GNUNET_PeerIdentity pid; - struct PendingMessage *pm; - struct MigrationReadyBlock *mb; - struct MigrationReadyBlock *next; - struct PutMessage migm; - size_t msize; - unsigned int i; - - cp->cth = NULL; - if (NULL == buf) - { -#if DEBUG_FS - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Dropping message, core too busy.\n"); -#endif - return 0; - } - msize = 0; - while ( (NULL != (pm = cp->pending_messages_head) ) && - (pm->msize <= size) ) - { - memcpy (&cbuf[msize], &pm[1], pm->msize); - msize += pm->msize; - size -= pm->msize; - destroy_pending_message (pm, cp->pid); - } - if (NULL != pm) - { - GNUNET_PEER_resolve (cp->pid, - &pid); - cp->cth = GNUNET_CORE_notify_transmit_ready (core, - pm->priority, - GNUNET_CONSTANTS_SERVICE_TIMEOUT, - &pid, - pm->msize, - &transmit_to_peer, - cp); - } - else - { - next = mig_head; - while (NULL != (mb = next)) - { - next = mb->next; - for (i=0;ipid == mb->target_list[i]) && - (mb->size + sizeof (migm) <= size) ) - { - GNUNET_PEER_change_rc (mb->target_list[i], -1); - mb->target_list[i] = 0; - mb->used_targets++; - memset (&migm, 0, sizeof (migm)); - migm.header.size = htons (sizeof (migm) + mb->size); - migm.header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT); - migm.type = htonl (mb->type); - migm.expiration = GNUNET_TIME_absolute_hton (mb->expiration); - memcpy (&cbuf[msize], &migm, sizeof (migm)); - msize += sizeof (migm); - size -= sizeof (migm); - memcpy (&cbuf[msize], &mb[1], mb->size); - msize += mb->size; - size -= mb->size; -#if DEBUG_FS - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Pushing migration block `%s' (%u bytes) to `%s'\n", - GNUNET_h2s (&mb->query), - mb->size, - GNUNET_i2s (&pid)); -#endif - break; - } - else - { -#if DEBUG_FS - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Migration block `%s' (%u bytes) is not on migration list for peer `%s'\n", - GNUNET_h2s (&mb->query), - mb->size, - GNUNET_i2s (&pid)); -#endif - } - } - if ( (mb->used_targets >= MIGRATION_TARGET_COUNT) || - (mb->used_targets >= GNUNET_CONTAINER_multihashmap_size (connected_peers)) ) - { - delete_migration_block (mb); - consider_migration_gathering (); - } - } - consider_migration (NULL, - &pid.hashPubKey, - cp); - } -#if DEBUG_FS - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Transmitting %u bytes to peer %u\n", - msize, - cp->pid); -#endif - return msize; -} - - -/** - * Add a message to the set of pending messages for the given peer. - * - * @param cp peer to send message to - * @param pm message to queue - * @param pr request on which behalf this message is being queued - */ -static void -add_to_pending_messages_for_peer (struct ConnectedPeer *cp, - struct PendingMessage *pm, - struct PendingRequest *pr) -{ - struct PendingMessage *pos; - struct PendingMessageList *pml; - struct GNUNET_PeerIdentity pid; - - GNUNET_assert (pm->next == NULL); - GNUNET_assert (pm->pml == NULL); - if (pr != NULL) - { - pml = GNUNET_malloc (sizeof (struct PendingMessageList)); - pml->req = pr; - pml->target = cp; - pml->pm = pm; - pm->pml = pml; - GNUNET_CONTAINER_DLL_insert (pr->pending_head, - pr->pending_tail, - pml); - } - pos = cp->pending_messages_head; - while ( (pos != NULL) && - (pm->priority < pos->priority) ) - pos = pos->next; - GNUNET_CONTAINER_DLL_insert_after (cp->pending_messages_head, - cp->pending_messages_tail, - pos, - pm); - cp->pending_requests++; - if (cp->pending_requests > MAX_QUEUE_PER_PEER) - destroy_pending_message (cp->pending_messages_tail, 0); - GNUNET_PEER_resolve (cp->pid, &pid); - if (NULL != cp->cth) - GNUNET_CORE_notify_transmit_ready_cancel (cp->cth); - /* need to schedule transmission */ - cp->cth = GNUNET_CORE_notify_transmit_ready (core, - cp->pending_messages_head->priority, - MAX_TRANSMIT_DELAY, - &pid, - cp->pending_messages_head->msize, - &transmit_to_peer, - cp); - if (cp->cth == NULL) - { -#if DEBUG_FS - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Failed to schedule transmission with core!\n"); -#endif - GNUNET_STATISTICS_update (stats, - gettext_noop ("# CORE transmission failures"), - 1, - GNUNET_NO); - } -} - - -/** - * Test if the load on this peer is too high - * to even consider processing the query at - * all. - * - * @return GNUNET_YES if the load is too high to do anything, GNUNET_NO to forward (load high, but not too high), GNUNET_SYSERR to indirect (load low) - */ -static int -test_load_too_high () -{ - return GNUNET_SYSERR; // FIXME -} - - -/* ******************* Pending Request Refresh Task ******************** */ - - - -/** - * We use a random delay to make the timing of requests less - * predictable. This function returns such a random delay. We add a base - * delay of MAX_CORK_DELAY (1s). - * - * FIXME: make schedule dependent on the specifics of the request? - * Or bandwidth and number of connected peers and load? - * - * @return random delay to use for some request, between 1s and 1000+TTL_DECREMENT ms - */ -static struct GNUNET_TIME_Relative -get_processing_delay () -{ - return - GNUNET_TIME_relative_add (GNUNET_CONSTANTS_MAX_CORK_DELAY, - GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS, - GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, - TTL_DECREMENT))); -} - - -/** - * We're processing a GET request from another peer and have decided - * to forward it to other peers. This function is called periodically - * and should forward the request to other peers until we have all - * possible replies. If we have transmitted the *only* reply to - * the initiator we should destroy the pending request. If we have - * many replies in the queue to the initiator, we should delay sending - * out more queries until the reply queue has shrunk some. - * - * @param cls our "struct ProcessGetContext *" - * @param tc unused - */ -static void -forward_request_task (void *cls, - const struct GNUNET_SCHEDULER_TaskContext *tc); - - -/** - * Function called after we either failed or succeeded - * at transmitting a query to a peer. - * - * @param cls the requests "struct PendingRequest*" - * @param tpid ID of receiving peer, 0 on transmission error - */ -static void -transmit_query_continuation (void *cls, - GNUNET_PEER_Id tpid) -{ - struct PendingRequest *pr = cls; - - GNUNET_STATISTICS_update (stats, - gettext_noop ("# queries scheduled for forwarding"), - -1, - GNUNET_NO); - if (tpid == 0) - { -#if DEBUG_FS - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Transmission of request failed, will try again later.\n"); -#endif - if (pr->task == GNUNET_SCHEDULER_NO_TASK) - pr->task = GNUNET_SCHEDULER_add_delayed (sched, - get_processing_delay (), - &forward_request_task, - pr); - return; - } - GNUNET_STATISTICS_update (stats, - gettext_noop ("# queries forwarded"), - 1, - GNUNET_NO); - GNUNET_PEER_change_rc (tpid, 1); - if (pr->used_pids_off == pr->used_pids_size) - GNUNET_array_grow (pr->used_pids, - pr->used_pids_size, - pr->used_pids_size * 2 + 2); - pr->used_pids[pr->used_pids_off++] = tpid; - if (pr->task == GNUNET_SCHEDULER_NO_TASK) - pr->task = GNUNET_SCHEDULER_add_delayed (sched, - get_processing_delay (), - &forward_request_task, - pr); -} - - -/** - * How many bytes should a bloomfilter be if we have already seen - * entry_count responses? Note that BLOOMFILTER_K gives us the number - * of bits set per entry. Furthermore, we should not re-size the - * filter too often (to keep it cheap). - * - * Since other peers will also add entries but not resize the filter, - * we should generally pick a slightly larger size than what the - * strict math would suggest. - * - * @return must be a power of two and smaller or equal to 2^15. - */ -static size_t -compute_bloomfilter_size (unsigned int entry_count) -{ - size_t size; - unsigned int ideal = (entry_count * BLOOMFILTER_K) / 4; - uint16_t max = 1 << 15; - - if (entry_count > max) - return max; - size = 8; - while ((size < max) && (size < ideal)) - size *= 2; - if (size > max) - return max; - return size; -} - - -/** - * Recalculate our bloom filter for filtering replies. This function - * will create a new bloom filter from scratch, so it should only be - * called if we have no bloomfilter at all (and hence can create a - * fresh one of minimal size without problems) OR if our peer is the - * initiator (in which case we may resize to larger than mimimum size). - * - * @param pr request for which the BF is to be recomputed - */ -static void -refresh_bloomfilter (struct PendingRequest *pr) -{ - unsigned int i; - size_t nsize; - GNUNET_HashCode mhash; - - nsize = compute_bloomfilter_size (pr->replies_seen_off); - if (nsize == pr->bf_size) - return; /* size not changed */ - if (pr->bf != NULL) - GNUNET_CONTAINER_bloomfilter_free (pr->bf); - pr->bf_size = nsize; - pr->mingle = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, -1); - pr->bf = GNUNET_CONTAINER_bloomfilter_init (NULL, - pr->bf_size, - BLOOMFILTER_K); - for (i=0;ireplies_seen_off;i++) - { - GNUNET_BLOCK_mingle_hash (&pr->replies_seen[i], - pr->mingle, - &mhash); - GNUNET_CONTAINER_bloomfilter_add (pr->bf, &mhash); - } -} - - -/** - * Function called after we've tried to reserve a certain amount of - * bandwidth for a reply. Check if we succeeded and if so send our - * query. - * - * @param cls the requests "struct PendingRequest*" - * @param peer identifies the peer - * @param bpm_in set to the current bandwidth limit (receiving) for this peer - * @param bpm_out set to the current bandwidth limit (sending) for this peer - * @param amount set to the amount that was actually reserved or unreserved - * @param preference current traffic preference for the given peer - */ -static void -target_reservation_cb (void *cls, - const struct - GNUNET_PeerIdentity * peer, - struct GNUNET_BANDWIDTH_Value32NBO bpm_in, - struct GNUNET_BANDWIDTH_Value32NBO bpm_out, - int amount, - uint64_t preference) -{ - struct PendingRequest *pr = cls; - struct ConnectedPeer *cp; - struct PendingMessage *pm; - struct GetMessage *gm; - GNUNET_HashCode *ext; - char *bfdata; - size_t msize; - unsigned int k; - int no_route; - uint32_t bm; - - pr->irc = NULL; - if (peer == NULL) - { - /* error in communication with core, try again later */ - if (pr->task == GNUNET_SCHEDULER_NO_TASK) - pr->task = GNUNET_SCHEDULER_add_delayed (sched, - get_processing_delay (), - &forward_request_task, - pr); - return; - } - // (3) transmit, update ttl/priority - cp = GNUNET_CONTAINER_multihashmap_get (connected_peers, - &peer->hashPubKey); - if (cp == NULL) - { - /* Peer must have just left */ -#if DEBUG_FS - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Selected peer disconnected!\n"); -#endif - if (pr->task == GNUNET_SCHEDULER_NO_TASK) - pr->task = GNUNET_SCHEDULER_add_delayed (sched, - get_processing_delay (), - &forward_request_task, - pr); - return; - } - no_route = GNUNET_NO; - if (amount == 0) - { - if (pr->cp == NULL) - { -#if DEBUG_FS > 1 - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Failed to reserve bandwidth for reply (got %d/%u bytes only)!\n", - amount, - DBLOCK_SIZE); -#endif - GNUNET_STATISTICS_update (stats, - gettext_noop ("# reply bandwidth reservation requests failed"), - 1, - GNUNET_NO); - if (pr->task == GNUNET_SCHEDULER_NO_TASK) - pr->task = GNUNET_SCHEDULER_add_delayed (sched, - get_processing_delay (), - &forward_request_task, - pr); - return; /* this target round failed */ - } - /* FIXME: if we are "quite" busy, we may still want to skip - this round; need more load detection code! */ - no_route = GNUNET_YES; - } - - GNUNET_STATISTICS_update (stats, - gettext_noop ("# queries scheduled for forwarding"), - 1, - GNUNET_NO); - /* build message and insert message into priority queue */ -#if DEBUG_FS - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Forwarding request `%s' to `%4s'!\n", - GNUNET_h2s (&pr->query), - GNUNET_i2s (peer)); -#endif - k = 0; - bm = 0; - if (GNUNET_YES == no_route) - { - bm |= GET_MESSAGE_BIT_RETURN_TO; - k++; - } - if (pr->namespace != NULL) - { - bm |= GET_MESSAGE_BIT_SKS_NAMESPACE; - k++; - } - if (pr->target_pid != 0) - { - bm |= GET_MESSAGE_BIT_TRANSMIT_TO; - k++; - } - msize = sizeof (struct GetMessage) + pr->bf_size + k * sizeof(GNUNET_HashCode); - GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE); - pm = GNUNET_malloc (sizeof (struct PendingMessage) + msize); - pm->msize = msize; - gm = (struct GetMessage*) &pm[1]; - gm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_GET); - gm->header.size = htons (msize); - gm->type = htonl (pr->type); - pr->remaining_priority /= 2; - gm->priority = htonl (pr->remaining_priority); - gm->ttl = htonl (pr->ttl); - gm->filter_mutator = htonl(pr->mingle); - gm->hash_bitmap = htonl (bm); - gm->query = pr->query; - ext = (GNUNET_HashCode*) &gm[1]; - k = 0; - if (GNUNET_YES == no_route) - GNUNET_PEER_resolve (pr->cp->pid, (struct GNUNET_PeerIdentity*) &ext[k++]); - if (pr->namespace != NULL) - memcpy (&ext[k++], pr->namespace, sizeof (GNUNET_HashCode)); - if (pr->target_pid != 0) - GNUNET_PEER_resolve (pr->target_pid, (struct GNUNET_PeerIdentity*) &ext[k++]); - bfdata = (char *) &ext[k]; - if (pr->bf != NULL) - GNUNET_CONTAINER_bloomfilter_get_raw_data (pr->bf, - bfdata, - pr->bf_size); - pm->cont = &transmit_query_continuation; - pm->cont_cls = pr; - add_to_pending_messages_for_peer (cp, pm, pr); -} - - -/** - * Closure used for "target_peer_select_cb". - */ -struct PeerSelectionContext -{ - /** - * The request for which we are selecting - * peers. - */ - struct PendingRequest *pr; - - /** - * Current "prime" target. - */ - struct GNUNET_PeerIdentity target; - - /** - * How much do we like this target? - */ - double target_score; - -}; - - -/** - * Function called for each connected peer to determine - * which one(s) would make good targets for forwarding. - * - * @param cls closure (struct PeerSelectionContext) - * @param key current key code (peer identity) - * @param value value in the hash map (struct ConnectedPeer) - * @return GNUNET_YES if we should continue to - * iterate, - * GNUNET_NO if not. - */ -static int -target_peer_select_cb (void *cls, - const GNUNET_HashCode * key, - void *value) -{ - struct PeerSelectionContext *psc = cls; - struct ConnectedPeer *cp = value; - struct PendingRequest *pr = psc->pr; - double score; - unsigned int i; - unsigned int pc; - - /* 1) check that this peer is not the initiator */ - if (cp == pr->cp) - { -#if DEBUG_FS - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Skipping initiator in forwarding selection\n"); -#endif - return GNUNET_YES; /* skip */ - } - - /* 2) check if we have already (recently) forwarded to this peer */ - pc = 0; - for (i=0;iused_pids_off;i++) - if (pr->used_pids[i] == cp->pid) - { - pc++; - if (0 != GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, - RETRY_PROBABILITY_INV)) - { -#if DEBUG_FS - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "NOT re-trying query that was previously transmitted %u times\n", - (unsigned int) pr->used_pids_off); -#endif - return GNUNET_YES; /* skip */ - } - } -#if DEBUG_FS - if (0 < pc) - GNUNET_log (GNUNET_ERROR_TYPE_INFO, - "Re-trying query that was previously transmitted %u times to this peer\n", - (unsigned int) pc); -#endif - /* 3) calculate how much we'd like to forward to this peer, - starting with a random value that is strong enough - to at least give any peer a chance sometimes - (compared to the other factors that come later) */ - /* 3a) count successful (recent) routes from cp for same source */ - if (pr->cp != NULL) - { - score = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, - P2P_SUCCESS_LIST_SIZE); - for (i=0;ilast_p2p_replies[i] == pr->cp->pid) - score += 1; /* likely successful based on hot path */ - } - else - { - score = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, - CS2P_SUCCESS_LIST_SIZE); - for (i=0;ilast_client_replies[i] == pr->client_request_list->client_list->client) - score += 1; /* likely successful based on hot path */ - } - /* 3b) include latency */ - if (cp->avg_delay.value < 4 * TTL_DECREMENT) - score += 1; /* likely fast based on latency */ - /* 3c) include priorities */ - if (cp->avg_priority <= pr->remaining_priority / 2.0) - score += 1; /* likely successful based on priorities */ - /* 3d) penalize for queue size */ - score -= (2.0 * cp->pending_requests / (double) MAX_QUEUE_PER_PEER); - /* 3e) include peer proximity */ - score -= (2.0 * (GNUNET_CRYPTO_hash_distance_u32 (key, - &pr->query)) / (double) UINT32_MAX); - /* 4) super-bonus for being the known target */ - if (pr->target_pid == cp->pid) - score += 100.0; - /* store best-fit in closure */ -#if DEBUG_FS - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Peer `%s' gets score %f for forwarding query, max is %f\n", - GNUNET_h2s (key), - score, - psc->target_score); -#endif - score++; /* avoid zero */ - if (score > psc->target_score) - { - psc->target_score = score; - psc->target.hashPubKey = *key; - } - return GNUNET_YES; -} - - -/** - * The priority level imposes a bound on the maximum - * value for the ttl that can be requested. - * - * @param ttl_in requested ttl - * @param prio given priority - * @return ttl_in if ttl_in is below the limit, - * otherwise the ttl-limit for the given priority - */ -static int32_t -bound_ttl (int32_t ttl_in, uint32_t prio) -{ - unsigned long long allowed; - - if (ttl_in <= 0) - return ttl_in; - allowed = ((unsigned long long) prio) * TTL_DECREMENT / 1000; - if (ttl_in > allowed) - { - if (allowed >= (1 << 30)) - return 1 << 30; - return allowed; - } - return ttl_in; -} - - -/** - * Iterator called on each result obtained for a DHT - * operation that expects a reply - * - * @param cls closure - * @param exp when will this value expire - * @param key key of the result - * @param get_path NULL-terminated array of pointers - * to the peers on reverse GET path (or NULL if not recorded) - * @param put_path NULL-terminated array of pointers - * to the peers on the PUT path (or NULL if not recorded) - * @param type type of the result - * @param size number of bytes in data - * @param data pointer to the result data - */ -static void -process_dht_reply (void *cls, - struct GNUNET_TIME_Absolute exp, - const GNUNET_HashCode * key, - const struct GNUNET_PeerIdentity * const *get_path, - const struct GNUNET_PeerIdentity * const *put_path, - enum GNUNET_BLOCK_Type type, - size_t size, - const void *data); - - -/** - * We're processing a GET request and have decided - * to forward it to other peers. This function is called periodically - * and should forward the request to other peers until we have all - * possible replies. If we have transmitted the *only* reply to - * the initiator we should destroy the pending request. If we have - * many replies in the queue to the initiator, we should delay sending - * out more queries until the reply queue has shrunk some. - * - * @param cls our "struct ProcessGetContext *" - * @param tc unused - */ -static void -forward_request_task (void *cls, - const struct GNUNET_SCHEDULER_TaskContext *tc) -{ - struct PendingRequest *pr = cls; - struct PeerSelectionContext psc; - struct ConnectedPeer *cp; - struct GNUNET_TIME_Relative delay; - - pr->task = GNUNET_SCHEDULER_NO_TASK; - if (pr->irc != NULL) - { -#if DEBUG_FS - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Forwarding of query `%s' not attempted due to pending local lookup!\n", - GNUNET_h2s (&pr->query)); -#endif - return; /* already pending */ - } - if (GNUNET_YES == pr->local_only) - return; /* configured to not do P2P search */ - /* (0) try DHT */ - if ( (0 == pr->anonymity_level) && - (pr->type != GNUNET_BLOCK_TYPE_FS_DBLOCK) && - (pr->type != GNUNET_BLOCK_TYPE_FS_IBLOCK) ) - { - pr->dht_get = GNUNET_DHT_get_start (dht_handle, - GNUNET_TIME_UNIT_FOREVER_REL, - pr->type, - &pr->query, - GNUNET_DHT_RO_NONE, - pr->bf, - pr->mingle, - pr->namespace, - (pr->namespace != NULL) ? sizeof (GNUNET_HashCode) : 0, - &process_dht_reply, - pr); - } - /* (1) select target */ - psc.pr = pr; - psc.target_score = -DBL_MAX; - GNUNET_CONTAINER_multihashmap_iterate (connected_peers, - &target_peer_select_cb, - &psc); - if (psc.target_score == -DBL_MAX) - { - delay = get_processing_delay (); -#if DEBUG_FS - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "No peer selected for forwarding of query `%s', will try again in %llu ms!\n", - GNUNET_h2s (&pr->query), - delay.value); -#endif - pr->task = GNUNET_SCHEDULER_add_delayed (sched, - delay, - &forward_request_task, - pr); - return; /* nobody selected */ - } - /* (3) update TTL/priority */ - if (pr->client_request_list != NULL) - { - /* FIXME: use better algorithm!? */ - if (0 == GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, - 4)) - pr->priority++; - /* bound priority we use by priorities we see from other peers - rounded up (must round up so that we can see non-zero - priorities, but round up as little as possible to make it - plausible that we forwarded another peers request) */ - if (pr->priority > current_priorities + 1.0) - pr->priority = (uint32_t) current_priorities + 1.0; - pr->ttl = bound_ttl (pr->ttl + TTL_DECREMENT * 2, - pr->priority); -#if DEBUG_FS - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Trying query `%s' with priority %u and TTL %d.\n", - GNUNET_h2s (&pr->query), - pr->priority, - pr->ttl); -#endif - } - - /* (3) reserve reply bandwidth */ - cp = GNUNET_CONTAINER_multihashmap_get (connected_peers, - &psc.target.hashPubKey); - GNUNET_assert (NULL != cp); - pr->irc = GNUNET_CORE_peer_change_preference (sched, cfg, - &psc.target, - GNUNET_CONSTANTS_SERVICE_TIMEOUT, - GNUNET_BANDWIDTH_value_init (UINT32_MAX), - DBLOCK_SIZE * 2, - cp->inc_preference, - &target_reservation_cb, - pr); - cp->inc_preference = 0; -} - - -/* **************************** P2P PUT Handling ************************ */ - - -/** - * Function called after we either failed or succeeded - * at transmitting a reply to a peer. - * - * @param cls the requests "struct PendingRequest*" - * @param tpid ID of receiving peer, 0 on transmission error - */ -static void -transmit_reply_continuation (void *cls, - GNUNET_PEER_Id tpid) -{ - struct PendingRequest *pr = cls; - - switch (pr->type) - { - case GNUNET_BLOCK_TYPE_FS_DBLOCK: - case GNUNET_BLOCK_TYPE_FS_IBLOCK: - /* only one reply expected, done with the request! */ - destroy_pending_request (pr); - break; - case GNUNET_BLOCK_TYPE_ANY: - case GNUNET_BLOCK_TYPE_FS_KBLOCK: - case GNUNET_BLOCK_TYPE_FS_SBLOCK: - break; - default: - GNUNET_break (0); - break; - } -} - - -/** - * Transmit the given message by copying it to the target buffer - * "buf". "buf" will be NULL and "size" zero if the socket was closed - * for writing in the meantime. In that case, do nothing - * (the disconnect or shutdown handler will take care of the rest). - * If we were able to transmit messages and there are still more - * pending, ask core again for further calls to this function. - * - * @param cls closure, pointer to the 'struct ClientList*' - * @param size number of bytes available in buf - * @param buf where the callee should write the message - * @return number of bytes written to buf - */ -static size_t -transmit_to_client (void *cls, - size_t size, void *buf) -{ - struct ClientList *cl = cls; - char *cbuf = buf; - struct ClientResponseMessage *creply; - size_t msize; - - cl->th = NULL; - if (NULL == buf) - { -#if DEBUG_FS - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Not sending reply, client communication problem.\n"); -#endif - return 0; - } - msize = 0; - while ( (NULL != (creply = cl->res_head) ) && - (creply->msize <= size) ) - { - memcpy (&cbuf[msize], &creply[1], creply->msize); - msize += creply->msize; - size -= creply->msize; - GNUNET_CONTAINER_DLL_remove (cl->res_head, - cl->res_tail, - creply); - GNUNET_free (creply); - } - if (NULL != creply) - cl->th = GNUNET_SERVER_notify_transmit_ready (cl->client, - creply->msize, - GNUNET_TIME_UNIT_FOREVER_REL, - &transmit_to_client, - cl); -#if DEBUG_FS - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Transmitted %u bytes to client\n", - (unsigned int) msize); -#endif - return msize; -} - - -/** - * Closure for "process_reply" function. - */ -struct ProcessReplyClosure -{ - /** - * The data for the reply. - */ - const void *data; - - /** - * Who gave us this reply? NULL for local host (or DHT) - */ - struct ConnectedPeer *sender; - - /** - * When the reply expires. - */ - struct GNUNET_TIME_Absolute expiration; - - /** - * Size of data. - */ - size_t size; - - /** - * Type of the block. - */ - enum GNUNET_BLOCK_Type type; - - /** - * How much was this reply worth to us? - */ - uint32_t priority; - - /** - * Evaluation result (returned). - */ - enum GNUNET_BLOCK_EvaluationResult eval; - - /** - * Did we finish processing the associated request? - */ - int finished; - - /** - * Did we find a matching request? - */ - int request_found; -}; - - -/** - * We have received a reply; handle it! - * - * @param cls response (struct ProcessReplyClosure) - * @param key our query - * @param value value in the hash map (info about the query) - * @return GNUNET_YES (we should continue to iterate) - */ -static int -process_reply (void *cls, - const GNUNET_HashCode * key, - void *value) -{ - struct ProcessReplyClosure *prq = cls; - struct PendingRequest *pr = value; - struct PendingMessage *reply; - struct ClientResponseMessage *creply; - struct ClientList *cl; - struct PutMessage *pm; - struct ConnectedPeer *cp; - struct GNUNET_TIME_Relative cur_delay; - size_t msize; - -#if DEBUG_FS - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Matched result (type %u) for query `%s' with pending request\n", - (unsigned int) prq->type, - GNUNET_h2s (key)); -#endif - GNUNET_STATISTICS_update (stats, - gettext_noop ("# replies received and matched"), - 1, - GNUNET_NO); - if (prq->sender != NULL) - { - /* FIXME: should we be more precise here and not use - "start_time" but a peer-specific time stamp? */ - cur_delay = GNUNET_TIME_absolute_get_duration (pr->start_time); - prq->sender->avg_delay.value - = (prq->sender->avg_delay.value * - (RUNAVG_DELAY_N - 1) + cur_delay.value) / RUNAVG_DELAY_N; - prq->sender->avg_priority - = (prq->sender->avg_priority * - (RUNAVG_DELAY_N - 1) + pr->priority) / (double) RUNAVG_DELAY_N; - if (pr->cp != NULL) - { - GNUNET_PEER_change_rc (prq->sender->last_p2p_replies - [prq->sender->last_p2p_replies_woff % P2P_SUCCESS_LIST_SIZE], - -1); - GNUNET_PEER_change_rc (pr->cp->pid, 1); - prq->sender->last_p2p_replies - [(prq->sender->last_p2p_replies_woff++) % P2P_SUCCESS_LIST_SIZE] - = pr->cp->pid; - } - else - { - if (NULL != prq->sender->last_client_replies - [(prq->sender->last_client_replies_woff) % CS2P_SUCCESS_LIST_SIZE]) - GNUNET_SERVER_client_drop (prq->sender->last_client_replies - [(prq->sender->last_client_replies_woff) % CS2P_SUCCESS_LIST_SIZE]); - prq->sender->last_client_replies - [(prq->sender->last_client_replies_woff++) % CS2P_SUCCESS_LIST_SIZE] - = pr->client_request_list->client_list->client; - GNUNET_SERVER_client_keep (pr->client_request_list->client_list->client); - } - } - prq->eval = GNUNET_BLOCK_evaluate (block_ctx, - prq->type, - key, - &pr->bf, - pr->mingle, - pr->namespace, (pr->namespace != NULL) ? sizeof (GNUNET_HashCode) : 0, - prq->data, - prq->size); - switch (prq->eval) - { - case GNUNET_BLOCK_EVALUATION_OK_MORE: - break; - case GNUNET_BLOCK_EVALUATION_OK_LAST: - while (NULL != pr->pending_head) - destroy_pending_message_list_entry (pr->pending_head); - if (pr->qe != NULL) - { - if (pr->client_request_list != NULL) - GNUNET_SERVER_receive_done (pr->client_request_list->client_list->client, - GNUNET_YES); - GNUNET_DATASTORE_cancel (pr->qe); - pr->qe = NULL; - } - pr->do_remove = GNUNET_YES; - if (pr->task != GNUNET_SCHEDULER_NO_TASK) - { - GNUNET_SCHEDULER_cancel (sched, - pr->task); - pr->task = GNUNET_SCHEDULER_NO_TASK; - } - GNUNET_break (GNUNET_YES == - GNUNET_CONTAINER_multihashmap_remove (query_request_map, - key, - pr)); - break; - case GNUNET_BLOCK_EVALUATION_OK_DUPLICATE: - GNUNET_STATISTICS_update (stats, - gettext_noop ("# duplicate replies discarded (bloomfilter)"), - 1, - GNUNET_NO); -#if DEBUG_FS - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Duplicate response `%s', discarding.\n", - GNUNET_h2s (&mhash)); -#endif - return GNUNET_YES; /* duplicate */ - case GNUNET_BLOCK_EVALUATION_RESULT_INVALID: - return GNUNET_YES; /* wrong namespace */ - case GNUNET_BLOCK_EVALUATION_REQUEST_VALID: - GNUNET_break (0); - return GNUNET_YES; - case GNUNET_BLOCK_EVALUATION_REQUEST_INVALID: - GNUNET_break (0); - return GNUNET_YES; - case GNUNET_BLOCK_EVALUATION_TYPE_NOT_SUPPORTED: - GNUNET_log (GNUNET_ERROR_TYPE_ERROR, - _("Unsupported block type %u\n"), - prq->type); - return GNUNET_NO; - } - if (pr->client_request_list != NULL) - { - if (pr->replies_seen_size == pr->replies_seen_off) - GNUNET_array_grow (pr->replies_seen, - pr->replies_seen_size, - pr->replies_seen_size * 2 + 4); - GNUNET_CRYPTO_hash (prq->data, - prq->size, - &pr->replies_seen[pr->replies_seen_off++]); - refresh_bloomfilter (pr); - } - if (NULL == prq->sender) - { -#if DEBUG_FS - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Found result for query `%s' in local datastore\n", - GNUNET_h2s (key)); -#endif - GNUNET_STATISTICS_update (stats, - gettext_noop ("# results found locally"), - 1, - GNUNET_NO); - } - prq->priority += pr->remaining_priority; - pr->remaining_priority = 0; - pr->results_found++; - prq->request_found = GNUNET_YES; - if (NULL != pr->client_request_list) - { - GNUNET_STATISTICS_update (stats, - gettext_noop ("# replies received for local clients"), - 1, - GNUNET_NO); - cl = pr->client_request_list->client_list; - msize = sizeof (struct PutMessage) + prq->size; - creply = GNUNET_malloc (msize + sizeof (struct ClientResponseMessage)); - creply->msize = msize; - creply->client_list = cl; - GNUNET_CONTAINER_DLL_insert_after (cl->res_head, - cl->res_tail, - cl->res_tail, - creply); - pm = (struct PutMessage*) &creply[1]; - pm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT); - pm->header.size = htons (msize); - pm->type = htonl (prq->type); - pm->expiration = GNUNET_TIME_absolute_hton (prq->expiration); - memcpy (&pm[1], prq->data, prq->size); - if (NULL == cl->th) - { -#if DEBUG_FS - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Transmitting result for query `%s' to client\n", - GNUNET_h2s (key)); -#endif - cl->th = GNUNET_SERVER_notify_transmit_ready (cl->client, - msize, - GNUNET_TIME_UNIT_FOREVER_REL, - &transmit_to_client, - cl); - } - GNUNET_break (cl->th != NULL); - if (pr->do_remove) - { - prq->finished = GNUNET_YES; - destroy_pending_request (pr); - } - } - else - { - cp = pr->cp; -#if DEBUG_FS - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Transmitting result for query `%s' to other peer (PID=%u)\n", - GNUNET_h2s (key), - (unsigned int) cp->pid); -#endif - GNUNET_STATISTICS_update (stats, - gettext_noop ("# replies received for other peers"), - 1, - GNUNET_NO); - msize = sizeof (struct PutMessage) + prq->size; - reply = GNUNET_malloc (msize + sizeof (struct PendingMessage)); - reply->cont = &transmit_reply_continuation; - reply->cont_cls = pr; - reply->msize = msize; - reply->priority = UINT32_MAX; /* send replies first! */ - pm = (struct PutMessage*) &reply[1]; - pm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT); - pm->header.size = htons (msize); - pm->type = htonl (prq->type); - pm->expiration = GNUNET_TIME_absolute_hton (prq->expiration); - memcpy (&pm[1], prq->data, prq->size); - add_to_pending_messages_for_peer (cp, reply, pr); - } - return GNUNET_YES; -} - - -/** - * Iterator called on each result obtained for a DHT - * operation that expects a reply - * - * @param cls closure - * @param exp when will this value expire - * @param key key of the result - * @param get_path NULL-terminated array of pointers - * to the peers on reverse GET path (or NULL if not recorded) - * @param put_path NULL-terminated array of pointers - * to the peers on the PUT path (or NULL if not recorded) - * @param type type of the result - * @param size number of bytes in data - * @param data pointer to the result data - */ -static void -process_dht_reply (void *cls, - struct GNUNET_TIME_Absolute exp, - const GNUNET_HashCode * key, - const struct GNUNET_PeerIdentity * const *get_path, - const struct GNUNET_PeerIdentity * const *put_path, - enum GNUNET_BLOCK_Type type, - size_t size, - const void *data) -{ - struct PendingRequest *pr = cls; - struct ProcessReplyClosure prq; - - memset (&prq, 0, sizeof (prq)); - prq.data = data; - prq.expiration = exp; - prq.size = size; - prq.type = type; - process_reply (&prq, key, pr); -} - - - -/** - * Continuation called to notify client about result of the - * operation. - * - * @param cls closure - * @param success GNUNET_SYSERR on failure - * @param msg NULL on success, otherwise an error message - */ -static void -put_migration_continuation (void *cls, - int success, - const char *msg) -{ - struct GNUNET_TIME_Absolute *start = cls; - struct GNUNET_TIME_Relative delay; - - delay = GNUNET_TIME_absolute_get_duration (*start); - GNUNET_free (start); - GNUNET_LOAD_update (datastore_put_load, - delay.value); - if (GNUNET_OK == success) - return; - GNUNET_STATISTICS_update (stats, - gettext_noop ("# datastore 'put' failures"), - 1, - GNUNET_NO); -} - - -/** - * Handle P2P "PUT" message. - * - * @param cls closure, always NULL - * @param other the other peer involved (sender or receiver, NULL - * for loopback messages where we are both sender and receiver) - * @param message the actual message - * @param latency reported latency of the connection with 'other' - * @param distance reported distance (DV) to 'other' - * @return GNUNET_OK to keep the connection open, - * GNUNET_SYSERR to close it (signal serious error) - */ -static int -handle_p2p_put (void *cls, - const struct GNUNET_PeerIdentity *other, - const struct GNUNET_MessageHeader *message, - struct GNUNET_TIME_Relative latency, - uint32_t distance) -{ - const struct PutMessage *put; - uint16_t msize; - size_t dsize; - enum GNUNET_BLOCK_Type type; - struct GNUNET_TIME_Absolute expiration; - GNUNET_HashCode query; - struct ProcessReplyClosure prq; - struct GNUNET_TIME_Absolute *start; - struct GNUNET_TIME_Relative block_time; - double putl; - struct ConnectedPeer *cp; - struct PendingMessage *pm; - struct MigrationStopMessage *msm; - - msize = ntohs (message->size); - if (msize < sizeof (struct PutMessage)) - { - GNUNET_break_op(0); - return GNUNET_SYSERR; - } - put = (const struct PutMessage*) message; - dsize = msize - sizeof (struct PutMessage); - type = ntohl (put->type); - expiration = GNUNET_TIME_absolute_ntoh (put->expiration); - - if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND) - return GNUNET_SYSERR; - if (GNUNET_OK != - GNUNET_BLOCK_get_key (block_ctx, - type, - &put[1], - dsize, - &query)) - { - GNUNET_break_op (0); - return GNUNET_SYSERR; - } -#if DEBUG_FS - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Received result for query `%s' from peer `%4s'\n", - GNUNET_h2s (&query), - GNUNET_i2s (other)); -#endif - GNUNET_STATISTICS_update (stats, - gettext_noop ("# replies received (overall)"), - 1, - GNUNET_NO); - /* now, lookup 'query' */ - prq.data = (const void*) &put[1]; - if (other != NULL) - prq.sender = GNUNET_CONTAINER_multihashmap_get (connected_peers, - &other->hashPubKey); - else - prq.sender = NULL; - prq.size = dsize; - prq.type = type; - prq.expiration = expiration; - prq.priority = 0; - prq.finished = GNUNET_NO; - prq.request_found = GNUNET_NO; - GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map, - &query, - &process_reply, - &prq); - if (prq.sender != NULL) - { - prq.sender->inc_preference += CONTENT_BANDWIDTH_VALUE + 1000 * prq.priority; - prq.sender->trust += prq.priority; - } - if (GNUNET_YES == active_migration) - { -#if DEBUG_FS - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Replicating result for query `%s' with priority %u\n", - GNUNET_h2s (&query), - prq.priority); -#endif - start = GNUNET_malloc (sizeof (struct GNUNET_TIME_Absolute)); - *start = GNUNET_TIME_absolute_get (); - GNUNET_DATASTORE_put (dsh, - 0, &query, dsize, &put[1], - type, prq.priority, 1 /* anonymity */, - expiration, - 1 + prq.priority, MAX_DATASTORE_QUEUE, - GNUNET_CONSTANTS_SERVICE_TIMEOUT, - &put_migration_continuation, - start); - } - putl = GNUNET_LOAD_get_load (datastore_put_load); - if ( (GNUNET_NO == prq.request_found) && - ( (GNUNET_YES != active_migration) || - (putl > 2.0) ) ) - { - cp = GNUNET_CONTAINER_multihashmap_get (connected_peers, - &other->hashPubKey); - if (GNUNET_TIME_absolute_get_duration (cp->last_migration_block).value < 5000) - return GNUNET_OK; /* already blocked */ - /* We're too busy; send MigrationStop message! */ - if (GNUNET_YES != active_migration) - putl = 1.0 + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 5); - block_time = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS, - 5000 + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, - (unsigned int) (60000 * putl * putl))); - - cp->last_migration_block = GNUNET_TIME_relative_to_absolute (block_time); - pm = GNUNET_malloc (sizeof (struct PendingMessage) + - sizeof (struct MigrationStopMessage)); - pm->msize = sizeof (struct MigrationStopMessage); - pm->priority = UINT32_MAX; - msm = (struct MigrationStopMessage*) &pm[1]; - msm->header.size = htons (sizeof (struct MigrationStopMessage)); - msm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_MIGRATION_STOP); - msm->duration = GNUNET_TIME_relative_hton (block_time); - add_to_pending_messages_for_peer (cp, - pm, - NULL); - } - return GNUNET_OK; -} - - -/** - * Handle P2P "MIGRATION_STOP" message. - * - * @param cls closure, always NULL - * @param other the other peer involved (sender or receiver, NULL - * for loopback messages where we are both sender and receiver) - * @param message the actual message - * @param latency reported latency of the connection with 'other' - * @param distance reported distance (DV) to 'other' - * @return GNUNET_OK to keep the connection open, - * GNUNET_SYSERR to close it (signal serious error) - */ -static int -handle_p2p_migration_stop (void *cls, - const struct GNUNET_PeerIdentity *other, - const struct GNUNET_MessageHeader *message, - struct GNUNET_TIME_Relative latency, - uint32_t distance) -{ - struct ConnectedPeer *cp; - const struct MigrationStopMessage *msm; - - msm = (const struct MigrationStopMessage*) message; - cp = GNUNET_CONTAINER_multihashmap_get (connected_peers, - &other->hashPubKey); - if (cp == NULL) - { - GNUNET_break (0); - return GNUNET_OK; - } - cp->migration_blocked = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_relative_ntoh (msm->duration)); - return GNUNET_OK; -} - - - -/* **************************** P2P GET Handling ************************ */ - - -/** - * Closure for 'check_duplicate_request_{peer,client}'. - */ -struct CheckDuplicateRequestClosure -{ - /** - * The new request we should check if it already exists. - */ - const struct PendingRequest *pr; - - /** - * Existing request found by the checker, NULL if none. - */ - struct PendingRequest *have; -}; - - -/** - * Iterator over entries in the 'query_request_map' that - * tries to see if we have the same request pending from - * the same client already. - * - * @param cls closure (our 'struct CheckDuplicateRequestClosure') - * @param key current key code (query, ignored, must match) - * @param value value in the hash map (a 'struct PendingRequest' - * that already exists) - * @return GNUNET_YES if we should continue to - * iterate (no match yet) - * GNUNET_NO if not (match found). - */ -static int -check_duplicate_request_client (void *cls, - const GNUNET_HashCode * key, - void *value) -{ - struct CheckDuplicateRequestClosure *cdc = cls; - struct PendingRequest *have = value; - - if (have->client_request_list == NULL) - return GNUNET_YES; - if ( (cdc->pr->client_request_list->client_list->client == have->client_request_list->client_list->client) && - (cdc->pr != have) ) - { - cdc->have = have; - return GNUNET_NO; - } - return GNUNET_YES; -} - - -/** - * We're processing (local) results for a search request - * from another peer. Pass applicable results to the - * peer and if we are done either clean up (operation - * complete) or forward to other peers (more results possible). - * - * @param cls our closure (struct LocalGetContext) - * @param key key for the content - * @param size number of bytes in data - * @param data content stored - * @param type type of the content - * @param priority priority of the content - * @param anonymity anonymity-level for the content - * @param expiration expiration time for the content - * @param uid unique identifier for the datum; - * maybe 0 if no unique identifier is available - */ -static void -process_local_reply (void *cls, - const GNUNET_HashCode * key, - size_t size, - const void *data, - enum GNUNET_BLOCK_Type type, - uint32_t priority, - uint32_t anonymity, - struct GNUNET_TIME_Absolute - expiration, - uint64_t uid) -{ - struct PendingRequest *pr = cls; - struct ProcessReplyClosure prq; - struct CheckDuplicateRequestClosure cdrc; - GNUNET_HashCode query; - unsigned int old_rf; - - if (NULL == key) - { -#if DEBUG_FS > 1 - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Done processing local replies, forwarding request to other peers.\n"); -#endif - pr->qe = NULL; - if (pr->client_request_list != NULL) - { - GNUNET_SERVER_receive_done (pr->client_request_list->client_list->client, - GNUNET_YES); - /* Figure out if this is a duplicate request and possibly - merge 'struct PendingRequest' entries */ - cdrc.have = NULL; - cdrc.pr = pr; - GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map, - &pr->query, - &check_duplicate_request_client, - &cdrc); - if (cdrc.have != NULL) - { -#if DEBUG_FS - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Received request for block `%s' twice from client, will only request once.\n", - GNUNET_h2s (&pr->query)); -#endif - - destroy_pending_request (pr); - return; - } - } - - /* no more results */ - if (pr->task == GNUNET_SCHEDULER_NO_TASK) - pr->task = GNUNET_SCHEDULER_add_now (sched, - &forward_request_task, - pr); - return; - } -#if DEBUG_FS - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "New local response to `%s' of type %u.\n", - GNUNET_h2s (key), - type); -#endif - if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND) - { -#if DEBUG_FS - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Found ONDEMAND block, performing on-demand encoding\n"); -#endif - GNUNET_STATISTICS_update (stats, - gettext_noop ("# on-demand blocks matched requests"), - 1, - GNUNET_NO); - if (GNUNET_OK != - GNUNET_FS_handle_on_demand_block (key, size, data, type, priority, - anonymity, expiration, uid, - &process_local_reply, - pr)) - if (pr->qe != NULL) - { - GNUNET_DATASTORE_get_next (dsh, GNUNET_YES); - } - return; - } - old_rf = pr->results_found; - memset (&prq, 0, sizeof (prq)); - prq.data = data; - prq.expiration = expiration; - prq.size = size; - if (GNUNET_OK != - GNUNET_BLOCK_get_key (block_ctx, - type, - data, - size, - &query)) - { - GNUNET_break (0); - GNUNET_DATASTORE_remove (dsh, - key, - size, data, - -1, -1, - GNUNET_TIME_UNIT_FOREVER_REL, - NULL, NULL); - GNUNET_DATASTORE_get_next (dsh, GNUNET_YES); - return; - } - prq.type = type; - prq.priority = priority; - prq.finished = GNUNET_NO; - prq.request_found = GNUNET_NO; - process_reply (&prq, key, pr); - if ( (old_rf == 0) && - (pr->results_found == 1) ) - update_datastore_delays (pr->start_time); - if (prq.finished == GNUNET_YES) - return; - if (pr->qe == NULL) - return; /* done here */ - if (prq.eval == GNUNET_BLOCK_EVALUATION_OK_LAST) - { - GNUNET_DATASTORE_get_next (dsh, GNUNET_NO); - return; - } - if ( (pr->client_request_list == NULL) && - ( (GNUNET_YES == test_load_too_high()) || - (pr->results_found > 5 + 2 * pr->priority) ) ) - { -#if DEBUG_FS > 2 - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Load too high, done with request\n"); -#endif - GNUNET_STATISTICS_update (stats, - gettext_noop ("# processing result set cut short due to load"), - 1, - GNUNET_NO); - GNUNET_DATASTORE_get_next (dsh, GNUNET_NO); - return; - } - GNUNET_DATASTORE_get_next (dsh, GNUNET_YES); -} - - -/** - * We've received a request with the specified priority. Bound it - * according to how much we trust the given peer. - * - * @param prio_in requested priority - * @param cp the peer making the request - * @return effective priority - */ -static uint32_t -bound_priority (uint32_t prio_in, - struct ConnectedPeer *cp) -{ -#define N ((double)128.0) - uint32_t ret; - double rret; - int ld; - - ld = test_load_too_high (); - if (ld == GNUNET_SYSERR) - return 0; /* excess resources */ - ret = change_host_trust (cp, prio_in); - if (ret > 0) - { - if (ret > current_priorities + N) - rret = current_priorities + N; - else - rret = ret; - current_priorities - = (current_priorities * (N-1) + rret)/N; - } -#undef N - return ret; -} - - -/** - * Iterator over entries in the 'query_request_map' that - * tries to see if we have the same request pending from - * the same peer already. - * - * @param cls closure (our 'struct CheckDuplicateRequestClosure') - * @param key current key code (query, ignored, must match) - * @param value value in the hash map (a 'struct PendingRequest' - * that already exists) - * @return GNUNET_YES if we should continue to - * iterate (no match yet) - * GNUNET_NO if not (match found). - */ -static int -check_duplicate_request_peer (void *cls, - const GNUNET_HashCode * key, - void *value) -{ - struct CheckDuplicateRequestClosure *cdc = cls; - struct PendingRequest *have = value; - - if (cdc->pr->target_pid == have->target_pid) - { - cdc->have = have; - return GNUNET_NO; - } - return GNUNET_YES; -} - - -/** - * Handle P2P "GET" request. - * - * @param cls closure, always NULL - * @param other the other peer involved (sender or receiver, NULL - * for loopback messages where we are both sender and receiver) - * @param message the actual message - * @param latency reported latency of the connection with 'other' - * @param distance reported distance (DV) to 'other' - * @return GNUNET_OK to keep the connection open, - * GNUNET_SYSERR to close it (signal serious error) - */ -static int -handle_p2p_get (void *cls, - const struct GNUNET_PeerIdentity *other, - const struct GNUNET_MessageHeader *message, - struct GNUNET_TIME_Relative latency, - uint32_t distance) -{ - struct PendingRequest *pr; - struct ConnectedPeer *cp; - struct ConnectedPeer *cps; - struct CheckDuplicateRequestClosure cdc; - struct GNUNET_TIME_Relative timeout; - uint16_t msize; - const struct GetMessage *gm; - unsigned int bits; - const GNUNET_HashCode *opt; - uint32_t bm; - size_t bfsize; - uint32_t ttl_decrement; - enum GNUNET_BLOCK_Type type; - int have_ns; - int ld; - - msize = ntohs(message->size); - if (msize < sizeof (struct GetMessage)) - { - GNUNET_break_op (0); - return GNUNET_SYSERR; - } - gm = (const struct GetMessage*) message; - type = ntohl (gm->type); - bm = ntohl (gm->hash_bitmap); - bits = 0; - while (bm > 0) - { - if (1 == (bm & 1)) - bits++; - bm >>= 1; - } - if (msize < sizeof (struct GetMessage) + bits * sizeof (GNUNET_HashCode)) - { - GNUNET_break_op (0); - return GNUNET_SYSERR; - } - opt = (const GNUNET_HashCode*) &gm[1]; - bfsize = msize - sizeof (struct GetMessage) + bits * sizeof (GNUNET_HashCode); - bm = ntohl (gm->hash_bitmap); - bits = 0; - cps = GNUNET_CONTAINER_multihashmap_get (connected_peers, - &other->hashPubKey); - if (NULL == cps) - { - /* peer must have just disconnected */ - GNUNET_STATISTICS_update (stats, - gettext_noop ("# requests dropped due to initiator not being connected"), - 1, - GNUNET_NO); - return GNUNET_SYSERR; - } - if (0 != (bm & GET_MESSAGE_BIT_RETURN_TO)) - cp = GNUNET_CONTAINER_multihashmap_get (connected_peers, - &opt[bits++]); - else - cp = cps; - if (cp == NULL) - { -#if DEBUG_FS - if (0 != (bm & GET_MESSAGE_BIT_RETURN_TO)) - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Failed to find RETURN-TO peer `%4s' in connection set. Dropping query.\n", - GNUNET_i2s ((const struct GNUNET_PeerIdentity*) &opt[bits-1])); - - else - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Failed to find peer `%4s' in connection set. Dropping query.\n", - GNUNET_i2s (other)); -#endif - GNUNET_STATISTICS_update (stats, - gettext_noop ("# requests dropped due to missing reverse route"), - 1, - GNUNET_NO); - /* FIXME: try connect? */ - return GNUNET_OK; - } - /* note that we can really only check load here since otherwise - peers could find out that we are overloaded by not being - disconnected after sending us a malformed query... */ - - /* FIXME: query priority should play - a major role here! */ - ld = test_load_too_high (); - if (GNUNET_YES == ld) - { -#if DEBUG_FS - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Dropping query from `%s', this peer is too busy.\n", - GNUNET_i2s (other)); -#endif - GNUNET_STATISTICS_update (stats, - gettext_noop ("# requests dropped due to high load"), - 1, - GNUNET_NO); - return GNUNET_OK; - } - /* FIXME: if ld == GNUNET_NO, forward - instead of indirecting! */ - -#if DEBUG_FS - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Received request for `%s' of type %u from peer `%4s' with flags %u\n", - GNUNET_h2s (&gm->query), - (unsigned int) type, - GNUNET_i2s (other), - (unsigned int) bm); -#endif - have_ns = (0 != (bm & GET_MESSAGE_BIT_SKS_NAMESPACE)); - pr = GNUNET_malloc (sizeof (struct PendingRequest) + - (have_ns ? sizeof(GNUNET_HashCode) : 0)); - if (have_ns) - { - pr->namespace = (GNUNET_HashCode*) &pr[1]; - memcpy (&pr[1], &opt[bits++], sizeof (GNUNET_HashCode)); - } - pr->type = type; - pr->mingle = ntohl (gm->filter_mutator); - if (0 != (bm & GET_MESSAGE_BIT_TRANSMIT_TO)) - pr->target_pid = GNUNET_PEER_intern ((const struct GNUNET_PeerIdentity*) &opt[bits++]); - pr->anonymity_level = 1; - pr->priority = bound_priority (ntohl (gm->priority), cps); - pr->ttl = bound_ttl (ntohl (gm->ttl), pr->priority); - pr->query = gm->query; - /* decrement ttl (always) */ - ttl_decrement = 2 * TTL_DECREMENT + - GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, - TTL_DECREMENT); - if ( (pr->ttl < 0) && - (((int32_t)(pr->ttl - ttl_decrement)) > 0) ) - { -#if DEBUG_FS - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Dropping query from `%s' due to TTL underflow (%d - %u).\n", - GNUNET_i2s (other), - pr->ttl, - ttl_decrement); -#endif - GNUNET_STATISTICS_update (stats, - gettext_noop ("# requests dropped due TTL underflow"), - 1, - GNUNET_NO); - /* integer underflow => drop (should be very rare)! */ - GNUNET_free (pr); - return GNUNET_OK; - } - pr->ttl -= ttl_decrement; - pr->start_time = GNUNET_TIME_absolute_get (); - - /* get bloom filter */ - if (bfsize > 0) - { - pr->bf = GNUNET_CONTAINER_bloomfilter_init ((const char*) &opt[bits], - bfsize, - BLOOMFILTER_K); - pr->bf_size = bfsize; - } - - cdc.have = NULL; - cdc.pr = pr; - GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map, - &gm->query, - &check_duplicate_request_peer, - &cdc); - if (cdc.have != NULL) - { - if (cdc.have->start_time.value + cdc.have->ttl >= - pr->start_time.value + pr->ttl) - { - /* existing request has higher TTL, drop new one! */ - cdc.have->priority += pr->priority; - destroy_pending_request (pr); -#if DEBUG_FS - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Have existing request with higher TTL, dropping new request.\n", - GNUNET_i2s (other)); -#endif - GNUNET_STATISTICS_update (stats, - gettext_noop ("# requests dropped due to higher-TTL request"), - 1, - GNUNET_NO); - return GNUNET_OK; - } - else - { - /* existing request has lower TTL, drop old one! */ - pr->priority += cdc.have->priority; - /* Possible optimization: if we have applicable pending - replies in 'cdc.have', we might want to move those over - (this is a really rare special-case, so it is not clear - that this would be worth it) */ - destroy_pending_request (cdc.have); - /* keep processing 'pr'! */ - } - } - - pr->cp = cp; - GNUNET_break (GNUNET_OK == - GNUNET_CONTAINER_multihashmap_put (query_request_map, - &gm->query, - pr, - GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE)); - GNUNET_break (GNUNET_OK == - GNUNET_CONTAINER_multihashmap_put (peer_request_map, - &other->hashPubKey, - pr, - GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE)); - - pr->hnode = GNUNET_CONTAINER_heap_insert (requests_by_expiration_heap, - pr, - pr->start_time.value + pr->ttl); - - GNUNET_STATISTICS_update (stats, - gettext_noop ("# P2P searches received"), - 1, - GNUNET_NO); - GNUNET_STATISTICS_update (stats, - gettext_noop ("# P2P searches active"), - 1, - GNUNET_NO); - - /* calculate change in traffic preference */ - cps->inc_preference += pr->priority * 1000 + QUERY_BANDWIDTH_VALUE; - /* process locally */ - if (type == GNUNET_BLOCK_TYPE_FS_DBLOCK) - type = GNUNET_BLOCK_TYPE_ANY; /* to get on-demand as well */ - timeout = GNUNET_TIME_relative_multiply (BASIC_DATASTORE_REQUEST_DELAY, - (pr->priority + 1)); - pr->qe = GNUNET_DATASTORE_get (dsh, - &gm->query, - type, - pr->priority + 1, - MAX_DATASTORE_QUEUE, - timeout, - &process_local_reply, - pr); - - /* Are multiple results possible? If so, start processing remotely now! */ - switch (pr->type) - { - case GNUNET_BLOCK_TYPE_FS_DBLOCK: - case GNUNET_BLOCK_TYPE_FS_IBLOCK: - /* only one result, wait for datastore */ - break; - default: - if (pr->task == GNUNET_SCHEDULER_NO_TASK) - pr->task = GNUNET_SCHEDULER_add_now (sched, - &forward_request_task, - pr); - } - - /* make sure we don't track too many requests */ - if (GNUNET_CONTAINER_heap_get_size (requests_by_expiration_heap) > max_pending_requests) - { - pr = GNUNET_CONTAINER_heap_peek (requests_by_expiration_heap); - GNUNET_assert (pr != NULL); - destroy_pending_request (pr); - } - return GNUNET_OK; -} - - -/* **************************** CS GET Handling ************************ */ - - -/** - * Handle START_SEARCH-message (search request from client). - * - * @param cls closure - * @param client identification of the client - * @param message the actual message - */ -static void -handle_start_search (void *cls, - struct GNUNET_SERVER_Client *client, - const struct GNUNET_MessageHeader *message) -{ - static GNUNET_HashCode all_zeros; - const struct SearchMessage *sm; - struct ClientList *cl; - struct ClientRequestList *crl; - struct PendingRequest *pr; - uint16_t msize; - unsigned int sc; - enum GNUNET_BLOCK_Type type; - - msize = ntohs (message->size); - if ( (msize < sizeof (struct SearchMessage)) || - (0 != (msize - sizeof (struct SearchMessage)) % sizeof (GNUNET_HashCode)) ) - { - GNUNET_break (0); - GNUNET_SERVER_receive_done (client, - GNUNET_SYSERR); - return; - } - GNUNET_STATISTICS_update (stats, - gettext_noop ("# client searches received"), - 1, - GNUNET_NO); - sc = (msize - sizeof (struct SearchMessage)) / sizeof (GNUNET_HashCode); - sm = (const struct SearchMessage*) message; - type = ntohl (sm->type); -#if DEBUG_FS - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Received request for `%s' of type %u from local client\n", - GNUNET_h2s (&sm->query), - (unsigned int) type); -#endif - cl = client_list; - while ( (cl != NULL) && - (cl->client != client) ) - cl = cl->next; - if (cl == NULL) - { - cl = GNUNET_malloc (sizeof (struct ClientList)); - cl->client = client; - GNUNET_SERVER_client_keep (client); - cl->next = client_list; - client_list = cl; - } - /* detect duplicate KBLOCK requests */ - if ( (type == GNUNET_BLOCK_TYPE_FS_KBLOCK) || - (type == GNUNET_BLOCK_TYPE_FS_NBLOCK) || - (type == GNUNET_BLOCK_TYPE_ANY) ) - { - crl = cl->rl_head; - while ( (crl != NULL) && - ( (0 != memcmp (&crl->req->query, - &sm->query, - sizeof (GNUNET_HashCode))) || - (crl->req->type != type) ) ) - crl = crl->next; - if (crl != NULL) - { -#if DEBUG_FS - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Have existing request, merging content-seen lists.\n"); -#endif - pr = crl->req; - /* Duplicate request (used to send long list of - known/blocked results); merge 'pr->replies_seen' - and update bloom filter */ - GNUNET_array_grow (pr->replies_seen, - pr->replies_seen_size, - pr->replies_seen_off + sc); - memcpy (&pr->replies_seen[pr->replies_seen_off], - &sm[1], - sc * sizeof (GNUNET_HashCode)); - pr->replies_seen_off += sc; - refresh_bloomfilter (pr); - GNUNET_STATISTICS_update (stats, - gettext_noop ("# client searches updated (merged content seen list)"), - 1, - GNUNET_NO); - GNUNET_SERVER_receive_done (client, - GNUNET_OK); - return; - } - } - GNUNET_STATISTICS_update (stats, - gettext_noop ("# client searches active"), - 1, - GNUNET_NO); - pr = GNUNET_malloc (sizeof (struct PendingRequest) + - ((type == GNUNET_BLOCK_TYPE_FS_SBLOCK) ? sizeof(GNUNET_HashCode) : 0)); - crl = GNUNET_malloc (sizeof (struct ClientRequestList)); - memset (crl, 0, sizeof (struct ClientRequestList)); - crl->client_list = cl; - GNUNET_CONTAINER_DLL_insert (cl->rl_head, - cl->rl_tail, - crl); - crl->req = pr; - pr->type = type; - pr->client_request_list = crl; - GNUNET_array_grow (pr->replies_seen, - pr->replies_seen_size, - sc); - memcpy (pr->replies_seen, - &sm[1], - sc * sizeof (GNUNET_HashCode)); - pr->replies_seen_off = sc; - pr->anonymity_level = ntohl (sm->anonymity_level); - pr->start_time = GNUNET_TIME_absolute_get (); - refresh_bloomfilter (pr); - pr->query = sm->query; - if (0 == (1 & ntohl (sm->options))) - pr->local_only = GNUNET_NO; - else - pr->local_only = GNUNET_YES; - switch (type) - { - case GNUNET_BLOCK_TYPE_FS_DBLOCK: - case GNUNET_BLOCK_TYPE_FS_IBLOCK: - if (0 != memcmp (&sm->target, - &all_zeros, - sizeof (GNUNET_HashCode))) - pr->target_pid = GNUNET_PEER_intern ((const struct GNUNET_PeerIdentity*) &sm->target); - break; - case GNUNET_BLOCK_TYPE_FS_SBLOCK: - pr->namespace = (GNUNET_HashCode*) &pr[1]; - memcpy (&pr[1], &sm->target, sizeof (GNUNET_HashCode)); - break; - default: - break; - } - GNUNET_break (GNUNET_OK == - GNUNET_CONTAINER_multihashmap_put (query_request_map, - &sm->query, - pr, - GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE)); - if (type == GNUNET_BLOCK_TYPE_FS_DBLOCK) - type = GNUNET_BLOCK_TYPE_ANY; /* get on-demand blocks too! */ - pr->qe = GNUNET_DATASTORE_get (dsh, - &sm->query, - type, - -3, -1, - GNUNET_CONSTANTS_SERVICE_TIMEOUT, - &process_local_reply, - pr); -} - - -/* **************************** Startup ************************ */ - /** * Process fs requests. * - * @param s scheduler to use * @param server the initialized server * @param c configuration to use */ static int -main_init (struct GNUNET_SCHEDULER_Handle *s, - struct GNUNET_SERVER_Handle *server, - const struct GNUNET_CONFIGURATION_Handle *c) -{ - static const struct GNUNET_CORE_MessageHandler p2p_handlers[] = - { - { &handle_p2p_get, - GNUNET_MESSAGE_TYPE_FS_GET, 0 }, - { &handle_p2p_put, - GNUNET_MESSAGE_TYPE_FS_PUT, 0 }, - { &handle_p2p_migration_stop, - GNUNET_MESSAGE_TYPE_FS_MIGRATION_STOP, - sizeof (struct MigrationStopMessage) }, - { NULL, 0, 0 } - }; +main_init (struct GNUNET_SERVER_Handle *server, + const struct GNUNET_CONFIGURATION_Handle *c) +{ + static const struct GNUNET_CORE_MessageHandler p2p_handlers[] = { + {&handle_p2p_get, + GNUNET_MESSAGE_TYPE_FS_GET, 0}, + {&handle_p2p_put, + GNUNET_MESSAGE_TYPE_FS_PUT, 0}, + {&GSF_handle_p2p_migration_stop_, + GNUNET_MESSAGE_TYPE_FS_MIGRATION_STOP, + sizeof (struct MigrationStopMessage)}, + {NULL, 0, 0} + }; static const struct GNUNET_SERVER_MessageHandler handlers[] = { - {&GNUNET_FS_handle_index_start, NULL, + {&GNUNET_FS_handle_index_start, NULL, GNUNET_MESSAGE_TYPE_FS_INDEX_START, 0}, - {&GNUNET_FS_handle_index_list_get, NULL, - GNUNET_MESSAGE_TYPE_FS_INDEX_LIST_GET, sizeof(struct GNUNET_MessageHeader) }, - {&GNUNET_FS_handle_unindex, NULL, GNUNET_MESSAGE_TYPE_FS_UNINDEX, - sizeof (struct UnindexMessage) }, - {&handle_start_search, NULL, GNUNET_MESSAGE_TYPE_FS_START_SEARCH, - 0 }, + {&GNUNET_FS_handle_index_list_get, NULL, + GNUNET_MESSAGE_TYPE_FS_INDEX_LIST_GET, + sizeof (struct GNUNET_MessageHeader)}, + {&GNUNET_FS_handle_unindex, NULL, GNUNET_MESSAGE_TYPE_FS_UNINDEX, + sizeof (struct UnindexMessage)}, + {&handle_start_search, NULL, GNUNET_MESSAGE_TYPE_FS_START_SEARCH, + 0}, {NULL, NULL, 0, 0} }; - unsigned long long enc = 128; - - sched = s; - cfg = c; - stats = GNUNET_STATISTICS_create (sched, "fs", cfg); - min_migration_delay = GNUNET_TIME_UNIT_SECONDS; - if ( (GNUNET_OK != - GNUNET_CONFIGURATION_get_value_number (cfg, - "fs", - "MAX_PENDING_REQUESTS", - &max_pending_requests)) || - (GNUNET_OK != - GNUNET_CONFIGURATION_get_value_number (cfg, - "fs", - "EXPECTED_NEIGHBOUR_COUNT", - &enc)) || - (GNUNET_OK != - GNUNET_CONFIGURATION_get_value_time (cfg, - "fs", - "MIN_MIGRATION_DELAY", - &min_migration_delay)) ) - { - GNUNET_log (GNUNET_ERROR_TYPE_INFO, - _("Configuration fails to specify certain parameters, assuming default values.")); - } - connected_peers = GNUNET_CONTAINER_multihashmap_create (enc); - query_request_map = GNUNET_CONTAINER_multihashmap_create (max_pending_requests); - peer_request_map = GNUNET_CONTAINER_multihashmap_create (enc); - requests_by_expiration_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN); - core = GNUNET_CORE_connect (sched, - cfg, - GNUNET_TIME_UNIT_FOREVER_REL, - NULL, - NULL, - &peer_connect_handler, - &peer_disconnect_handler, - NULL, - NULL, GNUNET_NO, - NULL, GNUNET_NO, - p2p_handlers); - if (NULL == core) - { - GNUNET_log (GNUNET_ERROR_TYPE_ERROR, - _("Failed to connect to `%s' service.\n"), - "core"); - GNUNET_CONTAINER_multihashmap_destroy (connected_peers); - connected_peers = NULL; - GNUNET_CONTAINER_multihashmap_destroy (query_request_map); - query_request_map = NULL; - GNUNET_CONTAINER_heap_destroy (requests_by_expiration_heap); - requests_by_expiration_heap = NULL; - GNUNET_CONTAINER_multihashmap_destroy (peer_request_map); - peer_request_map = NULL; - if (dsh != NULL) - { - GNUNET_DATASTORE_disconnect (dsh, GNUNET_NO); - dsh = NULL; - } - return GNUNET_SYSERR; - } - /* FIXME: distinguish between sending and storing in options? */ - if (active_migration) - { - GNUNET_log (GNUNET_ERROR_TYPE_INFO, - _("Content migration is enabled, will start to gather data\n")); - consider_migration_gathering (); - } - GNUNET_SERVER_disconnect_notify (server, - &handle_client_disconnect, - NULL); - GNUNET_assert (GNUNET_OK == - GNUNET_CONFIGURATION_get_value_filename (cfg, - "fs", - "TRUST", - &trustDirectory)); - GNUNET_DISK_directory_create (trustDirectory); - GNUNET_SCHEDULER_add_with_priority (sched, - GNUNET_SCHEDULER_PRIORITY_HIGH, - &cron_flush_trust, NULL); - + GSF_core = + GNUNET_CORE_connect (GSF_cfg, 1, NULL, &peer_init_handler, + &peer_connect_handler, &GSF_peer_disconnect_handler_, + NULL, GNUNET_NO, NULL, GNUNET_NO, p2p_handlers); + if (NULL == GSF_core) + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + _("Failed to connect to `%s' service.\n"), "core"); + return GNUNET_SYSERR; + } + GNUNET_SERVER_disconnect_notify (server, &GSF_client_disconnect_handler_, + NULL); GNUNET_SERVER_add_handlers (server, handlers); - GNUNET_SCHEDULER_add_delayed (sched, - GNUNET_TIME_UNIT_FOREVER_REL, - &shutdown_task, - NULL); + cover_age_task = + GNUNET_SCHEDULER_add_delayed (COVER_AGE_FREQUENCY, &age_cover_counters, + NULL); + datastore_get_load = GNUNET_LOAD_value_init (DATASTORE_LOAD_AUTODECLINE); + GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL, &shutdown_task, + NULL); return GNUNET_OK; } @@ -3944,56 +598,41 @@ main_init (struct GNUNET_SCHEDULER_Handle *s, * Process fs requests. * * @param cls closure - * @param sched scheduler to use * @param server the initialized server * @param cfg configuration to use */ static void -run (void *cls, - struct GNUNET_SCHEDULER_Handle *sched, - struct GNUNET_SERVER_Handle *server, +run (void *cls, struct GNUNET_SERVER_Handle *server, const struct GNUNET_CONFIGURATION_Handle *cfg) { - active_migration = GNUNET_CONFIGURATION_get_value_yesno (cfg, - "FS", - "ACTIVEMIGRATION"); - dsh = GNUNET_DATASTORE_connect (cfg, - sched); - if (dsh == NULL) - { - GNUNET_SCHEDULER_shutdown (sched); - return; - } - datastore_get_load = GNUNET_LOAD_value_init (); - datastore_put_load = GNUNET_LOAD_value_init (); + GSF_cfg = cfg; + GSF_enable_randomized_delays = + GNUNET_CONFIGURATION_get_value_yesno (cfg, "fs", "DELAY"); + GSF_dsh = GNUNET_DATASTORE_connect (cfg); + if (NULL == GSF_dsh) + { + GNUNET_SCHEDULER_shutdown (); + return; + } + GSF_rt_entry_lifetime = GNUNET_LOAD_value_init (GNUNET_TIME_UNIT_FOREVER_REL); + GSF_stats = GNUNET_STATISTICS_create ("fs", cfg); block_cfg = GNUNET_CONFIGURATION_create (); - GNUNET_CONFIGURATION_set_value_string (block_cfg, - "block", - "PLUGINS", - "fs"); - block_ctx = GNUNET_BLOCK_context_create (block_cfg); - GNUNET_assert (NULL != block_ctx); - dht_handle = GNUNET_DHT_connect (sched, - cfg, - FS_DHT_HT_SIZE); - if ( (GNUNET_OK != GNUNET_FS_indexing_init (sched, cfg, dsh)) || - (GNUNET_OK != main_init (sched, server, cfg)) ) - { - GNUNET_SCHEDULER_shutdown (sched); - GNUNET_DATASTORE_disconnect (dsh, GNUNET_NO); - dsh = NULL; - GNUNET_DHT_disconnect (dht_handle); - dht_handle = NULL; - GNUNET_BLOCK_context_destroy (block_ctx); - block_ctx = NULL; - GNUNET_CONFIGURATION_destroy (block_cfg); - block_cfg = NULL; - GNUNET_LOAD_value_free (datastore_get_load); - datastore_get_load = NULL; - GNUNET_LOAD_value_free (datastore_put_load); - datastore_put_load = NULL; - return; - } + GNUNET_CONFIGURATION_set_value_string (block_cfg, "block", "PLUGINS", "fs"); + GSF_block_ctx = GNUNET_BLOCK_context_create (block_cfg); + GNUNET_assert (NULL != GSF_block_ctx); + GSF_dht = GNUNET_DHT_connect (cfg, FS_DHT_HT_SIZE); + GSF_plan_init (); + GSF_pending_request_init_ (); + GSF_connected_peer_init_ (); + GSF_push_init_ (); + GSF_put_init_ (); + if ((GNUNET_OK != GNUNET_FS_indexing_init (cfg, GSF_dsh)) || + (GNUNET_OK != main_init (server, cfg))) + { + GNUNET_SCHEDULER_shutdown (); + shutdown_task (NULL, NULL); + return; + } } @@ -4008,11 +647,8 @@ int main (int argc, char *const *argv) { return (GNUNET_OK == - GNUNET_SERVICE_run (argc, - argv, - "fs", - GNUNET_SERVICE_OPTION_NONE, - &run, NULL)) ? 0 : 1; + GNUNET_SERVICE_run (argc, argv, "fs", GNUNET_SERVICE_OPTION_NONE, + &run, NULL)) ? 0 : 1; } /* end of gnunet-service-fs.c */