2 This file is part of GNUnet.
3 (C) 2009, 2010 Christian Grothoff (and other contributing authors)
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your
8 option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA.
22 * @file fs/gnunet-service-fs.c
23 * @brief gnunet anonymity protocol implementation
24 * @author Christian Grothoff
28 #include "gnunet_constants.h"
29 #include "gnunet_core_service.h"
30 #include "gnunet_dht_service.h"
31 #include "gnunet_datastore_service.h"
32 #include "gnunet_load_lib.h"
33 #include "gnunet_peer_lib.h"
34 #include "gnunet_protocols.h"
35 #include "gnunet_signatures.h"
36 #include "gnunet_statistics_service.h"
37 #include "gnunet_transport_service.h"
38 #include "gnunet_util_lib.h"
39 #include "gnunet-service-fs_indexing.h"
42 #define DEBUG_FS GNUNET_NO
45 * Should we introduce random latency in processing? Required for proper
46 * implementation of GAP, but can be disabled for performance evaluation of
47 * the basic routing algorithm.
49 * Note that with delays enabled, performance can be significantly lower
50 * (several orders of magnitude in 2-peer test runs); if you want to
51 * measure throughput of other components, set this to NO. Also, you
52 * might want to consider changing 'RETRY_PROBABILITY_INV' to 1 for
53 * a rather wasteful mode of operation (that might still get the highest
54 * throughput overall).
56 * Performance measurements (for 50 MB file, 2 peers):
58 * - Without delays: 3300 kb/s
59 * - With delays: 101 kb/s
61 #define SUPPORT_DELAYS GNUNET_NO
64 * Size for the hash map for DHT requests from the FS
65 * service. Should be about the number of concurrent
66 * DHT requests we plan to make.
68 #define FS_DHT_HT_SIZE 1024
71 * At what frequency should our datastore load decrease
72 * automatically (since if we don't use it, clearly the
73 * load must be going down).
75 #define DATASTORE_LOAD_AUTODECLINE GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS, 250)
78 * How often do we flush trust values to disk?
80 #define TRUST_FLUSH_FREQ GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 5)
83 * How quickly do we age cover traffic? At the given
84 * time interval, remaining cover traffic counters are
85 * decremented by 1/16th.
87 #define COVER_AGE_FREQUENCY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5)
90 * How often do we at most PUT content into the DHT?
92 #define MAX_DHT_PUT_FREQ GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5)
95 * Inverse of the probability that we will submit the same query
96 * to the same peer again. If the same peer already got the query
97 * repeatedly recently, the probability is multiplied by the inverse
98 * of this number each time. Note that we only try about every TTL_DECREMENT/2
99 * plus MAX_CORK_DELAY (so roughly every 3.5s).
101 * Note that this factor is a key influence to performance in small
102 * networks (especially test networks of 2 peers) because if there is
103 * only a single peer with the data, this value will determine how
104 * soon we might re-try. For example, a value of 3 can result in
105 * 1.7 MB/s transfer rates for a 10 MB file when a value of 1 would
106 * give us 5 MB/s. OTOH, obviously re-trying the same peer can be
107 * rather inefficient in larger networks, hence picking 1 is in
108 * general not the best choice.
110 * Performance measurements (for 50 MB file, 2 peers, no delays):
112 * - 1: 3300 kb/s (consistently)
113 * - 3: 2046 kb/s, 754 kb/s, 3490 kb/s
114 * - 5: 759 kb/s, 968 kb/s, 1160 kb/s
116 * Note that this does NOT mean that the value should be 1 since
117 * a 2-peer network is far from representative here (and this fails
118 * to take into consideration bandwidth wasted by repeatedly
119 * sending queries to peers that don't have the content). Also,
120 * it is expected that higher values lead to more inconsistent
121 * measurements since this only affects lost messages towards the
122 * end of the download.
124 * Finally, we should probably consider changing this and making
125 * it dependent on the number of connected peers or a related
126 * metric (bad magic constants...).
128 #define RETRY_PROBABILITY_INV 1
131 * What is the maximum delay for a P2P FS message (in our interaction
132 * with core)? FS-internal delays are another story. The value is
133 * chosen based on the 32k block size. Given that peers typically
134 * have at least 1 kb/s bandwidth, 45s waits give us a chance to
135 * transmit one message even to the lowest-bandwidth peers.
137 #define MAX_TRANSMIT_DELAY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 45)
140 * Maximum number of requests (from other peers, overall) that we're
141 * willing to have pending at any given point in time. Can be changed
142 * via the configuration file (32k is just the default).
144 static unsigned long long max_pending_requests = (32 * 1024);
147 * Information we keep for each pending reply. The
148 * actual message follows at the end of this struct.
150 struct PendingMessage;
153 * Function called upon completion of a transmission.
156 * @param tpid ID of receiving peer, 0 on transmission error
158 typedef void (*TransmissionContinuation)(void * cls,
159 GNUNET_PEER_Id tpid);
163 * Information we keep for each pending message (GET/PUT). The
164 * actual message follows at the end of this struct.
166 struct PendingMessage
169 * This is a doubly-linked list of messages to the same peer.
171 struct PendingMessage *next;
174 * This is a doubly-linked list of messages to the same peer.
176 struct PendingMessage *prev;
179 * Entry in pending message list for this pending message.
181 struct PendingMessageList *pml;
184 * Function to call immediately once we have transmitted this
187 TransmissionContinuation cont;
195 * Do not transmit this pending message until this deadline.
197 struct GNUNET_TIME_Absolute delay_until;
200 * Size of the reply; actual reply message follows
201 * at the end of this struct.
206 * How important is this message for us?
214 * Information about a peer that we are connected to.
215 * We track data that is useful for determining which
216 * peers should receive our requests. We also keep
217 * a list of messages to transmit to this peer.
223 * List of the last clients for which this peer successfully
226 struct GNUNET_SERVER_Client *last_client_replies[CS2P_SUCCESS_LIST_SIZE];
229 * List of the last PIDs for which
230 * this peer successfully answered a query;
231 * We use 0 to indicate no successful reply.
233 GNUNET_PEER_Id last_p2p_replies[P2P_SUCCESS_LIST_SIZE];
236 * Average delay between sending the peer a request and
237 * getting a reply (only calculated over the requests for
238 * which we actually got a reply). Calculated
239 * as a moving average: new_delay = ((n-1)*last_delay+curr_delay) / n
241 struct GNUNET_TIME_Relative avg_delay;
244 * Point in time until which this peer does not want us to migrate content
247 struct GNUNET_TIME_Absolute migration_blocked;
250 * Time until when we blocked this peer from migrating
253 struct GNUNET_TIME_Absolute last_migration_block;
256 * Transmission times for the last MAX_QUEUE_PER_PEER
257 * requests for this peer. Used as a ring buffer, current
258 * offset is stored in 'last_request_times_off'. If the
259 * oldest entry is more recent than the 'avg_delay', we should
260 * not send any more requests right now.
262 struct GNUNET_TIME_Absolute last_request_times[MAX_QUEUE_PER_PEER];
265 * Handle for an active request for transmission to this
268 struct GNUNET_CORE_TransmitHandle *cth;
271 * Messages (replies, queries, content migration) we would like to
272 * send to this peer in the near future. Sorted by priority, head.
274 struct PendingMessage *pending_messages_head;
277 * Messages (replies, queries, content migration) we would like to
278 * send to this peer in the near future. Sorted by priority, tail.
280 struct PendingMessage *pending_messages_tail;
283 * How long does it typically take for us to transmit a message
284 * to this peer? (delay between the request being issued and
285 * the callback being invoked).
287 struct GNUNET_LOAD_Value *transmission_delay;
290 * Context of our GNUNET_CORE_peer_change_preference call (or NULL).
292 struct GNUNET_CORE_InformationRequestContext *irc;
295 * Request for which 'irc' is currently active (or NULL).
297 struct PendingRequest *pr;
300 * Time when the last transmission request was issued.
302 struct GNUNET_TIME_Absolute last_transmission_request_start;
305 * ID of delay task for scheduling transmission.
307 GNUNET_SCHEDULER_TaskIdentifier delayed_transmission_request_task;
310 * Average priority of successful replies. Calculated
311 * as a moving average: new_avg = ((n-1)*last_avg+curr_prio) / n
316 * Increase in traffic preference still to be submitted
317 * to the core service for this peer.
319 uint64_t inc_preference;
322 * Trust rating for this peer
327 * Trust rating for this peer on disk.
332 * The peer's identity.
337 * Size of the linked list of 'pending_messages'.
339 unsigned int pending_requests;
342 * Which offset in "last_p2p_replies" will be updated next?
343 * (we go round-robin).
345 unsigned int last_p2p_replies_woff;
348 * Which offset in "last_client_replies" will be updated next?
349 * (we go round-robin).
351 unsigned int last_client_replies_woff;
354 * Current offset into 'last_request_times' ring buffer.
356 unsigned int last_request_times_off;
362 * Information we keep for each pending request. We should try to
363 * keep this struct as small as possible since its memory consumption
364 * is key to how many requests we can have pending at once.
366 struct PendingRequest;
370 * Doubly-linked list of requests we are performing
371 * on behalf of the same client.
373 struct ClientRequestList
377 * This is a doubly-linked list.
379 struct ClientRequestList *next;
382 * This is a doubly-linked list.
384 struct ClientRequestList *prev;
387 * Request this entry represents.
389 struct PendingRequest *req;
392 * Client list this request belongs to.
394 struct ClientList *client_list;
400 * Replies to be transmitted to the client. The actual
401 * response message is allocated after this struct.
403 struct ClientResponseMessage
406 * This is a doubly-linked list.
408 struct ClientResponseMessage *next;
411 * This is a doubly-linked list.
413 struct ClientResponseMessage *prev;
416 * Client list entry this response belongs to.
418 struct ClientList *client_list;
421 * Number of bytes in the response.
428 * Linked list of clients we are performing requests
434 * This is a linked list.
436 struct ClientList *next;
439 * ID of a client making a request, NULL if this entry is for a
442 struct GNUNET_SERVER_Client *client;
445 * Head of list of requests performed on behalf
446 * of this client right now.
448 struct ClientRequestList *rl_head;
451 * Tail of list of requests performed on behalf
452 * of this client right now.
454 struct ClientRequestList *rl_tail;
457 * Head of linked list of responses.
459 struct ClientResponseMessage *res_head;
462 * Tail of linked list of responses.
464 struct ClientResponseMessage *res_tail;
467 * Context for sending replies.
469 struct GNUNET_CONNECTION_TransmitHandle *th;
475 * Information about a peer that we have forwarded this
476 * request to already.
478 struct UsedTargetEntry
481 * What was the last time we have transmitted this request to this
484 struct GNUNET_TIME_Absolute last_request_time;
487 * How often have we transmitted this request to this peer?
489 unsigned int num_requests;
492 * PID of the target peer.
500 * Doubly-linked list of messages we are performing
501 * due to a pending request.
503 struct PendingMessageList
507 * This is a doubly-linked list of messages on behalf of the same request.
509 struct PendingMessageList *next;
512 * This is a doubly-linked list of messages on behalf of the same request.
514 struct PendingMessageList *prev;
517 * Message this entry represents.
519 struct PendingMessage *pm;
522 * Request this entry belongs to.
524 struct PendingRequest *req;
527 * Peer this message is targeted for.
529 struct ConnectedPeer *target;
535 * Information we keep for each pending request. We should try to
536 * keep this struct as small as possible since its memory consumption
537 * is key to how many requests we can have pending at once.
539 struct PendingRequest
543 * If this request was made by a client, this is our entry in the
544 * client request list; otherwise NULL.
546 struct ClientRequestList *client_request_list;
549 * Entry of peer responsible for this entry (if this request
550 * was made by a peer).
552 struct ConnectedPeer *cp;
555 * If this is a namespace query, pointer to the hash of the public
556 * key of the namespace; otherwise NULL. Pointer will be to the
557 * end of this struct (so no need to free it).
559 const GNUNET_HashCode *namespace;
562 * Bloomfilter we use to filter out replies that we don't care about
563 * (anymore). NULL as long as we are interested in all replies.
565 struct GNUNET_CONTAINER_BloomFilter *bf;
568 * Reference to DHT get operation for this request (or NULL).
570 struct GNUNET_DHT_GetHandle *dht_get;
573 * Context of our GNUNET_CORE_peer_change_preference call.
575 struct ConnectedPeer *pirc;
578 * Hash code of all replies that we have seen so far (only valid
579 * if client is not NULL since we only track replies like this for
582 GNUNET_HashCode *replies_seen;
585 * Node in the heap representing this entry; NULL
586 * if we have no heap node.
588 struct GNUNET_CONTAINER_HeapNode *hnode;
591 * Head of list of messages being performed on behalf of this
594 struct PendingMessageList *pending_head;
597 * Tail of list of messages being performed on behalf of this
600 struct PendingMessageList *pending_tail;
603 * When did we first see this request (from this peer), or, if our
604 * client is initiating, when did we last initiate a search?
606 struct GNUNET_TIME_Absolute start_time;
609 * The query that this request is for.
611 GNUNET_HashCode query;
614 * The task responsible for transmitting queries
617 GNUNET_SCHEDULER_TaskIdentifier task;
620 * (Interned) Peer identifier that identifies a preferred target
623 GNUNET_PEER_Id target_pid;
626 * (Interned) Peer identifiers of peers that have already
627 * received our query for this content.
629 struct UsedTargetEntry *used_targets;
632 * Our entry in the queue (non-NULL while we wait for our
633 * turn to interact with the local database).
635 struct GNUNET_DATASTORE_QueueEntry *qe;
638 * Size of the 'bf' (in bytes).
643 * Desired anonymity level; only valid for requests from a local client.
645 uint32_t anonymity_level;
648 * How many entries in "used_targets" are actually valid?
650 unsigned int used_targets_off;
653 * How long is the "used_targets" array?
655 unsigned int used_targets_size;
658 * Number of results found for this request.
660 unsigned int results_found;
663 * How many entries in "replies_seen" are actually valid?
665 unsigned int replies_seen_off;
668 * How long is the "replies_seen" array?
670 unsigned int replies_seen_size;
673 * Priority with which this request was made. If one of our clients
674 * made the request, then this is the current priority that we are
675 * using when initiating the request. This value is used when
676 * we decide to reward other peers with trust for providing a reply.
681 * Priority points left for us to spend when forwarding this request
684 uint32_t remaining_priority;
687 * Number to mingle hashes for bloom-filter tests with.
692 * TTL with which we saw this request (or, if we initiated, TTL that
693 * we used for the request).
698 * Type of the content that this request is for.
700 enum GNUNET_BLOCK_Type type;
703 * Remove this request after transmission of the current response.
708 * GNUNET_YES if we should not forward this request to other peers.
713 * GNUNET_YES if we should not forward this request to other peers.
721 * Block that is ready for migration to other peers. Actual data is at the end of the block.
723 struct MigrationReadyBlock
727 * This is a doubly-linked list.
729 struct MigrationReadyBlock *next;
732 * This is a doubly-linked list.
734 struct MigrationReadyBlock *prev;
737 * Query for the block.
739 GNUNET_HashCode query;
742 * When does this block expire?
744 struct GNUNET_TIME_Absolute expiration;
747 * Peers we would consider forwarding this
748 * block to. Zero for empty entries.
750 GNUNET_PEER_Id target_list[MIGRATION_LIST_SIZE];
758 * Number of targets already used.
760 unsigned int used_targets;
765 enum GNUNET_BLOCK_Type type;
769 * Identity of this peer.
771 static struct GNUNET_PeerIdentity my_id;
774 * Our connection to the datastore.
776 static struct GNUNET_DATASTORE_Handle *dsh;
781 static struct GNUNET_BLOCK_Context *block_ctx;
784 * Our block configuration.
786 static struct GNUNET_CONFIGURATION_Handle *block_cfg;
791 static const struct GNUNET_CONFIGURATION_Handle *cfg;
794 * Map of peer identifiers to "struct ConnectedPeer" (for that peer).
796 static struct GNUNET_CONTAINER_MultiHashMap *connected_peers;
799 * Map of peer identifiers to "struct PendingRequest" (for that peer).
801 static struct GNUNET_CONTAINER_MultiHashMap *peer_request_map;
804 * Map of query identifiers to "struct PendingRequest" (for that query).
806 static struct GNUNET_CONTAINER_MultiHashMap *query_request_map;
809 * Heap with the request that will expire next at the top. Contains
810 * pointers of type "struct PendingRequest*"; these will *also* be
811 * aliased from the "requests_by_peer" data structures and the
812 * "requests_by_query" table. Note that requests from our clients
813 * don't expire and are thus NOT in the "requests_by_expiration"
814 * (or the "requests_by_peer" tables).
816 static struct GNUNET_CONTAINER_Heap *requests_by_expiration_heap;
819 * Handle for reporting statistics.
821 static struct GNUNET_STATISTICS_Handle *stats;
824 * Linked list of clients we are currently processing requests for.
826 static struct ClientList *client_list;
829 * Pointer to handle to the core service (points to NULL until we've
832 static struct GNUNET_CORE_Handle *core;
835 * Head of linked list of blocks that can be migrated.
837 static struct MigrationReadyBlock *mig_head;
840 * Tail of linked list of blocks that can be migrated.
842 static struct MigrationReadyBlock *mig_tail;
845 * Request to datastore for migration (or NULL).
847 static struct GNUNET_DATASTORE_QueueEntry *mig_qe;
850 * Request to datastore for DHT PUTs (or NULL).
852 static struct GNUNET_DATASTORE_QueueEntry *dht_qe;
855 * Type we will request for the next DHT PUT round from the datastore.
857 static enum GNUNET_BLOCK_Type dht_put_type = GNUNET_BLOCK_TYPE_FS_KBLOCK;
860 * Where do we store trust information?
862 static char *trustDirectory;
865 * ID of task that collects blocks for migration.
867 static GNUNET_SCHEDULER_TaskIdentifier mig_task;
870 * ID of task that collects blocks for DHT PUTs.
872 static GNUNET_SCHEDULER_TaskIdentifier dht_task;
875 * What is the maximum frequency at which we are allowed to
876 * poll the datastore for migration content?
878 static struct GNUNET_TIME_Relative min_migration_delay;
881 * Handle for DHT operations.
883 static struct GNUNET_DHT_Handle *dht_handle;
886 * Size of the doubly-linked list of migration blocks.
888 static unsigned int mig_size;
891 * Are we allowed to migrate content to this peer.
893 static int active_to_migration;
896 * Are we allowed to push out content from this peer.
898 static int active_from_migration;
901 * How many entries with zero anonymity do we currently estimate
902 * to have in the database?
904 static unsigned int zero_anonymity_count_estimate;
907 * Typical priorities we're seeing from other peers right now. Since
908 * most priorities will be zero, this value is the weighted average of
909 * non-zero priorities seen "recently". In order to ensure that new
910 * values do not dramatically change the ratio, values are first
911 * "capped" to a reasonable range (+N of the current value) and then
912 * averaged into the existing value by a ratio of 1:N. Hence
913 * receiving the largest possible priority can still only raise our
914 * "current_priorities" by at most 1.
916 static double current_priorities;
919 * Datastore 'GET' load tracking.
921 static struct GNUNET_LOAD_Value *datastore_get_load;
924 * Datastore 'PUT' load tracking.
926 static struct GNUNET_LOAD_Value *datastore_put_load;
929 * How long do requests typically stay in the routing table?
931 static struct GNUNET_LOAD_Value *rt_entry_lifetime;
934 * How many query messages have we received 'recently' that
935 * have not yet been claimed as cover traffic?
937 static unsigned int cover_query_count;
940 * How many content messages have we received 'recently' that
941 * have not yet been claimed as cover traffic?
943 static unsigned int cover_content_count;
946 * ID of our task that we use to age the cover counters.
948 static GNUNET_SCHEDULER_TaskIdentifier cover_age_task;
/* Scheduler task that ages the cover-traffic counters: each counter keeps
   15/16 of its value (integer arithmetic, so small counts eventually reach
   zero), and the ID of a newly scheduled delayed task is stored in
   'cover_age_task'. */
951 age_cover_counters (void *cls,
952 const struct GNUNET_SCHEDULER_TaskContext *tc)
954 cover_content_count = (cover_content_count * 15) / 16;
955 cover_query_count = (cover_query_count * 15) / 16;
/* the new task becomes due after COVER_AGE_FREQUENCY */
956 cover_age_task = GNUNET_SCHEDULER_add_delayed (COVER_AGE_FREQUENCY,
962 * We've just now completed a datastore request. Update our
963 * datastore load calculations.
965 * @param start time when the datastore request was issued
968 update_datastore_delays (struct GNUNET_TIME_Absolute start)
970 struct GNUNET_TIME_Relative delay;
/* time elapsed between 'start' and now */
972 delay = GNUNET_TIME_absolute_get_duration (start);
/* record the sample in the datastore 'GET' load tracker */
973 GNUNET_LOAD_update (datastore_get_load,
979 * Get the filename under which we would store the trust value
980 * for the given peer.
981 * @return filename of the form DIRECTORY/HOSTID
/* Builds the path of the trust file for peer 'id': 'trustDirectory',
   the directory separator, then the ASCII encoding of the peer's
   public-key hash. */
984 get_trust_filename (const struct GNUNET_PeerIdentity *id)
986 struct GNUNET_CRYPTO_HashAsciiEncoded fil;
/* ASCII-encode the peer's hash into 'fil' */
989 GNUNET_CRYPTO_hash_to_enc (&id->hashPubKey, &fil);
/* NOTE(review): '&fil' (a struct pointer) is passed for a '%s' conversion;
   this presumably relies on the encoded, NUL-terminated text being the
   first member of the struct -- confirm against the struct definition. */
990 GNUNET_asprintf (&fn, "%s%s%s", trustDirectory, DIR_SEPARATOR_STR, &fil);
997 * Transmit messages by copying it to the target buffer
998 * "buf". "buf" will be NULL and "size" zero if the socket was closed
999 * for writing in the meantime. In that case, do nothing
1000 * (the disconnect or shutdown handler will take care of the rest).
1001 * If we were able to transmit messages and there are still more
1002 * pending, ask core again for further calls to this function.
1004 * @param cls closure, pointer to the 'struct ConnectedPeer*'
1005 * @param size number of bytes available in buf
1006 * @param buf where the callee should write the message
1007 * @return number of bytes written to buf
1010 transmit_to_peer (void *cls,
1011 size_t size, void *buf);
1014 /* ******************* clean up functions ************************ */
1017 * Delete the given migration block.
1019 * @param mb block to delete
1022 delete_migration_block (struct MigrationReadyBlock *mb)
/* unlink the block from the global migration list */
1024 GNUNET_CONTAINER_DLL_remove (mig_head,
/* release the peer-ID references held for all MIGRATION_LIST_SIZE slots of
   'target_list' (consider_migration takes one reference per slot it fills) */
1027 GNUNET_PEER_decrement_rcs (mb->target_list,
1028 MIGRATION_LIST_SIZE);
1035 * Compare the distance of two peers to a key.
1038 * @param p1 first peer
1039 * @param p2 second peer
1040 * @return GNUNET_YES if P1 is closer to key than P2
1043 is_closer (const GNUNET_HashCode *key,
1044 const struct GNUNET_PeerIdentity *p1,
1045 const struct GNUNET_PeerIdentity *p2)
/* the decision is delegated to GNUNET_CRYPTO_hash_xorcmp, applied to the
   peers' public-key hashes */
1047 return GNUNET_CRYPTO_hash_xorcmp (&p1->hashPubKey,
1054 * Consider migrating content to a given peer.
1056 * @param cls 'struct MigrationReadyBlock*' to select
1057 * targets for (or NULL for none)
1058 * @param key ID of the peer
1059 * @param value 'struct ConnectedPeer' of the peer
1060 * @return GNUNET_YES (always continue iteration)
/* Hash-map iterator over connected peers. Two phases are visible:
   (1) decide whether peer 'cp' should occupy a slot in the target list of
       block 'mb';
   (2) if there is content destined for 'cp', ask core for a transmission
       slot. */
1063 consider_migration (void *cls,
1064 const GNUNET_HashCode *key,
1067 struct MigrationReadyBlock *mb = cls;
1068 struct ConnectedPeer *cp = value;
1069 struct MigrationReadyBlock *pos;
1070 struct GNUNET_PeerIdentity cppid;
1071 struct GNUNET_PeerIdentity otherpid;
1072 struct GNUNET_PeerIdentity worstpid;
1077 /* consider 'cp' as a migration target for mb */
1078 if (GNUNET_TIME_absolute_get_remaining (cp->migration_blocked).rel_value > 0)
1079 return GNUNET_YES; /* peer has requested no migration! */
1082 GNUNET_PEER_resolve (cp->pid,
/* 'repl' is the slot to evict; MIGRATION_LIST_SIZE means "none chosen" */
1084 repl = MIGRATION_LIST_SIZE;
1085 for (i=0;i<MIGRATION_LIST_SIZE;i++)
/* a zero entry is a free slot: take it and add a reference on the peer ID */
1087 if (mb->target_list[i] == 0)
1089 mb->target_list[i] = cp->pid;
1090 GNUNET_PEER_change_rc (mb->target_list[i], 1);
/* slot filled directly, so no eviction is needed */
1091 repl = MIGRATION_LIST_SIZE;
1094 GNUNET_PEER_resolve (mb->target_list[i],
/* no candidate yet: compare against the peer being considered */
1096 if ( (repl == MIGRATION_LIST_SIZE) &&
1097 is_closer (&mb->query,
1102 worstpid = otherpid;
/* a candidate exists: compare against the current worst entry */
1104 else if ( (repl != MIGRATION_LIST_SIZE) &&
1105 (is_closer (&mb->query,
1110 worstpid = otherpid;
/* evict the chosen slot: drop its reference, store cp, take a reference */
1113 if (repl != MIGRATION_LIST_SIZE)
1115 GNUNET_PEER_change_rc (mb->target_list[repl], -1);
1116 mb->target_list[repl] = cp->pid;
1117 GNUNET_PEER_change_rc (mb->target_list[repl], 1);
1121 /* consider scheduling transmission to cp for content migration */
1122 if (cp->cth != NULL)
/* look for blocks that list this peer as a target */
1128 for (i=0;i<MIGRATION_LIST_SIZE;i++)
1130 if (cp->pid == pos->target_list[i])
/* 'msize' tracks a minimum over the matching blocks */
1135 msize = GNUNET_MIN (msize,
1143 return GNUNET_YES; /* no content available */
1145 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1146 "Trying to migrate at least %u bytes to peer `%s'\n",
/* a pending delayed transmission request is cancelled before asking core */
1150 if (cp->delayed_transmission_request_task != GNUNET_SCHEDULER_NO_TASK)
1152 GNUNET_SCHEDULER_cancel (cp->delayed_transmission_request_task);
1153 cp->delayed_transmission_request_task = GNUNET_SCHEDULER_NO_TASK;
/* requested size is 'msize' plus sizeof (struct PutMessage); the map key
   is reinterpreted as the peer's identity */
1156 = GNUNET_CORE_notify_transmit_ready (core,
1157 0, GNUNET_TIME_UNIT_FOREVER_REL,
1158 (const struct GNUNET_PeerIdentity*) key,
1159 msize + sizeof (struct PutMessage),
1167 * Task that is run periodically to obtain blocks for content
1171 * @param tc scheduler context (also unused)
1174 gather_migration_blocks (void *cls,
1175 const struct GNUNET_SCHEDULER_TaskContext *tc);
1181 * Task that is run periodically to obtain blocks for DHT PUTs.
1183 * @param cls type of blocks to gather
1184 * @param tc scheduler context (unused)
1187 gather_dht_put_blocks (void *cls,
1188 const struct GNUNET_SCHEDULER_TaskContext *tc);
1192 * If the migration task is not currently running, consider
1193 * (re)scheduling it with the appropriate delay.
1196 consider_migration_gathering ()
1198 struct GNUNET_TIME_Relative delay;
/* check whether a gathering task is already scheduled */
1204 if (mig_task != GNUNET_SCHEDULER_NO_TASK)
/* start from a multiple of one second, then scale it down by the
   queue capacity */
1206 delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
1208 delay = GNUNET_TIME_relative_divide (delay,
1209 MAX_MIGRATION_QUEUE);
/* never wait less than 'min_migration_delay' */
1210 delay = GNUNET_TIME_relative_max (delay,
1211 min_migration_delay);
/* schedule the gathering task and remember its ID */
1212 mig_task = GNUNET_SCHEDULER_add_delayed (delay,
1213 &gather_migration_blocks,
1219 * If the DHT PUT gathering task is not currently running, consider
1220 * (re)scheduling it with the appropriate delay.
1223 consider_dht_put_gathering (void *cls)
1225 struct GNUNET_TIME_Relative delay;
/* check whether a gathering task is already scheduled */
1231 if (dht_task != GNUNET_SCHEDULER_NO_TASK)
1233 if (zero_anonymity_count_estimate > 0)
/* divide the republish period by the estimated number of zero-anonymity
   entries */
1235 delay = GNUNET_TIME_relative_divide (GNUNET_DHT_DEFAULT_REPUBLISH_FREQUENCY,
1236 zero_anonymity_count_estimate);
/* clamp the delay from above */
1237 delay = GNUNET_TIME_relative_min (delay,
1242 /* if we have NO zero-anonymity content yet, wait 5 minutes for some to
1243 (hopefully) appear */
1244 delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 5);
/* schedule the gathering task and remember its ID */
1246 dht_task = GNUNET_SCHEDULER_add_delayed (delay,
1247 &gather_dht_put_blocks,
1253 * Process content offered for migration.
1255 * @param cls closure
1256 * @param key key for the content
1257 * @param size number of bytes in data
1258 * @param data content stored
1259 * @param type type of the content
1260 * @param priority priority of the content
1261 * @param anonymity anonymity-level for the content
1262 * @param expiration expiration time for the content
1263 * @param uid unique identifier for the datum;
1264 * maybe 0 if no unique identifier is available
1267 process_migration_content (void *cls,
1268 const GNUNET_HashCode * key,
1271 enum GNUNET_BLOCK_Type type,
1274 struct GNUNET_TIME_Absolute
1275 expiration, uint64_t uid)
1277 struct MigrationReadyBlock *mb;
/* the queue still has room, so consider scheduling another gathering run */
1282 if (mig_size < MAX_MIGRATION_QUEUE)
1283 consider_migration_gathering ();
/* on-demand blocks are handed to GNUNET_FS_handle_on_demand_block, with
   this function passed back in as an argument */
1286 if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
1289 GNUNET_FS_handle_on_demand_block (key, size, data,
1290 type, priority, anonymity,
1292 &process_migration_content,
1295 GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
1300 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1301 "Retrieved block `%s' of type %u for migration\n",
/* one allocation: the struct followed by 'size' bytes of payload */
1305 mb = GNUNET_malloc (sizeof (struct MigrationReadyBlock) + size);
1307 mb->expiration = expiration;
/* the payload is stored directly behind the struct */
1310 memcpy (&mb[1], data, size);
1311 GNUNET_CONTAINER_DLL_insert_after (mig_head,
/* run consider_migration over every connected peer */
1316 GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
1317 &consider_migration,
/* ask the datastore to continue with the next item */
1319 GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
1324 * Function called upon completion of the DHT PUT operation.
1327 dht_put_continuation (void *cls,
1328 const struct GNUNET_SCHEDULER_TaskContext *tc)
/* the PUT is finished, so ask the datastore to continue with the next item */
1330 GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
1335 * Store content in DHT.
1337 * @param cls closure
1338 * @param key key for the content
1339 * @param size number of bytes in data
1340 * @param data content stored
1341 * @param type type of the content
1342 * @param priority priority of the content
1343 * @param anonymity anonymity-level for the content
1344 * @param expiration expiration time for the content
1345 * @param uid unique identifier for the datum;
1346 * maybe 0 if no unique identifier is available
1349 process_dht_put_content (void *cls,
1350 const GNUNET_HashCode * key,
1353 enum GNUNET_BLOCK_Type type,
1356 struct GNUNET_TIME_Absolute
1357 expiration, uint64_t uid)
/* function-level statics: their values persist across invocations */
1359 static unsigned int counter;
1360 static GNUNET_HashCode last_vhash;
1361 static GNUNET_HashCode vhash;
1366 consider_dht_put_gathering (cls);
1369 /* slightly funky code to estimate the total number of values with zero
1370 anonymity from the maximum observed length of a monotonically increasing
1371 sequence of hashes over the contents */
1372 GNUNET_CRYPTO_hash (data, size, &vhash);
/* the content hash did not increase relative to the previous one */
1373 if (GNUNET_CRYPTO_hash_cmp (&vhash, &last_vhash) <= 0)
/* halve a non-zero estimate */
1375 if (zero_anonymity_count_estimate > 0)
1376 zero_anonymity_count_estimate /= 2;
/* raise the estimate to at least 2^counter */
/* NOTE(review): '1 << counter' shifts an int; the result is undefined once
   'counter' reaches the width of int -- confirm that 'counter' is bounded
   (its updates are not visible on these lines). */
1382 if (zero_anonymity_count_estimate < (1 << counter))
1383 zero_anonymity_count_estimate = (1 << counter);
1385 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1386 "Retrieved block `%s' of type %u for DHT PUT\n",
/* hand the block to the DHT; dht_put_continuation is passed as the
   completion callback */
1390 GNUNET_DHT_put (dht_handle,
1392 DEFAULT_PUT_REPLICATION,
1398 GNUNET_TIME_UNIT_FOREVER_REL,
1399 &dht_put_continuation,
1405 * Task that is run periodically to obtain blocks for content
1409 * @param tc scheduler context (also unused)
1412 gather_migration_blocks (void *cls,
1413 const struct GNUNET_SCHEDULER_TaskContext *tc)
/* the task is running now, so its stored ID is cleared */
1415 mig_task = GNUNET_SCHEDULER_NO_TASK;
/* request a random block from the datastore; results are delivered to
   process_migration_content with a NULL closure */
1418 mig_qe = GNUNET_DATASTORE_get_random (dsh, 0, UINT_MAX,
1419 GNUNET_TIME_UNIT_FOREVER_REL,
1420 &process_migration_content, NULL);
/* failure to obtain a queue entry is treated as an assertion failure */
1421 GNUNET_assert (mig_qe != NULL);
1427 * Task that is run periodically to obtain blocks for DHT PUTs.
1429 * @param cls type of blocks to gather
1430 * @param tc scheduler context (unused)
1433 gather_dht_put_blocks (void *cls,
1434 const struct GNUNET_SCHEDULER_TaskContext *tc)
/* the task is running now, so its stored ID is cleared */
1436 dht_task = GNUNET_SCHEDULER_NO_TASK;
/* the ONDEMAND type is never requested here; KBLOCK is used in its place */
1439 if (dht_put_type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
1440 dht_put_type = GNUNET_BLOCK_TYPE_FS_KBLOCK;
/* request zero-anonymity content; results are delivered to
   process_dht_put_content with a NULL closure */
1441 dht_qe = GNUNET_DATASTORE_get_zero_anonymity (dsh, 0, UINT_MAX,
1442 GNUNET_TIME_UNIT_FOREVER_REL,
1444 &process_dht_put_content, NULL);
/* failure to obtain a queue entry is treated as an assertion failure */
1445 GNUNET_assert (dht_qe != NULL);
1451 * We're done with a particular message list entry.
1452 * Free all associated resources.
1454 * @param pml entry to destroy
1457 destroy_pending_message_list_entry (struct PendingMessageList *pml)
1459 GNUNET_CONTAINER_DLL_remove (pml->req->pending_head,
1460 pml->req->pending_tail,
1462 GNUNET_CONTAINER_DLL_remove (pml->target->pending_messages_head,
1463 pml->target->pending_messages_tail,
1465 pml->target->pending_requests--;
1466 GNUNET_free (pml->pm);
1472 * Destroy the given pending message (and call the respective
1475 * @param pm message to destroy
1476 * @param tpid id of peer that the message was delivered to, or 0 for none
1479 destroy_pending_message (struct PendingMessage *pm,
1480 GNUNET_PEER_Id tpid)
1482 struct PendingMessageList *pml = pm->pml;
1483 TransmissionContinuation cont;
1487 cont_cls = pm->cont_cls;
1490 GNUNET_assert (pml->pm == pm);
1491 GNUNET_assert ( (tpid == 0) || (tpid == pml->target->pid) );
1492 destroy_pending_message_list_entry (pml);
1499 cont (cont_cls, tpid);
1504 * We're done processing a particular request.
1505 * Free all associated resources.
1507 * @param pr request to destroy
1510 destroy_pending_request (struct PendingRequest *pr)
1512 struct GNUNET_PeerIdentity pid;
1515 if (pr->hnode != NULL)
1517 GNUNET_CONTAINER_heap_remove_node (requests_by_expiration_heap,
1521 if (NULL == pr->client_request_list)
1523 GNUNET_STATISTICS_update (stats,
1524 gettext_noop ("# P2P searches active"),
1530 GNUNET_STATISTICS_update (stats,
1531 gettext_noop ("# client searches active"),
1536 GNUNET_CONTAINER_multihashmap_remove (query_request_map,
1540 GNUNET_LOAD_update (rt_entry_lifetime,
1541 GNUNET_TIME_absolute_get_duration (pr->start_time).rel_value);
1545 GNUNET_DATASTORE_cancel (pr->qe);
1548 if (pr->dht_get != NULL)
1550 GNUNET_DHT_get_stop (pr->dht_get);
1553 if (pr->client_request_list != NULL)
1555 GNUNET_CONTAINER_DLL_remove (pr->client_request_list->client_list->rl_head,
1556 pr->client_request_list->client_list->rl_tail,
1557 pr->client_request_list);
1558 GNUNET_free (pr->client_request_list);
1559 pr->client_request_list = NULL;
1563 GNUNET_PEER_resolve (pr->cp->pid,
1565 (void) GNUNET_CONTAINER_multihashmap_remove (peer_request_map,
1572 GNUNET_CONTAINER_bloomfilter_free (pr->bf);
1575 if (pr->pirc != NULL)
1577 GNUNET_CORE_peer_change_preference_cancel (pr->pirc->irc);
1578 pr->pirc->irc = NULL;
1581 if (pr->replies_seen != NULL)
1583 GNUNET_free (pr->replies_seen);
1584 pr->replies_seen = NULL;
1586 if (pr->task != GNUNET_SCHEDULER_NO_TASK)
1588 GNUNET_SCHEDULER_cancel (pr->task);
1589 pr->task = GNUNET_SCHEDULER_NO_TASK;
1591 while (NULL != pr->pending_head)
1592 destroy_pending_message_list_entry (pr->pending_head);
1593 GNUNET_PEER_change_rc (pr->target_pid, -1);
1594 if (pr->used_targets != NULL)
1596 for (i=0;i<pr->used_targets_off;i++)
1597 GNUNET_PEER_change_rc (pr->used_targets[i].pid, -1);
1598 GNUNET_free (pr->used_targets);
1599 pr->used_targets_off = 0;
1600 pr->used_targets_size = 0;
1601 pr->used_targets = NULL;
1608 * Find latency information in 'atsi'.
1610 * @param atsi performance data
1611 * @return connection latency
1613 static struct GNUNET_TIME_Relative
1614 get_latency (const struct GNUNET_TRANSPORT_ATS_Information *atsi)
1617 return GNUNET_TIME_UNIT_SECONDS;
1618 while ( (ntohl (atsi->type) != GNUNET_TRANSPORT_ATS_ARRAY_TERMINATOR) &&
1619 (ntohl (atsi->type) != GNUNET_TRANSPORT_ATS_QUALITY_NET_DELAY) )
1621 if (ntohl (atsi->type) == GNUNET_TRANSPORT_ATS_ARRAY_TERMINATOR)
1624 /* how can we not have latency data? */
1625 return GNUNET_TIME_UNIT_SECONDS;
1627 return GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
1628 ntohl (atsi->value));
1633 * Method called whenever a given peer connects.
1635 * @param cls closure, not used
1636 * @param peer peer identity this notification is about
1637 * @param atsi performance information
1640 peer_connect_handler (void *cls,
1642 GNUNET_PeerIdentity * peer,
1643 const struct GNUNET_TRANSPORT_ATS_Information *atsi)
1645 struct ConnectedPeer *cp;
1646 struct MigrationReadyBlock *pos;
1649 struct GNUNET_TIME_Relative latency;
1651 if (0 == memcmp (&my_id, peer, sizeof (struct GNUNET_PeerIdentity)))
1653 latency = get_latency (atsi);
1654 cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
1661 cp = GNUNET_malloc (sizeof (struct ConnectedPeer));
1662 cp->transmission_delay = GNUNET_LOAD_value_init (latency);
1663 cp->pid = GNUNET_PEER_intern (peer);
1665 fn = get_trust_filename (peer);
1666 if ((GNUNET_DISK_file_test (fn) == GNUNET_YES) &&
1667 (sizeof (trust) == GNUNET_DISK_fn_read (fn, &trust, sizeof (trust))))
1668 cp->disk_trust = cp->trust = ntohl (trust);
1671 GNUNET_break (GNUNET_OK ==
1672 GNUNET_CONTAINER_multihashmap_put (connected_peers,
1675 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
1680 (void) consider_migration (pos, &peer->hashPubKey, cp);
1687 * Method called whenever a given peer has a status change.
1689 * @param cls closure
1690 * @param peer peer identity this notification is about
1691 * @param bandwidth_in available amount of inbound bandwidth
1692 * @param bandwidth_out available amount of outbound bandwidth
1693 * @param timeout absolute time when this peer will time out
1694 * unless we see some further activity from it
1695 * @param atsi status information
1698 peer_status_handler (void *cls,
1700 GNUNET_PeerIdentity * peer,
1701 struct GNUNET_BANDWIDTH_Value32NBO bandwidth_in,
1702 struct GNUNET_BANDWIDTH_Value32NBO bandwidth_out,
1703 struct GNUNET_TIME_Absolute timeout,
1704 const struct GNUNET_TRANSPORT_ATS_Information *atsi)
1706 struct ConnectedPeer *cp;
1707 struct GNUNET_TIME_Relative latency;
1709 latency = get_latency (atsi);
1710 cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
1717 GNUNET_LOAD_value_set_decline (cp->transmission_delay,
1724 * Increase the host credit by a value.
1726 * @param host which peer to change the trust value on
1727 * @param value is the int value by which the
1728 * host credit is to be increased or decreased
1729 * @return the actual change in trust (positive or negative)
1732 change_host_trust (struct ConnectedPeer *host, int value)
1736 GNUNET_assert (host != NULL);
1739 if (host->trust + value < host->trust)
1741 value = UINT32_MAX - host->trust;
1742 host->trust = UINT32_MAX;
1745 host->trust += value;
1749 if (host->trust < -value)
1751 value = -host->trust;
1755 host->trust += value;
1762 * Write host-trust information to a file - flush the buffer entry!
1765 flush_trust (void *cls,
1766 const GNUNET_HashCode *key,
1769 struct ConnectedPeer *host = value;
1772 struct GNUNET_PeerIdentity pid;
1774 if (host->trust == host->disk_trust)
1775 return GNUNET_OK; /* unchanged */
1776 GNUNET_PEER_resolve (host->pid,
1778 fn = get_trust_filename (&pid);
1779 if (host->trust == 0)
1781 if ((0 != UNLINK (fn)) && (errno != ENOENT))
1782 GNUNET_log_strerror_file (GNUNET_ERROR_TYPE_WARNING |
1783 GNUNET_ERROR_TYPE_BULK, "unlink", fn);
1787 trust = htonl (host->trust);
1788 if (sizeof(uint32_t) == GNUNET_DISK_fn_write (fn, &trust,
1790 GNUNET_DISK_PERM_USER_READ | GNUNET_DISK_PERM_USER_WRITE
1791 | GNUNET_DISK_PERM_GROUP_READ | GNUNET_DISK_PERM_OTHER_READ))
1792 host->disk_trust = host->trust;
1799 * Call this method periodically to flush changed trust values of the connected peers to disk.
1802 cron_flush_trust (void *cls,
1803 const struct GNUNET_SCHEDULER_TaskContext *tc)
1806 if (NULL == connected_peers)
1808 GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
1813 if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
1815 GNUNET_SCHEDULER_add_delayed (TRUST_FLUSH_FREQ, &cron_flush_trust, NULL);
1820 * Free (each) request made by the peer.
1822 * @param cls closure, points to peer that the request belongs to
1823 * @param key current key code
1824 * @param value value in the hash map
1825 * @return GNUNET_YES (we should continue to iterate)
1828 destroy_request (void *cls,
1829 const GNUNET_HashCode * key,
1832 const struct GNUNET_PeerIdentity * peer = cls;
1833 struct PendingRequest *pr = value;
1835 GNUNET_break (GNUNET_YES ==
1836 GNUNET_CONTAINER_multihashmap_remove (peer_request_map,
1839 destroy_pending_request (pr);
1845 * Method called whenever a peer disconnects.
1847 * @param cls closure, not used
1848 * @param peer peer identity this notification is about
1851 peer_disconnect_handler (void *cls,
1853 GNUNET_PeerIdentity * peer)
1855 struct ConnectedPeer *cp;
1856 struct PendingMessage *pm;
1858 struct MigrationReadyBlock *pos;
1859 struct MigrationReadyBlock *next;
1861 if (0 == memcmp (&my_id, peer, sizeof (struct GNUNET_PeerIdentity)))
1863 GNUNET_CONTAINER_multihashmap_get_multiple (peer_request_map,
1867 cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
1871 for (i=0;i<CS2P_SUCCESS_LIST_SIZE;i++)
1873 if (NULL != cp->last_client_replies[i])
1875 GNUNET_SERVER_client_drop (cp->last_client_replies[i]);
1876 cp->last_client_replies[i] = NULL;
1879 GNUNET_break (GNUNET_YES ==
1880 GNUNET_CONTAINER_multihashmap_remove (connected_peers,
1883 if (cp->irc != NULL)
1885 GNUNET_CORE_peer_change_preference_cancel (cp->irc);
1887 cp->pr->pirc = NULL;
1891 /* remove this peer from migration considerations; schedule
1894 while (NULL != (pos = next))
1897 for (i=0;i<MIGRATION_LIST_SIZE;i++)
1899 if (pos->target_list[i] == cp->pid)
1901 GNUNET_PEER_change_rc (pos->target_list[i], -1);
1902 pos->target_list[i] = 0;
1905 if (pos->used_targets >= GNUNET_CONTAINER_multihashmap_size (connected_peers))
1907 delete_migration_block (pos);
1908 consider_migration_gathering ();
1911 GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
1912 &consider_migration,
1915 GNUNET_PEER_change_rc (cp->pid, -1);
1916 GNUNET_PEER_decrement_rcs (cp->last_p2p_replies, P2P_SUCCESS_LIST_SIZE);
1917 if (NULL != cp->cth)
1919 GNUNET_CORE_notify_transmit_ready_cancel (cp->cth);
1922 if (cp->delayed_transmission_request_task != GNUNET_SCHEDULER_NO_TASK)
1924 GNUNET_SCHEDULER_cancel (cp->delayed_transmission_request_task);
1925 cp->delayed_transmission_request_task = GNUNET_SCHEDULER_NO_TASK;
1927 while (NULL != (pm = cp->pending_messages_head))
1928 destroy_pending_message (pm, 0 /* delivery failed */);
1929 GNUNET_LOAD_value_free (cp->transmission_delay);
1930 GNUNET_break (0 == cp->pending_requests);
1936 * Iterator over hash map entries that removes all occurrences
1937 * of the given 'client' from the 'last_client_replies' of the
1938 * given connected peer.
1940 * @param cls closure, the 'struct GNUNET_SERVER_Client*' to remove
1941 * @param key current key code (unused)
1942 * @param value value in the hash map (the 'struct ConnectedPeer*' to change)
1943 * @return GNUNET_YES (we should continue to iterate)
1946 remove_client_from_last_client_replies (void *cls,
1947 const GNUNET_HashCode * key,
1950 struct GNUNET_SERVER_Client *client = cls;
1951 struct ConnectedPeer *cp = value;
1954 for (i=0;i<CS2P_SUCCESS_LIST_SIZE;i++)
1956 if (cp->last_client_replies[i] == client)
1958 GNUNET_SERVER_client_drop (cp->last_client_replies[i]);
1959 cp->last_client_replies[i] = NULL;
1967 * A client disconnected. Remove all of its pending queries.
1969 * @param cls closure, NULL
1970 * @param client identification of the client
1973 handle_client_disconnect (void *cls,
1974 struct GNUNET_SERVER_Client
1977 struct ClientList *pos;
1978 struct ClientList *prev;
1979 struct ClientRequestList *rcl;
1980 struct ClientResponseMessage *creply;
1986 while ( (NULL != pos) &&
1987 (pos->client != client) )
1993 return; /* no requests pending for this client */
1994 while (NULL != (rcl = pos->rl_head))
1996 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1997 "Destroying pending request `%s' on disconnect\n",
1998 GNUNET_h2s (&rcl->req->query));
1999 destroy_pending_request (rcl->req);
2002 client_list = pos->next;
2004 prev->next = pos->next;
2005 if (pos->th != NULL)
2007 GNUNET_CONNECTION_notify_transmit_ready_cancel (pos->th);
2010 while (NULL != (creply = pos->res_head))
2012 GNUNET_CONTAINER_DLL_remove (pos->res_head,
2015 GNUNET_free (creply);
2017 GNUNET_SERVER_client_drop (pos->client);
2019 GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
2020 &remove_client_from_last_client_replies,
2026 * Iterator to free peer entries.
2028 * @param cls closure, unused
2029 * @param key current key code
2030 * @param value value in the hash map (peer entry)
2031 * @return GNUNET_YES (we should continue to iterate)
2034 clean_peer (void *cls,
2035 const GNUNET_HashCode * key,
2038 peer_disconnect_handler (NULL, (const struct GNUNET_PeerIdentity*) key);
2044 * Task run during shutdown.
2050 shutdown_task (void *cls,
2051 const struct GNUNET_SCHEDULER_TaskContext *tc)
2055 GNUNET_DATASTORE_cancel (mig_qe);
2060 GNUNET_DATASTORE_cancel (dht_qe);
2063 if (GNUNET_SCHEDULER_NO_TASK != mig_task)
2065 GNUNET_SCHEDULER_cancel (mig_task);
2066 mig_task = GNUNET_SCHEDULER_NO_TASK;
2068 if (GNUNET_SCHEDULER_NO_TASK != dht_task)
2070 GNUNET_SCHEDULER_cancel (dht_task);
2071 dht_task = GNUNET_SCHEDULER_NO_TASK;
2073 while (client_list != NULL)
2074 handle_client_disconnect (NULL,
2075 client_list->client);
2076 cron_flush_trust (NULL, NULL);
2077 GNUNET_assert (NULL != core);
2078 GNUNET_CORE_disconnect (core);
2080 GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
2083 GNUNET_break (0 == GNUNET_CONTAINER_heap_get_size (requests_by_expiration_heap));
2084 GNUNET_CONTAINER_heap_destroy (requests_by_expiration_heap);
2085 requests_by_expiration_heap = 0;
2086 GNUNET_CONTAINER_multihashmap_destroy (connected_peers);
2087 connected_peers = NULL;
2088 GNUNET_break (0 == GNUNET_CONTAINER_multihashmap_size (query_request_map));
2089 GNUNET_CONTAINER_multihashmap_destroy (query_request_map);
2090 query_request_map = NULL;
2091 GNUNET_LOAD_value_free (rt_entry_lifetime);
2092 rt_entry_lifetime = NULL;
2093 GNUNET_break (0 == GNUNET_CONTAINER_multihashmap_size (peer_request_map));
2094 GNUNET_CONTAINER_multihashmap_destroy (peer_request_map);
2095 peer_request_map = NULL;
2098 GNUNET_STATISTICS_destroy (stats, GNUNET_NO);
2103 GNUNET_DATASTORE_disconnect (dsh,
2107 while (mig_head != NULL)
2108 delete_migration_block (mig_head);
2109 GNUNET_assert (0 == mig_size);
2110 GNUNET_DHT_disconnect (dht_handle);
2112 GNUNET_LOAD_value_free (datastore_get_load);
2113 datastore_get_load = NULL;
2114 GNUNET_LOAD_value_free (datastore_put_load);
2115 datastore_put_load = NULL;
2116 GNUNET_BLOCK_context_destroy (block_ctx);
2118 GNUNET_CONFIGURATION_destroy (block_cfg);
2121 GNUNET_free_non_null (trustDirectory);
2122 trustDirectory = NULL;
2123 GNUNET_SCHEDULER_cancel (cover_age_task);
2124 cover_age_task = GNUNET_SCHEDULER_NO_TASK;
2128 /* ******************* Utility functions ******************** */
2132 * We've had to delay a request for transmission to core, but now
2133 * we should be ready. Run it.
2135 * @param cls the 'struct ConnectedPeer' for which a request was delayed
2136 * @param tc task context (unused)
2139 delayed_transmission_request (void *cls,
2140 const struct GNUNET_SCHEDULER_TaskContext *tc)
2142 struct ConnectedPeer *cp = cls;
2143 struct GNUNET_PeerIdentity pid;
2144 struct PendingMessage *pm;
2146 pm = cp->pending_messages_head;
2147 cp->delayed_transmission_request_task = GNUNET_SCHEDULER_NO_TASK;
2148 GNUNET_assert (cp->cth == NULL);
2151 GNUNET_PEER_resolve (cp->pid,
2153 cp->last_transmission_request_start = GNUNET_TIME_absolute_get ();
2154 cp->cth = GNUNET_CORE_notify_transmit_ready (core,
2156 GNUNET_CONSTANTS_SERVICE_TIMEOUT,
2165 * Transmit messages by copying them to the target buffer
2166 * "buf". "buf" will be NULL and "size" zero if the socket was closed
2167 * for writing in the meantime. In that case, do nothing
2168 * (the disconnect or shutdown handler will take care of the rest).
2169 * If we were able to transmit messages and there are still more
2170 * pending, ask core again for further calls to this function.
2172 * @param cls closure, pointer to the 'struct ConnectedPeer*'
2173 * @param size number of bytes available in buf
2174 * @param buf where the callee should write the message
2175 * @return number of bytes written to buf
2178 transmit_to_peer (void *cls,
2179 size_t size, void *buf)
2181 struct ConnectedPeer *cp = cls;
2183 struct PendingMessage *pm;
2184 struct PendingMessage *next_pm;
2185 struct GNUNET_TIME_Absolute now;
2186 struct GNUNET_TIME_Relative min_delay;
2187 struct MigrationReadyBlock *mb;
2188 struct MigrationReadyBlock *next;
2189 struct PutMessage migm;
2192 struct GNUNET_PeerIdentity pid;
2198 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2199 "Dropping message, core too busy.\n");
2201 GNUNET_LOAD_update (cp->transmission_delay,
2204 if (NULL != (pm = cp->pending_messages_head))
2206 GNUNET_CONTAINER_DLL_remove (cp->pending_messages_head,
2207 cp->pending_messages_tail,
2209 cp->pending_requests--;
2210 destroy_pending_message (pm, 0);
2212 if (NULL != (pm = cp->pending_messages_head))
2214 GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == cp->delayed_transmission_request_task);
2215 min_delay = GNUNET_TIME_absolute_get_remaining (pm->delay_until);
2216 cp->delayed_transmission_request_task
2217 = GNUNET_SCHEDULER_add_delayed (min_delay,
2218 &delayed_transmission_request,
2223 GNUNET_LOAD_update (cp->transmission_delay,
2224 GNUNET_TIME_absolute_get_duration (cp->last_transmission_request_start).rel_value);
2225 now = GNUNET_TIME_absolute_get ();
2227 min_delay = GNUNET_TIME_UNIT_FOREVER_REL;
2228 next_pm = cp->pending_messages_head;
2229 while ( (NULL != (pm = next_pm) ) &&
2230 (pm->msize <= size) )
2233 if (pm->delay_until.abs_value > now.abs_value)
2235 min_delay = GNUNET_TIME_relative_min (min_delay,
2236 GNUNET_TIME_absolute_get_remaining (pm->delay_until));
2239 memcpy (&cbuf[msize], &pm[1], pm->msize);
2242 if (NULL == pm->pml)
2244 GNUNET_CONTAINER_DLL_remove (cp->pending_messages_head,
2245 cp->pending_messages_tail,
2247 cp->pending_requests--;
2249 destroy_pending_message (pm, cp->pid);
2252 min_delay = GNUNET_TIME_UNIT_ZERO;
2253 if (NULL != cp->pending_messages_head)
2255 GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == cp->delayed_transmission_request_task);
2256 cp->delayed_transmission_request_task
2257 = GNUNET_SCHEDULER_add_delayed (min_delay,
2258 &delayed_transmission_request,
2263 GNUNET_PEER_resolve (cp->pid,
2266 while (NULL != (mb = next))
2269 for (i=0;i<MIGRATION_LIST_SIZE;i++)
2271 if ( (cp->pid == mb->target_list[i]) &&
2272 (mb->size + sizeof (migm) <= size) )
2274 GNUNET_PEER_change_rc (mb->target_list[i], -1);
2275 mb->target_list[i] = 0;
2277 memset (&migm, 0, sizeof (migm));
2278 migm.header.size = htons (sizeof (migm) + mb->size);
2279 migm.header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
2280 migm.type = htonl (mb->type);
2281 migm.expiration = GNUNET_TIME_absolute_hton (mb->expiration);
2282 memcpy (&cbuf[msize], &migm, sizeof (migm));
2283 msize += sizeof (migm);
2284 size -= sizeof (migm);
2285 memcpy (&cbuf[msize], &mb[1], mb->size);
2289 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2290 "Pushing migration block `%s' (%u bytes) to `%s'\n",
2291 GNUNET_h2s (&mb->query),
2292 (unsigned int) mb->size,
2300 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2301 "Migration block `%s' (%u bytes) is not on migration list for peer `%s'\n",
2302 GNUNET_h2s (&mb->query),
2303 (unsigned int) mb->size,
2308 if ( (mb->used_targets >= MIGRATION_TARGET_COUNT) ||
2309 (mb->used_targets >= GNUNET_CONTAINER_multihashmap_size (connected_peers)) )
2311 delete_migration_block (mb);
2312 consider_migration_gathering ();
2315 consider_migration (NULL,
2320 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2321 "Transmitting %u bytes to peer with PID %u\n",
2322 (unsigned int) msize,
2323 (unsigned int) cp->pid);
2330 * Add a message to the set of pending messages for the given peer.
2332 * @param cp peer to send message to
2333 * @param pm message to queue
2334 * @param pr request on which behalf this message is being queued
2337 add_to_pending_messages_for_peer (struct ConnectedPeer *cp,
2338 struct PendingMessage *pm,
2339 struct PendingRequest *pr)
2341 struct PendingMessage *pos;
2342 struct PendingMessageList *pml;
2343 struct GNUNET_PeerIdentity pid;
2345 GNUNET_assert (pm->next == NULL);
2346 GNUNET_assert (pm->pml == NULL);
2349 pml = GNUNET_malloc (sizeof (struct PendingMessageList));
2354 GNUNET_CONTAINER_DLL_insert (pr->pending_head,
2358 pos = cp->pending_messages_head;
2359 while ( (pos != NULL) &&
2360 (pm->priority < pos->priority) )
2362 GNUNET_CONTAINER_DLL_insert_after (cp->pending_messages_head,
2363 cp->pending_messages_tail,
2366 cp->pending_requests++;
2367 if (cp->pending_requests > MAX_QUEUE_PER_PEER)
2369 GNUNET_STATISTICS_update (stats,
2370 gettext_noop ("# P2P searches discarded (queue length bound)"),
2373 destroy_pending_message (cp->pending_messages_tail, 0);
2375 GNUNET_PEER_resolve (cp->pid, &pid);
2376 if (NULL != cp->cth)
2378 GNUNET_CORE_notify_transmit_ready_cancel (cp->cth);
2381 if (cp->delayed_transmission_request_task != GNUNET_SCHEDULER_NO_TASK)
2383 GNUNET_SCHEDULER_cancel (cp->delayed_transmission_request_task);
2384 cp->delayed_transmission_request_task = GNUNET_SCHEDULER_NO_TASK;
2386 /* need to schedule transmission */
2387 cp->last_transmission_request_start = GNUNET_TIME_absolute_get ();
2388 cp->cth = GNUNET_CORE_notify_transmit_ready (core,
2389 cp->pending_messages_head->priority,
2392 cp->pending_messages_head->msize,
2395 if (cp->cth == NULL)
2398 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2399 "Failed to schedule transmission with core!\n");
2401 GNUNET_STATISTICS_update (stats,
2402 gettext_noop ("# CORE transmission failures"),
2410 * Test if the DATABASE (GET) load on this peer is too high
2411 * to even consider processing the query at
2414 * @return GNUNET_YES if the load is too high to do anything (load high)
2415 * GNUNET_NO to process normally (load normal)
2416 * GNUNET_SYSERR to process for free (load low)
2419 test_get_load_too_high (uint32_t priority)
2423 ld = GNUNET_LOAD_get_load (datastore_get_load);
2425 return GNUNET_SYSERR;
2435 * Test if the DATABASE (PUT) load on this peer is too high
2436 * to even consider processing the query at
2439 * @return GNUNET_YES if the load is too high to do anything (load high)
2440 * GNUNET_NO to process normally (load normal or low)
2443 test_put_load_too_high (uint32_t priority)
2447 if (GNUNET_LOAD_get_average (datastore_put_load) < 50)
2448 return GNUNET_NO; /* very fast */
2449 ld = GNUNET_LOAD_get_load (datastore_put_load);
2450 if (ld < 2.0 * (1 + priority))
2452 GNUNET_STATISTICS_update (stats,
2453 gettext_noop ("# storage requests dropped due to high load"),
2460 /* ******************* Pending Request Refresh Task ******************** */
2465 * We use a random delay to make the timing of requests less
2466 * predictable. This function returns such a random delay. We add a base
2467 * delay of MAX_CORK_DELAY (1s).
2469 * FIXME: make schedule dependent on the specifics of the request?
2470 * Or bandwidth and number of connected peers and load?
2472 * @return random delay to use for some request, between 1s and 1000+TTL_DECREMENT ms
2474 static struct GNUNET_TIME_Relative
2475 get_processing_delay ()
2478 GNUNET_TIME_relative_add (GNUNET_CONSTANTS_MAX_CORK_DELAY,
2479 GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
2480 GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
2486 * We're processing a GET request from another peer and have decided
2487 * to forward it to other peers. This function is called periodically
2488 * and should forward the request to other peers until we have all
2489 * possible replies. If we have transmitted the *only* reply to
2490 * the initiator we should destroy the pending request. If we have
2491 * many replies in the queue to the initiator, we should delay sending
2492 * out more queries until the reply queue has shrunk some.
2494 * @param cls our "struct ProcessGetContext *"
2498 forward_request_task (void *cls,
2499 const struct GNUNET_SCHEDULER_TaskContext *tc);
2503 * Function called after we either failed or succeeded
2504 * at transmitting a query to a peer.
2506 * @param cls the request's "struct PendingRequest*"
2507 * @param tpid ID of receiving peer, 0 on transmission error
2510 transmit_query_continuation (void *cls,
2511 GNUNET_PEER_Id tpid)
2513 struct PendingRequest *pr = cls;
2516 GNUNET_STATISTICS_update (stats,
2517 gettext_noop ("# queries scheduled for forwarding"),
2523 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2524 "Transmission of request failed, will try again later.\n");
2526 if (pr->task == GNUNET_SCHEDULER_NO_TASK)
2527 pr->task = GNUNET_SCHEDULER_add_delayed (get_processing_delay (),
2528 &forward_request_task,
2533 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2534 "Transmitted query `%s'\n",
2535 GNUNET_h2s (&pr->query));
2537 GNUNET_STATISTICS_update (stats,
2538 gettext_noop ("# queries forwarded"),
2541 for (i=0;i<pr->used_targets_off;i++)
2542 if (pr->used_targets[i].pid == tpid)
2543 break; /* found match! */
2544 if (i == pr->used_targets_off)
2546 /* need to create new entry */
2547 if (pr->used_targets_off == pr->used_targets_size)
2548 GNUNET_array_grow (pr->used_targets,
2549 pr->used_targets_size,
2550 pr->used_targets_size * 2 + 2);
2551 GNUNET_PEER_change_rc (tpid, 1);
2552 pr->used_targets[pr->used_targets_off].pid = tpid;
2553 pr->used_targets[pr->used_targets_off].num_requests = 0;
2554 i = pr->used_targets_off++;
2556 pr->used_targets[i].last_request_time = GNUNET_TIME_absolute_get ();
2557 pr->used_targets[i].num_requests++;
2558 if (pr->task == GNUNET_SCHEDULER_NO_TASK)
2559 pr->task = GNUNET_SCHEDULER_add_delayed (get_processing_delay (),
2560 &forward_request_task,
2566 * How many bytes should a bloomfilter be if we have already seen
2567 * entry_count responses? Note that BLOOMFILTER_K gives us the number
2568 * of bits set per entry. Furthermore, we should not re-size the
2569 * filter too often (to keep it cheap).
2571 * Since other peers will also add entries but not resize the filter,
2572 * we should generally pick a slightly larger size than what the
2573 * strict math would suggest.
2575 * @return must be a power of two and smaller or equal to 2^15.
2578 compute_bloomfilter_size (unsigned int entry_count)
2581 unsigned int ideal = (entry_count * BLOOMFILTER_K) / 4;
2582 uint16_t max = 1 << 15;
2584 if (entry_count > max)
2587 while ((size < max) && (size < ideal))
2596 * Recalculate our bloom filter for filtering replies. This function
2597 * will create a new bloom filter from scratch, so it should only be
2598 * called if we have no bloomfilter at all (and hence can create a
2599 * fresh one of minimal size without problems) OR if our peer is the
2600 * initiator (in which case we may resize to larger than minimum size).
2602 * @param pr request for which the BF is to be recomputed
2605 refresh_bloomfilter (struct PendingRequest *pr)
2609 GNUNET_HashCode mhash;
2611 nsize = compute_bloomfilter_size (pr->replies_seen_off);
2612 if (nsize == pr->bf_size)
2613 return; /* size not changed */
2615 GNUNET_CONTAINER_bloomfilter_free (pr->bf);
2616 pr->bf_size = nsize;
2617 pr->mingle = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, -1);
2618 pr->bf = GNUNET_CONTAINER_bloomfilter_init (NULL,
2621 for (i=0;i<pr->replies_seen_off;i++)
2623 GNUNET_BLOCK_mingle_hash (&pr->replies_seen[i],
2626 GNUNET_CONTAINER_bloomfilter_add (pr->bf, &mhash);
2632 * Function called after we've tried to reserve a certain amount of
2633 * bandwidth for a reply. Check if we succeeded and if so send our
2636 * @param cls the request's "struct PendingRequest*"
2637 * @param peer identifies the peer
2638 * @param bpm_out set to the current bandwidth limit (sending) for this peer
2639 * @param amount set to the amount that was actually reserved or unreserved
2640 * @param preference current traffic preference for the given peer
2643 target_reservation_cb (void *cls,
2645 GNUNET_PeerIdentity * peer,
2646 struct GNUNET_BANDWIDTH_Value32NBO bpm_out,
2648 uint64_t preference)
2650 struct PendingRequest *pr = cls;
2651 struct ConnectedPeer *cp;
2652 struct PendingMessage *pm;
2653 struct GetMessage *gm;
2654 GNUNET_HashCode *ext;
2662 /* (3) transmit, update ttl/priority */
2663 cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
2667 /* Peer must have just left */
2669 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2670 "Selected peer disconnected!\n");
2672 if (pr->task == GNUNET_SCHEDULER_NO_TASK)
2673 pr->task = GNUNET_SCHEDULER_add_delayed (get_processing_delay (),
2674 &forward_request_task,
2682 /* error in communication with core, try again later */
2683 if (pr->task == GNUNET_SCHEDULER_NO_TASK)
2684 pr->task = GNUNET_SCHEDULER_add_delayed (get_processing_delay (),
2685 &forward_request_task,
2689 no_route = GNUNET_NO;
2695 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2696 "Failed to reserve bandwidth for reply (got %d/%u bytes only)!\n",
2700 GNUNET_STATISTICS_update (stats,
2701 gettext_noop ("# reply bandwidth reservation requests failed"),
2704 if (pr->task == GNUNET_SCHEDULER_NO_TASK)
2705 pr->task = GNUNET_SCHEDULER_add_delayed (get_processing_delay (),
2706 &forward_request_task,
2708 return; /* this target round failed */
2710 no_route = GNUNET_YES;
2713 GNUNET_STATISTICS_update (stats,
2714 gettext_noop ("# queries scheduled for forwarding"),
2717 for (i=0;i<pr->used_targets_off;i++)
2718 if (pr->used_targets[i].pid == cp->pid)
2720 GNUNET_STATISTICS_update (stats,
2721 gettext_noop ("# queries retransmitted to same target"),
2727 /* build message and insert message into priority queue */
2729 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2730 "Forwarding request `%s' to `%4s'!\n",
2731 GNUNET_h2s (&pr->query),
2736 if (GNUNET_YES == no_route)
2738 bm |= GET_MESSAGE_BIT_RETURN_TO;
2741 if (pr->namespace != NULL)
2743 bm |= GET_MESSAGE_BIT_SKS_NAMESPACE;
2746 if (pr->target_pid != 0)
2748 bm |= GET_MESSAGE_BIT_TRANSMIT_TO;
2751 msize = sizeof (struct GetMessage) + pr->bf_size + k * sizeof(GNUNET_HashCode);
2752 GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE);
2753 pm = GNUNET_malloc (sizeof (struct PendingMessage) + msize);
2755 gm = (struct GetMessage*) &pm[1];
2756 gm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_GET);
2757 gm->header.size = htons (msize);
2758 gm->type = htonl (pr->type);
2759 pr->remaining_priority /= 2;
2760 gm->priority = htonl (pr->remaining_priority);
2761 gm->ttl = htonl (pr->ttl);
2762 gm->filter_mutator = htonl(pr->mingle);
2763 gm->hash_bitmap = htonl (bm);
2764 gm->query = pr->query;
2765 ext = (GNUNET_HashCode*) &gm[1];
2767 if (GNUNET_YES == no_route)
2768 GNUNET_PEER_resolve (pr->cp->pid, (struct GNUNET_PeerIdentity*) &ext[k++]);
2769 if (pr->namespace != NULL)
2770 memcpy (&ext[k++], pr->namespace, sizeof (GNUNET_HashCode));
2771 if (pr->target_pid != 0)
2772 GNUNET_PEER_resolve (pr->target_pid, (struct GNUNET_PeerIdentity*) &ext[k++]);
2773 bfdata = (char *) &ext[k];
2775 GNUNET_CONTAINER_bloomfilter_get_raw_data (pr->bf,
2778 pm->cont = &transmit_query_continuation;
2780 cp->last_request_times[(cp->last_request_times_off++) % MAX_QUEUE_PER_PEER] = GNUNET_TIME_absolute_get ();
2781 add_to_pending_messages_for_peer (cp, pm, pr);
2786 * Closure used for "target_peer_select_cb".
2788 struct PeerSelectionContext
2791 * The request for which we are selecting
2794 struct PendingRequest *pr;
2797 * Current "prime" target.
2799 struct GNUNET_PeerIdentity target;
2802 * How much do we like this target?
2804 double target_score;
2807 * Does it make sense for us to re-try quickly?
2815 * Function called for each connected peer to determine
2816 * which one(s) would make good targets for forwarding.
2818 * @param cls closure (struct PeerSelectionContext)
2819 * @param key current key code (peer identity)
2820 * @param value value in the hash map (struct ConnectedPeer)
2821 * @return GNUNET_YES if we should continue to
/* Multihashmap iterator over the connected peers: scores peer 'cp' as a
   forwarding target for psc->pr and stores it in the closure when it beats
   the best score seen so far.  A peer is skipped (GNUNET_YES returned without
   scoring) when it is the initiator, when a core request is already pending
   for it (cp->irc), when the randomised back-off for this request says not to
   retry it, or when queries were sent to it too recently.
   NOTE(review): many short lines (braces, some conditions, trailing call
   arguments, the 'value' parameter, local declarations such as 'i', 'pc' and
   'score') are not visible in this listing; the comments below only describe
   what the visible lines establish. */
2826 target_peer_select_cb (void *cls,
2827 const GNUNET_HashCode * key,
2830 struct PeerSelectionContext *psc = cls;
2831 struct ConnectedPeer *cp = value;
2832 struct PendingRequest *pr = psc->pr;
2833 struct GNUNET_TIME_Relative delay;
2838 /* 1) check that this peer is not the initiator */
2842 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2843 "Skipping initiator in forwarding selection\n");
2845 return GNUNET_YES; /* skip */
/* A non-NULL cp->irc means a core request for this peer is still in flight;
   flag fast_retry so the caller re-runs the selection soon. */
2847 if (cp->irc != NULL)
2849 psc->fast_retry = GNUNET_YES;
2850 return GNUNET_YES; /* skip: already querying core about this peer for other reasons */
2853 /* 2) check if we have already (recently) forwarded to this peer */
2854 /* 2a) this particular request */
2856 for (i=0;i<pr->used_targets_off;i++)
2857 if (pr->used_targets[i].pid == cp->pid)
2859 pc = pr->used_targets[i].num_requests;
2860 GNUNET_assert (pc > 0);
2861 /* FIXME: make re-enabling a peer independent of how often
2862 this function is called??? */
/* The upper bound of the random draw grows with 'pc', so a peer that already
   received this request many times is retried with lower probability. */
2863 if (0 != GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
2864 RETRY_PROBABILITY_INV * pc))
2867 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2868 "NOT re-trying query that was previously transmitted %u times\n",
2871 return GNUNET_YES; /* skip */
2878 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
2879 "Re-trying query that was previously transmitted %u times to this peer\n",
2883 /* 2b) many other requests to this peer */
/* last_request_times is written at (last_request_times_off++) %
   MAX_QUEUE_PER_PEER when a query is queued, so the slot read here is the
   oldest of the last MAX_QUEUE_PER_PEER transmissions to this peer. */
2884 delay = GNUNET_TIME_absolute_get_duration (cp->last_request_times[cp->last_request_times_off % MAX_QUEUE_PER_PEER]);
2885 if (delay.rel_value <= cp->avg_delay.rel_value)
2888 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2889 "NOT sending query since we send %u others to this peer in the last %llums\n",
2891 cp->avg_delay.rel_value);
2893 return GNUNET_YES; /* skip */
2896 /* 3) calculate how much we'd like to forward to this peer,
2897 starting with a random value that is strong enough
2898 to at least give any peer a chance sometimes
2899 (compared to the other factors that come later) */
2900 /* 3a) count successful (recent) routes from cp for same source */
/* Two alternative starting scores follow: one compares against the requesting
   peer (pr->cp), the other against the requesting client.  The condition
   selecting between them is on a line not visible here. */
2903 score = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
2904 P2P_SUCCESS_LIST_SIZE);
2905 for (i=0;i<P2P_SUCCESS_LIST_SIZE;i++)
2906 if (cp->last_p2p_replies[i] == pr->cp->pid)
2907 score += 1.0; /* likely successful based on hot path */
2911 score = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
2912 CS2P_SUCCESS_LIST_SIZE);
2913 for (i=0;i<CS2P_SUCCESS_LIST_SIZE;i++)
2914 if (cp->last_client_replies[i] == pr->client_request_list->client_list->client)
2915 score += 1.0; /* likely successful based on hot path */
2917 /* 3b) include latency */
2918 if (cp->avg_delay.rel_value < 4 * TTL_DECREMENT)
2919 score += 1.0; /* likely fast based on latency */
2920 /* 3c) include priorities */
2921 if (cp->avg_priority <= pr->remaining_priority / 2.0)
2922 score += 1.0; /* likely successful based on priorities */
2923 /* 3d) penalize for queue size */
2924 score -= (2.0 * cp->pending_requests / (double) MAX_QUEUE_PER_PEER);
2925 /* 3e) include peer proximity */
2926 score -= (2.0 * (GNUNET_CRYPTO_hash_distance_u32 (key,
2927 &pr->query)) / (double) UINT32_MAX);
2928 /* 4) super-bonus for being the known target */
/* NOTE(review): the statement that grants the bonus is not visible here. */
2929 if (pr->target_pid == cp->pid)
2931 /* store best-fit in closure */
2932 score++; /* avoid zero */
2933 if (score > psc->target_score)
2935 psc->target_score = score;
2936 psc->target.hashPubKey = *key;
2939 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2940 "Peer `%s' gets score %f for forwarding query, max is %8f\n",
2950 * The priority level imposes a bound on the maximum
2951 * value for the ttl that can be requested.
2953 * @param ttl_in requested ttl
2954 * @param prio given priority
2955 * @return ttl_in if ttl_in is below the limit,
2956 * otherwise the ttl-limit for the given priority
2959 bound_ttl (int32_t ttl_in, uint32_t prio)
2961 unsigned long long allowed;
2965 allowed = ((unsigned long long) prio) * TTL_DECREMENT / 1000;
2966 if (ttl_in > allowed)
2968 if (allowed >= (1 << 30))
2977 * Iterator called on each result obtained for a DHT
2978 * operation that expects a reply
2980 * @param cls closure
2981 * @param exp when will this value expire
2982 * @param key key of the result
2983 * @param get_path NULL-terminated array of pointers
2984 * to the peers on reverse GET path (or NULL if not recorded)
2985 * @param put_path NULL-terminated array of pointers
2986 * to the peers on the PUT path (or NULL if not recorded)
2987 * @param type type of the result
2988 * @param size number of bytes in data
2989 * @param data pointer to the result data
/* First appearance of process_dht_reply()'s parameter list.  The same list
   shows up again further down in the file followed by a body, so this
   occurrence is presumably a forward declaration -- TODO confirm against the
   complete file.  Its return type, the remaining parameters and the
   terminating ';' are on lines not visible in this listing. */
2992 process_dht_reply (void *cls,
2993 struct GNUNET_TIME_Absolute exp,
2994 const GNUNET_HashCode * key,
2995 const struct GNUNET_PeerIdentity * const *get_path,
2996 const struct GNUNET_PeerIdentity * const *put_path,
2997 enum GNUNET_BLOCK_Type type,
3003 * We're processing a GET request and have decided
3004 * to forward it to other peers. This function is called periodically
3005 * and should forward the request to other peers until we have all
3006 * possible replies. If we have transmitted the *only* reply to
3007 * the initiator we should destroy the pending request. If we have
3008 * many replies in the queue to the initiator, we should delay sending
3009 * out more queries until the reply queue has shrunk some.
3011 * @param cls our "struct PendingRequest *"
/* Scheduler task that drives P2P forwarding of one pending request ('cls').
   Visible flow: clear pr->task; give up while pr->pirc is set or the request
   is local-only; start a DHT lookup for requests with anonymity level 0 that
   are not forward-only and not D/IBLOCK queries; wait (re-schedule) if there
   is not enough cover traffic; pick the best connected peer through
   target_peer_select_cb(); adjust priority and TTL for client-originated
   requests; then either ask core for a bandwidth reservation or call
   target_reservation_cb() directly.
   NOTE(review): braces, several call arguments, 'return' statements and
   (presumably) an 'else' before the "force forwarding" part are on lines not
   visible in this listing. */
3015 forward_request_task (void *cls,
3016 const struct GNUNET_SCHEDULER_TaskContext *tc)
3018 struct PendingRequest *pr = cls;
3019 struct PeerSelectionContext psc;
3020 struct ConnectedPeer *cp;
3021 struct GNUNET_TIME_Relative delay;
/* This task is running now, so the stored task handle is no longer valid. */
3023 pr->task = GNUNET_SCHEDULER_NO_TASK;
3024 if (pr->pirc != NULL)
3027 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3028 "Forwarding of query `%s' not attempted due to pending local lookup!\n",
3029 GNUNET_h2s (&pr->query));
3031 return; /* already pending */
3033 if (GNUNET_YES == pr->local_only)
3034 return; /* configured to not do P2P search */
/* The DHT is only consulted for non-anonymous requests that are neither
   forward-only nor for DBLOCK/IBLOCK content. */
3036 if ( (0 == pr->anonymity_level) &&
3037 (GNUNET_YES != pr->forward_only) &&
3038 (pr->type != GNUNET_BLOCK_TYPE_FS_DBLOCK) &&
3039 (pr->type != GNUNET_BLOCK_TYPE_FS_IBLOCK) )
3041 pr->dht_get = GNUNET_DHT_get_start (dht_handle,
3042 GNUNET_TIME_UNIT_FOREVER_REL,
3045 DEFAULT_GET_REPLICATION,
3050 (pr->namespace != NULL) ? sizeof (GNUNET_HashCode) : 0,
/* An anonymity level above 1 needs (level - 1) cover queries per forwarded
   query; without them the task re-schedules itself after a processing delay. */
3055 if ( (pr->anonymity_level > 1) &&
3056 (cover_query_count < pr->anonymity_level - 1) )
3058 delay = get_processing_delay ();
3060 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3061 "Not enough cover traffic to forward query `%s', will try again in %llu ms!\n",
3062 GNUNET_h2s (&pr->query),
3065 pr->task = GNUNET_SCHEDULER_add_delayed (delay,
3066 &forward_request_task,
3070 /* consume cover traffic */
3071 if (pr->anonymity_level > 1)
3072 cover_query_count -= pr->anonymity_level - 1;
3074 /* (1) select target */
/* -DBL_MAX doubles as the "nothing selected" marker checked after the
   iteration. */
3076 psc.target_score = -DBL_MAX;
3077 psc.fast_retry = GNUNET_NO;
3078 GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
3079 &target_peer_select_cb,
3081 if (psc.target_score == -DBL_MAX)
3083 if (psc.fast_retry == GNUNET_YES)
3084 delay = GNUNET_TIME_UNIT_MILLISECONDS; /* FIXME: store adaptive fast-retry value in 'pr' */
3086 delay = get_processing_delay ();
3088 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3089 "No peer selected for forwarding of query `%s', will try again in %llu ms!\n",
3090 GNUNET_h2s (&pr->query),
3093 pr->task = GNUNET_SCHEDULER_add_delayed (delay,
3094 &forward_request_task,
3096 return; /* nobody selected */
3098 /* (2) update TTL/priority */
3099 if (pr->client_request_list != NULL)
3101 /* FIXME: use better algorithm!? */
3102 if (0 == GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
3105 /* bound priority we use by priorities we see from other peers
3106 rounded up (must round up so that we can see non-zero
3107 priorities, but round up as little as possible to make it
3108 plausible that we forwarded another peer's request) */
3109 if (pr->priority > current_priorities + 1.0)
3110 pr->priority = (uint32_t) current_priorities + 1.0;
3111 pr->ttl = bound_ttl (pr->ttl + TTL_DECREMENT * 2,
3114 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3115 "Trying query `%s' with priority %u and TTL %d.\n",
3116 GNUNET_h2s (&pr->query),
3122 /* (3) reserve reply bandwidth */
3123 if (GNUNET_NO == pr->forward_only)
/* The selected peer must still be connected and must not already have a core
   request pending; target_peer_select_cb() skips peers with cp->irc set. */
3125 cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
3126 &psc.target.hashPubKey);
3127 GNUNET_assert (NULL != cp);
3128 GNUNET_assert (cp->irc == NULL);
3131 cp->irc = GNUNET_CORE_peer_change_preference (core,
3133 GNUNET_CONSTANTS_SERVICE_TIMEOUT,
3134 GNUNET_BANDWIDTH_value_init (UINT32_MAX),
3137 &target_reservation_cb,
3139 GNUNET_assert (cp->irc != NULL);
3140 cp->inc_preference = 0;
3144 /* force forwarding */
3145 static struct GNUNET_BANDWIDTH_Value32NBO zerobw;
3146 target_reservation_cb (pr, &psc.target,
3152 /* **************************** P2P PUT Handling ************************ */
3156 * Function called after we either failed or succeeded
3157 * at transmitting a reply to a peer.
3159 * @param cls the request's "struct PendingRequest*"
3160 * @param tpid ID of receiving peer, 0 on transmission error
/* Continuation attached to reply messages queued for another peer
   (process_reply() stores it in reply->cont with the request as closure).
   For DBLOCK/IBLOCK requests the pending request is destroyed after the
   reply, since only one reply is expected; the other listed block types share
   a separate case group.
   NOTE(review): the switch expression, the 'break' statements, any default
   case and the body of the second case group are on lines not visible in this
   listing. */
3163 transmit_reply_continuation (void *cls,
3164 GNUNET_PEER_Id tpid)
3166 struct PendingRequest *pr = cls;
3170 case GNUNET_BLOCK_TYPE_FS_DBLOCK:
3171 case GNUNET_BLOCK_TYPE_FS_IBLOCK:
3172 /* only one reply expected, done with the request! */
3173 destroy_pending_request (pr);
3175 case GNUNET_BLOCK_TYPE_ANY:
3176 case GNUNET_BLOCK_TYPE_FS_KBLOCK:
3177 case GNUNET_BLOCK_TYPE_FS_SBLOCK:
3187 * Transmit the given message by copying it to the target buffer
3188 * "buf". "buf" will be NULL and "size" zero if the socket was closed
3189 * for writing in the meantime. In that case, do nothing
3190 * (the disconnect or shutdown handler will take care of the rest).
3191 * If we were able to transmit messages and there are still more
3192 * pending, ask core again for further calls to this function.
3194 * @param cls closure, pointer to the 'struct ClientList*'
3195 * @param size number of bytes available in buf
3196 * @param buf where the callee should write the message
3197 * @return number of bytes written to buf
/* Transmit-ready callback for a client connection.  Copies queued
   ClientResponseMessage payloads from the head of cl's response list into the
   output buffer for as long as the next one still fits, frees each entry after
   copying it, and registers itself again with the server (handle kept in
   cl->th).
   NOTE(review): the declarations of 'cbuf' and 'msize', the check that leads
   to the "client communication problem" log, the condition guarding the
   re-registration and the return statement are on lines not visible in this
   listing. */
3200 transmit_to_client (void *cls,
3201 size_t size, void *buf)
3203 struct ClientList *cl = cls;
3205 struct ClientResponseMessage *creply;
3212 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3213 "Not sending reply, client communication problem.\n");
/* 'size' is the space still free in the buffer, 'msize' the bytes written so
   far; the payload of a queued entry starts right behind its header
   (&creply[1]). */
3218 while ( (NULL != (creply = cl->res_head) ) &&
3219 (creply->msize <= size) )
3221 memcpy (&cbuf[msize], &creply[1], creply->msize);
3222 msize += creply->msize;
3223 size -= creply->msize;
3224 GNUNET_CONTAINER_DLL_remove (cl->res_head,
3227 GNUNET_free (creply);
3230 cl->th = GNUNET_SERVER_notify_transmit_ready (cl->client,
3232 GNUNET_TIME_UNIT_FOREVER_REL,
3233 &transmit_to_client,
3236 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3237 "Transmitted %u bytes to client\n",
3238 (unsigned int) msize);
3245 * Closure for "process_reply" function.
/* Describes one reply while it is matched against pending requests by
   process_reply().  Callers fill in the input members; process_reply() writes
   'eval', 'finished' and 'request_found' and adds to 'priority'.
   NOTE(review): the member declarations for the data pointer, its size, the
   priority and the two flags ('finished', 'request_found') are on lines not
   visible in this listing; only their descriptions remain. */
3247 struct ProcessReplyClosure
3250 * The data for the reply.
3255 * Who gave us this reply? NULL for local host (or DHT)
3257 struct ConnectedPeer *sender;
3260 * When the reply expires.
3262 struct GNUNET_TIME_Absolute expiration;
3270 * Type of the block.
3272 enum GNUNET_BLOCK_Type type;
3275 * How much was this reply worth to us?
3280 * Anonymity requirements for this reply.
/* process_reply() requires (anonymity_level - 1) units of cover content
   before a reply may be sent over the network. */
3282 uint32_t anonymity_level;
3285 * Evaluation result (returned).
3287 enum GNUNET_BLOCK_EvaluationResult eval;
3290 * Did we finish processing the associated request?
3295 * Did we find a matching request?
3302 * We have received a reply; handle it!
3304 * @param cls response (struct ProcessReplyClosure)
3305 * @param key our query
3306 * @param value value in the hash map (info about the query)
3307 * @return GNUNET_YES (we should continue to iterate)
/* Handle one reply (the ProcessReplyClosure in 'cls') for one matching
   pending request ('value').  Visible steps: cover-traffic check for replies
   that leave over the network; update of the sender's running averages and
   of its "recent success" lists; block evaluation; bookkeeping of seen
   replies for client requests; and finally queuing a PUT message either for
   the local client or for the requesting peer.
   NOTE(review): braces, the 'value' parameter, the declarations of 'i' and
   'msize', the switch statement header, several 'return'/'break' lines, the
   assignment of 'cp' and many trailing call arguments are on lines not
   visible in this listing; comments below describe only visible lines. */
3310 process_reply (void *cls,
3311 const GNUNET_HashCode * key,
3314 struct ProcessReplyClosure *prq = cls;
3315 struct PendingRequest *pr = value;
3316 struct PendingMessage *reply;
3317 struct ClientResponseMessage *creply;
3318 struct ClientList *cl;
3319 struct PutMessage *pm;
3320 struct ConnectedPeer *cp;
3321 struct GNUNET_TIME_Relative cur_delay;
3323 struct GNUNET_TIME_Relative art_delay;
/* A request without client_request_list was not issued by a local client. */
3328 if (NULL == pr->client_request_list)
3330 /* reply will go over the network, check for cover traffic */
3331 if ( (prq->anonymity_level > 1) &&
3332 (cover_content_count < prq->anonymity_level - 1) )
3334 /* insufficient cover traffic, skip */
3335 GNUNET_STATISTICS_update (stats,
3336 gettext_noop ("# replies suppressed due to lack of cover traffic"),
3341 if (prq->anonymity_level > 1)
3342 cover_content_count -= prq->anonymity_level - 1;
3345 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3346 "Matched result (type %u) for query `%s' with pending request\n",
3347 (unsigned int) prq->type,
3350 GNUNET_STATISTICS_update (stats,
3351 gettext_noop ("# replies received and matched"),
3354 if (prq->sender != NULL)
/* Look for the sender among the peers this request was forwarded to; the
   test below shows that i == used_targets_off means "not found". */
3356 for (i=0;i<pr->used_targets_off;i++)
3357 if (pr->used_targets[i].pid == prq->sender->pid)
3359 if (i < pr->used_targets_off)
/* Fold the observed round-trip time and the request priority into the
   sender's running averages, weighting history by (RUNAVG_DELAY_N - 1). */
3361 cur_delay = GNUNET_TIME_absolute_get_duration (pr->used_targets[i].last_request_time);
3362 prq->sender->avg_delay.rel_value
3363 = (prq->sender->avg_delay.rel_value *
3364 (RUNAVG_DELAY_N - 1) + cur_delay.rel_value) / RUNAVG_DELAY_N;
3365 prq->sender->avg_priority
3366 = (prq->sender->avg_priority *
3367 (RUNAVG_DELAY_N - 1) + pr->priority) / (double) RUNAVG_DELAY_N;
/* Remember the requester in the sender's list of recent successes.  Both
   lists are ring buffers indexed by a write offset modulo the list size; the
   reference on the overwritten entry is released and one on the new entry is
   taken. */
3371 GNUNET_PEER_change_rc (prq->sender->last_p2p_replies
3372 [prq->sender->last_p2p_replies_woff % P2P_SUCCESS_LIST_SIZE],
3374 GNUNET_PEER_change_rc (pr->cp->pid, 1);
3375 prq->sender->last_p2p_replies
3376 [(prq->sender->last_p2p_replies_woff++) % P2P_SUCCESS_LIST_SIZE]
3381 if (NULL != prq->sender->last_client_replies
3382 [(prq->sender->last_client_replies_woff) % CS2P_SUCCESS_LIST_SIZE])
3383 GNUNET_SERVER_client_drop (prq->sender->last_client_replies
3384 [(prq->sender->last_client_replies_woff) % CS2P_SUCCESS_LIST_SIZE]);
3385 prq->sender->last_client_replies
3386 [(prq->sender->last_client_replies_woff++) % CS2P_SUCCESS_LIST_SIZE]
3387 = pr->client_request_list->client_list->client;
3388 GNUNET_SERVER_client_keep (pr->client_request_list->client_list->client);
/* The evaluation result is stored in the closure so that callers such as
   process_local_reply() can inspect it afterwards. */
3391 prq->eval = GNUNET_BLOCK_evaluate (block_ctx,
3396 pr->namespace, (pr->namespace != NULL) ? sizeof (GNUNET_HashCode) : 0,
3401 case GNUNET_BLOCK_EVALUATION_OK_MORE:
3403 case GNUNET_BLOCK_EVALUATION_OK_LAST:
/* Last possible reply: drop everything still queued for this request, stop
   the local lookup and the forwarding task, and take the request out of the
   routing table. */
3404 while (NULL != pr->pending_head)
3405 destroy_pending_message_list_entry (pr->pending_head);
3408 if (pr->client_request_list != NULL)
3409 GNUNET_SERVER_receive_done (pr->client_request_list->client_list->client,
3411 GNUNET_DATASTORE_cancel (pr->qe);
3414 pr->do_remove = GNUNET_YES;
3415 if (pr->task != GNUNET_SCHEDULER_NO_TASK)
3417 GNUNET_SCHEDULER_cancel (pr->task);
3418 pr->task = GNUNET_SCHEDULER_NO_TASK;
3420 GNUNET_break (GNUNET_YES ==
3421 GNUNET_CONTAINER_multihashmap_remove (query_request_map,
3424 GNUNET_LOAD_update (rt_entry_lifetime,
3425 GNUNET_TIME_absolute_get_duration (pr->start_time).rel_value);
3427 case GNUNET_BLOCK_EVALUATION_OK_DUPLICATE:
3428 GNUNET_STATISTICS_update (stats,
3429 gettext_noop ("# duplicate replies discarded (bloomfilter)"),
3433 /* GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3434 "Duplicate response `%s', discarding.\n",
3435 GNUNET_h2s (&mhash));*/
3437 return GNUNET_YES; /* duplicate */
3438 case GNUNET_BLOCK_EVALUATION_RESULT_INVALID:
3439 return GNUNET_YES; /* wrong namespace */
3440 case GNUNET_BLOCK_EVALUATION_REQUEST_VALID:
3443 case GNUNET_BLOCK_EVALUATION_REQUEST_INVALID:
3446 case GNUNET_BLOCK_EVALUATION_TYPE_NOT_SUPPORTED:
3447 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
3448 _("Unsupported block type %u\n"),
/* For client requests the hash of every delivered reply is appended to
   replies_seen (grown geometrically) and the bloom filter is rebuilt. */
3452 if (pr->client_request_list != NULL)
3454 if (pr->replies_seen_size == pr->replies_seen_off)
3455 GNUNET_array_grow (pr->replies_seen,
3456 pr->replies_seen_size,
3457 pr->replies_seen_size * 2 + 4);
3458 GNUNET_CRYPTO_hash (prq->data,
3460 &pr->replies_seen[pr->replies_seen_off++]);
3461 refresh_bloomfilter (pr);
3463 if (NULL == prq->sender)
3466 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3467 "Found result for query `%s' in local datastore\n",
3470 GNUNET_STATISTICS_update (stats,
3471 gettext_noop ("# results found locally"),
/* Credit the request's remaining priority to this reply; the request keeps
   none of it for later replies. */
3475 prq->priority += pr->remaining_priority;
3476 pr->remaining_priority = 0;
3477 pr->results_found++;
3478 prq->request_found = GNUNET_YES;
3479 if (NULL != pr->client_request_list)
3481 GNUNET_STATISTICS_update (stats,
3482 gettext_noop ("# replies received for local clients"),
/* Build a PUT message behind a ClientResponseMessage header and queue it on
   the client's response list. */
3485 cl = pr->client_request_list->client_list;
3486 msize = sizeof (struct PutMessage) + prq->size;
3487 creply = GNUNET_malloc (msize + sizeof (struct ClientResponseMessage));
3488 creply->msize = msize;
3489 creply->client_list = cl;
3490 GNUNET_CONTAINER_DLL_insert_after (cl->res_head,
3494 pm = (struct PutMessage*) &creply[1];
3495 pm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
3496 pm->header.size = htons (msize);
3497 pm->type = htonl (prq->type);
3498 pm->expiration = GNUNET_TIME_absolute_hton (prq->expiration);
3499 memcpy (&pm[1], prq->data, prq->size);
3503 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3504 "Transmitting result for query `%s' to client\n",
3507 cl->th = GNUNET_SERVER_notify_transmit_ready (cl->client,
3509 GNUNET_TIME_UNIT_FOREVER_REL,
3510 &transmit_to_client,
3513 GNUNET_break (cl->th != NULL);
/* 'finished' tells the caller that 'pr' is gone and must not be used. */
3516 prq->finished = GNUNET_YES;
3517 destroy_pending_request (pr);
3524 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3525 "Transmitting result for query `%s' to other peer (PID=%u)\n",
3527 (unsigned int) cp->pid);
3529 GNUNET_STATISTICS_update (stats,
3530 gettext_noop ("# replies received for other peers"),
/* Build the PUT message for the requesting peer.  transmit_reply_continuation()
   runs with 'pr' as closure and destroys the request for D/IBLOCK types. */
3533 msize = sizeof (struct PutMessage) + prq->size;
3534 reply = GNUNET_malloc (msize + sizeof (struct PendingMessage));
3535 reply->cont = &transmit_reply_continuation;
3536 reply->cont_cls = pr;
3538 art_delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
3539 GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
3542 = GNUNET_TIME_relative_to_absolute (art_delay);
/* NOTE(review): art_delay is declared above as struct GNUNET_TIME_Relative,
   whose value is read through '.rel_value' elsewhere in this function
   (cur_delay.rel_value); '.abs_value' below looks like the wrong member --
   confirm, possibly hidden behind a conditional-compilation guard. */
3543 GNUNET_STATISTICS_update (stats,
3544 gettext_noop ("cummulative artificial delay introduced (ms)"),
3545 art_delay.abs_value,
3548 reply->msize = msize;
3549 reply->priority = UINT32_MAX; /* send replies first! */
3550 pm = (struct PutMessage*) &reply[1];
3551 pm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
3552 pm->header.size = htons (msize);
3553 pm->type = htonl (prq->type);
3554 pm->expiration = GNUNET_TIME_absolute_hton (prq->expiration);
3555 memcpy (&pm[1], prq->data, prq->size);
3556 add_to_pending_messages_for_peer (cp, reply, pr);
3563 * Iterator called on each result obtained for a DHT
3564 * operation that expects a reply
3566 * @param cls closure
3567 * @param exp when will this value expire
3568 * @param key key of the result
3569 * @param get_path NULL-terminated array of pointers
3570 * to the peers on reverse GET path (or NULL if not recorded)
3571 * @param put_path NULL-terminated array of pointers
3572 * to the peers on the PUT path (or NULL if not recorded)
3573 * @param type type of the result
3574 * @param size number of bytes in data
3575 * @param data pointer to the result data
3578 process_dht_reply (void *cls,
3579 struct GNUNET_TIME_Absolute exp,
3580 const GNUNET_HashCode * key,
3581 const struct GNUNET_PeerIdentity * const *get_path,
3582 const struct GNUNET_PeerIdentity * const *put_path,
3583 enum GNUNET_BLOCK_Type type,
3587 struct PendingRequest *pr = cls;
3588 struct ProcessReplyClosure prq;
3590 memset (&prq, 0, sizeof (prq));
3592 prq.expiration = exp;
3595 process_reply (&prq, key, pr);
3601 * Continuation called to notify client about result of the
3604 * @param cls closure
3605 * @param success GNUNET_SYSERR on failure
3606 * @param msg NULL on success, otherwise an error message
3609 put_migration_continuation (void *cls,
3613 struct GNUNET_TIME_Absolute *start = cls;
3614 struct GNUNET_TIME_Relative delay;
3616 delay = GNUNET_TIME_absolute_get_duration (*start);
3617 GNUNET_free (start);
3618 GNUNET_LOAD_update (datastore_put_load,
3620 if (GNUNET_OK == success)
3622 GNUNET_STATISTICS_update (stats,
3623 gettext_noop ("# datastore 'put' failures"),
3630 * Handle P2P "PUT" message.
3632 * @param cls closure, always NULL
3633 * @param other the other peer involved (sender or receiver, NULL
3634 * for loopback messages where we are both sender and receiver)
3635 * @param message the actual message
3636 * @param atsi performance information
3637 * @return GNUNET_OK to keep the connection open,
3638 * GNUNET_SYSERR to close it (signal serious error)
/* Handler for a P2P PUT (content) message.  Visible flow: validate the
   message size; reject ONDEMAND blocks; derive the query key from the block;
   count the message as cover content; run process_reply() for every pending
   request stored under that query; reward the sender; store the content in
   the datastore when migration is active and the put load allows it; and,
   when nobody asked for the content and we are not accepting migration or
   the put load is high, send the sender a MIGRATION_STOP message.
   NOTE(review): braces, the declarations of 'msize', 'dsize' and 'putl',
   several trailing call arguments, some assignments to 'prq' and the final
   return are on lines not visible in this listing. */
3641 handle_p2p_put (void *cls,
3642 const struct GNUNET_PeerIdentity *other,
3643 const struct GNUNET_MessageHeader *message,
3644 const struct GNUNET_TRANSPORT_ATS_Information *atsi)
3646 const struct PutMessage *put;
3649 enum GNUNET_BLOCK_Type type;
3650 struct GNUNET_TIME_Absolute expiration;
3651 GNUNET_HashCode query;
3652 struct ProcessReplyClosure prq;
3653 struct GNUNET_TIME_Absolute *start;
3654 struct GNUNET_TIME_Relative block_time;
3656 struct ConnectedPeer *cp;
3657 struct PendingMessage *pm;
3658 struct MigrationStopMessage *msm;
/* The header must be complete before any PutMessage field is read. */
3660 msize = ntohs (message->size);
3661 if (msize < sizeof (struct PutMessage))
3664 return GNUNET_SYSERR;
3666 put = (const struct PutMessage*) message;
3667 dsize = msize - sizeof (struct PutMessage);
3668 type = ntohl (put->type);
3669 expiration = GNUNET_TIME_absolute_ntoh (put->expiration);
/* ONDEMAND blocks are not accepted from other peers. */
3671 if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
3672 return GNUNET_SYSERR;
3674 GNUNET_BLOCK_get_key (block_ctx,
3680 GNUNET_break_op (0);
3681 return GNUNET_SYSERR;
/* Every valid piece of content counts as cover traffic, which
   process_reply() consumes for replies with anonymity level above 1. */
3683 cover_content_count++;
3685 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3686 "Received result for query `%s' from peer `%4s'\n",
3687 GNUNET_h2s (&query),
3688 GNUNET_i2s (other));
3690 GNUNET_STATISTICS_update (stats,
3691 gettext_noop ("# replies received (overall)"),
3694 /* now, lookup 'query' */
/* The payload follows directly behind the PutMessage header. */
3695 prq.data = (const void*) &put[1];
3697 prq.sender = GNUNET_CONTAINER_multihashmap_get (connected_peers,
3698 &other->hashPubKey);
3703 prq.expiration = expiration;
3705 prq.anonymity_level = 1;
3706 prq.finished = GNUNET_NO;
3707 prq.request_found = GNUNET_NO;
3708 GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map,
/* prq.priority has accumulated the remaining priority of every request that
   matched; it determines the sender's reward. */
3712 if (prq.sender != NULL)
3714 prq.sender->inc_preference += CONTENT_BANDWIDTH_VALUE + 1000 * prq.priority;
3715 change_host_trust (prq.sender, prq.priority);
3717 if ( (GNUNET_YES == active_to_migration) &&
3718 (GNUNET_NO == test_put_load_too_high (prq.priority)) )
3721 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3722 "Replicating result for query `%s' with priority %u\n",
3723 GNUNET_h2s (&query),
/* The start time is heap-allocated because put_migration_continuation()
   receives it as closure and frees it. */
3726 start = GNUNET_malloc (sizeof (struct GNUNET_TIME_Absolute));
3727 *start = GNUNET_TIME_absolute_get ();
3728 GNUNET_DATASTORE_put (dsh,
3729 0, &query, dsize, &put[1],
3730 type, prq.priority, 1 /* anonymity */,
3732 1 + prq.priority, MAX_DATASTORE_QUEUE,
3733 GNUNET_CONSTANTS_SERVICE_TIMEOUT,
3734 &put_migration_continuation,
3737 putl = GNUNET_LOAD_get_load (datastore_put_load);
/* Unsolicited content from a connected peer while we do not accept migration
   or the datastore put load exceeds 2.5 * (1 + priority). */
3738 if ( (NULL != (cp = prq.sender)) &&
3739 (GNUNET_NO == prq.request_found) &&
3740 ( (GNUNET_YES != active_to_migration) ||
3741 (putl > 2.5 * (1 + prq.priority)) ) )
/* last_migration_block is set below to the time at which the block ends;
   this test suppresses another STOP message while that block is recent. */
3743 if (GNUNET_TIME_absolute_get_duration (cp->last_migration_block).rel_value < 5000)
3744 return GNUNET_OK; /* already blocked */
3745 /* We're too busy; send MigrationStop message! */
3746 if (GNUNET_YES != active_to_migration)
3747 putl = 1.0 + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 5);
/* Block duration: 5000 ms plus a random share that grows quadratically with
   the load figure. */
3748 block_time = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
3749 5000 + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
3750 (unsigned int) (60000 * putl * putl)));
3752 cp->last_migration_block = GNUNET_TIME_relative_to_absolute (block_time);
3753 pm = GNUNET_malloc (sizeof (struct PendingMessage) +
3754 sizeof (struct MigrationStopMessage));
3755 pm->msize = sizeof (struct MigrationStopMessage);
3756 pm->priority = UINT32_MAX;
3757 msm = (struct MigrationStopMessage*) &pm[1];
3758 msm->header.size = htons (sizeof (struct MigrationStopMessage));
3759 msm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_MIGRATION_STOP);
3760 msm->duration = GNUNET_TIME_relative_hton (block_time);
3761 add_to_pending_messages_for_peer (cp,
3770 * Handle P2P "MIGRATION_STOP" message.
3772 * @param cls closure, always NULL
3773 * @param other the other peer involved (sender or receiver, NULL
3774 * for loopback messages where we are both sender and receiver)
3775 * @param message the actual message
3776 * @param atsi performance information
3777 * @return GNUNET_OK to keep the connection open,
3778 * GNUNET_SYSERR to close it (signal serious error)
3781 handle_p2p_migration_stop (void *cls,
3782 const struct GNUNET_PeerIdentity *other,
3783 const struct GNUNET_MessageHeader *message,
3784 const struct GNUNET_TRANSPORT_ATS_Information *atsi)
3786 struct ConnectedPeer *cp;
3787 const struct MigrationStopMessage *msm;
3789 msm = (const struct MigrationStopMessage*) message;
3790 cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
3791 &other->hashPubKey);
3797 cp->migration_blocked = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_relative_ntoh (msm->duration));
3803 /* **************************** P2P GET Handling ************************ */
3807 * Closure for 'check_duplicate_request_{peer,client}'.
/* Closure for the duplicate-request iterators: 'pr' is the input, 'have' is
   the output.  NOTE(review): the braces are on lines not visible in this
   listing. */
3809 struct CheckDuplicateRequestClosure
3812 * The new request we should check if it already exists.
3814 const struct PendingRequest *pr;
3817 * Existing request found by the checker, NULL if none.
/* process_local_reply() tests this against NULL after iterating. */
3819 struct PendingRequest *have;
3824 * Iterator over entries in the 'query_request_map' that
3825 * tries to see if we have the same request pending from
3826 * the same client already.
3828 * @param cls closure (our 'struct CheckDuplicateRequestClosure')
3829 * @param key current key code (query, ignored, must match)
3830 * @param value value in the hash map (a 'struct PendingRequest'
3831 * that already exists)
3832 * @return GNUNET_YES if we should continue to
3833 * iterate (no match yet)
3834 * GNUNET_NO if not (match found).
/* Iterator over pending requests stored under the same query: looks for an
   existing request issued by the same client as cdc->pr.  Entries without a
   client_request_list (not issued by a local client) are tested first, before
   the client handles are compared.
   NOTE(review): the 'value' parameter, the return statements, the rest of
   the condition after the trailing '&&' and the code that records a match
   are on lines not visible in this listing. */
3837 check_duplicate_request_client (void *cls,
3838 const GNUNET_HashCode * key,
3841 struct CheckDuplicateRequestClosure *cdc = cls;
3842 struct PendingRequest *have = value;
3844 if (have->client_request_list == NULL)
3846 if ( (cdc->pr->client_request_list->client_list->client == have->client_request_list->client_list->client) &&
3857 * We're processing (local) results for a search request
3858 * from another peer. Pass applicable results to the
3859 * peer and if we are done either clean up (operation
3860 * complete) or forward to other peers (more results possible).
3862 * @param cls our closure (struct PendingRequest)
3863 * @param key key for the content
3864 * @param size number of bytes in data
3865 * @param data content stored
3866 * @param type type of the content
3867 * @param priority priority of the content
3868 * @param anonymity anonymity-level for the content
3869 * @param expiration expiration time for the content
3870 * @param uid unique identifier for the datum;
3871 * maybe 0 if no unique identifier is available
/* Datastore iterator for the local lookup of a pending request ('cls').
   Two phases are visible.  End of iteration: release the client, drop the
   request if it duplicates another one from the same client or is
   local-only, otherwise schedule forward_request_task().  Per result:
   expand ONDEMAND blocks, derive the query key, hand the block to
   process_reply(), then decide whether to continue the datastore iteration.
   NOTE(review): braces, most of the parameter list, the condition that
   detects the end of the iteration, several 'return' lines and many trailing
   call arguments are on lines not visible in this listing. */
3874 process_local_reply (void *cls,
3875 const GNUNET_HashCode * key,
3878 enum GNUNET_BLOCK_Type type,
3881 struct GNUNET_TIME_Absolute
3885 struct PendingRequest *pr = cls;
3886 struct ProcessReplyClosure prq;
3887 struct CheckDuplicateRequestClosure cdrc;
3888 GNUNET_HashCode query;
3889 unsigned int old_rf;
3894 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3895 "Done processing local replies, forwarding request to other peers.\n");
3898 if (pr->client_request_list != NULL)
3900 GNUNET_SERVER_receive_done (pr->client_request_list->client_list->client,
3902 /* Figure out if this is a duplicate request and possibly
3903 merge 'struct PendingRequest' entries */
3906 GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map,
3908 &check_duplicate_request_client,
3910 if (cdrc.have != NULL)
3913 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3914 "Received request for block `%s' twice from client, will only request once.\n",
3915 GNUNET_h2s (&pr->query));
3918 destroy_pending_request (pr);
3922 if (pr->local_only == GNUNET_YES)
3924 destroy_pending_request (pr);
3927 /* no more results */
/* Start forwarding only if no forwarding task is scheduled already. */
3928 if (pr->task == GNUNET_SCHEDULER_NO_TASK)
3929 pr->task = GNUNET_SCHEDULER_add_now (&forward_request_task,
3934 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3935 "New local response to `%s' of type %u.\n",
3939 if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
3942 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3943 "Found ONDEMAND block, performing on-demand encoding\n");
3945 GNUNET_STATISTICS_update (stats,
3946 gettext_noop ("# on-demand blocks matched requests"),
/* The on-demand handler is given this function as its callback, so the
   encoded block comes back through process_local_reply(). */
3950 GNUNET_FS_handle_on_demand_block (key, size, data, type, priority,
3951 anonymity, expiration, uid,
3952 &process_local_reply,
3956 GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
3960 old_rf = pr->results_found;
3961 memset (&prq, 0, sizeof (prq));
3963 prq.expiration = expiration;
3966 GNUNET_BLOCK_get_key (block_ctx,
3973 GNUNET_DATASTORE_remove (dsh,
3977 GNUNET_TIME_UNIT_FOREVER_REL,
3979 GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
3983 prq.priority = priority;
3984 prq.finished = GNUNET_NO;
3985 prq.request_found = GNUNET_NO;
3986 prq.anonymity_level = anonymity;
/* NOTE(review): no visible statement changes pr->results_found between the
   snapshot in old_rf and this test, so both operands look equal here --
   confirm whether the second comparison was meant to run after
   process_reply(). */
3987 if ( (old_rf == 0) &&
3988 (pr->results_found == 0) )
3989 update_datastore_delays (pr->start_time);
3990 process_reply (&prq, key, pr);
/* process_reply() sets 'finished' when it destroyed 'pr'. */
3991 if (prq.finished == GNUNET_YES)
3994 return; /* done here */
3995 if (prq.eval == GNUNET_BLOCK_EVALUATION_OK_LAST)
3997 pr->local_only = GNUNET_YES; /* do not forward */
3998 GNUNET_DATASTORE_get_next (dsh, GNUNET_NO);
/* For requests from other peers, stop early under load or once more than
   5 + 2 * priority results were produced. */
4001 if ( (pr->client_request_list == NULL) &&
4002 ( (GNUNET_YES == test_get_load_too_high (0)) ||
4003 (pr->results_found > 5 + 2 * pr->priority) ) )
4006 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4007 "Load too high, done with request\n");
4009 GNUNET_STATISTICS_update (stats,
4010 gettext_noop ("# processing result set cut short due to load"),
4013 GNUNET_DATASTORE_get_next (dsh, GNUNET_NO);
4016 GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
4021 * We've received a request with the specified priority. Bound it
4022 * according to how much we trust the given peer.
4024 * @param prio_in requested priority
4025 * @param cp the peer making the request
4026 * @return effective priority
/* Decide what priority a request from peer 'cp' effectively gets.  Visible
   flow: when test_get_load_too_high (0) reports GNUNET_SYSERR the request is
   served for free (0 is returned); otherwise the requested priority is
   charged against the peer's trust, the charged amount feeds a running
   average of observed priorities (weight 1/N, with the sample capped at
   current_priorities + N), and under load the request is dropped (-1) after
   the charged trust is handed back.
   NOTE(review): braces, the declarations of 'ld', 'ret' and 'rret', the
   left-hand side of the running-average assignment, the final return and
   trailing call arguments are on lines not visible in this listing. */
4029 bound_priority (uint32_t prio_in,
4030 struct ConnectedPeer *cp)
4032 #define N ((double)128.0)
4037 ld = test_get_load_too_high (0);
4038 if (ld == GNUNET_SYSERR)
4040 GNUNET_STATISTICS_update (stats,
4041 gettext_noop ("# requests done for free (low load)"),
4044 return 0; /* excess resources */
/* Clamp so that the negation and the cast to int below cannot overflow. */
4046 if (prio_in > INT32_MAX)
4047 prio_in = INT32_MAX;
/* change_host_trust() is called with a negative delta and its result is
   negated, so 'ret' is the amount of trust actually charged. */
4048 ret = - change_host_trust (cp, - (int) prio_in);
4051 if (ret > current_priorities + N)
4052 rret = current_priorities + N;
4056 = (current_priorities * (N-1) + rret)/N;
4058 if ( (ld == GNUNET_YES) && (ret > 0) )
4060 /* try with charging */
4061 ld = test_get_load_too_high (ret);
4063 if (ld == GNUNET_YES)
4065 GNUNET_STATISTICS_update (stats,
4066 gettext_noop ("# request dropped, priority insufficient"),
/* Give the charged trust back before refusing the request. */
4070 change_host_trust (cp, (int) ret);
4071 return -1; /* not enough resources */
4075 GNUNET_STATISTICS_update (stats,
4076 gettext_noop ("# requests done for a price (normal load)"),
4086 * Iterator over entries in the 'query_request_map' that
4087 * tries to see if we have the same request pending from
4088 * the same peer already.
4090 * @param cls closure (our 'struct CheckDuplicateRequestClosure')
4091 * @param key current key code (query, ignored, must match)
4092 * @param value value in the hash map (a 'struct PendingRequest'
4093 * that already exists)
4094 * @return GNUNET_YES if we should continue to
4095 * iterate (no match yet)
4096 * GNUNET_NO if not (match found).
4099 check_duplicate_request_peer (void *cls,
4100 const GNUNET_HashCode * key,
4103 struct CheckDuplicateRequestClosure *cdc = cls;
4104 struct PendingRequest *have = value;
4106 if (cdc->pr->target_pid == have->target_pid)
4116 * Handle P2P "GET" request.
4118 * @param cls closure, always NULL
4119 * @param other the other peer involved (sender or receiver, NULL
4120 * for loopback messages where we are both sender and receiver)
4121 * @param message the actual message
4122 * @param atsi performance information
4123 * @return GNUNET_OK to keep the connection open,
4124 * GNUNET_SYSERR to close it (signal serious error)
/* Handle a GET request received from another peer: validate the message,
   look up the sender and the peer to reply to, bound priority and TTL, merge
   with or replace a duplicate pending request, register the new request in
   the lookup structures, then query the datastore and/or schedule
   forwarding.  GNUNET_SYSERR is returned for the malformed-message and
   sender-not-connected cases visible below. */
4127 handle_p2p_get (void *cls,
4128 const struct GNUNET_PeerIdentity *other,
4129 const struct GNUNET_MessageHeader *message,
4130 const struct GNUNET_TRANSPORT_ATS_Information *atsi)
4132 struct PendingRequest *pr;
4133 struct ConnectedPeer *cp;
4134 struct ConnectedPeer *cps;
4135 struct CheckDuplicateRequestClosure cdc;
4136 struct GNUNET_TIME_Relative timeout;
4138 const struct GetMessage *gm;
4140 const GNUNET_HashCode *opt;
4143 uint32_t ttl_decrement;
4145 enum GNUNET_BLOCK_Type type;
/* the size from the header must at least cover the fixed GetMessage part */
4148 msize = ntohs(message->size);
4149 if (msize < sizeof (struct GetMessage))
4151 GNUNET_break_op (0);
4152 return GNUNET_SYSERR;
4154 gm = (const struct GetMessage*) message;
4156 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4157 "Received request for `%s'\n",
4158 GNUNET_h2s (&gm->query));
/* convert the wire fields to host byte order */
4160 type = ntohl (gm->type);
4161 bm = ntohl (gm->hash_bitmap);
/* 'bits' hash codes follow the fixed part and must fit into the message
   (presumably 'bits' is the number of flags set in 'bm' -- TODO confirm) */
4169 if (msize < sizeof (struct GetMessage) + bits * sizeof (GNUNET_HashCode))
4171 GNUNET_break_op (0);
4172 return GNUNET_SYSERR;
/* the optional hash codes start right behind the fixed struct; the bytes
   left over after them are the bloom filter */
4174 opt = (const GNUNET_HashCode*) &gm[1];
4175 bfsize = msize - sizeof (struct GetMessage) - bits * sizeof (GNUNET_HashCode);
4176 /* bfsize must be power of 2, check! */
/* NOTE(review): a bfsize of 0 also satisfies this test */
4177 if (0 != ( (bfsize - 1) & bfsize))
4179 GNUNET_break_op (0);
4180 return GNUNET_SYSERR;
4182 cover_query_count++;
/* reload the flag bitmap from the message */
4183 bm = ntohl (gm->hash_bitmap);
/* 'cps' is the connected-peer record of the sender */
4185 cps = GNUNET_CONTAINER_multihashmap_get (connected_peers,
4186 &other->hashPubKey);
4189 /* peer must have just disconnected */
4190 GNUNET_STATISTICS_update (stats,
4191 gettext_noop ("# requests dropped due to initiator not being connected"),
4194 return GNUNET_SYSERR;
/* with RETURN_TO set, 'cp' (the peer to reply to) comes from a second
   lookup in connected_peers */
4196 if (0 != (bm & GET_MESSAGE_BIT_RETURN_TO))
4197 cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
/* the debug message names whichever peer could not be found */
4204 if (0 != (bm & GET_MESSAGE_BIT_RETURN_TO))
4205 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4206 "Failed to find RETURN-TO peer `%4s' in connection set. Dropping query.\n",
4207 GNUNET_i2s ((const struct GNUNET_PeerIdentity*) &opt[bits-1]));
4210 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4211 "Failed to find peer `%4s' in connection set. Dropping query.\n",
4212 GNUNET_i2s (other));
4214 GNUNET_STATISTICS_update (stats,
4215 gettext_noop ("# requests dropped due to missing reverse route"),
4218 /* FIXME: try connect? */
/* the priority offered in the message is bounded against the sender 'cps' */
4221 /* note that we can really only check load here since otherwise
4222 peers could find out that we are overloaded by not being
4223 disconnected after sending us a malformed query... */
4224 priority = bound_priority (ntohl (gm->priority), cps);
4228 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4229 "Dropping query from `%s', this peer is too busy.\n",
4230 GNUNET_i2s (other));
4235 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4236 "Received request for `%s' of type %u from peer `%4s' with flags %u\n",
4237 GNUNET_h2s (&gm->query),
4238 (unsigned int) type,
/* a namespace hash, when flagged, is stored directly behind the
   PendingRequest struct, so room for it is allocated together with it */
4242 have_ns = (0 != (bm & GET_MESSAGE_BIT_SKS_NAMESPACE));
4243 pr = GNUNET_malloc (sizeof (struct PendingRequest) +
4244 (have_ns ? sizeof(GNUNET_HashCode) : 0));
4247 pr->namespace = (GNUNET_HashCode*) &pr[1];
4248 memcpy (&pr[1], &opt[bits++], sizeof (GNUNET_HashCode));
/* compare the reply peer's transmission delay (load and average) against
   limits derived from the priority, the cork delay and rt_entry_lifetime */
4250 if ( (GNUNET_LOAD_get_load (cp->transmission_delay) > 3 * (1 + priority)) ||
4251 (GNUNET_LOAD_get_average (cp->transmission_delay) >
4252 GNUNET_CONSTANTS_MAX_CORK_DELAY.rel_value * 2 + GNUNET_LOAD_get_average (rt_entry_lifetime)) )
4254 /* don't have BW to send to peer, or would likely take longer than we have for it,
4255 so at best indirect the query */
4257 pr->forward_only = GNUNET_YES;
/* copy the remaining request parameters out of the message */
4260 pr->mingle = ntohl (gm->filter_mutator);
4261 if (0 != (bm & GET_MESSAGE_BIT_TRANSMIT_TO))
4262 pr->target_pid = GNUNET_PEER_intern ((const struct GNUNET_PeerIdentity*) &opt[bits++]);
/* requests arriving from peers get a fixed anonymity level of 1 */
4263 pr->anonymity_level = 1;
4264 pr->priority = (uint32_t) priority;
4265 pr->ttl = bound_ttl (ntohl (gm->ttl), pr->priority);
4266 pr->query = gm->query;
4267 /* decrement ttl (always) */
4268 ttl_decrement = 2 * TTL_DECREMENT +
4269 GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
/* a negative ttl that turns positive after the subtraction has wrapped around */
4271 if ( (pr->ttl < 0) &&
4272 (((int32_t)(pr->ttl - ttl_decrement)) > 0) )
4275 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4276 "Dropping query from `%s' due to TTL underflow (%d - %u).\n",
4281 GNUNET_STATISTICS_update (stats,
4282 gettext_noop ("# requests dropped due TTL underflow"),
4285 /* integer underflow => drop (should be very rare)! */
4289 pr->ttl -= ttl_decrement;
4290 pr->start_time = GNUNET_TIME_absolute_get ();
4292 /* get bloom filter */
/* the filter data starts at opt[bits], i.e. behind the optional hash codes
   consumed so far */
4295 pr->bf = GNUNET_CONTAINER_bloomfilter_init ((const char*) &opt[bits],
4298 pr->bf_size = bfsize;
/* scan the pending requests in query_request_map with
   check_duplicate_request_peer to find a duplicate */
4302 GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map,
4304 &check_duplicate_request_peer,
4306 if (cdc.have != NULL)
/* compare the absolute expiration (start_time + ttl) of both requests */
4308 if (cdc.have->start_time.abs_value + cdc.have->ttl >=
4309 pr->start_time.abs_value + pr->ttl)
4311 /* existing request has higher TTL, drop new one! */
/* the priority of the dropped request is credited to the surviving one */
4312 cdc.have->priority += pr->priority;
4313 destroy_pending_request (pr);
/* NOTE(review): the format string below has no conversion for the
   GNUNET_i2s (other) argument that is passed along with it */
4315 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4316 "Have existing request with higher TTL, dropping new request.\n",
4317 GNUNET_i2s (other));
4319 GNUNET_STATISTICS_update (stats,
4320 gettext_noop ("# requests dropped due to higher-TTL request"),
4327 /* existing request has lower TTL, drop old one! */
4328 pr->priority += cdc.have->priority;
4329 /* Possible optimization: if we have applicable pending
4330 replies in 'cdc.have', we might want to move those over
4331 (this is a really rare special-case, so it is not clear
4332 that this would be worth it) */
4333 destroy_pending_request (cdc.have);
4334 /* keep processing 'pr'! */
/* register the request in both maps; the MULTIPLE option allows several
   entries under one key */
4339 GNUNET_break (GNUNET_OK ==
4340 GNUNET_CONTAINER_multihashmap_put (query_request_map,
4343 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
4344 GNUNET_break (GNUNET_OK ==
4345 GNUNET_CONTAINER_multihashmap_put (peer_request_map,
4348 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
/* the heap cost of the request is its absolute expiration time */
4350 pr->hnode = GNUNET_CONTAINER_heap_insert (requests_by_expiration_heap,
4352 pr->start_time.abs_value + pr->ttl);
4354 GNUNET_STATISTICS_update (stats,
4355 gettext_noop ("# P2P searches received"),
4358 GNUNET_STATISTICS_update (stats,
4359 gettext_noop ("# P2P searches active"),
4363 /* calculate change in traffic preference */
4364 cps->inc_preference += pr->priority * 1000 + QUERY_BANDWIDTH_VALUE;
4365 /* process locally */
4366 if (type == GNUNET_BLOCK_TYPE_FS_DBLOCK)
4367 type = GNUNET_BLOCK_TYPE_ANY; /* to get on-demand as well */
/* the datastore timeout grows linearly with priority + 1 */
4368 timeout = GNUNET_TIME_relative_multiply (BASIC_DATASTORE_REQUEST_DELAY,
4369 (pr->priority + 1));
/* the local datastore is only consulted when the request is not forward-only */
4370 if (GNUNET_YES != pr->forward_only)
4373 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4374 "Handing request for `%s' to datastore\n",
4375 GNUNET_h2s (&gm->query));
4377 pr->qe = GNUNET_DATASTORE_get (dsh,
4381 MAX_DATASTORE_QUEUE,
4383 &process_local_reply,
4387 GNUNET_STATISTICS_update (stats,
4388 gettext_noop ("# requests dropped by datastore (queue length limit)"),
4395 GNUNET_STATISTICS_update (stats,
4396 gettext_noop ("# requests forwarded due to high load"),
4401 /* Are multiple results possible (and did we look locally)? If so, start processing remotely now! */
4404 case GNUNET_BLOCK_TYPE_FS_DBLOCK:
4405 case GNUNET_BLOCK_TYPE_FS_IBLOCK:
4406 /* only one result, wait for datastore */
4407 if (GNUNET_YES != pr->forward_only)
4409 GNUNET_STATISTICS_update (stats,
4410 gettext_noop ("# requests not instantly forwarded (waiting for datastore)"),
/* schedule forwarding right away unless a task is already set for the request */
4416 if (pr->task == GNUNET_SCHEDULER_NO_TASK)
4417 pr->task = GNUNET_SCHEDULER_add_now (&forward_request_task,
4421 /* make sure we don't track too many requests */
/* 'pr' is reused for the entry returned by heap_peek (presumably the heap
   root, i.e. the request with the smallest expiration cost -- confirm) */
4422 if (GNUNET_CONTAINER_heap_get_size (requests_by_expiration_heap) > max_pending_requests)
4424 pr = GNUNET_CONTAINER_heap_peek (requests_by_expiration_heap);
4425 GNUNET_assert (pr != NULL);
4426 destroy_pending_request (pr);
4432 /* **************************** CS GET Handling ************************ */
4436 * Handle START_SEARCH-message (search request from client).
4438 * @param cls closure
4439 * @param client identification of the client
4440 * @param message the actual message
/* Handle a START_SEARCH message from a local client: validate the size,
   locate or create the ClientList entry of the client, merge the list of
   already seen replies into an existing request for duplicate
   KBLOCK/NBLOCK/ANY searches, otherwise build a new PendingRequest,
   register it in query_request_map and query the datastore. */
4443 handle_start_search (void *cls,
4444 struct GNUNET_SERVER_Client *client,
4445 const struct GNUNET_MessageHeader *message)
/* static storage, hence zero-initialised; presumably the reference value for
   the target comparison further down (that argument is not visible) */
4447 static GNUNET_HashCode all_zeros;
4448 const struct SearchMessage *sm;
4449 struct ClientList *cl;
4450 struct ClientRequestList *crl;
4451 struct PendingRequest *pr;
4454 enum GNUNET_BLOCK_Type type;
/* the payload behind the fixed SearchMessage must be a whole number of
   hash codes */
4456 msize = ntohs (message->size);
4457 if ( (msize < sizeof (struct SearchMessage)) ||
4458 (0 != (msize - sizeof (struct SearchMessage)) % sizeof (GNUNET_HashCode)) )
4461 GNUNET_SERVER_receive_done (client,
4465 GNUNET_STATISTICS_update (stats,
4466 gettext_noop ("# client searches received"),
/* 'sc' is the number of hash codes appended to the message */
4469 sc = (msize - sizeof (struct SearchMessage)) / sizeof (GNUNET_HashCode);
4470 sm = (const struct SearchMessage*) message;
4471 type = ntohl (sm->type);
4473 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4474 "Received request for `%s' of type %u from local client\n",
4475 GNUNET_h2s (&sm->query),
4476 (unsigned int) type);
/* walk the list until the entry belonging to this client is found */
4479 while ( (cl != NULL) &&
4480 (cl->client != client) )
/* build a new entry for the client, keep a reference on the client handle
   and link the entry in front of client_list (presumably only when the
   search above found nothing -- the guarding condition is not visible) */
4484 cl = GNUNET_malloc (sizeof (struct ClientList));
4485 cl->client = client;
4486 GNUNET_SERVER_client_keep (client);
4487 cl->next = client_list;
4490 /* detect duplicate KBLOCK requests */
4491 if ( (type == GNUNET_BLOCK_TYPE_FS_KBLOCK) ||
4492 (type == GNUNET_BLOCK_TYPE_FS_NBLOCK) ||
4493 (type == GNUNET_BLOCK_TYPE_ANY) )
/* skip entries until one matches in both query and type */
4496 while ( (crl != NULL) &&
4497 ( (0 != memcmp (&crl->req->query,
4499 sizeof (GNUNET_HashCode))) ||
4500 (crl->req->type != type) ) )
4505 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4506 "Have existing request, merging content-seen lists.\n");
4509 /* Duplicate request (used to send long list of
4510 known/blocked results); merge 'pr->replies_seen'
4511 and update bloom filter */
/* grow the array by 'sc' slots and append the new hash codes at the
   current fill offset */
4512 GNUNET_array_grow (pr->replies_seen,
4513 pr->replies_seen_size,
4514 pr->replies_seen_off + sc);
4515 memcpy (&pr->replies_seen[pr->replies_seen_off],
4517 sc * sizeof (GNUNET_HashCode));
4518 pr->replies_seen_off += sc;
4519 refresh_bloomfilter (pr);
4520 GNUNET_STATISTICS_update (stats,
4521 gettext_noop ("# client searches updated (merged content seen list)"),
4524 GNUNET_SERVER_receive_done (client,
4529 GNUNET_STATISTICS_update (stats,
4530 gettext_noop ("# client searches active"),
/* SBLOCK requests get extra room behind the struct for a namespace hash */
4533 pr = GNUNET_malloc (sizeof (struct PendingRequest) +
4534 ((type == GNUNET_BLOCK_TYPE_FS_SBLOCK) ? sizeof(GNUNET_HashCode) : 0));
4535 crl = GNUNET_malloc (sizeof (struct ClientRequestList));
/* the new list entry is cleared explicitly before its fields are set */
4536 memset (crl, 0, sizeof (struct ClientRequestList));
4537 crl->client_list = cl;
4538 GNUNET_CONTAINER_DLL_insert (cl->rl_head,
4543 pr->client_request_list = crl;
/* start the seen-replies list with the 'sc' hash codes from the message */
4544 GNUNET_array_grow (pr->replies_seen,
4545 pr->replies_seen_size,
4547 memcpy (pr->replies_seen,
4549 sc * sizeof (GNUNET_HashCode));
4550 pr->replies_seen_off = sc;
4551 pr->anonymity_level = ntohl (sm->anonymity_level);
4552 pr->start_time = GNUNET_TIME_absolute_get ();
4553 refresh_bloomfilter (pr);
4554 pr->query = sm->query;
/* bit 0 of the options field selects a local-only search */
4555 if (0 == (1 & ntohl (sm->options)))
4556 pr->local_only = GNUNET_NO;
4558 pr->local_only = GNUNET_YES;
/* the meaning of sm->target depends on the block type: a peer identity for
   DBLOCK/IBLOCK, a namespace hash for SBLOCK */
4561 case GNUNET_BLOCK_TYPE_FS_DBLOCK:
4562 case GNUNET_BLOCK_TYPE_FS_IBLOCK:
4563 if (0 != memcmp (&sm->target,
4565 sizeof (GNUNET_HashCode)))
4566 pr->target_pid = GNUNET_PEER_intern ((const struct GNUNET_PeerIdentity*) &sm->target);
4568 case GNUNET_BLOCK_TYPE_FS_SBLOCK:
4569 pr->namespace = (GNUNET_HashCode*) &pr[1];
4570 memcpy (&pr[1], &sm->target, sizeof (GNUNET_HashCode));
/* register the request; the MULTIPLE option allows several entries per key */
4575 GNUNET_break (GNUNET_OK ==
4576 GNUNET_CONTAINER_multihashmap_put (query_request_map,
4579 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
4580 if (type == GNUNET_BLOCK_TYPE_FS_DBLOCK)
4581 type = GNUNET_BLOCK_TYPE_ANY; /* get on-demand blocks too! */
/* query the local datastore; results go to process_local_reply */
4582 pr->qe = GNUNET_DATASTORE_get (dsh,
4586 GNUNET_CONSTANTS_SERVICE_TIMEOUT,
4587 &process_local_reply,
4592 /* **************************** Startup ************************ */
4597 * Function called after GNUNET_CORE_connect has succeeded
4598 * (or failed for good). Note that the private key of the
4599 * peer is intentionally not exposed here; if you need it,
4600 * your process should try to read the private key file
4601 * directly (which should work if you are authorized...).
4603 * @param cls closure
4604 * @param server handle to the server, NULL if we failed
4605 * @param my_identity ID of this peer, NULL if we failed
4606 * @param publicKey public key of this peer, NULL if we failed
/* Callback that stores the identity of this peer in 'my_id'. */
4609 peer_init_handler (void *cls,
4610 struct GNUNET_CORE_Handle * server,
4611 const struct GNUNET_PeerIdentity *
4614 GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded *
/* NOTE(review): the description of this callback says my_identity is NULL on
   failure, yet no NULL check is visible before this dereference -- confirm */
4617 my_id = *my_identity;
4624 * Process fs requests.
4626 * @param server the initialized server
4627 * @param c configuration to use
/* Second stage of service start-up: create the statistics handle, read the
   configuration values, create the request/peer lookup structures, connect
   to core, register the client message handlers and schedule the periodic
   tasks.  GNUNET_SYSERR is returned from the failure path that tears the
   lookup structures down again. */
4630 main_init (struct GNUNET_SERVER_Handle *server,
4631 const struct GNUNET_CONFIGURATION_Handle *c)
/* handlers for messages from other peers; a size of 0 is used for GET and
   PUT, a fixed struct size for MIGRATION_STOP */
4633 static const struct GNUNET_CORE_MessageHandler p2p_handlers[] =
4636 GNUNET_MESSAGE_TYPE_FS_GET, 0 },
4638 GNUNET_MESSAGE_TYPE_FS_PUT, 0 },
4639 { &handle_p2p_migration_stop,
4640 GNUNET_MESSAGE_TYPE_FS_MIGRATION_STOP,
4641 sizeof (struct MigrationStopMessage) },
/* handlers for messages from local clients */
4644 static const struct GNUNET_SERVER_MessageHandler handlers[] = {
4645 {&GNUNET_FS_handle_index_start, NULL,
4646 GNUNET_MESSAGE_TYPE_FS_INDEX_START, 0},
4647 {&GNUNET_FS_handle_index_list_get, NULL,
4648 GNUNET_MESSAGE_TYPE_FS_INDEX_LIST_GET, sizeof(struct GNUNET_MessageHeader) },
4649 {&GNUNET_FS_handle_unindex, NULL, GNUNET_MESSAGE_TYPE_FS_UNINDEX,
4650 sizeof (struct UnindexMessage) },
4651 {&handle_start_search, NULL, GNUNET_MESSAGE_TYPE_FS_START_SEARCH,
/* initial size passed to the two peer-keyed hash maps created below */
4655 unsigned long long enc = 128;
/* NOTE(review): the parameter is named 'c' while the body reads 'cfg';
   presumably 'cfg' is a file-level variable assigned from 'c' on a line not
   visible here -- confirm */
4658 stats = GNUNET_STATISTICS_create ("fs", cfg);
/* default that stays in effect when MIN_MIGRATION_DELAY is not configured */
4659 min_migration_delay = GNUNET_TIME_UNIT_SECONDS;
4661 GNUNET_CONFIGURATION_get_value_number (cfg,
4663 "MAX_PENDING_REQUESTS",
4664 &max_pending_requests)) ||
4666 GNUNET_CONFIGURATION_get_value_number (cfg,
4668 "EXPECTED_NEIGHBOUR_COUNT",
4671 GNUNET_CONFIGURATION_get_value_time (cfg,
4673 "MIN_MIGRATION_DELAY",
4674 &min_migration_delay)) )
/* NOTE(review): unlike the other log messages in this function, this one
   has no trailing newline */
4676 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
4677 _("Configuration fails to specify certain parameters, assuming default values."));
/* lookup structures for connected peers and pending requests; the heap is
   created with MIN ordering */
4679 connected_peers = GNUNET_CONTAINER_multihashmap_create (enc);
4680 query_request_map = GNUNET_CONTAINER_multihashmap_create (max_pending_requests);
4681 rt_entry_lifetime = GNUNET_LOAD_value_init (GNUNET_TIME_UNIT_FOREVER_REL);
4682 peer_request_map = GNUNET_CONTAINER_multihashmap_create (enc);
4683 requests_by_expiration_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
4684 core = GNUNET_CORE_connect (cfg,
4688 &peer_connect_handler,
4689 &peer_disconnect_handler,
4690 &peer_status_handler,
/* failure path: release everything created above and reset the pointers */
4696 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
4697 _("Failed to connect to `%s' service.\n"),
4699 GNUNET_CONTAINER_multihashmap_destroy (connected_peers);
4700 connected_peers = NULL;
4701 GNUNET_CONTAINER_multihashmap_destroy (query_request_map);
4702 query_request_map = NULL;
4703 GNUNET_LOAD_value_free (rt_entry_lifetime);
4704 rt_entry_lifetime = NULL;
4705 GNUNET_CONTAINER_heap_destroy (requests_by_expiration_heap);
4706 requests_by_expiration_heap = NULL;
4707 GNUNET_CONTAINER_multihashmap_destroy (peer_request_map);
4708 peer_request_map = NULL;
4711 GNUNET_DATASTORE_disconnect (dsh, GNUNET_NO);
4714 return GNUNET_SYSERR;
/* gathering of content for migration is only started when enabled */
4716 if (active_from_migration)
4718 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
4719 _("Content migration is enabled, will start to gather data\n"));
4720 consider_migration_gathering ();
4722 consider_dht_put_gathering (NULL);
4723 GNUNET_SERVER_disconnect_notify (server,
4724 &handle_client_disconnect,
/* a filename option is mandatory (asserted); afterwards the trust directory
   is created and cron_flush_trust is scheduled with high priority */
4726 GNUNET_assert (GNUNET_OK ==
4727 GNUNET_CONFIGURATION_get_value_filename (cfg,
4731 GNUNET_DISK_directory_create (trustDirectory);
4732 GNUNET_SCHEDULER_add_with_priority (GNUNET_SCHEDULER_PRIORITY_HIGH,
4733 &cron_flush_trust, NULL);
/* from here on client messages are dispatched to the handlers table */
4736 GNUNET_SERVER_add_handlers (server, handlers);
4737 cover_age_task = GNUNET_SCHEDULER_add_delayed (COVER_AGE_FREQUENCY,
4738 &age_cover_counters,
/* a task delayed by FOREVER_REL (presumably the shutdown handler, its
   callback is not visible here) */
4740 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL,
4748 * Process fs requests.
4750 * @param cls closure
4751 * @param server the initialized server
4752 * @param cfg configuration to use
/* Service entry callback (passed as &run to GNUNET_SERVICE_run in main):
   reads the migration options, connects to datastore and DHT, creates the
   block context, then runs GNUNET_FS_indexing_init and main_init; if either
   fails, everything acquired here is released again. */
4756 struct GNUNET_SERVER_Handle *server,
4757 const struct GNUNET_CONFIGURATION_Handle *cfg)
/* yes/no options controlling content migration in either direction */
4759 active_to_migration = GNUNET_CONFIGURATION_get_value_yesno (cfg,
4762 active_from_migration = GNUNET_CONFIGURATION_get_value_yesno (cfg,
4765 dsh = GNUNET_DATASTORE_connect (cfg);
/* presumably reached only when the datastore connection failed (the
   condition is not visible here) */
4768 GNUNET_SCHEDULER_shutdown ();
/* load trackers for datastore get and put operations */
4771 datastore_get_load = GNUNET_LOAD_value_init (DATASTORE_LOAD_AUTODECLINE);
4772 datastore_put_load = GNUNET_LOAD_value_init (DATASTORE_LOAD_AUTODECLINE);
/* the block context gets a private configuration object of its own */
4773 block_cfg = GNUNET_CONFIGURATION_create ();
4774 GNUNET_CONFIGURATION_set_value_string (block_cfg,
4778 block_ctx = GNUNET_BLOCK_context_create (block_cfg);
4779 GNUNET_assert (NULL != block_ctx);
4780 dht_handle = GNUNET_DHT_connect (cfg,
/* main_init is only attempted when indexing initialisation succeeded */
4782 if ( (GNUNET_OK != GNUNET_FS_indexing_init (cfg, dsh)) ||
4783 (GNUNET_OK != main_init (server, cfg)) )
/* NOTE(review): the failure path of main_init also calls
   GNUNET_DATASTORE_disconnect on 'dsh'; confirm that the handle cannot be
   disconnected twice */
4785 GNUNET_SCHEDULER_shutdown ();
4786 GNUNET_DATASTORE_disconnect (dsh, GNUNET_NO);
4788 GNUNET_DHT_disconnect (dht_handle);
4790 GNUNET_BLOCK_context_destroy (block_ctx);
4792 GNUNET_CONFIGURATION_destroy (block_cfg);
4794 GNUNET_LOAD_value_free (datastore_get_load);
4795 datastore_get_load = NULL;
4796 GNUNET_LOAD_value_free (datastore_put_load);
4797 datastore_put_load = NULL;
4804 * The main function for the fs service.
4806 * @param argc number of arguments from the command line
4807 * @param argv command line arguments
4808 * @return 0 ok, 1 on error
/* Program entry point: hands control to GNUNET_SERVICE_run with 'run' as the
   service callback and turns its result into the process exit status. */
4811 main (int argc, char *const *argv)
/* exit status 0 only when GNUNET_SERVICE_run reports GNUNET_OK, otherwise 1 */
4813 return (GNUNET_OK ==
4814 GNUNET_SERVICE_run (argc,
4817 GNUNET_SERVICE_OPTION_NONE,
4818 &run, NULL)) ? 0 : 1;
4821 /* end of gnunet-service-fs.c */