GNUnet is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published
- by the Free Software Foundation; either version 2, or (at your
+ by the Free Software Foundation; either version 3, or (at your
option) any later version.
GNUnet is distributed in the hope that it will be useful, but
* @author Christian Grothoff
*
* TODO:
- * - forward_request_task (P2P forwarding!)
- * - track stats for hot-path routing
- * - implement hot-path routing decision procedure
- * - detect duplicate requests (P2P and CS)
- * - implement: bound_priority, test_load_too_high, validate_skblock
- * - add content migration support (store locally)
- * - statistics
- *
+ * - track per-peer request latency (using new load API)
+ * - consider more precise latency estimation (per-peer & request) -- again load API?
+ * - implement test_load_too_high, make decision priority-based, implement forwarding, etc.
+ * - introduce random latency in processing
+ * - more statistics
*/
#include "platform.h"
#include <float.h>
#include "gnunet_constants.h"
#include "gnunet_core_service.h"
+#include "gnunet_dht_service.h"
#include "gnunet_datastore_service.h"
+#include "gnunet_load_lib.h"
#include "gnunet_peer_lib.h"
#include "gnunet_protocols.h"
#include "gnunet_signatures.h"
+#include "gnunet_statistics_service.h"
#include "gnunet_util_lib.h"
-#include "gnunet-service-fs_drq.h"
#include "gnunet-service-fs_indexing.h"
#include "fs.h"
-#define DEBUG_FS GNUNET_YES
+#define DEBUG_FS GNUNET_NO
/**
* Maximum number of outgoing messages we queue per peer.
- * FIXME: set to a tiny value for testing; make configurable.
*/
-#define MAX_QUEUE_PER_PEER 2
+#define MAX_QUEUE_PER_PEER 16
+
+/**
+ * Size for the hash map for DHT requests from the FS
+ * service. Should be about the number of concurrent
+ * DHT requests we plan to make.
+ */
+#define FS_DHT_HT_SIZE 1024
+
+/**
+ * How often do we flush trust values to disk?
+ */
+#define TRUST_FLUSH_FREQ GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 5)
+
+/**
+ * Inverse of the probability that we will submit the same query
+ * to the same peer again. If the same peer already got the query
+ * repeatedly recently, the probability is multiplied by the inverse
+ * of this number each time. Note that we only try about every TTL_DECREMENT/2
+ * plus MAX_CORK_DELAY (so roughly every 3.5s).
+ */
+#define RETRY_PROBABILITY_INV 3
+/**
+ * What is the maximum delay for a P2P FS message (in our interaction
+ * with core)? FS-internal delays are another story. The value is
+ * chosen based on the 32k block size. Given that peers typcially
+ * have at least 1 kb/s bandwidth, 45s waits give us a chance to
+ * transmit one message even to the lowest-bandwidth peers.
+ */
+#define MAX_TRANSMIT_DELAY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 45)
/**
* Maximum number of requests (from other peers) that we're
* willing to have pending at any given point in time.
- * FIXME: set from configuration (and 32 is a tiny value for testing only).
*/
-static uint64_t max_pending_requests = 32;
+static unsigned long long max_pending_requests = (32 * 1024);
/**
*/
struct PendingMessage;
-
/**
* Function called upon completion of a transmission.
*
/**
- * Information we keep for each pending reply. The
+ * Information we keep for each pending message (GET/PUT). The
* actual message follows at the end of this struct.
*/
struct PendingMessage
*/
struct GNUNET_TIME_Relative avg_delay;
+ /**
+ * Point in time until which this peer does not want us to migrate content
+ * to it.
+ */
+ struct GNUNET_TIME_Absolute migration_blocked;
+
+ /**
+ * Time until when we blocked this peer from migrating
+ * data to us.
+ */
+ struct GNUNET_TIME_Absolute last_migration_block;
+
/**
* Handle for an active request for transmission to this
* peer, or NULL.
/**
* Increase in traffic preference still to be submitted
- * to the core service for this peer. FIXME: double or 'uint64_t'?
+ * to the core service for this peer.
+ */
+ uint64_t inc_preference;
+
+ /**
+ * Trust rating for this peer
+ */
+ uint32_t trust;
+
+ /**
+ * Trust rating for this peer on disk.
*/
- double inc_preference;
+ uint32_t disk_trust;
/**
* The peer's identity.
};
-/**
- * Hash map entry of requests we are performing
- * on behalf of the same peer.
- */
-struct PeerRequestEntry
-{
-
- /**
- * Request this entry represents.
- */
- struct PendingRequest *req;
-
- /**
- * Entry of peer responsible for this entry.
- */
- struct ConnectedPeer *cp;
-
-};
-
-
/**
* Doubly-linked list of messages we are performing
* due to a pending request.
* client request list; otherwise NULL.
*/
struct ClientRequestList *client_request_list;
-
+
/**
- * If this request was made by a peer, this is our entry in the
- * per-peer multi-hash map; otherwise NULL.
+ * Entry of peer responsible for this entry (if this request
+ * was made by a peer).
*/
- struct PeerRequestEntry *pht_entry;
+ struct ConnectedPeer *cp;
/**
* If this is a namespace query, pointer to the hash of the public
GNUNET_PEER_Id *used_pids;
/**
- * Our entry in the DRQ (non-NULL while we wait for our
+ * Our entry in the queue (non-NULL while we wait for our
* turn to interact with the local database).
*/
- struct DatastoreRequestQueue *drq;
+ struct GNUNET_DATASTORE_QueueEntry *qe;
/**
* Size of the 'bf' (in bytes).
uint32_t remaining_priority;
/**
- * Number to mingle hashes for bloom-filter
- * tests with.
+ * Number to mingle hashes for bloom-filter tests with.
*/
int32_t mingle;
/**
* Type of the content that this request is for.
*/
- uint32_t type;
+ enum GNUNET_BLOCK_Type type;
+
+ /**
+ * Remove this request after transmission of the current response.
+ */
+ int16_t do_remove;
+
+ /**
+ * GNUNET_YES if we should not forward this request to other peers.
+ */
+ int16_t local_only;
+
+};
+
+
+/**
+ * Block that is ready for migration to other peers. Actual data is at the end of the block.
+ */
+struct MigrationReadyBlock
+{
+
+ /**
+ * This is a doubly-linked list.
+ */
+ struct MigrationReadyBlock *next;
+
+ /**
+ * This is a doubly-linked list.
+ */
+ struct MigrationReadyBlock *prev;
+
+ /**
+ * Query for the block.
+ */
+ GNUNET_HashCode query;
+
+ /**
+ * When does this block expire?
+ */
+ struct GNUNET_TIME_Absolute expiration;
+
+ /**
+ * Peers we would consider forwarding this
+ * block to. Zero for empty entries.
+ */
+ GNUNET_PEER_Id target_list[MIGRATION_LIST_SIZE];
+
+ /**
+ * Size of the block.
+ */
+ size_t size;
+
+ /**
+ * Number of targets already used.
+ */
+ unsigned int used_targets;
+ /**
+ * Type of the block.
+ */
+ enum GNUNET_BLOCK_Type type;
};
+/**
+ * Our connection to the datastore.
+ */
+static struct GNUNET_DATASTORE_Handle *dsh;
+
+/**
+ * Our block context.
+ */
+static struct GNUNET_BLOCK_Context *block_ctx;
+
+/**
+ * Our block configuration.
+ */
+static struct GNUNET_CONFIGURATION_Handle *block_cfg;
+
/**
* Our scheduler.
*/
/**
* Our configuration.
*/
-const struct GNUNET_CONFIGURATION_Handle *cfg;
+static const struct GNUNET_CONFIGURATION_Handle *cfg;
/**
* Map of peer identifiers to "struct ConnectedPeer" (for that peer).
*/
static struct GNUNET_CONTAINER_Heap *requests_by_expiration_heap;
+/**
+ * Handle for reporting statistics.
+ */
+static struct GNUNET_STATISTICS_Handle *stats;
+
/**
* Linked list of clients we are currently processing requests for.
*/
-struct ClientList *client_list;
+static struct ClientList *client_list;
/**
* Pointer to handle to the core service (points to NULL until we've
* connected to it).
*/
-struct GNUNET_CORE_Handle *core;
+static struct GNUNET_CORE_Handle *core;
+/**
+ * Head of linked list of blocks that can be migrated.
+ */
+static struct MigrationReadyBlock *mig_head;
-/* ******************* clean up functions ************************ */
+/**
+ * Tail of linked list of blocks that can be migrated.
+ */
+static struct MigrationReadyBlock *mig_tail;
+/**
+ * Request to datastore for migration (or NULL).
+ */
+static struct GNUNET_DATASTORE_QueueEntry *mig_qe;
/**
- * We're done with a particular message list entry.
- * Free all associated resources.
- *
- * @param pml entry to destroy
+ * Where do we store trust information?
*/
-static void
-destroy_pending_message_list_entry (struct PendingMessageList *pml)
-{
- GNUNET_CONTAINER_DLL_remove (pml->req->pending_head,
- pml->req->pending_tail,
- pml);
- GNUNET_CONTAINER_DLL_remove (pml->target->pending_messages_head,
- pml->target->pending_messages_tail,
- pml->pm);
- pml->target->pending_requests--;
- GNUNET_free (pml->pm);
- GNUNET_free (pml);
-}
+static char *trustDirectory;
+/**
+ * ID of task that collects blocks for migration.
+ */
+static GNUNET_SCHEDULER_TaskIdentifier mig_task;
/**
- * Destroy the given pending message (and call the respective
- * continuation).
- *
- * @param pm message to destroy
- * @param tpid id of peer that the message was delivered to, or 0 for none
+ * What is the maximum frequency at which we are allowed to
+ * poll the datastore for migration content?
*/
-static void
-destroy_pending_message (struct PendingMessage *pm,
- GNUNET_PEER_Id tpid)
-{
- struct PendingMessageList *pml = pm->pml;
+static struct GNUNET_TIME_Relative min_migration_delay;
- GNUNET_assert (pml->pm == pm);
- GNUNET_assert ( (tpid == 0) || (tpid == pml->target->pid) );
- pm->cont (pm->cont_cls, 0);
- destroy_pending_message_list_entry (pml);
-}
+/**
+ * Handle for DHT operations.
+ */
+static struct GNUNET_DHT_Handle *dht_handle;
+/**
+ * Size of the doubly-linked list of migration blocks.
+ */
+static unsigned int mig_size;
+/**
+ * Are we allowed to migrate content to this peer.
+ */
+static int active_migration;
/**
- * We're done processing a particular request.
- * Free all associated resources.
- *
- * @param pr request to destroy
+ * Typical priorities we're seeing from other peers right now. Since
+ * most priorities will be zero, this value is the weighted average of
+ * non-zero priorities seen "recently". In order to ensure that new
+ * values do not dramatically change the ratio, values are first
+ * "capped" to a reasonable range (+N of the current value) and then
+ * averaged into the existing value by a ratio of 1:N. Hence
+ * receiving the largest possible priority can still only raise our
+ * "current_priorities" by at most 1.
*/
-static void
-destroy_pending_request (struct PendingRequest *pr)
-{
- struct GNUNET_PeerIdentity pid;
+static double current_priorities;
- if (pr->hnode != NULL)
- {
- GNUNET_CONTAINER_heap_remove_node (requests_by_expiration_heap,
- pr->hnode);
- pr->hnode = NULL;
- }
- /* might have already been removed from map
- in 'process_reply' if there was a unique
- reply; hence ignore the return value here */
- (void) GNUNET_CONTAINER_multihashmap_remove (query_request_map,
- &pr->query,
- pr);
- if (pr->drq != NULL)
- {
- GNUNET_FS_drq_get_cancel (pr->drq);
- pr->drq = NULL;
- }
- if (pr->client_request_list != NULL)
- {
- GNUNET_CONTAINER_DLL_remove (pr->client_request_list->client_list->rl_head,
- pr->client_request_list->client_list->rl_tail,
- pr->client_request_list);
- GNUNET_free (pr->client_request_list);
- pr->client_request_list = NULL;
- }
- if (pr->pht_entry != NULL)
- {
- GNUNET_PEER_resolve (pr->pht_entry->cp->pid,
- &pid);
- GNUNET_CONTAINER_multihashmap_remove (peer_request_map,
- &pid.hashPubKey,
- pr->pht_entry);
- GNUNET_free (pr->pht_entry);
- pr->pht_entry = NULL;
- }
- if (pr->bf != NULL)
- {
- GNUNET_CONTAINER_bloomfilter_free (pr->bf);
- pr->bf = NULL;
- }
- if (pr->irc != NULL)
- {
- GNUNET_CORE_peer_change_preference_cancel (pr->irc);
- pr->irc = NULL;
- }
- if (pr->replies_seen != NULL)
- {
- GNUNET_free (pr->replies_seen);
- pr->replies_seen = NULL;
- }
- if (pr->task != GNUNET_SCHEDULER_NO_TASK)
- {
- GNUNET_SCHEDULER_cancel (sched,
- pr->task);
- pr->task = GNUNET_SCHEDULER_NO_TASK;
- }
- while (NULL != pr->pending_head)
- destroy_pending_message_list_entry (pr->pending_head);
- GNUNET_PEER_change_rc (pr->target_pid, -1);
- if (pr->used_pids != NULL)
- {
- GNUNET_PEER_decrement_rcs (pr->used_pids, pr->used_pids_off);
- GNUNET_free (pr->used_pids);
- pr->used_pids_off = 0;
- pr->used_pids_size = 0;
- pr->used_pids = NULL;
- }
- GNUNET_free (pr);
-}
+/**
+ * Datastore 'GET' load tracking.
+ */
+static struct GNUNET_LOAD_Value *datastore_get_load;
+
+/**
+ * Datastore 'PUT' load tracking.
+ */
+static struct GNUNET_LOAD_Value *datastore_put_load;
/**
- * Method called whenever a given peer connects.
+ * We've just now completed a datastore request. Update our
+ * datastore load calculations.
*
- * @param cls closure, not used
- * @param peer peer identity this notification is about
- * @param latency reported latency of the connection with 'other'
- * @param distance reported distance (DV) to 'other'
+ * @param start time when the datastore request was issued
*/
-static void
-peer_connect_handler (void *cls,
- const struct
- GNUNET_PeerIdentity * peer,
- struct GNUNET_TIME_Relative latency,
- uint32_t distance)
+static void
+update_datastore_delays (struct GNUNET_TIME_Absolute start)
{
- struct ConnectedPeer *cp;
+ struct GNUNET_TIME_Relative delay;
- cp = GNUNET_malloc (sizeof (struct ConnectedPeer));
- cp->pid = GNUNET_PEER_intern (peer);
- GNUNET_CONTAINER_multihashmap_put (connected_peers,
- &peer->hashPubKey,
- cp,
- GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
+ delay = GNUNET_TIME_absolute_get_duration (start);
+ GNUNET_LOAD_update (datastore_get_load,
+ delay.value);
}
/**
- * Free (each) request made by the peer.
- *
- * @param cls closure, points to peer that the request belongs to
- * @param key current key code
- * @param value value in the hash map
- * @return GNUNET_YES (we should continue to iterate)
+ * Get the filename under which we would store the GNUNET_HELLO_Message
+ * for the given host and protocol.
+ * @return filename of the form DIRECTORY/HOSTID
*/
-static int
-destroy_request (void *cls,
- const GNUNET_HashCode * key,
- void *value)
+static char *
+get_trust_filename (const struct GNUNET_PeerIdentity *id)
{
- const struct GNUNET_PeerIdentity * peer = cls;
- struct PendingRequest *pr = value;
-
- GNUNET_CONTAINER_multihashmap_remove (peer_request_map,
- &peer->hashPubKey,
- pr);
- destroy_pending_request (pr);
- return GNUNET_YES;
+ struct GNUNET_CRYPTO_HashAsciiEncoded fil;
+ char *fn;
+
+ GNUNET_CRYPTO_hash_to_enc (&id->hashPubKey, &fil);
+ GNUNET_asprintf (&fn, "%s%s%s", trustDirectory, DIR_SEPARATOR_STR, &fil);
+ return fn;
}
+
/**
- * Method called whenever a peer disconnects.
+ * Transmit messages by copying it to the target buffer
+ * "buf". "buf" will be NULL and "size" zero if the socket was closed
+ * for writing in the meantime. In that case, do nothing
+ * (the disconnect or shutdown handler will take care of the rest).
+ * If we were able to transmit messages and there are still more
+ * pending, ask core again for further calls to this function.
*
- * @param cls closure, not used
- * @param peer peer identity this notification is about
+ * @param cls closure, pointer to the 'struct ConnectedPeer*'
+ * @param size number of bytes available in buf
+ * @param buf where the callee should write the message
+ * @return number of bytes written to buf
*/
-static void
+static size_t
+transmit_to_peer (void *cls,
+ size_t size, void *buf);
+
+
+/* ******************* clean up functions ************************ */
+
+/**
+ * Delete the given migration block.
+ *
+ * @param mb block to delete
+ */
+static void
+delete_migration_block (struct MigrationReadyBlock *mb)
+{
+ GNUNET_CONTAINER_DLL_remove (mig_head,
+ mig_tail,
+ mb);
+ GNUNET_PEER_decrement_rcs (mb->target_list,
+ MIGRATION_LIST_SIZE);
+ mig_size--;
+ GNUNET_free (mb);
+}
+
+
+/**
+ * Compare the distance of two peers to a key.
+ *
+ * @param key key
+ * @param p1 first peer
+ * @param p2 second peer
+ * @return GNUNET_YES if P1 is closer to key than P2
+ */
+static int
+is_closer (const GNUNET_HashCode *key,
+ const struct GNUNET_PeerIdentity *p1,
+ const struct GNUNET_PeerIdentity *p2)
+{
+ return GNUNET_CRYPTO_hash_xorcmp (&p1->hashPubKey,
+ &p2->hashPubKey,
+ key);
+}
+
+
+/**
+ * Consider migrating content to a given peer.
+ *
+ * @param cls 'struct MigrationReadyBlock*' to select
+ * targets for (or NULL for none)
+ * @param key ID of the peer
+ * @param value 'struct ConnectedPeer' of the peer
+ * @return GNUNET_YES (always continue iteration)
+ */
+static int
+consider_migration (void *cls,
+ const GNUNET_HashCode *key,
+ void *value)
+{
+ struct MigrationReadyBlock *mb = cls;
+ struct ConnectedPeer *cp = value;
+ struct MigrationReadyBlock *pos;
+ struct GNUNET_PeerIdentity cppid;
+ struct GNUNET_PeerIdentity otherpid;
+ struct GNUNET_PeerIdentity worstpid;
+ size_t msize;
+ unsigned int i;
+ unsigned int repl;
+
+ /* consider 'cp' as a migration target for mb */
+ if (GNUNET_TIME_absolute_get_remaining (cp->migration_blocked).value > 0)
+ return GNUNET_YES; /* peer has requested no migration! */
+ if (mb != NULL)
+ {
+ GNUNET_PEER_resolve (cp->pid,
+ &cppid);
+ repl = MIGRATION_LIST_SIZE;
+ for (i=0;i<MIGRATION_LIST_SIZE;i++)
+ {
+ if (mb->target_list[i] == 0)
+ {
+ mb->target_list[i] = cp->pid;
+ GNUNET_PEER_change_rc (mb->target_list[i], 1);
+ repl = MIGRATION_LIST_SIZE;
+ break;
+ }
+ GNUNET_PEER_resolve (mb->target_list[i],
+ &otherpid);
+ if ( (repl == MIGRATION_LIST_SIZE) &&
+ is_closer (&mb->query,
+ &cppid,
+ &otherpid))
+ {
+ repl = i;
+ worstpid = otherpid;
+ }
+ else if ( (repl != MIGRATION_LIST_SIZE) &&
+ (is_closer (&mb->query,
+ &worstpid,
+ &otherpid) ) )
+ {
+ repl = i;
+ worstpid = otherpid;
+ }
+ }
+ if (repl != MIGRATION_LIST_SIZE)
+ {
+ GNUNET_PEER_change_rc (mb->target_list[repl], -1);
+ mb->target_list[repl] = cp->pid;
+ GNUNET_PEER_change_rc (mb->target_list[repl], 1);
+ }
+ }
+
+ /* consider scheduling transmission to cp for content migration */
+ if (cp->cth != NULL)
+ return GNUNET_YES;
+ msize = 0;
+ pos = mig_head;
+ while (pos != NULL)
+ {
+ for (i=0;i<MIGRATION_LIST_SIZE;i++)
+ {
+ if (cp->pid == pos->target_list[i])
+ {
+ if (msize == 0)
+ msize = pos->size;
+ else
+ msize = GNUNET_MIN (msize,
+ pos->size);
+ break;
+ }
+ }
+ pos = pos->next;
+ }
+ if (msize == 0)
+ return GNUNET_YES; /* no content available */
+#if DEBUG_FS
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Trying to migrate at least %u bytes to peer `%s'\n",
+ msize,
+ GNUNET_h2s (key));
+#endif
+ cp->cth
+ = GNUNET_CORE_notify_transmit_ready (core,
+ 0, GNUNET_TIME_UNIT_FOREVER_REL,
+ (const struct GNUNET_PeerIdentity*) key,
+ msize + sizeof (struct PutMessage),
+ &transmit_to_peer,
+ cp);
+ return GNUNET_YES;
+}
+
+
+/**
+ * Task that is run periodically to obtain blocks for content
+ * migration
+ *
+ * @param cls unused
+ * @param tc scheduler context (also unused)
+ */
+static void
+gather_migration_blocks (void *cls,
+ const struct GNUNET_SCHEDULER_TaskContext *tc);
+
+
+/**
+ * If the migration task is not currently running, consider
+ * (re)scheduling it with the appropriate delay.
+ */
+static void
+consider_migration_gathering ()
+{
+ struct GNUNET_TIME_Relative delay;
+
+ if (dsh == NULL)
+ return;
+ if (mig_qe != NULL)
+ return;
+ if (mig_task != GNUNET_SCHEDULER_NO_TASK)
+ return;
+ delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
+ mig_size);
+ delay = GNUNET_TIME_relative_divide (delay,
+ MAX_MIGRATION_QUEUE);
+ delay = GNUNET_TIME_relative_max (delay,
+ min_migration_delay);
+ mig_task = GNUNET_SCHEDULER_add_delayed (sched,
+ delay,
+ &gather_migration_blocks,
+ NULL);
+}
+
+
+/**
+ * Process content offered for migration.
+ *
+ * @param cls closure
+ * @param key key for the content
+ * @param size number of bytes in data
+ * @param data content stored
+ * @param type type of the content
+ * @param priority priority of the content
+ * @param anonymity anonymity-level for the content
+ * @param expiration expiration time for the content
+ * @param uid unique identifier for the datum;
+ * maybe 0 if no unique identifier is available
+ */
+static void
+process_migration_content (void *cls,
+ const GNUNET_HashCode * key,
+ uint32_t size,
+ const void *data,
+ enum GNUNET_BLOCK_Type type,
+ uint32_t priority,
+ uint32_t anonymity,
+ struct GNUNET_TIME_Absolute
+ expiration, uint64_t uid)
+{
+ struct MigrationReadyBlock *mb;
+
+ if (key == NULL)
+ {
+ mig_qe = NULL;
+ if (mig_size < MAX_MIGRATION_QUEUE)
+ consider_migration_gathering ();
+ return;
+ }
+ if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
+ {
+ if (GNUNET_OK !=
+ GNUNET_FS_handle_on_demand_block (key, size, data,
+ type, priority, anonymity,
+ expiration, uid,
+ &process_migration_content,
+ NULL))
+ {
+ GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
+ }
+ return;
+ }
+#if DEBUG_FS
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Retrieved block `%s' of type %u for migration\n",
+ GNUNET_h2s (key),
+ type);
+#endif
+ mb = GNUNET_malloc (sizeof (struct MigrationReadyBlock) + size);
+ mb->query = *key;
+ mb->expiration = expiration;
+ mb->size = size;
+ mb->type = type;
+ memcpy (&mb[1], data, size);
+ GNUNET_CONTAINER_DLL_insert_after (mig_head,
+ mig_tail,
+ mig_tail,
+ mb);
+ mig_size++;
+ GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
+ &consider_migration,
+ mb);
+ GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
+}
+
+
+/**
+ * Task that is run periodically to obtain blocks for content
+ * migration
+ *
+ * @param cls unused
+ * @param tc scheduler context (also unused)
+ */
+static void
+gather_migration_blocks (void *cls,
+ const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ mig_task = GNUNET_SCHEDULER_NO_TASK;
+ if (dsh != NULL)
+ {
+ mig_qe = GNUNET_DATASTORE_get_random (dsh, 0, -1,
+ GNUNET_TIME_UNIT_FOREVER_REL,
+ &process_migration_content, NULL);
+ GNUNET_assert (mig_qe != NULL);
+ }
+}
+
+
+/**
+ * We're done with a particular message list entry.
+ * Free all associated resources.
+ *
+ * @param pml entry to destroy
+ */
+static void
+destroy_pending_message_list_entry (struct PendingMessageList *pml)
+{
+ GNUNET_CONTAINER_DLL_remove (pml->req->pending_head,
+ pml->req->pending_tail,
+ pml);
+ GNUNET_CONTAINER_DLL_remove (pml->target->pending_messages_head,
+ pml->target->pending_messages_tail,
+ pml->pm);
+ pml->target->pending_requests--;
+ GNUNET_free (pml->pm);
+ GNUNET_free (pml);
+}
+
+
+/**
+ * Destroy the given pending message (and call the respective
+ * continuation).
+ *
+ * @param pm message to destroy
+ * @param tpid id of peer that the message was delivered to, or 0 for none
+ */
+static void
+destroy_pending_message (struct PendingMessage *pm,
+ GNUNET_PEER_Id tpid)
+{
+ struct PendingMessageList *pml = pm->pml;
+ TransmissionContinuation cont;
+ void *cont_cls;
+
+ if (pml != NULL)
+ {
+ GNUNET_assert (pml->pm == pm);
+ GNUNET_assert ( (tpid == 0) || (tpid == pml->target->pid) );
+ cont = pm->cont;
+ cont_cls = pm->cont_cls;
+ destroy_pending_message_list_entry (pml);
+ }
+ else
+ {
+ GNUNET_free (pm);
+ }
+ if (cont != NULL)
+ cont (cont_cls, tpid);
+}
+
+
+/**
+ * We're done processing a particular request.
+ * Free all associated resources.
+ *
+ * @param pr request to destroy
+ */
+static void
+destroy_pending_request (struct PendingRequest *pr)
+{
+ struct GNUNET_PeerIdentity pid;
+
+ if (pr->hnode != NULL)
+ {
+ GNUNET_CONTAINER_heap_remove_node (requests_by_expiration_heap,
+ pr->hnode);
+ pr->hnode = NULL;
+ }
+ if (NULL == pr->client_request_list)
+ {
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# P2P searches active"),
+ -1,
+ GNUNET_NO);
+ }
+ else
+ {
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# client searches active"),
+ -1,
+ GNUNET_NO);
+ }
+ /* might have already been removed from map in 'process_reply' (if
+ there was a unique reply) or never inserted if it was a
+ duplicate; hence ignore the return value here */
+ (void) GNUNET_CONTAINER_multihashmap_remove (query_request_map,
+ &pr->query,
+ pr);
+ if (pr->qe != NULL)
+ {
+ GNUNET_DATASTORE_cancel (pr->qe);
+ pr->qe = NULL;
+ }
+ if (pr->client_request_list != NULL)
+ {
+ GNUNET_CONTAINER_DLL_remove (pr->client_request_list->client_list->rl_head,
+ pr->client_request_list->client_list->rl_tail,
+ pr->client_request_list);
+ GNUNET_free (pr->client_request_list);
+ pr->client_request_list = NULL;
+ }
+ if (pr->cp != NULL)
+ {
+ GNUNET_PEER_resolve (pr->cp->pid,
+ &pid);
+ (void) GNUNET_CONTAINER_multihashmap_remove (peer_request_map,
+ &pid.hashPubKey,
+ pr);
+ pr->cp = NULL;
+ }
+ if (pr->bf != NULL)
+ {
+ GNUNET_CONTAINER_bloomfilter_free (pr->bf);
+ pr->bf = NULL;
+ }
+ if (pr->irc != NULL)
+ {
+ GNUNET_CORE_peer_change_preference_cancel (pr->irc);
+ pr->irc = NULL;
+ }
+ if (pr->replies_seen != NULL)
+ {
+ GNUNET_free (pr->replies_seen);
+ pr->replies_seen = NULL;
+ }
+ if (pr->task != GNUNET_SCHEDULER_NO_TASK)
+ {
+ GNUNET_SCHEDULER_cancel (sched,
+ pr->task);
+ pr->task = GNUNET_SCHEDULER_NO_TASK;
+ }
+ while (NULL != pr->pending_head)
+ destroy_pending_message_list_entry (pr->pending_head);
+ GNUNET_PEER_change_rc (pr->target_pid, -1);
+ if (pr->used_pids != NULL)
+ {
+ GNUNET_PEER_decrement_rcs (pr->used_pids, pr->used_pids_off);
+ GNUNET_free (pr->used_pids);
+ pr->used_pids_off = 0;
+ pr->used_pids_size = 0;
+ pr->used_pids = NULL;
+ }
+ GNUNET_free (pr);
+}
+
+
+/**
+ * Method called whenever a given peer connects.
+ *
+ * @param cls closure, not used
+ * @param peer peer identity this notification is about
+ * @param latency reported latency of the connection with 'other'
+ * @param distance reported distance (DV) to 'other'
+ */
+static void
+peer_connect_handler (void *cls,
+ const struct
+ GNUNET_PeerIdentity * peer,
+ struct GNUNET_TIME_Relative latency,
+ uint32_t distance)
+{
+ struct ConnectedPeer *cp;
+ struct MigrationReadyBlock *pos;
+ char *fn;
+ uint32_t trust;
+
+ cp = GNUNET_malloc (sizeof (struct ConnectedPeer));
+ cp->pid = GNUNET_PEER_intern (peer);
+
+ fn = get_trust_filename (peer);
+ if ((GNUNET_DISK_file_test (fn) == GNUNET_YES) &&
+ (sizeof (trust) == GNUNET_DISK_fn_read (fn, &trust, sizeof (trust))))
+ cp->disk_trust = cp->trust = ntohl (trust);
+ GNUNET_free (fn);
+
+ GNUNET_break (GNUNET_OK ==
+ GNUNET_CONTAINER_multihashmap_put (connected_peers,
+ &peer->hashPubKey,
+ cp,
+ GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
+
+ pos = mig_head;
+ while (NULL != pos)
+ {
+ (void) consider_migration (pos, &peer->hashPubKey, cp);
+ pos = pos->next;
+ }
+}
+
+
+/**
+ * Increase the host credit by a value.
+ *
+ * @param host which peer to change the trust value on
+ * @param value is the int value by which the
+ * host credit is to be increased or decreased
+ * @returns the actual change in trust (positive or negative)
+ */
+static int
+change_host_trust (struct ConnectedPeer *host, int value)
+{
+ unsigned int old_trust;
+
+ if (value == 0)
+ return 0;
+ GNUNET_assert (host != NULL);
+ old_trust = host->trust;
+ if (value > 0)
+ {
+ if (host->trust + value < host->trust)
+ {
+ value = UINT32_MAX - host->trust;
+ host->trust = UINT32_MAX;
+ }
+ else
+ host->trust += value;
+ }
+ else
+ {
+ if (host->trust < -value)
+ {
+ value = -host->trust;
+ host->trust = 0;
+ }
+ else
+ host->trust += value;
+ }
+ return value;
+}
+
+
+/**
+ * Write host-trust information to a file - flush the buffer entry!
+ */
+static int
+flush_trust (void *cls,
+ const GNUNET_HashCode *key,
+ void *value)
+{
+ struct ConnectedPeer *host = value;
+ char *fn;
+ uint32_t trust;
+ struct GNUNET_PeerIdentity pid;
+
+ if (host->trust == host->disk_trust)
+ return GNUNET_OK; /* unchanged */
+ GNUNET_PEER_resolve (host->pid,
+ &pid);
+ fn = get_trust_filename (&pid);
+ if (host->trust == 0)
+ {
+ if ((0 != UNLINK (fn)) && (errno != ENOENT))
+ GNUNET_log_strerror_file (GNUNET_ERROR_TYPE_WARNING |
+ GNUNET_ERROR_TYPE_BULK, "unlink", fn);
+ }
+ else
+ {
+ trust = htonl (host->trust);
+ if (sizeof(uint32_t) == GNUNET_DISK_fn_write (fn, &trust,
+ sizeof(uint32_t),
+ GNUNET_DISK_PERM_USER_READ | GNUNET_DISK_PERM_USER_WRITE
+ | GNUNET_DISK_PERM_GROUP_READ | GNUNET_DISK_PERM_OTHER_READ))
+ host->disk_trust = host->trust;
+ }
+ GNUNET_free (fn);
+ return GNUNET_OK;
+}
+
+/**
+ * Call this method periodically to scan data/hosts for new hosts.
+ */
+static void
+cron_flush_trust (void *cls,
+ const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+
+ if (NULL == connected_peers)
+ return;
+ GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
+ &flush_trust,
+ NULL);
+ if (NULL == tc)
+ return;
+ if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
+ return;
+ GNUNET_SCHEDULER_add_delayed (tc->sched,
+ TRUST_FLUSH_FREQ, &cron_flush_trust, NULL);
+}
+
+
+/**
+ * Free (each) request made by the peer.
+ *
+ * @param cls closure, points to peer that the request belongs to
+ * @param key current key code
+ * @param value value in the hash map
+ * @return GNUNET_YES (we should continue to iterate)
+ */
+static int
+destroy_request (void *cls,
+ const GNUNET_HashCode * key,
+ void *value)
+{
+ const struct GNUNET_PeerIdentity * peer = cls;
+ struct PendingRequest *pr = value;
+
+ GNUNET_break (GNUNET_YES ==
+ GNUNET_CONTAINER_multihashmap_remove (peer_request_map,
+ &peer->hashPubKey,
+ pr));
+ destroy_pending_request (pr);
+ return GNUNET_YES;
+}
+
+
+/**
+ * Method called whenever a peer disconnects.
+ *
+ * @param cls closure, not used
+ * @param peer peer identity this notification is about
+ */
+static void
peer_disconnect_handler (void *cls,
const struct
GNUNET_PeerIdentity * peer)
struct ConnectedPeer *cp;
struct PendingMessage *pm;
unsigned int i;
+ struct MigrationReadyBlock *pos;
+ struct MigrationReadyBlock *next;
GNUNET_CONTAINER_multihashmap_get_multiple (peer_request_map,
&peer->hashPubKey,
cp->last_client_replies[i] = NULL;
}
}
- GNUNET_CONTAINER_multihashmap_remove (connected_peers,
- &peer->hashPubKey,
- cp);
+ GNUNET_break (GNUNET_YES ==
+ GNUNET_CONTAINER_multihashmap_remove (connected_peers,
+ &peer->hashPubKey,
+ cp));
+ /* remove this peer from migration considerations; schedule
+ alternatives */
+ next = mig_head;
+ while (NULL != (pos = next))
+ {
+ next = pos->next;
+ for (i=0;i<MIGRATION_LIST_SIZE;i++)
+ {
+ if (pos->target_list[i] == cp->pid)
+ {
+ GNUNET_PEER_change_rc (pos->target_list[i], -1);
+ pos->target_list[i] = 0;
+ }
+ }
+ if (pos->used_targets >= GNUNET_CONTAINER_multihashmap_size (connected_peers))
+ {
+ delete_migration_block (pos);
+ consider_migration_gathering ();
+ continue;
+ }
+ GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
+ &consider_migration,
+ pos);
+ }
GNUNET_PEER_change_rc (cp->pid, -1);
GNUNET_PEER_decrement_rcs (cp->last_p2p_replies, P2P_SUCCESS_LIST_SIZE);
if (NULL != cp->cth)
struct ClientResponseMessage *creply;
if (client == NULL)
- return; /* huh? is this allowed? */
+ return;
prev = NULL;
pos = client_list;
while ( (NULL != pos) &&
if (pos == NULL)
return; /* no requests pending for this client */
while (NULL != (rcl = pos->rl_head))
- destroy_pending_request (rcl->req);
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Destroying pending request `%s' on disconnect\n",
+ GNUNET_h2s (&rcl->req->query));
+ destroy_pending_request (rcl->req);
+ }
if (prev == NULL)
client_list = pos->next;
else
shutdown_task (void *cls,
const struct GNUNET_SCHEDULER_TaskContext *tc)
{
+ if (mig_qe != NULL)
+ {
+ GNUNET_DATASTORE_cancel (mig_qe);
+ mig_qe = NULL;
+ }
+ if (GNUNET_SCHEDULER_NO_TASK != mig_task)
+ {
+ GNUNET_SCHEDULER_cancel (sched, mig_task);
+ mig_task = GNUNET_SCHEDULER_NO_TASK;
+ }
while (client_list != NULL)
handle_client_disconnect (NULL,
client_list->client);
+ cron_flush_trust (NULL, NULL);
GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
&clean_peer,
NULL);
GNUNET_assert (NULL != core);
GNUNET_CORE_disconnect (core);
core = NULL;
+ if (stats != NULL)
+ {
+ GNUNET_STATISTICS_destroy (stats, GNUNET_NO);
+ stats = NULL;
+ }
+ if (dsh != NULL)
+ {
+ GNUNET_DATASTORE_disconnect (dsh,
+ GNUNET_NO);
+ dsh = NULL;
+ }
+ while (mig_head != NULL)
+ delete_migration_block (mig_head);
+ GNUNET_assert (0 == mig_size);
+ GNUNET_DHT_disconnect (dht_handle);
+ dht_handle = NULL;
+ GNUNET_LOAD_value_free (datastore_get_load);
+ datastore_get_load = NULL;
+ GNUNET_LOAD_value_free (datastore_put_load);
+ datastore_put_load = NULL;
+ GNUNET_BLOCK_context_destroy (block_ctx);
+ block_ctx = NULL;
+ GNUNET_CONFIGURATION_destroy (block_cfg);
+ block_cfg = NULL;
sched = NULL;
cfg = NULL;
+ GNUNET_free_non_null (trustDirectory);
+ trustDirectory = NULL;
}
/**
- * Transmit the given message by copying it to the target buffer
+ * Transmit messages by copying it to the target buffer
* "buf". "buf" will be NULL and "size" zero if the socket was closed
* for writing in the meantime. In that case, do nothing
* (the disconnect or shutdown handler will take care of the rest).
char *cbuf = buf;
struct GNUNET_PeerIdentity pid;
struct PendingMessage *pm;
+ struct MigrationReadyBlock *mb;
+ struct MigrationReadyBlock *next;
+ struct PutMessage migm;
size_t msize;
-
+ unsigned int i;
+
cp->cth = NULL;
if (NULL == buf)
{
#if DEBUG_FS
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Dropping reply, core too busy.\n");
+ "Dropping message, core too busy.\n");
#endif
return 0;
}
&pid,
pm->msize,
&transmit_to_peer,
- pm);
+ cp);
}
+ else
+ {
+ next = mig_head;
+ while (NULL != (mb = next))
+ {
+ next = mb->next;
+ for (i=0;i<MIGRATION_LIST_SIZE;i++)
+ {
+ if ( (cp->pid == mb->target_list[i]) &&
+ (mb->size + sizeof (migm) <= size) )
+ {
+ GNUNET_PEER_change_rc (mb->target_list[i], -1);
+ mb->target_list[i] = 0;
+ mb->used_targets++;
+ memset (&migm, 0, sizeof (migm));
+ migm.header.size = htons (sizeof (migm) + mb->size);
+ migm.header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
+ migm.type = htonl (mb->type);
+ migm.expiration = GNUNET_TIME_absolute_hton (mb->expiration);
+ memcpy (&cbuf[msize], &migm, sizeof (migm));
+ msize += sizeof (migm);
+ size -= sizeof (migm);
+ memcpy (&cbuf[msize], &mb[1], mb->size);
+ msize += mb->size;
+ size -= mb->size;
+#if DEBUG_FS
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Pushing migration block `%s' (%u bytes) to `%s'\n",
+ GNUNET_h2s (&mb->query),
+ mb->size,
+ GNUNET_i2s (&pid));
+#endif
+ break;
+ }
+ else
+ {
+#if DEBUG_FS
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Migration block `%s' (%u bytes) is not on migration list for peer `%s'\n",
+ GNUNET_h2s (&mb->query),
+ mb->size,
+ GNUNET_i2s (&pid));
+#endif
+ }
+ }
+ if ( (mb->used_targets >= MIGRATION_TARGET_COUNT) ||
+ (mb->used_targets >= GNUNET_CONTAINER_multihashmap_size (connected_peers)) )
+ {
+ delete_migration_block (mb);
+ consider_migration_gathering ();
+ }
+ }
+ consider_migration (NULL,
+ &pid.hashPubKey,
+ cp);
+ }
+#if DEBUG_FS
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Transmitting %u bytes to peer %u\n",
+ msize,
+ cp->pid);
+#endif
return msize;
}
GNUNET_assert (pm->next == NULL);
GNUNET_assert (pm->pml == NULL);
- pml = GNUNET_malloc (sizeof (struct PendingMessageList));
- pml->req = pr;
- pml->target = cp;
- pml->pm = pm;
- pm->pml = pml;
- GNUNET_CONTAINER_DLL_insert (pr->pending_head,
- pr->pending_tail,
- pml);
+ if (pr != NULL)
+ {
+ pml = GNUNET_malloc (sizeof (struct PendingMessageList));
+ pml->req = pr;
+ pml->target = cp;
+ pml->pm = pm;
+ pm->pml = pml;
+ GNUNET_CONTAINER_DLL_insert (pr->pending_head,
+ pr->pending_tail,
+ pml);
+ }
pos = cp->pending_messages_head;
while ( (pos != NULL) &&
(pm->priority < pos->priority) )
cp->pending_requests++;
if (cp->pending_requests > MAX_QUEUE_PER_PEER)
destroy_pending_message (cp->pending_messages_tail, 0);
- if (cp->cth == NULL)
- {
- /* need to schedule transmission */
- GNUNET_PEER_resolve (cp->pid, &pid);
- cp->cth = GNUNET_CORE_notify_transmit_ready (core,
- cp->pending_messages_head->priority,
- GNUNET_TIME_UNIT_FOREVER_REL,
- &pid,
- cp->pending_messages_head->msize,
- &transmit_to_peer,
- cp);
- }
- if (cp->cth == NULL)
- {
- /* FIXME: call stats (rare, bad case) */
- }
-}
-
-
-/**
- * Mingle hash with the mingle_number to produce different bits.
- */
-static void
-mingle_hash (const GNUNET_HashCode * in,
- int32_t mingle_number,
- GNUNET_HashCode * hc)
-{
- GNUNET_HashCode m;
-
- GNUNET_CRYPTO_hash (&mingle_number,
- sizeof (int32_t),
- &m);
- GNUNET_CRYPTO_hash_xor (&m, in, hc);
+ GNUNET_PEER_resolve (cp->pid, &pid);
+ if (NULL != cp->cth)
+ GNUNET_CORE_notify_transmit_ready_cancel (cp->cth);
+ /* need to schedule transmission */
+ cp->cth = GNUNET_CORE_notify_transmit_ready (core,
+ cp->pending_messages_head->priority,
+ MAX_TRANSMIT_DELAY,
+ &pid,
+ cp->pending_messages_head->msize,
+ &transmit_to_peer,
+ cp);
+ if (cp->cth == NULL)
+ {
+#if DEBUG_FS
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Failed to schedule transmission with core!\n");
+#endif
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# CORE transmission failures"),
+ 1,
+ GNUNET_NO);
+ }
}
* to even consider processing the query at
* all.
*
- * @return GNUNET_YES if the load is too high, GNUNET_NO otherwise
+ * @return GNUNET_YES if the load is too high to do anything, GNUNET_NO to forward (load high, but not too high), GNUNET_SYSERR to indirect (load low)
*/
static int
test_load_too_high ()
{
- return GNUNET_NO; // FIXME
+ return GNUNET_SYSERR; // FIXME
}
/* ******************* Pending Request Refresh Task ******************** */
+
+/**
+ * We use a random delay to make the timing of requests less
+ * predictable. This function returns such a random delay. We add a base
+ * delay of MAX_CORK_DELAY (1s).
+ *
+ * FIXME: make schedule dependent on the specifics of the request?
+ * Or bandwidth and number of connected peers and load?
+ *
+ * @return random delay to use for some request, between 1s and 1000+TTL_DECREMENT ms
+ */
+static struct GNUNET_TIME_Relative
+get_processing_delay ()
+{
+ return
+ GNUNET_TIME_relative_add (GNUNET_CONSTANTS_MAX_CORK_DELAY,
+ GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
+ GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
+ TTL_DECREMENT)));
+}
+
+
+/**
+ * We're processing a GET request from another peer and have decided
+ * to forward it to other peers. This function is called periodically
+ * and should forward the request to other peers until we have all
+ * possible replies. If we have transmitted the *only* reply to
+ * the initiator we should destroy the pending request. If we have
+ * many replies in the queue to the initiator, we should delay sending
+ * out more queries until the reply queue has shrunk some.
+ *
+ * @param cls our "struct ProcessGetContext *"
+ * @param tc unused
+ */
+static void
+forward_request_task (void *cls,
+ const struct GNUNET_SCHEDULER_TaskContext *tc);
+
+
/**
* Function called after we either failed or succeeded
* at transmitting a query to a peer.
*
* @param cls the requests "struct PendingRequest*"
- * @param pid ID of receiving peer, 0 on transmission error
+ * @param tpid ID of receiving peer, 0 on transmission error
*/
static void
transmit_query_continuation (void *cls,
{
struct PendingRequest *pr = cls;
- if (tpid == 0)
- return;
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# queries scheduled for forwarding"),
+ -1,
+ GNUNET_NO);
+ if (tpid == 0)
+ {
+#if DEBUG_FS
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Transmission of request failed, will try again later.\n");
+#endif
+ if (pr->task == GNUNET_SCHEDULER_NO_TASK)
+ pr->task = GNUNET_SCHEDULER_add_delayed (sched,
+ get_processing_delay (),
+ &forward_request_task,
+ pr);
+ return;
+ }
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# queries forwarded"),
+ 1,
+ GNUNET_NO);
GNUNET_PEER_change_rc (tpid, 1);
if (pr->used_pids_off == pr->used_pids_size)
GNUNET_array_grow (pr->used_pids,
pr->used_pids_size,
pr->used_pids_size * 2 + 2);
pr->used_pids[pr->used_pids_off++] = tpid;
+ if (pr->task == GNUNET_SCHEDULER_NO_TASK)
+ pr->task = GNUNET_SCHEDULER_add_delayed (sched,
+ get_processing_delay (),
+ &forward_request_task,
+ pr);
}
-#if 0
/**
* How many bytes should a bloomfilter be if we have already seen
* entry_count responses? Note that BLOOMFILTER_K gives us the number
/**
- * Recalculate our bloom filter for filtering replies.
+ * Recalculate our bloom filter for filtering replies. This function
+ * will create a new bloom filter from scratch, so it should only be
+ * called if we have no bloomfilter at all (and hence can create a
+ * fresh one of minimal size without problems) OR if our peer is the
+ * initiator (in which case we may resize to larger than mimimum size).
*
- * @param count number of entries we are filtering right now
- * @param mingle set to our new mingling value
- * @param bf_size set to the size of the bloomfilter
- * @param entries the entries to filter
- * @return updated bloomfilter, NULL for none
- */
-static struct GNUNET_CONTAINER_BloomFilter *
-refresh_bloomfilter (unsigned int count,
- int32_t * mingle,
- size_t *bf_size,
- const GNUNET_HashCode *entries)
+ * @param pr request for which the BF is to be recomputed
+ */
+static void
+refresh_bloomfilter (struct PendingRequest *pr)
{
- struct GNUNET_CONTAINER_BloomFilter *bf;
- size_t nsize;
unsigned int i;
+ size_t nsize;
GNUNET_HashCode mhash;
- if (0 == count)
- return NULL;
- nsize = compute_bloomfilter_size (count);
- *mingle = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, -1);
- *bf_size = nsize;
- bf = GNUNET_CONTAINER_bloomfilter_init (NULL,
- nsize,
- BLOOMFILTER_K);
- for (i=0;i<count;i++)
+ nsize = compute_bloomfilter_size (pr->replies_seen_off);
+ if (nsize == pr->bf_size)
+ return; /* size not changed */
+ if (pr->bf != NULL)
+ GNUNET_CONTAINER_bloomfilter_free (pr->bf);
+ pr->bf_size = nsize;
+ pr->mingle = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, -1);
+ pr->bf = GNUNET_CONTAINER_bloomfilter_init (NULL,
+ pr->bf_size,
+ BLOOMFILTER_K);
+ for (i=0;i<pr->replies_seen_off;i++)
{
- mingle_hash (&entries[i], *mingle, &mhash);
- GNUNET_CONTAINER_bloomfilter_add (bf, &mhash);
+ GNUNET_BLOCK_mingle_hash (&pr->replies_seen[i],
+ pr->mingle,
+ &mhash);
+ GNUNET_CONTAINER_bloomfilter_add (pr->bf, &mhash);
}
- return bf;
-}
-#endif
-
-
-/**
- * We use a random delay to make the timing of requests less
- * predictable. This function returns such a random delay.
- *
- * FIXME: make schedule dependent on the specifics of the request?
- * Or bandwidth and number of connected peers and load?
- *
- * @return random delay to use for some request, between 0 and TTL_DECREMENT ms
- */
-static struct GNUNET_TIME_Relative
-get_processing_delay ()
-{
- return GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
- GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
- TTL_DECREMENT));
}
target_reservation_cb (void *cls,
const struct
GNUNET_PeerIdentity * peer,
- unsigned int bpm_in,
- unsigned int bpm_out,
+ struct GNUNET_BANDWIDTH_Value32NBO bpm_in,
+ struct GNUNET_BANDWIDTH_Value32NBO bpm_out,
int amount,
uint64_t preference)
{
size_t msize;
unsigned int k;
int no_route;
+ uint32_t bm;
pr->irc = NULL;
- GNUNET_assert (peer != NULL);
+ if (peer == NULL)
+ {
+ /* error in communication with core, try again later */
+ if (pr->task == GNUNET_SCHEDULER_NO_TASK)
+ pr->task = GNUNET_SCHEDULER_add_delayed (sched,
+ get_processing_delay (),
+ &forward_request_task,
+ pr);
+ return;
+ }
// (3) transmit, update ttl/priority
cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
&peer->hashPubKey);
if (cp == NULL)
{
/* Peer must have just left */
+#if DEBUG_FS
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Selected peer disconnected!\n");
+#endif
+ if (pr->task == GNUNET_SCHEDULER_NO_TASK)
+ pr->task = GNUNET_SCHEDULER_add_delayed (sched,
+ get_processing_delay (),
+ &forward_request_task,
+ pr);
return;
}
no_route = GNUNET_NO;
- if (amount != DBLOCK_SIZE)
+ if (amount == 0)
{
- if (pr->pht_entry == NULL)
- return; /* this target round failed */
+ if (pr->cp == NULL)
+ {
+#if DEBUG_FS > 1
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Failed to reserve bandwidth for reply (got %d/%u bytes only)!\n",
+ amount,
+ DBLOCK_SIZE);
+#endif
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# reply bandwidth reservation requests failed"),
+ 1,
+ GNUNET_NO);
+ if (pr->task == GNUNET_SCHEDULER_NO_TASK)
+ pr->task = GNUNET_SCHEDULER_add_delayed (sched,
+ get_processing_delay (),
+ &forward_request_task,
+ pr);
+ return; /* this target round failed */
+ }
/* FIXME: if we are "quite" busy, we may still want to skip
this round; need more load detection code! */
no_route = GNUNET_YES;
}
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# queries scheduled for forwarding"),
+ 1,
+ GNUNET_NO);
/* build message and insert message into priority queue */
+#if DEBUG_FS
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Forwarding request `%s' to `%4s'!\n",
+ GNUNET_h2s (&pr->query),
+ GNUNET_i2s (peer));
+#endif
k = 0;
+ bm = 0;
+ if (GNUNET_YES == no_route)
+ {
+ bm |= GET_MESSAGE_BIT_RETURN_TO;
+ k++;
+ }
if (pr->namespace != NULL)
- k++;
+ {
+ bm |= GET_MESSAGE_BIT_SKS_NAMESPACE;
+ k++;
+ }
if (pr->target_pid != 0)
- k++;
- if (GNUNET_YES == no_route)
- k++;
+ {
+ bm |= GET_MESSAGE_BIT_TRANSMIT_TO;
+ k++;
+ }
msize = sizeof (struct GetMessage) + pr->bf_size + k * sizeof(GNUNET_HashCode);
GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE);
pm = GNUNET_malloc (sizeof (struct PendingMessage) + msize);
pr->remaining_priority /= 2;
gm->priority = htonl (pr->remaining_priority);
gm->ttl = htonl (pr->ttl);
- gm->filter_mutator = htonl(pr->mingle); // FIXME: bad endianess conversion?
- gm->hash_bitmap = htonl (42); // FIXME!
+ gm->filter_mutator = htonl(pr->mingle);
+ gm->hash_bitmap = htonl (bm);
gm->query = pr->query;
ext = (GNUNET_HashCode*) &gm[1];
k = 0;
+ if (GNUNET_YES == no_route)
+ GNUNET_PEER_resolve (pr->cp->pid, (struct GNUNET_PeerIdentity*) &ext[k++]);
if (pr->namespace != NULL)
memcpy (&ext[k++], pr->namespace, sizeof (GNUNET_HashCode));
if (pr->target_pid != 0)
GNUNET_PEER_resolve (pr->target_pid, (struct GNUNET_PeerIdentity*) &ext[k++]);
- if (GNUNET_YES == no_route)
- GNUNET_PEER_resolve (pr->pht_entry->cp->pid, (struct GNUNET_PeerIdentity*) &ext[k++]);
bfdata = (char *) &ext[k];
if (pr->bf != NULL)
GNUNET_CONTAINER_bloomfilter_get_raw_data (pr->bf,
struct PendingRequest *pr = psc->pr;
double score;
unsigned int i;
-
- /* 1) check if we have already (recently) forwarded to this peer */
+ unsigned int pc;
+
+ /* 1) check that this peer is not the initiator */
+ if (cp == pr->cp)
+ {
+#if DEBUG_FS
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Skipping initiator in forwarding selection\n");
+#endif
+ return GNUNET_YES; /* skip */
+ }
+
+ /* 2) check if we have already (recently) forwarded to this peer */
+ pc = 0;
for (i=0;i<pr->used_pids_off;i++)
- if (pr->used_pids[i] == cp->pid)
- return GNUNET_YES; /* skip */
- // 2) calculate how much we'd like to forward to this peer
- score = 42; // FIXME!
- // FIXME: also need API to gather data on responsiveness
- // of this peer (we have fields for that in 'cp', but
- // they are never set!)
-
+ if (pr->used_pids[i] == cp->pid)
+ {
+ pc++;
+ if (0 != GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
+ RETRY_PROBABILITY_INV))
+ {
+#if DEBUG_FS
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "NOT re-trying query that was previously transmitted %u times\n",
+ (unsigned int) pr->used_pids_off);
+#endif
+ return GNUNET_YES; /* skip */
+ }
+ }
+#if DEBUG_FS
+ if (0 < pc)
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Re-trying query that was previously transmitted %u times to this peer\n",
+ (unsigned int) pc);
+#endif
+ /* 3) calculate how much we'd like to forward to this peer,
+ starting with a random value that is strong enough
+ to at least give any peer a chance sometimes
+ (compared to the other factors that come later) */
+ /* 3a) count successful (recent) routes from cp for same source */
+ if (pr->cp != NULL)
+ {
+ score = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
+ P2P_SUCCESS_LIST_SIZE);
+ for (i=0;i<P2P_SUCCESS_LIST_SIZE;i++)
+ if (cp->last_p2p_replies[i] == pr->cp->pid)
+ score += 1; /* likely successful based on hot path */
+ }
+ else
+ {
+ score = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
+ CS2P_SUCCESS_LIST_SIZE);
+ for (i=0;i<CS2P_SUCCESS_LIST_SIZE;i++)
+ if (cp->last_client_replies[i] == pr->client_request_list->client_list->client)
+ score += 1; /* likely successful based on hot path */
+ }
+ /* 3b) include latency */
+ if (cp->avg_delay.value < 4 * TTL_DECREMENT)
+ score += 1; /* likely fast based on latency */
+ /* 3c) include priorities */
+ if (cp->avg_priority <= pr->remaining_priority / 2.0)
+ score += 1; /* likely successful based on priorities */
+ /* 3d) penalize for queue size */
+ score -= (2.0 * cp->pending_requests / (double) MAX_QUEUE_PER_PEER);
+ /* 3e) include peer proximity */
+ score -= (2.0 * (GNUNET_CRYPTO_hash_distance_u32 (key,
+ &pr->query)) / (double) UINT32_MAX);
+ /* 4) super-bonus for being the known target */
+ if (pr->target_pid == cp->pid)
+ score += 100.0;
/* store best-fit in closure */
+#if DEBUG_FS
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Peer `%s' gets score %f for forwarding query, max is %f\n",
+ GNUNET_h2s (key),
+ score,
+ psc->target_score);
+#endif
+ score++; /* avoid zero */
if (score > psc->target_score)
{
psc->target_score = score;
/**
- * We're processing a GET request from another peer and have decided
+ * The priority level imposes a bound on the maximum
+ * value for the ttl that can be requested.
+ *
+ * @param ttl_in requested ttl
+ * @param prio given priority
+ * @return ttl_in if ttl_in is below the limit,
+ * otherwise the ttl-limit for the given priority
+ */
+static int32_t
+bound_ttl (int32_t ttl_in, uint32_t prio)
+{
+ unsigned long long allowed;
+
+ if (ttl_in <= 0)
+ return ttl_in;
+ allowed = ((unsigned long long) prio) * TTL_DECREMENT / 1000;
+ if (ttl_in > allowed)
+ {
+ if (allowed >= (1 << 30))
+ return 1 << 30;
+ return allowed;
+ }
+ return ttl_in;
+}
+
+
+/**
+ * We're processing a GET request and have decided
* to forward it to other peers. This function is called periodically
* and should forward the request to other peers until we have all
* possible replies. If we have transmitted the *only* reply to
struct PendingRequest *pr = cls;
struct PeerSelectionContext psc;
struct ConnectedPeer *cp;
+ struct GNUNET_TIME_Relative delay;
- pr->task = GNUNET_SCHEDULER_add_delayed (sched,
- get_processing_delay (),
- &forward_request_task,
- pr);
+ pr->task = GNUNET_SCHEDULER_NO_TASK;
if (pr->irc != NULL)
- return; /* previous request still pending */
+ {
+#if DEBUG_FS
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Forwarding of query `%s' not attempted due to pending local lookup!\n",
+ GNUNET_h2s (&pr->query));
+#endif
+ return; /* already pending */
+ }
+ if (GNUNET_YES == pr->local_only)
+ return; /* configured to not do P2P search */
+ /* (0) try DHT */
+ if (0 == pr->anonymity_level)
+ {
+#if 0
+ /* DHT API needs fixing... */
+ pr->dht_get = GNUNET_DHT_get_start (dht_handle,
+ GNUNET_TIME_UNIT_FOREVER_REL,
+ pr->type,
+ &pr->query,
+ &process_dht_reply,
+ pr,
+ FIXME,
+ FIXME);
+#endif
+ }
/* (1) select target */
psc.pr = pr;
- psc.target_score = DBL_MIN;
+ psc.target_score = -DBL_MAX;
GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
&target_peer_select_cb,
&psc);
- if (psc.target_score == DBL_MIN)
- return; /* nobody selected */
+ if (psc.target_score == -DBL_MAX)
+ {
+ delay = get_processing_delay ();
+#if DEBUG_FS
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "No peer selected for forwarding of query `%s', will try again in %llu ms!\n",
+ GNUNET_h2s (&pr->query),
+ delay.value);
+#endif
+ pr->task = GNUNET_SCHEDULER_add_delayed (sched,
+ delay,
+ &forward_request_task,
+ pr);
+ return; /* nobody selected */
+ }
+ /* (3) update TTL/priority */
+ if (pr->client_request_list != NULL)
+ {
+ /* FIXME: use better algorithm!? */
+ if (0 == GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
+ 4))
+ pr->priority++;
+ /* bound priority we use by priorities we see from other peers
+ rounded up (must round up so that we can see non-zero
+ priorities, but round up as little as possible to make it
+ plausible that we forwarded another peers request) */
+ if (pr->priority > current_priorities + 1.0)
+ pr->priority = (uint32_t) current_priorities + 1.0;
+ pr->ttl = bound_ttl (pr->ttl + TTL_DECREMENT * 2,
+ pr->priority);
+#if DEBUG_FS
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Trying query `%s' with priority %u and TTL %d.\n",
+ GNUNET_h2s (&pr->query),
+ pr->priority,
+ pr->ttl);
+#endif
+ }
- /* (2) reserve reply bandwidth */
+ /* (3) reserve reply bandwidth */
cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
&psc.target.hashPubKey);
+ GNUNET_assert (NULL != cp);
pr->irc = GNUNET_CORE_peer_change_preference (sched, cfg,
&psc.target,
GNUNET_CONSTANTS_SERVICE_TIMEOUT,
- (uint32_t) -1 /* no limit */,
- DBLOCK_SIZE,
- (uint64_t) cp->inc_preference,
+ GNUNET_BANDWIDTH_value_init (UINT32_MAX),
+ DBLOCK_SIZE * 2,
+ cp->inc_preference,
&target_reservation_cb,
pr);
- cp->inc_preference = 0.0;
+ cp->inc_preference = 0;
}
* at transmitting a reply to a peer.
*
* @param cls the requests "struct PendingRequest*"
- * @param pid ID of receiving peer, 0 on transmission error
+ * @param tpid ID of receiving peer, 0 on transmission error
*/
static void
transmit_reply_continuation (void *cls,
GNUNET_PEER_Id tpid)
{
struct PendingRequest *pr = cls;
-
+
switch (pr->type)
{
- case GNUNET_DATASTORE_BLOCKTYPE_DBLOCK:
- case GNUNET_DATASTORE_BLOCKTYPE_IBLOCK:
+ case GNUNET_BLOCK_TYPE_FS_DBLOCK:
+ case GNUNET_BLOCK_TYPE_FS_IBLOCK:
/* only one reply expected, done with the request! */
destroy_pending_request (pr);
break;
- case GNUNET_DATASTORE_BLOCKTYPE_KBLOCK:
- case GNUNET_DATASTORE_BLOCKTYPE_SBLOCK:
+ case GNUNET_BLOCK_TYPE_ANY:
+ case GNUNET_BLOCK_TYPE_FS_KBLOCK:
+ case GNUNET_BLOCK_TYPE_FS_SBLOCK:
break;
default:
GNUNET_break (0);
}
-/**
- * Check if the given KBlock is well-formed.
- *
- * @param kb the kblock data (or at least "dsize" bytes claiming to be one)
- * @param dsize size of "kb" in bytes; check for < sizeof(struct KBlock)!
- * @param query where to store the query that this block answers
- * @return GNUNET_OK if this is actually a well-formed KBlock
- */
-static int
-check_kblock (const struct KBlock *kb,
- size_t dsize,
- GNUNET_HashCode *query)
-{
- if (dsize < sizeof (struct KBlock))
- {
- GNUNET_break_op (0);
- return GNUNET_SYSERR;
- }
- if (dsize - sizeof (struct KBlock) !=
- ntohs (kb->purpose.size)
- - sizeof (struct GNUNET_CRYPTO_RsaSignaturePurpose)
- - sizeof(struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded) )
- {
- GNUNET_break_op (0);
- return GNUNET_SYSERR;
- }
- if (GNUNET_OK !=
- GNUNET_CRYPTO_rsa_verify (GNUNET_SIGNATURE_PURPOSE_FS_KBLOCK,
- &kb->purpose,
- &kb->signature,
- &kb->keyspace))
- {
- GNUNET_break_op (0);
- return GNUNET_SYSERR;
- }
- if (query != NULL)
- GNUNET_CRYPTO_hash (&kb->keyspace,
- sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded),
- query);
- return GNUNET_OK;
-}
-
-
-/**
- * Check if the given SBlock is well-formed.
- *
- * @param sb the sblock data (or at least "dsize" bytes claiming to be one)
- * @param dsize size of "kb" in bytes; check for < sizeof(struct SBlock)!
- * @param query where to store the query that this block answers
- * @param namespace where to store the namespace that this block belongs to
- * @return GNUNET_OK if this is actually a well-formed SBlock
- */
-static int
-check_sblock (const struct SBlock *sb,
- size_t dsize,
- GNUNET_HashCode *query,
- GNUNET_HashCode *namespace)
-{
- if (dsize < sizeof (struct SBlock))
- {
- GNUNET_break_op (0);
- return GNUNET_SYSERR;
- }
- if (dsize !=
- ntohs (sb->purpose.size) + sizeof (struct GNUNET_CRYPTO_RsaSignature))
- {
- GNUNET_break_op (0);
- return GNUNET_SYSERR;
- }
- if (GNUNET_OK !=
- GNUNET_CRYPTO_rsa_verify (GNUNET_SIGNATURE_PURPOSE_FS_SBLOCK,
- &sb->purpose,
- &sb->signature,
- &sb->subspace))
- {
- GNUNET_break_op (0);
- return GNUNET_SYSERR;
- }
- if (query != NULL)
- *query = sb->identifier;
- if (namespace != NULL)
- GNUNET_CRYPTO_hash (&sb->subspace,
- sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded),
- namespace);
- return GNUNET_OK;
-}
-
-
/**
* Transmit the given message by copying it to the target buffer
* "buf". "buf" will be NULL and "size" zero if the socket was closed
*/
const void *data;
- // FIXME: add 'struct ConnectedPeer' to track 'last_xxx_replies' here!
+ /**
+ * Who gave us this reply? NULL for local host.
+ */
+ struct ConnectedPeer *sender;
/**
* When the reply expires.
*/
size_t size;
- /**
- * Namespace that this reply belongs to
- * (if it is of type SBLOCK).
- */
- GNUNET_HashCode namespace;
-
/**
* Type of the block.
*/
- uint32_t type;
+ enum GNUNET_BLOCK_Type type;
/**
* How much was this reply worth to us?
*/
uint32_t priority;
+
+ /**
+ * Evaluation result (returned).
+ */
+ enum GNUNET_BLOCK_EvaluationResult eval;
+
+ /**
+ * Did we finish processing the associated request?
+ */
+ int finished;
+
+ /**
+ * Did we find a matching request?
+ */
+ int request_found;
};
struct ClientList *cl;
struct PutMessage *pm;
struct ConnectedPeer *cp;
- GNUNET_HashCode chash;
- GNUNET_HashCode mhash;
+ struct GNUNET_TIME_Relative cur_delay;
size_t msize;
- uint32_t prio;
#if DEBUG_FS
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Matched result for query `%s' with pending request\n",
+ "Matched result (type %u) for query `%s' with pending request\n",
+ (unsigned int) prq->type,
GNUNET_h2s (key));
#endif
- GNUNET_CRYPTO_hash (prq->data,
- prq->size,
- &chash);
- switch (prq->type)
- {
- case GNUNET_DATASTORE_BLOCKTYPE_DBLOCK:
- case GNUNET_DATASTORE_BLOCKTYPE_IBLOCK:
- /* only possible reply, stop requesting! */
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# replies received and matched"),
+ 1,
+ GNUNET_NO);
+ if (prq->sender != NULL)
+ {
+ /* FIXME: should we be more precise here and not use
+ "start_time" but a peer-specific time stamp? */
+ cur_delay = GNUNET_TIME_absolute_get_duration (pr->start_time);
+ prq->sender->avg_delay.value
+ = (prq->sender->avg_delay.value *
+ (RUNAVG_DELAY_N - 1) + cur_delay.value) / RUNAVG_DELAY_N;
+ prq->sender->avg_priority
+ = (prq->sender->avg_priority *
+ (RUNAVG_DELAY_N - 1) + pr->priority) / (double) RUNAVG_DELAY_N;
+ if (pr->cp != NULL)
+ {
+ GNUNET_PEER_change_rc (prq->sender->last_p2p_replies
+ [prq->sender->last_p2p_replies_woff % P2P_SUCCESS_LIST_SIZE],
+ -1);
+ GNUNET_PEER_change_rc (pr->cp->pid, 1);
+ prq->sender->last_p2p_replies
+ [(prq->sender->last_p2p_replies_woff++) % P2P_SUCCESS_LIST_SIZE]
+ = pr->cp->pid;
+ }
+ else
+ {
+ if (NULL != prq->sender->last_client_replies
+ [(prq->sender->last_client_replies_woff) % CS2P_SUCCESS_LIST_SIZE])
+ GNUNET_SERVER_client_drop (prq->sender->last_client_replies
+ [(prq->sender->last_client_replies_woff) % CS2P_SUCCESS_LIST_SIZE]);
+ prq->sender->last_client_replies
+ [(prq->sender->last_client_replies_woff++) % CS2P_SUCCESS_LIST_SIZE]
+ = pr->client_request_list->client_list->client;
+ GNUNET_SERVER_client_keep (pr->client_request_list->client_list->client);
+ }
+ }
+ prq->eval = GNUNET_BLOCK_evaluate (block_ctx,
+ prq->type,
+ key,
+ &pr->bf,
+ pr->mingle,
+ pr->namespace, (pr->namespace != NULL) ? sizeof (GNUNET_HashCode) : 0,
+ prq->data,
+ prq->size);
+ switch (prq->eval)
+ {
+ case GNUNET_BLOCK_EVALUATION_OK_MORE:
+ break;
+ case GNUNET_BLOCK_EVALUATION_OK_LAST:
while (NULL != pr->pending_head)
destroy_pending_message_list_entry (pr->pending_head);
- GNUNET_break (GNUNET_YES ==
- GNUNET_CONTAINER_multihashmap_remove (query_request_map,
- key,
- pr));
- break;
- case GNUNET_DATASTORE_BLOCKTYPE_SBLOCK:
- if (0 != memcmp (pr->namespace,
- &prq->namespace,
- sizeof (GNUNET_HashCode)))
- return GNUNET_YES; /* wrong namespace */
- /* then: fall-through! */
- case GNUNET_DATASTORE_BLOCKTYPE_KBLOCK:
- if (pr->bf != NULL)
+ if (pr->qe != NULL)
{
- mingle_hash (&chash, pr->mingle, &mhash);
- if (GNUNET_YES == GNUNET_CONTAINER_bloomfilter_test (pr->bf,
- &mhash))
- return GNUNET_YES; /* duplicate */
- GNUNET_CONTAINER_bloomfilter_add (pr->bf,
- &mhash);
+ if (pr->client_request_list != NULL)
+ GNUNET_SERVER_receive_done (pr->client_request_list->client_list->client,
+ GNUNET_YES);
+ GNUNET_DATASTORE_cancel (pr->qe);
+ pr->qe = NULL;
}
- if (pr->client_request_list != NULL)
+ pr->do_remove = GNUNET_YES;
+ if (pr->task != GNUNET_SCHEDULER_NO_TASK)
{
- if (pr->replies_seen_size == pr->replies_seen_off)
- {
- GNUNET_array_grow (pr->replies_seen,
- pr->replies_seen_size,
- pr->replies_seen_size * 2 + 4);
- // FIXME: recalculate BF!
- }
- pr->replies_seen[pr->replies_seen_off++] = chash;
+ GNUNET_SCHEDULER_cancel (sched,
+ pr->task);
+ pr->task = GNUNET_SCHEDULER_NO_TASK;
}
+ GNUNET_break (GNUNET_YES ==
+ GNUNET_CONTAINER_multihashmap_remove (query_request_map,
+ key,
+ pr));
break;
- case GNUNET_DATASTORE_BLOCKTYPE_SKBLOCK:
- // FIXME: any checks against duplicates for SKBlocks?
- break;
- default:
+ case GNUNET_BLOCK_EVALUATION_OK_DUPLICATE:
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# duplicate replies discarded (bloomfilter)"),
+ 1,
+ GNUNET_NO);
+#if DEBUG_FS
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Duplicate response `%s', discarding.\n",
+ GNUNET_h2s (&mhash));
+#endif
+ return GNUNET_YES; /* duplicate */
+ case GNUNET_BLOCK_EVALUATION_RESULT_INVALID:
+ return GNUNET_YES; /* wrong namespace */
+ case GNUNET_BLOCK_EVALUATION_REQUEST_VALID:
GNUNET_break (0);
return GNUNET_YES;
+ case GNUNET_BLOCK_EVALUATION_REQUEST_INVALID:
+ GNUNET_break (0);
+ return GNUNET_YES;
+ case GNUNET_BLOCK_EVALUATION_TYPE_NOT_SUPPORTED:
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ _("Unsupported block type %u\n"),
+ prq->type);
+ return GNUNET_NO;
}
- prio = pr->priority;
- prq->priority += pr->remaining_priority;
- pr->remaining_priority = 0;
if (pr->client_request_list != NULL)
{
+ if (pr->replies_seen_size == pr->replies_seen_off)
+ GNUNET_array_grow (pr->replies_seen,
+ pr->replies_seen_size,
+ pr->replies_seen_size * 2 + 4);
+ GNUNET_CRYPTO_hash (prq->data,
+ prq->size,
+ &pr->replies_seen[pr->replies_seen_off++]);
+ refresh_bloomfilter (pr);
+ }
+ if (NULL == prq->sender)
+ {
#if DEBUG_FS
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Transmitting result for query `%s' to local client\n",
+ "Found result for query `%s' in local datastore\n",
GNUNET_h2s (key));
-#endif
+#endif
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# results found locally"),
+ 1,
+ GNUNET_NO);
+ }
+ prq->priority += pr->remaining_priority;
+ pr->remaining_priority = 0;
+ pr->results_found++;
+ prq->request_found = GNUNET_YES;
+ if (NULL != pr->client_request_list)
+ {
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# replies received for local clients"),
+ 1,
+ GNUNET_NO);
cl = pr->client_request_list->client_list;
msize = sizeof (struct PutMessage) + prq->size;
creply = GNUNET_malloc (msize + sizeof (struct ClientResponseMessage));
pm->expiration = GNUNET_TIME_absolute_hton (prq->expiration);
memcpy (&pm[1], prq->data, prq->size);
if (NULL == cl->th)
- cl->th = GNUNET_SERVER_notify_transmit_ready (cl->client,
- msize,
- GNUNET_TIME_UNIT_FOREVER_REL,
- &transmit_to_client,
- cl);
+ {
+#if DEBUG_FS
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Transmitting result for query `%s' to client\n",
+ GNUNET_h2s (key));
+#endif
+ cl->th = GNUNET_SERVER_notify_transmit_ready (cl->client,
+ msize,
+ GNUNET_TIME_UNIT_FOREVER_REL,
+ &transmit_to_client,
+ cl);
+ }
GNUNET_break (cl->th != NULL);
+ if (pr->do_remove)
+ {
+ prq->finished = GNUNET_YES;
+ destroy_pending_request (pr);
+ }
}
else
{
- cp = pr->pht_entry->cp;
+ cp = pr->cp;
#if DEBUG_FS
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Transmitting result for query `%s' to other peer (PID=%u)\n",
GNUNET_h2s (key),
(unsigned int) cp->pid);
#endif
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# replies received for other peers"),
+ 1,
+ GNUNET_NO);
msize = sizeof (struct PutMessage) + prq->size;
reply = GNUNET_malloc (msize + sizeof (struct PendingMessage));
reply->cont = &transmit_reply_continuation;
reply->cont_cls = pr;
reply->msize = msize;
- reply->priority = (uint32_t) -1; /* send replies first! */
+ reply->priority = UINT32_MAX; /* send replies first! */
pm = (struct PutMessage*) &reply[1];
pm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
pm->header.size = htons (msize);
memcpy (&pm[1], prq->data, prq->size);
add_to_pending_messages_for_peer (cp, reply, pr);
}
+ return GNUNET_YES;
+}
- // FIXME: implement hot-path routing statistics keeping!
- return GNUNET_YES;
+/**
+ * Continuation called to notify client about result of the
+ * operation.
+ *
+ * @param cls closure
+ * @param success GNUNET_SYSERR on failure
+ * @param msg NULL on success, otherwise an error message
+ */
+static void
+put_migration_continuation (void *cls,
+ int success,
+ const char *msg)
+{
+ struct GNUNET_TIME_Absolute *start = cls;
+ struct GNUNET_TIME_Relative delay;
+
+ delay = GNUNET_TIME_absolute_get_duration (*start);
+ GNUNET_free (start);
+ GNUNET_LOAD_update (datastore_put_load,
+ delay.value);
+ if (GNUNET_OK == success)
+ return;
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# datastore 'put' failures"),
+ 1,
+ GNUNET_NO);
}
const struct PutMessage *put;
uint16_t msize;
size_t dsize;
- uint32_t type;
+ enum GNUNET_BLOCK_Type type;
struct GNUNET_TIME_Absolute expiration;
GNUNET_HashCode query;
struct ProcessReplyClosure prq;
+ struct GNUNET_TIME_Absolute *start;
+ struct GNUNET_TIME_Relative block_time;
+ double putl;
+ struct ConnectedPeer *cp;
+ struct PendingMessage *pm;
+ struct MigrationStopMessage *msm;
msize = ntohs (message->size);
if (msize < sizeof (struct PutMessage))
type = ntohl (put->type);
expiration = GNUNET_TIME_absolute_ntoh (put->expiration);
- /* first, validate! */
- switch (type)
+ if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
+ return GNUNET_SYSERR;
+ if (GNUNET_OK !=
+ GNUNET_BLOCK_get_key (block_ctx,
+ type,
+ &put[1],
+ dsize,
+ &query))
{
- case GNUNET_DATASTORE_BLOCKTYPE_DBLOCK:
- case GNUNET_DATASTORE_BLOCKTYPE_IBLOCK:
- GNUNET_CRYPTO_hash (&put[1], dsize, &query);
- break;
- case GNUNET_DATASTORE_BLOCKTYPE_KBLOCK:
- if (GNUNET_OK !=
- check_kblock ((const struct KBlock*) &put[1],
- dsize,
- &query))
- return GNUNET_SYSERR;
- break;
- case GNUNET_DATASTORE_BLOCKTYPE_SBLOCK:
- if (GNUNET_OK !=
- check_sblock ((const struct SBlock*) &put[1],
- dsize,
- &query,
- &prq.namespace))
- return GNUNET_SYSERR;
- break;
- case GNUNET_DATASTORE_BLOCKTYPE_SKBLOCK:
- // FIXME -- validate SKBLOCK!
- GNUNET_break (0);
- return GNUNET_OK;
- default:
- /* unknown block type */
GNUNET_break_op (0);
return GNUNET_SYSERR;
}
-
#if DEBUG_FS
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Received result for query `%s' from peer `%4s'\n",
GNUNET_h2s (&query),
GNUNET_i2s (other));
#endif
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# replies received (overall)"),
+ 1,
+ GNUNET_NO);
/* now, lookup 'query' */
prq.data = (const void*) &put[1];
+ if (other != NULL)
+ prq.sender = GNUNET_CONTAINER_multihashmap_get (connected_peers,
+ &other->hashPubKey);
+ else
+ prq.sender = NULL;
prq.size = dsize;
prq.type = type;
prq.expiration = expiration;
prq.priority = 0;
+ prq.finished = GNUNET_NO;
+ prq.request_found = GNUNET_NO;
GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map,
&query,
&process_reply,
&prq);
- // FIXME: if migration is on and load is low,
- // queue to store data in datastore;
- // use "prq.priority" for that!
+ if (prq.sender != NULL)
+ {
+ prq.sender->inc_preference += CONTENT_BANDWIDTH_VALUE + 1000 * prq.priority;
+ prq.sender->trust += prq.priority;
+ }
+ if (GNUNET_YES == active_migration)
+ {
+#if DEBUG_FS
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Replicating result for query `%s' with priority %u\n",
+ GNUNET_h2s (&query),
+ prq.priority);
+#endif
+ start = GNUNET_malloc (sizeof (struct GNUNET_TIME_Absolute));
+ *start = GNUNET_TIME_absolute_get ();
+ GNUNET_DATASTORE_put (dsh,
+ 0, &query, dsize, &put[1],
+ type, prq.priority, 1 /* anonymity */,
+ expiration,
+ 1 + prq.priority, MAX_DATASTORE_QUEUE,
+ GNUNET_CONSTANTS_SERVICE_TIMEOUT,
+ &put_migration_continuation,
+ start);
+ }
+ putl = GNUNET_LOAD_get_load (datastore_put_load);
+ if ( (GNUNET_NO == prq.request_found) &&
+ ( (GNUNET_YES != active_migration) ||
+ (putl > 2.0) ) )
+ {
+ cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
+ &other->hashPubKey);
+ if (GNUNET_TIME_absolute_get_duration (cp->last_migration_block).value < 5000)
+ return GNUNET_OK; /* already blocked */
+ /* We're too busy; send MigrationStop message! */
+ if (GNUNET_YES != active_migration)
+ putl = 1.0 + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 5);
+ block_time = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
+ 5000 + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
+ (unsigned int) (60000 * putl * putl)));
+
+ cp->last_migration_block = GNUNET_TIME_relative_to_absolute (block_time);
+ pm = GNUNET_malloc (sizeof (struct PendingMessage) +
+ sizeof (struct MigrationStopMessage));
+ pm->msize = sizeof (struct MigrationStopMessage);
+ pm->priority = UINT32_MAX;
+ msm = (struct MigrationStopMessage*) &pm[1];
+ msm->header.size = htons (sizeof (struct MigrationStopMessage));
+ msm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_MIGRATION_STOP);
+ msm->duration = GNUNET_TIME_relative_hton (block_time);
+ add_to_pending_messages_for_peer (cp,
+ pm,
+ NULL);
+ }
+ return GNUNET_OK;
+}
+
+
+/**
+ * Handle P2P "MIGRATION_STOP" message.
+ *
+ * @param cls closure, always NULL
+ * @param other the other peer involved (sender or receiver, NULL
+ * for loopback messages where we are both sender and receiver)
+ * @param message the actual message
+ * @param latency reported latency of the connection with 'other'
+ * @param distance reported distance (DV) to 'other'
+ * @return GNUNET_OK to keep the connection open,
+ * GNUNET_SYSERR to close it (signal serious error)
+ */
+static int
+handle_p2p_migration_stop (void *cls,
+ const struct GNUNET_PeerIdentity *other,
+ const struct GNUNET_MessageHeader *message,
+ struct GNUNET_TIME_Relative latency,
+ uint32_t distance)
+{
+ struct ConnectedPeer *cp;
+ const struct MigrationStopMessage *msm;
+
+ msm = (const struct MigrationStopMessage*) message;
+ cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
+ &other->hashPubKey);
+ if (cp == NULL)
+ {
+ GNUNET_break (0);
+ return GNUNET_OK;
+ }
+ cp->migration_blocked = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_relative_ntoh (msm->duration));
return GNUNET_OK;
}
+
/* **************************** P2P GET Handling ************************ */
+/**
+ * Closure for 'check_duplicate_request_{peer,client}'.
+ */
+struct CheckDuplicateRequestClosure
+{
+ /**
+ * The new request we should check if it already exists.
+ */
+ const struct PendingRequest *pr;
+
+ /**
+ * Existing request found by the checker, NULL if none.
+ */
+ struct PendingRequest *have;
+};
+
+
+/**
+ * Iterator over entries in the 'query_request_map' that
+ * tries to see if we have the same request pending from
+ * the same client already.
+ *
+ * @param cls closure (our 'struct CheckDuplicateRequestClosure')
+ * @param key current key code (query, ignored, must match)
+ * @param value value in the hash map (a 'struct PendingRequest'
+ * that already exists)
+ * @return GNUNET_YES if we should continue to
+ * iterate (no match yet)
+ * GNUNET_NO if not (match found).
+ */
+static int
+check_duplicate_request_client (void *cls,
+ const GNUNET_HashCode * key,
+ void *value)
+{
+ struct CheckDuplicateRequestClosure *cdc = cls;
+ struct PendingRequest *have = value;
+
+ if (have->client_request_list == NULL)
+ return GNUNET_YES;
+ if ( (cdc->pr->client_request_list->client_list->client == have->client_request_list->client_list->client) &&
+ (cdc->pr != have) )
+ {
+ cdc->have = have;
+ return GNUNET_NO;
+ }
+ return GNUNET_YES;
+}
+
+
/**
* We're processing (local) results for a search request
* from another peer. Pass applicable results to the
const GNUNET_HashCode * key,
uint32_t size,
const void *data,
- uint32_t type,
+ enum GNUNET_BLOCK_Type type,
uint32_t priority,
uint32_t anonymity,
struct GNUNET_TIME_Absolute
{
struct PendingRequest *pr = cls;
struct ProcessReplyClosure prq;
- GNUNET_HashCode dhash;
- GNUNET_HashCode mhash;
+ struct CheckDuplicateRequestClosure cdrc;
GNUNET_HashCode query;
+ unsigned int old_rf;
- pr->drq = NULL;
if (NULL == key)
{
+#if DEBUG_FS > 1
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Done processing local replies, forwarding request to other peers.\n");
+#endif
+ pr->qe = NULL;
+ if (pr->client_request_list != NULL)
+ {
+ GNUNET_SERVER_receive_done (pr->client_request_list->client_list->client,
+ GNUNET_YES);
+ /* Figure out if this is a duplicate request and possibly
+ merge 'struct PendingRequest' entries */
+ cdrc.have = NULL;
+ cdrc.pr = pr;
+ GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map,
+ &pr->query,
+ &check_duplicate_request_client,
+ &cdrc);
+ if (cdrc.have != NULL)
+ {
+#if DEBUG_FS
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Received request for block `%s' twice from client, will only request once.\n",
+ GNUNET_h2s (&pr->query));
+#endif
+
+ destroy_pending_request (pr);
+ return;
+ }
+ }
+
/* no more results */
if (pr->task == GNUNET_SCHEDULER_NO_TASK)
pr->task = GNUNET_SCHEDULER_add_now (sched,
pr);
return;
}
- if (type == GNUNET_DATASTORE_BLOCKTYPE_ONDEMAND)
+#if DEBUG_FS
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "New local response to `%s' of type %u.\n",
+ GNUNET_h2s (key),
+ type);
+#endif
+ if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
{
+#if DEBUG_FS
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Found ONDEMAND block, performing on-demand encoding\n");
+#endif
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# on-demand blocks matched requests"),
+ 1,
+ GNUNET_NO);
if (GNUNET_OK !=
GNUNET_FS_handle_on_demand_block (key, size, data, type, priority,
anonymity, expiration, uid,
&process_local_reply,
pr))
- GNUNET_FS_drq_get_next (GNUNET_YES);
- return;
- }
- /* check for duplicates */
- GNUNET_CRYPTO_hash (data, size, &dhash);
- mingle_hash (&dhash,
- pr->mingle,
- &mhash);
- if ( (pr->bf != NULL) &&
- (GNUNET_YES ==
- GNUNET_CONTAINER_bloomfilter_test (pr->bf,
- &mhash)) )
- {
-#if DEBUG_FS
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Result from datastore filtered by bloomfilter (duplicate).\n");
-#endif
- GNUNET_FS_drq_get_next (GNUNET_YES);
- return;
- }
-#if DEBUG_FS
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Found result for query `%s' in local datastore\n",
- GNUNET_h2s (key));
-#endif
- pr->results_found++;
- if ( (pr->type == GNUNET_DATASTORE_BLOCKTYPE_KBLOCK) ||
- (pr->type == GNUNET_DATASTORE_BLOCKTYPE_SBLOCK) ||
- (pr->type == GNUNET_DATASTORE_BLOCKTYPE_SKBLOCK) )
- {
- if (pr->bf == NULL)
+ if (pr->qe != NULL)
{
- pr->bf_size = 32;
- pr->bf = GNUNET_CONTAINER_bloomfilter_init (NULL,
- pr->bf_size,
- BLOOMFILTER_K);
+ GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
}
- GNUNET_CONTAINER_bloomfilter_add (pr->bf,
- &mhash);
+ return;
}
+ old_rf = pr->results_found;
memset (&prq, 0, sizeof (prq));
prq.data = data;
prq.expiration = expiration;
prq.size = size;
- if ( (type == GNUNET_DATASTORE_BLOCKTYPE_SBLOCK) &&
- (GNUNET_OK != check_sblock ((const struct SBlock*) data,
- size,
- &query,
- &prq.namespace)) )
+ if (GNUNET_OK !=
+ GNUNET_BLOCK_get_key (block_ctx,
+ type,
+ data,
+ size,
+ &query))
{
GNUNET_break (0);
- /* FIXME: consider removing the block? */
- GNUNET_FS_drq_get_next (GNUNET_YES);
+ GNUNET_DATASTORE_remove (dsh,
+ key,
+ size, data,
+ -1, -1,
+ GNUNET_TIME_UNIT_FOREVER_REL,
+ NULL, NULL);
+ GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
return;
}
prq.type = type;
prq.priority = priority;
+ prq.finished = GNUNET_NO;
+ prq.request_found = GNUNET_NO;
process_reply (&prq, key, pr);
-
- if ( (GNUNET_YES == test_load_too_high()) ||
- (pr->results_found > 5 + 2 * pr->priority) )
+ if ( (old_rf == 0) &&
+ (pr->results_found == 1) )
+ update_datastore_delays (pr->start_time);
+ if (prq.finished == GNUNET_YES)
+ return;
+ if (pr->qe == NULL)
+ return; /* done here */
+ if (prq.eval == GNUNET_BLOCK_EVALUATION_OK_LAST)
{
- GNUNET_FS_drq_get_next (GNUNET_NO);
+ GNUNET_DATASTORE_get_next (dsh, GNUNET_NO);
return;
}
- GNUNET_FS_drq_get_next (GNUNET_YES);
-}
-
-
-/**
- * The priority level imposes a bound on the maximum
- * value for the ttl that can be requested.
- *
- * @param ttl_in requested ttl
- * @param prio given priority
- * @return ttl_in if ttl_in is below the limit,
- * otherwise the ttl-limit for the given priority
- */
-static int32_t
-bound_ttl (int32_t ttl_in, uint32_t prio)
-{
- unsigned long long allowed;
-
- if (ttl_in <= 0)
- return ttl_in;
- allowed = ((unsigned long long) prio) * TTL_DECREMENT / 1000;
- if (ttl_in > allowed)
+ if ( (pr->client_request_list == NULL) &&
+ ( (GNUNET_YES == test_load_too_high()) ||
+ (pr->results_found > 5 + 2 * pr->priority) ) )
{
- if (allowed >= (1 << 30))
- return 1 << 30;
- return allowed;
+#if DEBUG_FS > 2
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Load too high, done with request\n");
+#endif
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# processing result set cut short due to load"),
+ 1,
+ GNUNET_NO);
+ GNUNET_DATASTORE_get_next (dsh, GNUNET_NO);
+ return;
}
- return ttl_in;
+ GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
}
bound_priority (uint32_t prio_in,
struct ConnectedPeer *cp)
{
- return 0; // FIXME!
+#define N ((double)128.0)
+ uint32_t ret;
+ double rret;
+ int ld;
+
+ ld = test_load_too_high ();
+ if (ld == GNUNET_SYSERR)
+ return 0; /* excess resources */
+ ret = change_host_trust (cp, prio_in);
+ if (ret > 0)
+ {
+ if (ret > current_priorities + N)
+ rret = current_priorities + N;
+ else
+ rret = ret;
+ current_priorities
+ = (current_priorities * (N-1) + rret)/N;
+ }
+#undef N
+ return ret;
+}
+
+
+/**
+ * Iterator over entries in the 'query_request_map' that
+ * tries to see if we have the same request pending from
+ * the same peer already.
+ *
+ * @param cls closure (our 'struct CheckDuplicateRequestClosure')
+ * @param key current key code (query, ignored, must match)
+ * @param value value in the hash map (a 'struct PendingRequest'
+ * that already exists)
+ * @return GNUNET_YES if we should continue to
+ * iterate (no match yet)
+ * GNUNET_NO if not (match found).
+ */
+static int
+check_duplicate_request_peer (void *cls,
+ const GNUNET_HashCode * key,
+ void *value)
+{
+ struct CheckDuplicateRequestClosure *cdc = cls;
+ struct PendingRequest *have = value;
+
+ if (cdc->pr->target_pid == have->target_pid)
+ {
+ cdc->have = have;
+ return GNUNET_NO;
+ }
+ return GNUNET_YES;
}
uint32_t distance)
{
struct PendingRequest *pr;
- struct PeerRequestEntry *pre;
struct ConnectedPeer *cp;
struct ConnectedPeer *cps;
+ struct CheckDuplicateRequestClosure cdc;
struct GNUNET_TIME_Relative timeout;
uint16_t msize;
const struct GetMessage *gm;
uint32_t bm;
size_t bfsize;
uint32_t ttl_decrement;
- uint32_t type;
- double preference;
+ enum GNUNET_BLOCK_Type type;
+ int have_ns;
+ int ld;
msize = ntohs(message->size);
if (msize < sizeof (struct GetMessage))
return GNUNET_SYSERR;
}
gm = (const struct GetMessage*) message;
+ type = ntohl (gm->type);
bm = ntohl (gm->hash_bitmap);
bits = 0;
while (bm > 0)
}
opt = (const GNUNET_HashCode*) &gm[1];
bfsize = msize - sizeof (struct GetMessage) + bits * sizeof (GNUNET_HashCode);
-
bm = ntohl (gm->hash_bitmap);
- if ( (0 != (bm & GET_MESSAGE_BIT_SKS_NAMESPACE)) &&
- (ntohl (gm->type) == GNUNET_DATASTORE_BLOCKTYPE_SBLOCK) )
- {
- GNUNET_break_op (0);
- return GNUNET_SYSERR;
- }
bits = 0;
cps = GNUNET_CONTAINER_multihashmap_get (connected_peers,
&other->hashPubKey);
- GNUNET_assert (NULL != cps);
+ if (NULL == cps)
+ {
+ /* peer must have just disconnected */
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# requests dropped due to initiator not being connected"),
+ 1,
+ GNUNET_NO);
+ return GNUNET_SYSERR;
+ }
if (0 != (bm & GET_MESSAGE_BIT_RETURN_TO))
cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
&opt[bits++]);
cp = cps;
if (cp == NULL)
{
- /* FIXME: try connect? */
+#if DEBUG_FS
+ if (0 != (bm & GET_MESSAGE_BIT_RETURN_TO))
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Failed to find RETURN-TO peer `%4s' in connection set. Dropping query.\n",
+ GNUNET_i2s ((const struct GNUNET_PeerIdentity*) &opt[bits-1]));
+
+ else
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Failed to find peer `%4s' in connection set. Dropping query.\n",
+ GNUNET_i2s (other));
+#endif
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# requests dropped due to missing reverse route"),
+ 1,
+ GNUNET_NO);
+ /* FIXME: try connect? */
return GNUNET_OK;
}
/* note that we can really only check load here since otherwise
peers could find out that we are overloaded by not being
disconnected after sending us a malformed query... */
- if (GNUNET_YES == test_load_too_high ())
+
+ /* FIXME: query priority should play
+ a major role here! */
+ ld = test_load_too_high ();
+ if (GNUNET_YES == ld)
{
#if DEBUG_FS
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Dropping query from `%s', this peer is too busy.\n",
GNUNET_i2s (other));
#endif
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# requests dropped due to high load"),
+ 1,
+ GNUNET_NO);
return GNUNET_OK;
}
+ /* FIXME: if ld == GNUNET_NO, forward
+ instead of indirecting! */
-#if DEBUG_FS
+#if DEBUG_FS
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Received request for `%s' of type %u from peer `%4s'\n",
+ "Received request for `%s' of type %u from peer `%4s' with flags %u\n",
GNUNET_h2s (&gm->query),
- (unsigned int) ntohl (gm->type),
- GNUNET_i2s (other));
+ (unsigned int) type,
+ GNUNET_i2s (other),
+ (unsigned int) bm);
#endif
+ have_ns = (0 != (bm & GET_MESSAGE_BIT_SKS_NAMESPACE));
pr = GNUNET_malloc (sizeof (struct PendingRequest) +
- (bm & GET_MESSAGE_BIT_SKS_NAMESPACE)?sizeof(GNUNET_HashCode):0);
- if ((bm & GET_MESSAGE_BIT_SKS_NAMESPACE))
- pr->namespace = (GNUNET_HashCode*) &pr[1];
- pr->type = ntohl (gm->type);
- pr->mingle = gm->filter_mutator;
- if (0 != (bm & GET_MESSAGE_BIT_SKS_NAMESPACE))
- memcpy (&pr[1], &opt[bits++], sizeof (GNUNET_HashCode));
- else if (pr->type == GNUNET_DATASTORE_BLOCKTYPE_SBLOCK)
+ (have_ns ? sizeof(GNUNET_HashCode) : 0));
+ if (have_ns)
{
- GNUNET_break_op (0);
- GNUNET_free (pr);
- return GNUNET_SYSERR;
+ pr->namespace = (GNUNET_HashCode*) &pr[1];
+ memcpy (&pr[1], &opt[bits++], sizeof (GNUNET_HashCode));
}
+ pr->type = type;
+ pr->mingle = ntohl (gm->filter_mutator);
if (0 != (bm & GET_MESSAGE_BIT_TRANSMIT_TO))
pr->target_pid = GNUNET_PEER_intern ((const struct GNUNET_PeerIdentity*) &opt[bits++]);
-
pr->anonymity_level = 1;
pr->priority = bound_priority (ntohl (gm->priority), cps);
pr->ttl = bound_ttl (ntohl (gm->ttl), pr->priority);
GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
TTL_DECREMENT);
if ( (pr->ttl < 0) &&
- (pr->ttl - ttl_decrement > 0) )
+ (((int32_t)(pr->ttl - ttl_decrement)) > 0) )
{
#if DEBUG_FS
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Dropping query from `%s' due to TTL underflow.\n",
- GNUNET_i2s (other));
+ "Dropping query from `%s' due to TTL underflow (%d - %u).\n",
+ GNUNET_i2s (other),
+ pr->ttl,
+ ttl_decrement);
#endif
- /* integer underflow => drop (should be very rare)! */
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# requests dropped due TTL underflow"),
+ 1,
+ GNUNET_NO);
+ /* integer underflow => drop (should be very rare)! */
GNUNET_free (pr);
return GNUNET_OK;
}
pr->bf_size = bfsize;
}
- /* FIXME: check somewhere if request already exists, and if so,
- recycle old state... */
- pre = GNUNET_malloc (sizeof (struct PeerRequestEntry));
- pre->cp = cp;
- pre->req = pr;
- GNUNET_CONTAINER_multihashmap_put (query_request_map,
- &gm->query,
- pr,
- GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
+ cdc.have = NULL;
+ cdc.pr = pr;
+ GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map,
+ &gm->query,
+ &check_duplicate_request_peer,
+ &cdc);
+ if (cdc.have != NULL)
+ {
+ if (cdc.have->start_time.value + cdc.have->ttl >=
+ pr->start_time.value + pr->ttl)
+ {
+ /* existing request has higher TTL, drop new one! */
+ cdc.have->priority += pr->priority;
+ destroy_pending_request (pr);
+#if DEBUG_FS
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Have existing request with higher TTL, dropping new request.\n",
+ GNUNET_i2s (other));
+#endif
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# requests dropped due to higher-TTL request"),
+ 1,
+ GNUNET_NO);
+ return GNUNET_OK;
+ }
+ else
+ {
+ /* existing request has lower TTL, drop old one! */
+ pr->priority += cdc.have->priority;
+ /* Possible optimization: if we have applicable pending
+ replies in 'cdc.have', we might want to move those over
+ (this is a really rare special-case, so it is not clear
+ that this would be worth it) */
+ destroy_pending_request (cdc.have);
+ /* keep processing 'pr'! */
+ }
+ }
+
+ pr->cp = cp;
+ GNUNET_break (GNUNET_OK ==
+ GNUNET_CONTAINER_multihashmap_put (query_request_map,
+ &gm->query,
+ pr,
+ GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
+ GNUNET_break (GNUNET_OK ==
+ GNUNET_CONTAINER_multihashmap_put (peer_request_map,
+ &other->hashPubKey,
+ pr,
+ GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
pr->hnode = GNUNET_CONTAINER_heap_insert (requests_by_expiration_heap,
pr,
- GNUNET_TIME_absolute_get().value + pr->ttl);
+ pr->start_time.value + pr->ttl);
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# P2P searches received"),
+ 1,
+ GNUNET_NO);
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# P2P searches active"),
+ 1,
+ GNUNET_NO);
/* calculate change in traffic preference */
- preference = (double) pr->priority;
- if (preference < QUERY_BANDWIDTH_VALUE)
- preference = QUERY_BANDWIDTH_VALUE;
- cps->inc_preference += preference;
-
+ cps->inc_preference += pr->priority * 1000 + QUERY_BANDWIDTH_VALUE;
/* process locally */
- type = pr->type;
- if (type == GNUNET_DATASTORE_BLOCKTYPE_DBLOCK)
- type = GNUNET_DATASTORE_BLOCKTYPE_ANY; /* to get on-demand as well */
+ if (type == GNUNET_BLOCK_TYPE_FS_DBLOCK)
+ type = GNUNET_BLOCK_TYPE_ANY; /* to get on-demand as well */
timeout = GNUNET_TIME_relative_multiply (BASIC_DATASTORE_REQUEST_DELAY,
(pr->priority + 1));
- pr->drq = GNUNET_FS_drq_get (&gm->query,
- pr->type,
- &process_local_reply,
- pr,
- timeout,
- GNUNET_NO);
+ pr->qe = GNUNET_DATASTORE_get (dsh,
+ &gm->query,
+ type,
+ pr->priority + 1,
+ MAX_DATASTORE_QUEUE,
+ timeout,
+ &process_local_reply,
+ pr);
/* Are multiple results possible? If so, start processing remotely now! */
switch (pr->type)
{
- case GNUNET_DATASTORE_BLOCKTYPE_DBLOCK:
- case GNUNET_DATASTORE_BLOCKTYPE_IBLOCK:
+ case GNUNET_BLOCK_TYPE_FS_DBLOCK:
+ case GNUNET_BLOCK_TYPE_FS_IBLOCK:
/* only one result, wait for datastore */
break;
default:
- pr->task = GNUNET_SCHEDULER_add_now (sched,
- &forward_request_task,
- pr);
+ if (pr->task == GNUNET_SCHEDULER_NO_TASK)
+ pr->task = GNUNET_SCHEDULER_add_now (sched,
+ &forward_request_task,
+ pr);
}
/* make sure we don't track too many requests */
if (GNUNET_CONTAINER_heap_get_size (requests_by_expiration_heap) > max_pending_requests)
{
pr = GNUNET_CONTAINER_heap_peek (requests_by_expiration_heap);
+ GNUNET_assert (pr != NULL);
destroy_pending_request (pr);
}
return GNUNET_OK;
struct PendingRequest *pr;
uint16_t msize;
unsigned int sc;
- uint32_t type;
-
+ enum GNUNET_BLOCK_Type type;
+
msize = ntohs (message->size);
if ( (msize < sizeof (struct SearchMessage)) ||
(0 != (msize - sizeof (struct SearchMessage)) % sizeof (GNUNET_HashCode)) )
GNUNET_SYSERR);
return;
}
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# client searches received"),
+ 1,
+ GNUNET_NO);
sc = (msize - sizeof (struct SearchMessage)) / sizeof (GNUNET_HashCode);
sm = (const struct SearchMessage*) message;
-
+ type = ntohl (sm->type);
+#if DEBUG_FS
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Received request for `%s' of type %u from local client\n",
+ GNUNET_h2s (&sm->query),
+ (unsigned int) type);
+#endif
cl = client_list;
while ( (cl != NULL) &&
(cl->client != client) )
cl->next = client_list;
client_list = cl;
}
- type = ntohl (sm->type);
+ /* detect duplicate KBLOCK requests */
+ if ( (type == GNUNET_BLOCK_TYPE_FS_KBLOCK) ||
+ (type == GNUNET_BLOCK_TYPE_FS_NBLOCK) ||
+ (type == GNUNET_BLOCK_TYPE_ANY) )
+ {
+ crl = cl->rl_head;
+ while ( (crl != NULL) &&
+ ( (0 != memcmp (&crl->req->query,
+ &sm->query,
+ sizeof (GNUNET_HashCode))) ||
+ (crl->req->type != type) ) )
+ crl = crl->next;
+ if (crl != NULL)
+ {
#if DEBUG_FS
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Received request for `%s' of type %u from local client\n",
- GNUNET_h2s (&sm->query),
- (unsigned int) type);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Have existing request, merging content-seen lists.\n");
#endif
- /* FIXME: detect duplicate request; if duplicate, simply update (merge)
- 'pr->replies_seen'! */
+ pr = crl->req;
+ /* Duplicate request (used to send long list of
+ known/blocked results); merge 'pr->replies_seen'
+ and update bloom filter */
+ GNUNET_array_grow (pr->replies_seen,
+ pr->replies_seen_size,
+ pr->replies_seen_off + sc);
+ memcpy (&pr->replies_seen[pr->replies_seen_off],
+ &sm[1],
+ sc * sizeof (GNUNET_HashCode));
+ pr->replies_seen_off += sc;
+ refresh_bloomfilter (pr);
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# client searches updated (merged content seen list)"),
+ 1,
+ GNUNET_NO);
+ GNUNET_SERVER_receive_done (client,
+ GNUNET_OK);
+ return;
+ }
+ }
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# client searches active"),
+ 1,
+ GNUNET_NO);
pr = GNUNET_malloc (sizeof (struct PendingRequest) +
- ((type == GNUNET_DATASTORE_BLOCKTYPE_SBLOCK)?sizeof(GNUNET_HashCode):0));
+ ((type == GNUNET_BLOCK_TYPE_FS_SBLOCK) ? sizeof(GNUNET_HashCode) : 0));
crl = GNUNET_malloc (sizeof (struct ClientRequestList));
memset (crl, 0, sizeof (struct ClientRequestList));
crl->client_list = cl;
&sm[1],
sc * sizeof (GNUNET_HashCode));
pr->replies_seen_off = sc;
- pr->anonymity_level = ntohl (sm->anonymity_level);
- pr->mingle = GNUNET_CRYPTO_random_u32(GNUNET_CRYPTO_QUALITY_WEAK,
- (uint32_t) -1);
+ pr->anonymity_level = ntohl (sm->anonymity_level);
+ pr->start_time = GNUNET_TIME_absolute_get ();
+ refresh_bloomfilter (pr);
pr->query = sm->query;
+ if (0 == (1 & ntohl (sm->options)))
+ pr->local_only = GNUNET_NO;
+ else
+ pr->local_only = GNUNET_YES;
switch (type)
{
- case GNUNET_DATASTORE_BLOCKTYPE_DBLOCK:
- case GNUNET_DATASTORE_BLOCKTYPE_IBLOCK:
+ case GNUNET_BLOCK_TYPE_FS_DBLOCK:
+ case GNUNET_BLOCK_TYPE_FS_IBLOCK:
if (0 != memcmp (&sm->target,
&all_zeros,
sizeof (GNUNET_HashCode)))
pr->target_pid = GNUNET_PEER_intern ((const struct GNUNET_PeerIdentity*) &sm->target);
break;
- case GNUNET_DATASTORE_BLOCKTYPE_SBLOCK:
+ case GNUNET_BLOCK_TYPE_FS_SBLOCK:
pr->namespace = (GNUNET_HashCode*) &pr[1];
memcpy (&pr[1], &sm->target, sizeof (GNUNET_HashCode));
break;
default:
break;
}
- GNUNET_CONTAINER_multihashmap_put (query_request_map,
- &sm->query,
- pr,
- GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
- pr->drq = GNUNET_FS_drq_get (&sm->query,
- pr->type,
- &process_local_reply,
- pr,
- GNUNET_TIME_UNIT_FOREVER_REL,
- GNUNET_YES);
+ GNUNET_break (GNUNET_OK ==
+ GNUNET_CONTAINER_multihashmap_put (query_request_map,
+ &sm->query,
+ pr,
+ GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
+ if (type == GNUNET_BLOCK_TYPE_FS_DBLOCK)
+ type = GNUNET_BLOCK_TYPE_ANY; /* get on-demand blocks too! */
+ pr->qe = GNUNET_DATASTORE_get (dsh,
+ &sm->query,
+ type,
+ -3, -1,
+ GNUNET_CONSTANTS_SERVICE_TIMEOUT,
+ &process_local_reply,
+ pr);
}
/* **************************** Startup ************************ */
-
-/**
- * List of handlers for P2P messages
- * that we care about.
- */
-static struct GNUNET_CORE_MessageHandler p2p_handlers[] =
- {
- { &handle_p2p_get,
- GNUNET_MESSAGE_TYPE_FS_GET, 0 },
- { &handle_p2p_put,
- GNUNET_MESSAGE_TYPE_FS_PUT, 0 },
- { NULL, 0, 0 }
- };
-
-
-/**
- * List of handlers for the messages understood by this
- * service.
- */
-static struct GNUNET_SERVER_MessageHandler handlers[] = {
- {&GNUNET_FS_handle_index_start, NULL,
- GNUNET_MESSAGE_TYPE_FS_INDEX_START, 0},
- {&GNUNET_FS_handle_index_list_get, NULL,
- GNUNET_MESSAGE_TYPE_FS_INDEX_LIST_GET, sizeof(struct GNUNET_MessageHeader) },
- {&GNUNET_FS_handle_unindex, NULL, GNUNET_MESSAGE_TYPE_FS_UNINDEX,
- sizeof (struct UnindexMessage) },
- {&handle_start_search, NULL, GNUNET_MESSAGE_TYPE_FS_START_SEARCH,
- 0 },
- {NULL, NULL, 0, 0}
-};
-
-
/**
* Process fs requests.
*
- * @param cls closure
* @param s scheduler to use
* @param server the initialized server
* @param c configuration to use
struct GNUNET_SERVER_Handle *server,
const struct GNUNET_CONFIGURATION_Handle *c)
{
+ static const struct GNUNET_CORE_MessageHandler p2p_handlers[] =
+ {
+ { &handle_p2p_get,
+ GNUNET_MESSAGE_TYPE_FS_GET, 0 },
+ { &handle_p2p_put,
+ GNUNET_MESSAGE_TYPE_FS_PUT, 0 },
+ { &handle_p2p_migration_stop,
+ GNUNET_MESSAGE_TYPE_FS_MIGRATION_STOP,
+ sizeof (struct MigrationStopMessage) },
+ { NULL, 0, 0 }
+ };
+ static const struct GNUNET_SERVER_MessageHandler handlers[] = {
+ {&GNUNET_FS_handle_index_start, NULL,
+ GNUNET_MESSAGE_TYPE_FS_INDEX_START, 0},
+ {&GNUNET_FS_handle_index_list_get, NULL,
+ GNUNET_MESSAGE_TYPE_FS_INDEX_LIST_GET, sizeof(struct GNUNET_MessageHeader) },
+ {&GNUNET_FS_handle_unindex, NULL, GNUNET_MESSAGE_TYPE_FS_UNINDEX,
+ sizeof (struct UnindexMessage) },
+ {&handle_start_search, NULL, GNUNET_MESSAGE_TYPE_FS_START_SEARCH,
+ 0 },
+ {NULL, NULL, 0, 0}
+ };
+ unsigned long long enc = 128;
+
sched = s;
cfg = c;
- connected_peers = GNUNET_CONTAINER_multihashmap_create (128); // FIXME: get size from config
- query_request_map = GNUNET_CONTAINER_multihashmap_create (128); // FIXME: get size from config
- peer_request_map = GNUNET_CONTAINER_multihashmap_create (128); // FIXME: get size from config
+ stats = GNUNET_STATISTICS_create (sched, "fs", cfg);
+ min_migration_delay = GNUNET_TIME_UNIT_SECONDS;
+ if ( (GNUNET_OK !=
+ GNUNET_CONFIGURATION_get_value_number (cfg,
+ "fs",
+ "MAX_PENDING_REQUESTS",
+ &max_pending_requests)) ||
+ (GNUNET_OK !=
+ GNUNET_CONFIGURATION_get_value_number (cfg,
+ "fs",
+ "EXPECTED_NEIGHBOUR_COUNT",
+ &enc)) ||
+ (GNUNET_OK !=
+ GNUNET_CONFIGURATION_get_value_time (cfg,
+ "fs",
+ "MIN_MIGRATION_DELAY",
+ &min_migration_delay)) )
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ _("Configuration fails to specify certain parameters, assuming default values."));
+ }
+ connected_peers = GNUNET_CONTAINER_multihashmap_create (enc);
+ query_request_map = GNUNET_CONTAINER_multihashmap_create (max_pending_requests);
+ peer_request_map = GNUNET_CONTAINER_multihashmap_create (enc);
requests_by_expiration_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
core = GNUNET_CORE_connect (sched,
cfg,
GNUNET_TIME_UNIT_FOREVER_REL,
NULL,
NULL,
- NULL,
&peer_connect_handler,
&peer_disconnect_handler,
+ NULL,
NULL, GNUNET_NO,
NULL, GNUNET_NO,
p2p_handlers);
requests_by_expiration_heap = NULL;
GNUNET_CONTAINER_multihashmap_destroy (peer_request_map);
peer_request_map = NULL;
-
+ if (dsh != NULL)
+ {
+ GNUNET_DATASTORE_disconnect (dsh, GNUNET_NO);
+ dsh = NULL;
+ }
return GNUNET_SYSERR;
- }
+ }
+ /* FIXME: distinguish between sending and storing in options? */
+ if (active_migration)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ _("Content migration is enabled, will start to gather data\n"));
+ consider_migration_gathering ();
+ }
GNUNET_SERVER_disconnect_notify (server,
&handle_client_disconnect,
NULL);
+ GNUNET_assert (GNUNET_OK ==
+ GNUNET_CONFIGURATION_get_value_filename (cfg,
+ "fs",
+ "TRUST",
+ &trustDirectory));
+ GNUNET_DISK_directory_create (trustDirectory);
+ GNUNET_SCHEDULER_add_with_priority (sched,
+ GNUNET_SCHEDULER_PRIORITY_HIGH,
+ &cron_flush_trust, NULL);
+
+
GNUNET_SERVER_add_handlers (server, handlers);
GNUNET_SCHEDULER_add_delayed (sched,
GNUNET_TIME_UNIT_FOREVER_REL,
struct GNUNET_SERVER_Handle *server,
const struct GNUNET_CONFIGURATION_Handle *cfg)
{
- if ( (GNUNET_OK != GNUNET_FS_drq_init (sched, cfg)) ||
- (GNUNET_OK != GNUNET_FS_indexing_init (sched, cfg)) ||
+ active_migration = GNUNET_CONFIGURATION_get_value_yesno (cfg,
+ "FS",
+ "ACTIVEMIGRATION");
+ dsh = GNUNET_DATASTORE_connect (cfg,
+ sched);
+ if (dsh == NULL)
+ {
+ GNUNET_SCHEDULER_shutdown (sched);
+ return;
+ }
+ datastore_get_load = GNUNET_LOAD_value_init ();
+ datastore_put_load = GNUNET_LOAD_value_init ();
+ block_cfg = GNUNET_CONFIGURATION_create ();
+ GNUNET_CONFIGURATION_set_value_string (block_cfg,
+ "block",
+ "PLUGINS",
+ "fs");
+ block_ctx = GNUNET_BLOCK_context_create (block_cfg);
+ GNUNET_assert (NULL != block_ctx);
+ dht_handle = GNUNET_DHT_connect (sched,
+ cfg,
+ FS_DHT_HT_SIZE);
+ if ( (GNUNET_OK != GNUNET_FS_indexing_init (sched, cfg, dsh)) ||
(GNUNET_OK != main_init (sched, server, cfg)) )
{
GNUNET_SCHEDULER_shutdown (sched);
+ GNUNET_DATASTORE_disconnect (dsh, GNUNET_NO);
+ dsh = NULL;
+ GNUNET_DHT_disconnect (dht_handle);
+ dht_handle = NULL;
+ GNUNET_BLOCK_context_destroy (block_ctx);
+ block_ctx = NULL;
+ GNUNET_CONFIGURATION_destroy (block_cfg);
+ block_cfg = NULL;
+ GNUNET_LOAD_value_free (datastore_get_load);
+ datastore_get_load = NULL;
+ GNUNET_LOAD_value_free (datastore_put_load);
+ datastore_put_load = NULL;
return;
}
}