From 258bd33b0a8e26200d8bf36d8e65524a1069790d Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Fri, 29 Jan 2010 19:21:41 +0000 Subject: [PATCH] finally compiles again --- src/fs/Makefile.am | 3 +- src/fs/fs.h | 49 +- src/fs/gnunet-service-fs.c | 3492 ++++++++++++--------------- src/fs/gnunet-service-fs_drq.c | 416 ++++ src/fs/gnunet-service-fs_drq.h | 137 ++ src/fs/gnunet-service-fs_indexing.c | 37 +- src/fs/gnunet-service-fs_indexing.h | 12 +- 7 files changed, 2123 insertions(+), 2023 deletions(-) create mode 100644 src/fs/gnunet-service-fs_drq.c create mode 100644 src/fs/gnunet-service-fs_drq.h diff --git a/src/fs/Makefile.am b/src/fs/Makefile.am index 70897c5ef..a43c8340d 100644 --- a/src/fs/Makefile.am +++ b/src/fs/Makefile.am @@ -85,7 +85,8 @@ gnunet_search_LDADD = \ gnunet_service_fs_SOURCES = \ gnunet-service-fs.c \ - gnunet-service-fs_indexing.c gnunet-service-fs_indexing.h + gnunet-service-fs_drq.c gnunet-service-fs_drq.h \ + gnunet-service-fs_indexing.c gnunet-service-fs_indexing.h gnunet_service_fs_LDADD = \ $(top_builddir)/src/fs/libgnunetfs.la \ $(top_builddir)/src/datastore/libgnunetdatastore.la \ diff --git a/src/fs/fs.h b/src/fs/fs.h index ae2e67654..d19f27b6f 100644 --- a/src/fs/fs.h +++ b/src/fs/fs.h @@ -1132,28 +1132,23 @@ struct SBlock /** - * Message sent from a GNUnet (fs) publishing - * activity to the gnunet-fs-service to - * initiate indexing of a file. The service - * is supposed to check if the specified file - * is available and has the same cryptographic - * hash. It should then respond with either - * a confirmation or a denial. + * Message sent from a GNUnet (fs) publishing activity to the + * gnunet-fs-service to initiate indexing of a file. The service is + * supposed to check if the specified file is available and has the + * same cryptographic hash. It should then respond with either a + * confirmation or a denial. * - * On OSes where this works, it is considered - * acceptable if the service only checks that - * the path, device and inode match (it can - * then be assumed that the hash will also match - * without actually computing it; this is an - * optimization that should be safe given that - * the client is not our adversary). + * On OSes where this works, it is considered acceptable if the + * service only checks that the path, device and inode match (it can + * then be assumed that the hash will also match without actually + * computing it; this is an optimization that should be safe given + * that the client is not our adversary). */ struct IndexStartMessage { /** - * Message type will be - * GNUNET_MESSAGE_TYPE_FS_INDEX_START. + * Message type will be GNUNET_MESSAGE_TYPE_FS_INDEX_START. */ struct GNUNET_MessageHeader header; @@ -1216,12 +1211,10 @@ struct IndexInfoMessage /** - * Message sent from a GNUnet (fs) unindexing - * activity to the gnunet-fs-service to - * indicate that a file will be unindexed. The service - * is supposed to remove the file from the - * list of indexed files and response with - * a confirmation message (even if the file + * Message sent from a GNUnet (fs) unindexing activity to the + * gnunet-service-fs to indicate that a file will be unindexed. The + * service is supposed to remove the file from the list of indexed + * files and response with a confirmation message (even if the file * was already not on the list). */ struct UnindexMessage @@ -1247,9 +1240,8 @@ struct UnindexMessage /** - * Message sent from a GNUnet (fs) search - * activity to the gnunet-fs-service to - * start a search. + * Message sent from a GNUnet (fs) search activity to the + * gnunet-service-fs to start a search. */ struct SearchMessage { @@ -1308,10 +1300,9 @@ struct SearchMessage /** - * Response from FS service with a result for - * a previous FS search. Note that queries - * for DBLOCKS and IBLOCKS that have received - * a single response are considered done. + * Response from FS service with a result for a previous FS search. + * Note that queries for DBLOCKS and IBLOCKS that have received a + * single response are considered done. */ struct ContentMessage { diff --git a/src/fs/gnunet-service-fs.c b/src/fs/gnunet-service-fs.c index 57b6dd421..740f63624 100644 --- a/src/fs/gnunet-service-fs.c +++ b/src/fs/gnunet-service-fs.c @@ -1,6 +1,6 @@ /* This file is part of GNUnet. - (C) 2009 Christian Grothoff (and other contributing authors) + (C) 2009, 2010 Christian Grothoff (and other contributing authors) GNUnet is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published @@ -20,341 +20,363 @@ /** * @file fs/gnunet-service-fs.c - * @brief program that provides the file-sharing service + * @brief gnunet anonymity protocol implementation * @author Christian Grothoff * * TODO: - * - fix gazillion of minor FIXME's - * - possible major issue: we may queue "gazillions" of (K|S)Blocks for the - * core to transmit to another peer; need to make sure this is bounded overall... - * - randomly delay processing for improved anonymity (can wait) - * - content migration (put in local DS) (can wait) - * - handle some special cases when forwarding replies based on tracked requests (can wait) - * - tracking of success correlations for hot-path routing (can wait) - * - various load-based actions (can wait) - * - validation of KSBLOCKS (can wait) - * - remove on-demand blocks if they keep failing (can wait) - * - check that we decrement PIDs always where necessary (can wait) - * - find out how to use core-pulling instead of pushing (at least for some cases) + * - forward_request_task (P2P forwarding!) + * - track stats for hot-path routing + * - implement hot-path routing decision procedure + * - detect duplicate requests (P2P and CS) + * - implement: bound_priority, test_load_too_high, validate_skblock + * - add content migration support (store locally) + * - add random delay + * - statistics + * */ #include "platform.h" #include +#include "gnunet_constants.h" #include "gnunet_core_service.h" #include "gnunet_datastore_service.h" #include "gnunet_peer_lib.h" #include "gnunet_protocols.h" #include "gnunet_signatures.h" #include "gnunet_util_lib.h" +#include "gnunet-service-fs_drq.h" #include "gnunet-service-fs_indexing.h" #include "fs.h" -#define DEBUG_FS GNUNET_NO +/** + * Maximum number of outgoing messages we queue per peer. + * FIXME: set to a tiny value for testing; make configurable. + */ +#define MAX_QUEUE_PER_PEER 2 + + + +/** + * Maximum number of requests (from other peers) that we're + * willing to have pending at any given point in time. + * FIXME: set from configuration (and 32 is a tiny value for testing only). + */ +static uint64_t max_pending_requests = 32; + + +/** + * Information we keep for each pending reply. The + * actual message follows at the end of this struct. + */ +struct PendingMessage; + /** - * Signature of a function that is called whenever a datastore - * request can be processed (or an entry put on the queue times out). + * Function called upon completion of a transmission. * * @param cls closure - * @param ok GNUNET_OK if DS is ready, GNUNET_SYSERR on timeout + * @param pid ID of receiving peer, 0 on transmission error */ -typedef void (*RequestFunction)(void *cls, - int ok); +typedef void (*TransmissionContinuation)(void * cls, + GNUNET_PEER_Id tpid); /** - * Doubly-linked list of our requests for the datastore. + * Information we keep for each pending reply. The + * actual message follows at the end of this struct. */ -struct DatastoreRequestQueue +struct PendingMessage { - /** - * This is a doubly-linked list. + * This is a doubly-linked list of messages to the same peer. */ - struct DatastoreRequestQueue *next; + struct PendingMessage *next; /** - * This is a doubly-linked list. + * This is a doubly-linked list of messages to the same peer. */ - struct DatastoreRequestQueue *prev; + struct PendingMessage *prev; + + /** + * Entry in pending message list for this pending message. + */ + struct PendingMessageList *pml; /** - * Function to call (will issue the request). + * Function to call immediately once we have transmitted this + * message. */ - RequestFunction req; + TransmissionContinuation cont; /** - * Closure for req. + * Closure for cont. */ - void *req_cls; + void *cont_cls; /** - * When should this request time-out because we don't care anymore? + * Size of the reply; actual reply message follows + * at the end of this struct. */ - struct GNUNET_TIME_Absolute timeout; - + size_t msize; + /** - * ID of task used for signaling timeout. + * How important is this message for us? */ - GNUNET_SCHEDULER_TaskIdentifier task; - + uint32_t priority; + }; /** - * Closure for processing START_SEARCH messages from a client. + * Information about a peer that we are connected to. + * We track data that is useful for determining which + * peers should receive our requests. We also keep + * a list of messages to transmit to this peer. */ -struct LocalGetContext +struct ConnectedPeer { /** - * This is a doubly-linked list. + * List of the last clients for which this peer successfully + * answered a query. */ - struct LocalGetContext *next; + struct GNUNET_SERVER_Client *last_client_replies[CS2P_SUCCESS_LIST_SIZE]; /** - * This is a doubly-linked list. + * List of the last PIDs for which + * this peer successfully answered a query; + * We use 0 to indicate no successful reply. */ - struct LocalGetContext *prev; + GNUNET_PEER_Id last_p2p_replies[P2P_SUCCESS_LIST_SIZE]; /** - * Client that initiated the search. - */ - struct GNUNET_SERVER_Client *client; + * Average delay between sending the peer a request and + * getting a reply (only calculated over the requests for + * which we actually got a reply). Calculated + * as a moving average: new_delay = ((n-1)*last_delay+curr_delay) / n + */ + struct GNUNET_TIME_Relative avg_delay; /** - * Array of results that we've already received - * (can be NULL). + * Handle for an active request for transmission to this + * peer, or NULL. */ - GNUNET_HashCode *results; + struct GNUNET_CORE_TransmitHandle *cth; /** - * Bloomfilter over all results (for fast query construction); - * NULL if we don't have any results. - * - * FIXME: this member is not used, is that OK? If so, it should - * be removed! + * Messages (replies, queries, content migration) we would like to + * send to this peer in the near future. Sorted by priority, head. */ - struct GNUNET_CONTAINER_BloomFilter *results_bf; + struct PendingMessage *pending_messages_head; /** - * DS request associated with this operation. + * Messages (replies, queries, content migration) we would like to + * send to this peer in the near future. Sorted by priority, tail. */ - struct DatastoreRequestQueue *req; + struct PendingMessage *pending_messages_tail; /** - * Current result message to transmit to client (or NULL). - */ - struct ContentMessage *result; - - /** - * Type of the content that we're looking for. - * 0 for any. + * Average priority of successful replies. Calculated + * as a moving average: new_avg = ((n-1)*last_avg+curr_prio) / n */ - uint32_t type; + double avg_priority; /** - * Desired anonymity level. + * Increase in traffic preference still to be submitted + * to the core service for this peer. FIXME: double or 'uint64_t'? */ - uint32_t anonymity_level; + double inc_preference; /** - * Number of results actually stored in the results array. - */ - unsigned int results_used; - - /** - * Size of the results array in memory. - */ - unsigned int results_size; - - /** - * Size (in bytes) of the 'results_bf' bloomfilter. - * - * FIXME: this member is not used, is that OK? If so, it should - * be removed! + * The peer's identity. */ - size_t results_bf_size; + GNUNET_PEER_Id pid; /** - * If the request is for a DBLOCK or IBLOCK, this is the identity of - * the peer that is known to have a response. Set to all-zeros if - * such a target is not known (note that even if OUR anonymity - * level is >0 we may happen to know the responder's identity; - * nevertheless, we should probably not use it for a DHT-lookup - * or similar blunt actions in order to avoid exposing ourselves). + * Size of the linked list of 'pending_messages'. */ - struct GNUNET_PeerIdentity target; + unsigned int pending_requests; /** - * If the request is for an SBLOCK, this is the identity of the - * pseudonym to which the SBLOCK belongs. + * Which offset in "last_p2p_replies" will be updated next? + * (we go round-robin). */ - GNUNET_HashCode namespace; + unsigned int last_p2p_replies_woff; /** - * Hash of the keyword (aka query) for KBLOCKs; Hash of - * the CHK-encoded block for DBLOCKS and IBLOCKS (aka query) - * and hash of the identifier XORed with the target for - * SBLOCKS (aka query). + * Which offset in "last_client_replies" will be updated next? + * (we go round-robin). */ - GNUNET_HashCode query; + unsigned int last_client_replies_woff; }; /** - * Possible routing policies for an FS-GET request. + * Information we keep for each pending request. We should try to + * keep this struct as small as possible since its memory consumption + * is key to how many requests we can have pending at once. */ -enum RoutingPolicy - { - /** - * Simply drop the request. - */ - ROUTING_POLICY_NONE = 0, - - /** - * Answer it if we can from local datastore. - */ - ROUTING_POLICY_ANSWER = 1, - - /** - * Forward the request to other peers (if possible). - */ - ROUTING_POLICY_FORWARD = 2, - - /** - * Forward to other peers, and ask them to route - * the response via ourselves. - */ - ROUTING_POLICY_INDIRECT = 6, - - /** - * Do everything we could possibly do (that would - * make sense). - */ - ROUTING_POLICY_ALL = 7 - }; +struct PendingRequest; /** - * Internal context we use for our initial processing - * of a GET request. + * Doubly-linked list of requests we are performing + * on behalf of the same client. */ -struct ProcessGetContext +struct ClientRequestList { + /** - * The search query (used for datastore lookup). + * This is a doubly-linked list. */ - GNUNET_HashCode query; - + struct ClientRequestList *next; + /** - * Which peer we should forward the response to. + * This is a doubly-linked list. */ - struct GNUNET_PeerIdentity reply_to; + struct ClientRequestList *prev; /** - * Namespace for the result (only set for SKS requests) + * Request this entry represents. */ - GNUNET_HashCode namespace; + struct PendingRequest *req; /** - * Peer that we should forward the query to if possible - * (since that peer likely has the content). + * Client list this request belongs to. */ - struct GNUNET_PeerIdentity prime_target; + struct ClientList *client_list; + +}; + +/** + * Replies to be transmitted to the client. The actual + * response message is allocated after this struct. + */ +struct ClientResponseMessage +{ /** - * When did we receive this request? + * This is a doubly-linked list. */ - struct GNUNET_TIME_Absolute start_time; + struct ClientResponseMessage *next; /** - * Our entry in the DRQ (non-NULL while we wait for our - * turn to interact with the local database). + * This is a doubly-linked list. */ - struct DatastoreRequestQueue *drq; + struct ClientResponseMessage *prev; /** - * Filter used to eliminate duplicate results. Can be NULL if we - * are not yet filtering any results. + * Client list entry this response belongs to. */ - struct GNUNET_CONTAINER_BloomFilter *bf; + struct ClientList *client_list; /** - * Bitmap describing which of the optional - * hash codes / peer identities were given to us. + * Number of bytes in the response. */ - uint32_t bm; + size_t msize; +}; + +/** + * Linked list of clients we are performing requests + * for right now. + */ +struct ClientList +{ /** - * Desired block type. + * This is a linked list. */ - uint32_t type; + struct ClientList *next; /** - * Priority of the request. + * ID of a client making a request, NULL if this entry is for a + * peer. */ - uint32_t priority; + struct GNUNET_SERVER_Client *client; /** - * Size of the 'bf' (in bytes). + * Head of list of requests performed on behalf + * of this client right now. */ - size_t bf_size; + struct ClientRequestList *rl_head; /** - * In what ways are we going to process - * the request? + * Tail of list of requests performed on behalf + * of this client right now. */ - enum RoutingPolicy policy; + struct ClientRequestList *rl_tail; /** - * Time-to-live for the request (value - * we use). + * Head of linked list of responses. */ - int32_t ttl; + struct ClientResponseMessage *res_head; /** - * Number to mingle hashes for bloom-filter - * tests with. + * Tail of linked list of responses. */ - int32_t mingle; + struct ClientResponseMessage *res_tail; /** - * Number of results that were found so far. + * Context for sending replies. */ - unsigned int results_found; + struct GNUNET_CONNECTION_TransmitHandle *th; + }; /** - * Information we keep for each pending reply. The - * actual message follows at the end of this struct. + * Hash map entry of requests we are performing + * on behalf of the same peer. */ -struct PendingMessage +struct PeerRequestEntry { - /** - * This is a linked list. - */ - struct PendingMessage *next; /** - * Size of the reply; actual reply message follows - * at the end of this struct. + * Request this entry represents. */ - size_t msize; - + struct PendingRequest *req; + /** - * How important is this message for us? + * Entry of peer responsible for this entry. */ - uint32_t priority; + struct ConnectedPeer *cp; }; /** - * All requests from a client are kept in a doubly-linked list. + * Doubly-linked list of messages we are performing + * due to a pending request. */ -struct ClientRequestList; +struct PendingMessageList +{ + + /** + * This is a doubly-linked list of messages on behalf of the same request. + */ + struct PendingMessageList *next; + + /** + * This is a doubly-linked list of messages on behalf of the same request. + */ + struct PendingMessageList *prev; + + /** + * Message this entry represents. + */ + struct PendingMessage *pm; + + /** + * Request this entry belongs to. + */ + struct PendingRequest *req; + + /** + * Peer this message is targeted for. + */ + struct ConnectedPeer *target; + +}; /** @@ -366,23 +388,23 @@ struct PendingRequest { /** - * ID of a client making a request, NULL if this entry is for a - * peer. + * If this request was made by a client, this is our entry in the + * client request list; otherwise NULL. */ - struct GNUNET_SERVER_Client *client; - + struct ClientRequestList *client_request_list; + /** - * If this request was made by a client, - * this is our entry in the client request - * list; otherwise NULL. + * If this request was made by a peer, this is our entry in the + * per-peer multi-hash map; otherwise NULL. */ - struct ClientRequestList *crl_entry; + struct PeerRequestEntry *pht_entry; /** * If this is a namespace query, pointer to the hash of the public - * key of the namespace; otherwise NULL. + * key of the namespace; otherwise NULL. Pointer will be to the + * end of this struct (so no need to free it). */ - GNUNET_HashCode *namespace; + const GNUNET_HashCode *namespace; /** * Bloomfilter we use to filter out replies that we don't care about @@ -396,36 +418,29 @@ struct PendingRequest struct GNUNET_CORE_InformationRequestContext *irc; /** - * Handle for an active request for transmission to this peer, or - * NULL. Only used for replies that we are trying to send to a peer - * that we are not yet connected to. - */ - struct GNUNET_CORE_TransmitHandle *cth; - - /** - * Replies that we have received but were unable to forward yet - * (typically non-null only if we have a pending transmission - * request with the client or the respective peer). + * Hash code of all replies that we have seen so far (only valid + * if client is not NULL since we only track replies like this for + * our own clients). */ - struct PendingMessage *replies_pending; + GNUNET_HashCode *replies_seen; /** - * Pending transmission request for the target client (for processing of - * 'replies_pending'). + * Node in the heap representing this entry; NULL + * if we have no heap node. */ - struct GNUNET_CONNECTION_TransmitHandle *th; + struct GNUNET_CONTAINER_HeapNode *hnode; /** - * Hash code of all replies that we have seen so far (only valid - * if client is not NULL since we only track replies like this for - * our own clients). + * Head of list of messages being performed on behalf of this + * request. */ - GNUNET_HashCode *replies_seen; + struct PendingMessageList *pending_head; /** - * Node in the heap representing this entry. + * Tail of list of messages being performed on behalf of this + * request. */ - struct GNUNET_CONTAINER_HeapNode *hnode; + struct PendingMessageList *pending_tail; /** * When did we first see this request (form this peer), or, if our @@ -444,12 +459,6 @@ struct PendingRequest */ GNUNET_SCHEDULER_TaskIdentifier task; - /** - * (Interned) Peer identifier (only valid if "client" is NULL) - * that identifies a peer that gave us this request. - */ - GNUNET_PEER_Id source_pid; - /** * (Interned) Peer identifier that identifies a preferred target * for requests. @@ -461,6 +470,12 @@ struct PendingRequest * received our query for this content. */ GNUNET_PEER_Id *used_pids; + + /** + * Our entry in the DRQ (non-NULL while we wait for our + * turn to interact with the local database). + */ + struct DatastoreRequestQueue *drq; /** * Size of the 'bf' (in bytes). @@ -482,6 +497,11 @@ struct PendingRequest */ unsigned int used_pids_size; + /** + * Number of results found for this request. + */ + unsigned int results_found; + /** * How many entries in "replies_seen" are actually valid? */ @@ -527,427 +547,534 @@ struct PendingRequest /** - * All requests from a client are kept in a doubly-linked list. + * Our scheduler. */ -struct ClientRequestList -{ - /** - * This is a doubly-linked list. - */ - struct ClientRequestList *next; +static struct GNUNET_SCHEDULER_Handle *sched; - /** - * This is a doubly-linked list. - */ - struct ClientRequestList *prev; +/** + * Our configuration. + */ +const struct GNUNET_CONFIGURATION_Handle *cfg; - /** - * A request from this client. - */ - struct PendingRequest *req; +/** + * Map of peer identifiers to "struct ConnectedPeer" (for that peer). + */ +static struct GNUNET_CONTAINER_MultiHashMap *connected_peers; - /** - * Client list with the head and tail of this DLL. - */ - struct ClientList *cl; -}; +/** + * Map of peer identifiers to "struct PendingRequest" (for that peer). + */ +static struct GNUNET_CONTAINER_MultiHashMap *peer_request_map; +/** + * Map of query identifiers to "struct PendingRequest" (for that query). + */ +static struct GNUNET_CONTAINER_MultiHashMap *query_request_map; /** - * Linked list of all clients that we are currently processing - * requests for. + * Heap with the request that will expire next at the top. Contains + * pointers of type "struct PendingRequest*"; these will *also* be + * aliased from the "requests_by_peer" data structures and the + * "requests_by_query" table. Note that requests from our clients + * don't expire and are thus NOT in the "requests_by_expiration" + * (or the "requests_by_peer" tables). */ -struct ClientList -{ +static struct GNUNET_CONTAINER_Heap *requests_by_expiration_heap; - /** - * This is a linked list. - */ - struct ClientList *next; - - /** - * What client is this entry for? - */ - struct GNUNET_SERVER_Client* client; - - /** - * Head of the DLL of requests from this client. - */ - struct ClientRequestList *head; - - /** - * Tail of the DLL of requests from this client. - */ - struct ClientRequestList *tail; - -}; - - -/** - * Closure for "process_reply" function. - */ -struct ProcessReplyClosure -{ - /** - * The data for the reply. - */ - const void *data; - - /** - * When the reply expires. - */ - struct GNUNET_TIME_Absolute expiration; - - /** - * Size of data. - */ - size_t size; - - /** - * Namespace that this reply belongs to - * (if it is of type SBLOCK). - */ - GNUNET_HashCode namespace; - - /** - * Type of the block. - */ - uint32_t type; - - /** - * How much was this reply worth to us? - */ - uint32_t priority; -}; - - -/** - * Information about a peer that we are connected to. - * We track data that is useful for determining which - * peers should receive our requests. - */ -struct ConnectedPeer -{ - - /** - * List of the last clients for which this peer - * successfully answered a query. - */ - struct GNUNET_SERVER_Client *last_client_replies[CS2P_SUCCESS_LIST_SIZE]; - - /** - * List of the last PIDs for which - * this peer successfully answered a query; - * We use 0 to indicate no successful reply. - */ - GNUNET_PEER_Id last_p2p_replies[P2P_SUCCESS_LIST_SIZE]; - - /** - * Average delay between sending the peer a request and - * getting a reply (only calculated over the requests for - * which we actually got a reply). Calculated - * as a moving average: new_delay = ((n-1)*last_delay+curr_delay) / n - */ - struct GNUNET_TIME_Relative avg_delay; - - /** - * Handle for an active request for transmission to this - * peer, or NULL. - */ - struct GNUNET_CORE_TransmitHandle *cth; - - /** - * Messages (replies, queries, content migration) we would like to - * send to this peer in the near future. Sorted by priority. - */ - struct PendingMessage *pending_messages; - - /** - * Average priority of successful replies. Calculated - * as a moving average: new_avg = ((n-1)*last_avg+curr_prio) / n - */ - double avg_priority; - - /** - * The peer's identity. - */ - GNUNET_PEER_Id pid; - - /** - * Number of requests we have currently pending with this peer (that - * is, requests that were transmitted so recently that we would not - * retransmit them right now). - */ - unsigned int pending_requests; - - /** - * Which offset in "last_p2p_replies" will be updated next? - * (we go round-robin). - */ - unsigned int last_p2p_replies_woff; - - /** - * Which offset in "last_client_replies" will be updated next? - * (we go round-robin). - */ - unsigned int last_client_replies_woff; - -}; - - -/** - * Our connection to the datastore. - */ -static struct GNUNET_DATASTORE_Handle *dsh; - -/** - * Our scheduler. - */ -static struct GNUNET_SCHEDULER_Handle *sched; - -/** - * Our configuration. - */ -const struct GNUNET_CONFIGURATION_Handle *cfg; +/** + * Linked list of clients we are currently processing requests for. + */ +struct ClientList *client_list; /** - * Handle to the core service (NULL until we've connected to it). + * Pointer to handle to the core service (points to NULL until we've + * connected to it). */ struct GNUNET_CORE_Handle *core; -/** - * Head of doubly-linked LGC list. - */ -static struct LocalGetContext *lgc_head; - -/** - * Tail of doubly-linked LGC list. - */ -static struct LocalGetContext *lgc_tail; - -/** - * Head of request queue for the datastore, sorted by timeout. - */ -static struct DatastoreRequestQueue *drq_head; - -/** - * Tail of request queue for the datastore. - */ -static struct DatastoreRequestQueue *drq_tail; - -/** - * Map of query hash codes to requests. - */ -static struct GNUNET_CONTAINER_MultiHashMap *requests_by_query; - -/** - * Map of peer IDs to requests (for those requests coming - * from other peers). - */ -static struct GNUNET_CONTAINER_MultiHashMap *requests_by_peer; - -/** - * Linked list of all of our clients and their requests. - */ -static struct ClientList *clients; - -/** - * Heap with the request that will expire next at the top. Contains - * pointers of type "struct PendingRequest*"; these will *also* be - * aliased from the "requests_by_peer" data structures and the - * "requests_by_query" table. Note that requests from our clients - * don't expire and are thus NOT in the "requests_by_expiration" - * (or the "requests_by_peer" tables). - */ -static struct GNUNET_CONTAINER_Heap *requests_by_expiration; - -/** - * Map of peer identifiers to "struct ConnectedPeer" (for that peer). - */ -static struct GNUNET_CONTAINER_MultiHashMap *connected_peers; - -/** - * Maximum number of requests (from other peers) that we're - * willing to have pending at any given point in time. - * FIXME: set from configuration (and 32 is a tiny value for testing only). - */ -static uint64_t max_pending_requests = 32; +/* ******************* clean up functions ************************ */ /** - * Run the next DS request in our - * queue, we're done with the current one. + * We're done with a particular message list entry. + * Free all associated resources. + * + * @param pml entry to destroy */ static void -next_ds_request () +destroy_pending_message_list_entry (struct PendingMessageList *pml) { - struct DatastoreRequestQueue *e; - - while (NULL != (e = drq_head)) - { - if (0 != GNUNET_TIME_absolute_get_remaining (e->timeout).value) - break; - if (e->task != GNUNET_SCHEDULER_NO_TASK) - GNUNET_SCHEDULER_cancel (sched, e->task); - GNUNET_CONTAINER_DLL_remove (drq_head, drq_tail, e); - e->req (e->req_cls, GNUNET_NO); - GNUNET_free (e); - } - if (e == NULL) - return; - if (e->task != GNUNET_SCHEDULER_NO_TASK) - GNUNET_SCHEDULER_cancel (sched, e->task); - e->task = GNUNET_SCHEDULER_NO_TASK; - e->req (e->req_cls, GNUNET_YES); - GNUNET_CONTAINER_DLL_remove (drq_head, drq_tail, e); - GNUNET_free (e); + GNUNET_CONTAINER_DLL_remove (pml->req->pending_head, + pml->req->pending_tail, + pml); + GNUNET_CONTAINER_DLL_remove (pml->target->pending_messages_head, + pml->target->pending_messages_tail, + pml->pm); + pml->target->pending_requests--; + GNUNET_free (pml->pm); + GNUNET_free (pml); } /** - * A datastore request had to be timed out. + * Destroy the given pending message (and call the respective + * continuation). * - * @param cls closure (of type "struct DatastoreRequestQueue*") - * @param tc task context, unused + * @param pm message to destroy + * @param tpid id of peer that the message was delivered to, or 0 for none */ static void -timeout_ds_request (void *cls, - const struct GNUNET_SCHEDULER_TaskContext *tc) +destroy_pending_message (struct PendingMessage *pm, + GNUNET_PEER_Id tpid) { - struct DatastoreRequestQueue *e = cls; + struct PendingMessageList *pml = pm->pml; - e->task = GNUNET_SCHEDULER_NO_TASK; - GNUNET_CONTAINER_DLL_remove (drq_head, drq_tail, e); - e->req (e->req_cls, GNUNET_NO); - GNUNET_free (e); + GNUNET_assert (pml->pm == pm); + GNUNET_assert ( (tpid == 0) || (tpid == pml->target->pid) ); + pm->cont (pm->cont_cls, 0); + destroy_pending_message_list_entry (pml); } + /** - * Queue a request for the datastore. + * We're done processing a particular request. + * Free all associated resources. * - * @param deadline by when the request should run - * @param fun function to call once the request can be run - * @param fun_cls closure for fun + * @param pr request to destroy */ -static struct DatastoreRequestQueue * -queue_ds_request (struct GNUNET_TIME_Relative deadline, - RequestFunction fun, - void *fun_cls) +static void +destroy_pending_request (struct PendingRequest *pr) { - struct DatastoreRequestQueue *e; - struct DatastoreRequestQueue *bef; + struct GNUNET_PeerIdentity pid; - if (drq_head == NULL) + if (pr->hnode != NULL) { - /* no other requests pending, run immediately */ - fun (fun_cls, GNUNET_OK); - return NULL; + GNUNET_CONTAINER_heap_remove_node (requests_by_expiration_heap, + pr->hnode); + pr->hnode = NULL; } - e = GNUNET_malloc (sizeof (struct DatastoreRequestQueue)); - e->timeout = GNUNET_TIME_relative_to_absolute (deadline); - e->req = fun; - e->req_cls = fun_cls; - if (deadline.value == GNUNET_TIME_UNIT_FOREVER_REL.value) + /* might have already been removed from map + in 'process_reply' if there was a unique + reply; hence ignore the return value here */ + (void) GNUNET_CONTAINER_multihashmap_remove (query_request_map, + &pr->query, + pr); + if (pr->drq != NULL) { - /* local request, highest prio, put at head of queue - regardless of deadline */ - bef = NULL; + GNUNET_FS_drq_get_cancel (pr->drq); + pr->drq = NULL; } - else + if (pr->client_request_list != NULL) + { + GNUNET_CONTAINER_DLL_remove (pr->client_request_list->client_list->rl_head, + pr->client_request_list->client_list->rl_tail, + pr->client_request_list); + GNUNET_free (pr->client_request_list); + pr->client_request_list = NULL; + } + if (pr->pht_entry != NULL) + { + GNUNET_PEER_resolve (pr->pht_entry->cp->pid, + &pid); + GNUNET_CONTAINER_multihashmap_remove (peer_request_map, + &pid.hashPubKey, + pr->pht_entry); + GNUNET_free (pr->pht_entry); + pr->pht_entry = NULL; + } + if (pr->bf != NULL) + { + GNUNET_CONTAINER_bloomfilter_free (pr->bf); + pr->bf = NULL; + } + if (pr->irc != NULL) + { + GNUNET_CORE_peer_change_preference_cancel (pr->irc); + pr->irc = NULL; + } + if (pr->replies_seen != NULL) + { + GNUNET_free (pr->replies_seen); + pr->replies_seen = NULL; + } + if (pr->task != GNUNET_SCHEDULER_NO_TASK) + { + GNUNET_SCHEDULER_cancel (sched, + pr->task); + pr->task = GNUNET_SCHEDULER_NO_TASK; + } + while (NULL != pr->pending_head) + destroy_pending_message_list_entry (pr->pending_head); + GNUNET_PEER_change_rc (pr->target_pid, -1); + if (pr->used_pids != NULL) { - bef = drq_tail; - while ( (NULL != bef) && - (e->timeout.value < bef->timeout.value) ) - bef = bef->prev; + GNUNET_PEER_decrement_rcs (pr->used_pids, pr->used_pids_off); + GNUNET_free (pr->used_pids); + pr->used_pids_off = 0; + pr->used_pids_size = 0; + pr->used_pids = NULL; } - GNUNET_CONTAINER_DLL_insert_after (drq_head, drq_tail, bef, e); - if (deadline.value == GNUNET_TIME_UNIT_FOREVER_REL.value) - return e; - e->task = GNUNET_SCHEDULER_add_delayed (sched, - deadline, - &timeout_ds_request, - e); - return e; + GNUNET_free (pr); } /** - * Free the state associated with a local get context. + * Method called whenever a given peer connects. * - * @param lgc the lgc to free + * @param cls closure, not used + * @param peer peer identity this notification is about + * @param latency reported latency of the connection with 'other' + * @param distance reported distance (DV) to 'other' */ -static void -local_get_context_free (struct LocalGetContext *lgc) +static void +peer_connect_handler (void *cls, + const struct + GNUNET_PeerIdentity * peer, + struct GNUNET_TIME_Relative latency, + uint32_t distance) { - GNUNET_CONTAINER_DLL_remove (lgc_head, lgc_tail, lgc); - GNUNET_SERVER_client_drop (lgc->client); - GNUNET_free_non_null (lgc->results); - if (lgc->results_bf != NULL) - GNUNET_CONTAINER_bloomfilter_free (lgc->results_bf); - if (lgc->req != NULL) - { - if (lgc->req->task != GNUNET_SCHEDULER_NO_TASK) - GNUNET_SCHEDULER_cancel (sched, lgc->req->task); - GNUNET_CONTAINER_DLL_remove (lgc_head, lgc_tail, lgc); - GNUNET_free (lgc->req); - } - GNUNET_free (lgc); + struct ConnectedPeer *cp; + + cp = GNUNET_malloc (sizeof (struct ConnectedPeer)); + cp->pid = GNUNET_PEER_intern (peer); + GNUNET_CONTAINER_multihashmap_put (connected_peers, + &peer->hashPubKey, + cp, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY); } /** - * We're able to transmit the next (local) result to the client. - * Do it and ask the datastore for more. Or, on error, tell - * the datastore to stop giving us more. + * Free (each) request made by the peer. * - * @param cls our closure (struct LocalGetContext) - * @param max maximum number of bytes we can transmit - * @param buf where to copy our message - * @return number of bytes copied to buf + * @param cls closure, points to peer that the request belongs to + * @param key current key code + * @param value value in the hash map + * @return GNUNET_YES (we should continue to iterate) */ -static size_t -transmit_local_result (void *cls, - size_t max, - void *buf) +static int +destroy_request (void *cls, + const GNUNET_HashCode * key, + void *value) { - struct LocalGetContext *lgc = cls; - uint16_t msize; - - if (NULL == buf) - { -#if DEBUG_FS - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Failed to transmit result to local client, aborting datastore iteration.\n"); -#endif - /* error, abort! */ - GNUNET_free (lgc->result); - lgc->result = NULL; - GNUNET_DATASTORE_get_next (dsh, GNUNET_NO); - return 0; - } - msize = ntohs (lgc->result->header.size); -#if DEBUG_FS - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Transmitting %u bytes of result to local client.\n", - msize); -#endif - GNUNET_assert (max >= msize); - memcpy (buf, lgc->result, msize); - GNUNET_free (lgc->result); - lgc->result = NULL; - GNUNET_DATASTORE_get_next (dsh, GNUNET_YES); - return msize; + const struct GNUNET_PeerIdentity * peer = cls; + struct PendingRequest *pr = value; + + GNUNET_CONTAINER_multihashmap_remove (peer_request_map, + &peer->hashPubKey, + pr); + destroy_pending_request (pr); + return GNUNET_YES; } /** - * Mingle hash with the mingle_number to produce different bits. + * Method called whenever a peer disconnects. + * + * @param cls closure, not used + * @param peer peer identity this notification is about */ static void -mingle_hash (const GNUNET_HashCode * in, - int32_t mingle_number, +peer_disconnect_handler (void *cls, + const struct + GNUNET_PeerIdentity * peer) +{ + struct ConnectedPeer *cp; + struct PendingMessage *pm; + unsigned int i; + + GNUNET_CONTAINER_multihashmap_get_multiple (peer_request_map, + &peer->hashPubKey, + &destroy_request, + (void*) peer); + cp = GNUNET_CONTAINER_multihashmap_get (connected_peers, + &peer->hashPubKey); + if (cp == NULL) + return; + for (i=0;ilast_client_replies[i]) + { + GNUNET_SERVER_client_drop (cp->last_client_replies[i]); + cp->last_client_replies[i] = NULL; + } + } + GNUNET_CONTAINER_multihashmap_remove (connected_peers, + &peer->hashPubKey, + cp); + GNUNET_PEER_change_rc (cp->pid, -1); + GNUNET_PEER_decrement_rcs (cp->last_p2p_replies, P2P_SUCCESS_LIST_SIZE); + if (NULL != cp->cth) + GNUNET_CORE_notify_transmit_ready_cancel (cp->cth); + while (NULL != (pm = cp->pending_messages_head)) + destroy_pending_message (pm, 0 /* delivery failed */); + GNUNET_break (0 == cp->pending_requests); + GNUNET_free (cp); +} + + +/** + * Iterator over hash map entries that removes all occurences + * of the given 'client' from the 'last_client_replies' of the + * given connected peer. + * + * @param cls closure, the 'struct GNUNET_SERVER_Client*' to remove + * @param key current key code (unused) + * @param value value in the hash map (the 'struct ConnectedPeer*' to change) + * @return GNUNET_YES (we should continue to iterate) + */ +static int +remove_client_from_last_client_replies (void *cls, + const GNUNET_HashCode * key, + void *value) +{ + struct GNUNET_SERVER_Client *client = cls; + struct ConnectedPeer *cp = value; + unsigned int i; + + for (i=0;ilast_client_replies[i] == client) + { + GNUNET_SERVER_client_drop (cp->last_client_replies[i]); + cp->last_client_replies[i] = NULL; + } + } + return GNUNET_YES; +} + + +/** + * A client disconnected. Remove all of its pending queries. + * + * @param cls closure, NULL + * @param client identification of the client + */ +static void +handle_client_disconnect (void *cls, + struct GNUNET_SERVER_Client + * client) +{ + struct ClientList *pos; + struct ClientList *prev; + struct ClientRequestList *rcl; + struct ClientResponseMessage *creply; + + if (client == NULL) + return; /* huh? is this allowed? */ + prev = NULL; + pos = client_list; + while ( (NULL != pos) && + (pos->client != client) ) + { + prev = pos; + pos = pos->next; + } + if (pos == NULL) + return; /* no requests pending for this client */ + while (NULL != (rcl = pos->rl_head)) + destroy_pending_request (rcl->req); + if (prev == NULL) + client_list = pos->next; + else + prev->next = pos->next; + if (pos->th != NULL) + { + GNUNET_CONNECTION_notify_transmit_ready_cancel (pos->th); + pos->th = NULL; + } + while (NULL != (creply = pos->res_head)) + { + GNUNET_CONTAINER_DLL_remove (pos->res_head, + pos->res_tail, + creply); + GNUNET_free (creply); + } + GNUNET_SERVER_client_drop (pos->client); + GNUNET_free (pos); + GNUNET_CONTAINER_multihashmap_iterate (connected_peers, + &remove_client_from_last_client_replies, + client); +} + + +/** + * Iterator to free peer entries. + * + * @param cls closure, unused + * @param key current key code + * @param value value in the hash map (peer entry) + * @return GNUNET_YES (we should continue to iterate) + */ +static int +clean_peer (void *cls, + const GNUNET_HashCode * key, + void *value) +{ + peer_disconnect_handler (NULL, (const struct GNUNET_PeerIdentity*) key); + return GNUNET_YES; +} + + +/** + * Task run during shutdown. + * + * @param cls unused + * @param tc unused + */ +static void +shutdown_task (void *cls, + const struct GNUNET_SCHEDULER_TaskContext *tc) +{ + GNUNET_CONTAINER_multihashmap_iterate (connected_peers, + &clean_peer, + NULL); + GNUNET_CONTAINER_multihashmap_destroy (connected_peers); + connected_peers = NULL; + while (client_list != NULL) + handle_client_disconnect (NULL, + client_list->client); + GNUNET_break (0 == GNUNET_CONTAINER_heap_get_size (requests_by_expiration_heap)); + GNUNET_CONTAINER_heap_destroy (requests_by_expiration_heap); + requests_by_expiration_heap = 0; + GNUNET_break (0 == GNUNET_CONTAINER_multihashmap_size (query_request_map)); + GNUNET_CONTAINER_multihashmap_destroy (query_request_map); + query_request_map = NULL; + GNUNET_break (0 == GNUNET_CONTAINER_multihashmap_size (peer_request_map)); + GNUNET_CONTAINER_multihashmap_destroy (peer_request_map); + peer_request_map = NULL; + GNUNET_assert (NULL != core); + GNUNET_CORE_disconnect (core); + core = NULL; + sched = NULL; + cfg = NULL; +} + + +/* ******************* Utility functions ******************** */ + + +/** + * Transmit the given message by copying it to the target buffer + * "buf". "buf" will be NULL and "size" zero if the socket was closed + * for writing in the meantime. In that case, do nothing + * (the disconnect or shutdown handler will take care of the rest). + * If we were able to transmit messages and there are still more + * pending, ask core again for further calls to this function. + * + * @param cls closure, pointer to the 'struct ConnectedPeer*' + * @param size number of bytes available in buf + * @param buf where the callee should write the message + * @return number of bytes written to buf + */ +static size_t +transmit_to_peer (void *cls, + size_t size, void *buf) +{ + struct ConnectedPeer *cp = cls; + char *cbuf = buf; + struct GNUNET_PeerIdentity pid; + struct PendingMessage *pm; + size_t msize; + + cp->cth = NULL; + if (NULL == buf) + { +#if DEBUG_FS + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Dropping reply, core too busy.\n"); +#endif + return 0; + } + msize = 0; + while ( (NULL != (pm = cp->pending_messages_head) ) && + (pm->msize <= size) ) + { + memcpy (&cbuf[msize], &pm[1], pm->msize); + msize += pm->msize; + size -= pm->msize; + destroy_pending_message (pm, cp->pid); + } + if (NULL != pm) + { + GNUNET_PEER_resolve (cp->pid, + &pid); + cp->cth = GNUNET_CORE_notify_transmit_ready (core, + pm->priority, + GNUNET_CONSTANTS_SERVICE_TIMEOUT, + &pid, + pm->msize, + &transmit_to_peer, + pm); + } + return msize; +} + + +/** + * Add a message to the set of pending messages for the given peer. + * + * @param cp peer to send message to + * @param pm message to queue + * @param pr request on which behalf this message is being queued + */ +static void +add_to_pending_messages_for_peer (struct ConnectedPeer *cp, + struct PendingMessage *pm, + struct PendingRequest *pr) +{ + struct PendingMessage *pos; + struct PendingMessageList *pml; + struct GNUNET_PeerIdentity pid; + + GNUNET_assert (pm->next == NULL); + GNUNET_assert (pm->pml == NULL); + pml = GNUNET_malloc (sizeof (struct PendingMessageList)); + pml->req = pr; + pml->target = cp; + pml->pm = pm; + pm->pml = pml; + GNUNET_CONTAINER_DLL_insert (pr->pending_head, + pr->pending_tail, + pml); + pos = cp->pending_messages_head; + while ( (pos != NULL) && + (pm->priority < pos->priority) ) + pos = pos->next; + GNUNET_CONTAINER_DLL_insert_after (cp->pending_messages_head, + cp->pending_messages_tail, + pos, + pm); + cp->pending_requests++; + if (cp->pending_requests > MAX_QUEUE_PER_PEER) + destroy_pending_message (cp->pending_messages_tail, 0); + if (cp->cth == NULL) + { + /* need to schedule transmission */ + GNUNET_PEER_resolve (cp->pid, &pid); + cp->cth = GNUNET_CORE_notify_transmit_ready (core, + cp->pending_messages_head->priority, + GNUNET_TIME_UNIT_FOREVER_REL, + &pid, + cp->pending_messages_head->msize, + &transmit_to_peer, + cp); + } + if (cp->cth == NULL) + { + /* FIXME: call stats (rare, bad case) */ + } +} + + +/** + * Mingle hash with the mingle_number to produce different bits. + */ +static void +mingle_hash (const GNUNET_HashCode * in, + int32_t mingle_number, GNUNET_HashCode * hc) { GNUNET_HashCode m; @@ -959,6 +1086,48 @@ mingle_hash (const GNUNET_HashCode * in, } +/** + * Test if the load on this peer is too high + * to even consider processing the query at + * all. + * + * @return GNUNET_YES if the load is too high, GNUNET_NO otherwise + */ +static int +test_load_too_high () +{ + return GNUNET_NO; // FIXME +} + + +/* ******************* Pending Request Refresh Task ******************** */ + + +/** + * Function called after we either failed or succeeded + * at transmitting a query to a peer. + * + * @param cls the requests "struct PendingRequest*" + * @param pid ID of receiving peer, 0 on transmission error + */ +static void +transmit_query_continuation (void *cls, + GNUNET_PEER_Id tpid) +{ + struct PendingRequest *pr = cls; + + if (tpid == 0) + return; + GNUNET_PEER_change_rc (tpid, 1); + if (pr->used_pids_off == pr->used_pids_size) + GNUNET_array_grow (pr->used_pids, + pr->used_pids_size, + pr->used_pids_size * 2 + 2); + pr->used_pids[pr->used_pids_off++] = tpid; +} + + +#if 0 /** * How many bytes should a bloomfilter be if we have already seen * entry_count responses? Note that BLOOMFILTER_K gives us the number @@ -1024,75 +1193,15 @@ refresh_bloomfilter (unsigned int count, } return bf; } +#endif /** - * Closure used for "target_peer_select_cb". - */ -struct PeerSelectionContext -{ - /** - * The request for which we are selecting - * peers. - */ - struct PendingRequest *pr; - - /** - * Current "prime" target. - */ - struct GNUNET_PeerIdentity target; - - /** - * How much do we like this target? - */ - double target_score; - -}; - - -/** - * Function called for each connected peer to determine - * which one(s) would make good targets for forwarding. + * We use a random delay to make the timing of requests less + * predictable. This function returns such a random delay. * - * @param cls closure (struct PeerSelectionContext) - * @param key current key code (peer identity) - * @param value value in the hash map (struct ConnectedPeer) - * @return GNUNET_YES if we should continue to - * iterate, - * GNUNET_NO if not. - */ -static int -target_peer_select_cb (void *cls, - const GNUNET_HashCode * key, - void *value) -{ - struct PeerSelectionContext *psc = cls; - struct ConnectedPeer *cp = value; - struct PendingRequest *pr = psc->pr; - double score; - unsigned int i; - - /* 1) check if we have already (recently) forwarded to this peer */ - for (i=0;iused_pids_off;i++) - if (pr->used_pids[i] == cp->pid) - return GNUNET_YES; /* skip */ - // 2) calculate how much we'd like to forward to this peer - score = 0; // FIXME! - - /* store best-fit in closure */ - if (score > psc->target_score) - { - psc->target_score = score; - psc->target.hashPubKey = *key; - } - return GNUNET_YES; -} - - -/** - * We use a random delay to make the timing of requests - * less predictable. This function returns such a random - * delay. + * FIXME: make schedule dependent on the specifics of the request? + * Or bandwidth and number of connected peers and load? * * @return random delay to use for some request, between 0 and TTL_DECREMENT ms */ @@ -1106,79 +1215,16 @@ get_processing_delay () /** - * Task that is run for each request with the goal of forwarding the - * associated query to other peers. The task should re-schedule - * itself to be re-run once the TTL has expired. (or at a later time - * if more peers should be queried earlier). + * Function called after we've tried to reserve a certain amount of + * bandwidth for a reply. Check if we succeeded and if so send our + * query. * * @param cls the requests "struct PendingRequest*" - * @param tc task context (unused) - */ -static void -forward_request_task (void *cls, - const struct GNUNET_SCHEDULER_TaskContext *tc); - - -/** - * We've selected a peer for forwarding of a query. Construct the - * message and then re-schedule the task to forward again to (other) - * peers. - * - * @param cls closure - * @param size number of bytes available in buf - * @param buf where the callee should write the message - * @return number of bytes written to buf - */ -static size_t -transmit_request_cb (void *cls, - size_t size, - void *buf) -{ - struct ConnectedPeer *cp = cls; - char *cbuf = buf; - struct GNUNET_PeerIdentity target; - struct PendingMessage *pr; - size_t tot; - - cp->cth = NULL; - tot = 0; - while ( (NULL != (pr = cp->pending_messages)) && - (pr->msize < size - tot) ) - { - memcpy (&cbuf[tot], - &pr[1], - pr->msize); - tot += pr->msize; - cp->pending_messages = pr->next; - GNUNET_free (pr); - } - if (NULL != pr) - { - GNUNET_PEER_resolve (cp->pid, - &target); - cp->cth = GNUNET_CORE_notify_transmit_ready (core, - pr->priority, - GNUNET_TIME_UNIT_FOREVER_REL, - &target, - pr->msize, - &transmit_request_cb, - cp); - } - return tot; -} - - -/** - * Function called after we've tried to reserve a certain amount of - * bandwidth for a reply. Check if we succeeded and if so send our - * query. - * - * @param cls the requests "struct PendingRequest*" - * @param peer identifies the peer - * @param bpm_in set to the current bandwidth limit (receiving) for this peer - * @param bpm_out set to the current bandwidth limit (sending) for this peer - * @param amount set to the amount that was actually reserved or unreserved - * @param preference current traffic preference for the given peer + * @param peer identifies the peer + * @param bpm_in set to the current bandwidth limit (receiving) for this peer + * @param bpm_out set to the current bandwidth limit (sending) for this peer + * @param amount set to the amount that was actually reserved or unreserved + * @param preference current traffic preference for the given peer */ static void target_reservation_cb (void *cls, @@ -1192,18 +1238,12 @@ target_reservation_cb (void *cls, struct PendingRequest *pr = cls; struct ConnectedPeer *cp; struct PendingMessage *pm; - struct PendingMessage *pos; - struct PendingMessage *prev; struct GetMessage *gm; GNUNET_HashCode *ext; char *bfdata; size_t msize; unsigned int k; - pr->task = GNUNET_SCHEDULER_add_delayed (sched, - get_processing_delay (), // FIXME: longer? - &forward_request_task, - pr); pr->irc = NULL; GNUNET_assert (peer != NULL); if (amount != DBLOCK_SIZE) @@ -1211,15 +1251,12 @@ target_reservation_cb (void *cls, /* FIXME: call stats... */ return; /* this target round failed */ } - // (2) transmit, update ttl/priority + // (3) transmit, update ttl/priority cp = GNUNET_CONTAINER_multihashmap_get (connected_peers, &peer->hashPubKey); if (cp == NULL) { - /* Peer must have just left; try again immediately */ - pr->task = GNUNET_SCHEDULER_add_now (sched, - &forward_request_task, - pr); + /* Peer must have just left */ return; } /* build message and insert message into priority queue */ @@ -1228,7 +1265,6 @@ target_reservation_cb (void *cls, GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE); pm = GNUNET_malloc (sizeof (struct PendingMessage) + msize); pm->msize = msize; - pm->priority = 0; // FIXME: calculate priority properly! gm = (struct GetMessage*) &pm[1]; gm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_GET); gm->header.size = htons (msize); @@ -1236,784 +1272,570 @@ target_reservation_cb (void *cls, pr->remaining_priority /= 2; gm->priority = htonl (pr->remaining_priority); gm->ttl = htonl (pr->ttl); - gm->filter_mutator = htonl(pr->mingle); - gm->hash_bitmap = htonl (42); + gm->filter_mutator = htonl(pr->mingle); // FIXME: bad endianess conversion? + gm->hash_bitmap = htonl (42); // FIXME! gm->query = pr->query; ext = (GNUNET_HashCode*) &gm[1]; - // FIXME: setup "ext[0]..[k-1]" bfdata = (char *) &ext[k]; if (pr->bf != NULL) GNUNET_CONTAINER_bloomfilter_get_raw_data (pr->bf, bfdata, pr->bf_size); + pm->cont = &transmit_query_continuation; + pm->cont_cls = pr; + add_to_pending_messages_for_peer (cp, pm, pr); +} - prev = NULL; - pos = cp->pending_messages; - while ( (pos != NULL) && - (pm->priority < pos->priority) ) - { - prev = pos; - pos = pos->next; - } - if (prev == NULL) - cp->pending_messages = pm; - else - prev->next = pm; - pm->next = pos; - if (cp->cth == NULL) - cp->cth = GNUNET_CORE_notify_transmit_ready (core, - cp->pending_messages->priority, - GNUNET_TIME_UNIT_FOREVER_REL, - peer, - msize, - &transmit_request_cb, - cp); - if (cp->cth == NULL) + +/** + * Closure used for "target_peer_select_cb". + */ +struct PeerSelectionContext +{ + /** + * The request for which we are selecting + * peers. + */ + struct PendingRequest *pr; + + /** + * Current "prime" target. + */ + struct GNUNET_PeerIdentity target; + + /** + * How much do we like this target? + */ + double target_score; + +}; + + +/** + * Function called for each connected peer to determine + * which one(s) would make good targets for forwarding. + * + * @param cls closure (struct PeerSelectionContext) + * @param key current key code (peer identity) + * @param value value in the hash map (struct ConnectedPeer) + * @return GNUNET_YES if we should continue to + * iterate, + * GNUNET_NO if not. + */ +static int +target_peer_select_cb (void *cls, + const GNUNET_HashCode * key, + void *value) +{ + struct PeerSelectionContext *psc = cls; + struct ConnectedPeer *cp = value; + struct PendingRequest *pr = psc->pr; + double score; + unsigned int i; + + /* 1) check if we have already (recently) forwarded to this peer */ + for (i=0;iused_pids_off;i++) + if (pr->used_pids[i] == cp->pid) + return GNUNET_YES; /* skip */ + // 2) calculate how much we'd like to forward to this peer + score = 42; // FIXME! + // FIXME: also need API to gather data on responsiveness + // of this peer (we have fields for that in 'cp', but + // they are never set!) + + /* store best-fit in closure */ + if (score > psc->target_score) { - /* technically, this should not be a 'break'; but - we don't handle this (rare) case yet, so let's warn - about it... */ - GNUNET_break (0); - // FIXME: now what? + psc->target_score = score; + psc->target.hashPubKey = *key; } + return GNUNET_YES; } - + /** - * Task that is run for each request with the goal of forwarding the - * associated query to other peers. The task should re-schedule - * itself to be re-run once the TTL has expired. (or at a later time - * if more peers should be queried earlier). + * We're processing a GET request from another peer and have decided + * to forward it to other peers. This function is called periodically + * and should forward the request to other peers until we have all + * possible replies. If we have transmitted the *only* reply to + * the initiator we should destroy the pending request. If we have + * many replies in the queue to the initiator, we should delay sending + * out more queries until the reply queue has shrunk some. * - * @param cls the requests "struct PendingRequest*" - * @param tc task context (unused) + * @param cls our "struct ProcessGetContext *" + * @param tc unused */ static void forward_request_task (void *cls, - const struct GNUNET_SCHEDULER_TaskContext *tc) + const struct GNUNET_SCHEDULER_TaskContext *tc) { struct PendingRequest *pr = cls; struct PeerSelectionContext psc; + struct ConnectedPeer *cp; - pr->task = GNUNET_SCHEDULER_NO_TASK; + pr->task = GNUNET_SCHEDULER_add_delayed (sched, + get_processing_delay (), + &forward_request_task, + pr); + if (pr->irc != NULL) + return; /* previous request still pending */ /* (1) select target */ psc.pr = pr; psc.target_score = DBL_MIN; GNUNET_CONTAINER_multihashmap_iterate (connected_peers, &target_peer_select_cb, - &psc); + &psc); if (psc.target_score == DBL_MIN) - { - /* no possible target found, wait some time */ - pr->task = GNUNET_SCHEDULER_add_delayed (sched, - get_processing_delay (), // FIXME: exponential back-off? or at least wait longer... - &forward_request_task, - pr); - return; - } + return; /* nobody selected */ + /* (2) reserve reply bandwidth */ - GNUNET_assert (NULL == pr->irc); + cp = GNUNET_CONTAINER_multihashmap_get (connected_peers, + &psc.target.hashPubKey); pr->irc = GNUNET_CORE_peer_change_preference (sched, cfg, &psc.target, GNUNET_CONSTANTS_SERVICE_TIMEOUT, - -1, - DBLOCK_SIZE, // FIXME: make dependent on type? - 0, + (uint32_t) -1 /* no limit */, + DBLOCK_SIZE, + (uint64_t) cp->inc_preference, &target_reservation_cb, pr); + cp->inc_preference = 0.0; } -/** - * We're processing (local) results for a search request - * from a (local) client. Pass applicable results to the - * client and if we are done either clean up (operation - * complete) or switch to P2P search (more results possible). - * - * @param cls our closure (struct LocalGetContext) - * @param key key for the content - * @param size number of bytes in data - * @param data content stored - * @param type type of the content - * @param priority priority of the content - * @param anonymity anonymity-level for the content - * @param expiration expiration time for the content - * @param uid unique identifier for the datum; - * maybe 0 if no unique identifier is available - */ -static void -process_local_get_result (void *cls, - const GNUNET_HashCode * key, - uint32_t size, - const void *data, - uint32_t type, - uint32_t priority, - uint32_t anonymity, - struct GNUNET_TIME_Absolute - expiration, - uint64_t uid) -{ - struct LocalGetContext *lgc = cls; - struct PendingRequest *pr; - struct ClientRequestList *crl; - struct ClientList *cl; - size_t msize; - unsigned int i; - - if (key == NULL) - { -#if DEBUG_FS - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Received last result for `%s' from local datastore, deciding what to do next.\n", - GNUNET_h2s (&lgc->query)); -#endif - /* no further results from datastore; continue - processing further requests from the client and - allow the next task to use the datastore; also, - switch to P2P requests or clean up our state. */ - next_ds_request (); - GNUNET_SERVER_receive_done (lgc->client, - GNUNET_OK); - if ( (lgc->results_used == 0) || - (lgc->type == GNUNET_DATASTORE_BLOCKTYPE_KBLOCK) || - (lgc->type == GNUNET_DATASTORE_BLOCKTYPE_SBLOCK) || - (lgc->type == GNUNET_DATASTORE_BLOCKTYPE_SKBLOCK) ) - { -#if DEBUG_FS - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Forwarding query for `%s' to network.\n", - GNUNET_h2s (&lgc->query)); -#endif - cl = clients; - while ( (NULL != cl) && - (cl->client != lgc->client) ) - cl = cl->next; - if (cl == NULL) - { - cl = GNUNET_malloc (sizeof (struct ClientList)); - cl->client = lgc->client; - cl->next = clients; - clients = cl; - } - crl = GNUNET_malloc (sizeof (struct ClientRequestList)); - crl->cl = cl; - GNUNET_CONTAINER_DLL_insert (cl->head, cl->tail, crl); - pr = GNUNET_malloc (sizeof (struct PendingRequest)); - pr->client = lgc->client; - GNUNET_SERVER_client_keep (pr->client); - pr->crl_entry = crl; - crl->req = pr; - if (lgc->type == GNUNET_DATASTORE_BLOCKTYPE_SBLOCK) - { - pr->namespace = GNUNET_malloc (sizeof (GNUNET_HashCode)); - *pr->namespace = lgc->namespace; - } - pr->replies_seen = lgc->results; - lgc->results = NULL; - pr->start_time = GNUNET_TIME_absolute_get (); - pr->query = lgc->query; - pr->target_pid = GNUNET_PEER_intern (&lgc->target); - pr->replies_seen_off = lgc->results_used; - pr->replies_seen_size = lgc->results_size; - lgc->results_size = 0; - pr->type = lgc->type; - pr->anonymity_level = lgc->anonymity_level; - pr->bf = refresh_bloomfilter (pr->replies_seen_off, - &pr->mingle, - &pr->bf_size, - pr->replies_seen); - GNUNET_CONTAINER_multihashmap_put (requests_by_query, - &pr->query, - pr, - GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); - pr->task = GNUNET_SCHEDULER_add_delayed (sched, - get_processing_delay (), - &forward_request_task, - pr); - local_get_context_free (lgc); - return; - } - /* got all possible results, clean up! */ -#if DEBUG_FS - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Found all possible results for query for `%s', done!\n", - GNUNET_h2s (&lgc->query)); -#endif - local_get_context_free (lgc); - return; - } - if (type == GNUNET_DATASTORE_BLOCKTYPE_ONDEMAND) - { -#if DEBUG_FS - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Received on-demand block for `%s' from local datastore, fetching data.\n", - GNUNET_h2s (&lgc->query)); -#endif - GNUNET_FS_handle_on_demand_block (key, size, data, type, priority, - anonymity, expiration, uid, - dsh, - &process_local_get_result, - lgc); - return; - } - if ( (type != lgc->type) && - (lgc->type != GNUNET_DATASTORE_BLOCKTYPE_ANY) ) - { - /* this should be virtually impossible to reach (DBLOCK - query hash being identical to KBLOCK/SBLOCK query hash); - nevertheless, if it happens, the correct thing is to - simply skip the result. */ -#if DEBUG_FS - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Received block of unexpected type (%u, want %u) for `%s' from local datastore, ignoring.\n", - type, - lgc->type, - GNUNET_h2s (&lgc->query)); -#endif - GNUNET_DATASTORE_get_next (dsh, GNUNET_YES); - return; - } - /* check if this is a result we've alredy - received */ - for (i=0;iresults_used;i++) - if (0 == memcmp (key, - &lgc->results[i], - sizeof (GNUNET_HashCode))) - { -#if DEBUG_FS - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Received duplicate result for `%s' from local datastore, ignoring.\n", - GNUNET_h2s (&lgc->query)); -#endif - GNUNET_DATASTORE_get_next (dsh, GNUNET_YES); - return; - } - if (lgc->results_used == lgc->results_size) - GNUNET_array_grow (lgc->results, - lgc->results_size, - lgc->results_size * 2 + 2); - GNUNET_CRYPTO_hash (data, - size, - &lgc->results[lgc->results_used++]); - msize = size + sizeof (struct ContentMessage); - GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE); - lgc->result = GNUNET_malloc (msize); - lgc->result->header.size = htons (msize); - lgc->result->header.type = htons (GNUNET_MESSAGE_TYPE_FS_CONTENT); - lgc->result->type = htonl (type); - lgc->result->expiration = GNUNET_TIME_absolute_hton (expiration); - memcpy (&lgc->result[1], - data, - size); -#if DEBUG_FS - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Received new result for `%s' from local datastore, passing to client.\n", - GNUNET_h2s (&lgc->query)); -#endif - GNUNET_SERVER_notify_transmit_ready (lgc->client, - msize, - GNUNET_TIME_UNIT_FOREVER_REL, - &transmit_local_result, - lgc); -} - - -/** - * We're processing a search request from a local - * client. Now it is our turn to query the datastore. - * - * @param cls our closure (struct LocalGetContext) - * @param tc unused - */ -static void -transmit_local_get (void *cls, - const struct GNUNET_SCHEDULER_TaskContext *tc) -{ - struct LocalGetContext *lgc = cls; - uint32_t type; - - type = lgc->type; - if (type == GNUNET_DATASTORE_BLOCKTYPE_DBLOCK) - type = GNUNET_DATASTORE_BLOCKTYPE_ANY; /* to get on-demand as well */ - GNUNET_DATASTORE_get (dsh, - &lgc->query, - type, - &process_local_get_result, - lgc, - GNUNET_TIME_UNIT_FOREVER_REL); -} - - -/** - * We're processing a search request from a local - * client. Now it is our turn to query the datastore. - * - * @param cls our closure (struct LocalGetContext) - * @param ok did we succeed to queue for datastore access, should always be GNUNET_OK - */ -static void -transmit_local_get_ready (void *cls, - int ok) -{ - struct LocalGetContext *lgc = cls; - - GNUNET_assert (GNUNET_OK == ok); - GNUNET_SCHEDULER_add_continuation (sched, - &transmit_local_get, - lgc, - GNUNET_SCHEDULER_REASON_PREREQ_DONE); -} +/* **************************** P2P PUT Handling ************************ */ /** - * Handle START_SEARCH-message (search request from client). + * Function called after we either failed or succeeded + * at transmitting a reply to a peer. * - * @param cls closure - * @param client identification of the client - * @param message the actual message + * @param cls the requests "struct PendingRequest*" + * @param pid ID of receiving peer, 0 on transmission error */ static void -handle_start_search (void *cls, - struct GNUNET_SERVER_Client *client, - const struct GNUNET_MessageHeader *message) +transmit_reply_continuation (void *cls, + GNUNET_PEER_Id tpid) { - const struct SearchMessage *sm; - struct LocalGetContext *lgc; - uint16_t msize; - unsigned int sc; - - msize = ntohs (message->size); - if ( (msize < sizeof (struct SearchMessage)) || - (0 != (msize - sizeof (struct SearchMessage)) % sizeof (GNUNET_HashCode)) ) - { - GNUNET_break (0); - GNUNET_SERVER_receive_done (client, - GNUNET_SYSERR); - return; - } - sc = (msize - sizeof (struct SearchMessage)) / sizeof (GNUNET_HashCode); - sm = (const struct SearchMessage*) message; - GNUNET_SERVER_client_keep (client); - lgc = GNUNET_malloc (sizeof (struct LocalGetContext)); - if (sc > 0) - { - lgc->results_used = sc; - GNUNET_array_grow (lgc->results, - lgc->results_size, - sc * 2); - memcpy (lgc->results, - &sm[1], - sc * sizeof (GNUNET_HashCode)); - } - lgc->client = client; - lgc->type = ntohl (sm->type); - lgc->anonymity_level = ntohl (sm->anonymity_level); - switch (lgc->type) + struct PendingRequest *pr = cls; + + switch (pr->type) { case GNUNET_DATASTORE_BLOCKTYPE_DBLOCK: case GNUNET_DATASTORE_BLOCKTYPE_IBLOCK: - lgc->target.hashPubKey = sm->target; + /* only one reply expected, done with the request! */ + destroy_pending_request (pr); break; + case GNUNET_DATASTORE_BLOCKTYPE_KBLOCK: case GNUNET_DATASTORE_BLOCKTYPE_SBLOCK: - lgc->namespace = sm->target; break; default: + GNUNET_break (0); break; } - lgc->query = sm->query; - GNUNET_CONTAINER_DLL_insert (lgc_head, lgc_tail, lgc); - lgc->req = queue_ds_request (GNUNET_TIME_UNIT_FOREVER_REL, - &transmit_local_get_ready, - lgc); } /** - * List of handlers for the messages understood by this - * service. - */ -static struct GNUNET_SERVER_MessageHandler handlers[] = { - {&GNUNET_FS_handle_index_start, NULL, - GNUNET_MESSAGE_TYPE_FS_INDEX_START, 0}, - {&GNUNET_FS_handle_index_list_get, NULL, - GNUNET_MESSAGE_TYPE_FS_INDEX_LIST_GET, sizeof(struct GNUNET_MessageHeader) }, - {&GNUNET_FS_handle_unindex, NULL, GNUNET_MESSAGE_TYPE_FS_UNINDEX, - sizeof (struct UnindexMessage) }, - {&handle_start_search, NULL, GNUNET_MESSAGE_TYPE_FS_START_SEARCH, - 0 }, - {NULL, NULL, 0, 0} -}; - - -/** - * Clean up the memory used by the PendingRequest structure (except - * for the client or peer list that the request may be part of). + * Check if the given KBlock is well-formed. * - * @param pr request to clean up + * @param kb the kblock data (or at least "dsize" bytes claiming to be one) + * @param dsize size of "kb" in bytes; check for < sizeof(struct KBlock)! + * @param query where to store the query that this block answers + * @return GNUNET_OK if this is actually a well-formed KBlock */ -static void -destroy_pending_request (struct PendingRequest *pr) +static int +check_kblock (const struct KBlock *kb, + size_t dsize, + GNUNET_HashCode *query) { - struct PendingMessage *reply; - struct ClientList *cl; - - GNUNET_CONTAINER_multihashmap_remove (requests_by_query, - &pr->query, - pr); - // FIXME: not sure how this can work (efficiently) - // also, what does the return value mean? - if (pr->irc != NULL) - { - GNUNET_CORE_peer_change_preference_cancel (pr->irc); - pr->irc = NULL; - } - if (pr->client == NULL) + if (dsize < sizeof (struct KBlock)) { - GNUNET_CONTAINER_heap_remove_node (requests_by_expiration, - pr->hnode); + GNUNET_break_op (0); + return GNUNET_SYSERR; } - else + if (dsize - sizeof (struct KBlock) != + ntohs (kb->purpose.size) + - sizeof (struct GNUNET_CRYPTO_RsaSignaturePurpose) + - sizeof(struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded) ) { - cl = pr->crl_entry->cl; - GNUNET_CONTAINER_DLL_remove (cl->head, - cl->tail, - pr->crl_entry); + GNUNET_break_op (0); + return GNUNET_SYSERR; } - if (GNUNET_SCHEDULER_NO_TASK != pr->task) - GNUNET_SCHEDULER_cancel (sched, pr->task); - if (NULL != pr->bf) - GNUNET_CONTAINER_bloomfilter_free (pr->bf); - if (NULL != pr->th) - GNUNET_CONNECTION_notify_transmit_ready_cancel (pr->th); - while (NULL != (reply = pr->replies_pending)) + if (GNUNET_OK != + GNUNET_CRYPTO_rsa_verify (GNUNET_SIGNATURE_PURPOSE_FS_KBLOCK, + &kb->purpose, + &kb->signature, + &kb->keyspace)) { - pr->replies_pending = reply->next; - GNUNET_free (reply); + GNUNET_break_op (0); + return GNUNET_SYSERR; } - if (NULL != pr->cth) - GNUNET_CORE_notify_transmit_ready_cancel (pr->cth); - GNUNET_PEER_change_rc (pr->source_pid, -1); - GNUNET_PEER_change_rc (pr->target_pid, -1); - GNUNET_PEER_decrement_rcs (pr->used_pids, pr->used_pids_off); - GNUNET_free_non_null (pr->used_pids); - GNUNET_free_non_null (pr->replies_seen); - GNUNET_free_non_null (pr->namespace); - GNUNET_free (pr); + if (query != NULL) + GNUNET_CRYPTO_hash (&kb->keyspace, + sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded), + query); + return GNUNET_OK; } /** - * A client disconnected. Remove all of its pending queries. + * Check if the given SBlock is well-formed. * - * @param cls closure, NULL - * @param client identification of the client + * @param sb the sblock data (or at least "dsize" bytes claiming to be one) + * @param dsize size of "kb" in bytes; check for < sizeof(struct SBlock)! + * @param query where to store the query that this block answers + * @param namespace where to store the namespace that this block belongs to + * @return GNUNET_OK if this is actually a well-formed SBlock */ -static void -handle_client_disconnect (void *cls, - struct GNUNET_SERVER_Client - * client) +static int +check_sblock (const struct SBlock *sb, + size_t dsize, + GNUNET_HashCode *query, + GNUNET_HashCode *namespace) { - struct LocalGetContext *lgc; - struct ClientList *cpos; - struct ClientList *cprev; - struct ClientRequestList *rl; - - if (client == NULL) - return; - lgc = lgc_head; - while ( (NULL != lgc) && - (lgc->client != client) ) - lgc = lgc->next; - if (lgc != NULL) - local_get_context_free (lgc); - cprev = NULL; - cpos = clients; - while ( (NULL != cpos) && - (clients->client != client) ) + if (dsize < sizeof (struct SBlock)) { - cprev = cpos; - cpos = cpos->next; + GNUNET_break_op (0); + return GNUNET_SYSERR; } - if (cpos != NULL) + if (dsize != + ntohs (sb->purpose.size) + sizeof (struct GNUNET_CRYPTO_RsaSignature)) { - if (cprev == NULL) - clients = cpos->next; - else - cprev->next = cpos->next; - while (NULL != (rl = cpos->head)) - { - cpos->head = rl->next; - destroy_pending_request (rl->req); - GNUNET_free (rl); - } - GNUNET_free (cpos); + GNUNET_break_op (0); + return GNUNET_SYSERR; } + if (GNUNET_OK != + GNUNET_CRYPTO_rsa_verify (GNUNET_SIGNATURE_PURPOSE_FS_SBLOCK, + &sb->purpose, + &sb->signature, + &sb->subspace)) + { + GNUNET_break_op (0); + return GNUNET_SYSERR; + } + if (query != NULL) + *query = sb->identifier; + if (namespace != NULL) + GNUNET_CRYPTO_hash (&sb->subspace, + sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded), + namespace); + return GNUNET_OK; } /** - * Iterator over entries in the "requests_by_query" map - * that frees all the entries. - * - * @param cls closure, NULL - * @param key current key code (the query, unused) - * @param value value in the hash map, of type "struct PendingRequest*" - * @return GNUNET_YES (we should continue to iterate) - */ -static int -destroy_pending_request_cb (void *cls, - const GNUNET_HashCode * key, - void *value) -{ - struct PendingRequest *pr = value; - - destroy_pending_request (pr); - return GNUNET_YES; -} - - -/** - * Task run during shutdown. + * Transmit the given message by copying it to the target buffer + * "buf". "buf" will be NULL and "size" zero if the socket was closed + * for writing in the meantime. In that case, do nothing + * (the disconnect or shutdown handler will take care of the rest). + * If we were able to transmit messages and there are still more + * pending, ask core again for further calls to this function. * - * @param cls unused - * @param tc unused + * @param cls closure, pointer to the 'struct ClientList*' + * @param size number of bytes available in buf + * @param buf where the callee should write the message + * @return number of bytes written to buf */ -static void -shutdown_task (void *cls, - const struct GNUNET_SCHEDULER_TaskContext *tc) +static size_t +transmit_to_client (void *cls, + size_t size, void *buf) { - if (NULL != core) - { - GNUNET_CORE_disconnect (core); - core = NULL; - } - if (NULL != dsh) + struct ClientList *cl = cls; + char *cbuf = buf; + struct ClientResponseMessage *creply; + size_t msize; + + cl->th = NULL; + if (NULL == buf) { - GNUNET_DATASTORE_disconnect (dsh, - GNUNET_NO); - dsh = NULL; +#if DEBUG_FS + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Not sending reply, client communication problem.\n"); +#endif + return 0; } - GNUNET_CONTAINER_multihashmap_iterate (requests_by_query, - &destroy_pending_request_cb, - NULL); - while (clients != NULL) - handle_client_disconnect (NULL, - clients->client); - GNUNET_CONTAINER_multihashmap_destroy (requests_by_query); - requests_by_query = NULL; - GNUNET_CONTAINER_multihashmap_destroy (requests_by_peer); - requests_by_peer = NULL; - GNUNET_CONTAINER_heap_destroy (requests_by_expiration); - requests_by_expiration = NULL; - // FIXME: iterate over entries and free individually? - // (or do we get disconnect notifications?) - GNUNET_CONTAINER_multihashmap_destroy (connected_peers); - connected_peers = NULL; + msize = 0; + while ( (NULL != (creply = cl->res_head) ) && + (creply->msize <= size) ) + { + memcpy (&cbuf[msize], &creply[1], creply->msize); + msize += creply->msize; + size -= creply->msize; + GNUNET_CONTAINER_DLL_remove (cl->res_head, + cl->res_tail, + creply); + GNUNET_free (creply); + } + if (NULL != creply) + cl->th = GNUNET_SERVER_notify_transmit_ready (cl->client, + creply->msize, + GNUNET_TIME_UNIT_FOREVER_REL, + &transmit_to_client, + cl); + return msize; } /** - * Free (each) request made by the peer. - * - * @param cls closure, points to peer that the request belongs to - * @param key current key code - * @param value value in the hash map - * @return GNUNET_YES (we should continue to iterate) + * Closure for "process_reply" function. */ -static int -destroy_request (void *cls, - const GNUNET_HashCode * key, - void *value) +struct ProcessReplyClosure { - const struct GNUNET_PeerIdentity * peer = cls; - struct PendingRequest *pr = value; - - GNUNET_CONTAINER_multihashmap_remove (requests_by_peer, - &peer->hashPubKey, - pr); - destroy_pending_request (pr); - return GNUNET_YES; -} + /** + * The data for the reply. + */ + const void *data; + // FIXME: add 'struct ConnectedPeer' to track 'last_xxx_replies' here! + /** + * When the reply expires. + */ + struct GNUNET_TIME_Absolute expiration; -/** - * Method called whenever a given peer connects. - * - * @param cls closure, not used - * @param peer peer identity this notification is about - * @param latency reported latency of the connection with 'other' - * @param distance reported distance (DV) to 'other' - */ -static void -peer_connect_handler (void *cls, - const struct - GNUNET_PeerIdentity * peer, - struct GNUNET_TIME_Relative latency, - uint32_t distance) -{ - struct ConnectedPeer *cp; + /** + * Size of data. + */ + size_t size; - cp = GNUNET_malloc (sizeof (struct ConnectedPeer)); - cp->pid = GNUNET_PEER_intern (peer); - GNUNET_CONTAINER_multihashmap_put (connected_peers, - &peer->hashPubKey, - cp, - GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY); -} + /** + * Namespace that this reply belongs to + * (if it is of type SBLOCK). + */ + GNUNET_HashCode namespace; + + /** + * Type of the block. + */ + uint32_t type; + + /** + * How much was this reply worth to us? + */ + uint32_t priority; +}; /** - * Method called whenever a peer disconnects. + * We have received a reply; handle it! * - * @param cls closure, not used - * @param peer peer identity this notification is about + * @param cls response (struct ProcessReplyClosure) + * @param key our query + * @param value value in the hash map (info about the query) + * @return GNUNET_YES (we should continue to iterate) */ -static void -peer_disconnect_handler (void *cls, - const struct - GNUNET_PeerIdentity * peer) +static int +process_reply (void *cls, + const GNUNET_HashCode * key, + void *value) { + struct ProcessReplyClosure *prq = cls; + struct PendingRequest *pr = value; + struct PendingMessage *reply; + struct ClientResponseMessage *creply; + struct ClientList *cl; + struct PutMessage *pm; + struct ContentMessage *cm; struct ConnectedPeer *cp; - struct PendingMessage *pm; + GNUNET_HashCode chash; + GNUNET_HashCode mhash; + size_t msize; + uint32_t prio; - cp = GNUNET_CONTAINER_multihashmap_get (connected_peers, - &peer->hashPubKey); - if (cp != NULL) + + GNUNET_CRYPTO_hash (prq->data, + prq->size, + &chash); + switch (prq->type) { - GNUNET_PEER_change_rc (cp->pid, -1); - GNUNET_PEER_decrement_rcs (cp->last_p2p_replies, P2P_SUCCESS_LIST_SIZE); - if (NULL != cp->cth) - GNUNET_CORE_notify_transmit_ready_cancel (cp->cth); - while (NULL != (pm = cp->pending_messages)) + case GNUNET_DATASTORE_BLOCKTYPE_DBLOCK: + case GNUNET_DATASTORE_BLOCKTYPE_IBLOCK: + /* only possible reply, stop requesting! */ + while (NULL != pr->pending_head) + destroy_pending_message_list_entry (pr->pending_head); + GNUNET_break (GNUNET_YES == + GNUNET_CONTAINER_multihashmap_remove (query_request_map, + key, + prq)); + break; + case GNUNET_DATASTORE_BLOCKTYPE_SBLOCK: + if (0 != memcmp (pr->namespace, + &prq->namespace, + sizeof (GNUNET_HashCode))) + return GNUNET_YES; /* wrong namespace */ + /* then: fall-through! */ + case GNUNET_DATASTORE_BLOCKTYPE_KBLOCK: + if (pr->bf != NULL) { - cp->pending_messages = pm->next; - GNUNET_free (pm); + mingle_hash (&chash, pr->mingle, &mhash); + if (GNUNET_YES == GNUNET_CONTAINER_bloomfilter_test (pr->bf, + &mhash)) + return GNUNET_YES; /* duplicate */ + GNUNET_CONTAINER_bloomfilter_add (pr->bf, + &mhash); } - GNUNET_free (cp); + if (pr->client_request_list != NULL) + { + cl = pr->client_request_list->client_list; + if (pr->replies_seen_size == pr->replies_seen_off) + { + GNUNET_array_grow (pr->replies_seen, + pr->replies_seen_size, + pr->replies_seen_size * 2 + 4); + // FIXME: recalculate BF! + } + pr->replies_seen[pr->replies_seen_off++] = chash; + } + break; + case GNUNET_DATASTORE_BLOCKTYPE_SKBLOCK: + // FIXME: any checks against duplicates for SKBlocks? + break; + default: + GNUNET_break (0); + return GNUNET_YES; } - GNUNET_CONTAINER_multihashmap_get_multiple (requests_by_peer, - &peer->hashPubKey, - &destroy_request, - (void*) peer); + prio = pr->priority; + prq->priority += pr->remaining_priority; + pr->remaining_priority = 0; + if (pr->client_request_list != NULL) + { + msize = sizeof (struct PutMessage) + prq->size; + creply = GNUNET_malloc (msize + sizeof (struct ClientResponseMessage)); + creply->msize = msize; + creply->client_list = cl; + GNUNET_CONTAINER_DLL_insert_after (cl->res_head, + cl->res_tail, + cl->res_tail, + creply); + pm = (struct PutMessage*) &creply[1]; + pm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT); + pm->header.size = htons (msize); + pm->type = htonl (prq->type); + pm->expiration = GNUNET_TIME_relative_hton (GNUNET_TIME_absolute_get_remaining (prq->expiration)); + memcpy (&creply[1], prq->data, prq->size); + if (NULL == cl->th) + cl->th = GNUNET_SERVER_notify_transmit_ready (cl->client, + msize, + GNUNET_TIME_UNIT_FOREVER_REL, + &transmit_to_client, + cl); + GNUNET_break (cl->th != NULL); + } + else + { + cp = pr->pht_entry->cp; + msize = sizeof (struct ContentMessage) + prq->size; + reply = GNUNET_malloc (msize + sizeof (struct PendingMessage)); + reply->cont = &transmit_reply_continuation; + reply->cont_cls = pr; + reply->msize = msize; + reply->priority = (uint32_t) -1; /* send replies first! */ + cm = (struct ContentMessage*) &reply[1]; + cm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_CONTENT); + cm->header.size = htons (msize); + cm->type = htonl (prq->type); + cm->expiration = GNUNET_TIME_absolute_hton (prq->expiration); + memcpy (&reply[1], prq->data, prq->size); + add_to_pending_messages_for_peer (cp, reply, pr); + } + + + // FIXME: implement hot-path routing statistics keeping! + return GNUNET_YES; } /** - * We're processing a GET request from another peer and have decided - * to forward it to other peers. + * Handle P2P "PUT" message. * - * @param cls our "struct ProcessGetContext *" - * @param tc unused + * @param cls closure, always NULL + * @param other the other peer involved (sender or receiver, NULL + * for loopback messages where we are both sender and receiver) + * @param message the actual message + * @param latency reported latency of the connection with 'other' + * @param distance reported distance (DV) to 'other' + * @return GNUNET_OK to keep the connection open, + * GNUNET_SYSERR to close it (signal serious error) */ -static void -forward_get_request (void *cls, - const struct GNUNET_SCHEDULER_TaskContext *tc) +static int +handle_p2p_put (void *cls, + const struct GNUNET_PeerIdentity *other, + const struct GNUNET_MessageHeader *message, + struct GNUNET_TIME_Relative latency, + uint32_t distance) { - struct ProcessGetContext *pgc = cls; - struct PendingRequest *pr; - struct PendingRequest *eer; - struct GNUNET_PeerIdentity target; + const struct PutMessage *put; + uint16_t msize; + size_t dsize; + uint32_t type; + struct GNUNET_TIME_Absolute expiration; + GNUNET_HashCode query; + struct ProcessReplyClosure prq; - pr = GNUNET_malloc (sizeof (struct PendingRequest)); - if (GET_MESSAGE_BIT_SKS_NAMESPACE == (GET_MESSAGE_BIT_SKS_NAMESPACE & pgc->bm)) - { - pr->namespace = GNUNET_malloc (sizeof(GNUNET_HashCode)); - *pr->namespace = pgc->namespace; - } - pr->bf = pgc->bf; - pr->bf_size = pgc->bf_size; - pgc->bf = NULL; - pr->start_time = pgc->start_time; - pr->query = pgc->query; - pr->source_pid = GNUNET_PEER_intern (&pgc->reply_to); - if (GET_MESSAGE_BIT_TRANSMIT_TO == (GET_MESSAGE_BIT_TRANSMIT_TO & pgc->bm)) - pr->target_pid = GNUNET_PEER_intern (&pgc->prime_target); - pr->anonymity_level = 1; /* default */ - pr->priority = pgc->priority; - pr->remaining_priority = pr->priority; - pr->mingle = pgc->mingle; - pr->ttl = pgc->ttl; - pr->type = pgc->type; - GNUNET_CONTAINER_multihashmap_put (requests_by_query, - &pr->query, - pr, - GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); - GNUNET_CONTAINER_multihashmap_put (requests_by_peer, - &pgc->reply_to.hashPubKey, - pr, - GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); - pr->hnode = GNUNET_CONTAINER_heap_insert (requests_by_expiration, - pr, - pr->start_time.value + pr->ttl); - if (GNUNET_CONTAINER_heap_get_size (requests_by_expiration) > max_pending_requests) + msize = ntohs (message->size); + if (msize < sizeof (struct PutMessage)) { - /* expire oldest request! */ - eer = GNUNET_CONTAINER_heap_peek (requests_by_expiration); - GNUNET_PEER_resolve (eer->source_pid, - &target); - GNUNET_CONTAINER_multihashmap_remove (requests_by_peer, - &target.hashPubKey, - eer); - destroy_pending_request (eer); + GNUNET_break_op(0); + return GNUNET_SYSERR; } - pr->task = GNUNET_SCHEDULER_add_delayed (sched, - get_processing_delay (), - &forward_request_task, - pr); - GNUNET_free (pgc); -} -/** - * Transmit the given message by copying it to - * the target buffer "buf". "buf" will be - * NULL and "size" zero if the socket was closed for - * writing in the meantime. In that case, only + put = (const struct PutMessage*) message; + dsize = msize - sizeof (struct PutMessage); + type = ntohl (put->type); + expiration = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_relative_ntoh (put->expiration)); - * free the message - * - * @param cls closure, pointer to the message - * @param size number of bytes available in buf - * @param buf where the callee should write the message - * @return number of bytes written to buf - */ -static size_t -transmit_message (void *cls, - size_t size, void *buf) -{ - struct GNUNET_MessageHeader *msg = cls; - uint16_t msize; - - if (NULL == buf) + /* first, validate! */ + switch (type) { -#if DEBUG_FS - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Dropping reply, core too busy.\n"); -#endif - GNUNET_free (msg); - return 0; + case GNUNET_DATASTORE_BLOCKTYPE_DBLOCK: + case GNUNET_DATASTORE_BLOCKTYPE_IBLOCK: + GNUNET_CRYPTO_hash (&put[1], dsize, &query); + break; + case GNUNET_DATASTORE_BLOCKTYPE_KBLOCK: + if (GNUNET_OK != + check_kblock ((const struct KBlock*) &put[1], + dsize, + &query)) + return GNUNET_SYSERR; + break; + case GNUNET_DATASTORE_BLOCKTYPE_SBLOCK: + if (GNUNET_OK != + check_sblock ((const struct SBlock*) &put[1], + dsize, + &query, + &prq.namespace)) + return GNUNET_SYSERR; + break; + case GNUNET_DATASTORE_BLOCKTYPE_SKBLOCK: + // FIXME -- validate SKBLOCK! + GNUNET_break (0); + return GNUNET_OK; + default: + /* unknown block type */ + GNUNET_break_op (0); + return GNUNET_SYSERR; } - msize = ntohs (msg->size); - GNUNET_assert (size >= msize); - memcpy (buf, msg, msize); - GNUNET_free (msg); - return msize; + + /* now, lookup 'query' */ + prq.data = (const void*) &put[1]; + prq.size = dsize; + prq.type = type; + prq.expiration = expiration; + prq.priority = 0; + GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map, + &query, + &process_reply, + &prq); + // FIXME: if migration is on and load is low, + // queue to store data in datastore; + // use "prq.priority" for that! + return GNUNET_OK; } -/** - * Test if the load on this peer is too high - * to even consider processing the query at - * all. - * - * @return GNUNET_YES if the load is too high, GNUNET_NO otherwise - */ -static int -test_load_too_high () -{ - return GNUNET_NO; // FIXME -} +/* **************************** P2P GET Handling ************************ */ /** @@ -2034,142 +1856,101 @@ test_load_too_high () * maybe 0 if no unique identifier is available */ static void -process_p2p_get_result (void *cls, - const GNUNET_HashCode * key, - uint32_t size, - const void *data, - uint32_t type, - uint32_t priority, - uint32_t anonymity, - struct GNUNET_TIME_Absolute - expiration, - uint64_t uid) +process_local_reply (void *cls, + const GNUNET_HashCode * key, + uint32_t size, + const void *data, + uint32_t type, + uint32_t priority, + uint32_t anonymity, + struct GNUNET_TIME_Absolute + expiration, + uint64_t uid) { - struct ProcessGetContext *pgc = cls; + struct PendingRequest *pr = cls; + struct ProcessReplyClosure prq; GNUNET_HashCode dhash; GNUNET_HashCode mhash; - struct PutMessage *reply; + GNUNET_HashCode query; + pr->drq = NULL; if (NULL == key) { /* no more results */ - if ( ( (pgc->policy & ROUTING_POLICY_FORWARD) == ROUTING_POLICY_FORWARD) && - ( (0 == pgc->results_found) || - (pgc->type == GNUNET_DATASTORE_BLOCKTYPE_KBLOCK) || - (pgc->type == GNUNET_DATASTORE_BLOCKTYPE_SBLOCK) || - (pgc->type == GNUNET_DATASTORE_BLOCKTYPE_SKBLOCK) ) ) - { - GNUNET_SCHEDULER_add_continuation (sched, - &forward_get_request, - pgc, - GNUNET_SCHEDULER_REASON_PREREQ_DONE); - } - else - { - if (pgc->bf != NULL) - GNUNET_CONTAINER_bloomfilter_free (pgc->bf); - GNUNET_free (pgc); - } - next_ds_request (); + if (pr->task == GNUNET_SCHEDULER_NO_TASK) + pr->task = GNUNET_SCHEDULER_add_now (sched, + &forward_request_task, + pr); return; } if (type == GNUNET_DATASTORE_BLOCKTYPE_ONDEMAND) { - GNUNET_FS_handle_on_demand_block (key, size, data, type, priority, - anonymity, expiration, uid, dsh, - &process_p2p_get_result, - pgc); + if (GNUNET_OK != + GNUNET_FS_handle_on_demand_block (key, size, data, type, priority, + anonymity, expiration, uid, + &process_local_reply, + pr)) + GNUNET_FS_drq_get_next (GNUNET_YES); return; } /* check for duplicates */ GNUNET_CRYPTO_hash (data, size, &dhash); mingle_hash (&dhash, - pgc->mingle, + pr->mingle, &mhash); - if ( (pgc->bf != NULL) && + if ( (pr->bf != NULL) && (GNUNET_YES == - GNUNET_CONTAINER_bloomfilter_test (pgc->bf, + GNUNET_CONTAINER_bloomfilter_test (pr->bf, &mhash)) ) { #if DEBUG_FS GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Result from datastore filtered by bloomfilter.\n"); #endif - GNUNET_DATASTORE_get_next (dsh, GNUNET_YES); + GNUNET_FS_drq_get_next (GNUNET_YES); return; } - pgc->results_found++; - if ( (pgc->type == GNUNET_DATASTORE_BLOCKTYPE_KBLOCK) || - (pgc->type == GNUNET_DATASTORE_BLOCKTYPE_SBLOCK) || - (pgc->type == GNUNET_DATASTORE_BLOCKTYPE_SKBLOCK) ) + pr->results_found++; + if ( (pr->type == GNUNET_DATASTORE_BLOCKTYPE_KBLOCK) || + (pr->type == GNUNET_DATASTORE_BLOCKTYPE_SBLOCK) || + (pr->type == GNUNET_DATASTORE_BLOCKTYPE_SKBLOCK) ) { - if (pgc->bf == NULL) + if (pr->bf == NULL) { - pgc->bf_size = 32; - pgc->bf = GNUNET_CONTAINER_bloomfilter_init (NULL, - pgc->bf_size, - BLOOMFILTER_K); + pr->bf_size = 32; + pr->bf = GNUNET_CONTAINER_bloomfilter_init (NULL, + pr->bf_size, + BLOOMFILTER_K); } - GNUNET_CONTAINER_bloomfilter_add (pgc->bf, + GNUNET_CONTAINER_bloomfilter_add (pr->bf, &mhash); } - - reply = GNUNET_malloc (sizeof (struct PutMessage) + size); - reply->header.size = htons (sizeof (struct PutMessage) + size); - reply->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT); - reply->type = htonl (type); - reply->expiration = GNUNET_TIME_relative_hton (GNUNET_TIME_absolute_get_remaining (expiration)); - memcpy (&reply[1], data, size); - GNUNET_CORE_notify_transmit_ready (core, - pgc->priority, - ACCEPTABLE_REPLY_DELAY, - &pgc->reply_to, - sizeof (struct PutMessage) + size, - &transmit_message, - reply); - if ( (GNUNET_YES == test_load_too_high()) || - (pgc->results_found > 5 + 2 * pgc->priority) ) + memset (&prq, 0, sizeof (prq)); + prq.data = data; + prq.expiration = expiration; + prq.size = size; + if ( (type == GNUNET_DATASTORE_BLOCKTYPE_SBLOCK) && + (GNUNET_OK != check_sblock ((const struct SBlock*) data, + size, + &query, + &prq.namespace)) ) { - GNUNET_DATASTORE_get_next (dsh, GNUNET_NO); - pgc->policy &= ~ ROUTING_POLICY_FORWARD; + GNUNET_break (0); + /* FIXME: consider removing the block? */ + GNUNET_FS_drq_get_next (GNUNET_YES); return; } - GNUNET_DATASTORE_get_next (dsh, GNUNET_YES); -} + prq.type = type; + prq.priority = priority; + process_reply (&prq, key, pr); - -/** - * We're processing a GET request from another peer. Give it to our - * local datastore. - * - * @param cls our "struct ProcessGetContext" - * @param ok did we get a datastore slice or not? - */ -static void -ds_get_request (void *cls, - int ok) -{ - struct ProcessGetContext *pgc = cls; - uint32_t type; - struct GNUNET_TIME_Relative timeout; - - if (GNUNET_OK != ok) + if ( (GNUNET_YES == test_load_too_high()) || + (pr->results_found > 5 + 2 * pr->priority) ) { - /* no point in doing P2P stuff if we can't even do local */ - GNUNET_free (dsh); + GNUNET_FS_drq_get_next (GNUNET_NO); return; } - type = pgc->type; - if (type == GNUNET_DATASTORE_BLOCKTYPE_DBLOCK) - type = GNUNET_DATASTORE_BLOCKTYPE_ANY; /* to get on-demand as well */ - timeout = GNUNET_TIME_relative_multiply (BASIC_DATASTORE_REQUEST_DELAY, - (pgc->priority + 1)); - GNUNET_DATASTORE_get (dsh, - &pgc->query, - type, - &process_p2p_get_result, - pgc, - timeout); + GNUNET_FS_drq_get_next (GNUNET_YES); } @@ -2201,17 +1982,16 @@ bound_ttl (int32_t ttl_in, uint32_t prio) /** - * We've received a request with the specified - * priority. Bound it according to how much - * we trust the given peer. + * We've received a request with the specified priority. Bound it + * according to how much we trust the given peer. * * @param prio_in requested priority - * @param peer the peer making the request + * @param cp the peer making the request * @return effective priority */ static uint32_t bound_priority (uint32_t prio_in, - const struct GNUNET_PeerIdentity *peer) + struct ConnectedPeer *cp) { return 0; // FIXME! } @@ -2233,20 +2013,23 @@ static int handle_p2p_get (void *cls, const struct GNUNET_PeerIdentity *other, const struct GNUNET_MessageHeader *message, - struct GNUNET_TIME_Relative latency, - uint32_t distance) + struct GNUNET_TIME_Relative latency, + uint32_t distance) { + struct PendingRequest *pr; + struct PeerRequestEntry *pre; + struct ConnectedPeer *cp; + struct ConnectedPeer *cps; + struct GNUNET_TIME_Relative timeout; uint16_t msize; const struct GetMessage *gm; unsigned int bits; const GNUNET_HashCode *opt; - struct ProcessGetContext *pgc; uint32_t bm; size_t bfsize; uint32_t ttl_decrement; + uint32_t type; double preference; - int net_load_up; - int net_load_down; msize = ntohs(message->size); if (msize < sizeof (struct GetMessage)) @@ -2262,505 +2045,254 @@ handle_p2p_get (void *cls, if (1 == (bm & 1)) bits++; bm >>= 1; - } - if (msize < sizeof (struct GetMessage) + bits * sizeof (GNUNET_HashCode)) - { - GNUNET_break_op (0); - return GNUNET_SYSERR; - } - opt = (const GNUNET_HashCode*) &gm[1]; - bfsize = msize - sizeof (struct GetMessage) + bits * sizeof (GNUNET_HashCode); - pgc = GNUNET_malloc (sizeof (struct ProcessGetContext)); - if (bfsize > 0) - { - pgc->bf = GNUNET_CONTAINER_bloomfilter_init ((const char*) &pgc[1], - bfsize, - BLOOMFILTER_K); - pgc->bf_size = bfsize; - } - pgc->type = ntohl (gm->type); - pgc->bm = ntohl (gm->hash_bitmap); - pgc->mingle = gm->filter_mutator; - bits = 0; - if (0 != (pgc->bm & GET_MESSAGE_BIT_RETURN_TO)) - pgc->reply_to.hashPubKey = opt[bits++]; - else - pgc->reply_to = *other; - if (0 != (pgc->bm & GET_MESSAGE_BIT_SKS_NAMESPACE)) - pgc->namespace = opt[bits++]; - else if (pgc->type == GNUNET_DATASTORE_BLOCKTYPE_SBLOCK) - { - GNUNET_break_op (0); - GNUNET_free (pgc); - return GNUNET_SYSERR; - } - if (0 != (pgc->bm & GET_MESSAGE_BIT_TRANSMIT_TO)) - pgc->prime_target.hashPubKey = opt[bits++]; - /* note that we can really only check load here since otherwise - peers could find out that we are overloaded by being disconnected - after sending us a malformed query... */ - if (GNUNET_YES == test_load_too_high ()) - { - if (NULL != pgc->bf) - GNUNET_CONTAINER_bloomfilter_free (pgc->bf); - GNUNET_free (pgc); -#if DEBUG_FS - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Dropping query from `%s', this peer is too busy.\n", - GNUNET_i2s (other)); -#endif - return GNUNET_OK; - } - net_load_up = 50; // FIXME - net_load_down = 50; // FIXME - pgc->policy = ROUTING_POLICY_NONE; - if ( (net_load_up < IDLE_LOAD_THRESHOLD) && - (net_load_down < IDLE_LOAD_THRESHOLD) ) - { - pgc->policy |= ROUTING_POLICY_ALL; - pgc->priority = 0; /* no charge */ - } - else - { - pgc->priority = bound_priority (ntohl (gm->priority), other); - if ( (net_load_up < - IDLE_LOAD_THRESHOLD + pgc->priority * pgc->priority) && - (net_load_down < - IDLE_LOAD_THRESHOLD + pgc->priority * pgc->priority) ) - { - pgc->policy |= ROUTING_POLICY_ALL; - } - else - { - // FIXME: is this sound? - if (net_load_up < 90 + 10 * pgc->priority) - pgc->policy |= ROUTING_POLICY_FORWARD; - if (net_load_down < 90 + 10 * pgc->priority) - pgc->policy |= ROUTING_POLICY_ANSWER; - } - } - if (pgc->policy == ROUTING_POLICY_NONE) - { -#if DEBUG_FS - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Dropping query from `%s', network saturated.\n", - GNUNET_i2s (other)); -#endif - if (NULL != pgc->bf) - GNUNET_CONTAINER_bloomfilter_free (pgc->bf); - GNUNET_free (pgc); - return GNUNET_OK; /* drop */ - } - if ((pgc->policy & ROUTING_POLICY_INDIRECT) != ROUTING_POLICY_INDIRECT) - pgc->priority = 0; /* kill the priority (we cannot benefit) */ - pgc->ttl = bound_ttl (ntohl (gm->ttl), pgc->priority); - /* decrement ttl (always) */ - ttl_decrement = 2 * TTL_DECREMENT + - GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, - TTL_DECREMENT); - if ( (pgc->ttl < 0) && - (pgc->ttl - ttl_decrement > 0) ) - { -#if DEBUG_FS - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Dropping query from `%s' due to TTL underflow.\n", - GNUNET_i2s (other)); -#endif - /* integer underflow => drop (should be very rare)! */ - if (NULL != pgc->bf) - GNUNET_CONTAINER_bloomfilter_free (pgc->bf); - GNUNET_free (pgc); - return GNUNET_OK; - } - pgc->ttl -= ttl_decrement; - pgc->start_time = GNUNET_TIME_absolute_get (); - preference = (double) pgc->priority; - if (preference < QUERY_BANDWIDTH_VALUE) - preference = QUERY_BANDWIDTH_VALUE; - // FIXME: also reserve bandwidth for reply? - (void) GNUNET_CORE_peer_change_preference (sched, cfg, - other, - GNUNET_TIME_UNIT_FOREVER_REL, - 0, 0, preference, NULL, NULL); - if (0 != (pgc->policy & ROUTING_POLICY_ANSWER)) - pgc->drq = queue_ds_request (BASIC_DATASTORE_REQUEST_DELAY, - &ds_get_request, - pgc); - else - GNUNET_SCHEDULER_add_continuation (sched, - &forward_get_request, - pgc, - GNUNET_SCHEDULER_REASON_PREREQ_DONE); - return GNUNET_OK; -} - - -/** - * Function called to notify us that we can now transmit a reply to a - * client or peer. "buf" will be NULL and "size" zero if the socket was - * closed for writing in the meantime. - * - * @param cls closure, points to a "struct PendingRequest*" with - * one or more pending replies - * @param size number of bytes available in buf - * @param buf where the callee should write the message - * @return number of bytes written to buf - */ -static size_t -transmit_result (void *cls, - size_t size, - void *buf) -{ - struct PendingRequest *pr = cls; - char *cbuf = buf; - struct PendingMessage *reply; - size_t ret; - - ret = 0; - while (NULL != (reply = pr->replies_pending)) - { - if ( (reply->msize + ret < ret) || - (reply->msize + ret > size) ) - break; - pr->replies_pending = reply->next; - memcpy (&cbuf[ret], &reply[1], reply->msize); - ret += reply->msize; - GNUNET_free (reply); - } - return ret; -} - - -/** - * We have received a reply; handle it! - * - * @param cls response (struct ProcessReplyClosure) - * @param key our query - * @param value value in the hash map (meta-info about the query) - * @return GNUNET_YES (we should continue to iterate) - */ -static int -process_reply (void *cls, - const GNUNET_HashCode * key, - void *value) -{ - struct ProcessReplyClosure *prq = cls; - struct PendingRequest *pr = value; - struct PendingRequest *eer; - struct PendingMessage *reply; - struct PutMessage *pm; - struct ContentMessage *cm; - struct ConnectedPeer *cp; - GNUNET_HashCode chash; - GNUNET_HashCode mhash; - struct GNUNET_PeerIdentity target; - size_t msize; - uint32_t prio; - struct GNUNET_TIME_Relative max_delay; - - GNUNET_CRYPTO_hash (prq->data, - prq->size, - &chash); - switch (prq->type) - { - case GNUNET_DATASTORE_BLOCKTYPE_DBLOCK: - case GNUNET_DATASTORE_BLOCKTYPE_IBLOCK: - break; - case GNUNET_DATASTORE_BLOCKTYPE_SBLOCK: - /* FIXME: does prq->namespace match our expectations? */ - /* then: fall-through??? */ - case GNUNET_DATASTORE_BLOCKTYPE_KBLOCK: - if (pr->bf != NULL) - { - mingle_hash (&chash, pr->mingle, &mhash); - if (GNUNET_YES == GNUNET_CONTAINER_bloomfilter_test (pr->bf, - &mhash)) - return GNUNET_YES; /* duplicate */ - GNUNET_CONTAINER_bloomfilter_add (pr->bf, - &mhash); - } - break; - case GNUNET_DATASTORE_BLOCKTYPE_SKBLOCK: - // FIXME: any checks against duplicates for SKBlocks? - break; - } - prio = pr->priority; - prq->priority += pr->remaining_priority; - pr->remaining_priority = 0; - if (pr->client != NULL) - { - if (pr->replies_seen_size == pr->replies_seen_off) - GNUNET_array_grow (pr->replies_seen, - pr->replies_seen_size, - pr->replies_seen_size * 2 + 4); - pr->replies_seen[pr->replies_seen_off++] = chash; - // FIXME: possibly recalculate BF! - } - if (pr->client == NULL) - { - GNUNET_PEER_resolve (pr->source_pid, - &target); - cp = GNUNET_CONTAINER_multihashmap_get (connected_peers, - &target.hashPubKey); - msize = sizeof (struct ContentMessage) + prq->size; - reply = GNUNET_malloc (msize + sizeof (struct PendingMessage)); - reply->msize = msize; - reply->priority = (uint32_t) -1; /* send replies first! */ - cm = (struct ContentMessage*) &reply[1]; - cm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_CONTENT); - cm->header.size = htons (msize); - cm->type = htonl (prq->type); - cm->expiration = GNUNET_TIME_absolute_hton (prq->expiration); - memcpy (&reply[1], prq->data, prq->size); - max_delay = GNUNET_TIME_UNIT_FOREVER_REL; - if (GNUNET_CONTAINER_heap_get_size (requests_by_expiration) >= max_pending_requests) - { - /* estimate expiration time from time difference between - first request that will be discarded and this request */ - eer = GNUNET_CONTAINER_heap_peek (requests_by_expiration); - max_delay = GNUNET_TIME_absolute_get_difference (pr->start_time, - eer->start_time); - } - - if (cp == NULL) - { - /* FIXME: bound queue size! */ - reply->next = pr->replies_pending; - pr->replies_pending = reply; - if (pr->cth == NULL) - { - /* implicitly tries to connect */ - pr->cth = GNUNET_CORE_notify_transmit_ready (core, - prio, - max_delay, - &target, - msize, - &transmit_result, - pr); - } - } - else - { - /* insert replies always at the head */ - reply->next = cp->pending_messages; - cp->pending_messages = reply; - if (cp->cth == NULL) - cp->cth = GNUNET_CORE_notify_transmit_ready (core, - reply->priority, - GNUNET_TIME_UNIT_FOREVER_REL, - &target, - msize, - &transmit_request_cb, - cp); - } - } - else - { - msize = sizeof (struct PutMessage) + prq->size; - reply = GNUNET_malloc (msize + sizeof (struct PendingMessage)); - reply->msize = msize; - reply->next = pr->replies_pending; - pm = (struct PutMessage*) &reply[1]; - pm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT); - pm->header.size = htons (msize); - pm->type = htonl (prq->type); - pm->expiration = GNUNET_TIME_relative_hton (GNUNET_TIME_absolute_get_remaining (prq->expiration)); - pr->replies_pending = reply; - memcpy (&reply[1], prq->data, prq->size); - if (pr->th != NULL) - return GNUNET_YES; - pr->th = GNUNET_SERVER_notify_transmit_ready (pr->client, - msize, - GNUNET_TIME_UNIT_FOREVER_REL, - &transmit_result, - pr); - if (pr->th == NULL) - { - // FIXME: need to try again later (not much - // to do here specifically, but we need to - // check somewhere else to handle this case!) - } - } - // FIXME: implement hot-path routing statistics keeping! - return GNUNET_YES; -} - - -/** - * Check if the given KBlock is well-formed. - * - * @param kb the kblock data (or at least "dsize" bytes claiming to be one) - * @param dsize size of "kb" in bytes; check for < sizeof(struct KBlock)! - * @param query where to store the query that this block answers - * @return GNUNET_OK if this is actually a well-formed KBlock - */ -static int -check_kblock (const struct KBlock *kb, - size_t dsize, - GNUNET_HashCode *query) -{ - if (dsize < sizeof (struct KBlock)) + } + if (msize < sizeof (struct GetMessage) + bits * sizeof (GNUNET_HashCode)) { GNUNET_break_op (0); return GNUNET_SYSERR; - } - if (dsize - sizeof (struct KBlock) != - ntohs (kb->purpose.size) - - sizeof (struct GNUNET_CRYPTO_RsaSignaturePurpose) - - sizeof(struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded) ) + } + opt = (const GNUNET_HashCode*) &gm[1]; + bfsize = msize - sizeof (struct GetMessage) + bits * sizeof (GNUNET_HashCode); + + bm = ntohl (gm->hash_bitmap); + if ( (0 != (bm & GET_MESSAGE_BIT_SKS_NAMESPACE)) && + (ntohl (gm->type) == GNUNET_DATASTORE_BLOCKTYPE_SBLOCK) ) { GNUNET_break_op (0); - return GNUNET_SYSERR; + return GNUNET_SYSERR; } - if (GNUNET_OK != - GNUNET_CRYPTO_rsa_verify (GNUNET_SIGNATURE_PURPOSE_FS_KBLOCK, - &kb->purpose, - &kb->signature, - &kb->keyspace)) + bits = 0; + cps = GNUNET_CONTAINER_multihashmap_get (connected_peers, + &other->hashPubKey); + GNUNET_assert (NULL != cps); + if (0 != (bm & GET_MESSAGE_BIT_RETURN_TO)) + cp = GNUNET_CONTAINER_multihashmap_get (connected_peers, + &opt[bits++]); + else + cp = cps; + if (cp == NULL) + { + /* FIXME: try connect? */ + return GNUNET_OK; + } + /* note that we can really only check load here since otherwise + peers could find out that we are overloaded by not being + disconnected after sending us a malformed query... */ + if (GNUNET_YES == test_load_too_high ()) + { +#if DEBUG_FS + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Dropping query from `%s', this peer is too busy.\n", + GNUNET_i2s (other)); +#endif + return GNUNET_OK; + } + + pr = GNUNET_malloc (sizeof (struct PendingRequest) + + (bm & GET_MESSAGE_BIT_SKS_NAMESPACE)?sizeof(GNUNET_HashCode):0); + if ((bm & GET_MESSAGE_BIT_SKS_NAMESPACE)) + pr->namespace = (GNUNET_HashCode*) &pr[1]; + pr->type = ntohl (gm->type); + pr->mingle = gm->filter_mutator; + if (0 != (bm & GET_MESSAGE_BIT_SKS_NAMESPACE)) + memcpy (&pr[1], &opt[bits++], sizeof (GNUNET_HashCode)); + else if (pr->type == GNUNET_DATASTORE_BLOCKTYPE_SBLOCK) { GNUNET_break_op (0); + GNUNET_free (pr); return GNUNET_SYSERR; } - if (query != NULL) - GNUNET_CRYPTO_hash (&kb->keyspace, - sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded), - query); - return GNUNET_OK; -} + if (0 != (bm & GET_MESSAGE_BIT_TRANSMIT_TO)) + pr->target_pid = GNUNET_PEER_intern ((const struct GNUNET_PeerIdentity*) &opt[bits++]); + pr->anonymity_level = 1; + pr->priority = bound_priority (ntohl (gm->priority), cps); + pr->ttl = bound_ttl (ntohl (gm->ttl), pr->priority); + pr->query = gm->query; + /* decrement ttl (always) */ + ttl_decrement = 2 * TTL_DECREMENT + + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, + TTL_DECREMENT); + if ( (pr->ttl < 0) && + (pr->ttl - ttl_decrement > 0) ) + { +#if DEBUG_FS + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Dropping query from `%s' due to TTL underflow.\n", + GNUNET_i2s (other)); +#endif + /* integer underflow => drop (should be very rare)! */ + GNUNET_free (pr); + return GNUNET_OK; + } + pr->ttl -= ttl_decrement; + pr->start_time = GNUNET_TIME_absolute_get (); -/** - * Check if the given SBlock is well-formed. - * - * @param sb the sblock data (or at least "dsize" bytes claiming to be one) - * @param dsize size of "kb" in bytes; check for < sizeof(struct SBlock)! - * @param query where to store the query that this block answers - * @param namespace where to store the namespace that this block belongs to - * @return GNUNET_OK if this is actually a well-formed SBlock - */ -static int -check_sblock (const struct SBlock *sb, - size_t dsize, - GNUNET_HashCode *query, - GNUNET_HashCode *namespace) -{ - if (dsize < sizeof (struct SBlock)) + /* get bloom filter */ + if (bfsize > 0) { - GNUNET_break_op (0); - return GNUNET_SYSERR; + pr->bf = GNUNET_CONTAINER_bloomfilter_init ((const char*) &opt[bits], + bfsize, + BLOOMFILTER_K); + pr->bf_size = bfsize; } - if (dsize != - ntohs (sb->purpose.size) + sizeof (struct GNUNET_CRYPTO_RsaSignature)) + + /* FIXME: check somewhere if request already exists, and if so, + recycle old state... */ + pre = GNUNET_malloc (sizeof (struct PeerRequestEntry)); + pre->cp = cp; + pre->req = pr; + GNUNET_CONTAINER_multihashmap_put (query_request_map, + &gm->query, + pre, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); + + pr->hnode = GNUNET_CONTAINER_heap_insert (requests_by_expiration_heap, + pr, + GNUNET_TIME_absolute_get().value + pr->ttl); + + + /* calculate change in traffic preference */ + preference = (double) pr->priority; + if (preference < QUERY_BANDWIDTH_VALUE) + preference = QUERY_BANDWIDTH_VALUE; + cps->inc_preference += preference; + + /* process locally */ + type = pr->type; + if (type == GNUNET_DATASTORE_BLOCKTYPE_DBLOCK) + type = GNUNET_DATASTORE_BLOCKTYPE_ANY; /* to get on-demand as well */ + timeout = GNUNET_TIME_relative_multiply (BASIC_DATASTORE_REQUEST_DELAY, + (pr->priority + 1)); + pr->drq = GNUNET_FS_drq_get (&gm->query, + pr->type, + &process_local_reply, + pr, + timeout); + + /* Are multiple results possible? If so, start processing remotely now! */ + switch (pr->type) { - GNUNET_break_op (0); - return GNUNET_SYSERR; + case GNUNET_DATASTORE_BLOCKTYPE_DBLOCK: + case GNUNET_DATASTORE_BLOCKTYPE_IBLOCK: + /* only one result, wait for datastore */ + break; + default: + pr->task = GNUNET_SCHEDULER_add_now (sched, + &forward_request_task, + pr); } - if (GNUNET_OK != - GNUNET_CRYPTO_rsa_verify (GNUNET_SIGNATURE_PURPOSE_FS_SBLOCK, - &sb->purpose, - &sb->signature, - &sb->subspace)) + + /* make sure we don't track too many requests */ + if (GNUNET_CONTAINER_heap_get_size (requests_by_expiration_heap) > max_pending_requests) { - GNUNET_break_op (0); - return GNUNET_SYSERR; + pr = GNUNET_CONTAINER_heap_peek (requests_by_expiration_heap); + destroy_pending_request (pr); } - if (query != NULL) - *query = sb->identifier; - if (namespace != NULL) - GNUNET_CRYPTO_hash (&sb->subspace, - sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded), - namespace); return GNUNET_OK; } +/* **************************** CS GET Handling ************************ */ + + /** - * Handle P2P "PUT" request. + * Handle START_SEARCH-message (search request from client). * - * @param cls closure, always NULL - * @param other the other peer involved (sender or receiver, NULL - * for loopback messages where we are both sender and receiver) + * @param cls closure + * @param client identification of the client * @param message the actual message - * @param latency reported latency of the connection with 'other' - * @param distance reported distance (DV) to 'other' - * @return GNUNET_OK to keep the connection open, - * GNUNET_SYSERR to close it (signal serious error) */ -static int -handle_p2p_put (void *cls, - const struct GNUNET_PeerIdentity *other, - const struct GNUNET_MessageHeader *message, - struct GNUNET_TIME_Relative latency, - uint32_t distance) +static void +handle_start_search (void *cls, + struct GNUNET_SERVER_Client *client, + const struct GNUNET_MessageHeader *message) { - const struct PutMessage *put; + static GNUNET_HashCode all_zeros; + const struct SearchMessage *sm; + struct ClientList *cl; + struct ClientRequestList *crl; + struct PendingRequest *pr; uint16_t msize; - size_t dsize; + unsigned int sc; uint32_t type; - struct GNUNET_TIME_Absolute expiration; - GNUNET_HashCode query; - struct ProcessReplyClosure prq; - + msize = ntohs (message->size); - if (msize < sizeof (struct PutMessage)) + if ( (msize < sizeof (struct SearchMessage)) || + (0 != (msize - sizeof (struct SearchMessage)) % sizeof (GNUNET_HashCode)) ) { - GNUNET_break_op(0); - return GNUNET_SYSERR; + GNUNET_break (0); + GNUNET_SERVER_receive_done (client, + GNUNET_SYSERR); + return; } - put = (const struct PutMessage*) message; - dsize = msize - sizeof (struct PutMessage); - type = ntohl (put->type); - expiration = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_relative_ntoh (put->expiration)); + sc = (msize - sizeof (struct SearchMessage)) / sizeof (GNUNET_HashCode); + sm = (const struct SearchMessage*) message; - /* first, validate! */ + cl = client_list; + while ( (cl != NULL) && + (cl->client != client) ) + cl = cl->next; + if (cl == NULL) + { + cl = GNUNET_malloc (sizeof (struct ClientList)); + cl->client = client; + GNUNET_SERVER_client_keep (client); + cl->next = client_list; + client_list = cl; + } + type = ntohl (sm->type); + + /* FIXME: detect duplicate request; if duplicate, simply update (merge) + 'pr->replies_seen'! */ + pr = GNUNET_malloc (sizeof (struct PendingRequest) + + (type == GNUNET_DATASTORE_BLOCKTYPE_SBLOCK)?sizeof(GNUNET_HashCode):0); + crl = GNUNET_malloc (sizeof (struct ClientRequestList)); + crl->client_list = cl; + GNUNET_CONTAINER_DLL_insert (cl->rl_head, + cl->rl_tail, + crl); + crl->req = pr; + pr->type = type; + pr->client_request_list = crl; + GNUNET_array_grow (pr->replies_seen, + pr->replies_seen_size, + sc); + memcpy (pr->replies_seen, + &sm[1], + sc * sizeof (GNUNET_HashCode)); + pr->replies_seen_off = sc; + pr->anonymity_level = ntohl (sm->anonymity_level); + pr->mingle = GNUNET_CRYPTO_random_u32(GNUNET_CRYPTO_QUALITY_WEAK, + (uint32_t) -1); + pr->query = sm->query; switch (type) { case GNUNET_DATASTORE_BLOCKTYPE_DBLOCK: case GNUNET_DATASTORE_BLOCKTYPE_IBLOCK: - GNUNET_CRYPTO_hash (&put[1], dsize, &query); - break; - case GNUNET_DATASTORE_BLOCKTYPE_KBLOCK: - if (GNUNET_OK != - check_kblock ((const struct KBlock*) &put[1], - dsize, - &query)) - return GNUNET_SYSERR; + if (0 != memcmp (&sm->target, + &all_zeros, + sizeof (GNUNET_HashCode))) + pr->target_pid = GNUNET_PEER_intern ((const struct GNUNET_PeerIdentity*) &sm->target); break; case GNUNET_DATASTORE_BLOCKTYPE_SBLOCK: - if (GNUNET_OK != - check_sblock ((const struct SBlock*) &put[1], - dsize, - &query, - &prq.namespace)) - return GNUNET_SYSERR; + pr->namespace = (GNUNET_HashCode*) &pr[1]; + memcpy (&pr[1], &sm->target, sizeof (GNUNET_HashCode)); break; - case GNUNET_DATASTORE_BLOCKTYPE_SKBLOCK: - // FIXME -- validate SKBLOCK! - GNUNET_break (0); - return GNUNET_OK; default: - /* unknown block type */ - GNUNET_break_op (0); - return GNUNET_SYSERR; + break; } - - /* now, lookup 'query' */ - prq.data = (const void*) &put[1]; - prq.size = dsize; - prq.type = type; - prq.expiration = expiration; - prq.priority = 0; - GNUNET_CONTAINER_multihashmap_get_multiple (requests_by_query, - &query, - &process_reply, - &prq); - // FIXME: if migration is on and load is low, - // queue to store data in datastore; - // use "prq.priority" for that! - return GNUNET_OK; + pr->drq = GNUNET_FS_drq_get (&sm->query, + pr->type, + &process_local_reply, + pr, + GNUNET_TIME_UNIT_FOREVER_REL); } +/* **************************** Startup ************************ */ + + /** * List of handlers for P2P messages * that we care about. @@ -2775,6 +2307,23 @@ static struct GNUNET_CORE_MessageHandler p2p_handlers[] = }; +/** + * List of handlers for the messages understood by this + * service. + */ +static struct GNUNET_SERVER_MessageHandler handlers[] = { + {&GNUNET_FS_handle_index_start, NULL, + GNUNET_MESSAGE_TYPE_FS_INDEX_START, 0}, + {&GNUNET_FS_handle_index_list_get, NULL, + GNUNET_MESSAGE_TYPE_FS_INDEX_LIST_GET, sizeof(struct GNUNET_MessageHeader) }, + {&GNUNET_FS_handle_unindex, NULL, GNUNET_MESSAGE_TYPE_FS_UNINDEX, + sizeof (struct UnindexMessage) }, + {&handle_start_search, NULL, GNUNET_MESSAGE_TYPE_FS_START_SEARCH, + 0 }, + {NULL, NULL, 0, 0} +}; + + /** * Process fs requests. * @@ -2783,22 +2332,13 @@ static struct GNUNET_CORE_MessageHandler p2p_handlers[] = * @param server the initialized server * @param c configuration to use */ -static void -run (void *cls, - struct GNUNET_SCHEDULER_Handle *s, - struct GNUNET_SERVER_Handle *server, - const struct GNUNET_CONFIGURATION_Handle *c) +static int +main_init (struct GNUNET_SCHEDULER_Handle *s, + struct GNUNET_SERVER_Handle *server, + const struct GNUNET_CONFIGURATION_Handle *c) { sched = s; cfg = c; - - requests_by_query = GNUNET_CONTAINER_multihashmap_create (128); // FIXME: get size from config - requests_by_peer = GNUNET_CONTAINER_multihashmap_create (128); // FIXME: get size from config - connected_peers = GNUNET_CONTAINER_multihashmap_create (64); - requests_by_expiration = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN); - GNUNET_FS_init_indexing (sched, cfg); - dsh = GNUNET_DATASTORE_connect (cfg, - sched); core = GNUNET_CORE_connect (sched, cfg, GNUNET_TIME_UNIT_FOREVER_REL, @@ -2810,7 +2350,16 @@ run (void *cls, NULL, GNUNET_NO, NULL, GNUNET_NO, p2p_handlers); - + if (NULL == core) + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + _("Failed to connect to `%s' service.\n"), + "core"); + return GNUNET_SYSERR; + } + query_request_map = GNUNET_CONTAINER_multihashmap_create (128); // FIXME: get size from config + peer_request_map = GNUNET_CONTAINER_multihashmap_create (128); // FIXME: get size from config + requests_by_expiration_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN); GNUNET_SERVER_disconnect_notify (server, &handle_client_disconnect, NULL); @@ -2819,21 +2368,30 @@ run (void *cls, GNUNET_TIME_UNIT_FOREVER_REL, &shutdown_task, NULL); - if (NULL == dsh) - { - GNUNET_log (GNUNET_ERROR_TYPE_ERROR, - _("Failed to connect to `%s' service.\n"), - "datastore"); - GNUNET_SCHEDULER_shutdown (sched); - return; - } - if (NULL == core) - { - GNUNET_log (GNUNET_ERROR_TYPE_ERROR, - _("Failed to connect to `%s' service.\n"), - "core"); + return GNUNET_OK; +} + + +/** + * Process fs requests. + * + * @param cls closure + * @param sched scheduler to use + * @param server the initialized server + * @param cfg configuration to use + */ +static void +run (void *cls, + struct GNUNET_SCHEDULER_Handle *sched, + struct GNUNET_SERVER_Handle *server, + const struct GNUNET_CONFIGURATION_Handle *cfg) +{ + if ( (GNUNET_OK != GNUNET_FS_drq_init (sched, cfg)) || + (GNUNET_OK != GNUNET_FS_indexing_init (sched, cfg)) || + (GNUNET_OK != main_init (sched, server, cfg)) ) + { GNUNET_SCHEDULER_shutdown (sched); - return; + return; } } diff --git a/src/fs/gnunet-service-fs_drq.c b/src/fs/gnunet-service-fs_drq.c new file mode 100644 index 000000000..c15e37a0d --- /dev/null +++ b/src/fs/gnunet-service-fs_drq.c @@ -0,0 +1,416 @@ +/* + This file is part of GNUnet. + (C) 2009, 2010 Christian Grothoff (and other contributing authors) + + GNUnet is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published + by the Free Software Foundation; either version 2, or (at your + option) any later version. + + GNUnet is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU General Public License + along with GNUnet; see the file COPYING. If not, write to the + Free Software Foundation, Inc., 59 Temple Place - Suite 330, + Boston, MA 02111-1307, USA. +*/ + +/** + * @file fs/gnunet-service-fs_drq.c + * @brief queueing of requests to the datastore service + * @author Christian Grothoff + */ +#include "platform.h" +#include "gnunet-service-fs_drq.h" + + +/** + * Signature of a function that is called whenever a datastore + * request can be processed (or an entry put on the queue times out). + * + * @param cls closure + * @param ok GNUNET_OK if DS is ready, GNUNET_SYSERR on timeout + */ +typedef void (*RequestFunction)(void *cls, + int ok); + + +/** + * Doubly-linked list of our requests for the datastore. + */ +struct DatastoreRequestQueue +{ + + /** + * This is a doubly-linked list. + */ + struct DatastoreRequestQueue *next; + + /** + * This is a doubly-linked list. + */ + struct DatastoreRequestQueue *prev; + + /** + * Function to call (will issue the request). + */ + RequestFunction req; + + /** + * Closure for req. + */ + void *req_cls; + + /** + * When should this request time-out because we don't care anymore? + */ + struct GNUNET_TIME_Absolute timeout; + + /** + * ID of task used for signaling timeout. + */ + GNUNET_SCHEDULER_TaskIdentifier task; + +}; + +/** + * Our scheduler. + */ +static struct GNUNET_SCHEDULER_Handle *sched; + +/** + * Our configuration. + */ +const struct GNUNET_CONFIGURATION_Handle *cfg; + +/** + * Head of request queue for the datastore, sorted by timeout. + */ +static struct DatastoreRequestQueue *drq_head; + +/** + * Tail of request queue for the datastore. + */ +static struct DatastoreRequestQueue *drq_tail; + +/** + * Our connection to the datastore. + */ +static struct GNUNET_DATASTORE_Handle *dsh; + + +/** + * Run the next DS request in our + * queue, we're done with the current one. + */ +static void +next_ds_request () +{ + struct DatastoreRequestQueue *e; + + while (NULL != (e = drq_head)) + { + if (0 != GNUNET_TIME_absolute_get_remaining (e->timeout).value) + break; + if (e->task != GNUNET_SCHEDULER_NO_TASK) + GNUNET_SCHEDULER_cancel (sched, e->task); + GNUNET_CONTAINER_DLL_remove (drq_head, drq_tail, e); + e->req (e->req_cls, GNUNET_NO); + GNUNET_free (e); + } + if (e == NULL) + return; + if (e->task != GNUNET_SCHEDULER_NO_TASK) + GNUNET_SCHEDULER_cancel (sched, e->task); + e->task = GNUNET_SCHEDULER_NO_TASK; + e->req (e->req_cls, GNUNET_YES); + GNUNET_CONTAINER_DLL_remove (drq_head, drq_tail, e); + GNUNET_free (e); +} + + +/** + * A datastore request had to be timed out. + * + * @param cls closure (of type "struct DatastoreRequestQueue*") + * @param tc task context, unused + */ +static void +timeout_ds_request (void *cls, + const struct GNUNET_SCHEDULER_TaskContext *tc) +{ + struct DatastoreRequestQueue *e = cls; + + e->task = GNUNET_SCHEDULER_NO_TASK; + GNUNET_CONTAINER_DLL_remove (drq_head, drq_tail, e); + e->req (e->req_cls, GNUNET_NO); + GNUNET_free (e); +} + + +static void +dequeue_ds_request (struct DatastoreRequestQueue *req) +{ + if (req->task != GNUNET_SCHEDULER_NO_TASK) + GNUNET_SCHEDULER_cancel (sched, req->task); + GNUNET_CONTAINER_DLL_remove (drq_head, drq_tail, req); + GNUNET_free (req); +} + + +/** + * Queue a request for the datastore. + * + * @param deadline by when the request should run + * @param fun function to call once the request can be run + * @param fun_cls closure for fun + * @return handle that can be used to dequeue the request + */ +static struct DatastoreRequestQueue * +queue_ds_request (struct GNUNET_TIME_Relative deadline, + RequestFunction fun, + void *fun_cls) +{ + struct DatastoreRequestQueue *e; + struct DatastoreRequestQueue *bef; + + if (drq_head == NULL) + { + /* no other requests pending, run immediately */ + // FIXME: should probably use scheduler nevertheless + // and return non-null! + fun (fun_cls, GNUNET_OK); + return NULL; + } + e = GNUNET_malloc (sizeof (struct DatastoreRequestQueue)); + e->timeout = GNUNET_TIME_relative_to_absolute (deadline); + e->req = fun; + e->req_cls = fun_cls; + if (deadline.value == GNUNET_TIME_UNIT_FOREVER_REL.value) + { + /* local request, highest prio, put at head of queue + regardless of deadline */ + bef = NULL; + } + else + { + bef = drq_tail; + while ( (NULL != bef) && + (e->timeout.value < bef->timeout.value) ) + bef = bef->prev; + } + GNUNET_CONTAINER_DLL_insert_after (drq_head, drq_tail, bef, e); + if (deadline.value == GNUNET_TIME_UNIT_FOREVER_REL.value) + return e; + e->task = GNUNET_SCHEDULER_add_delayed (sched, + deadline, + &timeout_ds_request, + e); + return e; +} + + +/** + * Task run during shutdown. + * + * @param cls unused + * @param tc unused + */ +static void +shutdown_task (void *cls, + const struct GNUNET_SCHEDULER_TaskContext *tc) +{ + struct DatastoreRequestQueue *drq; + + GNUNET_assert (NULL != dsh); + GNUNET_DATASTORE_disconnect (dsh, + GNUNET_NO); + dsh = NULL; + while (NULL != (drq = drq_head)) + { + drq_head = drq->next; + drq->req (drq->req_cls, GNUNET_NO); + dequeue_ds_request (drq); + } + drq_tail = NULL; +} + + +struct GetClosure +{ + GNUNET_HashCode key; + uint32_t type; + GNUNET_DATASTORE_Iterator iter; + void *iter_cls; + struct GNUNET_TIME_Absolute timeout; +}; + + +static void +get_iterator (void *cls, + const GNUNET_HashCode * key, + uint32_t size, + const void *data, + uint32_t type, + uint32_t priority, + uint32_t anonymity, + struct GNUNET_TIME_Absolute + expiration, + uint64_t uid) +{ + struct GetClosure *gc = cls; + + gc->iter (gc->iter_cls, + key, size, data, type, + priority, anonymity, expiration, uid); + if (key == NULL) + { + next_ds_request (); + GNUNET_free (gc); + } +} + + +static void +do_get (void *cls, + int ok) +{ + struct GetClosure *gc = cls; + + if (ok != GNUNET_OK) + { + gc->iter (gc->iter_cls, + NULL, 0, NULL, 0, 0, 0, + GNUNET_TIME_UNIT_ZERO_ABS, 0); + GNUNET_free (gc); + next_ds_request (); + return; + } + GNUNET_DATASTORE_get (dsh, &gc->key, gc->type, + &get_iterator, + gc, + GNUNET_TIME_absolute_get_remaining(gc->timeout)); +} + + +/** + * Iterate over the results for a particular key + * in the datastore. The iterator will only be called + * once initially; if the first call did contain a + * result, further results can be obtained by calling + * "GNUNET_DATASTORE_get_next" with the given argument. + * + * @param key maybe NULL (to match all entries) + * @param type desired type, 0 for any + * @param iter function to call on each matching value; + * will be called once with a NULL value at the end + * @param iter_cls closure for iter + * @param timeout how long to wait at most for a response + */ +struct DatastoreRequestQueue * +GNUNET_FS_drq_get (const GNUNET_HashCode * key, + uint32_t type, + GNUNET_DATASTORE_Iterator iter, + void *iter_cls, + struct GNUNET_TIME_Relative timeout) +{ + struct GetClosure *gc; + + gc = GNUNET_malloc (sizeof (struct GetClosure)); + gc->key = *key; + gc->type = type; + gc->iter = iter; + gc->iter_cls = iter_cls; + gc->timeout = GNUNET_TIME_relative_to_absolute (timeout); + return queue_ds_request (timeout, + &do_get, + gc); +} + + +void +GNUNET_FS_drq_get_cancel (struct DatastoreRequestQueue *drq) +{ + dequeue_ds_request (drq); +} + + +/** + * Function called to trigger obtaining the next result + * from the datastore. + * + * @param more GNUNET_YES to get more results, GNUNET_NO to abort + * iteration (with a final call to "iter" with key/data == NULL). + */ +void +GNUNET_FS_drq_get_next (int more) +{ + GNUNET_DATASTORE_get_next (dsh, more); +} + + +/** + * Explicitly remove some content from the database. + * The "cont"inuation will be called with status + * "GNUNET_OK" if content was removed, "GNUNET_NO" + * if no matching entry was found and "GNUNET_SYSERR" + * on all other types of errors. + * + * @param key key for the value + * @param size number of bytes in data + * @param data content stored + * @param cont continuation to call when done + * @param cont_cls closure for cont + * @param timeout how long to wait at most for a response + */ +void +GNUNET_FS_drq_remove (const GNUNET_HashCode *key, + uint32_t size, const void *data, + GNUNET_DATASTORE_ContinuationWithStatus cont, + void *cont_cls, + struct GNUNET_TIME_Relative timeout) +{ + if (dsh == NULL) + { + GNUNET_break (0); + return; + } + GNUNET_DATASTORE_remove (dsh, key, size, data, + cont, cont_cls, timeout); +} + + +/** + * Setup datastore request queues. + * + * @param s scheduler to use + * @param c configuration to use + * @return GNUNET_OK on success + */ +int +GNUNET_FS_drq_init (struct GNUNET_SCHEDULER_Handle *s, + const struct GNUNET_CONFIGURATION_Handle *c) +{ + sched = s; + cfg = c; + dsh = GNUNET_DATASTORE_connect (cfg, + sched); + if (NULL == dsh) + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + _("Failed to connect to `%s' service.\n"), + "datastore"); + return GNUNET_SYSERR; + } + GNUNET_SCHEDULER_add_delayed (sched, + GNUNET_TIME_UNIT_FOREVER_REL, + &shutdown_task, + NULL); + return GNUNET_OK; +} + + +/* end of gnunet-service-fs_drq.c */ diff --git a/src/fs/gnunet-service-fs_drq.h b/src/fs/gnunet-service-fs_drq.h new file mode 100644 index 000000000..306db9bad --- /dev/null +++ b/src/fs/gnunet-service-fs_drq.h @@ -0,0 +1,137 @@ +/* + This file is part of GNUnet. + (C) 2009, 2010 Christian Grothoff (and other contributing authors) + + GNUnet is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published + by the Free Software Foundation; either version 2, or (at your + option) any later version. + + GNUnet is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU General Public License + along with GNUnet; see the file COPYING. If not, write to the + Free Software Foundation, Inc., 59 Temple Place - Suite 330, + Boston, MA 02111-1307, USA. +*/ + +/** + * @file fs/gnunet-service-fs_drq.h + * @brief queueing of requests to the datastore service + * @author Christian Grothoff + */ +#ifndef GNUNET_SERVICE_FS_DRQ_H +#define GNUNET_SERVICE_FS_DRQ_H + +#include "gnunet_datastore_service.h" +#include "gnunet_util_lib.h" + + +/** + * Handle for pending, abortable requests for the datastore. + */ +struct DatastoreRequestQueue; + + +/** + * Iterate over the results for a particular key + * in the datastore. The iterator will only be called + * once initially; if the first call did contain a + * result, further results can be obtained by calling + * "GNUNET_DATASTORE_get_next" with the given argument. + * + * @param key maybe NULL (to match all entries) + * @param type desired type, 0 for any + * @param iter function to call on each matching value; + * will be called once with a NULL value at the end + * @param iter_cls closure for iter + * @param timeout how long to wait at most for a response + */ +struct DatastoreRequestQueue * +GNUNET_FS_drq_get (const GNUNET_HashCode * key, + uint32_t type, + GNUNET_DATASTORE_Iterator iter, + void *iter_cls, + struct GNUNET_TIME_Relative timeout); + + + +void +GNUNET_FS_drq_get_cancel (struct DatastoreRequestQueue *drq); + + +/** + * Function called to trigger obtaining the next result + * from the datastore. Must be called (directly or indirectly) + * from the 'iter' callback given to 'GNUNET_FS_drq_get'. + * Not calling 'get_next' means no other datastore + * interactions (other than remove) will happen. + * + * @param more GNUNET_YES to get more results, GNUNET_NO to abort + * iteration (with a final call to "iter" with key/data == NULL). + */ +void +GNUNET_FS_drq_get_next (int more); + + +/** + * Explicitly remove some content from the database. + * The "cont"inuation will be called with status + * "GNUNET_OK" if content was removed, "GNUNET_NO" + * if no matching entry was found and "GNUNET_SYSERR" + * on all other types of errors. + * + * @param key key for the value + * @param size number of bytes in data + * @param data content stored + * @param cont continuation to call when done + * @param cont_cls closure for cont + * @param timeout how long to wait at most for a response + */ +void +GNUNET_FS_drq_remove (const GNUNET_HashCode *key, + uint32_t size, const void *data, + GNUNET_DATASTORE_ContinuationWithStatus cont, + void *cont_cls, + struct GNUNET_TIME_Relative timeout); + + + +/** + * Explicitly remove some content from the database. + * The "cont"inuation will be called with status + * "GNUNET_OK" if content was removed, "GNUNET_NO" + * if no matching entry was found and "GNUNET_SYSERR" + * on all other types of errors. + * + * @param key key for the value + * @param size number of bytes in data + * @param data content stored + * @param cont continuation to call when done + * @param cont_cls closure for cont + * @param timeout how long to wait at most for a response + */ +void +GNUNET_FS_drq_remove (const GNUNET_HashCode *key, + uint32_t size, const void *data, + GNUNET_DATASTORE_ContinuationWithStatus cont, + void *cont_cls, + struct GNUNET_TIME_Relative timeout); +/** + * Setup datastore request queues. + * + * @param s scheduler to use + * @param c configuration to use + * @return GNUNET_OK on success + */ +int +GNUNET_FS_drq_init (struct GNUNET_SCHEDULER_Handle *s, + const struct GNUNET_CONFIGURATION_Handle *c); + + + +/* end of gnunet-service-fs_drq.h */ +#endif diff --git a/src/fs/gnunet-service-fs_indexing.c b/src/fs/gnunet-service-fs_indexing.c index ebd7114d3..7dea53ee9 100644 --- a/src/fs/gnunet-service-fs_indexing.c +++ b/src/fs/gnunet-service-fs_indexing.c @@ -34,6 +34,7 @@ #include "gnunet_protocols.h" #include "gnunet_signatures.h" #include "gnunet_util_lib.h" +#include "gnunet-service-fs_drq.h" #include "gnunet-service-fs_indexing.h" #include "fs.h" @@ -508,13 +509,10 @@ remove_cont (void *cls, int success, const char *msg) { - struct GNUNET_DATASTORE_Handle *dsh = cls; - if (GNUNET_OK != success) GNUNET_log (GNUNET_ERROR_TYPE_WARNING, _("Failed to delete bogus block: %s\n"), msg); - GNUNET_DATASTORE_get_next (dsh, GNUNET_YES); } @@ -533,10 +531,11 @@ remove_cont (void *cls, * @param expiration expiration time for the content * @param uid unique identifier for the datum; * maybe 0 if no unique identifier is available - * @param cont function to call with the actual block + * @param cont function to call with the actual block (at most once, on success) * @param cont_cls closure for cont + * @return GNUNET_OK on success */ -void +int GNUNET_FS_handle_on_demand_block (const GNUNET_HashCode * key, uint32_t size, const void *data, @@ -545,7 +544,6 @@ GNUNET_FS_handle_on_demand_block (const GNUNET_HashCode * key, uint32_t anonymity, struct GNUNET_TIME_Absolute expiration, uint64_t uid, - struct GNUNET_DATASTORE_Handle *dsh, GNUNET_DATASTORE_Iterator cont, void *cont_cls) { @@ -564,14 +562,13 @@ GNUNET_FS_handle_on_demand_block (const GNUNET_HashCode * key, if (size != sizeof (struct OnDemandBlock)) { GNUNET_break (0); - GNUNET_DATASTORE_remove (dsh, - key, - size, - data, - &remove_cont, - dsh, - GNUNET_TIME_UNIT_FOREVER_REL); - return; + GNUNET_FS_drq_remove (key, + size, + data, + &remove_cont, + NULL, + GNUNET_TIME_UNIT_FOREVER_REL); + return GNUNET_SYSERR; } odb = (const struct OnDemandBlock*) data; off = GNUNET_ntohll (odb->offset); @@ -600,8 +597,7 @@ GNUNET_FS_handle_on_demand_block (const GNUNET_HashCode * key, GNUNET_DISK_file_close (fh); /* FIXME: if this happens often, we need to remove the OnDemand block from the DS! */ - GNUNET_DATASTORE_get_next (dsh, GNUNET_YES); - return; + return GNUNET_SYSERR; } GNUNET_DISK_file_close (fh); GNUNET_CRYPTO_hash (ndata, @@ -626,8 +622,7 @@ GNUNET_FS_handle_on_demand_block (const GNUNET_HashCode * key, (unsigned long long) off); /* FIXME: if this happens often, we need to remove the OnDemand block from the DS! */ - GNUNET_DATASTORE_get_next (dsh, GNUNET_YES); - return; + return GNUNET_SYSERR; } cont (cont_cls, key, @@ -638,6 +633,7 @@ GNUNET_FS_handle_on_demand_block (const GNUNET_HashCode * key, anonymity, expiration, uid); + return GNUNET_OK; } @@ -671,8 +667,8 @@ shutdown_task (void *cls, * @param s scheduler to use * @param c configuration to use */ -void -GNUNET_FS_init_indexing (struct GNUNET_SCHEDULER_Handle *s, +int +GNUNET_FS_indexing_init (struct GNUNET_SCHEDULER_Handle *s, const struct GNUNET_CONFIGURATION_Handle *c) { sched = s; @@ -683,6 +679,7 @@ GNUNET_FS_init_indexing (struct GNUNET_SCHEDULER_Handle *s, &shutdown_task, NULL); read_index_list (); + return GNUNET_OK; } /* end of gnunet-service-fs_indexing.c */ diff --git a/src/fs/gnunet-service-fs_indexing.h b/src/fs/gnunet-service-fs_indexing.h index dc6427234..9749b42a0 100644 --- a/src/fs/gnunet-service-fs_indexing.h +++ b/src/fs/gnunet-service-fs_indexing.h @@ -49,11 +49,11 @@ * @param expiration expiration time for the content * @param uid unique identifier for the datum; * maybe 0 if no unique identifier is available - * @param dsh connection to the datastore (to ask for more) - * @param cont function to call with the actual block + * @param cont function to call with the actual block (at most once, on success) * @param cont_cls closure for cont + * @return GNUNET_OK on success */ -void +int GNUNET_FS_handle_on_demand_block (const GNUNET_HashCode * key, uint32_t size, const void *data, @@ -62,7 +62,6 @@ GNUNET_FS_handle_on_demand_block (const GNUNET_HashCode * key, uint32_t anonymity, struct GNUNET_TIME_Absolute expiration, uint64_t uid, - struct GNUNET_DATASTORE_Handle *dsh, GNUNET_DATASTORE_Iterator cont, void *cont_cls); @@ -112,9 +111,10 @@ GNUNET_FS_handle_unindex (void *cls, * * @param s scheduler to use * @param c configuration to use + * @return GNUNET_OK on success */ -void -GNUNET_FS_init_indexing (struct GNUNET_SCHEDULER_Handle *s, +int +GNUNET_FS_indexing_init (struct GNUNET_SCHEDULER_Handle *s, const struct GNUNET_CONFIGURATION_Handle *c); -- 2.25.1