2 This file is part of GNUnet.
3 (C) 2009, 2010 Christian Grothoff (and other contributing authors)
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your
8 option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA.
22 * @file fs/gnunet-service-fs.c
23 * @brief gnunet anonymity protocol implementation
24 * @author Christian Grothoff
27 * - track per-peer request latency (using new load API)
28 * - consider more precise latency estimation (per-peer & request) -- again load API?
29 * - implement test_load_too_high, make decision priority-based, implement forwarding, etc.
30 * - introduce random latency in processing
35 #include "gnunet_constants.h"
36 #include "gnunet_core_service.h"
37 #include "gnunet_dht_service.h"
38 #include "gnunet_datastore_service.h"
39 #include "gnunet_load_lib.h"
40 #include "gnunet_peer_lib.h"
41 #include "gnunet_protocols.h"
42 #include "gnunet_signatures.h"
43 #include "gnunet_statistics_service.h"
44 #include "gnunet_util_lib.h"
45 #include "gnunet-service-fs_indexing.h"
48 #define DEBUG_FS GNUNET_NO
51 * Maximum number of outgoing messages we queue per peer.
53 #define MAX_QUEUE_PER_PEER 16
56 * Size for the hash map for DHT requests from the FS
57 * service. Should be about the number of concurrent
58 * DHT requests we plan to make.
60 #define FS_DHT_HT_SIZE 1024
63 * How often do we flush trust values to disk?
65 #define TRUST_FLUSH_FREQ GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 5)
68 * How often do we at most PUT content into the DHT?
70 #define MAX_DHT_PUT_FREQ GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5)
73 * Inverse of the probability that we will submit the same query
74 * to the same peer again. If the same peer already got the query
75 * repeatedly recently, the probability is multiplied by the inverse
76 * of this number each time. Note that we only try about every TTL_DECREMENT/2
77 * plus MAX_CORK_DELAY (so roughly every 3.5s).
79 #define RETRY_PROBABILITY_INV 3
82 * What is the maximum delay for a P2P FS message (in our interaction
83 * with core)? FS-internal delays are another story. The value is
84 * chosen based on the 32k block size. Given that peers typcially
85 * have at least 1 kb/s bandwidth, 45s waits give us a chance to
86 * transmit one message even to the lowest-bandwidth peers.
88 #define MAX_TRANSMIT_DELAY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 45)
91 * Maximum number of requests (from other peers) that we're
92 * willing to have pending at any given point in time.
94 static unsigned long long max_pending_requests = (32 * 1024);
98 * Information we keep for each pending reply. The
99 * actual message follows at the end of this struct.
101 struct PendingMessage;
104 * Function called upon completion of a transmission.
107 * @param pid ID of receiving peer, 0 on transmission error
109 typedef void (*TransmissionContinuation)(void * cls,
110 GNUNET_PEER_Id tpid);
114 * Information we keep for each pending message (GET/PUT). The
115 * actual message follows at the end of this struct.
117 struct PendingMessage
120 * This is a doubly-linked list of messages to the same peer.
122 struct PendingMessage *next;
125 * This is a doubly-linked list of messages to the same peer.
127 struct PendingMessage *prev;
130 * Entry in pending message list for this pending message.
132 struct PendingMessageList *pml;
135 * Function to call immediately once we have transmitted this
138 TransmissionContinuation cont;
146 * Size of the reply; actual reply message follows
147 * at the end of this struct.
152 * How important is this message for us?
160 * Information about a peer that we are connected to.
161 * We track data that is useful for determining which
162 * peers should receive our requests. We also keep
163 * a list of messages to transmit to this peer.
169 * List of the last clients for which this peer successfully
172 struct GNUNET_SERVER_Client *last_client_replies[CS2P_SUCCESS_LIST_SIZE];
175 * List of the last PIDs for which
176 * this peer successfully answered a query;
177 * We use 0 to indicate no successful reply.
179 GNUNET_PEER_Id last_p2p_replies[P2P_SUCCESS_LIST_SIZE];
182 * Average delay between sending the peer a request and
183 * getting a reply (only calculated over the requests for
184 * which we actually got a reply). Calculated
185 * as a moving average: new_delay = ((n-1)*last_delay+curr_delay) / n
187 struct GNUNET_TIME_Relative avg_delay;
190 * Point in time until which this peer does not want us to migrate content
193 struct GNUNET_TIME_Absolute migration_blocked;
196 * Time until when we blocked this peer from migrating
199 struct GNUNET_TIME_Absolute last_migration_block;
202 * Handle for an active request for transmission to this
205 struct GNUNET_CORE_TransmitHandle *cth;
208 * Messages (replies, queries, content migration) we would like to
209 * send to this peer in the near future. Sorted by priority, head.
211 struct PendingMessage *pending_messages_head;
214 * Messages (replies, queries, content migration) we would like to
215 * send to this peer in the near future. Sorted by priority, tail.
217 struct PendingMessage *pending_messages_tail;
220 * How long does it typically take for us to transmit a message
221 * to this peer? (delay between the request being issued and
222 * the callback being invoked).
224 struct GNUNET_LOAD_Value *transmission_delay;
227 * Time when the last transmission request was issued.
229 struct GNUNET_TIME_Absolute last_transmission_request_start;
232 * Average priority of successful replies. Calculated
233 * as a moving average: new_avg = ((n-1)*last_avg+curr_prio) / n
238 * Increase in traffic preference still to be submitted
239 * to the core service for this peer.
241 uint64_t inc_preference;
244 * Trust rating for this peer
249 * Trust rating for this peer on disk.
254 * The peer's identity.
259 * Size of the linked list of 'pending_messages'.
261 unsigned int pending_requests;
264 * Which offset in "last_p2p_replies" will be updated next?
265 * (we go round-robin).
267 unsigned int last_p2p_replies_woff;
270 * Which offset in "last_client_replies" will be updated next?
271 * (we go round-robin).
273 unsigned int last_client_replies_woff;
279 * Information we keep for each pending request. We should try to
280 * keep this struct as small as possible since its memory consumption
281 * is key to how many requests we can have pending at once.
283 struct PendingRequest;
287 * Doubly-linked list of requests we are performing
288 * on behalf of the same client.
290 struct ClientRequestList
294 * This is a doubly-linked list.
296 struct ClientRequestList *next;
299 * This is a doubly-linked list.
301 struct ClientRequestList *prev;
304 * Request this entry represents.
306 struct PendingRequest *req;
309 * Client list this request belongs to.
311 struct ClientList *client_list;
317 * Replies to be transmitted to the client. The actual
318 * response message is allocated after this struct.
320 struct ClientResponseMessage
323 * This is a doubly-linked list.
325 struct ClientResponseMessage *next;
328 * This is a doubly-linked list.
330 struct ClientResponseMessage *prev;
333 * Client list entry this response belongs to.
335 struct ClientList *client_list;
338 * Number of bytes in the response.
345 * Linked list of clients we are performing requests
351 * This is a linked list.
353 struct ClientList *next;
356 * ID of a client making a request, NULL if this entry is for a
359 struct GNUNET_SERVER_Client *client;
362 * Head of list of requests performed on behalf
363 * of this client right now.
365 struct ClientRequestList *rl_head;
368 * Tail of list of requests performed on behalf
369 * of this client right now.
371 struct ClientRequestList *rl_tail;
374 * Head of linked list of responses.
376 struct ClientResponseMessage *res_head;
379 * Tail of linked list of responses.
381 struct ClientResponseMessage *res_tail;
384 * Context for sending replies.
386 struct GNUNET_CONNECTION_TransmitHandle *th;
392 * Doubly-linked list of messages we are performing
393 * due to a pending request.
395 struct PendingMessageList
399 * This is a doubly-linked list of messages on behalf of the same request.
401 struct PendingMessageList *next;
404 * This is a doubly-linked list of messages on behalf of the same request.
406 struct PendingMessageList *prev;
409 * Message this entry represents.
411 struct PendingMessage *pm;
414 * Request this entry belongs to.
416 struct PendingRequest *req;
419 * Peer this message is targeted for.
421 struct ConnectedPeer *target;
427 * Information we keep for each pending request. We should try to
428 * keep this struct as small as possible since its memory consumption
429 * is key to how many requests we can have pending at once.
431 struct PendingRequest
435 * If this request was made by a client, this is our entry in the
436 * client request list; otherwise NULL.
438 struct ClientRequestList *client_request_list;
441 * Entry of peer responsible for this entry (if this request
442 * was made by a peer).
444 struct ConnectedPeer *cp;
447 * If this is a namespace query, pointer to the hash of the public
448 * key of the namespace; otherwise NULL. Pointer will be to the
449 * end of this struct (so no need to free it).
451 const GNUNET_HashCode *namespace;
454 * Bloomfilter we use to filter out replies that we don't care about
455 * (anymore). NULL as long as we are interested in all replies.
457 struct GNUNET_CONTAINER_BloomFilter *bf;
460 * Context of our GNUNET_CORE_peer_change_preference call.
462 struct GNUNET_CORE_InformationRequestContext *irc;
465 * Reference to DHT get operation for this request (or NULL).
467 struct GNUNET_DHT_GetHandle *dht_get;
470 * Hash code of all replies that we have seen so far (only valid
471 * if client is not NULL since we only track replies like this for
474 GNUNET_HashCode *replies_seen;
477 * Node in the heap representing this entry; NULL
478 * if we have no heap node.
480 struct GNUNET_CONTAINER_HeapNode *hnode;
483 * Head of list of messages being performed on behalf of this
486 struct PendingMessageList *pending_head;
489 * Tail of list of messages being performed on behalf of this
492 struct PendingMessageList *pending_tail;
495 * When did we first see this request (form this peer), or, if our
496 * client is initiating, when did we last initiate a search?
498 struct GNUNET_TIME_Absolute start_time;
501 * The query that this request is for.
503 GNUNET_HashCode query;
506 * The task responsible for transmitting queries
509 GNUNET_SCHEDULER_TaskIdentifier task;
512 * (Interned) Peer identifier that identifies a preferred target
515 GNUNET_PEER_Id target_pid;
518 * (Interned) Peer identifiers of peers that have already
519 * received our query for this content.
521 GNUNET_PEER_Id *used_pids;
524 * Our entry in the queue (non-NULL while we wait for our
525 * turn to interact with the local database).
527 struct GNUNET_DATASTORE_QueueEntry *qe;
530 * Size of the 'bf' (in bytes).
535 * Desired anonymity level; only valid for requests from a local client.
537 uint32_t anonymity_level;
540 * How many entries in "used_pids" are actually valid?
542 unsigned int used_pids_off;
545 * How long is the "used_pids" array?
547 unsigned int used_pids_size;
550 * Number of results found for this request.
552 unsigned int results_found;
555 * How many entries in "replies_seen" are actually valid?
557 unsigned int replies_seen_off;
560 * How long is the "replies_seen" array?
562 unsigned int replies_seen_size;
565 * Priority with which this request was made. If one of our clients
566 * made the request, then this is the current priority that we are
567 * using when initiating the request. This value is used when
568 * we decide to reward other peers with trust for providing a reply.
573 * Priority points left for us to spend when forwarding this request
576 uint32_t remaining_priority;
579 * Number to mingle hashes for bloom-filter tests with.
584 * TTL with which we saw this request (or, if we initiated, TTL that
585 * we used for the request).
590 * Type of the content that this request is for.
592 enum GNUNET_BLOCK_Type type;
595 * Remove this request after transmission of the current response.
600 * GNUNET_YES if we should not forward this request to other peers.
608 * Block that is ready for migration to other peers. Actual data is at the end of the block.
610 struct MigrationReadyBlock
614 * This is a doubly-linked list.
616 struct MigrationReadyBlock *next;
619 * This is a doubly-linked list.
621 struct MigrationReadyBlock *prev;
624 * Query for the block.
626 GNUNET_HashCode query;
629 * When does this block expire?
631 struct GNUNET_TIME_Absolute expiration;
634 * Peers we would consider forwarding this
635 * block to. Zero for empty entries.
637 GNUNET_PEER_Id target_list[MIGRATION_LIST_SIZE];
645 * Number of targets already used.
647 unsigned int used_targets;
652 enum GNUNET_BLOCK_Type type;
657 * Our connection to the datastore.
659 static struct GNUNET_DATASTORE_Handle *dsh;
664 static struct GNUNET_BLOCK_Context *block_ctx;
667 * Our block configuration.
669 static struct GNUNET_CONFIGURATION_Handle *block_cfg;
674 static struct GNUNET_SCHEDULER_Handle *sched;
679 static const struct GNUNET_CONFIGURATION_Handle *cfg;
682 * Map of peer identifiers to "struct ConnectedPeer" (for that peer).
684 static struct GNUNET_CONTAINER_MultiHashMap *connected_peers;
687 * Map of peer identifiers to "struct PendingRequest" (for that peer).
689 static struct GNUNET_CONTAINER_MultiHashMap *peer_request_map;
692 * Map of query identifiers to "struct PendingRequest" (for that query).
694 static struct GNUNET_CONTAINER_MultiHashMap *query_request_map;
697 * Heap with the request that will expire next at the top. Contains
698 * pointers of type "struct PendingRequest*"; these will *also* be
699 * aliased from the "requests_by_peer" data structures and the
700 * "requests_by_query" table. Note that requests from our clients
701 * don't expire and are thus NOT in the "requests_by_expiration"
702 * (or the "requests_by_peer" tables).
704 static struct GNUNET_CONTAINER_Heap *requests_by_expiration_heap;
707 * Handle for reporting statistics.
709 static struct GNUNET_STATISTICS_Handle *stats;
712 * Linked list of clients we are currently processing requests for.
714 static struct ClientList *client_list;
717 * Pointer to handle to the core service (points to NULL until we've
720 static struct GNUNET_CORE_Handle *core;
723 * Head of linked list of blocks that can be migrated.
725 static struct MigrationReadyBlock *mig_head;
728 * Tail of linked list of blocks that can be migrated.
730 static struct MigrationReadyBlock *mig_tail;
733 * Request to datastore for migration (or NULL).
735 static struct GNUNET_DATASTORE_QueueEntry *mig_qe;
738 * Request to datastore for DHT PUTs (or NULL).
740 static struct GNUNET_DATASTORE_QueueEntry *dht_qe;
743 * Type we will request for the next DHT PUT round from the datastore.
745 static enum GNUNET_BLOCK_Type dht_put_type = GNUNET_BLOCK_TYPE_FS_KBLOCK;
748 * Where do we store trust information?
750 static char *trustDirectory;
753 * ID of task that collects blocks for migration.
755 static GNUNET_SCHEDULER_TaskIdentifier mig_task;
758 * ID of task that collects blocks for DHT PUTs.
760 static GNUNET_SCHEDULER_TaskIdentifier dht_task;
763 * What is the maximum frequency at which we are allowed to
764 * poll the datastore for migration content?
766 static struct GNUNET_TIME_Relative min_migration_delay;
769 * Handle for DHT operations.
771 static struct GNUNET_DHT_Handle *dht_handle;
774 * Size of the doubly-linked list of migration blocks.
776 static unsigned int mig_size;
779 * Are we allowed to migrate content to this peer.
781 static int active_migration;
784 * How many entires with zero anonymity do we currently estimate
785 * to have in the database?
787 static unsigned int zero_anonymity_count_estimate;
790 * Typical priorities we're seeing from other peers right now. Since
791 * most priorities will be zero, this value is the weighted average of
792 * non-zero priorities seen "recently". In order to ensure that new
793 * values do not dramatically change the ratio, values are first
794 * "capped" to a reasonable range (+N of the current value) and then
795 * averaged into the existing value by a ratio of 1:N. Hence
796 * receiving the largest possible priority can still only raise our
797 * "current_priorities" by at most 1.
799 static double current_priorities;
802 * Datastore 'GET' load tracking.
804 static struct GNUNET_LOAD_Value *datastore_get_load;
807 * Datastore 'PUT' load tracking.
809 static struct GNUNET_LOAD_Value *datastore_put_load;
813 * We've just now completed a datastore request. Update our
814 * datastore load calculations.
816 * @param start time when the datastore request was issued
819 update_datastore_delays (struct GNUNET_TIME_Absolute start)
821 struct GNUNET_TIME_Relative delay;
823 delay = GNUNET_TIME_absolute_get_duration (start);
824 GNUNET_LOAD_update (datastore_get_load,
830 * Get the filename under which we would store the GNUNET_HELLO_Message
831 * for the given host and protocol.
832 * @return filename of the form DIRECTORY/HOSTID
835 get_trust_filename (const struct GNUNET_PeerIdentity *id)
837 struct GNUNET_CRYPTO_HashAsciiEncoded fil;
840 GNUNET_CRYPTO_hash_to_enc (&id->hashPubKey, &fil);
841 GNUNET_asprintf (&fn, "%s%s%s", trustDirectory, DIR_SEPARATOR_STR, &fil);
848 * Transmit messages by copying it to the target buffer
849 * "buf". "buf" will be NULL and "size" zero if the socket was closed
850 * for writing in the meantime. In that case, do nothing
851 * (the disconnect or shutdown handler will take care of the rest).
852 * If we were able to transmit messages and there are still more
853 * pending, ask core again for further calls to this function.
855 * @param cls closure, pointer to the 'struct ConnectedPeer*'
856 * @param size number of bytes available in buf
857 * @param buf where the callee should write the message
858 * @return number of bytes written to buf
861 transmit_to_peer (void *cls,
862 size_t size, void *buf);
865 /* ******************* clean up functions ************************ */
868 * Delete the given migration block.
870 * @param mb block to delete
873 delete_migration_block (struct MigrationReadyBlock *mb)
875 GNUNET_CONTAINER_DLL_remove (mig_head,
878 GNUNET_PEER_decrement_rcs (mb->target_list,
879 MIGRATION_LIST_SIZE);
886 * Compare the distance of two peers to a key.
889 * @param p1 first peer
890 * @param p2 second peer
891 * @return GNUNET_YES if P1 is closer to key than P2
894 is_closer (const GNUNET_HashCode *key,
895 const struct GNUNET_PeerIdentity *p1,
896 const struct GNUNET_PeerIdentity *p2)
898 return GNUNET_CRYPTO_hash_xorcmp (&p1->hashPubKey,
905 * Consider migrating content to a given peer.
907 * @param cls 'struct MigrationReadyBlock*' to select
908 * targets for (or NULL for none)
909 * @param key ID of the peer
910 * @param value 'struct ConnectedPeer' of the peer
911 * @return GNUNET_YES (always continue iteration)
914 consider_migration (void *cls,
915 const GNUNET_HashCode *key,
918 struct MigrationReadyBlock *mb = cls;
919 struct ConnectedPeer *cp = value;
920 struct MigrationReadyBlock *pos;
921 struct GNUNET_PeerIdentity cppid;
922 struct GNUNET_PeerIdentity otherpid;
923 struct GNUNET_PeerIdentity worstpid;
928 /* consider 'cp' as a migration target for mb */
929 if (GNUNET_TIME_absolute_get_remaining (cp->migration_blocked).value > 0)
930 return GNUNET_YES; /* peer has requested no migration! */
933 GNUNET_PEER_resolve (cp->pid,
935 repl = MIGRATION_LIST_SIZE;
936 for (i=0;i<MIGRATION_LIST_SIZE;i++)
938 if (mb->target_list[i] == 0)
940 mb->target_list[i] = cp->pid;
941 GNUNET_PEER_change_rc (mb->target_list[i], 1);
942 repl = MIGRATION_LIST_SIZE;
945 GNUNET_PEER_resolve (mb->target_list[i],
947 if ( (repl == MIGRATION_LIST_SIZE) &&
948 is_closer (&mb->query,
955 else if ( (repl != MIGRATION_LIST_SIZE) &&
956 (is_closer (&mb->query,
964 if (repl != MIGRATION_LIST_SIZE)
966 GNUNET_PEER_change_rc (mb->target_list[repl], -1);
967 mb->target_list[repl] = cp->pid;
968 GNUNET_PEER_change_rc (mb->target_list[repl], 1);
972 /* consider scheduling transmission to cp for content migration */
979 for (i=0;i<MIGRATION_LIST_SIZE;i++)
981 if (cp->pid == pos->target_list[i])
986 msize = GNUNET_MIN (msize,
994 return GNUNET_YES; /* no content available */
996 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
997 "Trying to migrate at least %u bytes to peer `%s'\n",
1002 = GNUNET_CORE_notify_transmit_ready (core,
1003 0, GNUNET_TIME_UNIT_FOREVER_REL,
1004 (const struct GNUNET_PeerIdentity*) key,
1005 msize + sizeof (struct PutMessage),
1013 * Task that is run periodically to obtain blocks for content
1017 * @param tc scheduler context (also unused)
1020 gather_migration_blocks (void *cls,
1021 const struct GNUNET_SCHEDULER_TaskContext *tc);
1027 * Task that is run periodically to obtain blocks for DHT PUTs.
1029 * @param cls type of blocks to gather
1030 * @param tc scheduler context (unused)
1033 gather_dht_put_blocks (void *cls,
1034 const struct GNUNET_SCHEDULER_TaskContext *tc);
1038 * If the migration task is not currently running, consider
1039 * (re)scheduling it with the appropriate delay.
1042 consider_migration_gathering ()
1044 struct GNUNET_TIME_Relative delay;
1050 if (mig_task != GNUNET_SCHEDULER_NO_TASK)
1052 delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
1054 delay = GNUNET_TIME_relative_divide (delay,
1055 MAX_MIGRATION_QUEUE);
1056 delay = GNUNET_TIME_relative_max (delay,
1057 min_migration_delay);
1058 mig_task = GNUNET_SCHEDULER_add_delayed (sched,
1060 &gather_migration_blocks,
1066 * If the DHT PUT gathering task is not currently running, consider
1067 * (re)scheduling it with the appropriate delay.
1070 consider_dht_put_gathering (void *cls)
1072 struct GNUNET_TIME_Relative delay;
1078 if (dht_task != GNUNET_SCHEDULER_NO_TASK)
1080 if (zero_anonymity_count_estimate > 0)
1082 delay = GNUNET_TIME_relative_divide (GNUNET_DHT_DEFAULT_REPUBLISH_FREQUENCY,
1083 zero_anonymity_count_estimate);
1084 delay = GNUNET_TIME_relative_min (delay,
1089 /* if we have NO zero-anonymity content yet, wait 5 minutes for some to
1090 (hopefully) appear */
1091 delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 5);
1093 dht_task = GNUNET_SCHEDULER_add_delayed (sched,
1095 &gather_dht_put_blocks,
1101 * Process content offered for migration.
1103 * @param cls closure
1104 * @param key key for the content
1105 * @param size number of bytes in data
1106 * @param data content stored
1107 * @param type type of the content
1108 * @param priority priority of the content
1109 * @param anonymity anonymity-level for the content
1110 * @param expiration expiration time for the content
1111 * @param uid unique identifier for the datum;
1112 * maybe 0 if no unique identifier is available
1115 process_migration_content (void *cls,
1116 const GNUNET_HashCode * key,
1119 enum GNUNET_BLOCK_Type type,
1122 struct GNUNET_TIME_Absolute
1123 expiration, uint64_t uid)
1125 struct MigrationReadyBlock *mb;
1130 if (mig_size < MAX_MIGRATION_QUEUE)
1131 consider_migration_gathering ();
1134 if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
1137 GNUNET_FS_handle_on_demand_block (key, size, data,
1138 type, priority, anonymity,
1140 &process_migration_content,
1143 GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
1148 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1149 "Retrieved block `%s' of type %u for migration\n",
1153 mb = GNUNET_malloc (sizeof (struct MigrationReadyBlock) + size);
1155 mb->expiration = expiration;
1158 memcpy (&mb[1], data, size);
1159 GNUNET_CONTAINER_DLL_insert_after (mig_head,
1164 GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
1165 &consider_migration,
1167 GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
1172 * Function called upon completion of the DHT PUT operation.
1175 dht_put_continuation (void *cls,
1176 const struct GNUNET_SCHEDULER_TaskContext *tc)
1178 GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
1183 * Store content in DHT.
1185 * @param cls closure
1186 * @param key key for the content
1187 * @param size number of bytes in data
1188 * @param data content stored
1189 * @param type type of the content
1190 * @param priority priority of the content
1191 * @param anonymity anonymity-level for the content
1192 * @param expiration expiration time for the content
1193 * @param uid unique identifier for the datum;
1194 * maybe 0 if no unique identifier is available
1197 process_dht_put_content (void *cls,
1198 const GNUNET_HashCode * key,
1201 enum GNUNET_BLOCK_Type type,
1204 struct GNUNET_TIME_Absolute
1205 expiration, uint64_t uid)
1207 static unsigned int counter;
1208 static GNUNET_HashCode last_vhash;
1209 static GNUNET_HashCode vhash;
1214 consider_dht_put_gathering (cls);
1217 /* slightly funky code to estimate the total number of values with zero
1218 anonymity from the maximum observed length of a monotonically increasing
1219 sequence of hashes over the contents */
1220 GNUNET_CRYPTO_hash (data, size, &vhash);
1221 if (GNUNET_CRYPTO_hash_cmp (&vhash, &last_vhash) <= 0)
1223 if (zero_anonymity_count_estimate > 0)
1224 zero_anonymity_count_estimate /= 2;
1230 if (zero_anonymity_count_estimate < (1 << counter))
1231 zero_anonymity_count_estimate = (1 << counter);
1233 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1234 "Retrieved block `%s' of type %u for DHT PUT\n",
1238 GNUNET_DHT_put (dht_handle,
1245 GNUNET_TIME_UNIT_FOREVER_REL,
1246 &dht_put_continuation,
1252 * Task that is run periodically to obtain blocks for content
1256 * @param tc scheduler context (also unused)
1259 gather_migration_blocks (void *cls,
1260 const struct GNUNET_SCHEDULER_TaskContext *tc)
1262 mig_task = GNUNET_SCHEDULER_NO_TASK;
1265 mig_qe = GNUNET_DATASTORE_get_random (dsh, 0, UINT_MAX,
1266 GNUNET_TIME_UNIT_FOREVER_REL,
1267 &process_migration_content, NULL);
1268 GNUNET_assert (mig_qe != NULL);
1274 * Task that is run periodically to obtain blocks for DHT PUTs.
1276 * @param cls type of blocks to gather
1277 * @param tc scheduler context (unused)
1280 gather_dht_put_blocks (void *cls,
1281 const struct GNUNET_SCHEDULER_TaskContext *tc)
1283 dht_task = GNUNET_SCHEDULER_NO_TASK;
1286 if (dht_put_type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
1287 dht_put_type = GNUNET_BLOCK_TYPE_FS_KBLOCK;
1288 dht_qe = GNUNET_DATASTORE_get_zero_anonymity (dsh, 0, UINT_MAX,
1289 GNUNET_TIME_UNIT_FOREVER_REL,
1291 &process_dht_put_content, NULL);
1292 GNUNET_assert (dht_qe != NULL);
1298 * We're done with a particular message list entry.
1299 * Free all associated resources.
1301 * @param pml entry to destroy
1304 destroy_pending_message_list_entry (struct PendingMessageList *pml)
1306 GNUNET_CONTAINER_DLL_remove (pml->req->pending_head,
1307 pml->req->pending_tail,
1309 GNUNET_CONTAINER_DLL_remove (pml->target->pending_messages_head,
1310 pml->target->pending_messages_tail,
1312 pml->target->pending_requests--;
1313 GNUNET_free (pml->pm);
1319 * Destroy the given pending message (and call the respective
1322 * @param pm message to destroy
1323 * @param tpid id of peer that the message was delivered to, or 0 for none
1326 destroy_pending_message (struct PendingMessage *pm,
1327 GNUNET_PEER_Id tpid)
1329 struct PendingMessageList *pml = pm->pml;
1330 TransmissionContinuation cont;
1335 GNUNET_assert (pml->pm == pm);
1336 GNUNET_assert ( (tpid == 0) || (tpid == pml->target->pid) );
1338 cont_cls = pm->cont_cls;
1339 destroy_pending_message_list_entry (pml);
1346 cont (cont_cls, tpid);
1351 * We're done processing a particular request.
1352 * Free all associated resources.
1354 * @param pr request to destroy
1357 destroy_pending_request (struct PendingRequest *pr)
1359 struct GNUNET_PeerIdentity pid;
1361 if (pr->hnode != NULL)
1363 GNUNET_CONTAINER_heap_remove_node (requests_by_expiration_heap,
1367 if (NULL == pr->client_request_list)
1369 GNUNET_STATISTICS_update (stats,
1370 gettext_noop ("# P2P searches active"),
1376 GNUNET_STATISTICS_update (stats,
1377 gettext_noop ("# client searches active"),
1381 /* might have already been removed from map in 'process_reply' (if
1382 there was a unique reply) or never inserted if it was a
1383 duplicate; hence ignore the return value here */
1384 (void) GNUNET_CONTAINER_multihashmap_remove (query_request_map,
1389 GNUNET_DATASTORE_cancel (pr->qe);
1392 if (pr->dht_get != NULL)
1394 GNUNET_DHT_get_stop (pr->dht_get);
1397 if (pr->client_request_list != NULL)
1399 GNUNET_CONTAINER_DLL_remove (pr->client_request_list->client_list->rl_head,
1400 pr->client_request_list->client_list->rl_tail,
1401 pr->client_request_list);
1402 GNUNET_free (pr->client_request_list);
1403 pr->client_request_list = NULL;
1407 GNUNET_PEER_resolve (pr->cp->pid,
1409 (void) GNUNET_CONTAINER_multihashmap_remove (peer_request_map,
1416 GNUNET_CONTAINER_bloomfilter_free (pr->bf);
1419 if (pr->irc != NULL)
1421 GNUNET_CORE_peer_change_preference_cancel (pr->irc);
1424 if (pr->replies_seen != NULL)
1426 GNUNET_free (pr->replies_seen);
1427 pr->replies_seen = NULL;
1429 if (pr->task != GNUNET_SCHEDULER_NO_TASK)
1431 GNUNET_SCHEDULER_cancel (sched,
1433 pr->task = GNUNET_SCHEDULER_NO_TASK;
1435 while (NULL != pr->pending_head)
1436 destroy_pending_message_list_entry (pr->pending_head);
1437 GNUNET_PEER_change_rc (pr->target_pid, -1);
1438 if (pr->used_pids != NULL)
1440 GNUNET_PEER_decrement_rcs (pr->used_pids, pr->used_pids_off);
1441 GNUNET_free (pr->used_pids);
1442 pr->used_pids_off = 0;
1443 pr->used_pids_size = 0;
1444 pr->used_pids = NULL;
1451 * Method called whenever a given peer connects.
1453 * @param cls closure, not used
1454 * @param peer peer identity this notification is about
1455 * @param latency reported latency of the connection with 'other'
1456 * @param distance reported distance (DV) to 'other'
1459 peer_connect_handler (void *cls,
1461 GNUNET_PeerIdentity * peer,
1462 struct GNUNET_TIME_Relative latency,
1465 struct ConnectedPeer *cp;
1466 struct MigrationReadyBlock *pos;
1470 cp = GNUNET_malloc (sizeof (struct ConnectedPeer));
1471 cp->transmission_delay = GNUNET_LOAD_value_init ();
1472 cp->pid = GNUNET_PEER_intern (peer);
1474 fn = get_trust_filename (peer);
1475 if ((GNUNET_DISK_file_test (fn) == GNUNET_YES) &&
1476 (sizeof (trust) == GNUNET_DISK_fn_read (fn, &trust, sizeof (trust))))
1477 cp->disk_trust = cp->trust = ntohl (trust);
1480 GNUNET_break (GNUNET_OK ==
1481 GNUNET_CONTAINER_multihashmap_put (connected_peers,
1484 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
1489 (void) consider_migration (pos, &peer->hashPubKey, cp);
1496 * Increase the host credit by a value.
1498 * @param host which peer to change the trust value on
1499 * @param value is the int value by which the
1500 * host credit is to be increased or decreased
1501 * @returns the actual change in trust (positive or negative)
1504 change_host_trust (struct ConnectedPeer *host, int value)
1506 unsigned int old_trust;
1510 GNUNET_assert (host != NULL);
1511 old_trust = host->trust;
1514 if (host->trust + value < host->trust)
1516 value = UINT32_MAX - host->trust;
1517 host->trust = UINT32_MAX;
1520 host->trust += value;
1524 if (host->trust < -value)
1526 value = -host->trust;
1530 host->trust += value;
1537 * Write host-trust information to a file - flush the buffer entry!
1540 flush_trust (void *cls,
1541 const GNUNET_HashCode *key,
1544 struct ConnectedPeer *host = value;
1547 struct GNUNET_PeerIdentity pid;
1549 if (host->trust == host->disk_trust)
1550 return GNUNET_OK; /* unchanged */
1551 GNUNET_PEER_resolve (host->pid,
1553 fn = get_trust_filename (&pid);
1554 if (host->trust == 0)
1556 if ((0 != UNLINK (fn)) && (errno != ENOENT))
1557 GNUNET_log_strerror_file (GNUNET_ERROR_TYPE_WARNING |
1558 GNUNET_ERROR_TYPE_BULK, "unlink", fn);
1562 trust = htonl (host->trust);
1563 if (sizeof(uint32_t) == GNUNET_DISK_fn_write (fn, &trust,
1565 GNUNET_DISK_PERM_USER_READ | GNUNET_DISK_PERM_USER_WRITE
1566 | GNUNET_DISK_PERM_GROUP_READ | GNUNET_DISK_PERM_OTHER_READ))
1567 host->disk_trust = host->trust;
1574 * Call this method periodically to scan data/hosts for new hosts.
1577 cron_flush_trust (void *cls,
1578 const struct GNUNET_SCHEDULER_TaskContext *tc)
1581 if (NULL == connected_peers)
1583 GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
1588 if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
1590 GNUNET_SCHEDULER_add_delayed (tc->sched,
1591 TRUST_FLUSH_FREQ, &cron_flush_trust, NULL);
1596 * Free (each) request made by the peer.
1598 * @param cls closure, points to peer that the request belongs to
1599 * @param key current key code
1600 * @param value value in the hash map
1601 * @return GNUNET_YES (we should continue to iterate)
1604 destroy_request (void *cls,
1605 const GNUNET_HashCode * key,
1608 const struct GNUNET_PeerIdentity * peer = cls;
1609 struct PendingRequest *pr = value;
1611 GNUNET_break (GNUNET_YES ==
1612 GNUNET_CONTAINER_multihashmap_remove (peer_request_map,
1615 destroy_pending_request (pr);
1621 * Method called whenever a peer disconnects.
1623 * @param cls closure, not used
1624 * @param peer peer identity this notification is about
1627 peer_disconnect_handler (void *cls,
1629 GNUNET_PeerIdentity * peer)
1631 struct ConnectedPeer *cp;
1632 struct PendingMessage *pm;
1634 struct MigrationReadyBlock *pos;
1635 struct MigrationReadyBlock *next;
1637 GNUNET_CONTAINER_multihashmap_get_multiple (peer_request_map,
1641 cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
1645 for (i=0;i<CS2P_SUCCESS_LIST_SIZE;i++)
1647 if (NULL != cp->last_client_replies[i])
1649 GNUNET_SERVER_client_drop (cp->last_client_replies[i]);
1650 cp->last_client_replies[i] = NULL;
1653 GNUNET_break (GNUNET_YES ==
1654 GNUNET_CONTAINER_multihashmap_remove (connected_peers,
1657 /* remove this peer from migration considerations; schedule
1660 while (NULL != (pos = next))
1663 for (i=0;i<MIGRATION_LIST_SIZE;i++)
1665 if (pos->target_list[i] == cp->pid)
1667 GNUNET_PEER_change_rc (pos->target_list[i], -1);
1668 pos->target_list[i] = 0;
1671 if (pos->used_targets >= GNUNET_CONTAINER_multihashmap_size (connected_peers))
1673 delete_migration_block (pos);
1674 consider_migration_gathering ();
1677 GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
1678 &consider_migration,
1681 GNUNET_PEER_change_rc (cp->pid, -1);
1682 GNUNET_PEER_decrement_rcs (cp->last_p2p_replies, P2P_SUCCESS_LIST_SIZE);
1683 if (NULL != cp->cth)
1684 GNUNET_CORE_notify_transmit_ready_cancel (cp->cth);
1685 while (NULL != (pm = cp->pending_messages_head))
1686 destroy_pending_message (pm, 0 /* delivery failed */);
1687 GNUNET_LOAD_value_free (cp->transmission_delay);
1688 GNUNET_break (0 == cp->pending_requests);
1694 * Iterator over hash map entries that removes all occurences
1695 * of the given 'client' from the 'last_client_replies' of the
1696 * given connected peer.
1698 * @param cls closure, the 'struct GNUNET_SERVER_Client*' to remove
1699 * @param key current key code (unused)
1700 * @param value value in the hash map (the 'struct ConnectedPeer*' to change)
1701 * @return GNUNET_YES (we should continue to iterate)
1704 remove_client_from_last_client_replies (void *cls,
1705 const GNUNET_HashCode * key,
1708 struct GNUNET_SERVER_Client *client = cls;
1709 struct ConnectedPeer *cp = value;
1712 for (i=0;i<CS2P_SUCCESS_LIST_SIZE;i++)
1714 if (cp->last_client_replies[i] == client)
1716 GNUNET_SERVER_client_drop (cp->last_client_replies[i]);
1717 cp->last_client_replies[i] = NULL;
1725 * A client disconnected. Remove all of its pending queries.
1727 * @param cls closure, NULL
1728 * @param client identification of the client
1731 handle_client_disconnect (void *cls,
1732 struct GNUNET_SERVER_Client
1735 struct ClientList *pos;
1736 struct ClientList *prev;
1737 struct ClientRequestList *rcl;
1738 struct ClientResponseMessage *creply;
1744 while ( (NULL != pos) &&
1745 (pos->client != client) )
1751 return; /* no requests pending for this client */
1752 while (NULL != (rcl = pos->rl_head))
1754 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1755 "Destroying pending request `%s' on disconnect\n",
1756 GNUNET_h2s (&rcl->req->query));
1757 destroy_pending_request (rcl->req);
1760 client_list = pos->next;
1762 prev->next = pos->next;
1763 if (pos->th != NULL)
1765 GNUNET_CONNECTION_notify_transmit_ready_cancel (pos->th);
1768 while (NULL != (creply = pos->res_head))
1770 GNUNET_CONTAINER_DLL_remove (pos->res_head,
1773 GNUNET_free (creply);
1775 GNUNET_SERVER_client_drop (pos->client);
1777 GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
1778 &remove_client_from_last_client_replies,
1784 * Iterator to free peer entries.
1786 * @param cls closure, unused
1787 * @param key current key code
1788 * @param value value in the hash map (peer entry)
1789 * @return GNUNET_YES (we should continue to iterate)
1792 clean_peer (void *cls,
1793 const GNUNET_HashCode * key,
1796 peer_disconnect_handler (NULL, (const struct GNUNET_PeerIdentity*) key);
1802 * Task run during shutdown.
1808 shutdown_task (void *cls,
1809 const struct GNUNET_SCHEDULER_TaskContext *tc)
1813 GNUNET_DATASTORE_cancel (mig_qe);
1818 GNUNET_DATASTORE_cancel (dht_qe);
1821 if (GNUNET_SCHEDULER_NO_TASK != mig_task)
1823 GNUNET_SCHEDULER_cancel (sched, mig_task);
1824 mig_task = GNUNET_SCHEDULER_NO_TASK;
1826 if (GNUNET_SCHEDULER_NO_TASK != dht_task)
1828 GNUNET_SCHEDULER_cancel (sched, dht_task);
1829 dht_task = GNUNET_SCHEDULER_NO_TASK;
1831 while (client_list != NULL)
1832 handle_client_disconnect (NULL,
1833 client_list->client);
1834 cron_flush_trust (NULL, NULL);
1835 GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
1838 GNUNET_break (0 == GNUNET_CONTAINER_heap_get_size (requests_by_expiration_heap));
1839 GNUNET_CONTAINER_heap_destroy (requests_by_expiration_heap);
1840 requests_by_expiration_heap = 0;
1841 GNUNET_CONTAINER_multihashmap_destroy (connected_peers);
1842 connected_peers = NULL;
1843 GNUNET_break (0 == GNUNET_CONTAINER_multihashmap_size (query_request_map));
1844 GNUNET_CONTAINER_multihashmap_destroy (query_request_map);
1845 query_request_map = NULL;
1846 GNUNET_break (0 == GNUNET_CONTAINER_multihashmap_size (peer_request_map));
1847 GNUNET_CONTAINER_multihashmap_destroy (peer_request_map);
1848 peer_request_map = NULL;
1849 GNUNET_assert (NULL != core);
1850 GNUNET_CORE_disconnect (core);
1854 GNUNET_STATISTICS_destroy (stats, GNUNET_NO);
1859 GNUNET_DATASTORE_disconnect (dsh,
1863 while (mig_head != NULL)
1864 delete_migration_block (mig_head);
1865 GNUNET_assert (0 == mig_size);
1866 GNUNET_DHT_disconnect (dht_handle);
1868 GNUNET_LOAD_value_free (datastore_get_load);
1869 datastore_get_load = NULL;
1870 GNUNET_LOAD_value_free (datastore_put_load);
1871 datastore_put_load = NULL;
1872 GNUNET_BLOCK_context_destroy (block_ctx);
1874 GNUNET_CONFIGURATION_destroy (block_cfg);
1878 GNUNET_free_non_null (trustDirectory);
1879 trustDirectory = NULL;
1883 /* ******************* Utility functions ******************** */
1887 * Transmit messages by copying it to the target buffer
1888 * "buf". "buf" will be NULL and "size" zero if the socket was closed
1889 * for writing in the meantime. In that case, do nothing
1890 * (the disconnect or shutdown handler will take care of the rest).
1891 * If we were able to transmit messages and there are still more
1892 * pending, ask core again for further calls to this function.
1894 * @param cls closure, pointer to the 'struct ConnectedPeer*'
1895 * @param size number of bytes available in buf
1896 * @param buf where the callee should write the message
1897 * @return number of bytes written to buf
1900 transmit_to_peer (void *cls,
1901 size_t size, void *buf)
1903 struct ConnectedPeer *cp = cls;
1905 struct GNUNET_PeerIdentity pid;
1906 struct PendingMessage *pm;
1907 struct MigrationReadyBlock *mb;
1908 struct MigrationReadyBlock *next;
1909 struct PutMessage migm;
1917 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1918 "Dropping message, core too busy.\n");
1922 GNUNET_LOAD_update (cp->transmission_delay,
1923 GNUNET_TIME_absolute_get_duration (cp->last_transmission_request_start).value);
1925 while ( (NULL != (pm = cp->pending_messages_head) ) &&
1926 (pm->msize <= size) )
1928 memcpy (&cbuf[msize], &pm[1], pm->msize);
1931 destroy_pending_message (pm, cp->pid);
1935 GNUNET_PEER_resolve (cp->pid,
1937 cp->last_transmission_request_start = GNUNET_TIME_absolute_get ();
1938 cp->cth = GNUNET_CORE_notify_transmit_ready (core,
1940 GNUNET_CONSTANTS_SERVICE_TIMEOUT,
1949 while (NULL != (mb = next))
1952 for (i=0;i<MIGRATION_LIST_SIZE;i++)
1954 if ( (cp->pid == mb->target_list[i]) &&
1955 (mb->size + sizeof (migm) <= size) )
1957 GNUNET_PEER_change_rc (mb->target_list[i], -1);
1958 mb->target_list[i] = 0;
1960 memset (&migm, 0, sizeof (migm));
1961 migm.header.size = htons (sizeof (migm) + mb->size);
1962 migm.header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
1963 migm.type = htonl (mb->type);
1964 migm.expiration = GNUNET_TIME_absolute_hton (mb->expiration);
1965 memcpy (&cbuf[msize], &migm, sizeof (migm));
1966 msize += sizeof (migm);
1967 size -= sizeof (migm);
1968 memcpy (&cbuf[msize], &mb[1], mb->size);
1972 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1973 "Pushing migration block `%s' (%u bytes) to `%s'\n",
1974 GNUNET_h2s (&mb->query),
1983 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1984 "Migration block `%s' (%u bytes) is not on migration list for peer `%s'\n",
1985 GNUNET_h2s (&mb->query),
1991 if ( (mb->used_targets >= MIGRATION_TARGET_COUNT) ||
1992 (mb->used_targets >= GNUNET_CONTAINER_multihashmap_size (connected_peers)) )
1994 delete_migration_block (mb);
1995 consider_migration_gathering ();
1998 consider_migration (NULL,
2003 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2004 "Transmitting %u bytes to peer %u\n",
2013 * Add a message to the set of pending messages for the given peer.
2015 * @param cp peer to send message to
2016 * @param pm message to queue
2017 * @param pr request on which behalf this message is being queued
2020 add_to_pending_messages_for_peer (struct ConnectedPeer *cp,
2021 struct PendingMessage *pm,
2022 struct PendingRequest *pr)
2024 struct PendingMessage *pos;
2025 struct PendingMessageList *pml;
2026 struct GNUNET_PeerIdentity pid;
2028 GNUNET_assert (pm->next == NULL);
2029 GNUNET_assert (pm->pml == NULL);
2032 pml = GNUNET_malloc (sizeof (struct PendingMessageList));
2037 GNUNET_CONTAINER_DLL_insert (pr->pending_head,
2041 pos = cp->pending_messages_head;
2042 while ( (pos != NULL) &&
2043 (pm->priority < pos->priority) )
2045 GNUNET_CONTAINER_DLL_insert_after (cp->pending_messages_head,
2046 cp->pending_messages_tail,
2049 cp->pending_requests++;
2050 if (cp->pending_requests > MAX_QUEUE_PER_PEER)
2051 destroy_pending_message (cp->pending_messages_tail, 0);
2052 GNUNET_PEER_resolve (cp->pid, &pid);
2053 if (NULL != cp->cth)
2054 GNUNET_CORE_notify_transmit_ready_cancel (cp->cth);
2055 /* need to schedule transmission */
2056 cp->last_transmission_request_start = GNUNET_TIME_absolute_get ();
2057 cp->cth = GNUNET_CORE_notify_transmit_ready (core,
2058 cp->pending_messages_head->priority,
2061 cp->pending_messages_head->msize,
2064 if (cp->cth == NULL)
2067 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2068 "Failed to schedule transmission with core!\n");
2070 GNUNET_STATISTICS_update (stats,
2071 gettext_noop ("# CORE transmission failures"),
2079 * Test if the load on this peer is too high
2080 * to even consider processing the query at
2083 * @return GNUNET_YES if the load is too high to do anything, GNUNET_NO to forward (load high, but not too high), GNUNET_SYSERR to indirect (load low)
2086 test_load_too_high ()
2088 return GNUNET_SYSERR; // FIXME
2092 /* ******************* Pending Request Refresh Task ******************** */
2097 * We use a random delay to make the timing of requests less
2098 * predictable. This function returns such a random delay. We add a base
2099 * delay of MAX_CORK_DELAY (1s).
2101 * FIXME: make schedule dependent on the specifics of the request?
2102 * Or bandwidth and number of connected peers and load?
2104 * @return random delay to use for some request, between 1s and 1000+TTL_DECREMENT ms
2106 static struct GNUNET_TIME_Relative
2107 get_processing_delay ()
2110 GNUNET_TIME_relative_add (GNUNET_CONSTANTS_MAX_CORK_DELAY,
2111 GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
2112 GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
2118 * We're processing a GET request from another peer and have decided
2119 * to forward it to other peers. This function is called periodically
2120 * and should forward the request to other peers until we have all
2121 * possible replies. If we have transmitted the *only* reply to
2122 * the initiator we should destroy the pending request. If we have
2123 * many replies in the queue to the initiator, we should delay sending
2124 * out more queries until the reply queue has shrunk some.
2126 * @param cls our "struct ProcessGetContext *"
2130 forward_request_task (void *cls,
2131 const struct GNUNET_SCHEDULER_TaskContext *tc);
2135 * Function called after we either failed or succeeded
2136 * at transmitting a query to a peer.
2138 * @param cls the requests "struct PendingRequest*"
2139 * @param tpid ID of receiving peer, 0 on transmission error
2142 transmit_query_continuation (void *cls,
2143 GNUNET_PEER_Id tpid)
2145 struct PendingRequest *pr = cls;
2147 GNUNET_STATISTICS_update (stats,
2148 gettext_noop ("# queries scheduled for forwarding"),
2154 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2155 "Transmission of request failed, will try again later.\n");
2157 if (pr->task == GNUNET_SCHEDULER_NO_TASK)
2158 pr->task = GNUNET_SCHEDULER_add_delayed (sched,
2159 get_processing_delay (),
2160 &forward_request_task,
2164 GNUNET_STATISTICS_update (stats,
2165 gettext_noop ("# queries forwarded"),
2168 GNUNET_PEER_change_rc (tpid, 1);
2169 if (pr->used_pids_off == pr->used_pids_size)
2170 GNUNET_array_grow (pr->used_pids,
2172 pr->used_pids_size * 2 + 2);
2173 pr->used_pids[pr->used_pids_off++] = tpid;
2174 if (pr->task == GNUNET_SCHEDULER_NO_TASK)
2175 pr->task = GNUNET_SCHEDULER_add_delayed (sched,
2176 get_processing_delay (),
2177 &forward_request_task,
2183 * How many bytes should a bloomfilter be if we have already seen
2184 * entry_count responses? Note that BLOOMFILTER_K gives us the number
2185 * of bits set per entry. Furthermore, we should not re-size the
2186 * filter too often (to keep it cheap).
2188 * Since other peers will also add entries but not resize the filter,
2189 * we should generally pick a slightly larger size than what the
2190 * strict math would suggest.
2192 * @return must be a power of two and smaller or equal to 2^15.
2195 compute_bloomfilter_size (unsigned int entry_count)
2198 unsigned int ideal = (entry_count * BLOOMFILTER_K) / 4;
2199 uint16_t max = 1 << 15;
2201 if (entry_count > max)
2204 while ((size < max) && (size < ideal))
2213 * Recalculate our bloom filter for filtering replies. This function
2214 * will create a new bloom filter from scratch, so it should only be
2215 * called if we have no bloomfilter at all (and hence can create a
2216 * fresh one of minimal size without problems) OR if our peer is the
2217 * initiator (in which case we may resize to larger than mimimum size).
2219 * @param pr request for which the BF is to be recomputed
2222 refresh_bloomfilter (struct PendingRequest *pr)
2226 GNUNET_HashCode mhash;
2228 nsize = compute_bloomfilter_size (pr->replies_seen_off);
2229 if (nsize == pr->bf_size)
2230 return; /* size not changed */
2232 GNUNET_CONTAINER_bloomfilter_free (pr->bf);
2233 pr->bf_size = nsize;
2234 pr->mingle = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, -1);
2235 pr->bf = GNUNET_CONTAINER_bloomfilter_init (NULL,
2238 for (i=0;i<pr->replies_seen_off;i++)
2240 GNUNET_BLOCK_mingle_hash (&pr->replies_seen[i],
2243 GNUNET_CONTAINER_bloomfilter_add (pr->bf, &mhash);
2249 * Function called after we've tried to reserve a certain amount of
2250 * bandwidth for a reply. Check if we succeeded and if so send our
2253 * @param cls the requests "struct PendingRequest*"
2254 * @param peer identifies the peer
2255 * @param bpm_in set to the current bandwidth limit (receiving) for this peer
2256 * @param bpm_out set to the current bandwidth limit (sending) for this peer
2257 * @param amount set to the amount that was actually reserved or unreserved
2258 * @param preference current traffic preference for the given peer
2261 target_reservation_cb (void *cls,
2263 GNUNET_PeerIdentity * peer,
2264 struct GNUNET_BANDWIDTH_Value32NBO bpm_in,
2265 struct GNUNET_BANDWIDTH_Value32NBO bpm_out,
2267 uint64_t preference)
2269 struct PendingRequest *pr = cls;
2270 struct ConnectedPeer *cp;
2271 struct PendingMessage *pm;
2272 struct GetMessage *gm;
2273 GNUNET_HashCode *ext;
2283 /* error in communication with core, try again later */
2284 if (pr->task == GNUNET_SCHEDULER_NO_TASK)
2285 pr->task = GNUNET_SCHEDULER_add_delayed (sched,
2286 get_processing_delay (),
2287 &forward_request_task,
2291 // (3) transmit, update ttl/priority
2292 cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
2296 /* Peer must have just left */
2298 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2299 "Selected peer disconnected!\n");
2301 if (pr->task == GNUNET_SCHEDULER_NO_TASK)
2302 pr->task = GNUNET_SCHEDULER_add_delayed (sched,
2303 get_processing_delay (),
2304 &forward_request_task,
2308 no_route = GNUNET_NO;
2314 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2315 "Failed to reserve bandwidth for reply (got %d/%u bytes only)!\n",
2319 GNUNET_STATISTICS_update (stats,
2320 gettext_noop ("# reply bandwidth reservation requests failed"),
2323 if (pr->task == GNUNET_SCHEDULER_NO_TASK)
2324 pr->task = GNUNET_SCHEDULER_add_delayed (sched,
2325 get_processing_delay (),
2326 &forward_request_task,
2328 return; /* this target round failed */
2330 /* FIXME: if we are "quite" busy, we may still want to skip
2331 this round; need more load detection code! */
2332 no_route = GNUNET_YES;
2335 GNUNET_STATISTICS_update (stats,
2336 gettext_noop ("# queries scheduled for forwarding"),
2339 /* build message and insert message into priority queue */
2341 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2342 "Forwarding request `%s' to `%4s'!\n",
2343 GNUNET_h2s (&pr->query),
2348 if (GNUNET_YES == no_route)
2350 bm |= GET_MESSAGE_BIT_RETURN_TO;
2353 if (pr->namespace != NULL)
2355 bm |= GET_MESSAGE_BIT_SKS_NAMESPACE;
2358 if (pr->target_pid != 0)
2360 bm |= GET_MESSAGE_BIT_TRANSMIT_TO;
2363 msize = sizeof (struct GetMessage) + pr->bf_size + k * sizeof(GNUNET_HashCode);
2364 GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE);
2365 pm = GNUNET_malloc (sizeof (struct PendingMessage) + msize);
2367 gm = (struct GetMessage*) &pm[1];
2368 gm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_GET);
2369 gm->header.size = htons (msize);
2370 gm->type = htonl (pr->type);
2371 pr->remaining_priority /= 2;
2372 gm->priority = htonl (pr->remaining_priority);
2373 gm->ttl = htonl (pr->ttl);
2374 gm->filter_mutator = htonl(pr->mingle);
2375 gm->hash_bitmap = htonl (bm);
2376 gm->query = pr->query;
2377 ext = (GNUNET_HashCode*) &gm[1];
2379 if (GNUNET_YES == no_route)
2380 GNUNET_PEER_resolve (pr->cp->pid, (struct GNUNET_PeerIdentity*) &ext[k++]);
2381 if (pr->namespace != NULL)
2382 memcpy (&ext[k++], pr->namespace, sizeof (GNUNET_HashCode));
2383 if (pr->target_pid != 0)
2384 GNUNET_PEER_resolve (pr->target_pid, (struct GNUNET_PeerIdentity*) &ext[k++]);
2385 bfdata = (char *) &ext[k];
2387 GNUNET_CONTAINER_bloomfilter_get_raw_data (pr->bf,
2390 pm->cont = &transmit_query_continuation;
2392 add_to_pending_messages_for_peer (cp, pm, pr);
2397 * Closure used for "target_peer_select_cb".
2399 struct PeerSelectionContext
2402 * The request for which we are selecting
2405 struct PendingRequest *pr;
2408 * Current "prime" target.
2410 struct GNUNET_PeerIdentity target;
2413 * How much do we like this target?
2415 double target_score;
2421 * Function called for each connected peer to determine
2422 * which one(s) would make good targets for forwarding.
2424 * @param cls closure (struct PeerSelectionContext)
2425 * @param key current key code (peer identity)
2426 * @param value value in the hash map (struct ConnectedPeer)
2427 * @return GNUNET_YES if we should continue to
2432 target_peer_select_cb (void *cls,
2433 const GNUNET_HashCode * key,
2436 struct PeerSelectionContext *psc = cls;
2437 struct ConnectedPeer *cp = value;
2438 struct PendingRequest *pr = psc->pr;
2443 /* 1) check that this peer is not the initiator */
2447 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2448 "Skipping initiator in forwarding selection\n");
2450 return GNUNET_YES; /* skip */
2453 /* 2) check if we have already (recently) forwarded to this peer */
2455 for (i=0;i<pr->used_pids_off;i++)
2456 if (pr->used_pids[i] == cp->pid)
2459 if (0 != GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
2460 RETRY_PROBABILITY_INV))
2463 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2464 "NOT re-trying query that was previously transmitted %u times\n",
2465 (unsigned int) pr->used_pids_off);
2467 return GNUNET_YES; /* skip */
2472 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
2473 "Re-trying query that was previously transmitted %u times to this peer\n",
2476 /* 3) calculate how much we'd like to forward to this peer,
2477 starting with a random value that is strong enough
2478 to at least give any peer a chance sometimes
2479 (compared to the other factors that come later) */
2480 /* 3a) count successful (recent) routes from cp for same source */
2483 score = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
2484 P2P_SUCCESS_LIST_SIZE);
2485 for (i=0;i<P2P_SUCCESS_LIST_SIZE;i++)
2486 if (cp->last_p2p_replies[i] == pr->cp->pid)
2487 score += 1; /* likely successful based on hot path */
2491 score = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
2492 CS2P_SUCCESS_LIST_SIZE);
2493 for (i=0;i<CS2P_SUCCESS_LIST_SIZE;i++)
2494 if (cp->last_client_replies[i] == pr->client_request_list->client_list->client)
2495 score += 1; /* likely successful based on hot path */
2497 /* 3b) include latency */
2498 if (cp->avg_delay.value < 4 * TTL_DECREMENT)
2499 score += 1; /* likely fast based on latency */
2500 /* 3c) include priorities */
2501 if (cp->avg_priority <= pr->remaining_priority / 2.0)
2502 score += 1; /* likely successful based on priorities */
2503 /* 3d) penalize for queue size */
2504 score -= (2.0 * cp->pending_requests / (double) MAX_QUEUE_PER_PEER);
2505 /* 3e) include peer proximity */
2506 score -= (2.0 * (GNUNET_CRYPTO_hash_distance_u32 (key,
2507 &pr->query)) / (double) UINT32_MAX);
2508 /* 4) super-bonus for being the known target */
2509 if (pr->target_pid == cp->pid)
2511 /* store best-fit in closure */
2513 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2514 "Peer `%s' gets score %f for forwarding query, max is %f\n",
2519 score++; /* avoid zero */
2520 if (score > psc->target_score)
2522 psc->target_score = score;
2523 psc->target.hashPubKey = *key;
2530 * The priority level imposes a bound on the maximum
2531 * value for the ttl that can be requested.
2533 * @param ttl_in requested ttl
2534 * @param prio given priority
2535 * @return ttl_in if ttl_in is below the limit,
2536 * otherwise the ttl-limit for the given priority
2539 bound_ttl (int32_t ttl_in, uint32_t prio)
2541 unsigned long long allowed;
2545 allowed = ((unsigned long long) prio) * TTL_DECREMENT / 1000;
2546 if (ttl_in > allowed)
2548 if (allowed >= (1 << 30))
2557 * Iterator called on each result obtained for a DHT
2558 * operation that expects a reply
2560 * @param cls closure
2561 * @param exp when will this value expire
2562 * @param key key of the result
2563 * @param get_path NULL-terminated array of pointers
2564 * to the peers on reverse GET path (or NULL if not recorded)
2565 * @param put_path NULL-terminated array of pointers
2566 * to the peers on the PUT path (or NULL if not recorded)
2567 * @param type type of the result
2568 * @param size number of bytes in data
2569 * @param data pointer to the result data
2572 process_dht_reply (void *cls,
2573 struct GNUNET_TIME_Absolute exp,
2574 const GNUNET_HashCode * key,
2575 const struct GNUNET_PeerIdentity * const *get_path,
2576 const struct GNUNET_PeerIdentity * const *put_path,
2577 enum GNUNET_BLOCK_Type type,
2583 * We're processing a GET request and have decided
2584 * to forward it to other peers. This function is called periodically
2585 * and should forward the request to other peers until we have all
2586 * possible replies. If we have transmitted the *only* reply to
2587 * the initiator we should destroy the pending request. If we have
2588 * many replies in the queue to the initiator, we should delay sending
2589 * out more queries until the reply queue has shrunk some.
2591 * @param cls our "struct ProcessGetContext *"
2595 forward_request_task (void *cls,
2596 const struct GNUNET_SCHEDULER_TaskContext *tc)
2598 struct PendingRequest *pr = cls;
2599 struct PeerSelectionContext psc;
2600 struct ConnectedPeer *cp;
2601 struct GNUNET_TIME_Relative delay;
2603 pr->task = GNUNET_SCHEDULER_NO_TASK;
2604 if (pr->irc != NULL)
2607 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2608 "Forwarding of query `%s' not attempted due to pending local lookup!\n",
2609 GNUNET_h2s (&pr->query));
2611 return; /* already pending */
2613 if (GNUNET_YES == pr->local_only)
2614 return; /* configured to not do P2P search */
2616 if ( (0 == pr->anonymity_level) &&
2617 (pr->type != GNUNET_BLOCK_TYPE_FS_DBLOCK) &&
2618 (pr->type != GNUNET_BLOCK_TYPE_FS_IBLOCK) )
2620 pr->dht_get = GNUNET_DHT_get_start (dht_handle,
2621 GNUNET_TIME_UNIT_FOREVER_REL,
2628 (pr->namespace != NULL) ? sizeof (GNUNET_HashCode) : 0,
2632 /* (1) select target */
2634 psc.target_score = -DBL_MAX;
2635 GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
2636 &target_peer_select_cb,
2638 if (psc.target_score == -DBL_MAX)
2640 delay = get_processing_delay ();
2642 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2643 "No peer selected for forwarding of query `%s', will try again in %llu ms!\n",
2644 GNUNET_h2s (&pr->query),
2647 pr->task = GNUNET_SCHEDULER_add_delayed (sched,
2649 &forward_request_task,
2651 return; /* nobody selected */
2653 /* (3) update TTL/priority */
2654 if (pr->client_request_list != NULL)
2656 /* FIXME: use better algorithm!? */
2657 if (0 == GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
2660 /* bound priority we use by priorities we see from other peers
2661 rounded up (must round up so that we can see non-zero
2662 priorities, but round up as little as possible to make it
2663 plausible that we forwarded another peers request) */
2664 if (pr->priority > current_priorities + 1.0)
2665 pr->priority = (uint32_t) current_priorities + 1.0;
2666 pr->ttl = bound_ttl (pr->ttl + TTL_DECREMENT * 2,
2669 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2670 "Trying query `%s' with priority %u and TTL %d.\n",
2671 GNUNET_h2s (&pr->query),
2677 /* (3) reserve reply bandwidth */
2678 cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
2679 &psc.target.hashPubKey);
2680 GNUNET_assert (NULL != cp);
2681 pr->irc = GNUNET_CORE_peer_change_preference (sched, cfg,
2683 GNUNET_CONSTANTS_SERVICE_TIMEOUT,
2684 GNUNET_BANDWIDTH_value_init (UINT32_MAX),
2687 &target_reservation_cb,
2689 cp->inc_preference = 0;
2693 /* **************************** P2P PUT Handling ************************ */
2697 * Function called after we either failed or succeeded
2698 * at transmitting a reply to a peer.
2700 * @param cls the requests "struct PendingRequest*"
2701 * @param tpid ID of receiving peer, 0 on transmission error
2704 transmit_reply_continuation (void *cls,
2705 GNUNET_PEER_Id tpid)
2707 struct PendingRequest *pr = cls;
2711 case GNUNET_BLOCK_TYPE_FS_DBLOCK:
2712 case GNUNET_BLOCK_TYPE_FS_IBLOCK:
2713 /* only one reply expected, done with the request! */
2714 destroy_pending_request (pr);
2716 case GNUNET_BLOCK_TYPE_ANY:
2717 case GNUNET_BLOCK_TYPE_FS_KBLOCK:
2718 case GNUNET_BLOCK_TYPE_FS_SBLOCK:
2728 * Transmit the given message by copying it to the target buffer
2729 * "buf". "buf" will be NULL and "size" zero if the socket was closed
2730 * for writing in the meantime. In that case, do nothing
2731 * (the disconnect or shutdown handler will take care of the rest).
2732 * If we were able to transmit messages and there are still more
2733 * pending, ask core again for further calls to this function.
2735 * @param cls closure, pointer to the 'struct ClientList*'
2736 * @param size number of bytes available in buf
2737 * @param buf where the callee should write the message
2738 * @return number of bytes written to buf
2741 transmit_to_client (void *cls,
2742 size_t size, void *buf)
2744 struct ClientList *cl = cls;
2746 struct ClientResponseMessage *creply;
2753 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2754 "Not sending reply, client communication problem.\n");
2759 while ( (NULL != (creply = cl->res_head) ) &&
2760 (creply->msize <= size) )
2762 memcpy (&cbuf[msize], &creply[1], creply->msize);
2763 msize += creply->msize;
2764 size -= creply->msize;
2765 GNUNET_CONTAINER_DLL_remove (cl->res_head,
2768 GNUNET_free (creply);
2771 cl->th = GNUNET_SERVER_notify_transmit_ready (cl->client,
2773 GNUNET_TIME_UNIT_FOREVER_REL,
2774 &transmit_to_client,
2777 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2778 "Transmitted %u bytes to client\n",
2779 (unsigned int) msize);
2786 * Closure for "process_reply" function.
2788 struct ProcessReplyClosure
2791 * The data for the reply.
2796 * Who gave us this reply? NULL for local host (or DHT)
2798 struct ConnectedPeer *sender;
2801 * When the reply expires.
2803 struct GNUNET_TIME_Absolute expiration;
2811 * Type of the block.
2813 enum GNUNET_BLOCK_Type type;
2816 * How much was this reply worth to us?
2821 * Evaluation result (returned).
2823 enum GNUNET_BLOCK_EvaluationResult eval;
2826 * Did we finish processing the associated request?
2831 * Did we find a matching request?
2838 * We have received a reply; handle it!
2840 * @param cls response (struct ProcessReplyClosure)
2841 * @param key our query
2842 * @param value value in the hash map (info about the query)
2843 * @return GNUNET_YES (we should continue to iterate)
2846 process_reply (void *cls,
2847 const GNUNET_HashCode * key,
2850 struct ProcessReplyClosure *prq = cls;
2851 struct PendingRequest *pr = value;
2852 struct PendingMessage *reply;
2853 struct ClientResponseMessage *creply;
2854 struct ClientList *cl;
2855 struct PutMessage *pm;
2856 struct ConnectedPeer *cp;
2857 struct GNUNET_TIME_Relative cur_delay;
2861 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2862 "Matched result (type %u) for query `%s' with pending request\n",
2863 (unsigned int) prq->type,
2866 GNUNET_STATISTICS_update (stats,
2867 gettext_noop ("# replies received and matched"),
2870 if (prq->sender != NULL)
2872 /* FIXME: should we be more precise here and not use
2873 "start_time" but a peer-specific time stamp? */
2874 cur_delay = GNUNET_TIME_absolute_get_duration (pr->start_time);
2875 prq->sender->avg_delay.value
2876 = (prq->sender->avg_delay.value *
2877 (RUNAVG_DELAY_N - 1) + cur_delay.value) / RUNAVG_DELAY_N;
2878 prq->sender->avg_priority
2879 = (prq->sender->avg_priority *
2880 (RUNAVG_DELAY_N - 1) + pr->priority) / (double) RUNAVG_DELAY_N;
2883 GNUNET_PEER_change_rc (prq->sender->last_p2p_replies
2884 [prq->sender->last_p2p_replies_woff % P2P_SUCCESS_LIST_SIZE],
2886 GNUNET_PEER_change_rc (pr->cp->pid, 1);
2887 prq->sender->last_p2p_replies
2888 [(prq->sender->last_p2p_replies_woff++) % P2P_SUCCESS_LIST_SIZE]
2893 if (NULL != prq->sender->last_client_replies
2894 [(prq->sender->last_client_replies_woff) % CS2P_SUCCESS_LIST_SIZE])
2895 GNUNET_SERVER_client_drop (prq->sender->last_client_replies
2896 [(prq->sender->last_client_replies_woff) % CS2P_SUCCESS_LIST_SIZE]);
2897 prq->sender->last_client_replies
2898 [(prq->sender->last_client_replies_woff++) % CS2P_SUCCESS_LIST_SIZE]
2899 = pr->client_request_list->client_list->client;
2900 GNUNET_SERVER_client_keep (pr->client_request_list->client_list->client);
2903 prq->eval = GNUNET_BLOCK_evaluate (block_ctx,
2908 pr->namespace, (pr->namespace != NULL) ? sizeof (GNUNET_HashCode) : 0,
2913 case GNUNET_BLOCK_EVALUATION_OK_MORE:
2915 case GNUNET_BLOCK_EVALUATION_OK_LAST:
2916 while (NULL != pr->pending_head)
2917 destroy_pending_message_list_entry (pr->pending_head);
2920 if (pr->client_request_list != NULL)
2921 GNUNET_SERVER_receive_done (pr->client_request_list->client_list->client,
2923 GNUNET_DATASTORE_cancel (pr->qe);
2926 pr->do_remove = GNUNET_YES;
2927 if (pr->task != GNUNET_SCHEDULER_NO_TASK)
2929 GNUNET_SCHEDULER_cancel (sched,
2931 pr->task = GNUNET_SCHEDULER_NO_TASK;
2933 GNUNET_break (GNUNET_YES ==
2934 GNUNET_CONTAINER_multihashmap_remove (query_request_map,
2938 case GNUNET_BLOCK_EVALUATION_OK_DUPLICATE:
2939 GNUNET_STATISTICS_update (stats,
2940 gettext_noop ("# duplicate replies discarded (bloomfilter)"),
2944 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2945 "Duplicate response `%s', discarding.\n",
2946 GNUNET_h2s (&mhash));
2948 return GNUNET_YES; /* duplicate */
2949 case GNUNET_BLOCK_EVALUATION_RESULT_INVALID:
2950 return GNUNET_YES; /* wrong namespace */
2951 case GNUNET_BLOCK_EVALUATION_REQUEST_VALID:
2954 case GNUNET_BLOCK_EVALUATION_REQUEST_INVALID:
2957 case GNUNET_BLOCK_EVALUATION_TYPE_NOT_SUPPORTED:
2958 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2959 _("Unsupported block type %u\n"),
2963 if (pr->client_request_list != NULL)
2965 if (pr->replies_seen_size == pr->replies_seen_off)
2966 GNUNET_array_grow (pr->replies_seen,
2967 pr->replies_seen_size,
2968 pr->replies_seen_size * 2 + 4);
2969 GNUNET_CRYPTO_hash (prq->data,
2971 &pr->replies_seen[pr->replies_seen_off++]);
2972 refresh_bloomfilter (pr);
2974 if (NULL == prq->sender)
2977 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2978 "Found result for query `%s' in local datastore\n",
2981 GNUNET_STATISTICS_update (stats,
2982 gettext_noop ("# results found locally"),
2986 prq->priority += pr->remaining_priority;
2987 pr->remaining_priority = 0;
2988 pr->results_found++;
2989 prq->request_found = GNUNET_YES;
2990 if (NULL != pr->client_request_list)
2992 GNUNET_STATISTICS_update (stats,
2993 gettext_noop ("# replies received for local clients"),
2996 cl = pr->client_request_list->client_list;
2997 msize = sizeof (struct PutMessage) + prq->size;
2998 creply = GNUNET_malloc (msize + sizeof (struct ClientResponseMessage));
2999 creply->msize = msize;
3000 creply->client_list = cl;
3001 GNUNET_CONTAINER_DLL_insert_after (cl->res_head,
3005 pm = (struct PutMessage*) &creply[1];
3006 pm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
3007 pm->header.size = htons (msize);
3008 pm->type = htonl (prq->type);
3009 pm->expiration = GNUNET_TIME_absolute_hton (prq->expiration);
3010 memcpy (&pm[1], prq->data, prq->size);
3014 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3015 "Transmitting result for query `%s' to client\n",
3018 cl->th = GNUNET_SERVER_notify_transmit_ready (cl->client,
3020 GNUNET_TIME_UNIT_FOREVER_REL,
3021 &transmit_to_client,
3024 GNUNET_break (cl->th != NULL);
3027 prq->finished = GNUNET_YES;
3028 destroy_pending_request (pr);
3035 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3036 "Transmitting result for query `%s' to other peer (PID=%u)\n",
3038 (unsigned int) cp->pid);
3040 GNUNET_STATISTICS_update (stats,
3041 gettext_noop ("# replies received for other peers"),
3044 msize = sizeof (struct PutMessage) + prq->size;
3045 reply = GNUNET_malloc (msize + sizeof (struct PendingMessage));
3046 reply->cont = &transmit_reply_continuation;
3047 reply->cont_cls = pr;
3048 reply->msize = msize;
3049 reply->priority = UINT32_MAX; /* send replies first! */
3050 pm = (struct PutMessage*) &reply[1];
3051 pm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
3052 pm->header.size = htons (msize);
3053 pm->type = htonl (prq->type);
3054 pm->expiration = GNUNET_TIME_absolute_hton (prq->expiration);
3055 memcpy (&pm[1], prq->data, prq->size);
3056 add_to_pending_messages_for_peer (cp, reply, pr);
3063 * Iterator called on each result obtained for a DHT
3064 * operation that expects a reply
3066 * @param cls closure
3067 * @param exp when will this value expire
3068 * @param key key of the result
3069 * @param get_path NULL-terminated array of pointers
3070 * to the peers on reverse GET path (or NULL if not recorded)
3071 * @param put_path NULL-terminated array of pointers
3072 * to the peers on the PUT path (or NULL if not recorded)
3073 * @param type type of the result
3074 * @param size number of bytes in data
3075 * @param data pointer to the result data
3078 process_dht_reply (void *cls,
3079 struct GNUNET_TIME_Absolute exp,
3080 const GNUNET_HashCode * key,
3081 const struct GNUNET_PeerIdentity * const *get_path,
3082 const struct GNUNET_PeerIdentity * const *put_path,
3083 enum GNUNET_BLOCK_Type type,
3087 struct PendingRequest *pr = cls;
3088 struct ProcessReplyClosure prq;
3090 memset (&prq, 0, sizeof (prq));
3092 prq.expiration = exp;
3095 process_reply (&prq, key, pr);
3101 * Continuation called to notify client about result of the
3104 * @param cls closure
3105 * @param success GNUNET_SYSERR on failure
3106 * @param msg NULL on success, otherwise an error message
3109 put_migration_continuation (void *cls,
3113 struct GNUNET_TIME_Absolute *start = cls;
3114 struct GNUNET_TIME_Relative delay;
3116 delay = GNUNET_TIME_absolute_get_duration (*start);
3117 GNUNET_free (start);
3118 GNUNET_LOAD_update (datastore_put_load,
3120 if (GNUNET_OK == success)
3122 GNUNET_STATISTICS_update (stats,
3123 gettext_noop ("# datastore 'put' failures"),
3130 * Handle P2P "PUT" message.
3132 * @param cls closure, always NULL
3133 * @param other the other peer involved (sender or receiver, NULL
3134 * for loopback messages where we are both sender and receiver)
3135 * @param message the actual message
3136 * @param latency reported latency of the connection with 'other'
3137 * @param distance reported distance (DV) to 'other'
3138 * @return GNUNET_OK to keep the connection open,
3139 * GNUNET_SYSERR to close it (signal serious error)
3142 handle_p2p_put (void *cls,
3143 const struct GNUNET_PeerIdentity *other,
3144 const struct GNUNET_MessageHeader *message,
3145 struct GNUNET_TIME_Relative latency,
3148 const struct PutMessage *put;
3151 enum GNUNET_BLOCK_Type type;
3152 struct GNUNET_TIME_Absolute expiration;
3153 GNUNET_HashCode query;
3154 struct ProcessReplyClosure prq;
3155 struct GNUNET_TIME_Absolute *start;
3156 struct GNUNET_TIME_Relative block_time;
3158 struct ConnectedPeer *cp;
3159 struct PendingMessage *pm;
3160 struct MigrationStopMessage *msm;
3162 msize = ntohs (message->size);
3163 if (msize < sizeof (struct PutMessage))
3166 return GNUNET_SYSERR;
3168 put = (const struct PutMessage*) message;
3169 dsize = msize - sizeof (struct PutMessage);
3170 type = ntohl (put->type);
3171 expiration = GNUNET_TIME_absolute_ntoh (put->expiration);
3173 if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
3174 return GNUNET_SYSERR;
3176 GNUNET_BLOCK_get_key (block_ctx,
3182 GNUNET_break_op (0);
3183 return GNUNET_SYSERR;
3186 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3187 "Received result for query `%s' from peer `%4s'\n",
3188 GNUNET_h2s (&query),
3189 GNUNET_i2s (other));
3191 GNUNET_STATISTICS_update (stats,
3192 gettext_noop ("# replies received (overall)"),
3195 /* now, lookup 'query' */
3196 prq.data = (const void*) &put[1];
3198 prq.sender = GNUNET_CONTAINER_multihashmap_get (connected_peers,
3199 &other->hashPubKey);
3204 prq.expiration = expiration;
3206 prq.finished = GNUNET_NO;
3207 prq.request_found = GNUNET_NO;
3208 GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map,
3212 if (prq.sender != NULL)
3214 prq.sender->inc_preference += CONTENT_BANDWIDTH_VALUE + 1000 * prq.priority;
3215 prq.sender->trust += prq.priority;
3217 if (GNUNET_YES == active_migration)
3220 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3221 "Replicating result for query `%s' with priority %u\n",
3222 GNUNET_h2s (&query),
3225 start = GNUNET_malloc (sizeof (struct GNUNET_TIME_Absolute));
3226 *start = GNUNET_TIME_absolute_get ();
3227 GNUNET_DATASTORE_put (dsh,
3228 0, &query, dsize, &put[1],
3229 type, prq.priority, 1 /* anonymity */,
3231 1 + prq.priority, MAX_DATASTORE_QUEUE,
3232 GNUNET_CONSTANTS_SERVICE_TIMEOUT,
3233 &put_migration_continuation,
3236 putl = GNUNET_LOAD_get_load (datastore_put_load);
3237 if ( (GNUNET_NO == prq.request_found) &&
3238 ( (GNUNET_YES != active_migration) ||
3241 cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
3242 &other->hashPubKey);
3243 if (GNUNET_TIME_absolute_get_duration (cp->last_migration_block).value < 5000)
3244 return GNUNET_OK; /* already blocked */
3245 /* We're too busy; send MigrationStop message! */
3246 if (GNUNET_YES != active_migration)
3247 putl = 1.0 + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 5);
3248 block_time = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
3249 5000 + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
3250 (unsigned int) (60000 * putl * putl)));
3252 cp->last_migration_block = GNUNET_TIME_relative_to_absolute (block_time);
3253 pm = GNUNET_malloc (sizeof (struct PendingMessage) +
3254 sizeof (struct MigrationStopMessage));
3255 pm->msize = sizeof (struct MigrationStopMessage);
3256 pm->priority = UINT32_MAX;
3257 msm = (struct MigrationStopMessage*) &pm[1];
3258 msm->header.size = htons (sizeof (struct MigrationStopMessage));
3259 msm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_MIGRATION_STOP);
3260 msm->duration = GNUNET_TIME_relative_hton (block_time);
3261 add_to_pending_messages_for_peer (cp,
3270 * Handle P2P "MIGRATION_STOP" message.
3272 * @param cls closure, always NULL
3273 * @param other the other peer involved (sender or receiver, NULL
3274 * for loopback messages where we are both sender and receiver)
3275 * @param message the actual message
3276 * @param latency reported latency of the connection with 'other'
3277 * @param distance reported distance (DV) to 'other'
3278 * @return GNUNET_OK to keep the connection open,
3279 * GNUNET_SYSERR to close it (signal serious error)
3282 handle_p2p_migration_stop (void *cls,
3283 const struct GNUNET_PeerIdentity *other,
3284 const struct GNUNET_MessageHeader *message,
3285 struct GNUNET_TIME_Relative latency,
3288 struct ConnectedPeer *cp;
3289 const struct MigrationStopMessage *msm;
3291 msm = (const struct MigrationStopMessage*) message;
3292 cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
3293 &other->hashPubKey);
3299 cp->migration_blocked = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_relative_ntoh (msm->duration));
3305 /* **************************** P2P GET Handling ************************ */
3309 * Closure for 'check_duplicate_request_{peer,client}'.
3311 struct CheckDuplicateRequestClosure
3314 * The new request we should check if it already exists.
3316 const struct PendingRequest *pr;
3319 * Existing request found by the checker, NULL if none.
3321 struct PendingRequest *have;
3326 * Iterator over entries in the 'query_request_map' that
3327 * tries to see if we have the same request pending from
3328 * the same client already.
3330 * @param cls closure (our 'struct CheckDuplicateRequestClosure')
3331 * @param key current key code (query, ignored, must match)
3332 * @param value value in the hash map (a 'struct PendingRequest'
3333 * that already exists)
3334 * @return GNUNET_YES if we should continue to
3335 * iterate (no match yet)
3336 * GNUNET_NO if not (match found).
3339 check_duplicate_request_client (void *cls,
3340 const GNUNET_HashCode * key,
3343 struct CheckDuplicateRequestClosure *cdc = cls;
3344 struct PendingRequest *have = value;
3346 if (have->client_request_list == NULL)
3348 if ( (cdc->pr->client_request_list->client_list->client == have->client_request_list->client_list->client) &&
3359 * We're processing (local) results for a search request
3360 * from another peer. Pass applicable results to the
3361 * peer and if we are done either clean up (operation
3362 * complete) or forward to other peers (more results possible).
3364 * @param cls our closure (struct LocalGetContext)
3365 * @param key key for the content
3366 * @param size number of bytes in data
3367 * @param data content stored
3368 * @param type type of the content
3369 * @param priority priority of the content
3370 * @param anonymity anonymity-level for the content
3371 * @param expiration expiration time for the content
3372 * @param uid unique identifier for the datum;
3373 * maybe 0 if no unique identifier is available
3376 process_local_reply (void *cls,
3377 const GNUNET_HashCode * key,
3380 enum GNUNET_BLOCK_Type type,
3383 struct GNUNET_TIME_Absolute
3387 struct PendingRequest *pr = cls;
3388 struct ProcessReplyClosure prq;
3389 struct CheckDuplicateRequestClosure cdrc;
3390 GNUNET_HashCode query;
3391 unsigned int old_rf;
3396 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3397 "Done processing local replies, forwarding request to other peers.\n");
3400 if (pr->client_request_list != NULL)
3402 GNUNET_SERVER_receive_done (pr->client_request_list->client_list->client,
3404 /* Figure out if this is a duplicate request and possibly
3405 merge 'struct PendingRequest' entries */
3408 GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map,
3410 &check_duplicate_request_client,
3412 if (cdrc.have != NULL)
3415 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3416 "Received request for block `%s' twice from client, will only request once.\n",
3417 GNUNET_h2s (&pr->query));
3420 destroy_pending_request (pr);
3425 /* no more results */
3426 if (pr->task == GNUNET_SCHEDULER_NO_TASK)
3427 pr->task = GNUNET_SCHEDULER_add_now (sched,
3428 &forward_request_task,
3433 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3434 "New local response to `%s' of type %u.\n",
3438 if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
3441 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3442 "Found ONDEMAND block, performing on-demand encoding\n");
3444 GNUNET_STATISTICS_update (stats,
3445 gettext_noop ("# on-demand blocks matched requests"),
3449 GNUNET_FS_handle_on_demand_block (key, size, data, type, priority,
3450 anonymity, expiration, uid,
3451 &process_local_reply,
3455 GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
3459 old_rf = pr->results_found;
3460 memset (&prq, 0, sizeof (prq));
3462 prq.expiration = expiration;
3465 GNUNET_BLOCK_get_key (block_ctx,
3472 GNUNET_DATASTORE_remove (dsh,
3476 GNUNET_TIME_UNIT_FOREVER_REL,
3478 GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
3482 prq.priority = priority;
3483 prq.finished = GNUNET_NO;
3484 prq.request_found = GNUNET_NO;
3485 process_reply (&prq, key, pr);
3486 if ( (old_rf == 0) &&
3487 (pr->results_found == 1) )
3488 update_datastore_delays (pr->start_time);
3489 if (prq.finished == GNUNET_YES)
3492 return; /* done here */
3493 if (prq.eval == GNUNET_BLOCK_EVALUATION_OK_LAST)
3495 GNUNET_DATASTORE_get_next (dsh, GNUNET_NO);
3498 if ( (pr->client_request_list == NULL) &&
3499 ( (GNUNET_YES == test_load_too_high()) ||
3500 (pr->results_found > 5 + 2 * pr->priority) ) )
3503 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3504 "Load too high, done with request\n");
3506 GNUNET_STATISTICS_update (stats,
3507 gettext_noop ("# processing result set cut short due to load"),
3510 GNUNET_DATASTORE_get_next (dsh, GNUNET_NO);
3513 GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
3518 * We've received a request with the specified priority. Bound it
3519 * according to how much we trust the given peer.
3521 * @param prio_in requested priority
3522 * @param cp the peer making the request
3523 * @return effective priority
3526 bound_priority (uint32_t prio_in,
3527 struct ConnectedPeer *cp)
3529 #define N ((double)128.0)
3534 ld = test_load_too_high ();
3535 if (ld == GNUNET_SYSERR)
3536 return 0; /* excess resources */
3537 ret = change_host_trust (cp, prio_in);
3540 if (ret > current_priorities + N)
3541 rret = current_priorities + N;
3545 = (current_priorities * (N-1) + rret)/N;
3553 * Iterator over entries in the 'query_request_map' that
3554 * tries to see if we have the same request pending from
3555 * the same peer already.
3557 * @param cls closure (our 'struct CheckDuplicateRequestClosure')
3558 * @param key current key code (query, ignored, must match)
3559 * @param value value in the hash map (a 'struct PendingRequest'
3560 * that already exists)
3561 * @return GNUNET_YES if we should continue to
3562 * iterate (no match yet)
3563 * GNUNET_NO if not (match found).
3566 check_duplicate_request_peer (void *cls,
3567 const GNUNET_HashCode * key,
3570 struct CheckDuplicateRequestClosure *cdc = cls;
3571 struct PendingRequest *have = value;
3573 if (cdc->pr->target_pid == have->target_pid)
3583 * Handle P2P "GET" request.
3585 * @param cls closure, always NULL
3586 * @param other the other peer involved (sender or receiver, NULL
3587 * for loopback messages where we are both sender and receiver)
3588 * @param message the actual message
3589 * @param latency reported latency of the connection with 'other'
3590 * @param distance reported distance (DV) to 'other'
3591 * @return GNUNET_OK to keep the connection open,
3592 * GNUNET_SYSERR to close it (signal serious error)
3595 handle_p2p_get (void *cls,
3596 const struct GNUNET_PeerIdentity *other,
3597 const struct GNUNET_MessageHeader *message,
3598 struct GNUNET_TIME_Relative latency,
3601 struct PendingRequest *pr;
3602 struct ConnectedPeer *cp;
3603 struct ConnectedPeer *cps;
3604 struct CheckDuplicateRequestClosure cdc;
3605 struct GNUNET_TIME_Relative timeout;
3607 const struct GetMessage *gm;
3609 const GNUNET_HashCode *opt;
3612 uint32_t ttl_decrement;
3613 enum GNUNET_BLOCK_Type type;
3617 msize = ntohs(message->size);
3618 if (msize < sizeof (struct GetMessage))
3620 GNUNET_break_op (0);
3621 return GNUNET_SYSERR;
3623 gm = (const struct GetMessage*) message;
3624 type = ntohl (gm->type);
3625 bm = ntohl (gm->hash_bitmap);
3633 if (msize < sizeof (struct GetMessage) + bits * sizeof (GNUNET_HashCode))
3635 GNUNET_break_op (0);
3636 return GNUNET_SYSERR;
3638 opt = (const GNUNET_HashCode*) &gm[1];
3639 bfsize = msize - sizeof (struct GetMessage) + bits * sizeof (GNUNET_HashCode);
3640 bm = ntohl (gm->hash_bitmap);
3642 cps = GNUNET_CONTAINER_multihashmap_get (connected_peers,
3643 &other->hashPubKey);
3646 /* peer must have just disconnected */
3647 GNUNET_STATISTICS_update (stats,
3648 gettext_noop ("# requests dropped due to initiator not being connected"),
3651 return GNUNET_SYSERR;
3653 if (0 != (bm & GET_MESSAGE_BIT_RETURN_TO))
3654 cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
3661 if (0 != (bm & GET_MESSAGE_BIT_RETURN_TO))
3662 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3663 "Failed to find RETURN-TO peer `%4s' in connection set. Dropping query.\n",
3664 GNUNET_i2s ((const struct GNUNET_PeerIdentity*) &opt[bits-1]));
3667 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3668 "Failed to find peer `%4s' in connection set. Dropping query.\n",
3669 GNUNET_i2s (other));
3671 GNUNET_STATISTICS_update (stats,
3672 gettext_noop ("# requests dropped due to missing reverse route"),
3675 /* FIXME: try connect? */
3678 /* note that we can really only check load here since otherwise
3679 peers could find out that we are overloaded by not being
3680 disconnected after sending us a malformed query... */
3682 /* FIXME: query priority should play
3683 a major role here! */
3684 ld = test_load_too_high ();
3685 if (GNUNET_YES == ld)
3688 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3689 "Dropping query from `%s', this peer is too busy.\n",
3690 GNUNET_i2s (other));
3692 GNUNET_STATISTICS_update (stats,
3693 gettext_noop ("# requests dropped due to high load"),
3698 /* FIXME: if ld == GNUNET_NO, forward
3699 instead of indirecting! */
3702 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3703 "Received request for `%s' of type %u from peer `%4s' with flags %u\n",
3704 GNUNET_h2s (&gm->query),
3705 (unsigned int) type,
3709 have_ns = (0 != (bm & GET_MESSAGE_BIT_SKS_NAMESPACE));
3710 pr = GNUNET_malloc (sizeof (struct PendingRequest) +
3711 (have_ns ? sizeof(GNUNET_HashCode) : 0));
3714 pr->namespace = (GNUNET_HashCode*) &pr[1];
3715 memcpy (&pr[1], &opt[bits++], sizeof (GNUNET_HashCode));
3718 pr->mingle = ntohl (gm->filter_mutator);
3719 if (0 != (bm & GET_MESSAGE_BIT_TRANSMIT_TO))
3720 pr->target_pid = GNUNET_PEER_intern ((const struct GNUNET_PeerIdentity*) &opt[bits++]);
3721 pr->anonymity_level = 1;
3722 pr->priority = bound_priority (ntohl (gm->priority), cps);
3723 pr->ttl = bound_ttl (ntohl (gm->ttl), pr->priority);
3724 pr->query = gm->query;
3725 /* decrement ttl (always) */
3726 ttl_decrement = 2 * TTL_DECREMENT +
3727 GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
3729 if ( (pr->ttl < 0) &&
3730 (((int32_t)(pr->ttl - ttl_decrement)) > 0) )
3733 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3734 "Dropping query from `%s' due to TTL underflow (%d - %u).\n",
3739 GNUNET_STATISTICS_update (stats,
3740 gettext_noop ("# requests dropped due TTL underflow"),
3743 /* integer underflow => drop (should be very rare)! */
3747 pr->ttl -= ttl_decrement;
3748 pr->start_time = GNUNET_TIME_absolute_get ();
3750 /* get bloom filter */
3753 pr->bf = GNUNET_CONTAINER_bloomfilter_init ((const char*) &opt[bits],
3756 pr->bf_size = bfsize;
3761 GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map,
3763 &check_duplicate_request_peer,
3765 if (cdc.have != NULL)
3767 if (cdc.have->start_time.value + cdc.have->ttl >=
3768 pr->start_time.value + pr->ttl)
3770 /* existing request has higher TTL, drop new one! */
3771 cdc.have->priority += pr->priority;
3772 destroy_pending_request (pr);
3774 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3775 "Have existing request with higher TTL, dropping new request.\n",
3776 GNUNET_i2s (other));
3778 GNUNET_STATISTICS_update (stats,
3779 gettext_noop ("# requests dropped due to higher-TTL request"),
3786 /* existing request has lower TTL, drop old one! */
3787 pr->priority += cdc.have->priority;
3788 /* Possible optimization: if we have applicable pending
3789 replies in 'cdc.have', we might want to move those over
3790 (this is a really rare special-case, so it is not clear
3791 that this would be worth it) */
3792 destroy_pending_request (cdc.have);
3793 /* keep processing 'pr'! */
3798 GNUNET_break (GNUNET_OK ==
3799 GNUNET_CONTAINER_multihashmap_put (query_request_map,
3802 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
3803 GNUNET_break (GNUNET_OK ==
3804 GNUNET_CONTAINER_multihashmap_put (peer_request_map,
3807 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
3809 pr->hnode = GNUNET_CONTAINER_heap_insert (requests_by_expiration_heap,
3811 pr->start_time.value + pr->ttl);
3813 GNUNET_STATISTICS_update (stats,
3814 gettext_noop ("# P2P searches received"),
3817 GNUNET_STATISTICS_update (stats,
3818 gettext_noop ("# P2P searches active"),
3822 /* calculate change in traffic preference */
3823 cps->inc_preference += pr->priority * 1000 + QUERY_BANDWIDTH_VALUE;
3824 /* process locally */
3825 if (type == GNUNET_BLOCK_TYPE_FS_DBLOCK)
3826 type = GNUNET_BLOCK_TYPE_ANY; /* to get on-demand as well */
3827 timeout = GNUNET_TIME_relative_multiply (BASIC_DATASTORE_REQUEST_DELAY,
3828 (pr->priority + 1));
3829 pr->qe = GNUNET_DATASTORE_get (dsh,
3833 MAX_DATASTORE_QUEUE,
3835 &process_local_reply,
3838 /* Are multiple results possible? If so, start processing remotely now! */
3841 case GNUNET_BLOCK_TYPE_FS_DBLOCK:
3842 case GNUNET_BLOCK_TYPE_FS_IBLOCK:
3843 /* only one result, wait for datastore */
3846 if (pr->task == GNUNET_SCHEDULER_NO_TASK)
3847 pr->task = GNUNET_SCHEDULER_add_now (sched,
3848 &forward_request_task,
3852 /* make sure we don't track too many requests */
3853 if (GNUNET_CONTAINER_heap_get_size (requests_by_expiration_heap) > max_pending_requests)
3855 pr = GNUNET_CONTAINER_heap_peek (requests_by_expiration_heap);
3856 GNUNET_assert (pr != NULL);
3857 destroy_pending_request (pr);
3863 /* **************************** CS GET Handling ************************ */
3867 * Handle START_SEARCH-message (search request from client).
3869 * @param cls closure
3870 * @param client identification of the client
3871 * @param message the actual message
3874 handle_start_search (void *cls,
3875 struct GNUNET_SERVER_Client *client,
3876 const struct GNUNET_MessageHeader *message)
3878 static GNUNET_HashCode all_zeros;
3879 const struct SearchMessage *sm;
3880 struct ClientList *cl;
3881 struct ClientRequestList *crl;
3882 struct PendingRequest *pr;
3885 enum GNUNET_BLOCK_Type type;
3887 msize = ntohs (message->size);
3888 if ( (msize < sizeof (struct SearchMessage)) ||
3889 (0 != (msize - sizeof (struct SearchMessage)) % sizeof (GNUNET_HashCode)) )
3892 GNUNET_SERVER_receive_done (client,
3896 GNUNET_STATISTICS_update (stats,
3897 gettext_noop ("# client searches received"),
3900 sc = (msize - sizeof (struct SearchMessage)) / sizeof (GNUNET_HashCode);
3901 sm = (const struct SearchMessage*) message;
3902 type = ntohl (sm->type);
3904 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3905 "Received request for `%s' of type %u from local client\n",
3906 GNUNET_h2s (&sm->query),
3907 (unsigned int) type);
3910 while ( (cl != NULL) &&
3911 (cl->client != client) )
3915 cl = GNUNET_malloc (sizeof (struct ClientList));
3916 cl->client = client;
3917 GNUNET_SERVER_client_keep (client);
3918 cl->next = client_list;
3921 /* detect duplicate KBLOCK requests */
3922 if ( (type == GNUNET_BLOCK_TYPE_FS_KBLOCK) ||
3923 (type == GNUNET_BLOCK_TYPE_FS_NBLOCK) ||
3924 (type == GNUNET_BLOCK_TYPE_ANY) )
3927 while ( (crl != NULL) &&
3928 ( (0 != memcmp (&crl->req->query,
3930 sizeof (GNUNET_HashCode))) ||
3931 (crl->req->type != type) ) )
3936 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3937 "Have existing request, merging content-seen lists.\n");
3940 /* Duplicate request (used to send long list of
3941 known/blocked results); merge 'pr->replies_seen'
3942 and update bloom filter */
3943 GNUNET_array_grow (pr->replies_seen,
3944 pr->replies_seen_size,
3945 pr->replies_seen_off + sc);
3946 memcpy (&pr->replies_seen[pr->replies_seen_off],
3948 sc * sizeof (GNUNET_HashCode));
3949 pr->replies_seen_off += sc;
3950 refresh_bloomfilter (pr);
3951 GNUNET_STATISTICS_update (stats,
3952 gettext_noop ("# client searches updated (merged content seen list)"),
3955 GNUNET_SERVER_receive_done (client,
3960 GNUNET_STATISTICS_update (stats,
3961 gettext_noop ("# client searches active"),
3964 pr = GNUNET_malloc (sizeof (struct PendingRequest) +
3965 ((type == GNUNET_BLOCK_TYPE_FS_SBLOCK) ? sizeof(GNUNET_HashCode) : 0));
3966 crl = GNUNET_malloc (sizeof (struct ClientRequestList));
3967 memset (crl, 0, sizeof (struct ClientRequestList));
3968 crl->client_list = cl;
3969 GNUNET_CONTAINER_DLL_insert (cl->rl_head,
3974 pr->client_request_list = crl;
3975 GNUNET_array_grow (pr->replies_seen,
3976 pr->replies_seen_size,
3978 memcpy (pr->replies_seen,
3980 sc * sizeof (GNUNET_HashCode));
3981 pr->replies_seen_off = sc;
3982 pr->anonymity_level = ntohl (sm->anonymity_level);
3983 pr->start_time = GNUNET_TIME_absolute_get ();
3984 refresh_bloomfilter (pr);
3985 pr->query = sm->query;
3986 if (0 == (1 & ntohl (sm->options)))
3987 pr->local_only = GNUNET_NO;
3989 pr->local_only = GNUNET_YES;
3992 case GNUNET_BLOCK_TYPE_FS_DBLOCK:
3993 case GNUNET_BLOCK_TYPE_FS_IBLOCK:
3994 if (0 != memcmp (&sm->target,
3996 sizeof (GNUNET_HashCode)))
3997 pr->target_pid = GNUNET_PEER_intern ((const struct GNUNET_PeerIdentity*) &sm->target);
3999 case GNUNET_BLOCK_TYPE_FS_SBLOCK:
4000 pr->namespace = (GNUNET_HashCode*) &pr[1];
4001 memcpy (&pr[1], &sm->target, sizeof (GNUNET_HashCode));
4006 GNUNET_break (GNUNET_OK ==
4007 GNUNET_CONTAINER_multihashmap_put (query_request_map,
4010 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
4011 if (type == GNUNET_BLOCK_TYPE_FS_DBLOCK)
4012 type = GNUNET_BLOCK_TYPE_ANY; /* get on-demand blocks too! */
4013 pr->qe = GNUNET_DATASTORE_get (dsh,
4017 GNUNET_CONSTANTS_SERVICE_TIMEOUT,
4018 &process_local_reply,
4023 /* **************************** Startup ************************ */
4026 * Process fs requests.
4028 * @param s scheduler to use
4029 * @param server the initialized server
4030 * @param c configuration to use
4033 main_init (struct GNUNET_SCHEDULER_Handle *s,
4034 struct GNUNET_SERVER_Handle *server,
4035 const struct GNUNET_CONFIGURATION_Handle *c)
4037 static const struct GNUNET_CORE_MessageHandler p2p_handlers[] =
4040 GNUNET_MESSAGE_TYPE_FS_GET, 0 },
4042 GNUNET_MESSAGE_TYPE_FS_PUT, 0 },
4043 { &handle_p2p_migration_stop,
4044 GNUNET_MESSAGE_TYPE_FS_MIGRATION_STOP,
4045 sizeof (struct MigrationStopMessage) },
4048 static const struct GNUNET_SERVER_MessageHandler handlers[] = {
4049 {&GNUNET_FS_handle_index_start, NULL,
4050 GNUNET_MESSAGE_TYPE_FS_INDEX_START, 0},
4051 {&GNUNET_FS_handle_index_list_get, NULL,
4052 GNUNET_MESSAGE_TYPE_FS_INDEX_LIST_GET, sizeof(struct GNUNET_MessageHeader) },
4053 {&GNUNET_FS_handle_unindex, NULL, GNUNET_MESSAGE_TYPE_FS_UNINDEX,
4054 sizeof (struct UnindexMessage) },
4055 {&handle_start_search, NULL, GNUNET_MESSAGE_TYPE_FS_START_SEARCH,
4059 unsigned long long enc = 128;
4063 stats = GNUNET_STATISTICS_create (sched, "fs", cfg);
4064 min_migration_delay = GNUNET_TIME_UNIT_SECONDS;
4066 GNUNET_CONFIGURATION_get_value_number (cfg,
4068 "MAX_PENDING_REQUESTS",
4069 &max_pending_requests)) ||
4071 GNUNET_CONFIGURATION_get_value_number (cfg,
4073 "EXPECTED_NEIGHBOUR_COUNT",
4076 GNUNET_CONFIGURATION_get_value_time (cfg,
4078 "MIN_MIGRATION_DELAY",
4079 &min_migration_delay)) )
4081 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
4082 _("Configuration fails to specify certain parameters, assuming default values."));
4084 connected_peers = GNUNET_CONTAINER_multihashmap_create (enc);
4085 query_request_map = GNUNET_CONTAINER_multihashmap_create (max_pending_requests);
4086 peer_request_map = GNUNET_CONTAINER_multihashmap_create (enc);
4087 requests_by_expiration_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
4088 core = GNUNET_CORE_connect (sched,
4090 GNUNET_TIME_UNIT_FOREVER_REL,
4093 &peer_connect_handler,
4094 &peer_disconnect_handler,
4101 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
4102 _("Failed to connect to `%s' service.\n"),
4104 GNUNET_CONTAINER_multihashmap_destroy (connected_peers);
4105 connected_peers = NULL;
4106 GNUNET_CONTAINER_multihashmap_destroy (query_request_map);
4107 query_request_map = NULL;
4108 GNUNET_CONTAINER_heap_destroy (requests_by_expiration_heap);
4109 requests_by_expiration_heap = NULL;
4110 GNUNET_CONTAINER_multihashmap_destroy (peer_request_map);
4111 peer_request_map = NULL;
4114 GNUNET_DATASTORE_disconnect (dsh, GNUNET_NO);
4117 return GNUNET_SYSERR;
4119 /* FIXME: distinguish between sending and storing in options? */
4120 if (active_migration)
4122 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
4123 _("Content migration is enabled, will start to gather data\n"));
4124 consider_migration_gathering ();
4126 consider_dht_put_gathering (NULL);
4127 GNUNET_SERVER_disconnect_notify (server,
4128 &handle_client_disconnect,
4130 GNUNET_assert (GNUNET_OK ==
4131 GNUNET_CONFIGURATION_get_value_filename (cfg,
4135 GNUNET_DISK_directory_create (trustDirectory);
4136 GNUNET_SCHEDULER_add_with_priority (sched,
4137 GNUNET_SCHEDULER_PRIORITY_HIGH,
4138 &cron_flush_trust, NULL);
4141 GNUNET_SERVER_add_handlers (server, handlers);
4142 GNUNET_SCHEDULER_add_delayed (sched,
4143 GNUNET_TIME_UNIT_FOREVER_REL,
4151 * Process fs requests.
4153 * @param cls closure
4154 * @param sched scheduler to use
4155 * @param server the initialized server
4156 * @param cfg configuration to use
4160 struct GNUNET_SCHEDULER_Handle *sched,
4161 struct GNUNET_SERVER_Handle *server,
4162 const struct GNUNET_CONFIGURATION_Handle *cfg)
4164 active_migration = GNUNET_CONFIGURATION_get_value_yesno (cfg,
4167 dsh = GNUNET_DATASTORE_connect (cfg,
4171 GNUNET_SCHEDULER_shutdown (sched);
4174 datastore_get_load = GNUNET_LOAD_value_init ();
4175 datastore_put_load = GNUNET_LOAD_value_init ();
4176 block_cfg = GNUNET_CONFIGURATION_create ();
4177 GNUNET_CONFIGURATION_set_value_string (block_cfg,
4181 block_ctx = GNUNET_BLOCK_context_create (block_cfg);
4182 GNUNET_assert (NULL != block_ctx);
4183 dht_handle = GNUNET_DHT_connect (sched,
4186 if ( (GNUNET_OK != GNUNET_FS_indexing_init (sched, cfg, dsh)) ||
4187 (GNUNET_OK != main_init (sched, server, cfg)) )
4189 GNUNET_SCHEDULER_shutdown (sched);
4190 GNUNET_DATASTORE_disconnect (dsh, GNUNET_NO);
4192 GNUNET_DHT_disconnect (dht_handle);
4194 GNUNET_BLOCK_context_destroy (block_ctx);
4196 GNUNET_CONFIGURATION_destroy (block_cfg);
4198 GNUNET_LOAD_value_free (datastore_get_load);
4199 datastore_get_load = NULL;
4200 GNUNET_LOAD_value_free (datastore_put_load);
4201 datastore_put_load = NULL;
4208 * The main function for the fs service.
4210 * @param argc number of arguments from the command line
4211 * @param argv command line arguments
4212 * @return 0 ok, 1 on error
4215 main (int argc, char *const *argv)
4217 return (GNUNET_OK ==
4218 GNUNET_SERVICE_run (argc,
4221 GNUNET_SERVICE_OPTION_NONE,
4222 &run, NULL)) ? 0 : 1;
4225 /* end of gnunet-service-fs.c */