2 This file is part of GNUnet.
3 (C) 2009, 2010 Christian Grothoff (and other contributing authors)
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your
8 option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA.
22 * @file fs/gnunet-service-fs.c
23 * @brief gnunet anonymity protocol implementation
24 * @author Christian Grothoff
27 * - track per-peer request latency (using new load API)
28 * - consider more precise latency estimation (per-peer & request) -- again load API?
29 * - implement test_load_too_high, make decision priority-based, implement forwarding, etc.
30 * - introduce random latency in processing
35 #include "gnunet_constants.h"
36 #include "gnunet_core_service.h"
37 #include "gnunet_dht_service.h"
38 #include "gnunet_datastore_service.h"
39 #include "gnunet_load_lib.h"
40 #include "gnunet_peer_lib.h"
41 #include "gnunet_protocols.h"
42 #include "gnunet_signatures.h"
43 #include "gnunet_statistics_service.h"
44 #include "gnunet_util_lib.h"
45 #include "gnunet-service-fs_indexing.h"
48 #define DEBUG_FS GNUNET_NO
51 * Maximum number of outgoing messages we queue per peer.
53 #define MAX_QUEUE_PER_PEER 16
56 * Size for the hash map for DHT requests from the FS
57 * service. Should be about the number of concurrent
58 * DHT requests we plan to make.
60 #define FS_DHT_HT_SIZE 1024
63 * How often do we flush trust values to disk?
65 #define TRUST_FLUSH_FREQ GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 5)
68 * How often do we at most PUT content into the DHT?
70 #define MAX_DHT_PUT_FREQ GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5)
73 * Inverse of the probability that we will submit the same query
74 * to the same peer again. If the same peer already got the query
75 * repeatedly recently, the probability is multiplied by the inverse
76 * of this number each time. Note that we only try about every TTL_DECREMENT/2
77 * plus MAX_CORK_DELAY (so roughly every 3.5s).
79 #define RETRY_PROBABILITY_INV 3
82 * What is the maximum delay for a P2P FS message (in our interaction
83 * with core)? FS-internal delays are another story. The value is
84 * chosen based on the 32k block size. Given that peers typically
85 * have at least 1 kb/s bandwidth, 45s waits give us a chance to
86 * transmit one message even to the lowest-bandwidth peers.
88 #define MAX_TRANSMIT_DELAY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 45)
91 * Maximum number of requests (from other peers) that we're
92 * willing to have pending at any given point in time.
94 static unsigned long long max_pending_requests = (32 * 1024);
98 * Information we keep for each pending reply. The
99 * actual message follows at the end of this struct.
101 struct PendingMessage;
104 * Function called upon completion of a transmission.
107 * @param tpid ID of receiving peer, 0 on transmission error
109 typedef void (*TransmissionContinuation)(void * cls,
110 GNUNET_PEER_Id tpid);
114 * Information we keep for each pending message (GET/PUT). The
115 * actual message follows at the end of this struct.
117 struct PendingMessage
120 * This is a doubly-linked list of messages to the same peer.
122 struct PendingMessage *next;
125 * This is a doubly-linked list of messages to the same peer.
127 struct PendingMessage *prev;
130 * Entry in pending message list for this pending message.
132 struct PendingMessageList *pml;
135 * Function to call immediately once we have transmitted this
138 TransmissionContinuation cont;
146 * Size of the reply; actual reply message follows
147 * at the end of this struct.
152 * How important is this message for us?
160 * Information about a peer that we are connected to.
161 * We track data that is useful for determining which
162 * peers should receive our requests. We also keep
163 * a list of messages to transmit to this peer.
169 * List of the last clients for which this peer successfully
172 struct GNUNET_SERVER_Client *last_client_replies[CS2P_SUCCESS_LIST_SIZE];
175 * List of the last PIDs for which
176 * this peer successfully answered a query;
177 * We use 0 to indicate no successful reply.
179 GNUNET_PEER_Id last_p2p_replies[P2P_SUCCESS_LIST_SIZE];
182 * Average delay between sending the peer a request and
183 * getting a reply (only calculated over the requests for
184 * which we actually got a reply). Calculated
185 * as a moving average: new_delay = ((n-1)*last_delay+curr_delay) / n
187 struct GNUNET_TIME_Relative avg_delay;
190 * Point in time until which this peer does not want us to migrate content
193 struct GNUNET_TIME_Absolute migration_blocked;
196 * Time until when we blocked this peer from migrating
199 struct GNUNET_TIME_Absolute last_migration_block;
202 * Handle for an active request for transmission to this
205 struct GNUNET_CORE_TransmitHandle *cth;
208 * Messages (replies, queries, content migration) we would like to
209 * send to this peer in the near future. Sorted by priority, head.
211 struct PendingMessage *pending_messages_head;
214 * Messages (replies, queries, content migration) we would like to
215 * send to this peer in the near future. Sorted by priority, tail.
217 struct PendingMessage *pending_messages_tail;
220 * Average priority of successful replies. Calculated
221 * as a moving average: new_avg = ((n-1)*last_avg+curr_prio) / n
226 * Increase in traffic preference still to be submitted
227 * to the core service for this peer.
229 uint64_t inc_preference;
232 * Trust rating for this peer
237 * Trust rating for this peer on disk.
242 * The peer's identity.
247 * Size of the linked list of 'pending_messages'.
249 unsigned int pending_requests;
252 * Which offset in "last_p2p_replies" will be updated next?
253 * (we go round-robin).
255 unsigned int last_p2p_replies_woff;
258 * Which offset in "last_client_replies" will be updated next?
259 * (we go round-robin).
261 unsigned int last_client_replies_woff;
267 * Information we keep for each pending request. We should try to
268 * keep this struct as small as possible since its memory consumption
269 * is key to how many requests we can have pending at once.
271 struct PendingRequest;
275 * Doubly-linked list of requests we are performing
276 * on behalf of the same client.
278 struct ClientRequestList
282 * This is a doubly-linked list.
284 struct ClientRequestList *next;
287 * This is a doubly-linked list.
289 struct ClientRequestList *prev;
292 * Request this entry represents.
294 struct PendingRequest *req;
297 * Client list this request belongs to.
299 struct ClientList *client_list;
305 * Replies to be transmitted to the client. The actual
306 * response message is allocated after this struct.
308 struct ClientResponseMessage
311 * This is a doubly-linked list.
313 struct ClientResponseMessage *next;
316 * This is a doubly-linked list.
318 struct ClientResponseMessage *prev;
321 * Client list entry this response belongs to.
323 struct ClientList *client_list;
326 * Number of bytes in the response.
333 * Linked list of clients we are performing requests
339 * This is a linked list.
341 struct ClientList *next;
344 * ID of a client making a request, NULL if this entry is for a
347 struct GNUNET_SERVER_Client *client;
350 * Head of list of requests performed on behalf
351 * of this client right now.
353 struct ClientRequestList *rl_head;
356 * Tail of list of requests performed on behalf
357 * of this client right now.
359 struct ClientRequestList *rl_tail;
362 * Head of linked list of responses.
364 struct ClientResponseMessage *res_head;
367 * Tail of linked list of responses.
369 struct ClientResponseMessage *res_tail;
372 * Context for sending replies.
374 struct GNUNET_CONNECTION_TransmitHandle *th;
380 * Doubly-linked list of messages we are performing
381 * due to a pending request.
383 struct PendingMessageList
387 * This is a doubly-linked list of messages on behalf of the same request.
389 struct PendingMessageList *next;
392 * This is a doubly-linked list of messages on behalf of the same request.
394 struct PendingMessageList *prev;
397 * Message this entry represents.
399 struct PendingMessage *pm;
402 * Request this entry belongs to.
404 struct PendingRequest *req;
407 * Peer this message is targeted for.
409 struct ConnectedPeer *target;
415 * Information we keep for each pending request. We should try to
416 * keep this struct as small as possible since its memory consumption
417 * is key to how many requests we can have pending at once.
419 struct PendingRequest
423 * If this request was made by a client, this is our entry in the
424 * client request list; otherwise NULL.
426 struct ClientRequestList *client_request_list;
429 * Entry of peer responsible for this entry (if this request
430 * was made by a peer).
432 struct ConnectedPeer *cp;
435 * If this is a namespace query, pointer to the hash of the public
436 * key of the namespace; otherwise NULL. Pointer will be to the
437 * end of this struct (so no need to free it).
439 const GNUNET_HashCode *namespace;
442 * Bloomfilter we use to filter out replies that we don't care about
443 * (anymore). NULL as long as we are interested in all replies.
445 struct GNUNET_CONTAINER_BloomFilter *bf;
448 * Context of our GNUNET_CORE_peer_change_preference call.
450 struct GNUNET_CORE_InformationRequestContext *irc;
453 * Reference to DHT get operation for this request (or NULL).
455 struct GNUNET_DHT_GetHandle *dht_get;
458 * Hash code of all replies that we have seen so far (only valid
459 * if client is not NULL since we only track replies like this for
462 GNUNET_HashCode *replies_seen;
465 * Node in the heap representing this entry; NULL
466 * if we have no heap node.
468 struct GNUNET_CONTAINER_HeapNode *hnode;
471 * Head of list of messages being performed on behalf of this
474 struct PendingMessageList *pending_head;
477 * Tail of list of messages being performed on behalf of this
480 struct PendingMessageList *pending_tail;
483 * When did we first see this request (from this peer), or, if our
484 * client is initiating, when did we last initiate a search?
486 struct GNUNET_TIME_Absolute start_time;
489 * The query that this request is for.
491 GNUNET_HashCode query;
494 * The task responsible for transmitting queries
497 GNUNET_SCHEDULER_TaskIdentifier task;
500 * (Interned) Peer identifier that identifies a preferred target
503 GNUNET_PEER_Id target_pid;
506 * (Interned) Peer identifiers of peers that have already
507 * received our query for this content.
509 GNUNET_PEER_Id *used_pids;
512 * Our entry in the queue (non-NULL while we wait for our
513 * turn to interact with the local database).
515 struct GNUNET_DATASTORE_QueueEntry *qe;
518 * Size of the 'bf' (in bytes).
523 * Desired anonymity level; only valid for requests from a local client.
525 uint32_t anonymity_level;
528 * How many entries in "used_pids" are actually valid?
530 unsigned int used_pids_off;
533 * How long is the "used_pids" array?
535 unsigned int used_pids_size;
538 * Number of results found for this request.
540 unsigned int results_found;
543 * How many entries in "replies_seen" are actually valid?
545 unsigned int replies_seen_off;
548 * How long is the "replies_seen" array?
550 unsigned int replies_seen_size;
553 * Priority with which this request was made. If one of our clients
554 * made the request, then this is the current priority that we are
555 * using when initiating the request. This value is used when
556 * we decide to reward other peers with trust for providing a reply.
561 * Priority points left for us to spend when forwarding this request
564 uint32_t remaining_priority;
567 * Number to mingle hashes for bloom-filter tests with.
572 * TTL with which we saw this request (or, if we initiated, TTL that
573 * we used for the request).
578 * Type of the content that this request is for.
580 enum GNUNET_BLOCK_Type type;
583 * Remove this request after transmission of the current response.
588 * GNUNET_YES if we should not forward this request to other peers.
596 * Block that is ready for migration to other peers. Actual data is at the end of the block.
598 struct MigrationReadyBlock
602 * This is a doubly-linked list.
604 struct MigrationReadyBlock *next;
607 * This is a doubly-linked list.
609 struct MigrationReadyBlock *prev;
612 * Query for the block.
614 GNUNET_HashCode query;
617 * When does this block expire?
619 struct GNUNET_TIME_Absolute expiration;
622 * Peers we would consider forwarding this
623 * block to. Zero for empty entries.
625 GNUNET_PEER_Id target_list[MIGRATION_LIST_SIZE];
633 * Number of targets already used.
635 unsigned int used_targets;
640 enum GNUNET_BLOCK_Type type;
645 * Our connection to the datastore.
647 static struct GNUNET_DATASTORE_Handle *dsh;
652 static struct GNUNET_BLOCK_Context *block_ctx;
655 * Our block configuration.
657 static struct GNUNET_CONFIGURATION_Handle *block_cfg;
662 static struct GNUNET_SCHEDULER_Handle *sched;
667 static const struct GNUNET_CONFIGURATION_Handle *cfg;
670 * Map of peer identifiers to "struct ConnectedPeer" (for that peer).
672 static struct GNUNET_CONTAINER_MultiHashMap *connected_peers;
675 * Map of peer identifiers to "struct PendingRequest" (for that peer).
677 static struct GNUNET_CONTAINER_MultiHashMap *peer_request_map;
680 * Map of query identifiers to "struct PendingRequest" (for that query).
682 static struct GNUNET_CONTAINER_MultiHashMap *query_request_map;
685 * Heap with the request that will expire next at the top. Contains
686 * pointers of type "struct PendingRequest*"; these will *also* be
687 * aliased from the "requests_by_peer" data structures and the
688 * "requests_by_query" table. Note that requests from our clients
689 * don't expire and are thus NOT in the "requests_by_expiration"
690 * (or the "requests_by_peer" tables).
692 static struct GNUNET_CONTAINER_Heap *requests_by_expiration_heap;
695 * Handle for reporting statistics.
697 static struct GNUNET_STATISTICS_Handle *stats;
700 * Linked list of clients we are currently processing requests for.
702 static struct ClientList *client_list;
705 * Pointer to handle to the core service (points to NULL until we've
708 static struct GNUNET_CORE_Handle *core;
711 * Head of linked list of blocks that can be migrated.
713 static struct MigrationReadyBlock *mig_head;
716 * Tail of linked list of blocks that can be migrated.
718 static struct MigrationReadyBlock *mig_tail;
721 * Request to datastore for migration (or NULL).
723 static struct GNUNET_DATASTORE_QueueEntry *mig_qe;
726 * Request to datastore for DHT PUTs (or NULL).
728 static struct GNUNET_DATASTORE_QueueEntry *dht_qe;
731 * Type we will request for the next DHT PUT round from the datastore.
733 static enum GNUNET_BLOCK_Type dht_put_type = GNUNET_BLOCK_TYPE_FS_KBLOCK;
736 * Where do we store trust information?
738 static char *trustDirectory;
741 * ID of task that collects blocks for migration.
743 static GNUNET_SCHEDULER_TaskIdentifier mig_task;
746 * ID of task that collects blocks for DHT PUTs.
748 static GNUNET_SCHEDULER_TaskIdentifier dht_task;
751 * What is the maximum frequency at which we are allowed to
752 * poll the datastore for migration content?
754 static struct GNUNET_TIME_Relative min_migration_delay;
757 * Handle for DHT operations.
759 static struct GNUNET_DHT_Handle *dht_handle;
762 * Size of the doubly-linked list of migration blocks.
764 static unsigned int mig_size;
767 * Are we allowed to migrate content to this peer.
769 static int active_migration;
772 * How many entries with zero anonymity do we currently estimate
773 * to have in the database?
775 static unsigned int zero_anonymity_count_estimate;
778 * Typical priorities we're seeing from other peers right now. Since
779 * most priorities will be zero, this value is the weighted average of
780 * non-zero priorities seen "recently". In order to ensure that new
781 * values do not dramatically change the ratio, values are first
782 * "capped" to a reasonable range (+N of the current value) and then
783 * averaged into the existing value by a ratio of 1:N. Hence
784 * receiving the largest possible priority can still only raise our
785 * "current_priorities" by at most 1.
787 static double current_priorities;
790 * Datastore 'GET' load tracking.
792 static struct GNUNET_LOAD_Value *datastore_get_load;
795 * Datastore 'PUT' load tracking.
797 static struct GNUNET_LOAD_Value *datastore_put_load;
801 * We've just now completed a datastore request. Update our
802 * datastore load calculations.
804 * @param start time when the datastore request was issued
807 update_datastore_delays (struct GNUNET_TIME_Absolute start)
809 struct GNUNET_TIME_Relative delay;
811 delay = GNUNET_TIME_absolute_get_duration (start);
812 GNUNET_LOAD_update (datastore_get_load,
818 * Get the filename under which we would store the trust value
819 * for the given peer (inside the trust directory).
820 * @return filename of the form DIRECTORY/HOSTID
823 get_trust_filename (const struct GNUNET_PeerIdentity *id)
825 struct GNUNET_CRYPTO_HashAsciiEncoded fil;
828 GNUNET_CRYPTO_hash_to_enc (&id->hashPubKey, &fil);
829 GNUNET_asprintf (&fn, "%s%s%s", trustDirectory, DIR_SEPARATOR_STR, &fil);
836 * Transmit messages by copying it to the target buffer
837 * "buf". "buf" will be NULL and "size" zero if the socket was closed
838 * for writing in the meantime. In that case, do nothing
839 * (the disconnect or shutdown handler will take care of the rest).
840 * If we were able to transmit messages and there are still more
841 * pending, ask core again for further calls to this function.
843 * @param cls closure, pointer to the 'struct ConnectedPeer*'
844 * @param size number of bytes available in buf
845 * @param buf where the callee should write the message
846 * @return number of bytes written to buf
849 transmit_to_peer (void *cls,
850 size_t size, void *buf);
853 /* ******************* clean up functions ************************ */
856 * Delete the given migration block.
858 * @param mb block to delete
861 delete_migration_block (struct MigrationReadyBlock *mb)
863 GNUNET_CONTAINER_DLL_remove (mig_head,
866 GNUNET_PEER_decrement_rcs (mb->target_list,
867 MIGRATION_LIST_SIZE);
874 * Compare the distance of two peers to a key.
877 * @param p1 first peer
878 * @param p2 second peer
879 * @return GNUNET_YES if P1 is closer to key than P2
882 is_closer (const GNUNET_HashCode *key,
883 const struct GNUNET_PeerIdentity *p1,
884 const struct GNUNET_PeerIdentity *p2)
886 return GNUNET_CRYPTO_hash_xorcmp (&p1->hashPubKey,
893 * Consider migrating content to a given peer.
895 * @param cls 'struct MigrationReadyBlock*' to select
896 * targets for (or NULL for none)
897 * @param key ID of the peer
898 * @param value 'struct ConnectedPeer' of the peer
899 * @return GNUNET_YES (always continue iteration)
902 consider_migration (void *cls,
903 const GNUNET_HashCode *key,
906 struct MigrationReadyBlock *mb = cls;
907 struct ConnectedPeer *cp = value;
908 struct MigrationReadyBlock *pos;
909 struct GNUNET_PeerIdentity cppid;
910 struct GNUNET_PeerIdentity otherpid;
911 struct GNUNET_PeerIdentity worstpid;
916 /* consider 'cp' as a migration target for mb */
917 if (GNUNET_TIME_absolute_get_remaining (cp->migration_blocked).value > 0)
918 return GNUNET_YES; /* peer has requested no migration! */
921 GNUNET_PEER_resolve (cp->pid,
923 repl = MIGRATION_LIST_SIZE;
924 for (i=0;i<MIGRATION_LIST_SIZE;i++)
926 if (mb->target_list[i] == 0)
928 mb->target_list[i] = cp->pid;
929 GNUNET_PEER_change_rc (mb->target_list[i], 1);
930 repl = MIGRATION_LIST_SIZE;
933 GNUNET_PEER_resolve (mb->target_list[i],
935 if ( (repl == MIGRATION_LIST_SIZE) &&
936 is_closer (&mb->query,
943 else if ( (repl != MIGRATION_LIST_SIZE) &&
944 (is_closer (&mb->query,
952 if (repl != MIGRATION_LIST_SIZE)
954 GNUNET_PEER_change_rc (mb->target_list[repl], -1);
955 mb->target_list[repl] = cp->pid;
956 GNUNET_PEER_change_rc (mb->target_list[repl], 1);
960 /* consider scheduling transmission to cp for content migration */
967 for (i=0;i<MIGRATION_LIST_SIZE;i++)
969 if (cp->pid == pos->target_list[i])
974 msize = GNUNET_MIN (msize,
982 return GNUNET_YES; /* no content available */
984 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
985 "Trying to migrate at least %u bytes to peer `%s'\n",
990 = GNUNET_CORE_notify_transmit_ready (core,
991 0, GNUNET_TIME_UNIT_FOREVER_REL,
992 (const struct GNUNET_PeerIdentity*) key,
993 msize + sizeof (struct PutMessage),
1001 * Task that is run periodically to obtain blocks for content
1005 * @param tc scheduler context (also unused)
1008 gather_migration_blocks (void *cls,
1009 const struct GNUNET_SCHEDULER_TaskContext *tc);
1015 * Task that is run periodically to obtain blocks for DHT PUTs.
1017 * @param cls type of blocks to gather
1018 * @param tc scheduler context (unused)
1021 gather_dht_put_blocks (void *cls,
1022 const struct GNUNET_SCHEDULER_TaskContext *tc);
1026 * If the migration task is not currently running, consider
1027 * (re)scheduling it with the appropriate delay.
1030 consider_migration_gathering ()
1032 struct GNUNET_TIME_Relative delay;
1038 if (mig_task != GNUNET_SCHEDULER_NO_TASK)
1040 delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
1042 delay = GNUNET_TIME_relative_divide (delay,
1043 MAX_MIGRATION_QUEUE);
1044 delay = GNUNET_TIME_relative_max (delay,
1045 min_migration_delay);
1046 mig_task = GNUNET_SCHEDULER_add_delayed (sched,
1048 &gather_migration_blocks,
1054 * If the DHT PUT gathering task is not currently running, consider
1055 * (re)scheduling it with the appropriate delay.
1058 consider_dht_put_gathering (void *cls)
1060 struct GNUNET_TIME_Relative delay;
1066 if (dht_task != GNUNET_SCHEDULER_NO_TASK)
1068 if (zero_anonymity_count_estimate > 0)
1070 delay = GNUNET_TIME_relative_divide (GNUNET_DHT_DEFAULT_REPUBLISH_FREQUENCY,
1071 zero_anonymity_count_estimate);
1072 delay = GNUNET_TIME_relative_min (delay,
1077 /* if we have NO zero-anonymity content yet, wait 5 minutes for some to
1078 (hopefully) appear */
1079 delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 5);
1081 dht_task = GNUNET_SCHEDULER_add_delayed (sched,
1083 &gather_dht_put_blocks,
1089 * Process content offered for migration.
1091 * @param cls closure
1092 * @param key key for the content
1093 * @param size number of bytes in data
1094 * @param data content stored
1095 * @param type type of the content
1096 * @param priority priority of the content
1097 * @param anonymity anonymity-level for the content
1098 * @param expiration expiration time for the content
1099 * @param uid unique identifier for the datum;
1100 * maybe 0 if no unique identifier is available
1103 process_migration_content (void *cls,
1104 const GNUNET_HashCode * key,
1107 enum GNUNET_BLOCK_Type type,
1110 struct GNUNET_TIME_Absolute
1111 expiration, uint64_t uid)
1113 struct MigrationReadyBlock *mb;
1118 if (mig_size < MAX_MIGRATION_QUEUE)
1119 consider_migration_gathering ();
1122 if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
1125 GNUNET_FS_handle_on_demand_block (key, size, data,
1126 type, priority, anonymity,
1128 &process_migration_content,
1131 GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
1136 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1137 "Retrieved block `%s' of type %u for migration\n",
1141 mb = GNUNET_malloc (sizeof (struct MigrationReadyBlock) + size);
1143 mb->expiration = expiration;
1146 memcpy (&mb[1], data, size);
1147 GNUNET_CONTAINER_DLL_insert_after (mig_head,
1152 GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
1153 &consider_migration,
1155 GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
1160 * Function called upon completion of the DHT PUT operation.
1163 dht_put_continuation (void *cls,
1164 const struct GNUNET_SCHEDULER_TaskContext *tc)
1166 GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
1171 * Store content in DHT.
1173 * @param cls closure
1174 * @param key key for the content
1175 * @param size number of bytes in data
1176 * @param data content stored
1177 * @param type type of the content
1178 * @param priority priority of the content
1179 * @param anonymity anonymity-level for the content
1180 * @param expiration expiration time for the content
1181 * @param uid unique identifier for the datum;
1182 * maybe 0 if no unique identifier is available
1185 process_dht_put_content (void *cls,
1186 const GNUNET_HashCode * key,
1189 enum GNUNET_BLOCK_Type type,
1192 struct GNUNET_TIME_Absolute
1193 expiration, uint64_t uid)
1195 static unsigned int counter;
1196 static GNUNET_HashCode last_vhash;
1197 static GNUNET_HashCode vhash;
1202 consider_dht_put_gathering (cls);
1205 /* slightly funky code to estimate the total number of values with zero
1206 anonymity from the maximum observed length of a monotonically increasing
1207 sequence of hashes over the contents */
1208 GNUNET_CRYPTO_hash (data, size, &vhash);
1209 if (GNUNET_CRYPTO_hash_cmp (&vhash, &last_vhash) <= 0)
1211 if (zero_anonymity_count_estimate > 0)
1212 zero_anonymity_count_estimate /= 2;
1218 if (zero_anonymity_count_estimate < (1 << counter))
1219 zero_anonymity_count_estimate = (1 << counter);
1221 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1222 "Retrieved block `%s' of type %u for DHT PUT\n",
1226 GNUNET_DHT_put (dht_handle,
1233 GNUNET_TIME_UNIT_FOREVER_REL,
1234 &dht_put_continuation,
1240 * Task that is run periodically to obtain blocks for content
1244 * @param tc scheduler context (also unused)
1247 gather_migration_blocks (void *cls,
1248 const struct GNUNET_SCHEDULER_TaskContext *tc)
1250 mig_task = GNUNET_SCHEDULER_NO_TASK;
1253 mig_qe = GNUNET_DATASTORE_get_random (dsh, 0, UINT_MAX,
1254 GNUNET_TIME_UNIT_FOREVER_REL,
1255 &process_migration_content, NULL);
1256 GNUNET_assert (mig_qe != NULL);
1262 * Task that is run periodically to obtain blocks for DHT PUTs.
1264 * @param cls type of blocks to gather
1265 * @param tc scheduler context (unused)
1268 gather_dht_put_blocks (void *cls,
1269 const struct GNUNET_SCHEDULER_TaskContext *tc)
1271 dht_task = GNUNET_SCHEDULER_NO_TASK;
1274 if (dht_put_type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
1275 dht_put_type = GNUNET_BLOCK_TYPE_FS_KBLOCK;
1276 dht_qe = GNUNET_DATASTORE_get_zero_anonymity (dsh, 0, UINT_MAX,
1277 GNUNET_TIME_UNIT_FOREVER_REL,
1279 &process_dht_put_content, NULL);
1280 GNUNET_assert (dht_qe != NULL);
1286 * We're done with a particular message list entry.
1287 * Free all associated resources.
1289 * @param pml entry to destroy
1292 destroy_pending_message_list_entry (struct PendingMessageList *pml)
1294 GNUNET_CONTAINER_DLL_remove (pml->req->pending_head,
1295 pml->req->pending_tail,
1297 GNUNET_CONTAINER_DLL_remove (pml->target->pending_messages_head,
1298 pml->target->pending_messages_tail,
1300 pml->target->pending_requests--;
1301 GNUNET_free (pml->pm);
1307 * Destroy the given pending message (and call the respective
1310 * @param pm message to destroy
1311 * @param tpid id of peer that the message was delivered to, or 0 for none
1314 destroy_pending_message (struct PendingMessage *pm,
1315 GNUNET_PEER_Id tpid)
1317 struct PendingMessageList *pml = pm->pml;
1318 TransmissionContinuation cont;
1323 GNUNET_assert (pml->pm == pm);
1324 GNUNET_assert ( (tpid == 0) || (tpid == pml->target->pid) );
1326 cont_cls = pm->cont_cls;
1327 destroy_pending_message_list_entry (pml);
1334 cont (cont_cls, tpid);
1339 * We're done processing a particular request.
1340 * Free all associated resources.
1342 * @param pr request to destroy
1345 destroy_pending_request (struct PendingRequest *pr)
1347 struct GNUNET_PeerIdentity pid;
1349 if (pr->hnode != NULL)
1351 GNUNET_CONTAINER_heap_remove_node (requests_by_expiration_heap,
1355 if (NULL == pr->client_request_list)
1357 GNUNET_STATISTICS_update (stats,
1358 gettext_noop ("# P2P searches active"),
1364 GNUNET_STATISTICS_update (stats,
1365 gettext_noop ("# client searches active"),
1369 /* might have already been removed from map in 'process_reply' (if
1370 there was a unique reply) or never inserted if it was a
1371 duplicate; hence ignore the return value here */
1372 (void) GNUNET_CONTAINER_multihashmap_remove (query_request_map,
1377 GNUNET_DATASTORE_cancel (pr->qe);
1380 if (pr->dht_get != NULL)
1382 GNUNET_DHT_get_stop (pr->dht_get);
1385 if (pr->client_request_list != NULL)
1387 GNUNET_CONTAINER_DLL_remove (pr->client_request_list->client_list->rl_head,
1388 pr->client_request_list->client_list->rl_tail,
1389 pr->client_request_list);
1390 GNUNET_free (pr->client_request_list);
1391 pr->client_request_list = NULL;
1395 GNUNET_PEER_resolve (pr->cp->pid,
1397 (void) GNUNET_CONTAINER_multihashmap_remove (peer_request_map,
1404 GNUNET_CONTAINER_bloomfilter_free (pr->bf);
1407 if (pr->irc != NULL)
1409 GNUNET_CORE_peer_change_preference_cancel (pr->irc);
1412 if (pr->replies_seen != NULL)
1414 GNUNET_free (pr->replies_seen);
1415 pr->replies_seen = NULL;
1417 if (pr->task != GNUNET_SCHEDULER_NO_TASK)
1419 GNUNET_SCHEDULER_cancel (sched,
1421 pr->task = GNUNET_SCHEDULER_NO_TASK;
1423 while (NULL != pr->pending_head)
1424 destroy_pending_message_list_entry (pr->pending_head);
1425 GNUNET_PEER_change_rc (pr->target_pid, -1);
1426 if (pr->used_pids != NULL)
1428 GNUNET_PEER_decrement_rcs (pr->used_pids, pr->used_pids_off);
1429 GNUNET_free (pr->used_pids);
1430 pr->used_pids_off = 0;
1431 pr->used_pids_size = 0;
1432 pr->used_pids = NULL;
1439 * Method called whenever a given peer connects.
1441 * @param cls closure, not used
1442 * @param peer peer identity this notification is about
1443 * @param latency reported latency of the connection with 'other'
1444 * @param distance reported distance (DV) to 'other'
1447 peer_connect_handler (void *cls,
1449 GNUNET_PeerIdentity * peer,
1450 struct GNUNET_TIME_Relative latency,
1453 struct ConnectedPeer *cp;
1454 struct MigrationReadyBlock *pos;
1458 cp = GNUNET_malloc (sizeof (struct ConnectedPeer));
1459 cp->pid = GNUNET_PEER_intern (peer);
1461 fn = get_trust_filename (peer);
1462 if ((GNUNET_DISK_file_test (fn) == GNUNET_YES) &&
1463 (sizeof (trust) == GNUNET_DISK_fn_read (fn, &trust, sizeof (trust))))
1464 cp->disk_trust = cp->trust = ntohl (trust);
1467 GNUNET_break (GNUNET_OK ==
1468 GNUNET_CONTAINER_multihashmap_put (connected_peers,
1471 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
1476 (void) consider_migration (pos, &peer->hashPubKey, cp);
1483 * Increase the host credit by a value.
1485 * @param host which peer to change the trust value on
1486 * @param value is the int value by which the
1487 * host credit is to be increased or decreased
1488 * @returns the actual change in trust (positive or negative)
1491 change_host_trust (struct ConnectedPeer *host, int value)
1493 unsigned int old_trust;
1497 GNUNET_assert (host != NULL);
1498 old_trust = host->trust;
1501 if (host->trust + value < host->trust)
1503 value = UINT32_MAX - host->trust;
1504 host->trust = UINT32_MAX;
1507 host->trust += value;
1511 if (host->trust < -value)
1513 value = -host->trust;
1517 host->trust += value;
1524 * Write host-trust information to a file - flush the buffer entry!
1527 flush_trust (void *cls,
1528 const GNUNET_HashCode *key,
1531 struct ConnectedPeer *host = value;
1534 struct GNUNET_PeerIdentity pid;
1536 if (host->trust == host->disk_trust)
1537 return GNUNET_OK; /* unchanged */
1538 GNUNET_PEER_resolve (host->pid,
1540 fn = get_trust_filename (&pid);
1541 if (host->trust == 0)
1543 if ((0 != UNLINK (fn)) && (errno != ENOENT))
1544 GNUNET_log_strerror_file (GNUNET_ERROR_TYPE_WARNING |
1545 GNUNET_ERROR_TYPE_BULK, "unlink", fn);
1549 trust = htonl (host->trust);
1550 if (sizeof(uint32_t) == GNUNET_DISK_fn_write (fn, &trust,
1552 GNUNET_DISK_PERM_USER_READ | GNUNET_DISK_PERM_USER_WRITE
1553 | GNUNET_DISK_PERM_GROUP_READ | GNUNET_DISK_PERM_OTHER_READ))
1554 host->disk_trust = host->trust;
1561 * Call this method periodically to scan data/hosts for new hosts.
1564 cron_flush_trust (void *cls,
1565 const struct GNUNET_SCHEDULER_TaskContext *tc)
1568 if (NULL == connected_peers)
1570 GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
1575 if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
1577 GNUNET_SCHEDULER_add_delayed (tc->sched,
1578 TRUST_FLUSH_FREQ, &cron_flush_trust, NULL);
1583 * Free (each) request made by the peer.
1585 * @param cls closure, points to peer that the request belongs to
1586 * @param key current key code
1587 * @param value value in the hash map
1588 * @return GNUNET_YES (we should continue to iterate)
1591 destroy_request (void *cls,
1592 const GNUNET_HashCode * key,
1595 const struct GNUNET_PeerIdentity * peer = cls;
1596 struct PendingRequest *pr = value;
1598 GNUNET_break (GNUNET_YES ==
1599 GNUNET_CONTAINER_multihashmap_remove (peer_request_map,
1602 destroy_pending_request (pr);
1608 * Method called whenever a peer disconnects.
1610 * @param cls closure, not used
1611 * @param peer peer identity this notification is about
1614 peer_disconnect_handler (void *cls,
1616 GNUNET_PeerIdentity * peer)
1618 struct ConnectedPeer *cp;
1619 struct PendingMessage *pm;
1621 struct MigrationReadyBlock *pos;
1622 struct MigrationReadyBlock *next;
1624 GNUNET_CONTAINER_multihashmap_get_multiple (peer_request_map,
1628 cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
1632 for (i=0;i<CS2P_SUCCESS_LIST_SIZE;i++)
1634 if (NULL != cp->last_client_replies[i])
1636 GNUNET_SERVER_client_drop (cp->last_client_replies[i]);
1637 cp->last_client_replies[i] = NULL;
1640 GNUNET_break (GNUNET_YES ==
1641 GNUNET_CONTAINER_multihashmap_remove (connected_peers,
1644 /* remove this peer from migration considerations; schedule
1647 while (NULL != (pos = next))
1650 for (i=0;i<MIGRATION_LIST_SIZE;i++)
1652 if (pos->target_list[i] == cp->pid)
1654 GNUNET_PEER_change_rc (pos->target_list[i], -1);
1655 pos->target_list[i] = 0;
1658 if (pos->used_targets >= GNUNET_CONTAINER_multihashmap_size (connected_peers))
1660 delete_migration_block (pos);
1661 consider_migration_gathering ();
1664 GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
1665 &consider_migration,
1668 GNUNET_PEER_change_rc (cp->pid, -1);
1669 GNUNET_PEER_decrement_rcs (cp->last_p2p_replies, P2P_SUCCESS_LIST_SIZE);
1670 if (NULL != cp->cth)
1671 GNUNET_CORE_notify_transmit_ready_cancel (cp->cth);
1672 while (NULL != (pm = cp->pending_messages_head))
1673 destroy_pending_message (pm, 0 /* delivery failed */);
1674 GNUNET_break (0 == cp->pending_requests);
1680 * Iterator over hash map entries that removes all occurences
1681 * of the given 'client' from the 'last_client_replies' of the
1682 * given connected peer.
1684 * @param cls closure, the 'struct GNUNET_SERVER_Client*' to remove
1685 * @param key current key code (unused)
1686 * @param value value in the hash map (the 'struct ConnectedPeer*' to change)
1687 * @return GNUNET_YES (we should continue to iterate)
1690 remove_client_from_last_client_replies (void *cls,
1691 const GNUNET_HashCode * key,
1694 struct GNUNET_SERVER_Client *client = cls;
1695 struct ConnectedPeer *cp = value;
1698 for (i=0;i<CS2P_SUCCESS_LIST_SIZE;i++)
1700 if (cp->last_client_replies[i] == client)
1702 GNUNET_SERVER_client_drop (cp->last_client_replies[i]);
1703 cp->last_client_replies[i] = NULL;
1711 * A client disconnected. Remove all of its pending queries.
1713 * @param cls closure, NULL
1714 * @param client identification of the client
1717 handle_client_disconnect (void *cls,
1718 struct GNUNET_SERVER_Client
1721 struct ClientList *pos;
1722 struct ClientList *prev;
1723 struct ClientRequestList *rcl;
1724 struct ClientResponseMessage *creply;
1730 while ( (NULL != pos) &&
1731 (pos->client != client) )
1737 return; /* no requests pending for this client */
1738 while (NULL != (rcl = pos->rl_head))
1740 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1741 "Destroying pending request `%s' on disconnect\n",
1742 GNUNET_h2s (&rcl->req->query));
1743 destroy_pending_request (rcl->req);
1746 client_list = pos->next;
1748 prev->next = pos->next;
1749 if (pos->th != NULL)
1751 GNUNET_CONNECTION_notify_transmit_ready_cancel (pos->th);
1754 while (NULL != (creply = pos->res_head))
1756 GNUNET_CONTAINER_DLL_remove (pos->res_head,
1759 GNUNET_free (creply);
1761 GNUNET_SERVER_client_drop (pos->client);
1763 GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
1764 &remove_client_from_last_client_replies,
1770 * Iterator to free peer entries.
1772 * @param cls closure, unused
1773 * @param key current key code
1774 * @param value value in the hash map (peer entry)
1775 * @return GNUNET_YES (we should continue to iterate)
1778 clean_peer (void *cls,
1779 const GNUNET_HashCode * key,
1782 peer_disconnect_handler (NULL, (const struct GNUNET_PeerIdentity*) key);
1788 * Task run during shutdown.
1794 shutdown_task (void *cls,
1795 const struct GNUNET_SCHEDULER_TaskContext *tc)
1799 GNUNET_DATASTORE_cancel (mig_qe);
1804 GNUNET_DATASTORE_cancel (dht_qe);
1807 if (GNUNET_SCHEDULER_NO_TASK != mig_task)
1809 GNUNET_SCHEDULER_cancel (sched, mig_task);
1810 mig_task = GNUNET_SCHEDULER_NO_TASK;
1812 if (GNUNET_SCHEDULER_NO_TASK != dht_task)
1814 GNUNET_SCHEDULER_cancel (sched, dht_task);
1815 dht_task = GNUNET_SCHEDULER_NO_TASK;
1817 while (client_list != NULL)
1818 handle_client_disconnect (NULL,
1819 client_list->client);
1820 cron_flush_trust (NULL, NULL);
1821 GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
1824 GNUNET_break (0 == GNUNET_CONTAINER_heap_get_size (requests_by_expiration_heap));
1825 GNUNET_CONTAINER_heap_destroy (requests_by_expiration_heap);
1826 requests_by_expiration_heap = 0;
1827 GNUNET_CONTAINER_multihashmap_destroy (connected_peers);
1828 connected_peers = NULL;
1829 GNUNET_break (0 == GNUNET_CONTAINER_multihashmap_size (query_request_map));
1830 GNUNET_CONTAINER_multihashmap_destroy (query_request_map);
1831 query_request_map = NULL;
1832 GNUNET_break (0 == GNUNET_CONTAINER_multihashmap_size (peer_request_map));
1833 GNUNET_CONTAINER_multihashmap_destroy (peer_request_map);
1834 peer_request_map = NULL;
1835 GNUNET_assert (NULL != core);
1836 GNUNET_CORE_disconnect (core);
1840 GNUNET_STATISTICS_destroy (stats, GNUNET_NO);
1845 GNUNET_DATASTORE_disconnect (dsh,
1849 while (mig_head != NULL)
1850 delete_migration_block (mig_head);
1851 GNUNET_assert (0 == mig_size);
1852 GNUNET_DHT_disconnect (dht_handle);
1854 GNUNET_LOAD_value_free (datastore_get_load);
1855 datastore_get_load = NULL;
1856 GNUNET_LOAD_value_free (datastore_put_load);
1857 datastore_put_load = NULL;
1858 GNUNET_BLOCK_context_destroy (block_ctx);
1860 GNUNET_CONFIGURATION_destroy (block_cfg);
1864 GNUNET_free_non_null (trustDirectory);
1865 trustDirectory = NULL;
1869 /* ******************* Utility functions ******************** */
1873 * Transmit messages by copying it to the target buffer
1874 * "buf". "buf" will be NULL and "size" zero if the socket was closed
1875 * for writing in the meantime. In that case, do nothing
1876 * (the disconnect or shutdown handler will take care of the rest).
1877 * If we were able to transmit messages and there are still more
1878 * pending, ask core again for further calls to this function.
1880 * @param cls closure, pointer to the 'struct ConnectedPeer*'
1881 * @param size number of bytes available in buf
1882 * @param buf where the callee should write the message
1883 * @return number of bytes written to buf
1886 transmit_to_peer (void *cls,
1887 size_t size, void *buf)
1889 struct ConnectedPeer *cp = cls;
1891 struct GNUNET_PeerIdentity pid;
1892 struct PendingMessage *pm;
1893 struct MigrationReadyBlock *mb;
1894 struct MigrationReadyBlock *next;
1895 struct PutMessage migm;
1903 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1904 "Dropping message, core too busy.\n");
1909 while ( (NULL != (pm = cp->pending_messages_head) ) &&
1910 (pm->msize <= size) )
1912 memcpy (&cbuf[msize], &pm[1], pm->msize);
1915 destroy_pending_message (pm, cp->pid);
1919 GNUNET_PEER_resolve (cp->pid,
1921 cp->cth = GNUNET_CORE_notify_transmit_ready (core,
1923 GNUNET_CONSTANTS_SERVICE_TIMEOUT,
1932 while (NULL != (mb = next))
1935 for (i=0;i<MIGRATION_LIST_SIZE;i++)
1937 if ( (cp->pid == mb->target_list[i]) &&
1938 (mb->size + sizeof (migm) <= size) )
1940 GNUNET_PEER_change_rc (mb->target_list[i], -1);
1941 mb->target_list[i] = 0;
1943 memset (&migm, 0, sizeof (migm));
1944 migm.header.size = htons (sizeof (migm) + mb->size);
1945 migm.header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
1946 migm.type = htonl (mb->type);
1947 migm.expiration = GNUNET_TIME_absolute_hton (mb->expiration);
1948 memcpy (&cbuf[msize], &migm, sizeof (migm));
1949 msize += sizeof (migm);
1950 size -= sizeof (migm);
1951 memcpy (&cbuf[msize], &mb[1], mb->size);
1955 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1956 "Pushing migration block `%s' (%u bytes) to `%s'\n",
1957 GNUNET_h2s (&mb->query),
1966 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1967 "Migration block `%s' (%u bytes) is not on migration list for peer `%s'\n",
1968 GNUNET_h2s (&mb->query),
1974 if ( (mb->used_targets >= MIGRATION_TARGET_COUNT) ||
1975 (mb->used_targets >= GNUNET_CONTAINER_multihashmap_size (connected_peers)) )
1977 delete_migration_block (mb);
1978 consider_migration_gathering ();
1981 consider_migration (NULL,
1986 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1987 "Transmitting %u bytes to peer %u\n",
1996 * Add a message to the set of pending messages for the given peer.
1998 * @param cp peer to send message to
1999 * @param pm message to queue
2000 * @param pr request on which behalf this message is being queued
2003 add_to_pending_messages_for_peer (struct ConnectedPeer *cp,
2004 struct PendingMessage *pm,
2005 struct PendingRequest *pr)
2007 struct PendingMessage *pos;
2008 struct PendingMessageList *pml;
2009 struct GNUNET_PeerIdentity pid;
2011 GNUNET_assert (pm->next == NULL);
2012 GNUNET_assert (pm->pml == NULL);
2015 pml = GNUNET_malloc (sizeof (struct PendingMessageList));
2020 GNUNET_CONTAINER_DLL_insert (pr->pending_head,
2024 pos = cp->pending_messages_head;
2025 while ( (pos != NULL) &&
2026 (pm->priority < pos->priority) )
2028 GNUNET_CONTAINER_DLL_insert_after (cp->pending_messages_head,
2029 cp->pending_messages_tail,
2032 cp->pending_requests++;
2033 if (cp->pending_requests > MAX_QUEUE_PER_PEER)
2034 destroy_pending_message (cp->pending_messages_tail, 0);
2035 GNUNET_PEER_resolve (cp->pid, &pid);
2036 if (NULL != cp->cth)
2037 GNUNET_CORE_notify_transmit_ready_cancel (cp->cth);
2038 /* need to schedule transmission */
2039 cp->cth = GNUNET_CORE_notify_transmit_ready (core,
2040 cp->pending_messages_head->priority,
2043 cp->pending_messages_head->msize,
2046 if (cp->cth == NULL)
2049 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2050 "Failed to schedule transmission with core!\n");
2052 GNUNET_STATISTICS_update (stats,
2053 gettext_noop ("# CORE transmission failures"),
2061 * Test if the load on this peer is too high
2062 * to even consider processing the query at
2065 * @return GNUNET_YES if the load is too high to do anything, GNUNET_NO to forward (load high, but not too high), GNUNET_SYSERR to indirect (load low)
2068 test_load_too_high ()
2070 return GNUNET_SYSERR; // FIXME
2074 /* ******************* Pending Request Refresh Task ******************** */
2079 * We use a random delay to make the timing of requests less
2080 * predictable. This function returns such a random delay. We add a base
2081 * delay of MAX_CORK_DELAY (1s).
2083 * FIXME: make schedule dependent on the specifics of the request?
2084 * Or bandwidth and number of connected peers and load?
2086 * @return random delay to use for some request, between 1s and 1000+TTL_DECREMENT ms
2088 static struct GNUNET_TIME_Relative
2089 get_processing_delay ()
2092 GNUNET_TIME_relative_add (GNUNET_CONSTANTS_MAX_CORK_DELAY,
2093 GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
2094 GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
2100 * We're processing a GET request from another peer and have decided
2101 * to forward it to other peers. This function is called periodically
2102 * and should forward the request to other peers until we have all
2103 * possible replies. If we have transmitted the *only* reply to
2104 * the initiator we should destroy the pending request. If we have
2105 * many replies in the queue to the initiator, we should delay sending
2106 * out more queries until the reply queue has shrunk some.
2108 * @param cls our "struct ProcessGetContext *"
2112 forward_request_task (void *cls,
2113 const struct GNUNET_SCHEDULER_TaskContext *tc);
2117 * Function called after we either failed or succeeded
2118 * at transmitting a query to a peer.
2120 * @param cls the requests "struct PendingRequest*"
2121 * @param tpid ID of receiving peer, 0 on transmission error
2124 transmit_query_continuation (void *cls,
2125 GNUNET_PEER_Id tpid)
2127 struct PendingRequest *pr = cls;
2129 GNUNET_STATISTICS_update (stats,
2130 gettext_noop ("# queries scheduled for forwarding"),
2136 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2137 "Transmission of request failed, will try again later.\n");
2139 if (pr->task == GNUNET_SCHEDULER_NO_TASK)
2140 pr->task = GNUNET_SCHEDULER_add_delayed (sched,
2141 get_processing_delay (),
2142 &forward_request_task,
2146 GNUNET_STATISTICS_update (stats,
2147 gettext_noop ("# queries forwarded"),
2150 GNUNET_PEER_change_rc (tpid, 1);
2151 if (pr->used_pids_off == pr->used_pids_size)
2152 GNUNET_array_grow (pr->used_pids,
2154 pr->used_pids_size * 2 + 2);
2155 pr->used_pids[pr->used_pids_off++] = tpid;
2156 if (pr->task == GNUNET_SCHEDULER_NO_TASK)
2157 pr->task = GNUNET_SCHEDULER_add_delayed (sched,
2158 get_processing_delay (),
2159 &forward_request_task,
2165 * How many bytes should a bloomfilter be if we have already seen
2166 * entry_count responses? Note that BLOOMFILTER_K gives us the number
2167 * of bits set per entry. Furthermore, we should not re-size the
2168 * filter too often (to keep it cheap).
2170 * Since other peers will also add entries but not resize the filter,
2171 * we should generally pick a slightly larger size than what the
2172 * strict math would suggest.
2174 * @return must be a power of two and smaller or equal to 2^15.
2177 compute_bloomfilter_size (unsigned int entry_count)
2180 unsigned int ideal = (entry_count * BLOOMFILTER_K) / 4;
2181 uint16_t max = 1 << 15;
2183 if (entry_count > max)
2186 while ((size < max) && (size < ideal))
2195 * Recalculate our bloom filter for filtering replies. This function
2196 * will create a new bloom filter from scratch, so it should only be
2197 * called if we have no bloomfilter at all (and hence can create a
2198 * fresh one of minimal size without problems) OR if our peer is the
2199 * initiator (in which case we may resize to larger than mimimum size).
2201 * @param pr request for which the BF is to be recomputed
2204 refresh_bloomfilter (struct PendingRequest *pr)
2208 GNUNET_HashCode mhash;
2210 nsize = compute_bloomfilter_size (pr->replies_seen_off);
2211 if (nsize == pr->bf_size)
2212 return; /* size not changed */
2214 GNUNET_CONTAINER_bloomfilter_free (pr->bf);
2215 pr->bf_size = nsize;
2216 pr->mingle = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, -1);
2217 pr->bf = GNUNET_CONTAINER_bloomfilter_init (NULL,
2220 for (i=0;i<pr->replies_seen_off;i++)
2222 GNUNET_BLOCK_mingle_hash (&pr->replies_seen[i],
2225 GNUNET_CONTAINER_bloomfilter_add (pr->bf, &mhash);
2231 * Function called after we've tried to reserve a certain amount of
2232 * bandwidth for a reply. Check if we succeeded and if so send our
2235 * @param cls the requests "struct PendingRequest*"
2236 * @param peer identifies the peer
2237 * @param bpm_in set to the current bandwidth limit (receiving) for this peer
2238 * @param bpm_out set to the current bandwidth limit (sending) for this peer
2239 * @param amount set to the amount that was actually reserved or unreserved
2240 * @param preference current traffic preference for the given peer
2243 target_reservation_cb (void *cls,
2245 GNUNET_PeerIdentity * peer,
2246 struct GNUNET_BANDWIDTH_Value32NBO bpm_in,
2247 struct GNUNET_BANDWIDTH_Value32NBO bpm_out,
2249 uint64_t preference)
2251 struct PendingRequest *pr = cls;
2252 struct ConnectedPeer *cp;
2253 struct PendingMessage *pm;
2254 struct GetMessage *gm;
2255 GNUNET_HashCode *ext;
2265 /* error in communication with core, try again later */
2266 if (pr->task == GNUNET_SCHEDULER_NO_TASK)
2267 pr->task = GNUNET_SCHEDULER_add_delayed (sched,
2268 get_processing_delay (),
2269 &forward_request_task,
2273 // (3) transmit, update ttl/priority
2274 cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
2278 /* Peer must have just left */
2280 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2281 "Selected peer disconnected!\n");
2283 if (pr->task == GNUNET_SCHEDULER_NO_TASK)
2284 pr->task = GNUNET_SCHEDULER_add_delayed (sched,
2285 get_processing_delay (),
2286 &forward_request_task,
2290 no_route = GNUNET_NO;
2296 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2297 "Failed to reserve bandwidth for reply (got %d/%u bytes only)!\n",
2301 GNUNET_STATISTICS_update (stats,
2302 gettext_noop ("# reply bandwidth reservation requests failed"),
2305 if (pr->task == GNUNET_SCHEDULER_NO_TASK)
2306 pr->task = GNUNET_SCHEDULER_add_delayed (sched,
2307 get_processing_delay (),
2308 &forward_request_task,
2310 return; /* this target round failed */
2312 /* FIXME: if we are "quite" busy, we may still want to skip
2313 this round; need more load detection code! */
2314 no_route = GNUNET_YES;
2317 GNUNET_STATISTICS_update (stats,
2318 gettext_noop ("# queries scheduled for forwarding"),
2321 /* build message and insert message into priority queue */
2323 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2324 "Forwarding request `%s' to `%4s'!\n",
2325 GNUNET_h2s (&pr->query),
2330 if (GNUNET_YES == no_route)
2332 bm |= GET_MESSAGE_BIT_RETURN_TO;
2335 if (pr->namespace != NULL)
2337 bm |= GET_MESSAGE_BIT_SKS_NAMESPACE;
2340 if (pr->target_pid != 0)
2342 bm |= GET_MESSAGE_BIT_TRANSMIT_TO;
2345 msize = sizeof (struct GetMessage) + pr->bf_size + k * sizeof(GNUNET_HashCode);
2346 GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE);
2347 pm = GNUNET_malloc (sizeof (struct PendingMessage) + msize);
2349 gm = (struct GetMessage*) &pm[1];
2350 gm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_GET);
2351 gm->header.size = htons (msize);
2352 gm->type = htonl (pr->type);
2353 pr->remaining_priority /= 2;
2354 gm->priority = htonl (pr->remaining_priority);
2355 gm->ttl = htonl (pr->ttl);
2356 gm->filter_mutator = htonl(pr->mingle);
2357 gm->hash_bitmap = htonl (bm);
2358 gm->query = pr->query;
2359 ext = (GNUNET_HashCode*) &gm[1];
2361 if (GNUNET_YES == no_route)
2362 GNUNET_PEER_resolve (pr->cp->pid, (struct GNUNET_PeerIdentity*) &ext[k++]);
2363 if (pr->namespace != NULL)
2364 memcpy (&ext[k++], pr->namespace, sizeof (GNUNET_HashCode));
2365 if (pr->target_pid != 0)
2366 GNUNET_PEER_resolve (pr->target_pid, (struct GNUNET_PeerIdentity*) &ext[k++]);
2367 bfdata = (char *) &ext[k];
2369 GNUNET_CONTAINER_bloomfilter_get_raw_data (pr->bf,
2372 pm->cont = &transmit_query_continuation;
2374 add_to_pending_messages_for_peer (cp, pm, pr);
2379 * Closure used for "target_peer_select_cb".
2381 struct PeerSelectionContext
2384 * The request for which we are selecting
2387 struct PendingRequest *pr;
2390 * Current "prime" target.
2392 struct GNUNET_PeerIdentity target;
2395 * How much do we like this target?
2397 double target_score;
2403 * Function called for each connected peer to determine
2404 * which one(s) would make good targets for forwarding.
2406 * @param cls closure (struct PeerSelectionContext)
2407 * @param key current key code (peer identity)
2408 * @param value value in the hash map (struct ConnectedPeer)
2409 * @return GNUNET_YES if we should continue to
2414 target_peer_select_cb (void *cls,
2415 const GNUNET_HashCode * key,
2418 struct PeerSelectionContext *psc = cls;
2419 struct ConnectedPeer *cp = value;
2420 struct PendingRequest *pr = psc->pr;
2425 /* 1) check that this peer is not the initiator */
2429 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2430 "Skipping initiator in forwarding selection\n");
2432 return GNUNET_YES; /* skip */
2435 /* 2) check if we have already (recently) forwarded to this peer */
2437 for (i=0;i<pr->used_pids_off;i++)
2438 if (pr->used_pids[i] == cp->pid)
2441 if (0 != GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
2442 RETRY_PROBABILITY_INV))
2445 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2446 "NOT re-trying query that was previously transmitted %u times\n",
2447 (unsigned int) pr->used_pids_off);
2449 return GNUNET_YES; /* skip */
2454 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
2455 "Re-trying query that was previously transmitted %u times to this peer\n",
2458 /* 3) calculate how much we'd like to forward to this peer,
2459 starting with a random value that is strong enough
2460 to at least give any peer a chance sometimes
2461 (compared to the other factors that come later) */
2462 /* 3a) count successful (recent) routes from cp for same source */
2465 score = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
2466 P2P_SUCCESS_LIST_SIZE);
2467 for (i=0;i<P2P_SUCCESS_LIST_SIZE;i++)
2468 if (cp->last_p2p_replies[i] == pr->cp->pid)
2469 score += 1; /* likely successful based on hot path */
2473 score = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
2474 CS2P_SUCCESS_LIST_SIZE);
2475 for (i=0;i<CS2P_SUCCESS_LIST_SIZE;i++)
2476 if (cp->last_client_replies[i] == pr->client_request_list->client_list->client)
2477 score += 1; /* likely successful based on hot path */
2479 /* 3b) include latency */
2480 if (cp->avg_delay.value < 4 * TTL_DECREMENT)
2481 score += 1; /* likely fast based on latency */
2482 /* 3c) include priorities */
2483 if (cp->avg_priority <= pr->remaining_priority / 2.0)
2484 score += 1; /* likely successful based on priorities */
2485 /* 3d) penalize for queue size */
2486 score -= (2.0 * cp->pending_requests / (double) MAX_QUEUE_PER_PEER);
2487 /* 3e) include peer proximity */
2488 score -= (2.0 * (GNUNET_CRYPTO_hash_distance_u32 (key,
2489 &pr->query)) / (double) UINT32_MAX);
2490 /* 4) super-bonus for being the known target */
2491 if (pr->target_pid == cp->pid)
2493 /* store best-fit in closure */
2495 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2496 "Peer `%s' gets score %f for forwarding query, max is %f\n",
2501 score++; /* avoid zero */
2502 if (score > psc->target_score)
2504 psc->target_score = score;
2505 psc->target.hashPubKey = *key;
2512 * The priority level imposes a bound on the maximum
2513 * value for the ttl that can be requested.
2515 * @param ttl_in requested ttl
2516 * @param prio given priority
2517 * @return ttl_in if ttl_in is below the limit,
2518 * otherwise the ttl-limit for the given priority
2521 bound_ttl (int32_t ttl_in, uint32_t prio)
2523 unsigned long long allowed;
2527 allowed = ((unsigned long long) prio) * TTL_DECREMENT / 1000;
2528 if (ttl_in > allowed)
2530 if (allowed >= (1 << 30))
2539 * Iterator called on each result obtained for a DHT
2540 * operation that expects a reply
2542 * @param cls closure
2543 * @param exp when will this value expire
2544 * @param key key of the result
2545 * @param get_path NULL-terminated array of pointers
2546 * to the peers on reverse GET path (or NULL if not recorded)
2547 * @param put_path NULL-terminated array of pointers
2548 * to the peers on the PUT path (or NULL if not recorded)
2549 * @param type type of the result
2550 * @param size number of bytes in data
2551 * @param data pointer to the result data
2554 process_dht_reply (void *cls,
2555 struct GNUNET_TIME_Absolute exp,
2556 const GNUNET_HashCode * key,
2557 const struct GNUNET_PeerIdentity * const *get_path,
2558 const struct GNUNET_PeerIdentity * const *put_path,
2559 enum GNUNET_BLOCK_Type type,
2565 * We're processing a GET request and have decided
2566 * to forward it to other peers. This function is called periodically
2567 * and should forward the request to other peers until we have all
2568 * possible replies. If we have transmitted the *only* reply to
2569 * the initiator we should destroy the pending request. If we have
2570 * many replies in the queue to the initiator, we should delay sending
2571 * out more queries until the reply queue has shrunk some.
2573 * @param cls our "struct ProcessGetContext *"
2577 forward_request_task (void *cls,
2578 const struct GNUNET_SCHEDULER_TaskContext *tc)
2580 struct PendingRequest *pr = cls;
2581 struct PeerSelectionContext psc;
2582 struct ConnectedPeer *cp;
2583 struct GNUNET_TIME_Relative delay;
2585 pr->task = GNUNET_SCHEDULER_NO_TASK;
2586 if (pr->irc != NULL)
2589 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2590 "Forwarding of query `%s' not attempted due to pending local lookup!\n",
2591 GNUNET_h2s (&pr->query));
2593 return; /* already pending */
2595 if (GNUNET_YES == pr->local_only)
2596 return; /* configured to not do P2P search */
2598 if ( (0 == pr->anonymity_level) &&
2599 (pr->type != GNUNET_BLOCK_TYPE_FS_DBLOCK) &&
2600 (pr->type != GNUNET_BLOCK_TYPE_FS_IBLOCK) )
2602 pr->dht_get = GNUNET_DHT_get_start (dht_handle,
2603 GNUNET_TIME_UNIT_FOREVER_REL,
2610 (pr->namespace != NULL) ? sizeof (GNUNET_HashCode) : 0,
2614 /* (1) select target */
2616 psc.target_score = -DBL_MAX;
2617 GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
2618 &target_peer_select_cb,
2620 if (psc.target_score == -DBL_MAX)
2622 delay = get_processing_delay ();
2624 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2625 "No peer selected for forwarding of query `%s', will try again in %llu ms!\n",
2626 GNUNET_h2s (&pr->query),
2629 pr->task = GNUNET_SCHEDULER_add_delayed (sched,
2631 &forward_request_task,
2633 return; /* nobody selected */
2635 /* (3) update TTL/priority */
2636 if (pr->client_request_list != NULL)
2638 /* FIXME: use better algorithm!? */
2639 if (0 == GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
2642 /* bound priority we use by priorities we see from other peers
2643 rounded up (must round up so that we can see non-zero
2644 priorities, but round up as little as possible to make it
1645 plausible that we forwarded another peer's request) */
2646 if (pr->priority > current_priorities + 1.0)
2647 pr->priority = (uint32_t) current_priorities + 1.0;
2648 pr->ttl = bound_ttl (pr->ttl + TTL_DECREMENT * 2,
2651 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2652 "Trying query `%s' with priority %u and TTL %d.\n",
2653 GNUNET_h2s (&pr->query),
2659 /* (3) reserve reply bandwidth */
2660 cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
2661 &psc.target.hashPubKey);
2662 GNUNET_assert (NULL != cp);
2663 pr->irc = GNUNET_CORE_peer_change_preference (sched, cfg,
2665 GNUNET_CONSTANTS_SERVICE_TIMEOUT,
2666 GNUNET_BANDWIDTH_value_init (UINT32_MAX),
2669 &target_reservation_cb,
2671 cp->inc_preference = 0;
2675 /* **************************** P2P PUT Handling ************************ */
2679 * Function called after we either failed or succeeded
2680 * at transmitting a reply to a peer.
2682 * @param cls the requests "struct PendingRequest*"
2683 * @param tpid ID of receiving peer, 0 on transmission error
2686 transmit_reply_continuation (void *cls,
2687 GNUNET_PEER_Id tpid)
2689 struct PendingRequest *pr = cls;
2693 case GNUNET_BLOCK_TYPE_FS_DBLOCK:
2694 case GNUNET_BLOCK_TYPE_FS_IBLOCK:
2695 /* only one reply expected, done with the request! */
2696 destroy_pending_request (pr);
2698 case GNUNET_BLOCK_TYPE_ANY:
2699 case GNUNET_BLOCK_TYPE_FS_KBLOCK:
2700 case GNUNET_BLOCK_TYPE_FS_SBLOCK:
2710 * Transmit the given message by copying it to the target buffer
2711 * "buf". "buf" will be NULL and "size" zero if the socket was closed
2712 * for writing in the meantime. In that case, do nothing
2713 * (the disconnect or shutdown handler will take care of the rest).
2714 * If we were able to transmit messages and there are still more
2715 * pending, ask core again for further calls to this function.
2717 * @param cls closure, pointer to the 'struct ClientList*'
2718 * @param size number of bytes available in buf
2719 * @param buf where the callee should write the message
2720 * @return number of bytes written to buf
2723 transmit_to_client (void *cls,
2724 size_t size, void *buf)
2726 struct ClientList *cl = cls;
2728 struct ClientResponseMessage *creply;
2735 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2736 "Not sending reply, client communication problem.\n");
2741 while ( (NULL != (creply = cl->res_head) ) &&
2742 (creply->msize <= size) )
2744 memcpy (&cbuf[msize], &creply[1], creply->msize);
2745 msize += creply->msize;
2746 size -= creply->msize;
2747 GNUNET_CONTAINER_DLL_remove (cl->res_head,
2750 GNUNET_free (creply);
2753 cl->th = GNUNET_SERVER_notify_transmit_ready (cl->client,
2755 GNUNET_TIME_UNIT_FOREVER_REL,
2756 &transmit_to_client,
2759 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2760 "Transmitted %u bytes to client\n",
2761 (unsigned int) msize);
2768 * Closure for "process_reply" function.
2770 struct ProcessReplyClosure
2773 * The data for the reply.
2778 * Who gave us this reply? NULL for local host (or DHT)
2780 struct ConnectedPeer *sender;
2783 * When the reply expires.
2785 struct GNUNET_TIME_Absolute expiration;
2793 * Type of the block.
2795 enum GNUNET_BLOCK_Type type;
2798 * How much was this reply worth to us?
2803 * Evaluation result (returned).
2805 enum GNUNET_BLOCK_EvaluationResult eval;
2808 * Did we finish processing the associated request?
2813 * Did we find a matching request?
2820 * We have received a reply; handle it!
2822 * @param cls response (struct ProcessReplyClosure)
2823 * @param key our query
2824 * @param value value in the hash map (info about the query)
2825 * @return GNUNET_YES (we should continue to iterate)
2828 process_reply (void *cls,
2829 const GNUNET_HashCode * key,
2832 struct ProcessReplyClosure *prq = cls;
2833 struct PendingRequest *pr = value;
2834 struct PendingMessage *reply;
2835 struct ClientResponseMessage *creply;
2836 struct ClientList *cl;
2837 struct PutMessage *pm;
2838 struct ConnectedPeer *cp;
2839 struct GNUNET_TIME_Relative cur_delay;
2843 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2844 "Matched result (type %u) for query `%s' with pending request\n",
2845 (unsigned int) prq->type,
2848 GNUNET_STATISTICS_update (stats,
2849 gettext_noop ("# replies received and matched"),
2852 if (prq->sender != NULL)
2854 /* FIXME: should we be more precise here and not use
2855 "start_time" but a peer-specific time stamp? */
2856 cur_delay = GNUNET_TIME_absolute_get_duration (pr->start_time);
2857 prq->sender->avg_delay.value
2858 = (prq->sender->avg_delay.value *
2859 (RUNAVG_DELAY_N - 1) + cur_delay.value) / RUNAVG_DELAY_N;
2860 prq->sender->avg_priority
2861 = (prq->sender->avg_priority *
2862 (RUNAVG_DELAY_N - 1) + pr->priority) / (double) RUNAVG_DELAY_N;
2865 GNUNET_PEER_change_rc (prq->sender->last_p2p_replies
2866 [prq->sender->last_p2p_replies_woff % P2P_SUCCESS_LIST_SIZE],
2868 GNUNET_PEER_change_rc (pr->cp->pid, 1);
2869 prq->sender->last_p2p_replies
2870 [(prq->sender->last_p2p_replies_woff++) % P2P_SUCCESS_LIST_SIZE]
2875 if (NULL != prq->sender->last_client_replies
2876 [(prq->sender->last_client_replies_woff) % CS2P_SUCCESS_LIST_SIZE])
2877 GNUNET_SERVER_client_drop (prq->sender->last_client_replies
2878 [(prq->sender->last_client_replies_woff) % CS2P_SUCCESS_LIST_SIZE]);
2879 prq->sender->last_client_replies
2880 [(prq->sender->last_client_replies_woff++) % CS2P_SUCCESS_LIST_SIZE]
2881 = pr->client_request_list->client_list->client;
2882 GNUNET_SERVER_client_keep (pr->client_request_list->client_list->client);
2885 prq->eval = GNUNET_BLOCK_evaluate (block_ctx,
2890 pr->namespace, (pr->namespace != NULL) ? sizeof (GNUNET_HashCode) : 0,
2895 case GNUNET_BLOCK_EVALUATION_OK_MORE:
2897 case GNUNET_BLOCK_EVALUATION_OK_LAST:
2898 while (NULL != pr->pending_head)
2899 destroy_pending_message_list_entry (pr->pending_head);
2902 if (pr->client_request_list != NULL)
2903 GNUNET_SERVER_receive_done (pr->client_request_list->client_list->client,
2905 GNUNET_DATASTORE_cancel (pr->qe);
2908 pr->do_remove = GNUNET_YES;
2909 if (pr->task != GNUNET_SCHEDULER_NO_TASK)
2911 GNUNET_SCHEDULER_cancel (sched,
2913 pr->task = GNUNET_SCHEDULER_NO_TASK;
2915 GNUNET_break (GNUNET_YES ==
2916 GNUNET_CONTAINER_multihashmap_remove (query_request_map,
2920 case GNUNET_BLOCK_EVALUATION_OK_DUPLICATE:
2921 GNUNET_STATISTICS_update (stats,
2922 gettext_noop ("# duplicate replies discarded (bloomfilter)"),
2926 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2927 "Duplicate response `%s', discarding.\n",
2928 GNUNET_h2s (&mhash));
2930 return GNUNET_YES; /* duplicate */
2931 case GNUNET_BLOCK_EVALUATION_RESULT_INVALID:
2932 return GNUNET_YES; /* wrong namespace */
2933 case GNUNET_BLOCK_EVALUATION_REQUEST_VALID:
2936 case GNUNET_BLOCK_EVALUATION_REQUEST_INVALID:
2939 case GNUNET_BLOCK_EVALUATION_TYPE_NOT_SUPPORTED:
2940 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2941 _("Unsupported block type %u\n"),
2945 if (pr->client_request_list != NULL)
2947 if (pr->replies_seen_size == pr->replies_seen_off)
2948 GNUNET_array_grow (pr->replies_seen,
2949 pr->replies_seen_size,
2950 pr->replies_seen_size * 2 + 4);
2951 GNUNET_CRYPTO_hash (prq->data,
2953 &pr->replies_seen[pr->replies_seen_off++]);
2954 refresh_bloomfilter (pr);
2956 if (NULL == prq->sender)
2959 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2960 "Found result for query `%s' in local datastore\n",
2963 GNUNET_STATISTICS_update (stats,
2964 gettext_noop ("# results found locally"),
2968 prq->priority += pr->remaining_priority;
2969 pr->remaining_priority = 0;
2970 pr->results_found++;
2971 prq->request_found = GNUNET_YES;
2972 if (NULL != pr->client_request_list)
2974 GNUNET_STATISTICS_update (stats,
2975 gettext_noop ("# replies received for local clients"),
2978 cl = pr->client_request_list->client_list;
2979 msize = sizeof (struct PutMessage) + prq->size;
2980 creply = GNUNET_malloc (msize + sizeof (struct ClientResponseMessage));
2981 creply->msize = msize;
2982 creply->client_list = cl;
2983 GNUNET_CONTAINER_DLL_insert_after (cl->res_head,
2987 pm = (struct PutMessage*) &creply[1];
2988 pm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
2989 pm->header.size = htons (msize);
2990 pm->type = htonl (prq->type);
2991 pm->expiration = GNUNET_TIME_absolute_hton (prq->expiration);
2992 memcpy (&pm[1], prq->data, prq->size);
2996 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2997 "Transmitting result for query `%s' to client\n",
3000 cl->th = GNUNET_SERVER_notify_transmit_ready (cl->client,
3002 GNUNET_TIME_UNIT_FOREVER_REL,
3003 &transmit_to_client,
3006 GNUNET_break (cl->th != NULL);
3009 prq->finished = GNUNET_YES;
3010 destroy_pending_request (pr);
3017 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3018 "Transmitting result for query `%s' to other peer (PID=%u)\n",
3020 (unsigned int) cp->pid);
3022 GNUNET_STATISTICS_update (stats,
3023 gettext_noop ("# replies received for other peers"),
3026 msize = sizeof (struct PutMessage) + prq->size;
3027 reply = GNUNET_malloc (msize + sizeof (struct PendingMessage));
3028 reply->cont = &transmit_reply_continuation;
3029 reply->cont_cls = pr;
3030 reply->msize = msize;
3031 reply->priority = UINT32_MAX; /* send replies first! */
3032 pm = (struct PutMessage*) &reply[1];
3033 pm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
3034 pm->header.size = htons (msize);
3035 pm->type = htonl (prq->type);
3036 pm->expiration = GNUNET_TIME_absolute_hton (prq->expiration);
3037 memcpy (&pm[1], prq->data, prq->size);
3038 add_to_pending_messages_for_peer (cp, reply, pr);
3045 * Iterator called on each result obtained for a DHT
3046 * operation that expects a reply
3048 * @param cls closure
3049 * @param exp when will this value expire
3050 * @param key key of the result
3051 * @param get_path NULL-terminated array of pointers
3052 * to the peers on reverse GET path (or NULL if not recorded)
3053 * @param put_path NULL-terminated array of pointers
3054 * to the peers on the PUT path (or NULL if not recorded)
3055 * @param type type of the result
3056 * @param size number of bytes in data
3057 * @param data pointer to the result data
/* DHT result callback.  'cls' is the 'struct PendingRequest' this DHT
   lookup belongs to.  The result is wrapped in a zero-initialised
   'struct ProcessReplyClosure' and handed to process_reply(), the same
   routine used for replies from peers and from the local datastore. */
3060 process_dht_reply (void *cls,
3061 struct GNUNET_TIME_Absolute exp,
3062 const GNUNET_HashCode * key,
3063 const struct GNUNET_PeerIdentity * const *get_path,
3064 const struct GNUNET_PeerIdentity * const *put_path,
3065 enum GNUNET_BLOCK_Type type,
3069 struct PendingRequest *pr = cls;
3070 struct ProcessReplyClosure prq;
/* Zero everything first: any field that is not assigned afterwards
   stays 0/NULL when process_reply() inspects the closure. */
3072 memset (&prq, 0, sizeof (prq));
3074 prq.expiration = exp;
/* NOTE(review): get_path and put_path are not referenced on any line
   visible in this listing -- confirm they are intentionally unused. */
3077 process_reply (&prq, key, pr);
3083 * Continuation called to notify client about result of the
3086 * @param cls closure
3087 * @param success GNUNET_SYSERR on failure
3088 * @param msg NULL on success, otherwise an error message
/* Completion callback for a datastore 'put'.  'cls' is a heap-allocated
   'struct GNUNET_TIME_Absolute' holding the time the put was started;
   this function takes ownership of it and frees it. */
3091 put_migration_continuation (void *cls,
3095 struct GNUNET_TIME_Absolute *start = cls;
3096 struct GNUNET_TIME_Relative delay;
/* Time elapsed since the put was issued. */
3098 delay = GNUNET_TIME_absolute_get_duration (*start);
3099 GNUNET_free (start);
/* Feed the datastore 'put' load tracker (the value argument is not
   visible in this listing; presumably the delay measured above). */
3100 GNUNET_LOAD_update (datastore_put_load,
/* NOTE(review): the statement guarded by this test is not visible here;
   presumably an early return so that only failures are counted below. */
3102 if (GNUNET_OK == success)
3104 GNUNET_STATISTICS_update (stats,
3105 gettext_noop ("# datastore 'put' failures"),
3112 * Handle P2P "PUT" message.
3114 * @param cls closure, always NULL
3115 * @param other the other peer involved (sender or receiver, NULL
3116 * for loopback messages where we are both sender and receiver)
3117 * @param message the actual message
3118 * @param latency reported latency of the connection with 'other'
3119 * @param distance reported distance (DV) to 'other'
3120 * @return GNUNET_OK to keep the connection open,
3121 * GNUNET_SYSERR to close it (signal serious error)
/* Handler for a P2P "PUT" (content reply / migrated content) message.
   Steps visible in this listing:
   - reject messages shorter than a 'struct PutMessage' and ONDEMAND blocks;
   - derive the query hash from the block and match it against all pending
     requests stored under that hash in 'query_request_map';
   - credit the sending peer (preference and trust) by the priority earned;
   - if active migration is enabled, store the block in the datastore;
   - if the content matched no request and we are not migrating (or the
     datastore is loaded), ask the sender to stop migrating content to us
     for a randomised period via a MIGRATION_STOP message.
   NOTE(review): 'other' is documented as possibly NULL for loopback
   messages but is dereferenced unconditionally below -- confirm core
   never delivers loopback PUTs to this handler. */
3124 handle_p2p_put (void *cls,
3125 const struct GNUNET_PeerIdentity *other,
3126 const struct GNUNET_MessageHeader *message,
3127 struct GNUNET_TIME_Relative latency,
3130 const struct PutMessage *put;
3133 enum GNUNET_BLOCK_Type type;
3134 struct GNUNET_TIME_Absolute expiration;
3135 GNUNET_HashCode query;
3136 struct ProcessReplyClosure prq;
3137 struct GNUNET_TIME_Absolute *start;
3138 struct GNUNET_TIME_Relative block_time;
3140 struct ConnectedPeer *cp;
3141 struct PendingMessage *pm;
3142 struct MigrationStopMessage *msm;
3144 msize = ntohs (message->size);
3145 if (msize < sizeof (struct PutMessage))
3148 return GNUNET_SYSERR;
3150 put = (const struct PutMessage*) message;
/* Payload is whatever follows the fixed-size header. */
3151 dsize = msize - sizeof (struct PutMessage);
3152 type = ntohl (put->type);
3153 expiration = GNUNET_TIME_absolute_ntoh (put->expiration);
/* ONDEMAND blocks are never accepted from other peers. */
3155 if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
3156 return GNUNET_SYSERR;
3158 GNUNET_BLOCK_get_key (block_ctx,
3164 GNUNET_break_op (0);
3165 return GNUNET_SYSERR;
3168 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3169 "Received result for query `%s' from peer `%4s'\n",
3170 GNUNET_h2s (&query),
3171 GNUNET_i2s (other));
3173 GNUNET_STATISTICS_update (stats,
3174 gettext_noop ("# replies received (overall)"),
3177 /* now, lookup 'query' */
3178 prq.data = (const void*) &put[1];
/* May be NULL if the sender is not (or no longer) in 'connected_peers'. */
3180 prq.sender = GNUNET_CONTAINER_multihashmap_get (connected_peers,
3181 &other->hashPubKey);
3186 prq.expiration = expiration;
3188 prq.finished = GNUNET_NO;
3189 prq.request_found = GNUNET_NO;
3190 GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map,
3194 if (prq.sender != NULL)
3196 prq.sender->inc_preference += CONTENT_BANDWIDTH_VALUE + 1000 * prq.priority;
3197 prq.sender->trust += prq.priority;
3199 if (GNUNET_YES == active_migration)
3202 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3203 "Replicating result for query `%s' with priority %u\n",
3204 GNUNET_h2s (&query),
/* The start time is heap-allocated here and freed by
   put_migration_continuation(), which uses it to measure the put delay. */
3207 start = GNUNET_malloc (sizeof (struct GNUNET_TIME_Absolute));
3208 *start = GNUNET_TIME_absolute_get ();
3209 GNUNET_DATASTORE_put (dsh,
3210 0, &query, dsize, &put[1],
3211 type, prq.priority, 1 /* anonymity */,
3213 1 + prq.priority, MAX_DATASTORE_QUEUE,
3214 GNUNET_CONSTANTS_SERVICE_TIMEOUT,
3215 &put_migration_continuation,
3218 putl = GNUNET_LOAD_get_load (datastore_put_load);
3219 if ( (GNUNET_NO == prq.request_found) &&
3220 ( (GNUNET_YES != active_migration) ||
3223 cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
3224 &other->hashPubKey);
/* FIX: the lookup returns NULL when the sender is not in
   'connected_peers' (the identical lookup above is NULL-checked before
   use); previously 'cp' was dereferenced unconditionally.  Without a
   peer record there is nobody to send MIGRATION_STOP to, so just keep
   the connection open and return. */
3225 if ( (NULL == cp) || (GNUNET_TIME_absolute_get_duration (cp->last_migration_block).value < 5000) )
3226 return GNUNET_OK; /* peer gone or already blocked */
3227 /* We're too busy; send MigrationStop message! */
3228 if (GNUNET_YES != active_migration)
3229 putl = 1.0 + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 5);
/* Block for 5s plus a random amount that grows quadratically with 'putl'. */
3230 block_time = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
3231 5000 + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
3232 (unsigned int) (60000 * putl * putl)));
3234 cp->last_migration_block = GNUNET_TIME_relative_to_absolute (block_time);
/* The MigrationStopMessage is stored directly behind the PendingMessage. */
3235 pm = GNUNET_malloc (sizeof (struct PendingMessage) +
3236 sizeof (struct MigrationStopMessage));
3237 pm->msize = sizeof (struct MigrationStopMessage);
3238 pm->priority = UINT32_MAX;
3239 msm = (struct MigrationStopMessage*) &pm[1];
3240 msm->header.size = htons (sizeof (struct MigrationStopMessage));
3241 msm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_MIGRATION_STOP);
3242 msm->duration = GNUNET_TIME_relative_hton (block_time);
3243 add_to_pending_messages_for_peer (cp,
3252 * Handle P2P "MIGRATION_STOP" message.
3254 * @param cls closure, always NULL
3255 * @param other the other peer involved (sender or receiver, NULL
3256 * for loopback messages where we are both sender and receiver)
3257 * @param message the actual message
3258 * @param latency reported latency of the connection with 'other'
3259 * @param distance reported distance (DV) to 'other'
3260 * @return GNUNET_OK to keep the connection open,
3261 * GNUNET_SYSERR to close it (signal serious error)
/* Handler for a P2P "MIGRATION_STOP" message: records, in the sender's
   'struct ConnectedPeer', until when that peer asked us not to migrate
   content to it. */
3264 handle_p2p_migration_stop (void *cls,
3265 const struct GNUNET_PeerIdentity *other,
3266 const struct GNUNET_MessageHeader *message,
3267 struct GNUNET_TIME_Relative latency,
3270 struct ConnectedPeer *cp;
3271 const struct MigrationStopMessage *msm;
/* The handler table in main_init registers this handler together with
   sizeof (struct MigrationStopMessage); presumably core therefore only
   delivers messages of exactly that size, which makes this cast safe. */
3273 msm = (const struct MigrationStopMessage*) message;
3274 cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
3275 &other->hashPubKey);
/* NOTE(review): the lines between the lookup and the dereference below are
   not visible in this listing -- confirm 'cp' is NULL-checked there. */
/* Convert the network-byte-order relative duration into an absolute
   deadline measured from now. */
3281 cp->migration_blocked = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_relative_ntoh (msm->duration));
3287 /* **************************** P2P GET Handling ************************ */
3291 * Closure for 'check_duplicate_request_{peer,client}'.
/* In/out state shared with the duplicate-detection iterators:
   'pr' is the input (the new request), 'have' is the output. */
3293 struct CheckDuplicateRequestClosure
3296 * The new request we should check if it already exists.
3298 const struct PendingRequest *pr;
3301 * Existing request found by the checker, NULL if none.
3303 struct PendingRequest *have;
3308 * Iterator over entries in the 'query_request_map' that
3309 * tries to see if we have the same request pending from
3310 * the same client already.
3312 * @param cls closure (our 'struct CheckDuplicateRequestClosure')
3313 * @param key current key code (query, ignored, must match)
3314 * @param value value in the hash map (a 'struct PendingRequest'
3315 * that already exists)
3316 * @return GNUNET_YES if we should continue to
3317 * iterate (no match yet)
3318 * GNUNET_NO if not (match found).
/* Hash-map iterator used for requests that originate from a local client:
   compares an existing entry ('value') against the new request in the
   closure. */
3321 check_duplicate_request_client (void *cls,
3322 const GNUNET_HashCode * key,
3325 struct CheckDuplicateRequestClosure *cdc = cls;
3326 struct PendingRequest *have = value;
/* Entries without a client request list cannot belong to the same client
   (presumably these are requests received from other peers). */
3328 if (have->client_request_list == NULL)
/* Same client handle on both requests; the remainder of this condition
   is not visible in this listing. */
3330 if ( (cdc->pr->client_request_list->client_list->client == have->client_request_list->client_list->client) &&
3341 * We're processing (local) results for a search request
3342 * from another peer. Pass applicable results to the
3343 * peer and if we are done either clean up (operation
3344 * complete) or forward to other peers (more results possible).
3346 * @param cls our closure (struct PendingRequest)
3347 * @param key key for the content
3348 * @param size number of bytes in data
3349 * @param data content stored
3350 * @param type type of the content
3351 * @param priority priority of the content
3352 * @param anonymity anonymity-level for the content
3353 * @param expiration expiration time for the content
3354 * @param uid unique identifier for the datum;
3355 * maybe 0 if no unique identifier is available
/* Datastore iterator for the local lookup started on behalf of a pending
   request ('cls' is that 'struct PendingRequest').  Judging from the log
   messages below it is invoked once per matching datum and once more when
   the result set is exhausted.  Each datum is passed to process_reply();
   afterwards the function decides whether to ask the datastore for the
   next result or to stop iterating. */
3358 process_local_reply (void *cls,
3359 const GNUNET_HashCode * key,
3362 enum GNUNET_BLOCK_Type type,
3365 struct GNUNET_TIME_Absolute
3369 struct PendingRequest *pr = cls;
3370 struct ProcessReplyClosure prq;
3371 struct CheckDuplicateRequestClosure cdrc;
3372 GNUNET_HashCode query;
3373 unsigned int old_rf;
/* NOTE(review): the guard of the following section is not visible in this
   listing; the log text indicates it is the "no more local results" case. */
3378 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3379 "Done processing local replies, forwarding request to other peers.\n");
3382 if (pr->client_request_list != NULL)
3384 GNUNET_SERVER_receive_done (pr->client_request_list->client_list->client,
3386 /* Figure out if this is a duplicate request and possibly
3387 merge 'struct PendingRequest' entries */
3390 GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map,
3392 &check_duplicate_request_client,
3394 if (cdrc.have != NULL)
3397 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3398 "Received request for block `%s' twice from client, will only request once.\n",
3399 GNUNET_h2s (&pr->query));
3402 destroy_pending_request (pr);
3407 /* no more results */
/* Only schedule forwarding if no task is already pending for this request. */
3408 if (pr->task == GNUNET_SCHEDULER_NO_TASK)
3409 pr->task = GNUNET_SCHEDULER_add_now (sched,
3410 &forward_request_task,
3415 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3416 "New local response to `%s' of type %u.\n",
/* ONDEMAND blocks are handed to GNUNET_FS_handle_on_demand_block(), which
   receives this same function as its callback. */
3420 if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
3423 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3424 "Found ONDEMAND block, performing on-demand encoding\n");
3426 GNUNET_STATISTICS_update (stats,
3427 gettext_noop ("# on-demand blocks matched requests"),
3431 GNUNET_FS_handle_on_demand_block (key, size, data, type, priority,
3432 anonymity, expiration, uid,
3433 &process_local_reply,
3437 GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
/* Remember the previous result count so that the first result for this
   request can be detected after process_reply() has run. */
3441 old_rf = pr->results_found;
3442 memset (&prq, 0, sizeof (prq));
3444 prq.expiration = expiration;
3447 GNUNET_BLOCK_get_key (block_ctx,
/* NOTE(review): the condition guarding the removal below is not visible;
   presumably the block failed key derivation and is dropped from the
   datastore before moving on to the next result. */
3454 GNUNET_DATASTORE_remove (dsh,
3458 GNUNET_TIME_UNIT_FOREVER_REL,
3460 GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
3464 prq.priority = priority;
3465 prq.finished = GNUNET_NO;
3466 prq.request_found = GNUNET_NO;
3467 process_reply (&prq, key, pr);
/* The first result for this request: record how long the datastore took. */
3468 if ( (old_rf == 0) &&
3469 (pr->results_found == 1) )
3470 update_datastore_delays (pr->start_time);
3471 if (prq.finished == GNUNET_YES)
3474 return; /* done here */
/* The block evaluation reported the last possible result: ask for no more. */
3475 if (prq.eval == GNUNET_BLOCK_EVALUATION_OK_LAST)
3477 GNUNET_DATASTORE_get_next (dsh, GNUNET_NO);
/* For requests without a local client, cut the result set short when the
   load test reports GNUNET_YES or once more than 5 + 2 * priority results
   were found. */
3480 if ( (pr->client_request_list == NULL) &&
3481 ( (GNUNET_YES == test_load_too_high()) ||
3482 (pr->results_found > 5 + 2 * pr->priority) ) )
3485 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3486 "Load too high, done with request\n");
3488 GNUNET_STATISTICS_update (stats,
3489 gettext_noop ("# processing result set cut short due to load"),
3492 GNUNET_DATASTORE_get_next (dsh, GNUNET_NO);
3495 GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
3500 * We've received a request with the specified priority. Bound it
3501 * according to how much we trust the given peer.
3503 * @param prio_in requested priority
3504 * @param cp the peer making the request
3505 * @return effective priority
/* Bounds the priority a peer requested: the peer is charged through
   change_host_trust() and the granted amount is folded into the running
   average kept in 'current_priorities'. */
3508 bound_priority (uint32_t prio_in,
3509 struct ConnectedPeer *cp)
3511 #define N ((double)128.0)
/* N is the weight of the running average: the update below keeps
   (N-1)/N of the old value and adds 1/N of the new sample. */
3516 ld = test_load_too_high ();
/* GNUNET_SYSERR from the load test: return 0 without calling
   change_host_trust(), so no trust is charged. */
3517 if (ld == GNUNET_SYSERR)
3518 return 0; /* excess resources */
3519 ret = change_host_trust (cp, prio_in);
/* Cap the sample fed into the average at current_priorities + N. */
3522 if (ret > current_priorities + N)
3523 rret = current_priorities + N;
/* NOTE(review): the assignment target of the next line is not visible in
   this listing; presumably 'current_priorities'. */
3527 = (current_priorities * (N-1) + rret)/N;
3535 * Iterator over entries in the 'query_request_map' that
3536 * tries to see if we have the same request pending from
3537 * the same peer already.
3539 * @param cls closure (our 'struct CheckDuplicateRequestClosure')
3540 * @param key current key code (query, ignored, must match)
3541 * @param value value in the hash map (a 'struct PendingRequest'
3542 * that already exists)
3543 * @return GNUNET_YES if we should continue to
3544 * iterate (no match yet)
3545 * GNUNET_NO if not (match found).
/* Hash-map iterator used for requests received from peers: compares an
   existing entry ('value') against the new request in the closure. */
3548 check_duplicate_request_peer (void *cls,
3549 const GNUNET_HashCode * key,
3552 struct CheckDuplicateRequestClosure *cdc = cls;
3553 struct PendingRequest *have = value;
/* Two requests are compared by their 'target_pid' only; what happens on a
   match is not visible in this listing. */
3555 if (cdc->pr->target_pid == have->target_pid)
3565 * Handle P2P "GET" request.
3567 * @param cls closure, always NULL
3568 * @param other the other peer involved (sender or receiver, NULL
3569 * for loopback messages where we are both sender and receiver)
3570 * @param message the actual message
3571 * @param latency reported latency of the connection with 'other'
3572 * @param distance reported distance (DV) to 'other'
3573 * @return GNUNET_OK to keep the connection open,
3574 * GNUNET_SYSERR to close it (signal serious error)
/* Handler for a P2P "GET" request.  Steps visible in this listing:
   - validate the message size against the fixed header and the optional
     hash codes announced in 'hash_bitmap';
   - locate the sending peer and the peer the reply must be routed to;
   - drop the request when the load test reports GNUNET_YES;
   - build a 'struct PendingRequest' (namespace, target, priority, TTL and
     bloom filter), dropping it on TTL underflow;
   - merge with or replace an existing request from the same peer;
   - register the request in the lookup maps and the expiration heap;
   - start the local datastore lookup and, for block types other than
     DBLOCK/IBLOCK, schedule forwarding right away;
   - evict a request from the expiration heap when too many are tracked. */
3577 handle_p2p_get (void *cls,
3578 const struct GNUNET_PeerIdentity *other,
3579 const struct GNUNET_MessageHeader *message,
3580 struct GNUNET_TIME_Relative latency,
3583 struct PendingRequest *pr;
3584 struct ConnectedPeer *cp;
3585 struct ConnectedPeer *cps;
3586 struct CheckDuplicateRequestClosure cdc;
3587 struct GNUNET_TIME_Relative timeout;
3589 const struct GetMessage *gm;
3591 const GNUNET_HashCode *opt;
3594 uint32_t ttl_decrement;
3595 enum GNUNET_BLOCK_Type type;
3599 msize = ntohs(message->size);
3600 if (msize < sizeof (struct GetMessage))
3602 GNUNET_break_op (0);
3603 return GNUNET_SYSERR;
3605 gm = (const struct GetMessage*) message;
3606 type = ntohl (gm->type);
3607 bm = ntohl (gm->hash_bitmap);
/* 'bits' is the number of optional hash codes that follow the header
   (the code computing it is not visible in this listing). */
3615 if (msize < sizeof (struct GetMessage) + bits * sizeof (GNUNET_HashCode))
3617 GNUNET_break_op (0);
3618 return GNUNET_SYSERR;
3620 opt = (const GNUNET_HashCode*) &gm[1];
/* FIX: the bloom filter is what remains of the message after the fixed
   header AND the optional hash codes, so both must be subtracted.  The
   previous code added the hash-code bytes, which overstated the filter
   size and let the bloom filter be initialised from memory beyond the
   end of the message.  The size check above guarantees that this
   subtraction cannot underflow. */
3621 bfsize = msize - sizeof (struct GetMessage) - bits * sizeof (GNUNET_HashCode);
3622 bm = ntohl (gm->hash_bitmap);
3624 cps = GNUNET_CONTAINER_multihashmap_get (connected_peers,
3625 &other->hashPubKey);
3628 /* peer must have just disconnected */
3629 GNUNET_STATISTICS_update (stats,
3630 gettext_noop ("# requests dropped due to initiator not being connected"),
3633 return GNUNET_SYSERR;
/* 'cps' is the peer that sent the request; 'cp' is the peer the reply is
   routed to, which differs when the RETURN_TO bit is set. */
3635 if (0 != (bm & GET_MESSAGE_BIT_RETURN_TO))
3636 cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
3643 if (0 != (bm & GET_MESSAGE_BIT_RETURN_TO))
3644 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3645 "Failed to find RETURN-TO peer `%4s' in connection set. Dropping query.\n",
3646 GNUNET_i2s ((const struct GNUNET_PeerIdentity*) &opt[bits-1]));
3649 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3650 "Failed to find peer `%4s' in connection set. Dropping query.\n",
3651 GNUNET_i2s (other));
3653 GNUNET_STATISTICS_update (stats,
3654 gettext_noop ("# requests dropped due to missing reverse route"),
3657 /* FIXME: try connect? */
3660 /* note that we can really only check load here since otherwise
3661 peers could find out that we are overloaded by not being
3662 disconnected after sending us a malformed query... */
3664 /* FIXME: query priority should play
3665 a major role here! */
3666 ld = test_load_too_high ();
3667 if (GNUNET_YES == ld)
3670 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3671 "Dropping query from `%s', this peer is too busy.\n",
3672 GNUNET_i2s (other));
3674 GNUNET_STATISTICS_update (stats,
3675 gettext_noop ("# requests dropped due to high load"),
3680 /* FIXME: if ld == GNUNET_NO, forward
3681 instead of indirecting! */
3684 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3685 "Received request for `%s' of type %u from peer `%4s' with flags %u\n",
3686 GNUNET_h2s (&gm->query),
3687 (unsigned int) type,
/* The namespace hash, when present, is stored directly behind the
   PendingRequest in the same allocation. */
3691 have_ns = (0 != (bm & GET_MESSAGE_BIT_SKS_NAMESPACE));
3692 pr = GNUNET_malloc (sizeof (struct PendingRequest) +
3693 (have_ns ? sizeof(GNUNET_HashCode) : 0));
3696 pr->namespace = (GNUNET_HashCode*) &pr[1];
3697 memcpy (&pr[1], &opt[bits++], sizeof (GNUNET_HashCode));
3700 pr->mingle = ntohl (gm->filter_mutator);
3701 if (0 != (bm & GET_MESSAGE_BIT_TRANSMIT_TO))
3702 pr->target_pid = GNUNET_PEER_intern ((const struct GNUNET_PeerIdentity*) &opt[bits++]);
3703 pr->anonymity_level = 1;
3704 pr->priority = bound_priority (ntohl (gm->priority), cps);
3705 pr->ttl = bound_ttl (ntohl (gm->ttl), pr->priority);
3706 pr->query = gm->query;
3707 /* decrement ttl (always) */
3708 ttl_decrement = 2 * TTL_DECREMENT +
3709 GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
/* A negative TTL that turns positive after the decrement has wrapped. */
3711 if ( (pr->ttl < 0) &&
3712 (((int32_t)(pr->ttl - ttl_decrement)) > 0) )
3715 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3716 "Dropping query from `%s' due to TTL underflow (%d - %u).\n",
3721 GNUNET_STATISTICS_update (stats,
3722 gettext_noop ("# requests dropped due TTL underflow"),
3725 /* integer underflow => drop (should be very rare)! */
3729 pr->ttl -= ttl_decrement;
3730 pr->start_time = GNUNET_TIME_absolute_get ();
3732 /* get bloom filter */
3735 pr->bf = GNUNET_CONTAINER_bloomfilter_init ((const char*) &opt[bits],
3738 pr->bf_size = bfsize;
3743 GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map,
3745 &check_duplicate_request_peer,
3747 if (cdc.have != NULL)
/* Keep whichever request expires later (start time + TTL) and add the
   priority of the dropped request to the one that is kept. */
3749 if (cdc.have->start_time.value + cdc.have->ttl >=
3750 pr->start_time.value + pr->ttl)
3752 /* existing request has higher TTL, drop new one! */
3753 cdc.have->priority += pr->priority;
3754 destroy_pending_request (pr);
/* FIX: the format string has no conversion, so the stray
   GNUNET_i2s (other) argument was removed; the message text is unchanged. */
3756 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3757 "Have existing request with higher TTL, dropping new request.\n");
3760 GNUNET_STATISTICS_update (stats,
3761 gettext_noop ("# requests dropped due to higher-TTL request"),
3768 /* existing request has lower TTL, drop old one! */
3769 pr->priority += cdc.have->priority;
3770 /* Possible optimization: if we have applicable pending
3771 replies in 'cdc.have', we might want to move those over
3772 (this is a really rare special-case, so it is not clear
3773 that this would be worth it) */
3774 destroy_pending_request (cdc.have);
3775 /* keep processing 'pr'! */
3780 GNUNET_break (GNUNET_OK ==
3781 GNUNET_CONTAINER_multihashmap_put (query_request_map,
3784 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
3785 GNUNET_break (GNUNET_OK ==
3786 GNUNET_CONTAINER_multihashmap_put (peer_request_map,
3789 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
/* The heap is keyed by expiration time (start time + TTL). */
3791 pr->hnode = GNUNET_CONTAINER_heap_insert (requests_by_expiration_heap,
3793 pr->start_time.value + pr->ttl);
3795 GNUNET_STATISTICS_update (stats,
3796 gettext_noop ("# P2P searches received"),
3799 GNUNET_STATISTICS_update (stats,
3800 gettext_noop ("# P2P searches active"),
3804 /* calculate change in traffic preference */
3805 cps->inc_preference += pr->priority * 1000 + QUERY_BANDWIDTH_VALUE;
3806 /* process locally */
3807 if (type == GNUNET_BLOCK_TYPE_FS_DBLOCK)
3808 type = GNUNET_BLOCK_TYPE_ANY; /* to get on-demand as well */
3809 timeout = GNUNET_TIME_relative_multiply (BASIC_DATASTORE_REQUEST_DELAY,
3810 (pr->priority + 1));
3811 pr->qe = GNUNET_DATASTORE_get (dsh,
3815 MAX_DATASTORE_QUEUE,
3817 &process_local_reply,
3820 /* Are multiple results possible? If so, start processing remotely now! */
3823 case GNUNET_BLOCK_TYPE_FS_DBLOCK:
3824 case GNUNET_BLOCK_TYPE_FS_IBLOCK:
3825 /* only one result, wait for datastore */
3828 if (pr->task == GNUNET_SCHEDULER_NO_TASK)
3829 pr->task = GNUNET_SCHEDULER_add_now (sched,
3830 &forward_request_task,
3834 /* make sure we don't track too many requests */
3835 if (GNUNET_CONTAINER_heap_get_size (requests_by_expiration_heap) > max_pending_requests)
3837 pr = GNUNET_CONTAINER_heap_peek (requests_by_expiration_heap);
3838 GNUNET_assert (pr != NULL);
3839 destroy_pending_request (pr);
3845 /* **************************** CS GET Handling ************************ */
3849 * Handle START_SEARCH-message (search request from client).
3851 * @param cls closure
3852 * @param client identification of the client
3853 * @param message the actual message
/* Handler for a START_SEARCH message from a local client.  Steps visible
   in this listing:
   - validate that the message is a 'struct SearchMessage' followed by a
     whole number of hash codes;
   - look up the 'struct ClientList' entry for 'client', creating one
     (and taking a reference on the client) when a new entry is needed;
   - for KBLOCK, NBLOCK and ANY requests, look for an existing request
     with the same query and type and, if found, only merge the
     already-seen hashes into it;
   - otherwise create a new 'struct PendingRequest', register it in
     'query_request_map' and start a local datastore lookup whose results
     are delivered to process_local_reply(). */
3856 handle_start_search (void *cls,
3857 struct GNUNET_SERVER_Client *client,
3858 const struct GNUNET_MessageHeader *message)
3860 static GNUNET_HashCode all_zeros;
3861 const struct SearchMessage *sm;
3862 struct ClientList *cl;
3863 struct ClientRequestList *crl;
3864 struct PendingRequest *pr;
3867 enum GNUNET_BLOCK_Type type;
3869 msize = ntohs (message->size);
/* The trailing payload must consist of complete hash codes. */
3870 if ( (msize < sizeof (struct SearchMessage)) ||
3871 (0 != (msize - sizeof (struct SearchMessage)) % sizeof (GNUNET_HashCode)) )
3874 GNUNET_SERVER_receive_done (client,
3878 GNUNET_STATISTICS_update (stats,
3879 gettext_noop ("# client searches received"),
/* 'sc' is the number of hash codes that follow the SearchMessage. */
3882 sc = (msize - sizeof (struct SearchMessage)) / sizeof (GNUNET_HashCode);
3883 sm = (const struct SearchMessage*) message;
3884 type = ntohl (sm->type);
3886 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3887 "Received request for `%s' of type %u from local client\n",
3888 GNUNET_h2s (&sm->query),
3889 (unsigned int) type);
/* Walk the client list until the entry for 'client' is found or the list
   ends. */
3892 while ( (cl != NULL) &&
3893 (cl->client != client) )
3897 cl = GNUNET_malloc (sizeof (struct ClientList));
3898 cl->client = client;
3899 GNUNET_SERVER_client_keep (client);
3900 cl->next = client_list;
3903 /* detect duplicate KBLOCK requests */
3904 if ( (type == GNUNET_BLOCK_TYPE_FS_KBLOCK) ||
3905 (type == GNUNET_BLOCK_TYPE_FS_NBLOCK) ||
3906 (type == GNUNET_BLOCK_TYPE_ANY) )
/* Skip entries whose query or type differs from the new request. */
3909 while ( (crl != NULL) &&
3910 ( (0 != memcmp (&crl->req->query,
3912 sizeof (GNUNET_HashCode))) ||
3913 (crl->req->type != type) ) )
3918 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3919 "Have existing request, merging content-seen lists.\n");
3922 /* Duplicate request (used to send long list of
3923 known/blocked results); merge 'pr->replies_seen'
3924 and update bloom filter */
3925 GNUNET_array_grow (pr->replies_seen,
3926 pr->replies_seen_size,
3927 pr->replies_seen_off + sc);
3928 memcpy (&pr->replies_seen[pr->replies_seen_off],
3930 sc * sizeof (GNUNET_HashCode));
3931 pr->replies_seen_off += sc;
3932 refresh_bloomfilter (pr);
3933 GNUNET_STATISTICS_update (stats,
3934 gettext_noop ("# client searches updated (merged content seen list)"),
3937 GNUNET_SERVER_receive_done (client,
3942 GNUNET_STATISTICS_update (stats,
3943 gettext_noop ("# client searches active"),
/* For SBLOCK requests, room for the namespace hash is reserved directly
   behind the PendingRequest. */
3946 pr = GNUNET_malloc (sizeof (struct PendingRequest) +
3947 ((type == GNUNET_BLOCK_TYPE_FS_SBLOCK) ? sizeof(GNUNET_HashCode) : 0));
3948 crl = GNUNET_malloc (sizeof (struct ClientRequestList));
3949 memset (crl, 0, sizeof (struct ClientRequestList));
3950 crl->client_list = cl;
3951 GNUNET_CONTAINER_DLL_insert (cl->rl_head,
3956 pr->client_request_list = crl;
3957 GNUNET_array_grow (pr->replies_seen,
3958 pr->replies_seen_size,
3960 memcpy (pr->replies_seen,
3962 sc * sizeof (GNUNET_HashCode));
3963 pr->replies_seen_off = sc;
3964 pr->anonymity_level = ntohl (sm->anonymity_level);
3965 pr->start_time = GNUNET_TIME_absolute_get ();
3966 refresh_bloomfilter (pr);
3967 pr->query = sm->query;
/* Bit 0 of the options field selects a local-only search. */
3968 if (0 == (1 & ntohl (sm->options)))
3969 pr->local_only = GNUNET_NO;
3971 pr->local_only = GNUNET_YES;
/* The meaning of 'target' depends on the block type: a peer identity for
   DBLOCK/IBLOCK requests, the namespace for SBLOCK requests. */
3974 case GNUNET_BLOCK_TYPE_FS_DBLOCK:
3975 case GNUNET_BLOCK_TYPE_FS_IBLOCK:
3976 if (0 != memcmp (&sm->target,
3978 sizeof (GNUNET_HashCode)))
3979 pr->target_pid = GNUNET_PEER_intern ((const struct GNUNET_PeerIdentity*) &sm->target);
3981 case GNUNET_BLOCK_TYPE_FS_SBLOCK:
3982 pr->namespace = (GNUNET_HashCode*) &pr[1];
3983 memcpy (&pr[1], &sm->target, sizeof (GNUNET_HashCode));
3988 GNUNET_break (GNUNET_OK ==
3989 GNUNET_CONTAINER_multihashmap_put (query_request_map,
3992 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
3993 if (type == GNUNET_BLOCK_TYPE_FS_DBLOCK)
3994 type = GNUNET_BLOCK_TYPE_ANY; /* get on-demand blocks too! */
3995 pr->qe = GNUNET_DATASTORE_get (dsh,
3999 GNUNET_CONSTANTS_SERVICE_TIMEOUT,
4000 &process_local_reply,
4005 /* **************************** Startup ************************ */
4008 * Process fs requests.
4010 * @param s scheduler to use
4011 * @param server the initialized server
4012 * @param c configuration to use
/* Second stage of service start-up: creates the statistics handle, reads
   the tunables from the configuration, allocates the request and peer
   lookup structures, connects to core with the P2P handler table and
   registers the client message handlers.  Returns GNUNET_SYSERR after
   releasing the lookup structures when the core connection cannot be
   established. */
4015 main_init (struct GNUNET_SCHEDULER_Handle *s,
4016 struct GNUNET_SERVER_Handle *server,
4017 const struct GNUNET_CONFIGURATION_Handle *c)
4019 static const struct GNUNET_CORE_MessageHandler p2p_handlers[] =
4022 GNUNET_MESSAGE_TYPE_FS_GET, 0 },
4024 GNUNET_MESSAGE_TYPE_FS_PUT, 0 },
4025 { &handle_p2p_migration_stop,
4026 GNUNET_MESSAGE_TYPE_FS_MIGRATION_STOP,
4027 sizeof (struct MigrationStopMessage) },
4030 static const struct GNUNET_SERVER_MessageHandler handlers[] = {
4031 {&GNUNET_FS_handle_index_start, NULL,
4032 GNUNET_MESSAGE_TYPE_FS_INDEX_START, 0},
4033 {&GNUNET_FS_handle_index_list_get, NULL,
4034 GNUNET_MESSAGE_TYPE_FS_INDEX_LIST_GET, sizeof(struct GNUNET_MessageHeader) },
4035 {&GNUNET_FS_handle_unindex, NULL, GNUNET_MESSAGE_TYPE_FS_UNINDEX,
4036 sizeof (struct UnindexMessage) },
4037 {&handle_start_search, NULL, GNUNET_MESSAGE_TYPE_FS_START_SEARCH,
/* Default sizing hint for the peer-indexed hash maps created below
   (presumably overridden by EXPECTED_NEIGHBOUR_COUNT when configured). */
4041 unsigned long long enc = 128;
4045 stats = GNUNET_STATISTICS_create (sched, "fs", cfg);
4046 min_migration_delay = GNUNET_TIME_UNIT_SECONDS;
4048 GNUNET_CONFIGURATION_get_value_number (cfg,
4050 "MAX_PENDING_REQUESTS",
4051 &max_pending_requests)) ||
4053 GNUNET_CONFIGURATION_get_value_number (cfg,
4055 "EXPECTED_NEIGHBOUR_COUNT",
4058 GNUNET_CONFIGURATION_get_value_time (cfg,
4060 "MIN_MIGRATION_DELAY",
4061 &min_migration_delay)) )
/* FIX: added the trailing newline that every other log message in this
   file carries; without it this message had no line terminator of its
   own. */
4063 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
4064 _("Configuration fails to specify certain parameters, assuming default values.\n"));
4066 connected_peers = GNUNET_CONTAINER_multihashmap_create (enc);
4067 query_request_map = GNUNET_CONTAINER_multihashmap_create (max_pending_requests);
4068 peer_request_map = GNUNET_CONTAINER_multihashmap_create (enc);
/* Min-heap: handle_p2p_get inserts requests keyed by expiration time. */
4069 requests_by_expiration_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
4070 core = GNUNET_CORE_connect (sched,
4072 GNUNET_TIME_UNIT_FOREVER_REL,
4075 &peer_connect_handler,
4076 &peer_disconnect_handler,
/* Failure path: undo the allocations made above and reset the globals. */
4083 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
4084 _("Failed to connect to `%s' service.\n"),
4086 GNUNET_CONTAINER_multihashmap_destroy (connected_peers);
4087 connected_peers = NULL;
4088 GNUNET_CONTAINER_multihashmap_destroy (query_request_map);
4089 query_request_map = NULL;
4090 GNUNET_CONTAINER_heap_destroy (requests_by_expiration_heap);
4091 requests_by_expiration_heap = NULL;
4092 GNUNET_CONTAINER_multihashmap_destroy (peer_request_map);
4093 peer_request_map = NULL;
/* NOTE(review): run() also disconnects 'dsh' when this function fails;
   confirm that 'dsh' is reset on a line not visible in this listing so
   the handle is not released twice. */
4096 GNUNET_DATASTORE_disconnect (dsh, GNUNET_NO);
4099 return GNUNET_SYSERR;
4101 /* FIXME: distinguish between sending and storing in options? */
4102 if (active_migration)
4104 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
4105 _("Content migration is enabled, will start to gather data\n"));
4106 consider_migration_gathering ();
4108 consider_dht_put_gathering (NULL);
4109 GNUNET_SERVER_disconnect_notify (server,
4110 &handle_client_disconnect,
4112 GNUNET_assert (GNUNET_OK ==
4113 GNUNET_CONFIGURATION_get_value_filename (cfg,
4117 GNUNET_DISK_directory_create (trustDirectory);
4118 GNUNET_SCHEDULER_add_with_priority (sched,
4119 GNUNET_SCHEDULER_PRIORITY_HIGH,
4120 &cron_flush_trust, NULL);
4123 GNUNET_SERVER_add_handlers (server, handlers);
4124 GNUNET_SCHEDULER_add_delayed (sched,
4125 GNUNET_TIME_UNIT_FOREVER_REL,
4133 * Process fs requests.
4135 * @param cls closure
4136 * @param sched scheduler to use
4137 * @param server the initialized server
4138 * @param cfg configuration to use
4142 struct GNUNET_SCHEDULER_Handle *sched,
4143 struct GNUNET_SERVER_Handle *server,
4144 const struct GNUNET_CONFIGURATION_Handle *cfg)
/* Service entry point handed to GNUNET_SERVICE_run() by main(): reads the
   migration setting, connects to the datastore, creates the load
   trackers, the block context and the DHT handle, then initialises
   indexing and the main service state.  When either initialisation step
   fails, everything acquired here is released again. */
4146 active_migration = GNUNET_CONFIGURATION_get_value_yesno (cfg,
4149 dsh = GNUNET_DATASTORE_connect (cfg,
/* NOTE(review): the guard of this shutdown call is not visible in this
   listing; presumably it runs when the datastore connection failed. */
4153 GNUNET_SCHEDULER_shutdown (sched);
4156 datastore_get_load = GNUNET_LOAD_value_init ();
4157 datastore_put_load = GNUNET_LOAD_value_init ();
/* The block library receives its own, separately created configuration. */
4158 block_cfg = GNUNET_CONFIGURATION_create ();
4159 GNUNET_CONFIGURATION_set_value_string (block_cfg,
4163 block_ctx = GNUNET_BLOCK_context_create (block_cfg);
4164 GNUNET_assert (NULL != block_ctx);
4165 dht_handle = GNUNET_DHT_connect (sched,
/* main_init() is only attempted when indexing initialisation succeeded. */
4168 if ( (GNUNET_OK != GNUNET_FS_indexing_init (sched, cfg, dsh)) ||
4169 (GNUNET_OK != main_init (sched, server, cfg)) )
4171 GNUNET_SCHEDULER_shutdown (sched);
4172 GNUNET_DATASTORE_disconnect (dsh, GNUNET_NO);
4174 GNUNET_DHT_disconnect (dht_handle);
4176 GNUNET_BLOCK_context_destroy (block_ctx);
4178 GNUNET_CONFIGURATION_destroy (block_cfg);
4180 GNUNET_LOAD_value_free (datastore_get_load);
4181 datastore_get_load = NULL;
4182 GNUNET_LOAD_value_free (datastore_put_load);
4183 datastore_put_load = NULL;
4190 * The main function for the fs service.
4192 * @param argc number of arguments from the command line
4193 * @param argv command line arguments
4194 * @return 0 ok, 1 on error
/* Program entry point: runs the service through GNUNET_SERVICE_run() with
   run() as the start-up task and maps its result to the process exit
   status (0 when it returns GNUNET_OK, 1 otherwise). */
4197 main (int argc, char *const *argv)
4199 return (GNUNET_OK ==
4200 GNUNET_SERVICE_run (argc,
4203 GNUNET_SERVICE_OPTION_NONE,
4204 &run, NULL)) ? 0 : 1;
4207 /* end of gnunet-service-fs.c */