2 This file is part of GNUnet.
3 (C) 2009, 2010 Christian Grothoff (and other contributing authors)
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your
8 option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA.
22 * @file fs/gnunet-service-fs.c
23 * @brief gnunet anonymity protocol implementation
24 * @author Christian Grothoff
31 #include "gnunet_constants.h"
32 #include "gnunet_core_service.h"
33 #include "gnunet_dht_service.h"
34 #include "gnunet_datastore_service.h"
35 #include "gnunet_load_lib.h"
36 #include "gnunet_peer_lib.h"
37 #include "gnunet_protocols.h"
38 #include "gnunet_signatures.h"
39 #include "gnunet_statistics_service.h"
40 #include "gnunet_util_lib.h"
41 #include "gnunet-service-fs_indexing.h"
44 #define DEBUG_FS GNUNET_NO
47 * Should we introduce random latency in processing? Required for proper
48 * implementation of GAP, but can be disabled for performance evaluation of
49 * the basic routing algorithm.
51 * Note that with delays enabled, performance can be significantly lower
52 * (several orders of magnitude in 2-peer test runs); if you want to
53 * measure throughput of other components, set this to NO. Also, you
54 * might want to consider changing 'RETRY_PROBABILITY_INV' to 1 for
55 * a rather wasteful mode of operation (that might still get the highest
56 * throughput overall).
58 #define SUPPORT_DELAYS GNUNET_NO
61 * Size for the hash map for DHT requests from the FS
62 * service. Should be about the number of concurrent
63 * DHT requests we plan to make.
65 #define FS_DHT_HT_SIZE 1024
68 * How often do we flush trust values to disk?
70 #define TRUST_FLUSH_FREQ GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 5)
73 * How often do we at most PUT content into the DHT?
75 #define MAX_DHT_PUT_FREQ GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5)
78 * Inverse of the probability that we will submit the same query
79 * to the same peer again. If the same peer already got the query
80 * repeatedly recently, the probability is multiplied by the inverse
81 * of this number each time. Note that we only try about every TTL_DECREMENT/2
82 * plus MAX_CORK_DELAY (so roughly every 3.5s).
84 * Note that this factor is a key influence to performance in small
85 * networks (especially test networks of 2 peers) because if there is
86 * only a single peer with the data, this value will determine how
87 * soon we might re-try. For example, a value of 3 can result in
88 * 1.7 MB/s transfer rates for a 10 MB file when a value of 1 would
89 * give us 5 MB/s. OTOH, obviously re-trying the same peer can be
90 * rather inefficient in larger networks, hence picking 1 is in
91 * general not the best choice.
93 #define RETRY_PROBABILITY_INV 1
96 * What is the maximum delay for a P2P FS message (in our interaction
97 * with core)? FS-internal delays are another story. The value is
98 * chosen based on the 32k block size. Given that peers typcially
99 * have at least 1 kb/s bandwidth, 45s waits give us a chance to
100 * transmit one message even to the lowest-bandwidth peers.
102 #define MAX_TRANSMIT_DELAY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 45)
105 * Maximum number of requests (from other peers, overall) that we're
106 * willing to have pending at any given point in time. Can be changed
107 * via the configuration file (32k is just the default).
109 static unsigned long long max_pending_requests = (32 * 1024);
113 * Information we keep for each pending reply. The
114 * actual message follows at the end of this struct.
116 struct PendingMessage;
119 * Function called upon completion of a transmission.
122 * @param pid ID of receiving peer, 0 on transmission error
124 typedef void (*TransmissionContinuation)(void * cls,
125 GNUNET_PEER_Id tpid);
129 * Information we keep for each pending message (GET/PUT). The
130 * actual message follows at the end of this struct.
132 struct PendingMessage
135 * This is a doubly-linked list of messages to the same peer.
137 struct PendingMessage *next;
140 * This is a doubly-linked list of messages to the same peer.
142 struct PendingMessage *prev;
145 * Entry in pending message list for this pending message.
147 struct PendingMessageList *pml;
150 * Function to call immediately once we have transmitted this
153 TransmissionContinuation cont;
161 * Do not transmit this pending message until this deadline.
163 struct GNUNET_TIME_Absolute delay_until;
166 * Size of the reply; actual reply message follows
167 * at the end of this struct.
172 * How important is this message for us?
180 * Information about a peer that we are connected to.
181 * We track data that is useful for determining which
182 * peers should receive our requests. We also keep
183 * a list of messages to transmit to this peer.
189 * List of the last clients for which this peer successfully
192 struct GNUNET_SERVER_Client *last_client_replies[CS2P_SUCCESS_LIST_SIZE];
195 * List of the last PIDs for which
196 * this peer successfully answered a query;
197 * We use 0 to indicate no successful reply.
199 GNUNET_PEER_Id last_p2p_replies[P2P_SUCCESS_LIST_SIZE];
202 * Average delay between sending the peer a request and
203 * getting a reply (only calculated over the requests for
204 * which we actually got a reply). Calculated
205 * as a moving average: new_delay = ((n-1)*last_delay+curr_delay) / n
207 struct GNUNET_TIME_Relative avg_delay;
210 * Point in time until which this peer does not want us to migrate content
213 struct GNUNET_TIME_Absolute migration_blocked;
216 * Time until when we blocked this peer from migrating
219 struct GNUNET_TIME_Absolute last_migration_block;
222 * Transmission times for the last MAX_QUEUE_PER_PEER
223 * requests for this peer. Used as a ring buffer, current
224 * offset is stored in 'last_request_times_off'. If the
225 * oldest entry is more recent than the 'avg_delay', we should
226 * not send any more requests right now.
228 struct GNUNET_TIME_Absolute last_request_times[MAX_QUEUE_PER_PEER];
231 * Handle for an active request for transmission to this
234 struct GNUNET_CORE_TransmitHandle *cth;
237 * Messages (replies, queries, content migration) we would like to
238 * send to this peer in the near future. Sorted by priority, head.
240 struct PendingMessage *pending_messages_head;
243 * Messages (replies, queries, content migration) we would like to
244 * send to this peer in the near future. Sorted by priority, tail.
246 struct PendingMessage *pending_messages_tail;
249 * How long does it typically take for us to transmit a message
250 * to this peer? (delay between the request being issued and
251 * the callback being invoked).
253 struct GNUNET_LOAD_Value *transmission_delay;
256 * Time when the last transmission request was issued.
258 struct GNUNET_TIME_Absolute last_transmission_request_start;
261 * ID of delay task for scheduling transmission.
263 GNUNET_SCHEDULER_TaskIdentifier delayed_transmission_request_task;
266 * Average priority of successful replies. Calculated
267 * as a moving average: new_avg = ((n-1)*last_avg+curr_prio) / n
272 * Increase in traffic preference still to be submitted
273 * to the core service for this peer.
275 uint64_t inc_preference;
278 * Trust rating for this peer
283 * Trust rating for this peer on disk.
288 * The peer's identity.
293 * Size of the linked list of 'pending_messages'.
295 unsigned int pending_requests;
298 * Which offset in "last_p2p_replies" will be updated next?
299 * (we go round-robin).
301 unsigned int last_p2p_replies_woff;
304 * Which offset in "last_client_replies" will be updated next?
305 * (we go round-robin).
307 unsigned int last_client_replies_woff;
310 * Current offset into 'last_request_times' ring buffer.
312 unsigned int last_request_times_off;
318 * Information we keep for each pending request. We should try to
319 * keep this struct as small as possible since its memory consumption
320 * is key to how many requests we can have pending at once.
322 struct PendingRequest;
326 * Doubly-linked list of requests we are performing
327 * on behalf of the same client.
329 struct ClientRequestList
333 * This is a doubly-linked list.
335 struct ClientRequestList *next;
338 * This is a doubly-linked list.
340 struct ClientRequestList *prev;
343 * Request this entry represents.
345 struct PendingRequest *req;
348 * Client list this request belongs to.
350 struct ClientList *client_list;
356 * Replies to be transmitted to the client. The actual
357 * response message is allocated after this struct.
359 struct ClientResponseMessage
362 * This is a doubly-linked list.
364 struct ClientResponseMessage *next;
367 * This is a doubly-linked list.
369 struct ClientResponseMessage *prev;
372 * Client list entry this response belongs to.
374 struct ClientList *client_list;
377 * Number of bytes in the response.
384 * Linked list of clients we are performing requests
390 * This is a linked list.
392 struct ClientList *next;
395 * ID of a client making a request, NULL if this entry is for a
398 struct GNUNET_SERVER_Client *client;
401 * Head of list of requests performed on behalf
402 * of this client right now.
404 struct ClientRequestList *rl_head;
407 * Tail of list of requests performed on behalf
408 * of this client right now.
410 struct ClientRequestList *rl_tail;
413 * Head of linked list of responses.
415 struct ClientResponseMessage *res_head;
418 * Tail of linked list of responses.
420 struct ClientResponseMessage *res_tail;
423 * Context for sending replies.
425 struct GNUNET_CONNECTION_TransmitHandle *th;
431 * Information about a peer that we have forwarded this
432 * request to already.
434 struct UsedTargetEntry
437 * What was the last time we have transmitted this request to this
440 struct GNUNET_TIME_Absolute last_request_time;
443 * How often have we transmitted this request to this peer?
445 unsigned int num_requests;
448 * PID of the target peer.
459 * Doubly-linked list of messages we are performing
460 * due to a pending request.
462 struct PendingMessageList
466 * This is a doubly-linked list of messages on behalf of the same request.
468 struct PendingMessageList *next;
471 * This is a doubly-linked list of messages on behalf of the same request.
473 struct PendingMessageList *prev;
476 * Message this entry represents.
478 struct PendingMessage *pm;
481 * Request this entry belongs to.
483 struct PendingRequest *req;
486 * Peer this message is targeted for.
488 struct ConnectedPeer *target;
494 * Information we keep for each pending request. We should try to
495 * keep this struct as small as possible since its memory consumption
496 * is key to how many requests we can have pending at once.
498 struct PendingRequest
502 * If this request was made by a client, this is our entry in the
503 * client request list; otherwise NULL.
505 struct ClientRequestList *client_request_list;
508 * Entry of peer responsible for this entry (if this request
509 * was made by a peer).
511 struct ConnectedPeer *cp;
514 * If this is a namespace query, pointer to the hash of the public
515 * key of the namespace; otherwise NULL. Pointer will be to the
516 * end of this struct (so no need to free it).
518 const GNUNET_HashCode *namespace;
521 * Bloomfilter we use to filter out replies that we don't care about
522 * (anymore). NULL as long as we are interested in all replies.
524 struct GNUNET_CONTAINER_BloomFilter *bf;
527 * Context of our GNUNET_CORE_peer_change_preference call.
529 struct GNUNET_CORE_InformationRequestContext *irc;
532 * Reference to DHT get operation for this request (or NULL).
534 struct GNUNET_DHT_GetHandle *dht_get;
537 * Hash code of all replies that we have seen so far (only valid
538 * if client is not NULL since we only track replies like this for
541 GNUNET_HashCode *replies_seen;
544 * Node in the heap representing this entry; NULL
545 * if we have no heap node.
547 struct GNUNET_CONTAINER_HeapNode *hnode;
550 * Head of list of messages being performed on behalf of this
553 struct PendingMessageList *pending_head;
556 * Tail of list of messages being performed on behalf of this
559 struct PendingMessageList *pending_tail;
562 * When did we first see this request (form this peer), or, if our
563 * client is initiating, when did we last initiate a search?
565 struct GNUNET_TIME_Absolute start_time;
568 * The query that this request is for.
570 GNUNET_HashCode query;
573 * The task responsible for transmitting queries
576 GNUNET_SCHEDULER_TaskIdentifier task;
579 * (Interned) Peer identifier that identifies a preferred target
582 GNUNET_PEER_Id target_pid;
585 * (Interned) Peer identifiers of peers that have already
586 * received our query for this content.
588 struct UsedTargetEntry *used_targets;
591 * Our entry in the queue (non-NULL while we wait for our
592 * turn to interact with the local database).
594 struct GNUNET_DATASTORE_QueueEntry *qe;
597 * Size of the 'bf' (in bytes).
602 * Desired anonymity level; only valid for requests from a local client.
604 uint32_t anonymity_level;
607 * How many entries in "used_targets" are actually valid?
609 unsigned int used_targets_off;
612 * How long is the "used_targets" array?
614 unsigned int used_targets_size;
617 * Number of results found for this request.
619 unsigned int results_found;
622 * How many entries in "replies_seen" are actually valid?
624 unsigned int replies_seen_off;
627 * How long is the "replies_seen" array?
629 unsigned int replies_seen_size;
632 * Priority with which this request was made. If one of our clients
633 * made the request, then this is the current priority that we are
634 * using when initiating the request. This value is used when
635 * we decide to reward other peers with trust for providing a reply.
640 * Priority points left for us to spend when forwarding this request
643 uint32_t remaining_priority;
646 * Number to mingle hashes for bloom-filter tests with.
651 * TTL with which we saw this request (or, if we initiated, TTL that
652 * we used for the request).
657 * Type of the content that this request is for.
659 enum GNUNET_BLOCK_Type type;
662 * Remove this request after transmission of the current response.
667 * GNUNET_YES if we should not forward this request to other peers.
672 * GNUNET_YES if we should not forward this request to other peers.
680 * Block that is ready for migration to other peers. Actual data is at the end of the block.
682 struct MigrationReadyBlock
686 * This is a doubly-linked list.
688 struct MigrationReadyBlock *next;
691 * This is a doubly-linked list.
693 struct MigrationReadyBlock *prev;
696 * Query for the block.
698 GNUNET_HashCode query;
701 * When does this block expire?
703 struct GNUNET_TIME_Absolute expiration;
706 * Peers we would consider forwarding this
707 * block to. Zero for empty entries.
709 GNUNET_PEER_Id target_list[MIGRATION_LIST_SIZE];
717 * Number of targets already used.
719 unsigned int used_targets;
724 enum GNUNET_BLOCK_Type type;
729 * Our connection to the datastore.
731 static struct GNUNET_DATASTORE_Handle *dsh;
736 static struct GNUNET_BLOCK_Context *block_ctx;
739 * Our block configuration.
741 static struct GNUNET_CONFIGURATION_Handle *block_cfg;
746 static struct GNUNET_SCHEDULER_Handle *sched;
751 static const struct GNUNET_CONFIGURATION_Handle *cfg;
754 * Map of peer identifiers to "struct ConnectedPeer" (for that peer).
756 static struct GNUNET_CONTAINER_MultiHashMap *connected_peers;
759 * Map of peer identifiers to "struct PendingRequest" (for that peer).
761 static struct GNUNET_CONTAINER_MultiHashMap *peer_request_map;
764 * Map of query identifiers to "struct PendingRequest" (for that query).
766 static struct GNUNET_CONTAINER_MultiHashMap *query_request_map;
769 * Heap with the request that will expire next at the top. Contains
770 * pointers of type "struct PendingRequest*"; these will *also* be
771 * aliased from the "requests_by_peer" data structures and the
772 * "requests_by_query" table. Note that requests from our clients
773 * don't expire and are thus NOT in the "requests_by_expiration"
774 * (or the "requests_by_peer" tables).
776 static struct GNUNET_CONTAINER_Heap *requests_by_expiration_heap;
779 * Handle for reporting statistics.
781 static struct GNUNET_STATISTICS_Handle *stats;
784 * Linked list of clients we are currently processing requests for.
786 static struct ClientList *client_list;
789 * Pointer to handle to the core service (points to NULL until we've
792 static struct GNUNET_CORE_Handle *core;
795 * Head of linked list of blocks that can be migrated.
797 static struct MigrationReadyBlock *mig_head;
800 * Tail of linked list of blocks that can be migrated.
802 static struct MigrationReadyBlock *mig_tail;
805 * Request to datastore for migration (or NULL).
807 static struct GNUNET_DATASTORE_QueueEntry *mig_qe;
810 * Request to datastore for DHT PUTs (or NULL).
812 static struct GNUNET_DATASTORE_QueueEntry *dht_qe;
815 * Type we will request for the next DHT PUT round from the datastore.
817 static enum GNUNET_BLOCK_Type dht_put_type = GNUNET_BLOCK_TYPE_FS_KBLOCK;
820 * Where do we store trust information?
822 static char *trustDirectory;
825 * ID of task that collects blocks for migration.
827 static GNUNET_SCHEDULER_TaskIdentifier mig_task;
830 * ID of task that collects blocks for DHT PUTs.
832 static GNUNET_SCHEDULER_TaskIdentifier dht_task;
835 * What is the maximum frequency at which we are allowed to
836 * poll the datastore for migration content?
838 static struct GNUNET_TIME_Relative min_migration_delay;
841 * Handle for DHT operations.
843 static struct GNUNET_DHT_Handle *dht_handle;
846 * Size of the doubly-linked list of migration blocks.
848 static unsigned int mig_size;
851 * Are we allowed to migrate content to this peer.
853 static int active_migration;
856 * How many entires with zero anonymity do we currently estimate
857 * to have in the database?
859 static unsigned int zero_anonymity_count_estimate;
862 * Typical priorities we're seeing from other peers right now. Since
863 * most priorities will be zero, this value is the weighted average of
864 * non-zero priorities seen "recently". In order to ensure that new
865 * values do not dramatically change the ratio, values are first
866 * "capped" to a reasonable range (+N of the current value) and then
867 * averaged into the existing value by a ratio of 1:N. Hence
868 * receiving the largest possible priority can still only raise our
869 * "current_priorities" by at most 1.
871 static double current_priorities;
874 * Datastore 'GET' load tracking.
876 static struct GNUNET_LOAD_Value *datastore_get_load;
879 * Datastore 'PUT' load tracking.
881 static struct GNUNET_LOAD_Value *datastore_put_load;
884 * How long do requests typically stay in the routing table?
886 static struct GNUNET_LOAD_Value *rt_entry_lifetime;
889 * We've just now completed a datastore request. Update our
890 * datastore load calculations.
892 * @param start time when the datastore request was issued
895 update_datastore_delays (struct GNUNET_TIME_Absolute start)
897 struct GNUNET_TIME_Relative delay;
899 delay = GNUNET_TIME_absolute_get_duration (start);
900 GNUNET_LOAD_update (datastore_get_load,
906 * Get the filename under which we would store the GNUNET_HELLO_Message
907 * for the given host and protocol.
908 * @return filename of the form DIRECTORY/HOSTID
911 get_trust_filename (const struct GNUNET_PeerIdentity *id)
913 struct GNUNET_CRYPTO_HashAsciiEncoded fil;
916 GNUNET_CRYPTO_hash_to_enc (&id->hashPubKey, &fil);
917 GNUNET_asprintf (&fn, "%s%s%s", trustDirectory, DIR_SEPARATOR_STR, &fil);
924 * Transmit messages by copying it to the target buffer
925 * "buf". "buf" will be NULL and "size" zero if the socket was closed
926 * for writing in the meantime. In that case, do nothing
927 * (the disconnect or shutdown handler will take care of the rest).
928 * If we were able to transmit messages and there are still more
929 * pending, ask core again for further calls to this function.
931 * @param cls closure, pointer to the 'struct ConnectedPeer*'
932 * @param size number of bytes available in buf
933 * @param buf where the callee should write the message
934 * @return number of bytes written to buf
937 transmit_to_peer (void *cls,
938 size_t size, void *buf);
941 /* ******************* clean up functions ************************ */
944 * Delete the given migration block.
946 * @param mb block to delete
949 delete_migration_block (struct MigrationReadyBlock *mb)
951 GNUNET_CONTAINER_DLL_remove (mig_head,
954 GNUNET_PEER_decrement_rcs (mb->target_list,
955 MIGRATION_LIST_SIZE);
962 * Compare the distance of two peers to a key.
965 * @param p1 first peer
966 * @param p2 second peer
967 * @return GNUNET_YES if P1 is closer to key than P2
970 is_closer (const GNUNET_HashCode *key,
971 const struct GNUNET_PeerIdentity *p1,
972 const struct GNUNET_PeerIdentity *p2)
974 return GNUNET_CRYPTO_hash_xorcmp (&p1->hashPubKey,
981 * Consider migrating content to a given peer.
983 * @param cls 'struct MigrationReadyBlock*' to select
984 * targets for (or NULL for none)
985 * @param key ID of the peer
986 * @param value 'struct ConnectedPeer' of the peer
987 * @return GNUNET_YES (always continue iteration)
990 consider_migration (void *cls,
991 const GNUNET_HashCode *key,
994 struct MigrationReadyBlock *mb = cls;
995 struct ConnectedPeer *cp = value;
996 struct MigrationReadyBlock *pos;
997 struct GNUNET_PeerIdentity cppid;
998 struct GNUNET_PeerIdentity otherpid;
999 struct GNUNET_PeerIdentity worstpid;
1004 /* consider 'cp' as a migration target for mb */
1005 if (GNUNET_TIME_absolute_get_remaining (cp->migration_blocked).value > 0)
1006 return GNUNET_YES; /* peer has requested no migration! */
1009 GNUNET_PEER_resolve (cp->pid,
1011 repl = MIGRATION_LIST_SIZE;
1012 for (i=0;i<MIGRATION_LIST_SIZE;i++)
1014 if (mb->target_list[i] == 0)
1016 mb->target_list[i] = cp->pid;
1017 GNUNET_PEER_change_rc (mb->target_list[i], 1);
1018 repl = MIGRATION_LIST_SIZE;
1021 GNUNET_PEER_resolve (mb->target_list[i],
1023 if ( (repl == MIGRATION_LIST_SIZE) &&
1024 is_closer (&mb->query,
1029 worstpid = otherpid;
1031 else if ( (repl != MIGRATION_LIST_SIZE) &&
1032 (is_closer (&mb->query,
1037 worstpid = otherpid;
1040 if (repl != MIGRATION_LIST_SIZE)
1042 GNUNET_PEER_change_rc (mb->target_list[repl], -1);
1043 mb->target_list[repl] = cp->pid;
1044 GNUNET_PEER_change_rc (mb->target_list[repl], 1);
1048 /* consider scheduling transmission to cp for content migration */
1049 if (cp->cth != NULL)
1055 for (i=0;i<MIGRATION_LIST_SIZE;i++)
1057 if (cp->pid == pos->target_list[i])
1062 msize = GNUNET_MIN (msize,
1070 return GNUNET_YES; /* no content available */
1072 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1073 "Trying to migrate at least %u bytes to peer `%s'\n",
1077 if (cp->delayed_transmission_request_task != GNUNET_SCHEDULER_NO_TASK)
1079 GNUNET_SCHEDULER_cancel (sched, cp->delayed_transmission_request_task);
1080 cp->delayed_transmission_request_task = GNUNET_SCHEDULER_NO_TASK;
1083 = GNUNET_CORE_notify_transmit_ready (core,
1084 0, GNUNET_TIME_UNIT_FOREVER_REL,
1085 (const struct GNUNET_PeerIdentity*) key,
1086 msize + sizeof (struct PutMessage),
1094 * Task that is run periodically to obtain blocks for content
1098 * @param tc scheduler context (also unused)
1101 gather_migration_blocks (void *cls,
1102 const struct GNUNET_SCHEDULER_TaskContext *tc);
1108 * Task that is run periodically to obtain blocks for DHT PUTs.
1110 * @param cls type of blocks to gather
1111 * @param tc scheduler context (unused)
1114 gather_dht_put_blocks (void *cls,
1115 const struct GNUNET_SCHEDULER_TaskContext *tc);
1119 * If the migration task is not currently running, consider
1120 * (re)scheduling it with the appropriate delay.
1123 consider_migration_gathering ()
1125 struct GNUNET_TIME_Relative delay;
1131 if (mig_task != GNUNET_SCHEDULER_NO_TASK)
1133 delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
1135 delay = GNUNET_TIME_relative_divide (delay,
1136 MAX_MIGRATION_QUEUE);
1137 delay = GNUNET_TIME_relative_max (delay,
1138 min_migration_delay);
1139 mig_task = GNUNET_SCHEDULER_add_delayed (sched,
1141 &gather_migration_blocks,
1147 * If the DHT PUT gathering task is not currently running, consider
1148 * (re)scheduling it with the appropriate delay.
1151 consider_dht_put_gathering (void *cls)
1153 struct GNUNET_TIME_Relative delay;
1159 if (dht_task != GNUNET_SCHEDULER_NO_TASK)
1161 if (zero_anonymity_count_estimate > 0)
1163 delay = GNUNET_TIME_relative_divide (GNUNET_DHT_DEFAULT_REPUBLISH_FREQUENCY,
1164 zero_anonymity_count_estimate);
1165 delay = GNUNET_TIME_relative_min (delay,
1170 /* if we have NO zero-anonymity content yet, wait 5 minutes for some to
1171 (hopefully) appear */
1172 delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 5);
1174 dht_task = GNUNET_SCHEDULER_add_delayed (sched,
1176 &gather_dht_put_blocks,
1182 * Process content offered for migration.
1184 * @param cls closure
1185 * @param key key for the content
1186 * @param size number of bytes in data
1187 * @param data content stored
1188 * @param type type of the content
1189 * @param priority priority of the content
1190 * @param anonymity anonymity-level for the content
1191 * @param expiration expiration time for the content
1192 * @param uid unique identifier for the datum;
1193 * maybe 0 if no unique identifier is available
1196 process_migration_content (void *cls,
1197 const GNUNET_HashCode * key,
1200 enum GNUNET_BLOCK_Type type,
1203 struct GNUNET_TIME_Absolute
1204 expiration, uint64_t uid)
1206 struct MigrationReadyBlock *mb;
1211 if (mig_size < MAX_MIGRATION_QUEUE)
1212 consider_migration_gathering ();
1215 if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
1218 GNUNET_FS_handle_on_demand_block (key, size, data,
1219 type, priority, anonymity,
1221 &process_migration_content,
1224 GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
1229 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1230 "Retrieved block `%s' of type %u for migration\n",
1234 mb = GNUNET_malloc (sizeof (struct MigrationReadyBlock) + size);
1236 mb->expiration = expiration;
1239 memcpy (&mb[1], data, size);
1240 GNUNET_CONTAINER_DLL_insert_after (mig_head,
1245 GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
1246 &consider_migration,
1248 GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
1253 * Function called upon completion of the DHT PUT operation.
1256 dht_put_continuation (void *cls,
1257 const struct GNUNET_SCHEDULER_TaskContext *tc)
1259 GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
1264 * Store content in DHT.
1266 * @param cls closure
1267 * @param key key for the content
1268 * @param size number of bytes in data
1269 * @param data content stored
1270 * @param type type of the content
1271 * @param priority priority of the content
1272 * @param anonymity anonymity-level for the content
1273 * @param expiration expiration time for the content
1274 * @param uid unique identifier for the datum;
1275 * maybe 0 if no unique identifier is available
1278 process_dht_put_content (void *cls,
1279 const GNUNET_HashCode * key,
1282 enum GNUNET_BLOCK_Type type,
1285 struct GNUNET_TIME_Absolute
1286 expiration, uint64_t uid)
1288 static unsigned int counter;
1289 static GNUNET_HashCode last_vhash;
1290 static GNUNET_HashCode vhash;
1295 consider_dht_put_gathering (cls);
1298 /* slightly funky code to estimate the total number of values with zero
1299 anonymity from the maximum observed length of a monotonically increasing
1300 sequence of hashes over the contents */
1301 GNUNET_CRYPTO_hash (data, size, &vhash);
1302 if (GNUNET_CRYPTO_hash_cmp (&vhash, &last_vhash) <= 0)
1304 if (zero_anonymity_count_estimate > 0)
1305 zero_anonymity_count_estimate /= 2;
1311 if (zero_anonymity_count_estimate < (1 << counter))
1312 zero_anonymity_count_estimate = (1 << counter);
1314 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1315 "Retrieved block `%s' of type %u for DHT PUT\n",
1319 GNUNET_DHT_put (dht_handle,
1326 GNUNET_TIME_UNIT_FOREVER_REL,
1327 &dht_put_continuation,
1333 * Task that is run periodically to obtain blocks for content
1337 * @param tc scheduler context (also unused)
1340 gather_migration_blocks (void *cls,
1341 const struct GNUNET_SCHEDULER_TaskContext *tc)
1343 mig_task = GNUNET_SCHEDULER_NO_TASK;
1346 mig_qe = GNUNET_DATASTORE_get_random (dsh, 0, UINT_MAX,
1347 GNUNET_TIME_UNIT_FOREVER_REL,
1348 &process_migration_content, NULL);
1349 GNUNET_assert (mig_qe != NULL);
1355 * Task that is run periodically to obtain blocks for DHT PUTs.
1357 * @param cls type of blocks to gather
1358 * @param tc scheduler context (unused)
1361 gather_dht_put_blocks (void *cls,
1362 const struct GNUNET_SCHEDULER_TaskContext *tc)
1364 dht_task = GNUNET_SCHEDULER_NO_TASK;
1367 if (dht_put_type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
1368 dht_put_type = GNUNET_BLOCK_TYPE_FS_KBLOCK;
1369 dht_qe = GNUNET_DATASTORE_get_zero_anonymity (dsh, 0, UINT_MAX,
1370 GNUNET_TIME_UNIT_FOREVER_REL,
1372 &process_dht_put_content, NULL);
1373 GNUNET_assert (dht_qe != NULL);
1379 * We're done with a particular message list entry.
1380 * Free all associated resources.
1382 * @param pml entry to destroy
1385 destroy_pending_message_list_entry (struct PendingMessageList *pml)
1387 GNUNET_CONTAINER_DLL_remove (pml->req->pending_head,
1388 pml->req->pending_tail,
1390 GNUNET_CONTAINER_DLL_remove (pml->target->pending_messages_head,
1391 pml->target->pending_messages_tail,
1393 pml->target->pending_requests--;
1394 GNUNET_free (pml->pm);
1400 * Destroy the given pending message (and call the respective
1403 * @param pm message to destroy
1404 * @param tpid id of peer that the message was delivered to, or 0 for none
1407 destroy_pending_message (struct PendingMessage *pm,
1408 GNUNET_PEER_Id tpid)
1410 struct PendingMessageList *pml = pm->pml;
1411 TransmissionContinuation cont;
1415 cont_cls = pm->cont_cls;
1418 GNUNET_assert (pml->pm == pm);
1419 GNUNET_assert ( (tpid == 0) || (tpid == pml->target->pid) );
1420 destroy_pending_message_list_entry (pml);
1427 cont (cont_cls, tpid);
1432 * We're done processing a particular request.
1433 * Free all associated resources.
1435 * @param pr request to destroy
1438 destroy_pending_request (struct PendingRequest *pr)
1440 struct GNUNET_PeerIdentity pid;
1443 if (pr->hnode != NULL)
1445 GNUNET_CONTAINER_heap_remove_node (requests_by_expiration_heap,
1449 if (NULL == pr->client_request_list)
1451 GNUNET_STATISTICS_update (stats,
1452 gettext_noop ("# P2P searches active"),
1458 GNUNET_STATISTICS_update (stats,
1459 gettext_noop ("# client searches active"),
1464 GNUNET_CONTAINER_multihashmap_remove (query_request_map,
1468 GNUNET_LOAD_update (rt_entry_lifetime,
1469 GNUNET_TIME_absolute_get_duration (pr->start_time).value);
1473 GNUNET_DATASTORE_cancel (pr->qe);
1476 if (pr->dht_get != NULL)
1478 GNUNET_DHT_get_stop (pr->dht_get);
1481 if (pr->client_request_list != NULL)
1483 GNUNET_CONTAINER_DLL_remove (pr->client_request_list->client_list->rl_head,
1484 pr->client_request_list->client_list->rl_tail,
1485 pr->client_request_list);
1486 GNUNET_free (pr->client_request_list);
1487 pr->client_request_list = NULL;
1491 GNUNET_PEER_resolve (pr->cp->pid,
1493 (void) GNUNET_CONTAINER_multihashmap_remove (peer_request_map,
1500 GNUNET_CONTAINER_bloomfilter_free (pr->bf);
1503 if (pr->irc != NULL)
1505 GNUNET_CORE_peer_change_preference_cancel (pr->irc);
1508 if (pr->replies_seen != NULL)
1510 GNUNET_free (pr->replies_seen);
1511 pr->replies_seen = NULL;
1513 if (pr->task != GNUNET_SCHEDULER_NO_TASK)
1515 GNUNET_SCHEDULER_cancel (sched,
1517 pr->task = GNUNET_SCHEDULER_NO_TASK;
1519 while (NULL != pr->pending_head)
1520 destroy_pending_message_list_entry (pr->pending_head);
1521 GNUNET_PEER_change_rc (pr->target_pid, -1);
1522 if (pr->used_targets != NULL)
1524 for (i=0;i<pr->used_targets_off;i++)
1525 GNUNET_PEER_change_rc (pr->used_targets[i].pid, -1);
1526 GNUNET_free (pr->used_targets);
1527 pr->used_targets_off = 0;
1528 pr->used_targets_size = 0;
1529 pr->used_targets = NULL;
1536 * Method called whenever a given peer connects.
1538 * @param cls closure, not used
1539 * @param peer peer identity this notification is about
1540 * @param latency reported latency of the connection with 'other'
1541 * @param distance reported distance (DV) to 'other'
1544 peer_connect_handler (void *cls,
1546 GNUNET_PeerIdentity * peer,
1547 struct GNUNET_TIME_Relative latency,
1550 struct ConnectedPeer *cp;
1551 struct MigrationReadyBlock *pos;
1555 cp = GNUNET_malloc (sizeof (struct ConnectedPeer));
1556 cp->transmission_delay = GNUNET_LOAD_value_init ();
1557 cp->pid = GNUNET_PEER_intern (peer);
1559 fn = get_trust_filename (peer);
1560 if ((GNUNET_DISK_file_test (fn) == GNUNET_YES) &&
1561 (sizeof (trust) == GNUNET_DISK_fn_read (fn, &trust, sizeof (trust))))
1562 cp->disk_trust = cp->trust = ntohl (trust);
1565 GNUNET_break (GNUNET_OK ==
1566 GNUNET_CONTAINER_multihashmap_put (connected_peers,
1569 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
1574 (void) consider_migration (pos, &peer->hashPubKey, cp);
1581 * Increase the host credit by a value.
1583 * @param host which peer to change the trust value on
1584 * @param value is the int value by which the
1585 * host credit is to be increased or decreased
1586 * @returns the actual change in trust (positive or negative)
1589 change_host_trust (struct ConnectedPeer *host, int value)
1591 unsigned int old_trust;
1595 GNUNET_assert (host != NULL);
1596 old_trust = host->trust;
1599 if (host->trust + value < host->trust)
1601 value = UINT32_MAX - host->trust;
1602 host->trust = UINT32_MAX;
1605 host->trust += value;
1609 if (host->trust < -value)
1611 value = -host->trust;
1615 host->trust += value;
1622 * Write host-trust information to a file - flush the buffer entry!
1625 flush_trust (void *cls,
1626 const GNUNET_HashCode *key,
1629 struct ConnectedPeer *host = value;
1632 struct GNUNET_PeerIdentity pid;
1634 if (host->trust == host->disk_trust)
1635 return GNUNET_OK; /* unchanged */
1636 GNUNET_PEER_resolve (host->pid,
1638 fn = get_trust_filename (&pid);
1639 if (host->trust == 0)
1641 if ((0 != UNLINK (fn)) && (errno != ENOENT))
1642 GNUNET_log_strerror_file (GNUNET_ERROR_TYPE_WARNING |
1643 GNUNET_ERROR_TYPE_BULK, "unlink", fn);
1647 trust = htonl (host->trust);
1648 if (sizeof(uint32_t) == GNUNET_DISK_fn_write (fn, &trust,
1650 GNUNET_DISK_PERM_USER_READ | GNUNET_DISK_PERM_USER_WRITE
1651 | GNUNET_DISK_PERM_GROUP_READ | GNUNET_DISK_PERM_OTHER_READ))
1652 host->disk_trust = host->trust;
1659 * Call this method periodically to scan data/hosts for new hosts.
1662 cron_flush_trust (void *cls,
1663 const struct GNUNET_SCHEDULER_TaskContext *tc)
1666 if (NULL == connected_peers)
1668 GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
1673 if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
1675 GNUNET_SCHEDULER_add_delayed (tc->sched,
1676 TRUST_FLUSH_FREQ, &cron_flush_trust, NULL);
1681 * Free (each) request made by the peer.
1683 * @param cls closure, points to peer that the request belongs to
1684 * @param key current key code
1685 * @param value value in the hash map
1686 * @return GNUNET_YES (we should continue to iterate)
1689 destroy_request (void *cls,
1690 const GNUNET_HashCode * key,
1693 const struct GNUNET_PeerIdentity * peer = cls;
1694 struct PendingRequest *pr = value;
1696 GNUNET_break (GNUNET_YES ==
1697 GNUNET_CONTAINER_multihashmap_remove (peer_request_map,
1700 destroy_pending_request (pr);
1706 * Method called whenever a peer disconnects.
1708 * @param cls closure, not used
1709 * @param peer peer identity this notification is about
1712 peer_disconnect_handler (void *cls,
1714 GNUNET_PeerIdentity * peer)
1716 struct ConnectedPeer *cp;
1717 struct PendingMessage *pm;
1719 struct MigrationReadyBlock *pos;
1720 struct MigrationReadyBlock *next;
1722 GNUNET_CONTAINER_multihashmap_get_multiple (peer_request_map,
1726 cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
1730 for (i=0;i<CS2P_SUCCESS_LIST_SIZE;i++)
1732 if (NULL != cp->last_client_replies[i])
1734 GNUNET_SERVER_client_drop (cp->last_client_replies[i]);
1735 cp->last_client_replies[i] = NULL;
1738 GNUNET_break (GNUNET_YES ==
1739 GNUNET_CONTAINER_multihashmap_remove (connected_peers,
1742 /* remove this peer from migration considerations; schedule
1745 while (NULL != (pos = next))
1748 for (i=0;i<MIGRATION_LIST_SIZE;i++)
1750 if (pos->target_list[i] == cp->pid)
1752 GNUNET_PEER_change_rc (pos->target_list[i], -1);
1753 pos->target_list[i] = 0;
1756 if (pos->used_targets >= GNUNET_CONTAINER_multihashmap_size (connected_peers))
1758 delete_migration_block (pos);
1759 consider_migration_gathering ();
1762 GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
1763 &consider_migration,
1766 GNUNET_PEER_change_rc (cp->pid, -1);
1767 GNUNET_PEER_decrement_rcs (cp->last_p2p_replies, P2P_SUCCESS_LIST_SIZE);
1768 if (NULL != cp->cth)
1770 GNUNET_CORE_notify_transmit_ready_cancel (cp->cth);
1773 if (cp->delayed_transmission_request_task != GNUNET_SCHEDULER_NO_TASK)
1775 GNUNET_SCHEDULER_cancel (sched, cp->delayed_transmission_request_task);
1776 cp->delayed_transmission_request_task = GNUNET_SCHEDULER_NO_TASK;
1778 while (NULL != (pm = cp->pending_messages_head))
1779 destroy_pending_message (pm, 0 /* delivery failed */);
1780 GNUNET_LOAD_value_free (cp->transmission_delay);
1781 GNUNET_break (0 == cp->pending_requests);
1787 * Iterator over hash map entries that removes all occurences
1788 * of the given 'client' from the 'last_client_replies' of the
1789 * given connected peer.
1791 * @param cls closure, the 'struct GNUNET_SERVER_Client*' to remove
1792 * @param key current key code (unused)
1793 * @param value value in the hash map (the 'struct ConnectedPeer*' to change)
1794 * @return GNUNET_YES (we should continue to iterate)
1797 remove_client_from_last_client_replies (void *cls,
1798 const GNUNET_HashCode * key,
1801 struct GNUNET_SERVER_Client *client = cls;
1802 struct ConnectedPeer *cp = value;
1805 for (i=0;i<CS2P_SUCCESS_LIST_SIZE;i++)
1807 if (cp->last_client_replies[i] == client)
1809 GNUNET_SERVER_client_drop (cp->last_client_replies[i]);
1810 cp->last_client_replies[i] = NULL;
1818 * A client disconnected. Remove all of its pending queries.
1820 * @param cls closure, NULL
1821 * @param client identification of the client
1824 handle_client_disconnect (void *cls,
1825 struct GNUNET_SERVER_Client
1828 struct ClientList *pos;
1829 struct ClientList *prev;
1830 struct ClientRequestList *rcl;
1831 struct ClientResponseMessage *creply;
1837 while ( (NULL != pos) &&
1838 (pos->client != client) )
1844 return; /* no requests pending for this client */
1845 while (NULL != (rcl = pos->rl_head))
1847 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1848 "Destroying pending request `%s' on disconnect\n",
1849 GNUNET_h2s (&rcl->req->query));
1850 destroy_pending_request (rcl->req);
1853 client_list = pos->next;
1855 prev->next = pos->next;
1856 if (pos->th != NULL)
1858 GNUNET_CONNECTION_notify_transmit_ready_cancel (pos->th);
1861 while (NULL != (creply = pos->res_head))
1863 GNUNET_CONTAINER_DLL_remove (pos->res_head,
1866 GNUNET_free (creply);
1868 GNUNET_SERVER_client_drop (pos->client);
1870 GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
1871 &remove_client_from_last_client_replies,
1877 * Iterator to free peer entries.
1879 * @param cls closure, unused
1880 * @param key current key code
1881 * @param value value in the hash map (peer entry)
1882 * @return GNUNET_YES (we should continue to iterate)
1885 clean_peer (void *cls,
1886 const GNUNET_HashCode * key,
1889 peer_disconnect_handler (NULL, (const struct GNUNET_PeerIdentity*) key);
1895 * Task run during shutdown.
1901 shutdown_task (void *cls,
1902 const struct GNUNET_SCHEDULER_TaskContext *tc)
1906 GNUNET_DATASTORE_cancel (mig_qe);
1911 GNUNET_DATASTORE_cancel (dht_qe);
1914 if (GNUNET_SCHEDULER_NO_TASK != mig_task)
1916 GNUNET_SCHEDULER_cancel (sched, mig_task);
1917 mig_task = GNUNET_SCHEDULER_NO_TASK;
1919 if (GNUNET_SCHEDULER_NO_TASK != dht_task)
1921 GNUNET_SCHEDULER_cancel (sched, dht_task);
1922 dht_task = GNUNET_SCHEDULER_NO_TASK;
1924 while (client_list != NULL)
1925 handle_client_disconnect (NULL,
1926 client_list->client);
1927 cron_flush_trust (NULL, NULL);
1928 GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
1931 GNUNET_break (0 == GNUNET_CONTAINER_heap_get_size (requests_by_expiration_heap));
1932 GNUNET_CONTAINER_heap_destroy (requests_by_expiration_heap);
1933 requests_by_expiration_heap = 0;
1934 GNUNET_CONTAINER_multihashmap_destroy (connected_peers);
1935 connected_peers = NULL;
1936 GNUNET_break (0 == GNUNET_CONTAINER_multihashmap_size (query_request_map));
1937 GNUNET_CONTAINER_multihashmap_destroy (query_request_map);
1938 query_request_map = NULL;
1939 GNUNET_LOAD_value_free (rt_entry_lifetime);
1940 rt_entry_lifetime = NULL;
1941 GNUNET_break (0 == GNUNET_CONTAINER_multihashmap_size (peer_request_map));
1942 GNUNET_CONTAINER_multihashmap_destroy (peer_request_map);
1943 peer_request_map = NULL;
1944 GNUNET_assert (NULL != core);
1945 GNUNET_CORE_disconnect (core);
1949 GNUNET_STATISTICS_destroy (stats, GNUNET_NO);
1954 GNUNET_DATASTORE_disconnect (dsh,
1958 while (mig_head != NULL)
1959 delete_migration_block (mig_head);
1960 GNUNET_assert (0 == mig_size);
1961 GNUNET_DHT_disconnect (dht_handle);
1963 GNUNET_LOAD_value_free (datastore_get_load);
1964 datastore_get_load = NULL;
1965 GNUNET_LOAD_value_free (datastore_put_load);
1966 datastore_put_load = NULL;
1967 GNUNET_BLOCK_context_destroy (block_ctx);
1969 GNUNET_CONFIGURATION_destroy (block_cfg);
1973 GNUNET_free_non_null (trustDirectory);
1974 trustDirectory = NULL;
1978 /* ******************* Utility functions ******************** */
1982 * We've had to delay a request for transmission to core, but now
1983 * we should be ready. Run it.
1985 * @param cls the 'struct ConnectedPeer' for which a request was delayed
1986 * @param tc task context (unused)
1989 delayed_transmission_request (void *cls,
1990 const struct GNUNET_SCHEDULER_TaskContext *tc)
1992 struct ConnectedPeer *cp = cls;
1993 struct GNUNET_PeerIdentity pid;
1994 struct PendingMessage *pm;
1996 pm = cp->pending_messages_head;
1997 cp->delayed_transmission_request_task = GNUNET_SCHEDULER_NO_TASK;
1998 GNUNET_assert (cp->cth == NULL);
2001 GNUNET_PEER_resolve (cp->pid,
2003 cp->last_transmission_request_start = GNUNET_TIME_absolute_get ();
2004 cp->cth = GNUNET_CORE_notify_transmit_ready (core,
2006 GNUNET_CONSTANTS_SERVICE_TIMEOUT,
2015 * Transmit messages by copying it to the target buffer
2016 * "buf". "buf" will be NULL and "size" zero if the socket was closed
2017 * for writing in the meantime. In that case, do nothing
2018 * (the disconnect or shutdown handler will take care of the rest).
2019 * If we were able to transmit messages and there are still more
2020 * pending, ask core again for further calls to this function.
2022 * @param cls closure, pointer to the 'struct ConnectedPeer*'
2023 * @param size number of bytes available in buf
2024 * @param buf where the callee should write the message
2025 * @return number of bytes written to buf
2028 transmit_to_peer (void *cls,
2029 size_t size, void *buf)
2031 struct ConnectedPeer *cp = cls;
2033 struct PendingMessage *pm;
2034 struct PendingMessage *next_pm;
2035 struct GNUNET_TIME_Absolute now;
2036 struct GNUNET_TIME_Relative min_delay;
2037 struct MigrationReadyBlock *mb;
2038 struct MigrationReadyBlock *next;
2039 struct PutMessage migm;
2042 struct GNUNET_PeerIdentity pid;
2048 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2049 "Dropping message, core too busy.\n");
2051 GNUNET_LOAD_update (cp->transmission_delay,
2055 GNUNET_LOAD_update (cp->transmission_delay,
2056 GNUNET_TIME_absolute_get_duration (cp->last_transmission_request_start).value);
2057 now = GNUNET_TIME_absolute_get ();
2059 min_delay = GNUNET_TIME_UNIT_FOREVER_REL;
2060 next_pm = cp->pending_messages_head;
2061 while ( (NULL != (pm = next_pm) ) &&
2062 (pm->msize <= size) )
2065 if (pm->delay_until.value > now.value)
2067 min_delay = GNUNET_TIME_relative_min (min_delay,
2068 GNUNET_TIME_absolute_get_remaining (pm->delay_until));
2071 memcpy (&cbuf[msize], &pm[1], pm->msize);
2074 if (NULL == pm->pml)
2076 GNUNET_CONTAINER_DLL_remove (cp->pending_messages_head,
2077 cp->pending_messages_tail,
2079 cp->pending_requests--;
2081 destroy_pending_message (pm, cp->pid);
2084 min_delay = GNUNET_TIME_UNIT_ZERO;
2085 if (NULL != cp->pending_messages_head)
2087 GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == cp->delayed_transmission_request_task);
2088 cp->delayed_transmission_request_task
2089 = GNUNET_SCHEDULER_add_delayed (sched,
2091 &delayed_transmission_request,
2096 GNUNET_PEER_resolve (cp->pid,
2099 while (NULL != (mb = next))
2102 for (i=0;i<MIGRATION_LIST_SIZE;i++)
2104 if ( (cp->pid == mb->target_list[i]) &&
2105 (mb->size + sizeof (migm) <= size) )
2107 GNUNET_PEER_change_rc (mb->target_list[i], -1);
2108 mb->target_list[i] = 0;
2110 memset (&migm, 0, sizeof (migm));
2111 migm.header.size = htons (sizeof (migm) + mb->size);
2112 migm.header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
2113 migm.type = htonl (mb->type);
2114 migm.expiration = GNUNET_TIME_absolute_hton (mb->expiration);
2115 memcpy (&cbuf[msize], &migm, sizeof (migm));
2116 msize += sizeof (migm);
2117 size -= sizeof (migm);
2118 memcpy (&cbuf[msize], &mb[1], mb->size);
2122 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2123 "Pushing migration block `%s' (%u bytes) to `%s'\n",
2124 GNUNET_h2s (&mb->query),
2125 (unsigned int) mb->size,
2133 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2134 "Migration block `%s' (%u bytes) is not on migration list for peer `%s'\n",
2135 GNUNET_h2s (&mb->query),
2136 (unsigned int) mb->size,
2141 if ( (mb->used_targets >= MIGRATION_TARGET_COUNT) ||
2142 (mb->used_targets >= GNUNET_CONTAINER_multihashmap_size (connected_peers)) )
2144 delete_migration_block (mb);
2145 consider_migration_gathering ();
2148 consider_migration (NULL,
2153 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2154 "Transmitting %u bytes to peer with PID %u\n",
2155 (unsigned int) msize,
2156 (unsigned int) cp->pid);
2163 * Add a message to the set of pending messages for the given peer.
2165 * @param cp peer to send message to
2166 * @param pm message to queue
2167 * @param pr request on which behalf this message is being queued
2170 add_to_pending_messages_for_peer (struct ConnectedPeer *cp,
2171 struct PendingMessage *pm,
2172 struct PendingRequest *pr)
2174 struct PendingMessage *pos;
2175 struct PendingMessageList *pml;
2176 struct GNUNET_PeerIdentity pid;
2178 GNUNET_assert (pm->next == NULL);
2179 GNUNET_assert (pm->pml == NULL);
2182 pml = GNUNET_malloc (sizeof (struct PendingMessageList));
2187 GNUNET_CONTAINER_DLL_insert (pr->pending_head,
2191 pos = cp->pending_messages_head;
2192 while ( (pos != NULL) &&
2193 (pm->priority < pos->priority) )
2195 GNUNET_CONTAINER_DLL_insert_after (cp->pending_messages_head,
2196 cp->pending_messages_tail,
2199 cp->pending_requests++;
2200 if (cp->pending_requests > MAX_QUEUE_PER_PEER)
2202 GNUNET_STATISTICS_update (stats,
2203 gettext_noop ("# P2P searches discarded (queue length bound)"),
2206 destroy_pending_message (cp->pending_messages_tail, 0);
2208 GNUNET_PEER_resolve (cp->pid, &pid);
2209 if (NULL != cp->cth)
2211 GNUNET_CORE_notify_transmit_ready_cancel (cp->cth);
2214 if (cp->delayed_transmission_request_task != GNUNET_SCHEDULER_NO_TASK)
2216 GNUNET_SCHEDULER_cancel (sched, cp->delayed_transmission_request_task);
2217 cp->delayed_transmission_request_task = GNUNET_SCHEDULER_NO_TASK;
2219 /* need to schedule transmission */
2220 cp->last_transmission_request_start = GNUNET_TIME_absolute_get ();
2221 cp->cth = GNUNET_CORE_notify_transmit_ready (core,
2222 cp->pending_messages_head->priority,
2225 cp->pending_messages_head->msize,
2228 if (cp->cth == NULL)
2231 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2232 "Failed to schedule transmission with core!\n");
2234 GNUNET_STATISTICS_update (stats,
2235 gettext_noop ("# CORE transmission failures"),
2243 * Test if the DATABASE (GET) load on this peer is too high
2244 * to even consider processing the query at
2247 * @return GNUNET_YES if the load is too high to do anything (load high)
2248 * GNUNET_NO to process normally (load normal)
2249 * GNUNET_SYSERR to process for free (load low)
2252 test_get_load_too_high (uint32_t priority)
2256 ld = GNUNET_LOAD_get_load (datastore_get_load);
2259 GNUNET_STATISTICS_update (stats,
2260 gettext_noop ("# requests done for free (low load)"),
2263 return GNUNET_SYSERR;
2267 GNUNET_STATISTICS_update (stats,
2268 gettext_noop ("# requests done for a price (normal load)"),
2273 GNUNET_STATISTICS_update (stats,
2274 gettext_noop ("# priority determined to be high"),
2284 * Test if the DATABASE (PUT) load on this peer is too high
2285 * to even consider processing the query at
2288 * @return GNUNET_YES if the load is too high to do anything (load high)
2289 * GNUNET_NO to process normally (load normal or low)
2292 test_put_load_too_high (uint32_t priority)
2296 if (GNUNET_LOAD_get_average (datastore_put_load) < 50)
2297 return GNUNET_NO; /* very fast */
2298 ld = GNUNET_LOAD_get_load (datastore_put_load);
2299 if (ld < 2.0 * (1 + priority))
2301 GNUNET_STATISTICS_update (stats,
2302 gettext_noop ("# storage requests dropped due to high load"),
2309 /* ******************* Pending Request Refresh Task ******************** */
2314 * We use a random delay to make the timing of requests less
2315 * predictable. This function returns such a random delay. We add a base
2316 * delay of MAX_CORK_DELAY (1s).
2318 * FIXME: make schedule dependent on the specifics of the request?
2319 * Or bandwidth and number of connected peers and load?
2321 * @return random delay to use for some request, between 1s and 1000+TTL_DECREMENT ms
2323 static struct GNUNET_TIME_Relative
2324 get_processing_delay ()
2327 GNUNET_TIME_relative_add (GNUNET_CONSTANTS_MAX_CORK_DELAY,
2328 GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
2329 GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
2335 * We're processing a GET request from another peer and have decided
2336 * to forward it to other peers. This function is called periodically
2337 * and should forward the request to other peers until we have all
2338 * possible replies. If we have transmitted the *only* reply to
2339 * the initiator we should destroy the pending request. If we have
2340 * many replies in the queue to the initiator, we should delay sending
2341 * out more queries until the reply queue has shrunk some.
2343 * @param cls our "struct ProcessGetContext *"
2347 forward_request_task (void *cls,
2348 const struct GNUNET_SCHEDULER_TaskContext *tc);
2352 * Function called after we either failed or succeeded
2353 * at transmitting a query to a peer.
2355 * @param cls the requests "struct PendingRequest*"
2356 * @param tpid ID of receiving peer, 0 on transmission error
2359 transmit_query_continuation (void *cls,
2360 GNUNET_PEER_Id tpid)
2362 struct PendingRequest *pr = cls;
2365 GNUNET_STATISTICS_update (stats,
2366 gettext_noop ("# queries scheduled for forwarding"),
2372 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2373 "Transmission of request failed, will try again later.\n");
2375 if (pr->task == GNUNET_SCHEDULER_NO_TASK)
2376 pr->task = GNUNET_SCHEDULER_add_delayed (sched,
2377 get_processing_delay (),
2378 &forward_request_task,
2383 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2384 "Transmitted query `%s'\n",
2385 GNUNET_h2s (&pr->query));
2387 GNUNET_STATISTICS_update (stats,
2388 gettext_noop ("# queries forwarded"),
2391 for (i=0;i<pr->used_targets_off;i++)
2392 if (pr->used_targets[i].pid == tpid)
2393 break; /* found match! */
2394 if (i == pr->used_targets_off)
2396 /* need to create new entry */
2397 if (pr->used_targets_off == pr->used_targets_size)
2398 GNUNET_array_grow (pr->used_targets,
2399 pr->used_targets_size,
2400 pr->used_targets_size * 2 + 2);
2401 GNUNET_PEER_change_rc (tpid, 1);
2402 pr->used_targets[pr->used_targets_off].pid = tpid;
2403 pr->used_targets[pr->used_targets_off].num_requests = 0;
2404 i = pr->used_targets_off++;
2406 pr->used_targets[i].last_request_time = GNUNET_TIME_absolute_get ();
2407 pr->used_targets[i].num_requests++;
2408 if (pr->task == GNUNET_SCHEDULER_NO_TASK)
2409 pr->task = GNUNET_SCHEDULER_add_delayed (sched,
2410 get_processing_delay (),
2411 &forward_request_task,
2417 * How many bytes should a bloomfilter be if we have already seen
2418 * entry_count responses? Note that BLOOMFILTER_K gives us the number
2419 * of bits set per entry. Furthermore, we should not re-size the
2420 * filter too often (to keep it cheap).
2422 * Since other peers will also add entries but not resize the filter,
2423 * we should generally pick a slightly larger size than what the
2424 * strict math would suggest.
2426 * @return must be a power of two and smaller or equal to 2^15.
2429 compute_bloomfilter_size (unsigned int entry_count)
2432 unsigned int ideal = (entry_count * BLOOMFILTER_K) / 4;
2433 uint16_t max = 1 << 15;
2435 if (entry_count > max)
2438 while ((size < max) && (size < ideal))
2447 * Recalculate our bloom filter for filtering replies. This function
2448 * will create a new bloom filter from scratch, so it should only be
2449 * called if we have no bloomfilter at all (and hence can create a
2450 * fresh one of minimal size without problems) OR if our peer is the
2451 * initiator (in which case we may resize to larger than mimimum size).
2453 * @param pr request for which the BF is to be recomputed
2456 refresh_bloomfilter (struct PendingRequest *pr)
2460 GNUNET_HashCode mhash;
2462 nsize = compute_bloomfilter_size (pr->replies_seen_off);
2463 if (nsize == pr->bf_size)
2464 return; /* size not changed */
2466 GNUNET_CONTAINER_bloomfilter_free (pr->bf);
2467 pr->bf_size = nsize;
2468 pr->mingle = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, -1);
2469 pr->bf = GNUNET_CONTAINER_bloomfilter_init (NULL,
2472 for (i=0;i<pr->replies_seen_off;i++)
2474 GNUNET_BLOCK_mingle_hash (&pr->replies_seen[i],
2477 GNUNET_CONTAINER_bloomfilter_add (pr->bf, &mhash);
2483 * Function called after we've tried to reserve a certain amount of
2484 * bandwidth for a reply. Check if we succeeded and if so send our
2487 * @param cls the requests "struct PendingRequest*"
2488 * @param peer identifies the peer
2489 * @param bpm_in set to the current bandwidth limit (receiving) for this peer
2490 * @param bpm_out set to the current bandwidth limit (sending) for this peer
2491 * @param amount set to the amount that was actually reserved or unreserved
2492 * @param preference current traffic preference for the given peer
2495 target_reservation_cb (void *cls,
2497 GNUNET_PeerIdentity * peer,
2498 struct GNUNET_BANDWIDTH_Value32NBO bpm_in,
2499 struct GNUNET_BANDWIDTH_Value32NBO bpm_out,
2501 uint64_t preference)
2503 struct PendingRequest *pr = cls;
2504 struct ConnectedPeer *cp;
2505 struct PendingMessage *pm;
2506 struct GetMessage *gm;
2507 GNUNET_HashCode *ext;
2518 /* error in communication with core, try again later */
2519 if (pr->task == GNUNET_SCHEDULER_NO_TASK)
2520 pr->task = GNUNET_SCHEDULER_add_delayed (sched,
2521 get_processing_delay (),
2522 &forward_request_task,
2526 /* (3) transmit, update ttl/priority */
2527 cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
2531 /* Peer must have just left */
2533 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2534 "Selected peer disconnected!\n");
2536 if (pr->task == GNUNET_SCHEDULER_NO_TASK)
2537 pr->task = GNUNET_SCHEDULER_add_delayed (sched,
2538 get_processing_delay (),
2539 &forward_request_task,
2543 no_route = GNUNET_NO;
2549 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2550 "Failed to reserve bandwidth for reply (got %d/%u bytes only)!\n",
2554 GNUNET_STATISTICS_update (stats,
2555 gettext_noop ("# reply bandwidth reservation requests failed"),
2558 if (pr->task == GNUNET_SCHEDULER_NO_TASK)
2559 pr->task = GNUNET_SCHEDULER_add_delayed (sched,
2560 get_processing_delay (),
2561 &forward_request_task,
2563 return; /* this target round failed */
2565 no_route = GNUNET_YES;
2568 GNUNET_STATISTICS_update (stats,
2569 gettext_noop ("# queries scheduled for forwarding"),
2572 for (i=0;i<pr->used_targets_off;i++)
2573 if (pr->used_targets[i].pid == cp->pid)
2575 GNUNET_STATISTICS_update (stats,
2576 gettext_noop ("# queries retransmitted to same target"),
2582 /* build message and insert message into priority queue */
2584 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2585 "Forwarding request `%s' to `%4s'!\n",
2586 GNUNET_h2s (&pr->query),
2591 if (GNUNET_YES == no_route)
2593 bm |= GET_MESSAGE_BIT_RETURN_TO;
2596 if (pr->namespace != NULL)
2598 bm |= GET_MESSAGE_BIT_SKS_NAMESPACE;
2601 if (pr->target_pid != 0)
2603 bm |= GET_MESSAGE_BIT_TRANSMIT_TO;
2606 msize = sizeof (struct GetMessage) + pr->bf_size + k * sizeof(GNUNET_HashCode);
2607 GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE);
2608 pm = GNUNET_malloc (sizeof (struct PendingMessage) + msize);
2610 gm = (struct GetMessage*) &pm[1];
2611 gm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_GET);
2612 gm->header.size = htons (msize);
2613 gm->type = htonl (pr->type);
2614 pr->remaining_priority /= 2;
2615 gm->priority = htonl (pr->remaining_priority);
2616 gm->ttl = htonl (pr->ttl);
2617 gm->filter_mutator = htonl(pr->mingle);
2618 gm->hash_bitmap = htonl (bm);
2619 gm->query = pr->query;
2620 ext = (GNUNET_HashCode*) &gm[1];
2622 if (GNUNET_YES == no_route)
2623 GNUNET_PEER_resolve (pr->cp->pid, (struct GNUNET_PeerIdentity*) &ext[k++]);
2624 if (pr->namespace != NULL)
2625 memcpy (&ext[k++], pr->namespace, sizeof (GNUNET_HashCode));
2626 if (pr->target_pid != 0)
2627 GNUNET_PEER_resolve (pr->target_pid, (struct GNUNET_PeerIdentity*) &ext[k++]);
2628 bfdata = (char *) &ext[k];
2630 GNUNET_CONTAINER_bloomfilter_get_raw_data (pr->bf,
2633 pm->cont = &transmit_query_continuation;
2635 cp->last_request_times[(cp->last_request_times_off++) % MAX_QUEUE_PER_PEER] = GNUNET_TIME_absolute_get ();
2636 add_to_pending_messages_for_peer (cp, pm, pr);
2641 * Closure used for "target_peer_select_cb".
2643 struct PeerSelectionContext
2646 * The request for which we are selecting
2649 struct PendingRequest *pr;
2652 * Current "prime" target.
2654 struct GNUNET_PeerIdentity target;
2657 * How much do we like this target?
2659 double target_score;
2665 * Function called for each connected peer to determine
2666 * which one(s) would make good targets for forwarding.
2668 * @param cls closure (struct PeerSelectionContext)
2669 * @param key current key code (peer identity)
2670 * @param value value in the hash map (struct ConnectedPeer)
2671 * @return GNUNET_YES if we should continue to
2676 target_peer_select_cb (void *cls,
2677 const GNUNET_HashCode * key,
2680 struct PeerSelectionContext *psc = cls;
2681 struct ConnectedPeer *cp = value;
2682 struct PendingRequest *pr = psc->pr;
2683 struct GNUNET_TIME_Relative delay;
2688 /* 1) check that this peer is not the initiator */
2692 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2693 "Skipping initiator in forwarding selection\n");
2695 return GNUNET_YES; /* skip */
2698 /* 2) check if we have already (recently) forwarded to this peer */
2699 /* 2a) this particular request */
2701 for (i=0;i<pr->used_targets_off;i++)
2702 if (pr->used_targets[i].pid == cp->pid)
2704 pc = pr->used_targets[i].num_requests;
2705 GNUNET_assert (pc > 0);
2706 if (0 != GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
2707 RETRY_PROBABILITY_INV * pc))
2710 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2711 "NOT re-trying query that was previously transmitted %u times\n",
2714 return GNUNET_YES; /* skip */
2721 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
2722 "Re-trying query that was previously transmitted %u times to this peer\n",
2726 /* 2b) many other requests to this peer */
2727 delay = GNUNET_TIME_absolute_get_duration (cp->last_request_times[cp->last_request_times_off % MAX_QUEUE_PER_PEER]);
2728 if (delay.value <= cp->avg_delay.value)
2731 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2732 "NOT sending query since we send %u others to this peer in the last %llums\n",
2734 cp->avg_delay.value);
2736 return GNUNET_YES; /* skip */
2739 /* 3) calculate how much we'd like to forward to this peer,
2740 starting with a random value that is strong enough
2741 to at least give any peer a chance sometimes
2742 (compared to the other factors that come later) */
2743 /* 3a) count successful (recent) routes from cp for same source */
2746 score = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
2747 P2P_SUCCESS_LIST_SIZE);
2748 for (i=0;i<P2P_SUCCESS_LIST_SIZE;i++)
2749 if (cp->last_p2p_replies[i] == pr->cp->pid)
2750 score += 1.0; /* likely successful based on hot path */
2754 score = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
2755 CS2P_SUCCESS_LIST_SIZE);
2756 for (i=0;i<CS2P_SUCCESS_LIST_SIZE;i++)
2757 if (cp->last_client_replies[i] == pr->client_request_list->client_list->client)
2758 score += 1.0; /* likely successful based on hot path */
2760 /* 3b) include latency */
2761 if (cp->avg_delay.value < 4 * TTL_DECREMENT)
2762 score += 1.0; /* likely fast based on latency */
2763 /* 3c) include priorities */
2764 if (cp->avg_priority <= pr->remaining_priority / 2.0)
2765 score += 1.0; /* likely successful based on priorities */
2766 /* 3d) penalize for queue size */
2767 score -= (2.0 * cp->pending_requests / (double) MAX_QUEUE_PER_PEER);
2768 /* 3e) include peer proximity */
2769 score -= (2.0 * (GNUNET_CRYPTO_hash_distance_u32 (key,
2770 &pr->query)) / (double) UINT32_MAX);
2771 /* 4) super-bonus for being the known target */
2772 if (pr->target_pid == cp->pid)
2774 /* store best-fit in closure */
2776 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2777 "Peer `%s' gets score %f for forwarding query, max is %f\n",
2782 score++; /* avoid zero */
2783 if (score > psc->target_score)
2785 psc->target_score = score;
2786 psc->target.hashPubKey = *key;
2793 * The priority level imposes a bound on the maximum
2794 * value for the ttl that can be requested.
2796 * @param ttl_in requested ttl
2797 * @param prio given priority
2798 * @return ttl_in if ttl_in is below the limit,
2799 * otherwise the ttl-limit for the given priority
2802 bound_ttl (int32_t ttl_in, uint32_t prio)
2804 unsigned long long allowed;
2808 allowed = ((unsigned long long) prio) * TTL_DECREMENT / 1000;
2809 if (ttl_in > allowed)
2811 if (allowed >= (1 << 30))
2820 * Iterator called on each result obtained for a DHT
2821 * operation that expects a reply
2823 * @param cls closure
2824 * @param exp when will this value expire
2825 * @param key key of the result
2826 * @param get_path NULL-terminated array of pointers
2827 * to the peers on reverse GET path (or NULL if not recorded)
2828 * @param put_path NULL-terminated array of pointers
2829 * to the peers on the PUT path (or NULL if not recorded)
2830 * @param type type of the result
2831 * @param size number of bytes in data
2832 * @param data pointer to the result data
2835 process_dht_reply (void *cls,
2836 struct GNUNET_TIME_Absolute exp,
2837 const GNUNET_HashCode * key,
2838 const struct GNUNET_PeerIdentity * const *get_path,
2839 const struct GNUNET_PeerIdentity * const *put_path,
2840 enum GNUNET_BLOCK_Type type,
2846 * We're processing a GET request and have decided
2847 * to forward it to other peers. This function is called periodically
2848 * and should forward the request to other peers until we have all
2849 * possible replies. If we have transmitted the *only* reply to
2850 * the initiator we should destroy the pending request. If we have
2851 * many replies in the queue to the initiator, we should delay sending
2852 * out more queries until the reply queue has shrunk some.
2854 * @param cls our "struct ProcessGetContext *"
2858 forward_request_task (void *cls,
2859 const struct GNUNET_SCHEDULER_TaskContext *tc)
2861 struct PendingRequest *pr = cls;
2862 struct PeerSelectionContext psc;
2863 struct ConnectedPeer *cp;
2864 struct GNUNET_TIME_Relative delay;
2866 pr->task = GNUNET_SCHEDULER_NO_TASK;
2867 if (pr->irc != NULL)
2870 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2871 "Forwarding of query `%s' not attempted due to pending local lookup!\n",
2872 GNUNET_h2s (&pr->query));
2874 return; /* already pending */
2876 if (GNUNET_YES == pr->local_only)
2877 return; /* configured to not do P2P search */
2879 if ( (0 == pr->anonymity_level) &&
2880 (GNUNET_YES != pr->forward_only) &&
2881 (pr->type != GNUNET_BLOCK_TYPE_FS_DBLOCK) &&
2882 (pr->type != GNUNET_BLOCK_TYPE_FS_IBLOCK) )
2884 pr->dht_get = GNUNET_DHT_get_start (dht_handle,
2885 GNUNET_TIME_UNIT_FOREVER_REL,
2892 (pr->namespace != NULL) ? sizeof (GNUNET_HashCode) : 0,
2896 /* (1) select target */
2898 psc.target_score = -DBL_MAX;
2899 GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
2900 &target_peer_select_cb,
2902 if (psc.target_score == -DBL_MAX)
2904 delay = get_processing_delay ();
2906 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2907 "No peer selected for forwarding of query `%s', will try again in %llu ms!\n",
2908 GNUNET_h2s (&pr->query),
2911 pr->task = GNUNET_SCHEDULER_add_delayed (sched,
2913 &forward_request_task,
2915 return; /* nobody selected */
2917 /* (3) update TTL/priority */
2918 if (pr->client_request_list != NULL)
2920 /* FIXME: use better algorithm!? */
2921 if (0 == GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
2924 /* bound priority we use by priorities we see from other peers
2925 rounded up (must round up so that we can see non-zero
2926 priorities, but round up as little as possible to make it
2927 plausible that we forwarded another peers request) */
2928 if (pr->priority > current_priorities + 1.0)
2929 pr->priority = (uint32_t) current_priorities + 1.0;
2930 pr->ttl = bound_ttl (pr->ttl + TTL_DECREMENT * 2,
2933 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2934 "Trying query `%s' with priority %u and TTL %d.\n",
2935 GNUNET_h2s (&pr->query),
2941 /* (3) reserve reply bandwidth */
2942 if (GNUNET_NO == pr->forward_only)
2944 cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
2945 &psc.target.hashPubKey);
2946 GNUNET_assert (NULL != cp);
2947 pr->irc = GNUNET_CORE_peer_change_preference (sched, cfg,
2949 GNUNET_CONSTANTS_SERVICE_TIMEOUT,
2950 GNUNET_BANDWIDTH_value_init (UINT32_MAX),
2953 &target_reservation_cb,
2955 cp->inc_preference = 0;
2959 /* force forwarding */
2960 static struct GNUNET_BANDWIDTH_Value32NBO zerobw;
2961 target_reservation_cb (pr, &psc.target,
2962 zerobw, zerobw, 0, 0.0);
2967 /* **************************** P2P PUT Handling ************************ */
2971 * Function called after we either failed or succeeded
2972 * at transmitting a reply to a peer.
2974 * @param cls the requests "struct PendingRequest*"
2975 * @param tpid ID of receiving peer, 0 on transmission error
2978 transmit_reply_continuation (void *cls,
2979 GNUNET_PEER_Id tpid)
2981 struct PendingRequest *pr = cls;
2985 case GNUNET_BLOCK_TYPE_FS_DBLOCK:
2986 case GNUNET_BLOCK_TYPE_FS_IBLOCK:
2987 /* only one reply expected, done with the request! */
2988 destroy_pending_request (pr);
2990 case GNUNET_BLOCK_TYPE_ANY:
2991 case GNUNET_BLOCK_TYPE_FS_KBLOCK:
2992 case GNUNET_BLOCK_TYPE_FS_SBLOCK:
3002 * Transmit the given message by copying it to the target buffer
3003 * "buf". "buf" will be NULL and "size" zero if the socket was closed
3004 * for writing in the meantime. In that case, do nothing
3005 * (the disconnect or shutdown handler will take care of the rest).
3006 * If we were able to transmit messages and there are still more
3007 * pending, ask core again for further calls to this function.
3009 * @param cls closure, pointer to the 'struct ClientList*'
3010 * @param size number of bytes available in buf
3011 * @param buf where the callee should write the message
3012 * @return number of bytes written to buf
3015 transmit_to_client (void *cls,
3016 size_t size, void *buf)
3018 struct ClientList *cl = cls;
3020 struct ClientResponseMessage *creply;
3027 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3028 "Not sending reply, client communication problem.\n");
3033 while ( (NULL != (creply = cl->res_head) ) &&
3034 (creply->msize <= size) )
3036 memcpy (&cbuf[msize], &creply[1], creply->msize);
3037 msize += creply->msize;
3038 size -= creply->msize;
3039 GNUNET_CONTAINER_DLL_remove (cl->res_head,
3042 GNUNET_free (creply);
3045 cl->th = GNUNET_SERVER_notify_transmit_ready (cl->client,
3047 GNUNET_TIME_UNIT_FOREVER_REL,
3048 &transmit_to_client,
3051 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3052 "Transmitted %u bytes to client\n",
3053 (unsigned int) msize);
3060 * Closure for "process_reply" function.
3062 struct ProcessReplyClosure
3065 * The data for the reply.
3070 * Who gave us this reply? NULL for local host (or DHT)
3072 struct ConnectedPeer *sender;
3075 * When the reply expires.
3077 struct GNUNET_TIME_Absolute expiration;
3085 * Type of the block.
3087 enum GNUNET_BLOCK_Type type;
3090 * How much was this reply worth to us?
3095 * Evaluation result (returned).
3097 enum GNUNET_BLOCK_EvaluationResult eval;
3100 * Did we finish processing the associated request?
3105 * Did we find a matching request?
3112 * We have received a reply; handle it!
3114 * @param cls response (struct ProcessReplyClosure)
3115 * @param key our query
3116 * @param value value in the hash map (info about the query)
3117 * @return GNUNET_YES (we should continue to iterate)
3120 process_reply (void *cls,
3121 const GNUNET_HashCode * key,
3124 struct ProcessReplyClosure *prq = cls;
3125 struct PendingRequest *pr = value;
3126 struct PendingMessage *reply;
3127 struct ClientResponseMessage *creply;
3128 struct ClientList *cl;
3129 struct PutMessage *pm;
3130 struct ConnectedPeer *cp;
3131 struct GNUNET_TIME_Relative cur_delay;
3133 struct GNUNET_TIME_Relative art_delay;
3139 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3140 "Matched result (type %u) for query `%s' with pending request\n",
3141 (unsigned int) prq->type,
3144 GNUNET_STATISTICS_update (stats,
3145 gettext_noop ("# replies received and matched"),
3148 if (prq->sender != NULL)
3150 for (i=0;i<pr->used_targets_off;i++)
3151 if (pr->used_targets[i].pid == prq->sender->pid)
3153 if (i < pr->used_targets_off)
3155 cur_delay = GNUNET_TIME_absolute_get_duration (pr->used_targets[i].last_request_time);
3156 prq->sender->avg_delay.value
3157 = (prq->sender->avg_delay.value *
3158 (RUNAVG_DELAY_N - 1) + cur_delay.value) / RUNAVG_DELAY_N;
3159 prq->sender->avg_priority
3160 = (prq->sender->avg_priority *
3161 (RUNAVG_DELAY_N - 1) + pr->priority) / (double) RUNAVG_DELAY_N;
3165 GNUNET_PEER_change_rc (prq->sender->last_p2p_replies
3166 [prq->sender->last_p2p_replies_woff % P2P_SUCCESS_LIST_SIZE],
3168 GNUNET_PEER_change_rc (pr->cp->pid, 1);
3169 prq->sender->last_p2p_replies
3170 [(prq->sender->last_p2p_replies_woff++) % P2P_SUCCESS_LIST_SIZE]
3175 if (NULL != prq->sender->last_client_replies
3176 [(prq->sender->last_client_replies_woff) % CS2P_SUCCESS_LIST_SIZE])
3177 GNUNET_SERVER_client_drop (prq->sender->last_client_replies
3178 [(prq->sender->last_client_replies_woff) % CS2P_SUCCESS_LIST_SIZE]);
3179 prq->sender->last_client_replies
3180 [(prq->sender->last_client_replies_woff++) % CS2P_SUCCESS_LIST_SIZE]
3181 = pr->client_request_list->client_list->client;
3182 GNUNET_SERVER_client_keep (pr->client_request_list->client_list->client);
3185 prq->eval = GNUNET_BLOCK_evaluate (block_ctx,
3190 pr->namespace, (pr->namespace != NULL) ? sizeof (GNUNET_HashCode) : 0,
3195 case GNUNET_BLOCK_EVALUATION_OK_MORE:
3197 case GNUNET_BLOCK_EVALUATION_OK_LAST:
3198 while (NULL != pr->pending_head)
3199 destroy_pending_message_list_entry (pr->pending_head);
3202 if (pr->client_request_list != NULL)
3203 GNUNET_SERVER_receive_done (pr->client_request_list->client_list->client,
3205 GNUNET_DATASTORE_cancel (pr->qe);
3208 pr->do_remove = GNUNET_YES;
3209 if (pr->task != GNUNET_SCHEDULER_NO_TASK)
3211 GNUNET_SCHEDULER_cancel (sched,
3213 pr->task = GNUNET_SCHEDULER_NO_TASK;
3215 GNUNET_break (GNUNET_YES ==
3216 GNUNET_CONTAINER_multihashmap_remove (query_request_map,
3219 GNUNET_LOAD_update (rt_entry_lifetime,
3220 GNUNET_TIME_absolute_get_duration (pr->start_time).value);
3222 case GNUNET_BLOCK_EVALUATION_OK_DUPLICATE:
3223 GNUNET_STATISTICS_update (stats,
3224 gettext_noop ("# duplicate replies discarded (bloomfilter)"),
3228 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3229 "Duplicate response `%s', discarding.\n",
3230 GNUNET_h2s (&mhash));
3232 return GNUNET_YES; /* duplicate */
3233 case GNUNET_BLOCK_EVALUATION_RESULT_INVALID:
3234 return GNUNET_YES; /* wrong namespace */
3235 case GNUNET_BLOCK_EVALUATION_REQUEST_VALID:
3238 case GNUNET_BLOCK_EVALUATION_REQUEST_INVALID:
3241 case GNUNET_BLOCK_EVALUATION_TYPE_NOT_SUPPORTED:
3242 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
3243 _("Unsupported block type %u\n"),
3247 if (pr->client_request_list != NULL)
3249 if (pr->replies_seen_size == pr->replies_seen_off)
3250 GNUNET_array_grow (pr->replies_seen,
3251 pr->replies_seen_size,
3252 pr->replies_seen_size * 2 + 4);
3253 GNUNET_CRYPTO_hash (prq->data,
3255 &pr->replies_seen[pr->replies_seen_off++]);
3256 refresh_bloomfilter (pr);
3258 if (NULL == prq->sender)
3261 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3262 "Found result for query `%s' in local datastore\n",
3265 GNUNET_STATISTICS_update (stats,
3266 gettext_noop ("# results found locally"),
3270 prq->priority += pr->remaining_priority;
3271 pr->remaining_priority = 0;
3272 pr->results_found++;
3273 prq->request_found = GNUNET_YES;
3274 if (NULL != pr->client_request_list)
3276 GNUNET_STATISTICS_update (stats,
3277 gettext_noop ("# replies received for local clients"),
3280 cl = pr->client_request_list->client_list;
3281 msize = sizeof (struct PutMessage) + prq->size;
3282 creply = GNUNET_malloc (msize + sizeof (struct ClientResponseMessage));
3283 creply->msize = msize;
3284 creply->client_list = cl;
3285 GNUNET_CONTAINER_DLL_insert_after (cl->res_head,
3289 pm = (struct PutMessage*) &creply[1];
3290 pm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
3291 pm->header.size = htons (msize);
3292 pm->type = htonl (prq->type);
3293 pm->expiration = GNUNET_TIME_absolute_hton (prq->expiration);
3294 memcpy (&pm[1], prq->data, prq->size);
3298 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3299 "Transmitting result for query `%s' to client\n",
3302 cl->th = GNUNET_SERVER_notify_transmit_ready (cl->client,
3304 GNUNET_TIME_UNIT_FOREVER_REL,
3305 &transmit_to_client,
3308 GNUNET_break (cl->th != NULL);
3311 prq->finished = GNUNET_YES;
3312 destroy_pending_request (pr);
3319 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3320 "Transmitting result for query `%s' to other peer (PID=%u)\n",
3322 (unsigned int) cp->pid);
3324 GNUNET_STATISTICS_update (stats,
3325 gettext_noop ("# replies received for other peers"),
3328 msize = sizeof (struct PutMessage) + prq->size;
3329 reply = GNUNET_malloc (msize + sizeof (struct PendingMessage));
3330 reply->cont = &transmit_reply_continuation;
3331 reply->cont_cls = pr;
3333 art_delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
3334 GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
3337 = GNUNET_TIME_relative_to_absolute (art_delay);
3338 GNUNET_STATISTICS_update (stats,
3339 gettext_noop ("cummulative artificial delay introduced (ms)"),
3343 reply->msize = msize;
3344 reply->priority = UINT32_MAX; /* send replies first! */
3345 pm = (struct PutMessage*) &reply[1];
3346 pm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
3347 pm->header.size = htons (msize);
3348 pm->type = htonl (prq->type);
3349 pm->expiration = GNUNET_TIME_absolute_hton (prq->expiration);
3350 memcpy (&pm[1], prq->data, prq->size);
3351 add_to_pending_messages_for_peer (cp, reply, pr);
3358 * Iterator called on each result obtained for a DHT
3359 * operation that expects a reply
3361 * @param cls closure
3362 * @param exp when will this value expire
3363 * @param key key of the result
3364 * @param get_path NULL-terminated array of pointers
3365 * to the peers on reverse GET path (or NULL if not recorded)
3366 * @param put_path NULL-terminated array of pointers
3367 * to the peers on the PUT path (or NULL if not recorded)
3368 * @param type type of the result
3369 * @param size number of bytes in data
3370 * @param data pointer to the result data
3373 process_dht_reply (void *cls,
3374 struct GNUNET_TIME_Absolute exp,
3375 const GNUNET_HashCode * key,
3376 const struct GNUNET_PeerIdentity * const *get_path,
3377 const struct GNUNET_PeerIdentity * const *put_path,
3378 enum GNUNET_BLOCK_Type type,
3382 struct PendingRequest *pr = cls;
3383 struct ProcessReplyClosure prq;
3385 memset (&prq, 0, sizeof (prq));
3387 prq.expiration = exp;
3390 process_reply (&prq, key, pr);
3396 * Continuation called to notify client about result of the
3399 * @param cls closure
3400 * @param success GNUNET_SYSERR on failure
3401 * @param msg NULL on success, otherwise an error message
3404 put_migration_continuation (void *cls,
3408 struct GNUNET_TIME_Absolute *start = cls;
3409 struct GNUNET_TIME_Relative delay;
3411 delay = GNUNET_TIME_absolute_get_duration (*start);
3412 GNUNET_free (start);
3413 GNUNET_LOAD_update (datastore_put_load,
3415 if (GNUNET_OK == success)
3417 GNUNET_STATISTICS_update (stats,
3418 gettext_noop ("# datastore 'put' failures"),
3425 * Handle P2P "PUT" message.
3427 * @param cls closure, always NULL
3428 * @param other the other peer involved (sender or receiver, NULL
3429 * for loopback messages where we are both sender and receiver)
3430 * @param message the actual message
3431 * @param latency reported latency of the connection with 'other'
3432 * @param distance reported distance (DV) to 'other'
3433 * @return GNUNET_OK to keep the connection open,
3434 * GNUNET_SYSERR to close it (signal serious error)
3437 handle_p2p_put (void *cls,
3438 const struct GNUNET_PeerIdentity *other,
3439 const struct GNUNET_MessageHeader *message,
3440 struct GNUNET_TIME_Relative latency,
3443 const struct PutMessage *put;
3446 enum GNUNET_BLOCK_Type type;
3447 struct GNUNET_TIME_Absolute expiration;
3448 GNUNET_HashCode query;
3449 struct ProcessReplyClosure prq;
3450 struct GNUNET_TIME_Absolute *start;
3451 struct GNUNET_TIME_Relative block_time;
3453 struct ConnectedPeer *cp;
3454 struct PendingMessage *pm;
3455 struct MigrationStopMessage *msm;
3457 msize = ntohs (message->size);
3458 if (msize < sizeof (struct PutMessage))
3461 return GNUNET_SYSERR;
3463 put = (const struct PutMessage*) message;
3464 dsize = msize - sizeof (struct PutMessage);
3465 type = ntohl (put->type);
3466 expiration = GNUNET_TIME_absolute_ntoh (put->expiration);
3468 if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
3469 return GNUNET_SYSERR;
3471 GNUNET_BLOCK_get_key (block_ctx,
3477 GNUNET_break_op (0);
3478 return GNUNET_SYSERR;
3481 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3482 "Received result for query `%s' from peer `%4s'\n",
3483 GNUNET_h2s (&query),
3484 GNUNET_i2s (other));
3486 GNUNET_STATISTICS_update (stats,
3487 gettext_noop ("# replies received (overall)"),
3490 /* now, lookup 'query' */
3491 prq.data = (const void*) &put[1];
3493 prq.sender = GNUNET_CONTAINER_multihashmap_get (connected_peers,
3494 &other->hashPubKey);
3499 prq.expiration = expiration;
3501 prq.finished = GNUNET_NO;
3502 prq.request_found = GNUNET_NO;
3503 GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map,
3507 if (prq.sender != NULL)
3509 prq.sender->inc_preference += CONTENT_BANDWIDTH_VALUE + 1000 * prq.priority;
3510 prq.sender->trust += prq.priority;
3512 if ( (GNUNET_YES == active_migration) &&
3513 (GNUNET_NO == test_put_load_too_high (prq.priority)) )
3516 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3517 "Replicating result for query `%s' with priority %u\n",
3518 GNUNET_h2s (&query),
3521 start = GNUNET_malloc (sizeof (struct GNUNET_TIME_Absolute));
3522 *start = GNUNET_TIME_absolute_get ();
3523 GNUNET_DATASTORE_put (dsh,
3524 0, &query, dsize, &put[1],
3525 type, prq.priority, 1 /* anonymity */,
3527 1 + prq.priority, MAX_DATASTORE_QUEUE,
3528 GNUNET_CONSTANTS_SERVICE_TIMEOUT,
3529 &put_migration_continuation,
3532 putl = GNUNET_LOAD_get_load (datastore_put_load);
3533 if ( (GNUNET_NO == prq.request_found) &&
3534 ( (GNUNET_YES != active_migration) ||
3535 (putl > 2.5 * (1 + prq.priority)) ) )
3537 cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
3538 &other->hashPubKey);
3539 if (GNUNET_TIME_absolute_get_duration (cp->last_migration_block).value < 5000)
3540 return GNUNET_OK; /* already blocked */
3541 /* We're too busy; send MigrationStop message! */
3542 if (GNUNET_YES != active_migration)
3543 putl = 1.0 + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 5);
3544 block_time = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
3545 5000 + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
3546 (unsigned int) (60000 * putl * putl)));
3548 cp->last_migration_block = GNUNET_TIME_relative_to_absolute (block_time);
3549 pm = GNUNET_malloc (sizeof (struct PendingMessage) +
3550 sizeof (struct MigrationStopMessage));
3551 pm->msize = sizeof (struct MigrationStopMessage);
3552 pm->priority = UINT32_MAX;
3553 msm = (struct MigrationStopMessage*) &pm[1];
3554 msm->header.size = htons (sizeof (struct MigrationStopMessage));
3555 msm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_MIGRATION_STOP);
3556 msm->duration = GNUNET_TIME_relative_hton (block_time);
3557 add_to_pending_messages_for_peer (cp,
3566 * Handle P2P "MIGRATION_STOP" message.
3568 * @param cls closure, always NULL
3569 * @param other the other peer involved (sender or receiver, NULL
3570 * for loopback messages where we are both sender and receiver)
3571 * @param message the actual message
3572 * @param latency reported latency of the connection with 'other'
3573 * @param distance reported distance (DV) to 'other'
3574 * @return GNUNET_OK to keep the connection open,
3575 * GNUNET_SYSERR to close it (signal serious error)
3578 handle_p2p_migration_stop (void *cls,
3579 const struct GNUNET_PeerIdentity *other,
3580 const struct GNUNET_MessageHeader *message,
3581 struct GNUNET_TIME_Relative latency,
3584 struct ConnectedPeer *cp;
3585 const struct MigrationStopMessage *msm;
3587 msm = (const struct MigrationStopMessage*) message;
3588 cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
3589 &other->hashPubKey);
3595 cp->migration_blocked = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_relative_ntoh (msm->duration));
3601 /* **************************** P2P GET Handling ************************ */
3605 * Closure for 'check_duplicate_request_{peer,client}'.
3607 struct CheckDuplicateRequestClosure
3610 * The new request we should check if it already exists.
3612 const struct PendingRequest *pr;
3615 * Existing request found by the checker, NULL if none.
3617 struct PendingRequest *have;
3622 * Iterator over entries in the 'query_request_map' that
3623 * tries to see if we have the same request pending from
3624 * the same client already.
3626 * @param cls closure (our 'struct CheckDuplicateRequestClosure')
3627 * @param key current key code (query, ignored, must match)
3628 * @param value value in the hash map (a 'struct PendingRequest'
3629 * that already exists)
3630 * @return GNUNET_YES if we should continue to
3631 * iterate (no match yet)
3632 * GNUNET_NO if not (match found).
3635 check_duplicate_request_client (void *cls,
3636 const GNUNET_HashCode * key,
3639 struct CheckDuplicateRequestClosure *cdc = cls;
3640 struct PendingRequest *have = value;
3642 if (have->client_request_list == NULL)
3644 if ( (cdc->pr->client_request_list->client_list->client == have->client_request_list->client_list->client) &&
3655 * We're processing (local) results for a search request
3656 * from another peer. Pass applicable results to the
3657 * peer and if we are done either clean up (operation
3658 * complete) or forward to other peers (more results possible).
3660 * @param cls our closure (struct LocalGetContext)
3661 * @param key key for the content
3662 * @param size number of bytes in data
3663 * @param data content stored
3664 * @param type type of the content
3665 * @param priority priority of the content
3666 * @param anonymity anonymity-level for the content
3667 * @param expiration expiration time for the content
3668 * @param uid unique identifier for the datum;
3669 * maybe 0 if no unique identifier is available
3672 process_local_reply (void *cls,
3673 const GNUNET_HashCode * key,
3676 enum GNUNET_BLOCK_Type type,
3679 struct GNUNET_TIME_Absolute
3683 struct PendingRequest *pr = cls;
3684 struct ProcessReplyClosure prq;
3685 struct CheckDuplicateRequestClosure cdrc;
3686 GNUNET_HashCode query;
3687 unsigned int old_rf;
3692 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3693 "Done processing local replies, forwarding request to other peers.\n");
3696 if (pr->client_request_list != NULL)
3698 GNUNET_SERVER_receive_done (pr->client_request_list->client_list->client,
3700 /* Figure out if this is a duplicate request and possibly
3701 merge 'struct PendingRequest' entries */
3704 GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map,
3706 &check_duplicate_request_client,
3708 if (cdrc.have != NULL)
3711 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3712 "Received request for block `%s' twice from client, will only request once.\n",
3713 GNUNET_h2s (&pr->query));
3716 destroy_pending_request (pr);
3720 if (pr->local_only == GNUNET_YES)
3722 destroy_pending_request (pr);
3725 /* no more results */
3726 if (pr->task == GNUNET_SCHEDULER_NO_TASK)
3727 pr->task = GNUNET_SCHEDULER_add_now (sched,
3728 &forward_request_task,
3733 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3734 "New local response to `%s' of type %u.\n",
3738 if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
3741 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3742 "Found ONDEMAND block, performing on-demand encoding\n");
3744 GNUNET_STATISTICS_update (stats,
3745 gettext_noop ("# on-demand blocks matched requests"),
3749 GNUNET_FS_handle_on_demand_block (key, size, data, type, priority,
3750 anonymity, expiration, uid,
3751 &process_local_reply,
3755 GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
3759 old_rf = pr->results_found;
3760 memset (&prq, 0, sizeof (prq));
3762 prq.expiration = expiration;
3765 GNUNET_BLOCK_get_key (block_ctx,
3772 GNUNET_DATASTORE_remove (dsh,
3776 GNUNET_TIME_UNIT_FOREVER_REL,
3778 GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
3782 prq.priority = priority;
3783 prq.finished = GNUNET_NO;
3784 prq.request_found = GNUNET_NO;
3785 if ( (old_rf == 0) &&
3786 (pr->results_found == 0) )
3787 update_datastore_delays (pr->start_time);
3788 process_reply (&prq, key, pr);
3789 if (prq.finished == GNUNET_YES)
3792 return; /* done here */
3793 if (prq.eval == GNUNET_BLOCK_EVALUATION_OK_LAST)
3795 pr->local_only = GNUNET_YES; /* do not forward */
3796 GNUNET_DATASTORE_get_next (dsh, GNUNET_NO);
3799 if ( (pr->client_request_list == NULL) &&
3800 ( (GNUNET_YES == test_get_load_too_high (0)) ||
3801 (pr->results_found > 5 + 2 * pr->priority) ) )
3804 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3805 "Load too high, done with request\n");
3807 GNUNET_STATISTICS_update (stats,
3808 gettext_noop ("# processing result set cut short due to load"),
3811 /* FIXME: if this is activated, we might stall large downloads
3812 indefinitely since (presumably) the load can never go down again! */
3814 GNUNET_DATASTORE_get_next (dsh, GNUNET_NO);
3818 GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
3823 * We've received a request with the specified priority. Bound it
3824 * according to how much we trust the given peer.
3826 * @param prio_in requested priority
3827 * @param cp the peer making the request
3828 * @return effective priority
3831 bound_priority (uint32_t prio_in,
3832 struct ConnectedPeer *cp)
3834 #define N ((double)128.0)
3839 ld = test_get_load_too_high (0);
3840 if (ld == GNUNET_SYSERR)
3841 return 0; /* excess resources */
3842 ret = change_host_trust (cp, prio_in);
3845 if (ret > current_priorities + N)
3846 rret = current_priorities + N;
3850 = (current_priorities * (N-1) + rret)/N;
3852 if ( (ld == GNUNET_YES) && (ret > 0) )
3854 /* try with charging */
3855 ld = test_get_load_too_high (ret);
3857 if (ld == GNUNET_YES)
3861 change_host_trust (cp, -ret);
3862 return -1; /* not enough resources */
3870 * Iterator over entries in the 'query_request_map' that
3871 * tries to see if we have the same request pending from
3872 * the same peer already.
3874 * @param cls closure (our 'struct CheckDuplicateRequestClosure')
3875 * @param key current key code (query, ignored, must match)
3876 * @param value value in the hash map (a 'struct PendingRequest'
3877 * that already exists)
3878 * @return GNUNET_YES if we should continue to
3879 * iterate (no match yet)
3880 * GNUNET_NO if not (match found).
3883 check_duplicate_request_peer (void *cls,
3884 const GNUNET_HashCode * key,
3887 struct CheckDuplicateRequestClosure *cdc = cls;
3888 struct PendingRequest *have = value;
3890 if (cdc->pr->target_pid == have->target_pid)
3900 * Handle P2P "GET" request.
3902 * @param cls closure, always NULL
3903 * @param other the other peer involved (sender or receiver, NULL
3904 * for loopback messages where we are both sender and receiver)
3905 * @param message the actual message
3906 * @param latency reported latency of the connection with 'other'
3907 * @param distance reported distance (DV) to 'other'
3908 * @return GNUNET_OK to keep the connection open,
3909 * GNUNET_SYSERR to close it (signal serious error)
3912 handle_p2p_get (void *cls,
3913 const struct GNUNET_PeerIdentity *other,
3914 const struct GNUNET_MessageHeader *message,
3915 struct GNUNET_TIME_Relative latency,
3918 struct PendingRequest *pr;
3919 struct ConnectedPeer *cp;
3920 struct ConnectedPeer *cps;
3921 struct CheckDuplicateRequestClosure cdc;
3922 struct GNUNET_TIME_Relative timeout;
3924 const struct GetMessage *gm;
3926 const GNUNET_HashCode *opt;
3929 uint32_t ttl_decrement;
3931 enum GNUNET_BLOCK_Type type;
3934 msize = ntohs(message->size);
3935 if (msize < sizeof (struct GetMessage))
3937 GNUNET_break_op (0);
3938 return GNUNET_SYSERR;
3940 gm = (const struct GetMessage*) message;
3942 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3943 "Received request for `%s'\n",
3944 GNUNET_h2s (&gm->query));
3946 type = ntohl (gm->type);
3947 bm = ntohl (gm->hash_bitmap);
3955 if (msize < sizeof (struct GetMessage) + bits * sizeof (GNUNET_HashCode))
3957 GNUNET_break_op (0);
3958 return GNUNET_SYSERR;
3960 opt = (const GNUNET_HashCode*) &gm[1];
3961 bfsize = msize - sizeof (struct GetMessage) + bits * sizeof (GNUNET_HashCode);
3962 bm = ntohl (gm->hash_bitmap);
3964 cps = GNUNET_CONTAINER_multihashmap_get (connected_peers,
3965 &other->hashPubKey);
3968 /* peer must have just disconnected */
3969 GNUNET_STATISTICS_update (stats,
3970 gettext_noop ("# requests dropped due to initiator not being connected"),
3973 return GNUNET_SYSERR;
3975 if (0 != (bm & GET_MESSAGE_BIT_RETURN_TO))
3976 cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
3983 if (0 != (bm & GET_MESSAGE_BIT_RETURN_TO))
3984 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3985 "Failed to find RETURN-TO peer `%4s' in connection set. Dropping query.\n",
3986 GNUNET_i2s ((const struct GNUNET_PeerIdentity*) &opt[bits-1]));
3989 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3990 "Failed to find peer `%4s' in connection set. Dropping query.\n",
3991 GNUNET_i2s (other));
3993 GNUNET_STATISTICS_update (stats,
3994 gettext_noop ("# requests dropped due to missing reverse route"),
3997 /* FIXME: try connect? */
4000 /* note that we can really only check load here since otherwise
4001 peers could find out that we are overloaded by not being
4002 disconnected after sending us a malformed query... */
4003 priority = bound_priority (ntohl (gm->priority), cps);
4007 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4008 "Dropping query from `%s', this peer is too busy.\n",
4009 GNUNET_i2s (other));
4011 GNUNET_STATISTICS_update (stats,
4012 gettext_noop ("# requests dropped due to high load"),
4016 /* FIXME: this causes problems... */
4021 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4022 "Received request for `%s' of type %u from peer `%4s' with flags %u\n",
4023 GNUNET_h2s (&gm->query),
4024 (unsigned int) type,
4028 have_ns = (0 != (bm & GET_MESSAGE_BIT_SKS_NAMESPACE));
4029 pr = GNUNET_malloc (sizeof (struct PendingRequest) +
4030 (have_ns ? sizeof(GNUNET_HashCode) : 0));
4033 pr->namespace = (GNUNET_HashCode*) &pr[1];
4034 memcpy (&pr[1], &opt[bits++], sizeof (GNUNET_HashCode));
4036 if ( (GNUNET_LOAD_get_load (cp->transmission_delay) > 3 * (1 + priority)) ||
4037 (GNUNET_LOAD_get_average (cp->transmission_delay) >
4038 GNUNET_CONSTANTS_MAX_CORK_DELAY.value * 2 + GNUNET_LOAD_get_average (rt_entry_lifetime)) )
4040 /* don't have BW to send to peer, or would likely take longer than we have for it,
4041 so at best indirect the query */
4043 /* FIXME: if this line is enabled, the 'perf' test for larger files simply "hangs";
4044 the cause seems to be that the load goes up (to the point where we do this)
4045 and then never goes down again... (outch) */
4046 // pr->forward_only = GNUNET_YES;
4049 pr->mingle = ntohl (gm->filter_mutator);
4050 if (0 != (bm & GET_MESSAGE_BIT_TRANSMIT_TO))
4051 pr->target_pid = GNUNET_PEER_intern ((const struct GNUNET_PeerIdentity*) &opt[bits++]);
4052 pr->anonymity_level = 1;
4053 pr->priority = (uint32_t) priority;
4054 pr->ttl = bound_ttl (ntohl (gm->ttl), pr->priority);
4055 pr->query = gm->query;
4056 /* decrement ttl (always) */
4057 ttl_decrement = 2 * TTL_DECREMENT +
4058 GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
4060 if ( (pr->ttl < 0) &&
4061 (((int32_t)(pr->ttl - ttl_decrement)) > 0) )
4064 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4065 "Dropping query from `%s' due to TTL underflow (%d - %u).\n",
4070 GNUNET_STATISTICS_update (stats,
4071 gettext_noop ("# requests dropped due TTL underflow"),
4074 /* integer underflow => drop (should be very rare)! */
4078 pr->ttl -= ttl_decrement;
4079 pr->start_time = GNUNET_TIME_absolute_get ();
4081 /* get bloom filter */
4084 pr->bf = GNUNET_CONTAINER_bloomfilter_init ((const char*) &opt[bits],
4087 pr->bf_size = bfsize;
4091 GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map,
4093 &check_duplicate_request_peer,
4095 if (cdc.have != NULL)
4097 if (cdc.have->start_time.value + cdc.have->ttl >=
4098 pr->start_time.value + pr->ttl)
4100 /* existing request has higher TTL, drop new one! */
4101 cdc.have->priority += pr->priority;
4102 destroy_pending_request (pr);
4104 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4105 "Have existing request with higher TTL, dropping new request.\n",
4106 GNUNET_i2s (other));
4108 GNUNET_STATISTICS_update (stats,
4109 gettext_noop ("# requests dropped due to higher-TTL request"),
4116 /* existing request has lower TTL, drop old one! */
4117 pr->priority += cdc.have->priority;
4118 /* Possible optimization: if we have applicable pending
4119 replies in 'cdc.have', we might want to move those over
4120 (this is a really rare special-case, so it is not clear
4121 that this would be worth it) */
4122 destroy_pending_request (cdc.have);
4123 /* keep processing 'pr'! */
4128 GNUNET_break (GNUNET_OK ==
4129 GNUNET_CONTAINER_multihashmap_put (query_request_map,
4132 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
4133 GNUNET_break (GNUNET_OK ==
4134 GNUNET_CONTAINER_multihashmap_put (peer_request_map,
4137 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
4139 pr->hnode = GNUNET_CONTAINER_heap_insert (requests_by_expiration_heap,
4141 pr->start_time.value + pr->ttl);
4143 GNUNET_STATISTICS_update (stats,
4144 gettext_noop ("# P2P searches received"),
4147 GNUNET_STATISTICS_update (stats,
4148 gettext_noop ("# P2P searches active"),
4152 /* calculate change in traffic preference */
4153 cps->inc_preference += pr->priority * 1000 + QUERY_BANDWIDTH_VALUE;
4154 /* process locally */
4155 if (type == GNUNET_BLOCK_TYPE_FS_DBLOCK)
4156 type = GNUNET_BLOCK_TYPE_ANY; /* to get on-demand as well */
4157 timeout = GNUNET_TIME_relative_multiply (BASIC_DATASTORE_REQUEST_DELAY,
4158 (pr->priority + 1));
4159 if (GNUNET_YES != pr->forward_only)
4162 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4163 "Handing request for `%s' to datastore\n",
4164 GNUNET_h2s (&gm->query));
4166 pr->qe = GNUNET_DATASTORE_get (dsh,
4170 MAX_DATASTORE_QUEUE,
4172 &process_local_reply,
4176 GNUNET_STATISTICS_update (stats,
4177 gettext_noop ("# requests dropped by datastore (queue length limit)"),
4184 GNUNET_STATISTICS_update (stats,
4185 gettext_noop ("# requests forwarded due to high load"),
4190 /* Are multiple results possible (and did we look locally)? If so, start processing remotely now! */
4193 case GNUNET_BLOCK_TYPE_FS_DBLOCK:
4194 case GNUNET_BLOCK_TYPE_FS_IBLOCK:
4195 /* only one result, wait for datastore */
4196 if (GNUNET_YES != pr->forward_only)
4198 GNUNET_STATISTICS_update (stats,
4199 gettext_noop ("# requests not instantly forwarded (waiting for datastore)"),
4205 if (pr->task == GNUNET_SCHEDULER_NO_TASK)
4206 pr->task = GNUNET_SCHEDULER_add_now (sched,
4207 &forward_request_task,
4211 /* make sure we don't track too many requests */
4212 if (GNUNET_CONTAINER_heap_get_size (requests_by_expiration_heap) > max_pending_requests)
4214 pr = GNUNET_CONTAINER_heap_peek (requests_by_expiration_heap);
4215 GNUNET_assert (pr != NULL);
4216 destroy_pending_request (pr);
4222 /* **************************** CS GET Handling ************************ */
4226 * Handle START_SEARCH-message (search request from client).
4228 * @param cls closure
4229 * @param client identification of the client
4230 * @param message the actual message
4233 handle_start_search (void *cls,
4234 struct GNUNET_SERVER_Client *client,
4235 const struct GNUNET_MessageHeader *message)
4237 static GNUNET_HashCode all_zeros;
4238 const struct SearchMessage *sm;
4239 struct ClientList *cl;
4240 struct ClientRequestList *crl;
4241 struct PendingRequest *pr;
4244 enum GNUNET_BLOCK_Type type;
4246 msize = ntohs (message->size);
4247 if ( (msize < sizeof (struct SearchMessage)) ||
4248 (0 != (msize - sizeof (struct SearchMessage)) % sizeof (GNUNET_HashCode)) )
4251 GNUNET_SERVER_receive_done (client,
4255 GNUNET_STATISTICS_update (stats,
4256 gettext_noop ("# client searches received"),
4259 sc = (msize - sizeof (struct SearchMessage)) / sizeof (GNUNET_HashCode);
4260 sm = (const struct SearchMessage*) message;
4261 type = ntohl (sm->type);
4263 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4264 "Received request for `%s' of type %u from local client\n",
4265 GNUNET_h2s (&sm->query),
4266 (unsigned int) type);
4269 while ( (cl != NULL) &&
4270 (cl->client != client) )
4274 cl = GNUNET_malloc (sizeof (struct ClientList));
4275 cl->client = client;
4276 GNUNET_SERVER_client_keep (client);
4277 cl->next = client_list;
4280 /* detect duplicate KBLOCK requests */
4281 if ( (type == GNUNET_BLOCK_TYPE_FS_KBLOCK) ||
4282 (type == GNUNET_BLOCK_TYPE_FS_NBLOCK) ||
4283 (type == GNUNET_BLOCK_TYPE_ANY) )
4286 while ( (crl != NULL) &&
4287 ( (0 != memcmp (&crl->req->query,
4289 sizeof (GNUNET_HashCode))) ||
4290 (crl->req->type != type) ) )
4295 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4296 "Have existing request, merging content-seen lists.\n");
4299 /* Duplicate request (used to send long list of
4300 known/blocked results); merge 'pr->replies_seen'
4301 and update bloom filter */
4302 GNUNET_array_grow (pr->replies_seen,
4303 pr->replies_seen_size,
4304 pr->replies_seen_off + sc);
4305 memcpy (&pr->replies_seen[pr->replies_seen_off],
4307 sc * sizeof (GNUNET_HashCode));
4308 pr->replies_seen_off += sc;
4309 refresh_bloomfilter (pr);
4310 GNUNET_STATISTICS_update (stats,
4311 gettext_noop ("# client searches updated (merged content seen list)"),
4314 GNUNET_SERVER_receive_done (client,
4319 GNUNET_STATISTICS_update (stats,
4320 gettext_noop ("# client searches active"),
4323 pr = GNUNET_malloc (sizeof (struct PendingRequest) +
4324 ((type == GNUNET_BLOCK_TYPE_FS_SBLOCK) ? sizeof(GNUNET_HashCode) : 0));
4325 crl = GNUNET_malloc (sizeof (struct ClientRequestList));
4326 memset (crl, 0, sizeof (struct ClientRequestList));
4327 crl->client_list = cl;
4328 GNUNET_CONTAINER_DLL_insert (cl->rl_head,
4333 pr->client_request_list = crl;
4334 GNUNET_array_grow (pr->replies_seen,
4335 pr->replies_seen_size,
4337 memcpy (pr->replies_seen,
4339 sc * sizeof (GNUNET_HashCode));
4340 pr->replies_seen_off = sc;
4341 pr->anonymity_level = ntohl (sm->anonymity_level);
4342 pr->start_time = GNUNET_TIME_absolute_get ();
4343 refresh_bloomfilter (pr);
4344 pr->query = sm->query;
4345 if (0 == (1 & ntohl (sm->options)))
4346 pr->local_only = GNUNET_NO;
4348 pr->local_only = GNUNET_YES;
4351 case GNUNET_BLOCK_TYPE_FS_DBLOCK:
4352 case GNUNET_BLOCK_TYPE_FS_IBLOCK:
4353 if (0 != memcmp (&sm->target,
4355 sizeof (GNUNET_HashCode)))
4356 pr->target_pid = GNUNET_PEER_intern ((const struct GNUNET_PeerIdentity*) &sm->target);
4358 case GNUNET_BLOCK_TYPE_FS_SBLOCK:
4359 pr->namespace = (GNUNET_HashCode*) &pr[1];
4360 memcpy (&pr[1], &sm->target, sizeof (GNUNET_HashCode));
4365 GNUNET_break (GNUNET_OK ==
4366 GNUNET_CONTAINER_multihashmap_put (query_request_map,
4369 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
4370 if (type == GNUNET_BLOCK_TYPE_FS_DBLOCK)
4371 type = GNUNET_BLOCK_TYPE_ANY; /* get on-demand blocks too! */
4372 pr->qe = GNUNET_DATASTORE_get (dsh,
4376 GNUNET_CONSTANTS_SERVICE_TIMEOUT,
4377 &process_local_reply,
4382 /* **************************** Startup ************************ */
4385 * Process fs requests.
4387 * @param s scheduler to use
4388 * @param server the initialized server
4389 * @param c configuration to use
4392 main_init (struct GNUNET_SCHEDULER_Handle *s,
4393 struct GNUNET_SERVER_Handle *server,
4394 const struct GNUNET_CONFIGURATION_Handle *c)
4396 static const struct GNUNET_CORE_MessageHandler p2p_handlers[] =
4399 GNUNET_MESSAGE_TYPE_FS_GET, 0 },
4401 GNUNET_MESSAGE_TYPE_FS_PUT, 0 },
4402 { &handle_p2p_migration_stop,
4403 GNUNET_MESSAGE_TYPE_FS_MIGRATION_STOP,
4404 sizeof (struct MigrationStopMessage) },
4407 static const struct GNUNET_SERVER_MessageHandler handlers[] = {
4408 {&GNUNET_FS_handle_index_start, NULL,
4409 GNUNET_MESSAGE_TYPE_FS_INDEX_START, 0},
4410 {&GNUNET_FS_handle_index_list_get, NULL,
4411 GNUNET_MESSAGE_TYPE_FS_INDEX_LIST_GET, sizeof(struct GNUNET_MessageHeader) },
4412 {&GNUNET_FS_handle_unindex, NULL, GNUNET_MESSAGE_TYPE_FS_UNINDEX,
4413 sizeof (struct UnindexMessage) },
4414 {&handle_start_search, NULL, GNUNET_MESSAGE_TYPE_FS_START_SEARCH,
4418 unsigned long long enc = 128;
4422 stats = GNUNET_STATISTICS_create (sched, "fs", cfg);
4423 min_migration_delay = GNUNET_TIME_UNIT_SECONDS;
4425 GNUNET_CONFIGURATION_get_value_number (cfg,
4427 "MAX_PENDING_REQUESTS",
4428 &max_pending_requests)) ||
4430 GNUNET_CONFIGURATION_get_value_number (cfg,
4432 "EXPECTED_NEIGHBOUR_COUNT",
4435 GNUNET_CONFIGURATION_get_value_time (cfg,
4437 "MIN_MIGRATION_DELAY",
4438 &min_migration_delay)) )
4440 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
4441 _("Configuration fails to specify certain parameters, assuming default values."));
4443 connected_peers = GNUNET_CONTAINER_multihashmap_create (enc);
4444 query_request_map = GNUNET_CONTAINER_multihashmap_create (max_pending_requests);
4445 rt_entry_lifetime = GNUNET_LOAD_value_init ();
4446 peer_request_map = GNUNET_CONTAINER_multihashmap_create (enc);
4447 requests_by_expiration_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
4448 core = GNUNET_CORE_connect (sched,
4450 GNUNET_TIME_UNIT_FOREVER_REL,
4453 &peer_connect_handler,
4454 &peer_disconnect_handler,
4461 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
4462 _("Failed to connect to `%s' service.\n"),
4464 GNUNET_CONTAINER_multihashmap_destroy (connected_peers);
4465 connected_peers = NULL;
4466 GNUNET_CONTAINER_multihashmap_destroy (query_request_map);
4467 query_request_map = NULL;
4468 GNUNET_LOAD_value_free (rt_entry_lifetime);
4469 rt_entry_lifetime = NULL;
4470 GNUNET_CONTAINER_heap_destroy (requests_by_expiration_heap);
4471 requests_by_expiration_heap = NULL;
4472 GNUNET_CONTAINER_multihashmap_destroy (peer_request_map);
4473 peer_request_map = NULL;
4476 GNUNET_DATASTORE_disconnect (dsh, GNUNET_NO);
4479 return GNUNET_SYSERR;
4481 /* FIXME: distinguish between sending and storing in options? */
4482 if (active_migration)
4484 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
4485 _("Content migration is enabled, will start to gather data\n"));
4486 consider_migration_gathering ();
4488 consider_dht_put_gathering (NULL);
4489 GNUNET_SERVER_disconnect_notify (server,
4490 &handle_client_disconnect,
4492 GNUNET_assert (GNUNET_OK ==
4493 GNUNET_CONFIGURATION_get_value_filename (cfg,
4497 GNUNET_DISK_directory_create (trustDirectory);
4498 GNUNET_SCHEDULER_add_with_priority (sched,
4499 GNUNET_SCHEDULER_PRIORITY_HIGH,
4500 &cron_flush_trust, NULL);
4503 GNUNET_SERVER_add_handlers (server, handlers);
4504 GNUNET_SCHEDULER_add_delayed (sched,
4505 GNUNET_TIME_UNIT_FOREVER_REL,
4513 * Process fs requests.
4515 * @param cls closure
4516 * @param sched scheduler to use
4517 * @param server the initialized server
4518 * @param cfg configuration to use
4522 struct GNUNET_SCHEDULER_Handle *sched,
4523 struct GNUNET_SERVER_Handle *server,
4524 const struct GNUNET_CONFIGURATION_Handle *cfg)
4526 active_migration = GNUNET_CONFIGURATION_get_value_yesno (cfg,
4529 dsh = GNUNET_DATASTORE_connect (cfg,
4533 GNUNET_SCHEDULER_shutdown (sched);
4536 datastore_get_load = GNUNET_LOAD_value_init ();
4537 datastore_put_load = GNUNET_LOAD_value_init ();
4538 block_cfg = GNUNET_CONFIGURATION_create ();
4539 GNUNET_CONFIGURATION_set_value_string (block_cfg,
4543 block_ctx = GNUNET_BLOCK_context_create (block_cfg);
4544 GNUNET_assert (NULL != block_ctx);
4545 dht_handle = GNUNET_DHT_connect (sched,
4548 if ( (GNUNET_OK != GNUNET_FS_indexing_init (sched, cfg, dsh)) ||
4549 (GNUNET_OK != main_init (sched, server, cfg)) )
4551 GNUNET_SCHEDULER_shutdown (sched);
4552 GNUNET_DATASTORE_disconnect (dsh, GNUNET_NO);
4554 GNUNET_DHT_disconnect (dht_handle);
4556 GNUNET_BLOCK_context_destroy (block_ctx);
4558 GNUNET_CONFIGURATION_destroy (block_cfg);
4560 GNUNET_LOAD_value_free (datastore_get_load);
4561 datastore_get_load = NULL;
4562 GNUNET_LOAD_value_free (datastore_put_load);
4563 datastore_put_load = NULL;
4570 * The main function for the fs service.
4572 * @param argc number of arguments from the command line
4573 * @param argv command line arguments
4574 * @return 0 ok, 1 on error
4577 main (int argc, char *const *argv)
4579 return (GNUNET_OK ==
4580 GNUNET_SERVICE_run (argc,
4583 GNUNET_SERVICE_OPTION_NONE,
4584 &run, NULL)) ? 0 : 1;
4587 /* end of gnunet-service-fs.c */