/*
This file is part of GNUnet.
- (C) 2009 Christian Grothoff (and other contributing authors)
+ (C) 2009, 2010 Christian Grothoff (and other contributing authors)
GNUnet is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published
/**
* @file fs/gnunet-service-fs.c
- * @brief program that provides the file-sharing service
+ * @brief gnunet anonymity protocol implementation
* @author Christian Grothoff
*
* TODO:
- * - fix gazillion of minor FIXME's
- * - possible major issue: we may queue "gazillions" of (K|S)Blocks for the
- * core to transmit to another peer; need to make sure this is bounded overall...
- * - randomly delay processing for improved anonymity (can wait)
- * - content migration (put in local DS) (can wait)
- * - handle some special cases when forwarding replies based on tracked requests (can wait)
- * - tracking of success correlations for hot-path routing (can wait)
- * - various load-based actions (can wait)
- * - validation of KSBLOCKS (can wait)
- * - remove on-demand blocks if they keep failing (can wait)
- * - check that we decrement PIDs always where necessary (can wait)
- * - find out how to use core-pulling instead of pushing (at least for some cases)
+ * - forward_request_task (P2P forwarding!)
+ * - track stats for hot-path routing
+ * - implement hot-path routing decision procedure
+ * - detect duplicate requests (P2P and CS)
+ * - implement: bound_priority, test_load_too_high, validate_skblock
+ * - add content migration support (store locally)
+ * - add random delay
+ * - statistics
+ *
*/
#include "platform.h"
#include <float.h>
+#include "gnunet_constants.h"
#include "gnunet_core_service.h"
#include "gnunet_datastore_service.h"
#include "gnunet_peer_lib.h"
#include "gnunet_protocols.h"
#include "gnunet_signatures.h"
#include "gnunet_util_lib.h"
+#include "gnunet-service-fs_drq.h"
#include "gnunet-service-fs_indexing.h"
#include "fs.h"
-#define DEBUG_FS GNUNET_NO
+/**
+ * Maximum number of outgoing messages we queue per peer.
+ * FIXME: set to a tiny value for testing; make configurable.
+ */
+#define MAX_QUEUE_PER_PEER 2
+
+
+
+/**
+ * Maximum number of requests (from other peers) that we're
+ * willing to have pending at any given point in time.
+ * FIXME: set from configuration (and 32 is a tiny value for testing only).
+ */
+static uint64_t max_pending_requests = 32;
+
+
+/**
+ * Information we keep for each pending reply. The
+ * actual message follows at the end of this struct.
+ */
+struct PendingMessage;
+
/**
- * Signature of a function that is called whenever a datastore
- * request can be processed (or an entry put on the queue times out).
+ * Function called upon completion of a transmission.
*
* @param cls closure
- * @param ok GNUNET_OK if DS is ready, GNUNET_SYSERR on timeout
+ * @param pid ID of receiving peer, 0 on transmission error
*/
-typedef void (*RequestFunction)(void *cls,
- int ok);
+typedef void (*TransmissionContinuation)(void * cls,
+ GNUNET_PEER_Id tpid);
/**
- * Doubly-linked list of our requests for the datastore.
+ * Information we keep for each pending reply. The
+ * actual message follows at the end of this struct.
*/
-struct DatastoreRequestQueue
+struct PendingMessage
{
-
/**
- * This is a doubly-linked list.
+ * This is a doubly-linked list of messages to the same peer.
*/
- struct DatastoreRequestQueue *next;
+ struct PendingMessage *next;
/**
- * This is a doubly-linked list.
+ * This is a doubly-linked list of messages to the same peer.
*/
- struct DatastoreRequestQueue *prev;
+ struct PendingMessage *prev;
+
+ /**
+ * Entry in pending message list for this pending message.
+ */
+ struct PendingMessageList *pml;
/**
- * Function to call (will issue the request).
+ * Function to call immediately once we have transmitted this
+ * message.
*/
- RequestFunction req;
+ TransmissionContinuation cont;
/**
- * Closure for req.
+ * Closure for cont.
*/
- void *req_cls;
+ void *cont_cls;
/**
- * When should this request time-out because we don't care anymore?
+ * Size of the reply; actual reply message follows
+ * at the end of this struct.
*/
- struct GNUNET_TIME_Absolute timeout;
-
+ size_t msize;
+
/**
- * ID of task used for signaling timeout.
+ * How important is this message for us?
*/
- GNUNET_SCHEDULER_TaskIdentifier task;
-
+ uint32_t priority;
+
};
/**
- * Closure for processing START_SEARCH messages from a client.
+ * Information about a peer that we are connected to.
+ * We track data that is useful for determining which
+ * peers should receive our requests. We also keep
+ * a list of messages to transmit to this peer.
*/
-struct LocalGetContext
+struct ConnectedPeer
{
/**
- * This is a doubly-linked list.
+ * List of the last clients for which this peer successfully
+ * answered a query.
*/
- struct LocalGetContext *next;
+ struct GNUNET_SERVER_Client *last_client_replies[CS2P_SUCCESS_LIST_SIZE];
/**
- * This is a doubly-linked list.
+ * List of the last PIDs for which
+ * this peer successfully answered a query;
+ * We use 0 to indicate no successful reply.
*/
- struct LocalGetContext *prev;
+ GNUNET_PEER_Id last_p2p_replies[P2P_SUCCESS_LIST_SIZE];
/**
- * Client that initiated the search.
- */
- struct GNUNET_SERVER_Client *client;
+ * Average delay between sending the peer a request and
+ * getting a reply (only calculated over the requests for
+ * which we actually got a reply). Calculated
+ * as a moving average: new_delay = ((n-1)*last_delay+curr_delay) / n
+ */
+ struct GNUNET_TIME_Relative avg_delay;
/**
- * Array of results that we've already received
- * (can be NULL).
+ * Handle for an active request for transmission to this
+ * peer, or NULL.
*/
- GNUNET_HashCode *results;
+ struct GNUNET_CORE_TransmitHandle *cth;
/**
- * Bloomfilter over all results (for fast query construction);
- * NULL if we don't have any results.
- *
- * FIXME: this member is not used, is that OK? If so, it should
- * be removed!
+ * Messages (replies, queries, content migration) we would like to
+ * send to this peer in the near future. Sorted by priority, head.
*/
- struct GNUNET_CONTAINER_BloomFilter *results_bf;
+ struct PendingMessage *pending_messages_head;
/**
- * DS request associated with this operation.
+ * Messages (replies, queries, content migration) we would like to
+ * send to this peer in the near future. Sorted by priority, tail.
*/
- struct DatastoreRequestQueue *req;
+ struct PendingMessage *pending_messages_tail;
/**
- * Current result message to transmit to client (or NULL).
- */
- struct ContentMessage *result;
-
- /**
- * Type of the content that we're looking for.
- * 0 for any.
+ * Average priority of successful replies. Calculated
+ * as a moving average: new_avg = ((n-1)*last_avg+curr_prio) / n
*/
- uint32_t type;
+ double avg_priority;
/**
- * Desired anonymity level.
+ * Increase in traffic preference still to be submitted
+ * to the core service for this peer. FIXME: double or 'uint64_t'?
*/
- uint32_t anonymity_level;
+ double inc_preference;
/**
- * Number of results actually stored in the results array.
- */
- unsigned int results_used;
-
- /**
- * Size of the results array in memory.
- */
- unsigned int results_size;
-
- /**
- * Size (in bytes) of the 'results_bf' bloomfilter.
- *
- * FIXME: this member is not used, is that OK? If so, it should
- * be removed!
+ * The peer's identity.
*/
- size_t results_bf_size;
+ GNUNET_PEER_Id pid;
/**
- * If the request is for a DBLOCK or IBLOCK, this is the identity of
- * the peer that is known to have a response. Set to all-zeros if
- * such a target is not known (note that even if OUR anonymity
- * level is >0 we may happen to know the responder's identity;
- * nevertheless, we should probably not use it for a DHT-lookup
- * or similar blunt actions in order to avoid exposing ourselves).
+ * Size of the linked list of 'pending_messages'.
*/
- struct GNUNET_PeerIdentity target;
+ unsigned int pending_requests;
/**
- * If the request is for an SBLOCK, this is the identity of the
- * pseudonym to which the SBLOCK belongs.
+ * Which offset in "last_p2p_replies" will be updated next?
+ * (we go round-robin).
*/
- GNUNET_HashCode namespace;
+ unsigned int last_p2p_replies_woff;
/**
- * Hash of the keyword (aka query) for KBLOCKs; Hash of
- * the CHK-encoded block for DBLOCKS and IBLOCKS (aka query)
- * and hash of the identifier XORed with the target for
- * SBLOCKS (aka query).
+ * Which offset in "last_client_replies" will be updated next?
+ * (we go round-robin).
*/
- GNUNET_HashCode query;
+ unsigned int last_client_replies_woff;
};
/**
- * Possible routing policies for an FS-GET request.
+ * Information we keep for each pending request. We should try to
+ * keep this struct as small as possible since its memory consumption
+ * is key to how many requests we can have pending at once.
*/
-enum RoutingPolicy
- {
- /**
- * Simply drop the request.
- */
- ROUTING_POLICY_NONE = 0,
-
- /**
- * Answer it if we can from local datastore.
- */
- ROUTING_POLICY_ANSWER = 1,
-
- /**
- * Forward the request to other peers (if possible).
- */
- ROUTING_POLICY_FORWARD = 2,
-
- /**
- * Forward to other peers, and ask them to route
- * the response via ourselves.
- */
- ROUTING_POLICY_INDIRECT = 6,
-
- /**
- * Do everything we could possibly do (that would
- * make sense).
- */
- ROUTING_POLICY_ALL = 7
- };
+struct PendingRequest;
/**
- * Internal context we use for our initial processing
- * of a GET request.
+ * Doubly-linked list of requests we are performing
+ * on behalf of the same client.
*/
-struct ProcessGetContext
+struct ClientRequestList
{
+
/**
- * The search query (used for datastore lookup).
+ * This is a doubly-linked list.
*/
- GNUNET_HashCode query;
-
+ struct ClientRequestList *next;
+
/**
- * Which peer we should forward the response to.
+ * This is a doubly-linked list.
*/
- struct GNUNET_PeerIdentity reply_to;
+ struct ClientRequestList *prev;
/**
- * Namespace for the result (only set for SKS requests)
+ * Request this entry represents.
*/
- GNUNET_HashCode namespace;
+ struct PendingRequest *req;
/**
- * Peer that we should forward the query to if possible
- * (since that peer likely has the content).
+ * Client list this request belongs to.
*/
- struct GNUNET_PeerIdentity prime_target;
+ struct ClientList *client_list;
+
+};
+
+/**
+ * Replies to be transmitted to the client. The actual
+ * response message is allocated after this struct.
+ */
+struct ClientResponseMessage
+{
/**
- * When did we receive this request?
+ * This is a doubly-linked list.
*/
- struct GNUNET_TIME_Absolute start_time;
+ struct ClientResponseMessage *next;
/**
- * Our entry in the DRQ (non-NULL while we wait for our
- * turn to interact with the local database).
+ * This is a doubly-linked list.
*/
- struct DatastoreRequestQueue *drq;
+ struct ClientResponseMessage *prev;
/**
- * Filter used to eliminate duplicate results. Can be NULL if we
- * are not yet filtering any results.
+ * Client list entry this response belongs to.
*/
- struct GNUNET_CONTAINER_BloomFilter *bf;
+ struct ClientList *client_list;
/**
- * Bitmap describing which of the optional
- * hash codes / peer identities were given to us.
+ * Number of bytes in the response.
*/
- uint32_t bm;
+ size_t msize;
+};
+
+/**
+ * Linked list of clients we are performing requests
+ * for right now.
+ */
+struct ClientList
+{
/**
- * Desired block type.
+ * This is a linked list.
*/
- uint32_t type;
+ struct ClientList *next;
/**
- * Priority of the request.
+ * ID of a client making a request, NULL if this entry is for a
+ * peer.
*/
- uint32_t priority;
+ struct GNUNET_SERVER_Client *client;
/**
- * Size of the 'bf' (in bytes).
+ * Head of list of requests performed on behalf
+ * of this client right now.
*/
- size_t bf_size;
+ struct ClientRequestList *rl_head;
/**
- * In what ways are we going to process
- * the request?
+ * Tail of list of requests performed on behalf
+ * of this client right now.
*/
- enum RoutingPolicy policy;
+ struct ClientRequestList *rl_tail;
/**
- * Time-to-live for the request (value
- * we use).
+ * Head of linked list of responses.
*/
- int32_t ttl;
+ struct ClientResponseMessage *res_head;
/**
- * Number to mingle hashes for bloom-filter
- * tests with.
+ * Tail of linked list of responses.
*/
- int32_t mingle;
+ struct ClientResponseMessage *res_tail;
/**
- * Number of results that were found so far.
+ * Context for sending replies.
*/
- unsigned int results_found;
+ struct GNUNET_CONNECTION_TransmitHandle *th;
+
};
/**
- * Information we keep for each pending reply. The
- * actual message follows at the end of this struct.
+ * Hash map entry of requests we are performing
+ * on behalf of the same peer.
*/
-struct PendingMessage
+struct PeerRequestEntry
{
- /**
- * This is a linked list.
- */
- struct PendingMessage *next;
/**
- * Size of the reply; actual reply message follows
- * at the end of this struct.
+ * Request this entry represents.
*/
- size_t msize;
-
+ struct PendingRequest *req;
+
/**
- * How important is this message for us?
+ * Entry of peer responsible for this entry.
*/
- uint32_t priority;
+ struct ConnectedPeer *cp;
};
/**
- * All requests from a client are kept in a doubly-linked list.
+ * Doubly-linked list of messages we are performing
+ * due to a pending request.
*/
-struct ClientRequestList;
+struct PendingMessageList
+{
+
+ /**
+ * This is a doubly-linked list of messages on behalf of the same request.
+ */
+ struct PendingMessageList *next;
+
+ /**
+ * This is a doubly-linked list of messages on behalf of the same request.
+ */
+ struct PendingMessageList *prev;
+
+ /**
+ * Message this entry represents.
+ */
+ struct PendingMessage *pm;
+
+ /**
+ * Request this entry belongs to.
+ */
+ struct PendingRequest *req;
+
+ /**
+ * Peer this message is targeted for.
+ */
+ struct ConnectedPeer *target;
+
+};
/**
{
/**
- * ID of a client making a request, NULL if this entry is for a
- * peer.
+ * If this request was made by a client, this is our entry in the
+ * client request list; otherwise NULL.
*/
- struct GNUNET_SERVER_Client *client;
-
+ struct ClientRequestList *client_request_list;
+
/**
- * If this request was made by a client,
- * this is our entry in the client request
- * list; otherwise NULL.
+ * If this request was made by a peer, this is our entry in the
+ * per-peer multi-hash map; otherwise NULL.
*/
- struct ClientRequestList *crl_entry;
+ struct PeerRequestEntry *pht_entry;
/**
* If this is a namespace query, pointer to the hash of the public
- * key of the namespace; otherwise NULL.
+ * key of the namespace; otherwise NULL. Pointer will be to the
+ * end of this struct (so no need to free it).
*/
- GNUNET_HashCode *namespace;
+ const GNUNET_HashCode *namespace;
/**
* Bloomfilter we use to filter out replies that we don't care about
struct GNUNET_CORE_InformationRequestContext *irc;
/**
- * Handle for an active request for transmission to this peer, or
- * NULL. Only used for replies that we are trying to send to a peer
- * that we are not yet connected to.
- */
- struct GNUNET_CORE_TransmitHandle *cth;
-
- /**
- * Replies that we have received but were unable to forward yet
- * (typically non-null only if we have a pending transmission
- * request with the client or the respective peer).
+ * Hash code of all replies that we have seen so far (only valid
+ * if client is not NULL since we only track replies like this for
+ * our own clients).
*/
- struct PendingMessage *replies_pending;
+ GNUNET_HashCode *replies_seen;
/**
- * Pending transmission request for the target client (for processing of
- * 'replies_pending').
+ * Node in the heap representing this entry; NULL
+ * if we have no heap node.
*/
- struct GNUNET_CONNECTION_TransmitHandle *th;
+ struct GNUNET_CONTAINER_HeapNode *hnode;
/**
- * Hash code of all replies that we have seen so far (only valid
- * if client is not NULL since we only track replies like this for
- * our own clients).
+ * Head of list of messages being performed on behalf of this
+ * request.
*/
- GNUNET_HashCode *replies_seen;
+ struct PendingMessageList *pending_head;
/**
- * Node in the heap representing this entry.
+ * Tail of list of messages being performed on behalf of this
+ * request.
*/
- struct GNUNET_CONTAINER_HeapNode *hnode;
+ struct PendingMessageList *pending_tail;
/**
* When did we first see this request (form this peer), or, if our
*/
GNUNET_SCHEDULER_TaskIdentifier task;
- /**
- * (Interned) Peer identifier (only valid if "client" is NULL)
- * that identifies a peer that gave us this request.
- */
- GNUNET_PEER_Id source_pid;
-
/**
* (Interned) Peer identifier that identifies a preferred target
* for requests.
* received our query for this content.
*/
GNUNET_PEER_Id *used_pids;
+
+ /**
+ * Our entry in the DRQ (non-NULL while we wait for our
+ * turn to interact with the local database).
+ */
+ struct DatastoreRequestQueue *drq;
/**
* Size of the 'bf' (in bytes).
*/
unsigned int used_pids_size;
+ /**
+ * Number of results found for this request.
+ */
+ unsigned int results_found;
+
/**
* How many entries in "replies_seen" are actually valid?
*/
/**
- * All requests from a client are kept in a doubly-linked list.
+ * Our scheduler.
*/
-struct ClientRequestList
-{
- /**
- * This is a doubly-linked list.
- */
- struct ClientRequestList *next;
+static struct GNUNET_SCHEDULER_Handle *sched;
- /**
- * This is a doubly-linked list.
- */
- struct ClientRequestList *prev;
+/**
+ * Our configuration.
+ */
+const struct GNUNET_CONFIGURATION_Handle *cfg;
- /**
- * A request from this client.
- */
- struct PendingRequest *req;
+/**
+ * Map of peer identifiers to "struct ConnectedPeer" (for that peer).
+ */
+static struct GNUNET_CONTAINER_MultiHashMap *connected_peers;
- /**
- * Client list with the head and tail of this DLL.
- */
- struct ClientList *cl;
-};
+/**
+ * Map of peer identifiers to "struct PendingRequest" (for that peer).
+ */
+static struct GNUNET_CONTAINER_MultiHashMap *peer_request_map;
+/**
+ * Map of query identifiers to "struct PendingRequest" (for that query).
+ */
+static struct GNUNET_CONTAINER_MultiHashMap *query_request_map;
/**
- * Linked list of all clients that we are currently processing
- * requests for.
+ * Heap with the request that will expire next at the top. Contains
+ * pointers of type "struct PendingRequest*"; these will *also* be
+ * aliased from the "requests_by_peer" data structures and the
+ * "requests_by_query" table. Note that requests from our clients
+ * don't expire and are thus NOT in the "requests_by_expiration"
+ * (or the "requests_by_peer" tables).
*/
-struct ClientList
-{
+static struct GNUNET_CONTAINER_Heap *requests_by_expiration_heap;
- /**
- * This is a linked list.
- */
- struct ClientList *next;
-
- /**
- * What client is this entry for?
- */
- struct GNUNET_SERVER_Client* client;
-
- /**
- * Head of the DLL of requests from this client.
- */
- struct ClientRequestList *head;
-
- /**
- * Tail of the DLL of requests from this client.
- */
- struct ClientRequestList *tail;
-
-};
-
-
-/**
- * Closure for "process_reply" function.
- */
-struct ProcessReplyClosure
-{
- /**
- * The data for the reply.
- */
- const void *data;
-
- /**
- * When the reply expires.
- */
- struct GNUNET_TIME_Absolute expiration;
-
- /**
- * Size of data.
- */
- size_t size;
-
- /**
- * Namespace that this reply belongs to
- * (if it is of type SBLOCK).
- */
- GNUNET_HashCode namespace;
-
- /**
- * Type of the block.
- */
- uint32_t type;
-
- /**
- * How much was this reply worth to us?
- */
- uint32_t priority;
-};
-
-
-/**
- * Information about a peer that we are connected to.
- * We track data that is useful for determining which
- * peers should receive our requests.
- */
-struct ConnectedPeer
-{
-
- /**
- * List of the last clients for which this peer
- * successfully answered a query.
- */
- struct GNUNET_SERVER_Client *last_client_replies[CS2P_SUCCESS_LIST_SIZE];
-
- /**
- * List of the last PIDs for which
- * this peer successfully answered a query;
- * We use 0 to indicate no successful reply.
- */
- GNUNET_PEER_Id last_p2p_replies[P2P_SUCCESS_LIST_SIZE];
-
- /**
- * Average delay between sending the peer a request and
- * getting a reply (only calculated over the requests for
- * which we actually got a reply). Calculated
- * as a moving average: new_delay = ((n-1)*last_delay+curr_delay) / n
- */
- struct GNUNET_TIME_Relative avg_delay;
-
- /**
- * Handle for an active request for transmission to this
- * peer, or NULL.
- */
- struct GNUNET_CORE_TransmitHandle *cth;
-
- /**
- * Messages (replies, queries, content migration) we would like to
- * send to this peer in the near future. Sorted by priority.
- */
- struct PendingMessage *pending_messages;
-
- /**
- * Average priority of successful replies. Calculated
- * as a moving average: new_avg = ((n-1)*last_avg+curr_prio) / n
- */
- double avg_priority;
-
- /**
- * The peer's identity.
- */
- GNUNET_PEER_Id pid;
-
- /**
- * Number of requests we have currently pending with this peer (that
- * is, requests that were transmitted so recently that we would not
- * retransmit them right now).
- */
- unsigned int pending_requests;
-
- /**
- * Which offset in "last_p2p_replies" will be updated next?
- * (we go round-robin).
- */
- unsigned int last_p2p_replies_woff;
-
- /**
- * Which offset in "last_client_replies" will be updated next?
- * (we go round-robin).
- */
- unsigned int last_client_replies_woff;
-
-};
-
-
-/**
- * Our connection to the datastore.
- */
-static struct GNUNET_DATASTORE_Handle *dsh;
-
-/**
- * Our scheduler.
- */
-static struct GNUNET_SCHEDULER_Handle *sched;
-
-/**
- * Our configuration.
- */
-const struct GNUNET_CONFIGURATION_Handle *cfg;
+/**
+ * Linked list of clients we are currently processing requests for.
+ */
+struct ClientList *client_list;
/**
- * Handle to the core service (NULL until we've connected to it).
+ * Pointer to handle to the core service (points to NULL until we've
+ * connected to it).
*/
struct GNUNET_CORE_Handle *core;
-/**
- * Head of doubly-linked LGC list.
- */
-static struct LocalGetContext *lgc_head;
-
-/**
- * Tail of doubly-linked LGC list.
- */
-static struct LocalGetContext *lgc_tail;
-
-/**
- * Head of request queue for the datastore, sorted by timeout.
- */
-static struct DatastoreRequestQueue *drq_head;
-
-/**
- * Tail of request queue for the datastore.
- */
-static struct DatastoreRequestQueue *drq_tail;
-
-/**
- * Map of query hash codes to requests.
- */
-static struct GNUNET_CONTAINER_MultiHashMap *requests_by_query;
-
-/**
- * Map of peer IDs to requests (for those requests coming
- * from other peers).
- */
-static struct GNUNET_CONTAINER_MultiHashMap *requests_by_peer;
-
-/**
- * Linked list of all of our clients and their requests.
- */
-static struct ClientList *clients;
-
-/**
- * Heap with the request that will expire next at the top. Contains
- * pointers of type "struct PendingRequest*"; these will *also* be
- * aliased from the "requests_by_peer" data structures and the
- * "requests_by_query" table. Note that requests from our clients
- * don't expire and are thus NOT in the "requests_by_expiration"
- * (or the "requests_by_peer" tables).
- */
-static struct GNUNET_CONTAINER_Heap *requests_by_expiration;
-
-/**
- * Map of peer identifiers to "struct ConnectedPeer" (for that peer).
- */
-static struct GNUNET_CONTAINER_MultiHashMap *connected_peers;
-
-/**
- * Maximum number of requests (from other peers) that we're
- * willing to have pending at any given point in time.
- * FIXME: set from configuration (and 32 is a tiny value for testing only).
- */
-static uint64_t max_pending_requests = 32;
+/* ******************* clean up functions ************************ */
/**
- * Run the next DS request in our
- * queue, we're done with the current one.
+ * We're done with a particular message list entry.
+ * Free all associated resources.
+ *
+ * @param pml entry to destroy
*/
static void
-next_ds_request ()
+destroy_pending_message_list_entry (struct PendingMessageList *pml)
{
- struct DatastoreRequestQueue *e;
-
- while (NULL != (e = drq_head))
- {
- if (0 != GNUNET_TIME_absolute_get_remaining (e->timeout).value)
- break;
- if (e->task != GNUNET_SCHEDULER_NO_TASK)
- GNUNET_SCHEDULER_cancel (sched, e->task);
- GNUNET_CONTAINER_DLL_remove (drq_head, drq_tail, e);
- e->req (e->req_cls, GNUNET_NO);
- GNUNET_free (e);
- }
- if (e == NULL)
- return;
- if (e->task != GNUNET_SCHEDULER_NO_TASK)
- GNUNET_SCHEDULER_cancel (sched, e->task);
- e->task = GNUNET_SCHEDULER_NO_TASK;
- e->req (e->req_cls, GNUNET_YES);
- GNUNET_CONTAINER_DLL_remove (drq_head, drq_tail, e);
- GNUNET_free (e);
+ GNUNET_CONTAINER_DLL_remove (pml->req->pending_head,
+ pml->req->pending_tail,
+ pml);
+ GNUNET_CONTAINER_DLL_remove (pml->target->pending_messages_head,
+ pml->target->pending_messages_tail,
+ pml->pm);
+ pml->target->pending_requests--;
+ GNUNET_free (pml->pm);
+ GNUNET_free (pml);
}
/**
- * A datastore request had to be timed out.
+ * Destroy the given pending message (and call the respective
+ * continuation).
*
- * @param cls closure (of type "struct DatastoreRequestQueue*")
- * @param tc task context, unused
+ * @param pm message to destroy
+ * @param tpid id of peer that the message was delivered to, or 0 for none
*/
static void
-timeout_ds_request (void *cls,
- const struct GNUNET_SCHEDULER_TaskContext *tc)
+destroy_pending_message (struct PendingMessage *pm,
+ GNUNET_PEER_Id tpid)
{
- struct DatastoreRequestQueue *e = cls;
+ struct PendingMessageList *pml = pm->pml;
- e->task = GNUNET_SCHEDULER_NO_TASK;
- GNUNET_CONTAINER_DLL_remove (drq_head, drq_tail, e);
- e->req (e->req_cls, GNUNET_NO);
- GNUNET_free (e);
+ GNUNET_assert (pml->pm == pm);
+ GNUNET_assert ( (tpid == 0) || (tpid == pml->target->pid) );
+ pm->cont (pm->cont_cls, 0);
+ destroy_pending_message_list_entry (pml);
}
+
/**
- * Queue a request for the datastore.
+ * We're done processing a particular request.
+ * Free all associated resources.
*
- * @param deadline by when the request should run
- * @param fun function to call once the request can be run
- * @param fun_cls closure for fun
+ * @param pr request to destroy
*/
-static struct DatastoreRequestQueue *
-queue_ds_request (struct GNUNET_TIME_Relative deadline,
- RequestFunction fun,
- void *fun_cls)
+static void
+destroy_pending_request (struct PendingRequest *pr)
{
- struct DatastoreRequestQueue *e;
- struct DatastoreRequestQueue *bef;
+ struct GNUNET_PeerIdentity pid;
- if (drq_head == NULL)
+ if (pr->hnode != NULL)
{
- /* no other requests pending, run immediately */
- fun (fun_cls, GNUNET_OK);
- return NULL;
+ GNUNET_CONTAINER_heap_remove_node (requests_by_expiration_heap,
+ pr->hnode);
+ pr->hnode = NULL;
}
- e = GNUNET_malloc (sizeof (struct DatastoreRequestQueue));
- e->timeout = GNUNET_TIME_relative_to_absolute (deadline);
- e->req = fun;
- e->req_cls = fun_cls;
- if (deadline.value == GNUNET_TIME_UNIT_FOREVER_REL.value)
+ /* might have already been removed from map
+ in 'process_reply' if there was a unique
+ reply; hence ignore the return value here */
+ (void) GNUNET_CONTAINER_multihashmap_remove (query_request_map,
+ &pr->query,
+ pr);
+ if (pr->drq != NULL)
{
- /* local request, highest prio, put at head of queue
- regardless of deadline */
- bef = NULL;
+ GNUNET_FS_drq_get_cancel (pr->drq);
+ pr->drq = NULL;
}
- else
+ if (pr->client_request_list != NULL)
+ {
+ GNUNET_CONTAINER_DLL_remove (pr->client_request_list->client_list->rl_head,
+ pr->client_request_list->client_list->rl_tail,
+ pr->client_request_list);
+ GNUNET_free (pr->client_request_list);
+ pr->client_request_list = NULL;
+ }
+ if (pr->pht_entry != NULL)
+ {
+ GNUNET_PEER_resolve (pr->pht_entry->cp->pid,
+ &pid);
+ GNUNET_CONTAINER_multihashmap_remove (peer_request_map,
+ &pid.hashPubKey,
+ pr->pht_entry);
+ GNUNET_free (pr->pht_entry);
+ pr->pht_entry = NULL;
+ }
+ if (pr->bf != NULL)
+ {
+ GNUNET_CONTAINER_bloomfilter_free (pr->bf);
+ pr->bf = NULL;
+ }
+ if (pr->irc != NULL)
+ {
+ GNUNET_CORE_peer_change_preference_cancel (pr->irc);
+ pr->irc = NULL;
+ }
+ if (pr->replies_seen != NULL)
+ {
+ GNUNET_free (pr->replies_seen);
+ pr->replies_seen = NULL;
+ }
+ if (pr->task != GNUNET_SCHEDULER_NO_TASK)
+ {
+ GNUNET_SCHEDULER_cancel (sched,
+ pr->task);
+ pr->task = GNUNET_SCHEDULER_NO_TASK;
+ }
+ while (NULL != pr->pending_head)
+ destroy_pending_message_list_entry (pr->pending_head);
+ GNUNET_PEER_change_rc (pr->target_pid, -1);
+ if (pr->used_pids != NULL)
{
- bef = drq_tail;
- while ( (NULL != bef) &&
- (e->timeout.value < bef->timeout.value) )
- bef = bef->prev;
+ GNUNET_PEER_decrement_rcs (pr->used_pids, pr->used_pids_off);
+ GNUNET_free (pr->used_pids);
+ pr->used_pids_off = 0;
+ pr->used_pids_size = 0;
+ pr->used_pids = NULL;
}
- GNUNET_CONTAINER_DLL_insert_after (drq_head, drq_tail, bef, e);
- if (deadline.value == GNUNET_TIME_UNIT_FOREVER_REL.value)
- return e;
- e->task = GNUNET_SCHEDULER_add_delayed (sched,
- deadline,
- &timeout_ds_request,
- e);
- return e;
+ GNUNET_free (pr);
}
/**
- * Free the state associated with a local get context.
+ * Method called whenever a given peer connects.
*
- * @param lgc the lgc to free
+ * @param cls closure, not used
+ * @param peer peer identity this notification is about
+ * @param latency reported latency of the connection with 'other'
+ * @param distance reported distance (DV) to 'other'
*/
-static void
-local_get_context_free (struct LocalGetContext *lgc)
+static void
+peer_connect_handler (void *cls,
+ const struct
+ GNUNET_PeerIdentity * peer,
+ struct GNUNET_TIME_Relative latency,
+ uint32_t distance)
{
- GNUNET_CONTAINER_DLL_remove (lgc_head, lgc_tail, lgc);
- GNUNET_SERVER_client_drop (lgc->client);
- GNUNET_free_non_null (lgc->results);
- if (lgc->results_bf != NULL)
- GNUNET_CONTAINER_bloomfilter_free (lgc->results_bf);
- if (lgc->req != NULL)
- {
- if (lgc->req->task != GNUNET_SCHEDULER_NO_TASK)
- GNUNET_SCHEDULER_cancel (sched, lgc->req->task);
- GNUNET_CONTAINER_DLL_remove (lgc_head, lgc_tail, lgc);
- GNUNET_free (lgc->req);
- }
- GNUNET_free (lgc);
+ struct ConnectedPeer *cp;
+
+ cp = GNUNET_malloc (sizeof (struct ConnectedPeer));
+ cp->pid = GNUNET_PEER_intern (peer);
+ GNUNET_CONTAINER_multihashmap_put (connected_peers,
+ &peer->hashPubKey,
+ cp,
+ GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
}
/**
- * We're able to transmit the next (local) result to the client.
- * Do it and ask the datastore for more. Or, on error, tell
- * the datastore to stop giving us more.
+ * Free (each) request made by the peer.
*
- * @param cls our closure (struct LocalGetContext)
- * @param max maximum number of bytes we can transmit
- * @param buf where to copy our message
- * @return number of bytes copied to buf
+ * @param cls closure, points to peer that the request belongs to
+ * @param key current key code
+ * @param value value in the hash map
+ * @return GNUNET_YES (we should continue to iterate)
*/
-static size_t
-transmit_local_result (void *cls,
- size_t max,
- void *buf)
+static int
+destroy_request (void *cls,
+ const GNUNET_HashCode * key,
+ void *value)
{
- struct LocalGetContext *lgc = cls;
- uint16_t msize;
-
- if (NULL == buf)
- {
-#if DEBUG_FS
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Failed to transmit result to local client, aborting datastore iteration.\n");
-#endif
- /* error, abort! */
- GNUNET_free (lgc->result);
- lgc->result = NULL;
- GNUNET_DATASTORE_get_next (dsh, GNUNET_NO);
- return 0;
- }
- msize = ntohs (lgc->result->header.size);
-#if DEBUG_FS
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Transmitting %u bytes of result to local client.\n",
- msize);
-#endif
- GNUNET_assert (max >= msize);
- memcpy (buf, lgc->result, msize);
- GNUNET_free (lgc->result);
- lgc->result = NULL;
- GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
- return msize;
+ const struct GNUNET_PeerIdentity * peer = cls;
+ struct PendingRequest *pr = value;
+
+ GNUNET_CONTAINER_multihashmap_remove (peer_request_map,
+ &peer->hashPubKey,
+ pr);
+ destroy_pending_request (pr);
+ return GNUNET_YES;
}
/**
- * Mingle hash with the mingle_number to produce different bits.
+ * Method called whenever a peer disconnects.
+ *
+ * @param cls closure, not used
+ * @param peer peer identity this notification is about
*/
static void
-mingle_hash (const GNUNET_HashCode * in,
- int32_t mingle_number,
+peer_disconnect_handler (void *cls,
+ const struct
+ GNUNET_PeerIdentity * peer)
+{
+ struct ConnectedPeer *cp;
+ struct PendingMessage *pm;
+ unsigned int i;
+
+ GNUNET_CONTAINER_multihashmap_get_multiple (peer_request_map,
+ &peer->hashPubKey,
+ &destroy_request,
+ (void*) peer);
+ cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
+ &peer->hashPubKey);
+ if (cp == NULL)
+ return;
+ for (i=0;i<CS2P_SUCCESS_LIST_SIZE;i++)
+ {
+ if (NULL != cp->last_client_replies[i])
+ {
+ GNUNET_SERVER_client_drop (cp->last_client_replies[i]);
+ cp->last_client_replies[i] = NULL;
+ }
+ }
+ GNUNET_CONTAINER_multihashmap_remove (connected_peers,
+ &peer->hashPubKey,
+ cp);
+ GNUNET_PEER_change_rc (cp->pid, -1);
+ GNUNET_PEER_decrement_rcs (cp->last_p2p_replies, P2P_SUCCESS_LIST_SIZE);
+ if (NULL != cp->cth)
+ GNUNET_CORE_notify_transmit_ready_cancel (cp->cth);
+ while (NULL != (pm = cp->pending_messages_head))
+ destroy_pending_message (pm, 0 /* delivery failed */);
+ GNUNET_break (0 == cp->pending_requests);
+ GNUNET_free (cp);
+}
+
+
+/**
+ * Iterator over hash map entries that removes all occurences
+ * of the given 'client' from the 'last_client_replies' of the
+ * given connected peer.
+ *
+ * @param cls closure, the 'struct GNUNET_SERVER_Client*' to remove
+ * @param key current key code (unused)
+ * @param value value in the hash map (the 'struct ConnectedPeer*' to change)
+ * @return GNUNET_YES (we should continue to iterate)
+ */
+static int
+remove_client_from_last_client_replies (void *cls,
+ const GNUNET_HashCode * key,
+ void *value)
+{
+ struct GNUNET_SERVER_Client *client = cls;
+ struct ConnectedPeer *cp = value;
+ unsigned int i;
+
+ for (i=0;i<CS2P_SUCCESS_LIST_SIZE;i++)
+ {
+ if (cp->last_client_replies[i] == client)
+ {
+ GNUNET_SERVER_client_drop (cp->last_client_replies[i]);
+ cp->last_client_replies[i] = NULL;
+ }
+ }
+ return GNUNET_YES;
+}
+
+
+/**
+ * A client disconnected. Remove all of its pending queries.
+ *
+ * @param cls closure, NULL
+ * @param client identification of the client
+ */
+static void
+handle_client_disconnect (void *cls,
+ struct GNUNET_SERVER_Client
+ * client)
+{
+ struct ClientList *pos;
+ struct ClientList *prev;
+ struct ClientRequestList *rcl;
+ struct ClientResponseMessage *creply;
+
+ if (client == NULL)
+ return; /* huh? is this allowed? */
+ prev = NULL;
+ pos = client_list;
+ while ( (NULL != pos) &&
+ (pos->client != client) )
+ {
+ prev = pos;
+ pos = pos->next;
+ }
+ if (pos == NULL)
+ return; /* no requests pending for this client */
+ while (NULL != (rcl = pos->rl_head))
+ destroy_pending_request (rcl->req);
+ if (prev == NULL)
+ client_list = pos->next;
+ else
+ prev->next = pos->next;
+ if (pos->th != NULL)
+ {
+ GNUNET_CONNECTION_notify_transmit_ready_cancel (pos->th);
+ pos->th = NULL;
+ }
+ while (NULL != (creply = pos->res_head))
+ {
+ GNUNET_CONTAINER_DLL_remove (pos->res_head,
+ pos->res_tail,
+ creply);
+ GNUNET_free (creply);
+ }
+ GNUNET_SERVER_client_drop (pos->client);
+ GNUNET_free (pos);
+ GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
+ &remove_client_from_last_client_replies,
+ client);
+}
+
+
+/**
+ * Iterator to free peer entries.
+ *
+ * @param cls closure, unused
+ * @param key current key code
+ * @param value value in the hash map (peer entry)
+ * @return GNUNET_YES (we should continue to iterate)
+ */
+static int
+clean_peer (void *cls,
+ const GNUNET_HashCode * key,
+ void *value)
+{
+ peer_disconnect_handler (NULL, (const struct GNUNET_PeerIdentity*) key);
+ return GNUNET_YES;
+}
+
+
+/**
+ * Task run during shutdown.
+ *
+ * @param cls unused
+ * @param tc unused
+ */
+static void
+shutdown_task (void *cls,
+ const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
+ &clean_peer,
+ NULL);
+ GNUNET_CONTAINER_multihashmap_destroy (connected_peers);
+ connected_peers = NULL;
+ while (client_list != NULL)
+ handle_client_disconnect (NULL,
+ client_list->client);
+ GNUNET_break (0 == GNUNET_CONTAINER_heap_get_size (requests_by_expiration_heap));
+ GNUNET_CONTAINER_heap_destroy (requests_by_expiration_heap);
+ requests_by_expiration_heap = 0;
+ GNUNET_break (0 == GNUNET_CONTAINER_multihashmap_size (query_request_map));
+ GNUNET_CONTAINER_multihashmap_destroy (query_request_map);
+ query_request_map = NULL;
+ GNUNET_break (0 == GNUNET_CONTAINER_multihashmap_size (peer_request_map));
+ GNUNET_CONTAINER_multihashmap_destroy (peer_request_map);
+ peer_request_map = NULL;
+ GNUNET_assert (NULL != core);
+ GNUNET_CORE_disconnect (core);
+ core = NULL;
+ sched = NULL;
+ cfg = NULL;
+}
+
+
+/* ******************* Utility functions ******************** */
+
+
+/**
+ * Transmit the given message by copying it to the target buffer
+ * "buf". "buf" will be NULL and "size" zero if the socket was closed
+ * for writing in the meantime. In that case, do nothing
+ * (the disconnect or shutdown handler will take care of the rest).
+ * If we were able to transmit messages and there are still more
+ * pending, ask core again for further calls to this function.
+ *
+ * @param cls closure, pointer to the 'struct ConnectedPeer*'
+ * @param size number of bytes available in buf
+ * @param buf where the callee should write the message
+ * @return number of bytes written to buf
+ */
+static size_t
+transmit_to_peer (void *cls,
+ size_t size, void *buf)
+{
+ struct ConnectedPeer *cp = cls;
+ char *cbuf = buf;
+ struct GNUNET_PeerIdentity pid;
+ struct PendingMessage *pm;
+ size_t msize;
+
+ cp->cth = NULL;
+ if (NULL == buf)
+ {
+#if DEBUG_FS
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Dropping reply, core too busy.\n");
+#endif
+ return 0;
+ }
+ msize = 0;
+ while ( (NULL != (pm = cp->pending_messages_head) ) &&
+ (pm->msize <= size) )
+ {
+ memcpy (&cbuf[msize], &pm[1], pm->msize);
+ msize += pm->msize;
+ size -= pm->msize;
+ destroy_pending_message (pm, cp->pid);
+ }
+ if (NULL != pm)
+ {
+ GNUNET_PEER_resolve (cp->pid,
+ &pid);
+ cp->cth = GNUNET_CORE_notify_transmit_ready (core,
+ pm->priority,
+ GNUNET_CONSTANTS_SERVICE_TIMEOUT,
+ &pid,
+ pm->msize,
+ &transmit_to_peer,
+ pm);
+ }
+ return msize;
+}
+
+
+/**
+ * Add a message to the set of pending messages for the given peer.
+ *
+ * @param cp peer to send message to
+ * @param pm message to queue
+ * @param pr request on which behalf this message is being queued
+ */
+static void
+add_to_pending_messages_for_peer (struct ConnectedPeer *cp,
+ struct PendingMessage *pm,
+ struct PendingRequest *pr)
+{
+ struct PendingMessage *pos;
+ struct PendingMessageList *pml;
+ struct GNUNET_PeerIdentity pid;
+
+ GNUNET_assert (pm->next == NULL);
+ GNUNET_assert (pm->pml == NULL);
+ pml = GNUNET_malloc (sizeof (struct PendingMessageList));
+ pml->req = pr;
+ pml->target = cp;
+ pml->pm = pm;
+ pm->pml = pml;
+ GNUNET_CONTAINER_DLL_insert (pr->pending_head,
+ pr->pending_tail,
+ pml);
+ pos = cp->pending_messages_head;
+ while ( (pos != NULL) &&
+ (pm->priority < pos->priority) )
+ pos = pos->next;
+ GNUNET_CONTAINER_DLL_insert_after (cp->pending_messages_head,
+ cp->pending_messages_tail,
+ pos,
+ pm);
+ cp->pending_requests++;
+ if (cp->pending_requests > MAX_QUEUE_PER_PEER)
+ destroy_pending_message (cp->pending_messages_tail, 0);
+ if (cp->cth == NULL)
+ {
+ /* need to schedule transmission */
+ GNUNET_PEER_resolve (cp->pid, &pid);
+ cp->cth = GNUNET_CORE_notify_transmit_ready (core,
+ cp->pending_messages_head->priority,
+ GNUNET_TIME_UNIT_FOREVER_REL,
+ &pid,
+ cp->pending_messages_head->msize,
+ &transmit_to_peer,
+ cp);
+ }
+ if (cp->cth == NULL)
+ {
+ /* FIXME: call stats (rare, bad case) */
+ }
+}
+
+
+/**
+ * Mingle hash with the mingle_number to produce different bits.
+ */
+static void
+mingle_hash (const GNUNET_HashCode * in,
+ int32_t mingle_number,
GNUNET_HashCode * hc)
{
GNUNET_HashCode m;
}
+/**
+ * Test if the load on this peer is too high
+ * to even consider processing the query at
+ * all.
+ *
+ * @return GNUNET_YES if the load is too high, GNUNET_NO otherwise
+ */
+static int
+test_load_too_high ()
+{
+ return GNUNET_NO; // FIXME
+}
+
+
+/* ******************* Pending Request Refresh Task ******************** */
+
+
+/**
+ * Function called after we either failed or succeeded
+ * at transmitting a query to a peer.
+ *
+ * @param cls the requests "struct PendingRequest*"
+ * @param pid ID of receiving peer, 0 on transmission error
+ */
+static void
+transmit_query_continuation (void *cls,
+ GNUNET_PEER_Id tpid)
+{
+ struct PendingRequest *pr = cls;
+
+ if (tpid == 0)
+ return;
+ GNUNET_PEER_change_rc (tpid, 1);
+ if (pr->used_pids_off == pr->used_pids_size)
+ GNUNET_array_grow (pr->used_pids,
+ pr->used_pids_size,
+ pr->used_pids_size * 2 + 2);
+ pr->used_pids[pr->used_pids_off++] = tpid;
+}
+
+
+#if 0
/**
* How many bytes should a bloomfilter be if we have already seen
* entry_count responses? Note that BLOOMFILTER_K gives us the number
}
return bf;
}
+#endif
/**
- * Closure used for "target_peer_select_cb".
- */
-struct PeerSelectionContext
-{
- /**
- * The request for which we are selecting
- * peers.
- */
- struct PendingRequest *pr;
-
- /**
- * Current "prime" target.
- */
- struct GNUNET_PeerIdentity target;
-
- /**
- * How much do we like this target?
- */
- double target_score;
-
-};
-
-
-/**
- * Function called for each connected peer to determine
- * which one(s) would make good targets for forwarding.
+ * We use a random delay to make the timing of requests less
+ * predictable. This function returns such a random delay.
*
- * @param cls closure (struct PeerSelectionContext)
- * @param key current key code (peer identity)
- * @param value value in the hash map (struct ConnectedPeer)
- * @return GNUNET_YES if we should continue to
- * iterate,
- * GNUNET_NO if not.
- */
-static int
-target_peer_select_cb (void *cls,
- const GNUNET_HashCode * key,
- void *value)
-{
- struct PeerSelectionContext *psc = cls;
- struct ConnectedPeer *cp = value;
- struct PendingRequest *pr = psc->pr;
- double score;
- unsigned int i;
-
- /* 1) check if we have already (recently) forwarded to this peer */
- for (i=0;i<pr->used_pids_off;i++)
- if (pr->used_pids[i] == cp->pid)
- return GNUNET_YES; /* skip */
- // 2) calculate how much we'd like to forward to this peer
- score = 0; // FIXME!
-
- /* store best-fit in closure */
- if (score > psc->target_score)
- {
- psc->target_score = score;
- psc->target.hashPubKey = *key;
- }
- return GNUNET_YES;
-}
-
-
-/**
- * We use a random delay to make the timing of requests
- * less predictable. This function returns such a random
- * delay.
+ * FIXME: make schedule dependent on the specifics of the request?
+ * Or bandwidth and number of connected peers and load?
*
* @return random delay to use for some request, between 0 and TTL_DECREMENT ms
*/
/**
- * Task that is run for each request with the goal of forwarding the
- * associated query to other peers. The task should re-schedule
- * itself to be re-run once the TTL has expired. (or at a later time
- * if more peers should be queried earlier).
+ * Function called after we've tried to reserve a certain amount of
+ * bandwidth for a reply. Check if we succeeded and if so send our
+ * query.
*
* @param cls the requests "struct PendingRequest*"
- * @param tc task context (unused)
- */
-static void
-forward_request_task (void *cls,
- const struct GNUNET_SCHEDULER_TaskContext *tc);
-
-
-/**
- * We've selected a peer for forwarding of a query. Construct the
- * message and then re-schedule the task to forward again to (other)
- * peers.
- *
- * @param cls closure
- * @param size number of bytes available in buf
- * @param buf where the callee should write the message
- * @return number of bytes written to buf
- */
-static size_t
-transmit_request_cb (void *cls,
- size_t size,
- void *buf)
-{
- struct ConnectedPeer *cp = cls;
- char *cbuf = buf;
- struct GNUNET_PeerIdentity target;
- struct PendingMessage *pr;
- size_t tot;
-
- cp->cth = NULL;
- tot = 0;
- while ( (NULL != (pr = cp->pending_messages)) &&
- (pr->msize < size - tot) )
- {
- memcpy (&cbuf[tot],
- &pr[1],
- pr->msize);
- tot += pr->msize;
- cp->pending_messages = pr->next;
- GNUNET_free (pr);
- }
- if (NULL != pr)
- {
- GNUNET_PEER_resolve (cp->pid,
- &target);
- cp->cth = GNUNET_CORE_notify_transmit_ready (core,
- pr->priority,
- GNUNET_TIME_UNIT_FOREVER_REL,
- &target,
- pr->msize,
- &transmit_request_cb,
- cp);
- }
- return tot;
-}
-
-
-/**
- * Function called after we've tried to reserve a certain amount of
- * bandwidth for a reply. Check if we succeeded and if so send our
- * query.
- *
- * @param cls the requests "struct PendingRequest*"
- * @param peer identifies the peer
- * @param bpm_in set to the current bandwidth limit (receiving) for this peer
- * @param bpm_out set to the current bandwidth limit (sending) for this peer
- * @param amount set to the amount that was actually reserved or unreserved
- * @param preference current traffic preference for the given peer
+ * @param peer identifies the peer
+ * @param bpm_in set to the current bandwidth limit (receiving) for this peer
+ * @param bpm_out set to the current bandwidth limit (sending) for this peer
+ * @param amount set to the amount that was actually reserved or unreserved
+ * @param preference current traffic preference for the given peer
*/
static void
target_reservation_cb (void *cls,
struct PendingRequest *pr = cls;
struct ConnectedPeer *cp;
struct PendingMessage *pm;
- struct PendingMessage *pos;
- struct PendingMessage *prev;
struct GetMessage *gm;
GNUNET_HashCode *ext;
char *bfdata;
size_t msize;
unsigned int k;
- pr->task = GNUNET_SCHEDULER_add_delayed (sched,
- get_processing_delay (), // FIXME: longer?
- &forward_request_task,
- pr);
pr->irc = NULL;
GNUNET_assert (peer != NULL);
if (amount != DBLOCK_SIZE)
/* FIXME: call stats... */
return; /* this target round failed */
}
- // (2) transmit, update ttl/priority
+ // (3) transmit, update ttl/priority
cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
&peer->hashPubKey);
if (cp == NULL)
{
- /* Peer must have just left; try again immediately */
- pr->task = GNUNET_SCHEDULER_add_now (sched,
- &forward_request_task,
- pr);
+ /* Peer must have just left */
return;
}
/* build message and insert message into priority queue */
GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE);
pm = GNUNET_malloc (sizeof (struct PendingMessage) + msize);
pm->msize = msize;
- pm->priority = 0; // FIXME: calculate priority properly!
gm = (struct GetMessage*) &pm[1];
gm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_GET);
gm->header.size = htons (msize);
pr->remaining_priority /= 2;
gm->priority = htonl (pr->remaining_priority);
gm->ttl = htonl (pr->ttl);
- gm->filter_mutator = htonl(pr->mingle);
- gm->hash_bitmap = htonl (42);
+ gm->filter_mutator = htonl(pr->mingle); // FIXME: bad endianess conversion?
+ gm->hash_bitmap = htonl (42); // FIXME!
gm->query = pr->query;
ext = (GNUNET_HashCode*) &gm[1];
-
// FIXME: setup "ext[0]..[k-1]"
bfdata = (char *) &ext[k];
if (pr->bf != NULL)
GNUNET_CONTAINER_bloomfilter_get_raw_data (pr->bf,
bfdata,
pr->bf_size);
+ pm->cont = &transmit_query_continuation;
+ pm->cont_cls = pr;
+ add_to_pending_messages_for_peer (cp, pm, pr);
+}
- prev = NULL;
- pos = cp->pending_messages;
- while ( (pos != NULL) &&
- (pm->priority < pos->priority) )
- {
- prev = pos;
- pos = pos->next;
- }
- if (prev == NULL)
- cp->pending_messages = pm;
- else
- prev->next = pm;
- pm->next = pos;
- if (cp->cth == NULL)
- cp->cth = GNUNET_CORE_notify_transmit_ready (core,
- cp->pending_messages->priority,
- GNUNET_TIME_UNIT_FOREVER_REL,
- peer,
- msize,
- &transmit_request_cb,
- cp);
- if (cp->cth == NULL)
+
+/**
+ * Closure used for "target_peer_select_cb".
+ */
+struct PeerSelectionContext
+{
+ /**
+ * The request for which we are selecting
+ * peers.
+ */
+ struct PendingRequest *pr;
+
+ /**
+ * Current "prime" target.
+ */
+ struct GNUNET_PeerIdentity target;
+
+ /**
+ * How much do we like this target?
+ */
+ double target_score;
+
+};
+
+
+/**
+ * Function called for each connected peer to determine
+ * which one(s) would make good targets for forwarding.
+ *
+ * @param cls closure (struct PeerSelectionContext)
+ * @param key current key code (peer identity)
+ * @param value value in the hash map (struct ConnectedPeer)
+ * @return GNUNET_YES if we should continue to
+ * iterate,
+ * GNUNET_NO if not.
+ */
+static int
+target_peer_select_cb (void *cls,
+ const GNUNET_HashCode * key,
+ void *value)
+{
+ struct PeerSelectionContext *psc = cls;
+ struct ConnectedPeer *cp = value;
+ struct PendingRequest *pr = psc->pr;
+ double score;
+ unsigned int i;
+
+ /* 1) check if we have already (recently) forwarded to this peer */
+ for (i=0;i<pr->used_pids_off;i++)
+ if (pr->used_pids[i] == cp->pid)
+ return GNUNET_YES; /* skip */
+ // 2) calculate how much we'd like to forward to this peer
+ score = 42; // FIXME!
+ // FIXME: also need API to gather data on responsiveness
+ // of this peer (we have fields for that in 'cp', but
+ // they are never set!)
+
+ /* store best-fit in closure */
+ if (score > psc->target_score)
{
- /* technically, this should not be a 'break'; but
- we don't handle this (rare) case yet, so let's warn
- about it... */
- GNUNET_break (0);
- // FIXME: now what?
+ psc->target_score = score;
+ psc->target.hashPubKey = *key;
}
+ return GNUNET_YES;
}
-
+
/**
- * Task that is run for each request with the goal of forwarding the
- * associated query to other peers. The task should re-schedule
- * itself to be re-run once the TTL has expired. (or at a later time
- * if more peers should be queried earlier).
+ * We're processing a GET request from another peer and have decided
+ * to forward it to other peers. This function is called periodically
+ * and should forward the request to other peers until we have all
+ * possible replies. If we have transmitted the *only* reply to
+ * the initiator we should destroy the pending request. If we have
+ * many replies in the queue to the initiator, we should delay sending
+ * out more queries until the reply queue has shrunk some.
*
- * @param cls the requests "struct PendingRequest*"
- * @param tc task context (unused)
+ * @param cls our "struct ProcessGetContext *"
+ * @param tc unused
*/
static void
forward_request_task (void *cls,
- const struct GNUNET_SCHEDULER_TaskContext *tc)
+ const struct GNUNET_SCHEDULER_TaskContext *tc)
{
struct PendingRequest *pr = cls;
struct PeerSelectionContext psc;
+ struct ConnectedPeer *cp;
- pr->task = GNUNET_SCHEDULER_NO_TASK;
+ pr->task = GNUNET_SCHEDULER_add_delayed (sched,
+ get_processing_delay (),
+ &forward_request_task,
+ pr);
+ if (pr->irc != NULL)
+ return; /* previous request still pending */
/* (1) select target */
psc.pr = pr;
psc.target_score = DBL_MIN;
GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
&target_peer_select_cb,
- &psc);
+ &psc);
if (psc.target_score == DBL_MIN)
- {
- /* no possible target found, wait some time */
- pr->task = GNUNET_SCHEDULER_add_delayed (sched,
- get_processing_delay (), // FIXME: exponential back-off? or at least wait longer...
- &forward_request_task,
- pr);
- return;
- }
+ return; /* nobody selected */
+
/* (2) reserve reply bandwidth */
- GNUNET_assert (NULL == pr->irc);
+ cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
+ &psc.target.hashPubKey);
pr->irc = GNUNET_CORE_peer_change_preference (sched, cfg,
&psc.target,
GNUNET_CONSTANTS_SERVICE_TIMEOUT,
- -1,
- DBLOCK_SIZE, // FIXME: make dependent on type?
- 0,
+ (uint32_t) -1 /* no limit */,
+ DBLOCK_SIZE,
+ (uint64_t) cp->inc_preference,
&target_reservation_cb,
pr);
+ cp->inc_preference = 0.0;
}
-/**
- * We're processing (local) results for a search request
- * from a (local) client. Pass applicable results to the
- * client and if we are done either clean up (operation
- * complete) or switch to P2P search (more results possible).
- *
- * @param cls our closure (struct LocalGetContext)
- * @param key key for the content
- * @param size number of bytes in data
- * @param data content stored
- * @param type type of the content
- * @param priority priority of the content
- * @param anonymity anonymity-level for the content
- * @param expiration expiration time for the content
- * @param uid unique identifier for the datum;
- * maybe 0 if no unique identifier is available
- */
-static void
-process_local_get_result (void *cls,
- const GNUNET_HashCode * key,
- uint32_t size,
- const void *data,
- uint32_t type,
- uint32_t priority,
- uint32_t anonymity,
- struct GNUNET_TIME_Absolute
- expiration,
- uint64_t uid)
-{
- struct LocalGetContext *lgc = cls;
- struct PendingRequest *pr;
- struct ClientRequestList *crl;
- struct ClientList *cl;
- size_t msize;
- unsigned int i;
-
- if (key == NULL)
- {
-#if DEBUG_FS
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Received last result for `%s' from local datastore, deciding what to do next.\n",
- GNUNET_h2s (&lgc->query));
-#endif
- /* no further results from datastore; continue
- processing further requests from the client and
- allow the next task to use the datastore; also,
- switch to P2P requests or clean up our state. */
- next_ds_request ();
- GNUNET_SERVER_receive_done (lgc->client,
- GNUNET_OK);
- if ( (lgc->results_used == 0) ||
- (lgc->type == GNUNET_DATASTORE_BLOCKTYPE_KBLOCK) ||
- (lgc->type == GNUNET_DATASTORE_BLOCKTYPE_SBLOCK) ||
- (lgc->type == GNUNET_DATASTORE_BLOCKTYPE_SKBLOCK) )
- {
-#if DEBUG_FS
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Forwarding query for `%s' to network.\n",
- GNUNET_h2s (&lgc->query));
-#endif
- cl = clients;
- while ( (NULL != cl) &&
- (cl->client != lgc->client) )
- cl = cl->next;
- if (cl == NULL)
- {
- cl = GNUNET_malloc (sizeof (struct ClientList));
- cl->client = lgc->client;
- cl->next = clients;
- clients = cl;
- }
- crl = GNUNET_malloc (sizeof (struct ClientRequestList));
- crl->cl = cl;
- GNUNET_CONTAINER_DLL_insert (cl->head, cl->tail, crl);
- pr = GNUNET_malloc (sizeof (struct PendingRequest));
- pr->client = lgc->client;
- GNUNET_SERVER_client_keep (pr->client);
- pr->crl_entry = crl;
- crl->req = pr;
- if (lgc->type == GNUNET_DATASTORE_BLOCKTYPE_SBLOCK)
- {
- pr->namespace = GNUNET_malloc (sizeof (GNUNET_HashCode));
- *pr->namespace = lgc->namespace;
- }
- pr->replies_seen = lgc->results;
- lgc->results = NULL;
- pr->start_time = GNUNET_TIME_absolute_get ();
- pr->query = lgc->query;
- pr->target_pid = GNUNET_PEER_intern (&lgc->target);
- pr->replies_seen_off = lgc->results_used;
- pr->replies_seen_size = lgc->results_size;
- lgc->results_size = 0;
- pr->type = lgc->type;
- pr->anonymity_level = lgc->anonymity_level;
- pr->bf = refresh_bloomfilter (pr->replies_seen_off,
- &pr->mingle,
- &pr->bf_size,
- pr->replies_seen);
- GNUNET_CONTAINER_multihashmap_put (requests_by_query,
- &pr->query,
- pr,
- GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
- pr->task = GNUNET_SCHEDULER_add_delayed (sched,
- get_processing_delay (),
- &forward_request_task,
- pr);
- local_get_context_free (lgc);
- return;
- }
- /* got all possible results, clean up! */
-#if DEBUG_FS
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Found all possible results for query for `%s', done!\n",
- GNUNET_h2s (&lgc->query));
-#endif
- local_get_context_free (lgc);
- return;
- }
- if (type == GNUNET_DATASTORE_BLOCKTYPE_ONDEMAND)
- {
-#if DEBUG_FS
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Received on-demand block for `%s' from local datastore, fetching data.\n",
- GNUNET_h2s (&lgc->query));
-#endif
- GNUNET_FS_handle_on_demand_block (key, size, data, type, priority,
- anonymity, expiration, uid,
- dsh,
- &process_local_get_result,
- lgc);
- return;
- }
- if ( (type != lgc->type) &&
- (lgc->type != GNUNET_DATASTORE_BLOCKTYPE_ANY) )
- {
- /* this should be virtually impossible to reach (DBLOCK
- query hash being identical to KBLOCK/SBLOCK query hash);
- nevertheless, if it happens, the correct thing is to
- simply skip the result. */
-#if DEBUG_FS
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Received block of unexpected type (%u, want %u) for `%s' from local datastore, ignoring.\n",
- type,
- lgc->type,
- GNUNET_h2s (&lgc->query));
-#endif
- GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
- return;
- }
- /* check if this is a result we've alredy
- received */
- for (i=0;i<lgc->results_used;i++)
- if (0 == memcmp (key,
- &lgc->results[i],
- sizeof (GNUNET_HashCode)))
- {
-#if DEBUG_FS
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Received duplicate result for `%s' from local datastore, ignoring.\n",
- GNUNET_h2s (&lgc->query));
-#endif
- GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
- return;
- }
- if (lgc->results_used == lgc->results_size)
- GNUNET_array_grow (lgc->results,
- lgc->results_size,
- lgc->results_size * 2 + 2);
- GNUNET_CRYPTO_hash (data,
- size,
- &lgc->results[lgc->results_used++]);
- msize = size + sizeof (struct ContentMessage);
- GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE);
- lgc->result = GNUNET_malloc (msize);
- lgc->result->header.size = htons (msize);
- lgc->result->header.type = htons (GNUNET_MESSAGE_TYPE_FS_CONTENT);
- lgc->result->type = htonl (type);
- lgc->result->expiration = GNUNET_TIME_absolute_hton (expiration);
- memcpy (&lgc->result[1],
- data,
- size);
-#if DEBUG_FS
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Received new result for `%s' from local datastore, passing to client.\n",
- GNUNET_h2s (&lgc->query));
-#endif
- GNUNET_SERVER_notify_transmit_ready (lgc->client,
- msize,
- GNUNET_TIME_UNIT_FOREVER_REL,
- &transmit_local_result,
- lgc);
-}
-
-
-/**
- * We're processing a search request from a local
- * client. Now it is our turn to query the datastore.
- *
- * @param cls our closure (struct LocalGetContext)
- * @param tc unused
- */
-static void
-transmit_local_get (void *cls,
- const struct GNUNET_SCHEDULER_TaskContext *tc)
-{
- struct LocalGetContext *lgc = cls;
- uint32_t type;
-
- type = lgc->type;
- if (type == GNUNET_DATASTORE_BLOCKTYPE_DBLOCK)
- type = GNUNET_DATASTORE_BLOCKTYPE_ANY; /* to get on-demand as well */
- GNUNET_DATASTORE_get (dsh,
- &lgc->query,
- type,
- &process_local_get_result,
- lgc,
- GNUNET_TIME_UNIT_FOREVER_REL);
-}
-
-
-/**
- * We're processing a search request from a local
- * client. Now it is our turn to query the datastore.
- *
- * @param cls our closure (struct LocalGetContext)
- * @param ok did we succeed to queue for datastore access, should always be GNUNET_OK
- */
-static void
-transmit_local_get_ready (void *cls,
- int ok)
-{
- struct LocalGetContext *lgc = cls;
-
- GNUNET_assert (GNUNET_OK == ok);
- GNUNET_SCHEDULER_add_continuation (sched,
- &transmit_local_get,
- lgc,
- GNUNET_SCHEDULER_REASON_PREREQ_DONE);
-}
+/* **************************** P2P PUT Handling ************************ */
/**
- * Handle START_SEARCH-message (search request from client).
+ * Function called after we either failed or succeeded
+ * at transmitting a reply to a peer.
*
- * @param cls closure
- * @param client identification of the client
- * @param message the actual message
+ * @param cls the requests "struct PendingRequest*"
+ * @param pid ID of receiving peer, 0 on transmission error
*/
static void
-handle_start_search (void *cls,
- struct GNUNET_SERVER_Client *client,
- const struct GNUNET_MessageHeader *message)
+transmit_reply_continuation (void *cls,
+ GNUNET_PEER_Id tpid)
{
- const struct SearchMessage *sm;
- struct LocalGetContext *lgc;
- uint16_t msize;
- unsigned int sc;
-
- msize = ntohs (message->size);
- if ( (msize < sizeof (struct SearchMessage)) ||
- (0 != (msize - sizeof (struct SearchMessage)) % sizeof (GNUNET_HashCode)) )
- {
- GNUNET_break (0);
- GNUNET_SERVER_receive_done (client,
- GNUNET_SYSERR);
- return;
- }
- sc = (msize - sizeof (struct SearchMessage)) / sizeof (GNUNET_HashCode);
- sm = (const struct SearchMessage*) message;
- GNUNET_SERVER_client_keep (client);
- lgc = GNUNET_malloc (sizeof (struct LocalGetContext));
- if (sc > 0)
- {
- lgc->results_used = sc;
- GNUNET_array_grow (lgc->results,
- lgc->results_size,
- sc * 2);
- memcpy (lgc->results,
- &sm[1],
- sc * sizeof (GNUNET_HashCode));
- }
- lgc->client = client;
- lgc->type = ntohl (sm->type);
- lgc->anonymity_level = ntohl (sm->anonymity_level);
- switch (lgc->type)
+ struct PendingRequest *pr = cls;
+
+ switch (pr->type)
{
case GNUNET_DATASTORE_BLOCKTYPE_DBLOCK:
case GNUNET_DATASTORE_BLOCKTYPE_IBLOCK:
- lgc->target.hashPubKey = sm->target;
+ /* only one reply expected, done with the request! */
+ destroy_pending_request (pr);
break;
+ case GNUNET_DATASTORE_BLOCKTYPE_KBLOCK:
case GNUNET_DATASTORE_BLOCKTYPE_SBLOCK:
- lgc->namespace = sm->target;
break;
default:
+ GNUNET_break (0);
break;
}
- lgc->query = sm->query;
- GNUNET_CONTAINER_DLL_insert (lgc_head, lgc_tail, lgc);
- lgc->req = queue_ds_request (GNUNET_TIME_UNIT_FOREVER_REL,
- &transmit_local_get_ready,
- lgc);
}
/**
- * List of handlers for the messages understood by this
- * service.
- */
-static struct GNUNET_SERVER_MessageHandler handlers[] = {
- {&GNUNET_FS_handle_index_start, NULL,
- GNUNET_MESSAGE_TYPE_FS_INDEX_START, 0},
- {&GNUNET_FS_handle_index_list_get, NULL,
- GNUNET_MESSAGE_TYPE_FS_INDEX_LIST_GET, sizeof(struct GNUNET_MessageHeader) },
- {&GNUNET_FS_handle_unindex, NULL, GNUNET_MESSAGE_TYPE_FS_UNINDEX,
- sizeof (struct UnindexMessage) },
- {&handle_start_search, NULL, GNUNET_MESSAGE_TYPE_FS_START_SEARCH,
- 0 },
- {NULL, NULL, 0, 0}
-};
-
-
-/**
- * Clean up the memory used by the PendingRequest structure (except
- * for the client or peer list that the request may be part of).
+ * Check if the given KBlock is well-formed.
*
- * @param pr request to clean up
+ * @param kb the kblock data (or at least "dsize" bytes claiming to be one)
+ * @param dsize size of "kb" in bytes; check for < sizeof(struct KBlock)!
+ * @param query where to store the query that this block answers
+ * @return GNUNET_OK if this is actually a well-formed KBlock
*/
-static void
-destroy_pending_request (struct PendingRequest *pr)
+static int
+check_kblock (const struct KBlock *kb,
+ size_t dsize,
+ GNUNET_HashCode *query)
{
- struct PendingMessage *reply;
- struct ClientList *cl;
-
- GNUNET_CONTAINER_multihashmap_remove (requests_by_query,
- &pr->query,
- pr);
- // FIXME: not sure how this can work (efficiently)
- // also, what does the return value mean?
- if (pr->irc != NULL)
- {
- GNUNET_CORE_peer_change_preference_cancel (pr->irc);
- pr->irc = NULL;
- }
- if (pr->client == NULL)
+ if (dsize < sizeof (struct KBlock))
{
- GNUNET_CONTAINER_heap_remove_node (requests_by_expiration,
- pr->hnode);
+ GNUNET_break_op (0);
+ return GNUNET_SYSERR;
}
- else
+ if (dsize - sizeof (struct KBlock) !=
+ ntohs (kb->purpose.size)
+ - sizeof (struct GNUNET_CRYPTO_RsaSignaturePurpose)
+ - sizeof(struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded) )
{
- cl = pr->crl_entry->cl;
- GNUNET_CONTAINER_DLL_remove (cl->head,
- cl->tail,
- pr->crl_entry);
+ GNUNET_break_op (0);
+ return GNUNET_SYSERR;
}
- if (GNUNET_SCHEDULER_NO_TASK != pr->task)
- GNUNET_SCHEDULER_cancel (sched, pr->task);
- if (NULL != pr->bf)
- GNUNET_CONTAINER_bloomfilter_free (pr->bf);
- if (NULL != pr->th)
- GNUNET_CONNECTION_notify_transmit_ready_cancel (pr->th);
- while (NULL != (reply = pr->replies_pending))
+ if (GNUNET_OK !=
+ GNUNET_CRYPTO_rsa_verify (GNUNET_SIGNATURE_PURPOSE_FS_KBLOCK,
+ &kb->purpose,
+ &kb->signature,
+ &kb->keyspace))
{
- pr->replies_pending = reply->next;
- GNUNET_free (reply);
+ GNUNET_break_op (0);
+ return GNUNET_SYSERR;
}
- if (NULL != pr->cth)
- GNUNET_CORE_notify_transmit_ready_cancel (pr->cth);
- GNUNET_PEER_change_rc (pr->source_pid, -1);
- GNUNET_PEER_change_rc (pr->target_pid, -1);
- GNUNET_PEER_decrement_rcs (pr->used_pids, pr->used_pids_off);
- GNUNET_free_non_null (pr->used_pids);
- GNUNET_free_non_null (pr->replies_seen);
- GNUNET_free_non_null (pr->namespace);
- GNUNET_free (pr);
+ if (query != NULL)
+ GNUNET_CRYPTO_hash (&kb->keyspace,
+ sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded),
+ query);
+ return GNUNET_OK;
}
/**
- * A client disconnected. Remove all of its pending queries.
+ * Check if the given SBlock is well-formed.
*
- * @param cls closure, NULL
- * @param client identification of the client
+ * @param sb the sblock data (or at least "dsize" bytes claiming to be one)
+ * @param dsize size of "kb" in bytes; check for < sizeof(struct SBlock)!
+ * @param query where to store the query that this block answers
+ * @param namespace where to store the namespace that this block belongs to
+ * @return GNUNET_OK if this is actually a well-formed SBlock
*/
-static void
-handle_client_disconnect (void *cls,
- struct GNUNET_SERVER_Client
- * client)
+static int
+check_sblock (const struct SBlock *sb,
+ size_t dsize,
+ GNUNET_HashCode *query,
+ GNUNET_HashCode *namespace)
{
- struct LocalGetContext *lgc;
- struct ClientList *cpos;
- struct ClientList *cprev;
- struct ClientRequestList *rl;
-
- if (client == NULL)
- return;
- lgc = lgc_head;
- while ( (NULL != lgc) &&
- (lgc->client != client) )
- lgc = lgc->next;
- if (lgc != NULL)
- local_get_context_free (lgc);
- cprev = NULL;
- cpos = clients;
- while ( (NULL != cpos) &&
- (clients->client != client) )
+ if (dsize < sizeof (struct SBlock))
{
- cprev = cpos;
- cpos = cpos->next;
+ GNUNET_break_op (0);
+ return GNUNET_SYSERR;
}
- if (cpos != NULL)
+ if (dsize !=
+ ntohs (sb->purpose.size) + sizeof (struct GNUNET_CRYPTO_RsaSignature))
{
- if (cprev == NULL)
- clients = cpos->next;
- else
- cprev->next = cpos->next;
- while (NULL != (rl = cpos->head))
- {
- cpos->head = rl->next;
- destroy_pending_request (rl->req);
- GNUNET_free (rl);
- }
- GNUNET_free (cpos);
+ GNUNET_break_op (0);
+ return GNUNET_SYSERR;
}
+ if (GNUNET_OK !=
+ GNUNET_CRYPTO_rsa_verify (GNUNET_SIGNATURE_PURPOSE_FS_SBLOCK,
+ &sb->purpose,
+ &sb->signature,
+ &sb->subspace))
+ {
+ GNUNET_break_op (0);
+ return GNUNET_SYSERR;
+ }
+ if (query != NULL)
+ *query = sb->identifier;
+ if (namespace != NULL)
+ GNUNET_CRYPTO_hash (&sb->subspace,
+ sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded),
+ namespace);
+ return GNUNET_OK;
}
/**
- * Iterator over entries in the "requests_by_query" map
- * that frees all the entries.
- *
- * @param cls closure, NULL
- * @param key current key code (the query, unused)
- * @param value value in the hash map, of type "struct PendingRequest*"
- * @return GNUNET_YES (we should continue to iterate)
- */
-static int
-destroy_pending_request_cb (void *cls,
- const GNUNET_HashCode * key,
- void *value)
-{
- struct PendingRequest *pr = value;
-
- destroy_pending_request (pr);
- return GNUNET_YES;
-}
-
-
-/**
- * Task run during shutdown.
+ * Transmit the given message by copying it to the target buffer
+ * "buf". "buf" will be NULL and "size" zero if the socket was closed
+ * for writing in the meantime. In that case, do nothing
+ * (the disconnect or shutdown handler will take care of the rest).
+ * If we were able to transmit messages and there are still more
+ * pending, ask core again for further calls to this function.
*
- * @param cls unused
- * @param tc unused
+ * @param cls closure, pointer to the 'struct ClientList*'
+ * @param size number of bytes available in buf
+ * @param buf where the callee should write the message
+ * @return number of bytes written to buf
*/
-static void
-shutdown_task (void *cls,
- const struct GNUNET_SCHEDULER_TaskContext *tc)
+static size_t
+transmit_to_client (void *cls,
+ size_t size, void *buf)
{
- if (NULL != core)
- {
- GNUNET_CORE_disconnect (core);
- core = NULL;
- }
- if (NULL != dsh)
+ struct ClientList *cl = cls;
+ char *cbuf = buf;
+ struct ClientResponseMessage *creply;
+ size_t msize;
+
+ cl->th = NULL;
+ if (NULL == buf)
{
- GNUNET_DATASTORE_disconnect (dsh,
- GNUNET_NO);
- dsh = NULL;
+#if DEBUG_FS
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Not sending reply, client communication problem.\n");
+#endif
+ return 0;
}
- GNUNET_CONTAINER_multihashmap_iterate (requests_by_query,
- &destroy_pending_request_cb,
- NULL);
- while (clients != NULL)
- handle_client_disconnect (NULL,
- clients->client);
- GNUNET_CONTAINER_multihashmap_destroy (requests_by_query);
- requests_by_query = NULL;
- GNUNET_CONTAINER_multihashmap_destroy (requests_by_peer);
- requests_by_peer = NULL;
- GNUNET_CONTAINER_heap_destroy (requests_by_expiration);
- requests_by_expiration = NULL;
- // FIXME: iterate over entries and free individually?
- // (or do we get disconnect notifications?)
- GNUNET_CONTAINER_multihashmap_destroy (connected_peers);
- connected_peers = NULL;
+ msize = 0;
+ while ( (NULL != (creply = cl->res_head) ) &&
+ (creply->msize <= size) )
+ {
+ memcpy (&cbuf[msize], &creply[1], creply->msize);
+ msize += creply->msize;
+ size -= creply->msize;
+ GNUNET_CONTAINER_DLL_remove (cl->res_head,
+ cl->res_tail,
+ creply);
+ GNUNET_free (creply);
+ }
+ if (NULL != creply)
+ cl->th = GNUNET_SERVER_notify_transmit_ready (cl->client,
+ creply->msize,
+ GNUNET_TIME_UNIT_FOREVER_REL,
+ &transmit_to_client,
+ cl);
+ return msize;
}
/**
- * Free (each) request made by the peer.
- *
- * @param cls closure, points to peer that the request belongs to
- * @param key current key code
- * @param value value in the hash map
- * @return GNUNET_YES (we should continue to iterate)
+ * Closure for "process_reply" function.
*/
-static int
-destroy_request (void *cls,
- const GNUNET_HashCode * key,
- void *value)
+struct ProcessReplyClosure
{
- const struct GNUNET_PeerIdentity * peer = cls;
- struct PendingRequest *pr = value;
-
- GNUNET_CONTAINER_multihashmap_remove (requests_by_peer,
- &peer->hashPubKey,
- pr);
- destroy_pending_request (pr);
- return GNUNET_YES;
-}
+ /**
+ * The data for the reply.
+ */
+ const void *data;
+ // FIXME: add 'struct ConnectedPeer' to track 'last_xxx_replies' here!
+ /**
+ * When the reply expires.
+ */
+ struct GNUNET_TIME_Absolute expiration;
-/**
- * Method called whenever a given peer connects.
- *
- * @param cls closure, not used
- * @param peer peer identity this notification is about
- * @param latency reported latency of the connection with 'other'
- * @param distance reported distance (DV) to 'other'
- */
-static void
-peer_connect_handler (void *cls,
- const struct
- GNUNET_PeerIdentity * peer,
- struct GNUNET_TIME_Relative latency,
- uint32_t distance)
-{
- struct ConnectedPeer *cp;
+ /**
+ * Size of data.
+ */
+ size_t size;
- cp = GNUNET_malloc (sizeof (struct ConnectedPeer));
- cp->pid = GNUNET_PEER_intern (peer);
- GNUNET_CONTAINER_multihashmap_put (connected_peers,
- &peer->hashPubKey,
- cp,
- GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
-}
+ /**
+ * Namespace that this reply belongs to
+ * (if it is of type SBLOCK).
+ */
+ GNUNET_HashCode namespace;
+
+ /**
+ * Type of the block.
+ */
+ uint32_t type;
+
+ /**
+ * How much was this reply worth to us?
+ */
+ uint32_t priority;
+};
/**
- * Method called whenever a peer disconnects.
+ * We have received a reply; handle it!
*
- * @param cls closure, not used
- * @param peer peer identity this notification is about
+ * @param cls response (struct ProcessReplyClosure)
+ * @param key our query
+ * @param value value in the hash map (info about the query)
+ * @return GNUNET_YES (we should continue to iterate)
*/
-static void
-peer_disconnect_handler (void *cls,
- const struct
- GNUNET_PeerIdentity * peer)
+static int
+process_reply (void *cls,
+ const GNUNET_HashCode * key,
+ void *value)
{
+ struct ProcessReplyClosure *prq = cls;
+ struct PendingRequest *pr = value;
+ struct PendingMessage *reply;
+ struct ClientResponseMessage *creply;
+ struct ClientList *cl;
+ struct PutMessage *pm;
+ struct ContentMessage *cm;
struct ConnectedPeer *cp;
- struct PendingMessage *pm;
+ GNUNET_HashCode chash;
+ GNUNET_HashCode mhash;
+ size_t msize;
+ uint32_t prio;
- cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
- &peer->hashPubKey);
- if (cp != NULL)
+
+ GNUNET_CRYPTO_hash (prq->data,
+ prq->size,
+ &chash);
+ switch (prq->type)
{
- GNUNET_PEER_change_rc (cp->pid, -1);
- GNUNET_PEER_decrement_rcs (cp->last_p2p_replies, P2P_SUCCESS_LIST_SIZE);
- if (NULL != cp->cth)
- GNUNET_CORE_notify_transmit_ready_cancel (cp->cth);
- while (NULL != (pm = cp->pending_messages))
+ case GNUNET_DATASTORE_BLOCKTYPE_DBLOCK:
+ case GNUNET_DATASTORE_BLOCKTYPE_IBLOCK:
+ /* only possible reply, stop requesting! */
+ while (NULL != pr->pending_head)
+ destroy_pending_message_list_entry (pr->pending_head);
+ GNUNET_break (GNUNET_YES ==
+ GNUNET_CONTAINER_multihashmap_remove (query_request_map,
+ key,
+ prq));
+ break;
+ case GNUNET_DATASTORE_BLOCKTYPE_SBLOCK:
+ if (0 != memcmp (pr->namespace,
+ &prq->namespace,
+ sizeof (GNUNET_HashCode)))
+ return GNUNET_YES; /* wrong namespace */
+ /* then: fall-through! */
+ case GNUNET_DATASTORE_BLOCKTYPE_KBLOCK:
+ if (pr->bf != NULL)
{
- cp->pending_messages = pm->next;
- GNUNET_free (pm);
+ mingle_hash (&chash, pr->mingle, &mhash);
+ if (GNUNET_YES == GNUNET_CONTAINER_bloomfilter_test (pr->bf,
+ &mhash))
+ return GNUNET_YES; /* duplicate */
+ GNUNET_CONTAINER_bloomfilter_add (pr->bf,
+ &mhash);
}
- GNUNET_free (cp);
+ if (pr->client_request_list != NULL)
+ {
+ cl = pr->client_request_list->client_list;
+ if (pr->replies_seen_size == pr->replies_seen_off)
+ {
+ GNUNET_array_grow (pr->replies_seen,
+ pr->replies_seen_size,
+ pr->replies_seen_size * 2 + 4);
+ // FIXME: recalculate BF!
+ }
+ pr->replies_seen[pr->replies_seen_off++] = chash;
+ }
+ break;
+ case GNUNET_DATASTORE_BLOCKTYPE_SKBLOCK:
+ // FIXME: any checks against duplicates for SKBlocks?
+ break;
+ default:
+ GNUNET_break (0);
+ return GNUNET_YES;
}
- GNUNET_CONTAINER_multihashmap_get_multiple (requests_by_peer,
- &peer->hashPubKey,
- &destroy_request,
- (void*) peer);
+ prio = pr->priority;
+ prq->priority += pr->remaining_priority;
+ pr->remaining_priority = 0;
+ if (pr->client_request_list != NULL)
+ {
+ msize = sizeof (struct PutMessage) + prq->size;
+ creply = GNUNET_malloc (msize + sizeof (struct ClientResponseMessage));
+ creply->msize = msize;
+ creply->client_list = cl;
+ GNUNET_CONTAINER_DLL_insert_after (cl->res_head,
+ cl->res_tail,
+ cl->res_tail,
+ creply);
+ pm = (struct PutMessage*) &creply[1];
+ pm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
+ pm->header.size = htons (msize);
+ pm->type = htonl (prq->type);
+ pm->expiration = GNUNET_TIME_relative_hton (GNUNET_TIME_absolute_get_remaining (prq->expiration));
+ memcpy (&creply[1], prq->data, prq->size);
+ if (NULL == cl->th)
+ cl->th = GNUNET_SERVER_notify_transmit_ready (cl->client,
+ msize,
+ GNUNET_TIME_UNIT_FOREVER_REL,
+ &transmit_to_client,
+ cl);
+ GNUNET_break (cl->th != NULL);
+ }
+ else
+ {
+ cp = pr->pht_entry->cp;
+ msize = sizeof (struct ContentMessage) + prq->size;
+ reply = GNUNET_malloc (msize + sizeof (struct PendingMessage));
+ reply->cont = &transmit_reply_continuation;
+ reply->cont_cls = pr;
+ reply->msize = msize;
+ reply->priority = (uint32_t) -1; /* send replies first! */
+ cm = (struct ContentMessage*) &reply[1];
+ cm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_CONTENT);
+ cm->header.size = htons (msize);
+ cm->type = htonl (prq->type);
+ cm->expiration = GNUNET_TIME_absolute_hton (prq->expiration);
+ memcpy (&reply[1], prq->data, prq->size);
+ add_to_pending_messages_for_peer (cp, reply, pr);
+ }
+
+
+ // FIXME: implement hot-path routing statistics keeping!
+ return GNUNET_YES;
}
/**
- * We're processing a GET request from another peer and have decided
- * to forward it to other peers.
+ * Handle P2P "PUT" message.
*
- * @param cls our "struct ProcessGetContext *"
- * @param tc unused
+ * @param cls closure, always NULL
+ * @param other the other peer involved (sender or receiver, NULL
+ * for loopback messages where we are both sender and receiver)
+ * @param message the actual message
+ * @param latency reported latency of the connection with 'other'
+ * @param distance reported distance (DV) to 'other'
+ * @return GNUNET_OK to keep the connection open,
+ * GNUNET_SYSERR to close it (signal serious error)
*/
-static void
-forward_get_request (void *cls,
- const struct GNUNET_SCHEDULER_TaskContext *tc)
+static int
+handle_p2p_put (void *cls,
+ const struct GNUNET_PeerIdentity *other,
+ const struct GNUNET_MessageHeader *message,
+ struct GNUNET_TIME_Relative latency,
+ uint32_t distance)
{
- struct ProcessGetContext *pgc = cls;
- struct PendingRequest *pr;
- struct PendingRequest *eer;
- struct GNUNET_PeerIdentity target;
+ const struct PutMessage *put;
+ uint16_t msize;
+ size_t dsize;
+ uint32_t type;
+ struct GNUNET_TIME_Absolute expiration;
+ GNUNET_HashCode query;
+ struct ProcessReplyClosure prq;
- pr = GNUNET_malloc (sizeof (struct PendingRequest));
- if (GET_MESSAGE_BIT_SKS_NAMESPACE == (GET_MESSAGE_BIT_SKS_NAMESPACE & pgc->bm))
- {
- pr->namespace = GNUNET_malloc (sizeof(GNUNET_HashCode));
- *pr->namespace = pgc->namespace;
- }
- pr->bf = pgc->bf;
- pr->bf_size = pgc->bf_size;
- pgc->bf = NULL;
- pr->start_time = pgc->start_time;
- pr->query = pgc->query;
- pr->source_pid = GNUNET_PEER_intern (&pgc->reply_to);
- if (GET_MESSAGE_BIT_TRANSMIT_TO == (GET_MESSAGE_BIT_TRANSMIT_TO & pgc->bm))
- pr->target_pid = GNUNET_PEER_intern (&pgc->prime_target);
- pr->anonymity_level = 1; /* default */
- pr->priority = pgc->priority;
- pr->remaining_priority = pr->priority;
- pr->mingle = pgc->mingle;
- pr->ttl = pgc->ttl;
- pr->type = pgc->type;
- GNUNET_CONTAINER_multihashmap_put (requests_by_query,
- &pr->query,
- pr,
- GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
- GNUNET_CONTAINER_multihashmap_put (requests_by_peer,
- &pgc->reply_to.hashPubKey,
- pr,
- GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
- pr->hnode = GNUNET_CONTAINER_heap_insert (requests_by_expiration,
- pr,
- pr->start_time.value + pr->ttl);
- if (GNUNET_CONTAINER_heap_get_size (requests_by_expiration) > max_pending_requests)
+ msize = ntohs (message->size);
+ if (msize < sizeof (struct PutMessage))
{
- /* expire oldest request! */
- eer = GNUNET_CONTAINER_heap_peek (requests_by_expiration);
- GNUNET_PEER_resolve (eer->source_pid,
- &target);
- GNUNET_CONTAINER_multihashmap_remove (requests_by_peer,
- &target.hashPubKey,
- eer);
- destroy_pending_request (eer);
+ GNUNET_break_op(0);
+ return GNUNET_SYSERR;
}
- pr->task = GNUNET_SCHEDULER_add_delayed (sched,
- get_processing_delay (),
- &forward_request_task,
- pr);
- GNUNET_free (pgc);
-}
-/**
- * Transmit the given message by copying it to
- * the target buffer "buf". "buf" will be
- * NULL and "size" zero if the socket was closed for
- * writing in the meantime. In that case, only
+ put = (const struct PutMessage*) message;
+ dsize = msize - sizeof (struct PutMessage);
+ type = ntohl (put->type);
+ expiration = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_relative_ntoh (put->expiration));
- * free the message
- *
- * @param cls closure, pointer to the message
- * @param size number of bytes available in buf
- * @param buf where the callee should write the message
- * @return number of bytes written to buf
- */
-static size_t
-transmit_message (void *cls,
- size_t size, void *buf)
-{
- struct GNUNET_MessageHeader *msg = cls;
- uint16_t msize;
-
- if (NULL == buf)
+ /* first, validate! */
+ switch (type)
{
-#if DEBUG_FS
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Dropping reply, core too busy.\n");
-#endif
- GNUNET_free (msg);
- return 0;
+ case GNUNET_DATASTORE_BLOCKTYPE_DBLOCK:
+ case GNUNET_DATASTORE_BLOCKTYPE_IBLOCK:
+ GNUNET_CRYPTO_hash (&put[1], dsize, &query);
+ break;
+ case GNUNET_DATASTORE_BLOCKTYPE_KBLOCK:
+ if (GNUNET_OK !=
+ check_kblock ((const struct KBlock*) &put[1],
+ dsize,
+ &query))
+ return GNUNET_SYSERR;
+ break;
+ case GNUNET_DATASTORE_BLOCKTYPE_SBLOCK:
+ if (GNUNET_OK !=
+ check_sblock ((const struct SBlock*) &put[1],
+ dsize,
+ &query,
+ &prq.namespace))
+ return GNUNET_SYSERR;
+ break;
+ case GNUNET_DATASTORE_BLOCKTYPE_SKBLOCK:
+ // FIXME -- validate SKBLOCK!
+ GNUNET_break (0);
+ return GNUNET_OK;
+ default:
+ /* unknown block type */
+ GNUNET_break_op (0);
+ return GNUNET_SYSERR;
}
- msize = ntohs (msg->size);
- GNUNET_assert (size >= msize);
- memcpy (buf, msg, msize);
- GNUNET_free (msg);
- return msize;
+
+ /* now, lookup 'query' */
+ prq.data = (const void*) &put[1];
+ prq.size = dsize;
+ prq.type = type;
+ prq.expiration = expiration;
+ prq.priority = 0;
+ GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map,
+ &query,
+ &process_reply,
+ &prq);
+ // FIXME: if migration is on and load is low,
+ // queue to store data in datastore;
+ // use "prq.priority" for that!
+ return GNUNET_OK;
}
-/**
- * Test if the load on this peer is too high
- * to even consider processing the query at
- * all.
- *
- * @return GNUNET_YES if the load is too high, GNUNET_NO otherwise
- */
-static int
-test_load_too_high ()
-{
- return GNUNET_NO; // FIXME
-}
+/* **************************** P2P GET Handling ************************ */
/**
* maybe 0 if no unique identifier is available
*/
static void
-process_p2p_get_result (void *cls,
- const GNUNET_HashCode * key,
- uint32_t size,
- const void *data,
- uint32_t type,
- uint32_t priority,
- uint32_t anonymity,
- struct GNUNET_TIME_Absolute
- expiration,
- uint64_t uid)
+process_local_reply (void *cls,
+ const GNUNET_HashCode * key,
+ uint32_t size,
+ const void *data,
+ uint32_t type,
+ uint32_t priority,
+ uint32_t anonymity,
+ struct GNUNET_TIME_Absolute
+ expiration,
+ uint64_t uid)
{
- struct ProcessGetContext *pgc = cls;
+ struct PendingRequest *pr = cls;
+ struct ProcessReplyClosure prq;
GNUNET_HashCode dhash;
GNUNET_HashCode mhash;
- struct PutMessage *reply;
+ GNUNET_HashCode query;
+ pr->drq = NULL;
if (NULL == key)
{
/* no more results */
- if ( ( (pgc->policy & ROUTING_POLICY_FORWARD) == ROUTING_POLICY_FORWARD) &&
- ( (0 == pgc->results_found) ||
- (pgc->type == GNUNET_DATASTORE_BLOCKTYPE_KBLOCK) ||
- (pgc->type == GNUNET_DATASTORE_BLOCKTYPE_SBLOCK) ||
- (pgc->type == GNUNET_DATASTORE_BLOCKTYPE_SKBLOCK) ) )
- {
- GNUNET_SCHEDULER_add_continuation (sched,
- &forward_get_request,
- pgc,
- GNUNET_SCHEDULER_REASON_PREREQ_DONE);
- }
- else
- {
- if (pgc->bf != NULL)
- GNUNET_CONTAINER_bloomfilter_free (pgc->bf);
- GNUNET_free (pgc);
- }
- next_ds_request ();
+ if (pr->task == GNUNET_SCHEDULER_NO_TASK)
+ pr->task = GNUNET_SCHEDULER_add_now (sched,
+ &forward_request_task,
+ pr);
return;
}
if (type == GNUNET_DATASTORE_BLOCKTYPE_ONDEMAND)
{
- GNUNET_FS_handle_on_demand_block (key, size, data, type, priority,
- anonymity, expiration, uid, dsh,
- &process_p2p_get_result,
- pgc);
+ if (GNUNET_OK !=
+ GNUNET_FS_handle_on_demand_block (key, size, data, type, priority,
+ anonymity, expiration, uid,
+ &process_local_reply,
+ pr))
+ GNUNET_FS_drq_get_next (GNUNET_YES);
return;
}
/* check for duplicates */
GNUNET_CRYPTO_hash (data, size, &dhash);
mingle_hash (&dhash,
- pgc->mingle,
+ pr->mingle,
&mhash);
- if ( (pgc->bf != NULL) &&
+ if ( (pr->bf != NULL) &&
(GNUNET_YES ==
- GNUNET_CONTAINER_bloomfilter_test (pgc->bf,
+ GNUNET_CONTAINER_bloomfilter_test (pr->bf,
&mhash)) )
{
#if DEBUG_FS
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Result from datastore filtered by bloomfilter.\n");
#endif
- GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
+ GNUNET_FS_drq_get_next (GNUNET_YES);
return;
}
- pgc->results_found++;
- if ( (pgc->type == GNUNET_DATASTORE_BLOCKTYPE_KBLOCK) ||
- (pgc->type == GNUNET_DATASTORE_BLOCKTYPE_SBLOCK) ||
- (pgc->type == GNUNET_DATASTORE_BLOCKTYPE_SKBLOCK) )
+ pr->results_found++;
+ if ( (pr->type == GNUNET_DATASTORE_BLOCKTYPE_KBLOCK) ||
+ (pr->type == GNUNET_DATASTORE_BLOCKTYPE_SBLOCK) ||
+ (pr->type == GNUNET_DATASTORE_BLOCKTYPE_SKBLOCK) )
{
- if (pgc->bf == NULL)
+ if (pr->bf == NULL)
{
- pgc->bf_size = 32;
- pgc->bf = GNUNET_CONTAINER_bloomfilter_init (NULL,
- pgc->bf_size,
- BLOOMFILTER_K);
+ pr->bf_size = 32;
+ pr->bf = GNUNET_CONTAINER_bloomfilter_init (NULL,
+ pr->bf_size,
+ BLOOMFILTER_K);
}
- GNUNET_CONTAINER_bloomfilter_add (pgc->bf,
+ GNUNET_CONTAINER_bloomfilter_add (pr->bf,
&mhash);
}
-
- reply = GNUNET_malloc (sizeof (struct PutMessage) + size);
- reply->header.size = htons (sizeof (struct PutMessage) + size);
- reply->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
- reply->type = htonl (type);
- reply->expiration = GNUNET_TIME_relative_hton (GNUNET_TIME_absolute_get_remaining (expiration));
- memcpy (&reply[1], data, size);
- GNUNET_CORE_notify_transmit_ready (core,
- pgc->priority,
- ACCEPTABLE_REPLY_DELAY,
- &pgc->reply_to,
- sizeof (struct PutMessage) + size,
- &transmit_message,
- reply);
- if ( (GNUNET_YES == test_load_too_high()) ||
- (pgc->results_found > 5 + 2 * pgc->priority) )
+ memset (&prq, 0, sizeof (prq));
+ prq.data = data;
+ prq.expiration = expiration;
+ prq.size = size;
+ if ( (type == GNUNET_DATASTORE_BLOCKTYPE_SBLOCK) &&
+ (GNUNET_OK != check_sblock ((const struct SBlock*) data,
+ size,
+ &query,
+ &prq.namespace)) )
{
- GNUNET_DATASTORE_get_next (dsh, GNUNET_NO);
- pgc->policy &= ~ ROUTING_POLICY_FORWARD;
+ GNUNET_break (0);
+ /* FIXME: consider removing the block? */
+ GNUNET_FS_drq_get_next (GNUNET_YES);
return;
}
- GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
-}
+ prq.type = type;
+ prq.priority = priority;
+ process_reply (&prq, key, pr);
-
-/**
- * We're processing a GET request from another peer. Give it to our
- * local datastore.
- *
- * @param cls our "struct ProcessGetContext"
- * @param ok did we get a datastore slice or not?
- */
-static void
-ds_get_request (void *cls,
- int ok)
-{
- struct ProcessGetContext *pgc = cls;
- uint32_t type;
- struct GNUNET_TIME_Relative timeout;
-
- if (GNUNET_OK != ok)
+ if ( (GNUNET_YES == test_load_too_high()) ||
+ (pr->results_found > 5 + 2 * pr->priority) )
{
- /* no point in doing P2P stuff if we can't even do local */
- GNUNET_free (dsh);
+ GNUNET_FS_drq_get_next (GNUNET_NO);
return;
}
- type = pgc->type;
- if (type == GNUNET_DATASTORE_BLOCKTYPE_DBLOCK)
- type = GNUNET_DATASTORE_BLOCKTYPE_ANY; /* to get on-demand as well */
- timeout = GNUNET_TIME_relative_multiply (BASIC_DATASTORE_REQUEST_DELAY,
- (pgc->priority + 1));
- GNUNET_DATASTORE_get (dsh,
- &pgc->query,
- type,
- &process_p2p_get_result,
- pgc,
- timeout);
+ GNUNET_FS_drq_get_next (GNUNET_YES);
}
/**
- * We've received a request with the specified
- * priority. Bound it according to how much
- * we trust the given peer.
+ * We've received a request with the specified priority. Bound it
+ * according to how much we trust the given peer.
*
* @param prio_in requested priority
- * @param peer the peer making the request
+ * @param cp the peer making the request
* @return effective priority
*/
static uint32_t
bound_priority (uint32_t prio_in,
- const struct GNUNET_PeerIdentity *peer)
+ struct ConnectedPeer *cp)
{
return 0; // FIXME!
}
handle_p2p_get (void *cls,
const struct GNUNET_PeerIdentity *other,
const struct GNUNET_MessageHeader *message,
- struct GNUNET_TIME_Relative latency,
- uint32_t distance)
+ struct GNUNET_TIME_Relative latency,
+ uint32_t distance)
{
+ struct PendingRequest *pr;
+ struct PeerRequestEntry *pre;
+ struct ConnectedPeer *cp;
+ struct ConnectedPeer *cps;
+ struct GNUNET_TIME_Relative timeout;
uint16_t msize;
const struct GetMessage *gm;
unsigned int bits;
const GNUNET_HashCode *opt;
- struct ProcessGetContext *pgc;
uint32_t bm;
size_t bfsize;
uint32_t ttl_decrement;
+ uint32_t type;
double preference;
- int net_load_up;
- int net_load_down;
msize = ntohs(message->size);
if (msize < sizeof (struct GetMessage))
if (1 == (bm & 1))
bits++;
bm >>= 1;
- }
- if (msize < sizeof (struct GetMessage) + bits * sizeof (GNUNET_HashCode))
- {
- GNUNET_break_op (0);
- return GNUNET_SYSERR;
- }
- opt = (const GNUNET_HashCode*) &gm[1];
- bfsize = msize - sizeof (struct GetMessage) + bits * sizeof (GNUNET_HashCode);
- pgc = GNUNET_malloc (sizeof (struct ProcessGetContext));
- if (bfsize > 0)
- {
- pgc->bf = GNUNET_CONTAINER_bloomfilter_init ((const char*) &pgc[1],
- bfsize,
- BLOOMFILTER_K);
- pgc->bf_size = bfsize;
- }
- pgc->type = ntohl (gm->type);
- pgc->bm = ntohl (gm->hash_bitmap);
- pgc->mingle = gm->filter_mutator;
- bits = 0;
- if (0 != (pgc->bm & GET_MESSAGE_BIT_RETURN_TO))
- pgc->reply_to.hashPubKey = opt[bits++];
- else
- pgc->reply_to = *other;
- if (0 != (pgc->bm & GET_MESSAGE_BIT_SKS_NAMESPACE))
- pgc->namespace = opt[bits++];
- else if (pgc->type == GNUNET_DATASTORE_BLOCKTYPE_SBLOCK)
- {
- GNUNET_break_op (0);
- GNUNET_free (pgc);
- return GNUNET_SYSERR;
- }
- if (0 != (pgc->bm & GET_MESSAGE_BIT_TRANSMIT_TO))
- pgc->prime_target.hashPubKey = opt[bits++];
- /* note that we can really only check load here since otherwise
- peers could find out that we are overloaded by being disconnected
- after sending us a malformed query... */
- if (GNUNET_YES == test_load_too_high ())
- {
- if (NULL != pgc->bf)
- GNUNET_CONTAINER_bloomfilter_free (pgc->bf);
- GNUNET_free (pgc);
-#if DEBUG_FS
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Dropping query from `%s', this peer is too busy.\n",
- GNUNET_i2s (other));
-#endif
- return GNUNET_OK;
- }
- net_load_up = 50; // FIXME
- net_load_down = 50; // FIXME
- pgc->policy = ROUTING_POLICY_NONE;
- if ( (net_load_up < IDLE_LOAD_THRESHOLD) &&
- (net_load_down < IDLE_LOAD_THRESHOLD) )
- {
- pgc->policy |= ROUTING_POLICY_ALL;
- pgc->priority = 0; /* no charge */
- }
- else
- {
- pgc->priority = bound_priority (ntohl (gm->priority), other);
- if ( (net_load_up <
- IDLE_LOAD_THRESHOLD + pgc->priority * pgc->priority) &&
- (net_load_down <
- IDLE_LOAD_THRESHOLD + pgc->priority * pgc->priority) )
- {
- pgc->policy |= ROUTING_POLICY_ALL;
- }
- else
- {
- // FIXME: is this sound?
- if (net_load_up < 90 + 10 * pgc->priority)
- pgc->policy |= ROUTING_POLICY_FORWARD;
- if (net_load_down < 90 + 10 * pgc->priority)
- pgc->policy |= ROUTING_POLICY_ANSWER;
- }
- }
- if (pgc->policy == ROUTING_POLICY_NONE)
- {
-#if DEBUG_FS
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Dropping query from `%s', network saturated.\n",
- GNUNET_i2s (other));
-#endif
- if (NULL != pgc->bf)
- GNUNET_CONTAINER_bloomfilter_free (pgc->bf);
- GNUNET_free (pgc);
- return GNUNET_OK; /* drop */
- }
- if ((pgc->policy & ROUTING_POLICY_INDIRECT) != ROUTING_POLICY_INDIRECT)
- pgc->priority = 0; /* kill the priority (we cannot benefit) */
- pgc->ttl = bound_ttl (ntohl (gm->ttl), pgc->priority);
- /* decrement ttl (always) */
- ttl_decrement = 2 * TTL_DECREMENT +
- GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
- TTL_DECREMENT);
- if ( (pgc->ttl < 0) &&
- (pgc->ttl - ttl_decrement > 0) )
- {
-#if DEBUG_FS
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Dropping query from `%s' due to TTL underflow.\n",
- GNUNET_i2s (other));
-#endif
- /* integer underflow => drop (should be very rare)! */
- if (NULL != pgc->bf)
- GNUNET_CONTAINER_bloomfilter_free (pgc->bf);
- GNUNET_free (pgc);
- return GNUNET_OK;
- }
- pgc->ttl -= ttl_decrement;
- pgc->start_time = GNUNET_TIME_absolute_get ();
- preference = (double) pgc->priority;
- if (preference < QUERY_BANDWIDTH_VALUE)
- preference = QUERY_BANDWIDTH_VALUE;
- // FIXME: also reserve bandwidth for reply?
- (void) GNUNET_CORE_peer_change_preference (sched, cfg,
- other,
- GNUNET_TIME_UNIT_FOREVER_REL,
- 0, 0, preference, NULL, NULL);
- if (0 != (pgc->policy & ROUTING_POLICY_ANSWER))
- pgc->drq = queue_ds_request (BASIC_DATASTORE_REQUEST_DELAY,
- &ds_get_request,
- pgc);
- else
- GNUNET_SCHEDULER_add_continuation (sched,
- &forward_get_request,
- pgc,
- GNUNET_SCHEDULER_REASON_PREREQ_DONE);
- return GNUNET_OK;
-}
-
-
-/**
- * Function called to notify us that we can now transmit a reply to a
- * client or peer. "buf" will be NULL and "size" zero if the socket was
- * closed for writing in the meantime.
- *
- * @param cls closure, points to a "struct PendingRequest*" with
- * one or more pending replies
- * @param size number of bytes available in buf
- * @param buf where the callee should write the message
- * @return number of bytes written to buf
- */
-static size_t
-transmit_result (void *cls,
- size_t size,
- void *buf)
-{
- struct PendingRequest *pr = cls;
- char *cbuf = buf;
- struct PendingMessage *reply;
- size_t ret;
-
- ret = 0;
- while (NULL != (reply = pr->replies_pending))
- {
- if ( (reply->msize + ret < ret) ||
- (reply->msize + ret > size) )
- break;
- pr->replies_pending = reply->next;
- memcpy (&cbuf[ret], &reply[1], reply->msize);
- ret += reply->msize;
- GNUNET_free (reply);
- }
- return ret;
-}
-
-
-/**
- * We have received a reply; handle it!
- *
- * @param cls response (struct ProcessReplyClosure)
- * @param key our query
- * @param value value in the hash map (meta-info about the query)
- * @return GNUNET_YES (we should continue to iterate)
- */
-static int
-process_reply (void *cls,
- const GNUNET_HashCode * key,
- void *value)
-{
- struct ProcessReplyClosure *prq = cls;
- struct PendingRequest *pr = value;
- struct PendingRequest *eer;
- struct PendingMessage *reply;
- struct PutMessage *pm;
- struct ContentMessage *cm;
- struct ConnectedPeer *cp;
- GNUNET_HashCode chash;
- GNUNET_HashCode mhash;
- struct GNUNET_PeerIdentity target;
- size_t msize;
- uint32_t prio;
- struct GNUNET_TIME_Relative max_delay;
-
- GNUNET_CRYPTO_hash (prq->data,
- prq->size,
- &chash);
- switch (prq->type)
- {
- case GNUNET_DATASTORE_BLOCKTYPE_DBLOCK:
- case GNUNET_DATASTORE_BLOCKTYPE_IBLOCK:
- break;
- case GNUNET_DATASTORE_BLOCKTYPE_SBLOCK:
- /* FIXME: does prq->namespace match our expectations? */
- /* then: fall-through??? */
- case GNUNET_DATASTORE_BLOCKTYPE_KBLOCK:
- if (pr->bf != NULL)
- {
- mingle_hash (&chash, pr->mingle, &mhash);
- if (GNUNET_YES == GNUNET_CONTAINER_bloomfilter_test (pr->bf,
- &mhash))
- return GNUNET_YES; /* duplicate */
- GNUNET_CONTAINER_bloomfilter_add (pr->bf,
- &mhash);
- }
- break;
- case GNUNET_DATASTORE_BLOCKTYPE_SKBLOCK:
- // FIXME: any checks against duplicates for SKBlocks?
- break;
- }
- prio = pr->priority;
- prq->priority += pr->remaining_priority;
- pr->remaining_priority = 0;
- if (pr->client != NULL)
- {
- if (pr->replies_seen_size == pr->replies_seen_off)
- GNUNET_array_grow (pr->replies_seen,
- pr->replies_seen_size,
- pr->replies_seen_size * 2 + 4);
- pr->replies_seen[pr->replies_seen_off++] = chash;
- // FIXME: possibly recalculate BF!
- }
- if (pr->client == NULL)
- {
- GNUNET_PEER_resolve (pr->source_pid,
- &target);
- cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
- &target.hashPubKey);
- msize = sizeof (struct ContentMessage) + prq->size;
- reply = GNUNET_malloc (msize + sizeof (struct PendingMessage));
- reply->msize = msize;
- reply->priority = (uint32_t) -1; /* send replies first! */
- cm = (struct ContentMessage*) &reply[1];
- cm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_CONTENT);
- cm->header.size = htons (msize);
- cm->type = htonl (prq->type);
- cm->expiration = GNUNET_TIME_absolute_hton (prq->expiration);
- memcpy (&reply[1], prq->data, prq->size);
- max_delay = GNUNET_TIME_UNIT_FOREVER_REL;
- if (GNUNET_CONTAINER_heap_get_size (requests_by_expiration) >= max_pending_requests)
- {
- /* estimate expiration time from time difference between
- first request that will be discarded and this request */
- eer = GNUNET_CONTAINER_heap_peek (requests_by_expiration);
- max_delay = GNUNET_TIME_absolute_get_difference (pr->start_time,
- eer->start_time);
- }
-
- if (cp == NULL)
- {
- /* FIXME: bound queue size! */
- reply->next = pr->replies_pending;
- pr->replies_pending = reply;
- if (pr->cth == NULL)
- {
- /* implicitly tries to connect */
- pr->cth = GNUNET_CORE_notify_transmit_ready (core,
- prio,
- max_delay,
- &target,
- msize,
- &transmit_result,
- pr);
- }
- }
- else
- {
- /* insert replies always at the head */
- reply->next = cp->pending_messages;
- cp->pending_messages = reply;
- if (cp->cth == NULL)
- cp->cth = GNUNET_CORE_notify_transmit_ready (core,
- reply->priority,
- GNUNET_TIME_UNIT_FOREVER_REL,
- &target,
- msize,
- &transmit_request_cb,
- cp);
- }
- }
- else
- {
- msize = sizeof (struct PutMessage) + prq->size;
- reply = GNUNET_malloc (msize + sizeof (struct PendingMessage));
- reply->msize = msize;
- reply->next = pr->replies_pending;
- pm = (struct PutMessage*) &reply[1];
- pm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
- pm->header.size = htons (msize);
- pm->type = htonl (prq->type);
- pm->expiration = GNUNET_TIME_relative_hton (GNUNET_TIME_absolute_get_remaining (prq->expiration));
- pr->replies_pending = reply;
- memcpy (&reply[1], prq->data, prq->size);
- if (pr->th != NULL)
- return GNUNET_YES;
- pr->th = GNUNET_SERVER_notify_transmit_ready (pr->client,
- msize,
- GNUNET_TIME_UNIT_FOREVER_REL,
- &transmit_result,
- pr);
- if (pr->th == NULL)
- {
- // FIXME: need to try again later (not much
- // to do here specifically, but we need to
- // check somewhere else to handle this case!)
- }
- }
- // FIXME: implement hot-path routing statistics keeping!
- return GNUNET_YES;
-}
-
-
-/**
- * Check if the given KBlock is well-formed.
- *
- * @param kb the kblock data (or at least "dsize" bytes claiming to be one)
- * @param dsize size of "kb" in bytes; check for < sizeof(struct KBlock)!
- * @param query where to store the query that this block answers
- * @return GNUNET_OK if this is actually a well-formed KBlock
- */
-static int
-check_kblock (const struct KBlock *kb,
- size_t dsize,
- GNUNET_HashCode *query)
-{
- if (dsize < sizeof (struct KBlock))
+ }
+ if (msize < sizeof (struct GetMessage) + bits * sizeof (GNUNET_HashCode))
{
GNUNET_break_op (0);
return GNUNET_SYSERR;
- }
- if (dsize - sizeof (struct KBlock) !=
- ntohs (kb->purpose.size)
- - sizeof (struct GNUNET_CRYPTO_RsaSignaturePurpose)
- - sizeof(struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded) )
+ }
+ opt = (const GNUNET_HashCode*) &gm[1];
+ bfsize = msize - sizeof (struct GetMessage) + bits * sizeof (GNUNET_HashCode);
+
+ bm = ntohl (gm->hash_bitmap);
+ if ( (0 != (bm & GET_MESSAGE_BIT_SKS_NAMESPACE)) &&
+ (ntohl (gm->type) == GNUNET_DATASTORE_BLOCKTYPE_SBLOCK) )
{
GNUNET_break_op (0);
- return GNUNET_SYSERR;
+ return GNUNET_SYSERR;
}
- if (GNUNET_OK !=
- GNUNET_CRYPTO_rsa_verify (GNUNET_SIGNATURE_PURPOSE_FS_KBLOCK,
- &kb->purpose,
- &kb->signature,
- &kb->keyspace))
+ bits = 0;
+ cps = GNUNET_CONTAINER_multihashmap_get (connected_peers,
+ &other->hashPubKey);
+ GNUNET_assert (NULL != cps);
+ if (0 != (bm & GET_MESSAGE_BIT_RETURN_TO))
+ cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
+ &opt[bits++]);
+ else
+ cp = cps;
+ if (cp == NULL)
+ {
+ /* FIXME: try connect? */
+ return GNUNET_OK;
+ }
+ /* note that we can really only check load here since otherwise
+ peers could find out that we are overloaded by not being
+ disconnected after sending us a malformed query... */
+ if (GNUNET_YES == test_load_too_high ())
+ {
+#if DEBUG_FS
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Dropping query from `%s', this peer is too busy.\n",
+ GNUNET_i2s (other));
+#endif
+ return GNUNET_OK;
+ }
+
+ pr = GNUNET_malloc (sizeof (struct PendingRequest) +
+ (bm & GET_MESSAGE_BIT_SKS_NAMESPACE)?sizeof(GNUNET_HashCode):0);
+ if ((bm & GET_MESSAGE_BIT_SKS_NAMESPACE))
+ pr->namespace = (GNUNET_HashCode*) &pr[1];
+ pr->type = ntohl (gm->type);
+ pr->mingle = gm->filter_mutator;
+ if (0 != (bm & GET_MESSAGE_BIT_SKS_NAMESPACE))
+ memcpy (&pr[1], &opt[bits++], sizeof (GNUNET_HashCode));
+ else if (pr->type == GNUNET_DATASTORE_BLOCKTYPE_SBLOCK)
{
GNUNET_break_op (0);
+ GNUNET_free (pr);
return GNUNET_SYSERR;
}
- if (query != NULL)
- GNUNET_CRYPTO_hash (&kb->keyspace,
- sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded),
- query);
- return GNUNET_OK;
-}
+ if (0 != (bm & GET_MESSAGE_BIT_TRANSMIT_TO))
+ pr->target_pid = GNUNET_PEER_intern ((const struct GNUNET_PeerIdentity*) &opt[bits++]);
+ pr->anonymity_level = 1;
+ pr->priority = bound_priority (ntohl (gm->priority), cps);
+ pr->ttl = bound_ttl (ntohl (gm->ttl), pr->priority);
+ pr->query = gm->query;
+ /* decrement ttl (always) */
+ ttl_decrement = 2 * TTL_DECREMENT +
+ GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
+ TTL_DECREMENT);
+ if ( (pr->ttl < 0) &&
+ (pr->ttl - ttl_decrement > 0) )
+ {
+#if DEBUG_FS
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Dropping query from `%s' due to TTL underflow.\n",
+ GNUNET_i2s (other));
+#endif
+ /* integer underflow => drop (should be very rare)! */
+ GNUNET_free (pr);
+ return GNUNET_OK;
+ }
+ pr->ttl -= ttl_decrement;
+ pr->start_time = GNUNET_TIME_absolute_get ();
-/**
- * Check if the given SBlock is well-formed.
- *
- * @param sb the sblock data (or at least "dsize" bytes claiming to be one)
- * @param dsize size of "kb" in bytes; check for < sizeof(struct SBlock)!
- * @param query where to store the query that this block answers
- * @param namespace where to store the namespace that this block belongs to
- * @return GNUNET_OK if this is actually a well-formed SBlock
- */
-static int
-check_sblock (const struct SBlock *sb,
- size_t dsize,
- GNUNET_HashCode *query,
- GNUNET_HashCode *namespace)
-{
- if (dsize < sizeof (struct SBlock))
+ /* get bloom filter */
+ if (bfsize > 0)
{
- GNUNET_break_op (0);
- return GNUNET_SYSERR;
+ pr->bf = GNUNET_CONTAINER_bloomfilter_init ((const char*) &opt[bits],
+ bfsize,
+ BLOOMFILTER_K);
+ pr->bf_size = bfsize;
}
- if (dsize !=
- ntohs (sb->purpose.size) + sizeof (struct GNUNET_CRYPTO_RsaSignature))
+
+ /* FIXME: check somewhere if request already exists, and if so,
+ recycle old state... */
+ pre = GNUNET_malloc (sizeof (struct PeerRequestEntry));
+ pre->cp = cp;
+ pre->req = pr;
+ GNUNET_CONTAINER_multihashmap_put (query_request_map,
+ &gm->query,
+ pre,
+ GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
+
+ pr->hnode = GNUNET_CONTAINER_heap_insert (requests_by_expiration_heap,
+ pr,
+ GNUNET_TIME_absolute_get().value + pr->ttl);
+
+
+ /* calculate change in traffic preference */
+ preference = (double) pr->priority;
+ if (preference < QUERY_BANDWIDTH_VALUE)
+ preference = QUERY_BANDWIDTH_VALUE;
+ cps->inc_preference += preference;
+
+ /* process locally */
+ type = pr->type;
+ if (type == GNUNET_DATASTORE_BLOCKTYPE_DBLOCK)
+ type = GNUNET_DATASTORE_BLOCKTYPE_ANY; /* to get on-demand as well */
+ timeout = GNUNET_TIME_relative_multiply (BASIC_DATASTORE_REQUEST_DELAY,
+ (pr->priority + 1));
+ pr->drq = GNUNET_FS_drq_get (&gm->query,
+ pr->type,
+ &process_local_reply,
+ pr,
+ timeout);
+
+ /* Are multiple results possible? If so, start processing remotely now! */
+ switch (pr->type)
{
- GNUNET_break_op (0);
- return GNUNET_SYSERR;
+ case GNUNET_DATASTORE_BLOCKTYPE_DBLOCK:
+ case GNUNET_DATASTORE_BLOCKTYPE_IBLOCK:
+ /* only one result, wait for datastore */
+ break;
+ default:
+ pr->task = GNUNET_SCHEDULER_add_now (sched,
+ &forward_request_task,
+ pr);
}
- if (GNUNET_OK !=
- GNUNET_CRYPTO_rsa_verify (GNUNET_SIGNATURE_PURPOSE_FS_SBLOCK,
- &sb->purpose,
- &sb->signature,
- &sb->subspace))
+
+ /* make sure we don't track too many requests */
+ if (GNUNET_CONTAINER_heap_get_size (requests_by_expiration_heap) > max_pending_requests)
{
- GNUNET_break_op (0);
- return GNUNET_SYSERR;
+ pr = GNUNET_CONTAINER_heap_peek (requests_by_expiration_heap);
+ destroy_pending_request (pr);
}
- if (query != NULL)
- *query = sb->identifier;
- if (namespace != NULL)
- GNUNET_CRYPTO_hash (&sb->subspace,
- sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded),
- namespace);
return GNUNET_OK;
}
+/* **************************** CS GET Handling ************************ */
+
+
/**
- * Handle P2P "PUT" request.
+ * Handle START_SEARCH-message (search request from client).
*
- * @param cls closure, always NULL
- * @param other the other peer involved (sender or receiver, NULL
- * for loopback messages where we are both sender and receiver)
+ * @param cls closure
+ * @param client identification of the client
* @param message the actual message
- * @param latency reported latency of the connection with 'other'
- * @param distance reported distance (DV) to 'other'
- * @return GNUNET_OK to keep the connection open,
- * GNUNET_SYSERR to close it (signal serious error)
*/
-static int
-handle_p2p_put (void *cls,
- const struct GNUNET_PeerIdentity *other,
- const struct GNUNET_MessageHeader *message,
- struct GNUNET_TIME_Relative latency,
- uint32_t distance)
+static void
+handle_start_search (void *cls,
+ struct GNUNET_SERVER_Client *client,
+ const struct GNUNET_MessageHeader *message)
{
- const struct PutMessage *put;
+ static GNUNET_HashCode all_zeros;
+ const struct SearchMessage *sm;
+ struct ClientList *cl;
+ struct ClientRequestList *crl;
+ struct PendingRequest *pr;
uint16_t msize;
- size_t dsize;
+ unsigned int sc;
uint32_t type;
- struct GNUNET_TIME_Absolute expiration;
- GNUNET_HashCode query;
- struct ProcessReplyClosure prq;
-
+
msize = ntohs (message->size);
- if (msize < sizeof (struct PutMessage))
+ if ( (msize < sizeof (struct SearchMessage)) ||
+ (0 != (msize - sizeof (struct SearchMessage)) % sizeof (GNUNET_HashCode)) )
{
- GNUNET_break_op(0);
- return GNUNET_SYSERR;
+ GNUNET_break (0);
+ GNUNET_SERVER_receive_done (client,
+ GNUNET_SYSERR);
+ return;
}
- put = (const struct PutMessage*) message;
- dsize = msize - sizeof (struct PutMessage);
- type = ntohl (put->type);
- expiration = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_relative_ntoh (put->expiration));
+ sc = (msize - sizeof (struct SearchMessage)) / sizeof (GNUNET_HashCode);
+ sm = (const struct SearchMessage*) message;
- /* first, validate! */
+ cl = client_list;
+ while ( (cl != NULL) &&
+ (cl->client != client) )
+ cl = cl->next;
+ if (cl == NULL)
+ {
+ cl = GNUNET_malloc (sizeof (struct ClientList));
+ cl->client = client;
+ GNUNET_SERVER_client_keep (client);
+ cl->next = client_list;
+ client_list = cl;
+ }
+ type = ntohl (sm->type);
+
+ /* FIXME: detect duplicate request; if duplicate, simply update (merge)
+ 'pr->replies_seen'! */
+ pr = GNUNET_malloc (sizeof (struct PendingRequest) +
+ (type == GNUNET_DATASTORE_BLOCKTYPE_SBLOCK)?sizeof(GNUNET_HashCode):0);
+ crl = GNUNET_malloc (sizeof (struct ClientRequestList));
+ crl->client_list = cl;
+ GNUNET_CONTAINER_DLL_insert (cl->rl_head,
+ cl->rl_tail,
+ crl);
+ crl->req = pr;
+ pr->type = type;
+ pr->client_request_list = crl;
+ GNUNET_array_grow (pr->replies_seen,
+ pr->replies_seen_size,
+ sc);
+ memcpy (pr->replies_seen,
+ &sm[1],
+ sc * sizeof (GNUNET_HashCode));
+ pr->replies_seen_off = sc;
+ pr->anonymity_level = ntohl (sm->anonymity_level);
+ pr->mingle = GNUNET_CRYPTO_random_u32(GNUNET_CRYPTO_QUALITY_WEAK,
+ (uint32_t) -1);
+ pr->query = sm->query;
switch (type)
{
case GNUNET_DATASTORE_BLOCKTYPE_DBLOCK:
case GNUNET_DATASTORE_BLOCKTYPE_IBLOCK:
- GNUNET_CRYPTO_hash (&put[1], dsize, &query);
- break;
- case GNUNET_DATASTORE_BLOCKTYPE_KBLOCK:
- if (GNUNET_OK !=
- check_kblock ((const struct KBlock*) &put[1],
- dsize,
- &query))
- return GNUNET_SYSERR;
+ if (0 != memcmp (&sm->target,
+ &all_zeros,
+ sizeof (GNUNET_HashCode)))
+ pr->target_pid = GNUNET_PEER_intern ((const struct GNUNET_PeerIdentity*) &sm->target);
break;
case GNUNET_DATASTORE_BLOCKTYPE_SBLOCK:
- if (GNUNET_OK !=
- check_sblock ((const struct SBlock*) &put[1],
- dsize,
- &query,
- &prq.namespace))
- return GNUNET_SYSERR;
+ pr->namespace = (GNUNET_HashCode*) &pr[1];
+ memcpy (&pr[1], &sm->target, sizeof (GNUNET_HashCode));
break;
- case GNUNET_DATASTORE_BLOCKTYPE_SKBLOCK:
- // FIXME -- validate SKBLOCK!
- GNUNET_break (0);
- return GNUNET_OK;
default:
- /* unknown block type */
- GNUNET_break_op (0);
- return GNUNET_SYSERR;
+ break;
}
-
- /* now, lookup 'query' */
- prq.data = (const void*) &put[1];
- prq.size = dsize;
- prq.type = type;
- prq.expiration = expiration;
- prq.priority = 0;
- GNUNET_CONTAINER_multihashmap_get_multiple (requests_by_query,
- &query,
- &process_reply,
- &prq);
- // FIXME: if migration is on and load is low,
- // queue to store data in datastore;
- // use "prq.priority" for that!
- return GNUNET_OK;
+ pr->drq = GNUNET_FS_drq_get (&sm->query,
+ pr->type,
+ &process_local_reply,
+ pr,
+ GNUNET_TIME_UNIT_FOREVER_REL);
}
+/* **************************** Startup ************************ */
+
+
/**
* List of handlers for P2P messages
* that we care about.
};
+/**
+ * List of handlers for the messages understood by this
+ * service.
+ */
+static struct GNUNET_SERVER_MessageHandler handlers[] = {
+ {&GNUNET_FS_handle_index_start, NULL,
+ GNUNET_MESSAGE_TYPE_FS_INDEX_START, 0},
+ {&GNUNET_FS_handle_index_list_get, NULL,
+ GNUNET_MESSAGE_TYPE_FS_INDEX_LIST_GET, sizeof(struct GNUNET_MessageHeader) },
+ {&GNUNET_FS_handle_unindex, NULL, GNUNET_MESSAGE_TYPE_FS_UNINDEX,
+ sizeof (struct UnindexMessage) },
+ {&handle_start_search, NULL, GNUNET_MESSAGE_TYPE_FS_START_SEARCH,
+ 0 },
+ {NULL, NULL, 0, 0}
+};
+
+
/**
* Process fs requests.
*
* @param server the initialized server
* @param c configuration to use
*/
-static void
-run (void *cls,
- struct GNUNET_SCHEDULER_Handle *s,
- struct GNUNET_SERVER_Handle *server,
- const struct GNUNET_CONFIGURATION_Handle *c)
+static int
+main_init (struct GNUNET_SCHEDULER_Handle *s,
+ struct GNUNET_SERVER_Handle *server,
+ const struct GNUNET_CONFIGURATION_Handle *c)
{
sched = s;
cfg = c;
-
- requests_by_query = GNUNET_CONTAINER_multihashmap_create (128); // FIXME: get size from config
- requests_by_peer = GNUNET_CONTAINER_multihashmap_create (128); // FIXME: get size from config
- connected_peers = GNUNET_CONTAINER_multihashmap_create (64);
- requests_by_expiration = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
- GNUNET_FS_init_indexing (sched, cfg);
- dsh = GNUNET_DATASTORE_connect (cfg,
- sched);
core = GNUNET_CORE_connect (sched,
cfg,
GNUNET_TIME_UNIT_FOREVER_REL,
NULL, GNUNET_NO,
NULL, GNUNET_NO,
p2p_handlers);
-
+ if (NULL == core)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ _("Failed to connect to `%s' service.\n"),
+ "core");
+ return GNUNET_SYSERR;
+ }
+ query_request_map = GNUNET_CONTAINER_multihashmap_create (128); // FIXME: get size from config
+ peer_request_map = GNUNET_CONTAINER_multihashmap_create (128); // FIXME: get size from config
+ requests_by_expiration_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
GNUNET_SERVER_disconnect_notify (server,
&handle_client_disconnect,
NULL);
GNUNET_TIME_UNIT_FOREVER_REL,
&shutdown_task,
NULL);
- if (NULL == dsh)
- {
- GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
- _("Failed to connect to `%s' service.\n"),
- "datastore");
- GNUNET_SCHEDULER_shutdown (sched);
- return;
- }
- if (NULL == core)
- {
- GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
- _("Failed to connect to `%s' service.\n"),
- "core");
+ return GNUNET_OK;
+}
+
+
+/**
+ * Process fs requests.
+ *
+ * @param cls closure
+ * @param sched scheduler to use
+ * @param server the initialized server
+ * @param cfg configuration to use
+ */
+static void
+run (void *cls,
+ struct GNUNET_SCHEDULER_Handle *sched,
+ struct GNUNET_SERVER_Handle *server,
+ const struct GNUNET_CONFIGURATION_Handle *cfg)
+{
+ if ( (GNUNET_OK != GNUNET_FS_drq_init (sched, cfg)) ||
+ (GNUNET_OK != GNUNET_FS_indexing_init (sched, cfg)) ||
+ (GNUNET_OK != main_init (sched, server, cfg)) )
+ {
GNUNET_SCHEDULER_shutdown (sched);
- return;
+ return;
}
}