docu
[oweals/gnunet.git] / src / fs / gnunet-service-fs.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009, 2010 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file fs/gnunet-service-fs.c
23  * @brief gnunet anonymity protocol implementation
24  * @author Christian Grothoff
25  *
26  * TODO:
27  * - collect traffic data for anonymity levels > 1
28  * - implement transmission restrictions for anonymity level > 1
29  * - more statistics
30  */
31 #include "platform.h"
32 #include <float.h>
33 #include "gnunet_constants.h"
34 #include "gnunet_core_service.h"
35 #include "gnunet_dht_service.h"
36 #include "gnunet_datastore_service.h"
37 #include "gnunet_load_lib.h"
38 #include "gnunet_peer_lib.h"
39 #include "gnunet_protocols.h"
40 #include "gnunet_signatures.h"
41 #include "gnunet_statistics_service.h"
42 #include "gnunet_util_lib.h"
43 #include "gnunet-service-fs_indexing.h"
44 #include "fs.h"
45
46 #define DEBUG_FS GNUNET_NO
47
48 /**
49  * Should we introduce random latency in processing?  Required for proper
50  * implementation of GAP, but can be disabled for performance evaluation of
51  * the basic routing algorithm.
52  *
53  * Note that with delays enabled, performance can be significantly lower
54  * (several orders of magnitude in 2-peer test runs); if you want to
55  * measure throughput of other components, set this to NO.  Also, you
56  * might want to consider changing 'RETRY_PROBABILITY_INV' to 1 for
57  * a rather wasteful mode of operation (that might still get the highest
58  * throughput overall).
59  *
60  * Performance measurements (for 50 MB file, 2 peers):
61  *
62  * - Without delays: 3300 kb/s
63  * - With    delays:  101 kb/s
64  */
65 #define SUPPORT_DELAYS GNUNET_NO
66
67 /**
68  * Size for the hash map for DHT requests from the FS
69  * service.  Should be about the number of concurrent
70  * DHT requests we plan to make.
71  */
72 #define FS_DHT_HT_SIZE 1024
73
74 /**
75  * At what frequency should our datastore load decrease
76  * automatically (since if we don't use it, clearly the
77  * load must be going down).
78  */
79 #define DATASTORE_LOAD_AUTODECLINE GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS, 250)
80
81 /**
82  * How often do we flush trust values to disk?
83  */
84 #define TRUST_FLUSH_FREQ GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 5)
85
86 /**
87  * How often do we at most PUT content into the DHT?
88  */
89 #define MAX_DHT_PUT_FREQ GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5)
90
91 /**
92  * Inverse of the probability that we will submit the same query
93  * to the same peer again.  If the same peer already got the query
94  * repeatedly recently, the probability is multiplied by the inverse
95  * of this number each time.  Note that we only try about every TTL_DECREMENT/2
96  * plus MAX_CORK_DELAY (so roughly every 3.5s).
97  *
98  * Note that this factor is a key influence to performance in small
99  * networks (especially test networks of 2 peers) because if there is
100  * only a single peer with the data, this value will determine how
101  * soon we might re-try.  For example, a value of 3 can result in 
102  * 1.7 MB/s transfer rates for a 10 MB file when a value of 1 would
103  * give us 5 MB/s.  OTOH, obviously re-trying the same peer can be
104  * rather inefficient in larger networks, hence picking 1 is in 
105  * general not the best choice.
106  *
107  * Performance measurements (for 50 MB file, 2 peers, no delays):
108  *
109  * - 1: 3300 kb/s (consistently)
110  * - 3: 2046 kb/s, 754 kb/s, 3490 kb/s
111  * - 5:  759 kb/s, 968 kb/s, 1160 kb/s
112  *
113  * Note that this does NOT mean that the value should be 1 since
114  * a 2-peer network is far from representative here (and this fails
115  * to take into consideration bandwidth wasted by repeatedly 
116  * sending queries to peers that don't have the content).  Also,
117  * it is expected that higher values lead to more inconsistent
118  * measurements since this only affects lost messages towards the
119  * end of the download.
120  *
121  * Finally, we should probably consider changing this and making
122  * it dependent on the number of connected peers or a related
123  * metric (bad magic constants...).
124  */
125 #define RETRY_PROBABILITY_INV 1
126
127 /**
128  * What is the maximum delay for a P2P FS message (in our interaction
129  * with core)?  FS-internal delays are another story.  The value is
130  * chosen based on the 32k block size.  Given that peers typcially
131  * have at least 1 kb/s bandwidth, 45s waits give us a chance to
132  * transmit one message even to the lowest-bandwidth peers.
133  */
134 #define MAX_TRANSMIT_DELAY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 45)
135
136 /**
137  * Maximum number of requests (from other peers, overall) that we're
138  * willing to have pending at any given point in time.  Can be changed
139  * via the configuration file (32k is just the default).
140  */
141 static unsigned long long max_pending_requests = (32 * 1024);
142
143
144 /**
145  * Information we keep for each pending reply.  The
146  * actual message follows at the end of this struct.
147  */
148 struct PendingMessage;
149
150 /**
151  * Function called upon completion of a transmission.
152  *
153  * @param cls closure
154  * @param pid ID of receiving peer, 0 on transmission error
155  */
156 typedef void (*TransmissionContinuation)(void * cls, 
157                                          GNUNET_PEER_Id tpid);
158
159
160 /**
161  * Information we keep for each pending message (GET/PUT).  The
162  * actual message follows at the end of this struct.
163  */
164 struct PendingMessage
165 {
166   /**
167    * This is a doubly-linked list of messages to the same peer.
168    */
169   struct PendingMessage *next;
170
171   /**
172    * This is a doubly-linked list of messages to the same peer.
173    */
174   struct PendingMessage *prev;
175
176   /**
177    * Entry in pending message list for this pending message.
178    */ 
179   struct PendingMessageList *pml;  
180
181   /**
182    * Function to call immediately once we have transmitted this
183    * message.
184    */
185   TransmissionContinuation cont;
186
187   /**
188    * Closure for cont.
189    */
190   void *cont_cls;
191
192   /**
193    * Do not transmit this pending message until this deadline.
194    */
195   struct GNUNET_TIME_Absolute delay_until;
196
197   /**
198    * Size of the reply; actual reply message follows
199    * at the end of this struct.
200    */
201   size_t msize;
202   
203   /**
204    * How important is this message for us?
205    */
206   uint32_t priority;
207  
208 };
209
210
211 /**
212  * Information about a peer that we are connected to.
213  * We track data that is useful for determining which
214  * peers should receive our requests.  We also keep
215  * a list of messages to transmit to this peer.
216  */
217 struct ConnectedPeer
218 {
219
220   /**
221    * List of the last clients for which this peer successfully
222    * answered a query.
223    */
224   struct GNUNET_SERVER_Client *last_client_replies[CS2P_SUCCESS_LIST_SIZE];
225
226   /**
227    * List of the last PIDs for which
228    * this peer successfully answered a query;
229    * We use 0 to indicate no successful reply.
230    */
231   GNUNET_PEER_Id last_p2p_replies[P2P_SUCCESS_LIST_SIZE];
232
233   /**
234    * Average delay between sending the peer a request and
235    * getting a reply (only calculated over the requests for
236    * which we actually got a reply).   Calculated
237    * as a moving average: new_delay = ((n-1)*last_delay+curr_delay) / n
238    */ 
239   struct GNUNET_TIME_Relative avg_delay;
240
241   /**
242    * Point in time until which this peer does not want us to migrate content
243    * to it.
244    */
245   struct GNUNET_TIME_Absolute migration_blocked;
246
247   /**
248    * Time until when we blocked this peer from migrating
249    * data to us.
250    */
251   struct GNUNET_TIME_Absolute last_migration_block;
252
253   /**
254    * Transmission times for the last MAX_QUEUE_PER_PEER
255    * requests for this peer.  Used as a ring buffer, current
256    * offset is stored in 'last_request_times_off'.  If the
257    * oldest entry is more recent than the 'avg_delay', we should
258    * not send any more requests right now.
259    */
260   struct GNUNET_TIME_Absolute last_request_times[MAX_QUEUE_PER_PEER];
261
262   /**
263    * Handle for an active request for transmission to this
264    * peer, or NULL.
265    */
266   struct GNUNET_CORE_TransmitHandle *cth;
267
268   /**
269    * Messages (replies, queries, content migration) we would like to
270    * send to this peer in the near future.  Sorted by priority, head.
271    */
272   struct PendingMessage *pending_messages_head;
273
274   /**
275    * Messages (replies, queries, content migration) we would like to
276    * send to this peer in the near future.  Sorted by priority, tail.
277    */
278   struct PendingMessage *pending_messages_tail;
279
280   /**
281    * How long does it typically take for us to transmit a message
282    * to this peer?  (delay between the request being issued and
283    * the callback being invoked).
284    */
285   struct GNUNET_LOAD_Value *transmission_delay;
286
287   /**
288    * Context of our GNUNET_CORE_peer_change_preference call (or NULL).
289    */
290   struct GNUNET_CORE_InformationRequestContext *irc;
291
292   /**
293    * Request for which 'irc' is currently active (or NULL).
294    */
295   struct PendingRequest *pr;
296
297   /**
298    * Time when the last transmission request was issued.
299    */
300   struct GNUNET_TIME_Absolute last_transmission_request_start;
301
302   /**
303    * ID of delay task for scheduling transmission.
304    */
305   GNUNET_SCHEDULER_TaskIdentifier delayed_transmission_request_task;
306
307   /**
308    * Average priority of successful replies.  Calculated
309    * as a moving average: new_avg = ((n-1)*last_avg+curr_prio) / n
310    */
311   double avg_priority;
312
313   /**
314    * Increase in traffic preference still to be submitted
315    * to the core service for this peer.
316    */
317   uint64_t inc_preference;
318
319   /**
320    * Trust rating for this peer
321    */
322   uint32_t trust;
323
324   /**
325    * Trust rating for this peer on disk.
326    */
327   uint32_t disk_trust;
328
329   /**
330    * The peer's identity.
331    */
332   GNUNET_PEER_Id pid;  
333
334   /**
335    * Size of the linked list of 'pending_messages'.
336    */
337   unsigned int pending_requests;
338
339   /**
340    * Which offset in "last_p2p_replies" will be updated next?
341    * (we go round-robin).
342    */
343   unsigned int last_p2p_replies_woff;
344
345   /**
346    * Which offset in "last_client_replies" will be updated next?
347    * (we go round-robin).
348    */
349   unsigned int last_client_replies_woff;
350
351   /**
352    * Current offset into 'last_request_times' ring buffer.
353    */
354   unsigned int last_request_times_off;
355
356 };
357
358
359 /**
360  * Information we keep for each pending request.  We should try to
361  * keep this struct as small as possible since its memory consumption
362  * is key to how many requests we can have pending at once.
363  */
364 struct PendingRequest;
365
366
367 /**
368  * Doubly-linked list of requests we are performing
369  * on behalf of the same client.
370  */
371 struct ClientRequestList
372 {
373
374   /**
375    * This is a doubly-linked list.
376    */
377   struct ClientRequestList *next;
378
379   /**
380    * This is a doubly-linked list.
381    */
382   struct ClientRequestList *prev;
383
384   /**
385    * Request this entry represents.
386    */
387   struct PendingRequest *req;
388
389   /**
390    * Client list this request belongs to.
391    */
392   struct ClientList *client_list;
393
394 };
395
396
397 /**
398  * Replies to be transmitted to the client.  The actual
399  * response message is allocated after this struct.
400  */
401 struct ClientResponseMessage
402 {
403   /**
404    * This is a doubly-linked list.
405    */
406   struct ClientResponseMessage *next;
407
408   /**
409    * This is a doubly-linked list.
410    */
411   struct ClientResponseMessage *prev;
412
413   /**
414    * Client list entry this response belongs to.
415    */
416   struct ClientList *client_list;
417
418   /**
419    * Number of bytes in the response.
420    */
421   size_t msize;
422 };
423
424
425 /**
426  * Linked list of clients we are performing requests
427  * for right now.
428  */
429 struct ClientList
430 {
431   /**
432    * This is a linked list.
433    */
434   struct ClientList *next;
435
436   /**
437    * ID of a client making a request, NULL if this entry is for a
438    * peer.
439    */
440   struct GNUNET_SERVER_Client *client;
441
442   /**
443    * Head of list of requests performed on behalf
444    * of this client right now.
445    */
446   struct ClientRequestList *rl_head;
447
448   /**
449    * Tail of list of requests performed on behalf
450    * of this client right now.
451    */
452   struct ClientRequestList *rl_tail;
453
454   /**
455    * Head of linked list of responses.
456    */
457   struct ClientResponseMessage *res_head;
458
459   /**
460    * Tail of linked list of responses.
461    */
462   struct ClientResponseMessage *res_tail;
463
464   /**
465    * Context for sending replies.
466    */
467   struct GNUNET_CONNECTION_TransmitHandle *th;
468
469 };
470
471
472 /**
473  * Information about a peer that we have forwarded this
474  * request to already.  
475  */
476 struct UsedTargetEntry
477 {
478   /**
479    * What was the last time we have transmitted this request to this
480    * peer?
481    */
482   struct GNUNET_TIME_Absolute last_request_time;
483
484   /**
485    * How often have we transmitted this request to this peer?
486    */
487   unsigned int num_requests;
488
489   /**
490    * PID of the target peer.
491    */
492   GNUNET_PEER_Id pid;
493
494 };
495
496
497
498
499
500 /**
501  * Doubly-linked list of messages we are performing
502  * due to a pending request.
503  */
504 struct PendingMessageList
505 {
506
507   /**
508    * This is a doubly-linked list of messages on behalf of the same request.
509    */
510   struct PendingMessageList *next;
511
512   /**
513    * This is a doubly-linked list of messages on behalf of the same request.
514    */
515   struct PendingMessageList *prev;
516
517   /**
518    * Message this entry represents.
519    */
520   struct PendingMessage *pm;
521
522   /**
523    * Request this entry belongs to.
524    */
525   struct PendingRequest *req;
526
527   /**
528    * Peer this message is targeted for.
529    */
530   struct ConnectedPeer *target;
531
532 };
533
534
535 /**
536  * Information we keep for each pending request.  We should try to
537  * keep this struct as small as possible since its memory consumption
538  * is key to how many requests we can have pending at once.
539  */
540 struct PendingRequest
541 {
542
543   /**
544    * If this request was made by a client, this is our entry in the
545    * client request list; otherwise NULL.
546    */
547   struct ClientRequestList *client_request_list;
548
549   /**
550    * Entry of peer responsible for this entry (if this request
551    * was made by a peer).
552    */
553   struct ConnectedPeer *cp;
554
555   /**
556    * If this is a namespace query, pointer to the hash of the public
557    * key of the namespace; otherwise NULL.  Pointer will be to the 
558    * end of this struct (so no need to free it).
559    */
560   const GNUNET_HashCode *namespace;
561
562   /**
563    * Bloomfilter we use to filter out replies that we don't care about
564    * (anymore).  NULL as long as we are interested in all replies.
565    */
566   struct GNUNET_CONTAINER_BloomFilter *bf;
567
568   /**
569    * Reference to DHT get operation for this request (or NULL).
570    */
571   struct GNUNET_DHT_GetHandle *dht_get;
572
573   /**
574    * Context of our GNUNET_CORE_peer_change_preference call.
575    */
576   struct ConnectedPeer *pirc;
577
578   /**
579    * Hash code of all replies that we have seen so far (only valid
580    * if client is not NULL since we only track replies like this for
581    * our own clients).
582    */
583   GNUNET_HashCode *replies_seen;
584
585   /**
586    * Node in the heap representing this entry; NULL
587    * if we have no heap node.
588    */
589   struct GNUNET_CONTAINER_HeapNode *hnode;
590
591   /**
592    * Head of list of messages being performed on behalf of this
593    * request.
594    */
595   struct PendingMessageList *pending_head;
596
597   /**
598    * Tail of list of messages being performed on behalf of this
599    * request.
600    */
601   struct PendingMessageList *pending_tail;
602
603   /**
604    * When did we first see this request (form this peer), or, if our
605    * client is initiating, when did we last initiate a search?
606    */
607   struct GNUNET_TIME_Absolute start_time;
608
609   /**
610    * The query that this request is for.
611    */
612   GNUNET_HashCode query;
613
614   /**
615    * The task responsible for transmitting queries
616    * for this request.
617    */
618   GNUNET_SCHEDULER_TaskIdentifier task;
619
620   /**
621    * (Interned) Peer identifier that identifies a preferred target
622    * for requests.
623    */
624   GNUNET_PEER_Id target_pid;
625
626   /**
627    * (Interned) Peer identifiers of peers that have already
628    * received our query for this content.
629    */
630   struct UsedTargetEntry *used_targets;
631   
632   /**
633    * Our entry in the queue (non-NULL while we wait for our
634    * turn to interact with the local database).
635    */
636   struct GNUNET_DATASTORE_QueueEntry *qe;
637
638   /**
639    * Size of the 'bf' (in bytes).
640    */
641   size_t bf_size;
642
643   /**
644    * Desired anonymity level; only valid for requests from a local client.
645    */
646   uint32_t anonymity_level;
647
648   /**
649    * How many entries in "used_targets" are actually valid?
650    */
651   unsigned int used_targets_off;
652
653   /**
654    * How long is the "used_targets" array?
655    */
656   unsigned int used_targets_size;
657
658   /**
659    * Number of results found for this request.
660    */
661   unsigned int results_found;
662
663   /**
664    * How many entries in "replies_seen" are actually valid?
665    */
666   unsigned int replies_seen_off;
667
668   /**
669    * How long is the "replies_seen" array?
670    */
671   unsigned int replies_seen_size;
672   
673   /**
674    * Priority with which this request was made.  If one of our clients
675    * made the request, then this is the current priority that we are
676    * using when initiating the request.  This value is used when
677    * we decide to reward other peers with trust for providing a reply.
678    */
679   uint32_t priority;
680
681   /**
682    * Priority points left for us to spend when forwarding this request
683    * to other peers.
684    */
685   uint32_t remaining_priority;
686
687   /**
688    * Number to mingle hashes for bloom-filter tests with.
689    */
690   int32_t mingle;
691
692   /**
693    * TTL with which we saw this request (or, if we initiated, TTL that
694    * we used for the request).
695    */
696   int32_t ttl;
697   
698   /**
699    * Type of the content that this request is for.
700    */
701   enum GNUNET_BLOCK_Type type;
702
703   /**
704    * Remove this request after transmission of the current response.
705    */
706   int8_t do_remove;
707
708   /**
709    * GNUNET_YES if we should not forward this request to other peers.
710    */
711   int8_t local_only;
712
713   /**
714    * GNUNET_YES if we should not forward this request to other peers.
715    */
716   int8_t forward_only;
717
718 };
719
720
721 /**
722  * Block that is ready for migration to other peers.  Actual data is at the end of the block.
723  */
724 struct MigrationReadyBlock
725 {
726
727   /**
728    * This is a doubly-linked list.
729    */
730   struct MigrationReadyBlock *next;
731
732   /**
733    * This is a doubly-linked list.
734    */
735   struct MigrationReadyBlock *prev;
736
737   /**
738    * Query for the block.
739    */
740   GNUNET_HashCode query;
741
742   /**
743    * When does this block expire? 
744    */
745   struct GNUNET_TIME_Absolute expiration;
746
747   /**
748    * Peers we would consider forwarding this
749    * block to.  Zero for empty entries.
750    */
751   GNUNET_PEER_Id target_list[MIGRATION_LIST_SIZE];
752
753   /**
754    * Size of the block.
755    */
756   size_t size;
757
758   /**
759    *  Number of targets already used.
760    */
761   unsigned int used_targets;
762
763   /**
764    * Type of the block.
765    */
766   enum GNUNET_BLOCK_Type type;
767 };
768
769
770 /**
771  * Our connection to the datastore.
772  */
773 static struct GNUNET_DATASTORE_Handle *dsh;
774
775 /**
776  * Our block context.
777  */
778 static struct GNUNET_BLOCK_Context *block_ctx;
779
780 /**
781  * Our block configuration.
782  */
783 static struct GNUNET_CONFIGURATION_Handle *block_cfg;
784
785 /**
786  * Our configuration.
787  */
788 static const struct GNUNET_CONFIGURATION_Handle *cfg;
789
790 /**
791  * Map of peer identifiers to "struct ConnectedPeer" (for that peer).
792  */
793 static struct GNUNET_CONTAINER_MultiHashMap *connected_peers;
794
795 /**
796  * Map of peer identifiers to "struct PendingRequest" (for that peer).
797  */
798 static struct GNUNET_CONTAINER_MultiHashMap *peer_request_map;
799
800 /**
801  * Map of query identifiers to "struct PendingRequest" (for that query).
802  */
803 static struct GNUNET_CONTAINER_MultiHashMap *query_request_map;
804
805 /**
806  * Heap with the request that will expire next at the top.  Contains
807  * pointers of type "struct PendingRequest*"; these will *also* be
808  * aliased from the "requests_by_peer" data structures and the
809  * "requests_by_query" table.  Note that requests from our clients
810  * don't expire and are thus NOT in the "requests_by_expiration"
811  * (or the "requests_by_peer" tables).
812  */
813 static struct GNUNET_CONTAINER_Heap *requests_by_expiration_heap;
814
815 /**
816  * Handle for reporting statistics.
817  */
818 static struct GNUNET_STATISTICS_Handle *stats;
819
820 /**
821  * Linked list of clients we are currently processing requests for.
822  */
823 static struct ClientList *client_list;
824
825 /**
826  * Pointer to handle to the core service (points to NULL until we've
827  * connected to it).
828  */
829 static struct GNUNET_CORE_Handle *core;
830
831 /**
832  * Head of linked list of blocks that can be migrated.
833  */
834 static struct MigrationReadyBlock *mig_head;
835
836 /**
837  * Tail of linked list of blocks that can be migrated.
838  */
839 static struct MigrationReadyBlock *mig_tail;
840
841 /**
842  * Request to datastore for migration (or NULL).
843  */
844 static struct GNUNET_DATASTORE_QueueEntry *mig_qe;
845
846 /**
847  * Request to datastore for DHT PUTs (or NULL).
848  */
849 static struct GNUNET_DATASTORE_QueueEntry *dht_qe;
850
851 /**
852  * Type we will request for the next DHT PUT round from the datastore.
853  */
854 static enum GNUNET_BLOCK_Type dht_put_type = GNUNET_BLOCK_TYPE_FS_KBLOCK;
855
856 /**
857  * Where do we store trust information?
858  */
859 static char *trustDirectory;
860
861 /**
862  * ID of task that collects blocks for migration.
863  */
864 static GNUNET_SCHEDULER_TaskIdentifier mig_task;
865
866 /**
867  * ID of task that collects blocks for DHT PUTs.
868  */
869 static GNUNET_SCHEDULER_TaskIdentifier dht_task;
870
871 /**
872  * What is the maximum frequency at which we are allowed to
873  * poll the datastore for migration content?
874  */
875 static struct GNUNET_TIME_Relative min_migration_delay;
876
877 /**
878  * Handle for DHT operations.
879  */
880 static struct GNUNET_DHT_Handle *dht_handle;
881
882 /**
883  * Size of the doubly-linked list of migration blocks.
884  */
885 static unsigned int mig_size;
886
887 /**
888  * Are we allowed to migrate content to this peer.
889  */
890 static int active_to_migration;
891
892 /**
893  * Are we allowed to push out content from this peer.
894  */
895 static int active_from_migration;
896
897 /**
898  * How many entires with zero anonymity do we currently estimate
899  * to have in the database?
900  */
901 static unsigned int zero_anonymity_count_estimate;
902
903 /**
904  * Typical priorities we're seeing from other peers right now.  Since
905  * most priorities will be zero, this value is the weighted average of
906  * non-zero priorities seen "recently".  In order to ensure that new
907  * values do not dramatically change the ratio, values are first
908  * "capped" to a reasonable range (+N of the current value) and then
909  * averaged into the existing value by a ratio of 1:N.  Hence
910  * receiving the largest possible priority can still only raise our
911  * "current_priorities" by at most 1.
912  */
913 static double current_priorities;
914
915 /**
916  * Datastore 'GET' load tracking.
917  */
918 static struct GNUNET_LOAD_Value *datastore_get_load;
919
920 /**
921  * Datastore 'PUT' load tracking.
922  */
923 static struct GNUNET_LOAD_Value *datastore_put_load;
924
925 /**
926  * How long do requests typically stay in the routing table?
927  */
928 static struct GNUNET_LOAD_Value *rt_entry_lifetime;
929
930 /**
931  * We've just now completed a datastore request.  Update our
932  * datastore load calculations.
933  *
934  * @param start time when the datastore request was issued
935  */
936 static void
937 update_datastore_delays (struct GNUNET_TIME_Absolute start)
938 {
939   struct GNUNET_TIME_Relative delay;
940
941   delay = GNUNET_TIME_absolute_get_duration (start);
942   GNUNET_LOAD_update (datastore_get_load,
943                       delay.rel_value);
944 }
945
946
947 /**
948  * Get the filename under which we would store the GNUNET_HELLO_Message
949  * for the given host and protocol.
950  * @return filename of the form DIRECTORY/HOSTID
951  */
952 static char *
953 get_trust_filename (const struct GNUNET_PeerIdentity *id)
954 {
955   struct GNUNET_CRYPTO_HashAsciiEncoded fil;
956   char *fn;
957
958   GNUNET_CRYPTO_hash_to_enc (&id->hashPubKey, &fil);
959   GNUNET_asprintf (&fn, "%s%s%s", trustDirectory, DIR_SEPARATOR_STR, &fil);
960   return fn;
961 }
962
963
964
965 /**
966  * Transmit messages by copying it to the target buffer
967  * "buf".  "buf" will be NULL and "size" zero if the socket was closed
968  * for writing in the meantime.  In that case, do nothing
969  * (the disconnect or shutdown handler will take care of the rest).
970  * If we were able to transmit messages and there are still more
971  * pending, ask core again for further calls to this function.
972  *
973  * @param cls closure, pointer to the 'struct ConnectedPeer*'
974  * @param size number of bytes available in buf
975  * @param buf where the callee should write the message
976  * @return number of bytes written to buf
977  */
978 static size_t
979 transmit_to_peer (void *cls,
980                   size_t size, void *buf);
981
982
983 /* ******************* clean up functions ************************ */
984
985 /**
986  * Delete the given migration block.
987  *
988  * @param mb block to delete
989  */
990 static void
991 delete_migration_block (struct MigrationReadyBlock *mb)
992 {
993   GNUNET_CONTAINER_DLL_remove (mig_head,
994                                mig_tail,
995                                mb);
996   GNUNET_PEER_decrement_rcs (mb->target_list,
997                              MIGRATION_LIST_SIZE);
998   mig_size--;
999   GNUNET_free (mb);
1000 }
1001
1002
1003 /**
1004  * Compare the distance of two peers to a key.
1005  *
1006  * @param key key
1007  * @param p1 first peer
1008  * @param p2 second peer
1009  * @return GNUNET_YES if P1 is closer to key than P2
1010  */
1011 static int
1012 is_closer (const GNUNET_HashCode *key,
1013            const struct GNUNET_PeerIdentity *p1,
1014            const struct GNUNET_PeerIdentity *p2)
1015 {
1016   return GNUNET_CRYPTO_hash_xorcmp (&p1->hashPubKey,
1017                                     &p2->hashPubKey,
1018                                     key);
1019 }
1020
1021
1022 /**
1023  * Consider migrating content to a given peer.
1024  *
1025  * @param cls 'struct MigrationReadyBlock*' to select
1026  *            targets for (or NULL for none)
1027  * @param key ID of the peer 
1028  * @param value 'struct ConnectedPeer' of the peer
1029  * @return GNUNET_YES (always continue iteration)
1030  */
1031 static int
1032 consider_migration (void *cls,
1033                     const GNUNET_HashCode *key,
1034                     void *value)
1035 {
1036   struct MigrationReadyBlock *mb = cls;
1037   struct ConnectedPeer *cp = value;
1038   struct MigrationReadyBlock *pos;
1039   struct GNUNET_PeerIdentity cppid;
1040   struct GNUNET_PeerIdentity otherpid;
1041   struct GNUNET_PeerIdentity worstpid;
1042   size_t msize;
1043   unsigned int i;
1044   unsigned int repl;
1045   
1046   /* consider 'cp' as a migration target for mb */
1047   if (GNUNET_TIME_absolute_get_remaining (cp->migration_blocked).rel_value > 0)
1048     return GNUNET_YES; /* peer has requested no migration! */
1049   if (mb != NULL)
1050     {
1051       GNUNET_PEER_resolve (cp->pid,
1052                            &cppid);
1053       repl = MIGRATION_LIST_SIZE;
1054       for (i=0;i<MIGRATION_LIST_SIZE;i++)
1055         {
1056           if (mb->target_list[i] == 0)
1057             {
1058               mb->target_list[i] = cp->pid;
1059               GNUNET_PEER_change_rc (mb->target_list[i], 1);
1060               repl = MIGRATION_LIST_SIZE;
1061               break;
1062             }
1063           GNUNET_PEER_resolve (mb->target_list[i],
1064                                &otherpid);
1065           if ( (repl == MIGRATION_LIST_SIZE) &&
1066                is_closer (&mb->query,
1067                           &cppid,
1068                           &otherpid)) 
1069             {
1070               repl = i;
1071               worstpid = otherpid;
1072             }
1073           else if ( (repl != MIGRATION_LIST_SIZE) &&
1074                     (is_closer (&mb->query,
1075                                 &worstpid,
1076                                 &otherpid) ) )
1077             {
1078               repl = i;
1079               worstpid = otherpid;
1080             }       
1081         }
1082       if (repl != MIGRATION_LIST_SIZE) 
1083         {
1084           GNUNET_PEER_change_rc (mb->target_list[repl], -1);
1085           mb->target_list[repl] = cp->pid;
1086           GNUNET_PEER_change_rc (mb->target_list[repl], 1);
1087         }
1088     }
1089
1090   /* consider scheduling transmission to cp for content migration */
1091   if (cp->cth != NULL)        
1092     return GNUNET_YES; 
1093   msize = 0;
1094   pos = mig_head;
1095   while (pos != NULL)
1096     {
1097       for (i=0;i<MIGRATION_LIST_SIZE;i++)
1098         {
1099           if (cp->pid == pos->target_list[i])
1100             {
1101               if (msize == 0)
1102                 msize = pos->size;
1103               else
1104                 msize = GNUNET_MIN (msize,
1105                                     pos->size);
1106               break;
1107             }
1108         }
1109       pos = pos->next;
1110     }
1111   if (msize == 0)
1112     return GNUNET_YES; /* no content available */
1113 #if DEBUG_FS
1114   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1115               "Trying to migrate at least %u bytes to peer `%s'\n",
1116               msize,
1117               GNUNET_h2s (key));
1118 #endif
1119   if (cp->delayed_transmission_request_task != GNUNET_SCHEDULER_NO_TASK)
1120     {
1121       GNUNET_SCHEDULER_cancel (cp->delayed_transmission_request_task);
1122       cp->delayed_transmission_request_task = GNUNET_SCHEDULER_NO_TASK;
1123     }
1124   cp->cth 
1125     = GNUNET_CORE_notify_transmit_ready (core,
1126                                          0, GNUNET_TIME_UNIT_FOREVER_REL,
1127                                          (const struct GNUNET_PeerIdentity*) key,
1128                                          msize + sizeof (struct PutMessage),
1129                                          &transmit_to_peer,
1130                                          cp);
1131   return GNUNET_YES;
1132 }
1133
1134
1135 /**
1136  * Task that is run periodically to obtain blocks for content
1137  * migration
1138  * 
1139  * @param cls unused
1140  * @param tc scheduler context (also unused)
1141  */
1142 static void
1143 gather_migration_blocks (void *cls,
1144                          const struct GNUNET_SCHEDULER_TaskContext *tc);
1145
1146
1147
1148
1149 /**
1150  * Task that is run periodically to obtain blocks for DHT PUTs.
1151  * 
1152  * @param cls type of blocks to gather
1153  * @param tc scheduler context (unused)
1154  */
1155 static void
1156 gather_dht_put_blocks (void *cls,
1157                        const struct GNUNET_SCHEDULER_TaskContext *tc);
1158
1159
1160 /**
1161  * If the migration task is not currently running, consider
1162  * (re)scheduling it with the appropriate delay.
1163  */
1164 static void
1165 consider_migration_gathering ()
1166 {
1167   struct GNUNET_TIME_Relative delay;
1168
1169   if (dsh == NULL)
1170     return;
1171   if (mig_qe != NULL)
1172     return;
1173   if (mig_task != GNUNET_SCHEDULER_NO_TASK)
1174     return;
1175   delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
1176                                          mig_size);
1177   delay = GNUNET_TIME_relative_divide (delay,
1178                                        MAX_MIGRATION_QUEUE);
1179   delay = GNUNET_TIME_relative_max (delay,
1180                                     min_migration_delay);
1181   mig_task = GNUNET_SCHEDULER_add_delayed (delay,
1182                                            &gather_migration_blocks,
1183                                            NULL);
1184 }
1185
1186
1187 /**
1188  * If the DHT PUT gathering task is not currently running, consider
1189  * (re)scheduling it with the appropriate delay.
1190  */
1191 static void
1192 consider_dht_put_gathering (void *cls)
1193 {
1194   struct GNUNET_TIME_Relative delay;
1195
1196   if (dsh == NULL)
1197     return;
1198   if (dht_qe != NULL)
1199     return;
1200   if (dht_task != GNUNET_SCHEDULER_NO_TASK)
1201     return;
1202   if (zero_anonymity_count_estimate > 0)
1203     {
1204       delay = GNUNET_TIME_relative_divide (GNUNET_DHT_DEFAULT_REPUBLISH_FREQUENCY,
1205                                            zero_anonymity_count_estimate);
1206       delay = GNUNET_TIME_relative_min (delay,
1207                                         MAX_DHT_PUT_FREQ);
1208     }
1209   else
1210     {
1211       /* if we have NO zero-anonymity content yet, wait 5 minutes for some to
1212          (hopefully) appear */
1213       delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 5);
1214     }
1215   dht_task = GNUNET_SCHEDULER_add_delayed (delay,
1216                                            &gather_dht_put_blocks,
1217                                            cls);
1218 }
1219
1220
1221 /**
1222  * Process content offered for migration.
1223  *
1224  * @param cls closure
1225  * @param key key for the content
1226  * @param size number of bytes in data
1227  * @param data content stored
1228  * @param type type of the content
1229  * @param priority priority of the content
1230  * @param anonymity anonymity-level for the content
1231  * @param expiration expiration time for the content
1232  * @param uid unique identifier for the datum;
1233  *        maybe 0 if no unique identifier is available
1234  */
1235 static void
1236 process_migration_content (void *cls,
1237                            const GNUNET_HashCode * key,
1238                            size_t size,
1239                            const void *data,
1240                            enum GNUNET_BLOCK_Type type,
1241                            uint32_t priority,
1242                            uint32_t anonymity,
1243                            struct GNUNET_TIME_Absolute
1244                            expiration, uint64_t uid)
1245 {
1246   struct MigrationReadyBlock *mb;
1247   
1248   if (key == NULL)
1249     {
1250       mig_qe = NULL;
1251       if (mig_size < MAX_MIGRATION_QUEUE)  
1252         consider_migration_gathering ();
1253       return;
1254     }
1255   if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
1256     {
1257       if (GNUNET_OK !=
1258           GNUNET_FS_handle_on_demand_block (key, size, data,
1259                                             type, priority, anonymity,
1260                                             expiration, uid, 
1261                                             &process_migration_content,
1262                                             NULL))
1263         {
1264           GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
1265         }
1266       return;
1267     }
1268 #if DEBUG_FS
1269   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1270               "Retrieved block `%s' of type %u for migration\n",
1271               GNUNET_h2s (key),
1272               type);
1273 #endif
1274   mb = GNUNET_malloc (sizeof (struct MigrationReadyBlock) + size);
1275   mb->query = *key;
1276   mb->expiration = expiration;
1277   mb->size = size;
1278   mb->type = type;
1279   memcpy (&mb[1], data, size);
1280   GNUNET_CONTAINER_DLL_insert_after (mig_head,
1281                                      mig_tail,
1282                                      mig_tail,
1283                                      mb);
1284   mig_size++;
1285   GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
1286                                          &consider_migration,
1287                                          mb);
1288   GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
1289 }
1290
1291
1292 /**
1293  * Function called upon completion of the DHT PUT operation.
1294  */
1295 static void
1296 dht_put_continuation (void *cls,
1297                       const struct GNUNET_SCHEDULER_TaskContext *tc)
1298 {
1299   GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
1300 }
1301
1302
1303 /**
1304  * Store content in DHT.
1305  *
1306  * @param cls closure
1307  * @param key key for the content
1308  * @param size number of bytes in data
1309  * @param data content stored
1310  * @param type type of the content
1311  * @param priority priority of the content
1312  * @param anonymity anonymity-level for the content
1313  * @param expiration expiration time for the content
1314  * @param uid unique identifier for the datum;
1315  *        maybe 0 if no unique identifier is available
1316  */
1317 static void
1318 process_dht_put_content (void *cls,
1319                          const GNUNET_HashCode * key,
1320                          size_t size,
1321                          const void *data,
1322                          enum GNUNET_BLOCK_Type type,
1323                          uint32_t priority,
1324                          uint32_t anonymity,
1325                          struct GNUNET_TIME_Absolute
1326                          expiration, uint64_t uid)
1327
1328   static unsigned int counter;
1329   static GNUNET_HashCode last_vhash;
1330   static GNUNET_HashCode vhash;
1331
1332   if (key == NULL)
1333     {
1334       dht_qe = NULL;
1335       consider_dht_put_gathering (cls);
1336       return;
1337     }
1338   /* slightly funky code to estimate the total number of values with zero
1339      anonymity from the maximum observed length of a monotonically increasing 
1340      sequence of hashes over the contents */
1341   GNUNET_CRYPTO_hash (data, size, &vhash);
1342   if (GNUNET_CRYPTO_hash_cmp (&vhash, &last_vhash) <= 0)
1343     {
1344       if (zero_anonymity_count_estimate > 0)
1345         zero_anonymity_count_estimate /= 2;
1346       counter = 0;
1347     }
1348   last_vhash = vhash;
1349   if (counter < 31)
1350     counter++;
1351   if (zero_anonymity_count_estimate < (1 << counter))
1352     zero_anonymity_count_estimate = (1 << counter);
1353 #if DEBUG_FS
1354   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1355               "Retrieved block `%s' of type %u for DHT PUT\n",
1356               GNUNET_h2s (key),
1357               type);
1358 #endif
1359   GNUNET_DHT_put (dht_handle,
1360                   key,
1361                   DEFAULT_PUT_REPLICATION,
1362                   GNUNET_DHT_RO_NONE,
1363                   type,
1364                   size,
1365                   data,
1366                   expiration,
1367                   GNUNET_TIME_UNIT_FOREVER_REL,
1368                   &dht_put_continuation,
1369                   cls);
1370 }
1371
1372
1373 /**
1374  * Task that is run periodically to obtain blocks for content
1375  * migration
1376  * 
1377  * @param cls unused
1378  * @param tc scheduler context (also unused)
1379  */
1380 static void
1381 gather_migration_blocks (void *cls,
1382                          const struct GNUNET_SCHEDULER_TaskContext *tc)
1383 {
1384   mig_task = GNUNET_SCHEDULER_NO_TASK;
1385   if (dsh != NULL)
1386     {
1387       mig_qe = GNUNET_DATASTORE_get_random (dsh, 0, UINT_MAX,
1388                                             GNUNET_TIME_UNIT_FOREVER_REL,
1389                                             &process_migration_content, NULL);
1390       GNUNET_assert (mig_qe != NULL);
1391     }
1392 }
1393
1394
1395 /**
1396  * Task that is run periodically to obtain blocks for DHT PUTs.
1397  * 
1398  * @param cls type of blocks to gather
1399  * @param tc scheduler context (unused)
1400  */
1401 static void
1402 gather_dht_put_blocks (void *cls,
1403                        const struct GNUNET_SCHEDULER_TaskContext *tc)
1404 {
1405   dht_task = GNUNET_SCHEDULER_NO_TASK;
1406   if (dsh != NULL)
1407     {
1408       if (dht_put_type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
1409         dht_put_type = GNUNET_BLOCK_TYPE_FS_KBLOCK;
1410       dht_qe = GNUNET_DATASTORE_get_zero_anonymity (dsh, 0, UINT_MAX,
1411                                                     GNUNET_TIME_UNIT_FOREVER_REL,
1412                                                     dht_put_type++,
1413                                                     &process_dht_put_content, NULL);
1414       GNUNET_assert (dht_qe != NULL);
1415     }
1416 }
1417
1418
1419 /**
1420  * We're done with a particular message list entry.
1421  * Free all associated resources.
1422  * 
1423  * @param pml entry to destroy
1424  */
1425 static void
1426 destroy_pending_message_list_entry (struct PendingMessageList *pml)
1427 {
1428   GNUNET_CONTAINER_DLL_remove (pml->req->pending_head,
1429                                pml->req->pending_tail,
1430                                pml);
1431   GNUNET_CONTAINER_DLL_remove (pml->target->pending_messages_head,
1432                                pml->target->pending_messages_tail,
1433                                pml->pm);
1434   pml->target->pending_requests--;
1435   GNUNET_free (pml->pm);
1436   GNUNET_free (pml);
1437 }
1438
1439
1440 /**
1441  * Destroy the given pending message (and call the respective
1442  * continuation).
1443  *
1444  * @param pm message to destroy
1445  * @param tpid id of peer that the message was delivered to, or 0 for none
1446  */
1447 static void
1448 destroy_pending_message (struct PendingMessage *pm,
1449                          GNUNET_PEER_Id tpid)
1450 {
1451   struct PendingMessageList *pml = pm->pml;
1452   TransmissionContinuation cont;
1453   void *cont_cls;
1454
1455   cont = pm->cont;
1456   cont_cls = pm->cont_cls;
1457   if (pml != NULL)
1458     {
1459       GNUNET_assert (pml->pm == pm);
1460       GNUNET_assert ( (tpid == 0) || (tpid == pml->target->pid) );
1461       destroy_pending_message_list_entry (pml);
1462     }
1463   else
1464     {
1465       GNUNET_free (pm);
1466     }
1467   if (cont != NULL)
1468     cont (cont_cls, tpid);  
1469 }
1470
1471
1472 /**
1473  * We're done processing a particular request.
1474  * Free all associated resources.
1475  *
1476  * @param pr request to destroy
1477  */
1478 static void
1479 destroy_pending_request (struct PendingRequest *pr)
1480 {
1481   struct GNUNET_PeerIdentity pid;
1482   unsigned int i;
1483
1484   if (pr->hnode != NULL)
1485     {
1486       GNUNET_CONTAINER_heap_remove_node (requests_by_expiration_heap,
1487                                          pr->hnode);
1488       pr->hnode = NULL;
1489     }
1490   if (NULL == pr->client_request_list)
1491     {
1492       GNUNET_STATISTICS_update (stats,
1493                                 gettext_noop ("# P2P searches active"),
1494                                 -1,
1495                                 GNUNET_NO);
1496     }
1497   else
1498     {
1499       GNUNET_STATISTICS_update (stats,
1500                                 gettext_noop ("# client searches active"),
1501                                 -1,
1502                                 GNUNET_NO);
1503     }
1504   if (GNUNET_YES == 
1505       GNUNET_CONTAINER_multihashmap_remove (query_request_map,
1506                                             &pr->query,
1507                                             pr))
1508     {
1509       GNUNET_LOAD_update (rt_entry_lifetime,
1510                           GNUNET_TIME_absolute_get_duration (pr->start_time).rel_value);
1511     }
1512   if (pr->qe != NULL)
1513      {
1514       GNUNET_DATASTORE_cancel (pr->qe);
1515       pr->qe = NULL;
1516     }
1517   if (pr->dht_get != NULL)
1518     {
1519       GNUNET_DHT_get_stop (pr->dht_get);
1520       pr->dht_get = NULL;
1521     }
1522   if (pr->client_request_list != NULL)
1523     {
1524       GNUNET_CONTAINER_DLL_remove (pr->client_request_list->client_list->rl_head,
1525                                    pr->client_request_list->client_list->rl_tail,
1526                                    pr->client_request_list);
1527       GNUNET_free (pr->client_request_list);
1528       pr->client_request_list = NULL;
1529     }
1530   if (pr->cp != NULL)
1531     {
1532       GNUNET_PEER_resolve (pr->cp->pid,
1533                            &pid);
1534       (void) GNUNET_CONTAINER_multihashmap_remove (peer_request_map,
1535                                                    &pid.hashPubKey,
1536                                                    pr);
1537       pr->cp = NULL;
1538     }
1539   if (pr->bf != NULL)
1540     {
1541       GNUNET_CONTAINER_bloomfilter_free (pr->bf);                                        
1542       pr->bf = NULL;
1543     }
1544   if (pr->pirc != NULL)
1545     {
1546       GNUNET_CORE_peer_change_preference_cancel (pr->pirc->irc);
1547       pr->pirc->irc = NULL;
1548       pr->pirc = NULL;
1549     }
1550   if (pr->replies_seen != NULL)
1551     {
1552       GNUNET_free (pr->replies_seen);
1553       pr->replies_seen = NULL;
1554     }
1555   if (pr->task != GNUNET_SCHEDULER_NO_TASK)
1556     {
1557       GNUNET_SCHEDULER_cancel (pr->task);
1558       pr->task = GNUNET_SCHEDULER_NO_TASK;
1559     }
1560   while (NULL != pr->pending_head)    
1561     destroy_pending_message_list_entry (pr->pending_head);
1562   GNUNET_PEER_change_rc (pr->target_pid, -1);
1563   if (pr->used_targets != NULL)
1564     {
1565       for (i=0;i<pr->used_targets_off;i++)
1566         GNUNET_PEER_change_rc (pr->used_targets[i].pid, -1);
1567       GNUNET_free (pr->used_targets);
1568       pr->used_targets_off = 0;
1569       pr->used_targets_size = 0;
1570       pr->used_targets = NULL;
1571     }
1572   GNUNET_free (pr);
1573 }
1574
1575
1576 /**
1577  * Find latency information in 'atsi'.
1578  *
1579  * @param atsi performance data
1580  * @return connection latency
1581  */
1582 static struct GNUNET_TIME_Relative
1583 get_latency (const struct GNUNET_TRANSPORT_ATS_Information *atsi)
1584 {
1585   /* FIXME: extract latency data from 'atsi' */
1586   return GNUNET_TIME_UNIT_SECONDS;
1587 }
1588
1589
1590 /**
1591  * Method called whenever a given peer connects.
1592  *
1593  * @param cls closure, not used
1594  * @param peer peer identity this notification is about
1595  * @param atsi performance information
1596  */
1597 static void 
1598 peer_connect_handler (void *cls,
1599                       const struct
1600                       GNUNET_PeerIdentity * peer,
1601                       const struct GNUNET_TRANSPORT_ATS_Information *atsi)
1602 {
1603   struct ConnectedPeer *cp;
1604   struct MigrationReadyBlock *pos;
1605   char *fn;
1606   uint32_t trust;
1607   struct GNUNET_TIME_Relative latency;
1608
1609   latency = get_latency (atsi);
1610   cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
1611                                           &peer->hashPubKey);
1612   if (NULL != cp)
1613     {
1614       GNUNET_break (0);
1615       return;
1616     }
1617   cp = GNUNET_malloc (sizeof (struct ConnectedPeer));
1618   cp->transmission_delay = GNUNET_LOAD_value_init (latency);
1619   cp->pid = GNUNET_PEER_intern (peer);
1620
1621   fn = get_trust_filename (peer);
1622   if ((GNUNET_DISK_file_test (fn) == GNUNET_YES) &&
1623       (sizeof (trust) == GNUNET_DISK_fn_read (fn, &trust, sizeof (trust))))
1624     cp->disk_trust = cp->trust = ntohl (trust);
1625   GNUNET_free (fn);
1626
1627   GNUNET_break (GNUNET_OK ==
1628                 GNUNET_CONTAINER_multihashmap_put (connected_peers,
1629                                                    &peer->hashPubKey,
1630                                                    cp,
1631                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
1632
1633   pos = mig_head;
1634   while (NULL != pos)
1635     {
1636       (void) consider_migration (pos, &peer->hashPubKey, cp);
1637       pos = pos->next;
1638     }
1639 }
1640
1641
1642 /**
1643  * Method called whenever a given peer has a status change.
1644  *
1645  * @param cls closure
1646  * @param peer peer identity this notification is about
1647  * @param bandwidth_in available amount of inbound bandwidth
1648  * @param bandwidth_out available amount of outbound bandwidth
1649  * @param timeout absolute time when this peer will time out
1650  *        unless we see some further activity from it
1651  * @param atsi status information
1652  */
1653 static void
1654 peer_status_handler (void *cls,
1655                      const struct
1656                      GNUNET_PeerIdentity * peer,
1657                      struct GNUNET_BANDWIDTH_Value32NBO bandwidth_in,
1658                      struct GNUNET_BANDWIDTH_Value32NBO bandwidth_out,
1659                      struct GNUNET_TIME_Absolute timeout,
1660                      const struct GNUNET_TRANSPORT_ATS_Information *atsi)
1661 {
1662   struct ConnectedPeer *cp;
1663   struct GNUNET_TIME_Relative latency;
1664
1665   latency = get_latency (atsi);
1666   cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
1667                                           &peer->hashPubKey);
1668   if (cp == NULL)
1669     {
1670       GNUNET_break (0);
1671       return;
1672     }
1673   GNUNET_LOAD_value_set_decline (cp->transmission_delay,
1674                                  latency);  
1675 }
1676
1677
1678
1679 /**
1680  * Increase the host credit by a value.
1681  *
1682  * @param host which peer to change the trust value on
1683  * @param value is the int value by which the
1684  *  host credit is to be increased or decreased
1685  * @returns the actual change in trust (positive or negative)
1686  */
1687 static int
1688 change_host_trust (struct ConnectedPeer *host, int value)
1689 {
1690   if (value == 0)
1691     return 0;
1692   GNUNET_assert (host != NULL);
1693   if (value > 0)
1694     {
1695       if (host->trust + value < host->trust)
1696         {
1697           value = UINT32_MAX - host->trust;
1698           host->trust = UINT32_MAX;
1699         }
1700       else
1701         host->trust += value;
1702     }
1703   else
1704     {
1705       if (host->trust < -value)
1706         {
1707           value = -host->trust;
1708           host->trust = 0;
1709         }
1710       else
1711         host->trust += value;
1712     }
1713   return value;
1714 }
1715
1716
1717 /**
1718  * Write host-trust information to a file - flush the buffer entry!
1719  */
1720 static int
1721 flush_trust (void *cls,
1722              const GNUNET_HashCode *key,
1723              void *value)
1724 {
1725   struct ConnectedPeer *host = value;
1726   char *fn;
1727   uint32_t trust;
1728   struct GNUNET_PeerIdentity pid;
1729
1730   if (host->trust == host->disk_trust)
1731     return GNUNET_OK;                     /* unchanged */
1732   GNUNET_PEER_resolve (host->pid,
1733                        &pid);
1734   fn = get_trust_filename (&pid);
1735   if (host->trust == 0)
1736     {
1737       if ((0 != UNLINK (fn)) && (errno != ENOENT))
1738         GNUNET_log_strerror_file (GNUNET_ERROR_TYPE_WARNING |
1739                                   GNUNET_ERROR_TYPE_BULK, "unlink", fn);
1740     }
1741   else
1742     {
1743       trust = htonl (host->trust);
1744       if (sizeof(uint32_t) == GNUNET_DISK_fn_write (fn, &trust, 
1745                                                     sizeof(uint32_t),
1746                                                     GNUNET_DISK_PERM_USER_READ | GNUNET_DISK_PERM_USER_WRITE
1747                                                     | GNUNET_DISK_PERM_GROUP_READ | GNUNET_DISK_PERM_OTHER_READ))
1748         host->disk_trust = host->trust;
1749     }
1750   GNUNET_free (fn);
1751   return GNUNET_OK;
1752 }
1753
1754 /**
1755  * Call this method periodically to scan data/hosts for new hosts.
1756  */
1757 static void
1758 cron_flush_trust (void *cls,
1759                   const struct GNUNET_SCHEDULER_TaskContext *tc)
1760 {
1761
1762   if (NULL == connected_peers)
1763     return;
1764   GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
1765                                          &flush_trust,
1766                                          NULL);
1767   if (NULL == tc)
1768     return;
1769   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
1770     return;
1771   GNUNET_SCHEDULER_add_delayed (TRUST_FLUSH_FREQ, &cron_flush_trust, NULL);
1772 }
1773
1774
1775 /**
1776  * Free (each) request made by the peer.
1777  *
1778  * @param cls closure, points to peer that the request belongs to
1779  * @param key current key code
1780  * @param value value in the hash map
1781  * @return GNUNET_YES (we should continue to iterate)
1782  */
1783 static int
1784 destroy_request (void *cls,
1785                  const GNUNET_HashCode * key,
1786                  void *value)
1787 {
1788   const struct GNUNET_PeerIdentity * peer = cls;
1789   struct PendingRequest *pr = value;
1790   
1791   GNUNET_break (GNUNET_YES ==
1792                 GNUNET_CONTAINER_multihashmap_remove (peer_request_map,
1793                                                       &peer->hashPubKey,
1794                                                       pr));
1795   destroy_pending_request (pr);
1796   return GNUNET_YES;
1797 }
1798
1799
1800 /**
1801  * Method called whenever a peer disconnects.
1802  *
1803  * @param cls closure, not used
1804  * @param peer peer identity this notification is about
1805  */
1806 static void
1807 peer_disconnect_handler (void *cls,
1808                          const struct
1809                          GNUNET_PeerIdentity * peer)
1810 {
1811   struct ConnectedPeer *cp;
1812   struct PendingMessage *pm;
1813   unsigned int i;
1814   struct MigrationReadyBlock *pos;
1815   struct MigrationReadyBlock *next;
1816
1817   GNUNET_CONTAINER_multihashmap_get_multiple (peer_request_map,
1818                                               &peer->hashPubKey,
1819                                               &destroy_request,
1820                                               (void*) peer);
1821   cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
1822                                           &peer->hashPubKey);
1823   if (cp == NULL)
1824     return;
1825   for (i=0;i<CS2P_SUCCESS_LIST_SIZE;i++)
1826     {
1827       if (NULL != cp->last_client_replies[i])
1828         {
1829           GNUNET_SERVER_client_drop (cp->last_client_replies[i]);
1830           cp->last_client_replies[i] = NULL;
1831         }
1832     }
1833   GNUNET_break (GNUNET_YES ==
1834                 GNUNET_CONTAINER_multihashmap_remove (connected_peers,
1835                                                       &peer->hashPubKey,
1836                                                       cp));
1837   if (cp->irc != NULL)
1838     {
1839       GNUNET_CORE_peer_change_preference_cancel (cp->irc);
1840       cp->irc = NULL;
1841       cp->pr->pirc = NULL;
1842       cp->pr = NULL;
1843     }
1844
1845   /* remove this peer from migration considerations; schedule
1846      alternatives */
1847   next = mig_head;
1848   while (NULL != (pos = next))
1849     {
1850       next = pos->next;
1851       for (i=0;i<MIGRATION_LIST_SIZE;i++)
1852         {
1853           if (pos->target_list[i] == cp->pid)
1854             {
1855               GNUNET_PEER_change_rc (pos->target_list[i], -1);
1856               pos->target_list[i] = 0;
1857             }
1858          }
1859       if (pos->used_targets >= GNUNET_CONTAINER_multihashmap_size (connected_peers))
1860         {
1861           delete_migration_block (pos);
1862           consider_migration_gathering ();
1863           continue;
1864         }
1865       GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
1866                                              &consider_migration,
1867                                              pos);
1868     }
1869   GNUNET_PEER_change_rc (cp->pid, -1);
1870   GNUNET_PEER_decrement_rcs (cp->last_p2p_replies, P2P_SUCCESS_LIST_SIZE);
1871   if (NULL != cp->cth)
1872     {
1873       GNUNET_CORE_notify_transmit_ready_cancel (cp->cth);
1874       cp->cth = NULL;
1875     }
1876   if (cp->delayed_transmission_request_task != GNUNET_SCHEDULER_NO_TASK)
1877     {
1878       GNUNET_SCHEDULER_cancel (cp->delayed_transmission_request_task);
1879       cp->delayed_transmission_request_task = GNUNET_SCHEDULER_NO_TASK;
1880     }
1881   while (NULL != (pm = cp->pending_messages_head))
1882     destroy_pending_message (pm, 0 /* delivery failed */);
1883   GNUNET_LOAD_value_free (cp->transmission_delay);
1884   GNUNET_break (0 == cp->pending_requests);
1885   GNUNET_free (cp);
1886 }
1887
1888
1889 /**
1890  * Iterator over hash map entries that removes all occurences
1891  * of the given 'client' from the 'last_client_replies' of the
1892  * given connected peer.
1893  *
1894  * @param cls closure, the 'struct GNUNET_SERVER_Client*' to remove
1895  * @param key current key code (unused)
1896  * @param value value in the hash map (the 'struct ConnectedPeer*' to change)
1897  * @return GNUNET_YES (we should continue to iterate)
1898  */
1899 static int
1900 remove_client_from_last_client_replies (void *cls,
1901                                         const GNUNET_HashCode * key,
1902                                         void *value)
1903 {
1904   struct GNUNET_SERVER_Client *client = cls;
1905   struct ConnectedPeer *cp = value;
1906   unsigned int i;
1907
1908   for (i=0;i<CS2P_SUCCESS_LIST_SIZE;i++)
1909     {
1910       if (cp->last_client_replies[i] == client)
1911         {
1912           GNUNET_SERVER_client_drop (cp->last_client_replies[i]);
1913           cp->last_client_replies[i] = NULL;
1914         }
1915     }  
1916   return GNUNET_YES;
1917 }
1918
1919
1920 /**
1921  * A client disconnected.  Remove all of its pending queries.
1922  *
1923  * @param cls closure, NULL
1924  * @param client identification of the client
1925  */
1926 static void
1927 handle_client_disconnect (void *cls,
1928                           struct GNUNET_SERVER_Client
1929                           * client)
1930 {
1931   struct ClientList *pos;
1932   struct ClientList *prev;
1933   struct ClientRequestList *rcl;
1934   struct ClientResponseMessage *creply;
1935
1936   if (client == NULL)
1937     return;
1938   prev = NULL;
1939   pos = client_list;
1940   while ( (NULL != pos) &&
1941           (pos->client != client) )
1942     {
1943       prev = pos;
1944       pos = pos->next;
1945     }
1946   if (pos == NULL)
1947     return; /* no requests pending for this client */
1948   while (NULL != (rcl = pos->rl_head))
1949     {
1950       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1951                   "Destroying pending request `%s' on disconnect\n",
1952                   GNUNET_h2s (&rcl->req->query));
1953       destroy_pending_request (rcl->req);
1954     }
1955   if (prev == NULL)
1956     client_list = pos->next;
1957   else
1958     prev->next = pos->next;
1959   if (pos->th != NULL)
1960     {
1961       GNUNET_CONNECTION_notify_transmit_ready_cancel (pos->th);
1962       pos->th = NULL;
1963     }
1964   while (NULL != (creply = pos->res_head))
1965     {
1966       GNUNET_CONTAINER_DLL_remove (pos->res_head,
1967                                    pos->res_tail,
1968                                    creply);
1969       GNUNET_free (creply);
1970     }    
1971   GNUNET_SERVER_client_drop (pos->client);
1972   GNUNET_free (pos);
1973   GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
1974                                          &remove_client_from_last_client_replies,
1975                                          client);
1976 }
1977
1978
1979 /**
1980  * Iterator to free peer entries.
1981  *
1982  * @param cls closure, unused
1983  * @param key current key code
1984  * @param value value in the hash map (peer entry)
1985  * @return GNUNET_YES (we should continue to iterate)
1986  */
1987 static int 
1988 clean_peer (void *cls,
1989             const GNUNET_HashCode * key,
1990             void *value)
1991 {
1992   peer_disconnect_handler (NULL, (const struct GNUNET_PeerIdentity*) key);
1993   return GNUNET_YES;
1994 }
1995
1996
1997 /**
1998  * Task run during shutdown.
1999  *
2000  * @param cls unused
2001  * @param tc unused
2002  */
2003 static void
2004 shutdown_task (void *cls,
2005                const struct GNUNET_SCHEDULER_TaskContext *tc)
2006 {
2007   if (mig_qe != NULL)
2008     {
2009       GNUNET_DATASTORE_cancel (mig_qe);
2010       mig_qe = NULL;
2011     }
2012   if (dht_qe != NULL)
2013     {
2014       GNUNET_DATASTORE_cancel (dht_qe);
2015       dht_qe = NULL;
2016     }
2017   if (GNUNET_SCHEDULER_NO_TASK != mig_task)
2018     {
2019       GNUNET_SCHEDULER_cancel (mig_task);
2020       mig_task = GNUNET_SCHEDULER_NO_TASK;
2021     }
2022   if (GNUNET_SCHEDULER_NO_TASK != dht_task)
2023     {
2024       GNUNET_SCHEDULER_cancel (dht_task);
2025       dht_task = GNUNET_SCHEDULER_NO_TASK;
2026     }
2027   while (client_list != NULL)
2028     handle_client_disconnect (NULL,
2029                               client_list->client);
2030   cron_flush_trust (NULL, NULL);
2031   GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
2032                                          &clean_peer,
2033                                          NULL);
2034   GNUNET_break (0 == GNUNET_CONTAINER_heap_get_size (requests_by_expiration_heap));
2035   GNUNET_CONTAINER_heap_destroy (requests_by_expiration_heap);
2036   requests_by_expiration_heap = 0;
2037   GNUNET_CONTAINER_multihashmap_destroy (connected_peers);
2038   connected_peers = NULL;
2039   GNUNET_break (0 == GNUNET_CONTAINER_multihashmap_size (query_request_map));
2040   GNUNET_CONTAINER_multihashmap_destroy (query_request_map);
2041   query_request_map = NULL;
2042   GNUNET_LOAD_value_free (rt_entry_lifetime);
2043   rt_entry_lifetime = NULL;
2044   GNUNET_break (0 == GNUNET_CONTAINER_multihashmap_size (peer_request_map));
2045   GNUNET_CONTAINER_multihashmap_destroy (peer_request_map);
2046   peer_request_map = NULL;
2047   GNUNET_assert (NULL != core);
2048   GNUNET_CORE_disconnect (core);
2049   core = NULL;
2050   if (stats != NULL)
2051     {
2052       GNUNET_STATISTICS_destroy (stats, GNUNET_NO);
2053       stats = NULL;
2054     }
2055   if (dsh != NULL)
2056     {
2057       GNUNET_DATASTORE_disconnect (dsh,
2058                                    GNUNET_NO);
2059       dsh = NULL;
2060     }
2061   while (mig_head != NULL)
2062     delete_migration_block (mig_head);
2063   GNUNET_assert (0 == mig_size);
2064   GNUNET_DHT_disconnect (dht_handle);
2065   dht_handle = NULL;
2066   GNUNET_LOAD_value_free (datastore_get_load);
2067   datastore_get_load = NULL;
2068   GNUNET_LOAD_value_free (datastore_put_load);
2069   datastore_put_load = NULL;
2070   GNUNET_BLOCK_context_destroy (block_ctx);
2071   block_ctx = NULL;
2072   GNUNET_CONFIGURATION_destroy (block_cfg);
2073   block_cfg = NULL;
2074   cfg = NULL;  
2075   GNUNET_free_non_null (trustDirectory);
2076   trustDirectory = NULL;
2077 }
2078
2079
2080 /* ******************* Utility functions  ******************** */
2081
2082
2083 /**
2084  * We've had to delay a request for transmission to core, but now
2085  * we should be ready.  Run it.
2086  *
2087  * @param cls the 'struct ConnectedPeer' for which a request was delayed
2088  * @param tc task context (unused)
2089  */
2090 static void
2091 delayed_transmission_request (void *cls,
2092                               const struct GNUNET_SCHEDULER_TaskContext *tc)
2093 {
2094   struct ConnectedPeer *cp = cls;
2095   struct GNUNET_PeerIdentity pid;
2096   struct PendingMessage *pm;
2097
2098   pm = cp->pending_messages_head;
2099   cp->delayed_transmission_request_task = GNUNET_SCHEDULER_NO_TASK;
2100   GNUNET_assert (cp->cth == NULL);
2101   if (pm == NULL)
2102     return;
2103   GNUNET_PEER_resolve (cp->pid,
2104                        &pid);
2105   cp->last_transmission_request_start = GNUNET_TIME_absolute_get ();
2106   cp->cth = GNUNET_CORE_notify_transmit_ready (core,
2107                                                pm->priority,
2108                                                GNUNET_CONSTANTS_SERVICE_TIMEOUT,
2109                                                &pid,
2110                                                pm->msize,
2111                                                &transmit_to_peer,
2112                                                cp);
2113 }
2114
2115
2116 /**
2117  * Transmit messages by copying it to the target buffer
2118  * "buf".  "buf" will be NULL and "size" zero if the socket was closed
2119  * for writing in the meantime.  In that case, do nothing
2120  * (the disconnect or shutdown handler will take care of the rest).
2121  * If we were able to transmit messages and there are still more
2122  * pending, ask core again for further calls to this function.
2123  *
2124  * @param cls closure, pointer to the 'struct ConnectedPeer*'
2125  * @param size number of bytes available in buf
2126  * @param buf where the callee should write the message
2127  * @return number of bytes written to buf
2128  */
2129 static size_t
2130 transmit_to_peer (void *cls,
2131                   size_t size, void *buf)
2132 {
2133   struct ConnectedPeer *cp = cls;
2134   char *cbuf = buf;
2135   struct PendingMessage *pm;
2136   struct PendingMessage *next_pm;
2137   struct GNUNET_TIME_Absolute now;
2138   struct GNUNET_TIME_Relative min_delay;
2139   struct MigrationReadyBlock *mb;
2140   struct MigrationReadyBlock *next;
2141   struct PutMessage migm;
2142   size_t msize;
2143   unsigned int i;
2144   struct GNUNET_PeerIdentity pid;
2145  
2146   cp->cth = NULL;
2147   if (NULL == buf)
2148     {
2149 #if DEBUG_FS
2150       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2151                   "Dropping message, core too busy.\n");
2152 #endif
2153       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2154                   "Dropping message, core too busy.\n");
2155       GNUNET_LOAD_update (cp->transmission_delay,
2156                           UINT64_MAX);
2157       
2158       if (NULL != (pm = cp->pending_messages_head))
2159         {
2160           GNUNET_CONTAINER_DLL_remove (cp->pending_messages_head,
2161                                        cp->pending_messages_tail,
2162                                        pm);
2163           cp->pending_requests--;    
2164           destroy_pending_message (pm, 0);
2165         }
2166       if (NULL != (pm = cp->pending_messages_head))
2167         {
2168           GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == cp->delayed_transmission_request_task);
2169           min_delay = GNUNET_TIME_absolute_get_remaining (pm->delay_until);
2170           cp->delayed_transmission_request_task
2171             = GNUNET_SCHEDULER_add_delayed (min_delay,
2172                                             &delayed_transmission_request,
2173                                             cp);
2174         }
2175       return 0;
2176     }  
2177   GNUNET_LOAD_update (cp->transmission_delay,
2178                       GNUNET_TIME_absolute_get_duration (cp->last_transmission_request_start).rel_value);
2179   now = GNUNET_TIME_absolute_get ();
2180   msize = 0;
2181   min_delay = GNUNET_TIME_UNIT_FOREVER_REL;
2182   next_pm = cp->pending_messages_head;
2183   while ( (NULL != (pm = next_pm) ) &&
2184           (pm->msize <= size) )
2185     {
2186       next_pm = pm->next;
2187       if (pm->delay_until.abs_value > now.abs_value)
2188         {
2189           min_delay = GNUNET_TIME_relative_min (min_delay,
2190                                                 GNUNET_TIME_absolute_get_remaining (pm->delay_until));
2191           continue;
2192         }
2193       memcpy (&cbuf[msize], &pm[1], pm->msize);
2194       msize += pm->msize;
2195       size -= pm->msize;
2196       if (NULL == pm->pml)
2197         {
2198           GNUNET_CONTAINER_DLL_remove (cp->pending_messages_head,
2199                                        cp->pending_messages_tail,
2200                                        pm);
2201           cp->pending_requests--;
2202         }
2203       destroy_pending_message (pm, cp->pid);
2204     }
2205   if (pm != NULL)
2206     min_delay = GNUNET_TIME_UNIT_ZERO;
2207   if (NULL != cp->pending_messages_head)
2208     {     
2209       GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == cp->delayed_transmission_request_task);
2210       cp->delayed_transmission_request_task
2211         = GNUNET_SCHEDULER_add_delayed (min_delay,
2212                                         &delayed_transmission_request,
2213                                         cp);
2214     }
2215   if (pm == NULL)
2216     {      
2217       GNUNET_PEER_resolve (cp->pid,
2218                            &pid);
2219       next = mig_head;
2220       while (NULL != (mb = next))
2221         {
2222           next = mb->next;
2223           for (i=0;i<MIGRATION_LIST_SIZE;i++)
2224             {
2225               if ( (cp->pid == mb->target_list[i]) &&
2226                    (mb->size + sizeof (migm) <= size) )
2227                 {
2228                   GNUNET_PEER_change_rc (mb->target_list[i], -1);
2229                   mb->target_list[i] = 0;
2230                   mb->used_targets++;
2231                   memset (&migm, 0, sizeof (migm));
2232                   migm.header.size = htons (sizeof (migm) + mb->size);
2233                   migm.header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
2234                   migm.type = htonl (mb->type);
2235                   migm.expiration = GNUNET_TIME_absolute_hton (mb->expiration);
2236                   memcpy (&cbuf[msize], &migm, sizeof (migm));
2237                   msize += sizeof (migm);
2238                   size -= sizeof (migm);
2239                   memcpy (&cbuf[msize], &mb[1], mb->size);
2240                   msize += mb->size;
2241                   size -= mb->size;
2242 #if DEBUG_FS
2243                   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2244                               "Pushing migration block `%s' (%u bytes) to `%s'\n",
2245                               GNUNET_h2s (&mb->query),
2246                               (unsigned int) mb->size,
2247                               GNUNET_i2s (&pid));
2248 #endif    
2249                   break;
2250                 }
2251               else
2252                 {
2253 #if DEBUG_FS
2254                   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2255                               "Migration block `%s' (%u bytes) is not on migration list for peer `%s'\n",
2256                               GNUNET_h2s (&mb->query),
2257                               (unsigned int) mb->size,
2258                               GNUNET_i2s (&pid));
2259 #endif    
2260                 }
2261             }
2262           if ( (mb->used_targets >= MIGRATION_TARGET_COUNT) ||
2263                (mb->used_targets >= GNUNET_CONTAINER_multihashmap_size (connected_peers)) )
2264             {
2265               delete_migration_block (mb);
2266               consider_migration_gathering ();
2267             }
2268         }
2269       consider_migration (NULL, 
2270                           &pid.hashPubKey,
2271                           cp);
2272     }
2273 #if DEBUG_FS
2274   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2275               "Transmitting %u bytes to peer with PID %u\n",
2276               (unsigned int) msize,
2277               (unsigned int) cp->pid);
2278 #endif
2279   return msize;
2280 }
2281
2282
2283 /**
2284  * Add a message to the set of pending messages for the given peer.
2285  *
2286  * @param cp peer to send message to
2287  * @param pm message to queue
2288  * @param pr request on which behalf this message is being queued
2289  */
2290 static void
2291 add_to_pending_messages_for_peer (struct ConnectedPeer *cp,
2292                                   struct PendingMessage *pm,
2293                                   struct PendingRequest *pr)
2294 {
2295   struct PendingMessage *pos;
2296   struct PendingMessageList *pml;
2297   struct GNUNET_PeerIdentity pid;
2298
2299   GNUNET_assert (pm->next == NULL);
2300   GNUNET_assert (pm->pml == NULL);    
2301   if (pr != NULL)
2302     {
2303       pml = GNUNET_malloc (sizeof (struct PendingMessageList));
2304       pml->req = pr;
2305       pml->target = cp;
2306       pml->pm = pm;
2307       pm->pml = pml;  
2308       GNUNET_CONTAINER_DLL_insert (pr->pending_head,
2309                                    pr->pending_tail,
2310                                    pml);
2311     }
2312   pos = cp->pending_messages_head;
2313   while ( (pos != NULL) &&
2314           (pm->priority < pos->priority) )
2315     pos = pos->next;    
2316   GNUNET_CONTAINER_DLL_insert_after (cp->pending_messages_head,
2317                                      cp->pending_messages_tail,
2318                                      pos,
2319                                      pm);
2320   cp->pending_requests++;
2321   if (cp->pending_requests > MAX_QUEUE_PER_PEER)
2322     {
2323       GNUNET_STATISTICS_update (stats,
2324                                 gettext_noop ("# P2P searches discarded (queue length bound)"),
2325                                 1,
2326                                 GNUNET_NO);
2327       destroy_pending_message (cp->pending_messages_tail, 0);  
2328     }
2329   GNUNET_PEER_resolve (cp->pid, &pid);
2330   if (NULL != cp->cth)
2331     {
2332       GNUNET_CORE_notify_transmit_ready_cancel (cp->cth);
2333       cp->cth = NULL;
2334     }
2335   if (cp->delayed_transmission_request_task != GNUNET_SCHEDULER_NO_TASK)
2336     {
2337       GNUNET_SCHEDULER_cancel (cp->delayed_transmission_request_task);
2338       cp->delayed_transmission_request_task = GNUNET_SCHEDULER_NO_TASK;
2339     }
2340   /* need to schedule transmission */
2341   cp->last_transmission_request_start = GNUNET_TIME_absolute_get ();
2342   cp->cth = GNUNET_CORE_notify_transmit_ready (core,
2343                                                cp->pending_messages_head->priority,
2344                                                MAX_TRANSMIT_DELAY,
2345                                                &pid,
2346                                                cp->pending_messages_head->msize,
2347                                                &transmit_to_peer,
2348                                                cp);
2349   if (cp->cth == NULL)
2350     {
2351 #if DEBUG_FS
2352       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2353                   "Failed to schedule transmission with core!\n");
2354 #endif
2355       GNUNET_STATISTICS_update (stats,
2356                                 gettext_noop ("# CORE transmission failures"),
2357                                 1,
2358                                 GNUNET_NO);
2359     }
2360 }
2361
2362
2363 /**
2364  * Test if the DATABASE (GET) load on this peer is too high
2365  * to even consider processing the query at
2366  * all.  
2367  * 
2368  * @return GNUNET_YES if the load is too high to do anything (load high)
2369  *         GNUNET_NO to process normally (load normal)
2370  *         GNUNET_SYSERR to process for free (load low)
2371  */
2372 static int
2373 test_get_load_too_high (uint32_t priority)
2374 {
2375   double ld;
2376
2377   ld = GNUNET_LOAD_get_load (datastore_get_load);
2378   if (ld < 1)
2379     return GNUNET_SYSERR;    
2380   if (ld <= priority)    
2381     return GNUNET_NO;    
2382   return GNUNET_YES;
2383 }
2384
2385
2386
2387
2388 /**
2389  * Test if the DATABASE (PUT) load on this peer is too high
2390  * to even consider processing the query at
2391  * all.  
2392  * 
2393  * @return GNUNET_YES if the load is too high to do anything (load high)
2394  *         GNUNET_NO to process normally (load normal or low)
2395  */
2396 static int
2397 test_put_load_too_high (uint32_t priority)
2398 {
2399   double ld;
2400
2401   if (GNUNET_LOAD_get_average (datastore_put_load) < 50)
2402     return GNUNET_NO; /* very fast */
2403   ld = GNUNET_LOAD_get_load (datastore_put_load);
2404   if (ld < 2.0 * (1 + priority))
2405     return GNUNET_NO;
2406   GNUNET_STATISTICS_update (stats,
2407                             gettext_noop ("# storage requests dropped due to high load"),
2408                             1,
2409                             GNUNET_NO);
2410   return GNUNET_YES;
2411 }
2412
2413
2414 /* ******************* Pending Request Refresh Task ******************** */
2415
2416
2417
2418 /**
2419  * We use a random delay to make the timing of requests less
2420  * predictable.  This function returns such a random delay.  We add a base
2421  * delay of MAX_CORK_DELAY (1s).
2422  *
2423  * FIXME: make schedule dependent on the specifics of the request?
2424  * Or bandwidth and number of connected peers and load?
2425  *
2426  * @return random delay to use for some request, between 1s and 1000+TTL_DECREMENT ms
2427  */
2428 static struct GNUNET_TIME_Relative
2429 get_processing_delay ()
2430 {
2431   return 
2432     GNUNET_TIME_relative_add (GNUNET_CONSTANTS_MAX_CORK_DELAY,
2433                               GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
2434                                                              GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
2435                                                                                        TTL_DECREMENT)));
2436 }
2437
2438
2439 /**
2440  * We're processing a GET request from another peer and have decided
2441  * to forward it to other peers.  This function is called periodically
2442  * and should forward the request to other peers until we have all
2443  * possible replies.  If we have transmitted the *only* reply to
2444  * the initiator we should destroy the pending request.  If we have
2445  * many replies in the queue to the initiator, we should delay sending
2446  * out more queries until the reply queue has shrunk some.
2447  *
2448  * @param cls our "struct ProcessGetContext *"
2449  * @param tc unused
2450  */
2451 static void
2452 forward_request_task (void *cls,
2453                       const struct GNUNET_SCHEDULER_TaskContext *tc);
2454
2455
2456 /**
2457  * Function called after we either failed or succeeded
2458  * at transmitting a query to a peer.  
2459  *
2460  * @param cls the requests "struct PendingRequest*"
2461  * @param tpid ID of receiving peer, 0 on transmission error
2462  */
2463 static void
2464 transmit_query_continuation (void *cls,
2465                              GNUNET_PEER_Id tpid)
2466 {
2467   struct PendingRequest *pr = cls;
2468   unsigned int i;
2469
2470   GNUNET_STATISTICS_update (stats,
2471                             gettext_noop ("# queries scheduled for forwarding"),
2472                             -1,
2473                             GNUNET_NO);
2474   if (tpid == 0)   
2475     {
2476 #if DEBUG_FS
2477       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2478                   "Transmission of request failed, will try again later.\n");
2479 #endif
2480       if (pr->task == GNUNET_SCHEDULER_NO_TASK)
2481         pr->task = GNUNET_SCHEDULER_add_delayed (get_processing_delay (),
2482                                                  &forward_request_task,
2483                                                  pr); 
2484       return;    
2485     }
2486 #if DEBUG_FS
2487   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2488               "Transmitted query `%s'\n",
2489               GNUNET_h2s (&pr->query));
2490 #endif
2491   GNUNET_STATISTICS_update (stats,
2492                             gettext_noop ("# queries forwarded"),
2493                             1,
2494                             GNUNET_NO);
2495   for (i=0;i<pr->used_targets_off;i++)
2496     if (pr->used_targets[i].pid == tpid)
2497       break; /* found match! */    
2498   if (i == pr->used_targets_off)
2499     {
2500       /* need to create new entry */
2501       if (pr->used_targets_off == pr->used_targets_size)
2502         GNUNET_array_grow (pr->used_targets,
2503                            pr->used_targets_size,
2504                            pr->used_targets_size * 2 + 2);
2505       GNUNET_PEER_change_rc (tpid, 1);
2506       pr->used_targets[pr->used_targets_off].pid = tpid;
2507       pr->used_targets[pr->used_targets_off].num_requests = 0;
2508       i = pr->used_targets_off++;
2509     }
2510   pr->used_targets[i].last_request_time = GNUNET_TIME_absolute_get ();
2511   pr->used_targets[i].num_requests++;
2512   if (pr->task == GNUNET_SCHEDULER_NO_TASK)
2513     pr->task = GNUNET_SCHEDULER_add_delayed (get_processing_delay (),
2514                                              &forward_request_task,
2515                                              pr);
2516 }
2517
2518
2519 /**
2520  * How many bytes should a bloomfilter be if we have already seen
2521  * entry_count responses?  Note that BLOOMFILTER_K gives us the number
2522  * of bits set per entry.  Furthermore, we should not re-size the
2523  * filter too often (to keep it cheap).
2524  *
2525  * Since other peers will also add entries but not resize the filter,
2526  * we should generally pick a slightly larger size than what the
2527  * strict math would suggest.
2528  *
2529  * @return must be a power of two and smaller or equal to 2^15.
2530  */
2531 static size_t
2532 compute_bloomfilter_size (unsigned int entry_count)
2533 {
2534   size_t size;
2535   unsigned int ideal = (entry_count * BLOOMFILTER_K) / 4;
2536   uint16_t max = 1 << 15;
2537
2538   if (entry_count > max)
2539     return max;
2540   size = 8;
2541   while ((size < max) && (size < ideal))
2542     size *= 2;
2543   if (size > max)
2544     return max;
2545   return size;
2546 }
2547
2548
2549 /**
2550  * Recalculate our bloom filter for filtering replies.  This function
2551  * will create a new bloom filter from scratch, so it should only be
2552  * called if we have no bloomfilter at all (and hence can create a
2553  * fresh one of minimal size without problems) OR if our peer is the
2554  * initiator (in which case we may resize to larger than mimimum size).
2555  *
2556  * @param pr request for which the BF is to be recomputed
2557  */
2558 static void
2559 refresh_bloomfilter (struct PendingRequest *pr)
2560 {
2561   unsigned int i;
2562   size_t nsize;
2563   GNUNET_HashCode mhash;
2564
2565   nsize = compute_bloomfilter_size (pr->replies_seen_off);
2566   if (nsize == pr->bf_size)
2567     return; /* size not changed */
2568   if (pr->bf != NULL)
2569     GNUNET_CONTAINER_bloomfilter_free (pr->bf);
2570   pr->bf_size = nsize;
2571   pr->mingle = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, -1);
2572   pr->bf = GNUNET_CONTAINER_bloomfilter_init (NULL, 
2573                                               pr->bf_size,
2574                                               BLOOMFILTER_K);
2575   for (i=0;i<pr->replies_seen_off;i++)
2576     {
2577       GNUNET_BLOCK_mingle_hash (&pr->replies_seen[i],
2578                                 pr->mingle,
2579                                 &mhash);
2580       GNUNET_CONTAINER_bloomfilter_add (pr->bf, &mhash);
2581     }
2582 }
2583
2584
2585 /**
2586  * Function called after we've tried to reserve a certain amount of
2587  * bandwidth for a reply.  Check if we succeeded and if so send our
2588  * query.
2589  *
2590  * @param cls the requests "struct PendingRequest*"
2591  * @param peer identifies the peer
2592  * @param bpm_out set to the current bandwidth limit (sending) for this peer
2593  * @param amount set to the amount that was actually reserved or unreserved
2594  * @param preference current traffic preference for the given peer
2595  */
2596 static void
2597 target_reservation_cb (void *cls,
2598                        const struct
2599                        GNUNET_PeerIdentity * peer,
2600                        struct GNUNET_BANDWIDTH_Value32NBO bpm_out,
2601                        int amount,
2602                        uint64_t preference)
2603 {
2604   struct PendingRequest *pr = cls;
2605   struct ConnectedPeer *cp;
2606   struct PendingMessage *pm;
2607   struct GetMessage *gm;
2608   GNUNET_HashCode *ext;
2609   char *bfdata;
2610   size_t msize;
2611   unsigned int k;
2612   int no_route;
2613   uint32_t bm;
2614   unsigned int i;
2615
2616   /* (3) transmit, update ttl/priority */
2617   cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
2618                                           &peer->hashPubKey);
2619   if (cp == NULL)
2620     {
2621       /* Peer must have just left */
2622 #if DEBUG_FS
2623       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2624                   "Selected peer disconnected!\n");
2625 #endif
2626       if (pr->task == GNUNET_SCHEDULER_NO_TASK)
2627         pr->task = GNUNET_SCHEDULER_add_delayed (get_processing_delay (),
2628                                                  &forward_request_task,
2629                                                  pr);
2630       return;
2631     }
2632   cp->irc = NULL;
2633   pr->pirc = NULL;
2634   if (peer == NULL)
2635     {
2636       /* error in communication with core, try again later */
2637       if (pr->task == GNUNET_SCHEDULER_NO_TASK)
2638         pr->task = GNUNET_SCHEDULER_add_delayed (get_processing_delay (),
2639                                                  &forward_request_task,
2640                                                  pr);
2641       return;
2642     }
2643   no_route = GNUNET_NO;
2644   if (amount == 0)
2645     {
2646       if (pr->cp == NULL)
2647         {
2648 #if DEBUG_FS > 1
2649           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2650                       "Failed to reserve bandwidth for reply (got %d/%u bytes only)!\n",
2651                       amount,
2652                       DBLOCK_SIZE);
2653 #endif
2654           GNUNET_STATISTICS_update (stats,
2655                                     gettext_noop ("# reply bandwidth reservation requests failed"),
2656                                     1,
2657                                     GNUNET_NO);
2658           if (pr->task == GNUNET_SCHEDULER_NO_TASK)
2659             pr->task = GNUNET_SCHEDULER_add_delayed (get_processing_delay (),
2660                                                      &forward_request_task,
2661                                                      pr);
2662           return;  /* this target round failed */
2663         }
2664       no_route = GNUNET_YES;
2665     }
2666   
2667   GNUNET_STATISTICS_update (stats,
2668                             gettext_noop ("# queries scheduled for forwarding"),
2669                             1,
2670                             GNUNET_NO);
2671   for (i=0;i<pr->used_targets_off;i++)
2672     if (pr->used_targets[i].pid == cp->pid) 
2673       {
2674         GNUNET_STATISTICS_update (stats,
2675                                   gettext_noop ("# queries retransmitted to same target"),
2676                                   1,
2677                                   GNUNET_NO);
2678         break;
2679       } 
2680
2681   /* build message and insert message into priority queue */
2682 #if DEBUG_FS
2683   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2684               "Forwarding request `%s' to `%4s'!\n",
2685               GNUNET_h2s (&pr->query),
2686               GNUNET_i2s (peer));
2687 #endif
2688   k = 0;
2689   bm = 0;
2690   if (GNUNET_YES == no_route)
2691     {
2692       bm |= GET_MESSAGE_BIT_RETURN_TO;
2693       k++;      
2694     }
2695   if (pr->namespace != NULL)
2696     {
2697       bm |= GET_MESSAGE_BIT_SKS_NAMESPACE;
2698       k++;
2699     }
2700   if (pr->target_pid != 0)
2701     {
2702       bm |= GET_MESSAGE_BIT_TRANSMIT_TO;
2703       k++;
2704     }
2705   msize = sizeof (struct GetMessage) + pr->bf_size + k * sizeof(GNUNET_HashCode);
2706   GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE);
2707   pm = GNUNET_malloc (sizeof (struct PendingMessage) + msize);
2708   pm->msize = msize;
2709   gm = (struct GetMessage*) &pm[1];
2710   gm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_GET);
2711   gm->header.size = htons (msize);
2712   gm->type = htonl (pr->type);
2713   pr->remaining_priority /= 2;
2714   gm->priority = htonl (pr->remaining_priority);
2715   gm->ttl = htonl (pr->ttl);
2716   gm->filter_mutator = htonl(pr->mingle); 
2717   gm->hash_bitmap = htonl (bm);
2718   gm->query = pr->query;
2719   ext = (GNUNET_HashCode*) &gm[1];
2720   k = 0;
2721   if (GNUNET_YES == no_route)
2722     GNUNET_PEER_resolve (pr->cp->pid, (struct GNUNET_PeerIdentity*) &ext[k++]);
2723   if (pr->namespace != NULL)
2724     memcpy (&ext[k++], pr->namespace, sizeof (GNUNET_HashCode));
2725   if (pr->target_pid != 0)
2726     GNUNET_PEER_resolve (pr->target_pid, (struct GNUNET_PeerIdentity*) &ext[k++]);
2727   bfdata = (char *) &ext[k];
2728   if (pr->bf != NULL)
2729     GNUNET_CONTAINER_bloomfilter_get_raw_data (pr->bf,
2730                                                bfdata,
2731                                                pr->bf_size);
2732   pm->cont = &transmit_query_continuation;
2733   pm->cont_cls = pr;
2734   cp->last_request_times[(cp->last_request_times_off++) % MAX_QUEUE_PER_PEER] = GNUNET_TIME_absolute_get ();
2735   add_to_pending_messages_for_peer (cp, pm, pr);
2736 }
2737
2738
2739 /**
2740  * Closure used for "target_peer_select_cb".
2741  */
2742 struct PeerSelectionContext 
2743 {
2744   /**
2745    * The request for which we are selecting
2746    * peers.
2747    */
2748   struct PendingRequest *pr;
2749
2750   /**
2751    * Current "prime" target.
2752    */
2753   struct GNUNET_PeerIdentity target;
2754
2755   /**
2756    * How much do we like this target?
2757    */
2758   double target_score;
2759
2760   /**
2761    * Does it make sense to we re-try quickly again?
2762    */
2763   int fast_retry;
2764
2765 };
2766
2767
2768 /**
2769  * Function called for each connected peer to determine
2770  * which one(s) would make good targets for forwarding.
2771  *
2772  * @param cls closure (struct PeerSelectionContext)
2773  * @param key current key code (peer identity)
2774  * @param value value in the hash map (struct ConnectedPeer)
2775  * @return GNUNET_YES if we should continue to
2776  *         iterate,
2777  *         GNUNET_NO if not.
2778  */
2779 static int
2780 target_peer_select_cb (void *cls,
2781                        const GNUNET_HashCode * key,
2782                        void *value)
2783 {
2784   struct PeerSelectionContext *psc = cls;
2785   struct ConnectedPeer *cp = value;
2786   struct PendingRequest *pr = psc->pr;
2787   struct GNUNET_TIME_Relative delay;
2788   double score;
2789   unsigned int i;
2790   unsigned int pc;
2791
2792   /* 1) check that this peer is not the initiator */
2793   if (cp == pr->cp)     
2794     {
2795 #if DEBUG_FS
2796       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2797                   "Skipping initiator in forwarding selection\n");
2798 #endif
2799       return GNUNET_YES; /* skip */        
2800     }
2801   if (cp->irc != NULL)
2802     {
2803       psc->fast_retry = GNUNET_YES;
2804       return GNUNET_YES; /* skip: already querying core about this peer for other reasons */
2805     }
2806
2807   /* 2) check if we have already (recently) forwarded to this peer */
2808   /* 2a) this particular request */
2809   pc = 0;
2810   for (i=0;i<pr->used_targets_off;i++)
2811     if (pr->used_targets[i].pid == cp->pid) 
2812       {
2813         pc = pr->used_targets[i].num_requests;
2814         GNUNET_assert (pc > 0);
2815         /* FIXME: make re-enabling a peer independent of how often
2816            this function is called??? */
2817         if (0 != GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
2818                                            RETRY_PROBABILITY_INV * pc))
2819           {
2820 #if DEBUG_FS
2821             GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2822                         "NOT re-trying query that was previously transmitted %u times\n",
2823                         (unsigned int) pc);
2824 #endif
2825             return GNUNET_YES; /* skip */
2826           }
2827         break;
2828       }
2829 #if DEBUG_FS
2830   if (0 < pc)
2831     {
2832       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
2833                   "Re-trying query that was previously transmitted %u times to this peer\n",
2834                   (unsigned int) pc);
2835     }
2836 #endif
2837   /* 2b) many other requests to this peer */
2838   delay = GNUNET_TIME_absolute_get_duration (cp->last_request_times[cp->last_request_times_off % MAX_QUEUE_PER_PEER]);
2839   if (delay.rel_value <= cp->avg_delay.rel_value)
2840     {
2841 #if DEBUG_FS
2842       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2843                   "NOT sending query since we send %u others to this peer in the last %llums\n",
2844                   MAX_QUEUE_PER_PEER,
2845                   cp->avg_delay.rel_value);
2846 #endif
2847       return GNUNET_YES; /* skip */      
2848     }
2849
2850   /* 3) calculate how much we'd like to forward to this peer,
2851      starting with a random value that is strong enough
2852      to at least give any peer a chance sometimes 
2853      (compared to the other factors that come later) */
2854   /* 3a) count successful (recent) routes from cp for same source */
2855   if (pr->cp != NULL)
2856     {
2857       score = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
2858                                         P2P_SUCCESS_LIST_SIZE);
2859       for (i=0;i<P2P_SUCCESS_LIST_SIZE;i++)
2860         if (cp->last_p2p_replies[i] == pr->cp->pid)
2861           score += 1.0; /* likely successful based on hot path */
2862     }
2863   else
2864     {
2865       score = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
2866                                         CS2P_SUCCESS_LIST_SIZE);
2867       for (i=0;i<CS2P_SUCCESS_LIST_SIZE;i++)
2868         if (cp->last_client_replies[i] == pr->client_request_list->client_list->client)
2869           score += 1.0; /* likely successful based on hot path */
2870     }
2871   /* 3b) include latency */
2872   if (cp->avg_delay.rel_value < 4 * TTL_DECREMENT)
2873     score += 1.0; /* likely fast based on latency */
2874   /* 3c) include priorities */
2875   if (cp->avg_priority <= pr->remaining_priority / 2.0)
2876     score += 1.0; /* likely successful based on priorities */
2877   /* 3d) penalize for queue size */  
2878   score -= (2.0 * cp->pending_requests / (double) MAX_QUEUE_PER_PEER); 
2879   /* 3e) include peer proximity */
2880   score -= (2.0 * (GNUNET_CRYPTO_hash_distance_u32 (key,
2881                                                     &pr->query)) / (double) UINT32_MAX);
2882   /* 4) super-bonus for being the known target */
2883   if (pr->target_pid == cp->pid)
2884     score += 100.0;
2885   /* store best-fit in closure */
2886   score++; /* avoid zero */
2887   if (score > psc->target_score)
2888     {
2889       psc->target_score = score;
2890       psc->target.hashPubKey = *key; 
2891     }
2892 #if DEBUG_FS
2893   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2894               "Peer `%s' gets score %f for forwarding query, max is %8f\n",
2895               GNUNET_h2s (key),
2896               score,
2897               psc->target_score);
2898 #endif
2899   return GNUNET_YES;
2900 }
2901   
2902
2903 /**
2904  * The priority level imposes a bound on the maximum
2905  * value for the ttl that can be requested.
2906  *
2907  * @param ttl_in requested ttl
2908  * @param prio given priority
2909  * @return ttl_in if ttl_in is below the limit,
2910  *         otherwise the ttl-limit for the given priority
2911  */
2912 static int32_t
2913 bound_ttl (int32_t ttl_in, uint32_t prio)
2914 {
2915   unsigned long long allowed;
2916
2917   if (ttl_in <= 0)
2918     return ttl_in;
2919   allowed = ((unsigned long long) prio) * TTL_DECREMENT / 1000; 
2920   if (ttl_in > allowed)      
2921     {
2922       if (allowed >= (1 << 30))
2923         return 1 << 30;
2924       return allowed;
2925     }
2926   return ttl_in;
2927 }
2928
2929
2930 /**
2931  * Iterator called on each result obtained for a DHT
2932  * operation that expects a reply
2933  *
2934  * @param cls closure
2935  * @param exp when will this value expire
2936  * @param key key of the result
2937  * @param get_path NULL-terminated array of pointers
2938  *                 to the peers on reverse GET path (or NULL if not recorded)
2939  * @param put_path NULL-terminated array of pointers
2940  *                 to the peers on the PUT path (or NULL if not recorded)
2941  * @param type type of the result
2942  * @param size number of bytes in data
2943  * @param data pointer to the result data
2944  */
2945 static void
2946 process_dht_reply (void *cls,
2947                    struct GNUNET_TIME_Absolute exp,
2948                    const GNUNET_HashCode * key,
2949                    const struct GNUNET_PeerIdentity * const *get_path,
2950                    const struct GNUNET_PeerIdentity * const *put_path,
2951                    enum GNUNET_BLOCK_Type type,
2952                    size_t size,
2953                    const void *data);
2954
2955
2956 /**
2957  * We're processing a GET request and have decided
2958  * to forward it to other peers.  This function is called periodically
2959  * and should forward the request to other peers until we have all
2960  * possible replies.  If we have transmitted the *only* reply to
2961  * the initiator we should destroy the pending request.  If we have
2962  * many replies in the queue to the initiator, we should delay sending
2963  * out more queries until the reply queue has shrunk some.
2964  *
2965  * @param cls our "struct ProcessGetContext *"
2966  * @param tc unused
2967  */
2968 static void
2969 forward_request_task (void *cls,
2970                      const struct GNUNET_SCHEDULER_TaskContext *tc)
2971 {
2972   struct PendingRequest *pr = cls;
2973   struct PeerSelectionContext psc;
2974   struct ConnectedPeer *cp; 
2975   struct GNUNET_TIME_Relative delay;
2976
2977   pr->task = GNUNET_SCHEDULER_NO_TASK;
2978   if (pr->pirc != NULL)
2979     {
2980 #if DEBUG_FS
2981       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2982                   "Forwarding of query `%s' not attempted due to pending local lookup!\n",
2983                   GNUNET_h2s (&pr->query));
2984 #endif
2985       return; /* already pending */
2986     }
2987   if (GNUNET_YES == pr->local_only)
2988     return; /* configured to not do P2P search */
2989   /* (0) try DHT */
2990   if ( (0 == pr->anonymity_level) &&
2991        (GNUNET_YES != pr->forward_only) &&
2992        (pr->type != GNUNET_BLOCK_TYPE_FS_DBLOCK) &&
2993        (pr->type != GNUNET_BLOCK_TYPE_FS_IBLOCK) )
2994     {
2995       pr->dht_get = GNUNET_DHT_get_start (dht_handle,
2996                                           GNUNET_TIME_UNIT_FOREVER_REL,
2997                                           pr->type,
2998                                           &pr->query,
2999                                           DEFAULT_GET_REPLICATION,
3000                                           GNUNET_DHT_RO_NONE,
3001                                           pr->bf,
3002                                           pr->mingle,
3003                                           pr->namespace,
3004                                           (pr->namespace != NULL) ? sizeof (GNUNET_HashCode) : 0,
3005                                           &process_dht_reply,
3006                                           pr);
3007     }
3008   /* (1) select target */
3009   psc.pr = pr;
3010   psc.target_score = -DBL_MAX;
3011   psc.fast_retry = GNUNET_NO;
3012   GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
3013                                          &target_peer_select_cb,
3014                                          &psc);  
3015   if (psc.target_score == -DBL_MAX)
3016     {
3017       if (psc.fast_retry == GNUNET_YES)
3018         delay = GNUNET_TIME_UNIT_MILLISECONDS; /* FIXME: store adaptive fast-retry value in 'pr' */
3019       else
3020         delay = get_processing_delay ();
3021 #if DEBUG_FS 
3022       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3023                   "No peer selected for forwarding of query `%s', will try again in %llu ms!\n",
3024                   GNUNET_h2s (&pr->query),
3025                   delay.rel_value);
3026 #endif
3027       pr->task = GNUNET_SCHEDULER_add_delayed (delay,
3028                                                &forward_request_task,
3029                                                pr);
3030       return; /* nobody selected */
3031     }
3032   /* (3) update TTL/priority */
3033   if (pr->client_request_list != NULL)
3034     {
3035       /* FIXME: use better algorithm!? */
3036       if (0 == GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
3037                                          4))
3038         pr->priority++;
3039       /* bound priority we use by priorities we see from other peers
3040          rounded up (must round up so that we can see non-zero
3041          priorities, but round up as little as possible to make it
3042          plausible that we forwarded another peers request) */
3043       if (pr->priority > current_priorities + 1.0)
3044         pr->priority = (uint32_t) current_priorities + 1.0;
3045       pr->ttl = bound_ttl (pr->ttl + TTL_DECREMENT * 2,
3046                            pr->priority);
3047 #if DEBUG_FS
3048       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3049                   "Trying query `%s' with priority %u and TTL %d.\n",
3050                   GNUNET_h2s (&pr->query),
3051                   pr->priority,
3052                   pr->ttl);
3053 #endif
3054     }
3055
3056   /* (3) reserve reply bandwidth */
3057   if (GNUNET_NO == pr->forward_only)
3058     {
3059       cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
3060                                               &psc.target.hashPubKey);
3061       GNUNET_assert (NULL != cp);
3062       GNUNET_assert (cp->irc == NULL);
3063       pr->pirc = cp;
3064       cp->pr = pr;
3065       cp->irc = GNUNET_CORE_peer_change_preference (core,
3066                                                     &psc.target,
3067                                                     GNUNET_CONSTANTS_SERVICE_TIMEOUT, 
3068                                                     GNUNET_BANDWIDTH_value_init (UINT32_MAX),
3069                                                     DBLOCK_SIZE * 2, 
3070                                                     cp->inc_preference,
3071                                                     &target_reservation_cb,
3072                                                     pr);
3073       GNUNET_assert (cp->irc != NULL);
3074       cp->inc_preference = 0;
3075     }
3076   else
3077     {
3078       /* force forwarding */
3079       static struct GNUNET_BANDWIDTH_Value32NBO zerobw;
3080       target_reservation_cb (pr, &psc.target,
3081                              zerobw, 0, 0.0);
3082     }
3083 }
3084
3085
3086 /* **************************** P2P PUT Handling ************************ */
3087
3088
3089 /**
3090  * Function called after we either failed or succeeded
3091  * at transmitting a reply to a peer.  
3092  *
3093  * @param cls the requests "struct PendingRequest*"
3094  * @param tpid ID of receiving peer, 0 on transmission error
3095  */
3096 static void
3097 transmit_reply_continuation (void *cls,
3098                              GNUNET_PEER_Id tpid)
3099 {
3100   struct PendingRequest *pr = cls;
3101   
3102   switch (pr->type)
3103     {
3104     case GNUNET_BLOCK_TYPE_FS_DBLOCK:
3105     case GNUNET_BLOCK_TYPE_FS_IBLOCK:
3106       /* only one reply expected, done with the request! */
3107       destroy_pending_request (pr);
3108       break;
3109     case GNUNET_BLOCK_TYPE_ANY:
3110     case GNUNET_BLOCK_TYPE_FS_KBLOCK:
3111     case GNUNET_BLOCK_TYPE_FS_SBLOCK:
3112       break;
3113     default:
3114       GNUNET_break (0);
3115       break;
3116     }
3117 }
3118
3119
3120 /**
3121  * Transmit the given message by copying it to the target buffer
3122  * "buf".  "buf" will be NULL and "size" zero if the socket was closed
3123  * for writing in the meantime.  In that case, do nothing
3124  * (the disconnect or shutdown handler will take care of the rest).
3125  * If we were able to transmit messages and there are still more
3126  * pending, ask core again for further calls to this function.
3127  *
3128  * @param cls closure, pointer to the 'struct ClientList*'
3129  * @param size number of bytes available in buf
3130  * @param buf where the callee should write the message
3131  * @return number of bytes written to buf
3132  */
3133 static size_t
3134 transmit_to_client (void *cls,
3135                   size_t size, void *buf)
3136 {
3137   struct ClientList *cl = cls;
3138   char *cbuf = buf;
3139   struct ClientResponseMessage *creply;
3140   size_t msize;
3141   
3142   cl->th = NULL;
3143   if (NULL == buf)
3144     {
3145 #if DEBUG_FS
3146       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3147                   "Not sending reply, client communication problem.\n");
3148 #endif
3149       return 0;
3150     }
3151   msize = 0;
3152   while ( (NULL != (creply = cl->res_head) ) &&
3153           (creply->msize <= size) )
3154     {
3155       memcpy (&cbuf[msize], &creply[1], creply->msize);
3156       msize += creply->msize;
3157       size -= creply->msize;
3158       GNUNET_CONTAINER_DLL_remove (cl->res_head,
3159                                    cl->res_tail,
3160                                    creply);
3161       GNUNET_free (creply);
3162     }
3163   if (NULL != creply)
3164     cl->th = GNUNET_SERVER_notify_transmit_ready (cl->client,
3165                                                   creply->msize,
3166                                                   GNUNET_TIME_UNIT_FOREVER_REL,
3167                                                   &transmit_to_client,
3168                                                   cl);
3169 #if DEBUG_FS
3170   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3171               "Transmitted %u bytes to client\n",
3172               (unsigned int) msize);
3173 #endif
3174   return msize;
3175 }
3176
3177
3178 /**
3179  * Closure for "process_reply" function.
3180  */
3181 struct ProcessReplyClosure
3182 {
3183   /**
3184    * The data for the reply.
3185    */
3186   const void *data;
3187
3188   /**
3189    * Who gave us this reply? NULL for local host (or DHT)
3190    */
3191   struct ConnectedPeer *sender;
3192
3193   /**
3194    * When the reply expires.
3195    */
3196   struct GNUNET_TIME_Absolute expiration;
3197
3198   /**
3199    * Size of data.
3200    */
3201   size_t size;
3202
3203   /**
3204    * Type of the block.
3205    */
3206   enum GNUNET_BLOCK_Type type;
3207
3208   /**
3209    * How much was this reply worth to us?
3210    */
3211   uint32_t priority;
3212
3213   /**
3214    * Evaluation result (returned).
3215    */
3216   enum GNUNET_BLOCK_EvaluationResult eval;
3217
3218   /**
3219    * Did we finish processing the associated request?
3220    */ 
3221   int finished;
3222
3223   /**
3224    * Did we find a matching request?
3225    */
3226   int request_found;
3227 };
3228
3229
3230 /**
3231  * We have received a reply; handle it!
3232  *
3233  * @param cls response (struct ProcessReplyClosure)
3234  * @param key our query
3235  * @param value value in the hash map (info about the query)
3236  * @return GNUNET_YES (we should continue to iterate)
3237  */
3238 static int
3239 process_reply (void *cls,
3240                const GNUNET_HashCode * key,
3241                void *value)
3242 {
3243   struct ProcessReplyClosure *prq = cls;
3244   struct PendingRequest *pr = value;
3245   struct PendingMessage *reply;
3246   struct ClientResponseMessage *creply;
3247   struct ClientList *cl;
3248   struct PutMessage *pm;
3249   struct ConnectedPeer *cp;
3250   struct GNUNET_TIME_Relative cur_delay;
3251 #if SUPPORT_DELAYS  
3252 struct GNUNET_TIME_Relative art_delay;
3253 #endif
3254   size_t msize;
3255   unsigned int i;
3256
3257 #if DEBUG_FS
3258   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3259               "Matched result (type %u) for query `%s' with pending request\n",
3260               (unsigned int) prq->type,
3261               GNUNET_h2s (key));
3262 #endif  
3263   GNUNET_STATISTICS_update (stats,
3264                             gettext_noop ("# replies received and matched"),
3265                             1,
3266                             GNUNET_NO);
3267   if (prq->sender != NULL)
3268     {
3269       for (i=0;i<pr->used_targets_off;i++)
3270         if (pr->used_targets[i].pid == prq->sender->pid)
3271           break;
3272       if (i < pr->used_targets_off)
3273         {
3274           cur_delay = GNUNET_TIME_absolute_get_duration (pr->used_targets[i].last_request_time);      
3275           prq->sender->avg_delay.rel_value
3276             = (prq->sender->avg_delay.rel_value * 
3277                (RUNAVG_DELAY_N - 1) + cur_delay.rel_value) / RUNAVG_DELAY_N; 
3278           prq->sender->avg_priority
3279             = (prq->sender->avg_priority * 
3280                (RUNAVG_DELAY_N - 1) + pr->priority) / (double) RUNAVG_DELAY_N;
3281         }
3282       if (pr->cp != NULL)
3283         {
3284           GNUNET_PEER_change_rc (prq->sender->last_p2p_replies
3285                                  [prq->sender->last_p2p_replies_woff % P2P_SUCCESS_LIST_SIZE], 
3286                                  -1);
3287           GNUNET_PEER_change_rc (pr->cp->pid, 1);
3288           prq->sender->last_p2p_replies
3289             [(prq->sender->last_p2p_replies_woff++) % P2P_SUCCESS_LIST_SIZE]
3290             = pr->cp->pid;
3291         }
3292       else
3293         {
3294           if (NULL != prq->sender->last_client_replies
3295               [(prq->sender->last_client_replies_woff) % CS2P_SUCCESS_LIST_SIZE])
3296             GNUNET_SERVER_client_drop (prq->sender->last_client_replies
3297                                        [(prq->sender->last_client_replies_woff) % CS2P_SUCCESS_LIST_SIZE]);
3298           prq->sender->last_client_replies
3299             [(prq->sender->last_client_replies_woff++) % CS2P_SUCCESS_LIST_SIZE]
3300             = pr->client_request_list->client_list->client;
3301           GNUNET_SERVER_client_keep (pr->client_request_list->client_list->client);
3302         }
3303     }
3304   prq->eval = GNUNET_BLOCK_evaluate (block_ctx,
3305                                      prq->type,
3306                                      key,
3307                                      &pr->bf,
3308                                      pr->mingle,
3309                                      pr->namespace, (pr->namespace != NULL) ? sizeof (GNUNET_HashCode) : 0,
3310                                      prq->data,
3311                                      prq->size);
3312   switch (prq->eval)
3313     {
3314     case GNUNET_BLOCK_EVALUATION_OK_MORE:
3315       break;
3316     case GNUNET_BLOCK_EVALUATION_OK_LAST:
3317       while (NULL != pr->pending_head)
3318         destroy_pending_message_list_entry (pr->pending_head);
3319       if (pr->qe != NULL)
3320         {
3321           if (pr->client_request_list != NULL)
3322             GNUNET_SERVER_receive_done (pr->client_request_list->client_list->client, 
3323                                         GNUNET_YES);
3324           GNUNET_DATASTORE_cancel (pr->qe);
3325           pr->qe = NULL;
3326         }
3327       pr->do_remove = GNUNET_YES;
3328       if (pr->task != GNUNET_SCHEDULER_NO_TASK)
3329         {
3330           GNUNET_SCHEDULER_cancel (pr->task);
3331           pr->task = GNUNET_SCHEDULER_NO_TASK;
3332         }
3333       GNUNET_break (GNUNET_YES ==
3334                     GNUNET_CONTAINER_multihashmap_remove (query_request_map,
3335                                                           key,
3336                                                           pr));
3337       GNUNET_LOAD_update (rt_entry_lifetime,
3338                           GNUNET_TIME_absolute_get_duration (pr->start_time).rel_value);
3339       break;
3340     case GNUNET_BLOCK_EVALUATION_OK_DUPLICATE:
3341       GNUNET_STATISTICS_update (stats,
3342                                 gettext_noop ("# duplicate replies discarded (bloomfilter)"),
3343                                 1,
3344                                 GNUNET_NO);
3345 #if DEBUG_FS
3346 /*      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3347                   "Duplicate response `%s', discarding.\n",
3348                   GNUNET_h2s (&mhash));*/
3349 #endif
3350       return GNUNET_YES; /* duplicate */
3351     case GNUNET_BLOCK_EVALUATION_RESULT_INVALID:
3352       return GNUNET_YES; /* wrong namespace */  
3353     case GNUNET_BLOCK_EVALUATION_REQUEST_VALID:
3354       GNUNET_break (0);
3355       return GNUNET_YES;
3356     case GNUNET_BLOCK_EVALUATION_REQUEST_INVALID:
3357       GNUNET_break (0);
3358       return GNUNET_YES;
3359     case GNUNET_BLOCK_EVALUATION_TYPE_NOT_SUPPORTED:
3360       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
3361                   _("Unsupported block type %u\n"),
3362                   prq->type);
3363       return GNUNET_NO;
3364     }
3365   if (pr->client_request_list != NULL)
3366     {
3367       if (pr->replies_seen_size == pr->replies_seen_off)
3368         GNUNET_array_grow (pr->replies_seen,
3369                            pr->replies_seen_size,
3370                            pr->replies_seen_size * 2 + 4);      
3371       GNUNET_CRYPTO_hash (prq->data,
3372                           prq->size,
3373                           &pr->replies_seen[pr->replies_seen_off++]);         
3374       refresh_bloomfilter (pr);
3375     }
3376   if (NULL == prq->sender)
3377     {
3378 #if DEBUG_FS
3379       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3380                   "Found result for query `%s' in local datastore\n",
3381                   GNUNET_h2s (key));
3382 #endif
3383       GNUNET_STATISTICS_update (stats,
3384                                 gettext_noop ("# results found locally"),
3385                                 1,
3386                                 GNUNET_NO);      
3387     }
3388   prq->priority += pr->remaining_priority;
3389   pr->remaining_priority = 0;
3390   pr->results_found++;
3391   prq->request_found = GNUNET_YES;
3392   if (NULL != pr->client_request_list)
3393     {
3394       GNUNET_STATISTICS_update (stats,
3395                                 gettext_noop ("# replies received for local clients"),
3396                                 1,
3397                                 GNUNET_NO);
3398       cl = pr->client_request_list->client_list;
3399       msize = sizeof (struct PutMessage) + prq->size;
3400       creply = GNUNET_malloc (msize + sizeof (struct ClientResponseMessage));
3401       creply->msize = msize;
3402       creply->client_list = cl;
3403       GNUNET_CONTAINER_DLL_insert_after (cl->res_head,
3404                                          cl->res_tail,
3405                                          cl->res_tail,
3406                                          creply);      
3407       pm = (struct PutMessage*) &creply[1];
3408       pm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
3409       pm->header.size = htons (msize);
3410       pm->type = htonl (prq->type);
3411       pm->expiration = GNUNET_TIME_absolute_hton (prq->expiration);
3412       memcpy (&pm[1], prq->data, prq->size);      
3413       if (NULL == cl->th)
3414         {
3415 #if DEBUG_FS
3416           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3417                       "Transmitting result for query `%s' to client\n",
3418                       GNUNET_h2s (key));
3419 #endif  
3420           cl->th = GNUNET_SERVER_notify_transmit_ready (cl->client,
3421                                                         msize,
3422                                                         GNUNET_TIME_UNIT_FOREVER_REL,
3423                                                         &transmit_to_client,
3424                                                         cl);
3425         }
3426       GNUNET_break (cl->th != NULL);
3427       if (pr->do_remove)                
3428         {
3429           prq->finished = GNUNET_YES;
3430           destroy_pending_request (pr);         
3431         }
3432     }
3433   else
3434     {
3435       cp = pr->cp;
3436 #if DEBUG_FS
3437       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3438                   "Transmitting result for query `%s' to other peer (PID=%u)\n",
3439                   GNUNET_h2s (key),
3440                   (unsigned int) cp->pid);
3441 #endif  
3442       GNUNET_STATISTICS_update (stats,
3443                                 gettext_noop ("# replies received for other peers"),
3444                                 1,
3445                                 GNUNET_NO);
3446       msize = sizeof (struct PutMessage) + prq->size;
3447       reply = GNUNET_malloc (msize + sizeof (struct PendingMessage));
3448       reply->cont = &transmit_reply_continuation;
3449       reply->cont_cls = pr;
3450 #if SUPPORT_DELAYS
3451       art_delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
3452                                                  GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
3453                                                                            TTL_DECREMENT));
3454       reply->delay_until 
3455         = GNUNET_TIME_relative_to_absolute (art_delay);
3456       GNUNET_STATISTICS_update (stats,
3457                                 gettext_noop ("cummulative artificial delay introduced (ms)"),
3458                                 art_delay.abs_value,
3459                                 GNUNET_NO);
3460 #endif
3461       reply->msize = msize;
3462       reply->priority = UINT32_MAX; /* send replies first! */
3463       pm = (struct PutMessage*) &reply[1];
3464       pm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
3465       pm->header.size = htons (msize);
3466       pm->type = htonl (prq->type);
3467       pm->expiration = GNUNET_TIME_absolute_hton (prq->expiration);
3468       memcpy (&pm[1], prq->data, prq->size);
3469       add_to_pending_messages_for_peer (cp, reply, pr);
3470     }
3471   return GNUNET_YES;
3472 }
3473
3474
3475 /**
3476  * Iterator called on each result obtained for a DHT
3477  * operation that expects a reply
3478  *
3479  * @param cls closure
3480  * @param exp when will this value expire
3481  * @param key key of the result
3482  * @param get_path NULL-terminated array of pointers
3483  *                 to the peers on reverse GET path (or NULL if not recorded)
3484  * @param put_path NULL-terminated array of pointers
3485  *                 to the peers on the PUT path (or NULL if not recorded)
3486  * @param type type of the result
3487  * @param size number of bytes in data
3488  * @param data pointer to the result data
3489  */
3490 static void
3491 process_dht_reply (void *cls,
3492                    struct GNUNET_TIME_Absolute exp,
3493                    const GNUNET_HashCode * key,
3494                    const struct GNUNET_PeerIdentity * const *get_path,
3495                    const struct GNUNET_PeerIdentity * const *put_path,
3496                    enum GNUNET_BLOCK_Type type,
3497                    size_t size,
3498                    const void *data)
3499 {
3500   struct PendingRequest *pr = cls;
3501   struct ProcessReplyClosure prq;
3502
3503   memset (&prq, 0, sizeof (prq));
3504   prq.data = data;
3505   prq.expiration = exp;
3506   prq.size = size;  
3507   prq.type = type;
3508   process_reply (&prq, key, pr);
3509 }
3510
3511
3512
3513 /**
3514  * Continuation called to notify client about result of the
3515  * operation.
3516  *
3517  * @param cls closure
3518  * @param success GNUNET_SYSERR on failure
3519  * @param msg NULL on success, otherwise an error message
3520  */
3521 static void 
3522 put_migration_continuation (void *cls,
3523                             int success,
3524                             const char *msg)
3525 {
3526   struct GNUNET_TIME_Absolute *start = cls;
3527   struct GNUNET_TIME_Relative delay;
3528   
3529   delay = GNUNET_TIME_absolute_get_duration (*start);
3530   GNUNET_free (start);
3531   GNUNET_LOAD_update (datastore_put_load,
3532                       delay.rel_value);
3533   if (GNUNET_OK == success)
3534     return;
3535   GNUNET_STATISTICS_update (stats,
3536                             gettext_noop ("# datastore 'put' failures"),
3537                             1,
3538                             GNUNET_NO);
3539 }
3540
3541
3542 /**
3543  * Handle P2P "PUT" message.
3544  *
3545  * @param cls closure, always NULL
3546  * @param other the other peer involved (sender or receiver, NULL
3547  *        for loopback messages where we are both sender and receiver)
3548  * @param message the actual message
3549  * @param atsi performance information
3550  * @return GNUNET_OK to keep the connection open,
3551  *         GNUNET_SYSERR to close it (signal serious error)
3552  */
3553 static int
3554 handle_p2p_put (void *cls,
3555                 const struct GNUNET_PeerIdentity *other,
3556                 const struct GNUNET_MessageHeader *message,
3557                 const struct GNUNET_TRANSPORT_ATS_Information *atsi)
3558 {
3559   const struct PutMessage *put;
3560   uint16_t msize;
3561   size_t dsize;
3562   enum GNUNET_BLOCK_Type type;
3563   struct GNUNET_TIME_Absolute expiration;
3564   GNUNET_HashCode query;
3565   struct ProcessReplyClosure prq;
3566   struct GNUNET_TIME_Absolute *start;
3567   struct GNUNET_TIME_Relative block_time;  
3568   double putl;
3569   struct ConnectedPeer *cp; 
3570   struct PendingMessage *pm;
3571   struct MigrationStopMessage *msm;
3572
3573   msize = ntohs (message->size);
3574   if (msize < sizeof (struct PutMessage))
3575     {
3576       GNUNET_break_op(0);
3577       return GNUNET_SYSERR;
3578     }
3579   put = (const struct PutMessage*) message;
3580   dsize = msize - sizeof (struct PutMessage);
3581   type = ntohl (put->type);
3582   expiration = GNUNET_TIME_absolute_ntoh (put->expiration);
3583
3584   if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
3585     return GNUNET_SYSERR;
3586   if (GNUNET_OK !=
3587       GNUNET_BLOCK_get_key (block_ctx,
3588                             type,
3589                             &put[1],
3590                             dsize,
3591                             &query))
3592     {
3593       GNUNET_break_op (0);
3594       return GNUNET_SYSERR;
3595     }
3596 #if DEBUG_FS
3597   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3598               "Received result for query `%s' from peer `%4s'\n",
3599               GNUNET_h2s (&query),
3600               GNUNET_i2s (other));
3601 #endif
3602   GNUNET_STATISTICS_update (stats,
3603                             gettext_noop ("# replies received (overall)"),
3604                             1,
3605                             GNUNET_NO);
3606   /* now, lookup 'query' */
3607   prq.data = (const void*) &put[1];
3608   if (other != NULL)
3609     prq.sender = GNUNET_CONTAINER_multihashmap_get (connected_peers,
3610                                                     &other->hashPubKey);
3611   else
3612     prq.sender = NULL;
3613   prq.size = dsize;
3614   prq.type = type;
3615   prq.expiration = expiration;
3616   prq.priority = 0;
3617   prq.finished = GNUNET_NO;
3618   prq.request_found = GNUNET_NO;
3619   GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map,
3620                                               &query,
3621                                               &process_reply,
3622                                               &prq);
3623   if (prq.sender != NULL)
3624     {
3625       prq.sender->inc_preference += CONTENT_BANDWIDTH_VALUE + 1000 * prq.priority;
3626       change_host_trust (prq.sender, prq.priority);
3627     }
3628   if ( (GNUNET_YES == active_to_migration) &&
3629        (GNUNET_NO == test_put_load_too_high (prq.priority)) )
3630     {      
3631 #if DEBUG_FS
3632       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3633                   "Replicating result for query `%s' with priority %u\n",
3634                   GNUNET_h2s (&query),
3635                   prq.priority);
3636 #endif
3637       start = GNUNET_malloc (sizeof (struct GNUNET_TIME_Absolute));
3638       *start = GNUNET_TIME_absolute_get ();
3639       GNUNET_DATASTORE_put (dsh,
3640                             0, &query, dsize, &put[1],
3641                             type, prq.priority, 1 /* anonymity */, 
3642                             expiration, 
3643                             1 + prq.priority, MAX_DATASTORE_QUEUE,
3644                             GNUNET_CONSTANTS_SERVICE_TIMEOUT,
3645                             &put_migration_continuation, 
3646                             start);
3647     }
3648   putl = GNUNET_LOAD_get_load (datastore_put_load);
3649   if ( (GNUNET_NO == prq.request_found) &&
3650        ( (GNUNET_YES != active_to_migration) ||
3651          (putl > 2.5 * (1 + prq.priority)) ) )
3652     {
3653       cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
3654                                               &other->hashPubKey);
3655       if (GNUNET_TIME_absolute_get_duration (cp->last_migration_block).rel_value < 5000)
3656         return GNUNET_OK; /* already blocked */
3657       /* We're too busy; send MigrationStop message! */
3658       if (GNUNET_YES != active_to_migration) 
3659         putl = 1.0 + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 5);
3660       block_time = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
3661                                                   5000 + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
3662                                                                                    (unsigned int) (60000 * putl * putl)));
3663       
3664       cp->last_migration_block = GNUNET_TIME_relative_to_absolute (block_time);
3665       pm = GNUNET_malloc (sizeof (struct PendingMessage) + 
3666                           sizeof (struct MigrationStopMessage));
3667       pm->msize = sizeof (struct MigrationStopMessage);
3668       pm->priority = UINT32_MAX;
3669       msm = (struct MigrationStopMessage*) &pm[1];
3670       msm->header.size = htons (sizeof (struct MigrationStopMessage));
3671       msm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_MIGRATION_STOP);
3672       msm->duration = GNUNET_TIME_relative_hton (block_time);
3673       add_to_pending_messages_for_peer (cp,
3674                                         pm,
3675                                         NULL);
3676     }
3677   return GNUNET_OK;
3678 }
3679
3680
3681 /**
3682  * Handle P2P "MIGRATION_STOP" message.
3683  *
3684  * @param cls closure, always NULL
3685  * @param other the other peer involved (sender or receiver, NULL
3686  *        for loopback messages where we are both sender and receiver)
3687  * @param message the actual message
3688  * @param atsi performance information
3689  * @return GNUNET_OK to keep the connection open,
3690  *         GNUNET_SYSERR to close it (signal serious error)
3691  */
3692 static int
3693 handle_p2p_migration_stop (void *cls,
3694                            const struct GNUNET_PeerIdentity *other,
3695                            const struct GNUNET_MessageHeader *message,
3696                            const struct GNUNET_TRANSPORT_ATS_Information *atsi)
3697 {
3698   struct ConnectedPeer *cp; 
3699   const struct MigrationStopMessage *msm;
3700
3701   msm = (const struct MigrationStopMessage*) message;
3702   cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
3703                                           &other->hashPubKey);
3704   if (cp == NULL)
3705     {
3706       GNUNET_break (0);
3707       return GNUNET_OK;
3708     }
3709   cp->migration_blocked = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_relative_ntoh (msm->duration));
3710   return GNUNET_OK;
3711 }
3712
3713
3714
3715 /* **************************** P2P GET Handling ************************ */
3716
3717
3718 /**
3719  * Closure for 'check_duplicate_request_{peer,client}'.
3720  */
3721 struct CheckDuplicateRequestClosure
3722 {
3723   /**
3724    * The new request we should check if it already exists.
3725    */
3726   const struct PendingRequest *pr;
3727
3728   /**
3729    * Existing request found by the checker, NULL if none.
3730    */
3731   struct PendingRequest *have;
3732 };
3733
3734
3735 /**
3736  * Iterator over entries in the 'query_request_map' that
3737  * tries to see if we have the same request pending from
3738  * the same client already.
3739  *
3740  * @param cls closure (our 'struct CheckDuplicateRequestClosure')
3741  * @param key current key code (query, ignored, must match)
3742  * @param value value in the hash map (a 'struct PendingRequest' 
3743  *              that already exists)
3744  * @return GNUNET_YES if we should continue to
3745  *         iterate (no match yet)
3746  *         GNUNET_NO if not (match found).
3747  */
3748 static int
3749 check_duplicate_request_client (void *cls,
3750                                 const GNUNET_HashCode * key,
3751                                 void *value)
3752 {
3753   struct CheckDuplicateRequestClosure *cdc = cls;
3754   struct PendingRequest *have = value;
3755
3756   if (have->client_request_list == NULL)
3757     return GNUNET_YES;
3758   if ( (cdc->pr->client_request_list->client_list->client == have->client_request_list->client_list->client) &&
3759        (cdc->pr != have) )
3760     {
3761       cdc->have = have;
3762       return GNUNET_NO;
3763     }
3764   return GNUNET_YES;
3765 }
3766
3767
3768 /**
3769  * We're processing (local) results for a search request
3770  * from another peer.  Pass applicable results to the
3771  * peer and if we are done either clean up (operation
3772  * complete) or forward to other peers (more results possible).
3773  *
3774  * @param cls our closure (struct LocalGetContext)
3775  * @param key key for the content
3776  * @param size number of bytes in data
3777  * @param data content stored
3778  * @param type type of the content
3779  * @param priority priority of the content
3780  * @param anonymity anonymity-level for the content
3781  * @param expiration expiration time for the content
3782  * @param uid unique identifier for the datum;
3783  *        maybe 0 if no unique identifier is available
3784  */
3785 static void
3786 process_local_reply (void *cls,
3787                      const GNUNET_HashCode * key,
3788                      size_t size,
3789                      const void *data,
3790                      enum GNUNET_BLOCK_Type type,
3791                      uint32_t priority,
3792                      uint32_t anonymity,
3793                      struct GNUNET_TIME_Absolute
3794                      expiration, 
3795                      uint64_t uid)
3796 {
3797   struct PendingRequest *pr = cls;
3798   struct ProcessReplyClosure prq;
3799   struct CheckDuplicateRequestClosure cdrc;
3800   GNUNET_HashCode query;
3801   unsigned int old_rf;
3802   
3803   if (NULL == key)
3804     {
3805 #if DEBUG_FS > 1
3806       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3807                   "Done processing local replies, forwarding request to other peers.\n");
3808 #endif
3809       pr->qe = NULL;
3810       if (pr->client_request_list != NULL)
3811         {
3812           GNUNET_SERVER_receive_done (pr->client_request_list->client_list->client, 
3813                                       GNUNET_YES);
3814           /* Figure out if this is a duplicate request and possibly
3815              merge 'struct PendingRequest' entries */
3816           cdrc.have = NULL;
3817           cdrc.pr = pr;
3818           GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map,
3819                                                       &pr->query,
3820                                                       &check_duplicate_request_client,
3821                                                       &cdrc);
3822           if (cdrc.have != NULL)
3823             {
3824 #if DEBUG_FS
3825               GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3826                           "Received request for block `%s' twice from client, will only request once.\n",
3827                           GNUNET_h2s (&pr->query));
3828 #endif
3829               
3830               destroy_pending_request (pr);
3831               return;
3832             }
3833         }
3834       if (pr->local_only == GNUNET_YES)
3835         {
3836           destroy_pending_request (pr);
3837           return;
3838         }
3839       /* no more results */
3840       if (pr->task == GNUNET_SCHEDULER_NO_TASK)
3841         pr->task = GNUNET_SCHEDULER_add_now (&forward_request_task,
3842                                              pr);      
3843       return;
3844     }
3845 #if DEBUG_FS
3846   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3847               "New local response to `%s' of type %u.\n",
3848               GNUNET_h2s (key),
3849               type);
3850 #endif
3851   if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
3852     {
3853 #if DEBUG_FS
3854       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3855                   "Found ONDEMAND block, performing on-demand encoding\n");
3856 #endif
3857       GNUNET_STATISTICS_update (stats,
3858                                 gettext_noop ("# on-demand blocks matched requests"),
3859                                 1,
3860                                 GNUNET_NO);
3861       if (GNUNET_OK != 
3862           GNUNET_FS_handle_on_demand_block (key, size, data, type, priority, 
3863                                             anonymity, expiration, uid, 
3864                                             &process_local_reply,
3865                                             pr))
3866       if (pr->qe != NULL)
3867         {
3868           GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
3869         }
3870       return;
3871     }
3872   old_rf = pr->results_found;
3873   memset (&prq, 0, sizeof (prq));
3874   prq.data = data;
3875   prq.expiration = expiration;
3876   prq.size = size;  
3877   if (GNUNET_OK != 
3878       GNUNET_BLOCK_get_key (block_ctx,
3879                             type,
3880                             data,
3881                             size,
3882                             &query))
3883     {
3884       GNUNET_break (0);
3885       GNUNET_DATASTORE_remove (dsh,
3886                                key,
3887                                size, data,
3888                                -1, -1, 
3889                                GNUNET_TIME_UNIT_FOREVER_REL,
3890                                NULL, NULL);
3891       GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
3892       return;
3893     }
3894   prq.type = type;
3895   prq.priority = priority;  
3896   prq.finished = GNUNET_NO;
3897   prq.request_found = GNUNET_NO;
3898   if ( (old_rf == 0) &&
3899        (pr->results_found == 0) )
3900     update_datastore_delays (pr->start_time);
3901   process_reply (&prq, key, pr);
3902   if (prq.finished == GNUNET_YES)
3903     return;
3904   if (pr->qe == NULL)
3905     return; /* done here */
3906   if (prq.eval == GNUNET_BLOCK_EVALUATION_OK_LAST)
3907     {
3908       pr->local_only = GNUNET_YES; /* do not forward */
3909       GNUNET_DATASTORE_get_next (dsh, GNUNET_NO);
3910       return;
3911     }
3912   if ( (pr->client_request_list == NULL) &&
3913        ( (GNUNET_YES == test_get_load_too_high (0)) ||
3914          (pr->results_found > 5 + 2 * pr->priority) ) )
3915     {
3916 #if DEBUG_FS > 2
3917       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3918                   "Load too high, done with request\n");
3919 #endif
3920       GNUNET_STATISTICS_update (stats,
3921                                 gettext_noop ("# processing result set cut short due to load"),
3922                                 1,
3923                                 GNUNET_NO);
3924       GNUNET_DATASTORE_get_next (dsh, GNUNET_NO);
3925       return;
3926     }
3927   GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
3928 }
3929
3930
3931 /**
3932  * We've received a request with the specified priority.  Bound it
3933  * according to how much we trust the given peer.
3934  * 
3935  * @param prio_in requested priority
3936  * @param cp the peer making the request
3937  * @return effective priority
3938  */
3939 static int32_t
3940 bound_priority (uint32_t prio_in,
3941                 struct ConnectedPeer *cp)
3942 {
3943 #define N ((double)128.0)
3944   uint32_t ret;
3945   double rret;
3946   int ld;
3947
3948   ld = test_get_load_too_high (0);
3949   if (ld == GNUNET_SYSERR)
3950     {
3951       GNUNET_STATISTICS_update (stats,
3952                                 gettext_noop ("# requests done for free (low load)"),
3953                                 1,
3954                                 GNUNET_NO);
3955       return 0; /* excess resources */
3956     }
3957   if (prio_in > INT32_MAX)
3958     prio_in = INT32_MAX;
3959   ret = - change_host_trust (cp, - (int) prio_in);
3960   if (ret > 0)
3961     {
3962       if (ret > current_priorities + N)
3963         rret = current_priorities + N;
3964       else
3965         rret = ret;
3966       current_priorities 
3967         = (current_priorities * (N-1) + rret)/N;
3968     }
3969   if ( (ld == GNUNET_YES) && (ret > 0) )
3970     {
3971       /* try with charging */
3972       ld = test_get_load_too_high (ret);
3973     }
3974   if (ld == GNUNET_YES)
3975     {
3976       GNUNET_STATISTICS_update (stats,
3977                                 gettext_noop ("# request dropped, priority insufficient"),
3978                                 1,
3979                                 GNUNET_NO);
3980       /* undo charge */
3981       change_host_trust (cp, (int) ret);
3982       return -1; /* not enough resources */
3983     }
3984   else
3985     {
3986       GNUNET_STATISTICS_update (stats,
3987                                 gettext_noop ("# requests done for a price (normal load)"),
3988                                 1,
3989                                 GNUNET_NO);
3990     }
3991 #undef N
3992   return ret;
3993 }
3994
3995
3996 /**
3997  * Iterator over entries in the 'query_request_map' that
3998  * tries to see if we have the same request pending from
3999  * the same peer already.
4000  *
4001  * @param cls closure (our 'struct CheckDuplicateRequestClosure')
4002  * @param key current key code (query, ignored, must match)
4003  * @param value value in the hash map (a 'struct PendingRequest' 
4004  *              that already exists)
4005  * @return GNUNET_YES if we should continue to
4006  *         iterate (no match yet)
4007  *         GNUNET_NO if not (match found).
4008  */
4009 static int
4010 check_duplicate_request_peer (void *cls,
4011                               const GNUNET_HashCode * key,
4012                               void *value)
4013 {
4014   struct CheckDuplicateRequestClosure *cdc = cls;
4015   struct PendingRequest *have = value;
4016
4017   if (cdc->pr->target_pid == have->target_pid)
4018     {
4019       cdc->have = have;
4020       return GNUNET_NO;
4021     }
4022   return GNUNET_YES;
4023 }
4024
4025
4026 /**
4027  * Handle P2P "GET" request.
4028  *
4029  * @param cls closure, always NULL
4030  * @param other the other peer involved (sender or receiver, NULL
4031  *        for loopback messages where we are both sender and receiver)
4032  * @param message the actual message
4033  * @param atsi performance information
4034  * @return GNUNET_OK to keep the connection open,
4035  *         GNUNET_SYSERR to close it (signal serious error)
4036  */
4037 static int
4038 handle_p2p_get (void *cls,
4039                 const struct GNUNET_PeerIdentity *other,
4040                 const struct GNUNET_MessageHeader *message,
4041                 const struct GNUNET_TRANSPORT_ATS_Information *atsi)
4042 {
4043   struct PendingRequest *pr;
4044   struct ConnectedPeer *cp;
4045   struct ConnectedPeer *cps;
4046   struct CheckDuplicateRequestClosure cdc;
4047   struct GNUNET_TIME_Relative timeout;
4048   uint16_t msize;
4049   const struct GetMessage *gm;
4050   unsigned int bits;
4051   const GNUNET_HashCode *opt;
4052   uint32_t bm;
4053   size_t bfsize;
4054   uint32_t ttl_decrement;
4055   int32_t priority;
4056   enum GNUNET_BLOCK_Type type;
4057   int have_ns;
4058
4059   msize = ntohs(message->size);
4060   if (msize < sizeof (struct GetMessage))
4061     {
4062       GNUNET_break_op (0);
4063       return GNUNET_SYSERR;
4064     }
4065   gm = (const struct GetMessage*) message;
4066 #if DEBUG_FS
4067   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4068               "Received request for `%s'\n",
4069               GNUNET_h2s (&gm->query));
4070 #endif
4071   type = ntohl (gm->type);
4072   bm = ntohl (gm->hash_bitmap);
4073   bits = 0;
4074   while (bm > 0)
4075     {
4076       if (1 == (bm & 1))
4077         bits++;
4078       bm >>= 1;
4079     }
4080   if (msize < sizeof (struct GetMessage) + bits * sizeof (GNUNET_HashCode))
4081     {
4082       GNUNET_break_op (0);
4083       return GNUNET_SYSERR;
4084     }  
4085   opt = (const GNUNET_HashCode*) &gm[1];
4086   bfsize = msize - sizeof (struct GetMessage) - bits * sizeof (GNUNET_HashCode);
4087   /* bfsize must be power of 2, check! */
4088   if (0 != ( (bfsize - 1) & bfsize))
4089     {
4090       GNUNET_break_op (0);
4091       return GNUNET_SYSERR;
4092     }
4093   bm = ntohl (gm->hash_bitmap);
4094   bits = 0;
4095   cps = GNUNET_CONTAINER_multihashmap_get (connected_peers,
4096                                            &other->hashPubKey);
4097   if (NULL == cps)
4098     {
4099       /* peer must have just disconnected */
4100       GNUNET_STATISTICS_update (stats,
4101                                 gettext_noop ("# requests dropped due to initiator not being connected"),
4102                                 1,
4103                                 GNUNET_NO);
4104       return GNUNET_SYSERR;
4105     }
4106   if (0 != (bm & GET_MESSAGE_BIT_RETURN_TO))
4107     cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
4108                                             &opt[bits++]);
4109   else
4110     cp = cps;
4111   if (cp == NULL)
4112     {
4113 #if DEBUG_FS
4114       if (0 != (bm & GET_MESSAGE_BIT_RETURN_TO))
4115         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4116                     "Failed to find RETURN-TO peer `%4s' in connection set. Dropping query.\n",
4117                     GNUNET_i2s ((const struct GNUNET_PeerIdentity*) &opt[bits-1]));
4118       
4119       else
4120         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4121                     "Failed to find peer `%4s' in connection set. Dropping query.\n",
4122                     GNUNET_i2s (other));
4123 #endif
4124       GNUNET_STATISTICS_update (stats,
4125                                 gettext_noop ("# requests dropped due to missing reverse route"),
4126                                 1,
4127                                 GNUNET_NO);
4128      /* FIXME: try connect? */
4129       return GNUNET_OK;
4130     }
4131   /* note that we can really only check load here since otherwise
4132      peers could find out that we are overloaded by not being
4133      disconnected after sending us a malformed query... */
4134   priority = bound_priority (ntohl (gm->priority), cps);
4135   if (priority < 0)
4136     {
4137 #if DEBUG_FS
4138       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4139                   "Dropping query from `%s', this peer is too busy.\n",
4140                   GNUNET_i2s (other));
4141 #endif
4142       return GNUNET_OK;
4143     }
4144 #if DEBUG_FS 
4145   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4146               "Received request for `%s' of type %u from peer `%4s' with flags %u\n",
4147               GNUNET_h2s (&gm->query),
4148               (unsigned int) type,
4149               GNUNET_i2s (other),
4150               (unsigned int) bm);
4151 #endif
4152   have_ns = (0 != (bm & GET_MESSAGE_BIT_SKS_NAMESPACE));
4153   pr = GNUNET_malloc (sizeof (struct PendingRequest) + 
4154                       (have_ns ? sizeof(GNUNET_HashCode) : 0));
4155   if (have_ns)
4156     {
4157       pr->namespace = (GNUNET_HashCode*) &pr[1];
4158       memcpy (&pr[1], &opt[bits++], sizeof (GNUNET_HashCode));
4159     }
4160   if ( (GNUNET_LOAD_get_load (cp->transmission_delay) > 3 * (1 + priority)) ||
4161        (GNUNET_LOAD_get_average (cp->transmission_delay) > 
4162         GNUNET_CONSTANTS_MAX_CORK_DELAY.rel_value * 2 + GNUNET_LOAD_get_average (rt_entry_lifetime)) )
4163     {
4164       /* don't have BW to send to peer, or would likely take longer than we have for it,
4165          so at best indirect the query */
4166       priority = 0;
4167       pr->forward_only = GNUNET_YES;
4168     }
4169   pr->type = type;
4170   pr->mingle = ntohl (gm->filter_mutator);
4171   if (0 != (bm & GET_MESSAGE_BIT_TRANSMIT_TO))
4172     pr->target_pid = GNUNET_PEER_intern ((const struct GNUNET_PeerIdentity*) &opt[bits++]);
4173   pr->anonymity_level = 1;
4174   pr->priority = (uint32_t) priority;
4175   pr->ttl = bound_ttl (ntohl (gm->ttl), pr->priority);
4176   pr->query = gm->query;
4177   /* decrement ttl (always) */
4178   ttl_decrement = 2 * TTL_DECREMENT +
4179     GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
4180                               TTL_DECREMENT);
4181   if ( (pr->ttl < 0) &&
4182        (((int32_t)(pr->ttl - ttl_decrement)) > 0) )
4183     {
4184 #if DEBUG_FS
4185       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4186                   "Dropping query from `%s' due to TTL underflow (%d - %u).\n",
4187                   GNUNET_i2s (other),
4188                   pr->ttl,
4189                   ttl_decrement);
4190 #endif
4191       GNUNET_STATISTICS_update (stats,
4192                                 gettext_noop ("# requests dropped due TTL underflow"),
4193                                 1,
4194                                 GNUNET_NO);
4195       /* integer underflow => drop (should be very rare)! */      
4196       GNUNET_free (pr);
4197       return GNUNET_OK;
4198     } 
4199   pr->ttl -= ttl_decrement;
4200   pr->start_time = GNUNET_TIME_absolute_get ();
4201
4202   /* get bloom filter */
4203   if (bfsize > 0)
4204     {
4205       pr->bf = GNUNET_CONTAINER_bloomfilter_init ((const char*) &opt[bits],
4206                                                   bfsize,
4207                                                   BLOOMFILTER_K);
4208       pr->bf_size = bfsize;
4209     }
4210   cdc.have = NULL;
4211   cdc.pr = pr;
4212   GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map,
4213                                               &gm->query,
4214                                               &check_duplicate_request_peer,
4215                                               &cdc);
4216   if (cdc.have != NULL)
4217     {
4218       if (cdc.have->start_time.abs_value + cdc.have->ttl >=
4219           pr->start_time.abs_value + pr->ttl)
4220         {
4221           /* existing request has higher TTL, drop new one! */
4222           cdc.have->priority += pr->priority;
4223           destroy_pending_request (pr);
4224 #if DEBUG_FS
4225           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4226                       "Have existing request with higher TTL, dropping new request.\n",
4227                       GNUNET_i2s (other));
4228 #endif
4229           GNUNET_STATISTICS_update (stats,
4230                                     gettext_noop ("# requests dropped due to higher-TTL request"),
4231                                     1,
4232                                     GNUNET_NO);
4233           return GNUNET_OK;
4234         }
4235       else
4236         {
4237           /* existing request has lower TTL, drop old one! */
4238           pr->priority += cdc.have->priority;
4239           /* Possible optimization: if we have applicable pending
4240              replies in 'cdc.have', we might want to move those over
4241              (this is a really rare special-case, so it is not clear
4242              that this would be worth it) */
4243           destroy_pending_request (cdc.have);
4244           /* keep processing 'pr'! */
4245         }
4246     }
4247
4248   pr->cp = cp;
4249   GNUNET_break (GNUNET_OK ==
4250                 GNUNET_CONTAINER_multihashmap_put (query_request_map,
4251                                                    &gm->query,
4252                                                    pr,
4253                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
4254   GNUNET_break (GNUNET_OK ==
4255                 GNUNET_CONTAINER_multihashmap_put (peer_request_map,
4256                                                    &other->hashPubKey,
4257                                                    pr,
4258                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
4259   
4260   pr->hnode = GNUNET_CONTAINER_heap_insert (requests_by_expiration_heap,
4261                                             pr,
4262                                             pr->start_time.abs_value + pr->ttl);
4263
4264   GNUNET_STATISTICS_update (stats,
4265                             gettext_noop ("# P2P searches received"),
4266                             1,
4267                             GNUNET_NO);
4268   GNUNET_STATISTICS_update (stats,
4269                             gettext_noop ("# P2P searches active"),
4270                             1,
4271                             GNUNET_NO);
4272
4273   /* calculate change in traffic preference */
4274   cps->inc_preference += pr->priority * 1000 + QUERY_BANDWIDTH_VALUE;
4275   /* process locally */
4276   if (type == GNUNET_BLOCK_TYPE_FS_DBLOCK)
4277     type = GNUNET_BLOCK_TYPE_ANY; /* to get on-demand as well */
4278   timeout = GNUNET_TIME_relative_multiply (BASIC_DATASTORE_REQUEST_DELAY,
4279                                            (pr->priority + 1)); 
4280   if (GNUNET_YES != pr->forward_only)
4281     {
4282 #if DEBUG_FS
4283       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4284                   "Handing request for `%s' to datastore\n",
4285                   GNUNET_h2s (&gm->query));
4286 #endif
4287       pr->qe = GNUNET_DATASTORE_get (dsh,
4288                                      &gm->query,
4289                                      type,                             
4290                                      pr->priority + 1,
4291                                      MAX_DATASTORE_QUEUE,                                
4292                                      timeout,
4293                                      &process_local_reply,
4294                                      pr);
4295       if (NULL == pr->qe)
4296         {
4297           GNUNET_STATISTICS_update (stats,
4298                                     gettext_noop ("# requests dropped by datastore (queue length limit)"),
4299                                     1,
4300                                     GNUNET_NO);
4301         }
4302     }
4303   else
4304     {
4305       GNUNET_STATISTICS_update (stats,
4306                                 gettext_noop ("# requests forwarded due to high load"),
4307                                 1,
4308                                 GNUNET_NO);
4309     }
4310
4311   /* Are multiple results possible (and did we look locally)?  If so, start processing remotely now! */
4312   switch (pr->type)
4313     {
4314     case GNUNET_BLOCK_TYPE_FS_DBLOCK:
4315     case GNUNET_BLOCK_TYPE_FS_IBLOCK:
4316       /* only one result, wait for datastore */
4317       if (GNUNET_YES != pr->forward_only)
4318         {
4319           GNUNET_STATISTICS_update (stats,
4320                                     gettext_noop ("# requests not instantly forwarded (waiting for datastore)"),
4321                                     1,
4322                                     GNUNET_NO);
4323           break;
4324         }
4325     default:
4326       if (pr->task == GNUNET_SCHEDULER_NO_TASK)
4327         pr->task = GNUNET_SCHEDULER_add_now (&forward_request_task,
4328                                              pr);
4329     }
4330
4331   /* make sure we don't track too many requests */
4332   if (GNUNET_CONTAINER_heap_get_size (requests_by_expiration_heap) > max_pending_requests)
4333     {
4334       pr = GNUNET_CONTAINER_heap_peek (requests_by_expiration_heap);
4335       GNUNET_assert (pr != NULL);
4336       destroy_pending_request (pr);
4337     }
4338   return GNUNET_OK;
4339 }
4340
4341
4342 /* **************************** CS GET Handling ************************ */
4343
4344
4345 /**
4346  * Handle START_SEARCH-message (search request from client).
4347  *
4348  * @param cls closure
4349  * @param client identification of the client
4350  * @param message the actual message
4351  */
4352 static void
4353 handle_start_search (void *cls,
4354                      struct GNUNET_SERVER_Client *client,
4355                      const struct GNUNET_MessageHeader *message)
4356 {
4357   static GNUNET_HashCode all_zeros;
4358   const struct SearchMessage *sm;
4359   struct ClientList *cl;
4360   struct ClientRequestList *crl;
4361   struct PendingRequest *pr;
4362   uint16_t msize;
4363   unsigned int sc;
4364   enum GNUNET_BLOCK_Type type;
4365
4366   msize = ntohs (message->size);
4367   if ( (msize < sizeof (struct SearchMessage)) ||
4368        (0 != (msize - sizeof (struct SearchMessage)) % sizeof (GNUNET_HashCode)) )
4369     {
4370       GNUNET_break (0);
4371       GNUNET_SERVER_receive_done (client,
4372                                   GNUNET_SYSERR);
4373       return;
4374     }
4375   GNUNET_STATISTICS_update (stats,
4376                             gettext_noop ("# client searches received"),
4377                             1,
4378                             GNUNET_NO);
4379   sc = (msize - sizeof (struct SearchMessage)) / sizeof (GNUNET_HashCode);
4380   sm = (const struct SearchMessage*) message;
4381   type = ntohl (sm->type);
4382 #if DEBUG_FS
4383   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4384               "Received request for `%s' of type %u from local client\n",
4385               GNUNET_h2s (&sm->query),
4386               (unsigned int) type);
4387 #endif
4388   cl = client_list;
4389   while ( (cl != NULL) &&
4390           (cl->client != client) )
4391     cl = cl->next;
4392   if (cl == NULL)
4393     {
4394       cl = GNUNET_malloc (sizeof (struct ClientList));
4395       cl->client = client;
4396       GNUNET_SERVER_client_keep (client);
4397       cl->next = client_list;
4398       client_list = cl;
4399     }
4400   /* detect duplicate KBLOCK requests */
4401   if ( (type == GNUNET_BLOCK_TYPE_FS_KBLOCK) ||
4402        (type == GNUNET_BLOCK_TYPE_FS_NBLOCK) ||
4403        (type == GNUNET_BLOCK_TYPE_ANY) )
4404     {
4405       crl = cl->rl_head;
4406       while ( (crl != NULL) &&
4407               ( (0 != memcmp (&crl->req->query,
4408                               &sm->query,
4409                               sizeof (GNUNET_HashCode))) ||
4410                 (crl->req->type != type) ) )
4411         crl = crl->next;
4412       if (crl != NULL)  
4413         { 
4414 #if DEBUG_FS
4415           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4416                       "Have existing request, merging content-seen lists.\n");
4417 #endif
4418           pr = crl->req;
4419           /* Duplicate request (used to send long list of
4420              known/blocked results); merge 'pr->replies_seen'
4421              and update bloom filter */
4422           GNUNET_array_grow (pr->replies_seen,
4423                              pr->replies_seen_size,
4424                              pr->replies_seen_off + sc);
4425           memcpy (&pr->replies_seen[pr->replies_seen_off],
4426                   &sm[1],
4427                   sc * sizeof (GNUNET_HashCode));
4428           pr->replies_seen_off += sc;
4429           refresh_bloomfilter (pr);
4430           GNUNET_STATISTICS_update (stats,
4431                                     gettext_noop ("# client searches updated (merged content seen list)"),
4432                                     1,
4433                                     GNUNET_NO);
4434           GNUNET_SERVER_receive_done (client,
4435                                       GNUNET_OK);
4436           return;
4437         }
4438     }
4439   GNUNET_STATISTICS_update (stats,
4440                             gettext_noop ("# client searches active"),
4441                             1,
4442                             GNUNET_NO);
4443   pr = GNUNET_malloc (sizeof (struct PendingRequest) + 
4444                       ((type == GNUNET_BLOCK_TYPE_FS_SBLOCK) ? sizeof(GNUNET_HashCode) : 0));
4445   crl = GNUNET_malloc (sizeof (struct ClientRequestList));
4446   memset (crl, 0, sizeof (struct ClientRequestList));
4447   crl->client_list = cl;
4448   GNUNET_CONTAINER_DLL_insert (cl->rl_head,
4449                                cl->rl_tail,
4450                                crl);  
4451   crl->req = pr;
4452   pr->type = type;
4453   pr->client_request_list = crl;
4454   GNUNET_array_grow (pr->replies_seen,
4455                      pr->replies_seen_size,
4456                      sc);
4457   memcpy (pr->replies_seen,
4458           &sm[1],
4459           sc * sizeof (GNUNET_HashCode));
4460   pr->replies_seen_off = sc;
4461   pr->anonymity_level = ntohl (sm->anonymity_level); 
4462   pr->start_time = GNUNET_TIME_absolute_get ();
4463   refresh_bloomfilter (pr);
4464   pr->query = sm->query;
4465   if (0 == (1 & ntohl (sm->options)))
4466     pr->local_only = GNUNET_NO;
4467   else
4468     pr->local_only = GNUNET_YES;
4469   switch (type)
4470     {
4471     case GNUNET_BLOCK_TYPE_FS_DBLOCK:
4472     case GNUNET_BLOCK_TYPE_FS_IBLOCK:
4473       if (0 != memcmp (&sm->target,
4474                        &all_zeros,
4475                        sizeof (GNUNET_HashCode)))
4476         pr->target_pid = GNUNET_PEER_intern ((const struct GNUNET_PeerIdentity*) &sm->target);
4477       break;
4478     case GNUNET_BLOCK_TYPE_FS_SBLOCK:
4479       pr->namespace = (GNUNET_HashCode*) &pr[1];
4480       memcpy (&pr[1], &sm->target, sizeof (GNUNET_HashCode));
4481       break;
4482     default:
4483       break;
4484     }
4485   GNUNET_break (GNUNET_OK ==
4486                 GNUNET_CONTAINER_multihashmap_put (query_request_map,
4487                                                    &sm->query,
4488                                                    pr,
4489                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
4490   if (type == GNUNET_BLOCK_TYPE_FS_DBLOCK)
4491     type = GNUNET_BLOCK_TYPE_ANY; /* get on-demand blocks too! */
4492   pr->qe = GNUNET_DATASTORE_get (dsh,
4493                                  &sm->query,
4494                                  type,
4495                                  -3, -1,
4496                                  GNUNET_CONSTANTS_SERVICE_TIMEOUT,                             
4497                                  &process_local_reply,
4498                                  pr);
4499 }
4500
4501
4502 /* **************************** Startup ************************ */
4503
4504 /**
4505  * Process fs requests.
4506  *
4507  * @param server the initialized server
4508  * @param c configuration to use
4509  */
4510 static int
4511 main_init (struct GNUNET_SERVER_Handle *server,
4512            const struct GNUNET_CONFIGURATION_Handle *c)
4513 {
4514   static const struct GNUNET_CORE_MessageHandler p2p_handlers[] =
4515     {
4516       { &handle_p2p_get, 
4517         GNUNET_MESSAGE_TYPE_FS_GET, 0 },
4518       { &handle_p2p_put, 
4519         GNUNET_MESSAGE_TYPE_FS_PUT, 0 },
4520       { &handle_p2p_migration_stop, 
4521         GNUNET_MESSAGE_TYPE_FS_MIGRATION_STOP,
4522         sizeof (struct MigrationStopMessage) },
4523       { NULL, 0, 0 }
4524     };
4525   static const struct GNUNET_SERVER_MessageHandler handlers[] = {
4526     {&GNUNET_FS_handle_index_start, NULL, 
4527      GNUNET_MESSAGE_TYPE_FS_INDEX_START, 0},
4528     {&GNUNET_FS_handle_index_list_get, NULL, 
4529      GNUNET_MESSAGE_TYPE_FS_INDEX_LIST_GET, sizeof(struct GNUNET_MessageHeader) },
4530     {&GNUNET_FS_handle_unindex, NULL, GNUNET_MESSAGE_TYPE_FS_UNINDEX, 
4531      sizeof (struct UnindexMessage) },
4532     {&handle_start_search, NULL, GNUNET_MESSAGE_TYPE_FS_START_SEARCH, 
4533      0 },
4534     {NULL, NULL, 0, 0}
4535   };
4536   unsigned long long enc = 128;
4537
4538   cfg = c;
4539   stats = GNUNET_STATISTICS_create ("fs", cfg);
4540   min_migration_delay = GNUNET_TIME_UNIT_SECONDS;
4541   if ( (GNUNET_OK !=
4542         GNUNET_CONFIGURATION_get_value_number (cfg,
4543                                                "fs",
4544                                                "MAX_PENDING_REQUESTS",
4545                                                &max_pending_requests)) ||
4546        (GNUNET_OK !=
4547         GNUNET_CONFIGURATION_get_value_number (cfg,
4548                                                "fs",
4549                                                "EXPECTED_NEIGHBOUR_COUNT",
4550                                                &enc)) ||
4551        (GNUNET_OK != 
4552         GNUNET_CONFIGURATION_get_value_time (cfg,
4553                                              "fs",
4554                                              "MIN_MIGRATION_DELAY",
4555                                              &min_migration_delay)) )
4556     {
4557       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
4558                   _("Configuration fails to specify certain parameters, assuming default values."));
4559     }
4560   connected_peers = GNUNET_CONTAINER_multihashmap_create (enc); 
4561   query_request_map = GNUNET_CONTAINER_multihashmap_create (max_pending_requests);
4562   rt_entry_lifetime = GNUNET_LOAD_value_init (GNUNET_TIME_UNIT_FOREVER_REL);
4563   peer_request_map = GNUNET_CONTAINER_multihashmap_create (enc);
4564   requests_by_expiration_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN); 
4565   core = GNUNET_CORE_connect (cfg,
4566                               1, /* larger? */
4567                               NULL,
4568                               NULL,
4569                               &peer_connect_handler,
4570                               &peer_disconnect_handler,
4571                               &peer_status_handler,
4572                               NULL, GNUNET_NO,
4573                               NULL, GNUNET_NO,
4574                               p2p_handlers);
4575   if (NULL == core)
4576     {
4577       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
4578                   _("Failed to connect to `%s' service.\n"),
4579                   "core");
4580       GNUNET_CONTAINER_multihashmap_destroy (connected_peers);
4581       connected_peers = NULL;
4582       GNUNET_CONTAINER_multihashmap_destroy (query_request_map);
4583       query_request_map = NULL;
4584       GNUNET_LOAD_value_free (rt_entry_lifetime);
4585       rt_entry_lifetime = NULL;
4586       GNUNET_CONTAINER_heap_destroy (requests_by_expiration_heap);
4587       requests_by_expiration_heap = NULL;
4588       GNUNET_CONTAINER_multihashmap_destroy (peer_request_map);
4589       peer_request_map = NULL;
4590       if (dsh != NULL)
4591         {
4592           GNUNET_DATASTORE_disconnect (dsh, GNUNET_NO);
4593           dsh = NULL;
4594         }
4595       return GNUNET_SYSERR;
4596     }
4597   if (active_from_migration) 
4598     {
4599       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
4600                   _("Content migration is enabled, will start to gather data\n"));
4601       consider_migration_gathering ();
4602     }
4603   consider_dht_put_gathering (NULL);
4604   GNUNET_SERVER_disconnect_notify (server, 
4605                                    &handle_client_disconnect,
4606                                    NULL);
4607   GNUNET_assert (GNUNET_OK ==
4608                  GNUNET_CONFIGURATION_get_value_filename (cfg,
4609                                                           "fs",
4610                                                           "TRUST",
4611                                                           &trustDirectory));
4612   GNUNET_DISK_directory_create (trustDirectory);
4613   GNUNET_SCHEDULER_add_with_priority (GNUNET_SCHEDULER_PRIORITY_HIGH,
4614                                       &cron_flush_trust, NULL);
4615
4616
4617   GNUNET_SERVER_add_handlers (server, handlers);
4618   GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL,
4619                                 &shutdown_task,
4620                                 NULL);
4621   return GNUNET_OK;
4622 }
4623
4624
4625 /**
4626  * Process fs requests.
4627  *
4628  * @param cls closure
4629  * @param server the initialized server
4630  * @param cfg configuration to use
4631  */
4632 static void
4633 run (void *cls,
4634      struct GNUNET_SERVER_Handle *server,
4635      const struct GNUNET_CONFIGURATION_Handle *cfg)
4636 {
4637   active_to_migration = GNUNET_CONFIGURATION_get_value_yesno (cfg,
4638                                                               "FS",
4639                                                               "CONTENT_CACHING");
4640   active_from_migration = GNUNET_CONFIGURATION_get_value_yesno (cfg,
4641                                                                 "FS",
4642                                                                 "CONTENT_PUSHING");
4643   dsh = GNUNET_DATASTORE_connect (cfg);
4644   if (dsh == NULL)
4645     {
4646       GNUNET_SCHEDULER_shutdown ();
4647       return;
4648     }
4649   datastore_get_load = GNUNET_LOAD_value_init (DATASTORE_LOAD_AUTODECLINE);
4650   datastore_put_load = GNUNET_LOAD_value_init (DATASTORE_LOAD_AUTODECLINE);
4651   block_cfg = GNUNET_CONFIGURATION_create ();
4652   GNUNET_CONFIGURATION_set_value_string (block_cfg,
4653                                          "block",
4654                                          "PLUGINS",
4655                                          "fs");
4656   block_ctx = GNUNET_BLOCK_context_create (block_cfg);
4657   GNUNET_assert (NULL != block_ctx);
4658   dht_handle = GNUNET_DHT_connect (cfg,
4659                                    FS_DHT_HT_SIZE);
4660   if ( (GNUNET_OK != GNUNET_FS_indexing_init (cfg, dsh)) ||
4661        (GNUNET_OK != main_init (server, cfg)) )
4662     {    
4663       GNUNET_SCHEDULER_shutdown ();
4664       GNUNET_DATASTORE_disconnect (dsh, GNUNET_NO);
4665       dsh = NULL;
4666       GNUNET_DHT_disconnect (dht_handle);
4667       dht_handle = NULL;
4668       GNUNET_BLOCK_context_destroy (block_ctx);
4669       block_ctx = NULL;
4670       GNUNET_CONFIGURATION_destroy (block_cfg);
4671       block_cfg = NULL;
4672       GNUNET_LOAD_value_free (datastore_get_load);
4673       datastore_get_load = NULL;
4674       GNUNET_LOAD_value_free (datastore_put_load);
4675       datastore_put_load = NULL;
4676       return;   
4677     }
4678 }
4679
4680
4681 /**
4682  * The main function for the fs service.
4683  *
4684  * @param argc number of arguments from the command line
4685  * @param argv command line arguments
4686  * @return 0 ok, 1 on error
4687  */
4688 int
4689 main (int argc, char *const *argv)
4690 {
4691   return (GNUNET_OK ==
4692           GNUNET_SERVICE_run (argc,
4693                               argv,
4694                               "fs",
4695                               GNUNET_SERVICE_OPTION_NONE,
4696                               &run, NULL)) ? 0 : 1;
4697 }
4698
4699 /* end of gnunet-service-fs.c */