2 This file is part of GNUnet.
3 (C) 2009, 2010 Christian Grothoff (and other contributing authors)
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your
8 option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA.
22 * @file fs/gnunet-service-fs.c
23 * @brief gnunet anonymity protocol implementation
24 * @author Christian Grothoff
27 * - introduce random latency in processing
32 #include "gnunet_constants.h"
33 #include "gnunet_core_service.h"
34 #include "gnunet_dht_service.h"
35 #include "gnunet_datastore_service.h"
36 #include "gnunet_load_lib.h"
37 #include "gnunet_peer_lib.h"
38 #include "gnunet_protocols.h"
39 #include "gnunet_signatures.h"
40 #include "gnunet_statistics_service.h"
41 #include "gnunet_util_lib.h"
42 #include "gnunet-service-fs_indexing.h"
45 #define DEBUG_FS GNUNET_NO
48 * Maximum number of outgoing messages we queue per peer.
50 #define MAX_QUEUE_PER_PEER 16
53 * Size for the hash map for DHT requests from the FS
54 * service. Should be about the number of concurrent
55 * DHT requests we plan to make.
57 #define FS_DHT_HT_SIZE 1024
60 * How often do we flush trust values to disk?
62 #define TRUST_FLUSH_FREQ GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 5)
65 * How often do we at most PUT content into the DHT?
67 #define MAX_DHT_PUT_FREQ GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5)
70 * Inverse of the probability that we will submit the same query
71 * to the same peer again. If the same peer already got the query
72 * repeatedly recently, the probability is multiplied by the inverse
73 * of this number each time. Note that we only try about every TTL_DECREMENT/2
74 * plus MAX_CORK_DELAY (so roughly every 3.5s).
76 #define RETRY_PROBABILITY_INV 3
79 * What is the maximum delay for a P2P FS message (in our interaction
80 * with core)? FS-internal delays are another story. The value is
81 * chosen based on the 32k block size. Given that peers typcially
82 * have at least 1 kb/s bandwidth, 45s waits give us a chance to
83 * transmit one message even to the lowest-bandwidth peers.
85 #define MAX_TRANSMIT_DELAY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 45)
88 * Maximum number of requests (from other peers) that we're
89 * willing to have pending at any given point in time.
91 static unsigned long long max_pending_requests = (32 * 1024);
95 * Information we keep for each pending reply. The
96 * actual message follows at the end of this struct.
98 struct PendingMessage;
101 * Function called upon completion of a transmission.
104 * @param pid ID of receiving peer, 0 on transmission error
106 typedef void (*TransmissionContinuation)(void * cls,
107 GNUNET_PEER_Id tpid);
111 * Information we keep for each pending message (GET/PUT). The
112 * actual message follows at the end of this struct.
114 struct PendingMessage
117 * This is a doubly-linked list of messages to the same peer.
119 struct PendingMessage *next;
122 * This is a doubly-linked list of messages to the same peer.
124 struct PendingMessage *prev;
127 * Entry in pending message list for this pending message.
129 struct PendingMessageList *pml;
132 * Function to call immediately once we have transmitted this
135 TransmissionContinuation cont;
143 * Size of the reply; actual reply message follows
144 * at the end of this struct.
149 * How important is this message for us?
157 * Information about a peer that we are connected to.
158 * We track data that is useful for determining which
159 * peers should receive our requests. We also keep
160 * a list of messages to transmit to this peer.
166 * List of the last clients for which this peer successfully
169 struct GNUNET_SERVER_Client *last_client_replies[CS2P_SUCCESS_LIST_SIZE];
172 * List of the last PIDs for which
173 * this peer successfully answered a query;
174 * We use 0 to indicate no successful reply.
176 GNUNET_PEER_Id last_p2p_replies[P2P_SUCCESS_LIST_SIZE];
179 * Average delay between sending the peer a request and
180 * getting a reply (only calculated over the requests for
181 * which we actually got a reply). Calculated
182 * as a moving average: new_delay = ((n-1)*last_delay+curr_delay) / n
184 struct GNUNET_TIME_Relative avg_delay;
187 * Point in time until which this peer does not want us to migrate content
190 struct GNUNET_TIME_Absolute migration_blocked;
193 * Time until when we blocked this peer from migrating
196 struct GNUNET_TIME_Absolute last_migration_block;
199 * Handle for an active request for transmission to this
202 struct GNUNET_CORE_TransmitHandle *cth;
205 * Messages (replies, queries, content migration) we would like to
206 * send to this peer in the near future. Sorted by priority, head.
208 struct PendingMessage *pending_messages_head;
211 * Messages (replies, queries, content migration) we would like to
212 * send to this peer in the near future. Sorted by priority, tail.
214 struct PendingMessage *pending_messages_tail;
217 * How long does it typically take for us to transmit a message
218 * to this peer? (delay between the request being issued and
219 * the callback being invoked).
221 struct GNUNET_LOAD_Value *transmission_delay;
224 * Time when the last transmission request was issued.
226 struct GNUNET_TIME_Absolute last_transmission_request_start;
229 * Average priority of successful replies. Calculated
230 * as a moving average: new_avg = ((n-1)*last_avg+curr_prio) / n
235 * Increase in traffic preference still to be submitted
236 * to the core service for this peer.
238 uint64_t inc_preference;
241 * Trust rating for this peer
246 * Trust rating for this peer on disk.
251 * The peer's identity.
256 * Size of the linked list of 'pending_messages'.
258 unsigned int pending_requests;
261 * Which offset in "last_p2p_replies" will be updated next?
262 * (we go round-robin).
264 unsigned int last_p2p_replies_woff;
267 * Which offset in "last_client_replies" will be updated next?
268 * (we go round-robin).
270 unsigned int last_client_replies_woff;
276 * Information we keep for each pending request. We should try to
277 * keep this struct as small as possible since its memory consumption
278 * is key to how many requests we can have pending at once.
280 struct PendingRequest;
284 * Doubly-linked list of requests we are performing
285 * on behalf of the same client.
287 struct ClientRequestList
291 * This is a doubly-linked list.
293 struct ClientRequestList *next;
296 * This is a doubly-linked list.
298 struct ClientRequestList *prev;
301 * Request this entry represents.
303 struct PendingRequest *req;
306 * Client list this request belongs to.
308 struct ClientList *client_list;
314 * Replies to be transmitted to the client. The actual
315 * response message is allocated after this struct.
317 struct ClientResponseMessage
320 * This is a doubly-linked list.
322 struct ClientResponseMessage *next;
325 * This is a doubly-linked list.
327 struct ClientResponseMessage *prev;
330 * Client list entry this response belongs to.
332 struct ClientList *client_list;
335 * Number of bytes in the response.
342 * Linked list of clients we are performing requests
348 * This is a linked list.
350 struct ClientList *next;
353 * ID of a client making a request, NULL if this entry is for a
356 struct GNUNET_SERVER_Client *client;
359 * Head of list of requests performed on behalf
360 * of this client right now.
362 struct ClientRequestList *rl_head;
365 * Tail of list of requests performed on behalf
366 * of this client right now.
368 struct ClientRequestList *rl_tail;
371 * Head of linked list of responses.
373 struct ClientResponseMessage *res_head;
376 * Tail of linked list of responses.
378 struct ClientResponseMessage *res_tail;
381 * Context for sending replies.
383 struct GNUNET_CONNECTION_TransmitHandle *th;
389 * Doubly-linked list of messages we are performing
390 * due to a pending request.
392 struct PendingMessageList
396 * This is a doubly-linked list of messages on behalf of the same request.
398 struct PendingMessageList *next;
401 * This is a doubly-linked list of messages on behalf of the same request.
403 struct PendingMessageList *prev;
406 * Message this entry represents.
408 struct PendingMessage *pm;
411 * Request this entry belongs to.
413 struct PendingRequest *req;
416 * Peer this message is targeted for.
418 struct ConnectedPeer *target;
424 * Information we keep for each pending request. We should try to
425 * keep this struct as small as possible since its memory consumption
426 * is key to how many requests we can have pending at once.
428 struct PendingRequest
432 * If this request was made by a client, this is our entry in the
433 * client request list; otherwise NULL.
435 struct ClientRequestList *client_request_list;
438 * Entry of peer responsible for this entry (if this request
439 * was made by a peer).
441 struct ConnectedPeer *cp;
444 * If this is a namespace query, pointer to the hash of the public
445 * key of the namespace; otherwise NULL. Pointer will be to the
446 * end of this struct (so no need to free it).
448 const GNUNET_HashCode *namespace;
451 * Bloomfilter we use to filter out replies that we don't care about
452 * (anymore). NULL as long as we are interested in all replies.
454 struct GNUNET_CONTAINER_BloomFilter *bf;
457 * Context of our GNUNET_CORE_peer_change_preference call.
459 struct GNUNET_CORE_InformationRequestContext *irc;
462 * Reference to DHT get operation for this request (or NULL).
464 struct GNUNET_DHT_GetHandle *dht_get;
467 * Hash code of all replies that we have seen so far (only valid
468 * if client is not NULL since we only track replies like this for
471 GNUNET_HashCode *replies_seen;
474 * Node in the heap representing this entry; NULL
475 * if we have no heap node.
477 struct GNUNET_CONTAINER_HeapNode *hnode;
480 * Head of list of messages being performed on behalf of this
483 struct PendingMessageList *pending_head;
486 * Tail of list of messages being performed on behalf of this
489 struct PendingMessageList *pending_tail;
492 * When did we first see this request (form this peer), or, if our
493 * client is initiating, when did we last initiate a search?
495 struct GNUNET_TIME_Absolute start_time;
498 * The query that this request is for.
500 GNUNET_HashCode query;
503 * The task responsible for transmitting queries
506 GNUNET_SCHEDULER_TaskIdentifier task;
509 * (Interned) Peer identifier that identifies a preferred target
512 GNUNET_PEER_Id target_pid;
515 * (Interned) Peer identifiers of peers that have already
516 * received our query for this content.
518 GNUNET_PEER_Id *used_pids;
521 * Our entry in the queue (non-NULL while we wait for our
522 * turn to interact with the local database).
524 struct GNUNET_DATASTORE_QueueEntry *qe;
527 * Size of the 'bf' (in bytes).
532 * Desired anonymity level; only valid for requests from a local client.
534 uint32_t anonymity_level;
537 * How many entries in "used_pids" are actually valid?
539 unsigned int used_pids_off;
542 * How long is the "used_pids" array?
544 unsigned int used_pids_size;
547 * Number of results found for this request.
549 unsigned int results_found;
552 * How many entries in "replies_seen" are actually valid?
554 unsigned int replies_seen_off;
557 * How long is the "replies_seen" array?
559 unsigned int replies_seen_size;
562 * Priority with which this request was made. If one of our clients
563 * made the request, then this is the current priority that we are
564 * using when initiating the request. This value is used when
565 * we decide to reward other peers with trust for providing a reply.
570 * Priority points left for us to spend when forwarding this request
573 uint32_t remaining_priority;
576 * Number to mingle hashes for bloom-filter tests with.
581 * TTL with which we saw this request (or, if we initiated, TTL that
582 * we used for the request).
587 * Type of the content that this request is for.
589 enum GNUNET_BLOCK_Type type;
592 * Remove this request after transmission of the current response.
597 * GNUNET_YES if we should not forward this request to other peers.
602 * GNUNET_YES if we should not forward this request to other peers.
610 * Block that is ready for migration to other peers. Actual data is at the end of the block.
612 struct MigrationReadyBlock
616 * This is a doubly-linked list.
618 struct MigrationReadyBlock *next;
621 * This is a doubly-linked list.
623 struct MigrationReadyBlock *prev;
626 * Query for the block.
628 GNUNET_HashCode query;
631 * When does this block expire?
633 struct GNUNET_TIME_Absolute expiration;
636 * Peers we would consider forwarding this
637 * block to. Zero for empty entries.
639 GNUNET_PEER_Id target_list[MIGRATION_LIST_SIZE];
647 * Number of targets already used.
649 unsigned int used_targets;
654 enum GNUNET_BLOCK_Type type;
659 * Our connection to the datastore.
661 static struct GNUNET_DATASTORE_Handle *dsh;
666 static struct GNUNET_BLOCK_Context *block_ctx;
669 * Our block configuration.
671 static struct GNUNET_CONFIGURATION_Handle *block_cfg;
676 static struct GNUNET_SCHEDULER_Handle *sched;
681 static const struct GNUNET_CONFIGURATION_Handle *cfg;
684 * Map of peer identifiers to "struct ConnectedPeer" (for that peer).
686 static struct GNUNET_CONTAINER_MultiHashMap *connected_peers;
689 * Map of peer identifiers to "struct PendingRequest" (for that peer).
691 static struct GNUNET_CONTAINER_MultiHashMap *peer_request_map;
694 * Map of query identifiers to "struct PendingRequest" (for that query).
696 static struct GNUNET_CONTAINER_MultiHashMap *query_request_map;
699 * Heap with the request that will expire next at the top. Contains
700 * pointers of type "struct PendingRequest*"; these will *also* be
701 * aliased from the "requests_by_peer" data structures and the
702 * "requests_by_query" table. Note that requests from our clients
703 * don't expire and are thus NOT in the "requests_by_expiration"
704 * (or the "requests_by_peer" tables).
706 static struct GNUNET_CONTAINER_Heap *requests_by_expiration_heap;
709 * Handle for reporting statistics.
711 static struct GNUNET_STATISTICS_Handle *stats;
714 * Linked list of clients we are currently processing requests for.
716 static struct ClientList *client_list;
719 * Pointer to handle to the core service (points to NULL until we've
722 static struct GNUNET_CORE_Handle *core;
725 * Head of linked list of blocks that can be migrated.
727 static struct MigrationReadyBlock *mig_head;
730 * Tail of linked list of blocks that can be migrated.
732 static struct MigrationReadyBlock *mig_tail;
735 * Request to datastore for migration (or NULL).
737 static struct GNUNET_DATASTORE_QueueEntry *mig_qe;
740 * Request to datastore for DHT PUTs (or NULL).
742 static struct GNUNET_DATASTORE_QueueEntry *dht_qe;
745 * Type we will request for the next DHT PUT round from the datastore.
747 static enum GNUNET_BLOCK_Type dht_put_type = GNUNET_BLOCK_TYPE_FS_KBLOCK;
750 * Where do we store trust information?
752 static char *trustDirectory;
755 * ID of task that collects blocks for migration.
757 static GNUNET_SCHEDULER_TaskIdentifier mig_task;
760 * ID of task that collects blocks for DHT PUTs.
762 static GNUNET_SCHEDULER_TaskIdentifier dht_task;
765 * What is the maximum frequency at which we are allowed to
766 * poll the datastore for migration content?
768 static struct GNUNET_TIME_Relative min_migration_delay;
771 * Handle for DHT operations.
773 static struct GNUNET_DHT_Handle *dht_handle;
776 * Size of the doubly-linked list of migration blocks.
778 static unsigned int mig_size;
781 * Are we allowed to migrate content to this peer.
783 static int active_migration;
786 * How many entires with zero anonymity do we currently estimate
787 * to have in the database?
789 static unsigned int zero_anonymity_count_estimate;
792 * Typical priorities we're seeing from other peers right now. Since
793 * most priorities will be zero, this value is the weighted average of
794 * non-zero priorities seen "recently". In order to ensure that new
795 * values do not dramatically change the ratio, values are first
796 * "capped" to a reasonable range (+N of the current value) and then
797 * averaged into the existing value by a ratio of 1:N. Hence
798 * receiving the largest possible priority can still only raise our
799 * "current_priorities" by at most 1.
801 static double current_priorities;
804 * Datastore 'GET' load tracking.
806 static struct GNUNET_LOAD_Value *datastore_get_load;
809 * Datastore 'PUT' load tracking.
811 static struct GNUNET_LOAD_Value *datastore_put_load;
814 * How long do requests typically stay in the routing table?
816 static struct GNUNET_LOAD_Value *rt_entry_lifetime;
819 * We've just now completed a datastore request. Update our
820 * datastore load calculations.
822 * @param start time when the datastore request was issued
825 update_datastore_delays (struct GNUNET_TIME_Absolute start)
827 struct GNUNET_TIME_Relative delay;
829 delay = GNUNET_TIME_absolute_get_duration (start);
830 GNUNET_LOAD_update (datastore_get_load,
836 * Get the filename under which we would store the GNUNET_HELLO_Message
837 * for the given host and protocol.
838 * @return filename of the form DIRECTORY/HOSTID
841 get_trust_filename (const struct GNUNET_PeerIdentity *id)
843 struct GNUNET_CRYPTO_HashAsciiEncoded fil;
846 GNUNET_CRYPTO_hash_to_enc (&id->hashPubKey, &fil);
847 GNUNET_asprintf (&fn, "%s%s%s", trustDirectory, DIR_SEPARATOR_STR, &fil);
854 * Transmit messages by copying it to the target buffer
855 * "buf". "buf" will be NULL and "size" zero if the socket was closed
856 * for writing in the meantime. In that case, do nothing
857 * (the disconnect or shutdown handler will take care of the rest).
858 * If we were able to transmit messages and there are still more
859 * pending, ask core again for further calls to this function.
861 * @param cls closure, pointer to the 'struct ConnectedPeer*'
862 * @param size number of bytes available in buf
863 * @param buf where the callee should write the message
864 * @return number of bytes written to buf
867 transmit_to_peer (void *cls,
868 size_t size, void *buf);
871 /* ******************* clean up functions ************************ */
874 * Delete the given migration block.
876 * @param mb block to delete
879 delete_migration_block (struct MigrationReadyBlock *mb)
881 GNUNET_CONTAINER_DLL_remove (mig_head,
884 GNUNET_PEER_decrement_rcs (mb->target_list,
885 MIGRATION_LIST_SIZE);
892 * Compare the distance of two peers to a key.
895 * @param p1 first peer
896 * @param p2 second peer
897 * @return GNUNET_YES if P1 is closer to key than P2
900 is_closer (const GNUNET_HashCode *key,
901 const struct GNUNET_PeerIdentity *p1,
902 const struct GNUNET_PeerIdentity *p2)
904 return GNUNET_CRYPTO_hash_xorcmp (&p1->hashPubKey,
911 * Consider migrating content to a given peer.
913 * @param cls 'struct MigrationReadyBlock*' to select
914 * targets for (or NULL for none)
915 * @param key ID of the peer
916 * @param value 'struct ConnectedPeer' of the peer
917 * @return GNUNET_YES (always continue iteration)
920 consider_migration (void *cls,
921 const GNUNET_HashCode *key,
924 struct MigrationReadyBlock *mb = cls;
925 struct ConnectedPeer *cp = value;
926 struct MigrationReadyBlock *pos;
927 struct GNUNET_PeerIdentity cppid;
928 struct GNUNET_PeerIdentity otherpid;
929 struct GNUNET_PeerIdentity worstpid;
934 /* consider 'cp' as a migration target for mb */
935 if (GNUNET_TIME_absolute_get_remaining (cp->migration_blocked).value > 0)
936 return GNUNET_YES; /* peer has requested no migration! */
939 GNUNET_PEER_resolve (cp->pid,
941 repl = MIGRATION_LIST_SIZE;
942 for (i=0;i<MIGRATION_LIST_SIZE;i++)
944 if (mb->target_list[i] == 0)
946 mb->target_list[i] = cp->pid;
947 GNUNET_PEER_change_rc (mb->target_list[i], 1);
948 repl = MIGRATION_LIST_SIZE;
951 GNUNET_PEER_resolve (mb->target_list[i],
953 if ( (repl == MIGRATION_LIST_SIZE) &&
954 is_closer (&mb->query,
961 else if ( (repl != MIGRATION_LIST_SIZE) &&
962 (is_closer (&mb->query,
970 if (repl != MIGRATION_LIST_SIZE)
972 GNUNET_PEER_change_rc (mb->target_list[repl], -1);
973 mb->target_list[repl] = cp->pid;
974 GNUNET_PEER_change_rc (mb->target_list[repl], 1);
978 /* consider scheduling transmission to cp for content migration */
985 for (i=0;i<MIGRATION_LIST_SIZE;i++)
987 if (cp->pid == pos->target_list[i])
992 msize = GNUNET_MIN (msize,
1000 return GNUNET_YES; /* no content available */
1002 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1003 "Trying to migrate at least %u bytes to peer `%s'\n",
1008 = GNUNET_CORE_notify_transmit_ready (core,
1009 0, GNUNET_TIME_UNIT_FOREVER_REL,
1010 (const struct GNUNET_PeerIdentity*) key,
1011 msize + sizeof (struct PutMessage),
1019 * Task that is run periodically to obtain blocks for content
1023 * @param tc scheduler context (also unused)
1026 gather_migration_blocks (void *cls,
1027 const struct GNUNET_SCHEDULER_TaskContext *tc);
1033 * Task that is run periodically to obtain blocks for DHT PUTs.
1035 * @param cls type of blocks to gather
1036 * @param tc scheduler context (unused)
1039 gather_dht_put_blocks (void *cls,
1040 const struct GNUNET_SCHEDULER_TaskContext *tc);
1044 * If the migration task is not currently running, consider
1045 * (re)scheduling it with the appropriate delay.
1048 consider_migration_gathering ()
1050 struct GNUNET_TIME_Relative delay;
1056 if (mig_task != GNUNET_SCHEDULER_NO_TASK)
1058 delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
1060 delay = GNUNET_TIME_relative_divide (delay,
1061 MAX_MIGRATION_QUEUE);
1062 delay = GNUNET_TIME_relative_max (delay,
1063 min_migration_delay);
1064 mig_task = GNUNET_SCHEDULER_add_delayed (sched,
1066 &gather_migration_blocks,
1072 * If the DHT PUT gathering task is not currently running, consider
1073 * (re)scheduling it with the appropriate delay.
1076 consider_dht_put_gathering (void *cls)
1078 struct GNUNET_TIME_Relative delay;
1084 if (dht_task != GNUNET_SCHEDULER_NO_TASK)
1086 if (zero_anonymity_count_estimate > 0)
1088 delay = GNUNET_TIME_relative_divide (GNUNET_DHT_DEFAULT_REPUBLISH_FREQUENCY,
1089 zero_anonymity_count_estimate);
1090 delay = GNUNET_TIME_relative_min (delay,
1095 /* if we have NO zero-anonymity content yet, wait 5 minutes for some to
1096 (hopefully) appear */
1097 delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 5);
1099 dht_task = GNUNET_SCHEDULER_add_delayed (sched,
1101 &gather_dht_put_blocks,
1107 * Process content offered for migration.
1109 * @param cls closure
1110 * @param key key for the content
1111 * @param size number of bytes in data
1112 * @param data content stored
1113 * @param type type of the content
1114 * @param priority priority of the content
1115 * @param anonymity anonymity-level for the content
1116 * @param expiration expiration time for the content
1117 * @param uid unique identifier for the datum;
1118 * maybe 0 if no unique identifier is available
1121 process_migration_content (void *cls,
1122 const GNUNET_HashCode * key,
1125 enum GNUNET_BLOCK_Type type,
1128 struct GNUNET_TIME_Absolute
1129 expiration, uint64_t uid)
1131 struct MigrationReadyBlock *mb;
1136 if (mig_size < MAX_MIGRATION_QUEUE)
1137 consider_migration_gathering ();
1140 if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
1143 GNUNET_FS_handle_on_demand_block (key, size, data,
1144 type, priority, anonymity,
1146 &process_migration_content,
1149 GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
1154 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1155 "Retrieved block `%s' of type %u for migration\n",
1159 mb = GNUNET_malloc (sizeof (struct MigrationReadyBlock) + size);
1161 mb->expiration = expiration;
1164 memcpy (&mb[1], data, size);
1165 GNUNET_CONTAINER_DLL_insert_after (mig_head,
1170 GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
1171 &consider_migration,
1173 GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
1178 * Function called upon completion of the DHT PUT operation.
1181 dht_put_continuation (void *cls,
1182 const struct GNUNET_SCHEDULER_TaskContext *tc)
1184 GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
1189 * Store content in DHT.
1191 * @param cls closure
1192 * @param key key for the content
1193 * @param size number of bytes in data
1194 * @param data content stored
1195 * @param type type of the content
1196 * @param priority priority of the content
1197 * @param anonymity anonymity-level for the content
1198 * @param expiration expiration time for the content
1199 * @param uid unique identifier for the datum;
1200 * maybe 0 if no unique identifier is available
1203 process_dht_put_content (void *cls,
1204 const GNUNET_HashCode * key,
1207 enum GNUNET_BLOCK_Type type,
1210 struct GNUNET_TIME_Absolute
1211 expiration, uint64_t uid)
1213 static unsigned int counter;
1214 static GNUNET_HashCode last_vhash;
1215 static GNUNET_HashCode vhash;
1220 consider_dht_put_gathering (cls);
1223 /* slightly funky code to estimate the total number of values with zero
1224 anonymity from the maximum observed length of a monotonically increasing
1225 sequence of hashes over the contents */
1226 GNUNET_CRYPTO_hash (data, size, &vhash);
1227 if (GNUNET_CRYPTO_hash_cmp (&vhash, &last_vhash) <= 0)
1229 if (zero_anonymity_count_estimate > 0)
1230 zero_anonymity_count_estimate /= 2;
1236 if (zero_anonymity_count_estimate < (1 << counter))
1237 zero_anonymity_count_estimate = (1 << counter);
1239 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1240 "Retrieved block `%s' of type %u for DHT PUT\n",
1244 GNUNET_DHT_put (dht_handle,
1251 GNUNET_TIME_UNIT_FOREVER_REL,
1252 &dht_put_continuation,
1258 * Task that is run periodically to obtain blocks for content
1262 * @param tc scheduler context (also unused)
1265 gather_migration_blocks (void *cls,
1266 const struct GNUNET_SCHEDULER_TaskContext *tc)
1268 mig_task = GNUNET_SCHEDULER_NO_TASK;
1271 mig_qe = GNUNET_DATASTORE_get_random (dsh, 0, UINT_MAX,
1272 GNUNET_TIME_UNIT_FOREVER_REL,
1273 &process_migration_content, NULL);
1274 GNUNET_assert (mig_qe != NULL);
1280 * Task that is run periodically to obtain blocks for DHT PUTs.
1282 * @param cls type of blocks to gather
1283 * @param tc scheduler context (unused)
1286 gather_dht_put_blocks (void *cls,
1287 const struct GNUNET_SCHEDULER_TaskContext *tc)
1289 dht_task = GNUNET_SCHEDULER_NO_TASK;
1292 if (dht_put_type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
1293 dht_put_type = GNUNET_BLOCK_TYPE_FS_KBLOCK;
1294 dht_qe = GNUNET_DATASTORE_get_zero_anonymity (dsh, 0, UINT_MAX,
1295 GNUNET_TIME_UNIT_FOREVER_REL,
1297 &process_dht_put_content, NULL);
1298 GNUNET_assert (dht_qe != NULL);
1304 * We're done with a particular message list entry.
1305 * Free all associated resources.
1307 * @param pml entry to destroy
1310 destroy_pending_message_list_entry (struct PendingMessageList *pml)
1312 GNUNET_CONTAINER_DLL_remove (pml->req->pending_head,
1313 pml->req->pending_tail,
1315 GNUNET_CONTAINER_DLL_remove (pml->target->pending_messages_head,
1316 pml->target->pending_messages_tail,
1318 pml->target->pending_requests--;
1319 GNUNET_free (pml->pm);
1325 * Destroy the given pending message (and call the respective
1328 * @param pm message to destroy
1329 * @param tpid id of peer that the message was delivered to, or 0 for none
1332 destroy_pending_message (struct PendingMessage *pm,
1333 GNUNET_PEER_Id tpid)
1335 struct PendingMessageList *pml = pm->pml;
1336 TransmissionContinuation cont;
1341 GNUNET_assert (pml->pm == pm);
1342 GNUNET_assert ( (tpid == 0) || (tpid == pml->target->pid) );
1344 cont_cls = pm->cont_cls;
1345 destroy_pending_message_list_entry (pml);
1352 cont (cont_cls, tpid);
1357 * We're done processing a particular request.
1358 * Free all associated resources.
1360 * @param pr request to destroy
1363 destroy_pending_request (struct PendingRequest *pr)
1365 struct GNUNET_PeerIdentity pid;
1367 if (pr->hnode != NULL)
1369 GNUNET_CONTAINER_heap_remove_node (requests_by_expiration_heap,
1373 if (NULL == pr->client_request_list)
1375 GNUNET_STATISTICS_update (stats,
1376 gettext_noop ("# P2P searches active"),
1382 GNUNET_STATISTICS_update (stats,
1383 gettext_noop ("# client searches active"),
1388 GNUNET_CONTAINER_multihashmap_remove (query_request_map,
1392 GNUNET_LOAD_update (rt_entry_lifetime,
1393 GNUNET_TIME_absolute_get_duration (pr->start_time).value);
1397 GNUNET_DATASTORE_cancel (pr->qe);
1400 if (pr->dht_get != NULL)
1402 GNUNET_DHT_get_stop (pr->dht_get);
1405 if (pr->client_request_list != NULL)
1407 GNUNET_CONTAINER_DLL_remove (pr->client_request_list->client_list->rl_head,
1408 pr->client_request_list->client_list->rl_tail,
1409 pr->client_request_list);
1410 GNUNET_free (pr->client_request_list);
1411 pr->client_request_list = NULL;
1415 GNUNET_PEER_resolve (pr->cp->pid,
1417 (void) GNUNET_CONTAINER_multihashmap_remove (peer_request_map,
1424 GNUNET_CONTAINER_bloomfilter_free (pr->bf);
1427 if (pr->irc != NULL)
1429 GNUNET_CORE_peer_change_preference_cancel (pr->irc);
1432 if (pr->replies_seen != NULL)
1434 GNUNET_free (pr->replies_seen);
1435 pr->replies_seen = NULL;
1437 if (pr->task != GNUNET_SCHEDULER_NO_TASK)
1439 GNUNET_SCHEDULER_cancel (sched,
1441 pr->task = GNUNET_SCHEDULER_NO_TASK;
1443 while (NULL != pr->pending_head)
1444 destroy_pending_message_list_entry (pr->pending_head);
1445 GNUNET_PEER_change_rc (pr->target_pid, -1);
1446 if (pr->used_pids != NULL)
1448 GNUNET_PEER_decrement_rcs (pr->used_pids, pr->used_pids_off);
1449 GNUNET_free (pr->used_pids);
1450 pr->used_pids_off = 0;
1451 pr->used_pids_size = 0;
1452 pr->used_pids = NULL;
1459 * Method called whenever a given peer connects.
1461 * @param cls closure, not used
1462 * @param peer peer identity this notification is about
1463 * @param latency reported latency of the connection with 'other'
1464 * @param distance reported distance (DV) to 'other'
1467 peer_connect_handler (void *cls,
1469 GNUNET_PeerIdentity * peer,
1470 struct GNUNET_TIME_Relative latency,
1473 struct ConnectedPeer *cp;
1474 struct MigrationReadyBlock *pos;
1478 cp = GNUNET_malloc (sizeof (struct ConnectedPeer));
1479 cp->transmission_delay = GNUNET_LOAD_value_init ();
1480 cp->pid = GNUNET_PEER_intern (peer);
1482 fn = get_trust_filename (peer);
1483 if ((GNUNET_DISK_file_test (fn) == GNUNET_YES) &&
1484 (sizeof (trust) == GNUNET_DISK_fn_read (fn, &trust, sizeof (trust))))
1485 cp->disk_trust = cp->trust = ntohl (trust);
1488 GNUNET_break (GNUNET_OK ==
1489 GNUNET_CONTAINER_multihashmap_put (connected_peers,
1492 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
1497 (void) consider_migration (pos, &peer->hashPubKey, cp);
1504 * Increase the host credit by a value.
1506 * @param host which peer to change the trust value on
1507 * @param value is the int value by which the
1508 * host credit is to be increased or decreased
1509 * @returns the actual change in trust (positive or negative)
1512 change_host_trust (struct ConnectedPeer *host, int value)
1514 unsigned int old_trust;
1518 GNUNET_assert (host != NULL);
1519 old_trust = host->trust;
1522 if (host->trust + value < host->trust)
1524 value = UINT32_MAX - host->trust;
1525 host->trust = UINT32_MAX;
1528 host->trust += value;
1532 if (host->trust < -value)
1534 value = -host->trust;
1538 host->trust += value;
1545 * Write host-trust information to a file - flush the buffer entry!
1548 flush_trust (void *cls,
1549 const GNUNET_HashCode *key,
1552 struct ConnectedPeer *host = value;
1555 struct GNUNET_PeerIdentity pid;
1557 if (host->trust == host->disk_trust)
1558 return GNUNET_OK; /* unchanged */
1559 GNUNET_PEER_resolve (host->pid,
1561 fn = get_trust_filename (&pid);
1562 if (host->trust == 0)
1564 if ((0 != UNLINK (fn)) && (errno != ENOENT))
1565 GNUNET_log_strerror_file (GNUNET_ERROR_TYPE_WARNING |
1566 GNUNET_ERROR_TYPE_BULK, "unlink", fn);
1570 trust = htonl (host->trust);
1571 if (sizeof(uint32_t) == GNUNET_DISK_fn_write (fn, &trust,
1573 GNUNET_DISK_PERM_USER_READ | GNUNET_DISK_PERM_USER_WRITE
1574 | GNUNET_DISK_PERM_GROUP_READ | GNUNET_DISK_PERM_OTHER_READ))
1575 host->disk_trust = host->trust;
1582 * Call this method periodically to scan data/hosts for new hosts.
1585 cron_flush_trust (void *cls,
1586 const struct GNUNET_SCHEDULER_TaskContext *tc)
1589 if (NULL == connected_peers)
1591 GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
1596 if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
1598 GNUNET_SCHEDULER_add_delayed (tc->sched,
1599 TRUST_FLUSH_FREQ, &cron_flush_trust, NULL);
1604 * Free (each) request made by the peer.
1606 * @param cls closure, points to peer that the request belongs to
1607 * @param key current key code
1608 * @param value value in the hash map
1609 * @return GNUNET_YES (we should continue to iterate)
1612 destroy_request (void *cls,
1613 const GNUNET_HashCode * key,
1616 const struct GNUNET_PeerIdentity * peer = cls;
1617 struct PendingRequest *pr = value;
1619 GNUNET_break (GNUNET_YES ==
1620 GNUNET_CONTAINER_multihashmap_remove (peer_request_map,
1623 destroy_pending_request (pr);
1629 * Method called whenever a peer disconnects.
1631 * @param cls closure, not used
1632 * @param peer peer identity this notification is about
1635 peer_disconnect_handler (void *cls,
1637 GNUNET_PeerIdentity * peer)
1639 struct ConnectedPeer *cp;
1640 struct PendingMessage *pm;
1642 struct MigrationReadyBlock *pos;
1643 struct MigrationReadyBlock *next;
1645 GNUNET_CONTAINER_multihashmap_get_multiple (peer_request_map,
1649 cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
1653 for (i=0;i<CS2P_SUCCESS_LIST_SIZE;i++)
1655 if (NULL != cp->last_client_replies[i])
1657 GNUNET_SERVER_client_drop (cp->last_client_replies[i]);
1658 cp->last_client_replies[i] = NULL;
1661 GNUNET_break (GNUNET_YES ==
1662 GNUNET_CONTAINER_multihashmap_remove (connected_peers,
1665 /* remove this peer from migration considerations; schedule
1668 while (NULL != (pos = next))
1671 for (i=0;i<MIGRATION_LIST_SIZE;i++)
1673 if (pos->target_list[i] == cp->pid)
1675 GNUNET_PEER_change_rc (pos->target_list[i], -1);
1676 pos->target_list[i] = 0;
1679 if (pos->used_targets >= GNUNET_CONTAINER_multihashmap_size (connected_peers))
1681 delete_migration_block (pos);
1682 consider_migration_gathering ();
1685 GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
1686 &consider_migration,
1689 GNUNET_PEER_change_rc (cp->pid, -1);
1690 GNUNET_PEER_decrement_rcs (cp->last_p2p_replies, P2P_SUCCESS_LIST_SIZE);
1691 if (NULL != cp->cth)
1692 GNUNET_CORE_notify_transmit_ready_cancel (cp->cth);
1693 while (NULL != (pm = cp->pending_messages_head))
1694 destroy_pending_message (pm, 0 /* delivery failed */);
1695 GNUNET_LOAD_value_free (cp->transmission_delay);
1696 GNUNET_break (0 == cp->pending_requests);
1702 * Iterator over hash map entries that removes all occurences
1703 * of the given 'client' from the 'last_client_replies' of the
1704 * given connected peer.
1706 * @param cls closure, the 'struct GNUNET_SERVER_Client*' to remove
1707 * @param key current key code (unused)
1708 * @param value value in the hash map (the 'struct ConnectedPeer*' to change)
1709 * @return GNUNET_YES (we should continue to iterate)
1712 remove_client_from_last_client_replies (void *cls,
1713 const GNUNET_HashCode * key,
1716 struct GNUNET_SERVER_Client *client = cls;
1717 struct ConnectedPeer *cp = value;
1720 for (i=0;i<CS2P_SUCCESS_LIST_SIZE;i++)
1722 if (cp->last_client_replies[i] == client)
1724 GNUNET_SERVER_client_drop (cp->last_client_replies[i]);
1725 cp->last_client_replies[i] = NULL;
1733 * A client disconnected. Remove all of its pending queries.
1735 * @param cls closure, NULL
1736 * @param client identification of the client
1739 handle_client_disconnect (void *cls,
1740 struct GNUNET_SERVER_Client
1743 struct ClientList *pos;
1744 struct ClientList *prev;
1745 struct ClientRequestList *rcl;
1746 struct ClientResponseMessage *creply;
1752 while ( (NULL != pos) &&
1753 (pos->client != client) )
1759 return; /* no requests pending for this client */
1760 while (NULL != (rcl = pos->rl_head))
1762 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1763 "Destroying pending request `%s' on disconnect\n",
1764 GNUNET_h2s (&rcl->req->query));
1765 destroy_pending_request (rcl->req);
1768 client_list = pos->next;
1770 prev->next = pos->next;
1771 if (pos->th != NULL)
1773 GNUNET_CONNECTION_notify_transmit_ready_cancel (pos->th);
1776 while (NULL != (creply = pos->res_head))
1778 GNUNET_CONTAINER_DLL_remove (pos->res_head,
1781 GNUNET_free (creply);
1783 GNUNET_SERVER_client_drop (pos->client);
1785 GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
1786 &remove_client_from_last_client_replies,
1792 * Iterator to free peer entries.
1794 * @param cls closure, unused
1795 * @param key current key code
1796 * @param value value in the hash map (peer entry)
1797 * @return GNUNET_YES (we should continue to iterate)
1800 clean_peer (void *cls,
1801 const GNUNET_HashCode * key,
1804 peer_disconnect_handler (NULL, (const struct GNUNET_PeerIdentity*) key);
1810 * Task run during shutdown.
1816 shutdown_task (void *cls,
1817 const struct GNUNET_SCHEDULER_TaskContext *tc)
1821 GNUNET_DATASTORE_cancel (mig_qe);
1826 GNUNET_DATASTORE_cancel (dht_qe);
1829 if (GNUNET_SCHEDULER_NO_TASK != mig_task)
1831 GNUNET_SCHEDULER_cancel (sched, mig_task);
1832 mig_task = GNUNET_SCHEDULER_NO_TASK;
1834 if (GNUNET_SCHEDULER_NO_TASK != dht_task)
1836 GNUNET_SCHEDULER_cancel (sched, dht_task);
1837 dht_task = GNUNET_SCHEDULER_NO_TASK;
1839 while (client_list != NULL)
1840 handle_client_disconnect (NULL,
1841 client_list->client);
1842 cron_flush_trust (NULL, NULL);
1843 GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
1846 GNUNET_break (0 == GNUNET_CONTAINER_heap_get_size (requests_by_expiration_heap));
1847 GNUNET_CONTAINER_heap_destroy (requests_by_expiration_heap);
1848 requests_by_expiration_heap = 0;
1849 GNUNET_CONTAINER_multihashmap_destroy (connected_peers);
1850 connected_peers = NULL;
1851 GNUNET_break (0 == GNUNET_CONTAINER_multihashmap_size (query_request_map));
1852 GNUNET_CONTAINER_multihashmap_destroy (query_request_map);
1853 query_request_map = NULL;
1854 GNUNET_LOAD_value_free (rt_entry_lifetime);
1855 rt_entry_lifetime = NULL;
1856 GNUNET_break (0 == GNUNET_CONTAINER_multihashmap_size (peer_request_map));
1857 GNUNET_CONTAINER_multihashmap_destroy (peer_request_map);
1858 peer_request_map = NULL;
1859 GNUNET_assert (NULL != core);
1860 GNUNET_CORE_disconnect (core);
1864 GNUNET_STATISTICS_destroy (stats, GNUNET_NO);
1869 GNUNET_DATASTORE_disconnect (dsh,
1873 while (mig_head != NULL)
1874 delete_migration_block (mig_head);
1875 GNUNET_assert (0 == mig_size);
1876 GNUNET_DHT_disconnect (dht_handle);
1878 GNUNET_LOAD_value_free (datastore_get_load);
1879 datastore_get_load = NULL;
1880 GNUNET_LOAD_value_free (datastore_put_load);
1881 datastore_put_load = NULL;
1882 GNUNET_BLOCK_context_destroy (block_ctx);
1884 GNUNET_CONFIGURATION_destroy (block_cfg);
1888 GNUNET_free_non_null (trustDirectory);
1889 trustDirectory = NULL;
1893 /* ******************* Utility functions ******************** */
1897 * Transmit messages by copying it to the target buffer
1898 * "buf". "buf" will be NULL and "size" zero if the socket was closed
1899 * for writing in the meantime. In that case, do nothing
1900 * (the disconnect or shutdown handler will take care of the rest).
1901 * If we were able to transmit messages and there are still more
1902 * pending, ask core again for further calls to this function.
1904 * @param cls closure, pointer to the 'struct ConnectedPeer*'
1905 * @param size number of bytes available in buf
1906 * @param buf where the callee should write the message
1907 * @return number of bytes written to buf
1910 transmit_to_peer (void *cls,
1911 size_t size, void *buf)
1913 struct ConnectedPeer *cp = cls;
1915 struct GNUNET_PeerIdentity pid;
1916 struct PendingMessage *pm;
1917 struct MigrationReadyBlock *mb;
1918 struct MigrationReadyBlock *next;
1919 struct PutMessage migm;
1927 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1928 "Dropping message, core too busy.\n");
1930 GNUNET_LOAD_update (cp->transmission_delay,
1934 GNUNET_LOAD_update (cp->transmission_delay,
1935 GNUNET_TIME_absolute_get_duration (cp->last_transmission_request_start).value);
1937 while ( (NULL != (pm = cp->pending_messages_head) ) &&
1938 (pm->msize <= size) )
1940 memcpy (&cbuf[msize], &pm[1], pm->msize);
1943 destroy_pending_message (pm, cp->pid);
1947 GNUNET_PEER_resolve (cp->pid,
1949 cp->last_transmission_request_start = GNUNET_TIME_absolute_get ();
1950 cp->cth = GNUNET_CORE_notify_transmit_ready (core,
1952 GNUNET_CONSTANTS_SERVICE_TIMEOUT,
1961 while (NULL != (mb = next))
1964 for (i=0;i<MIGRATION_LIST_SIZE;i++)
1966 if ( (cp->pid == mb->target_list[i]) &&
1967 (mb->size + sizeof (migm) <= size) )
1969 GNUNET_PEER_change_rc (mb->target_list[i], -1);
1970 mb->target_list[i] = 0;
1972 memset (&migm, 0, sizeof (migm));
1973 migm.header.size = htons (sizeof (migm) + mb->size);
1974 migm.header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
1975 migm.type = htonl (mb->type);
1976 migm.expiration = GNUNET_TIME_absolute_hton (mb->expiration);
1977 memcpy (&cbuf[msize], &migm, sizeof (migm));
1978 msize += sizeof (migm);
1979 size -= sizeof (migm);
1980 memcpy (&cbuf[msize], &mb[1], mb->size);
1984 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1985 "Pushing migration block `%s' (%u bytes) to `%s'\n",
1986 GNUNET_h2s (&mb->query),
1995 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1996 "Migration block `%s' (%u bytes) is not on migration list for peer `%s'\n",
1997 GNUNET_h2s (&mb->query),
2003 if ( (mb->used_targets >= MIGRATION_TARGET_COUNT) ||
2004 (mb->used_targets >= GNUNET_CONTAINER_multihashmap_size (connected_peers)) )
2006 delete_migration_block (mb);
2007 consider_migration_gathering ();
2010 consider_migration (NULL,
2015 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2016 "Transmitting %u bytes to peer %u\n",
2025 * Add a message to the set of pending messages for the given peer.
2027 * @param cp peer to send message to
2028 * @param pm message to queue
2029 * @param pr request on which behalf this message is being queued
2032 add_to_pending_messages_for_peer (struct ConnectedPeer *cp,
2033 struct PendingMessage *pm,
2034 struct PendingRequest *pr)
2036 struct PendingMessage *pos;
2037 struct PendingMessageList *pml;
2038 struct GNUNET_PeerIdentity pid;
2040 GNUNET_assert (pm->next == NULL);
2041 GNUNET_assert (pm->pml == NULL);
2044 pml = GNUNET_malloc (sizeof (struct PendingMessageList));
2049 GNUNET_CONTAINER_DLL_insert (pr->pending_head,
2053 pos = cp->pending_messages_head;
2054 while ( (pos != NULL) &&
2055 (pm->priority < pos->priority) )
2057 GNUNET_CONTAINER_DLL_insert_after (cp->pending_messages_head,
2058 cp->pending_messages_tail,
2061 cp->pending_requests++;
2062 if (cp->pending_requests > MAX_QUEUE_PER_PEER)
2063 destroy_pending_message (cp->pending_messages_tail, 0);
2064 GNUNET_PEER_resolve (cp->pid, &pid);
2065 if (NULL != cp->cth)
2066 GNUNET_CORE_notify_transmit_ready_cancel (cp->cth);
2067 /* need to schedule transmission */
2068 cp->last_transmission_request_start = GNUNET_TIME_absolute_get ();
2069 cp->cth = GNUNET_CORE_notify_transmit_ready (core,
2070 cp->pending_messages_head->priority,
2073 cp->pending_messages_head->msize,
2076 if (cp->cth == NULL)
2079 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2080 "Failed to schedule transmission with core!\n");
2082 GNUNET_STATISTICS_update (stats,
2083 gettext_noop ("# CORE transmission failures"),
2091 * Test if the DATABASE (GET) load on this peer is too high
2092 * to even consider processing the query at
2095 * @return GNUNET_YES if the load is too high to do anything (load high)
2096 * GNUNET_NO to process normally (load normal)
2097 * GNUNET_SYSERR to process for free (load low)
2100 test_get_load_too_high (uint32_t priority)
2104 ld = GNUNET_LOAD_get_load (datastore_get_load);
2107 GNUNET_STATISTICS_update (stats,
2108 gettext_noop ("# requests done for free (low load)"),
2111 return GNUNET_SYSERR;
2115 GNUNET_STATISTICS_update (stats,
2116 gettext_noop ("# requests done for a price (normal load)"),
2121 GNUNET_STATISTICS_update (stats,
2122 gettext_noop ("# requests dropped due to high load"),
2132 * Test if the DATABASE (PUT) load on this peer is too high
2133 * to even consider processing the query at
2136 * @return GNUNET_YES if the load is too high to do anything (load high)
2137 * GNUNET_NO to process normally (load normal or low)
2140 test_put_load_too_high (uint32_t priority)
2144 if (GNUNET_LOAD_get_average (datastore_put_load) < 50)
2145 return GNUNET_NO; /* very fast */
2146 ld = GNUNET_LOAD_get_load (datastore_put_load);
2147 if ( (ld < 1) || (ld < priority) )
2149 GNUNET_STATISTICS_update (stats,
2150 gettext_noop ("# storage requests dropped due to high load"),
2157 /* ******************* Pending Request Refresh Task ******************** */
2162 * We use a random delay to make the timing of requests less
2163 * predictable. This function returns such a random delay. We add a base
2164 * delay of MAX_CORK_DELAY (1s).
2166 * FIXME: make schedule dependent on the specifics of the request?
2167 * Or bandwidth and number of connected peers and load?
2169 * @return random delay to use for some request, between 1s and 1000+TTL_DECREMENT ms
2171 static struct GNUNET_TIME_Relative
2172 get_processing_delay ()
2175 GNUNET_TIME_relative_add (GNUNET_CONSTANTS_MAX_CORK_DELAY,
2176 GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
2177 GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
2183 * We're processing a GET request from another peer and have decided
2184 * to forward it to other peers. This function is called periodically
2185 * and should forward the request to other peers until we have all
2186 * possible replies. If we have transmitted the *only* reply to
2187 * the initiator we should destroy the pending request. If we have
2188 * many replies in the queue to the initiator, we should delay sending
2189 * out more queries until the reply queue has shrunk some.
2191 * @param cls our "struct ProcessGetContext *"
2195 forward_request_task (void *cls,
2196 const struct GNUNET_SCHEDULER_TaskContext *tc);
2200 * Function called after we either failed or succeeded
2201 * at transmitting a query to a peer.
2203 * @param cls the requests "struct PendingRequest*"
2204 * @param tpid ID of receiving peer, 0 on transmission error
2207 transmit_query_continuation (void *cls,
2208 GNUNET_PEER_Id tpid)
2210 struct PendingRequest *pr = cls;
2212 GNUNET_STATISTICS_update (stats,
2213 gettext_noop ("# queries scheduled for forwarding"),
2219 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2220 "Transmission of request failed, will try again later.\n");
2222 if (pr->task == GNUNET_SCHEDULER_NO_TASK)
2223 pr->task = GNUNET_SCHEDULER_add_delayed (sched,
2224 get_processing_delay (),
2225 &forward_request_task,
2229 GNUNET_STATISTICS_update (stats,
2230 gettext_noop ("# queries forwarded"),
2233 GNUNET_PEER_change_rc (tpid, 1);
2234 if (pr->used_pids_off == pr->used_pids_size)
2235 GNUNET_array_grow (pr->used_pids,
2237 pr->used_pids_size * 2 + 2);
2238 pr->used_pids[pr->used_pids_off++] = tpid;
2239 if (pr->task == GNUNET_SCHEDULER_NO_TASK)
2240 pr->task = GNUNET_SCHEDULER_add_delayed (sched,
2241 get_processing_delay (),
2242 &forward_request_task,
2248 * How many bytes should a bloomfilter be if we have already seen
2249 * entry_count responses? Note that BLOOMFILTER_K gives us the number
2250 * of bits set per entry. Furthermore, we should not re-size the
2251 * filter too often (to keep it cheap).
2253 * Since other peers will also add entries but not resize the filter,
2254 * we should generally pick a slightly larger size than what the
2255 * strict math would suggest.
2257 * @return must be a power of two and smaller or equal to 2^15.
2260 compute_bloomfilter_size (unsigned int entry_count)
2263 unsigned int ideal = (entry_count * BLOOMFILTER_K) / 4;
2264 uint16_t max = 1 << 15;
2266 if (entry_count > max)
2269 while ((size < max) && (size < ideal))
2278 * Recalculate our bloom filter for filtering replies. This function
2279 * will create a new bloom filter from scratch, so it should only be
2280 * called if we have no bloomfilter at all (and hence can create a
2281 * fresh one of minimal size without problems) OR if our peer is the
2282 * initiator (in which case we may resize to larger than mimimum size).
2284 * @param pr request for which the BF is to be recomputed
2287 refresh_bloomfilter (struct PendingRequest *pr)
2291 GNUNET_HashCode mhash;
2293 nsize = compute_bloomfilter_size (pr->replies_seen_off);
2294 if (nsize == pr->bf_size)
2295 return; /* size not changed */
2297 GNUNET_CONTAINER_bloomfilter_free (pr->bf);
2298 pr->bf_size = nsize;
2299 pr->mingle = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, -1);
2300 pr->bf = GNUNET_CONTAINER_bloomfilter_init (NULL,
2303 for (i=0;i<pr->replies_seen_off;i++)
2305 GNUNET_BLOCK_mingle_hash (&pr->replies_seen[i],
2308 GNUNET_CONTAINER_bloomfilter_add (pr->bf, &mhash);
2314 * Function called after we've tried to reserve a certain amount of
2315 * bandwidth for a reply. Check if we succeeded and if so send our
2318 * @param cls the requests "struct PendingRequest*"
2319 * @param peer identifies the peer
2320 * @param bpm_in set to the current bandwidth limit (receiving) for this peer
2321 * @param bpm_out set to the current bandwidth limit (sending) for this peer
2322 * @param amount set to the amount that was actually reserved or unreserved
2323 * @param preference current traffic preference for the given peer
2326 target_reservation_cb (void *cls,
2328 GNUNET_PeerIdentity * peer,
2329 struct GNUNET_BANDWIDTH_Value32NBO bpm_in,
2330 struct GNUNET_BANDWIDTH_Value32NBO bpm_out,
2332 uint64_t preference)
2334 struct PendingRequest *pr = cls;
2335 struct ConnectedPeer *cp;
2336 struct PendingMessage *pm;
2337 struct GetMessage *gm;
2338 GNUNET_HashCode *ext;
2348 /* error in communication with core, try again later */
2349 if (pr->task == GNUNET_SCHEDULER_NO_TASK)
2350 pr->task = GNUNET_SCHEDULER_add_delayed (sched,
2351 get_processing_delay (),
2352 &forward_request_task,
2356 // (3) transmit, update ttl/priority
2357 cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
2361 /* Peer must have just left */
2363 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2364 "Selected peer disconnected!\n");
2366 if (pr->task == GNUNET_SCHEDULER_NO_TASK)
2367 pr->task = GNUNET_SCHEDULER_add_delayed (sched,
2368 get_processing_delay (),
2369 &forward_request_task,
2373 no_route = GNUNET_NO;
2379 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2380 "Failed to reserve bandwidth for reply (got %d/%u bytes only)!\n",
2384 GNUNET_STATISTICS_update (stats,
2385 gettext_noop ("# reply bandwidth reservation requests failed"),
2388 if (pr->task == GNUNET_SCHEDULER_NO_TASK)
2389 pr->task = GNUNET_SCHEDULER_add_delayed (sched,
2390 get_processing_delay (),
2391 &forward_request_task,
2393 return; /* this target round failed */
2395 no_route = GNUNET_YES;
2398 GNUNET_STATISTICS_update (stats,
2399 gettext_noop ("# queries scheduled for forwarding"),
2402 /* build message and insert message into priority queue */
2404 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2405 "Forwarding request `%s' to `%4s'!\n",
2406 GNUNET_h2s (&pr->query),
2411 if (GNUNET_YES == no_route)
2413 bm |= GET_MESSAGE_BIT_RETURN_TO;
2416 if (pr->namespace != NULL)
2418 bm |= GET_MESSAGE_BIT_SKS_NAMESPACE;
2421 if (pr->target_pid != 0)
2423 bm |= GET_MESSAGE_BIT_TRANSMIT_TO;
2426 msize = sizeof (struct GetMessage) + pr->bf_size + k * sizeof(GNUNET_HashCode);
2427 GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE);
2428 pm = GNUNET_malloc (sizeof (struct PendingMessage) + msize);
2430 gm = (struct GetMessage*) &pm[1];
2431 gm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_GET);
2432 gm->header.size = htons (msize);
2433 gm->type = htonl (pr->type);
2434 pr->remaining_priority /= 2;
2435 gm->priority = htonl (pr->remaining_priority);
2436 gm->ttl = htonl (pr->ttl);
2437 gm->filter_mutator = htonl(pr->mingle);
2438 gm->hash_bitmap = htonl (bm);
2439 gm->query = pr->query;
2440 ext = (GNUNET_HashCode*) &gm[1];
2442 if (GNUNET_YES == no_route)
2443 GNUNET_PEER_resolve (pr->cp->pid, (struct GNUNET_PeerIdentity*) &ext[k++]);
2444 if (pr->namespace != NULL)
2445 memcpy (&ext[k++], pr->namespace, sizeof (GNUNET_HashCode));
2446 if (pr->target_pid != 0)
2447 GNUNET_PEER_resolve (pr->target_pid, (struct GNUNET_PeerIdentity*) &ext[k++]);
2448 bfdata = (char *) &ext[k];
2450 GNUNET_CONTAINER_bloomfilter_get_raw_data (pr->bf,
2453 pm->cont = &transmit_query_continuation;
2455 add_to_pending_messages_for_peer (cp, pm, pr);
2460 * Closure used for "target_peer_select_cb".
2462 struct PeerSelectionContext
2465 * The request for which we are selecting
2468 struct PendingRequest *pr;
2471 * Current "prime" target.
2473 struct GNUNET_PeerIdentity target;
2476 * How much do we like this target?
2478 double target_score;
2484 * Function called for each connected peer to determine
2485 * which one(s) would make good targets for forwarding.
2487 * @param cls closure (struct PeerSelectionContext)
2488 * @param key current key code (peer identity)
2489 * @param value value in the hash map (struct ConnectedPeer)
2490 * @return GNUNET_YES if we should continue to
2495 target_peer_select_cb (void *cls,
2496 const GNUNET_HashCode * key,
2499 struct PeerSelectionContext *psc = cls;
2500 struct ConnectedPeer *cp = value;
2501 struct PendingRequest *pr = psc->pr;
2506 /* 1) check that this peer is not the initiator */
2510 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2511 "Skipping initiator in forwarding selection\n");
2513 return GNUNET_YES; /* skip */
2516 /* 2) check if we have already (recently) forwarded to this peer */
2518 for (i=0;i<pr->used_pids_off;i++)
2519 if (pr->used_pids[i] == cp->pid)
2522 if (0 != GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
2523 RETRY_PROBABILITY_INV))
2526 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2527 "NOT re-trying query that was previously transmitted %u times\n",
2528 (unsigned int) pr->used_pids_off);
2530 return GNUNET_YES; /* skip */
2535 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
2536 "Re-trying query that was previously transmitted %u times to this peer\n",
2539 /* 3) calculate how much we'd like to forward to this peer,
2540 starting with a random value that is strong enough
2541 to at least give any peer a chance sometimes
2542 (compared to the other factors that come later) */
2543 /* 3a) count successful (recent) routes from cp for same source */
2546 score = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
2547 P2P_SUCCESS_LIST_SIZE);
2548 for (i=0;i<P2P_SUCCESS_LIST_SIZE;i++)
2549 if (cp->last_p2p_replies[i] == pr->cp->pid)
2550 score += 1.0; /* likely successful based on hot path */
2554 score = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
2555 CS2P_SUCCESS_LIST_SIZE);
2556 for (i=0;i<CS2P_SUCCESS_LIST_SIZE;i++)
2557 if (cp->last_client_replies[i] == pr->client_request_list->client_list->client)
2558 score += 1.0; /* likely successful based on hot path */
2560 /* 3b) include latency */
2561 if (cp->avg_delay.value < 4 * TTL_DECREMENT)
2562 score += 1.0; /* likely fast based on latency */
2563 /* 3c) include priorities */
2564 if (cp->avg_priority <= pr->remaining_priority / 2.0)
2565 score += 1.0; /* likely successful based on priorities */
2566 /* 3d) penalize for queue size */
2567 score -= (2.0 * cp->pending_requests / (double) MAX_QUEUE_PER_PEER);
2568 /* 3e) include peer proximity */
2569 score -= (2.0 * (GNUNET_CRYPTO_hash_distance_u32 (key,
2570 &pr->query)) / (double) UINT32_MAX);
2571 /* 4) super-bonus for being the known target */
2572 if (pr->target_pid == cp->pid)
2574 /* store best-fit in closure */
2576 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2577 "Peer `%s' gets score %f for forwarding query, max is %f\n",
2582 score++; /* avoid zero */
2583 if (score > psc->target_score)
2585 psc->target_score = score;
2586 psc->target.hashPubKey = *key;
2593 * The priority level imposes a bound on the maximum
2594 * value for the ttl that can be requested.
2596 * @param ttl_in requested ttl
2597 * @param prio given priority
2598 * @return ttl_in if ttl_in is below the limit,
2599 * otherwise the ttl-limit for the given priority
2602 bound_ttl (int32_t ttl_in, uint32_t prio)
2604 unsigned long long allowed;
2608 allowed = ((unsigned long long) prio) * TTL_DECREMENT / 1000;
2609 if (ttl_in > allowed)
2611 if (allowed >= (1 << 30))
2620 * Iterator called on each result obtained for a DHT
2621 * operation that expects a reply
2623 * @param cls closure
2624 * @param exp when will this value expire
2625 * @param key key of the result
2626 * @param get_path NULL-terminated array of pointers
2627 * to the peers on reverse GET path (or NULL if not recorded)
2628 * @param put_path NULL-terminated array of pointers
2629 * to the peers on the PUT path (or NULL if not recorded)
2630 * @param type type of the result
2631 * @param size number of bytes in data
2632 * @param data pointer to the result data
2635 process_dht_reply (void *cls,
2636 struct GNUNET_TIME_Absolute exp,
2637 const GNUNET_HashCode * key,
2638 const struct GNUNET_PeerIdentity * const *get_path,
2639 const struct GNUNET_PeerIdentity * const *put_path,
2640 enum GNUNET_BLOCK_Type type,
2646 * We're processing a GET request and have decided
2647 * to forward it to other peers. This function is called periodically
2648 * and should forward the request to other peers until we have all
2649 * possible replies. If we have transmitted the *only* reply to
2650 * the initiator we should destroy the pending request. If we have
2651 * many replies in the queue to the initiator, we should delay sending
2652 * out more queries until the reply queue has shrunk some.
2654 * @param cls our "struct ProcessGetContext *"
2658 forward_request_task (void *cls,
2659 const struct GNUNET_SCHEDULER_TaskContext *tc)
2661 struct PendingRequest *pr = cls;
2662 struct PeerSelectionContext psc;
2663 struct ConnectedPeer *cp;
2664 struct GNUNET_TIME_Relative delay;
2666 pr->task = GNUNET_SCHEDULER_NO_TASK;
2667 if (pr->irc != NULL)
2670 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2671 "Forwarding of query `%s' not attempted due to pending local lookup!\n",
2672 GNUNET_h2s (&pr->query));
2674 return; /* already pending */
2676 if (GNUNET_YES == pr->local_only)
2677 return; /* configured to not do P2P search */
2679 if ( (0 == pr->anonymity_level) &&
2680 (GNUNET_YES != pr->forward_only) &&
2681 (pr->type != GNUNET_BLOCK_TYPE_FS_DBLOCK) &&
2682 (pr->type != GNUNET_BLOCK_TYPE_FS_IBLOCK) )
2684 pr->dht_get = GNUNET_DHT_get_start (dht_handle,
2685 GNUNET_TIME_UNIT_FOREVER_REL,
2692 (pr->namespace != NULL) ? sizeof (GNUNET_HashCode) : 0,
2696 /* (1) select target */
2698 psc.target_score = -DBL_MAX;
2699 GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
2700 &target_peer_select_cb,
2702 if (psc.target_score == -DBL_MAX)
2704 delay = get_processing_delay ();
2706 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2707 "No peer selected for forwarding of query `%s', will try again in %llu ms!\n",
2708 GNUNET_h2s (&pr->query),
2711 pr->task = GNUNET_SCHEDULER_add_delayed (sched,
2713 &forward_request_task,
2715 return; /* nobody selected */
2717 /* (3) update TTL/priority */
2718 if (pr->client_request_list != NULL)
2720 /* FIXME: use better algorithm!? */
2721 if (0 == GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
2724 /* bound priority we use by priorities we see from other peers
2725 rounded up (must round up so that we can see non-zero
2726 priorities, but round up as little as possible to make it
2727 plausible that we forwarded another peers request) */
2728 if (pr->priority > current_priorities + 1.0)
2729 pr->priority = (uint32_t) current_priorities + 1.0;
2730 pr->ttl = bound_ttl (pr->ttl + TTL_DECREMENT * 2,
2733 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2734 "Trying query `%s' with priority %u and TTL %d.\n",
2735 GNUNET_h2s (&pr->query),
2741 /* (3) reserve reply bandwidth */
2742 if (GNUNET_NO == pr->forward_only)
2744 cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
2745 &psc.target.hashPubKey);
2746 GNUNET_assert (NULL != cp);
2747 pr->irc = GNUNET_CORE_peer_change_preference (sched, cfg,
2749 GNUNET_CONSTANTS_SERVICE_TIMEOUT,
2750 GNUNET_BANDWIDTH_value_init (UINT32_MAX),
2753 &target_reservation_cb,
2755 cp->inc_preference = 0;
2759 /* force forwarding */
2760 static struct GNUNET_BANDWIDTH_Value32NBO zerobw;
2761 target_reservation_cb (pr, &psc.target,
2762 zerobw, zerobw, 0, 0.0);
2767 /* **************************** P2P PUT Handling ************************ */
2771 * Function called after we either failed or succeeded
2772 * at transmitting a reply to a peer.
2774 * @param cls the requests "struct PendingRequest*"
2775 * @param tpid ID of receiving peer, 0 on transmission error
2778 transmit_reply_continuation (void *cls,
2779 GNUNET_PEER_Id tpid)
2781 struct PendingRequest *pr = cls;
2785 case GNUNET_BLOCK_TYPE_FS_DBLOCK:
2786 case GNUNET_BLOCK_TYPE_FS_IBLOCK:
2787 /* only one reply expected, done with the request! */
2788 destroy_pending_request (pr);
2790 case GNUNET_BLOCK_TYPE_ANY:
2791 case GNUNET_BLOCK_TYPE_FS_KBLOCK:
2792 case GNUNET_BLOCK_TYPE_FS_SBLOCK:
2802 * Transmit the given message by copying it to the target buffer
2803 * "buf". "buf" will be NULL and "size" zero if the socket was closed
2804 * for writing in the meantime. In that case, do nothing
2805 * (the disconnect or shutdown handler will take care of the rest).
2806 * If we were able to transmit messages and there are still more
2807 * pending, ask core again for further calls to this function.
2809 * @param cls closure, pointer to the 'struct ClientList*'
2810 * @param size number of bytes available in buf
2811 * @param buf where the callee should write the message
2812 * @return number of bytes written to buf
2815 transmit_to_client (void *cls,
2816 size_t size, void *buf)
2818 struct ClientList *cl = cls;
2820 struct ClientResponseMessage *creply;
2827 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2828 "Not sending reply, client communication problem.\n");
2833 while ( (NULL != (creply = cl->res_head) ) &&
2834 (creply->msize <= size) )
2836 memcpy (&cbuf[msize], &creply[1], creply->msize);
2837 msize += creply->msize;
2838 size -= creply->msize;
2839 GNUNET_CONTAINER_DLL_remove (cl->res_head,
2842 GNUNET_free (creply);
2845 cl->th = GNUNET_SERVER_notify_transmit_ready (cl->client,
2847 GNUNET_TIME_UNIT_FOREVER_REL,
2848 &transmit_to_client,
2851 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2852 "Transmitted %u bytes to client\n",
2853 (unsigned int) msize);
2860 * Closure for "process_reply" function.
2862 struct ProcessReplyClosure
2865 * The data for the reply.
2870 * Who gave us this reply? NULL for local host (or DHT)
2872 struct ConnectedPeer *sender;
2875 * When the reply expires.
2877 struct GNUNET_TIME_Absolute expiration;
2885 * Type of the block.
2887 enum GNUNET_BLOCK_Type type;
2890 * How much was this reply worth to us?
2895 * Evaluation result (returned).
2897 enum GNUNET_BLOCK_EvaluationResult eval;
2900 * Did we finish processing the associated request?
2905 * Did we find a matching request?
2912 * We have received a reply; handle it!
2914 * @param cls response (struct ProcessReplyClosure)
2915 * @param key our query
2916 * @param value value in the hash map (info about the query)
2917 * @return GNUNET_YES (we should continue to iterate)
2920 process_reply (void *cls,
2921 const GNUNET_HashCode * key,
2924 struct ProcessReplyClosure *prq = cls;
2925 struct PendingRequest *pr = value;
2926 struct PendingMessage *reply;
2927 struct ClientResponseMessage *creply;
2928 struct ClientList *cl;
2929 struct PutMessage *pm;
2930 struct ConnectedPeer *cp;
2931 struct GNUNET_TIME_Relative cur_delay;
2935 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2936 "Matched result (type %u) for query `%s' with pending request\n",
2937 (unsigned int) prq->type,
2940 GNUNET_STATISTICS_update (stats,
2941 gettext_noop ("# replies received and matched"),
2944 if (prq->sender != NULL)
2946 cur_delay = GNUNET_TIME_absolute_get_duration (pr->start_time);
2947 prq->sender->avg_delay.value
2948 = (prq->sender->avg_delay.value *
2949 (RUNAVG_DELAY_N - 1) + cur_delay.value) / RUNAVG_DELAY_N;
2950 prq->sender->avg_priority
2951 = (prq->sender->avg_priority *
2952 (RUNAVG_DELAY_N - 1) + pr->priority) / (double) RUNAVG_DELAY_N;
2955 GNUNET_PEER_change_rc (prq->sender->last_p2p_replies
2956 [prq->sender->last_p2p_replies_woff % P2P_SUCCESS_LIST_SIZE],
2958 GNUNET_PEER_change_rc (pr->cp->pid, 1);
2959 prq->sender->last_p2p_replies
2960 [(prq->sender->last_p2p_replies_woff++) % P2P_SUCCESS_LIST_SIZE]
2965 if (NULL != prq->sender->last_client_replies
2966 [(prq->sender->last_client_replies_woff) % CS2P_SUCCESS_LIST_SIZE])
2967 GNUNET_SERVER_client_drop (prq->sender->last_client_replies
2968 [(prq->sender->last_client_replies_woff) % CS2P_SUCCESS_LIST_SIZE]);
2969 prq->sender->last_client_replies
2970 [(prq->sender->last_client_replies_woff++) % CS2P_SUCCESS_LIST_SIZE]
2971 = pr->client_request_list->client_list->client;
2972 GNUNET_SERVER_client_keep (pr->client_request_list->client_list->client);
2975 prq->eval = GNUNET_BLOCK_evaluate (block_ctx,
2980 pr->namespace, (pr->namespace != NULL) ? sizeof (GNUNET_HashCode) : 0,
2985 case GNUNET_BLOCK_EVALUATION_OK_MORE:
2987 case GNUNET_BLOCK_EVALUATION_OK_LAST:
2988 while (NULL != pr->pending_head)
2989 destroy_pending_message_list_entry (pr->pending_head);
2992 if (pr->client_request_list != NULL)
2993 GNUNET_SERVER_receive_done (pr->client_request_list->client_list->client,
2995 GNUNET_DATASTORE_cancel (pr->qe);
2998 pr->do_remove = GNUNET_YES;
2999 if (pr->task != GNUNET_SCHEDULER_NO_TASK)
3001 GNUNET_SCHEDULER_cancel (sched,
3003 pr->task = GNUNET_SCHEDULER_NO_TASK;
3005 GNUNET_break (GNUNET_YES ==
3006 GNUNET_CONTAINER_multihashmap_remove (query_request_map,
3009 GNUNET_LOAD_update (rt_entry_lifetime,
3010 GNUNET_TIME_absolute_get_duration (pr->start_time).value);
3012 case GNUNET_BLOCK_EVALUATION_OK_DUPLICATE:
3013 GNUNET_STATISTICS_update (stats,
3014 gettext_noop ("# duplicate replies discarded (bloomfilter)"),
3018 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3019 "Duplicate response `%s', discarding.\n",
3020 GNUNET_h2s (&mhash));
3022 return GNUNET_YES; /* duplicate */
3023 case GNUNET_BLOCK_EVALUATION_RESULT_INVALID:
3024 return GNUNET_YES; /* wrong namespace */
3025 case GNUNET_BLOCK_EVALUATION_REQUEST_VALID:
3028 case GNUNET_BLOCK_EVALUATION_REQUEST_INVALID:
3031 case GNUNET_BLOCK_EVALUATION_TYPE_NOT_SUPPORTED:
3032 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
3033 _("Unsupported block type %u\n"),
3037 if (pr->client_request_list != NULL)
3039 if (pr->replies_seen_size == pr->replies_seen_off)
3040 GNUNET_array_grow (pr->replies_seen,
3041 pr->replies_seen_size,
3042 pr->replies_seen_size * 2 + 4);
3043 GNUNET_CRYPTO_hash (prq->data,
3045 &pr->replies_seen[pr->replies_seen_off++]);
3046 refresh_bloomfilter (pr);
3048 if (NULL == prq->sender)
3051 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3052 "Found result for query `%s' in local datastore\n",
3055 GNUNET_STATISTICS_update (stats,
3056 gettext_noop ("# results found locally"),
3060 prq->priority += pr->remaining_priority;
3061 pr->remaining_priority = 0;
3062 pr->results_found++;
3063 prq->request_found = GNUNET_YES;
3064 if (NULL != pr->client_request_list)
3066 GNUNET_STATISTICS_update (stats,
3067 gettext_noop ("# replies received for local clients"),
3070 cl = pr->client_request_list->client_list;
3071 msize = sizeof (struct PutMessage) + prq->size;
3072 creply = GNUNET_malloc (msize + sizeof (struct ClientResponseMessage));
3073 creply->msize = msize;
3074 creply->client_list = cl;
3075 GNUNET_CONTAINER_DLL_insert_after (cl->res_head,
3079 pm = (struct PutMessage*) &creply[1];
3080 pm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
3081 pm->header.size = htons (msize);
3082 pm->type = htonl (prq->type);
3083 pm->expiration = GNUNET_TIME_absolute_hton (prq->expiration);
3084 memcpy (&pm[1], prq->data, prq->size);
3088 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3089 "Transmitting result for query `%s' to client\n",
3092 cl->th = GNUNET_SERVER_notify_transmit_ready (cl->client,
3094 GNUNET_TIME_UNIT_FOREVER_REL,
3095 &transmit_to_client,
3098 GNUNET_break (cl->th != NULL);
3101 prq->finished = GNUNET_YES;
3102 destroy_pending_request (pr);
3109 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3110 "Transmitting result for query `%s' to other peer (PID=%u)\n",
3112 (unsigned int) cp->pid);
3114 GNUNET_STATISTICS_update (stats,
3115 gettext_noop ("# replies received for other peers"),
3118 msize = sizeof (struct PutMessage) + prq->size;
3119 reply = GNUNET_malloc (msize + sizeof (struct PendingMessage));
3120 reply->cont = &transmit_reply_continuation;
3121 reply->cont_cls = pr;
3122 reply->msize = msize;
3123 reply->priority = UINT32_MAX; /* send replies first! */
3124 pm = (struct PutMessage*) &reply[1];
3125 pm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
3126 pm->header.size = htons (msize);
3127 pm->type = htonl (prq->type);
3128 pm->expiration = GNUNET_TIME_absolute_hton (prq->expiration);
3129 memcpy (&pm[1], prq->data, prq->size);
3130 add_to_pending_messages_for_peer (cp, reply, pr);
3137 * Iterator called on each result obtained for a DHT
3138 * operation that expects a reply
3140 * @param cls closure
3141 * @param exp when will this value expire
3142 * @param key key of the result
3143 * @param get_path NULL-terminated array of pointers
3144 * to the peers on reverse GET path (or NULL if not recorded)
3145 * @param put_path NULL-terminated array of pointers
3146 * to the peers on the PUT path (or NULL if not recorded)
3147 * @param type type of the result
3148 * @param size number of bytes in data
3149 * @param data pointer to the result data
3152 process_dht_reply (void *cls,
3153 struct GNUNET_TIME_Absolute exp,
3154 const GNUNET_HashCode * key,
3155 const struct GNUNET_PeerIdentity * const *get_path,
3156 const struct GNUNET_PeerIdentity * const *put_path,
3157 enum GNUNET_BLOCK_Type type,
3161 struct PendingRequest *pr = cls;
3162 struct ProcessReplyClosure prq;
3164 memset (&prq, 0, sizeof (prq));
3166 prq.expiration = exp;
3169 process_reply (&prq, key, pr);
3175 * Continuation called to notify client about result of the
3178 * @param cls closure
3179 * @param success GNUNET_SYSERR on failure
3180 * @param msg NULL on success, otherwise an error message
3183 put_migration_continuation (void *cls,
3187 struct GNUNET_TIME_Absolute *start = cls;
3188 struct GNUNET_TIME_Relative delay;
3190 delay = GNUNET_TIME_absolute_get_duration (*start);
3191 GNUNET_free (start);
3192 GNUNET_LOAD_update (datastore_put_load,
3194 if (GNUNET_OK == success)
3196 GNUNET_STATISTICS_update (stats,
3197 gettext_noop ("# datastore 'put' failures"),
3204 * Handle P2P "PUT" message.
3206 * @param cls closure, always NULL
3207 * @param other the other peer involved (sender or receiver, NULL
3208 * for loopback messages where we are both sender and receiver)
3209 * @param message the actual message
3210 * @param latency reported latency of the connection with 'other'
3211 * @param distance reported distance (DV) to 'other'
3212 * @return GNUNET_OK to keep the connection open,
3213 * GNUNET_SYSERR to close it (signal serious error)
3216 handle_p2p_put (void *cls,
3217 const struct GNUNET_PeerIdentity *other,
3218 const struct GNUNET_MessageHeader *message,
3219 struct GNUNET_TIME_Relative latency,
3222 const struct PutMessage *put;
3225 enum GNUNET_BLOCK_Type type;
3226 struct GNUNET_TIME_Absolute expiration;
3227 GNUNET_HashCode query;
3228 struct ProcessReplyClosure prq;
3229 struct GNUNET_TIME_Absolute *start;
3230 struct GNUNET_TIME_Relative block_time;
3232 struct ConnectedPeer *cp;
3233 struct PendingMessage *pm;
3234 struct MigrationStopMessage *msm;
3236 msize = ntohs (message->size);
3237 if (msize < sizeof (struct PutMessage))
3240 return GNUNET_SYSERR;
3242 put = (const struct PutMessage*) message;
3243 dsize = msize - sizeof (struct PutMessage);
3244 type = ntohl (put->type);
3245 expiration = GNUNET_TIME_absolute_ntoh (put->expiration);
3247 if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
3248 return GNUNET_SYSERR;
3250 GNUNET_BLOCK_get_key (block_ctx,
3256 GNUNET_break_op (0);
3257 return GNUNET_SYSERR;
3260 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3261 "Received result for query `%s' from peer `%4s'\n",
3262 GNUNET_h2s (&query),
3263 GNUNET_i2s (other));
3265 GNUNET_STATISTICS_update (stats,
3266 gettext_noop ("# replies received (overall)"),
3269 /* now, lookup 'query' */
3270 prq.data = (const void*) &put[1];
3272 prq.sender = GNUNET_CONTAINER_multihashmap_get (connected_peers,
3273 &other->hashPubKey);
3278 prq.expiration = expiration;
3280 prq.finished = GNUNET_NO;
3281 prq.request_found = GNUNET_NO;
3282 GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map,
3286 if (prq.sender != NULL)
3288 prq.sender->inc_preference += CONTENT_BANDWIDTH_VALUE + 1000 * prq.priority;
3289 prq.sender->trust += prq.priority;
3291 if ( (GNUNET_YES == active_migration) &&
3292 (GNUNET_NO == test_put_load_too_high (prq.priority)) )
3295 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3296 "Replicating result for query `%s' with priority %u\n",
3297 GNUNET_h2s (&query),
3300 start = GNUNET_malloc (sizeof (struct GNUNET_TIME_Absolute));
3301 *start = GNUNET_TIME_absolute_get ();
3302 GNUNET_DATASTORE_put (dsh,
3303 0, &query, dsize, &put[1],
3304 type, prq.priority, 1 /* anonymity */,
3306 1 + prq.priority, MAX_DATASTORE_QUEUE,
3307 GNUNET_CONSTANTS_SERVICE_TIMEOUT,
3308 &put_migration_continuation,
3311 putl = GNUNET_LOAD_get_load (datastore_put_load);
3312 if ( (GNUNET_NO == prq.request_found) &&
3313 ( (GNUNET_YES != active_migration) ||
3316 cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
3317 &other->hashPubKey);
3318 if (GNUNET_TIME_absolute_get_duration (cp->last_migration_block).value < 5000)
3319 return GNUNET_OK; /* already blocked */
3320 /* We're too busy; send MigrationStop message! */
3321 if (GNUNET_YES != active_migration)
3322 putl = 1.0 + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 5);
3323 block_time = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
3324 5000 + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
3325 (unsigned int) (60000 * putl * putl)));
3327 cp->last_migration_block = GNUNET_TIME_relative_to_absolute (block_time);
3328 pm = GNUNET_malloc (sizeof (struct PendingMessage) +
3329 sizeof (struct MigrationStopMessage));
3330 pm->msize = sizeof (struct MigrationStopMessage);
3331 pm->priority = UINT32_MAX;
3332 msm = (struct MigrationStopMessage*) &pm[1];
3333 msm->header.size = htons (sizeof (struct MigrationStopMessage));
3334 msm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_MIGRATION_STOP);
3335 msm->duration = GNUNET_TIME_relative_hton (block_time);
3336 add_to_pending_messages_for_peer (cp,
3345 * Handle P2P "MIGRATION_STOP" message.
3347 * @param cls closure, always NULL
3348 * @param other the other peer involved (sender or receiver, NULL
3349 * for loopback messages where we are both sender and receiver)
3350 * @param message the actual message
3351 * @param latency reported latency of the connection with 'other'
3352 * @param distance reported distance (DV) to 'other'
3353 * @return GNUNET_OK to keep the connection open,
3354 * GNUNET_SYSERR to close it (signal serious error)
3357 handle_p2p_migration_stop (void *cls,
3358 const struct GNUNET_PeerIdentity *other,
3359 const struct GNUNET_MessageHeader *message,
3360 struct GNUNET_TIME_Relative latency,
3363 struct ConnectedPeer *cp;
3364 const struct MigrationStopMessage *msm;
3366 msm = (const struct MigrationStopMessage*) message;
3367 cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
3368 &other->hashPubKey);
3374 cp->migration_blocked = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_relative_ntoh (msm->duration));
3380 /* **************************** P2P GET Handling ************************ */
3384 * Closure for 'check_duplicate_request_{peer,client}'.
3386 struct CheckDuplicateRequestClosure
3389 * The new request we should check if it already exists.
3391 const struct PendingRequest *pr;
3394 * Existing request found by the checker, NULL if none.
3396 struct PendingRequest *have;
3401 * Iterator over entries in the 'query_request_map' that
3402 * tries to see if we have the same request pending from
3403 * the same client already.
3405 * @param cls closure (our 'struct CheckDuplicateRequestClosure')
3406 * @param key current key code (query, ignored, must match)
3407 * @param value value in the hash map (a 'struct PendingRequest'
3408 * that already exists)
3409 * @return GNUNET_YES if we should continue to
3410 * iterate (no match yet)
3411 * GNUNET_NO if not (match found).
3414 check_duplicate_request_client (void *cls,
3415 const GNUNET_HashCode * key,
3418 struct CheckDuplicateRequestClosure *cdc = cls;
3419 struct PendingRequest *have = value;
3421 if (have->client_request_list == NULL)
3423 if ( (cdc->pr->client_request_list->client_list->client == have->client_request_list->client_list->client) &&
3434 * We're processing (local) results for a search request
3435 * from another peer. Pass applicable results to the
3436 * peer and if we are done either clean up (operation
3437 * complete) or forward to other peers (more results possible).
3439 * @param cls our closure (struct LocalGetContext)
3440 * @param key key for the content
3441 * @param size number of bytes in data
3442 * @param data content stored
3443 * @param type type of the content
3444 * @param priority priority of the content
3445 * @param anonymity anonymity-level for the content
3446 * @param expiration expiration time for the content
3447 * @param uid unique identifier for the datum;
3448 * maybe 0 if no unique identifier is available
3451 process_local_reply (void *cls,
3452 const GNUNET_HashCode * key,
3455 enum GNUNET_BLOCK_Type type,
3458 struct GNUNET_TIME_Absolute
3462 struct PendingRequest *pr = cls;
3463 struct ProcessReplyClosure prq;
3464 struct CheckDuplicateRequestClosure cdrc;
3465 GNUNET_HashCode query;
3466 unsigned int old_rf;
3471 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3472 "Done processing local replies, forwarding request to other peers.\n");
3475 if (pr->client_request_list != NULL)
3477 GNUNET_SERVER_receive_done (pr->client_request_list->client_list->client,
3479 /* Figure out if this is a duplicate request and possibly
3480 merge 'struct PendingRequest' entries */
3483 GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map,
3485 &check_duplicate_request_client,
3487 if (cdrc.have != NULL)
3490 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3491 "Received request for block `%s' twice from client, will only request once.\n",
3492 GNUNET_h2s (&pr->query));
3495 destroy_pending_request (pr);
3500 /* no more results */
3501 if (pr->task == GNUNET_SCHEDULER_NO_TASK)
3502 pr->task = GNUNET_SCHEDULER_add_now (sched,
3503 &forward_request_task,
3508 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3509 "New local response to `%s' of type %u.\n",
3513 if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
3516 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3517 "Found ONDEMAND block, performing on-demand encoding\n");
3519 GNUNET_STATISTICS_update (stats,
3520 gettext_noop ("# on-demand blocks matched requests"),
3524 GNUNET_FS_handle_on_demand_block (key, size, data, type, priority,
3525 anonymity, expiration, uid,
3526 &process_local_reply,
3530 GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
3534 old_rf = pr->results_found;
3535 memset (&prq, 0, sizeof (prq));
3537 prq.expiration = expiration;
3540 GNUNET_BLOCK_get_key (block_ctx,
3547 GNUNET_DATASTORE_remove (dsh,
3551 GNUNET_TIME_UNIT_FOREVER_REL,
3553 GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
3557 prq.priority = priority;
3558 prq.finished = GNUNET_NO;
3559 prq.request_found = GNUNET_NO;
3560 process_reply (&prq, key, pr);
3561 if ( (old_rf == 0) &&
3562 (pr->results_found == 1) )
3563 update_datastore_delays (pr->start_time);
3564 if (prq.finished == GNUNET_YES)
3567 return; /* done here */
3568 if (prq.eval == GNUNET_BLOCK_EVALUATION_OK_LAST)
3570 GNUNET_DATASTORE_get_next (dsh, GNUNET_NO);
3573 if ( (pr->client_request_list == NULL) &&
3574 ( (GNUNET_YES == test_get_load_too_high (0)) ||
3575 (pr->results_found > 5 + 2 * pr->priority) ) )
3578 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3579 "Load too high, done with request\n");
3581 GNUNET_STATISTICS_update (stats,
3582 gettext_noop ("# processing result set cut short due to load"),
3585 GNUNET_DATASTORE_get_next (dsh, GNUNET_NO);
3588 GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
3593 * We've received a request with the specified priority. Bound it
3594 * according to how much we trust the given peer.
3596 * @param prio_in requested priority
3597 * @param cp the peer making the request
3598 * @return effective priority
3601 bound_priority (uint32_t prio_in,
3602 struct ConnectedPeer *cp)
3604 #define N ((double)128.0)
3609 ld = test_get_load_too_high (0);
3610 if (ld == GNUNET_SYSERR)
3611 return 0; /* excess resources */
3612 ret = change_host_trust (cp, prio_in);
3615 if (ret > current_priorities + N)
3616 rret = current_priorities + N;
3620 = (current_priorities * (N-1) + rret)/N;
3622 if ( (ld == GNUNET_YES) && (ret > 0) )
3624 /* try with charging */
3625 ld = test_get_load_too_high (ret);
3627 if (ld == GNUNET_YES)
3631 change_host_trust (cp, -ret);
3632 return -1; /* not enough resources */
3640 * Iterator over entries in the 'query_request_map' that
3641 * tries to see if we have the same request pending from
3642 * the same peer already.
3644 * @param cls closure (our 'struct CheckDuplicateRequestClosure')
3645 * @param key current key code (query, ignored, must match)
3646 * @param value value in the hash map (a 'struct PendingRequest'
3647 * that already exists)
3648 * @return GNUNET_YES if we should continue to
3649 * iterate (no match yet)
3650 * GNUNET_NO if not (match found).
3653 check_duplicate_request_peer (void *cls,
3654 const GNUNET_HashCode * key,
3657 struct CheckDuplicateRequestClosure *cdc = cls;
3658 struct PendingRequest *have = value;
3660 if (cdc->pr->target_pid == have->target_pid)
3670 * Handle P2P "GET" request.
3672 * @param cls closure, always NULL
3673 * @param other the other peer involved (sender or receiver, NULL
3674 * for loopback messages where we are both sender and receiver)
3675 * @param message the actual message
3676 * @param latency reported latency of the connection with 'other'
3677 * @param distance reported distance (DV) to 'other'
3678 * @return GNUNET_OK to keep the connection open,
3679 * GNUNET_SYSERR to close it (signal serious error)
3682 handle_p2p_get (void *cls,
3683 const struct GNUNET_PeerIdentity *other,
3684 const struct GNUNET_MessageHeader *message,
3685 struct GNUNET_TIME_Relative latency,
3688 struct PendingRequest *pr;
3689 struct ConnectedPeer *cp;
3690 struct ConnectedPeer *cps;
3691 struct CheckDuplicateRequestClosure cdc;
3692 struct GNUNET_TIME_Relative timeout;
3694 const struct GetMessage *gm;
3696 const GNUNET_HashCode *opt;
3699 uint32_t ttl_decrement;
3701 enum GNUNET_BLOCK_Type type;
3704 msize = ntohs(message->size);
3705 if (msize < sizeof (struct GetMessage))
3707 GNUNET_break_op (0);
3708 return GNUNET_SYSERR;
3710 gm = (const struct GetMessage*) message;
3711 type = ntohl (gm->type);
3712 bm = ntohl (gm->hash_bitmap);
3720 if (msize < sizeof (struct GetMessage) + bits * sizeof (GNUNET_HashCode))
3722 GNUNET_break_op (0);
3723 return GNUNET_SYSERR;
3725 opt = (const GNUNET_HashCode*) &gm[1];
3726 bfsize = msize - sizeof (struct GetMessage) + bits * sizeof (GNUNET_HashCode);
3727 bm = ntohl (gm->hash_bitmap);
3729 cps = GNUNET_CONTAINER_multihashmap_get (connected_peers,
3730 &other->hashPubKey);
3733 /* peer must have just disconnected */
3734 GNUNET_STATISTICS_update (stats,
3735 gettext_noop ("# requests dropped due to initiator not being connected"),
3738 return GNUNET_SYSERR;
3740 if (0 != (bm & GET_MESSAGE_BIT_RETURN_TO))
3741 cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
3748 if (0 != (bm & GET_MESSAGE_BIT_RETURN_TO))
3749 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3750 "Failed to find RETURN-TO peer `%4s' in connection set. Dropping query.\n",
3751 GNUNET_i2s ((const struct GNUNET_PeerIdentity*) &opt[bits-1]));
3754 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3755 "Failed to find peer `%4s' in connection set. Dropping query.\n",
3756 GNUNET_i2s (other));
3758 GNUNET_STATISTICS_update (stats,
3759 gettext_noop ("# requests dropped due to missing reverse route"),
3762 /* FIXME: try connect? */
3765 /* note that we can really only check load here since otherwise
3766 peers could find out that we are overloaded by not being
3767 disconnected after sending us a malformed query... */
3768 priority = bound_priority (ntohl (gm->priority), cps);
3772 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3773 "Dropping query from `%s', this peer is too busy.\n",
3774 GNUNET_i2s (other));
3776 GNUNET_STATISTICS_update (stats,
3777 gettext_noop ("# requests dropped due to high load"),
3783 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3784 "Received request for `%s' of type %u from peer `%4s' with flags %u\n",
3785 GNUNET_h2s (&gm->query),
3786 (unsigned int) type,
3790 have_ns = (0 != (bm & GET_MESSAGE_BIT_SKS_NAMESPACE));
3791 pr = GNUNET_malloc (sizeof (struct PendingRequest) +
3792 (have_ns ? sizeof(GNUNET_HashCode) : 0));
3795 pr->namespace = (GNUNET_HashCode*) &pr[1];
3796 memcpy (&pr[1], &opt[bits++], sizeof (GNUNET_HashCode));
3798 if ( (GNUNET_LOAD_get_load (cp->transmission_delay) > 3) ||
3799 (GNUNET_LOAD_get_average (cp->transmission_delay) >
3800 GNUNET_CONSTANTS_MAX_CORK_DELAY.value * 2 + GNUNET_LOAD_get_average (rt_entry_lifetime)) )
3802 /* don't have BW to send to peer, or would likely take longer than we have for it,
3803 so at best indirect the query */
3805 pr->forward_only = GNUNET_YES;
3808 pr->mingle = ntohl (gm->filter_mutator);
3809 if (0 != (bm & GET_MESSAGE_BIT_TRANSMIT_TO))
3810 pr->target_pid = GNUNET_PEER_intern ((const struct GNUNET_PeerIdentity*) &opt[bits++]);
3811 pr->anonymity_level = 1;
3812 pr->priority = (uint32_t) priority;
3813 pr->ttl = bound_ttl (ntohl (gm->ttl), pr->priority);
3814 pr->query = gm->query;
3815 /* decrement ttl (always) */
3816 ttl_decrement = 2 * TTL_DECREMENT +
3817 GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
3819 if ( (pr->ttl < 0) &&
3820 (((int32_t)(pr->ttl - ttl_decrement)) > 0) )
3823 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3824 "Dropping query from `%s' due to TTL underflow (%d - %u).\n",
3829 GNUNET_STATISTICS_update (stats,
3830 gettext_noop ("# requests dropped due TTL underflow"),
3833 /* integer underflow => drop (should be very rare)! */
3837 pr->ttl -= ttl_decrement;
3838 pr->start_time = GNUNET_TIME_absolute_get ();
3840 /* get bloom filter */
3843 pr->bf = GNUNET_CONTAINER_bloomfilter_init ((const char*) &opt[bits],
3846 pr->bf_size = bfsize;
3851 GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map,
3853 &check_duplicate_request_peer,
3855 if (cdc.have != NULL)
3857 if (cdc.have->start_time.value + cdc.have->ttl >=
3858 pr->start_time.value + pr->ttl)
3860 /* existing request has higher TTL, drop new one! */
3861 cdc.have->priority += pr->priority;
3862 destroy_pending_request (pr);
3864 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3865 "Have existing request with higher TTL, dropping new request.\n",
3866 GNUNET_i2s (other));
3868 GNUNET_STATISTICS_update (stats,
3869 gettext_noop ("# requests dropped due to higher-TTL request"),
3876 /* existing request has lower TTL, drop old one! */
3877 pr->priority += cdc.have->priority;
3878 /* Possible optimization: if we have applicable pending
3879 replies in 'cdc.have', we might want to move those over
3880 (this is a really rare special-case, so it is not clear
3881 that this would be worth it) */
3882 destroy_pending_request (cdc.have);
3883 /* keep processing 'pr'! */
3888 GNUNET_break (GNUNET_OK ==
3889 GNUNET_CONTAINER_multihashmap_put (query_request_map,
3892 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
3893 GNUNET_break (GNUNET_OK ==
3894 GNUNET_CONTAINER_multihashmap_put (peer_request_map,
3897 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
3899 pr->hnode = GNUNET_CONTAINER_heap_insert (requests_by_expiration_heap,
3901 pr->start_time.value + pr->ttl);
3903 GNUNET_STATISTICS_update (stats,
3904 gettext_noop ("# P2P searches received"),
3907 GNUNET_STATISTICS_update (stats,
3908 gettext_noop ("# P2P searches active"),
3912 /* calculate change in traffic preference */
3913 cps->inc_preference += pr->priority * 1000 + QUERY_BANDWIDTH_VALUE;
3914 /* process locally */
3915 if (type == GNUNET_BLOCK_TYPE_FS_DBLOCK)
3916 type = GNUNET_BLOCK_TYPE_ANY; /* to get on-demand as well */
3917 timeout = GNUNET_TIME_relative_multiply (BASIC_DATASTORE_REQUEST_DELAY,
3918 (pr->priority + 1));
3919 if (GNUNET_YES != pr->forward_only)
3920 pr->qe = GNUNET_DATASTORE_get (dsh,
3924 MAX_DATASTORE_QUEUE,
3926 &process_local_reply,
3929 GNUNET_STATISTICS_update (stats,
3930 gettext_noop ("# requests forwarded due to high load"),
3934 /* Are multiple results possible (and did we look locally)? If so, start processing remotely now! */
3937 case GNUNET_BLOCK_TYPE_FS_DBLOCK:
3938 case GNUNET_BLOCK_TYPE_FS_IBLOCK:
3939 /* only one result, wait for datastore */
3940 if (GNUNET_YES != pr->forward_only)
3943 if (pr->task == GNUNET_SCHEDULER_NO_TASK)
3944 pr->task = GNUNET_SCHEDULER_add_now (sched,
3945 &forward_request_task,
3949 /* make sure we don't track too many requests */
3950 if (GNUNET_CONTAINER_heap_get_size (requests_by_expiration_heap) > max_pending_requests)
3952 pr = GNUNET_CONTAINER_heap_peek (requests_by_expiration_heap);
3953 GNUNET_assert (pr != NULL);
3954 destroy_pending_request (pr);
3960 /* **************************** CS GET Handling ************************ */
3964 * Handle START_SEARCH-message (search request from client).
3966 * @param cls closure
3967 * @param client identification of the client
3968 * @param message the actual message
3971 handle_start_search (void *cls,
3972 struct GNUNET_SERVER_Client *client,
3973 const struct GNUNET_MessageHeader *message)
3975 static GNUNET_HashCode all_zeros;
3976 const struct SearchMessage *sm;
3977 struct ClientList *cl;
3978 struct ClientRequestList *crl;
3979 struct PendingRequest *pr;
3982 enum GNUNET_BLOCK_Type type;
3984 msize = ntohs (message->size);
3985 if ( (msize < sizeof (struct SearchMessage)) ||
3986 (0 != (msize - sizeof (struct SearchMessage)) % sizeof (GNUNET_HashCode)) )
3989 GNUNET_SERVER_receive_done (client,
3993 GNUNET_STATISTICS_update (stats,
3994 gettext_noop ("# client searches received"),
3997 sc = (msize - sizeof (struct SearchMessage)) / sizeof (GNUNET_HashCode);
3998 sm = (const struct SearchMessage*) message;
3999 type = ntohl (sm->type);
4001 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4002 "Received request for `%s' of type %u from local client\n",
4003 GNUNET_h2s (&sm->query),
4004 (unsigned int) type);
4007 while ( (cl != NULL) &&
4008 (cl->client != client) )
4012 cl = GNUNET_malloc (sizeof (struct ClientList));
4013 cl->client = client;
4014 GNUNET_SERVER_client_keep (client);
4015 cl->next = client_list;
4018 /* detect duplicate KBLOCK requests */
4019 if ( (type == GNUNET_BLOCK_TYPE_FS_KBLOCK) ||
4020 (type == GNUNET_BLOCK_TYPE_FS_NBLOCK) ||
4021 (type == GNUNET_BLOCK_TYPE_ANY) )
4024 while ( (crl != NULL) &&
4025 ( (0 != memcmp (&crl->req->query,
4027 sizeof (GNUNET_HashCode))) ||
4028 (crl->req->type != type) ) )
4033 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4034 "Have existing request, merging content-seen lists.\n");
4037 /* Duplicate request (used to send long list of
4038 known/blocked results); merge 'pr->replies_seen'
4039 and update bloom filter */
4040 GNUNET_array_grow (pr->replies_seen,
4041 pr->replies_seen_size,
4042 pr->replies_seen_off + sc);
4043 memcpy (&pr->replies_seen[pr->replies_seen_off],
4045 sc * sizeof (GNUNET_HashCode));
4046 pr->replies_seen_off += sc;
4047 refresh_bloomfilter (pr);
4048 GNUNET_STATISTICS_update (stats,
4049 gettext_noop ("# client searches updated (merged content seen list)"),
4052 GNUNET_SERVER_receive_done (client,
4057 GNUNET_STATISTICS_update (stats,
4058 gettext_noop ("# client searches active"),
4061 pr = GNUNET_malloc (sizeof (struct PendingRequest) +
4062 ((type == GNUNET_BLOCK_TYPE_FS_SBLOCK) ? sizeof(GNUNET_HashCode) : 0));
4063 crl = GNUNET_malloc (sizeof (struct ClientRequestList));
4064 memset (crl, 0, sizeof (struct ClientRequestList));
4065 crl->client_list = cl;
4066 GNUNET_CONTAINER_DLL_insert (cl->rl_head,
4071 pr->client_request_list = crl;
4072 GNUNET_array_grow (pr->replies_seen,
4073 pr->replies_seen_size,
4075 memcpy (pr->replies_seen,
4077 sc * sizeof (GNUNET_HashCode));
4078 pr->replies_seen_off = sc;
4079 pr->anonymity_level = ntohl (sm->anonymity_level);
4080 pr->start_time = GNUNET_TIME_absolute_get ();
4081 refresh_bloomfilter (pr);
4082 pr->query = sm->query;
4083 if (0 == (1 & ntohl (sm->options)))
4084 pr->local_only = GNUNET_NO;
4086 pr->local_only = GNUNET_YES;
4089 case GNUNET_BLOCK_TYPE_FS_DBLOCK:
4090 case GNUNET_BLOCK_TYPE_FS_IBLOCK:
4091 if (0 != memcmp (&sm->target,
4093 sizeof (GNUNET_HashCode)))
4094 pr->target_pid = GNUNET_PEER_intern ((const struct GNUNET_PeerIdentity*) &sm->target);
4096 case GNUNET_BLOCK_TYPE_FS_SBLOCK:
4097 pr->namespace = (GNUNET_HashCode*) &pr[1];
4098 memcpy (&pr[1], &sm->target, sizeof (GNUNET_HashCode));
4103 GNUNET_break (GNUNET_OK ==
4104 GNUNET_CONTAINER_multihashmap_put (query_request_map,
4107 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
4108 if (type == GNUNET_BLOCK_TYPE_FS_DBLOCK)
4109 type = GNUNET_BLOCK_TYPE_ANY; /* get on-demand blocks too! */
4110 pr->qe = GNUNET_DATASTORE_get (dsh,
4114 GNUNET_CONSTANTS_SERVICE_TIMEOUT,
4115 &process_local_reply,
4120 /* **************************** Startup ************************ */
4123 * Process fs requests.
4125 * @param s scheduler to use
4126 * @param server the initialized server
4127 * @param c configuration to use
4130 main_init (struct GNUNET_SCHEDULER_Handle *s,
4131 struct GNUNET_SERVER_Handle *server,
4132 const struct GNUNET_CONFIGURATION_Handle *c)
4134 static const struct GNUNET_CORE_MessageHandler p2p_handlers[] =
4137 GNUNET_MESSAGE_TYPE_FS_GET, 0 },
4139 GNUNET_MESSAGE_TYPE_FS_PUT, 0 },
4140 { &handle_p2p_migration_stop,
4141 GNUNET_MESSAGE_TYPE_FS_MIGRATION_STOP,
4142 sizeof (struct MigrationStopMessage) },
4145 static const struct GNUNET_SERVER_MessageHandler handlers[] = {
4146 {&GNUNET_FS_handle_index_start, NULL,
4147 GNUNET_MESSAGE_TYPE_FS_INDEX_START, 0},
4148 {&GNUNET_FS_handle_index_list_get, NULL,
4149 GNUNET_MESSAGE_TYPE_FS_INDEX_LIST_GET, sizeof(struct GNUNET_MessageHeader) },
4150 {&GNUNET_FS_handle_unindex, NULL, GNUNET_MESSAGE_TYPE_FS_UNINDEX,
4151 sizeof (struct UnindexMessage) },
4152 {&handle_start_search, NULL, GNUNET_MESSAGE_TYPE_FS_START_SEARCH,
4156 unsigned long long enc = 128;
4160 stats = GNUNET_STATISTICS_create (sched, "fs", cfg);
4161 min_migration_delay = GNUNET_TIME_UNIT_SECONDS;
4163 GNUNET_CONFIGURATION_get_value_number (cfg,
4165 "MAX_PENDING_REQUESTS",
4166 &max_pending_requests)) ||
4168 GNUNET_CONFIGURATION_get_value_number (cfg,
4170 "EXPECTED_NEIGHBOUR_COUNT",
4173 GNUNET_CONFIGURATION_get_value_time (cfg,
4175 "MIN_MIGRATION_DELAY",
4176 &min_migration_delay)) )
4178 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
4179 _("Configuration fails to specify certain parameters, assuming default values."));
4181 connected_peers = GNUNET_CONTAINER_multihashmap_create (enc);
4182 query_request_map = GNUNET_CONTAINER_multihashmap_create (max_pending_requests);
4183 rt_entry_lifetime = GNUNET_LOAD_value_init ();
4184 peer_request_map = GNUNET_CONTAINER_multihashmap_create (enc);
4185 requests_by_expiration_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
4186 core = GNUNET_CORE_connect (sched,
4188 GNUNET_TIME_UNIT_FOREVER_REL,
4191 &peer_connect_handler,
4192 &peer_disconnect_handler,
4199 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
4200 _("Failed to connect to `%s' service.\n"),
4202 GNUNET_CONTAINER_multihashmap_destroy (connected_peers);
4203 connected_peers = NULL;
4204 GNUNET_CONTAINER_multihashmap_destroy (query_request_map);
4205 query_request_map = NULL;
4206 GNUNET_LOAD_value_free (rt_entry_lifetime);
4207 rt_entry_lifetime = NULL;
4208 GNUNET_CONTAINER_heap_destroy (requests_by_expiration_heap);
4209 requests_by_expiration_heap = NULL;
4210 GNUNET_CONTAINER_multihashmap_destroy (peer_request_map);
4211 peer_request_map = NULL;
4214 GNUNET_DATASTORE_disconnect (dsh, GNUNET_NO);
4217 return GNUNET_SYSERR;
4219 /* FIXME: distinguish between sending and storing in options? */
4220 if (active_migration)
4222 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
4223 _("Content migration is enabled, will start to gather data\n"));
4224 consider_migration_gathering ();
4226 consider_dht_put_gathering (NULL);
4227 GNUNET_SERVER_disconnect_notify (server,
4228 &handle_client_disconnect,
4230 GNUNET_assert (GNUNET_OK ==
4231 GNUNET_CONFIGURATION_get_value_filename (cfg,
4235 GNUNET_DISK_directory_create (trustDirectory);
4236 GNUNET_SCHEDULER_add_with_priority (sched,
4237 GNUNET_SCHEDULER_PRIORITY_HIGH,
4238 &cron_flush_trust, NULL);
4241 GNUNET_SERVER_add_handlers (server, handlers);
4242 GNUNET_SCHEDULER_add_delayed (sched,
4243 GNUNET_TIME_UNIT_FOREVER_REL,
4251 * Process fs requests.
4253 * @param cls closure
4254 * @param sched scheduler to use
4255 * @param server the initialized server
4256 * @param cfg configuration to use
4260 struct GNUNET_SCHEDULER_Handle *sched,
4261 struct GNUNET_SERVER_Handle *server,
4262 const struct GNUNET_CONFIGURATION_Handle *cfg)
4264 active_migration = GNUNET_CONFIGURATION_get_value_yesno (cfg,
4267 dsh = GNUNET_DATASTORE_connect (cfg,
4271 GNUNET_SCHEDULER_shutdown (sched);
4274 datastore_get_load = GNUNET_LOAD_value_init ();
4275 datastore_put_load = GNUNET_LOAD_value_init ();
4276 block_cfg = GNUNET_CONFIGURATION_create ();
4277 GNUNET_CONFIGURATION_set_value_string (block_cfg,
4281 block_ctx = GNUNET_BLOCK_context_create (block_cfg);
4282 GNUNET_assert (NULL != block_ctx);
4283 dht_handle = GNUNET_DHT_connect (sched,
4286 if ( (GNUNET_OK != GNUNET_FS_indexing_init (sched, cfg, dsh)) ||
4287 (GNUNET_OK != main_init (sched, server, cfg)) )
4289 GNUNET_SCHEDULER_shutdown (sched);
4290 GNUNET_DATASTORE_disconnect (dsh, GNUNET_NO);
4292 GNUNET_DHT_disconnect (dht_handle);
4294 GNUNET_BLOCK_context_destroy (block_ctx);
4296 GNUNET_CONFIGURATION_destroy (block_cfg);
4298 GNUNET_LOAD_value_free (datastore_get_load);
4299 datastore_get_load = NULL;
4300 GNUNET_LOAD_value_free (datastore_put_load);
4301 datastore_put_load = NULL;
4308 * The main function for the fs service.
4310 * @param argc number of arguments from the command line
4311 * @param argv command line arguments
4312 * @return 0 ok, 1 on error
4315 main (int argc, char *const *argv)
4317 return (GNUNET_OK ==
4318 GNUNET_SERVICE_run (argc,
4321 GNUNET_SERVICE_OPTION_NONE,
4322 &run, NULL)) ? 0 : 1;
4325 /* end of gnunet-service-fs.c */