08e864a102e2b70c764e344df12d7b2fd5d2a0fe
[oweals/gnunet.git] / src / fs / gnunet-service-fs.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009, 2010 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file fs/gnunet-service-fs.c
23  * @brief gnunet anonymity protocol implementation
24  * @author Christian Grothoff
25  *
26  * TODO:
27  * - collect traffic data for anonymity levels > 1
28  * - implement transmission restrictions for anonymity level > 1
29  * - more statistics
30  */
31 #include "platform.h"
32 #include <float.h>
33 #include "gnunet_constants.h"
34 #include "gnunet_core_service.h"
35 #include "gnunet_dht_service.h"
36 #include "gnunet_datastore_service.h"
37 #include "gnunet_load_lib.h"
38 #include "gnunet_peer_lib.h"
39 #include "gnunet_protocols.h"
40 #include "gnunet_signatures.h"
41 #include "gnunet_statistics_service.h"
42 #include "gnunet_transport_service.h"
43 #include "gnunet_util_lib.h"
44 #include "gnunet-service-fs_indexing.h"
45 #include "fs.h"
46
47 #define DEBUG_FS GNUNET_NO
48
49 /**
50  * Should we introduce random latency in processing?  Required for proper
51  * implementation of GAP, but can be disabled for performance evaluation of
52  * the basic routing algorithm.
53  *
54  * Note that with delays enabled, performance can be significantly lower
55  * (several orders of magnitude in 2-peer test runs); if you want to
56  * measure throughput of other components, set this to NO.  Also, you
57  * might want to consider changing 'RETRY_PROBABILITY_INV' to 1 for
58  * a rather wasteful mode of operation (that might still get the highest
59  * throughput overall).
60  *
61  * Performance measurements (for 50 MB file, 2 peers):
62  *
63  * - Without delays: 3300 kb/s
64  * - With    delays:  101 kb/s
65  */
66 #define SUPPORT_DELAYS GNUNET_NO
67
68 /**
69  * Size for the hash map for DHT requests from the FS
70  * service.  Should be about the number of concurrent
71  * DHT requests we plan to make.
72  */
73 #define FS_DHT_HT_SIZE 1024
74
75 /**
76  * At what frequency should our datastore load decrease
77  * automatically (since if we don't use it, clearly the
78  * load must be going down).
79  */
80 #define DATASTORE_LOAD_AUTODECLINE GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS, 250)
81
82 /**
83  * How often do we flush trust values to disk?
84  */
85 #define TRUST_FLUSH_FREQ GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 5)
86
87 /**
88  * How quickly do we age cover traffic?  At the given 
89  * time interval, remaining cover traffic counters are
90  * decremented by 1/16th.
91  */
92 #define COVER_AGE_FREQUENCY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5)
93
94 /**
95  * How often do we at most PUT content into the DHT?
96  */
97 #define MAX_DHT_PUT_FREQ GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5)
98
99 /**
100  * Inverse of the probability that we will submit the same query
101  * to the same peer again.  If the same peer already got the query
102  * repeatedly recently, the probability is multiplied by the inverse
103  * of this number each time.  Note that we only try about every TTL_DECREMENT/2
104  * plus MAX_CORK_DELAY (so roughly every 3.5s).
105  *
106  * Note that this factor is a key influence to performance in small
107  * networks (especially test networks of 2 peers) because if there is
108  * only a single peer with the data, this value will determine how
109  * soon we might re-try.  For example, a value of 3 can result in 
110  * 1.7 MB/s transfer rates for a 10 MB file when a value of 1 would
111  * give us 5 MB/s.  OTOH, obviously re-trying the same peer can be
112  * rather inefficient in larger networks, hence picking 1 is in 
113  * general not the best choice.
114  *
115  * Performance measurements (for 50 MB file, 2 peers, no delays):
116  *
117  * - 1: 3300 kb/s (consistently)
118  * - 3: 2046 kb/s, 754 kb/s, 3490 kb/s
119  * - 5:  759 kb/s, 968 kb/s, 1160 kb/s
120  *
121  * Note that this does NOT mean that the value should be 1 since
122  * a 2-peer network is far from representative here (and this fails
123  * to take into consideration bandwidth wasted by repeatedly 
124  * sending queries to peers that don't have the content).  Also,
125  * it is expected that higher values lead to more inconsistent
126  * measurements since this only affects lost messages towards the
127  * end of the download.
128  *
129  * Finally, we should probably consider changing this and making
130  * it dependent on the number of connected peers or a related
131  * metric (bad magic constants...).
132  */
133 #define RETRY_PROBABILITY_INV 1
134
135 /**
136  * What is the maximum delay for a P2P FS message (in our interaction
137  * with core)?  FS-internal delays are another story.  The value is
138  * chosen based on the 32k block size.  Given that peers typcially
139  * have at least 1 kb/s bandwidth, 45s waits give us a chance to
140  * transmit one message even to the lowest-bandwidth peers.
141  */
142 #define MAX_TRANSMIT_DELAY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 45)
143
144 /**
145  * Maximum number of requests (from other peers, overall) that we're
146  * willing to have pending at any given point in time.  Can be changed
147  * via the configuration file (32k is just the default).
148  */
149 static unsigned long long max_pending_requests = (32 * 1024);
150
151
152 /**
153  * Information we keep for each pending reply.  The
154  * actual message follows at the end of this struct.
155  */
156 struct PendingMessage;
157
158 /**
159  * Function called upon completion of a transmission.
160  *
161  * @param cls closure
162  * @param pid ID of receiving peer, 0 on transmission error
163  */
164 typedef void (*TransmissionContinuation)(void * cls, 
165                                          GNUNET_PEER_Id tpid);
166
167
168 /**
169  * Information we keep for each pending message (GET/PUT).  The
170  * actual message follows at the end of this struct.
171  */
172 struct PendingMessage
173 {
174   /**
175    * This is a doubly-linked list of messages to the same peer.
176    */
177   struct PendingMessage *next;
178
179   /**
180    * This is a doubly-linked list of messages to the same peer.
181    */
182   struct PendingMessage *prev;
183
184   /**
185    * Entry in pending message list for this pending message.
186    */ 
187   struct PendingMessageList *pml;  
188
189   /**
190    * Function to call immediately once we have transmitted this
191    * message.
192    */
193   TransmissionContinuation cont;
194
195   /**
196    * Closure for cont.
197    */
198   void *cont_cls;
199
200   /**
201    * Do not transmit this pending message until this deadline.
202    */
203   struct GNUNET_TIME_Absolute delay_until;
204
205   /**
206    * Size of the reply; actual reply message follows
207    * at the end of this struct.
208    */
209   size_t msize;
210   
211   /**
212    * How important is this message for us?
213    */
214   uint32_t priority;
215  
216 };
217
218
219 /**
220  * Information about a peer that we are connected to.
221  * We track data that is useful for determining which
222  * peers should receive our requests.  We also keep
223  * a list of messages to transmit to this peer.
224  */
225 struct ConnectedPeer
226 {
227
228   /**
229    * List of the last clients for which this peer successfully
230    * answered a query.
231    */
232   struct GNUNET_SERVER_Client *last_client_replies[CS2P_SUCCESS_LIST_SIZE];
233
234   /**
235    * List of the last PIDs for which
236    * this peer successfully answered a query;
237    * We use 0 to indicate no successful reply.
238    */
239   GNUNET_PEER_Id last_p2p_replies[P2P_SUCCESS_LIST_SIZE];
240
241   /**
242    * Average delay between sending the peer a request and
243    * getting a reply (only calculated over the requests for
244    * which we actually got a reply).   Calculated
245    * as a moving average: new_delay = ((n-1)*last_delay+curr_delay) / n
246    */ 
247   struct GNUNET_TIME_Relative avg_delay;
248
249   /**
250    * Point in time until which this peer does not want us to migrate content
251    * to it.
252    */
253   struct GNUNET_TIME_Absolute migration_blocked;
254
255   /**
256    * Time until when we blocked this peer from migrating
257    * data to us.
258    */
259   struct GNUNET_TIME_Absolute last_migration_block;
260
261   /**
262    * Transmission times for the last MAX_QUEUE_PER_PEER
263    * requests for this peer.  Used as a ring buffer, current
264    * offset is stored in 'last_request_times_off'.  If the
265    * oldest entry is more recent than the 'avg_delay', we should
266    * not send any more requests right now.
267    */
268   struct GNUNET_TIME_Absolute last_request_times[MAX_QUEUE_PER_PEER];
269
270   /**
271    * Handle for an active request for transmission to this
272    * peer, or NULL.
273    */
274   struct GNUNET_CORE_TransmitHandle *cth;
275
276   /**
277    * Messages (replies, queries, content migration) we would like to
278    * send to this peer in the near future.  Sorted by priority, head.
279    */
280   struct PendingMessage *pending_messages_head;
281
282   /**
283    * Messages (replies, queries, content migration) we would like to
284    * send to this peer in the near future.  Sorted by priority, tail.
285    */
286   struct PendingMessage *pending_messages_tail;
287
288   /**
289    * How long does it typically take for us to transmit a message
290    * to this peer?  (delay between the request being issued and
291    * the callback being invoked).
292    */
293   struct GNUNET_LOAD_Value *transmission_delay;
294
295   /**
296    * Context of our GNUNET_CORE_peer_change_preference call (or NULL).
297    */
298   struct GNUNET_CORE_InformationRequestContext *irc;
299
300   /**
301    * Request for which 'irc' is currently active (or NULL).
302    */
303   struct PendingRequest *pr;
304
305   /**
306    * Time when the last transmission request was issued.
307    */
308   struct GNUNET_TIME_Absolute last_transmission_request_start;
309
310   /**
311    * ID of delay task for scheduling transmission.
312    */
313   GNUNET_SCHEDULER_TaskIdentifier delayed_transmission_request_task;
314
315   /**
316    * Average priority of successful replies.  Calculated
317    * as a moving average: new_avg = ((n-1)*last_avg+curr_prio) / n
318    */
319   double avg_priority;
320
321   /**
322    * Increase in traffic preference still to be submitted
323    * to the core service for this peer.
324    */
325   uint64_t inc_preference;
326
327   /**
328    * Trust rating for this peer
329    */
330   uint32_t trust;
331
332   /**
333    * Trust rating for this peer on disk.
334    */
335   uint32_t disk_trust;
336
337   /**
338    * The peer's identity.
339    */
340   GNUNET_PEER_Id pid;  
341
342   /**
343    * Size of the linked list of 'pending_messages'.
344    */
345   unsigned int pending_requests;
346
347   /**
348    * Which offset in "last_p2p_replies" will be updated next?
349    * (we go round-robin).
350    */
351   unsigned int last_p2p_replies_woff;
352
353   /**
354    * Which offset in "last_client_replies" will be updated next?
355    * (we go round-robin).
356    */
357   unsigned int last_client_replies_woff;
358
359   /**
360    * Current offset into 'last_request_times' ring buffer.
361    */
362   unsigned int last_request_times_off;
363
364 };
365
366
367 /**
368  * Information we keep for each pending request.  We should try to
369  * keep this struct as small as possible since its memory consumption
370  * is key to how many requests we can have pending at once.
371  */
372 struct PendingRequest;
373
374
375 /**
376  * Doubly-linked list of requests we are performing
377  * on behalf of the same client.
378  */
379 struct ClientRequestList
380 {
381
382   /**
383    * This is a doubly-linked list.
384    */
385   struct ClientRequestList *next;
386
387   /**
388    * This is a doubly-linked list.
389    */
390   struct ClientRequestList *prev;
391
392   /**
393    * Request this entry represents.
394    */
395   struct PendingRequest *req;
396
397   /**
398    * Client list this request belongs to.
399    */
400   struct ClientList *client_list;
401
402 };
403
404
405 /**
406  * Replies to be transmitted to the client.  The actual
407  * response message is allocated after this struct.
408  */
409 struct ClientResponseMessage
410 {
411   /**
412    * This is a doubly-linked list.
413    */
414   struct ClientResponseMessage *next;
415
416   /**
417    * This is a doubly-linked list.
418    */
419   struct ClientResponseMessage *prev;
420
421   /**
422    * Client list entry this response belongs to.
423    */
424   struct ClientList *client_list;
425
426   /**
427    * Number of bytes in the response.
428    */
429   size_t msize;
430 };
431
432
433 /**
434  * Linked list of clients we are performing requests
435  * for right now.
436  */
437 struct ClientList
438 {
439   /**
440    * This is a linked list.
441    */
442   struct ClientList *next;
443
444   /**
445    * ID of a client making a request, NULL if this entry is for a
446    * peer.
447    */
448   struct GNUNET_SERVER_Client *client;
449
450   /**
451    * Head of list of requests performed on behalf
452    * of this client right now.
453    */
454   struct ClientRequestList *rl_head;
455
456   /**
457    * Tail of list of requests performed on behalf
458    * of this client right now.
459    */
460   struct ClientRequestList *rl_tail;
461
462   /**
463    * Head of linked list of responses.
464    */
465   struct ClientResponseMessage *res_head;
466
467   /**
468    * Tail of linked list of responses.
469    */
470   struct ClientResponseMessage *res_tail;
471
472   /**
473    * Context for sending replies.
474    */
475   struct GNUNET_CONNECTION_TransmitHandle *th;
476
477 };
478
479
480 /**
481  * Information about a peer that we have forwarded this
482  * request to already.  
483  */
484 struct UsedTargetEntry
485 {
486   /**
487    * What was the last time we have transmitted this request to this
488    * peer?
489    */
490   struct GNUNET_TIME_Absolute last_request_time;
491
492   /**
493    * How often have we transmitted this request to this peer?
494    */
495   unsigned int num_requests;
496
497   /**
498    * PID of the target peer.
499    */
500   GNUNET_PEER_Id pid;
501
502 };
503
504
505 /**
506  * Doubly-linked list of messages we are performing
507  * due to a pending request.
508  */
509 struct PendingMessageList
510 {
511
512   /**
513    * This is a doubly-linked list of messages on behalf of the same request.
514    */
515   struct PendingMessageList *next;
516
517   /**
518    * This is a doubly-linked list of messages on behalf of the same request.
519    */
520   struct PendingMessageList *prev;
521
522   /**
523    * Message this entry represents.
524    */
525   struct PendingMessage *pm;
526
527   /**
528    * Request this entry belongs to.
529    */
530   struct PendingRequest *req;
531
532   /**
533    * Peer this message is targeted for.
534    */
535   struct ConnectedPeer *target;
536
537 };
538
539
540 /**
541  * Information we keep for each pending request.  We should try to
542  * keep this struct as small as possible since its memory consumption
543  * is key to how many requests we can have pending at once.
544  */
545 struct PendingRequest
546 {
547
548   /**
549    * If this request was made by a client, this is our entry in the
550    * client request list; otherwise NULL.
551    */
552   struct ClientRequestList *client_request_list;
553
554   /**
555    * Entry of peer responsible for this entry (if this request
556    * was made by a peer).
557    */
558   struct ConnectedPeer *cp;
559
560   /**
561    * If this is a namespace query, pointer to the hash of the public
562    * key of the namespace; otherwise NULL.  Pointer will be to the 
563    * end of this struct (so no need to free it).
564    */
565   const GNUNET_HashCode *namespace;
566
567   /**
568    * Bloomfilter we use to filter out replies that we don't care about
569    * (anymore).  NULL as long as we are interested in all replies.
570    */
571   struct GNUNET_CONTAINER_BloomFilter *bf;
572
573   /**
574    * Reference to DHT get operation for this request (or NULL).
575    */
576   struct GNUNET_DHT_GetHandle *dht_get;
577
578   /**
579    * Context of our GNUNET_CORE_peer_change_preference call.
580    */
581   struct ConnectedPeer *pirc;
582
583   /**
584    * Hash code of all replies that we have seen so far (only valid
585    * if client is not NULL since we only track replies like this for
586    * our own clients).
587    */
588   GNUNET_HashCode *replies_seen;
589
590   /**
591    * Node in the heap representing this entry; NULL
592    * if we have no heap node.
593    */
594   struct GNUNET_CONTAINER_HeapNode *hnode;
595
596   /**
597    * Head of list of messages being performed on behalf of this
598    * request.
599    */
600   struct PendingMessageList *pending_head;
601
602   /**
603    * Tail of list of messages being performed on behalf of this
604    * request.
605    */
606   struct PendingMessageList *pending_tail;
607
608   /**
609    * When did we first see this request (form this peer), or, if our
610    * client is initiating, when did we last initiate a search?
611    */
612   struct GNUNET_TIME_Absolute start_time;
613
614   /**
615    * The query that this request is for.
616    */
617   GNUNET_HashCode query;
618
619   /**
620    * The task responsible for transmitting queries
621    * for this request.
622    */
623   GNUNET_SCHEDULER_TaskIdentifier task;
624
625   /**
626    * (Interned) Peer identifier that identifies a preferred target
627    * for requests.
628    */
629   GNUNET_PEER_Id target_pid;
630
631   /**
632    * (Interned) Peer identifiers of peers that have already
633    * received our query for this content.
634    */
635   struct UsedTargetEntry *used_targets;
636   
637   /**
638    * Our entry in the queue (non-NULL while we wait for our
639    * turn to interact with the local database).
640    */
641   struct GNUNET_DATASTORE_QueueEntry *qe;
642
643   /**
644    * Size of the 'bf' (in bytes).
645    */
646   size_t bf_size;
647
648   /**
649    * Desired anonymity level; only valid for requests from a local client.
650    */
651   uint32_t anonymity_level;
652
653   /**
654    * How many entries in "used_targets" are actually valid?
655    */
656   unsigned int used_targets_off;
657
658   /**
659    * How long is the "used_targets" array?
660    */
661   unsigned int used_targets_size;
662
663   /**
664    * Number of results found for this request.
665    */
666   unsigned int results_found;
667
668   /**
669    * How many entries in "replies_seen" are actually valid?
670    */
671   unsigned int replies_seen_off;
672
673   /**
674    * How long is the "replies_seen" array?
675    */
676   unsigned int replies_seen_size;
677   
678   /**
679    * Priority with which this request was made.  If one of our clients
680    * made the request, then this is the current priority that we are
681    * using when initiating the request.  This value is used when
682    * we decide to reward other peers with trust for providing a reply.
683    */
684   uint32_t priority;
685
686   /**
687    * Priority points left for us to spend when forwarding this request
688    * to other peers.
689    */
690   uint32_t remaining_priority;
691
692   /**
693    * Number to mingle hashes for bloom-filter tests with.
694    */
695   int32_t mingle;
696
697   /**
698    * TTL with which we saw this request (or, if we initiated, TTL that
699    * we used for the request).
700    */
701   int32_t ttl;
702   
703   /**
704    * Type of the content that this request is for.
705    */
706   enum GNUNET_BLOCK_Type type;
707
708   /**
709    * Remove this request after transmission of the current response.
710    */
711   int8_t do_remove;
712
713   /**
714    * GNUNET_YES if we should not forward this request to other peers.
715    */
716   int8_t local_only;
717
718   /**
719    * GNUNET_YES if we should not forward this request to other peers.
720    */
721   int8_t forward_only;
722
723 };
724
725
726 /**
727  * Block that is ready for migration to other peers.  Actual data is at the end of the block.
728  */
729 struct MigrationReadyBlock
730 {
731
732   /**
733    * This is a doubly-linked list.
734    */
735   struct MigrationReadyBlock *next;
736
737   /**
738    * This is a doubly-linked list.
739    */
740   struct MigrationReadyBlock *prev;
741
742   /**
743    * Query for the block.
744    */
745   GNUNET_HashCode query;
746
747   /**
748    * When does this block expire? 
749    */
750   struct GNUNET_TIME_Absolute expiration;
751
752   /**
753    * Peers we would consider forwarding this
754    * block to.  Zero for empty entries.
755    */
756   GNUNET_PEER_Id target_list[MIGRATION_LIST_SIZE];
757
758   /**
759    * Size of the block.
760    */
761   size_t size;
762
763   /**
764    *  Number of targets already used.
765    */
766   unsigned int used_targets;
767
768   /**
769    * Type of the block.
770    */
771   enum GNUNET_BLOCK_Type type;
772 };
773
774 /**
775  * Identity of this peer.
776  */
777 static struct GNUNET_PeerIdentity my_id;
778
779 /**
780  * Our connection to the datastore.
781  */
782 static struct GNUNET_DATASTORE_Handle *dsh;
783
784 /**
785  * Our block context.
786  */
787 static struct GNUNET_BLOCK_Context *block_ctx;
788
789 /**
790  * Our block configuration.
791  */
792 static struct GNUNET_CONFIGURATION_Handle *block_cfg;
793
794 /**
795  * Our configuration.
796  */
797 static const struct GNUNET_CONFIGURATION_Handle *cfg;
798
799 /**
800  * Map of peer identifiers to "struct ConnectedPeer" (for that peer).
801  */
802 static struct GNUNET_CONTAINER_MultiHashMap *connected_peers;
803
804 /**
805  * Map of peer identifiers to "struct PendingRequest" (for that peer).
806  */
807 static struct GNUNET_CONTAINER_MultiHashMap *peer_request_map;
808
809 /**
810  * Map of query identifiers to "struct PendingRequest" (for that query).
811  */
812 static struct GNUNET_CONTAINER_MultiHashMap *query_request_map;
813
814 /**
815  * Heap with the request that will expire next at the top.  Contains
816  * pointers of type "struct PendingRequest*"; these will *also* be
817  * aliased from the "requests_by_peer" data structures and the
818  * "requests_by_query" table.  Note that requests from our clients
819  * don't expire and are thus NOT in the "requests_by_expiration"
820  * (or the "requests_by_peer" tables).
821  */
822 static struct GNUNET_CONTAINER_Heap *requests_by_expiration_heap;
823
824 /**
825  * Handle for reporting statistics.
826  */
827 static struct GNUNET_STATISTICS_Handle *stats;
828
829 /**
830  * Linked list of clients we are currently processing requests for.
831  */
832 static struct ClientList *client_list;
833
834 /**
835  * Pointer to handle to the core service (points to NULL until we've
836  * connected to it).
837  */
838 static struct GNUNET_CORE_Handle *core;
839
840 /**
841  * Head of linked list of blocks that can be migrated.
842  */
843 static struct MigrationReadyBlock *mig_head;
844
845 /**
846  * Tail of linked list of blocks that can be migrated.
847  */
848 static struct MigrationReadyBlock *mig_tail;
849
850 /**
851  * Request to datastore for migration (or NULL).
852  */
853 static struct GNUNET_DATASTORE_QueueEntry *mig_qe;
854
855 /**
856  * Request to datastore for DHT PUTs (or NULL).
857  */
858 static struct GNUNET_DATASTORE_QueueEntry *dht_qe;
859
860 /**
861  * Type we will request for the next DHT PUT round from the datastore.
862  */
863 static enum GNUNET_BLOCK_Type dht_put_type = GNUNET_BLOCK_TYPE_FS_KBLOCK;
864
865 /**
866  * Where do we store trust information?
867  */
868 static char *trustDirectory;
869
870 /**
871  * ID of task that collects blocks for migration.
872  */
873 static GNUNET_SCHEDULER_TaskIdentifier mig_task;
874
875 /**
876  * ID of task that collects blocks for DHT PUTs.
877  */
878 static GNUNET_SCHEDULER_TaskIdentifier dht_task;
879
880 /**
881  * What is the maximum frequency at which we are allowed to
882  * poll the datastore for migration content?
883  */
884 static struct GNUNET_TIME_Relative min_migration_delay;
885
886 /**
887  * Handle for DHT operations.
888  */
889 static struct GNUNET_DHT_Handle *dht_handle;
890
891 /**
892  * Size of the doubly-linked list of migration blocks.
893  */
894 static unsigned int mig_size;
895
896 /**
897  * Are we allowed to migrate content to this peer.
898  */
899 static int active_to_migration;
900
901 /**
902  * Are we allowed to push out content from this peer.
903  */
904 static int active_from_migration;
905
906 /**
907  * How many entires with zero anonymity do we currently estimate
908  * to have in the database?
909  */
910 static unsigned int zero_anonymity_count_estimate;
911
912 /**
913  * Typical priorities we're seeing from other peers right now.  Since
914  * most priorities will be zero, this value is the weighted average of
915  * non-zero priorities seen "recently".  In order to ensure that new
916  * values do not dramatically change the ratio, values are first
917  * "capped" to a reasonable range (+N of the current value) and then
918  * averaged into the existing value by a ratio of 1:N.  Hence
919  * receiving the largest possible priority can still only raise our
920  * "current_priorities" by at most 1.
921  */
922 static double current_priorities;
923
924 /**
925  * Datastore 'GET' load tracking.
926  */
927 static struct GNUNET_LOAD_Value *datastore_get_load;
928
929 /**
930  * Datastore 'PUT' load tracking.
931  */
932 static struct GNUNET_LOAD_Value *datastore_put_load;
933
934 /**
935  * How long do requests typically stay in the routing table?
936  */
937 static struct GNUNET_LOAD_Value *rt_entry_lifetime;
938
939 /**
940  * How many query messages have we received 'recently' that 
941  * have not yet been claimed as cover traffic?
942  */
943 static unsigned int cover_query_count;
944
945 /**
946  * How many content messages have we received 'recently' that 
947  * have not yet been claimed as cover traffic?
948  */
949 static unsigned int cover_content_count;
950
951 /**
952  * ID of our task that we use to age the cover counters.
953  */
954 static GNUNET_SCHEDULER_TaskIdentifier cover_age_task;
955
956 static void
957 age_cover_counters (void *cls,
958                     const struct GNUNET_SCHEDULER_TaskContext *tc)
959 {
960   cover_content_count = (cover_content_count * 15) / 16;
961   cover_query_count = (cover_query_count * 15) / 16;
962   cover_age_task = GNUNET_SCHEDULER_add_delayed (COVER_AGE_FREQUENCY,
963                                                  &age_cover_counters,
964                                                  NULL);
965 }
966
967 /**
968  * We've just now completed a datastore request.  Update our
969  * datastore load calculations.
970  *
971  * @param start time when the datastore request was issued
972  */
973 static void
974 update_datastore_delays (struct GNUNET_TIME_Absolute start)
975 {
976   struct GNUNET_TIME_Relative delay;
977
978   delay = GNUNET_TIME_absolute_get_duration (start);
979   GNUNET_LOAD_update (datastore_get_load,
980                       delay.rel_value);
981 }
982
983
984 /**
985  * Get the filename under which we would store the GNUNET_HELLO_Message
986  * for the given host and protocol.
987  * @return filename of the form DIRECTORY/HOSTID
988  */
989 static char *
990 get_trust_filename (const struct GNUNET_PeerIdentity *id)
991 {
992   struct GNUNET_CRYPTO_HashAsciiEncoded fil;
993   char *fn;
994
995   GNUNET_CRYPTO_hash_to_enc (&id->hashPubKey, &fil);
996   GNUNET_asprintf (&fn, "%s%s%s", trustDirectory, DIR_SEPARATOR_STR, &fil);
997   return fn;
998 }
999
1000
1001
1002 /**
1003  * Transmit messages by copying it to the target buffer
1004  * "buf".  "buf" will be NULL and "size" zero if the socket was closed
1005  * for writing in the meantime.  In that case, do nothing
1006  * (the disconnect or shutdown handler will take care of the rest).
1007  * If we were able to transmit messages and there are still more
1008  * pending, ask core again for further calls to this function.
1009  *
1010  * @param cls closure, pointer to the 'struct ConnectedPeer*'
1011  * @param size number of bytes available in buf
1012  * @param buf where the callee should write the message
1013  * @return number of bytes written to buf
1014  */
1015 static size_t
1016 transmit_to_peer (void *cls,
1017                   size_t size, void *buf);
1018
1019
1020 /* ******************* clean up functions ************************ */
1021
1022 /**
1023  * Delete the given migration block.
1024  *
1025  * @param mb block to delete
1026  */
1027 static void
1028 delete_migration_block (struct MigrationReadyBlock *mb)
1029 {
1030   GNUNET_CONTAINER_DLL_remove (mig_head,
1031                                mig_tail,
1032                                mb);
1033   GNUNET_PEER_decrement_rcs (mb->target_list,
1034                              MIGRATION_LIST_SIZE);
1035   mig_size--;
1036   GNUNET_free (mb);
1037 }
1038
1039
1040 /**
1041  * Compare the distance of two peers to a key.
1042  *
1043  * @param key key
1044  * @param p1 first peer
1045  * @param p2 second peer
1046  * @return GNUNET_YES if P1 is closer to key than P2
1047  */
1048 static int
1049 is_closer (const GNUNET_HashCode *key,
1050            const struct GNUNET_PeerIdentity *p1,
1051            const struct GNUNET_PeerIdentity *p2)
1052 {
1053   return GNUNET_CRYPTO_hash_xorcmp (&p1->hashPubKey,
1054                                     &p2->hashPubKey,
1055                                     key);
1056 }
1057
1058
1059 /**
1060  * Consider migrating content to a given peer.
1061  *
1062  * @param cls 'struct MigrationReadyBlock*' to select
1063  *            targets for (or NULL for none)
1064  * @param key ID of the peer 
1065  * @param value 'struct ConnectedPeer' of the peer
1066  * @return GNUNET_YES (always continue iteration)
1067  */
1068 static int
1069 consider_migration (void *cls,
1070                     const GNUNET_HashCode *key,
1071                     void *value)
1072 {
1073   struct MigrationReadyBlock *mb = cls;
1074   struct ConnectedPeer *cp = value;
1075   struct MigrationReadyBlock *pos;
1076   struct GNUNET_PeerIdentity cppid;
1077   struct GNUNET_PeerIdentity otherpid;
1078   struct GNUNET_PeerIdentity worstpid;
1079   size_t msize;
1080   unsigned int i;
1081   unsigned int repl;
1082   
1083   /* consider 'cp' as a migration target for mb */
1084   if (GNUNET_TIME_absolute_get_remaining (cp->migration_blocked).rel_value > 0)
1085     return GNUNET_YES; /* peer has requested no migration! */
1086   if (mb != NULL)
1087     {
1088       GNUNET_PEER_resolve (cp->pid,
1089                            &cppid);
1090       repl = MIGRATION_LIST_SIZE;
1091       for (i=0;i<MIGRATION_LIST_SIZE;i++)
1092         {
1093           if (mb->target_list[i] == 0)
1094             {
1095               mb->target_list[i] = cp->pid;
1096               GNUNET_PEER_change_rc (mb->target_list[i], 1);
1097               repl = MIGRATION_LIST_SIZE;
1098               break;
1099             }
1100           GNUNET_PEER_resolve (mb->target_list[i],
1101                                &otherpid);
1102           if ( (repl == MIGRATION_LIST_SIZE) &&
1103                is_closer (&mb->query,
1104                           &cppid,
1105                           &otherpid)) 
1106             {
1107               repl = i;
1108               worstpid = otherpid;
1109             }
1110           else if ( (repl != MIGRATION_LIST_SIZE) &&
1111                     (is_closer (&mb->query,
1112                                 &worstpid,
1113                                 &otherpid) ) )
1114             {
1115               repl = i;
1116               worstpid = otherpid;
1117             }       
1118         }
1119       if (repl != MIGRATION_LIST_SIZE) 
1120         {
1121           GNUNET_PEER_change_rc (mb->target_list[repl], -1);
1122           mb->target_list[repl] = cp->pid;
1123           GNUNET_PEER_change_rc (mb->target_list[repl], 1);
1124         }
1125     }
1126
1127   /* consider scheduling transmission to cp for content migration */
1128   if (cp->cth != NULL)        
1129     return GNUNET_YES; 
1130   msize = 0;
1131   pos = mig_head;
1132   while (pos != NULL)
1133     {
1134       for (i=0;i<MIGRATION_LIST_SIZE;i++)
1135         {
1136           if (cp->pid == pos->target_list[i])
1137             {
1138               if (msize == 0)
1139                 msize = pos->size;
1140               else
1141                 msize = GNUNET_MIN (msize,
1142                                     pos->size);
1143               break;
1144             }
1145         }
1146       pos = pos->next;
1147     }
1148   if (msize == 0)
1149     return GNUNET_YES; /* no content available */
1150 #if DEBUG_FS
1151   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1152               "Trying to migrate at least %u bytes to peer `%s'\n",
1153               msize,
1154               GNUNET_h2s (key));
1155 #endif
1156   if (cp->delayed_transmission_request_task != GNUNET_SCHEDULER_NO_TASK)
1157     {
1158       GNUNET_SCHEDULER_cancel (cp->delayed_transmission_request_task);
1159       cp->delayed_transmission_request_task = GNUNET_SCHEDULER_NO_TASK;
1160     }
1161   cp->cth 
1162     = GNUNET_CORE_notify_transmit_ready (core,
1163                                          0, GNUNET_TIME_UNIT_FOREVER_REL,
1164                                          (const struct GNUNET_PeerIdentity*) key,
1165                                          msize + sizeof (struct PutMessage),
1166                                          &transmit_to_peer,
1167                                          cp);
1168   return GNUNET_YES;
1169 }
1170
1171
1172 /**
1173  * Task that is run periodically to obtain blocks for content
1174  * migration
1175  * 
1176  * @param cls unused
1177  * @param tc scheduler context (also unused)
1178  */
1179 static void
1180 gather_migration_blocks (void *cls,
1181                          const struct GNUNET_SCHEDULER_TaskContext *tc);
1182
1183
1184
1185
1186 /**
1187  * Task that is run periodically to obtain blocks for DHT PUTs.
1188  * 
1189  * @param cls type of blocks to gather
1190  * @param tc scheduler context (unused)
1191  */
1192 static void
1193 gather_dht_put_blocks (void *cls,
1194                        const struct GNUNET_SCHEDULER_TaskContext *tc);
1195
1196
1197 /**
1198  * If the migration task is not currently running, consider
1199  * (re)scheduling it with the appropriate delay.
1200  */
1201 static void
1202 consider_migration_gathering ()
1203 {
1204   struct GNUNET_TIME_Relative delay;
1205
1206   if (dsh == NULL)
1207     return;
1208   if (mig_qe != NULL)
1209     return;
1210   if (mig_task != GNUNET_SCHEDULER_NO_TASK)
1211     return;
1212   delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
1213                                          mig_size);
1214   delay = GNUNET_TIME_relative_divide (delay,
1215                                        MAX_MIGRATION_QUEUE);
1216   delay = GNUNET_TIME_relative_max (delay,
1217                                     min_migration_delay);
1218   mig_task = GNUNET_SCHEDULER_add_delayed (delay,
1219                                            &gather_migration_blocks,
1220                                            NULL);
1221 }
1222
1223
1224 /**
1225  * If the DHT PUT gathering task is not currently running, consider
1226  * (re)scheduling it with the appropriate delay.
1227  */
1228 static void
1229 consider_dht_put_gathering (void *cls)
1230 {
1231   struct GNUNET_TIME_Relative delay;
1232
1233   if (dsh == NULL)
1234     return;
1235   if (dht_qe != NULL)
1236     return;
1237   if (dht_task != GNUNET_SCHEDULER_NO_TASK)
1238     return;
1239   if (zero_anonymity_count_estimate > 0)
1240     {
1241       delay = GNUNET_TIME_relative_divide (GNUNET_DHT_DEFAULT_REPUBLISH_FREQUENCY,
1242                                            zero_anonymity_count_estimate);
1243       delay = GNUNET_TIME_relative_min (delay,
1244                                         MAX_DHT_PUT_FREQ);
1245     }
1246   else
1247     {
1248       /* if we have NO zero-anonymity content yet, wait 5 minutes for some to
1249          (hopefully) appear */
1250       delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 5);
1251     }
1252   dht_task = GNUNET_SCHEDULER_add_delayed (delay,
1253                                            &gather_dht_put_blocks,
1254                                            cls);
1255 }
1256
1257
1258 /**
1259  * Process content offered for migration.
1260  *
1261  * @param cls closure
1262  * @param key key for the content
1263  * @param size number of bytes in data
1264  * @param data content stored
1265  * @param type type of the content
1266  * @param priority priority of the content
1267  * @param anonymity anonymity-level for the content
1268  * @param expiration expiration time for the content
1269  * @param uid unique identifier for the datum;
1270  *        maybe 0 if no unique identifier is available
1271  */
1272 static void
1273 process_migration_content (void *cls,
1274                            const GNUNET_HashCode * key,
1275                            size_t size,
1276                            const void *data,
1277                            enum GNUNET_BLOCK_Type type,
1278                            uint32_t priority,
1279                            uint32_t anonymity,
1280                            struct GNUNET_TIME_Absolute
1281                            expiration, uint64_t uid)
1282 {
1283   struct MigrationReadyBlock *mb;
1284   
1285   if (key == NULL)
1286     {
1287       mig_qe = NULL;
1288       if (mig_size < MAX_MIGRATION_QUEUE)  
1289         consider_migration_gathering ();
1290       return;
1291     }
1292   if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
1293     {
1294       if (GNUNET_OK !=
1295           GNUNET_FS_handle_on_demand_block (key, size, data,
1296                                             type, priority, anonymity,
1297                                             expiration, uid, 
1298                                             &process_migration_content,
1299                                             NULL))
1300         {
1301           GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
1302         }
1303       return;
1304     }
1305 #if DEBUG_FS
1306   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1307               "Retrieved block `%s' of type %u for migration\n",
1308               GNUNET_h2s (key),
1309               type);
1310 #endif
1311   mb = GNUNET_malloc (sizeof (struct MigrationReadyBlock) + size);
1312   mb->query = *key;
1313   mb->expiration = expiration;
1314   mb->size = size;
1315   mb->type = type;
1316   memcpy (&mb[1], data, size);
1317   GNUNET_CONTAINER_DLL_insert_after (mig_head,
1318                                      mig_tail,
1319                                      mig_tail,
1320                                      mb);
1321   mig_size++;
1322   GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
1323                                          &consider_migration,
1324                                          mb);
1325   GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
1326 }
1327
1328
1329 /**
1330  * Function called upon completion of the DHT PUT operation.
1331  */
1332 static void
1333 dht_put_continuation (void *cls,
1334                       const struct GNUNET_SCHEDULER_TaskContext *tc)
1335 {
1336   GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
1337 }
1338
1339
1340 /**
1341  * Store content in DHT.
1342  *
1343  * @param cls closure
1344  * @param key key for the content
1345  * @param size number of bytes in data
1346  * @param data content stored
1347  * @param type type of the content
1348  * @param priority priority of the content
1349  * @param anonymity anonymity-level for the content
1350  * @param expiration expiration time for the content
1351  * @param uid unique identifier for the datum;
1352  *        maybe 0 if no unique identifier is available
1353  */
1354 static void
1355 process_dht_put_content (void *cls,
1356                          const GNUNET_HashCode * key,
1357                          size_t size,
1358                          const void *data,
1359                          enum GNUNET_BLOCK_Type type,
1360                          uint32_t priority,
1361                          uint32_t anonymity,
1362                          struct GNUNET_TIME_Absolute
1363                          expiration, uint64_t uid)
1364
1365   static unsigned int counter;
1366   static GNUNET_HashCode last_vhash;
1367   static GNUNET_HashCode vhash;
1368
1369   if (key == NULL)
1370     {
1371       dht_qe = NULL;
1372       consider_dht_put_gathering (cls);
1373       return;
1374     }
1375   /* slightly funky code to estimate the total number of values with zero
1376      anonymity from the maximum observed length of a monotonically increasing 
1377      sequence of hashes over the contents */
1378   GNUNET_CRYPTO_hash (data, size, &vhash);
1379   if (GNUNET_CRYPTO_hash_cmp (&vhash, &last_vhash) <= 0)
1380     {
1381       if (zero_anonymity_count_estimate > 0)
1382         zero_anonymity_count_estimate /= 2;
1383       counter = 0;
1384     }
1385   last_vhash = vhash;
1386   if (counter < 31)
1387     counter++;
1388   if (zero_anonymity_count_estimate < (1 << counter))
1389     zero_anonymity_count_estimate = (1 << counter);
1390 #if DEBUG_FS
1391   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1392               "Retrieved block `%s' of type %u for DHT PUT\n",
1393               GNUNET_h2s (key),
1394               type);
1395 #endif
1396   GNUNET_DHT_put (dht_handle,
1397                   key,
1398                   DEFAULT_PUT_REPLICATION,
1399                   GNUNET_DHT_RO_NONE,
1400                   type,
1401                   size,
1402                   data,
1403                   expiration,
1404                   GNUNET_TIME_UNIT_FOREVER_REL,
1405                   &dht_put_continuation,
1406                   cls);
1407 }
1408
1409
1410 /**
1411  * Task that is run periodically to obtain blocks for content
1412  * migration
1413  * 
1414  * @param cls unused
1415  * @param tc scheduler context (also unused)
1416  */
1417 static void
1418 gather_migration_blocks (void *cls,
1419                          const struct GNUNET_SCHEDULER_TaskContext *tc)
1420 {
1421   mig_task = GNUNET_SCHEDULER_NO_TASK;
1422   if (dsh != NULL)
1423     {
1424       mig_qe = GNUNET_DATASTORE_get_random (dsh, 0, UINT_MAX,
1425                                             GNUNET_TIME_UNIT_FOREVER_REL,
1426                                             &process_migration_content, NULL);
1427       GNUNET_assert (mig_qe != NULL);
1428     }
1429 }
1430
1431
1432 /**
1433  * Task that is run periodically to obtain blocks for DHT PUTs.
1434  * 
1435  * @param cls type of blocks to gather
1436  * @param tc scheduler context (unused)
1437  */
1438 static void
1439 gather_dht_put_blocks (void *cls,
1440                        const struct GNUNET_SCHEDULER_TaskContext *tc)
1441 {
1442   dht_task = GNUNET_SCHEDULER_NO_TASK;
1443   if (dsh != NULL)
1444     {
1445       if (dht_put_type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
1446         dht_put_type = GNUNET_BLOCK_TYPE_FS_KBLOCK;
1447       dht_qe = GNUNET_DATASTORE_get_zero_anonymity (dsh, 0, UINT_MAX,
1448                                                     GNUNET_TIME_UNIT_FOREVER_REL,
1449                                                     dht_put_type++,
1450                                                     &process_dht_put_content, NULL);
1451       GNUNET_assert (dht_qe != NULL);
1452     }
1453 }
1454
1455
1456 /**
1457  * We're done with a particular message list entry.
1458  * Free all associated resources.
1459  * 
1460  * @param pml entry to destroy
1461  */
1462 static void
1463 destroy_pending_message_list_entry (struct PendingMessageList *pml)
1464 {
1465   GNUNET_CONTAINER_DLL_remove (pml->req->pending_head,
1466                                pml->req->pending_tail,
1467                                pml);
1468   GNUNET_CONTAINER_DLL_remove (pml->target->pending_messages_head,
1469                                pml->target->pending_messages_tail,
1470                                pml->pm);
1471   pml->target->pending_requests--;
1472   GNUNET_free (pml->pm);
1473   GNUNET_free (pml);
1474 }
1475
1476
1477 /**
1478  * Destroy the given pending message (and call the respective
1479  * continuation).
1480  *
1481  * @param pm message to destroy
1482  * @param tpid id of peer that the message was delivered to, or 0 for none
1483  */
1484 static void
1485 destroy_pending_message (struct PendingMessage *pm,
1486                          GNUNET_PEER_Id tpid)
1487 {
1488   struct PendingMessageList *pml = pm->pml;
1489   TransmissionContinuation cont;
1490   void *cont_cls;
1491
1492   cont = pm->cont;
1493   cont_cls = pm->cont_cls;
1494   if (pml != NULL)
1495     {
1496       GNUNET_assert (pml->pm == pm);
1497       GNUNET_assert ( (tpid == 0) || (tpid == pml->target->pid) );
1498       destroy_pending_message_list_entry (pml);
1499     }
1500   else
1501     {
1502       GNUNET_free (pm);
1503     }
1504   if (cont != NULL)
1505     cont (cont_cls, tpid);  
1506 }
1507
1508
1509 /**
1510  * We're done processing a particular request.
1511  * Free all associated resources.
1512  *
1513  * @param pr request to destroy
1514  */
1515 static void
1516 destroy_pending_request (struct PendingRequest *pr)
1517 {
1518   struct GNUNET_PeerIdentity pid;
1519   unsigned int i;
1520
1521   if (pr->hnode != NULL)
1522     {
1523       GNUNET_CONTAINER_heap_remove_node (requests_by_expiration_heap,
1524                                          pr->hnode);
1525       pr->hnode = NULL;
1526     }
1527   if (NULL == pr->client_request_list)
1528     {
1529       GNUNET_STATISTICS_update (stats,
1530                                 gettext_noop ("# P2P searches active"),
1531                                 -1,
1532                                 GNUNET_NO);
1533     }
1534   else
1535     {
1536       GNUNET_STATISTICS_update (stats,
1537                                 gettext_noop ("# client searches active"),
1538                                 -1,
1539                                 GNUNET_NO);
1540     }
1541   if (GNUNET_YES == 
1542       GNUNET_CONTAINER_multihashmap_remove (query_request_map,
1543                                             &pr->query,
1544                                             pr))
1545     {
1546       GNUNET_LOAD_update (rt_entry_lifetime,
1547                           GNUNET_TIME_absolute_get_duration (pr->start_time).rel_value);
1548     }
1549   if (pr->qe != NULL)
1550      {
1551       GNUNET_DATASTORE_cancel (pr->qe);
1552       pr->qe = NULL;
1553     }
1554   if (pr->dht_get != NULL)
1555     {
1556       GNUNET_DHT_get_stop (pr->dht_get);
1557       pr->dht_get = NULL;
1558     }
1559   if (pr->client_request_list != NULL)
1560     {
1561       GNUNET_CONTAINER_DLL_remove (pr->client_request_list->client_list->rl_head,
1562                                    pr->client_request_list->client_list->rl_tail,
1563                                    pr->client_request_list);
1564       GNUNET_free (pr->client_request_list);
1565       pr->client_request_list = NULL;
1566     }
1567   if (pr->cp != NULL)
1568     {
1569       GNUNET_PEER_resolve (pr->cp->pid,
1570                            &pid);
1571       (void) GNUNET_CONTAINER_multihashmap_remove (peer_request_map,
1572                                                    &pid.hashPubKey,
1573                                                    pr);
1574       pr->cp = NULL;
1575     }
1576   if (pr->bf != NULL)
1577     {
1578       GNUNET_CONTAINER_bloomfilter_free (pr->bf);                                        
1579       pr->bf = NULL;
1580     }
1581   if (pr->pirc != NULL)
1582     {
1583       GNUNET_CORE_peer_change_preference_cancel (pr->pirc->irc);
1584       pr->pirc->irc = NULL;
1585       pr->pirc = NULL;
1586     }
1587   if (pr->replies_seen != NULL)
1588     {
1589       GNUNET_free (pr->replies_seen);
1590       pr->replies_seen = NULL;
1591     }
1592   if (pr->task != GNUNET_SCHEDULER_NO_TASK)
1593     {
1594       GNUNET_SCHEDULER_cancel (pr->task);
1595       pr->task = GNUNET_SCHEDULER_NO_TASK;
1596     }
1597   while (NULL != pr->pending_head)    
1598     destroy_pending_message_list_entry (pr->pending_head);
1599   GNUNET_PEER_change_rc (pr->target_pid, -1);
1600   if (pr->used_targets != NULL)
1601     {
1602       for (i=0;i<pr->used_targets_off;i++)
1603         GNUNET_PEER_change_rc (pr->used_targets[i].pid, -1);
1604       GNUNET_free (pr->used_targets);
1605       pr->used_targets_off = 0;
1606       pr->used_targets_size = 0;
1607       pr->used_targets = NULL;
1608     }
1609   GNUNET_free (pr);
1610 }
1611
1612
1613 /**
1614  * Find latency information in 'atsi'.
1615  *
1616  * @param atsi performance data
1617  * @return connection latency
1618  */
1619 static struct GNUNET_TIME_Relative
1620 get_latency (const struct GNUNET_TRANSPORT_ATS_Information *atsi)
1621 {
1622   if (atsi == NULL)
1623     return GNUNET_TIME_UNIT_SECONDS;
1624   while ( (ntohl (atsi->type) != GNUNET_TRANSPORT_ATS_ARRAY_TERMINATOR) &&
1625           (ntohl (atsi->type) != GNUNET_TRANSPORT_ATS_QUALITY_NET_DELAY) )
1626     atsi++;
1627   if (ntohl (atsi->type) == GNUNET_TRANSPORT_ATS_ARRAY_TERMINATOR) 
1628     {
1629       GNUNET_break (0);
1630       /* how can we not have latency data? */
1631       return GNUNET_TIME_UNIT_SECONDS;
1632     }
1633   return GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
1634                                         ntohl (atsi->value));
1635 }
1636
1637
1638 /**
1639  * Method called whenever a given peer connects.
1640  *
1641  * @param cls closure, not used
1642  * @param peer peer identity this notification is about
1643  * @param atsi performance information
1644  */
1645 static void 
1646 peer_connect_handler (void *cls,
1647                       const struct
1648                       GNUNET_PeerIdentity * peer,
1649                       const struct GNUNET_TRANSPORT_ATS_Information *atsi)
1650 {
1651   struct ConnectedPeer *cp;
1652   struct MigrationReadyBlock *pos;
1653   char *fn;
1654   uint32_t trust;
1655   struct GNUNET_TIME_Relative latency;
1656
1657   if (0 == memcmp (&my_id, peer, sizeof (struct GNUNET_PeerIdentity)))
1658     return;
1659   latency = get_latency (atsi);
1660   cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
1661                                           &peer->hashPubKey);
1662   if (NULL != cp)
1663     {
1664       GNUNET_break (0);
1665       return;
1666     }
1667   cp = GNUNET_malloc (sizeof (struct ConnectedPeer));
1668   cp->transmission_delay = GNUNET_LOAD_value_init (latency);
1669   cp->pid = GNUNET_PEER_intern (peer);
1670
1671   fn = get_trust_filename (peer);
1672   if ((GNUNET_DISK_file_test (fn) == GNUNET_YES) &&
1673       (sizeof (trust) == GNUNET_DISK_fn_read (fn, &trust, sizeof (trust))))
1674     cp->disk_trust = cp->trust = ntohl (trust);
1675   GNUNET_free (fn);
1676
1677   GNUNET_break (GNUNET_OK ==
1678                 GNUNET_CONTAINER_multihashmap_put (connected_peers,
1679                                                    &peer->hashPubKey,
1680                                                    cp,
1681                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
1682
1683   pos = mig_head;
1684   while (NULL != pos)
1685     {
1686       (void) consider_migration (pos, &peer->hashPubKey, cp);
1687       pos = pos->next;
1688     }
1689 }
1690
1691
1692 /**
1693  * Method called whenever a given peer has a status change.
1694  *
1695  * @param cls closure
1696  * @param peer peer identity this notification is about
1697  * @param bandwidth_in available amount of inbound bandwidth
1698  * @param bandwidth_out available amount of outbound bandwidth
1699  * @param timeout absolute time when this peer will time out
1700  *        unless we see some further activity from it
1701  * @param atsi status information
1702  */
1703 static void
1704 peer_status_handler (void *cls,
1705                      const struct
1706                      GNUNET_PeerIdentity * peer,
1707                      struct GNUNET_BANDWIDTH_Value32NBO bandwidth_in,
1708                      struct GNUNET_BANDWIDTH_Value32NBO bandwidth_out,
1709                      struct GNUNET_TIME_Absolute timeout,
1710                      const struct GNUNET_TRANSPORT_ATS_Information *atsi)
1711 {
1712   struct ConnectedPeer *cp;
1713   struct GNUNET_TIME_Relative latency;
1714
1715   latency = get_latency (atsi);
1716   cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
1717                                           &peer->hashPubKey);
1718   if (cp == NULL)
1719     {
1720       GNUNET_break (0);
1721       return;
1722     }
1723   GNUNET_LOAD_value_set_decline (cp->transmission_delay,
1724                                  latency);  
1725 }
1726
1727
1728
1729 /**
1730  * Increase the host credit by a value.
1731  *
1732  * @param host which peer to change the trust value on
1733  * @param value is the int value by which the
1734  *  host credit is to be increased or decreased
1735  * @returns the actual change in trust (positive or negative)
1736  */
1737 static int
1738 change_host_trust (struct ConnectedPeer *host, int value)
1739 {
1740   if (value == 0)
1741     return 0;
1742   GNUNET_assert (host != NULL);
1743   if (value > 0)
1744     {
1745       if (host->trust + value < host->trust)
1746         {
1747           value = UINT32_MAX - host->trust;
1748           host->trust = UINT32_MAX;
1749         }
1750       else
1751         host->trust += value;
1752     }
1753   else
1754     {
1755       if (host->trust < -value)
1756         {
1757           value = -host->trust;
1758           host->trust = 0;
1759         }
1760       else
1761         host->trust += value;
1762     }
1763   return value;
1764 }
1765
1766
1767 /**
1768  * Write host-trust information to a file - flush the buffer entry!
1769  */
1770 static int
1771 flush_trust (void *cls,
1772              const GNUNET_HashCode *key,
1773              void *value)
1774 {
1775   struct ConnectedPeer *host = value;
1776   char *fn;
1777   uint32_t trust;
1778   struct GNUNET_PeerIdentity pid;
1779
1780   if (host->trust == host->disk_trust)
1781     return GNUNET_OK;                     /* unchanged */
1782   GNUNET_PEER_resolve (host->pid,
1783                        &pid);
1784   fn = get_trust_filename (&pid);
1785   if (host->trust == 0)
1786     {
1787       if ((0 != UNLINK (fn)) && (errno != ENOENT))
1788         GNUNET_log_strerror_file (GNUNET_ERROR_TYPE_WARNING |
1789                                   GNUNET_ERROR_TYPE_BULK, "unlink", fn);
1790     }
1791   else
1792     {
1793       trust = htonl (host->trust);
1794       if (sizeof(uint32_t) == GNUNET_DISK_fn_write (fn, &trust, 
1795                                                     sizeof(uint32_t),
1796                                                     GNUNET_DISK_PERM_USER_READ | GNUNET_DISK_PERM_USER_WRITE
1797                                                     | GNUNET_DISK_PERM_GROUP_READ | GNUNET_DISK_PERM_OTHER_READ))
1798         host->disk_trust = host->trust;
1799     }
1800   GNUNET_free (fn);
1801   return GNUNET_OK;
1802 }
1803
1804 /**
1805  * Call this method periodically to scan data/hosts for new hosts.
1806  */
1807 static void
1808 cron_flush_trust (void *cls,
1809                   const struct GNUNET_SCHEDULER_TaskContext *tc)
1810 {
1811
1812   if (NULL == connected_peers)
1813     return;
1814   GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
1815                                          &flush_trust,
1816                                          NULL);
1817   if (NULL == tc)
1818     return;
1819   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
1820     return;
1821   GNUNET_SCHEDULER_add_delayed (TRUST_FLUSH_FREQ, &cron_flush_trust, NULL);
1822 }
1823
1824
1825 /**
1826  * Free (each) request made by the peer.
1827  *
1828  * @param cls closure, points to peer that the request belongs to
1829  * @param key current key code
1830  * @param value value in the hash map
1831  * @return GNUNET_YES (we should continue to iterate)
1832  */
1833 static int
1834 destroy_request (void *cls,
1835                  const GNUNET_HashCode * key,
1836                  void *value)
1837 {
1838   const struct GNUNET_PeerIdentity * peer = cls;
1839   struct PendingRequest *pr = value;
1840   
1841   GNUNET_break (GNUNET_YES ==
1842                 GNUNET_CONTAINER_multihashmap_remove (peer_request_map,
1843                                                       &peer->hashPubKey,
1844                                                       pr));
1845   destroy_pending_request (pr);
1846   return GNUNET_YES;
1847 }
1848
1849
1850 /**
1851  * Method called whenever a peer disconnects.
1852  *
1853  * @param cls closure, not used
1854  * @param peer peer identity this notification is about
1855  */
1856 static void
1857 peer_disconnect_handler (void *cls,
1858                          const struct
1859                          GNUNET_PeerIdentity * peer)
1860 {
1861   struct ConnectedPeer *cp;
1862   struct PendingMessage *pm;
1863   unsigned int i;
1864   struct MigrationReadyBlock *pos;
1865   struct MigrationReadyBlock *next;
1866
1867   if (0 == memcmp (&my_id, peer, sizeof (struct GNUNET_PeerIdentity)))
1868     return;
1869   GNUNET_CONTAINER_multihashmap_get_multiple (peer_request_map,
1870                                               &peer->hashPubKey,
1871                                               &destroy_request,
1872                                               (void*) peer);
1873   cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
1874                                           &peer->hashPubKey);
1875   if (cp == NULL)
1876     return;
1877   for (i=0;i<CS2P_SUCCESS_LIST_SIZE;i++)
1878     {
1879       if (NULL != cp->last_client_replies[i])
1880         {
1881           GNUNET_SERVER_client_drop (cp->last_client_replies[i]);
1882           cp->last_client_replies[i] = NULL;
1883         }
1884     }
1885   GNUNET_break (GNUNET_YES ==
1886                 GNUNET_CONTAINER_multihashmap_remove (connected_peers,
1887                                                       &peer->hashPubKey,
1888                                                       cp));
1889   if (cp->irc != NULL)
1890     {
1891       GNUNET_CORE_peer_change_preference_cancel (cp->irc);
1892       cp->irc = NULL;
1893       cp->pr->pirc = NULL;
1894       cp->pr = NULL;
1895     }
1896
1897   /* remove this peer from migration considerations; schedule
1898      alternatives */
1899   next = mig_head;
1900   while (NULL != (pos = next))
1901     {
1902       next = pos->next;
1903       for (i=0;i<MIGRATION_LIST_SIZE;i++)
1904         {
1905           if (pos->target_list[i] == cp->pid)
1906             {
1907               GNUNET_PEER_change_rc (pos->target_list[i], -1);
1908               pos->target_list[i] = 0;
1909             }
1910          }
1911       if (pos->used_targets >= GNUNET_CONTAINER_multihashmap_size (connected_peers))
1912         {
1913           delete_migration_block (pos);
1914           consider_migration_gathering ();
1915           continue;
1916         }
1917       GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
1918                                              &consider_migration,
1919                                              pos);
1920     }
1921   GNUNET_PEER_change_rc (cp->pid, -1);
1922   GNUNET_PEER_decrement_rcs (cp->last_p2p_replies, P2P_SUCCESS_LIST_SIZE);
1923   if (NULL != cp->cth)
1924     {
1925       GNUNET_CORE_notify_transmit_ready_cancel (cp->cth);
1926       cp->cth = NULL;
1927     }
1928   if (cp->delayed_transmission_request_task != GNUNET_SCHEDULER_NO_TASK)
1929     {
1930       GNUNET_SCHEDULER_cancel (cp->delayed_transmission_request_task);
1931       cp->delayed_transmission_request_task = GNUNET_SCHEDULER_NO_TASK;
1932     }
1933   while (NULL != (pm = cp->pending_messages_head))
1934     destroy_pending_message (pm, 0 /* delivery failed */);
1935   GNUNET_LOAD_value_free (cp->transmission_delay);
1936   GNUNET_break (0 == cp->pending_requests);
1937   GNUNET_free (cp);
1938 }
1939
1940
1941 /**
1942  * Iterator over hash map entries that removes all occurences
1943  * of the given 'client' from the 'last_client_replies' of the
1944  * given connected peer.
1945  *
1946  * @param cls closure, the 'struct GNUNET_SERVER_Client*' to remove
1947  * @param key current key code (unused)
1948  * @param value value in the hash map (the 'struct ConnectedPeer*' to change)
1949  * @return GNUNET_YES (we should continue to iterate)
1950  */
1951 static int
1952 remove_client_from_last_client_replies (void *cls,
1953                                         const GNUNET_HashCode * key,
1954                                         void *value)
1955 {
1956   struct GNUNET_SERVER_Client *client = cls;
1957   struct ConnectedPeer *cp = value;
1958   unsigned int i;
1959
1960   for (i=0;i<CS2P_SUCCESS_LIST_SIZE;i++)
1961     {
1962       if (cp->last_client_replies[i] == client)
1963         {
1964           GNUNET_SERVER_client_drop (cp->last_client_replies[i]);
1965           cp->last_client_replies[i] = NULL;
1966         }
1967     }  
1968   return GNUNET_YES;
1969 }
1970
1971
1972 /**
1973  * A client disconnected.  Remove all of its pending queries.
1974  *
1975  * @param cls closure, NULL
1976  * @param client identification of the client
1977  */
1978 static void
1979 handle_client_disconnect (void *cls,
1980                           struct GNUNET_SERVER_Client
1981                           * client)
1982 {
1983   struct ClientList *pos;
1984   struct ClientList *prev;
1985   struct ClientRequestList *rcl;
1986   struct ClientResponseMessage *creply;
1987
1988   if (client == NULL)
1989     return;
1990   prev = NULL;
1991   pos = client_list;
1992   while ( (NULL != pos) &&
1993           (pos->client != client) )
1994     {
1995       prev = pos;
1996       pos = pos->next;
1997     }
1998   if (pos == NULL)
1999     return; /* no requests pending for this client */
2000   while (NULL != (rcl = pos->rl_head))
2001     {
2002       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
2003                   "Destroying pending request `%s' on disconnect\n",
2004                   GNUNET_h2s (&rcl->req->query));
2005       destroy_pending_request (rcl->req);
2006     }
2007   if (prev == NULL)
2008     client_list = pos->next;
2009   else
2010     prev->next = pos->next;
2011   if (pos->th != NULL)
2012     {
2013       GNUNET_CONNECTION_notify_transmit_ready_cancel (pos->th);
2014       pos->th = NULL;
2015     }
2016   while (NULL != (creply = pos->res_head))
2017     {
2018       GNUNET_CONTAINER_DLL_remove (pos->res_head,
2019                                    pos->res_tail,
2020                                    creply);
2021       GNUNET_free (creply);
2022     }    
2023   GNUNET_SERVER_client_drop (pos->client);
2024   GNUNET_free (pos);
2025   GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
2026                                          &remove_client_from_last_client_replies,
2027                                          client);
2028 }
2029
2030
2031 /**
2032  * Iterator to free peer entries.
2033  *
2034  * @param cls closure, unused
2035  * @param key current key code
2036  * @param value value in the hash map (peer entry)
2037  * @return GNUNET_YES (we should continue to iterate)
2038  */
2039 static int 
2040 clean_peer (void *cls,
2041             const GNUNET_HashCode * key,
2042             void *value)
2043 {
2044   peer_disconnect_handler (NULL, (const struct GNUNET_PeerIdentity*) key);
2045   return GNUNET_YES;
2046 }
2047
2048
2049 /**
2050  * Task run during shutdown.
2051  *
2052  * @param cls unused
2053  * @param tc unused
2054  */
2055 static void
2056 shutdown_task (void *cls,
2057                const struct GNUNET_SCHEDULER_TaskContext *tc)
2058 {
2059   if (mig_qe != NULL)
2060     {
2061       GNUNET_DATASTORE_cancel (mig_qe);
2062       mig_qe = NULL;
2063     }
2064   if (dht_qe != NULL)
2065     {
2066       GNUNET_DATASTORE_cancel (dht_qe);
2067       dht_qe = NULL;
2068     }
2069   if (GNUNET_SCHEDULER_NO_TASK != mig_task)
2070     {
2071       GNUNET_SCHEDULER_cancel (mig_task);
2072       mig_task = GNUNET_SCHEDULER_NO_TASK;
2073     }
2074   if (GNUNET_SCHEDULER_NO_TASK != dht_task)
2075     {
2076       GNUNET_SCHEDULER_cancel (dht_task);
2077       dht_task = GNUNET_SCHEDULER_NO_TASK;
2078     }
2079   while (client_list != NULL)
2080     handle_client_disconnect (NULL,
2081                               client_list->client);
2082   cron_flush_trust (NULL, NULL);
2083   GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
2084                                          &clean_peer,
2085                                          NULL);
2086   GNUNET_break (0 == GNUNET_CONTAINER_heap_get_size (requests_by_expiration_heap));
2087   GNUNET_CONTAINER_heap_destroy (requests_by_expiration_heap);
2088   requests_by_expiration_heap = 0;
2089   GNUNET_CONTAINER_multihashmap_destroy (connected_peers);
2090   connected_peers = NULL;
2091   GNUNET_break (0 == GNUNET_CONTAINER_multihashmap_size (query_request_map));
2092   GNUNET_CONTAINER_multihashmap_destroy (query_request_map);
2093   query_request_map = NULL;
2094   GNUNET_LOAD_value_free (rt_entry_lifetime);
2095   rt_entry_lifetime = NULL;
2096   GNUNET_break (0 == GNUNET_CONTAINER_multihashmap_size (peer_request_map));
2097   GNUNET_CONTAINER_multihashmap_destroy (peer_request_map);
2098   peer_request_map = NULL;
2099   GNUNET_assert (NULL != core);
2100   GNUNET_CORE_disconnect (core);
2101   core = NULL;
2102   if (stats != NULL)
2103     {
2104       GNUNET_STATISTICS_destroy (stats, GNUNET_NO);
2105       stats = NULL;
2106     }
2107   if (dsh != NULL)
2108     {
2109       GNUNET_DATASTORE_disconnect (dsh,
2110                                    GNUNET_NO);
2111       dsh = NULL;
2112     }
2113   while (mig_head != NULL)
2114     delete_migration_block (mig_head);
2115   GNUNET_assert (0 == mig_size);
2116   GNUNET_DHT_disconnect (dht_handle);
2117   dht_handle = NULL;
2118   GNUNET_LOAD_value_free (datastore_get_load);
2119   datastore_get_load = NULL;
2120   GNUNET_LOAD_value_free (datastore_put_load);
2121   datastore_put_load = NULL;
2122   GNUNET_BLOCK_context_destroy (block_ctx);
2123   block_ctx = NULL;
2124   GNUNET_CONFIGURATION_destroy (block_cfg);
2125   block_cfg = NULL;
2126   cfg = NULL;  
2127   GNUNET_free_non_null (trustDirectory);
2128   trustDirectory = NULL;
2129   GNUNET_SCHEDULER_cancel (cover_age_task);
2130   cover_age_task = GNUNET_SCHEDULER_NO_TASK;
2131 }
2132
2133
2134 /* ******************* Utility functions  ******************** */
2135
2136
2137 /**
2138  * We've had to delay a request for transmission to core, but now
2139  * we should be ready.  Run it.
2140  *
2141  * @param cls the 'struct ConnectedPeer' for which a request was delayed
2142  * @param tc task context (unused)
2143  */
2144 static void
2145 delayed_transmission_request (void *cls,
2146                               const struct GNUNET_SCHEDULER_TaskContext *tc)
2147 {
2148   struct ConnectedPeer *cp = cls;
2149   struct GNUNET_PeerIdentity pid;
2150   struct PendingMessage *pm;
2151
2152   pm = cp->pending_messages_head;
2153   cp->delayed_transmission_request_task = GNUNET_SCHEDULER_NO_TASK;
2154   GNUNET_assert (cp->cth == NULL);
2155   if (pm == NULL)
2156     return;
2157   GNUNET_PEER_resolve (cp->pid,
2158                        &pid);
2159   cp->last_transmission_request_start = GNUNET_TIME_absolute_get ();
2160   cp->cth = GNUNET_CORE_notify_transmit_ready (core,
2161                                                pm->priority,
2162                                                GNUNET_CONSTANTS_SERVICE_TIMEOUT,
2163                                                &pid,
2164                                                pm->msize,
2165                                                &transmit_to_peer,
2166                                                cp);
2167 }
2168
2169
2170 /**
2171  * Transmit messages by copying it to the target buffer
2172  * "buf".  "buf" will be NULL and "size" zero if the socket was closed
2173  * for writing in the meantime.  In that case, do nothing
2174  * (the disconnect or shutdown handler will take care of the rest).
2175  * If we were able to transmit messages and there are still more
2176  * pending, ask core again for further calls to this function.
2177  *
2178  * @param cls closure, pointer to the 'struct ConnectedPeer*'
2179  * @param size number of bytes available in buf
2180  * @param buf where the callee should write the message
2181  * @return number of bytes written to buf
2182  */
2183 static size_t
2184 transmit_to_peer (void *cls,
2185                   size_t size, void *buf)
2186 {
2187   struct ConnectedPeer *cp = cls;
2188   char *cbuf = buf;
2189   struct PendingMessage *pm;
2190   struct PendingMessage *next_pm;
2191   struct GNUNET_TIME_Absolute now;
2192   struct GNUNET_TIME_Relative min_delay;
2193   struct MigrationReadyBlock *mb;
2194   struct MigrationReadyBlock *next;
2195   struct PutMessage migm;
2196   size_t msize;
2197   unsigned int i;
2198   struct GNUNET_PeerIdentity pid;
2199  
2200   cp->cth = NULL;
2201   if (NULL == buf)
2202     {
2203 #if DEBUG_FS
2204       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2205                   "Dropping message, core too busy.\n");
2206 #endif
2207       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2208                   "Dropping message, core too busy.\n");
2209       GNUNET_LOAD_update (cp->transmission_delay,
2210                           UINT64_MAX);
2211       
2212       if (NULL != (pm = cp->pending_messages_head))
2213         {
2214           GNUNET_CONTAINER_DLL_remove (cp->pending_messages_head,
2215                                        cp->pending_messages_tail,
2216                                        pm);
2217           cp->pending_requests--;    
2218           destroy_pending_message (pm, 0);
2219         }
2220       if (NULL != (pm = cp->pending_messages_head))
2221         {
2222           GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == cp->delayed_transmission_request_task);
2223           min_delay = GNUNET_TIME_absolute_get_remaining (pm->delay_until);
2224           cp->delayed_transmission_request_task
2225             = GNUNET_SCHEDULER_add_delayed (min_delay,
2226                                             &delayed_transmission_request,
2227                                             cp);
2228         }
2229       return 0;
2230     }  
2231   GNUNET_LOAD_update (cp->transmission_delay,
2232                       GNUNET_TIME_absolute_get_duration (cp->last_transmission_request_start).rel_value);
2233   now = GNUNET_TIME_absolute_get ();
2234   msize = 0;
2235   min_delay = GNUNET_TIME_UNIT_FOREVER_REL;
2236   next_pm = cp->pending_messages_head;
2237   while ( (NULL != (pm = next_pm) ) &&
2238           (pm->msize <= size) )
2239     {
2240       next_pm = pm->next;
2241       if (pm->delay_until.abs_value > now.abs_value)
2242         {
2243           min_delay = GNUNET_TIME_relative_min (min_delay,
2244                                                 GNUNET_TIME_absolute_get_remaining (pm->delay_until));
2245           continue;
2246         }
2247       memcpy (&cbuf[msize], &pm[1], pm->msize);
2248       msize += pm->msize;
2249       size -= pm->msize;
2250       if (NULL == pm->pml)
2251         {
2252           GNUNET_CONTAINER_DLL_remove (cp->pending_messages_head,
2253                                        cp->pending_messages_tail,
2254                                        pm);
2255           cp->pending_requests--;
2256         }
2257       destroy_pending_message (pm, cp->pid);
2258     }
2259   if (pm != NULL)
2260     min_delay = GNUNET_TIME_UNIT_ZERO;
2261   if (NULL != cp->pending_messages_head)
2262     {     
2263       GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == cp->delayed_transmission_request_task);
2264       cp->delayed_transmission_request_task
2265         = GNUNET_SCHEDULER_add_delayed (min_delay,
2266                                         &delayed_transmission_request,
2267                                         cp);
2268     }
2269   if (pm == NULL)
2270     {      
2271       GNUNET_PEER_resolve (cp->pid,
2272                            &pid);
2273       next = mig_head;
2274       while (NULL != (mb = next))
2275         {
2276           next = mb->next;
2277           for (i=0;i<MIGRATION_LIST_SIZE;i++)
2278             {
2279               if ( (cp->pid == mb->target_list[i]) &&
2280                    (mb->size + sizeof (migm) <= size) )
2281                 {
2282                   GNUNET_PEER_change_rc (mb->target_list[i], -1);
2283                   mb->target_list[i] = 0;
2284                   mb->used_targets++;
2285                   memset (&migm, 0, sizeof (migm));
2286                   migm.header.size = htons (sizeof (migm) + mb->size);
2287                   migm.header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
2288                   migm.type = htonl (mb->type);
2289                   migm.expiration = GNUNET_TIME_absolute_hton (mb->expiration);
2290                   memcpy (&cbuf[msize], &migm, sizeof (migm));
2291                   msize += sizeof (migm);
2292                   size -= sizeof (migm);
2293                   memcpy (&cbuf[msize], &mb[1], mb->size);
2294                   msize += mb->size;
2295                   size -= mb->size;
2296 #if DEBUG_FS
2297                   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2298                               "Pushing migration block `%s' (%u bytes) to `%s'\n",
2299                               GNUNET_h2s (&mb->query),
2300                               (unsigned int) mb->size,
2301                               GNUNET_i2s (&pid));
2302 #endif    
2303                   break;
2304                 }
2305               else
2306                 {
2307 #if DEBUG_FS
2308                   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2309                               "Migration block `%s' (%u bytes) is not on migration list for peer `%s'\n",
2310                               GNUNET_h2s (&mb->query),
2311                               (unsigned int) mb->size,
2312                               GNUNET_i2s (&pid));
2313 #endif    
2314                 }
2315             }
2316           if ( (mb->used_targets >= MIGRATION_TARGET_COUNT) ||
2317                (mb->used_targets >= GNUNET_CONTAINER_multihashmap_size (connected_peers)) )
2318             {
2319               delete_migration_block (mb);
2320               consider_migration_gathering ();
2321             }
2322         }
2323       consider_migration (NULL, 
2324                           &pid.hashPubKey,
2325                           cp);
2326     }
2327 #if DEBUG_FS
2328   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2329               "Transmitting %u bytes to peer with PID %u\n",
2330               (unsigned int) msize,
2331               (unsigned int) cp->pid);
2332 #endif
2333   return msize;
2334 }
2335
2336
2337 /**
2338  * Add a message to the set of pending messages for the given peer.
2339  *
2340  * @param cp peer to send message to
2341  * @param pm message to queue
2342  * @param pr request on which behalf this message is being queued
2343  */
2344 static void
2345 add_to_pending_messages_for_peer (struct ConnectedPeer *cp,
2346                                   struct PendingMessage *pm,
2347                                   struct PendingRequest *pr)
2348 {
2349   struct PendingMessage *pos;
2350   struct PendingMessageList *pml;
2351   struct GNUNET_PeerIdentity pid;
2352
2353   GNUNET_assert (pm->next == NULL);
2354   GNUNET_assert (pm->pml == NULL);    
2355   if (pr != NULL)
2356     {
2357       pml = GNUNET_malloc (sizeof (struct PendingMessageList));
2358       pml->req = pr;
2359       pml->target = cp;
2360       pml->pm = pm;
2361       pm->pml = pml;  
2362       GNUNET_CONTAINER_DLL_insert (pr->pending_head,
2363                                    pr->pending_tail,
2364                                    pml);
2365     }
2366   pos = cp->pending_messages_head;
2367   while ( (pos != NULL) &&
2368           (pm->priority < pos->priority) )
2369     pos = pos->next;    
2370   GNUNET_CONTAINER_DLL_insert_after (cp->pending_messages_head,
2371                                      cp->pending_messages_tail,
2372                                      pos,
2373                                      pm);
2374   cp->pending_requests++;
2375   if (cp->pending_requests > MAX_QUEUE_PER_PEER)
2376     {
2377       GNUNET_STATISTICS_update (stats,
2378                                 gettext_noop ("# P2P searches discarded (queue length bound)"),
2379                                 1,
2380                                 GNUNET_NO);
2381       destroy_pending_message (cp->pending_messages_tail, 0);  
2382     }
2383   GNUNET_PEER_resolve (cp->pid, &pid);
2384   if (NULL != cp->cth)
2385     {
2386       GNUNET_CORE_notify_transmit_ready_cancel (cp->cth);
2387       cp->cth = NULL;
2388     }
2389   if (cp->delayed_transmission_request_task != GNUNET_SCHEDULER_NO_TASK)
2390     {
2391       GNUNET_SCHEDULER_cancel (cp->delayed_transmission_request_task);
2392       cp->delayed_transmission_request_task = GNUNET_SCHEDULER_NO_TASK;
2393     }
2394   /* need to schedule transmission */
2395   cp->last_transmission_request_start = GNUNET_TIME_absolute_get ();
2396   cp->cth = GNUNET_CORE_notify_transmit_ready (core,
2397                                                cp->pending_messages_head->priority,
2398                                                MAX_TRANSMIT_DELAY,
2399                                                &pid,
2400                                                cp->pending_messages_head->msize,
2401                                                &transmit_to_peer,
2402                                                cp);
2403   if (cp->cth == NULL)
2404     {
2405 #if DEBUG_FS
2406       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2407                   "Failed to schedule transmission with core!\n");
2408 #endif
2409       GNUNET_STATISTICS_update (stats,
2410                                 gettext_noop ("# CORE transmission failures"),
2411                                 1,
2412                                 GNUNET_NO);
2413     }
2414 }
2415
2416
2417 /**
2418  * Test if the DATABASE (GET) load on this peer is too high
2419  * to even consider processing the query at
2420  * all.  
2421  * 
2422  * @return GNUNET_YES if the load is too high to do anything (load high)
2423  *         GNUNET_NO to process normally (load normal)
2424  *         GNUNET_SYSERR to process for free (load low)
2425  */
2426 static int
2427 test_get_load_too_high (uint32_t priority)
2428 {
2429   double ld;
2430
2431   ld = GNUNET_LOAD_get_load (datastore_get_load);
2432   if (ld < 1)
2433     return GNUNET_SYSERR;    
2434   if (ld <= priority)    
2435     return GNUNET_NO;    
2436   return GNUNET_YES;
2437 }
2438
2439
2440
2441
2442 /**
2443  * Test if the DATABASE (PUT) load on this peer is too high
2444  * to even consider processing the query at
2445  * all.  
2446  * 
2447  * @return GNUNET_YES if the load is too high to do anything (load high)
2448  *         GNUNET_NO to process normally (load normal or low)
2449  */
2450 static int
2451 test_put_load_too_high (uint32_t priority)
2452 {
2453   double ld;
2454
2455   if (GNUNET_LOAD_get_average (datastore_put_load) < 50)
2456     return GNUNET_NO; /* very fast */
2457   ld = GNUNET_LOAD_get_load (datastore_put_load);
2458   if (ld < 2.0 * (1 + priority))
2459     return GNUNET_NO;
2460   GNUNET_STATISTICS_update (stats,
2461                             gettext_noop ("# storage requests dropped due to high load"),
2462                             1,
2463                             GNUNET_NO);
2464   return GNUNET_YES;
2465 }
2466
2467
2468 /* ******************* Pending Request Refresh Task ******************** */
2469
2470
2471
2472 /**
2473  * We use a random delay to make the timing of requests less
2474  * predictable.  This function returns such a random delay.  We add a base
2475  * delay of MAX_CORK_DELAY (1s).
2476  *
2477  * FIXME: make schedule dependent on the specifics of the request?
2478  * Or bandwidth and number of connected peers and load?
2479  *
2480  * @return random delay to use for some request, between 1s and 1000+TTL_DECREMENT ms
2481  */
2482 static struct GNUNET_TIME_Relative
2483 get_processing_delay ()
2484 {
2485   return 
2486     GNUNET_TIME_relative_add (GNUNET_CONSTANTS_MAX_CORK_DELAY,
2487                               GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
2488                                                              GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
2489                                                                                        TTL_DECREMENT)));
2490 }
2491
2492
2493 /**
2494  * We're processing a GET request from another peer and have decided
2495  * to forward it to other peers.  This function is called periodically
2496  * and should forward the request to other peers until we have all
2497  * possible replies.  If we have transmitted the *only* reply to
2498  * the initiator we should destroy the pending request.  If we have
2499  * many replies in the queue to the initiator, we should delay sending
2500  * out more queries until the reply queue has shrunk some.
2501  *
2502  * @param cls our "struct ProcessGetContext *"
2503  * @param tc unused
2504  */
2505 static void
2506 forward_request_task (void *cls,
2507                       const struct GNUNET_SCHEDULER_TaskContext *tc);
2508
2509
2510 /**
2511  * Function called after we either failed or succeeded
2512  * at transmitting a query to a peer.  
2513  *
2514  * @param cls the requests "struct PendingRequest*"
2515  * @param tpid ID of receiving peer, 0 on transmission error
2516  */
2517 static void
2518 transmit_query_continuation (void *cls,
2519                              GNUNET_PEER_Id tpid)
2520 {
2521   struct PendingRequest *pr = cls;
2522   unsigned int i;
2523
2524   GNUNET_STATISTICS_update (stats,
2525                             gettext_noop ("# queries scheduled for forwarding"),
2526                             -1,
2527                             GNUNET_NO);
2528   if (tpid == 0)   
2529     {
2530 #if DEBUG_FS
2531       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2532                   "Transmission of request failed, will try again later.\n");
2533 #endif
2534       if (pr->task == GNUNET_SCHEDULER_NO_TASK)
2535         pr->task = GNUNET_SCHEDULER_add_delayed (get_processing_delay (),
2536                                                  &forward_request_task,
2537                                                  pr); 
2538       return;    
2539     }
2540 #if DEBUG_FS
2541   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2542               "Transmitted query `%s'\n",
2543               GNUNET_h2s (&pr->query));
2544 #endif
2545   GNUNET_STATISTICS_update (stats,
2546                             gettext_noop ("# queries forwarded"),
2547                             1,
2548                             GNUNET_NO);
2549   for (i=0;i<pr->used_targets_off;i++)
2550     if (pr->used_targets[i].pid == tpid)
2551       break; /* found match! */    
2552   if (i == pr->used_targets_off)
2553     {
2554       /* need to create new entry */
2555       if (pr->used_targets_off == pr->used_targets_size)
2556         GNUNET_array_grow (pr->used_targets,
2557                            pr->used_targets_size,
2558                            pr->used_targets_size * 2 + 2);
2559       GNUNET_PEER_change_rc (tpid, 1);
2560       pr->used_targets[pr->used_targets_off].pid = tpid;
2561       pr->used_targets[pr->used_targets_off].num_requests = 0;
2562       i = pr->used_targets_off++;
2563     }
2564   pr->used_targets[i].last_request_time = GNUNET_TIME_absolute_get ();
2565   pr->used_targets[i].num_requests++;
2566   if (pr->task == GNUNET_SCHEDULER_NO_TASK)
2567     pr->task = GNUNET_SCHEDULER_add_delayed (get_processing_delay (),
2568                                              &forward_request_task,
2569                                              pr);
2570 }
2571
2572
2573 /**
2574  * How many bytes should a bloomfilter be if we have already seen
2575  * entry_count responses?  Note that BLOOMFILTER_K gives us the number
2576  * of bits set per entry.  Furthermore, we should not re-size the
2577  * filter too often (to keep it cheap).
2578  *
2579  * Since other peers will also add entries but not resize the filter,
2580  * we should generally pick a slightly larger size than what the
2581  * strict math would suggest.
2582  *
2583  * @return must be a power of two and smaller or equal to 2^15.
2584  */
2585 static size_t
2586 compute_bloomfilter_size (unsigned int entry_count)
2587 {
2588   size_t size;
2589   unsigned int ideal = (entry_count * BLOOMFILTER_K) / 4;
2590   uint16_t max = 1 << 15;
2591
2592   if (entry_count > max)
2593     return max;
2594   size = 8;
2595   while ((size < max) && (size < ideal))
2596     size *= 2;
2597   if (size > max)
2598     return max;
2599   return size;
2600 }
2601
2602
2603 /**
2604  * Recalculate our bloom filter for filtering replies.  This function
2605  * will create a new bloom filter from scratch, so it should only be
2606  * called if we have no bloomfilter at all (and hence can create a
2607  * fresh one of minimal size without problems) OR if our peer is the
2608  * initiator (in which case we may resize to larger than mimimum size).
2609  *
2610  * @param pr request for which the BF is to be recomputed
2611  */
2612 static void
2613 refresh_bloomfilter (struct PendingRequest *pr)
2614 {
2615   unsigned int i;
2616   size_t nsize;
2617   GNUNET_HashCode mhash;
2618
2619   nsize = compute_bloomfilter_size (pr->replies_seen_off);
2620   if (nsize == pr->bf_size)
2621     return; /* size not changed */
2622   if (pr->bf != NULL)
2623     GNUNET_CONTAINER_bloomfilter_free (pr->bf);
2624   pr->bf_size = nsize;
2625   pr->mingle = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, -1);
2626   pr->bf = GNUNET_CONTAINER_bloomfilter_init (NULL, 
2627                                               pr->bf_size,
2628                                               BLOOMFILTER_K);
2629   for (i=0;i<pr->replies_seen_off;i++)
2630     {
2631       GNUNET_BLOCK_mingle_hash (&pr->replies_seen[i],
2632                                 pr->mingle,
2633                                 &mhash);
2634       GNUNET_CONTAINER_bloomfilter_add (pr->bf, &mhash);
2635     }
2636 }
2637
2638
2639 /**
2640  * Function called after we've tried to reserve a certain amount of
2641  * bandwidth for a reply.  Check if we succeeded and if so send our
2642  * query.
2643  *
2644  * @param cls the requests "struct PendingRequest*"
2645  * @param peer identifies the peer
2646  * @param bpm_out set to the current bandwidth limit (sending) for this peer
2647  * @param amount set to the amount that was actually reserved or unreserved
2648  * @param preference current traffic preference for the given peer
2649  */
2650 static void
2651 target_reservation_cb (void *cls,
2652                        const struct
2653                        GNUNET_PeerIdentity * peer,
2654                        struct GNUNET_BANDWIDTH_Value32NBO bpm_out,
2655                        int amount,
2656                        uint64_t preference)
2657 {
2658   struct PendingRequest *pr = cls;
2659   struct ConnectedPeer *cp;
2660   struct PendingMessage *pm;
2661   struct GetMessage *gm;
2662   GNUNET_HashCode *ext;
2663   char *bfdata;
2664   size_t msize;
2665   unsigned int k;
2666   int no_route;
2667   uint32_t bm;
2668   unsigned int i;
2669
2670   /* (3) transmit, update ttl/priority */
2671   cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
2672                                           &peer->hashPubKey);
2673   if (cp == NULL)
2674     {
2675       /* Peer must have just left */
2676 #if DEBUG_FS
2677       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2678                   "Selected peer disconnected!\n");
2679 #endif
2680       if (pr->task == GNUNET_SCHEDULER_NO_TASK)
2681         pr->task = GNUNET_SCHEDULER_add_delayed (get_processing_delay (),
2682                                                  &forward_request_task,
2683                                                  pr);
2684       return;
2685     }
2686   cp->irc = NULL;
2687   pr->pirc = NULL;
2688   if (peer == NULL)
2689     {
2690       /* error in communication with core, try again later */
2691       if (pr->task == GNUNET_SCHEDULER_NO_TASK)
2692         pr->task = GNUNET_SCHEDULER_add_delayed (get_processing_delay (),
2693                                                  &forward_request_task,
2694                                                  pr);
2695       return;
2696     }
2697   no_route = GNUNET_NO;
2698   if (amount == 0)
2699     {
2700       if (pr->cp == NULL)
2701         {
2702 #if DEBUG_FS > 1
2703           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2704                       "Failed to reserve bandwidth for reply (got %d/%u bytes only)!\n",
2705                       amount,
2706                       DBLOCK_SIZE);
2707 #endif
2708           GNUNET_STATISTICS_update (stats,
2709                                     gettext_noop ("# reply bandwidth reservation requests failed"),
2710                                     1,
2711                                     GNUNET_NO);
2712           if (pr->task == GNUNET_SCHEDULER_NO_TASK)
2713             pr->task = GNUNET_SCHEDULER_add_delayed (get_processing_delay (),
2714                                                      &forward_request_task,
2715                                                      pr);
2716           return;  /* this target round failed */
2717         }
2718       no_route = GNUNET_YES;
2719     }
2720   
2721   GNUNET_STATISTICS_update (stats,
2722                             gettext_noop ("# queries scheduled for forwarding"),
2723                             1,
2724                             GNUNET_NO);
2725   for (i=0;i<pr->used_targets_off;i++)
2726     if (pr->used_targets[i].pid == cp->pid) 
2727       {
2728         GNUNET_STATISTICS_update (stats,
2729                                   gettext_noop ("# queries retransmitted to same target"),
2730                                   1,
2731                                   GNUNET_NO);
2732         break;
2733       } 
2734
2735   /* build message and insert message into priority queue */
2736 #if DEBUG_FS
2737   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2738               "Forwarding request `%s' to `%4s'!\n",
2739               GNUNET_h2s (&pr->query),
2740               GNUNET_i2s (peer));
2741 #endif
2742   k = 0;
2743   bm = 0;
2744   if (GNUNET_YES == no_route)
2745     {
2746       bm |= GET_MESSAGE_BIT_RETURN_TO;
2747       k++;      
2748     }
2749   if (pr->namespace != NULL)
2750     {
2751       bm |= GET_MESSAGE_BIT_SKS_NAMESPACE;
2752       k++;
2753     }
2754   if (pr->target_pid != 0)
2755     {
2756       bm |= GET_MESSAGE_BIT_TRANSMIT_TO;
2757       k++;
2758     }
2759   msize = sizeof (struct GetMessage) + pr->bf_size + k * sizeof(GNUNET_HashCode);
2760   GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE);
2761   pm = GNUNET_malloc (sizeof (struct PendingMessage) + msize);
2762   pm->msize = msize;
2763   gm = (struct GetMessage*) &pm[1];
2764   gm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_GET);
2765   gm->header.size = htons (msize);
2766   gm->type = htonl (pr->type);
2767   pr->remaining_priority /= 2;
2768   gm->priority = htonl (pr->remaining_priority);
2769   gm->ttl = htonl (pr->ttl);
2770   gm->filter_mutator = htonl(pr->mingle); 
2771   gm->hash_bitmap = htonl (bm);
2772   gm->query = pr->query;
2773   ext = (GNUNET_HashCode*) &gm[1];
2774   k = 0;
2775   if (GNUNET_YES == no_route)
2776     GNUNET_PEER_resolve (pr->cp->pid, (struct GNUNET_PeerIdentity*) &ext[k++]);
2777   if (pr->namespace != NULL)
2778     memcpy (&ext[k++], pr->namespace, sizeof (GNUNET_HashCode));
2779   if (pr->target_pid != 0)
2780     GNUNET_PEER_resolve (pr->target_pid, (struct GNUNET_PeerIdentity*) &ext[k++]);
2781   bfdata = (char *) &ext[k];
2782   if (pr->bf != NULL)
2783     GNUNET_CONTAINER_bloomfilter_get_raw_data (pr->bf,
2784                                                bfdata,
2785                                                pr->bf_size);
2786   pm->cont = &transmit_query_continuation;
2787   pm->cont_cls = pr;
2788   cp->last_request_times[(cp->last_request_times_off++) % MAX_QUEUE_PER_PEER] = GNUNET_TIME_absolute_get ();
2789   add_to_pending_messages_for_peer (cp, pm, pr);
2790 }
2791
2792
2793 /**
2794  * Closure used for "target_peer_select_cb".
2795  */
2796 struct PeerSelectionContext 
2797 {
2798   /**
2799    * The request for which we are selecting
2800    * peers.
2801    */
2802   struct PendingRequest *pr;
2803
2804   /**
2805    * Current "prime" target.
2806    */
2807   struct GNUNET_PeerIdentity target;
2808
2809   /**
2810    * How much do we like this target?
2811    */
2812   double target_score;
2813
2814   /**
2815    * Does it make sense to we re-try quickly again?
2816    */
2817   int fast_retry;
2818
2819 };
2820
2821
2822 /**
2823  * Function called for each connected peer to determine
2824  * which one(s) would make good targets for forwarding.
2825  *
2826  * @param cls closure (struct PeerSelectionContext)
2827  * @param key current key code (peer identity)
2828  * @param value value in the hash map (struct ConnectedPeer)
2829  * @return GNUNET_YES if we should continue to
2830  *         iterate,
2831  *         GNUNET_NO if not.
2832  */
2833 static int
2834 target_peer_select_cb (void *cls,
2835                        const GNUNET_HashCode * key,
2836                        void *value)
2837 {
2838   struct PeerSelectionContext *psc = cls;
2839   struct ConnectedPeer *cp = value;
2840   struct PendingRequest *pr = psc->pr;
2841   struct GNUNET_TIME_Relative delay;
2842   double score;
2843   unsigned int i;
2844   unsigned int pc;
2845
2846   /* 1) check that this peer is not the initiator */
2847   if (cp == pr->cp)     
2848     {
2849 #if DEBUG_FS
2850       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2851                   "Skipping initiator in forwarding selection\n");
2852 #endif
2853       return GNUNET_YES; /* skip */        
2854     }
2855   if (cp->irc != NULL)
2856     {
2857       psc->fast_retry = GNUNET_YES;
2858       return GNUNET_YES; /* skip: already querying core about this peer for other reasons */
2859     }
2860
2861   /* 2) check if we have already (recently) forwarded to this peer */
2862   /* 2a) this particular request */
2863   pc = 0;
2864   for (i=0;i<pr->used_targets_off;i++)
2865     if (pr->used_targets[i].pid == cp->pid) 
2866       {
2867         pc = pr->used_targets[i].num_requests;
2868         GNUNET_assert (pc > 0);
2869         /* FIXME: make re-enabling a peer independent of how often
2870            this function is called??? */
2871         if (0 != GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
2872                                            RETRY_PROBABILITY_INV * pc))
2873           {
2874 #if DEBUG_FS
2875             GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2876                         "NOT re-trying query that was previously transmitted %u times\n",
2877                         (unsigned int) pc);
2878 #endif
2879             return GNUNET_YES; /* skip */
2880           }
2881         break;
2882       }
2883 #if DEBUG_FS
2884   if (0 < pc)
2885     {
2886       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
2887                   "Re-trying query that was previously transmitted %u times to this peer\n",
2888                   (unsigned int) pc);
2889     }
2890 #endif
2891   /* 2b) many other requests to this peer */
2892   delay = GNUNET_TIME_absolute_get_duration (cp->last_request_times[cp->last_request_times_off % MAX_QUEUE_PER_PEER]);
2893   if (delay.rel_value <= cp->avg_delay.rel_value)
2894     {
2895 #if DEBUG_FS
2896       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2897                   "NOT sending query since we send %u others to this peer in the last %llums\n",
2898                   MAX_QUEUE_PER_PEER,
2899                   cp->avg_delay.rel_value);
2900 #endif
2901       return GNUNET_YES; /* skip */      
2902     }
2903
2904   /* 3) calculate how much we'd like to forward to this peer,
2905      starting with a random value that is strong enough
2906      to at least give any peer a chance sometimes 
2907      (compared to the other factors that come later) */
2908   /* 3a) count successful (recent) routes from cp for same source */
2909   if (pr->cp != NULL)
2910     {
2911       score = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
2912                                         P2P_SUCCESS_LIST_SIZE);
2913       for (i=0;i<P2P_SUCCESS_LIST_SIZE;i++)
2914         if (cp->last_p2p_replies[i] == pr->cp->pid)
2915           score += 1.0; /* likely successful based on hot path */
2916     }
2917   else
2918     {
2919       score = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
2920                                         CS2P_SUCCESS_LIST_SIZE);
2921       for (i=0;i<CS2P_SUCCESS_LIST_SIZE;i++)
2922         if (cp->last_client_replies[i] == pr->client_request_list->client_list->client)
2923           score += 1.0; /* likely successful based on hot path */
2924     }
2925   /* 3b) include latency */
2926   if (cp->avg_delay.rel_value < 4 * TTL_DECREMENT)
2927     score += 1.0; /* likely fast based on latency */
2928   /* 3c) include priorities */
2929   if (cp->avg_priority <= pr->remaining_priority / 2.0)
2930     score += 1.0; /* likely successful based on priorities */
2931   /* 3d) penalize for queue size */  
2932   score -= (2.0 * cp->pending_requests / (double) MAX_QUEUE_PER_PEER); 
2933   /* 3e) include peer proximity */
2934   score -= (2.0 * (GNUNET_CRYPTO_hash_distance_u32 (key,
2935                                                     &pr->query)) / (double) UINT32_MAX);
2936   /* 4) super-bonus for being the known target */
2937   if (pr->target_pid == cp->pid)
2938     score += 100.0;
2939   /* store best-fit in closure */
2940   score++; /* avoid zero */
2941   if (score > psc->target_score)
2942     {
2943       psc->target_score = score;
2944       psc->target.hashPubKey = *key; 
2945     }
2946 #if DEBUG_FS
2947   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2948               "Peer `%s' gets score %f for forwarding query, max is %8f\n",
2949               GNUNET_h2s (key),
2950               score,
2951               psc->target_score);
2952 #endif
2953   return GNUNET_YES;
2954 }
2955   
2956
2957 /**
2958  * The priority level imposes a bound on the maximum
2959  * value for the ttl that can be requested.
2960  *
2961  * @param ttl_in requested ttl
2962  * @param prio given priority
2963  * @return ttl_in if ttl_in is below the limit,
2964  *         otherwise the ttl-limit for the given priority
2965  */
2966 static int32_t
2967 bound_ttl (int32_t ttl_in, uint32_t prio)
2968 {
2969   unsigned long long allowed;
2970
2971   if (ttl_in <= 0)
2972     return ttl_in;
2973   allowed = ((unsigned long long) prio) * TTL_DECREMENT / 1000; 
2974   if (ttl_in > allowed)      
2975     {
2976       if (allowed >= (1 << 30))
2977         return 1 << 30;
2978       return allowed;
2979     }
2980   return ttl_in;
2981 }
2982
2983
2984 /**
2985  * Iterator called on each result obtained for a DHT
2986  * operation that expects a reply
2987  *
2988  * @param cls closure
2989  * @param exp when will this value expire
2990  * @param key key of the result
2991  * @param get_path NULL-terminated array of pointers
2992  *                 to the peers on reverse GET path (or NULL if not recorded)
2993  * @param put_path NULL-terminated array of pointers
2994  *                 to the peers on the PUT path (or NULL if not recorded)
2995  * @param type type of the result
2996  * @param size number of bytes in data
2997  * @param data pointer to the result data
2998  */
2999 static void
3000 process_dht_reply (void *cls,
3001                    struct GNUNET_TIME_Absolute exp,
3002                    const GNUNET_HashCode * key,
3003                    const struct GNUNET_PeerIdentity * const *get_path,
3004                    const struct GNUNET_PeerIdentity * const *put_path,
3005                    enum GNUNET_BLOCK_Type type,
3006                    size_t size,
3007                    const void *data);
3008
3009
3010 /**
3011  * We're processing a GET request and have decided
3012  * to forward it to other peers.  This function is called periodically
3013  * and should forward the request to other peers until we have all
3014  * possible replies.  If we have transmitted the *only* reply to
3015  * the initiator we should destroy the pending request.  If we have
3016  * many replies in the queue to the initiator, we should delay sending
3017  * out more queries until the reply queue has shrunk some.
3018  *
3019  * @param cls our "struct ProcessGetContext *"
3020  * @param tc unused
3021  */
3022 static void
3023 forward_request_task (void *cls,
3024                      const struct GNUNET_SCHEDULER_TaskContext *tc)
3025 {
3026   struct PendingRequest *pr = cls;
3027   struct PeerSelectionContext psc;
3028   struct ConnectedPeer *cp; 
3029   struct GNUNET_TIME_Relative delay;
3030
3031   pr->task = GNUNET_SCHEDULER_NO_TASK;
3032   if (pr->pirc != NULL)
3033     {
3034 #if DEBUG_FS
3035       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3036                   "Forwarding of query `%s' not attempted due to pending local lookup!\n",
3037                   GNUNET_h2s (&pr->query));
3038 #endif
3039       return; /* already pending */
3040     }
3041   if (GNUNET_YES == pr->local_only)
3042     return; /* configured to not do P2P search */
3043   /* (0) try DHT */
3044   if ( (0 == pr->anonymity_level) &&
3045        (GNUNET_YES != pr->forward_only) &&
3046        (pr->type != GNUNET_BLOCK_TYPE_FS_DBLOCK) &&
3047        (pr->type != GNUNET_BLOCK_TYPE_FS_IBLOCK) )
3048     {
3049       pr->dht_get = GNUNET_DHT_get_start (dht_handle,
3050                                           GNUNET_TIME_UNIT_FOREVER_REL,
3051                                           pr->type,
3052                                           &pr->query,
3053                                           DEFAULT_GET_REPLICATION,
3054                                           GNUNET_DHT_RO_NONE,
3055                                           pr->bf,
3056                                           pr->mingle,
3057                                           pr->namespace,
3058                                           (pr->namespace != NULL) ? sizeof (GNUNET_HashCode) : 0,
3059                                           &process_dht_reply,
3060                                           pr);
3061     }
3062
3063   if ( (pr->anonymity_level > 1) &&
3064        (cover_query_count < pr->anonymity_level - 1) )
3065     {
3066       delay = get_processing_delay ();
3067 #if DEBUG_FS 
3068       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3069                   "Not enough cover traffic to forward query `%s', will try again in %llu ms!\n",
3070                   GNUNET_h2s (&pr->query),
3071                   delay.rel_value);
3072 #endif
3073       pr->task = GNUNET_SCHEDULER_add_delayed (delay,
3074                                                &forward_request_task,
3075                                                pr);
3076       return;
3077     }
3078   /* consume cover traffic */
3079   if (pr->anonymity_level > 1) 
3080     cover_query_count -= pr->anonymity_level - 1;
3081
3082   /* (1) select target */
3083   psc.pr = pr;
3084   psc.target_score = -DBL_MAX;
3085   psc.fast_retry = GNUNET_NO;
3086   GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
3087                                          &target_peer_select_cb,
3088                                          &psc);  
3089   if (psc.target_score == -DBL_MAX)
3090     {
3091       if (psc.fast_retry == GNUNET_YES)
3092         delay = GNUNET_TIME_UNIT_MILLISECONDS; /* FIXME: store adaptive fast-retry value in 'pr' */
3093       else
3094         delay = get_processing_delay ();
3095 #if DEBUG_FS 
3096       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3097                   "No peer selected for forwarding of query `%s', will try again in %llu ms!\n",
3098                   GNUNET_h2s (&pr->query),
3099                   delay.rel_value);
3100 #endif
3101       pr->task = GNUNET_SCHEDULER_add_delayed (delay,
3102                                                &forward_request_task,
3103                                                pr);
3104       return; /* nobody selected */
3105     }
3106   /* (3) update TTL/priority */
3107   if (pr->client_request_list != NULL)
3108     {
3109       /* FIXME: use better algorithm!? */
3110       if (0 == GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
3111                                          4))
3112         pr->priority++;
3113       /* bound priority we use by priorities we see from other peers
3114          rounded up (must round up so that we can see non-zero
3115          priorities, but round up as little as possible to make it
3116          plausible that we forwarded another peers request) */
3117       if (pr->priority > current_priorities + 1.0)
3118         pr->priority = (uint32_t) current_priorities + 1.0;
3119       pr->ttl = bound_ttl (pr->ttl + TTL_DECREMENT * 2,
3120                            pr->priority);
3121 #if DEBUG_FS
3122       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3123                   "Trying query `%s' with priority %u and TTL %d.\n",
3124                   GNUNET_h2s (&pr->query),
3125                   pr->priority,
3126                   pr->ttl);
3127 #endif
3128     }
3129
3130   /* (3) reserve reply bandwidth */
3131   if (GNUNET_NO == pr->forward_only)
3132     {
3133       cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
3134                                               &psc.target.hashPubKey);
3135       GNUNET_assert (NULL != cp);
3136       GNUNET_assert (cp->irc == NULL);
3137       pr->pirc = cp;
3138       cp->pr = pr;
3139       cp->irc = GNUNET_CORE_peer_change_preference (core,
3140                                                     &psc.target,
3141                                                     GNUNET_CONSTANTS_SERVICE_TIMEOUT, 
3142                                                     GNUNET_BANDWIDTH_value_init (UINT32_MAX),
3143                                                     DBLOCK_SIZE * 2, 
3144                                                     cp->inc_preference,
3145                                                     &target_reservation_cb,
3146                                                     pr);
3147       GNUNET_assert (cp->irc != NULL);
3148       cp->inc_preference = 0;
3149     }
3150   else
3151     {
3152       /* force forwarding */
3153       static struct GNUNET_BANDWIDTH_Value32NBO zerobw;
3154       target_reservation_cb (pr, &psc.target,
3155                              zerobw, 0, 0.0);
3156     }
3157 }
3158
3159
3160 /* **************************** P2P PUT Handling ************************ */
3161
3162
3163 /**
3164  * Function called after we either failed or succeeded
3165  * at transmitting a reply to a peer.  
3166  *
3167  * @param cls the requests "struct PendingRequest*"
3168  * @param tpid ID of receiving peer, 0 on transmission error
3169  */
3170 static void
3171 transmit_reply_continuation (void *cls,
3172                              GNUNET_PEER_Id tpid)
3173 {
3174   struct PendingRequest *pr = cls;
3175   
3176   switch (pr->type)
3177     {
3178     case GNUNET_BLOCK_TYPE_FS_DBLOCK:
3179     case GNUNET_BLOCK_TYPE_FS_IBLOCK:
3180       /* only one reply expected, done with the request! */
3181       destroy_pending_request (pr);
3182       break;
3183     case GNUNET_BLOCK_TYPE_ANY:
3184     case GNUNET_BLOCK_TYPE_FS_KBLOCK:
3185     case GNUNET_BLOCK_TYPE_FS_SBLOCK:
3186       break;
3187     default:
3188       GNUNET_break (0);
3189       break;
3190     }
3191 }
3192
3193
3194 /**
3195  * Transmit the given message by copying it to the target buffer
3196  * "buf".  "buf" will be NULL and "size" zero if the socket was closed
3197  * for writing in the meantime.  In that case, do nothing
3198  * (the disconnect or shutdown handler will take care of the rest).
3199  * If we were able to transmit messages and there are still more
3200  * pending, ask core again for further calls to this function.
3201  *
3202  * @param cls closure, pointer to the 'struct ClientList*'
3203  * @param size number of bytes available in buf
3204  * @param buf where the callee should write the message
3205  * @return number of bytes written to buf
3206  */
3207 static size_t
3208 transmit_to_client (void *cls,
3209                   size_t size, void *buf)
3210 {
3211   struct ClientList *cl = cls;
3212   char *cbuf = buf;
3213   struct ClientResponseMessage *creply;
3214   size_t msize;
3215   
3216   cl->th = NULL;
3217   if (NULL == buf)
3218     {
3219 #if DEBUG_FS
3220       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3221                   "Not sending reply, client communication problem.\n");
3222 #endif
3223       return 0;
3224     }
3225   msize = 0;
3226   while ( (NULL != (creply = cl->res_head) ) &&
3227           (creply->msize <= size) )
3228     {
3229       memcpy (&cbuf[msize], &creply[1], creply->msize);
3230       msize += creply->msize;
3231       size -= creply->msize;
3232       GNUNET_CONTAINER_DLL_remove (cl->res_head,
3233                                    cl->res_tail,
3234                                    creply);
3235       GNUNET_free (creply);
3236     }
3237   if (NULL != creply)
3238     cl->th = GNUNET_SERVER_notify_transmit_ready (cl->client,
3239                                                   creply->msize,
3240                                                   GNUNET_TIME_UNIT_FOREVER_REL,
3241                                                   &transmit_to_client,
3242                                                   cl);
3243 #if DEBUG_FS
3244   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3245               "Transmitted %u bytes to client\n",
3246               (unsigned int) msize);
3247 #endif
3248   return msize;
3249 }
3250
3251
3252 /**
3253  * Closure for "process_reply" function.
3254  */
3255 struct ProcessReplyClosure
3256 {
3257   /**
3258    * The data for the reply.
3259    */
3260   const void *data;
3261
3262   /**
3263    * Who gave us this reply? NULL for local host (or DHT)
3264    */
3265   struct ConnectedPeer *sender;
3266
3267   /**
3268    * When the reply expires.
3269    */
3270   struct GNUNET_TIME_Absolute expiration;
3271
3272   /**
3273    * Size of data.
3274    */
3275   size_t size;
3276
3277   /**
3278    * Type of the block.
3279    */
3280   enum GNUNET_BLOCK_Type type;
3281
3282   /**
3283    * How much was this reply worth to us?
3284    */
3285   uint32_t priority;
3286
3287   /**
3288    * Anonymity requirements for this reply.
3289    */
3290   uint32_t anonymity_level;
3291
3292   /**
3293    * Evaluation result (returned).
3294    */
3295   enum GNUNET_BLOCK_EvaluationResult eval;
3296
3297   /**
3298    * Did we finish processing the associated request?
3299    */ 
3300   int finished;
3301
3302   /**
3303    * Did we find a matching request?
3304    */
3305   int request_found;
3306 };
3307
3308
3309 /**
3310  * We have received a reply; handle it!
3311  *
3312  * @param cls response (struct ProcessReplyClosure)
3313  * @param key our query
3314  * @param value value in the hash map (info about the query)
3315  * @return GNUNET_YES (we should continue to iterate)
3316  */
3317 static int
3318 process_reply (void *cls,
3319                const GNUNET_HashCode * key,
3320                void *value)
3321 {
3322   struct ProcessReplyClosure *prq = cls;
3323   struct PendingRequest *pr = value;
3324   struct PendingMessage *reply;
3325   struct ClientResponseMessage *creply;
3326   struct ClientList *cl;
3327   struct PutMessage *pm;
3328   struct ConnectedPeer *cp;
3329   struct GNUNET_TIME_Relative cur_delay;
3330 #if SUPPORT_DELAYS  
3331 struct GNUNET_TIME_Relative art_delay;
3332 #endif
3333   size_t msize;
3334   unsigned int i;
3335
3336   if (NULL == pr->client_request_list)
3337     {
3338       /* reply will go over the network, check for cover traffic */
3339       if ( (prq->anonymity_level >  1) &&
3340            (cover_content_count < prq->anonymity_level - 1) )
3341         {
3342           /* insufficient cover traffic, skip */
3343           GNUNET_STATISTICS_update (stats,
3344                                     gettext_noop ("# replies suppressed due to lack of cover traffic"),
3345                                     1,
3346                                     GNUNET_NO);
3347           return GNUNET_YES;
3348         }       
3349       if (prq->anonymity_level >  1) 
3350         cover_content_count -= prq->anonymity_level - 1;
3351     }
3352 #if DEBUG_FS
3353   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3354               "Matched result (type %u) for query `%s' with pending request\n",
3355               (unsigned int) prq->type,
3356               GNUNET_h2s (key));
3357 #endif  
3358   GNUNET_STATISTICS_update (stats,
3359                             gettext_noop ("# replies received and matched"),
3360                             1,
3361                             GNUNET_NO);
3362   if (prq->sender != NULL)
3363     {
3364       for (i=0;i<pr->used_targets_off;i++)
3365         if (pr->used_targets[i].pid == prq->sender->pid)
3366           break;
3367       if (i < pr->used_targets_off)
3368         {
3369           cur_delay = GNUNET_TIME_absolute_get_duration (pr->used_targets[i].last_request_time);      
3370           prq->sender->avg_delay.rel_value
3371             = (prq->sender->avg_delay.rel_value * 
3372                (RUNAVG_DELAY_N - 1) + cur_delay.rel_value) / RUNAVG_DELAY_N; 
3373           prq->sender->avg_priority
3374             = (prq->sender->avg_priority * 
3375                (RUNAVG_DELAY_N - 1) + pr->priority) / (double) RUNAVG_DELAY_N;
3376         }
3377       if (pr->cp != NULL)
3378         {
3379           GNUNET_PEER_change_rc (prq->sender->last_p2p_replies
3380                                  [prq->sender->last_p2p_replies_woff % P2P_SUCCESS_LIST_SIZE], 
3381                                  -1);
3382           GNUNET_PEER_change_rc (pr->cp->pid, 1);
3383           prq->sender->last_p2p_replies
3384             [(prq->sender->last_p2p_replies_woff++) % P2P_SUCCESS_LIST_SIZE]
3385             = pr->cp->pid;
3386         }
3387       else
3388         {
3389           if (NULL != prq->sender->last_client_replies
3390               [(prq->sender->last_client_replies_woff) % CS2P_SUCCESS_LIST_SIZE])
3391             GNUNET_SERVER_client_drop (prq->sender->last_client_replies
3392                                        [(prq->sender->last_client_replies_woff) % CS2P_SUCCESS_LIST_SIZE]);
3393           prq->sender->last_client_replies
3394             [(prq->sender->last_client_replies_woff++) % CS2P_SUCCESS_LIST_SIZE]
3395             = pr->client_request_list->client_list->client;
3396           GNUNET_SERVER_client_keep (pr->client_request_list->client_list->client);
3397         }
3398     }
3399   prq->eval = GNUNET_BLOCK_evaluate (block_ctx,
3400                                      prq->type,
3401                                      key,
3402                                      &pr->bf,
3403                                      pr->mingle,
3404                                      pr->namespace, (pr->namespace != NULL) ? sizeof (GNUNET_HashCode) : 0,
3405                                      prq->data,
3406                                      prq->size);
3407   switch (prq->eval)
3408     {
3409     case GNUNET_BLOCK_EVALUATION_OK_MORE:
3410       break;
3411     case GNUNET_BLOCK_EVALUATION_OK_LAST:
3412       while (NULL != pr->pending_head)
3413         destroy_pending_message_list_entry (pr->pending_head);
3414       if (pr->qe != NULL)
3415         {
3416           if (pr->client_request_list != NULL)
3417             GNUNET_SERVER_receive_done (pr->client_request_list->client_list->client, 
3418                                         GNUNET_YES);
3419           GNUNET_DATASTORE_cancel (pr->qe);
3420           pr->qe = NULL;
3421         }
3422       pr->do_remove = GNUNET_YES;
3423       if (pr->task != GNUNET_SCHEDULER_NO_TASK)
3424         {
3425           GNUNET_SCHEDULER_cancel (pr->task);
3426           pr->task = GNUNET_SCHEDULER_NO_TASK;
3427         }
3428       GNUNET_break (GNUNET_YES ==
3429                     GNUNET_CONTAINER_multihashmap_remove (query_request_map,
3430                                                           key,
3431                                                           pr));
3432       GNUNET_LOAD_update (rt_entry_lifetime,
3433                           GNUNET_TIME_absolute_get_duration (pr->start_time).rel_value);
3434       break;
3435     case GNUNET_BLOCK_EVALUATION_OK_DUPLICATE:
3436       GNUNET_STATISTICS_update (stats,
3437                                 gettext_noop ("# duplicate replies discarded (bloomfilter)"),
3438                                 1,
3439                                 GNUNET_NO);
3440 #if DEBUG_FS
3441 /*      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3442                   "Duplicate response `%s', discarding.\n",
3443                   GNUNET_h2s (&mhash));*/
3444 #endif
3445       return GNUNET_YES; /* duplicate */
3446     case GNUNET_BLOCK_EVALUATION_RESULT_INVALID:
3447       return GNUNET_YES; /* wrong namespace */  
3448     case GNUNET_BLOCK_EVALUATION_REQUEST_VALID:
3449       GNUNET_break (0);
3450       return GNUNET_YES;
3451     case GNUNET_BLOCK_EVALUATION_REQUEST_INVALID:
3452       GNUNET_break (0);
3453       return GNUNET_YES;
3454     case GNUNET_BLOCK_EVALUATION_TYPE_NOT_SUPPORTED:
3455       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
3456                   _("Unsupported block type %u\n"),
3457                   prq->type);
3458       return GNUNET_NO;
3459     }
3460   if (pr->client_request_list != NULL)
3461     {
3462       if (pr->replies_seen_size == pr->replies_seen_off)
3463         GNUNET_array_grow (pr->replies_seen,
3464                            pr->replies_seen_size,
3465                            pr->replies_seen_size * 2 + 4);      
3466       GNUNET_CRYPTO_hash (prq->data,
3467                           prq->size,
3468                           &pr->replies_seen[pr->replies_seen_off++]);         
3469       refresh_bloomfilter (pr);
3470     }
3471   if (NULL == prq->sender)
3472     {
3473 #if DEBUG_FS
3474       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3475                   "Found result for query `%s' in local datastore\n",
3476                   GNUNET_h2s (key));
3477 #endif
3478       GNUNET_STATISTICS_update (stats,
3479                                 gettext_noop ("# results found locally"),
3480                                 1,
3481                                 GNUNET_NO);      
3482     }
3483   prq->priority += pr->remaining_priority;
3484   pr->remaining_priority = 0;
3485   pr->results_found++;
3486   prq->request_found = GNUNET_YES;
3487   if (NULL != pr->client_request_list)
3488     {
3489       GNUNET_STATISTICS_update (stats,
3490                                 gettext_noop ("# replies received for local clients"),
3491                                 1,
3492                                 GNUNET_NO);
3493       cl = pr->client_request_list->client_list;
3494       msize = sizeof (struct PutMessage) + prq->size;
3495       creply = GNUNET_malloc (msize + sizeof (struct ClientResponseMessage));
3496       creply->msize = msize;
3497       creply->client_list = cl;
3498       GNUNET_CONTAINER_DLL_insert_after (cl->res_head,
3499                                          cl->res_tail,
3500                                          cl->res_tail,
3501                                          creply);      
3502       pm = (struct PutMessage*) &creply[1];
3503       pm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
3504       pm->header.size = htons (msize);
3505       pm->type = htonl (prq->type);
3506       pm->expiration = GNUNET_TIME_absolute_hton (prq->expiration);
3507       memcpy (&pm[1], prq->data, prq->size);      
3508       if (NULL == cl->th)
3509         {
3510 #if DEBUG_FS
3511           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3512                       "Transmitting result for query `%s' to client\n",
3513                       GNUNET_h2s (key));
3514 #endif  
3515           cl->th = GNUNET_SERVER_notify_transmit_ready (cl->client,
3516                                                         msize,
3517                                                         GNUNET_TIME_UNIT_FOREVER_REL,
3518                                                         &transmit_to_client,
3519                                                         cl);
3520         }
3521       GNUNET_break (cl->th != NULL);
3522       if (pr->do_remove)                
3523         {
3524           prq->finished = GNUNET_YES;
3525           destroy_pending_request (pr);         
3526         }
3527     }
3528   else
3529     {
3530       cp = pr->cp;
3531 #if DEBUG_FS
3532       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3533                   "Transmitting result for query `%s' to other peer (PID=%u)\n",
3534                   GNUNET_h2s (key),
3535                   (unsigned int) cp->pid);
3536 #endif  
3537       GNUNET_STATISTICS_update (stats,
3538                                 gettext_noop ("# replies received for other peers"),
3539                                 1,
3540                                 GNUNET_NO);
3541       msize = sizeof (struct PutMessage) + prq->size;
3542       reply = GNUNET_malloc (msize + sizeof (struct PendingMessage));
3543       reply->cont = &transmit_reply_continuation;
3544       reply->cont_cls = pr;
3545 #if SUPPORT_DELAYS
3546       art_delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
3547                                                  GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
3548                                                                            TTL_DECREMENT));
3549       reply->delay_until 
3550         = GNUNET_TIME_relative_to_absolute (art_delay);
3551       GNUNET_STATISTICS_update (stats,
3552                                 gettext_noop ("cummulative artificial delay introduced (ms)"),
3553                                 art_delay.abs_value,
3554                                 GNUNET_NO);
3555 #endif
3556       reply->msize = msize;
3557       reply->priority = UINT32_MAX; /* send replies first! */
3558       pm = (struct PutMessage*) &reply[1];
3559       pm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
3560       pm->header.size = htons (msize);
3561       pm->type = htonl (prq->type);
3562       pm->expiration = GNUNET_TIME_absolute_hton (prq->expiration);
3563       memcpy (&pm[1], prq->data, prq->size);
3564       add_to_pending_messages_for_peer (cp, reply, pr);
3565     }
3566   return GNUNET_YES;
3567 }
3568
3569
3570 /**
3571  * Iterator called on each result obtained for a DHT
3572  * operation that expects a reply
3573  *
3574  * @param cls closure
3575  * @param exp when will this value expire
3576  * @param key key of the result
3577  * @param get_path NULL-terminated array of pointers
3578  *                 to the peers on reverse GET path (or NULL if not recorded)
3579  * @param put_path NULL-terminated array of pointers
3580  *                 to the peers on the PUT path (or NULL if not recorded)
3581  * @param type type of the result
3582  * @param size number of bytes in data
3583  * @param data pointer to the result data
3584  */
3585 static void
3586 process_dht_reply (void *cls,
3587                    struct GNUNET_TIME_Absolute exp,
3588                    const GNUNET_HashCode * key,
3589                    const struct GNUNET_PeerIdentity * const *get_path,
3590                    const struct GNUNET_PeerIdentity * const *put_path,
3591                    enum GNUNET_BLOCK_Type type,
3592                    size_t size,
3593                    const void *data)
3594 {
3595   struct PendingRequest *pr = cls;
3596   struct ProcessReplyClosure prq;
3597
3598   memset (&prq, 0, sizeof (prq));
3599   prq.data = data;
3600   prq.expiration = exp;
3601   prq.size = size;  
3602   prq.type = type;
3603   process_reply (&prq, key, pr);
3604 }
3605
3606
3607
3608 /**
3609  * Continuation called to notify client about result of the
3610  * operation.
3611  *
3612  * @param cls closure
3613  * @param success GNUNET_SYSERR on failure
3614  * @param msg NULL on success, otherwise an error message
3615  */
3616 static void 
3617 put_migration_continuation (void *cls,
3618                             int success,
3619                             const char *msg)
3620 {
3621   struct GNUNET_TIME_Absolute *start = cls;
3622   struct GNUNET_TIME_Relative delay;
3623   
3624   delay = GNUNET_TIME_absolute_get_duration (*start);
3625   GNUNET_free (start);
3626   GNUNET_LOAD_update (datastore_put_load,
3627                       delay.rel_value);
3628   if (GNUNET_OK == success)
3629     return;
3630   GNUNET_STATISTICS_update (stats,
3631                             gettext_noop ("# datastore 'put' failures"),
3632                             1,
3633                             GNUNET_NO);
3634 }
3635
3636
3637 /**
3638  * Handle P2P "PUT" message.
3639  *
3640  * @param cls closure, always NULL
3641  * @param other the other peer involved (sender or receiver, NULL
3642  *        for loopback messages where we are both sender and receiver)
3643  * @param message the actual message
3644  * @param atsi performance information
3645  * @return GNUNET_OK to keep the connection open,
3646  *         GNUNET_SYSERR to close it (signal serious error)
3647  */
3648 static int
3649 handle_p2p_put (void *cls,
3650                 const struct GNUNET_PeerIdentity *other,
3651                 const struct GNUNET_MessageHeader *message,
3652                 const struct GNUNET_TRANSPORT_ATS_Information *atsi)
3653 {
3654   const struct PutMessage *put;
3655   uint16_t msize;
3656   size_t dsize;
3657   enum GNUNET_BLOCK_Type type;
3658   struct GNUNET_TIME_Absolute expiration;
3659   GNUNET_HashCode query;
3660   struct ProcessReplyClosure prq;
3661   struct GNUNET_TIME_Absolute *start;
3662   struct GNUNET_TIME_Relative block_time;  
3663   double putl;
3664   struct ConnectedPeer *cp; 
3665   struct PendingMessage *pm;
3666   struct MigrationStopMessage *msm;
3667
3668   msize = ntohs (message->size);
3669   if (msize < sizeof (struct PutMessage))
3670     {
3671       GNUNET_break_op(0);
3672       return GNUNET_SYSERR;
3673     }
3674   put = (const struct PutMessage*) message;
3675   dsize = msize - sizeof (struct PutMessage);
3676   type = ntohl (put->type);
3677   expiration = GNUNET_TIME_absolute_ntoh (put->expiration);
3678
3679   if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
3680     return GNUNET_SYSERR;
3681   if (GNUNET_OK !=
3682       GNUNET_BLOCK_get_key (block_ctx,
3683                             type,
3684                             &put[1],
3685                             dsize,
3686                             &query))
3687     {
3688       GNUNET_break_op (0);
3689       return GNUNET_SYSERR;
3690     }
3691   cover_content_count++;
3692 #if DEBUG_FS
3693   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3694               "Received result for query `%s' from peer `%4s'\n",
3695               GNUNET_h2s (&query),
3696               GNUNET_i2s (other));
3697 #endif
3698   GNUNET_STATISTICS_update (stats,
3699                             gettext_noop ("# replies received (overall)"),
3700                             1,
3701                             GNUNET_NO);
3702   /* now, lookup 'query' */
3703   prq.data = (const void*) &put[1];
3704   if (other != NULL)
3705     prq.sender = GNUNET_CONTAINER_multihashmap_get (connected_peers,
3706                                                     &other->hashPubKey);
3707   else
3708     prq.sender = NULL;
3709   prq.size = dsize;
3710   prq.type = type;
3711   prq.expiration = expiration;
3712   prq.priority = 0;
3713   prq.anonymity_level = 1;
3714   prq.finished = GNUNET_NO;
3715   prq.request_found = GNUNET_NO;
3716   GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map,
3717                                               &query,
3718                                               &process_reply,
3719                                               &prq);
3720   if (prq.sender != NULL)
3721     {
3722       prq.sender->inc_preference += CONTENT_BANDWIDTH_VALUE + 1000 * prq.priority;
3723       change_host_trust (prq.sender, prq.priority);
3724     }
3725   if ( (GNUNET_YES == active_to_migration) &&
3726        (GNUNET_NO == test_put_load_too_high (prq.priority)) )
3727     {      
3728 #if DEBUG_FS
3729       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3730                   "Replicating result for query `%s' with priority %u\n",
3731                   GNUNET_h2s (&query),
3732                   prq.priority);
3733 #endif
3734       start = GNUNET_malloc (sizeof (struct GNUNET_TIME_Absolute));
3735       *start = GNUNET_TIME_absolute_get ();
3736       GNUNET_DATASTORE_put (dsh,
3737                             0, &query, dsize, &put[1],
3738                             type, prq.priority, 1 /* anonymity */, 
3739                             expiration, 
3740                             1 + prq.priority, MAX_DATASTORE_QUEUE,
3741                             GNUNET_CONSTANTS_SERVICE_TIMEOUT,
3742                             &put_migration_continuation, 
3743                             start);
3744     }
3745   putl = GNUNET_LOAD_get_load (datastore_put_load);
3746   if ( (NULL != (cp = prq.sender)) &&
3747        (GNUNET_NO == prq.request_found) &&
3748        ( (GNUNET_YES != active_to_migration) ||
3749          (putl > 2.5 * (1 + prq.priority)) ) ) 
3750     {
3751       if (GNUNET_TIME_absolute_get_duration (cp->last_migration_block).rel_value < 5000)
3752         return GNUNET_OK; /* already blocked */
3753       /* We're too busy; send MigrationStop message! */
3754       if (GNUNET_YES != active_to_migration) 
3755         putl = 1.0 + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 5);
3756       block_time = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
3757                                                   5000 + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
3758                                                                                    (unsigned int) (60000 * putl * putl)));
3759       
3760       cp->last_migration_block = GNUNET_TIME_relative_to_absolute (block_time);
3761       pm = GNUNET_malloc (sizeof (struct PendingMessage) + 
3762                           sizeof (struct MigrationStopMessage));
3763       pm->msize = sizeof (struct MigrationStopMessage);
3764       pm->priority = UINT32_MAX;
3765       msm = (struct MigrationStopMessage*) &pm[1];
3766       msm->header.size = htons (sizeof (struct MigrationStopMessage));
3767       msm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_MIGRATION_STOP);
3768       msm->duration = GNUNET_TIME_relative_hton (block_time);
3769       add_to_pending_messages_for_peer (cp,
3770                                         pm,
3771                                         NULL);
3772     }
3773   return GNUNET_OK;
3774 }
3775
3776
3777 /**
3778  * Handle P2P "MIGRATION_STOP" message.
3779  *
3780  * @param cls closure, always NULL
3781  * @param other the other peer involved (sender or receiver, NULL
3782  *        for loopback messages where we are both sender and receiver)
3783  * @param message the actual message
3784  * @param atsi performance information
3785  * @return GNUNET_OK to keep the connection open,
3786  *         GNUNET_SYSERR to close it (signal serious error)
3787  */
3788 static int
3789 handle_p2p_migration_stop (void *cls,
3790                            const struct GNUNET_PeerIdentity *other,
3791                            const struct GNUNET_MessageHeader *message,
3792                            const struct GNUNET_TRANSPORT_ATS_Information *atsi)
3793 {
3794   struct ConnectedPeer *cp; 
3795   const struct MigrationStopMessage *msm;
3796
3797   msm = (const struct MigrationStopMessage*) message;
3798   cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
3799                                           &other->hashPubKey);
3800   if (cp == NULL)
3801     {
3802       GNUNET_break (0);
3803       return GNUNET_OK;
3804     }
3805   cp->migration_blocked = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_relative_ntoh (msm->duration));
3806   return GNUNET_OK;
3807 }
3808
3809
3810
3811 /* **************************** P2P GET Handling ************************ */
3812
3813
3814 /**
3815  * Closure for 'check_duplicate_request_{peer,client}'.
3816  */
3817 struct CheckDuplicateRequestClosure
3818 {
3819   /**
3820    * The new request we should check if it already exists.
3821    */
3822   const struct PendingRequest *pr;
3823
3824   /**
3825    * Existing request found by the checker, NULL if none.
3826    */
3827   struct PendingRequest *have;
3828 };
3829
3830
3831 /**
3832  * Iterator over entries in the 'query_request_map' that
3833  * tries to see if we have the same request pending from
3834  * the same client already.
3835  *
3836  * @param cls closure (our 'struct CheckDuplicateRequestClosure')
3837  * @param key current key code (query, ignored, must match)
3838  * @param value value in the hash map (a 'struct PendingRequest' 
3839  *              that already exists)
3840  * @return GNUNET_YES if we should continue to
3841  *         iterate (no match yet)
3842  *         GNUNET_NO if not (match found).
3843  */
3844 static int
3845 check_duplicate_request_client (void *cls,
3846                                 const GNUNET_HashCode * key,
3847                                 void *value)
3848 {
3849   struct CheckDuplicateRequestClosure *cdc = cls;
3850   struct PendingRequest *have = value;
3851
3852   if (have->client_request_list == NULL)
3853     return GNUNET_YES;
3854   if ( (cdc->pr->client_request_list->client_list->client == have->client_request_list->client_list->client) &&
3855        (cdc->pr != have) )
3856     {
3857       cdc->have = have;
3858       return GNUNET_NO;
3859     }
3860   return GNUNET_YES;
3861 }
3862
3863
3864 /**
3865  * We're processing (local) results for a search request
3866  * from another peer.  Pass applicable results to the
3867  * peer and if we are done either clean up (operation
3868  * complete) or forward to other peers (more results possible).
3869  *
3870  * @param cls our closure (struct LocalGetContext)
3871  * @param key key for the content
3872  * @param size number of bytes in data
3873  * @param data content stored
3874  * @param type type of the content
3875  * @param priority priority of the content
3876  * @param anonymity anonymity-level for the content
3877  * @param expiration expiration time for the content
3878  * @param uid unique identifier for the datum;
3879  *        maybe 0 if no unique identifier is available
3880  */
3881 static void
3882 process_local_reply (void *cls,
3883                      const GNUNET_HashCode * key,
3884                      size_t size,
3885                      const void *data,
3886                      enum GNUNET_BLOCK_Type type,
3887                      uint32_t priority,
3888                      uint32_t anonymity,
3889                      struct GNUNET_TIME_Absolute
3890                      expiration, 
3891                      uint64_t uid)
3892 {
3893   struct PendingRequest *pr = cls;
3894   struct ProcessReplyClosure prq;
3895   struct CheckDuplicateRequestClosure cdrc;
3896   GNUNET_HashCode query;
3897   unsigned int old_rf;
3898   
3899   if (NULL == key)
3900     {
3901 #if DEBUG_FS > 1
3902       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3903                   "Done processing local replies, forwarding request to other peers.\n");
3904 #endif
3905       pr->qe = NULL;
3906       if (pr->client_request_list != NULL)
3907         {
3908           GNUNET_SERVER_receive_done (pr->client_request_list->client_list->client, 
3909                                       GNUNET_YES);
3910           /* Figure out if this is a duplicate request and possibly
3911              merge 'struct PendingRequest' entries */
3912           cdrc.have = NULL;
3913           cdrc.pr = pr;
3914           GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map,
3915                                                       &pr->query,
3916                                                       &check_duplicate_request_client,
3917                                                       &cdrc);
3918           if (cdrc.have != NULL)
3919             {
3920 #if DEBUG_FS
3921               GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3922                           "Received request for block `%s' twice from client, will only request once.\n",
3923                           GNUNET_h2s (&pr->query));
3924 #endif
3925               
3926               destroy_pending_request (pr);
3927               return;
3928             }
3929         }
3930       if (pr->local_only == GNUNET_YES)
3931         {
3932           destroy_pending_request (pr);
3933           return;
3934         }
3935       /* no more results */
3936       if (pr->task == GNUNET_SCHEDULER_NO_TASK)
3937         pr->task = GNUNET_SCHEDULER_add_now (&forward_request_task,
3938                                              pr);      
3939       return;
3940     }
3941 #if DEBUG_FS
3942   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3943               "New local response to `%s' of type %u.\n",
3944               GNUNET_h2s (key),
3945               type);
3946 #endif
3947   if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
3948     {
3949 #if DEBUG_FS
3950       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3951                   "Found ONDEMAND block, performing on-demand encoding\n");
3952 #endif
3953       GNUNET_STATISTICS_update (stats,
3954                                 gettext_noop ("# on-demand blocks matched requests"),
3955                                 1,
3956                                 GNUNET_NO);
3957       if (GNUNET_OK != 
3958           GNUNET_FS_handle_on_demand_block (key, size, data, type, priority, 
3959                                             anonymity, expiration, uid, 
3960                                             &process_local_reply,
3961                                             pr))
3962       if (pr->qe != NULL)
3963         {
3964           GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
3965         }
3966       return;
3967     }
3968   old_rf = pr->results_found;
3969   memset (&prq, 0, sizeof (prq));
3970   prq.data = data;
3971   prq.expiration = expiration;
3972   prq.size = size;  
3973   if (GNUNET_OK != 
3974       GNUNET_BLOCK_get_key (block_ctx,
3975                             type,
3976                             data,
3977                             size,
3978                             &query))
3979     {
3980       GNUNET_break (0);
3981       GNUNET_DATASTORE_remove (dsh,
3982                                key,
3983                                size, data,
3984                                -1, -1, 
3985                                GNUNET_TIME_UNIT_FOREVER_REL,
3986                                NULL, NULL);
3987       GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
3988       return;
3989     }
3990   prq.type = type;
3991   prq.priority = priority;  
3992   prq.finished = GNUNET_NO;
3993   prq.request_found = GNUNET_NO;
3994   prq.anonymity_level = anonymity;
3995   if ( (old_rf == 0) &&
3996        (pr->results_found == 0) )
3997     update_datastore_delays (pr->start_time);
3998   process_reply (&prq, key, pr);
3999   if (prq.finished == GNUNET_YES)
4000     return;
4001   if (pr->qe == NULL)
4002     return; /* done here */
4003   if (prq.eval == GNUNET_BLOCK_EVALUATION_OK_LAST)
4004     {
4005       pr->local_only = GNUNET_YES; /* do not forward */
4006       GNUNET_DATASTORE_get_next (dsh, GNUNET_NO);
4007       return;
4008     }
4009   if ( (pr->client_request_list == NULL) &&
4010        ( (GNUNET_YES == test_get_load_too_high (0)) ||
4011          (pr->results_found > 5 + 2 * pr->priority) ) )
4012     {
4013 #if DEBUG_FS > 2
4014       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4015                   "Load too high, done with request\n");
4016 #endif
4017       GNUNET_STATISTICS_update (stats,
4018                                 gettext_noop ("# processing result set cut short due to load"),
4019                                 1,
4020                                 GNUNET_NO);
4021       GNUNET_DATASTORE_get_next (dsh, GNUNET_NO);
4022       return;
4023     }
4024   GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
4025 }
4026
4027
4028 /**
4029  * We've received a request with the specified priority.  Bound it
4030  * according to how much we trust the given peer.
4031  * 
4032  * @param prio_in requested priority
4033  * @param cp the peer making the request
4034  * @return effective priority
4035  */
4036 static int32_t
4037 bound_priority (uint32_t prio_in,
4038                 struct ConnectedPeer *cp)
4039 {
4040 #define N ((double)128.0)
4041   uint32_t ret;
4042   double rret;
4043   int ld;
4044
4045   ld = test_get_load_too_high (0);
4046   if (ld == GNUNET_SYSERR)
4047     {
4048       GNUNET_STATISTICS_update (stats,
4049                                 gettext_noop ("# requests done for free (low load)"),
4050                                 1,
4051                                 GNUNET_NO);
4052       return 0; /* excess resources */
4053     }
4054   if (prio_in > INT32_MAX)
4055     prio_in = INT32_MAX;
4056   ret = - change_host_trust (cp, - (int) prio_in);
4057   if (ret > 0)
4058     {
4059       if (ret > current_priorities + N)
4060         rret = current_priorities + N;
4061       else
4062         rret = ret;
4063       current_priorities 
4064         = (current_priorities * (N-1) + rret)/N;
4065     }
4066   if ( (ld == GNUNET_YES) && (ret > 0) )
4067     {
4068       /* try with charging */
4069       ld = test_get_load_too_high (ret);
4070     }
4071   if (ld == GNUNET_YES)
4072     {
4073       GNUNET_STATISTICS_update (stats,
4074                                 gettext_noop ("# request dropped, priority insufficient"),
4075                                 1,
4076                                 GNUNET_NO);
4077       /* undo charge */
4078       change_host_trust (cp, (int) ret);
4079       return -1; /* not enough resources */
4080     }
4081   else
4082     {
4083       GNUNET_STATISTICS_update (stats,
4084                                 gettext_noop ("# requests done for a price (normal load)"),
4085                                 1,
4086                                 GNUNET_NO);
4087     }
4088 #undef N
4089   return ret;
4090 }
4091
4092
4093 /**
4094  * Iterator over entries in the 'query_request_map' that
4095  * tries to see if we have the same request pending from
4096  * the same peer already.
4097  *
4098  * @param cls closure (our 'struct CheckDuplicateRequestClosure')
4099  * @param key current key code (query, ignored, must match)
4100  * @param value value in the hash map (a 'struct PendingRequest' 
4101  *              that already exists)
4102  * @return GNUNET_YES if we should continue to
4103  *         iterate (no match yet)
4104  *         GNUNET_NO if not (match found).
4105  */
4106 static int
4107 check_duplicate_request_peer (void *cls,
4108                               const GNUNET_HashCode * key,
4109                               void *value)
4110 {
4111   struct CheckDuplicateRequestClosure *cdc = cls;
4112   struct PendingRequest *have = value;
4113
4114   if (cdc->pr->target_pid == have->target_pid)
4115     {
4116       cdc->have = have;
4117       return GNUNET_NO;
4118     }
4119   return GNUNET_YES;
4120 }
4121
4122
4123 /**
4124  * Handle P2P "GET" request.
4125  *
4126  * @param cls closure, always NULL
4127  * @param other the other peer involved (sender or receiver, NULL
4128  *        for loopback messages where we are both sender and receiver)
4129  * @param message the actual message
4130  * @param atsi performance information
4131  * @return GNUNET_OK to keep the connection open,
4132  *         GNUNET_SYSERR to close it (signal serious error)
4133  */
4134 static int
4135 handle_p2p_get (void *cls,
4136                 const struct GNUNET_PeerIdentity *other,
4137                 const struct GNUNET_MessageHeader *message,
4138                 const struct GNUNET_TRANSPORT_ATS_Information *atsi)
4139 {
4140   struct PendingRequest *pr;
4141   struct ConnectedPeer *cp;
4142   struct ConnectedPeer *cps;
4143   struct CheckDuplicateRequestClosure cdc;
4144   struct GNUNET_TIME_Relative timeout;
4145   uint16_t msize;
4146   const struct GetMessage *gm;
4147   unsigned int bits;
4148   const GNUNET_HashCode *opt;
4149   uint32_t bm;
4150   size_t bfsize;
4151   uint32_t ttl_decrement;
4152   int32_t priority;
4153   enum GNUNET_BLOCK_Type type;
4154   int have_ns;
4155
4156   msize = ntohs(message->size);
4157   if (msize < sizeof (struct GetMessage))
4158     {
4159       GNUNET_break_op (0);
4160       return GNUNET_SYSERR;
4161     }
4162   gm = (const struct GetMessage*) message;
4163 #if DEBUG_FS
4164   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4165               "Received request for `%s'\n",
4166               GNUNET_h2s (&gm->query));
4167 #endif
4168   type = ntohl (gm->type);
4169   bm = ntohl (gm->hash_bitmap);
4170   bits = 0;
4171   while (bm > 0)
4172     {
4173       if (1 == (bm & 1))
4174         bits++;
4175       bm >>= 1;
4176     }
4177   if (msize < sizeof (struct GetMessage) + bits * sizeof (GNUNET_HashCode))
4178     {
4179       GNUNET_break_op (0);
4180       return GNUNET_SYSERR;
4181     }  
4182   opt = (const GNUNET_HashCode*) &gm[1];
4183   bfsize = msize - sizeof (struct GetMessage) - bits * sizeof (GNUNET_HashCode);
4184   /* bfsize must be power of 2, check! */
4185   if (0 != ( (bfsize - 1) & bfsize))
4186     {
4187       GNUNET_break_op (0);
4188       return GNUNET_SYSERR;
4189     }
4190   cover_query_count++;
4191   bm = ntohl (gm->hash_bitmap);
4192   bits = 0;
4193   cps = GNUNET_CONTAINER_multihashmap_get (connected_peers,
4194                                            &other->hashPubKey);
4195   if (NULL == cps)
4196     {
4197       /* peer must have just disconnected */
4198       GNUNET_STATISTICS_update (stats,
4199                                 gettext_noop ("# requests dropped due to initiator not being connected"),
4200                                 1,
4201                                 GNUNET_NO);
4202       return GNUNET_SYSERR;
4203     }
4204   if (0 != (bm & GET_MESSAGE_BIT_RETURN_TO))
4205     cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
4206                                             &opt[bits++]);
4207   else
4208     cp = cps;
4209   if (cp == NULL)
4210     {
4211 #if DEBUG_FS
4212       if (0 != (bm & GET_MESSAGE_BIT_RETURN_TO))
4213         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4214                     "Failed to find RETURN-TO peer `%4s' in connection set. Dropping query.\n",
4215                     GNUNET_i2s ((const struct GNUNET_PeerIdentity*) &opt[bits-1]));
4216       
4217       else
4218         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4219                     "Failed to find peer `%4s' in connection set. Dropping query.\n",
4220                     GNUNET_i2s (other));
4221 #endif
4222       GNUNET_STATISTICS_update (stats,
4223                                 gettext_noop ("# requests dropped due to missing reverse route"),
4224                                 1,
4225                                 GNUNET_NO);
4226      /* FIXME: try connect? */
4227       return GNUNET_OK;
4228     }
4229   /* note that we can really only check load here since otherwise
4230      peers could find out that we are overloaded by not being
4231      disconnected after sending us a malformed query... */
4232   priority = bound_priority (ntohl (gm->priority), cps);
4233   if (priority < 0)
4234     {
4235 #if DEBUG_FS
4236       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4237                   "Dropping query from `%s', this peer is too busy.\n",
4238                   GNUNET_i2s (other));
4239 #endif
4240       return GNUNET_OK;
4241     }
4242 #if DEBUG_FS 
4243   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4244               "Received request for `%s' of type %u from peer `%4s' with flags %u\n",
4245               GNUNET_h2s (&gm->query),
4246               (unsigned int) type,
4247               GNUNET_i2s (other),
4248               (unsigned int) bm);
4249 #endif
4250   have_ns = (0 != (bm & GET_MESSAGE_BIT_SKS_NAMESPACE));
4251   pr = GNUNET_malloc (sizeof (struct PendingRequest) + 
4252                       (have_ns ? sizeof(GNUNET_HashCode) : 0));
4253   if (have_ns)
4254     {
4255       pr->namespace = (GNUNET_HashCode*) &pr[1];
4256       memcpy (&pr[1], &opt[bits++], sizeof (GNUNET_HashCode));
4257     }
4258   if ( (GNUNET_LOAD_get_load (cp->transmission_delay) > 3 * (1 + priority)) ||
4259        (GNUNET_LOAD_get_average (cp->transmission_delay) > 
4260         GNUNET_CONSTANTS_MAX_CORK_DELAY.rel_value * 2 + GNUNET_LOAD_get_average (rt_entry_lifetime)) )
4261     {
4262       /* don't have BW to send to peer, or would likely take longer than we have for it,
4263          so at best indirect the query */
4264       priority = 0;
4265       pr->forward_only = GNUNET_YES;
4266     }
4267   pr->type = type;
4268   pr->mingle = ntohl (gm->filter_mutator);
4269   if (0 != (bm & GET_MESSAGE_BIT_TRANSMIT_TO))
4270     pr->target_pid = GNUNET_PEER_intern ((const struct GNUNET_PeerIdentity*) &opt[bits++]);
4271   pr->anonymity_level = 1;
4272   pr->priority = (uint32_t) priority;
4273   pr->ttl = bound_ttl (ntohl (gm->ttl), pr->priority);
4274   pr->query = gm->query;
4275   /* decrement ttl (always) */
4276   ttl_decrement = 2 * TTL_DECREMENT +
4277     GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
4278                               TTL_DECREMENT);
4279   if ( (pr->ttl < 0) &&
4280        (((int32_t)(pr->ttl - ttl_decrement)) > 0) )
4281     {
4282 #if DEBUG_FS
4283       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4284                   "Dropping query from `%s' due to TTL underflow (%d - %u).\n",
4285                   GNUNET_i2s (other),
4286                   pr->ttl,
4287                   ttl_decrement);
4288 #endif
4289       GNUNET_STATISTICS_update (stats,
4290                                 gettext_noop ("# requests dropped due TTL underflow"),
4291                                 1,
4292                                 GNUNET_NO);
4293       /* integer underflow => drop (should be very rare)! */      
4294       GNUNET_free (pr);
4295       return GNUNET_OK;
4296     } 
4297   pr->ttl -= ttl_decrement;
4298   pr->start_time = GNUNET_TIME_absolute_get ();
4299
4300   /* get bloom filter */
4301   if (bfsize > 0)
4302     {
4303       pr->bf = GNUNET_CONTAINER_bloomfilter_init ((const char*) &opt[bits],
4304                                                   bfsize,
4305                                                   BLOOMFILTER_K);
4306       pr->bf_size = bfsize;
4307     }
4308   cdc.have = NULL;
4309   cdc.pr = pr;
4310   GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map,
4311                                               &gm->query,
4312                                               &check_duplicate_request_peer,
4313                                               &cdc);
4314   if (cdc.have != NULL)
4315     {
4316       if (cdc.have->start_time.abs_value + cdc.have->ttl >=
4317           pr->start_time.abs_value + pr->ttl)
4318         {
4319           /* existing request has higher TTL, drop new one! */
4320           cdc.have->priority += pr->priority;
4321           destroy_pending_request (pr);
4322 #if DEBUG_FS
4323           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4324                       "Have existing request with higher TTL, dropping new request.\n",
4325                       GNUNET_i2s (other));
4326 #endif
4327           GNUNET_STATISTICS_update (stats,
4328                                     gettext_noop ("# requests dropped due to higher-TTL request"),
4329                                     1,
4330                                     GNUNET_NO);
4331           return GNUNET_OK;
4332         }
4333       else
4334         {
4335           /* existing request has lower TTL, drop old one! */
4336           pr->priority += cdc.have->priority;
4337           /* Possible optimization: if we have applicable pending
4338              replies in 'cdc.have', we might want to move those over
4339              (this is a really rare special-case, so it is not clear
4340              that this would be worth it) */
4341           destroy_pending_request (cdc.have);
4342           /* keep processing 'pr'! */
4343         }
4344     }
4345
4346   pr->cp = cp;
4347   GNUNET_break (GNUNET_OK ==
4348                 GNUNET_CONTAINER_multihashmap_put (query_request_map,
4349                                                    &gm->query,
4350                                                    pr,
4351                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
4352   GNUNET_break (GNUNET_OK ==
4353                 GNUNET_CONTAINER_multihashmap_put (peer_request_map,
4354                                                    &other->hashPubKey,
4355                                                    pr,
4356                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
4357   
4358   pr->hnode = GNUNET_CONTAINER_heap_insert (requests_by_expiration_heap,
4359                                             pr,
4360                                             pr->start_time.abs_value + pr->ttl);
4361
4362   GNUNET_STATISTICS_update (stats,
4363                             gettext_noop ("# P2P searches received"),
4364                             1,
4365                             GNUNET_NO);
4366   GNUNET_STATISTICS_update (stats,
4367                             gettext_noop ("# P2P searches active"),
4368                             1,
4369                             GNUNET_NO);
4370
4371   /* calculate change in traffic preference */
4372   cps->inc_preference += pr->priority * 1000 + QUERY_BANDWIDTH_VALUE;
4373   /* process locally */
4374   if (type == GNUNET_BLOCK_TYPE_FS_DBLOCK)
4375     type = GNUNET_BLOCK_TYPE_ANY; /* to get on-demand as well */
4376   timeout = GNUNET_TIME_relative_multiply (BASIC_DATASTORE_REQUEST_DELAY,
4377                                            (pr->priority + 1)); 
4378   if (GNUNET_YES != pr->forward_only)
4379     {
4380 #if DEBUG_FS
4381       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4382                   "Handing request for `%s' to datastore\n",
4383                   GNUNET_h2s (&gm->query));
4384 #endif
4385       pr->qe = GNUNET_DATASTORE_get (dsh,
4386                                      &gm->query,
4387                                      type,                             
4388                                      pr->priority + 1,
4389                                      MAX_DATASTORE_QUEUE,                                
4390                                      timeout,
4391                                      &process_local_reply,
4392                                      pr);
4393       if (NULL == pr->qe)
4394         {
4395           GNUNET_STATISTICS_update (stats,
4396                                     gettext_noop ("# requests dropped by datastore (queue length limit)"),
4397                                     1,
4398                                     GNUNET_NO);
4399         }
4400     }
4401   else
4402     {
4403       GNUNET_STATISTICS_update (stats,
4404                                 gettext_noop ("# requests forwarded due to high load"),
4405                                 1,
4406                                 GNUNET_NO);
4407     }
4408
4409   /* Are multiple results possible (and did we look locally)?  If so, start processing remotely now! */
4410   switch (pr->type)
4411     {
4412     case GNUNET_BLOCK_TYPE_FS_DBLOCK:
4413     case GNUNET_BLOCK_TYPE_FS_IBLOCK:
4414       /* only one result, wait for datastore */
4415       if (GNUNET_YES != pr->forward_only)
4416         {
4417           GNUNET_STATISTICS_update (stats,
4418                                     gettext_noop ("# requests not instantly forwarded (waiting for datastore)"),
4419                                     1,
4420                                     GNUNET_NO);
4421           break;
4422         }
4423     default:
4424       if (pr->task == GNUNET_SCHEDULER_NO_TASK)
4425         pr->task = GNUNET_SCHEDULER_add_now (&forward_request_task,
4426                                              pr);
4427     }
4428
4429   /* make sure we don't track too many requests */
4430   if (GNUNET_CONTAINER_heap_get_size (requests_by_expiration_heap) > max_pending_requests)
4431     {
4432       pr = GNUNET_CONTAINER_heap_peek (requests_by_expiration_heap);
4433       GNUNET_assert (pr != NULL);
4434       destroy_pending_request (pr);
4435     }
4436   return GNUNET_OK;
4437 }
4438
4439
4440 /* **************************** CS GET Handling ************************ */
4441
4442
4443 /**
4444  * Handle START_SEARCH-message (search request from client).
4445  *
4446  * @param cls closure
4447  * @param client identification of the client
4448  * @param message the actual message
4449  */
4450 static void
4451 handle_start_search (void *cls,
4452                      struct GNUNET_SERVER_Client *client,
4453                      const struct GNUNET_MessageHeader *message)
4454 {
4455   static GNUNET_HashCode all_zeros;
4456   const struct SearchMessage *sm;
4457   struct ClientList *cl;
4458   struct ClientRequestList *crl;
4459   struct PendingRequest *pr;
4460   uint16_t msize;
4461   unsigned int sc;
4462   enum GNUNET_BLOCK_Type type;
4463
4464   msize = ntohs (message->size);
4465   if ( (msize < sizeof (struct SearchMessage)) ||
4466        (0 != (msize - sizeof (struct SearchMessage)) % sizeof (GNUNET_HashCode)) )
4467     {
4468       GNUNET_break (0);
4469       GNUNET_SERVER_receive_done (client,
4470                                   GNUNET_SYSERR);
4471       return;
4472     }
4473   GNUNET_STATISTICS_update (stats,
4474                             gettext_noop ("# client searches received"),
4475                             1,
4476                             GNUNET_NO);
4477   sc = (msize - sizeof (struct SearchMessage)) / sizeof (GNUNET_HashCode);
4478   sm = (const struct SearchMessage*) message;
4479   type = ntohl (sm->type);
4480 #if DEBUG_FS
4481   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4482               "Received request for `%s' of type %u from local client\n",
4483               GNUNET_h2s (&sm->query),
4484               (unsigned int) type);
4485 #endif
4486   cl = client_list;
4487   while ( (cl != NULL) &&
4488           (cl->client != client) )
4489     cl = cl->next;
4490   if (cl == NULL)
4491     {
4492       cl = GNUNET_malloc (sizeof (struct ClientList));
4493       cl->client = client;
4494       GNUNET_SERVER_client_keep (client);
4495       cl->next = client_list;
4496       client_list = cl;
4497     }
4498   /* detect duplicate KBLOCK requests */
4499   if ( (type == GNUNET_BLOCK_TYPE_FS_KBLOCK) ||
4500        (type == GNUNET_BLOCK_TYPE_FS_NBLOCK) ||
4501        (type == GNUNET_BLOCK_TYPE_ANY) )
4502     {
4503       crl = cl->rl_head;
4504       while ( (crl != NULL) &&
4505               ( (0 != memcmp (&crl->req->query,
4506                               &sm->query,
4507                               sizeof (GNUNET_HashCode))) ||
4508                 (crl->req->type != type) ) )
4509         crl = crl->next;
4510       if (crl != NULL)  
4511         { 
4512 #if DEBUG_FS
4513           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4514                       "Have existing request, merging content-seen lists.\n");
4515 #endif
4516           pr = crl->req;
4517           /* Duplicate request (used to send long list of
4518              known/blocked results); merge 'pr->replies_seen'
4519              and update bloom filter */
4520           GNUNET_array_grow (pr->replies_seen,
4521                              pr->replies_seen_size,
4522                              pr->replies_seen_off + sc);
4523           memcpy (&pr->replies_seen[pr->replies_seen_off],
4524                   &sm[1],
4525                   sc * sizeof (GNUNET_HashCode));
4526           pr->replies_seen_off += sc;
4527           refresh_bloomfilter (pr);
4528           GNUNET_STATISTICS_update (stats,
4529                                     gettext_noop ("# client searches updated (merged content seen list)"),
4530                                     1,
4531                                     GNUNET_NO);
4532           GNUNET_SERVER_receive_done (client,
4533                                       GNUNET_OK);
4534           return;
4535         }
4536     }
4537   GNUNET_STATISTICS_update (stats,
4538                             gettext_noop ("# client searches active"),
4539                             1,
4540                             GNUNET_NO);
4541   pr = GNUNET_malloc (sizeof (struct PendingRequest) + 
4542                       ((type == GNUNET_BLOCK_TYPE_FS_SBLOCK) ? sizeof(GNUNET_HashCode) : 0));
4543   crl = GNUNET_malloc (sizeof (struct ClientRequestList));
4544   memset (crl, 0, sizeof (struct ClientRequestList));
4545   crl->client_list = cl;
4546   GNUNET_CONTAINER_DLL_insert (cl->rl_head,
4547                                cl->rl_tail,
4548                                crl);  
4549   crl->req = pr;
4550   pr->type = type;
4551   pr->client_request_list = crl;
4552   GNUNET_array_grow (pr->replies_seen,
4553                      pr->replies_seen_size,
4554                      sc);
4555   memcpy (pr->replies_seen,
4556           &sm[1],
4557           sc * sizeof (GNUNET_HashCode));
4558   pr->replies_seen_off = sc;
4559   pr->anonymity_level = ntohl (sm->anonymity_level); 
4560   pr->start_time = GNUNET_TIME_absolute_get ();
4561   refresh_bloomfilter (pr);
4562   pr->query = sm->query;
4563   if (0 == (1 & ntohl (sm->options)))
4564     pr->local_only = GNUNET_NO;
4565   else
4566     pr->local_only = GNUNET_YES;
4567   switch (type)
4568     {
4569     case GNUNET_BLOCK_TYPE_FS_DBLOCK:
4570     case GNUNET_BLOCK_TYPE_FS_IBLOCK:
4571       if (0 != memcmp (&sm->target,
4572                        &all_zeros,
4573                        sizeof (GNUNET_HashCode)))
4574         pr->target_pid = GNUNET_PEER_intern ((const struct GNUNET_PeerIdentity*) &sm->target);
4575       break;
4576     case GNUNET_BLOCK_TYPE_FS_SBLOCK:
4577       pr->namespace = (GNUNET_HashCode*) &pr[1];
4578       memcpy (&pr[1], &sm->target, sizeof (GNUNET_HashCode));
4579       break;
4580     default:
4581       break;
4582     }
4583   GNUNET_break (GNUNET_OK ==
4584                 GNUNET_CONTAINER_multihashmap_put (query_request_map,
4585                                                    &sm->query,
4586                                                    pr,
4587                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
4588   if (type == GNUNET_BLOCK_TYPE_FS_DBLOCK)
4589     type = GNUNET_BLOCK_TYPE_ANY; /* get on-demand blocks too! */
4590   pr->qe = GNUNET_DATASTORE_get (dsh,
4591                                  &sm->query,
4592                                  type,
4593                                  -3, -1,
4594                                  GNUNET_CONSTANTS_SERVICE_TIMEOUT,                             
4595                                  &process_local_reply,
4596                                  pr);
4597 }
4598
4599
4600 /* **************************** Startup ************************ */
4601
4602
4603
4604 /**
4605  * Function called after GNUNET_CORE_connect has succeeded
4606  * (or failed for good).  Note that the private key of the
4607  * peer is intentionally not exposed here; if you need it,
4608  * your process should try to read the private key file
4609  * directly (which should work if you are authorized...).
4610  *
4611  * @param cls closure
4612  * @param server handle to the server, NULL if we failed
4613  * @param my_identity ID of this peer, NULL if we failed
4614  * @param publicKey public key of this peer, NULL if we failed
4615  */
4616 static void
4617 peer_init_handler (void *cls,
4618                    struct GNUNET_CORE_Handle * server,
4619                    const struct GNUNET_PeerIdentity *
4620                    my_identity,
4621                    const struct
4622                    GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded *
4623                    publicKey)
4624 {
4625   my_id = *my_identity;
4626 }
4627
4628
4629
4630
4631 /**
4632  * Process fs requests.
4633  *
4634  * @param server the initialized server
4635  * @param c configuration to use
4636  */
4637 static int
4638 main_init (struct GNUNET_SERVER_Handle *server,
4639            const struct GNUNET_CONFIGURATION_Handle *c)
4640 {
4641   static const struct GNUNET_CORE_MessageHandler p2p_handlers[] =
4642     {
4643       { &handle_p2p_get, 
4644         GNUNET_MESSAGE_TYPE_FS_GET, 0 },
4645       { &handle_p2p_put, 
4646         GNUNET_MESSAGE_TYPE_FS_PUT, 0 },
4647       { &handle_p2p_migration_stop, 
4648         GNUNET_MESSAGE_TYPE_FS_MIGRATION_STOP,
4649         sizeof (struct MigrationStopMessage) },
4650       { NULL, 0, 0 }
4651     };
4652   static const struct GNUNET_SERVER_MessageHandler handlers[] = {
4653     {&GNUNET_FS_handle_index_start, NULL, 
4654      GNUNET_MESSAGE_TYPE_FS_INDEX_START, 0},
4655     {&GNUNET_FS_handle_index_list_get, NULL, 
4656      GNUNET_MESSAGE_TYPE_FS_INDEX_LIST_GET, sizeof(struct GNUNET_MessageHeader) },
4657     {&GNUNET_FS_handle_unindex, NULL, GNUNET_MESSAGE_TYPE_FS_UNINDEX, 
4658      sizeof (struct UnindexMessage) },
4659     {&handle_start_search, NULL, GNUNET_MESSAGE_TYPE_FS_START_SEARCH, 
4660      0 },
4661     {NULL, NULL, 0, 0}
4662   };
4663   unsigned long long enc = 128;
4664
4665   cfg = c;
4666   stats = GNUNET_STATISTICS_create ("fs", cfg);
4667   min_migration_delay = GNUNET_TIME_UNIT_SECONDS;
4668   if ( (GNUNET_OK !=
4669         GNUNET_CONFIGURATION_get_value_number (cfg,
4670                                                "fs",
4671                                                "MAX_PENDING_REQUESTS",
4672                                                &max_pending_requests)) ||
4673        (GNUNET_OK !=
4674         GNUNET_CONFIGURATION_get_value_number (cfg,
4675                                                "fs",
4676                                                "EXPECTED_NEIGHBOUR_COUNT",
4677                                                &enc)) ||
4678        (GNUNET_OK != 
4679         GNUNET_CONFIGURATION_get_value_time (cfg,
4680                                              "fs",
4681                                              "MIN_MIGRATION_DELAY",
4682                                              &min_migration_delay)) )
4683     {
4684       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
4685                   _("Configuration fails to specify certain parameters, assuming default values."));
4686     }
4687   connected_peers = GNUNET_CONTAINER_multihashmap_create (enc); 
4688   query_request_map = GNUNET_CONTAINER_multihashmap_create (max_pending_requests);
4689   rt_entry_lifetime = GNUNET_LOAD_value_init (GNUNET_TIME_UNIT_FOREVER_REL);
4690   peer_request_map = GNUNET_CONTAINER_multihashmap_create (enc);
4691   requests_by_expiration_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN); 
4692   core = GNUNET_CORE_connect (cfg,
4693                               1, /* larger? */
4694                               NULL,
4695                               &peer_init_handler,
4696                               &peer_connect_handler,
4697                               &peer_disconnect_handler,
4698                               &peer_status_handler,
4699                               NULL, GNUNET_NO,
4700                               NULL, GNUNET_NO,
4701                               p2p_handlers);
4702   if (NULL == core)
4703     {
4704       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
4705                   _("Failed to connect to `%s' service.\n"),
4706                   "core");
4707       GNUNET_CONTAINER_multihashmap_destroy (connected_peers);
4708       connected_peers = NULL;
4709       GNUNET_CONTAINER_multihashmap_destroy (query_request_map);
4710       query_request_map = NULL;
4711       GNUNET_LOAD_value_free (rt_entry_lifetime);
4712       rt_entry_lifetime = NULL;
4713       GNUNET_CONTAINER_heap_destroy (requests_by_expiration_heap);
4714       requests_by_expiration_heap = NULL;
4715       GNUNET_CONTAINER_multihashmap_destroy (peer_request_map);
4716       peer_request_map = NULL;
4717       if (dsh != NULL)
4718         {
4719           GNUNET_DATASTORE_disconnect (dsh, GNUNET_NO);
4720           dsh = NULL;
4721         }
4722       return GNUNET_SYSERR;
4723     }
4724   if (active_from_migration) 
4725     {
4726       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
4727                   _("Content migration is enabled, will start to gather data\n"));
4728       consider_migration_gathering ();
4729     }
4730   consider_dht_put_gathering (NULL);
4731   GNUNET_SERVER_disconnect_notify (server, 
4732                                    &handle_client_disconnect,
4733                                    NULL);
4734   GNUNET_assert (GNUNET_OK ==
4735                  GNUNET_CONFIGURATION_get_value_filename (cfg,
4736                                                           "fs",
4737                                                           "TRUST",
4738                                                           &trustDirectory));
4739   GNUNET_DISK_directory_create (trustDirectory);
4740   GNUNET_SCHEDULER_add_with_priority (GNUNET_SCHEDULER_PRIORITY_HIGH,
4741                                       &cron_flush_trust, NULL);
4742
4743
4744   GNUNET_SERVER_add_handlers (server, handlers);
4745   cover_age_task = GNUNET_SCHEDULER_add_delayed (COVER_AGE_FREQUENCY,
4746                                                  &age_cover_counters,
4747                                                  NULL);
4748   GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL,
4749                                 &shutdown_task,
4750                                 NULL);
4751   return GNUNET_OK;
4752 }
4753
4754
4755 /**
4756  * Process fs requests.
4757  *
4758  * @param cls closure
4759  * @param server the initialized server
4760  * @param cfg configuration to use
4761  */
4762 static void
4763 run (void *cls,
4764      struct GNUNET_SERVER_Handle *server,
4765      const struct GNUNET_CONFIGURATION_Handle *cfg)
4766 {
4767   active_to_migration = GNUNET_CONFIGURATION_get_value_yesno (cfg,
4768                                                               "FS",
4769                                                               "CONTENT_CACHING");
4770   active_from_migration = GNUNET_CONFIGURATION_get_value_yesno (cfg,
4771                                                                 "FS",
4772                                                                 "CONTENT_PUSHING");
4773   dsh = GNUNET_DATASTORE_connect (cfg);
4774   if (dsh == NULL)
4775     {
4776       GNUNET_SCHEDULER_shutdown ();
4777       return;
4778     }
4779   datastore_get_load = GNUNET_LOAD_value_init (DATASTORE_LOAD_AUTODECLINE);
4780   datastore_put_load = GNUNET_LOAD_value_init (DATASTORE_LOAD_AUTODECLINE);
4781   block_cfg = GNUNET_CONFIGURATION_create ();
4782   GNUNET_CONFIGURATION_set_value_string (block_cfg,
4783                                          "block",
4784                                          "PLUGINS",
4785                                          "fs");
4786   block_ctx = GNUNET_BLOCK_context_create (block_cfg);
4787   GNUNET_assert (NULL != block_ctx);
4788   dht_handle = GNUNET_DHT_connect (cfg,
4789                                    FS_DHT_HT_SIZE);
4790   if ( (GNUNET_OK != GNUNET_FS_indexing_init (cfg, dsh)) ||
4791        (GNUNET_OK != main_init (server, cfg)) )
4792     {    
4793       GNUNET_SCHEDULER_shutdown ();
4794       GNUNET_DATASTORE_disconnect (dsh, GNUNET_NO);
4795       dsh = NULL;
4796       GNUNET_DHT_disconnect (dht_handle);
4797       dht_handle = NULL;
4798       GNUNET_BLOCK_context_destroy (block_ctx);
4799       block_ctx = NULL;
4800       GNUNET_CONFIGURATION_destroy (block_cfg);
4801       block_cfg = NULL;
4802       GNUNET_LOAD_value_free (datastore_get_load);
4803       datastore_get_load = NULL;
4804       GNUNET_LOAD_value_free (datastore_put_load);
4805       datastore_put_load = NULL;
4806       return;   
4807     }
4808 }
4809
4810
4811 /**
4812  * The main function for the fs service.
4813  *
4814  * @param argc number of arguments from the command line
4815  * @param argv command line arguments
4816  * @return 0 ok, 1 on error
4817  */
4818 int
4819 main (int argc, char *const *argv)
4820 {
4821   return (GNUNET_OK ==
4822           GNUNET_SERVICE_run (argc,
4823                               argv,
4824                               "fs",
4825                               GNUNET_SERVICE_OPTION_NONE,
4826                               &run, NULL)) ? 0 : 1;
4827 }
4828
4829 /* end of gnunet-service-fs.c */