bi-di migration options
[oweals/gnunet.git] / src / fs / gnunet-service-fs.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009, 2010 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file fs/gnunet-service-fs.c
23  * @brief gnunet anonymity protocol implementation
24  * @author Christian Grothoff
25  *
26  * TODO:
27  * - more statistics
28  */
29 #include "platform.h"
30 #include <float.h>
31 #include "gnunet_constants.h"
32 #include "gnunet_core_service.h"
33 #include "gnunet_dht_service.h"
34 #include "gnunet_datastore_service.h"
35 #include "gnunet_load_lib.h"
36 #include "gnunet_peer_lib.h"
37 #include "gnunet_protocols.h"
38 #include "gnunet_signatures.h"
39 #include "gnunet_statistics_service.h"
40 #include "gnunet_util_lib.h"
41 #include "gnunet-service-fs_indexing.h"
42 #include "fs.h"
43
44 #define DEBUG_FS GNUNET_NO
45
46 /**
47  * Should we introduce random latency in processing?  Required for proper
48  * implementation of GAP, but can be disabled for performance evaluation of
49  * the basic routing algorithm.
50  *
51  * Note that with delays enabled, performance can be significantly lower
52  * (several orders of magnitude in 2-peer test runs); if you want to
53  * measure throughput of other components, set this to NO.  Also, you
54  * might want to consider changing 'RETRY_PROBABILITY_INV' to 1 for
55  * a rather wasteful mode of operation (that might still get the highest
56  * throughput overall).
57  *
58  * Performance measurements (for 50 MB file, 2 peers):
59  *
60  * - Without delays: 3300 kb/s
61  * - With    delays:  101 kb/s
62  */
63 #define SUPPORT_DELAYS GNUNET_NO
64
65 /**
66  * Size for the hash map for DHT requests from the FS
67  * service.  Should be about the number of concurrent
68  * DHT requests we plan to make.
69  */
70 #define FS_DHT_HT_SIZE 1024
71
72 /**
73  * At what frequency should our datastore load decrease
74  * automatically (since if we don't use it, clearly the
75  * load must be going down).
76  */
77 #define DATASTORE_LOAD_AUTODECLINE GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS, 250)
78
79 /**
80  * How often do we flush trust values to disk?
81  */
82 #define TRUST_FLUSH_FREQ GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 5)
83
84 /**
85  * How often do we at most PUT content into the DHT?
86  */
87 #define MAX_DHT_PUT_FREQ GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5)
88
89 /**
90  * Inverse of the probability that we will submit the same query
91  * to the same peer again.  If the same peer already got the query
92  * repeatedly recently, the probability is multiplied by the inverse
93  * of this number each time.  Note that we only try about every TTL_DECREMENT/2
94  * plus MAX_CORK_DELAY (so roughly every 3.5s).
95  *
96  * Note that this factor is a key influence to performance in small
97  * networks (especially test networks of 2 peers) because if there is
98  * only a single peer with the data, this value will determine how
99  * soon we might re-try.  For example, a value of 3 can result in 
100  * 1.7 MB/s transfer rates for a 10 MB file when a value of 1 would
101  * give us 5 MB/s.  OTOH, obviously re-trying the same peer can be
102  * rather inefficient in larger networks, hence picking 1 is in 
103  * general not the best choice.
104  *
105  * Performance measurements (for 50 MB file, 2 peers, no delays):
106  *
107  * - 1: 3300 kb/s (consistently)
108  * - 3: 2046 kb/s, 754 kb/s, 3490 kb/s
109  * - 5:  759 kb/s, 968 kb/s, 1160 kb/s
110  *
111  * Note that this does NOT mean that the value should be 1 since
112  * a 2-peer network is far from representative here (and this fails
113  * to take into consideration bandwidth wasted by repeatedly 
114  * sending queries to peers that don't have the content).  Also,
115  * it is expected that higher values lead to more inconsistent
116  * measurements since this only affects lost messages towards the
117  * end of the download.
118  *
119  * Finally, we should probably consider changing this and making
120  * it dependent on the number of connected peers or a related
121  * metric (bad magic constants...).
122  */
123 #define RETRY_PROBABILITY_INV 1
124
125 /**
126  * What is the maximum delay for a P2P FS message (in our interaction
127  * with core)?  FS-internal delays are another story.  The value is
128  * chosen based on the 32k block size.  Given that peers typcially
129  * have at least 1 kb/s bandwidth, 45s waits give us a chance to
130  * transmit one message even to the lowest-bandwidth peers.
131  */
132 #define MAX_TRANSMIT_DELAY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 45)
133
134 /**
135  * Maximum number of requests (from other peers, overall) that we're
136  * willing to have pending at any given point in time.  Can be changed
137  * via the configuration file (32k is just the default).
138  */
139 static unsigned long long max_pending_requests = (32 * 1024);
140
141
142 /**
143  * Information we keep for each pending reply.  The
144  * actual message follows at the end of this struct.
145  */
146 struct PendingMessage;
147
148 /**
149  * Function called upon completion of a transmission.
150  *
151  * @param cls closure
152  * @param pid ID of receiving peer, 0 on transmission error
153  */
154 typedef void (*TransmissionContinuation)(void * cls, 
155                                          GNUNET_PEER_Id tpid);
156
157
158 /**
159  * Information we keep for each pending message (GET/PUT).  The
160  * actual message follows at the end of this struct.
161  */
162 struct PendingMessage
163 {
164   /**
165    * This is a doubly-linked list of messages to the same peer.
166    */
167   struct PendingMessage *next;
168
169   /**
170    * This is a doubly-linked list of messages to the same peer.
171    */
172   struct PendingMessage *prev;
173
174   /**
175    * Entry in pending message list for this pending message.
176    */ 
177   struct PendingMessageList *pml;  
178
179   /**
180    * Function to call immediately once we have transmitted this
181    * message.
182    */
183   TransmissionContinuation cont;
184
185   /**
186    * Closure for cont.
187    */
188   void *cont_cls;
189
190   /**
191    * Do not transmit this pending message until this deadline.
192    */
193   struct GNUNET_TIME_Absolute delay_until;
194
195   /**
196    * Size of the reply; actual reply message follows
197    * at the end of this struct.
198    */
199   size_t msize;
200   
201   /**
202    * How important is this message for us?
203    */
204   uint32_t priority;
205  
206 };
207
208
209 /**
210  * Information about a peer that we are connected to.
211  * We track data that is useful for determining which
212  * peers should receive our requests.  We also keep
213  * a list of messages to transmit to this peer.
214  */
215 struct ConnectedPeer
216 {
217
218   /**
219    * List of the last clients for which this peer successfully
220    * answered a query.
221    */
222   struct GNUNET_SERVER_Client *last_client_replies[CS2P_SUCCESS_LIST_SIZE];
223
224   /**
225    * List of the last PIDs for which
226    * this peer successfully answered a query;
227    * We use 0 to indicate no successful reply.
228    */
229   GNUNET_PEER_Id last_p2p_replies[P2P_SUCCESS_LIST_SIZE];
230
231   /**
232    * Average delay between sending the peer a request and
233    * getting a reply (only calculated over the requests for
234    * which we actually got a reply).   Calculated
235    * as a moving average: new_delay = ((n-1)*last_delay+curr_delay) / n
236    */ 
237   struct GNUNET_TIME_Relative avg_delay;
238
239   /**
240    * Point in time until which this peer does not want us to migrate content
241    * to it.
242    */
243   struct GNUNET_TIME_Absolute migration_blocked;
244
245   /**
246    * Time until when we blocked this peer from migrating
247    * data to us.
248    */
249   struct GNUNET_TIME_Absolute last_migration_block;
250
251   /**
252    * Transmission times for the last MAX_QUEUE_PER_PEER
253    * requests for this peer.  Used as a ring buffer, current
254    * offset is stored in 'last_request_times_off'.  If the
255    * oldest entry is more recent than the 'avg_delay', we should
256    * not send any more requests right now.
257    */
258   struct GNUNET_TIME_Absolute last_request_times[MAX_QUEUE_PER_PEER];
259
260   /**
261    * Handle for an active request for transmission to this
262    * peer, or NULL.
263    */
264   struct GNUNET_CORE_TransmitHandle *cth;
265
266   /**
267    * Messages (replies, queries, content migration) we would like to
268    * send to this peer in the near future.  Sorted by priority, head.
269    */
270   struct PendingMessage *pending_messages_head;
271
272   /**
273    * Messages (replies, queries, content migration) we would like to
274    * send to this peer in the near future.  Sorted by priority, tail.
275    */
276   struct PendingMessage *pending_messages_tail;
277
278   /**
279    * How long does it typically take for us to transmit a message
280    * to this peer?  (delay between the request being issued and
281    * the callback being invoked).
282    */
283   struct GNUNET_LOAD_Value *transmission_delay;
284
285   /**
286    * Context of our GNUNET_CORE_peer_change_preference call (or NULL).
287    */
288   struct GNUNET_CORE_InformationRequestContext *irc;
289
290   /**
291    * Request for which 'irc' is currently active (or NULL).
292    */
293   struct PendingRequest *pr;
294
295   /**
296    * Time when the last transmission request was issued.
297    */
298   struct GNUNET_TIME_Absolute last_transmission_request_start;
299
300   /**
301    * ID of delay task for scheduling transmission.
302    */
303   GNUNET_SCHEDULER_TaskIdentifier delayed_transmission_request_task;
304
305   /**
306    * Average priority of successful replies.  Calculated
307    * as a moving average: new_avg = ((n-1)*last_avg+curr_prio) / n
308    */
309   double avg_priority;
310
311   /**
312    * Increase in traffic preference still to be submitted
313    * to the core service for this peer.
314    */
315   uint64_t inc_preference;
316
317   /**
318    * Trust rating for this peer
319    */
320   uint32_t trust;
321
322   /**
323    * Trust rating for this peer on disk.
324    */
325   uint32_t disk_trust;
326
327   /**
328    * The peer's identity.
329    */
330   GNUNET_PEER_Id pid;  
331
332   /**
333    * Size of the linked list of 'pending_messages'.
334    */
335   unsigned int pending_requests;
336
337   /**
338    * Which offset in "last_p2p_replies" will be updated next?
339    * (we go round-robin).
340    */
341   unsigned int last_p2p_replies_woff;
342
343   /**
344    * Which offset in "last_client_replies" will be updated next?
345    * (we go round-robin).
346    */
347   unsigned int last_client_replies_woff;
348
349   /**
350    * Current offset into 'last_request_times' ring buffer.
351    */
352   unsigned int last_request_times_off;
353
354 };
355
356
357 /**
358  * Information we keep for each pending request.  We should try to
359  * keep this struct as small as possible since its memory consumption
360  * is key to how many requests we can have pending at once.
361  */
362 struct PendingRequest;
363
364
365 /**
366  * Doubly-linked list of requests we are performing
367  * on behalf of the same client.
368  */
369 struct ClientRequestList
370 {
371
372   /**
373    * This is a doubly-linked list.
374    */
375   struct ClientRequestList *next;
376
377   /**
378    * This is a doubly-linked list.
379    */
380   struct ClientRequestList *prev;
381
382   /**
383    * Request this entry represents.
384    */
385   struct PendingRequest *req;
386
387   /**
388    * Client list this request belongs to.
389    */
390   struct ClientList *client_list;
391
392 };
393
394
395 /**
396  * Replies to be transmitted to the client.  The actual
397  * response message is allocated after this struct.
398  */
399 struct ClientResponseMessage
400 {
401   /**
402    * This is a doubly-linked list.
403    */
404   struct ClientResponseMessage *next;
405
406   /**
407    * This is a doubly-linked list.
408    */
409   struct ClientResponseMessage *prev;
410
411   /**
412    * Client list entry this response belongs to.
413    */
414   struct ClientList *client_list;
415
416   /**
417    * Number of bytes in the response.
418    */
419   size_t msize;
420 };
421
422
423 /**
424  * Linked list of clients we are performing requests
425  * for right now.
426  */
427 struct ClientList
428 {
429   /**
430    * This is a linked list.
431    */
432   struct ClientList *next;
433
434   /**
435    * ID of a client making a request, NULL if this entry is for a
436    * peer.
437    */
438   struct GNUNET_SERVER_Client *client;
439
440   /**
441    * Head of list of requests performed on behalf
442    * of this client right now.
443    */
444   struct ClientRequestList *rl_head;
445
446   /**
447    * Tail of list of requests performed on behalf
448    * of this client right now.
449    */
450   struct ClientRequestList *rl_tail;
451
452   /**
453    * Head of linked list of responses.
454    */
455   struct ClientResponseMessage *res_head;
456
457   /**
458    * Tail of linked list of responses.
459    */
460   struct ClientResponseMessage *res_tail;
461
462   /**
463    * Context for sending replies.
464    */
465   struct GNUNET_CONNECTION_TransmitHandle *th;
466
467 };
468
469
470 /**
471  * Information about a peer that we have forwarded this
472  * request to already.  
473  */
474 struct UsedTargetEntry
475 {
476   /**
477    * What was the last time we have transmitted this request to this
478    * peer?
479    */
480   struct GNUNET_TIME_Absolute last_request_time;
481
482   /**
483    * How often have we transmitted this request to this peer?
484    */
485   unsigned int num_requests;
486
487   /**
488    * PID of the target peer.
489    */
490   GNUNET_PEER_Id pid;
491
492 };
493
494
495
496
497
498 /**
499  * Doubly-linked list of messages we are performing
500  * due to a pending request.
501  */
502 struct PendingMessageList
503 {
504
505   /**
506    * This is a doubly-linked list of messages on behalf of the same request.
507    */
508   struct PendingMessageList *next;
509
510   /**
511    * This is a doubly-linked list of messages on behalf of the same request.
512    */
513   struct PendingMessageList *prev;
514
515   /**
516    * Message this entry represents.
517    */
518   struct PendingMessage *pm;
519
520   /**
521    * Request this entry belongs to.
522    */
523   struct PendingRequest *req;
524
525   /**
526    * Peer this message is targeted for.
527    */
528   struct ConnectedPeer *target;
529
530 };
531
532
533 /**
534  * Information we keep for each pending request.  We should try to
535  * keep this struct as small as possible since its memory consumption
536  * is key to how many requests we can have pending at once.
537  */
538 struct PendingRequest
539 {
540
541   /**
542    * If this request was made by a client, this is our entry in the
543    * client request list; otherwise NULL.
544    */
545   struct ClientRequestList *client_request_list;
546
547   /**
548    * Entry of peer responsible for this entry (if this request
549    * was made by a peer).
550    */
551   struct ConnectedPeer *cp;
552
553   /**
554    * If this is a namespace query, pointer to the hash of the public
555    * key of the namespace; otherwise NULL.  Pointer will be to the 
556    * end of this struct (so no need to free it).
557    */
558   const GNUNET_HashCode *namespace;
559
560   /**
561    * Bloomfilter we use to filter out replies that we don't care about
562    * (anymore).  NULL as long as we are interested in all replies.
563    */
564   struct GNUNET_CONTAINER_BloomFilter *bf;
565
566   /**
567    * Reference to DHT get operation for this request (or NULL).
568    */
569   struct GNUNET_DHT_GetHandle *dht_get;
570
571   /**
572    * Context of our GNUNET_CORE_peer_change_preference call.
573    */
574   struct ConnectedPeer *pirc;
575
576   /**
577    * Hash code of all replies that we have seen so far (only valid
578    * if client is not NULL since we only track replies like this for
579    * our own clients).
580    */
581   GNUNET_HashCode *replies_seen;
582
583   /**
584    * Node in the heap representing this entry; NULL
585    * if we have no heap node.
586    */
587   struct GNUNET_CONTAINER_HeapNode *hnode;
588
589   /**
590    * Head of list of messages being performed on behalf of this
591    * request.
592    */
593   struct PendingMessageList *pending_head;
594
595   /**
596    * Tail of list of messages being performed on behalf of this
597    * request.
598    */
599   struct PendingMessageList *pending_tail;
600
601   /**
602    * When did we first see this request (form this peer), or, if our
603    * client is initiating, when did we last initiate a search?
604    */
605   struct GNUNET_TIME_Absolute start_time;
606
607   /**
608    * The query that this request is for.
609    */
610   GNUNET_HashCode query;
611
612   /**
613    * The task responsible for transmitting queries
614    * for this request.
615    */
616   GNUNET_SCHEDULER_TaskIdentifier task;
617
618   /**
619    * (Interned) Peer identifier that identifies a preferred target
620    * for requests.
621    */
622   GNUNET_PEER_Id target_pid;
623
624   /**
625    * (Interned) Peer identifiers of peers that have already
626    * received our query for this content.
627    */
628   struct UsedTargetEntry *used_targets;
629   
630   /**
631    * Our entry in the queue (non-NULL while we wait for our
632    * turn to interact with the local database).
633    */
634   struct GNUNET_DATASTORE_QueueEntry *qe;
635
636   /**
637    * Size of the 'bf' (in bytes).
638    */
639   size_t bf_size;
640
641   /**
642    * Desired anonymity level; only valid for requests from a local client.
643    */
644   uint32_t anonymity_level;
645
646   /**
647    * How many entries in "used_targets" are actually valid?
648    */
649   unsigned int used_targets_off;
650
651   /**
652    * How long is the "used_targets" array?
653    */
654   unsigned int used_targets_size;
655
656   /**
657    * Number of results found for this request.
658    */
659   unsigned int results_found;
660
661   /**
662    * How many entries in "replies_seen" are actually valid?
663    */
664   unsigned int replies_seen_off;
665
666   /**
667    * How long is the "replies_seen" array?
668    */
669   unsigned int replies_seen_size;
670   
671   /**
672    * Priority with which this request was made.  If one of our clients
673    * made the request, then this is the current priority that we are
674    * using when initiating the request.  This value is used when
675    * we decide to reward other peers with trust for providing a reply.
676    */
677   uint32_t priority;
678
679   /**
680    * Priority points left for us to spend when forwarding this request
681    * to other peers.
682    */
683   uint32_t remaining_priority;
684
685   /**
686    * Number to mingle hashes for bloom-filter tests with.
687    */
688   int32_t mingle;
689
690   /**
691    * TTL with which we saw this request (or, if we initiated, TTL that
692    * we used for the request).
693    */
694   int32_t ttl;
695   
696   /**
697    * Type of the content that this request is for.
698    */
699   enum GNUNET_BLOCK_Type type;
700
701   /**
702    * Remove this request after transmission of the current response.
703    */
704   int8_t do_remove;
705
706   /**
707    * GNUNET_YES if we should not forward this request to other peers.
708    */
709   int8_t local_only;
710
711   /**
712    * GNUNET_YES if we should not forward this request to other peers.
713    */
714   int8_t forward_only;
715
716 };
717
718
719 /**
720  * Block that is ready for migration to other peers.  Actual data is at the end of the block.
721  */
722 struct MigrationReadyBlock
723 {
724
725   /**
726    * This is a doubly-linked list.
727    */
728   struct MigrationReadyBlock *next;
729
730   /**
731    * This is a doubly-linked list.
732    */
733   struct MigrationReadyBlock *prev;
734
735   /**
736    * Query for the block.
737    */
738   GNUNET_HashCode query;
739
740   /**
741    * When does this block expire? 
742    */
743   struct GNUNET_TIME_Absolute expiration;
744
745   /**
746    * Peers we would consider forwarding this
747    * block to.  Zero for empty entries.
748    */
749   GNUNET_PEER_Id target_list[MIGRATION_LIST_SIZE];
750
751   /**
752    * Size of the block.
753    */
754   size_t size;
755
756   /**
757    *  Number of targets already used.
758    */
759   unsigned int used_targets;
760
761   /**
762    * Type of the block.
763    */
764   enum GNUNET_BLOCK_Type type;
765 };
766
767
768 /**
769  * Our connection to the datastore.
770  */
771 static struct GNUNET_DATASTORE_Handle *dsh;
772
773 /**
774  * Our block context.
775  */
776 static struct GNUNET_BLOCK_Context *block_ctx;
777
778 /**
779  * Our block configuration.
780  */
781 static struct GNUNET_CONFIGURATION_Handle *block_cfg;
782
783 /**
784  * Our configuration.
785  */
786 static const struct GNUNET_CONFIGURATION_Handle *cfg;
787
788 /**
789  * Map of peer identifiers to "struct ConnectedPeer" (for that peer).
790  */
791 static struct GNUNET_CONTAINER_MultiHashMap *connected_peers;
792
793 /**
794  * Map of peer identifiers to "struct PendingRequest" (for that peer).
795  */
796 static struct GNUNET_CONTAINER_MultiHashMap *peer_request_map;
797
798 /**
799  * Map of query identifiers to "struct PendingRequest" (for that query).
800  */
801 static struct GNUNET_CONTAINER_MultiHashMap *query_request_map;
802
803 /**
804  * Heap with the request that will expire next at the top.  Contains
805  * pointers of type "struct PendingRequest*"; these will *also* be
806  * aliased from the "requests_by_peer" data structures and the
807  * "requests_by_query" table.  Note that requests from our clients
808  * don't expire and are thus NOT in the "requests_by_expiration"
809  * (or the "requests_by_peer" tables).
810  */
811 static struct GNUNET_CONTAINER_Heap *requests_by_expiration_heap;
812
813 /**
814  * Handle for reporting statistics.
815  */
816 static struct GNUNET_STATISTICS_Handle *stats;
817
818 /**
819  * Linked list of clients we are currently processing requests for.
820  */
821 static struct ClientList *client_list;
822
823 /**
824  * Pointer to handle to the core service (points to NULL until we've
825  * connected to it).
826  */
827 static struct GNUNET_CORE_Handle *core;
828
829 /**
830  * Head of linked list of blocks that can be migrated.
831  */
832 static struct MigrationReadyBlock *mig_head;
833
834 /**
835  * Tail of linked list of blocks that can be migrated.
836  */
837 static struct MigrationReadyBlock *mig_tail;
838
839 /**
840  * Request to datastore for migration (or NULL).
841  */
842 static struct GNUNET_DATASTORE_QueueEntry *mig_qe;
843
844 /**
845  * Request to datastore for DHT PUTs (or NULL).
846  */
847 static struct GNUNET_DATASTORE_QueueEntry *dht_qe;
848
849 /**
850  * Type we will request for the next DHT PUT round from the datastore.
851  */
852 static enum GNUNET_BLOCK_Type dht_put_type = GNUNET_BLOCK_TYPE_FS_KBLOCK;
853
854 /**
855  * Where do we store trust information?
856  */
857 static char *trustDirectory;
858
859 /**
860  * ID of task that collects blocks for migration.
861  */
862 static GNUNET_SCHEDULER_TaskIdentifier mig_task;
863
864 /**
865  * ID of task that collects blocks for DHT PUTs.
866  */
867 static GNUNET_SCHEDULER_TaskIdentifier dht_task;
868
869 /**
870  * What is the maximum frequency at which we are allowed to
871  * poll the datastore for migration content?
872  */
873 static struct GNUNET_TIME_Relative min_migration_delay;
874
875 /**
876  * Handle for DHT operations.
877  */
878 static struct GNUNET_DHT_Handle *dht_handle;
879
880 /**
881  * Size of the doubly-linked list of migration blocks.
882  */
883 static unsigned int mig_size;
884
885 /**
886  * Are we allowed to migrate content to this peer.
887  */
888 static int active_to_migration;
889
890 /**
891  * Are we allowed to push out content from this peer.
892  */
893 static int active_from_migration;
894
895 /**
896  * How many entires with zero anonymity do we currently estimate
897  * to have in the database?
898  */
899 static unsigned int zero_anonymity_count_estimate;
900
901 /**
902  * Typical priorities we're seeing from other peers right now.  Since
903  * most priorities will be zero, this value is the weighted average of
904  * non-zero priorities seen "recently".  In order to ensure that new
905  * values do not dramatically change the ratio, values are first
906  * "capped" to a reasonable range (+N of the current value) and then
907  * averaged into the existing value by a ratio of 1:N.  Hence
908  * receiving the largest possible priority can still only raise our
909  * "current_priorities" by at most 1.
910  */
911 static double current_priorities;
912
913 /**
914  * Datastore 'GET' load tracking.
915  */
916 static struct GNUNET_LOAD_Value *datastore_get_load;
917
918 /**
919  * Datastore 'PUT' load tracking.
920  */
921 static struct GNUNET_LOAD_Value *datastore_put_load;
922
923 /**
924  * How long do requests typically stay in the routing table?
925  */
926 static struct GNUNET_LOAD_Value *rt_entry_lifetime;
927
928 /**
929  * We've just now completed a datastore request.  Update our
930  * datastore load calculations.
931  *
932  * @param start time when the datastore request was issued
933  */
934 static void
935 update_datastore_delays (struct GNUNET_TIME_Absolute start)
936 {
937   struct GNUNET_TIME_Relative delay;
938
939   delay = GNUNET_TIME_absolute_get_duration (start);
940   GNUNET_LOAD_update (datastore_get_load,
941                       delay.rel_value);
942 }
943
944
945 /**
946  * Get the filename under which we would store the GNUNET_HELLO_Message
947  * for the given host and protocol.
948  * @return filename of the form DIRECTORY/HOSTID
949  */
950 static char *
951 get_trust_filename (const struct GNUNET_PeerIdentity *id)
952 {
953   struct GNUNET_CRYPTO_HashAsciiEncoded fil;
954   char *fn;
955
956   GNUNET_CRYPTO_hash_to_enc (&id->hashPubKey, &fil);
957   GNUNET_asprintf (&fn, "%s%s%s", trustDirectory, DIR_SEPARATOR_STR, &fil);
958   return fn;
959 }
960
961
962
963 /**
964  * Transmit messages by copying it to the target buffer
965  * "buf".  "buf" will be NULL and "size" zero if the socket was closed
966  * for writing in the meantime.  In that case, do nothing
967  * (the disconnect or shutdown handler will take care of the rest).
968  * If we were able to transmit messages and there are still more
969  * pending, ask core again for further calls to this function.
970  *
971  * @param cls closure, pointer to the 'struct ConnectedPeer*'
972  * @param size number of bytes available in buf
973  * @param buf where the callee should write the message
974  * @return number of bytes written to buf
975  */
976 static size_t
977 transmit_to_peer (void *cls,
978                   size_t size, void *buf);
979
980
981 /* ******************* clean up functions ************************ */
982
983 /**
984  * Delete the given migration block.
985  *
986  * @param mb block to delete
987  */
988 static void
989 delete_migration_block (struct MigrationReadyBlock *mb)
990 {
991   GNUNET_CONTAINER_DLL_remove (mig_head,
992                                mig_tail,
993                                mb);
994   GNUNET_PEER_decrement_rcs (mb->target_list,
995                              MIGRATION_LIST_SIZE);
996   mig_size--;
997   GNUNET_free (mb);
998 }
999
1000
1001 /**
1002  * Compare the distance of two peers to a key.
1003  *
1004  * @param key key
1005  * @param p1 first peer
1006  * @param p2 second peer
1007  * @return GNUNET_YES if P1 is closer to key than P2
1008  */
1009 static int
1010 is_closer (const GNUNET_HashCode *key,
1011            const struct GNUNET_PeerIdentity *p1,
1012            const struct GNUNET_PeerIdentity *p2)
1013 {
1014   return GNUNET_CRYPTO_hash_xorcmp (&p1->hashPubKey,
1015                                     &p2->hashPubKey,
1016                                     key);
1017 }
1018
1019
1020 /**
1021  * Consider migrating content to a given peer.
1022  *
1023  * @param cls 'struct MigrationReadyBlock*' to select
1024  *            targets for (or NULL for none)
1025  * @param key ID of the peer 
1026  * @param value 'struct ConnectedPeer' of the peer
1027  * @return GNUNET_YES (always continue iteration)
1028  */
1029 static int
1030 consider_migration (void *cls,
1031                     const GNUNET_HashCode *key,
1032                     void *value)
1033 {
1034   struct MigrationReadyBlock *mb = cls;
1035   struct ConnectedPeer *cp = value;
1036   struct MigrationReadyBlock *pos;
1037   struct GNUNET_PeerIdentity cppid;
1038   struct GNUNET_PeerIdentity otherpid;
1039   struct GNUNET_PeerIdentity worstpid;
1040   size_t msize;
1041   unsigned int i;
1042   unsigned int repl;
1043   
1044   /* consider 'cp' as a migration target for mb */
1045   if (GNUNET_TIME_absolute_get_remaining (cp->migration_blocked).rel_value > 0)
1046     return GNUNET_YES; /* peer has requested no migration! */
1047   if (mb != NULL)
1048     {
1049       GNUNET_PEER_resolve (cp->pid,
1050                            &cppid);
1051       repl = MIGRATION_LIST_SIZE;
1052       for (i=0;i<MIGRATION_LIST_SIZE;i++)
1053         {
1054           if (mb->target_list[i] == 0)
1055             {
1056               mb->target_list[i] = cp->pid;
1057               GNUNET_PEER_change_rc (mb->target_list[i], 1);
1058               repl = MIGRATION_LIST_SIZE;
1059               break;
1060             }
1061           GNUNET_PEER_resolve (mb->target_list[i],
1062                                &otherpid);
1063           if ( (repl == MIGRATION_LIST_SIZE) &&
1064                is_closer (&mb->query,
1065                           &cppid,
1066                           &otherpid)) 
1067             {
1068               repl = i;
1069               worstpid = otherpid;
1070             }
1071           else if ( (repl != MIGRATION_LIST_SIZE) &&
1072                     (is_closer (&mb->query,
1073                                 &worstpid,
1074                                 &otherpid) ) )
1075             {
1076               repl = i;
1077               worstpid = otherpid;
1078             }       
1079         }
1080       if (repl != MIGRATION_LIST_SIZE) 
1081         {
1082           GNUNET_PEER_change_rc (mb->target_list[repl], -1);
1083           mb->target_list[repl] = cp->pid;
1084           GNUNET_PEER_change_rc (mb->target_list[repl], 1);
1085         }
1086     }
1087
1088   /* consider scheduling transmission to cp for content migration */
1089   if (cp->cth != NULL)        
1090     return GNUNET_YES; 
1091   msize = 0;
1092   pos = mig_head;
1093   while (pos != NULL)
1094     {
1095       for (i=0;i<MIGRATION_LIST_SIZE;i++)
1096         {
1097           if (cp->pid == pos->target_list[i])
1098             {
1099               if (msize == 0)
1100                 msize = pos->size;
1101               else
1102                 msize = GNUNET_MIN (msize,
1103                                     pos->size);
1104               break;
1105             }
1106         }
1107       pos = pos->next;
1108     }
1109   if (msize == 0)
1110     return GNUNET_YES; /* no content available */
1111 #if DEBUG_FS
1112   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1113               "Trying to migrate at least %u bytes to peer `%s'\n",
1114               msize,
1115               GNUNET_h2s (key));
1116 #endif
1117   if (cp->delayed_transmission_request_task != GNUNET_SCHEDULER_NO_TASK)
1118     {
1119       GNUNET_SCHEDULER_cancel (cp->delayed_transmission_request_task);
1120       cp->delayed_transmission_request_task = GNUNET_SCHEDULER_NO_TASK;
1121     }
1122   cp->cth 
1123     = GNUNET_CORE_notify_transmit_ready (core,
1124                                          0, GNUNET_TIME_UNIT_FOREVER_REL,
1125                                          (const struct GNUNET_PeerIdentity*) key,
1126                                          msize + sizeof (struct PutMessage),
1127                                          &transmit_to_peer,
1128                                          cp);
1129   return GNUNET_YES;
1130 }
1131
1132
1133 /**
1134  * Task that is run periodically to obtain blocks for content
1135  * migration
1136  * 
1137  * @param cls unused
1138  * @param tc scheduler context (also unused)
1139  */
1140 static void
1141 gather_migration_blocks (void *cls,
1142                          const struct GNUNET_SCHEDULER_TaskContext *tc);
1143
1144
1145
1146
1147 /**
1148  * Task that is run periodically to obtain blocks for DHT PUTs.
1149  * 
1150  * @param cls type of blocks to gather
1151  * @param tc scheduler context (unused)
1152  */
1153 static void
1154 gather_dht_put_blocks (void *cls,
1155                        const struct GNUNET_SCHEDULER_TaskContext *tc);
1156
1157
1158 /**
1159  * If the migration task is not currently running, consider
1160  * (re)scheduling it with the appropriate delay.
1161  */
1162 static void
1163 consider_migration_gathering ()
1164 {
1165   struct GNUNET_TIME_Relative delay;
1166
1167   if (dsh == NULL)
1168     return;
1169   if (mig_qe != NULL)
1170     return;
1171   if (mig_task != GNUNET_SCHEDULER_NO_TASK)
1172     return;
1173   delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
1174                                          mig_size);
1175   delay = GNUNET_TIME_relative_divide (delay,
1176                                        MAX_MIGRATION_QUEUE);
1177   delay = GNUNET_TIME_relative_max (delay,
1178                                     min_migration_delay);
1179   mig_task = GNUNET_SCHEDULER_add_delayed (delay,
1180                                            &gather_migration_blocks,
1181                                            NULL);
1182 }
1183
1184
1185 /**
1186  * If the DHT PUT gathering task is not currently running, consider
1187  * (re)scheduling it with the appropriate delay.
1188  */
1189 static void
1190 consider_dht_put_gathering (void *cls)
1191 {
1192   struct GNUNET_TIME_Relative delay;
1193
1194   if (dsh == NULL)
1195     return;
1196   if (dht_qe != NULL)
1197     return;
1198   if (dht_task != GNUNET_SCHEDULER_NO_TASK)
1199     return;
1200   if (zero_anonymity_count_estimate > 0)
1201     {
1202       delay = GNUNET_TIME_relative_divide (GNUNET_DHT_DEFAULT_REPUBLISH_FREQUENCY,
1203                                            zero_anonymity_count_estimate);
1204       delay = GNUNET_TIME_relative_min (delay,
1205                                         MAX_DHT_PUT_FREQ);
1206     }
1207   else
1208     {
1209       /* if we have NO zero-anonymity content yet, wait 5 minutes for some to
1210          (hopefully) appear */
1211       delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 5);
1212     }
1213   dht_task = GNUNET_SCHEDULER_add_delayed (delay,
1214                                            &gather_dht_put_blocks,
1215                                            cls);
1216 }
1217
1218
1219 /**
1220  * Process content offered for migration.
1221  *
1222  * @param cls closure
1223  * @param key key for the content
1224  * @param size number of bytes in data
1225  * @param data content stored
1226  * @param type type of the content
1227  * @param priority priority of the content
1228  * @param anonymity anonymity-level for the content
1229  * @param expiration expiration time for the content
1230  * @param uid unique identifier for the datum;
1231  *        maybe 0 if no unique identifier is available
1232  */
1233 static void
1234 process_migration_content (void *cls,
1235                            const GNUNET_HashCode * key,
1236                            size_t size,
1237                            const void *data,
1238                            enum GNUNET_BLOCK_Type type,
1239                            uint32_t priority,
1240                            uint32_t anonymity,
1241                            struct GNUNET_TIME_Absolute
1242                            expiration, uint64_t uid)
1243 {
1244   struct MigrationReadyBlock *mb;
1245   
1246   if (key == NULL)
1247     {
1248       mig_qe = NULL;
1249       if (mig_size < MAX_MIGRATION_QUEUE)  
1250         consider_migration_gathering ();
1251       return;
1252     }
1253   if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
1254     {
1255       if (GNUNET_OK !=
1256           GNUNET_FS_handle_on_demand_block (key, size, data,
1257                                             type, priority, anonymity,
1258                                             expiration, uid, 
1259                                             &process_migration_content,
1260                                             NULL))
1261         {
1262           GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
1263         }
1264       return;
1265     }
1266 #if DEBUG_FS
1267   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1268               "Retrieved block `%s' of type %u for migration\n",
1269               GNUNET_h2s (key),
1270               type);
1271 #endif
1272   mb = GNUNET_malloc (sizeof (struct MigrationReadyBlock) + size);
1273   mb->query = *key;
1274   mb->expiration = expiration;
1275   mb->size = size;
1276   mb->type = type;
1277   memcpy (&mb[1], data, size);
1278   GNUNET_CONTAINER_DLL_insert_after (mig_head,
1279                                      mig_tail,
1280                                      mig_tail,
1281                                      mb);
1282   mig_size++;
1283   GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
1284                                          &consider_migration,
1285                                          mb);
1286   GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
1287 }
1288
1289
1290 /**
1291  * Function called upon completion of the DHT PUT operation.
1292  */
1293 static void
1294 dht_put_continuation (void *cls,
1295                       const struct GNUNET_SCHEDULER_TaskContext *tc)
1296 {
1297   GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
1298 }
1299
1300
1301 /**
1302  * Store content in DHT.
1303  *
1304  * @param cls closure
1305  * @param key key for the content
1306  * @param size number of bytes in data
1307  * @param data content stored
1308  * @param type type of the content
1309  * @param priority priority of the content
1310  * @param anonymity anonymity-level for the content
1311  * @param expiration expiration time for the content
1312  * @param uid unique identifier for the datum;
1313  *        maybe 0 if no unique identifier is available
1314  */
1315 static void
1316 process_dht_put_content (void *cls,
1317                          const GNUNET_HashCode * key,
1318                          size_t size,
1319                          const void *data,
1320                          enum GNUNET_BLOCK_Type type,
1321                          uint32_t priority,
1322                          uint32_t anonymity,
1323                          struct GNUNET_TIME_Absolute
1324                          expiration, uint64_t uid)
1325
1326   static unsigned int counter;
1327   static GNUNET_HashCode last_vhash;
1328   static GNUNET_HashCode vhash;
1329
1330   if (key == NULL)
1331     {
1332       dht_qe = NULL;
1333       consider_dht_put_gathering (cls);
1334       return;
1335     }
1336   /* slightly funky code to estimate the total number of values with zero
1337      anonymity from the maximum observed length of a monotonically increasing 
1338      sequence of hashes over the contents */
1339   GNUNET_CRYPTO_hash (data, size, &vhash);
1340   if (GNUNET_CRYPTO_hash_cmp (&vhash, &last_vhash) <= 0)
1341     {
1342       if (zero_anonymity_count_estimate > 0)
1343         zero_anonymity_count_estimate /= 2;
1344       counter = 0;
1345     }
1346   last_vhash = vhash;
1347   if (counter < 31)
1348     counter++;
1349   if (zero_anonymity_count_estimate < (1 << counter))
1350     zero_anonymity_count_estimate = (1 << counter);
1351 #if DEBUG_FS
1352   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1353               "Retrieved block `%s' of type %u for DHT PUT\n",
1354               GNUNET_h2s (key),
1355               type);
1356 #endif
1357   GNUNET_DHT_put (dht_handle,
1358                   key,
1359                   DEFAULT_PUT_REPLICATION,
1360                   GNUNET_DHT_RO_NONE,
1361                   type,
1362                   size,
1363                   data,
1364                   expiration,
1365                   GNUNET_TIME_UNIT_FOREVER_REL,
1366                   &dht_put_continuation,
1367                   cls);
1368 }
1369
1370
1371 /**
1372  * Task that is run periodically to obtain blocks for content
1373  * migration
1374  * 
1375  * @param cls unused
1376  * @param tc scheduler context (also unused)
1377  */
1378 static void
1379 gather_migration_blocks (void *cls,
1380                          const struct GNUNET_SCHEDULER_TaskContext *tc)
1381 {
1382   mig_task = GNUNET_SCHEDULER_NO_TASK;
1383   if (dsh != NULL)
1384     {
1385       mig_qe = GNUNET_DATASTORE_get_random (dsh, 0, UINT_MAX,
1386                                             GNUNET_TIME_UNIT_FOREVER_REL,
1387                                             &process_migration_content, NULL);
1388       GNUNET_assert (mig_qe != NULL);
1389     }
1390 }
1391
1392
1393 /**
1394  * Task that is run periodically to obtain blocks for DHT PUTs.
1395  * 
1396  * @param cls type of blocks to gather
1397  * @param tc scheduler context (unused)
1398  */
1399 static void
1400 gather_dht_put_blocks (void *cls,
1401                        const struct GNUNET_SCHEDULER_TaskContext *tc)
1402 {
1403   dht_task = GNUNET_SCHEDULER_NO_TASK;
1404   if (dsh != NULL)
1405     {
1406       if (dht_put_type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
1407         dht_put_type = GNUNET_BLOCK_TYPE_FS_KBLOCK;
1408       dht_qe = GNUNET_DATASTORE_get_zero_anonymity (dsh, 0, UINT_MAX,
1409                                                     GNUNET_TIME_UNIT_FOREVER_REL,
1410                                                     dht_put_type++,
1411                                                     &process_dht_put_content, NULL);
1412       GNUNET_assert (dht_qe != NULL);
1413     }
1414 }
1415
1416
1417 /**
1418  * We're done with a particular message list entry.
1419  * Free all associated resources.
1420  * 
1421  * @param pml entry to destroy
1422  */
1423 static void
1424 destroy_pending_message_list_entry (struct PendingMessageList *pml)
1425 {
1426   GNUNET_CONTAINER_DLL_remove (pml->req->pending_head,
1427                                pml->req->pending_tail,
1428                                pml);
1429   GNUNET_CONTAINER_DLL_remove (pml->target->pending_messages_head,
1430                                pml->target->pending_messages_tail,
1431                                pml->pm);
1432   pml->target->pending_requests--;
1433   GNUNET_free (pml->pm);
1434   GNUNET_free (pml);
1435 }
1436
1437
1438 /**
1439  * Destroy the given pending message (and call the respective
1440  * continuation).
1441  *
1442  * @param pm message to destroy
1443  * @param tpid id of peer that the message was delivered to, or 0 for none
1444  */
1445 static void
1446 destroy_pending_message (struct PendingMessage *pm,
1447                          GNUNET_PEER_Id tpid)
1448 {
1449   struct PendingMessageList *pml = pm->pml;
1450   TransmissionContinuation cont;
1451   void *cont_cls;
1452
1453   cont = pm->cont;
1454   cont_cls = pm->cont_cls;
1455   if (pml != NULL)
1456     {
1457       GNUNET_assert (pml->pm == pm);
1458       GNUNET_assert ( (tpid == 0) || (tpid == pml->target->pid) );
1459       destroy_pending_message_list_entry (pml);
1460     }
1461   else
1462     {
1463       GNUNET_free (pm);
1464     }
1465   if (cont != NULL)
1466     cont (cont_cls, tpid);  
1467 }
1468
1469
1470 /**
1471  * We're done processing a particular request.
1472  * Free all associated resources.
1473  *
1474  * @param pr request to destroy
1475  */
1476 static void
1477 destroy_pending_request (struct PendingRequest *pr)
1478 {
1479   struct GNUNET_PeerIdentity pid;
1480   unsigned int i;
1481
1482   if (pr->hnode != NULL)
1483     {
1484       GNUNET_CONTAINER_heap_remove_node (requests_by_expiration_heap,
1485                                          pr->hnode);
1486       pr->hnode = NULL;
1487     }
1488   if (NULL == pr->client_request_list)
1489     {
1490       GNUNET_STATISTICS_update (stats,
1491                                 gettext_noop ("# P2P searches active"),
1492                                 -1,
1493                                 GNUNET_NO);
1494     }
1495   else
1496     {
1497       GNUNET_STATISTICS_update (stats,
1498                                 gettext_noop ("# client searches active"),
1499                                 -1,
1500                                 GNUNET_NO);
1501     }
1502   if (GNUNET_YES == 
1503       GNUNET_CONTAINER_multihashmap_remove (query_request_map,
1504                                             &pr->query,
1505                                             pr))
1506     {
1507       GNUNET_LOAD_update (rt_entry_lifetime,
1508                           GNUNET_TIME_absolute_get_duration (pr->start_time).rel_value);
1509     }
1510   if (pr->qe != NULL)
1511      {
1512       GNUNET_DATASTORE_cancel (pr->qe);
1513       pr->qe = NULL;
1514     }
1515   if (pr->dht_get != NULL)
1516     {
1517       GNUNET_DHT_get_stop (pr->dht_get);
1518       pr->dht_get = NULL;
1519     }
1520   if (pr->client_request_list != NULL)
1521     {
1522       GNUNET_CONTAINER_DLL_remove (pr->client_request_list->client_list->rl_head,
1523                                    pr->client_request_list->client_list->rl_tail,
1524                                    pr->client_request_list);
1525       GNUNET_free (pr->client_request_list);
1526       pr->client_request_list = NULL;
1527     }
1528   if (pr->cp != NULL)
1529     {
1530       GNUNET_PEER_resolve (pr->cp->pid,
1531                            &pid);
1532       (void) GNUNET_CONTAINER_multihashmap_remove (peer_request_map,
1533                                                    &pid.hashPubKey,
1534                                                    pr);
1535       pr->cp = NULL;
1536     }
1537   if (pr->bf != NULL)
1538     {
1539       GNUNET_CONTAINER_bloomfilter_free (pr->bf);                                        
1540       pr->bf = NULL;
1541     }
1542   if (pr->pirc != NULL)
1543     {
1544       GNUNET_CORE_peer_change_preference_cancel (pr->pirc->irc);
1545       pr->pirc->irc = NULL;
1546       pr->pirc = NULL;
1547     }
1548   if (pr->replies_seen != NULL)
1549     {
1550       GNUNET_free (pr->replies_seen);
1551       pr->replies_seen = NULL;
1552     }
1553   if (pr->task != GNUNET_SCHEDULER_NO_TASK)
1554     {
1555       GNUNET_SCHEDULER_cancel (pr->task);
1556       pr->task = GNUNET_SCHEDULER_NO_TASK;
1557     }
1558   while (NULL != pr->pending_head)    
1559     destroy_pending_message_list_entry (pr->pending_head);
1560   GNUNET_PEER_change_rc (pr->target_pid, -1);
1561   if (pr->used_targets != NULL)
1562     {
1563       for (i=0;i<pr->used_targets_off;i++)
1564         GNUNET_PEER_change_rc (pr->used_targets[i].pid, -1);
1565       GNUNET_free (pr->used_targets);
1566       pr->used_targets_off = 0;
1567       pr->used_targets_size = 0;
1568       pr->used_targets = NULL;
1569     }
1570   GNUNET_free (pr);
1571 }
1572
1573
1574 /**
1575  * Find latency information in 'atsi'.
1576  *
1577  * @param atsi performance data
1578  * @return connection latency
1579  */
1580 static struct GNUNET_TIME_Relative
1581 get_latency (const struct GNUNET_TRANSPORT_ATS_Information *atsi)
1582 {
1583   /* FIXME: extract latency data from 'atsi' */
1584   return GNUNET_TIME_UNIT_SECONDS;
1585 }
1586
1587
1588 /**
1589  * Method called whenever a given peer connects.
1590  *
1591  * @param cls closure, not used
1592  * @param peer peer identity this notification is about
1593  * @param atsi performance information
1594  */
1595 static void 
1596 peer_connect_handler (void *cls,
1597                       const struct
1598                       GNUNET_PeerIdentity * peer,
1599                       const struct GNUNET_TRANSPORT_ATS_Information *atsi)
1600 {
1601   struct ConnectedPeer *cp;
1602   struct MigrationReadyBlock *pos;
1603   char *fn;
1604   uint32_t trust;
1605   struct GNUNET_TIME_Relative latency;
1606
1607   latency = get_latency (atsi);
1608   cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
1609                                           &peer->hashPubKey);
1610   if (NULL != cp)
1611     {
1612       GNUNET_break (0);
1613       return;
1614     }
1615   cp = GNUNET_malloc (sizeof (struct ConnectedPeer));
1616   cp->transmission_delay = GNUNET_LOAD_value_init (latency);
1617   cp->pid = GNUNET_PEER_intern (peer);
1618
1619   fn = get_trust_filename (peer);
1620   if ((GNUNET_DISK_file_test (fn) == GNUNET_YES) &&
1621       (sizeof (trust) == GNUNET_DISK_fn_read (fn, &trust, sizeof (trust))))
1622     cp->disk_trust = cp->trust = ntohl (trust);
1623   GNUNET_free (fn);
1624
1625   GNUNET_break (GNUNET_OK ==
1626                 GNUNET_CONTAINER_multihashmap_put (connected_peers,
1627                                                    &peer->hashPubKey,
1628                                                    cp,
1629                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
1630
1631   pos = mig_head;
1632   while (NULL != pos)
1633     {
1634       (void) consider_migration (pos, &peer->hashPubKey, cp);
1635       pos = pos->next;
1636     }
1637 }
1638
1639
1640 /**
1641  * Method called whenever a given peer has a status change.
1642  *
1643  * @param cls closure
1644  * @param peer peer identity this notification is about
1645  * @param bandwidth_in available amount of inbound bandwidth
1646  * @param bandwidth_out available amount of outbound bandwidth
1647  * @param timeout absolute time when this peer will time out
1648  *        unless we see some further activity from it
1649  * @param atsi status information
1650  */
1651 static void
1652 peer_status_handler (void *cls,
1653                      const struct
1654                      GNUNET_PeerIdentity * peer,
1655                      struct GNUNET_BANDWIDTH_Value32NBO bandwidth_in,
1656                      struct GNUNET_BANDWIDTH_Value32NBO bandwidth_out,
1657                      struct GNUNET_TIME_Absolute timeout,
1658                      const struct GNUNET_TRANSPORT_ATS_Information *atsi)
1659 {
1660   struct ConnectedPeer *cp;
1661   struct GNUNET_TIME_Relative latency;
1662
1663   latency = get_latency (atsi);
1664   cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
1665                                           &peer->hashPubKey);
1666   if (cp == NULL)
1667     {
1668       GNUNET_break (0);
1669       return;
1670     }
1671   GNUNET_LOAD_value_set_decline (cp->transmission_delay,
1672                                  latency);  
1673 }
1674
1675
1676
1677 /**
1678  * Increase the host credit by a value.
1679  *
1680  * @param host which peer to change the trust value on
1681  * @param value is the int value by which the
1682  *  host credit is to be increased or decreased
1683  * @returns the actual change in trust (positive or negative)
1684  */
1685 static int
1686 change_host_trust (struct ConnectedPeer *host, int value)
1687 {
1688   if (value == 0)
1689     return 0;
1690   GNUNET_assert (host != NULL);
1691   if (value > 0)
1692     {
1693       if (host->trust + value < host->trust)
1694         {
1695           value = UINT32_MAX - host->trust;
1696           host->trust = UINT32_MAX;
1697         }
1698       else
1699         host->trust += value;
1700     }
1701   else
1702     {
1703       if (host->trust < -value)
1704         {
1705           value = -host->trust;
1706           host->trust = 0;
1707         }
1708       else
1709         host->trust += value;
1710     }
1711   return value;
1712 }
1713
1714
1715 /**
1716  * Write host-trust information to a file - flush the buffer entry!
1717  */
1718 static int
1719 flush_trust (void *cls,
1720              const GNUNET_HashCode *key,
1721              void *value)
1722 {
1723   struct ConnectedPeer *host = value;
1724   char *fn;
1725   uint32_t trust;
1726   struct GNUNET_PeerIdentity pid;
1727
1728   if (host->trust == host->disk_trust)
1729     return GNUNET_OK;                     /* unchanged */
1730   GNUNET_PEER_resolve (host->pid,
1731                        &pid);
1732   fn = get_trust_filename (&pid);
1733   if (host->trust == 0)
1734     {
1735       if ((0 != UNLINK (fn)) && (errno != ENOENT))
1736         GNUNET_log_strerror_file (GNUNET_ERROR_TYPE_WARNING |
1737                                   GNUNET_ERROR_TYPE_BULK, "unlink", fn);
1738     }
1739   else
1740     {
1741       trust = htonl (host->trust);
1742       if (sizeof(uint32_t) == GNUNET_DISK_fn_write (fn, &trust, 
1743                                                     sizeof(uint32_t),
1744                                                     GNUNET_DISK_PERM_USER_READ | GNUNET_DISK_PERM_USER_WRITE
1745                                                     | GNUNET_DISK_PERM_GROUP_READ | GNUNET_DISK_PERM_OTHER_READ))
1746         host->disk_trust = host->trust;
1747     }
1748   GNUNET_free (fn);
1749   return GNUNET_OK;
1750 }
1751
1752 /**
1753  * Call this method periodically to scan data/hosts for new hosts.
1754  */
1755 static void
1756 cron_flush_trust (void *cls,
1757                   const struct GNUNET_SCHEDULER_TaskContext *tc)
1758 {
1759
1760   if (NULL == connected_peers)
1761     return;
1762   GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
1763                                          &flush_trust,
1764                                          NULL);
1765   if (NULL == tc)
1766     return;
1767   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
1768     return;
1769   GNUNET_SCHEDULER_add_delayed (TRUST_FLUSH_FREQ, &cron_flush_trust, NULL);
1770 }
1771
1772
1773 /**
1774  * Free (each) request made by the peer.
1775  *
1776  * @param cls closure, points to peer that the request belongs to
1777  * @param key current key code
1778  * @param value value in the hash map
1779  * @return GNUNET_YES (we should continue to iterate)
1780  */
1781 static int
1782 destroy_request (void *cls,
1783                  const GNUNET_HashCode * key,
1784                  void *value)
1785 {
1786   const struct GNUNET_PeerIdentity * peer = cls;
1787   struct PendingRequest *pr = value;
1788   
1789   GNUNET_break (GNUNET_YES ==
1790                 GNUNET_CONTAINER_multihashmap_remove (peer_request_map,
1791                                                       &peer->hashPubKey,
1792                                                       pr));
1793   destroy_pending_request (pr);
1794   return GNUNET_YES;
1795 }
1796
1797
1798 /**
1799  * Method called whenever a peer disconnects.
1800  *
1801  * @param cls closure, not used
1802  * @param peer peer identity this notification is about
1803  */
1804 static void
1805 peer_disconnect_handler (void *cls,
1806                          const struct
1807                          GNUNET_PeerIdentity * peer)
1808 {
1809   struct ConnectedPeer *cp;
1810   struct PendingMessage *pm;
1811   unsigned int i;
1812   struct MigrationReadyBlock *pos;
1813   struct MigrationReadyBlock *next;
1814
1815   GNUNET_CONTAINER_multihashmap_get_multiple (peer_request_map,
1816                                               &peer->hashPubKey,
1817                                               &destroy_request,
1818                                               (void*) peer);
1819   cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
1820                                           &peer->hashPubKey);
1821   if (cp == NULL)
1822     return;
1823   for (i=0;i<CS2P_SUCCESS_LIST_SIZE;i++)
1824     {
1825       if (NULL != cp->last_client_replies[i])
1826         {
1827           GNUNET_SERVER_client_drop (cp->last_client_replies[i]);
1828           cp->last_client_replies[i] = NULL;
1829         }
1830     }
1831   GNUNET_break (GNUNET_YES ==
1832                 GNUNET_CONTAINER_multihashmap_remove (connected_peers,
1833                                                       &peer->hashPubKey,
1834                                                       cp));
1835   if (cp->irc != NULL)
1836     {
1837       GNUNET_CORE_peer_change_preference_cancel (cp->irc);
1838       cp->irc = NULL;
1839       cp->pr->pirc = NULL;
1840       cp->pr = NULL;
1841     }
1842
1843   /* remove this peer from migration considerations; schedule
1844      alternatives */
1845   next = mig_head;
1846   while (NULL != (pos = next))
1847     {
1848       next = pos->next;
1849       for (i=0;i<MIGRATION_LIST_SIZE;i++)
1850         {
1851           if (pos->target_list[i] == cp->pid)
1852             {
1853               GNUNET_PEER_change_rc (pos->target_list[i], -1);
1854               pos->target_list[i] = 0;
1855             }
1856          }
1857       if (pos->used_targets >= GNUNET_CONTAINER_multihashmap_size (connected_peers))
1858         {
1859           delete_migration_block (pos);
1860           consider_migration_gathering ();
1861           continue;
1862         }
1863       GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
1864                                              &consider_migration,
1865                                              pos);
1866     }
1867   GNUNET_PEER_change_rc (cp->pid, -1);
1868   GNUNET_PEER_decrement_rcs (cp->last_p2p_replies, P2P_SUCCESS_LIST_SIZE);
1869   if (NULL != cp->cth)
1870     {
1871       GNUNET_CORE_notify_transmit_ready_cancel (cp->cth);
1872       cp->cth = NULL;
1873     }
1874   if (cp->delayed_transmission_request_task != GNUNET_SCHEDULER_NO_TASK)
1875     {
1876       GNUNET_SCHEDULER_cancel (cp->delayed_transmission_request_task);
1877       cp->delayed_transmission_request_task = GNUNET_SCHEDULER_NO_TASK;
1878     }
1879   while (NULL != (pm = cp->pending_messages_head))
1880     destroy_pending_message (pm, 0 /* delivery failed */);
1881   GNUNET_LOAD_value_free (cp->transmission_delay);
1882   GNUNET_break (0 == cp->pending_requests);
1883   GNUNET_free (cp);
1884 }
1885
1886
1887 /**
1888  * Iterator over hash map entries that removes all occurences
1889  * of the given 'client' from the 'last_client_replies' of the
1890  * given connected peer.
1891  *
1892  * @param cls closure, the 'struct GNUNET_SERVER_Client*' to remove
1893  * @param key current key code (unused)
1894  * @param value value in the hash map (the 'struct ConnectedPeer*' to change)
1895  * @return GNUNET_YES (we should continue to iterate)
1896  */
1897 static int
1898 remove_client_from_last_client_replies (void *cls,
1899                                         const GNUNET_HashCode * key,
1900                                         void *value)
1901 {
1902   struct GNUNET_SERVER_Client *client = cls;
1903   struct ConnectedPeer *cp = value;
1904   unsigned int i;
1905
1906   for (i=0;i<CS2P_SUCCESS_LIST_SIZE;i++)
1907     {
1908       if (cp->last_client_replies[i] == client)
1909         {
1910           GNUNET_SERVER_client_drop (cp->last_client_replies[i]);
1911           cp->last_client_replies[i] = NULL;
1912         }
1913     }  
1914   return GNUNET_YES;
1915 }
1916
1917
1918 /**
1919  * A client disconnected.  Remove all of its pending queries.
1920  *
1921  * @param cls closure, NULL
1922  * @param client identification of the client
1923  */
1924 static void
1925 handle_client_disconnect (void *cls,
1926                           struct GNUNET_SERVER_Client
1927                           * client)
1928 {
1929   struct ClientList *pos;
1930   struct ClientList *prev;
1931   struct ClientRequestList *rcl;
1932   struct ClientResponseMessage *creply;
1933
1934   if (client == NULL)
1935     return;
1936   prev = NULL;
1937   pos = client_list;
1938   while ( (NULL != pos) &&
1939           (pos->client != client) )
1940     {
1941       prev = pos;
1942       pos = pos->next;
1943     }
1944   if (pos == NULL)
1945     return; /* no requests pending for this client */
1946   while (NULL != (rcl = pos->rl_head))
1947     {
1948       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1949                   "Destroying pending request `%s' on disconnect\n",
1950                   GNUNET_h2s (&rcl->req->query));
1951       destroy_pending_request (rcl->req);
1952     }
1953   if (prev == NULL)
1954     client_list = pos->next;
1955   else
1956     prev->next = pos->next;
1957   if (pos->th != NULL)
1958     {
1959       GNUNET_CONNECTION_notify_transmit_ready_cancel (pos->th);
1960       pos->th = NULL;
1961     }
1962   while (NULL != (creply = pos->res_head))
1963     {
1964       GNUNET_CONTAINER_DLL_remove (pos->res_head,
1965                                    pos->res_tail,
1966                                    creply);
1967       GNUNET_free (creply);
1968     }    
1969   GNUNET_SERVER_client_drop (pos->client);
1970   GNUNET_free (pos);
1971   GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
1972                                          &remove_client_from_last_client_replies,
1973                                          client);
1974 }
1975
1976
1977 /**
1978  * Iterator to free peer entries.
1979  *
1980  * @param cls closure, unused
1981  * @param key current key code
1982  * @param value value in the hash map (peer entry)
1983  * @return GNUNET_YES (we should continue to iterate)
1984  */
1985 static int 
1986 clean_peer (void *cls,
1987             const GNUNET_HashCode * key,
1988             void *value)
1989 {
1990   peer_disconnect_handler (NULL, (const struct GNUNET_PeerIdentity*) key);
1991   return GNUNET_YES;
1992 }
1993
1994
1995 /**
1996  * Task run during shutdown.
1997  *
1998  * @param cls unused
1999  * @param tc unused
2000  */
2001 static void
2002 shutdown_task (void *cls,
2003                const struct GNUNET_SCHEDULER_TaskContext *tc)
2004 {
2005   if (mig_qe != NULL)
2006     {
2007       GNUNET_DATASTORE_cancel (mig_qe);
2008       mig_qe = NULL;
2009     }
2010   if (dht_qe != NULL)
2011     {
2012       GNUNET_DATASTORE_cancel (dht_qe);
2013       dht_qe = NULL;
2014     }
2015   if (GNUNET_SCHEDULER_NO_TASK != mig_task)
2016     {
2017       GNUNET_SCHEDULER_cancel (mig_task);
2018       mig_task = GNUNET_SCHEDULER_NO_TASK;
2019     }
2020   if (GNUNET_SCHEDULER_NO_TASK != dht_task)
2021     {
2022       GNUNET_SCHEDULER_cancel (dht_task);
2023       dht_task = GNUNET_SCHEDULER_NO_TASK;
2024     }
2025   while (client_list != NULL)
2026     handle_client_disconnect (NULL,
2027                               client_list->client);
2028   cron_flush_trust (NULL, NULL);
2029   GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
2030                                          &clean_peer,
2031                                          NULL);
2032   GNUNET_break (0 == GNUNET_CONTAINER_heap_get_size (requests_by_expiration_heap));
2033   GNUNET_CONTAINER_heap_destroy (requests_by_expiration_heap);
2034   requests_by_expiration_heap = 0;
2035   GNUNET_CONTAINER_multihashmap_destroy (connected_peers);
2036   connected_peers = NULL;
2037   GNUNET_break (0 == GNUNET_CONTAINER_multihashmap_size (query_request_map));
2038   GNUNET_CONTAINER_multihashmap_destroy (query_request_map);
2039   query_request_map = NULL;
2040   GNUNET_LOAD_value_free (rt_entry_lifetime);
2041   rt_entry_lifetime = NULL;
2042   GNUNET_break (0 == GNUNET_CONTAINER_multihashmap_size (peer_request_map));
2043   GNUNET_CONTAINER_multihashmap_destroy (peer_request_map);
2044   peer_request_map = NULL;
2045   GNUNET_assert (NULL != core);
2046   GNUNET_CORE_disconnect (core);
2047   core = NULL;
2048   if (stats != NULL)
2049     {
2050       GNUNET_STATISTICS_destroy (stats, GNUNET_NO);
2051       stats = NULL;
2052     }
2053   if (dsh != NULL)
2054     {
2055       GNUNET_DATASTORE_disconnect (dsh,
2056                                    GNUNET_NO);
2057       dsh = NULL;
2058     }
2059   while (mig_head != NULL)
2060     delete_migration_block (mig_head);
2061   GNUNET_assert (0 == mig_size);
2062   GNUNET_DHT_disconnect (dht_handle);
2063   dht_handle = NULL;
2064   GNUNET_LOAD_value_free (datastore_get_load);
2065   datastore_get_load = NULL;
2066   GNUNET_LOAD_value_free (datastore_put_load);
2067   datastore_put_load = NULL;
2068   GNUNET_BLOCK_context_destroy (block_ctx);
2069   block_ctx = NULL;
2070   GNUNET_CONFIGURATION_destroy (block_cfg);
2071   block_cfg = NULL;
2072   cfg = NULL;  
2073   GNUNET_free_non_null (trustDirectory);
2074   trustDirectory = NULL;
2075 }
2076
2077
2078 /* ******************* Utility functions  ******************** */
2079
2080
2081 /**
2082  * We've had to delay a request for transmission to core, but now
2083  * we should be ready.  Run it.
2084  *
2085  * @param cls the 'struct ConnectedPeer' for which a request was delayed
2086  * @param tc task context (unused)
2087  */
2088 static void
2089 delayed_transmission_request (void *cls,
2090                               const struct GNUNET_SCHEDULER_TaskContext *tc)
2091 {
2092   struct ConnectedPeer *cp = cls;
2093   struct GNUNET_PeerIdentity pid;
2094   struct PendingMessage *pm;
2095
2096   pm = cp->pending_messages_head;
2097   cp->delayed_transmission_request_task = GNUNET_SCHEDULER_NO_TASK;
2098   GNUNET_assert (cp->cth == NULL);
2099   if (pm == NULL)
2100     return;
2101   GNUNET_PEER_resolve (cp->pid,
2102                        &pid);
2103   cp->last_transmission_request_start = GNUNET_TIME_absolute_get ();
2104   cp->cth = GNUNET_CORE_notify_transmit_ready (core,
2105                                                pm->priority,
2106                                                GNUNET_CONSTANTS_SERVICE_TIMEOUT,
2107                                                &pid,
2108                                                pm->msize,
2109                                                &transmit_to_peer,
2110                                                cp);
2111 }
2112
2113
2114 /**
2115  * Transmit messages by copying it to the target buffer
2116  * "buf".  "buf" will be NULL and "size" zero if the socket was closed
2117  * for writing in the meantime.  In that case, do nothing
2118  * (the disconnect or shutdown handler will take care of the rest).
2119  * If we were able to transmit messages and there are still more
2120  * pending, ask core again for further calls to this function.
2121  *
2122  * @param cls closure, pointer to the 'struct ConnectedPeer*'
2123  * @param size number of bytes available in buf
2124  * @param buf where the callee should write the message
2125  * @return number of bytes written to buf
2126  */
2127 static size_t
2128 transmit_to_peer (void *cls,
2129                   size_t size, void *buf)
2130 {
2131   struct ConnectedPeer *cp = cls;
2132   char *cbuf = buf;
2133   struct PendingMessage *pm;
2134   struct PendingMessage *next_pm;
2135   struct GNUNET_TIME_Absolute now;
2136   struct GNUNET_TIME_Relative min_delay;
2137   struct MigrationReadyBlock *mb;
2138   struct MigrationReadyBlock *next;
2139   struct PutMessage migm;
2140   size_t msize;
2141   unsigned int i;
2142   struct GNUNET_PeerIdentity pid;
2143  
2144   cp->cth = NULL;
2145   if (NULL == buf)
2146     {
2147 #if DEBUG_FS
2148       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2149                   "Dropping message, core too busy.\n");
2150 #endif
2151       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2152                   "Dropping message, core too busy.\n");
2153       GNUNET_LOAD_update (cp->transmission_delay,
2154                           UINT64_MAX);
2155       
2156       if (NULL != (pm = cp->pending_messages_head))
2157         {
2158           GNUNET_CONTAINER_DLL_remove (cp->pending_messages_head,
2159                                        cp->pending_messages_tail,
2160                                        pm);
2161           cp->pending_requests--;    
2162           destroy_pending_message (pm, 0);
2163         }
2164       if (NULL != (pm = cp->pending_messages_head))
2165         {
2166           GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == cp->delayed_transmission_request_task);
2167           min_delay = GNUNET_TIME_absolute_get_remaining (pm->delay_until);
2168           cp->delayed_transmission_request_task
2169             = GNUNET_SCHEDULER_add_delayed (min_delay,
2170                                             &delayed_transmission_request,
2171                                             cp);
2172         }
2173       return 0;
2174     }  
2175   GNUNET_LOAD_update (cp->transmission_delay,
2176                       GNUNET_TIME_absolute_get_duration (cp->last_transmission_request_start).rel_value);
2177   now = GNUNET_TIME_absolute_get ();
2178   msize = 0;
2179   min_delay = GNUNET_TIME_UNIT_FOREVER_REL;
2180   next_pm = cp->pending_messages_head;
2181   while ( (NULL != (pm = next_pm) ) &&
2182           (pm->msize <= size) )
2183     {
2184       next_pm = pm->next;
2185       if (pm->delay_until.abs_value > now.abs_value)
2186         {
2187           min_delay = GNUNET_TIME_relative_min (min_delay,
2188                                                 GNUNET_TIME_absolute_get_remaining (pm->delay_until));
2189           continue;
2190         }
2191       memcpy (&cbuf[msize], &pm[1], pm->msize);
2192       msize += pm->msize;
2193       size -= pm->msize;
2194       if (NULL == pm->pml)
2195         {
2196           GNUNET_CONTAINER_DLL_remove (cp->pending_messages_head,
2197                                        cp->pending_messages_tail,
2198                                        pm);
2199           cp->pending_requests--;
2200         }
2201       destroy_pending_message (pm, cp->pid);
2202     }
2203   if (pm != NULL)
2204     min_delay = GNUNET_TIME_UNIT_ZERO;
2205   if (NULL != cp->pending_messages_head)
2206     {     
2207       GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == cp->delayed_transmission_request_task);
2208       cp->delayed_transmission_request_task
2209         = GNUNET_SCHEDULER_add_delayed (min_delay,
2210                                         &delayed_transmission_request,
2211                                         cp);
2212     }
2213   if (pm == NULL)
2214     {      
2215       GNUNET_PEER_resolve (cp->pid,
2216                            &pid);
2217       next = mig_head;
2218       while (NULL != (mb = next))
2219         {
2220           next = mb->next;
2221           for (i=0;i<MIGRATION_LIST_SIZE;i++)
2222             {
2223               if ( (cp->pid == mb->target_list[i]) &&
2224                    (mb->size + sizeof (migm) <= size) )
2225                 {
2226                   GNUNET_PEER_change_rc (mb->target_list[i], -1);
2227                   mb->target_list[i] = 0;
2228                   mb->used_targets++;
2229                   memset (&migm, 0, sizeof (migm));
2230                   migm.header.size = htons (sizeof (migm) + mb->size);
2231                   migm.header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
2232                   migm.type = htonl (mb->type);
2233                   migm.expiration = GNUNET_TIME_absolute_hton (mb->expiration);
2234                   memcpy (&cbuf[msize], &migm, sizeof (migm));
2235                   msize += sizeof (migm);
2236                   size -= sizeof (migm);
2237                   memcpy (&cbuf[msize], &mb[1], mb->size);
2238                   msize += mb->size;
2239                   size -= mb->size;
2240 #if DEBUG_FS
2241                   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2242                               "Pushing migration block `%s' (%u bytes) to `%s'\n",
2243                               GNUNET_h2s (&mb->query),
2244                               (unsigned int) mb->size,
2245                               GNUNET_i2s (&pid));
2246 #endif    
2247                   break;
2248                 }
2249               else
2250                 {
2251 #if DEBUG_FS
2252                   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2253                               "Migration block `%s' (%u bytes) is not on migration list for peer `%s'\n",
2254                               GNUNET_h2s (&mb->query),
2255                               (unsigned int) mb->size,
2256                               GNUNET_i2s (&pid));
2257 #endif    
2258                 }
2259             }
2260           if ( (mb->used_targets >= MIGRATION_TARGET_COUNT) ||
2261                (mb->used_targets >= GNUNET_CONTAINER_multihashmap_size (connected_peers)) )
2262             {
2263               delete_migration_block (mb);
2264               consider_migration_gathering ();
2265             }
2266         }
2267       consider_migration (NULL, 
2268                           &pid.hashPubKey,
2269                           cp);
2270     }
2271 #if DEBUG_FS
2272   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2273               "Transmitting %u bytes to peer with PID %u\n",
2274               (unsigned int) msize,
2275               (unsigned int) cp->pid);
2276 #endif
2277   return msize;
2278 }
2279
2280
2281 /**
2282  * Add a message to the set of pending messages for the given peer.
2283  *
2284  * @param cp peer to send message to
2285  * @param pm message to queue
2286  * @param pr request on which behalf this message is being queued
2287  */
2288 static void
2289 add_to_pending_messages_for_peer (struct ConnectedPeer *cp,
2290                                   struct PendingMessage *pm,
2291                                   struct PendingRequest *pr)
2292 {
2293   struct PendingMessage *pos;
2294   struct PendingMessageList *pml;
2295   struct GNUNET_PeerIdentity pid;
2296
2297   GNUNET_assert (pm->next == NULL);
2298   GNUNET_assert (pm->pml == NULL);    
2299   if (pr != NULL)
2300     {
2301       pml = GNUNET_malloc (sizeof (struct PendingMessageList));
2302       pml->req = pr;
2303       pml->target = cp;
2304       pml->pm = pm;
2305       pm->pml = pml;  
2306       GNUNET_CONTAINER_DLL_insert (pr->pending_head,
2307                                    pr->pending_tail,
2308                                    pml);
2309     }
2310   pos = cp->pending_messages_head;
2311   while ( (pos != NULL) &&
2312           (pm->priority < pos->priority) )
2313     pos = pos->next;    
2314   GNUNET_CONTAINER_DLL_insert_after (cp->pending_messages_head,
2315                                      cp->pending_messages_tail,
2316                                      pos,
2317                                      pm);
2318   cp->pending_requests++;
2319   if (cp->pending_requests > MAX_QUEUE_PER_PEER)
2320     {
2321       GNUNET_STATISTICS_update (stats,
2322                                 gettext_noop ("# P2P searches discarded (queue length bound)"),
2323                                 1,
2324                                 GNUNET_NO);
2325       destroy_pending_message (cp->pending_messages_tail, 0);  
2326     }
2327   GNUNET_PEER_resolve (cp->pid, &pid);
2328   if (NULL != cp->cth)
2329     {
2330       GNUNET_CORE_notify_transmit_ready_cancel (cp->cth);
2331       cp->cth = NULL;
2332     }
2333   if (cp->delayed_transmission_request_task != GNUNET_SCHEDULER_NO_TASK)
2334     {
2335       GNUNET_SCHEDULER_cancel (cp->delayed_transmission_request_task);
2336       cp->delayed_transmission_request_task = GNUNET_SCHEDULER_NO_TASK;
2337     }
2338   /* need to schedule transmission */
2339   cp->last_transmission_request_start = GNUNET_TIME_absolute_get ();
2340   cp->cth = GNUNET_CORE_notify_transmit_ready (core,
2341                                                cp->pending_messages_head->priority,
2342                                                MAX_TRANSMIT_DELAY,
2343                                                &pid,
2344                                                cp->pending_messages_head->msize,
2345                                                &transmit_to_peer,
2346                                                cp);
2347   if (cp->cth == NULL)
2348     {
2349 #if DEBUG_FS
2350       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2351                   "Failed to schedule transmission with core!\n");
2352 #endif
2353       GNUNET_STATISTICS_update (stats,
2354                                 gettext_noop ("# CORE transmission failures"),
2355                                 1,
2356                                 GNUNET_NO);
2357     }
2358 }
2359
2360
2361 /**
2362  * Test if the DATABASE (GET) load on this peer is too high
2363  * to even consider processing the query at
2364  * all.  
2365  * 
2366  * @return GNUNET_YES if the load is too high to do anything (load high)
2367  *         GNUNET_NO to process normally (load normal)
2368  *         GNUNET_SYSERR to process for free (load low)
2369  */
2370 static int
2371 test_get_load_too_high (uint32_t priority)
2372 {
2373   double ld;
2374
2375   ld = GNUNET_LOAD_get_load (datastore_get_load);
2376   if (ld < 1)
2377     return GNUNET_SYSERR;    
2378   if (ld <= priority)    
2379     return GNUNET_NO;    
2380   return GNUNET_YES;
2381 }
2382
2383
2384
2385
2386 /**
2387  * Test if the DATABASE (PUT) load on this peer is too high
2388  * to even consider processing the query at
2389  * all.  
2390  * 
2391  * @return GNUNET_YES if the load is too high to do anything (load high)
2392  *         GNUNET_NO to process normally (load normal or low)
2393  */
2394 static int
2395 test_put_load_too_high (uint32_t priority)
2396 {
2397   double ld;
2398
2399   if (GNUNET_LOAD_get_average (datastore_put_load) < 50)
2400     return GNUNET_NO; /* very fast */
2401   ld = GNUNET_LOAD_get_load (datastore_put_load);
2402   if (ld < 2.0 * (1 + priority))
2403     return GNUNET_NO;
2404   GNUNET_STATISTICS_update (stats,
2405                             gettext_noop ("# storage requests dropped due to high load"),
2406                             1,
2407                             GNUNET_NO);
2408   return GNUNET_YES;
2409 }
2410
2411
2412 /* ******************* Pending Request Refresh Task ******************** */
2413
2414
2415
2416 /**
2417  * We use a random delay to make the timing of requests less
2418  * predictable.  This function returns such a random delay.  We add a base
2419  * delay of MAX_CORK_DELAY (1s).
2420  *
2421  * FIXME: make schedule dependent on the specifics of the request?
2422  * Or bandwidth and number of connected peers and load?
2423  *
2424  * @return random delay to use for some request, between 1s and 1000+TTL_DECREMENT ms
2425  */
2426 static struct GNUNET_TIME_Relative
2427 get_processing_delay ()
2428 {
2429   return 
2430     GNUNET_TIME_relative_add (GNUNET_CONSTANTS_MAX_CORK_DELAY,
2431                               GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
2432                                                              GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
2433                                                                                        TTL_DECREMENT)));
2434 }
2435
2436
2437 /**
2438  * We're processing a GET request from another peer and have decided
2439  * to forward it to other peers.  This function is called periodically
2440  * and should forward the request to other peers until we have all
2441  * possible replies.  If we have transmitted the *only* reply to
2442  * the initiator we should destroy the pending request.  If we have
2443  * many replies in the queue to the initiator, we should delay sending
2444  * out more queries until the reply queue has shrunk some.
2445  *
2446  * @param cls our "struct ProcessGetContext *"
2447  * @param tc unused
2448  */
2449 static void
2450 forward_request_task (void *cls,
2451                       const struct GNUNET_SCHEDULER_TaskContext *tc);
2452
2453
2454 /**
2455  * Function called after we either failed or succeeded
2456  * at transmitting a query to a peer.  
2457  *
2458  * @param cls the requests "struct PendingRequest*"
2459  * @param tpid ID of receiving peer, 0 on transmission error
2460  */
2461 static void
2462 transmit_query_continuation (void *cls,
2463                              GNUNET_PEER_Id tpid)
2464 {
2465   struct PendingRequest *pr = cls;
2466   unsigned int i;
2467
2468   GNUNET_STATISTICS_update (stats,
2469                             gettext_noop ("# queries scheduled for forwarding"),
2470                             -1,
2471                             GNUNET_NO);
2472   if (tpid == 0)   
2473     {
2474 #if DEBUG_FS
2475       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2476                   "Transmission of request failed, will try again later.\n");
2477 #endif
2478       if (pr->task == GNUNET_SCHEDULER_NO_TASK)
2479         pr->task = GNUNET_SCHEDULER_add_delayed (get_processing_delay (),
2480                                                  &forward_request_task,
2481                                                  pr); 
2482       return;    
2483     }
2484 #if DEBUG_FS
2485   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2486               "Transmitted query `%s'\n",
2487               GNUNET_h2s (&pr->query));
2488 #endif
2489   GNUNET_STATISTICS_update (stats,
2490                             gettext_noop ("# queries forwarded"),
2491                             1,
2492                             GNUNET_NO);
2493   for (i=0;i<pr->used_targets_off;i++)
2494     if (pr->used_targets[i].pid == tpid)
2495       break; /* found match! */    
2496   if (i == pr->used_targets_off)
2497     {
2498       /* need to create new entry */
2499       if (pr->used_targets_off == pr->used_targets_size)
2500         GNUNET_array_grow (pr->used_targets,
2501                            pr->used_targets_size,
2502                            pr->used_targets_size * 2 + 2);
2503       GNUNET_PEER_change_rc (tpid, 1);
2504       pr->used_targets[pr->used_targets_off].pid = tpid;
2505       pr->used_targets[pr->used_targets_off].num_requests = 0;
2506       i = pr->used_targets_off++;
2507     }
2508   pr->used_targets[i].last_request_time = GNUNET_TIME_absolute_get ();
2509   pr->used_targets[i].num_requests++;
2510   if (pr->task == GNUNET_SCHEDULER_NO_TASK)
2511     pr->task = GNUNET_SCHEDULER_add_delayed (get_processing_delay (),
2512                                              &forward_request_task,
2513                                              pr);
2514 }
2515
2516
2517 /**
2518  * How many bytes should a bloomfilter be if we have already seen
2519  * entry_count responses?  Note that BLOOMFILTER_K gives us the number
2520  * of bits set per entry.  Furthermore, we should not re-size the
2521  * filter too often (to keep it cheap).
2522  *
2523  * Since other peers will also add entries but not resize the filter,
2524  * we should generally pick a slightly larger size than what the
2525  * strict math would suggest.
2526  *
2527  * @return must be a power of two and smaller or equal to 2^15.
2528  */
2529 static size_t
2530 compute_bloomfilter_size (unsigned int entry_count)
2531 {
2532   size_t size;
2533   unsigned int ideal = (entry_count * BLOOMFILTER_K) / 4;
2534   uint16_t max = 1 << 15;
2535
2536   if (entry_count > max)
2537     return max;
2538   size = 8;
2539   while ((size < max) && (size < ideal))
2540     size *= 2;
2541   if (size > max)
2542     return max;
2543   return size;
2544 }
2545
2546
2547 /**
2548  * Recalculate our bloom filter for filtering replies.  This function
2549  * will create a new bloom filter from scratch, so it should only be
2550  * called if we have no bloomfilter at all (and hence can create a
2551  * fresh one of minimal size without problems) OR if our peer is the
2552  * initiator (in which case we may resize to larger than mimimum size).
2553  *
2554  * @param pr request for which the BF is to be recomputed
2555  */
2556 static void
2557 refresh_bloomfilter (struct PendingRequest *pr)
2558 {
2559   unsigned int i;
2560   size_t nsize;
2561   GNUNET_HashCode mhash;
2562
2563   nsize = compute_bloomfilter_size (pr->replies_seen_off);
2564   if (nsize == pr->bf_size)
2565     return; /* size not changed */
2566   if (pr->bf != NULL)
2567     GNUNET_CONTAINER_bloomfilter_free (pr->bf);
2568   pr->bf_size = nsize;
2569   pr->mingle = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, -1);
2570   pr->bf = GNUNET_CONTAINER_bloomfilter_init (NULL, 
2571                                               pr->bf_size,
2572                                               BLOOMFILTER_K);
2573   for (i=0;i<pr->replies_seen_off;i++)
2574     {
2575       GNUNET_BLOCK_mingle_hash (&pr->replies_seen[i],
2576                                 pr->mingle,
2577                                 &mhash);
2578       GNUNET_CONTAINER_bloomfilter_add (pr->bf, &mhash);
2579     }
2580 }
2581
2582
2583 /**
2584  * Function called after we've tried to reserve a certain amount of
2585  * bandwidth for a reply.  Check if we succeeded and if so send our
2586  * query.
2587  *
2588  * @param cls the requests "struct PendingRequest*"
2589  * @param peer identifies the peer
2590  * @param bpm_out set to the current bandwidth limit (sending) for this peer
2591  * @param amount set to the amount that was actually reserved or unreserved
2592  * @param preference current traffic preference for the given peer
2593  */
2594 static void
2595 target_reservation_cb (void *cls,
2596                        const struct
2597                        GNUNET_PeerIdentity * peer,
2598                        struct GNUNET_BANDWIDTH_Value32NBO bpm_out,
2599                        int amount,
2600                        uint64_t preference)
2601 {
2602   struct PendingRequest *pr = cls;
2603   struct ConnectedPeer *cp;
2604   struct PendingMessage *pm;
2605   struct GetMessage *gm;
2606   GNUNET_HashCode *ext;
2607   char *bfdata;
2608   size_t msize;
2609   unsigned int k;
2610   int no_route;
2611   uint32_t bm;
2612   unsigned int i;
2613
2614   /* (3) transmit, update ttl/priority */
2615   cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
2616                                           &peer->hashPubKey);
2617   if (cp == NULL)
2618     {
2619       /* Peer must have just left */
2620 #if DEBUG_FS
2621       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2622                   "Selected peer disconnected!\n");
2623 #endif
2624       if (pr->task == GNUNET_SCHEDULER_NO_TASK)
2625         pr->task = GNUNET_SCHEDULER_add_delayed (get_processing_delay (),
2626                                                  &forward_request_task,
2627                                                  pr);
2628       return;
2629     }
2630   cp->irc = NULL;
2631   pr->pirc = NULL;
2632   if (peer == NULL)
2633     {
2634       /* error in communication with core, try again later */
2635       if (pr->task == GNUNET_SCHEDULER_NO_TASK)
2636         pr->task = GNUNET_SCHEDULER_add_delayed (get_processing_delay (),
2637                                                  &forward_request_task,
2638                                                  pr);
2639       return;
2640     }
2641   no_route = GNUNET_NO;
2642   if (amount == 0)
2643     {
2644       if (pr->cp == NULL)
2645         {
2646 #if DEBUG_FS > 1
2647           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2648                       "Failed to reserve bandwidth for reply (got %d/%u bytes only)!\n",
2649                       amount,
2650                       DBLOCK_SIZE);
2651 #endif
2652           GNUNET_STATISTICS_update (stats,
2653                                     gettext_noop ("# reply bandwidth reservation requests failed"),
2654                                     1,
2655                                     GNUNET_NO);
2656           if (pr->task == GNUNET_SCHEDULER_NO_TASK)
2657             pr->task = GNUNET_SCHEDULER_add_delayed (get_processing_delay (),
2658                                                      &forward_request_task,
2659                                                      pr);
2660           return;  /* this target round failed */
2661         }
2662       no_route = GNUNET_YES;
2663     }
2664   
2665   GNUNET_STATISTICS_update (stats,
2666                             gettext_noop ("# queries scheduled for forwarding"),
2667                             1,
2668                             GNUNET_NO);
2669   for (i=0;i<pr->used_targets_off;i++)
2670     if (pr->used_targets[i].pid == cp->pid) 
2671       {
2672         GNUNET_STATISTICS_update (stats,
2673                                   gettext_noop ("# queries retransmitted to same target"),
2674                                   1,
2675                                   GNUNET_NO);
2676         break;
2677       } 
2678
2679   /* build message and insert message into priority queue */
2680 #if DEBUG_FS
2681   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2682               "Forwarding request `%s' to `%4s'!\n",
2683               GNUNET_h2s (&pr->query),
2684               GNUNET_i2s (peer));
2685 #endif
2686   k = 0;
2687   bm = 0;
2688   if (GNUNET_YES == no_route)
2689     {
2690       bm |= GET_MESSAGE_BIT_RETURN_TO;
2691       k++;      
2692     }
2693   if (pr->namespace != NULL)
2694     {
2695       bm |= GET_MESSAGE_BIT_SKS_NAMESPACE;
2696       k++;
2697     }
2698   if (pr->target_pid != 0)
2699     {
2700       bm |= GET_MESSAGE_BIT_TRANSMIT_TO;
2701       k++;
2702     }
2703   msize = sizeof (struct GetMessage) + pr->bf_size + k * sizeof(GNUNET_HashCode);
2704   GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE);
2705   pm = GNUNET_malloc (sizeof (struct PendingMessage) + msize);
2706   pm->msize = msize;
2707   gm = (struct GetMessage*) &pm[1];
2708   gm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_GET);
2709   gm->header.size = htons (msize);
2710   gm->type = htonl (pr->type);
2711   pr->remaining_priority /= 2;
2712   gm->priority = htonl (pr->remaining_priority);
2713   gm->ttl = htonl (pr->ttl);
2714   gm->filter_mutator = htonl(pr->mingle); 
2715   gm->hash_bitmap = htonl (bm);
2716   gm->query = pr->query;
2717   ext = (GNUNET_HashCode*) &gm[1];
2718   k = 0;
2719   if (GNUNET_YES == no_route)
2720     GNUNET_PEER_resolve (pr->cp->pid, (struct GNUNET_PeerIdentity*) &ext[k++]);
2721   if (pr->namespace != NULL)
2722     memcpy (&ext[k++], pr->namespace, sizeof (GNUNET_HashCode));
2723   if (pr->target_pid != 0)
2724     GNUNET_PEER_resolve (pr->target_pid, (struct GNUNET_PeerIdentity*) &ext[k++]);
2725   bfdata = (char *) &ext[k];
2726   if (pr->bf != NULL)
2727     GNUNET_CONTAINER_bloomfilter_get_raw_data (pr->bf,
2728                                                bfdata,
2729                                                pr->bf_size);
2730   pm->cont = &transmit_query_continuation;
2731   pm->cont_cls = pr;
2732   cp->last_request_times[(cp->last_request_times_off++) % MAX_QUEUE_PER_PEER] = GNUNET_TIME_absolute_get ();
2733   add_to_pending_messages_for_peer (cp, pm, pr);
2734 }
2735
2736
2737 /**
2738  * Closure used for "target_peer_select_cb".
2739  */
2740 struct PeerSelectionContext 
2741 {
2742   /**
2743    * The request for which we are selecting
2744    * peers.
2745    */
2746   struct PendingRequest *pr;
2747
2748   /**
2749    * Current "prime" target.
2750    */
2751   struct GNUNET_PeerIdentity target;
2752
2753   /**
2754    * How much do we like this target?
2755    */
2756   double target_score;
2757
2758   /**
2759    * Does it make sense to we re-try quickly again?
2760    */
2761   int fast_retry;
2762
2763 };
2764
2765
2766 /**
2767  * Function called for each connected peer to determine
2768  * which one(s) would make good targets for forwarding.
2769  *
2770  * @param cls closure (struct PeerSelectionContext)
2771  * @param key current key code (peer identity)
2772  * @param value value in the hash map (struct ConnectedPeer)
2773  * @return GNUNET_YES if we should continue to
2774  *         iterate,
2775  *         GNUNET_NO if not.
2776  */
2777 static int
2778 target_peer_select_cb (void *cls,
2779                        const GNUNET_HashCode * key,
2780                        void *value)
2781 {
2782   struct PeerSelectionContext *psc = cls;
2783   struct ConnectedPeer *cp = value;
2784   struct PendingRequest *pr = psc->pr;
2785   struct GNUNET_TIME_Relative delay;
2786   double score;
2787   unsigned int i;
2788   unsigned int pc;
2789
2790   /* 1) check that this peer is not the initiator */
2791   if (cp == pr->cp)     
2792     {
2793 #if DEBUG_FS
2794       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2795                   "Skipping initiator in forwarding selection\n");
2796 #endif
2797       return GNUNET_YES; /* skip */        
2798     }
2799   if (cp->irc != NULL)
2800     {
2801       psc->fast_retry = GNUNET_YES;
2802       return GNUNET_YES; /* skip: already querying core about this peer for other reasons */
2803     }
2804
2805   /* 2) check if we have already (recently) forwarded to this peer */
2806   /* 2a) this particular request */
2807   pc = 0;
2808   for (i=0;i<pr->used_targets_off;i++)
2809     if (pr->used_targets[i].pid == cp->pid) 
2810       {
2811         pc = pr->used_targets[i].num_requests;
2812         GNUNET_assert (pc > 0);
2813         /* FIXME: make re-enabling a peer independent of how often
2814            this function is called??? */
2815         if (0 != GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
2816                                            RETRY_PROBABILITY_INV * pc))
2817           {
2818 #if DEBUG_FS
2819             GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2820                         "NOT re-trying query that was previously transmitted %u times\n",
2821                         (unsigned int) pc);
2822 #endif
2823             return GNUNET_YES; /* skip */
2824           }
2825         break;
2826       }
2827 #if DEBUG_FS
2828   if (0 < pc)
2829     {
2830       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
2831                   "Re-trying query that was previously transmitted %u times to this peer\n",
2832                   (unsigned int) pc);
2833     }
2834 #endif
2835   /* 2b) many other requests to this peer */
2836   delay = GNUNET_TIME_absolute_get_duration (cp->last_request_times[cp->last_request_times_off % MAX_QUEUE_PER_PEER]);
2837   if (delay.rel_value <= cp->avg_delay.rel_value)
2838     {
2839 #if DEBUG_FS
2840       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2841                   "NOT sending query since we send %u others to this peer in the last %llums\n",
2842                   MAX_QUEUE_PER_PEER,
2843                   cp->avg_delay.rel_value);
2844 #endif
2845       return GNUNET_YES; /* skip */      
2846     }
2847
2848   /* 3) calculate how much we'd like to forward to this peer,
2849      starting with a random value that is strong enough
2850      to at least give any peer a chance sometimes 
2851      (compared to the other factors that come later) */
2852   /* 3a) count successful (recent) routes from cp for same source */
2853   if (pr->cp != NULL)
2854     {
2855       score = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
2856                                         P2P_SUCCESS_LIST_SIZE);
2857       for (i=0;i<P2P_SUCCESS_LIST_SIZE;i++)
2858         if (cp->last_p2p_replies[i] == pr->cp->pid)
2859           score += 1.0; /* likely successful based on hot path */
2860     }
2861   else
2862     {
2863       score = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
2864                                         CS2P_SUCCESS_LIST_SIZE);
2865       for (i=0;i<CS2P_SUCCESS_LIST_SIZE;i++)
2866         if (cp->last_client_replies[i] == pr->client_request_list->client_list->client)
2867           score += 1.0; /* likely successful based on hot path */
2868     }
2869   /* 3b) include latency */
2870   if (cp->avg_delay.rel_value < 4 * TTL_DECREMENT)
2871     score += 1.0; /* likely fast based on latency */
2872   /* 3c) include priorities */
2873   if (cp->avg_priority <= pr->remaining_priority / 2.0)
2874     score += 1.0; /* likely successful based on priorities */
2875   /* 3d) penalize for queue size */  
2876   score -= (2.0 * cp->pending_requests / (double) MAX_QUEUE_PER_PEER); 
2877   /* 3e) include peer proximity */
2878   score -= (2.0 * (GNUNET_CRYPTO_hash_distance_u32 (key,
2879                                                     &pr->query)) / (double) UINT32_MAX);
2880   /* 4) super-bonus for being the known target */
2881   if (pr->target_pid == cp->pid)
2882     score += 100.0;
2883   /* store best-fit in closure */
2884   score++; /* avoid zero */
2885   if (score > psc->target_score)
2886     {
2887       psc->target_score = score;
2888       psc->target.hashPubKey = *key; 
2889     }
2890 #if DEBUG_FS
2891   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2892               "Peer `%s' gets score %f for forwarding query, max is %8f\n",
2893               GNUNET_h2s (key),
2894               score,
2895               psc->target_score);
2896 #endif
2897   return GNUNET_YES;
2898 }
2899   
2900
2901 /**
2902  * The priority level imposes a bound on the maximum
2903  * value for the ttl that can be requested.
2904  *
2905  * @param ttl_in requested ttl
2906  * @param prio given priority
2907  * @return ttl_in if ttl_in is below the limit,
2908  *         otherwise the ttl-limit for the given priority
2909  */
2910 static int32_t
2911 bound_ttl (int32_t ttl_in, uint32_t prio)
2912 {
2913   unsigned long long allowed;
2914
2915   if (ttl_in <= 0)
2916     return ttl_in;
2917   allowed = ((unsigned long long) prio) * TTL_DECREMENT / 1000; 
2918   if (ttl_in > allowed)      
2919     {
2920       if (allowed >= (1 << 30))
2921         return 1 << 30;
2922       return allowed;
2923     }
2924   return ttl_in;
2925 }
2926
2927
2928 /**
2929  * Iterator called on each result obtained for a DHT
2930  * operation that expects a reply
2931  *
2932  * @param cls closure
2933  * @param exp when will this value expire
2934  * @param key key of the result
2935  * @param get_path NULL-terminated array of pointers
2936  *                 to the peers on reverse GET path (or NULL if not recorded)
2937  * @param put_path NULL-terminated array of pointers
2938  *                 to the peers on the PUT path (or NULL if not recorded)
2939  * @param type type of the result
2940  * @param size number of bytes in data
2941  * @param data pointer to the result data
2942  */
2943 static void
2944 process_dht_reply (void *cls,
2945                    struct GNUNET_TIME_Absolute exp,
2946                    const GNUNET_HashCode * key,
2947                    const struct GNUNET_PeerIdentity * const *get_path,
2948                    const struct GNUNET_PeerIdentity * const *put_path,
2949                    enum GNUNET_BLOCK_Type type,
2950                    size_t size,
2951                    const void *data);
2952
2953
2954 /**
2955  * We're processing a GET request and have decided
2956  * to forward it to other peers.  This function is called periodically
2957  * and should forward the request to other peers until we have all
2958  * possible replies.  If we have transmitted the *only* reply to
2959  * the initiator we should destroy the pending request.  If we have
2960  * many replies in the queue to the initiator, we should delay sending
2961  * out more queries until the reply queue has shrunk some.
2962  *
2963  * @param cls our "struct ProcessGetContext *"
2964  * @param tc unused
2965  */
2966 static void
2967 forward_request_task (void *cls,
2968                      const struct GNUNET_SCHEDULER_TaskContext *tc)
2969 {
2970   struct PendingRequest *pr = cls;
2971   struct PeerSelectionContext psc;
2972   struct ConnectedPeer *cp; 
2973   struct GNUNET_TIME_Relative delay;
2974
2975   pr->task = GNUNET_SCHEDULER_NO_TASK;
2976   if (pr->pirc != NULL)
2977     {
2978 #if DEBUG_FS
2979       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2980                   "Forwarding of query `%s' not attempted due to pending local lookup!\n",
2981                   GNUNET_h2s (&pr->query));
2982 #endif
2983       return; /* already pending */
2984     }
2985   if (GNUNET_YES == pr->local_only)
2986     return; /* configured to not do P2P search */
2987   /* (0) try DHT */
2988   if ( (0 == pr->anonymity_level) &&
2989        (GNUNET_YES != pr->forward_only) &&
2990        (pr->type != GNUNET_BLOCK_TYPE_FS_DBLOCK) &&
2991        (pr->type != GNUNET_BLOCK_TYPE_FS_IBLOCK) )
2992     {
2993       pr->dht_get = GNUNET_DHT_get_start (dht_handle,
2994                                           GNUNET_TIME_UNIT_FOREVER_REL,
2995                                           pr->type,
2996                                           &pr->query,
2997                                           DEFAULT_GET_REPLICATION,
2998                                           GNUNET_DHT_RO_NONE,
2999                                           pr->bf,
3000                                           pr->mingle,
3001                                           pr->namespace,
3002                                           (pr->namespace != NULL) ? sizeof (GNUNET_HashCode) : 0,
3003                                           &process_dht_reply,
3004                                           pr);
3005     }
3006   /* (1) select target */
3007   psc.pr = pr;
3008   psc.target_score = -DBL_MAX;
3009   psc.fast_retry = GNUNET_NO;
3010   GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
3011                                          &target_peer_select_cb,
3012                                          &psc);  
3013   if (psc.target_score == -DBL_MAX)
3014     {
3015       if (psc.fast_retry == GNUNET_YES)
3016         delay = GNUNET_TIME_UNIT_MILLISECONDS; /* FIXME: store adaptive fast-retry value in 'pr' */
3017       else
3018         delay = get_processing_delay ();
3019 #if DEBUG_FS 
3020       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3021                   "No peer selected for forwarding of query `%s', will try again in %llu ms!\n",
3022                   GNUNET_h2s (&pr->query),
3023                   delay.rel_value);
3024 #endif
3025       pr->task = GNUNET_SCHEDULER_add_delayed (delay,
3026                                                &forward_request_task,
3027                                                pr);
3028       return; /* nobody selected */
3029     }
3030   /* (3) update TTL/priority */
3031   if (pr->client_request_list != NULL)
3032     {
3033       /* FIXME: use better algorithm!? */
3034       if (0 == GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
3035                                          4))
3036         pr->priority++;
3037       /* bound priority we use by priorities we see from other peers
3038          rounded up (must round up so that we can see non-zero
3039          priorities, but round up as little as possible to make it
3040          plausible that we forwarded another peers request) */
3041       if (pr->priority > current_priorities + 1.0)
3042         pr->priority = (uint32_t) current_priorities + 1.0;
3043       pr->ttl = bound_ttl (pr->ttl + TTL_DECREMENT * 2,
3044                            pr->priority);
3045 #if DEBUG_FS
3046       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3047                   "Trying query `%s' with priority %u and TTL %d.\n",
3048                   GNUNET_h2s (&pr->query),
3049                   pr->priority,
3050                   pr->ttl);
3051 #endif
3052     }
3053
3054   /* (3) reserve reply bandwidth */
3055   if (GNUNET_NO == pr->forward_only)
3056     {
3057       cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
3058                                               &psc.target.hashPubKey);
3059       GNUNET_assert (NULL != cp);
3060       GNUNET_assert (cp->irc == NULL);
3061       pr->pirc = cp;
3062       cp->pr = pr;
3063       cp->irc = GNUNET_CORE_peer_change_preference (core,
3064                                                     &psc.target,
3065                                                     GNUNET_CONSTANTS_SERVICE_TIMEOUT, 
3066                                                     GNUNET_BANDWIDTH_value_init (UINT32_MAX),
3067                                                     DBLOCK_SIZE * 2, 
3068                                                     cp->inc_preference,
3069                                                     &target_reservation_cb,
3070                                                     pr);
3071       GNUNET_assert (cp->irc != NULL);
3072       cp->inc_preference = 0;
3073     }
3074   else
3075     {
3076       /* force forwarding */
3077       static struct GNUNET_BANDWIDTH_Value32NBO zerobw;
3078       target_reservation_cb (pr, &psc.target,
3079                              zerobw, 0, 0.0);
3080     }
3081 }
3082
3083
3084 /* **************************** P2P PUT Handling ************************ */
3085
3086
3087 /**
3088  * Function called after we either failed or succeeded
3089  * at transmitting a reply to a peer.  
3090  *
3091  * @param cls the requests "struct PendingRequest*"
3092  * @param tpid ID of receiving peer, 0 on transmission error
3093  */
3094 static void
3095 transmit_reply_continuation (void *cls,
3096                              GNUNET_PEER_Id tpid)
3097 {
3098   struct PendingRequest *pr = cls;
3099   
3100   switch (pr->type)
3101     {
3102     case GNUNET_BLOCK_TYPE_FS_DBLOCK:
3103     case GNUNET_BLOCK_TYPE_FS_IBLOCK:
3104       /* only one reply expected, done with the request! */
3105       destroy_pending_request (pr);
3106       break;
3107     case GNUNET_BLOCK_TYPE_ANY:
3108     case GNUNET_BLOCK_TYPE_FS_KBLOCK:
3109     case GNUNET_BLOCK_TYPE_FS_SBLOCK:
3110       break;
3111     default:
3112       GNUNET_break (0);
3113       break;
3114     }
3115 }
3116
3117
3118 /**
3119  * Transmit the given message by copying it to the target buffer
3120  * "buf".  "buf" will be NULL and "size" zero if the socket was closed
3121  * for writing in the meantime.  In that case, do nothing
3122  * (the disconnect or shutdown handler will take care of the rest).
3123  * If we were able to transmit messages and there are still more
3124  * pending, ask core again for further calls to this function.
3125  *
3126  * @param cls closure, pointer to the 'struct ClientList*'
3127  * @param size number of bytes available in buf
3128  * @param buf where the callee should write the message
3129  * @return number of bytes written to buf
3130  */
3131 static size_t
3132 transmit_to_client (void *cls,
3133                   size_t size, void *buf)
3134 {
3135   struct ClientList *cl = cls;
3136   char *cbuf = buf;
3137   struct ClientResponseMessage *creply;
3138   size_t msize;
3139   
3140   cl->th = NULL;
3141   if (NULL == buf)
3142     {
3143 #if DEBUG_FS
3144       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3145                   "Not sending reply, client communication problem.\n");
3146 #endif
3147       return 0;
3148     }
3149   msize = 0;
3150   while ( (NULL != (creply = cl->res_head) ) &&
3151           (creply->msize <= size) )
3152     {
3153       memcpy (&cbuf[msize], &creply[1], creply->msize);
3154       msize += creply->msize;
3155       size -= creply->msize;
3156       GNUNET_CONTAINER_DLL_remove (cl->res_head,
3157                                    cl->res_tail,
3158                                    creply);
3159       GNUNET_free (creply);
3160     }
3161   if (NULL != creply)
3162     cl->th = GNUNET_SERVER_notify_transmit_ready (cl->client,
3163                                                   creply->msize,
3164                                                   GNUNET_TIME_UNIT_FOREVER_REL,
3165                                                   &transmit_to_client,
3166                                                   cl);
3167 #if DEBUG_FS
3168   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3169               "Transmitted %u bytes to client\n",
3170               (unsigned int) msize);
3171 #endif
3172   return msize;
3173 }
3174
3175
3176 /**
3177  * Closure for "process_reply" function.
3178  */
3179 struct ProcessReplyClosure
3180 {
3181   /**
3182    * The data for the reply.
3183    */
3184   const void *data;
3185
3186   /**
3187    * Who gave us this reply? NULL for local host (or DHT)
3188    */
3189   struct ConnectedPeer *sender;
3190
3191   /**
3192    * When the reply expires.
3193    */
3194   struct GNUNET_TIME_Absolute expiration;
3195
3196   /**
3197    * Size of data.
3198    */
3199   size_t size;
3200
3201   /**
3202    * Type of the block.
3203    */
3204   enum GNUNET_BLOCK_Type type;
3205
3206   /**
3207    * How much was this reply worth to us?
3208    */
3209   uint32_t priority;
3210
3211   /**
3212    * Evaluation result (returned).
3213    */
3214   enum GNUNET_BLOCK_EvaluationResult eval;
3215
3216   /**
3217    * Did we finish processing the associated request?
3218    */ 
3219   int finished;
3220
3221   /**
3222    * Did we find a matching request?
3223    */
3224   int request_found;
3225 };
3226
3227
3228 /**
3229  * We have received a reply; handle it!
3230  *
3231  * @param cls response (struct ProcessReplyClosure)
3232  * @param key our query
3233  * @param value value in the hash map (info about the query)
3234  * @return GNUNET_YES (we should continue to iterate)
3235  */
3236 static int
3237 process_reply (void *cls,
3238                const GNUNET_HashCode * key,
3239                void *value)
3240 {
3241   struct ProcessReplyClosure *prq = cls;
3242   struct PendingRequest *pr = value;
3243   struct PendingMessage *reply;
3244   struct ClientResponseMessage *creply;
3245   struct ClientList *cl;
3246   struct PutMessage *pm;
3247   struct ConnectedPeer *cp;
3248   struct GNUNET_TIME_Relative cur_delay;
3249 #if SUPPORT_DELAYS  
3250 struct GNUNET_TIME_Relative art_delay;
3251 #endif
3252   size_t msize;
3253   unsigned int i;
3254
3255 #if DEBUG_FS
3256   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3257               "Matched result (type %u) for query `%s' with pending request\n",
3258               (unsigned int) prq->type,
3259               GNUNET_h2s (key));
3260 #endif  
3261   GNUNET_STATISTICS_update (stats,
3262                             gettext_noop ("# replies received and matched"),
3263                             1,
3264                             GNUNET_NO);
3265   if (prq->sender != NULL)
3266     {
3267       for (i=0;i<pr->used_targets_off;i++)
3268         if (pr->used_targets[i].pid == prq->sender->pid)
3269           break;
3270       if (i < pr->used_targets_off)
3271         {
3272           cur_delay = GNUNET_TIME_absolute_get_duration (pr->used_targets[i].last_request_time);      
3273           prq->sender->avg_delay.rel_value
3274             = (prq->sender->avg_delay.rel_value * 
3275                (RUNAVG_DELAY_N - 1) + cur_delay.rel_value) / RUNAVG_DELAY_N; 
3276           prq->sender->avg_priority
3277             = (prq->sender->avg_priority * 
3278                (RUNAVG_DELAY_N - 1) + pr->priority) / (double) RUNAVG_DELAY_N;
3279         }
3280       if (pr->cp != NULL)
3281         {
3282           GNUNET_PEER_change_rc (prq->sender->last_p2p_replies
3283                                  [prq->sender->last_p2p_replies_woff % P2P_SUCCESS_LIST_SIZE], 
3284                                  -1);
3285           GNUNET_PEER_change_rc (pr->cp->pid, 1);
3286           prq->sender->last_p2p_replies
3287             [(prq->sender->last_p2p_replies_woff++) % P2P_SUCCESS_LIST_SIZE]
3288             = pr->cp->pid;
3289         }
3290       else
3291         {
3292           if (NULL != prq->sender->last_client_replies
3293               [(prq->sender->last_client_replies_woff) % CS2P_SUCCESS_LIST_SIZE])
3294             GNUNET_SERVER_client_drop (prq->sender->last_client_replies
3295                                        [(prq->sender->last_client_replies_woff) % CS2P_SUCCESS_LIST_SIZE]);
3296           prq->sender->last_client_replies
3297             [(prq->sender->last_client_replies_woff++) % CS2P_SUCCESS_LIST_SIZE]
3298             = pr->client_request_list->client_list->client;
3299           GNUNET_SERVER_client_keep (pr->client_request_list->client_list->client);
3300         }
3301     }
3302   prq->eval = GNUNET_BLOCK_evaluate (block_ctx,
3303                                      prq->type,
3304                                      key,
3305                                      &pr->bf,
3306                                      pr->mingle,
3307                                      pr->namespace, (pr->namespace != NULL) ? sizeof (GNUNET_HashCode) : 0,
3308                                      prq->data,
3309                                      prq->size);
3310   switch (prq->eval)
3311     {
3312     case GNUNET_BLOCK_EVALUATION_OK_MORE:
3313       break;
3314     case GNUNET_BLOCK_EVALUATION_OK_LAST:
3315       while (NULL != pr->pending_head)
3316         destroy_pending_message_list_entry (pr->pending_head);
3317       if (pr->qe != NULL)
3318         {
3319           if (pr->client_request_list != NULL)
3320             GNUNET_SERVER_receive_done (pr->client_request_list->client_list->client, 
3321                                         GNUNET_YES);
3322           GNUNET_DATASTORE_cancel (pr->qe);
3323           pr->qe = NULL;
3324         }
3325       pr->do_remove = GNUNET_YES;
3326       if (pr->task != GNUNET_SCHEDULER_NO_TASK)
3327         {
3328           GNUNET_SCHEDULER_cancel (pr->task);
3329           pr->task = GNUNET_SCHEDULER_NO_TASK;
3330         }
3331       GNUNET_break (GNUNET_YES ==
3332                     GNUNET_CONTAINER_multihashmap_remove (query_request_map,
3333                                                           key,
3334                                                           pr));
3335       GNUNET_LOAD_update (rt_entry_lifetime,
3336                           GNUNET_TIME_absolute_get_duration (pr->start_time).rel_value);
3337       break;
3338     case GNUNET_BLOCK_EVALUATION_OK_DUPLICATE:
3339       GNUNET_STATISTICS_update (stats,
3340                                 gettext_noop ("# duplicate replies discarded (bloomfilter)"),
3341                                 1,
3342                                 GNUNET_NO);
3343 #if DEBUG_FS
3344 /*      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3345                   "Duplicate response `%s', discarding.\n",
3346                   GNUNET_h2s (&mhash));*/
3347 #endif
3348       return GNUNET_YES; /* duplicate */
3349     case GNUNET_BLOCK_EVALUATION_RESULT_INVALID:
3350       return GNUNET_YES; /* wrong namespace */  
3351     case GNUNET_BLOCK_EVALUATION_REQUEST_VALID:
3352       GNUNET_break (0);
3353       return GNUNET_YES;
3354     case GNUNET_BLOCK_EVALUATION_REQUEST_INVALID:
3355       GNUNET_break (0);
3356       return GNUNET_YES;
3357     case GNUNET_BLOCK_EVALUATION_TYPE_NOT_SUPPORTED:
3358       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
3359                   _("Unsupported block type %u\n"),
3360                   prq->type);
3361       return GNUNET_NO;
3362     }
3363   if (pr->client_request_list != NULL)
3364     {
3365       if (pr->replies_seen_size == pr->replies_seen_off)
3366         GNUNET_array_grow (pr->replies_seen,
3367                            pr->replies_seen_size,
3368                            pr->replies_seen_size * 2 + 4);      
3369       GNUNET_CRYPTO_hash (prq->data,
3370                           prq->size,
3371                           &pr->replies_seen[pr->replies_seen_off++]);         
3372       refresh_bloomfilter (pr);
3373     }
3374   if (NULL == prq->sender)
3375     {
3376 #if DEBUG_FS
3377       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3378                   "Found result for query `%s' in local datastore\n",
3379                   GNUNET_h2s (key));
3380 #endif
3381       GNUNET_STATISTICS_update (stats,
3382                                 gettext_noop ("# results found locally"),
3383                                 1,
3384                                 GNUNET_NO);      
3385     }
3386   prq->priority += pr->remaining_priority;
3387   pr->remaining_priority = 0;
3388   pr->results_found++;
3389   prq->request_found = GNUNET_YES;
3390   if (NULL != pr->client_request_list)
3391     {
3392       GNUNET_STATISTICS_update (stats,
3393                                 gettext_noop ("# replies received for local clients"),
3394                                 1,
3395                                 GNUNET_NO);
3396       cl = pr->client_request_list->client_list;
3397       msize = sizeof (struct PutMessage) + prq->size;
3398       creply = GNUNET_malloc (msize + sizeof (struct ClientResponseMessage));
3399       creply->msize = msize;
3400       creply->client_list = cl;
3401       GNUNET_CONTAINER_DLL_insert_after (cl->res_head,
3402                                          cl->res_tail,
3403                                          cl->res_tail,
3404                                          creply);      
3405       pm = (struct PutMessage*) &creply[1];
3406       pm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
3407       pm->header.size = htons (msize);
3408       pm->type = htonl (prq->type);
3409       pm->expiration = GNUNET_TIME_absolute_hton (prq->expiration);
3410       memcpy (&pm[1], prq->data, prq->size);      
3411       if (NULL == cl->th)
3412         {
3413 #if DEBUG_FS
3414           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3415                       "Transmitting result for query `%s' to client\n",
3416                       GNUNET_h2s (key));
3417 #endif  
3418           cl->th = GNUNET_SERVER_notify_transmit_ready (cl->client,
3419                                                         msize,
3420                                                         GNUNET_TIME_UNIT_FOREVER_REL,
3421                                                         &transmit_to_client,
3422                                                         cl);
3423         }
3424       GNUNET_break (cl->th != NULL);
3425       if (pr->do_remove)                
3426         {
3427           prq->finished = GNUNET_YES;
3428           destroy_pending_request (pr);         
3429         }
3430     }
3431   else
3432     {
3433       cp = pr->cp;
3434 #if DEBUG_FS
3435       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3436                   "Transmitting result for query `%s' to other peer (PID=%u)\n",
3437                   GNUNET_h2s (key),
3438                   (unsigned int) cp->pid);
3439 #endif  
3440       GNUNET_STATISTICS_update (stats,
3441                                 gettext_noop ("# replies received for other peers"),
3442                                 1,
3443                                 GNUNET_NO);
3444       msize = sizeof (struct PutMessage) + prq->size;
3445       reply = GNUNET_malloc (msize + sizeof (struct PendingMessage));
3446       reply->cont = &transmit_reply_continuation;
3447       reply->cont_cls = pr;
3448 #if SUPPORT_DELAYS
3449       art_delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
3450                                                  GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
3451                                                                            TTL_DECREMENT));
3452       reply->delay_until 
3453         = GNUNET_TIME_relative_to_absolute (art_delay);
3454       GNUNET_STATISTICS_update (stats,
3455                                 gettext_noop ("cummulative artificial delay introduced (ms)"),
3456                                 art_delay.abs_value,
3457                                 GNUNET_NO);
3458 #endif
3459       reply->msize = msize;
3460       reply->priority = UINT32_MAX; /* send replies first! */
3461       pm = (struct PutMessage*) &reply[1];
3462       pm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
3463       pm->header.size = htons (msize);
3464       pm->type = htonl (prq->type);
3465       pm->expiration = GNUNET_TIME_absolute_hton (prq->expiration);
3466       memcpy (&pm[1], prq->data, prq->size);
3467       add_to_pending_messages_for_peer (cp, reply, pr);
3468     }
3469   return GNUNET_YES;
3470 }
3471
3472
3473 /**
3474  * Iterator called on each result obtained for a DHT
3475  * operation that expects a reply
3476  *
3477  * @param cls closure
3478  * @param exp when will this value expire
3479  * @param key key of the result
3480  * @param get_path NULL-terminated array of pointers
3481  *                 to the peers on reverse GET path (or NULL if not recorded)
3482  * @param put_path NULL-terminated array of pointers
3483  *                 to the peers on the PUT path (or NULL if not recorded)
3484  * @param type type of the result
3485  * @param size number of bytes in data
3486  * @param data pointer to the result data
3487  */
3488 static void
3489 process_dht_reply (void *cls,
3490                    struct GNUNET_TIME_Absolute exp,
3491                    const GNUNET_HashCode * key,
3492                    const struct GNUNET_PeerIdentity * const *get_path,
3493                    const struct GNUNET_PeerIdentity * const *put_path,
3494                    enum GNUNET_BLOCK_Type type,
3495                    size_t size,
3496                    const void *data)
3497 {
3498   struct PendingRequest *pr = cls;
3499   struct ProcessReplyClosure prq;
3500
3501   memset (&prq, 0, sizeof (prq));
3502   prq.data = data;
3503   prq.expiration = exp;
3504   prq.size = size;  
3505   prq.type = type;
3506   process_reply (&prq, key, pr);
3507 }
3508
3509
3510
3511 /**
3512  * Continuation called to notify client about result of the
3513  * operation.
3514  *
3515  * @param cls closure
3516  * @param success GNUNET_SYSERR on failure
3517  * @param msg NULL on success, otherwise an error message
3518  */
3519 static void 
3520 put_migration_continuation (void *cls,
3521                             int success,
3522                             const char *msg)
3523 {
3524   struct GNUNET_TIME_Absolute *start = cls;
3525   struct GNUNET_TIME_Relative delay;
3526   
3527   delay = GNUNET_TIME_absolute_get_duration (*start);
3528   GNUNET_free (start);
3529   GNUNET_LOAD_update (datastore_put_load,
3530                       delay.rel_value);
3531   if (GNUNET_OK == success)
3532     return;
3533   GNUNET_STATISTICS_update (stats,
3534                             gettext_noop ("# datastore 'put' failures"),
3535                             1,
3536                             GNUNET_NO);
3537 }
3538
3539
3540 /**
3541  * Handle P2P "PUT" message.
3542  *
3543  * @param cls closure, always NULL
3544  * @param other the other peer involved (sender or receiver, NULL
3545  *        for loopback messages where we are both sender and receiver)
3546  * @param message the actual message
3547  * @param atsi performance information
3548  * @return GNUNET_OK to keep the connection open,
3549  *         GNUNET_SYSERR to close it (signal serious error)
3550  */
3551 static int
3552 handle_p2p_put (void *cls,
3553                 const struct GNUNET_PeerIdentity *other,
3554                 const struct GNUNET_MessageHeader *message,
3555                 const struct GNUNET_TRANSPORT_ATS_Information *atsi)
3556 {
3557   const struct PutMessage *put;
3558   uint16_t msize;
3559   size_t dsize;
3560   enum GNUNET_BLOCK_Type type;
3561   struct GNUNET_TIME_Absolute expiration;
3562   GNUNET_HashCode query;
3563   struct ProcessReplyClosure prq;
3564   struct GNUNET_TIME_Absolute *start;
3565   struct GNUNET_TIME_Relative block_time;  
3566   double putl;
3567   struct ConnectedPeer *cp; 
3568   struct PendingMessage *pm;
3569   struct MigrationStopMessage *msm;
3570
3571   msize = ntohs (message->size);
3572   if (msize < sizeof (struct PutMessage))
3573     {
3574       GNUNET_break_op(0);
3575       return GNUNET_SYSERR;
3576     }
3577   put = (const struct PutMessage*) message;
3578   dsize = msize - sizeof (struct PutMessage);
3579   type = ntohl (put->type);
3580   expiration = GNUNET_TIME_absolute_ntoh (put->expiration);
3581
3582   if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
3583     return GNUNET_SYSERR;
3584   if (GNUNET_OK !=
3585       GNUNET_BLOCK_get_key (block_ctx,
3586                             type,
3587                             &put[1],
3588                             dsize,
3589                             &query))
3590     {
3591       GNUNET_break_op (0);
3592       return GNUNET_SYSERR;
3593     }
3594 #if DEBUG_FS
3595   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3596               "Received result for query `%s' from peer `%4s'\n",
3597               GNUNET_h2s (&query),
3598               GNUNET_i2s (other));
3599 #endif
3600   GNUNET_STATISTICS_update (stats,
3601                             gettext_noop ("# replies received (overall)"),
3602                             1,
3603                             GNUNET_NO);
3604   /* now, lookup 'query' */
3605   prq.data = (const void*) &put[1];
3606   if (other != NULL)
3607     prq.sender = GNUNET_CONTAINER_multihashmap_get (connected_peers,
3608                                                     &other->hashPubKey);
3609   else
3610     prq.sender = NULL;
3611   prq.size = dsize;
3612   prq.type = type;
3613   prq.expiration = expiration;
3614   prq.priority = 0;
3615   prq.finished = GNUNET_NO;
3616   prq.request_found = GNUNET_NO;
3617   GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map,
3618                                               &query,
3619                                               &process_reply,
3620                                               &prq);
3621   if (prq.sender != NULL)
3622     {
3623       prq.sender->inc_preference += CONTENT_BANDWIDTH_VALUE + 1000 * prq.priority;
3624       change_host_trust (prq.sender, prq.priority);
3625     }
3626   if ( (GNUNET_YES == active_to_migration) &&
3627        (GNUNET_NO == test_put_load_too_high (prq.priority)) )
3628     {      
3629 #if DEBUG_FS
3630       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3631                   "Replicating result for query `%s' with priority %u\n",
3632                   GNUNET_h2s (&query),
3633                   prq.priority);
3634 #endif
3635       start = GNUNET_malloc (sizeof (struct GNUNET_TIME_Absolute));
3636       *start = GNUNET_TIME_absolute_get ();
3637       GNUNET_DATASTORE_put (dsh,
3638                             0, &query, dsize, &put[1],
3639                             type, prq.priority, 1 /* anonymity */, 
3640                             expiration, 
3641                             1 + prq.priority, MAX_DATASTORE_QUEUE,
3642                             GNUNET_CONSTANTS_SERVICE_TIMEOUT,
3643                             &put_migration_continuation, 
3644                             start);
3645     }
3646   putl = GNUNET_LOAD_get_load (datastore_put_load);
3647   if ( (GNUNET_NO == prq.request_found) &&
3648        ( (GNUNET_YES != active_to_migration) ||
3649          (putl > 2.5 * (1 + prq.priority)) ) )
3650     {
3651       cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
3652                                               &other->hashPubKey);
3653       if (GNUNET_TIME_absolute_get_duration (cp->last_migration_block).rel_value < 5000)
3654         return GNUNET_OK; /* already blocked */
3655       /* We're too busy; send MigrationStop message! */
3656       if (GNUNET_YES != active_to_migration) 
3657         putl = 1.0 + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 5);
3658       block_time = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
3659                                                   5000 + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
3660                                                                                    (unsigned int) (60000 * putl * putl)));
3661       
3662       cp->last_migration_block = GNUNET_TIME_relative_to_absolute (block_time);
3663       pm = GNUNET_malloc (sizeof (struct PendingMessage) + 
3664                           sizeof (struct MigrationStopMessage));
3665       pm->msize = sizeof (struct MigrationStopMessage);
3666       pm->priority = UINT32_MAX;
3667       msm = (struct MigrationStopMessage*) &pm[1];
3668       msm->header.size = htons (sizeof (struct MigrationStopMessage));
3669       msm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_MIGRATION_STOP);
3670       msm->duration = GNUNET_TIME_relative_hton (block_time);
3671       add_to_pending_messages_for_peer (cp,
3672                                         pm,
3673                                         NULL);
3674     }
3675   return GNUNET_OK;
3676 }
3677
3678
3679 /**
3680  * Handle P2P "MIGRATION_STOP" message.
3681  *
3682  * @param cls closure, always NULL
3683  * @param other the other peer involved (sender or receiver, NULL
3684  *        for loopback messages where we are both sender and receiver)
3685  * @param message the actual message
3686  * @param atsi performance information
3687  * @return GNUNET_OK to keep the connection open,
3688  *         GNUNET_SYSERR to close it (signal serious error)
3689  */
3690 static int
3691 handle_p2p_migration_stop (void *cls,
3692                            const struct GNUNET_PeerIdentity *other,
3693                            const struct GNUNET_MessageHeader *message,
3694                            const struct GNUNET_TRANSPORT_ATS_Information *atsi)
3695 {
3696   struct ConnectedPeer *cp; 
3697   const struct MigrationStopMessage *msm;
3698
3699   msm = (const struct MigrationStopMessage*) message;
3700   cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
3701                                           &other->hashPubKey);
3702   if (cp == NULL)
3703     {
3704       GNUNET_break (0);
3705       return GNUNET_OK;
3706     }
3707   cp->migration_blocked = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_relative_ntoh (msm->duration));
3708   return GNUNET_OK;
3709 }
3710
3711
3712
3713 /* **************************** P2P GET Handling ************************ */
3714
3715
3716 /**
3717  * Closure for 'check_duplicate_request_{peer,client}'.
3718  */
3719 struct CheckDuplicateRequestClosure
3720 {
3721   /**
3722    * The new request we should check if it already exists.
3723    */
3724   const struct PendingRequest *pr;
3725
3726   /**
3727    * Existing request found by the checker, NULL if none.
3728    */
3729   struct PendingRequest *have;
3730 };
3731
3732
3733 /**
3734  * Iterator over entries in the 'query_request_map' that
3735  * tries to see if we have the same request pending from
3736  * the same client already.
3737  *
3738  * @param cls closure (our 'struct CheckDuplicateRequestClosure')
3739  * @param key current key code (query, ignored, must match)
3740  * @param value value in the hash map (a 'struct PendingRequest' 
3741  *              that already exists)
3742  * @return GNUNET_YES if we should continue to
3743  *         iterate (no match yet)
3744  *         GNUNET_NO if not (match found).
3745  */
3746 static int
3747 check_duplicate_request_client (void *cls,
3748                                 const GNUNET_HashCode * key,
3749                                 void *value)
3750 {
3751   struct CheckDuplicateRequestClosure *cdc = cls;
3752   struct PendingRequest *have = value;
3753
3754   if (have->client_request_list == NULL)
3755     return GNUNET_YES;
3756   if ( (cdc->pr->client_request_list->client_list->client == have->client_request_list->client_list->client) &&
3757        (cdc->pr != have) )
3758     {
3759       cdc->have = have;
3760       return GNUNET_NO;
3761     }
3762   return GNUNET_YES;
3763 }
3764
3765
3766 /**
3767  * We're processing (local) results for a search request
3768  * from another peer.  Pass applicable results to the
3769  * peer and if we are done either clean up (operation
3770  * complete) or forward to other peers (more results possible).
3771  *
3772  * @param cls our closure (struct LocalGetContext)
3773  * @param key key for the content
3774  * @param size number of bytes in data
3775  * @param data content stored
3776  * @param type type of the content
3777  * @param priority priority of the content
3778  * @param anonymity anonymity-level for the content
3779  * @param expiration expiration time for the content
3780  * @param uid unique identifier for the datum;
3781  *        maybe 0 if no unique identifier is available
3782  */
3783 static void
3784 process_local_reply (void *cls,
3785                      const GNUNET_HashCode * key,
3786                      size_t size,
3787                      const void *data,
3788                      enum GNUNET_BLOCK_Type type,
3789                      uint32_t priority,
3790                      uint32_t anonymity,
3791                      struct GNUNET_TIME_Absolute
3792                      expiration, 
3793                      uint64_t uid)
3794 {
3795   struct PendingRequest *pr = cls;
3796   struct ProcessReplyClosure prq;
3797   struct CheckDuplicateRequestClosure cdrc;
3798   GNUNET_HashCode query;
3799   unsigned int old_rf;
3800   
3801   if (NULL == key)
3802     {
3803 #if DEBUG_FS > 1
3804       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3805                   "Done processing local replies, forwarding request to other peers.\n");
3806 #endif
3807       pr->qe = NULL;
3808       if (pr->client_request_list != NULL)
3809         {
3810           GNUNET_SERVER_receive_done (pr->client_request_list->client_list->client, 
3811                                       GNUNET_YES);
3812           /* Figure out if this is a duplicate request and possibly
3813              merge 'struct PendingRequest' entries */
3814           cdrc.have = NULL;
3815           cdrc.pr = pr;
3816           GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map,
3817                                                       &pr->query,
3818                                                       &check_duplicate_request_client,
3819                                                       &cdrc);
3820           if (cdrc.have != NULL)
3821             {
3822 #if DEBUG_FS
3823               GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3824                           "Received request for block `%s' twice from client, will only request once.\n",
3825                           GNUNET_h2s (&pr->query));
3826 #endif
3827               
3828               destroy_pending_request (pr);
3829               return;
3830             }
3831         }
3832       if (pr->local_only == GNUNET_YES)
3833         {
3834           destroy_pending_request (pr);
3835           return;
3836         }
3837       /* no more results */
3838       if (pr->task == GNUNET_SCHEDULER_NO_TASK)
3839         pr->task = GNUNET_SCHEDULER_add_now (&forward_request_task,
3840                                              pr);      
3841       return;
3842     }
3843 #if DEBUG_FS
3844   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3845               "New local response to `%s' of type %u.\n",
3846               GNUNET_h2s (key),
3847               type);
3848 #endif
3849   if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
3850     {
3851 #if DEBUG_FS
3852       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3853                   "Found ONDEMAND block, performing on-demand encoding\n");
3854 #endif
3855       GNUNET_STATISTICS_update (stats,
3856                                 gettext_noop ("# on-demand blocks matched requests"),
3857                                 1,
3858                                 GNUNET_NO);
3859       if (GNUNET_OK != 
3860           GNUNET_FS_handle_on_demand_block (key, size, data, type, priority, 
3861                                             anonymity, expiration, uid, 
3862                                             &process_local_reply,
3863                                             pr))
3864       if (pr->qe != NULL)
3865         {
3866           GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
3867         }
3868       return;
3869     }
3870   old_rf = pr->results_found;
3871   memset (&prq, 0, sizeof (prq));
3872   prq.data = data;
3873   prq.expiration = expiration;
3874   prq.size = size;  
3875   if (GNUNET_OK != 
3876       GNUNET_BLOCK_get_key (block_ctx,
3877                             type,
3878                             data,
3879                             size,
3880                             &query))
3881     {
3882       GNUNET_break (0);
3883       GNUNET_DATASTORE_remove (dsh,
3884                                key,
3885                                size, data,
3886                                -1, -1, 
3887                                GNUNET_TIME_UNIT_FOREVER_REL,
3888                                NULL, NULL);
3889       GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
3890       return;
3891     }
3892   prq.type = type;
3893   prq.priority = priority;  
3894   prq.finished = GNUNET_NO;
3895   prq.request_found = GNUNET_NO;
3896   if ( (old_rf == 0) &&
3897        (pr->results_found == 0) )
3898     update_datastore_delays (pr->start_time);
3899   process_reply (&prq, key, pr);
3900   if (prq.finished == GNUNET_YES)
3901     return;
3902   if (pr->qe == NULL)
3903     return; /* done here */
3904   if (prq.eval == GNUNET_BLOCK_EVALUATION_OK_LAST)
3905     {
3906       pr->local_only = GNUNET_YES; /* do not forward */
3907       GNUNET_DATASTORE_get_next (dsh, GNUNET_NO);
3908       return;
3909     }
3910   if ( (pr->client_request_list == NULL) &&
3911        ( (GNUNET_YES == test_get_load_too_high (0)) ||
3912          (pr->results_found > 5 + 2 * pr->priority) ) )
3913     {
3914 #if DEBUG_FS > 2
3915       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3916                   "Load too high, done with request\n");
3917 #endif
3918       GNUNET_STATISTICS_update (stats,
3919                                 gettext_noop ("# processing result set cut short due to load"),
3920                                 1,
3921                                 GNUNET_NO);
3922       GNUNET_DATASTORE_get_next (dsh, GNUNET_NO);
3923       return;
3924     }
3925   GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
3926 }
3927
3928
3929 /**
3930  * We've received a request with the specified priority.  Bound it
3931  * according to how much we trust the given peer.
3932  * 
3933  * @param prio_in requested priority
3934  * @param cp the peer making the request
3935  * @return effective priority
3936  */
3937 static int32_t
3938 bound_priority (uint32_t prio_in,
3939                 struct ConnectedPeer *cp)
3940 {
3941 #define N ((double)128.0)
3942   uint32_t ret;
3943   double rret;
3944   int ld;
3945
3946   ld = test_get_load_too_high (0);
3947   if (ld == GNUNET_SYSERR)
3948     {
3949       GNUNET_STATISTICS_update (stats,
3950                                 gettext_noop ("# requests done for free (low load)"),
3951                                 1,
3952                                 GNUNET_NO);
3953       return 0; /* excess resources */
3954     }
3955   if (prio_in > INT32_MAX)
3956     prio_in = INT32_MAX;
3957   ret = - change_host_trust (cp, - (int) prio_in);
3958   if (ret > 0)
3959     {
3960       if (ret > current_priorities + N)
3961         rret = current_priorities + N;
3962       else
3963         rret = ret;
3964       current_priorities 
3965         = (current_priorities * (N-1) + rret)/N;
3966     }
3967   if ( (ld == GNUNET_YES) && (ret > 0) )
3968     {
3969       /* try with charging */
3970       ld = test_get_load_too_high (ret);
3971     }
3972   if (ld == GNUNET_YES)
3973     {
3974       GNUNET_STATISTICS_update (stats,
3975                                 gettext_noop ("# request dropped, priority insufficient"),
3976                                 1,
3977                                 GNUNET_NO);
3978       /* undo charge */
3979       change_host_trust (cp, (int) ret);
3980       return -1; /* not enough resources */
3981     }
3982   else
3983     {
3984       GNUNET_STATISTICS_update (stats,
3985                                 gettext_noop ("# requests done for a price (normal load)"),
3986                                 1,
3987                                 GNUNET_NO);
3988     }
3989 #undef N
3990   return ret;
3991 }
3992
3993
3994 /**
3995  * Iterator over entries in the 'query_request_map' that
3996  * tries to see if we have the same request pending from
3997  * the same peer already.
3998  *
3999  * @param cls closure (our 'struct CheckDuplicateRequestClosure')
4000  * @param key current key code (query, ignored, must match)
4001  * @param value value in the hash map (a 'struct PendingRequest' 
4002  *              that already exists)
4003  * @return GNUNET_YES if we should continue to
4004  *         iterate (no match yet)
4005  *         GNUNET_NO if not (match found).
4006  */
4007 static int
4008 check_duplicate_request_peer (void *cls,
4009                               const GNUNET_HashCode * key,
4010                               void *value)
4011 {
4012   struct CheckDuplicateRequestClosure *cdc = cls;
4013   struct PendingRequest *have = value;
4014
4015   if (cdc->pr->target_pid == have->target_pid)
4016     {
4017       cdc->have = have;
4018       return GNUNET_NO;
4019     }
4020   return GNUNET_YES;
4021 }
4022
4023
4024 /**
4025  * Handle P2P "GET" request.
4026  *
4027  * @param cls closure, always NULL
4028  * @param other the other peer involved (sender or receiver, NULL
4029  *        for loopback messages where we are both sender and receiver)
4030  * @param message the actual message
4031  * @param atsi performance information
4032  * @return GNUNET_OK to keep the connection open,
4033  *         GNUNET_SYSERR to close it (signal serious error)
4034  */
4035 static int
4036 handle_p2p_get (void *cls,
4037                 const struct GNUNET_PeerIdentity *other,
4038                 const struct GNUNET_MessageHeader *message,
4039                 const struct GNUNET_TRANSPORT_ATS_Information *atsi)
4040 {
4041   struct PendingRequest *pr;
4042   struct ConnectedPeer *cp;
4043   struct ConnectedPeer *cps;
4044   struct CheckDuplicateRequestClosure cdc;
4045   struct GNUNET_TIME_Relative timeout;
4046   uint16_t msize;
4047   const struct GetMessage *gm;
4048   unsigned int bits;
4049   const GNUNET_HashCode *opt;
4050   uint32_t bm;
4051   size_t bfsize;
4052   uint32_t ttl_decrement;
4053   int32_t priority;
4054   enum GNUNET_BLOCK_Type type;
4055   int have_ns;
4056
4057   msize = ntohs(message->size);
4058   if (msize < sizeof (struct GetMessage))
4059     {
4060       GNUNET_break_op (0);
4061       return GNUNET_SYSERR;
4062     }
4063   gm = (const struct GetMessage*) message;
4064 #if DEBUG_FS
4065   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4066               "Received request for `%s'\n",
4067               GNUNET_h2s (&gm->query));
4068 #endif
4069   type = ntohl (gm->type);
4070   bm = ntohl (gm->hash_bitmap);
4071   bits = 0;
4072   while (bm > 0)
4073     {
4074       if (1 == (bm & 1))
4075         bits++;
4076       bm >>= 1;
4077     }
4078   if (msize < sizeof (struct GetMessage) + bits * sizeof (GNUNET_HashCode))
4079     {
4080       GNUNET_break_op (0);
4081       return GNUNET_SYSERR;
4082     }  
4083   opt = (const GNUNET_HashCode*) &gm[1];
4084   bfsize = msize - sizeof (struct GetMessage) - bits * sizeof (GNUNET_HashCode);
4085   /* bfsize must be power of 2, check! */
4086   if (0 != ( (bfsize - 1) & bfsize))
4087     {
4088       GNUNET_break_op (0);
4089       return GNUNET_SYSERR;
4090     }
4091   bm = ntohl (gm->hash_bitmap);
4092   bits = 0;
4093   cps = GNUNET_CONTAINER_multihashmap_get (connected_peers,
4094                                            &other->hashPubKey);
4095   if (NULL == cps)
4096     {
4097       /* peer must have just disconnected */
4098       GNUNET_STATISTICS_update (stats,
4099                                 gettext_noop ("# requests dropped due to initiator not being connected"),
4100                                 1,
4101                                 GNUNET_NO);
4102       return GNUNET_SYSERR;
4103     }
4104   if (0 != (bm & GET_MESSAGE_BIT_RETURN_TO))
4105     cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
4106                                             &opt[bits++]);
4107   else
4108     cp = cps;
4109   if (cp == NULL)
4110     {
4111 #if DEBUG_FS
4112       if (0 != (bm & GET_MESSAGE_BIT_RETURN_TO))
4113         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4114                     "Failed to find RETURN-TO peer `%4s' in connection set. Dropping query.\n",
4115                     GNUNET_i2s ((const struct GNUNET_PeerIdentity*) &opt[bits-1]));
4116       
4117       else
4118         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4119                     "Failed to find peer `%4s' in connection set. Dropping query.\n",
4120                     GNUNET_i2s (other));
4121 #endif
4122       GNUNET_STATISTICS_update (stats,
4123                                 gettext_noop ("# requests dropped due to missing reverse route"),
4124                                 1,
4125                                 GNUNET_NO);
4126      /* FIXME: try connect? */
4127       return GNUNET_OK;
4128     }
4129   /* note that we can really only check load here since otherwise
4130      peers could find out that we are overloaded by not being
4131      disconnected after sending us a malformed query... */
4132   priority = bound_priority (ntohl (gm->priority), cps);
4133   if (priority < 0)
4134     {
4135 #if DEBUG_FS
4136       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4137                   "Dropping query from `%s', this peer is too busy.\n",
4138                   GNUNET_i2s (other));
4139 #endif
4140       return GNUNET_OK;
4141     }
4142 #if DEBUG_FS 
4143   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4144               "Received request for `%s' of type %u from peer `%4s' with flags %u\n",
4145               GNUNET_h2s (&gm->query),
4146               (unsigned int) type,
4147               GNUNET_i2s (other),
4148               (unsigned int) bm);
4149 #endif
4150   have_ns = (0 != (bm & GET_MESSAGE_BIT_SKS_NAMESPACE));
4151   pr = GNUNET_malloc (sizeof (struct PendingRequest) + 
4152                       (have_ns ? sizeof(GNUNET_HashCode) : 0));
4153   if (have_ns)
4154     {
4155       pr->namespace = (GNUNET_HashCode*) &pr[1];
4156       memcpy (&pr[1], &opt[bits++], sizeof (GNUNET_HashCode));
4157     }
4158   if ( (GNUNET_LOAD_get_load (cp->transmission_delay) > 3 * (1 + priority)) ||
4159        (GNUNET_LOAD_get_average (cp->transmission_delay) > 
4160         GNUNET_CONSTANTS_MAX_CORK_DELAY.rel_value * 2 + GNUNET_LOAD_get_average (rt_entry_lifetime)) )
4161     {
4162       /* don't have BW to send to peer, or would likely take longer than we have for it,
4163          so at best indirect the query */
4164       priority = 0;
4165       pr->forward_only = GNUNET_YES;
4166     }
4167   pr->type = type;
4168   pr->mingle = ntohl (gm->filter_mutator);
4169   if (0 != (bm & GET_MESSAGE_BIT_TRANSMIT_TO))
4170     pr->target_pid = GNUNET_PEER_intern ((const struct GNUNET_PeerIdentity*) &opt[bits++]);
4171   pr->anonymity_level = 1;
4172   pr->priority = (uint32_t) priority;
4173   pr->ttl = bound_ttl (ntohl (gm->ttl), pr->priority);
4174   pr->query = gm->query;
4175   /* decrement ttl (always) */
4176   ttl_decrement = 2 * TTL_DECREMENT +
4177     GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
4178                               TTL_DECREMENT);
4179   if ( (pr->ttl < 0) &&
4180        (((int32_t)(pr->ttl - ttl_decrement)) > 0) )
4181     {
4182 #if DEBUG_FS
4183       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4184                   "Dropping query from `%s' due to TTL underflow (%d - %u).\n",
4185                   GNUNET_i2s (other),
4186                   pr->ttl,
4187                   ttl_decrement);
4188 #endif
4189       GNUNET_STATISTICS_update (stats,
4190                                 gettext_noop ("# requests dropped due TTL underflow"),
4191                                 1,
4192                                 GNUNET_NO);
4193       /* integer underflow => drop (should be very rare)! */      
4194       GNUNET_free (pr);
4195       return GNUNET_OK;
4196     } 
4197   pr->ttl -= ttl_decrement;
4198   pr->start_time = GNUNET_TIME_absolute_get ();
4199
4200   /* get bloom filter */
4201   if (bfsize > 0)
4202     {
4203       pr->bf = GNUNET_CONTAINER_bloomfilter_init ((const char*) &opt[bits],
4204                                                   bfsize,
4205                                                   BLOOMFILTER_K);
4206       pr->bf_size = bfsize;
4207     }
4208   cdc.have = NULL;
4209   cdc.pr = pr;
4210   GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map,
4211                                               &gm->query,
4212                                               &check_duplicate_request_peer,
4213                                               &cdc);
4214   if (cdc.have != NULL)
4215     {
4216       if (cdc.have->start_time.abs_value + cdc.have->ttl >=
4217           pr->start_time.abs_value + pr->ttl)
4218         {
4219           /* existing request has higher TTL, drop new one! */
4220           cdc.have->priority += pr->priority;
4221           destroy_pending_request (pr);
4222 #if DEBUG_FS
4223           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4224                       "Have existing request with higher TTL, dropping new request.\n",
4225                       GNUNET_i2s (other));
4226 #endif
4227           GNUNET_STATISTICS_update (stats,
4228                                     gettext_noop ("# requests dropped due to higher-TTL request"),
4229                                     1,
4230                                     GNUNET_NO);
4231           return GNUNET_OK;
4232         }
4233       else
4234         {
4235           /* existing request has lower TTL, drop old one! */
4236           pr->priority += cdc.have->priority;
4237           /* Possible optimization: if we have applicable pending
4238              replies in 'cdc.have', we might want to move those over
4239              (this is a really rare special-case, so it is not clear
4240              that this would be worth it) */
4241           destroy_pending_request (cdc.have);
4242           /* keep processing 'pr'! */
4243         }
4244     }
4245
4246   pr->cp = cp;
4247   GNUNET_break (GNUNET_OK ==
4248                 GNUNET_CONTAINER_multihashmap_put (query_request_map,
4249                                                    &gm->query,
4250                                                    pr,
4251                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
4252   GNUNET_break (GNUNET_OK ==
4253                 GNUNET_CONTAINER_multihashmap_put (peer_request_map,
4254                                                    &other->hashPubKey,
4255                                                    pr,
4256                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
4257   
4258   pr->hnode = GNUNET_CONTAINER_heap_insert (requests_by_expiration_heap,
4259                                             pr,
4260                                             pr->start_time.abs_value + pr->ttl);
4261
4262   GNUNET_STATISTICS_update (stats,
4263                             gettext_noop ("# P2P searches received"),
4264                             1,
4265                             GNUNET_NO);
4266   GNUNET_STATISTICS_update (stats,
4267                             gettext_noop ("# P2P searches active"),
4268                             1,
4269                             GNUNET_NO);
4270
4271   /* calculate change in traffic preference */
4272   cps->inc_preference += pr->priority * 1000 + QUERY_BANDWIDTH_VALUE;
4273   /* process locally */
4274   if (type == GNUNET_BLOCK_TYPE_FS_DBLOCK)
4275     type = GNUNET_BLOCK_TYPE_ANY; /* to get on-demand as well */
4276   timeout = GNUNET_TIME_relative_multiply (BASIC_DATASTORE_REQUEST_DELAY,
4277                                            (pr->priority + 1)); 
4278   if (GNUNET_YES != pr->forward_only)
4279     {
4280 #if DEBUG_FS
4281       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4282                   "Handing request for `%s' to datastore\n",
4283                   GNUNET_h2s (&gm->query));
4284 #endif
4285       pr->qe = GNUNET_DATASTORE_get (dsh,
4286                                      &gm->query,
4287                                      type,                             
4288                                      pr->priority + 1,
4289                                      MAX_DATASTORE_QUEUE,                                
4290                                      timeout,
4291                                      &process_local_reply,
4292                                      pr);
4293       if (NULL == pr->qe)
4294         {
4295           GNUNET_STATISTICS_update (stats,
4296                                     gettext_noop ("# requests dropped by datastore (queue length limit)"),
4297                                     1,
4298                                     GNUNET_NO);
4299         }
4300     }
4301   else
4302     {
4303       GNUNET_STATISTICS_update (stats,
4304                                 gettext_noop ("# requests forwarded due to high load"),
4305                                 1,
4306                                 GNUNET_NO);
4307     }
4308
4309   /* Are multiple results possible (and did we look locally)?  If so, start processing remotely now! */
4310   switch (pr->type)
4311     {
4312     case GNUNET_BLOCK_TYPE_FS_DBLOCK:
4313     case GNUNET_BLOCK_TYPE_FS_IBLOCK:
4314       /* only one result, wait for datastore */
4315       if (GNUNET_YES != pr->forward_only)
4316         {
4317           GNUNET_STATISTICS_update (stats,
4318                                     gettext_noop ("# requests not instantly forwarded (waiting for datastore)"),
4319                                     1,
4320                                     GNUNET_NO);
4321           break;
4322         }
4323     default:
4324       if (pr->task == GNUNET_SCHEDULER_NO_TASK)
4325         pr->task = GNUNET_SCHEDULER_add_now (&forward_request_task,
4326                                              pr);
4327     }
4328
4329   /* make sure we don't track too many requests */
4330   if (GNUNET_CONTAINER_heap_get_size (requests_by_expiration_heap) > max_pending_requests)
4331     {
4332       pr = GNUNET_CONTAINER_heap_peek (requests_by_expiration_heap);
4333       GNUNET_assert (pr != NULL);
4334       destroy_pending_request (pr);
4335     }
4336   return GNUNET_OK;
4337 }
4338
4339
4340 /* **************************** CS GET Handling ************************ */
4341
4342
4343 /**
4344  * Handle START_SEARCH-message (search request from client).
4345  *
4346  * @param cls closure
4347  * @param client identification of the client
4348  * @param message the actual message
4349  */
4350 static void
4351 handle_start_search (void *cls,
4352                      struct GNUNET_SERVER_Client *client,
4353                      const struct GNUNET_MessageHeader *message)
4354 {
4355   static GNUNET_HashCode all_zeros;
4356   const struct SearchMessage *sm;
4357   struct ClientList *cl;
4358   struct ClientRequestList *crl;
4359   struct PendingRequest *pr;
4360   uint16_t msize;
4361   unsigned int sc;
4362   enum GNUNET_BLOCK_Type type;
4363
4364   msize = ntohs (message->size);
4365   if ( (msize < sizeof (struct SearchMessage)) ||
4366        (0 != (msize - sizeof (struct SearchMessage)) % sizeof (GNUNET_HashCode)) )
4367     {
4368       GNUNET_break (0);
4369       GNUNET_SERVER_receive_done (client,
4370                                   GNUNET_SYSERR);
4371       return;
4372     }
4373   GNUNET_STATISTICS_update (stats,
4374                             gettext_noop ("# client searches received"),
4375                             1,
4376                             GNUNET_NO);
4377   sc = (msize - sizeof (struct SearchMessage)) / sizeof (GNUNET_HashCode);
4378   sm = (const struct SearchMessage*) message;
4379   type = ntohl (sm->type);
4380 #if DEBUG_FS
4381   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4382               "Received request for `%s' of type %u from local client\n",
4383               GNUNET_h2s (&sm->query),
4384               (unsigned int) type);
4385 #endif
4386   cl = client_list;
4387   while ( (cl != NULL) &&
4388           (cl->client != client) )
4389     cl = cl->next;
4390   if (cl == NULL)
4391     {
4392       cl = GNUNET_malloc (sizeof (struct ClientList));
4393       cl->client = client;
4394       GNUNET_SERVER_client_keep (client);
4395       cl->next = client_list;
4396       client_list = cl;
4397     }
4398   /* detect duplicate KBLOCK requests */
4399   if ( (type == GNUNET_BLOCK_TYPE_FS_KBLOCK) ||
4400        (type == GNUNET_BLOCK_TYPE_FS_NBLOCK) ||
4401        (type == GNUNET_BLOCK_TYPE_ANY) )
4402     {
4403       crl = cl->rl_head;
4404       while ( (crl != NULL) &&
4405               ( (0 != memcmp (&crl->req->query,
4406                               &sm->query,
4407                               sizeof (GNUNET_HashCode))) ||
4408                 (crl->req->type != type) ) )
4409         crl = crl->next;
4410       if (crl != NULL)  
4411         { 
4412 #if DEBUG_FS
4413           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4414                       "Have existing request, merging content-seen lists.\n");
4415 #endif
4416           pr = crl->req;
4417           /* Duplicate request (used to send long list of
4418              known/blocked results); merge 'pr->replies_seen'
4419              and update bloom filter */
4420           GNUNET_array_grow (pr->replies_seen,
4421                              pr->replies_seen_size,
4422                              pr->replies_seen_off + sc);
4423           memcpy (&pr->replies_seen[pr->replies_seen_off],
4424                   &sm[1],
4425                   sc * sizeof (GNUNET_HashCode));
4426           pr->replies_seen_off += sc;
4427           refresh_bloomfilter (pr);
4428           GNUNET_STATISTICS_update (stats,
4429                                     gettext_noop ("# client searches updated (merged content seen list)"),
4430                                     1,
4431                                     GNUNET_NO);
4432           GNUNET_SERVER_receive_done (client,
4433                                       GNUNET_OK);
4434           return;
4435         }
4436     }
4437   GNUNET_STATISTICS_update (stats,
4438                             gettext_noop ("# client searches active"),
4439                             1,
4440                             GNUNET_NO);
4441   pr = GNUNET_malloc (sizeof (struct PendingRequest) + 
4442                       ((type == GNUNET_BLOCK_TYPE_FS_SBLOCK) ? sizeof(GNUNET_HashCode) : 0));
4443   crl = GNUNET_malloc (sizeof (struct ClientRequestList));
4444   memset (crl, 0, sizeof (struct ClientRequestList));
4445   crl->client_list = cl;
4446   GNUNET_CONTAINER_DLL_insert (cl->rl_head,
4447                                cl->rl_tail,
4448                                crl);  
4449   crl->req = pr;
4450   pr->type = type;
4451   pr->client_request_list = crl;
4452   GNUNET_array_grow (pr->replies_seen,
4453                      pr->replies_seen_size,
4454                      sc);
4455   memcpy (pr->replies_seen,
4456           &sm[1],
4457           sc * sizeof (GNUNET_HashCode));
4458   pr->replies_seen_off = sc;
4459   pr->anonymity_level = ntohl (sm->anonymity_level); 
4460   pr->start_time = GNUNET_TIME_absolute_get ();
4461   refresh_bloomfilter (pr);
4462   pr->query = sm->query;
4463   if (0 == (1 & ntohl (sm->options)))
4464     pr->local_only = GNUNET_NO;
4465   else
4466     pr->local_only = GNUNET_YES;
4467   switch (type)
4468     {
4469     case GNUNET_BLOCK_TYPE_FS_DBLOCK:
4470     case GNUNET_BLOCK_TYPE_FS_IBLOCK:
4471       if (0 != memcmp (&sm->target,
4472                        &all_zeros,
4473                        sizeof (GNUNET_HashCode)))
4474         pr->target_pid = GNUNET_PEER_intern ((const struct GNUNET_PeerIdentity*) &sm->target);
4475       break;
4476     case GNUNET_BLOCK_TYPE_FS_SBLOCK:
4477       pr->namespace = (GNUNET_HashCode*) &pr[1];
4478       memcpy (&pr[1], &sm->target, sizeof (GNUNET_HashCode));
4479       break;
4480     default:
4481       break;
4482     }
4483   GNUNET_break (GNUNET_OK ==
4484                 GNUNET_CONTAINER_multihashmap_put (query_request_map,
4485                                                    &sm->query,
4486                                                    pr,
4487                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
4488   if (type == GNUNET_BLOCK_TYPE_FS_DBLOCK)
4489     type = GNUNET_BLOCK_TYPE_ANY; /* get on-demand blocks too! */
4490   pr->qe = GNUNET_DATASTORE_get (dsh,
4491                                  &sm->query,
4492                                  type,
4493                                  -3, -1,
4494                                  GNUNET_CONSTANTS_SERVICE_TIMEOUT,                             
4495                                  &process_local_reply,
4496                                  pr);
4497 }
4498
4499
4500 /* **************************** Startup ************************ */
4501
4502 /**
4503  * Process fs requests.
4504  *
4505  * @param server the initialized server
4506  * @param c configuration to use
4507  */
4508 static int
4509 main_init (struct GNUNET_SERVER_Handle *server,
4510            const struct GNUNET_CONFIGURATION_Handle *c)
4511 {
4512   static const struct GNUNET_CORE_MessageHandler p2p_handlers[] =
4513     {
4514       { &handle_p2p_get, 
4515         GNUNET_MESSAGE_TYPE_FS_GET, 0 },
4516       { &handle_p2p_put, 
4517         GNUNET_MESSAGE_TYPE_FS_PUT, 0 },
4518       { &handle_p2p_migration_stop, 
4519         GNUNET_MESSAGE_TYPE_FS_MIGRATION_STOP,
4520         sizeof (struct MigrationStopMessage) },
4521       { NULL, 0, 0 }
4522     };
4523   static const struct GNUNET_SERVER_MessageHandler handlers[] = {
4524     {&GNUNET_FS_handle_index_start, NULL, 
4525      GNUNET_MESSAGE_TYPE_FS_INDEX_START, 0},
4526     {&GNUNET_FS_handle_index_list_get, NULL, 
4527      GNUNET_MESSAGE_TYPE_FS_INDEX_LIST_GET, sizeof(struct GNUNET_MessageHeader) },
4528     {&GNUNET_FS_handle_unindex, NULL, GNUNET_MESSAGE_TYPE_FS_UNINDEX, 
4529      sizeof (struct UnindexMessage) },
4530     {&handle_start_search, NULL, GNUNET_MESSAGE_TYPE_FS_START_SEARCH, 
4531      0 },
4532     {NULL, NULL, 0, 0}
4533   };
4534   unsigned long long enc = 128;
4535
4536   cfg = c;
4537   stats = GNUNET_STATISTICS_create ("fs", cfg);
4538   min_migration_delay = GNUNET_TIME_UNIT_SECONDS;
4539   if ( (GNUNET_OK !=
4540         GNUNET_CONFIGURATION_get_value_number (cfg,
4541                                                "fs",
4542                                                "MAX_PENDING_REQUESTS",
4543                                                &max_pending_requests)) ||
4544        (GNUNET_OK !=
4545         GNUNET_CONFIGURATION_get_value_number (cfg,
4546                                                "fs",
4547                                                "EXPECTED_NEIGHBOUR_COUNT",
4548                                                &enc)) ||
4549        (GNUNET_OK != 
4550         GNUNET_CONFIGURATION_get_value_time (cfg,
4551                                              "fs",
4552                                              "MIN_MIGRATION_DELAY",
4553                                              &min_migration_delay)) )
4554     {
4555       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
4556                   _("Configuration fails to specify certain parameters, assuming default values."));
4557     }
4558   connected_peers = GNUNET_CONTAINER_multihashmap_create (enc); 
4559   query_request_map = GNUNET_CONTAINER_multihashmap_create (max_pending_requests);
4560   rt_entry_lifetime = GNUNET_LOAD_value_init (GNUNET_TIME_UNIT_FOREVER_REL);
4561   peer_request_map = GNUNET_CONTAINER_multihashmap_create (enc);
4562   requests_by_expiration_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN); 
4563   core = GNUNET_CORE_connect (cfg,
4564                               1, /* larger? */
4565                               NULL,
4566                               NULL,
4567                               &peer_connect_handler,
4568                               &peer_disconnect_handler,
4569                               &peer_status_handler,
4570                               NULL, GNUNET_NO,
4571                               NULL, GNUNET_NO,
4572                               p2p_handlers);
4573   if (NULL == core)
4574     {
4575       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
4576                   _("Failed to connect to `%s' service.\n"),
4577                   "core");
4578       GNUNET_CONTAINER_multihashmap_destroy (connected_peers);
4579       connected_peers = NULL;
4580       GNUNET_CONTAINER_multihashmap_destroy (query_request_map);
4581       query_request_map = NULL;
4582       GNUNET_LOAD_value_free (rt_entry_lifetime);
4583       rt_entry_lifetime = NULL;
4584       GNUNET_CONTAINER_heap_destroy (requests_by_expiration_heap);
4585       requests_by_expiration_heap = NULL;
4586       GNUNET_CONTAINER_multihashmap_destroy (peer_request_map);
4587       peer_request_map = NULL;
4588       if (dsh != NULL)
4589         {
4590           GNUNET_DATASTORE_disconnect (dsh, GNUNET_NO);
4591           dsh = NULL;
4592         }
4593       return GNUNET_SYSERR;
4594     }
4595   if (active_from_migration) 
4596     {
4597       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
4598                   _("Content migration is enabled, will start to gather data\n"));
4599       consider_migration_gathering ();
4600     }
4601   consider_dht_put_gathering (NULL);
4602   GNUNET_SERVER_disconnect_notify (server, 
4603                                    &handle_client_disconnect,
4604                                    NULL);
4605   GNUNET_assert (GNUNET_OK ==
4606                  GNUNET_CONFIGURATION_get_value_filename (cfg,
4607                                                           "fs",
4608                                                           "TRUST",
4609                                                           &trustDirectory));
4610   GNUNET_DISK_directory_create (trustDirectory);
4611   GNUNET_SCHEDULER_add_with_priority (GNUNET_SCHEDULER_PRIORITY_HIGH,
4612                                       &cron_flush_trust, NULL);
4613
4614
4615   GNUNET_SERVER_add_handlers (server, handlers);
4616   GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL,
4617                                 &shutdown_task,
4618                                 NULL);
4619   return GNUNET_OK;
4620 }
4621
4622
4623 /**
4624  * Process fs requests.
4625  *
4626  * @param cls closure
4627  * @param server the initialized server
4628  * @param cfg configuration to use
4629  */
4630 static void
4631 run (void *cls,
4632      struct GNUNET_SERVER_Handle *server,
4633      const struct GNUNET_CONFIGURATION_Handle *cfg)
4634 {
4635   active_to_migration = GNUNET_CONFIGURATION_get_value_yesno (cfg,
4636                                                               "FS",
4637                                                               "CONTENT_CACHING");
4638   active_from_migration = GNUNET_CONFIGURATION_get_value_yesno (cfg,
4639                                                                 "FS",
4640                                                                 "CONTENT_PUSHING");
4641   dsh = GNUNET_DATASTORE_connect (cfg);
4642   if (dsh == NULL)
4643     {
4644       GNUNET_SCHEDULER_shutdown ();
4645       return;
4646     }
4647   datastore_get_load = GNUNET_LOAD_value_init (DATASTORE_LOAD_AUTODECLINE);
4648   datastore_put_load = GNUNET_LOAD_value_init (DATASTORE_LOAD_AUTODECLINE);
4649   block_cfg = GNUNET_CONFIGURATION_create ();
4650   GNUNET_CONFIGURATION_set_value_string (block_cfg,
4651                                          "block",
4652                                          "PLUGINS",
4653                                          "fs");
4654   block_ctx = GNUNET_BLOCK_context_create (block_cfg);
4655   GNUNET_assert (NULL != block_ctx);
4656   dht_handle = GNUNET_DHT_connect (cfg,
4657                                    FS_DHT_HT_SIZE);
4658   if ( (GNUNET_OK != GNUNET_FS_indexing_init (cfg, dsh)) ||
4659        (GNUNET_OK != main_init (server, cfg)) )
4660     {    
4661       GNUNET_SCHEDULER_shutdown ();
4662       GNUNET_DATASTORE_disconnect (dsh, GNUNET_NO);
4663       dsh = NULL;
4664       GNUNET_DHT_disconnect (dht_handle);
4665       dht_handle = NULL;
4666       GNUNET_BLOCK_context_destroy (block_ctx);
4667       block_ctx = NULL;
4668       GNUNET_CONFIGURATION_destroy (block_cfg);
4669       block_cfg = NULL;
4670       GNUNET_LOAD_value_free (datastore_get_load);
4671       datastore_get_load = NULL;
4672       GNUNET_LOAD_value_free (datastore_put_load);
4673       datastore_put_load = NULL;
4674       return;   
4675     }
4676 }
4677
4678
4679 /**
4680  * The main function for the fs service.
4681  *
4682  * @param argc number of arguments from the command line
4683  * @param argv command line arguments
4684  * @return 0 ok, 1 on error
4685  */
4686 int
4687 main (int argc, char *const *argv)
4688 {
4689   return (GNUNET_OK ==
4690           GNUNET_SERVICE_run (argc,
4691                               argv,
4692                               "fs",
4693                               GNUNET_SERVICE_OPTION_NONE,
4694                               &run, NULL)) ? 0 : 1;
4695 }
4696
4697 /* end of gnunet-service-fs.c */